Skip to content
Snippets Groups Projects
Commit cdf86151 authored by vito.he's avatar vito.he
Browse files

Add:2.6.0 registry directory ut & destroy func & bug fix in zk.registry

parent 56a03009
No related branches found
No related tags found
No related merge requests found
package directory
import (
"github.com/tevino/abool"
)
import (
"github.com/dubbo/dubbo-go/config"
)
type BaseDirectory struct {
url *config.URL
destroyed bool
destroyed *abool.AtomicBool
}
func NewBaseDirectory(url *config.URL) BaseDirectory {
return BaseDirectory{
url: url,
url: url,
destroyed: abool.NewBool(false),
}
}
func (dir *BaseDirectory) GetUrl() config.URL {
......@@ -19,5 +23,10 @@ func (dir *BaseDirectory) GetUrl() config.URL {
}
func (dir *BaseDirectory) Destroy() {
dir.destroyed = false
if dir.destroyed.SetToIf(false, true) {
}
}
func (dir *BaseDirectory) IsAvailable() bool {
return !dir.destroyed.IsSet()
}
......@@ -80,6 +80,6 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
ip, _ := gxnet.GetLocalIP()
return &protocol.RPCResult{Err: jerrors.Errorf("Failed to invoke the method %v in the service %v . Tried %v times of "+
"the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invocation, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, version.Version, result.Error(),
methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, version.Version, result.Error().Error(),
)}
}
package cluster
import (
"github.com/dubbo/dubbo-go/cluster"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
)
type MockCluster struct {
}
func NewMockCluster() cluster.Cluster {
return &MockCluster{}
}
func (cluster *MockCluster) Join(directory cluster.Directory) protocol.Invoker {
return protocol.NewBaseInvoker(config.URL{})
}
......@@ -11,6 +11,7 @@ const (
DEFAULT_PROTOCOL = "dubbo"
DEFAULT_VERSION = ""
DEFAULT_REG_TIMEOUT = "10s"
DEFAULT_CLUSTER = "failover"
)
const (
......
......@@ -47,7 +47,7 @@ func main() {
initProfiling()
time.Sleep(3e9)
time.Sleep(5e9)
gxlog.CInfo("\n\n\nstart to test jsonrpc")
user := &JsonRPCUser{}
......
package protocolwrapper
import (
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol"
)
type mockProtocolFilter struct {
}
func NewMockProtocolFilter() protocol.Protocol {
return &mockProtocolFilter{}
}
func (pfw *mockProtocolFilter) Export(invoker protocol.Invoker) protocol.Exporter {
return protocol.NewBaseExporter("key", invoker, nil)
}
func (pfw *mockProtocolFilter) Refer(url config.URL) protocol.Invoker {
return protocol.NewBaseInvoker(url)
}
func (pfw *mockProtocolFilter) Destroy() {
}
......@@ -27,12 +27,6 @@ type Options struct {
}
type Option func(*Options)
func WithServiceTTL(ttl time.Duration) Option {
return func(o *Options) {
o.serviceTTL = ttl
}
}
type RegistryDirectory struct {
directory.BaseDirectory
cacheInvokers []protocol.Invoker
......@@ -44,7 +38,7 @@ type RegistryDirectory struct {
Options
}
func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...Option) *RegistryDirectory {
func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...Option) (*RegistryDirectory, error) {
options := Options{
//default 300s
serviceTTL: time.Duration(300e9),
......@@ -52,7 +46,9 @@ func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...O
for _, opt := range opts {
opt(&options)
}
if url.SubURL == nil {
return nil, jerrors.Errorf("url is invalid, suburl can not be nil")
}
return &RegistryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
......@@ -60,20 +56,20 @@ func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...O
serviceType: url.SubURL.Service(),
registry: registry,
Options: options,
}
}, nil
}
//subscibe from registry
func (dir *RegistryDirectory) Subscribe(url config.URL) {
for {
if dir.registry.IsClosed() {
if !dir.registry.IsAvailable() {
log.Warn("event listener game over.")
return
}
listener, err := dir.registry.Subscribe(url)
if err != nil {
if dir.registry.IsClosed() {
if !dir.registry.IsAvailable() {
log.Warn("event listener game over.")
return
}
......@@ -159,7 +155,7 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protoc
} else {
for _, invokers := range groupInvokersMap {
staticDir := directory.NewStaticDirectory(invokers)
cluster := extension.GetCluster(dir.GetUrl().SubURL.Params.Get(constant.CLUSTER_KEY))
cluster := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
groupInvokersList = append(groupInvokersList, cluster.Join(staticDir))
}
}
......@@ -184,7 +180,9 @@ func (dir *RegistryDirectory) cacheInvoker(url config.URL) *sync.Map {
if _, ok := newCacheInvokers.Load(url.Key()); !ok {
log.Debug("service will be added in cache invokers: invokers key is %s!", url.Key())
newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url)
newCacheInvokers.Store(url.Key(), newInvoker)
if newInvoker != nil {
newCacheInvokers.Store(url.Key(), newInvoker)
}
}
}
return newCacheInvokers
......@@ -197,10 +195,12 @@ func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.In
}
func (dir *RegistryDirectory) IsAvailable() bool {
return true
return dir.BaseDirectory.IsAvailable()
}
func (dir *RegistryDirectory) Destroy() {
//dir.registry.Destroy() should move it in protocol
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy()
}
......
package directory
import (
"context"
"net/url"
"strconv"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/dubbo/dubbo-go/cluster/support"
"github.com/dubbo/dubbo-go/common/constant"
"github.com/dubbo/dubbo-go/common/extension"
"github.com/dubbo/dubbo-go/config"
"github.com/dubbo/dubbo-go/protocol/protocolwrapper"
"github.com/dubbo/dubbo-go/registry"
)
func TestSubscribe(t *testing.T) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000")
url.SubURL = &suburl
mockRegistry := registry.NewMockRegistry()
registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry)
go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice"))
for i := 0; i < 3; i++ {
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))})
}
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
}
func TestSubscribe_Delete(t *testing.T) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000")
url.SubURL = &suburl
mockRegistry := registry.NewMockRegistry()
registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry)
go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice"))
for i := 0; i < 3; i++ {
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))})
}
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceDel, Service: *config.NewURLWithOptions("TEST0", config.WithProtocol("dubbo"))})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
}
func TestSubscribe_InvalidUrl(t *testing.T) {
url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
mockRegistry := registry.NewMockRegistry()
_, err := NewRegistryDirectory(&url, mockRegistry)
assert.Error(t, err)
}
func TestSubscribe_Group(t *testing.T) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
extension.SetCluster("mock", cluster.NewMockCluster)
regurl, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000")
suburl.Params.Set(constant.CLUSTER_KEY, "mock")
regurl.SubURL = &suburl
mockRegistry := registry.NewMockRegistry()
registryDirectory, _ := NewRegistryDirectory(&regurl, mockRegistry)
go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice"))
//for group1
urlmap := url.Values{}
urlmap.Set(constant.GROUP_KEY, "group1")
urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"),
config.WithParams(urlmap))})
}
//for group2
urlmap2 := url.Values{}
urlmap2.Set(constant.GROUP_KEY, "group2")
urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"),
config.WithParams(urlmap2))})
}
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
}
package registry
import "github.com/dubbo/dubbo-go/config"
import (
"github.com/dubbo/dubbo-go/config"
)
type MockRegistry struct {
listener *listener
isClosed bool
}
func NewMockRegistry() *MockRegistry {
registry := &MockRegistry{
isClosed: false,
}
listener := &listener{count: 0, registry: registry, listenChan: make(chan *ServiceEvent)}
registry.listener = listener
return registry
}
func (*MockRegistry) Register(url config.URL) error {
return nil
}
func (*MockRegistry) Close() {
func (r *MockRegistry) Destroy() {
r.isClosed = true
}
func (r *MockRegistry) IsAvailable() bool {
return r.isClosed
}
func (r *MockRegistry) GetUrl() config.URL {
return config.URL{}
}
func (r *MockRegistry) Subscribe(config.URL) (Listener, error) {
return r.listener, nil
}
func (*MockRegistry) IsClosed() bool {
return false
type listener struct {
count int64
registry *MockRegistry
listenChan chan *ServiceEvent
}
//func (*MockRegistry) Subscribe(config.URL) (Listener, error) {
//
//}
//
//type listener struct{}
//
//func Next() (*ServiceEvent, error) {
//
//}
func (l *listener) Next() (*ServiceEvent, error) {
select {
case e := <-l.listenChan:
return e, nil
}
}
func (*listener) Close() {
}
func (r *MockRegistry) MockEvent(event *ServiceEvent) {
r.listener.listenChan <- event
}
......@@ -64,9 +64,12 @@ func (proto *RegistryProtocol) Refer(url config.URL) protocol.Invoker {
}
//new registry directory for store service url from registry
directory := directory2.NewRegistryDirectory(&registryUrl, reg)
err := reg.Register(*serviceUrl)
directory, err := directory2.NewRegistryDirectory(&registryUrl, reg)
if err != nil {
log.Error("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error())
return nil
}
err = reg.Register(*serviceUrl)
if err != nil {
log.Error("consumer service %v register registry %v error, error message is %s", serviceUrl.String(), registryUrl.String(), err.Error())
}
......@@ -113,6 +116,7 @@ func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
}
func (*RegistryProtocol) Destroy() {
}
func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) config.URL {
......
package registry
import (
"github.com/dubbo/dubbo-go/common"
"github.com/dubbo/dubbo-go/config"
)
// Extension - Registry
type Registry interface {
common.Node
//used for service provider calling , register services to registry
//And it is also used for service consumer calling , register services cared about ,for dubbo's admin monitoring.
Register(url config.URL) error
......@@ -17,9 +18,9 @@ type Registry interface {
//input the serviceConfig , registry should return serviceUrlArray with multi location(provider nodes) available
//GetService(SubURL) ([]SubURL, error)
//close the registry for Elegant closing
Close()
//Close()
//return if the registry is closed for consumer subscribing
IsClosed() bool
//IsClosed() bool
}
type Listener interface {
......
......@@ -102,8 +102,11 @@ func NewZkRegistry(url *config.URL) (registry.Registry, error) {
return r, nil
}
func (r *ZkRegistry) GetUrl() config.URL {
return *r.URL
}
func (r *ZkRegistry) Close() {
func (r *ZkRegistry) Destroy() {
close(r.done)
r.wg.Wait()
r.closeRegisters()
......@@ -135,7 +138,7 @@ func (r *ZkRegistry) validateZookeeperClient() error {
if r.client.conn == nil {
var event <-chan zk.Event
r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout)
if err != nil {
if err == nil {
r.client.wait.Add(1)
go r.client.handleZkEvent(event)
}
......@@ -417,11 +420,11 @@ func (r *ZkRegistry) closeRegisters() {
r.services = nil
}
func (r *ZkRegistry) IsClosed() bool {
func (r *ZkRegistry) IsAvailable() bool {
select {
case <-r.done:
return true
default:
return false
default:
return true
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment