Skip to content
Snippets Groups Projects
Commit f40813a9 authored by scott.wang's avatar scott.wang
Browse files

ADD etcdv3 registry for dubbo

parent 98be65cf
No related branches found
No related tags found
No related merge requests found
......@@ -2,20 +2,26 @@ package etcdv3
import (
"context"
"strings"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)
import (
"github.com/juju/errors"
"strings"
)
type dataListener struct {
interestedURL []*common.URL
listener *configurationListener
listener remoting.ConfigurationListener
}
func NewRegistryDataListener(listener *configurationListener) *dataListener {
func NewRegistryDataListener(listener remoting.ConfigurationListener) *dataListener {
return &dataListener{listener: listener, interestedURL: []*common.URL{}}
}
......@@ -28,13 +34,13 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
serviceURL, err := common.NewURL(context.TODO(), url)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&remoting.ConfigChangeEvent{Key: eventType.Path,Value: serviceURL, ConfigType: eventType.Action})
l.listener.Process(&remoting.ConfigChangeEvent{Key: eventType.Path, Value: serviceURL, ConfigType: eventType.Action})
return true
}
}
......@@ -48,6 +54,8 @@ type configurationListener struct {
}
func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
// add a new waiter
reg.wg.Add(1)
return &configurationListener{registry: reg, events: make(chan *remoting.ConfigChangeEvent, 32)}
}
func (l *configurationListener) Process(configType *remoting.ConfigChangeEvent) {
......@@ -71,9 +79,6 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
continue
}
//r.update(e.res)
//write to invoker
//r.outerEventCh <- e.res
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
......
package etcdv3
import (
"context"
"testing"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/remoting"
)
func Test_DataChange(t *testing.T) {
listener := NewRegistryDataListener(&MockDataListener{})
url, _ := common.NewURL(context.TODO(), "jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
listener.AddInterestedURL(&url)
if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) {
t.Fatal("data change not ok")
}
}
type MockDataListener struct {
}
func (*MockDataListener) Process(configType *remoting.ConfigChangeEvent) {
}
......@@ -9,7 +9,9 @@ import (
"strings"
"sync"
"time"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
......@@ -18,6 +20,9 @@ import (
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/etcdv3"
"github.com/apache/dubbo-go/version"
)
import (
"github.com/juju/errors"
perrors "github.com/pkg/errors"
)
......@@ -46,7 +51,7 @@ type etcdV3Registry struct {
dataListener *dataListener
configListener *configurationListener
wg sync.WaitGroup // wg+done for zk restart
wg sync.WaitGroup // wg+done for etcd client restart
done chan struct{}
}
......@@ -76,7 +81,7 @@ func (r *etcdV3Registry) RestartCallBack() bool {
for _, confIf := range services {
err := r.Register(confIf)
if err != nil {
logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
......@@ -99,13 +104,17 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
logger.Infof("time-out is: %v", timeout.String())
r := &etcdV3Registry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
services: make(map[string]common.URL),
}
if err := etcdv3.ValidateClient(r, etcdv3.WithName(etcdv3.RegistryETCDV3Client)); err != nil {
if err := etcdv3.ValidateClient(r,
etcdv3.WithName(etcdv3.RegistryETCDV3Client),
etcdv3.WithTimeout(timeout),
etcdv3.WithEndpoints(url.Location),
); err != nil {
return nil, err
}
......@@ -135,9 +144,6 @@ func (r *etcdV3Registry) IsAvailable() bool {
func (r *etcdV3Registry) Destroy() {
logger.Warn("destory be call")
if r.configListener != nil {
r.configListener.Close()
}
......@@ -212,10 +218,10 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
logger.Errorf("etcd client create path %s: %v", consumersNode, err)
return errors.Annotate(err, "etcd create consumer nodes")
}
//providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
//if err := r.createDirIfNotExist(providersNode); err != nil {
// return errors.Annotate(err, "create provider node")
//}
providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
if err := r.createDirIfNotExist(providersNode); err != nil {
return errors.Annotate(err, "create provider node")
}
params := url.Values{}
......@@ -300,7 +306,7 @@ func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) {
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("zk connection broken")
return nil, perrors.New("etcd client broken")
}
// new client & listener
......@@ -313,7 +319,6 @@ func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) {
//注册到dataconfig的interested
r.dataListener.AddInterestedURL(&svc)
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", svc.Service()), r.dataListener)
return configListener, nil
......
package etcdv3
import (
"context"
"strconv"
"testing"
"time"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
import (
"github.com/stretchr/testify/assert"
)
func initRegistry(t *testing.T) *etcdV3Registry {
regurl, err := common.NewURL(context.TODO(), "registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
if err != nil {
t.Fatal(err)
}
reg, err := newETCDV3Registry(&regurl)
if err != nil {
t.Fatal(err)
}
return reg.(*etcdV3Registry)
}
func Test_Register(t *testing.T) {
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg := initRegistry(t)
err := reg.Register(url)
children, _, err := reg.client.GetChildrenKVList("/dubbo/com.ikurento.user.UserProvider/providers")
if err != nil {
t.Fatal(err)
}
assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children)
assert.NoError(t, err)
}
func Test_Subscribe(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg := initRegistry(t)
//provider register
err := reg.Register(url)
if err != nil {
t.Fatal(err)
}
//consumer register
regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
reg2 := initRegistry(t)
reg2.Register(url)
listener, err := reg2.Subscribe(url)
if err != nil {
t.Fatal(err)
}
serviceEvent, err := listener.Next()
if err != nil {
t.Fatal(err)
}
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
}
func Test_ConsumerDestory(t *testing.T) {
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg := initRegistry(t)
_, err := reg.Subscribe(url)
if err != nil {
t.Fatal(err)
}
//listener.Close()
time.Sleep(1e9)
reg.Destroy()
assert.Equal(t, false, reg.IsAvailable())
}
func Test_ProviderDestory(t *testing.T) {
reg := initRegistry(t)
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg.Register(url)
//listener.Close()
time.Sleep(1e9)
reg.Destroy()
assert.Equal(t, false, reg.IsAvailable())
}
......@@ -5,8 +5,11 @@ import (
"path"
"sync"
"time"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
import (
"github.com/coreos/etcd/clientv3"
"github.com/juju/errors"
"go.etcd.io/etcd/clientv3/concurrency"
......@@ -198,13 +201,14 @@ func (c *Client) maintenanceStatus() error {
return errors.Annotate(err, "new session with server")
}
// must add wg before go maintenance status goroutine
c.Wait.Add(1)
go c.maintenanceStatusLoop(s)
return nil
}
func (c *Client) maintenanceStatusLoop(s *concurrency.Session) {
c.Wait.Add(1)
defer func() {
c.Wait.Done()
logger.Infof("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name)
......@@ -428,6 +432,16 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
return kList, vList, nil
}
func (c *Client) Get(k string) (string, error) {
v, err := c.get(k)
if err != nil {
return "", errors.Annotatef(err, "get key value (key %s)", k)
}
return v, nil
}
func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
wc, err := c.watch(k)
......
package etcdv3
import (
"path"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/juju/errors"
"google.golang.org/grpc/connectivity"
)
// etcd connect config
var (
name = "test"
timeout = time.Second
heartbeat = 1
endpoints = []string{"localhost:2379"}
)
// tests dataset
var tests = []struct {
input struct {
k string
v string
}
}{
{input: struct {
k string
v string
}{k: "name", v: "scott.wang"}},
{input: struct {
k string
v string
}{k: "namePrefix", v: "prefix.scott.wang"}},
{input: struct {
k string
v string
}{k: "namePrefix1", v: "prefix1.scott.wang"}},
{input: struct {
k string
v string
}{k: "age", v: "27"}},
}
// test dataset prefix
const prefix = "name"
func initClient(t *testing.T) *Client {
c, err := newClient(name, endpoints, timeout, heartbeat)
if err != nil {
t.Fatal(err)
}
return c
}
func Test_newClient(t *testing.T) {
c := initClient(t)
defer c.Close()
if c.rawClient.ActiveConnection().GetState() != connectivity.Ready {
t.Fatal(c.rawClient.ActiveConnection().GetState())
}
}
func TestClient_Close(t *testing.T) {
c := initClient(t)
c.Close()
}
func TestClient_Create(t *testing.T) {
tests := tests
c := initClient(t)
c.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
expect := tc.input.v
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
value, err := c.Get(k)
if err != nil {
t.Fatal(err)
}
if value != expect {
t.Fatalf("expect %v but get %v", expect, value)
}
}
}
func TestClient_Delete(t *testing.T) {
tests := tests
c := initClient(t)
c.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
expect := ErrKVPairNotFound
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
if err := c.Delete(k); err != nil {
t.Fatal(err)
}
_, err := c.Get(k)
if errors.Cause(err) == expect {
continue
}
if err != nil {
t.Fatal(err)
}
}
}
func TestClient_GetChildrenKVList(t *testing.T) {
tests := tests
c := initClient(t)
defer c.Close()
var expectKList []string
var expectVList []string
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if strings.Contains(k, prefix) {
expectKList = append(expectKList, k)
expectVList = append(expectVList, v)
}
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
}
kList, vList, err := c.GetChildrenKVList(prefix)
if err != nil {
t.Fatal(err)
}
if reflect.DeepEqual(expectKList, kList) && reflect.DeepEqual(expectVList, vList) {
return
}
t.Fatalf("expect keylist %v but got %v expect valueList %v but got %v ", expectKList, kList, expectVList, vList)
}
func TestClient_Watch(t *testing.T) {
tests := tests
c := initClient(t)
defer c.Close()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
wc, err := c.watch(prefix)
if err != nil {
t.Fatal(err)
}
for e := range wc {
if e.Err() != nil {
t.Fatal(err)
}
for _, event := range e.Events {
t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value)
}
}
}()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
if err := c.delete(k); err != nil {
t.Fatal(err)
}
}
c.Close()
wg.Wait()
}
func TestClient_RegisterTemp(t *testing.T) {
c := initClient(t)
observeC := initClient(t)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
completePath := path.Join("scott", "wang")
wc, err := observeC.watch(completePath)
if err != nil {
t.Fatal(err)
}
for e := range wc {
for _, event := range e.Events {
if e.Err() != nil {
t.Fatal(e.Err())
}
if event.Type == mvccpb.DELETE {
t.Logf("complete key (%s) is delete", completePath)
wg.Done()
observeC.Close()
return
}
t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value)
}
}
}()
_, err := c.RegisterTemp("scott", "wang")
if err != nil {
t.Fatal(err)
}
time.Sleep(2 * time.Second)
c.Close()
wg.Wait()
}
func TestClient_Valid(t *testing.T) {
c := initClient(t)
if c.Valid() != true {
t.Fatal("client is not valid")
}
c.Close()
if c.Valid() != false {
t.Fatal("client is valid")
}
}
func TestClient_Done(t *testing.T) {
c := initClient(t)
go func() {
time.Sleep(2 * time.Second)
c.Close()
}()
c.Wait.Wait()
}
......@@ -2,17 +2,20 @@ package etcdv3
import (
"sync"
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
"time"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
type clientFacade interface {
Client() *Client
SetClient(*Client)
......@@ -40,11 +43,10 @@ LOOP:
// re-register all services
case <-r.Client().Done():
r.ClientLock().Lock()
clientName := RegistryETCDV3Client
timeout, _ := time.ParseDuration(r.GetUrl().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
endpoint := r.GetUrl().Location
r.Client().Close()
clientName := r.Client().name
endpoints := r.Client().endpoints
timeout := r.Client().timeout
heartbeat := r.Client().heartbeat
r.SetClient(nil)
r.ClientLock().Unlock()
......@@ -59,12 +61,11 @@ LOOP:
}
err = ValidateClient(r,
WithName(clientName),
WithEndpoints(endpoints...),
WithEndpoints(endpoint),
WithTimeout(timeout),
WithHeartbeat(heartbeat),
)
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
endpoints, perrors.WithStack(err))
endpoint, perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
......
package etcdv3
import (
"context"
"sync"
"testing"
"time"
"github.com/apache/dubbo-go/common"
)
type mockFacade struct {
client *Client
cltLock sync.Mutex
wg sync.WaitGroup
URL *common.URL
done chan struct{}
}
func (r *mockFacade) Client() *Client {
return r.client
}
func (r *mockFacade) SetClient(client *Client) {
r.client = client
}
func (r *mockFacade) ClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *mockFacade) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *mockFacade) GetDone() chan struct{} {
return r.done
}
func (r *mockFacade) GetUrl() common.URL {
return *r.URL
}
func (r *mockFacade) Destroy() {
close(r.done)
r.wg.Wait()
}
func (r *mockFacade) RestartCallBack() bool {
return true
}
func (r *mockFacade) IsAvailable() bool {
return true
}
func Test_Fascade(t *testing.T) {
c := initClient(t)
defer c.Close()
url, err := common.NewURL(context.Background(), "mock://127.0.0.1")
if err != nil {
t.Fatal(err)
}
mock := &mockFacade{client: c, URL: &url}
go HandleClientRestart(mock)
time.Sleep(2 * time.Second)
}
......@@ -3,18 +3,17 @@ package etcdv3
import (
"sync"
"time"
"github.com/coreos/etcd/clientv3"
)
import (
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/juju/errors"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/juju/errors"
)
type EventListener struct {
......@@ -187,7 +186,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)
for i, k := range keyList {
logger.Warnf("get children list key -> %s", k)
logger.Infof("got children list key -> %s", k)
if !listener.DataChange(remoting.Event{
Path: k,
Action: remoting.EventTypeAdd,
......
package etcdv3
import (
"testing"
"github.com/apache/dubbo-go/remoting"
"github.com/stretchr/testify/assert"
)
var changedData = `
dubbo.consumer.request_timeout=3s
dubbo.consumer.connect_timeout=5s
dubbo.application.organization=ikurento.com
dubbo.application.name=BDTService
dubbo.application.module=dubbogo user-info server
dubbo.application.version=0.0.1
dubbo.application.owner=ZX
dubbo.application.environment=dev
dubbo.registries.hangzhouzk.protocol=zookeeper
dubbo.registries.hangzhouzk.timeout=3s
dubbo.registries.hangzhouzk.address=127.0.0.1:2181
dubbo.registries.shanghaizk.protocol=zookeeper
dubbo.registries.shanghaizk.timeout=3s
dubbo.registries.shanghaizk.address=127.0.0.1:2182
dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo
dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider
dubbo.service.com.ikurento.user.UserProvider.loadbalance=random
dubbo.service.com.ikurento.user.UserProvider.warmup=100
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`
func TestListener(t *testing.T) {
var tests = []struct{
input struct{
k string
v string
}
}{
{input: struct {
k string
v string
}{k: "/dubbo", v: changedData}},
}
c := initClient(t)
defer c.Close()
listener := NewEventListener(c)
dataListener := &mockDataListener{client: c, changedData: changedData}
listener.ListenServiceEvent("/dubbo", dataListener)
for _, tc := range tests{
k := tc.input.k
v := tc.input.v
if err := c.Create(k, v); err != nil{
t.Fatal(err)
}
}
assert.Equal(t, changedData, dataListener.eventList[0].Content)
}
type mockDataListener struct {
eventList []remoting.Event
client *Client
changedData string
}
func (m *mockDataListener) DataChange(eventType remoting.Event) bool {
m.eventList = append(m.eventList, eventType)
if eventType.Content == m.changedData {
//m.client.Close()
}
return true
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment