Skip to content
Snippets Groups Projects
Unverified Commit c979fcaa authored by vito.he's avatar vito.he Committed by GitHub
Browse files

Merge pull request #466 from hxmhlt/extension_directory

Add:registry directory extension
parents ae5ec564 a4e9a020
No related branches found
No related tags found
No related merge requests found
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/registry"
)
type registryDirectory func(url *common.URL, registry registry.Registry) (cluster.Directory, error)
var defaultRegistry registryDirectory
// SetDefaultRegistryDirectory ...
func SetDefaultRegistryDirectory(v registryDirectory) {
defaultRegistry = v
}
// GetDefaultRegistryDirectory ...
func GetDefaultRegistryDirectory(config *common.URL, registry registry.Registry) (cluster.Directory, error) {
if defaultRegistry == nil {
panic("registry directory is not existing, make sure you have import the package.")
}
return defaultRegistry(config, registry)
}
......@@ -19,7 +19,6 @@ package directory
import (
"sync"
"time"
)
import (
......@@ -28,6 +27,7 @@ import (
)
import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
......@@ -42,15 +42,11 @@ import (
"github.com/apache/dubbo-go/remoting"
)
// Options ...
type Options struct {
serviceTTL time.Duration
func init() {
extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
}
// Option ...
type Option func(*Options)
type registryDirectory struct {
type RegistryDirectory struct {
directory.BaseDirectory
cacheInvokers []protocol.Invoker
listenerLock sync.Mutex
......@@ -61,48 +57,41 @@ type registryDirectory struct {
configurators []config_center.Configurator
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
Options
serviceKey string
forbidden atomic.Bool
serviceKey string
forbidden atomic.Bool
}
// NewRegistryDirectory ...
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
options := Options{
//default 300s
serviceTTL: time.Duration(300e9),
}
for _, opt := range opts {
opt(&options)
}
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
dir := &registryDirectory{
dir := &RegistryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{},
serviceType: url.SubURL.Service(),
registry: registry,
Options: options,
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
go dir.subscribe(url.SubURL)
return dir, nil
}
//subscribe from registry
func (dir *registryDirectory) Subscribe(url *common.URL) {
func (dir *RegistryDirectory) subscribe(url *common.URL) {
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
dir.registry.Subscribe(url, dir)
}
func (dir *registryDirectory) Notify(event *registry.ServiceEvent) {
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
go dir.update(event)
}
//subscribe service from registry, and update the cacheServices
func (dir *registryDirectory) update(res *registry.ServiceEvent) {
// update the cacheServices and subscribe service from registry
func (dir *RegistryDirectory) update(res *registry.ServiceEvent) {
if res == nil {
return
}
......@@ -111,7 +100,7 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
dir.refreshInvokers(res)
}
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var (
url *common.URL
oldInvoker protocol.Invoker = nil
......@@ -162,7 +151,7 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
}
func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
newInvokersList := []protocol.Invoker{}
groupInvokersMap := make(map[string][]protocol.Invoker)
groupInvokersList := []protocol.Invoker{}
......@@ -198,8 +187,8 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList
}
// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
// uncacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
......@@ -208,8 +197,8 @@ func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
return nil
}
// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
// cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL
......@@ -244,8 +233,8 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
return nil
}
//select the protocol invokers from the directory
func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
// List selected protocol invokers from the directory
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
invokers := dir.cacheInvokers
routerChain := dir.RouterChain()
......@@ -255,7 +244,7 @@ func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.In
return routerChain.Route(invokers, dir.cacheOriginUrl, invocation)
}
func (dir *registryDirectory) IsAvailable() bool {
func (dir *RegistryDirectory) IsAvailable() bool {
if !dir.BaseDirectory.IsAvailable() {
return dir.BaseDirectory.IsAvailable()
}
......@@ -269,7 +258,7 @@ func (dir *registryDirectory) IsAvailable() bool {
return false
}
func (dir *registryDirectory) Destroy() {
func (dir *RegistryDirectory) Destroy() {
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() {
invokers := dir.cacheInvokers
......@@ -280,7 +269,7 @@ func (dir *registryDirectory) Destroy() {
})
}
func (dir *registryDirectory) overrideUrl(targetUrl *common.URL) {
func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) {
doOverrideUrl(dir.configurators, targetUrl)
doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl)
doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl)
......@@ -294,11 +283,11 @@ func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common
type referenceConfigurationListener struct {
registry.BaseConfigurationListener
directory *registryDirectory
directory *RegistryDirectory
url *common.URL
}
func newReferenceConfigurationListener(dir *registryDirectory, url *common.URL) *referenceConfigurationListener {
func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener {
listener := &referenceConfigurationListener{directory: dir, url: url}
listener.InitWith(
url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX,
......@@ -316,10 +305,10 @@ func (l *referenceConfigurationListener) Process(event *config_center.ConfigChan
type consumerConfigurationListener struct {
registry.BaseConfigurationListener
listeners []registry.NotifyListener
directory *registryDirectory
directory *RegistryDirectory
}
func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigurationListener {
func newConsumerConfigurationListener(dir *RegistryDirectory) *consumerConfigurationListener {
listener := &consumerConfigurationListener{directory: dir}
listener.InitWith(
config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX,
......
......@@ -79,10 +79,9 @@ func TestSubscribe_Group(t *testing.T) {
suburl.SetParam(constant.CLUSTER_KEY, "mock")
regurl.SubURL = &suburl
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
registryDirectory, _ := NewRegistryDirectory(&regurl, mockRegistry)
go registryDirectory.Subscribe(common.NewURLWithOptions(common.WithPath("testservice")))
dir, _ := NewRegistryDirectory(&regurl, mockRegistry)
go dir.(*RegistryDirectory).subscribe(common.NewURLWithOptions(common.WithPath("testservice")))
//for group1
urlmap := url.Values{}
urlmap.Set(constant.GROUP_KEY, "group1")
......@@ -101,7 +100,7 @@ func TestSubscribe_Group(t *testing.T) {
}
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
assert.Len(t, dir.(*RegistryDirectory).cacheInvokers, 2)
}
func Test_Destroy(t *testing.T) {
......@@ -173,7 +172,7 @@ Loop1:
}
func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockRegistry) {
func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
url, _ := common.NewURL("mock://127.0.0.1:1111")
......@@ -185,9 +184,9 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR
)
url.SubURL = &suburl
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry)
dir, _ := NewRegistryDirectory(&url, mockRegistry)
go registryDirectory.Subscribe(&suburl)
go dir.(*RegistryDirectory).subscribe(&suburl)
if len(noMockEvent) == 0 {
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(
......@@ -201,5 +200,5 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR
)
}
}
return registryDirectory, mockRegistry.(*registry.MockRegistry)
return dir.(*RegistryDirectory), mockRegistry.(*registry.MockRegistry)
}
......@@ -39,7 +39,7 @@ import (
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/protocolwrapper"
"github.com/apache/dubbo-go/registry"
directory2 "github.com/apache/dubbo-go/registry/directory"
_ "github.com/apache/dubbo-go/registry/directory"
"github.com/apache/dubbo-go/remoting"
)
......@@ -112,7 +112,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
}
//new registry directory for store service url from registry
directory, err := directory2.NewRegistryDirectory(&registryUrl, reg)
directory, err := extension.GetDefaultRegistryDirectory(&registryUrl, reg)
if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error())
......@@ -124,7 +124,6 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
logger.Errorf("consumer service %v register registry %v error, error message is %s",
serviceUrl.String(), registryUrl.String(), err.Error())
}
go directory.Subscribe(serviceUrl)
//new cluster invoker
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment