Skip to content
Snippets Groups Projects
Commit 3054b7fe authored by scott.wang's avatar scott.wang
Browse files

Cancel Provider's ListAndWatch function

parent 775857c3
No related branches found
No related tags found
No related merge requests found
......@@ -19,9 +19,12 @@ package kubernetes
import (
"context"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"strconv"
"sync"
)
......@@ -45,10 +48,16 @@ type Client struct {
// newClient
// new a client for registry
func newClient() (*Client, error) {
func newClient(url common.URL) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
controller, err := newDubboRegistryController(ctx, GetInClusterKubernetesClient)
// read type
r, err := strconv.Atoi(url.GetParams().Get(constant.ROLE_KEY))
if err != nil {
return nil, perrors.WithMessage(err, "atoi role")
}
controller, err := newDubboRegistryController(ctx, common.RoleType(r), GetInClusterKubernetesClient)
if err != nil {
return nil, perrors.WithMessage(err, "new dubbo-registry controller")
}
......@@ -59,8 +68,9 @@ func newClient() (*Client, error) {
controller: controller,
}
c.controller.Run()
if r == common.CONSUMER {
c.controller.Run()
}
return c, nil
}
......@@ -171,7 +181,8 @@ func ValidateClient(container clientFacade) error {
// new Client
if client == nil || client.Valid() {
newClient, err := newClient()
newClient, err := newClient(container.GetUrl())
if err != nil {
logger.Warnf("new kubernetes client (namespace{%s}: %v)", "", err)
return perrors.WithMessagef(err, "new kubernetes client (:%+v)", "")
......@@ -187,7 +198,7 @@ func ValidateClient(container clientFacade) error {
func NewMockClient(podList *v1.PodList) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
controller, err := newDubboRegistryController(ctx, func() (kubernetes.Interface, error) {
controller, err := newDubboRegistryController(ctx, common.CONSUMER, func() (kubernetes.Interface, error) {
return fake.NewSimpleClientset(podList), nil
})
if err != nil {
......@@ -203,52 +214,3 @@ func NewMockClient(podList *v1.PodList) (*Client, error) {
c.controller.Run()
return c, nil
}
// newMockClient
// new a client for test
//func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) {
//
// rawClient, err := mockClientGenerator()
// if err != nil {
// return nil, perrors.WithMessage(err, "call mock generator")
// }
//
// currentPodName, err := getCurrentPodName()
// if err != nil {
// return nil, perrors.WithMessage(err, "get pod name")
// }
//
// ctx, cancel := context.WithCancel(context.Background())
//
// c := &Client{
// currentPodName: currentPodName,
// ns: namespace,
// rawClient: rawClient,
// ctx: ctx,
// watcherSet: newWatcherSet(ctx),
// cancel: cancel,
// }
//
// currentPod, err := c.initCurrentPod()
// if err != nil {
// return nil, perrors.WithMessage(err, "init current pod")
// }
//
// // record current status
// c.currentPod = currentPod
//
// // init the watcherSet by current pods
// if err := c.initWatchSet(); err != nil {
// return nil, perrors.WithMessage(err, "init watcherSet")
// }
//
// c.lastResourceVersion = c.currentPod.GetResourceVersion()
//
// // start kubernetes watch loop
// if err := c.watchPods(); err != nil {
// return nil, perrors.WithMessage(err, "watch pods")
// }
//
// logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
// return c, nil
//}
......@@ -17,7 +17,10 @@
package kubernetes
import "github.com/apache/dubbo-go/common"
type clientFacade interface {
Client() *Client
SetClient(*Client)
common.Node
}
......@@ -18,11 +18,15 @@
package kubernetes
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"strconv"
"sync"
"testing"
)
type mockFacade struct {
*common.URL
client *Client
cltLock sync.Mutex
done chan struct{}
......@@ -36,10 +40,31 @@ func (r *mockFacade) SetClient(client *Client) {
r.client = client
}
func (r *mockFacade) GetUrl() common.URL {
return *r.URL
}
func (r *mockFacade) Destroy() {
}
func (r *mockFacade) RestartCallBack() bool {
return true
}
func (r *mockFacade) IsAvailable() bool {
return true
}
func Test_Facade(t *testing.T) {
regUrl, err := common.NewURL("registry://127.0.0.1:443",
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)))
if err != nil {
t.Fatal(err)
}
mockClient := getTestClient(t)
m := &mockFacade{
URL: &regUrl,
client: mockClient,
}
......
......@@ -4,6 +4,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
"github.com/apache/dubbo-go/common"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
......@@ -75,7 +76,7 @@ type dubboRegistryController struct {
queue workqueue.Interface //shared by namespaced informers
}
func newDubboRegistryController(ctx context.Context, kcGetter func() (kubernetes.Interface, error)) (*dubboRegistryController, error) {
func newDubboRegistryController(ctx context.Context, roleType common.RoleType, kcGetter func() (kubernetes.Interface, error)) (*dubboRegistryController, error) {
kc, err := kcGetter()
if err != nil {
......@@ -95,10 +96,6 @@ func newDubboRegistryController(ctx context.Context, kcGetter func() (kubernetes
return nil, perrors.WithMessage(err, "dubbo registry controller read config")
}
if err := c.init(); err != nil {
return nil, perrors.WithMessage(err, "dubbo registry controller init")
}
if err := c.initCurrentPod(); err != nil {
return nil, perrors.WithMessage(err, "init current pod")
}
......@@ -107,7 +104,13 @@ func newDubboRegistryController(ctx context.Context, kcGetter func() (kubernetes
return nil, perrors.WithMessage(err, "init watch set")
}
go c.run()
if roleType == common.CONSUMER {
// only consumer need list && watch
if err := c.initPodInformer(); err != nil {
return nil, perrors.WithMessage(err, "dubbo registry controller init pod informer")
}
go c.run()
}
return c, nil
}
......@@ -165,17 +168,6 @@ func (c *dubboRegistryController) readConfig() error {
if len(c.namespace) == 0 {
return perrors.New("read value from env by key (NAMESPACE)")
}
// read need watched namespaces list
needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey)
if len(needWatchedNameSpaceList) == 0 {
return perrors.New("read value from env by key (DUBBO_NAMESPACE)")
}
for _, ns := range strings.Split(needWatchedNameSpaceList, ",") {
c.needWatchedNamespace[ns] = struct{}{}
}
// current work namespace should be watched
c.needWatchedNamespace[c.namespace] = struct{}{}
return nil
}
......@@ -210,8 +202,19 @@ func (c *dubboRegistryController) initNamespacedPodInformer(ns string) {
c.namespacedPodInformers[ns] = podInformer
}
func (c *dubboRegistryController) init() error {
func (c *dubboRegistryController) initPodInformer() error {
// read need watched namespaces list
needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey)
if len(needWatchedNameSpaceList) == 0 {
return perrors.New("read value from env by key (DUBBO_NAMESPACE)")
}
for _, ns := range strings.Split(needWatchedNameSpaceList, ",") {
c.needWatchedNamespace[ns] = struct{}{}
}
// current work namespace should be watched
c.needWatchedNamespace[c.namespace] = struct{}{}
c.queue = workqueue.New()
// init all watch needed pod-informer
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment