Skip to content
Snippets Groups Projects
Commit 83d39758 authored by scott's avatar scott
Browse files

move handle-client-restart from remote to registry

parent 06b0da85
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,8 @@ package kubernetes
import (
"encoding/json"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"testing"
......@@ -238,6 +240,8 @@ func (s *KubernetesRegistryTestSuite) SetupSuite() {
t.Fatal(err)
}
go http.ListenAndServe(":6061", nil)
}
func (s *KubernetesRegistryTestSuite) TestDataChange() {
......
......@@ -23,9 +23,11 @@ import (
"path"
"strings"
"sync"
"time"
)
import (
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
k8s "k8s.io/client-go/kubernetes"
......@@ -46,7 +48,9 @@ var (
)
const (
Name = "kubernetes"
Name = "kubernetes"
ConnDelay = 3
MaxFailTimes = 15
)
func init() {
......@@ -57,7 +61,7 @@ func init() {
type kubernetesRegistry struct {
registry.BaseRegistry
cltLock sync.Mutex
cltLock sync.RWMutex
client *kubernetes.Client
listenerLock sync.Mutex
listener *kubernetes.EventListener
......@@ -66,13 +70,15 @@ type kubernetesRegistry struct {
}
func (r *kubernetesRegistry) Client() *kubernetes.Client {
return r.client
r.cltLock.RLock()
client := r.client
r.cltLock.RUnlock()
return client
}
func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
r.cltLock.Lock()
r.client = client
}
func (r *kubernetesRegistry) ClientLock() *sync.Mutex {
return &r.cltLock
r.cltLock.Unlock()
}
func (r *kubernetesRegistry) CloseAndNilClient() {
......@@ -154,7 +160,7 @@ func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
}
r.WaitGroup().Add(1)
go kubernetes.HandleClientRestart(r)
go r.HandleClientRestart()
r.InitListeners()
logger.Debugf("the kubernetes registry started")
......@@ -178,7 +184,56 @@ func newMockKubernetesRegistry(
return nil, perrors.WithMessage(err, "new mock client")
}
r.WaitGroup().Add(1) //zk client start successful, then wg +1
go kubernetes.HandleClientRestart(r)
go r.HandleClientRestart()
r.InitListeners()
return r, nil
}
func (r *kubernetesRegistry) HandleClientRestart() {
var (
err error
failTimes int
)
defer r.WaitGroup()
LOOP:
for {
select {
case <-r.Done():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...")
break LOOP
// re-register all services
case <-r.Client().Done():
r.Client().Close()
r.SetClient(nil)
// try to connect to kubernetes,
failTimes = 0
for {
select {
case <-r.Done():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
}
err = kubernetes.ValidateClient(r)
logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
}
}
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
......@@ -46,6 +46,7 @@ func (s *KubernetesRegistryTestSuite) TestRegister() {
if err != nil {
t.Fatal(err)
}
r.WaitGroup().Done()
}
func (s *KubernetesRegistryTestSuite) TestSubscribe() {
......@@ -75,6 +76,8 @@ func (s *KubernetesRegistryTestSuite) TestSubscribe() {
}
t.Logf("got event %s", serviceEvent)
r.WaitGroup().Done()
}
func (s *KubernetesRegistryTestSuite) TestConsumerDestroy() {
......@@ -85,12 +88,12 @@ func (s *KubernetesRegistryTestSuite) TestConsumerDestroy() {
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
_, err := r.DoSubscribe(&url)
listener, err := r.DoSubscribe(&url)
if err != nil {
t.Fatal(err)
}
//listener.Close()
listener.Close()
time.Sleep(1e9)
r.Destroy()
......@@ -108,7 +111,8 @@ func (s *KubernetesRegistryTestSuite) TestProviderDestroy() {
err := r.Register(url)
assert.NoError(t, err)
//listener.Close()
r.WaitGroup().Done()
time.Sleep(1e9)
r.Destroy()
assert.Equal(t, false, r.IsAvailable())
......
......@@ -157,7 +157,7 @@ func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Inte
return nil, perrors.WithMessage(err, "watch pods")
}
logger.Info("init kubernetes registry success")
logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
return c, nil
}
......@@ -210,7 +210,7 @@ func newClient(namespace string) (*Client, error) {
return nil, perrors.WithMessage(err, "watch pods")
}
logger.Info("init kubernetes registry success")
logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
return c, nil
}
......@@ -306,7 +306,7 @@ func (c *Client) watchPodsLoop() {
}()
for {
onceWatch:
wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
......@@ -362,7 +362,6 @@ func (c *Client) watchPodsLoop() {
go c.handleWatchedPodEvent(p, event.Type)
}
}
onceWatch:
}
}
......@@ -398,7 +397,7 @@ func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
return
}
logger.Debugf("prepare to put object (%#v) to kuberentes-watcherSet", o)
logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", o)
if err := c.watcherSet.Put(o); err != nil {
logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
......@@ -454,7 +453,7 @@ func (c *Client) readCurrentPod() (*v1.Pod, error) {
}
// Create
// create k/v pair in storage
// create k/v pair in watcher-set
func (c *Client) Create(k, v string) error {
// 1. accord old pod && (k, v) assemble new pod dubbo annotion v
......@@ -671,26 +670,10 @@ func (c *Client) Close() {
// validate the kubernetes client
func ValidateClient(container clientFacade) error {
lock := container.ClientLock()
lock.Lock()
defer lock.Unlock()
client := container.Client()
// new Client
if container.Client() == nil {
ns, err := getCurrentNameSpace()
if err != nil {
return perrors.WithMessage(err, "get current namespace")
}
newClient, err := newClient(ns)
if err != nil {
logger.Warnf("new kubernetes client (namespace{%s}: %v)", ns, err)
return perrors.WithMessagef(err, "new kubernetes client (:%+v)", ns)
}
container.SetClient(newClient)
}
if !container.Client().Valid() {
if client == nil || client.Valid() {
ns, err := getCurrentNameSpace()
if err != nil {
return perrors.WithMessage(err, "get current namespace")
......
......@@ -17,78 +17,7 @@
package kubernetes
import (
"sync"
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
const (
ConnDelay = 3
MaxFailTimes = 15
)
type clientFacade interface {
Client() *Client
SetClient(*Client)
ClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container
Done() chan struct{} //for etcd client control
RestartCallBack() bool
common.Node
}
func HandleClientRestart(r clientFacade) {
var (
err error
failTimes int
)
defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.Done():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...")
break LOOP
// re-register all services
case <-r.Client().Done():
r.ClientLock().Lock()
r.Client().Close()
r.SetClient(nil)
r.ClientLock().Unlock()
// try to connect to kubernetes,
failTimes = 0
for {
select {
case <-r.Done():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
}
err = ValidateClient(r)
logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
}
}
}
......@@ -24,15 +24,10 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
import (
"github.com/apache/dubbo-go/common"
)
type mockFacade struct {
client *Client
cltLock sync.Mutex
wg sync.WaitGroup
URL *common.URL
done chan struct{}
}
......@@ -44,35 +39,6 @@ func (r *mockFacade) SetClient(client *Client) {
r.client = client
}
func (r *mockFacade) ClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *mockFacade) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *mockFacade) Done() chan struct{} {
return r.done
}
func (r *mockFacade) GetUrl() common.URL {
return *r.URL
}
func (r *mockFacade) Destroy() {
close(r.done)
r.wg.Wait()
}
func (r *mockFacade) RestartCallBack() bool {
return true
}
func (r *mockFacade) IsAvailable() bool {
return true
}
func (s *KubernetesClientTestSuite) Test_Facade() {
t := s.T()
......@@ -91,12 +57,12 @@ func (s *KubernetesClientTestSuite) Test_Facade() {
t.Fatal(err)
}
url, _ := common.NewURL("mock://127.0.0.1")
m := &mockFacade{
client: mockClient,
URL: &url,
}
go HandleClientRestart(m)
if err := ValidateClient(m); err == nil {
t.Fatal("out of cluster should err")
}
mockClient.Close()
}
......@@ -19,7 +19,6 @@ package kubernetes
import (
"sync"
"time"
)
import (
......@@ -157,10 +156,6 @@ func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener
}
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// this func is invoked by kubernetes ConsumerRegistry::Registry/ kubernetes ConsumerRegistry::get/kubernetes ConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment