diff --git a/.gitignore b/.gitignore index 3d70f85a59b1367114bf04a7286fbb020d6825a4..3769d10aa12348464cb315fd336bf6b7234069cc 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ vendor/ logs/ .vscode/ +coverage.txt diff --git a/README.md b/README.md index b64ce4973d6df5efcb560058b857d455e6371a57..593be70222b69f5aa62ac0b781a0a3be6b0e1ff5 100644 --- a/README.md +++ b/README.md @@ -31,15 +31,16 @@ Finished List: - Codec: JsonRPC v2, Hessian v2 - Registry: ZooKeeper - Configure Center: Zookeeper -- Cluster Strategy: Failover -- Load Balance: Random, RoundRobin, LeastActive +- Cluster Strategy: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/Available/Broadcast +- Load Balance: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65) - Filter: Echo Health Check +- Other feature: [generic invoke](https://github.com/apache/dubbo-go/pull/122)/start check/connecting certain provider/multi-protocols/multi-registries/multi-versions/service group Working List: -- Cluster Strategy: Failfast/Failsafe/Failback/Forking +- Cluster Strategy: Forking - Load Balance: ConsistentHash -- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter +- Filter: TokenFilter/AccessLogFilter/CountFilter/ExecuteLimitFilter/TpsLimitFilter - Registry: etcd/k8s/consul/nacos - Configure Center: apollo - Dynamic Configuration Center & Metadata Center (dubbo v2.7.x) @@ -55,7 +56,7 @@ You can know more about dubbo-go by its [roadmap](https://github.com/apache/dubb ## Document -Move [here](https://dubbogo.github.io/dubbo-go-website/) +TODO ## Quick Start diff --git a/README_CN.md b/README_CN.md index 0553527534506f21eb9bd154c034b8345298f93c..7732a655adfbaa2a2a963201d6580767758c4fce 100644 --- a/README_CN.md +++ b/README_CN.md @@ -26,29 +26,29 @@ Apache License, Version 2.0 瀹炵幇鍒楄〃: -- Role: Consumer, Provider -- Transport: HTTP, TCP -- Codec: JsonRPC v2, Hessian v2 -- Registry: ZooKeeper -- Routing: Rule(dubbo v2.6.x) -- Configure Center: Zookeeper -- Cluster Strategy: Failover -- Load Balance: Random, RoundRobin, LeastActive -- Filter: Echo Health Check +- 瑙掕壊绔�: Consumer, Provider +- 浼犺緭鍗忚: HTTP, TCP +- 搴忓垪鍖栧崗璁�: JsonRPC v2, Hessian v2 +- 娉ㄥ唽涓績: ZooKeeper +- 閰嶇疆涓績: Zookeeper +- 闆嗙兢绛栫暐: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/Available/Broadcast +- 璐熻浇鍧囪 绛栫暐: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65) +- 杩囨护鍣�: Echo Health Check +- 鍏朵粬鍔熻兘鏀寔: [娉涘寲璋冪敤](https://github.com/apache/dubbo-go/pull/122)/鍚姩鏃舵鏌�/鏈嶅姟鐩磋繛/澶氭湇鍔″崗璁�/澶氭敞鍐屼腑蹇�/澶氭湇鍔$増鏈�/鏈嶅姟鍒嗙粍 寮€鍙戜腑鍒楄〃: -- Cluster Strategy: Failfast/Failsafe/Failback/Forking -- Load Balance: ConsistentHash -- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter -- Registry: etcd/k8s/consul/nacos -- Configure Center: apollo -- Dynamic Configuration Center & Metadata Center (dubbo v2.7.x) +- 闆嗙兢绛栫暐: Forking +- 璐熻浇鍧囪 绛栫暐: ConsistentHash +- 杩囨护鍣�: TokenFilter/AccessLogFilter/CountFilter/ExecuteLimitFilter/TpsLimitFilter +- 娉ㄥ唽涓績: etcd/k8s/consul/nacos +- 閰嶇疆涓績: apollo +- 鍔ㄦ€侀厤缃腑蹇� & 鍏冩暟鎹腑蹇� (dubbo v2.7.x) - Metrics: Promethus(dubbo v2.7.x) 浠诲姟鍒楄〃: -- Registry: kubernetes +- 娉ㄥ唽涓績: kubernetes - Routing: istio - tracing (dubbo ecosystem) @@ -56,7 +56,7 @@ Apache License, Version 2.0 ## 鏂囨。 -绉绘[杩欓噷](https://dubbogo.github.io/dubbo-go-website/) +TODO ## 蹇€熷紑濮� ## diff --git a/cluster/cluster_impl/available_cluster.go b/cluster/cluster_impl/available_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..7e748cd938319ff437bb3fb6c7945b857d316069 --- /dev/null +++ b/cluster/cluster_impl/available_cluster.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type availableCluster struct{} + +const available = "available" + +func init() { + extension.SetCluster(available, NewAvailableCluster) +} + +func NewAvailableCluster() cluster.Cluster { + return &availableCluster{} +} + +func (cluser *availableCluster) Join(directory cluster.Directory) protocol.Invoker { + return NewAvailableClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/available_cluster_invoker.go b/cluster/cluster_impl/available_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..c59c0702c216fe5c58d190a023322aaa00ac9c17 --- /dev/null +++ b/cluster/cluster_impl/available_cluster_invoker.go @@ -0,0 +1,61 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "fmt" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/protocol" +) + +type availableClusterInvoker struct { + baseClusterInvoker +} + +func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &availableClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *availableClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + + err = invoker.checkWhetherDestroyed() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + for _, ivk := range invokers { + if ivk.IsAvailable() { + return ivk.Invoke(invocation) + } + } + return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))} +} diff --git a/cluster/cluster_impl/available_cluster_invoker_test.go b/cluster/cluster_impl/available_cluster_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..04032a7f24dec0e73acb15921f753921391f1515 --- /dev/null +++ b/cluster/cluster_impl/available_cluster_invoker_test.go @@ -0,0 +1,88 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "strings" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + availableUrl, _ = common.NewURL(context.Background(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +func registerAvailable(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + availableCluster := NewAvailableCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + invoker.EXPECT().GetUrl().Return(availableUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := availableCluster.Join(staticDir) + return clusterInvoker +} + +func TestAvailableClusterInvokerSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerAvailable(t, invoker) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().IsAvailable().Return(true) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.Equal(t, mockResult, result) +} + +func TestAvailableClusterInvokerNoAvail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerAvailable(t, invoker) + + invoker.EXPECT().IsAvailable().Return(false) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NotNil(t, result.Error()) + assert.True(t, strings.Contains(result.Error().Error(), "no provider available")) + assert.Nil(t, result.Result()) +} diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index 6cbed7748e496afb37f9a1e93dbc40e845c62b06..2426258b95e8a2ecf5067b2ed469eeb9a5617f18 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -25,6 +25,8 @@ import ( import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/version" @@ -115,12 +117,24 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p } func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool { - if len(invoked) > 0 { - for _, i := range invoked { - if i == selectedInvoker { - return true - } + for _, i := range invoked { + if i == selectedInvoker { + return true } } return false } + +func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance { + url := invoker.GetUrl() + + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); len(v) > 0 { + lb = v + } + return extension.GetLoadbalance(lb) +} diff --git a/cluster/cluster_impl/broadcast_cluster.go b/cluster/cluster_impl/broadcast_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..50aae3cfab8d67570b50dcab4e53bbfad29d6d30 --- /dev/null +++ b/cluster/cluster_impl/broadcast_cluster.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type broadcastCluster struct{} + +const broadcast = "broadcast" + +func init() { + extension.SetCluster(broadcast, NewBroadcastCluster) +} + +func NewBroadcastCluster() cluster.Cluster { + return &broadcastCluster{} +} + +func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker { + return newBroadcastClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/broadcast_cluster_invoker.go b/cluster/cluster_impl/broadcast_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..238df0acfa7fb946e38bfbfd490bce7c0bb34e60 --- /dev/null +++ b/cluster/cluster_impl/broadcast_cluster_invoker.go @@ -0,0 +1,59 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +type broadcastClusterInvoker struct { + baseClusterInvoker +} + +func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &broadcastClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + err = invoker.checkWhetherDestroyed() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + var result protocol.Result + for _, ivk := range invokers { + result = ivk.Invoke(invocation) + if result.Error() != nil { + logger.Warnf("broadcast invoker invoke err: %v when use invoker: %v\n", result.Error(), ivk) + err = result.Error() + } + } + if err != nil { + return &protocol.RPCResult{Err: err} + } + return result +} diff --git a/cluster/cluster_impl/broadcast_cluster_invoker_test.go b/cluster/cluster_impl/broadcast_cluster_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..565684a8ae25c648ff77aef71d2ced0665202fe7 --- /dev/null +++ b/cluster/cluster_impl/broadcast_cluster_invoker_test.go @@ -0,0 +1,109 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "errors" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + broadcastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + + invokers := []protocol.Invoker{} + for i, ivk := range mockInvokers { + invokers = append(invokers, ivk) + if i == 0 { + ivk.EXPECT().GetUrl().Return(broadcastUrl) + } + } + staticDir := directory.NewStaticDirectory(invokers) + + broadcastCluster := NewBroadcastCluster() + clusterInvoker := broadcastCluster.Join(staticDir) + return clusterInvoker +} + +func Test_BroadcastInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokers := make([]*mock.MockInvoker, 0) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + for i := 0; i < 3; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + } + + clusterInvoker := registerBroadcast(t, invokers...) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockResult, result) +} + +func Test_BroadcastInvokeFailed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokers := make([]*mock.MockInvoker, 0) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockFailedResult := &protocol.RPCResult{Err: errors.New("just failed")} + for i := 0; i < 10; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + } + { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) + } + for i := 0; i < 10; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + } + + clusterInvoker := registerBroadcast(t, invokers...) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockFailedResult.Err, result.Error()) +} diff --git a/cluster/cluster_impl/failback_cluster.go b/cluster/cluster_impl/failback_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..de22c78e947d0b8124add721ab7ff42efebcdbe4 --- /dev/null +++ b/cluster/cluster_impl/failback_cluster.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failbackCluster struct{} + +const failback = "failback" + +func init() { + extension.SetCluster(failback, NewFailbackCluster) +} + +func NewFailbackCluster() cluster.Cluster { + return &failbackCluster{} +} + +func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailbackClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster_impl/failback_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..027461ccb7f32d6383d756ac986295b6300f249c --- /dev/null +++ b/cluster/cluster_impl/failback_cluster_invoker.go @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "sync" + "time" +) + +import ( + "github.com/Workiva/go-datastructures/queue" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +/** + * When fails, record failure requests and schedule for retry on a regular interval. + * Especially useful for services of notification. + * + * <a href="http://en.wikipedia.org/wiki/Failback">Failback</a> + */ +type failbackClusterInvoker struct { + baseClusterInvoker + + once sync.Once + ticker *time.Ticker + maxRetries int64 + failbackTasks int64 + taskList *queue.Queue +} + +func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { + invoker := &failbackClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } + retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES) + if retriesConfig <= 0 { + retriesConfig = constant.DEFAULT_FAILBACK_TIMES + } + failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS) + if failbackTasksConfig <= 0 { + failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS + } + invoker.maxRetries = retriesConfig + invoker.failbackTasks = failbackTasksConfig + return invoker +} + +func (invoker *failbackClusterInvoker) process() { + invoker.ticker = time.NewTicker(time.Second * 1) + for range invoker.ticker.C { + // check each timeout task and re-run + for { + value, err := invoker.taskList.Peek() + if err == queue.ErrDisposed { + return + } + if err == queue.ErrEmptyQueue { + break + } + + retryTask := value.(*retryTimerTask) + if time.Since(retryTask.lastT).Seconds() < 5 { + break + } + + // ignore return. the get must success. + _, err = invoker.taskList.Get(1) + if err != nil { + logger.Warnf("get task found err: %v\n", err) + break + } + + go func(retryTask *retryTimerTask) { + invoked := make([]protocol.Invoker, 0) + invoked = append(invoked, retryTask.lastInvoker) + + retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) + var result protocol.Result + result = retryInvoker.Invoke(retryTask.invocation) + if result.Error() != nil { + retryTask.lastInvoker = retryInvoker + invoker.checkRetry(retryTask, result.Error()) + } + }(retryTask) + + } + } +} + +func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) { + logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n", + retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error()) + retryTask.retries++ + retryTask.lastT = time.Now() + if retryTask.retries > invoker.maxRetries { + logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n", + retryTask.retries, retryTask.invocation) + } else { + invoker.taskList.Put(retryTask) + } +} + +func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", + invocation.MethodName(), invoker.GetUrl().Service(), err) + return &protocol.RPCResult{} + } + url := invokers[0].GetUrl() + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { + lb = v + } + loadbalance := extension.GetLoadbalance(lb) + + invoked := make([]protocol.Invoker, 0, len(invokers)) + var result protocol.Result + + ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + //DO INVOKE + result = ivk.Invoke(invocation) + if result.Error() != nil { + invoker.once.Do(func() { + invoker.taskList = queue.New(invoker.failbackTasks) + go invoker.process() + }) + + taskLen := invoker.taskList.Len() + if taskLen >= invoker.failbackTasks { + logger.Warnf("tasklist is too full > %d.\n", taskLen) + return &protocol.RPCResult{} + } + + timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk) + invoker.taskList.Put(timerTask) + + logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", + methodName, url.Service(), result.Error().Error()) + // ignore + return &protocol.RPCResult{} + } + + return result +} + +func (invoker *failbackClusterInvoker) Destroy() { + invoker.baseClusterInvoker.Destroy() + + // stop ticker + if invoker.ticker != nil { + invoker.ticker.Stop() + } + + _ = invoker.taskList.Dispose() +} + +type retryTimerTask struct { + loadbalance cluster.LoadBalance + invocation protocol.Invocation + invokers []protocol.Invoker + lastInvoker protocol.Invoker + retries int64 + lastT time.Time +} + +func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, + lastInvoker protocol.Invoker) *retryTimerTask { + return &retryTimerTask{ + loadbalance: loadbalance, + invocation: invocation, + invokers: invokers, + lastInvoker: lastInvoker, + lastT: time.Now(), + } +} diff --git a/cluster/cluster_impl/failback_cluster_test.go b/cluster/cluster_impl/failback_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c94347a1251a69a10c0a4d50007ef569bd6dd996 --- /dev/null +++ b/cluster/cluster_impl/failback_cluster_test.go @@ -0,0 +1,242 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "sync" + "testing" + "time" +) + +import ( + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + failbackUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +// registerFailback register failbackCluster to cluster extension. +func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failbackCluster := NewFailbackCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failbackCluster.Join(staticDir) + return clusterInvoker +} + +// success firstly, failback should return origin invoke result. +func Test_FailbackSuceess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).Times(1) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockResult, result) +} + +// failed firstly, success later after one retry. +func Test_FailbackRetryOneSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + // failed at first + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) + + // success second + var wg sync.WaitGroup + wg.Add(1) + now := time.Now() + mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + delta := time.Since(now).Nanoseconds() / int64(time.Second) + assert.True(t, delta >= 5) + wg.Done() + return mockSuccResult + }) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + // ensure the retry task has been executed + assert.Equal(t, int64(1), clusterInvoker.taskList.Len()) + // wait until the retry task is executed, the taskList will be empty. + wg.Wait() + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() + + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) +} + +// failed firstly, and failed again after ech retry time. +func Test_FailbackRetryFailed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) + + // + var wg sync.WaitGroup + retries := 2 + wg.Add(retries) + now := time.Now() + + // add retry call that eventually failed. + for i := 0; i < retries; i++ { + j := i + 1 + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + delta := time.Since(now).Nanoseconds() / int64(time.Second) + assert.True(t, delta >= int64(5*j)) + wg.Done() + return mockFailedResult + }) + } + + // first call should failed. + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + wg.Wait() + time.Sleep(time.Second) + assert.Equal(t, int64(1), clusterInvoker.taskList.Len()) + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() + // after destroy, the taskList will be empty + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) +} + +// add 10 tasks but all failed firstly, and failed again with one retry. +func Test_FailbackRetryFailed10Times(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker.maxRetries = 10 + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + // 10 task should failed firstly. + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(10) + + // 10 task should retry and failed. + var wg sync.WaitGroup + wg.Add(10) + now := time.Now() + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + delta := time.Since(now).Nanoseconds() / int64(time.Second) + assert.True(t, delta >= 5) + wg.Done() + return mockFailedResult + }).Times(10) + + for i := 0; i < 10; i++ { + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + } + + wg.Wait() + time.Sleep(time.Second) // in order to ensure checkRetry have done + assert.Equal(t, int64(10), clusterInvoker.taskList.Len()) + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() + + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) +} + +func Test_FailbackOutOfLimit(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker.failbackTasks = 1 + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11) + + // reached limit + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + // all will be out of limit + for i := 0; i < 10; i++ { + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + assert.Equal(t, int64(1), clusterInvoker.taskList.Len()) + } + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() +} diff --git a/cluster/cluster_impl/failfast_cluster.go b/cluster/cluster_impl/failfast_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..6301d945626103226132b433dd21e8647f53a38b --- /dev/null +++ b/cluster/cluster_impl/failfast_cluster.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failfastCluster struct{} + +const failfast = "failfast" + +func init() { + extension.SetCluster(failfast, NewFailFastCluster) +} + +func NewFailFastCluster() cluster.Cluster { + return &failfastCluster{} +} + +func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailFastClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failfast_cluster_invoker.go b/cluster/cluster_impl/failfast_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..734ea2c6cb19bf54a338a76a10c9cfcc59d3954b --- /dev/null +++ b/cluster/cluster_impl/failfast_cluster_invoker.go @@ -0,0 +1,51 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/protocol" +) + +type failfastClusterInvoker struct { + baseClusterInvoker +} + +func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &failfastClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + + loadbalance := getLoadBalance(invokers[0], invocation) + + err = invoker.checkWhetherDestroyed() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + ivk := invoker.doSelect(loadbalance, invocation, invokers, nil) + return ivk.Invoke(invocation) +} diff --git a/cluster/cluster_impl/failfast_cluster_test.go b/cluster/cluster_impl/failfast_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7a19e80ccda15aa13a1c4fcf250e05a6effa7f0b --- /dev/null +++ b/cluster/cluster_impl/failfast_cluster_test.go @@ -0,0 +1,97 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + failfastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +// registerFailfast register failfastCluster to cluster extension. +func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failfastCluster := NewFailFastCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + + invoker.EXPECT().GetUrl().Return(failfastUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failfastCluster.Join(staticDir) + return clusterInvoker +} + +func Test_FailfastInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailfast(t, invoker) + + invoker.EXPECT().GetUrl().Return(failfastUrl) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + res := result.Result().(rest) + assert.True(t, res.success) + assert.Equal(t, 0, res.tried) +} + +func Test_FailfastInvokeFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailfast(t, invoker) + + invoker.EXPECT().GetUrl().Return(failfastUrl) + + mockResult := &protocol.RPCResult{Err: perrors.New("error")} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NotNil(t, result.Error()) + assert.Equal(t, "error", result.Error().Error()) + assert.Nil(t, result.Result()) +} diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go index cd17a85e00a1d4307b3b861467051e9719345d1a..69664269e2ccf362e6b7a12488fe006802d460e2 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster_impl/failover_cluster_invoker.go @@ -24,7 +24,6 @@ import ( import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/version" @@ -48,17 +47,11 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr if err != nil { return &protocol.RPCResult{Err: err} } - url := invokers[0].GetUrl() - methodName := invocation.MethodName() - //Get the service loadbalance config - lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + loadbalance := getLoadBalance(invokers[0], invocation) - //Get the service method loadbalance config if have - if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { - lb = v - } - loadbalance := extension.GetLoadbalance(lb) + methodName := invocation.MethodName() + url := invokers[0].GetUrl() //get reties retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) diff --git a/cluster/cluster_impl/failsafe_cluster.go b/cluster/cluster_impl/failsafe_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..3ff97d25eae80980a90a03e71865bb8f9a63defe --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failsafeCluster struct{} + +const failsafe = "failsafe" + +func init() { + extension.SetCluster(failsafe, NewFailsafeCluster) +} + +func NewFailsafeCluster() cluster.Cluster { + return &failsafeCluster{} +} + +func (cluster *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailsafeClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failsafe_cluster_invoker.go b/cluster/cluster_impl/failsafe_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..b95f997fef87cf466f07c4e506e41758e7998e52 --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster_invoker.go @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +/** + * When invoke fails, log the error message and ignore this error by returning an empty Result. + * Usually used to write audit logs and other operations + * + * <a href="http://en.wikipedia.org/wiki/Fail-safe">Fail-safe</a> + * + */ +type failsafeClusterInvoker struct { + baseClusterInvoker +} + +func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &failsafeClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{} + } + + url := invokers[0].GetUrl() + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { + lb = v + } + loadbalance := extension.GetLoadbalance(lb) + + invoked := make([]protocol.Invoker, 0) + var result protocol.Result + + ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + //DO INVOKE + result = ivk.Invoke(invocation) + if result.Error() != nil { + // ignore + logger.Errorf("Failsafe ignore exception: %v.\n", result.Error().Error()) + return &protocol.RPCResult{} + } + return result +} diff --git a/cluster/cluster_impl/failsafe_cluster_test.go b/cluster/cluster_impl/failsafe_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9ee9d9fee31b0cb24d877ab3dc0e24fb552f5f11 --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster_test.go @@ -0,0 +1,95 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + failsafeUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +// register_failsafe register failsafeCluster to cluster extension. +func register_failsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failsafeCluster := NewFailsafeCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failsafeCluster.Join(staticDir) + return clusterInvoker +} + +func Test_FailSafeInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := register_failsafe(t, invoker) + + invoker.EXPECT().GetUrl().Return(failsafeUrl) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + res := result.Result().(rest) + assert.True(t, res.success) +} + +func Test_FailSafeInvokeFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := register_failsafe(t, invoker) + + invoker.EXPECT().GetUrl().Return(failsafeUrl) + + mockResult := &protocol.RPCResult{Err: perrors.New("error")} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + assert.Nil(t, result.Result()) +} diff --git a/cluster/loadbalance/least_active.go b/cluster/loadbalance/least_active.go index 695ca21b0ed53dcf94907223e4c222af17311db9..39c5620f02d607bb68430e17a206a1649ee31c54 100644 --- a/cluster/loadbalance/least_active.go +++ b/cluster/loadbalance/least_active.go @@ -1,14 +1,19 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ // @author yiji@apache.org package loadbalance diff --git a/cluster/loadbalance/round_robin.go b/cluster/loadbalance/round_robin.go index e173e211c3630f4d4786edc2c1a09709fd7bf0a1..075acac7cdc60086ececb7b655dee86ec5198369 100644 --- a/cluster/loadbalance/round_robin.go +++ b/cluster/loadbalance/round_robin.go @@ -1,15 +1,19 @@ -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package loadbalance diff --git a/common/constant/default.go b/common/constant/default.go index 05461ca6e7360e08d716d6b78d20ad4411df99e8..d103c6aadc0f76d4c4d82fedb89ea2f21f5eb7aa 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -26,11 +26,13 @@ const ( ) const ( - DEFAULT_LOADBALANCE = "random" - DEFAULT_RETRIES = 2 - DEFAULT_PROTOCOL = "dubbo" - DEFAULT_REG_TIMEOUT = "10s" - DEFAULT_CLUSTER = "failover" + DEFAULT_LOADBALANCE = "random" + DEFAULT_RETRIES = 2 + DEFAULT_PROTOCOL = "dubbo" + DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_CLUSTER = "failover" + DEFAULT_FAILBACK_TIMES = 3 + DEFAULT_FAILBACK_TASKS = 100 ) const ( @@ -38,6 +40,8 @@ const ( PREFIX_DEFAULT_KEY = "default." DEFAULT_SERVICE_FILTERS = "echo" DEFAULT_REFERENCE_FILTERS = "" + GENERIC_REFERENCE_FILTERS = "generic" + GENERIC = "$invoke" ECHO = "$echo" ) diff --git a/common/constant/key.go b/common/constant/key.go index bca658b2623e3ca9241103b5f9c82c7e15fb4062..4d2664baf9b311f6ed8aa8e295599d7e8e5846c4 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -30,6 +30,7 @@ const ( METHODS_KEY = "methods" TIMEOUT_KEY = "timeout" BEAN_NAME_KEY = "bean.name" + GENERIC_KEY = "generic" ) const ( @@ -45,6 +46,8 @@ const ( WEIGHT_KEY = "weight" WARMUP_KEY = "warmup" RETRIES_KEY = "retries" + BEAN_NAME = "bean.name" + FAIL_BACK_TASKS_KEY = "failbacktasks" ) const ( diff --git a/config/config_loader.go b/config/config_loader.go index 0dd0fb7f963e09a4402f4fd01c8a6b91959a7ca3..720f65f5de6b24b445b7ce07dd2daaba0a856681 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -66,6 +66,10 @@ func Load() { logger.Errorf("[consumer config center refresh] %#v", err) } for key, ref := range consumerConfig.References { + if ref.Generic { + genericService := NewGenericService(key) + SetConsumerService(genericService) + } rpcService := GetConsumerService(key) if rpcService == nil { diff --git a/config/generic_service.go b/config/generic_service.go new file mode 100644 index 0000000000000000000000000000000000000000..8a4e88df9788554bc4a5ee33884166e4ccede37f --- /dev/null +++ b/config/generic_service.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package config + +type GenericService struct { + Invoke func(req []interface{}) (interface{}, error) `dubbo:"$invoke"` + referenceStr string +} + +func NewGenericService(referenceStr string) *GenericService { + return &GenericService{referenceStr: referenceStr} +} + +func (u *GenericService) Reference() string { + return u.referenceStr +} diff --git a/config/reference_config.go b/config/reference_config.go index a5b7d50db51df02cd1871b3fb6d0bd2d6a545dd2..67a4a3d816060475fa49182f7141aba9807849e1 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -55,6 +55,7 @@ type ReferenceConfig struct { Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` invoker protocol.Invoker urls []*common.URL + Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` } func (c *ReferenceConfig) Prefix() string { @@ -93,7 +94,7 @@ func (refconfig *ReferenceConfig) Refer() { refconfig.urls = append(refconfig.urls, &serviceUrl) } else { if serviceUrl.Path == "" { - serviceUrl.Path = "/" + refconfig.InterfaceName + serviceUrl.Path = "/" + refconfig.id } // merge url need to do newUrl := common.MergeUrl(serviceUrl, url) @@ -110,7 +111,6 @@ func (refconfig *ReferenceConfig) Refer() { regUrl.SubURL = url } } - if len(refconfig.urls) == 1 { refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0]) } else { @@ -157,6 +157,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(refconfig.Retries, 10)) urlMap.Set(constant.GROUP_KEY, refconfig.Group) urlMap.Set(constant.VERSION_KEY, refconfig.Version) + urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) //getty invoke async or sync urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async)) @@ -170,7 +171,11 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.ENVIRONMENT_KEY, consumerConfig.ApplicationConfig.Environment) //filter - urlMap.Set(constant.REFERENCE_FILTER_KEY, mergeValue(consumerConfig.Filter, refconfig.Filter, constant.DEFAULT_REFERENCE_FILTERS)) + var defaultReferenceFilter = constant.DEFAULT_REFERENCE_FILTERS + if refconfig.Generic { + defaultReferenceFilter = constant.GENERIC_REFERENCE_FILTERS + defaultReferenceFilter + } + urlMap.Set(constant.REFERENCE_FILTER_KEY, mergeValue(consumerConfig.Filter, refconfig.Filter, defaultReferenceFilter)) for _, v := range refconfig.Methods { urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance) @@ -180,3 +185,11 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { return urlMap } +func (refconfig *ReferenceConfig) GenericLoad(id string) { + genericService := NewGenericService(refconfig.id) + SetConsumerService(genericService) + refconfig.id = id + refconfig.Refer() + refconfig.Implement(genericService) + return +} diff --git a/examples/general/dubbo/go-client/app/client.go b/examples/general/dubbo/go-client/app/client.go index e6f1ae96fb387a1756c8a1c0b026746378377fff..b7ee0e662a26251a79af13d36215cc2cb44fd794 100644 --- a/examples/general/dubbo/go-client/app/client.go +++ b/examples/general/dubbo/go-client/app/client.go @@ -25,16 +25,15 @@ import ( "syscall" "time" ) - import ( - "github.com/apache/dubbo-go-hessian2" + hessian "github.com/apache/dubbo-go-hessian2" ) import ( "github.com/apache/dubbo-go/common/logger" _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/config" - _ "github.com/apache/dubbo-go/protocol/dubbo" + "github.com/apache/dubbo-go/protocol/dubbo" _ "github.com/apache/dubbo-go/registry/protocol" _ "github.com/apache/dubbo-go/filter/impl" @@ -65,7 +64,8 @@ func main() { test1() println("\n\ntest2") test2() - + println("\n\ntest3") + test3() initSignal() } @@ -288,3 +288,24 @@ func test2() { } println("error: %v", err) } +func test3() { + var appName = "UserProviderGer" + var referenceConfig = config.ReferenceConfig{ + InterfaceName: "com.ikurento.user.UserProvider", + Cluster: "failover", + Registry: "hangzhouzk", + Protocol: dubbo.DUBBO, + Generic: true, + } + referenceConfig.GenericLoad(appName) //appName is the unique identification of RPCService + + time.Sleep(3 * time.Second) + println("\n\n\nstart to generic invoke") + resp, err := referenceConfig.GetRPCService().(*config.GenericService).Invoke([]interface{}{"GetUser", []string{"java.lang.String"}, []interface{}{"A003"}}) + if err != nil { + panic(err) + } + println("res: %v\n", resp) + println("succ!") + +} diff --git a/filter/impl/active_filter.go b/filter/impl/active_filter.go index 435bfe7488c520b14d770aea01b3c3baf4950056..d7dad74cf3f5ccadf39372335bc1efb22f497523 100644 --- a/filter/impl/active_filter.go +++ b/filter/impl/active_filter.go @@ -1,14 +1,19 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ // @author yiji@apache.org package impl diff --git a/filter/impl/generic_filter.go b/filter/impl/generic_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..12cb4c7fa59ac4f1f7c1466f385ea9e9b59567b5 --- /dev/null +++ b/filter/impl/generic_filter.go @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "reflect" + "strings" +) +import ( + hessian "github.com/apache/dubbo-go-hessian2" +) +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/protocol" + invocation2 "github.com/apache/dubbo-go/protocol/invocation" +) + +const ( + GENERIC = "generic" +) + +func init() { + extension.SetFilter(GENERIC, GetGenericFilter) +} + +// when do a generic invoke, struct need to be map + +type GenericFilter struct{} + +func (ef *GenericFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 { + oldArguments := invocation.Arguments() + var newParams []hessian.Object + if oldParams, ok := oldArguments[2].([]interface{}); ok { + for i := range oldParams { + newParams = append(newParams, hessian.Object(struct2MapAll(oldParams[i]))) + } + } else { + return invoker.Invoke(invocation) + } + newArguments := []interface{}{ + oldArguments[0], + oldArguments[1], + newParams, + } + newInvocation := invocation2.NewRPCInvocation(invocation.MethodName(), newArguments, invocation.Attachments()) + newInvocation.SetReply(invocation.Reply()) + return invoker.Invoke(newInvocation) + } + return invoker.Invoke(invocation) +} + +func (ef *GenericFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + return result +} + +func GetGenericFilter() filter.Filter { + return &GenericFilter{} +} +func struct2MapAll(obj interface{}) interface{} { + if obj == nil { + return obj + } + t := reflect.TypeOf(obj) + v := reflect.ValueOf(obj) + if t.Kind() == reflect.Struct { + result := make(map[string]interface{}, t.NumField()) + for i := 0; i < t.NumField(); i++ { + if v.Field(i).Kind() == reflect.Struct { + if v.Field(i).CanInterface() { + setInMap(result, t.Field(i), struct2MapAll(v.Field(i).Interface())) + } + } else if v.Field(i).Kind() == reflect.Slice { + if v.Field(i).CanInterface() { + setInMap(result, t.Field(i), struct2MapAll(v.Field(i).Interface())) + } + } else { + if v.Field(i).CanInterface() { + setInMap(result, t.Field(i), v.Field(i).Interface()) + } + } + } + return result + } else if t.Kind() == reflect.Slice { + value := reflect.ValueOf(obj) + var newTemps = make([]interface{}, 0, value.Len()) + for i := 0; i < value.Len(); i++ { + newTemp := struct2MapAll(value.Index(i).Interface()) + newTemps = append(newTemps, newTemp) + } + return newTemps + } else { + return obj + } +} +func setInMap(m map[string]interface{}, structField reflect.StructField, value interface{}) (result map[string]interface{}) { + result = m + if tagName := structField.Tag.Get("m"); tagName == "" { + result[headerAtoa(structField.Name)] = value + } else { + result[tagName] = value + } + return +} +func headerAtoa(a string) (b string) { + b = strings.ToLower(a[:1]) + a[1:] + return +} diff --git a/filter/impl/generic_filter_test.go b/filter/impl/generic_filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a71a9db95759a143186fe9a1a4fb0c861c8949e8 --- /dev/null +++ b/filter/impl/generic_filter_test.go @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "reflect" + "testing" +) +import ( + "github.com/stretchr/testify/assert" +) + +func Test_struct2MapAll(t *testing.T) { + var testData struct { + AaAa string `m:"aaAa"` + BaBa string + CaCa struct { + AaAa string + BaBa string `m:"baBa"` + XxYy struct { + xxXx string `m:"xxXx"` + Xx string `m:"xx"` + } `m:"xxYy"` + } `m:"caCa"` + } + testData.AaAa = "1" + testData.BaBa = "1" + testData.CaCa.BaBa = "2" + testData.CaCa.AaAa = "2" + testData.CaCa.XxYy.xxXx = "3" + testData.CaCa.XxYy.Xx = "3" + m := struct2MapAll(testData).(map[string]interface{}) + assert.Equal(t, "1", m["aaAa"].(string)) + assert.Equal(t, "1", m["baBa"].(string)) + assert.Equal(t, "2", m["caCa"].(map[string]interface{})["aaAa"].(string)) + assert.Equal(t, "3", m["caCa"].(map[string]interface{})["xxYy"].(map[string]interface{})["xx"].(string)) + + assert.Equal(t, reflect.Map, reflect.TypeOf(m["caCa"]).Kind()) + assert.Equal(t, reflect.Map, reflect.TypeOf(m["caCa"].(map[string]interface{})["xxYy"]).Kind()) +} + +type testStruct struct { + AaAa string + BaBa string `m:"baBa"` + XxYy struct { + xxXx string `m:"xxXx"` + Xx string `m:"xx"` + } `m:"xxYy"` +} + +func Test_struct2MapAll_Slice(t *testing.T) { + var testData struct { + AaAa string `m:"aaAa"` + BaBa string + CaCa []testStruct `m:"caCa"` + } + testData.AaAa = "1" + testData.BaBa = "1" + var tmp testStruct + tmp.BaBa = "2" + tmp.AaAa = "2" + tmp.XxYy.xxXx = "3" + tmp.XxYy.Xx = "3" + testData.CaCa = append(testData.CaCa, tmp) + m := struct2MapAll(testData).(map[string]interface{}) + + assert.Equal(t, "1", m["aaAa"].(string)) + assert.Equal(t, "1", m["baBa"].(string)) + assert.Equal(t, "2", m["caCa"].([]interface{})[0].(map[string]interface{})["aaAa"].(string)) + assert.Equal(t, "3", m["caCa"].([]interface{})[0].(map[string]interface{})["xxYy"].(map[string]interface{})["xx"].(string)) + + assert.Equal(t, reflect.Slice, reflect.TypeOf(m["caCa"]).Kind()) + assert.Equal(t, reflect.Map, reflect.TypeOf(m["caCa"].([]interface{})[0].(map[string]interface{})["xxYy"]).Kind()) +} diff --git a/go.mod b/go.mod index aa7ade2f9d5778f088c5ffbb172d4e695f36ad55..7a970d44f0c876021e0d3544760197ac7f26f240 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/apache/dubbo-go require ( - github.com/Workiva/go-datastructures v1.0.50 // indirect + github.com/Workiva/go-datastructures v1.0.50 github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible @@ -12,7 +12,8 @@ require ( github.com/dubbogo/getty v1.2.2 github.com/dubbogo/gost v1.1.1 github.com/gogo/protobuf v1.2.1 // indirect - github.com/golang/mock v1.3.1 // indirect + github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect + github.com/golang/mock v1.3.1 github.com/google/btree v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect @@ -28,6 +29,7 @@ require ( github.com/stretchr/testify v1.3.0 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect + go.etcd.io/bbolt v1.3.3 // indirect go.etcd.io/etcd v3.3.13+incompatible go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 diff --git a/go.sum b/go.sum index 57651bf482aad9a10bf314e1b6fac77a53eb5b66..2689e414029eff278e2cc8aa7e141de153a2742a 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo= github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -38,8 +39,12 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -72,8 +77,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -121,6 +128,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= +go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw= go.etcd.io/etcd v3.3.13+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= @@ -146,6 +155,7 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -171,7 +181,9 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM= google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/protocol/RpcStatus.go b/protocol/RpcStatus.go index b9f3e6ecb18583034e310471164773bebe689cd9..78796b6beaf24dac33d7e0210703a9027f9fe568 100644 --- a/protocol/RpcStatus.go +++ b/protocol/RpcStatus.go @@ -1,14 +1,19 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ // @author yiji@apache.org package protocol diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index b8822311ef83776c62a13a0ea468a185fb94cabf..8c6c8a5a53af4df9a89eae5db5752eb07f3aa446 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -23,10 +23,11 @@ import ( ) import ( - "github.com/apache/dubbo-go-hessian2" + hessian "github.com/apache/dubbo-go-hessian2" "github.com/dubbogo/getty" perrors "github.com/pkg/errors" ) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" diff --git a/protocol/invoker.go b/protocol/invoker.go index fe6aab848caceed50fc3db3e657ce87c45eac2ed..f5d41a09ad2778c12c7e5e68167a4d0acc9e3f4c 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -22,6 +22,7 @@ import ( "github.com/apache/dubbo-go/common/logger" ) +//go:generate mockgen -source invoker.go -destination mock/mock_invoker.go -self_package github.com/apache/dubbo-go/protocol/mock --package mock Invoker // Extension - Invoker type Invoker interface { common.Node diff --git a/protocol/mock/mock_invoker.go b/protocol/mock/mock_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..557dafa277e95c39d0b960436ac10ba8842c9186 --- /dev/null +++ b/protocol/mock/mock_invoker.go @@ -0,0 +1,87 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: invoker.go + +// Package mock is a generated GoMock package. +package mock + +import ( + "reflect" +) + +import ( + "github.com/golang/mock/gomock" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +// MockInvoker is a mock of Invoker interface +type MockInvoker struct { + ctrl *gomock.Controller + recorder *MockInvokerMockRecorder +} + +// MockInvokerMockRecorder is the mock recorder for MockInvoker +type MockInvokerMockRecorder struct { + mock *MockInvoker +} + +// NewMockInvoker creates a new mock instance +func NewMockInvoker(ctrl *gomock.Controller) *MockInvoker { + mock := &MockInvoker{ctrl: ctrl} + mock.recorder = &MockInvokerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockInvoker) EXPECT() *MockInvokerMockRecorder { + return m.recorder +} + +// GetUrl mocks base method +func (m *MockInvoker) GetUrl() common.URL { + ret := m.ctrl.Call(m, "GetUrl") + ret0, _ := ret[0].(common.URL) + return ret0 +} + +// GetUrl indicates an expected call of GetUrl +func (mr *MockInvokerMockRecorder) GetUrl() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUrl", reflect.TypeOf((*MockInvoker)(nil).GetUrl)) +} + +// IsAvailable mocks base method +func (m *MockInvoker) IsAvailable() bool { + ret := m.ctrl.Call(m, "IsAvailable") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsAvailable indicates an expected call of IsAvailable +func (mr *MockInvokerMockRecorder) IsAvailable() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsAvailable", reflect.TypeOf((*MockInvoker)(nil).IsAvailable)) +} + +// Destroy mocks base method +func (m *MockInvoker) Destroy() { + m.ctrl.Call(m, "Destroy") +} + +// Destroy indicates an expected call of Destroy +func (mr *MockInvokerMockRecorder) Destroy() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Destroy", reflect.TypeOf((*MockInvoker)(nil).Destroy)) +} + +// Invoke mocks base method +func (m *MockInvoker) Invoke(arg0 protocol.Invocation) protocol.Result { + ret := m.ctrl.Call(m, "Invoke", arg0) + ret0, _ := ret[0].(protocol.Result) + return ret0 +} + +// Invoke indicates an expected call of Invoke +func (mr *MockInvokerMockRecorder) Invoke(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Invoke", reflect.TypeOf((*MockInvoker)(nil).Invoke), arg0) +} diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index db50b3d46dce2dca8b29582f97fee37fef6db093..fff2271b0f09bc2f83335fc3323de64e49219920 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -224,8 +224,8 @@ func (c *Client) maintenanceStatusLoop(s *concurrency.Session) { c.lock.Lock() // when etcd server stopped, cancel ctx, stop all watchers c.clean() - // when connection lose, stop client, trigger reconnect to etcd - c.stop() + // when connection lose, stop client, trigger reconnect to etcd + c.stop() c.lock.Unlock() return } diff --git a/remoting/etcdv3/facede_test.go b/remoting/etcdv3/facede_test.go index 8718fc3b7456ff74a3e071713fbcd56356bbc8a6..702aaece817283eafd7b7db96b8bb076f1b30cd7 100644 --- a/remoting/etcdv3/facede_test.go +++ b/remoting/etcdv3/facede_test.go @@ -6,7 +6,7 @@ import ( "time" ) -import( +import ( "github.com/apache/dubbo-go/common" )