Skip to content
Snippets Groups Projects
Commit b08b8fc1 authored by Patrick's avatar Patrick
Browse files

zookeeper's service_discovery

parent 9eddc5c2
No related branches found
No related tags found
No related merge requests found
......@@ -159,6 +159,10 @@ const (
NACOS_PATH_KEY = "path"
)
const (
ZOOKEEPER_KEY = "zookeeper"
)
const (
TRACING_REMOTE_SPAN_CTX = "tracing.remote.span.ctx"
)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"fmt"
"strconv"
"strings"
"sync"
)
import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/zookeeper"
"github.com/apache/dubbo-go/remoting/zookeeper/curator_discovery"
)
const (
// RegistryZkClient zk client name
ServiceDiscoveryZkClient = "zk service discovery"
)
var (
// 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition
instanceMap = make(map[string]registry.ServiceDiscovery, 16)
initLock sync.Mutex
)
// init will put the service discovery into extension
func init() {
extension.SetServiceDiscovery(constant.ZOOKEEPER_KEY, newZookeeperServiceDiscovery)
}
type zookeeperServiceDiscovery struct {
client *zookeeper.ZookeeperClient
csd *curator_discovery.ServiceDiscovery
listener *zookeeper.ZkEventListener
url *common.URL
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
rootPath string
}
func newZookeeperServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
instance, ok := instanceMap[name]
if ok {
return instance, nil
}
initLock.Lock()
defer initLock.Unlock()
// double check
instance, ok = instanceMap[name]
if ok {
return instance, nil
}
sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
if !ok || len(sdc.RemoteRef) == 0 {
return nil, perrors.New("could not init the instance because the config is invalid")
}
remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
if !ok {
return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
}
basePath := remoteConfig.GetParam("rootPath", "/services")
url := common.NewURLWithOptions(
common.WithLocation(remoteConfig.Address),
common.WithPassword(remoteConfig.Password),
common.WithUsername(remoteConfig.Username),
common.WithParamsValue(constant.REGISTRY_TIMEOUT_KEY, remoteConfig.TimeoutStr))
zksd := &zookeeperServiceDiscovery{
url: url,
rootPath: basePath,
}
err := zookeeper.ValidateZookeeperClient(zksd, zookeeper.WithZkName(ServiceDiscoveryZkClient))
if err != nil {
return nil, err
}
go zookeeper.HandleClientRestart(zksd)
zksd.listener = zookeeper.NewZkEventListener(zksd.client)
zksd.listener.ListenServiceEvent(nil, basePath, zksd)
zksd.csd = curator_discovery.NewServiceDiscovery(zksd.client, basePath)
return zksd, nil
}
func (zksd zookeeperServiceDiscovery) ZkClient() *zookeeper.ZookeeperClient {
return zksd.client
}
func (zksd zookeeperServiceDiscovery) SetZkClient(client *zookeeper.ZookeeperClient) {
zksd.client = client
}
func (zksd zookeeperServiceDiscovery) ZkClientLock() *sync.Mutex {
return &zksd.cltLock
}
func (zksd zookeeperServiceDiscovery) WaitGroup() *sync.WaitGroup {
return &zksd.wg
}
func (zksd zookeeperServiceDiscovery) Done() chan struct{} {
return zksd.done
}
func (zksd zookeeperServiceDiscovery) RestartCallBack() bool {
zksd.csd.ReRegisterService()
zksd.listener.ListenServiceEvent(nil, zksd.rootPath, zksd)
return true
}
func (zksd zookeeperServiceDiscovery) GetUrl() common.URL {
return *zksd.url
}
func (zksd zookeeperServiceDiscovery) String() string {
return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url)
}
func (zksd zookeeperServiceDiscovery) Destroy() error {
zksd.client.Close()
return nil
}
func (zksd zookeeperServiceDiscovery) Register(instance registry.ServiceInstance) error {
cris := zksd.toCuratorInstance(instance)
return zksd.csd.RegisterService(cris)
}
func (zksd zookeeperServiceDiscovery) Update(instance registry.ServiceInstance) error {
cris := zksd.toCuratorInstance(instance)
return zksd.csd.UpdateService(cris)
}
func (zksd zookeeperServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
cris := zksd.toCuratorInstance(instance)
return zksd.csd.UnregisterService(cris)
}
func (zksd zookeeperServiceDiscovery) GetDefaultPageSize() int {
return registry.DefaultPageSize
}
func (zksd zookeeperServiceDiscovery) GetServices() *gxset.HashSet {
services, err := zksd.csd.QueryForNames()
res := gxset.NewSet()
if err != nil {
logger.Errorf("[zkServiceDiscovery] Could not query the services: %v", err)
return res
}
for _, service := range services {
res.Add(service)
}
return res
}
func (zksd zookeeperServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
criss, err := zksd.csd.QueryForInstances(serviceName)
if err != nil {
logger.Errorf("[zkServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
serviceName, err)
return make([]registry.ServiceInstance, 0, 0)
}
iss := make([]registry.ServiceInstance, 0, len(criss))
for _, cris := range criss {
iss = append(iss, zksd.toZookeeperInstance(cris))
}
return iss
}
func (zksd zookeeperServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
all := zksd.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
for i := offset; i < len(all) && i < offset+pageSize; i++ {
res = append(res, all[i])
}
return gxpage.New(offset, pageSize, res, len(all))
}
func (zksd zookeeperServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, _ bool) gxpage.Pager {
return zksd.GetInstancesByPage(serviceName, offset, pageSize)
}
func (zksd zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
res := make(map[string]gxpage.Pager, len(serviceNames))
for _, name := range serviceNames {
res[name] = zksd.GetInstancesByPage(name, offset, requestedSize)
}
return res
}
func (zksd zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
return nil
}
func (zksd zookeeperServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
return zksd.DispatchEventForInstances(serviceName, zksd.GetInstances(serviceName))
}
func (zksd zookeeperServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
return zksd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
}
func (zksd zookeeperServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
extension.GetGlobalDispatcher().Dispatch(event)
return nil
}
func (zksd zookeeperServiceDiscovery) DataChange(eventType remoting.Event) bool {
path := eventType.Path
name := strings.Split(path, "/")[1]
id := strings.Split(path, "/")[2]
zksd.csd.UpdateInternalService(name, id)
err := zksd.DispatchEventByServiceName(name)
if err != nil {
logger.Errorf("[zkServiceDiscovery] DispatchEventByServiceName{%s} error = err{%v}", name, err)
}
return true
}
func (zksd zookeeperServiceDiscovery) toCuratorInstance(instance registry.ServiceInstance) *curator_discovery.ServiceInstance {
id := instance.GetHost() + ":" + strconv.Itoa(instance.GetPort())
pl := make(map[string]interface{})
pl["id"] = id
pl["name"] = instance.GetServiceName()
pl["metadata"] = instance.GetMetadata()
cuis := &curator_discovery.ServiceInstance{
Name: instance.GetServiceName(),
Id: id,
Address: instance.GetHost(),
Port: instance.GetPort(),
Payload: pl,
RegistrationTimeUTC: 0,
}
return cuis
}
func (zksd zookeeperServiceDiscovery) toZookeeperInstance(cris *curator_discovery.ServiceInstance) registry.ServiceInstance {
pl, ok := cris.Payload.(map[string]interface{})
if !ok {
logger.Errorf("[zkServiceDiscovery] toZookeeperInstance{%s} payload is not map", cris.Id)
}
md, ok := pl["metadata"].(map[string]string)
return &registry.DefaultServiceInstance{
Id: cris.Id,
ServiceName: cris.Name,
Host: cris.Address,
Port: cris.Port,
Enable: true,
Healthy: true,
Metadata: md,
}
}
......@@ -108,7 +108,7 @@ func WithZkName(name string) Option {
}
// ValidateZookeeperClient ...
func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
func ValidateZookeeperClient(container ZkClientFacade, opts ...Option) error {
var err error
options := &Options{}
for _, opt := range opts {
......@@ -604,6 +604,11 @@ func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
return z.Conn.Get(zkPath)
}
// GetContent ...
func (z *ZookeeperClient) SetContent(zkPath string, content []byte, version int32) (*zk.Stat, error) {
return z.Conn.Set(zkPath, content, version)
}
// getConn gets zookeeper connection safely
func (z *ZookeeperClient) getConn() *zk.Conn {
z.RLock()
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package curator_discovery
import (
"encoding/json"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting/zookeeper"
perrors "github.com/pkg/errors"
"sync"
)
type ServiceDiscovery struct {
client *zookeeper.ZookeeperClient
mutex *sync.Mutex
basePath string
services *sync.Map
}
func NewServiceDiscovery(client *zookeeper.ZookeeperClient, basePath string) *ServiceDiscovery {
return &ServiceDiscovery{
client: client,
mutex: &sync.Mutex{},
basePath: basePath,
services: &sync.Map{},
}
}
func (sd *ServiceDiscovery) registerService(instance *ServiceInstance) error {
path := sd.basePath + "/" + instance.Name + "/" + instance.Id
data, err := json.Marshal(instance)
if err != nil {
return err
}
err = sd.client.CreateWithValue(path, data)
if err != nil {
return err
}
return nil
}
func (sd *ServiceDiscovery) RegisterService(instance *ServiceInstance) error {
sd.services.Store(instance.Id, instance)
return sd.registerService(instance)
}
func (sd *ServiceDiscovery) UpdateService(instance *ServiceInstance) error {
sd.services.Store(instance.Id, instance)
path := sd.basePath + "/" + instance.Name + "/" + instance.Id
data, err := json.Marshal(instance)
if err != nil {
return err
}
_, err = sd.client.SetContent(path, data, -1)
if err != nil {
return err
}
return nil
}
func (sd *ServiceDiscovery) UpdateInternalService(name, id string) {
_, ok := sd.services.Load(id)
if !ok {
return
}
instance, err := sd.QueryForInstance(name, id)
if err != nil {
logger.Infof("[zkServiceDiscovery] UpdateInternalService{%s} error = err{%v}", id, err)
return
}
sd.services.Store(instance.Id, instance)
return
}
func (sd *ServiceDiscovery) UnregisterService(instance *ServiceInstance) error {
sd.services.Delete(instance.Id)
return sd.unregisterService(instance)
}
func (sd *ServiceDiscovery) unregisterService(instance *ServiceInstance) error {
path := sd.basePath + "/" + instance.Name + "/" + instance.Id
return sd.client.Delete(path)
}
func (sd *ServiceDiscovery) ReRegisterService() {
sd.services.Range(func(key, value interface{}) bool {
instance, ok := value.(*ServiceInstance)
if !ok {
}
err := sd.registerService(instance)
if err != nil {
logger.Errorf("[zkServiceDiscovery] registerService{%s} error = err{%v}", instance.Id, perrors.WithStack(err))
}
return true
})
}
func (sd *ServiceDiscovery) QueryForInstances(name string) ([]*ServiceInstance, error) {
ids, err := sd.client.GetChildren(sd.basePath + "/" + name)
if err != nil {
return nil, err
}
var (
instance *ServiceInstance
instances []*ServiceInstance
)
for _, id := range ids {
instance, err = sd.QueryForInstance(name, id)
if err != nil {
return nil, err
}
instances = append(instances, instance)
}
return instances, nil
}
func (sd *ServiceDiscovery) QueryForInstance(name string, id string) (*ServiceInstance, error) {
path := sd.basePath + "/" + name + "/" + id
data, _, err := sd.client.GetContent(path)
if err != nil {
return nil, err
}
instance := &ServiceInstance{}
err = json.Unmarshal(data, instance)
if err != nil {
return nil, err
}
return instance, nil
}
func (sd *ServiceDiscovery) QueryForNames() ([]string, error) {
return sd.client.GetChildren(sd.basePath)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package curator_discovery
type ServiceInstance struct {
Name string
Id string
Address string
Port int
Payload interface{}
RegistrationTimeUTC int64
}
......@@ -18,6 +18,7 @@
package zookeeper
import (
"github.com/apache/dubbo-go/common"
"sync"
)
import (
......@@ -26,22 +27,21 @@ import (
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
type zkClientFacade interface {
type ZkClientFacade interface {
ZkClient() *ZookeeperClient
SetZkClient(*ZookeeperClient)
ZkClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
Done() chan struct{} //for zk client control
RestartCallBack() bool
common.Node
GetUrl() common.URL
}
// HandleClientRestart ...
func HandleClientRestart(r zkClientFacade) {
func HandleClientRestart(r ZkClientFacade) {
var (
err error
......
......@@ -38,7 +38,7 @@ type mockFacade struct {
done chan struct{}
}
func newMockFacade(client *ZookeeperClient, url *common.URL) zkClientFacade {
func newMockFacade(client *ZookeeperClient, url *common.URL) ZkClientFacade {
mock := &mockFacade{
client: client,
URL: url,
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment