Skip to content
Snippets Groups Projects
Commit e016b8c2 authored by vito.he's avatar vito.he
Browse files

Mod: DataListener move to common

parent 764e0940
No related branches found
No related tags found
No related merge requests found
......@@ -23,6 +23,10 @@ type ConfigurationListener interface {
Process(*ConfigChangeEvent)
}
type DataListener interface {
DataChange(eventType Event) bool //bool is return for interface implement is interesting
}
type ConfigChangeEvent struct {
Key string
Value interface{}
......
......@@ -2,7 +2,7 @@ module github.com/apache/dubbo-go
require (
github.com/dubbogo/getty v0.0.0-20190607120257-8b0e100a88af
github.com/dubbogo/hessian2 v0.0.0-20190607144249-afb8cbfad2cb // indirect
github.com/dubbogo/hessian2 v0.0.0-20190607144249-afb8cbfad2cb
github.com/pkg/errors v0.8.1
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/stretchr/testify v1.3.0
......
......@@ -3,6 +3,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/getty v0.0.0-20190607120257-8b0e100a88af h1:vvXNXyq5uIlf+KlTduhRKY4hBBBjgCUNreT1yIfKftw=
github.com/dubbogo/getty v0.0.0-20190607120257-8b0e100a88af/go.mod h1:cRMSuoCmwc5lULFFnYZTxyCfZhObmRTNbS7XRnPNHSo=
github.com/dubbogo/hessian2 v0.0.0-20190607144249-afb8cbfad2cb h1:oN6hFLXbT/iDUO8qE4NZtvh89F/7VoAQ1LDxHJdmEH4=
github.com/dubbogo/hessian2 v0.0.0-20190607144249-afb8cbfad2cb/go.mod h1:XFGDn4oSZX26zkcfhkM/fCJrOqwQJxk/xgWW1KMJBKM=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
......
......@@ -42,15 +42,15 @@ func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
func (l *RegistryDataListener) DataChange(eventType zk.ZkEvent) bool {
serviceURL, err := common.NewURL(context.TODO(), eventType.Res.Content)
func (l *RegistryDataListener) DataChange(eventType common.Event) bool {
serviceURL, err := common.NewURL(context.TODO(), eventType.Content)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Res.Content, err)
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err)
return false
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&common.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Res.Action})
l.listener.Process(&common.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
return true
}
}
......
......@@ -18,7 +18,6 @@
package zookeeper
import (
"fmt"
"path"
"sync"
"time"
......@@ -34,19 +33,6 @@ import (
"github.com/apache/dubbo-go/common/logger"
)
type DataListener interface {
DataChange(eventType ZkEvent) bool //bool is return for interface implement is interesting
}
type ZkEvent struct {
Res *common.Event
err error
}
func (e ZkEvent) String() string {
return fmt.Sprintf("err:%s, res:%s", e.err, e.Res)
}
type ZkEventListener struct {
client *ZookeeperClient
pathMapLock sync.Mutex
......@@ -97,7 +83,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string) bool {
return false
}
func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener DataListener) {
func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener common.DataListener) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
......@@ -125,7 +111,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newNode = path.Join(zkPath, n)
logger.Infof("add zkNode{%s}", newNode)
if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: n}, nil}) {
if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: n}) {
continue
}
// listen l service node
......@@ -133,7 +119,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
logger.Infof("delete zkNode{%s}", node)
if l.listenServiceNodeEvent(node) {
logger.Infof("delete content{%s}", n)
listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: n}, nil})
listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: n})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode)
......@@ -148,7 +134,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: n}, nil}) {
if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: n}) {
continue
}
logger.Warnf("delete content{%s}", n)
......@@ -156,11 +142,11 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: n}, nil})
listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: n})
}
}
func (l *ZkEventListener) listenDirEvent(zkPath string, listener DataListener) {
func (l *ZkEventListener) listenDirEvent(zkPath string, listener common.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
......@@ -230,7 +216,7 @@ func timeSecondDuration(sec int) time.Duration {
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
// |
// --------> listenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener DataListener) {
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener common.DataListener) {
var (
err error
dubboPath string
......@@ -258,7 +244,7 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener DataListene
}
for _, c := range children {
if !listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Add, Content: c}, nil}) {
if !listener.DataChange(common.Event{Path: zkPath, Action: common.Add, Content: c}) {
continue
}
......@@ -268,14 +254,14 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener DataListene
go func(zkPath string, serviceURL common.URL) {
if l.listenServiceNodeEvent(dubboPath) {
logger.Debugf("delete serviceUrl{%s}", serviceURL)
listener.DataChange(ZkEvent{&common.Event{Path: zkPath, Action: common.Del, Content: c}, nil})
listener.DataChange(common.Event{Path: zkPath, Action: common.Del, Content: c})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, serviceURL)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener DataListener) {
go func(zkPath string, listener common.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment