From 5601b2cb664c5023e73b69f519e7151ad88529d1 Mon Sep 17 00:00:00 2001
From: "vito.he" <hxmhlt@163.com>
Date: Mon, 10 Jun 2019 16:29:59 +0800
Subject: [PATCH] Mod: DataListener move to remoting

---
 config_center/dynamic_configuration.go        |  8 +++----
 .../zookeeper/dynamic_configuration.go        |  5 +++--
 registry/directory/directory.go               |  5 +++--
 registry/directory/directory_test.go          |  9 ++++----
 registry/event.go                             |  3 ++-
 registry/zookeeper/listener.go                | 13 ++++++------
 .../listener.go                               |  2 +-
 remoting/zookeeper/listener.go                | 21 ++++++++++---------
 8 files changed, 35 insertions(+), 31 deletions(-)
 rename common/configuration_listener.go => remoting/listener.go (99%)

diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go
index 57b04f744..436ac50d8 100644
--- a/config_center/dynamic_configuration.go
+++ b/config_center/dynamic_configuration.go
@@ -18,11 +18,9 @@
 package config_center
 
 import (
+	"github.com/apache/dubbo-go/remoting"
 	"time"
 )
-import (
-	"github.com/apache/dubbo-go/common"
-)
 
 //////////////////////////////////////////
 // DynamicConfiguration
@@ -31,8 +29,8 @@ const DEFAULT_GROUP = "dubbo"
 const DEFAULT_CONFIG_TIMEOUT = "10s"
 
 type DynamicConfiguration interface {
-	AddListener(string, common.ConfigurationListener, ...Option)
-	RemoveListener(string, common.ConfigurationListener, ...Option)
+	AddListener(string, remoting.ConfigurationListener, ...Option)
+	RemoveListener(string, remoting.ConfigurationListener, ...Option)
 	GetConfig(string, ...Option) string
 	GetConfigs(string, ...Option) string
 }
diff --git a/config_center/zookeeper/dynamic_configuration.go b/config_center/zookeeper/dynamic_configuration.go
index b94f18a69..bd9f97785 100644
--- a/config_center/zookeeper/dynamic_configuration.go
+++ b/config_center/zookeeper/dynamic_configuration.go
@@ -18,6 +18,7 @@
 package zookeeper
 
 import (
+	"github.com/apache/dubbo-go/remoting"
 	"sync"
 )
 import (
@@ -61,11 +62,11 @@ func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConf
 
 }
 
-func (*ZookeeperDynamicConfiguration) AddListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) {
+func (*ZookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
 
 }
 
-func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) {
+func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) {
 
 }
 
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 8163e204f..763a6149b 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -18,6 +18,7 @@
 package directory
 
 import (
+	"github.com/apache/dubbo-go/remoting"
 	"sync"
 	"time"
 )
@@ -130,10 +131,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
 func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
 
 	switch res.Action {
-	case common.Add:
+	case remoting.Add:
 		//dir.cacheService.Add(res.Path, dir.serviceTTL)
 		dir.cacheInvoker(res.Service)
-	case common.Del:
+	case remoting.Del:
 		//dir.cacheService.Del(res.Path, dir.serviceTTL)
 		dir.uncacheInvoker(res.Service)
 		logger.Infof("selector delete service url{%s}", res.Service)
diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go
index dc0cb71c9..99cf93a23 100644
--- a/registry/directory/directory_test.go
+++ b/registry/directory/directory_test.go
@@ -19,6 +19,7 @@ package directory
 
 import (
 	"context"
+	"github.com/apache/dubbo-go/remoting"
 	"net/url"
 	"strconv"
 	"testing"
@@ -50,7 +51,7 @@ func TestSubscribe_Delete(t *testing.T) {
 	registryDirectory, mockRegistry := normalRegistryDir()
 	time.Sleep(1e9)
 	assert.Len(t, registryDirectory.cacheInvokers, 3)
-	mockRegistry.MockEvent(&registry.ServiceEvent{Action: common.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
+	mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))})
 	time.Sleep(1e9)
 	assert.Len(t, registryDirectory.cacheInvokers, 2)
 }
@@ -80,7 +81,7 @@ func TestSubscribe_Group(t *testing.T) {
 	urlmap.Set(constant.GROUP_KEY, "group1")
 	urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url
 	for i := 0; i < 3; i++ {
-		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
+		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
 			common.WithParams(urlmap))})
 	}
 	//for group2
@@ -88,7 +89,7 @@ func TestSubscribe_Group(t *testing.T) {
 	urlmap2.Set(constant.GROUP_KEY, "group2")
 	urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url
 	for i := 0; i < 3; i++ {
-		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
+		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"),
 			common.WithParams(urlmap2))})
 	}
 
@@ -128,7 +129,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {
 
 	go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice"))
 	for i := 0; i < 3; i++ {
-		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
+		mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))})
 	}
 	return registryDirectory, mockRegistry.(*registry.MockRegistry)
 }
diff --git a/registry/event.go b/registry/event.go
index 836070875..9d5778d9e 100644
--- a/registry/event.go
+++ b/registry/event.go
@@ -19,6 +19,7 @@ package registry
 
 import (
 	"fmt"
+	"github.com/apache/dubbo-go/remoting"
 	"math/rand"
 	"time"
 )
@@ -36,7 +37,7 @@ func init() {
 //////////////////////////////////////////
 
 type ServiceEvent struct {
-	Action  common.EventType
+	Action  remoting.EventType
 	Service common.URL
 }
 
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 27fb24348..2cd1cfec8 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -19,6 +19,7 @@ package zookeeper
 
 import (
 	"context"
+	"github.com/apache/dubbo-go/remoting"
 )
 import (
 	perrors "github.com/pkg/errors"
@@ -42,7 +43,7 @@ func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
 	l.interestedURL = append(l.interestedURL, url)
 }
 
-func (l *RegistryDataListener) DataChange(eventType common.Event) bool {
+func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
 	serviceURL, err := common.NewURL(context.TODO(), eventType.Content)
 	if err != nil {
 		logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err)
@@ -50,7 +51,7 @@ func (l *RegistryDataListener) DataChange(eventType common.Event) bool {
 	}
 	for _, v := range l.interestedURL {
 		if serviceURL.URLEqual(*v) {
-			l.listener.Process(&common.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
+			l.listener.Process(&remoting.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
 			return true
 		}
 	}
@@ -61,14 +62,14 @@ func (l *RegistryDataListener) DataChange(eventType common.Event) bool {
 type RegistryConfigurationListener struct {
 	client   *zk.ZookeeperClient
 	registry *zkRegistry
-	events   chan *common.ConfigChangeEvent
+	events   chan *remoting.ConfigChangeEvent
 }
 
 func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
 	reg.wg.Add(1)
-	return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *common.ConfigChangeEvent, 32)}
+	return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)}
 }
-func (l *RegistryConfigurationListener) Process(configType *common.ConfigChangeEvent) {
+func (l *RegistryConfigurationListener) Process(configType *remoting.ConfigChangeEvent) {
 	l.events <- configType
 }
 
@@ -85,7 +86,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
 
 		case e := <-l.events:
 			logger.Debugf("got zk event %s", e)
-			if e.ConfigType == common.Del && !l.valid() {
+			if e.ConfigType == remoting.Del && !l.valid() {
 				logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
 				continue
 			}
diff --git a/common/configuration_listener.go b/remoting/listener.go
similarity index 99%
rename from common/configuration_listener.go
rename to remoting/listener.go
index ab8b8bb67..37f75d465 100644
--- a/common/configuration_listener.go
+++ b/remoting/listener.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package common
+package remoting
 
 import "fmt"
 
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index ae364860d..2a6988842 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -18,6 +18,7 @@
 package zookeeper
 
 import (
+	"github.com/apache/dubbo-go/remoting"
 	"path"
 	"sync"
 	"time"
@@ -83,7 +84,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool {
 	return false
 }
 
-func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener common.DataListener) {
+func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener remoting.DataListener) {
 	contains := func(s []string, e string) bool {
 		for _, a := range s {
 			if a == e {
@@ -111,7 +112,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 
 		newNode = path.Join(zkPath, n)
 		logger.Infof("add zkNode{%s}", newNode)
-		if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: n}) {
+		if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
 			continue
 		}
 		// listen l service node
@@ -119,7 +120,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 			logger.Infof("delete zkNode{%s}", node)
 			if l.listenServiceNodeEvent(node) {
 				logger.Infof("delete content{%s}", n)
-				listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: n})
+				listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
 			}
 			logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
 		}(newNode)
@@ -134,7 +135,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 
 		oldNode = path.Join(zkPath, n)
 		logger.Warnf("delete zkPath{%s}", oldNode)
-		if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: n}) {
+		if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) {
 			continue
 		}
 		logger.Warnf("delete content{%s}", n)
@@ -142,11 +143,11 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 			logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
 			continue
 		}
-		listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: n})
+		listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n})
 	}
 }
 
-func (l *ZkEventListener) listenDirEvent(zkPath string, listener common.DataListener) {
+func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
 	l.wg.Add(1)
 	defer l.wg.Done()
 
@@ -216,7 +217,7 @@ func timeSecondDuration(sec int) time.Duration {
 // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
 //                            |
 //                            --------> listenServiceNodeEvent
-func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.DataListener) {
+func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
 	var (
 		err        error
 		dubboPath  string
@@ -244,7 +245,7 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.Data
 	}
 
 	for _, c := range children {
-		if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: c}) {
+		if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: c}) {
 			continue
 		}
 
@@ -254,14 +255,14 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.Data
 		go func(zkPath string, serviceURL common.URL) {
 			if l.listenServiceNodeEvent(dubboPath) {
 				logger.Debugf("delete serviceUrl{%s}", serviceURL)
-				listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: c})
+				listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: c})
 			}
 			logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
 		}(dubboPath, serviceURL)
 	}
 
 	logger.Infof("listen dubbo path{%s}", zkPath)
-	go func(zkPath string, listener common.DataListener) {
+	go func(zkPath string, listener remoting.DataListener) {
 		l.listenDirEvent(zkPath, listener)
 		logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
 	}(zkPath, listener)
-- 
GitLab