diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index 387e4a87cba8fc35e13c4a59f0ab2b886797cb29..e1ea20be09e89e337d95d9951e75027c4dd16cdb 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -1,17 +1,21 @@ package directory +import ( + "github.com/tevino/abool" +) import ( "github.com/dubbo/dubbo-go/config" ) type BaseDirectory struct { url *config.URL - destroyed bool + destroyed *abool.AtomicBool } func NewBaseDirectory(url *config.URL) BaseDirectory { return BaseDirectory{ - url: url, + url: url, + destroyed: abool.NewBool(false), } } func (dir *BaseDirectory) GetUrl() config.URL { @@ -19,5 +23,10 @@ func (dir *BaseDirectory) GetUrl() config.URL { } func (dir *BaseDirectory) Destroy() { - dir.destroyed = false + if dir.destroyed.SetToIf(false, true) { + } +} + +func (dir *BaseDirectory) IsAvailable() bool { + return !dir.destroyed.IsSet() } diff --git a/cluster/support/failover_cluster_invoker.go b/cluster/support/failover_cluster_invoker.go index 22c537acb67ebf70973c36dffc057015afe9e47c..d52648c35bd26497251a2e0c13a363c176a74f07 100644 --- a/cluster/support/failover_cluster_invoker.go +++ b/cluster/support/failover_cluster_invoker.go @@ -80,6 +80,6 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr ip, _ := gxnet.GetLocalIP() return &protocol.RPCResult{Err: jerrors.Errorf("Failed to invoke the method %v in the service %v . Tried %v times of "+ "the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.", - methodName, invocation, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, version.Version, result.Error(), + methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, version.Version, result.Error().Error(), )} } diff --git a/cluster/support/mock_cluster.go b/cluster/support/mock_cluster.go new file mode 100644 index 0000000000000000000000000000000000000000..5e4ad2d9f79c02b68c98862985baaad4ce01041f --- /dev/null +++ b/cluster/support/mock_cluster.go @@ -0,0 +1,18 @@ +package cluster + +import ( + "github.com/dubbo/dubbo-go/cluster" + "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/protocol" +) + +type MockCluster struct { +} + +func NewMockCluster() cluster.Cluster { + return &MockCluster{} +} + +func (cluster *MockCluster) Join(directory cluster.Directory) protocol.Invoker { + return protocol.NewBaseInvoker(config.URL{}) +} diff --git a/common/constant/default.go b/common/constant/default.go index bfcf304805deecd666b92b03f0de231448fceba4..5871e3cec8ec8dd1c17cb79f002fd9c386d15ecb 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -11,6 +11,7 @@ const ( DEFAULT_PROTOCOL = "dubbo" DEFAULT_VERSION = "" DEFAULT_REG_TIMEOUT = "10s" + DEFAULT_CLUSTER = "failover" ) const ( diff --git a/examples/jsonrpc/go-client/app/client.go b/examples/jsonrpc/go-client/app/client.go index f120a54455a8389328ca3884a0d9715c2e21ef4e..e559c3640aac93dc24338740b506f339a47583b6 100644 --- a/examples/jsonrpc/go-client/app/client.go +++ b/examples/jsonrpc/go-client/app/client.go @@ -47,7 +47,7 @@ func main() { initProfiling() - time.Sleep(3e9) + time.Sleep(5e9) gxlog.CInfo("\n\n\nstart to test jsonrpc") user := &JsonRPCUser{} diff --git a/protocol/protocolwrapper/mock_protocol_filter.go b/protocol/protocolwrapper/mock_protocol_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..3f9f1140111aa9ca8afd3929571dcf5407145aee --- /dev/null +++ b/protocol/protocolwrapper/mock_protocol_filter.go @@ -0,0 +1,25 @@ +package protocolwrapper + +import ( + "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/protocol" +) + +type mockProtocolFilter struct { +} + +func NewMockProtocolFilter() protocol.Protocol { + return &mockProtocolFilter{} +} + +func (pfw *mockProtocolFilter) Export(invoker protocol.Invoker) protocol.Exporter { + return protocol.NewBaseExporter("key", invoker, nil) +} + +func (pfw *mockProtocolFilter) Refer(url config.URL) protocol.Invoker { + return protocol.NewBaseInvoker(url) +} + +func (pfw *mockProtocolFilter) Destroy() { + +} diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 9124d94a0bddc18a769af8865ae9d156ac08408b..a56cfa4cb9a2475853e536684a16533b99b58b89 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -27,12 +27,6 @@ type Options struct { } type Option func(*Options) -func WithServiceTTL(ttl time.Duration) Option { - return func(o *Options) { - o.serviceTTL = ttl - } -} - type RegistryDirectory struct { directory.BaseDirectory cacheInvokers []protocol.Invoker @@ -44,7 +38,7 @@ type RegistryDirectory struct { Options } -func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...Option) *RegistryDirectory { +func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...Option) (*RegistryDirectory, error) { options := Options{ //default 300s serviceTTL: time.Duration(300e9), @@ -52,7 +46,9 @@ func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...O for _, opt := range opts { opt(&options) } - + if url.SubURL == nil { + return nil, jerrors.Errorf("url is invalid, suburl can not be nil") + } return &RegistryDirectory{ BaseDirectory: directory.NewBaseDirectory(url), cacheInvokers: []protocol.Invoker{}, @@ -60,20 +56,20 @@ func NewRegistryDirectory(url *config.URL, registry registry.Registry, opts ...O serviceType: url.SubURL.Service(), registry: registry, Options: options, - } + }, nil } //subscibe from registry func (dir *RegistryDirectory) Subscribe(url config.URL) { for { - if dir.registry.IsClosed() { + if !dir.registry.IsAvailable() { log.Warn("event listener game over.") return } listener, err := dir.registry.Subscribe(url) if err != nil { - if dir.registry.IsClosed() { + if !dir.registry.IsAvailable() { log.Warn("event listener game over.") return } @@ -159,7 +155,7 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protoc } else { for _, invokers := range groupInvokersMap { staticDir := directory.NewStaticDirectory(invokers) - cluster := extension.GetCluster(dir.GetUrl().SubURL.Params.Get(constant.CLUSTER_KEY)) + cluster := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) groupInvokersList = append(groupInvokersList, cluster.Join(staticDir)) } } @@ -184,7 +180,9 @@ func (dir *RegistryDirectory) cacheInvoker(url config.URL) *sync.Map { if _, ok := newCacheInvokers.Load(url.Key()); !ok { log.Debug("service will be added in cache invokers: invokers key is %s!", url.Key()) newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url) - newCacheInvokers.Store(url.Key(), newInvoker) + if newInvoker != nil { + newCacheInvokers.Store(url.Key(), newInvoker) + } } } return newCacheInvokers @@ -197,10 +195,12 @@ func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.In } func (dir *RegistryDirectory) IsAvailable() bool { - return true + return dir.BaseDirectory.IsAvailable() } func (dir *RegistryDirectory) Destroy() { + //dir.registry.Destroy() should move it in protocol + //TODO:unregister & unsubscribe dir.BaseDirectory.Destroy() } diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8c4a07d4f8f69942836c622c9b759ad31b49aea2 --- /dev/null +++ b/registry/directory/directory_test.go @@ -0,0 +1,99 @@ +package directory + +import ( + "context" + "net/url" + "strconv" + "testing" + "time" +) +import ( + "github.com/stretchr/testify/assert" +) +import ( + "github.com/dubbo/dubbo-go/cluster/support" + "github.com/dubbo/dubbo-go/common/constant" + "github.com/dubbo/dubbo-go/common/extension" + "github.com/dubbo/dubbo-go/config" + "github.com/dubbo/dubbo-go/protocol/protocolwrapper" + "github.com/dubbo/dubbo-go/registry" +) + +func TestSubscribe(t *testing.T) { + extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) + + url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") + url.SubURL = &suburl + mockRegistry := registry.NewMockRegistry() + registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) + + go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice")) + for i := 0; i < 3; i++ { + mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))}) + } + + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 3) +} + +func TestSubscribe_Delete(t *testing.T) { + extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) + + url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") + url.SubURL = &suburl + mockRegistry := registry.NewMockRegistry() + registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) + + go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice")) + for i := 0; i < 3; i++ { + mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))}) + } + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 3) + mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceDel, Service: *config.NewURLWithOptions("TEST0", config.WithProtocol("dubbo"))}) + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 2) + +} +func TestSubscribe_InvalidUrl(t *testing.T) { + url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") + mockRegistry := registry.NewMockRegistry() + _, err := NewRegistryDirectory(&url, mockRegistry) + assert.Error(t, err) +} + +func TestSubscribe_Group(t *testing.T) { + extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) + extension.SetCluster("mock", cluster.NewMockCluster) + + regurl, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") + suburl.Params.Set(constant.CLUSTER_KEY, "mock") + regurl.SubURL = &suburl + mockRegistry := registry.NewMockRegistry() + registryDirectory, _ := NewRegistryDirectory(®url, mockRegistry) + + go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice")) + + //for group1 + urlmap := url.Values{} + urlmap.Set(constant.GROUP_KEY, "group1") + urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url + for i := 0; i < 3; i++ { + mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"), + config.WithParams(urlmap))}) + } + //for group2 + urlmap2 := url.Values{} + urlmap2.Set(constant.GROUP_KEY, "group2") + urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url + for i := 0; i < 3; i++ { + mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"), + config.WithParams(urlmap2))}) + } + + time.Sleep(1e9) + assert.Len(t, registryDirectory.cacheInvokers, 2) +} diff --git a/registry/mock_registry.go b/registry/mock_registry.go index 65f17e0ffb877ce53eec7bffebe089f17754c589..a7ed812488dc21f83f139ef4f3005b38766c663e 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -1,27 +1,57 @@ package registry -import "github.com/dubbo/dubbo-go/config" +import ( + "github.com/dubbo/dubbo-go/config" +) type MockRegistry struct { + listener *listener + isClosed bool } +func NewMockRegistry() *MockRegistry { + registry := &MockRegistry{ + isClosed: false, + } + listener := &listener{count: 0, registry: registry, listenChan: make(chan *ServiceEvent)} + registry.listener = listener + return registry +} func (*MockRegistry) Register(url config.URL) error { return nil } -func (*MockRegistry) Close() { +func (r *MockRegistry) Destroy() { + r.isClosed = true +} +func (r *MockRegistry) IsAvailable() bool { + return r.isClosed +} +func (r *MockRegistry) GetUrl() config.URL { + return config.URL{} +} +func (r *MockRegistry) Subscribe(config.URL) (Listener, error) { + return r.listener, nil } -func (*MockRegistry) IsClosed() bool { - return false + +type listener struct { + count int64 + registry *MockRegistry + listenChan chan *ServiceEvent } -//func (*MockRegistry) Subscribe(config.URL) (Listener, error) { -// -//} -// -//type listener struct{} -// -//func Next() (*ServiceEvent, error) { -// -//} +func (l *listener) Next() (*ServiceEvent, error) { + select { + case e := <-l.listenChan: + return e, nil + } +} + +func (*listener) Close() { + +} + +func (r *MockRegistry) MockEvent(event *ServiceEvent) { + r.listener.listenChan <- event +} diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 4a877591b23c26f6b8d84fa33dbf618c007341d2..9e35dd243985513f86b29f397dafd4fd9082d8e2 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -64,9 +64,12 @@ func (proto *RegistryProtocol) Refer(url config.URL) protocol.Invoker { } //new registry directory for store service url from registry - directory := directory2.NewRegistryDirectory(®istryUrl, reg) - - err := reg.Register(*serviceUrl) + directory, err := directory2.NewRegistryDirectory(®istryUrl, reg) + if err != nil { + log.Error("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error()) + return nil + } + err = reg.Register(*serviceUrl) if err != nil { log.Error("consumer service %v register registry %v error, error message is %s", serviceUrl.String(), registryUrl.String(), err.Error()) } @@ -113,6 +116,7 @@ func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporte } func (*RegistryProtocol) Destroy() { + } func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) config.URL { diff --git a/registry/registry.go b/registry/registry.go index 2674287dfcccf6e33040eca7ca52bb9d721a77b5..00e065694a3a313dba67939e71f43ff80715379c 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -1,12 +1,13 @@ package registry import ( + "github.com/dubbo/dubbo-go/common" "github.com/dubbo/dubbo-go/config" ) // Extension - Registry type Registry interface { - + common.Node //used for service provider calling , register services to registry //And it is also used for service consumer calling , register services cared about ,for dubbo's admin monitoring. Register(url config.URL) error @@ -17,9 +18,9 @@ type Registry interface { //input the serviceConfig , registry should return serviceUrlArray with multi location(provider nodes) available //GetService(SubURL) ([]SubURL, error) //close the registry for Elegant closing - Close() + //Close() //return if the registry is closed for consumer subscribing - IsClosed() bool + //IsClosed() bool } type Listener interface { diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 3b05b7446bac463944fe1560719142b536202bd7..9cfa92e54b5e21b2f28dba85fa328474dae5c34e 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -102,8 +102,11 @@ func NewZkRegistry(url *config.URL) (registry.Registry, error) { return r, nil } +func (r *ZkRegistry) GetUrl() config.URL { + return *r.URL +} -func (r *ZkRegistry) Close() { +func (r *ZkRegistry) Destroy() { close(r.done) r.wg.Wait() r.closeRegisters() @@ -135,7 +138,7 @@ func (r *ZkRegistry) validateZookeeperClient() error { if r.client.conn == nil { var event <-chan zk.Event r.client.conn, event, err = zk.Connect(r.client.zkAddrs, r.client.timeout) - if err != nil { + if err == nil { r.client.wait.Add(1) go r.client.handleZkEvent(event) } @@ -417,11 +420,11 @@ func (r *ZkRegistry) closeRegisters() { r.services = nil } -func (r *ZkRegistry) IsClosed() bool { +func (r *ZkRegistry) IsAvailable() bool { select { case <-r.done: - return true - default: return false + default: + return true } }