From 0039ac6f8345ead8ca676fc294acc5b7450e9d53 Mon Sep 17 00:00:00 2001 From: "vito.he" <hxmhlt@163.com> Date: Mon, 10 Jun 2019 16:29:59 +0800 Subject: [PATCH] Mod: DataListener move to remoting --- config_center/dynamic_configuration.go | 8 +++---- .../zookeeper/dynamic_configuration.go | 5 +++-- registry/directory/directory.go | 5 +++-- registry/directory/directory_test.go | 9 ++++---- registry/event.go | 3 ++- registry/zookeeper/listener.go | 13 ++++++------ .../listener.go | 2 +- remoting/zookeeper/listener.go | 21 ++++++++++--------- 8 files changed, 35 insertions(+), 31 deletions(-) rename common/configuration_listener.go => remoting/listener.go (99%) diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 57b04f744..436ac50d8 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -18,11 +18,9 @@ package config_center import ( + "github.com/apache/dubbo-go/remoting" "time" ) -import ( - "github.com/apache/dubbo-go/common" -) ////////////////////////////////////////// // DynamicConfiguration @@ -31,8 +29,8 @@ const DEFAULT_GROUP = "dubbo" const DEFAULT_CONFIG_TIMEOUT = "10s" type DynamicConfiguration interface { - AddListener(string, common.ConfigurationListener, ...Option) - RemoveListener(string, common.ConfigurationListener, ...Option) + AddListener(string, remoting.ConfigurationListener, ...Option) + RemoveListener(string, remoting.ConfigurationListener, ...Option) GetConfig(string, ...Option) string GetConfigs(string, ...Option) string } diff --git a/config_center/zookeeper/dynamic_configuration.go b/config_center/zookeeper/dynamic_configuration.go index b94f18a69..bd9f97785 100644 --- a/config_center/zookeeper/dynamic_configuration.go +++ b/config_center/zookeeper/dynamic_configuration.go @@ -18,6 +18,7 @@ package zookeeper import ( + "github.com/apache/dubbo-go/remoting" "sync" ) import ( @@ -61,11 +62,11 @@ func NewZookeeperDynamicConfiguration(url common.URL) (config_center.DynamicConf } -func (*ZookeeperDynamicConfiguration) AddListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) { +func (*ZookeeperDynamicConfiguration) AddListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) { } -func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener common.ConfigurationListener, opions ...config_center.Option) { +func (*ZookeeperDynamicConfiguration) RemoveListener(key string, listener remoting.ConfigurationListener, opions ...config_center.Option) { } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 8163e204f..763a6149b 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -18,6 +18,7 @@ package directory import ( + "github.com/apache/dubbo-go/remoting" "sync" "time" ) @@ -130,10 +131,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { switch res.Action { - case common.Add: + case remoting.Add: //dir.cacheService.Add(res.Path, dir.serviceTTL) dir.cacheInvoker(res.Service) - case common.Del: + case remoting.Del: //dir.cacheService.Del(res.Path, dir.serviceTTL) dir.uncacheInvoker(res.Service) logger.Infof("selector delete service url{%s}", res.Service) diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index dc0cb71c9..99cf93a23 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -19,6 +19,7 @@ package directory import ( "context" + "github.com/apache/dubbo-go/remoting" "net/url" "strconv" "testing" @@ -50,7 +51,7 @@ func TestSubscribe_Delete(t *testing.T) { registryDirectory, mockRegistry := normalRegistryDir() time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 3) - mockRegistry.MockEvent(®istry.ServiceEvent{Action: common.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))}) + mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.Del, Service: *common.NewURLWithOptions("TEST0", common.WithProtocol("dubbo"))}) time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 2) } @@ -80,7 +81,7 @@ func TestSubscribe_Group(t *testing.T) { urlmap.Set(constant.GROUP_KEY, "group1") urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), common.WithParams(urlmap))}) } //for group2 @@ -88,7 +89,7 @@ func TestSubscribe_Group(t *testing.T) { urlmap2.Set(constant.GROUP_KEY, "group2") urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"), common.WithParams(urlmap2))}) } @@ -128,7 +129,7 @@ func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) { go registryDirectory.Subscribe(*common.NewURLWithOptions("testservice")) for i := 0; i < 3; i++ { - mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: common.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))}) + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: remoting.Add, Service: *common.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), common.WithProtocol("dubbo"))}) } return registryDirectory, mockRegistry.(*registry.MockRegistry) } diff --git a/registry/event.go b/registry/event.go index 836070875..9d5778d9e 100644 --- a/registry/event.go +++ b/registry/event.go @@ -19,6 +19,7 @@ package registry import ( "fmt" + "github.com/apache/dubbo-go/remoting" "math/rand" "time" ) @@ -36,7 +37,7 @@ func init() { ////////////////////////////////////////// type ServiceEvent struct { - Action common.EventType + Action remoting.EventType Service common.URL } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 27fb24348..2cd1cfec8 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -19,6 +19,7 @@ package zookeeper import ( "context" + "github.com/apache/dubbo-go/remoting" ) import ( perrors "github.com/pkg/errors" @@ -42,7 +43,7 @@ func (l *RegistryDataListener) AddInterestedURL(url *common.URL) { l.interestedURL = append(l.interestedURL, url) } -func (l *RegistryDataListener) DataChange(eventType common.Event) bool { +func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { serviceURL, err := common.NewURL(context.TODO(), eventType.Content) if err != nil { logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err) @@ -50,7 +51,7 @@ func (l *RegistryDataListener) DataChange(eventType common.Event) bool { } for _, v := range l.interestedURL { if serviceURL.URLEqual(*v) { - l.listener.Process(&common.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action}) + l.listener.Process(&remoting.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action}) return true } } @@ -61,14 +62,14 @@ func (l *RegistryDataListener) DataChange(eventType common.Event) bool { type RegistryConfigurationListener struct { client *zk.ZookeeperClient registry *zkRegistry - events chan *common.ConfigChangeEvent + events chan *remoting.ConfigChangeEvent } func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { reg.wg.Add(1) - return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *common.ConfigChangeEvent, 32)} + return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)} } -func (l *RegistryConfigurationListener) Process(configType *common.ConfigChangeEvent) { +func (l *RegistryConfigurationListener) Process(configType *remoting.ConfigChangeEvent) { l.events <- configType } @@ -85,7 +86,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) { case e := <-l.events: logger.Debugf("got zk event %s", e) - if e.ConfigType == common.Del && !l.valid() { + if e.ConfigType == remoting.Del && !l.valid() { logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) continue } diff --git a/common/configuration_listener.go b/remoting/listener.go similarity index 99% rename from common/configuration_listener.go rename to remoting/listener.go index ab8b8bb67..37f75d465 100644 --- a/common/configuration_listener.go +++ b/remoting/listener.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package common +package remoting import "fmt" diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index ae364860d..2a6988842 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -18,6 +18,7 @@ package zookeeper import ( + "github.com/apache/dubbo-go/remoting" "path" "sync" "time" @@ -83,7 +84,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool { return false } -func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener common.DataListener) { +func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener remoting.DataListener) { contains := func(s []string, e string) bool { for _, a := range s { if a == e { @@ -111,7 +112,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li newNode = path.Join(zkPath, n) logger.Infof("add zkNode{%s}", newNode) - if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: n}) { + if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) { continue } // listen l service node @@ -119,7 +120,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li logger.Infof("delete zkNode{%s}", node) if l.listenServiceNodeEvent(node) { logger.Infof("delete content{%s}", n) - listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: n}) + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) }(newNode) @@ -134,7 +135,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li oldNode = path.Join(zkPath, n) logger.Warnf("delete zkPath{%s}", oldNode) - if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: n}) { + if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: n}) { continue } logger.Warnf("delete content{%s}", n) @@ -142,11 +143,11 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err)) continue } - listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: n}) + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: n}) } } -func (l *ZkEventListener) listenDirEvent(zkPath string, listener common.DataListener) { +func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) { l.wg.Add(1) defer l.wg.Done() @@ -216,7 +217,7 @@ func timeSecondDuration(sec int) time.Duration { // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent // | // --------> listenServiceNodeEvent -func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.DataListener) { +func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) { var ( err error dubboPath string @@ -244,7 +245,7 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.Data } for _, c := range children { - if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: c}) { + if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Add, Content: c}) { continue } @@ -254,14 +255,14 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.Data go func(zkPath string, serviceURL common.URL) { if l.listenServiceNodeEvent(dubboPath) { logger.Debugf("delete serviceUrl{%s}", serviceURL) - listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: c}) + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.Del, Content: c}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) }(dubboPath, serviceURL) } logger.Infof("listen dubbo path{%s}", zkPath) - go func(zkPath string, listener common.DataListener) { + go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) }(zkPath, listener) -- GitLab