Skip to content
Snippets Groups Projects
Commit cc484faa authored by flycash's avatar flycash
Browse files

Context support

parent 655ac881
No related branches found
No related tags found
No related merge requests found
Showing
with 64 additions and 48 deletions
......@@ -18,6 +18,7 @@ limitations under the License.
package cluster_impl
import (
"context"
"fmt"
)
......@@ -40,7 +41,7 @@ func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
func (invoker *availableClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *availableClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
......@@ -54,7 +55,7 @@ func (invoker *availableClusterInvoker) Invoke(invocation protocol.Invocation) p
for _, ivk := range invokers {
if ivk.IsAvailable() {
return ivk.Invoke(invocation)
return ivk.Invoke(ctx, invocation)
}
}
return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))}
......
......@@ -66,7 +66,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) {
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}
......@@ -80,7 +80,7 @@ func TestAvailableClusterInvokerNoAvail(t *testing.T) {
invoker.EXPECT().IsAvailable().Return(false)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.NotNil(t, result.Error())
assert.True(t, strings.Contains(result.Error().Error(), "no provider available"))
......
......@@ -18,6 +18,8 @@ limitations under the License.
package cluster_impl
import (
"context"
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
......@@ -33,7 +35,7 @@ func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *broadcastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
......@@ -46,7 +48,7 @@ func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) p
var result protocol.Result
for _, ivk := range invokers {
result = ivk.Invoke(invocation)
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
logger.Warnf("broadcast invoker invoke err: %v when use invoker: %v\n", result.Error(), ivk)
err = result.Error()
......
......@@ -74,7 +74,7 @@ func Test_BroadcastInvokeSuccess(t *testing.T) {
clusterInvoker := registerBroadcast(t, invokers...)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}
......@@ -104,6 +104,6 @@ func Test_BroadcastInvokeFailed(t *testing.T) {
clusterInvoker := registerBroadcast(t, invokers...)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Equal(t, mockFailedResult.Err, result.Error())
}
......@@ -18,6 +18,7 @@
package cluster_impl
import (
"context"
"strconv"
"sync"
"time"
......@@ -71,7 +72,7 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
return invoker
}
func (invoker *failbackClusterInvoker) process() {
func (invoker *failbackClusterInvoker) process(ctx context.Context) {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
// check each timeout task and re-run
......@@ -102,7 +103,7 @@ func (invoker *failbackClusterInvoker) process() {
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
......@@ -126,7 +127,7 @@ func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err
}
}
func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
......@@ -150,11 +151,11 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
//DO INVOKE
result = ivk.Invoke(invocation)
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
invoker.once.Do(func() {
invoker.taskList = queue.New(invoker.failbackTasks)
go invoker.process()
go invoker.process(ctx)
})
taskLen := invoker.taskList.Len()
......
......@@ -72,7 +72,7 @@ func Test_FailbackSuceess(t *testing.T) {
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}
......@@ -102,7 +102,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
return mockSuccResult
})
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
......@@ -150,7 +150,7 @@ func Test_FailbackRetryFailed(t *testing.T) {
}
// first call should failed.
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
......@@ -192,7 +192,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
}).Times(10)
for i := 0; i < 10; i++ {
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
......@@ -222,14 +222,14 @@ func Test_FailbackOutOfLimit(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11)
// reached limit
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
// all will be out of limit
for i := 0; i < 10; i++ {
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))
......
......@@ -18,6 +18,8 @@ limitations under the License.
package cluster_impl
import (
"context"
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/protocol"
)
......@@ -32,7 +34,7 @@ func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *failfastClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
......@@ -47,5 +49,5 @@ func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) pr
}
ivk := invoker.doSelect(loadbalance, invocation, invokers, nil)
return ivk.Invoke(invocation)
return ivk.Invoke(ctx, invocation)
}
......@@ -69,7 +69,7 @@ func Test_FailfastInvokeSuccess(t *testing.T) {
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
res := result.Result().(rest)
......@@ -89,7 +89,7 @@ func Test_FailfastInvokeFail(t *testing.T) {
mockResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.NotNil(t, result.Error())
assert.Equal(t, "error", result.Error().Error())
......
......@@ -18,6 +18,7 @@
package cluster_impl
import (
"context"
"strconv"
)
......@@ -43,7 +44,7 @@ func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
......@@ -95,7 +96,7 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
}
invoked = append(invoked, ivk)
//DO INVOKE
result = ivk.Invoke(invocation)
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key())
continue
......
......@@ -77,7 +77,7 @@ type rest struct {
success bool
}
func (bi *MockInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (bi *MockInvoker) Invoke(c context.Context, invocation protocol.Invocation) protocol.Result {
count++
var success bool
var err error = nil
......@@ -112,9 +112,9 @@ func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocatio
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := failoverCluster.Join(staticDir)
if len(invocations) > 0 {
return clusterInvoker.Invoke(invocations[0])
return clusterInvoker.Invoke(context.TODO(), invocations[0])
}
return clusterInvoker.Invoke(&invocation.RPCInvocation{})
return clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
}
func Test_FailoverInvokeSuccess(t *testing.T) {
urlParams := url.Values{}
......@@ -162,7 +162,7 @@ func Test_FailoverDestroy(t *testing.T) {
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := failoverCluster.Join(staticDir)
assert.Equal(t, true, clusterInvoker.IsAvailable())
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
clusterInvoker.Destroy()
......
......@@ -18,6 +18,8 @@
package cluster_impl
import (
"context"
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
......@@ -42,7 +44,7 @@ func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *failsafeClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
......@@ -65,7 +67,7 @@ func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) pr
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
//DO INVOKE
result = ivk.Invoke(invocation)
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
// ignore
logger.Errorf("Failsafe ignore exception: %v.\n", result.Error().Error())
......
......@@ -69,7 +69,7 @@ func Test_FailSafeInvokeSuccess(t *testing.T) {
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
res := result.Result().(rest)
......@@ -88,7 +88,7 @@ func Test_FailSafeInvokeFail(t *testing.T) {
mockResult := &protocol.RPCResult{Err: perrors.New("error")}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
assert.Nil(t, result.Result())
......
......@@ -18,6 +18,7 @@ limitations under the License.
package cluster_impl
import (
"context"
"errors"
"fmt"
"time"
......@@ -44,7 +45,7 @@ func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
}
}
func (invoker *forkingClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
err := invoker.checkWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
......@@ -75,7 +76,7 @@ func (invoker *forkingClusterInvoker) Invoke(invocation protocol.Invocation) pro
resultQ := queue.New(1)
for _, ivk := range selected {
go func(k protocol.Invoker) {
result := k.Invoke(invocation)
result := k.Invoke(ctx, invocation)
err := resultQ.Put(result)
if err != nil {
logger.Errorf("resultQ put failed with exception: %v.\n", err)
......
......@@ -87,7 +87,7 @@ func Test_ForkingInvokeSuccess(t *testing.T) {
clusterInvoker := registerForking(t, invokers...)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
wg.Wait()
}
......@@ -117,7 +117,7 @@ func Test_ForkingInvokeTimeout(t *testing.T) {
clusterInvoker := registerForking(t, invokers...)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.NotNil(t, result)
assert.NotNil(t, result.Error())
wg.Wait()
......@@ -156,7 +156,7 @@ func Test_ForkingInvokeHalfTimeout(t *testing.T) {
clusterInvoker := registerForking(t, invokers...)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
wg.Wait()
}
......@@ -18,6 +18,8 @@
package cluster_impl
import (
"context"
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
......@@ -33,19 +35,19 @@ func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoke
}
}
func (invoker *registryAwareClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (invoker *registryAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
//First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key.
for _, invoker := range invokers {
if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" {
return invoker.Invoke(invocation)
return invoker.Invoke(ctx, invocation)
}
}
//If none of the invokers has a local signal, pick the first one available.
for _, invoker := range invokers {
if invoker.IsAvailable() {
return invoker.Invoke(invocation)
return invoker.Invoke(ctx, invocation)
}
}
return nil
......
......@@ -45,7 +45,7 @@ func Test_RegAwareInvokeSuccess(t *testing.T) {
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := regAwareCluster.Join(staticDir)
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
}
......@@ -62,7 +62,7 @@ func TestDestroy(t *testing.T) {
staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := regAwareCluster.Join(staticDir)
assert.Equal(t, true, clusterInvoker.IsAvailable())
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
result := clusterInvoker.Invoke(context.TODO(), &invocation.RPCInvocation{})
assert.NoError(t, result.Error())
count = 0
clusterInvoker.Destroy()
......
......@@ -93,7 +93,7 @@ type rest struct {
var count int
func (bi *MockInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (bi *MockInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
count++
var success bool
var err error = nil
......
......@@ -18,6 +18,7 @@
package proxy
import (
"context"
"reflect"
"sync"
)
......@@ -129,7 +130,7 @@ func (p *Proxy) Implement(v common.RPCService) {
inv.SetAttachments(k, value)
}
result := p.invoke.Invoke(inv)
result := p.invoke.Invoke(context.TODO(), inv)
err = result.Error()
logger.Infof("[makeDubboCallProxy] result: %v, err: %v", result.Result(), err)
......
......@@ -18,6 +18,7 @@
package proxy_factory
import (
"context"
"reflect"
"strings"
)
......@@ -75,7 +76,7 @@ type ProxyInvoker struct {
protocol.BaseInvoker
}
func (pi *ProxyInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
func (pi *ProxyInvoker) Invoke(context context.Context, invocation protocol.Invocation) protocol.Result {
result := &protocol.RPCResult{}
result.SetAttachments(invocation.Attachments())
......
......@@ -18,11 +18,13 @@
package filter
import (
"context"
"github.com/apache/dubbo-go/protocol"
)
// Extension - Filter
type Filter interface {
Invoke(protocol.Invoker, protocol.Invocation) protocol.Result
OnResponse(protocol.Result, protocol.Invoker, protocol.Invocation) protocol.Result
Invoke(context.Context, protocol.Invoker, protocol.Invocation) protocol.Result
OnResponse(context.Context, protocol.Result, protocol.Invoker, protocol.Invocation) protocol.Result
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment