diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index 9e19cfb4a7b466d7dc5012bb1831493e9b77948f..a41b03c8d69d7b38feccebf1c2afd461582e2921 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -18,8 +18,6 @@ package consul import ( - "crypto/md5" - "encoding/json" "fmt" "strconv" "strings" @@ -159,6 +157,10 @@ func (csd consulServiceDiscovery) String() string { func (csd consulServiceDiscovery) Destroy() error { csd.consulClient = nil + for _, t := range csd.ttl { + close(t) + } + csd.ttl = nil return nil } @@ -188,7 +190,12 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance select { case <-timer.C: timer.Reset(period) - csd.consulClient.Agent().PassTTL(checkID, "") + err := csd.consulClient.Agent().PassTTL(checkID, "") + if err != nil { + logger.Warnf("pass ttl heartbeat fail:%v", err) + break + } + logger.Debugf("passed ttl heartbeat for %s", checkID) break case <-stopChan: logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName()) @@ -200,15 +207,26 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance } func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error { - ins, err := csd.buildRegisterInstance(instance) + ins, _ := csd.buildRegisterInstance(instance) + err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) if err != nil { - panic(err) + logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err) } - return csd.consulClient.Agent().ServiceRegisterOpts(ins, consul.ServiceRegisterOpts{ReplaceExistingChecks: true}) + err = csd.consulClient.Agent().ServiceRegister(ins) + return err } func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error { - return csd.consulClient.Agent().ServiceDeregister(buildID(instance)) + err := csd.consulClient.Agent().ServiceDeregister(buildID(instance)) + if err != nil { + return err + } + stopChanel, ok := csd.ttl[buildID(instance)] + if ok { + close(stopChanel) + delete(csd.ttl, buildID(instance)) + } + return nil } func (csd consulServiceDiscovery) GetDefaultPageSize() int { @@ -233,7 +251,7 @@ func (csd consulServiceDiscovery) GetServices() *gxset.HashSet { func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000 instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{ - WaitTime: time.Duration(waitTime), + WaitTime: time.Duration(waitTime), }) if err != nil { return nil @@ -352,19 +370,17 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance Metadata: metadata, }) } - if len(instances) < 1 { - return - } e := csd.DispatchEventForInstances(listener.ServiceName, instances) if e != nil { logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err) } } - err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger) - if err != nil { - logger.Error("consul plan run failure!error:%v", err) - return err - } + go func() { + err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger) + if err != nil { + logger.Error("consul plan run failure!error:%v", err) + } + }() return nil } @@ -408,6 +424,7 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) deregister = DEFAULT_DEREGISTER_TIME } return consul.AgentServiceCheck{ + CheckID: buildID(instance), TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s", DeregisterCriticalServiceAfter: deregister, } @@ -415,10 +432,8 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) func buildID(instance registry.ServiceInstance) string { - metaBytes, _ := json.Marshal(instance.GetMetadata()) - id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%t,healthy:%t,meta:%s", instance.GetId(), instance.GetServiceName(), - instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes) - Md5Inst := md5.New() - Md5Inst.Write([]byte(id)) - return string(Md5Inst.Sum([]byte(""))) + id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort()) + //Md5Inst := md5.New() + //Md5Inst.Write([]byte(id)) + return id } diff --git a/registry/consul/service_discovery_test.go b/registry/consul/service_discovery_test.go index 3ee71e17a1e21a90299dceefdce83b4c5ab9ca4a..1410326d51301f02cb519c805ba602fe50049069 100644 --- a/registry/consul/service_discovery_test.go +++ b/registry/consul/service_discovery_test.go @@ -35,7 +35,6 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" - "github.com/apache/dubbo-go/common/observer/dispatcher" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/registry" ) @@ -96,8 +95,9 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { defer consulAgent.Shutdown() prepareData() + var eventDispatcher = MockEventDispatcher{Notify: make(chan struct{}, 1)} extension.SetEventDispatcher("mock", func() observer.EventDispatcher { - return &dispatcher.MockEventDispatcher{} + return &eventDispatcher }) extension.SetAndInitGlobalDispatcher("mock") @@ -113,33 +113,33 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { assert.Nil(t, err) err = serviceDiscovery.Unregister(instance) - assert.NotNil(t, err) + assert.Nil(t, err) err = serviceDiscovery.Register(instance) assert.Nil(t, err) //sometimes nacos may be failed to push update of instance, //so it need 10s to pull, we sleep 10 second to make sure instance has been update - time.Sleep(11 * time.Second) + time.Sleep(3 * time.Second) page := serviceDiscovery.GetHealthyInstancesByPage(instance.GetServiceName(), 0, 10, true) assert.NotNil(t, page) assert.Equal(t, 0, page.GetOffset()) assert.Equal(t, 10, page.GetPageSize()) assert.Equal(t, 1, page.GetDataSize()) - instance = page.GetData()[0].(*registry.DefaultServiceInstance) - assert.NotNil(t, instance) - assert.Equal(t, buildID(instance), instance.GetId()) - assert.Equal(t, instance.GetHost(), instance.GetHost()) - assert.Equal(t, instance.GetPort(), instance.GetPort()) - assert.Equal(t, instance.GetServiceName(), instance.GetServiceName()) - assert.Equal(t, 0, len(instance.GetMetadata())) + instanceResult := page.GetData()[0].(*registry.DefaultServiceInstance) + assert.NotNil(t, instanceResult) + assert.Equal(t, buildID(instance), instanceResult.GetId()) + assert.Equal(t, instance.GetHost(), instanceResult.GetHost()) + assert.Equal(t, instance.GetPort(), instanceResult.GetPort()) + assert.Equal(t, instance.GetServiceName(), instanceResult.GetServiceName()) + assert.Equal(t, 0, len(instanceResult.GetMetadata())) - instance.GetMetadata()["a"] = "b" + instance.GetMetadata()["aaa"] = "bbb" err = serviceDiscovery.Update(instance) assert.Nil(t, err) - time.Sleep(11 * time.Second) + time.Sleep(3 * time.Second) pageMap := serviceDiscovery.GetRequestInstances([]string{instance.GetServiceName()}, 0, 1) assert.Equal(t, 1, len(pageMap)) @@ -147,17 +147,28 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) { assert.NotNil(t, page) assert.Equal(t, 1, len(page.GetData())) - instance = page.GetData()[0].(*registry.DefaultServiceInstance) - v, _ := instance.GetMetadata()["a"] - assert.Equal(t, "b", v) + instanceResult = page.GetData()[0].(*registry.DefaultServiceInstance) + v, _ := instanceResult.GetMetadata()["aaa"] + assert.Equal(t, "bbb", v) // test dispatcher event - err = serviceDiscovery.DispatchEventByServiceName(instance.GetServiceName()) - assert.Nil(t, err) + //err = serviceDiscovery.DispatchEventByServiceName(instanceResult.GetServiceName()) + //assert.Nil(t, err) // test AddListener - err = serviceDiscovery.AddListener(®istry.ServiceInstancesChangedListener{}) + err = serviceDiscovery.AddListener(®istry.ServiceInstancesChangedListener{ServiceName: instance.GetServiceName()}) assert.Nil(t, err) + err = serviceDiscovery.Unregister(instance) + assert.Nil(t, err) + timer := time.NewTimer(time.Second * 10) + select { + case <-eventDispatcher.Notify: + assert.NotNil(t, eventDispatcher.Event) + break + case <-timer.C: + assert.Fail(t, "") + break + } } func prepareData() { @@ -190,3 +201,39 @@ func prepareService() (registry.ServiceInstance, common.URL) { Metadata: nil, }, registryUrl } + +type MockEventDispatcher struct { + Notify chan struct{} + Event observer.Event +} + +// AddEventListener do nothing +func (m MockEventDispatcher) AddEventListener(listener observer.EventListener) { +} + +// AddEventListeners do nothing +func (m MockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) { +} + +// RemoveEventListener do nothing +func (m MockEventDispatcher) RemoveEventListener(listener observer.EventListener) { +} + +// RemoveEventListeners do nothing +func (m MockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) { +} + +// GetAllEventListeners return empty list +func (m MockEventDispatcher) GetAllEventListeners() []observer.EventListener { + return make([]observer.EventListener, 0) +} + +// RemoveAllEventListeners do nothing +func (m MockEventDispatcher) RemoveAllEventListeners() { +} + +// Dispatch do nothing +func (m *MockEventDispatcher) Dispatch(event observer.Event) { + m.Event = event + m.Notify <- struct{}{} +}