From 1fdd429b8c4a02c477ceca510cce779f782fe512 Mon Sep 17 00:00:00 2001 From: Laurence <45508533+LaurenceLiZhixin@users.noreply.github.com> Date: Fri, 12 Nov 2021 16:44:44 +0800 Subject: [PATCH] Fix: some bugs and features for 3.0 (#1586) * fix: some bugs * Fix: remove file\k8s registry, k8s remote --- common/proxy/proxy.go | 31 +- config_center/nacos/listener.go | 2 +- go.mod | 7 +- go.sum | 18 +- protocol/dubbo3/dubbo3_invoker.go | 19 +- protocol/dubbo3/dubbo3_protocol.go | 8 +- registry/base_registry.go | 6 +- registry/file/listener.go | 29 - registry/file/service_discovery.go | 267 --------- registry/file/service_discovery_test.go | 91 --- registry/kubernetes/listener.go | 127 ----- registry/kubernetes/listener_test.go | 53 -- registry/kubernetes/registry.go | 241 -------- registry/kubernetes/registry_test.go | 344 ----------- remoting/kubernetes/client.go | 213 ------- remoting/kubernetes/client_test.go | 455 --------------- remoting/kubernetes/facade.go | 28 - remoting/kubernetes/facade_test.go | 78 --- remoting/kubernetes/listener.go | 212 ------- remoting/kubernetes/listener_test.go | 103 ---- remoting/kubernetes/registry_controller.go | 633 --------------------- remoting/kubernetes/watch.go | 320 ----------- remoting/kubernetes/watch_test.go | 96 ---- remoting/zookeeper/listener.go | 2 +- 24 files changed, 53 insertions(+), 3330 deletions(-) delete mode 100644 registry/file/listener.go delete mode 100644 registry/file/service_discovery.go delete mode 100644 registry/file/service_discovery_test.go delete mode 100644 registry/kubernetes/listener.go delete mode 100644 registry/kubernetes/listener_test.go delete mode 100644 registry/kubernetes/registry.go delete mode 100644 registry/kubernetes/registry_test.go delete mode 100644 remoting/kubernetes/client.go delete mode 100644 remoting/kubernetes/client_test.go delete mode 100644 remoting/kubernetes/facade.go delete mode 100644 remoting/kubernetes/facade_test.go delete mode 100644 remoting/kubernetes/listener.go delete mode 100644 remoting/kubernetes/listener_test.go delete mode 100644 remoting/kubernetes/registry_controller.go delete mode 100644 remoting/kubernetes/watch.go delete mode 100644 remoting/kubernetes/watch_test.go diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index e1c6222f9..c32eedc57 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -19,6 +19,7 @@ package proxy import ( "context" + "errors" "reflect" "sync" ) @@ -116,13 +117,6 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) { valueOf := reflect.ValueOf(v) valueOfElem := valueOf.Elem() - typeOf := valueOfElem.Type() - - // check incoming interface, incoming interface's elem must be a struct. - if typeOf.Kind() != reflect.Struct { - logger.Errorf("The type of RPCService(=\"%T\") must be a pointer of a struct.", v) - return - } makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value { return func(in []reflect.Value) []reflect.Value { @@ -227,6 +221,18 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) { } } + if err := refectAndMakeObjectFunc(valueOfElem, makeDubboCallProxy); err != nil { + logger.Errorf("The type or combination type of RPCService %T must be a pointer of a struct. error is %s", v, err) + return + } +} + +func refectAndMakeObjectFunc(valueOfElem reflect.Value, makeDubboCallProxy func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value) error { + typeOf := valueOfElem.Type() + // check incoming interface, incoming interface's elem must be a struct. + if typeOf.Kind() != reflect.Struct { + return errors.New("invalid type kind") + } numField := valueOfElem.NumField() for i := 0; i < numField; i++ { t := typeOf.Field(i) @@ -258,6 +264,17 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) { // do method proxy here: f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts))) logger.Debugf("set method [%s]", methodName) + } else if f.IsValid() && f.CanSet() { + // for struct combination + valueOfSub := reflect.New(t.Type) + valueOfElemInterface := valueOfSub.Elem() + if valueOfElemInterface.Type().Kind() == reflect.Struct { + if err := refectAndMakeObjectFunc(valueOfElemInterface, makeDubboCallProxy); err != nil { + return err + } + f.Set(valueOfElemInterface) + } } } + return nil } diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index 3d60d2ae3..a4cf589e4 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -22,6 +22,7 @@ import ( ) import ( + constant2 "github.com/nacos-group/nacos-sdk-go/common/constant" "github.com/nacos-group/nacos-sdk-go/vo" ) @@ -30,7 +31,6 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/config_center" "dubbo.apache.org/dubbo-go/v3/remoting" - constant2 "github.com/nacos-group/nacos-sdk-go/common/constant" ) func callback(listener config_center.ConfigurationListener, _, _, dataId, data string) { diff --git a/go.mod b/go.mod index 30d3bb8ac..3256db06d 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,8 @@ require ( github.com/creasty/defaults v1.5.2 github.com/dubbogo/go-zookeeper v1.0.3 github.com/dubbogo/gost v1.11.19 - github.com/dubbogo/grpc-go v1.42.4-triple - github.com/dubbogo/triple v1.1.2 + github.com/dubbogo/grpc-go v1.42.5-triple + github.com/dubbogo/triple v1.1.3 github.com/emicklei/go-restful/v3 v3.7.1 github.com/fsnotify/fsnotify v1.5.1 github.com/ghodss/yaml v1.0.0 @@ -32,8 +32,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd github.com/nacos-group/nacos-sdk-go v1.0.9 github.com/natefinch/lumberjack v2.0.0+incompatible - github.com/onsi/ginkgo v1.10.1 // indirect - github.com/onsi/gomega v1.7.0 // indirect github.com/opentracing/opentracing-go v1.2.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 @@ -49,7 +47,6 @@ require ( google.golang.org/protobuf v1.27.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 - k8s.io/api v0.16.9 k8s.io/apimachinery v0.16.9 k8s.io/client-go v0.16.9 ) diff --git a/go.sum b/go.sum index 2fbdc68c2..4ab084c0e 100644 --- a/go.sum +++ b/go.sum @@ -177,13 +177,13 @@ github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZT github.com/dubbogo/gost v1.11.18/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI= github.com/dubbogo/gost v1.11.19 h1:R1rZ3TNJKV9W5XHLMv+GDO2Wy6UDnwGQtVWbsWYvo0A= github.com/dubbogo/gost v1.11.19/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI= -github.com/dubbogo/grpc-go v1.42.4-triple h1:ysiabUrEGcaeXgnjSBT0bB1M7EexSJFiO0Mebg/Iqa4= -github.com/dubbogo/grpc-go v1.42.4-triple/go.mod h1:F1T9hnUvYGW4JLK1QNriavpOkhusU677ovPzLkk6zHM= +github.com/dubbogo/grpc-go v1.42.5-triple h1:Ed5z/ikkpdZHBMA4mTEthQFTQeKlHtkdAsQrZjTbFk8= +github.com/dubbogo/grpc-go v1.42.5-triple/go.mod h1:F1T9hnUvYGW4JLK1QNriavpOkhusU677ovPzLkk6zHM= github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU= github.com/dubbogo/net v0.0.4/go.mod h1:1CGOnM7X3he+qgGNqjeADuE5vKZQx/eMSeUkpU3ujIc= github.com/dubbogo/triple v1.0.9/go.mod h1:1t9me4j4CTvNDcsMZy6/OGarbRyAUSY0tFXGXHCp7Iw= -github.com/dubbogo/triple v1.1.2 h1:7lmQ0uNvcIYlMj5gNwPQadFx8w8UDEtcYl4DL6X+idM= -github.com/dubbogo/triple v1.1.2/go.mod h1:x+H41M5yP1ULnJu4b+o8VrgsIKdTPslTum2yUqA9N1I= +github.com/dubbogo/triple v1.1.3 h1:XKSh42lE2HLud++g4Fif7XY2hSMEsohFpegZPvsNXVQ= +github.com/dubbogo/triple v1.1.3/go.mod h1:suMeAfZliq0p/lWIytgEdiuKcRlmeJC9pYeNHVE7FWU= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -208,7 +208,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.0/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.5.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU= @@ -358,7 +357,6 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= -github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= @@ -440,7 +438,6 @@ github.com/hashicorp/vault/sdk v0.3.0 h1:kR3dpxNkhh/wr6ycaJYqp6AFT/i2xaftbfnwZdu github.com/hashicorp/vault/sdk v0.3.0/go.mod h1:aZ3fNuL5VNydQk8GcLJ2TV8YCRVvyaakYkhZRoVuhj0= github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -585,13 +582,9 @@ github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= -github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= -github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= @@ -1200,7 +1193,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= @@ -1212,7 +1204,6 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXL gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= @@ -1248,7 +1239,6 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUc k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= -k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf h1:EYm5AW/UUDbnmnI+gK0TJDVK9qPLhM+sRHYanNKw0EQ= k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E= k8s.io/utils v0.0.0-20190801114015-581e00157fb1 h1:+ySTxfHnfzZb9ys375PXNlLhkJPLKgHajBU0N62BDvE= k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go index c978f853f..da42a3220 100644 --- a/protocol/dubbo3/dubbo3_invoker.go +++ b/protocol/dubbo3/dubbo3_invoker.go @@ -27,6 +27,8 @@ import ( ) import ( + "github.com/dubbogo/grpc-go/metadata" + tripleConstant "github.com/dubbogo/triple/pkg/common/constant" triConfig "github.com/dubbogo/triple/pkg/config" "github.com/dubbogo/triple/pkg/triple" @@ -134,7 +136,19 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati } // append interface id to ctx - ctx = context.WithValue(ctx, tripleConstant.CtxAttachmentKey, invocation.Attachments()) + gRPCMD := make(metadata.MD, 0) + for k, v := range invocation.Attachments() { + if str, ok := v.(string); ok { + gRPCMD.Set(k, str) + continue + } + if str, ok := v.([]string); ok { + gRPCMD.Set(k, str...) + continue + } + logger.Warnf("triple attachment value with key = %s is invalid, which should be string or []string", k) + } + ctx = metadata.NewOutgoingContext(ctx, gRPCMD) ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, di.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, "")) in := make([]reflect.Value, 0, 16) in = append(in, reflect.ValueOf(ctx)) @@ -146,8 +160,9 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati methodName := invocation.MethodName() triAttachmentWithErr := di.client.Invoke(methodName, in, invocation.Reply()) result.Err = triAttachmentWithErr.GetError() + result.Attrs = make(map[string]interface{}) for k, v := range triAttachmentWithErr.GetAttachments() { - result.Attachment(k, v) + result.Attrs[k] = v } result.Rest = invocation.Reply() return &result diff --git a/protocol/dubbo3/dubbo3_protocol.go b/protocol/dubbo3/dubbo3_protocol.go index 3e097c77f..2ad3dffb9 100644 --- a/protocol/dubbo3/dubbo3_protocol.go +++ b/protocol/dubbo3/dubbo3_protocol.go @@ -190,13 +190,7 @@ func (d *UnaryService) GetReqParamsInterfaces(methodName string) ([]interface{}, } func (d *UnaryService) InvokeWithArgs(ctx context.Context, methodName string, arguments []interface{}) (interface{}, error) { - dubboAttachment := make(map[string]interface{}) - tripleAttachment, ok := ctx.Value(tripleConstant.TripleAttachement).(tripleCommon.TripleAttachment) - if ok { - for k, v := range tripleAttachment { - dubboAttachment[k] = v - } - } + dubboAttachment, _ := ctx.Value(tripleConstant.TripleAttachement).(tripleCommon.DubboAttachment) res := d.proxyImpl.Invoke(ctx, invocation.NewRPCInvocation(methodName, arguments, dubboAttachment)) return res, res.Error() } diff --git a/registry/base_registry.go b/registry/base_registry.go index 7ac9f2b0b..c6151e8c8 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -358,7 +358,7 @@ func (r *BaseRegistry) consumerRegistry(c *common.URL, params url.Values, f crea rawURL string err error ) - dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER]) + dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.CONSUMER]) if f != nil { err = f(dubboPath) @@ -412,7 +412,7 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) listener.Close() break } else { - logger.Infof("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String()) + logger.Debugf("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String()) notifyListener.Notify(serviceEvent) } } @@ -443,7 +443,7 @@ func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListene listener.Close() break } else { - logger.Infof("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String()) + logger.Debugf("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String()) notifyListener.Notify(serviceEvent) } } diff --git a/registry/file/listener.go b/registry/file/listener.go deleted file mode 100644 index 55f35ead0..000000000 --- a/registry/file/listener.go +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package file - -import ( - "dubbo.apache.org/dubbo-go/v3/config_center" -) - -// RegistryConfigurationListener represent the processor of flie watcher -type RegistryConfigurationListener struct{} - -// Process submit the ConfigChangeEvent to the event chan to notify all observer -func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) { -} diff --git a/registry/file/service_discovery.go b/registry/file/service_discovery.go deleted file mode 100644 index 65f2c24cc..000000000 --- a/registry/file/service_discovery.go +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package file - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "os" - "path" - "strconv" -) - -import ( - gxset "github.com/dubbogo/gost/container/set" - gxpage "github.com/dubbogo/gost/hash/page" - - perrors "github.com/pkg/errors" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/constant" - "dubbo.apache.org/dubbo-go/v3/common/extension" - "dubbo.apache.org/dubbo-go/v3/common/logger" - "dubbo.apache.org/dubbo-go/v3/config_center" - "dubbo.apache.org/dubbo-go/v3/config_center/file" - "dubbo.apache.org/dubbo-go/v3/registry" -) - -// init will put the service discovery into extension -func init() { - extension.SetServiceDiscovery(constant.FileKey, newFileSystemServiceDiscovery) -} - -// fileServiceDiscovery is the implementation of service discovery based on file. -type fileSystemServiceDiscovery struct { - dynamicConfiguration file.FileSystemDynamicConfiguration - rootPath string - fileMap map[string]string -} - -func newFileSystemServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { - if url.Protocol != constant.FileKey { - return nil, perrors.New("could not init the instance because the config is invalid") - } - - rp, err := file.Home() - if err != nil { - return nil, perrors.WithStack(err) - } - fdcf := extension.GetConfigCenterFactory(constant.FileKey) - p := path.Join(rp, ".dubbo", constant.RegistryKey) - url.AddParamAvoidNil(file.ConfigCenterDirParamName, p) - c, err := fdcf.GetDynamicConfiguration(url) - if err != nil { - return nil, perrors.WithStack(err) - } - - sd := &fileSystemServiceDiscovery{ - dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration), - rootPath: p, - fileMap: make(map[string]string), - } - - extension.AddCustomShutdownCallback(func() { - if err := sd.Destroy(); err != nil { - logger.Warnf("sd.Destroy() = error:%v", err) - } - }) - - for _, v := range sd.GetServices().Values() { - for _, i := range sd.GetInstances(v.(string)) { - // like java do nothing - l := &RegistryConfigurationListener{} - sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i))) - } - } - - return sd, nil -} - -// nolint -func (fssd *fileSystemServiceDiscovery) String() string { - return fmt.Sprintf("file-system-service-discovery") -} - -// Destroy will destroy the service discovery. -// If the discovery cannot be destroy, it will return an error. -func (fssd *fileSystemServiceDiscovery) Destroy() error { - fssd.dynamicConfiguration.Close() - - for _, f := range fssd.fileMap { - fssd.releaseAndRemoveRegistrationFiles(f) - } - - return nil -} - -// nolint -func (fssd *fileSystemServiceDiscovery) releaseAndRemoveRegistrationFiles(file string) { - os.RemoveAll(file) -} - -// ----------------- registration ---------------- - -// Register will register an instance of ServiceInstance to registry -func (fssd *fileSystemServiceDiscovery) Register(instance registry.ServiceInstance) error { - id := getServiceInstanceId(instance) - sn := getServiceName(instance) - - c, err := toJsonString(instance) - if err != nil { - return perrors.WithStack(err) - } - - err = fssd.dynamicConfiguration.PublishConfig(id, sn, c) - if err != nil { - return perrors.WithStack(err) - } - - fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn) - - return nil -} - -// nolint -func getServiceInstanceId(si registry.ServiceInstance) string { - if si.GetID() == "" { - return si.GetHost() + "." + strconv.Itoa(si.GetPort()) - } - - return si.GetID() -} - -// nolint -func getServiceName(si registry.ServiceInstance) string { - return si.GetServiceName() -} - -// toJsonString to json string -func toJsonString(si registry.ServiceInstance) (string, error) { - bytes, err := json.Marshal(si) - if err != nil { - return "", perrors.WithStack(err) - } - - return string(bytes), nil -} - -// Update will update the data of the instance in registry -func (fssd *fileSystemServiceDiscovery) Update(instance registry.ServiceInstance) error { - return fssd.Register(instance) -} - -// Unregister will unregister this instance from registry -func (fssd *fileSystemServiceDiscovery) Unregister(instance registry.ServiceInstance) error { - id := getServiceInstanceId(instance) - sn := getServiceName(instance) - - err := fssd.dynamicConfiguration.RemoveConfig(id, sn) - if err != nil { - return perrors.WithStack(err) - } - - delete(fssd.fileMap, instance.GetID()) - return nil -} - -// ----------------- discovery ------------------- -// GetDefaultPageSize will return the default page size -func (fssd *fileSystemServiceDiscovery) GetDefaultPageSize() int { - return 100 -} - -// GetServices will return the all service names. -func (fssd *fileSystemServiceDiscovery) GetServices() *gxset.HashSet { - r := gxset.NewSet() - // dynamicConfiguration root path is the actual root path - fileInfo, _ := ioutil.ReadDir(fssd.dynamicConfiguration.RootPath()) - - for _, file := range fileInfo { - if file.IsDir() { - r.Add(file.Name()) - } - } - - return r -} - -// GetInstances will return all service instances with serviceName -func (fssd *fileSystemServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { - set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName) - if err != nil { - logger.Errorf("[FileServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ", - serviceName, err) - return make([]registry.ServiceInstance, 0) - } - - res := make([]registry.ServiceInstance, 0, set.Size()) - for _, v := range set.Values() { - id := v.(string) - p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName)) - if err != nil { - logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+ - "error = err{%v} ", - id, serviceName, err) - return make([]registry.ServiceInstance, 0) - } - - dsi := ®istry.DefaultServiceInstance{} - err = json.Unmarshal([]byte(p), dsi) - if err != nil { - logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+ - "error = err{%v} ", - id, serviceName, err) - return make([]registry.ServiceInstance, 0) - } - - res = append(res, dsi) - } - - return res -} - -// GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName -// the page will start at offset -func (fssd *fileSystemServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { - return nil -} - -// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance. -// The param healthy indices that the instance should be healthy or not. -// The page will start at offset -func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, - healthy bool) gxpage.Pager { - return nil -} - -// Batch get all instances by the specified service names -func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, - requestedSize int) map[string]gxpage.Pager { - return nil -} - -// ----------------- event ---------------------- -// AddListener adds a new ServiceInstancesChangedListenerImpl -// client -func (fssd *fileSystemServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error { - // fssd.dynamicConfiguration.AddListener(listener.ServiceName) - return nil -} diff --git a/registry/file/service_discovery_test.go b/registry/file/service_discovery_test.go deleted file mode 100644 index 0152fc634..000000000 --- a/registry/file/service_discovery_test.go +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package file - -// -//import ( -// "math/rand" -// "strconv" -// "testing" -// "time" -//) -// -//import ( -// "github.com/stretchr/testify/assert" -//) -// -//import ( -// "dubbo.apache.org/dubbo-go/v3/common/constant" -// "dubbo.apache.org/dubbo-go/v3/common/extension" -// "dubbo.apache.org/dubbo-go/v3/registry" -//) -// -//func TestNewFileSystemServiceDiscoveryAndDestroy(t *testing.T) { -// prepareData() -// serviceDiscovery, err := newFileSystemServiceDiscovery() -// assert.NoError(t, err) -// assert.NotNil(t, serviceDiscovery) -// defer func() { -// err = serviceDiscovery.Destroy() -// assert.Nil(t, err) -// }() -//} -// -//func TestCURDFileSystemServiceDiscovery(t *testing.T) { -// prepareData() -// serviceDiscovery, err := extension.GetServiceDiscovery(constant.FILE_KEY) -// assert.NoError(t, err) -// md := make(map[string]string) -// -// rand.Seed(time.Now().Unix()) -// serviceName := "service-name" + strconv.Itoa(rand.Intn(10000)) -// md["t1"] = "test1" -// r1 := ®istry.DefaultServiceInstance{ -// ID: "123456789", -// ServiceName: serviceName, -// Host: "127.0.0.1", -// Port: 2233, -// Enable: true, -// Healthy: true, -// Metadata: md, -// } -// err = serviceDiscovery.Register(r1) -// assert.NoError(t, err) -// -// instances := serviceDiscovery.GetInstances(r1.ServiceName) -// assert.Equal(t, 1, len(instances)) -// assert.Equal(t, r1.ID, instances[0].GetID()) -// assert.Equal(t, r1.ServiceName, instances[0].GetServiceName()) -// assert.Equal(t, r1.Port, instances[0].GetPort()) -// -// err = serviceDiscovery.Unregister(r1) -// assert.NoError(t, err) -// -// err = serviceDiscovery.Register(r1) -// assert.NoError(t, err) -// defer func() { -// err = serviceDiscovery.Destroy() -// assert.NoError(t, err) -// }() -//} -// -//func prepareData() { -// //config.GetRootConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{ -// // Protocol: "file", -// //} -//} diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go deleted file mode 100644 index 760523568..000000000 --- a/registry/kubernetes/listener.go +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "strings" -) - -import ( - gxchan "github.com/dubbogo/gost/container/chan" - - perrors "github.com/pkg/errors" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/logger" - "dubbo.apache.org/dubbo-go/v3/config_center" - "dubbo.apache.org/dubbo-go/v3/registry" - "dubbo.apache.org/dubbo-go/v3/remoting" -) - -type dataListener struct { - interestedURL []*common.URL - listener config_center.ConfigurationListener -} - -// NewRegistryDataListener creates a data listener for kubernetes -func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { - return &dataListener{listener: listener} -} - -// AddInterestedURL adds the @url of registry center to the listener -func (l *dataListener) AddInterestedURL(url *common.URL) { - l.interestedURL = append(l.interestedURL, url) -} - -// DataChange -// notify listen, when interest event -func (l *dataListener) DataChange(eventType remoting.Event) bool { - index := strings.Index(eventType.Path, "/providers/") - if index == -1 { - logger.Warnf("Listen with no url, event.path={%v}", eventType.Path) - return false - } - url := eventType.Path[index+len("/providers/"):] - serviceURL, err := common.NewURL(url) - if err != nil { - logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err) - return false - } - - for _, v := range l.interestedURL { - if serviceURL.URLEqual(v) { - l.listener.Process( - &config_center.ConfigChangeEvent{ - Key: eventType.Path, - Value: serviceURL, - ConfigType: eventType.Action, - }, - ) - return true - } - } - return false -} - -type configurationListener struct { - registry *kubernetesRegistry - events *gxchan.UnboundedChan -} - -// NewConfigurationListener for listening the event of kubernetes. -func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener { - // add a new waiter - reg.WaitGroup().Add(1) - return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)} -} - -// Process processes the data change event from config center of kubernetes -func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { - l.events.In() <- configType -} - -// Next returns next service event once received -func (l *configurationListener) Next() (*registry.ServiceEvent, error) { - for { - select { - case <-l.registry.Done(): - logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.") - return nil, perrors.New("listener stopped") - - case val := <-l.events.Out(): - e, _ := val.(*config_center.ConfigChangeEvent) - logger.Debugf("got kubernetes event %#v", e) - if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() { - select { - case <-l.registry.Done(): - logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) - default: - } - continue - } - return ®istry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil - } - } -} - -// Close kubernetes registry center -func (l *configurationListener) Close() { - l.registry.WaitGroup().Done() -} diff --git a/registry/kubernetes/listener_test.go b/registry/kubernetes/listener_test.go deleted file mode 100644 index a4913f5da..000000000 --- a/registry/kubernetes/listener_test.go +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "testing" -) - -import ( - "github.com/stretchr/testify/assert" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/config_center" - "dubbo.apache.org/dubbo-go/v3/remoting" -) - -func Test_DataChange(t *testing.T) { - listener := NewRegistryDataListener(&MockDataListener{}) - url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100") - listener.AddInterestedURL(url) - int := listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) - assert.Equal(t, true, int) -} - -type MockDataListener struct{} - -func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {} - -func TestDataChange(t *testing.T) { - listener := NewRegistryDataListener(&MockDataListener{}) - url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100") - listener.AddInterestedURL(url) - if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100"}) { - t.Fatal("data change not ok") - } -} diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go deleted file mode 100644 index 63af25f16..000000000 --- a/registry/kubernetes/registry.go +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "fmt" - "path" - "sync" - "time" -) - -import ( - gxtime "github.com/dubbogo/gost/time" - - perrors "github.com/pkg/errors" - - v1 "k8s.io/api/core/v1" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/constant" - "dubbo.apache.org/dubbo-go/v3/common/extension" - "dubbo.apache.org/dubbo-go/v3/common/logger" - "dubbo.apache.org/dubbo-go/v3/registry" - "dubbo.apache.org/dubbo-go/v3/remoting/kubernetes" -) - -const ( - Name = "kubernetes" - ConnDelay = 3 - MaxFailTimes = 15 -) - -func init() { - // processID = fmt.Sprintf("%d", os.Getpid()) - // localIP = common.GetLocalIp() - extension.SetRegistry(Name, newKubernetesRegistry) -} - -type kubernetesRegistry struct { - registry.BaseRegistry - cltLock sync.RWMutex - client *kubernetes.Client - listenerLock sync.Mutex - listener *kubernetes.EventListener - dataListener *dataListener - configListener *configurationListener -} - -// Client gets the etcdv3 kubernetes -func (r *kubernetesRegistry) Client() *kubernetes.Client { - r.cltLock.RLock() - client := r.client - r.cltLock.RUnlock() - return client -} - -// SetClient sets the kubernetes client -func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) { - r.cltLock.Lock() - r.client = client - r.cltLock.Unlock() -} - -// CloseAndNilClient closes listeners and clear client -func (r *kubernetesRegistry) CloseAndNilClient() { - r.client.Close() - r.client = nil -} - -// CloseListener closes listeners -func (r *kubernetesRegistry) CloseListener() { - r.cltLock.Lock() - l := r.configListener - r.cltLock.Unlock() - if l != nil { - l.Close() - } - r.configListener = nil -} - -// CreatePath create the path in the registry center of kubernetes -func (r *kubernetesRegistry) CreatePath(k string) error { - if err := r.client.Create(k, ""); err != nil { - return perrors.WithMessagef(err, "create path %s in kubernetes", k) - } - return nil -} - -// DoRegister actually do the register job in the registry center of kubernetes -func (r *kubernetesRegistry) DoRegister(root string, node string) error { - return r.client.Create(path.Join(root, node), "") -} - -func (r *kubernetesRegistry) DoUnregister(root string, node string) error { - return perrors.New("DoUnregister is not support in kubernetesRegistry") -} - -// DoSubscribe actually subscribe the provider URL -func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) { - var configListener *configurationListener - - r.listenerLock.Lock() - configListener = r.configListener - r.listenerLock.Unlock() - if r.listener == nil { - r.cltLock.Lock() - client := r.client - r.cltLock.Unlock() - if client == nil { - return nil, perrors.New("kubernetes client broken") - } - - r.listenerLock.Lock() - if r.listener == nil { - // double check - r.listener = kubernetes.NewEventListener(r.client) - } - r.listenerLock.Unlock() - } - - // register the svc to dataListener - r.dataListener.AddInterestedURL(svc) - go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory, svc.Service()), r.dataListener) - - return configListener, nil -} - -// nolint -func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { - return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry") -} - -// InitListeners init listeners of kubernetes registry center -func (r *kubernetesRegistry) InitListeners() { - r.listener = kubernetes.NewEventListener(r.client) - r.configListener = NewConfigurationListener(r) - r.dataListener = NewRegistryDataListener(r.configListener) -} - -func newKubernetesRegistry(url *common.URL) (registry.Registry, error) { - // actually, kubernetes use in-cluster config, - r := &kubernetesRegistry{} - - r.InitBaseRegistry(url, r) - - if err := kubernetes.ValidateClient(r); err != nil { - return nil, perrors.WithStack(err) - } - - go r.HandleClientRestart() - r.InitListeners() - - logger.Debugf("kubernetes registry started") - - return r, nil -} - -func newMockKubernetesRegistry( - url *common.URL, - podsList *v1.PodList, -) (registry.Registry, error) { - - var err error - - r := &kubernetesRegistry{} - - r.InitBaseRegistry(url, r) - r.client, err = kubernetes.NewMockClient(podsList) - if err != nil { - return nil, perrors.WithMessage(err, "new mock client") - } - r.InitListeners() - return r, nil -} - -// HandleClientRestart will reconnect to kubernetes registry center -func (r *kubernetesRegistry) HandleClientRestart() { - r.WaitGroup().Add(1) - defer r.WaitGroup().Done() - var ( - err error - failTimes int - ) -LOOP: - for { - select { - case <-r.Done(): - logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...") - break LOOP - // re-register all services - case <-r.Client().Done(): - r.Client().Close() - r.SetClient(nil) - - // try to connect to kubernetes, - failTimes = 0 - for { - after := gxtime.After(timeSecondDuration(failTimes * ConnDelay)) - select { - case <-r.Done(): - logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...") - break LOOP - case <-after: // avoid connect frequent - } - err = kubernetes.ValidateClient(r) - logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err)) - - if err == nil { - if r.RestartCallBack() { - break - } - } - failTimes++ - if MaxFailTimes <= failTimes { - failTimes = MaxFailTimes - } - } - } - } -} - -func timeSecondDuration(sec int) time.Duration { - return time.Duration(sec) * time.Second -} diff --git a/registry/kubernetes/registry_test.go b/registry/kubernetes/registry_test.go deleted file mode 100644 index 491ff94d0..000000000 --- a/registry/kubernetes/registry_test.go +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "encoding/json" - "os" - "strconv" - "testing" - "time" -) - -import ( - "github.com/stretchr/testify/assert" - - v1 "k8s.io/api/core/v1" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/constant" -) - -var clientPodListJsonData = `{ - "apiVersion": "v1", - "items": [ - { - "apiVersion": "v1", - "kind": "Pod", - "metadata": { - "annotations": { - "dubbo.io/annotation": "W3siayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzL2R1YmJvJTNBJTJGJTJGMTcyLjE3LjAuNiUzQTIwMDAwJTJGVXNlclByb3ZpZGVyJTNGYWNjZXNzbG9nJTNEJTI2YW55aG9zdCUzRHRydWUlMjZhcHAudmVyc2lvbiUzRDAuMC4xJTI2YXBwbGljYXRpb24lM0RCRFRTZXJ2aWNlJTI2YXV0aCUzRCUyNmJlYW4ubmFtZSUzRFVzZXJQcm92aWRlciUyNmNsdXN0ZXIlM0RmYWlsb3ZlciUyNmVudmlyb25tZW50JTNEZGV2JTI2ZXhlY3V0ZS5saW1pdCUzRCUyNmV4ZWN1dGUubGltaXQucmVqZWN0ZWQuaGFuZGxlciUzRCUyNmdyb3VwJTNEJTI2aW50ZXJmYWNlJTNEY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyJTI2aXAlM0QxNzIuMTcuMC42JTI2bG9hZGJhbGFuY2UlM0RyYW5kb20lMjZtZXRob2RzLkdldFVzZXIubG9hZGJhbGFuY2UlM0RyYW5kb20lMjZtZXRob2RzLkdldFVzZXIucmV0cmllcyUzRDElMjZtZXRob2RzLkdldFVzZXIudHBzLmxpbWl0LmludGVydmFsJTNEJTI2bWV0aG9kcy5HZXRVc2VyLnRwcy5saW1pdC5yYXRlJTNEJTI2bWV0aG9kcy5HZXRVc2VyLnRwcy5saW1pdC5zdHJhdGVneSUzRCUyNm1ldGhvZHMuR2V0VXNlci53ZWlnaHQlM0QwJTI2bW9kdWxlJTNEZHViYm9nbyUyQnVzZXItaW5mbyUyQnNlcnZlciUyNm5hbWUlM0RCRFRTZXJ2aWNlJTI2b3JnYW5pemF0aW9uJTNEaWt1cmVudG8uY29tJTI2b3duZXIlM0RaWCUyNnBhcmFtLnNpZ24lM0QlMjZwaWQlM0Q2JTI2cmVnaXN0cnkucm9sZSUzRDMlMjZyZWxlYXNlJTNEZHViYm8tZ29sYW5nLTEuMy4wJTI2cmV0cmllcyUzRCUyNnNlcnZpY2UuZmlsdGVyJTNEZWNobyUyNTJDdG9rZW4lMjUyQ2FjY2Vzc2xvZyUyNTJDdHBzJTI1MkNnZW5lcmljX3NlcnZpY2UlMjUyQ2V4ZWN1dGUlMjUyQ3BzaHV0ZG93biUyNnNpZGUlM0Rwcm92aWRlciUyNnRpbWVzdGFtcCUzRDE1OTExNTYxNTUlMjZ0cHMubGltaXQuaW50ZXJ2YWwlM0QlMjZ0cHMubGltaXQucmF0ZSUzRCUyNnRwcy5saW1pdC5yZWplY3RlZC5oYW5kbGVyJTNEJTI2dHBzLmxpbWl0LnN0cmF0ZWd5JTNEJTI2dHBzLmxpbWl0ZXIlM0QlMjZ2ZXJzaW9uJTNEJTI2d2FybXVwJTNEMTAwIiwidiI6IiJ9XQ==" - }, - "creationTimestamp": "2020-06-03T03:49:14Z", - "generateName": "server-84c864f5bc-", - "labels": { - "dubbo.io/label": "dubbo.io-value", - "pod-template-hash": "84c864f5bc", - "role": "server" - }, - "name": "server-84c864f5bc-r8qvz", - "namespace": "default", - "ownerReferences": [ - { - "apiVersion": "apps/v1", - "blockOwnerDeletion": true, - "controller": true, - "kind": "ReplicaSet", - "name": "server-84c864f5bc", - "uid": "fa376dbb-4f37-4705-8e80-727f592c19b3" - } - ], - "resourceVersion": "517460", - "selfLink": "/api/v1/namespaces/default/pods/server-84c864f5bc-r8qvz", - "uid": "f4fc811c-200c-4445-8d4f-532144957dcc" - }, - "spec": { - "containers": [ - { - "env": [ - { - "name": "DUBBO_NAMESPACE", - "value": "default" - }, - { - "name": "NAMESPACE", - "valueFrom": { - "fieldRef": { - "apiVersion": "v1", - "fieldPath": "metadata.namespace" - } - } - } - ], - "image": "192.168.240.101:5000/scott/go-server", - "imagePullPolicy": "Always", - "name": "server", - "resources": {}, - "terminationMessagePath": "/dev/termination-log", - "terminationMessagePolicy": "File", - "volumeMounts": [ - { - "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount", - "name": "dubbo-sa-token-5qbtb", - "readOnly": true - } - ] - } - ], - "dnsPolicy": "ClusterFirst", - "enableServiceLinks": true, - "nodeName": "minikube", - "priority": 0, - "restartPolicy": "Always", - "schedulerName": "default-scheduler", - "securityContext": {}, - "serviceAccount": "dubbo-sa", - "serviceAccountName": "dubbo-sa", - "terminationGracePeriodSeconds": 30, - "tolerations": [ - { - "effect": "NoExecute", - "key": "node.kubernetes.io/not-ready", - "operator": "Exists", - "tolerationSeconds": 300 - }, - { - "effect": "NoExecute", - "key": "node.kubernetes.io/unreachable", - "operator": "Exists", - "tolerationSeconds": 300 - } - ], - "volumes": [ - { - "name": "dubbo-sa-token-5qbtb", - "secret": { - "defaultMode": 420, - "secretName": "dubbo-sa-token-5qbtb" - } - } - ] - }, - "status": { - "conditions": [ - { - "lastProbeTime": null, - "lastTransitionTime": "2020-06-03T03:49:14Z", - "status": "True", - "type": "Initialized" - }, - { - "lastProbeTime": null, - "lastTransitionTime": "2020-06-03T03:49:15Z", - "status": "True", - "type": "Ready" - }, - { - "lastProbeTime": null, - "lastTransitionTime": "2020-06-03T03:49:15Z", - "status": "True", - "type": "ContainersReady" - }, - { - "lastProbeTime": null, - "lastTransitionTime": "2020-06-03T03:49:14Z", - "status": "True", - "type": "PodScheduled" - } - ], - "containerStatuses": [ - { - "containerID": "docker://b6421e05ce44f8a1c4fa6b72274980777c7c0f945516209f7c0558cd0cd65406", - "image": "192.168.240.101:5000/scott/go-server:latest", - "imageID": "docker-pullable://192.168.240.101:5000/scott/go-server@sha256:4eecf895054f0ff93d80db64992a561d10504e55582def6dcb6093a6d6d92461", - "lastState": {}, - "name": "server", - "ready": true, - "restartCount": 0, - "started": true, - "state": { - "running": { - "startedAt": "2020-06-03T03:49:15Z" - } - } - } - ], - "hostIP": "10.0.2.15", - "phase": "Running", - "podIP": "172.17.0.6", - "podIPs": [ - { - "ip": "172.17.0.6" - } - ], - "qosClass": "BestEffort", - "startTime": "2020-06-03T03:49:14Z" - } - } - ], - "kind": "List", - "metadata": { - "resourceVersion": "", - "selfLink": "" - } -} -` - -func getTestRegistry(t *testing.T) *kubernetesRegistry { - const ( - podNameKey = "HOSTNAME" - nameSpaceKey = "NAMESPACE" - needWatchedNameSpaceKey = "DUBBO_NAMESPACE" - ) - pl := &v1.PodList{} - // 1. install test data - if err := json.Unmarshal([]byte(clientPodListJsonData), &pl); err != nil { - t.Fatal(err) - } - currentPod := pl.Items[0] - - env := map[string]string{ - nameSpaceKey: currentPod.GetNamespace(), - podNameKey: currentPod.GetName(), - needWatchedNameSpaceKey: "default", - } - - for k, v := range env { - if err := os.Setenv(k, v); err != nil { - t.Fatal(err) - } - } - - regurl, err := common.NewURL("registry://127.0.0.1:443", common.WithParamsValue(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))) - if err != nil { - t.Fatal(err) - } - out, err := newMockKubernetesRegistry(regurl, pl) - if err != nil { - t.Fatal(err) - } - - return out.(*kubernetesRegistry) -} - -func TestRegister(t *testing.T) { - r := getTestRegistry(t) - defer r.Destroy() - - url, _ := common.NewURL( - "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", - common.WithParamsValue(constant.ClusterKey, "mock"), - common.WithMethods([]string{"GetUser", "AddUser"}), - ) - - err := r.Register(url) - assert.NoError(t, err) - _, _, err = r.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") - if err != nil { - t.Fatal(err) - } -} - -// -//func TestSubscribe(t *testing.T) { -// r := getTestRegistry(t) -// defer r.Destroy() -// -// url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.ClusterKey, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) -// if err != nil { -// t.Fatal(err) -// } -// -// listener, err := r.DoSubscribe(url) -// if err != nil { -// t.Fatal(err) -// } -// -// wg := sync.WaitGroup{} -// wg.Add(1) -// go func() { -// defer wg.Done() -// registerErr := r.Register(url) -// if registerErr != nil { -// t.Error(registerErr) -// } -// }() -// -// wg.Wait() -// -// serviceEvent, err := listener.Next() -// if err != nil { -// t.Fatal(err) -// } -// t.Logf("get service event %s", serviceEvent) -//} - -func TestConsumerDestroy(t *testing.T) { - r := getTestRegistry(t) - - url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", - common.WithParamsValue(constant.ClusterKey, "mock"), - common.WithMethods([]string{"GetUser", "AddUser"})) - - _, err := r.DoSubscribe(url) - if err != nil { - t.Fatal(err) - } - - // listener.Close() - time.Sleep(1e9) - r.Destroy() - - assert.Equal(t, false, r.IsAvailable()) -} - -func TestProviderDestroy(t *testing.T) { - r := getTestRegistry(t) - - url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", - common.WithParamsValue(constant.ClusterKey, "mock"), - common.WithMethods([]string{"GetUser", "AddUser"})) - err := r.Register(url) - assert.NoError(t, err) - - time.Sleep(1e9) - r.Destroy() - assert.Equal(t, false, r.IsAvailable()) -} - -func TestNewRegistry(t *testing.T) { - regUrl, err := common.NewURL("registry://127.0.0.1:443", - common.WithParamsValue(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))) - if err != nil { - t.Fatal(err) - } - _, err = newKubernetesRegistry(regUrl) - if err == nil { - t.Fatal("not in cluster, should be a err") - } -} - -func TestHandleClientRestart(t *testing.T) { - r := getTestRegistry(t) - r.WaitGroup().Add(1) - go r.HandleClientRestart() - time.Sleep(timeSecondDuration(1)) - r.client.Close() -} diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go deleted file mode 100644 index fe161bf72..000000000 --- a/remoting/kubernetes/client.go +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "context" - "strconv" - "sync" -) - -import ( - perrors "github.com/pkg/errors" - - v1 "k8s.io/api/core/v1" - - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/constant" - "dubbo.apache.org/dubbo-go/v3/common/logger" -) - -type Client struct { - lock sync.RWMutex - - // manage the client lifecycle - ctx context.Context - cancel context.CancelFunc - - controller *dubboRegistryController -} - -// NewClient returns Client instance for registry -func NewClient(url *common.URL) (*Client, error) { - // read type - r, err := strconv.Atoi(url.GetParams().Get(constant.RegistryRoleKey)) - if err != nil { - return nil, perrors.WithMessage(err, "atoi role") - } - ctx, cancel := context.WithCancel(context.Background()) - - controller, err := newDubboRegistryController(ctx, common.RoleType(r), GetInClusterKubernetesClient) - if err != nil { - cancel() - return nil, perrors.WithMessage(err, "new dubbo-registry controller") - } - - c := &Client{ - ctx: ctx, - cancel: cancel, - controller: controller, - } - - if r == common.CONSUMER { - // only consumer have to start informer factory - c.controller.startALLInformers() - } - return c, nil -} - -func (c *Client) SetLabel(k, v string) error { - c.lock.Lock() - defer c.lock.Unlock() - - if err := c.controller.assembleLabel(k, v); err != nil { - return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v) - } - - logger.Debugf("put the @key = %s @value = %s success", k, v) - return nil -} - -// Create creates k/v pair in watcher-set -func (c *Client) Create(k, v string) error { - // the read current pod must be lock, protect every - // create operation can be atomic - c.lock.Lock() - defer c.lock.Unlock() - - if err := c.controller.addAnnotationForCurrentPod(k, v); err != nil { - return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v) - } - - logger.Debugf("put the @key = %s @value = %s success", k, v) - return nil -} - -// GetChildren gets k children list from kubernetes-watcherSet -func (c *Client) GetChildren(k string) ([]string, []string, error) { - objectList, err := c.controller.watcherSet.Get(k, true) - if err != nil { - return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k) - } - - var kList []string - var vList []string - - for _, o := range objectList { - kList = append(kList, o.Key) - vList = append(vList, o.Value) - } - - return kList, vList, nil -} - -// Watch watches on spec key -func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) { - w, err := c.controller.watcherSet.Watch(k, false) - if err != nil { - return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k) - } - - return w.ResultChan(), w.done(), nil -} - -// WatchWithPrefix watches on spec prefix -func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) { - w, err := c.controller.watcherSet.Watch(prefix, true) - if err != nil { - return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix) - } - - return w.ResultChan(), w.done(), nil -} - -// if returns false, the client is die -func (c *Client) Valid() bool { - select { - case <-c.Done(): - return false - default: - } - c.lock.RLock() - defer c.lock.RUnlock() - return c.controller != nil -} - -// nolint -func (c *Client) Done() <-chan struct{} { - return c.ctx.Done() -} - -// nolint -func (c *Client) Close() { - select { - case <-c.ctx.Done(): - // already stopped - return - default: - } - c.cancel() - - // the client ctx be canceled - // will trigger the watcherSet watchers all stopped - // so, just wait -} - -// ValidateClient validates the kubernetes client -func ValidateClient(container clientFacade) error { - client := container.Client() - - // new Client - if client == nil || client.Valid() { - - newClient, err := NewClient(container.GetURL()) - if err != nil { - logger.Warnf("new kubernetes client: %v)", err) - return perrors.WithMessage(err, "new kubernetes client") - } - container.SetClient(newClient) - } - - return nil -} - -// NewMockClient exports for registry package test -func NewMockClient(podList *v1.PodList) (*Client, error) { - ctx, cancel := context.WithCancel(context.Background()) - controller, err := newDubboRegistryController(ctx, common.CONSUMER, func() (kubernetes.Interface, error) { - return fake.NewSimpleClientset(podList), nil - }) - if err != nil { - cancel() - return nil, perrors.WithMessage(err, "new dubbo-registry controller") - } - - c := &Client{ - ctx: ctx, - cancel: cancel, - controller: controller, - } - - c.controller.startALLInformers() - return c, nil -} diff --git a/remoting/kubernetes/client_test.go b/remoting/kubernetes/client_test.go deleted file mode 100644 index be7256167..000000000 --- a/remoting/kubernetes/client_test.go +++ /dev/null @@ -1,455 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "encoding/json" - "fmt" - _ "net/http/pprof" - "os" - "strings" - "sync" - "testing" -) - -import ( - v1 "k8s.io/api/core/v1" -) - -// tests dataset -var tests = []struct { - input struct { - k string - v string - } -}{ - {input: struct { - k string - v string - }{k: "name", v: "scott.wang"}}, - {input: struct { - k string - v string - }{k: "namePrefix", v: "prefix.scott.wang"}}, - {input: struct { - k string - v string - }{k: "namePrefix1", v: "prefix1.scott.wang"}}, - {input: struct { - k string - v string - }{k: "age", v: "27"}}, -} - -// test dataset prefix -const prefix = "name" - -var watcherStopLog = "the watcherSet watcher was stopped" - -var clientPodListJsonData = `{ - "apiVersion": "v1", - "items": [ - { - "apiVersion": "v1", - "kind": "Pod", - "metadata": { - "annotations": { - "dubbo.io/annotation": "W3siayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzL2R1YmJvJTNBJTJGJTJGMTcyLjE3LjAuNiUzQTIwMDAwJTJGVXNlclByb3ZpZGVyJTNGYWNjZXNzbG9nJTNEJTI2YW55aG9zdCUzRHRydWUlMjZhcHAudmVyc2lvbiUzRDAuMC4xJTI2YXBwbGljYXRpb24lM0RCRFRTZXJ2aWNlJTI2YXV0aCUzRCUyNmJlYW4ubmFtZSUzRFVzZXJQcm92aWRlciUyNmNsdXN0ZXIlM0RmYWlsb3ZlciUyNmVudmlyb25tZW50JTNEZGV2JTI2ZXhlY3V0ZS5saW1pdCUzRCUyNmV4ZWN1dGUubGltaXQucmVqZWN0ZWQuaGFuZGxlciUzRCUyNmdyb3VwJTNEJTI2aW50ZXJmYWNlJTNEY29tLmlrdXJlbnRvLnVzZXIuVXNlclByb3ZpZGVyJTI2aXAlM0QxNzIuMTcuMC42JTI2bG9hZGJhbGFuY2UlM0RyYW5kb20lMjZtZXRob2RzLkdldFVzZXIubG9hZGJhbGFuY2UlM0RyYW5kb20lMjZtZXRob2RzLkdldFVzZXIucmV0cmllcyUzRDElMjZtZXRob2RzLkdldFVzZXIudHBzLmxpbWl0LmludGVydmFsJTNEJTI2bWV0aG9kcy5HZXRVc2VyLnRwcy5saW1pdC5yYXRlJTNEJTI2bWV0aG9kcy5HZXRVc2VyLnRwcy5saW1pdC5zdHJhdGVneSUzRCUyNm1ldGhvZHMuR2V0VXNlci53ZWlnaHQlM0QwJTI2bW9kdWxlJTNEZHViYm9nbyUyQnVzZXItaW5mbyUyQnNlcnZlciUyNm5hbWUlM0RCRFRTZXJ2aWNlJTI2b3JnYW5pemF0aW9uJTNEaWt1cmVudG8uY29tJTI2b3duZXIlM0RaWCUyNnBhcmFtLnNpZ24lM0QlMjZwaWQlM0Q2JTI2cmVnaXN0cnkucm9sZSUzRDMlMjZyZWxlYXNlJTNEZHViYm8tZ29sYW5nLTEuMy4wJTI2cmV0cmllcyUzRCUyNnNlcnZpY2UuZmlsdGVyJTNEZWNobyUyNTJDdG9rZW4lMjUyQ2FjY2Vzc2xvZyUyNTJDdHBzJTI1MkNnZW5lcmljX3NlcnZpY2UlMjUyQ2V4ZWN1dGUlMjUyQ3BzaHV0ZG93biUyNnNpZGUlM0Rwcm92aWRlciUyNnRpbWVzdGFtcCUzRDE1OTExNTYxNTUlMjZ0cHMubGltaXQuaW50ZXJ2YWwlM0QlMjZ0cHMubGltaXQucmF0ZSUzRCUyNnRwcy5saW1pdC5yZWplY3RlZC5oYW5kbGVyJTNEJTI2dHBzLmxpbWl0LnN0cmF0ZWd5JTNEJTI2dHBzLmxpbWl0ZXIlM0QlMjZ2ZXJzaW9uJTNEJTI2d2FybXVwJTNEMTAwIiwidiI6IiJ9XQ==" - }, - "creationTimestamp": "2020-06-03T03:49:14Z", - "generateName": "server-84c864f5bc-", - "labels": { - "dubbo.io/label": "dubbo.io-value", - "pod-template-hash": "84c864f5bc", - "role": "server" - }, - "name": "server-84c864f5bc-r8qvz", - "namespace": "default", - "ownerReferences": [ - { - "apiVersion": "apps/v1", - "blockOwnerDeletion": true, - "controller": true, - "kind": "ReplicaSet", - "name": "server-84c864f5bc", - "uid": "fa376dbb-4f37-4705-8e80-727f592c19b3" - } - ], - "resourceVersion": "517460", - "selfLink": "/api/v1/namespaces/default/pods/server-84c864f5bc-r8qvz", - "uid": "f4fc811c-200c-4445-8d4f-532144957dcc" - }, - "spec": { - "containers": [ - { - "env": [ - { - "name": "DUBBO_NAMESPACE", - "value": "default" - }, - { - "name": "NAMESPACE", - "valueFrom": { - "fieldRef": { - "apiVersion": "v1", - "fieldPath": "metadata.namespace" - } - } - } - ], - "image": "192.168.240.101:5000/scott/go-server", - "imagePullPolicy": "Always", - "name": "server", - "resources": {}, - "terminationMessagePath": "/dev/termination-log", - "terminationMessagePolicy": "File", - "volumeMounts": [ - { - "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount", - "name": "dubbo-sa-token-5qbtb", - "readOnly": true - } - ] - } - ], - "dnsPolicy": "ClusterFirst", - "enableServiceLinks": true, - "nodeName": "minikube", - "priority": 0, - "restartPolicy": "Always", - "schedulerName": "default-scheduler", - "securityContext": {}, - "serviceAccount": "dubbo-sa", - "serviceAccountName": "dubbo-sa", - "terminationGracePeriodSeconds": 30, - "tolerations": [ - { - "effect": "NoExecute", - "key": "node.kubernetes.io/not-ready", - "operator": "Exists", - "tolerationSeconds": 300 - }, - { - "effect": "NoExecute", - "key": "node.kubernetes.io/unreachable", - "operator": "Exists", - "tolerationSeconds": 300 - } - ], - "volumes": [ - { - "name": "dubbo-sa-token-5qbtb", - "secret": { - "defaultMode": 420, - "secretName": "dubbo-sa-token-5qbtb" - } - } - ] - }, - "status": { - "conditions": [ - { - "lastProbeTime": null, - "lastTransitionTime": "2020-06-03T03:49:14Z", - "status": "True", - "type": "Initialized" - }, - { - "lastProbeTime": null, - "lastTransitionTime": "2020-06-03T03:49:15Z", - "status": "True", - "type": "Ready" - }, - { - "lastProbeTime": null, - "lastTransitionTime": "2020-06-03T03:49:15Z", - "status": "True", - "type": "ContainersReady" - }, - { - "lastProbeTime": null, - "lastTransitionTime": "2020-06-03T03:49:14Z", - "status": "True", - "type": "PodScheduled" - } - ], - "containerStatuses": [ - { - "containerID": "docker://b6421e05ce44f8a1c4fa6b72274980777c7c0f945516209f7c0558cd0cd65406", - "image": "192.168.240.101:5000/scott/go-server:latest", - "imageID": "docker-pullable://192.168.240.101:5000/scott/go-server@sha256:4eecf895054f0ff93d80db64992a561d10504e55582def6dcb6093a6d6d92461", - "lastState": {}, - "name": "server", - "ready": true, - "restartCount": 0, - "started": true, - "state": { - "running": { - "startedAt": "2020-06-03T03:49:15Z" - } - } - } - ], - "hostIP": "10.0.2.15", - "phase": "Running", - "podIP": "172.17.0.6", - "podIPs": [ - { - "ip": "172.17.0.6" - } - ], - "qosClass": "BestEffort", - "startTime": "2020-06-03T03:49:14Z" - } - } - ], - "kind": "List", - "metadata": { - "resourceVersion": "", - "selfLink": "" - } -} -` - -func getTestClient(t *testing.T) *Client { - pl := &v1.PodList{} - // 1. install test data - if err := json.Unmarshal([]byte(clientPodListJsonData), &pl); err != nil { - t.Fatal(err) - } - currentPod := pl.Items[0] - - env := map[string]string{ - nameSpaceKey: currentPod.GetNamespace(), - podNameKey: currentPod.GetName(), - needWatchedNameSpaceKey: "default", - } - - for k, v := range env { - if err := os.Setenv(k, v); err != nil { - t.Fatal(err) - } - } - - client, err := NewMockClient(pl) - if err != nil { - t.Fatal(err) - } - - return client -} - -func TestClientValid(t *testing.T) { - client := getTestClient(t) - defer client.Close() - - if !client.Valid() { - t.Fatal("client is not valid") - } - - client.Close() - if client.Valid() { - t.Fatal("client is valid") - } -} - -func TestClientDone(t *testing.T) { - client := getTestClient(t) - - go func() { - client.Close() - }() - - <-client.Done() - - if client.Valid() { - t.Fatal("client should be invalid") - } -} - -func TestClientCreateKV(t *testing.T) { - client := getTestClient(t) - defer client.Close() - - for _, tc := range tests { - - k := tc.input.k - v := tc.input.v - - if err := client.Create(k, v); err != nil { - t.Fatal(err) - } - - } -} - -func TestClientGetChildrenKVList(t *testing.T) { - client := getTestClient(t) - defer client.Close() - - wg := sync.WaitGroup{} - wg.Add(1) - - syncDataComplete := make(chan struct{}) - - go func() { - wc, done, err := client.WatchWithPrefix(prefix) - if err != nil { - t.Error(err) - return - } - - wg.Done() - i := 0 - - for { - select { - case e := <-wc: - i++ - fmt.Printf("got event %v k %s v %s\n", e.EventType, e.Key, e.Value) - if i == 3 { - // already sync all event - syncDataComplete <- struct{}{} - return - } - case <-done: - t.Log(watcherStopLog) - return - } - } - }() - - // wait the watch goroutine start - wg.Wait() - - expect := make(map[string]string) - got := make(map[string]string) - - for _, tc := range tests { - - k := tc.input.k - v := tc.input.v - - if strings.Contains(k, prefix) { - expect[k] = v - } - - if err := client.Create(k, v); err != nil { - t.Fatal(err) - } - } - - <-syncDataComplete - - // start get all children - kList, vList, err := client.GetChildren(prefix) - if err != nil { - t.Error(err) - } - - for i := 0; i < len(kList); i++ { - got[kList[i]] = vList[i] - } - - for expectK, expectV := range expect { - if got[expectK] != expectV { - t.Fatalf("expect {%s: %s} but got {%s: %v}", expectK, expectV, expectK, got[expectK]) - } - } -} - -func TestClientWatchPrefix(t *testing.T) { - client := getTestClient(t) - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - wc, done, err := client.WatchWithPrefix(prefix) - if err != nil { - t.Error(err) - } - - wg.Done() - - for { - select { - case e := <-wc: - t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) - case <-done: - t.Log(watcherStopLog) - return - } - } - }() - - // must wait the watch goroutine work - wg.Wait() - - for _, tc := range tests { - - k := tc.input.k - v := tc.input.v - - if err := client.Create(k, v); err != nil { - t.Fatal(err) - } - } - - client.Close() -} - -func TestClientWatch(t *testing.T) { - client := getTestClient(t) - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - wc, done, err := client.Watch(prefix) - if err != nil { - t.Error(err) - } - wg.Done() - - for { - select { - case e := <-wc: - t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value) - case <-done: - t.Log(watcherStopLog) - return - } - } - }() - - // must wait the watch goroutine already start the watch goroutine - wg.Wait() - - for _, tc := range tests { - - k := tc.input.k - v := tc.input.v - - if err := client.Create(k, v); err != nil { - t.Fatal(err) - } - } - - client.Close() -} diff --git a/remoting/kubernetes/facade.go b/remoting/kubernetes/facade.go deleted file mode 100644 index 63115b02e..000000000 --- a/remoting/kubernetes/facade.go +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "dubbo.apache.org/dubbo-go/v3/common" -) - -type clientFacade interface { - Client() *Client - SetClient(*Client) - common.Node -} diff --git a/remoting/kubernetes/facade_test.go b/remoting/kubernetes/facade_test.go deleted file mode 100644 index 72ddb067d..000000000 --- a/remoting/kubernetes/facade_test.go +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "strconv" - "testing" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/constant" -) - -type mockFacade struct { - *common.URL - client *Client - // cltLock sync.Mutex - // done chan struct{} -} - -func (r *mockFacade) Client() *Client { - return r.client -} - -func (r *mockFacade) SetClient(client *Client) { - r.client = client -} - -func (r *mockFacade) GetURL() *common.URL { - return r.URL -} - -func (r *mockFacade) Destroy() { - // TODO implementation me -} - -func (r *mockFacade) RestartCallBack() bool { - return true -} - -func (r *mockFacade) IsAvailable() bool { - return true -} - -func Test_Facade(t *testing.T) { - regUrl, err := common.NewURL("registry://127.0.0.1:443", - common.WithParamsValue(constant.RegistryRoleKey, strconv.Itoa(common.CONSUMER))) - if err != nil { - t.Fatal(err) - } - - mockClient := getTestClient(t) - m := &mockFacade{ - URL: regUrl, - client: mockClient, - } - - if err := ValidateClient(m); err == nil { - t.Fatal("out of cluster should err") - } - mockClient.Close() -} diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go deleted file mode 100644 index 9a8e0dcb6..000000000 --- a/remoting/kubernetes/listener.go +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "sync" -) - -import ( - perrors "github.com/pkg/errors" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common/logger" - "dubbo.apache.org/dubbo-go/v3/remoting" -) - -type EventListener struct { - client *Client - keyMapLock sync.RWMutex - keyMap map[string]struct{} - wg sync.WaitGroup -} - -func NewEventListener(client *Client) *EventListener { - return &EventListener{ - client: client, - keyMap: make(map[string]struct{}, 8), - } -} - -// Listen on a spec key -// this method returns true when spec key deleted, -// this method returns false when deep layer connection lose -func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool { - defer l.wg.Done() - for { - wc, done, err := l.client.Watch(key) - if err != nil { - logger.Warnf("watch exist{key:%s} = error{%v}", key, err) - return false - } - - select { - - // client stopped - case <-l.client.Done(): - logger.Warnf("kubernetes client stopped") - return false - - // watcherSet watcher stopped - case <-done: - logger.Warnf("kubernetes watcherSet watcher stopped") - return false - - // handle kubernetes-watcherSet events - case e, ok := <-wc: - if !ok { - logger.Warnf("kubernetes-watcherSet watch-chan closed") - return false - } - - if l.handleEvents(e, listener...) { - // if event is delete - return true - } - } - } -} - -// return true means the event type is DELETE -// return false means the event type is CREATE || UPDATE -func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool { - logger.Infof("got a kubernetes-watcherSet event {type: %d, key: %s}", event.EventType, event.Key) - - switch event.EventType { - case Create: - for _, listener := range listeners { - logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataCreated}", event.Key) - listener.DataChange(remoting.Event{ - Path: string(event.Key), - Action: remoting.EventTypeAdd, - Content: string(event.Value), - }) - } - return false - case Update: - for _, listener := range listeners { - logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataChanged}", event.Key) - listener.DataChange(remoting.Event{ - Path: string(event.Key), - Action: remoting.EventTypeUpdate, - Content: string(event.Value), - }) - } - return false - case Delete: - logger.Warnf("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDeleted}", event.Key) - return true - default: - return false - } -} - -// Listen on a set of key with spec prefix -func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) { - defer l.wg.Done() - for { - wc, done, err := l.client.WatchWithPrefix(prefix) - if err != nil { - logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err) - } - - select { - // client stopped - case <-l.client.Done(): - logger.Warnf("kubernetes client stopped") - return - - // watcher stopped - case <-done: - logger.Warnf("kubernetes watcherSet watcher stopped") - return - - // kuberentes-watcherSet event stream - case e, ok := <-wc: - - if !ok { - logger.Warnf("kubernetes-watcherSet watch-chan closed") - return - } - - l.handleEvents(e, listener...) - } - } -} - -// this func is invoked by kubernetes ConsumerRegistry::Registry/ kubernetes ConsumerRegistry::get/kubernetes ConsumerRegistry::getListener -// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent -// | -// --------> ListenServiceNodeEvent -func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) { - l.keyMapLock.RLock() - _, ok := l.keyMap[key] - l.keyMapLock.RUnlock() - if ok { - logger.Warnf("kubernetes-watcherSet key %s has already been listened.", key) - return - } - - l.keyMapLock.Lock() - // double check - if _, ok := l.keyMap[key]; ok { - // another goroutine already set it - l.keyMapLock.Unlock() - return - } - l.keyMap[key] = struct{}{} - l.keyMapLock.Unlock() - - keyList, valueList, err := l.client.GetChildren(key) - if err != nil { - logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children")) - } - - logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList) - - for i, k := range keyList { - logger.Infof("got children list key -> %s", k) - listener.DataChange(remoting.Event{ - Path: k, - Action: remoting.EventTypeAdd, - Content: valueList[i], - }) - } - - logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-watcherSet", key) - - l.wg.Add(1) - go func(key string, listener remoting.DataListener) { - l.ListenServiceNodeEventWithPrefix(key, listener) - logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key) - }(key, listener) - - logger.Infof("listen dubbo service key{%s}", key) - l.wg.Add(1) - go func(key string) { - if l.ListenServiceNodeEvent(key) { - listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel}) - } - logger.Warnf("listenSelf(kubernetes key{%s}) goroutine exit now", key) - }(key) -} - -func (l *EventListener) Close() { - l.wg.Wait() -} diff --git a/remoting/kubernetes/listener_test.go b/remoting/kubernetes/listener_test.go deleted file mode 100644 index ffe58cbb9..000000000 --- a/remoting/kubernetes/listener_test.go +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "testing" - "time" -) - -import ( - "github.com/stretchr/testify/assert" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/remoting" -) - -var changedData = ` - dubbo.consumer.request_timeout=3s - dubbo.consumer.connect_timeout=5s - dubbo.application.organization=ikurento.com - dubbo.application.name=BDTService - dubbo.application.module=dubbogo user-info server - dubbo.application.version=0.0.1 - dubbo.application.owner=ZX - dubbo.application.environment=dev - dubbo.registries.hangzhouzk.protocol=zookeeper - dubbo.registries.hangzhouzk.timeout=3s - dubbo.registries.hangzhouzk.address=127.0.0.1:2181 - dubbo.registries.shanghaizk.protocol=zookeeper - dubbo.registries.shanghaizk.timeout=3s - dubbo.registries.shanghaizk.address=127.0.0.1:2182 - dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo - dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider - dubbo.service.com.ikurento.user.UserProvider.loadbalance=random - dubbo.service.com.ikurento.user.UserProvider.warmup=100 - dubbo.service.com.ikurento.user.UserProvider.cluster=failover -` - -type mockDataListener struct { - eventList []remoting.Event - client *Client - changedData string - - rc chan remoting.Event -} - -func (m *mockDataListener) DataChange(eventType remoting.Event) bool { - m.eventList = append(m.eventList, eventType) - if eventType.Content == m.changedData { - m.rc <- eventType - } - return true -} - -func TestListener(t *testing.T) { - tests := []struct { - input struct { - k string - v string - } - }{ - {input: struct { - k string - v string - }{k: "/dubbo", v: changedData}}, - } - - c := getTestClient(t) - defer c.Close() - - listener := NewEventListener(c) - dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)} - listener.ListenServiceEvent("/dubbo", dataListener) - time.Sleep(1e9) - - for _, tc := range tests { - - k := tc.input.k - v := tc.input.v - if err := c.Create(k, v); err != nil { - t.Fatal(err) - } - - } - msg := <-dataListener.rc - assert.Equal(t, changedData, msg.Content) -} diff --git a/remoting/kubernetes/registry_controller.go b/remoting/kubernetes/registry_controller.go deleted file mode 100644 index f6a804aa1..000000000 --- a/remoting/kubernetes/registry_controller.go +++ /dev/null @@ -1,633 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "context" - "encoding/base64" - "encoding/json" - "fmt" - "io/ioutil" - "os" - "strconv" - "strings" - "sync" - "time" -) - -import ( - perrors "github.com/pkg/errors" - - v1 "k8s.io/api/core/v1" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/watch" - - "k8s.io/client-go/informers" - informerscorev1 "k8s.io/client-go/informers/core/v1" - - "k8s.io/client-go/kubernetes" - - "k8s.io/client-go/rest" - - "k8s.io/client-go/tools/cache" - - "k8s.io/client-go/util/workqueue" -) - -import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/logger" -) - -const ( - // kubernetes inject env var - podNameKey = "HOSTNAME" - nameSpaceKey = "NAMESPACE" - nameSpaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" - needWatchedNameSpaceKey = "DUBBO_NAMESPACE" - - // all pod annotation key - DubboIOAnnotationKey = "dubbo.io/annotation" - // all pod label key and value pair - DubboIOLabelKey = "dubbo.io/label" - DubboIOConsumerLabelValue = "dubbo.io.consumer" - DubboIOProviderLabelValue = "dubbo.io.provider" - - // kubernetes suggest resync - defaultResync = 5 * time.Minute -) - -var ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist") - -// dubboRegistryController works like a kubernetes controller -type dubboRegistryController struct { - - // clone from client - // manage lifecycle - ctx context.Context - - role common.RoleType - - // protect patch current pod operation - lock sync.Mutex - - // current pod config - needWatchedNamespace map[string]struct{} - namespace string - name string - - watcherSet WatcherSet - - // kubernetes - kc kubernetes.Interface - listAndWatchStartResourceVersion uint64 - namespacedInformerFactory map[string]informers.SharedInformerFactory - namespacedPodInformers map[string]informerscorev1.PodInformer - queue workqueue.Interface // shared by namespaced informers -} - -func newDubboRegistryController( - ctx context.Context, - // different provider and consumer have behavior - roleType common.RoleType, - // used to inject mock kubernetes client - kcGetter func() (kubernetes.Interface, error), -) (*dubboRegistryController, error) { - - kc, err := kcGetter() - if err != nil { - return nil, perrors.WithMessage(err, "get kubernetes client") - } - - c := &dubboRegistryController{ - ctx: ctx, - role: roleType, - watcherSet: newWatcherSet(ctx), - needWatchedNamespace: make(map[string]struct{}), - namespacedInformerFactory: make(map[string]informers.SharedInformerFactory), - namespacedPodInformers: make(map[string]informerscorev1.PodInformer), - kc: kc, - } - - if err := c.readConfig(); err != nil { - return nil, perrors.WithMessage(err, "read config") - } - - if err := c.initCurrentPod(); err != nil { - return nil, perrors.WithMessage(err, "init current pod") - } - - if err := c.initWatchSet(); err != nil { - return nil, perrors.WithMessage(err, "init watch set") - } - - if err := c.initPodInformer(); err != nil { - return nil, perrors.WithMessage(err, "init pod informer") - } - - go c.run() - - return c, nil -} - -// GetInClusterKubernetesClient -// current pod running in kubernetes-cluster -func GetInClusterKubernetesClient() (kubernetes.Interface, error) { - // read in-cluster config - cfg, err := rest.InClusterConfig() - if err != nil { - return nil, perrors.WithMessage(err, "get in-cluster config") - } - - return kubernetes.NewForConfig(cfg) -} - -// initWatchSet -// 1. get all with dubbo label pods -// 2. put every element to watcherSet -// 3. refresh watch book-mark -func (c *dubboRegistryController) initWatchSet() error { - req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue}) - if err != nil { - return perrors.WithMessage(err, "new requirement") - } - - for ns := range c.needWatchedNamespace { - pods, err := c.kc.CoreV1().Pods(ns).List(metav1.ListOptions{ - LabelSelector: req.String(), - }) - if err != nil { - return perrors.WithMessagef(err, "list pods in namespace (%s)", ns) - } - for _, p := range pods.Items { - // set resource version - rv, err := strconv.ParseUint(p.GetResourceVersion(), 10, 0) - if err != nil { - return perrors.WithMessagef(err, "parse resource version %s", p.GetResourceVersion()) - } - if c.listAndWatchStartResourceVersion < rv { - c.listAndWatchStartResourceVersion = rv - } - c.handleWatchedPodEvent(&p, watch.Added) - } - } - return nil -} - -// read dubbo-registry controller config -// 1. current pod name -// 2. current pod working namespace -func (c *dubboRegistryController) readConfig() error { - // read current pod name && namespace - c.name = os.Getenv(podNameKey) - if len(c.name) == 0 { - return perrors.Errorf("read pod name from env %s failed", podNameKey) - } - namespace, err := ioutil.ReadFile(nameSpaceFilePath) - if err == nil && len(namespace) != 0 { - c.namespace = string(namespace) - return nil - } - c.namespace = os.Getenv(nameSpaceKey) - if len(c.namespace) != 0 { - return nil - } - return perrors.Errorf("get empty namesapce, please check if namespace file at %s exist, or environment %s"+ - " is set", nameSpaceFilePath, nameSpaceKey) - -} - -func (c *dubboRegistryController) initNamespacedPodInformer(ns string) error { - req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue}) - if err != nil { - return perrors.WithMessage(err, "new requirement") - } - - informersFactory := informers.NewSharedInformerFactoryWithOptions( - c.kc, - defaultResync, - informers.WithNamespace(ns), - informers.WithTweakListOptions(func(options *metav1.ListOptions) { - options.LabelSelector = req.String() - options.ResourceVersion = strconv.FormatUint(c.listAndWatchStartResourceVersion, 10) - }), - ) - podInformer := informersFactory.Core().V1().Pods() - - podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.addPod, - UpdateFunc: c.updatePod, - DeleteFunc: c.deletePod, - }) - - c.namespacedInformerFactory[ns] = informersFactory - c.namespacedPodInformers[ns] = podInformer - - return nil -} - -func (c *dubboRegistryController) initPodInformer() error { - if c.role == common.PROVIDER { - return nil - } - - // read need watched namespaces list - needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey) - if len(needWatchedNameSpaceList) == 0 { - return perrors.New("read value from env by key (DUBBO_NAMESPACE)") - } - for _, ns := range strings.Split(needWatchedNameSpaceList, ",") { - c.needWatchedNamespace[ns] = struct{}{} - } - // current work namespace should be watched - c.needWatchedNamespace[c.namespace] = struct{}{} - - c.queue = workqueue.New() - - // init all watch needed pod-informer - for watchedNS := range c.needWatchedNamespace { - if err := c.initNamespacedPodInformer(watchedNS); err != nil { - return err - } - } - return nil -} - -type kubernetesEvent struct { - p *v1.Pod - t watch.EventType -} - -func (c *dubboRegistryController) addPod(obj interface{}) { - p, ok := obj.(*v1.Pod) - if !ok { - logger.Warnf("pod-informer got object %T not *v1.Pod", obj) - return - } - c.queue.Add(&kubernetesEvent{ - t: watch.Added, - p: p, - }) -} - -func (c *dubboRegistryController) updatePod(oldObj, newObj interface{}) { - op, ok := oldObj.(*v1.Pod) - if !ok { - logger.Warnf("pod-informer got object %T not *v1.Pod", oldObj) - return - } - np, ok := newObj.(*v1.Pod) - if !ok { - logger.Warnf("pod-informer got object %T not *v1.Pod", newObj) - return - } - if op.GetResourceVersion() == np.GetResourceVersion() { - return - } - c.queue.Add(&kubernetesEvent{ - p: np, - t: watch.Modified, - }) -} - -func (c *dubboRegistryController) deletePod(obj interface{}) { - p, ok := obj.(*v1.Pod) - if !ok { - logger.Warnf("pod-informer got object %T not *v1.Pod", obj) - return - } - c.queue.Add(&kubernetesEvent{ - p: p, - t: watch.Deleted, - }) -} - -func (c *dubboRegistryController) startALLInformers() { - logger.Debugf("starting namespaced informer-factory") - for _, factory := range c.namespacedInformerFactory { - go factory.Start(c.ctx.Done()) - } -} - -// run -// controller process every event in work-queue -func (c *dubboRegistryController) run() { - if c.role == common.PROVIDER { - return - } - - defer logger.Warn("dubbo registry controller work stopped") - defer c.queue.ShutDown() - - for ns, podInformer := range c.namespacedPodInformers { - if !cache.WaitForCacheSync(c.ctx.Done(), podInformer.Informer().HasSynced) { - logger.Errorf("wait for cache sync finish @namespace %s fail", ns) - return - } - } - - logger.Infof("kubernetes registry-controller running @Namespace = %q @PodName = %q", c.namespace, c.name) - - // start work - go c.work() - // block wait context cancel - <-c.ctx.Done() -} - -func (c *dubboRegistryController) work() { - for c.processNextWorkItem() { - } -} - -// processNextWorkItem process work-queue elements -func (c *dubboRegistryController) processNextWorkItem() bool { - item, shutdown := c.queue.Get() - if shutdown { - return false - } - defer c.queue.Done(item) - o := item.(*kubernetesEvent) - c.handleWatchedPodEvent(o.p, o.t) - return true -} - -// handleWatchedPodEvent handles watched pod event -func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) { - logger.Debugf("get @type = %s event from @pod = %s", eventType, p.GetName()) - - for ak, av := range p.GetAnnotations() { - // not dubbo interest annotation - if ak != DubboIOAnnotationKey { - continue - } - ol, err := c.unmarshalRecord(av) - if err != nil { - logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err) - return - } - for _, o := range ol { - switch eventType { - case watch.Added: - // if pod is added, the record always be create - o.EventType = Create - case watch.Modified: - o.EventType = Update - case watch.Deleted: - o.EventType = Delete - default: - logger.Errorf("no valid kubernetes event-type (%s) ", eventType) - return - } - - logger.Debugf("putting @key=%s @value=%s to watcherSet", o.Key, o.Value) - if err := c.watcherSet.Put(o); err != nil { - logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err) - return - } - } - } -} - -// unmarshalRecord unmarshals the kubernetes dubbo annotation value -func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEvent, error) { - if len(record) == 0 { - // []*WatcherEvent is nil. - return nil, nil - } - - rawMsg, err := base64.URLEncoding.DecodeString(record) - if err != nil { - return nil, perrors.WithMessagef(err, "decode record (%s)", record) - } - - var out []*WatcherEvent - if err := json.Unmarshal(rawMsg, &out); err != nil { - return nil, perrors.WithMessage(err, "decode json") - } - return out, nil -} - -// initCurrentPod -// 1. get current pod -// 2. give the dubbo-label for this pod -func (c *dubboRegistryController) initCurrentPod() error { - currentPod, err := c.readCurrentPod() - if err != nil { - return perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace) - } - - oldPod, newPod, err := c.assembleDUBBOLabel(currentPod) - if err != nil { - if err == ErrDubboLabelAlreadyExist { - return nil - } - return perrors.WithMessage(err, "assemble dubbo label") - } - // current pod don't have label - p, err := c.getPatch(oldPod, newPod) - if err != nil { - return perrors.WithMessage(err, "get patch") - } - - _, err = c.patchCurrentPod(p) - if err != nil { - return perrors.WithMessage(err, "patch to current pod") - } - - return nil -} - -// patchCurrentPod writes new meta for current pod -func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) { - updatedPod, err := c.kc.CoreV1().Pods(c.namespace).Patch(c.name, types.StrategicMergePatchType, patch) - if err != nil { - return nil, perrors.WithMessage(err, "patch in kubernetes pod ") - } - return updatedPod, nil -} - -func (c *dubboRegistryController) assembleLabel(k, v string) error { - var ( - oldPod = &v1.Pod{} - newPod = &v1.Pod{} - ) - oldPod.Labels = make(map[string]string, 8) - newPod.Labels = make(map[string]string, 8) - currentPod, err := c.readCurrentPod() - if err != nil { - return err - } - // copy current pod labels to oldPod && newPod - for k, v := range currentPod.GetLabels() { - oldPod.Labels[k] = v - newPod.Labels[k] = v - } - newPod.Labels[k] = v - - p, err := c.getPatch(oldPod, newPod) - if err != nil { - return perrors.WithMessage(err, "get patch") - } - - _, err = c.patchCurrentPod(p) - if err != nil { - return perrors.WithMessage(err, "patch to current pod") - } - return nil -} - -// assembleDUBBOLabel assembles the dubbo kubernetes label -// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label -func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Pod, error) { - var ( - oldPod = &v1.Pod{} - newPod = &v1.Pod{} - ) - oldPod.Labels = make(map[string]string, 8) - newPod.Labels = make(map[string]string, 8) - - if p.GetLabels() != nil { - if _, ok := p.GetLabels()[DubboIOLabelKey]; ok { - // already have label - return nil, nil, ErrDubboLabelAlreadyExist - } - } - - // copy current pod labels to oldPod && newPod - for k, v := range p.GetLabels() { - oldPod.Labels[k] = v - newPod.Labels[k] = v - } - - // assign new label for current pod - switch c.role { - case common.CONSUMER: - newPod.Labels[DubboIOLabelKey] = DubboIOConsumerLabelValue - case common.PROVIDER: - newPod.Labels[DubboIOLabelKey] = DubboIOProviderLabelValue - default: - return nil, nil, perrors.New(fmt.Sprintf("unknown role %s", c.role)) - } - return oldPod, newPod, nil -} - -// assembleDUBBOAnnotations assembles the dubbo kubernetes annotations -// accord the current pod && (k,v) assemble the old-pod, new-pod -func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) { - oldPod = &v1.Pod{} - newPod = &v1.Pod{} - oldPod.Annotations = make(map[string]string, 8) - newPod.Annotations = make(map[string]string, 8) - - for k, v := range currentPod.GetAnnotations() { - oldPod.Annotations[k] = v - newPod.Annotations[k] = v - } - - al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey]) - if err != nil { - err = perrors.WithMessage(err, "unmarshal record") - return - } - - newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v})) - if err != nil { - err = perrors.WithMessage(err, "marshal record") - return - } - - newPod.Annotations[DubboIOAnnotationKey] = newAnnotations - return -} - -// getPatch gets the kubernetes pod patch bytes -func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) { - oldData, err := json.Marshal(oldPod) - if err != nil { - return nil, perrors.WithMessage(err, "marshal old pod") - } - - newData, err := json.Marshal(newPod) - if err != nil { - return nil, perrors.WithMessage(err, "marshal newPod pod") - } - - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{}) - if err != nil { - return nil, perrors.WithMessage(err, "create two-way-merge-patch") - } - return patchBytes, nil -} - -// marshalRecord marshals the kubernetes dubbo annotation value -func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) { - msg, err := json.Marshal(ol) - if err != nil { - return "", perrors.WithMessage(err, "json encode object list") - } - return base64.URLEncoding.EncodeToString(msg), nil -} - -// readCurrentPod reads from kubernetes-env current pod status -func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) { - currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{}) - if err != nil { - return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace) - } - return currentPod, nil -} - -// addAnnotationForCurrentPod adds annotation for current pod -func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string) error { - c.lock.Lock() - defer c.lock.Unlock() - - // 1. accord old pod && (k, v) assemble new pod dubbo annotation v - // 2. get patch data - // 3. PATCH the pod - currentPod, err := c.readCurrentPod() - if err != nil { - return perrors.WithMessage(err, "read current pod") - } - - oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod) - if err != nil { - return perrors.WithMessage(err, "assemble") - } - - patchBytes, err := c.getPatch(oldPod, newPod) - if err != nil { - return perrors.WithMessage(err, "get patch") - } - - _, err = c.patchCurrentPod(patchBytes) - if err != nil { - return perrors.WithMessage(err, "patch current pod") - } - - return c.watcherSet.Put(&WatcherEvent{ - Key: k, - Value: v, - EventType: Create, - }) -} diff --git a/remoting/kubernetes/watch.go b/remoting/kubernetes/watch.go deleted file mode 100644 index 34f5a3458..000000000 --- a/remoting/kubernetes/watch.go +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "context" - "strconv" - "strings" - "sync" -) - -import ( - perrors "github.com/pkg/errors" -) - -var ( - ErrWatcherSetAlreadyStopped = perrors.New("the watcher-set already be stopped") - ErrKVPairNotFound = perrors.New("k/v pair not found") -) - -const ( - defaultWatcherChanSize = 100 -) - -type eventType int - -const ( - Create eventType = iota - Update - Delete -) - -func (e eventType) String() string { - switch e { - case Create: - return "CREATE" - case Update: - return "UPDATE" - case Delete: - return "DELETE" - default: - return "UNKNOWN" - } -} - -// WatcherEvent -// watch event is element in watcherSet -type WatcherEvent struct { - // event-type - EventType eventType `json:"-"` - // the dubbo-go should consume the key - Key string `json:"k"` - // the dubbo-go should consume the value - Value string `json:"v"` -} - -// Watchable WatcherSet -// thread-safe -type WatcherSet interface { - - // put the watch event to the watch set - Put(object *WatcherEvent) error - // if prefix is false, - // the len([]*WatcherEvent) == 1 - Get(key string, prefix bool) ([]*WatcherEvent, error) - // watch the spec key or key prefix - Watch(key string, prefix bool) (Watcher, error) - // check the watcher set status - Done() <-chan struct{} -} - -// Watcher -type Watcher interface { - // the watcher's id - ID() string - // result stream - ResultChan() <-chan *WatcherEvent - // Stop the watcher - stop() - // check the watcher status - done() <-chan struct{} -} - -// the watch set implement -type watcherSetImpl struct { - - // Client's ctx, client die, the watch set will die too - ctx context.Context - - // protect watcher-set and watchers - lock sync.RWMutex - - // the key is dubbo-go interest meta - cache map[string]*WatcherEvent - - currentWatcherId uint64 - watchers map[uint64]*watcher -} - -// closeWatchers -// when the watcher-set was closed -func (s *watcherSetImpl) closeWatchers() { - <-s.ctx.Done() - // parent ctx be canceled, close the watch-set's watchers - s.lock.Lock() - watchers := s.watchers - s.lock.Unlock() - - for _, w := range watchers { - // stop data stream - // close(w.ch) - // stop watcher - w.stop() - } -} - -// Watch -// watch on spec key, with or without prefix -func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) { - return s.addWatcher(key, prefix) -} - -// Done gets the watcher-set status -func (s *watcherSetImpl) Done() <-chan struct{} { - return s.ctx.Done() -} - -// Put puts the watch event to watcher-set -func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error { - blockSendMsg := func(object *WatcherEvent, w *watcher) { - select { - case <-w.done(): - // the watcher already stop - case w.ch <- object: - // block send the msg - } - } - - s.lock.Lock() - defer s.lock.Unlock() - - if err := s.valid(); err != nil { - return err - } - - // put to watcher-set - switch watcherEvent.EventType { - case Delete: - // delete from store - delete(s.cache, watcherEvent.Key) - case Update, Create: - o, ok := s.cache[watcherEvent.Key] - if !ok { - // pod update, but create new k/v pair - watcherEvent.EventType = Create - s.cache[watcherEvent.Key] = watcherEvent - break - } - // k/v pair already latest - if o.Value == watcherEvent.Value { - return nil - } - // update to latest status - s.cache[watcherEvent.Key] = watcherEvent - } - - // notify watcher - for _, w := range s.watchers { - if !strings.Contains(watcherEvent.Key, w.interested.key) { - // this watcher no interest in this element - continue - } - if !w.interested.prefix { - if watcherEvent.Key == w.interested.key { - blockSendMsg(watcherEvent, w) - } - // not interest - continue - } - blockSendMsg(watcherEvent, w) - } - return nil -} - -// valid -func (s *watcherSetImpl) valid() error { - select { - case <-s.ctx.Done(): - return ErrWatcherSetAlreadyStopped - default: - return nil - } -} - -// addWatcher -func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) { - if err := s.valid(); err != nil { - return nil, err - } - - s.lock.Lock() - defer s.lock.Unlock() - - // increase the watcher-id - s.currentWatcherId++ - - w := &watcher{ - id: s.currentWatcherId, - watcherSet: s, - interested: struct { - key string - prefix bool - }{key: key, prefix: prefix}, - ch: make(chan *WatcherEvent, defaultWatcherChanSize), - exit: make(chan struct{}), - } - s.watchers[s.currentWatcherId] = w - return w, nil -} - -// Get gets elements from watcher-set -func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) { - s.lock.RLock() - defer s.lock.RUnlock() - - if err := s.valid(); err != nil { - return nil, err - } - - if !prefix { - for k, v := range s.cache { - if k == key { - return []*WatcherEvent{v}, nil - } - } - // object - return nil, ErrKVPairNotFound - } - - var out []*WatcherEvent - - for k, v := range s.cache { - if strings.Contains(k, key) { - out = append(out, v) - } - } - - if len(out) == 0 { - return nil, ErrKVPairNotFound - } - - return out, nil -} - -// the watcher-set watcher -type watcher struct { - id uint64 - - // the underlay watcherSet - watcherSet *watcherSetImpl - - // the interest topic - interested struct { - key string - prefix bool - } - ch chan *WatcherEvent - - closeOnce sync.Once - exit chan struct{} -} - -// nolint -func (w *watcher) ResultChan() <-chan *WatcherEvent { - return w.ch -} - -// nolint -func (w *watcher) ID() string { - return strconv.FormatUint(w.id, 10) -} - -// nolint -func (w *watcher) stop() { - // double close will panic - w.closeOnce.Do(func() { - close(w.exit) - }) -} - -// done checks watcher status -func (w *watcher) done() <-chan struct{} { - return w.exit -} - -// newWatcherSet returns new watcher set from parent context -func newWatcherSet(ctx context.Context) WatcherSet { - s := &watcherSetImpl{ - ctx: ctx, - cache: map[string]*WatcherEvent{}, - watchers: map[uint64]*watcher{}, - } - go s.closeWatchers() - return s -} diff --git a/remoting/kubernetes/watch_test.go b/remoting/kubernetes/watch_test.go deleted file mode 100644 index 9a0139dd6..000000000 --- a/remoting/kubernetes/watch_test.go +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kubernetes - -import ( - "context" - "strconv" - "sync" - "testing" - "time" -) - -func TestWatchSet(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - s := newWatcherSet(ctx) - - wg := sync.WaitGroup{} - - for i := 0; i < 2; i++ { - - wg.Add(1) - - go func() { - defer wg.Done() - w, err := s.Watch("key-1", false) - if err != nil { - t.Error(err) - return - } - for { - select { - case e := <-w.ResultChan(): - t.Logf("consumer %s got %s\n", w.ID(), e.Key) - - case <-w.done(): - t.Logf("consumer %s stopped", w.ID()) - return - } - } - }() - } - for i := 2; i < 3; i++ { - - wg.Add(1) - go func() { - defer wg.Done() - w, err := s.Watch("key", true) - if err != nil { - t.Error(err) - return - } - - for { - select { - case e := <-w.ResultChan(): - t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key) - - case <-w.done(): - t.Logf("prefix consumer %s stopped", w.ID()) - return - } - } - }() - } - - for i := 0; i < 5; i++ { - go func(i int) { - if err := s.Put(&WatcherEvent{ - Key: "key-" + strconv.Itoa(i), - Value: strconv.Itoa(i), - }); err != nil { - t.Error(err) - return - } - }(i) - } - - wg.Wait() -} diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index b4a250352..fbe4749f0 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -175,7 +175,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li for _, n := range newChildren { newNode = path.Join(zkPath, n) - logger.Infof("[Zookeeper Listener] add zkNode{%s}", newNode) + logger.Debugf("[Zookeeper Listener] add zkNode{%s}", newNode) content, _, connErr := l.client.Conn.Get(newNode) if connErr != nil { logger.Errorf("Get new node path {%v} 's content error,message is {%v}", -- GitLab