diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go index 5daf3e1a2c8d66a87f940d83745908a28ad361fa..687f21c854ea74d98a5a36698ee8afa6f237f87e 100644 --- a/registry/zookeeper/service_discovery.go +++ b/registry/zookeeper/service_discovery.go @@ -19,6 +19,7 @@ package zookeeper import ( "fmt" + "net/url" "strconv" "strings" "sync" @@ -59,14 +60,15 @@ func init() { } type zookeeperServiceDiscovery struct { - client *zookeeper.ZookeeperClient - csd *curator_discovery.ServiceDiscovery - listener *zookeeper.ZkEventListener - url *common.URL - wg sync.WaitGroup - cltLock sync.Mutex - done chan struct{} - rootPath string + client *zookeeper.ZookeeperClient + csd *curator_discovery.ServiceDiscovery + listener *zookeeper.ZkEventListener + url *common.URL + wg sync.WaitGroup + cltLock sync.Mutex + done chan struct{} + rootPath string + listenNames []string } func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error) { @@ -92,86 +94,87 @@ func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error if !ok { return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) } - basePath := remoteConfig.GetParam("rootPath", "/services") + rootPath := remoteConfig.GetParam("rootPath", "/services") url := common.NewURLWithOptions( - common.WithLocation(remoteConfig.Address), + common.WithParams(make(url.Values)), common.WithPassword(remoteConfig.Password), common.WithUsername(remoteConfig.Username), common.WithParamsValue(constant.REGISTRY_TIMEOUT_KEY, remoteConfig.TimeoutStr)) + url.Location = remoteConfig.Address zksd := &zookeeperServiceDiscovery{ url: url, - rootPath: basePath, + rootPath: rootPath, } err := zookeeper.ValidateZookeeperClient(zksd, zookeeper.WithZkName(ServiceDiscoveryZkClient)) if err != nil { return nil, err } go zookeeper.HandleClientRestart(zksd) - zksd.listener = zookeeper.NewZkEventListener(zksd.client) - zksd.listener.ListenServiceEvent(nil, basePath, zksd) - zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, basePath) + zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, rootPath) return zksd, nil } -func (zksd zookeeperServiceDiscovery) ZkClient() *zookeeper.ZookeeperClient { +func (zksd *zookeeperServiceDiscovery) ZkClient() *zookeeper.ZookeeperClient { return zksd.client } -func (zksd zookeeperServiceDiscovery) SetZkClient(client *zookeeper.ZookeeperClient) { +func (zksd *zookeeperServiceDiscovery) SetZkClient(client *zookeeper.ZookeeperClient) { zksd.client = client } -func (zksd zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex { +func (zksd *zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex { return &zksd.cltLock } -func (zksd zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup { +func (zksd *zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup { return &zksd.wg } -func (zksd zookeeperServiceDiscovery) Done() chan struct{} { +func (zksd *zookeeperServiceDiscovery) Done() chan struct{} { return zksd.done } -func (zksd zookeeperServiceDiscovery) RestartCallBack() bool { +func (zksd *zookeeperServiceDiscovery) RestartCallBack() bool { zksd.csd.ReRegisterService() - zksd.listener.ListenServiceEvent(nil, zksd.rootPath, zksd) + for _, name := range zksd.listenNames { + zksd.csd.ListenServiceEvent(name, zksd) + } return true } -func (zksd zookeeperServiceDiscovery) GetUrl() common.URL { +func (zksd *zookeeperServiceDiscovery) GetUrl() common.URL { return *zksd.url } -func (zksd zookeeperServiceDiscovery) String() string { +func (zksd *zookeeperServiceDiscovery) String() string { return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url) } -func (zksd zookeeperServiceDiscovery) Destroy() error { +func (zksd *zookeeperServiceDiscovery) Destroy() error { zksd.client.Close() return nil } -func (zksd zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error { +func (zksd *zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error { cris := zksd.toCuratorInstance(instance) return zksd.csd.RegisterService(cris) } -func (zksd zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error { +func (zksd *zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error { cris := zksd.toCuratorInstance(instance) return zksd.csd.UpdateService(cris) } -func (zksd zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error { +func (zksd *zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error { cris := zksd.toCuratorInstance(instance) return zksd.csd.UnregisterService(cris) } -func (zksd zookeeperServiceDiscovery) GetDefaultPageSize() int { +func (zksd *zookeeperServiceDiscovery) GetDefaultPageSize() int { return registry.DefaultPageSize } -func (zksd zookeeperServiceDiscovery) GetServices() *gxset.HashSet { +func (zksd *zookeeperServiceDiscovery) GetServices() *gxset.HashSet { services, err := zksd.csd.QueryForNames() res := gxset.NewSet() if err != nil { @@ -184,7 +187,7 @@ func (zksd zookeeperServiceDiscovery) GetServices() *gxset.HashSet { return res } -func (zksd zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { +func (zksd *zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { criss, err := zksd.csd.QueryForInstances(serviceName) if err != nil { logger.Errorf("[zkServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ", @@ -198,7 +201,7 @@ func (zksd zookeeperServiceDiscovery) GetInstances(serviceName string) []registr return iss } -func (zksd zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { +func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { all := zksd.GetInstances(serviceName) res := make([]interface{}, 0, pageSize) // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance @@ -208,11 +211,26 @@ func (zksd zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, off return gxpage.New(offset, pageSize, res, len(all)) } -func (zksd zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, _ bool) gxpage.Pager { - return zksd.GetInstancesByPage(serviceName, offset, pageSize) +func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + all := zksd.GetInstances(serviceName) + res := make([]interface{}, 0, pageSize) + // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance + var ( + i = offset + count = 0 + ) + for i < len(all) && count < pageSize { + ins := all[i] + if ins.IsHealthy() == healthy { + res = append(res, all[i]) + count++ + } + i++ + } + return gxpage.New(offset, pageSize, res, len(all)) } -func (zksd zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { +func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { res := make(map[string]gxpage.Pager, len(serviceNames)) for _, name := range serviceNames { res[name] = zksd.GetInstancesByPage(name, offset, requestedSize) @@ -220,28 +238,28 @@ func (zksd zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, return res } -func (zksd zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { +func (zksd *zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + zksd.listenNames = append(zksd.listenNames, listener.ServiceName) + zksd.csd.ListenServiceEvent(listener.ServiceName, zksd) return nil } -func (zksd zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName string) error { +func (zksd *zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName string) error { return zksd.DispatchEventForInstances(serviceName, zksd.GetInstances(serviceName)) } -func (zksd zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { +func (zksd *zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { return zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) } -func (zksd zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { +func (zksd *zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { extension.GetGlobalDispatcher().Dispatch(event) return nil } -func (zksd zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool { +func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool { path := eventType.Path - name := strings.Split(path, "/")[1] - id := strings.Split(path, "/")[2] - zksd.csd.UpdateInternalService(name, id) + name := strings.Split(path, "/")[2] err := zksd.DispatchEventByServiceName(name) if err != nil { logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", name, err) @@ -249,7 +267,7 @@ func (zksd zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool return true } -func (zksd zookeeperServiceDiscovery) toCuratorInstance(instance registry.ServiceInstance) *curator_discovery.ServiceInstance { +func (zksd *zookeeperServiceDiscovery) toCuratorInstance(instance registry.ServiceInstance) *curator_discovery.ServiceInstance { id := instance.GetHost() + ":" + strconv.Itoa(instance.GetPort()) pl := make(map[string]interface{}) pl["id"] = id @@ -266,7 +284,7 @@ func (zksd zookeeperServiceDiscovery) toCuratorInstance(instance registry.Servic return cuis } -func (zksd zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance { +func (zksd *zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance { pl, ok := cris.Payload.(map[string]interface{}) if !ok { logger.Errorf("[zkServiceDiscovery] toZookeeperInstance{%s} payload is not map", cris.Id) diff --git a/registry/zookeeper/service_discovery_test.go b/registry/zookeeper/service_discovery_test.go new file mode 100644 index 0000000000000000000000000000000000000000..586b85ec9f2724647443a560c9f21d80432c2f6b --- /dev/null +++ b/registry/zookeeper/service_discovery_test.go @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zookeeper + +import ( + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/registry" + "github.com/dubbogo/go-zookeeper/zk" + "github.com/stretchr/testify/assert" + "strconv" + "testing" +) + +var testName = "test" + +func prepareData(t *testing.T) *zk.TestCluster { + ts, err := zk.StartTestCluster(1, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, ts.Servers[0]) + address := "127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port) + + config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{ + Protocol: "zookeeper", + RemoteRef: "test", + } + + config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ + Address: address, + TimeoutStr: "10s", + } + return ts +} + +func TestNewZookeeperServiceDiscovery(t *testing.T) { + name := "zookeeper1" + _, err := newZookeeperServiceDiscovery(name) + + // the ServiceDiscoveryConfig not found + assert.NotNil(t, err) + + sdc := &config.ServiceDiscoveryConfig{ + Protocol: "zookeeper", + RemoteRef: "mock", + } + config.GetBaseConfig().ServiceDiscoveries[name] = sdc + _, err = newZookeeperServiceDiscovery(name) + + // RemoteConfig not found + assert.NotNil(t, err) +} + +func TestCURDZookeeperServiceDiscovery(t *testing.T) { + ts := prepareData(t) + defer ts.Stop() + sd, err := newZookeeperServiceDiscovery(testName) + assert.Nil(t, err) + md := make(map[string]string) + md["t1"] = "test1" + err = sd.Register(®istry.DefaultServiceInstance{ + Id: "testId", + ServiceName: testName, + Host: "127.0.0.1", + Port: 2233, + Enable: true, + Healthy: true, + Metadata: md, + }) + assert.Nil(t, err) + tests := sd.GetInstances(testName) + assert.Equal(t, tests[0].GetId(), "127.0.0.1:2233") + err = sd.Unregister(®istry.DefaultServiceInstance{ + Id: "testId", + ServiceName: testName, + Host: "127.0.0.1", + Port: 2233, + Enable: true, + Healthy: true, + Metadata: nil, + }) + assert.Nil(t, err) +} diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go index e300af122fbe9f3bbee8e1911f0268cccef96261..2301a21ae7d678a9defb60a0469df1eb7e74790f 100644 --- a/remoting/zookeeper/curator_discovery/service_discovery.go +++ b/remoting/zookeeper/curator_discovery/service_discovery.go @@ -19,10 +19,19 @@ package curator_discovery import ( "encoding/json" + "strings" + "sync" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting/zookeeper" - perrors "github.com/pkg/errors" - "sync" ) type ServiceDiscovery struct { @@ -30,6 +39,7 @@ type ServiceDiscovery struct { mutex *sync.Mutex basePath string services *sync.Map + listener *zookeeper.ZkEventListener } func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *ServiceDiscovery { @@ -38,11 +48,12 @@ func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *Se mutex: &sync.Mutex{}, basePath: basePath, services: &sync.Map{}, + listener: zookeeper.NewZkEventListener(client), } } func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error { - path := sd.basePath + "/" + instance.Name + "/" + instance.Id + path := sd.pathForInstance(instance.Name, instance.Id) data, err := json.Marshal(instance) if err != nil { return err @@ -55,13 +66,20 @@ func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error { } func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error { - sd.services.Store(instance.Id, instance) - return sd.registerService(instance) + _, loaded := sd.services.LoadOrStore(instance.Id, instance) + err := sd.registerService(instance) + if err != nil { + return err + } + if !loaded { + sd.ListenServiceInstanceEvent(instance.Name, instance.Id, sd) + } + return nil } func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error { sd.services.Store(instance.Id, instance) - path := sd.basePath + "/" + instance.Name + "/" + instance.Id + path := sd.pathForInstance(instance.Name, instance.Id) data, err := json.Marshal(instance) if err != nil { return err @@ -73,7 +91,7 @@ func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error { return nil } -func (sd *ServiceDiscovery) UpdateInternalService(name, id string) { +func (sd *ServiceDiscovery) updateInternalService(name, id string) { _, ok := sd.services.Load(id) if !ok { return @@ -93,7 +111,7 @@ func (sd *ServiceDiscovery) UnregisterService(instance *ServiceInstance) error { } func (sd *ServiceDiscovery) unregisterService(instance *ServiceInstance) error { - path := sd.basePath + "/" + instance.Name + "/" + instance.Id + path := sd.pathForInstance(instance.Name, instance.Id) return sd.client.Delete(path) } @@ -107,12 +125,13 @@ func (sd *ServiceDiscovery) ReRegisterService() { if err != nil { logger.Errorf("[zkServiceDiscovery] registerService{%s} error = err{%v}", instance.Id, perrors.WithStack(err)) } + sd.ListenServiceInstanceEvent(instance.Name, instance.Id, sd) return true }) } func (sd *ServiceDiscovery) QueryForInstances(name string) ([]*ServiceInstance, error) { - ids, err := sd.client.GetChildren(sd.basePath + "/" + name) + ids, err := sd.client.GetChildren(sd.pathForName(name)) if err != nil { return nil, err } @@ -131,7 +150,7 @@ func (sd *ServiceDiscovery) QueryForInstances(name string) ([]*ServiceInstance, } func (sd *ServiceDiscovery) QueryForInstance(name string, id string) (*ServiceInstance, error) { - path := sd.basePath + "/" + name + "/" + id + path := sd.pathForInstance(name, id) data, _, err := sd.client.GetContent(path) if err != nil { return nil, err @@ -147,3 +166,33 @@ func (sd *ServiceDiscovery) QueryForInstance(name string, id string) (*ServiceIn func (sd *ServiceDiscovery) QueryForNames() ([]string, error) { return sd.client.GetChildren(sd.basePath) } + +func (sd *ServiceDiscovery) ListenServiceEvent(name string, listener remoting.DataListener) { + sd.listener.ListenServiceEvent(nil, sd.pathForName(name), listener) +} + +func (sd *ServiceDiscovery) ListenServiceInstanceEvent(name, id string, listener remoting.DataListener) { + sd.listener.ListenServiceEvent(nil, sd.pathForInstance(name, id), listener) +} + +func (sd *ServiceDiscovery) DataChange(eventType remoting.Event) bool { + path := eventType.Path + name, id := sd.getNameAndId(path) + sd.updateInternalService(name, id) + return true +} + +func (sd *ServiceDiscovery) getNameAndId(path string) (string, string) { + pathSlice := strings.Split(path, constant.PATH_SEPARATOR) + name := pathSlice[2] + id := pathSlice[3] + return name, id +} + +func (sd *ServiceDiscovery) pathForInstance(name, id string) string { + return sd.basePath + constant.PATH_SEPARATOR + name + constant.PATH_SEPARATOR + id +} + +func (sd *ServiceDiscovery) pathForName(name string) string { + return sd.basePath + constant.PATH_SEPARATOR + name +}