diff --git a/.gitignore b/.gitignore index 3d70f85a59b1367114bf04a7286fbb020d6825a4..3769d10aa12348464cb315fd336bf6b7234069cc 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ vendor/ logs/ .vscode/ +coverage.txt diff --git a/.travis.yml b/.travis.yml index 2038d8ecc81ce319906b66333458eb6eda9afc30..a4cbb8f750991a2e7538fb666823dadd695cfcd6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: go go: - - "1.11" - "1.12" env: diff --git a/README.md b/README.md index b64ce4973d6df5efcb560058b857d455e6371a57..593be70222b69f5aa62ac0b781a0a3be6b0e1ff5 100644 --- a/README.md +++ b/README.md @@ -31,15 +31,16 @@ Finished List: - Codec: JsonRPC v2, Hessian v2 - Registry: ZooKeeper - Configure Center: Zookeeper -- Cluster Strategy: Failover -- Load Balance: Random, RoundRobin, LeastActive +- Cluster Strategy: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/Available/Broadcast +- Load Balance: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65) - Filter: Echo Health Check +- Other feature: [generic invoke](https://github.com/apache/dubbo-go/pull/122)/start check/connecting certain provider/multi-protocols/multi-registries/multi-versions/service group Working List: -- Cluster Strategy: Failfast/Failsafe/Failback/Forking +- Cluster Strategy: Forking - Load Balance: ConsistentHash -- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter +- Filter: TokenFilter/AccessLogFilter/CountFilter/ExecuteLimitFilter/TpsLimitFilter - Registry: etcd/k8s/consul/nacos - Configure Center: apollo - Dynamic Configuration Center & Metadata Center (dubbo v2.7.x) @@ -55,7 +56,7 @@ You can know more about dubbo-go by its [roadmap](https://github.com/apache/dubb ## Document -Move [here](https://dubbogo.github.io/dubbo-go-website/) +TODO ## Quick Start diff --git a/README_CN.md b/README_CN.md index 0553527534506f21eb9bd154c034b8345298f93c..7732a655adfbaa2a2a963201d6580767758c4fce 100644 --- a/README_CN.md +++ b/README_CN.md @@ -26,29 +26,29 @@ Apache License, Version 2.0 瀹炵幇鍒楄〃: -- Role: Consumer, Provider -- Transport: HTTP, TCP -- Codec: JsonRPC v2, Hessian v2 -- Registry: ZooKeeper -- Routing: Rule(dubbo v2.6.x) -- Configure Center: Zookeeper -- Cluster Strategy: Failover -- Load Balance: Random, RoundRobin, LeastActive -- Filter: Echo Health Check +- 瑙掕壊绔�: Consumer, Provider +- 浼犺緭鍗忚: HTTP, TCP +- 搴忓垪鍖栧崗璁�: JsonRPC v2, Hessian v2 +- 娉ㄥ唽涓績: ZooKeeper +- 閰嶇疆涓績: Zookeeper +- 闆嗙兢绛栫暐: Failover/[Failfast](https://github.com/apache/dubbo-go/pull/140)/[Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136)/Available/Broadcast +- 璐熻浇鍧囪 绛栫暐: Random/[RoundRobin](https://github.com/apache/dubbo-go/pull/66)/[LeastActive](https://github.com/apache/dubbo-go/pull/65) +- 杩囨护鍣�: Echo Health Check +- 鍏朵粬鍔熻兘鏀寔: [娉涘寲璋冪敤](https://github.com/apache/dubbo-go/pull/122)/鍚姩鏃舵鏌�/鏈嶅姟鐩磋繛/澶氭湇鍔″崗璁�/澶氭敞鍐屼腑蹇�/澶氭湇鍔$増鏈�/鏈嶅姟鍒嗙粍 寮€鍙戜腑鍒楄〃: -- Cluster Strategy: Failfast/Failsafe/Failback/Forking -- Load Balance: ConsistentHash -- Filter: TokenFilter/AccessLogFilter/CountFilter/ActiveLimitFilter/ExecuteLimitFilter/GenericFilter/TpsLimitFilter -- Registry: etcd/k8s/consul/nacos -- Configure Center: apollo -- Dynamic Configuration Center & Metadata Center (dubbo v2.7.x) +- 闆嗙兢绛栫暐: Forking +- 璐熻浇鍧囪 绛栫暐: ConsistentHash +- 杩囨护鍣�: TokenFilter/AccessLogFilter/CountFilter/ExecuteLimitFilter/TpsLimitFilter +- 娉ㄥ唽涓績: etcd/k8s/consul/nacos +- 閰嶇疆涓績: apollo +- 鍔ㄦ€侀厤缃腑蹇� & 鍏冩暟鎹腑蹇� (dubbo v2.7.x) - Metrics: Promethus(dubbo v2.7.x) 浠诲姟鍒楄〃: -- Registry: kubernetes +- 娉ㄥ唽涓績: kubernetes - Routing: istio - tracing (dubbo ecosystem) @@ -56,7 +56,7 @@ Apache License, Version 2.0 ## 鏂囨。 -绉绘[杩欓噷](https://dubbogo.github.io/dubbo-go-website/) +TODO ## 蹇€熷紑濮� ## diff --git a/cluster/cluster_impl/available_cluster.go b/cluster/cluster_impl/available_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..7e748cd938319ff437bb3fb6c7945b857d316069 --- /dev/null +++ b/cluster/cluster_impl/available_cluster.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type availableCluster struct{} + +const available = "available" + +func init() { + extension.SetCluster(available, NewAvailableCluster) +} + +func NewAvailableCluster() cluster.Cluster { + return &availableCluster{} +} + +func (cluser *availableCluster) Join(directory cluster.Directory) protocol.Invoker { + return NewAvailableClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/available_cluster_invoker.go b/cluster/cluster_impl/available_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..c59c0702c216fe5c58d190a023322aaa00ac9c17 --- /dev/null +++ b/cluster/cluster_impl/available_cluster_invoker.go @@ -0,0 +1,61 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "fmt" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/protocol" +) + +type availableClusterInvoker struct { + baseClusterInvoker +} + +func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &availableClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *availableClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + + err = invoker.checkWhetherDestroyed() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + for _, ivk := range invokers { + if ivk.IsAvailable() { + return ivk.Invoke(invocation) + } + } + return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))} +} diff --git a/cluster/cluster_impl/available_cluster_invoker_test.go b/cluster/cluster_impl/available_cluster_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..04032a7f24dec0e73acb15921f753921391f1515 --- /dev/null +++ b/cluster/cluster_impl/available_cluster_invoker_test.go @@ -0,0 +1,88 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "strings" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + availableUrl, _ = common.NewURL(context.Background(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +func registerAvailable(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + availableCluster := NewAvailableCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + invoker.EXPECT().GetUrl().Return(availableUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := availableCluster.Join(staticDir) + return clusterInvoker +} + +func TestAvailableClusterInvokerSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerAvailable(t, invoker) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().IsAvailable().Return(true) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.Equal(t, mockResult, result) +} + +func TestAvailableClusterInvokerNoAvail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerAvailable(t, invoker) + + invoker.EXPECT().IsAvailable().Return(false) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NotNil(t, result.Error()) + assert.True(t, strings.Contains(result.Error().Error(), "no provider available")) + assert.Nil(t, result.Result()) +} diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index 6cbed7748e496afb37f9a1e93dbc40e845c62b06..2426258b95e8a2ecf5067b2ed469eeb9a5617f18 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -25,6 +25,8 @@ import ( import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/version" @@ -115,12 +117,24 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p } func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool { - if len(invoked) > 0 { - for _, i := range invoked { - if i == selectedInvoker { - return true - } + for _, i := range invoked { + if i == selectedInvoker { + return true } } return false } + +func getLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) cluster.LoadBalance { + url := invoker.GetUrl() + + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); len(v) > 0 { + lb = v + } + return extension.GetLoadbalance(lb) +} diff --git a/cluster/cluster_impl/broadcast_cluster.go b/cluster/cluster_impl/broadcast_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..50aae3cfab8d67570b50dcab4e53bbfad29d6d30 --- /dev/null +++ b/cluster/cluster_impl/broadcast_cluster.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type broadcastCluster struct{} + +const broadcast = "broadcast" + +func init() { + extension.SetCluster(broadcast, NewBroadcastCluster) +} + +func NewBroadcastCluster() cluster.Cluster { + return &broadcastCluster{} +} + +func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker { + return newBroadcastClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/broadcast_cluster_invoker.go b/cluster/cluster_impl/broadcast_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..238df0acfa7fb946e38bfbfd490bce7c0bb34e60 --- /dev/null +++ b/cluster/cluster_impl/broadcast_cluster_invoker.go @@ -0,0 +1,59 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +type broadcastClusterInvoker struct { + baseClusterInvoker +} + +func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &broadcastClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + err = invoker.checkWhetherDestroyed() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + var result protocol.Result + for _, ivk := range invokers { + result = ivk.Invoke(invocation) + if result.Error() != nil { + logger.Warnf("broadcast invoker invoke err: %v when use invoker: %v\n", result.Error(), ivk) + err = result.Error() + } + } + if err != nil { + return &protocol.RPCResult{Err: err} + } + return result +} diff --git a/cluster/cluster_impl/broadcast_cluster_invoker_test.go b/cluster/cluster_impl/broadcast_cluster_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..565684a8ae25c648ff77aef71d2ced0665202fe7 --- /dev/null +++ b/cluster/cluster_impl/broadcast_cluster_invoker_test.go @@ -0,0 +1,109 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "errors" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + broadcastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + + invokers := []protocol.Invoker{} + for i, ivk := range mockInvokers { + invokers = append(invokers, ivk) + if i == 0 { + ivk.EXPECT().GetUrl().Return(broadcastUrl) + } + } + staticDir := directory.NewStaticDirectory(invokers) + + broadcastCluster := NewBroadcastCluster() + clusterInvoker := broadcastCluster.Join(staticDir) + return clusterInvoker +} + +func Test_BroadcastInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokers := make([]*mock.MockInvoker, 0) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + for i := 0; i < 3; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + } + + clusterInvoker := registerBroadcast(t, invokers...) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockResult, result) +} + +func Test_BroadcastInvokeFailed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokers := make([]*mock.MockInvoker, 0) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockFailedResult := &protocol.RPCResult{Err: errors.New("just failed")} + for i := 0; i < 10; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + } + { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) + } + for i := 0; i < 10; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + } + + clusterInvoker := registerBroadcast(t, invokers...) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockFailedResult.Err, result.Error()) +} diff --git a/cluster/cluster_impl/failback_cluster.go b/cluster/cluster_impl/failback_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..de22c78e947d0b8124add721ab7ff42efebcdbe4 --- /dev/null +++ b/cluster/cluster_impl/failback_cluster.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failbackCluster struct{} + +const failback = "failback" + +func init() { + extension.SetCluster(failback, NewFailbackCluster) +} + +func NewFailbackCluster() cluster.Cluster { + return &failbackCluster{} +} + +func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailbackClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster_impl/failback_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..027461ccb7f32d6383d756ac986295b6300f249c --- /dev/null +++ b/cluster/cluster_impl/failback_cluster_invoker.go @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "sync" + "time" +) + +import ( + "github.com/Workiva/go-datastructures/queue" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +/** + * When fails, record failure requests and schedule for retry on a regular interval. + * Especially useful for services of notification. + * + * <a href="http://en.wikipedia.org/wiki/Failback">Failback</a> + */ +type failbackClusterInvoker struct { + baseClusterInvoker + + once sync.Once + ticker *time.Ticker + maxRetries int64 + failbackTasks int64 + taskList *queue.Queue +} + +func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { + invoker := &failbackClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } + retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES) + if retriesConfig <= 0 { + retriesConfig = constant.DEFAULT_FAILBACK_TIMES + } + failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS) + if failbackTasksConfig <= 0 { + failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS + } + invoker.maxRetries = retriesConfig + invoker.failbackTasks = failbackTasksConfig + return invoker +} + +func (invoker *failbackClusterInvoker) process() { + invoker.ticker = time.NewTicker(time.Second * 1) + for range invoker.ticker.C { + // check each timeout task and re-run + for { + value, err := invoker.taskList.Peek() + if err == queue.ErrDisposed { + return + } + if err == queue.ErrEmptyQueue { + break + } + + retryTask := value.(*retryTimerTask) + if time.Since(retryTask.lastT).Seconds() < 5 { + break + } + + // ignore return. the get must success. + _, err = invoker.taskList.Get(1) + if err != nil { + logger.Warnf("get task found err: %v\n", err) + break + } + + go func(retryTask *retryTimerTask) { + invoked := make([]protocol.Invoker, 0) + invoked = append(invoked, retryTask.lastInvoker) + + retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) + var result protocol.Result + result = retryInvoker.Invoke(retryTask.invocation) + if result.Error() != nil { + retryTask.lastInvoker = retryInvoker + invoker.checkRetry(retryTask, result.Error()) + } + }(retryTask) + + } + } +} + +func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) { + logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n", + retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error()) + retryTask.retries++ + retryTask.lastT = time.Now() + if retryTask.retries > invoker.maxRetries { + logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n", + retryTask.retries, retryTask.invocation) + } else { + invoker.taskList.Put(retryTask) + } +} + +func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", + invocation.MethodName(), invoker.GetUrl().Service(), err) + return &protocol.RPCResult{} + } + url := invokers[0].GetUrl() + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { + lb = v + } + loadbalance := extension.GetLoadbalance(lb) + + invoked := make([]protocol.Invoker, 0, len(invokers)) + var result protocol.Result + + ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + //DO INVOKE + result = ivk.Invoke(invocation) + if result.Error() != nil { + invoker.once.Do(func() { + invoker.taskList = queue.New(invoker.failbackTasks) + go invoker.process() + }) + + taskLen := invoker.taskList.Len() + if taskLen >= invoker.failbackTasks { + logger.Warnf("tasklist is too full > %d.\n", taskLen) + return &protocol.RPCResult{} + } + + timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk) + invoker.taskList.Put(timerTask) + + logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", + methodName, url.Service(), result.Error().Error()) + // ignore + return &protocol.RPCResult{} + } + + return result +} + +func (invoker *failbackClusterInvoker) Destroy() { + invoker.baseClusterInvoker.Destroy() + + // stop ticker + if invoker.ticker != nil { + invoker.ticker.Stop() + } + + _ = invoker.taskList.Dispose() +} + +type retryTimerTask struct { + loadbalance cluster.LoadBalance + invocation protocol.Invocation + invokers []protocol.Invoker + lastInvoker protocol.Invoker + retries int64 + lastT time.Time +} + +func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, + lastInvoker protocol.Invoker) *retryTimerTask { + return &retryTimerTask{ + loadbalance: loadbalance, + invocation: invocation, + invokers: invokers, + lastInvoker: lastInvoker, + lastT: time.Now(), + } +} diff --git a/cluster/cluster_impl/failback_cluster_test.go b/cluster/cluster_impl/failback_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c94347a1251a69a10c0a4d50007ef569bd6dd996 --- /dev/null +++ b/cluster/cluster_impl/failback_cluster_test.go @@ -0,0 +1,242 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "sync" + "testing" + "time" +) + +import ( + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + failbackUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +// registerFailback register failbackCluster to cluster extension. +func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failbackCluster := NewFailbackCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failbackCluster.Join(staticDir) + return clusterInvoker +} + +// success firstly, failback should return origin invoke result. +func Test_FailbackSuceess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).Times(1) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockResult, result) +} + +// failed firstly, success later after one retry. +func Test_FailbackRetryOneSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + // failed at first + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) + + // success second + var wg sync.WaitGroup + wg.Add(1) + now := time.Now() + mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + delta := time.Since(now).Nanoseconds() / int64(time.Second) + assert.True(t, delta >= 5) + wg.Done() + return mockSuccResult + }) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + // ensure the retry task has been executed + assert.Equal(t, int64(1), clusterInvoker.taskList.Len()) + // wait until the retry task is executed, the taskList will be empty. + wg.Wait() + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() + + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) +} + +// failed firstly, and failed again after ech retry time. +func Test_FailbackRetryFailed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) + + // + var wg sync.WaitGroup + retries := 2 + wg.Add(retries) + now := time.Now() + + // add retry call that eventually failed. + for i := 0; i < retries; i++ { + j := i + 1 + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + delta := time.Since(now).Nanoseconds() / int64(time.Second) + assert.True(t, delta >= int64(5*j)) + wg.Done() + return mockFailedResult + }) + } + + // first call should failed. + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + wg.Wait() + time.Sleep(time.Second) + assert.Equal(t, int64(1), clusterInvoker.taskList.Len()) + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() + // after destroy, the taskList will be empty + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) +} + +// add 10 tasks but all failed firstly, and failed again with one retry. +func Test_FailbackRetryFailed10Times(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker.maxRetries = 10 + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + // 10 task should failed firstly. + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(10) + + // 10 task should retry and failed. + var wg sync.WaitGroup + wg.Add(10) + now := time.Now() + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result { + delta := time.Since(now).Nanoseconds() / int64(time.Second) + assert.True(t, delta >= 5) + wg.Done() + return mockFailedResult + }).Times(10) + + for i := 0; i < 10; i++ { + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + } + + wg.Wait() + time.Sleep(time.Second) // in order to ensure checkRetry have done + assert.Equal(t, int64(10), clusterInvoker.taskList.Len()) + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() + + assert.Equal(t, int64(0), clusterInvoker.taskList.Len()) +} + +func Test_FailbackOutOfLimit(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker) + clusterInvoker.failbackTasks = 1 + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).Times(11) + + // reached limit + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + // all will be out of limit + for i := 0; i < 10; i++ { + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + assert.Equal(t, int64(1), clusterInvoker.taskList.Len()) + } + + invoker.EXPECT().Destroy().Return() + clusterInvoker.Destroy() +} diff --git a/cluster/cluster_impl/failfast_cluster.go b/cluster/cluster_impl/failfast_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..6301d945626103226132b433dd21e8647f53a38b --- /dev/null +++ b/cluster/cluster_impl/failfast_cluster.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failfastCluster struct{} + +const failfast = "failfast" + +func init() { + extension.SetCluster(failfast, NewFailFastCluster) +} + +func NewFailFastCluster() cluster.Cluster { + return &failfastCluster{} +} + +func (cluster *failfastCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailFastClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failfast_cluster_invoker.go b/cluster/cluster_impl/failfast_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..734ea2c6cb19bf54a338a76a10c9cfcc59d3954b --- /dev/null +++ b/cluster/cluster_impl/failfast_cluster_invoker.go @@ -0,0 +1,51 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/protocol" +) + +type failfastClusterInvoker struct { + baseClusterInvoker +} + +func newFailFastClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &failfastClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *failfastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + + loadbalance := getLoadBalance(invokers[0], invocation) + + err = invoker.checkWhetherDestroyed() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + ivk := invoker.doSelect(loadbalance, invocation, invokers, nil) + return ivk.Invoke(invocation) +} diff --git a/cluster/cluster_impl/failfast_cluster_test.go b/cluster/cluster_impl/failfast_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7a19e80ccda15aa13a1c4fcf250e05a6effa7f0b --- /dev/null +++ b/cluster/cluster_impl/failfast_cluster_test.go @@ -0,0 +1,97 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + failfastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +// registerFailfast register failfastCluster to cluster extension. +func registerFailfast(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failfastCluster := NewFailFastCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + + invoker.EXPECT().GetUrl().Return(failfastUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failfastCluster.Join(staticDir) + return clusterInvoker +} + +func Test_FailfastInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailfast(t, invoker) + + invoker.EXPECT().GetUrl().Return(failfastUrl) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + res := result.Result().(rest) + assert.True(t, res.success) + assert.Equal(t, 0, res.tried) +} + +func Test_FailfastInvokeFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerFailfast(t, invoker) + + invoker.EXPECT().GetUrl().Return(failfastUrl) + + mockResult := &protocol.RPCResult{Err: perrors.New("error")} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NotNil(t, result.Error()) + assert.Equal(t, "error", result.Error().Error()) + assert.Nil(t, result.Result()) +} diff --git a/cluster/cluster_impl/failover_cluster_invoker.go b/cluster/cluster_impl/failover_cluster_invoker.go index cd17a85e00a1d4307b3b861467051e9719345d1a..69664269e2ccf362e6b7a12488fe006802d460e2 100644 --- a/cluster/cluster_impl/failover_cluster_invoker.go +++ b/cluster/cluster_impl/failover_cluster_invoker.go @@ -24,7 +24,6 @@ import ( import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/version" @@ -48,17 +47,11 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr if err != nil { return &protocol.RPCResult{Err: err} } - url := invokers[0].GetUrl() - methodName := invocation.MethodName() - //Get the service loadbalance config - lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + loadbalance := getLoadBalance(invokers[0], invocation) - //Get the service method loadbalance config if have - if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { - lb = v - } - loadbalance := extension.GetLoadbalance(lb) + methodName := invocation.MethodName() + url := invokers[0].GetUrl() //get reties retries := url.GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_RETRIES) diff --git a/cluster/cluster_impl/failsafe_cluster.go b/cluster/cluster_impl/failsafe_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..3ff97d25eae80980a90a03e71865bb8f9a63defe --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type failsafeCluster struct{} + +const failsafe = "failsafe" + +func init() { + extension.SetCluster(failsafe, NewFailsafeCluster) +} + +func NewFailsafeCluster() cluster.Cluster { + return &failsafeCluster{} +} + +func (cluster *failsafeCluster) Join(directory cluster.Directory) protocol.Invoker { + return newFailsafeClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/failsafe_cluster_invoker.go b/cluster/cluster_impl/failsafe_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..b95f997fef87cf466f07c4e506e41758e7998e52 --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster_invoker.go @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +/** + * When invoke fails, log the error message and ignore this error by returning an empty Result. + * Usually used to write audit logs and other operations + * + * <a href="http://en.wikipedia.org/wiki/Fail-safe">Fail-safe</a> + * + */ +type failsafeClusterInvoker struct { + baseClusterInvoker +} + +func newFailsafeClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &failsafeClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *failsafeClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{} + } + + url := invokers[0].GetUrl() + methodName := invocation.MethodName() + //Get the service loadbalance config + lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) + //Get the service method loadbalance config if have + if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { + lb = v + } + loadbalance := extension.GetLoadbalance(lb) + + invoked := make([]protocol.Invoker, 0) + var result protocol.Result + + ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) + //DO INVOKE + result = ivk.Invoke(invocation) + if result.Error() != nil { + // ignore + logger.Errorf("Failsafe ignore exception: %v.\n", result.Error().Error()) + return &protocol.RPCResult{} + } + return result +} diff --git a/cluster/cluster_impl/failsafe_cluster_test.go b/cluster/cluster_impl/failsafe_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9ee9d9fee31b0cb24d877ab3dc0e24fb552f5f11 --- /dev/null +++ b/cluster/cluster_impl/failsafe_cluster_test.go @@ -0,0 +1,95 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + failsafeUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +// register_failsafe register failsafeCluster to cluster extension. +func register_failsafe(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failsafeCluster := NewFailsafeCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failsafeCluster.Join(staticDir) + return clusterInvoker +} + +func Test_FailSafeInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := register_failsafe(t, invoker) + + invoker.EXPECT().GetUrl().Return(failsafeUrl) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + res := result.Result().(rest) + assert.True(t, res.success) +} + +func Test_FailSafeInvokeFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := register_failsafe(t, invoker) + + invoker.EXPECT().GetUrl().Return(failsafeUrl) + + mockResult := &protocol.RPCResult{Err: perrors.New("error")} + + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + assert.Nil(t, result.Result()) +} diff --git a/cluster/loadbalance/least_active.go b/cluster/loadbalance/least_active.go index 695ca21b0ed53dcf94907223e4c222af17311db9..39c5620f02d607bb68430e17a206a1649ee31c54 100644 --- a/cluster/loadbalance/least_active.go +++ b/cluster/loadbalance/least_active.go @@ -1,14 +1,19 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ // @author yiji@apache.org package loadbalance diff --git a/cluster/loadbalance/round_robin.go b/cluster/loadbalance/round_robin.go index e173e211c3630f4d4786edc2c1a09709fd7bf0a1..075acac7cdc60086ececb7b655dee86ec5198369 100644 --- a/cluster/loadbalance/round_robin.go +++ b/cluster/loadbalance/round_robin.go @@ -1,15 +1,19 @@ -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package loadbalance diff --git a/common/constant/default.go b/common/constant/default.go index 05461ca6e7360e08d716d6b78d20ad4411df99e8..d103c6aadc0f76d4c4d82fedb89ea2f21f5eb7aa 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -26,11 +26,13 @@ const ( ) const ( - DEFAULT_LOADBALANCE = "random" - DEFAULT_RETRIES = 2 - DEFAULT_PROTOCOL = "dubbo" - DEFAULT_REG_TIMEOUT = "10s" - DEFAULT_CLUSTER = "failover" + DEFAULT_LOADBALANCE = "random" + DEFAULT_RETRIES = 2 + DEFAULT_PROTOCOL = "dubbo" + DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_CLUSTER = "failover" + DEFAULT_FAILBACK_TIMES = 3 + DEFAULT_FAILBACK_TASKS = 100 ) const ( @@ -38,6 +40,8 @@ const ( PREFIX_DEFAULT_KEY = "default." DEFAULT_SERVICE_FILTERS = "echo" DEFAULT_REFERENCE_FILTERS = "" + GENERIC_REFERENCE_FILTERS = "generic" + GENERIC = "$invoke" ECHO = "$echo" ) diff --git a/common/constant/key.go b/common/constant/key.go index bca658b2623e3ca9241103b5f9c82c7e15fb4062..4d2664baf9b311f6ed8aa8e295599d7e8e5846c4 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -30,6 +30,7 @@ const ( METHODS_KEY = "methods" TIMEOUT_KEY = "timeout" BEAN_NAME_KEY = "bean.name" + GENERIC_KEY = "generic" ) const ( @@ -45,6 +46,8 @@ const ( WEIGHT_KEY = "weight" WARMUP_KEY = "warmup" RETRIES_KEY = "retries" + BEAN_NAME = "bean.name" + FAIL_BACK_TASKS_KEY = "failbacktasks" ) const ( diff --git a/config/config_loader.go b/config/config_loader.go index 0dd0fb7f963e09a4402f4fd01c8a6b91959a7ca3..720f65f5de6b24b445b7ce07dd2daaba0a856681 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -66,6 +66,10 @@ func Load() { logger.Errorf("[consumer config center refresh] %#v", err) } for key, ref := range consumerConfig.References { + if ref.Generic { + genericService := NewGenericService(key) + SetConsumerService(genericService) + } rpcService := GetConsumerService(key) if rpcService == nil { diff --git a/config/generic_service.go b/config/generic_service.go new file mode 100644 index 0000000000000000000000000000000000000000..8a4e88df9788554bc4a5ee33884166e4ccede37f --- /dev/null +++ b/config/generic_service.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package config + +type GenericService struct { + Invoke func(req []interface{}) (interface{}, error) `dubbo:"$invoke"` + referenceStr string +} + +func NewGenericService(referenceStr string) *GenericService { + return &GenericService{referenceStr: referenceStr} +} + +func (u *GenericService) Reference() string { + return u.referenceStr +} diff --git a/config/reference_config.go b/config/reference_config.go index a5b7d50db51df02cd1871b3fb6d0bd2d6a545dd2..67a4a3d816060475fa49182f7141aba9807849e1 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -55,6 +55,7 @@ type ReferenceConfig struct { Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` invoker protocol.Invoker urls []*common.URL + Generic bool `yaml:"generic" json:"generic,omitempty" property:"generic"` } func (c *ReferenceConfig) Prefix() string { @@ -93,7 +94,7 @@ func (refconfig *ReferenceConfig) Refer() { refconfig.urls = append(refconfig.urls, &serviceUrl) } else { if serviceUrl.Path == "" { - serviceUrl.Path = "/" + refconfig.InterfaceName + serviceUrl.Path = "/" + refconfig.id } // merge url need to do newUrl := common.MergeUrl(serviceUrl, url) @@ -110,7 +111,6 @@ func (refconfig *ReferenceConfig) Refer() { regUrl.SubURL = url } } - if len(refconfig.urls) == 1 { refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0]) } else { @@ -157,6 +157,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(refconfig.Retries, 10)) urlMap.Set(constant.GROUP_KEY, refconfig.Group) urlMap.Set(constant.VERSION_KEY, refconfig.Version) + urlMap.Set(constant.GENERIC_KEY, strconv.FormatBool(refconfig.Generic)) //getty invoke async or sync urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async)) @@ -170,7 +171,11 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap.Set(constant.ENVIRONMENT_KEY, consumerConfig.ApplicationConfig.Environment) //filter - urlMap.Set(constant.REFERENCE_FILTER_KEY, mergeValue(consumerConfig.Filter, refconfig.Filter, constant.DEFAULT_REFERENCE_FILTERS)) + var defaultReferenceFilter = constant.DEFAULT_REFERENCE_FILTERS + if refconfig.Generic { + defaultReferenceFilter = constant.GENERIC_REFERENCE_FILTERS + defaultReferenceFilter + } + urlMap.Set(constant.REFERENCE_FILTER_KEY, mergeValue(consumerConfig.Filter, refconfig.Filter, defaultReferenceFilter)) for _, v := range refconfig.Methods { urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance) @@ -180,3 +185,11 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values { return urlMap } +func (refconfig *ReferenceConfig) GenericLoad(id string) { + genericService := NewGenericService(refconfig.id) + SetConsumerService(genericService) + refconfig.id = id + refconfig.Refer() + refconfig.Implement(genericService) + return +} diff --git a/examples/general/dubbo/go-client/app/client.go b/examples/general/dubbo/go-client/app/client.go index e6f1ae96fb387a1756c8a1c0b026746378377fff..b7ee0e662a26251a79af13d36215cc2cb44fd794 100644 --- a/examples/general/dubbo/go-client/app/client.go +++ b/examples/general/dubbo/go-client/app/client.go @@ -25,16 +25,15 @@ import ( "syscall" "time" ) - import ( - "github.com/apache/dubbo-go-hessian2" + hessian "github.com/apache/dubbo-go-hessian2" ) import ( "github.com/apache/dubbo-go/common/logger" _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/config" - _ "github.com/apache/dubbo-go/protocol/dubbo" + "github.com/apache/dubbo-go/protocol/dubbo" _ "github.com/apache/dubbo-go/registry/protocol" _ "github.com/apache/dubbo-go/filter/impl" @@ -65,7 +64,8 @@ func main() { test1() println("\n\ntest2") test2() - + println("\n\ntest3") + test3() initSignal() } @@ -288,3 +288,24 @@ func test2() { } println("error: %v", err) } +func test3() { + var appName = "UserProviderGer" + var referenceConfig = config.ReferenceConfig{ + InterfaceName: "com.ikurento.user.UserProvider", + Cluster: "failover", + Registry: "hangzhouzk", + Protocol: dubbo.DUBBO, + Generic: true, + } + referenceConfig.GenericLoad(appName) //appName is the unique identification of RPCService + + time.Sleep(3 * time.Second) + println("\n\n\nstart to generic invoke") + resp, err := referenceConfig.GetRPCService().(*config.GenericService).Invoke([]interface{}{"GetUser", []string{"java.lang.String"}, []interface{}{"A003"}}) + if err != nil { + panic(err) + } + println("res: %v\n", resp) + println("succ!") + +} diff --git a/filter/impl/active_filter.go b/filter/impl/active_filter.go index 435bfe7488c520b14d770aea01b3c3baf4950056..d7dad74cf3f5ccadf39372335bc1efb22f497523 100644 --- a/filter/impl/active_filter.go +++ b/filter/impl/active_filter.go @@ -1,14 +1,19 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ // @author yiji@apache.org package impl diff --git a/filter/impl/generic_filter.go b/filter/impl/generic_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..12cb4c7fa59ac4f1f7c1466f385ea9e9b59567b5 --- /dev/null +++ b/filter/impl/generic_filter.go @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "reflect" + "strings" +) +import ( + hessian "github.com/apache/dubbo-go-hessian2" +) +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/protocol" + invocation2 "github.com/apache/dubbo-go/protocol/invocation" +) + +const ( + GENERIC = "generic" +) + +func init() { + extension.SetFilter(GENERIC, GetGenericFilter) +} + +// when do a generic invoke, struct need to be map + +type GenericFilter struct{} + +func (ef *GenericFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 { + oldArguments := invocation.Arguments() + var newParams []hessian.Object + if oldParams, ok := oldArguments[2].([]interface{}); ok { + for i := range oldParams { + newParams = append(newParams, hessian.Object(struct2MapAll(oldParams[i]))) + } + } else { + return invoker.Invoke(invocation) + } + newArguments := []interface{}{ + oldArguments[0], + oldArguments[1], + newParams, + } + newInvocation := invocation2.NewRPCInvocation(invocation.MethodName(), newArguments, invocation.Attachments()) + newInvocation.SetReply(invocation.Reply()) + return invoker.Invoke(newInvocation) + } + return invoker.Invoke(invocation) +} + +func (ef *GenericFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + return result +} + +func GetGenericFilter() filter.Filter { + return &GenericFilter{} +} +func struct2MapAll(obj interface{}) interface{} { + if obj == nil { + return obj + } + t := reflect.TypeOf(obj) + v := reflect.ValueOf(obj) + if t.Kind() == reflect.Struct { + result := make(map[string]interface{}, t.NumField()) + for i := 0; i < t.NumField(); i++ { + if v.Field(i).Kind() == reflect.Struct { + if v.Field(i).CanInterface() { + setInMap(result, t.Field(i), struct2MapAll(v.Field(i).Interface())) + } + } else if v.Field(i).Kind() == reflect.Slice { + if v.Field(i).CanInterface() { + setInMap(result, t.Field(i), struct2MapAll(v.Field(i).Interface())) + } + } else { + if v.Field(i).CanInterface() { + setInMap(result, t.Field(i), v.Field(i).Interface()) + } + } + } + return result + } else if t.Kind() == reflect.Slice { + value := reflect.ValueOf(obj) + var newTemps = make([]interface{}, 0, value.Len()) + for i := 0; i < value.Len(); i++ { + newTemp := struct2MapAll(value.Index(i).Interface()) + newTemps = append(newTemps, newTemp) + } + return newTemps + } else { + return obj + } +} +func setInMap(m map[string]interface{}, structField reflect.StructField, value interface{}) (result map[string]interface{}) { + result = m + if tagName := structField.Tag.Get("m"); tagName == "" { + result[headerAtoa(structField.Name)] = value + } else { + result[tagName] = value + } + return +} +func headerAtoa(a string) (b string) { + b = strings.ToLower(a[:1]) + a[1:] + return +} diff --git a/filter/impl/generic_filter_test.go b/filter/impl/generic_filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a71a9db95759a143186fe9a1a4fb0c861c8949e8 --- /dev/null +++ b/filter/impl/generic_filter_test.go @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package impl + +import ( + "reflect" + "testing" +) +import ( + "github.com/stretchr/testify/assert" +) + +func Test_struct2MapAll(t *testing.T) { + var testData struct { + AaAa string `m:"aaAa"` + BaBa string + CaCa struct { + AaAa string + BaBa string `m:"baBa"` + XxYy struct { + xxXx string `m:"xxXx"` + Xx string `m:"xx"` + } `m:"xxYy"` + } `m:"caCa"` + } + testData.AaAa = "1" + testData.BaBa = "1" + testData.CaCa.BaBa = "2" + testData.CaCa.AaAa = "2" + testData.CaCa.XxYy.xxXx = "3" + testData.CaCa.XxYy.Xx = "3" + m := struct2MapAll(testData).(map[string]interface{}) + assert.Equal(t, "1", m["aaAa"].(string)) + assert.Equal(t, "1", m["baBa"].(string)) + assert.Equal(t, "2", m["caCa"].(map[string]interface{})["aaAa"].(string)) + assert.Equal(t, "3", m["caCa"].(map[string]interface{})["xxYy"].(map[string]interface{})["xx"].(string)) + + assert.Equal(t, reflect.Map, reflect.TypeOf(m["caCa"]).Kind()) + assert.Equal(t, reflect.Map, reflect.TypeOf(m["caCa"].(map[string]interface{})["xxYy"]).Kind()) +} + +type testStruct struct { + AaAa string + BaBa string `m:"baBa"` + XxYy struct { + xxXx string `m:"xxXx"` + Xx string `m:"xx"` + } `m:"xxYy"` +} + +func Test_struct2MapAll_Slice(t *testing.T) { + var testData struct { + AaAa string `m:"aaAa"` + BaBa string + CaCa []testStruct `m:"caCa"` + } + testData.AaAa = "1" + testData.BaBa = "1" + var tmp testStruct + tmp.BaBa = "2" + tmp.AaAa = "2" + tmp.XxYy.xxXx = "3" + tmp.XxYy.Xx = "3" + testData.CaCa = append(testData.CaCa, tmp) + m := struct2MapAll(testData).(map[string]interface{}) + + assert.Equal(t, "1", m["aaAa"].(string)) + assert.Equal(t, "1", m["baBa"].(string)) + assert.Equal(t, "2", m["caCa"].([]interface{})[0].(map[string]interface{})["aaAa"].(string)) + assert.Equal(t, "3", m["caCa"].([]interface{})[0].(map[string]interface{})["xxYy"].(map[string]interface{})["xx"].(string)) + + assert.Equal(t, reflect.Slice, reflect.TypeOf(m["caCa"]).Kind()) + assert.Equal(t, reflect.Map, reflect.TypeOf(m["caCa"].([]interface{})[0].(map[string]interface{})["xxYy"]).Kind()) +} diff --git a/filter/impl/hystrix_filter.go b/filter/impl/hystrix_filter.go index 7a98e537fb7bbc54c35d8da5bd88217c6373f644..3fd9f87168616b69d5ec72460767890d6956c154 100644 --- a/filter/impl/hystrix_filter.go +++ b/filter/impl/hystrix_filter.go @@ -41,11 +41,11 @@ const ( ) var ( - isConsumerConfigLoaded = false - isProviderConfigLoaded = false - confConsumer = &HystrixFilterConfig{} - confProvider = &HystrixFilterConfig{} - configLoadMutex = sync.RWMutex{} + confConsumer = &HystrixFilterConfig{} + confProvider = &HystrixFilterConfig{} + configLoadMutex = sync.RWMutex{} + consumerConfigOnce sync.Once + providerConfigOnce sync.Once ) //The filter in the server end of dubbo-go can't get the invoke result for now, @@ -149,29 +149,24 @@ func (hf *HystrixFilter) OnResponse(result protocol.Result, invoker protocol.Inv } func GetHystrixFilterConsumer() filter.Filter { //When first called, load the config in - if !isConsumerConfigLoaded { + consumerConfigOnce.Do(func() { if err := initHystrixConfigConsumer(); err != nil { logger.Warnf("[Hystrix Filter]Config load failed for consumer, error is: %v , will use default", err) } - isConsumerConfigLoaded = true - } - + }) return &HystrixFilter{COrP: true} } func GetHystrixFilterProvider() filter.Filter { - if !isProviderConfigLoaded { + providerConfigOnce.Do(func() { if err := initHystrixConfigProvider(); err != nil { logger.Warnf("[Hystrix Filter]Config load failed for provider, error is: %v , will use default", err) } - isProviderConfigLoaded = true - } - + }) return &HystrixFilter{COrP: false} } func getConfig(service string, method string, cOrP bool) CommandConfigWithError { - //Find method level config var conf *HystrixFilterConfig if cOrP { diff --git a/filter/impl/hystrix_filter_test.go b/filter/impl/hystrix_filter_test.go index 8acc4917f53106d80619d262fdb9e5cae4b885b1..d3a5183ede25d8a325bb1c73020edddd2ffbc638 100644 --- a/filter/impl/hystrix_filter_test.go +++ b/filter/impl/hystrix_filter_test.go @@ -17,11 +17,11 @@ package impl import ( - "github.com/afex/hystrix-go/hystrix" "regexp" "testing" ) import ( + "github.com/afex/hystrix-go/hystrix" "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -48,28 +48,28 @@ func mockInitHystrixConfig() { make(map[string]ServiceHystrixConfig), } confConsumer.Configs["Default"] = &CommandConfigWithError{ - 1000, - 600, - 5, - 5000, - 5, - nil, + Timeout: 1000, + MaxConcurrentRequests: 600, + RequestVolumeThreshold: 5, + SleepWindow: 5000, + ErrorPercentThreshold: 5, + Error: nil, } confConsumer.Configs["userp"] = &CommandConfigWithError{ - 2000, - 64, - 15, - 4000, - 45, - nil, + Timeout: 2000, + MaxConcurrentRequests: 64, + RequestVolumeThreshold: 15, + SleepWindow: 4000, + ErrorPercentThreshold: 45, + Error: nil, } confConsumer.Configs["userp_m"] = &CommandConfigWithError{ - 1200, - 64, - 5, - 6000, - 60, - []string{ + Timeout: 1200, + MaxConcurrentRequests: 64, + RequestVolumeThreshold: 5, + SleepWindow: 6000, + ErrorPercentThreshold: 60, + Error: []string{ "exception", }, } diff --git a/go.mod b/go.mod index 345482280e5310e2861fa8f14934912d6561b034..c29167c082dad0a845fd218127ba9a8dd5fee068 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,13 @@ module github.com/apache/dubbo-go require ( + github.com/Workiva/go-datastructures v1.0.50 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dubbogo/getty v1.2.2 github.com/dubbogo/gost v1.1.1 + github.com/golang/mock v1.3.1 github.com/magiconair/properties v1.8.1 github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec diff --git a/go.sum b/go.sum index d4dd4e2535ef7f3bc2e41b137b43a82810405eb5..58ecf5c5e3ee98c9733c2799a23e9b35972147c6 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,18 @@ +github.com/Workiva/go-datastructures v1.0.50 h1:slDmfW6KCHcC7U+LP3DDBbm4fqTwZGn1beOFPfGaLvo= +github.com/Workiva/go-datastructures v1.0.50/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 h1:7zJlM+8bpCAUhv03TZnXkT4MLlLWng1s7An8CLuN73E= github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dubbogo/getty v1.2.2 h1:qDC9WXjxcs5NPvWZz2ruVKBKr2r1Jjm6i0Sq//CQwbE= github.com/dubbogo/getty v1.2.2/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU= github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI= github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= @@ -41,11 +46,14 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE= golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= diff --git a/protocol/RpcStatus.go b/protocol/RpcStatus.go index b9f3e6ecb18583034e310471164773bebe689cd9..78796b6beaf24dac33d7e0210703a9027f9fe568 100644 --- a/protocol/RpcStatus.go +++ b/protocol/RpcStatus.go @@ -1,14 +1,19 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ // @author yiji@apache.org package protocol diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index b8822311ef83776c62a13a0ea468a185fb94cabf..8c6c8a5a53af4df9a89eae5db5752eb07f3aa446 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -23,10 +23,11 @@ import ( ) import ( - "github.com/apache/dubbo-go-hessian2" + hessian "github.com/apache/dubbo-go-hessian2" "github.com/dubbogo/getty" perrors "github.com/pkg/errors" ) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" diff --git a/protocol/invoker.go b/protocol/invoker.go index fe6aab848caceed50fc3db3e657ce87c45eac2ed..f5d41a09ad2778c12c7e5e68167a4d0acc9e3f4c 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -22,6 +22,7 @@ import ( "github.com/apache/dubbo-go/common/logger" ) +//go:generate mockgen -source invoker.go -destination mock/mock_invoker.go -self_package github.com/apache/dubbo-go/protocol/mock --package mock Invoker // Extension - Invoker type Invoker interface { common.Node diff --git a/protocol/mock/mock_invoker.go b/protocol/mock/mock_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..557dafa277e95c39d0b960436ac10ba8842c9186 --- /dev/null +++ b/protocol/mock/mock_invoker.go @@ -0,0 +1,87 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: invoker.go + +// Package mock is a generated GoMock package. +package mock + +import ( + "reflect" +) + +import ( + "github.com/golang/mock/gomock" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +// MockInvoker is a mock of Invoker interface +type MockInvoker struct { + ctrl *gomock.Controller + recorder *MockInvokerMockRecorder +} + +// MockInvokerMockRecorder is the mock recorder for MockInvoker +type MockInvokerMockRecorder struct { + mock *MockInvoker +} + +// NewMockInvoker creates a new mock instance +func NewMockInvoker(ctrl *gomock.Controller) *MockInvoker { + mock := &MockInvoker{ctrl: ctrl} + mock.recorder = &MockInvokerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockInvoker) EXPECT() *MockInvokerMockRecorder { + return m.recorder +} + +// GetUrl mocks base method +func (m *MockInvoker) GetUrl() common.URL { + ret := m.ctrl.Call(m, "GetUrl") + ret0, _ := ret[0].(common.URL) + return ret0 +} + +// GetUrl indicates an expected call of GetUrl +func (mr *MockInvokerMockRecorder) GetUrl() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUrl", reflect.TypeOf((*MockInvoker)(nil).GetUrl)) +} + +// IsAvailable mocks base method +func (m *MockInvoker) IsAvailable() bool { + ret := m.ctrl.Call(m, "IsAvailable") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsAvailable indicates an expected call of IsAvailable +func (mr *MockInvokerMockRecorder) IsAvailable() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsAvailable", reflect.TypeOf((*MockInvoker)(nil).IsAvailable)) +} + +// Destroy mocks base method +func (m *MockInvoker) Destroy() { + m.ctrl.Call(m, "Destroy") +} + +// Destroy indicates an expected call of Destroy +func (mr *MockInvokerMockRecorder) Destroy() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Destroy", reflect.TypeOf((*MockInvoker)(nil).Destroy)) +} + +// Invoke mocks base method +func (m *MockInvoker) Invoke(arg0 protocol.Invocation) protocol.Result { + ret := m.ctrl.Call(m, "Invoke", arg0) + ret0, _ := ret[0].(protocol.Result) + return ret0 +} + +// Invoke indicates an expected call of Invoke +func (mr *MockInvokerMockRecorder) Invoke(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Invoke", reflect.TypeOf((*MockInvoker)(nil).Invoke), arg0) +} diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go index e0eb3c5514e3b1c9395fcc626d1e2ce36734f68f..f6a73d448525da51d9d5578a0d1ec552a1d4128d 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper_test.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -18,6 +18,8 @@ package protocolwrapper import ( + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/filter" "net/url" "testing" ) @@ -30,42 +32,17 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/filter" //"github.com/apache/dubbo-go/filter/impl" "github.com/apache/dubbo-go/protocol" ) -//To avoid import cycle -const ( - MOCK = "mock" -) - -type MockFilter struct { -} - -func (mf *MockFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - return invoker.Invoke(invocation) -} - -func (mf *MockFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - return result -} - -func GetFilter() filter.Filter { - return &MockFilter{} -} - -func init() { - extension.SetFilter(MOCK, GetFilter) -} - func TestProtocolFilterWrapper_Export(t *testing.T) { filtProto := extension.GetProtocol(FILTER) filtProto.(*ProtocolFilterWrapper).protocol = &protocol.BaseProtocol{} u := common.NewURLWithOptions( common.WithParams(url.Values{}), - common.WithParamsValue(constant.SERVICE_FILTER_KEY, MOCK)) + common.WithParamsValue(constant.SERVICE_FILTER_KEY, "echo")) exporter := filtProto.Export(protocol.NewBaseInvoker(*u)) _, ok := exporter.GetInvoker().(*FilterInvoker) assert.True(t, ok) @@ -77,8 +54,35 @@ func TestProtocolFilterWrapper_Refer(t *testing.T) { u := common.NewURLWithOptions( common.WithParams(url.Values{}), - common.WithParamsValue(constant.REFERENCE_FILTER_KEY, MOCK)) + common.WithParamsValue(constant.REFERENCE_FILTER_KEY, "echo")) invoker := filtProto.Refer(*u) _, ok := invoker.(*FilterInvoker) assert.True(t, ok) } + +//the same as echo filter, for test +func init() { + extension.SetFilter("echo", GetFilter) +} + +type EchoFilterForTest struct{} + +func (ef *EchoFilterForTest) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + logger.Infof("invoking echo filter.") + logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments())) + if invocation.MethodName() == constant.ECHO && len(invocation.Arguments()) == 1 { + return &protocol.RPCResult{ + Rest: invocation.Arguments()[0], + } + } + + return invoker.Invoke(invocation) +} + +func (ef *EchoFilterForTest) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + return result +} + +func GetFilter() filter.Filter { + return &EchoFilterForTest{} +}