diff --git a/common/extension/service_instance_selector_factory.go b/common/extension/service_instance_selector_factory.go index d767e0a7dce9300777af1c5ef647c22ba8d7b374..3ba3db46e65ed01a5417075d998209ae39df38ec 100644 --- a/common/extension/service_instance_selector_factory.go +++ b/common/extension/service_instance_selector_factory.go @@ -18,10 +18,13 @@ package extension import ( - "github.com/apache/dubbo-go/registry/servicediscovery/instance" perrors "github.com/pkg/errors" ) +import ( + "github.com/apache/dubbo-go/registry/servicediscovery/instance" +) + var ( serviceInstanceSelectorMappings = make(map[string]func() instance.ServiceInstanceSelector) ) diff --git a/go.sum b/go.sum index 326b4e68974fe8a851f73d27fbc7e14ee1045c19..304025f27087dd66c409592c0a36c1a893ac4d60 100644 --- a/go.sum +++ b/go.sum @@ -35,11 +35,8 @@ github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vaj github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0= -github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ= github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA= github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk= -github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/apache/dubbo-go-hessian2 v1.5.0 h1:fzulDG5G7nX0ccgKdiN9XipJ7tZ4WXKgmk4stdlDS6s= github.com/apache/dubbo-go-hessian2 v1.5.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= @@ -53,7 +50,6 @@ github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f h1:/8NcnxL6 github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.15.24 h1:xLAdTA/ore6xdPAljzZRed7IGqQgC+nY+ERS5vaj4Ro= github.com/aws/aws-sdk-go v1.15.24/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= -github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/registry/event_listener.go b/registry/event_listener.go index 23da690e55cba3364efff745e410fb8b8f3909de..34fd81de74aba98dddcce9e99afbd858f47112ee 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -25,25 +25,30 @@ import ( "github.com/apache/dubbo-go/common/observer" ) +//The Service Discovery Changed Event Listener type ServiceInstancesChangedListener struct { ServiceName string observer.ConditionalEventListener ChangedNotify ChangedNotify } +//On ServiceInstancesChangedEvent the service instances change event func (sicl *ServiceInstancesChangedListener) OnEvent(e ServiceInstancesChangedEvent) error { sicl.ChangedNotify.Notify(e) return nil } +//get listener priority func (sicl *ServiceInstancesChangedListener) GetPriority() int { return -1 } +//get event type func (sicl *ServiceInstancesChangedListener) GetEventType() reflect.Type { return reflect.TypeOf(&ServiceInstancesChangedEvent{}) } +//If service name matches,return true or false func (sicl *ServiceInstancesChangedListener) Accept(e ServiceInstancesChangedEvent) bool { return e.ServiceName == sicl.ServiceName } diff --git a/registry/servicediscovery/instance/random/random_service_instance_selector.go b/registry/servicediscovery/instance/random/random_service_instance_selector.go index 1efbbc3fcbba81615a296e05c85441fd999a7e1f..3f8f30dc8e9e91f9c75f8ff0611c98bb2f0c7b85 100644 --- a/registry/servicediscovery/instance/random/random_service_instance_selector.go +++ b/registry/servicediscovery/instance/random/random_service_instance_selector.go @@ -17,19 +17,23 @@ package random +import ( + "math/rand" + "time" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/registry/servicediscovery/instance" - "math/rand" - "time" ) func init() { extension.SetServiceInstanceSelector("random", NewRandomServiceInstanceSelector) } +//the ServiceInstanceSelector implementation based on Random algorithm type RandomServiceInstanceSelector struct { } diff --git a/registry/servicediscovery/instance/random/random_service_instance_selector_test.go b/registry/servicediscovery/instance/random/random_service_instance_selector_test.go index ed4b7838fca9ba30600cd7f0f684017598feee95..cddeb42c904131cdc6a62e5142de850410a3ec5a 100644 --- a/registry/servicediscovery/instance/random/random_service_instance_selector_test.go +++ b/registry/servicediscovery/instance/random/random_service_instance_selector_test.go @@ -17,11 +17,17 @@ package random +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/registry" - "github.com/stretchr/testify/assert" - "testing" ) func TestRandomServiceInstanceSelector_Select(t *testing.T) { diff --git a/registry/servicediscovery/instance/service_instance_selector.go b/registry/servicediscovery/instance/service_instance_selector.go index acacde5617342513e490c78fab6363142970f320..82fb3458be2838e9a5780e95be71aa89039b664f 100644 --- a/registry/servicediscovery/instance/service_instance_selector.go +++ b/registry/servicediscovery/instance/service_instance_selector.go @@ -23,5 +23,6 @@ import ( ) type ServiceInstanceSelector interface { + //Select an instance of ServiceInstance by the specified ServiceInstance service instances Select(url common.URL, serviceInstances []registry.ServiceInstance) registry.ServiceInstance } diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index c110eb22828330adc42cfa2f027dc7b55e6071fe..0adbee203bedb5d26b46fed0cfe0af6ccdb80903 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -20,7 +20,17 @@ package servicediscovery import ( "bytes" "encoding/json" + "strconv" + "strings" + "sync" +) + +import ( cm "github.com/Workiva/go-datastructures/common" + gxset "github.com/dubbogo/gost/container/set" +) + +import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -32,10 +42,6 @@ import ( "github.com/apache/dubbo-go/registry/servicediscovery/proxy" "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer" "github.com/apache/dubbo-go/remoting" - gxset "github.com/dubbogo/gost/container/set" - "strconv" - "strings" - "sync" ) const ( @@ -53,14 +59,17 @@ func init() { // 1. when we registry the service, we should create the mapping from service name to application name // 2. when we sub type serviceDiscoveryRegistry struct { - lock sync.RWMutex - url *common.URL - serviceDiscovery registry.ServiceDiscovery - subscribedServices *gxset.HashSet - serviceNameMapping mapping.ServiceNameMapping - metaDataService service.MetadataService - registeredListeners *gxset.HashSet - subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer + lock sync.RWMutex + url *common.URL + serviceDiscovery registry.ServiceDiscovery + subscribedServices *gxset.HashSet + serviceNameMapping mapping.ServiceNameMapping + metaDataService service.MetadataService + //cache the registered listen + registeredListeners *gxset.HashSet + //all synthesize + subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer + //cache exported urls, serviceName->revision->[]URL serviceRevisionExportedURLsCache map[string]map[string][]common.URL } @@ -104,18 +113,22 @@ func parseServices(literalServices string) *gxset.HashSet { return set } +//GetServiceDiscovery for get serviceDiscovery of the registry func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery { return s.serviceDiscovery } +//GetUrl for get url of the registry func (s *serviceDiscoveryRegistry) GetUrl() common.URL { return *s.url } +//IsAvailable for make sure is't available func (s *serviceDiscoveryRegistry) IsAvailable() bool { return true } +//Destroy for destroy graceful down func (s *serviceDiscoveryRegistry) Destroy() { err := s.serviceDiscovery.Destroy() if err != nil { @@ -149,6 +162,7 @@ func shouldRegister(url common.URL) bool { return false } +//Subscribe for listen the change of services that from the exported url func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) { if !shouldSubscribe(*url) { return @@ -347,17 +361,17 @@ func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registr } func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { - s.lock.Lock() // 1. expunge stale s.expungeStaleRevisionExportedURLs(serviceInstances) // 2. Initialize s.initRevisionExportedURLs(serviceInstances) - s.lock.Unlock() } func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { serviceName := serviceInstances[0].GetServiceName() + s.lock.Lock() revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName] + s.lock.Unlock() if !exist { return } @@ -368,7 +382,7 @@ func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInsta currentRevision := gxset.NewSet() for _, s := range serviceInstances { rv := getExportedServicesRevision(s) - if len(rv) != 0 { + if len(rv) > 0 { currentRevision.Add(rv) } } @@ -424,7 +438,9 @@ func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstanc } serviceName := serviceInstance.GetServiceName() revision := getExportedServicesRevision(serviceInstance) + s.lock.Lock() revisionExportedURLsMap := s.serviceRevisionExportedURLsCache[serviceName] + s.lock.Unlock() revisionExportedURLs := revisionExportedURLsMap[revision] firstGet := false if revisionExportedURLs == nil || len(revisionExportedURLs) == 0 { diff --git a/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer.go b/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer.go index e09dfd6be7a02f6a32c56383b7fbd9a42c16e188..086a26de58f8472e35e07a8a174fdee86afa82f2 100644 --- a/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer.go +++ b/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer.go @@ -17,19 +17,23 @@ package rest +import ( + "net/url" + "strings" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer" - "net/url" - "strings" ) func init() { synthesizer.AddSynthesizer(NewRestSubscribedURLsSynthesizer()) } +//SubscribedURLsSynthesizer implementation for rest protocol type RestSubscribedURLsSynthesizer struct { } diff --git a/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go b/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go index 8ad1b5022785374d4c9b1aeee71b6a941a874106..b52cc2323d6f9ae1bca8cfd1a4c5217af5e25f12 100644 --- a/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go +++ b/registry/servicediscovery/synthesizer/rest/rest_subscribed_urls_synthesizer_test.go @@ -17,13 +17,19 @@ package rest +import ( + "net/url" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/registry" - "github.com/stretchr/testify/assert" - "net/url" - "testing" ) func TestRestSubscribedURLsSynthesizer_Synthesize(t *testing.T) { diff --git a/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer.go b/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer.go index f2d5b99f1c6f86714bf9095a3f99d71bf6befd5e..a7d2f3ee98f20bf16437008135e4964195680abb 100644 --- a/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer.go +++ b/registry/servicediscovery/synthesizer/subscribed_urls_synthesizer.go @@ -23,6 +23,8 @@ import ( ) type SubscribedURLsSynthesizer interface { + //Supports the synthesis of the subscribed url or not Support(subscribedURL *common.URL) bool + //synthesize the subscribed url Synthesize(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL }