From e2634281192af2a3757948b7b6e4ab8597f12db9 Mon Sep 17 00:00:00 2001
From: xujianhai666 <zero.xu@bytedance.com>
Date: Tue, 6 Aug 2019 23:06:42 +0800
Subject: [PATCH] add broadcast cluster

---
 cluster/cluster_impl/broadcast_cluster.go     |  40 +++++++
 .../cluster_impl/broadcast_cluster_invoker.go |  59 ++++++++++
 .../broadcast_cluster_invoker_test.go         | 109 ++++++++++++++++++
 3 files changed, 208 insertions(+)
 create mode 100644 cluster/cluster_impl/broadcast_cluster.go
 create mode 100644 cluster/cluster_impl/broadcast_cluster_invoker.go
 create mode 100644 cluster/cluster_impl/broadcast_cluster_invoker_test.go

diff --git a/cluster/cluster_impl/broadcast_cluster.go b/cluster/cluster_impl/broadcast_cluster.go
new file mode 100644
index 000000000..50aae3cfa
--- /dev/null
+++ b/cluster/cluster_impl/broadcast_cluster.go
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cluster_impl
+
+import (
+	"github.com/apache/dubbo-go/cluster"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+type broadcastCluster struct{}
+
+const broadcast = "broadcast"
+
+func init() {
+	extension.SetCluster(broadcast, NewBroadcastCluster)
+}
+
+func NewBroadcastCluster() cluster.Cluster {
+	return &broadcastCluster{}
+}
+
+func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker {
+	return newBroadcastClusterInvoker(directory)
+}
diff --git a/cluster/cluster_impl/broadcast_cluster_invoker.go b/cluster/cluster_impl/broadcast_cluster_invoker.go
new file mode 100644
index 000000000..238df0acf
--- /dev/null
+++ b/cluster/cluster_impl/broadcast_cluster_invoker.go
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cluster_impl
+
+import (
+	"github.com/apache/dubbo-go/cluster"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/protocol"
+)
+
+type broadcastClusterInvoker struct {
+	baseClusterInvoker
+}
+
+func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker {
+	return &broadcastClusterInvoker{
+		baseClusterInvoker: newBaseClusterInvoker(directory),
+	}
+}
+
+func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
+	invokers := invoker.directory.List(invocation)
+	err := invoker.checkInvokers(invokers, invocation)
+	if err != nil {
+		return &protocol.RPCResult{Err: err}
+	}
+	err = invoker.checkWhetherDestroyed()
+	if err != nil {
+		return &protocol.RPCResult{Err: err}
+	}
+
+	var result protocol.Result
+	for _, ivk := range invokers {
+		result = ivk.Invoke(invocation)
+		if result.Error() != nil {
+			logger.Warnf("broadcast invoker invoke err: %v when use invoker: %v\n", result.Error(), ivk)
+			err = result.Error()
+		}
+	}
+	if err != nil {
+		return &protocol.RPCResult{Err: err}
+	}
+	return result
+}
diff --git a/cluster/cluster_impl/broadcast_cluster_invoker_test.go b/cluster/cluster_impl/broadcast_cluster_invoker_test.go
new file mode 100644
index 000000000..565684a8a
--- /dev/null
+++ b/cluster/cluster_impl/broadcast_cluster_invoker_test.go
@@ -0,0 +1,109 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cluster_impl
+
+import (
+	"context"
+	"errors"
+	"testing"
+)
+
+import (
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/assert"
+)
+
+import (
+	"github.com/apache/dubbo-go/cluster/directory"
+	"github.com/apache/dubbo-go/cluster/loadbalance"
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/protocol"
+	"github.com/apache/dubbo-go/protocol/invocation"
+	"github.com/apache/dubbo-go/protocol/mock"
+)
+
+var (
+	broadcastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
+)
+
+func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker {
+	extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
+
+	invokers := []protocol.Invoker{}
+	for i, ivk := range mockInvokers {
+		invokers = append(invokers, ivk)
+		if i == 0 {
+			ivk.EXPECT().GetUrl().Return(broadcastUrl)
+		}
+	}
+	staticDir := directory.NewStaticDirectory(invokers)
+
+	broadcastCluster := NewBroadcastCluster()
+	clusterInvoker := broadcastCluster.Join(staticDir)
+	return clusterInvoker
+}
+
+func Test_BroadcastInvokeSuccess(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	invokers := make([]*mock.MockInvoker, 0)
+
+	mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
+	for i := 0; i < 3; i++ {
+		invoker := mock.NewMockInvoker(ctrl)
+		invokers = append(invokers, invoker)
+		invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+	}
+
+	clusterInvoker := registerBroadcast(t, invokers...)
+
+	result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
+	assert.Equal(t, mockResult, result)
+}
+
+func Test_BroadcastInvokeFailed(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	invokers := make([]*mock.MockInvoker, 0)
+
+	mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
+	mockFailedResult := &protocol.RPCResult{Err: errors.New("just failed")}
+	for i := 0; i < 10; i++ {
+		invoker := mock.NewMockInvoker(ctrl)
+		invokers = append(invokers, invoker)
+		invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+	}
+	{
+		invoker := mock.NewMockInvoker(ctrl)
+		invokers = append(invokers, invoker)
+		invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
+	}
+	for i := 0; i < 10; i++ {
+		invoker := mock.NewMockInvoker(ctrl)
+		invokers = append(invokers, invoker)
+		invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
+	}
+
+	clusterInvoker := registerBroadcast(t, invokers...)
+
+	result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
+	assert.Equal(t, mockFailedResult.Err, result.Error())
+}
-- 
GitLab