diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index ee1fc44992e556a1d1f7d9024ae9031e198b2e29..82b95406397b4b1a52b4b8bd684ca7b5e3cece4d 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -27,8 +27,6 @@ import ( import ( "github.com/opentracing/opentracing-go" - perrors "github.com/pkg/errors" - uatomic "go.uber.org/atomic" ) import ( @@ -41,13 +39,6 @@ import ( "github.com/apache/dubbo-go/remoting" ) -var ( - // ErrNoReply - ErrNoReply = perrors.New("request need @response") - // ErrDestroyedInvoker - ErrDestroyedInvoker = perrors.New("request Destroyed invoker") -) - var ( attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY, constant.VERSION_KEY} @@ -57,13 +48,11 @@ var ( type DubboInvoker struct { protocol.BaseInvoker // the exchange layer, it is focus on network communication. - client *remoting.ExchangeClient - quitOnce sync.Once + clientGuard *sync.RWMutex + client *remoting.ExchangeClient + quitOnce sync.Once // timeout for service(interface) level. timeout time.Duration - // Used to record the number of requests. -1 represent this DubboInvoker is destroyed - reqNum uatomic.Int64 - closed uatomic.Bool } // NewDubboInvoker constructor @@ -76,30 +65,52 @@ func NewDubboInvoker(url *common.URL, client *remoting.ExchangeClient) *DubboInv } di := &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), + clientGuard: &sync.RWMutex{}, client: client, timeout: requestTimeout, } - di.reqNum.Store(0) - di.closed.Store(false) return di } +func (di *DubboInvoker) setClient(client *remoting.ExchangeClient) { + di.clientGuard.Lock() + defer di.clientGuard.Unlock() + + di.client = client +} + +func (di *DubboInvoker) getClient() *remoting.ExchangeClient { + di.clientGuard.RLock() + defer di.clientGuard.RUnlock() + + return di.client +} + // Invoke call remoting. func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( err error result protocol.RPCResult ) - if di.closed.Load() { + if !di.BaseInvoker.IsAvailable() { // Generally, the case will not happen, because the invoker has been removed // from the invoker list before destroy,so no new request will enter the destroyed invoker logger.Warnf("this dubboInvoker is destroyed") - result.Err = ErrDestroyedInvoker + result.Err = protocol.ErrDestroyedInvoker + return &result + } + + di.AddInvokerTimes(1) + defer di.AddInvokerTimes(-1) + + if !di.BaseInvoker.IsAvailable() { + // Generally, the case will not happen, because the invoker has been removed + // from the invoker list before destroy,so no new request will enter the destroyed invoker + logger.Warnf("this dubboInvoker is destroying") + result.Err = protocol.ErrDestroyedInvoker return &result } - di.reqNum.Add(1) - defer di.reqNum.Add(-1) inv := invocation.(*invocation_impl.RPCInvocation) // init param @@ -127,18 +138,22 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati //response := NewResponse(inv.Reply(), nil) rest := &protocol.RPCResult{} timeout := di.getTimeout(inv) - if async { - if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { - //result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) - result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest) - } else { - result.Err = di.client.Send(&invocation, url, timeout) - } + client := di.getClient() + if client == nil { + result.Err = protocol.ErrClientClosed } else { - if inv.Reply() == nil { - result.Err = ErrNoReply + if async { + if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { + result.Err = client.AsyncRequest(&invocation, url, timeout, callBack, rest) + } else { + result.Err = client.Send(&invocation, url, timeout) + } } else { - result.Err = di.client.Request(&invocation, url, timeout, rest) + if inv.Reply() == nil { + result.Err = protocol.ErrNoReply + } else { + result.Err = client.Request(&invocation, url, timeout, rest) + } } } if result.Err == nil { @@ -166,25 +181,36 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti } func (di *DubboInvoker) IsAvailable() bool { - return di.client.IsAvailable() + client := di.getClient() + if client != nil { + return client.IsAvailable() + } + + return false } // Destroy destroy dubbo client invoker. func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { - di.closed.Store(true) + di.BaseInvoker.Stop() + var times int64 for { - if di.reqNum.Load() == 0 { - di.reqNum.Add(-1) - logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key()) + times = di.BaseInvoker.InvokeTimes() + if times == 0 { + di.BaseInvoker.AddInvokerTimes(-1) + logger.Infof("dubboInvoker is destroyed, url:{%s}", di.GetUrl().Key()) di.BaseInvoker.Destroy() - if di.client != nil { - di.client.Close() - di.client = nil + client := di.getClient() + if client != nil { + di.setClient(nil) + client.Close() } break + } else if times < 0 { + logger.Infof("impossible log: dubboInvoker has destroyed, url:{%s}", di.GetUrl().Key()) + break } - logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum.Load(), di.GetUrl().Key()) + logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", times, di.GetUrl().Key()) time.Sleep(1 * time.Second) } }) diff --git a/protocol/grpc/grpc_invoker.go b/protocol/grpc/grpc_invoker.go index 9647ecb9c9498c78d9df49a2223a905fda8c5c6e..4a4365caa70b42dcbefa8384da4a1d13d84fb883 100644 --- a/protocol/grpc/grpc_invoker.go +++ b/protocol/grpc/grpc_invoker.go @@ -21,43 +21,87 @@ import ( "context" "reflect" "sync" + "time" ) import ( + hessian2 "github.com/apache/dubbo-go-hessian2" "github.com/pkg/errors" "google.golang.org/grpc/connectivity" ) import ( - hessian2 "github.com/apache/dubbo-go-hessian2" - "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" ) -var errNoReply = errors.New("request need @response") +var ( + errNoReply = errors.New("request need @response") +) // nolint type GrpcInvoker struct { protocol.BaseInvoker - quitOnce sync.Once - client *Client + quitOnce sync.Once + clientGuard *sync.RWMutex + client *Client } // NewGrpcInvoker returns a Grpc invoker instance func NewGrpcInvoker(url *common.URL, client *Client) *GrpcInvoker { return &GrpcInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), + clientGuard: &sync.RWMutex{}, client: client, } } +func (gi *GrpcInvoker) setClient(client *Client) { + gi.clientGuard.Lock() + defer gi.clientGuard.Unlock() + + gi.client = client +} + +func (gi *GrpcInvoker) getClient() *Client { + gi.clientGuard.RLock() + defer gi.clientGuard.RUnlock() + + return gi.client +} + // Invoke is used to call service method by invocation func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( result protocol.RPCResult ) + if !gi.BaseInvoker.IsAvailable() { + // Generally, the case will not happen, because the invoker has been removed + // from the invoker list before destroy,so no new request will enter the destroyed invoker + logger.Warnf("this grpcInvoker is destroyed") + result.Err = protocol.ErrDestroyedInvoker + return &result + } + + gi.AddInvokerTimes(1) + defer gi.AddInvokerTimes(-1) + + client := gi.getClient() + if client == nil { + result.Err = protocol.ErrClientClosed + return &result + } + + if !gi.BaseInvoker.IsAvailable() { + // Generally, the case will not happen, because the invoker has been removed + // from the invoker list before destroy,so no new request will enter the destroyed invoker + logger.Warnf("this grpcInvoker is destroying") + result.Err = protocol.ErrDestroyedInvoker + return &result + } + if invocation.Reply() == nil { result.Err = errNoReply } @@ -67,7 +111,7 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio in = append(in, invocation.ParameterValues()...) methodName := invocation.MethodName() - method := gi.client.invoker.MethodByName(methodName) + method := client.invoker.MethodByName(methodName) res := method.Call(in) result.Rest = res[0] @@ -83,21 +127,47 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio // IsAvailable get available status func (gi *GrpcInvoker) IsAvailable() bool { - return gi.BaseInvoker.IsAvailable() && gi.client.GetState() != connectivity.Shutdown + client := gi.getClient() + if client != nil { + return gi.BaseInvoker.IsAvailable() && client.GetState() != connectivity.Shutdown + } + + return false } // IsDestroyed get destroyed status func (gi *GrpcInvoker) IsDestroyed() bool { - return gi.BaseInvoker.IsDestroyed() && gi.client.GetState() == connectivity.Shutdown + client := gi.getClient() + if client != nil { + return gi.BaseInvoker.IsDestroyed() && client.GetState() == connectivity.Shutdown + } + + return false } // Destroy will destroy gRPC's invoker and client, so it is only called once func (gi *GrpcInvoker) Destroy() { gi.quitOnce.Do(func() { - gi.BaseInvoker.Destroy() - - if gi.client != nil { - _ = gi.client.Close() + gi.BaseInvoker.Stop() + var times int64 + for { + times = gi.BaseInvoker.InvokeTimes() + if times == 0 { + gi.BaseInvoker.AddInvokerTimes(-1) + logger.Infof("grpcInvoker is destroyed, url:{%s}", gi.GetUrl().Key()) + gi.BaseInvoker.Destroy() + client := gi.getClient() + if client != nil { + gi.setClient(nil) + client.Close() + } + break + } else if times < 0 { + logger.Infof("impossible log: grpcInvoker has destroyed, url:{%s}", gi.GetUrl().Key()) + break + } + logger.Warnf("GrpcInvoker is to be destroyed, wait {%v} req end, url:{%s}", times, gi.GetUrl().Key()) + time.Sleep(1 * time.Second) } }) } diff --git a/protocol/invoker.go b/protocol/invoker.go index 5657b6b5b2c6d13e001bec2ba3ca7b4f02fc7e60..632e03a940516e8ea3213a0b58b3b20636278801 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -21,11 +21,25 @@ import ( "context" ) +import ( + perrors "github.com/pkg/errors" + uatomic "go.uber.org/atomic" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/logger" ) +var ( + // ErrClientClosed means client has clossed. + ErrClientClosed = perrors.New("remoting client has closed") + // ErrNoReply + ErrNoReply = perrors.New("request need @response") + // ErrDestroyedInvoker + ErrDestroyedInvoker = perrors.New("request Destroyed invoker") +) + // Invoker the service invocation interface for the consumer //go:generate mockgen -source invoker.go -destination mock/mock_invoker.go -self_package github.com/apache/dubbo-go/protocol/mock --package mock Invoker // Extension - Invoker @@ -42,17 +56,22 @@ type Invoker interface { // BaseInvoker provides default invoker implement type BaseInvoker struct { url *common.URL - available bool - destroyed bool + available uatomic.Bool + destroyed uatomic.Bool + // Used to record the number of requests. -1 represent this invoker is destroyed + ivkNum uatomic.Int64 } // NewBaseInvoker creates a new BaseInvoker func NewBaseInvoker(url *common.URL) *BaseInvoker { - return &BaseInvoker{ - url: url, - available: true, - destroyed: false, + ivk := &BaseInvoker{ + url: url, } + ivk.available.Store(true) + ivk.destroyed.Store(false) + ivk.ivkNum.Store(0) + + return ivk } // GetUrl gets base invoker URL @@ -62,12 +81,22 @@ func (bi *BaseInvoker) GetUrl() *common.URL { // IsAvailable gets available flag func (bi *BaseInvoker) IsAvailable() bool { - return bi.available + return bi.available.Load() } // IsDestroyed gets destroyed flag func (bi *BaseInvoker) IsDestroyed() bool { - return bi.destroyed + return bi.destroyed.Load() +} + +// InvokeTimes atomically loads the wrapped value and return the invoke times. +func (bi *BaseInvoker) InvokeTimes() int64 { + return bi.ivkNum.Load() +} + +// AddInvokerTimes atomically adds to the wrapped int64 and returns the new value. +func (bi *BaseInvoker) AddInvokerTimes(num int64) int64 { + return bi.ivkNum.Add(num) } // Invoke provides default invoker implement @@ -75,9 +104,15 @@ func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Re return &RPCResult{} } +// Stop changes available flag +func (bi *BaseInvoker) Stop() { + logger.Infof("Stop invoker: %s", bi.GetUrl()) + bi.available.Store(false) +} + // Destroy changes available and destroyed flag func (bi *BaseInvoker) Destroy() { - logger.Infof("Destroy invoker: %s", bi.GetUrl().String()) - bi.destroyed = true - bi.available = false + logger.Infof("Destroy invoker: %s", bi.GetUrl()) + bi.destroyed.Store(true) + bi.available.Store(false) } diff --git a/protocol/invoker_test.go b/protocol/invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c3fdcc4936eb0e0e08cb5408d5ddcd0e692278b7 --- /dev/null +++ b/protocol/invoker_test.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package protocol + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" +) + +func TestBaseInvoker(t *testing.T) { + url, err := common.NewURL("dubbo://localhost:9090") + assert.Nil(t, err) + + ivk := NewBaseInvoker(url) + assert.NotNil(t, ivk.GetUrl()) + assert.True(t, ivk.IsAvailable()) + assert.False(t, ivk.IsDestroyed()) + assert.Zero(t, ivk.InvokeTimes()) + + ivk.AddInvokerTimes(1) + assert.True(t, ivk.InvokeTimes() == 1) + + ivk.Stop() + assert.False(t, ivk.IsAvailable()) + assert.False(t, ivk.IsDestroyed()) + + ivk.Destroy() + assert.False(t, ivk.IsAvailable()) + assert.True(t, ivk.IsDestroyed()) +} diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index 6783bf4684c09808326f04ee8ffb7c993045c896..d2b574648fa2fea5664fdf35b457aa99f14e323f 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -115,14 +115,14 @@ func (nl *nacosListener) Callback(services []model.SubscribeService, err error) logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam) return } - nl.cacheLock.Lock() - defer nl.cacheLock.Unlock() + addInstances := make([]model.Instance, 0, len(services)) delInstances := make([]model.Instance, 0, len(services)) updateInstances := make([]model.Instance, 0, len(services)) - newInstanceMap := make(map[string]model.Instance, len(services)) + nl.cacheLock.Lock() + defer nl.cacheLock.Unlock() for i := range services { if !services[i].Enable || !services[i].Valid { // instance is not available,so ignore it