diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index 7b38a63f2c62e37ce1f2242707b3f1f00709b74f..a0a69304096a02314e91d6bc17bc0448af2a9996 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -2,6 +2,7 @@ package directory import ( "github.com/tevino/abool" + "sync" ) import ( "github.com/dubbo/go-for-apache-dubbo/config" @@ -10,6 +11,7 @@ import ( type BaseDirectory struct { url *config.URL destroyed *abool.AtomicBool + mutex sync.Mutex } func NewBaseDirectory(url *config.URL) BaseDirectory { @@ -22,8 +24,11 @@ func (dir *BaseDirectory) GetUrl() config.URL { return *dir.url } -func (dir *BaseDirectory) Destroy() { +func (dir *BaseDirectory) Destroy(doDestroy func()) { if dir.destroyed.SetToIf(false, true) { + dir.mutex.Lock() + doDestroy() + dir.mutex.Unlock() } } diff --git a/cluster/directory/static_directory.go b/cluster/directory/static_directory.go index 80ff645368ca5a2d6fb27c50d0ce59301e51cff4..d3117c97d4208410bf0a7c3e2e4fc828bbe903b3 100644 --- a/cluster/directory/static_directory.go +++ b/cluster/directory/static_directory.go @@ -1,6 +1,7 @@ package directory import ( + "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -11,7 +12,7 @@ type StaticDirectory struct { func NewStaticDirectory(invokers []protocol.Invoker) *StaticDirectory { return &StaticDirectory{ - BaseDirectory: NewBaseDirectory(nil), + BaseDirectory: NewBaseDirectory(&config.URL{}), invokers: invokers, } } @@ -30,3 +31,12 @@ func (dir *StaticDirectory) List(invocation protocol.Invocation) []protocol.Invo //TODO:Here should add router return dir.invokers } + +func (dir *StaticDirectory) Destroy() { + dir.BaseDirectory.Destroy(func() { + for _, ivk := range dir.invokers { + ivk.Destroy() + } + dir.invokers = []protocol.Invoker{} + }) +} diff --git a/cluster/support/mock_cluster.go b/cluster/support/mock_cluster.go index 58d7465527fdb2fbe0f6af8373feee90541b1b2f..e8a3eea7b259ffadbf4211e07a478753216bb967 100644 --- a/cluster/support/mock_cluster.go +++ b/cluster/support/mock_cluster.go @@ -2,7 +2,6 @@ package cluster import ( "github.com/dubbo/go-for-apache-dubbo/cluster" - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" ) @@ -14,5 +13,5 @@ func NewMockCluster() cluster.Cluster { } func (cluster *MockCluster) Join(directory cluster.Directory) protocol.Invoker { - return protocol.NewBaseInvoker(config.URL{}) + return protocol.NewBaseInvoker(directory.GetUrl()) } diff --git a/config/url.go b/config/url.go index a43337f51b0a559d76195bffe37539ebc356e4b3..54cc24056b5dbf6f20b7442ad5d6ad67545c8e1f 100644 --- a/config/url.go +++ b/config/url.go @@ -87,7 +87,11 @@ func WithParams(params url.Values) option { url.Params = params } } - +func WithParamsValue(key, val string) option { + return func(url *URL) { + url.Params.Set(key, val) + } +} func WithProtocol(proto string) option { return func(url *URL) { url.Protocol = proto diff --git a/go.mod b/go.mod index 06a84aa4e3cbc10da7bd3dcd45d7d54a3ed2a192..28a6e51bb114da30da2ec0f4c107f8146f412048 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ require ( github.com/AlexStocks/getty v0.0.0-20190331201845-1ca64ac5a589 github.com/AlexStocks/goext v0.3.2 github.com/AlexStocks/log4go v1.0.2 + github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect + github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect github.com/dubbogo/hessian2 v0.0.0-20190410112310-f093e4436e31 github.com/juju/errors v0.0.0-20190207033735-e65537c515d7 github.com/montanaflynn/stats v0.5.0 @@ -11,5 +13,6 @@ require ( github.com/stretchr/testify v1.3.0 github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 go.uber.org/atomic v1.3.2 + gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index b3853bbd54abd82aa2ae7f953fb75f3490f627ab..9df67210d3b95ed8227b5202a0567462c588b908 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,10 @@ github.com/AlexStocks/goext v0.3.2/go.mod h1:3M5j9Pjge4CdkNg2WIjRLUeoPedJHHKwkkg github.com/AlexStocks/log4go v1.0.2 h1:1K5WM8KjSUECaoXUl8FSF05KGeCJDfBrhKUBsxwUvhk= github.com/AlexStocks/log4go v1.0.2/go.mod h1:6kCCRo/orDo8mh5CEDOeuSSM674wBQ8M6E0K8dVOIz4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U= @@ -156,6 +160,8 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/protocol/protocolwrapper/mock_protocol_filter.go b/protocol/protocolwrapper/mock_protocol_filter.go index e2c0326f3814b7825f5aeb8abd953926cfecfc71..2a59ac420024c5833d6822495755e13f9aa44976 100644 --- a/protocol/protocolwrapper/mock_protocol_filter.go +++ b/protocol/protocolwrapper/mock_protocol_filter.go @@ -3,6 +3,7 @@ package protocolwrapper import ( "github.com/dubbo/go-for-apache-dubbo/config" "github.com/dubbo/go-for-apache-dubbo/protocol" + "sync" ) type mockProtocolFilter struct { @@ -13,7 +14,7 @@ func NewMockProtocolFilter() protocol.Protocol { } func (pfw *mockProtocolFilter) Export(invoker protocol.Invoker) protocol.Exporter { - return protocol.NewBaseExporter("key", invoker, nil) + return protocol.NewBaseExporter("key", invoker, &sync.Map{}) } func (pfw *mockProtocolFilter) Refer(url config.URL) protocol.Invoker { diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 0abec42657f1b39fdc0576e153b9cd47c46c2cfc..ca959b82a42c1086d40f5123c1fdadbb2f9894e7 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -199,9 +199,13 @@ func (dir *RegistryDirectory) IsAvailable() bool { } func (dir *RegistryDirectory) Destroy() { - //dir.registry.Destroy() should move it in protocol //TODO:unregister & unsubscribe - dir.BaseDirectory.Destroy() + dir.BaseDirectory.Destroy(func() { + for _, ivk := range dir.cacheInvokers { + ivk.Destroy() + } + dir.cacheInvokers = []protocol.Invoker{} + }) } // configuration > reference config >service config diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 8baf6905afafef4417e4792fd5bc842721747112..5d7c2b253d2520751f468e256140e54b77577812 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -25,12 +25,12 @@ func TestSubscribe(t *testing.T) { url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") url.SubURL = &suburl - mockRegistry := registry.NewMockRegistry() + mockRegistry, _ := registry.NewMockRegistry(&config.URL{}) registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice")) for i := 0; i < 3; i++ { - mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))}) + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))}) } time.Sleep(1e9) @@ -43,23 +43,23 @@ func TestSubscribe_Delete(t *testing.T) { url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") url.SubURL = &suburl - mockRegistry := registry.NewMockRegistry() + mockRegistry, _ := registry.NewMockRegistry(&config.URL{}) registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice")) for i := 0; i < 3; i++ { - mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))}) + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))}) } time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 3) - mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceDel, Service: *config.NewURLWithOptions("TEST0", config.WithProtocol("dubbo"))}) + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceDel, Service: *config.NewURLWithOptions("TEST0", config.WithProtocol("dubbo"))}) time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 2) } func TestSubscribe_InvalidUrl(t *testing.T) { url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") - mockRegistry := registry.NewMockRegistry() + mockRegistry, _ := registry.NewMockRegistry(&config.URL{}) _, err := NewRegistryDirectory(&url, mockRegistry) assert.Error(t, err) } @@ -72,7 +72,7 @@ func TestSubscribe_Group(t *testing.T) { suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000") suburl.Params.Set(constant.CLUSTER_KEY, "mock") regurl.SubURL = &suburl - mockRegistry := registry.NewMockRegistry() + mockRegistry, _ := registry.NewMockRegistry(&config.URL{}) registryDirectory, _ := NewRegistryDirectory(®url, mockRegistry) go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice")) @@ -82,7 +82,7 @@ func TestSubscribe_Group(t *testing.T) { urlmap.Set(constant.GROUP_KEY, "group1") urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url for i := 0; i < 3; i++ { - mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"), + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"), config.WithParams(urlmap))}) } //for group2 @@ -90,7 +90,7 @@ func TestSubscribe_Group(t *testing.T) { urlmap2.Set(constant.GROUP_KEY, "group2") urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url for i := 0; i < 3; i++ { - mockRegistry.MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"), + mockRegistry.(*registry.MockRegistry).MockEvent(®istry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"), config.WithParams(urlmap2))}) } diff --git a/registry/mock_registry.go b/registry/mock_registry.go index 9036d9a93f6118999ffa98439f65a5dc5dc7843f..e2fca36bbf0e4ecca192a6eaf0bf26e5747e28a6 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -1,22 +1,24 @@ package registry import ( - "github.com/dubbo/go-for-apache-dubbo/config" "github.com/tevino/abool" ) +import ( + "github.com/dubbo/go-for-apache-dubbo/config" +) type MockRegistry struct { listener *listener destroyed *abool.AtomicBool } -func NewMockRegistry() *MockRegistry { +func NewMockRegistry(url *config.URL) (Registry, error) { registry := &MockRegistry{ destroyed: abool.NewBool(false), } listener := &listener{count: 0, registry: registry, listenChan: make(chan *ServiceEvent)} registry.listener = listener - return registry + return registry, nil } func (*MockRegistry) Register(url config.URL) error { return nil diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index c982edcb81d35a474e742d2c21962a30be46d702..8a61cec5bba7bc00d796df5a8c985cdabdb6ee1d 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -21,6 +21,7 @@ import ( var registryProtocol *RegistryProtocol type RegistryProtocol struct { + invokers []protocol.Invoker // Registry Map<RegistryAddress, Registry> registries sync.Map //To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed. @@ -76,9 +77,11 @@ func (proto *RegistryProtocol) Refer(url config.URL) protocol.Invoker { go directory.Subscribe(*serviceUrl) //new cluster invoker - cluster := extension.GetCluster(serviceUrl.Params.Get(constant.CLUSTER_KEY)) + cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) - return cluster.Join(directory) + invoker := cluster.Join(directory) + proto.invokers = append(proto.invokers, invoker) + return invoker } func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { @@ -116,8 +119,27 @@ func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporte } -func (*RegistryProtocol) Destroy() { - +func (proto *RegistryProtocol) Destroy() { + for _, ivk := range proto.invokers { + ivk.Destroy() + } + proto.invokers = []protocol.Invoker{} + + proto.bounds.Range(func(key, value interface{}) bool { + exporter := value.(protocol.Exporter) + exporter.Unexport() + proto.bounds.Delete(key) + return true + }) + + proto.registries.Range(func(key, value interface{}) bool { + reg := value.(registry.Registry) + if reg.IsAvailable() { + reg.Destroy() + } + proto.registries.Delete(key) + return true + }) } func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) config.URL { diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go new file mode 100644 index 0000000000000000000000000000000000000000..834aaf5544c146a26e389f86d2e2d01ac0c7c6b2 --- /dev/null +++ b/registry/protocol/protocol_test.go @@ -0,0 +1,167 @@ +package protocol + +import ( + "context" + "testing" +) +import ( + "github.com/stretchr/testify/assert" +) +import ( + cluster "github.com/dubbo/go-for-apache-dubbo/cluster/support" + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/common/extension" + "github.com/dubbo/go-for-apache-dubbo/config" + "github.com/dubbo/go-for-apache-dubbo/protocol" + "github.com/dubbo/go-for-apache-dubbo/protocol/protocolwrapper" + "github.com/dubbo/go-for-apache-dubbo/registry" +) + +func referNormal(t *testing.T, regProtocol *RegistryProtocol) { + extension.SetProtocol("registry", GetProtocol) + extension.SetRegistry("mock", registry.NewMockRegistry) + extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) + extension.SetCluster("mock", cluster.NewMockCluster) + + url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + + url.SubURL = &suburl + + invoker := regProtocol.Refer(url) + assert.IsType(t, &protocol.BaseInvoker{}, invoker) + assert.Equal(t, invoker.GetUrl().String(), url.String()) +} +func TestRefer(t *testing.T) { + regProtocol := NewRegistryProtocol() + referNormal(t, regProtocol) +} + +func TestMultiRegRefer(t *testing.T) { + regProtocol := NewRegistryProtocol() + referNormal(t, regProtocol) + url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:2222") + suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + + url2.SubURL = &suburl2 + + regProtocol.Refer(url2) + var count int + regProtocol.registries.Range(func(key, value interface{}) bool { + count++ + return true + }) + assert.Equal(t, count, 2) +} + +func TestOneRegRefer(t *testing.T) { + regProtocol := NewRegistryProtocol() + referNormal(t, regProtocol) + + url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + + url2.SubURL = &suburl2 + + regProtocol.Refer(url2) + var count int + regProtocol.registries.Range(func(key, value interface{}) bool { + count++ + return true + }) + assert.Equal(t, count, 1) +} +func exporterNormal(t *testing.T, regProtocol *RegistryProtocol) { + extension.SetProtocol("registry", GetProtocol) + extension.SetRegistry("mock", registry.NewMockRegistry) + extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter) + url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + + url.SubURL = &suburl + invoker := protocol.NewBaseInvoker(url) + exporter := regProtocol.Export(invoker) + + assert.IsType(t, &protocol.BaseExporter{}, exporter) + assert.Equal(t, exporter.GetInvoker().GetUrl().String(), suburl.String()) +} + +func TestExporter(t *testing.T) { + regProtocol := NewRegistryProtocol() + exporterNormal(t, regProtocol) +} + +func TestMultiRegAndMultiProtoExporter(t *testing.T) { + regProtocol := NewRegistryProtocol() + exporterNormal(t, regProtocol) + + url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:2222") + suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + + url2.SubURL = &suburl2 + invoker2 := protocol.NewBaseInvoker(url2) + regProtocol.Export(invoker2) + + var count int + regProtocol.registries.Range(func(key, value interface{}) bool { + count++ + return true + }) + assert.Equal(t, count, 2) + + var count2 int + regProtocol.bounds.Range(func(key, value interface{}) bool { + count2++ + return true + }) + assert.Equal(t, count2, 2) +} + +func TestOneRegAndProtoExporter(t *testing.T) { + regProtocol := NewRegistryProtocol() + exporterNormal(t, regProtocol) + + url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111") + suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock")) + + url2.SubURL = &suburl2 + invoker2 := protocol.NewBaseInvoker(url2) + regProtocol.Export(invoker2) + + var count int + regProtocol.registries.Range(func(key, value interface{}) bool { + count++ + return true + }) + assert.Equal(t, count, 1) + + var count2 int + regProtocol.bounds.Range(func(key, value interface{}) bool { + count2++ + return true + }) + assert.Equal(t, count2, 1) +} + +func TestDestry(t *testing.T) { + regProtocol := NewRegistryProtocol() + referNormal(t, regProtocol) + exporterNormal(t, regProtocol) + + regProtocol.Destroy() + assert.Equal(t, len(regProtocol.invokers), 0) + + var count int + regProtocol.registries.Range(func(key, value interface{}) bool { + count++ + return true + }) + assert.Equal(t, count, 0) + + var count2 int + regProtocol.bounds.Range(func(key, value interface{}) bool { + count2++ + return true + }) + assert.Equal(t, count2, 0) +} diff --git a/registry/zookeeper/consumer.go b/registry/zookeeper/consumer.go deleted file mode 100644 index 88c2c6cd31d00e2622072fc47e583be09c471a4f..0000000000000000000000000000000000000000 --- a/registry/zookeeper/consumer.go +++ /dev/null @@ -1,124 +0,0 @@ -package zookeeper - -import ( - jerrors "github.com/juju/errors" -) - -import ( - "github.com/dubbo/go-for-apache-dubbo/config" - "github.com/dubbo/go-for-apache-dubbo/registry" -) - -// name: service@protocol -//func (r *ZkRegistry) GetService(conf registry.ReferenceConfig) ([]config.SubURL, error) { -// -// var ( -// err error -// dubboPath string -// nodes []string -// listener *zkEventListener -// serviceURL config.SubURL -// serviceConf registry.ReferenceConfig -// ok bool -// ) -// r.listenerLock.Lock() -// listener = r.listener -// r.listenerLock.Unlock() -// -// if listener != nil { -// listener.listenServiceEvent(conf) -// } -// -// r.cltLock.Lock() -// serviceConf, ok = r.services[conf.Key()] -// r.cltLock.Unlock() -// if !ok { -// return nil, jerrors.Errorf("Path{%s} has not been registered", conf.Key()) -// } -// if !ok { -// return nil, jerrors.Errorf("Path{%s}: failed to get serviceConfigIf type", conf.Key()) -// } -// -// dubboPath = fmt.Sprintf("/dubbo/%s/providers", conf.Path()) -// err = r.validateZookeeperClient() -// if err != nil { -// return nil, jerrors.Trace(err) -// } -// r.cltLock.Lock() -// nodes, err = r.client.getChildren(dubboPath) -// r.cltLock.Unlock() -// if err != nil { -// log.Warn("getChildren(dubboPath{%s}) = error{%v}", dubboPath, err) -// return nil, jerrors.Trace(err) -// } -// -// var listenerServiceMap = make(map[string]config.SubURL) -// for _, n := range nodes { -// -// serviceURL, err = plugins.DefaultServiceURL()(n) -// if err != nil { -// log.Error("NewURL({%s}) = error{%v}", n, err) -// continue -// } -// if !serviceConf.ServiceEqual(serviceURL) { -// log.Warn("serviceURL{%s} is not compatible with ReferenceConfig{%#v}", serviceURL, serviceConf) -// continue -// } -// -// _, ok := listenerServiceMap[serviceURL.Params().Get(serviceURL.Location())] -// if !ok { -// listenerServiceMap[serviceURL.Location()] = serviceURL -// continue -// } -// } -// -// var services []config.SubURL -// for _, service := range listenerServiceMap { -// services = append(services, service) -// } -// -// return services, nil -//} - -func (r *ZkRegistry) Subscribe(conf config.URL) (registry.Listener, error) { - r.wg.Add(1) - return r.getListener(conf) -} - -func (r *ZkRegistry) getListener(conf config.URL) (*zkEventListener, error) { - var ( - zkListener *zkEventListener - ) - - r.listenerLock.Lock() - zkListener = r.listener - r.listenerLock.Unlock() - if zkListener != nil { - return zkListener, nil - } - - r.cltLock.Lock() - client := r.client - r.cltLock.Unlock() - if client == nil { - return nil, jerrors.New("zk connection broken") - } - - // new client & listener - zkListener = newZkEventListener(r, client) - - r.listenerLock.Lock() - r.listener = zkListener - r.listenerLock.Unlock() - - // listen - r.cltLock.Lock() - for _, svs := range r.services { - if svs.URLEqual(conf) { - go zkListener.listenServiceEvent(svs) - } - } - r.cltLock.Unlock() - - return zkListener, nil -} diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 697574b9e3c83a8413b9b313883a7e5977fdd2da..4a0780a3f7c7512268c46775cc6eba27a8132eca 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -102,11 +102,46 @@ func NewZkRegistry(url *config.URL) (registry.Registry, error) { return r, nil } + +func NewMockZkRegistry(url *config.URL) (*zk.TestCluster, *ZkRegistry, error) { + var ( + err error + r *ZkRegistry + c *zk.TestCluster + //event <-chan zk.Event + ) + + r = &ZkRegistry{ + URL: url, + birth: time.Now().UnixNano(), + done: make(chan struct{}), + services: make(map[string]config.URL), + zkPath: make(map[string]int), + } + + c, r.client, _, err = newMockZookeeperClient("test", 15*time.Second) + if err != nil { + return nil, nil, err + } + + r.wg.Add(1) + go r.handleZkRestart() + + //if r.RoleType == registry.CONSUMER { + // r.wg.Add(1) + // go r.listen() + //} + + return c, r, nil +} func (r *ZkRegistry) GetUrl() config.URL { return *r.URL } func (r *ZkRegistry) Destroy() { + if r.listener != nil { + r.listener.Close() + } close(r.done) r.wg.Wait() r.closeRegisters() @@ -411,6 +446,49 @@ func (r *ZkRegistry) registerTempZookeeperNode(root string, node string) error { return nil } +func (r *ZkRegistry) Subscribe(conf config.URL) (registry.Listener, error) { + r.wg.Add(1) + return r.getListener(conf) +} + +func (r *ZkRegistry) getListener(conf config.URL) (*zkEventListener, error) { + var ( + zkListener *zkEventListener + ) + + r.listenerLock.Lock() + zkListener = r.listener + r.listenerLock.Unlock() + if zkListener != nil { + return zkListener, nil + } + + r.cltLock.Lock() + client := r.client + r.cltLock.Unlock() + if client == nil { + return nil, jerrors.New("zk connection broken") + } + + // new client & listener + zkListener = newZkEventListener(r, client) + + r.listenerLock.Lock() + r.listener = zkListener + r.listenerLock.Unlock() + + // listen + r.cltLock.Lock() + for _, svs := range r.services { + if svs.URLEqual(conf) { + go zkListener.listenServiceEvent(svs) + } + } + r.cltLock.Unlock() + + return zkListener, nil +} + func (r *ZkRegistry) closeRegisters() { r.cltLock.Lock() defer r.cltLock.Unlock() diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go new file mode 100644 index 0000000000000000000000000000000000000000..186e564e8148441ef18439b84ec8d4742726c380 --- /dev/null +++ b/registry/zookeeper/registry_test.go @@ -0,0 +1,94 @@ +package zookeeper + +import ( + "context" + "strconv" + "testing" + "time" +) +import ( + "github.com/stretchr/testify/assert" +) +import ( + "github.com/dubbo/go-for-apache-dubbo/common/constant" + "github.com/dubbo/go-for-apache-dubbo/config" +) + +func Test_Register(t *testing.T) { + regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.PROVIDER))) + url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"})) + + ts, reg, err := NewMockZkRegistry(®url) + defer ts.Stop() + err = reg.Register(url) + children, _ := reg.client.getChildren("/dubbo/com.ikurento.user.UserProvider/providers") + assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children) + assert.NoError(t, err) +} + +func Test_Subscribe(t *testing.T) { + regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.PROVIDER))) + url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"})) + ts, reg, err := NewMockZkRegistry(®url) + defer ts.Stop() + + //provider register + err = reg.Register(url) + assert.NoError(t, err) + + if err != nil { + return + } + + //consumer register + regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(config.CONSUMER)) + _, reg2, err := NewMockZkRegistry(®url) + reg2.client = reg.client + err = reg2.Register(url) + listener, err := reg2.Subscribe(url) + + serviceEvent, err := listener.Next() + assert.NoError(t, err) + if err != nil { + return + } + assert.Regexp(t, ".*ServiceEvent{Action{add service}.*", serviceEvent.String()) + +} + +func Test_ConsumerDestory(t *testing.T) { + regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.CONSUMER))) + url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"})) + + ts, reg, err := NewMockZkRegistry(®url) + defer ts.Stop() + + assert.NoError(t, err) + err = reg.Register(url) + assert.NoError(t, err) + _, err = reg.Subscribe(url) + assert.NoError(t, err) + + //listener.Close() + time.Sleep(1e9) + reg.Destroy() + assert.Equal(t, false, reg.IsAvailable()) + +} + +func Test_ProviderDestory(t *testing.T) { + regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.PROVIDER))) + url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"})) + + ts, reg, err := NewMockZkRegistry(®url) + defer ts.Stop() + + assert.NoError(t, err) + err = reg.Register(url) + + //listener.Close() + time.Sleep(1e9) + reg.Destroy() + time.Sleep(1e9) + assert.Equal(t, false, reg.IsAvailable()) +} diff --git a/registry/zookeeper/zk_client.go b/registry/zookeeper/zk_client.go index afd93496691fea3fef4efed19212686a7d400aaf..42c6b5b271c51f17e187b3116c0c8d5ee11fbd4b 100644 --- a/registry/zookeeper/zk_client.go +++ b/registry/zookeeper/zk_client.go @@ -89,6 +89,40 @@ func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (* return z, nil } +func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster, *zookeeperClient, <-chan zk.Event, error) { + var ( + err error + event <-chan zk.Event + z *zookeeperClient + ) + + z = &zookeeperClient{ + name: name, + zkAddrs: []string{}, + timeout: timeout, + exit: make(chan struct{}), + eventRegistry: make(map[string][]*chan struct{}), + } + // connect to zookeeper + + ts, err := zk.StartTestCluster(1, nil, nil) + if err != nil { + return nil, nil, nil, jerrors.Annotatef(err, "zk.Connect") + } + + //callbackChan := make(chan zk.Event) + //f := func(event zk.Event) { + // callbackChan <- event + //} + + z.conn, event, err = ts.ConnectWithOptions(timeout) + if err != nil { + return nil, nil, nil, jerrors.Annotatef(err, "zk.Connect") + } + //z.wait.Add(1) + + return ts, z, event, nil +} func (z *zookeeperClient) handleZkEvent(session <-chan zk.Event) { var ( diff --git a/registry/zookeeper/zk_client_test.go b/registry/zookeeper/zk_client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..bd5b715a164edf1c0e0e8af020ddb90b9ff66eee --- /dev/null +++ b/registry/zookeeper/zk_client_test.go @@ -0,0 +1,132 @@ +package zookeeper + +import ( + "fmt" + "testing" + "time" +) +import ( + "github.com/samuel/go-zookeeper/zk" + "github.com/stretchr/testify/assert" +) + +func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.State, source string) { + for _, state := range expectedStates { + for { + event, ok := <-c + if !ok { + t.Fatalf("unexpected channel close for %s", source) + } + fmt.Println(event) + if event.Type != zk.EventSession { + continue + } + + if event.State != state { + t.Fatalf("mismatched state order from %s, expected %v, received %v", source, state, event.State) + } + break + } + } +} +func verifyEventOrder(t *testing.T, c <-chan zk.Event, expectedEvent []zk.EventType, source string) { + for _, e := range expectedEvent { + for { + event, ok := <-c + if !ok { + t.Fatalf("unexpected channel close for %s", source) + } + + if event.Type != e { + t.Fatalf("mismatched state order from %s, expected %v, received %v", source, event, event.Type) + } + + break + } + } +} + +//func Test_newZookeeperClient(t *testing.T) { +// ts, err := zk.StartTestCluster(1, nil, nil) +// if err != nil { +// t.Fatal(err) +// } +// defer ts.Stop() +// +// callbackChan := make(chan zk.Event) +// f := func(event zk.Event) { +// callbackChan <- event +// } +// +// zook, eventChan, err := ts.ConnectWithOptions(15*time.Second, zk.WithEventCallback(f)) +// if err != nil { +// t.Fatalf("Connect returned error: %+v", err) +// } +// +// states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} +// verifyEventStateOrder(t, callbackChan, states, "callback") +// verifyEventStateOrder(t, eventChan, states, "event channel") +// +// zook.Close() +// verifyEventStateOrder(t, callbackChan, []zk.State{zk.StateDisconnected}, "callback") +// verifyEventStateOrder(t, eventChan, []zk.State{zk.StateDisconnected}, "event channel") +// +//} + +func Test_newMockZookeeperClient(t *testing.T) { + ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second) + defer ts.Stop() + states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} + verifyEventStateOrder(t, event, states, "event channel") + + z.Close() + verifyEventStateOrder(t, event, []zk.State{zk.StateDisconnected}, "event channel") +} + +func TestCreate(t *testing.T) { + ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second) + defer ts.Stop() + err := z.Create("test1/test2/test3/test4") + assert.NoError(t, err) + + states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} + verifyEventStateOrder(t, event, states, "event channel") +} + +func TestCreateDelete(t *testing.T) { + ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second) + defer ts.Stop() + + states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} + verifyEventStateOrder(t, event, states, "event channel") + err := z.Create("/test1/test2/test3/test4") + assert.NoError(t, err) + err2 := z.Delete("/test1/test2/test3/test4") + assert.NoError(t, err2) + //verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel") +} + +func TestRegisterTemp(t *testing.T) { + ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second) + defer ts.Stop() + err := z.Create("/test1/test2/test3") + assert.NoError(t, err) + + tmpath, err := z.RegisterTemp("/test1/test2/test3", "test4") + assert.NoError(t, err) + assert.Equal(t, "/test1/test2/test3/test4", tmpath) + states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} + verifyEventStateOrder(t, event, states, "event channel") +} + +func TestRegisterTempSeq(t *testing.T) { + ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second) + defer ts.Stop() + err := z.Create("/test1/test2/test3") + assert.NoError(t, err) + tmpath, err := z.RegisterTempSeq("/test1/test2/test3", []byte("test")) + assert.NoError(t, err) + assert.Equal(t, "/test1/test2/test3/0000000000", tmpath) + states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} + verifyEventStateOrder(t, event, states, "event channel") +}