diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 82b95406397b4b1a52b4b8bd684ca7b5e3cece4d..231db381f96bd6885e4c4797e89fe4b21dc7b48d 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -101,8 +101,14 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati return &result } - di.AddInvokerTimes(1) - defer di.AddInvokerTimes(-1) + di.clientGuard.RLock() + defer di.clientGuard.RUnlock() + + if di.client == nil { + result.Err = protocol.ErrClientClosed + logger.Debugf("result.Err: %v", result.Err) + return &result + } if !di.BaseInvoker.IsAvailable() { // Generally, the case will not happen, because the invoker has been removed @@ -138,22 +144,17 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati //response := NewResponse(inv.Reply(), nil) rest := &protocol.RPCResult{} timeout := di.getTimeout(inv) - client := di.getClient() - if client == nil { - result.Err = protocol.ErrClientClosed + if async { + if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { + result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest) + } else { + result.Err = di.client.Send(&invocation, url, timeout) + } } else { - if async { - if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { - result.Err = client.AsyncRequest(&invocation, url, timeout, callBack, rest) - } else { - result.Err = client.Send(&invocation, url, timeout) - } + if inv.Reply() == nil { + result.Err = protocol.ErrNoReply } else { - if inv.Reply() == nil { - result.Err = protocol.ErrNoReply - } else { - result.Err = client.Request(&invocation, url, timeout, rest) - } + result.Err = di.client.Request(&invocation, url, timeout, rest) } } if result.Err == nil { @@ -192,26 +193,11 @@ func (di *DubboInvoker) IsAvailable() bool { // Destroy destroy dubbo client invoker. func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { - di.BaseInvoker.Stop() - var times int64 - for { - times = di.BaseInvoker.InvokeTimes() - if times == 0 { - di.BaseInvoker.AddInvokerTimes(-1) - logger.Infof("dubboInvoker is destroyed, url:{%s}", di.GetUrl().Key()) - di.BaseInvoker.Destroy() - client := di.getClient() - if client != nil { - di.setClient(nil) - client.Close() - } - break - } else if times < 0 { - logger.Infof("impossible log: dubboInvoker has destroyed, url:{%s}", di.GetUrl().Key()) - break - } - logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", times, di.GetUrl().Key()) - time.Sleep(1 * time.Second) + di.BaseInvoker.Destroy() + client := di.getClient() + if client != nil { + di.setClient(nil) + client.Close() } }) } diff --git a/protocol/grpc/grpc_invoker.go b/protocol/grpc/grpc_invoker.go index 4a4365caa70b42dcbefa8384da4a1d13d84fb883..7b33c6704db7a365bb81a1ab5bef31f220378380 100644 --- a/protocol/grpc/grpc_invoker.go +++ b/protocol/grpc/grpc_invoker.go @@ -21,7 +21,6 @@ import ( "context" "reflect" "sync" - "time" ) import ( @@ -85,11 +84,10 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio return &result } - gi.AddInvokerTimes(1) - defer gi.AddInvokerTimes(-1) + gi.clientGuard.RLock() + defer gi.clientGuard.RUnlock() - client := gi.getClient() - if client == nil { + if gi.client == nil { result.Err = protocol.ErrClientClosed return &result } @@ -111,7 +109,7 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio in = append(in, invocation.ParameterValues()...) methodName := invocation.MethodName() - method := client.invoker.MethodByName(methodName) + method := gi.client.invoker.MethodByName(methodName) res := method.Call(in) result.Rest = res[0] @@ -148,26 +146,11 @@ func (gi *GrpcInvoker) IsDestroyed() bool { // Destroy will destroy gRPC's invoker and client, so it is only called once func (gi *GrpcInvoker) Destroy() { gi.quitOnce.Do(func() { - gi.BaseInvoker.Stop() - var times int64 - for { - times = gi.BaseInvoker.InvokeTimes() - if times == 0 { - gi.BaseInvoker.AddInvokerTimes(-1) - logger.Infof("grpcInvoker is destroyed, url:{%s}", gi.GetUrl().Key()) - gi.BaseInvoker.Destroy() - client := gi.getClient() - if client != nil { - gi.setClient(nil) - client.Close() - } - break - } else if times < 0 { - logger.Infof("impossible log: grpcInvoker has destroyed, url:{%s}", gi.GetUrl().Key()) - break - } - logger.Warnf("GrpcInvoker is to be destroyed, wait {%v} req end, url:{%s}", times, gi.GetUrl().Key()) - time.Sleep(1 * time.Second) + gi.BaseInvoker.Destroy() + client := gi.getClient() + if client != nil { + gi.setClient(nil) + client.Close() } }) } diff --git a/protocol/invoker.go b/protocol/invoker.go index 632e03a940516e8ea3213a0b58b3b20636278801..e799991686a954a3f8d84ee38b58c2be35c50789 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -58,8 +58,6 @@ type BaseInvoker struct { url *common.URL available uatomic.Bool destroyed uatomic.Bool - // Used to record the number of requests. -1 represent this invoker is destroyed - ivkNum uatomic.Int64 } // NewBaseInvoker creates a new BaseInvoker @@ -69,7 +67,6 @@ func NewBaseInvoker(url *common.URL) *BaseInvoker { } ivk.available.Store(true) ivk.destroyed.Store(false) - ivk.ivkNum.Store(0) return ivk } @@ -89,27 +86,11 @@ func (bi *BaseInvoker) IsDestroyed() bool { return bi.destroyed.Load() } -// InvokeTimes atomically loads the wrapped value and return the invoke times. -func (bi *BaseInvoker) InvokeTimes() int64 { - return bi.ivkNum.Load() -} - -// AddInvokerTimes atomically adds to the wrapped int64 and returns the new value. -func (bi *BaseInvoker) AddInvokerTimes(num int64) int64 { - return bi.ivkNum.Add(num) -} - // Invoke provides default invoker implement func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result { return &RPCResult{} } -// Stop changes available flag -func (bi *BaseInvoker) Stop() { - logger.Infof("Stop invoker: %s", bi.GetUrl()) - bi.available.Store(false) -} - // Destroy changes available and destroyed flag func (bi *BaseInvoker) Destroy() { logger.Infof("Destroy invoker: %s", bi.GetUrl()) diff --git a/protocol/invoker_test.go b/protocol/invoker_test.go index c3fdcc4936eb0e0e08cb5408d5ddcd0e692278b7..c2a587dc99a1be883de7431feb7e25d552ebce2b 100644 --- a/protocol/invoker_test.go +++ b/protocol/invoker_test.go @@ -37,14 +37,6 @@ func TestBaseInvoker(t *testing.T) { assert.NotNil(t, ivk.GetUrl()) assert.True(t, ivk.IsAvailable()) assert.False(t, ivk.IsDestroyed()) - assert.Zero(t, ivk.InvokeTimes()) - - ivk.AddInvokerTimes(1) - assert.True(t, ivk.InvokeTimes() == 1) - - ivk.Stop() - assert.False(t, ivk.IsAvailable()) - assert.False(t, ivk.IsDestroyed()) ivk.Destroy() assert.False(t, ivk.IsAvailable()) diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index de68e74906ecf14330d12bfb0ae42e86648ff9e3..e9a4bd317ef5482ff98b4f1a9d3920d5b86aad42 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -172,6 +172,11 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) // subscribe from registry func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error { + role, _ := strconv.Atoi(nr.URL.GetParam(constant.ROLE_KEY, "")) + if role != common.CONSUMER { + return nil + } + for { if !nr.IsAvailable() { logger.Warnf("event listener game over.")