Skip to content
Snippets Groups Projects
Commit 5a111c31 authored by 邹毅贤's avatar 邹毅贤
Browse files

add comment in all files of registry directory

parent 106040d5
No related branches found
No related tags found
No related merge requests found
Showing
with 94 additions and 32 deletions
......@@ -27,6 +27,8 @@ import (
// ConfigurationListener for changing listener's event
type ConfigurationListener interface {
// Process is Listener callback method. Listener gets notified by this method once there's any change happens on the config
// the listener listens on.
Process(*ConfigChangeEvent)
}
......
......@@ -29,19 +29,19 @@ import (
"github.com/apache/dubbo-go/remoting"
)
// BaseConfigurationListener ...
// BaseConfigurationListener will get notified when the config it listens on changes.
type BaseConfigurationListener struct {
configurators []config_center.Configurator
dynamicConfiguration config_center.DynamicConfiguration
defaultConfiguratorFunc func(url *common.URL) config_center.Configurator
}
// Configurators ...
// Configurators gets Configurator from config center
func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator {
return bcl.configurators
}
// InitWith ...
// InitWith will init BaseConfigurationListener with @key 、@listener and @f
func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) {
bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration()
if bcl.dynamicConfiguration == nil {
......@@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente
}
}
// Process ...
// Process can reference ConfigurationListener.Process
func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value)
if event.ConfigType == remoting.EventTypeDel {
......@@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin
return nil
}
// OverrideUrl ...
// OverrideUrl gets existing configuration rule and override provider url before exporting.
func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) {
for _, v := range bcl.configurators {
v.Configure(url)
}
}
// ToConfigurators ...
// ToConfigurators converts override urls to map for use when re-refer. Send all rules every time, the urls will be reassembled and calculated
func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator {
if len(urls) == 0 {
return nil
......
......@@ -187,6 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) {
}
}
// Next returns next service event once received
func (l *consulListener) Next() (*registry.ServiceEvent, error) {
select {
case event := <-l.eventCh:
......@@ -196,6 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) {
}
}
// Close closes this listener
func (l *consulListener) Close() {
close(l.done)
l.plan.Stop()
......
......@@ -36,8 +36,7 @@ import (
)
const (
// RegistryConnDelay ...
RegistryConnDelay = 3
registryConnDelay = 3
)
func init() {
......@@ -74,6 +73,7 @@ func newConsulRegistry(url *common.URL) (registry.Registry, error) {
return r, nil
}
// Register service to consul registry center
func (r *consulRegistry) Register(url common.URL) error {
var err error
......@@ -95,6 +95,7 @@ func (r *consulRegistry) register(url common.URL) error {
return r.client.Agent().ServiceRegister(service)
}
// Unregister service from consul registry center
func (r *consulRegistry) Unregister(url common.URL) error {
var err error
......@@ -112,6 +113,7 @@ func (r *consulRegistry) unregister(url common.URL) error {
return r.client.Agent().ServiceDeregister(buildId(url))
}
// Subscribe service from consul registry center
func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if role == common.CONSUMER {
......@@ -133,7 +135,7 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti
return
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
time.Sleep(time.Duration(registryConnDelay) * time.Second)
continue
}
......@@ -156,10 +158,12 @@ func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error)
return listener, err
}
// GetUrl get registry URL of consul registry center
func (r *consulRegistry) GetUrl() common.URL {
return *r.URL
}
// IsAvailable determines consul registry center whether is available
func (r *consulRegistry) IsAvailable() bool {
select {
case <-r.done:
......@@ -169,6 +173,7 @@ func (r *consulRegistry) IsAvailable() bool {
}
}
// Destroy consul registry center
func (r *consulRegistry) Destroy() {
close(r.done)
}
......@@ -46,6 +46,8 @@ func init() {
extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
}
// RegistryDirectory implementation of Directory:
// Invoker list returned from this Directory's list method have been filtered by Routers
type RegistryDirectory struct {
directory.BaseDirectory
cacheInvokers []protocol.Invoker
......
......@@ -38,15 +38,17 @@ type dataListener struct {
listener config_center.ConfigurationListener
}
// NewRegistryDataListener
// NewRegistryDataListener creates a data listener for etcd
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
}
// AddInterestedURL add more URL of registry center to listen
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
// Process data change event from registry center of etcd
func (l *dataListener) DataChange(eventType remoting.Event) bool {
index := strings.Index(eventType.Path, "/providers/")
......@@ -88,10 +90,12 @@ func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}
// Process data change event from config center of etcd
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}
// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
......@@ -114,6 +118,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}
// Close etcd registry center
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
}
......@@ -57,17 +57,17 @@ type etcdV3Registry struct {
configListener *configurationListener
}
// Client get the etcdv3 client
// Client gets the etcdv3 client
func (r *etcdV3Registry) Client() *etcdv3.Client {
return r.client
}
//SetClient set the etcdv3 client
// SetClient sets the etcdv3 client
func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
r.client = client
}
//
// ClientLock gets registry control lock
func (r *etcdV3Registry) ClientLock() *sync.Mutex {
return &r.cltLock
}
......@@ -104,27 +104,32 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
return r, nil
}
// InitListeners init listeners of etcd registry center
func (r *etcdV3Registry) InitListeners() {
r.listener = etcdv3.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
}
// DoRegister actually do the register job in the registry center of etcd
func (r *etcdV3Registry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}
// CloseAndNilClient closes listeners and clear client
func (r *etcdV3Registry) CloseAndNilClient() {
r.client.Close()
r.client = nil
}
// CloseListener closes listeners
func (r *etcdV3Registry) CloseListener() {
if r.configListener != nil {
r.configListener.Close()
}
}
// CreatePath create the path in the registry center of etcd
func (r *etcdV3Registry) CreatePath(k string) error {
var tmpPath string
for _, str := range strings.Split(k, "/")[1:] {
......@@ -137,6 +142,7 @@ func (r *etcdV3Registry) CreatePath(k string) error {
return nil
}
// DoSubscribe actually subscribe the provider URL
func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var (
......
......@@ -36,7 +36,7 @@ func init() {
// service event
// ////////////////////////////////////////
// ServiceEvent ...
// ServiceEvent is create、update or delete event of service
type ServiceEvent struct {
Action remoting.EventType
Service common.URL
......
......@@ -36,6 +36,7 @@ type ConditionalEventListener interface {
Accept(e Event) bool
}
// ServiceInstancesChangedListener is use for the Service Discovery Changed
// TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct {
ServiceName string
......
......@@ -38,12 +38,12 @@ type dataListener struct {
listener config_center.ConfigurationListener
}
// NewRegistryDataListener
// NewRegistryDataListener creates a data listener for kubernetes
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
}
// AddInterestedURL
// AddInterestedURL add more URL of registry center to listen
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
......@@ -91,10 +91,12 @@ func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}
// Process data change event from config center of kubernetes
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}
// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
......@@ -116,6 +118,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}
}
// Close kubernetes registry center
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
}
......@@ -68,23 +68,28 @@ type kubernetesRegistry struct {
configListener *configurationListener
}
// Client gets the etcdv3 kubernetes
func (r *kubernetesRegistry) Client() *kubernetes.Client {
r.cltLock.RLock()
client := r.client
r.cltLock.RUnlock()
return client
}
// SetClient sets the kubernetes client
func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
r.cltLock.Lock()
r.client = client
r.cltLock.Unlock()
}
// CloseAndNilClient closes listeners and clear client
func (r *kubernetesRegistry) CloseAndNilClient() {
r.client.Close()
r.client = nil
}
// CloseListener closes listeners
func (r *kubernetesRegistry) CloseListener() {
r.cltLock.Lock()
......@@ -96,6 +101,7 @@ func (r *kubernetesRegistry) CloseListener() {
r.configListener = nil
}
// CreatePath create the path in the registry center of kubernetes
func (r *kubernetesRegistry) CreatePath(k string) error {
if err := r.client.Create(k, ""); err != nil {
return perrors.WithMessagef(err, "create path %s in kubernetes", k)
......@@ -103,10 +109,12 @@ func (r *kubernetesRegistry) CreatePath(k string) error {
return nil
}
// DoRegister actually do the register job in the registry center of kubernetes
func (r *kubernetesRegistry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}
// DoSubscribe actually subscribe the provider URL
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var (
......@@ -139,6 +147,7 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er
return configListener, nil
}
// InitListeners init listeners of kubernetes registry center
func (r *kubernetesRegistry) InitListeners() {
r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
......@@ -183,6 +192,7 @@ func newMockKubernetesRegistry(
return r, nil
}
// HandleClientRestart will reconnect to kubernetes registry center
func (r *kubernetesRegistry) HandleClientRestart() {
var (
......
......@@ -30,13 +30,13 @@ import (
"github.com/apache/dubbo-go/common/logger"
)
// MockRegistry ...
// MockRegistry is use for mock registry
type MockRegistry struct {
listener *listener
destroyed *atomic.Bool
}
// NewMockRegistry ...
// NewMockRegistry is use for create a mock registry
func NewMockRegistry(url *common.URL) (Registry, error) {
registry := &MockRegistry{
destroyed: atomic.NewBool(false),
......@@ -46,23 +46,23 @@ func NewMockRegistry(url *common.URL) (Registry, error) {
return registry, nil
}
// Register ...
// Register is use for register a mock registry
func (*MockRegistry) Register(url common.URL) error {
return nil
}
// Destroy ...
// Destroy is use for destory a mock registry
func (r *MockRegistry) Destroy() {
if r.destroyed.CAS(false, true) {
}
}
// IsAvailable ...
// IsAvailable is use for determine a mock registry available
func (r *MockRegistry) IsAvailable() bool {
return !r.destroyed.Load()
}
// GetUrl ...
// GetUrl is use for register a mock registry URL
func (r *MockRegistry) GetUrl() common.URL {
return common.URL{}
}
......@@ -71,7 +71,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) {
return r.listener, nil
}
// Subscribe ...
// Subscribe is use for subscribe a mock registry
func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
go func() {
for {
......@@ -123,7 +123,7 @@ func (*listener) Close() {
}
// MockEvent ...
// MockEvent is use for register a mock event
func (r *MockRegistry) MockEvent(event *ServiceEvent) {
r.listener.listenChan <- event
}
......@@ -51,7 +51,7 @@ type nacosListener struct {
subscribeParam *vo.SubscribeParam
}
// NewNacosListener ...
// NewRegistryDataListener creates a data listener for nacos
func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
......@@ -109,6 +109,7 @@ func generateUrl(instance model.Instance) *common.URL {
)
}
// Callback will callback when subscribed
func (nl *nacosListener) Callback(services []model.SubscribeService, err error) {
if err != nil {
logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam)
......@@ -198,6 +199,7 @@ func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) {
nl.events <- configType
}
// Next returns next service event once received
func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
for {
select {
......@@ -212,6 +214,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
}
}
// Close nacos registry center
func (nl *nacosListener) Close() {
nl.stopListen()
close(nl.done)
......
......@@ -123,6 +123,7 @@ func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstance
return instance
}
// Register service to nacos registry center
func (nr *nacosRegistry) Register(url common.URL) error {
serviceName := getServiceName(url)
param := createRegisterParam(url, serviceName)
......@@ -174,14 +175,17 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
}
}
// GetUrl get registry URL of nacos registry center
func (nr *nacosRegistry) GetUrl() common.URL {
return *nr.URL
}
// IsAvailable determines nacos registry center whether is available
func (nr *nacosRegistry) IsAvailable() bool {
return true
}
// Destroy nacos registry center
func (nr *nacosRegistry) Destroy() {
return
}
......@@ -34,8 +34,9 @@ import (
)
const (
defaultGroup = "DEFAULT_GROUP"
idKey = "id"
defaultGroup = "DEFAULT_GROUP"
idKey = "id"
defaultPageSize = 100
)
// init will put the service discovery into extension
......@@ -92,7 +93,7 @@ func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) er
// GetDefaultPageSize will return the constant registry.DefaultPageSize
func (n *nacosServiceDiscovery) GetDefaultPageSize() int {
return registry.DefaultPageSize
return registry.defaultPageSize
}
// GetServices will return the all services
......
......@@ -113,7 +113,7 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
func TestNacosServiceDiscovery_GetDefaultPageSize(t *testing.T) {
serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl())
assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize())
assert.Equal(t, defaultPageSize, serviceDiscovry.GetDefaultPageSize())
}
func mockUrl() *common.URL {
......
......@@ -117,6 +117,7 @@ func (proto *registryProtocol) initConfigurationListeners() {
proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners)
}
// Refer provider service from registry center
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
var registryUrl = url
var serviceUrl = registryUrl.SubURL
......@@ -156,6 +157,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
return invoker
}
// Export provider service to registry center
func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
proto.once.Do(func() {
proto.initConfigurationListeners()
......@@ -229,6 +231,7 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto}
}
// Notify will triggered when a service change notification is received.
func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd {
nl.configurator = extension.GetDefaultConfigurator(&(event.Service))
......@@ -325,6 +328,7 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL {
return newUrl
}
// Destroy registry protocol
func (proto *registryProtocol) Destroy() {
for _, ivk := range proto.invokers {
ivk.Destroy()
......@@ -389,6 +393,7 @@ func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoke
}
}
// Invoke remote service base on URL of wrappedInvoker
func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
// get right url
ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl())
......@@ -411,6 +416,8 @@ func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConf
return listener
}
// Process is Listener callback method. Listener gets notified by this method once there's any change happens on the config
// the listener listens on.
func (listener *providerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
listener.BaseConfigurationListener.Process(event)
listener.overrideListeners.Range(func(key, value interface{}) bool {
......@@ -435,6 +442,8 @@ func newServiceConfigurationListener(overrideListener *overrideSubscribeListener
return listener
}
// Process is Listener callback method. Listener gets notified by this method once there's any change happens on the config
// the listener listens on.
func (listener *serviceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
listener.BaseConfigurationListener.Process(event)
listener.overrideListener.doOverrideIfNecessary()
......
......@@ -45,13 +45,21 @@ type Registry interface {
Subscribe(*common.URL, NotifyListener)
}
// NotifyListener ...
// NotifyListener is used for triggered when a service change notification is received.
type NotifyListener interface {
// Notify needs to support the contract: <br>
// 1. Always notifications on the service interface and the dimension of the data type. that is, won't notify part of the same type data belonging to one service. Users do not need to compare the results of the previous notification.<br>
// 2. The first notification at a subscription must be a full notification of all types of data of a service.<br>
// 3. At the time of change, different types of data are allowed to be notified separately, e.g.: providers, consumers, routers, overrides. It allows only one of these types to be notified, but the data of this type must be full, not incremental.<br>
// 4. If a data type is empty, need to notify a empty protocol with category parameter identification of url data.<br>
// 5. The order of notifications to be guaranteed by the notifications(That is, the implementation of the registry). Such as: single thread push, queue serialization, and version comparison.<br>
Notify(*ServiceEvent)
}
// Listener Deprecated!
type Listener interface {
// Next returns next service event once received
Next() (*ServiceEvent, error)
// Close closes this listener
Close()
}
......@@ -26,8 +26,7 @@ import (
gxpage "github.com/dubbogo/gost/page"
)
const DefaultPageSize = 100
// ServiceDiscovery is the common operations of Service Discovery
type ServiceDiscovery interface {
fmt.Stringer
......
......@@ -17,6 +17,7 @@
package registry
// ServiceInstance is the model class of an instance of a service, which is used for service registration and discovery.
type ServiceInstance interface {
// GetId will return this instance's id. It should be unique.
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment