diff --git a/common/constant/key.go b/common/constant/key.go index 0515094f285a4bf598b04e2ea1ef376325de7ac1..24b1aa781fd54bb8b769a78ad9eb2ed1287f2ead 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -314,17 +314,18 @@ const ( // service discovery const ( - SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services" - PROVIDER_BY = "provided-by" - EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision" - SUBSCRIBED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.subscribed-services.revision" - SERVICE_INSTANCE_SELECTOR = "service-instance-selector" - METADATA_STORAGE_TYPE_PROPERTY_NAME = "dubbo.metadata.storage-type" - DEFAULT_METADATA_STORAGE_TYPE = "local" - SERVICE_INSTANCE_ENDPOINTS = "dubbo.endpoints" - METADATA_SERVICE_PREFIX = "dubbo.metadata-service." - METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params" - METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls" + SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services" + PROVIDER_BY = "provided-by" + EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.metadata.revision" + //SUBSCRIBED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.subscribed-services.revision" + SERVICE_INSTANCE_SELECTOR = "service-instance-selector" + METADATA_STORAGE_TYPE_PROPERTY_NAME = "dubbo.metadata.storage-type" + DEFAULT_METADATA_STORAGE_TYPE = "local" + REMOTE_METADATA_STORAGE_TYPE = "remote" + SERVICE_INSTANCE_ENDPOINTS = "dubbo.endpoints" + METADATA_SERVICE_PREFIX = "dubbo.metadata-service." + METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params" + METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls" // SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used SERVICE_DISCOVERY_KEY = "service_discovery" diff --git a/common/metadata_info.go b/common/metadata_info.go new file mode 100644 index 0000000000000000000000000000000000000000..9e51e2426bee71be1ec126fcdc82732b3659976b --- /dev/null +++ b/common/metadata_info.go @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "fmt" + "github.com/apache/dubbo-go/common/constant" + gxset "github.com/dubbogo/gost/container/set" + "go.uber.org/atomic" + "hash/crc32" + "net/url" + "sort" + "strings" +) + +var IncludeKeys = gxset.NewSet( + constant.APPLICATION_KEY, + constant.GROUP_KEY, constant.TIMESTAMP_KEY, constant.SERIALIZATION_KEY, constant.CLUSTER_KEY, + constant.LOADBALANCE_KEY, constant.PATH_KEY, constant.TIMEOUT_KEY, + constant.TOKEN_KEY, constant.VERSION_KEY, constant.WARMUP_KEY, + constant.WEIGHT_KEY, constant.RELEASE_KEY) + +type MetadataInfo struct { + App string `json:"app"` + Revision string `json:"revision"` + Services map[string]*ServiceInfo `json:"services"` + + reported *atomic.Bool `json:"-"` +} + +func NewMetadataInfWithApp(app string) *MetadataInfo { + return NewMetadataInfo(app, "", make(map[string]*ServiceInfo)) +} + +func NewMetadataInfo(app string, revision string, services map[string]*ServiceInfo) *MetadataInfo { + return &MetadataInfo{ + App: app, + Revision: revision, + Services: services, + reported: atomic.NewBool(false), + } +} + +// CalAndGetRevision is different from Dubbo because golang doesn't support overload +// so that we could use interface + method name as identifier and ignore the method params +// per my understanding, it's enough because Dubbo actually ignore the url params. +// please refer org.apache.dubbo.common.URL#toParameterString(java.lang.String...) +func (mi *MetadataInfo) CalAndGetRevision() string { + if mi.Revision != "" && mi.reported.Load() { + return mi.Revision + } + if len(mi.Services) == 0 { + return "0" + } + candidates := make([]string, 8) + + for _, s := range mi.Services { + sk := s.serviceKey + ms := s.GetMethods() + if len(ms) == 0 { + candidates = append(candidates, sk) + } else { + for _, m := range ms { + // methods are part of candidates + candidates = append(candidates, sk+constant.KEY_SEPARATOR+m) + } + } + + // append url params if we need it + } + sort.Strings(candidates) + + // it's nearly impossible to be overflow + res := uint64(0) + for _, c := range candidates { + res += uint64(crc32.ChecksumIEEE([]byte(c))) + } + mi.Revision = fmt.Sprint(res) + return mi.Revision + +} + +func (mi *MetadataInfo) HasReported() bool { + return mi.reported.Load() +} + +func (mi *MetadataInfo) MarkReported() { + mi.reported.CAS(false, true) +} + +func (mi *MetadataInfo) AddService(service *ServiceInfo) { + if service == nil { + return + } + mi.Services[service.GetMatchKey()] = service +} + +func (mi *MetadataInfo) RemoveService(service *ServiceInfo) { + if service == nil { + return + } + delete(mi.Services, service.matchKey) +} + +type ServiceInfo struct { + Name string `json:"name"` + Group string `json:"group"` + Version string `json:"version"` + Protocol string `json:"protocol"` + Path string `json:"path"` + Params map[string]string `json:"params"` + + serviceKey string `json:"-"` + matchKey string `json:"-"` + url *URL `json:"-"` +} + +func NewServiceInfoWithUrl(url *URL) *ServiceInfo { + service := NewServiceInfo(url.Service(), url.Group(), url.Version(), url.Protocol, url.Path, nil) + service.url = url + // TODO includeKeys load dynamic + p := make(map[string]string, 8) + for _, keyInter := range IncludeKeys.Values() { + key := keyInter.(string) + value := url.GetParam(key, "") + if len(value) != 0 { + p[key] = value + } + for _, method := range url.Methods { + value = url.GetMethodParam(method, key, "") + if len(value) != 0 { + p[method+"."+key] = value + } + } + } + service.Params = p + return service +} + +func NewServiceInfo(name string, group string, version string, protocol string, path string, params map[string]string) *ServiceInfo { + serviceKey := ServiceKey(name, group, version) + matchKey := MatchKey(serviceKey, protocol) + return &ServiceInfo{ + Name: name, + Group: group, + Version: version, + Protocol: protocol, + Path: path, + Params: params, + serviceKey: serviceKey, + matchKey: matchKey, + } +} + +func (si *ServiceInfo) GetMethods() []string { + if si.Params[constant.METHODS_KEY] != "" { + s := si.Params[constant.METHODS_KEY] + return strings.Split(s, ",") + } + methods := make([]string, 8) + for k, _ := range si.Params { + ms := strings.Index(k, ".") + if ms > 0 { + methods = append(methods, k[0:ms]) + } + } + return methods +} + +func (si *ServiceInfo) GetParams() url.Values { + v := url.Values{} + for k, p := range si.Params { + ms := strings.Index(k, ".") + if ms > 0 { + v.Set("methods."+k, p) + } else { + v.Set(k, p) + } + } + return v +} + +func (si *ServiceInfo) GetMatchKey() string { + if si.matchKey != "" { + return si.matchKey + } + serviceKey := si.GetServiceKey() + si.matchKey = MatchKey(serviceKey, si.Protocol) + return si.matchKey +} + +func (si *ServiceInfo) GetServiceKey() string { + if si.serviceKey != "" { + return si.serviceKey + } + si.serviceKey = ServiceKey(si.Name, si.Group, si.Version) + return si.serviceKey +} diff --git a/common/url.go b/common/url.go index ac49465346a95e6aa0ac1e6915bc6c42b91713c6..1b8effb202fc38217cb6f123c38e714a5b652f27 100644 --- a/common/url.go +++ b/common/url.go @@ -371,6 +371,10 @@ func ServiceKey(intf string, group string, version string) string { return buf.String() } +func MatchKey(serviceKey string, protocol string) string { + return serviceKey + ":" + protocol +} + // ColonSeparatedKey // The format is "{interface}:[version]:[group]" func (c *URL) ColonSeparatedKey() string { @@ -413,6 +417,16 @@ func (c *URL) Service() string { return "" } +// Group get group +func (c *URL) Group() string { + return c.GetParam(constant.GROUP_KEY, "") +} + +// Version get group +func (c *URL) Version() string { + return c.GetParam(constant.VERSION_KEY, "") +} + // AddParam will add the key-value pair func (c *URL) AddParam(key string, value string) { c.paramsLock.Lock() diff --git a/metadata/identifier/subscribe_metadata_identifier.go b/metadata/identifier/subscribe_metadata_identifier.go index b1e37db971ada56a77bc3b716606b6fc8d137d34..dd1808050300162e36fb8efbc05b63b9aef2d7a7 100644 --- a/metadata/identifier/subscribe_metadata_identifier.go +++ b/metadata/identifier/subscribe_metadata_identifier.go @@ -23,6 +23,21 @@ type SubscriberMetadataIdentifier struct { MetadataIdentifier } +func NewSubscriberMetadataIdentifier(application string, revision string) *SubscriberMetadataIdentifier { + return &SubscriberMetadataIdentifier{ + Revision: revision, + MetadataIdentifier: MetadataIdentifier{ + Application: application, + BaseMetadataIdentifier: BaseMetadataIdentifier{ + ServiceInterface: "", + Version: "", + Group: "", + Side: "", + }, + }, + } +} + // GetIdentifierKey returns string that format is service:Version:Group:Side:Revision func (mdi *SubscriberMetadataIdentifier) GetIdentifierKey() string { return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.Revision) diff --git a/metadata/report/consul/report.go b/metadata/report/consul/report.go index e211f7fde4d462bd20603af54444036681a46f4d..7017889682885c42a5bae73101f9da22c37b8a0a 100644 --- a/metadata/report/consul/report.go +++ b/metadata/report/consul/report.go @@ -46,6 +46,14 @@ type consulMetadataReport struct { client *consul.Client } +func (m *consulMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) { + panic("implement me") +} + +func (m *consulMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error { + panic("implement me") +} + // StoreProviderMetadata stores the metadata. func (m *consulMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error { kv := &consul.KVPair{Key: providerIdentifier.GetIdentifierKey(), Value: []byte(serviceDefinitions)} diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go index 56a22de8f8e56c040260edf86859a4499f1b2f39..723992bff2e82c82426d3f9ad165176004ff382f 100644 --- a/metadata/report/delegate/delegate_report.go +++ b/metadata/report/delegate/delegate_report.go @@ -302,3 +302,13 @@ func (mr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifie } return false } + +func (mr *MetadataReport) PublishAppMetadata(identifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error { + report := instance.GetMetadataReportInstance() + return report.PublishAppMetadata(identifier, info) +} + +func (mr *MetadataReport) GetAppMetadata(identifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) { + report := instance.GetMetadataReportInstance() + return report.GetAppMetadata(identifier) +} diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go index 1939b911822b6b5db3d41f999d4dcb4ca3fae1f2..6a9000fc98f19e6bcdc0377372f95a54db81928f 100644 --- a/metadata/report/etcd/report.go +++ b/metadata/report/etcd/report.go @@ -47,6 +47,16 @@ type etcdMetadataReport struct { root string } +func (e *etcdMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) { + // TODO will implement + panic("implement me") +} + +func (e *etcdMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error { + // TODO will implement + panic("implement me") +} + // StoreProviderMetadata will store the metadata // metadata including the basic info of the server, provider info, and other user custom info func (e *etcdMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error { diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go index 42e9859c9fae70cc110dc45fca43dd95a5627d6e..283290e484a0c60adef3d1989e9ea8f014ac9b3d 100644 --- a/metadata/report/nacos/report.go +++ b/metadata/report/nacos/report.go @@ -50,6 +50,16 @@ type nacosMetadataReport struct { client config_client.IConfigClient } +func (n *nacosMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) { + // TODO will implement + panic("implement me") +} + +func (n *nacosMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error { + // TODO will implement + panic("implement me") +} + // StoreProviderMetadata stores the metadata. func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error { return n.storeMetadata(vo.ConfigParam{ diff --git a/metadata/report/report.go b/metadata/report/report.go index dcb414209c1e6188dc195e9609f60124adaf2173..7cfc814eda28d7eb17c02a64c70b2bcf7e8c7671 100644 --- a/metadata/report/report.go +++ b/metadata/report/report.go @@ -57,4 +57,8 @@ type MetadataReport interface { // GetServiceDefinition gets the service definition. GetServiceDefinition(*identifier.MetadataIdentifier) (string, error) + + GetAppMetadata(*identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) + + PublishAppMetadata(*identifier.SubscriberMetadataIdentifier, *common.MetadataInfo) error } diff --git a/metadata/report/zookeeper/report.go b/metadata/report/zookeeper/report.go index 5d5e74007b9901276d7144bf0b052018e2f7e0a7..89172356b3c6551ca12468bb7f10cf5f914d21bb 100644 --- a/metadata/report/zookeeper/report.go +++ b/metadata/report/zookeeper/report.go @@ -18,6 +18,9 @@ package zookeeper import ( + "encoding/json" + "github.com/apache/dubbo-go/common/logger" + "github.com/dubbogo/go-zookeeper/zk" "strings" "time" ) @@ -50,6 +53,34 @@ type zookeeperMetadataReport struct { rootDir string } +func (m *zookeeperMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) { + k := m.rootDir + metadataIdentifier.GetFilePathKey() + data, _, err := m.client.GetContent(k) + if err != nil { + return nil, err + } + var metadataInfo common.MetadataInfo + err = json.Unmarshal(data, &metadataInfo) + if err != nil { + return nil, err + } + return &metadataInfo, nil +} + +func (m *zookeeperMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error { + k := m.rootDir + metadataIdentifier.GetFilePathKey() + data, err := json.Marshal(metadataIdentifier) + if err != nil { + return err + } + err = m.client.CreateTempWithValue(k, data) + if err == zk.ErrNodeExists { + logger.Debugf("Try to create the node data failed. In most cases, it's not a problem. ") + return nil + } + return err +} + // StoreProviderMetadata stores the metadata. func (m *zookeeperMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error { k := m.rootDir + providerIdentifier.GetFilePathKey() diff --git a/metadata/service/inmemory/metadata_service_proxy_factory.go b/metadata/service/inmemory/metadata_service_proxy_factory.go index becd804f6741e27dda418a4ac9ce976755c634c4..345a2b37d28e1245eb15622457d789bc9a4254e7 100644 --- a/metadata/service/inmemory/metadata_service_proxy_factory.go +++ b/metadata/service/inmemory/metadata_service_proxy_factory.go @@ -19,6 +19,7 @@ package inmemory import ( "encoding/json" + "sync" ) import ( @@ -37,6 +38,17 @@ func init() { }) } +var factory service.MetadataServiceProxyFactory + +var once *sync.Once + +func GetInMemoryMetadataServiceProxyFactory() service.MetadataServiceProxyFactory { + once.Do(func() { + factory = service.NewBaseMetadataServiceProxyFactory(createProxy) + }) + return factory +} + // createProxy creates an instance of MetadataServiceProxy // we read the metadata from ins.Metadata() // and then create an Invoker instance diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index 8da78c34207c37a7f7e3f475502ac43e7d88b6fd..b9e372dea58d91c82e676d27982b3e5be6e155e5 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -28,7 +28,6 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/metadata/definition" @@ -41,10 +40,6 @@ const ( local = "local" ) -func init() { - extension.SetMetadataService(local, NewMetadataService) -} - // MetadataService is store and query the metadata info in memory when each service registry type MetadataService struct { service.BaseMetadataService @@ -52,6 +47,9 @@ type MetadataService struct { subscribedServiceURLs *sync.Map serviceDefinitions *sync.Map lock *sync.RWMutex + mlock *sync.Mutex + metadataInfo *common.MetadataInfo + metadataServiceURL *common.URL } var ( @@ -61,7 +59,7 @@ var ( // NewMetadataService: initiate a metadata service // it should be singleton -func NewMetadataService() (service.MetadataService, error) { +func GetInMemoryMetadataService() (service.MetadataService, error) { metadataServiceInitOnce.Do(func() { metadataServiceInstance = &MetadataService{ BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name), @@ -69,6 +67,7 @@ func NewMetadataService() (service.MetadataService, error) { subscribedServiceURLs: &sync.Map{}, serviceDefinitions: &sync.Map{}, lock: &sync.RWMutex{}, + metadataInfo: nil, } }) return metadataServiceInstance, nil @@ -153,11 +152,28 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s // ExportURL can store the in memory func (mts *MetadataService) ExportURL(url *common.URL) (bool, error) { + if constant.METADATA_SERVICE_NAME == url.GetParam(constant.INTERFACE_KEY, "") { + mts.metadataServiceURL = url + return true, nil + } + mts.mlock.Lock() + if mts.metadataInfo == nil { + mts.metadataInfo = common.NewMetadataInfWithApp(config.GetApplicationConfig().Name) + } + mts.mlock.Unlock() + mts.metadataInfo.AddService(common.NewServiceInfoWithUrl(url)) return mts.addURL(mts.exportedServiceURLs, url), nil } // UnexportURL can remove the url store in memory func (mts *MetadataService) UnexportURL(url *common.URL) error { + if constant.METADATA_SERVICE_NAME == url.GetParam(constant.INTERFACE_KEY, "") { + mts.metadataServiceURL = nil + return nil + } + if mts.metadataInfo != nil { + mts.metadataInfo.RemoveService(common.NewServiceInfoWithUrl(url)) + } mts.removeURL(mts.exportedServiceURLs, url) return nil } @@ -220,6 +236,20 @@ func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) return v.(string), nil } +func (mts *MetadataService) GetMetadataInfo(revision string) *common.MetadataInfo { + if revision == "" { + return mts.metadataInfo + } + if mts.metadataInfo.CalAndGetRevision() != revision { + return nil + } + return mts.metadataInfo +} + +func (mts *MetadataService) GetExportedServiceURLs() []*common.URL { + return mts.getAllService(mts.exportedServiceURLs) +} + // RefreshMetadata will always return true because it will be implement by remote service func (mts *MetadataService) RefreshMetadata(string, string) (bool, error) { return true, nil @@ -229,3 +259,7 @@ func (mts *MetadataService) RefreshMetadata(string, string) (bool, error) { func (mts *MetadataService) Version() (string, error) { return version, nil } + +func (mts *MetadataService) GetMetadataServiceURL() *common.URL { + return mts.metadataServiceURL +} diff --git a/metadata/service/inmemory/service_proxy.go b/metadata/service/inmemory/service_proxy.go index 8b93aab9accfd8375ded971c27f898069e73e231..cbc50a2b498223fb8fbb7a7ff330d1bb5d58abcf 100644 --- a/metadata/service/inmemory/service_proxy.go +++ b/metadata/service/inmemory/service_proxy.go @@ -70,6 +70,15 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st return append(ret, *urlStrs...), nil } +func (m *MetadataServiceProxy) GetExportedServiceURLs() []*common.URL { + logger.Error("you should never invoke this implementation") + return nil +} + +func (m *MetadataServiceProxy) GetMetadataServiceURL() *common.URL { + return nil +} + func (m *MetadataServiceProxy) MethodMapper() map[string]string { return map[string]string{} } @@ -133,3 +142,8 @@ func (m *MetadataServiceProxy) Version() (string, error) { logger.Error("you should never invoke this implementation") return "", nil } + +func (m *MetadataServiceProxy) GetMetadataInfo(revision string) *common.MetadataInfo { + logger.Error("you should never invoke this implementation") + return nil +} diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index bb520d3df72c369eb4022246f20bb9e9aa1bd8b8..d922af1e059761f055639a369d130a5743988e48 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -18,6 +18,7 @@ package remote import ( + "github.com/apache/dubbo-go/registry" "sync" ) @@ -28,13 +29,10 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/metadata/definition" "github.com/apache/dubbo-go/metadata/identifier" "github.com/apache/dubbo-go/metadata/report/delegate" - "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/metadata/service/inmemory" ) @@ -44,14 +42,9 @@ const ( remote = "remote" ) -func init() { - extension.SetMetadataService(remote, newMetadataService) -} - // MetadataService is a implement of metadata service which will delegate the remote metadata report // This is singleton -type MetadataService struct { - service.BaseMetadataService +type RemoteMetadataService struct { inMemoryMetadataService *inmemory.MetadataService exportedRevision atomic.String subscribedRevision atomic.String @@ -60,11 +53,11 @@ type MetadataService struct { var ( metadataServiceOnce sync.Once - metadataServiceInstance *MetadataService + metadataServiceInstance *RemoteMetadataService ) -// newMetadataService will create a new remote MetadataService instance -func newMetadataService() (service.MetadataService, error) { +// GetRemoteMetadataService will create a new remote MetadataService instance +func GetRemoteMetadataService() (*RemoteMetadataService, error) { var err error metadataServiceOnce.Do(func() { var mr *delegate.MetadataReport @@ -73,9 +66,8 @@ func newMetadataService() (service.MetadataService, error) { return } // it will never return error - inms, _ := inmemory.NewMetadataService() - metadataServiceInstance = &MetadataService{ - BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name), + inms, _ := inmemory.GetInMemoryMetadataService() + metadataServiceInstance = &RemoteMetadataService{ inMemoryMetadataService: inms.(*inmemory.MetadataService), delegateReport: mr, } @@ -83,37 +75,30 @@ func newMetadataService() (service.MetadataService, error) { return metadataServiceInstance, err } -// setInMemoryMetadataService will replace the in memory metadata service by the specific param -func (mts *MetadataService) setInMemoryMetadataService(metadata *inmemory.MetadataService) { - mts.inMemoryMetadataService = metadata -} - -// ExportURL will be implemented by in memory service -func (mts *MetadataService) ExportURL(url *common.URL) (bool, error) { - return mts.inMemoryMetadataService.ExportURL(url) -} - -// UnexportURL remove @url's metadata -func (mts *MetadataService) UnexportURL(url *common.URL) error { - smi := identifier.NewServiceMetadataIdentifier(url) - smi.Revision = mts.exportedRevision.Load() - return mts.delegateReport.RemoveServiceMetadata(smi) -} - -// SubscribeURL will be implemented by in memory service -func (mts *MetadataService) SubscribeURL(url *common.URL) (bool, error) { - return mts.inMemoryMetadataService.SubscribeURL(url) +// publishMetadata +func (mts *RemoteMetadataService) PublishMetadata(service string) { + info := mts.inMemoryMetadataService.GetMetadataInfo("") + if info.HasReported() { + return + } + id := identifier.NewSubscriberMetadataIdentifier(service, info.CalAndGetRevision()) + err := mts.delegateReport.PublishAppMetadata(id, info) + if err != nil { + logger.Errorf("Publishing metadata to error[%v]", err) + return + } + info.MarkReported() } -// UnsubscribeURL will be implemented by in memory service -func (mts *MetadataService) UnsubscribeURL(url *common.URL) error { - // TODO remove call self. - return nil - //return mts.UnsubscribeURL(url) +// publishMetadata +func (mts *RemoteMetadataService) GetMetadata(instance registry.ServiceInstance) (*common.MetadataInfo, error) { + revision := instance.GetMetadata()[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME] + id := identifier.NewSubscriberMetadataIdentifier(instance.GetServiceName(), revision) + return mts.delegateReport.GetAppMetadata(id) } // PublishServiceDefinition will call remote metadata's StoreProviderMetadata to store url info and service definition -func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error { +func (mts *RemoteMetadataService) PublishServiceDefinition(url *common.URL) error { interfaceName := url.GetParam(constant.INTERFACE_KEY, "") isGeneric := url.GetParamBool(constant.GENERIC_KEY, false) if common.RoleType(common.PROVIDER).Role() == url.GetParam(constant.SIDE_KEY, "") { @@ -154,75 +139,21 @@ func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error { } // GetExportedURLs will be implemented by in memory service -func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) { +func (mts *RemoteMetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) { return mts.inMemoryMetadataService.GetExportedURLs(serviceInterface, group, version, protocol) } // GetSubscribedURLs will be implemented by in memory service -func (mts *MetadataService) GetSubscribedURLs() ([]*common.URL, error) { +func (mts *RemoteMetadataService) GetSubscribedURLs() ([]*common.URL, error) { return mts.inMemoryMetadataService.GetSubscribedURLs() } // GetServiceDefinition will be implemented by in memory service -func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { +func (mts *RemoteMetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { return mts.inMemoryMetadataService.GetServiceDefinition(interfaceName, group, version) } // GetServiceDefinitionByServiceKey will be implemented by in memory service -func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { +func (mts *RemoteMetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { return mts.inMemoryMetadataService.GetServiceDefinitionByServiceKey(serviceKey) } - -// RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service -func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) { - if len(exportedRevision) != 0 && exportedRevision != mts.exportedRevision.Load() { - mts.exportedRevision.Store(exportedRevision) - urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "") - if err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) - return false, err - } - logger.Infof("urls length = %v", len(urls)) - for _, ui := range urls { - - u, err := common.NewURL(ui.(string)) - if err != nil { - logger.Errorf("this is not valid url string: %s ", ui.(string)) - continue - } - id := identifier.NewServiceMetadataIdentifier(u) - id.Revision = mts.exportedRevision.Load() - if err := mts.delegateReport.SaveServiceMetadata(id, u); err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) - return false, err - } - } - } - - if len(subscribedRevision) != 0 && subscribedRevision != mts.subscribedRevision.Load() { - mts.subscribedRevision.Store(subscribedRevision) - urls, err := mts.inMemoryMetadataService.GetSubscribedURLs() - if err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err) - return false, err - } - if len(urls) > 0 { - id := &identifier.SubscriberMetadataIdentifier{ - MetadataIdentifier: identifier.MetadataIdentifier{ - Application: config.GetApplicationConfig().Name, - }, - Revision: subscribedRevision, - } - if err := mts.delegateReport.SaveSubscribedData(id, urls); err != nil { - logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) - return false, err - } - } - } - return true, nil -} - -// Version will return the remote service version -func (MetadataService) Version() (string, error) { - return version, nil -} diff --git a/metadata/service/remote/service_proxy.go b/metadata/service/remote/service_proxy.go deleted file mode 100644 index e0cd6e0783fe4572e0a69cf0694d70f74ba46b42..0000000000000000000000000000000000000000 --- a/metadata/service/remote/service_proxy.go +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package remote - -import ( - "strings" -) -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/config/instance" - "github.com/apache/dubbo-go/metadata/identifier" - "github.com/apache/dubbo-go/metadata/report" - "github.com/apache/dubbo-go/metadata/service" - "github.com/apache/dubbo-go/registry" -) - -type metadataServiceProxy struct { - serviceName string - revision string - report report.MetadataReport -} - -func (m *metadataServiceProxy) Reference() string { - return constant.METADATA_SERVICE_NAME -} - -func (m *metadataServiceProxy) ServiceName() (string, error) { - return m.serviceName, nil -} - -func (m *metadataServiceProxy) ExportURL(url *common.URL) (bool, error) { - logger.Error("you should never invoke this implementation") - return true, nil -} - -func (m *metadataServiceProxy) UnexportURL(url *common.URL) error { - logger.Error("you should never invoke this implementation") - return nil -} - -func (m *metadataServiceProxy) SubscribeURL(url *common.URL) (bool, error) { - logger.Error("you should never invoke this implementation") - return true, nil -} - -func (m *metadataServiceProxy) UnsubscribeURL(url *common.URL) error { - logger.Error("you should never invoke this implementation") - return nil -} - -func (m *metadataServiceProxy) PublishServiceDefinition(url *common.URL) error { - logger.Error("you should never invoke this implementation") - return nil -} - -func (m *metadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) { - urls, err := m.report.GetExportedURLs(&identifier.ServiceMetadataIdentifier{ - BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ - ServiceInterface: serviceInterface, - Version: version, - Group: group, - Side: constant.PROVIDER_PROTOCOL, - }, - Revision: m.revision, - Protocol: protocol, - }) - - if err != nil { - return []interface{}{}, nil - } - var res []*common.URL - for _, s := range urls { - u, err := common.NewURL(s) - if err != nil { - logger.Errorf("could not parse the url string to URL structure", err) - continue - } - res = append(res, u) - } - return service.ConvertURLArrToIntfArr(res), nil -} - -func (m *metadataServiceProxy) MethodMapper() map[string]string { - return map[string]string{} -} - -func (m *metadataServiceProxy) GetSubscribedURLs() ([]*common.URL, error) { - logger.Error("you should never invoke this implementation") - return nil, nil -} - -func (m *metadataServiceProxy) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { - return m.report.GetServiceDefinition(&identifier.MetadataIdentifier{ - BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ - ServiceInterface: interfaceName, - Group: group, - Version: version, - Side: constant.PROVIDER_PROTOCOL, - }, - Application: m.serviceName, - }) -} - -func (m *metadataServiceProxy) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { - params := parse(serviceKey) - return m.GetServiceDefinition(params[0], params[1], params[2]) -} - -func (m *metadataServiceProxy) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) { - logger.Error("you should never invoke this implementation") - return true, nil -} - -func (m metadataServiceProxy) Version() (string, error) { - return version, nil -} - -func newMetadataServiceProxy(ins registry.ServiceInstance) service.MetadataService { - revision := ins.GetMetadata()[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME] - if len(revision) == 0 { - revision = constant.DEFAULT_REVISION - } - - return &metadataServiceProxy{ - serviceName: ins.GetServiceName(), - revision: revision, - report: instance.GetMetadataReportInstance(), - } -} - -func parse(key string) []string { - arr := make([]string, 3) - tmp := strings.SplitN(key, "/", 2) - if len(tmp) > 1 { - arr[0] = tmp[0] - key = tmp[1] - } - tmp = strings.SplitN(key, "/", 2) - if len(tmp) > 1 { - arr[2] = tmp[1] - key = tmp[0] - } - arr[1] = key - return arr -} diff --git a/metadata/service/remote/service_proxy_test.go b/metadata/service/remote/service_proxy_test.go index 1899d02205e17f62637488e68630649e60cff061..600880b03d75decd6d0b82b04b5bf605e767505f 100644 --- a/metadata/service/remote/service_proxy_test.go +++ b/metadata/service/remote/service_proxy_test.go @@ -112,6 +112,14 @@ func (m *mockMetadataReportFactory) CreateMetadataReport(*common.URL) report.Met type mockMetadataReport struct { } +func (m mockMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) { + panic("implement me") +} + +func (m mockMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error { + panic("implement me") +} + func (m mockMetadataReport) StoreProviderMetadata(*identifier.MetadataIdentifier, string) error { panic("implement me") } diff --git a/metadata/service/service.go b/metadata/service/service.go index 1d90f8a516831adcae20163a3620dd765459310d..808752527772e6d065e88b546cd335e3d68d561e 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -62,6 +62,12 @@ type MetadataService interface { RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) // Version will return the metadata service version Version() (string, error) + + GetMetadataInfo(revision string) *common.MetadataInfo + + GetExportedServiceURLs() []*common.URL + + GetMetadataServiceURL() *common.URL } // BaseMetadataService is used for the event logic for struct who will implement interface MetadataService diff --git a/registry/consul/service_discovery.go b/registry/consul/service_discovery.go index fba142e04e4979b9c4fe123cc121111838ae2d4e..7e46d51621f95521ccad40d0898dffc37eb302b0 100644 --- a/registry/consul/service_discovery.go +++ b/registry/consul/service_discovery.go @@ -370,59 +370,61 @@ func (csd *consulServiceDiscovery) GetRequestInstances(serviceNames []string, of } func (csd *consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { - - params := make(map[string]interface{}, 8) - params[watch_type] = watch_type_service - params[watch_service] = listener.ServiceName - params[watch_passingonly] = watch_passingonly_true - plan, err := watch.Parse(params) - if err != nil { - logger.Errorf("add listener for service %s,error:%v", listener.ServiceName, err) - return err - } - - plan.Handler = func(idx uint64, raw interface{}) { - services, ok := raw.([]*consul.ServiceEntry) - if !ok { - err = perrors.New("handler get non ServiceEntry type parameter") - return + for _, v := range listener.ServiceNames.Values() { + serviceName := v.(string) + params := make(map[string]interface{}, 8) + params[watch_type] = watch_type_service + params[watch_service] = serviceName + params[watch_passingonly] = watch_passingonly_true + plan, err := watch.Parse(params) + if err != nil { + logger.Errorf("add listener for service %s,error:%v", serviceName, err) + return err } - instances := make([]registry.ServiceInstance, 0, len(services)) - for _, ins := range services { - metadata := ins.Service.Meta - - // enable status - enableStr := metadata[enable] - delete(metadata, enable) - enable, _ := strconv.ParseBool(enableStr) - - // health status - status := ins.Checks.AggregatedStatus() - healthy := false - if status == consul.HealthPassing { - healthy = true + + plan.Handler = func(idx uint64, raw interface{}) { + services, ok := raw.([]*consul.ServiceEntry) + if !ok { + err = perrors.New("handler get non ServiceEntry type parameter") + return + } + instances := make([]registry.ServiceInstance, 0, len(services)) + for _, ins := range services { + metadata := ins.Service.Meta + + // enable status + enableStr := metadata[enable] + delete(metadata, enable) + enable, _ := strconv.ParseBool(enableStr) + + // health status + status := ins.Checks.AggregatedStatus() + healthy := false + if status == consul.HealthPassing { + healthy = true + } + instances = append(instances, ®istry.DefaultServiceInstance{ + Id: ins.Service.ID, + ServiceName: ins.Service.Service, + Host: ins.Service.Address, + Port: ins.Service.Port, + Enable: enable, + Healthy: healthy, + Metadata: metadata, + }) + } + e := csd.DispatchEventForInstances(serviceName, instances) + if e != nil { + logger.Errorf("Dispatching event got exception, service name: %s, err: %v", serviceName, err) } - instances = append(instances, ®istry.DefaultServiceInstance{ - Id: ins.Service.ID, - ServiceName: ins.Service.Service, - Host: ins.Service.Address, - Port: ins.Service.Port, - Enable: enable, - Healthy: healthy, - Metadata: metadata, - }) - } - e := csd.DispatchEventForInstances(listener.ServiceName, instances) - if e != nil { - logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err) } + go func() { + err = plan.RunWithConfig(csd.Config.Address, csd.Config) + if err != nil { + logger.Error("consul plan run failure!error:%v", err) + } + }() } - go func() { - err = plan.RunWithConfig(csd.Config.Address, csd.Config) - if err != nil { - logger.Error("consul plan run failure!error:%v", err) - } - }() return nil } diff --git a/registry/event/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go index 773eee6e83c6b3e24c293053b55b028d0e7b2e9a..7745e0b55073e8c20f84422671e824078075940a 100644 --- a/registry/event/event_publishing_service_discovery.go +++ b/registry/event/event_publishing_service_discovery.go @@ -18,6 +18,7 @@ package event import ( + "github.com/apache/dubbo-go/metadata/service/inmemory" gxset "github.com/dubbogo/gost/container/set" gxpage "github.com/dubbogo/gost/hash/page" ) @@ -25,7 +26,6 @@ import ( import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/observer" - "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/registry" ) @@ -153,5 +153,5 @@ func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent obser // getMetadataService returns metadata service instance func getMetadataService() (service.MetadataService, error) { - return extension.GetMetadataService(config.GetApplicationConfig().MetadataType) + return inmemory.GetInMemoryMetadataService() } diff --git a/registry/event/metadata_service_url_params_customizer.go b/registry/event/metadata_service_url_params_customizer.go index 6d8f99b327363c9a2d636079ef1f74e78d4e0184..c90dda0ed0ce24c419ad4aad01243559ebd00b94 100644 --- a/registry/event/metadata_service_url_params_customizer.go +++ b/registry/event/metadata_service_url_params_customizer.go @@ -28,23 +28,10 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/registry" ) -func init() { - exceptKeys := gxset.NewSet( - // remove APPLICATION_KEY because service name must be present - constant.APPLICATION_KEY, - // remove GROUP_KEY, always uses service name. - constant.GROUP_KEY, - // remove TIMESTAMP_KEY because it's nonsense - constant.TIMESTAMP_KEY) - extension.AddCustomizers(&metadataServiceURLParamsMetadataCustomizer{exceptKeys: exceptKeys}) - -} - type metadataServiceURLParamsMetadataCustomizer struct { exceptKeys *gxset.HashSet } @@ -60,16 +47,8 @@ func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance registry logger.Errorf("could not find the metadata service", err) return } - serviceName := constant.METADATA_SERVICE_NAME - // error always is nil - version, _ := ms.Version() - group := instance.GetServiceName() - urls, err := ms.GetExportedURLs(serviceName, group, version, constant.ANY_VALUE) - if err != nil || len(urls) == 0 { - logger.Info("could not find the exported urls", err) - return - } - ps := m.convertToParams(urls) + url := ms.GetMetadataServiceURL() + ps := m.convertToParams(url) str, err := json.Marshal(ps) if err != nil { logger.Errorf("could not transfer the map to json", err) @@ -78,28 +57,19 @@ func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance registry instance.GetMetadata()[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME] = string(str) } -func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(urls []interface{}) map[string]map[string]string { - +func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(url *common.URL) map[string]string { // usually there will be only one protocol - res := make(map[string]map[string]string, 1) + res := make(map[string]string, 1) // those keys are useless - - for _, ui := range urls { - u, err := common.NewURL(ui.(string)) - if err != nil { - logger.Errorf("could not parse the string to url: %s", ui.(string), err) + p := make(map[string]string, len(url.GetParams())) + for k, v := range url.GetParams() { + // we will ignore that + if !common.IncludeKeys.Contains(k) || len(v) == 0 || len(v[0]) == 0 { continue } - p := make(map[string]string, len(u.GetParams())) - for k, v := range u.GetParams() { - // we will ignore that - if m.exceptKeys.Contains(k) || len(v) == 0 || len(v[0]) == 0 { - continue - } - p[k] = v[0] - } - p[constant.PORT_KEY] = u.Port - res[u.Protocol] = p + p[k] = v[0] } + p[constant.PORT_KEY] = url.Port + p[constant.PROTOCOL_KEY] = url.Protocol return res } diff --git a/registry/event/protocol_ports_metadata_customizer.go b/registry/event/protocol_ports_metadata_customizer.go index a58471c2bd5a2e1b7b4211e02f605763b2e72c9c..37f88108252062336e9a44deb6f0a8601531ddf4 100644 --- a/registry/event/protocol_ports_metadata_customizer.go +++ b/registry/event/protocol_ports_metadata_customizer.go @@ -23,7 +23,6 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -54,16 +53,15 @@ func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceIns // 4 is enough... we don't have many protocol protocolMap := make(map[string]int, 4) - list, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE) - if err != nil || len(list) == 0 { + list := metadataService.GetExportedServiceURLs() + if list == nil || len(list) == 0 { logger.Debugf("Could not find exported urls", err) return } - for _, ui := range list { - u, err := common.NewURL(ui.(string)) + for _, u := range list { if err != nil || len(u.Protocol) == 0 { - logger.Errorf("the url string is invalid: %s", ui.(string), err) + logger.Errorf("the url string is invalid: %s", u, err) continue } @@ -83,9 +81,9 @@ func endpointsStr(protocolMap map[string]int) string { return "" } - endpoints := make([]endpoint, 0, len(protocolMap)) + endpoints := make([]registry.Endpoint, 0, len(protocolMap)) for k, v := range protocolMap { - endpoints = append(endpoints, endpoint{ + endpoints = append(endpoints, registry.Endpoint{ Port: v, Protocol: k, }) @@ -98,9 +96,3 @@ func endpointsStr(protocolMap map[string]int) string { } return string(str) } - -// nolint -type endpoint struct { - Port int `json:"port, omitempty"` - Protocol string `json:"protocol, omitempty"` -} diff --git a/registry/event/service_revision_customizer.go b/registry/event/service_revision_customizer.go deleted file mode 100644 index 4793e91948fe4c30fffbfd21f0dcc3efe57c5095..0000000000000000000000000000000000000000 --- a/registry/event/service_revision_customizer.go +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package event - -import ( - "fmt" - "hash/crc32" - "sort" -) - -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/metadata/service" - "github.com/apache/dubbo-go/registry" -) - -const defaultRevision = "N/A" - -func init() { - extension.AddCustomizers(&exportedServicesRevisionMetadataCustomizer{}) - extension.AddCustomizers(&subscribedServicesRevisionMetadataCustomizer{}) -} - -type exportedServicesRevisionMetadataCustomizer struct { -} - -// GetPriority will return 1 so that it will be invoked in front of user defining Customizer -func (e *exportedServicesRevisionMetadataCustomizer) GetPriority() int { - return 1 -} - -// Customize calculate the revision for exported urls and then put it into instance metadata -func (e *exportedServicesRevisionMetadataCustomizer) Customize(instance registry.ServiceInstance) { - ms, err := getMetadataService() - if err != nil { - logger.Errorf("could not get metadata service", err) - return - } - - urls, err := ms.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE) - - if err != nil { - logger.Errorf("could not find the exported url", err) - } - - revision := resolveRevision(urls) - if len(revision) == 0 { - revision = defaultRevision - } - instance.GetMetadata()[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME] = revision -} - -type subscribedServicesRevisionMetadataCustomizer struct { -} - -// GetPriority will return 2 so that it will be invoked in front of user defining Customizer -func (e *subscribedServicesRevisionMetadataCustomizer) GetPriority() int { - return 2 -} - -// Customize calculate the revision for subscribed urls and then put it into instance metadata -func (e *subscribedServicesRevisionMetadataCustomizer) Customize(instance registry.ServiceInstance) { - ms, err := getMetadataService() - if err != nil { - logger.Errorf("could not get metadata service", err) - return - } - - urls, err := ms.GetSubscribedURLs() - - if err != nil { - logger.Errorf("could not find the subscribed url", err) - } - - revision := resolveRevision(service.ConvertURLArrToIntfArr(urls)) - if len(revision) == 0 { - revision = defaultRevision - } - instance.GetMetadata()[constant.SUBSCRIBED_SERVICES_REVISION_PROPERTY_NAME] = revision -} - -// resolveRevision is different from Dubbo because golang doesn't support overload -// so that we could use interface + method name as identifier and ignore the method params -// per my understanding, it's enough because Dubbo actually ignore the url params. -// please refer org.apache.dubbo.common.URL#toParameterString(java.lang.String...) -func resolveRevision(urls []interface{}) string { - if len(urls) == 0 { - return "" - } - candidates := make([]string, 0, len(urls)) - - for _, ui := range urls { - u, err := common.NewURL(ui.(string)) - if err != nil { - logger.Errorf("could not parse the string to URL structure") - continue - } - sk := u.GetParam(constant.INTERFACE_KEY, "") - - if len(u.Methods) == 0 { - candidates = append(candidates, sk) - } else { - for _, m := range u.Methods { - // methods are part of candidates - candidates = append(candidates, sk+constant.KEY_SEPARATOR+m) - } - } - - // append url params if we need it - } - sort.Strings(candidates) - - // it's nearly impossible to be overflow - res := uint64(0) - for _, c := range candidates { - res += uint64(crc32.ChecksumIEEE([]byte(c))) - } - return fmt.Sprint(res) -} diff --git a/registry/event_listener.go b/registry/event_listener.go index 9e9ec2d5d4bcb8d1af90fff73db1c6708427f7f7..4f1e5a356ba4e01b1de430ffdde95416e0c4abb2 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -18,6 +18,13 @@ package registry import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/metadata/service/inmemory" + "github.com/apache/dubbo-go/metadata/service/remote" + "github.com/apache/dubbo-go/remoting" + gxset "github.com/dubbogo/gost/container/set" "reflect" ) @@ -27,20 +34,127 @@ import ( // The Service Discovery Changed Event Listener type ServiceInstancesChangedListener struct { - ServiceName string - ChangedNotify observer.ChangedNotify + ServiceNames *gxset.HashSet + listeners map[string]NotifyListener + serviceUrls map[string][]*common.URL + revisionToMetadata map[string]*common.MetadataInfo + allInstances map[string][]ServiceInstance } // OnEvent on ServiceInstancesChangedEvent the service instances change event func (lstn *ServiceInstancesChangedListener) OnEvent(e observer.Event) error { - lstn.ChangedNotify.Notify(e) + ce, ok := e.(*ServiceInstancesChangedEvent) + if !ok { + return nil + } + var err error + lstn.allInstances[ce.ServiceName] = ce.Instances + revisionToInstances := make(map[string][]ServiceInstance) + newRevisionToMetadata := make(map[string]*common.MetadataInfo) + localServiceToRevisions := make(map[*common.ServiceInfo]*gxset.HashSet) + protocolRevisionsToUrls := make(map[string]map[*gxset.HashSet][]*common.URL) + newServiceURLs := make(map[string][]*common.URL) + + for _, instances := range lstn.allInstances { + for _, instance := range instances { + revision := instance.GetMetadata()[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME] + if "0" == revision { + logger.Infof("Find instance without valid service metadata: %s", instance.GetHost()) + continue + } + subInstances := revisionToInstances[revision] + if subInstances == nil { + subInstances = make([]ServiceInstance, 8) + } + revisionToInstances[revision] = append(subInstances, instance) + metadataInfo := lstn.revisionToMetadata[revision] + if metadataInfo == nil { + metadataInfo, err = lstn.getMetadataInfo(instance, metadataInfo, revision) + if err != nil { + return err + } + } + for _, service := range metadataInfo.Services { + if localServiceToRevisions[service] == nil { + localServiceToRevisions[service] = gxset.NewSet() + } + localServiceToRevisions[service].Add(revision) + } + + newRevisionToMetadata[revision] = metadataInfo + } + lstn.revisionToMetadata = newRevisionToMetadata + + for serviceInstance, revisions := range localServiceToRevisions { + revisionsToUrls := protocolRevisionsToUrls[serviceInstance.Protocol] + urls := revisionsToUrls[revisions] + if urls != nil { + newServiceURLs[serviceInstance.GetMatchKey()] = urls + } else { + urls = make([]*common.URL, 0, 8) + for _, v := range revisions.Values() { + r := v.(string) + for _, i := range revisionToInstances[r] { + urls = append(urls, i.ToURLs()...) + } + } + revisionsToUrls[revisions] = urls + newServiceURLs[serviceInstance.GetMatchKey()] = urls + } + } + lstn.serviceUrls = newServiceURLs + + for key, notifyListener := range lstn.listeners { + urls := lstn.serviceUrls[key] + for _, url := range urls { + notifyListener.Notify(&ServiceEvent{ + Action: remoting.EventTypeAdd, + Service: url, + }) + } + } + } return nil } +func (lstn *ServiceInstancesChangedListener) getMetadataInfo(instance ServiceInstance, metadataInfo *common.MetadataInfo, revision string) (*common.MetadataInfo, error) { + metadataStorageType := instance.GetMetadata()[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] + if metadataStorageType == constant.REMOTE_METADATA_STORAGE_TYPE { + remoteMetadataService, err := remote.GetRemoteMetadataService() + if err != nil { + return nil, err + } + metadataInfo, err = remoteMetadataService.GetMetadata(instance) + if err != nil { + return nil, err + } + } else { + proxyFactory := inmemory.GetInMemoryMetadataServiceProxyFactory() + metadataService := proxyFactory.GetProxy(instance) + metadataInfo = metadataService.GetMetadataInfo(revision) + } + return metadataInfo, nil +} + +func (lstn *ServiceInstancesChangedListener) AddListenerAndNotify(serviceKey string, notify NotifyListener) { + lstn.listeners[serviceKey] = notify + urls := lstn.serviceUrls[serviceKey] + for _, url := range urls { + notify.Notify(&ServiceEvent{ + Action: remoting.EventTypeAdd, + Service: url, + }) + } +} + +func (lstn *ServiceInstancesChangedListener) RemoveListener(serviceKey string) { + delete(lstn.listeners, serviceKey) +} + // Accept return true if the name is the same func (lstn *ServiceInstancesChangedListener) Accept(e observer.Event) bool { if ce, ok := e.(*ServiceInstancesChangedEvent); ok { - return ce.ServiceName == lstn.ServiceName + return lstn.ServiceNames.Contains(ce.ServiceName) } return false } diff --git a/metadata/service/remote/metadata_service_proxy_factory.go b/registry/instance_address_url.go similarity index 68% rename from metadata/service/remote/metadata_service_proxy_factory.go rename to registry/instance_address_url.go index a1a8594282c581913d97586630f3e5e74305642d..efe33548be186e43bc6e800a6fe61d3a22cd90a1 100644 --- a/metadata/service/remote/metadata_service_proxy_factory.go +++ b/registry/instance_address_url.go @@ -15,16 +15,21 @@ * limitations under the License. */ -package remote +package registry -import ( - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/metadata/service" -) +import "github.com/apache/dubbo-go/common" -func init() { - factory := service.NewBaseMetadataServiceProxyFactory(newMetadataServiceProxy) - extension.SetMetadataServiceProxyFactory(remote, func() service.MetadataServiceProxyFactory { - return factory - }) +type InstanceAddressURL struct { + common.URL + instance *ServiceInstance + metadataInfo *common.MetadataInfo +} + +func NewInstanceAddressURL(instance *ServiceInstance, info *common.MetadataInfo) *common.URL { + + a := &InstanceAddressURL{ + instance: instance, + metadataInfo: info, + } + return &a.URL } diff --git a/registry/service_instance.go b/registry/service_instance.go index 43a1640eead7be1774556f25c9b8f97a75588801..5fb7d9ad9febad77222c1ed2cdbe4f9539b92750 100644 --- a/registry/service_instance.go +++ b/registry/service_instance.go @@ -18,7 +18,12 @@ package registry import ( + "encoding/json" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" gxsort "github.com/dubbogo/gost/sort" + "strconv" ) // ServiceInstance is the model class of an instance of a service, which is used for service registration and discovery. @@ -44,18 +49,38 @@ type ServiceInstance interface { // GetMetadata will return the metadata GetMetadata() map[string]string + + // ToURLs + ToURLs() []*common.URL + + // GetEndPoints + GetEndPoints() []*Endpoint + + // Copy + Copy(endpoint *Endpoint) ServiceInstance + + // GetAddress + GetAddress() string +} + +// nolint +type Endpoint struct { + Port int `json:"port, omitempty"` + Protocol string `json:"protocol, omitempty"` } // DefaultServiceInstance the default implementation of ServiceInstance // or change the ServiceInstance to be struct??? type DefaultServiceInstance struct { - Id string - ServiceName string - Host string - Port int - Enable bool - Healthy bool - Metadata map[string]string + Id string + ServiceName string + Host string + Port int + Enable bool + Healthy bool + Metadata map[string]string + ServiceMetadata *common.MetadataInfo + Address string } // GetId will return this instance's id. It should be unique. @@ -88,6 +113,59 @@ func (d *DefaultServiceInstance) IsHealthy() bool { return d.Healthy } +func (d *DefaultServiceInstance) GetAddress() string { + if d.Address != "" { + return d.Address + } + if d.Port <= 0 { + d.Address = d.Host + } else { + d.Address = d.Host + ":" + strconv.Itoa(d.Port) + } + return d.Address +} + +// ToURLs +func (d *DefaultServiceInstance) ToURLs() []*common.URL { + urls := make([]*common.URL, 0, 8) + for _, service := range d.ServiceMetadata.Services { + url := common.NewURLWithOptions(common.WithProtocol(service.Protocol), + common.WithIp(d.Host), common.WithPort(strconv.Itoa(d.Port)), + common.WithMethods(service.GetMethods()), common.WithParams(service.GetParams())) + urls = append(urls, url) + } + return urls +} + +func (d *DefaultServiceInstance) GetEndPoints() []*Endpoint { + rawEndpoints := d.Metadata[constant.SERVICE_INSTANCE_ENDPOINTS] + if len(rawEndpoints) == 0 { + return nil + } + var endpoints []*Endpoint + err := json.Unmarshal([]byte(rawEndpoints), &endpoints) + if err != nil { + logger.Errorf("json umarshal rawEndpoints[%s] catch error:%s", rawEndpoints, err.Error()) + return nil + } + return endpoints +} + +func (d *DefaultServiceInstance) Copy(endpoint *Endpoint) ServiceInstance { + dn := &DefaultServiceInstance{ + Id: d.Id, + ServiceName: d.ServiceName, + Host: d.Host, + Port: endpoint.Port, + Enable: d.Enable, + Healthy: d.Healthy, + Metadata: d.Metadata, + ServiceMetadata: d.ServiceMetadata, + } + dn.Id = d.GetAddress() + return dn +} + // GetMetadata will return the metadata, it will never return nil func (d *DefaultServiceInstance) GetMetadata() map[string]string { if d.Metadata == nil { diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index c97a7f7b51d2344f1a3fc0c59582e089f1e63b62..614efaf51fa41996b9c58e67cccaa8b2e0095683 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -19,8 +19,6 @@ package servicediscovery import ( "bytes" - "encoding/json" - "strconv" "strings" "sync" ) @@ -36,15 +34,14 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/metadata/mapping" "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/metadata/service/exporter/configurable" + "github.com/apache/dubbo-go/metadata/service/inmemory" "github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/registry/event" "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer" - "github.com/apache/dubbo-go/remoting" ) const ( @@ -70,6 +67,7 @@ type serviceDiscoveryRegistry struct { registeredListeners *gxset.HashSet subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer serviceRevisionExportedURLsCache map[string]map[string][]*common.URL + serviceListeners map[string]*registry.ServiceInstancesChangedListener } func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { @@ -83,7 +81,7 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() serviceNameMapping := extension.GetGlobalServiceNameMapping() - metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType) + metaDataService, err := inmemory.GetInMemoryMetadataService() if err != nil { return nil, perrors.WithMessage(err, "could not init metadata service") } @@ -110,7 +108,19 @@ func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registr if !shouldSubscribe(url) { return nil } - return s.metaDataService.UnsubscribeURL(url) + err := s.metaDataService.UnsubscribeURL(url) + if err != nil { + return err + } + services := s.getServices(url) + if services == nil { + return nil + } + // FIXME ServiceNames.String() is not good + serviceNamesKey := services.String() + l := s.serviceListeners[serviceNamesKey] + l.RemoveListener(url.ServiceKey()) + return nil } func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { @@ -149,8 +159,10 @@ func (s *serviceDiscoveryRegistry) GetUrl() *common.URL { } func (s *serviceDiscoveryRegistry) IsAvailable() bool { - // TODO(whether available depends on metadata service and service discovery) - return true + if s.serviceDiscovery.GetServices() == nil { + return false + } + return len(s.serviceDiscovery.GetServices().Values()) > 0 } func (s *serviceDiscoveryRegistry) Destroy() { @@ -202,24 +214,35 @@ func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.No return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+ "subscription url:%s", url.String()) } - for _, srv := range services.Values() { - serviceName := srv.(string) - serviceInstances := s.serviceDiscovery.GetInstances(serviceName) - s.subscribe(url, notify, serviceName, serviceInstances) - listener := ®istry.ServiceInstancesChangedListener{ - ServiceName: serviceName, - ChangedNotify: &InstanceChangeNotify{ - notify: notify, - serviceDiscoveryRegistry: s, - }, + // FIXME ServiceNames.String() is not good + serviceNamesKey := services.String() + protocolServiceKey := url.ServiceKey() + ":" + url.Protocol + listener := s.serviceListeners[serviceNamesKey] + if listener == nil { + listener = ®istry.ServiceInstancesChangedListener{ + ServiceNames: services, + } + for _, serviceNameTmp := range services.Values() { + serviceName := serviceNameTmp.(string) + instances := s.serviceDiscovery.GetInstances(serviceName) + err = listener.OnEvent(®istry.ServiceInstancesChangedEvent{ + ServiceName: serviceName, + Instances: instances, + }) + if err != nil { + logger.Warnf("[ServiceDiscoveryRegistry] ServiceInstancesChangedListener handle error:%v", err) + } } - s.registerServiceInstancesChangedListener(url, listener) } + s.serviceListeners[serviceNamesKey] = listener + listener.AddListenerAndNotify(protocolServiceKey, notify) + s.registerServiceInstancesChangedListener(url, listener) return nil } func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url *common.URL, listener *registry.ServiceInstancesChangedListener) { - listenerId := listener.ServiceName + ":" + getUrlKey(url) + // FIXME ServiceNames.String() is not good + listenerId := listener.ServiceNames.String() + ":" + getUrlKey(url) if !s.subscribedServices.Contains(listenerId) { err := s.serviceDiscovery.AddListener(listener) if err != nil { @@ -257,26 +280,6 @@ func appendParam(buffer bytes.Buffer, paramKey string, url *common.URL) { buffer.WriteString(url.GetParam(paramKey, "")) } -func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener, - serviceName string, serviceInstances []registry.ServiceInstance) { - if len(serviceInstances) == 0 { - logger.Warnf("here is no instance in service[name : %s]", serviceName) - return - } - var subscribedURLs []*common.URL - subscribedURLs = append(subscribedURLs, s.getExportedUrls(url, serviceInstances)...) - if len(subscribedURLs) == 0 { - subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances) - } - for _, url := range subscribedURLs { - notify.Notify(®istry.ServiceEvent{ - Action: remoting.EventTypeAdd, - Service: url, - }) - } - -} - func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []*common.URL { var urls []*common.URL for _, syn := range s.subscribedURLsSynthesizers { @@ -320,300 +323,6 @@ func (s *serviceDiscoveryRegistry) findMappedServices(url *common.URL) *gxset.Ha return serviceNames } -func (s *serviceDiscoveryRegistry) getExportedUrls(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []*common.URL { - var filterInstances []registry.ServiceInstance - for _, s := range serviceInstances { - if !s.IsEnable() || !s.IsHealthy() { - continue - } - metaData := s.GetMetadata() - _, ok1 := metaData[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME] - _, ok2 := metaData[constant.METADATA_SERVICE_URLS_PROPERTY_NAME] - if !ok1 && !ok2 { - continue - } - filterInstances = append(filterInstances, s) - } - if len(filterInstances) == 0 { - return []*common.URL{} - } - s.prepareServiceRevisionExportedURLs(filterInstances) - subscribedURLs := s.cloneExportedURLs(subscribedURL, filterInstances) - return subscribedURLs -} - -func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registry.ServiceInstance) []*common.URL { - var urls []*common.URL - metadataStorageType := getExportedStoreType(serviceInstance) - proxyFactory := extension.GetMetadataServiceProxyFactory(metadataStorageType) - if proxyFactory == nil { - return urls - } - metadataService := proxyFactory.GetProxy(serviceInstance) - if metadataService == nil { - return urls - } - result, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE) - if err != nil { - logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance) - return urls - } - - ret := make([]*common.URL, 0, len(result)) - for _, ui := range result { - - u, err := common.NewURL(ui.(string)) - - if err != nil { - logger.Errorf("could not parse the url string to URL structure: %s", ui.(string), err) - continue - } - ret = append(ret, u) - } - return ret -} - -func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { - s.lock.Lock() - // 1. expunge stale - s.expungeStaleRevisionExportedURLs(serviceInstances) - // 2. Initialize - s.initRevisionExportedURLs(serviceInstances) - s.lock.Unlock() -} - -func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { - serviceName := serviceInstances[0].GetServiceName() - revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName] - if !exist { - return - } - existRevision := gxset.NewSet() - for k := range revisionExportedURLsMap { - existRevision.Add(k) - } - currentRevision := gxset.NewSet() - for _, s := range serviceInstances { - rv := getExportedServicesRevision(s) - if len(rv) != 0 { - currentRevision.Add(rv) - } - } - // staleRevisions = existedRevisions(copy) - currentRevisions - staleRevision := gxset.NewSet(existRevision.Values()...) - staleRevision.Remove(currentRevision.Values()...) - // remove exported URLs if staled - for _, s := range staleRevision.Values() { - delete(revisionExportedURLsMap, s.(string)) - } -} - -func (s *serviceDiscoveryRegistry) initRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { - // initialize the revision exported URLs that the selected service instance exported - s.initSelectedRevisionExportedURLs(serviceInstances) - // initialize the revision exported URLs that other service instances exported - for _, serviceInstance := range serviceInstances { - s.initRevisionExportedURLsByInst(serviceInstance) - } -} - -func (s *serviceDiscoveryRegistry) initSelectedRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { - for range serviceInstances { - selectServiceInstance := s.selectServiceInstance(serviceInstances) - revisionExportedURLs := s.initRevisionExportedURLsByInst(selectServiceInstance) - if len(revisionExportedURLs) > 0 { - // If the result is valid,break - break - } - } -} - -func (s *serviceDiscoveryRegistry) selectServiceInstance(serviceInstances []registry.ServiceInstance) registry.ServiceInstance { - size := len(serviceInstances) - if size == 0 { - return nil - } - if size == 1 { - return serviceInstances[0] - } - selectorName := s.url.GetParam(constant.SERVICE_INSTANCE_SELECTOR, "random") - selector, err := extension.GetServiceInstanceSelector(selectorName) - if err != nil { - logger.Errorf("get service instance selector cathe error:%s", err.Error()) - return nil - } - return selector.Select(s.url, serviceInstances) -} - -func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstance registry.ServiceInstance) []*common.URL { - if serviceInstance == nil { - return nil - } - serviceName := serviceInstance.GetServiceName() - revision := getExportedServicesRevision(serviceInstance) - revisionExportedURLsMap := s.serviceRevisionExportedURLsCache[serviceName] - if revisionExportedURLsMap == nil { - revisionExportedURLsMap = make(map[string][]*common.URL, 4) - s.serviceRevisionExportedURLsCache[serviceName] = revisionExportedURLsMap - } - revisionExportedURLs := revisionExportedURLsMap[revision] - firstGet := false - if len(revisionExportedURLs) == 0 { - if len(revisionExportedURLsMap) > 0 { - // The case is that current ServiceInstance with the different revision - logger.Warnf("The ServiceInstance[id: %s, host : %s , port : %s] has different revision : %s"+ - ", please make sure the service [name : %s] is changing or not.", serviceInstance.GetId(), - serviceInstance.GetHost(), serviceInstance.GetPort(), revision, serviceInstance.GetServiceName()) - } else { - firstGet = true - } - revisionExportedURLs = s.getExportedUrlsByInst(serviceInstance) - if revisionExportedURLs != nil { - revisionExportedURLsMap[revision] = revisionExportedURLs - logger.Debugf("Get the exported URLs[size : %s, first : %s] from the target service "+ - "instance [id: %s , service : %s , host : %s , port : %s , revision : %s]", - len(revisionExportedURLs), firstGet, serviceInstance.GetId(), serviceInstance.GetServiceName(), - serviceInstance.GetHost(), serviceInstance.GetPort(), revision) - } - } else { - // Else, The cache is hit - logger.Debugf("Get the exported URLs[size : %s] from cache, the instance"+ - "[id: %s , service : %s , host : %s , port : %s , revision : %s]", len(revisionExportedURLs), firstGet, - serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(), - serviceInstance.GetPort(), revision) - } - return revisionExportedURLs -} - -func getExportedServicesRevision(serviceInstance registry.ServiceInstance) string { - metaData := serviceInstance.GetMetadata() - return metaData[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME] -} - -func getExportedStoreType(serviceInstance registry.ServiceInstance) string { - metaData := serviceInstance.GetMetadata() - result, ok := metaData[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] - if !ok { - return constant.DEFAULT_METADATA_STORAGE_TYPE - } - return result -} - -func (s *serviceDiscoveryRegistry) cloneExportedURLs(url *common.URL, serviceInsances []registry.ServiceInstance) []*common.URL { - if len(serviceInsances) == 0 { - return []*common.URL{} - } - var clonedExportedURLs []*common.URL - removeParamSet := gxset.NewSet() - removeParamSet.Add(constant.PID_KEY) - removeParamSet.Add(constant.TIMESTAMP_KEY) - for _, serviceInstance := range serviceInsances { - templateExportURLs := s.getTemplateExportedURLs(url, serviceInstance) - host := serviceInstance.GetHost() - for _, u := range templateExportURLs { - port := strconv.Itoa(getProtocolPort(serviceInstance, u.Protocol)) - if u.Location != host || u.Port != port { - u.Port = port // reset port - u.Location = host + ":" + port // reset host - } - - cloneUrl := u.CloneExceptParams(removeParamSet) - clonedExportedURLs = append(clonedExportedURLs, cloneUrl) - } - } - return clonedExportedURLs - -} - -type endpoint struct { - Port int `json:"port,omitempty"` - Protocol string `json:"protocol,omitempty"` -} - -func getProtocolPort(serviceInstance registry.ServiceInstance, protocol string) int { - md := serviceInstance.GetMetadata() - rawEndpoints := md[constant.SERVICE_INSTANCE_ENDPOINTS] - if len(rawEndpoints) == 0 { - return -1 - } - var endpoints []endpoint - err := json.Unmarshal([]byte(rawEndpoints), &endpoints) - if err != nil { - logger.Errorf("json umarshal rawEndpoints[%s] catch error:%s", rawEndpoints, err.Error()) - return -1 - } - for _, e := range endpoints { - if e.Protocol == protocol { - return e.Port - } - } - return -1 -} -func (s *serviceDiscoveryRegistry) getTemplateExportedURLs(url *common.URL, serviceInstance registry.ServiceInstance) []*common.URL { - exportedURLs := s.getRevisionExportedURLs(serviceInstance) - if len(exportedURLs) == 0 { - return []*common.URL{} - } - return filterSubscribedURLs(url, exportedURLs) -} - -func (s *serviceDiscoveryRegistry) getRevisionExportedURLs(serviceInstance registry.ServiceInstance) []*common.URL { - if serviceInstance == nil { - return []*common.URL{} - } - serviceName := serviceInstance.GetServiceName() - revision := getExportedServicesRevision(serviceInstance) - s.lock.RLock() - revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName] - if !exist { - return []*common.URL{} - } - exportedURLs, exist := revisionExportedURLsMap[revision] - if !exist { - return []*common.URL{} - } - s.lock.RUnlock() - // Get a copy from source in order to prevent the caller trying to change the cached data - cloneExportedURLs := make([]*common.URL, len(exportedURLs)) - copy(cloneExportedURLs, exportedURLs) - return cloneExportedURLs -} - -func filterSubscribedURLs(subscribedURL *common.URL, exportedURLs []*common.URL) []*common.URL { - var filterExportedURLs []*common.URL - for _, url := range exportedURLs { - if url.GetParam(constant.INTERFACE_KEY, url.Path) != subscribedURL.GetParam(constant.INTERFACE_KEY, url.Path) { - break - } - if url.GetParam(constant.VERSION_KEY, "") != subscribedURL.GetParam(constant.VERSION_KEY, "") { - break - } - if url.GetParam(constant.GROUP_KEY, "") != subscribedURL.GetParam(constant.GROUP_KEY, "") { - break - } - if len(subscribedURL.Protocol) != 0 { - if subscribedURL.Protocol != url.Protocol { - break - } - } - filterExportedURLs = append(filterExportedURLs, url) - } - return filterExportedURLs -} - -type InstanceChangeNotify struct { - notify registry.NotifyListener - serviceDiscoveryRegistry *serviceDiscoveryRegistry -} - -func (icn *InstanceChangeNotify) Notify(event observer.Event) { - - if se, ok := event.(*registry.ServiceInstancesChangedEvent); ok { - sdr := icn.serviceDiscoveryRegistry - sdr.subscribe(sdr.url.SubURL, icn.notify, se.ServiceName, se.Instances) - } -} - var ( exporting = &atomic.Bool{} ) @@ -622,7 +331,7 @@ var ( // TODO (move to somewhere) func tryInitMetadataService(url *common.URL) { - ms, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType) + ms, err := inmemory.GetInMemoryMetadataService() if err != nil { logger.Errorf("could not init metadata service", err) } diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go index 095a78dc3c1650197ba1ded75218e8fd0020b6ea..f77adf678a7e5fd9033d0bd80435da9eb55c3ab9 100644 --- a/registry/zookeeper/service_discovery.go +++ b/registry/zookeeper/service_discovery.go @@ -270,8 +270,15 @@ func (zksd *zookeeperServiceDiscovery) GetRequestInstances(serviceNames []string func (zksd *zookeeperServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { zksd.listenLock.Lock() defer zksd.listenLock.Unlock() - zksd.listenNames = append(zksd.listenNames, listener.ServiceName) - zksd.csd.ListenServiceEvent(listener.ServiceName, zksd) + for _, t := range listener.ServiceNames.Values() { + serviceName, ok := t.(string) + if !ok { + logger.Errorf("service name error %s", t) + continue + } + zksd.listenNames = append(zksd.listenNames, serviceName) + zksd.csd.ListenServiceEvent(serviceName, zksd) + } return nil }