Skip to content
Snippets Groups Projects
Commit b460d32f authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #589 from zouyx/feature/addRegistryComment

Mod: update the comments in registy directory
parents 069ab751 ca498fbf
No related branches found
No related tags found
No related merge requests found
Showing
with 84 additions and 30 deletions
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
// ConfigurationListener for changing listener's event // ConfigurationListener for changing listener's event
type ConfigurationListener interface { type ConfigurationListener interface {
// Process the notification event once there's any change happens on the config
Process(*ConfigChangeEvent) Process(*ConfigChangeEvent)
} }
......
...@@ -29,19 +29,19 @@ import ( ...@@ -29,19 +29,19 @@ import (
"github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting"
) )
// BaseConfigurationListener ... // nolint
type BaseConfigurationListener struct { type BaseConfigurationListener struct {
configurators []config_center.Configurator configurators []config_center.Configurator
dynamicConfiguration config_center.DynamicConfiguration dynamicConfiguration config_center.DynamicConfiguration
defaultConfiguratorFunc func(url *common.URL) config_center.Configurator defaultConfiguratorFunc func(url *common.URL) config_center.Configurator
} }
// Configurators ... // Configurators gets Configurator from config center
func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator { func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator {
return bcl.configurators return bcl.configurators
} }
// InitWith ... // InitWith will init BaseConfigurationListener by @key+@Listener+@f
func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) { func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) {
bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration() bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration()
if bcl.dynamicConfiguration == nil { if bcl.dynamicConfiguration == nil {
...@@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente ...@@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente
} }
} }
// Process ... // Process the notification event once there's any change happens on the config.
func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) { func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value) logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value)
if event.ConfigType == remoting.EventTypeDel { if event.ConfigType == remoting.EventTypeDel {
...@@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin ...@@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin
return nil return nil
} }
// OverrideUrl ... // OverrideUrl gets existing configuration rule and overrides provider url before exporting.
func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) { func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) {
for _, v := range bcl.configurators { for _, v := range bcl.configurators {
v.Configure(url) v.Configure(url)
} }
} }
// ToConfigurators ... // ToConfigurators converts @urls by @f to config_center.Configurators
func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator { func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator {
if len(urls) == 0 { if len(urls) == 0 {
return nil return nil
......
...@@ -187,6 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) { ...@@ -187,6 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) {
} }
} }
// Next returns the service event from consul.
func (l *consulListener) Next() (*registry.ServiceEvent, error) { func (l *consulListener) Next() (*registry.ServiceEvent, error) {
select { select {
case event := <-l.eventCh: case event := <-l.eventCh:
...@@ -196,6 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) { ...@@ -196,6 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) {
} }
} }
// Close closes this listener
func (l *consulListener) Close() { func (l *consulListener) Close() {
close(l.done) close(l.done)
l.plan.Stop() l.plan.Stop()
......
...@@ -36,8 +36,7 @@ import ( ...@@ -36,8 +36,7 @@ import (
) )
const ( const (
// RegistryConnDelay ... registryConnDelay = 3
RegistryConnDelay = 3
) )
func init() { func init() {
...@@ -74,6 +73,7 @@ func newConsulRegistry(url *common.URL) (registry.Registry, error) { ...@@ -74,6 +73,7 @@ func newConsulRegistry(url *common.URL) (registry.Registry, error) {
return r, nil return r, nil
} }
// Register service to consul registry center
func (r *consulRegistry) Register(url common.URL) error { func (r *consulRegistry) Register(url common.URL) error {
var err error var err error
...@@ -95,6 +95,7 @@ func (r *consulRegistry) register(url common.URL) error { ...@@ -95,6 +95,7 @@ func (r *consulRegistry) register(url common.URL) error {
return r.client.Agent().ServiceRegister(service) return r.client.Agent().ServiceRegister(service)
} }
// Unregister service from consul registry center
func (r *consulRegistry) Unregister(url common.URL) error { func (r *consulRegistry) Unregister(url common.URL) error {
var err error var err error
...@@ -112,6 +113,7 @@ func (r *consulRegistry) unregister(url common.URL) error { ...@@ -112,6 +113,7 @@ func (r *consulRegistry) unregister(url common.URL) error {
return r.client.Agent().ServiceDeregister(buildId(url)) return r.client.Agent().ServiceDeregister(buildId(url))
} }
// Subscribe service from consul registry center
func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if role == common.CONSUMER { if role == common.CONSUMER {
...@@ -133,7 +135,7 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti ...@@ -133,7 +135,7 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti
return return
} }
logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second) time.Sleep(time.Duration(registryConnDelay) * time.Second)
continue continue
} }
...@@ -156,10 +158,12 @@ func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error) ...@@ -156,10 +158,12 @@ func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error)
return listener, err return listener, err
} }
// GetUrl get registry URL of consul registry center
func (r *consulRegistry) GetUrl() common.URL { func (r *consulRegistry) GetUrl() common.URL {
return *r.URL return *r.URL
} }
// IsAvailable checks consul registry center whether is available
func (r *consulRegistry) IsAvailable() bool { func (r *consulRegistry) IsAvailable() bool {
select { select {
case <-r.done: case <-r.done:
...@@ -169,6 +173,7 @@ func (r *consulRegistry) IsAvailable() bool { ...@@ -169,6 +173,7 @@ func (r *consulRegistry) IsAvailable() bool {
} }
} }
// Destroy consul registry center
func (r *consulRegistry) Destroy() { func (r *consulRegistry) Destroy() {
close(r.done) close(r.done)
} }
...@@ -46,6 +46,8 @@ func init() { ...@@ -46,6 +46,8 @@ func init() {
extension.SetDefaultRegistryDirectory(NewRegistryDirectory) extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
} }
// RegistryDirectory implementation of Directory:
// Invoker list returned from this Directory's list method have been filtered by Routers
type RegistryDirectory struct { type RegistryDirectory struct {
directory.BaseDirectory directory.BaseDirectory
cacheInvokers []protocol.Invoker cacheInvokers []protocol.Invoker
......
...@@ -38,15 +38,17 @@ type dataListener struct { ...@@ -38,15 +38,17 @@ type dataListener struct {
listener config_center.ConfigurationListener listener config_center.ConfigurationListener
} }
// NewRegistryDataListener // NewRegistryDataListener creates a data listener for etcd
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener} return &dataListener{listener: listener}
} }
// AddInterestedURL adds a registration @url to listen
func (l *dataListener) AddInterestedURL(url *common.URL) { func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url) l.interestedURL = append(l.interestedURL, url)
} }
// DataChange processes the data change event from registry center of etcd
func (l *dataListener) DataChange(eventType remoting.Event) bool { func (l *dataListener) DataChange(eventType remoting.Event) bool {
index := strings.Index(eventType.Path, "/providers/") index := strings.Index(eventType.Path, "/providers/")
...@@ -88,10 +90,12 @@ func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { ...@@ -88,10 +90,12 @@ func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
} }
// Process data change event from config center of etcd
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType l.events <- configType
} }
// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) { func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for { for {
select { select {
...@@ -114,6 +118,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { ...@@ -114,6 +118,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
} }
} }
// Close etcd registry center
func (l *configurationListener) Close() { func (l *configurationListener) Close() {
l.registry.WaitGroup().Done() l.registry.WaitGroup().Done()
} }
...@@ -104,27 +104,32 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { ...@@ -104,27 +104,32 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
return r, nil return r, nil
} }
// InitListeners init listeners of etcd registry center
func (r *etcdV3Registry) InitListeners() { func (r *etcdV3Registry) InitListeners() {
r.listener = etcdv3.NewEventListener(r.client) r.listener = etcdv3.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r) r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener) r.dataListener = NewRegistryDataListener(r.configListener)
} }
// DoRegister actually do the register job in the registry center of etcd
func (r *etcdV3Registry) DoRegister(root string, node string) error { func (r *etcdV3Registry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "") return r.client.Create(path.Join(root, node), "")
} }
// CloseAndNilClient closes listeners and clear client
func (r *etcdV3Registry) CloseAndNilClient() { func (r *etcdV3Registry) CloseAndNilClient() {
r.client.Close() r.client.Close()
r.client = nil r.client = nil
} }
// CloseListener closes listeners
func (r *etcdV3Registry) CloseListener() { func (r *etcdV3Registry) CloseListener() {
if r.configListener != nil { if r.configListener != nil {
r.configListener.Close() r.configListener.Close()
} }
} }
// CreatePath create the path in the registry center of etcd
func (r *etcdV3Registry) CreatePath(k string) error { func (r *etcdV3Registry) CreatePath(k string) error {
var tmpPath string var tmpPath string
for _, str := range strings.Split(k, "/")[1:] { for _, str := range strings.Split(k, "/")[1:] {
...@@ -137,6 +142,7 @@ func (r *etcdV3Registry) CreatePath(k string) error { ...@@ -137,6 +142,7 @@ func (r *etcdV3Registry) CreatePath(k string) error {
return nil return nil
} }
// DoSubscribe actually subscribe the provider URL
func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) { func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var ( var (
......
...@@ -36,7 +36,7 @@ func init() { ...@@ -36,7 +36,7 @@ func init() {
// service event // service event
// //////////////////////////////////////// // ////////////////////////////////////////
// ServiceEvent ... // ServiceEvent includes create, update, delete event
type ServiceEvent struct { type ServiceEvent struct {
Action remoting.EventType Action remoting.EventType
Service common.URL Service common.URL
......
...@@ -36,6 +36,7 @@ type ConditionalEventListener interface { ...@@ -36,6 +36,7 @@ type ConditionalEventListener interface {
Accept(e Event) bool Accept(e Event) bool
} }
// ServiceInstancesChangedListener is used when the Service Discovery Changed
// TODO (implement ConditionalEventListener) // TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct { type ServiceInstancesChangedListener struct {
ServiceName string ServiceName string
......
...@@ -38,12 +38,12 @@ type dataListener struct { ...@@ -38,12 +38,12 @@ type dataListener struct {
listener config_center.ConfigurationListener listener config_center.ConfigurationListener
} }
// NewRegistryDataListener // NewRegistryDataListener creates a data listener for kubernetes
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener} return &dataListener{listener: listener}
} }
// AddInterestedURL // AddInterestedURL adds the @url of registry center to the listener
func (l *dataListener) AddInterestedURL(url *common.URL) { func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url) l.interestedURL = append(l.interestedURL, url)
} }
...@@ -91,10 +91,12 @@ func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener { ...@@ -91,10 +91,12 @@ func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
} }
// Process processes the data change event from config center of kubernetes
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType l.events <- configType
} }
// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) { func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for { for {
select { select {
...@@ -116,6 +118,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { ...@@ -116,6 +118,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
} }
} }
} }
// Close kubernetes registry center
func (l *configurationListener) Close() { func (l *configurationListener) Close() {
l.registry.WaitGroup().Done() l.registry.WaitGroup().Done()
} }
...@@ -68,23 +68,28 @@ type kubernetesRegistry struct { ...@@ -68,23 +68,28 @@ type kubernetesRegistry struct {
configListener *configurationListener configListener *configurationListener
} }
// Client gets the etcdv3 kubernetes
func (r *kubernetesRegistry) Client() *kubernetes.Client { func (r *kubernetesRegistry) Client() *kubernetes.Client {
r.cltLock.RLock() r.cltLock.RLock()
client := r.client client := r.client
r.cltLock.RUnlock() r.cltLock.RUnlock()
return client return client
} }
// SetClient sets the kubernetes client
func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) { func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
r.cltLock.Lock() r.cltLock.Lock()
r.client = client r.client = client
r.cltLock.Unlock() r.cltLock.Unlock()
} }
// CloseAndNilClient closes listeners and clear client
func (r *kubernetesRegistry) CloseAndNilClient() { func (r *kubernetesRegistry) CloseAndNilClient() {
r.client.Close() r.client.Close()
r.client = nil r.client = nil
} }
// CloseListener closes listeners
func (r *kubernetesRegistry) CloseListener() { func (r *kubernetesRegistry) CloseListener() {
r.cltLock.Lock() r.cltLock.Lock()
...@@ -96,6 +101,7 @@ func (r *kubernetesRegistry) CloseListener() { ...@@ -96,6 +101,7 @@ func (r *kubernetesRegistry) CloseListener() {
r.configListener = nil r.configListener = nil
} }
// CreatePath create the path in the registry center of kubernetes
func (r *kubernetesRegistry) CreatePath(k string) error { func (r *kubernetesRegistry) CreatePath(k string) error {
if err := r.client.Create(k, ""); err != nil { if err := r.client.Create(k, ""); err != nil {
return perrors.WithMessagef(err, "create path %s in kubernetes", k) return perrors.WithMessagef(err, "create path %s in kubernetes", k)
...@@ -103,10 +109,12 @@ func (r *kubernetesRegistry) CreatePath(k string) error { ...@@ -103,10 +109,12 @@ func (r *kubernetesRegistry) CreatePath(k string) error {
return nil return nil
} }
// DoRegister actually do the register job in the registry center of kubernetes
func (r *kubernetesRegistry) DoRegister(root string, node string) error { func (r *kubernetesRegistry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "") return r.client.Create(path.Join(root, node), "")
} }
// DoSubscribe actually subscribe the provider URL
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) { func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var ( var (
...@@ -139,6 +147,7 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er ...@@ -139,6 +147,7 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er
return configListener, nil return configListener, nil
} }
// InitListeners init listeners of kubernetes registry center
func (r *kubernetesRegistry) InitListeners() { func (r *kubernetesRegistry) InitListeners() {
r.listener = kubernetes.NewEventListener(r.client) r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r) r.configListener = NewConfigurationListener(r)
...@@ -183,6 +192,7 @@ func newMockKubernetesRegistry( ...@@ -183,6 +192,7 @@ func newMockKubernetesRegistry(
return r, nil return r, nil
} }
// HandleClientRestart will reconnect to kubernetes registry center
func (r *kubernetesRegistry) HandleClientRestart() { func (r *kubernetesRegistry) HandleClientRestart() {
var ( var (
......
...@@ -30,13 +30,13 @@ import ( ...@@ -30,13 +30,13 @@ import (
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
) )
// MockRegistry ... // MockRegistry is used as mock registry
type MockRegistry struct { type MockRegistry struct {
listener *listener listener *listener
destroyed *atomic.Bool destroyed *atomic.Bool
} }
// NewMockRegistry ... // NewMockRegistry creates a mock registry
func NewMockRegistry(url *common.URL) (Registry, error) { func NewMockRegistry(url *common.URL) (Registry, error) {
registry := &MockRegistry{ registry := &MockRegistry{
destroyed: atomic.NewBool(false), destroyed: atomic.NewBool(false),
...@@ -46,23 +46,23 @@ func NewMockRegistry(url *common.URL) (Registry, error) { ...@@ -46,23 +46,23 @@ func NewMockRegistry(url *common.URL) (Registry, error) {
return registry, nil return registry, nil
} }
// Register ... // Register is used as a mock registry
func (*MockRegistry) Register(url common.URL) error { func (*MockRegistry) Register(url common.URL) error {
return nil return nil
} }
// Destroy ... // nolint
func (r *MockRegistry) Destroy() { func (r *MockRegistry) Destroy() {
if r.destroyed.CAS(false, true) { if r.destroyed.CAS(false, true) {
} }
} }
// IsAvailable ... // IsAvailable is use for determine a mock registry available
func (r *MockRegistry) IsAvailable() bool { func (r *MockRegistry) IsAvailable() bool {
return !r.destroyed.Load() return !r.destroyed.Load()
} }
// GetUrl ... // nolint
func (r *MockRegistry) GetUrl() common.URL { func (r *MockRegistry) GetUrl() common.URL {
return common.URL{} return common.URL{}
} }
...@@ -71,7 +71,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) { ...@@ -71,7 +71,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) {
return r.listener, nil return r.listener, nil
} }
// Subscribe ... // nolint
func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
go func() { go func() {
for { for {
...@@ -123,7 +123,7 @@ func (*listener) Close() { ...@@ -123,7 +123,7 @@ func (*listener) Close() {
} }
// MockEvent ... // nolint
func (r *MockRegistry) MockEvent(event *ServiceEvent) { func (r *MockRegistry) MockEvent(event *ServiceEvent) {
r.listener.listenChan <- event r.listener.listenChan <- event
} }
...@@ -51,7 +51,7 @@ type nacosListener struct { ...@@ -51,7 +51,7 @@ type nacosListener struct {
subscribeParam *vo.SubscribeParam subscribeParam *vo.SubscribeParam
} }
// NewNacosListener ... // NewRegistryDataListener creates a data listener for nacos
func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) { func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) {
listener := &nacosListener{ listener := &nacosListener{
namingClient: namingClient, namingClient: namingClient,
...@@ -109,6 +109,7 @@ func generateUrl(instance model.Instance) *common.URL { ...@@ -109,6 +109,7 @@ func generateUrl(instance model.Instance) *common.URL {
) )
} }
// Callback will be invoked when got subscribed events.
func (nl *nacosListener) Callback(services []model.SubscribeService, err error) { func (nl *nacosListener) Callback(services []model.SubscribeService, err error) {
if err != nil { if err != nil {
logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam) logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam)
...@@ -198,6 +199,7 @@ func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) { ...@@ -198,6 +199,7 @@ func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) {
nl.events <- configType nl.events <- configType
} }
// Next returns the service event from nacos.
func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
for { for {
select { select {
...@@ -212,6 +214,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { ...@@ -212,6 +214,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
} }
} }
// nolint
func (nl *nacosListener) Close() { func (nl *nacosListener) Close() {
nl.stopListen() nl.stopListen()
close(nl.done) close(nl.done)
......
...@@ -123,6 +123,7 @@ func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstance ...@@ -123,6 +123,7 @@ func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstance
return instance return instance
} }
// Register will register the service @url to its nacos registry center
func (nr *nacosRegistry) Register(url common.URL) error { func (nr *nacosRegistry) Register(url common.URL) error {
serviceName := getServiceName(url) serviceName := getServiceName(url)
param := createRegisterParam(url, serviceName) param := createRegisterParam(url, serviceName)
...@@ -140,7 +141,7 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) ...@@ -140,7 +141,7 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error)
return NewNacosListener(*conf, nr.namingClient) return NewNacosListener(*conf, nr.namingClient)
} }
//subscribe from registry // subscribe from registry
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
for { for {
if !nr.IsAvailable() { if !nr.IsAvailable() {
...@@ -174,14 +175,17 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti ...@@ -174,14 +175,17 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
} }
} }
// GetUrl gets its registration URL
func (nr *nacosRegistry) GetUrl() common.URL { func (nr *nacosRegistry) GetUrl() common.URL {
return *nr.URL return *nr.URL
} }
// IsAvailable determines nacos registry center whether it is available
func (nr *nacosRegistry) IsAvailable() bool { func (nr *nacosRegistry) IsAvailable() bool {
return true return true
} }
// nolint
func (nr *nacosRegistry) Destroy() { func (nr *nacosRegistry) Destroy() {
return return
} }
...@@ -34,8 +34,9 @@ import ( ...@@ -34,8 +34,9 @@ import (
) )
const ( const (
defaultGroup = "DEFAULT_GROUP" defaultGroup = "DEFAULT_GROUP"
idKey = "id" idKey = "id"
defaultPageSize = 100
) )
// init will put the service discovery into extension // init will put the service discovery into extension
...@@ -92,7 +93,7 @@ func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) er ...@@ -92,7 +93,7 @@ func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) er
// GetDefaultPageSize will return the constant registry.DefaultPageSize // GetDefaultPageSize will return the constant registry.DefaultPageSize
func (n *nacosServiceDiscovery) GetDefaultPageSize() int { func (n *nacosServiceDiscovery) GetDefaultPageSize() int {
return registry.DefaultPageSize return defaultPageSize
} }
// GetServices will return the all services // GetServices will return the all services
......
...@@ -113,7 +113,7 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) { ...@@ -113,7 +113,7 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
func TestNacosServiceDiscovery_GetDefaultPageSize(t *testing.T) { func TestNacosServiceDiscovery_GetDefaultPageSize(t *testing.T) {
serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl())
assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize()) assert.Equal(t, defaultPageSize, serviceDiscovry.GetDefaultPageSize())
} }
func mockUrl() *common.URL { func mockUrl() *common.URL {
......
...@@ -117,6 +117,7 @@ func (proto *registryProtocol) initConfigurationListeners() { ...@@ -117,6 +117,7 @@ func (proto *registryProtocol) initConfigurationListeners() {
proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners) proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners)
} }
// Refer provider service from registry center
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
var registryUrl = url var registryUrl = url
var serviceUrl = registryUrl.SubURL var serviceUrl = registryUrl.SubURL
...@@ -156,6 +157,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { ...@@ -156,6 +157,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
return invoker return invoker
} }
// Export provider service to registry center
func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
proto.once.Do(func() { proto.once.Do(func() {
proto.initConfigurationListeners() proto.initConfigurationListeners()
...@@ -229,6 +231,7 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv ...@@ -229,6 +231,7 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto} return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto}
} }
// Notify will be triggered when a service change notification is received.
func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) { func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd { if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd {
nl.configurator = extension.GetDefaultConfigurator(&(event.Service)) nl.configurator = extension.GetDefaultConfigurator(&(event.Service))
...@@ -325,6 +328,7 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL { ...@@ -325,6 +328,7 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL {
return newUrl return newUrl
} }
// Destroy registry protocol
func (proto *registryProtocol) Destroy() { func (proto *registryProtocol) Destroy() {
for _, ivk := range proto.invokers { for _, ivk := range proto.invokers {
ivk.Destroy() ivk.Destroy()
...@@ -389,6 +393,7 @@ func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoke ...@@ -389,6 +393,7 @@ func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoke
} }
} }
// Invoke remote service base on URL of wrappedInvoker
func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
// get right url // get right url
ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl()) ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl())
...@@ -411,6 +416,7 @@ func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConf ...@@ -411,6 +416,7 @@ func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConf
return listener return listener
} }
// Process notified once there's any change happens on the provider config
func (listener *providerConfigurationListener) Process(event *config_center.ConfigChangeEvent) { func (listener *providerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
listener.BaseConfigurationListener.Process(event) listener.BaseConfigurationListener.Process(event)
listener.overrideListeners.Range(func(key, value interface{}) bool { listener.overrideListeners.Range(func(key, value interface{}) bool {
...@@ -435,6 +441,7 @@ func newServiceConfigurationListener(overrideListener *overrideSubscribeListener ...@@ -435,6 +441,7 @@ func newServiceConfigurationListener(overrideListener *overrideSubscribeListener
return listener return listener
} }
// Process notified once there's any change happens on the service config
func (listener *serviceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { func (listener *serviceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
listener.BaseConfigurationListener.Process(event) listener.BaseConfigurationListener.Process(event)
listener.overrideListener.doOverrideIfNecessary() listener.overrideListener.doOverrideIfNecessary()
......
...@@ -45,13 +45,16 @@ type Registry interface { ...@@ -45,13 +45,16 @@ type Registry interface {
Subscribe(*common.URL, NotifyListener) Subscribe(*common.URL, NotifyListener)
} }
// NotifyListener ... // nolint
type NotifyListener interface { type NotifyListener interface {
// Notify supports notifications on the service interface and the dimension of the data type.
Notify(*ServiceEvent) Notify(*ServiceEvent)
} }
// Listener Deprecated! // Listener Deprecated!
type Listener interface { type Listener interface {
// Next returns next service event once received
Next() (*ServiceEvent, error) Next() (*ServiceEvent, error)
// Close closes this listener
Close() Close()
} }
...@@ -26,8 +26,7 @@ import ( ...@@ -26,8 +26,7 @@ import (
gxpage "github.com/dubbogo/gost/page" gxpage "github.com/dubbogo/gost/page"
) )
const DefaultPageSize = 100 // ServiceDiscovery is the common operations of Service Discovery
type ServiceDiscovery interface { type ServiceDiscovery interface {
fmt.Stringer fmt.Stringer
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package registry package registry
// ServiceInstance is the model class of an instance of a service, which is used for service registration and discovery.
type ServiceInstance interface { type ServiceInstance interface {
// GetId will return this instance's id. It should be unique. // GetId will return this instance's id. It should be unique.
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment