Skip to content
Snippets Groups Projects
Unverified Commit 3d49970f authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #589 from zouyx/feature/addRegistryComment

Mod: update the comments in registy directory
parents aed6803e d109e557
No related branches found
No related tags found
No related merge requests found
Showing
with 84 additions and 30 deletions
......@@ -27,6 +27,7 @@ import (
// ConfigurationListener for changing listener's event
type ConfigurationListener interface {
// Process the notification event once there's any change happens on the config
Process(*ConfigChangeEvent)
}
......
......@@ -29,19 +29,19 @@ import (
"github.com/apache/dubbo-go/remoting"
)
// BaseConfigurationListener ...
// nolint
type BaseConfigurationListener struct {
configurators []config_center.Configurator
dynamicConfiguration config_center.DynamicConfiguration
defaultConfiguratorFunc func(url *common.URL) config_center.Configurator
}
// Configurators ...
// Configurators gets Configurator from config center
func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator {
return bcl.configurators
}
// InitWith ...
// InitWith will init BaseConfigurationListener by @key+@Listener+@f
func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) {
bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration()
if bcl.dynamicConfiguration == nil {
......@@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente
}
}
// Process ...
// Process the notification event once there's any change happens on the config.
func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value)
if event.ConfigType == remoting.EventTypeDel {
......@@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin
return nil
}
// OverrideUrl ...
// OverrideUrl gets existing configuration rule and overrides provider url before exporting.
func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) {
for _, v := range bcl.configurators {
v.Configure(url)
}
}
// ToConfigurators ...
// ToConfigurators converts @urls by @f to config_center.Configurators
func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator {
if len(urls) == 0 {
return nil
......
......@@ -187,6 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) {
}
}
// Next returns the service event from consul.
func (l *consulListener) Next() (*registry.ServiceEvent, error) {
select {
case event := <-l.eventCh:
......@@ -196,6 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) {
}
}
// Close closes this listener
func (l *consulListener) Close() {
close(l.done)
l.plan.Stop()
......
......@@ -36,8 +36,7 @@ import (
)
const (
// RegistryConnDelay ...
RegistryConnDelay = 3
registryConnDelay = 3
)
func init() {
......@@ -74,6 +73,7 @@ func newConsulRegistry(url *common.URL) (registry.Registry, error) {
return r, nil
}
// Register service to consul registry center
func (r *consulRegistry) Register(url common.URL) error {
var err error
......@@ -95,6 +95,7 @@ func (r *consulRegistry) register(url common.URL) error {
return r.client.Agent().ServiceRegister(service)
}
// Unregister service from consul registry center
func (r *consulRegistry) Unregister(url common.URL) error {
var err error
......@@ -112,6 +113,7 @@ func (r *consulRegistry) unregister(url common.URL) error {
return r.client.Agent().ServiceDeregister(buildId(url))
}
// Subscribe service from consul registry center
func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if role == common.CONSUMER {
......@@ -133,7 +135,7 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti
return
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
time.Sleep(time.Duration(registryConnDelay) * time.Second)
continue
}
......@@ -156,10 +158,12 @@ func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error)
return listener, err
}
// GetUrl get registry URL of consul registry center
func (r *consulRegistry) GetUrl() common.URL {
return *r.URL
}
// IsAvailable checks consul registry center whether is available
func (r *consulRegistry) IsAvailable() bool {
select {
case <-r.done:
......@@ -169,6 +173,7 @@ func (r *consulRegistry) IsAvailable() bool {
}
}
// Destroy consul registry center
func (r *consulRegistry) Destroy() {
close(r.done)
}
......@@ -46,6 +46,8 @@ func init() {
extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
}
// RegistryDirectory implementation of Directory:
// Invoker list returned from this Directory's list method have been filtered by Routers
type RegistryDirectory struct {
directory.BaseDirectory
cacheInvokers []protocol.Invoker
......
......@@ -38,15 +38,17 @@ type dataListener struct {
listener config_center.ConfigurationListener
}
// NewRegistryDataListener
// NewRegistryDataListener creates a data listener for etcd
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
}
// AddInterestedURL adds a registration @url to listen
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
// DataChange processes the data change event from registry center of etcd
func (l *dataListener) DataChange(eventType remoting.Event) bool {
index := strings.Index(eventType.Path, "/providers/")
......@@ -88,10 +90,12 @@ func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}
// Process data change event from config center of etcd
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}
// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
......@@ -114,6 +118,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}
// Close etcd registry center
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
}
......@@ -104,27 +104,32 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
return r, nil
}
// InitListeners init listeners of etcd registry center
func (r *etcdV3Registry) InitListeners() {
r.listener = etcdv3.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
}
// DoRegister actually do the register job in the registry center of etcd
func (r *etcdV3Registry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}
// CloseAndNilClient closes listeners and clear client
func (r *etcdV3Registry) CloseAndNilClient() {
r.client.Close()
r.client = nil
}
// CloseListener closes listeners
func (r *etcdV3Registry) CloseListener() {
if r.configListener != nil {
r.configListener.Close()
}
}
// CreatePath create the path in the registry center of etcd
func (r *etcdV3Registry) CreatePath(k string) error {
var tmpPath string
for _, str := range strings.Split(k, "/")[1:] {
......@@ -137,6 +142,7 @@ func (r *etcdV3Registry) CreatePath(k string) error {
return nil
}
// DoSubscribe actually subscribe the provider URL
func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var (
......
......@@ -36,7 +36,7 @@ func init() {
// service event
// ////////////////////////////////////////
// ServiceEvent ...
// ServiceEvent includes create, update, delete event
type ServiceEvent struct {
Action remoting.EventType
Service common.URL
......
......@@ -36,6 +36,7 @@ type ConditionalEventListener interface {
Accept(e Event) bool
}
// ServiceInstancesChangedListener is used when the Service Discovery Changed
// TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct {
ServiceName string
......
......@@ -38,12 +38,12 @@ type dataListener struct {
listener config_center.ConfigurationListener
}
// NewRegistryDataListener
// NewRegistryDataListener creates a data listener for kubernetes
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
}
// AddInterestedURL
// AddInterestedURL adds the @url of registry center to the listener
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
......@@ -91,10 +91,12 @@ func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}
// Process processes the data change event from config center of kubernetes
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}
// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
......@@ -116,6 +118,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}
}
// Close kubernetes registry center
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
}
......@@ -68,23 +68,28 @@ type kubernetesRegistry struct {
configListener *configurationListener
}
// Client gets the etcdv3 kubernetes
func (r *kubernetesRegistry) Client() *kubernetes.Client {
r.cltLock.RLock()
client := r.client
r.cltLock.RUnlock()
return client
}
// SetClient sets the kubernetes client
func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
r.cltLock.Lock()
r.client = client
r.cltLock.Unlock()
}
// CloseAndNilClient closes listeners and clear client
func (r *kubernetesRegistry) CloseAndNilClient() {
r.client.Close()
r.client = nil
}
// CloseListener closes listeners
func (r *kubernetesRegistry) CloseListener() {
r.cltLock.Lock()
......@@ -96,6 +101,7 @@ func (r *kubernetesRegistry) CloseListener() {
r.configListener = nil
}
// CreatePath create the path in the registry center of kubernetes
func (r *kubernetesRegistry) CreatePath(k string) error {
if err := r.client.Create(k, ""); err != nil {
return perrors.WithMessagef(err, "create path %s in kubernetes", k)
......@@ -103,10 +109,12 @@ func (r *kubernetesRegistry) CreatePath(k string) error {
return nil
}
// DoRegister actually do the register job in the registry center of kubernetes
func (r *kubernetesRegistry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}
// DoSubscribe actually subscribe the provider URL
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var (
......@@ -139,6 +147,7 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er
return configListener, nil
}
// InitListeners init listeners of kubernetes registry center
func (r *kubernetesRegistry) InitListeners() {
r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
......@@ -183,6 +192,7 @@ func newMockKubernetesRegistry(
return r, nil
}
// HandleClientRestart will reconnect to kubernetes registry center
func (r *kubernetesRegistry) HandleClientRestart() {
var (
......
......@@ -30,13 +30,13 @@ import (
"github.com/apache/dubbo-go/common/logger"
)
// MockRegistry ...
// MockRegistry is used as mock registry
type MockRegistry struct {
listener *listener
destroyed *atomic.Bool
}
// NewMockRegistry ...
// NewMockRegistry creates a mock registry
func NewMockRegistry(url *common.URL) (Registry, error) {
registry := &MockRegistry{
destroyed: atomic.NewBool(false),
......@@ -46,23 +46,23 @@ func NewMockRegistry(url *common.URL) (Registry, error) {
return registry, nil
}
// Register ...
// Register is used as a mock registry
func (*MockRegistry) Register(url common.URL) error {
return nil
}
// Destroy ...
// nolint
func (r *MockRegistry) Destroy() {
if r.destroyed.CAS(false, true) {
}
}
// IsAvailable ...
// IsAvailable is use for determine a mock registry available
func (r *MockRegistry) IsAvailable() bool {
return !r.destroyed.Load()
}
// GetUrl ...
// nolint
func (r *MockRegistry) GetUrl() common.URL {
return common.URL{}
}
......@@ -71,7 +71,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) {
return r.listener, nil
}
// Subscribe ...
// nolint
func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
go func() {
for {
......@@ -123,7 +123,7 @@ func (*listener) Close() {
}
// MockEvent ...
// nolint
func (r *MockRegistry) MockEvent(event *ServiceEvent) {
r.listener.listenChan <- event
}
......@@ -51,7 +51,7 @@ type nacosListener struct {
subscribeParam *vo.SubscribeParam
}
// NewNacosListener ...
// NewRegistryDataListener creates a data listener for nacos
func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
......@@ -109,6 +109,7 @@ func generateUrl(instance model.Instance) *common.URL {
)
}
// Callback will be invoked when got subscribed events.
func (nl *nacosListener) Callback(services []model.SubscribeService, err error) {
if err != nil {
logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam)
......@@ -198,6 +199,7 @@ func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) {
nl.events <- configType
}
// Next returns the service event from nacos.
func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
for {
select {
......@@ -212,6 +214,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
}
}
// nolint
func (nl *nacosListener) Close() {
nl.stopListen()
close(nl.done)
......
......@@ -123,6 +123,7 @@ func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstance
return instance
}
// Register will register the service @url to its nacos registry center
func (nr *nacosRegistry) Register(url common.URL) error {
serviceName := getServiceName(url)
param := createRegisterParam(url, serviceName)
......@@ -140,7 +141,7 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error)
return NewNacosListener(*conf, nr.namingClient)
}
//subscribe from registry
// subscribe from registry
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
for {
if !nr.IsAvailable() {
......@@ -174,14 +175,17 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
}
}
// GetUrl gets its registration URL
func (nr *nacosRegistry) GetUrl() common.URL {
return *nr.URL
}
// IsAvailable determines nacos registry center whether it is available
func (nr *nacosRegistry) IsAvailable() bool {
return true
}
// nolint
func (nr *nacosRegistry) Destroy() {
return
}
......@@ -34,8 +34,9 @@ import (
)
const (
defaultGroup = "DEFAULT_GROUP"
idKey = "id"
defaultGroup = "DEFAULT_GROUP"
idKey = "id"
defaultPageSize = 100
)
// init will put the service discovery into extension
......@@ -92,7 +93,7 @@ func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) er
// GetDefaultPageSize will return the constant registry.DefaultPageSize
func (n *nacosServiceDiscovery) GetDefaultPageSize() int {
return registry.DefaultPageSize
return defaultPageSize
}
// GetServices will return the all services
......
......@@ -113,7 +113,7 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
func TestNacosServiceDiscovery_GetDefaultPageSize(t *testing.T) {
serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl())
assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize())
assert.Equal(t, defaultPageSize, serviceDiscovry.GetDefaultPageSize())
}
func mockUrl() *common.URL {
......
......@@ -117,6 +117,7 @@ func (proto *registryProtocol) initConfigurationListeners() {
proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners)
}
// Refer provider service from registry center
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
var registryUrl = url
var serviceUrl = registryUrl.SubURL
......@@ -156,6 +157,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
return invoker
}
// Export provider service to registry center
func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
proto.once.Do(func() {
proto.initConfigurationListeners()
......@@ -229,6 +231,7 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto}
}
// Notify will be triggered when a service change notification is received.
func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd {
nl.configurator = extension.GetDefaultConfigurator(&(event.Service))
......@@ -325,6 +328,7 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL {
return newUrl
}
// Destroy registry protocol
func (proto *registryProtocol) Destroy() {
for _, ivk := range proto.invokers {
ivk.Destroy()
......@@ -389,6 +393,7 @@ func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoke
}
}
// Invoke remote service base on URL of wrappedInvoker
func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
// get right url
ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl())
......@@ -411,6 +416,7 @@ func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConf
return listener
}
// Process notified once there's any change happens on the provider config
func (listener *providerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
listener.BaseConfigurationListener.Process(event)
listener.overrideListeners.Range(func(key, value interface{}) bool {
......@@ -435,6 +441,7 @@ func newServiceConfigurationListener(overrideListener *overrideSubscribeListener
return listener
}
// Process notified once there's any change happens on the service config
func (listener *serviceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
listener.BaseConfigurationListener.Process(event)
listener.overrideListener.doOverrideIfNecessary()
......
......@@ -45,13 +45,16 @@ type Registry interface {
Subscribe(*common.URL, NotifyListener)
}
// NotifyListener ...
// nolint
type NotifyListener interface {
// Notify supports notifications on the service interface and the dimension of the data type.
Notify(*ServiceEvent)
}
// Listener Deprecated!
type Listener interface {
// Next returns next service event once received
Next() (*ServiceEvent, error)
// Close closes this listener
Close()
}
......@@ -26,8 +26,7 @@ import (
gxpage "github.com/dubbogo/gost/page"
)
const DefaultPageSize = 100
// ServiceDiscovery is the common operations of Service Discovery
type ServiceDiscovery interface {
fmt.Stringer
......
......@@ -17,6 +17,7 @@
package registry
// ServiceInstance is the model class of an instance of a service, which is used for service registration and discovery.
type ServiceInstance interface {
// GetId will return this instance's id. It should be unique.
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment