diff --git a/common/extension/registry_directory.go b/common/extension/registry_directory.go new file mode 100644 index 0000000000000000000000000000000000000000..1d34add5a6995e5ca62b60ab5c3154362f23e9ba --- /dev/null +++ b/common/extension/registry_directory.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/registry" +) + +var defaultRegistry func(url *common.URL, registry registry.Registry) (cluster.Directory, error) + +// SetDefaultRegistryDirectory ... +func SetDefaultRegistryDirectory(v func(url *common.URL, registry registry.Registry) (cluster.Directory, error)) { + defaultRegistry = v +} + +// GetDefaultRegistryDirectory ... +func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (cluster.Directory, error) { + if defaultRegistry == nil { + panic("registry directory is not existing, make sure you have import the package.") + } + return defaultRegistry(config, registry) +} diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 20be268d7401976ef1b7884f2a7bd40eeacb8158..20724c37373fa2291a91218982dada9e437bee96 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -19,7 +19,6 @@ package directory import ( "sync" - "time" ) import ( @@ -28,6 +27,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/cluster/directory" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" @@ -42,14 +42,10 @@ import ( "github.com/apache/dubbo-go/remoting" ) -// Options ... -type Options struct { - serviceTTL time.Duration +func init() { + extension.SetDefaultRegistryDirectory(newRegistryDirectory) } -// Option ... -type Option func(*Options) - type registryDirectory struct { directory.BaseDirectory cacheInvokers []protocol.Invoker @@ -61,20 +57,12 @@ type registryDirectory struct { configurators []config_center.Configurator consumerConfigurationListener *consumerConfigurationListener referenceConfigurationListener *referenceConfigurationListener - Options - serviceKey string - forbidden atomic.Bool + serviceKey string + forbidden atomic.Bool } -// NewRegistryDirectory ... -func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) { - options := Options{ - //default 300s - serviceTTL: time.Duration(300e9), - } - for _, opt := range opts { - opt(&options) - } +// newRegistryDirectory ... +func newRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) { if url.SubURL == nil { return nil, perrors.Errorf("url is invalid, suburl can not be nil") } @@ -84,14 +72,15 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O cacheInvokersMap: &sync.Map{}, serviceType: url.SubURL.Service(), registry: registry, - Options: options, } dir.consumerConfigurationListener = newConsumerConfigurationListener(dir) + + go dir.subscribe(url.SubURL) return dir, nil } //subscribe from registry -func (dir *registryDirectory) Subscribe(url *common.URL) { +func (dir *registryDirectory) subscribe(url *common.URL) { dir.consumerConfigurationListener.addNotifyListener(dir) dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) dir.registry.Subscribe(url, dir) @@ -101,7 +90,7 @@ func (dir *registryDirectory) Notify(event *registry.ServiceEvent) { go dir.update(event) } -//subscribe service from registry, and update the cacheServices +// update: subscribe service from registry, and update the cacheServices func (dir *registryDirectory) update(res *registry.ServiceEvent) { if res == nil { return @@ -198,7 +187,7 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { return groupInvokersList } -// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil +// uncacheInvoker: return abandoned Invoker,if no Invoker to be abandoned,return nil func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok { @@ -208,7 +197,7 @@ func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { return nil } -// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil +// cacheInvoker: return abandoned Invoker,if no Invoker to be abandoned,return nil func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { dir.overrideUrl(dir.GetDirectoryUrl()) referenceUrl := dir.GetDirectoryUrl().SubURL @@ -244,7 +233,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { return nil } -//select the protocol invokers from the directory +// list :select the protocol invokers from the directory func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { invokers := dir.cacheInvokers routerChain := dir.RouterChain() diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 0dde44e73c18f65f262e01f499e198995907dece..9a6efa455763c0d3db1c2f01b307fd650d99af35 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -66,7 +66,7 @@ func TestSubscribe(t *testing.T) { func TestSubscribe_InvalidUrl(t *testing.T) { url, _ := common.NewURL("mock://127.0.0.1:1111") mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) - _, err := NewRegistryDirectory(&url, mockRegistry) + _, err := newRegistryDirectory(&url, mockRegistry) assert.Error(t, err) } @@ -79,10 +79,9 @@ func TestSubscribe_Group(t *testing.T) { suburl.SetParam(constant.CLUSTER_KEY, "mock") regurl.SubURL = &suburl mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) - registryDirectory, _ := NewRegistryDirectory(®url, mockRegistry) - - go registryDirectory.Subscribe(common.NewURLWithOptions(common.WithPath("testservice"))) + dir, _ := newRegistryDirectory(®url, mockRegistry) + go dir.(*registryDirectory).subscribe(common.NewURLWithOptions(common.WithPath("testservice"))) //for group1 urlmap := url.Values{} urlmap.Set(constant.GROUP_KEY, "group1") @@ -101,7 +100,7 @@ func TestSubscribe_Group(t *testing.T) { } time.Sleep(1e9) - assert.Len(t, registryDirectory.cacheInvokers, 2) + assert.Len(t, dir.(*registryDirectory).cacheInvokers, 2) } func Test_Destroy(t *testing.T) { @@ -185,9 +184,9 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR ) url.SubURL = &suburl mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) - registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) + dir, _ := newRegistryDirectory(&url, mockRegistry) - go registryDirectory.Subscribe(&suburl) + go dir.(*registryDirectory).subscribe(&suburl) if len(noMockEvent) == 0 { for i := 0; i < 3; i++ { mockRegistry.(*registry.MockRegistry).MockEvent( @@ -201,5 +200,5 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR ) } } - return registryDirectory, mockRegistry.(*registry.MockRegistry) + return dir.(*registryDirectory), mockRegistry.(*registry.MockRegistry) } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index a7678ba4e2f38cfeb77f202103e03066a7efdbef..591d4986cfaa6c612a32bd88621cff0e552b11c5 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -39,7 +39,7 @@ import ( "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/protocolwrapper" "github.com/apache/dubbo-go/registry" - directory2 "github.com/apache/dubbo-go/registry/directory" + _ "github.com/apache/dubbo-go/registry/directory" "github.com/apache/dubbo-go/remoting" ) @@ -111,7 +111,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { } //new registry directory for store service url from registry - directory, err := directory2.NewRegistryDirectory(®istryUrl, reg) + directory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error()) @@ -123,7 +123,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { logger.Errorf("consumer service %v register registry %v error, error message is %s", serviceUrl.String(), registryUrl.String(), err.Error()) } - go directory.Subscribe(serviceUrl) //new cluster invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))