Unverified Commit 106040d5 authored by Ming Deng, committed by GitHub

Merge pull request #577 from sxllwx/k8s_enhance

kubernetes as registry enhance
parents a7af5385 0f9a7170
Showing with 1116 additions and 1285 deletions
......@@ -103,7 +103,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
return nil, perrors.New("listener stopped")
case e := <-l.events:
logger.Infof("got kubernetes event %#v", e)
logger.Debugf("got kubernetes event %#v", e)
if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
select {
case <-l.registry.Done():
......
......@@ -18,24 +18,15 @@
package kubernetes
import (
"encoding/json"
"os"
"strconv"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)
......@@ -184,66 +175,7 @@ type MockDataListener struct{}
func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {}
type KubernetesRegistryTestSuite struct {
suite.Suite
currentPod v1.Pod
}
func (s *KubernetesRegistryTestSuite) initRegistry() *kubernetesRegistry {
t := s.T()
regurl, err := common.NewURL("registry://127.0.0.1:443", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
if err != nil {
t.Fatal(err)
}
mock, err := newMockKubernetesRegistry(&regurl, s.currentPod.GetNamespace(), func() (kubernetes.Interface, error) {
out := fake.NewSimpleClientset()
// mock current pod
if _, err = out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil {
t.Fatal(err)
}
return out, nil
})
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
return mock.(*kubernetesRegistry)
}
func (s *KubernetesRegistryTestSuite) SetupSuite() {
t := s.T()
const (
// kubernetes inject the var
podNameKey = "HOSTNAME"
nameSpaceKey = "NAMESPACE"
)
// 1. install test data
if err := json.Unmarshal([]byte(clientPodJsonData), &s.currentPod); err != nil {
t.Fatal(err)
}
// 2. set downward-api inject env
if err := os.Setenv(podNameKey, s.currentPod.GetName()); err != nil {
t.Fatal(err)
}
if err := os.Setenv(nameSpaceKey, s.currentPod.GetNamespace()); err != nil {
t.Fatal(err)
}
}
func (s *KubernetesRegistryTestSuite) TestDataChange() {
t := s.T()
func TestDataChange(t *testing.T) {
listener := NewRegistryDataListener(&MockDataListener{})
url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
......@@ -252,7 +184,3 @@ func (s *KubernetesRegistryTestSuite) TestDataChange() {
t.Fatal("data change not ok")
}
}
func TestKubernetesRegistrySuite(t *testing.T) {
suite.Run(t, &KubernetesRegistryTestSuite{})
}
......@@ -29,7 +29,7 @@ import (
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
k8s "k8s.io/client-go/kubernetes"
v1 "k8s.io/api/core/v1"
)
import (
......@@ -160,15 +160,14 @@ func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
go r.HandleClientRestart()
r.InitListeners()
logger.Debugf("the kubernetes registry started")
logger.Debugf("kubernetes registry started")
return r, nil
}
func newMockKubernetesRegistry(
url *common.URL,
namespace string,
clientGeneratorFunc func() (k8s.Interface, error),
podsList *v1.PodList,
) (registry.Registry, error) {
var err error
......@@ -176,7 +175,7 @@ func newMockKubernetesRegistry(
r := &kubernetesRegistry{}
r.InitBaseRegistry(url, r)
r.client, err = kubernetes.NewMockClient(namespace, clientGeneratorFunc)
r.client, err = kubernetes.NewMockClient(podsList)
if err != nil {
return nil, perrors.WithMessage(err, "new mock client")
}
......
......@@ -18,12 +18,17 @@
package kubernetes
import (
"encoding/json"
"os"
"strconv"
"sync"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
)
import (
......@@ -31,11 +36,212 @@ import (
"github.com/apache/dubbo-go/common/constant"
)
func (s *KubernetesRegistryTestSuite) TestRegister() {
var clientPodListJsonData = `{
"apiVersion": "v1",
"items": [
{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"annotations": {
"dubbo.io/annotation": "W3siayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzL2R1YmJvJTNBJTJGJTJGMTcyLjE3LjAuNiUzQTIwMDAwJTJGVXNlclByb3ZpZGVyJTNGYWNjZXNzbG9nJTNEJTI2YW55aG9zdCUzRHRydWUlMjZhcHAudmVyc2lvbiUzRDAuMC4xJTI2YXBwbGljYXRpb24lM0RCRFRTZXJ2aWNlJTI2YXV0aCUzRCUyNmJlYW4ubmFtZSUzRFVzZXJQcm92aWRlciUyNmNsdXN0ZXIlM0RmYWlsb3ZlciUyNmVudmlyb25tZW50JTNEZGV2JTI2ZXhlY3V0ZS5saW1pdCUzRCUyNmV4ZWN1dGUubGltaXQucmVqZWN0ZWQuaGFuZGxlciUzRCUyNmdyb3VwJTNEJTI2aW50ZXJmYWNlJTNEY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyJTI2aXAlM0QxNzIuMTcuMC42JTI2bG9hZGJhbGFuY2UlM0RyYW5kb20lMjZtZXRob2RzLkdldFVzZXIubG9hZGJhbGFuY2UlM0RyYW5kb20lMjZtZXRob2RzLkdldFVzZXIucmV0cmllcyUzRDElMjZtZXRob2RzLkdldFVzZXIudHBzLmxpbWl0LmludGVydmFsJTNEJTI2bWV0aG9kcy5HZXRVc2VyLnRwcy5saW1pdC5yYXRlJTNEJTI2bWV0aG9kcy5HZXRVc2VyLnRwcy5saW1pdC5zdHJhdGVneSUzRCUyNm1ldGhvZHMuR2V0VXNlci53ZWlnaHQlM0QwJTI2bW9kdWxlJTNEZHViYm9nbyUyQnVzZXItaW5mbyUyQnNlcnZlciUyNm5hbWUlM0RCRFRTZXJ2aWNlJTI2b3JnYW5pemF0aW9uJTNEaWt1cmVudG8uY29tJTI2b3duZXIlM0RaWCUyNnBhcmFtLnNpZ24lM0QlMjZwaWQlM0Q2JTI2cmVnaXN0cnkucm9sZSUzRDMlMjZyZWxlYXNlJTNEZHViYm8tZ29sYW5nLTEuMy4wJTI2cmV0cmllcyUzRCUyNnNlcnZpY2UuZmlsdGVyJTNEZWNobyUyNTJDdG9rZW4lMjUyQ2FjY2Vzc2xvZyUyNTJDdHBzJTI1MkNnZW5lcmljX3NlcnZpY2UlMjUyQ2V4ZWN1dGUlMjUyQ3BzaHV0ZG93biUyNnNpZGUlM0Rwcm92aWRlciUyNnRpbWVzdGFtcCUzRDE1OTExNTYxNTUlMjZ0cHMubGltaXQuaW50ZXJ2YWwlM0QlMjZ0cHMubGltaXQucmF0ZSUzRCUyNnRwcy5saW1pdC5yZWplY3RlZC5oYW5kbGVyJTNEJTI2dHBzLmxpbWl0LnN0cmF0ZWd5JTNEJTI2dHBzLmxpbWl0ZXIlM0QlMjZ2ZXJzaW9uJTNEJTI2d2FybXVwJTNEMTAwIiwidiI6IiJ9XQ=="
},
"creationTimestamp": "2020-06-03T03:49:14Z",
"generateName": "server-84c864f5bc-",
"labels": {
"dubbo.io/label": "dubbo.io-value",
"pod-template-hash": "84c864f5bc",
"role": "server"
},
"name": "server-84c864f5bc-r8qvz",
"namespace": "default",
"ownerReferences": [
{
"apiVersion": "apps/v1",
"blockOwnerDeletion": true,
"controller": true,
"kind": "ReplicaSet",
"name": "server-84c864f5bc",
"uid": "fa376dbb-4f37-4705-8e80-727f592c19b3"
}
],
"resourceVersion": "517460",
"selfLink": "/api/v1/namespaces/default/pods/server-84c864f5bc-r8qvz",
"uid": "f4fc811c-200c-4445-8d4f-532144957dcc"
},
"spec": {
"containers": [
{
"env": [
{
"name": "DUBBO_NAMESPACE",
"value": "default"
},
{
"name": "NAMESPACE",
"valueFrom": {
"fieldRef": {
"apiVersion": "v1",
"fieldPath": "metadata.namespace"
}
}
}
],
"image": "192.168.240.101:5000/scott/go-server",
"imagePullPolicy": "Always",
"name": "server",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File",
"volumeMounts": [
{
"mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
"name": "dubbo-sa-token-5qbtb",
"readOnly": true
}
]
}
],
"dnsPolicy": "ClusterFirst",
"enableServiceLinks": true,
"nodeName": "minikube",
"priority": 0,
"restartPolicy": "Always",
"schedulerName": "default-scheduler",
"securityContext": {},
"serviceAccount": "dubbo-sa",
"serviceAccountName": "dubbo-sa",
"terminationGracePeriodSeconds": 30,
"tolerations": [
{
"effect": "NoExecute",
"key": "node.kubernetes.io/not-ready",
"operator": "Exists",
"tolerationSeconds": 300
},
{
"effect": "NoExecute",
"key": "node.kubernetes.io/unreachable",
"operator": "Exists",
"tolerationSeconds": 300
}
],
"volumes": [
{
"name": "dubbo-sa-token-5qbtb",
"secret": {
"defaultMode": 420,
"secretName": "dubbo-sa-token-5qbtb"
}
}
]
},
"status": {
"conditions": [
{
"lastProbeTime": null,
"lastTransitionTime": "2020-06-03T03:49:14Z",
"status": "True",
"type": "Initialized"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-06-03T03:49:15Z",
"status": "True",
"type": "Ready"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-06-03T03:49:15Z",
"status": "True",
"type": "ContainersReady"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-06-03T03:49:14Z",
"status": "True",
"type": "PodScheduled"
}
],
"containerStatuses": [
{
"containerID": "docker://b6421e05ce44f8a1c4fa6b72274980777c7c0f945516209f7c0558cd0cd65406",
"image": "192.168.240.101:5000/scott/go-server:latest",
"imageID": "docker-pullable://192.168.240.101:5000/scott/go-server@sha256:4eecf895054f0ff93d80db64992a561d10504e55582def6dcb6093a6d6d92461",
"lastState": {},
"name": "server",
"ready": true,
"restartCount": 0,
"started": true,
"state": {
"running": {
"startedAt": "2020-06-03T03:49:15Z"
}
}
}
],
"hostIP": "10.0.2.15",
"phase": "Running",
"podIP": "172.17.0.6",
"podIPs": [
{
"ip": "172.17.0.6"
}
],
"qosClass": "BestEffort",
"startTime": "2020-06-03T03:49:14Z"
}
}
],
"kind": "List",
"metadata": {
"resourceVersion": "",
"selfLink": ""
}
}
`
func getTestRegistry(t *testing.T) *kubernetesRegistry {
const (
podNameKey = "HOSTNAME"
nameSpaceKey = "NAMESPACE"
needWatchedNameSpaceKey = "DUBBO_NAMESPACE"
)
pl := &v1.PodList{}
// 1. install test data
if err := json.Unmarshal([]byte(clientPodListJsonData), &pl); err != nil {
t.Fatal(err)
}
currentPod := pl.Items[0]
env := map[string]string{
nameSpaceKey: currentPod.GetNamespace(),
podNameKey: currentPod.GetName(),
needWatchedNameSpaceKey: "default",
}
t := s.T()
for k, v := range env {
if err := os.Setenv(k, v); err != nil {
t.Fatal(err)
}
}
r := s.initRegistry()
regurl, err := common.NewURL("registry://127.0.0.1:443", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
if err != nil {
t.Fatal(err)
}
out, err := newMockKubernetesRegistry(&regurl, pl)
if err != nil {
t.Fatal(err)
}
return out.(*kubernetesRegistry)
}
func TestRegister(t *testing.T) {
r := getTestRegistry(t)
defer r.Destroy()
url, _ := common.NewURL(
......@@ -52,41 +258,44 @@ func (s *KubernetesRegistryTestSuite) TestRegister() {
}
}
func (s *KubernetesRegistryTestSuite) TestSubscribe() {
func TestSubscribe(t *testing.T) {
t := s.T()
r := s.initRegistry()
r := getTestRegistry(t)
defer r.Destroy()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
if err != nil {
t.Fatal(err)
}
listener, err := r.DoSubscribe(&url)
if err != nil {
t.Fatal(err)
}
time.Sleep(1e9)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
registerErr := r.Register(url)
if registerErr != nil {
t.Fatal(registerErr)
}
}()
wg.Wait()
serviceEvent, err := listener.Next()
if err != nil {
t.Fatal(err)
}
t.Logf("got event %s", serviceEvent)
t.Logf("get service event %s", serviceEvent)
}
func (s *KubernetesRegistryTestSuite) TestConsumerDestroy() {
func TestConsumerDestroy(t *testing.T) {
t := s.T()
r := s.initRegistry()
r := getTestRegistry(t)
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
common.WithParamsValue(constant.CLUSTER_KEY, "mock"),
......@@ -105,11 +314,9 @@ func (s *KubernetesRegistryTestSuite) TestConsumerDestroy() {
}
func (s *KubernetesRegistryTestSuite) TestProviderDestroy() {
t := s.T()
func TestProviderDestroy(t *testing.T) {
r := s.initRegistry()
r := getTestRegistry(t)
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
common.WithParamsValue(constant.CLUSTER_KEY, "mock"),
......@@ -122,9 +329,7 @@ func (s *KubernetesRegistryTestSuite) TestProviderDestroy() {
assert.Equal(t, false, r.IsAvailable())
}
func (s *KubernetesRegistryTestSuite) TestNewRegistry() {
t := s.T()
func TestNewRegistry(t *testing.T) {
regUrl, err := common.NewURL("registry://127.0.0.1:443",
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
......@@ -137,9 +342,9 @@ func (s *KubernetesRegistryTestSuite) TestNewRegistry() {
}
}
func (s *KubernetesRegistryTestSuite) TestHandleClientRestart() {
func TestHandleClientRestart(t *testing.T) {
r := s.initRegistry()
r := getTestRegistry(t)
r.WaitGroup().Add(1)
go r.HandleClientRestart()
time.Sleep(timeSecondDuration(1))
......
......@@ -19,442 +19,62 @@ package kubernetes
import (
"context"
"encoding/base64"
"encoding/json"
"os"
"strconv"
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/watch"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/kubernetes/fake"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
const (
// kubernetes inject the var
podNameKey = "HOSTNAME"
nameSpaceKey = "NAMESPACE"
// all pod annotation key
DubboIOAnnotationKey = "dubbo.io/annotation"
DubboIOLabelKey = "dubbo.io/label"
DubboIOLabelValue = "dubbo.io-value"
)
var (
ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
)
type Client struct {
// kubernetes connection config
cfg *rest.Config
// the kubernetes interface
rawClient kubernetes.Interface
// current pod config
currentPodName string
ns string
// current resource version
lastResourceVersion string
// the memory watcherSet
watcherSet WatcherSet
// protect the wg && currentPod
lock sync.RWMutex
// current pod status
currentPod *v1.Pod
// protect the watchPods loop && watcher
wg sync.WaitGroup
// manage the client lifecycle
ctx context.Context
cancel context.CancelFunc
}
// load CurrentPodName
func getCurrentPodName() (string, error) {
v := os.Getenv(podNameKey)
if len(v) == 0 {
return "", perrors.New("read value from env by key (HOSTNAME)")
}
return v, nil
}
// load CurrentNameSpace
func getCurrentNameSpace() (string, error) {
v := os.Getenv(nameSpaceKey)
if len(v) == 0 {
return "", perrors.New("read value from env by key (NAMESPACE)")
}
return v, nil
}
// NewMockClient
// export for registry package test
func NewMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) {
return newMockClient(namespace, mockClientGenerator)
}
// newMockClient
// new a client for test
func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) {
rawClient, err := mockClientGenerator()
if err != nil {
return nil, perrors.WithMessage(err, "call mock generator")
}
currentPodName, err := getCurrentPodName()
if err != nil {
return nil, perrors.WithMessage(err, "get pod name")
}
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
currentPodName: currentPodName,
ns: namespace,
rawClient: rawClient,
ctx: ctx,
watcherSet: newWatcherSet(ctx),
cancel: cancel,
}
currentPod, err := c.initCurrentPod()
if err != nil {
return nil, perrors.WithMessage(err, "init current pod")
}
// record current status
c.currentPod = currentPod
// init the watcherSet by current pods
if err := c.initWatchSet(); err != nil {
return nil, perrors.WithMessage(err, "init watcherSet")
}
c.lastResourceVersion = c.currentPod.GetResourceVersion()
// start kubernetes watch loop
if err := c.watchPods(); err != nil {
return nil, perrors.WithMessage(err, "watch pods")
}
logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
return c, nil
controller *dubboRegistryController
}
// newClient
// creates a client for the registry
func newClient(namespace string) (*Client, error) {
func newClient(url common.URL) (*Client, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, perrors.WithMessage(err, "get in-cluster config")
}
ctx, cancel := context.WithCancel(context.Background())
rawClient, err := kubernetes.NewForConfig(cfg)
// read the role type from the url
r, err := strconv.Atoi(url.GetParams().Get(constant.ROLE_KEY))
if err != nil {
return nil, perrors.WithMessage(err, "new kubernetes client by in cluster config")
return nil, perrors.WithMessage(err, "atoi role")
}
currentPodName, err := getCurrentPodName()
controller, err := newDubboRegistryController(ctx, common.RoleType(r), GetInClusterKubernetesClient)
if err != nil {
return nil, perrors.WithMessage(err, "get pod name")
return nil, perrors.WithMessage(err, "new dubbo-registry controller")
}
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
currentPodName: currentPodName,
ns: namespace,
cfg: cfg,
rawClient: rawClient,
ctx: ctx,
watcherSet: newWatcherSet(ctx),
cancel: cancel,
}
currentPod, err := c.initCurrentPod()
if err != nil {
return nil, perrors.WithMessage(err, "init current pod")
}
// record current status
c.currentPod = currentPod
// init the watcherSet by current pods
if err := c.initWatchSet(); err != nil {
return nil, perrors.WithMessage(err, "init watcherSet")
ctx: ctx,
cancel: cancel,
controller: controller,
}
// start kubernetes watch loop
if err := c.watchPods(); err != nil {
return nil, perrors.WithMessage(err, "watch pods")
if r == common.CONSUMER {
// only consumers have to start the informer factory
c.controller.startALLInformers()
}
logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
return c, nil
}
// initCurrentPod
// 1. get current pod
// 2. give the dubbo-label for this pod
func (c *Client) initCurrentPod() (*v1.Pod, error) {
// read the current pod status
currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{})
if err != nil {
return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns)
}
oldPod, newPod, err := c.assembleDUBBOLabel(currentPod)
if err != nil {
if err != ErrDubboLabelAlreadyExist {
return nil, perrors.WithMessage(err, "assemble dubbo label")
}
// current pod don't have label
}
p, err := c.getPatch(oldPod, newPod)
if err != nil {
return nil, perrors.WithMessage(err, "get patch")
}
currentPod, err = c.patchCurrentPod(p)
if err != nil {
return nil, perrors.WithMessage(err, "patch to current pod")
}
return currentPod, nil
}
// initWatchSet
// 1. get all with dubbo label pods
// 2. put every element to watcherSet
func (c *Client) initWatchSet() error {
pods, err := c.rawClient.CoreV1().Pods(c.ns).List(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
})
if err != nil {
return perrors.WithMessagef(err, "list pods in namespace (%s)", c.ns)
}
// set resource version
c.lastResourceVersion = pods.GetResourceVersion()
for _, pod := range pods.Items {
logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", pod.Name, pod.GetLabels(), pod.GetAnnotations())
c.handleWatchedPodEvent(&pod, watch.Added)
}
return nil
}
// watchPods
// try to watch kubernetes pods
func (c *Client) watchPods() error {
// try once
watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
ResourceVersion: c.lastResourceVersion,
})
if err != nil {
return perrors.WithMessagef(err, "try to watch the namespace (%s) pods", c.ns)
}
watcher.Stop()
c.wg.Add(1)
// add wg, grace close the client
go c.watchPodsLoop()
return nil
}
type resourceVersionGetter interface {
GetResourceVersion() string
}
// watchPods
// try to notify
func (c *Client) watchPodsLoop() {
defer func() {
// notify other goroutine, this loop over
c.wg.Done()
logger.Info("watchPodsLoop goroutine game over")
}()
for {
onceWatch:
wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
ResourceVersion: c.lastResourceVersion,
})
if err != nil {
logger.Warnf("watch the namespace (%s) pods: %v, retry after 2 seconds", c.ns, err)
time.Sleep(2 * time.Second)
continue
}
logger.Infof("the old kubernetes client broken, collect the resource status from resource version (%s)", c.lastResourceVersion)
for {
select {
// double check ctx
case <-c.ctx.Done():
logger.Infof("the kubernetes client stopped, resultChan len %d", len(wc.ResultChan()))
return
// get one element from result-chan
case event, ok := <-wc.ResultChan():
if !ok {
wc.Stop()
logger.Info("kubernetes watch chan die, create new")
goto onceWatch
}
if event.Type == watch.Error {
// watched a error event
logger.Warnf("kubernetes watch api report err (%#v)", event)
continue
}
o, ok := event.Object.(resourceVersionGetter)
if !ok {
logger.Warnf("kubernetes response object not a versioned object, its real type %T", event.Object)
continue
}
// record the last resource version avoid to sync all pod
c.lastResourceVersion = o.GetResourceVersion()
logger.Infof("kubernetes get the current resource version %v", c.lastResourceVersion)
// check event object type
p, ok := event.Object.(*v1.Pod)
if !ok {
logger.Warnf("kubernetes response object not a Pod, its real type %T", event.Object)
continue
}
logger.Debugf("kubernetes got pod %#v", p)
// handle the watched pod
go c.handleWatchedPodEvent(p, event.Type)
}
}
}
}
// handleWatchedPodEvent
// handle watched pod event
func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
for ak, av := range p.GetAnnotations() {
// not dubbo interest annotation
if ak != DubboIOAnnotationKey {
continue
}
ol, err := c.unmarshalRecord(av)
if err != nil {
logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
return
}
for _, o := range ol {
switch eventType {
case watch.Added:
// if pod is added, the record always be create
o.EventType = Create
case watch.Modified:
o.EventType = Update
case watch.Deleted:
o.EventType = Delete
default:
logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
return
}
logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", o)
if err := c.watcherSet.Put(o); err != nil {
logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
return
}
}
}
}
// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
func (c *Client) unmarshalRecord(record string) ([]*WatcherEvent, error) {
if len(record) == 0 {
// []*WatcherEvent is nil.
return nil, nil
}
rawMsg, err := base64.URLEncoding.DecodeString(record)
if err != nil {
return nil, perrors.WithMessagef(err, "decode record (%s)", record)
}
var out []*WatcherEvent
if err := json.Unmarshal(rawMsg, &out); err != nil {
return nil, perrors.WithMessage(err, "decode json")
}
return out, nil
}
// marshalRecord
// marshal the kubernetes dubbo annotation value
func (c *Client) marshalRecord(ol []*WatcherEvent) (string, error) {
msg, err := json.Marshal(ol)
if err != nil {
return "", perrors.WithMessage(err, "json encode object list")
}
return base64.URLEncoding.EncodeToString(msg), nil
}
// readCurrentPod
// read the current pod status from kubernetes api
func (c *Client) readCurrentPod() (*v1.Pod, error) {
currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{})
if err != nil {
return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns)
}
return currentPod, nil
}
// Create
// create k/v pair in watcher-set
func (c *Client) Create(k, v string) error {
......@@ -464,132 +84,19 @@ func (c *Client) Create(k, v string) error {
c.lock.Lock()
defer c.lock.Unlock()
// 1. accord old pod && (k, v) assemble new pod dubbo annotion v
// 2. get patch data
// 3. PATCH the pod
currentPod, err := c.readCurrentPod()
if err != nil {
return perrors.WithMessage(err, "read current pod")
}
oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod)
if err != nil {
return perrors.WithMessage(err, "assemble")
}
patchBytes, err := c.getPatch(oldPod, newPod)
if err != nil {
return perrors.WithMessage(err, "get patch")
}
updatedPod, err := c.patchCurrentPod(patchBytes)
if err != nil {
return perrors.WithMessage(err, "patch current pod")
if err := c.controller.addAnnotationForCurrentPod(k, v); err != nil {
return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v)
}
c.currentPod = updatedPod
logger.Debugf("put the @key = %s @value = %s success", k, v)
// not update the watcherSet, the watcherSet should be write by the watchPodsLoop
return nil
}
// patch current pod
// write new meta for current pod
func (c *Client) patchCurrentPod(patch []byte) (*v1.Pod, error) {
updatedPod, err := c.rawClient.CoreV1().Pods(c.ns).Patch(c.currentPodName, types.StrategicMergePatchType, patch)
if err != nil {
return nil, perrors.WithMessage(err, "patch in kubernetes pod ")
}
return updatedPod, nil
}
// assemble the dubbo kubernetes label
// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label
func (c *Client) assembleDUBBOLabel(currentPod *v1.Pod) (*v1.Pod, *v1.Pod, error) {
var (
oldPod = &v1.Pod{}
newPod = &v1.Pod{}
)
oldPod.Labels = make(map[string]string, 8)
newPod.Labels = make(map[string]string, 8)
if currentPod.GetLabels() != nil {
if currentPod.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue {
// already have label
return nil, nil, ErrDubboLabelAlreadyExist
}
}
// copy current pod labels to oldPod && newPod
for k, v := range currentPod.GetLabels() {
oldPod.Labels[k] = v
newPod.Labels[k] = v
}
// assign new label for current pod
newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue
return oldPod, newPod, nil
}
// assemble the dubbo kubernetes annotations
// accord the current pod && (k,v) assemble the old-pod, new-pod
func (c *Client) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
oldPod = &v1.Pod{}
newPod = &v1.Pod{}
oldPod.Annotations = make(map[string]string, 8)
newPod.Annotations = make(map[string]string, 8)
for k, v := range currentPod.GetAnnotations() {
oldPod.Annotations[k] = v
newPod.Annotations[k] = v
}
al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey])
if err != nil {
err = perrors.WithMessage(err, "unmarshal record")
return
}
newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v}))
if err != nil {
err = perrors.WithMessage(err, "marshal record")
return
}
newPod.Annotations[DubboIOAnnotationKey] = newAnnotations
return
}
// getPatch
// get the kubernetes pod patch bytes
func (c *Client) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
oldData, err := json.Marshal(oldPod)
if err != nil {
return nil, perrors.WithMessage(err, "marshal old pod")
}
newData, err := json.Marshal(newPod)
if err != nil {
return nil, perrors.WithMessage(err, "marshal newPod pod")
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
if err != nil {
return nil, perrors.WithMessage(err, "create two-way-merge-patch")
}
return patchBytes, nil
}
// GetChildren
// get k children list from kubernetes-watcherSet
func (c *Client) GetChildren(k string) ([]string, []string, error) {
objectList, err := c.watcherSet.Get(k, true)
objectList, err := c.controller.watcherSet.Get(k, true)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k)
}
......@@ -609,7 +116,7 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {
// watch on spec key
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.watcherSet.Watch(k, false)
w, err := c.controller.watcherSet.Watch(k, false)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
}
......@@ -621,7 +128,7 @@ func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error)
// watch on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.watcherSet.Watch(prefix, true)
w, err := c.controller.watcherSet.Watch(prefix, true)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
}
......@@ -641,7 +148,7 @@ func (c *Client) Valid() bool {
}
c.lock.RLock()
defer c.lock.RUnlock()
return c.rawClient != nil
return c.controller != nil
}
// Done
......@@ -665,7 +172,6 @@ func (c *Client) Close() {
// the client ctx be canceled
// will trigger the watcherSet watchers all stopped
// so, just wait
c.wg.Wait()
}
// ValidateClient
......@@ -676,17 +182,36 @@ func ValidateClient(container clientFacade) error {
// new Client
if client == nil || client.Valid() {
ns, err := getCurrentNameSpace()
if err != nil {
return perrors.WithMessage(err, "get current namespace")
}
newClient, err := newClient(ns)
newClient, err := newClient(container.GetUrl())
if err != nil {
logger.Warnf("new kubernetes client (namespace{%s}: %v)", ns, err)
return perrors.WithMessagef(err, "new kubernetes client (:%+v)", ns)
logger.Warnf("new kubernetes client: %v)", err)
return perrors.WithMessage(err, "new kubernetes client")
}
container.SetClient(newClient)
}
return nil
}
// NewMockClient
// exported for registry package tests
func NewMockClient(podList *v1.PodList) (*Client, error) {
ctx, cancel := context.WithCancel(context.Background())
controller, err := newDubboRegistryController(ctx, common.CONSUMER, func() (kubernetes.Interface, error) {
return fake.NewSimpleClientset(podList), nil
})
if err != nil {
return nil, perrors.WithMessage(err, "new dubbo-registry controller")
}
c := &Client{
ctx: ctx,
cancel: cancel,
controller: controller,
}
c.controller.startALLInformers()
return c, nil
}
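For orientation, here is a condensed, hypothetical sketch of how this mock client is meant to be driven end to end: decode the pod-list fixture, export the env vars the controller reads, build the client with NewMockClient, then write and read a key through the annotation-backed store. It mirrors the getTestClient/TestClientCreateKV flow later in this diff and assumes the clientPodListJsonData fixture and the package constants defined there; the helper name, key and value passed to Create are made up.

package kubernetes

import (
	"encoding/json"
	"os"
	"testing"

	v1 "k8s.io/api/core/v1"
)

// sketchMockClientUsage is illustrative only (hypothetical name); it relies on
// clientPodListJsonData, podNameKey, nameSpaceKey and needWatchedNameSpaceKey
// from this package's test and controller files.
func sketchMockClientUsage(t *testing.T) {
	pl := &v1.PodList{}
	if err := json.Unmarshal([]byte(clientPodListJsonData), pl); err != nil {
		t.Fatal(err)
	}
	currentPod := pl.Items[0]

	// the controller reads these three variables in readConfig/initPodInformer
	env := map[string]string{
		podNameKey:              currentPod.GetName(),      // HOSTNAME
		nameSpaceKey:            currentPod.GetNamespace(), // NAMESPACE
		needWatchedNameSpaceKey: "default",                 // DUBBO_NAMESPACE
	}
	for k, v := range env {
		if err := os.Setenv(k, v); err != nil {
			t.Fatal(err)
		}
	}

	client, err := NewMockClient(pl)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	// Create patches the k/v pair into the current pod's dubbo.io/annotation
	// and records it in the in-memory watcherSet.
	if err := client.Create("/dubbo/com.ikurento.user.UserProvider/providers/demo", "demo-value"); err != nil {
		t.Fatal(err)
	}

	// GetChildren reads it back from the watcherSet by prefix.
	kList, vList, err := client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("children keys = %v, values = %v", kList, vList)
}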
......@@ -20,20 +20,15 @@ package kubernetes
import (
"encoding/json"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"runtime"
"strings"
"sync"
"testing"
"time"
)
import (
"github.com/stretchr/testify/suite"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
// tests dataset
......@@ -64,220 +59,203 @@ var tests = []struct {
// test dataset prefix
const prefix = "name"
var clientPodJsonData = `{
var clientPodListJsonData = `{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"annotations": {
"dubbo.io/annotation": "W3siayI6Ii9kdWJibyIsInYiOiIifSx7ImsiOiIvZHViYm8vY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvY29uc3VtZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJibyIsInYiOiIifSx7ImsiOiIvZHViYm8vY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvY29uc3VtZXJzL2NvbnN1bWVyJTNBJTJGJTJGMTcyLjE3LjAuOCUyRlVzZXJQcm92aWRlciUzRmNhdGVnb3J5JTNEY29uc3VtZXJzJTI2ZHViYm8lM0RkdWJib2dvLWNvbnN1bWVyLTIuNi4wJTI2cHJvdG9jb2wlM0RkdWJibyIsInYiOiIifV0="
},
"creationTimestamp": "2020-03-13T03:38:57Z",
"labels": {
"dubbo.io/label": "dubbo.io-value"
},
"name": "client",
"namespace": "default",
"resourceVersion": "2449700",
"selfLink": "/api/v1/namespaces/default/pods/client",
"uid": "3ec394f5-dcc6-49c3-8061-57b4b2b41344"
},
"spec": {
"containers": [
{
"env": [
"items": [
{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"annotations": {
"dubbo.io/annotation": "W3siayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzL2R1YmJvJTNBJTJGJTJGMTcyLjE3LjAuNiUzQTIwMDAwJTJGVXNlclByb3ZpZGVyJTNGYWNjZXNzbG9nJTNEJTI2YW55aG9zdCUzRHRydWUlMjZhcHAudmVyc2lvbiUzRDAuMC4xJTI2YXBwbGljYXRpb24lM0RCRFRTZXJ2aWNlJTI2YXV0aCUzRCUyNmJlYW4ubmFtZSUzRFVzZXJQcm92aWRlciUyNmNsdXN0ZXIlM0RmYWlsb3ZlciUyNmVudmlyb25tZW50JTNEZGV2JTI2ZXhlY3V0ZS5saW1pdCUzRCUyNmV4ZWN1dGUubGltaXQucmVqZWN0ZWQuaGFuZGxlciUzRCUyNmdyb3VwJTNEJTI2aW50ZXJmYWNlJTNEY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyJTI2aXAlM0QxNzIuMTcuMC42JTI2bG9hZGJhbGFuY2UlM0RyYW5kb20lMjZtZXRob2RzLkdldFVzZXIubG9hZGJhbGFuY2UlM0RyYW5kb20lMjZtZXRob2RzLkdldFVzZXIucmV0cmllcyUzRDElMjZtZXRob2RzLkdldFVzZXIudHBzLmxpbWl0LmludGVydmFsJTNEJTI2bWV0aG9kcy5HZXRVc2VyLnRwcy5saW1pdC5yYXRlJTNEJTI2bWV0aG9kcy5HZXRVc2VyLnRwcy5saW1pdC5zdHJhdGVneSUzRCUyNm1ldGhvZHMuR2V0VXNlci53ZWlnaHQlM0QwJTI2bW9kdWxlJTNEZHViYm9nbyUyQnVzZXItaW5mbyUyQnNlcnZlciUyNm5hbWUlM0RCRFRTZXJ2aWNlJTI2b3JnYW5pemF0aW9uJTNEaWt1cmVudG8uY29tJTI2b3duZXIlM0RaWCUyNnBhcmFtLnNpZ24lM0QlMjZwaWQlM0Q2JTI2cmVnaXN0cnkucm9sZSUzRDMlMjZyZWxlYXNlJTNEZHViYm8tZ29sYW5nLTEuMy4wJTI2cmV0cmllcyUzRCUyNnNlcnZpY2UuZmlsdGVyJTNEZWNobyUyNTJDdG9rZW4lMjUyQ2FjY2Vzc2xvZyUyNTJDdHBzJTI1MkNnZW5lcmljX3NlcnZpY2UlMjUyQ2V4ZWN1dGUlMjUyQ3BzaHV0ZG93biUyNnNpZGUlM0Rwcm92aWRlciUyNnRpbWVzdGFtcCUzRDE1OTExNTYxNTUlMjZ0cHMubGltaXQuaW50ZXJ2YWwlM0QlMjZ0cHMubGltaXQucmF0ZSUzRCUyNnRwcy5saW1pdC5yZWplY3RlZC5oYW5kbGVyJTNEJTI2dHBzLmxpbWl0LnN0cmF0ZWd5JTNEJTI2dHBzLmxpbWl0ZXIlM0QlMjZ2ZXJzaW9uJTNEJTI2d2FybXVwJTNEMTAwIiwidiI6IiJ9XQ=="
},
"creationTimestamp": "2020-06-03T03:49:14Z",
"generateName": "server-84c864f5bc-",
"labels": {
"dubbo.io/label": "dubbo.io-value",
"pod-template-hash": "84c864f5bc",
"role": "server"
},
"name": "server-84c864f5bc-r8qvz",
"namespace": "default",
"ownerReferences": [
{
"apiVersion": "apps/v1",
"blockOwnerDeletion": true,
"controller": true,
"kind": "ReplicaSet",
"name": "server-84c864f5bc",
"uid": "fa376dbb-4f37-4705-8e80-727f592c19b3"
}
],
"resourceVersion": "517460",
"selfLink": "/api/v1/namespaces/default/pods/server-84c864f5bc-r8qvz",
"uid": "f4fc811c-200c-4445-8d4f-532144957dcc"
},
"spec": {
"containers": [
{
"name": "NAMESPACE",
"valueFrom": {
"fieldRef": {
"apiVersion": "v1",
"fieldPath": "metadata.namespace"
"env": [
{
"name": "DUBBO_NAMESPACE",
"value": "default"
},
{
"name": "NAMESPACE",
"valueFrom": {
"fieldRef": {
"apiVersion": "v1",
"fieldPath": "metadata.namespace"
}
}
}
}
],
"image": "192.168.240.101:5000/scott/go-server",
"imagePullPolicy": "Always",
"name": "server",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File",
"volumeMounts": [
{
"mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
"name": "dubbo-sa-token-5qbtb",
"readOnly": true
}
]
}
],
"image": "registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client",
"imagePullPolicy": "Always",
"name": "client",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File",
"volumeMounts": [
"dnsPolicy": "ClusterFirst",
"enableServiceLinks": true,
"nodeName": "minikube",
"priority": 0,
"restartPolicy": "Always",
"schedulerName": "default-scheduler",
"securityContext": {},
"serviceAccount": "dubbo-sa",
"serviceAccountName": "dubbo-sa",
"terminationGracePeriodSeconds": 30,
"tolerations": [
{
"effect": "NoExecute",
"key": "node.kubernetes.io/not-ready",
"operator": "Exists",
"tolerationSeconds": 300
},
{
"mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
"name": "dubbo-sa-token-l2lzh",
"readOnly": true
"effect": "NoExecute",
"key": "node.kubernetes.io/unreachable",
"operator": "Exists",
"tolerationSeconds": 300
}
],
"volumes": [
{
"name": "dubbo-sa-token-5qbtb",
"secret": {
"defaultMode": 420,
"secretName": "dubbo-sa-token-5qbtb"
}
}
]
}
],
"dnsPolicy": "ClusterFirst",
"enableServiceLinks": true,
"nodeName": "minikube",
"priority": 0,
"restartPolicy": "Never",
"schedulerName": "default-scheduler",
"securityContext": {},
"serviceAccount": "dubbo-sa",
"serviceAccountName": "dubbo-sa",
"terminationGracePeriodSeconds": 30,
"tolerations": [
{
"effect": "NoExecute",
"key": "node.kubernetes.io/not-ready",
"operator": "Exists",
"tolerationSeconds": 300
},
{
"effect": "NoExecute",
"key": "node.kubernetes.io/unreachable",
"operator": "Exists",
"tolerationSeconds": 300
}
],
"volumes": [
{
"name": "dubbo-sa-token-l2lzh",
"secret": {
"defaultMode": 420,
"secretName": "dubbo-sa-token-l2lzh"
}
}
]
},
"status": {
"conditions": [
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:38:57Z",
"status": "True",
"type": "Initialized"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:40:18Z",
"status": "True",
"type": "Ready"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:40:18Z",
"status": "True",
"type": "ContainersReady"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:38:57Z",
"status": "True",
"type": "PodScheduled"
}
],
"containerStatuses": [
{
"containerID": "docker://2870d6abc19ca7fe22ca635ebcfac5d48c6d5550a659bafd74fb48104f6dfe3c",
"image": "registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client:latest",
"imageID": "docker-pullable://registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client@sha256:1f075131f708a0d400339e81549d7c4d4ed917ab0b6bd38ef458dd06ad25a559",
"lastState": {},
"name": "client",
"ready": true,
"restartCount": 0,
"state": {
"running": {
"startedAt": "2020-03-13T03:40:17Z"
"status": {
"conditions": [
{
"lastProbeTime": null,
"lastTransitionTime": "2020-06-03T03:49:14Z",
"status": "True",
"type": "Initialized"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-06-03T03:49:15Z",
"status": "True",
"type": "Ready"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-06-03T03:49:15Z",
"status": "True",
"type": "ContainersReady"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-06-03T03:49:14Z",
"status": "True",
"type": "PodScheduled"
}
],
"containerStatuses": [
{
"containerID": "docker://b6421e05ce44f8a1c4fa6b72274980777c7c0f945516209f7c0558cd0cd65406",
"image": "192.168.240.101:5000/scott/go-server:latest",
"imageID": "docker-pullable://192.168.240.101:5000/scott/go-server@sha256:4eecf895054f0ff93d80db64992a561d10504e55582def6dcb6093a6d6d92461",
"lastState": {},
"name": "server",
"ready": true,
"restartCount": 0,
"started": true,
"state": {
"running": {
"startedAt": "2020-06-03T03:49:15Z"
}
}
}
],
"hostIP": "10.0.2.15",
"phase": "Running",
"podIP": "172.17.0.6",
"podIPs": [
{
"ip": "172.17.0.6"
}
}
],
"qosClass": "BestEffort",
"startTime": "2020-06-03T03:49:14Z"
}
],
"hostIP": "10.0.2.15",
"phase": "Running",
"podIP": "172.17.0.8",
"qosClass": "BestEffort",
"startTime": "2020-03-13T03:38:57Z"
}
],
"kind": "List",
"metadata": {
"resourceVersion": "",
"selfLink": ""
}
}
`
type KubernetesClientTestSuite struct {
suite.Suite
currentPod v1.Pod
}
func (s *KubernetesClientTestSuite) initClient() *Client {
t := s.T()
client, err := newMockClient(s.currentPod.GetNamespace(), func() (kubernetes.Interface, error) {
out := fake.NewSimpleClientset()
// mock current pod
if _, err := out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil {
t.Fatal(err)
}
return out, nil
})
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
return client
}
func (s *KubernetesClientTestSuite) SetupSuite() {
runtime.GOMAXPROCS(1)
t := s.T()
func getTestClient(t *testing.T) *Client {
pl := &v1.PodList{}
// 1. install test data
if err := json.Unmarshal([]byte(clientPodJsonData), &s.currentPod); err != nil {
if err := json.Unmarshal([]byte(clientPodListJsonData), &pl); err != nil {
t.Fatal(err)
}
currentPod := pl.Items[0]
// 2. set downward-api inject env
if err := os.Setenv(podNameKey, s.currentPod.GetName()); err != nil {
t.Fatal(err)
}
if err := os.Setenv(nameSpaceKey, s.currentPod.GetNamespace()); err != nil {
t.Fatal(err)
}
go http.ListenAndServe(":6061", nil)
}
func (s *KubernetesClientTestSuite) TestReadCurrentPodName() {
t := s.T()
n, err := getCurrentPodName()
if err != nil {
t.Fatal(err)
env := map[string]string{
nameSpaceKey: currentPod.GetNamespace(),
podNameKey: currentPod.GetName(),
needWatchedNameSpaceKey: "default",
}
if n != s.currentPod.GetName() {
t.Fatalf("expect %s but got %s", s.currentPod.GetName(), n)
for k, v := range env {
if err := os.Setenv(k, v); err != nil {
t.Fatal(err)
}
}
}
func (s *KubernetesClientTestSuite) TestReadCurrentNameSpace() {
t := s.T()
ns, err := getCurrentNameSpace()
client, err := NewMockClient(pl)
if err != nil {
t.Fatal(err)
}
if ns != s.currentPod.GetNamespace() {
t.Fatalf("expect %s but got %s", s.currentPod.GetNamespace(), ns)
}
return client
}
func (s *KubernetesClientTestSuite) TestClientValid() {
t := s.T()
func TestClientValid(t *testing.T) {
client := s.initClient()
client := getTestClient(t)
defer client.Close()
if client.Valid() != true {
......@@ -290,29 +268,24 @@ func (s *KubernetesClientTestSuite) TestClientValid() {
}
}
func (s *KubernetesClientTestSuite) TestClientDone() {
t := s.T()
func TestClientDone(t *testing.T) {
client := s.initClient()
client := getTestClient(t)
go func() {
time.Sleep(time.Second)
client.Close()
}()
<-client.Done()
if client.Valid() == true {
t.Fatal("client should be invalid then")
t.Fatal("client should be invalid")
}
}
func (s *KubernetesClientTestSuite) TestClientCreateKV() {
func TestClientCreateKV(t *testing.T) {
t := s.T()
client := s.initClient()
client := getTestClient(t)
defer client.Close()
for _, tc := range tests {
......@@ -327,11 +300,9 @@ func (s *KubernetesClientTestSuite) TestClientCreateKV() {
}
}
func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() {
t := s.T()
func TestClientGetChildrenKVList(t *testing.T) {
client := s.initClient()
client := getTestClient(t)
defer client.Close()
wg := sync.WaitGroup{}
......@@ -407,11 +378,9 @@ func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() {
}
func (s *KubernetesClientTestSuite) TestClientWatchPrefix() {
func TestClientWatchPrefix(t *testing.T) {
t := s.T()
client := s.initClient()
client := getTestClient(t)
wg := sync.WaitGroup{}
wg.Add(1)
......@@ -452,22 +421,9 @@ func (s *KubernetesClientTestSuite) TestClientWatchPrefix() {
client.Close()
}
func (s *KubernetesClientTestSuite) TestNewClient() {
t := s.T()
_, err := newClient(s.currentPod.GetNamespace())
if err == nil {
t.Fatal("the out of cluster test should fail")
}
}
func (s *KubernetesClientTestSuite) TestClientWatch() {
t := s.T()
func TestClientWatch(t *testing.T) {
client := s.initClient()
client := getTestClient(t)
wg := sync.WaitGroup{}
wg.Add(1)
......@@ -507,7 +463,3 @@ func (s *KubernetesClientTestSuite) TestClientWatch() {
client.Close()
}
func TestKubernetesClient(t *testing.T) {
suite.Run(t, new(KubernetesClientTestSuite))
}
......@@ -17,7 +17,10 @@
package kubernetes
import "github.com/apache/dubbo-go/common"
type clientFacade interface {
Client() *Client
SetClient(*Client)
common.Node
}
......@@ -18,14 +18,18 @@
package kubernetes
import (
"strconv"
"sync"
"testing"
)
import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
type mockFacade struct {
*common.URL
client *Client
cltLock sync.Mutex
done chan struct{}
......@@ -39,25 +43,31 @@ func (r *mockFacade) SetClient(client *Client) {
r.client = client
}
func (s *KubernetesClientTestSuite) Test_Facade() {
func (r *mockFacade) GetUrl() common.URL {
return *r.URL
}
t := s.T()
func (r *mockFacade) Destroy() {
}
mockClient, err := newMockClient(s.currentPod.GetNamespace(), func() (kubernetes.Interface, error) {
func (r *mockFacade) RestartCallBack() bool {
return true
}
out := fake.NewSimpleClientset()
func (r *mockFacade) IsAvailable() bool {
return true
}
func Test_Facade(t *testing.T) {
// mock current pod
if _, err := out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil {
t.Fatal(err)
}
return out, nil
})
regUrl, err := common.NewURL("registry://127.0.0.1:443",
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)))
if err != nil {
t.Fatal(err)
}
mockClient := getTestClient(t)
m := &mockFacade{
URL: &regUrl,
client: mockClient,
}
......
......@@ -81,8 +81,6 @@ func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.
}
}
}
return false
}
// return true mean the event type is DELETE
......
......@@ -18,7 +18,7 @@
package kubernetes
import (
"time"
"testing"
)
import (
......@@ -67,9 +67,7 @@ func (m *mockDataListener) DataChange(eventType remoting.Event) bool {
return true
}
func (s *KubernetesClientTestSuite) TestListener() {
t := s.T()
func TestListener(t *testing.T) {
var tests = []struct {
input struct {
......@@ -83,15 +81,13 @@ func (s *KubernetesClientTestSuite) TestListener() {
}{k: "/dubbo", v: changedData}},
}
c := s.initClient()
c := getTestClient(t)
defer c.Close()
listener := NewEventListener(c)
dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)}
listener.ListenServiceEvent("/dubbo", dataListener)
// NOTICE: direct listen will lose create msg
time.Sleep(time.Second)
for _, tc := range tests {
k := tc.input.k
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
informerscorev1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
const (
// kubernetes inject env var
podNameKey = "HOSTNAME"
nameSpaceKey = "NAMESPACE"
needWatchedNameSpaceKey = "DUBBO_NAMESPACE"
// all pod annotation key
DubboIOAnnotationKey = "dubbo.io/annotation"
// all pod label key and value pair
DubboIOLabelKey = "dubbo.io/label"
DubboIOConsumerLabelValue = "dubbo.io.consumer"
DubboIOProviderLabelValue = "dubbo.io.provider"
// kubernetes suggest resync
defaultResync = 5 * time.Minute
)
var (
ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
)
// dubboRegistryController
// work like a kubernetes controller
type dubboRegistryController struct {
// ctx is cloned from the client and manages the controller lifecycle
ctx context.Context
role common.RoleType
// protect patch current pod operation
lock sync.Mutex
// current pod config
needWatchedNamespace map[string]struct{}
namespace string
name string
watcherSet WatcherSet
// kubernetes
kc kubernetes.Interface
listAndWatchStartResourceVersion uint64
namespacedInformerFactory map[string]informers.SharedInformerFactory
namespacedPodInformers map[string]informerscorev1.PodInformer
queue workqueue.Interface // shared by the namespaced informers
}
func newDubboRegistryController(
ctx context.Context,
// provider and consumer have different behavior
roleType common.RoleType,
// used to inject mock kubernetes client
kcGetter func() (kubernetes.Interface, error),
) (*dubboRegistryController, error) {
kc, err := kcGetter()
if err != nil {
return nil, perrors.WithMessage(err, "get kubernetes client")
}
c := &dubboRegistryController{
ctx: ctx,
role: roleType,
watcherSet: newWatcherSet(ctx),
needWatchedNamespace: make(map[string]struct{}),
namespacedInformerFactory: make(map[string]informers.SharedInformerFactory),
namespacedPodInformers: make(map[string]informerscorev1.PodInformer),
kc: kc,
}
if err := c.readConfig(); err != nil {
return nil, perrors.WithMessage(err, "read config")
}
if err := c.initCurrentPod(); err != nil {
return nil, perrors.WithMessage(err, "init current pod")
}
if err := c.initWatchSet(); err != nil {
return nil, perrors.WithMessage(err, "init watch set")
}
if err := c.initPodInformer(); err != nil {
return nil, perrors.WithMessage(err, "init pod informer")
}
go c.run()
return c, nil
}
// GetInClusterKubernetesClient
// builds a client from the in-cluster config; the current pod must be running inside a kubernetes cluster
func GetInClusterKubernetesClient() (kubernetes.Interface, error) {
// read in-cluster config
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, perrors.WithMessage(err, "get in-cluster config")
}
return kubernetes.NewForConfig(cfg)
}
// initWatchSet
// 1. get all pods with the dubbo label
// 2. put every element to watcherSet
// 3. refresh watch book-mark
func (c *dubboRegistryController) initWatchSet() error {
req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue})
if err != nil {
return perrors.WithMessage(err, "new requirement")
}
for ns := range c.needWatchedNamespace {
pods, err := c.kc.CoreV1().Pods(ns).List(metav1.ListOptions{
LabelSelector: req.String(),
})
if err != nil {
return perrors.WithMessagef(err, "list pods in namespace (%s)", ns)
}
for _, p := range pods.Items {
// set resource version
rv, err := strconv.ParseUint(p.GetResourceVersion(), 10, 0)
if err != nil {
return perrors.WithMessagef(err, "parse resource version %s", p.GetResourceVersion())
}
if c.listAndWatchStartResourceVersion < rv {
c.listAndWatchStartResourceVersion = rv
}
c.handleWatchedPodEvent(&p, watch.Added)
}
}
return nil
}
// read dubbo-registry controller config
// 1. current pod name
// 2. current pod working namespace
func (c *dubboRegistryController) readConfig() error {
// read current pod name && namespace
c.name = os.Getenv(podNameKey)
if len(c.name) == 0 {
return perrors.New("read value from env by key (HOSTNAME)")
}
c.namespace = os.Getenv(nameSpaceKey)
if len(c.namespace) == 0 {
return perrors.New("read value from env by key (NAMESPACE)")
}
return nil
}
func (c *dubboRegistryController) initNamespacedPodInformer(ns string) error {
req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue})
if err != nil {
return perrors.WithMessage(err, "new requirement")
}
informersFactory := informers.NewSharedInformerFactoryWithOptions(
c.kc,
defaultResync,
informers.WithNamespace(ns),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = req.String()
options.ResourceVersion = strconv.FormatUint(c.listAndWatchStartResourceVersion, 10)
}),
)
podInformer := informersFactory.Core().V1().Pods()
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addPod,
UpdateFunc: c.updatePod,
DeleteFunc: c.deletePod,
})
c.namespacedInformerFactory[ns] = informersFactory
c.namespacedPodInformers[ns] = podInformer
return nil
}
func (c *dubboRegistryController) initPodInformer() error {
if c.role == common.PROVIDER {
return nil
}
// read the list of namespaces that need to be watched
needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey)
if len(needWatchedNameSpaceList) == 0 {
return perrors.New("read value from env by key (DUBBO_NAMESPACE)")
}
for _, ns := range strings.Split(needWatchedNameSpaceList, ",") {
c.needWatchedNamespace[ns] = struct{}{}
}
// current work namespace should be watched
c.needWatchedNamespace[c.namespace] = struct{}{}
c.queue = workqueue.New()
// init a pod-informer for every namespace that needs to be watched
for watchedNS := range c.needWatchedNamespace {
if err := c.initNamespacedPodInformer(watchedNS); err != nil {
return err
}
}
return nil
}
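A consumer pod therefore needs DUBBO_NAMESPACE set to a comma-separated namespace list, and its own namespace is always watched on top of it. A minimal, standalone sketch of that contract (the namespace names below are illustrative, not taken from this commit):

package main

import (
	"fmt"
	"os"
	"strings"
)

func main() {
	// Illustrative values; in a real deployment HOSTNAME/NAMESPACE come from the
	// downward API and DUBBO_NAMESPACE from the container env.
	os.Setenv("HOSTNAME", "server-84c864f5bc-r8qvz")        // podNameKey
	os.Setenv("NAMESPACE", "default")                       // nameSpaceKey
	os.Setenv("DUBBO_NAMESPACE", "default,payments,search") // needWatchedNameSpaceKey

	// What initPodInformer derives from the value:
	watched := map[string]struct{}{}
	for _, ns := range strings.Split(os.Getenv("DUBBO_NAMESPACE"), ",") {
		watched[ns] = struct{}{}
	}
	// the current working namespace is always watched as well
	watched[os.Getenv("NAMESPACE")] = struct{}{}
	fmt.Println(watched) // map[default:{} payments:{} search:{}]
}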
type kubernetesEvent struct {
p *v1.Pod
t watch.EventType
}
func (c *dubboRegistryController) addPod(obj interface{}) {
p, ok := obj.(*v1.Pod)
if !ok {
logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
return
}
c.queue.Add(&kubernetesEvent{
t: watch.Added,
p: p,
})
}
func (c *dubboRegistryController) updatePod(oldObj, newObj interface{}) {
op, ok := oldObj.(*v1.Pod)
if !ok {
logger.Warnf("pod-informer got object %T not *v1.Pod", oldObj)
return
}
np, ok := newObj.(*v1.Pod)
if !ok {
logger.Warnf("pod-informer got object %T not *v1.Pod", newObj)
return
}
if op.GetResourceVersion() == np.GetResourceVersion() {
return
}
c.queue.Add(&kubernetesEvent{
p: np,
t: watch.Modified,
})
}
func (c *dubboRegistryController) deletePod(obj interface{}) {
p, ok := obj.(*v1.Pod)
if !ok {
logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
return
}
c.queue.Add(&kubernetesEvent{
p: p,
t: watch.Deleted,
})
}
func (c *dubboRegistryController) startALLInformers() {
logger.Debugf("starting namespaced informer-factory")
for _, factory := range c.namespacedInformerFactory {
go factory.Start(c.ctx.Done())
}
}
// run
// the controller processes every event in the work-queue
func (c *dubboRegistryController) run() {
if c.role == common.PROVIDER {
return
}
defer logger.Warn("dubbo registry controller work stopped")
defer c.queue.ShutDown()
for ns, podInformer := range c.namespacedPodInformers {
if !cache.WaitForCacheSync(c.ctx.Done(), podInformer.Informer().HasSynced) {
logger.Errorf("wait for cache sync finish @namespace %s fail", ns)
return
}
}
logger.Infof("kubernetes registry-controller running @Namespace = %q @PodName = %q", c.namespace, c.name)
// start work
go c.work()
// block wait context cancel
<-c.ctx.Done()
}
func (c *dubboRegistryController) work() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem process work-queue elements
func (c *dubboRegistryController) processNextWorkItem() bool {
item, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(item)
o := item.(*kubernetesEvent)
c.handleWatchedPodEvent(o.p, o.t)
return true
}
// handleWatchedPodEvent
// handle watched pod event
func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
logger.Debugf("get @type = %s event from @pod = %s", eventType, p.GetName())
for ak, av := range p.GetAnnotations() {
// skip annotations that dubbo is not interested in
if ak != DubboIOAnnotationKey {
continue
}
ol, err := c.unmarshalRecord(av)
if err != nil {
logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
return
}
for _, o := range ol {
switch eventType {
case watch.Added:
// if the pod is added, the record is always a create
o.EventType = Create
case watch.Modified:
o.EventType = Update
case watch.Deleted:
o.EventType = Delete
default:
logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
return
}
logger.Debugf("putting @key=%s @value=%s to watcherSet", o.Key, o.Value)
if err := c.watcherSet.Put(o); err != nil {
logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
return
}
}
}
}
// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEvent, error) {
if len(record) == 0 {
// []*WatcherEvent is nil.
return nil, nil
}
rawMsg, err := base64.URLEncoding.DecodeString(record)
if err != nil {
return nil, perrors.WithMessagef(err, "decode record (%s)", record)
}
var out []*WatcherEvent
if err := json.Unmarshal(rawMsg, &out); err != nil {
return nil, perrors.WithMessage(err, "decode json")
}
return out, nil
}
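This is also the format of the dubbo.io/annotation values in the pod fixtures above: a URL-safe base64 string wrapping a JSON array of {"k": key, "v": value} records. A standalone decode sketch follows; the annotationRecord struct and the sample payload are local to the example (the real code unmarshals straight into []*WatcherEvent):

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
)

// annotationRecord mirrors the on-pod JSON shape seen in the decoded
// test fixtures ({"k": ..., "v": ...}).
type annotationRecord struct {
	Key   string `json:"k"`
	Value string `json:"v"`
}

func main() {
	// A shortened, hand-made value in the same format as the
	// dubbo.io/annotation test data (not copied from the fixtures).
	encoded := base64.URLEncoding.EncodeToString([]byte(
		`[{"k":"/dubbo/com.ikurento.user.UserProvider/providers","v":""}]`))

	raw, err := base64.URLEncoding.DecodeString(encoded)
	if err != nil {
		log.Fatal(err)
	}
	var records []annotationRecord
	if err := json.Unmarshal(raw, &records); err != nil {
		log.Fatal(err)
	}
	for _, r := range records {
		fmt.Printf("key=%q value=%q\n", r.Key, r.Value)
	}
}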
// initCurrentPod
// 1. get current pod
// 2. apply the dubbo label to this pod
func (c *dubboRegistryController) initCurrentPod() error {
currentPod, err := c.readCurrentPod()
if err != nil {
return perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace)
}
oldPod, newPod, err := c.assembleDUBBOLabel(currentPod)
if err != nil {
if err == ErrDubboLabelAlreadyExist {
return nil
}
return perrors.WithMessage(err, "assemble dubbo label")
}
// the current pod does not have the dubbo label yet
p, err := c.getPatch(oldPod, newPod)
if err != nil {
return perrors.WithMessage(err, "get patch")
}
currentPod, err = c.patchCurrentPod(p)
if err != nil {
return perrors.WithMessage(err, "patch to current pod")
}
return nil
}
// patch current pod
// write new meta for current pod
func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) {
updatedPod, err := c.kc.CoreV1().Pods(c.namespace).Patch(c.name, types.StrategicMergePatchType, patch)
if err != nil {
return nil, perrors.WithMessage(err, "patch in kubernetes pod ")
}
return updatedPod, nil
}
// assemble the dubbo kubernetes label
// every dubbo instance should be labeled with the {"dubbo.io/label": <role-specific value>} pair
func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Pod, error) {
var (
oldPod = &v1.Pod{}
newPod = &v1.Pod{}
)
oldPod.Labels = make(map[string]string, 8)
newPod.Labels = make(map[string]string, 8)
if p.GetLabels() != nil {
if _, ok := p.GetLabels()[DubboIOLabelKey]; ok {
// already have label
return nil, nil, ErrDubboLabelAlreadyExist
}
}
// copy current pod labels to oldPod && newPod
for k, v := range p.GetLabels() {
oldPod.Labels[k] = v
newPod.Labels[k] = v
}
// assign new label for current pod
switch c.role {
case common.CONSUMER:
newPod.Labels[DubboIOLabelKey] = DubboIOConsumerLabelValue
case common.PROVIDER:
newPod.Labels[DubboIOLabelKey] = DubboIOProviderLabelValue
default:
return nil, nil, perrors.New(fmt.Sprintf("unknown role %s", c.role))
}
return oldPod, newPod, nil
}
// assemble the dubbo kubernetes annotations
// according to the current pod and the (k, v) pair, assemble the old pod and the new pod
func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
oldPod = &v1.Pod{}
newPod = &v1.Pod{}
oldPod.Annotations = make(map[string]string, 8)
newPod.Annotations = make(map[string]string, 8)
for k, v := range currentPod.GetAnnotations() {
oldPod.Annotations[k] = v
newPod.Annotations[k] = v
}
al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey])
if err != nil {
err = perrors.WithMessage(err, "unmarshal record")
return
}
newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v}))
if err != nil {
err = perrors.WithMessage(err, "marshal record")
return
}
newPod.Annotations[DubboIOAnnotationKey] = newAnnotations
return
}
// getPatch
// get the kubernetes pod patch bytes
func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
oldData, err := json.Marshal(oldPod)
if err != nil {
return nil, perrors.WithMessage(err, "marshal old pod")
}
newData, err := json.Marshal(newPod)
if err != nil {
return nil, perrors.WithMessage(err, "marshal newPod pod")
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
if err != nil {
return nil, perrors.WithMessage(err, "create two-way-merge-patch")
}
return patchBytes, nil
}
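For an annotation-only change like the one addAnnotationForCurrentPod builds, the resulting strategic merge patch should contain just the metadata.annotations delta. A minimal sketch under that assumption, with a made-up payload string:

package main

import (
	"encoding/json"
	"fmt"
	"log"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldPod := &v1.Pod{}
	oldPod.Annotations = map[string]string{}
	newPod := &v1.Pod{}
	newPod.Annotations = map[string]string{"dubbo.io/annotation": "BASE64-PAYLOAD"}

	oldData, err := json.Marshal(oldPod)
	if err != nil {
		log.Fatal(err)
	}
	newData, err := json.Marshal(newPod)
	if err != nil {
		log.Fatal(err)
	}
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
	if err != nil {
		log.Fatal(err)
	}
	// expected (roughly): {"metadata":{"annotations":{"dubbo.io/annotation":"BASE64-PAYLOAD"}}}
	fmt.Println(string(patch))
}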
// marshalRecord
// marshal the kubernetes dubbo annotation value
func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) {
msg, err := json.Marshal(ol)
if err != nil {
return "", perrors.WithMessage(err, "json encode object list")
}
return base64.URLEncoding.EncodeToString(msg), nil
}
// read the current pod status from the kubernetes API
func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) {
currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{})
if err != nil {
return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace)
}
return currentPod, nil
}
// add annotation for current pod
func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string) error {
c.lock.Lock()
defer c.lock.Unlock()
// 1. according to the old pod and the (k, v) pair, assemble the new dubbo annotation value
// 2. get patch data
// 3. PATCH the pod
currentPod, err := c.readCurrentPod()
if err != nil {
return perrors.WithMessage(err, "read current pod")
}
oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod)
if err != nil {
return perrors.WithMessage(err, "assemble")
}
patchBytes, err := c.getPatch(oldPod, newPod)
if err != nil {
return perrors.WithMessage(err, "get patch")
}
_, err = c.patchCurrentPod(patchBytes)
if err != nil {
return perrors.WithMessage(err, "patch current pod")
}
return c.watcherSet.Put(&WatcherEvent{
Key: k,
Value: v,
EventType: Create,
})
}
......@@ -71,6 +71,7 @@ type WatcherEvent struct {
}
// Watchable WatcherSet
// thread-safe
type WatcherSet interface {
// put the watch event to the watch set
......@@ -149,7 +150,7 @@ func (s *watcherSetImpl) Done() <-chan struct{} {
// put the watch event to watcher-set
func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
sendMsg := func(object *WatcherEvent, w *watcher) {
blockSendMsg := func(object *WatcherEvent, w *watcher) {
select {
case <-w.done():
......@@ -167,40 +168,40 @@ func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
}
// put to watcher-set
if watcherEvent.EventType == Delete {
switch watcherEvent.EventType {
case Delete:
// delete from store
delete(s.cache, watcherEvent.Key)
} else {
old, ok := s.cache[watcherEvent.Key]
if ok {
if old.Value == watcherEvent.Value {
// already have this k/v pair
return nil
}
case Update, Create:
o, ok := s.cache[watcherEvent.Key]
if !ok {
// the pod was updated, but this key is new, so record a create
watcherEvent.EventType = Create
s.cache[watcherEvent.Key] = watcherEvent
break
}
// refresh the watcherEvent
// k/v pair already latest
if o.Value == watcherEvent.Value {
return nil
}
// update to latest status
s.cache[watcherEvent.Key] = watcherEvent
}
// notify watcher
for _, w := range s.watchers {
w := w
if !strings.Contains(watcherEvent.Key, w.interested.key) {
// this watcher is not interested in this element
continue
}
if !w.interested.prefix {
if watcherEvent.Key == w.interested.key {
go sendMsg(watcherEvent, w)
blockSendMsg(watcherEvent, w)
}
// not interested
continue
}
go sendMsg(watcherEvent, w)
blockSendMsg(watcherEvent, w)
}
return nil
}
......
......@@ -30,7 +30,7 @@ type DataListener interface {
// event type
//////////////////////////////////////////
// EventType ...
// SourceObjectEventType ...
type EventType int
const (
......
......@@ -106,7 +106,6 @@ func TestCreateDelete(t *testing.T) {
assert.NoError(t, err)
err2 := z.Delete("/test1/test2/test3/test4")
assert.NoError(t, err2)
//verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
}
func TestRegisterTemp(t *testing.T) {
......