diff --git a/README.md b/README.md index 3afeb1df8dda6abe5ea8baf49d1ff1e79bc2e9ea..547ee0054ff5e76d500bccb0526e137476bbf227 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Apache Dubbo-go [涓枃](./README_CN.md) # -[](https://travis-ci.org/apache/dubbo-go) +[](https://travis-ci.org/apache/dubbo-go) [](https://codecov.io/gh/apache/dubbo-go) [](https://pkg.go.dev/github.com/apache/dubbo-go?tab=doc) [](https://goreportcard.com/report/github.com/apache/dubbo-go) diff --git a/README_CN.md b/README_CN.md index 9f3b13cd53dcb075298c5651d0b520e0eabba7e0..43f242374b620ef470dee9274cad741503a6e67a 100644 --- a/README_CN.md +++ b/README_CN.md @@ -1,6 +1,6 @@ # Apache Dubbo-go [English](./README.md) # -[](https://travis-ci.org/apache/dubbo-go) +[](https://travis-ci.org/apache/dubbo-go) [](https://codecov.io/gh/apache/dubbo-go) [](https://pkg.go.dev/github.com/apache/dubbo-go?tab=doc) [](https://goreportcard.com/report/github.com/apache/dubbo-go) diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 8134a9024d1154604e49ae4df2654dc17c3912d4..ee1fc44992e556a1d1f7d9024ae9031e198b2e29 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -22,13 +22,13 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" ) import ( "github.com/opentracing/opentracing-go" perrors "github.com/pkg/errors" + uatomic "go.uber.org/atomic" ) import ( @@ -53,7 +53,7 @@ var ( constant.VERSION_KEY} ) -// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refer to one service and ip. +// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refers to one service and ip. type DubboInvoker struct { protocol.BaseInvoker // the exchange layer, it is focus on network communication. @@ -62,7 +62,8 @@ type DubboInvoker struct { // timeout for service(interface) level. timeout time.Duration // Used to record the number of requests. -1 represent this DubboInvoker is destroyed - reqNum int64 + reqNum uatomic.Int64 + closed uatomic.Bool } // NewDubboInvoker constructor @@ -73,12 +74,15 @@ func NewDubboInvoker(url *common.URL, client *remoting.ExchangeClient) *DubboInv if t, err := time.ParseDuration(requestTimeoutStr); err == nil { requestTimeout = t } - return &DubboInvoker{ + di := &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), client: client, - reqNum: 0, timeout: requestTimeout, } + di.reqNum.Store(0) + di.closed.Store(false) + + return di } // Invoke call remoting. @@ -87,15 +91,15 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati err error result protocol.RPCResult ) - if atomic.LoadInt64(&di.reqNum) < 0 { + if di.closed.Load() { // Generally, the case will not happen, because the invoker has been removed // from the invoker list before destroy,so no new request will enter the destroyed invoker logger.Warnf("this dubboInvoker is destroyed") result.Err = ErrDestroyedInvoker return &result } - atomic.AddInt64(&(di.reqNum), 1) - defer atomic.AddInt64(&(di.reqNum), -1) + di.reqNum.Add(1) + defer di.reqNum.Add(-1) inv := invocation.(*invocation_impl.RPCInvocation) // init param @@ -168,9 +172,10 @@ func (di *DubboInvoker) IsAvailable() bool { // Destroy destroy dubbo client invoker. func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { + di.closed.Store(true) for { - if di.reqNum == 0 { - di.reqNum = -1 + if di.reqNum.Load() == 0 { + di.reqNum.Add(-1) logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key()) di.BaseInvoker.Destroy() if di.client != nil { @@ -179,10 +184,9 @@ func (di *DubboInvoker) Destroy() { } break } - logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key()) + logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum.Load(), di.GetUrl().Key()) time.Sleep(1 * time.Second) } - }) }