Skip to content
Snippets Groups Projects
Commit 71cc6333 authored by shen's avatar shen
Browse files

consul service discovery unit test

parent bea99263
No related branches found
No related tags found
No related merge requests found
......@@ -18,8 +18,6 @@
package consul
import (
"crypto/md5"
"encoding/json"
"fmt"
"strconv"
"strings"
......@@ -159,6 +157,10 @@ func (csd consulServiceDiscovery) String() string {
func (csd consulServiceDiscovery) Destroy() error {
csd.consulClient = nil
for _, t := range csd.ttl {
close(t)
}
csd.ttl = nil
return nil
}
......@@ -188,7 +190,12 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
select {
case <-timer.C:
timer.Reset(period)
csd.consulClient.Agent().PassTTL(checkID, "")
err := csd.consulClient.Agent().PassTTL(checkID, "")
if err != nil {
logger.Warnf("pass ttl heartbeat fail:%v", err)
break
}
logger.Debugf("passed ttl heartbeat for %s", checkID)
break
case <-stopChan:
logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName())
......@@ -200,15 +207,26 @@ func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance
}
func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
ins, err := csd.buildRegisterInstance(instance)
ins, _ := csd.buildRegisterInstance(instance)
err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
if err != nil {
panic(err)
logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err)
}
return csd.consulClient.Agent().ServiceRegisterOpts(ins, consul.ServiceRegisterOpts{ReplaceExistingChecks: true})
err = csd.consulClient.Agent().ServiceRegister(ins)
return err
}
func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
return csd.consulClient.Agent().ServiceDeregister(buildID(instance))
err := csd.consulClient.Agent().ServiceDeregister(buildID(instance))
if err != nil {
return err
}
stopChanel, ok := csd.ttl[buildID(instance)]
if ok {
close(stopChanel)
delete(csd.ttl, buildID(instance))
}
return nil
}
func (csd consulServiceDiscovery) GetDefaultPageSize() int {
......@@ -233,7 +251,7 @@ func (csd consulServiceDiscovery) GetServices() *gxset.HashSet {
func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000
instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
WaitTime: time.Duration(waitTime),
WaitTime: time.Duration(waitTime),
})
if err != nil {
return nil
......@@ -352,19 +370,17 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance
Metadata: metadata,
})
}
if len(instances) < 1 {
return
}
e := csd.DispatchEventForInstances(listener.ServiceName, instances)
if e != nil {
logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err)
}
}
err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger)
if err != nil {
logger.Error("consul plan run failure!error:%v", err)
return err
}
go func() {
err = plan.RunWithClientAndHclog(csd.consulClient, hcLogger)
if err != nil {
logger.Error("consul plan run failure!error:%v", err)
}
}()
return nil
}
......@@ -408,6 +424,7 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance)
deregister = DEFAULT_DEREGISTER_TIME
}
return consul.AgentServiceCheck{
CheckID: buildID(instance),
TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s",
DeregisterCriticalServiceAfter: deregister,
}
......@@ -415,10 +432,8 @@ func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance)
func buildID(instance registry.ServiceInstance) string {
metaBytes, _ := json.Marshal(instance.GetMetadata())
id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%t,healthy:%t,meta:%s", instance.GetId(), instance.GetServiceName(),
instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes)
Md5Inst := md5.New()
Md5Inst.Write([]byte(id))
return string(Md5Inst.Sum([]byte("")))
id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort())
//Md5Inst := md5.New()
//Md5Inst.Write([]byte(id))
return id
}
......@@ -35,7 +35,6 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/common/observer/dispatcher"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
)
......@@ -96,8 +95,9 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) {
defer consulAgent.Shutdown()
prepareData()
var eventDispatcher = MockEventDispatcher{Notify: make(chan struct{}, 1)}
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return &dispatcher.MockEventDispatcher{}
return &eventDispatcher
})
extension.SetAndInitGlobalDispatcher("mock")
......@@ -113,33 +113,33 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) {
assert.Nil(t, err)
err = serviceDiscovery.Unregister(instance)
assert.NotNil(t, err)
assert.Nil(t, err)
err = serviceDiscovery.Register(instance)
assert.Nil(t, err)
//sometimes nacos may be failed to push update of instance,
//so it need 10s to pull, we sleep 10 second to make sure instance has been update
time.Sleep(11 * time.Second)
time.Sleep(3 * time.Second)
page := serviceDiscovery.GetHealthyInstancesByPage(instance.GetServiceName(), 0, 10, true)
assert.NotNil(t, page)
assert.Equal(t, 0, page.GetOffset())
assert.Equal(t, 10, page.GetPageSize())
assert.Equal(t, 1, page.GetDataSize())
instance = page.GetData()[0].(*registry.DefaultServiceInstance)
assert.NotNil(t, instance)
assert.Equal(t, buildID(instance), instance.GetId())
assert.Equal(t, instance.GetHost(), instance.GetHost())
assert.Equal(t, instance.GetPort(), instance.GetPort())
assert.Equal(t, instance.GetServiceName(), instance.GetServiceName())
assert.Equal(t, 0, len(instance.GetMetadata()))
instanceResult := page.GetData()[0].(*registry.DefaultServiceInstance)
assert.NotNil(t, instanceResult)
assert.Equal(t, buildID(instance), instanceResult.GetId())
assert.Equal(t, instance.GetHost(), instanceResult.GetHost())
assert.Equal(t, instance.GetPort(), instanceResult.GetPort())
assert.Equal(t, instance.GetServiceName(), instanceResult.GetServiceName())
assert.Equal(t, 0, len(instanceResult.GetMetadata()))
instance.GetMetadata()["a"] = "b"
instance.GetMetadata()["aaa"] = "bbb"
err = serviceDiscovery.Update(instance)
assert.Nil(t, err)
time.Sleep(11 * time.Second)
time.Sleep(3 * time.Second)
pageMap := serviceDiscovery.GetRequestInstances([]string{instance.GetServiceName()}, 0, 1)
assert.Equal(t, 1, len(pageMap))
......@@ -147,17 +147,28 @@ func TestConsulServiceDiscovery_CRUD(t *testing.T) {
assert.NotNil(t, page)
assert.Equal(t, 1, len(page.GetData()))
instance = page.GetData()[0].(*registry.DefaultServiceInstance)
v, _ := instance.GetMetadata()["a"]
assert.Equal(t, "b", v)
instanceResult = page.GetData()[0].(*registry.DefaultServiceInstance)
v, _ := instanceResult.GetMetadata()["aaa"]
assert.Equal(t, "bbb", v)
// test dispatcher event
err = serviceDiscovery.DispatchEventByServiceName(instance.GetServiceName())
assert.Nil(t, err)
//err = serviceDiscovery.DispatchEventByServiceName(instanceResult.GetServiceName())
//assert.Nil(t, err)
// test AddListener
err = serviceDiscovery.AddListener(&registry.ServiceInstancesChangedListener{})
err = serviceDiscovery.AddListener(&registry.ServiceInstancesChangedListener{ServiceName: instance.GetServiceName()})
assert.Nil(t, err)
err = serviceDiscovery.Unregister(instance)
assert.Nil(t, err)
timer := time.NewTimer(time.Second * 10)
select {
case <-eventDispatcher.Notify:
assert.NotNil(t, eventDispatcher.Event)
break
case <-timer.C:
assert.Fail(t, "")
break
}
}
func prepareData() {
......@@ -190,3 +201,39 @@ func prepareService() (registry.ServiceInstance, common.URL) {
Metadata: nil,
}, registryUrl
}
type MockEventDispatcher struct {
Notify chan struct{}
Event observer.Event
}
// AddEventListener do nothing
func (m MockEventDispatcher) AddEventListener(listener observer.EventListener) {
}
// AddEventListeners do nothing
func (m MockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) {
}
// RemoveEventListener do nothing
func (m MockEventDispatcher) RemoveEventListener(listener observer.EventListener) {
}
// RemoveEventListeners do nothing
func (m MockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) {
}
// GetAllEventListeners return empty list
func (m MockEventDispatcher) GetAllEventListeners() []observer.EventListener {
return make([]observer.EventListener, 0)
}
// RemoveAllEventListeners do nothing
func (m MockEventDispatcher) RemoveAllEventListeners() {
}
// Dispatch do nothing
func (m *MockEventDispatcher) Dispatch(event observer.Event) {
m.Event = event
m.Notify <- struct{}{}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment