From bc81b791fe7dc59191bd801f3c0185f3e0cd6b51 Mon Sep 17 00:00:00 2001 From: "scott.wang" <sxllwx@github.com> Date: Wed, 24 Jul 2019 16:47:12 +0800 Subject: [PATCH] ADD etcdv3 basic complete --- registry/etcd/listener.go | 5 +- registry/etcd/registry.go | 181 ++++++++++-------- remoting/etcdv3/client.go | 154 ++++++++------- remoting/etcdv3/listener.go | 368 +++++++++--------------------------- 4 files changed, 287 insertions(+), 421 deletions(-) diff --git a/registry/etcd/listener.go b/registry/etcd/listener.go index a58612da2..c0a367b1c 100644 --- a/registry/etcd/listener.go +++ b/registry/etcd/listener.go @@ -54,7 +54,7 @@ func (l *configurationListener) Process(configType *remoting.ConfigChangeEvent) func (l *configurationListener) Next() (*registry.ServiceEvent, error) { for { select { - case <-l.registry.ctx.Done(): + case <-l.registry.done: logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.") return nil, errors.New("listener stopped") @@ -62,7 +62,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { logger.Debugf("got etcd event %s", e) if e.ConfigType == remoting.EventTypeDel { select { - case <-l.registry.ctx.Done(): + case <-l.registry.done: logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) default: } @@ -76,4 +76,5 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { } } func (l *configurationListener) Close() { + l.registry.wg.Done() } diff --git a/registry/etcd/registry.go b/registry/etcd/registry.go index 677fab8bd..b1508449b 100644 --- a/registry/etcd/registry.go +++ b/registry/etcd/registry.go @@ -1,9 +1,7 @@ package etcd import ( - "context" "fmt" - "github.com/apache/dubbo-go/remoting" "net/url" "os" "path" @@ -12,16 +10,16 @@ import ( "sync" "time" - etcd "github.com/AlexStocks/goext/database/etcd" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/utils" "github.com/apache/dubbo-go/registry" + "github.com/apache/dubbo-go/remoting/etcdv3" "github.com/apache/dubbo-go/version" "github.com/juju/errors" - "go.etcd.io/etcd/clientv3" + perrors "github.com/pkg/errors" ) var ( @@ -32,23 +30,62 @@ var ( func init() { processID = fmt.Sprintf("%d", os.Getpid()) localIP, _ = utils.GetLocalIP() - extension.SetRegistry("etcd", newETCDV3Registry) + extension.SetRegistry("etcdv3", newETCDV3Registry) } type etcdV3Registry struct { *common.URL birth int64 // time of file birth, seconds since Epoch; 0 if unknown - ctx context.Context - cancel context.CancelFunc + cltLock sync.Mutex + client *etcdv3.Client + services map[string]common.URL // service name + protocol -> service config - rawClient *clientv3.Client - client *etcd.Client - - dataListener remoting.DataListener - configListener remoting.ConfigurationListener + listenerLock sync.Mutex + listener *etcdv3.EventListener + dataListener *dataListener + configListener *configurationListener servicesCache sync.Map // service name + protocol -> service config + + wg sync.WaitGroup // wg+done for zk restart + done chan struct{} +} + +func (r *etcdV3Registry) Client() *etcdv3.Client { + return r.client +} +func (r *etcdV3Registry) SetClient(client *etcdv3.Client) { + r.client = client +} +func (r *etcdV3Registry) ClientLock() *sync.Mutex { + return &r.cltLock +} +func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup { + return &r.wg +} +func (r *etcdV3Registry) GetDone() chan struct{} { + return r.done +} +func (r *etcdV3Registry) RestartCallBack() bool { + + services := []common.URL{} + for _, confIf := range r.services { + services = append(services, confIf) + } + + flag := true + for _, confIf := range services { + err := r.Register(confIf) + if err != nil { + logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}", + confIf, perrors.WithStack(err)) + flag = false + break + } + logger.Infof("success to re-register service :%v", confIf.Key()) + } + return flag } func newETCDV3Registry(url *common.URL) (registry.Registry, error) { @@ -63,56 +100,24 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { logger.Infof("etcd address is: %v", url.Location) logger.Infof("time-out is: %v", timeout.String()) - rawClient, err := clientv3.New(clientv3.Config{ - Endpoints: []string{url.Location}, - DialTimeout: timeout, - //DialOptions: []grpc.DialOption{grpc.WithBlock()}, - }) - if err != nil { - return nil, errors.Annotate(err, "block connect to etcd server") - } - - rawClient.ActiveConnection() - - rootCtx, cancel := context.WithCancel(context.Background()) - client, err := etcd.NewClient(rawClient, etcd.WithTTL(time.Second), etcd.WithContext(rootCtx)) - if err != nil { - return nil, errors.Annotate(err, "new etcd client") + r := &etcdV3Registry{ + URL: url, + birth: time.Now().UnixNano(), + done: make(chan struct{}), } - r := etcdV3Registry{ - URL: url, - ctx: rootCtx, - cancel: cancel, - rawClient: rawClient, - client: client, - servicesCache: sync.Map{}, + if err := etcdv3.ValidateClient(r, etcdv3.WithName(etcdv3.RegistryETCDV3Client)); err != nil { + return nil, err } - go r.keepAlive() - return &r, nil -} + r.wg.Add(1) + go etcdv3.HandleClientRestart(r) -func (r *etcdV3Registry) keepAlive() error { + r.listener = etcdv3.NewEventListener(r.client) + r.configListener = NewConfigurationListener(r) + r.dataListener = NewRegistryDataListener(r.configListener) - resp, err := r.client.KeepAlive() - if err != nil { - return errors.Annotate(err, "keep alive") - } - go func() { - for { - select { - case _, ok := <-resp: - if !ok { - logger.Errorf("etcd server stop") - r.cancel() - return - } - - } - } - }() - return nil + return r, nil } func (r *etcdV3Registry) GetUrl() common.URL { @@ -122,7 +127,7 @@ func (r *etcdV3Registry) GetUrl() common.URL { func (r *etcdV3Registry) IsAvailable() bool { select { - case <-r.ctx.Done(): + case <-r.done: return false default: return true @@ -130,20 +135,21 @@ func (r *etcdV3Registry) IsAvailable() bool { } func (r *etcdV3Registry) Destroy() { + + if r.configListener != nil { + r.configListener.Close() + } r.stop() } func (r *etcdV3Registry) stop() { - // close current client - r.rawClient.Close() + close(r.done) - // cancel ctx - r.cancel() + // close current client + r.client.Close() - r.rawClient = nil - r.ctx = nil - r.cancel = nil + r.client = nil r.servicesCache.Range(func(key, value interface{}) bool { r.servicesCache.Delete(key) return true @@ -180,24 +186,12 @@ func (r *etcdV3Registry) Register(svc common.URL) error { return nil } -func (r *etcdV3Registry) createKVIfNotExist(k string, v string) error { - - _, err := r.rawClient.Txn(r.ctx). - If(clientv3.Compare(clientv3.Version(k), "<", 1)). - Then(clientv3.OpPut(k, v)). - Commit() - if err != nil { - return errors.Annotatef(err, "etcd create k %s v %s", k, v) - } - return nil -} - func (r *etcdV3Registry) createDirIfNotExist(k string) error { var tmpPath string for _, str := range strings.Split(k, "/")[1:] { tmpPath = path.Join(tmpPath, "/", str) - if err := r.createKVIfNotExist(tmpPath, ""); err != nil { + if err := r.client.Create(tmpPath, ""); err != nil { return errors.Annotatef(err, "create path %s in etcd", tmpPath) } } @@ -226,7 +220,7 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error { encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode())) dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String()) - if err := r.createKVIfNotExist(path.Join(dubboPath, encodedURL), ""); err != nil { + if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) } @@ -279,7 +273,7 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error { encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode())) dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String()) - if err := r.createKVIfNotExist(path.Join(dubboPath, encodedURL), ""); err != nil { + if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { return errors.Annotatef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL) } @@ -288,8 +282,33 @@ func (r *etcdV3Registry) registerProvider(svc common.URL) error { func (r *etcdV3Registry) Subscribe(svc common.URL) (registry.Listener, error) { + var ( + configListener *configurationListener + ) + + r.listenerLock.Lock() + configListener = r.configListener + r.listenerLock.Unlock() + if r.listener == nil { + r.cltLock.Lock() + client := r.client + r.cltLock.Unlock() + if client == nil { + return nil, perrors.New("zk connection broken") + } + + // new client & listener + listener := etcdv3.NewEventListener(r.client) + + r.listenerLock.Lock() + r.listener = listener + r.listenerLock.Unlock() + } + + //娉ㄥ唽鍒癲ataconfig鐨刬nterested + r.dataListener.AddInterestedURL(&svc) - logger.Infof("subscribe svc: %s", svc) + go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/providers", svc.Service()), r.dataListener) - return nil, nil + return configListener, nil } diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 8cfb47f7c..a6eaee8af 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -2,13 +2,13 @@ package etcdv3 import ( "context" + "fmt" "path" "sync" "time" ) import ( - "github.com/AlexStocks/goext/database/etcd" "github.com/juju/errors" perrors "github.com/pkg/errors" "go.etcd.io/etcd/clientv3" @@ -24,10 +24,11 @@ import ( const ( ConnDelay = 3 MaxFailTimes = 15 + RegistryETCDV3Client = "etcd registry" ) var ( - ErrNilETCDV3ClientConn = errors.New("etcdv3client{conn} is nil") + ErrNilETCDV3ClientConn = errors.New("etcd clientset {conn} is nil") // full describe the ERR ErrKVPairNotFound = errors.New("k/v pair not found") ) @@ -36,7 +37,7 @@ type clientSet struct { lock sync.RWMutex // protect all element in // clientSet - gxClient *gxetcd.Client + //gxClient *gxetcd.Client rawClient *clientv3.Client // client controller used to change client behave @@ -60,17 +61,17 @@ func newClientSet(endpoints []string, timeout time.Duration, c *Client) error { DialOptions: []grpc.DialOption{grpc.WithBlock()}, }) if err != nil { - return errors.Annotate(err, "block connect to etcd server") + return errors.Annotate(err, "new raw client block connect to server") } // share context - gxClient, err := gxetcd.NewClient(client, gxetcd.WithTTL(time.Second), gxetcd.WithContext(rootCtx)) - if err != nil { - return errors.Annotate(err, "new etcd client") - } + //gxClient, err := gxetcd.NewClient(client, gxetcd.WithTTL(time.Second), gxetcd.WithContext(rootCtx)) + //if err != nil { + // return errors.Annotate(err, "new gxetcd client") + //} out := &clientSet{ - gxClient: gxClient, + //gxClient: gxClient, rawClient: client, ctx: rootCtx, cancel: cancel, @@ -92,12 +93,20 @@ func newClientSet(endpoints []string, timeout time.Duration, c *Client) error { func (c *clientSet) maintenanceStatus() error { c.c.Wait.Add(1) - aliveResp, err := c.gxClient.KeepAlive() + + lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds())) if err != nil { - return errors.Annotatef(err, "etcd keep alive") + return errors.Annotatef(err, "grant lease") } + + keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID) + if err != nil || keepAlive == nil { + c.rawClient.Revoke(c.ctx, lease.ID) + return errors.Annotate(err, "keep alive lease") + } + // start maintenance the connection status - go c.maintenanceStatusLoop(aliveResp) + go c.maintenanceStatusLoop(keepAlive) return nil } @@ -105,7 +114,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl defer func() { c.c.Wait.Done() - logger.Infof("etcd {path:%v, name:%s} connection goroutine game over.", c.c.endpoints, c.c.name) + logger.Infof("etcdv3 clientset {endpoints:%v, name:%s} connection goroutine game over.", c.c.endpoints, c.c.name) }() // get signal, will start maintenanceStatusLoop @@ -118,7 +127,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl return case <-c.ctx.Done(): // client context exit - logger.Warn("etcd clientSet context done") + logger.Warn("etcdv3 clientset context done") return case msg, ok := <-aliveResp: // etcd connection lose @@ -126,7 +135,7 @@ func (c *clientSet) maintenanceStatusLoop(aliveResp <-chan *clientv3.LeaseKeepAl // if clientSet.Client is nil, it will panic if !ok { - logger.Warnf("etcd server stop at term: %#v", msg) + logger.Warnf("etcdv3 server stop at term: %#v", msg) c.c.Lock() // hold the c.Client lock c.c.cs.clean() @@ -213,10 +222,25 @@ func (c *clientSet) getChildrenW(k string) ([]string, []string, clientv3.WatchCh return nil, nil, nil, ErrNilETCDV3ClientConn } - wc := c.rawClient.Watch(c.ctx, k, clientv3.WithPrefix()) + wc,err := c.watchWithPrefix(k) + if err != nil{ + return nil, nil, nil,errors.Annotate(err, "watch with prefix") + } return kList, vList, wc, nil } +func (c *clientSet) watchWithPrefix(prefix string) (clientv3.WatchChan, error) { + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil { + return nil, ErrNilETCDV3ClientConn + } + + return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil +} + func (c *clientSet) watch(k string) (clientv3.WatchChan, error) { c.lock.RLock() @@ -228,7 +252,7 @@ func (c *clientSet) watch(k string) (clientv3.WatchChan, error) { _, err := c.get(k) if err != nil { - return nil, errors.Annotatef(err, "watch pre check key %s", k) + return nil, errors.Annotatef(err, "pre check key %s", k) } return c.rawClient.Watch(c.ctx, k), nil @@ -296,11 +320,10 @@ func (c *clientSet) keepAliveKV(k string, v string) error { // this method will hold clientset lock func (c *clientSet) clean() { c.lock.Lock() - if c.gxClient != nil { + if c.rawClient != nil { - // close gx client, it will close raw etcdv3 client - c.gxClient.Close() - c.gxClient = nil + // close raw etcdv3 client + c.rawClient.Close() c.rawClient = nil // cancel all context @@ -373,8 +396,6 @@ func ValidateClient(container clientFacade, opts ...Option) error { opt(options) } - err = nil - lock := container.ClientLock() url := container.GetUrl() @@ -388,13 +409,13 @@ func ValidateClient(container clientFacade, opts ...Option) error { if err != nil { logger.Errorf("timeout config %v is invalid ,err is %v", url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) - return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location) + return errors.Annotate(err, "timeout parse") } newClient, err := newClient(options.name, []string{url.Location}, timeout) if err != nil { - logger.Warnf("newETCDV3Client(name{%s}, etcd addresss{%v}, timeout{%d}) = error{%v}", + logger.Warnf("new client (name{%s}, etcd addresss{%v}, timeout{%d}) = error{%v}", options.name, url.Location, timeout.String(), err) - return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.Location) + return errors.Annotatef(err, "new client (address:%+v)", url.Location) } container.SetClient(newClient) } @@ -403,12 +424,12 @@ func ValidateClient(container clientFacade, opts ...Option) error { err = newClientSet(container.Client().endpoints, container.Client().timeout, container.Client()) if err != nil { - return errors.Annotate(err, "new client set") + return errors.Annotate(err, "new clientset") } container.Client().cs.startMaintenanceChan <- struct{}{} } - return errors.Annotatef(err, "newETCDV3Client(address:%+v)", url.PrimitiveURL) + return nil } func newClient(name string, endpoints []string, timeout time.Duration) (*Client, error) { @@ -427,7 +448,7 @@ func newClient(name string, endpoints []string, timeout time.Duration) (*Client, err = newClientSet(endpoints, timeout, out) if err != nil { - return nil, errors.Annotate(err, "new client set") + return nil, errors.Annotate(err, "new clientset") } // start maintenanceChan @@ -445,19 +466,25 @@ func (c *Client) stop() bool { return false } -func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) { + +func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) error { if key == "" || wc == nil { - return + return errors.New(fmt.Sprintf("key is %s, wc is %v", key, wc)) + } + + wcc, err := c.cs.watch(key) + if err != nil { + return errors.Annotatef(err, "clientset watch %s", key) } c.Lock() a := c.eventRegistry[key] a = append(a, wc) c.eventRegistry[key] = a + c.Unlock() go func() { - wcc := c.cs.rawClient.Watch(c.cs.ctx, key) for msg := range wcc { wc <- msg } @@ -465,8 +492,8 @@ func (c *Client) RegisterEvent(key string, wc chan clientv3.WatchResponse) { close(wc) }() - logger.Debugf("etcdClient{%s} register event{path:%s, ptr:%p}", c.name, key, wc) - c.Unlock() + logger.Debugf("etcdv3 client{%s} register event{key:%s, ptr:%p}", c.name, key, wc) + return nil } func (c *Client) UnregisterEvent(key string, event chan clientv3.WatchResponse) { @@ -485,10 +512,10 @@ func (c *Client) UnregisterEvent(key string, event chan clientv3.WatchResponse) if e == event { arr := infoList infoList = append(arr[:i], arr[i+1:]...) - logger.Debugf("etcdClient{%s} unregister event{path:%s, event:%p}", c.name, key, event) + logger.Debugf("etcdv3 client{%s} unregister event{key:%s, event:%p}", c.name, key, event) } } - logger.Debugf("after etcdClient{%s} unregister event{path:%s, event:%p}, array length %d", + logger.Debugf("after etcdv3 client{%s} unregister event{key:%s, event:%p}, array length %d", c.name, key, event, len(infoList)) if len(infoList) == 0 { delete(c.eventRegistry, key) @@ -531,7 +558,7 @@ func (c *Client) Close() { c.cs = nil } c.Unlock() - logger.Warnf("etcd client{name:%s, etcd addr:%s} exit now.", c.name, c.endpoints) + logger.Warnf("etcdv3 client{name:%s, etcdv3 addr:%s} exit now.", c.name, c.endpoints) } func (c *Client) Create(k string, v string) error { @@ -543,7 +570,7 @@ func (c *Client) Create(k string, v string) error { err = c.cs.put(k, v) } c.Unlock() - return errors.Annotatef(err, "etcd client put key %s value %s", k, v) + return errors.Annotatef(err, "clientset put key %s value %s", k, v) } func (c *Client) Delete(key string) error { @@ -554,7 +581,7 @@ func (c *Client) Delete(key string) error { err = c.cs.delete(key) } c.Unlock() - return errors.Annotatef(err, "etcd client delete (basePath:%s)", key) + return errors.Annotatef(err, "clientset delete (key:%s)", key) } func (c *Client) RegisterTemp(basePath string, node string) (string, error) { @@ -566,40 +593,39 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) { err = c.cs.keepAliveKV(completePath, "") } c.Unlock() - logger.Debugf("etcdClient{%s} create a tmp node:%s\n", c.name, completePath) + logger.Debugf("etcdv3 client{%s} create a tmp node:%s\n", c.name, completePath) if err != nil { - return "", errors.Annotatef(err, "etcd client create tmp k %s", completePath) + return "", errors.Annotatef(err, "client create tmp key %s", completePath) } return completePath, nil } -func (c *Client) GetChildrenW(path string) ([]string, clientv3.WatchChan, error) { +func (c *Client) WatchChildren(key string) ([]string, []string, clientv3.WatchChan, error) { var ( - children []string - err error - wc clientv3.WatchChan + err error + childrenKeys []string + childrenValues []string + wc clientv3.WatchChan ) + err = ErrNilETCDV3ClientConn c.Lock() if c.cs != nil { - children, _, wc, err = c.cs.getChildrenW(path) + childrenKeys, childrenValues, wc, err = c.cs.getChildrenW(key) } c.Unlock() if err != nil { - if errors.Cause(err) == ErrKVPairNotFound { - return nil, nil, errors.Annotatef(err,"path{%s} has none children", path) - } - logger.Errorf("etcdv3.ChildrenW(path{%s}) = error(%v)", path, err) - return nil, nil, errors.Annotatef(err, "etcdv3.ChildrenW(path:%s)", path) + logger.Errorf("etcdv3 client Children(key{%s}) = error(%v)", key, perrors.WithStack(err)) + return nil, nil, nil, errors.Annotatef(err, "client ChildrenW(key:%s)", key) } - return children, wc, nil + return childrenKeys, childrenValues, wc, nil } -func (c *Client) GetChildren(path string) ([]string, error) { +func (c *Client) GetChildren(key string) ([]string, error) { var ( err error children []string @@ -608,20 +634,20 @@ func (c *Client) GetChildren(path string) ([]string, error) { err = ErrNilETCDV3ClientConn c.Lock() if c.cs != nil { - children, _, err = c.cs.getChildren(path) + children, _, err = c.cs.getChildren(key) } c.Unlock() if err != nil { if errors.Cause(err) == ErrKVPairNotFound { - return nil, errors.Annotatef(err,"path{%s} has none children", path) + return nil, errors.Annotatef(err, "key{%s} has none children", key) } - logger.Errorf("clientv3.Children(path{%s}) = error(%v)", path, perrors.WithStack(err)) - return nil, errors.Annotatef(err, "clientv3.Children(path:%s)", path) + logger.Errorf("clientv3.Children(key{%s}) = error(%v)", key, perrors.WithStack(err)) + return nil, errors.Annotatef(err, "client GetChildren(key:%s)", key) } return children, nil } -func (c *Client) ExistW(path string) (clientv3.WatchChan, error) { +func (c *Client) WatchExist(key string) (clientv3.WatchChan, error) { var ( err = ErrNilETCDV3ClientConn @@ -630,25 +656,25 @@ func (c *Client) ExistW(path string) (clientv3.WatchChan, error) { c.Lock() if c.cs != nil { - _, err = c.cs.watch(path) + out, err = c.cs.watch(key) } c.Unlock() if err != nil { if errors.Cause(err) == ErrKVPairNotFound { - return nil, errors.Annotatef(err, "path{%s} not exist", path) + return nil, errors.Annotatef(err, "key{%s} not exist", key) } - return nil, errors.Annotatef(err, "clientv3.ExistW(path:%s)", path) + return nil, errors.Annotatef(err, "client WatchExist(key:%s)", key) } return out, nil } -func (c *Client) GetContent(path string) ([]byte, error) { +func (c *Client) GetContent(key string) ([]byte, error) { c.Lock() - value, err := c.cs.get(path) + value, err := c.cs.get(key) if err != nil { - return nil, errors.Annotatef(err, "client set get: %s", path) + return nil, errors.Annotatef(err, "clientset get(key: %s)", key) } c.Unlock() diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index ea71233db..bc5dcd444 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -1,17 +1,14 @@ package etcdv3 import ( - "gx/ipfs/QmZErC2Ay6WuGi96CPg316PwitdwgLo6RxZRqVjJjRj2MR/go-path" - pathlib "path" "sync" "time" + + "go.etcd.io/etcd/clientv3" ) import ( "github.com/coreos/etcd/mvcc/mvccpb" - "github.com/dubbogo/getty" - perrors "github.com/pkg/errors" - "github.com/samuel/go-zookeeper/zk" ) import ( @@ -20,31 +17,32 @@ import ( ) type EventListener struct { - client *Client - pathMapLock sync.Mutex - pathMap map[string]struct{} - wg sync.WaitGroup + client *Client + keyMapLock sync.Mutex + keyMap map[string]struct{} + wg sync.WaitGroup } func NewEventListener(client *Client) *EventListener { return &EventListener{ - client: client, - pathMap: make(map[string]struct{}), + client: client, + keyMap: make(map[string]struct{}), } } func (l *EventListener) SetClient(client *Client) { l.client = client } -// this method will return true when spec path deleted, +// Listen on a spec key +// this method will return true when spec key deleted, // this method will return false when deep layer connection lose -func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting.DataListener) bool { +func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { l.wg.Add(1) defer l.wg.Done() for { - keyEventCh, err := l.client.ExistW(path) + keyEventCh, err := l.client.WatchExist(key) if err != nil { - logger.Warnf("existW{key:%s} = error{%v}", path, err) + logger.Warnf("WatchExist{key:%s} = error{%v}", key, err) return false } @@ -61,25 +59,12 @@ func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting // etcd event stream case e := <-keyEventCh: - if e.Err() != nil{ - logger.Warnf("get a etcd event {err: %s}", e.Err()) + if e.Err() != nil { + logger.Errorf("get a etcdv3 event {err: %s}", e.Err()) + continue } - for _, event := range e.Events{ - logger.Warnf("get a etcd Event{type:%s, path:%s,}", - event.Type.String(), event.Kv.Key ) - switch event.Type { - case mvccpb.PUT: - if len(listener) > 0 { - if event.IsCreate(){ - logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key) - listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EventTypeAdd, Content: string(event.Kv.Value)}) - }else{ - logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key) - listener[0].DataChange(remoting.Event{Path: string(event.Kv.Key), Action: remoting.EvnetTypeUpdate, Content: string(event.Kv.Value)}) - } - } - case mvccpb.DELETE: - logger.Warnf("etcdV3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key) + for _, event := range e.Events { + if l.handleEvents(event, listener...) { return true } } @@ -89,285 +74,120 @@ func (l *EventListener) ListenServiceNodeEvent(path string, listener ...remoting return false } - -func (l *EventListener) handleNodeEvent(path string, children []string, listener remoting.DataListener) { - contains := func(s []string, e string) bool { - for _, a := range s { - if a == e { - return true +// return true mean the event type is DELETE +// return false mean the event type is CREATE || UPDATE +func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool { + + logger.Warnf("get a etcdv3 Event {type: %s, key: %s}", event.Type, event.Kv.Key) + + switch event.Type { + // the etcdv3 event just include PUT && DELETE + case mvccpb.PUT: + for _, listener := range listeners { + switch event.IsCreate() { + case true: + logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDataCreated}", event.Kv.Key) + listener.DataChange(remoting.Event{ + Path: string(event.Kv.Key), + Action: remoting.EventTypeAdd, + Content: string(event.Kv.Value), + }) + case false: + logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDataChanged}", event.Kv.Key) + listener.DataChange(remoting.Event{ + Path: string(event.Kv.Key), + Action: remoting.EvnetTypeUpdate, + Content: string(event.Kv.Value), + }) } } - return false + case mvccpb.DELETE: + logger.Warnf("etcdv3.ExistW(key{%s}) = event{EventNodeDeleted}", event.Kv.Key) + return true } - newChildren, err := l.client.GetChildren(path) - if err != nil { - logger.Errorf("path{%s} child nodes changed, etcdV3.Children() = error{%v}", path, perrors.WithStack(err)) - return - } - - // a node was added -- listen the new node - var ( - newNode string - ) - for _, n := range newChildren { - if contains(children, n) { - continue - } - - newNode = pathlib.Join(path, n) - logger.Infof("add zkNode{%s}", newNode) - content, _, err := l.client.Conn.Get(newNode) - if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(err)) - } - - if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) { - continue - } - // listen l service node - go func(node, childNode string) { - logger.Infof("delete zkNode{%s}", node) - if l.ListenServiceNodeEvent(node, listener) { - logger.Infof("delete content{%s}", childNode) - listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) - } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(newNode, n) - } - - // old node was deleted - var oldNode string - for _, n := range children { - if contains(newChildren, n) { - continue - } - - oldNode = path.Join(zkPath, n) - logger.Warnf("delete zkPath{%s}", oldNode) - - if err != nil { - logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err)) - continue - } - listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel}) - } + panic("unreachable") } -func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataListener) { +// Listen on a set of key with spec prefix +func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { + l.wg.Add(1) defer l.wg.Done() - - var ( - failTimes int - event chan struct{} - zkEvent zk.Event - ) - event = make(chan struct{}, 4) - defer close(event) for { - // get current children for a zkPath - children, childEventCh, err := l.client.GetChildrenW(zkPath) + _, _, wc, err := l.client.WatchChildren(prefix) if err != nil { - failTimes++ - if MaxFailTimes <= failTimes { - failTimes = MaxFailTimes - } - logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err) - // clear the event channel - CLEAR: - for { - select { - case <-event: - default: - break CLEAR - } - } - l.client.RegisterEvent(zkPath, &event) - select { - case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): - l.client.UnregisterEvent(zkPath, &event) - continue - case <-l.client.Done(): - l.client.UnregisterEvent(zkPath, &event) - logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) - return - case <-event: - logger.Infof("get zk.EventNodeDataChange notify event") - l.client.UnregisterEvent(zkPath, &event) - l.handleZkNodeEvent(zkPath, nil, listener) - continue - } + logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err) } - failTimes = 0 - for _, c := range children { - // listen l service node - dubboPath := path.Join(zkPath, c) - content, _, err := l.client.Conn.Get(dubboPath) - if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) - } - logger.Infof("Get children!{%s}", dubboPath) - if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { - continue - } - logger.Infof("listen dubbo service key{%s}", dubboPath) - go func(zkPath string) { - if l.ListenServiceNodeEvent(dubboPath) { - listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel}) - } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(dubboPath) - - //liten sub path recursive - go func(zkPath string, listener remoting.DataListener) { - l.listenDirEvent(zkPath, listener) - logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) - }(dubboPath, listener) - } select { - case zkEvent = <-childEventCh: - logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", - zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) - if zkEvent.Type != zk.EventNodeChildrenChanged { - continue - } - l.handleZkNodeEvent(zkEvent.Path, children, listener) + + // client watch ctx stop + // server stopped + case <-l.client.cs.ctx.Done(): + logger.Warn("etcd listener service node with prefix etcd server stopped") + return + + // client stopped case <-l.client.Done(): - logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) + logger.Warn("etcdv3 client stopped") return + + // etcd event stream + case e := <-wc: + if e.Err() != nil { + logger.Errorf("get a etcdv3 event {err: %s}", e.Err()) + continue + } + for _, event := range e.Events { + l.handleEvents(event, listener...) + } } } } -// -//func (l *ZkEventListener) listenFileEvent(zkPath string, listener remoting.DataListener) { -// l.wg.EventTypeAdd(1) -// defer l.wg.Done() -// -// var ( -// failTimes int -// event chan struct{} -// zkEvent zk.Event -// ) -// event = make(chan struct{}, 4) -// defer close(event) -// for { -// // get current children for a zkPath -// content,_, eventCh, err := l.client.Conn.GetW(zkPath) -// if err != nil { -// failTimes++ -// if MaxFailTimes <= failTimes { -// failTimes = MaxFailTimes -// } -// logger.Errorf("listenFileEvent(path{%s}) = error{%v}", zkPath, err) -// // clear the event channel -// CLEAR: -// for { -// select { -// case <-event: -// default: -// break CLEAR -// } -// } -// l.client.RegisterEvent(zkPath, &event) -// select { -// case <-time.After(timeSecondDuration(failTimes * ConnDelay)): -// l.client.UnregisterEvent(zkPath, &event) -// continue -// case <-l.client.Done(): -// l.client.UnregisterEvent(zkPath, &event) -// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) -// return -// case <-event: -// logger.Infof("get zk.EventNodeDataChange notify event") -// l.client.UnregisterEvent(zkPath, &event) -// l.handleZkNodeEvent(zkPath, nil, listener) -// continue -// } -// } -// failTimes = 0 -// -// select { -// case zkEvent = <-eventCh: -// logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", -// zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) -// -// l.handleZkNodeEvent(zkEvent.Path, children, listener) -// case <-l.client.Done(): -// logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) -// return -// } -// } -//} - func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } -// this func is invoked by ZkConsumerRegistry::Registe/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener +// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent // | // --------> ListenServiceNodeEvent -func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) { - var ( - err error - dubboPath string - children []string - ) +func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) { - l.pathMapLock.Lock() - _, ok := l.pathMap[zkPath] - l.pathMapLock.Unlock() + l.keyMapLock.Lock() + _, ok := l.keyMap[key] + l.keyMapLock.Unlock() if ok { - logger.Warnf("@zkPath %s has already been listened.", zkPath) + logger.Warnf("etcdv3 key %s has already been listened.", key) return } - l.pathMapLock.Lock() - l.pathMap[zkPath] = struct{}{} - l.pathMapLock.Unlock() + l.keyMapLock.Lock() + l.keyMap[key] = struct{}{} + l.keyMapLock.Unlock() - logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath) - children, err = l.client.GetChildren(zkPath) - if err != nil { - children = nil - logger.Warnf("fail to get children of zk path{%s}", zkPath) - } - - for _, c := range children { + logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key) + go func(key string, listener remoting.DataListener) { + l.ListenServiceNodeEventWithPrefix(key, listener) + logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) + }(key, listener) - // listen l service node - dubboPath = path.Join(zkPath, c) - content, _, err := l.client.Conn.Get(dubboPath) - if err != nil { - logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err)) - } - if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) { - continue + logger.Infof("listen dubbo service key{%s}", key) + go func(key string) { + if l.ListenServiceNodeEvent(key) { + listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) } - logger.Infof("listen dubbo service key{%s}", dubboPath) - go func(zkPath string) { - if l.ListenServiceNodeEvent(dubboPath) { - listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeDel}) - } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(dubboPath) - } - - logger.Infof("listen dubbo path{%s}", zkPath) - go func(zkPath string, listener remoting.DataListener) { - l.listenDirEvent(zkPath, listener) - logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) - }(zkPath, listener) + logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key) + }(key) } -func (l *ZkEventListener) valid() bool { - return l.client.ZkConnValid() +func (l *EventListener) valid() bool { + return l.client.Valid() } -func (l *ZkEventListener) Close() { +func (l *EventListener) Close() { l.wg.Wait() } -- GitLab