From 58e7e86d6d7856bffa4e5eedf6f3d5f6a48be6a1 Mon Sep 17 00:00:00 2001 From: cvictory <shenglicao2@gmail.com> Date: Thu, 14 May 2020 16:53:58 +0800 Subject: [PATCH] fix review issue: add comment and some optimization --- config/application_config.go | 7 ++-- protocol/dubbo/dubbo_codec.go | 36 ++++++++++++++------- protocol/dubbo/dubbo_invoker.go | 9 ++++-- protocol/dubbo/dubbo_invoker_test.go | 2 +- protocol/dubbo/dubbo_protocol.go | 2 ++ protocol/invocation/rpcinvocation.go | 10 +++--- protocol/jsonrpc/http_test.go | 22 ++++++------- protocol/jsonrpc/jsonrpc_invoker.go | 5 ++- remoting/exchange.go | 31 +++++++++++------- remoting/exchange_client.go | 45 ++++++++++++++++++-------- remoting/exchange_server.go | 7 ++++ remoting/getty/dubbo_codec_for_test.go | 16 +++++---- remoting/getty/getty_client.go | 14 +++++--- remoting/getty/getty_client_test.go | 26 +++++++-------- remoting/getty/listener.go | 4 +-- 15 files changed, 142 insertions(+), 94 deletions(-) diff --git a/config/application_config.go b/config/application_config.go index 23ab7d34a..4f747fb16 100644 --- a/config/application_config.go +++ b/config/application_config.go @@ -17,12 +17,9 @@ package config -import ( - "github.com/creasty/defaults" -) - import ( "github.com/apache/dubbo-go/common/constant" + "github.com/creasty/defaults" ) // ApplicationConfig ... @@ -40,7 +37,7 @@ func (*ApplicationConfig) Prefix() string { return constant.DUBBO + ".application." } -// Id ... +// ID ... func (c *ApplicationConfig) Id() string { return "" } diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go index f297d4c3d..c233b104f 100644 --- a/protocol/dubbo/dubbo_codec.go +++ b/protocol/dubbo/dubbo_codec.go @@ -42,10 +42,11 @@ const ( func init() { codec := &DubboCodec{} + // this is for registry dubboCodec of dubbo protocol remoting.NewCodec("dubbo", codec) } -// DubboPackage ... +// DubboPackage. this is for hessian encode/decode. If we refactor hessian, it will also be refactored. type DubboPackage struct { Header hessian.DubboHeader Service hessian.Service @@ -53,6 +54,7 @@ type DubboPackage struct { Err error } +// String of DubboPackage func (p DubboPackage) String() string { return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body) } @@ -103,9 +105,11 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) err return perrors.WithStack(err) } +// DubboCodec. It is implements remoting.Codec type DubboCodec struct { } +// encode request for transport func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, error) { if request.Event { return c.encodeHeartbeartReqeust(request) @@ -127,12 +131,13 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, "3000")) if err != nil { - panic(err) + // it will be wrapped in readwrite.Write . + return nil, err } p.Service.Timeout = time.Duration(timeout) p.Header.SerialID = byte(S_Dubbo) - p.Header.ID = request.Id + p.Header.ID = request.ID if request.TwoWay { p.Header.Type = hessian.PackageRequest_TwoWay } else { @@ -150,10 +155,12 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er return bytes.NewBuffer(pkg), nil } + +// encode heartbeart request func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) { pkg := &DubboPackage{} pkg.Body = []interface{}{} - pkg.Header.ID = request.Id + pkg.Header.ID = request.ID pkg.Header.Type = hessian.PackageHeartbeat pkg.Header.SerialID = byte(S_Dubbo) @@ -166,6 +173,8 @@ func (c *DubboCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes. return bytes.NewBuffer(byt), nil } + +// encode response func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, error) { var ptype = hessian.PackageResponse if response.IsHeartbeat() { @@ -175,7 +184,7 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, Header: hessian.DubboHeader{ SerialID: response.SerialID, Type: ptype, - ID: response.Id, + ID: response.ID, ResponseStatus: response.Status, }, } @@ -196,19 +205,21 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, return bytes.NewBuffer(pkg), nil } + +// Decode data, including request and response. func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) { if c.isRequest(data) { req, len, err := c.decodeRequest(data) if err != nil { - return remoting.DecodeResult{}, len, err + return remoting.DecodeResult{}, len, perrors.WithStack(err) } - return remoting.DecodeResult{IsRequest: true, Result: req}, len, err + return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err) } else { resp, len, err := c.decodeResponse(data) if err != nil { - return remoting.DecodeResult{}, len, err + return remoting.DecodeResult{}, len, perrors.WithStack(err) } - return remoting.DecodeResult{IsRequest: false, Result: resp}, len, err + return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err) } } func (c *DubboCodec) isRequest(data []byte) bool { @@ -218,6 +229,7 @@ func (c *DubboCodec) isRequest(data []byte) bool { return true } +// decode request func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) { pkg := &DubboPackage{ Body: make([]interface{}, 7), @@ -236,7 +248,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) return request, 0, perrors.WithStack(err) } request = &remoting.Request{ - Id: pkg.Header.ID, + ID: pkg.Header.ID, SerialID: pkg.Header.SerialID, TwoWay: pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00, Event: pkg.Header.Type&hessian.PackageHeartbeat != 0x00, @@ -282,6 +294,7 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) return request, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil } +// decode response func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error) { pkg := &DubboPackage{} buf := bytes.NewBuffer(data) @@ -289,6 +302,7 @@ func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error err := pkg.Unmarshal(buf, response) if err != nil { originErr := perrors.Cause(err) + // if the data is very big, so the receive need much times. if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { return nil, 0, originErr } @@ -297,7 +311,7 @@ func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error return nil, 0, perrors.WithStack(err) } response = &remoting.Response{ - Id: pkg.Header.ID, + ID: pkg.Header.ID, //Version: pkg.Header., SerialID: pkg.Header.SerialID, Status: pkg.Header.ResponseStatus, diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 1b3ac6fc8..a537f8fd9 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -48,17 +48,19 @@ var ( attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY} ) -// DubboInvoker ... +// DubboInvoker. It is implement of protocol.Invoker. One dubboInvoker refer to one service and ip. type DubboInvoker struct { protocol.BaseInvoker + // the exchange layer, it is focus on network communication. client *remoting.ExchangeClient quitOnce sync.Once - timeout time.Duration + // timeout for service(interface) level. + timeout time.Duration // Used to record the number of requests. -1 represent this DubboInvoker is destroyed reqNum int64 } -// NewDubboInvoker ... +// NewDubboInvoker constructor func NewDubboInvoker(url common.URL, client *remoting.ExchangeClient) *DubboInvoker { requestTimeout := config.GetConsumerConfig().RequestTimeout @@ -143,6 +145,7 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti return t } } + // set timeout into invocation at method level invocation.SetAttachments(constant.TIMEOUT_KEY, strconv.Itoa(int(di.timeout.Milliseconds()))) return di.timeout } diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index ddb7b783d..4c435d233 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -70,7 +70,7 @@ func TestDubboInvoker_Invoke(t *testing.T) { r := response.(remoting.AsyncCallbackResponse) rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult) assert.Equal(t, User{Id: "1", Name: "username"}, *(rst.Rest.(*User))) - //assert.Equal(t, User{Id: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) + //assert.Equal(t, User{ID: "1", Name: "username"}, *r.Reply.(*Response).reply.(*User)) lock.Unlock() }) res = invoker.Invoke(context.Background(), inv) diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 8c2845604..79607379e 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -41,6 +41,8 @@ const ( ) var ( + // Make the connection can be shared. + // It will create one connection for one address (ip+port) exchangeClientMap *sync.Map = new(sync.Map) ) diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index e924a77da..431e70a8c 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -19,12 +19,10 @@ package invocation import ( "bytes" - "github.com/apache/dubbo-go/common/constant" "reflect" "sync" -) -import ( + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" ) @@ -145,12 +143,12 @@ func (r *RPCInvocation) SetCallBack(c interface{}) { func (r *RPCInvocation) ServiceKey() string { intf := r.AttachmentsByKey(constant.INTERFACE_KEY, "") - if intf == "" { + if len(intf) == 0 { return "" } buf := &bytes.Buffer{} group := r.AttachmentsByKey(constant.GROUP_KEY, "") - if group != "" { + if len(group) != 0 { buf.WriteString(group) buf.WriteString("/") } @@ -158,7 +156,7 @@ func (r *RPCInvocation) ServiceKey() string { buf.WriteString(intf) version := r.AttachmentsByKey(constant.VERSION_KEY, "") - if version != "" && version != "0.0.0" { + if len(version) != 0 && version != "0.0.0" { buf.WriteString(":") buf.WriteString(version) } diff --git a/protocol/jsonrpc/http_test.go b/protocol/jsonrpc/http_test.go index 0cb88b36a..8a4391052 100644 --- a/protocol/jsonrpc/http_test.go +++ b/protocol/jsonrpc/http_test.go @@ -22,19 +22,15 @@ import ( "strings" "testing" "time" -) -import ( "github.com/opentracing/opentracing-go" - perrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" -) -import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" ) type ( @@ -71,7 +67,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser ctx := context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser", }) @@ -85,7 +81,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser0 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser0", }) @@ -98,7 +94,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser1 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser1", }) @@ -110,7 +106,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser2 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser2", }) @@ -122,7 +118,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser3 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser3", }) @@ -134,7 +130,7 @@ func TestHTTPClient_Call(t *testing.T) { // call GetUser4 ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser4", }) @@ -145,7 +141,7 @@ func TestHTTPClient_Call(t *testing.T) { assert.Equal(t, &User{Id: "", Name: ""}, reply) ctx = context.WithValue(context.Background(), constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": "GetUser4", }) diff --git a/protocol/jsonrpc/jsonrpc_invoker.go b/protocol/jsonrpc/jsonrpc_invoker.go index b6e194ce0..fd3011260 100644 --- a/protocol/jsonrpc/jsonrpc_invoker.go +++ b/protocol/jsonrpc/jsonrpc_invoker.go @@ -19,13 +19,12 @@ package jsonrpc import ( "context" -) -import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" + invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) @@ -54,7 +53,7 @@ func (ji *JsonrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invoca url := ji.GetUrl() req := ji.client.NewRequest(url, inv.MethodName(), inv.Arguments()) ctxNew := context.WithValue(ctx, constant.DUBBOGO_CTX_KEY, map[string]string{ - "X-Proxy-Id": "dubbogo", + "X-Proxy-ID": "dubbogo", "X-Services": url.Path, "X-Method": inv.MethodName(), }) diff --git a/remoting/exchange.go b/remoting/exchange.go index 45babec64..f66540390 100644 --- a/remoting/exchange.go +++ b/remoting/exchange.go @@ -24,39 +24,46 @@ import ( ) var ( + // generate request ID for global use sequence atomic.Uint64 ) func init() { + // init request ID sequence.Store(0) } func SequenceId() uint64 { + // increse 2 for every request. return sequence.Add(2) } // Request ... type Request struct { - Id int64 - Version string + ID int64 + // protocol version + Version string + // serial ID SerialID byte - Data interface{} - TwoWay bool - Event bool - broken bool + // Data + Data interface{} + TwoWay bool + Event bool + // it is used to judge the request is unbroken + // broken bool } -// NewRequest ... +// NewRequest func NewRequest(version string) *Request { return &Request{ - Id: int64(SequenceId()), + ID: int64(SequenceId()), Version: version, } } // Response ... type Response struct { - Id int64 + ID int64 Version string SerialID byte Status uint8 @@ -65,14 +72,15 @@ type Response struct { Result interface{} } -// NewResponse ... +// NewResponse func NewResponse(id int64, version string) *Response { return &Response{ - Id: id, + ID: id, Version: version, } } +// the response is heartbeat func (response *Response) IsHeartbeat() bool { return response.Event && response.Result == nil } @@ -92,6 +100,7 @@ type AsyncCallbackResponse struct { Reply interface{} } +// the client sends requst to server, there is one pendingResponse at client side to wait the response from server type PendingResponse struct { seq int64 Err error diff --git a/remoting/exchange_client.go b/remoting/exchange_client.go index cd1e3a101..ccbd6f7a9 100644 --- a/remoting/exchange_client.go +++ b/remoting/exchange_client.go @@ -26,29 +26,39 @@ import ( ) var ( + // store requestID and response pendingResponses *sync.Map = new(sync.Map) ) type SequenceType int64 -type ExchangeClient struct { - ConnectTimeout time.Duration - address string - client Client -} - +// It is interface of client for network communication. +// If you use getty as network communication, you should define GettyClient that implements this interface. type Client interface { SetExchangeClient(client *ExchangeClient) + // responseHandler is used to deal with msg SetResponseHandler(responseHandler ResponseHandler) + // connect url Connect(url common.URL) error + // close Close() + // send request to server. Request(request *Request, timeout time.Duration, response *PendingResponse) error } +// This is abstraction level. it is like facade. +type ExchangeClient struct { + ConnectTimeout time.Duration + address string + client Client +} + +// handle the message from server type ResponseHandler interface { Handler(response *Response) } +// create ExchangeClient func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration) *ExchangeClient { exchangeClient := &ExchangeClient{ ConnectTimeout: connectTimeout, @@ -67,6 +77,7 @@ func NewExchangeClient(url common.URL, client Client, connectTimeout time.Durati return exchangeClient } +// two way request func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout time.Duration, result *protocol.RPCResult) error { request := NewRequest("2.0.2") @@ -74,8 +85,8 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url commo request.Event = false request.TwoWay = true - rsp := NewPendingResponse(request.Id) - rsp.response = NewResponse(request.Id, "2.0.2") + rsp := NewPendingResponse(request.ID) + rsp.response = NewResponse(request.ID, "2.0.2") rsp.Reply = (*invocation).Reply() AddPendingResponse(rsp) @@ -88,6 +99,7 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url commo return nil } +// async two way request func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url common.URL, timeout time.Duration, callback common.AsyncCallback, result *protocol.RPCResult) error { request := NewRequest("2.0.2") @@ -95,8 +107,8 @@ func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url request.Event = false request.TwoWay = true - rsp := NewPendingResponse(request.Id) - rsp.response = NewResponse(request.Id, "2.0.2") + rsp := NewPendingResponse(request.ID) + rsp.response = NewResponse(request.ID, "2.0.2") rsp.Callback = callback rsp.Reply = (*invocation).Reply() AddPendingResponse(rsp) @@ -110,15 +122,15 @@ func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url return nil } -// oneway +// oneway request func (client *ExchangeClient) Send(invocation *protocol.Invocation, timeout time.Duration) error { request := NewRequest("2.0.2") request.Data = invocation request.Event = false request.TwoWay = false - rsp := NewPendingResponse(request.Id) - rsp.response = NewResponse(request.Id, "2.0.2") + rsp := NewPendingResponse(request.ID) + rsp.response = NewResponse(request.ID, "2.0.2") err := client.client.Request(request, timeout, rsp) if err != nil { @@ -127,13 +139,15 @@ func (client *ExchangeClient) Send(invocation *protocol.Invocation, timeout time return nil } +// close client func (client *ExchangeClient) Close() { client.client.Close() } +// handle the response from server func (client *ExchangeClient) Handler(response *Response) { - pendingResponse := removePendingResponse(SequenceType(response.Id)) + pendingResponse := removePendingResponse(SequenceType(response.ID)) if pendingResponse == nil { logger.Errorf("failed to get pending response context for response package %s", *response) return @@ -149,10 +163,12 @@ func (client *ExchangeClient) Handler(response *Response) { } } +// store response into map func AddPendingResponse(pr *PendingResponse) { pendingResponses.Store(SequenceType(pr.seq), pr) } +// get and remove response func removePendingResponse(seq SequenceType) *PendingResponse { if pendingResponses == nil { return nil @@ -164,6 +180,7 @@ func removePendingResponse(seq SequenceType) *PendingResponse { return nil } +// get response func GetPendingResponse(seq SequenceType) *PendingResponse { if presp, ok := pendingResponses.Load(seq); ok { return presp.(*PendingResponse) diff --git a/remoting/exchange_server.go b/remoting/exchange_server.go index 44f41622a..c4538075b 100644 --- a/remoting/exchange_server.go +++ b/remoting/exchange_server.go @@ -20,16 +20,21 @@ import ( "github.com/apache/dubbo-go/common" ) +// It is interface of server for network communication. +// If you use getty as network communication, you should define GettyServer that implements this interface. type Server interface { //invoke once for connection Start() + //it is for destroy Stop() } +// This is abstraction level. it is like facade. type ExchangeServer struct { Server Server } +// Create ExchangeServer func NewExchangeServer(url common.URL, server Server) *ExchangeServer { exchangServer := &ExchangeServer{ Server: server, @@ -37,10 +42,12 @@ func NewExchangeServer(url common.URL, server Server) *ExchangeServer { return exchangServer } +// start server func (server *ExchangeServer) Start() { server.Server.Start() } +// stop server func (server *ExchangeServer) Stop() { server.Server.Stop() } diff --git a/remoting/getty/dubbo_codec_for_test.go b/remoting/getty/dubbo_codec_for_test.go index 83f0359c3..4afb20334 100644 --- a/remoting/getty/dubbo_codec_for_test.go +++ b/remoting/getty/dubbo_codec_for_test.go @@ -16,7 +16,8 @@ */ package getty -// copy from dubbo/dubbo_codec.go +// copy from dubbo/dubbo_codec.go . +// it is used to unit test. import ( "bufio" "bytes" @@ -129,7 +130,8 @@ func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TIMEOUT_KEY, "3000")) if err != nil { - panic(err) + // it will be wrapped in readwrite.Write . + return nil, err } p.Service.Timeout = time.Duration(timeout) //var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "") @@ -140,7 +142,7 @@ func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer //} p.Header.SerialID = byte(S_Dubbo) - p.Header.ID = request.Id + p.Header.ID = request.ID if request.TwoWay { p.Header.Type = hessian.PackageRequest_TwoWay } else { @@ -161,7 +163,7 @@ func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer func (c *DubboTestCodec) encodeHeartbeartReqeust(request *remoting.Request) (*bytes.Buffer, error) { pkg := &DubboPackage{} pkg.Body = []interface{}{} - pkg.Header.ID = request.Id + pkg.Header.ID = request.ID pkg.Header.Type = hessian.PackageHeartbeat pkg.Header.SerialID = byte(S_Dubbo) @@ -183,7 +185,7 @@ func (c *DubboTestCodec) EncodeResponse(response *remoting.Response) (*bytes.Buf Header: hessian.DubboHeader{ SerialID: response.SerialID, Type: ptype, - ID: response.Id, + ID: response.ID, ResponseStatus: response.Status, }, } @@ -249,7 +251,7 @@ func (c *DubboTestCodec) decodeRequest(data []byte) (*remoting.Request, int, err return request, 0, perrors.WithStack(err) } request = &remoting.Request{ - Id: pkg.Header.ID, + ID: pkg.Header.ID, SerialID: pkg.Header.SerialID, TwoWay: pkg.Header.Type&hessian.PackageRequest_TwoWay != 0x00, Event: pkg.Header.Type&hessian.PackageHeartbeat != 0x00, @@ -330,7 +332,7 @@ func (c *DubboTestCodec) decodeResponse(data []byte) (*remoting.Response, int, e return response, 0, perrors.WithStack(err) } response = &remoting.Response{ - Id: pkg.Header.ID, + ID: pkg.Header.ID, //Version: pkg.Header., SerialID: pkg.Header.SerialID, Status: pkg.Header.ResponseStatus, diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go index a7a0c2c6a..a4e3904a9 100644 --- a/remoting/getty/getty_client.go +++ b/remoting/getty/getty_client.go @@ -44,6 +44,7 @@ var ( clientGrpool *gxsync.TaskPool ) +// it is init client for single protocol. func initClient(protocol string) { if protocol == "" { return @@ -84,7 +85,7 @@ func initClient(protocol string) { rand.Seed(time.Now().UnixNano()) } -// SetClientConf ... +// SetClientConf: config ClientConf func SetClientConf(c ClientConfig) { clientConf = &c err := clientConf.CheckValidity() @@ -102,13 +103,14 @@ func setClientGrpool() { } } -// Options ... +// Options : param config type Options struct { // connect timeout + // remove request timeout, it will be calulate for every request ConnectTimeout time.Duration } -// Client ... +// Client : some configuration for network communication. type Client struct { addr string opts Options @@ -119,7 +121,7 @@ type Client struct { ExchangeClient *remoting.ExchangeClient } -// NewClient ... +// create client func NewClient(opt Options) *Client { switch { case opt.ConnectTimeout == 0: @@ -153,6 +155,7 @@ func (c *Client) Connect(url common.URL) error { return err } +// close network connection func (c *Client) Close() { if c.pool != nil { c.pool.close() @@ -160,6 +163,7 @@ func (c *Client) Close() { c.pool = nil } +// send request func (c *Client) Request(request *remoting.Request, timeout time.Duration, response *remoting.PendingResponse) error { var ( @@ -212,7 +216,7 @@ func (c *Client) heartbeat(session getty.Session) error { req := remoting.NewRequest("2.0.2") req.TwoWay = true req.Event = true - resp := remoting.NewPendingResponse(req.Id) + resp := remoting.NewPendingResponse(req.ID) remoting.AddPendingResponse(resp) return c.transfer(session, req, 3*time.Second) } diff --git a/remoting/getty/getty_client_test.go b/remoting/getty/getty_client_test.go index 9fa46daaf..f16c146e8 100644 --- a/remoting/getty/getty_client_test.go +++ b/remoting/getty/getty_client_test.go @@ -123,7 +123,7 @@ func testGetBigPkg(t *testing.T, c *Client) { request.Data = invocation request.Event = false request.TwoWay = true - pendingResponse := remoting.NewPendingResponse(request.Id) + pendingResponse := remoting.NewPendingResponse(request.ID) pendingResponse.Reply = user remoting.AddPendingResponse(pendingResponse) err = c.Request(request, 8*time.Second, pendingResponse) @@ -145,7 +145,7 @@ func testGetUser(t *testing.T, c *Client) { request.Data = invocation request.Event = false request.TwoWay = true - pendingResponse := remoting.NewPendingResponse(request.Id) + pendingResponse := remoting.NewPendingResponse(request.ID) pendingResponse.Reply = user remoting.AddPendingResponse(pendingResponse) err = c.Request(request, 3*time.Second, pendingResponse) @@ -167,8 +167,8 @@ func testGetUser0(t *testing.T, c *Client) { request.Data = invocation request.Event = false request.TwoWay = true - rsp := remoting.NewPendingResponse(request.Id) - rsp.SetResponse(remoting.NewResponse(request.Id, "2.0.2")) + rsp := remoting.NewPendingResponse(request.ID) + rsp.SetResponse(remoting.NewResponse(request.ID, "2.0.2")) remoting.AddPendingResponse(rsp) rsp.Reply = user err = c.Request(request, 3*time.Second, rsp) @@ -187,7 +187,7 @@ func testGetUser1(t *testing.T, c *Client) { request.Data = invocation request.Event = false request.TwoWay = true - pendingResponse := remoting.NewPendingResponse(request.Id) + pendingResponse := remoting.NewPendingResponse(request.ID) user := &User{} pendingResponse.Reply = user remoting.AddPendingResponse(pendingResponse) @@ -206,7 +206,7 @@ func testGetUser2(t *testing.T, c *Client) { request.Data = invocation request.Event = false request.TwoWay = true - pendingResponse := remoting.NewPendingResponse(request.Id) + pendingResponse := remoting.NewPendingResponse(request.ID) remoting.AddPendingResponse(pendingResponse) err = c.Request(request, 3*time.Second, pendingResponse) assert.EqualError(t, err, "error") @@ -223,7 +223,7 @@ func testGetUser3(t *testing.T, c *Client) { request.Data = invocation request.Event = false request.TwoWay = true - pendingResponse := remoting.NewPendingResponse(request.Id) + pendingResponse := remoting.NewPendingResponse(request.ID) user2 := []interface{}{} pendingResponse.Reply = &user2 remoting.AddPendingResponse(pendingResponse) @@ -242,7 +242,7 @@ func testGetUser4(t *testing.T, c *Client) { request.Data = invocation request.Event = false request.TwoWay = true - pendingResponse := remoting.NewPendingResponse(request.Id) + pendingResponse := remoting.NewPendingResponse(request.ID) user2 := []interface{}{} pendingResponse.Reply = &user2 remoting.AddPendingResponse(pendingResponse) @@ -262,7 +262,7 @@ func testGetUser5(t *testing.T, c *Client) { request.Data = invocation request.Event = false request.TwoWay = true - pendingResponse := remoting.NewPendingResponse(request.Id) + pendingResponse := remoting.NewPendingResponse(request.ID) user3 := map[interface{}]interface{}{} pendingResponse.Reply = &user3 remoting.AddPendingResponse(pendingResponse) @@ -285,7 +285,7 @@ func testGetUser6(t *testing.T, c *Client) { request.Data = invocation request.Event = false request.TwoWay = true - pendingResponse := remoting.NewPendingResponse(request.Id) + pendingResponse := remoting.NewPendingResponse(request.ID) pendingResponse.Reply = user remoting.AddPendingResponse(pendingResponse) err = c.Request(request, 3*time.Second, pendingResponse) @@ -306,7 +306,7 @@ func testGetUser61(t *testing.T, c *Client) { request.Data = invocation request.Event = false request.TwoWay = true - pendingResponse := remoting.NewPendingResponse(request.Id) + pendingResponse := remoting.NewPendingResponse(request.ID) pendingResponse.Reply = user remoting.AddPendingResponse(pendingResponse) err = c.Request(request, 3*time.Second, pendingResponse) @@ -325,8 +325,8 @@ func testClient_AsyncCall(t *testing.T, svr *Server, url common.URL, client *Cli request.Data = invocation request.Event = false request.TwoWay = true - rsp := remoting.NewPendingResponse(request.Id) - rsp.SetResponse(remoting.NewResponse(request.Id, "2.0.2")) + rsp := remoting.NewPendingResponse(request.ID) + rsp.SetResponse(remoting.NewResponse(request.ID, "2.0.2")) remoting.AddPendingResponse(rsp) rsp.Reply = user rsp.Callback = func(response common.CallbackResponse) { diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go index 9d4252915..5a9ab4999 100644 --- a/remoting/getty/listener.go +++ b/remoting/getty/listener.go @@ -101,7 +101,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { req := result.Result.(*remoting.Request) if req.Event { logger.Debugf("get rpc heartbeat request{%#v}", req) - resp := remoting.NewResponse(req.Id, req.Version) + resp := remoting.NewResponse(req.ID, req.Version) resp.Status = hessian.Response_OK resp.Event = req.Event resp.SerialID = req.SerialID @@ -230,7 +230,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { } req := decodeResult.Result.(*remoting.Request) - resp := remoting.NewResponse(req.Id, req.Version) + resp := remoting.NewResponse(req.ID, req.Version) resp.Status = hessian.Response_OK resp.Event = req.Event resp.SerialID = req.SerialID -- GitLab