Skip to content
Snippets Groups Projects
Commit 95065e54 authored by Ming Deng's avatar Ming Deng Committed by fangyincheng
Browse files

Merge pull request #358 from pantianying/addRlockForDubboInvoker

Fix:deal the panic when invoker destroy
parent 0b9ba3f8
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,8 @@ package dubbo ...@@ -20,6 +20,8 @@ package dubbo
import ( import (
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time"
) )
import ( import (
...@@ -34,7 +36,11 @@ import ( ...@@ -34,7 +36,11 @@ import (
invocation_impl "github.com/apache/dubbo-go/protocol/invocation" invocation_impl "github.com/apache/dubbo-go/protocol/invocation"
) )
var Err_No_Reply = perrors.New("request need @response") var (
// ErrNoReply ...
ErrNoReply = perrors.New("request need @response")
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)
var ( var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY} attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY}
...@@ -44,12 +50,15 @@ type DubboInvoker struct { ...@@ -44,12 +50,15 @@ type DubboInvoker struct {
protocol.BaseInvoker protocol.BaseInvoker
client *Client client *Client
quitOnce sync.Once quitOnce sync.Once
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
} }
func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker {
return &DubboInvoker{ return &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url), BaseInvoker: *protocol.NewBaseInvoker(url),
client: client, client: client,
reqNum: 0,
} }
} }
...@@ -59,6 +68,15 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { ...@@ -59,6 +68,15 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
err error err error
result protocol.RPCResult result protocol.RPCResult
) )
if di.reqNum < 0 {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = ErrDestroyedInvoker
return &result
}
atomic.AddInt64(&(di.reqNum), 1)
defer atomic.AddInt64(&(di.reqNum), -1)
inv := invocation.(*invocation_impl.RPCInvocation) inv := invocation.(*invocation_impl.RPCInvocation)
for _, k := range attachmentKey { for _, k := range attachmentKey {
...@@ -82,7 +100,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { ...@@ -82,7 +100,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
} }
} else { } else {
if inv.Reply() == nil { if inv.Reply() == nil {
result.Err = Err_No_Reply result.Err = ErrNoReply
} else { } else {
result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response) result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response)
} }
...@@ -98,10 +116,20 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { ...@@ -98,10 +116,20 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (di *DubboInvoker) Destroy() { func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() { di.quitOnce.Do(func() {
di.BaseInvoker.Destroy() for {
if di.reqNum == 0 {
if di.client != nil { di.reqNum = -1
di.client.Close() logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close()
di.client = nil
}
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
time.Sleep(1 * time.Second)
} }
}) })
} }
...@@ -106,7 +106,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { ...@@ -106,7 +106,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
} }
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var url *common.URL var (
url *common.URL
oldInvoker protocol.Invoker = nil
)
//judge is override or others //judge is override or others
if res != nil { if res != nil {
url = &res.Service url = &res.Service
...@@ -123,10 +126,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { ...@@ -123,10 +126,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action { switch res.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate: case remoting.EventTypeAdd, remoting.EventTypeUpdate:
//dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL) //dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL)
dir.cacheInvoker(url) oldInvoker = dir.cacheInvoker(url)
case remoting.EventTypeDel: case remoting.EventTypeDel:
//dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL) //dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL)
dir.uncacheInvoker(url) oldInvoker = dir.uncacheInvoker(url)
logger.Infof("selector delete service url{%s}", res.Service) logger.Infof("selector delete service url{%s}", res.Service)
default: default:
return return
...@@ -135,8 +138,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { ...@@ -135,8 +138,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
newInvokers := dir.toGroupInvokers() newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock() dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers dir.cacheInvokers = newInvokers
dir.listenerLock.Unlock()
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
if oldInvoker != nil {
oldInvoker.Destroy()
}
} }
func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
...@@ -174,12 +183,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { ...@@ -174,12 +183,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList return groupInvokersList
} }
func (dir *registryDirectory) uncacheInvoker(url *common.URL) { // uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
dir.cacheInvokersMap.Delete(url.Key()) if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
return cacheInvoker.(protocol.Invoker)
}
return nil
} }
func (dir *registryDirectory) cacheInvoker(url *common.URL) { // cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl()) dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL referenceUrl := dir.GetDirectoryUrl().SubURL
...@@ -190,7 +205,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { ...@@ -190,7 +205,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
} }
if url == nil { if url == nil {
logger.Error("URL is nil ,pls check if service url is subscribe successfully!") logger.Error("URL is nil ,pls check if service url is subscribe successfully!")
return return nil
} }
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
...@@ -207,10 +222,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { ...@@ -207,10 +222,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil { if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
cacheInvoker.(protocol.Invoker).Destroy() return cacheInvoker.(protocol.Invoker)
} }
} }
} }
return nil
} }
//select the protocol invokers from the directory //select the protocol invokers from the directory
...@@ -235,10 +251,11 @@ func (dir *registryDirectory) IsAvailable() bool { ...@@ -235,10 +251,11 @@ func (dir *registryDirectory) IsAvailable() bool {
func (dir *registryDirectory) Destroy() { func (dir *registryDirectory) Destroy() {
//TODO:unregister & unsubscribe //TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() { dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.cacheInvokers { invokers := dir.cacheInvokers
dir.cacheInvokers = []protocol.Invoker{}
for _, ivk := range invokers {
ivk.Destroy() ivk.Destroy()
} }
dir.cacheInvokers = []protocol.Invoker{}
}) })
} }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment