Skip to content
Snippets Groups Projects
Commit f63b150e authored by fangyincheng's avatar fangyincheng
Browse files

Merge remote-tracking branch 'upstream/develop' into develop

parents 6db6c9d9 01e16d78
No related branches found
No related tags found
No related merge requests found
Showing
with 94 additions and 177 deletions
......@@ -6,19 +6,19 @@ import (
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type FailoverCluster struct {
type failoverCluster struct {
}
const name = "failover"
func init() {
extension.SetCluster(name, NewFailoverCluster)
extension.SetCluster(name, newFailoverCluster)
}
func NewFailoverCluster() cluster.Cluster {
return &FailoverCluster{}
func newFailoverCluster() cluster.Cluster {
return &failoverCluster{}
}
func (cluster *FailoverCluster) Join(directory cluster.Directory) protocol.Invoker {
func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailoverClusterInvoker(directory)
}
......@@ -5,13 +5,13 @@ import (
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type MockCluster struct {
type mockCluster struct {
}
func NewMockCluster() cluster.Cluster {
return &MockCluster{}
return &mockCluster{}
}
func (cluster *MockCluster) Join(directory cluster.Directory) protocol.Invoker {
func (cluster *mockCluster) Join(directory cluster.Directory) protocol.Invoker {
return protocol.NewBaseInvoker(directory.GetUrl())
}
......@@ -6,17 +6,17 @@ import (
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type RegistryAwareCluster struct {
type registryAwareCluster struct {
}
func init() {
extension.SetCluster("registryAware", NewRegistryAwareCluster)
extension.SetCluster("registryAware", newRegistryAwareCluster)
}
func NewRegistryAwareCluster() cluster.Cluster {
return &RegistryAwareCluster{}
func newRegistryAwareCluster() cluster.Cluster {
return &registryAwareCluster{}
}
func (cluster *RegistryAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
return newRegistryAwareClusterInvoker(directory)
}
package directory
import (
"context"
"fmt"
"strings"
"time"
)
import (
jerrors "github.com/juju/errors"
)
import (
"github.com/dubbo/go-for-apache-dubbo/common"
)
//////////////////////////////////////////
// registry array
// should be returned by registry ,will be used by client & waiting to selector
//////////////////////////////////////////
var (
ErrServiceArrayEmpty = jerrors.New("registryArray empty")
ErrServiceArrayTimeout = jerrors.New("registryArray timeout")
)
type ServiceArray struct {
context context.Context
arr []common.URL
birth time.Time
idx int64
}
func NewServiceArray(ctx context.Context, arr []common.URL) *ServiceArray {
return &ServiceArray{
context: ctx,
arr: arr,
birth: time.Now(),
}
}
func (s *ServiceArray) GetIdx() *int64 {
return &s.idx
}
func (s *ServiceArray) GetSize() int64 {
return int64(len(s.arr))
}
func (s *ServiceArray) GetService(i int64) common.URL {
return s.arr[i]
}
func (s *ServiceArray) String() string {
var builder strings.Builder
builder.WriteString(fmt.Sprintf("birth:%s, idx:%d, arr len:%d, arr:{", s.birth, s.idx, len(s.arr)))
for i := range s.arr {
builder.WriteString(fmt.Sprintf("%d:%s, ", i, s.arr[i]))
}
builder.WriteString("}")
return builder.String()
}
func (s *ServiceArray) Add(url common.URL, ttl time.Duration) {
s.arr = append(s.arr, url)
s.birth = time.Now().Add(ttl)
}
func (s *ServiceArray) Del(url common.URL, ttl time.Duration) {
for i, svc := range s.arr {
if svc.PrimitiveURL == url.PrimitiveURL {
s.arr = append(s.arr[:i], s.arr[i+1:]...)
s.birth = time.Now().Add(ttl)
break
}
}
}
......@@ -5,20 +5,20 @@ import (
"github.com/dubbo/go-for-apache-dubbo/protocol"
)
type StaticDirectory struct {
type staticDirectory struct {
BaseDirectory
invokers []protocol.Invoker
}
func NewStaticDirectory(invokers []protocol.Invoker) *StaticDirectory {
return &StaticDirectory{
func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory {
return &staticDirectory{
BaseDirectory: NewBaseDirectory(&common.URL{}),
invokers: invokers,
}
}
//for-loop invokers ,if all invokers is available ,then it means directory is available
func (dir *StaticDirectory) IsAvailable() bool {
func (dir *staticDirectory) IsAvailable() bool {
for _, invoker := range dir.invokers {
if !invoker.IsAvailable() {
return false
......@@ -27,12 +27,12 @@ func (dir *StaticDirectory) IsAvailable() bool {
return true
}
func (dir *StaticDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
//TODO:Here should add router
return dir.invokers
}
func (dir *StaticDirectory) Destroy() {
func (dir *staticDirectory) Destroy() {
dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.invokers {
ivk.Destroy()
......
......@@ -14,17 +14,17 @@ import (
const name = "random"
func init() {
extension.SetLoadbalance(name, NewRandomLoadBalance)
extension.SetLoadbalance(name, newRandomLoadBalance)
}
type RandomLoadBalance struct {
type randomLoadBalance struct {
}
func NewRandomLoadBalance() cluster.LoadBalance {
return &RandomLoadBalance{}
func newRandomLoadBalance() cluster.LoadBalance {
return &randomLoadBalance{}
}
func (lb *RandomLoadBalance) Select(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) protocol.Invoker {
func (lb *randomLoadBalance) Select(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) protocol.Invoker {
var length int
if length = len(invokers); length == 1 {
return invokers[0]
......
......@@ -27,7 +27,7 @@ type Options struct {
}
type Option func(*Options)
type RegistryDirectory struct {
type registryDirectory struct {
directory.BaseDirectory
cacheInvokers []protocol.Invoker
listenerLock sync.Mutex
......@@ -38,7 +38,7 @@ type RegistryDirectory struct {
Options
}
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*RegistryDirectory, error) {
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
options := Options{
//default 300s
serviceTTL: time.Duration(300e9),
......@@ -49,7 +49,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O
if url.SubURL == nil {
return nil, jerrors.Errorf("url is invalid, suburl can not be nil")
}
return &RegistryDirectory{
return &registryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{},
......@@ -60,7 +60,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O
}
//subscibe from registry
func (dir *RegistryDirectory) Subscribe(url common.URL) {
func (dir *registryDirectory) Subscribe(url common.URL) {
for {
if !dir.registry.IsAvailable() {
log.Warn("event listener game over.")
......@@ -95,7 +95,7 @@ func (dir *RegistryDirectory) Subscribe(url common.URL) {
}
//subscribe service from registry , and update the cacheServices
func (dir *RegistryDirectory) update(res *registry.ServiceEvent) {
func (dir *registryDirectory) update(res *registry.ServiceEvent) {
if res == nil {
return
}
......@@ -107,35 +107,34 @@ func (dir *RegistryDirectory) update(res *registry.ServiceEvent) {
dir.refreshInvokers(res)
}
func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) {
var newCacheInvokersMap sync.Map
func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) {
switch res.Action {
case registry.ServiceAdd:
//dir.cacheService.Add(res.Path, dir.serviceTTL)
newCacheInvokersMap = *dir.cacheInvoker(res.Service)
dir.cacheInvoker(res.Service)
case registry.ServiceDel:
//dir.cacheService.Del(res.Path, dir.serviceTTL)
newCacheInvokersMap = *dir.uncacheInvoker(res.Service)
dir.uncacheInvoker(res.Service)
log.Info("selector delete service url{%s}", res.Service)
default:
return
}
newInvokers := dir.toGroupInvokers(&newCacheInvokersMap)
newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers
}
func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protocol.Invoker {
func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker {
newInvokersList := []protocol.Invoker{}
groupInvokersMap := make(map[string][]protocol.Invoker)
groupInvokersList := []protocol.Invoker{}
newInvokersMap.Range(func(key, value interface{}) bool {
dir.cacheInvokersMap.Range(func(key, value interface{}) bool {
newInvokersList = append(newInvokersList, value.(protocol.Invoker))
return true
})
......@@ -163,42 +162,38 @@ func (dir *RegistryDirectory) toGroupInvokers(newInvokersMap *sync.Map) []protoc
return groupInvokersList
}
func (dir *RegistryDirectory) uncacheInvoker(url common.URL) *sync.Map {
func (dir *registryDirectory) uncacheInvoker(url common.URL) {
log.Debug("service will be deleted in cache invokers: invokers key is %s!", url.Key())
newCacheInvokers := dir.cacheInvokersMap
newCacheInvokers.Delete(url.Key())
return newCacheInvokers
dir.cacheInvokersMap.Delete(url.Key())
}
func (dir *RegistryDirectory) cacheInvoker(url common.URL) *sync.Map {
func (dir *registryDirectory) cacheInvoker(url common.URL) {
referenceUrl := dir.GetUrl().SubURL
newCacheInvokers := dir.cacheInvokersMap
//check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
url = mergeUrl(url, referenceUrl)
if _, ok := newCacheInvokers.Load(url.Key()); !ok {
if _, ok := dir.cacheInvokersMap.Load(url.Key()); !ok {
log.Debug("service will be added in cache invokers: invokers key is %s!", url.Key())
newInvoker := extension.GetProtocolExtension(protocolwrapper.FILTER).Refer(url)
if newInvoker != nil {
newCacheInvokers.Store(url.Key(), newInvoker)
dir.cacheInvokersMap.Store(url.Key(), newInvoker)
}
}
}
return newCacheInvokers
}
//select the protocol invokers from the directory
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
//TODO:router
return dir.cacheInvokers
}
func (dir *RegistryDirectory) IsAvailable() bool {
func (dir *registryDirectory) IsAvailable() bool {
return dir.BaseDirectory.IsAvailable()
}
func (dir *RegistryDirectory) Destroy() {
func (dir *registryDirectory) Destroy() {
//TODO:unregister & unsubscribe
dir.BaseDirectory.Destroy(func() {
for _, ivk := range dir.cacheInvokers {
......
......@@ -15,7 +15,7 @@ import (
"github.com/dubbo/go-for-apache-dubbo/common"
"github.com/dubbo/go-for-apache-dubbo/common/constant"
"github.com/dubbo/go-for-apache-dubbo/common/extension"
"github.com/dubbo/go-for-apache-dubbo/protocol"
"github.com/dubbo/go-for-apache-dubbo/protocol/invocation"
"github.com/dubbo/go-for-apache-dubbo/protocol/protocolwrapper"
"github.com/dubbo/go-for-apache-dubbo/registry"
)
......@@ -93,12 +93,12 @@ func Test_List(t *testing.T) {
registryDirectory, _ := normalRegistryDir()
time.Sleep(1e9)
assert.Len(t, registryDirectory.List(protocol.Invocation()), 3)
assert.Len(t, registryDirectory.List(&invocation.RPCInvocation{}), 3)
assert.Equal(t, true, registryDirectory.IsAvailable())
}
func normalRegistryDir() (*RegistryDirectory, *registry.MockRegistry) {
func normalRegistryDir() (*registryDirectory, *registry.MockRegistry) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111")
......
......@@ -18,9 +18,9 @@ import (
directory2 "github.com/dubbo/go-for-apache-dubbo/registry/directory"
)
var registryProtocol *RegistryProtocol
var regProtocol *registryProtocol
type RegistryProtocol struct {
type registryProtocol struct {
invokers []protocol.Invoker
// Registry Map<RegistryAddress, Registry>
registries sync.Map
......@@ -33,8 +33,8 @@ func init() {
extension.SetProtocol("registry", GetProtocol)
}
func NewRegistryProtocol() *RegistryProtocol {
return &RegistryProtocol{
func newRegistryProtocol() *registryProtocol {
return &registryProtocol{
registries: sync.Map{},
bounds: sync.Map{},
}
......@@ -47,7 +47,7 @@ func getRegistry(regUrl *common.URL) registry.Registry {
}
return reg
}
func (proto *RegistryProtocol) Refer(url common.URL) protocol.Invoker {
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
var registryUrl = url
var serviceUrl = registryUrl.SubURL
......@@ -84,7 +84,7 @@ func (proto *RegistryProtocol) Refer(url common.URL) protocol.Invoker {
return invoker
}
func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
registryUrl := proto.getRegistryUrl(invoker)
providerUrl := proto.getProviderUrl(invoker)
......@@ -119,7 +119,7 @@ func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
}
func (proto *RegistryProtocol) Destroy() {
func (proto *registryProtocol) Destroy() {
for _, ivk := range proto.invokers {
ivk.Destroy()
}
......@@ -142,7 +142,7 @@ func (proto *RegistryProtocol) Destroy() {
})
}
func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) common.URL {
func (*registryProtocol) getRegistryUrl(invoker protocol.Invoker) common.URL {
//here add * for return a new url
url := invoker.GetUrl()
//if the protocol == registry ,set protocol the registry value in url.params
......@@ -153,16 +153,16 @@ func (*RegistryProtocol) getRegistryUrl(invoker protocol.Invoker) common.URL {
return url
}
func (*RegistryProtocol) getProviderUrl(invoker protocol.Invoker) common.URL {
func (*registryProtocol) getProviderUrl(invoker protocol.Invoker) common.URL {
url := invoker.GetUrl()
return *url.SubURL
}
func GetProtocol() protocol.Protocol {
if registryProtocol != nil {
return registryProtocol
if regProtocol != nil {
return regProtocol
}
return NewRegistryProtocol()
return newRegistryProtocol()
}
type wrappedInvoker struct {
......
......@@ -17,7 +17,7 @@ import (
"github.com/dubbo/go-for-apache-dubbo/registry"
)
func referNormal(t *testing.T, regProtocol *RegistryProtocol) {
func referNormal(t *testing.T, regProtocol *registryProtocol) {
extension.SetProtocol("registry", GetProtocol)
extension.SetRegistry("mock", registry.NewMockRegistry)
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
......@@ -33,12 +33,12 @@ func referNormal(t *testing.T, regProtocol *RegistryProtocol) {
assert.Equal(t, invoker.GetUrl().String(), url.String())
}
func TestRefer(t *testing.T) {
regProtocol := NewRegistryProtocol()
regProtocol := newRegistryProtocol()
referNormal(t, regProtocol)
}
func TestMultiRegRefer(t *testing.T) {
regProtocol := NewRegistryProtocol()
regProtocol := newRegistryProtocol()
referNormal(t, regProtocol)
url2, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:2222")
suburl2, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000//", common.WithParamsValue(constant.CLUSTER_KEY, "mock"))
......@@ -55,7 +55,7 @@ func TestMultiRegRefer(t *testing.T) {
}
func TestOneRegRefer(t *testing.T) {
regProtocol := NewRegistryProtocol()
regProtocol := newRegistryProtocol()
referNormal(t, regProtocol)
url2, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111")
......@@ -71,7 +71,7 @@ func TestOneRegRefer(t *testing.T) {
})
assert.Equal(t, count, 1)
}
func exporterNormal(t *testing.T, regProtocol *RegistryProtocol) {
func exporterNormal(t *testing.T, regProtocol *registryProtocol) {
extension.SetProtocol("registry", GetProtocol)
extension.SetRegistry("mock", registry.NewMockRegistry)
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
......@@ -87,12 +87,12 @@ func exporterNormal(t *testing.T, regProtocol *RegistryProtocol) {
}
func TestExporter(t *testing.T) {
regProtocol := NewRegistryProtocol()
regProtocol := newRegistryProtocol()
exporterNormal(t, regProtocol)
}
func TestMultiRegAndMultiProtoExporter(t *testing.T) {
regProtocol := NewRegistryProtocol()
regProtocol := newRegistryProtocol()
exporterNormal(t, regProtocol)
url2, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:2222")
......@@ -118,7 +118,7 @@ func TestMultiRegAndMultiProtoExporter(t *testing.T) {
}
func TestOneRegAndProtoExporter(t *testing.T) {
regProtocol := NewRegistryProtocol()
regProtocol := newRegistryProtocol()
exporterNormal(t, regProtocol)
url2, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111")
......@@ -144,7 +144,7 @@ func TestOneRegAndProtoExporter(t *testing.T) {
}
func TestDestry(t *testing.T) {
regProtocol := NewRegistryProtocol()
regProtocol := newRegistryProtocol()
referNormal(t, regProtocol)
exporterNormal(t, regProtocol)
......
......@@ -38,10 +38,10 @@ type zkEventListener struct {
serviceMapLock sync.Mutex
serviceMap map[string]struct{}
wg sync.WaitGroup
registry *ZkRegistry
registry *zkRegistry
}
func newZkEventListener(registry *ZkRegistry, client *zookeeperClient) *zkEventListener {
func newZkEventListener(registry *zkRegistry, client *zookeeperClient) *zkEventListener {
return &zkEventListener{
client: client,
registry: registry,
......
......@@ -40,15 +40,15 @@ var (
func init() {
processID = fmt.Sprintf("%d", os.Getpid())
localIP, _ = gxnet.GetLocalIP()
//plugins.PluggableRegistries["zookeeper"] = NewZkRegistry
extension.SetRegistry("zookeeper", NewZkRegistry)
//plugins.PluggableRegistries["zookeeper"] = newZkRegistry
extension.SetRegistry("zookeeper", newZkRegistry)
}
/////////////////////////////////////
// zookeeper registry
/////////////////////////////////////
type ZkRegistry struct {
type zkRegistry struct {
context context.Context
*common.URL
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
......@@ -66,13 +66,13 @@ type ZkRegistry struct {
zkPath map[string]int // key = protocol://ip:port/interface
}
func NewZkRegistry(url *common.URL) (registry.Registry, error) {
func newZkRegistry(url *common.URL) (registry.Registry, error) {
var (
err error
r *ZkRegistry
r *zkRegistry
)
r = &ZkRegistry{
r = &zkRegistry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
......@@ -103,15 +103,15 @@ func NewZkRegistry(url *common.URL) (registry.Registry, error) {
return r, nil
}
func NewMockZkRegistry(url *common.URL) (*zk.TestCluster, *ZkRegistry, error) {
func newMockZkRegistry(url *common.URL) (*zk.TestCluster, *zkRegistry, error) {
var (
err error
r *ZkRegistry
r *zkRegistry
c *zk.TestCluster
//event <-chan zk.Event
)
r = &ZkRegistry{
r = &zkRegistry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
......@@ -134,11 +134,11 @@ func NewMockZkRegistry(url *common.URL) (*zk.TestCluster, *ZkRegistry, error) {
return c, r, nil
}
func (r *ZkRegistry) GetUrl() common.URL {
func (r *zkRegistry) GetUrl() common.URL {
return *r.URL
}
func (r *ZkRegistry) Destroy() {
func (r *zkRegistry) Destroy() {
if r.listener != nil {
r.listener.Close()
}
......@@ -147,7 +147,7 @@ func (r *ZkRegistry) Destroy() {
r.closeRegisters()
}
func (r *ZkRegistry) validateZookeeperClient() error {
func (r *zkRegistry) validateZookeeperClient() error {
var (
err error
)
......@@ -182,7 +182,7 @@ func (r *ZkRegistry) validateZookeeperClient() error {
return jerrors.Annotatef(err, "newZookeeperClient(address:%+v)", r.PrimitiveURL)
}
func (r *ZkRegistry) handleZkRestart() {
func (r *zkRegistry) handleZkRestart() {
var (
err error
flag bool
......@@ -247,7 +247,7 @@ LOOP:
}
}
func (r *ZkRegistry) Register(conf common.URL) error {
func (r *zkRegistry) Register(conf common.URL) error {
var (
ok bool
err error
......@@ -308,7 +308,7 @@ func (r *ZkRegistry) Register(conf common.URL) error {
return nil
}
func (r *ZkRegistry) register(c common.URL) error {
func (r *zkRegistry) register(c common.URL) error {
var (
err error
//revision string
......@@ -423,7 +423,7 @@ func (r *ZkRegistry) register(c common.URL) error {
return nil
}
func (r *ZkRegistry) registerTempZookeeperNode(root string, node string) error {
func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
var (
err error
zkPath string
......@@ -446,12 +446,12 @@ func (r *ZkRegistry) registerTempZookeeperNode(root string, node string) error {
return nil
}
func (r *ZkRegistry) Subscribe(conf common.URL) (registry.Listener, error) {
func (r *zkRegistry) Subscribe(conf common.URL) (registry.Listener, error) {
r.wg.Add(1)
return r.getListener(conf)
}
func (r *ZkRegistry) getListener(conf common.URL) (*zkEventListener, error) {
func (r *zkRegistry) getListener(conf common.URL) (*zkEventListener, error) {
var (
zkListener *zkEventListener
)
......@@ -489,7 +489,7 @@ func (r *ZkRegistry) getListener(conf common.URL) (*zkEventListener, error) {
return zkListener, nil
}
func (r *ZkRegistry) closeRegisters() {
func (r *zkRegistry) closeRegisters() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
log.Info("begin to close provider zk client")
......@@ -499,7 +499,7 @@ func (r *ZkRegistry) closeRegisters() {
r.services = nil
}
func (r *ZkRegistry) IsAvailable() bool {
func (r *zkRegistry) IsAvailable() bool {
select {
case <-r.done:
return false
......
......@@ -18,7 +18,7 @@ func Test_Register(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := NewMockZkRegistry(&regurl)
ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
err = reg.Register(url)
children, _ := reg.client.getChildren("/dubbo/com.ikurento.user.UserProvider/providers")
......@@ -29,7 +29,7 @@ func Test_Register(t *testing.T) {
func Test_Subscribe(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := NewMockZkRegistry(&regurl)
ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
//provider register
......@@ -42,7 +42,7 @@ func Test_Subscribe(t *testing.T) {
//consumer register
regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
_, reg2, err := NewMockZkRegistry(&regurl)
_, reg2, err := newMockZkRegistry(&regurl)
reg2.client = reg.client
err = reg2.Register(url)
listener, err := reg2.Subscribe(url)
......@@ -60,7 +60,7 @@ func Test_ConsumerDestory(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := NewMockZkRegistry(&regurl)
ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
assert.NoError(t, err)
......@@ -80,7 +80,7 @@ func Test_ProviderDestory(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
ts, reg, err := NewMockZkRegistry(&regurl)
ts, reg, err := newMockZkRegistry(&regurl)
defer ts.Stop()
assert.NoError(t, err)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment