高辛格 authored5dd2bd0d
listener.go 4.97 KiB
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package consul
import (
import (
consul "github.com/hashicorp/consul/api"
perrors "github.com/pkg/errors"
import (
// Consul listener wraps the consul watcher, to
// listen the service information change in consul
// registry.
type consulListener struct {
// Registry url.
registryUrl common.URL
// Consumer url.
consumerUrl common.URL
// Consul watcher.
plan *watch.Plan
// Most recent service urls return by
// watcher.
urls []common.URL
// All service information changes will
// be wrapped into ServiceEvent, and be
// sent into eventCh. Then listener's
// Next method will get event from eventCh,
// and return to upstream.
eventCh chan *registry.ServiceEvent
// All errors, happening in the listening
// period, will be caught and send into
// errCh. Then listener's Next method will
// get error from errCh, and return to
// upstream.
errCh chan error
// Done field represents whether consul
// listener has been closed. When closing
// listener, this field will be closed,
// and will notify consul watcher to close.
done chan struct{}
// After listener notifies consul watcher
// to close, listener will call wg.wait to
// make sure that consul watcher is closed
// before the listener closes.
wg sync.WaitGroup
func newConsulListener(registryUrl common.URL, consumerUrl common.URL) (*consulListener, error) {
params := make(map[string]interface{}, 8)
params["type"] = "service"
params["service"] = consumerUrl.Service()
params["tag"] = "dubbo"
params["passingonly"] = true
plan, err := watch.Parse(params)
if err != nil {
return nil, err
listener := &consulListener{
registryUrl: registryUrl,
consumerUrl: consumerUrl,
plan: plan,
urls: make([]common.URL, 0, 8),
eventCh: make(chan *registry.ServiceEvent, 32),
errCh: make(chan error, 32),
done: make(chan struct{}),
// Set handler to consul watcher, and
// make watcher begin to watch service
// information change.
listener.plan.Handler = listener.handler
go listener.run()
return listener, nil
// Wrap the consul watcher run api. There are three
// conditions that will finish the run:
// - close done
// - call plan.Stop
// - close eventCh and errCh
// If run meets first two conditions, it will close
// gracefully. However, if run meets the last condition,
// run will close with panic, so use recover to cover
// this case.
func (l *consulListener) run() {
defer func() {
p := recover()
if p != nil {
logger.Warnf("consul listener finish with panic %v", p)
for {
select {
case <-l.done:
err := l.plan.Run(l.registryUrl.Location)
if err != nil {
l.errCh <- err
func (l *consulListener) handler(idx uint64, raw interface{}) {
var (
service *consul.ServiceEntry
event *registry.ServiceEvent
url common.URL
ok bool
err error
services, ok := raw.([]*consul.ServiceEntry)
if !ok {
err = perrors.New("handler get non ServiceEntry type parameter")
l.errCh <- err
newUrls := make([]common.URL, 0, 8)
events := make([]*registry.ServiceEvent, 0, 8)
for _, service = range services {
url, err = retrieveURL(service)
if err != nil {
l.errCh <- err
newUrls = append(newUrls, url)
for _, url = range l.urls {
ok = in(url, newUrls)
if !ok {
event := ®istry.ServiceEvent{Action: remoting.EventTypeDel, Service: url}
events = append(events, event)
for _, url = range newUrls {
ok = in(url, l.urls)
if !ok {
event := ®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: url}
events = append(events, event)
l.urls = newUrls
for _, event = range events {
l.eventCh <- event
func (l *consulListener) Next() (*registry.ServiceEvent, error) {
select {
case event := <-l.eventCh:
return event, nil
case err := <-l.errCh:
return nil, err
func (l *consulListener) Close() {