diff --git a/registry/kubernetes/listener_test.go b/registry/kubernetes/listener_test.go index f89154e32e4ef3b6423d6970f9bfc0dd0207d7dd..88e1589020fe8b62a2e2f03690cece081302cc06 100644 --- a/registry/kubernetes/listener_test.go +++ b/registry/kubernetes/listener_test.go @@ -19,6 +19,8 @@ package kubernetes import ( "encoding/json" + "net/http" + _ "net/http/pprof" "os" "strconv" "testing" @@ -238,6 +240,8 @@ func (s *KubernetesRegistryTestSuite) SetupSuite() { t.Fatal(err) } + go http.ListenAndServe(":6061", nil) + } func (s *KubernetesRegistryTestSuite) TestDataChange() { diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index 2a63660cc09150d09d683a9a1d99474074a895e8..79f066a9bdcb2e3e3136b450cebe6da37e2fc50e 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -23,9 +23,11 @@ import ( "path" "strings" "sync" + "time" ) import ( + "github.com/dubbogo/getty" "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" k8s "k8s.io/client-go/kubernetes" @@ -46,7 +48,9 @@ var ( ) const ( - Name = "kubernetes" + Name = "kubernetes" + ConnDelay = 3 + MaxFailTimes = 15 ) func init() { @@ -57,7 +61,7 @@ func init() { type kubernetesRegistry struct { registry.BaseRegistry - cltLock sync.Mutex + cltLock sync.RWMutex client *kubernetes.Client listenerLock sync.Mutex listener *kubernetes.EventListener @@ -66,13 +70,15 @@ type kubernetesRegistry struct { } func (r *kubernetesRegistry) Client() *kubernetes.Client { - return r.client + r.cltLock.RLock() + client := r.client + r.cltLock.RUnlock() + return client } func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) { + r.cltLock.Lock() r.client = client -} -func (r *kubernetesRegistry) ClientLock() *sync.Mutex { - return &r.cltLock + r.cltLock.Unlock() } func (r *kubernetesRegistry) CloseAndNilClient() { @@ -154,7 +160,7 @@ func newKubernetesRegistry(url *common.URL) (registry.Registry, error) { } r.WaitGroup().Add(1) - go kubernetes.HandleClientRestart(r) + go r.HandleClientRestart() r.InitListeners() logger.Debugf("the kubernetes registry started") @@ -178,7 +184,56 @@ func newMockKubernetesRegistry( return nil, perrors.WithMessage(err, "new mock client") } r.WaitGroup().Add(1) //zk client start successful, then wg +1 - go kubernetes.HandleClientRestart(r) + go r.HandleClientRestart() r.InitListeners() return r, nil } + +func (r *kubernetesRegistry) HandleClientRestart() { + + var ( + err error + failTimes int + ) + + defer r.WaitGroup() +LOOP: + for { + select { + case <-r.Done(): + logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...") + break LOOP + // re-register all services + case <-r.Client().Done(): + r.Client().Close() + r.SetClient(nil) + + // try to connect to kubernetes, + failTimes = 0 + for { + select { + case <-r.Done(): + logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...") + break LOOP + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent + } + err = kubernetes.ValidateClient(r) + logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err)) + + if err == nil { + if r.RestartCallBack() { + break + } + } + failTimes++ + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes + } + } + } + } +} + +func timeSecondDuration(sec int) time.Duration { + return time.Duration(sec) * time.Second +} diff --git a/registry/kubernetes/registry_test.go b/registry/kubernetes/registry_test.go index 22873606b6fbe2d73661c04e6ba0d7a860c58898..a01af167c3ba4a5f4a10b6bd01e821229041408a 100644 --- a/registry/kubernetes/registry_test.go +++ b/registry/kubernetes/registry_test.go @@ -46,6 +46,7 @@ func (s *KubernetesRegistryTestSuite) TestRegister() { if err != nil { t.Fatal(err) } + r.WaitGroup().Done() } func (s *KubernetesRegistryTestSuite) TestSubscribe() { @@ -75,6 +76,8 @@ func (s *KubernetesRegistryTestSuite) TestSubscribe() { } t.Logf("got event %s", serviceEvent) + + r.WaitGroup().Done() } func (s *KubernetesRegistryTestSuite) TestConsumerDestroy() { @@ -85,12 +88,12 @@ func (s *KubernetesRegistryTestSuite) TestConsumerDestroy() { url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) - _, err := r.DoSubscribe(&url) + listener, err := r.DoSubscribe(&url) if err != nil { t.Fatal(err) } - //listener.Close() + listener.Close() time.Sleep(1e9) r.Destroy() @@ -108,7 +111,8 @@ func (s *KubernetesRegistryTestSuite) TestProviderDestroy() { err := r.Register(url) assert.NoError(t, err) - //listener.Close() + r.WaitGroup().Done() + time.Sleep(1e9) r.Destroy() assert.Equal(t, false, r.IsAvailable()) diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go index 605a05fb5fdb2f730b8776422a8e54becef5d0c9..43b8dd4ec3524f6b4528dabb3a142d02e3266cb9 100644 --- a/remoting/kubernetes/client.go +++ b/remoting/kubernetes/client.go @@ -157,7 +157,7 @@ func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Inte return nil, perrors.WithMessage(err, "watch pods") } - logger.Info("init kubernetes registry success") + logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name) return c, nil } @@ -210,7 +210,7 @@ func newClient(namespace string) (*Client, error) { return nil, perrors.WithMessage(err, "watch pods") } - logger.Info("init kubernetes registry success") + logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name) return c, nil } @@ -306,7 +306,7 @@ func (c *Client) watchPodsLoop() { }() for { - + onceWatch: wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{ LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(), Watch: true, @@ -362,7 +362,6 @@ func (c *Client) watchPodsLoop() { go c.handleWatchedPodEvent(p, event.Type) } } - onceWatch: } } @@ -398,7 +397,7 @@ func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) { return } - logger.Debugf("prepare to put object (%#v) to kuberentes-watcherSet", o) + logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", o) if err := c.watcherSet.Put(o); err != nil { logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err) @@ -454,7 +453,7 @@ func (c *Client) readCurrentPod() (*v1.Pod, error) { } // Create -// create k/v pair in storage +// create k/v pair in watcher-set func (c *Client) Create(k, v string) error { // 1. accord old pod && (k, v) assemble new pod dubbo annotion v @@ -671,26 +670,10 @@ func (c *Client) Close() { // validate the kubernetes client func ValidateClient(container clientFacade) error { - lock := container.ClientLock() - lock.Lock() - defer lock.Unlock() + client := container.Client() // new Client - if container.Client() == nil { - ns, err := getCurrentNameSpace() - if err != nil { - return perrors.WithMessage(err, "get current namespace") - } - newClient, err := newClient(ns) - if err != nil { - logger.Warnf("new kubernetes client (namespace{%s}: %v)", ns, err) - return perrors.WithMessagef(err, "new kubernetes client (:%+v)", ns) - } - container.SetClient(newClient) - } - - if !container.Client().Valid() { - + if client == nil || client.Valid() { ns, err := getCurrentNameSpace() if err != nil { return perrors.WithMessage(err, "get current namespace") diff --git a/remoting/kubernetes/facade.go b/remoting/kubernetes/facade.go index 17a4f41767ede98fbf0b940ce6f8328aa9f79836..dd15c918b45c353b8395e0b82aee82216f48cd0e 100644 --- a/remoting/kubernetes/facade.go +++ b/remoting/kubernetes/facade.go @@ -17,78 +17,7 @@ package kubernetes -import ( - "sync" -) - -import ( - "github.com/dubbogo/getty" - perrors "github.com/pkg/errors" -) - -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/logger" -) - -const ( - ConnDelay = 3 - MaxFailTimes = 15 -) - type clientFacade interface { Client() *Client SetClient(*Client) - ClientLock() *sync.Mutex - WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container - Done() chan struct{} //for etcd client control - RestartCallBack() bool - common.Node -} - -func HandleClientRestart(r clientFacade) { - - var ( - err error - failTimes int - ) - - defer r.WaitGroup().Done() -LOOP: - for { - select { - case <-r.Done(): - logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...") - break LOOP - // re-register all services - case <-r.Client().Done(): - r.ClientLock().Lock() - r.Client().Close() - r.SetClient(nil) - r.ClientLock().Unlock() - - // try to connect to kubernetes, - failTimes = 0 - for { - select { - case <-r.Done(): - logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...") - break LOOP - case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent - } - err = ValidateClient(r) - logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err)) - - if err == nil { - if r.RestartCallBack() { - break - } - } - failTimes++ - if MaxFailTimes <= failTimes { - failTimes = MaxFailTimes - } - } - } - } } diff --git a/remoting/kubernetes/facade_test.go b/remoting/kubernetes/facade_test.go index cb9f92723fe59e07fa52cd4760aa3ae06887a885..024264ffdee14c2ad3fb01a8a5279084c0f085d9 100644 --- a/remoting/kubernetes/facade_test.go +++ b/remoting/kubernetes/facade_test.go @@ -24,15 +24,10 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" ) -import ( - "github.com/apache/dubbo-go/common" -) type mockFacade struct { client *Client cltLock sync.Mutex - wg sync.WaitGroup - URL *common.URL done chan struct{} } @@ -44,35 +39,6 @@ func (r *mockFacade) SetClient(client *Client) { r.client = client } -func (r *mockFacade) ClientLock() *sync.Mutex { - return &r.cltLock -} - -func (r *mockFacade) WaitGroup() *sync.WaitGroup { - return &r.wg -} - -func (r *mockFacade) Done() chan struct{} { - return r.done -} - -func (r *mockFacade) GetUrl() common.URL { - return *r.URL -} - -func (r *mockFacade) Destroy() { - close(r.done) - r.wg.Wait() -} - -func (r *mockFacade) RestartCallBack() bool { - return true -} - -func (r *mockFacade) IsAvailable() bool { - return true -} - func (s *KubernetesClientTestSuite) Test_Facade() { t := s.T() @@ -91,12 +57,12 @@ func (s *KubernetesClientTestSuite) Test_Facade() { t.Fatal(err) } - url, _ := common.NewURL("mock://127.0.0.1") m := &mockFacade{ client: mockClient, - URL: &url, } - go HandleClientRestart(m) + if err := ValidateClient(m); err == nil { + t.Fatal("out of cluster should err") + } mockClient.Close() } diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go index 575734ab30f839d0fa1264581d629dd5e27ac68d..c0f9e9f119d5b3cb007033767b53313475310150 100644 --- a/remoting/kubernetes/listener.go +++ b/remoting/kubernetes/listener.go @@ -19,7 +19,6 @@ package kubernetes import ( "sync" - "time" ) import ( @@ -157,10 +156,6 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener } } -func timeSecondDuration(sec int) time.Duration { - return time.Duration(sec) * time.Second -} - // this func is invoked by kubernetes ConsumerRegistry::Registry/ kubernetes ConsumerRegistry::get/kubernetes ConsumerRegistry::getListener // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent // |