Skip to content
Snippets Groups Projects
Commit 0520f484 authored by shen's avatar shen
Browse files

consul service discovery (not test yet)

parent e0d492d3
No related branches found
No related tags found
No related merge requests found
......@@ -18,21 +18,26 @@
package consul
import (
"crypto/md5"
"encoding/json"
"fmt"
"github.com/hashicorp/consul/api/watch"
"github.com/hashicorp/go-hclog"
"strconv"
"strings"
"sync"
"time"
)
import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"github.com/hashicorp/go-hclog"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
......@@ -45,6 +50,19 @@ const (
Enable = "enable"
)
const (
CHECK_PASS_INTERVAL = "consul-check-pass-interval"
// default time-to-live in millisecond
DEFAULT_CHECK_PASS_INTERVAL = 16000
UERY_TAG = "consul_query_tag"
ACL_TOKEN = "acl-token"
// default deregister critical server after
DEFAULT_DEREGISTER_TIME = "20s"
DEFAULT_WATCH_TIMEOUT = 60 * 1000
WATCH_TIMEOUT = "consul-watch-timeout"
DEREGISTER_AFTER = "consul-deregister-critical-service-after"
)
var (
// 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition
instanceMap = make(map[string]registry.ServiceDiscovery, 16)
......@@ -84,12 +102,6 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
}
config := &consul.Config{Address: remoteConfig.Address}
client, err := consul.NewClient(config)
if err != nil {
return nil, perrors.WithMessage(err, "create consul client failed.")
}
descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address)
pageSize := 20
......@@ -102,9 +114,9 @@ func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
}
}
return &consulServiceDiscovery{
consulClient: client,
descriptor: descriptor,
PageSize: pageSize,
address: remoteConfig.Address,
descriptor: descriptor,
PageSize: pageSize,
}, nil
}
......@@ -116,10 +128,29 @@ type consulServiceDiscovery struct {
// descriptor is a short string about the basic information of this instance
descriptor string
// Consul client.
consulClient *consul.Client
PageSize int
consulClient *consul.Client
PageSize int
serviceUrl common.URL
checkPassInterval int64
tag string
tags []string
address string
}
func (csd consulServiceDiscovery) Initialize(registryURL common.URL) error {
csd.serviceUrl = registryURL
csd.checkPassInterval = registryURL.GetParamInt(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL)
csd.tag = registryURL.GetParam(UERY_TAG, "")
csd.tags = strings.Split(registryURL.GetParam("tags", ""), ",")
aclToken := registryURL.GetParam(ACL_TOKEN, "")
config := &consul.Config{Address: csd.address, Token: aclToken}
client, err := consul.NewClient(config)
if err != nil {
return perrors.WithMessage(err, "create consul client failed.")
}
csd.consulClient = client
return nil
}
func (csd consulServiceDiscovery) String() string {
return csd.descriptor
}
......@@ -130,11 +161,8 @@ func (csd consulServiceDiscovery) Destroy() error {
}
func (csd consulServiceDiscovery) Register(instance registry.ServiceInstance) error {
ins, err := csd.buildRegisterInstance(instance)
if err != nil {
panic(err)
}
err = csd.consulClient.Agent().ServiceRegister(ins)
ins, _ := csd.buildRegisterInstance(instance)
err := csd.consulClient.Agent().ServiceRegister(ins)
if err != nil {
return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
}
......@@ -151,7 +179,7 @@ func (csd consulServiceDiscovery) Update(instance registry.ServiceInstance) erro
}
func (csd consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
return csd.consulClient.Agent().ServiceDeregister(instance.GetId())
return csd.consulClient.Agent().ServiceDeregister(buildID(instance))
}
func (csd consulServiceDiscovery) GetDefaultPageSize() int {
......@@ -161,20 +189,24 @@ func (csd consulServiceDiscovery) GetDefaultPageSize() int {
func (csd consulServiceDiscovery) GetServices() *gxset.HashSet {
var res = gxset.NewSet()
services, err := csd.consulClient.Agent().Services()
services, _, err := csd.consulClient.Catalog().Services(nil)
if err != nil {
return res
}
for _, service := range services {
res.Add(service.Service)
for service, _ := range services {
res.Add(service)
}
return res
}
func (csd consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
_, instances, err := csd.consulClient.Agent().AgentHealthServiceByName(serviceName)
waitTime := csd.serviceUrl.GetParamInt(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000
instances, _, err := csd.consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
WaitTime: time.Duration(waitTime),
WaitIndex: -1,
})
if err != nil {
return nil
}
......@@ -249,6 +281,7 @@ func (csd consulServiceDiscovery) AddListener(listener *registry.ServiceInstance
params := make(map[string]interface{}, 8)
params["type"] = "service"
params["service"] = listener.ServiceName
params["passingonly"] = true
//params["tag"] = "dubbo"
//params["passingonly"] = true
plan, err := watch.Parse(params)
......@@ -327,23 +360,37 @@ func (csd consulServiceDiscovery) buildRegisterInstance(instance registry.Servic
}
metadata[Enable] = strconv.FormatBool(instance.IsEnable())
// tcp
tcp := fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort())
// check
check := &consul.AgentServiceCheck{
TCP: tcp,
//Interval: url.GetParam("consul-check-interval", "10s"),
//Timeout: url.GetParam("consul-check-timeout", "1s"),
//DeregisterCriticalServiceAfter: url.GetParam("consul-deregister-critical-service-after", "20s"),
}
check := csd.buildCheck(instance)
return &consul.AgentServiceRegistration{
ID: instance.GetId(),
ID: buildID(instance),
Name: instance.GetServiceName(),
Port: instance.GetPort(),
Address: instance.GetHost(),
Meta: metadata,
Check: check,
Check: &check,
}, nil
}
func (csd consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck {
deregister, ok := instance.GetMetadata()[DEREGISTER_AFTER]
if !ok || deregister == "" {
deregister = DEFAULT_DEREGISTER_TIME
}
return consul.AgentServiceCheck{
TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s",
DeregisterCriticalServiceAfter: deregister,
}
}
func buildID(instance registry.ServiceInstance) string {
metaBytes, _ := json.Marshal(instance.GetMetadata())
id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d,enable:%b,healthy:%b,meta:%s", instance.GetId(), instance.GetServiceName(),
instance.GetHost(), instance.GetPort(), instance.IsEnable(), instance.IsHealthy(), metaBytes)
Md5Inst := md5.New()
Md5Inst.Write([]byte(id))
return string(Md5Inst.Sum([]byte("")))
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
"github.com/apache/dubbo-go/common"
"math/rand"
"strconv"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/common/observer/dispatcher"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
)
var (
testName = "test"
registryURL = common.URL{
Path: "",
Username: "",
Password: "",
Methods: nil,
SubURL: nil,
}
)
func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) {
name := "consul1"
_, err := newConsulServiceDiscovery(name)
assert.NotNil(t, err)
sdc := &config.ServiceDiscoveryConfig{
Protocol: "consul",
RemoteRef: "mock",
}
config.GetBaseConfig().ServiceDiscoveries[name] = sdc
_, err = newConsulServiceDiscovery(name)
assert.NotNil(t, err)
config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{
Address: "", // TODO
}
res, err := newConsulServiceDiscovery(name)
assert.Nil(t, err)
assert.NotNil(t, res)
}
func TestConsulServiceDiscovery_Destroy(t *testing.T) {
prepareData()
serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName)
_, registryUrl := prepareService()
serviceDiscovery.Initialize(registryUrl)
assert.Nil(t, err)
assert.NotNil(t, serviceDiscovery)
err = serviceDiscovery.Destroy()
assert.Nil(t, err)
assert.Nil(t, serviceDiscovery.(*consulServiceDiscovery).consulClient)
}
func TestConsulServiceDiscovery_CRUD(t *testing.T) {
prepareData()
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return &dispatcher.MockEventDispatcher{}
})
extension.SetAndInitGlobalDispatcher("mock")
rand.Seed(time.Now().Unix())
instance, registryUrl := prepareService()
// clean data
serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName)
assert.Nil(t, err)
err = serviceDiscovery.Initialize(registryUrl)
assert.Nil(t, err)
// clean data for local test
err = serviceDiscovery.Unregister(instance)
assert.Nil(t, err)
err = serviceDiscovery.Register(instance)
assert.Nil(t, err)
//sometimes nacos may be failed to push update of instance,
//so it need 10s to pull, we sleep 10 second to make sure instance has been update
time.Sleep(11 * time.Second)
page := serviceDiscovery.GetHealthyInstancesByPage(instance.GetServiceName(), 0, 10, true)
assert.NotNil(t, page)
assert.Equal(t, 0, page.GetOffset())
assert.Equal(t, 10, page.GetPageSize())
assert.Equal(t, 1, page.GetDataSize())
instance = page.GetData()[0].(*registry.DefaultServiceInstance)
assert.NotNil(t, instance)
assert.Equal(t, buildID(instance), instance.GetId())
assert.Equal(t, instance.GetHost(), instance.GetHost())
assert.Equal(t, instance.GetPort(), instance.GetPort())
assert.Equal(t, instance.GetServiceName(), instance.GetServiceName())
assert.Equal(t, 0, len(instance.GetMetadata()))
instance.GetMetadata()["a"] = "b"
err = serviceDiscovery.Update(instance)
assert.Nil(t, err)
time.Sleep(11 * time.Second)
pageMap := serviceDiscovery.GetRequestInstances([]string{instance.GetServiceName()}, 0, 1)
assert.Equal(t, 1, len(pageMap))
page = pageMap[instance.GetServiceName()]
assert.NotNil(t, page)
assert.Equal(t, 1, len(page.GetData()))
instance = page.GetData()[0].(*registry.DefaultServiceInstance)
v, _ := instance.GetMetadata()["a"]
assert.Equal(t, "b", v)
// test dispatcher event
err = serviceDiscovery.DispatchEventByServiceName(instance.GetServiceName())
assert.Nil(t, err)
// test AddListener
err = serviceDiscovery.AddListener(&registry.ServiceInstancesChangedListener{})
assert.Nil(t, err)
}
func prepareData() {
config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
Protocol: "consul",
RemoteRef: testName,
}
config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{
Address: "", // TODO
TimeoutStr: "10s",
}
}
func prepareService() (registry.ServiceInstance, common.URL) {
serviceName := "service-name" + strconv.Itoa(rand.Intn(10000))
id := "id"
host := "host"
port := 123
registryUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&consul-check-pass-interval=17000&consul-deregister-critical-service-after=20s&" +
"consul-watch-timeout=60000")
return &registry.DefaultServiceInstance{
Id: id,
ServiceName: serviceName,
Host: host,
Port: port,
Enable: true,
Healthy: true,
Metadata: nil,
}, registryUrl
}
......@@ -31,6 +31,7 @@ import (
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
......@@ -66,6 +67,10 @@ type etcdV3ServiceDiscovery struct {
childListenerMap map[string]*etcdv3.EventListener
}
func (e *etcdV3ServiceDiscovery) Initialize(registryURL common.URL) error {
return nil
}
// basic information of this instance
func (e *etcdV3ServiceDiscovery) String() string {
return e.descriptor
......
......@@ -18,6 +18,10 @@
package event
import (
"sync"
)
import (
"github.com/apache/dubbo-go/common"
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
)
......@@ -34,6 +38,7 @@ import (
// Publish some event about service discovery
type EventPublishingServiceDiscovery struct {
serviceDiscovery registry.ServiceDiscovery
once sync.Once
}
// NewEventPublishingServiceDiscovery is a constructor
......@@ -48,6 +53,14 @@ func (epsd *EventPublishingServiceDiscovery) String() string {
return epsd.serviceDiscovery.String()
}
func (epsd *EventPublishingServiceDiscovery) Initialize(registryURL common.URL) error {
var err error
epsd.once.Do(func() {
err = epsd.serviceDiscovery.Initialize(registryURL)
})
return err
}
// Destroy delegate function
func (epsd *EventPublishingServiceDiscovery) Destroy() error {
f := func() error {
......
......@@ -32,6 +32,7 @@ import (
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
......@@ -62,6 +63,10 @@ type nacosServiceDiscovery struct {
namingClient naming_client.INamingClient
}
func (n *nacosServiceDiscovery) Initialize(registryURL common.URL) error {
return nil
}
// Destroy will close the service discovery.
// Actually, it only marks the naming client as null and then return
func (n *nacosServiceDiscovery) Destroy() error {
......
......@@ -22,6 +22,7 @@ import (
)
import (
"github.com/apache/dubbo-go/common"
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
)
......@@ -34,6 +35,12 @@ type ServiceDiscovery interface {
// ----------------- lifecycle -------------------
/**
* Initializes the ServiceDiscovery
*
*/
Initialize(registryURL common.URL) error
// Destroy will destroy the service discovery.
// If the discovery cannot be destroy, it will return an error.
Destroy() error
......
......@@ -125,7 +125,9 @@ func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
if err != nil {
return nil, perrors.WithMessage(err, "Create service discovery fialed")
}
return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
serviceDiscovery := event.NewEventPublishingServiceDiscovery(originServiceDiscovery)
serviceDiscovery.Initialize(*url)
return serviceDiscovery, nil
}
func parseServices(literalServices string) *gxset.HashSet {
......
......@@ -126,6 +126,10 @@ func (m *mockServiceNameMapping) Get(serviceInterface string, group string, vers
type mockServiceDiscovery struct {
}
func (m *mockServiceDiscovery) Initialize(registryURL common.URL) error {
panic("implement me")
}
func (m *mockServiceDiscovery) String() string {
panic("implement me")
}
......
......@@ -163,6 +163,10 @@ func (zksd *zookeeperServiceDiscovery) String() string {
return fmt.Sprintf("zookeeper-service-discovery[%s]", zksd.url)
}
func (zksd *zookeeperServiceDiscovery) Initialize(registryURL common.URL) error {
return nil
}
// Close client be closed
func (zksd *zookeeperServiceDiscovery) Destroy() error {
zksd.client.Close()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment