Skip to content
Snippets Groups Projects
Commit 26597d8f authored by lzp0412's avatar lzp0412
Browse files

support nacos registry

parent 58a6aa06
No related branches found
No related tags found
No related merge requests found
......@@ -84,3 +84,14 @@ const (
ProviderConfigPrefix = "dubbo.provider."
ConsumerConfigPrefix = "dubbo.consumer."
)
const (
NACOS_DEFAULT_ROLETYPE = 3
NACOS_CACHE_DIR_KEY = "cacheDir"
NACOS_LOG_DIR_KEY = "logDir"
NACOS_ENDPOINT = "endpoint"
NACOS_SERVICE_NAME_SEPARATOR = ":"
NACOS_CATEGORY_KEY = "category"
NACOS_PROTOCOL_KEY = "protocol"
NACOS_PATH_KEY = "path"
)
......@@ -157,6 +157,7 @@ func (refconfig *ReferenceConfig) getUrlMap() url.Values {
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(refconfig.Retries, 10))
urlMap.Set(constant.GROUP_KEY, refconfig.Group)
urlMap.Set(constant.VERSION_KEY, refconfig.Version)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
//getty invoke async or sync
urlMap.Set(constant.ASYNC_KEY, strconv.FormatBool(refconfig.async))
......
......@@ -36,9 +36,10 @@ type RegistryConfig struct {
TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second
Group string `yaml:"group" json:"group,omitempty" property:"group"`
//for registry
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Address string `yaml:"address" json:"address,omitempty" property:"address"`
Username string `yaml:"username" json:"username,omitempty" property:"username"`
Password string `yaml:"password" json:"password,omitempty" property:"password"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
}
func (*RegistryConfig) Prefix() string {
......@@ -109,6 +110,8 @@ func (regconfig *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType)))
urlMap.Set(constant.REGISTRY_KEY, regconfig.Protocol)
urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, regconfig.TimeoutStr)
for k, v := range regconfig.Params {
urlMap.Set(k, v)
}
return urlMap
}
......@@ -163,6 +163,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.RETRIES_KEY, strconv.FormatInt(srvconfig.Retries, 10))
urlMap.Set(constant.GROUP_KEY, srvconfig.Group)
urlMap.Set(constant.VERSION_KEY, srvconfig.Version)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
//application info
urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
......
module github.com/apache/dubbo-go
require (
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/dubbogo/getty v1.2.2
github.com/dubbogo/gost v1.1.1
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/golang/mock v1.3.1 // indirect
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 // indirect
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/magiconair/properties v1.8.1
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
github.com/pkg/errors v0.8.1
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/stretchr/testify v1.3.0
github.com/tebeka/strftime v0.1.3 // indirect
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect
go.uber.org/atomic v1.4.0
go.uber.org/zap v1.10.0
gopkg.in/yaml.v2 v2.2.2
......
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0=
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ=
github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8 h1:7zJlM+8bpCAUhv03TZnXkT4MLlLWng1s7An8CLuN73E=
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190731020727-1697039810c8/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 h1:D21IyuvjDCshj1/qq+pCNd3VZOAEI9jy6Bi131YlXgI=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
......@@ -8,21 +14,67 @@ github.com/dubbogo/getty v1.2.2 h1:qDC9WXjxcs5NPvWZz2ruVKBKr2r1Jjm6i0Sq//CQwbE=
github.com/dubbogo/getty v1.2.2/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU=
github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU=
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw=
github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.5 h1:gL2yXlmiIo4+t+y32d4WGwOjKGYcGOuyrg46vadswDE=
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo=
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE=
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f h1:sgUSP4zdTUZYZgAGGtN5Lxk92rK+JUFOwf+FT99EEI4=
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod h1:UGmTpUd3rjbtfIpwAPrcfmGf/Z1HS95TATB+m57TPB8=
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 h1:Bvq8AziQ5jFF4BHGAEDSqwPW1NJS3XshxbRCxtjFAZc=
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb h1:lbmvw8r9W55w+aQgWn35W1nuleRIECMoqUrmwAOAvoI=
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY=
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto=
github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ=
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D4Jj5eZ41Zm3IH/J8OElK1Qtd7tVKAwLk=
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
......@@ -30,13 +82,22 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 h1:kcXqo9vE6fsZY5X5Rd7R1l7fTgnWaDCVmln65REefiE=
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
package nacos
import (
"bytes"
"net/url"
"reflect"
"strconv"
"sync"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)
type nacosListener struct {
sync.Mutex
namingClient naming_client.INamingClient
listenUrl common.URL
events chan *remoting.ConfigChangeEvent
hostMapInstance map[string]model.Instance
done chan struct{}
subscribeParam *vo.SubscribeParam
}
func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) {
listener := &nacosListener{namingClient: namingClient, listenUrl: url, events: make(chan *remoting.ConfigChangeEvent, 32), hostMapInstance: map[string]model.Instance{}, done: make(chan struct{})}
err := listener.startListen()
return listener, err
}
func generateInstance(ss model.SubscribeService) model.Instance {
return model.Instance{
InstanceId: ss.InstanceId,
Ip: ss.Ip,
Port: ss.Port,
ServiceName: ss.ServiceName,
Valid: ss.Valid,
Enable: ss.Enable,
Weight: ss.Weight,
Metadata: ss.Metadata,
ClusterName: ss.ClusterName,
}
}
func generateUrl(instance model.Instance) *common.URL {
if instance.Metadata == nil {
logger.Errorf("nacos instance metadata is empty,instance:%+v", instance)
return nil
}
path := instance.Metadata["path"]
myInterface := instance.Metadata["interface"]
if path == "" && myInterface == "" {
logger.Errorf("nacos instance metadata does not have both path key and interface key,instance:%+v", instance)
return nil
}
if path == "" && myInterface != "" {
path = "/" + myInterface
}
protocol := instance.Metadata["protocol"]
if protocol == "" {
logger.Errorf("nacos instance metadata does not have protocol key,instance:%+v", instance)
return nil
}
urlMap := url.Values{}
for k, v := range instance.Metadata {
urlMap.Set(k, v)
}
return common.NewURLWithOptions(common.WithIp(instance.Ip), common.WithPort(strconv.Itoa(int(instance.Port))), common.WithProtocol(protocol), common.WithParams(urlMap), common.WithPath(path))
}
func (nl *nacosListener) Callback(services []model.SubscribeService, err error) {
if err != nil {
logger.Errorf("nacos subscribe callback error:%s ", err.Error())
return
}
nl.Lock()
defer nl.Unlock()
var addInstances []model.Instance
var delInstances []model.Instance
var updateInstances []model.Instance
newInstanceMap := map[string]model.Instance{}
for _, s := range services {
if !s.Enable || !s.Valid {
//实例不可以用
continue
}
host := s.Ip + ":" + strconv.Itoa(int(s.Port))
instance := generateInstance(s)
newInstanceMap[host] = instance
if old, ok := nl.hostMapInstance[host]; !ok {
//新增实例节点
addInstances = append(addInstances, instance)
} else {
//实例更新
if !reflect.DeepEqual(old, instance) {
updateInstances = append(updateInstances, instance)
}
}
}
//判断旧的实例是否在新实例列表中,不存在则代表实例已下线
for host, inst := range nl.hostMapInstance {
if _, ok := newInstanceMap[host]; !ok {
delInstances = append(delInstances, inst)
}
}
nl.hostMapInstance = newInstanceMap
for _, add := range addInstances {
newUrl := generateUrl(add)
if newUrl != nil {
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeAdd})
}
}
for _, del := range delInstances {
newUrl := generateUrl(del)
if newUrl != nil {
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EventTypeDel})
}
}
for _, update := range updateInstances {
newUrl := generateUrl(update)
if newUrl != nil {
nl.process(&remoting.ConfigChangeEvent{Value: *newUrl, ConfigType: remoting.EvnetTypeUpdate})
}
}
}
func getSubscribeName(url common.URL) string {
var buffer bytes.Buffer
buffer.Write([]byte(common.DubboNodes[common.PROVIDER]))
appendParam(&buffer, url, constant.INTERFACE_KEY)
appendParam(&buffer, url, constant.VERSION_KEY)
appendParam(&buffer, url, constant.GROUP_KEY)
return buffer.String()
}
func (nl *nacosListener) startListen() error {
if nl.namingClient == nil {
return perrors.New("nacos naming client stopped")
}
serviceName := getSubscribeName(nl.listenUrl)
nl.subscribeParam = &vo.SubscribeParam{ServiceName: serviceName, SubscribeCallback: nl.Callback}
return nl.namingClient.Subscribe(nl.subscribeParam)
}
func (nl *nacosListener) stopListen() error {
return nl.namingClient.Unsubscribe(nl.subscribeParam)
}
func (nl *nacosListener) process(configType *remoting.ConfigChangeEvent) {
nl.events <- configType
}
func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-nl.done:
logger.Warnf("nacos listener is close!")
return nil, perrors.New("listener stopped")
case e := <-nl.events:
logger.Debugf("got nacos event %s", e)
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}
func (nl *nacosListener) Close() {
nl.stopListen()
close(nl.done)
}
package nacos
import (
"bytes"
"net"
"strconv"
"strings"
"time"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/registry"
)
var (
localIP = ""
)
func init() {
localIP, _ = utils.GetLocalIP()
extension.SetRegistry("nacos", newNacosRegistry)
}
type nacosRegistry struct {
*common.URL
namingClient naming_client.INamingClient
}
func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
if url == nil {
return nil, perrors.New("url is empty!")
}
if url.Location == "" {
return nil, perrors.New("url.location is empty!")
}
configMap := make(map[string]interface{})
var serverConfigs []nacosConstant.ServerConfig
addresses := strings.Split(url.Location, ",")
for _, addr := range addresses {
ip, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, perrors.WithMessagef(err, "split [%s] ", addr)
}
port, _ := strconv.Atoi(portStr)
serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
IpAddr: ip,
Port: uint64(port),
})
}
configMap["serverConfigs"] = serverConfigs
var clientConfig nacosConstant.ClientConfig
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
return nil, err
}
clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000)
clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
clientConfig.NotLoadCacheAtStart = true
configMap["clientConfig"] = clientConfig
return configMap, nil
}
func newNacosRegistry(url *common.URL) (registry.Registry, error) {
nacosConfig, err := getNacosConfig(url)
if err != nil {
return nil, err
}
client, err := clients.CreateNamingClient(nacosConfig)
if err != nil {
return nil, err
}
registry := nacosRegistry{
URL: url,
namingClient: client,
}
return &registry, nil
}
func getCategory(url common.URL) string {
role, _ := strconv.Atoi(url.GetParam(constant.ROLE_KEY, strconv.Itoa(constant.NACOS_DEFAULT_ROLETYPE)))
category := common.DubboNodes[role]
return category
}
func getServiceName(url common.URL) string {
var buffer bytes.Buffer
buffer.Write([]byte(getCategory(url)))
appendParam(&buffer, url, constant.INTERFACE_KEY)
appendParam(&buffer, url, constant.VERSION_KEY)
appendParam(&buffer, url, constant.GROUP_KEY)
return buffer.String()
}
func appendParam(target *bytes.Buffer, url common.URL, key string) {
value := url.GetParam(key, "")
if strings.TrimSpace(value) != "" {
target.Write([]byte(constant.NACOS_SERVICE_NAME_SEPARATOR))
target.Write([]byte(value))
}
}
func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstanceParam {
category := getCategory(url)
params := map[string]string{}
for k, _ := range url.Params {
params[k] = url.Params.Get(k)
}
params[constant.NACOS_CATEGORY_KEY] = category
params[constant.NACOS_PROTOCOL_KEY] = url.Protocol
params[constant.NACOS_PATH_KEY] = url.Path
if url.Ip == "" {
url.Ip = localIP
}
if url.Port == "" || url.Port == "0" {
url.Port = "80"
}
port, _ := strconv.Atoi(url.Port)
instance := vo.RegisterInstanceParam{
Ip: url.Ip,
Port: uint64(port),
Metadata: params,
Weight: 1,
Enable: true,
Healthy: true,
Ephemeral: true,
ServiceName: serviceName,
}
return instance
}
func (nr *nacosRegistry) Register(url common.URL) error {
serviceName := getServiceName(url)
param := createRegisterParam(url, serviceName)
isRegistry, err := nr.namingClient.RegisterInstance(param)
if err != nil {
return err
}
if !isRegistry {
return perrors.New("registry to nacos failed")
}
return nil
}
func (nr *nacosRegistry) Subscribe(conf common.URL) (registry.Listener, error) {
return NewNacosListener(conf, nr.namingClient)
}
func (nr *nacosRegistry) GetUrl() common.URL {
return *nr.URL
}
func (nr *nacosRegistry) IsAvailable() bool {
return true
}
func (nr *nacosRegistry) Destroy() {
return
}
package nacos
import (
"context"
"encoding/json"
"net/url"
"strconv"
"testing"
)
import (
"github.com/nacos-group/nacos-sdk-go/vo"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
func Test_Register(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider")
urlMap.Set(constant.VERSION_KEY, "1.0.0")
urlMap.Set(constant.CLUSTER_KEY, "mock")
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"}))
reg, err := newNacosRegistry(&regurl)
assert.Nil(t, err)
if err != nil {
t.Errorf("new nacos registry error:%s \n", err.Error())
return
}
err = reg.Register(url)
assert.Nil(t, err)
if err != nil {
t.Errorf("register error:%s \n", err.Error())
return
}
nacosReg := reg.(*nacosRegistry)
service, _ := nacosReg.namingClient.GetService(vo.GetServiceParam{ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"})
data, _ := json.Marshal(service)
t.Logf(string(data))
assert.Equal(t, 1, len(service.Hosts))
}
func TestNacosRegistry_Subscribe(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider")
urlMap.Set(constant.VERSION_KEY, "1.0.0")
urlMap.Set(constant.CLUSTER_KEY, "mock")
urlMap.Set(constant.NACOS_PATH_KEY, "")
url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"}))
reg, _ := newNacosRegistry(&regurl)
err := reg.Register(url)
assert.Nil(t, err)
if err != nil {
t.Errorf("new nacos registry error:%s \n", err.Error())
return
}
regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
reg2, _ := newNacosRegistry(&regurl)
listener, err := reg2.Subscribe(url)
assert.Nil(t, err)
if err != nil {
t.Errorf("subscribe error:%s \n", err.Error())
return
}
serviceEvent, _ := listener.Next()
assert.NoError(t, err)
if err != nil {
t.Errorf("listener error:%s \n", err.Error())
return
}
t.Logf("serviceEvent:%+v \n", serviceEvent)
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
}
func TestNacosRegistry_Subscribe_del(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider")
urlMap.Set(constant.VERSION_KEY, "1.0.0")
urlMap.Set(constant.CLUSTER_KEY, "mock")
urlMap.Set(constant.NACOS_PATH_KEY, "")
url1, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"}))
url2, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.2:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"}))
reg, _ := newNacosRegistry(&regurl)
err := reg.Register(url1)
assert.Nil(t, err)
if err != nil {
t.Errorf("register1 error:%s \n", err.Error())
return
}
err = reg.Register(url2)
assert.Nil(t, err)
if err != nil {
t.Errorf("register2 error:%s \n", err.Error())
return
}
regurl.Params.Set(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
reg2, _ := newNacosRegistry(&regurl)
listener, err := reg2.Subscribe(url1)
assert.Nil(t, err)
if err != nil {
t.Errorf("subscribe error:%s \n", err.Error())
return
}
serviceEvent1, _ := listener.Next()
assert.NoError(t, err)
if err != nil {
t.Errorf("listener1 error:%s \n", err.Error())
return
}
t.Logf("serviceEvent1:%+v \n", serviceEvent1)
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent1.String())
serviceEvent2, _ := listener.Next()
assert.NoError(t, err)
if err != nil {
t.Errorf("listener2 error:%s \n", err.Error())
return
}
t.Logf("serviceEvent2:%+v \n", serviceEvent2)
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent2.String())
nacosReg := reg.(*nacosRegistry)
//手动注销实例
nacosReg.namingClient.DeregisterInstance(vo.DeregisterInstanceParam{Ip: "127.0.0.2", Port: 20000, ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"})
serviceEvent3, _ := listener.Next()
assert.NoError(t, err)
if err != nil {
return
}
t.Logf("serviceEvent3:%+v \n", serviceEvent3)
assert.Regexp(t, ".*ServiceEvent{Action{delete}.*", serviceEvent3.String())
}
func TestNacosListener_Close(t *testing.T) {
regurl, _ := common.NewURL(context.TODO(), "registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider2")
urlMap.Set(constant.VERSION_KEY, "1.0.0")
urlMap.Set(constant.CLUSTER_KEY, "mock")
urlMap.Set(constant.NACOS_PATH_KEY, "")
url1, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider2", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"}))
reg, _ := newNacosRegistry(&regurl)
listener, err := reg.Subscribe(url1)
assert.Nil(t, err)
if err != nil {
t.Errorf("subscribe error:%s \n", err.Error())
return
}
listener.Close()
_, err = listener.Next()
assert.NotNil(t, err)
}
......@@ -52,6 +52,7 @@ const (
var serviceEventTypeStrings = [...]string{
"add",
"delete",
"update",
}
func (t EventType) String() string {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment