Skip to content
Snippets Groups Projects
Commit bc81b791 authored by scott.wang's avatar scott.wang
Browse files

ADD etcdv3 basic complete

parent 5bfbc309
No related branches found
No related tags found
No related merge requests found
......@@ -54,7 +54,7 @@ func (l *configurationListener) Process(configType *remoting.ConfigChangeEvent)
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.registry.ctx.Done():
case <-l.registry.done:
logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
return nil, errors.New("listener stopped")
......@@ -62,7 +62,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
logger.Debugf("got etcd event %s", e)
if e.ConfigType == remoting.EventTypeDel {
select {
case <-l.registry.ctx.Done():
case <-l.registry.done:
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
......@@ -76,4 +76,5 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}
func (l *configurationListener) Close() {
l.registry.wg.Done()
}
package etcd
import (
"context"
"fmt"
"github.com/apache/dubbo-go/remoting"
"net/url"
"os"
"path"
......@@ -12,16 +10,16 @@ import (
"sync"
"time"
etcd "github.com/AlexStocks/goext/database/etcd"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/etcdv3"
"github.com/apache/dubbo-go/version"
"github.com/juju/errors"
"go.etcd.io/etcd/clientv3"
perrors "github.com/pkg/errors"
)
var (
......@@ -32,23 +30,62 @@ var (
func init() {
processID = fmt.Sprintf("%d", os.Getpid())
localIP, _ = utils.GetLocalIP()
extension.SetRegistry("etcd", newETCDV3Registry)
extension.SetRegistry("etcdv3", newETCDV3Registry)
}
type etcdV3Registry struct {
*common.URL
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
ctx context.Context
cancel context.CancelFunc
cltLock sync.Mutex
client *etcdv3.Client
services map[string]common.URL // service name + protocol -> service config
rawClient *clientv3.Client
client *etcd.Client
dataListener remoting.DataListener
configListener remoting.ConfigurationListener
listenerLock sync.Mutex
listener *etcdv3.EventListener
dataListener *dataListener
configListener *configurationListener
servicesCache sync.Map // service name + protocol -> service config
wg sync.WaitGroup // wg+done for zk restart
done chan struct{}
}
func (r *etcdV3Registry) Client() *etcdv3.Client {
return r.client
}
func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
r.client = client
}
func (r *etcdV3Registry) ClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *etcdV3Registry) GetDone() chan struct{} {
return r.done
}
func (r *etcdV3Registry) RestartCallBack() bool {
services := []common.URL{}
for _, confIf := range r.services {
services = append(services, confIf)
}
flag := true
for _, confIf := range services {
err := r.Register(confIf)
if err != nil {
logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
return flag
}
func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
......@@ -63,56 +100,24 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
logger.Infof("etcd address is: %v", url.Location)
logger.Infof("time-out is: %v", timeout.String())
rawClient, err := clientv3.New(clientv3.Config{
Endpoints: []string{url.Location},
DialTimeout: timeout,
//DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
return nil, errors.Annotate(err, "block connect to etcd server")
}
rawClient.ActiveConnection()
rootCtx, cancel := context.WithCancel(context.Background())
client, err := etcd.NewClient(rawClient, etcd.WithTTL(time.Second), etcd.WithContext(rootCtx))
if err != nil {
return nil, errors.Annotate(err, "new etcd client")
r := &etcdV3Registry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
}
r := etcdV3Registry{
URL: url,
ctx: rootCtx,
cancel: cancel,
rawClient: rawClient,
client: client,
servicesCache: sync.Map{},
if err := etcdv3.ValidateClient(r, etcdv3.WithName(etcdv3.RegistryETCDV3Client)); err != nil {
return nil, err
}
go r.keepAlive()
return &r, nil
}
r.wg.Add(1)
go etcdv3.HandleClientRestart(r)
func (r *etcdV3Registry) keepAlive() error {
r.listener = etcdv3.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
resp, err := r.client.KeepAlive()
if err != nil {
return errors.Annotate(err, "keep alive")
}
go func() {
for {
select {
case _, ok := <-resp:
if !ok {
logger.Errorf("etcd server stop")
r.cancel()
return
}
}
}
}()
return nil
return r, nil
}
func (r *etcdV3Registry) GetUrl() common.URL {
......@@ -122,7 +127,7 @@ func (r *etcdV3Registry) GetUrl() common.URL {
func (r *etcdV3Registry) IsAvailable() bool {
select {
case <-r.ctx.Done():
case <-r.done:
return false
default:
return true
......@@ -130,20 +135,21 @@ func (r *etcdV3Registry) IsAvailable() bool {
}
func (r *etcdV3Registry) Destroy() {
if r.configListener != nil {
r.configListener.Close()
}
r.stop()
}
func (r *etcdV3Registry) stop() {
// close current client
r.rawClient.Close()
close(r.done)
// cancel ctx
r.cancel()
// close current client
r.client.Close()
r.rawClient = nil
r.ctx = nil
r.cancel = nil
r.client = nil
r.servicesCache.Range(func(key, value interface{}) bool {
r.servicesCache.Delete(key)
return true
......@@ -180,24 +186,12 @@ func (r *etcdV3Registry) Register(svc common.URL) error {
return nil
}
func (r *etcdV3Registry) createKVIfNotExist(k string, v string) error {
_, err := r.rawClient.Txn(r.ctx).
If(clientv3.Compare(clientv3.Version(k), "<", 1)).
Then(clientv3.OpPut(k, v)).
Commit()
if err != nil {
return errors.Annotatef(err, "etcd create k %s v %s", k, v)
}
return nil
}
func (r *etcdV3Registry) createDirIfNotExist(k string) error {
var tmpPath string
for _, str := range strings.Split(k, "/")[1:] {
tmpPath = path.Join(tmpPath, "/", str)
if err := r.createKVIfNotExist(tmpPath, ""); err != nil {
if err := r.client.Create(tmpPath, ""); err != nil {
return errors.Annotatef(err, "create path %s in etcd", tmpPath)
}
}
......@@ -226,7 +220,7 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode()))
dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String())
if err := r.createKVIfNotExist(path.Join(dubboPath, encodedURL), ""); err != nil {
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
}
......@@ -279,7 +273,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error {
encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode()))
dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String())
if err := r.createKVIfNotExist(path.Join(dubboPath, encodedURL), ""); err != nil {
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
}
......@@ -288,8 +282,33 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error {
func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) {
var (
configListener *configurationListener
)
r.listenerLock.Lock()
configListener = r.configListener
r.listenerLock.Unlock()
if r.listener == nil {
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("zk connection broken")
}
// new client & listener
listener := etcdv3.NewEventListener(r.client)
r.listenerLock.Lock()
r.listener = listener
r.listenerLock.Unlock()
}
//注册到dataconfig的interested
r.dataListener.AddInterestedURL(&svc)
logger.Infof("subscribe svc: %s", svc)
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", svc.Service()), r.dataListener)
return nil, nil
return configListener, nil
}
......@@ -2,13 +2,13 @@ package etcdv3
import (
"context"
"fmt"
"path"
"sync"
"time"
)
import (
"github.com/AlexStocks/goext/database/etcd"
"github.com/juju/errors"
perrors "github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
......@@ -24,10 +24,11 @@ import (
const (
ConnDelay = 3
MaxFailTimes = 15
RegistryETCDV3Client = "etcd registry"
)
var (
ErrNilETCDV3ClientConn = errors.New("etcdv3client{conn} is nil")
ErrNilETCDV3ClientConn = errors.New("etcd clientset {conn} is nil") // full describe the ERR
ErrKVPairNotFound = errors.New("k/v pair not found")
)
......@@ -36,7 +37,7 @@ type clientSet struct {
lock sync.RWMutex // protect all element in
// clientSet
gxClient *gxetcd.Client
//gxClient *gxetcd.Client
rawClient *clientv3.Client
// client controller used to change client behave
......@@ -60,17 +61,17 @@ func newClientSet(endpoints []string, timeout time.Duration, c *Client) error {
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
return errors.Annotate(err, "block connect to etcd server")
return errors.Annotate(err, "new raw client block connect to server")
}
// share context
gxClient, err := gxetcd.NewClient(client, gxetcd.WithTTL(time.Second), gxetcd.WithContext(rootCtx))
if err != nil {
return errors.Annotate(err, "new etcd client")
}
//gxClient, err := gxetcd.NewClient(client, gxetcd.WithTTL(time.Second), gxetcd.WithContext(rootCtx))
//if err != nil {
// return errors.Annotate(err, "new gxetcd client")
//}
out := &clientSet{
gxClient: gxClient,
//gxClient: gxClient,
rawClient: client,
ctx: rootCtx,
cancel: cancel,
......@@ -92,12 +93,20 @@ func newClientSet(endpoints []string, timeout time.Duration, c *Client) error {
func (c *clientSet) maintenanceStatus() error {
c.c.Wait.Add(1)
aliveResp, err := c.gxClient.KeepAlive()
lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds()))
if err != nil {
return errors.Annotatef(err, "etcd keep alive")
return errors.Annotatef(err, "grant lease")
}
keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID)
if err != nil || keepAlive == nil {
c.rawClient.Revoke(c.ctx, lease.ID)
return errors.Annotate(err, "keep alive lease")
}
// start maintenance the connection status
go c.maintenanceStatusLoop(aliveResp)
go c.maintenanceStatusLoop(keepAlive)
return nil
}
......@@ -105,7 +114,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl
defer func() {
c.c.Wait.Done()
logger.Infof("etcd {path:%v, name:%s} connection goroutine game over.", c.c.endpoints, c.c.name)
logger.Infof("etcdv3 clientset {endpoints:%v, name:%s} connection goroutine game over.", c.c.endpoints, c.c.name)
}()
// get signal, will start maintenanceStatusLoop
......@@ -118,7 +127,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl
return
case <-c.ctx.Done():
// client context exit
logger.Warn("etcd clientSet context done")
logger.Warn("etcdv3 clientset context done")
return
case msg, ok := <-aliveResp:
// etcd connection lose
......@@ -126,7 +135,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl
// if clientSet.Client is nil, it will panic
if !ok {
logger.Warnf("etcd server stop at term: %#v", msg)
logger.Warnf("etcdv3 server stop at term: %#v", msg)
c.c.Lock() // hold the c.Client lock
c.c.cs.clean()
......@@ -213,10 +222,25 @@ func (c *clientSet) getChildrenW(k string) ([]string, []string, clientv3.WatchCh
return nil, nil, nil, ErrNilETCDV3ClientConn
}
wc := c.rawClient.Watch(c.ctx, k, clientv3.WithPrefix())
wc,err := c.watchWithPrefix(k)
if err != nil{
return nil, nil, nil,errors.Annotate(err, "watch with prefix")
}
return kList, vList, wc, nil
}
func (c *clientSet) watchWithPrefix(prefix string) (clientv3.WatchChan, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return nil, ErrNilETCDV3ClientConn
}
return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil
}
func (c *clientSet) watch(k string) (clientv3.WatchChan, error) {
c.lock.RLock()
......@@ -228,7 +252,7 @@ func (c *clientSet) watch(k string) (clientv3.WatchChan, error) {
_, err := c.get(k)
if err != nil {
return nil, errors.Annotatef(err, "watch pre check key %s", k)
return nil, errors.Annotatef(err, "pre check key %s", k)
}
return c.rawClient.Watch(c.ctx, k), nil
......@@ -296,11 +320,10 @@ func (c *clientSet) keepAliveKV(k string, v string) error {
// this method will hold clientset lock
func (c *clientSet) clean() {
c.lock.Lock()
if c.gxClient != nil {
if c.rawClient != nil {
// close gx client, it will close raw etcdv3 client
c.gxClient.Close()
c.gxClient = nil
// close raw etcdv3 client
c.rawClient.Close()
c.rawClient = nil
// cancel all context
......@@ -373,8 +396,6 @@ func ValidateClient(container clientFacade, opts ...Option) error {
opt(options)
}
err = nil
lock := container.ClientLock()
url := container.GetUrl()
......@@ -388,13 +409,13 @@ func ValidateClient(container clientFacade, opts ...Option) error {
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location)
return errors.Annotate(err, "timeout parse")
}
newClient, err := newClient(options.name, []string{url.Location}, timeout)
if err != nil {
logger.Warnf("newETCDV3Client(name{%s}, etcd addresss{%v}, timeout{%d}) = error{%v}",
logger.Warnf("new client (name{%s}, etcd addresss{%v}, timeout{%d}) = error{%v}",
options.name, url.Location, timeout.String(), err)
return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location)
return errors.Annotatef(err, "new client (address:%+v)", url.Location)
}
container.SetClient(newClient)
}
......@@ -403,12 +424,12 @@ func ValidateClient(container clientFacade, opts ...Option) error {
err = newClientSet(container.Client().endpoints, container.Client().timeout, container.Client())
if err != nil {
return errors.Annotate(err, "new client set")
return errors.Annotate(err, "new clientset")
}
container.Client().cs.startMaintenanceChan <- struct{}{}
}
return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.PrimitiveURL)
return nil
}
func newClient(name string, endpoints []string, timeout time.Duration) (*Client, error) {
......@@ -427,7 +448,7 @@ func newClient(name string, endpoints []string, timeout time.Duration) (*Client,
err = newClientSet(endpoints, timeout, out)
if err != nil {
return nil, errors.Annotate(err, "new client set")
return nil, errors.Annotate(err, "new clientset")
}
// start maintenanceChan
......@@ -445,19 +466,25 @@ func (c *Client) stop() bool {
return false
}
func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) {
func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) error {
if key == "" || wc == nil {
return
return errors.New(fmt.Sprintf("key is %s, wc is %v", key, wc))
}
wcc, err := c.cs.watch(key)
if err != nil {
return errors.Annotatef(err, "clientset watch %s", key)
}
c.Lock()
a := c.eventRegistry[key]
a = append(a, wc)
c.eventRegistry[key] = a
c.Unlock()
go func() {
wcc := c.cs.rawClient.Watch(c.cs.ctx, key)
for msg := range wcc {
wc <- msg
}
......@@ -465,8 +492,8 @@ func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) {
close(wc)
}()
logger.Debugf("etcdClient{%s} register event{path:%s, ptr:%p}", c.name, key, wc)
c.Unlock()
logger.Debugf("etcdv3 client{%s} register event{key:%s, ptr:%p}", c.name, key, wc)
return nil
}
func (c *Client) UnregisterEvent(key string, event chan clientv3.WatchResponse) {
......@@ -485,10 +512,10 @@ func (c *Client) UnregisterEvent(key string, event chan clientv3.WatchResponse)
if e == event {
arr := infoList
infoList = append(arr[:i], arr[i+1:]...)
logger.Debugf("etcdClient{%s} unregister event{path:%s, event:%p}", c.name, key, event)
logger.Debugf("etcdv3 client{%s} unregister event{key:%s, event:%p}", c.name, key, event)
}
}
logger.Debugf("after etcdClient{%s} unregister event{path:%s, event:%p}, array length %d",
logger.Debugf("after etcdv3 client{%s} unregister event{key:%s, event:%p}, array length %d",
c.name, key, event, len(infoList))
if len(infoList) == 0 {
delete(c.eventRegistry, key)
......@@ -531,7 +558,7 @@ func (c *Client) Close() {
c.cs = nil
}
c.Unlock()
logger.Warnf("etcd client{name:%s, etcd addr:%s} exit now.", c.name, c.endpoints)
logger.Warnf("etcdv3 client{name:%s, etcdv3 addr:%s} exit now.", c.name, c.endpoints)
}
func (c *Client) Create(k string, v string) error {
......@@ -543,7 +570,7 @@ func (c *Client) Create(k string, v string) error {
err = c.cs.put(k, v)
}
c.Unlock()
return errors.Annotatef(err, "etcd client put key %s value %s", k, v)
return errors.Annotatef(err, "clientset put key %s value %s", k, v)
}
func (c *Client) Delete(key string) error {
......@@ -554,7 +581,7 @@ func (c *Client) Delete(key string) error {
err = c.cs.delete(key)
}
c.Unlock()
return errors.Annotatef(err, "etcd client delete (basePath:%s)", key)
return errors.Annotatef(err, "clientset delete (key:%s)", key)
}
func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
......@@ -566,40 +593,39 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
err = c.cs.keepAliveKV(completePath, "")
}
c.Unlock()
logger.Debugf("etcdClient{%s} create a tmp node:%s\n", c.name, completePath)
logger.Debugf("etcdv3 client{%s} create a tmp node:%s\n", c.name, completePath)
if err != nil {
return "", errors.Annotatef(err, "etcd client create tmp k %s", completePath)
return "", errors.Annotatef(err, "client create tmp key %s", completePath)
}
return completePath, nil
}
func (c *Client) GetChildrenW(path string) ([]string, clientv3.WatchChan, error) {
func (c *Client) WatchChildren(key string) ([]string, []string, clientv3.WatchChan, error) {
var (
children []string
err error
wc clientv3.WatchChan
err error
childrenKeys []string
childrenValues []string
wc clientv3.WatchChan
)
err = ErrNilETCDV3ClientConn
c.Lock()
if c.cs != nil {
children, _, wc, err = c.cs.getChildrenW(path)
childrenKeys, childrenValues, wc, err = c.cs.getChildrenW(key)
}
c.Unlock()
if err != nil {
if errors.Cause(err) == ErrKVPairNotFound {
return nil, nil, errors.Annotatef(err,"path{%s} has none children", path)
}
logger.Errorf("etcdv3.ChildrenW(path{%s}) = error(%v)", path, err)
return nil, nil, errors.Annotatef(err, "etcdv3.ChildrenW(path:%s)", path)
logger.Errorf("etcdv3 client Children(key{%s}) = error(%v)", key, perrors.WithStack(err))
return nil, nil, nil, errors.Annotatef(err, "client ChildrenW(key:%s)", key)
}
return children, wc, nil
return childrenKeys, childrenValues, wc, nil
}
func (c *Client) GetChildren(path string) ([]string, error) {
func (c *Client) GetChildren(key string) ([]string, error) {
var (
err error
children []string
......@@ -608,20 +634,20 @@ func (c *Client) GetChildren(path string) ([]string, error) {
err = ErrNilETCDV3ClientConn
c.Lock()
if c.cs != nil {
children, _, err = c.cs.getChildren(path)
children, _, err = c.cs.getChildren(key)
}
c.Unlock()
if err != nil {
if errors.Cause(err) == ErrKVPairNotFound {
return nil, errors.Annotatef(err,"path{%s} has none children", path)
return nil, errors.Annotatef(err, "key{%s} has none children", key)
}
logger.Errorf("clientv3.Children(path{%s}) = error(%v)", path, perrors.WithStack(err))
return nil, errors.Annotatef(err, "clientv3.Children(path:%s)", path)
logger.Errorf("clientv3.Children(key{%s}) = error(%v)", key, perrors.WithStack(err))
return nil, errors.Annotatef(err, "client GetChildren(key:%s)", key)
}
return children, nil
}
func (c *Client) ExistW(path string) (clientv3.WatchChan, error) {
func (c *Client) WatchExist(key string) (clientv3.WatchChan, error) {
var (
err = ErrNilETCDV3ClientConn
......@@ -630,25 +656,25 @@ func (c *Client) ExistW(path string) (clientv3.WatchChan, error) {
c.Lock()
if c.cs != nil {
_, err = c.cs.watch(path)
out, err = c.cs.watch(key)
}
c.Unlock()
if err != nil {
if errors.Cause(err) == ErrKVPairNotFound {
return nil, errors.Annotatef(err, "path{%s} not exist", path)
return nil, errors.Annotatef(err, "key{%s} not exist", key)
}
return nil, errors.Annotatef(err, "clientv3.ExistW(path:%s)", path)
return nil, errors.Annotatef(err, "client WatchExist(key:%s)", key)
}
return out, nil
}
func (c *Client) GetContent(path string) ([]byte, error) {
func (c *Client) GetContent(key string) ([]byte, error) {
c.Lock()
value, err := c.cs.get(path)
value, err := c.cs.get(key)
if err != nil {
return nil, errors.Annotatef(err, "client set get: %s", path)
return nil, errors.Annotatef(err, "clientset get(key: %s)", key)
}
c.Unlock()
......
package etcdv3
import (
"gx/ipfs/QmZErC2Ay6WuGi96CPg316PwitdwgLo6RxZRqVjJjRj2MR/go-path"
pathlib "path"
"sync"
"time"
"go.etcd.io/etcd/clientv3"
)
import (
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
)
import (
......@@ -20,31 +17,32 @@ import (
)
type EventListener struct {
client *Client
pathMapLock sync.Mutex
pathMap map[string]struct{}
wg sync.WaitGroup
client *Client
keyMapLock sync.Mutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
func NewEventListener(client *Client) *EventListener {
return &EventListener{
client: client,
pathMap: make(map[string]struct{}),
client: client,
keyMap: make(map[string]struct{}),
}
}
func (l *EventListener) SetClient(client *Client) {
l.client = client
}
// this method will return true when spec path deleted,
// Listen on a spec key
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting.DataListener) bool {
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
for {
keyEventCh, err := l.client.ExistW(path)
keyEventCh, err := l.client.WatchExist(key)
if err != nil {
logger.Warnf("existW{key:%s} = error{%v}", path, err)
logger.Warnf("WatchExist{key:%s} = error{%v}", key, err)
return false
}
......@@ -61,25 +59,12 @@ func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting
// etcd event stream
case e := <-keyEventCh:
if e.Err() != nil{
logger.Warnf("get a etcd event {err: %s}", e.Err())
if e.Err() != nil {
logger.Errorf("get a etcdv3 event {err: %s}", e.Err())
continue
}
for _, event := range e.Events{
logger.Warnf("get a etcd Event{type:%s, path:%s,}",
event.Type.String(), event.Kv.Key )
switch event.Type {
case mvccpb.PUT:
if len(listener) > 0 {
if event.IsCreate(){
logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key)
listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EventTypeAdd, Content: string(event.Kv.Value)})
}else{
logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key)
listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EvnetTypeUpdate, Content: string(event.Kv.Value)})
}
}
case mvccpb.DELETE:
logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key)
for _, event := range e.Events {
if l.handleEvents(event, listener...) {
return true
}
}
......@@ -89,285 +74,120 @@ func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting
return false
}
func (l *EventListener) handleNodeEvent(path string, children []string, listener remoting.DataListener) {
contains := func(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool {
logger.Warnf("get a etcdv3 Event {type: %s, key: %s}", event.Type, event.Kv.Key)
switch event.Type {
// the etcdv3 event just include PUT && DELETE
case mvccpb.PUT:
for _, listener := range listeners {
switch event.IsCreate() {
case true:
logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EventTypeAdd,
Content: string(event.Kv.Value),
})
case false:
logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EvnetTypeUpdate,
Content: string(event.Kv.Value),
})
}
}
return false
case mvccpb.DELETE:
logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key)
return true
}
newChildren, err := l.client.GetChildren(path)
if err != nil {
logger.Errorf("path{%s} child nodes changed, etcdV3.Children() = error{%v}", path, perrors.WithStack(err))
return
}
// a node was added -- listen the new node
var (
newNode string
)
for _, n := range newChildren {
if contains(children, n) {
continue
}
newNode = pathlib.Join(path, n)
logger.Infof("add zkNode{%s}", newNode)
content, _, err := l.client.Conn.Get(newNode)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
// listen l service node
go func(node, childNode string) {
logger.Infof("delete zkNode{%s}", node)
if l.ListenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", childNode)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode, n)
}
// old node was deleted
var oldNode string
for _, n := range children {
if contains(newChildren, n) {
continue
}
oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)
if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
panic("unreachable")
}
func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) {
// Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
var (
failTimes int
event chan struct{}
zkEvent zk.Event
)
event = make(chan struct{}, 4)
defer close(event)
for {
// get current children for a zkPath
children, childEventCh, err := l.client.GetChildrenW(zkPath)
_, _, wc, err := l.client.WatchChildren(prefix)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
// clear the event channel
CLEAR:
for {
select {
case <-event:
default:
break CLEAR
}
}
l.client.RegisterEvent(zkPath, &event)
select {
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
continue
case <-l.client.Done():
l.client.UnregisterEvent(zkPath, &event)
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
return
case <-event:
logger.Infof("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, listener)
continue
}
logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
}
failTimes = 0
for _, c := range children {
// listen l service node
dubboPath := path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
logger.Infof("Get children!{%s}", dubboPath)
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)
//liten sub path recursive
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
}
select {
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
continue
}
l.handleZkNodeEvent(zkEvent.Path, children, listener)
// client watch ctx stop
// server stopped
case <-l.client.cs.ctx.Done():
logger.Warn("etcd listener service node with prefix etcd server stopped")
return
// client stopped
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
logger.Warn("etcdv3 client stopped")
return
// etcd event stream
case e := <-wc:
if e.Err() != nil {
logger.Errorf("get a etcdv3 event {err: %s}", e.Err())
continue
}
for _, event := range e.Events {
l.handleEvents(event, listener...)
}
}
}
}
//
//func (l *ZkEventListener) listenFileEvent(zkPath string, listener remoting.DataListener) {
// l.wg.EventTypeAdd(1)
// defer l.wg.Done()
//
// var (
// failTimes int
// event chan struct{}
// zkEvent zk.Event
// )
// event = make(chan struct{}, 4)
// defer close(event)
// for {
// // get current children for a zkPath
// content,_, eventCh, err := l.client.Conn.GetW(zkPath)
// if err != nil {
// failTimes++
// if MaxFailTimes <= failTimes {
// failTimes = MaxFailTimes
// }
// logger.Errorf("listenFileEvent(path{%s}) = error{%v}", zkPath, err)
// // clear the event channel
// CLEAR:
// for {
// select {
// case <-event:
// default:
// break CLEAR
// }
// }
// l.client.RegisterEvent(zkPath, &event)
// select {
// case <-time.After(timeSecondDuration(failTimes * ConnDelay)):
// l.client.UnregisterEvent(zkPath, &event)
// continue
// case <-l.client.Done():
// l.client.UnregisterEvent(zkPath, &event)
// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
// return
// case <-event:
// logger.Infof("get zk.EventNodeDataChange notify event")
// l.client.UnregisterEvent(zkPath, &event)
// l.handleZkNodeEvent(zkPath, nil, listener)
// continue
// }
// }
// failTimes = 0
//
// select {
// case zkEvent = <-eventCh:
// logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
// zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
//
// l.handleZkNodeEvent(zkEvent.Path, children, listener)
// case <-l.client.Done():
// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
// return
// }
// }
//}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
var (
err error
dubboPath string
children []string
)
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
l.pathMapLock.Lock()
_, ok := l.pathMap[zkPath]
l.pathMapLock.Unlock()
l.keyMapLock.Lock()
_, ok := l.keyMap[key]
l.keyMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", zkPath)
logger.Warnf("etcdv3 key %s has already been listened.", key)
return
}
l.pathMapLock.Lock()
l.pathMap[zkPath] = struct{}{}
l.pathMapLock.Unlock()
l.keyMapLock.Lock()
l.keyMap[key] = struct{}{}
l.keyMapLock.Unlock()
logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
children, err = l.client.GetChildren(zkPath)
if err != nil {
children = nil
logger.Warnf("fail to get children of zk path{%s}", zkPath)
}
for _, c := range children {
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
}(key, listener)
// listen l service node
dubboPath = path.Join(zkPath, c)
content, _, err := l.client.Conn.Get(dubboPath)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
}
if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
logger.Infof("listen dubbo service key{%s}", key)
go func(key string) {
if l.ListenServiceNodeEvent(key) {
listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
}
logger.Infof("listen dubbo service key{%s}", dubboPath)
go func(zkPath string) {
if l.ListenServiceNodeEvent(dubboPath) {
listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath)
}
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key)
}(key)
}
func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
func (l *EventListener) valid() bool {
return l.client.Valid()
}
func (l *ZkEventListener) Close() {
func (l *EventListener) Close() {
l.wg.Wait()
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment