Skip to content
Snippets Groups Projects
Commit 2e0d7102 authored by Ming Deng's avatar Ming Deng Committed by GitHub
Browse files

Merge pull request #461 from flycash/nacos-discovery

Nacos discovery support
parents 0c87c6d1 b2fedc1c
No related branches found
No related tags found
No related merge requests found
......@@ -241,3 +241,9 @@ const (
// The default time window of circuit-tripped in millisecond if not specfied
MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000
)
// service discovery
const (
NACOS_GROUP = "nacos.group"
)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/registry"
)
var (
discoveryCreatorMap = make(map[string]func(url *common.URL) (registry.ServiceDiscovery, error), 4)
)
// SetServiceDiscovery will store the creator and name
func SetServiceDiscovery(name string, creator func(url *common.URL) (registry.ServiceDiscovery, error)) {
discoveryCreatorMap[name] = creator
}
// GetServiceDiscovery will return the registry.ServiceDiscovery
// if not found, or initialize instance failed, it will return error.
func GetServiceDiscovery(name string, url *common.URL) (registry.ServiceDiscovery, error) {
creator, ok := discoveryCreatorMap[name]
if !ok {
return nil, perrors.New("Could not find the service discovery with name: " + name)
}
return creator(url)
}
......@@ -14,7 +14,7 @@ require (
github.com/creasty/defaults v1.3.0
github.com/dubbogo/getty v1.3.3
github.com/dubbogo/go-zookeeper v1.0.0
github.com/dubbogo/gost v1.7.0
github.com/dubbogo/gost v1.8.0
github.com/emicklei/go-restful/v3 v3.0.0
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/go-errors/errors v1.0.1 // indirect
......@@ -38,7 +38,7 @@ require (
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0
......
......@@ -115,6 +115,8 @@ github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8=
github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.7.0 h1:lWNBIE2hk1Aj2be2uXkyRTpZG0RQZj0/xbXnkIq6EHE=
github.com/dubbogo/gost v1.7.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.8.0 h1:9ACbQe5OwMjqtinQcNJC5xp16kky27OsfSGw5L9A6vw=
github.com/dubbogo/gost v1.8.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIhn2R6oXQbgW5yHfS+d6YqyMfXiu2L55rFZC4UD/M=
github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo=
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y=
......@@ -385,6 +387,8 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb h1:lbmvw8r9W55w+aQgWn35W1nuleRIECMoqUrmwAOAvoI=
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo=
github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c h1:WoCa3AvgQMVKNs+RIFlWPRgY9QVJwUxJDrGxHs0fcRo=
github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo=
github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s=
github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk=
github.com/oklog/run v0.0.0-20180308005104-6934b124db28 h1:Hbr3fbVPXea52oPQeP7KLSxP52g6SFaNY1IqAmUyEW0=
......
......@@ -42,6 +42,7 @@ type ServiceEvent struct {
Service common.URL
}
// String return the description of event
func (e ServiceEvent) String() string {
return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}}", e.Action, e.Service)
}
......@@ -86,6 +87,24 @@ func newBaseEvent(source interface{}) *baseEvent {
// ServiceInstancesChangedEvent represents service instances make some changing
type ServiceInstancesChangedEvent struct {
fmt.Stringer
baseEvent
ServiceName string
Instances []ServiceInstance
}
// String return the description of the event
func (s *ServiceInstancesChangedEvent) String() string {
return fmt.Sprintf("ServiceInstancesChangedEvent[source=%s]", s.ServiceName)
}
// NewServiceInstancesChangedEvent will create the ServiceInstanceChangedEvent instance
func NewServiceInstancesChangedEvent(serviceName string, instances []ServiceInstance) *ServiceInstancesChangedEvent {
return &ServiceInstancesChangedEvent{
baseEvent: baseEvent{
source: serviceName,
timestamp: time.Now(),
},
ServiceName: serviceName,
Instances: instances,
}
}
......@@ -38,4 +38,5 @@ type ConditionalEventListener interface {
// TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct {
ServiceName string
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"net"
"strconv"
"strings"
"time"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
// baseRegistry is the parent of both interface-level registry
// and service discovery(related to application-level registry)
type nacosBaseRegistry struct {
*common.URL
namingClient naming_client.INamingClient
}
// newBaseRegistry will create new instance
func newBaseRegistry(url *common.URL) (nacosBaseRegistry, error) {
nacosConfig, err := getNacosConfig(url)
if err != nil {
return nacosBaseRegistry{}, err
}
client, err := clients.CreateNamingClient(nacosConfig)
if err != nil {
return nacosBaseRegistry{}, err
}
registry := nacosBaseRegistry{
URL: url,
namingClient: client,
}
return registry, nil
}
// getNacosConfig will return the nacos config
func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
if url == nil {
return nil, perrors.New("url is empty!")
}
if len(url.Location) == 0 {
return nil, perrors.New("url.location is empty!")
}
configMap := make(map[string]interface{}, 2)
addresses := strings.Split(url.Location, ",")
serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
for _, addr := range addresses {
ip, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, perrors.WithMessagef(err, "split [%s] ", addr)
}
port, _ := strconv.Atoi(portStr)
serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
IpAddr: ip,
Port: uint64(port),
})
}
configMap["serverConfigs"] = serverConfigs
var clientConfig nacosConstant.ClientConfig
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
return nil, err
}
clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000)
clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
clientConfig.NotLoadCacheAtStart = true
configMap["clientConfig"] = clientConfig
return configMap, nil
}
......@@ -19,7 +19,6 @@ package nacos
import (
"bytes"
"net"
"strconv"
"strings"
"time"
......@@ -27,9 +26,6 @@ import (
import (
gxnet "github.com/dubbogo/gost/net"
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
......@@ -57,64 +53,18 @@ func init() {
}
type nacosRegistry struct {
*common.URL
namingClient naming_client.INamingClient
}
func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
if url == nil {
return nil, perrors.New("url is empty!")
}
if len(url.Location) == 0 {
return nil, perrors.New("url.location is empty!")
}
configMap := make(map[string]interface{}, 2)
addresses := strings.Split(url.Location, ",")
serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
for _, addr := range addresses {
ip, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, perrors.WithMessagef(err, "split [%s] ", addr)
}
port, _ := strconv.Atoi(portStr)
serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
IpAddr: ip,
Port: uint64(port),
})
}
configMap["serverConfigs"] = serverConfigs
var clientConfig nacosConstant.ClientConfig
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
return nil, err
}
clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000)
clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
clientConfig.NotLoadCacheAtStart = true
configMap["clientConfig"] = clientConfig
return configMap, nil
nacosBaseRegistry
}
// newNacosRegistry will create an instance
func newNacosRegistry(url *common.URL) (registry.Registry, error) {
nacosConfig, err := getNacosConfig(url)
base, err := newBaseRegistry(url)
if err != nil {
return nil, err
}
client, err := clients.CreateNamingClient(nacosConfig)
if err != nil {
return nil, err
}
registry := nacosRegistry{
URL: url,
namingClient: client,
return nil, perrors.WithStack(err)
}
return &registry, nil
return &nacosRegistry{
base,
}, nil
}
func getCategory(url common.URL) string {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
)
const (
defaultGroup = "DEFAULT_GROUP"
idKey = "id"
)
// init will put the service discovery into extension
func init() {
extension.SetServiceDiscovery(constant.NACOS_KEY, newNacosServiceDiscovery)
}
// nacosServiceDiscovery is the implementation of service discovery based on nacos.
// There is a problem, the go client for nacos does not support the id field.
// we will use the metadata to store the id of ServiceInstance
type nacosServiceDiscovery struct {
nacosBaseRegistry
group string
}
// Destroy will close the service discovery.
// Actually, it only marks the naming client as null and then return
func (n *nacosServiceDiscovery) Destroy() error {
n.namingClient = nil
return nil
}
// Register will register the service to nacos
func (n *nacosServiceDiscovery) Register(instance registry.ServiceInstance) error {
ins := n.toRegisterInstance(instance)
ok, err := n.namingClient.RegisterInstance(ins)
if err != nil || !ok {
return perrors.WithMessage(err, "Could not register the instance. "+instance.GetServiceName())
}
return nil
}
// Update will update the information
// However, because nacos client doesn't support the update API,
// so we should unregister the instance and then register it again.
// the error handling is hard to implement
func (n *nacosServiceDiscovery) Update(instance registry.ServiceInstance) error {
// TODO(wait for nacos support)
err := n.Unregister(instance)
if err != nil {
return perrors.WithStack(err)
}
return n.Register(instance)
}
// Unregister will unregister the instance
func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
ok, err := n.namingClient.DeregisterInstance(n.toDeregisterInstance(instance))
if err != nil || !ok {
return perrors.WithMessage(err, "Could not unregister the instance. "+instance.GetServiceName())
}
return nil
}
// GetDefaultPageSize will return the constant registry.DefaultPageSize
func (n *nacosServiceDiscovery) GetDefaultPageSize() int {
return registry.DefaultPageSize
}
// GetServices will return the all services
func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet {
services, err := n.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{
GroupName: n.group,
})
res := gxset.NewSet()
if err != nil {
logger.Errorf("Could not query the services: %v", err)
return res
}
for _, e := range services {
res.Add(e.Name)
}
return res
}
// GetInstances will return the instances of serviceName and the group
func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
instances, err := n.namingClient.SelectAllInstances(vo.SelectAllInstancesParam{
ServiceName: serviceName,
GroupName: n.group,
})
if err != nil {
logger.Errorf("Could not query the instances for service: " + serviceName + ", group: " + n.group)
return make([]registry.ServiceInstance, 0, 0)
}
res := make([]registry.ServiceInstance, 0, len(instances))
for _, ins := range instances {
metadata := ins.Metadata
id := metadata[idKey]
delete(metadata, idKey)
res = append(res, &registry.DefaultServiceInstance{
Id: id,
ServiceName: ins.ServiceName,
Host: ins.Ip,
Port: int(ins.Port),
Enable: ins.Enable,
Healthy: ins.Healthy,
Metadata: metadata,
})
}
return res
}
// GetInstancesByPage will return the instances
// Due to nacos client does not support pagination, so we have to query all instances and then return part of them
func (n *nacosServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
all := n.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
for i := offset; i < len(all) && i < offset+pageSize; i++ {
res = append(res, all[i])
}
return gxpage.New(offset, pageSize, res, len(all))
}
// GetHealthyInstancesByPage will return the instance
// The nacos client has an API SelectInstances, which has a parameter call HealthyOnly.
// However, the healthy parameter in this method maybe false. So we can not use that API.
// Thus, we must query all instances and then do filter
func (n *nacosServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
all := n.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
var (
i = offset
count = 0
)
for i < len(all) && count < pageSize {
ins := all[i]
if ins.IsHealthy() == healthy {
res = append(res, all[i])
count++
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
}
// GetRequestInstances will return the instances
// The nacos client doesn't have batch API, so we should query those serviceNames one by one.
func (n *nacosServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
res := make(map[string]gxpage.Pager, len(serviceNames))
for _, name := range serviceNames {
res[name] = n.GetInstancesByPage(name, offset, requestedSize)
}
return res
}
// AddListener will add a listener
func (n *nacosServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
return n.namingClient.Subscribe(&vo.SubscribeParam{
ServiceName: listener.ServiceName,
SubscribeCallback: func(services []model.SubscribeService, err error) {
if err != nil {
logger.Errorf("Could not handle the subscribe notification because the err is not nil."+
" service name: %s, err: %v", listener.ServiceName, err)
}
instances := make([]registry.ServiceInstance, 0, len(services))
for _, service := range services {
// we won't use the nacos instance id here but use our instance id
metadata := service.Metadata
id := metadata[idKey]
delete(metadata, idKey)
instances = append(instances, &registry.DefaultServiceInstance{
Id: id,
ServiceName: service.ServiceName,
Host: service.Ip,
Port: int(service.Port),
Enable: service.Enable,
Healthy: true,
Metadata: metadata,
})
}
e := n.DispatchEventForInstances(listener.ServiceName, instances)
if e != nil {
logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err)
}
},
})
}
// DispatchEventByServiceName will dispatch the event for the service with the service name
func (n *nacosServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
return n.DispatchEventForInstances(serviceName, n.GetInstances(serviceName))
}
// DispatchEventForInstances will dispatch the event to those instances
func (n *nacosServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
return n.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
}
// DispatchEvent will dispatch the event
func (n *nacosServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
// TODO(waiting for event dispatcher, another task)
return nil
}
// toRegisterInstance convert the ServiceInstance to RegisterInstanceParam
// the Ephemeral will be true
func (n *nacosServiceDiscovery) toRegisterInstance(instance registry.ServiceInstance) vo.RegisterInstanceParam {
metadata := instance.GetMetadata()
if metadata == nil {
metadata = make(map[string]string, 1)
}
metadata[idKey] = instance.GetId()
return vo.RegisterInstanceParam{
ServiceName: instance.GetServiceName(),
Ip: instance.GetHost(),
Port: uint64(instance.GetPort()),
Metadata: metadata,
Enable: instance.IsEnable(),
Healthy: instance.IsHealthy(),
GroupName: n.group,
Ephemeral: true,
}
}
// toDeregisterInstance will convert the ServiceInstance to DeregisterInstanceParam
func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceInstance) vo.DeregisterInstanceParam {
return vo.DeregisterInstanceParam{
ServiceName: instance.GetServiceName(),
Ip: instance.GetHost(),
Port: uint64(instance.GetPort()),
GroupName: n.group,
}
}
// toDeregisterInstance will create new service discovery instance
func newNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
base, err := newBaseRegistry(url)
if err != nil {
return nil, perrors.WithStack(err)
}
return &nacosServiceDiscovery{
nacosBaseRegistry: base,
group: url.GetParam(constant.NACOS_GROUP, defaultGroup),
}, nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"strconv"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/registry"
)
func TestNacosServiceDiscovery_Destroy(t *testing.T) {
serviceDiscovry, err := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl())
assert.Nil(t, err)
assert.NotNil(t, serviceDiscovry)
err = serviceDiscovry.Destroy()
assert.Nil(t, err)
assert.Nil(t, serviceDiscovry.(*nacosServiceDiscovery).namingClient)
}
func TestNacosServiceDiscovery_CRUD(t *testing.T) {
serviceName := "service-name"
id := "id"
host := "host"
port := 123
instance := &registry.DefaultServiceInstance{
Id: id,
ServiceName: serviceName,
Host: host,
Port: port,
Enable: true,
Healthy: true,
Metadata: nil,
}
// clean data
serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl())
// clean data for local test
serviceDiscovry.Unregister(&registry.DefaultServiceInstance{
Id: id,
ServiceName: serviceName,
Host: host,
Port: port,
})
err := serviceDiscovry.Register(instance)
assert.Nil(t, err)
page := serviceDiscovry.GetHealthyInstancesByPage(serviceName, 0, 10, true)
assert.NotNil(t, page)
assert.Equal(t, 0, page.GetOffset())
assert.Equal(t, 10, page.GetPageSize())
assert.Equal(t, 1, page.GetDataSize())
instance = page.GetData()[0].(*registry.DefaultServiceInstance)
assert.NotNil(t, instance)
assert.Equal(t, id, instance.GetId())
assert.Equal(t, host, instance.GetHost())
assert.Equal(t, port, instance.GetPort())
assert.Equal(t, serviceName, instance.GetServiceName())
assert.Equal(t, 0, len(instance.GetMetadata()))
instance.Metadata["a"] = "b"
err = serviceDiscovry.Update(instance)
assert.Nil(t, err)
pageMap := serviceDiscovry.GetRequestInstances([]string{serviceName}, 0, 1)
assert.Equal(t, 1, len(pageMap))
page = pageMap[serviceName]
assert.NotNil(t, page)
assert.Equal(t, 1, len(page.GetData()))
instance = page.GetData()[0].(*registry.DefaultServiceInstance)
v, _ := instance.Metadata["a"]
assert.Equal(t, "b", v)
// test dispatcher event
err = serviceDiscovry.DispatchEventByServiceName(serviceName)
assert.Nil(t, err)
// test AddListener
err = serviceDiscovry.AddListener(&registry.ServiceInstancesChangedListener{})
assert.Nil(t, err)
}
func TestNacosServiceDiscovery_GetDefaultPageSize(t *testing.T) {
serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl())
assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize())
}
func mockUrl() *common.URL {
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
return &regurl
}
......@@ -26,19 +26,13 @@ import (
gxpage "github.com/dubbogo/gost/page"
)
import (
"github.com/apache/dubbo-go/common"
)
const DefaultPageSize = 100
type ServiceDiscovery interface {
fmt.Stringer
// ----------------- lifecycle -------------------
// Initialize will initialize the service discovery instance
// if initialize failed, it will return the error
Initialize(url common.URL) error
// Destroy will destroy the service discovery.
// If the discovery cannot be destroy, it will return an error.
Destroy() error
......@@ -59,7 +53,7 @@ type ServiceDiscovery interface {
GetDefaultPageSize() int
// GetServices will return the all service names.
GetServices() gxset.HashSet
GetServices() *gxset.HashSet
// GetInstances will return all service instances with serviceName
GetInstances(serviceName string) []ServiceInstance
......@@ -78,6 +72,7 @@ type ServiceDiscovery interface {
// ----------------- event ----------------------
// AddListener adds a new ServiceInstancesChangedListener
// see addServiceInstancesChangedListener in Java
AddListener(listener *ServiceInstancesChangedListener) error
// DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName
......@@ -87,5 +82,5 @@ type ServiceDiscovery interface {
DispatchEventForInstances(serviceName string, instances []ServiceInstance) error
// DispatchEvent dispatches the event
DispatchEvent(event ServiceInstancesChangedEvent) error
DispatchEvent(event *ServiceInstancesChangedEvent) error
}
......@@ -29,8 +29,7 @@ type ServiceInstance interface {
GetHost() string
// GetPort will return the port.
// if the port is not present, return error
GetPort() (int, error)
GetPort() int
// IsEnable will return the enable status of this instance
IsEnable() bool
......@@ -41,3 +40,50 @@ type ServiceInstance interface {
// GetMetadata will return the metadata
GetMetadata() map[string]string
}
// DefaultServiceInstance the default implementation of ServiceInstance
// or change the ServiceInstance to be struct???
type DefaultServiceInstance struct {
Id string
ServiceName string
Host string
Port int
Enable bool
Healthy bool
Metadata map[string]string
}
// GetId will return this instance's id. It should be unique.
func (d *DefaultServiceInstance) GetId() string {
return d.Id
}
// GetServiceName will return the serviceName
func (d *DefaultServiceInstance) GetServiceName() string {
return d.ServiceName
}
// GetHost will return the hostname
func (d *DefaultServiceInstance) GetHost() string {
return d.Host
}
// GetPort will return the port.
func (d *DefaultServiceInstance) GetPort() int {
return d.Port
}
// IsEnable will return the enable status of this instance
func (d *DefaultServiceInstance) IsEnable() bool {
return d.Enable
}
// IsHealthy will return the value represent the instance whether healthy or not
func (d *DefaultServiceInstance) IsHealthy() bool {
return d.Healthy
}
// GetMetadata will return the metadata
func (d *DefaultServiceInstance) GetMetadata() map[string]string {
return d.Metadata
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment