diff --git a/cluster/loadbalance/least_active.go b/cluster/loadbalance/least_active.go index d7d305681808c64b452750e7d475c384b5f31d22..695ca21b0ed53dcf94907223e4c222af17311db9 100644 --- a/cluster/loadbalance/least_active.go +++ b/cluster/loadbalance/least_active.go @@ -50,7 +50,7 @@ func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation var ( leastActive int32 = -1 // The least active value of all invokers totalWeight int64 = 0 // The number of invokers having the same least active value (LEAST_ACTIVE) - firstWeight int64 = 0 // Initial value, used for comparision + firstWeight int64 = 0 // Initial value, used for comparison leastIndexes = make([]int, count) // The index of invokers having the same least active value (LEAST_ACTIVE) leastCount = 0 // The number of invokers having the same least active value (LEAST_ACTIVE) sameWeight = true // Every invoker has the same weight value? diff --git a/config/service_config_test.go b/config/service_config_test.go index b618085bd1f30b5854a98cd085feb11ee1bb95b8..b6080ed674ea01507305626c89fc86e205146b94 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -112,7 +112,8 @@ func Test_Export(t *testing.T) { doinit() extension.SetProtocol("registry", GetProtocol) - for _, service := range providerConfig.Services { + for i := 0; i < len(providerConfig.Services); i++ { + service := providerConfig.Services[i] service.Implement(&MockService{}) service.Export() assert.Condition(t, func() bool { diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index d6155b6021a07e51e4d4a779e56aff5e62bd40ce..56f95090c2658bc2760ca72e4fd662f2b99d95e0 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -82,6 +82,11 @@ func init() { func SetClientConf(c ClientConfig) { clientConf = &c + err := clientConf.CheckValidity() + if err != nil { + logger.Warnf("[ClientConfig CheckValidity] error: %v", err) + return + } } func GetClientConf() ClientConfig { @@ -148,14 +153,13 @@ type Client struct { pool *gettyRPCClientPool sequence atomic.Uint64 - pendingLock sync.RWMutex - pendingResponses map[SequenceType]*PendingResponse + pendingResponses *sync.Map } func NewClient() *Client { c := &Client{ - pendingResponses: make(map[SequenceType]*PendingResponse), + pendingResponses: new(sync.Map), conf: *clientConf, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) @@ -201,13 +205,6 @@ func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args i return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) } -func (c *Client) GetPendingResponse(seq SequenceType) *PendingResponse { - c.pendingLock.RLock() - defer c.pendingLock.RUnlock() - - return c.pendingResponses[SequenceType(seq)] -} - func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string, args, reply interface{}, callback AsyncCallback, opts CallOptions) error { @@ -330,20 +327,16 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage, } func (c *Client) addPendingResponse(pr *PendingResponse) { - c.pendingLock.Lock() - defer c.pendingLock.Unlock() - c.pendingResponses[SequenceType(pr.seq)] = pr + c.pendingResponses.Store(SequenceType(pr.seq), pr) } func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse { - c.pendingLock.Lock() - defer c.pendingLock.Unlock() if c.pendingResponses == nil { return nil } - if presp, ok := c.pendingResponses[seq]; ok { - delete(c.pendingResponses, seq) - return presp + if presp, ok := c.pendingResponses.Load(seq); ok { + c.pendingResponses.Delete(seq) + return presp.(*PendingResponse) } return nil } diff --git a/protocol/dubbo/client_test.go b/protocol/dubbo/client_test.go index 1ea9e4fb0e02a1e1234e8026f5a291398508133c..f4a5f4a8474b30b13bf7598bc634ac722955d91b 100644 --- a/protocol/dubbo/client_test.go +++ b/protocol/dubbo/client_test.go @@ -40,7 +40,7 @@ func TestClient_CallOneway(t *testing.T) { proto, url := InitTest(t) c := &Client{ - pendingResponses: make(map[SequenceType]*PendingResponse), + pendingResponses: new(sync.Map), conf: *clientConf, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) @@ -57,7 +57,7 @@ func TestClient_Call(t *testing.T) { proto, url := InitTest(t) c := &Client{ - pendingResponses: make(map[SequenceType]*PendingResponse), + pendingResponses: new(sync.Map), conf: *clientConf, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) @@ -118,7 +118,7 @@ func TestClient_AsyncCall(t *testing.T) { proto, url := InitTest(t) c := &Client{ - pendingResponses: make(map[SequenceType]*PendingResponse), + pendingResponses: new(sync.Map), conf: *clientConf, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 9551c2b0b7b742228ae69701a9d86975d2c85e52..cb57fc896f5e1c0be99a1e5978841ae17503064b 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -40,9 +40,9 @@ const ( type CallType int32 const ( - CT_UNKOWN CallType = 0 - CT_OneWay CallType = 1 - CT_TwoWay CallType = 2 + CT_UNKNOWN CallType = 0 + CT_OneWay CallType = 1 + CT_TwoWay CallType = 2 ) //////////////////////////////////////////// @@ -88,11 +88,11 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { return perrors.Errorf("opts[0] is not of type *Client") } - pendingRsp := client.GetPendingResponse(SequenceType(p.Header.ID)) - if pendingRsp == nil { + pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) + if !ok { return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) } else { - p.Body = &hessian.Response{RspObj: pendingRsp.reply} + p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).reply} } } diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 4368bb4630900eb2c4eece760b3b3e5c4887c478..182d6d8e0b11cfcb231789cebf9c4cefdecfa258 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -36,7 +36,7 @@ func TestDubboInvoker_Invoke(t *testing.T) { proto, url := InitTest(t) c := &Client{ - pendingResponses: make(map[SequenceType]*PendingResponse), + pendingResponses: new(sync.Map), conf: *clientConf, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index 3543d8da803b00befe9e08286bd67c09cd0afef2..26ce4a1906d5d6fe425f23984586914c293f47a4 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -48,7 +48,7 @@ func TestDubboProtocol_Export(t *testing.T) { eq := exporter.GetInvoker().GetUrl().URLEqual(url) assert.True(t, eq) - // second service: the same path and the diferent version + // second service: the same path and the different version url2, err := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 15e222676afe5579fc53df46a3983b55e5d3f2b4..55ee929301e4e62e6b868c5b85b9952fc354b723 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -190,7 +190,7 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { p, ok := pkg.(*DubboPackage) if !ok { - logger.Errorf("illegal packge{%#v}", pkg) + logger.Errorf("illegal package{%#v}", pkg) return } p.Header.ResponseStatus = hessian.Response_OK diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index 529aa759a5c39a467a1f72560d56c1b48738a9a6..042b8789104d1e671807405a81045dc30adcf789 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -46,8 +46,6 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { } func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { - p.client.pendingLock.RLock() - defer p.client.pendingLock.RUnlock() pkg := &DubboPackage{} buf := bytes.NewBuffer(data) diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index 5b5548067225bcf6d8bcbaf35cee63c829c03edc..22dc7cfc49c978a7c042652158210ef6fda48892 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -152,7 +152,9 @@ func (s *Server) handlePkg(conn net.Conn) { timeout, err := time.ParseDuration(reqHeader["Timeout"]) if err == nil { httpTimeout = timeout - ctx, _ = context.WithTimeout(ctx, httpTimeout) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, httpTimeout) + defer cancel() } delete(reqHeader, "Timeout") }