diff --git a/common/constant/key.go b/common/constant/key.go index 1479af2305b1ce4280c2aa7f4016e314ac358513..d9413fcc9e24a857cdb398cc6fb96074ffef31b4 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -223,6 +223,7 @@ const ( KEY_SEPARATOR = ":" DEFAULT_PATH_TAG = "metadata" KEY_REVISON_PREFIX = "revision" + PATH_SEPARATOR = "/" // metadata service METADATA_SERVICE_NAME = "org.apache.dubbo.metadata.MetadataService" diff --git a/common/extension/registry_directory.go b/common/extension/registry_directory.go new file mode 100644 index 0000000000000000000000000000000000000000..6b92189c4e98b391a90e6e71a68d51a252eede2a --- /dev/null +++ b/common/extension/registry_directory.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/registry" +) + +type registryDirectory func(url *common.URL, registry registry.Registry) (cluster.Directory, error) + +var defaultRegistry registryDirectory + +// SetDefaultRegistryDirectory ... +func SetDefaultRegistryDirectory(v registryDirectory) { + defaultRegistry = v +} + +// GetDefaultRegistryDirectory ... +func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (cluster.Directory, error) { + if defaultRegistry == nil { + panic("registry directory is not existing, make sure you have import the package.") + } + return defaultRegistry(config, registry) +} diff --git a/go.mod b/go.mod index 54d532eac06a56057f815e3c8e91fdd267c9c6ad..b77fd3edae0f5b37392c37b6191468f03d2f44bb 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/Workiva/go-datastructures v1.0.50 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect - github.com/apache/dubbo-go-hessian2 v1.4.0 + github.com/apache/dubbo-go-hessian2 v1.5.0 github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible @@ -12,7 +12,7 @@ require ( github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/creasty/defaults v1.3.0 - github.com/dubbogo/getty v1.3.3 + github.com/dubbogo/getty v1.3.4 github.com/dubbogo/go-zookeeper v1.0.0 github.com/dubbogo/gost v1.8.0 github.com/emicklei/go-restful/v3 v3.0.0 diff --git a/go.sum b/go.sum index e499992eb0a0335ca0a1e1f746caca3418af7655..e0e1c7a9ff49221d4c2657ee7d8cb6fb7e6991af 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/apache/dubbo-go-hessian2 v1.4.0 h1:Cb9FQVTy3G93dnDr7P93U8DeKFYpDTJjQp44JG5TafA= github.com/apache/dubbo-go-hessian2 v1.4.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8= +github.com/apache/dubbo-go-hessian2 v1.5.0 h1:fzulDG5G7nX0ccgKdiN9XipJ7tZ4WXKgmk4stdlDS6s= +github.com/apache/dubbo-go-hessian2 v1.5.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -102,12 +104,15 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/digitalocean/godo v1.1.1/go.mod h1:h6faOIcZ8lWIwNQ+DN7b3CgX4Kwby5T+nbpNqkUIozU= github.com/digitalocean/godo v1.10.0 h1:uW1/FcvZE/hoixnJcnlmIUvTVNdZCLjRLzmDtRi1xXY= github.com/digitalocean/godo v1.10.0/go.mod h1:h6faOIcZ8lWIwNQ+DN7b3CgX4Kwby5T+nbpNqkUIozU= +github.com/divebomb/dubbo-go v1.0.0 h1:QsQxD6UU2WbcaA8YCxU9stiuPUsVCPabFg8hhKGJR8A= github.com/docker/go-connections v0.3.0 h1:3lOnM9cSzgGwx8VfK/NGOW5fLQ0GjIlCkaktF+n1M6o= github.com/docker/go-connections v0.3.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk= github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dubbogo/getty v1.3.3 h1:8m4zZBqFHO+NmhH7rMPlFuuYRVjcPD7cUhumevqMZZs= github.com/dubbogo/getty v1.3.3/go.mod h1:U92BDyJ6sW9Jpohr2Vlz8w2uUbIbNZ3d+6rJvFTSPp0= +github.com/dubbogo/getty v1.3.4 h1:5TvH213pnSIKYzY7IK8TT/r6yr5uPTB/U6YNLT+GsU0= +github.com/dubbogo/getty v1.3.4/go.mod h1:36f+gH/ekaqcDWKbxNBQk9b9HXcGtaI6YHxp4YTntX8= github.com/dubbogo/go-zookeeper v1.0.0 h1:RsYdlGwhDW+iKXM3eIIcvt34P2swLdmQfuIJxsHlGoM= github.com/dubbogo/go-zookeeper v1.0.0/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c= github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= @@ -326,6 +331,7 @@ github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/errors v0.0.0-20190930114154-d42613fe1ab9/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/keybase/go-crypto v0.0.0-20180614160407-5114a9a81e1b h1:VE6r2OwP5gj+Z9aCkSKl3MlmnZbfMAjhvR5T7abKHEo= github.com/keybase/go-crypto v0.0.0-20180614160407-5114a9a81e1b/go.mod h1:ghbZscTyKdM07+Fw3KSi0hcJm+AlEUWj8QLlPtijN/M= diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 20be268d7401976ef1b7884f2a7bd40eeacb8158..552aa57061c99bf92ff986b6e672743ebb375e76 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -19,7 +19,6 @@ package directory import ( "sync" - "time" ) import ( @@ -28,6 +27,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -42,15 +42,11 @@ import ( "github.com/apache/dubbo-go/remoting" ) -// Options ... -type Options struct { - serviceTTL time.Duration +func init() { + extension.SetDefaultRegistryDirectory(NewRegistryDirectory) } -// Option ... -type Option func(*Options) - -type registryDirectory struct { +type RegistryDirectory struct { directory.BaseDirectory cacheInvokers []protocol.Invoker listenerLock sync.Mutex @@ -61,48 +57,41 @@ type registryDirectory struct { configurators []config_center.Configurator consumerConfigurationListener *consumerConfigurationListener referenceConfigurationListener *referenceConfigurationListener - Options - serviceKey string - forbidden atomic.Bool + serviceKey string + forbidden atomic.Bool } // NewRegistryDirectory ... -func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) { - options := Options{ - //default 300s - serviceTTL: time.Duration(300e9), - } - for _, opt := range opts { - opt(&options) - } +func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) { if url.SubURL == nil { return nil, perrors.Errorf("url is invalid, suburl can not be nil") } - dir := ®istryDirectory{ + dir := &RegistryDirectory{ BaseDirectory: directory.NewBaseDirectory(url), cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: &sync.Map{}, serviceType: url.SubURL.Service(), registry: registry, - Options: options, } dir.consumerConfigurationListener = newConsumerConfigurationListener(dir) + + go dir.subscribe(url.SubURL) return dir, nil } //subscribe from registry -func (dir *registryDirectory) Subscribe(url *common.URL) { +func (dir *RegistryDirectory) subscribe(url *common.URL) { dir.consumerConfigurationListener.addNotifyListener(dir) dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) dir.registry.Subscribe(url, dir) } -func (dir *registryDirectory) Notify(event *registry.ServiceEvent) { +func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) { go dir.update(event) } -//subscribe service from registry, and update the cacheServices -func (dir *registryDirectory) update(res *registry.ServiceEvent) { +// update the cacheServices and subscribe service from registry +func (dir *RegistryDirectory) update(res *registry.ServiceEvent) { if res == nil { return } @@ -111,7 +100,7 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { dir.refreshInvokers(res) } -func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { +func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) { var ( url *common.URL oldInvoker protocol.Invoker = nil @@ -162,7 +151,7 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { } -func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { +func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { newInvokersList := []protocol.Invoker{} groupInvokersMap := make(map[string][]protocol.Invoker) groupInvokersList := []protocol.Invoker{} @@ -198,8 +187,8 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { return groupInvokersList } -// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil -func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { +// uncacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil +func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok { dir.cacheInvokersMap.Delete(url.Key()) @@ -208,8 +197,8 @@ func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { return nil } -// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil -func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { +// cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil +func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { dir.overrideUrl(dir.GetDirectoryUrl()) referenceUrl := dir.GetDirectoryUrl().SubURL @@ -244,8 +233,8 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { return nil } -//select the protocol invokers from the directory -func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { +// List selected protocol invokers from the directory +func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { invokers := dir.cacheInvokers routerChain := dir.RouterChain() @@ -255,7 +244,7 @@ func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.In return routerChain.Route(invokers, dir.cacheOriginUrl, invocation) } -func (dir *registryDirectory) IsAvailable() bool { +func (dir *RegistryDirectory) IsAvailable() bool { if !dir.BaseDirectory.IsAvailable() { return dir.BaseDirectory.IsAvailable() } @@ -269,7 +258,7 @@ func (dir *registryDirectory) IsAvailable() bool { return false } -func (dir *registryDirectory) Destroy() { +func (dir *RegistryDirectory) Destroy() { //TODO:unregister & unsubscribe dir.BaseDirectory.Destroy(func() { invokers := dir.cacheInvokers @@ -280,7 +269,7 @@ func (dir *registryDirectory) Destroy() { }) } -func (dir *registryDirectory) overrideUrl(targetUrl *common.URL) { +func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) { doOverrideUrl(dir.configurators, targetUrl) doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl) doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl) @@ -294,11 +283,11 @@ func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common type referenceConfigurationListener struct { registry.BaseConfigurationListener - directory *registryDirectory + directory *RegistryDirectory url *common.URL } -func newReferenceConfigurationListener(dir *registryDirectory, url *common.URL) *referenceConfigurationListener { +func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener { listener := &referenceConfigurationListener{directory: dir, url: url} listener.InitWith( url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX, @@ -316,10 +305,10 @@ func (l *referenceConfigurationListener) Process(event *config_center.ConfigChan type consumerConfigurationListener struct { registry.BaseConfigurationListener listeners []registry.NotifyListener - directory *registryDirectory + directory *RegistryDirectory } -func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigurationListener { +func newConsumerConfigurationListener(dir *RegistryDirectory) *consumerConfigurationListener { listener := &consumerConfigurationListener{directory: dir} listener.InitWith( config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX, diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 0dde44e73c18f65f262e01f499e198995907dece..f1d5ce434aa00185f784f208eefe603274f05ab0 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -79,10 +79,9 @@ func TestSubscribe_Group(t *testing.T) { suburl.SetParam(constant.CLUSTER_KEY, "mock") regurl.SubURL = &suburl mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) - registryDirectory, _ := NewRegistryDirectory(®url, mockRegistry) - - go registryDirectory.Subscribe(common.NewURLWithOptions(common.WithPath("testservice"))) + dir, _ := NewRegistryDirectory(®url, mockRegistry) + go dir.(*RegistryDirectory).subscribe(common.NewURLWithOptions(common.WithPath("testservice"))) //for group1 urlmap := url.Values{} urlmap.Set(constant.GROUP_KEY, "group1") @@ -101,7 +100,7 @@ func TestSubscribe_Group(t *testing.T) { } time.Sleep(1e9) - assert.Len(t, registryDirectory.cacheInvokers, 2) + assert.Len(t, dir.(*RegistryDirectory).cacheInvokers, 2) } func Test_Destroy(t *testing.T) { @@ -173,7 +172,7 @@ Loop1: } -func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockRegistry) { +func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) { extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) url, _ := common.NewURL("mock://127.0.0.1:1111") @@ -185,9 +184,9 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR ) url.SubURL = &suburl mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) - registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) + dir, _ := NewRegistryDirectory(&url, mockRegistry) - go registryDirectory.Subscribe(&suburl) + go dir.(*RegistryDirectory).subscribe(&suburl) if len(noMockEvent) == 0 { for i := 0; i < 3; i++ { mockRegistry.(*registry.MockRegistry).MockEvent( @@ -201,5 +200,5 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR ) } } - return registryDirectory, mockRegistry.(*registry.MockRegistry) + return dir.(*RegistryDirectory), mockRegistry.(*registry.MockRegistry) } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index f9e9cd6b9e0c7b84d456fab0ecc9f398e29e54c3..52a7dcbfc77fd576ef8d2917ce51cc09f3cd0b97 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -39,7 +39,7 @@ import ( "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" - directory2 "github.com/apache/dubbo-go/registry/directory" + _ "github.com/apache/dubbo-go/registry/directory" "github.com/apache/dubbo-go/remoting" ) @@ -124,7 +124,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { } //new registry directory for store service url from registry - directory, err := directory2.NewRegistryDirectory(®istryUrl, reg) + directory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error()) @@ -136,7 +136,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { logger.Errorf("consumer service %v register registry %v error, error message is %s", serviceUrl.String(), registryUrl.String(), err.Error()) } - go directory.Subscribe(serviceUrl) //new cluster invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))