diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index 4a4f689a1b424c3d34cd27943a35dca37c3b2003..c833ec75d932587684353a86bdca5539148482c8 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -1,4 +1,4 @@ -package cluster +package cluster_impl import ( gxnet "github.com/AlexStocks/goext/net" @@ -69,7 +69,7 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p if len(invokers) == 1 { return invokers[0] } - selectedInvoker := lb.Select(invokers, invoker.GetUrl(), invocation) + selectedInvoker := lb.Select(invokers, invocation) //judge to if the selectedInvoker is invoked @@ -88,7 +88,7 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p } if len(reslectInvokers) > 0 { - return lb.Select(reslectInvokers, invoker.GetUrl(), invocation) + return lb.Select(reslectInvokers, invocation) } else { return nil } diff --git a/cluster/cluster_impl/failover_cluster.go b/cluster/cluster_impl/failover_cluster.go index 1f427019de35e04df76d494fb7587e4ad3b8ece0..d8bab9fd8c861d95a56a6ca94a92f368db3ae13b 100644 --- a/cluster/cluster_impl/failover_cluster.go +++ b/cluster/cluster_impl/failover_cluster.go @@ -1,4 +1,4 @@ -package cluster +package cluster_impl import ( "github.com/dubbo/go-for-apache-dubbo/cluster" @@ -12,10 +12,10 @@ type failoverCluster struct { const name = "failover" func init() { - extension.SetCluster(name, newFailoverCluster) + extension.SetCluster(name, NewFailoverCluster) } -func newFailoverCluster() cluster.Cluster { +func NewFailoverCluster() cluster.Cluster { return &failoverCluster{} } diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go index 00b14296e53486c23a28d6764b62658a317ccb3a..66101a1f0515ce62aca80a645fbba326aa2e9aa6 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster_impl/failover_cluster_invoker.go @@ -1,4 +1,4 @@ -package cluster +package cluster_impl import ( gxnet "github.com/AlexStocks/goext/net" @@ -37,7 +37,7 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) //Get the service method loadbalance config if have - if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE); v != "" { + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { lb = v } loadbalance := extension.GetLoadbalance(lb) @@ -46,7 +46,7 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) //Get the service method loadbalance config if have - if v := url.GetMethodParamInt(methodName, constant.RETRIES_KEY, constant.DEFAULT_RETRIES); v != 0 { + if v := url.GetMethodParamInt(methodName, constant.RETRIES_KEY, 0); v != 0 { retries = v } invoked := []protocol.Invoker{} diff --git a/cluster/cluster_impl/failover_cluster_test.go b/cluster/cluster_impl/failover_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..26b3c0ae8947961e4766da07156e31c9985ec5bc --- /dev/null +++ b/cluster/cluster_impl/failover_cluster_test.go @@ -0,0 +1,155 @@ +package cluster_impl + +import ( + "context" + "errors" + "fmt" + "net/url" + "testing" +) +import ( + log "github.com/AlexStocks/log4go" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/dubbo/go-for-apache-dubbo/cluster/directory" + "github.com/dubbo/go-for-apache-dubbo/cluster/loadbalance" + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/common/extension" + "github.com/dubbo/go-for-apache-dubbo/protocol" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" +) + +///////////////////////////// +// mock invoker +///////////////////////////// + +type MockInvoker struct { + url common.URL + available bool + destroyed bool + + successCount int +} + +func NewMockInvoker(url common.URL, successCount int) *MockInvoker { + return &MockInvoker{ + url: url, + available: true, + destroyed: false, + successCount: successCount, + } +} + +func (bi *MockInvoker) GetUrl() common.URL { + return bi.url +} + +func (bi *MockInvoker) IsAvailable() bool { + return bi.available +} + +func (bi *MockInvoker) IsDestroyed() bool { + return bi.destroyed +} + +type rest struct { + tried int + success bool +} + +func (bi *MockInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + count++ + var success bool + var err error = nil + if count >= bi.successCount { + success = true + } else { + err = errors.New("error") + } + result := &protocol.RPCResult{Err: err, Rest: rest{tried: count, success: success}} + + return result +} + +func (bi *MockInvoker) Destroy() { + log.Info("Destroy invoker: %v", bi.GetUrl().String()) + bi.destroyed = true + bi.available = false +} + +var count int + +func normalInvoke(t *testing.T, successCount int, urlParam url.Values, invocations ...*invocation.RPCInvocation) protocol.Result { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failoverCluster := NewFailoverCluster() + + invokers := []protocol.Invoker{} + for i := 0; i < 10; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i), common.WithParams(urlParam)) + invokers = append(invokers, NewMockInvoker(url, successCount)) + } + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failoverCluster.Join(staticDir) + if len(invocations) > 0 { + return clusterInvoker.Invoke(invocations[0]) + } + return clusterInvoker.Invoke(&invocation.RPCInvocation{}) +} +func Test_FailoverInvokeSuccess(t *testing.T) { + urlParams := url.Values{} + result := normalInvoke(t, 2, urlParams) + assert.NoError(t, result.Error()) + count = 0 +} + +func Test_FailoverInvokeFail(t *testing.T) { + urlParams := url.Values{} + result := normalInvoke(t, 3, urlParams) + assert.Errorf(t, result.Error(), "error") + count = 0 +} + +func Test_FailoverInvoke1(t *testing.T) { + urlParams := url.Values{} + urlParams.Set(constant.RETRIES_KEY, "3") + result := normalInvoke(t, 3, urlParams) + assert.NoError(t, result.Error()) + count = 0 +} + +func Test_FailoverInvoke2(t *testing.T) { + urlParams := url.Values{} + urlParams.Set(constant.RETRIES_KEY, "2") + urlParams.Set("methods.test."+constant.RETRIES_KEY, "3") + + ivc := &invocation.RPCInvocation{} + ivc.SetMethod("test") + result := normalInvoke(t, 3, urlParams, ivc) + assert.NoError(t, result.Error()) + count = 0 +} + +func Test_FailoverDestroy(t *testing.T) { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failoverCluster := NewFailoverCluster() + + invokers := []protocol.Invoker{} + for i := 0; i < 10; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invokers = append(invokers, NewMockInvoker(url, 1)) + } + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failoverCluster.Join(staticDir) + assert.Equal(t, true, clusterInvoker.IsAvailable()) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.NoError(t, result.Error()) + count = 0 + clusterInvoker.Destroy() + assert.Equal(t, false, clusterInvoker.IsAvailable()) + +} diff --git a/cluster/cluster_impl/mock_cluster.go b/cluster/cluster_impl/mock_cluster.go index d046bd321547c52b4293211bb592423d751604b3..07963e5222e28ad8633489312f3e6f91a9bf9da6 100644 --- a/cluster/cluster_impl/mock_cluster.go +++ b/cluster/cluster_impl/mock_cluster.go @@ -1,4 +1,4 @@ -package cluster +package cluster_impl import ( "github.com/dubbo/go-for-apache-dubbo/cluster" diff --git a/cluster/cluster_impl/registry_aware_cluster.go b/cluster/cluster_impl/registry_aware_cluster.go index ec89b3bfbfa5fa68906ff08a9ba823a8dd493b3c..f61ba6da328b6d25d0906b684d26db317f1f1bc4 100644 --- a/cluster/cluster_impl/registry_aware_cluster.go +++ b/cluster/cluster_impl/registry_aware_cluster.go @@ -1,4 +1,4 @@ -package cluster +package cluster_impl import ( "github.com/dubbo/go-for-apache-dubbo/cluster" @@ -10,10 +10,10 @@ type registryAwareCluster struct { } func init() { - extension.SetCluster("registryAware", newRegistryAwareCluster) + extension.SetCluster("registryAware", NewRegistryAwareCluster) } -func newRegistryAwareCluster() cluster.Cluster { +func NewRegistryAwareCluster() cluster.Cluster { return ®istryAwareCluster{} } diff --git a/cluster/cluster_impl/registry_aware_cluster_invoker.go b/cluster/cluster_impl/registry_aware_cluster_invoker.go index 2ceef25aeca7568578ea32e90f9014e611cde6e1..5e5e9f350461799fc685a9c755295f57b0a95350 100644 --- a/cluster/cluster_impl/registry_aware_cluster_invoker.go +++ b/cluster/cluster_impl/registry_aware_cluster_invoker.go @@ -1,4 +1,4 @@ -package cluster +package cluster_impl import ( "github.com/dubbo/go-for-apache-dubbo/cluster" diff --git a/cluster/cluster_impl/registry_aware_cluster_test.go b/cluster/cluster_impl/registry_aware_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7e4b07cb079f485542ef7c8eb1953f33f0a21873 --- /dev/null +++ b/cluster/cluster_impl/registry_aware_cluster_test.go @@ -0,0 +1,54 @@ +package cluster_impl + +import ( + "context" + "fmt" + "testing" +) +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/dubbo/go-for-apache-dubbo/cluster/directory" + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" +) + +func Test_RegAwareInvokeSuccess(t *testing.T) { + + regAwareCluster := NewRegistryAwareCluster() + + invokers := []protocol.Invoker{} + for i := 0; i < 10; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invokers = append(invokers, NewMockInvoker(url, 1)) + } + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := regAwareCluster.Join(staticDir) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.NoError(t, result.Error()) + count = 0 +} + +func TestDestroy(t *testing.T) { + regAwareCluster := NewRegistryAwareCluster() + + invokers := []protocol.Invoker{} + for i := 0; i < 10; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invokers = append(invokers, NewMockInvoker(url, 1)) + } + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := regAwareCluster.Join(staticDir) + assert.Equal(t, true, clusterInvoker.IsAvailable()) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.NoError(t, result.Error()) + count = 0 + clusterInvoker.Destroy() + assert.Equal(t, false, clusterInvoker.IsAvailable()) + +} diff --git a/cluster/directory/static_directory.go b/cluster/directory/static_directory.go index cedc019ac60f71fa7b5eb96ac26e0c643650633d..1d092b05df420e313146344c7b993982f72e226e 100644 --- a/cluster/directory/static_directory.go +++ b/cluster/directory/static_directory.go @@ -19,6 +19,9 @@ func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory { //for-loop invokers ,if all invokers is available ,then it means directory is available func (dir *staticDirectory) IsAvailable() bool { + if len(dir.invokers) == 0 { + return false + } for _, invoker := range dir.invokers { if !invoker.IsAvailable() { return false diff --git a/cluster/directory/static_directory_test.go b/cluster/directory/static_directory_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a71b88315db4f496aa3817fbb6b665e17ef8b785 --- /dev/null +++ b/cluster/directory/static_directory_test.go @@ -0,0 +1,37 @@ +package directory + +import ( + "context" + "fmt" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" + "github.com/stretchr/testify/assert" + "testing" +) +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +func Test_StaticDirList(t *testing.T) { + invokers := []protocol.Invoker{} + for i := 0; i < 10; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + } + + staticDir := NewStaticDirectory(invokers) + assert.Len(t, staticDir.List(&invocation.RPCInvocation{}), 10) +} + +func Test_StaticDirDestroy(t *testing.T) { + invokers := []protocol.Invoker{} + for i := 0; i < 10; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + } + + staticDir := NewStaticDirectory(invokers) + assert.Equal(t, true, staticDir.IsAvailable()) + staticDir.Destroy() + assert.Equal(t, false, staticDir.IsAvailable()) +} diff --git a/cluster/loadbalance.go b/cluster/loadbalance.go index 880bcc9be20d9e2e75a3b5ad00e1316f0066c10c..4df6088100bf1957ecdeb649b6b973b86f8a09ec 100644 --- a/cluster/loadbalance.go +++ b/cluster/loadbalance.go @@ -1,37 +1,12 @@ package cluster import ( - "time" -) - -import ( - "github.com/dubbo/go-for-apache-dubbo/common" - "github.com/dubbo/go-for-apache-dubbo/common/constant" "github.com/dubbo/go-for-apache-dubbo/protocol" ) // Extension - LoadBalance type LoadBalance interface { - Select([]protocol.Invoker, common.URL, protocol.Invocation) protocol.Invoker + Select([]protocol.Invoker, protocol.Invocation) protocol.Invoker } -func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 { - url := invoker.GetUrl() - weight := url.GetMethodParamInt(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) - if weight > 0 { - //get service register time an do warm up time - now := time.Now().Unix() - timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now) - if uptime := now - timestamp; uptime > 0 { - warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP) - if uptime < warmup { - if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 { - weight = 1 - } else if int64(ww) <= weight { - weight = int64(ww) - } - } - } - } - return weight -} + diff --git a/cluster/loadbalance/random.go b/cluster/loadbalance/random.go index a86b36e4356b0a10232ea6403b881155a329545e..556c503dde3224e572f9949e0d0e5bae2aefa97d 100644 --- a/cluster/loadbalance/random.go +++ b/cluster/loadbalance/random.go @@ -6,7 +6,6 @@ import ( import ( "github.com/dubbo/go-for-apache-dubbo/cluster" - "github.com/dubbo/go-for-apache-dubbo/common" "github.com/dubbo/go-for-apache-dubbo/common/extension" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -14,17 +13,17 @@ import ( const name = "random" func init() { - extension.SetLoadbalance(name, newRandomLoadBalance) + extension.SetLoadbalance(name, NewRandomLoadBalance) } type randomLoadBalance struct { } -func newRandomLoadBalance() cluster.LoadBalance { +func NewRandomLoadBalance() cluster.LoadBalance { return &randomLoadBalance{} } -func (lb *randomLoadBalance) Select(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) protocol.Invoker { +func (lb *randomLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { var length int if length = len(invokers); length == 1 { return invokers[0] @@ -32,12 +31,12 @@ func (lb *randomLoadBalance) Select(invokers []protocol.Invoker, url common.URL, sameWeight := true weights := make([]int64, length) - firstWeight := cluster.GetWeight(invokers[0], invocation) + firstWeight :=GetWeight(invokers[0], invocation) totalWeight := firstWeight weights[0] = firstWeight for i := 1; i < length; i++ { - weight := cluster.GetWeight(invokers[i], invocation) + weight := GetWeight(invokers[i], invocation) weights[i] = weight totalWeight += weight diff --git a/cluster/loadbalance/random_test.go b/cluster/loadbalance/random_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4c20010f8ad0b88f6c83008f8fe540f8cb889d89 --- /dev/null +++ b/cluster/loadbalance/random_test.go @@ -0,0 +1,90 @@ +package loadbalance + +import ( + "context" + "fmt" + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/protocol/invocation" + "github.com/stretchr/testify/assert" + "net/url" + "strconv" + "testing" + "time" +) +import ( + "github.com/dubbo/go-for-apache-dubbo/common" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +func Test_RandomlbSelect(t *testing.T) { + randomlb := NewRandomLoadBalance() + + invokers := []protocol.Invoker{} + for i := 0; i < 10; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + } + randomlb.Select(invokers, &invocation.RPCInvocation{}) +} + +func Test_RandomlbSelectWeight(t *testing.T) { + randomlb := NewRandomLoadBalance() + + invokers := []protocol.Invoker{} + for i := 0; i < 10; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + } + + urlParams := url.Values{} + urlParams.Set("methods.test."+constant.WEIGHT_KEY, "10000000000000") + urll, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"), common.WithParams(urlParams)) + invokers = append(invokers, protocol.NewBaseInvoker(urll)) + ivc := &invocation.RPCInvocation{} + ivc.SetMethod("test") + + var selectedInvoker []protocol.Invoker + var selected float64 + for i := 0; i < 10000; i++ { + s := randomlb.Select(invokers, ivc) + if s.GetUrl().Ip == "192.168.1.100" { + selected++ + } + selectedInvoker = append(selectedInvoker, s) + } + + assert.Condition(t, func() bool { + //really is 0.9999999999999 + return selected/10000 > 0.9 + }) +} + +func Test_RandomlbSelectWarmup(t *testing.T) { + randomlb := NewRandomLoadBalance() + + invokers := []protocol.Invoker{} + for i := 0; i < 10; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + } + + urlParams := url.Values{} + urlParams.Set(constant.REMOTE_TIMESTAMP_KEY, strconv.FormatInt(time.Now().Add(time.Minute*(-9)).Unix(), 10)) + urll, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"), common.WithParams(urlParams)) + invokers = append(invokers, protocol.NewBaseInvoker(urll)) + ivc := &invocation.RPCInvocation{} + ivc.SetMethod("test") + + var selectedInvoker []protocol.Invoker + var selected float64 + for i := 0; i < 10000; i++ { + s := randomlb.Select(invokers, ivc) + if s.GetUrl().Ip == "192.168.1.100" { + selected++ + } + selectedInvoker = append(selectedInvoker, s) + } + assert.Condition(t, func() bool { + return selected/10000 < 0.1 + }) +} diff --git a/cluster/loadbalance/util.go b/cluster/loadbalance/util.go new file mode 100644 index 0000000000000000000000000000000000000000..c2bff75b1d6fd5282f3721b32bfa749fd4ee84ac --- /dev/null +++ b/cluster/loadbalance/util.go @@ -0,0 +1,30 @@ +package loadbalance + +import ( + "time" +) +import ( + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/protocol" +) + +func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 { + url := invoker.GetUrl() + weight := url.GetMethodParamInt(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) + if weight > 0 { + //get service register time an do warm up time + now := time.Now().Unix() + timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now) + if uptime := now - timestamp; uptime > 0 { + warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP) + if uptime < warmup { + if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 { + weight = 1 + } else if int64(ww) <= weight { + weight = int64(ww) + } + } + } + } + return weight +} diff --git a/common/constant/default.go b/common/constant/default.go index 5871e3cec8ec8dd1c17cb79f002fd9c386d15ecb..7d12a12b9324deb3fce99c6838e1d223ffe5ef57 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -1,8 +1,8 @@ package constant const ( - DEFAULT_WEIGHT = 100 // - DEFAULT_WARMUP = 10 * 60 * 1000 + DEFAULT_WEIGHT = 100 // + DEFAULT_WARMUP = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second ) const ( diff --git a/go.mod b/go.mod index b7ab0cafd6a1a83c0a6a1c8bbcd5aa97071ae005..f794d4c3a109ee431feb3354438366bdda420f36 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/dubbogo/hessian2 v0.0.0-20190410112310-f093e4436e31 github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 github.com/montanaflynn/stats v0.5.0 + github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/stretchr/testify v1.3.0 github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index e6034d2c5678d1445098b1dc93ae9bd58e3acc0d..31a060a9abf3907de3778bf99d19ba3dba8f8847 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -94,10 +94,14 @@ func (r *RPCInvocation) Invoker() protocol.Invoker { return r.invoker } -func (r *RPCInvocation) SetInvoker() protocol.Invoker { - return r.invoker -} +//func (r *RPCInvocation) SetInvoker() protocol.Invoker { +// return r.invoker +//} func (r *RPCInvocation) CallBack() interface{} { return r.callBack } + +func (r *RPCInvocation) SetMethod(method string) { + r.methodName = method +} diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 9d9e2b94ca39289a526ba587456cb6b7ae89375b..1f35462c8949e8ac87a93178267044f61779307f 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -45,7 +45,7 @@ func TestSubscribe_InvalidUrl(t *testing.T) { func TestSubscribe_Group(t *testing.T) { extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) - extension.SetCluster("mock", cluster.NewMockCluster) + extension.SetCluster("mock", cluster_impl.NewMockCluster) regurl, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") suburl, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000")