diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index eb89329d711fd0fa9c9cb0b04811075ca1a83e3b..2ab2d63f34b11859f094250c4612601aac9d78b4 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -18,21 +18,26 @@ package consul import ( + "crypto/md5" + "encoding/json" "fmt" - "github.com/hashicorp/consul/api/watch" - "github.com/hashicorp/go-hclog" "strconv" + "strings" "sync" + "time" ) import ( "github.com/dubbogo/gost/container/set" "github.com/dubbogo/gost/page" consul "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api/watch" + "github.com/hashicorp/go-hclog" perrors "github.com/pkg/errors" ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -45,6 +50,19 @@ const ( Enable = "enable" ) +const ( + CHECK_PASS_INTERVAL = "consul-check-pass-interval" + // default time-to-live in millisecond + DEFAULT_CHECK_PASS_INTERVAL = 16000 + UERY_TAG = "consul_query_tag" + ACL_TOKEN = "acl-token" + // default deregister critical server after + DEFAULT_DEREGISTER_TIME = "20s" + DEFAULT_WATCH_TIMEOUT = 60 * 1000 + WATCH_TIMEOUT = "consul-watch-timeout" + DEREGISTER_AFTER = "consul-deregister-critical-service-after" +) + var ( // 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition instanceMap = make(map[string]registry.ServiceDiscovery, 16) @@ -84,12 +102,6 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) } - config := &consul.Config{Address: remoteConfig.Address} - client, err := consul.NewClient(config) - if err != nil { - return nil, perrors.WithMessage(err, "create consul client failed.") - } - descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address) pageSize := 20 @@ -102,9 +114,9 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) { } } return &consulServiceDiscovery{ - consulClient: client, - descriptor: descriptor, - PageSize: pageSize, + address: remoteConfig.Address, + descriptor: descriptor, + PageSize: pageSize, }, nil } @@ -116,10 +128,29 @@ type consulServiceDiscovery struct { // descriptor is a short string about the basic information of this instance descriptor string // Consul client. - consulClient *consul.Client - PageSize int + consulClient *consul.Client + PageSize int + serviceUrl common.URL + checkPassInterval int64 + tag string + tags []string + address string } +func (csd consulServiceDiscovery) Initialize(registryURL common.URL) error { + csd.serviceUrl = registryURL + csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL) + csd.tag = registryURL.GetParam(UERY_TAG, "") + csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",") + aclToken := registryURL.GetParam(ACL_TOKEN, "") + config := &consul.Config{Address: csd.address, Token: aclToken} + client, err := consul.NewClient(config) + if err != nil { + return perrors.WithMessage(err, "create consul client failed.") + } + csd.consulClient = client + return nil +} func (csd consulServiceDiscovery) String() string { return csd.descriptor } @@ -130,11 +161,8 @@ func (csd consulServiceDiscovery) Destroy() error { } func (csd consulServiceDiscovery) Register(instance registry.ServiceInstance) error { - ins, err := csd.buildRegisterInstance(instance) - if err != nil { - panic(err) - } - err = csd.consulClient.Agent().ServiceRegister(ins) + ins, _ := csd.buildRegisterInstance(instance) + err := csd.consulClient.Agent().ServiceRegister(ins) if err != nil { return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName()) } @@ -151,7 +179,7 @@ func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) erro } func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { - return csd.consulClient.Agent().ServiceDeregister(instance.GetId()) + return csd.consulClient.Agent().ServiceDeregister(buildID(instance)) } func (csd consulServiceDiscovery) GetDefaultPageSize() int { @@ -161,20 +189,24 @@ func (csd consulServiceDiscovery) GetDefaultPageSize() int { func (csd consulServiceDiscovery) GetServices() *gxset.HashSet { var res = gxset.NewSet() - services, err := csd.consulClient.Agent().Services() + services, _, err := csd.consulClient.Catalog().Services(nil) if err != nil { return res } - for _, service := range services { - res.Add(service.Service) + for service, _ := range services { + res.Add(service) } return res } func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { - _, instances, err := csd.consulClient.Agent().AgentHealthServiceByName(serviceName) + waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000 + instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ + WaitTime: time.Duration(waitTime), + WaitIndex: -1, + }) if err != nil { return nil } @@ -249,6 +281,7 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance params := make(map[string]interface{}, 8) params["type"] = "service" params["service"] = listener.ServiceName + params["passingonly"] = true //params["tag"] = "dubbo" //params["passingonly"] = true plan, err := watch.Parse(params) @@ -327,23 +360,37 @@ func (csd consulServiceDiscovery) buildRegisterInstance(instance registry.Servic } metadata[Enable] = strconv.FormatBool(instance.IsEnable()) - // tcp - tcp := fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort()) - // check - check := &consul.AgentServiceCheck{ - TCP: tcp, - //Interval: url.GetParam("consul-check-interval", "10s"), - //Timeout: url.GetParam("consul-check-timeout", "1s"), - //DeregisterCriticalServiceAfter: url.GetParam("consul-deregister-critical-service-after", "20s"), - } + check := csd.buildCheck(instance) return &consul.AgentServiceRegistration{ - ID: instance.GetId(), + ID: buildID(instance), Name: instance.GetServiceName(), Port: instance.GetPort(), Address: instance.GetHost(), Meta: metadata, - Check: check, + Check: &check, }, nil } + +func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck { + + deregister, ok := instance.GetMetadata()[DEREGISTER_AFTER] + if !ok || deregister == "" { + deregister = DEFAULT_DEREGISTER_TIME + } + return consul.AgentServiceCheck{ + TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s", + DeregisterCriticalServiceAfter: deregister, + } +} + +func buildID(instance registry.ServiceInstance) string { + + metaBytes, _ := json.Marshal(instance.GetMetadata()) + id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%b,healthy:%b,meta:%s", instance.GetId(), instance.GetServiceName(), + instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes) + Md5Inst := md5.New() + Md5Inst.Write([]byte(id)) + return string(Md5Inst.Sum([]byte(""))) +} diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9f7bc54cce731a531807610597eb8f5ae00e2f27 --- /dev/null +++ b/registry/consul/service_discovery_test.go @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package consul + +import ( + "github.com/apache/dubbo-go/common" + "math/rand" + "strconv" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/common/observer/dispatcher" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/registry" +) + +var ( + testName = "test" + registryURL = common.URL{ + Path: "", + Username: "", + Password: "", + Methods: nil, + SubURL: nil, + } +) + +func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) { + name := "consul1" + _, err := newConsulServiceDiscovery(name) + assert.NotNil(t, err) + + sdc := &config.ServiceDiscoveryConfig{ + Protocol: "consul", + RemoteRef: "mock", + } + + config.GetBaseConfig().ServiceDiscoveries[name] = sdc + + _, err = newConsulServiceDiscovery(name) + assert.NotNil(t, err) + + config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ + Address: "", // TODO + } + + res, err := newConsulServiceDiscovery(name) + assert.Nil(t, err) + assert.NotNil(t, res) +} + +func TestConsulServiceDiscovery_Destroy(t *testing.T) { + prepareData() + serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName) + _, registryUrl := prepareService() + serviceDiscovery.Initialize(registryUrl) + assert.Nil(t, err) + assert.NotNil(t, serviceDiscovery) + err = serviceDiscovery.Destroy() + assert.Nil(t, err) + assert.Nil(t, serviceDiscovery.(*consulServiceDiscovery).consulClient) +} + +func TestConsulServiceDiscovery_CRUD(t *testing.T) { + prepareData() + extension.SetEventDispatcher("mock", func() observer.EventDispatcher { + return &dispatcher.MockEventDispatcher{} + }) + + extension.SetAndInitGlobalDispatcher("mock") + rand.Seed(time.Now().Unix()) + + instance, registryUrl := prepareService() + + // clean data + serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName) + assert.Nil(t, err) + + err = serviceDiscovery.Initialize(registryUrl) + assert.Nil(t, err) + // clean data for local test + err = serviceDiscovery.Unregister(instance) + assert.Nil(t, err) + + err = serviceDiscovery.Register(instance) + assert.Nil(t, err) + + //sometimes nacos may be failed to push update of instance, + //so it need 10s to pull, we sleep 10 second to make sure instance has been update + time.Sleep(11 * time.Second) + page := serviceDiscovery.GetHealthyInstancesByPage(instance.GetServiceName(), 0, 10, true) + assert.NotNil(t, page) + assert.Equal(t, 0, page.GetOffset()) + assert.Equal(t, 10, page.GetPageSize()) + assert.Equal(t, 1, page.GetDataSize()) + + instance = page.GetData()[0].(*registry.DefaultServiceInstance) + assert.NotNil(t, instance) + assert.Equal(t, buildID(instance), instance.GetId()) + assert.Equal(t, instance.GetHost(), instance.GetHost()) + assert.Equal(t, instance.GetPort(), instance.GetPort()) + assert.Equal(t, instance.GetServiceName(), instance.GetServiceName()) + assert.Equal(t, 0, len(instance.GetMetadata())) + + instance.GetMetadata()["a"] = "b" + err = serviceDiscovery.Update(instance) + assert.Nil(t, err) + + time.Sleep(11 * time.Second) + pageMap := serviceDiscovery.GetRequestInstances([]string{instance.GetServiceName()}, 0, 1) + assert.Equal(t, 1, len(pageMap)) + + page = pageMap[instance.GetServiceName()] + assert.NotNil(t, page) + assert.Equal(t, 1, len(page.GetData())) + + instance = page.GetData()[0].(*registry.DefaultServiceInstance) + v, _ := instance.GetMetadata()["a"] + assert.Equal(t, "b", v) + + // test dispatcher event + err = serviceDiscovery.DispatchEventByServiceName(instance.GetServiceName()) + assert.Nil(t, err) + + // test AddListener + err = serviceDiscovery.AddListener(®istry.ServiceInstancesChangedListener{}) + assert.Nil(t, err) +} + +func prepareData() { + config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{ + Protocol: "consul", + RemoteRef: testName, + } + + config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ + Address: "", // TODO + TimeoutStr: "10s", + } +} +func prepareService() (registry.ServiceInstance, common.URL) { + serviceName := "service-name" + strconv.Itoa(rand.Intn(10000)) + id := "id" + host := "host" + port := 123 + + registryUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" + + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" + + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" + + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" + + "side=provider&timeout=3000×tamp=1556509797245&consul-check-pass-interval=17000&consul-deregister-critical-service-after=20s&" + + "consul-watch-timeout=60000") + + return ®istry.DefaultServiceInstance{ + Id: id, + ServiceName: serviceName, + Host: host, + Port: port, + Enable: true, + Healthy: true, + Metadata: nil, + }, registryUrl +} diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go index f381ba70d64a4bbefcb09b48a11e52eec951f5ec..010adff594d45cc219cc44ca44a456c43c0f8e82 100644 --- a/registry/etcdv3/service_discovery.go +++ b/registry/etcdv3/service_discovery.go @@ -31,6 +31,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -66,6 +67,10 @@ type etcdV3ServiceDiscovery struct { childListenerMap map[string]*etcdv3.EventListener } +func (e *etcdV3ServiceDiscovery) Initialize(registryURL common.URL) error { + return nil +} + // basic information of this instance func (e *etcdV3ServiceDiscovery) String() string { return e.descriptor diff --git a/registry/event/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go index 3ee2f4a44946065cdf7489abc391df41f251d810..c1c9822d7c2bd09d4b23be17fbbf0c1364307583 100644 --- a/registry/event/event_publishing_service_discovery.go +++ b/registry/event/event_publishing_service_discovery.go @@ -18,6 +18,10 @@ package event import ( + "sync" +) +import ( + "github.com/apache/dubbo-go/common" gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" ) @@ -34,6 +38,7 @@ import ( // Publish some event about service discovery type EventPublishingServiceDiscovery struct { serviceDiscovery registry.ServiceDiscovery + once sync.Once } // NewEventPublishingServiceDiscovery is a constructor @@ -48,6 +53,14 @@ func (epsd *EventPublishingServiceDiscovery) String() string { return epsd.serviceDiscovery.String() } +func (epsd *EventPublishingServiceDiscovery) Initialize(registryURL common.URL) error { + var err error + epsd.once.Do(func() { + err = epsd.serviceDiscovery.Initialize(registryURL) + }) + return err +} + // Destroy delegate function func (epsd *EventPublishingServiceDiscovery) Destroy() error { f := func() error { diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 63d92d70fd5e1a00f0ce1ca95b1926fb9c36c84b..78c6c4c4fc7fdd8c49ce305776d2ea0f123df7a4 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -32,6 +32,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -62,6 +63,10 @@ type nacosServiceDiscovery struct { namingClient naming_client.INamingClient } +func (n *nacosServiceDiscovery) Initialize(registryURL common.URL) error { + return nil +} + // Destroy will close the service discovery. // Actually, it only marks the naming client as null and then return func (n *nacosServiceDiscovery) Destroy() error { diff --git a/registry/service_discovery.go b/registry/service_discovery.go index cb7a3c0182ff88995ab9dd6c920523225c3cb36c..62ad53d9e1ec952bfecba7f1cfc7f3479e905ba8 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -22,6 +22,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/page" ) @@ -34,6 +35,12 @@ type ServiceDiscovery interface { // ----------------- lifecycle ------------------- + /** + * Initializes the ServiceDiscovery + * + */ + Initialize(registryURL common.URL) error + // Destroy will destroy the service discovery. // If the discovery cannot be destroy, it will return an error. Destroy() error diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 061d832b0328a5e1754c7804bf40cf83ac216a8b..a1d3f120c11d5b2f6fd2139d3d435381a12eafb6 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -125,7 +125,9 @@ func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { if err != nil { return nil, perrors.WithMessage(err, "Create service discovery fialed") } - return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil + serviceDiscovery := event.NewEventPublishingServiceDiscovery(originServiceDiscovery) + serviceDiscovery.Initialize(*url) + return serviceDiscovery, nil } func parseServices(literalServices string) *gxset.HashSet { diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go index 53eb86507e635be32eb362519922f7042f945519..c1ca42341995931021dd846aa5c5d5b48e5b9de6 100644 --- a/registry/servicediscovery/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -126,6 +126,10 @@ func (m *mockServiceNameMapping) Get(serviceInterface string, group string, vers type mockServiceDiscovery struct { } +func (m *mockServiceDiscovery) Initialize(registryURL common.URL) error { + panic("implement me") +} + func (m *mockServiceDiscovery) String() string { panic("implement me") } diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go index 5ad83ef90947afc0a5ca75af5009e8b55b4f6627..314f56bdecec14390d8033c0c045a27b0076c488 100644 --- a/registry/zookeeper/service_discovery.go +++ b/registry/zookeeper/service_discovery.go @@ -163,6 +163,10 @@ func (zksd *zookeeperServiceDiscovery) String() string { return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url) } +func (zksd *zookeeperServiceDiscovery) Initialize(registryURL common.URL) error { + return nil +} + // Close client be closed func (zksd *zookeeperServiceDiscovery) Destroy() error { zksd.client.Close()