Skip to content
Snippets Groups Projects
Commit 4bb87299 authored by vito.he's avatar vito.he
Browse files

Merge branch 'develop' of https://github.com/dubbo/go-for-apache-dubbo into develop

parents 59c74421 99e87889
No related branches found
No related tags found
No related merge requests found
Showing
with 579 additions and 85 deletions
language: go
go:
- "1.11"
- "1.12"
env:
- GO111MODULE=on
install: true
script:
- go fmt ./... && [[ -z `git status -s` ]]
- go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
after_success:
- bash <(curl -s https://codecov.io/bash)
# dubbo-go #
# go-for-apache-dubbo #
---
Apache Dubbo Golang Implementation.
Apache Dubbo Go Implementation.
## License
Apache License, Version 2.0
## Code design ##
Based on dubbo's layered design (protocol layer,registry layer,cluster layer,config layer and so on),
Based on dubbo's layered code design (protocol layer,registry layer,cluster layer,config layer and so on),
About detail design please refer to [code layered design](https://github.com/dubbo/go-for-apache-dubbo/wiki/dubbo-go-V2.6-design)
## Feature list ##
+ Role: Consumer(√), Provider(√)
Finished List:
+ Transport: HTTP(√), TCP(√) Based on [getty](https://github.com/AlexStocks/getty)
- Role: Consumer(√), Provider(√)
- Transport: HTTP(√), TCP(√)
- Codec: JsonRPC v2(√), Hessian v2(√)
- Registry: ZooKeeper(√)
- Cluster Strategy: Failover(√)
- Load Balance: Random(√)
- Filter: Echo Health Check(√)
+ Codec: JsonRPC(√), Hessian(√) Based on [Hession2](https://github.com/dubbogo/hessian2)
Working List:
+ Registry: ZooKeeper(√)
- Cluster Strategy: Failfast/Failsafe/Failback/Forking
- Load Balance: RoundRobin/LeastActive/ConsistentHash
- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter
- Registry: etcd/k8s/consul
+ Cluster Strategy: Failover(√)
Todo List:
+ Load Balance: Random(√)
- routing rule (dubbo v2.6.x)
- monitoring (dubbo v2.6.x)
- metrics (dubbo v2.6.x)
- dynamic configuration (dubbo v2.7.x)
+ Filter: Echo(√)
You can know more about [dubbo-go](github.com/dubbo/go-dubbo) by its [roadmap](https://github.com/dubbo/go-for-apache-dubbo/wiki/Roadmap).
## Code Example
## Quick Start
The subdirectory examples shows how to use dubbo-go. Please read the examples/readme.md carefully to learn how to dispose the configuration and compile the program.
The subdirectory examples shows how to use go-for-apache-dubbo. Please read the examples/readme.md carefully to learn how to dispose the configuration and compile the program.
## Benchmark
## Todo list
Benchmark project please refer to [go-for-apache-dubbo-benchmark](https://github.com/dubbogo/go-for-apache-dubbo-benchmark)
Implement more extention:
About go-for-apache-dubbo benchmarking report, please refer to [dubbo benchmarking report](https://github.com/dubbo/go-for-apache-dubbo/wiki/pressure-test-report-for-dubbo) & [jsonrpc benchmarking report](https://github.com/dubbo/go-for-apache-dubbo/wiki/pressure-test-report-for-jsonrpc)
* cluster strategy : Failfast/Failsafe/Failback/Forking/Broadcast
* load balance strategy: RoundRobin/LeastActive/ConsistentHash
* standard filter in dubbo: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter
* registry impl: consul/etcd/k8s
Compatible with dubbo v2.7.x and not finished function in dubbo v2.6.x:
* routing rule (dubbo v2.6.x)
* monitoring (dubbo v2.6.x)
* metrics (dubbo v2.6.x)
* dynamic configuration (dubbo v2.7.x)
......@@ -8,5 +8,3 @@ import (
type LoadBalance interface {
Select([]protocol.Invoker, protocol.Invocation) protocol.Invoker
}
......@@ -31,7 +31,7 @@ func (lb *randomLoadBalance) Select(invokers []protocol.Invoker, invocation prot
sameWeight := true
weights := make([]int64, length)
firstWeight :=GetWeight(invokers[0], invocation)
firstWeight := GetWeight(invokers[0], invocation)
totalWeight := firstWeight
weights[0] = firstWeight
......
......@@ -12,11 +12,14 @@ import (
import (
"github.com/dubbo/go-for-apache-dubbo/common"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type TestService struct {
MethodOne func(context.Context, []interface{}, *struct{}) error
MethodTwo func([]interface{}, *struct{}) error
Echo func([]interface{}, *struct{}) error
}
func (s *TestService) Service() string {
......@@ -26,18 +29,30 @@ func (s *TestService) Version() string {
return ""
}
type TestServiceInt int
func (s *TestServiceInt) Service() string {
return "com.test.TestServiceInt"
}
func (s *TestServiceInt) Version() string {
return ""
}
func TestProxy_Implement(t *testing.T) {
invoker := protocol.NewBaseInvoker(common.URL{})
p := NewProxy(invoker, nil, nil)
s := &TestService{MethodOne: func(i context.Context, i2 []interface{}, i3 *struct{}) error {
return errors.New("errors")
}}
p := NewProxy(invoker, nil, map[string]string{constant.ASYNC_KEY: "false"})
s := &TestService{}
p.Implement(s)
err := p.Get().(*TestService).MethodOne(nil, nil, nil)
assert.NoError(t, err)
err = p.Get().(*TestService).MethodTwo(nil, nil)
assert.NoError(t, err)
err = p.Get().(*TestService).Echo(nil, nil)
assert.NoError(t, err)
// inherit & lowercase
p.rpc = nil
type S1 struct {
TestService
methodOne func(context.Context, []interface{}, *struct{}) error
......@@ -51,7 +66,14 @@ func TestProxy_Implement(t *testing.T) {
err = s1.methodOne(nil, nil, nil)
assert.EqualError(t, err, "errors")
// no struct
p.rpc = nil
it := TestServiceInt(1)
p.Implement(&it)
assert.Nil(t, p.rpc)
// args number
p.rpc = nil
type S2 struct {
TestService
MethodOne func([]interface{}) error
......@@ -61,6 +83,7 @@ func TestProxy_Implement(t *testing.T) {
assert.Nil(t, s2.MethodOne)
// returns number
p.rpc = nil
type S3 struct {
TestService
MethodOne func(context.Context, []interface{}, *struct{}) (interface{}, error)
......@@ -70,6 +93,7 @@ func TestProxy_Implement(t *testing.T) {
assert.Nil(t, s3.MethodOne)
// returns type
p.rpc = nil
type S4 struct {
TestService
MethodOne func(context.Context, []interface{}, *struct{}) interface{}
......@@ -79,6 +103,7 @@ func TestProxy_Implement(t *testing.T) {
assert.Nil(t, s4.MethodOne)
// reply type for number 3
p.rpc = nil
type S5 struct {
TestService
MethodOne func(context.Context, []interface{}, interface{}) error
......@@ -88,6 +113,7 @@ func TestProxy_Implement(t *testing.T) {
assert.Nil(t, s5.MethodOne)
// reply type for number 2
p.rpc = nil
type S6 struct {
TestService
MethodOne func([]interface{}, interface{}) error
......
......@@ -221,7 +221,7 @@ func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) {
length := len(consumerConfig.References)
for index := 0; index < length; index++ {
con := &consumerConfig.References[index]
rpcService := conServices[con.InterfaceName]
rpcService := GetConService(con.InterfaceName)
if rpcService == nil {
log.Warn("%s is not exsist!", con.InterfaceName)
continue
......@@ -264,7 +264,7 @@ func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) {
length := len(providerConfig.Services)
for index := 0; index < length; index++ {
pro := &providerConfig.Services[index]
rpcService := proServices[pro.InterfaceName]
rpcService := GetProService(pro.InterfaceName)
if rpcService == nil {
log.Warn("%s is not exsist!", pro.InterfaceName)
continue
......
package config
import (
"github.com/dubbo/go-for-apache-dubbo/common"
"path/filepath"
"testing"
)
......@@ -9,6 +10,11 @@ import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/dubbo/go-for-apache-dubbo/cluster/cluster_impl"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
)
func TestConfigLoader(t *testing.T) {
conPath, err := filepath.Abs("./consumer_config.yml")
assert.NoError(t, err)
......@@ -16,7 +22,9 @@ func TestConfigLoader(t *testing.T) {
assert.NoError(t, err)
assert.Nil(t, consumerConfig)
assert.Equal(t, ConsumerConfig{}, GetConsumerConfig())
assert.Nil(t, providerConfig)
assert.Equal(t, ProviderConfig{}, GetProviderConfig())
err = consumerInit(conPath)
assert.NoError(t, err)
......@@ -24,5 +32,29 @@ func TestConfigLoader(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, consumerConfig)
assert.NotEqual(t, ConsumerConfig{}, GetConsumerConfig())
assert.NotNil(t, providerConfig)
assert.NotEqual(t, ProviderConfig{}, GetProviderConfig())
}
func TestLoad(t *testing.T) {
doInit()
doinit()
SetConService(&MockService{})
SetProService(&MockService{})
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster)
consumerConfig.References[0].Registries = []ConfigRegistry{"shanghai_reg1"}
refConfigs, svcConfigs := Load()
assert.NotEqual(t, 0, len(refConfigs))
assert.NotEqual(t, 0, len(svcConfigs))
conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
common.ServiceMap.UnRegister("mock", "MockService")
consumerConfig = nil
providerConfig = nil
}
......@@ -6,7 +6,7 @@ type MockService struct {
}
func (*MockService) Service() string {
return "mockservice"
return "MockService"
}
func (*MockService) Version() string {
return "1.0"
......
......@@ -61,7 +61,7 @@ func doInit() {
},
References: []ReferenceConfig{
{
InterfaceName: "testInterface",
InterfaceName: "MockService",
Protocol: "mock",
Registries: []ConfigRegistry{"shanghai_reg1", "shanghai_reg2", "hangzhou_reg1", "hangzhou_reg2"},
Cluster: "failover",
......
......@@ -55,7 +55,7 @@ func doinit() {
},
Services: []ServiceConfig{
{
InterfaceName: "testInterface",
InterfaceName: "MockService",
Protocol: "mock",
Registries: []ConfigRegistry{"shanghai_reg1", "shanghai_reg2", "hangzhou_reg1", "hangzhou_reg2"},
Cluster: "failover",
......
Contributing to Dubbogo
## 1. Branch
>- The name of branches `SHOULD` be in the format of `feature/xxx`.
>- You `SHOULD` checkout a new branch after a feature branch already being merged into upstream, `DO NOT` commit in the old branch.
## 2. Pull Request
### 2.1. Title Format
The pr head format is `<head> <subject>`. The title should be simpler to show your intent.
The title format of the pull request `MUST` follow the following rules:
>- Start with `Doc:` for adding/formatting/improving docs.
>- Start with `Mod:` for formatting codes or adding comment.
>- Start with `Fix:` for fixing bug, and its ending should be ` #issue-id` if being relevant to some issue.
>- Start with `Imp:` for improving performance.
>- Start with `Ftr:` for adding a new feature.
>- Start with `Add:` for adding struct function/member.
>- Start with `Rft:` for refactoring codes.
>- Start with `Tst:` for adding tests.
>- Start with `Dep:` for adding depending libs.
>- Start with `Rem:` for removing feature/struct/function/member/files.
# examples
Examples of go-for-apache-dubbo
## dubbo
#### Build by these command
java server
```bash
cd dubbo/java-server
sh build.sh
```
java client
```bash
cd dubbo/java-client
sh build.sh
```
go server
```bash
cd dubbo/go-server
#linux, mac windows represent the os
#release, dev and test represent the environment
sh ./assembly/linux/release.sh
```
go client
```bash
cd dubbo/go-client
#linux, mac windows represent the os
#release, dev and test represent the environment
sh ./assembly/linux/release.sh
```
#### Run by these command:
java server
```bash
cd dubbo/java-server/target
tar -zxvf user-info-server-0.2.0-assembly.tar.gz
cd ./user-info-server-0.2.0
sh ./bin/server.sh start
```
java client
```bash
cd dubbo/java-client/target
tar -zxvf user-info-client-0.2.0-assembly.tar.gz
cd ./user-info-client-0.2.0
sh ./bin/server.sh start
```
go server
> It must not listen on IP 127.0.0.1 when called by java-client.
> You should change IP in dubbo/go-server/target/linux/user_info_server-0.3.1-20190517-0930-release/conf/server.yml
```bash
cd dubbo/go-server/target/linux/user_info_server-0.3.1-20190517-0930-release
sh ./bin/load.sh start
```
go client
```bash
cd dubbo/go-client/target/linux/user_info_client-0.3.1-20190517-0921-release
sh ./bin/load_user_info_client.sh start
```
## jsonrpc
Similar to dubbo
......@@ -83,38 +83,38 @@ type CallOptions struct {
type CallOption func(*CallOptions)
func WithCallRequestTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
o.RequestTimeout = d
}
}
func WithCallResponseTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
o.ResponseTimeout = d
}
}
func WithCallSerialID(s SerialID) CallOption {
return func(o *CallOptions) {
o.SerialID = s
}
}
func WithCallMeta_All(callMeta map[interface{}]interface{}) CallOption {
return func(o *CallOptions) {
o.Meta = callMeta
}
}
func WithCallMeta(k, v interface{}) CallOption {
return func(o *CallOptions) {
if o.Meta == nil {
o.Meta = make(map[interface{}]interface{})
}
o.Meta[k] = v
}
}
//func WithCallRequestTimeout(d time.Duration) CallOption {
// return func(o *CallOptions) {
// o.RequestTimeout = d
// }
//}
//
//func WithCallResponseTimeout(d time.Duration) CallOption {
// return func(o *CallOptions) {
// o.ResponseTimeout = d
// }
//}
//
//func WithCallSerialID(s SerialID) CallOption {
// return func(o *CallOptions) {
// o.SerialID = s
// }
//}
//
//func WithCallMeta_All(callMeta map[interface{}]interface{}) CallOption {
// return func(o *CallOptions) {
// o.Meta = callMeta
// }
//}
//func WithCallMeta(k, v interface{}) CallOption {
// return func(o *CallOptions) {
// if o.Meta == nil {
// o.Meta = make(map[interface{}]interface{})
// }
// o.Meta[k] = v
// }
//}
type CallResponse struct {
Opts CallOptions
......@@ -197,6 +197,7 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string
p := &DubboPackage{}
p.Service.Path = strings.TrimPrefix(svcUrl.Path, "/")
p.Service.Target = svcUrl.GetParam(constant.INTERFACE_KEY, "")
p.Service.Interface = svcUrl.GetParam(constant.INTERFACE_KEY, "")
p.Service.Version = svcUrl.GetParam(constant.VERSION_KEY, constant.DEFAULT_VERSION)
p.Service.Method = method
p.Service.Timeout = opts.RequestTimeout
......@@ -209,11 +210,14 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string
var rsp *PendingResponse
if ct != CT_OneWay {
p.Header.Type = hessian.PackageRequest_TwoWay
rsp = NewPendingResponse()
rsp.reply = reply
rsp.callback = callback
rsp.opts = opts
}
// todo: it must be PackageRequest because of hessian2, but it is twoway actually
p.Header.Type = hessian.PackageRequest
var (
err error
......@@ -280,8 +284,6 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
pkg.Body = []interface{}{}
pkg.Header.Type = hessian.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
} else {
pkg.Header.Type = hessian.PackageRequest
}
pkg.Header.ID = int64(sequence)
......
package dubbo
import (
"context"
"errors"
"sync"
"testing"
"time"
)
import (
"github.com/dubbogo/hessian2"
"github.com/stretchr/testify/assert"
)
import (
"github.com/dubbo/go-for-apache-dubbo/common"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type (
User struct {
Id string `json:"id"`
Name string `json:"name"`
}
UserProvider struct {
user map[string]User
}
)
func TestClient_CallOneway(t *testing.T) {
proto, url := InitTest(t)
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
//user := &User{}
err := c.CallOneway("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"})
assert.NoError(t, err)
// destroy
proto.Destroy()
}
func TestClient_Call(t *testing.T) {
proto, url := InitTest(t)
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
user := &User{}
err := c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user)
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: "username"}, *user)
user = &User{}
err = c.Call("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", "username"}, user)
assert.NoError(t, err)
assert.Equal(t, User{Id: "1", Name: "username"}, *user)
// destroy
proto.Destroy()
}
func TestClient_AsyncCall(t *testing.T) {
proto, url := InitTest(t)
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
user := &User{}
lock := sync.Mutex{}
lock.Lock()
err := c.AsyncCall("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, func(response CallResponse) {
assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User))
lock.Unlock()
}, user)
assert.NoError(t, err)
assert.Equal(t, User{}, *user)
// destroy
lock.Lock()
proto.Destroy()
lock.Unlock()
}
func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
hessian.RegisterPOJO(&User{})
methods, err := common.ServiceMap.Register("dubbo", &UserProvider{})
assert.NoError(t, err)
assert.Equal(t, "GetUser,GetUser0,GetUser1", methods)
// config
SetClientConf(ClientConfig{
ConnectionNum: 2,
HeartbeatPeriod: "5s",
SessionTimeout: "20s",
FailFastTimeout: "5s",
PoolTTL: 600,
PoolSize: 64,
GettySessionParam: GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
TcpKeepAlive: true,
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
PkgRQSize: 1024,
PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
MaxMsgLen: 1024,
SessionName: "client",
},
})
assert.NoError(t, clientConf.CheckValidity())
SetServerConfig(ServerConfig{
SessionNumber: 700,
SessionTimeout: "20s",
FailFastTimeout: "5s",
GettySessionParam: GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
TcpKeepAlive: true,
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
PkgRQSize: 1024,
PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
MaxMsgLen: 1024,
SessionName: "server",
}})
assert.NoError(t, srvConf.CheckValidity())
// Export
proto := GetProtocol()
url, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+
"side=provider&timeout=3000&timestamp=1556509797245")
assert.NoError(t, err)
proto.Export(protocol.NewBaseInvoker(url))
time.Sleep(time.Second * 2)
return proto, url
}
func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error {
rsp.Id = req[0].(string)
rsp.Name = req[1].(string)
return nil
}
func (u *UserProvider) GetUser0(req []interface{}, rsp *User) error {
rsp.Id = req[0].(string)
rsp.Name = req[1].(string)
return nil
}
func (u *UserProvider) GetUser1(ctx context.Context, req []interface{}, rsp *User) error {
return errors.New("error")
}
func (u *UserProvider) Service() string {
return "com.ikurento.user.UserProvider"
}
func (u *UserProvider) Version() string {
return ""
}
func (u User) JavaClassName() string {
return "com.ikurento.user.User"
}
package dubbo
import (
"testing"
"time"
)
import (
"github.com/dubbogo/hessian2"
"github.com/stretchr/testify/assert"
)
func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) {
pkg := &DubboPackage{}
pkg.Body = []interface{}{"a"}
pkg.Header.Type = hessian.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
pkg.Header.ID = 10086
// heartbeat
data, err := pkg.Marshal()
assert.NoError(t, err)
pkgres := &DubboPackage{}
pkgres.Body = []interface{}{}
err = pkgres.Unmarshal(data)
assert.NoError(t, err)
assert.Equal(t, hessian.PackageHeartbeat|hessian.PackageRequest|hessian.PackageRequest_TwoWay, pkgres.Header.Type)
assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID)
assert.Equal(t, int64(10086), pkgres.Header.ID)
assert.Equal(t, 0, len(pkgres.Body.([]interface{})))
// request
pkg.Header.Type = hessian.PackageRequest
pkg.Service.Interface = "Service"
pkg.Service.Target = "Service"
pkg.Service.Version = "2.6"
pkg.Service.Method = "Method"
pkg.Service.Timeout = time.Second
data, err = pkg.Marshal()
assert.NoError(t, err)
pkgres = &DubboPackage{}
pkgres.Body = make([]interface{}, 7)
err = pkgres.Unmarshal(data)
assert.NoError(t, err)
assert.Equal(t, hessian.PackageRequest|hessian.PackageRequest_TwoWay, pkgres.Header.Type)
assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID)
assert.Equal(t, int64(10086), pkgres.Header.ID)
assert.Equal(t, "2.5.4", pkgres.Body.([]interface{})[0])
assert.Equal(t, "Service", pkgres.Body.([]interface{})[1])
assert.Equal(t, "2.6", pkgres.Body.([]interface{})[2])
assert.Equal(t, "Method", pkgres.Body.([]interface{})[3])
assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4])
assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5])
assert.Equal(t, map[interface{}]interface{}{"interface": "Service", "path": "", "timeout": "1000"}, pkgres.Body.([]interface{})[6])
}
......@@ -53,17 +53,17 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
} else {
result.Err = di.client.CallOneway(url.Location, url, inv.MethodName(), inv.Arguments())
}
log.Debug("result.Err: %v, result.Rest: %v", result.Err, result.Rest)
} else {
if inv.Reply() == nil {
result.Err = Err_No_Reply
} else {
result.Err = di.client.Call(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Reply())
result.Rest = inv.Reply() // reply should be set to result.Rest when sync
}
log.Debug("result.Err: %v, result.Rest: %v", result.Err, result.Rest)
}
if result.Err == nil {
result.Rest = inv.Reply()
}
log.Debug("result.Err: %v, result.Rest: %v", result.Err, result.Rest)
return &result
}
......
package dubbo
import (
"sync"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/protocol/invocation"
)
func TestDubboInvoker_Invoke(t *testing.T) {
proto, url := InitTest(t)
c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
invoker := NewDubboInvoker(url, c)
user := &User{}
inv := invocation.NewRPCInvocationForConsumer("GetUser", nil, []interface{}{"1", "username"}, user, nil, url, nil)
// Call
res := invoker.Invoke(inv)
assert.NoError(t, res.Error())
assert.Equal(t, User{Id: "1", Name: "username"}, *res.Result().(*User))
// CallOneway
inv.SetAttachments(constant.ASYNC_KEY, "true")
res = invoker.Invoke(inv)
assert.NoError(t, res.Error())
// AsyncCall
lock := sync.Mutex{}
lock.Lock()
inv.SetCallBack(func(response CallResponse) {
assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User))
lock.Unlock()
})
res = invoker.Invoke(inv)
assert.NoError(t, res.Error())
// Err_No_Reply
inv.SetAttachments(constant.ASYNC_KEY, "false")
inv.SetReply(nil)
res = invoker.Invoke(inv)
assert.EqualError(t, res.Error(), "request need @reply")
// destroy
lock.Lock()
proto.Destroy()
lock.Unlock()
}
......@@ -183,11 +183,11 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
return
}
twoway := true
// not twoway
if p.Header.Type&hessian.PackageRequest_TwoWay == 0x00 {
twoway = false
h.reply(session, p, hessian.PackageResponse)
h.callService(p, nil)
return
}
invoker := h.exporter.GetInvoker()
......@@ -213,6 +213,9 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) {
}
h.callService(p, nil)
if !twoway {
return
}
h.reply(session, p, hessian.PackageResponse)
}
......
......@@ -2,6 +2,7 @@ package dubbo
import (
"bytes"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"reflect"
)
......@@ -88,7 +89,7 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
dubboVersion = req[0].(string)
}
if req[1] != nil {
pkg.Service.Target = req[1].(string)
pkg.Service.Path = req[1].(string)
}
if req[2] != nil {
pkg.Service.Version = req[2].(string)
......@@ -105,11 +106,12 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface
if req[6] != nil {
attachments = req[6].(map[interface{}]interface{})
}
pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string)
pkg.Body = map[string]interface{}{
"dubboVersion": dubboVersion,
"argsTypes": argsTypes,
"args": args,
"service": common.ServiceMap.GetService(DUBBO, pkg.Service.Target),
"service": common.ServiceMap.GetService(DUBBO, pkg.Service.Interface),
"attachments": attachments,
}
}
......
......@@ -68,6 +68,10 @@ func (r *RPCInvocation) Reply() interface{} {
return r.reply
}
func (r *RPCInvocation) SetReply(reply interface{}) {
r.reply = reply
}
func (r *RPCInvocation) Attachments() map[string]string {
return r.attachments
}
......@@ -94,9 +98,13 @@ func (r *RPCInvocation) Invoker() protocol.Invoker {
return r.invoker
}
//func (r *RPCInvocation) SetInvoker() protocol.Invoker {
// return r.invoker
//}
func (r *RPCInvocation) SetInvoker() protocol.Invoker {
return r.invoker
}
func (r *RPCInvocation) SetCallBack(c interface{}) {
r.callBack = c
}
func (r *RPCInvocation) CallBack() interface{} {
return r.callBack
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment