diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go index 0a494a422d9dadb7545885cc5b5f63e186198f95..7c59a36308318da8d167ba31e87f35151f98df0f 100644 --- a/registry/kubernetes/listener.go +++ b/registry/kubernetes/listener.go @@ -18,7 +18,6 @@ package kubernetes import ( - "context" "strings" ) @@ -55,7 +54,7 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool { return false } url := eventType.Path[index+len("/providers/"):] - serviceURL, err := common.NewURL(context.Background(), url) + serviceURL, err := common.NewURL(url) if err != nil { logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err) return false diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index 61ac32fb42bc69dc2c1f702dbacba1c2f8c695d6..c68f0963a94683090becaf91cdf5513b738b816d 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -19,17 +19,14 @@ package kubernetes import ( "fmt" - "net/url" "os" "path" - "strconv" "strings" "sync" - "time" ) import ( - gxnet "github.com/dubbogo/gost/net" + "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" ) @@ -48,8 +45,7 @@ var ( ) const ( - Name = "kubernetes" - RegistryConnDelay = 3 + Name = "kubernetes" ) func init() { @@ -59,22 +55,13 @@ func init() { } type kubernetesRegistry struct { - *common.URL - birth int64 // time of file birth, seconds since Epoch; 0 if unknown registry.BaseRegistry - - cltLock sync.Mutex - client *kubernetes.Client - services map[string]common.URL // service name + protocol -> service config - + cltLock sync.Mutex + client *kubernetes.Client listenerLock sync.Mutex listener *kubernetes.EventListener dataListener *dataListener configListener *configurationListener - - wg sync.WaitGroup // wg+done for kubernetes client restart - closeOnce sync.Once // protect the done - done chan struct{} } func (r *kubernetesRegistry) Client() *kubernetes.Client { @@ -86,129 +73,19 @@ func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) { func (r *kubernetesRegistry) ClientLock() *sync.Mutex { return &r.cltLock } -func (r *kubernetesRegistry) WaitGroup() *sync.WaitGroup { - return &r.wg -} -func (r *kubernetesRegistry) GetDone() chan struct{} { - return r.done -} -func (r *kubernetesRegistry) RestartCallBack() bool { - - services := []common.URL{} - for _, confIf := range r.services { - services = append(services, confIf) - } - - for _, confIf := range services { - err := r.Register(confIf) - if err != nil { - logger.Errorf("(kubernetesProviderRegistry)register(conf{%#v}) = error{%#v}", - confIf, perrors.WithStack(err)) - return false - } - logger.Infof("success to re-register service :%v", confIf.Key()) - } - return true -} - -func newKubernetesRegistry(url *common.URL) (registry.Registry, error) { - r := &kubernetesRegistry{ - URL: url, - birth: time.Now().UnixNano(), - done: make(chan struct{}), - services: make(map[string]common.URL), - } - - if err := kubernetes.ValidateClient(r); err != nil { - return nil, err - } - - r.wg.Add(1) - go kubernetes.HandleClientRestart(r) - - r.listener = kubernetes.NewEventListener(r.client) - r.configListener = NewConfigurationListener(r) - r.dataListener = NewRegistryDataListener(r.configListener) - - return r, nil -} - -func (r *kubernetesRegistry) GetUrl() common.URL { - return *r.URL -} - -func (r *kubernetesRegistry) IsAvailable() bool { - - select { - case <-r.done: - return false - default: - return true - } -} - -func (r *kubernetesRegistry) Destroy() { - - if r.configListener != nil { - r.configListener.Close() - } - r.stop() -} - -func (r *kubernetesRegistry) stop() { - - // close will be call concurrent - r.closeOnce.Do(func() { - close(r.done) - }) - - // close current client +func (r *kubernetesRegistry) CloseAndNilClient() { r.client.Close() - - r.cltLock.Lock() r.client = nil - r.services = nil - r.cltLock.Unlock() } -func (r *kubernetesRegistry) Register(svc common.URL) error { - - role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) - if err != nil { - return perrors.WithMessage(err, "get registry role") - } - - r.cltLock.Lock() - if _, ok := r.services[svc.Key()]; ok { - r.cltLock.Unlock() - return perrors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path)) - } - r.cltLock.Unlock() - - switch role { - case common.PROVIDER: - logger.Debugf("(provider register )Register(conf{%#v})", svc) - if err := r.registerProvider(svc); err != nil { - return perrors.WithMessage(err, "register provider") - } - case common.CONSUMER: - logger.Debugf("(consumer register )Register(conf{%#v})", svc) - if err := r.registerConsumer(svc); err != nil { - return perrors.WithMessage(err, "register consumer") - } - default: - return perrors.New(fmt.Sprintf("unknown role %d", role)) +func (r *kubernetesRegistry) CloseListener() { + if r.configListener != nil { + r.configListener.Close() } - - r.cltLock.Lock() - r.services[svc.Key()] = svc - r.cltLock.Unlock() - return nil } -func (r *kubernetesRegistry) createDirIfNotExist(k string) error { - +func (r *kubernetesRegistry) CreatePath(k string) error { var tmpPath string for _, str := range strings.Split(k, "/")[1:] { tmpPath = path.Join(tmpPath, "/", str) @@ -220,87 +97,11 @@ func (r *kubernetesRegistry) createDirIfNotExist(k string) error { return nil } -func (r *kubernetesRegistry) registerConsumer(svc common.URL) error { - - consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER]) - if err := r.createDirIfNotExist(consumersNode); err != nil { - logger.Errorf("kubernetes client create path %s: %v", consumersNode, err) - return perrors.WithMessage(err, "kubernetes create consumer nodes") - } - - // NOTICE kubernetes && etcdv3 not need create provider metadata dir in consumer logic - //providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) - //if err := r.createDirIfNotExist(providersNode); err != nil { - // return perrors.WithMessage(err, "create provider node") - //} - - params := url.Values{} - - params.Add("protocol", svc.Protocol) - - params.Add("category", (common.RoleType(common.CONSUMER)).String()) - params.Add("dubbo", "dubbogo-consumer-"+constant.Version) - - encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode())) - dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String()) - if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { - return perrors.WithMessagef(err, "create k/v in kubernetes (path:%s, url:%s)", dubboPath, encodedURL) - } - - return nil -} - -func (r *kubernetesRegistry) registerProvider(svc common.URL) error { - - if len(svc.Path) == 0 || len(svc.Methods) == 0 { - return perrors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods)) - } - - var ( - urlPath string - encodedURL string - dubboPath string - ) - - providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER]) - if err := r.createDirIfNotExist(providersNode); err != nil { - return perrors.WithMessage(err, "create provider node") - } - - params := url.Values{} - - svc.RangeParams(func(key, value string) bool { - params[key] = []string{value} - return true - }) - params.Add("pid", processID) - params.Add("ip", localIP) - params.Add("anyhost", "true") - params.Add("category", (common.RoleType(common.PROVIDER)).String()) - params.Add("dubbo", "dubbo-provider-golang-"+constant.Version) - params.Add("side", (common.RoleType(common.PROVIDER)).Role()) - - logger.Debugf("provider url params:%#v", params) - var host string - if len(svc.Ip) == 0 { - host = localIP + ":" + svc.Port - } else { - host = svc.Ip + ":" + svc.Port - } - - urlPath = svc.Path - - encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode())) - dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String()) - - if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil { - return perrors.WithMessagef(err, "create k/v in kubernetes (path:%s, url:%s)", dubboPath, encodedURL) - } - - return nil +func (r *kubernetesRegistry) DoRegister(root string, node string) error { + return r.client.Create(path.Join(root, node), "") } -func (r *kubernetesRegistry) subscribe(svc *common.URL) (registry.Listener, error) { +func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) { var ( configListener *configurationListener @@ -321,12 +122,7 @@ func (r *kubernetesRegistry) subscribe(svc *common.URL) (registry.Listener, erro listener := kubernetes.NewEventListener(r.client) r.listenerLock.Lock() - // NOTICE: - // double-check the listener - // if r.listener already be assigned, discard the new value - if r.listener == nil { - r.listener = listener - } + r.listener = listener r.listenerLock.Unlock() } @@ -339,36 +135,28 @@ func (r *kubernetesRegistry) subscribe(svc *common.URL) (registry.Listener, erro return configListener, nil } -//subscribe from registry -func (r *kubernetesRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { - for { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } +func (r *kubernetesRegistry) InitListeners() { + r.listener = kubernetes.NewEventListener(r.client) + r.configListener = NewConfigurationListener(r) + r.dataListener = NewRegistryDataListener(r.configListener) +} - listener, err := r.subscribe(url) - if err != nil { - if !r.IsAvailable() { - logger.Warnf("event listener game over.") - return - } - logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) - continue - } +func newKubernetesRegistry(url *common.URL) (registry.Registry, error) { - for { - if serviceEvent, err := listener.Next(); err != nil { - logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) - listener.Close() - return - } else { - logger.Infof("update begin, service event: %v", serviceEvent.String()) - notifyListener.Notify(serviceEvent) - } + // actually, kubernetes use in-cluster config, + r := &kubernetesRegistry{} - } + r.InitBaseRegistry(url, r) + if err := kubernetes.ValidateClient(r); err != nil { + return nil, err } + + r.WaitGroup().Add(1) + go kubernetes.HandleClientRestart(r) + r.InitListeners() + + logger.Debugf("the kubernetes registry started") + + return r, nil } diff --git a/remoting/kubernetes/facade.go b/remoting/kubernetes/facade.go index 0ab689d5175e8fdd830a2a65128efe519cb64ec0..17a4f41767ede98fbf0b940ce6f8328aa9f79836 100644 --- a/remoting/kubernetes/facade.go +++ b/remoting/kubernetes/facade.go @@ -40,8 +40,8 @@ type clientFacade interface { Client() *Client SetClient(*Client) ClientLock() *sync.Mutex - WaitGroup() *sync.WaitGroup - GetDone() chan struct{} + WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container + Done() chan struct{} //for etcd client control RestartCallBack() bool common.Node } @@ -57,7 +57,7 @@ func HandleClientRestart(r clientFacade) { LOOP: for { select { - case <-r.GetDone(): + case <-r.Done(): logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...") break LOOP // re-register all services @@ -71,7 +71,7 @@ LOOP: failTimes = 0 for { select { - case <-r.GetDone(): + case <-r.Done(): logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...") break LOOP case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent