Skip to content
Snippets Groups Projects
service_discovery_test.go 7.76 KiB
Newer Older
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package zookeeper

import (
	"context"
	"strconv"
	"sync"
	"testing"
)

import (
	"github.com/dubbogo/go-zookeeper/zk"
	gxset "github.com/dubbogo/gost/container/set"
	"github.com/stretchr/testify/assert"
)

import (
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/observer"
	"github.com/apache/dubbo-go/common/observer/dispatcher"
	"github.com/apache/dubbo-go/config"
	"github.com/apache/dubbo-go/metadata/mapping"
	"github.com/apache/dubbo-go/protocol"
	"github.com/apache/dubbo-go/registry"
	"github.com/apache/dubbo-go/registry/event"
)

var testName = "test"

wangwx's avatar
wangwx committed
var tc *zk.TestCluster

func prepareData(t *testing.T) *zk.TestCluster {
wangwx's avatar
wangwx committed
	var err error
	tc, err = zk.StartTestCluster(1, nil, nil)
	assert.NoError(t, err)
wangwx's avatar
wangwx committed
	assert.NotNil(t, tc.Servers[0])
	address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)

	config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
		Protocol:  "zookeeper",
		RemoteRef: "test",
	}

	config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{
		Address:    address,
		TimeoutStr: "10s",
	}
wangwx's avatar
wangwx committed
	return tc
}

// TestNewZookeeperServiceDiscovery checks that newZookeeperServiceDiscovery
// returns an error while the configuration it looks up is incomplete.
func TestNewZookeeperServiceDiscovery(t *testing.T) {
	const discoveryName = "zookeeper1"

	// First attempt: the ServiceDiscoveryConfig is expected to be missing.
	_, missingDiscoveryErr := newZookeeperServiceDiscovery(discoveryName)
	assert.NotNil(t, missingDiscoveryErr)

	// Second attempt: the ServiceDiscoveryConfig now exists, but the
	// RemoteConfig it refers to ("mock") is expected to be missing.
	config.GetBaseConfig().ServiceDiscoveries[discoveryName] = &config.ServiceDiscoveryConfig{
		Protocol:  "zookeeper",
		RemoteRef: "mock",
	}
	_, missingRemoteErr := newZookeeperServiceDiscovery(discoveryName)
	assert.NotNil(t, missingRemoteErr)
}

func TestCURDZookeeperServiceDiscovery(t *testing.T) {
wangwx's avatar
wangwx committed
	prepareData(t)
randy's avatar
randy committed
	extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
		return &dispatcher.MockEventDispatcher{}
	})
	extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping {
		return &mockServiceNameMapping{}
	})

	extension.SetProtocol("mock", func() protocol.Protocol {
		return &mockProtocol{}
	})

	sd, err := newZookeeperServiceDiscovery(testName)
	assert.Nil(t, err)
AlexStocks's avatar
AlexStocks committed
	defer func() {
AlexStocks's avatar
AlexStocks committed
		_ = sd.Destroy()
AlexStocks's avatar
AlexStocks committed
	}()
randy's avatar
randy committed
	ins := &registry.DefaultServiceInstance{
AlexStocks's avatar
AlexStocks committed
		ID:          "testID",
		ServiceName: testName,
		Host:        "127.0.0.1",
		Port:        2233,
		Enable:      true,
		Healthy:     true,
randy's avatar
randy committed
		Metadata:    nil,
	}
	ins.Metadata = map[string]string{"t1": "test1", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
	err = sd.Register(ins)

	assert.Nil(t, err)
Patrick's avatar
Patrick committed

	testsPager := sd.GetHealthyInstancesByPage(testName, 0, 1, true)
	assert.Equal(t, 1, testsPager.GetDataSize())
	assert.Equal(t, 1, testsPager.GetTotalPages())
	test := testsPager.GetData()[0].(registry.ServiceInstance)
AlexStocks's avatar
AlexStocks committed
	assert.Equal(t, "127.0.0.1:2233", test.GetID())
Patrick's avatar
Patrick committed
	assert.Equal(t, "test1", test.GetMetadata()["t1"])

randy's avatar
randy committed
	ins = &registry.DefaultServiceInstance{
AlexStocks's avatar
AlexStocks committed
		ID:          "testID",
		ServiceName: testName,
		Host:        "127.0.0.1",
		Port:        2233,
		Enable:      true,
		Healthy:     true,
randy's avatar
randy committed
	}
	ins.Metadata = map[string]string{"t1": "test12", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}

	err = sd.Update(ins)

	assert.Nil(t, err)
Patrick's avatar
Patrick committed

	testsPager = sd.GetInstancesByPage(testName, 0, 1)
	assert.Equal(t, 1, testsPager.GetDataSize())
	test = testsPager.GetData()[0].(registry.ServiceInstance)
	assert.Equal(t, "test12", test.GetMetadata()["t1"])

	testsMap := sd.GetRequestInstances([]string{testName}, 0, 1)
	assert.Equal(t, 1, len(testsMap))
	assert.Equal(t, 1, testsMap[testName].GetDataSize())
	test = testsMap[testName].GetData()[0].(registry.ServiceInstance)
	assert.Equal(t, "test12", test.GetMetadata()["t1"])

	names := sd.GetServices()
	assert.Equal(t, 1, names.Size())
	assert.Equal(t, testName, names.Values()[0])

	err = sd.Unregister(&registry.DefaultServiceInstance{
AlexStocks's avatar
AlexStocks committed
		ID:          "testID",
		ServiceName: testName,
		Host:        "127.0.0.1",
		Port:        2233,
		Enable:      true,
		Healthy:     true,
		Metadata:    nil,
	})
	assert.Nil(t, err)
}
Patrick's avatar
Patrick committed

func TestAddListenerZookeeperServiceDiscovery(t *testing.T) {
AlexStocks's avatar
AlexStocks committed
	defer func() {
wangwx's avatar
wangwx committed
		_ = tc.Stop()
AlexStocks's avatar
AlexStocks committed
	}()
Patrick's avatar
Patrick committed
	sd, err := newZookeeperServiceDiscovery(testName)
	assert.Nil(t, err)
AlexStocks's avatar
AlexStocks committed
	defer func() {
AlexStocks's avatar
AlexStocks committed
		_ = sd.Destroy()
AlexStocks's avatar
AlexStocks committed
	}()
Patrick's avatar
Patrick committed

randy's avatar
randy committed
	ins := &registry.DefaultServiceInstance{
AlexStocks's avatar
AlexStocks committed
		ID:          "testID",
Patrick's avatar
Patrick committed
		ServiceName: testName,
		Host:        "127.0.0.1",
		Port:        2233,
		Enable:      true,
		Healthy:     true,
		Metadata:    nil,
randy's avatar
randy committed
	}
	ins.Metadata = map[string]string{"t1": "test12", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
	err = sd.Register(ins)

Patrick's avatar
Patrick committed
	assert.Nil(t, err)
	wg := &sync.WaitGroup{}
	wg.Add(1)
	tn := &testNotify{
		wg: wg,
		t:  t,
	}
Patrick's avatar
Patrick committed
	hs := gxset.NewSet()
	hs.Add(testName)
Patrick's avatar
Patrick committed
	sicl := event.NewServiceInstancesChangedListener(hs)
randy's avatar
randy committed
	sicl.AddListenerAndNotify(testName, tn)
Patrick's avatar
Patrick committed
	extension.SetAndInitGlobalDispatcher("direct")
	extension.GetGlobalDispatcher().AddEventListener(sicl)
	err = sd.AddListener(sicl)
AlexStocks's avatar
AlexStocks committed
	assert.NoError(t, err)
Patrick's avatar
Patrick committed

randy's avatar
randy committed
	ins = &registry.DefaultServiceInstance{
AlexStocks's avatar
AlexStocks committed
		ID:          "testID",
Patrick's avatar
Patrick committed
		ServiceName: testName,
		Host:        "127.0.0.1",
		Port:        2233,
		Enable:      true,
		Healthy:     true,
		Metadata:    nil,
randy's avatar
randy committed
	}
	ins.Metadata = map[string]string{"t1": "test12", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
	err = sd.Update(ins)
AlexStocks's avatar
AlexStocks committed
	assert.NoError(t, err)
Patrick's avatar
Patrick committed
	tn.wg.Wait()
}

// testNotify is the listener handed to AddListenerAndNotify in
// TestAddListenerZookeeperServiceDiscovery. It bundles the test handle used
// for assertions with the WaitGroup the test waits on.
type testNotify struct {
	t  *testing.T      // test handle passed to the assertion in Notify
	wg *sync.WaitGroup // marked Done by Notify
}

randy's avatar
randy committed
func (tn *testNotify) Notify(e *registry.ServiceEvent) {
	assert.Equal(tn.t, "2233", e.Service.Port)
Patrick's avatar
Patrick committed
	tn.wg.Done()
}
randy's avatar
randy committed
// NotifyAll ignores both of its arguments and does nothing.
func (tn *testNotify) NotifyAll(_ []*registry.ServiceEvent, _ func()) {}

// mockServiceNameMapping is the stateless mapping.ServiceNameMapping stand-in
// installed through extension.SetGlobalServiceNameMapping by the tests.
type mockServiceNameMapping struct{}

// Map ignores its arguments and always reports success.
func (*mockServiceNameMapping) Map(_, _, _, _ string) error {
	return nil
}

// Get ignores its arguments and returns a set holding only the name taken
// from config.GetApplicationConfig().
func (*mockServiceNameMapping) Get(_, _, _, _ string) (*gxset.HashSet, error) {
	appName := config.GetApplicationConfig().Name
	return gxset.NewSet(appName), nil
}

// mockProtocol is the protocol.Protocol stand-in registered under the name
// "mock" by the tests. Only Refer is implemented; the other methods panic.
type mockProtocol struct{}

// Export is not implemented and panics when called.
func (mockProtocol) Export(_ protocol.Invoker) protocol.Exporter {
	panic("implement me")
}

// Refer ignores the URL and returns a fresh mockInvoker.
func (mockProtocol) Refer(_ *common.URL) protocol.Invoker {
	return new(mockInvoker)
}

// Destroy is not implemented and panics when called.
func (mockProtocol) Destroy() {
	panic("implement me")
}

// mockInvoker is the invoker returned by mockProtocol.Refer. Apart from
// Invoke, its methods are unimplemented and panic when called.
type mockInvoker struct{}

// GetURL is not implemented and panics when called.
func (*mockInvoker) GetURL() *common.URL {
	panic("implement me")
}

// IsAvailable is not implemented and panics when called.
func (*mockInvoker) IsAvailable() bool {
	panic("implement me")
}

// Destroy is not implemented and panics when called.
func (*mockInvoker) Destroy() {
	panic("implement me")
}

// Invoke ignores its arguments and returns an RPCResult whose Rest field is a
// MetadataInfo containing a single ServiceInfo stored under the key "test".
func (*mockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result {
	// for getMetadataInfo and ServiceInstancesChangedListenerImpl onEvent
	metadataInfo := &common.MetadataInfo{
		Services: map[string]*common.ServiceInfo{
			"test": &common.ServiceInfo{ServiceKey: "test", MatchKey: "test"},
		},
	}
	return &protocol.RPCResult{Rest: metadataInfo}
}