diff --git a/common/constant/key.go b/common/constant/key.go index da21a3a9e1254d5a22d670a11c5c01022892e096..a2f893875719171ace6cdd40f2ce0d5e1c609656 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -79,6 +79,7 @@ const ( EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler" PROVIDER_SHUTDOWN_FILTER = "pshutdown" CONSUMER_SHUTDOWN_FILTER = "cshutdown" + PID_KEY = "pid" ) const ( @@ -255,5 +256,16 @@ const ( // service discovery const ( - NACOS_GROUP = "nacos.group" + NACOS_GROUP = "nacos.group" + SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services" + PROVIDER_BY = "provided-by" + EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision" + SERVICE_INSTANCE_SELECTOR = "service-instance-selector" + METADATA_STORAGE_TYPE_PROPERTY_NAME = "dubbo.metadata.storage-type" + DEFAULT_METADATA_STORAGE_TYPE = "local" + SERVICE_INSTANCE_ENDPOINTS = "dubbo.endpoints" + METADATA_SERVICE_PREFIX = "dubbo.metadata-service." + METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params" + METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls" + SERVICE_NAME_MAPPING_KEY = "service-name-mapping" ) diff --git a/common/extension/service_discovery.go b/common/extension/service_discovery.go index 25b80cf3353505c058bea40cc4c80712ad923d2d..488d94ebf13162a135a1e38b3375cb69532a5f25 100644 --- a/common/extension/service_discovery.go +++ b/common/extension/service_discovery.go @@ -18,11 +18,11 @@ package extension import ( + "github.com/apache/dubbo-go/registry" perrors "github.com/pkg/errors" ) import ( "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/registry" ) var ( diff --git a/common/extension/service_instance_selector_factory.go b/common/extension/service_instance_selector_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..1dd1ac6c996d6bf392558a3f6543acf7a8225ab6 --- /dev/null +++ b/common/extension/service_instance_selector_factory.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/registry/service/instance" + perrors "github.com/pkg/errors" +) + +var ( + serviceInstanceSelectorMappings = make(map[string]func() instance.ServiceInstanceSelector) +) + +func SetServiceInstanceSelector(name string, f func() instance.ServiceInstanceSelector) { + serviceInstanceSelectorMappings[name] = f +} + +func GetServiceInstanceSelector(name string) (instance.ServiceInstanceSelector, error) { + serviceInstanceSelector, ok := serviceInstanceSelectorMappings[name] + if !ok { + return nil, perrors.New("Could not find service instance selector with" + + "name:" + name) + } + return serviceInstanceSelector(), nil +} diff --git a/common/extension/service_name_mapping.go b/common/extension/service_name_mapping.go new file mode 100644 index 0000000000000000000000000000000000000000..1598430752d3f54d08374a9e5367217b52bf6850 --- /dev/null +++ b/common/extension/service_name_mapping.go @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/metadata/mapping" +) + +var ( + nameMappings = make(map[string]func() mapping.ServiceNameMapping) +) + +func SetServiceNameMapping(name string, creator func() mapping.ServiceNameMapping) { + // TODO(@閭撳ぇ鏄�) +} + +func GetServiceNameMapping(name string) mapping.ServiceNameMapping { + // TODO(@閭撳ぇ鏄�) + return nil +} diff --git a/go.mod b/go.mod index 1c3c52d5794b98c782caedc57d00c7adae32ba61..34b95a471362a0a7fc291d3cb4d9947a533c834c 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.3.2 github.com/google/btree v1.0.0 // indirect + github.com/gotestyourself/gotestyourself v2.2.0+incompatible github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect diff --git a/registry/event_listener.go b/registry/event_listener.go index 0aaa081a4488de5c291485277cdc0fc14ef4e380..23da690e55cba3364efff745e410fb8b8f3909de 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -25,13 +25,14 @@ import ( "github.com/apache/dubbo-go/common/observer" ) -// TODO (implement ConditionalEventListener) type ServiceInstancesChangedListener struct { ServiceName string - observer.EventListener + observer.ConditionalEventListener + ChangedNotify ChangedNotify } -func (sicl *ServiceInstancesChangedListener) OnEvent(e observer.Event) error { +func (sicl *ServiceInstancesChangedListener) OnEvent(e ServiceInstancesChangedEvent) error { + sicl.ChangedNotify.Notify(e) return nil } @@ -42,3 +43,11 @@ func (sicl *ServiceInstancesChangedListener) GetPriority() int { func (sicl *ServiceInstancesChangedListener) GetEventType() reflect.Type { return reflect.TypeOf(&ServiceInstancesChangedEvent{}) } + +func (sicl *ServiceInstancesChangedListener) Accept(e ServiceInstancesChangedEvent) bool { + return e.ServiceName == sicl.ServiceName +} + +type ChangedNotify interface { + Notify(e ServiceInstancesChangedEvent) +} diff --git a/registry/service/instance/random/random_service_instance_selector.go b/registry/service/instance/random/random_service_instance_selector.go new file mode 100644 index 0000000000000000000000000000000000000000..47f84c76c8d62b26fe74e9302424e5d1e452d0ee --- /dev/null +++ b/registry/service/instance/random/random_service_instance_selector.go @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package random + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/registry/service/instance" + "math/rand" + "time" +) + +func init() { + extension.SetServiceInstanceSelector("random", NewRandomServiceInstanceSelector) +} + +type RandomServiceInstanceSelector struct { +} + +func NewRandomServiceInstanceSelector() instance.ServiceInstanceSelector { + return &RandomServiceInstanceSelector{} +} + +func (r *RandomServiceInstanceSelector) Select(url common.URL, serviceInstances []registry.ServiceInstance) registry.ServiceInstance { + if len(serviceInstances) == 0 { + return nil + } + if len(serviceInstances) == 1 { + return serviceInstances[0] + } + rand.Seed(time.Now().UnixNano()) + index := rand.Intn(len(serviceInstances)) + return serviceInstances[index] + +} diff --git a/registry/service/instance/random/random_service_instance_selector_test.go b/registry/service/instance/random/random_service_instance_selector_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ed4b7838fca9ba30600cd7f0f684017598feee95 --- /dev/null +++ b/registry/service/instance/random/random_service_instance_selector_test.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package random + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/registry" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestRandomServiceInstanceSelector_Select(t *testing.T) { + selector := NewRandomServiceInstanceSelector() + serviceInstances := []registry.ServiceInstance{ + ®istry.DefaultServiceInstance{ + Id: "1", + ServiceName: "test1", + Host: "127.0.0.1:80", + Port: 0, + Enable: false, + Healthy: false, + Metadata: nil, + }, + ®istry.DefaultServiceInstance{ + Id: "2", + ServiceName: "test2", + Host: "127.0.0.1:80", + Port: 0, + Enable: false, + Healthy: false, + Metadata: nil, + }, + } + assert.NotNil(t, selector.Select(common.URL{}, serviceInstances)) +} diff --git a/registry/service/instance/service_instance_selector.go b/registry/service/instance/service_instance_selector.go new file mode 100644 index 0000000000000000000000000000000000000000..acacde5617342513e490c78fab6363142970f320 --- /dev/null +++ b/registry/service/instance/service_instance_selector.go @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package instance + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/registry" +) + +type ServiceInstanceSelector interface { + Select(url common.URL, serviceInstances []registry.ServiceInstance) registry.ServiceInstance +} diff --git a/registry/service/proxy/metadata_service_proxy_factory.go b/registry/service/proxy/metadata_service_proxy_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..7e8d891bf3566c298bb62527df1040ef82ad8fe9 --- /dev/null +++ b/registry/service/proxy/metadata_service_proxy_factory.go @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package proxy + +var ( + serviceProxy = make(map[string]func() BaseMetadataServiceProxy) +) + +func SetMetadataServiceProxy(name string, creator func() BaseMetadataServiceProxy) { + //TODO +} + +func GetMetadataServiceProxy(name string) BaseMetadataServiceProxy { + //TODO + return nil +} diff --git a/registry/service/proxy/service_proxy.go b/registry/service/proxy/service_proxy.go new file mode 100644 index 0000000000000000000000000000000000000000..7a14201fdd81dcf663d08260058c631e35f8002f --- /dev/null +++ b/registry/service/proxy/service_proxy.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package proxy + +import ( + "github.com/apache/dubbo-go/metadata/service" + "github.com/apache/dubbo-go/registry" +) + +type BaseMetadataServiceProxy interface { + GetProxy(serviceInstance registry.ServiceInstance) service.MetadataService + + CreateProxy(serviceInstance registry.ServiceInstance) service.MetadataService +} diff --git a/registry/service/service_discovery_registry.go b/registry/service/service_discovery_registry.go new file mode 100644 index 0000000000000000000000000000000000000000..b3c9e6a632054e89e787aa45720b84d076a02971 --- /dev/null +++ b/registry/service/service_discovery_registry.go @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package service + +import ( + "bytes" + "encoding/json" + cm "github.com/Workiva/go-datastructures/common" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/metadata/mapping" + "github.com/apache/dubbo-go/metadata/service" + "github.com/apache/dubbo-go/metadata/service/inmemory" + "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/registry/service/proxy" + "github.com/apache/dubbo-go/registry/service/synthesizer" + "github.com/apache/dubbo-go/remoting" + gxset "github.com/dubbogo/gost/container/set" + "strconv" + "strings" + "sync" +) + +const ( + SERVICE_REGISTRY_PROTOCOL = "service-discovery-registry" +) + +func init() { + extension.SetRegistry(SERVICE_REGISTRY_PROTOCOL, newServiceDiscoveryRegistry) +} + +type serviceDiscoveryRegistry struct { + lock sync.RWMutex + url *common.URL + serviceDiscovery registry.ServiceDiscovery + subscribedServices *gxset.HashSet + serviceNameMapping mapping.ServiceNameMapping + metaDataService service.MetadataService + registeredListeners *gxset.HashSet + subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer + serviceRevisionExportedURLsCache map[string]map[string][]common.URL +} + +func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { + serviceDiscovery, err := creatServiceDiscovery(url) + if err != nil { + return nil, err + } + subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) + subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() + serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) + //TODO it's need to get implement by factory + metaDataService := inmemory.NewMetadataService() + return &serviceDiscoveryRegistry{ + url: url, + serviceDiscovery: serviceDiscovery, + subscribedServices: subscribedServices, + subscribedURLsSynthesizers: subscribedURLsSynthesizers, + registeredListeners: gxset.NewSet(), + serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL), + serviceNameMapping: serviceNameMapping, + metaDataService: metaDataService, + }, nil +} + +func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { + return extension.GetServiceDiscovery(url.Protocol, url) +} + +func parseServices(literalServices string) *gxset.HashSet { + set := gxset.NewSet() + if len(literalServices) == 0 { + return set + } + var splitServices = strings.Split(literalServices, ",") + for _, s := range splitServices { + if len(s) != 0 { + set.Add(s) + } + } + return set +} + +func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery { + return s.serviceDiscovery +} + +func (s *serviceDiscoveryRegistry) GetUrl() common.URL { + return *s.url +} + +func (s *serviceDiscoveryRegistry) IsAvailable() bool { + return true +} + +func (s *serviceDiscoveryRegistry) Destroy() { + err := s.serviceDiscovery.Destroy() + if err != nil { + logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error()) + } +} + +func (s *serviceDiscoveryRegistry) Register(url common.URL) error { + if !shouldRegister(url) { + return nil + } + ok, err := s.metaDataService.ExportURL(url) + if err != nil { + logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error()) + return err + } + if ok { + logger.Infof("The URL[%s] registry successfully!", url.String()) + } else { + logger.Warnf("The URL[%s] has been registry!", url.String()) + } + return nil +} + +func shouldRegister(url common.URL) bool { + side := url.GetParam(constant.SIDE_KEY, "") + if side == constant.PROVIDER_PROTOCOL { + return true + } + logger.Debugf("The URL should not be register.", url.String()) + return false +} + +func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) { + if !shouldSubscribe(*url) { + return + } + _, err := s.metaDataService.SubscribeURL(*url) + if err != nil { + logger.Errorf("subscribe url[%s] catch error:%s", url.String(), err.Error()) + return + } + services := s.getServices(*url) + if services.Empty() { + logger.Errorf("Should has at least one way to know which services this interface belongs to, "+ + "subscription url:%s", url.String()) + return + } + for _, srv := range services.Values() { + serviceName := srv.(string) + serviceInstances := s.serviceDiscovery.GetInstances(serviceName) + s.subscribe(url, notify, serviceName, serviceInstances) + listener := ®istry.ServiceInstancesChangedListener{ + ServiceName: serviceName, + ChangedNotify: &InstanceChangeNotify{ + notify: notify, + serviceDiscoveryRegistry: s, + }, + } + s.registerServiceInstancesChangedListener(*url, listener) + } + +} +func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) { + listenerId := listener.ServiceName + ":" + getUrlKey(url) + if !s.subscribedServices.Contains(listenerId) { + err := s.serviceDiscovery.AddListener(listener) + if err != nil { + logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error()) + } + } + +} + +func getUrlKey(url common.URL) string { + var bf bytes.Buffer + if len(url.Protocol) != 0 { + bf.WriteString(url.Protocol) + bf.WriteString("://") + } + if len(url.Location) != 0 { + bf.WriteString(url.Location) + bf.WriteString(":") + bf.WriteString(url.Port) + } + if len(url.Path) != 0 { + bf.WriteString("/") + bf.WriteString(url.Path) + } + bf.WriteString("?") + appendParam(bf, constant.VERSION_KEY, url) + appendParam(bf, constant.GROUP_KEY, url) + appendParam(bf, constant.NACOS_PROTOCOL_KEY, url) + return bf.String() +} + +func appendParam(buffer bytes.Buffer, paramKey string, url common.URL) { + buffer.WriteString(paramKey) + buffer.WriteString("=") + buffer.WriteString(url.GetParam(paramKey, "")) +} + +func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener, + serviceName string, serviceInstances []registry.ServiceInstance) { + if len(serviceInstances) == 0 { + logger.Warnf("here is no instance in service[name : %s]", serviceName) + return + } + var subscribedURLs []common.URL + subscribedURLs = append(subscribedURLs, s.getExportedUrls(*url, serviceInstances)...) + if len(subscribedURLs) == 0 { + subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances) + } + //TODO make sure it's workable + for _, url := range subscribedURLs { + notify.Notify(®istry.ServiceEvent{ + Action: remoting.EventTypeAdd, + Service: url, + }) + } + +} + +func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL { + var urls []common.URL + for _, syn := range s.subscribedURLsSynthesizers { + if syn.Support(subscribedURL) { + urls = append(urls, syn.Synthesize(subscribedURL, serviceInstances)...) + } + } + return urls +} +func shouldSubscribe(url common.URL) bool { + return !shouldRegister(url) +} + +func (s *serviceDiscoveryRegistry) getServices(url common.URL) *gxset.HashSet { + services := gxset.NewSet() + serviceNames := url.GetParam(constant.PROVIDER_BY, "") + if len(serviceNames) != 0 { + services = parseServices(serviceNames) + } + if services.Empty() { + services = s.findMappedServices(url) + if services.Empty() { + return s.subscribedServices + } + } + return services +} + +func (s *serviceDiscoveryRegistry) findMappedServices(url common.URL) *gxset.HashSet { + serviceInterface := url.GetParam(constant.INTERFACE_KEY, url.Path) + group := url.GetParam(constant.GROUP_KEY, "") + version := url.GetParam(constant.VERSION_KEY, "") + protocol := url.Protocol + serviceNames, err := s.serviceNameMapping.Get(serviceInterface, group, version, protocol) + if err != nil { + logger.Errorf("get serviceInterface:[%s] group:[%s] version:[%s] protocol:[%s] from "+ + "serviceNameMap error:%s", err.Error()) + return gxset.NewSet() + } + return serviceNames +} + +func (s *serviceDiscoveryRegistry) getExportedUrls(subscribedURL common.URL, serviceInstances []registry.ServiceInstance) []common.URL { + var filterInstances []registry.ServiceInstance + for _, s := range serviceInstances { + if !s.IsEnable() || !s.IsHealthy() { + continue + } + metaData := s.GetMetadata() + _, ok1 := metaData[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME] + _, ok2 := metaData[constant.METADATA_SERVICE_URLS_PROPERTY_NAME] + if !ok1 && !ok2 { + continue + } + filterInstances = append(filterInstances, s) + } + if len(filterInstances) == 0 { + return []common.URL{} + } + s.prepareServiceRevisionExportedURLs(filterInstances) + subscribedURLs := s.cloneExportedURLs(subscribedURL, filterInstances) + return subscribedURLs +} + +// comparator is defined as Comparator for skip list to compare the URL +type comparator common.URL + +// Compare is defined as Comparator for skip list to compare the URL +func (c comparator) Compare(comp cm.Comparator) int { + a := common.URL(c).String() + b := common.URL(comp.(comparator)).String() + switch { + case a > b: + return 1 + case a < b: + return -1 + default: + return 0 + } +} + +func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registry.ServiceInstance) []common.URL { + var urls []common.URL + metadataStorageType := getExportedStoreType(serviceInstance) + metadataProxy := proxy.GetMetadataServiceProxy(metadataStorageType) + if metadataProxy == nil { + return urls + } + metadataService := metadataProxy.GetProxy(serviceInstance) + if metadataService == nil { + return urls + } + result, err := metadataService.GetExportedURLs("*", "", "", "") + if err != nil { + logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance) + return urls + } + if result == nil { + logger.Errorf("get empty exported urls,instance:%+v", serviceInstance) + return urls + } + for i := uint64(0); i < result.Len(); i++ { + urls = append(urls, common.URL(result.ByPosition(i).(comparator))) + } + return urls +} + +func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { + s.lock.Lock() + // 1. expunge stale + s.expungeStaleRevisionExportedURLs(serviceInstances) + // 2. Initialize + s.initRevisionExportedURLs(serviceInstances) + s.lock.Unlock() +} + +func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { + serviceName := serviceInstances[0].GetServiceName() + revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName] + if !exist { + return + } + existRevision := gxset.NewSet() + for k := range revisionExportedURLsMap { + existRevision.Add(k) + } + currentRevision := gxset.NewSet() + for _, s := range serviceInstances { + rv := getExportedServicesRevision(s) + if len(rv) != 0 { + currentRevision.Add(rv) + } + } + // staleRevisions = existedRevisions(copy) - currentRevisions + staleRevision := gxset.NewSet(existRevision.Values()...) + staleRevision.Remove(currentRevision.Values()...) + // remove exported URLs if staled + for _, s := range staleRevision.Values() { + delete(revisionExportedURLsMap, s.(string)) + } +} + +func (s *serviceDiscoveryRegistry) initRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { + // initialize the revision exported URLs that the selected service instance exported + s.initSelectedRevisionExportedURLs(serviceInstances) + // initialize the revision exported URLs that other service instances exported + for _, serviceInstance := range serviceInstances { + s.initRevisionExportedURLsByInst(serviceInstance) + } +} + +func (s *serviceDiscoveryRegistry) initSelectedRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { + for range serviceInstances { + selectServiceInstance := s.selectServiceInstance(serviceInstances) + revisionExportedURLs := s.initRevisionExportedURLsByInst(selectServiceInstance) + if len(revisionExportedURLs) != 0 { + // If the result is valid,break + break + } + } +} + +func (s *serviceDiscoveryRegistry) selectServiceInstance(serviceInstances []registry.ServiceInstance) registry.ServiceInstance { + size := len(serviceInstances) + if size == 0 { + return nil + } + if size == 1 { + return serviceInstances[0] + } + selectorName := s.url.GetParam(constant.SERVICE_INSTANCE_SELECTOR, "random") + selector, err := extension.GetServiceInstanceSelector(selectorName) + if err != nil { + logger.Errorf("get service instance selector cathe error:%s", err.Error()) + return nil + } + return selector.Select(*s.url, serviceInstances) +} + +func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstance registry.ServiceInstance) []common.URL { + if serviceInstance == nil { + return []common.URL{} + } + serviceName := serviceInstance.GetServiceName() + revision := getExportedServicesRevision(serviceInstance) + revisionExportedURLsMap := s.serviceRevisionExportedURLsCache[serviceName] + revisionExportedURLs := revisionExportedURLsMap[revision] + firstGet := false + if revisionExportedURLs == nil || len(revisionExportedURLs) == 0 { + if len(revisionExportedURLsMap) > 0 { + // The case is that current ServiceInstance with the different revision + logger.Warnf("The ServiceInstance[id: %s, host : %s , port : %s] has different revision : %s"+ + ", please make sure the service [name : %s] is changing or not.", serviceInstance.GetId(), + serviceInstance.GetHost(), serviceInstance.GetPort(), revision, serviceInstance.GetServiceName()) + } else { + firstGet = true + } + revisionExportedURLs = s.getExportedUrlsByInst(serviceInstance) + if revisionExportedURLs != nil { + revisionExportedURLsMap[revision] = revisionExportedURLs + logger.Debugf("Get the exported URLs[size : %s, first : %s] from the target service "+ + "instance [id: %s , service : %s , host : %s , port : %s , revision : %s]", + len(revisionExportedURLs), firstGet, serviceInstance.GetId(), serviceInstance.GetServiceName(), + serviceInstance.GetHost(), serviceInstance.GetPort(), revision) + } + } else { + //Else, The cache is hit + logger.Debugf("Get the exported URLs[size : %s] from cache, the instance"+ + "[id: %s , service : %s , host : %s , port : %s , revision : %s]", len(revisionExportedURLs), firstGet, + serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(), + serviceInstance.GetPort(), revision) + } + return revisionExportedURLs +} + +func getExportedServicesRevision(serviceInstance registry.ServiceInstance) string { + metaData := serviceInstance.GetMetadata() + return metaData[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME] +} + +func getExportedStoreType(serviceInstance registry.ServiceInstance) string { + metaData := serviceInstance.GetMetadata() + result, ok := metaData[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] + if !ok { + return constant.DEFAULT_METADATA_STORAGE_TYPE + } + return result +} + +func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInsances []registry.ServiceInstance) []common.URL { + if serviceInsances == nil || len(serviceInsances) == 0 { + return []common.URL{} + } + var clonedExportedURLs []common.URL + removeParamSet := gxset.NewSet() + removeParamSet.Add(constant.PID_KEY) + removeParamSet.Add(constant.TIMESTAMP_KEY) + for _, serviceInstance := range serviceInsances { + templateExportURLs := s.getTemplateExportedURLs(url, serviceInstance) + host := serviceInstance.GetHost() + for _, u := range templateExportURLs { + u.RemoveParams(removeParamSet) + port := strconv.Itoa(getProtocolPort(serviceInstance, u.Protocol)) + if u.Location != host || u.Port != port { + u.Port = port //reset port + u.Location = host //reset host + } + clonedExportedURLs = append(clonedExportedURLs, u) + } + } + return clonedExportedURLs + +} + +type endpoint struct { + Port int `json:"port"` + Protocol string `json:"protocol"` +} + +func getProtocolPort(serviceInstance registry.ServiceInstance, protocol string) int { + md := serviceInstance.GetMetadata() + rawEndpoints := md[constant.SERVICE_INSTANCE_ENDPOINTS] + if len(rawEndpoints) == 0 { + return -1 + } + var endpoints []endpoint + err := json.Unmarshal([]byte(rawEndpoints), &endpoints) + if err != nil { + logger.Errorf("json umarshal rawEndpoints[%s] catch error:%s", rawEndpoints, err.Error()) + return -1 + } + for _, e := range endpoints { + if e.Protocol == protocol { + return e.Port + } + } + return -1 +} +func (s *serviceDiscoveryRegistry) getTemplateExportedURLs(url common.URL, serviceInstance registry.ServiceInstance) []common.URL { + exportedURLs := s.getRevisionExportedURLs(serviceInstance) + if len(exportedURLs) == 0 { + return []common.URL{} + } + return filterSubscribedURLs(url, exportedURLs) +} + +func (s *serviceDiscoveryRegistry) getRevisionExportedURLs(serviceInstance registry.ServiceInstance) []common.URL { + if serviceInstance == nil { + return []common.URL{} + } + serviceName := serviceInstance.GetServiceName() + revision := getExportedServicesRevision(serviceInstance) + s.lock.RLock() + revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName] + if !exist { + return []common.URL{} + } + exportedURLs, exist := revisionExportedURLsMap[revision] + if !exist { + return []common.URL{} + } + s.lock.RUnlock() + // Get a copy from source in order to prevent the caller trying to change the cached data + cloneExportedURLs := make([]common.URL, len(exportedURLs)) + copy(cloneExportedURLs, exportedURLs) + return cloneExportedURLs +} + +func filterSubscribedURLs(subscribedURL common.URL, exportedURLs []common.URL) []common.URL { + var filterExportedURLs []common.URL + for _, url := range exportedURLs { + if url.GetParam(constant.INTERFACE_KEY, url.Path) != subscribedURL.GetParam(constant.INTERFACE_KEY, url.Path) { + break + } + if url.GetParam(constant.VERSION_KEY, "") != subscribedURL.GetParam(constant.VERSION_KEY, "") { + break + } + if url.GetParam(constant.GROUP_KEY, "") != subscribedURL.GetParam(constant.GROUP_KEY, "") { + break + } + if len(subscribedURL.Protocol) != 0 { + if subscribedURL.Protocol != url.Protocol { + break + } + } + filterExportedURLs = append(filterExportedURLs, url) + } + return filterExportedURLs +} + +type InstanceChangeNotify struct { + notify registry.NotifyListener + serviceDiscoveryRegistry *serviceDiscoveryRegistry +} + +func (icn *InstanceChangeNotify) Notify(event registry.ServiceInstancesChangedEvent) { + sdr := icn.serviceDiscoveryRegistry + sdr.subscribe(sdr.url, icn.notify, event.ServiceName, event.Instances) +} diff --git a/registry/service/service_discovery_registry_test.go b/registry/service/service_discovery_registry_test.go new file mode 100644 index 0000000000000000000000000000000000000000..41b681d991258d0616d1898da3eb1b40c0cc12af --- /dev/null +++ b/registry/service/service_discovery_registry_test.go @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package service + +import ( + "testing" +) + +var ( + SERVICE_INTERFACE = "org.apache.dubbo.metadata.MetadataService" + GROUP = "dubbo-provider" + VERSION = "1.0.0" +) + +func TestServiceDiscoveryRegistry_Register(t *testing.T) { + //registryURL,_:=common.NewURL("in-memory://localhost:12345", + // common.WithParamsValue("registry-type","service"), + // common.WithParamsValue("subscribed-services","a, b , c,d,e ,")) + //url,_:=common.NewURL("dubbo://192.168.0.102:20880/"+ SERVICE_INTERFACE + + // "?&application=" + GROUP + + // "&interface=" + SERVICE_INTERFACE + + // "&group=" + GROUP + + // "&version=" + VERSION + + // "&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedURLs" + + // "&side=provider") + //registry,err:=newServiceDiscoveryRegistry(®istryURL) + //if err!=nil{ + // logger.Errorf("create service discovery registry catch error:%s",err.Error()) + //} + //assert.Nil(t,err) + //assert.NotNil(t,registry) + //registry.Register(url) + +} diff --git a/registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer.go b/registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer.go new file mode 100644 index 0000000000000000000000000000000000000000..a7e7a5bc3065008197963f1f566f6a1593c15e58 --- /dev/null +++ b/registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/registry/service/synthesizer" + "net/url" + "strings" +) + +func init() { + synthesizer.AddSynthesizer(NewRestSubscribedURLsSynthesizer()) +} + +type RestSubscribedURLsSynthesizer struct { +} + +func (r RestSubscribedURLsSynthesizer) Support(subscribedURL *common.URL) bool { + if "rest" == subscribedURL.Protocol { + return true + } + return false +} + +func (r RestSubscribedURLsSynthesizer) Synthesize(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL { + urls := make([]common.URL, len(serviceInstances), len(serviceInstances)) + for i, s := range serviceInstances { + splitHost := strings.Split(s.GetHost(), ":") + u := common.NewURLWithOptions(common.WithProtocol(subscribedURL.Protocol), common.WithIp(splitHost[0]), + common.WithPort(splitHost[1]), common.WithPath(subscribedURL.GetParam(constant.INTERFACE_KEY, subscribedURL.Path)), + common.WithParams(url.Values{}), + common.WithParamsValue(constant.SIDE_KEY, constant.PROVIDER_PROTOCOL), + common.WithParamsValue(constant.APPLICATION_KEY, s.GetServiceName()), + common.WithParamsValue(constant.REGISTRY_KEY, "true"), + ) + urls[i] = *u + } + return urls +} + +func NewRestSubscribedURLsSynthesizer() RestSubscribedURLsSynthesizer { + return RestSubscribedURLsSynthesizer{} +} diff --git a/registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go b/registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8ad1b5022785374d4c9b1aeee71b6a941a874106 --- /dev/null +++ b/registry/service/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/registry" + "github.com/stretchr/testify/assert" + "net/url" + "testing" +) + +func TestRestSubscribedURLsSynthesizer_Synthesize(t *testing.T) { + syn := RestSubscribedURLsSynthesizer{} + subUrl, _ := common.NewURL("rest://127.0.0.1:20000/org.apache.dubbo-go.mockService") + instances := []registry.ServiceInstance{ + ®istry.DefaultServiceInstance{ + Id: "test1", + ServiceName: "test1", + Host: "127.0.0.1:80", + Port: 80, + Enable: false, + Healthy: false, + Metadata: nil, + }, + ®istry.DefaultServiceInstance{ + Id: "test2", + ServiceName: "test2", + Host: "127.0.0.2:8081", + Port: 8081, + Enable: false, + Healthy: false, + Metadata: nil, + }, + } + + var expectUrls []common.URL + u1 := common.NewURLWithOptions(common.WithProtocol("rest"), common.WithIp("127.0.0.1"), + common.WithPort("80"), common.WithPath("org.apache.dubbo-go.mockService"), + common.WithParams(url.Values{}), + common.WithParamsValue(constant.SIDE_KEY, constant.PROVIDER_PROTOCOL), + common.WithParamsValue(constant.APPLICATION_KEY, "test1"), + common.WithParamsValue(constant.REGISTRY_KEY, "true")) + u2 := common.NewURLWithOptions(common.WithProtocol("rest"), common.WithIp("127.0.0.2"), + common.WithPort("8081"), common.WithPath("org.apache.dubbo-go.mockService"), + common.WithParams(url.Values{}), + common.WithParamsValue(constant.SIDE_KEY, constant.PROVIDER_PROTOCOL), + common.WithParamsValue(constant.APPLICATION_KEY, "test2"), + common.WithParamsValue(constant.REGISTRY_KEY, "true")) + expectUrls = append(expectUrls, *u1, *u2) + result := syn.Synthesize(&subUrl, instances) + assert.Equal(t, expectUrls, result) +} diff --git a/registry/service/synthesizer/subscribed_urls_synthesizer.go b/registry/service/synthesizer/subscribed_urls_synthesizer.go new file mode 100644 index 0000000000000000000000000000000000000000..f2d5b99f1c6f86714bf9095a3f99d71bf6befd5e --- /dev/null +++ b/registry/service/synthesizer/subscribed_urls_synthesizer.go @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package synthesizer + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/registry" +) + +type SubscribedURLsSynthesizer interface { + Support(subscribedURL *common.URL) bool + Synthesize(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL +} diff --git a/registry/service/synthesizer/subscribed_urls_synthesizer_factory.go b/registry/service/synthesizer/subscribed_urls_synthesizer_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..f8c76f6e84eb2ceba47481d5f856f6885525f09c --- /dev/null +++ b/registry/service/synthesizer/subscribed_urls_synthesizer_factory.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package synthesizer + +var ( + synthesizers []SubscribedURLsSynthesizer +) + +func AddSynthesizer(synthesizer SubscribedURLsSynthesizer) { + synthesizers = append(synthesizers, synthesizer) +} + +func GetAllSynthesizer() []SubscribedURLsSynthesizer { + return synthesizers +}