Skip to content
Snippets Groups Projects
Commit ba84101e authored by flycash's avatar flycash
Browse files

Add tests

parent 6e4b86a2
No related branches found
No related tags found
No related merge requests found
...@@ -254,9 +254,7 @@ const ( ...@@ -254,9 +254,7 @@ const (
) )
// service discovery // service discovery
const ( const (
NACOS_GROUP = "nacos.group"
SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services" SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services"
PROVIDER_BY = "provided-by" PROVIDER_BY = "provided-by"
EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision" EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision"
...@@ -267,5 +265,10 @@ const ( ...@@ -267,5 +265,10 @@ const (
METADATA_SERVICE_PREFIX = "dubbo.metadata-service." METADATA_SERVICE_PREFIX = "dubbo.metadata-service."
METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params" METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params"
METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls" METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls"
SERVICE_NAME_MAPPING_KEY = "service-name-mapping"
// used by URL
// SERVICE_NAME_MAPPING_KEY indicate that which service name mapping instance will be used
SERVICE_NAME_MAPPING_KEY = "name_mapping"
// SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used
SERVICE_DISCOVERY_KEY = "service_discovery"
) )
...@@ -43,7 +43,7 @@ type multiConfiger interface { ...@@ -43,7 +43,7 @@ type multiConfiger interface {
// BaseConfig is the common configuration for provider and consumer // BaseConfig is the common configuration for provider and consumer
type BaseConfig struct { type BaseConfig struct {
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
Remotes map[string]*RemoteConfig `yaml:"remotes" json:"remotes,omitempty"` Remotes map[string]*RemoteConfig `yaml:"remote" json:"remote,omitempty"`
ServiceDiscoveries map[string]*ServiceDiscoveryConfig `yaml:"service_discovery" json:"service_discovery,omitempty"` ServiceDiscoveries map[string]*ServiceDiscoveryConfig `yaml:"service_discovery" json:"service_discovery,omitempty"`
// application config // application config
ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"`
......
...@@ -21,10 +21,10 @@ package config ...@@ -21,10 +21,10 @@ package config
type ServiceDiscoveryConfig struct { type ServiceDiscoveryConfig struct {
// Protocol indicate which implementation will be used. // Protocol indicate which implementation will be used.
// for example, if the Protocol is nacos, it means that we will use nacosServiceDiscovery // for example, if the Protocol is nacos, it means that we will use nacosServiceDiscovery
Protocol string Protocol string `yaml:"protocol" json:"protocol,omitempty"`
// Group, usually you don't need to config this field. // Group, usually you don't need to config this field.
// you can use this to do some isolation // you can use this to do some isolation
Group string Group string `yaml:"group" json:"group,omitempty"`
// RemoteRef is the reference point to RemoteConfig which will be used to create remotes instances. // RemoteRef is the reference point to RemoteConfig which will be used to create remotes instances.
RemoteRef string RemoteRef string `yaml:"remote_ref" json:"remote_ref,omitempty"`
} }
...@@ -281,6 +281,7 @@ func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceIn ...@@ -281,6 +281,7 @@ func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceIn
} }
} }
// String will return the description of the instance
func (n *nacosServiceDiscovery) String() string { func (n *nacosServiceDiscovery) String() string {
return n.descriptor return n.descriptor
} }
...@@ -291,7 +292,7 @@ var ( ...@@ -291,7 +292,7 @@ var (
initLock sync.Mutex initLock sync.Mutex
) )
// newNacosServiceDiscovery will create new service discovery instance // newNacosServiceDiscovery will try to find the instance withe name, if not found, a new one will be created.
// use double-check pattern to reduce race condition // use double-check pattern to reduce race condition
func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
...@@ -330,9 +331,12 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { ...@@ -330,9 +331,12 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address) descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address)
return &nacosServiceDiscovery{ instance = &nacosServiceDiscovery{
group: group, group: group,
namingClient: client, namingClient: client,
descriptor: descriptor, descriptor: descriptor,
}, nil }
instanceMap[name] = instance
return instance, nil
} }
...@@ -40,6 +40,35 @@ var ( ...@@ -40,6 +40,35 @@ var (
testName = "test" testName = "test"
) )
func Test_newNacosServiceDiscovery(t *testing.T) {
name := "nacos1"
_, err := newNacosServiceDiscovery(name)
// the ServiceDiscoveryConfig not found
assert.NotNil(t, err)
sdc := &config.ServiceDiscoveryConfig{
Protocol: "nacos",
RemoteRef: "mock",
}
config.GetBaseConfig().ServiceDiscoveries[name] = sdc
_, err = newNacosServiceDiscovery(name)
// RemoteConfig not found
assert.NotNil(t, err)
config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{
Address: "console.nacos.io:80",
Timeout: 10 * time.Second,
}
res, err := newNacosServiceDiscovery(name)
assert.Nil(t, err)
assert.NotNil(t, res)
}
func TestNacosServiceDiscovery_Destroy(t *testing.T) { func TestNacosServiceDiscovery_Destroy(t *testing.T) {
prepareData() prepareData()
serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, testName) serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, testName)
......
...@@ -23,6 +23,10 @@ import ( ...@@ -23,6 +23,10 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
perrors "github.com/pkg/errors"
"github.com/apache/dubbo-go/config"
) )
import ( import (
...@@ -48,16 +52,22 @@ const ( ...@@ -48,16 +52,22 @@ const (
protocolName = "service-discovery" protocolName = "service-discovery"
) )
var (
registryInstance *serviceDiscoveryRegistry
registryInitOnce sync.Once
)
func init() { func init() {
extension.SetRegistry(protocolName, newServiceDiscoveryRegistry) extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
} }
// serviceDiscoveryRegistry is the implementation of application-level registry. // serviceDiscoveryRegistry is the implementation of application-level registry.
// It's completely different from other registry implementations // It's completely different from other registry implementations
// The serviceDiscoveryRegistry should be singleton
// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping // This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
// In order to keep compatible with interface-level registry, // In order to keep compatible with interface-level registry,
// 1. when we registry the service, we should create the mapping from service name to application name // 1. when we registry the service, we should create the mapping from service name to application name
// 2. when we sub // 2. when we subscribe the service, we should find out related application and then find application's information
type serviceDiscoveryRegistry struct { type serviceDiscoveryRegistry struct {
lock sync.RWMutex lock sync.RWMutex
url *common.URL url *common.URL
...@@ -65,38 +75,51 @@ type serviceDiscoveryRegistry struct { ...@@ -65,38 +75,51 @@ type serviceDiscoveryRegistry struct {
subscribedServices *gxset.HashSet subscribedServices *gxset.HashSet
serviceNameMapping mapping.ServiceNameMapping serviceNameMapping mapping.ServiceNameMapping
metaDataService service.MetadataService metaDataService service.MetadataService
//cache the registered listen // cache the registered listen
registeredListeners *gxset.HashSet registeredListeners *gxset.HashSet
//all synthesize // all synthesize
subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
//cache exported urls, serviceName->revision->[]URL // cache exported urls, serviceName->revision->[]URL
serviceRevisionExportedURLsCache map[string]map[string][]common.URL serviceRevisionExportedURLsCache map[string]map[string][]common.URL
} }
func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { // newServiceDiscoveryRegistry will return the instance
serviceDiscovery, err := creatServiceDiscovery(url) // if not found, it will create one
if err != nil { func newServiceDiscoveryRegistry(url *common.URL) (res registry.Registry, err error) {
return nil, err
} registryInitOnce.Do(func() {
subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) serviceDiscovery, err := creatServiceDiscovery(url)
subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() if err != nil {
serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) return
//TODO it's need to get implement by factory }
metaDataService := inmemory.NewMetadataService() subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
return &serviceDiscoveryRegistry{ subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
url: url, serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, ""))
serviceDiscovery: serviceDiscovery, // TODO it's need to get implement by factory
subscribedServices: subscribedServices, metaDataService := inmemory.NewMetadataService()
subscribedURLsSynthesizers: subscribedURLsSynthesizers, registryInstance = &serviceDiscoveryRegistry{
registeredListeners: gxset.NewSet(), url: url,
serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL), serviceDiscovery: serviceDiscovery,
serviceNameMapping: serviceNameMapping, subscribedServices: subscribedServices,
metaDataService: metaDataService, subscribedURLsSynthesizers: subscribedURLsSynthesizers,
}, nil registeredListeners: gxset.NewSet(),
serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL),
serviceNameMapping: serviceNameMapping,
metaDataService: metaDataService,
}
})
res = registryInstance
return
} }
func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
return extension.GetServiceDiscovery(url.Protocol, "TODO") discovery := url.GetParam(constant.SERVICE_DISCOVERY_KEY, constant.DEFAULT_KEY)
sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(discovery)
if !ok {
return nil, perrors.New("could not find the ServiceDiscoverConfig with name: " + discovery)
}
return extension.GetServiceDiscovery(sdc.Protocol, discovery)
} }
func parseServices(literalServices string) *gxset.HashSet { func parseServices(literalServices string) *gxset.HashSet {
...@@ -113,22 +136,22 @@ func parseServices(literalServices string) *gxset.HashSet { ...@@ -113,22 +136,22 @@ func parseServices(literalServices string) *gxset.HashSet {
return set return set
} }
//GetServiceDiscovery for get serviceDiscovery of the registry // GetServiceDiscovery for get serviceDiscovery of the registry
func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery { func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
return s.serviceDiscovery return s.serviceDiscovery
} }
//GetUrl for get url of the registry // GetUrl for get url of the registry
func (s *serviceDiscoveryRegistry) GetUrl() common.URL { func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
return *s.url return *s.url
} }
//IsAvailable for make sure is't available // IsAvailable for make sure is't available
func (s *serviceDiscoveryRegistry) IsAvailable() bool { func (s *serviceDiscoveryRegistry) IsAvailable() bool {
return true return true
} }
//Destroy for destroy graceful down // Destroy for destroy graceful down
func (s *serviceDiscoveryRegistry) Destroy() { func (s *serviceDiscoveryRegistry) Destroy() {
err := s.serviceDiscovery.Destroy() err := s.serviceDiscovery.Destroy()
if err != nil { if err != nil {
...@@ -162,7 +185,7 @@ func shouldRegister(url common.URL) bool { ...@@ -162,7 +185,7 @@ func shouldRegister(url common.URL) bool {
return false return false
} }
//Subscribe for listen the change of services that from the exported url // Subscribe for listen the change of services that from the exported url
func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) { func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) {
if !shouldSubscribe(*url) { if !shouldSubscribe(*url) {
return return
...@@ -243,7 +266,7 @@ func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.No ...@@ -243,7 +266,7 @@ func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.No
if len(subscribedURLs) == 0 { if len(subscribedURLs) == 0 {
subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances) subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances)
} }
//TODO make sure it's workable // TODO make sure it's workable
for _, url := range subscribedURLs { for _, url := range subscribedURLs {
notify.Notify(&registry.ServiceEvent{ notify.Notify(&registry.ServiceEvent{
Action: remoting.EventTypeAdd, Action: remoting.EventTypeAdd,
...@@ -461,7 +484,7 @@ func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstanc ...@@ -461,7 +484,7 @@ func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstanc
serviceInstance.GetHost(), serviceInstance.GetPort(), revision) serviceInstance.GetHost(), serviceInstance.GetPort(), revision)
} }
} else { } else {
//Else, The cache is hit // Else, The cache is hit
logger.Debugf("Get the exported URLs[size : %s] from cache, the instance"+ logger.Debugf("Get the exported URLs[size : %s] from cache, the instance"+
"[id: %s , service : %s , host : %s , port : %s , revision : %s]", len(revisionExportedURLs), firstGet, "[id: %s , service : %s , host : %s , port : %s , revision : %s]", len(revisionExportedURLs), firstGet,
serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(), serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(),
...@@ -499,8 +522,8 @@ func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInsa ...@@ -499,8 +522,8 @@ func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInsa
u.RemoveParams(removeParamSet) u.RemoveParams(removeParamSet)
port := strconv.Itoa(getProtocolPort(serviceInstance, u.Protocol)) port := strconv.Itoa(getProtocolPort(serviceInstance, u.Protocol))
if u.Location != host || u.Port != port { if u.Location != host || u.Port != port {
u.Port = port //reset port u.Port = port // reset port
u.Location = host //reset host u.Location = host // reset host
} }
clonedExportedURLs = append(clonedExportedURLs, u) clonedExportedURLs = append(clonedExportedURLs, u)
} }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment