Skip to content
Snippets Groups Projects
Commit 3949065c authored by vito.he's avatar vito.he
Browse files

Add: unit test for registry finish

parent dc262fbd
No related branches found
No related tags found
No related merge requests found
Showing
with 585 additions and 148 deletions
......@@ -2,6 +2,7 @@ package directory
import (
"github.com/tevino/abool"
"sync"
)
import (
"github.com/dubbo/go-for-apache-dubbo/config"
......@@ -10,6 +11,7 @@ import (
type BaseDirectory struct {
url *config.URL
destroyed *abool.AtomicBool
mutex sync.Mutex
}
func NewBaseDirectory(url *config.URL) BaseDirectory {
......@@ -22,8 +24,11 @@ func (dir *BaseDirectory) GetUrl() config.URL {
return *dir.url
}
func (dir *BaseDirectory) Destroy() {
func (dir *BaseDirectory) Destroy(doDestroy func()) {
if dir.destroyed.SetToIf(false, true) {
dir.mutex.Lock()
doDestroy()
dir.mutex.Unlock()
}
}
......
package directory
import (
"github.com/dubbo/go-for-apache-dubbo/config"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
......@@ -11,7 +12,7 @@ type StaticDirectory struct {
func NewStaticDirectory(invokers []protocol.Invoker) *StaticDirectory {
return &StaticDirectory{
BaseDirectory: NewBaseDirectory(nil),
BaseDirectory: NewBaseDirectory(&config.URL{}),
invokers: invokers,
}
}
......@@ -30,3 +31,12 @@ func (dir *StaticDirectory) List(invocation protocol.Invocation) []protocol.Invo
//TODO:Here should add router
return dir.invokers
}
func (dir *StaticDirectory) Destroy() {
dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.invokers {
ivk.Destroy()
}
dir.invokers = []protocol.Invoker{}
})
}
......@@ -2,7 +2,6 @@ package cluster
import (
"github.com/dubbo/go-for-apache-dubbo/cluster"
"github.com/dubbo/go-for-apache-dubbo/config"
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
......@@ -14,5 +13,5 @@ func NewMockCluster() cluster.Cluster {
}
func (cluster *MockCluster) Join(directory cluster.Directory) protocol.Invoker {
return protocol.NewBaseInvoker(config.URL{})
return protocol.NewBaseInvoker(directory.GetUrl())
}
......@@ -87,7 +87,11 @@ func WithParams(params url.Values) option {
url.Params = params
}
}
func WithParamsValue(key, val string) option {
return func(url *URL) {
url.Params.Set(key, val)
}
}
func WithProtocol(proto string) option {
return func(url *URL) {
url.Protocol = proto
......
......@@ -4,6 +4,8 @@ require (
github.com/AlexStocks/getty v0.0.0-20190331201845-1ca64ac5a589
github.com/AlexStocks/goext v0.3.2
github.com/AlexStocks/log4go v1.0.2
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/dubbogo/hessian2 v0.0.0-20190410112310-f093e4436e31
github.com/juju/errors v0.0.0-20190207033735-e65537c515d7
github.com/montanaflynn/stats v0.5.0
......@@ -11,5 +13,6 @@ require (
github.com/stretchr/testify v1.3.0
github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5
go.uber.org/atomic v1.3.2
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/yaml.v2 v2.2.2
)
......@@ -8,6 +8,10 @@ github.com/AlexStocks/goext v0.3.2/go.mod h1:3M5j9Pjge4CdkNg2WIjRLUeoPedJHHKwkkg
github.com/AlexStocks/log4go v1.0.2 h1:1K5WM8KjSUECaoXUl8FSF05KGeCJDfBrhKUBsxwUvhk=
github.com/AlexStocks/log4go v1.0.2/go.mod h1:6kCCRo/orDo8mh5CEDOeuSSM674wBQ8M6E0K8dVOIz4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U=
......@@ -156,6 +160,8 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
......
......@@ -3,6 +3,7 @@ package protocolwrapper
import (
"github.com/dubbo/go-for-apache-dubbo/config"
"github.com/dubbo/go-for-apache-dubbo/protocol"
"sync"
)
type mockProtocolFilter struct {
......@@ -13,7 +14,7 @@ func NewMockProtocolFilter() protocol.Protocol {
}
func (pfw *mockProtocolFilter) Export(invoker protocol.Invoker) protocol.Exporter {
return protocol.NewBaseExporter("key", invoker, nil)
return protocol.NewBaseExporter("key", invoker, &sync.Map{})
}
func (pfw *mockProtocolFilter) Refer(url config.URL) protocol.Invoker {
......
......@@ -199,9 +199,13 @@ func (dir *RegistryDirectory) IsAvailable() bool {
}
func (dir *RegistryDirectory) Destroy() {
//dir.registry.Destroy() should move it in protocol
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy()
dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.cacheInvokers {
ivk.Destroy()
}
dir.cacheInvokers = []protocol.Invoker{}
})
}
// configuration > reference config >service config
......
......@@ -25,12 +25,12 @@ func TestSubscribe(t *testing.T) {
url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000")
url.SubURL = &suburl
mockRegistry := registry.NewMockRegistry()
mockRegistry, _ := registry.NewMockRegistry(&config.URL{})
registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry)
go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice"))
for i := 0; i < 3; i++ {
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))})
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))})
}
time.Sleep(1e9)
......@@ -43,23 +43,23 @@ func TestSubscribe_Delete(t *testing.T) {
url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000")
url.SubURL = &suburl
mockRegistry := registry.NewMockRegistry()
mockRegistry, _ := registry.NewMockRegistry(&config.URL{})
registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry)
go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice"))
for i := 0; i < 3; i++ {
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))})
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"))})
}
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceDel, Service: *config.NewURLWithOptions("TEST0", config.WithProtocol("dubbo"))})
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceDel, Service: *config.NewURLWithOptions("TEST0", config.WithProtocol("dubbo"))})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
}
func TestSubscribe_InvalidUrl(t *testing.T) {
url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
mockRegistry := registry.NewMockRegistry()
mockRegistry, _ := registry.NewMockRegistry(&config.URL{})
_, err := NewRegistryDirectory(&url, mockRegistry)
assert.Error(t, err)
}
......@@ -72,7 +72,7 @@ func TestSubscribe_Group(t *testing.T) {
suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000")
suburl.Params.Set(constant.CLUSTER_KEY, "mock")
regurl.SubURL = &suburl
mockRegistry := registry.NewMockRegistry()
mockRegistry, _ := registry.NewMockRegistry(&config.URL{})
registryDirectory, _ := NewRegistryDirectory(&regurl, mockRegistry)
go registryDirectory.Subscribe(*config.NewURLWithOptions("testservice"))
......@@ -82,7 +82,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap.Set(constant.GROUP_KEY, "group1")
urlmap.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"),
config.WithParams(urlmap))})
}
//for group2
......@@ -90,7 +90,7 @@ func TestSubscribe_Group(t *testing.T) {
urlmap2.Set(constant.GROUP_KEY, "group2")
urlmap2.Set(constant.CLUSTER_KEY, "failover") //to test merge url
for i := 0; i < 3; i++ {
mockRegistry.MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"),
mockRegistry.(*registry.MockRegistry).MockEvent(&registry.ServiceEvent{Action: registry.ServiceAdd, Service: *config.NewURLWithOptions("TEST"+strconv.FormatInt(int64(i), 10), config.WithProtocol("dubbo"),
config.WithParams(urlmap2))})
}
......
package registry
import (
"github.com/dubbo/go-for-apache-dubbo/config"
"github.com/tevino/abool"
)
import (
"github.com/dubbo/go-for-apache-dubbo/config"
)
type MockRegistry struct {
listener *listener
destroyed *abool.AtomicBool
}
func NewMockRegistry() *MockRegistry {
func NewMockRegistry(url *config.URL) (Registry, error) {
registry := &MockRegistry{
destroyed: abool.NewBool(false),
}
listener := &listener{count: 0, registry: registry, listenChan: make(chan *ServiceEvent)}
registry.listener = listener
return registry
return registry, nil
}
func (*MockRegistry) Register(url config.URL) error {
return nil
......
......@@ -21,6 +21,7 @@ import (
var registryProtocol *RegistryProtocol
type RegistryProtocol struct {
invokers []protocol.Invoker
// Registry Map<RegistryAddress, Registry>
registries sync.Map
//To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
......@@ -76,9 +77,11 @@ func (proto *RegistryProtocol) Refer(url config.URL) protocol.Invoker {
go directory.Subscribe(*serviceUrl)
//new cluster invoker
cluster := extension.GetCluster(serviceUrl.Params.Get(constant.CLUSTER_KEY))
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
return cluster.Join(directory)
invoker := cluster.Join(directory)
proto.invokers = append(proto.invokers, invoker)
return invoker
}
func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
......@@ -116,8 +119,27 @@ func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
}
func (*RegistryProtocol) Destroy() {
func (proto *RegistryProtocol) Destroy() {
for _, ivk := range proto.invokers {
ivk.Destroy()
}
proto.invokers = []protocol.Invoker{}
proto.bounds.Range(func(key, value interface{}) bool {
exporter := value.(protocol.Exporter)
exporter.Unexport()
proto.bounds.Delete(key)
return true
})
proto.registries.Range(func(key, value interface{}) bool {
reg := value.(registry.Registry)
if reg.IsAvailable() {
reg.Destroy()
}
proto.registries.Delete(key)
return true
})
}
func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) config.URL {
......
package protocol
import (
"context"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
cluster "github.com/dubbo/go-for-apache-dubbo/cluster/support"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
"github.com/dubbo/go-for-apache-dubbo/config"
"github.com/dubbo/go-for-apache-dubbo/protocol"
"github.com/dubbo/go-for-apache-dubbo/protocol/protocolwrapper"
"github.com/dubbo/go-for-apache-dubbo/registry"
)
func referNormal(t *testing.T, regProtocol *RegistryProtocol) {
extension.SetProtocol("registry", GetProtocol)
extension.SetRegistry("mock", registry.NewMockRegistry)
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
extension.SetCluster("mock", cluster.NewMockCluster)
url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock"))
url.SubURL = &suburl
invoker := regProtocol.Refer(url)
assert.IsType(t, &protocol.BaseInvoker{}, invoker)
assert.Equal(t, invoker.GetUrl().String(), url.String())
}
func TestRefer(t *testing.T) {
regProtocol := NewRegistryProtocol()
referNormal(t, regProtocol)
}
func TestMultiRegRefer(t *testing.T) {
regProtocol := NewRegistryProtocol()
referNormal(t, regProtocol)
url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:2222")
suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock"))
url2.SubURL = &suburl2
regProtocol.Refer(url2)
var count int
regProtocol.registries.Range(func(key, value interface{}) bool {
count++
return true
})
assert.Equal(t, count, 2)
}
func TestOneRegRefer(t *testing.T) {
regProtocol := NewRegistryProtocol()
referNormal(t, regProtocol)
url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock"))
url2.SubURL = &suburl2
regProtocol.Refer(url2)
var count int
regProtocol.registries.Range(func(key, value interface{}) bool {
count++
return true
})
assert.Equal(t, count, 1)
}
func exporterNormal(t *testing.T, regProtocol *RegistryProtocol) {
extension.SetProtocol("registry", GetProtocol)
extension.SetRegistry("mock", registry.NewMockRegistry)
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
url, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
suburl, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock"))
url.SubURL = &suburl
invoker := protocol.NewBaseInvoker(url)
exporter := regProtocol.Export(invoker)
assert.IsType(t, &protocol.BaseExporter{}, exporter)
assert.Equal(t, exporter.GetInvoker().GetUrl().String(), suburl.String())
}
func TestExporter(t *testing.T) {
regProtocol := NewRegistryProtocol()
exporterNormal(t, regProtocol)
}
func TestMultiRegAndMultiProtoExporter(t *testing.T) {
regProtocol := NewRegistryProtocol()
exporterNormal(t, regProtocol)
url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:2222")
suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock"))
url2.SubURL = &suburl2
invoker2 := protocol.NewBaseInvoker(url2)
regProtocol.Export(invoker2)
var count int
regProtocol.registries.Range(func(key, value interface{}) bool {
count++
return true
})
assert.Equal(t, count, 2)
var count2 int
regProtocol.bounds.Range(func(key, value interface{}) bool {
count2++
return true
})
assert.Equal(t, count2, 2)
}
func TestOneRegAndProtoExporter(t *testing.T) {
regProtocol := NewRegistryProtocol()
exporterNormal(t, regProtocol)
url2, _ := config.NewURL(context.TODO(), "mock://127.0.0.1:1111")
suburl2, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", config.WithParamsValue(constant.CLUSTER_KEY, "mock"))
url2.SubURL = &suburl2
invoker2 := protocol.NewBaseInvoker(url2)
regProtocol.Export(invoker2)
var count int
regProtocol.registries.Range(func(key, value interface{}) bool {
count++
return true
})
assert.Equal(t, count, 1)
var count2 int
regProtocol.bounds.Range(func(key, value interface{}) bool {
count2++
return true
})
assert.Equal(t, count2, 1)
}
func TestDestry(t *testing.T) {
regProtocol := NewRegistryProtocol()
referNormal(t, regProtocol)
exporterNormal(t, regProtocol)
regProtocol.Destroy()
assert.Equal(t, len(regProtocol.invokers), 0)
var count int
regProtocol.registries.Range(func(key, value interface{}) bool {
count++
return true
})
assert.Equal(t, count, 0)
var count2 int
regProtocol.bounds.Range(func(key, value interface{}) bool {
count2++
return true
})
assert.Equal(t, count2, 0)
}
package zookeeper
import (
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/go-for-apache-dubbo/config"
"github.com/dubbo/go-for-apache-dubbo/registry"
)
// name: service@protocol
//func (r *ZkRegistry) GetService(conf registry.ReferenceConfig) ([]config.SubURL, error) {
//
// var (
// err error
// dubboPath string
// nodes []string
// listener *zkEventListener
// serviceURL config.SubURL
// serviceConf registry.ReferenceConfig
// ok bool
// )
// r.listenerLock.Lock()
// listener = r.listener
// r.listenerLock.Unlock()
//
// if listener != nil {
// listener.listenServiceEvent(conf)
// }
//
// r.cltLock.Lock()
// serviceConf, ok = r.services[conf.Key()]
// r.cltLock.Unlock()
// if !ok {
// return nil, jerrors.Errorf("Path{%s} has not been registered", conf.Key())
// }
// if !ok {
// return nil, jerrors.Errorf("Path{%s}: failed to get serviceConfigIf type", conf.Key())
// }
//
// dubboPath = fmt.Sprintf("/dubbo/%s/providers", conf.Path())
// err = r.validateZookeeperClient()
// if err != nil {
// return nil, jerrors.Trace(err)
// }
// r.cltLock.Lock()
// nodes, err = r.client.getChildren(dubboPath)
// r.cltLock.Unlock()
// if err != nil {
// log.Warn("getChildren(dubboPath{%s}) = error{%v}", dubboPath, err)
// return nil, jerrors.Trace(err)
// }
//
// var listenerServiceMap = make(map[string]config.SubURL)
// for _, n := range nodes {
//
// serviceURL, err = plugins.DefaultServiceURL()(n)
// if err != nil {
// log.Error("NewURL({%s}) = error{%v}", n, err)
// continue
// }
// if !serviceConf.ServiceEqual(serviceURL) {
// log.Warn("serviceURL{%s} is not compatible with ReferenceConfig{%#v}", serviceURL, serviceConf)
// continue
// }
//
// _, ok := listenerServiceMap[serviceURL.Params().Get(serviceURL.Location())]
// if !ok {
// listenerServiceMap[serviceURL.Location()] = serviceURL
// continue
// }
// }
//
// var services []config.SubURL
// for _, service := range listenerServiceMap {
// services = append(services, service)
// }
//
// return services, nil
//}
func (r *ZkRegistry) Subscribe(conf config.URL) (registry.Listener, error) {
r.wg.Add(1)
return r.getListener(conf)
}
func (r *ZkRegistry) getListener(conf config.URL) (*zkEventListener, error) {
var (
zkListener *zkEventListener
)
r.listenerLock.Lock()
zkListener = r.listener
r.listenerLock.Unlock()
if zkListener != nil {
return zkListener, nil
}
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, jerrors.New("zk connection broken")
}
// new client & listener
zkListener = newZkEventListener(r, client)
r.listenerLock.Lock()
r.listener = zkListener
r.listenerLock.Unlock()
// listen
r.cltLock.Lock()
for _, svs := range r.services {
if svs.URLEqual(conf) {
go zkListener.listenServiceEvent(svs)
}
}
r.cltLock.Unlock()
return zkListener, nil
}
......@@ -102,11 +102,46 @@ func NewZkRegistry(url *config.URL) (registry.Registry, error) {
return r, nil
}
func NewMockZkRegistry(url *config.URL) (*zk.TestCluster, *ZkRegistry, error) {
var (
err error
r *ZkRegistry
c *zk.TestCluster
//event <-chan zk.Event
)
r = &ZkRegistry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
services: make(map[string]config.URL),
zkPath: make(map[string]int),
}
c, r.client, _, err = newMockZookeeperClient("test", 15*time.Second)
if err != nil {
return nil, nil, err
}
r.wg.Add(1)
go r.handleZkRestart()
//if r.RoleType == registry.CONSUMER {
// r.wg.Add(1)
// go r.listen()
//}
return c, r, nil
}
func (r *ZkRegistry) GetUrl() config.URL {
return *r.URL
}
func (r *ZkRegistry) Destroy() {
if r.listener != nil {
r.listener.Close()
}
close(r.done)
r.wg.Wait()
r.closeRegisters()
......@@ -411,6 +446,49 @@ func (r *ZkRegistry) registerTempZookeeperNode(root string, node string) error {
return nil
}
func (r *ZkRegistry) Subscribe(conf config.URL) (registry.Listener, error) {
r.wg.Add(1)
return r.getListener(conf)
}
func (r *ZkRegistry) getListener(conf config.URL) (*zkEventListener, error) {
var (
zkListener *zkEventListener
)
r.listenerLock.Lock()
zkListener = r.listener
r.listenerLock.Unlock()
if zkListener != nil {
return zkListener, nil
}
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, jerrors.New("zk connection broken")
}
// new client & listener
zkListener = newZkEventListener(r, client)
r.listenerLock.Lock()
r.listener = zkListener
r.listenerLock.Unlock()
// listen
r.cltLock.Lock()
for _, svs := range r.services {
if svs.URLEqual(conf) {
go zkListener.listenServiceEvent(svs)
}
}
r.cltLock.Unlock()
return zkListener, nil
}
func (r *ZkRegistry) closeRegisters() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
......
package zookeeper
import (
"context"
"strconv"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/config"
)
func Test_Register(t *testing.T) {
regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.PROVIDER)))
url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := NewMockZkRegistry(&regurl)
defer ts.Stop()
err = reg.Register(url)
children, _ := reg.client.getChildren("/dubbo/com.ikurento.user.UserProvider/providers")
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children)
assert.NoError(t, err)
}
func Test_Subscribe(t *testing.T) {
regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.PROVIDER)))
url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := NewMockZkRegistry(&regurl)
defer ts.Stop()
//provider register
err = reg.Register(url)
assert.NoError(t, err)
if err != nil {
return
}
//consumer register
regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(config.CONSUMER))
_, reg2, err := NewMockZkRegistry(&regurl)
reg2.client = reg.client
err = reg2.Register(url)
listener, err := reg2.Subscribe(url)
serviceEvent, err := listener.Next()
assert.NoError(t, err)
if err != nil {
return
}
assert.Regexp(t, ".*ServiceEvent{Action{add service}.*", serviceEvent.String())
}
func Test_ConsumerDestory(t *testing.T) {
regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.CONSUMER)))
url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := NewMockZkRegistry(&regurl)
defer ts.Stop()
assert.NoError(t, err)
err = reg.Register(url)
assert.NoError(t, err)
_, err = reg.Subscribe(url)
assert.NoError(t, err)
//listener.Close()
time.Sleep(1e9)
reg.Destroy()
assert.Equal(t, false, reg.IsAvailable())
}
func Test_ProviderDestory(t *testing.T) {
regurl, _ := config.NewURL(context.TODO(), "registry://127.0.0.1:1111", config.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(config.PROVIDER)))
url, _ := config.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", config.WithParamsValue(constant.CLUSTER_KEY, "mock"), config.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := NewMockZkRegistry(&regurl)
defer ts.Stop()
assert.NoError(t, err)
err = reg.Register(url)
//listener.Close()
time.Sleep(1e9)
reg.Destroy()
time.Sleep(1e9)
assert.Equal(t, false, reg.IsAvailable())
}
......@@ -89,6 +89,40 @@ func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*
return z, nil
}
func newMockZookeeperClient(name string, timeout time.Duration) (*zk.TestCluster, *zookeeperClient, <-chan zk.Event, error) {
var (
err error
event <-chan zk.Event
z *zookeeperClient
)
z = &zookeeperClient{
name: name,
zkAddrs: []string{},
timeout: timeout,
exit: make(chan struct{}),
eventRegistry: make(map[string][]*chan struct{}),
}
// connect to zookeeper
ts, err := zk.StartTestCluster(1, nil, nil)
if err != nil {
return nil, nil, nil, jerrors.Annotatef(err, "zk.Connect")
}
//callbackChan := make(chan zk.Event)
//f := func(event zk.Event) {
// callbackChan <- event
//}
z.conn, event, err = ts.ConnectWithOptions(timeout)
if err != nil {
return nil, nil, nil, jerrors.Annotatef(err, "zk.Connect")
}
//z.wait.Add(1)
return ts, z, event, nil
}
func (z *zookeeperClient) handleZkEvent(session <-chan zk.Event) {
var (
......
package zookeeper
import (
"fmt"
"testing"
"time"
)
import (
"github.com/samuel/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.State, source string) {
for _, state := range expectedStates {
for {
event, ok := <-c
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}
fmt.Println(event)
if event.Type != zk.EventSession {
continue
}
if event.State != state {
t.Fatalf("mismatched state order from %s, expected %v, received %v", source, state, event.State)
}
break
}
}
}
func verifyEventOrder(t *testing.T, c <-chan zk.Event, expectedEvent []zk.EventType, source string) {
for _, e := range expectedEvent {
for {
event, ok := <-c
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}
if event.Type != e {
t.Fatalf("mismatched state order from %s, expected %v, received %v", source, event, event.Type)
}
break
}
}
}
//func Test_newZookeeperClient(t *testing.T) {
// ts, err := zk.StartTestCluster(1, nil, nil)
// if err != nil {
// t.Fatal(err)
// }
// defer ts.Stop()
//
// callbackChan := make(chan zk.Event)
// f := func(event zk.Event) {
// callbackChan <- event
// }
//
// zook, eventChan, err := ts.ConnectWithOptions(15*time.Second, zk.WithEventCallback(f))
// if err != nil {
// t.Fatalf("Connect returned error: %+v", err)
// }
//
// states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
// verifyEventStateOrder(t, callbackChan, states, "callback")
// verifyEventStateOrder(t, eventChan, states, "event channel")
//
// zook.Close()
// verifyEventStateOrder(t, callbackChan, []zk.State{zk.StateDisconnected}, "callback")
// verifyEventStateOrder(t, eventChan, []zk.State{zk.StateDisconnected}, "event channel")
//
//}
func Test_newMockZookeeperClient(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
z.Close()
verifyEventStateOrder(t, event, []zk.State{zk.StateDisconnected}, "event channel")
}
func TestCreate(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
err := z.Create("test1/test2/test3/test4")
assert.NoError(t, err)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
func TestCreateDelete(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
err := z.Create("/test1/test2/test3/test4")
assert.NoError(t, err)
err2 := z.Delete("/test1/test2/test3/test4")
assert.NoError(t, err2)
//verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
}
func TestRegisterTemp(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
assert.NoError(t, err)
tmpath, err := z.RegisterTemp("/test1/test2/test3", "test4")
assert.NoError(t, err)
assert.Equal(t, "/test1/test2/test3/test4", tmpath)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
func TestRegisterTempSeq(t *testing.T) {
ts, z, event, _ := newMockZookeeperClient("test", 15*time.Second)
defer ts.Stop()
err := z.Create("/test1/test2/test3")
assert.NoError(t, err)
tmpath, err := z.RegisterTempSeq("/test1/test2/test3", []byte("test"))
assert.NoError(t, err)
assert.Equal(t, "/test1/test2/test3/0000000000", tmpath)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment