diff --git a/common/constant/key.go b/common/constant/key.go index 9c59575eb83e6d7742c9783b22fec14dd52ede73..f0130b07448140bbb8d71134b688df708e1e3ff7 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -160,6 +160,10 @@ const ( NACOS_PATH_KEY = "path" ) +const ( + ETCDV3_KEY = "etcdv3" +) + const ( TRACING_REMOTE_SPAN_CTX = "tracing.remote.span.ctx" ) diff --git a/go.mod b/go.mod index f4318e5a5d6958759a6d078bb70fa01f455a451a..89d3f9ce2630907d15aa3637f5ab42b51110572c 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/consul v1.5.3 github.com/hashicorp/consul/api v1.1.0 + github.com/hashicorp/vault v0.10.3 github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8 github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect github.com/juju/testing v0.0.0-20191001232224-ce9dec17d28b // indirect @@ -43,7 +44,6 @@ require ( github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8 - go.etcd.io/bbolt v1.3.4 // indirect go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go new file mode 100644 index 0000000000000000000000000000000000000000..aab317389bb00ad2e34448d3765276fa084763da --- /dev/null +++ b/registry/etcdv3/service_discovery.go @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package etcdv3 + +import ( + "fmt" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting" + "github.com/apache/dubbo-go/remoting/etcdv3" + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" + "github.com/hashicorp/vault/helper/jsonutil" + perrors "github.com/pkg/errors" + "sync" + "time" +) + +const ( + ROOT = "/services" +) + +var ( + initLock sync.Mutex +) + +func init() { + extension.SetServiceDiscovery(constant.ETCDV3_KEY, newEtcdV3ServiceDiscovery) +} + +// new etcd service discovery struct +type etcdV3ServiceDiscovery struct { + // descriptor is a short string about the basic information of this instance + descriptor string + // client is current Etcdv3 client + client *etcdv3.Client + // serviceInstance is current serviceInstance + serviceInstance *registry.ServiceInstance + // services is when register or update will add service name + services *gxset.HashSet + // child listener + childListenerMap map[string]*etcdv3.EventListener +} + +// basic information of this instance +func (e *etcdV3ServiceDiscovery) String() string { + return e.descriptor +} + +// Destory service discovery +func (e *etcdV3ServiceDiscovery) Destroy() error { + if e.client != nil { + e.client.Close() + } + return nil +} + +// Register will register an instance of ServiceInstance to registry +func (e *etcdV3ServiceDiscovery) Register(instance registry.ServiceInstance) error { + + e.serviceInstance = &instance + + path := toPath(instance) + + if nil != e.client { + ins, err := jsonutil.EncodeJSON(instance) + if err == nil { + e.client.Create(path, string(ins)) + e.services.Add(instance.GetServiceName()) + } + } + + return nil +} + +// Update will update the data of the instance in registry +func (e *etcdV3ServiceDiscovery) Update(instance registry.ServiceInstance) error { + path := toPath(instance) + + if nil != e.client { + ins, err := jsonutil.EncodeJSON(instance) + if nil == err { + e.client.Update(path, string(ins)) + e.services.Add(instance.GetServiceName()) + } + } + + return nil +} + +// Unregister will unregister this instance from registry +func (e *etcdV3ServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + path := toPath(instance) + + if nil != e.client { + err := e.client.Delete(path) + e.services.Remove(instance.GetServiceName()) + e.serviceInstance = nil + return err + } + + return nil +} + +// ----------------- discovery ------------------- +// GetDefaultPageSize will return the default page size +func (e *etcdV3ServiceDiscovery) GetDefaultPageSize() int { + return registry.DefaultPageSize +} + +// GetServices will return the all service names. +func (e *etcdV3ServiceDiscovery) GetServices() *gxset.HashSet { + return e.services +} + +// GetInstances will return all service instances with serviceName +func (e *etcdV3ServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + + if nil != e.client { + // get keys and values + _, vList, err := e.client.GetChildrenKVList(toParentPath(serviceName)) + if nil == err { + serviceInstances := make([]registry.ServiceInstance, 0, len(vList)) + for _, v := range vList { + instance := ®istry.DefaultServiceInstance{} + err = jsonutil.DecodeJSON([]byte(v), &instance) + if nil == err { + serviceInstances = append(serviceInstances, instance) + } + } + return serviceInstances + } + perrors.New(fmt.Sprintf("could not getChildrenKVList the err is:%v", err)) + } + + return make([]registry.ServiceInstance, 0, 0) +} + +// GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName +// the page will start at offset +func (e *etcdV3ServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + + all := e.GetInstances(serviceName) + + res := make([]interface{}, 0, pageSize) + + for i := offset; i < len(all) && i < offset+pageSize; i++ { + res = append(res, all[i]) + } + + return gxpage.New(offset, pageSize, res, len(all)) +} + +// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance. +// The param healthy indices that the instance should be healthy or not. +// The page will start at offset +func (e *etcdV3ServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + all := e.GetInstances(serviceName) + res := make([]interface{}, 0, pageSize) + + var ( + i = offset + count = 0 + ) + for i < len(all) && count < pageSize { + ins := all[i] + if ins.IsHealthy() == healthy { + res = append(res, all[i]) + count++ + } + i++ + } + return gxpage.New(offset, pageSize, res, len(all)) +} + +// Batch get all instances by the specified service names +func (e *etcdV3ServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + res := make(map[string]gxpage.Pager, len(serviceNames)) + for _, name := range serviceNames { + res[name] = e.GetInstancesByPage(name, offset, requestedSize) + } + return res +} + +// ----------------- event ---------------------- +// AddListener adds a new ServiceInstancesChangedListener +// see addServiceInstancesChangedListener in Java +func (e *etcdV3ServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + return e.registerSreviceWatcher(listener.ServiceName) +} + +// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName +func (e *etcdV3ServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + return e.DispatchEventForInstances(serviceName, e.GetInstances(serviceName)) +} + +// DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances +func (e *etcdV3ServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + return e.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) +} + +// DispatchEvent dispatches the event +func (e *etcdV3ServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + extension.GetGlobalDispatcher().Dispatch(event) + return nil +} + +// Convert instance to dubbo path +func toPath(instance registry.ServiceInstance) string { + if instance == nil { + return "" + } + // like: /services/servicename1/host(127.0.0.1)/8080 + return fmt.Sprintf("%s%d", ROOT+constant.PATH_SEPARATOR+instance.GetServiceName()+constant.PATH_SEPARATOR+instance.GetHost()+constant.KEY_SEPARATOR, instance.GetPort()) +} + +// to dubbo service path +func toParentPath(serviceName string) string { + return ROOT + constant.PATH_SEPARATOR + serviceName +} + +// register service watcher +func (e *etcdV3ServiceDiscovery) registerSreviceWatcher(serviceName string) error { + + initLock.Lock() + defer initLock.Unlock() + + path := toParentPath(serviceName) + + listener, found := e.childListenerMap[serviceName] + + if !found { + listener = etcdv3.NewEventListener(e.client) + e.childListenerMap[serviceName] = listener + } + + listener.ListenServiceEvent(path, e) + + return nil +} + +// when child data change should DispatchEventByServiceName +func (e *etcdV3ServiceDiscovery) DataChange(eventType remoting.Event) bool { + + if eventType.Action == remoting.EventTypeUpdate { + instance := ®istry.DefaultServiceInstance{} + err := jsonutil.DecodeJSON([]byte(eventType.Content), &instance) + if err != nil { + instance.ServiceName = "" + } + + if err := e.DispatchEventByServiceName(instance.ServiceName); err != nil { + return false + } + } + + return true +} + +// netEcdv3ServiceDiscovery +func newEtcdV3ServiceDiscovery(name string) (registry.ServiceDiscovery, error) { + + initLock.Lock() + defer initLock.Unlock() + + sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name) + if !ok || len(sdc.RemoteRef) == 0 { + return nil, perrors.New("could not init the etcd service instance because the config is invalid") + } + + remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef) + if !ok { + return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) + } + + // init etcdv3 client + timeout, err := time.ParseDuration(remoteConfig.TimeoutStr) + if err != nil { + logger.Errorf("timeout config %v is invalid,err is %v", remoteConfig.TimeoutStr, err.Error()) + return nil, perrors.WithMessagef(err, "new etcd service discovery(address:%v)", remoteConfig.Address) + } + + logger.Infof("etcd address is: %v,timeout is:%s", remoteConfig.Address, timeout.String()) + + client := etcdv3.NewServiceDiscoveryClient( + etcdv3.WithName(etcdv3.RegistryETCDV3Client), + etcdv3.WithTimeout(timeout), + etcdv3.WithEndpoints(remoteConfig.Address), + ) + + descriptor := fmt.Sprintf("etcd-service-discovery[%s]", remoteConfig.Address) + + return &etcdV3ServiceDiscovery{descriptor, client, nil, gxset.NewSet(), make(map[string]*etcdv3.EventListener, 0)}, nil +} diff --git a/registry/etcdv3/service_discovery_test.go b/registry/etcdv3/service_discovery_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ff3708e6f33015cba59eb72fcdff619e55518a73 --- /dev/null +++ b/registry/etcdv3/service_discovery_test.go @@ -0,0 +1,148 @@ +package etcdv3 + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/common/observer/dispatcher" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/registry" + "github.com/stretchr/testify/assert" + "testing" +) + +var testName = "test" + +func setUp() { + config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{ + Protocol: "etcdv3", + RemoteRef: testName, + } + + config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ + Address: "localhost:2380", + TimeoutStr: "1000s", + } +} + +func Test_newEtcdV3ServiceDiscovery(t *testing.T) { + name := constant.ETCDV3_KEY + _, err := newEtcdV3ServiceDiscovery(name) + + // warn: log configure file name is nil + assert.NotNil(t, err) + + sdc := &config.ServiceDiscoveryConfig{ + Protocol: "etcdv3", + RemoteRef: "mock", + } + config.GetBaseConfig().ServiceDiscoveries[name] = sdc + + _, err = newEtcdV3ServiceDiscovery(name) + + // RemoteConfig not found + assert.NotNil(t, err) + + config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ + Address: "localhost:2380", + TimeoutStr: "10s", + } + + res, err := newEtcdV3ServiceDiscovery(name) + assert.Nil(t, err) + assert.NotNil(t, res) +} + +func TestEtcdV3ServiceDiscovery_Destroy(t *testing.T) { + setUp() + serviceDiscovery, err := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName) + + assert.Nil(t, err) + assert.NotNil(t, serviceDiscovery) + + err = serviceDiscovery.Destroy() + assert.Nil(t, err) + assert.NotNil(t, serviceDiscovery.(*etcdV3ServiceDiscovery).client) +} + +func TestEtcdV3ServiceDiscovery_CRUD(t *testing.T) { + setUp() + extension.SetEventDispatcher("mock", func() observer.EventDispatcher { + return &dispatcher.MockEventDispatcher{} + }) + + extension.SetAndInitGlobalDispatcher("mock") + + serviceName := "service-name" + id := "id" + host := "host" + port := 123 + instance := ®istry.DefaultServiceInstance{ + Id: id, + ServiceName: serviceName, + Host: host, + Port: port, + Enable: true, + Healthy: true, + Metadata: nil, + } + + // clean data + + serviceDiscovry, _ := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName) + + // clean data for local test + serviceDiscovry.Unregister(®istry.DefaultServiceInstance{ + Id: id, + ServiceName: serviceName, + Host: host, + Port: port, + }) + + err := serviceDiscovry.Register(instance) + assert.Nil(t, err) + + page := serviceDiscovry.GetHealthyInstancesByPage(serviceName, 0, 10, true) + assert.NotNil(t, page) + + assert.Equal(t, 0, page.GetOffset()) + assert.Equal(t, 10, page.GetPageSize()) + assert.Equal(t, 1, page.GetDataSize()) + + instance = page.GetData()[0].(*registry.DefaultServiceInstance) + assert.NotNil(t, instance) + assert.Equal(t, id, instance.GetId()) + assert.Equal(t, host, instance.GetHost()) + assert.Equal(t, port, instance.GetPort()) + assert.Equal(t, serviceName, instance.GetServiceName()) + assert.Equal(t, 0, len(instance.GetMetadata())) + + instance.Metadata["a"] = "b" + + err = serviceDiscovry.Update(instance) + assert.Nil(t, err) + + pageMap := serviceDiscovry.GetRequestInstances([]string{serviceName}, 0, 1) + assert.Equal(t, 1, len(pageMap)) + page = pageMap[serviceName] + assert.NotNil(t, page) + assert.Equal(t, 1, len(page.GetData())) + + instance = page.GetData()[0].(*registry.DefaultServiceInstance) + v, _ := instance.Metadata["a"] + assert.Equal(t, "b", v) + + // test dispatcher event + err = serviceDiscovry.DispatchEventByServiceName(serviceName) + assert.Nil(t, err) + + // test AddListener + err = serviceDiscovry.AddListener(®istry.ServiceInstancesChangedListener{ServiceName: serviceName}) + assert.Nil(t, err) +} + +func TestEtcdV3ServiceDiscovery_GetDefaultPageSize(t *testing.T) { + setUp() + serviceDiscovry, _ := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName) + assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize()) +} diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 85d6e3d9bde4182a30b0853f459ced23e6adf0c8..93da6402c759583ce7600c474910637e6ed77870 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -132,6 +132,26 @@ func ValidateClient(container clientFacade, opts ...Option) error { return nil } +// NewServiceDiscoveryClient +func NewServiceDiscoveryClient(opts ...Option) *Client { + options := &Options{ + heartbeat: 1, // default heartbeat + } + + for _, opt := range opts { + opt(options) + } + + newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat) + if err != nil { + logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", + options.name, options.endpoints, options.timeout, err) + return nil + } + + return newClient +} + // Client represents etcd client Configuration type Client struct { lock sync.RWMutex @@ -287,6 +307,28 @@ func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error { return nil } +// if k not exist will put k/v in etcd +// if k is already exist in etcd, replace it +func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return ErrNilETCDV3Client + } + + _, err := c.rawClient.Txn(c.ctx). + If(clientv3.Compare(clientv3.Version(k), "!=", -1)). + Then(clientv3.OpPut(k, v, opts...)). + Commit() + if err != nil { + return err + + } + return nil +} + func (c *Client) delete(k string) error { c.lock.RLock() @@ -457,6 +499,15 @@ func (c *Client) Create(k string, v string) error { return nil } +// Update key value ... +func (c *Client) Update(k, v string) error { + err := c.update(k, v) + if err != nil { + return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v) + } + return nil +} + // nolint func (c *Client) Delete(k string) error {