diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000000000000000000000000000000000000..2038d8ecc81ce319906b66333458eb6eda9afc30 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,17 @@ +language: go + +go: + - "1.11" + - "1.12" + +env: + - GO111MODULE=on + +install: true + +script: + - go fmt ./... && [[ -z `git status -s` ]] + - go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/README.md b/README.md index 4c8cee70f54e11e0d727bf3d9ba321f3315223df..dcd882a35015b8f97fc45ee28297b50462722316 100644 --- a/README.md +++ b/README.md @@ -1,54 +1,53 @@ -# dubbo-go # +# go-for-apache-dubbo # --- -Apache Dubbo Golang Implementation. +Apache Dubbo Go Implementation. ## License Apache License, Version 2.0 ## Code design ## -Based on dubbo's layered design (protocol layer,registry layer,cluster layer,config layer and so on), +Based on dubbo's layered code design (protocol layer,registry layer,cluster layer,config layer and so on), About detail design please refer to [code layered design](https://github.com/dubbo/go-for-apache-dubbo/wiki/dubbo-go-V2.6-design) ## Feature list ## -+ Role: Consumer(√), Provider(√) +Finished List: -+ Transport: HTTP(√), TCP(√) Based on [getty](https://github.com/AlexStocks/getty) +- Role: Consumer(√), Provider(√) +- Transport: HTTP(√), TCP(√) +- Codec: JsonRPC v2(√), Hessian v2(√) +- Registry: ZooKeeper(√) +- Cluster Strategy: Failover(√) +- Load Balance: Random(√) +- Filter: Echo Health Check(√) -+ Codec: JsonRPC(√), Hessian(√) Based on [Hession2](https://github.com/dubbogo/hessian2) +Working List: -+ Registry: ZooKeeper(√) +- Cluster Strategy: Failfast/Failsafe/Failback/Forking +- Load Balance: RoundRobin/LeastActive/ConsistentHash +- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter +- Registry: etcd/k8s/consul -+ Cluster Strategy: Failover(√) +Todo List: -+ Load Balance: Random(√) +- routing rule (dubbo v2.6.x) +- monitoring (dubbo v2.6.x) +- metrics (dubbo v2.6.x) +- dynamic configuration (dubbo v2.7.x) -+ Filter: Echo(√) +You can know more about [dubbo-go](github.com/dubbo/go-dubbo) by its [roadmap](https://github.com/dubbo/go-for-apache-dubbo/wiki/Roadmap). -## Code Example +## Quick Start -The subdirectory examples shows how to use dubbo-go. Please read the examples/readme.md carefully to learn how to dispose the configuration and compile the program. +The subdirectory examples shows how to use go-for-apache-dubbo. Please read the examples/readme.md carefully to learn how to dispose the configuration and compile the program. +## Benchmark -## Todo list +Benchmark project please refer to [go-for-apache-dubbo-benchmark](https://github.com/dubbogo/go-for-apache-dubbo-benchmark) -Implement more extention: +About go-for-apache-dubbo benchmarking report, please refer to [dubbo benchmarking report](https://github.com/dubbo/go-for-apache-dubbo/wiki/pressure-test-report-for-dubbo) & [jsonrpc benchmarking report](https://github.com/dubbo/go-for-apache-dubbo/wiki/pressure-test-report-for-jsonrpc) - * cluster strategy : Failfast/Failsafe/Failback/Forking/Broadcast - * load balance strategy: RoundRobin/LeastActive/ConsistentHash - * standard filter in dubbo: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter - * registry impl: consul/etcd/k8s - -Compatible with dubbo v2.7.x and not finished function in dubbo v2.6.x: - - * routing rule (dubbo v2.6.x) - - * monitoring (dubbo v2.6.x) - - * metrics (dubbo v2.6.x) - - * dynamic configuration (dubbo v2.7.x) diff --git a/cluster/loadbalance.go b/cluster/loadbalance.go index 4df6088100bf1957ecdeb649b6b973b86f8a09ec..8e1ff5d14946220ad8ad8d8304680a11412c6317 100644 --- a/cluster/loadbalance.go +++ b/cluster/loadbalance.go @@ -8,5 +8,3 @@ import ( type LoadBalance interface { Select([]protocol.Invoker, protocol.Invocation) protocol.Invoker } - - diff --git a/cluster/loadbalance/random.go b/cluster/loadbalance/random.go index 556c503dde3224e572f9949e0d0e5bae2aefa97d..aad09f2fa316c36d522d97827be7bf6d33d4ad29 100644 --- a/cluster/loadbalance/random.go +++ b/cluster/loadbalance/random.go @@ -31,7 +31,7 @@ func (lb *randomLoadBalance) Select(invokers []protocol.Invoker, invocation prot sameWeight := true weights := make([]int64, length) - firstWeight :=GetWeight(invokers[0], invocation) + firstWeight := GetWeight(invokers[0], invocation) totalWeight := firstWeight weights[0] = firstWeight diff --git a/common/proxy/proxy_test.go b/common/proxy/proxy_test.go index 73a3fb8ba92bd673cfb8f1d89f04b0f245ed8bf5..8a7c8c880c564664c849096d242320e09708f5fb 100644 --- a/common/proxy/proxy_test.go +++ b/common/proxy/proxy_test.go @@ -12,11 +12,14 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/protocol" ) type TestService struct { MethodOne func(context.Context, []interface{}, *struct{}) error + MethodTwo func([]interface{}, *struct{}) error + Echo func([]interface{}, *struct{}) error } func (s *TestService) Service() string { @@ -26,18 +29,30 @@ func (s *TestService) Version() string { return "" } +type TestServiceInt int + +func (s *TestServiceInt) Service() string { + return "com.test.TestServiceInt" +} +func (s *TestServiceInt) Version() string { + return "" +} + func TestProxy_Implement(t *testing.T) { invoker := protocol.NewBaseInvoker(common.URL{}) - p := NewProxy(invoker, nil, nil) - s := &TestService{MethodOne: func(i context.Context, i2 []interface{}, i3 *struct{}) error { - return errors.New("errors") - }} + p := NewProxy(invoker, nil, map[string]string{constant.ASYNC_KEY: "false"}) + s := &TestService{} p.Implement(s) err := p.Get().(*TestService).MethodOne(nil, nil, nil) assert.NoError(t, err) + err = p.Get().(*TestService).MethodTwo(nil, nil) + assert.NoError(t, err) + err = p.Get().(*TestService).Echo(nil, nil) + assert.NoError(t, err) // inherit & lowercase + p.rpc = nil type S1 struct { TestService methodOne func(context.Context, []interface{}, *struct{}) error @@ -51,7 +66,14 @@ func TestProxy_Implement(t *testing.T) { err = s1.methodOne(nil, nil, nil) assert.EqualError(t, err, "errors") + // no struct + p.rpc = nil + it := TestServiceInt(1) + p.Implement(&it) + assert.Nil(t, p.rpc) + // args number + p.rpc = nil type S2 struct { TestService MethodOne func([]interface{}) error @@ -61,6 +83,7 @@ func TestProxy_Implement(t *testing.T) { assert.Nil(t, s2.MethodOne) // returns number + p.rpc = nil type S3 struct { TestService MethodOne func(context.Context, []interface{}, *struct{}) (interface{}, error) @@ -70,6 +93,7 @@ func TestProxy_Implement(t *testing.T) { assert.Nil(t, s3.MethodOne) // returns type + p.rpc = nil type S4 struct { TestService MethodOne func(context.Context, []interface{}, *struct{}) interface{} @@ -79,6 +103,7 @@ func TestProxy_Implement(t *testing.T) { assert.Nil(t, s4.MethodOne) // reply type for number 3 + p.rpc = nil type S5 struct { TestService MethodOne func(context.Context, []interface{}, interface{}) error @@ -88,6 +113,7 @@ func TestProxy_Implement(t *testing.T) { assert.Nil(t, s5.MethodOne) // reply type for number 2 + p.rpc = nil type S6 struct { TestService MethodOne func([]interface{}, interface{}) error diff --git a/config/config_loader.go b/config/config_loader.go index 3b731323a9e5e6b0ca51a7bdd35b5298f214679c..6ff55a5765c90a33be22dd0dbf3c8ad5992bc72e 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -221,7 +221,7 @@ func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) { length := len(consumerConfig.References) for index := 0; index < length; index++ { con := &consumerConfig.References[index] - rpcService := conServices[con.InterfaceName] + rpcService := GetConService(con.InterfaceName) if rpcService == nil { log.Warn("%s is not exsist!", con.InterfaceName) continue @@ -264,7 +264,7 @@ func Load() (map[string]*ReferenceConfig, map[string]*ServiceConfig) { length := len(providerConfig.Services) for index := 0; index < length; index++ { pro := &providerConfig.Services[index] - rpcService := proServices[pro.InterfaceName] + rpcService := GetProService(pro.InterfaceName) if rpcService == nil { log.Warn("%s is not exsist!", pro.InterfaceName) continue diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 1b3e9a36cb28e7e5e65923bbb44e894f7ef07705..ed96da81e8e8ae418d7243c73c4dffde1c7bc197 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -1,6 +1,7 @@ package config import ( + "github.com/dubbo/go-for-apache-dubbo/common" "path/filepath" "testing" ) @@ -9,6 +10,11 @@ import ( "github.com/stretchr/testify/assert" ) +import ( + "github.com/dubbo/go-for-apache-dubbo/cluster/cluster_impl" + "github.com/dubbo/go-for-apache-dubbo/common/extension" +) + func TestConfigLoader(t *testing.T) { conPath, err := filepath.Abs("./consumer_config.yml") assert.NoError(t, err) @@ -16,7 +22,9 @@ func TestConfigLoader(t *testing.T) { assert.NoError(t, err) assert.Nil(t, consumerConfig) + assert.Equal(t, ConsumerConfig{}, GetConsumerConfig()) assert.Nil(t, providerConfig) + assert.Equal(t, ProviderConfig{}, GetProviderConfig()) err = consumerInit(conPath) assert.NoError(t, err) @@ -24,5 +32,29 @@ func TestConfigLoader(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, consumerConfig) + assert.NotEqual(t, ConsumerConfig{}, GetConsumerConfig()) assert.NotNil(t, providerConfig) + assert.NotEqual(t, ProviderConfig{}, GetProviderConfig()) +} + +func TestLoad(t *testing.T) { + doInit() + doinit() + + SetConService(&MockService{}) + SetProService(&MockService{}) + + extension.SetProtocol("registry", GetProtocol) + extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + consumerConfig.References[0].Registries = []ConfigRegistry{"shanghai_reg1"} + + refConfigs, svcConfigs := Load() + assert.NotEqual(t, 0, len(refConfigs)) + assert.NotEqual(t, 0, len(svcConfigs)) + + conServices = map[string]common.RPCService{} + proServices = map[string]common.RPCService{} + common.ServiceMap.UnRegister("mock", "MockService") + consumerConfig = nil + providerConfig = nil } diff --git a/config/mock_rpcservice.go b/config/mock_rpcservice.go index 1bd01bd4f5e3b629dbf7736c7db579c8c5968186..14641de4ef187f662a41393cec3d31b94f5025c9 100644 --- a/config/mock_rpcservice.go +++ b/config/mock_rpcservice.go @@ -6,7 +6,7 @@ type MockService struct { } func (*MockService) Service() string { - return "mockservice" + return "MockService" } func (*MockService) Version() string { return "1.0" diff --git a/config/reference_config_test.go b/config/reference_config_test.go index 6e63b4bc3dce81f4f0d680aee9b22c7336ec05df..a2ba14a66a787e7419fc71a634cdc80178b0ee66 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -61,7 +61,7 @@ func doInit() { }, References: []ReferenceConfig{ { - InterfaceName: "testInterface", + InterfaceName: "MockService", Protocol: "mock", Registries: []ConfigRegistry{"shanghai_reg1", "shanghai_reg2", "hangzhou_reg1", "hangzhou_reg2"}, Cluster: "failover", diff --git a/config/service_config_test.go b/config/service_config_test.go index 9f991e56c7016e1f81a90290b1ddbc833903691e..f6874b15fbc9d7e79566acf2516c230876a2c040 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -55,7 +55,7 @@ func doinit() { }, Services: []ServiceConfig{ { - InterfaceName: "testInterface", + InterfaceName: "MockService", Protocol: "mock", Registries: []ConfigRegistry{"shanghai_reg1", "shanghai_reg2", "hangzhou_reg1", "hangzhou_reg2"}, Cluster: "failover", diff --git a/contributing.md b/contributing.md new file mode 100644 index 0000000000000000000000000000000000000000..cce54951db45b5c8e892ed9e00ef9b3ab27f84be --- /dev/null +++ b/contributing.md @@ -0,0 +1,26 @@ +Contributing to Dubbogo + +## 1. Branch + + >- The name of branches `SHOULD` be in the format of `feature/xxx`. + >- You `SHOULD` checkout a new branch after a feature branch already being merged into upstream, `DO NOT` commit in the old branch. + +## 2. Pull Request + +### 2.1. Title Format + +The pr head format is `<head> <subject>`. The title should be simpler to show your intent. + +The title format of the pull request `MUST` follow the following rules: + + >- Start with `Doc:` for adding/formatting/improving docs. + >- Start with `Mod:` for formatting codes or adding comment. + >- Start with `Fix:` for fixing bug, and its ending should be ` #issue-id` if being relevant to some issue. + >- Start with `Imp:` for improving performance. + >- Start with `Ftr:` for adding a new feature. + >- Start with `Add:` for adding struct function/member. + >- Start with `Rft:` for refactoring codes. + >- Start with `Tst:` for adding tests. + >- Start with `Dep:` for adding depending libs. + >- Start with `Rem:` for removing feature/struct/function/member/files. + diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000000000000000000000000000000000000..03658cf61bcb3fa9c09a42d96b61884dbb486d32 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,70 @@ +# examples + +Examples of go-for-apache-dubbo + +## dubbo + +#### Build by these command + +java server +```bash +cd dubbo/java-server +sh build.sh +``` + +java client +```bash +cd dubbo/java-client +sh build.sh +``` + +go server +```bash +cd dubbo/go-server +#linux, mac windows represent the os +#release, dev and test represent the environment +sh ./assembly/linux/release.sh +``` + +go client +```bash +cd dubbo/go-client +#linux, mac windows represent the os +#release, dev and test represent the environment +sh ./assembly/linux/release.sh +``` + +#### Run by these command: + +java server +```bash +cd dubbo/java-server/target +tar -zxvf user-info-server-0.2.0-assembly.tar.gz +cd ./user-info-server-0.2.0 +sh ./bin/server.sh start +``` + +java client +```bash +cd dubbo/java-client/target +tar -zxvf user-info-client-0.2.0-assembly.tar.gz +cd ./user-info-client-0.2.0 +sh ./bin/server.sh start +``` + +go server +> It must not listen on IP 127.0.0.1 when called by java-client. +> You should change IP in dubbo/go-server/target/linux/user_info_server-0.3.1-20190517-0930-release/conf/server.yml +```bash +cd dubbo/go-server/target/linux/user_info_server-0.3.1-20190517-0930-release +sh ./bin/load.sh start +``` + +go client +```bash +cd dubbo/go-client/target/linux/user_info_client-0.3.1-20190517-0921-release +sh ./bin/load_user_info_client.sh start +``` + +## jsonrpc +Similar to dubbo diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index c094ea6a105bf89713bb016347dbbf8f0d54543c..7477a965ad5f1f12f6c5714ffbcdfbfa55d5de57 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -83,38 +83,38 @@ type CallOptions struct { type CallOption func(*CallOptions) -func WithCallRequestTimeout(d time.Duration) CallOption { - return func(o *CallOptions) { - o.RequestTimeout = d - } -} - -func WithCallResponseTimeout(d time.Duration) CallOption { - return func(o *CallOptions) { - o.ResponseTimeout = d - } -} - -func WithCallSerialID(s SerialID) CallOption { - return func(o *CallOptions) { - o.SerialID = s - } -} - -func WithCallMeta_All(callMeta map[interface{}]interface{}) CallOption { - return func(o *CallOptions) { - o.Meta = callMeta - } -} - -func WithCallMeta(k, v interface{}) CallOption { - return func(o *CallOptions) { - if o.Meta == nil { - o.Meta = make(map[interface{}]interface{}) - } - o.Meta[k] = v - } -} +//func WithCallRequestTimeout(d time.Duration) CallOption { +// return func(o *CallOptions) { +// o.RequestTimeout = d +// } +//} +// +//func WithCallResponseTimeout(d time.Duration) CallOption { +// return func(o *CallOptions) { +// o.ResponseTimeout = d +// } +//} +// +//func WithCallSerialID(s SerialID) CallOption { +// return func(o *CallOptions) { +// o.SerialID = s +// } +//} +// +//func WithCallMeta_All(callMeta map[interface{}]interface{}) CallOption { +// return func(o *CallOptions) { +// o.Meta = callMeta +// } +//} + +//func WithCallMeta(k, v interface{}) CallOption { +// return func(o *CallOptions) { +// if o.Meta == nil { +// o.Meta = make(map[interface{}]interface{}) +// } +// o.Meta[k] = v +// } +//} type CallResponse struct { Opts CallOptions @@ -197,6 +197,7 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string p := &DubboPackage{} p.Service.Path = strings.TrimPrefix(svcUrl.Path, "/") p.Service.Target = svcUrl.GetParam(constant.INTERFACE_KEY, "") + p.Service.Interface = svcUrl.GetParam(constant.INTERFACE_KEY, "") p.Service.Version = svcUrl.GetParam(constant.VERSION_KEY, constant.DEFAULT_VERSION) p.Service.Method = method p.Service.Timeout = opts.RequestTimeout @@ -209,11 +210,14 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string var rsp *PendingResponse if ct != CT_OneWay { + p.Header.Type = hessian.PackageRequest_TwoWay rsp = NewPendingResponse() rsp.reply = reply rsp.callback = callback rsp.opts = opts } + // todo: it must be PackageRequest because of hessian2, but it is twoway actually + p.Header.Type = hessian.PackageRequest var ( err error @@ -280,8 +284,6 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage, pkg.Body = []interface{}{} pkg.Header.Type = hessian.PackageHeartbeat pkg.Header.SerialID = byte(S_Dubbo) - } else { - pkg.Header.Type = hessian.PackageRequest } pkg.Header.ID = int64(sequence) diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a22135ba93b89961542345ddfef760777143ad47 --- /dev/null +++ b/protocol/dubbo/client_test.go @@ -0,0 +1,192 @@ +package dubbo + +import ( + "context" + "errors" + "sync" + "testing" + "time" +) + +import ( + "github.com/dubbogo/hessian2" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +type ( + User struct { + Id string `json:"id"` + Name string `json:"name"` + } + + UserProvider struct { + user map[string]User + } +) + +func TestClient_CallOneway(t *testing.T) { + proto, url := InitTest(t) + + c := &Client{ + pendingResponses: make(map[SequenceType]*PendingResponse), + conf: *clientConf, + } + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + + //user := &User{} + err := c.CallOneway("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}) + assert.NoError(t, err) + + // destroy + proto.Destroy() +} + +func TestClient_Call(t *testing.T) { + proto, url := InitTest(t) + + c := &Client{ + pendingResponses: make(map[SequenceType]*PendingResponse), + conf: *clientConf, + } + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + + user := &User{} + err := c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) + assert.NoError(t, err) + assert.Equal(t, User{Id: "1", Name: "username"}, *user) + + user = &User{} + err = c.Call("127.0.0.1:20000", url, "GetUser0", []interface{}{"1", "username"}, user) + assert.NoError(t, err) + assert.Equal(t, User{Id: "1", Name: "username"}, *user) + + // destroy + proto.Destroy() +} + +func TestClient_AsyncCall(t *testing.T) { + proto, url := InitTest(t) + + c := &Client{ + pendingResponses: make(map[SequenceType]*PendingResponse), + conf: *clientConf, + } + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + + user := &User{} + lock := sync.Mutex{} + lock.Lock() + err := c.AsyncCall("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, func(response CallResponse) { + assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User)) + lock.Unlock() + }, user) + assert.NoError(t, err) + assert.Equal(t, User{}, *user) + + // destroy + lock.Lock() + proto.Destroy() + lock.Unlock() +} + +func InitTest(t *testing.T) (protocol.Protocol, common.URL) { + + hessian.RegisterPOJO(&User{}) + + methods, err := common.ServiceMap.Register("dubbo", &UserProvider{}) + assert.NoError(t, err) + assert.Equal(t, "GetUser,GetUser0,GetUser1", methods) + + // config + SetClientConf(ClientConfig{ + ConnectionNum: 2, + HeartbeatPeriod: "5s", + SessionTimeout: "20s", + FailFastTimeout: "5s", + PoolTTL: 600, + PoolSize: 64, + GettySessionParam: GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgRQSize: 1024, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 1024, + SessionName: "client", + }, + }) + assert.NoError(t, clientConf.CheckValidity()) + SetServerConfig(ServerConfig{ + SessionNumber: 700, + SessionTimeout: "20s", + FailFastTimeout: "5s", + GettySessionParam: GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgRQSize: 1024, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 1024, + SessionName: "server", + }}) + assert.NoError(t, srvConf.CheckValidity()) + + // Export + proto := GetProtocol() + url, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + proto.Export(protocol.NewBaseInvoker(url)) + + time.Sleep(time.Second * 2) + + return proto, url +} + +func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error { + rsp.Id = req[0].(string) + rsp.Name = req[1].(string) + return nil +} + +func (u *UserProvider) GetUser0(req []interface{}, rsp *User) error { + rsp.Id = req[0].(string) + rsp.Name = req[1].(string) + return nil +} + +func (u *UserProvider) GetUser1(ctx context.Context, req []interface{}, rsp *User) error { + return errors.New("error") +} + +func (u *UserProvider) Service() string { + return "com.ikurento.user.UserProvider" +} + +func (u *UserProvider) Version() string { + return "" +} + +func (u User) JavaClassName() string { + return "com.ikurento.user.User" +} diff --git a/protocol/dubbo/codec_test.go b/protocol/dubbo/codec_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f044d9e822fdc614af8b66f4ab967ce2265f8fcd --- /dev/null +++ b/protocol/dubbo/codec_test.go @@ -0,0 +1,57 @@ +package dubbo + +import ( + "testing" + "time" +) + +import ( + "github.com/dubbogo/hessian2" + "github.com/stretchr/testify/assert" +) + +func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { + pkg := &DubboPackage{} + pkg.Body = []interface{}{"a"} + pkg.Header.Type = hessian.PackageHeartbeat + pkg.Header.SerialID = byte(S_Dubbo) + pkg.Header.ID = 10086 + + // heartbeat + data, err := pkg.Marshal() + assert.NoError(t, err) + + pkgres := &DubboPackage{} + pkgres.Body = []interface{}{} + err = pkgres.Unmarshal(data) + assert.NoError(t, err) + assert.Equal(t, hessian.PackageHeartbeat|hessian.PackageRequest|hessian.PackageRequest_TwoWay, pkgres.Header.Type) + assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) + assert.Equal(t, int64(10086), pkgres.Header.ID) + assert.Equal(t, 0, len(pkgres.Body.([]interface{}))) + + // request + pkg.Header.Type = hessian.PackageRequest + pkg.Service.Interface = "Service" + pkg.Service.Target = "Service" + pkg.Service.Version = "2.6" + pkg.Service.Method = "Method" + pkg.Service.Timeout = time.Second + data, err = pkg.Marshal() + assert.NoError(t, err) + + pkgres = &DubboPackage{} + pkgres.Body = make([]interface{}, 7) + err = pkgres.Unmarshal(data) + assert.NoError(t, err) + assert.Equal(t, hessian.PackageRequest|hessian.PackageRequest_TwoWay, pkgres.Header.Type) + assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) + assert.Equal(t, int64(10086), pkgres.Header.ID) + assert.Equal(t, "2.5.4", pkgres.Body.([]interface{})[0]) + assert.Equal(t, "Service", pkgres.Body.([]interface{})[1]) + assert.Equal(t, "2.6", pkgres.Body.([]interface{})[2]) + assert.Equal(t, "Method", pkgres.Body.([]interface{})[3]) + assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4]) + assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5]) + assert.Equal(t, map[interface{}]interface{}{"interface": "Service", "path": "", "timeout": "1000"}, pkgres.Body.([]interface{})[6]) +} diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index c0e48183723a1f2b79a9380d9e0ceebbe2bb3e22..72da3294b9d5dc69c1b051b4e7690991966446f0 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -53,17 +53,17 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } else { result.Err = di.client.CallOneway(url.Location, url, inv.MethodName(), inv.Arguments()) } - log.Debug("result.Err: %v, result.Rest: %v", result.Err, result.Rest) } else { if inv.Reply() == nil { result.Err = Err_No_Reply } else { - result.Err = di.client.Call(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Reply()) - result.Rest = inv.Reply() // reply should be set to result.Rest when sync } - log.Debug("result.Err: %v, result.Rest: %v", result.Err, result.Rest) } + if result.Err == nil { + result.Rest = inv.Reply() + } + log.Debug("result.Err: %v, result.Rest: %v", result.Err, result.Rest) return &result } diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..505c0b3486963c9ec7de1e9bf6bcd770fdc8cfc5 --- /dev/null +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -0,0 +1,62 @@ +package dubbo + +import ( + "sync" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" +) + +func TestDubboInvoker_Invoke(t *testing.T) { + proto, url := InitTest(t) + + c := &Client{ + pendingResponses: make(map[SequenceType]*PendingResponse), + conf: *clientConf, + } + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + + invoker := NewDubboInvoker(url, c) + user := &User{} + + inv := invocation.NewRPCInvocationForConsumer("GetUser", nil, []interface{}{"1", "username"}, user, nil, url, nil) + + // Call + res := invoker.Invoke(inv) + assert.NoError(t, res.Error()) + assert.Equal(t, User{Id: "1", Name: "username"}, *res.Result().(*User)) + + // CallOneway + inv.SetAttachments(constant.ASYNC_KEY, "true") + res = invoker.Invoke(inv) + assert.NoError(t, res.Error()) + + // AsyncCall + lock := sync.Mutex{} + lock.Lock() + inv.SetCallBack(func(response CallResponse) { + assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User)) + lock.Unlock() + }) + res = invoker.Invoke(inv) + assert.NoError(t, res.Error()) + + // Err_No_Reply + inv.SetAttachments(constant.ASYNC_KEY, "false") + inv.SetReply(nil) + res = invoker.Invoke(inv) + assert.EqualError(t, res.Error(), "request need @reply") + + // destroy + lock.Lock() + proto.Destroy() + lock.Unlock() +} diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 99a11feebd6861c4fd13d8721fcc094118fe45fd..f180ca63196ce707d4ce7f864621acf1d07511aa 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -183,11 +183,11 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { return } + twoway := true // not twoway if p.Header.Type&hessian.PackageRequest_TwoWay == 0x00 { + twoway = false h.reply(session, p, hessian.PackageResponse) - h.callService(p, nil) - return } invoker := h.exporter.GetInvoker() @@ -213,6 +213,9 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { } h.callService(p, nil) + if !twoway { + return + } h.reply(session, p, hessian.PackageResponse) } diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index d930a5e819df341e3494ef1b0cec225925b4cc99..ad1a93ef1b990af563914db6ed8d9d22559aad14 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -2,6 +2,7 @@ package dubbo import ( "bytes" + "github.com/dubbo/go-for-apache-dubbo/common/constant" "reflect" ) @@ -88,7 +89,7 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface dubboVersion = req[0].(string) } if req[1] != nil { - pkg.Service.Target = req[1].(string) + pkg.Service.Path = req[1].(string) } if req[2] != nil { pkg.Service.Version = req[2].(string) @@ -105,11 +106,12 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface if req[6] != nil { attachments = req[6].(map[interface{}]interface{}) } + pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string) pkg.Body = map[string]interface{}{ "dubboVersion": dubboVersion, "argsTypes": argsTypes, "args": args, - "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Target), + "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Interface), "attachments": attachments, } } diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 31a060a9abf3907de3778bf99d19ba3dba8f8847..fa4a514d172e23b18496ddeed42e371431da8e54 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -68,6 +68,10 @@ func (r *RPCInvocation) Reply() interface{} { return r.reply } +func (r *RPCInvocation) SetReply(reply interface{}) { + r.reply = reply +} + func (r *RPCInvocation) Attachments() map[string]string { return r.attachments } @@ -94,9 +98,13 @@ func (r *RPCInvocation) Invoker() protocol.Invoker { return r.invoker } -//func (r *RPCInvocation) SetInvoker() protocol.Invoker { -// return r.invoker -//} +func (r *RPCInvocation) SetInvoker() protocol.Invoker { + return r.invoker +} + +func (r *RPCInvocation) SetCallBack(c interface{}) { + r.callBack = c +} func (r *RPCInvocation) CallBack() interface{} { return r.callBack diff --git a/protocol/jsonrpc/http_test.go b/protocol/jsonrpc/http_test.go index 8c681e8d06ce978ead14624604b1d5916ae4f20d..19025fa0deeda0e8f91d89042d6b09f932d4a00b 100644 --- a/protocol/jsonrpc/http_test.go +++ b/protocol/jsonrpc/http_test.go @@ -33,22 +33,20 @@ func TestHTTPClient_Call(t *testing.T) { methods, err := common.ServiceMap.Register("jsonrpc", &UserProvider{}) assert.NoError(t, err) - assert.Equal(t, "GetUser,GetUser1", methods) + assert.Equal(t, "GetUser,GetUser0,GetUser1", methods) // Export proto := GetProtocol() - url, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + url, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&"+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "side=provider&timeout=3000×tamp=1556509797245") assert.NoError(t, err) proto.Export(protocol.NewBaseInvoker(url)) + time.Sleep(time.Second * 2) - client := NewHTTPClient(&HTTPOptions{ - HandshakeTimeout: time.Second, - HTTPTimeout: time.Second, - }) + client := NewHTTPClient(&HTTPOptions{}) // call GetUser ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ @@ -63,6 +61,19 @@ func TestHTTPClient_Call(t *testing.T) { assert.Equal(t, "1", reply.Id) assert.Equal(t, "username", reply.Name) + // call GetUser + ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ + "X-Proxy-Id": "dubbogo", + "X-Services": url.Path, + "X-Method": "GetUser", + }) + req = client.NewRequest(url, "GetUser0", []interface{}{"1", "username"}) + reply = &User{} + err = client.Call(ctx, url, req, reply) + assert.NoError(t, err) + assert.Equal(t, "1", reply.Id) + assert.Equal(t, "username", reply.Name) + // call GetUser1 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", @@ -86,6 +97,12 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User return nil } +func (u *UserProvider) GetUser0(req []interface{}, rsp *User) error { + rsp.Id = req[0].(string) + rsp.Name = req[1].(string) + return nil +} + func (u *UserProvider) GetUser1(ctx context.Context, req []interface{}, rsp *User) error { return errors.New("error") } diff --git a/protocol/jsonrpc/json_test.go b/protocol/jsonrpc/json_test.go index b14afe7778a626fdeeeb71916ff8a51613a38258..4b83f181150316c00c1249348aea71fbf633031d 100644 --- a/protocol/jsonrpc/json_test.go +++ b/protocol/jsonrpc/json_test.go @@ -23,6 +23,10 @@ func TestJsonClientCodec_Write(t *testing.T) { data, err := codec.Write(cd) assert.NoError(t, err) assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"method\":\"GetUser\",\"params\":[\"args\",2],\"id\":1}\n", string(data)) + + cd.Args = 1 + data, err = codec.Write(cd) + assert.EqualError(t, err, "unsupported param type: int") } func TestJsonClientCodec_Read(t *testing.T) { @@ -46,6 +50,10 @@ func TestServerCodec_Write(t *testing.T) { data, err := codec.Write("error", &TestData{Test: "test"}) assert.NoError(t, err) assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"code\":-32000,\"message\":\"error\"}}\n", string(data)) + + data, err = codec.Write("{\"code\":-32000,\"message\":\"error\"}", &TestData{Test: "test"}) + assert.NoError(t, err) + assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"code\":-32000,\"message\":\"error\"}}\n", string(data)) } func TestServerCodec_Read(t *testing.T) { diff --git a/protocol/jsonrpc/jsonrpc_invoker.go b/protocol/jsonrpc/jsonrpc_invoker.go index 645fe8b0d99fb322cb5a83ac66b0b1b3541868cf..e427c554f3323b5e1927817114709d87707cbd02 100644 --- a/protocol/jsonrpc/jsonrpc_invoker.go +++ b/protocol/jsonrpc/jsonrpc_invoker.go @@ -6,7 +6,6 @@ import ( import ( log "github.com/AlexStocks/log4go" - jerrors "github.com/juju/errors" ) import ( @@ -42,13 +41,11 @@ func (ji *JsonrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result "X-Services": url.Path, "X-Method": inv.MethodName(), }) - if err := ji.client.Call(ctx, url, req, inv.Reply()); err != nil { - log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) - result.Err = err - } else { - log.Debug("result: %v", inv.Reply()) + result.Err = ji.client.Call(ctx, url, req, inv.Reply()) + if result.Err == nil { result.Rest = inv.Reply() } + log.Debug("result.Err: %v, result.Rest: %v", result.Err, result.Rest) return &result } diff --git a/protocol/jsonrpc/jsonrpc_invoker_test.go b/protocol/jsonrpc/jsonrpc_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5e1bd3538c306c42c6502c1c51d1a82759db3967 --- /dev/null +++ b/protocol/jsonrpc/jsonrpc_invoker_test.go @@ -0,0 +1,49 @@ +package jsonrpc + +import ( + "context" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" +) + +func TestJsonrpcInvoker_Invoke(t *testing.T) { + + methods, err := common.ServiceMap.Register("jsonrpc", &UserProvider{}) + assert.NoError(t, err) + assert.Equal(t, "GetUser,GetUser0,GetUser1", methods) + + // Export + proto := GetProtocol() + url, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + proto.Export(protocol.NewBaseInvoker(url)) + time.Sleep(time.Second * 2) + + client := NewHTTPClient(&HTTPOptions{ + HandshakeTimeout: time.Second, + HTTPTimeout: time.Second, + }) + + jsonInvoker := NewJsonrpcInvoker(url, client) + user := &User{} + res := jsonInvoker.Invoke(invocation.NewRPCInvocationForConsumer("GetUser", nil, []interface{}{"1", "username"}, user, nil, url, nil)) + assert.NoError(t, res.Error()) + assert.Equal(t, User{Id: "1", Name: "username"}, *res.Result().(*User)) + + // destroy + proto.Destroy() +} diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index dba427b7102ead621909d57780ae23b580fc3c79..78c1b791e2b462fc58a9b519901b936f6f564a9d 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -367,13 +367,24 @@ func serveRequest(ctx context.Context, replyv := reflect.New(mtype.ReplyType().Elem()) // call service.method(args) - var errMsg string - returnValues := mtype.Method().Func.Call([]reflect.Value{ - svc.Rcvr(), - mtype.SuiteContext(ctx), - reflect.ValueOf(argvTmp), - reflect.ValueOf(replyv.Interface()), - }) + var ( + errMsg string + returnValues []reflect.Value + ) + if mtype.CtxType() == nil { + returnValues = mtype.Method().Func.Call([]reflect.Value{ + svc.Rcvr(), + reflect.ValueOf(argvTmp), + reflect.ValueOf(replyv.Interface()), + }) + } else { + returnValues = mtype.Method().Func.Call([]reflect.Value{ + svc.Rcvr(), + mtype.SuiteContext(ctx), + reflect.ValueOf(argvTmp), + reflect.ValueOf(replyv.Interface()), + }) + } // The return value for the method is an error. if retErr := returnValues[0].Interface(); retErr != nil { errMsg = retErr.(error).Error() diff --git a/protocol/protocol.go b/protocol/protocol.go index cd11b101d8211fa2bfe67aa5141d42f7c1642088..379a83b53859f63417cdc8f79b364bf83a4297ca 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -56,11 +56,11 @@ func (bp *BaseProtocol) Invokers() []Invoker { } func (bp *BaseProtocol) Export(invoker Invoker) Exporter { - return nil + return NewBaseExporter("base", invoker, bp.exporterMap) } func (bp *BaseProtocol) Refer(url common.URL) Invoker { - return nil + return NewBaseInvoker(url) } // Destroy will destroy all invoker and exporter, so it only is called once. diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a3e91add8f1647c5649b9338887cb5a02c935d19 --- /dev/null +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -0,0 +1,42 @@ +package protocolwrapper + +import ( + "net/url" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/common/extension" + "github.com/dubbo/go-for-apache-dubbo/filter/impl" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +func TestProtocolFilterWrapper_Export(t *testing.T) { + filtProto := extension.GetProtocolExtension(FILTER) + filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} + + u := common.NewURLWithOptions("Service", + common.WithParams(url.Values{}), + common.WithParamsValue(constant.SERVICE_FILTER_KEY, impl.ECHO)) + exporter := filtProto.Export(protocol.NewBaseInvoker(*u)) + _, ok := exporter.GetInvoker().(*FilterInvoker) + assert.True(t, ok) +} + +func TestProtocolFilterWrapper_Refer(t *testing.T) { + filtProto := extension.GetProtocolExtension(FILTER) + filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} + + u := common.NewURLWithOptions("Service", + common.WithParams(url.Values{}), + common.WithParamsValue(constant.REFERENCE_FILTER_KEY, impl.ECHO)) + invoker := filtProto.Refer(*u) + _, ok := invoker.(*FilterInvoker) + assert.True(t, ok) +}