diff --git a/cluster/cluster_impl/forking_cluster.go b/cluster/cluster_impl/forking_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..c930d36c27c5cfc904414883dcfbef90ebf147c2 --- /dev/null +++ b/cluster/cluster_impl/forking_cluster.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type forkingCluster struct{} + +const fork = "fork" + +func init() { + extension.SetCluster(fork, NewForkingCluster) +} + +func NewForkingCluster() cluster.Cluster { + return &forkingCluster{} +} + +func (cluster *forkingCluster) Join(directory cluster.Directory) protocol.Invoker { + return newForkingClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/forking_cluster_invoker.go b/cluster/cluster_impl/forking_cluster_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..c4a59a799e38dccf9443672b0136e304ae649929 --- /dev/null +++ b/cluster/cluster_impl/forking_cluster_invoker.go @@ -0,0 +1,95 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "errors" + "fmt" + "time" +) + +import ( + "github.com/Workiva/go-datastructures/queue" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" +) + +type forkingClusterInvoker struct { + baseClusterInvoker +} + +func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &forkingClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *forkingClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + err := invoker.checkWhetherDestroyed() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + invokers := invoker.directory.List(invocation) + err = invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + + var selected []protocol.Invoker + forks := int(invoker.GetUrl().GetParamInt(constant.FORKS_KEY, constant.DEFAULT_FORKS)) + timeouts := invoker.GetUrl().GetParamInt(constant.TIMEOUT_KEY, constant.DEFAULT_TIMEOUT) + if forks < 0 || forks > len(invokers) { + selected = invokers + } else { + selected = make([]protocol.Invoker, 0) + loadbalance := getLoadBalance(invokers[0], invocation) + for i := 0; i < forks; i++ { + ivk := invoker.doSelect(loadbalance, invocation, invokers, selected) + if ivk != nil { + selected = append(selected, ivk) + } + } + } + + resultQ := queue.New(1) + for _, ivk := range selected { + go func(k protocol.Invoker) { + result := k.Invoke(invocation) + resultQ.Put(result) + }(ivk) + } + + rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts)) + if err != nil { + return &protocol.RPCResult{ + Err: errors.New(fmt.Sprintf("failed to forking invoke provider %v, but no luck to perform the invocation. Last error is: %s", selected, err.Error()))} + } + if len(rsps) == 0 { + return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("failed to forking invoke provider %v, but no resp", selected))} + } + result, ok := rsps[0].(protocol.Result) + if !ok { + return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("failed to forking invoke provider %v, but not legal resp", selected))} + } + return result +} diff --git a/cluster/cluster_impl/forking_cluster_test.go b/cluster/cluster_impl/forking_cluster_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8603f8aedc4e28a3a4ca2f115355debc1a5ecc62 --- /dev/null +++ b/cluster/cluster_impl/forking_cluster_test.go @@ -0,0 +1,162 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "strconv" + "sync" + "testing" + "time" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + forkingUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +func registerForking(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance(loadbalance.RoundRobin, loadbalance.NewRoundRobinLoadBalance) + + invokers := []protocol.Invoker{} + for i, ivk := range mockInvokers { + invokers = append(invokers, ivk) + if i == 0 { + ivk.EXPECT().GetUrl().Return(forkingUrl) + } + } + staticDir := directory.NewStaticDirectory(invokers) + + forkingCluster := NewForkingCluster() + clusterInvoker := forkingCluster.Join(staticDir) + return clusterInvoker +} + +func Test_ForkingInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokers := make([]*mock.MockInvoker, 0) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + forkingUrl.AddParam(constant.FORKS_KEY, strconv.Itoa(3)) + //forkingUrl.AddParam(constant.TIMEOUT_KEY, strconv.Itoa(constant.DEFAULT_TIMEOUT)) + + var wg sync.WaitGroup + wg.Add(2) + for i := 0; i < 2; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + wg.Done() + return mockResult + }) + } + + clusterInvoker := registerForking(t, invokers...) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockResult, result) + wg.Wait() +} + +func Test_ForkingInvokeTimeout(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokers := make([]*mock.MockInvoker, 0) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + forkingUrl.AddParam(constant.FORKS_KEY, strconv.Itoa(3)) + + var wg sync.WaitGroup + wg.Add(2) + for i := 0; i < 2; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + time.Sleep(2 * time.Second) + wg.Done() + return mockResult + }) + } + + clusterInvoker := registerForking(t, invokers...) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.NotNil(t, result) + assert.NotNil(t, result.Error()) + wg.Wait() +} + +func Test_ForkingInvokeHalfTimeout(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokers := make([]*mock.MockInvoker, 0) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + forkingUrl.AddParam(constant.FORKS_KEY, strconv.Itoa(3)) + + var wg sync.WaitGroup + wg.Add(2) + for i := 0; i < 2; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + if i == 1 { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + wg.Done() + return mockResult + }) + } else { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + time.Sleep(2 * time.Second) + wg.Done() + return mockResult + }) + } + } + + clusterInvoker := registerForking(t, invokers...) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockResult, result) + wg.Wait() +} diff --git a/common/constant/key.go b/common/constant/key.go index 4d2664baf9b311f6ed8aa8e295599d7e8e5846c4..32df8cde1ffe322ac8b61f150953cc8fe4e51445 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -48,6 +48,9 @@ const ( RETRIES_KEY = "retries" BEAN_NAME = "bean.name" FAIL_BACK_TASKS_KEY = "failbacktasks" + FORKS_KEY = "forks" + DEFAULT_FORKS = 2 + DEFAULT_TIMEOUT = 1000 ) const (