Skip to content
Snippets Groups Projects
Unverified Commit e8f75269 authored by Ming Deng's avatar Ming Deng Committed by GitHub
Browse files

Merge pull request #400 from sxllwx/k8s_merge

Add: kubernetes registry and remote package unit test
parents 36fdb012 e48b6989
No related branches found
No related tags found
No related merge requests found
Showing
with 2897 additions and 32 deletions
......@@ -57,6 +57,10 @@ require (
go.uber.org/zap v1.10.0
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
k8s.io/api v0.0.0-20190325185214-7544f9db76f6
k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841
k8s.io/client-go v8.0.0+incompatible
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a // indirect
)
go 1.13
This diff is collapsed.
......@@ -38,9 +38,9 @@ type dataListener struct {
listener config_center.ConfigurationListener
}
// NewRegistryDataListener ...
// NewRegistryDataListener
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener, interestedURL: []*common.URL{}}
return &dataListener{listener: listener}
}
func (l *dataListener) AddInterestedURL(url *common.URL) {
......@@ -49,7 +49,12 @@ func (l *dataListener) AddInterestedURL(url *common.URL) {
func (l *dataListener) DataChange(eventType remoting.Event) bool {
url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
index := strings.Index(eventType.Path, "/providers/")
if index == -1 {
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
......@@ -68,7 +73,6 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
return true
}
}
return false
}
......@@ -97,7 +101,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
case e := <-l.events:
logger.Infof("got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel {
if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() {
select {
case <-l.registry.Done():
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
......
......@@ -18,10 +18,9 @@
package etcdv3
import (
"os"
"testing"
"time"
"github.com/apache/dubbo-go/config_center"
)
import (
......@@ -32,6 +31,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)
......@@ -40,13 +40,16 @@ type RegistryTestSuite struct {
etcd *embed.Etcd
}
const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-registry.etcd"
// start etcd server
func (suite *RegistryTestSuite) SetupSuite() {
t := suite.T()
cfg := embed.NewConfig()
cfg.Dir = "/tmp/default.etcd"
// avoid conflict with default etcd work-dir
cfg.Dir = defaultEtcdV3WorkDir
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
......@@ -66,6 +69,10 @@ func (suite *RegistryTestSuite) SetupSuite() {
// stop etcd server
func (suite *RegistryTestSuite) TearDownSuite() {
suite.etcd.Close()
// clean the etcd workdir
if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil {
suite.FailNow(err.Error())
}
}
func (suite *RegistryTestSuite) TestDataChange() {
......
......@@ -98,7 +98,7 @@ func (suite *RegistryTestSuite) TestSubscribe() {
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
}
func (suite *RegistryTestSuite) TestConsumerDestory() {
func (suite *RegistryTestSuite) TestConsumerDestroy() {
t := suite.T()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
......@@ -117,7 +117,7 @@ func (suite *RegistryTestSuite) TestConsumerDestory() {
}
func (suite *RegistryTestSuite) TestProviderDestory() {
func (suite *RegistryTestSuite) TestProviderDestroy() {
t := suite.T()
reg := initRegistry(t)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"strings"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)
type dataListener struct {
interestedURL []*common.URL
listener config_center.ConfigurationListener
}
// NewRegistryDataListener
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
}
// AddInterestedURL
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
// DataChange
// notify listen, when interest event
func (l *dataListener) DataChange(eventType remoting.Event) bool {
index := strings.Index(eventType.Path, "/providers/")
if index == -1 {
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Value: serviceURL,
ConfigType: eventType.Action,
},
)
return true
}
}
return false
}
type configurationListener struct {
registry *kubernetesRegistry
events chan *config_center.ConfigChangeEvent
}
// NewConfigurationListener for listening the event of kubernetes.
func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.registry.Done():
logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.")
return nil, perrors.New("listener stopped")
case e := <-l.events:
logger.Infof("got kubernetes event %#v", e)
if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
select {
case <-l.registry.Done():
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
continue
}
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"encoding/json"
"os"
"strconv"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)
var clientPodJsonData = `{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"annotations": {
"dubbo.io/annotation": "W3siayI6Ii9kdWJibyIsInYiOiIifSx7ImsiOiIvZHViYm8vY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvY29uc3VtZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJibyIsInYiOiIifSx7ImsiOiIvZHViYm8vY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvY29uc3VtZXJzL2NvbnN1bWVyJTNBJTJGJTJGMTcyLjE3LjAuOCUyRlVzZXJQcm92aWRlciUzRmNhdGVnb3J5JTNEY29uc3VtZXJzJTI2ZHViYm8lM0RkdWJib2dvLWNvbnN1bWVyLTIuNi4wJTI2cHJvdG9jb2wlM0RkdWJibyIsInYiOiIifV0="
},
"creationTimestamp": "2020-03-13T03:38:57Z",
"labels": {
"dubbo.io/label": "dubbo.io-value"
},
"name": "client",
"namespace": "default",
"resourceVersion": "2449700",
"selfLink": "/api/v1/namespaces/default/pods/client",
"uid": "3ec394f5-dcc6-49c3-8061-57b4b2b41344"
},
"spec": {
"containers": [
{
"env": [
{
"name": "NAMESPACE",
"valueFrom": {
"fieldRef": {
"apiVersion": "v1",
"fieldPath": "metadata.namespace"
}
}
}
],
"image": "registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client",
"imagePullPolicy": "Always",
"name": "client",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File",
"volumeMounts": [
{
"mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
"name": "dubbo-sa-token-l2lzh",
"readOnly": true
}
]
}
],
"dnsPolicy": "ClusterFirst",
"enableServiceLinks": true,
"nodeName": "minikube",
"priority": 0,
"restartPolicy": "Never",
"schedulerName": "default-scheduler",
"securityContext": {},
"serviceAccount": "dubbo-sa",
"serviceAccountName": "dubbo-sa",
"terminationGracePeriodSeconds": 30,
"tolerations": [
{
"effect": "NoExecute",
"key": "node.kubernetes.io/not-ready",
"operator": "Exists",
"tolerationSeconds": 300
},
{
"effect": "NoExecute",
"key": "node.kubernetes.io/unreachable",
"operator": "Exists",
"tolerationSeconds": 300
}
],
"volumes": [
{
"name": "dubbo-sa-token-l2lzh",
"secret": {
"defaultMode": 420,
"secretName": "dubbo-sa-token-l2lzh"
}
}
]
},
"status": {
"conditions": [
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:38:57Z",
"status": "True",
"type": "Initialized"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:40:18Z",
"status": "True",
"type": "Ready"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:40:18Z",
"status": "True",
"type": "ContainersReady"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:38:57Z",
"status": "True",
"type": "PodScheduled"
}
],
"containerStatuses": [
{
"containerID": "docker://2870d6abc19ca7fe22ca635ebcfac5d48c6d5550a659bafd74fb48104f6dfe3c",
"image": "registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client:latest",
"imageID": "docker-pullable://registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client@sha256:1f075131f708a0d400339e81549d7c4d4ed917ab0b6bd38ef458dd06ad25a559",
"lastState": {},
"name": "client",
"ready": true,
"restartCount": 0,
"state": {
"running": {
"startedAt": "2020-03-13T03:40:17Z"
}
}
}
],
"hostIP": "10.0.2.15",
"phase": "Running",
"podIP": "172.17.0.8",
"qosClass": "BestEffort",
"startTime": "2020-03-13T03:38:57Z"
}
}
`
func Test_DataChange(t *testing.T) {
listener := NewRegistryDataListener(&MockDataListener{})
url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
listener.AddInterestedURL(&url)
int := listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"})
assert.Equal(t, true, int)
}
type MockDataListener struct{}
func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {}
type KubernetesRegistryTestSuite struct {
suite.Suite
currentPod v1.Pod
}
func (s *KubernetesRegistryTestSuite) initRegistry() *kubernetesRegistry {
t := s.T()
regurl, err := common.NewURL("registry://127.0.0.1:443", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
if err != nil {
t.Fatal(err)
}
mock, err := newMockKubernetesRegistry(&regurl, s.currentPod.GetNamespace(), func() (kubernetes.Interface, error) {
out := fake.NewSimpleClientset()
// mock current pod
if _, err := out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil {
t.Fatal(err)
}
return out, nil
})
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
return mock.(*kubernetesRegistry)
}
func (s *KubernetesRegistryTestSuite) SetupSuite() {
t := s.T()
const (
// kubernetes inject the var
podNameKey = "HOSTNAME"
nameSpaceKey = "NAMESPACE"
)
// 1. install test data
if err := json.Unmarshal([]byte(clientPodJsonData), &s.currentPod); err != nil {
t.Fatal(err)
}
// 2. set downward-api inject env
if err := os.Setenv(podNameKey, s.currentPod.GetName()); err != nil {
t.Fatal(err)
}
if err := os.Setenv(nameSpaceKey, s.currentPod.GetNamespace()); err != nil {
t.Fatal(err)
}
}
func (s *KubernetesRegistryTestSuite) TestDataChange() {
t := s.T()
listener := NewRegistryDataListener(&MockDataListener{})
url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100")
listener.AddInterestedURL(&url)
if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) {
t.Fatal("data change not ok")
}
}
func TestKubernetesRegistrySuite(t *testing.T) {
suite.Run(t, &KubernetesRegistryTestSuite{})
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"fmt"
"os"
"path"
"strings"
"sync"
"time"
)
import (
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
k8s "k8s.io/client-go/kubernetes"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/kubernetes"
)
var (
processID = ""
localIP = ""
)
const (
Name = "kubernetes"
ConnDelay = 3
MaxFailTimes = 15
)
func init() {
processID = fmt.Sprintf("%d", os.Getpid())
localIP, _ = gxnet.GetLocalIP()
extension.SetRegistry(Name, newKubernetesRegistry)
}
type kubernetesRegistry struct {
registry.BaseRegistry
cltLock sync.RWMutex
client *kubernetes.Client
listenerLock sync.Mutex
listener *kubernetes.EventListener
dataListener *dataListener
configListener *configurationListener
}
func (r *kubernetesRegistry) Client() *kubernetes.Client {
r.cltLock.RLock()
client := r.client
r.cltLock.RUnlock()
return client
}
func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
r.cltLock.Lock()
r.client = client
r.cltLock.Unlock()
}
func (r *kubernetesRegistry) CloseAndNilClient() {
r.client.Close()
r.client = nil
}
func (r *kubernetesRegistry) CloseListener() {
r.cltLock.Lock()
l := r.configListener
r.cltLock.Unlock()
if l != nil {
l.Close()
}
r.configListener = nil
}
func (r *kubernetesRegistry) CreatePath(k string) error {
if err := r.client.Create(k, ""); err != nil {
return perrors.WithMessagef(err, "create path %s in kubernetes", k)
}
return nil
}
func (r *kubernetesRegistry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var (
configListener *configurationListener
)
r.listenerLock.Lock()
configListener = r.configListener
r.listenerLock.Unlock()
if r.listener == nil {
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("kubernetes client broken")
}
r.listenerLock.Lock()
if r.listener == nil {
// double check
r.listener = kubernetes.NewEventListener(r.client)
}
r.listenerLock.Unlock()
}
//register the svc to dataListener
r.dataListener.AddInterestedURL(svc)
for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") {
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener)
}
return configListener, nil
}
func (r *kubernetesRegistry) InitListeners() {
r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
}
func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
// actually, kubernetes use in-cluster config,
r := &kubernetesRegistry{}
r.InitBaseRegistry(url, r)
if err := kubernetes.ValidateClient(r); err != nil {
return nil, perrors.WithStack(err)
}
r.WaitGroup().Add(1)
go r.HandleClientRestart()
r.InitListeners()
logger.Debugf("the kubernetes registry started")
return r, nil
}
func newMockKubernetesRegistry(
url *common.URL,
namespace string,
clientGeneratorFunc func() (k8s.Interface, error),
) (registry.Registry, error) {
var err error
r := &kubernetesRegistry{}
r.InitBaseRegistry(url, r)
r.client, err = kubernetes.NewMockClient(namespace, clientGeneratorFunc)
if err != nil {
return nil, perrors.WithMessage(err, "new mock client")
}
r.InitListeners()
return r, nil
}
func (r *kubernetesRegistry) HandleClientRestart() {
var (
err error
failTimes int
)
defer r.WaitGroup()
LOOP:
for {
select {
case <-r.Done():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...")
break LOOP
// re-register all services
case <-r.Client().Done():
r.Client().Close()
r.SetClient(nil)
// try to connect to kubernetes,
failTimes = 0
for {
select {
case <-r.Done():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
}
err = kubernetes.ValidateClient(r)
logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
}
}
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"strconv"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
func (s *KubernetesRegistryTestSuite) TestRegister() {
t := s.T()
r := s.initRegistry()
defer r.Destroy()
url, _ := common.NewURL(
"dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
common.WithParamsValue(constant.CLUSTER_KEY, "mock"),
common.WithMethods([]string{"GetUser", "AddUser"}),
)
err := r.Register(url)
assert.NoError(t, err)
_, _, err = r.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
if err != nil {
t.Fatal(err)
}
}
func (s *KubernetesRegistryTestSuite) TestSubscribe() {
t := s.T()
r := s.initRegistry()
defer r.Destroy()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
listener, err := r.DoSubscribe(&url)
if err != nil {
t.Fatal(err)
}
time.Sleep(1e9)
go func() {
err := r.Register(url)
if err != nil {
t.Fatal(err)
}
}()
serviceEvent, err := listener.Next()
if err != nil {
t.Fatal(err)
}
t.Logf("got event %s", serviceEvent)
}
func (s *KubernetesRegistryTestSuite) TestConsumerDestroy() {
t := s.T()
r := s.initRegistry()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
common.WithParamsValue(constant.CLUSTER_KEY, "mock"),
common.WithMethods([]string{"GetUser", "AddUser"}))
_, err := r.DoSubscribe(&url)
if err != nil {
t.Fatal(err)
}
//listener.Close()
time.Sleep(1e9)
r.Destroy()
assert.Equal(t, false, r.IsAvailable())
}
func (s *KubernetesRegistryTestSuite) TestProviderDestroy() {
t := s.T()
r := s.initRegistry()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
common.WithParamsValue(constant.CLUSTER_KEY, "mock"),
common.WithMethods([]string{"GetUser", "AddUser"}))
err := r.Register(url)
assert.NoError(t, err)
time.Sleep(1e9)
r.Destroy()
assert.Equal(t, false, r.IsAvailable())
}
func (s *KubernetesRegistryTestSuite) TestNewRegistry() {
t := s.T()
regUrl, err := common.NewURL("registry://127.0.0.1:443",
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
if err != nil {
t.Fatal(err)
}
_, err = newKubernetesRegistry(&regUrl)
if err == nil {
t.Fatal("not in cluster, should be a err")
}
}
func (s *KubernetesRegistryTestSuite) TestHandleClientRestart() {
r := s.initRegistry()
r.WaitGroup().Add(1)
go r.HandleClientRestart()
time.Sleep(timeSecondDuration(1))
r.client.Close()
}
......@@ -43,7 +43,7 @@ type RegistryDataListener struct {
// NewRegistryDataListener ...
func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener {
return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}}
return &RegistryDataListener{listener: listener}
}
// AddInterestedURL ...
......@@ -65,13 +65,19 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path)
return false
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(&config_center.ConfigChangeEvent{Value: serviceURL, ConfigType: eventType.Action})
l.listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Value: serviceURL,
ConfigType: eventType.Action,
},
)
return true
}
}
return false
}
......
......@@ -18,8 +18,8 @@
package etcdv3
import (
"fmt"
"net/url"
"os"
"path"
"reflect"
"strings"
......@@ -37,6 +37,8 @@ import (
"google.golang.org/grpc/connectivity"
)
const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-remote.etcd"
// tests dataset
var tests = []struct {
input struct {
......@@ -92,7 +94,7 @@ func (suite *ClientTestSuite) SetupSuite() {
cfg := embed.NewConfig()
cfg.LPUrls = []url.URL{*lpurl}
cfg.LCUrls = []url.URL{*lcurl}
cfg.Dir = "/tmp/default.etcd"
cfg.Dir = defaultEtcdV3WorkDir
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
......@@ -112,6 +114,9 @@ func (suite *ClientTestSuite) SetupSuite() {
// stop etcd server
func (suite *ClientTestSuite) TearDownSuite() {
suite.etcd.Close()
if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil {
suite.FailNow(err.Error())
}
}
func (suite *ClientTestSuite) setUpClient() *Client {
......@@ -135,8 +140,6 @@ func (suite *ClientTestSuite) SetupTest() {
func (suite *ClientTestSuite) TestClientClose() {
fmt.Println("called client close")
c := suite.client
t := suite.T()
......@@ -148,8 +151,6 @@ func (suite *ClientTestSuite) TestClientClose() {
func (suite *ClientTestSuite) TestClientValid() {
fmt.Println("called client valid")
c := suite.client
t := suite.T()
......
......@@ -53,7 +53,6 @@ func NewEventListener(client *Client) *EventListener {
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
for {
wc, err := l.client.Watch(key)
......@@ -138,8 +137,6 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin
// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
for {
wc, err := l.client.WatchWithPrefix(prefix)
......@@ -202,7 +199,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
keyList, valueList, err := l.client.getChildren(key)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children"))
logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children"))
}
logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)
......@@ -217,12 +214,14 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
}
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider etcdv3 nodes", key)
l.wg.Add(1)
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
}(key, listener)
logger.Infof("listen dubbo service key{%s}", key)
l.wg.Add(1)
go func(key string) {
if l.ListenServiceNodeEvent(key) {
listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"context"
"encoding/base64"
"encoding/json"
"os"
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
const (
// kubernetes inject the var
podNameKey = "HOSTNAME"
nameSpaceKey = "NAMESPACE"
// all pod annotation key
DubboIOAnnotationKey = "dubbo.io/annotation"
DubboIOLabelKey = "dubbo.io/label"
DubboIOLabelValue = "dubbo.io-value"
)
var (
ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
)
type Client struct {
// kubernetes connection config
cfg *rest.Config
// the kubernetes interface
rawClient kubernetes.Interface
// current pod config
currentPodName string
ns string
// current resource version
lastResourceVersion string
// the memory watcherSet
watcherSet WatcherSet
// protect the wg && currentPod
lock sync.RWMutex
// current pod status
currentPod *v1.Pod
// protect the watchPods loop && watcher
wg sync.WaitGroup
// manage the client lifecycle
ctx context.Context
cancel context.CancelFunc
}
// load CurrentPodName
func getCurrentPodName() (string, error) {
v := os.Getenv(podNameKey)
if len(v) == 0 {
return "", perrors.New("read value from env by key (HOSTNAME)")
}
return v, nil
}
// load CurrentNameSpace
func getCurrentNameSpace() (string, error) {
v := os.Getenv(nameSpaceKey)
if len(v) == 0 {
return "", perrors.New("read value from env by key (NAMESPACE)")
}
return v, nil
}
// NewMockClient
// export for registry package test
func NewMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) {
return newMockClient(namespace, mockClientGenerator)
}
// newMockClient
// new a client for test
func newMockClient(namespace string, mockClientGenerator func() (kubernetes.Interface, error)) (*Client, error) {
rawClient, err := mockClientGenerator()
if err != nil {
return nil, perrors.WithMessage(err, "call mock generator")
}
currentPodName, err := getCurrentPodName()
if err != nil {
return nil, perrors.WithMessage(err, "get pod name")
}
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
currentPodName: currentPodName,
ns: namespace,
rawClient: rawClient,
ctx: ctx,
watcherSet: newWatcherSet(ctx),
cancel: cancel,
}
currentPod, err := c.initCurrentPod()
if err != nil {
return nil, perrors.WithMessage(err, "init current pod")
}
// record current status
c.currentPod = currentPod
// init the watcherSet by current pods
if err := c.initWatchSet(); err != nil {
return nil, perrors.WithMessage(err, "init watcherSet")
}
c.lastResourceVersion = c.currentPod.GetResourceVersion()
// start kubernetes watch loop
if err := c.watchPods(); err != nil {
return nil, perrors.WithMessage(err, "watch pods")
}
logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
return c, nil
}
// newClient
// new a client for registry
func newClient(namespace string) (*Client, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, perrors.WithMessage(err, "get in-cluster config")
}
rawClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, perrors.WithMessage(err, "new kubernetes client by in cluster config")
}
currentPodName, err := getCurrentPodName()
if err != nil {
return nil, perrors.WithMessage(err, "get pod name")
}
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
currentPodName: currentPodName,
ns: namespace,
cfg: cfg,
rawClient: rawClient,
ctx: ctx,
watcherSet: newWatcherSet(ctx),
cancel: cancel,
}
currentPod, err := c.initCurrentPod()
if err != nil {
return nil, perrors.WithMessage(err, "init current pod")
}
// record current status
c.currentPod = currentPod
// init the watcherSet by current pods
if err := c.initWatchSet(); err != nil {
return nil, perrors.WithMessage(err, "init watcherSet")
}
// start kubernetes watch loop
if err := c.watchPods(); err != nil {
return nil, perrors.WithMessage(err, "watch pods")
}
logger.Infof("init kubernetes registry client success @namespace = %q @Podname = %q", namespace, c.currentPod.Name)
return c, nil
}
// initCurrentPod
// 1. get current pod
// 2. give the dubbo-label for this pod
func (c *Client) initCurrentPod() (*v1.Pod, error) {
// read the current pod status
currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{})
if err != nil {
return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns)
}
oldPod, newPod, err := c.assembleDUBBOLabel(currentPod)
if err != nil {
if err != ErrDubboLabelAlreadyExist {
return nil, perrors.WithMessage(err, "assemble dubbo label")
}
// current pod don't have label
}
p, err := c.getPatch(oldPod, newPod)
if err != nil {
return nil, perrors.WithMessage(err, "get patch")
}
currentPod, err = c.patchCurrentPod(p)
if err != nil {
return nil, perrors.WithMessage(err, "patch to current pod")
}
return currentPod, nil
}
// initWatchSet
// 1. get all with dubbo label pods
// 2. put every element to watcherSet
func (c *Client) initWatchSet() error {
pods, err := c.rawClient.CoreV1().Pods(c.ns).List(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
})
if err != nil {
return perrors.WithMessagef(err, "list pods in namespace (%s)", c.ns)
}
// set resource version
c.lastResourceVersion = pods.GetResourceVersion()
for _, pod := range pods.Items {
logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", pod.Name, pod.GetLabels(), pod.GetAnnotations())
c.handleWatchedPodEvent(&pod, watch.Added)
}
return nil
}
// watchPods
// try to watch kubernetes pods
func (c *Client) watchPods() error {
// try once
watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
ResourceVersion: c.lastResourceVersion,
})
if err != nil {
return perrors.WithMessagef(err, "try to watch the namespace (%s) pods", c.ns)
}
watcher.Stop()
c.wg.Add(1)
// add wg, grace close the client
go c.watchPodsLoop()
return nil
}
type resourceVersionGetter interface {
GetResourceVersion() string
}
// watchPods
// try to notify
func (c *Client) watchPodsLoop() {
defer func() {
// notify other goroutine, this loop over
c.wg.Done()
logger.Info("watchPodsLoop goroutine game over")
}()
for {
onceWatch:
wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
ResourceVersion: c.lastResourceVersion,
})
if err != nil {
logger.Warnf("watch the namespace (%s) pods: %v, retry after 2 seconds", c.ns, err)
time.Sleep(2 * time.Second)
continue
}
logger.Infof("the old kubernetes client broken, collect the resource status from resource version (%s)", c.lastResourceVersion)
for {
select {
// double check ctx
case <-c.ctx.Done():
logger.Infof("the kubernetes client stopped, resultChan len %d", len(wc.ResultChan()))
return
// get one element from result-chan
case event, ok := <-wc.ResultChan():
if !ok {
wc.Stop()
logger.Info("kubernetes watch chan die, create new")
goto onceWatch
}
if event.Type == watch.Error {
// watched a error event
logger.Warnf("kubernetes watch api report err (%#v)", event)
continue
}
o, ok := event.Object.(resourceVersionGetter)
if !ok {
logger.Warnf("kubernetes response object not a versioned object, its real type %T", event.Object)
continue
}
// record the last resource version avoid to sync all pod
c.lastResourceVersion = o.GetResourceVersion()
logger.Infof("kubernetes get the current resource version %v", c.lastResourceVersion)
// check event object type
p, ok := event.Object.(*v1.Pod)
if !ok {
logger.Warnf("kubernetes response object not a Pod, its real type %T", event.Object)
continue
}
logger.Debugf("kubernetes got pod %#v", p)
// handle the watched pod
go c.handleWatchedPodEvent(p, event.Type)
}
}
}
}
// handleWatchedPodEvent
// handle watched pod event
func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
for ak, av := range p.GetAnnotations() {
// not dubbo interest annotation
if ak != DubboIOAnnotationKey {
continue
}
ol, err := c.unmarshalRecord(av)
if err != nil {
logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
return
}
for _, o := range ol {
switch eventType {
case watch.Added:
// if pod is added, the record always be create
o.EventType = Create
case watch.Modified:
o.EventType = Update
case watch.Deleted:
o.EventType = Delete
default:
logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
return
}
logger.Debugf("prepare to put object (%#v) to kubernetes-watcherSet", o)
if err := c.watcherSet.Put(o); err != nil {
logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
return
}
}
}
}
// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
func (c *Client) unmarshalRecord(record string) ([]*WatcherEvent, error) {
if len(record) == 0 {
// []*WatcherEvent is nil.
return nil, nil
}
rawMsg, err := base64.URLEncoding.DecodeString(record)
if err != nil {
return nil, perrors.WithMessagef(err, "decode record (%s)", record)
}
var out []*WatcherEvent
if err := json.Unmarshal(rawMsg, &out); err != nil {
return nil, perrors.WithMessage(err, "decode json")
}
return out, nil
}
// marshalRecord
// marshal the kubernetes dubbo annotation value
func (c *Client) marshalRecord(ol []*WatcherEvent) (string, error) {
msg, err := json.Marshal(ol)
if err != nil {
return "", perrors.WithMessage(err, "json encode object list")
}
return base64.URLEncoding.EncodeToString(msg), nil
}
// readCurrentPod
// read the current pod status from kubernetes api
func (c *Client) readCurrentPod() (*v1.Pod, error) {
currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{})
if err != nil {
return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns)
}
return currentPod, nil
}
// Create
// create k/v pair in watcher-set
func (c *Client) Create(k, v string) error {
// the read current pod must be lock, protect every
// create operation can be atomic
c.lock.Lock()
defer c.lock.Unlock()
// 1. accord old pod && (k, v) assemble new pod dubbo annotion v
// 2. get patch data
// 3. PATCH the pod
currentPod, err := c.readCurrentPod()
if err != nil {
return perrors.WithMessage(err, "read current pod")
}
oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod)
if err != nil {
return perrors.WithMessage(err, "assemble")
}
patchBytes, err := c.getPatch(oldPod, newPod)
if err != nil {
return perrors.WithMessage(err, "get patch")
}
updatedPod, err := c.patchCurrentPod(patchBytes)
if err != nil {
return perrors.WithMessage(err, "patch current pod")
}
c.currentPod = updatedPod
logger.Debugf("put the @key = %s @value = %s success", k, v)
// not update the watcherSet, the watcherSet should be write by the watchPodsLoop
return nil
}
// patch current pod
// write new meta for current pod
func (c *Client) patchCurrentPod(patch []byte) (*v1.Pod, error) {
updatedPod, err := c.rawClient.CoreV1().Pods(c.ns).Patch(c.currentPodName, types.StrategicMergePatchType, patch)
if err != nil {
return nil, perrors.WithMessage(err, "patch in kubernetes pod ")
}
return updatedPod, nil
}
// assemble the dubbo kubernetes label
// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label
func (c *Client) assembleDUBBOLabel(currentPod *v1.Pod) (*v1.Pod, *v1.Pod, error) {
var (
oldPod = &v1.Pod{}
newPod = &v1.Pod{}
)
oldPod.Labels = make(map[string]string, 8)
newPod.Labels = make(map[string]string, 8)
if currentPod.GetLabels() != nil {
if currentPod.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue {
// already have label
return nil, nil, ErrDubboLabelAlreadyExist
}
}
// copy current pod labels to oldPod && newPod
for k, v := range currentPod.GetLabels() {
oldPod.Labels[k] = v
newPod.Labels[k] = v
}
// assign new label for current pod
newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue
return oldPod, newPod, nil
}
// assemble the dubbo kubernetes annotations
// accord the current pod && (k,v) assemble the old-pod, new-pod
func (c *Client) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
oldPod = &v1.Pod{}
newPod = &v1.Pod{}
oldPod.Annotations = make(map[string]string, 8)
newPod.Annotations = make(map[string]string, 8)
for k, v := range currentPod.GetAnnotations() {
oldPod.Annotations[k] = v
newPod.Annotations[k] = v
}
al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey])
if err != nil {
err = perrors.WithMessage(err, "unmarshal record")
return
}
newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v}))
if err != nil {
err = perrors.WithMessage(err, "marshal record")
return
}
newPod.Annotations[DubboIOAnnotationKey] = newAnnotations
return
}
// getPatch
// get the kubernetes pod patch bytes
func (c *Client) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
oldData, err := json.Marshal(oldPod)
if err != nil {
return nil, perrors.WithMessage(err, "marshal old pod")
}
newData, err := json.Marshal(newPod)
if err != nil {
return nil, perrors.WithMessage(err, "marshal newPod pod")
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
if err != nil {
return nil, perrors.WithMessage(err, "create two-way-merge-patch")
}
return patchBytes, nil
}
// GetChildren
// get k children list from kubernetes-watcherSet
func (c *Client) GetChildren(k string) ([]string, []string, error) {
objectList, err := c.watcherSet.Get(k, true)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k)
}
var kList []string
var vList []string
for _, o := range objectList {
kList = append(kList, o.Key)
vList = append(vList, o.Value)
}
return kList, vList, nil
}
// Watch
// watch on spec key
func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.watcherSet.Watch(k, false)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
}
return w.ResultChan(), w.done(), nil
}
// Watch
// watch on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
w, err := c.watcherSet.Watch(prefix, true)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
}
return w.ResultChan(), w.done(), nil
}
// Valid
// Valid the client
// if return false, the client is die
func (c *Client) Valid() bool {
select {
case <-c.Done():
return false
default:
}
c.lock.RLock()
defer c.lock.RUnlock()
return c.rawClient != nil
}
// Done
// read the client status
func (c *Client) Done() <-chan struct{} {
return c.ctx.Done()
}
// Stop
// read the client status
func (c *Client) Close() {
select {
case <-c.ctx.Done():
//already stopped
return
default:
}
c.cancel()
// the client ctx be canceled
// will trigger the watcherSet watchers all stopped
// so, just wait
c.wg.Wait()
}
// ValidateClient
// validate the kubernetes client
func ValidateClient(container clientFacade) error {
client := container.Client()
// new Client
if client == nil || client.Valid() {
ns, err := getCurrentNameSpace()
if err != nil {
return perrors.WithMessage(err, "get current namespace")
}
newClient, err := newClient(ns)
if err != nil {
logger.Warnf("new kubernetes client (namespace{%s}: %v)", ns, err)
return perrors.WithMessagef(err, "new kubernetes client (:%+v)", ns)
}
container.SetClient(newClient)
}
return nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"encoding/json"
"fmt"
"net/http"
"os"
"runtime"
"strings"
"sync"
"testing"
"time"
)
import (
"github.com/stretchr/testify/suite"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
// tests dataset
var tests = []struct {
input struct {
k string
v string
}
}{
{input: struct {
k string
v string
}{k: "name", v: "scott.wang"}},
{input: struct {
k string
v string
}{k: "namePrefix", v: "prefix.scott.wang"}},
{input: struct {
k string
v string
}{k: "namePrefix1", v: "prefix1.scott.wang"}},
{input: struct {
k string
v string
}{k: "age", v: "27"}},
}
// test dataset prefix
const prefix = "name"
var clientPodJsonData = `{
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"annotations": {
"dubbo.io/annotation": "W3siayI6Ii9kdWJibyIsInYiOiIifSx7ImsiOiIvZHViYm8vY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvY29uc3VtZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJibyIsInYiOiIifSx7ImsiOiIvZHViYm8vY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvY29uc3VtZXJzL2NvbnN1bWVyJTNBJTJGJTJGMTcyLjE3LjAuOCUyRlVzZXJQcm92aWRlciUzRmNhdGVnb3J5JTNEY29uc3VtZXJzJTI2ZHViYm8lM0RkdWJib2dvLWNvbnN1bWVyLTIuNi4wJTI2cHJvdG9jb2wlM0RkdWJibyIsInYiOiIifV0="
},
"creationTimestamp": "2020-03-13T03:38:57Z",
"labels": {
"dubbo.io/label": "dubbo.io-value"
},
"name": "client",
"namespace": "default",
"resourceVersion": "2449700",
"selfLink": "/api/v1/namespaces/default/pods/client",
"uid": "3ec394f5-dcc6-49c3-8061-57b4b2b41344"
},
"spec": {
"containers": [
{
"env": [
{
"name": "NAMESPACE",
"valueFrom": {
"fieldRef": {
"apiVersion": "v1",
"fieldPath": "metadata.namespace"
}
}
}
],
"image": "registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client",
"imagePullPolicy": "Always",
"name": "client",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File",
"volumeMounts": [
{
"mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
"name": "dubbo-sa-token-l2lzh",
"readOnly": true
}
]
}
],
"dnsPolicy": "ClusterFirst",
"enableServiceLinks": true,
"nodeName": "minikube",
"priority": 0,
"restartPolicy": "Never",
"schedulerName": "default-scheduler",
"securityContext": {},
"serviceAccount": "dubbo-sa",
"serviceAccountName": "dubbo-sa",
"terminationGracePeriodSeconds": 30,
"tolerations": [
{
"effect": "NoExecute",
"key": "node.kubernetes.io/not-ready",
"operator": "Exists",
"tolerationSeconds": 300
},
{
"effect": "NoExecute",
"key": "node.kubernetes.io/unreachable",
"operator": "Exists",
"tolerationSeconds": 300
}
],
"volumes": [
{
"name": "dubbo-sa-token-l2lzh",
"secret": {
"defaultMode": 420,
"secretName": "dubbo-sa-token-l2lzh"
}
}
]
},
"status": {
"conditions": [
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:38:57Z",
"status": "True",
"type": "Initialized"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:40:18Z",
"status": "True",
"type": "Ready"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:40:18Z",
"status": "True",
"type": "ContainersReady"
},
{
"lastProbeTime": null,
"lastTransitionTime": "2020-03-13T03:38:57Z",
"status": "True",
"type": "PodScheduled"
}
],
"containerStatuses": [
{
"containerID": "docker://2870d6abc19ca7fe22ca635ebcfac5d48c6d5550a659bafd74fb48104f6dfe3c",
"image": "registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client:latest",
"imageID": "docker-pullable://registry.cn-hangzhou.aliyuncs.com/scottwang/dubbogo-client@sha256:1f075131f708a0d400339e81549d7c4d4ed917ab0b6bd38ef458dd06ad25a559",
"lastState": {},
"name": "client",
"ready": true,
"restartCount": 0,
"state": {
"running": {
"startedAt": "2020-03-13T03:40:17Z"
}
}
}
],
"hostIP": "10.0.2.15",
"phase": "Running",
"podIP": "172.17.0.8",
"qosClass": "BestEffort",
"startTime": "2020-03-13T03:38:57Z"
}
}
`
type KubernetesClientTestSuite struct {
suite.Suite
currentPod v1.Pod
}
func (s *KubernetesClientTestSuite) initClient() *Client {
t := s.T()
client, err := newMockClient(s.currentPod.GetNamespace(), func() (kubernetes.Interface, error) {
out := fake.NewSimpleClientset()
// mock current pod
if _, err := out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil {
t.Fatal(err)
}
return out, nil
})
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
return client
}
func (s *KubernetesClientTestSuite) SetupSuite() {
runtime.GOMAXPROCS(1)
t := s.T()
// 1. install test data
if err := json.Unmarshal([]byte(clientPodJsonData), &s.currentPod); err != nil {
t.Fatal(err)
}
// 2. set downward-api inject env
if err := os.Setenv(podNameKey, s.currentPod.GetName()); err != nil {
t.Fatal(err)
}
if err := os.Setenv(nameSpaceKey, s.currentPod.GetNamespace()); err != nil {
t.Fatal(err)
}
go http.ListenAndServe(":6061", nil)
}
func (s *KubernetesClientTestSuite) TestReadCurrentPodName() {
t := s.T()
n, err := getCurrentPodName()
if err != nil {
t.Fatal(err)
}
if n != s.currentPod.GetName() {
t.Fatalf("expect %s but got %s", s.currentPod.GetName(), n)
}
}
func (s *KubernetesClientTestSuite) TestReadCurrentNameSpace() {
t := s.T()
ns, err := getCurrentNameSpace()
if err != nil {
t.Fatal(err)
}
if ns != s.currentPod.GetNamespace() {
t.Fatalf("expect %s but got %s", s.currentPod.GetNamespace(), ns)
}
}
func (s *KubernetesClientTestSuite) TestClientValid() {
t := s.T()
client := s.initClient()
defer client.Close()
if client.Valid() != true {
t.Fatal("client is not valid")
}
client.Close()
if client.Valid() != false {
t.Fatal("client is valid")
}
}
func (s *KubernetesClientTestSuite) TestClientDone() {
t := s.T()
client := s.initClient()
go func() {
time.Sleep(time.Second)
client.Close()
}()
<-client.Done()
if client.Valid() == true {
t.Fatal("client should be invalid then")
}
}
func (s *KubernetesClientTestSuite) TestClientCreateKV() {
t := s.T()
client := s.initClient()
defer client.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if err := client.Create(k, v); err != nil {
t.Fatal(err)
}
}
}
func (s *KubernetesClientTestSuite) TestClientGetChildrenKVList() {
t := s.T()
client := s.initClient()
defer client.Close()
wg := sync.WaitGroup{}
wg.Add(1)
syncDataComplete := make(chan struct{})
go func() {
wc, done, err := client.WatchWithPrefix(prefix)
if err != nil {
t.Fatal(err)
}
wg.Done()
i := 0
for {
select {
case e := <-wc:
i++
fmt.Printf("got event %v k %s v %s\n", e.EventType, e.Key, e.Value)
if i == 3 {
// already sync all event
syncDataComplete <- struct{}{}
return
}
case <-done:
t.Log("the watcherSet watcher was stopped")
return
}
}
}()
// wait the watch goroutine start
wg.Wait()
expect := make(map[string]string)
got := make(map[string]string)
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if strings.Contains(k, prefix) {
expect[k] = v
}
if err := client.Create(k, v); err != nil {
t.Fatal(err)
}
}
<-syncDataComplete
// start get all children
kList, vList, err := client.GetChildren(prefix)
if err != nil {
t.Fatal(err)
}
for i := 0; i < len(kList); i++ {
got[kList[i]] = vList[i]
}
for expectK, expectV := range expect {
if got[expectK] != expectV {
t.Fatalf("expect {%s: %s} but got {%s: %v}", expectK, expectV, expectK, got[expectK])
}
}
}
func (s *KubernetesClientTestSuite) TestClientWatchPrefix() {
t := s.T()
client := s.initClient()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wc, done, err := client.WatchWithPrefix(prefix)
if err != nil {
t.Fatal(err)
}
wg.Done()
for {
select {
case e := <-wc:
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
case <-done:
t.Log("the watcherSet watcher was stopped")
return
}
}
}()
// must wait the watch goroutine work
wg.Wait()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if err := client.Create(k, v); err != nil {
t.Fatal(err)
}
}
client.Close()
}
func (s *KubernetesClientTestSuite) TestNewClient() {
t := s.T()
_, err := newClient(s.currentPod.GetNamespace())
if err == nil {
t.Fatal("the out of cluster test should fail")
}
}
func (s *KubernetesClientTestSuite) TestClientWatch() {
t := s.T()
client := s.initClient()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wc, done, err := client.Watch(prefix)
if err != nil {
t.Fatal(err)
}
wg.Done()
for {
select {
case e := <-wc:
t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
case <-done:
t.Log("the watcherSet watcher was stopped")
return
}
}
}()
// must wait the watch goroutine already start the watch goroutine
wg.Wait()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if err := client.Create(k, v); err != nil {
t.Fatal(err)
}
}
client.Close()
}
func TestKubernetesClient(t *testing.T) {
suite.Run(t, new(KubernetesClientTestSuite))
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
type clientFacade interface {
Client() *Client
SetClient(*Client)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"sync"
)
import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)
type mockFacade struct {
client *Client
cltLock sync.Mutex
done chan struct{}
}
func (r *mockFacade) Client() *Client {
return r.client
}
func (r *mockFacade) SetClient(client *Client) {
r.client = client
}
func (s *KubernetesClientTestSuite) Test_Facade() {
t := s.T()
mockClient, err := newMockClient(s.currentPod.GetNamespace(), func() (kubernetes.Interface, error) {
out := fake.NewSimpleClientset()
// mock current pod
if _, err := out.CoreV1().Pods(s.currentPod.GetNamespace()).Create(&s.currentPod); err != nil {
t.Fatal(err)
}
return out, nil
})
if err != nil {
t.Fatal(err)
}
m := &mockFacade{
client: mockClient,
}
if err := ValidateClient(m); err == nil {
t.Fatal("out of cluster should err")
}
mockClient.Close()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"sync"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)
type EventListener struct {
client *Client
keyMapLock sync.RWMutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
func NewEventListener(client *Client) *EventListener {
return &EventListener{
client: client,
keyMap: make(map[string]struct{}, 8),
}
}
// Listen on a spec key
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
defer l.wg.Done()
for {
wc, done, err := l.client.Watch(key)
if err != nil {
logger.Warnf("watch exist{key:%s} = error{%v}", key, err)
return false
}
select {
// client stopped
case <-l.client.Done():
logger.Warnf("kubernetes client stopped")
return false
// watcherSet watcher stopped
case <-done:
logger.Warnf("kubernetes watcherSet watcher stopped")
return false
// handle kubernetes-watcherSet events
case e, ok := <-wc:
if !ok {
logger.Warnf("kubernetes-watcherSet watch-chan closed")
return false
}
if l.handleEvents(e, listener...) {
// if event is delete
return true
}
}
}
return false
}
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool {
logger.Infof("got a kubernetes-watcherSet event {type: %d, key: %s}", event.EventType, event.Key)
switch event.EventType {
case Create:
for _, listener := range listeners {
logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataCreated}", event.Key)
listener.DataChange(remoting.Event{
Path: string(event.Key),
Action: remoting.EventTypeAdd,
Content: string(event.Value),
})
}
return false
case Update:
for _, listener := range listeners {
logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataChanged}", event.Key)
listener.DataChange(remoting.Event{
Path: string(event.Key),
Action: remoting.EventTypeUpdate,
Content: string(event.Value),
})
}
return false
case Delete:
logger.Warnf("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDeleted}", event.Key)
return true
default:
return false
}
}
// Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
defer l.wg.Done()
for {
wc, done, err := l.client.WatchWithPrefix(prefix)
if err != nil {
logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
}
select {
// client stopped
case <-l.client.Done():
logger.Warnf("kubernetes client stopped")
return
// watcher stopped
case <-done:
logger.Warnf("kubernetes watcherSet watcher stopped")
return
// kuberentes-watcherSet event stream
case e, ok := <-wc:
if !ok {
logger.Warnf("kubernetes-watcherSet watch-chan closed")
return
}
l.handleEvents(e, listener...)
}
}
}
// this func is invoked by kubernetes ConsumerRegistry::Registry/ kubernetes ConsumerRegistry::get/kubernetes ConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
l.keyMapLock.RLock()
_, ok := l.keyMap[key]
l.keyMapLock.RUnlock()
if ok {
logger.Warnf("kubernetes-watcherSet key %s has already been listened.", key)
return
}
l.keyMapLock.Lock()
// double check
if _, ok := l.keyMap[key]; ok {
// another goroutine already set it
l.keyMapLock.Unlock()
return
}
l.keyMap[key] = struct{}{}
l.keyMapLock.Unlock()
keyList, valueList, err := l.client.GetChildren(key)
if err != nil {
logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children"))
}
logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)
for i, k := range keyList {
logger.Infof("got children list key -> %s", k)
listener.DataChange(remoting.Event{
Path: k,
Action: remoting.EventTypeAdd,
Content: valueList[i],
})
}
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-watcherSet", key)
l.wg.Add(1)
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
}(key, listener)
logger.Infof("listen dubbo service key{%s}", key)
l.wg.Add(1)
go func(key string) {
if l.ListenServiceNodeEvent(key) {
listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(kubernetes key{%s}) goroutine exit now", key)
}(key)
}
func (l *EventListener) Close() {
l.wg.Wait()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/remoting"
)
var changedData = `
dubbo.consumer.request_timeout=3s
dubbo.consumer.connect_timeout=5s
dubbo.application.organization=ikurento.com
dubbo.application.name=BDTService
dubbo.application.module=dubbogo user-info server
dubbo.application.version=0.0.1
dubbo.application.owner=ZX
dubbo.application.environment=dev
dubbo.registries.hangzhouzk.protocol=zookeeper
dubbo.registries.hangzhouzk.timeout=3s
dubbo.registries.hangzhouzk.address=127.0.0.1:2181
dubbo.registries.shanghaizk.protocol=zookeeper
dubbo.registries.shanghaizk.timeout=3s
dubbo.registries.shanghaizk.address=127.0.0.1:2182
dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo
dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider
dubbo.service.com.ikurento.user.UserProvider.loadbalance=random
dubbo.service.com.ikurento.user.UserProvider.warmup=100
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`
type mockDataListener struct {
eventList []remoting.Event
client *Client
changedData string
rc chan remoting.Event
}
func (m *mockDataListener) DataChange(eventType remoting.Event) bool {
m.eventList = append(m.eventList, eventType)
if eventType.Content == m.changedData {
m.rc <- eventType
}
return true
}
func (s *KubernetesClientTestSuite) TestListener() {
t := s.T()
var tests = []struct {
input struct {
k string
v string
}
}{
{input: struct {
k string
v string
}{k: "/dubbo", v: changedData}},
}
c := s.initClient()
defer c.Close()
listener := NewEventListener(c)
dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)}
listener.ListenServiceEvent("/dubbo", dataListener)
// NOTICE: direct listen will lose create msg
time.Sleep(time.Second)
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
}
msg := <-dataListener.rc
assert.Equal(t, changedData, msg.Content)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"context"
"strconv"
"strings"
"sync"
)
import (
perrors "github.com/pkg/errors"
)
var (
ErrWatcherSetAlreadyStopped = perrors.New("the watcher-set already be stopped")
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
const (
defaultWatcherChanSize = 100
)
type eventType int
const (
Create eventType = iota
Update
Delete
)
func (e eventType) String() string {
switch e {
case Create:
return "CREATE"
case Update:
return "UPDATE"
case Delete:
return "DELETE"
default:
return "UNKNOWN"
}
}
// WatcherEvent
// watch event is element in watcherSet
type WatcherEvent struct {
// event-type
EventType eventType `json:"-"`
// the dubbo-go should consume the key
Key string `json:"k"`
// the dubbo-go should consume the value
Value string `json:"v"`
}
// Watchable WatcherSet
type WatcherSet interface {
// put the watch event to the watch set
Put(object *WatcherEvent) error
// if prefix is false,
// the len([]*WatcherEvent) == 1
Get(key string, prefix bool) ([]*WatcherEvent, error)
// watch the spec key or key prefix
Watch(key string, prefix bool) (Watcher, error)
// check the watcher set status
Done() <-chan struct{}
}
// Watcher
type Watcher interface {
// the watcher's id
ID() string
// result stream
ResultChan() <-chan *WatcherEvent
// Stop the watcher
stop()
// check the watcher status
done() <-chan struct{}
}
// the watch set implement
type watcherSetImpl struct {
// Client's ctx, client die, the watch set will die too
ctx context.Context
// protect watcher-set and watchers
lock sync.RWMutex
// the key is dubbo-go interest meta
cache map[string]*WatcherEvent
currentWatcherId uint64
watchers map[uint64]*watcher
}
// closeWatchers
// when the watcher-set was closed
func (s *watcherSetImpl) closeWatchers() {
select {
case <-s.ctx.Done():
// parent ctx be canceled, close the watch-set's watchers
s.lock.Lock()
watchers := s.watchers
s.lock.Unlock()
for _, w := range watchers {
// stop data stream
// close(w.ch)
// stop watcher
w.stop()
}
}
}
// Watch
// watch on spec key, with or without prefix
func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
return s.addWatcher(key, prefix)
}
// Done
// get the watcher-set status
func (s *watcherSetImpl) Done() <-chan struct{} {
return s.ctx.Done()
}
// Put
// put the watch event to watcher-set
func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
sendMsg := func(object *WatcherEvent, w *watcher) {
select {
case <-w.done():
// the watcher already stop
case w.ch <- object:
// block send the msg
}
}
s.lock.Lock()
defer s.lock.Unlock()
if err := s.valid(); err != nil {
return err
}
// put to watcher-set
if watcherEvent.EventType == Delete {
delete(s.cache, watcherEvent.Key)
} else {
old, ok := s.cache[watcherEvent.Key]
if ok {
if old.Value == watcherEvent.Value {
// already have this k/v pair
return nil
}
}
// refresh the watcherEvent
s.cache[watcherEvent.Key] = watcherEvent
}
// notify watcher
for _, w := range s.watchers {
w := w
if !strings.Contains(watcherEvent.Key, w.interested.key) {
// this watcher no interest in this element
continue
}
if !w.interested.prefix {
if watcherEvent.Key == w.interested.key {
go sendMsg(watcherEvent, w)
}
// not interest
continue
}
go sendMsg(watcherEvent, w)
}
return nil
}
// valid
func (s *watcherSetImpl) valid() error {
select {
case <-s.ctx.Done():
return ErrWatcherSetAlreadyStopped
default:
return nil
}
}
// addWatcher
func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
if err := s.valid(); err != nil {
return nil, err
}
s.lock.Lock()
defer s.lock.Unlock()
// increase the watcher-id
s.currentWatcherId++
w := &watcher{
id: s.currentWatcherId,
watcherSet: s,
interested: struct {
key string
prefix bool
}{key: key, prefix: prefix},
ch: make(chan *WatcherEvent, defaultWatcherChanSize),
exit: make(chan struct{}),
}
s.watchers[s.currentWatcherId] = w
return w, nil
}
// Get
// get elements from watcher-set
func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if err := s.valid(); err != nil {
return nil, err
}
if !prefix {
for k, v := range s.cache {
if k == key {
return []*WatcherEvent{v}, nil
}
}
// object
return nil, ErrKVPairNotFound
}
var out []*WatcherEvent
for k, v := range s.cache {
if strings.Contains(k, key) {
out = append(out, v)
}
}
if len(out) == 0 {
return nil, ErrKVPairNotFound
}
return out, nil
}
// the watcher-set watcher
type watcher struct {
id uint64
// the underlay watcherSet
watcherSet *watcherSetImpl
// the interest topic
interested struct {
key string
prefix bool
}
ch chan *WatcherEvent
closeOnce sync.Once
exit chan struct{}
}
// ResultChan
func (w *watcher) ResultChan() <-chan *WatcherEvent {
return w.ch
}
// ID
// the watcher's id
func (w *watcher) ID() string {
return strconv.FormatUint(w.id, 10)
}
// stop
// stop the watcher
func (w *watcher) stop() {
// double close will panic
w.closeOnce.Do(func() {
close(w.exit)
})
}
// done
// check watcher status
func (w *watcher) done() <-chan struct{} {
return w.exit
}
// newWatcherSet
// new watcher set from parent context
func newWatcherSet(ctx context.Context) WatcherSet {
s := &watcherSetImpl{
ctx: ctx,
cache: map[string]*WatcherEvent{},
watchers: map[uint64]*watcher{},
}
go s.closeWatchers()
return s
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kubernetes
import (
"context"
"strconv"
"sync"
"testing"
"time"
)
func TestWatchSet(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
s := newWatcherSet(ctx)
wg := sync.WaitGroup{}
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
w, err := s.Watch("key-1", false)
if err != nil {
t.Fatal(err)
}
for {
select {
case e := <-w.ResultChan():
t.Logf("consumer %s got %s\n", w.ID(), e.Key)
case <-w.done():
t.Logf("consumer %s stopped", w.ID())
return
}
}
}()
}
for i := 2; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
w, err := s.Watch("key", true)
if err != nil {
t.Fatal(err)
}
for {
select {
case e := <-w.ResultChan():
t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key)
case <-w.done():
t.Logf("prefix consumer %s stopped", w.ID())
return
}
}
}()
}
for i := 0; i < 5; i++ {
go func(i int) {
if err := s.Put(&WatcherEvent{
Key: "key-" + strconv.Itoa(i),
Value: strconv.Itoa(i),
}); err != nil {
t.Fatal(err)
}
}(i)
}
wg.Wait()
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment