diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 1b3e9a36cb28e7e5e65923bbb44e894f7ef07705..b67835d565c01588029217ba7da694cca2c7e33c 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -16,7 +16,9 @@ func TestConfigLoader(t *testing.T) { assert.NoError(t, err) assert.Nil(t, consumerConfig) + assert.Equal(t, ConsumerConfig{}, GetConsumerConfig()) assert.Nil(t, providerConfig) + assert.Equal(t, ProviderConfig{}, GetProviderConfig()) err = consumerInit(conPath) assert.NoError(t, err) @@ -24,5 +26,7 @@ func TestConfigLoader(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, consumerConfig) + assert.NotEqual(t, ConsumerConfig{}, GetConsumerConfig()) assert.NotNil(t, providerConfig) + assert.NotEqual(t, ProviderConfig{}, GetProviderConfig()) } diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index c094ea6a105bf89713bb016347dbbf8f0d54543c..6c50f51a250258b997b12637774c46c3d83806c0 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -197,6 +197,7 @@ func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string p := &DubboPackage{} p.Service.Path = strings.TrimPrefix(svcUrl.Path, "/") p.Service.Target = svcUrl.GetParam(constant.INTERFACE_KEY, "") + p.Service.Interface = svcUrl.GetParam(constant.INTERFACE_KEY, "") p.Service.Version = svcUrl.GetParam(constant.VERSION_KEY, constant.DEFAULT_VERSION) p.Service.Method = method p.Service.Timeout = opts.RequestTimeout diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7febfcc597caacddaf7152b02a640847cef34305 --- /dev/null +++ b/protocol/dubbo/client_test.go @@ -0,0 +1,179 @@ +package dubbo + +import ( + "context" + "errors" + "github.com/dubbogo/hessian2" + "sync" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +type ( + User struct { + Id string `json:"id"` + Name string `json:"name"` + } + + UserProvider struct { + user map[string]User + } +) + +func TestClient_CallOneway(t *testing.T) { + proto, url := InitTest(t) + + c := &Client{ + pendingResponses: make(map[SequenceType]*PendingResponse), + conf: *clientConf, + } + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + + //user := &User{} + err := c.CallOneway("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}) + assert.NoError(t, err) + + // destroy + proto.Destroy() +} + +func TestClient_Call(t *testing.T) { + proto, url := InitTest(t) + + c := &Client{ + pendingResponses: make(map[SequenceType]*PendingResponse), + conf: *clientConf, + } + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + + user := &User{} + err := c.Call("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, user) + assert.NoError(t, err) + assert.Equal(t, User{Id: "1", Name: "username"}, *user) + + // destroy + proto.Destroy() +} + +func TestClient_AsyncCall(t *testing.T) { + proto, url := InitTest(t) + + c := &Client{ + pendingResponses: make(map[SequenceType]*PendingResponse), + conf: *clientConf, + } + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + + user := &User{} + lock := sync.Mutex{} + lock.Lock() + err := c.AsyncCall("127.0.0.1:20000", url, "GetUser", []interface{}{"1", "username"}, func(response CallResponse) { + assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User)) + lock.Unlock() + }, user) + assert.NoError(t, err) + assert.Equal(t, User{}, *user) + + // destroy + lock.Lock() + proto.Destroy() + lock.Unlock() +} + +func InitTest(t *testing.T) (protocol.Protocol, common.URL) { + + hessian.RegisterPOJO(&User{}) + + methods, err := common.ServiceMap.Register("dubbo", &UserProvider{}) + assert.NoError(t, err) + assert.Equal(t, "GetUser,GetUser1", methods) + + // config + SetClientConf(ClientConfig{ + ConnectionNum: 2, + HeartbeatPeriod: "5s", + SessionTimeout: "20s", + FailFastTimeout: "5s", + PoolTTL: 600, + PoolSize: 64, + GettySessionParam: GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgRQSize: 1024, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 1024, + SessionName: "client", + }, + }) + assert.NoError(t, clientConf.CheckValidity()) + SetServerConfig(ServerConfig{ + SessionNumber: 700, + SessionTimeout: "20s", + FailFastTimeout: "5s", + GettySessionParam: GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgRQSize: 1024, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 1024, + SessionName: "server", + }}) + assert.NoError(t, srvConf.CheckValidity()) + + // Export + proto := GetProtocol() + url, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + proto.Export(protocol.NewBaseInvoker(url)) + + return proto, url +} + +func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User) error { + rsp.Id = req[0].(string) + rsp.Name = req[1].(string) + return nil +} + +func (u *UserProvider) GetUser1(ctx context.Context, req []interface{}, rsp *User) error { + return errors.New("error") +} + +func (u *UserProvider) Service() string { + return "com.ikurento.user.UserProvider" +} + +func (u *UserProvider) Version() string { + return "" +} + +func (u User) JavaClassName() string { + return "com.ikurento.user.User" +} diff --git a/protocol/dubbo/codec_test.go b/protocol/dubbo/codec_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f044d9e822fdc614af8b66f4ab967ce2265f8fcd --- /dev/null +++ b/protocol/dubbo/codec_test.go @@ -0,0 +1,57 @@ +package dubbo + +import ( + "testing" + "time" +) + +import ( + "github.com/dubbogo/hessian2" + "github.com/stretchr/testify/assert" +) + +func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { + pkg := &DubboPackage{} + pkg.Body = []interface{}{"a"} + pkg.Header.Type = hessian.PackageHeartbeat + pkg.Header.SerialID = byte(S_Dubbo) + pkg.Header.ID = 10086 + + // heartbeat + data, err := pkg.Marshal() + assert.NoError(t, err) + + pkgres := &DubboPackage{} + pkgres.Body = []interface{}{} + err = pkgres.Unmarshal(data) + assert.NoError(t, err) + assert.Equal(t, hessian.PackageHeartbeat|hessian.PackageRequest|hessian.PackageRequest_TwoWay, pkgres.Header.Type) + assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) + assert.Equal(t, int64(10086), pkgres.Header.ID) + assert.Equal(t, 0, len(pkgres.Body.([]interface{}))) + + // request + pkg.Header.Type = hessian.PackageRequest + pkg.Service.Interface = "Service" + pkg.Service.Target = "Service" + pkg.Service.Version = "2.6" + pkg.Service.Method = "Method" + pkg.Service.Timeout = time.Second + data, err = pkg.Marshal() + assert.NoError(t, err) + + pkgres = &DubboPackage{} + pkgres.Body = make([]interface{}, 7) + err = pkgres.Unmarshal(data) + assert.NoError(t, err) + assert.Equal(t, hessian.PackageRequest|hessian.PackageRequest_TwoWay, pkgres.Header.Type) + assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) + assert.Equal(t, int64(10086), pkgres.Header.ID) + assert.Equal(t, "2.5.4", pkgres.Body.([]interface{})[0]) + assert.Equal(t, "Service", pkgres.Body.([]interface{})[1]) + assert.Equal(t, "2.6", pkgres.Body.([]interface{})[2]) + assert.Equal(t, "Method", pkgres.Body.([]interface{})[3]) + assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4]) + assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5]) + assert.Equal(t, map[interface{}]interface{}{"interface": "Service", "path": "", "timeout": "1000"}, pkgres.Body.([]interface{})[6]) +} diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..505c0b3486963c9ec7de1e9bf6bcd770fdc8cfc5 --- /dev/null +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -0,0 +1,62 @@ +package dubbo + +import ( + "sync" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" +) + +func TestDubboInvoker_Invoke(t *testing.T) { + proto, url := InitTest(t) + + c := &Client{ + pendingResponses: make(map[SequenceType]*PendingResponse), + conf: *clientConf, + } + c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) + + invoker := NewDubboInvoker(url, c) + user := &User{} + + inv := invocation.NewRPCInvocationForConsumer("GetUser", nil, []interface{}{"1", "username"}, user, nil, url, nil) + + // Call + res := invoker.Invoke(inv) + assert.NoError(t, res.Error()) + assert.Equal(t, User{Id: "1", Name: "username"}, *res.Result().(*User)) + + // CallOneway + inv.SetAttachments(constant.ASYNC_KEY, "true") + res = invoker.Invoke(inv) + assert.NoError(t, res.Error()) + + // AsyncCall + lock := sync.Mutex{} + lock.Lock() + inv.SetCallBack(func(response CallResponse) { + assert.Equal(t, User{Id: "1", Name: "username"}, *response.Reply.(*User)) + lock.Unlock() + }) + res = invoker.Invoke(inv) + assert.NoError(t, res.Error()) + + // Err_No_Reply + inv.SetAttachments(constant.ASYNC_KEY, "false") + inv.SetReply(nil) + res = invoker.Invoke(inv) + assert.EqualError(t, res.Error(), "request need @reply") + + // destroy + lock.Lock() + proto.Destroy() + lock.Unlock() +} diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index d930a5e819df341e3494ef1b0cec225925b4cc99..ad1a93ef1b990af563914db6ed8d9d22559aad14 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -2,6 +2,7 @@ package dubbo import ( "bytes" + "github.com/dubbo/go-for-apache-dubbo/common/constant" "reflect" ) @@ -88,7 +89,7 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface dubboVersion = req[0].(string) } if req[1] != nil { - pkg.Service.Target = req[1].(string) + pkg.Service.Path = req[1].(string) } if req[2] != nil { pkg.Service.Version = req[2].(string) @@ -105,11 +106,12 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface if req[6] != nil { attachments = req[6].(map[interface{}]interface{}) } + pkg.Service.Interface = attachments[constant.INTERFACE_KEY].(string) pkg.Body = map[string]interface{}{ "dubboVersion": dubboVersion, "argsTypes": argsTypes, "args": args, - "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Target), + "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Interface), "attachments": attachments, } } diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 31a060a9abf3907de3778bf99d19ba3dba8f8847..fa4a514d172e23b18496ddeed42e371431da8e54 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -68,6 +68,10 @@ func (r *RPCInvocation) Reply() interface{} { return r.reply } +func (r *RPCInvocation) SetReply(reply interface{}) { + r.reply = reply +} + func (r *RPCInvocation) Attachments() map[string]string { return r.attachments } @@ -94,9 +98,13 @@ func (r *RPCInvocation) Invoker() protocol.Invoker { return r.invoker } -//func (r *RPCInvocation) SetInvoker() protocol.Invoker { -// return r.invoker -//} +func (r *RPCInvocation) SetInvoker() protocol.Invoker { + return r.invoker +} + +func (r *RPCInvocation) SetCallBack(c interface{}) { + r.callBack = c +} func (r *RPCInvocation) CallBack() interface{} { return r.callBack diff --git a/protocol/jsonrpc/json_test.go b/protocol/jsonrpc/json_test.go index b14afe7778a626fdeeeb71916ff8a51613a38258..4b83f181150316c00c1249348aea71fbf633031d 100644 --- a/protocol/jsonrpc/json_test.go +++ b/protocol/jsonrpc/json_test.go @@ -23,6 +23,10 @@ func TestJsonClientCodec_Write(t *testing.T) { data, err := codec.Write(cd) assert.NoError(t, err) assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"method\":\"GetUser\",\"params\":[\"args\",2],\"id\":1}\n", string(data)) + + cd.Args = 1 + data, err = codec.Write(cd) + assert.EqualError(t, err, "unsupported param type: int") } func TestJsonClientCodec_Read(t *testing.T) { @@ -46,6 +50,10 @@ func TestServerCodec_Write(t *testing.T) { data, err := codec.Write("error", &TestData{Test: "test"}) assert.NoError(t, err) assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"code\":-32000,\"message\":\"error\"}}\n", string(data)) + + data, err = codec.Write("{\"code\":-32000,\"message\":\"error\"}", &TestData{Test: "test"}) + assert.NoError(t, err) + assert.Equal(t, "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"Test\":\"test\"},\"error\":{\"code\":-32000,\"message\":\"error\"}}\n", string(data)) } func TestServerCodec_Read(t *testing.T) { diff --git a/protocol/jsonrpc/jsonrpc_invoker_test.go b/protocol/jsonrpc/jsonrpc_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cde655623aa3d176e2ca4c92ad79a96549c047cc --- /dev/null +++ b/protocol/jsonrpc/jsonrpc_invoker_test.go @@ -0,0 +1,48 @@ +package jsonrpc + +import ( + "context" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +func TestJsonrpcInvoker_Invoke(t *testing.T) { + + methods, err := common.ServiceMap.Register("jsonrpc", &UserProvider{}) + assert.NoError(t, err) + assert.Equal(t, "GetUser,GetUser1", methods) + + // Export + proto := GetProtocol() + url, err := common.NewURL(context.Background(), "jsonrpc://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + proto.Export(protocol.NewBaseInvoker(url)) + + client := NewHTTPClient(&HTTPOptions{ + HandshakeTimeout: time.Second, + HTTPTimeout: time.Second, + }) + + jsonInvoker := NewJsonrpcInvoker(url, client) + user := &User{} + res := jsonInvoker.Invoke(invocation.NewRPCInvocationForConsumer("GetUser", nil, []interface{}{"1", "username"}, user, nil, url, nil)) + assert.NoError(t, res.Error()) + assert.Equal(t, User{Id: "1", Name: "username"}, *res.Result().(*User)) + + // destroy + proto.Destroy() +} diff --git a/protocol/protocol.go b/protocol/protocol.go index cd11b101d8211fa2bfe67aa5141d42f7c1642088..379a83b53859f63417cdc8f79b364bf83a4297ca 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -56,11 +56,11 @@ func (bp *BaseProtocol) Invokers() []Invoker { } func (bp *BaseProtocol) Export(invoker Invoker) Exporter { - return nil + return NewBaseExporter("base", invoker, bp.exporterMap) } func (bp *BaseProtocol) Refer(url common.URL) Invoker { - return nil + return NewBaseInvoker(url) } // Destroy will destroy all invoker and exporter, so it only is called once. diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a3e91add8f1647c5649b9338887cb5a02c935d19 --- /dev/null +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -0,0 +1,42 @@ +package protocolwrapper + +import ( + "net/url" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/common/extension" + "github.com/dubbo/go-for-apache-dubbo/filter/impl" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +func TestProtocolFilterWrapper_Export(t *testing.T) { + filtProto := extension.GetProtocolExtension(FILTER) + filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} + + u := common.NewURLWithOptions("Service", + common.WithParams(url.Values{}), + common.WithParamsValue(constant.SERVICE_FILTER_KEY, impl.ECHO)) + exporter := filtProto.Export(protocol.NewBaseInvoker(*u)) + _, ok := exporter.GetInvoker().(*FilterInvoker) + assert.True(t, ok) +} + +func TestProtocolFilterWrapper_Refer(t *testing.T) { + filtProto := extension.GetProtocolExtension(FILTER) + filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} + + u := common.NewURLWithOptions("Service", + common.WithParams(url.Values{}), + common.WithParamsValue(constant.REFERENCE_FILTER_KEY, impl.ECHO)) + invoker := filtProto.Refer(*u) + _, ok := invoker.(*FilterInvoker) + assert.True(t, ok) +}