Skip to content
Snippets Groups Projects
Unverified Commit ed5693fa authored by lzp0412's avatar lzp0412 Committed by GitHub
Browse files

Merge pull request #5 from flycash/2.7.5-bk

Provider startup
parents 3f1a52be 266e0b6a
No related branches found
No related tags found
No related merge requests found
......@@ -254,9 +254,7 @@ const (
)
// service discovery
const (
NACOS_GROUP = "nacos.group"
SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services"
PROVIDER_BY = "provided-by"
EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision"
......@@ -267,5 +265,10 @@ const (
METADATA_SERVICE_PREFIX = "dubbo.metadata-service."
METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params"
METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls"
SERVICE_NAME_MAPPING_KEY = "service-name-mapping"
// used by URL
// SERVICE_NAME_MAPPING_KEY indicate that which service name mapping instance will be used
SERVICE_NAME_MAPPING_KEY = "name_mapping"
// SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used
SERVICE_DISCOVERY_KEY = "service_discovery"
)
......@@ -18,18 +18,21 @@
package extension
import (
"github.com/apache/dubbo-go/metadata/mapping"
"github.com/apache/dubbo-go/metadata"
)
var (
nameMappings = make(map[string]func() mapping.ServiceNameMapping)
nameMappings = make(map[string]func() metadata.ServiceNameMapping)
)
func SetServiceNameMapping(name string, creator func() mapping.ServiceNameMapping) {
// TODO(@邓大明)
func SetServiceNameMapping(name string, creator func() metadata.ServiceNameMapping) {
nameMappings[name] = creator
}
func GetServiceNameMapping(name string) mapping.ServiceNameMapping {
// TODO(@邓大明)
return nil
func GetServiceNameMapping(name string) metadata.ServiceNameMapping {
creator, ok := nameMappings[name]
if !ok {
panic("Can not find the target service name mapping: " + name)
}
return creator()
}
......@@ -43,7 +43,7 @@ type multiConfiger interface {
// BaseConfig is the common configuration for provider and consumer
type BaseConfig struct {
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
Remotes map[string]*RemoteConfig `yaml:"remotes" json:"remotes,omitempty"`
Remotes map[string]*RemoteConfig `yaml:"remote" json:"remote,omitempty"`
ServiceDiscoveries map[string]*ServiceDiscoveryConfig `yaml:"service_discovery" json:"service_discovery,omitempty"`
// application config
ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"`
......
......@@ -21,10 +21,10 @@ package config
type ServiceDiscoveryConfig struct {
// Protocol indicate which implementation will be used.
// for example, if the Protocol is nacos, it means that we will use nacosServiceDiscovery
Protocol string
Protocol string `yaml:"protocol" json:"protocol,omitempty"`
// Group, usually you don't need to config this field.
// you can use this to do some isolation
Group string
Group string `yaml:"group" json:"group,omitempty"`
// RemoteRef is the reference point to RemoteConfig which will be used to create remotes instances.
RemoteRef string
RemoteRef string `yaml:"remote_ref" json:"remote_ref,omitempty"`
}
......@@ -29,27 +29,19 @@ import (
)
import (
env "github.com/apache/dubbo-go/common/config"
common_cfg "github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/metadata/mapping"
"github.com/apache/dubbo-go/metadata"
)
const (
defaultGroup = config_center.DEFAULT_GROUP
slash = "/"
name = "dynamic"
)
func init() {
extension.SetServiceNameMapping(name, GetServiceNameMappingInstance)
extension.SetServiceNameMapping(constant.DEFAULT_KEY, GetServiceNameMappingInstance)
}
// DynamicConfigurationServiceNameMapping is the implementation based on config center
// It could be thought as singleton pattern.
type DynamicConfigurationServiceNameMapping struct {
dc config_center.DynamicConfiguration
}
......@@ -87,21 +79,15 @@ func (d *DynamicConfigurationServiceNameMapping) buildGroup(serviceInterface str
}
var (
instance *DynamicConfigurationServiceNameMapping
initOnce sync.Once
serviceNameMappingInstance *DynamicConfigurationServiceNameMapping
serviceNameMappingInitOnce sync.Once
)
// newServiceNameMapping will create an instance of DynamicConfigurationServiceNameMapping
func newServiceNameMapping(dc config_center.DynamicConfiguration) *DynamicConfigurationServiceNameMapping {
return &DynamicConfigurationServiceNameMapping{dc: dc}
}
// GetServiceNameMappingInstance will return the instance.
// If the instance is not initiated, it will create one
func GetServiceNameMappingInstance() mapping.ServiceNameMapping {
initOnce.Do(func() {
dc := env.GetEnvInstance().GetDynamicConfiguration()
instance = newServiceNameMapping(dc)
// GetServiceNameMappingInstance will return an instance of DynamicConfigurationServiceNameMapping
func GetServiceNameMappingInstance() metadata.ServiceNameMapping {
serviceNameMappingInitOnce.Do(func() {
dc := common_cfg.GetEnvInstance().GetDynamicConfiguration()
serviceNameMappingInstance = &DynamicConfigurationServiceNameMapping{dc: dc}
})
return instance
return serviceNameMappingInstance
}
......@@ -41,7 +41,7 @@ func TestDynamicConfigurationServiceNameMapping(t *testing.T) {
}).GetDynamicConfiguration(nil)
config.GetApplicationConfig().Name = appName
mapping := newServiceNameMapping(dc)
mapping := &DynamicConfigurationServiceNameMapping{dc: dc}
intf := constant.METADATA_SERVICE_NAME
group := "myGroup"
version := "myVersion"
......
......@@ -17,14 +17,24 @@
package memory
import (
"sync"
)
import (
gxset "github.com/dubbogo/gost/container/set"
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata"
)
func init() {
extension.SetServiceNameMapping("in-memory", GetInMemoryServiceNameMappingInstance)
}
type InMemoryServiceNameMapping struct{}
func (i InMemoryServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error {
......@@ -34,3 +44,15 @@ func (i InMemoryServiceNameMapping) Map(serviceInterface string, group string, v
func (i InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) {
return gxset.NewSet(config.GetApplicationConfig().Name), nil
}
var (
nameMappingInstance *InMemoryServiceNameMapping
nameMappingInitOnce sync.Once
)
func GetInMemoryServiceNameMappingInstance() metadata.ServiceNameMapping {
nameMappingInitOnce.Do(func() {
nameMappingInstance = &InMemoryServiceNameMapping{}
})
return nameMappingInstance
}
......@@ -18,21 +18,15 @@
package nacos
import (
"fmt"
"sync"
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/remoting/nacos"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
......@@ -53,12 +47,8 @@ func init() {
// There is a problem, the go client for nacos does not support the id field.
// we will use the metadata to store the id of ServiceInstance
type nacosServiceDiscovery struct {
nacosBaseRegistry
group string
// descriptor is a short string about the basic information of this instance
descriptor string
// namingClient is the Nacos' client
namingClient naming_client.INamingClient
}
// Destroy will close the service discovery.
......@@ -247,7 +237,7 @@ func (n *nacosServiceDiscovery) DispatchEventForInstances(serviceName string, in
// DispatchEvent will dispatch the event
func (n *nacosServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
extension.GetGlobalDispatcher().Dispatch(event)
// TODO(waiting for event dispatcher, another task)
return nil
}
......@@ -281,58 +271,15 @@ func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceIn
}
}
func (n *nacosServiceDiscovery) String() string {
return n.descriptor
}
var (
// 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition
instanceMap = make(map[string]registry.ServiceDiscovery, 16)
initLock sync.Mutex
)
// newNacosServiceDiscovery will create new service discovery instance
// use double-check pattern to reduce race condition
func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
instance, ok := instanceMap[name]
if ok {
return instance, nil
}
initLock.Lock()
defer initLock.Unlock()
// toDeregisterInstance will create new service discovery instance
func newNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
// double check
instance, ok = instanceMap[name]
if ok {
return instance, nil
}
sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
if !ok || len(sdc.RemoteRef) == 0 {
return nil, perrors.New("could not init the instance because the config is invalid")
}
remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
if !ok {
return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
}
group := sdc.Group
if len(group) == 0 {
group = defaultGroup
}
client, err := nacos.NewNacosClient(remoteConfig)
base, err := newBaseRegistry(url)
if err != nil {
return nil, perrors.WithMessage(err, "create nacos client failed.")
return nil, perrors.WithStack(err)
}
descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address)
return &nacosServiceDiscovery{
group: group,
namingClient: client,
descriptor: descriptor,
nacosBaseRegistry: base,
group: url.GetParam(constant.NACOS_GROUP, defaultGroup),
}, nil
}
......@@ -40,6 +40,35 @@ var (
testName = "test"
)
func Test_newNacosServiceDiscovery(t *testing.T) {
name := "nacos1"
_, err := newNacosServiceDiscovery(name)
// the ServiceDiscoveryConfig not found
assert.NotNil(t, err)
sdc := &config.ServiceDiscoveryConfig{
Protocol: "nacos",
RemoteRef: "mock",
}
config.GetBaseConfig().ServiceDiscoveries[name] = sdc
_, err = newNacosServiceDiscovery(name)
// RemoteConfig not found
assert.NotNil(t, err)
config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{
Address: "console.nacos.io:80",
Timeout: 10 * time.Second,
}
res, err := newNacosServiceDiscovery(name)
assert.Nil(t, err)
assert.NotNil(t, res)
}
func TestNacosServiceDiscovery_Destroy(t *testing.T) {
prepareData()
serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, testName)
......
......@@ -23,6 +23,10 @@ import (
"strconv"
"strings"
"sync"
perrors "github.com/pkg/errors"
"github.com/apache/dubbo-go/config"
)
import (
......@@ -48,16 +52,22 @@ const (
protocolName = "service-discovery"
)
var (
registryInstance *serviceDiscoveryRegistry
registryInitOnce sync.Once
)
func init() {
extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
}
// serviceDiscoveryRegistry is the implementation of application-level registry.
// It's completely different from other registry implementations
// The serviceDiscoveryRegistry should be singleton
// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
// In order to keep compatible with interface-level registry,
// 1. when we registry the service, we should create the mapping from service name to application name
// 2. when we sub
// 2. when we subscribe the service, we should find out related application and then find application's information
type serviceDiscoveryRegistry struct {
lock sync.RWMutex
url *common.URL
......@@ -65,38 +75,51 @@ type serviceDiscoveryRegistry struct {
subscribedServices *gxset.HashSet
serviceNameMapping mapping.ServiceNameMapping
metaDataService service.MetadataService
//cache the registered listen
// cache the registered listen
registeredListeners *gxset.HashSet
//all synthesize
// all synthesize
subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
//cache exported urls, serviceName->revision->[]URL
// cache exported urls, serviceName->revision->[]URL
serviceRevisionExportedURLsCache map[string]map[string][]common.URL
}
func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
serviceDiscovery, err := creatServiceDiscovery(url)
if err != nil {
return nil, err
}
subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, ""))
//TODO it's need to get implement by factory
metaDataService := inmemory.NewMetadataService()
return &serviceDiscoveryRegistry{
url: url,
serviceDiscovery: serviceDiscovery,
subscribedServices: subscribedServices,
subscribedURLsSynthesizers: subscribedURLsSynthesizers,
registeredListeners: gxset.NewSet(),
serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL),
serviceNameMapping: serviceNameMapping,
metaDataService: metaDataService,
}, nil
// newServiceDiscoveryRegistry will return the instance
// if not found, it will create one
func newServiceDiscoveryRegistry(url *common.URL) (res registry.Registry, err error) {
registryInitOnce.Do(func() {
serviceDiscovery, err := creatServiceDiscovery(url)
if err != nil {
return
}
subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, ""))
// TODO it's need to get implement by factory
metaDataService := inmemory.NewMetadataService()
registryInstance = &serviceDiscoveryRegistry{
url: url,
serviceDiscovery: serviceDiscovery,
subscribedServices: subscribedServices,
subscribedURLsSynthesizers: subscribedURLsSynthesizers,
registeredListeners: gxset.NewSet(),
serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL),
serviceNameMapping: serviceNameMapping,
metaDataService: metaDataService,
}
})
res = registryInstance
return
}
func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
return extension.GetServiceDiscovery(url.Protocol, "TODO")
discovery := url.GetParam(constant.SERVICE_DISCOVERY_KEY, constant.DEFAULT_KEY)
sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(discovery)
if !ok {
return nil, perrors.New("could not find the ServiceDiscoverConfig with name: " + discovery)
}
return extension.GetServiceDiscovery(sdc.Protocol, discovery)
}
func parseServices(literalServices string) *gxset.HashSet {
......@@ -113,22 +136,22 @@ func parseServices(literalServices string) *gxset.HashSet {
return set
}
//GetServiceDiscovery for get serviceDiscovery of the registry
// GetServiceDiscovery for get serviceDiscovery of the registry
func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
return s.serviceDiscovery
}
//GetUrl for get url of the registry
// GetUrl for get url of the registry
func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
return *s.url
}
//IsAvailable for make sure is't available
// IsAvailable for make sure is't available
func (s *serviceDiscoveryRegistry) IsAvailable() bool {
return true
}
//Destroy for destroy graceful down
// Destroy for destroy graceful down
func (s *serviceDiscoveryRegistry) Destroy() {
err := s.serviceDiscovery.Destroy()
if err != nil {
......@@ -162,7 +185,7 @@ func shouldRegister(url common.URL) bool {
return false
}
//Subscribe for listen the change of services that from the exported url
// Subscribe for listen the change of services that from the exported url
func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) {
if !shouldSubscribe(*url) {
return
......@@ -243,7 +266,7 @@ func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.No
if len(subscribedURLs) == 0 {
subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances)
}
//TODO make sure it's workable
// TODO make sure it's workable
for _, url := range subscribedURLs {
notify.Notify(&registry.ServiceEvent{
Action: remoting.EventTypeAdd,
......@@ -461,7 +484,7 @@ func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstanc
serviceInstance.GetHost(), serviceInstance.GetPort(), revision)
}
} else {
//Else, The cache is hit
// Else, The cache is hit
logger.Debugf("Get the exported URLs[size : %s] from cache, the instance"+
"[id: %s , service : %s , host : %s , port : %s , revision : %s]", len(revisionExportedURLs), firstGet,
serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(),
......@@ -499,8 +522,8 @@ func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInsa
u.RemoveParams(removeParamSet)
port := strconv.Itoa(getProtocolPort(serviceInstance, u.Protocol))
if u.Location != host || u.Port != port {
u.Port = port //reset port
u.Location = host //reset host
u.Port = port // reset port
u.Location = host // reset host
}
clonedExportedURLs = append(clonedExportedURLs, u)
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment