diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index aedc18494d27d905b24348c2ec3d608517abd5bd..76aa42d0a14147bc37b348c15118deaebc2ac8a2 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -69,12 +69,6 @@ jobs: - name: Hello world run: echo Hello world ${{ secrets.DING_TOKEN }} ${{ secrets.DING_SIGN }} - - name: Install go ci lint - run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0 - - - name: Run Linter - run: golangci-lint run --timeout=10m -v --disable-all --enable=govet --enable=staticcheck --enable=ineffassign --enable=misspell - # Because the contexts of push and PR are different, there are two Notify. # Notifications are triggered only in the dubbogo/gost repository. - name: DingTalk Message Notify only Push diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go index f55cd07bc9b360d3d79a7004bcc46ee8e8bc0d89..4a373bffb4b7a0436ef35d0009bb93ce7174461e 100644 --- a/cluster/router/chain/chain.go +++ b/cluster/router/chain/chain.go @@ -195,9 +195,6 @@ func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Inv func (c *RouterChain) buildCache() { origin := c.loadCache() invokers := c.copyInvokerIfNecessary(origin) - if invokers == nil || len(invokers) == 0 { - return - } var ( mutex sync.Mutex diff --git a/metadata/report/nacos/report_test.go b/metadata/report/nacos/report_test.go index b40b4380dbf8339277436b77d26d9ee3ab728a21..1b7075ec78e4ac496983564b684d8776e1476b63 100644 --- a/metadata/report/nacos/report_test.go +++ b/metadata/report/nacos/report_test.go @@ -19,8 +19,10 @@ package nacos import ( "encoding/json" + "net/http" "strconv" "testing" + "time" ) import ( @@ -36,6 +38,9 @@ import ( ) func TestNacosMetadataReport_CRUD(t *testing.T) { + if !checkNacosServerAlive() { + return + } rpt := newTestReport() assert.NotNil(t, rpt) @@ -114,3 +119,11 @@ func newTestReport() report.MetadataReport { res := extension.GetMetadataReportFactory("nacos").CreateMetadataReport(regurl) return res } + +func checkNacosServerAlive() bool { + c := http.Client{Timeout: time.Second} + if _, err := c.Get("http://console.nacos.io/nacos/"); err != nil { + return false + } + return true +} diff --git a/registry/directory/directory.go b/registry/directory/directory.go index e6ae0f3bb9004f8b933d420cdd2e7561f8232b48..5971921e01723477ae5140eb6874ea68e0800266 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -66,6 +66,7 @@ type RegistryDirectory struct { referenceConfigurationListener *referenceConfigurationListener serviceKey string forbidden atomic.Bool + registerLock sync.Mutex // this lock if for register } // NewRegistryDirectory will create a new RegistryDirectory @@ -73,6 +74,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster. if url.SubURL == nil { return nil, perrors.Errorf("url is invalid, suburl can not be nil") } + logger.Debugf("new RegistryDirectory for service :%s.", url.Key()) dir := &RegistryDirectory{ BaseDirectory: directory.NewBaseDirectory(url), cacheInvokers: []protocol.Invoker{}, @@ -97,69 +99,128 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster. // subscribe from registry func (dir *RegistryDirectory) subscribe(url *common.URL) { + logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key()) dir.consumerConfigurationListener.addNotifyListener(dir) dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) dir.registry.Subscribe(url, dir) } // Notify monitor changes from registry,and update the cacheServices -func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) { - go dir.refreshInvokers(events...) +func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) { + if event == nil { + return + } + go dir.refreshInvokers(event) +} + +// NotifyAll notify the events that are complete Service Event List. +// After notify the address, the callback func will be invoked. +func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent, callback func()) { + go dir.refreshAllInvokers(events, callback) +} + +// refreshInvokers refreshes service's events. +func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) { + if event != nil { + logger.Debugf("refresh invokers with %+v", event) + } else { + logger.Debug("refresh invokers with nil") + } + + var oldInvoker protocol.Invoker + if event != nil { + oldInvoker, _ = dir.cacheInvokerByEvent(event) + } + dir.setNewInvokers() + if oldInvoker != nil { + oldInvoker.Destroy() + } } -// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single -// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is -// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore -// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary -// since in batch mode, the register center handles the different type of events by itself, then notify the directory -// a batch of 'Update' events, instead of omit the different type of event one by one. -func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) { - var oldInvokers []protocol.Invoker +// refreshAllInvokers the argument is the complete list of the service events, we can safely assume any cached invoker +// not in the incoming list can be removed. The Action of serviceEvent should be EventTypeUpdate. +func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent, callback func()) { + var ( + oldInvokers []protocol.Invoker + addEvents []*registry.ServiceEvent + ) + dir.overrideUrl(dir.GetDirectoryUrl()) + referenceUrl := dir.GetDirectoryUrl().SubURL - // in batch mode, it is safe to remove since we have the complete list of events. - if len(events) > 1 { + // loop the events to check the Action should be EventTypeUpdate. + for _, event := range events { + if event.Action != remoting.EventTypeUpdate { + panic("Your implements of register center is wrong, " + + "please check the Action of ServiceEvent should be EventTypeUpdate") + return + } + // Originally it will Merge URL many times, now we just execute once. + // MergeUrl is executed once and put the result into Event. After this, the key will get from Event.Key(). + newUrl := dir.convertUrl(event) + newUrl = common.MergeUrl(newUrl, referenceUrl) + dir.overrideUrl(newUrl) + event.Update(newUrl) + } + // After notify all addresses, do some callback. + defer callback() + func() { + // this lock is work at batch update of InvokeCache + dir.registerLock.Lock() + defer dir.registerLock.Unlock() + // get need clear invokers from original invoker list dir.cacheInvokersMap.Range(func(k, v interface{}) bool { if !dir.eventMatched(k.(string), events) { + // delete unused invoker from cache if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil { oldInvokers = append(oldInvokers, invoker) } } return true }) - } - - for _, event := range events { - logger.Debugf("registry update, result{%s}", event) - if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil { - oldInvokers = append(oldInvokers, oldInvoker) + // get need add invokers from events + for _, event := range events { + // Get the key from Event.Key() + if _, ok := dir.cacheInvokersMap.Load(event.Key()); !ok { + addEvents = append(addEvents, event) + } } - } - - if len(events) > 0 { - dir.setNewInvokers() - } - - // After dir.cacheInvokers is updated,destroy the oldInvoker - // Ensure that no request will enter the oldInvoker + // loop the updateEvents + for _, event := range addEvents { + logger.Debugf("registry update, result{%s}", event) + logger.Infof("selector add service url{%s}", event.Service) + // FIXME: routers are built in every address notification? + dir.configRouters() + if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil { + oldInvokers = append(oldInvokers, oldInvoker) + } + } + }() + dir.setNewInvokers() + // destroy unused invokers for _, invoker := range oldInvokers { - invoker.Destroy() + go invoker.Destroy() } } // eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove. func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool { for _, event := range events { - if dir.invokerCacheKey(event.Service) == key { + if dir.invokerCacheKey(event) == key { return true } } return false } -// invokerCacheKey generates the key in the cache for a given URL. -func (dir *RegistryDirectory) invokerCacheKey(url *common.URL) string { +// invokerCacheKey generates the key in the cache for a given ServiceEvent. +func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) string { + // If the url is merged, then return Event.Key() directly. + if event.Updated() { + return event.Key() + } referenceUrl := dir.GetDirectoryUrl().SubURL - newUrl := common.MergeUrl(url, referenceUrl) + newUrl := common.MergeUrl(event.Service, referenceUrl) + event.Update(newUrl) return newUrl.Key() } @@ -294,30 +355,38 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { newUrl := common.MergeUrl(url, referenceUrl) dir.overrideUrl(newUrl) - if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok { - logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl) - newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl) - if newInvoker != nil { - dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) - } - } else { - // if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy - // the old invoker. - if common.IsEquals(newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) { - return nil - } - - logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) - newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl) - if newInvoker != nil { - dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) - return cacheInvoker.(protocol.Invoker) - } + if v, ok := dir.doCacheInvoker(newUrl); ok { + return v } } return nil } +func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) { + key := newUrl.Key() + if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok { + logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl) + newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl) + if newInvoker != nil { + dir.cacheInvokersMap.Store(key, newInvoker) + } + } else { + // if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy + // the old invoker. + if common.IsEquals(newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) { + return nil, true + } + + logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) + newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl) + if newInvoker != nil { + dir.cacheInvokersMap.Store(key, newInvoker) + return cacheInvoker.(protocol.Invoker), true + } + } + return nil, false +} + // List selected protocol invokers from the directory func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { invokers := dir.cacheInvokers @@ -406,7 +475,7 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { l.BaseConfigurationListener.Process(event) // FIXME: this doesn't trigger dir.overrideUrl() - l.directory.refreshInvokers() + l.directory.refreshInvokers(nil) } type consumerConfigurationListener struct { @@ -433,5 +502,5 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) { l.BaseConfigurationListener.Process(event) // FIXME: this doesn't trigger dir.overrideUrl() - l.directory.refreshInvokers() + l.directory.refreshInvokers(nil) } diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 9008e6ecb79993626bac19a8fcf6870532386b01..b5d81eb0da0b7ae635b8a54e3002a9380a69a2fd 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -192,6 +192,34 @@ func Test_toGroupInvokers(t *testing.T) { assert.Len(t, groupInvokers, 2) } +func Test_RefreshUrl(t *testing.T) { + registryDirectory, mockRegistry := normalRegistryDir() + providerUrl, _ := common.NewURL("dubbo://0.0.0.0:20011/org.apache.dubbo-go.mockService", + common.WithParamsValue(constant.CLUSTER_KEY, "mock1"), + common.WithParamsValue(constant.GROUP_KEY, "group"), + common.WithParamsValue(constant.VERSION_KEY, "1.0.0")) + providerUrl2, _ := common.NewURL("dubbo://0.0.0.0:20012/org.apache.dubbo-go.mockService", + common.WithParamsValue(constant.CLUSTER_KEY, "mock1"), + common.WithParamsValue(constant.GROUP_KEY, "group"), + common.WithParamsValue(constant.VERSION_KEY, "1.0.0")) + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 3) + mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl}) + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 4) + mockRegistry.MockEvents([]*registry.ServiceEvent{®istry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl}}) + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 1) + mockRegistry.MockEvents([]*registry.ServiceEvent{®istry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl}, + ®istry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl2}}) + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 2) + // clear all address + mockRegistry.MockEvents([]*registry.ServiceEvent{}) + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 0) +} + func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) { extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) diff --git a/registry/event.go b/registry/event.go index c93415841b1d144a462e7cf71713882e0bcdd8c6..2ebc375597aef67b7605073b852ce53f68cb58e4 100644 --- a/registry/event.go +++ b/registry/event.go @@ -41,11 +41,36 @@ func init() { type ServiceEvent struct { Action remoting.EventType Service *common.URL + // store the key for Service.Key() + key string + // If the url is updated, such as Merged. + updated bool } // String return the description of event -func (e ServiceEvent) String() string { - return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}}", e.Action, e.Service) +func (e *ServiceEvent) String() string { + return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}, Key{%s}}", e.Action, e.Service, e.key) +} + +// Update() update the url with the merged URL. Work with Updated() can reduce the process of some merging URL. +func (e *ServiceEvent) Update(url *common.URL) { + e.Service = url + e.updated = true +} + +// Updated() check if the url is updated. +// If the serviceEvent is updated, then it don't need merge url again. +func (e *ServiceEvent) Updated() bool { + return e.updated +} + +// Key() generate the key for service.Key(). It is cached once. +func (e *ServiceEvent) Key() string { + if len(e.key) > 0 { + return e.key + } + e.key = e.Service.Key() + return e.key } // ServiceInstancesChangedEvent represents service instances make some changing diff --git a/registry/mock_registry.go b/registry/mock_registry.go index 18c87ee3ace79d98c0d0faa6e915dd64d2fe5362..7c269c3a4044079d19c7c41b53a9ae04b9eab86b 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -18,6 +18,7 @@ package registry import ( + "fmt" "time" ) @@ -32,14 +33,16 @@ import ( // MockRegistry is used as mock registry type MockRegistry struct { - listener *listener - destroyed *atomic.Bool + listener *listener + destroyed *atomic.Bool + allAddress chan []*ServiceEvent } // NewMockRegistry creates a mock registry func NewMockRegistry(url *common.URL) (Registry, error) { registry := &MockRegistry{ - destroyed: atomic.NewBool(false), + destroyed: atomic.NewBool(false), + allAddress: make(chan []*ServiceEvent), } listener := &listener{count: 0, registry: registry, listenChan: make(chan *ServiceEvent)} registry.listener = listener @@ -80,22 +83,12 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) { func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error { go func() { for { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - time.Sleep(time.Duration(3) * time.Second) - return - } - - listener, err := r.subscribe(url) - if err != nil { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - time.Sleep(time.Duration(3) * time.Second) + t, listener := r.checkLoopSubscribe(url) + if t == 0 { continue + } else if t == -1 { + return } - for { serviceEvent, err := listener.Next() if err != nil { @@ -109,6 +102,26 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) } } }() + go func() { + for { + t, _ := r.checkLoopSubscribe(url) + if t == 0 { + continue + } else if t == -1 { + return + } + + for { + select { + case e := <-r.allAddress: + notifyListener.NotifyAll(e, func() { + fmt.Print("notify all ok") + }) + break + } + } + } + }() return nil } @@ -138,3 +151,27 @@ func (*listener) Close() { func (r *MockRegistry) MockEvent(event *ServiceEvent) { r.listener.listenChan <- event } + +// nolint +func (r *MockRegistry) MockEvents(events []*ServiceEvent) { + r.allAddress <- events +} + +func (r *MockRegistry) checkLoopSubscribe(url *common.URL) (int, Listener) { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + time.Sleep(time.Duration(3) * time.Second) + return -1, nil + } + + listener, err := r.subscribe(url) + if err != nil { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return -1, nil + } + time.Sleep(time.Duration(3) * time.Second) + return 0, nil + } + return 1, listener +} diff --git a/registry/nacos/registry_test.go b/registry/nacos/registry_test.go index 43a6da79d13c0964162f60f45ed64a854d0e3a2c..078b8ce59c29acae35ccbefafbb9cbfb3d0b205d 100644 --- a/registry/nacos/registry_test.go +++ b/registry/nacos/registry_test.go @@ -19,9 +19,11 @@ package nacos import ( "encoding/json" + "net/http" "net/url" "strconv" "testing" + "time" ) import ( @@ -35,6 +37,9 @@ import ( ) func TestNacosRegistry_Register(t *testing.T) { + if !checkNacosServerAlive() { + return + } regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) urlMap := url.Values{} urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") @@ -64,6 +69,9 @@ func TestNacosRegistry_Register(t *testing.T) { } func TestNacosRegistry_Subscribe(t *testing.T) { + if !checkNacosServerAlive() { + return + } regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) urlMap := url.Values{} urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") @@ -102,6 +110,9 @@ func TestNacosRegistry_Subscribe(t *testing.T) { } func TestNacosRegistry_Subscribe_del(t *testing.T) { + if !checkNacosServerAlive() { + return + } regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) urlMap := url.Values{} urlMap.Set(constant.GROUP_KEY, "guangzhou-idc") @@ -188,3 +199,11 @@ func TestNacosListener_Close(t *testing.T) { _, err = listener.Next() assert.NotNil(t, err) } + +func checkNacosServerAlive() bool { + c := http.Client{Timeout: time.Second} + if _, err := c.Get("http://console.nacos.io/nacos/"); err != nil { + return false + } + return true +} diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 0e5ad8e6990856aeb0dfdde72f9c3f7fdae3e985..b38e150e51075ce47ee38c9c8c9c7280e4437c53 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -141,7 +141,8 @@ func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.Serv GroupName: n.group, }) if err != nil { - logger.Errorf("Could not query the instances for service: " + serviceName + ", group: " + n.group) + logger.Errorf("Could not query the instances for service: %+v, group: %+v . It happened err %+v", + serviceName, n.group, err) return make([]registry.ServiceInstance, 0, 0) } res := make([]registry.ServiceInstance, 0, len(instances)) diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go index 119be0b3aad3a828470c8c72c775abaada9512c2..3b09136d2e273090bea339e0a15b25dde719b6ac 100644 --- a/registry/nacos/service_discovery_test.go +++ b/registry/nacos/service_discovery_test.go @@ -81,6 +81,9 @@ func TestNacosServiceDiscovery_Destroy(t *testing.T) { } func TestNacosServiceDiscovery_CRUD(t *testing.T) { + if !checkNacosServerAlive() { + return + } prepareData() extension.SetEventDispatcher("mock", func() observer.EventDispatcher { return &dispatcher.MockEventDispatcher{} diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index e3af9ba97270c0c5981d1998c3db415f53a69ee7..3be88c3beb6d909c619ff5c8ff51b90359a31d7b 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -240,18 +240,23 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv } // Notify will be triggered when a service change notification is received. -func (nl *overrideSubscribeListener) Notify(events ...*registry.ServiceEvent) { - if len(events) == 0 { - return - } - - event := events[0] +func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) { if isMatched(event.Service, nl.url) && event.Action == remoting.EventTypeAdd { nl.configurator = extension.GetDefaultConfigurator(event.Service) nl.doOverrideIfNecessary() } } +func (nl *overrideSubscribeListener) NotifyAll(events []*registry.ServiceEvent, callback func()) { + defer callback() + if len(events) == 0 { + return + } + for _, e := range events { + nl.Notify(e) + } +} + func (nl *overrideSubscribeListener) doOverrideIfNecessary() { providerUrl := getProviderUrl(nl.originInvoker) key := getCacheKey(providerUrl) diff --git a/registry/registry.go b/registry/registry.go index 73940fa12acd195bd24c05b4ef22af70b92fc659..439178390a2fcedba3bd1c9919d9a797f44e3a21 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -72,7 +72,12 @@ type NotifyListener interface { // events are passed in, it's considered as a complete list, on the other side, if one single event is // passed in, then it's a incremental event. Pls. note when a list (instead of single event) comes, // the impl of NotifyListener may abandon the accumulated result from previous notifications. - Notify(...*ServiceEvent) + Notify(*ServiceEvent) + // NotifyAll the events are complete Service Event List. + // The argument of events []*ServiceEvent is equal to urls []*URL, The Action of serviceEvent should be EventTypeUpdate. + // If your registry center can only get all urls but can't get individual event, you should use this one. + // After notify the address, the callback func will be invoked. + NotifyAll([]*ServiceEvent, func()) } // Listener Deprecated!