From 2db05a3992a7157430979ff0115e41cc4e7c57af Mon Sep 17 00:00:00 2001 From: pantianying <601666418@qq.com> Date: Sat, 15 Feb 2020 11:11:56 +0800 Subject: [PATCH] add code --- protocol/dubbo/dubbo_invoker.go | 34 ++++++++++++++++++++++----------- registry/directory/directory.go | 34 ++++++++++++++++++++++++--------- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index a843f437d..fcd0d9fde 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -21,6 +21,8 @@ import ( "context" "strconv" "sync" + "sync/atomic" + "time" ) import ( @@ -47,10 +49,12 @@ var ( // DubboInvoker ... type DubboInvoker struct { - sync.RWMutex protocol.BaseInvoker client *Client quitOnce sync.Once + + // Used to record the number of requests + reqNum int64 } // NewDubboInvoker ... @@ -58,13 +62,16 @@ func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { return &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), client: client, + reqNum: 0, } } // Invoke ... func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - di.RLock() - defer di.RUnlock() + + atomic.AddInt64(&(di.reqNum), 1) + defer atomic.AddInt64(&(di.reqNum), -1) + var ( err error result protocol.RPCResult @@ -112,16 +119,21 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati // Destroy ... func (di *DubboInvoker) Destroy() { - di.Lock() - defer di.Unlock() - di.quitOnce.Do(func() { - di.BaseInvoker.Destroy() - - if di.client != nil { - di.client.Close() - di.client = nil + for { + if di.reqNum == 0 { + logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key()) + di.BaseInvoker.Destroy() + if di.client != nil { + di.client.Close() + di.client = nil + } + break + } + logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key()) + time.Sleep(1 * time.Second) } + }) } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index f9670af7e..268825360 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -109,7 +109,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { } func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { - var url *common.URL + var ( + url *common.URL + oldInvoker protocol.Invoker = nil + ) //judge is override or others if res != nil { url = &res.Service @@ -126,10 +129,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { switch res.Action { case remoting.EventTypeAdd, remoting.EventTypeUpdate: //dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL) - dir.cacheInvoker(url) + oldInvoker = dir.cacheInvoker(url) case remoting.EventTypeDel: //dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL) - dir.uncacheInvoker(url) + oldInvoker = dir.uncacheInvoker(url) logger.Infof("selector delete service url{%s}", res.Service) default: return @@ -138,8 +141,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { newInvokers := dir.toGroupInvokers() dir.listenerLock.Lock() - defer dir.listenerLock.Unlock() dir.cacheInvokers = newInvokers + dir.listenerLock.Unlock() + // After dir.cacheInvokers is updated,destroy the oldInvoker + // Ensure that no request will enter the oldInvoker + if oldInvoker != nil { + oldInvoker.Destroy() + } + } func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { @@ -177,12 +186,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { return groupInvokersList } -func (dir *registryDirectory) uncacheInvoker(url *common.URL) { +// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil +func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) - dir.cacheInvokersMap.Delete(url.Key()) + if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok { + dir.cacheInvokersMap.Delete(url.Key()) + return cacheInvoker.(protocol.Invoker) + } + return nil } -func (dir *registryDirectory) cacheInvoker(url *common.URL) { +// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil +func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { dir.overrideUrl(dir.GetDirectoryUrl()) referenceUrl := dir.GetDirectoryUrl().SubURL @@ -193,7 +208,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { } if url == nil { logger.Error("URL is nil ,pls check if service url is subscribe successfully!") - return + return nil } //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { @@ -210,10 +225,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) if newInvoker != nil { dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) - cacheInvoker.(protocol.Invoker).Destroy() + return cacheInvoker.(protocol.Invoker) } } } + return nil } //select the protocol invokers from the directory -- GitLab