Skip to content
Snippets Groups Projects
Commit ba216117 authored by xianlezheng's avatar xianlezheng Committed by GitHub
Browse files

Merge pull request #8 from flycash/2.7.5-bk

2.7.5 bk-will fixs bug :(
parents a6056d3e b752e410
No related branches found
No related tags found
No related merge requests found
Showing
with 163 additions and 127 deletions
...@@ -55,7 +55,7 @@ func SetAndInitGlobalDispatcher(name string) { ...@@ -55,7 +55,7 @@ func SetAndInitGlobalDispatcher(name string) {
if dp, ok := dispatchers[name]; !ok || dp == nil { if dp, ok := dispatchers[name]; !ok || dp == nil {
panic("EventDispatcher for " + name + " is not found, make sure you have import the package, " + panic("EventDispatcher for " + name + " is not found, make sure you have import the package, " +
"like github.com/apache/dubbo-go/common/observer/dispatcher ") "like import _ github.com/apache/dubbo-go/common/observer/dispatcher ")
} }
globalEventDispatcher = dispatchers[name]() globalEventDispatcher = dispatchers[name]()
} }
......
...@@ -42,7 +42,7 @@ func SetServiceDiscovery(protocol string, creator func(name string) (registry.Se ...@@ -42,7 +42,7 @@ func SetServiceDiscovery(protocol string, creator func(name string) (registry.Se
func GetServiceDiscovery(protocol string, name string) (registry.ServiceDiscovery, error) { func GetServiceDiscovery(protocol string, name string) (registry.ServiceDiscovery, error) {
creator, ok := discoveryCreatorMap[protocol] creator, ok := discoveryCreatorMap[protocol]
if !ok { if !ok {
return nil, perrors.New("Could not find the service discovery with name: " + name) return nil, perrors.New("Could not find the service discovery with discovery protocol: " + protocol)
} }
return creator(name) return creator(name)
} }
...@@ -345,6 +345,9 @@ func suiteMethod(method reflect.Method) *MethodType { ...@@ -345,6 +345,9 @@ func suiteMethod(method reflect.Method) *MethodType {
// this method is in RPCService // this method is in RPCService
// we force users must implement RPCService interface in their provider // we force users must implement RPCService interface in their provider
// and RPCService has only one method "Reference"
// In general, this method should not be exported to client
// so we ignore this method
// see RPCService // see RPCService
if mname == "Reference" { if mname == "Reference" {
return nil return nil
......
...@@ -84,7 +84,7 @@ type ServiceConfig struct { ...@@ -84,7 +84,7 @@ type ServiceConfig struct {
exporters []protocol.Exporter exporters []protocol.Exporter
} }
// Prefix return dubbo.service.${interface}. // Prefix returns dubbo.service.${interface}.
func (c *ServiceConfig) Prefix() string { func (c *ServiceConfig) Prefix() string {
return constant.ServiceConfigPrefix + c.InterfaceName + "." return constant.ServiceConfigPrefix + c.InterfaceName + "."
} }
...@@ -141,7 +141,7 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List { ...@@ -141,7 +141,7 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
return ports return ports
} }
// Export export the service // Export exports the service
func (c *ServiceConfig) Export() error { func (c *ServiceConfig) Export() error {
// TODO: config center start here // TODO: config center start here
......
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
// ConfigurationListener for changing listener's event // ConfigurationListener for changing listener's event
type ConfigurationListener interface { type ConfigurationListener interface {
// Process the notification event once there's any change happens on the config
Process(*ConfigChangeEvent) Process(*ConfigChangeEvent)
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package memory
import (
"sync"
)
import (
gxset "github.com/dubbogo/gost/container/set"
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/mapping"
)
func init() {
extension.SetGlobalServiceNameMapping(GetNameMappingInstance)
}
type InMemoryServiceNameMapping struct{}
func (i *InMemoryServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error {
return nil
}
func (i *InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) {
return gxset.NewSet(config.GetApplicationConfig().Name), nil
}
var serviceNameMappingInstance *InMemoryServiceNameMapping
var serviceNameMappingOnce sync.Once
func GetNameMappingInstance() mapping.ServiceNameMapping {
serviceNameMappingOnce.Do(func() {
serviceNameMappingInstance = &InMemoryServiceNameMapping{}
})
return serviceNameMappingInstance
}
...@@ -41,7 +41,7 @@ import ( ...@@ -41,7 +41,7 @@ import (
const DEFAULT_ROOT = "dubbo" const DEFAULT_ROOT = "dubbo"
func init() { func init() {
extension.SetMetadataReportFactory("etcd", func() factory.MetadataReportFactory { extension.SetMetadataReportFactory(constant.ETCDV3_KEY, func() factory.MetadataReportFactory {
return &etcdMetadataReportFactory{} return &etcdMetadataReportFactory{}
}) })
} }
......
...@@ -20,7 +20,6 @@ package inmemory ...@@ -20,7 +20,6 @@ package inmemory
import ( import (
"context" "context"
"reflect" "reflect"
"time"
) )
import ( import (
...@@ -59,10 +58,7 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st ...@@ -59,10 +58,7 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st
invocation.WithAttachments(map[string]string{constant.ASYNC_KEY: "false"}), invocation.WithAttachments(map[string]string{constant.ASYNC_KEY: "false"}),
invocation.WithParameterValues([]reflect.Value{siV, gV, vV, pV})) invocation.WithParameterValues([]reflect.Value{siV, gV, vV, pV}))
start := time.Now()
res := m.invkr.Invoke(context.Background(), inv) res := m.invkr.Invoke(context.Background(), inv)
end := time.Now()
logger.Infof("duration: %s, result: %v", (end.Sub(start)).String(), res.Result())
if res.Error() != nil { if res.Error() != nil {
logger.Errorf("could not get the metadata service from remote provider: %v", res.Error()) logger.Errorf("could not get the metadata service from remote provider: %v", res.Error())
return []interface{}{}, nil return []interface{}{}, nil
......
...@@ -155,6 +155,11 @@ func (r *RPCInvocation) Invoker() protocol.Invoker { ...@@ -155,6 +155,11 @@ func (r *RPCInvocation) Invoker() protocol.Invoker {
return r.invoker return r.invoker
} }
// nolint
func (r *RPCInvocation) SetInvoker(invoker protocol.Invoker) {
r.invoker = invoker
}
// CallBack sets RPC callback method. // CallBack sets RPC callback method.
func (r *RPCInvocation) CallBack() interface{} { func (r *RPCInvocation) CallBack() interface{} {
return r.callBack return r.callBack
......
...@@ -29,19 +29,19 @@ import ( ...@@ -29,19 +29,19 @@ import (
"github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting"
) )
// BaseConfigurationListener ... // nolint
type BaseConfigurationListener struct { type BaseConfigurationListener struct {
configurators []config_center.Configurator configurators []config_center.Configurator
dynamicConfiguration config_center.DynamicConfiguration dynamicConfiguration config_center.DynamicConfiguration
defaultConfiguratorFunc func(url *common.URL) config_center.Configurator defaultConfiguratorFunc func(url *common.URL) config_center.Configurator
} }
// Configurators ... // Configurators gets Configurator from config center
func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator { func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator {
return bcl.configurators return bcl.configurators
} }
// InitWith ... // InitWith will init BaseConfigurationListener by @key+@Listener+@f
func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) { func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) {
bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration() bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration()
if bcl.dynamicConfiguration == nil { if bcl.dynamicConfiguration == nil {
...@@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente ...@@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente
} }
} }
// Process ... // Process the notification event once there's any change happens on the config.
func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) { func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value) logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value)
if event.ConfigType == remoting.EventTypeDel { if event.ConfigType == remoting.EventTypeDel {
...@@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin ...@@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin
return nil return nil
} }
// OverrideUrl ... // OverrideUrl gets existing configuration rule and overrides provider url before exporting.
func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) { func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) {
for _, v := range bcl.configurators { for _, v := range bcl.configurators {
v.Configure(url) v.Configure(url)
} }
} }
// ToConfigurators ... // ToConfigurators converts @urls by @f to config_center.Configurators
func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator { func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator {
if len(urls) == 0 { if len(urls) == 0 {
return nil return nil
......
...@@ -187,6 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) { ...@@ -187,6 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) {
} }
} }
// Next returns the service event from consul.
func (l *consulListener) Next() (*registry.ServiceEvent, error) { func (l *consulListener) Next() (*registry.ServiceEvent, error) {
select { select {
case event := <-l.eventCh: case event := <-l.eventCh:
...@@ -196,6 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) { ...@@ -196,6 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) {
} }
} }
// Close closes this listener
func (l *consulListener) Close() { func (l *consulListener) Close() {
close(l.done) close(l.done)
l.plan.Stop() l.plan.Stop()
......
...@@ -36,8 +36,7 @@ import ( ...@@ -36,8 +36,7 @@ import (
) )
const ( const (
// RegistryConnDelay ... registryConnDelay = 3
RegistryConnDelay = 3
) )
func init() { func init() {
...@@ -148,7 +147,7 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti ...@@ -148,7 +147,7 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti
return return
} }
logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second) time.Sleep(time.Duration(registryConnDelay) * time.Second)
continue continue
} }
...@@ -171,10 +170,12 @@ func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error) ...@@ -171,10 +170,12 @@ func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error)
return listener, err return listener, err
} }
// GetUrl get registry URL of consul registry center
func (r *consulRegistry) GetUrl() common.URL { func (r *consulRegistry) GetUrl() common.URL {
return *r.URL return *r.URL
} }
// IsAvailable checks consul registry center whether is available
func (r *consulRegistry) IsAvailable() bool { func (r *consulRegistry) IsAvailable() bool {
select { select {
case <-r.done: case <-r.done:
...@@ -184,6 +185,7 @@ func (r *consulRegistry) IsAvailable() bool { ...@@ -184,6 +185,7 @@ func (r *consulRegistry) IsAvailable() bool {
} }
} }
// Destroy consul registry center
func (r *consulRegistry) Destroy() { func (r *consulRegistry) Destroy() {
close(r.done) close(r.done)
} }
...@@ -46,6 +46,8 @@ func init() { ...@@ -46,6 +46,8 @@ func init() {
extension.SetDefaultRegistryDirectory(NewRegistryDirectory) extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
} }
// RegistryDirectory implementation of Directory:
// Invoker list returned from this Directory's list method have been filtered by Routers
type RegistryDirectory struct { type RegistryDirectory struct {
directory.BaseDirectory directory.BaseDirectory
cacheInvokers []protocol.Invoker cacheInvokers []protocol.Invoker
......
...@@ -38,15 +38,17 @@ type dataListener struct { ...@@ -38,15 +38,17 @@ type dataListener struct {
listener config_center.ConfigurationListener listener config_center.ConfigurationListener
} }
// NewRegistryDataListener // NewRegistryDataListener creates a data listener for etcd
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener} return &dataListener{listener: listener}
} }
// AddInterestedURL adds a registration @url to listen
func (l *dataListener) AddInterestedURL(url *common.URL) { func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url) l.interestedURL = append(l.interestedURL, url)
} }
// DataChange processes the data change event from registry center of etcd
func (l *dataListener) DataChange(eventType remoting.Event) bool { func (l *dataListener) DataChange(eventType remoting.Event) bool {
index := strings.Index(eventType.Path, "/providers/") index := strings.Index(eventType.Path, "/providers/")
...@@ -88,10 +90,12 @@ func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { ...@@ -88,10 +90,12 @@ func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
} }
// Process data change event from config center of etcd
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType l.events <- configType
} }
// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) { func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for { for {
select { select {
...@@ -114,6 +118,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { ...@@ -114,6 +118,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
} }
} }
// Close etcd registry center
func (l *configurationListener) Close() { func (l *configurationListener) Close() {
l.registry.WaitGroup().Done() l.registry.WaitGroup().Done()
} }
...@@ -104,31 +104,37 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { ...@@ -104,31 +104,37 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
return r, nil return r, nil
} }
// InitListeners init listeners of etcd registry center
func (r *etcdV3Registry) InitListeners() { func (r *etcdV3Registry) InitListeners() {
r.listener = etcdv3.NewEventListener(r.client) r.listener = etcdv3.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r) r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener) r.dataListener = NewRegistryDataListener(r.configListener)
} }
// DoRegister actually do the register job in the registry center of etcd
func (r *etcdV3Registry) DoRegister(root string, node string) error { func (r *etcdV3Registry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "") return r.client.Create(path.Join(root, node), "")
} }
// nolint
func (r *etcdV3Registry) DoUnregister(root string, node string) error { func (r *etcdV3Registry) DoUnregister(root string, node string) error {
return perrors.New("DoUnregister is not support in etcdV3Registry") return perrors.New("DoUnregister is not support in etcdV3Registry")
} }
// CloseAndNilClient closes listeners and clear client
func (r *etcdV3Registry) CloseAndNilClient() { func (r *etcdV3Registry) CloseAndNilClient() {
r.client.Close() r.client.Close()
r.client = nil r.client = nil
} }
// CloseListener closes listeners
func (r *etcdV3Registry) CloseListener() { func (r *etcdV3Registry) CloseListener() {
if r.configListener != nil { if r.configListener != nil {
r.configListener.Close() r.configListener.Close()
} }
} }
// CreatePath create the path in the registry center of etcd
func (r *etcdV3Registry) CreatePath(k string) error { func (r *etcdV3Registry) CreatePath(k string) error {
var tmpPath string var tmpPath string
for _, str := range strings.Split(k, "/")[1:] { for _, str := range strings.Split(k, "/")[1:] {
...@@ -141,6 +147,7 @@ func (r *etcdV3Registry) CreatePath(k string) error { ...@@ -141,6 +147,7 @@ func (r *etcdV3Registry) CreatePath(k string) error {
return nil return nil
} }
// DoSubscribe actually subscribe the provider URL
func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) { func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var ( var (
......
...@@ -19,6 +19,18 @@ package etcdv3 ...@@ -19,6 +19,18 @@ package etcdv3
import ( import (
"fmt" "fmt"
"sync"
"time"
)
import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
"github.com/hashicorp/vault/helper/jsonutil"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/logger"
...@@ -26,12 +38,6 @@ import ( ...@@ -26,12 +38,6 @@ import (
"github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/etcdv3" "github.com/apache/dubbo-go/remoting/etcdv3"
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
"github.com/hashicorp/vault/helper/jsonutil"
perrors "github.com/pkg/errors"
"sync"
"time"
) )
const ( const (
...@@ -83,8 +89,13 @@ func (e *etcdV3ServiceDiscovery) Register(instance registry.ServiceInstance) err ...@@ -83,8 +89,13 @@ func (e *etcdV3ServiceDiscovery) Register(instance registry.ServiceInstance) err
if nil != e.client { if nil != e.client {
ins, err := jsonutil.EncodeJSON(instance) ins, err := jsonutil.EncodeJSON(instance)
if err == nil { if err == nil {
e.client.Create(path, string(ins)) err = e.client.Update(path, string(ins))
e.services.Add(instance.GetServiceName()) if err != nil {
logger.Errorf("cannot register the instance: %s", string(ins), err)
} else {
e.services.Add(instance.GetServiceName())
}
} }
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package etcdv3 package etcdv3
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import ( import (
"github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/common/observer/dispatcher"
"github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/registry"
"github.com/stretchr/testify/assert"
"testing"
) )
var testName = "test" var testName = "test"
...@@ -20,8 +40,8 @@ func setUp() { ...@@ -20,8 +40,8 @@ func setUp() {
} }
config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{
Address: "localhost:2380", Address: "localhost:2379",
TimeoutStr: "1000s", TimeoutStr: "10s",
} }
} }
...@@ -44,7 +64,7 @@ func Test_newEtcdV3ServiceDiscovery(t *testing.T) { ...@@ -44,7 +64,7 @@ func Test_newEtcdV3ServiceDiscovery(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{
Address: "localhost:2380", Address: "localhost:2379",
TimeoutStr: "10s", TimeoutStr: "10s",
} }
...@@ -53,96 +73,8 @@ func Test_newEtcdV3ServiceDiscovery(t *testing.T) { ...@@ -53,96 +73,8 @@ func Test_newEtcdV3ServiceDiscovery(t *testing.T) {
assert.NotNil(t, res) assert.NotNil(t, res)
} }
func TestEtcdV3ServiceDiscovery_Destroy(t *testing.T) {
setUp()
serviceDiscovery, err := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName)
assert.Nil(t, err)
assert.NotNil(t, serviceDiscovery)
err = serviceDiscovery.Destroy()
assert.Nil(t, err)
assert.NotNil(t, serviceDiscovery.(*etcdV3ServiceDiscovery).client)
}
func TestEtcdV3ServiceDiscovery_CRUD(t *testing.T) {
setUp()
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return &dispatcher.MockEventDispatcher{}
})
extension.SetAndInitGlobalDispatcher("mock")
serviceName := "service-name"
id := "id"
host := "host"
port := 123
instance := &registry.DefaultServiceInstance{
Id: id,
ServiceName: serviceName,
Host: host,
Port: port,
Enable: true,
Healthy: true,
Metadata: nil,
}
// clean data
serviceDiscovry, _ := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName)
// clean data for local test
serviceDiscovry.Unregister(&registry.DefaultServiceInstance{
Id: id,
ServiceName: serviceName,
Host: host,
Port: port,
})
err := serviceDiscovry.Register(instance)
assert.Nil(t, err)
page := serviceDiscovry.GetHealthyInstancesByPage(serviceName, 0, 10, true)
assert.NotNil(t, page)
assert.Equal(t, 0, page.GetOffset())
assert.Equal(t, 10, page.GetPageSize())
assert.Equal(t, 1, page.GetDataSize())
instance = page.GetData()[0].(*registry.DefaultServiceInstance)
assert.NotNil(t, instance)
assert.Equal(t, id, instance.GetId())
assert.Equal(t, host, instance.GetHost())
assert.Equal(t, port, instance.GetPort())
assert.Equal(t, serviceName, instance.GetServiceName())
assert.Equal(t, 0, len(instance.GetMetadata()))
instance.Metadata["a"] = "b"
err = serviceDiscovry.Update(instance)
assert.Nil(t, err)
pageMap := serviceDiscovry.GetRequestInstances([]string{serviceName}, 0, 1)
assert.Equal(t, 1, len(pageMap))
page = pageMap[serviceName]
assert.NotNil(t, page)
assert.Equal(t, 1, len(page.GetData()))
instance = page.GetData()[0].(*registry.DefaultServiceInstance)
v, _ := instance.Metadata["a"]
assert.Equal(t, "b", v)
// test dispatcher event
err = serviceDiscovry.DispatchEventByServiceName(serviceName)
assert.Nil(t, err)
// test AddListener
err = serviceDiscovry.AddListener(&registry.ServiceInstancesChangedListener{ServiceName: serviceName})
assert.Nil(t, err)
}
func TestEtcdV3ServiceDiscovery_GetDefaultPageSize(t *testing.T) { func TestEtcdV3ServiceDiscovery_GetDefaultPageSize(t *testing.T) {
setUp() setUp()
serviceDiscovry, _ := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName) serviceDiscovry := &etcdV3ServiceDiscovery{}
assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize()) assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize())
} }
...@@ -37,7 +37,7 @@ func init() { ...@@ -37,7 +37,7 @@ func init() {
// service event // service event
// //////////////////////////////////////// // ////////////////////////////////////////
// ServiceEvent ... // ServiceEvent includes create, update, delete event
type ServiceEvent struct { type ServiceEvent struct {
Action remoting.EventType Action remoting.EventType
Service common.URL Service common.URL
......
...@@ -38,12 +38,12 @@ type dataListener struct { ...@@ -38,12 +38,12 @@ type dataListener struct {
listener config_center.ConfigurationListener listener config_center.ConfigurationListener
} }
// NewRegistryDataListener // NewRegistryDataListener creates a data listener for kubernetes
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener} return &dataListener{listener: listener}
} }
// AddInterestedURL // AddInterestedURL adds the @url of registry center to the listener
func (l *dataListener) AddInterestedURL(url *common.URL) { func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url) l.interestedURL = append(l.interestedURL, url)
} }
...@@ -91,10 +91,12 @@ func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener { ...@@ -91,10 +91,12 @@ func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
} }
// Process processes the data change event from config center of kubernetes
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType l.events <- configType
} }
// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) { func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for { for {
select { select {
...@@ -116,6 +118,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { ...@@ -116,6 +118,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
} }
} }
} }
// Close kubernetes registry center
func (l *configurationListener) Close() { func (l *configurationListener) Close() {
l.registry.WaitGroup().Done() l.registry.WaitGroup().Done()
} }
...@@ -68,23 +68,28 @@ type kubernetesRegistry struct { ...@@ -68,23 +68,28 @@ type kubernetesRegistry struct {
configListener *configurationListener configListener *configurationListener
} }
// Client gets the etcdv3 kubernetes
func (r *kubernetesRegistry) Client() *kubernetes.Client { func (r *kubernetesRegistry) Client() *kubernetes.Client {
r.cltLock.RLock() r.cltLock.RLock()
client := r.client client := r.client
r.cltLock.RUnlock() r.cltLock.RUnlock()
return client return client
} }
// SetClient sets the kubernetes client
func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) { func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
r.cltLock.Lock() r.cltLock.Lock()
r.client = client r.client = client
r.cltLock.Unlock() r.cltLock.Unlock()
} }
// CloseAndNilClient closes listeners and clear client
func (r *kubernetesRegistry) CloseAndNilClient() { func (r *kubernetesRegistry) CloseAndNilClient() {
r.client.Close() r.client.Close()
r.client = nil r.client = nil
} }
// CloseListener closes listeners
func (r *kubernetesRegistry) CloseListener() { func (r *kubernetesRegistry) CloseListener() {
r.cltLock.Lock() r.cltLock.Lock()
...@@ -96,6 +101,7 @@ func (r *kubernetesRegistry) CloseListener() { ...@@ -96,6 +101,7 @@ func (r *kubernetesRegistry) CloseListener() {
r.configListener = nil r.configListener = nil
} }
// CreatePath create the path in the registry center of kubernetes
func (r *kubernetesRegistry) CreatePath(k string) error { func (r *kubernetesRegistry) CreatePath(k string) error {
if err := r.client.Create(k, ""); err != nil { if err := r.client.Create(k, ""); err != nil {
return perrors.WithMessagef(err, "create path %s in kubernetes", k) return perrors.WithMessagef(err, "create path %s in kubernetes", k)
...@@ -103,6 +109,7 @@ func (r *kubernetesRegistry) CreatePath(k string) error { ...@@ -103,6 +109,7 @@ func (r *kubernetesRegistry) CreatePath(k string) error {
return nil return nil
} }
// DoRegister actually do the register job in the registry center of kubernetes
func (r *kubernetesRegistry) DoRegister(root string, node string) error { func (r *kubernetesRegistry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "") return r.client.Create(path.Join(root, node), "")
} }
...@@ -111,6 +118,7 @@ func (r *kubernetesRegistry) DoUnregister(root string, node string) error { ...@@ -111,6 +118,7 @@ func (r *kubernetesRegistry) DoUnregister(root string, node string) error {
return perrors.New("DoUnregister is not support in kubernetesRegistry") return perrors.New("DoUnregister is not support in kubernetesRegistry")
} }
// DoSubscribe actually subscribe the provider URL
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) { func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var ( var (
...@@ -143,10 +151,12 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er ...@@ -143,10 +151,12 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er
return configListener, nil return configListener, nil
} }
// nolint
func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry") return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry")
} }
// InitListeners init listeners of kubernetes registry center
func (r *kubernetesRegistry) InitListeners() { func (r *kubernetesRegistry) InitListeners() {
r.listener = kubernetes.NewEventListener(r.client) r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r) r.configListener = NewConfigurationListener(r)
...@@ -191,6 +201,7 @@ func newMockKubernetesRegistry( ...@@ -191,6 +201,7 @@ func newMockKubernetesRegistry(
return r, nil return r, nil
} }
// HandleClientRestart will reconnect to kubernetes registry center
func (r *kubernetesRegistry) HandleClientRestart() { func (r *kubernetesRegistry) HandleClientRestart() {
var ( var (
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment