/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package zookeeper

import (
	"path"
	"strings"
	"sync"
	"time"
)

import (
	"github.com/dubbogo/getty"
	"github.com/dubbogo/go-zookeeper/zk"
	perrors "github.com/pkg/errors"
)

import (
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/remoting"
)
// DefaultTTL is the interval listenDirEvent uses for its periodic refresh
// unless the registry URL supplies a parsable TTL parameter.
var DefaultTTL = 10 * time.Minute

type ZkEventListener struct {
	client      *ZookeeperClient
	pathMapLock sync.Mutex
	pathMap     map[string]struct{}
	wg          sync.WaitGroup
AlexStocks's avatar
AlexStocks committed
}

// NewZkEventListener returns a EventListener instance
func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
	return &ZkEventListener{
		client:  client,
		pathMap: make(map[string]struct{}),
AlexStocks's avatar
AlexStocks committed
	}
}
高辛格's avatar
高辛格 committed

func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
	l.client = client
}
高辛格's avatar
高辛格 committed

// ListenServiceNodeEvent watches the single node at zkPath in a background
// goroutine. When the watch reports that the node was deleted, listener
// receives an EventTypeDel event for zkPath.
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remoting.DataListener) {
	// Register the goroutine with the wait group before it starts;
	// listenServiceNodeEvent is responsible for the matching Done.
	l.wg.Add(1)
	go func() {
		deleted := l.listenServiceNodeEvent(zkPath, listener)
		if deleted {
			listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
		}
		logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
	}()
}

func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
AlexStocks's avatar
AlexStocks committed
	defer l.wg.Done()
	var zkEvent zk.Event
	for {
		keyEventCh, err := l.client.ExistW(zkPath)
AlexStocks's avatar
AlexStocks committed
		if err != nil {
			logger.Warnf("existW{key:%s} = error{%v}", zkPath, err)
AlexStocks's avatar
AlexStocks committed
			return false
		}

		select {
		case zkEvent = <-keyEventCh:
fangyincheng's avatar
fangyincheng committed
			logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
				zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
AlexStocks's avatar
AlexStocks committed
			switch zkEvent.Type {
			case zk.EventNodeDataChanged:
fangyincheng's avatar
fangyincheng committed
				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
vito.he's avatar
vito.he committed
				if len(listener) > 0 {
pantianying's avatar
pantianying committed
					content, _, err := l.client.Conn.Get(zkEvent.Path)
					if err != nil {
						logger.Warnf("zk.Conn.Get{key:%s} = error{%v}", zkPath, err)
						return false
					}
高辛格's avatar
高辛格 committed
					listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeUpdate, Content: string(content)})
vito.he's avatar
vito.he committed
				}
AlexStocks's avatar
AlexStocks committed
			case zk.EventNodeCreated:
fangyincheng's avatar
fangyincheng committed
				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
vito.he's avatar
vito.he committed
				if len(listener) > 0 {
pantianying's avatar
pantianying committed
					content, _, err := l.client.Conn.Get(zkEvent.Path)
					if err != nil {
						logger.Warnf("zk.Conn.Get{key:%s} = error{%v}", zkPath, err)
						return false
					}
					listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeAdd, Content: string(content)})
vito.he's avatar
vito.he committed
				}
AlexStocks's avatar
AlexStocks committed
			case zk.EventNotWatching:
fangyincheng's avatar
fangyincheng committed
				logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
AlexStocks's avatar
AlexStocks committed
			case zk.EventNodeDeleted:
fangyincheng's avatar
fangyincheng committed
				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
AlexStocks's avatar
AlexStocks committed
				return true
			}
AlexStocks's avatar
AlexStocks committed
			return false
		}
	}
}

func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, listener remoting.DataListener) {
AlexStocks's avatar
AlexStocks committed
	contains := func(s []string, e string) bool {
		for _, a := range s {
			if a == e {
				return true
			}
		}
		return false
	}

	newChildren, err := l.client.GetChildren(zkPath)
AlexStocks's avatar
AlexStocks committed
	if err != nil {
vito.he's avatar
vito.he committed
		if err == errNilChildren {
			content, _, err := l.client.Conn.Get(zkPath)
			if err != nil {
				logger.Errorf("Get new node path {%v} 's content error,message is  {%v}", zkPath, perrors.WithStack(err))
vito.he's avatar
vito.he committed
			} else {
				listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
vito.he's avatar
vito.he committed
			}

		} else {
			logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
		}
AlexStocks's avatar
AlexStocks committed
	}

	// a node was added -- listen the new node
	var (
AlexStocks's avatar
AlexStocks committed
	)
	for _, n := range newChildren {
		if contains(children, n) {
			continue
		}

		newNode = path.Join(zkPath, n)
fangyincheng's avatar
fangyincheng committed
		logger.Infof("add zkNode{%s}", newNode)
vito.he's avatar
vito.he committed
		content, _, err := l.client.Conn.Get(newNode)
		if err != nil {
			logger.Errorf("Get new node path {%v} 's content error,message is  {%v}", newNode, perrors.WithStack(err))
		}
		if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
AlexStocks's avatar
AlexStocks committed
			continue
		}
		// listen l service node
		l.wg.Add(1)
高辛格's avatar
高辛格 committed
		go func(node string, zkPath string, listener remoting.DataListener) {
fangyincheng's avatar
fangyincheng committed
			logger.Infof("delete zkNode{%s}", node)
			if l.listenServiceNodeEvent(node, listener) {
vito.he's avatar
vito.he committed
				logger.Infof("delete content{%s}", node)
				listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
AlexStocks's avatar
AlexStocks committed
			}
fangyincheng's avatar
fangyincheng committed
			logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
高辛格's avatar
高辛格 committed
		}(newNode, zkPath, listener)
AlexStocks's avatar
AlexStocks committed
	}

	// old node was deleted
	var oldNode string
	for _, n := range children {
		if contains(newChildren, n) {
			continue
		}

		oldNode = path.Join(zkPath, n)
fangyincheng's avatar
fangyincheng committed
		logger.Warnf("delete zkPath{%s}", oldNode)
vito.he's avatar
vito.he committed

AlexStocks's avatar
AlexStocks committed
		if err != nil {
fangyincheng's avatar
fangyincheng committed
			logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
AlexStocks's avatar
AlexStocks committed
			continue
		}
		listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
AlexStocks's avatar
AlexStocks committed
	}
}

func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listener remoting.DataListener) {
AlexStocks's avatar
AlexStocks committed
	defer l.wg.Done()

	var (
		failTimes int
wangwx's avatar
wangwx committed
		ttl       time.Duration
AlexStocks's avatar
AlexStocks committed
		event     chan struct{}
		zkEvent   zk.Event
	)
	event = make(chan struct{}, 4)
wangwx's avatar
wangwx committed
	ttl = DefaultTTL
wangwx's avatar
wangwx committed
	if conf != nil {
		timeout, err := time.ParseDuration(conf.GetParam(constant.REGISTRY_TTL_KEY, constant.DEFAULT_REG_TTL))
		if err == nil {
			ttl = timeout
		} else {
			logger.Warnf("wrong configuration for registry ttl, error:=%+v", err)
		}
wangwx's avatar
wangwx committed
	}
AlexStocks's avatar
AlexStocks committed
	defer close(event)
	for {
		// get current children for a zkPath
		children, childEventCh, err := l.client.GetChildrenW(zkPath)
AlexStocks's avatar
AlexStocks committed
		if err != nil {
			failTimes++
			if MaxFailTimes <= failTimes {
				failTimes = MaxFailTimes
AlexStocks's avatar
AlexStocks committed
			}
pantianying's avatar
pantianying committed
			logger.Infof("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
AlexStocks's avatar
AlexStocks committed
			// clear the event channel
		CLEAR:
			for {
				select {
				case <-event:
				default:
					break CLEAR
				}
			}
			l.client.RegisterEvent(zkPath, &event)
pantianying's avatar
pantianying committed
			if err == errNilNode {
				logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkPath)
				l.client.UnregisterEvent(zkPath, &event)
				return
			}
AlexStocks's avatar
AlexStocks committed
			select {
aliiohs's avatar
aliiohs committed
			case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):
				l.client.UnregisterEvent(zkPath, &event)
AlexStocks's avatar
AlexStocks committed
				continue
			case <-l.client.Done():
				l.client.UnregisterEvent(zkPath, &event)
				logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
AlexStocks's avatar
AlexStocks committed
				return
			case <-event:
fangyincheng's avatar
fangyincheng committed
				logger.Infof("get zk.EventNodeDataChange notify event")
				l.client.UnregisterEvent(zkPath, &event)
				l.handleZkNodeEvent(zkPath, nil, listener)
AlexStocks's avatar
AlexStocks committed
				continue
			}
		}
		failTimes = 0

			// Only need to compare Path when subscribing to provider
			if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) != -1 {
				provider, _ := common.NewURL(c)
				if provider.ServiceKey() != conf.ServiceKey() {
flycash's avatar
flycash committed
			// listen l service node
vito.he's avatar
vito.he committed

flycash's avatar
flycash committed
			// Save the path to avoid listen repeatedly
vito.he's avatar
vito.he committed
			l.pathMapLock.Lock()
			_, ok := l.pathMap[dubboPath]
			l.pathMapLock.Unlock()
			if ok {
				logger.Warnf("@zkPath %s has already been listened.", dubboPath)
vito.he's avatar
vito.he committed
				continue
			}

			l.pathMapLock.Lock()
			l.pathMap[dubboPath] = struct{}{}
			l.pathMapLock.Unlock()
flycash's avatar
flycash committed
			// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
wangwx's avatar
wangwx committed
			l.client.RLock()
wangwx's avatar
wangwx committed
			if l.client.Conn == nil {
wangwx's avatar
wangwx committed
				l.client.RUnlock()
wangwx's avatar
wangwx committed
				break
			}
wangwx's avatar
wangwx committed
			l.client.RUnlock()
			if err != nil {
				logger.Errorf("Get new node path {%v} 's content error,message is  {%v}", dubboPath, perrors.WithStack(err))
			}
pantianying's avatar
pantianying committed
			logger.Debugf("Get children!{%s}", dubboPath)
			if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
				continue
			}
			logger.Infof("listen dubbo service key{%s}", dubboPath)
scott's avatar
scott committed
			l.wg.Add(1)
高辛格's avatar
高辛格 committed
			go func(zkPath string, listener remoting.DataListener) {
				if l.listenServiceNodeEvent(zkPath) {
高辛格's avatar
高辛格 committed
					listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
				}
				logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
高辛格's avatar
高辛格 committed
			}(dubboPath, listener)
AlexStocks's avatar
AlexStocks committed

flycash's avatar
flycash committed
			// listen sub path recursive
			// if zkPath is end of "providers/ & consumers/" we do not listen children dir
			if strings.LastIndex(zkPath, constant.PROVIDER_CATEGORY) == -1 &&
				strings.LastIndex(zkPath, constant.CONSUMER_CATEGORY) == -1 {
				l.wg.Add(1)
				go func(zkPath string, listener remoting.DataListener) {
					l.listenDirEvent(conf, zkPath, listener)
					logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
				}(dubboPath, listener)
			}
wangwx's avatar
wangwx committed
		// Periodically update provider information
		ticker := time.NewTicker(ttl)
	WATCH:
		for {
			select {
			case <-ticker.C:
				l.handleZkNodeEvent(zkEvent.Path, children, listener)
			case zkEvent = <-childEventCh:
				logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
					zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
				ticker.Stop()
				if zkEvent.Type != zk.EventNodeChildrenChanged {
					break WATCH
				}
				l.handleZkNodeEvent(zkEvent.Path, children, listener)
				break WATCH
			case <-l.client.Done():
				logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
				ticker.Stop()
				return
AlexStocks's avatar
AlexStocks committed
			}
		}
wangwx's avatar
wangwx committed

AlexStocks's avatar
AlexStocks committed
	}
}

// timeSecondDuration converts a whole number of seconds into a time.Duration.
func timeSecondDuration(sec int) time.Duration {
	seconds := time.Duration(sec)
	return time.Second * seconds
}

AlexStocks's avatar
AlexStocks committed
// ListenServiceEvent is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> listenServiceNodeEvent
AlexStocks's avatar
AlexStocks committed
//                            |
//                            --------> listenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, listener remoting.DataListener) {
fangyincheng's avatar
fangyincheng committed
	logger.Infof("listen dubbo path{%s}", zkPath)
	l.wg.Add(1)
	go func(zkPath string, listener remoting.DataListener) {
		l.listenDirEvent(conf, zkPath, listener)
fangyincheng's avatar
fangyincheng committed
		logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
AlexStocks's avatar
AlexStocks committed
}

func (l *ZkEventListener) valid() bool {
	return l.client.ZkConnValid()
AlexStocks's avatar
AlexStocks committed
}

邹毅贤's avatar
邹毅贤 committed
// Close will let client listen exit
邹毅贤's avatar
邹毅贤 committed
	close(l.client.exit)
AlexStocks's avatar
AlexStocks committed
	l.wg.Wait()
}