diff --git a/remoting/etcdv3/coverage.html b/remoting/etcdv3/coverage.html deleted file mode 100644 index 04a7ab97cadd39c2fa5aeeff6531069486ff038c..0000000000000000000000000000000000000000 --- a/remoting/etcdv3/coverage.html +++ /dev/null @@ -1,970 +0,0 @@ - -<!DOCTYPE html> -<html> - <head> - <meta http-equiv="Content-Type" content="text/html; charset=utf-8"> - <style> - body { - background: black; - color: rgb(80, 80, 80); - } - body, pre, #legend span { - font-family: Menlo, monospace; - font-weight: bold; - } - #topbar { - background: black; - position: fixed; - top: 0; left: 0; right: 0; - height: 42px; - border-bottom: 1px solid rgb(80, 80, 80); - } - #content { - margin-top: 50px; - } - #nav, #legend { - float: left; - margin-left: 10px; - } - #legend { - margin-top: 12px; - } - #nav { - margin-top: 10px; - } - #legend span { - margin: 0 5px; - } - .cov0 { color: rgb(192, 0, 0) } -.cov1 { color: rgb(128, 128, 128) } -.cov2 { color: rgb(116, 140, 131) } -.cov3 { color: rgb(104, 152, 134) } -.cov4 { color: rgb(92, 164, 137) } -.cov5 { color: rgb(80, 176, 140) } -.cov6 { color: rgb(68, 188, 143) } -.cov7 { color: rgb(56, 200, 146) } -.cov8 { color: rgb(44, 212, 149) } -.cov9 { color: rgb(32, 224, 152) } -.cov10 { color: rgb(20, 236, 155) } - - </style> - </head> - <body> - <div id="topbar"> - <div id="nav"> - <select id="files"> - - <option value="file0">github.com/apache/dubbo-go/remoting/etcdv3/client.go (65.0%)</option> - - <option value="file1">github.com/apache/dubbo-go/remoting/etcdv3/facade.go (0.0%)</option> - - <option value="file2">github.com/apache/dubbo-go/remoting/etcdv3/listener.go (57.3%)</option> - - </select> - </div> - <div id="legend"> - <span>not tracked</span> - - <span class="cov0">not covered</span> - <span class="cov8">covered</span> - - </div> - </div> - <div id="content"> - - <pre class="file" id="file0" style="display: none">/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package etcdv3 - -import ( - "context" - "path" - "sync" - "time" -) - -import ( - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/clientv3/concurrency" - perrors "github.com/pkg/errors" - "google.golang.org/grpc" -) - -import ( - "github.com/apache/dubbo-go/common/logger" -) - -const ( - // ConnDelay connection dalay - ConnDelay = 3 - // MaxFailTimes max failure times - MaxFailTimes = 15 - // RegistryETCDV3Client client name - RegistryETCDV3Client = "etcd registry" -) - -var ( - // ErrNilETCDV3Client ... - ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR - // ErrKVPairNotFound ... - ErrKVPairNotFound = perrors.New("k/v pair not found") -) - -// Options ... -type Options struct { - name string - endpoints []string - client *Client - timeout time.Duration - heartbeat int // heartbeat second -} - -// Option ... -type Option func(*Options) - -// WithEndpoints ... -func WithEndpoints(endpoints ...string) Option <span class="cov0" title="0">{ - return func(opt *Options) </span><span class="cov0" title="0">{ - opt.endpoints = endpoints - }</span> -} - -// WithName ... -func WithName(name string) Option <span class="cov0" title="0">{ - return func(opt *Options) </span><span class="cov0" title="0">{ - opt.name = name - }</span> -} - -// WithTimeout ... -func WithTimeout(timeout time.Duration) Option <span class="cov0" title="0">{ - return func(opt *Options) </span><span class="cov0" title="0">{ - opt.timeout = timeout - }</span> -} - -// WithHeartbeat ... -func WithHeartbeat(heartbeat int) Option <span class="cov0" title="0">{ - return func(opt *Options) </span><span class="cov0" title="0">{ - opt.heartbeat = heartbeat - }</span> -} - -// ValidateClient ... -func ValidateClient(container clientFacade, opts ...Option) error <span class="cov0" title="0">{ - - options := &Options{ - heartbeat: 1, // default heartbeat - } - for _, opt := range opts </span><span class="cov0" title="0">{ - opt(options) - }</span> - - <span class="cov0" title="0">lock := container.ClientLock() - lock.Lock() - defer lock.Unlock() - - // new Client - if container.Client() == nil </span><span class="cov0" title="0">{ - newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat) - if err != nil </span><span class="cov0" title="0">{ - logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", - options.name, options.endpoints, options.timeout, err) - return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints) - }</span> - <span class="cov0" title="0">container.SetClient(newClient)</span> - } - - // Client lose connection with etcd server - <span class="cov0" title="0">if container.Client().rawClient == nil </span><span class="cov0" title="0">{ - - newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat) - if err != nil </span><span class="cov0" title="0">{ - logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", - options.name, options.endpoints, options.timeout, err) - return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints) - }</span> - <span class="cov0" title="0">container.SetClient(newClient)</span> - } - - <span class="cov0" title="0">return nil</span> -} - -// Client ... -type Client struct { - lock sync.RWMutex - - // these properties are only set once when they are started. - name string - endpoints []string - timeout time.Duration - heartbeat int - - ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg - cancel context.CancelFunc // cancel the ctx, all watcher will stopped - rawClient *clientv3.Client - - exit chan struct{} - Wait sync.WaitGroup -} - -func newClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) <span class="cov8" title="1">{ - - ctx, cancel := context.WithCancel(context.Background()) - rawClient, err := clientv3.New(clientv3.Config{ - Context: ctx, - Endpoints: endpoints, - DialTimeout: timeout, - DialOptions: []grpc.DialOption{grpc.WithBlock()}, - }) - if err != nil </span><span class="cov0" title="0">{ - return nil, perrors.WithMessage(err, "new raw client block connect to server") - }</span> - - <span class="cov8" title="1">c := &Client{ - - name: name, - timeout: timeout, - endpoints: endpoints, - heartbeat: heartbeat, - - ctx: ctx, - cancel: cancel, - rawClient: rawClient, - - exit: make(chan struct{}), - } - - if err := c.maintenanceStatus(); err != nil </span><span class="cov0" title="0">{ - return nil, perrors.WithMessage(err, "client maintenance status") - }</span> - <span class="cov8" title="1">return c, nil</span> -} - -// NOTICE: need to get the lock before calling this method -func (c *Client) clean() <span class="cov8" title="1">{ - - // close raw client - c.rawClient.Close() - - // cancel ctx for raw client - c.cancel() - - // clean raw client - c.rawClient = nil -}</span> - -func (c *Client) stop() bool <span class="cov8" title="1">{ - - select </span>{ - case <-c.exit:<span class="cov0" title="0"> - return true</span> - default:<span class="cov8" title="1"> - close(c.exit)</span> - } - <span class="cov8" title="1">return false</span> -} - -// Close ... -func (c *Client) Close() <span class="cov8" title="1">{ - - if c == nil </span><span class="cov0" title="0">{ - return - }</span> - - // stop the client - <span class="cov8" title="1">c.stop() - - // wait client maintenance status stop - c.Wait.Wait() - - c.lock.Lock() - if c.rawClient != nil </span><span class="cov8" title="1">{ - c.clean() - }</span> - <span class="cov8" title="1">c.lock.Unlock() - logger.Warnf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints)</span> -} - -func (c *Client) maintenanceStatus() error <span class="cov8" title="1">{ - - s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat)) - if err != nil </span><span class="cov0" title="0">{ - return perrors.WithMessage(err, "new session with server") - }</span> - - // must add wg before go maintenance status goroutine - <span class="cov8" title="1">c.Wait.Add(1) - go c.maintenanceStatusLoop(s) - return nil</span> -} - -func (c *Client) maintenanceStatusLoop(s *concurrency.Session) <span class="cov8" title="1">{ - - defer func() </span><span class="cov8" title="1">{ - c.Wait.Done() - logger.Infof("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name) - }</span>() - - <span class="cov8" title="1">for </span><span class="cov8" title="1">{ - select </span>{ - case <-c.Done():<span class="cov8" title="1"> - // Client be stopped, will clean the client hold resources - return</span> - case <-s.Done():<span class="cov0" title="0"> - logger.Warn("etcd server stopped") - c.lock.Lock() - // when etcd server stopped, cancel ctx, stop all watchers - c.clean() - // when connection lose, stop client, trigger reconnect to etcd - c.stop() - c.lock.Unlock() - return</span> - } - } -} - -// if k not exist will put k/v in etcd -// if k is already exist in etcd, return nil -func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error <span class="cov8" title="1">{ - - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil </span><span class="cov0" title="0">{ - return ErrNilETCDV3Client - }</span> - - <span class="cov8" title="1">_, err := c.rawClient.Txn(c.ctx). - If(clientv3.Compare(clientv3.Version(k), "<", 1)). - Then(clientv3.OpPut(k, v, opts...)). - Commit() - if err != nil </span><span class="cov0" title="0">{ - return err - - }</span> - <span class="cov8" title="1">return nil</span> -} - -func (c *Client) delete(k string) error <span class="cov8" title="1">{ - - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil </span><span class="cov0" title="0">{ - return ErrNilETCDV3Client - }</span> - - <span class="cov8" title="1">_, err := c.rawClient.Delete(c.ctx, k) - if err != nil </span><span class="cov0" title="0">{ - return err - - }</span> - <span class="cov8" title="1">return nil</span> -} - -func (c *Client) get(k string) (string, error) <span class="cov8" title="1">{ - - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil </span><span class="cov0" title="0">{ - return "", ErrNilETCDV3Client - }</span> - - <span class="cov8" title="1">resp, err := c.rawClient.Get(c.ctx, k) - if err != nil </span><span class="cov0" title="0">{ - return "", err - }</span> - - <span class="cov8" title="1">if len(resp.Kvs) == 0 </span><span class="cov8" title="1">{ - return "", ErrKVPairNotFound - }</span> - - <span class="cov8" title="1">return string(resp.Kvs[0].Value), nil</span> -} - -// CleanKV ... -func (c *Client) CleanKV() error <span class="cov8" title="1">{ - - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil </span><span class="cov0" title="0">{ - return ErrNilETCDV3Client - }</span> - - <span class="cov8" title="1">_, err := c.rawClient.Delete(c.ctx, "", clientv3.WithPrefix()) - if err != nil </span><span class="cov0" title="0">{ - return err - }</span> - - <span class="cov8" title="1">return nil</span> -} - -func (c *Client) getChildren(k string) ([]string, []string, error) <span class="cov8" title="1">{ - - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil </span><span class="cov0" title="0">{ - return nil, nil, ErrNilETCDV3Client - }</span> - - <span class="cov8" title="1">resp, err := c.rawClient.Get(c.ctx, k, clientv3.WithPrefix()) - if err != nil </span><span class="cov0" title="0">{ - return nil, nil, err - }</span> - - <span class="cov8" title="1">if len(resp.Kvs) == 0 </span><span class="cov8" title="1">{ - return nil, nil, ErrKVPairNotFound - }</span> - - <span class="cov8" title="1">var ( - kList []string - vList []string - ) - - for _, kv := range resp.Kvs </span><span class="cov8" title="1">{ - kList = append(kList, string(kv.Key)) - vList = append(vList, string(kv.Value)) - }</span> - - <span class="cov8" title="1">return kList, vList, nil</span> -} - -func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) <span class="cov8" title="1">{ - - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil </span><span class="cov0" title="0">{ - return nil, ErrNilETCDV3Client - }</span> - - <span class="cov8" title="1">return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil</span> -} - -func (c *Client) watch(k string) (clientv3.WatchChan, error) <span class="cov8" title="1">{ - - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil </span><span class="cov0" title="0">{ - return nil, ErrNilETCDV3Client - }</span> - - <span class="cov8" title="1">return c.rawClient.Watch(c.ctx, k), nil</span> -} - -func (c *Client) keepAliveKV(k string, v string) error <span class="cov8" title="1">{ - - c.lock.RLock() - defer c.lock.RUnlock() - - if c.rawClient == nil </span><span class="cov0" title="0">{ - return ErrNilETCDV3Client - }</span> - - <span class="cov8" title="1">lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds())) - if err != nil </span><span class="cov0" title="0">{ - return perrors.WithMessage(err, "grant lease") - }</span> - - <span class="cov8" title="1">keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID) - if err != nil || keepAlive == nil </span><span class="cov0" title="0">{ - c.rawClient.Revoke(c.ctx, lease.ID) - return perrors.WithMessage(err, "keep alive lease") - }</span> - - <span class="cov8" title="1">_, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID)) - if err != nil </span><span class="cov0" title="0">{ - return perrors.WithMessage(err, "put k/v with lease") - }</span> - <span class="cov8" title="1">return nil</span> -} - -// Done ... -func (c *Client) Done() <-chan struct{} <span class="cov8" title="1">{ - return c.exit -}</span> - -// Valid ... -func (c *Client) Valid() bool <span class="cov8" title="1">{ - select </span>{ - case <-c.exit:<span class="cov8" title="1"> - return false</span> - default:<span class="cov8" title="1"></span> - } - - <span class="cov8" title="1">c.lock.RLock() - if c.rawClient == nil </span><span class="cov0" title="0">{ - c.lock.RUnlock() - return false - }</span> - <span class="cov8" title="1">c.lock.RUnlock() - return true</span> -} - -// Create ... -func (c *Client) Create(k string, v string) error <span class="cov8" title="1">{ - - err := c.put(k, v) - if err != nil </span><span class="cov0" title="0">{ - return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v) - }</span> - <span class="cov8" title="1">return nil</span> -} - -// Delete ... -func (c *Client) Delete(k string) error <span class="cov8" title="1">{ - - err := c.delete(k) - if err != nil </span><span class="cov0" title="0">{ - return perrors.WithMessagef(err, "delete k/v (key %s)", k) - }</span> - - <span class="cov8" title="1">return nil</span> -} - -// RegisterTemp ... -func (c *Client) RegisterTemp(basePath string, node string) (string, error) <span class="cov8" title="1">{ - - completeKey := path.Join(basePath, node) - - err := c.keepAliveKV(completeKey, "") - if err != nil </span><span class="cov0" title="0">{ - return "", perrors.WithMessagef(err, "keepalive kv (key %s)", completeKey) - }</span> - - <span class="cov8" title="1">return completeKey, nil</span> -} - -// GetChildrenKVList ... -func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) <span class="cov8" title="1">{ - - kList, vList, err := c.getChildren(k) - if err != nil </span><span class="cov0" title="0">{ - return nil, nil, perrors.WithMessagef(err, "get key children (key %s)", k) - }</span> - <span class="cov8" title="1">return kList, vList, nil</span> -} - -// Get ... -func (c *Client) Get(k string) (string, error) <span class="cov8" title="1">{ - - v, err := c.get(k) - if err != nil </span><span class="cov8" title="1">{ - return "", perrors.WithMessagef(err, "get key value (key %s)", k) - }</span> - - <span class="cov8" title="1">return v, nil</span> -} - -// Watch ... -func (c *Client) Watch(k string) (clientv3.WatchChan, error) <span class="cov8" title="1">{ - - wc, err := c.watch(k) - if err != nil </span><span class="cov0" title="0">{ - return nil, perrors.WithMessagef(err, "watch prefix (key %s)", k) - }</span> - <span class="cov8" title="1">return wc, nil</span> -} - -// WatchWithPrefix ... -func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) <span class="cov8" title="1">{ - - wc, err := c.watchWithPrefix(prefix) - if err != nil </span><span class="cov0" title="0">{ - return nil, perrors.WithMessagef(err, "watch prefix (key %s)", prefix) - }</span> - <span class="cov8" title="1">return wc, nil</span> -} -</pre> - - <pre class="file" id="file1" style="display: none">/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package etcdv3 - -import ( - "sync" - "time" -) - -import ( - "github.com/dubbogo/getty" - perrors "github.com/pkg/errors" -) - -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/logger" -) - -type clientFacade interface { - Client() *Client - SetClient(*Client) - ClientLock() *sync.Mutex - WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container - Done() chan struct{} //for etcd client control - RestartCallBack() bool - common.Node -} - -// HandleClientRestart ... -func HandleClientRestart(r clientFacade) <span class="cov0" title="0">{ - - var ( - err error - failTimes int - ) - - defer r.WaitGroup().Done() -LOOP: - for </span><span class="cov0" title="0">{ - select </span>{ - case <-r.Done():<span class="cov0" title="0"> - logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...") - break LOOP</span> - // re-register all services - case <-r.Client().Done():<span class="cov0" title="0"> - r.ClientLock().Lock() - clientName := RegistryETCDV3Client - timeout, _ := time.ParseDuration(r.GetUrl().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) - endpoint := r.GetUrl().Location - r.Client().Close() - r.SetClient(nil) - r.ClientLock().Unlock() - - // try to connect to etcd, - failTimes = 0 - for </span><span class="cov0" title="0">{ - select </span>{ - case <-r.Done():<span class="cov0" title="0"> - logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...") - break LOOP</span> - case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):<span class="cov0" title="0"></span> // avoid connect frequent - } - <span class="cov0" title="0">err = ValidateClient( - r, - WithName(clientName), - WithEndpoints(endpoint), - WithTimeout(timeout), - ) - logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}", - endpoint, perrors.WithStack(err)) - if err == nil </span><span class="cov0" title="0">{ - if r.RestartCallBack() </span><span class="cov0" title="0">{ - break</span> - } - } - <span class="cov0" title="0">failTimes++ - if MaxFailTimes <= failTimes </span><span class="cov0" title="0">{ - failTimes = MaxFailTimes - }</span> - } - } - } -} -</pre> - - <pre class="file" id="file2" style="display: none">/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package etcdv3 - -import ( - "sync" - "time" -) - -import ( - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" - perrors "github.com/pkg/errors" -) - -import ( - "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/remoting" -) - -// EventListener ... -type EventListener struct { - client *Client - keyMapLock sync.Mutex - keyMap map[string]struct{} - wg sync.WaitGroup -} - -// NewEventListener ... -func NewEventListener(client *Client) *EventListener <span class="cov8" title="1">{ - return &EventListener{ - client: client, - keyMap: make(map[string]struct{}), - } -}</span> - -// ListenServiceNodeEvent Listen on a spec key -// this method will return true when spec key deleted, -// this method will return false when deep layer connection lose -func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool <span class="cov8" title="1">{ - l.wg.Add(1) - defer l.wg.Done() - for </span><span class="cov8" title="1">{ - wc, err := l.client.Watch(key) - if err != nil </span><span class="cov0" title="0">{ - logger.Warnf("WatchExist{key:%s} = error{%v}", key, err) - return false - }</span> - - <span class="cov8" title="1">select </span>{ - - // client stopped - case <-l.client.Done():<span class="cov0" title="0"> - logger.Warnf("etcd client stopped") - return false</span> - - // client ctx stop - case <-l.client.ctx.Done():<span class="cov0" title="0"> - logger.Warnf("etcd client ctx cancel") - return false</span> - - // handle etcd events - case e, ok := <-wc:<span class="cov8" title="1"> - if !ok </span><span class="cov0" title="0">{ - logger.Warnf("etcd watch-chan closed") - return false - }</span> - - <span class="cov8" title="1">if e.Err() != nil </span><span class="cov0" title="0">{ - logger.Errorf("etcd watch ERR {err: %s}", e.Err()) - continue</span> - } - <span class="cov8" title="1">for _, event := range e.Events </span><span class="cov8" title="1">{ - if l.handleEvents(event, listener...) </span><span class="cov0" title="0">{ - // if event is delete - return true - }</span> - } - } - } - - <span class="cov0" title="0">return false</span> -} - -// return true mean the event type is DELETE -// return false mean the event type is CREATE || UPDATE -func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool <span class="cov8" title="1">{ - - logger.Infof("got a etcd event {type: %s, key: %s}", event.Type, event.Kv.Key) - - switch event.Type </span>{ - // the etcdv3 event just include PUT && DELETE - case mvccpb.PUT:<span class="cov8" title="1"> - for _, listener := range listeners </span><span class="cov8" title="1">{ - switch event.IsCreate() </span>{ - case true:<span class="cov8" title="1"> - logger.Infof("etcd get event (key{%s}) = event{EventNodeDataCreated}", event.Kv.Key) - listener.DataChange(remoting.Event{ - Path: string(event.Kv.Key), - Action: remoting.EventTypeAdd, - Content: string(event.Kv.Value), - })</span> - case false:<span class="cov0" title="0"> - logger.Infof("etcd get event (key{%s}) = event{EventNodeDataChanged}", event.Kv.Key) - listener.DataChange(remoting.Event{ - Path: string(event.Kv.Key), - Action: remoting.EventTypeUpdate, - Content: string(event.Kv.Value), - })</span> - } - } - <span class="cov8" title="1">return false</span> - case mvccpb.DELETE:<span class="cov0" title="0"> - logger.Warnf("etcd get event (key{%s}) = event{EventNodeDeleted}", event.Kv.Key) - return true</span> - - default:<span class="cov0" title="0"> - return false</span> - } - - <span class="cov0" title="0">panic("unreachable")</span> -} - -// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix -func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) <span class="cov8" title="1">{ - - l.wg.Add(1) - defer l.wg.Done() - for </span><span class="cov8" title="1">{ - wc, err := l.client.WatchWithPrefix(prefix) - if err != nil </span><span class="cov0" title="0">{ - logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err) - }</span> - - <span class="cov8" title="1">select </span>{ - - // client stopped - case <-l.client.Done():<span class="cov0" title="0"> - logger.Warnf("etcd client stopped") - return</span> - - // client ctx stop - case <-l.client.ctx.Done():<span class="cov0" title="0"> - logger.Warnf("etcd client ctx cancel") - return</span> - - // etcd event stream - case e, ok := <-wc:<span class="cov8" title="1"> - - if !ok </span><span class="cov0" title="0">{ - logger.Warnf("etcd watch-chan closed") - return - }</span> - - <span class="cov8" title="1">if e.Err() != nil </span><span class="cov0" title="0">{ - logger.Errorf("etcd watch ERR {err: %s}", e.Err()) - continue</span> - } - <span class="cov8" title="1">for _, event := range e.Events </span><span class="cov8" title="1">{ - l.handleEvents(event, listener...) - }</span> - } - } -} - -func timeSecondDuration(sec int) time.Duration <span class="cov0" title="0">{ - return time.Duration(sec) * time.Second -}</span> - -// ListenServiceEvent is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener -// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent -// | -// --------> ListenServiceNodeEvent -func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) <span class="cov8" title="1">{ - - l.keyMapLock.Lock() - _, ok := l.keyMap[key] - l.keyMapLock.Unlock() - if ok </span><span class="cov0" title="0">{ - logger.Warnf("etcdv3 key %s has already been listened.", key) - return - }</span> - - <span class="cov8" title="1">l.keyMapLock.Lock() - l.keyMap[key] = struct{}{} - l.keyMapLock.Unlock() - - keyList, valueList, err := l.client.getChildren(key) - if err != nil </span><span class="cov8" title="1">{ - logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children")) - }</span> - - <span class="cov8" title="1">logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList) - - for i, k := range keyList </span><span class="cov0" title="0">{ - logger.Infof("got children list key -> %s", k) - listener.DataChange(remoting.Event{ - Path: k, - Action: remoting.EventTypeAdd, - Content: valueList[i], - }) - }</span> - - <span class="cov8" title="1">logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key) - go func(key string, listener remoting.DataListener) </span><span class="cov8" title="1">{ - l.ListenServiceNodeEventWithPrefix(key, listener) - logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) - }</span>(key, listener) - - <span class="cov8" title="1">logger.Infof("listen dubbo service key{%s}", key) - go func(key string) </span><span class="cov8" title="1">{ - if l.ListenServiceNodeEvent(key) </span><span class="cov0" title="0">{ - listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) - }</span> - <span class="cov0" title="0">logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key)</span> - }(key) -} - -// Close ... -func (l *EventListener) Close() <span class="cov0" title="0">{ - l.wg.Wait() -}</span> -</pre> - - </div> - </body> - <script> - (function() { - var files = document.getElementById('files'); - var visible; - files.addEventListener('change', onChange, false); - function select(part) { - if (visible) - visible.style.display = 'none'; - visible = document.getElementById(part); - if (!visible) - return; - files.value = part; - visible.style.display = 'block'; - location.hash = part; - } - function onChange() { - select(files.value); - window.scrollTo(0, 0); - } - if (location.hash != "") { - select(location.hash.substr(1)); - } - if (!visible) { - select("file0"); - } - })(); - </script> -</html> diff --git a/remoting/kubernetes/client_test.go b/remoting/kubernetes/client_test.go index 7345ddfbd5f00bb169c62d8d42148163a3298da2..aedd7ced83f47bf80efa09d194e97f16840ebbba 100644 --- a/remoting/kubernetes/client_test.go +++ b/remoting/kubernetes/client_test.go @@ -395,19 +395,22 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() { go func() { - defer wg.Done() - wc, err := client.WatchWithPrefix(prefix) if err != nil { t.Fatal(err) } + wg.Done() + for e := range wc { t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) } }() + // must wait the watch goroutine work + wg.Wait() + for _, tc := range tests { k := tc.input.k @@ -419,7 +422,6 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() { } client.Close() - wg.Wait() } func (s *KubernetesClientTestSuite) TestNewClient() { @@ -444,12 +446,11 @@ func (s *KubernetesClientTestSuite) TestClientWatch() { go func() { - defer wg.Done() - wc, err := client.Watch(prefix) if err != nil { t.Fatal(err) } + wg.Done() for e := range wc { t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) @@ -457,6 +458,9 @@ func (s *KubernetesClientTestSuite) TestClientWatch() { }() + // must wait the watch goroutine already start the watch goroutine + wg.Wait() + for _, tc := range tests { k := tc.input.k @@ -468,7 +472,6 @@ func (s *KubernetesClientTestSuite) TestClientWatch() { } client.Close() - wg.Wait() } func TestKubernetesClient(t *testing.T) {