Skip to content
Snippets Groups Projects
Commit 1660c92f authored by Patrick's avatar Patrick
Browse files

zookeeper's service_discovery modify

parent 3832ba8a
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@ package zookeeper
import (
"fmt"
"net/url"
"strconv"
"strings"
"sync"
......@@ -59,14 +60,15 @@ func init() {
}
type zookeeperServiceDiscovery struct {
client *zookeeper.ZookeeperClient
csd *curator_discovery.ServiceDiscovery
listener *zookeeper.ZkEventListener
url *common.URL
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
rootPath string
client *zookeeper.ZookeeperClient
csd *curator_discovery.ServiceDiscovery
listener *zookeeper.ZkEventListener
url *common.URL
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
rootPath string
listenNames []string
}
func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
......@@ -92,86 +94,87 @@ func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error
if !ok {
return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
}
basePath := remoteConfig.GetParam("rootPath", "/services")
rootPath := remoteConfig.GetParam("rootPath", "/services")
url := common.NewURLWithOptions(
common.WithLocation(remoteConfig.Address),
common.WithParams(make(url.Values)),
common.WithPassword(remoteConfig.Password),
common.WithUsername(remoteConfig.Username),
common.WithParamsValue(constant.REGISTRY_TIMEOUT_KEY, remoteConfig.TimeoutStr))
url.Location = remoteConfig.Address
zksd := &zookeeperServiceDiscovery{
url: url,
rootPath: basePath,
rootPath: rootPath,
}
err := zookeeper.ValidateZookeeperClient(zksd, zookeeper.WithZkName(ServiceDiscoveryZkClient))
if err != nil {
return nil, err
}
go zookeeper.HandleClientRestart(zksd)
zksd.listener = zookeeper.NewZkEventListener(zksd.client)
zksd.listener.ListenServiceEvent(nil, basePath, zksd)
zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, basePath)
zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, rootPath)
return zksd, nil
}
func (zksd zookeeperServiceDiscovery) ZkClient() *zookeeper.ZookeeperClient {
func (zksd *zookeeperServiceDiscovery) ZkClient() *zookeeper.ZookeeperClient {
return zksd.client
}
func (zksd zookeeperServiceDiscovery) SetZkClient(client *zookeeper.ZookeeperClient) {
func (zksd *zookeeperServiceDiscovery) SetZkClient(client *zookeeper.ZookeeperClient) {
zksd.client = client
}
func (zksd zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex {
func (zksd *zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex {
return &zksd.cltLock
}
func (zksd zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup {
func (zksd *zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup {
return &zksd.wg
}
func (zksd zookeeperServiceDiscovery) Done() chan struct{} {
func (zksd *zookeeperServiceDiscovery) Done() chan struct{} {
return zksd.done
}
func (zksd zookeeperServiceDiscovery) RestartCallBack() bool {
func (zksd *zookeeperServiceDiscovery) RestartCallBack() bool {
zksd.csd.ReRegisterService()
zksd.listener.ListenServiceEvent(nil, zksd.rootPath, zksd)
for _, name := range zksd.listenNames {
zksd.csd.ListenServiceEvent(name, zksd)
}
return true
}
func (zksd zookeeperServiceDiscovery) GetUrl() common.URL {
func (zksd *zookeeperServiceDiscovery) GetUrl() common.URL {
return *zksd.url
}
func (zksd zookeeperServiceDiscovery) String() string {
func (zksd *zookeeperServiceDiscovery) String() string {
return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url)
}
func (zksd zookeeperServiceDiscovery) Destroy() error {
func (zksd *zookeeperServiceDiscovery) Destroy() error {
zksd.client.Close()
return nil
}
func (zksd zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error {
func (zksd *zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error {
cris := zksd.toCuratorInstance(instance)
return zksd.csd.RegisterService(cris)
}
func (zksd zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error {
func (zksd *zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error {
cris := zksd.toCuratorInstance(instance)
return zksd.csd.UpdateService(cris)
}
func (zksd zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
func (zksd *zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
cris := zksd.toCuratorInstance(instance)
return zksd.csd.UnregisterService(cris)
}
func (zksd zookeeperServiceDiscovery) GetDefaultPageSize() int {
func (zksd *zookeeperServiceDiscovery) GetDefaultPageSize() int {
return registry.DefaultPageSize
}
func (zksd zookeeperServiceDiscovery) GetServices() *gxset.HashSet {
func (zksd *zookeeperServiceDiscovery) GetServices() *gxset.HashSet {
services, err := zksd.csd.QueryForNames()
res := gxset.NewSet()
if err != nil {
......@@ -184,7 +187,7 @@ func (zksd zookeeperServiceDiscovery) GetServices() *gxset.HashSet {
return res
}
func (zksd zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
func (zksd *zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
criss, err := zksd.csd.QueryForInstances(serviceName)
if err != nil {
logger.Errorf("[zkServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
......@@ -198,7 +201,7 @@ func (zksd zookeeperServiceDiscovery) GetInstances(serviceName string) []registr
return iss
}
func (zksd zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
func (zksd *zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
all := zksd.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
......@@ -208,11 +211,26 @@ func (zksd zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, off
return gxpage.New(offset, pageSize, res, len(all))
}
func (zksd zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, _ bool) gxpage.Pager {
return zksd.GetInstancesByPage(serviceName, offset, pageSize)
func (zksd *zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
all := zksd.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
var (
i = offset
count = 0
)
for i < len(all) && count < pageSize {
ins := all[i]
if ins.IsHealthy() == healthy {
res = append(res, all[i])
count++
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
}
func (zksd zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
res := make(map[string]gxpage.Pager, len(serviceNames))
for _, name := range serviceNames {
res[name] = zksd.GetInstancesByPage(name, offset, requestedSize)
......@@ -220,28 +238,28 @@ func (zksd zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string,
return res
}
func (zksd zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
func (zksd *zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
zksd.listenNames = append(zksd.listenNames, listener.ServiceName)
zksd.csd.ListenServiceEvent(listener.ServiceName, zksd)
return nil
}
func (zksd zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
func (zksd *zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
return zksd.DispatchEventForInstances(serviceName, zksd.GetInstances(serviceName))
}
func (zksd zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
func (zksd *zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
return zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
}
func (zksd zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
func (zksd *zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
extension.GetGlobalDispatcher().Dispatch(event)
return nil
}
func (zksd zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool {
func (zksd *zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool {
path := eventType.Path
name := strings.Split(path, "/")[1]
id := strings.Split(path, "/")[2]
zksd.csd.UpdateInternalService(name, id)
name := strings.Split(path, "/")[2]
err := zksd.DispatchEventByServiceName(name)
if err != nil {
logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", name, err)
......@@ -249,7 +267,7 @@ func (zksd zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool
return true
}
func (zksd zookeeperServiceDiscovery) toCuratorInstance(instance registry.ServiceInstance) *curator_discovery.ServiceInstance {
func (zksd *zookeeperServiceDiscovery) toCuratorInstance(instance registry.ServiceInstance) *curator_discovery.ServiceInstance {
id := instance.GetHost() + ":" + strconv.Itoa(instance.GetPort())
pl := make(map[string]interface{})
pl["id"] = id
......@@ -266,7 +284,7 @@ func (zksd zookeeperServiceDiscovery) toCuratorInstance(instance registry.Servic
return cuis
}
func (zksd zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance {
func (zksd *zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance {
pl, ok := cris.Payload.(map[string]interface{})
if !ok {
logger.Errorf("[zkServiceDiscovery] toZookeeperInstance{%s} payload is not map", cris.Id)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
"github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
"strconv"
"testing"
)
var testName = "test"
func prepareData(t *testing.T) *zk.TestCluster {
ts, err := zk.StartTestCluster(1, nil, nil)
assert.NoError(t, err)
assert.NotNil(t, ts.Servers[0])
address := "127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)
config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
Protocol: "zookeeper",
RemoteRef: "test",
}
config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{
Address: address,
TimeoutStr: "10s",
}
return ts
}
func TestNewZookeeperServiceDiscovery(t *testing.T) {
name := "zookeeper1"
_, err := newZookeeperServiceDiscovery(name)
// the ServiceDiscoveryConfig not found
assert.NotNil(t, err)
sdc := &config.ServiceDiscoveryConfig{
Protocol: "zookeeper",
RemoteRef: "mock",
}
config.GetBaseConfig().ServiceDiscoveries[name] = sdc
_, err = newZookeeperServiceDiscovery(name)
// RemoteConfig not found
assert.NotNil(t, err)
}
func TestCURDZookeeperServiceDiscovery(t *testing.T) {
ts := prepareData(t)
defer ts.Stop()
sd, err := newZookeeperServiceDiscovery(testName)
assert.Nil(t, err)
md := make(map[string]string)
md["t1"] = "test1"
err = sd.Register(&registry.DefaultServiceInstance{
Id: "testId",
ServiceName: testName,
Host: "127.0.0.1",
Port: 2233,
Enable: true,
Healthy: true,
Metadata: md,
})
assert.Nil(t, err)
tests := sd.GetInstances(testName)
assert.Equal(t, tests[0].GetId(), "127.0.0.1:2233")
err = sd.Unregister(&registry.DefaultServiceInstance{
Id: "testId",
ServiceName: testName,
Host: "127.0.0.1",
Port: 2233,
Enable: true,
Healthy: true,
Metadata: nil,
})
assert.Nil(t, err)
}
......@@ -19,10 +19,19 @@ package curator_discovery
import (
"encoding/json"
"strings"
"sync"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
perrors "github.com/pkg/errors"
"sync"
)
type ServiceDiscovery struct {
......@@ -30,6 +39,7 @@ type ServiceDiscovery struct {
mutex *sync.Mutex
basePath string
services *sync.Map
listener *zookeeper.ZkEventListener
}
func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *ServiceDiscovery {
......@@ -38,11 +48,12 @@ func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *Se
mutex: &sync.Mutex{},
basePath: basePath,
services: &sync.Map{},
listener: zookeeper.NewZkEventListener(client),
}
}
func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error {
path := sd.basePath + "/" + instance.Name + "/" + instance.Id
path := sd.pathForInstance(instance.Name, instance.Id)
data, err := json.Marshal(instance)
if err != nil {
return err
......@@ -55,13 +66,20 @@ func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error {
}
func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error {
sd.services.Store(instance.Id, instance)
return sd.registerService(instance)
_, loaded := sd.services.LoadOrStore(instance.Id, instance)
err := sd.registerService(instance)
if err != nil {
return err
}
if !loaded {
sd.ListenServiceInstanceEvent(instance.Name, instance.Id, sd)
}
return nil
}
func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error {
sd.services.Store(instance.Id, instance)
path := sd.basePath + "/" + instance.Name + "/" + instance.Id
path := sd.pathForInstance(instance.Name, instance.Id)
data, err := json.Marshal(instance)
if err != nil {
return err
......@@ -73,7 +91,7 @@ func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error {
return nil
}
func (sd *ServiceDiscovery) UpdateInternalService(name, id string) {
func (sd *ServiceDiscovery) updateInternalService(name, id string) {
_, ok := sd.services.Load(id)
if !ok {
return
......@@ -93,7 +111,7 @@ func (sd *ServiceDiscovery) UnregisterService(instance *ServiceInstance) error {
}
func (sd *ServiceDiscovery) unregisterService(instance *ServiceInstance) error {
path := sd.basePath + "/" + instance.Name + "/" + instance.Id
path := sd.pathForInstance(instance.Name, instance.Id)
return sd.client.Delete(path)
}
......@@ -107,12 +125,13 @@ func (sd *ServiceDiscovery) ReRegisterService() {
if err != nil {
logger.Errorf("[zkServiceDiscovery] registerService{%s} error = err{%v}", instance.Id, perrors.WithStack(err))
}
sd.ListenServiceInstanceEvent(instance.Name, instance.Id, sd)
return true
})
}
func (sd *ServiceDiscovery) QueryForInstances(name string) ([]*ServiceInstance, error) {
ids, err := sd.client.GetChildren(sd.basePath + "/" + name)
ids, err := sd.client.GetChildren(sd.pathForName(name))
if err != nil {
return nil, err
}
......@@ -131,7 +150,7 @@ func (sd *ServiceDiscovery) QueryForInstances(name string) ([]*ServiceInstance,
}
func (sd *ServiceDiscovery) QueryForInstance(name string, id string) (*ServiceInstance, error) {
path := sd.basePath + "/" + name + "/" + id
path := sd.pathForInstance(name, id)
data, _, err := sd.client.GetContent(path)
if err != nil {
return nil, err
......@@ -147,3 +166,33 @@ func (sd *ServiceDiscovery) QueryForInstance(name string, id string) (*ServiceIn
func (sd *ServiceDiscovery) QueryForNames() ([]string, error) {
return sd.client.GetChildren(sd.basePath)
}
func (sd *ServiceDiscovery) ListenServiceEvent(name string, listener remoting.DataListener) {
sd.listener.ListenServiceEvent(nil, sd.pathForName(name), listener)
}
func (sd *ServiceDiscovery) ListenServiceInstanceEvent(name, id string, listener remoting.DataListener) {
sd.listener.ListenServiceEvent(nil, sd.pathForInstance(name, id), listener)
}
func (sd *ServiceDiscovery) DataChange(eventType remoting.Event) bool {
path := eventType.Path
name, id := sd.getNameAndId(path)
sd.updateInternalService(name, id)
return true
}
func (sd *ServiceDiscovery) getNameAndId(path string) (string, string) {
pathSlice := strings.Split(path, constant.PATH_SEPARATOR)
name := pathSlice[2]
id := pathSlice[3]
return name, id
}
func (sd *ServiceDiscovery) pathForInstance(name, id string) string {
return sd.basePath + constant.PATH_SEPARATOR + name + constant.PATH_SEPARATOR + id
}
func (sd *ServiceDiscovery) pathForName(name string) string {
return sd.basePath + constant.PATH_SEPARATOR + name
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment