diff --git a/common/constant/key.go b/common/constant/key.go index 6acb2299c4049cb54ae6f40a1ef3a0f16410aed6..c07399111234723cfe94a370fb0018a563e64406 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -216,9 +216,9 @@ const ( // consumer CONSUMER = "consumer" // key of access key id - ACCESS_KEY_ID_KEY = "accessKeyId" + ACCESS_KEY_ID_KEY = ".accessKeyId" // key of secret access key - SECRET_ACCESS_KEY_KEY = "secretAccessKey" + SECRET_ACCESS_KEY_KEY = ".secretAccessKey" ) // metadata report diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go index 11e137a14be574f0607f146def873552ebf1a501..fa195d09d7efe022be9bdf40658e355a44b8705e 100644 --- a/metadata/definition/definition.go +++ b/metadata/definition/definition.go @@ -29,7 +29,7 @@ import ( "github.com/apache/dubbo-go/common/constant" ) -// ServiceDefinition is a interface of service's definition +// ServiceDefiner is a interface of service's definition type ServiceDefiner interface { ToBytes() ([]byte, error) } @@ -42,11 +42,11 @@ type ServiceDefinition struct { Types []TypeDefinition } -func (def ServiceDefinition) ToBytes() ([]byte, error) { +func (def *ServiceDefinition) ToBytes() ([]byte, error) { return json.Marshal(def) } -func (def ServiceDefinition) String() string { +func (def *ServiceDefinition) String() string { var methodStr strings.Builder for _, m := range def.Methods { var paramType strings.Builder @@ -91,8 +91,8 @@ type TypeDefinition struct { } // BuildServiceDefinition can build service definition which will be used to describe a service -func BuildServiceDefinition(service common.Service, url common.URL) ServiceDefinition { - sd := ServiceDefinition{} +func BuildServiceDefinition(service common.Service, url common.URL) *ServiceDefinition { + sd := &ServiceDefinition{} sd.CanonicalName = url.Service() for k, m := range service.Method() { diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go index 4e3995d2ea88ff6f5e06bd02e420c690bb2fa590..cb7e42030b2dec32b0537b20e2f825e638f228d0 100644 --- a/metadata/report/delegate/delegate_report.go +++ b/metadata/report/delegate/delegate_report.go @@ -19,6 +19,7 @@ package delegate import ( "encoding/json" + "runtime/debug" "sync" "time" ) @@ -129,7 +130,7 @@ func NewMetadataReport() (*MetadataReport, error) { scheduler := gocron.NewScheduler(time.UTC) _, err := scheduler.Every(1).Day().Do( func() { - logger.Info("start to publish all metadata.") + logger.Info("start to publish all metadata in metadata report %v.", url) bmr.allMetadataReportsLock.RLock() bmr.doHandlerMetadataCollection(bmr.allMetadataReports) bmr.allMetadataReportsLock.RUnlock() @@ -145,30 +146,30 @@ func NewMetadataReport() (*MetadataReport, error) { } // retry will do metadata failed reports collection by call metadata report sdk -func (bmr *MetadataReport) retry() bool { - bmr.failedReportsLock.RLock() - defer bmr.failedReportsLock.RUnlock() - return bmr.doHandlerMetadataCollection(bmr.failedReports) +func (mr *MetadataReport) retry() bool { + mr.failedReportsLock.RLock() + defer mr.failedReportsLock.RUnlock() + return mr.doHandlerMetadataCollection(mr.failedReports) } // StoreProviderMetadata will delegate to call remote metadata's sdk to store provider service definition -func (bmr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) { - if bmr.syncReport { - bmr.storeMetadataTask(common.PROVIDER, identifier, definer) +func (mr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) { + if mr.syncReport { + mr.storeMetadataTask(common.PROVIDER, identifier, definer) } - go bmr.storeMetadataTask(common.PROVIDER, identifier, definer) + go mr.storeMetadataTask(common.PROVIDER, identifier, definer) } // storeMetadataTask will delegate to call remote metadata's sdk to store -func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) { +func (mr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) { logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer) - bmr.allMetadataReportsLock.Lock() - bmr.allMetadataReports[identifier] = definer - bmr.allMetadataReportsLock.Unlock() + mr.allMetadataReportsLock.Lock() + mr.allMetadataReports[identifier] = definer + mr.allMetadataReportsLock.Unlock() - bmr.failedReportsLock.Lock() - delete(bmr.failedReports, identifier) - bmr.failedReportsLock.Unlock() + mr.failedReportsLock.Lock() + delete(mr.failedReports, identifier) + mr.failedReportsLock.Unlock() // data is store the json marshaled definition var ( data []byte @@ -177,17 +178,18 @@ func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.Me defer func() { if r := recover(); r != nil { - bmr.failedReportsLock.Lock() - bmr.failedReports[identifier] = definer - bmr.failedReportsLock.Unlock() - bmr.metadataReportRetry.startRetryTask() - logger.Errorf("Failed to put provider metadata %v in %v, cause: %v", identifier, string(data), r) + mr.failedReportsLock.Lock() + mr.failedReports[identifier] = definer + mr.failedReportsLock.Unlock() + mr.metadataReportRetry.startRetryTask() + logger.Errorf("Failed to put provider metadata %v in %v, cause: %v\n%s\n", + identifier, string(data), r, string(debug.Stack())) } }() data, err = json.Marshal(definer) if err != nil { - logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %v", err) + logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %+v", err) panic(err) } report := instance.GetMetadataReportInstance() @@ -198,23 +200,23 @@ func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.Me } if err != nil { - logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %v", err) + logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %+v", err) panic(err) } } // StoreConsumerMetadata will delegate to call remote metadata's sdk to store consumer side service definition -func (bmr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) { - if bmr.syncReport { - bmr.storeMetadataTask(common.CONSUMER, identifier, definer) +func (mr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) { + if mr.syncReport { + mr.storeMetadataTask(common.CONSUMER, identifier, definer) } - go bmr.storeMetadataTask(common.CONSUMER, identifier, definer) + go mr.storeMetadataTask(common.CONSUMER, identifier, definer) } // SaveServiceMetadata will delegate to call remote metadata's sdk to save service metadata -func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error { +func (mr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error { report := instance.GetMetadataReportInstance() - if bmr.syncReport { + if mr.syncReport { return report.SaveServiceMetadata(identifier, url) } go report.SaveServiceMetadata(identifier, url) @@ -222,9 +224,9 @@ func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMet } // RemoveServiceMetadata will delegate to call remote metadata's sdk to remove service metadata -func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error { +func (mr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error { report := instance.GetMetadataReportInstance() - if bmr.syncReport { + if mr.syncReport { return report.RemoveServiceMetadata(identifier) } go report.RemoveServiceMetadata(identifier) @@ -232,15 +234,15 @@ func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceM } // GetExportedURLs will delegate to call remote metadata's sdk to get exported urls -func (bmr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string { +func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string { report := instance.GetMetadataReportInstance() return report.GetExportedURLs(identifier) } // SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data -func (bmr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { +func (mr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { report := instance.GetMetadataReportInstance() - if bmr.syncReport { + if mr.syncReport { return report.SaveSubscribedData(identifier, urls) } go report.SaveSubscribedData(identifier, urls) @@ -260,15 +262,15 @@ func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdenti } // doHandlerMetadataCollection will store metadata to metadata support with given metadataMap -func (bmr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool { +func (mr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool { if len(metadataMap) == 0 { return true } for e := range metadataMap { if common.RoleType(common.PROVIDER).Role() == e.Side { - bmr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition)) + mr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition)) } else if common.RoleType(common.CONSUMER).Role() == e.Side { - bmr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string)) + mr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string)) } } return false diff --git a/metadata/report/delegate/delegate_report_test.go b/metadata/report/delegate/delegate_report_test.go index 0e8da607004e213e9bc0220a97af1664f5e431e2..04c9e6483929d3ed58fd85337db6ccb4ebd53d00 100644 --- a/metadata/report/delegate/delegate_report_test.go +++ b/metadata/report/delegate/delegate_report_test.go @@ -106,7 +106,7 @@ func TestMetadataReport_StoreProviderMetadata(t *testing.T) { mtr.StoreProviderMetadata(metadataId, getMockDefinition(metadataId, t)) } -func getMockDefinition(id *identifier.MetadataIdentifier, t *testing.T) definition.ServiceDefinition { +func getMockDefinition(id *identifier.MetadataIdentifier, t *testing.T) *definition.ServiceDefinition { protocol := "dubbo" beanName := "UserProvider" url, err := common.NewURL(fmt.Sprintf( diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index f4587638ef45d414f9b74007720cf7a628624415..f55c482ad846d801e57e2a98436161c6c70165c4 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -132,7 +132,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR mts.exportedRevision.Store(exportedRevision) urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "") if err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) result = false } iterator := urls.Iter(inmemory.Comparator{}) @@ -145,7 +145,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR id := identifier.NewServiceMetadataIdentifier(common.URL(url)) id.Revision = mts.exportedRevision.Load() if err := mts.delegateReport.SaveServiceMetadata(id, common.URL(url)); err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) result = false } } @@ -155,7 +155,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR mts.subscribedRevision.Store(subscribedRevision) urls, err := mts.inMemoryMetadataService.GetSubscribedURLs() if err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err) result = false } if urls != nil && urls.Len() > 0 { @@ -166,7 +166,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR Revision: subscribedRevision, } if err := mts.delegateReport.SaveSubscribedData(id, convertUrls(urls)); err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v", err) + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) result = false } } diff --git a/registry/base_registry.go b/registry/base_registry.go index 3e1bddf233310871182544b6415c10c8df27e622..ad1a3b61741e003625612ad58409eb8615271a84 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -56,6 +56,8 @@ func init() { localIP, _ = gxnet.GetLocalIP() } +type createPathFunc func(dubboPath string) error + /* * -----------------------------------NOTICE--------------------------------------------- * If there is no special case, you'd better inherit BaseRegistry and implement the @@ -74,8 +76,12 @@ type FacadeBasedRegistry interface { CreatePath(string) error // DoRegister actually do the register job DoRegister(string, string) error + // DoUnregister do the unregister job + DoUnregister(string, string) error // DoSubscribe actually subscribe the URL DoSubscribe(conf *common.URL) (Listener, error) + // DoUnsubscribe does unsubscribe the URL + DoUnsubscribe(conf *common.URL) (Listener, error) // CloseAndNilClient close the client and then reset the client in registry to nil // you should notice that this method will be invoked inside a lock. // So you should implement this method as light weighted as you can. @@ -94,7 +100,7 @@ type BaseRegistry struct { birth int64 // time of file birth, seconds since Epoch; 0 if unknown wg sync.WaitGroup // wg+done for zk restart done chan struct{} - cltLock sync.Mutex //ctl lock is a lock for services map + cltLock sync.RWMutex //ctl lock is a lock for services map services map[string]common.URL // service name + protocol -> service config, for store the service registered } @@ -154,6 +160,43 @@ func (r *BaseRegistry) Register(conf common.URL) error { return nil } +// UnRegister implement interface registry to unregister +func (r *BaseRegistry) UnRegister(conf common.URL) error { + var ( + ok bool + err error + oldURL common.URL + ) + + func() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + oldURL, ok = r.services[conf.Key()] + + if !ok { + err = perrors.Errorf("Path{%s} has not registered", conf.Key()) + } + + delete(r.services, conf.Key()) + }() + + if err != nil { + return err + } + + err = r.unregister(conf) + if err != nil { + func() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + r.services[conf.Key()] = oldURL + }() + return perrors.WithMessagef(err, "register(conf:%+v)", conf) + } + + return nil +} + // service is for getting service path stored in url func (r *BaseRegistry) service(c common.URL) string { return url.QueryEscape(c.Service()) @@ -189,6 +232,18 @@ func (r *BaseRegistry) RestartCallBack() bool { // register for register url to registry, include init params func (r *BaseRegistry) register(c common.URL) error { + return r.processURL(c, r.facadeBasedRegistry.DoRegister, r.createPath) +} + +// unregister for unregister url to registry, include init params +func (r *BaseRegistry) unregister(c common.URL) error { + return r.processURL(c, r.facadeBasedRegistry.DoUnregister, nil) +} + +func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error, cpf createPathFunc) error { + if f == nil { + panic(" Must provide a `function(string, string) error` to process URL. ") + } var ( err error //revision string @@ -213,15 +268,15 @@ func (r *BaseRegistry) register(c common.URL) error { switch role { case common.PROVIDER: - dubboPath, rawURL, err = r.providerRegistry(c, params) + dubboPath, rawURL, err = r.providerRegistry(c, params, cpf) case common.CONSUMER: - dubboPath, rawURL, err = r.consumerRegistry(c, params) + dubboPath, rawURL, err = r.consumerRegistry(c, params, cpf) default: return perrors.Errorf("@c{%v} type is not referencer or provider", c) } encodedURL = url.QueryEscape(rawURL) dubboPath = strings.ReplaceAll(dubboPath, "$", "%24") - err = r.facadeBasedRegistry.DoRegister(dubboPath, encodedURL) + err = f(dubboPath, encodedURL) if err != nil { return perrors.WithMessagef(err, "register Node(path:%s, url:%s)", dubboPath, rawURL) @@ -229,8 +284,15 @@ func (r *BaseRegistry) register(c common.URL) error { return nil } +// createPath will create dubbo path in register +func (r *BaseRegistry) createPath(dubboPath string) error { + r.cltLock.Lock() + defer r.cltLock.Unlock() + return r.facadeBasedRegistry.CreatePath(dubboPath) +} + // providerRegistry for provider role do -func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string, string, error) { +func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values, f createPathFunc) (string, string, error) { var ( dubboPath string rawURL string @@ -240,11 +302,9 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) } dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - }() + if f != nil { + err = f(dubboPath) + } if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err)) return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath) @@ -274,7 +334,7 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string } // consumerRegistry for consumer role do -func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string, string, error) { +func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values, f createPathFunc) (string, string, error) { var ( dubboPath string rawURL string @@ -282,23 +342,18 @@ func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string ) dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER]) - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - - }() + if f != nil { + err = f(dubboPath) + } if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) return "", "", perrors.WithStack(err) } dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - }() + if f != nil { + err = f(dubboPath) + } if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) @@ -323,20 +378,20 @@ func sleepWait(n int) { } // Subscribe :subscribe from registry, event will notify by notifyListener -func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { +func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error { n := 0 for { n++ if !r.IsAvailable() { logger.Warnf("event listener game over.") - return + return perrors.New("BaseRegistry is not available.") } listener, err := r.facadeBasedRegistry.DoSubscribe(url) if err != nil { if !r.IsAvailable() { logger.Warnf("event listener game over.") - return + return err } logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) time.Sleep(time.Duration(RegistryConnDelay) * time.Second) @@ -358,6 +413,37 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) } } +// UnSubscribe URL +func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return perrors.New("BaseRegistry is not available.") + } + + listener, err := r.facadeBasedRegistry.DoUnsubscribe(url) + if err != nil { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return perrors.New("BaseRegistry is not available.") + } + logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) + return perrors.WithStack(err) + } + + for { + if serviceEvent, err := listener.Next(); err != nil { + logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) + listener.Close() + break + } else { + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) + } + + } + return nil +} + // closeRegisters close and remove registry client and reset services map func (r *BaseRegistry) closeRegisters() { logger.Infof("begin to close provider client") diff --git a/registry/consul/registry.go b/registry/consul/registry.go index c5b8510a6c87068a5b4f1ce52203d401a896a6c2..c9e0718346258b6b38f2a793dc215bcf8e65cdb7 100644 --- a/registry/consul/registry.go +++ b/registry/consul/registry.go @@ -95,7 +95,7 @@ func (r *consulRegistry) register(url common.URL) error { return r.client.Agent().ServiceRegister(service) } -func (r *consulRegistry) Unregister(url common.URL) error { +func (r *consulRegistry) UnRegister(url common.URL) error { var err error role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) @@ -112,11 +112,17 @@ func (r *consulRegistry) unregister(url common.URL) error { return r.client.Agent().ServiceDeregister(buildId(url)) } -func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { +func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error { role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) if role == common.CONSUMER { r.subscribe(url, notifyListener) } + return nil +} + +// UnSubscribe : +func (r *consulRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error { + return perrors.New("UnSubscribe not support in consulRegistry") } func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.NotifyListener) { diff --git a/registry/consul/registry_test.go b/registry/consul/registry_test.go index bb6842cd8fb67dd2cc70b1a7530fbb94f618a9b0..94718f5ab657c198882f065a50e5d5a2c9d4bc6f 100644 --- a/registry/consul/registry_test.go +++ b/registry/consul/registry_test.go @@ -44,7 +44,7 @@ func (suite *consulRegistryTestSuite) testRegister() { func (suite *consulRegistryTestSuite) testUnregister() { consulProviderRegistry, _ := suite.providerRegistry.(*consulRegistry) - err := consulProviderRegistry.Unregister(suite.providerUrl) + err := consulProviderRegistry.UnRegister(suite.providerUrl) assert.NoError(suite.t, err) } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 5d389c36374fe9de5561418bc90d44a7d780fd48..a65d090349b40d473c769e3130e4f000ee03bd00 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -114,6 +114,10 @@ func (r *etcdV3Registry) DoRegister(root string, node string) error { return r.client.Create(path.Join(root, node), "") } +func (r *etcdV3Registry) DoUnregister(root string, node string) error { + return perrors.New("DoUnregister is not support in etcdV3Registry") +} + func (r *etcdV3Registry) CloseAndNilClient() { r.client.Close() r.client = nil @@ -168,3 +172,7 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) return configListener, nil } + +func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { + return nil, perrors.New("DoUnsubscribe is not support in etcdV3Registry") +} diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index 8a02d0e3e693b58946a97e7b47238e0be4272dcf..7ee0f6b0eeb83181bfd20e1abe4685e8319cd09b 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -107,6 +107,10 @@ func (r *kubernetesRegistry) DoRegister(root string, node string) error { return r.client.Create(path.Join(root, node), "") } +func (r *kubernetesRegistry) DoUnregister(root string, node string) error { + return perrors.New("DoUnregister is not support in kubernetesRegistry") +} + func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) { var ( @@ -139,6 +143,10 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er return configListener, nil } +func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { + return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry") +} + func (r *kubernetesRegistry) InitListeners() { r.listener = kubernetes.NewEventListener(r.client) r.configListener = NewConfigurationListener(r) diff --git a/registry/mock_registry.go b/registry/mock_registry.go index 9591928eebd22bf2a99ec9dcfeb285c4519a3b90..f39490a26755a96aab1438d965bd8ee6fc75006f 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -51,6 +51,11 @@ func (*MockRegistry) Register(url common.URL) error { return nil } +// UnRegister +func (r *MockRegistry) UnRegister(conf common.URL) error { + return nil +} + // Destroy ... func (r *MockRegistry) Destroy() { if r.destroyed.CAS(false, true) { @@ -72,7 +77,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) { } // Subscribe ... -func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { +func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error { go func() { for { if !r.IsAvailable() { @@ -104,6 +109,12 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) } } }() + return nil +} + +// UnSubscribe : +func (r *MockRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error { + return nil } type listener struct { diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index a436b85064829b9f42c9dcc45545e5bf2fd2fefe..c98bbc7843d4317d9f7d74040481052b28c0f493 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -136,23 +136,28 @@ func (nr *nacosRegistry) Register(url common.URL) error { return nil } +// UnRegister +func (nr *nacosRegistry) UnRegister(conf common.URL) error { + return perrors.New("UnRegister is not support in nacosRegistry") +} + func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) { return NewNacosListener(*conf, nr.namingClient) } //subscribe from registry -func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { +func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error { for { if !nr.IsAvailable() { logger.Warnf("event listener game over.") - return + return perrors.New("nacosRegistry is not available.") } listener, err := nr.subscribe(url) if err != nil { if !nr.IsAvailable() { logger.Warnf("event listener game over.") - return + return err } logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) time.Sleep(time.Duration(RegistryConnDelay) * time.Second) @@ -164,7 +169,7 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti if err != nil { logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) listener.Close() - return + return err } logger.Infof("update begin, service event: %v", serviceEvent.String()) @@ -172,6 +177,12 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti } } + return nil +} + +// UnSubscribe : +func (nr *nacosRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error { + return perrors.New("UnSubscribe not support in nacosRegistry") } func (nr *nacosRegistry) GetUrl() common.URL { diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 52a7dcbfc77fd576ef8d2917ce51cc09f3cd0b97..aa8fbcbe7d6eca682892d4627878fe6bfc3756fe 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -22,9 +22,8 @@ import ( "strings" "sync" ) - import ( - "github.com/dubbogo/gost/container/set" + gxset "github.com/dubbogo/gost/container/set" ) import ( @@ -96,8 +95,24 @@ func getRegistry(regUrl *common.URL) registry.Registry { func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common.URL { if registryUrl.GetParamBool("simplified", false) { return providerUrl.CloneWithParams(reserveParams) + } else { + return filterHideKey(providerUrl) + } +} + +// filterHideKey filter the parameters that do not need to be output in url(Starting with .) +func filterHideKey(url *common.URL) *common.URL { + + //be careful params maps in url is map type + cloneURL := url.Clone() + removeSet := gxset.NewSet() + for k, _ := range cloneURL.GetParams() { + if strings.HasPrefix(k, ".") { + removeSet.Add(k) + } } - return providerUrl + cloneURL.RemoveParams(removeSet) + return cloneURL } func (proto *registryProtocol) initConfigurationListeners() { diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index cee2a6a625368f655d1b9bc5fe8cc37031e1aef7..15fd3cacfacad36309e0ad4deb3c7c7441e47e26 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -284,3 +284,12 @@ func TestExportWithApplicationConfig(t *testing.T) { v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl)) assert.NotNil(t, v2) } + +func TestGetProviderUrlWithHideKey(t *testing.T) { + url, _ := common.NewURL("dubbo://127.0.0.1:1111?a=a1&b=b1&.c=c1&.d=d1&e=e1&protocol=registry") + providerUrl := getUrlToRegistry(&url, &url) + assert.NotContains(t, providerUrl.GetParams(), ".c") + assert.NotContains(t, providerUrl.GetParams(), ".d") + assert.Contains(t, providerUrl.GetParams(), "a") + +} diff --git a/registry/registry.go b/registry/registry.go index d673864700e6ba99e8f0283247d53760b85598aa..74e63aa66ebdc674261ce4109b27a067ce769007 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -34,6 +34,12 @@ type Registry interface { //And it is also used for service consumer calling , register services cared about ,for dubbo's admin monitoring. Register(url common.URL) error + // UnRegister is required to support the contract: + // 1. If it is the persistent stored data of dynamic=false, the registration data can not be found, then the IllegalStateException is thrown, otherwise it is ignored. + // 2. Unregister according to the full url match. + // url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin + UnRegister(url common.URL) error + //When creating new registry extension,pls select one of the following modes. //Will remove in dubbogo version v1.1.0 //mode1 : return Listener with Next function which can return subscribe service event from registry @@ -42,7 +48,14 @@ type Registry interface { //Will relace mode1 in dubbogo version v1.1.0 //mode2 : callback mode, subscribe with notify(notify listener). - Subscribe(*common.URL, NotifyListener) + Subscribe(*common.URL, NotifyListener) error + + // UnSubscribe is required to support the contract: + // 1. If don't subscribe, ignore it directly. + // 2. Unsubscribe by full URL match. + // url Subscription condition, not allowed to be empty, e.g. consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin + // listener A listener of the change event, not allowed to be empty + UnSubscribe(*common.URL, NotifyListener) error } // NotifyListener ... diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index c5b2f33c6107e82aa172c818c0d8aca1483248c6..ec82fa0309118fba4b5c21772d4dfd356f3b0c5c 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -37,7 +37,7 @@ import ( // RegistryDataListener contains all URL information subscribed by zookeeper registry type RegistryDataListener struct { - subscribed map[*common.URL]config_center.ConfigurationListener + subscribed map[string]config_center.ConfigurationListener mutex sync.Mutex closed bool } @@ -45,7 +45,7 @@ type RegistryDataListener struct { // NewRegistryDataListener constructs a new RegistryDataListener func NewRegistryDataListener() *RegistryDataListener { return &RegistryDataListener{ - subscribed: make(map[*common.URL]config_center.ConfigurationListener)} + subscribed: make(map[string]config_center.ConfigurationListener)} } // SubscribeURL is used to set a watch listener for url @@ -53,7 +53,17 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen if l.closed { return } - l.subscribed[url] = listener + l.subscribed[url.ServiceKey()] = listener +} + +// UnSubscribeURL is used to set a watch listener for url +func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener { + if l.closed { + return nil + } + listener := l.subscribed[url.ServiceKey()] + delete(l.subscribed, url.ServiceKey()) + return listener } // DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing @@ -75,8 +85,8 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { if l.closed { return false } - for url, listener := range l.subscribed { - if serviceURL.URLEqual(*url) { + for serviceKey, listener := range l.subscribed { + if serviceURL.ServiceKey() == serviceKey { listener.Process( &config_center.ConfigChangeEvent{ Key: eventType.Path, @@ -101,18 +111,25 @@ func (l *RegistryDataListener) Close() { // RegistryConfigurationListener represent the processor of zookeeper watcher type RegistryConfigurationListener struct { - client *zk.ZookeeperClient - registry *zkRegistry - events chan *config_center.ConfigChangeEvent - isClosed bool - close chan struct{} - closeOnce sync.Once + client *zk.ZookeeperClient + registry *zkRegistry + events chan *config_center.ConfigChangeEvent + isClosed bool + close chan struct{} + closeOnce sync.Once + subscribeURL *common.URL } // NewRegistryConfigurationListener for listening the event of zk. -func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { +func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry, conf *common.URL) *RegistryConfigurationListener { reg.WaitGroup().Add(1) - return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false, close: make(chan struct{}, 1)} + return &RegistryConfigurationListener{ + client: client, + registry: reg, + events: make(chan *config_center.ConfigChangeEvent, 32), + isClosed: false, + close: make(chan struct{}, 1), + subscribeURL: conf} } // Process submit the ConfigChangeEvent to the event chan to notify all observer diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 88d5d6221b4bc7136ba4c3e7c95fb53ba35a9a58..5d5f9e0526b7b8a9c5a2e2524f27f03573d758a8 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -20,6 +20,7 @@ package zookeeper import ( "fmt" "net/url" + "path" "sync" "time" ) @@ -128,12 +129,17 @@ func (r *zkRegistry) InitListeners() { recoverd := r.dataListener.subscribed if recoverd != nil && len(recoverd) > 0 { // recover all subscribed url - for conf, oldListener := range recoverd { - if regConfigListener, ok := oldListener.(*RegistryConfigurationListener); ok { + for _, oldListener := range recoverd { + var ( + regConfigListener *RegistryConfigurationListener + ok bool + ) + + if regConfigListener, ok = oldListener.(*RegistryConfigurationListener); ok { regConfigListener.Close() } - newDataListener.SubscribeURL(conf, NewRegistryConfigurationListener(r.client, r)) - go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), newDataListener) + newDataListener.SubscribeURL(regConfigListener.subscribeURL, NewRegistryConfigurationListener(r.client, r, regConfigListener.subscribeURL)) + go r.listener.ListenServiceEvent(regConfigListener.subscribeURL, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(regConfigListener.subscribeURL.Service())), newDataListener) } } @@ -149,10 +155,23 @@ func (r *zkRegistry) DoRegister(root string, node string) error { return r.registerTempZookeeperNode(root, node) } +func (r *zkRegistry) DoUnregister(root string, node string) error { + r.cltLock.Lock() + defer r.cltLock.Unlock() + if !r.ZkClient().ZkConnValid() { + return perrors.Errorf("zk client is not valid.") + } + return r.ZkClient().Delete(path.Join(root, node)) +} + func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) { return r.getListener(conf) } +func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { + return r.getCloseListener(conf) +} + func (r *zkRegistry) CloseAndNilClient() { r.client.Close() r.client = nil @@ -217,9 +236,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen dataListener := r.dataListener dataListener.mutex.Lock() defer dataListener.mutex.Unlock() - if r.dataListener.subscribed[conf] != nil { + if r.dataListener.subscribed[conf.ServiceKey()] != nil { - zkListener, _ := r.dataListener.subscribed[conf].(*RegistryConfigurationListener) + zkListener, _ := r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener) if zkListener != nil { r.listenerLock.Lock() defer r.listenerLock.Unlock() @@ -231,7 +250,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen } } - zkListener = NewRegistryConfigurationListener(r.client, r) + zkListener = NewRegistryConfigurationListener(r.client, r, conf) if r.listener == nil { r.cltLock.Lock() client := r.client @@ -255,3 +274,37 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen return zkListener, nil } + +func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationListener, error) { + + var zkListener *RegistryConfigurationListener + r.dataListener.mutex.Lock() + configurationListener := r.dataListener.subscribed[conf.ServiceKey()] + if configurationListener != nil { + + zkListener, _ := configurationListener.(*RegistryConfigurationListener) + if zkListener != nil { + if zkListener.isClosed { + return nil, perrors.New("configListener already been closed") + } + } + } + + zkListener = r.dataListener.UnSubscribeURL(conf).(*RegistryConfigurationListener) + r.dataListener.mutex.Unlock() + + if r.listener == nil { + return nil, perrors.New("listener is null can not close.") + } + + //Interested register to dataconfig. + r.listenerLock.Lock() + listener := r.listener + r.listener = nil + r.listenerLock.Unlock() + + r.dataListener.Close() + listener.Close() + + return zkListener, nil +} diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index 688deccfbec67771c4071f6307802a16e4e0fc8b..d915fc2ce10359f0dd1970daf019746ce066f511 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -45,6 +45,31 @@ func Test_Register(t *testing.T) { assert.NoError(t, err) } +func Test_UnRegister(t *testing.T) { + // register + regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + + ts, reg, _ := newMockZkRegistry(®url) + defer ts.Stop() + err := reg.Register(url) + children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") + assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children) + assert.NoError(t, err) + + err = reg.UnRegister(url) + children, err = reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") + assert.Equal(t, 0, len(children)) + assert.Error(t, err) + assert.True(t, reg.IsAvailable()) + + err = reg.Register(url) + children, _ = reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") + assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children) + assert.NoError(t, err) + +} + func Test_Subscribe(t *testing.T) { regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) @@ -74,6 +99,39 @@ func Test_Subscribe(t *testing.T) { defer ts.Stop() } +func Test_UnSubscribe(t *testing.T) { + regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + ts, reg, _ := newMockZkRegistry(®url) + + //provider register + err := reg.Register(url) + assert.NoError(t, err) + + if err != nil { + return + } + + //consumer register + regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + _, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) + + reg2.Register(url) + listener, _ := reg2.DoSubscribe(&url) + + serviceEvent, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + return + } + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String()) + + reg2.UnSubscribe(&url, nil) + assert.Nil(t, reg2.listener) + + defer ts.Stop() +} + func Test_ConsumerDestory(t *testing.T) { regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))) url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 84877667763ce870e76202844e9dc9dc1c3f008c..097106acf6b44d03708362d587b5faa8281edeab 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -314,7 +314,8 @@ func (l *ZkEventListener) valid() bool { return l.client.ZkConnValid() } -// Close ... +// Close will let client listen exit func (l *ZkEventListener) Close() { + close(l.client.exit) l.wg.Wait() }