diff --git a/remoting/etcdv3/coverage.html b/remoting/etcdv3/coverage.html new file mode 100644 index 0000000000000000000000000000000000000000..04a7ab97cadd39c2fa5aeeff6531069486ff038c --- /dev/null +++ b/remoting/etcdv3/coverage.html @@ -0,0 +1,970 @@ + +<!DOCTYPE html> +<html> + <head> + <meta http-equiv="Content-Type" content="text/html; charset=utf-8"> + <style> + body { + background: black; + color: rgb(80, 80, 80); + } + body, pre, #legend span { + font-family: Menlo, monospace; + font-weight: bold; + } + #topbar { + background: black; + position: fixed; + top: 0; left: 0; right: 0; + height: 42px; + border-bottom: 1px solid rgb(80, 80, 80); + } + #content { + margin-top: 50px; + } + #nav, #legend { + float: left; + margin-left: 10px; + } + #legend { + margin-top: 12px; + } + #nav { + margin-top: 10px; + } + #legend span { + margin: 0 5px; + } + .cov0 { color: rgb(192, 0, 0) } +.cov1 { color: rgb(128, 128, 128) } +.cov2 { color: rgb(116, 140, 131) } +.cov3 { color: rgb(104, 152, 134) } +.cov4 { color: rgb(92, 164, 137) } +.cov5 { color: rgb(80, 176, 140) } +.cov6 { color: rgb(68, 188, 143) } +.cov7 { color: rgb(56, 200, 146) } +.cov8 { color: rgb(44, 212, 149) } +.cov9 { color: rgb(32, 224, 152) } +.cov10 { color: rgb(20, 236, 155) } + + </style> + </head> + <body> + <div id="topbar"> + <div id="nav"> + <select id="files"> + + <option value="file0">github.com/apache/dubbo-go/remoting/etcdv3/client.go (65.0%)</option> + + <option value="file1">github.com/apache/dubbo-go/remoting/etcdv3/facade.go (0.0%)</option> + + <option value="file2">github.com/apache/dubbo-go/remoting/etcdv3/listener.go (57.3%)</option> + + </select> + </div> + <div id="legend"> + <span>not tracked</span> + + <span class="cov0">not covered</span> + <span class="cov8">covered</span> + + </div> + </div> + <div id="content"> + + <pre class="file" id="file0" style="display: none">/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package etcdv3 + +import ( + "context" + "path" + "sync" + "time" +) + +import ( + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + perrors "github.com/pkg/errors" + "google.golang.org/grpc" +) + +import ( + "github.com/apache/dubbo-go/common/logger" +) + +const ( + // ConnDelay connection dalay + ConnDelay = 3 + // MaxFailTimes max failure times + MaxFailTimes = 15 + // RegistryETCDV3Client client name + RegistryETCDV3Client = "etcd registry" +) + +var ( + // ErrNilETCDV3Client ... + ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR + // ErrKVPairNotFound ... + ErrKVPairNotFound = perrors.New("k/v pair not found") +) + +// Options ... +type Options struct { + name string + endpoints []string + client *Client + timeout time.Duration + heartbeat int // heartbeat second +} + +// Option ... +type Option func(*Options) + +// WithEndpoints ... +func WithEndpoints(endpoints ...string) Option <span class="cov0" title="0">{ + return func(opt *Options) </span><span class="cov0" title="0">{ + opt.endpoints = endpoints + }</span> +} + +// WithName ... +func WithName(name string) Option <span class="cov0" title="0">{ + return func(opt *Options) </span><span class="cov0" title="0">{ + opt.name = name + }</span> +} + +// WithTimeout ... +func WithTimeout(timeout time.Duration) Option <span class="cov0" title="0">{ + return func(opt *Options) </span><span class="cov0" title="0">{ + opt.timeout = timeout + }</span> +} + +// WithHeartbeat ... +func WithHeartbeat(heartbeat int) Option <span class="cov0" title="0">{ + return func(opt *Options) </span><span class="cov0" title="0">{ + opt.heartbeat = heartbeat + }</span> +} + +// ValidateClient ... +func ValidateClient(container clientFacade, opts ...Option) error <span class="cov0" title="0">{ + + options := &Options{ + heartbeat: 1, // default heartbeat + } + for _, opt := range opts </span><span class="cov0" title="0">{ + opt(options) + }</span> + + <span class="cov0" title="0">lock := container.ClientLock() + lock.Lock() + defer lock.Unlock() + + // new Client + if container.Client() == nil </span><span class="cov0" title="0">{ + newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat) + if err != nil </span><span class="cov0" title="0">{ + logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", + options.name, options.endpoints, options.timeout, err) + return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints) + }</span> + <span class="cov0" title="0">container.SetClient(newClient)</span> + } + + // Client lose connection with etcd server + <span class="cov0" title="0">if container.Client().rawClient == nil </span><span class="cov0" title="0">{ + + newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat) + if err != nil </span><span class="cov0" title="0">{ + logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", + options.name, options.endpoints, options.timeout, err) + return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints) + }</span> + <span class="cov0" title="0">container.SetClient(newClient)</span> + } + + <span class="cov0" title="0">return nil</span> +} + +// Client ... +type Client struct { + lock sync.RWMutex + + // these properties are only set once when they are started. + name string + endpoints []string + timeout time.Duration + heartbeat int + + ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg + cancel context.CancelFunc // cancel the ctx, all watcher will stopped + rawClient *clientv3.Client + + exit chan struct{} + Wait sync.WaitGroup +} + +func newClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) <span class="cov8" title="1">{ + + ctx, cancel := context.WithCancel(context.Background()) + rawClient, err := clientv3.New(clientv3.Config{ + Context: ctx, + Endpoints: endpoints, + DialTimeout: timeout, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + }) + if err != nil </span><span class="cov0" title="0">{ + return nil, perrors.WithMessage(err, "new raw client block connect to server") + }</span> + + <span class="cov8" title="1">c := &Client{ + + name: name, + timeout: timeout, + endpoints: endpoints, + heartbeat: heartbeat, + + ctx: ctx, + cancel: cancel, + rawClient: rawClient, + + exit: make(chan struct{}), + } + + if err := c.maintenanceStatus(); err != nil </span><span class="cov0" title="0">{ + return nil, perrors.WithMessage(err, "client maintenance status") + }</span> + <span class="cov8" title="1">return c, nil</span> +} + +// NOTICE: need to get the lock before calling this method +func (c *Client) clean() <span class="cov8" title="1">{ + + // close raw client + c.rawClient.Close() + + // cancel ctx for raw client + c.cancel() + + // clean raw client + c.rawClient = nil +}</span> + +func (c *Client) stop() bool <span class="cov8" title="1">{ + + select </span>{ + case <-c.exit:<span class="cov0" title="0"> + return true</span> + default:<span class="cov8" title="1"> + close(c.exit)</span> + } + <span class="cov8" title="1">return false</span> +} + +// Close ... +func (c *Client) Close() <span class="cov8" title="1">{ + + if c == nil </span><span class="cov0" title="0">{ + return + }</span> + + // stop the client + <span class="cov8" title="1">c.stop() + + // wait client maintenance status stop + c.Wait.Wait() + + c.lock.Lock() + if c.rawClient != nil </span><span class="cov8" title="1">{ + c.clean() + }</span> + <span class="cov8" title="1">c.lock.Unlock() + logger.Warnf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints)</span> +} + +func (c *Client) maintenanceStatus() error <span class="cov8" title="1">{ + + s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat)) + if err != nil </span><span class="cov0" title="0">{ + return perrors.WithMessage(err, "new session with server") + }</span> + + // must add wg before go maintenance status goroutine + <span class="cov8" title="1">c.Wait.Add(1) + go c.maintenanceStatusLoop(s) + return nil</span> +} + +func (c *Client) maintenanceStatusLoop(s *concurrency.Session) <span class="cov8" title="1">{ + + defer func() </span><span class="cov8" title="1">{ + c.Wait.Done() + logger.Infof("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name) + }</span>() + + <span class="cov8" title="1">for </span><span class="cov8" title="1">{ + select </span>{ + case <-c.Done():<span class="cov8" title="1"> + // Client be stopped, will clean the client hold resources + return</span> + case <-s.Done():<span class="cov0" title="0"> + logger.Warn("etcd server stopped") + c.lock.Lock() + // when etcd server stopped, cancel ctx, stop all watchers + c.clean() + // when connection lose, stop client, trigger reconnect to etcd + c.stop() + c.lock.Unlock() + return</span> + } + } +} + +// if k not exist will put k/v in etcd +// if k is already exist in etcd, return nil +func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error <span class="cov8" title="1">{ + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil </span><span class="cov0" title="0">{ + return ErrNilETCDV3Client + }</span> + + <span class="cov8" title="1">_, err := c.rawClient.Txn(c.ctx). + If(clientv3.Compare(clientv3.Version(k), "<", 1)). + Then(clientv3.OpPut(k, v, opts...)). + Commit() + if err != nil </span><span class="cov0" title="0">{ + return err + + }</span> + <span class="cov8" title="1">return nil</span> +} + +func (c *Client) delete(k string) error <span class="cov8" title="1">{ + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil </span><span class="cov0" title="0">{ + return ErrNilETCDV3Client + }</span> + + <span class="cov8" title="1">_, err := c.rawClient.Delete(c.ctx, k) + if err != nil </span><span class="cov0" title="0">{ + return err + + }</span> + <span class="cov8" title="1">return nil</span> +} + +func (c *Client) get(k string) (string, error) <span class="cov8" title="1">{ + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil </span><span class="cov0" title="0">{ + return "", ErrNilETCDV3Client + }</span> + + <span class="cov8" title="1">resp, err := c.rawClient.Get(c.ctx, k) + if err != nil </span><span class="cov0" title="0">{ + return "", err + }</span> + + <span class="cov8" title="1">if len(resp.Kvs) == 0 </span><span class="cov8" title="1">{ + return "", ErrKVPairNotFound + }</span> + + <span class="cov8" title="1">return string(resp.Kvs[0].Value), nil</span> +} + +// CleanKV ... +func (c *Client) CleanKV() error <span class="cov8" title="1">{ + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil </span><span class="cov0" title="0">{ + return ErrNilETCDV3Client + }</span> + + <span class="cov8" title="1">_, err := c.rawClient.Delete(c.ctx, "", clientv3.WithPrefix()) + if err != nil </span><span class="cov0" title="0">{ + return err + }</span> + + <span class="cov8" title="1">return nil</span> +} + +func (c *Client) getChildren(k string) ([]string, []string, error) <span class="cov8" title="1">{ + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil </span><span class="cov0" title="0">{ + return nil, nil, ErrNilETCDV3Client + }</span> + + <span class="cov8" title="1">resp, err := c.rawClient.Get(c.ctx, k, clientv3.WithPrefix()) + if err != nil </span><span class="cov0" title="0">{ + return nil, nil, err + }</span> + + <span class="cov8" title="1">if len(resp.Kvs) == 0 </span><span class="cov8" title="1">{ + return nil, nil, ErrKVPairNotFound + }</span> + + <span class="cov8" title="1">var ( + kList []string + vList []string + ) + + for _, kv := range resp.Kvs </span><span class="cov8" title="1">{ + kList = append(kList, string(kv.Key)) + vList = append(vList, string(kv.Value)) + }</span> + + <span class="cov8" title="1">return kList, vList, nil</span> +} + +func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) <span class="cov8" title="1">{ + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil </span><span class="cov0" title="0">{ + return nil, ErrNilETCDV3Client + }</span> + + <span class="cov8" title="1">return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil</span> +} + +func (c *Client) watch(k string) (clientv3.WatchChan, error) <span class="cov8" title="1">{ + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil </span><span class="cov0" title="0">{ + return nil, ErrNilETCDV3Client + }</span> + + <span class="cov8" title="1">return c.rawClient.Watch(c.ctx, k), nil</span> +} + +func (c *Client) keepAliveKV(k string, v string) error <span class="cov8" title="1">{ + + c.lock.RLock() + defer c.lock.RUnlock() + + if c.rawClient == nil </span><span class="cov0" title="0">{ + return ErrNilETCDV3Client + }</span> + + <span class="cov8" title="1">lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds())) + if err != nil </span><span class="cov0" title="0">{ + return perrors.WithMessage(err, "grant lease") + }</span> + + <span class="cov8" title="1">keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID) + if err != nil || keepAlive == nil </span><span class="cov0" title="0">{ + c.rawClient.Revoke(c.ctx, lease.ID) + return perrors.WithMessage(err, "keep alive lease") + }</span> + + <span class="cov8" title="1">_, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID)) + if err != nil </span><span class="cov0" title="0">{ + return perrors.WithMessage(err, "put k/v with lease") + }</span> + <span class="cov8" title="1">return nil</span> +} + +// Done ... +func (c *Client) Done() <-chan struct{} <span class="cov8" title="1">{ + return c.exit +}</span> + +// Valid ... +func (c *Client) Valid() bool <span class="cov8" title="1">{ + select </span>{ + case <-c.exit:<span class="cov8" title="1"> + return false</span> + default:<span class="cov8" title="1"></span> + } + + <span class="cov8" title="1">c.lock.RLock() + if c.rawClient == nil </span><span class="cov0" title="0">{ + c.lock.RUnlock() + return false + }</span> + <span class="cov8" title="1">c.lock.RUnlock() + return true</span> +} + +// Create ... +func (c *Client) Create(k string, v string) error <span class="cov8" title="1">{ + + err := c.put(k, v) + if err != nil </span><span class="cov0" title="0">{ + return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v) + }</span> + <span class="cov8" title="1">return nil</span> +} + +// Delete ... +func (c *Client) Delete(k string) error <span class="cov8" title="1">{ + + err := c.delete(k) + if err != nil </span><span class="cov0" title="0">{ + return perrors.WithMessagef(err, "delete k/v (key %s)", k) + }</span> + + <span class="cov8" title="1">return nil</span> +} + +// RegisterTemp ... +func (c *Client) RegisterTemp(basePath string, node string) (string, error) <span class="cov8" title="1">{ + + completeKey := path.Join(basePath, node) + + err := c.keepAliveKV(completeKey, "") + if err != nil </span><span class="cov0" title="0">{ + return "", perrors.WithMessagef(err, "keepalive kv (key %s)", completeKey) + }</span> + + <span class="cov8" title="1">return completeKey, nil</span> +} + +// GetChildrenKVList ... +func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) <span class="cov8" title="1">{ + + kList, vList, err := c.getChildren(k) + if err != nil </span><span class="cov0" title="0">{ + return nil, nil, perrors.WithMessagef(err, "get key children (key %s)", k) + }</span> + <span class="cov8" title="1">return kList, vList, nil</span> +} + +// Get ... +func (c *Client) Get(k string) (string, error) <span class="cov8" title="1">{ + + v, err := c.get(k) + if err != nil </span><span class="cov8" title="1">{ + return "", perrors.WithMessagef(err, "get key value (key %s)", k) + }</span> + + <span class="cov8" title="1">return v, nil</span> +} + +// Watch ... +func (c *Client) Watch(k string) (clientv3.WatchChan, error) <span class="cov8" title="1">{ + + wc, err := c.watch(k) + if err != nil </span><span class="cov0" title="0">{ + return nil, perrors.WithMessagef(err, "watch prefix (key %s)", k) + }</span> + <span class="cov8" title="1">return wc, nil</span> +} + +// WatchWithPrefix ... +func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) <span class="cov8" title="1">{ + + wc, err := c.watchWithPrefix(prefix) + if err != nil </span><span class="cov0" title="0">{ + return nil, perrors.WithMessagef(err, "watch prefix (key %s)", prefix) + }</span> + <span class="cov8" title="1">return wc, nil</span> +} +</pre> + + <pre class="file" id="file1" style="display: none">/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package etcdv3 + +import ( + "sync" + "time" +) + +import ( + "github.com/dubbogo/getty" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" +) + +type clientFacade interface { + Client() *Client + SetClient(*Client) + ClientLock() *sync.Mutex + WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container + Done() chan struct{} //for etcd client control + RestartCallBack() bool + common.Node +} + +// HandleClientRestart ... +func HandleClientRestart(r clientFacade) <span class="cov0" title="0">{ + + var ( + err error + failTimes int + ) + + defer r.WaitGroup().Done() +LOOP: + for </span><span class="cov0" title="0">{ + select </span>{ + case <-r.Done():<span class="cov0" title="0"> + logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...") + break LOOP</span> + // re-register all services + case <-r.Client().Done():<span class="cov0" title="0"> + r.ClientLock().Lock() + clientName := RegistryETCDV3Client + timeout, _ := time.ParseDuration(r.GetUrl().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + endpoint := r.GetUrl().Location + r.Client().Close() + r.SetClient(nil) + r.ClientLock().Unlock() + + // try to connect to etcd, + failTimes = 0 + for </span><span class="cov0" title="0">{ + select </span>{ + case <-r.Done():<span class="cov0" title="0"> + logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...") + break LOOP</span> + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):<span class="cov0" title="0"></span> // avoid connect frequent + } + <span class="cov0" title="0">err = ValidateClient( + r, + WithName(clientName), + WithEndpoints(endpoint), + WithTimeout(timeout), + ) + logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}", + endpoint, perrors.WithStack(err)) + if err == nil </span><span class="cov0" title="0">{ + if r.RestartCallBack() </span><span class="cov0" title="0">{ + break</span> + } + } + <span class="cov0" title="0">failTimes++ + if MaxFailTimes <= failTimes </span><span class="cov0" title="0">{ + failTimes = MaxFailTimes + }</span> + } + } + } +} +</pre> + + <pre class="file" id="file2" style="display: none">/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package etcdv3 + +import ( + "sync" + "time" +) + +import ( + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/remoting" +) + +// EventListener ... +type EventListener struct { + client *Client + keyMapLock sync.Mutex + keyMap map[string]struct{} + wg sync.WaitGroup +} + +// NewEventListener ... +func NewEventListener(client *Client) *EventListener <span class="cov8" title="1">{ + return &EventListener{ + client: client, + keyMap: make(map[string]struct{}), + } +}</span> + +// ListenServiceNodeEvent Listen on a spec key +// this method will return true when spec key deleted, +// this method will return false when deep layer connection lose +func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool <span class="cov8" title="1">{ + l.wg.Add(1) + defer l.wg.Done() + for </span><span class="cov8" title="1">{ + wc, err := l.client.Watch(key) + if err != nil </span><span class="cov0" title="0">{ + logger.Warnf("WatchExist{key:%s} = error{%v}", key, err) + return false + }</span> + + <span class="cov8" title="1">select </span>{ + + // client stopped + case <-l.client.Done():<span class="cov0" title="0"> + logger.Warnf("etcd client stopped") + return false</span> + + // client ctx stop + case <-l.client.ctx.Done():<span class="cov0" title="0"> + logger.Warnf("etcd client ctx cancel") + return false</span> + + // handle etcd events + case e, ok := <-wc:<span class="cov8" title="1"> + if !ok </span><span class="cov0" title="0">{ + logger.Warnf("etcd watch-chan closed") + return false + }</span> + + <span class="cov8" title="1">if e.Err() != nil </span><span class="cov0" title="0">{ + logger.Errorf("etcd watch ERR {err: %s}", e.Err()) + continue</span> + } + <span class="cov8" title="1">for _, event := range e.Events </span><span class="cov8" title="1">{ + if l.handleEvents(event, listener...) </span><span class="cov0" title="0">{ + // if event is delete + return true + }</span> + } + } + } + + <span class="cov0" title="0">return false</span> +} + +// return true mean the event type is DELETE +// return false mean the event type is CREATE || UPDATE +func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool <span class="cov8" title="1">{ + + logger.Infof("got a etcd event {type: %s, key: %s}", event.Type, event.Kv.Key) + + switch event.Type </span>{ + // the etcdv3 event just include PUT && DELETE + case mvccpb.PUT:<span class="cov8" title="1"> + for _, listener := range listeners </span><span class="cov8" title="1">{ + switch event.IsCreate() </span>{ + case true:<span class="cov8" title="1"> + logger.Infof("etcd get event (key{%s}) = event{EventNodeDataCreated}", event.Kv.Key) + listener.DataChange(remoting.Event{ + Path: string(event.Kv.Key), + Action: remoting.EventTypeAdd, + Content: string(event.Kv.Value), + })</span> + case false:<span class="cov0" title="0"> + logger.Infof("etcd get event (key{%s}) = event{EventNodeDataChanged}", event.Kv.Key) + listener.DataChange(remoting.Event{ + Path: string(event.Kv.Key), + Action: remoting.EventTypeUpdate, + Content: string(event.Kv.Value), + })</span> + } + } + <span class="cov8" title="1">return false</span> + case mvccpb.DELETE:<span class="cov0" title="0"> + logger.Warnf("etcd get event (key{%s}) = event{EventNodeDeleted}", event.Kv.Key) + return true</span> + + default:<span class="cov0" title="0"> + return false</span> + } + + <span class="cov0" title="0">panic("unreachable")</span> +} + +// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix +func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) <span class="cov8" title="1">{ + + l.wg.Add(1) + defer l.wg.Done() + for </span><span class="cov8" title="1">{ + wc, err := l.client.WatchWithPrefix(prefix) + if err != nil </span><span class="cov0" title="0">{ + logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err) + }</span> + + <span class="cov8" title="1">select </span>{ + + // client stopped + case <-l.client.Done():<span class="cov0" title="0"> + logger.Warnf("etcd client stopped") + return</span> + + // client ctx stop + case <-l.client.ctx.Done():<span class="cov0" title="0"> + logger.Warnf("etcd client ctx cancel") + return</span> + + // etcd event stream + case e, ok := <-wc:<span class="cov8" title="1"> + + if !ok </span><span class="cov0" title="0">{ + logger.Warnf("etcd watch-chan closed") + return + }</span> + + <span class="cov8" title="1">if e.Err() != nil </span><span class="cov0" title="0">{ + logger.Errorf("etcd watch ERR {err: %s}", e.Err()) + continue</span> + } + <span class="cov8" title="1">for _, event := range e.Events </span><span class="cov8" title="1">{ + l.handleEvents(event, listener...) + }</span> + } + } +} + +func timeSecondDuration(sec int) time.Duration <span class="cov0" title="0">{ + return time.Duration(sec) * time.Second +}</span> + +// ListenServiceEvent is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener +// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent +// | +// --------> ListenServiceNodeEvent +func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) <span class="cov8" title="1">{ + + l.keyMapLock.Lock() + _, ok := l.keyMap[key] + l.keyMapLock.Unlock() + if ok </span><span class="cov0" title="0">{ + logger.Warnf("etcdv3 key %s has already been listened.", key) + return + }</span> + + <span class="cov8" title="1">l.keyMapLock.Lock() + l.keyMap[key] = struct{}{} + l.keyMapLock.Unlock() + + keyList, valueList, err := l.client.getChildren(key) + if err != nil </span><span class="cov8" title="1">{ + logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children")) + }</span> + + <span class="cov8" title="1">logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList) + + for i, k := range keyList </span><span class="cov0" title="0">{ + logger.Infof("got children list key -> %s", k) + listener.DataChange(remoting.Event{ + Path: k, + Action: remoting.EventTypeAdd, + Content: valueList[i], + }) + }</span> + + <span class="cov8" title="1">logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key) + go func(key string, listener remoting.DataListener) </span><span class="cov8" title="1">{ + l.ListenServiceNodeEventWithPrefix(key, listener) + logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) + }</span>(key, listener) + + <span class="cov8" title="1">logger.Infof("listen dubbo service key{%s}", key) + go func(key string) </span><span class="cov8" title="1">{ + if l.ListenServiceNodeEvent(key) </span><span class="cov0" title="0">{ + listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) + }</span> + <span class="cov0" title="0">logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key)</span> + }(key) +} + +// Close ... +func (l *EventListener) Close() <span class="cov0" title="0">{ + l.wg.Wait() +}</span> +</pre> + + </div> + </body> + <script> + (function() { + var files = document.getElementById('files'); + var visible; + files.addEventListener('change', onChange, false); + function select(part) { + if (visible) + visible.style.display = 'none'; + visible = document.getElementById(part); + if (!visible) + return; + files.value = part; + visible.style.display = 'block'; + location.hash = part; + } + function onChange() { + select(files.value); + window.scrollTo(0, 0); + } + if (location.hash != "") { + select(location.hash.substr(1)); + } + if (!visible) { + select("file0"); + } + })(); + </script> +</html> diff --git a/remoting/kubernetes/client_test.go b/remoting/kubernetes/client_test.go index 08bc96794a0670f38f23738938850215c29e4859..7345ddfbd5f00bb169c62d8d42148163a3298da2 100644 --- a/remoting/kubernetes/client_test.go +++ b/remoting/kubernetes/client_test.go @@ -236,6 +236,32 @@ func (s *KubernetesClientTestSuite) SetupSuite() { } } +func (s *KubernetesClientTestSuite) TestReadCurrentPodName() { + t := s.T() + + n, err := getCurrentPodName() + if err != nil { + t.Fatal(err) + } + + if n != s.currentPod.GetName() { + t.Fatalf("expect %s but got %s", s.currentPod.GetName(), n) + } + +} +func (s *KubernetesClientTestSuite) TestReadCurrentNameSpace() { + t := s.T() + + ns, err := getCurrentNameSpace() + if err != nil { + t.Fatal(err) + } + + if ns != s.currentPod.GetNamespace() { + t.Fatalf("expect %s but got %s", s.currentPod.GetNamespace(), ns) + } + +} func (s *KubernetesClientTestSuite) TestClientValid() { t := s.T() @@ -358,7 +384,7 @@ func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() { } -func (s *KubernetesClientTestSuite) TestClientWatch() { +func (s *KubernetesClientTestSuite) TestClientWatchPrefix() { t := s.T() @@ -396,6 +422,55 @@ func (s *KubernetesClientTestSuite) TestClientWatch() { wg.Wait() } +func (s *KubernetesClientTestSuite) TestNewClient() { + + t := s.T() + + _, err := newClient(s.currentPod.GetNamespace()) + if err == nil { + t.Fatal("the out of cluster test should fail") + } + +} + +func (s *KubernetesClientTestSuite) TestClientWatch() { + + t := s.T() + + client := s.initClient() + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + + defer wg.Done() + + wc, err := client.Watch(prefix) + if err != nil { + t.Fatal(err) + } + + for e := range wc { + t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) + } + + }() + + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + + if err := client.Create(k, v); err != nil { + t.Fatal(err) + } + } + + client.Close() + wg.Wait() +} + func TestKubernetesClient(t *testing.T) { suite.Run(t, new(KubernetesClientTestSuite)) } diff --git a/remoting/kubernetes/facade_test.go b/remoting/kubernetes/facade_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cb9f92723fe59e07fa52cd4760aa3ae06887a885 --- /dev/null +++ b/remoting/kubernetes/facade_test.go @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kubernetes + +import ( + "sync" +) +import ( + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) +import ( + "github.com/apache/dubbo-go/common" +) + +type mockFacade struct { + client *Client + cltLock sync.Mutex + wg sync.WaitGroup + URL *common.URL + done chan struct{} +} + +func (r *mockFacade) Client() *Client { + return r.client +} + +func (r *mockFacade) SetClient(client *Client) { + r.client = client +} + +func (r *mockFacade) ClientLock() *sync.Mutex { + return &r.cltLock +} + +func (r *mockFacade) WaitGroup() *sync.WaitGroup { + return &r.wg +} + +func (r *mockFacade) Done() chan struct{} { + return r.done +} + +func (r *mockFacade) GetUrl() common.URL { + return *r.URL +} + +func (r *mockFacade) Destroy() { + close(r.done) + r.wg.Wait() +} + +func (r *mockFacade) RestartCallBack() bool { + return true +} + +func (r *mockFacade) IsAvailable() bool { + return true +} + +func (s *KubernetesClientTestSuite) Test_Facade() { + + t := s.T() + + mockClient, err := newMockClient(s.currentPod.GetNamespace(), func() (kubernetes.Interface, error) { + + out := fake.NewSimpleClientset() + + // mock current pod + if _, err := out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil { + t.Fatal(err) + } + return out, nil + }) + if err != nil { + t.Fatal(err) + } + + url, _ := common.NewURL("mock://127.0.0.1") + m := &mockFacade{ + client: mockClient, + URL: &url, + } + + go HandleClientRestart(m) + mockClient.Close() +} diff --git a/remoting/kubernetes/listener_test.go b/remoting/kubernetes/listener_test.go index 2b6883a076d3ec7890bb87f764755e61104e6063..a9446782a5c336268c3d6418e5882031d1566ae8 100644 --- a/remoting/kubernetes/listener_test.go +++ b/remoting/kubernetes/listener_test.go @@ -17,10 +17,40 @@ package kubernetes +import ( + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + import ( "github.com/apache/dubbo-go/remoting" ) +var changedData = ` + dubbo.consumer.request_timeout=3s + dubbo.consumer.connect_timeout=5s + dubbo.application.organization=ikurento.com + dubbo.application.name=BDTService + dubbo.application.module=dubbogo user-info server + dubbo.application.version=0.0.1 + dubbo.application.owner=ZX + dubbo.application.environment=dev + dubbo.registries.hangzhouzk.protocol=zookeeper + dubbo.registries.hangzhouzk.timeout=3s + dubbo.registries.hangzhouzk.address=127.0.0.1:2181 + dubbo.registries.shanghaizk.protocol=zookeeper + dubbo.registries.shanghaizk.timeout=3s + dubbo.registries.shanghaizk.address=127.0.0.1:2182 + dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo + dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider + dubbo.service.com.ikurento.user.UserProvider.loadbalance=random + dubbo.service.com.ikurento.user.UserProvider.warmup=100 + dubbo.service.com.ikurento.user.UserProvider.cluster=failover +` + type mockDataListener struct { eventList []remoting.Event client *Client @@ -36,3 +66,41 @@ func (m *mockDataListener) DataChange(eventType remoting.Event) bool { } return true } + +func (s *KubernetesClientTestSuite) TestListener() { + + t := s.T() + + var tests = []struct { + input struct { + k string + v string + } + }{ + {input: struct { + k string + v string + }{k: "/dubbo", v: changedData}}, + } + + c := s.initClient() + defer c.Close() + + listener := NewEventListener(c) + dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)} + listener.ListenServiceEvent("/dubbo", dataListener) + + // NOTICE: direct listen will lose create msg + time.Sleep(time.Second) + for _, tc := range tests { + + k := tc.input.k + v := tc.input.v + if err := c.Create(k, v); err != nil { + t.Fatal(err) + } + + } + msg := <-dataListener.rc + assert.Equal(t, changedData, msg.Content) +}