Skip to content
Snippets Groups Projects
Commit df74c55b authored by scott.wang's avatar scott.wang
Browse files

adapte for new registry

parent 568c2df4
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,6 @@
package kubernetes
import (
"context"
"strings"
)
......@@ -55,7 +54,7 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
return false
}
url := eventType.Path[index+len("/providers/"):]
serviceURL, err := common.NewURL(context.Background(), url)
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
......
......@@ -19,17 +19,14 @@ package kubernetes
import (
"fmt"
"net/url"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
)
import (
gxnet "github.com/dubbogo/gost/net"
"github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
......@@ -48,8 +45,7 @@ var (
)
const (
Name = "kubernetes"
RegistryConnDelay = 3
Name = "kubernetes"
)
func init() {
......@@ -59,22 +55,13 @@ func init() {
}
type kubernetesRegistry struct {
*common.URL
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
registry.BaseRegistry
cltLock sync.Mutex
client *kubernetes.Client
services map[string]common.URL // service name + protocol -> service config
cltLock sync.Mutex
client *kubernetes.Client
listenerLock sync.Mutex
listener *kubernetes.EventListener
dataListener *dataListener
configListener *configurationListener
wg sync.WaitGroup // wg+done for kubernetes client restart
closeOnce sync.Once // protect the done
done chan struct{}
}
func (r *kubernetesRegistry) Client() *kubernetes.Client {
......@@ -86,129 +73,19 @@ func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
func (r *kubernetesRegistry) ClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *kubernetesRegistry) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *kubernetesRegistry) GetDone() chan struct{} {
return r.done
}
func (r *kubernetesRegistry) RestartCallBack() bool {
services := []common.URL{}
for _, confIf := range r.services {
services = append(services, confIf)
}
for _, confIf := range services {
err := r.Register(confIf)
if err != nil {
logger.Errorf("(kubernetesProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
return false
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
return true
}
func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
r := &kubernetesRegistry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
services: make(map[string]common.URL),
}
if err := kubernetes.ValidateClient(r); err != nil {
return nil, err
}
r.wg.Add(1)
go kubernetes.HandleClientRestart(r)
r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
return r, nil
}
func (r *kubernetesRegistry) GetUrl() common.URL {
return *r.URL
}
func (r *kubernetesRegistry) IsAvailable() bool {
select {
case <-r.done:
return false
default:
return true
}
}
func (r *kubernetesRegistry) Destroy() {
if r.configListener != nil {
r.configListener.Close()
}
r.stop()
}
func (r *kubernetesRegistry) stop() {
// close will be call concurrent
r.closeOnce.Do(func() {
close(r.done)
})
// close current client
func (r *kubernetesRegistry) CloseAndNilClient() {
r.client.Close()
r.cltLock.Lock()
r.client = nil
r.services = nil
r.cltLock.Unlock()
}
func (r *kubernetesRegistry) Register(svc common.URL) error {
role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if err != nil {
return perrors.WithMessage(err, "get registry role")
}
r.cltLock.Lock()
if _, ok := r.services[svc.Key()]; ok {
r.cltLock.Unlock()
return perrors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path))
}
r.cltLock.Unlock()
switch role {
case common.PROVIDER:
logger.Debugf("(provider register )Register(conf{%#v})", svc)
if err := r.registerProvider(svc); err != nil {
return perrors.WithMessage(err, "register provider")
}
case common.CONSUMER:
logger.Debugf("(consumer register )Register(conf{%#v})", svc)
if err := r.registerConsumer(svc); err != nil {
return perrors.WithMessage(err, "register consumer")
}
default:
return perrors.New(fmt.Sprintf("unknown role %d", role))
func (r *kubernetesRegistry) CloseListener() {
if r.configListener != nil {
r.configListener.Close()
}
r.cltLock.Lock()
r.services[svc.Key()] = svc
r.cltLock.Unlock()
return nil
}
func (r *kubernetesRegistry) createDirIfNotExist(k string) error {
func (r *kubernetesRegistry) CreatePath(k string) error {
var tmpPath string
for _, str := range strings.Split(k, "/")[1:] {
tmpPath = path.Join(tmpPath, "/", str)
......@@ -220,87 +97,11 @@ func (r *kubernetesRegistry) createDirIfNotExist(k string) error {
return nil
}
func (r *kubernetesRegistry) registerConsumer(svc common.URL) error {
consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER])
if err := r.createDirIfNotExist(consumersNode); err != nil {
logger.Errorf("kubernetes client create path %s: %v", consumersNode, err)
return perrors.WithMessage(err, "kubernetes create consumer nodes")
}
// NOTICE kubernetes && etcdv3 not need create provider metadata dir in consumer logic
//providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
//if err := r.createDirIfNotExist(providersNode); err != nil {
// return perrors.WithMessage(err, "create provider node")
//}
params := url.Values{}
params.Add("protocol", svc.Protocol)
params.Add("category", (common.RoleType(common.CONSUMER)).String())
params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode()))
dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String())
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return perrors.WithMessagef(err, "create k/v in kubernetes (path:%s, url:%s)", dubboPath, encodedURL)
}
return nil
}
func (r *kubernetesRegistry) registerProvider(svc common.URL) error {
if len(svc.Path) == 0 || len(svc.Methods) == 0 {
return perrors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods))
}
var (
urlPath string
encodedURL string
dubboPath string
)
providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
if err := r.createDirIfNotExist(providersNode); err != nil {
return perrors.WithMessage(err, "create provider node")
}
params := url.Values{}
svc.RangeParams(func(key, value string) bool {
params[key] = []string{value}
return true
})
params.Add("pid", processID)
params.Add("ip", localIP)
params.Add("anyhost", "true")
params.Add("category", (common.RoleType(common.PROVIDER)).String())
params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
params.Add("side", (common.RoleType(common.PROVIDER)).Role())
logger.Debugf("provider url params:%#v", params)
var host string
if len(svc.Ip) == 0 {
host = localIP + ":" + svc.Port
} else {
host = svc.Ip + ":" + svc.Port
}
urlPath = svc.Path
encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode()))
dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String())
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return perrors.WithMessagef(err, "create k/v in kubernetes (path:%s, url:%s)", dubboPath, encodedURL)
}
return nil
func (r *kubernetesRegistry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}
func (r *kubernetesRegistry) subscribe(svc *common.URL) (registry.Listener, error) {
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var (
configListener *configurationListener
......@@ -321,12 +122,7 @@ func (r *kubernetesRegistry) subscribe(svc *common.URL) (registry.Listener, erro
listener := kubernetes.NewEventListener(r.client)
r.listenerLock.Lock()
// NOTICE:
// double-check the listener
// if r.listener already be assigned, discard the new value
if r.listener == nil {
r.listener = listener
}
r.listener = listener
r.listenerLock.Unlock()
}
......@@ -339,36 +135,28 @@ func (r *kubernetesRegistry) subscribe(svc *common.URL) (registry.Listener, erro
return configListener, nil
}
//subscribe from registry
func (r *kubernetesRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
for {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
func (r *kubernetesRegistry) InitListeners() {
r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
}
listener, err := r.subscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}
func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
for {
if serviceEvent, err := listener.Next(); err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
return
} else {
logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
// actually, kubernetes use in-cluster config,
r := &kubernetesRegistry{}
}
r.InitBaseRegistry(url, r)
if err := kubernetes.ValidateClient(r); err != nil {
return nil, err
}
r.WaitGroup().Add(1)
go kubernetes.HandleClientRestart(r)
r.InitListeners()
logger.Debugf("the kubernetes registry started")
return r, nil
}
......@@ -40,8 +40,8 @@ type clientFacade interface {
Client() *Client
SetClient(*Client)
ClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup
GetDone() chan struct{}
WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container
Done() chan struct{} //for etcd client control
RestartCallBack() bool
common.Node
}
......@@ -57,7 +57,7 @@ func HandleClientRestart(r clientFacade) {
LOOP:
for {
select {
case <-r.GetDone():
case <-r.Done():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...")
break LOOP
// re-register all services
......@@ -71,7 +71,7 @@ LOOP:
failTimes = 0
for {
select {
case <-r.GetDone():
case <-r.Done():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment