Skip to content
Snippets Groups Projects
Commit 2db05a39 authored by pantianying's avatar pantianying
Browse files

add code

parent 4093e770
No related branches found
No related tags found
No related merge requests found
...@@ -21,6 +21,8 @@ import ( ...@@ -21,6 +21,8 @@ import (
"context" "context"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time"
) )
import ( import (
...@@ -47,10 +49,12 @@ var ( ...@@ -47,10 +49,12 @@ var (
// DubboInvoker ... // DubboInvoker ...
type DubboInvoker struct { type DubboInvoker struct {
sync.RWMutex
protocol.BaseInvoker protocol.BaseInvoker
client *Client client *Client
quitOnce sync.Once quitOnce sync.Once
// Used to record the number of requests
reqNum int64
} }
// NewDubboInvoker ... // NewDubboInvoker ...
...@@ -58,13 +62,16 @@ func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { ...@@ -58,13 +62,16 @@ func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
return &DubboInvoker{ return &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url), BaseInvoker: *protocol.NewBaseInvoker(url),
client: client, client: client,
reqNum: 0,
} }
} }
// Invoke ... // Invoke ...
func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
di.RLock()
defer di.RUnlock() atomic.AddInt64(&(di.reqNum), 1)
defer atomic.AddInt64(&(di.reqNum), -1)
var ( var (
err error err error
result protocol.RPCResult result protocol.RPCResult
...@@ -112,16 +119,21 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati ...@@ -112,16 +119,21 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
// Destroy ... // Destroy ...
func (di *DubboInvoker) Destroy() { func (di *DubboInvoker) Destroy() {
di.Lock()
defer di.Unlock()
di.quitOnce.Do(func() { di.quitOnce.Do(func() {
di.BaseInvoker.Destroy() for {
if di.reqNum == 0 {
if di.client != nil { logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
di.client.Close() di.BaseInvoker.Destroy()
di.client = nil if di.client != nil {
di.client.Close()
di.client = nil
}
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
time.Sleep(1 * time.Second)
} }
}) })
} }
......
...@@ -109,7 +109,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { ...@@ -109,7 +109,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
} }
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var url *common.URL var (
url *common.URL
oldInvoker protocol.Invoker = nil
)
//judge is override or others //judge is override or others
if res != nil { if res != nil {
url = &res.Service url = &res.Service
...@@ -126,10 +129,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { ...@@ -126,10 +129,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action { switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate: case remoting.EventTypeAdd, remoting.EventTypeUpdate:
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL) //dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(url) oldInvoker = dir.cacheInvoker(url)
case remoting.EventTypeDel: case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL) //dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(url) oldInvoker = dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service) logger.Infof("selector delete service url{%s}", res.Service)
default: default:
return return
...@@ -138,8 +141,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { ...@@ -138,8 +141,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
newInvokers := dir.toGroupInvokers() newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock() dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers dir.cacheInvokers = newInvokers
dir.listenerLock.Unlock()
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
if oldInvoker != nil {
oldInvoker.Destroy()
}
} }
func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
...@@ -177,12 +186,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { ...@@ -177,12 +186,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList return groupInvokersList
} }
func (dir *registryDirectory) uncacheInvoker(url *common.URL) { // uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
dir.cacheInvokersMap.Delete(url.Key()) if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
return cacheInvoker.(protocol.Invoker)
}
return nil
} }
func (dir *registryDirectory) cacheInvoker(url *common.URL) { // cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl()) dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL referenceUrl := dir.GetDirectoryUrl().SubURL
...@@ -193,7 +208,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { ...@@ -193,7 +208,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
} }
if url == nil { if url == nil {
logger.Error("URL is nil ,pls check if service url is subscribe successfully!") logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
return return nil
} }
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
...@@ -210,10 +225,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { ...@@ -210,10 +225,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil { if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
cacheInvoker.(protocol.Invoker).Destroy() return cacheInvoker.(protocol.Invoker)
} }
} }
} }
return nil
} }
//select the protocol invokers from the directory //select the protocol invokers from the directory
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment