diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go index 687f21c854ea74d98a5a36698ee8afa6f237f87e..1e827340dd9a31d0abae688c1761ec088f7011fb 100644 --- a/registry/zookeeper/service_discovery.go +++ b/registry/zookeeper/service_discovery.go @@ -66,6 +66,7 @@ type zookeeperServiceDiscovery struct { url *common.URL wg sync.WaitGroup cltLock sync.Mutex + listenLock sync.Mutex done chan struct{} rootPath string listenNames []string @@ -239,6 +240,8 @@ func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string } func (zksd *zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + zksd.listenLock.Lock() + defer zksd.listenLock.Unlock() zksd.listenNames = append(zksd.listenNames, listener.ServiceName) zksd.csd.ListenServiceEvent(listener.ServiceName, zksd) return nil @@ -258,8 +261,9 @@ func (zksd *zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInst } func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool { - path := eventType.Path - name := strings.Split(path, "/")[2] + path := strings.TrimPrefix(eventType.Path, zksd.rootPath) + path = strings.TrimPrefix(eventType.Path, constant.PATH_SEPARATOR) + name := strings.Split(path, constant.PATH_SEPARATOR)[0] err := zksd.DispatchEventByServiceName(name) if err != nil { logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", name, err) diff --git a/registry/zookeeper/service_discovery_test.go b/registry/zookeeper/service_discovery_test.go index 2f1639ad1e87fd842f2370e26d0eff245665b21f..a90d1e0ee15e6bbd8b36eda915fb131a3ef4497c 100644 --- a/registry/zookeeper/service_discovery_test.go +++ b/registry/zookeeper/service_discovery_test.go @@ -89,6 +89,16 @@ func TestCURDZookeeperServiceDiscovery(t *testing.T) { assert.Nil(t, err) tests := sd.GetInstances(testName) assert.Equal(t, tests[0].GetId(), "127.0.0.1:2233") + err = sd.Update(®istry.DefaultServiceInstance{ + Id: "testId", + ServiceName: testName, + Host: "127.0.0.1", + Port: 2233, + Enable: true, + Healthy: true, + Metadata: nil, + }) + assert.Nil(t, err) err = sd.Unregister(®istry.DefaultServiceInstance{ Id: "testId", ServiceName: testName, diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index e3cb74e4f676efa1f325ac45e32b21b39d1bbd6a..c65c4d127c691f5f8e30cae5b612e1a9920e7887 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -49,7 +49,7 @@ func NewEventListener(client *Client) *EventListener { } } -// ListenServiceNodeEvent Listen on a spec key +// listenServiceNodeEvent Listen on a spec key // this method will return true when spec key deleted, // this method will return false when deep layer connection lose func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { @@ -180,9 +180,9 @@ func timeSecondDuration(sec int) time.Duration { } // ListenServiceEvent is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener -// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent +// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent // | -// --------> ListenServiceNodeEvent +// --------> listenServiceNodeEvent func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) { l.keyMapLock.Lock() diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go index 2301a21ae7d678a9defb60a0469df1eb7e74790f..19df0711254a5fef30de329cc057913b5956729f 100644 --- a/remoting/zookeeper/curator_discovery/service_discovery.go +++ b/remoting/zookeeper/curator_discovery/service_discovery.go @@ -19,6 +19,7 @@ package curator_discovery import ( "encoding/json" + "path" "strings" "sync" ) @@ -34,6 +35,11 @@ import ( "github.com/apache/dubbo-go/remoting/zookeeper" ) +type Entry struct { + sync.Mutex + instance *ServiceInstance +} + type ServiceDiscovery struct { client *zookeeper.ZookeeperClient mutex *sync.Mutex @@ -66,7 +72,14 @@ func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error { } func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error { - _, loaded := sd.services.LoadOrStore(instance.Id, instance) + value, loaded := sd.services.LoadOrStore(instance.Id, &Entry{}) + entry, ok := value.(*Entry) + if !ok { + return perrors.New("[ServiceDiscovery] services value not entry") + } + entry.Lock() + defer entry.Unlock() + entry.instance = instance err := sd.registerService(instance) if err != nil { return err @@ -78,7 +91,17 @@ func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error { } func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error { - sd.services.Store(instance.Id, instance) + value, ok := sd.services.Load(instance.Id) + if !ok { + return perrors.Errorf("[ServiceDiscovery] Service{%s} not registered", instance.Id) + } + entry, ok := value.(*Entry) + if !ok { + return perrors.New("[ServiceDiscovery] services value not entry") + } + entry.Lock() + defer entry.Unlock() + entry.instance = instance path := sd.pathForInstance(instance.Name, instance.Id) data, err := json.Marshal(instance) if err != nil { @@ -92,7 +115,11 @@ func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error { } func (sd *ServiceDiscovery) updateInternalService(name, id string) { - _, ok := sd.services.Load(id) + value, ok := sd.services.Load(id) + if !ok { + return + } + entry, ok := value.(*Entry) if !ok { return } @@ -101,11 +128,23 @@ func (sd *ServiceDiscovery) updateInternalService(name, id string) { logger.Infof("[zkServiceDiscovery] UpdateInternalService{%s} error = err{%v}", id, err) return } - sd.services.Store(instance.Id, instance) + entry.Lock() + entry.instance = instance + entry.Unlock() return } func (sd *ServiceDiscovery) UnregisterService(instance *ServiceInstance) error { + value, ok := sd.services.Load(instance.Id) + if !ok { + return nil + } + entry, ok := value.(*Entry) + if !ok { + return perrors.New("[ServiceDiscovery] services value not entry") + } + entry.Lock() + entry.Unlock() sd.services.Delete(instance.Id) return sd.unregisterService(instance) } @@ -117,13 +156,17 @@ func (sd *ServiceDiscovery) unregisterService(instance *ServiceInstance) error { func (sd *ServiceDiscovery) ReRegisterService() { sd.services.Range(func(key, value interface{}) bool { - instance, ok := value.(*ServiceInstance) + entry, ok := value.(*Entry) if !ok { - + return true } + entry.Lock() + instance := entry.instance + entry.Unlock() err := sd.registerService(instance) if err != nil { logger.Errorf("[zkServiceDiscovery] registerService{%s} error = err{%v}", instance.Id, perrors.WithStack(err)) + return true } sd.ListenServiceInstanceEvent(instance.Name, instance.Id, sd) return true @@ -172,27 +215,36 @@ func (sd *ServiceDiscovery) ListenServiceEvent(name string, listener remoting.Da } func (sd *ServiceDiscovery) ListenServiceInstanceEvent(name, id string, listener remoting.DataListener) { - sd.listener.ListenServiceEvent(nil, sd.pathForInstance(name, id), listener) + sd.listener.ListenServiceNodeEvent(sd.pathForInstance(name, id), listener) } func (sd *ServiceDiscovery) DataChange(eventType remoting.Event) bool { path := eventType.Path - name, id := sd.getNameAndId(path) + name, id, err := sd.getNameAndId(path) + if err != nil { + logger.Errorf("[ServiceDiscovery] data change error = {%v}", err) + return true + } sd.updateInternalService(name, id) return true } -func (sd *ServiceDiscovery) getNameAndId(path string) (string, string) { +func (sd *ServiceDiscovery) getNameAndId(path string) (string, string, error) { + path = strings.TrimPrefix(path, sd.basePath) + path = strings.TrimPrefix(path, constant.PATH_SEPARATOR) pathSlice := strings.Split(path, constant.PATH_SEPARATOR) - name := pathSlice[2] - id := pathSlice[3] - return name, id + if len(pathSlice) < 2 { + return "", "", perrors.Errorf("[ServiceDiscovery] path{%s} dont contain name and id", path) + } + name := pathSlice[0] + id := pathSlice[1] + return name, id, nil } func (sd *ServiceDiscovery) pathForInstance(name, id string) string { - return sd.basePath + constant.PATH_SEPARATOR + name + constant.PATH_SEPARATOR + id + return path.Join(sd.basePath, name, id) } func (sd *ServiceDiscovery) pathForName(name string) string { - return sd.basePath + constant.PATH_SEPARATOR + name + return path.Join(sd.basePath, name) } diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index f9d57ba5c2276181bb551e8b8499d850b87d041a..fd7f0db3192b97cd225812ea6d76963f190cc21c 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -58,8 +58,20 @@ func (l *ZkEventListener) SetClient(client *ZookeeperClient) { l.client = client } -// ListenServiceNodeEvent ... -func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { +// ListenServiceNodeEvent listen a path node event +func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remoting.DataListener) { + // listen l service node + l.wg.Add(1) + go func(zkPath string, listener remoting.DataListener) { + if l.listenServiceNodeEvent(zkPath, listener) { + listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) + } + logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) + }(zkPath, listener) +} + +// listenServiceNodeEvent ... +func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { defer l.wg.Done() var zkEvent zk.Event for { @@ -146,7 +158,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li l.wg.Add(1) go func(node string, zkPath string, listener remoting.DataListener) { logger.Infof("delete zkNode{%s}", node) - if l.ListenServiceNodeEvent(node, listener) { + if l.listenServiceNodeEvent(node, listener) { logger.Infof("delete content{%s}", node) listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) } @@ -259,7 +271,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen logger.Infof("listen dubbo service key{%s}", dubboPath) l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { - if l.ListenServiceNodeEvent(zkPath) { + if l.listenServiceNodeEvent(zkPath) { listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) @@ -296,9 +308,9 @@ func timeSecondDuration(sec int) time.Duration { } // ListenServiceEvent is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener -// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent +// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent // | -// --------> ListenServiceNodeEvent +// --------> listenServiceNodeEvent func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, listener remoting.DataListener) { logger.Infof("listen dubbo path{%s}", zkPath) l.wg.Add(1)