Skip to content
Snippets Groups Projects
Commit dfa8267f authored by scott's avatar scott
Browse files

sync watch unit goroutine

parent 3349096c
No related branches found
No related tags found
No related merge requests found
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<style>
body {
background: black;
color: rgb(80, 80, 80);
}
body, pre, #legend span {
font-family: Menlo, monospace;
font-weight: bold;
}
#topbar {
background: black;
position: fixed;
top: 0; left: 0; right: 0;
height: 42px;
border-bottom: 1px solid rgb(80, 80, 80);
}
#content {
margin-top: 50px;
}
#nav, #legend {
float: left;
margin-left: 10px;
}
#legend {
margin-top: 12px;
}
#nav {
margin-top: 10px;
}
#legend span {
margin: 0 5px;
}
.cov0 { color: rgb(192, 0, 0) }
.cov1 { color: rgb(128, 128, 128) }
.cov2 { color: rgb(116, 140, 131) }
.cov3 { color: rgb(104, 152, 134) }
.cov4 { color: rgb(92, 164, 137) }
.cov5 { color: rgb(80, 176, 140) }
.cov6 { color: rgb(68, 188, 143) }
.cov7 { color: rgb(56, 200, 146) }
.cov8 { color: rgb(44, 212, 149) }
.cov9 { color: rgb(32, 224, 152) }
.cov10 { color: rgb(20, 236, 155) }
</style>
</head>
<body>
<div id="topbar">
<div id="nav">
<select id="files">
<option value="file0">github.com/apache/dubbo-go/remoting/etcdv3/client.go (65.0%)</option>
<option value="file1">github.com/apache/dubbo-go/remoting/etcdv3/facade.go (0.0%)</option>
<option value="file2">github.com/apache/dubbo-go/remoting/etcdv3/listener.go (57.3%)</option>
</select>
</div>
<div id="legend">
<span>not tracked</span>
<span class="cov0">not covered</span>
<span class="cov8">covered</span>
</div>
</div>
<div id="content">
<pre class="file" id="file0" style="display: none">/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package etcdv3
import (
"context"
"path"
"sync"
"time"
)
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
perrors "github.com/pkg/errors"
"google.golang.org/grpc"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
const (
// ConnDelay connection dalay
ConnDelay = 3
// MaxFailTimes max failure times
MaxFailTimes = 15
// RegistryETCDV3Client client name
RegistryETCDV3Client = "etcd registry"
)
var (
// ErrNilETCDV3Client ...
ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR
// ErrKVPairNotFound ...
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
// Options ...
type Options struct {
name string
endpoints []string
client *Client
timeout time.Duration
heartbeat int // heartbeat second
}
// Option ...
type Option func(*Options)
// WithEndpoints ...
func WithEndpoints(endpoints ...string) Option <span class="cov0" title="0">{
return func(opt *Options) </span><span class="cov0" title="0">{
opt.endpoints = endpoints
}</span>
}
// WithName ...
func WithName(name string) Option <span class="cov0" title="0">{
return func(opt *Options) </span><span class="cov0" title="0">{
opt.name = name
}</span>
}
// WithTimeout ...
func WithTimeout(timeout time.Duration) Option <span class="cov0" title="0">{
return func(opt *Options) </span><span class="cov0" title="0">{
opt.timeout = timeout
}</span>
}
// WithHeartbeat ...
func WithHeartbeat(heartbeat int) Option <span class="cov0" title="0">{
return func(opt *Options) </span><span class="cov0" title="0">{
opt.heartbeat = heartbeat
}</span>
}
// ValidateClient ...
func ValidateClient(container clientFacade, opts ...Option) error <span class="cov0" title="0">{
options := &amp;Options{
heartbeat: 1, // default heartbeat
}
for _, opt := range opts </span><span class="cov0" title="0">{
opt(options)
}</span>
<span class="cov0" title="0">lock := container.ClientLock()
lock.Lock()
defer lock.Unlock()
// new Client
if container.Client() == nil </span><span class="cov0" title="0">{
newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat)
if err != nil </span><span class="cov0" title="0">{
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
}</span>
<span class="cov0" title="0">container.SetClient(newClient)</span>
}
// Client lose connection with etcd server
<span class="cov0" title="0">if container.Client().rawClient == nil </span><span class="cov0" title="0">{
newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat)
if err != nil </span><span class="cov0" title="0">{
logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}",
options.name, options.endpoints, options.timeout, err)
return perrors.WithMessagef(err, "new client (address:%+v)", options.endpoints)
}</span>
<span class="cov0" title="0">container.SetClient(newClient)</span>
}
<span class="cov0" title="0">return nil</span>
}
// Client ...
type Client struct {
lock sync.RWMutex
// these properties are only set once when they are started.
name string
endpoints []string
timeout time.Duration
heartbeat int
ctx context.Context // if etcd server connection lose, the ctx.Done will be sent msg
cancel context.CancelFunc // cancel the ctx, all watcher will stopped
rawClient *clientv3.Client
exit chan struct{}
Wait sync.WaitGroup
}
func newClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) <span class="cov8" title="1">{
ctx, cancel := context.WithCancel(context.Background())
rawClient, err := clientv3.New(clientv3.Config{
Context: ctx,
Endpoints: endpoints,
DialTimeout: timeout,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil </span><span class="cov0" title="0">{
return nil, perrors.WithMessage(err, "new raw client block connect to server")
}</span>
<span class="cov8" title="1">c := &amp;Client{
name: name,
timeout: timeout,
endpoints: endpoints,
heartbeat: heartbeat,
ctx: ctx,
cancel: cancel,
rawClient: rawClient,
exit: make(chan struct{}),
}
if err := c.maintenanceStatus(); err != nil </span><span class="cov0" title="0">{
return nil, perrors.WithMessage(err, "client maintenance status")
}</span>
<span class="cov8" title="1">return c, nil</span>
}
// NOTICE: need to get the lock before calling this method
func (c *Client) clean() <span class="cov8" title="1">{
// close raw client
c.rawClient.Close()
// cancel ctx for raw client
c.cancel()
// clean raw client
c.rawClient = nil
}</span>
func (c *Client) stop() bool <span class="cov8" title="1">{
select </span>{
case &lt;-c.exit:<span class="cov0" title="0">
return true</span>
default:<span class="cov8" title="1">
close(c.exit)</span>
}
<span class="cov8" title="1">return false</span>
}
// Close ...
func (c *Client) Close() <span class="cov8" title="1">{
if c == nil </span><span class="cov0" title="0">{
return
}</span>
// stop the client
<span class="cov8" title="1">c.stop()
// wait client maintenance status stop
c.Wait.Wait()
c.lock.Lock()
if c.rawClient != nil </span><span class="cov8" title="1">{
c.clean()
}</span>
<span class="cov8" title="1">c.lock.Unlock()
logger.Warnf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints)</span>
}
func (c *Client) maintenanceStatus() error <span class="cov8" title="1">{
s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat))
if err != nil </span><span class="cov0" title="0">{
return perrors.WithMessage(err, "new session with server")
}</span>
// must add wg before go maintenance status goroutine
<span class="cov8" title="1">c.Wait.Add(1)
go c.maintenanceStatusLoop(s)
return nil</span>
}
func (c *Client) maintenanceStatusLoop(s *concurrency.Session) <span class="cov8" title="1">{
defer func() </span><span class="cov8" title="1">{
c.Wait.Done()
logger.Infof("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name)
}</span>()
<span class="cov8" title="1">for </span><span class="cov8" title="1">{
select </span>{
case &lt;-c.Done():<span class="cov8" title="1">
// Client be stopped, will clean the client hold resources
return</span>
case &lt;-s.Done():<span class="cov0" title="0">
logger.Warn("etcd server stopped")
c.lock.Lock()
// when etcd server stopped, cancel ctx, stop all watchers
c.clean()
// when connection lose, stop client, trigger reconnect to etcd
c.stop()
c.lock.Unlock()
return</span>
}
}
}
// if k not exist will put k/v in etcd
// if k is already exist in etcd, return nil
func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error <span class="cov8" title="1">{
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil </span><span class="cov0" title="0">{
return ErrNilETCDV3Client
}</span>
<span class="cov8" title="1">_, err := c.rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.Version(k), "&lt;", 1)).
Then(clientv3.OpPut(k, v, opts...)).
Commit()
if err != nil </span><span class="cov0" title="0">{
return err
}</span>
<span class="cov8" title="1">return nil</span>
}
func (c *Client) delete(k string) error <span class="cov8" title="1">{
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil </span><span class="cov0" title="0">{
return ErrNilETCDV3Client
}</span>
<span class="cov8" title="1">_, err := c.rawClient.Delete(c.ctx, k)
if err != nil </span><span class="cov0" title="0">{
return err
}</span>
<span class="cov8" title="1">return nil</span>
}
func (c *Client) get(k string) (string, error) <span class="cov8" title="1">{
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil </span><span class="cov0" title="0">{
return "", ErrNilETCDV3Client
}</span>
<span class="cov8" title="1">resp, err := c.rawClient.Get(c.ctx, k)
if err != nil </span><span class="cov0" title="0">{
return "", err
}</span>
<span class="cov8" title="1">if len(resp.Kvs) == 0 </span><span class="cov8" title="1">{
return "", ErrKVPairNotFound
}</span>
<span class="cov8" title="1">return string(resp.Kvs[0].Value), nil</span>
}
// CleanKV ...
func (c *Client) CleanKV() error <span class="cov8" title="1">{
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil </span><span class="cov0" title="0">{
return ErrNilETCDV3Client
}</span>
<span class="cov8" title="1">_, err := c.rawClient.Delete(c.ctx, "", clientv3.WithPrefix())
if err != nil </span><span class="cov0" title="0">{
return err
}</span>
<span class="cov8" title="1">return nil</span>
}
func (c *Client) getChildren(k string) ([]string, []string, error) <span class="cov8" title="1">{
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil </span><span class="cov0" title="0">{
return nil, nil, ErrNilETCDV3Client
}</span>
<span class="cov8" title="1">resp, err := c.rawClient.Get(c.ctx, k, clientv3.WithPrefix())
if err != nil </span><span class="cov0" title="0">{
return nil, nil, err
}</span>
<span class="cov8" title="1">if len(resp.Kvs) == 0 </span><span class="cov8" title="1">{
return nil, nil, ErrKVPairNotFound
}</span>
<span class="cov8" title="1">var (
kList []string
vList []string
)
for _, kv := range resp.Kvs </span><span class="cov8" title="1">{
kList = append(kList, string(kv.Key))
vList = append(vList, string(kv.Value))
}</span>
<span class="cov8" title="1">return kList, vList, nil</span>
}
func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) <span class="cov8" title="1">{
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil </span><span class="cov0" title="0">{
return nil, ErrNilETCDV3Client
}</span>
<span class="cov8" title="1">return c.rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil</span>
}
func (c *Client) watch(k string) (clientv3.WatchChan, error) <span class="cov8" title="1">{
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil </span><span class="cov0" title="0">{
return nil, ErrNilETCDV3Client
}</span>
<span class="cov8" title="1">return c.rawClient.Watch(c.ctx, k), nil</span>
}
func (c *Client) keepAliveKV(k string, v string) error <span class="cov8" title="1">{
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil </span><span class="cov0" title="0">{
return ErrNilETCDV3Client
}</span>
<span class="cov8" title="1">lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds()))
if err != nil </span><span class="cov0" title="0">{
return perrors.WithMessage(err, "grant lease")
}</span>
<span class="cov8" title="1">keepAlive, err := c.rawClient.KeepAlive(c.ctx, lease.ID)
if err != nil || keepAlive == nil </span><span class="cov0" title="0">{
c.rawClient.Revoke(c.ctx, lease.ID)
return perrors.WithMessage(err, "keep alive lease")
}</span>
<span class="cov8" title="1">_, err = c.rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID))
if err != nil </span><span class="cov0" title="0">{
return perrors.WithMessage(err, "put k/v with lease")
}</span>
<span class="cov8" title="1">return nil</span>
}
// Done ...
func (c *Client) Done() &lt;-chan struct{} <span class="cov8" title="1">{
return c.exit
}</span>
// Valid ...
func (c *Client) Valid() bool <span class="cov8" title="1">{
select </span>{
case &lt;-c.exit:<span class="cov8" title="1">
return false</span>
default:<span class="cov8" title="1"></span>
}
<span class="cov8" title="1">c.lock.RLock()
if c.rawClient == nil </span><span class="cov0" title="0">{
c.lock.RUnlock()
return false
}</span>
<span class="cov8" title="1">c.lock.RUnlock()
return true</span>
}
// Create ...
func (c *Client) Create(k string, v string) error <span class="cov8" title="1">{
err := c.put(k, v)
if err != nil </span><span class="cov0" title="0">{
return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v)
}</span>
<span class="cov8" title="1">return nil</span>
}
// Delete ...
func (c *Client) Delete(k string) error <span class="cov8" title="1">{
err := c.delete(k)
if err != nil </span><span class="cov0" title="0">{
return perrors.WithMessagef(err, "delete k/v (key %s)", k)
}</span>
<span class="cov8" title="1">return nil</span>
}
// RegisterTemp ...
func (c *Client) RegisterTemp(basePath string, node string) (string, error) <span class="cov8" title="1">{
completeKey := path.Join(basePath, node)
err := c.keepAliveKV(completeKey, "")
if err != nil </span><span class="cov0" title="0">{
return "", perrors.WithMessagef(err, "keepalive kv (key %s)", completeKey)
}</span>
<span class="cov8" title="1">return completeKey, nil</span>
}
// GetChildrenKVList ...
func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) <span class="cov8" title="1">{
kList, vList, err := c.getChildren(k)
if err != nil </span><span class="cov0" title="0">{
return nil, nil, perrors.WithMessagef(err, "get key children (key %s)", k)
}</span>
<span class="cov8" title="1">return kList, vList, nil</span>
}
// Get ...
func (c *Client) Get(k string) (string, error) <span class="cov8" title="1">{
v, err := c.get(k)
if err != nil </span><span class="cov8" title="1">{
return "", perrors.WithMessagef(err, "get key value (key %s)", k)
}</span>
<span class="cov8" title="1">return v, nil</span>
}
// Watch ...
func (c *Client) Watch(k string) (clientv3.WatchChan, error) <span class="cov8" title="1">{
wc, err := c.watch(k)
if err != nil </span><span class="cov0" title="0">{
return nil, perrors.WithMessagef(err, "watch prefix (key %s)", k)
}</span>
<span class="cov8" title="1">return wc, nil</span>
}
// WatchWithPrefix ...
func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) <span class="cov8" title="1">{
wc, err := c.watchWithPrefix(prefix)
if err != nil </span><span class="cov0" title="0">{
return nil, perrors.WithMessagef(err, "watch prefix (key %s)", prefix)
}</span>
<span class="cov8" title="1">return wc, nil</span>
}
</pre>
<pre class="file" id="file1" style="display: none">/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package etcdv3
import (
"sync"
"time"
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
type clientFacade interface {
Client() *Client
SetClient(*Client)
ClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener &amp; etcd client container
Done() chan struct{} //for etcd client control
RestartCallBack() bool
common.Node
}
// HandleClientRestart ...
func HandleClientRestart(r clientFacade) <span class="cov0" title="0">{
var (
err error
failTimes int
)
defer r.WaitGroup().Done()
LOOP:
for </span><span class="cov0" title="0">{
select </span>{
case &lt;-r.Done():<span class="cov0" title="0">
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
break LOOP</span>
// re-register all services
case &lt;-r.Client().Done():<span class="cov0" title="0">
r.ClientLock().Lock()
clientName := RegistryETCDV3Client
timeout, _ := time.ParseDuration(r.GetUrl().GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
endpoint := r.GetUrl().Location
r.Client().Close()
r.SetClient(nil)
r.ClientLock().Unlock()
// try to connect to etcd,
failTimes = 0
for </span><span class="cov0" title="0">{
select </span>{
case &lt;-r.Done():<span class="cov0" title="0">
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
break LOOP</span>
case &lt;-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):<span class="cov0" title="0"></span> // avoid connect frequent
}
<span class="cov0" title="0">err = ValidateClient(
r,
WithName(clientName),
WithEndpoints(endpoint),
WithTimeout(timeout),
)
logger.Infof("ETCDV3ProviderRegistry.validateETCDV3Client(etcd Addr{%s}) = error{%#v}",
endpoint, perrors.WithStack(err))
if err == nil </span><span class="cov0" title="0">{
if r.RestartCallBack() </span><span class="cov0" title="0">{
break</span>
}
}
<span class="cov0" title="0">failTimes++
if MaxFailTimes &lt;= failTimes </span><span class="cov0" title="0">{
failTimes = MaxFailTimes
}</span>
}
}
}
}
</pre>
<pre class="file" id="file2" style="display: none">/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package etcdv3
import (
"sync"
"time"
)
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)
// EventListener ...
type EventListener struct {
client *Client
keyMapLock sync.Mutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
// NewEventListener ...
func NewEventListener(client *Client) *EventListener <span class="cov8" title="1">{
return &amp;EventListener{
client: client,
keyMap: make(map[string]struct{}),
}
}</span>
// ListenServiceNodeEvent Listen on a spec key
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool <span class="cov8" title="1">{
l.wg.Add(1)
defer l.wg.Done()
for </span><span class="cov8" title="1">{
wc, err := l.client.Watch(key)
if err != nil </span><span class="cov0" title="0">{
logger.Warnf("WatchExist{key:%s} = error{%v}", key, err)
return false
}</span>
<span class="cov8" title="1">select </span>{
// client stopped
case &lt;-l.client.Done():<span class="cov0" title="0">
logger.Warnf("etcd client stopped")
return false</span>
// client ctx stop
case &lt;-l.client.ctx.Done():<span class="cov0" title="0">
logger.Warnf("etcd client ctx cancel")
return false</span>
// handle etcd events
case e, ok := &lt;-wc:<span class="cov8" title="1">
if !ok </span><span class="cov0" title="0">{
logger.Warnf("etcd watch-chan closed")
return false
}</span>
<span class="cov8" title="1">if e.Err() != nil </span><span class="cov0" title="0">{
logger.Errorf("etcd watch ERR {err: %s}", e.Err())
continue</span>
}
<span class="cov8" title="1">for _, event := range e.Events </span><span class="cov8" title="1">{
if l.handleEvents(event, listener...) </span><span class="cov0" title="0">{
// if event is delete
return true
}</span>
}
}
}
<span class="cov0" title="0">return false</span>
}
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remoting.DataListener) bool <span class="cov8" title="1">{
logger.Infof("got a etcd event {type: %s, key: %s}", event.Type, event.Kv.Key)
switch event.Type </span>{
// the etcdv3 event just include PUT &amp;&amp; DELETE
case mvccpb.PUT:<span class="cov8" title="1">
for _, listener := range listeners </span><span class="cov8" title="1">{
switch event.IsCreate() </span>{
case true:<span class="cov8" title="1">
logger.Infof("etcd get event (key{%s}) = event{EventNodeDataCreated}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EventTypeAdd,
Content: string(event.Kv.Value),
})</span>
case false:<span class="cov0" title="0">
logger.Infof("etcd get event (key{%s}) = event{EventNodeDataChanged}", event.Kv.Key)
listener.DataChange(remoting.Event{
Path: string(event.Kv.Key),
Action: remoting.EventTypeUpdate,
Content: string(event.Kv.Value),
})</span>
}
}
<span class="cov8" title="1">return false</span>
case mvccpb.DELETE:<span class="cov0" title="0">
logger.Warnf("etcd get event (key{%s}) = event{EventNodeDeleted}", event.Kv.Key)
return true</span>
default:<span class="cov0" title="0">
return false</span>
}
<span class="cov0" title="0">panic("unreachable")</span>
}
// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) <span class="cov8" title="1">{
l.wg.Add(1)
defer l.wg.Done()
for </span><span class="cov8" title="1">{
wc, err := l.client.WatchWithPrefix(prefix)
if err != nil </span><span class="cov0" title="0">{
logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
}</span>
<span class="cov8" title="1">select </span>{
// client stopped
case &lt;-l.client.Done():<span class="cov0" title="0">
logger.Warnf("etcd client stopped")
return</span>
// client ctx stop
case &lt;-l.client.ctx.Done():<span class="cov0" title="0">
logger.Warnf("etcd client ctx cancel")
return</span>
// etcd event stream
case e, ok := &lt;-wc:<span class="cov8" title="1">
if !ok </span><span class="cov0" title="0">{
logger.Warnf("etcd watch-chan closed")
return
}</span>
<span class="cov8" title="1">if e.Err() != nil </span><span class="cov0" title="0">{
logger.Errorf("etcd watch ERR {err: %s}", e.Err())
continue</span>
}
<span class="cov8" title="1">for _, event := range e.Events </span><span class="cov8" title="1">{
l.handleEvents(event, listener...)
}</span>
}
}
}
func timeSecondDuration(sec int) time.Duration <span class="cov0" title="0">{
return time.Duration(sec) * time.Second
}</span>
// ListenServiceEvent is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
// registry.go:Listen -&gt; listenServiceEvent -&gt; listenDirEvent -&gt; ListenServiceNodeEvent
// |
// --------&gt; ListenServiceNodeEvent
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) <span class="cov8" title="1">{
l.keyMapLock.Lock()
_, ok := l.keyMap[key]
l.keyMapLock.Unlock()
if ok </span><span class="cov0" title="0">{
logger.Warnf("etcdv3 key %s has already been listened.", key)
return
}</span>
<span class="cov8" title="1">l.keyMapLock.Lock()
l.keyMap[key] = struct{}{}
l.keyMapLock.Unlock()
keyList, valueList, err := l.client.getChildren(key)
if err != nil </span><span class="cov8" title="1">{
logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children"))
}</span>
<span class="cov8" title="1">logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)
for i, k := range keyList </span><span class="cov0" title="0">{
logger.Infof("got children list key -&gt; %s", k)
listener.DataChange(remoting.Event{
Path: k,
Action: remoting.EventTypeAdd,
Content: valueList[i],
})
}</span>
<span class="cov8" title="1">logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)
go func(key string, listener remoting.DataListener) </span><span class="cov8" title="1">{
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
}</span>(key, listener)
<span class="cov8" title="1">logger.Infof("listen dubbo service key{%s}", key)
go func(key string) </span><span class="cov8" title="1">{
if l.ListenServiceNodeEvent(key) </span><span class="cov0" title="0">{
listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
}</span>
<span class="cov0" title="0">logger.Warnf("listenSelf(etcd key{%s}) goroutine exit now", key)</span>
}(key)
}
// Close ...
func (l *EventListener) Close() <span class="cov0" title="0">{
l.wg.Wait()
}</span>
</pre>
</div>
</body>
<script>
(function() {
var files = document.getElementById('files');
var visible;
files.addEventListener('change', onChange, false);
function select(part) {
if (visible)
visible.style.display = 'none';
visible = document.getElementById(part);
if (!visible)
return;
files.value = part;
visible.style.display = 'block';
location.hash = part;
}
function onChange() {
select(files.value);
window.scrollTo(0, 0);
}
if (location.hash != "") {
select(location.hash.substr(1));
}
if (!visible) {
select("file0");
}
})();
</script>
</html>
...@@ -395,19 +395,22 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() { ...@@ -395,19 +395,22 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() {
go func() { go func() {
defer wg.Done()
wc, err := client.WatchWithPrefix(prefix) wc, err := client.WatchWithPrefix(prefix)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
wg.Done()
for e := range wc { for e := range wc {
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
} }
}() }()
// must wait the watch goroutine work
wg.Wait()
for _, tc := range tests { for _, tc := range tests {
k := tc.input.k k := tc.input.k
...@@ -419,7 +422,6 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() { ...@@ -419,7 +422,6 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() {
} }
client.Close() client.Close()
wg.Wait()
} }
func (s *KubernetesClientTestSuite) TestNewClient() { func (s *KubernetesClientTestSuite) TestNewClient() {
...@@ -444,12 +446,11 @@ func (s *KubernetesClientTestSuite) TestClientWatch() { ...@@ -444,12 +446,11 @@ func (s *KubernetesClientTestSuite) TestClientWatch() {
go func() { go func() {
defer wg.Done()
wc, err := client.Watch(prefix) wc, err := client.Watch(prefix)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
wg.Done()
for e := range wc { for e := range wc {
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
...@@ -457,6 +458,9 @@ func (s *KubernetesClientTestSuite) TestClientWatch() { ...@@ -457,6 +458,9 @@ func (s *KubernetesClientTestSuite) TestClientWatch() {
}() }()
// must wait the watch goroutine already start the watch goroutine
wg.Wait()
for _, tc := range tests { for _, tc := range tests {
k := tc.input.k k := tc.input.k
...@@ -468,7 +472,6 @@ func (s *KubernetesClientTestSuite) TestClientWatch() { ...@@ -468,7 +472,6 @@ func (s *KubernetesClientTestSuite) TestClientWatch() {
} }
client.Close() client.Close()
wg.Wait()
} }
func TestKubernetesClient(t *testing.T) { func TestKubernetesClient(t *testing.T) {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment