diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go index 3ee1d7b3690749a0ccc224184ccbbcb395f2698e..377032ae29ad39ccca5cc05b051f14004edd0c56 100644 --- a/remoting/kubernetes/client.go +++ b/remoting/kubernetes/client.go @@ -19,9 +19,12 @@ package kubernetes import ( "context" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" v1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "strconv" "sync" ) @@ -45,10 +48,16 @@ type Client struct { // newClient // new a client for registry -func newClient() (*Client, error) { +func newClient(url common.URL) (*Client, error) { ctx, cancel := context.WithCancel(context.Background()) - controller, err := newDubboRegistryController(ctx, GetInClusterKubernetesClient) + + // read type + r, err := strconv.Atoi(url.GetParams().Get(constant.ROLE_KEY)) + if err != nil { + return nil, perrors.WithMessage(err, "atoi role") + } + controller, err := newDubboRegistryController(ctx, common.RoleType(r), GetInClusterKubernetesClient) if err != nil { return nil, perrors.WithMessage(err, "new dubbo-registry controller") } @@ -59,8 +68,9 @@ func newClient() (*Client, error) { controller: controller, } - c.controller.Run() - + if r == common.CONSUMER { + c.controller.Run() + } return c, nil } @@ -171,7 +181,8 @@ func ValidateClient(container clientFacade) error { // new Client if client == nil || client.Valid() { - newClient, err := newClient() + + newClient, err := newClient(container.GetUrl()) if err != nil { logger.Warnf("new kubernetes client (namespace{%s}: %v)", "", err) return perrors.WithMessagef(err, "new kubernetes client (:%+v)", "") @@ -187,7 +198,7 @@ func ValidateClient(container clientFacade) error { func NewMockClient(podList *v1.PodList) (*Client, error) { ctx, cancel := context.WithCancel(context.Background()) - controller, err := newDubboRegistryController(ctx, func() (kubernetes.Interface, error) { + controller, err := newDubboRegistryController(ctx, common.CONSUMER, func() (kubernetes.Interface, error) { return fake.NewSimpleClientset(podList), nil }) if err != nil { @@ -203,52 +214,3 @@ func NewMockClient(podList *v1.PodList) (*Client, error) { c.controller.Run() return c, nil } - -// newMockClient -// new a client for test -//func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) { -// -// rawClient, err := mockClientGenerator() -// if err != nil { -// return nil, perrors.WithMessage(err, "call mock generator") -// } -// -// currentPodName, err := getCurrentPodName() -// if err != nil { -// return nil, perrors.WithMessage(err, "get pod name") -// } -// -// ctx, cancel := context.WithCancel(context.Background()) -// -// c := &Client{ -// currentPodName: currentPodName, -// ns: namespace, -// rawClient: rawClient, -// ctx: ctx, -// watcherSet: newWatcherSet(ctx), -// cancel: cancel, -// } -// -// currentPod, err := c.initCurrentPod() -// if err != nil { -// return nil, perrors.WithMessage(err, "init current pod") -// } -// -// // record current status -// c.currentPod = currentPod -// -// // init the watcherSet by current pods -// if err := c.initWatchSet(); err != nil { -// return nil, perrors.WithMessage(err, "init watcherSet") -// } -// -// c.lastResourceVersion = c.currentPod.GetResourceVersion() -// -// // start kubernetes watch loop -// if err := c.watchPods(); err != nil { -// return nil, perrors.WithMessage(err, "watch pods") -// } -// -// logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name) -// return c, nil -//} diff --git a/remoting/kubernetes/facade.go b/remoting/kubernetes/facade.go index dd15c918b45c353b8395e0b82aee82216f48cd0e..dc060bbb0eb673c6e380dfa3e9d5f7bacbd3fc0b 100644 --- a/remoting/kubernetes/facade.go +++ b/remoting/kubernetes/facade.go @@ -17,7 +17,10 @@ package kubernetes +import "github.com/apache/dubbo-go/common" + type clientFacade interface { Client() *Client SetClient(*Client) + common.Node } diff --git a/remoting/kubernetes/facade_test.go b/remoting/kubernetes/facade_test.go index 45848165488f477bb71d96608b8fceac3e5942df..ce303d534aa7c6d772e1250894fbe4696e890a53 100644 --- a/remoting/kubernetes/facade_test.go +++ b/remoting/kubernetes/facade_test.go @@ -18,11 +18,15 @@ package kubernetes import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "strconv" "sync" "testing" ) type mockFacade struct { + *common.URL client *Client cltLock sync.Mutex done chan struct{} @@ -36,10 +40,31 @@ func (r *mockFacade) SetClient(client *Client) { r.client = client } +func (r *mockFacade) GetUrl() common.URL { + return *r.URL +} + +func (r *mockFacade) Destroy() { +} + +func (r *mockFacade) RestartCallBack() bool { + return true +} + +func (r *mockFacade) IsAvailable() bool { + return true +} func Test_Facade(t *testing.T) { + regUrl, err := common.NewURL("registry://127.0.0.1:443", + common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))) + if err != nil { + t.Fatal(err) + } + mockClient := getTestClient(t) m := &mockFacade{ + URL: ®Url, client: mockClient, } diff --git a/remoting/kubernetes/registry_controller.go b/remoting/kubernetes/registry_controller.go index d1be1d9697ad276c553fa622a9dd6919240b310a..42489cda96901c432a16ddf2485be60bb4ef2713 100644 --- a/remoting/kubernetes/registry_controller.go +++ b/remoting/kubernetes/registry_controller.go @@ -4,6 +4,7 @@ import ( "context" "encoding/base64" "encoding/json" + "github.com/apache/dubbo-go/common" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" @@ -75,7 +76,7 @@ type dubboRegistryController struct { queue workqueue.Interface //shared by namespaced informers } -func newDubboRegistryController(ctx context.Context, kcGetter func() (kubernetes.Interface, error)) (*dubboRegistryController, error) { +func newDubboRegistryController(ctx context.Context, roleType common.RoleType, kcGetter func() (kubernetes.Interface, error)) (*dubboRegistryController, error) { kc, err := kcGetter() if err != nil { @@ -95,10 +96,6 @@ func newDubboRegistryController(ctx context.Context, kcGetter func() (kubernetes return nil, perrors.WithMessage(err, "dubbo registry controller read config") } - if err := c.init(); err != nil { - return nil, perrors.WithMessage(err, "dubbo registry controller init") - } - if err := c.initCurrentPod(); err != nil { return nil, perrors.WithMessage(err, "init current pod") } @@ -107,7 +104,13 @@ func newDubboRegistryController(ctx context.Context, kcGetter func() (kubernetes return nil, perrors.WithMessage(err, "init watch set") } - go c.run() + if roleType == common.CONSUMER { + // only consumer need list && watch + if err := c.initPodInformer(); err != nil { + return nil, perrors.WithMessage(err, "dubbo registry controller init pod informer") + } + go c.run() + } return c, nil } @@ -165,17 +168,6 @@ func (c *dubboRegistryController) readConfig() error { if len(c.namespace) == 0 { return perrors.New("read value from env by key (NAMESPACE)") } - // read need watched namespaces list - needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey) - if len(needWatchedNameSpaceList) == 0 { - return perrors.New("read value from env by key (DUBBO_NAMESPACE)") - } - for _, ns := range strings.Split(needWatchedNameSpaceList, ",") { - c.needWatchedNamespace[ns] = struct{}{} - } - - // current work namespace should be watched - c.needWatchedNamespace[c.namespace] = struct{}{} return nil } @@ -210,8 +202,19 @@ func (c *dubboRegistryController) initNamespacedPodInformer(ns string) { c.namespacedPodInformers[ns] = podInformer } -func (c *dubboRegistryController) init() error { +func (c *dubboRegistryController) initPodInformer() error { + // read need watched namespaces list + needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey) + if len(needWatchedNameSpaceList) == 0 { + return perrors.New("read value from env by key (DUBBO_NAMESPACE)") + } + for _, ns := range strings.Split(needWatchedNameSpaceList, ",") { + c.needWatchedNamespace[ns] = struct{}{} + } + + // current work namespace should be watched + c.needWatchedNamespace[c.namespace] = struct{}{} c.queue = workqueue.New() // init all watch needed pod-informer