Skip to content
Snippets Groups Projects
Unverified Commit 7d52dd11 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #1045 from apache/feature/dubbo_invoker_reqnum

Imp: destroy invoker smoothly
parents d72fcd4d d84ece59
No related branches found
No related tags found
No related merge requests found
......@@ -22,13 +22,11 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
import (
"github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)
import (
......@@ -41,28 +39,20 @@ import (
"github.com/apache/dubbo-go/remoting"
)
var (
// ErrNoReply
ErrNoReply = perrors.New("request need @response")
// ErrDestroyedInvoker
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)
var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY,
constant.VERSION_KEY}
)
// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refer to one service and ip.
// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refers to one service and ip.
type DubboInvoker struct {
protocol.BaseInvoker
// the exchange layer, it is focus on network communication.
client *remoting.ExchangeClient
quitOnce sync.Once
clientGuard *sync.RWMutex
client *remoting.ExchangeClient
quitOnce sync.Once
// timeout for service(interface) level.
timeout time.Duration
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}
// NewDubboInvoker constructor
......@@ -73,12 +63,28 @@ func NewDubboInvoker(url *common.URL, client *remoting.ExchangeClient) *DubboInv
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
}
return &DubboInvoker{
di := &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
clientGuard: &sync.RWMutex{},
client: client,
reqNum: 0,
timeout: requestTimeout,
}
return di
}
func (di *DubboInvoker) setClient(client *remoting.ExchangeClient) {
di.clientGuard.Lock()
defer di.clientGuard.Unlock()
di.client = client
}
func (di *DubboInvoker) getClient() *remoting.ExchangeClient {
di.clientGuard.RLock()
defer di.clientGuard.RUnlock()
return di.client
}
// Invoke call remoting.
......@@ -87,15 +93,30 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
err error
result protocol.RPCResult
)
if atomic.LoadInt64(&di.reqNum) < 0 {
if !di.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = ErrDestroyedInvoker
result.Err = protocol.ErrDestroyedInvoker
return &result
}
di.clientGuard.RLock()
defer di.clientGuard.RUnlock()
if di.client == nil {
result.Err = protocol.ErrClientClosed
logger.Debugf("result.Err: %v", result.Err)
return &result
}
if !di.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroying")
result.Err = protocol.ErrDestroyedInvoker
return &result
}
atomic.AddInt64(&(di.reqNum), 1)
defer atomic.AddInt64(&(di.reqNum), -1)
inv := invocation.(*invocation_impl.RPCInvocation)
// init param
......@@ -125,14 +146,13 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
timeout := di.getTimeout(inv)
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
//result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest)
} else {
result.Err = di.client.Send(&invocation, url, timeout)
}
} else {
if inv.Reply() == nil {
result.Err = ErrNoReply
result.Err = protocol.ErrNoReply
} else {
result.Err = di.client.Request(&invocation, url, timeout, rest)
}
......@@ -162,27 +182,23 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti
}
func (di *DubboInvoker) IsAvailable() bool {
return di.client.IsAvailable()
client := di.getClient()
if client != nil {
return client.IsAvailable()
}
return false
}
// Destroy destroy dubbo client invoker.
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
for {
if di.reqNum == 0 {
di.reqNum = -1
logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close()
di.client = nil
}
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
time.Sleep(1 * time.Second)
di.BaseInvoker.Destroy()
client := di.getClient()
if client != nil {
di.setClient(nil)
client.Close()
}
})
}
......
......@@ -24,40 +24,82 @@ import (
)
import (
hessian2 "github.com/apache/dubbo-go-hessian2"
"github.com/pkg/errors"
"google.golang.org/grpc/connectivity"
)
import (
hessian2 "github.com/apache/dubbo-go-hessian2"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
var errNoReply = errors.New("request need @response")
var (
errNoReply = errors.New("request need @response")
)
// nolint
type GrpcInvoker struct {
protocol.BaseInvoker
quitOnce sync.Once
client *Client
quitOnce sync.Once
clientGuard *sync.RWMutex
client *Client
}
// NewGrpcInvoker returns a Grpc invoker instance
func NewGrpcInvoker(url *common.URL, client *Client) *GrpcInvoker {
return &GrpcInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
clientGuard: &sync.RWMutex{},
client: client,
}
}
func (gi *GrpcInvoker) setClient(client *Client) {
gi.clientGuard.Lock()
defer gi.clientGuard.Unlock()
gi.client = client
}
func (gi *GrpcInvoker) getClient() *Client {
gi.clientGuard.RLock()
defer gi.clientGuard.RUnlock()
return gi.client
}
// Invoke is used to call service method by invocation
func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.RPCResult
)
if !gi.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this grpcInvoker is destroyed")
result.Err = protocol.ErrDestroyedInvoker
return &result
}
gi.clientGuard.RLock()
defer gi.clientGuard.RUnlock()
if gi.client == nil {
result.Err = protocol.ErrClientClosed
return &result
}
if !gi.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this grpcInvoker is destroying")
result.Err = protocol.ErrDestroyedInvoker
return &result
}
if invocation.Reply() == nil {
result.Err = errNoReply
}
......@@ -83,21 +125,32 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio
// IsAvailable get available status
func (gi *GrpcInvoker) IsAvailable() bool {
return gi.BaseInvoker.IsAvailable() && gi.client.GetState() != connectivity.Shutdown
client := gi.getClient()
if client != nil {
return gi.BaseInvoker.IsAvailable() && client.GetState() != connectivity.Shutdown
}
return false
}
// IsDestroyed get destroyed status
func (gi *GrpcInvoker) IsDestroyed() bool {
return gi.BaseInvoker.IsDestroyed() && gi.client.GetState() == connectivity.Shutdown
client := gi.getClient()
if client != nil {
return gi.BaseInvoker.IsDestroyed() && client.GetState() == connectivity.Shutdown
}
return false
}
// Destroy will destroy gRPC's invoker and client, so it is only called once
func (gi *GrpcInvoker) Destroy() {
gi.quitOnce.Do(func() {
gi.BaseInvoker.Destroy()
if gi.client != nil {
_ = gi.client.Close()
client := gi.getClient()
if client != nil {
gi.setClient(nil)
client.Close()
}
})
}
......@@ -21,11 +21,25 @@ import (
"context"
)
import (
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
var (
// ErrClientClosed means client has clossed.
ErrClientClosed = perrors.New("remoting client has closed")
// ErrNoReply
ErrNoReply = perrors.New("request need @response")
// ErrDestroyedInvoker
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)
// Invoker the service invocation interface for the consumer
//go:generate mockgen -source invoker.go -destination mock/mock_invoker.go -self_package github.com/apache/dubbo-go/protocol/mock --package mock Invoker
// Extension - Invoker
......@@ -42,17 +56,19 @@ type Invoker interface {
// BaseInvoker provides default invoker implement
type BaseInvoker struct {
url *common.URL
available bool
destroyed bool
available uatomic.Bool
destroyed uatomic.Bool
}
// NewBaseInvoker creates a new BaseInvoker
func NewBaseInvoker(url *common.URL) *BaseInvoker {
return &BaseInvoker{
url: url,
available: true,
destroyed: false,
ivk := &BaseInvoker{
url: url,
}
ivk.available.Store(true)
ivk.destroyed.Store(false)
return ivk
}
// GetUrl gets base invoker URL
......@@ -62,12 +78,12 @@ func (bi *BaseInvoker) GetUrl() *common.URL {
// IsAvailable gets available flag
func (bi *BaseInvoker) IsAvailable() bool {
return bi.available
return bi.available.Load()
}
// IsDestroyed gets destroyed flag
func (bi *BaseInvoker) IsDestroyed() bool {
return bi.destroyed
return bi.destroyed.Load()
}
// Invoke provides default invoker implement
......@@ -77,7 +93,7 @@ func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Re
// Destroy changes available and destroyed flag
func (bi *BaseInvoker) Destroy() {
logger.Infof("Destroy invoker: %s", bi.GetUrl().String())
bi.destroyed = true
bi.available = false
logger.Infof("Destroy invoker: %s", bi.GetUrl())
bi.destroyed.Store(true)
bi.available.Store(false)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package protocol
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
)
func TestBaseInvoker(t *testing.T) {
url, err := common.NewURL("dubbo://localhost:9090")
assert.Nil(t, err)
ivk := NewBaseInvoker(url)
assert.NotNil(t, ivk.GetUrl())
assert.True(t, ivk.IsAvailable())
assert.False(t, ivk.IsDestroyed())
ivk.Destroy()
assert.False(t, ivk.IsAvailable())
assert.True(t, ivk.IsDestroyed())
}
......@@ -115,14 +115,14 @@ func (nl *nacosListener) Callback(services []model.SubscribeService, err error)
logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam)
return
}
nl.cacheLock.Lock()
defer nl.cacheLock.Unlock()
addInstances := make([]model.Instance, 0, len(services))
delInstances := make([]model.Instance, 0, len(services))
updateInstances := make([]model.Instance, 0, len(services))
newInstanceMap := make(map[string]model.Instance, len(services))
nl.cacheLock.Lock()
defer nl.cacheLock.Unlock()
for i := range services {
if !services[i].Enable || !services[i].Valid {
// instance is not available,so ignore it
......
......@@ -172,6 +172,11 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error)
// subscribe from registry
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
role, _ := strconv.Atoi(nr.URL.GetParam(constant.ROLE_KEY, ""))
if role != common.CONSUMER {
return nil
}
for {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment