/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package registry

import (
	"context"
	"fmt"
	"net/url"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"
)

import (
	gxnet "github.com/dubbogo/gost/net"
	perrors "github.com/pkg/errors"
)

import (
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/logger"
)

// RegistryConnDelay is the delay, in seconds, applied before retrying after a
// failed subscribe or unsubscribe call.
const RegistryConnDelay = 3

// MaxWaitInterval is the upper bound of the back-off sleep between two
// subscription rounds.
const MaxWaitInterval = 3 * time.Second

// processID and localIP describe the running process. Both start out empty
// and are added to every processed URL as the "pid" and "ip" parameters.
var (
	processID string
	localIP   string
)

func init() {
	processID = fmt.Sprintf("%d", os.Getpid())
	localIP, _ = gxnet.GetLocalIP()
}

/*
 * -----------------------------------NOTICE---------------------------------------------
 * Unless you have a special case, embed BaseRegistry and implement the
 * FacadeBasedRegistry interface instead of implementing the Registry interface directly.
 * --------------------------------------------------------------------------------------
 */

/*
vito.he's avatar
vito.he committed
 * FacadeBasedRegistry interface is subclass of Registry, and it is designed for registry who want to inherit BaseRegistry.
vito.he's avatar
vito.he committed
 * You have to implement the interface to inherit BaseRegistry.
 */
type FacadeBasedRegistry interface {
	Registry
flycash's avatar
flycash committed

	// CreatePath create the path in the registry
	CreatePath(string) error
flycash's avatar
flycash committed
	// DoRegister actually do the register job
	DoRegister(string, string) error
	// DoUnregister do the unregister job
	DoUnregister(string, string) error
flycash's avatar
flycash committed
	// DoSubscribe actually subscribe the URL
	DoSubscribe(conf *common.URL) (Listener, error)
	// DoUnsubscribe does unsubscribe the URL
邹毅贤's avatar
邹毅贤 committed
	DoUnsubscribe(conf *common.URL) (Listener, error)
flycash's avatar
flycash committed
	// CloseAndNilClient close the client and then reset the client in registry to nil
	// you should notice that this method will be invoked inside a lock.
	// So you should implement this method as light weighted as you can.
	CloseAndNilClient()
flycash's avatar
flycash committed
	// CloseListener close listeners
	CloseListener()
flycash's avatar
flycash committed
	// InitListeners init listeners
	InitListeners()
}

// BaseRegistry is a common logic abstract for registry. It implement Registry interface.
type BaseRegistry struct {
	context             context.Context
	facadeBasedRegistry FacadeBasedRegistry
	*common.URL
邹毅贤's avatar
邹毅贤 committed
	birth    int64          // time of file birth, seconds since Epoch; 0 if unknown
	wg       sync.WaitGroup // wg+done for zk restart
	done     chan struct{}
邹毅贤's avatar
邹毅贤 committed
	cltLock  sync.RWMutex          //ctl lock is a lock for services map
邹毅贤's avatar
邹毅贤 committed
	services map[string]common.URL // service name + protocol -> service config, for store the service registered
// InitBaseRegistry prepares r for use: it stores the registry URL and the
// concrete facade, allocates the done channel and the services map, records
// the creation time, and returns r as a Registry.
func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBasedRegistry) Registry {
	r.facadeBasedRegistry = facadeRegistry
	r.URL = url
	r.services = make(map[string]common.URL)
	r.done = make(chan struct{})
	r.birth = time.Now().UnixNano()
	return r
}

// GetUrl returns a copy of the registry's URL.
func (r *BaseRegistry) GetUrl() common.URL {
	registryURL := r.URL
	return *registryURL
}
// Destroy shuts the registry down gracefully. The steps run in a fixed order:
// the listeners are closed, the done channel is closed, the wait group is
// awaited, and finally the registry client is closed.
func (r *BaseRegistry) Destroy() {
	// First close all listeners of the registry.
	r.facadeBasedRegistry.CloseListener()
	// Closing r.done notifies everyone receiving from Done() and makes
	// IsAvailable report false.
	close(r.done)
	// Wait for the work that outside code registered on the wait group.
	r.wg.Wait()
	// Close the registry client and drop the services map.
	r.closeRegisters()
}

// Register implement interface registry to register
func (r *BaseRegistry) Register(conf common.URL) error {
	var (
		ok  bool
		err error
	)
	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
vito.he's avatar
vito.he committed
	// Check if the service has been registered
邹毅贤's avatar
邹毅贤 committed
	r.cltLock.Lock()
vito.he's avatar
vito.he committed
	_, ok = r.services[conf.Key()]
邹毅贤's avatar
邹毅贤 committed
	r.cltLock.Unlock()
vito.he's avatar
vito.he committed
	if ok {
		return perrors.Errorf("Path{%s} has been registered", conf.Key())
	}
vito.he's avatar
vito.he committed
	err = r.register(conf)
	if err != nil {
		return perrors.WithMessagef(err, "register(conf:%+v)", conf)
vito.he's avatar
vito.he committed
	r.cltLock.Lock()
	r.services[conf.Key()] = conf
	r.cltLock.Unlock()
	logger.Debugf("(%sRegistry)Register(conf{%#v})", common.DubboRole[role], conf)

// UnRegister implement interface registry to unregister
func (r *BaseRegistry) UnRegister(conf common.URL) error {
邹毅贤's avatar
邹毅贤 committed
		ok     bool
		err    error
		oldURL common.URL
邹毅贤's avatar
邹毅贤 committed

	func() {
邹毅贤's avatar
邹毅贤 committed
		r.cltLock.Lock()
邹毅贤's avatar
邹毅贤 committed
		defer r.cltLock.Unlock()
邹毅贤's avatar
邹毅贤 committed
		oldURL, ok = r.services[conf.Key()]

		if !ok {
			err = perrors.Errorf("Path{%s} has not registered", conf.Key())
		}

邹毅贤's avatar
邹毅贤 committed
		delete(r.services, conf.Key())
	}()
邹毅贤's avatar
邹毅贤 committed
	if err != nil {
		return err
	}

	err = r.unregister(conf)
	if err != nil {
邹毅贤's avatar
邹毅贤 committed
		func() {
			r.cltLock.Lock()
			defer r.cltLock.Unlock()
			r.services[conf.Key()] = oldURL
		}()
		return perrors.WithMessagef(err, "register(conf:%+v)", conf)
	}

// service returns the query-escaped service name of c, as used inside
// registry paths.
func (r *BaseRegistry) service(c common.URL) string {
	serviceName := c.Service()
	return url.QueryEscape(serviceName)
}

// RestartCallBack for reregister when reconnect
func (r *BaseRegistry) RestartCallBack() bool {

	// copy r.services
flycash's avatar
flycash committed
	services := make([]common.URL, 0, len(r.services))
	for _, confIf := range r.services {
		services = append(services, confIf)
	}

	flag := true
	for _, confIf := range services {
		err := r.register(confIf)
		if err != nil {
			logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
				confIf, perrors.WithStack(err))
			flag = false
			break
		}
		logger.Infof("success to re-register service :%v", confIf.Key())
	}

	if flag {
		r.facadeBasedRegistry.InitListeners()
	}
// register for register url to registry, include init params
func (r *BaseRegistry) register(c common.URL) error {
邹毅贤's avatar
邹毅贤 committed
	return r.processURL(c, r.facadeBasedRegistry.DoRegister, r.createPath)
}

// unregister for unregister url to registry, include init params
func (r *BaseRegistry) unregister(c common.URL) error {
邹毅贤's avatar
邹毅贤 committed
	return r.processURL(c, r.facadeBasedRegistry.DoUnregister, nil)
func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error, createPathFunc func(dubboPath string) error) error {
	if f == nil {
		panic(" Must provide a `function(string, string) error` to process URL. ")
	}
	var (
		err error
		//revision   string
		params     url.Values
		rawURL     string
		encodedURL string
		dubboPath  string
		//conf       config.URL
	)
	params = url.Values{}

	c.RangeParams(func(key, value string) bool {
		params.Add(key, value)
		return true
	})

	params.Add("pid", processID)
	params.Add("ip", localIP)
	//params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))

	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
	switch role {

	case common.PROVIDER:
邹毅贤's avatar
邹毅贤 committed
		dubboPath, rawURL, err = r.providerRegistry(c, params, createPathFunc)
	case common.CONSUMER:
邹毅贤's avatar
邹毅贤 committed
		dubboPath, rawURL, err = r.consumerRegistry(c, params, createPathFunc)
vito.he's avatar
vito.he committed
	default:
		return perrors.Errorf("@c{%v} type is not referencer or provider", c)
	}
	encodedURL = url.QueryEscape(rawURL)
	dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
	err = f(dubboPath, encodedURL)
vito.he's avatar
vito.he committed
	if err != nil {
		return perrors.WithMessagef(err, "register Node(path:%s, url:%s)", dubboPath, rawURL)
	}
	return nil
}
// createPath asks the facade to create dubboPath in the registry while
// holding cltLock.
func (r *BaseRegistry) createPath(dubboPath string) error {
	guard := &r.cltLock
	guard.Lock()
	defer guard.Unlock()

	facade := r.facadeBasedRegistry
	return facade.CreatePath(dubboPath)
}

// providerRegistry for provider role do
邹毅贤's avatar
邹毅贤 committed
func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values, createPathFunc func(dubboPath string) error) (string, string, error) {
vito.he's avatar
vito.he committed
	var (
		dubboPath string
		rawURL    string
		err       error
	)
	if c.Path == "" || len(c.Methods) == 0 {
		return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
	}
	dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
邹毅贤's avatar
邹毅贤 committed
	if createPathFunc != nil {
		err = createPathFunc(dubboPath)
	}
vito.he's avatar
vito.he committed
	if err != nil {
		logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
		return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath)
	}
vito.he's avatar
vito.he committed
	params.Add(constant.ANYHOST_KEY, "true")
vito.he's avatar
vito.he committed
	// Dubbo java consumer to start looking for the provider url,because the category does not match,
	// the provider will not find, causing the consumer can not start, so we use consumers.
vito.he's avatar
vito.he committed
	if len(c.Methods) == 0 {
vito.he's avatar
vito.he committed
		params.Add(constant.METHODS_KEY, strings.Join(c.Methods, ","))
vito.he's avatar
vito.he committed
	}
	logger.Debugf("provider url params:%#v", params)
	var host string
	if c.Ip == "" {
flycash's avatar
flycash committed
		host = localIP
vito.he's avatar
vito.he committed
	} else {
flycash's avatar
flycash committed
		host = c.Ip
flycash's avatar
flycash committed
	host += ":" + c.Port
vito.he's avatar
vito.he committed
	rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode())
	// Print your own registration service providers.
	dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String())
	logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL)
	return dubboPath, rawURL, nil
}
// consumerRegistry for consumer role do
邹毅贤's avatar
邹毅贤 committed
func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values, createPathFunc func(dubboPath string) error) (string, string, error) {
vito.he's avatar
vito.he committed
	var (
		dubboPath string
		rawURL    string
		err       error
	)
	dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER])
flycash's avatar
flycash committed

邹毅贤's avatar
邹毅贤 committed
	if createPathFunc != nil {
		err = createPathFunc(dubboPath)
	}
	if err != nil {
vito.he's avatar
vito.he committed
		logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
		return "", "", perrors.WithStack(err)
vito.he's avatar
vito.he committed
	dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
flycash's avatar
flycash committed

邹毅贤's avatar
邹毅贤 committed
	if createPathFunc != nil {
		err = createPathFunc(dubboPath)
	}
flycash's avatar
flycash committed

vito.he's avatar
vito.he committed
	if err != nil {
		logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
		return "", "", perrors.WithStack(err)
	}

	params.Add("protocol", c.Protocol)
	rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
	dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String())

	logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
	return dubboPath, rawURL, nil
// sleepWait sleeps for (n+1) * 200ms, capped at MaxWaitInterval. A negative
// result makes time.Sleep return immediately.
func sleepWait(n int) {
	// Multiply as a time.Duration (int64) rather than as an int, so the
	// product cannot overflow on platforms where int is 32 bits wide.
	wait := time.Duration(n+1) * 200 * time.Millisecond
	if wait > MaxWaitInterval {
		wait = MaxWaitInterval
	}
	time.Sleep(wait)
}

// Subscribe :subscribe from registry, event will notify by notifyListener
邹毅贤's avatar
邹毅贤 committed
func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
	n := 0
	for {
		n++
邹毅贤's avatar
邹毅贤 committed
		if !r.IsAvailable() {
			logger.Warnf("event listener game over.")
			return perrors.New("nacosRegistry is not available.")
		}

		listener, err := r.facadeBasedRegistry.DoSubscribe(url)
		if err != nil {
			if !r.IsAvailable() {
				logger.Warnf("event listener game over.")
				return err
			}
			logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
			time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
			continue
		}

		for {
			if serviceEvent, err := listener.Next(); err != nil {
				logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
				listener.Close()
				break
			} else {
				logger.Infof("update begin, service event: %v", serviceEvent.String())
				notifyListener.Notify(serviceEvent)
			}

邹毅贤's avatar
邹毅贤 committed
		}
		sleepWait(n)
	}
邹毅贤's avatar
邹毅贤 committed
}
// UnSubscribe URL
邹毅贤's avatar
邹毅贤 committed
func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error {
	if !r.IsAvailable() {
		logger.Warnf("event listener game over.")
		return nil
	}

邹毅贤's avatar
邹毅贤 committed
	listener, err := r.facadeBasedRegistry.DoUnsubscribe(url)
邹毅贤's avatar
邹毅贤 committed
	if err != nil {
		if !r.IsAvailable() {
			logger.Warnf("event listener game over.")
邹毅贤's avatar
邹毅贤 committed
			return nil
邹毅贤's avatar
邹毅贤 committed
		logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
		time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
		return perrors.WithStack(err)
	}
邹毅贤's avatar
邹毅贤 committed
	for {
		if serviceEvent, err := listener.Next(); err != nil {
			logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
			listener.Close()
			break
		} else {
			logger.Infof("update begin, service event: %v", serviceEvent.String())
			notifyListener.Notify(serviceEvent)
邹毅贤's avatar
邹毅贤 committed
	return nil
// closeRegisters close and remove registry client and reset services map
func (r *BaseRegistry) closeRegisters() {
flycash's avatar
flycash committed
	logger.Infof("begin to close provider client")
	r.cltLock.Lock()
	defer r.cltLock.Unlock()
	// Close and remove(set to nil) the registry client
	r.facadeBasedRegistry.CloseAndNilClient()
	// reset the services map
	r.services = nil
}

// IsAvailable reports whether the registry is still open, i.e. whether the
// done channel has not been closed yet. It never blocks.
func (r *BaseRegistry) IsAvailable() bool {
	closed := false
	select {
	case <-r.done:
		closed = true
	default:
	}
	return !closed
}

// WaitGroup exposes the registry's wait group. Destroy waits on it, so
// outside code can add to it to finish work before the registry is closed.
func (r *BaseRegistry) WaitGroup() *sync.WaitGroup {
	group := &r.wg
	return group
}

// Done returns the channel that Destroy closes, so outside code can detect
// that the registry is being destroyed.
func (r *BaseRegistry) Done() chan struct{} {
	doneCh := r.done
	return doneCh
}