Skip to content
Snippets Groups Projects
Commit e5efee27 authored by scott's avatar scott
Browse files

Fix kubernetes import block and make map param

parent bfc14453
No related branches found
No related tags found
No related merge requests found
......@@ -50,4 +50,10 @@ require (
go.uber.org/zap v1.10.0
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
k8s.io/api v0.0.0-20190325185214-7544f9db76f6
k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841
k8s.io/client-go v8.0.0+incompatible
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a // indirect
)
go 1.13
......@@ -14,10 +14,13 @@ github.com/Jeffail/gabs v1.1.0 h1:kw5zCcl9tlJNHTDme7qbi21fDHZmXrnjMoXos3Jw/NI=
github.com/Jeffail/gabs v1.1.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc=
github.com/Microsoft/go-winio v0.4.3 h1:M3NHMuPgMSUPdE5epwNUHlRPSVzHs8HpRTrVXhR0myo=
github.com/Microsoft/go-winio v0.4.3/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4RN8F0=
github.com/NYTimes/gziphandler v1.0.1/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/SAP/go-hdb v0.12.0 h1:5hBQZ2jjyZ268qjDmoDZJuCyLzR6oRLI60eYzmTW9m4=
github.com/SAP/go-hdb v0.12.0/go.mod h1:etBT+FAi1t5k3K3tf5vQTnosgYmhDkRi8jEnQqCnxF0=
github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc h1:LkkwnbY+S8WmwkWq1SVyRWMH9nYWO1P5XN3OD1tts/w=
......@@ -86,6 +89,7 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbp
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/creasty/defaults v1.3.0 h1:uG+RAxYbJgOPCOdKEcec9ZJXeva7Y6mj/8egdzwmLtw=
github.com/creasty/defaults v1.3.0/go.mod h1:CIEEvs7oIVZm30R8VxtFJs+4k201gReYyuYHJxZc68I=
github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
......@@ -112,6 +116,7 @@ github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIh
github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo=
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y=
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/envoyproxy/go-control-plane v0.8.0 h1:uE6Fp4fOcAJdc1wTQXLJ+SYistkbG1dNoi6Zs1+Ybvk=
github.com/envoyproxy/go-control-plane v0.8.0/go.mod h1:GSSbY9P1neVhdY7G4wu+IK1rk/dqhiCC/4ExuWJZVuk=
github.com/envoyproxy/protoc-gen-validate v0.0.14 h1:YBW6/cKy9prEGRYLnaGa4IDhzxZhRCtKsax8srGKDnM=
......@@ -123,6 +128,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/structs v0.0.0-20180123065059-ebf56d35bba7 h1:bGT+Ub6bpzHl7AAYQhBrZ5nYTAH2SF/848WducU0Ao4=
github.com/fatih/structs v0.0.0-20180123065059-ebf56d35bba7/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
......@@ -136,6 +142,10 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-ole/go-ole v1.2.1 h1:2lOsA72HgjxAuMlKpFiCbHTvu44PIVkZ5hqm3RSdI/E=
github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0=
github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg=
github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc=
github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I=
github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4 h1:1LlmVz15APoKz9dnm5j2ePptburJlwEH+/v/pUuoxck=
github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
......@@ -156,6 +166,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
......@@ -174,9 +185,11 @@ github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4r
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 h1:zLTLjkaOFEFIOxY5BWLFLwh+cL8vOBW4XJ2aqLE/Tf0=
github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
github.com/googleapis/gnostic v0.2.0 h1:l6N3VoaVzTncYYW+9yOz2LJJammFZGBO13sqgEhpy9g=
github.com/googleapis/gnostic v0.2.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
github.com/gophercloud/gophercloud v0.0.0-20180828235145-f29afc2cceca h1:wobTb8SE189AuxzEKClyYxiI4nUGWlpVtl13eLiFlOE=
......@@ -291,6 +304,7 @@ github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/joyent/triton-go v0.0.0-20180628001255-830d2b111e62 h1:JHCT6xuyPUrbbgAPE/3dqlvUKzRHMNuTBKKUb6OeR/k=
github.com/joyent/triton-go v0.0.0-20180628001255-830d2b111e62/go.mod h1:U+RSyWxWd04xTqnuOQxnai7XGS2PrPY2cfGoDKtMHjA=
github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo=
......@@ -321,6 +335,7 @@ github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 h1:it29sI2IM490luSc3RAhp5Wu
github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI=
......@@ -350,9 +365,11 @@ github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180320133207-05fbef0ca5da/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb h1:lbmvw8r9W55w+aQgWn35W1nuleRIECMoqUrmwAOAvoI=
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo=
......@@ -360,7 +377,9 @@ github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW
github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk=
github.com/oklog/run v0.0.0-20180308005104-6934b124db28 h1:Hbr3fbVPXea52oPQeP7KLSxP52g6SFaNY1IqAmUyEW0=
github.com/oklog/run v0.0.0-20180308005104-6934b124db28/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ=
......@@ -383,6 +402,7 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1 h1:ccV59UEOTzVDnDUEFdT95ZzHVZ+5+158q8+SJb2QV5w=
......@@ -435,12 +455,14 @@ github.com/softlayer/softlayer-go v0.0.0-20180806151055-260589d94c7d h1:bVQRCxQv
github.com/softlayer/softlayer-go v0.0.0-20180806151055-260589d94c7d/go.mod h1:Cw4GTlQccdRGSEf6KiMju767x0NEHE0YIVPJSaXjlsw=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
......@@ -477,6 +499,7 @@ golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c h1:Vj5n4GlwjmQteupaxJ9+0F
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
......@@ -496,6 +519,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
......@@ -508,6 +532,7 @@ golang.org/x/sys v0.0.0-20190508220229-2d0786266e9c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
......@@ -517,6 +542,7 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZe
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
......@@ -572,3 +598,9 @@ k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841 h1:Q4RZrHNtlC/mSdC1sTrcZ5
k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0=
k8s.io/client-go v8.0.0+incompatible h1:tTI4hRmb1DRMl4fG6Vclfdi6nTM82oIrTT7HfitmxC4=
k8s.io/client-go v8.0.0+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a h1:UcxjrRMyNx/i/y8G7kPvLyy7rfbeuf1PYyBf973pgyU=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
package kubernetes
import (
"context"
"strings"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)
type dataListener struct {
interestedURL []*common.URL
listener config_center.ConfigurationListener
}
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener, interestedURL: []*common.URL{}}
}
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
func (l *dataListener) DataChange(eventType remoting.Event) bool {
index := strings.Index(eventType.Path, "/providers/")
if index == -1 {
logger.Warn("Listen with no url, event.path={%v}", eventType.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
serviceURL, err := common.NewURL(context.Background(), url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
}
for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Value: serviceURL,
ConfigType: eventType.Action,
},
)
return true
}
}
return false
}
type configurationListener struct {
registry *kubernetesRegistry
events chan *config_center.ConfigChangeEvent
}
func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
// add a new waiter
reg.wg.Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.registry.done:
logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exit now.")
return nil, perrors.New("listener stopped")
case e := <-l.events:
logger.Infof("got kubernetes event %#v", e)
if e.ConfigType == remoting.EventTypeDel {
select {
case <-l.registry.done:
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
continue
}
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}
func (l *configurationListener) Close() {
l.registry.wg.Done()
}
package kubernetes
package kubernetes
import (
"fmt"
"net/url"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
)
import (
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/kubernetes"
)
var (
processID = ""
localIP = ""
)
const (
Name = "kubernetes"
RegistryConnDelay = 3
)
func init() {
processID = fmt.Sprintf("%d", os.Getpid())
localIP, _ = gxnet.GetLocalIP()
extension.SetRegistry(Name, newKubernetesRegistry)
}
type kubernetesRegistry struct {
*common.URL
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
cltLock sync.Mutex
client *kubernetes.Client
services map[string]common.URL // service name + protocol -> service config
listenerLock sync.Mutex
listener *kubernetes.EventListener
dataListener *dataListener
configListener *configurationListener
wg sync.WaitGroup // wg+done for kubernetes client restart
done chan struct{}
}
func (r *kubernetesRegistry) Client() *kubernetes.Client {
return r.client
}
func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
r.client = client
}
func (r *kubernetesRegistry) ClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *kubernetesRegistry) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *kubernetesRegistry) GetDone() chan struct{} {
return r.done
}
func (r *kubernetesRegistry) RestartCallBack() bool {
services := []common.URL{}
for _, confIf := range r.services {
services = append(services, confIf)
}
flag := true
for _, confIf := range services {
err := r.Register(confIf)
if err != nil {
logger.Errorf("(kubernetesProviderRegistry)register(conf{%#v}) = error{%#v}",
confIf, perrors.WithStack(err))
flag = false
break
}
logger.Infof("success to re-register service :%v", confIf.Key())
}
return flag
}
func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
r := &kubernetesRegistry{
URL: url,
birth: time.Now().UnixNano(),
done: make(chan struct{}),
services: make(map[string]common.URL),
}
if err := kubernetes.ValidateClient(r); err != nil {
return nil, err
}
r.wg.Add(1)
go kubernetes.HandleClientRestart(r)
r.listener = kubernetes.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
return r, nil
}
func (r *kubernetesRegistry) GetUrl() common.URL {
return *r.URL
}
func (r *kubernetesRegistry) IsAvailable() bool {
select {
case <-r.done:
return false
default:
return true
}
}
func (r *kubernetesRegistry) Destroy() {
if r.configListener != nil {
r.configListener.Close()
}
r.stop()
}
func (r *kubernetesRegistry) stop() {
close(r.done)
// close current client
r.client.Close()
r.cltLock.Lock()
r.client = nil
r.services = nil
r.cltLock.Unlock()
}
func (r *kubernetesRegistry) Register(svc common.URL) error {
role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if err != nil {
return perrors.WithMessage(err, "get registry role")
}
r.cltLock.Lock()
if _, ok := r.services[svc.Key()]; ok {
r.cltLock.Unlock()
return perrors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path))
}
r.cltLock.Unlock()
switch role {
case common.PROVIDER:
logger.Debugf("(provider register )Register(conf{%#v})", svc)
if err := r.registerProvider(svc); err != nil {
return perrors.WithMessage(err, "register provider")
}
case common.CONSUMER:
logger.Debugf("(consumer register )Register(conf{%#v})", svc)
if err := r.registerConsumer(svc); err != nil {
return perrors.WithMessage(err, "register consumer")
}
default:
return perrors.New(fmt.Sprintf("unknown role %d", role))
}
r.cltLock.Lock()
r.services[svc.Key()] = svc
r.cltLock.Unlock()
return nil
}
func (r *kubernetesRegistry) createDirIfNotExist(k string) error {
var tmpPath string
for _, str := range strings.Split(k, "/")[1:] {
tmpPath = path.Join(tmpPath, "/", str)
if err := r.client.Create(tmpPath, ""); err != nil {
return perrors.WithMessagef(err, "create path %s in kubernetes", tmpPath)
}
}
return nil
}
func (r *kubernetesRegistry) registerConsumer(svc common.URL) error {
consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER])
if err := r.createDirIfNotExist(consumersNode); err != nil {
logger.Errorf("kubernetes client create path %s: %v", consumersNode, err)
return perrors.WithMessage(err, "kubernetes create consumer nodes")
}
providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
if err := r.createDirIfNotExist(providersNode); err != nil {
return perrors.WithMessage(err, "create provider node")
}
params := url.Values{}
params.Add("protocol", svc.Protocol)
params.Add("category", (common.RoleType(common.CONSUMER)).String())
params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode()))
dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String())
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return perrors.WithMessagef(err, "create k/v in kubernetes (path:%s, url:%s)", dubboPath, encodedURL)
}
return nil
}
func (r *kubernetesRegistry) registerProvider(svc common.URL) error {
if len(svc.Path) == 0 || len(svc.Methods) == 0 {
return perrors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods))
}
var (
urlPath string
encodedURL string
dubboPath string
)
providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
if err := r.createDirIfNotExist(providersNode); err != nil {
return perrors.WithMessage(err, "create provider node")
}
params := url.Values{}
svc.RangeParams(func(key, value string) bool {
params[key] = []string{value}
return true
})
params.Add("pid", processID)
params.Add("ip", localIP)
params.Add("anyhost", "true")
params.Add("category", (common.RoleType(common.PROVIDER)).String())
params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
params.Add("side", (common.RoleType(common.PROVIDER)).Role())
if len(svc.Methods) == 0 {
params.Add("methods", strings.Join(svc.Methods, ","))
}
logger.Debugf("provider url params:%#v", params)
var host string
if len(svc.Ip) == 0 {
host = localIP + ":" + svc.Port
} else {
host = svc.Ip + ":" + svc.Port
}
urlPath = svc.Path
encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode()))
dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String())
if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
return perrors.WithMessagef(err, "create k/v in kubernetes (path:%s, url:%s)", dubboPath, encodedURL)
}
return nil
}
func (r *kubernetesRegistry) subscribe(svc *common.URL) (registry.Listener, error) {
var (
configListener *configurationListener
)
r.listenerLock.Lock()
configListener = r.configListener
r.listenerLock.Unlock()
if r.listener == nil {
r.cltLock.Lock()
client := r.client
r.cltLock.Unlock()
if client == nil {
return nil, perrors.New("kubernetes client broken")
}
// new client & listener
listener := kubernetes.NewEventListener(r.client)
r.listenerLock.Lock()
r.listener = listener
r.listenerLock.Unlock()
}
//register the svc to dataListener
r.dataListener.AddInterestedURL(svc)
for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") {
go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener)
}
return configListener, nil
}
//subscribe from registry
func (r *kubernetesRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
for {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
listener, err := r.subscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {
if serviceEvent, err := listener.Next(); err != nil {
logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
listener.Close()
return
} else {
logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
}
}
}
package kubernetes
package kubernetes
import (
"context"
"encoding/base64"
"encoding/json"
"os"
"runtime/debug"
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
import (
"github.com/apache/dubbo-go/common/logger"
)
const (
// kubernetes inject the var
podNameKey = "HOSTNAME"
nameSpaceKey = "NAMESPACE"
// all pod annotation key
DubboIOAnnotationKey = "dubbo.io/annotation"
DubboIOLabelKey = "dubbo.io/label"
DubboIOLabelValue = "dubbo.io-value"
)
var (
ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
ErrDubboAnnotationsAlreadyExist = perrors.New("dubbo annotations already exist")
)
type Client struct {
// kubernetes connection config
cfg *rest.Config
// the kubernetes interface
rawClient kubernetes.Interface
// current pod config
currentPodName string
ns string
// the memory store
store Store
// protect the wg && currentPod
lock sync.Mutex
// current pod status
currentPod *v1.Pod
// protect the maintenanceStatus loop && watcher
wg sync.WaitGroup
// manage the client lifecycle
ctx context.Context
cancel context.CancelFunc
}
// load CurrentPodName
func getCurrentPodName() (string, error) {
v := os.Getenv(podNameKey)
if len(v) == 0 {
return "", perrors.New("read value from env by key (HOSTNAME)")
}
return v, nil
}
// load CurrentNameSpace
func getCurrentNameSpace() (string, error) {
v := os.Getenv(nameSpaceKey)
if len(v) == 0 {
return "", perrors.New("read value from env by key (NAMESPACE)")
}
return v, nil
}
// newClient
// new a client for registry
func newClient(namespace string) (*Client, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, perrors.WithMessage(err, "get in-cluster config")
}
rawClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, perrors.WithMessage(err, "new kubernetes client by in cluster config")
}
currentPodName, err := getCurrentPodName()
if err != nil {
return nil, perrors.WithMessage(err, "get pod name")
}
ctx, cancel := context.WithCancel(context.Background())
c := &Client{
currentPodName: currentPodName,
ns: namespace,
cfg: cfg,
rawClient: rawClient,
ctx: ctx,
store: newStore(ctx),
cancel: cancel,
}
currentPod, err := c.initCurrentPod()
if err != nil {
return nil, perrors.WithMessage(err, "init current pod")
}
// record current status
c.currentPod = currentPod
// init the store by current pods
if err := c.initStore(); err != nil {
return nil, perrors.WithMessage(err, "init store")
}
// start kubernetes watch loop
if err := c.maintenanceStatus(); err != nil {
return nil, perrors.WithMessage(err, "maintenance the kubernetes status")
}
logger.Info("init kubernetes registry success")
return c, nil
}
// initCurrentPod
// 1. get current pod
// 2. give the dubbo-label for this pod
func (c *Client) initCurrentPod() (*v1.Pod, error) {
// read the current pod status
currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{})
if err != nil {
return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns)
}
oldPod, newPod, err := c.assembleDUBBOLabel(currentPod)
if err != nil {
if err != ErrDubboLabelAlreadyExist {
return nil, perrors.WithMessage(err, "assemble dubbo label")
}
// current pod don't have label
}
p, err := c.getPatch(oldPod, newPod)
if err != nil {
return nil, perrors.WithMessage(err, "get patch")
}
currentPod, err = c.patchCurrentPod(p)
if err != nil {
return nil, perrors.WithMessage(err, "patch to current pod")
}
return currentPod, nil
}
// initStore
// 1. get all with dubbo label pods
// 2. put every element to store
func (c *Client) initStore() error {
pods, err := c.rawClient.CoreV1().Pods(c.ns).List(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
})
if err != nil {
return perrors.WithMessagef(err, "list pods in namespace (%s)", c.ns)
}
for _, pod := range pods.Items {
logger.Debugf("got the pod (name: %s), (label: %v), (annotations: %v)", pod.Name, pod.GetLabels(), pod.GetAnnotations())
c.handleWatchedPodEvent(&pod, watch.Added)
}
return nil
}
// maintenanceStatus
// try to watch kubernetes pods
func (c *Client) maintenanceStatus() error {
c.wg.Add(1)
// try once
watcher, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
})
if err != nil {
return perrors.WithMessagef(err, "try to watch the namespace (%s) pods", c.ns)
}
watcher.Stop()
// add wg, grace close the client
go c.maintenanceStatusLoop()
return nil
}
// maintenanceStatus
// try to notify
func (c *Client) maintenanceStatusLoop() {
defer func() {
// notify other goroutine, this loop over
c.wg.Done()
logger.Info("maintenanceStatusLoop goroutine game over")
}()
var lastResourceVersion string
for {
wc, err := c.rawClient.CoreV1().Pods(c.ns).Watch(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
Watch: true,
ResourceVersion: lastResourceVersion,
})
if err != nil {
logger.Warnf("watch the namespace (%s) pods: %v, retry after 2 seconds", c.ns, err)
time.Sleep(2 * time.Second)
continue
}
logger.Infof("the old kubernetes client broken, collect the resource status from resource version (%s)", lastResourceVersion)
select {
case <-c.ctx.Done():
// the client stopped
logger.Info("the kubernetes client stopped")
return
default:
for {
select {
// double check ctx
case <-c.ctx.Done():
logger.Info("the kubernetes client stopped")
// get one element from result-chan
case event, ok := <-wc.ResultChan():
if !ok {
wc.Stop()
logger.Info("kubernetes watch chan die, create new")
goto onceWatch
}
if event.Type == watch.Error {
// watched a error event
logger.Warnf("kubernetes watch api report err (%#v)", event)
continue
}
type resourceVersionGetter interface {
GetResourceVersion() string
}
o, ok := event.Object.(resourceVersionGetter)
if !ok {
continue
}
// record the last resource version avoid to sync all pod
lastResourceVersion = o.GetResourceVersion()
logger.Infof("kuberentes get the current resource version %v", lastResourceVersion)
// check event object type
p, ok := event.Object.(*v1.Pod)
if !ok {
// not a pod
continue
}
// handle the watched pod
go c.handleWatchedPodEvent(p, event.Type)
}
}
onceWatch:
}
}
}
// handleWatchedPodEvent
// handle watched pod event
func (c *Client) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
for ak, av := range p.GetAnnotations() {
// not dubbo interest annotation
if ak != DubboIOAnnotationKey {
continue
}
ol, err := c.unmarshalRecord(av)
if err != nil {
logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
return
}
for _, o := range ol {
switch eventType {
case watch.Added:
// if pod is added, the record always be create
o.EventType = Create
case watch.Modified:
o.EventType = Update
case watch.Deleted:
o.EventType = Delete
default:
logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
return
}
logger.Debugf("prepare to put object (%#v) to kuberentes-store", o)
if err := c.store.Put(o); err != nil {
logger.Errorf("put (%#v) to cache store: %v ", o, err)
return
}
}
}
}
// unmarshalRecord
// unmarshal the kubernetes dubbo annotation value
func (c *Client) unmarshalRecord(record string) ([]*Object, error) {
if len(record) == 0 {
// NOTICE:
// []*Object is nil.
return nil, nil
}
rawMsg, err := base64.URLEncoding.DecodeString(record)
if err != nil {
return nil, perrors.WithMessagef(err, "decode record (%s)", record)
}
var out []*Object
if err := json.Unmarshal(rawMsg, &out); err != nil {
return nil, perrors.WithMessage(err, "decode json")
}
return out, nil
}
// marshalRecord
// marshal the kubernetes dubbo annotation value
func (c *Client) marshalRecord(ol []*Object) (string, error) {
msg, err := json.Marshal(ol)
if err != nil {
return "", perrors.WithMessage(err, "json encode object list")
}
return base64.URLEncoding.EncodeToString(msg), nil
}
// readCurrentPod
// read the current pod status from kubernetes api
func (c *Client) readCurrentPod() (*v1.Pod, error) {
currentPod, err := c.rawClient.CoreV1().Pods(c.ns).Get(c.currentPodName, metav1.GetOptions{})
if err != nil {
return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.currentPodName, c.ns)
}
return currentPod, nil
}
// Create
// create k/v pair in storage
func (c *Client) Create(k, v string) error {
// 1. accord old pod && (k, v) assemble new pod dubbo annotion v
// 2. get patch data
// 3. PATCH the pod
c.lock.Lock()
defer c.lock.Unlock()
currentPod, err := c.readCurrentPod()
if err != nil {
return perrors.WithMessage(err, "read current pod")
}
oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod)
if err != nil {
return perrors.WithMessage(err, "assemble")
}
patchBytes, err := c.getPatch(oldPod, newPod)
if err != nil {
return perrors.WithMessage(err, "get patch")
}
updatedPod, err := c.patchCurrentPod(patchBytes)
if err != nil {
return perrors.WithMessage(err, "patch current pod")
}
c.currentPod = updatedPod
// not update the store, the store should be write by the maintenanceStatusLoop
return nil
}
// patch current pod
// write new meta for current pod
func (c *Client) patchCurrentPod(patch []byte) (*v1.Pod, error) {
updatedPod, err := c.rawClient.CoreV1().Pods(c.ns).Patch(c.currentPodName, types.StrategicMergePatchType, patch)
if err != nil {
return nil, perrors.WithMessage(err, "patch in kubernetes pod ")
}
return updatedPod, nil
}
// assemble the dubbo kubernete label
// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label
func (c *Client) assembleDUBBOLabel(currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
oldPod = &v1.Pod{}
newPod = &v1.Pod{}
oldPod.Labels = make(map[string]string, 8)
newPod.Labels = make(map[string]string, 8)
if currentPod.GetLabels() != nil {
if currentPod.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue {
// already have label
err = ErrDubboLabelAlreadyExist
return
}
}
// copy current pod labels to oldPod && newPod
for k, v := range currentPod.GetLabels() {
oldPod.Labels[k] = v
newPod.Labels[k] = v
}
// assign new label for current pod
newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue
return
}
// assemble the dubbo kubernetes annotations
// accord the current pod && (k,v) assemble the old-pod, new-pod
func (c *Client) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
oldPod = &v1.Pod{}
newPod = &v1.Pod{}
oldPod.Annotations = make(map[string]string, 8)
newPod.Annotations = make(map[string]string, 8)
for k, v := range currentPod.GetAnnotations() {
oldPod.Annotations[k] = v
newPod.Annotations[k] = v
}
al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey])
if err != nil {
err = perrors.WithMessage(err, "unmarshal record")
return
}
newAnnotations, err := c.marshalRecord(append(al, &Object{Key: k, Value: v}))
if err != nil {
err = perrors.WithMessage(err, "marshal record")
return
}
newPod.Annotations[DubboIOAnnotationKey] = newAnnotations
return
}
// getPatch
// get the kubernetes pod patch bytes
func (c *Client) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
oldData, err := json.Marshal(oldPod)
if err != nil {
return nil, perrors.WithMessage(err, "marshal old pod")
}
newData, err := json.Marshal(newPod)
if err != nil {
return nil, perrors.WithMessage(err, "marshal newPod pod")
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
if err != nil {
return nil, perrors.WithMessage(err, "create two-way-merge-patch")
}
return patchBytes, nil
}
// GetChildren
// get k children list from kubernetes-store
func (c *Client) GetChildren(k string) ([]string, []string, error) {
objectList, err := c.store.Get(k, true)
if err != nil {
return nil, nil, perrors.WithMessagef(err, "get children from store on (%s)", k)
}
var kList []string
var vList []string
for _, o := range objectList {
kList = append(kList, o.Key)
vList = append(vList, o.Value)
}
return kList, vList, nil
}
// Watch
// watch on spec key
func (c *Client) Watch(k string) (<-chan *Object, error) {
debug.PrintStack()
w, err := c.store.Watch(k, false)
if err != nil {
return nil, perrors.WithMessagef(err, "watch on (%s)", k)
}
return w.ResultChan(), nil
}
// Watch
// watch on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (<-chan *Object, error) {
w, err := c.store.Watch(prefix, true)
if err != nil {
return nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
}
return w.ResultChan(), nil
}
// Valid
// Valid the client
// if return false, the client is die
func (c *Client) Valid() bool {
select {
case <-c.Done():
return false
default:
return true
}
}
// Done
// read the client status
func (c *Client) Done() <-chan struct{} {
return c.ctx.Done()
}
// Stop
// read the client status
func (c *Client) Close() {
select {
case <-c.ctx.Done():
//already stopped
return
default:
}
c.cancel()
// the client ctx be canceled
// will trigger the store watchers all stopped
// so, just wait
c.wg.Wait()
}
// ValidateClient
// validate the kubernetes client
func ValidateClient(container clientFacade) error {
lock := container.ClientLock()
lock.Lock()
defer lock.Unlock()
// new Client
if container.Client() == nil {
ns, err := getCurrentNameSpace()
if err != nil {
return perrors.WithMessage(err, "get current namespace")
}
newClient, err := newClient(ns)
if err != nil {
logger.Warnf("new kubernetes client (namespace{%s}: %v)", ns, err)
return perrors.WithMessagef(err, "new kubernetes client (:%+v)", ns)
}
container.SetClient(newClient)
}
if !container.Client().Valid() {
ns, err := getCurrentNameSpace()
if err != nil {
return perrors.WithMessage(err, "get current namespace")
}
newClient, err := newClient(ns)
if err != nil {
logger.Warnf("new kubernetes client (namespace{%s}: %v)", ns, err)
return perrors.WithMessagef(err, "new kubernetes client (:%+v)", ns)
}
container.SetClient(newClient)
}
return nil
}
package kubernetes
import (
"sync"
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
const (
ConnDelay = 3
MaxFailTimes = 15
)
type clientFacade interface {
Client() *Client
SetClient(*Client)
ClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup
GetDone() chan struct{}
RestartCallBack() bool
common.Node
}
func HandleClientRestart(r clientFacade) {
var (
err error
failTimes int
)
defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.GetDone():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...")
break LOOP
// re-register all services
case <-r.Client().Done():
r.ClientLock().Lock()
r.Client().Close()
r.SetClient(nil)
r.ClientLock().Unlock()
// try to connect to kubernetes,
failTimes = 0
for {
select {
case <-r.GetDone():
logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
}
err = ValidateClient(r)
logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
}
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
}
}
}
}
package kubernetes
import (
"sync"
"time"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/remoting"
)
type EventListener struct {
client *Client
keyMapLock sync.Mutex
keyMap map[string]struct{}
wg sync.WaitGroup
}
func NewEventListener(client *Client) *EventListener {
return &EventListener{
client: client,
keyMap: make(map[string]struct{}),
}
}
// Listen on a spec key
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
for {
wc, err := l.client.Watch(key)
if err != nil {
logger.Warnf("watch exist{key:%s} = error{%v}", key, err)
return false
}
select {
// client stopped
case <-l.client.Done():
logger.Warnf("kubernetes client stopped")
return false
// handle kubernetes-store events
case e, ok := <-wc:
if !ok {
logger.Warnf("kubernetes-store watch-chan closed")
return false
}
if l.handleEvents(e, listener...) {
// if event is delete
return true
}
}
}
return false
}
// return true mean the event type is DELETE
// return false mean the event type is CREATE || UPDATE
func (l *EventListener) handleEvents(event *Object, listeners ...remoting.DataListener) bool {
logger.Infof("got a kubernetes-store event {type: %d, key: %s}", event.EventType, event.Key)
switch event.EventType {
case Create:
for _, listener := range listeners {
logger.Infof("kubernetes-store get event (key{%s}) = event{EventNodeDataCreated}", event.Key)
listener.DataChange(remoting.Event{
Path: string(event.Key),
Action: remoting.EventTypeAdd,
Content: string(event.Value),
})
}
return false
case Update:
for _, listener := range listeners {
logger.Infof("kubernetes-store get event (key{%s}) = event{EventNodeDataChanged}", event.Key)
listener.DataChange(remoting.Event{
Path: string(event.Key),
Action: remoting.EventTypeUpdate,
Content: string(event.Value),
})
}
return false
case Delete:
logger.Warnf("kubernetes-store get event (key{%s}) = event{EventNodeDeleted}", event.Key)
return true
default:
return false
}
}
// Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
l.wg.Add(1)
defer l.wg.Done()
for {
wc, err := l.client.WatchWithPrefix(prefix)
if err != nil {
logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
}
select {
// client stopped
case <-l.client.Done():
logger.Warnf("kubernetes client stopped")
return
// kuberentes-store event stream
case e, ok := <-wc:
if !ok {
logger.Warnf("kubernetes-store watch-chan closed")
return
}
l.handleEvents(e, listener...)
}
}
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// this func is invoked by kubernetes ConsumerRegistry::Registry/ kubernetes ConsumerRegistry::get/kubernetes ConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
l.keyMapLock.Lock()
_, ok := l.keyMap[key]
l.keyMapLock.Unlock()
if ok {
logger.Warnf("kubernetes-store key %s has already been listened.", key)
return
}
l.keyMapLock.Lock()
l.keyMap[key] = struct{}{}
l.keyMapLock.Unlock()
keyList, valueList, err := l.client.GetChildren(key)
if err != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children"))
}
logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)
for i, k := range keyList {
logger.Infof("got children list key -> %s", k)
listener.DataChange(remoting.Event{
Path: k,
Action: remoting.EventTypeAdd,
Content: valueList[i],
})
}
logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-store", key)
go func(key string, listener remoting.DataListener) {
l.ListenServiceNodeEventWithPrefix(key, listener)
logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
}(key, listener)
logger.Infof("listen dubbo service key{%s}", key)
go func(key string) {
if l.ListenServiceNodeEvent(key) {
listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
}
logger.Warnf("listenSelf(kubernetes key{%s}) goroutine exit now", key)
}(key)
}
func (l *EventListener) Close() {
l.wg.Wait()
}
package kubernetes
import (
"context"
"strconv"
"strings"
"sync"
)
import (
perrors "github.com/pkg/errors"
)
var (
ErrStoreAlreadyStopped = perrors.New("the store already be stopped")
ErrKVPairNotFound = perrors.New("k/v pair not found")
)
const (
defaultWatcherChanSize = 100
)
type eventType int
const (
Create eventType = iota
Update
Delete
)
func (e eventType) String() string {
switch e {
case Create:
return "CREATE"
case Update:
return "UPDATE"
case Delete:
return "DELETE"
default:
return "UNKNOWN"
}
}
// Object
// object is element in store
type Object struct {
// event-type
EventType eventType `json:"-"`
// the dubbo-go should consume the key
Key string `json:"k"`
// the dubbo-go should consume the value
Value string `json:"v"`
}
// Watchable Store
type Store interface {
// put the object to the store
Put(object *Object) error
// if prefix is false,
// the len([]*Object) == 0
Get(key string, prefix bool) ([]*Object, error)
// watch the spec key or key prefix
Watch(key string, prefix bool) (Watcher, error)
// check the store status
Done() <-chan struct{}
}
// Stopped Watcher
type Watcher interface {
// the watcher's id
ID() string
// result stream
ResultChan() <-chan *Object
// Stop the watcher
stop()
// check the watcher status
done() <-chan struct{}
}
// the store
type storeImpl struct {
// Client's ctx, client die, the store will die too
ctx context.Context
// protect store and watchers
lock sync.RWMutex
// the key is dubbo-go interest meta
cache map[string]*Object
currentWatcherId uint64
watchers map[uint64]*watcher
}
func (s *storeImpl) loop() {
select {
case <-s.ctx.Done():
// parent ctx be canceled, close the store
s.lock.Lock()
defer s.lock.Unlock()
for _, w := range s.watchers {
// stop data stream
close(w.ch)
// stop watcher
w.stop()
}
}
}
// Watch
// watch on spec key, with or without prefix
func (s *storeImpl) Watch(key string, prefix bool) (Watcher, error) {
return s.addWatcher(key, prefix)
}
// Done
// get the store status
func (s *storeImpl) Done() <-chan struct{} {
return s.ctx.Done()
}
// Put
// put the object to store
func (s *storeImpl) Put(object *Object) error {
sendMsg := func(object *Object, w *watcher) {
s.lock.Lock()
defer s.lock.Unlock()
select {
case <-w.done():
// the watcher already stop
case w.ch <- object:
// block send the msg
}
}
s.lock.Lock()
defer s.lock.Unlock()
if err := s.valid(); err != nil {
return err
}
// put to store
if object.EventType == Delete {
delete(s.cache, object.Key)
} else {
s.cache[object.Key] = object
}
// notify watcher
for _, w := range s.watchers {
if !strings.Contains(object.Key, w.interested.key) {
// this watcher no interest in this element
continue
}
if !w.interested.prefix {
if object.Key == w.interested.key {
go sendMsg(object, w)
}
// not interest
continue
}
go sendMsg(object, w)
}
return nil
}
// valid
// valid the client status
// NOTICE:
// should protected by lock
func (s *storeImpl) valid() error {
select {
case <-s.ctx.Done():
return ErrStoreAlreadyStopped
default:
return nil
}
}
// addWatcher
func (s *storeImpl) addWatcher(key string, prefix bool) (Watcher, error) {
w := &watcher{
store: s,
interested: struct {
key string
prefix bool
}{key: key, prefix: prefix},
ch: make(chan *Object, defaultWatcherChanSize),
exit: make(chan struct{}),
}
s.lock.Lock()
defer s.lock.Unlock()
if err := s.valid(); err != nil {
return nil, err
}
s.watchers[s.currentWatcherId] = w
w.id = s.currentWatcherId
s.currentWatcherId = s.currentWatcherId + 1
return w, nil
}
// Get
// get elements from cache
func (s *storeImpl) Get(key string, prefix bool) ([]*Object, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if err := s.valid(); err != nil {
return nil, err
}
if !prefix {
for k, v := range s.cache {
if k == key {
return []*Object{v}, nil
}
}
// object
return nil, ErrKVPairNotFound
}
var out []*Object
for k, v := range s.cache {
if strings.Contains(k, key) {
out = append(out, v)
}
}
if len(out) == 0 {
return nil, ErrKVPairNotFound
}
return out, nil
}
// the store watcher
type watcher struct {
id uint64
// the underlay store
store *storeImpl
// the interest topic
interested struct {
key string
prefix bool
}
ch chan *Object
closeOnce sync.Once
exit chan struct{}
}
// ResultChan
func (w *watcher) ResultChan() <-chan *Object {
return w.ch
}
// ID
// the watcher's id
func (w *watcher) ID() string {
return strconv.FormatUint(w.id, 10)
}
// stop
// stop the watcher
func (w *watcher) stop() {
// double close will panic
w.closeOnce.Do(func() {
close(w.exit)
})
}
// done
// check watcher status
func (w *watcher) done() <-chan struct{} {
return w.exit
}
// newStore
// new store from parent context
func newStore(ctx context.Context) Store {
s := &storeImpl{
ctx: ctx,
cache: map[string]*Object{},
watchers: map[uint64]*watcher{},
}
go s.loop()
return s
}
package kubernetes
import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"
)
func TestStore(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
s := newStore(ctx)
wg := sync.WaitGroup{}
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
w, err := s.Watch("key-1", false)
if err != nil {
fmt.Println("watch spec result", err)
return
}
for e := range w.ResultChan() {
fmt.Printf("consumer %s got %s\n", w.ID(), e.Key)
}
}()
}
for i := 2; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
w, err := s.Watch("key", true)
if err != nil {
fmt.Println("watch prefix result", err)
return
}
for e := range w.ResultChan() {
fmt.Printf("prefix consumer %s got %s\n", w.ID(), e.Key)
}
}()
}
for i := 0; i < 5; i++ {
go func(i int) {
if err := s.Put(&Object{
Key: "key-" + strconv.Itoa(i),
Value: strconv.Itoa(i),
}); err != nil {
t.Fatal(err)
}
}(i)
}
wg.Wait()
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment