Skip to content
Snippets Groups Projects
Commit 74e42986 authored by vito.he's avatar vito.he
Browse files

Mod: upper RegistryDirectory for inherit

parent 5348da4e
No related branches found
No related tags found
No related merge requests found
......@@ -43,10 +43,10 @@ import (
)
func init() {
extension.SetDefaultRegistryDirectory(newRegistryDirectory)
extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
}
type registryDirectory struct {
type RegistryDirectory struct {
directory.BaseDirectory
cacheInvokers []protocol.Invoker
listenerLock sync.Mutex
......@@ -61,12 +61,12 @@ type registryDirectory struct {
forbidden atomic.Bool
}
// newRegistryDirectory ...
func newRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
// NewRegistryDirectory ...
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
dir := &registryDirectory{
dir := &RegistryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{},
......@@ -80,18 +80,18 @@ func newRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.
}
//subscribe from registry
func (dir *registryDirectory) subscribe(url *common.URL) {
func (dir *RegistryDirectory) subscribe(url *common.URL) {
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
dir.registry.Subscribe(url, dir)
}
func (dir *registryDirectory) Notify(event *registry.ServiceEvent) {
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
go dir.update(event)
}
// update: subscribe service from registry, and update the cacheServices
func (dir *registryDirectory) update(res *registry.ServiceEvent) {
func (dir *RegistryDirectory) update(res *registry.ServiceEvent) {
if res == nil {
return
}
......@@ -100,7 +100,7 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) {
dir.refreshInvokers(res)
}
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var (
url *common.URL
oldInvoker protocol.Invoker = nil
......@@ -151,7 +151,7 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
}
func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
newInvokersList := []protocol.Invoker{}
groupInvokersMap := make(map[string][]protocol.Invoker)
groupInvokersList := []protocol.Invoker{}
......@@ -188,7 +188,7 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
}
// uncacheInvoker: return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
dir.cacheInvokersMap.Delete(url.Key())
......@@ -198,7 +198,7 @@ func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
}
// cacheInvoker: return abandoned Invoker,if no Invoker to be abandoned,return nil
func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL
......@@ -234,7 +234,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
}
// list :select the protocol invokers from the directory
func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
invokers := dir.cacheInvokers
routerChain := dir.RouterChain()
......@@ -244,7 +244,7 @@ func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.In
return routerChain.Route(invokers, dir.cacheOriginUrl, invocation)
}
func (dir *registryDirectory) IsAvailable() bool {
func (dir *RegistryDirectory) IsAvailable() bool {
if !dir.BaseDirectory.IsAvailable() {
return dir.BaseDirectory.IsAvailable()
}
......@@ -258,7 +258,7 @@ func (dir *registryDirectory) IsAvailable() bool {
return false
}
func (dir *registryDirectory) Destroy() {
func (dir *RegistryDirectory) Destroy() {
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() {
invokers := dir.cacheInvokers
......@@ -269,7 +269,7 @@ func (dir *registryDirectory) Destroy() {
})
}
func (dir *registryDirectory) overrideUrl(targetUrl *common.URL) {
func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) {
doOverrideUrl(dir.configurators, targetUrl)
doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl)
doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl)
......@@ -283,11 +283,11 @@ func doOverrideUrl(configurators []config_center.Configurator, targetUrl *common
type referenceConfigurationListener struct {
registry.BaseConfigurationListener
directory *registryDirectory
directory *RegistryDirectory
url *common.URL
}
func newReferenceConfigurationListener(dir *registryDirectory, url *common.URL) *referenceConfigurationListener {
func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener {
listener := &referenceConfigurationListener{directory: dir, url: url}
listener.InitWith(
url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX,
......@@ -305,10 +305,10 @@ func (l *referenceConfigurationListener) Process(event *config_center.ConfigChan
type consumerConfigurationListener struct {
registry.BaseConfigurationListener
listeners []registry.NotifyListener
directory *registryDirectory
directory *RegistryDirectory
}
func newConsumerConfigurationListener(dir *registryDirectory) *consumerConfigurationListener {
func newConsumerConfigurationListener(dir *RegistryDirectory) *consumerConfigurationListener {
listener := &consumerConfigurationListener{directory: dir}
listener.InitWith(
config.GetConsumerConfig().ApplicationConfig.Name+constant.CONFIGURATORS_SUFFIX,
......
......@@ -66,7 +66,7 @@ func TestSubscribe(t *testing.T) {
func TestSubscribe_InvalidUrl(t *testing.T) {
url, _ := common.NewURL("mock://127.0.0.1:1111")
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
_, err := newRegistryDirectory(&url, mockRegistry)
_, err := NewRegistryDirectory(&url, mockRegistry)
assert.Error(t, err)
}
......@@ -79,9 +79,9 @@ func TestSubscribe_Group(t *testing.T) {
suburl.SetParam(constant.CLUSTER_KEY, "mock")
regurl.SubURL = &suburl
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
dir, _ := newRegistryDirectory(&regurl, mockRegistry)
dir, _ := NewRegistryDirectory(&regurl, mockRegistry)
go dir.(*registryDirectory).subscribe(common.NewURLWithOptions(common.WithPath("testservice")))
go dir.(*RegistryDirectory).subscribe(common.NewURLWithOptions(common.WithPath("testservice")))
//for group1
urlmap := url.Values{}
urlmap.Set(constant.GROUP_KEY, "group1")
......@@ -100,7 +100,7 @@ func TestSubscribe_Group(t *testing.T) {
}
time.Sleep(1e9)
assert.Len(t, dir.(*registryDirectory).cacheInvokers, 2)
assert.Len(t, dir.(*RegistryDirectory).cacheInvokers, 2)
}
func Test_Destroy(t *testing.T) {
......@@ -172,7 +172,7 @@ Loop1:
}
func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockRegistry) {
func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
url, _ := common.NewURL("mock://127.0.0.1:1111")
......@@ -184,9 +184,9 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR
)
url.SubURL = &suburl
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
dir, _ := newRegistryDirectory(&url, mockRegistry)
dir, _ := NewRegistryDirectory(&url, mockRegistry)
go dir.(*registryDirectory).subscribe(&suburl)
go dir.(*RegistryDirectory).subscribe(&suburl)
if len(noMockEvent) == 0 {
for i := 0; i < 3; i++ {
mockRegistry.(*registry.MockRegistry).MockEvent(
......@@ -200,5 +200,5 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR
)
}
}
return dir.(*registryDirectory), mockRegistry.(*registry.MockRegistry)
return dir.(*RegistryDirectory), mockRegistry.(*registry.MockRegistry)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment