Skip to content
Snippets Groups Projects
Commit 31642878 authored by scott.wang's avatar scott.wang
Browse files

Basic function comleted, wait for refactor

parent 0497d27c
No related branches found
No related tags found
No related merge requests found
......@@ -2,12 +2,12 @@ package etcdv3
import (
"context"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
"github.com/juju/errors"
"strings"
)
type dataListener struct {
......@@ -24,14 +24,17 @@ func (l *dataListener) AddInterestedURL(url *common.URL) {
}
func (l *dataListener) DataChange(eventType remoting.Event) bool {
serviceURL, err := common.NewURL(context.TODO(), eventType.Content)
url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
serviceURL, err := common.NewURL(context.TODO(), url)
if err != nil {
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Content, err)
logger.Errorf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&remoting.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
l.listener.Process(&remoting.ConfigChangeEvent{Key: eventType.Path,Value: serviceURL, ConfigType: eventType.Action})
return true
}
}
......@@ -59,7 +62,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
return nil, errors.New("listener stopped")
case e := <-l.events:
logger.Debugf("got etcd event %s", e)
logger.Warnf("got etcd event %#s", e)
if e.ConfigType == remoting.EventTypeDel {
select {
case <-l.registry.done:
......
......@@ -46,8 +46,6 @@ type etcdV3Registry struct {
dataListener *dataListener
configListener *configurationListener
servicesCache sync.Map // service name + protocol -> service config
wg sync.WaitGroup // wg+done for zk restart
done chan struct{}
}
......@@ -104,6 +102,7 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
services: make(map[string]common.URL),
}
if err := etcdv3.ValidateClient(r, etcdv3.WithName(etcdv3.RegistryETCDV3Client)); err != nil {
......@@ -136,6 +135,9 @@ func (r *etcdV3Registry) IsAvailable() bool {
func (r *etcdV3Registry) Destroy() {
logger.Warn("destory be call")
if r.configListener != nil {
r.configListener.Close()
}
......@@ -150,10 +152,10 @@ func (r *etcdV3Registry) stop() {
r.client.Close()
r.client = nil
r.servicesCache.Range(func(key, value interface{}) bool {
r.servicesCache.Delete(key)
return true
}) // empty service catalog
r.cltLock.Lock()
r.services = nil
r.cltLock.Unlock()
}
func (r *etcdV3Registry) Register(svc common.URL) error {
......@@ -163,9 +165,11 @@ func (r *etcdV3Registry) Register(svc common.URL) error {
return errors.Annotate(err, "get registry role")
}
if _, ok := r.servicesCache.Load(svc.Key()); ok {
r.cltLock.Lock()
if _, ok := r.services[svc.Key()]; ok {
return errors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path))
}
r.cltLock.Unlock()
switch role {
case common.PROVIDER:
......@@ -182,7 +186,9 @@ func (r *etcdV3Registry) Register(svc common.URL) error {
return errors.New(fmt.Sprintf("unknown role %d", role))
}
r.servicesCache.Store(svc.Key(), svc)
r.cltLock.Lock()
r.services[svc.Key()] = svc
r.cltLock.Unlock()
return nil
}
......@@ -206,10 +212,10 @@ func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
logger.Errorf("etcd client create path %s: %v", consumersNode, err)
return errors.Annotate(err, "etcd create consumer nodes")
}
providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
if err := r.createDirIfNotExist(providersNode); err != nil {
return errors.Annotate(err, "create provider node")
}
//providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
//if err := r.createDirIfNotExist(providersNode); err != nil {
// return errors.Annotate(err, "create provider node")
//}
params := url.Values{}
......
......@@ -250,10 +250,10 @@ func (c *clientSet) watch(k string) (clientv3.WatchChan, error) {
return nil, ErrNilETCDV3ClientConn
}
_, err := c.get(k)
if err != nil {
return nil, errors.Annotatef(err, "pre check key %s", k)
}
//_, err := c.get(k)
//if err != nil {
// return nil, errors.Annotatef(err, "pre check key %s", k)
//}
return c.rawClient.Watch(c.ctx, k), nil
}
......@@ -625,26 +625,27 @@ func (c *Client) WatchChildren(key string) ([]string, []string, clientv3.WatchCh
return childrenKeys, childrenValues, wc, nil
}
func (c *Client) GetChildren(key string) ([]string, error) {
func (c *Client) GetChildren(key string) ([]string, []string, error) {
var (
err error
children []string
childrenKeyList []string
childrenValueList []string
)
err = ErrNilETCDV3ClientConn
c.Lock()
if c.cs != nil {
children, _, err = c.cs.getChildren(key)
childrenKeyList, childrenValueList, err = c.cs.getChildren(key)
}
c.Unlock()
if err != nil {
if errors.Cause(err) == ErrKVPairNotFound {
return nil, errors.Annotatef(err, "key{%s} has none children", key)
return nil, nil, errors.Annotatef(err, "key{%s} has none children", key)
}
logger.Errorf("clientv3.Children(key{%s}) = error(%v)", key, perrors.WithStack(err))
return nil, errors.Annotatef(err, "client GetChildren(key:%s)", key)
return nil, nil, errors.Annotatef(err, "client GetChildren(key:%s)", key)
}
return children, nil
return childrenKeyList,childrenValueList, nil
}
func (c *Client) WatchExist(key string) (clientv3.WatchChan, error) {
......
......@@ -9,6 +9,7 @@ import (
import (
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/juju/errors"
)
import (
......@@ -40,7 +41,7 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
l.wg.Add(1)
defer l.wg.Done()
for {
keyEventCh, err := l.client.WatchExist(key)
wc, err := l.client.WatchExist(key)
if err != nil {
logger.Warnf("WatchExist{key:%s} = error{%v}", key, err)
return false
......@@ -56,8 +57,8 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
case <-l.client.Done():
return false
// etcd event stream
case e := <-keyEventCh:
// handle etcd events
case e := <-wc:
if e.Err() != nil {
logger.Errorf("get a etcdv3 event {err: %s}", e.Err())
......@@ -65,6 +66,7 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
}
for _, event := range e.Events {
if l.handleEvents(event, listener...) {
// if event is delete
return true
}
}
......@@ -157,6 +159,8 @@ func timeSecondDuration(sec int) time.Duration {
// --------> ListenServiceNodeEvent
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
l.keyMapLock.Lock()
_, ok := l.keyMap[key]
l.keyMapLock.Unlock()
......@@ -169,6 +173,26 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
l.keyMap[key] = struct{}{}
l.keyMapLock.Unlock()
keyList, valueList, err := l.client.GetChildren(key)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, errors.Annotate(err, "get children"))
}
logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)
for i, k := range keyList{
logger.Warnf("get children list key -> %s", k)
if !listener.DataChange(remoting.Event{
Path: k,
Action: remoting.EventTypeAdd,
Content: valueList[i],
}) {
continue
}
}
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment