From adb55ce8c8b8756e4aaef4977bbcec8b5cef472a Mon Sep 17 00:00:00 2001 From: cvictory <shenglicao2@gmail.com> Date: Mon, 18 May 2020 16:10:03 +0800 Subject: [PATCH] split import sentense, fix some review --- protocol/dubbo/dubbo_codec.go | 11 +++-- protocol/dubbo/dubbo_invoker.go | 13 +++--- protocol/dubbo/dubbo_invoker_test.go | 17 ++++---- protocol/dubbo/dubbo_protocol.go | 17 +++++--- protocol/dubbo/dubbo_protocol_test.go | 7 +++- protocol/invocation/rpcinvocation.go | 27 +----------- .../protocol_filter_wrapper.go | 3 ++ remoting/codec.go | 8 +--- remoting/exchange.go | 7 +++- remoting/exchange_client.go | 41 ++++++++++++++++--- remoting/getty/dubbo_codec_for_test.go | 10 +++-- remoting/getty/getty_client.go | 38 +++++++++-------- remoting/getty/getty_client_test.go | 22 +++++----- remoting/getty/getty_server.go | 14 ++++--- remoting/getty/listener.go | 15 ++++--- remoting/getty/listener_test.go | 9 +++- remoting/getty/readwriter.go | 10 +++-- 17 files changed, 162 insertions(+), 107 deletions(-) diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go index c233b104f..c6ab1811e 100644 --- a/protocol/dubbo/dubbo_codec.go +++ b/protocol/dubbo/dubbo_codec.go @@ -22,14 +22,19 @@ import ( "fmt" "strconv" "time" +) +import ( hessian "github.com/apache/dubbo-go-hessian2" + perrors "github.com/pkg/errors" +) + +import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/remoting" - perrors "github.com/pkg/errors" ) //SerialID serial ID @@ -43,7 +48,7 @@ const ( func init() { codec := &DubboCodec{} // this is for registry dubboCodec of dubbo protocol - remoting.NewCodec("dubbo", codec) + remoting.RegistryCodec("dubbo", codec) } // DubboPackage. this is for hessian encode/decode. If we refactor hessian, it will also be refactored. @@ -88,7 +93,7 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) err } if resp != nil { // for client - if p.Header.Type&hessian.PackageRequest != 0x00 { + if (p.Header.Type & hessian.PackageRequest) != 0x00 { // size of this array must be '7' // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272 p.Body = make([]interface{}, 7) diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index a537f8fd9..4d4cb3653 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -19,22 +19,25 @@ package dubbo import ( "context" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/remoting" "strconv" "strings" "sync" "sync/atomic" "time" +) - "github.com/apache/dubbo-go/config" - "github.com/apache/dubbo-go/remoting" +import ( "github.com/opentracing/opentracing-go" + perrors "github.com/pkg/errors" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" - perrors "github.com/pkg/errors" - invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) @@ -117,7 +120,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati //result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest) } else { - result.Err = di.client.Send(&invocation, timeout) + result.Err = di.client.Send(&invocation, url, timeout) } } else { if inv.Reply() == nil { diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 4c435d233..d363eb39a 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -23,20 +23,23 @@ import ( "sync" "testing" "time" +) - "github.com/apache/dubbo-go/remoting" - - "github.com/apache/dubbo-go/remoting/getty" - +import ( hessian "github.com/apache/dubbo-go-hessian2" + "github.com/opentracing/opentracing-go" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" - "github.com/opentracing/opentracing-go" - perrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" + "github.com/apache/dubbo-go/remoting" + "github.com/apache/dubbo-go/remoting/getty" ) func TestDubboInvoker_Invoke(t *testing.T) { diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 79607379e..09a23bfdd 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -21,7 +21,13 @@ import ( "context" "fmt" "sync" +) +import ( + "github.com/opentracing/opentracing-go" +) + +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -31,7 +37,6 @@ import ( "github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting/getty" - "github.com/opentracing/opentracing-go" ) // dubbo protocol constant @@ -138,10 +143,10 @@ func GetProtocol() protocol.Protocol { } func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult { - exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.ServiceKey()) + exporter, _ := dubboProtocol.ExporterMap().Load(rpcInvocation.Invoker().GetUrl().ServiceKey()) result := protocol.RPCResult{} if exporter == nil { - err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.ServiceKey()) + err := fmt.Errorf("don't have this exporter, key: %s", rpcInvocation.Invoker().GetUrl().ServiceKey()) logger.Errorf(err.Error()) result.Err = err //reply(session, p, hessian.PackageResponse) @@ -163,7 +168,7 @@ func doHandleRequest(rpcInvocation *invocation.RPCInvocation) protocol.RPCResult //p.Body = hessian.NewResponse(res, nil, result.Attachments()) } } else { - result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.ServiceKey()) + result.Err = fmt.Errorf("don't have the invoker, key: %s", rpcInvocation.Invoker().GetUrl().ServiceKey()) } return result } @@ -173,7 +178,7 @@ func getExchangeClient(url common.URL) *remoting.ExchangeClient { if !ok { exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{ ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, - }), config.GetConsumerConfig().ConnectTimeout) + }), config.GetConsumerConfig().ConnectTimeout, false) if exchangeClientTmp != nil { exchangeClientMap.Store(url.Location, exchangeClientTmp) } @@ -184,7 +189,7 @@ func getExchangeClient(url common.URL) *remoting.ExchangeClient { if !ok { exchangeClientTmp := remoting.NewExchangeClient(url, getty.NewClient(getty.Options{ ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, - }), config.GetConsumerConfig().ConnectTimeout) + }), config.GetConsumerConfig().ConnectTimeout, false) if exchangeClientTmp != nil { exchangeClientMap.Store(url.Location, exchangeClientTmp) } diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index b2894ed7f..55ab0fe45 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -19,12 +19,17 @@ package dubbo import ( "testing" +) +import ( + "github.com/stretchr/testify/assert" +) + +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/remoting/getty" - "github.com/stretchr/testify/assert" ) func init() { diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 431e70a8c..b207fd0b0 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -18,11 +18,11 @@ package invocation import ( - "bytes" "reflect" "sync" +) - "github.com/apache/dubbo-go/common/constant" +import ( "github.com/apache/dubbo-go/protocol" ) @@ -141,29 +141,6 @@ func (r *RPCInvocation) SetCallBack(c interface{}) { r.callBack = c } -func (r *RPCInvocation) ServiceKey() string { - intf := r.AttachmentsByKey(constant.INTERFACE_KEY, "") - if len(intf) == 0 { - return "" - } - buf := &bytes.Buffer{} - group := r.AttachmentsByKey(constant.GROUP_KEY, "") - if len(group) != 0 { - buf.WriteString(group) - buf.WriteString("/") - } - - buf.WriteString(intf) - - version := r.AttachmentsByKey(constant.VERSION_KEY, "") - if len(version) != 0 && version != "0.0.0" { - buf.WriteString(":") - buf.WriteString(version) - } - - return buf.String() -} - // ///////////////////////// // option // ///////////////////////// diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 70d2da0fa..3fee0f41a 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -68,6 +68,9 @@ func (pfw *ProtocolFilterWrapper) Destroy() { } func buildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { + if invoker == nil { + return nil + } filtName := invoker.GetUrl().GetParam(key, "") if filtName == "" { return invoker diff --git a/remoting/codec.go b/remoting/codec.go index 7ca755255..607d1643c 100644 --- a/remoting/codec.go +++ b/remoting/codec.go @@ -33,14 +33,10 @@ type DecodeResult struct { } var ( - codec map[string]Codec -) - -func init() { codec = make(map[string]Codec, 2) -} +) -func NewCodec(protocol string, codecTmp Codec) { +func RegistryCodec(protocol string, codecTmp Codec) { codec[protocol] = codecTmp } diff --git a/remoting/exchange.go b/remoting/exchange.go index f66540390..e85e2c8ad 100644 --- a/remoting/exchange.go +++ b/remoting/exchange.go @@ -18,11 +18,16 @@ package remoting import ( "time" +) - "github.com/apache/dubbo-go/common" +import ( "go.uber.org/atomic" ) +import ( + "github.com/apache/dubbo-go/common" +) + var ( // generate request ID for global use sequence atomic.Uint64 diff --git a/remoting/exchange_client.go b/remoting/exchange_client.go index ccbd6f7a9..7ef8e54a2 100644 --- a/remoting/exchange_client.go +++ b/remoting/exchange_client.go @@ -17,9 +17,12 @@ package remoting import ( + "errors" "sync" "time" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" @@ -51,6 +54,7 @@ type ExchangeClient struct { ConnectTimeout time.Duration address string client Client + init bool } // handle the message from server @@ -59,27 +63,46 @@ type ResponseHandler interface { } // create ExchangeClient -func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration) *ExchangeClient { +func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration, lazyInit bool) *ExchangeClient { exchangeClient := &ExchangeClient{ ConnectTimeout: connectTimeout, address: url.Location, client: client, } client.SetExchangeClient(exchangeClient) - if client.Connect(url) != nil { - //retry for a while - time.Sleep(1 * time.Second) - if client.Connect(url) != nil { + if !lazyInit { + if err := exchangeClient.doInit(url); err != nil { return nil } } + client.SetResponseHandler(exchangeClient) return exchangeClient } +func (cl *ExchangeClient) doInit(url common.URL) error { + if cl.init { + return nil + } + if cl.client.Connect(url) != nil { + //retry for a while + time.Sleep(100 * time.Millisecond) + if cl.client.Connect(url) != nil { + logger.Errorf("Failed to connect server %+v " + url.Location) + return errors.New("Failed to connect server " + url.Location) + } + } + //FIXME atomic operation + cl.init = true + return nil +} + // two way request func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout time.Duration, result *protocol.RPCResult) error { + if er := client.doInit(url); er != nil { + return er + } request := NewRequest("2.0.2") request.Data = invocation request.Event = false @@ -102,6 +125,9 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url commo // async two way request func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url common.URL, timeout time.Duration, callback common.AsyncCallback, result *protocol.RPCResult) error { + if er := client.doInit(url); er != nil { + return er + } request := NewRequest("2.0.2") request.Data = invocation request.Event = false @@ -123,7 +149,10 @@ func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url } // oneway request -func (client *ExchangeClient) Send(invocation *protocol.Invocation, timeout time.Duration) error { +func (client *ExchangeClient) Send(invocation *protocol.Invocation, url common.URL, timeout time.Duration) error { + if er := client.doInit(url); er != nil { + return er + } request := NewRequest("2.0.2") request.Data = invocation request.Event = false diff --git a/remoting/getty/dubbo_codec_for_test.go b/remoting/getty/dubbo_codec_for_test.go index 4afb20334..bb87323d5 100644 --- a/remoting/getty/dubbo_codec_for_test.go +++ b/remoting/getty/dubbo_codec_for_test.go @@ -24,14 +24,18 @@ import ( "fmt" "strconv" "time" +) +import ( + perrors "github.com/pkg/errors" +) +import ( hessian "github.com/apache/dubbo-go-hessian2" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" "github.com/apache/dubbo-go/remoting" - perrors "github.com/pkg/errors" ) //SerialID serial ID @@ -44,7 +48,7 @@ const ( func init() { codec := &DubboTestCodec{} - remoting.NewCodec("dubbo", codec) + remoting.RegistryCodec("dubbo", codec) } // DubboPackage ... @@ -88,7 +92,7 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, resp *remoting.Response) err } if resp != nil { // for client - if p.Header.Type&hessian.PackageRequest != 0x00 { + if (p.Header.Type & hessian.PackageRequest) != 0x00 { // size of this array must be '7' // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272 p.Body = make([]interface{}, 7) diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go index a4e3904a9..f264652a4 100644 --- a/remoting/getty/getty_client.go +++ b/remoting/getty/getty_client.go @@ -20,17 +20,20 @@ package getty import ( "math/rand" "time" +) - "github.com/apache/dubbo-go/remoting" +import ( "github.com/dubbogo/getty" - "gopkg.in/yaml.v2" - gxsync "github.com/dubbogo/gost/sync" + perrors "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" - perrors "github.com/pkg/errors" + "github.com/apache/dubbo-go/remoting" ) var ( @@ -166,25 +169,26 @@ func (c *Client) Close() { // send request func (c *Client) Request(request *remoting.Request, timeout time.Duration, response *remoting.PendingResponse) error { - var ( - err error - session getty.Session - conn *gettyRPCClient - ) - conn, session, err = c.selectSession(c.addr) + //var ( + // err error + // session getty.Session + // conn *gettyRPCClient + //) + conn, session, err := c.selectSession(c.addr) if err != nil { return perrors.WithStack(err) } if session == nil { return errSessionNotExist } - defer func() { - if err == nil { - c.pool.put(conn) - return - } - conn.close() - }() + // FIXME remove temporarily + //defer func() { + // if err == nil { + // c.pool.put(conn) + // return + // } + // conn.close() + //}() if err = c.transfer(session, request, timeout); err != nil { return perrors.WithStack(err) diff --git a/remoting/getty/getty_client_test.go b/remoting/getty/getty_client_test.go index f16c146e8..516051d05 100644 --- a/remoting/getty/getty_client_test.go +++ b/remoting/getty/getty_client_test.go @@ -24,22 +24,22 @@ import ( "sync" "testing" "time" +) - "github.com/apache/dubbo-go/common/proxy/proxy_factory" - - "github.com/apache/dubbo-go/config" - - "github.com/apache/dubbo-go/remoting" - - "github.com/apache/dubbo-go/protocol/invocation" - +import ( hessian "github.com/apache/dubbo-go-hessian2" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) +import ( "github.com/apache/dubbo-go/common" . "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/proxy/proxy_factory" + "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/protocol" - perrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" ) func TestRunSuite(t *testing.T) { @@ -345,7 +345,7 @@ func testClient_AsyncCall(t *testing.T, svr *Server, url common.URL, client *Cli func InitTest(t *testing.T) (*Server, common.URL) { hessian.RegisterPOJO(&User{}) - remoting.NewCodec("dubbo", &DubboTestCodec{}) + remoting.RegistryCodec("dubbo", &DubboTestCodec{}) methods, err := common.ServiceMap.Register("dubbo", &UserProvider{}) assert.NoError(t, err) diff --git a/remoting/getty/getty_server.go b/remoting/getty/getty_server.go index 874d0401f..6dc15d251 100644 --- a/remoting/getty/getty_server.go +++ b/remoting/getty/getty_server.go @@ -20,17 +20,21 @@ package getty import ( "fmt" "net" +) - "github.com/apache/dubbo-go/protocol" - "github.com/apache/dubbo-go/protocol/invocation" - "github.com/apache/dubbo-go/remoting" +import ( "github.com/dubbogo/getty" + gxsync "github.com/dubbogo/gost/sync" + "gopkg.in/yaml.v2" +) +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" - gxsync "github.com/dubbogo/gost/sync" - "gopkg.in/yaml.v2" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" ) var ( diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go index 5a9ab4999..777e14c2e 100644 --- a/remoting/getty/listener.go +++ b/remoting/getty/listener.go @@ -22,18 +22,21 @@ import ( "sync" "sync/atomic" "time" +) - "github.com/apache/dubbo-go/common/constant" - - "github.com/apache/dubbo-go/remoting" - +import ( hessian "github.com/apache/dubbo-go-hessian2" - "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/protocol/invocation" "github.com/dubbogo/getty" perrors "github.com/pkg/errors" ) +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting" +) + // todo: WritePkg_Timeout will entry *.yml const ( // WritePkg_Timeout ... diff --git a/remoting/getty/listener_test.go b/remoting/getty/listener_test.go index 49e13d39c..3bb659c23 100644 --- a/remoting/getty/listener_test.go +++ b/remoting/getty/listener_test.go @@ -20,14 +20,19 @@ package getty import ( "context" "testing" +) - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/protocol/invocation" +import ( "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" ) +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol/invocation" +) + // test rebuild the ctx func TestRebuildCtx(t *testing.T) { opentracing.SetGlobalTracer(mocktracer.New()) diff --git a/remoting/getty/readwriter.go b/remoting/getty/readwriter.go index 1aed516a3..8842f1b8d 100644 --- a/remoting/getty/readwriter.go +++ b/remoting/getty/readwriter.go @@ -19,16 +19,20 @@ package getty import ( "reflect" +) - "github.com/apache/dubbo-go/remoting" - +import ( hessian "github.com/apache/dubbo-go-hessian2" "github.com/dubbogo/getty" - "github.com/apache/dubbo-go/common/logger" perrors "github.com/pkg/errors" ) +import ( + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/remoting" +) + //////////////////////////////////////////// // RpcClientPackageHandler //////////////////////////////////////////// -- GitLab