diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index 8b2723431fc2436c81747f119f841b2436f65bc1..01d9a42861a30956310412d17a0cc7fca02c353b 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -28,6 +28,7 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/remoting/getty" ) @@ -40,7 +41,7 @@ const ( "side=provider&timeout=3000×tamp=1556509797245" ) -func init() { +func initDubboInvokerTest() { getty.SetServerConfig(getty.ServerConfig{ SessionNumber: 700, SessionTimeout: "20s", @@ -80,7 +81,9 @@ func init() { }, }) } + func TestDubboProtocol_Export(t *testing.T) { + initDubboInvokerTest() srvCfg := getty.GetDefaultServerConfig() getty.SetServerConfig(srvCfg) // Export @@ -117,6 +120,7 @@ func TestDubboProtocol_Export(t *testing.T) { func TestDubboProtocol_Refer_No_connect(t *testing.T) { // Refer + initDubboInvokerTest() proto := GetProtocol() url, err := common.NewURL(mockCommonUrl) assert.NoError(t, err) @@ -125,15 +129,18 @@ func TestDubboProtocol_Refer_No_connect(t *testing.T) { } func TestDubboProtocol_Refer(t *testing.T) { + initDubboInvokerTest() cliCfg := getty.GetDefaultClientConfig() getty.SetClientConf(cliCfg) // Refer proto := GetProtocol() url, err := common.NewURL(mockCommonUrl) + proto.Export(&proxy_factory.ProxyInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + }) assert.NoError(t, err) invoker := proto.Refer(url) - // make sure url eq := invoker.GetUrl().URLEqual(url) assert.True(t, eq) diff --git a/remoting/getty/dubbo_codec_for_test.go b/remoting/getty/dubbo_codec_for_test.go index 8303bb782c5a96989c05730a72f5840debcc3c92..bde7d9e696fbe8c2fd2af065bad7c9105bad1cdd 100644 --- a/remoting/getty/dubbo_codec_for_test.go +++ b/remoting/getty/dubbo_codec_for_test.go @@ -20,9 +20,20 @@ package getty // it is used to unit test. import ( "bytes" + "strconv" + "time" ) import ( + hessian "github.com/apache/dubbo-go-hessian2" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/dubbo/impl" + "github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/remoting" ) @@ -34,26 +45,232 @@ func init() { type DubboTestCodec struct { } +// encode request for transport func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) { - return &bytes.Buffer{}, nil + if request.Event { + return c.encodeHeartbeartReqeust(request) + } + + invoc, ok := request.Data.(*invocation.RPCInvocation) + if !ok { + return nil, perrors.Errorf("encode request failed for parameter type :%+v", request) + } + invocation := *invoc + + svc := impl.Service{} + svc.Path = invocation.AttachmentsByKey(constant.PATH_KEY, "") + svc.Interface = invocation.AttachmentsByKey(constant.INTERFACE_KEY, "") + svc.Version = invocation.AttachmentsByKey(constant.VERSION_KEY, "") + svc.Group = invocation.AttachmentsByKey(constant.GROUP_KEY, "") + svc.Method = invocation.MethodName() + timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_REMOTING_TIMEOUT))) + if err != nil { + // it will be wrapped in readwrite.Write . + return nil, perrors.WithStack(err) + } + svc.Timeout = time.Duration(timeout) + + header := impl.DubboHeader{} + serialization := invocation.AttachmentsByKey(constant.SERIALIZATION_KEY, constant.HESSIAN2_SERIALIZATION) + if serialization == constant.PROTOBUF_SERIALIZATION { + header.SerialID = constant.S_Proto + } else { + header.SerialID = constant.S_Hessian2 + } + header.ID = request.ID + if request.TwoWay { + header.Type = impl.PackageRequest_TwoWay + } else { + header.Type = impl.PackageRequest + } + + pkg := &impl.DubboPackage{ + Header: header, + Service: svc, + Body: impl.NewRequestPayload(invocation.Arguments(), invocation.Attachments()), + Err: nil, + Codec: impl.NewDubboCodec(nil), + } + + if err := impl.LoadSerializer(pkg); err != nil { + return nil, perrors.WithStack(err) + } + + return pkg.Marshal() } + +// encode heartbeart request func (c *DubboTestCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) { - return &bytes.Buffer{}, nil + header := impl.DubboHeader{ + Type: impl.PackageHeartbeat, + SerialID: constant.S_Hessian2, + ID: request.ID, + } + + pkg := &impl.DubboPackage{ + Header: header, + Service: impl.Service{}, + Body: impl.NewRequestPayload([]interface{}{}, nil), + Err: nil, + Codec: impl.NewDubboCodec(nil), + } + + if err := impl.LoadSerializer(pkg); err != nil { + return nil, err + } + return pkg.Marshal() } + +// encode response func (c *DubboTestCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) { - return &bytes.Buffer{}, nil + var ptype = impl.PackageResponse + if response.IsHeartbeat() { + ptype = impl.PackageHeartbeat + } + resp := &impl.DubboPackage{ + Header: impl.DubboHeader{ + SerialID: response.SerialID, + Type: ptype, + ID: response.ID, + ResponseStatus: response.Status, + }, + } + if !response.IsHeartbeat() { + resp.Body = &impl.ResponsePayload{ + RspObj: response.Result.(protocol.RPCResult).Rest, + Exception: response.Result.(protocol.RPCResult).Err, + Attachments: response.Result.(protocol.RPCResult).Attrs, + } + } + + codec := impl.NewDubboCodec(nil) + + pkg, err := codec.Encode(*resp) + if err != nil { + return nil, perrors.WithStack(err) + } + + return bytes.NewBuffer(pkg), nil } + +// Decode data, including request and response. func (c *DubboTestCodec) Decode(data []byte) (remoting.DecodeResult, int, error) { - return remoting.DecodeResult{}, 0, nil + if c.isRequest(data) { + req, len, err := c.decodeRequest(data) + if err != nil { + return remoting.DecodeResult{}, len, perrors.WithStack(err) + } + return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err) + } else { + resp, len, err := c.decodeResponse(data) + if err != nil { + return remoting.DecodeResult{}, len, perrors.WithStack(err) + } + return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err) + } } + func (c *DubboTestCodec) isRequest(data []byte) bool { + if data[2]&byte(0x80) == 0x00 { + return false + } return true } +// decode request func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, error) { - return nil, 0, nil + var request *remoting.Request = nil + buf := bytes.NewBuffer(data) + pkg := impl.NewDubboPackage(buf) + pkg.SetBody(make([]interface{}, 7)) + err := pkg.Unmarshal() + if err != nil { + originErr := perrors.Cause(err) + if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + //FIXME + return nil, 0, originErr + } + return request, 0, perrors.WithStack(err) + } + request = &remoting.Request{ + ID: pkg.Header.ID, + SerialID: pkg.Header.SerialID, + TwoWay: pkg.Header.Type&impl.PackageRequest_TwoWay != 0x00, + Event: pkg.Header.Type&impl.PackageHeartbeat != 0x00, + } + if (pkg.Header.Type & impl.PackageHeartbeat) == 0x00 { + // convert params of request + req := pkg.Body.(map[string]interface{}) + + //invocation := request.Data.(*invocation.RPCInvocation) + var methodName string + var args []interface{} + var attachments map[string]string = make(map[string]string) + if req[impl.DubboVersionKey] != nil { + //dubbo version + request.Version = req[impl.DubboVersionKey].(string) + } + //path + attachments[constant.PATH_KEY] = pkg.Service.Path + //version + attachments[constant.VERSION_KEY] = pkg.Service.Version + //method + methodName = pkg.Service.Method + args = req[impl.ArgsKey].([]interface{}) + attachments = req[impl.AttachmentsKey].(map[string]string) + invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments), + invocation.WithArguments(args), invocation.WithMethodName(methodName)) + request.Data = invoc + + } + return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } +// decode response func (c *DubboTestCodec) decodeResponse(data []byte) (*remoting.Response, int, error) { - return nil, 0, nil + buf := bytes.NewBuffer(data) + pkg := impl.NewDubboPackage(buf) + response := &remoting.Response{} + err := pkg.Unmarshal() + if err != nil { + originErr := perrors.Cause(err) + // if the data is very big, so the receive need much times. + if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + return nil, 0, originErr + } + return nil, 0, perrors.WithStack(err) + } + response = &remoting.Response{ + ID: pkg.Header.ID, + //Version: pkg.Header., + SerialID: pkg.Header.SerialID, + Status: pkg.Header.ResponseStatus, + Event: (pkg.Header.Type & impl.PackageHeartbeat) != 0, + } + var error error + if pkg.Header.Type&impl.PackageHeartbeat != 0x00 { + if pkg.Header.Type&impl.PackageResponse != 0x00 { + if pkg.Err != nil { + error = pkg.Err + } + } else { + response.Status = hessian.Response_OK + //reply(session, p, hessian.PackageHeartbeat) + } + return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, error + } + rpcResult := &protocol.RPCResult{} + response.Result = rpcResult + if pkg.Header.Type&impl.PackageRequest == 0x00 { + if pkg.Err != nil { + rpcResult.Err = pkg.Err + } else if pkg.Body.(*impl.ResponsePayload).Exception != nil { + rpcResult.Err = pkg.Body.(*impl.ResponsePayload).Exception + response.Error = rpcResult.Err + } + rpcResult.Attrs = pkg.Body.(*impl.ResponsePayload).Attachments + rpcResult.Rest = pkg.Body.(*impl.ResponsePayload).RspObj + } + + return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } diff --git a/remoting/getty/getty_client_test.go b/remoting/getty/getty_client_test.go index 12726547d69e75912d1074af6f96b4427ed20ac8..41ca3108a8a8d578c6aaaf374dc9f5fa6300a8b0 100644 --- a/remoting/getty/getty_client_test.go +++ b/remoting/getty/getty_client_test.go @@ -48,7 +48,6 @@ func TestRunSuite(t *testing.T) { testRequestOneWay(t, svr, url, client) testClient_Call(t, svr, url, client) testClient_AsyncCall(t, svr, url, client) - svr.Stop() } @@ -106,15 +105,10 @@ func testClient_Call(t *testing.T, svr *Server, url common.URL, c *Client) { testGetUser5(t, c) testGetUser6(t, c) testGetUser61(t, c) - } -func testGetBigPkg(t *testing.T, c *Client) { - var ( - user *User - err error - ) - user = &User{} +func testGetBigPkg(t *testing.T, c *Client) { + user := &User{} request := remoting.NewRequest("2.0.2") invocation := createInvocation("GetBigPkg", nil, nil, []interface{}{[]interface{}{nil}, user}, []reflect.Value{reflect.ValueOf([]interface{}{nil}), reflect.ValueOf(user)}) @@ -126,17 +120,14 @@ func testGetBigPkg(t *testing.T, c *Client) { pendingResponse := remoting.NewPendingResponse(request.ID) pendingResponse.Reply = user remoting.AddPendingResponse(pendingResponse) - err = c.Request(request, 8*time.Second, pendingResponse) + err := c.Request(request, 8*time.Second, pendingResponse) assert.NoError(t, err) assert.NotEqual(t, "", user.Id) assert.NotEqual(t, "", user.Name) } + func testGetUser(t *testing.T, c *Client) { - var ( - user *User - err error - ) - user = &User{} + user := &User{} request := remoting.NewRequest("2.0.2") invocation := createInvocation("GetUser", nil, nil, []interface{}{"1", "username"}, []reflect.Value{reflect.ValueOf("1"), reflect.ValueOf("username")}) @@ -148,7 +139,7 @@ func testGetUser(t *testing.T, c *Client) { pendingResponse := remoting.NewPendingResponse(request.ID) pendingResponse.Reply = user remoting.AddPendingResponse(pendingResponse) - err = c.Request(request, 3*time.Second, pendingResponse) + err := c.Request(request, 3*time.Second, pendingResponse) assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) } @@ -175,6 +166,7 @@ func testGetUser0(t *testing.T, c *Client) { assert.NoError(t, err) assert.Equal(t, User{Id: "1", Name: "username"}, *user) } + func testGetUser1(t *testing.T, c *Client) { var ( err error @@ -194,6 +186,7 @@ func testGetUser1(t *testing.T, c *Client) { err = c.Request(request, 3*time.Second, pendingResponse) assert.NoError(t, err) } + func testGetUser2(t *testing.T, c *Client) { var ( err error @@ -211,6 +204,7 @@ func testGetUser2(t *testing.T, c *Client) { err = c.Request(request, 3*time.Second, pendingResponse) assert.EqualError(t, err, "error") } + func testGetUser3(t *testing.T, c *Client) { var ( err error @@ -231,6 +225,7 @@ func testGetUser3(t *testing.T, c *Client) { assert.NoError(t, err) assert.Equal(t, &User{Id: "1", Name: "username"}, user2[0]) } + func testGetUser4(t *testing.T, c *Client) { var ( err error