diff --git a/common/constant/default.go b/common/constant/default.go index 3c889158e460031f06b9401008c80f55200a46e4..6b9d914c87d3d7c1018647756c59aa874e1d63f1 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -74,3 +74,7 @@ const ( const ( COMMA_SPLIT_PATTERN = "\\s*[,]+\\s*" ) + +const ( + SIMPLE_METADATA_SERVICE_NAME = "MetadataService" +) diff --git a/common/constant/key.go b/common/constant/key.go index 6e91750ddd0309545f72ce71d4317af17fa7993e..eafed2eeeb88e0606ac92766c8dd84ff6b42b58e 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -80,6 +80,10 @@ const ( PROVIDER_SHUTDOWN_FILTER = "pshutdown" CONSUMER_SHUTDOWN_FILTER = "cshutdown" PID_KEY = "pid" + SYNC_REPORT_KEY = "sync.report" + RETRY_PERIOD_KEY = "retry.period" + RETRY_TIMES_KEY = "retry.times" + CYCLE_REPORT_KEY = "cycle.report" ) const ( @@ -213,9 +217,9 @@ const ( // consumer CONSUMER = "consumer" // key of access key id - ACCESS_KEY_ID_KEY = "accessKeyId" + ACCESS_KEY_ID_KEY = ".accessKeyId" // key of secret access key - SECRET_ACCESS_KEY_KEY = "secretAccessKey" + SECRET_ACCESS_KEY_KEY = ".secretAccessKey" ) // metadata report @@ -268,7 +272,7 @@ const ( // used by URL // SERVICE_NAME_MAPPING_KEY indicate that which service name mapping instance will be used - SERVICE_NAME_MAPPING_KEY = "name_mapping" + SERVICE_NAME_MAPPING_KEY = "name_mapping" // SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used SERVICE_DISCOVERY_KEY = "service_discovery" ) diff --git a/common/observer/event.go b/common/observer/event.go index 8c3362feeee3d62315eb734460f486dcdbfe2f36..d78179043e8a2059e1d5fd15878fe32d7596321e 100644 --- a/common/observer/event.go +++ b/common/observer/event.go @@ -58,7 +58,7 @@ func (b *BaseEvent) String() string { return fmt.Sprintf("BaseEvent[source = %#v]", b.Source) } -func newBaseEvent(source interface{}) *BaseEvent { +func NewBaseEvent(source interface{}) *BaseEvent { return &BaseEvent{ Source: source, Timestamp: time.Now(), diff --git a/common/observer/event_listener.go b/common/observer/event_listener.go index 8db60d8475da49262947329fc71fd8e364d8d0af..fabad3a6ffb10cb27a7cc31b1c08cffe5ed853e0 100644 --- a/common/observer/event_listener.go +++ b/common/observer/event_listener.go @@ -42,7 +42,6 @@ type ConditionalEventListener interface { Accept(e Event) bool } -// TODO (implement ConditionalEventListener) -type ServiceInstancesChangedListener struct { - ServiceName string +type ChangedNotify interface { + Notify(e Event) } diff --git a/config/instance/metedata_report.go b/config/instance/metedata_report.go index 9cf435bc9debf09d0707d0a3b5d699c79d2cafa7..8e833dd70bcc0db8e65cd8703f2bc1859432a887 100644 --- a/config/instance/metedata_report.go +++ b/config/instance/metedata_report.go @@ -28,14 +28,31 @@ import ( ) var ( - instance report.MetadataReport - once sync.Once + instance report.MetadataReport + reportUrl common.URL + once sync.Once ) -// GetMetadataReportInstance ... -func GetMetadataReportInstance(url *common.URL) report.MetadataReport { +// GetMetadataReportInstance will return the instance in lazy mode. Be careful the instance create will only +// execute once. +func GetMetadataReportInstance(selectiveUrl ...*common.URL) report.MetadataReport { once.Do(func() { - instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url) + var url *common.URL + if len(selectiveUrl) > 0 { + url = selectiveUrl[0] + instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url) + reportUrl = *url + } }) return instance } + +// GetMetadataReportUrl will return the report instance url +func GetMetadataReportUrl() common.URL { + return reportUrl +} + +// SetMetadataReportUrl will only can be used by unit test to mock url +func SetMetadataReportUrl(url common.URL) { + reportUrl = url +} diff --git a/go.mod b/go.mod index 490844c92913f4d0fc3737cc8260c0a53aea9db2..ba3cf3e2190de476592278c13ba09c9adaaf947d 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/dubbogo/go-zookeeper v1.0.0 github.com/dubbogo/gost v1.9.0 github.com/emicklei/go-restful/v3 v3.0.0 + github.com/go-co-op/gocron v0.1.1 github.com/go-resty/resty/v2 v2.1.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 diff --git a/go.sum b/go.sum index 304025f27087dd66c409592c0a36c1a893ac4d60..eb84bde1fb26d84ff5a3a3e36ebe178a63cfb4c9 100644 --- a/go.sum +++ b/go.sum @@ -134,6 +134,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-co-op/gocron v0.1.1 h1:OfDmkqkCguFtFMsm6Eaayci3DADLa8pXvdmOlPU/JcU= +github.com/go-co-op/gocron v0.1.1/go.mod h1:Y9PWlYqDChf2Nbgg7kfS+ZsXHDTZbMZYPEQ0MILqH+M= github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-ini/ini v1.25.4 h1:Mujh4R/dH6YL8bxuISne3xX2+qcQ9p0IxKAP6ExWoUo= @@ -149,6 +151,7 @@ github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+ github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg= github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc= github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I= +github.com/go-redis/redis v6.15.5+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-resty/resty/v2 v2.1.0 h1:Z6IefCpUMfnvItVJaJXWv/pMiiD11So35QgwEELsldE= github.com/go-resty/resty/v2 v2.1.0/go.mod h1:dZGr0i9PLlaaTD4H/hoZIDjQ+r6xq8mgbRzHZf7f2J8= github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4 h1:1LlmVz15APoKz9dnm5j2ePptburJlwEH+/v/pUuoxck= @@ -385,8 +388,6 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c h1:WoCa3AvgQMVKNs+RIFlWPRgY9QVJwUxJDrGxHs0fcRo= -github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo= github.com/nacos-group/nacos-sdk-go v0.3.1 h1:MI7bNDAN5m9UFcRRUTSPfJi4dCQo+TYG85qVB1rCHeg= github.com/nacos-group/nacos-sdk-go v0.3.1/go.mod h1:ESKb6yF0gxSc8GuS+0jaMBe+n8rJ5/k4ya6LyFG2xi8= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s= @@ -396,10 +397,14 @@ github.com/oklog/run v0.0.0-20180308005104-6934b124db28/go.mod h1:dlhp/R75TPv97u github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI= @@ -533,6 +538,8 @@ golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -541,6 +548,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go index 8d4a584ee53c7c5ebbc4cc2222134ca9e957068f..fa195d09d7efe022be9bdf40658e355a44b8705e 100644 --- a/metadata/definition/definition.go +++ b/metadata/definition/definition.go @@ -19,6 +19,9 @@ package definition import ( "bytes" + "encoding/json" + "fmt" + "strings" ) import ( @@ -26,6 +29,11 @@ import ( "github.com/apache/dubbo-go/common/constant" ) +// ServiceDefiner is a interface of service's definition +type ServiceDefiner interface { + ToBytes() ([]byte, error) +} + // ServiceDefinition is the describer of service definition type ServiceDefinition struct { CanonicalName string @@ -34,6 +42,36 @@ type ServiceDefinition struct { Types []TypeDefinition } +func (def *ServiceDefinition) ToBytes() ([]byte, error) { + return json.Marshal(def) +} + +func (def *ServiceDefinition) String() string { + var methodStr strings.Builder + for _, m := range def.Methods { + var paramType strings.Builder + for _, p := range m.ParameterTypes { + paramType.WriteString(fmt.Sprintf("{type:%v}", p)) + } + var param strings.Builder + for _, d := range m.Parameters { + param.WriteString(fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName)) + } + methodStr.WriteString(fmt.Sprintf("{name:%v,parameterTypes:[%v],returnType:%v,params:[%v] }", m.Name, paramType.String(), m.ReturnType, param.String())) + } + var types strings.Builder + for _, d := range def.Types { + types.WriteString(fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName)) + } + return fmt.Sprintf("{canonicalName:%v, codeSource:%v, methods:[%v], types:[%v]}", def.CanonicalName, def.CodeSource, methodStr.String(), types.String()) +} + +// FullServiceDefinition is the describer of service definition with parameters +type FullServiceDefinition struct { + ServiceDefinition + Params map[string]string +} + // MethodDefinition is the describer of method definition type MethodDefinition struct { Name string @@ -53,8 +91,8 @@ type TypeDefinition struct { } // BuildServiceDefinition can build service definition which will be used to describe a service -func BuildServiceDefinition(service common.Service, url common.URL) ServiceDefinition { - sd := ServiceDefinition{} +func BuildServiceDefinition(service common.Service, url common.URL) *ServiceDefinition { + sd := &ServiceDefinition{} sd.CanonicalName = url.Service() for k, m := range service.Method() { diff --git a/metadata/definition/definition_test.go b/metadata/definition/definition_test.go new file mode 100644 index 0000000000000000000000000000000000000000..958f9324d0f9f4a5027792bd8e54b238a5f56feb --- /dev/null +++ b/metadata/definition/definition_test.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package definition + +import ( + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +func TestBuildServiceDefinition(t *testing.T) { + serviceName := "com.ikurento.user.UserProvider" + group := "group1" + version := "0.0.1" + protocol := "dubbo" + beanName := "UserProvider" + url, err := common.NewURL(fmt.Sprintf( + "%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+ + "owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v", + protocol, serviceName, group, version, beanName)) + assert.NoError(t, err) + _, err = common.ServiceMap.Register(serviceName, protocol, &UserProvider{}) + assert.NoError(t, err) + service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + sd := BuildServiceDefinition(*service, url) + assert.Equal(t, "{canonicalName:com.ikurento.user.UserProvider, codeSource:, methods:[{name:GetUser,parameterTypes:[{type:slice}],returnType:ptr,params:[] }], types:[]}", sd.String()) +} diff --git a/metadata/definition/mock.go b/metadata/definition/mock.go new file mode 100644 index 0000000000000000000000000000000000000000..ca9e125a7480c2b6ff57d0b7cc820b537eb908f2 --- /dev/null +++ b/metadata/definition/mock.go @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package definition + +import ( + "context" + "time" +) + +type User struct { + Id string + Name string + Age int32 + Time time.Time +} + +type UserProvider struct { +} + +func (u *UserProvider) GetUser(ctx context.Context, req []interface{}) (*User, error) { + rsp := User{"A001", "Alex Stocks", 18, time.Now()} + return &rsp, nil +} + +func (u *UserProvider) Reference() string { + return "UserProvider" +} + +func (u User) JavaClassName() string { + return "com.ikurento.user.User" +} diff --git a/metadata/identifier/service_metadata_identifier.go b/metadata/identifier/service_metadata_identifier.go index 92c15704db3bb873b3aff26297643119f8835f45..7cdb55e53db6fdc6092f17e2688bf8078559cbec 100644 --- a/metadata/identifier/service_metadata_identifier.go +++ b/metadata/identifier/service_metadata_identifier.go @@ -18,6 +18,7 @@ package identifier import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" ) @@ -28,6 +29,18 @@ type ServiceMetadataIdentifier struct { BaseMetadataIdentifier } +func NewServiceMetadataIdentifier(url common.URL) *ServiceMetadataIdentifier { + return &ServiceMetadataIdentifier{ + BaseMetadataIdentifier: BaseMetadataIdentifier{ + ServiceInterface: url.Service(), + Version: url.GetParam(constant.VERSION_KEY, ""), + Group: url.GetParam(constant.GROUP_KEY, ""), + Side: url.GetParam(constant.SIDE_KEY, ""), + }, + Protocol: url.Protocol, + } +} + // GetIdentifierKey will return string format as service:Version:Group:Side:Protocol:"revision"+Revision func (mdi *ServiceMetadataIdentifier) GetIdentifierKey() string { return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.Protocol, constant.KEY_REVISON_PREFIX+mdi.Revision) diff --git a/metadata/identifier/subscribe_metadata_identifier.go b/metadata/identifier/subscribe_metadata_identifier.go index e599fc9e0da1962d60d0bde2646eed552e26e95d..fa35ab79d66fe39062462e6f5ae43b562a3c6a91 100644 --- a/metadata/identifier/subscribe_metadata_identifier.go +++ b/metadata/identifier/subscribe_metadata_identifier.go @@ -20,7 +20,7 @@ package identifier // SubscriberMetadataIdentifier is inherit baseMetaIdentifier with service params: Revision type SubscriberMetadataIdentifier struct { Revision string - BaseMetadataIdentifier + MetadataIdentifier } // GetIdentifierKey will return string format as service:Version:Group:Side:Revision diff --git a/metadata/identifier/subscribe_metadata_identifier_test.go b/metadata/identifier/subscribe_metadata_identifier_test.go index 9c9ef70641c52222a09475e97a2afbb604a467ff..215aa3c5691f20d7790029093372389ce620398c 100644 --- a/metadata/identifier/subscribe_metadata_identifier_test.go +++ b/metadata/identifier/subscribe_metadata_identifier_test.go @@ -27,11 +27,13 @@ import ( var subscribeMetadataId = &SubscriberMetadataIdentifier{ Revision: "1.0", - BaseMetadataIdentifier: BaseMetadataIdentifier{ - ServiceInterface: "org.apache.pkg.mockService", - Version: "1.0.0", - Group: "Group", - Side: "provider", + MetadataIdentifier: MetadataIdentifier{ + BaseMetadataIdentifier: BaseMetadataIdentifier{ + ServiceInterface: "org.apache.pkg.mockService", + Version: "1.0.0", + Group: "Group", + Side: "provider", + }, }, } diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go new file mode 100644 index 0000000000000000000000000000000000000000..cb7e42030b2dec32b0537b20e2f825e638f228d0 --- /dev/null +++ b/metadata/report/delegate/delegate_report.go @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package delegate + +import ( + "encoding/json" + "runtime/debug" + "sync" + "time" +) + +import ( + "github.com/go-co-op/gocron" + "go.uber.org/atomic" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config/instance" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" +) + +const ( + // defaultMetadataReportRetryTimes is defined for max times to retry + defaultMetadataReportRetryTimes int64 = 100 + // defaultMetadataReportRetryPeriod is defined for cycle interval to retry, the unit is second + defaultMetadataReportRetryPeriod int64 = 3 + // defaultMetadataReportRetryPeriod is defined for cycle report or not + defaultMetadataReportCycleReport bool = true +) + +// metadataReportRetry is a scheduler for retrying task +type metadataReportRetry struct { + retryPeriod int64 + retryLimit int64 + scheduler *gocron.Scheduler + job *gocron.Job + retryCounter *atomic.Int64 + // if no failed report, wait how many times to run retry task. + retryTimesIfNonFail int64 +} + +// newMetadataReportRetry will create a scheduler for retry task +func newMetadataReportRetry(retryPeriod int64, retryLimit int64, retryFunc func() bool) (*metadataReportRetry, error) { + s1 := gocron.NewScheduler(time.UTC) + + mrr := &metadataReportRetry{ + retryPeriod: retryPeriod, + retryLimit: retryLimit, + scheduler: s1, + retryCounter: atomic.NewInt64(0), + retryTimesIfNonFail: 600, + } + + newJob, err := mrr.scheduler.Every(uint64(mrr.retryPeriod)).Seconds().Do( + func() { + mrr.retryCounter.Inc() + logger.Infof("start to retry task for metadata report. retry times: %v", mrr.retryCounter.Load()) + if mrr.retryCounter.Load() > mrr.retryLimit { + mrr.scheduler.Clear() + } else if retryFunc() && mrr.retryCounter.Load() > mrr.retryTimesIfNonFail { + mrr.scheduler.Clear() // may not interrupt the running job + } + }) + + mrr.job = newJob + return mrr, err +} + +// startRetryTask will make scheduler with retry task run +func (mrr *metadataReportRetry) startRetryTask() { + mrr.scheduler.StartAt(time.Now().Add(500 * time.Millisecond)) + mrr.scheduler.Start() +} + +// MetadataReport is a absolute delegate for MetadataReport +type MetadataReport struct { + reportUrl common.URL + syncReport bool + metadataReportRetry *metadataReportRetry + + failedReports map[*identifier.MetadataIdentifier]interface{} + failedReportsLock sync.RWMutex + + // allMetadataReports store all the metdadata reports records in memory + allMetadataReports map[*identifier.MetadataIdentifier]interface{} + allMetadataReportsLock sync.RWMutex +} + +// NewMetadataReport will create a MetadataReport with initiation +func NewMetadataReport() (*MetadataReport, error) { + url := instance.GetMetadataReportUrl() + bmr := &MetadataReport{ + reportUrl: url, + syncReport: url.GetParamBool(constant.SYNC_REPORT_KEY, false), + failedReports: make(map[*identifier.MetadataIdentifier]interface{}, 4), + allMetadataReports: make(map[*identifier.MetadataIdentifier]interface{}, 4), + } + + mrr, err := newMetadataReportRetry( + url.GetParamInt(constant.RETRY_PERIOD_KEY, defaultMetadataReportRetryPeriod), + url.GetParamInt(constant.RETRY_TIMES_KEY, defaultMetadataReportRetryTimes), + bmr.retry, + ) + + if err != nil { + return nil, err + } + + bmr.metadataReportRetry = mrr + if url.GetParamBool(constant.CYCLE_REPORT_KEY, defaultMetadataReportCycleReport) { + scheduler := gocron.NewScheduler(time.UTC) + _, err := scheduler.Every(1).Day().Do( + func() { + logger.Info("start to publish all metadata in metadata report %v.", url) + bmr.allMetadataReportsLock.RLock() + bmr.doHandlerMetadataCollection(bmr.allMetadataReports) + bmr.allMetadataReportsLock.RUnlock() + + }) + if err != nil { + return nil, err + } + scheduler.StartAt(time.Now().Add(500 * time.Millisecond)) + scheduler.Start() + } + return bmr, nil +} + +// retry will do metadata failed reports collection by call metadata report sdk +func (mr *MetadataReport) retry() bool { + mr.failedReportsLock.RLock() + defer mr.failedReportsLock.RUnlock() + return mr.doHandlerMetadataCollection(mr.failedReports) +} + +// StoreProviderMetadata will delegate to call remote metadata's sdk to store provider service definition +func (mr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) { + if mr.syncReport { + mr.storeMetadataTask(common.PROVIDER, identifier, definer) + } + go mr.storeMetadataTask(common.PROVIDER, identifier, definer) +} + +// storeMetadataTask will delegate to call remote metadata's sdk to store +func (mr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) { + logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer) + mr.allMetadataReportsLock.Lock() + mr.allMetadataReports[identifier] = definer + mr.allMetadataReportsLock.Unlock() + + mr.failedReportsLock.Lock() + delete(mr.failedReports, identifier) + mr.failedReportsLock.Unlock() + // data is store the json marshaled definition + var ( + data []byte + err error + ) + + defer func() { + if r := recover(); r != nil { + mr.failedReportsLock.Lock() + mr.failedReports[identifier] = definer + mr.failedReportsLock.Unlock() + mr.metadataReportRetry.startRetryTask() + logger.Errorf("Failed to put provider metadata %v in %v, cause: %v\n%s\n", + identifier, string(data), r, string(debug.Stack())) + } + }() + + data, err = json.Marshal(definer) + if err != nil { + logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %+v", err) + panic(err) + } + report := instance.GetMetadataReportInstance() + if role == common.PROVIDER { + err = report.StoreProviderMetadata(identifier, string(data)) + } else if role == common.CONSUMER { + err = report.StoreConsumerMetadata(identifier, string(data)) + } + + if err != nil { + logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %+v", err) + panic(err) + } +} + +// StoreConsumerMetadata will delegate to call remote metadata's sdk to store consumer side service definition +func (mr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) { + if mr.syncReport { + mr.storeMetadataTask(common.CONSUMER, identifier, definer) + } + go mr.storeMetadataTask(common.CONSUMER, identifier, definer) +} + +// SaveServiceMetadata will delegate to call remote metadata's sdk to save service metadata +func (mr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error { + report := instance.GetMetadataReportInstance() + if mr.syncReport { + return report.SaveServiceMetadata(identifier, url) + } + go report.SaveServiceMetadata(identifier, url) + return nil +} + +// RemoveServiceMetadata will delegate to call remote metadata's sdk to remove service metadata +func (mr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error { + report := instance.GetMetadataReportInstance() + if mr.syncReport { + return report.RemoveServiceMetadata(identifier) + } + go report.RemoveServiceMetadata(identifier) + return nil +} + +// GetExportedURLs will delegate to call remote metadata's sdk to get exported urls +func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string { + report := instance.GetMetadataReportInstance() + return report.GetExportedURLs(identifier) +} + +// SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data +func (mr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { + report := instance.GetMetadataReportInstance() + if mr.syncReport { + return report.SaveSubscribedData(identifier, urls) + } + go report.SaveSubscribedData(identifier, urls) + return nil +} + +// GetSubscribedURLs will delegate to call remote metadata's sdk to get subscribed urls +func (MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) []string { + report := instance.GetMetadataReportInstance() + return report.GetSubscribedURLs(identifier) +} + +// GetServiceDefinition will delegate to call remote metadata's sdk to get service definitions +func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) string { + report := instance.GetMetadataReportInstance() + return report.GetServiceDefinition(identifier) +} + +// doHandlerMetadataCollection will store metadata to metadata support with given metadataMap +func (mr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool { + if len(metadataMap) == 0 { + return true + } + for e := range metadataMap { + if common.RoleType(common.PROVIDER).Role() == e.Side { + mr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition)) + } else if common.RoleType(common.CONSUMER).Role() == e.Side { + mr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string)) + } + } + return false +} diff --git a/metadata/report/delegate/delegate_report_test.go b/metadata/report/delegate/delegate_report_test.go new file mode 100644 index 0000000000000000000000000000000000000000..04c9e6483929d3ed58fd85337db6ccb4ebd53d00 --- /dev/null +++ b/metadata/report/delegate/delegate_report_test.go @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package delegate + +import ( + "fmt" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config/instance" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" +) + +func TestMetadataReport_MetadataReportRetry(t *testing.T) { + counter := atomic.NewInt64(1) + + retry, err := newMetadataReportRetry(1, 10, func() bool { + counter.Add(1) + return true + }) + assert.NoError(t, err) + retry.startRetryTask() + itsTime := time.After(2500 * time.Millisecond) + select { + case <-itsTime: + retry.scheduler.Clear() + assert.Equal(t, counter.Load(), int64(3)) + logger.Info("over") + } +} + +func TestMetadataReport_MetadataReportRetryWithLimit(t *testing.T) { + counter := atomic.NewInt64(1) + + retry, err := newMetadataReportRetry(1, 1, func() bool { + counter.Add(1) + return true + }) + assert.NoError(t, err) + retry.startRetryTask() + itsTime := time.After(2500 * time.Millisecond) + select { + case <-itsTime: + retry.scheduler.Clear() + assert.Equal(t, counter.Load(), int64(2)) + logger.Info("over") + } +} + +func mockNewMetadataReport(t *testing.T) *MetadataReport { + syncReportKey := "false" + retryPeroidKey := "3" + retryTimesKey := "100" + cycleReportKey := "true" + + url, err := common.NewURL(fmt.Sprintf( + "test://127.0.0.1:20000/?"+constant.SYNC_REPORT_KEY+"=%v&"+constant.RETRY_PERIOD_KEY+"=%v&"+ + constant.RETRY_TIMES_KEY+"=%v&"+constant.CYCLE_REPORT_KEY+"=%v", + syncReportKey, retryPeroidKey, retryTimesKey, cycleReportKey)) + assert.NoError(t, err) + instance.SetMetadataReportUrl(url) + mtr, err := NewMetadataReport() + assert.NoError(t, err) + assert.NotNil(t, mtr) + return mtr +} + +func TestMetadataReport_StoreProviderMetadata(t *testing.T) { + mtr := mockNewMetadataReport(t) + var metadataId = &identifier.MetadataIdentifier{ + Application: "app", + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: "com.ikurento.user.UserProvider", + Version: "0.0.1", + Group: "group1", + Side: "provider", + }, + } + + mtr.StoreProviderMetadata(metadataId, getMockDefinition(metadataId, t)) +} + +func getMockDefinition(id *identifier.MetadataIdentifier, t *testing.T) *definition.ServiceDefinition { + protocol := "dubbo" + beanName := "UserProvider" + url, err := common.NewURL(fmt.Sprintf( + "%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+ + "owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v", + protocol, id.ServiceInterface, id.Group, id.Version, beanName)) + assert.NoError(t, err) + _, err = common.ServiceMap.Register(id.ServiceInterface, protocol, &definition.UserProvider{}) + assert.NoError(t, err) + service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + return definition.BuildServiceDefinition(*service, url) +} diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go new file mode 100644 index 0000000000000000000000000000000000000000..9dbec518220c3692257d29b1f01d78b0894f249e --- /dev/null +++ b/metadata/report/nacos/report.go @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "encoding/json" + "net/url" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/clients/config_client" + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report" + "github.com/apache/dubbo-go/metadata/report/factory" + "github.com/apache/dubbo-go/remoting/nacos" +) + +func init() { + ftry := &nacosMetadataReportFactory{} + extension.SetMetadataReportFactory("nacos", func() factory.MetadataReportFactory { + return ftry + }) +} + +// nacosMetadataReport is the implementation of MetadataReport based Nacos +type nacosMetadataReport struct { + client config_client.IConfigClient +} + +// StoreProviderMetadata will store the metadata +func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error { + return n.storeMetadata(vo.ConfigParam{ + DataId: providerIdentifier.GetIdentifierKey(), + Group: providerIdentifier.Group, + Content: serviceDefinitions, + }) +} + +// StoreConsumerMetadata will store the metadata +func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error { + return n.storeMetadata(vo.ConfigParam{ + DataId: consumerMetadataIdentifier.GetIdentifierKey(), + Group: consumerMetadataIdentifier.Group, + Content: serviceParameterString, + }) +} + +// SaveServiceMetadata will store the metadata +func (n *nacosMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error { + return n.storeMetadata(vo.ConfigParam{ + DataId: metadataIdentifier.GetIdentifierKey(), + Group: metadataIdentifier.Group, + Content: url.String(), + }) +} + +// RemoveServiceMetadata will remove the service metadata +func (n *nacosMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error { + return n.deleteMetadata(vo.ConfigParam{ + DataId: metadataIdentifier.GetIdentifierKey(), + Group: metadataIdentifier.Group, + }) +} + +// GetExportedURLs will look up the exported urls. +// if not found, an empty list will be returned. +func (n *nacosMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) []string { + return n.getConfigAsArray(vo.ConfigParam{ + DataId: metadataIdentifier.GetIdentifierKey(), + Group: metadataIdentifier.Group, + }) +} + +// SaveSubscribedData will convert the urlList to json array and then store it +func (n *nacosMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urlList []common.URL) error { + if len(urlList) == 0 { + logger.Warnf("The url list is empty") + return nil + } + urlStrList := make([]string, 0, len(urlList)) + + for _, e := range urlList { + urlStrList = append(urlStrList, e.String()) + } + + bytes, err := json.Marshal(urlStrList) + + if err != nil { + return perrors.WithMessage(err, "Could not convert the array to json") + } + return n.storeMetadata(vo.ConfigParam{ + DataId: subscriberMetadataIdentifier.GetIdentifierKey(), + Group: subscriberMetadataIdentifier.Group, + Content: string(bytes), + }) +} + +// GetSubscribedURLs will lookup the url +// if not found, an empty list will be returned +func (n *nacosMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) []string { + return n.getConfigAsArray(vo.ConfigParam{ + DataId: subscriberMetadataIdentifier.GetIdentifierKey(), + Group: subscriberMetadataIdentifier.Group, + }) +} + +// GetServiceDefinition will lookup the service definition +func (n *nacosMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) string { + return n.getConfig(vo.ConfigParam{ + DataId: metadataIdentifier.GetIdentifierKey(), + Group: metadataIdentifier.Group, + }) +} + +// storeMetadata will publish the metadata to Nacos +// if failed or error is not nil, error will be returned +func (n *nacosMetadataReport) storeMetadata(param vo.ConfigParam) error { + res, err := n.client.PublishConfig(param) + if err != nil { + return perrors.WithMessage(err, "Could not publish the metadata") + } + if !res { + return perrors.New("Publish the metadata failed.") + } + return nil +} + +// deleteMetadata will delete the metadata +func (n *nacosMetadataReport) deleteMetadata(param vo.ConfigParam) error { + res, err := n.client.DeleteConfig(param) + if err != nil { + return perrors.WithMessage(err, "Could not delete the metadata") + } + if !res { + return perrors.New("Deleting the metadata failed.") + } + return nil +} + +// getConfigAsArray will read the config and then convert it as an one-element array +// error or config not found, an empty list will be returned. +func (n *nacosMetadataReport) getConfigAsArray(param vo.ConfigParam) []string { + cfg := n.getConfig(param) + res := make([]string, 0, 1) + if len(cfg) == 0 { + return res + } + decodeCfg, err := url.QueryUnescape(cfg) + if err != nil { + logger.Errorf("The config is invalid: %s", cfg) + } + res = append(res, decodeCfg) + return res +} + +// getConfig will read the config +func (n *nacosMetadataReport) getConfig(param vo.ConfigParam) string { + cfg, err := n.client.GetConfig(param) + if err != nil { + logger.Errorf("Finding the configuration failed: %v", param) + } + return cfg +} + +type nacosMetadataReportFactory struct { +} + +func (n *nacosMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport { + client, err := nacos.NewNacosConfigClient(url) + if err != nil { + logger.Errorf("Could not create nacos metadata report. URL: %s", url.String()) + return nil + } + return &nacosMetadataReport{client: client} +} diff --git a/metadata/report/nacos/report_test.go b/metadata/report/nacos/report_test.go new file mode 100644 index 0000000000000000000000000000000000000000..711e6281a295005501ad9384f6fc3433c2e2830d --- /dev/null +++ b/metadata/report/nacos/report_test.go @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report" +) + +func TestNacosMetadataReport_CRUD(t *testing.T) { + rpt := newTestReport() + assert.NotNil(t, rpt) + + providerMi := newMetadataIdentifier("server") + providerMeta := "provider" + err := rpt.StoreProviderMetadata(providerMi, providerMeta) + + consumerMi := newMetadataIdentifier("client") + consumerMeta := "consumer" + err = rpt.StoreConsumerMetadata(consumerMi, consumerMeta) + assert.Nil(t, err) + + serviceMi := newServiceMetadataIdentifier() + serviceUrl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + + err = rpt.SaveServiceMetadata(serviceMi, serviceUrl) + assert.Nil(t, err) + + exportedUrls := rpt.GetExportedURLs(serviceMi) + assert.Equal(t, 1, len(exportedUrls)) + + subMi := newSubscribeMetadataIdentifier() + urlList := make([]common.URL, 0, 1) + urlList = append(urlList, serviceUrl) + err = rpt.SaveSubscribedData(subMi, urlList) + assert.Nil(t, err) + + subscribeUrl := rpt.GetSubscribedURLs(subMi) + assert.Equal(t, 1, len(subscribeUrl)) + + err = rpt.RemoveServiceMetadata(serviceMi) + assert.Nil(t, err) + +} + +func newSubscribeMetadataIdentifier() *identifier.SubscriberMetadataIdentifier { + return &identifier.SubscriberMetadataIdentifier{ + Revision: "subscribe", + MetadataIdentifier: *newMetadataIdentifier("provider"), + } + +} + +func newServiceMetadataIdentifier() *identifier.ServiceMetadataIdentifier { + return &identifier.ServiceMetadataIdentifier{ + Protocol: "nacos", + Revision: "a", + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: "com.test.MyTest", + Version: "1.0.0", + Group: "test_group", + Side: "service", + }, + } +} + +func newMetadataIdentifier(side string) *identifier.MetadataIdentifier { + return &identifier.MetadataIdentifier{ + Application: "test", + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: "com.test.MyTest", + Version: "1.0.0", + Group: "test_group", + Side: side, + }, + } +} + +func TestNacosMetadataReportFactory_CreateMetadataReport(t *testing.T) { + res := newTestReport() + assert.NotNil(t, res) +} + +func newTestReport() report.MetadataReport { + regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + res := extension.GetMetadataReportFactory("nacos").CreateMetadataReport(®url) + return res +} diff --git a/metadata/report/report.go b/metadata/report/report.go index 81227e0c765b61df7edc8a5d025b9cd1921d1113..61cdda1f9663a9f4eaed157d7c0232e4e911c80d 100644 --- a/metadata/report/report.go +++ b/metadata/report/report.go @@ -19,18 +19,17 @@ package report import ( "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/metadata/definition" "github.com/apache/dubbo-go/metadata/identifier" ) // MetadataReport is an interface of remote metadata report type MetadataReport interface { - StoreProviderMetadata(*identifier.MetadataIdentifier, *definition.ServiceDefinition) - StoreConsumerMetadata(*identifier.MetadataIdentifier, map[string]string) - SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, *common.URL) - RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) + StoreProviderMetadata(*identifier.MetadataIdentifier, string) error + StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error + SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, common.URL) error + RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string - SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []*common.URL) + SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []common.URL) error GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string - GetServiceDefinition(*identifier.MetadataIdentifier) + GetServiceDefinition(*identifier.MetadataIdentifier) string } diff --git a/metadata/service/exporter/configurable/exporter.go b/metadata/service/exporter/configurable/exporter.go index 3d12e0ecd4def4b9d99f346a4f556fc3d781d1b2..ec3f8ec2d0a6f5baacfe962d06fe50fcf634a981 100644 --- a/metadata/service/exporter/configurable/exporter.go +++ b/metadata/service/exporter/configurable/exporter.go @@ -49,7 +49,7 @@ func NewMetadataServiceExporter(metadataService service.MetadataService) exporte func (exporter *MetadataServiceExporter) Export() error { if !exporter.IsExported() { - serviceConfig := config.NewServiceConfig("MetadataService", context.Background()) + serviceConfig := config.NewServiceConfig(constant.SIMPLE_METADATA_SERVICE_NAME, context.Background()) serviceConfig.Protocol = constant.DEFAULT_PROTOCOL serviceConfig.Protocols = map[string]*config.ProtocolConfig{ constant.DEFAULT_PROTOCOL: generateMetadataProtocol(), diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index c59949401f419b44ce155a914a7afff7c327a8fe..4b6f4330a1b88ecaad094df052bb7b53a96c6413 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -17,7 +17,6 @@ package inmemory import ( - "encoding/json" "sync" ) @@ -34,6 +33,9 @@ import ( "github.com/apache/dubbo-go/metadata/service" ) +// version will be used by Version func +const version = "1.0.0" + // MetadataService is store and query the metadata info in memory when each service registry type MetadataService struct { service.BaseMetadataService @@ -53,13 +55,13 @@ func NewMetadataService() *MetadataService { } } -// comparator is defined as Comparator for skip list to compare the URL -type comparator common.URL +// Comparator is defined as Comparator for skip list to compare the URL +type Comparator common.URL // Compare is defined as Comparator for skip list to compare the URL -func (c comparator) Compare(comp cm.Comparator) int { +func (c Comparator) Compare(comp cm.Comparator) int { a := common.URL(c).String() - b := common.URL(comp.(comparator)).String() + b := common.URL(comp.(Comparator)).String() switch { case a > b: return 1 @@ -79,7 +81,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { logger.Debug(url.ServiceKey()) if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), skip.New(uint64(0))); loaded { mts.lock.RLock() - wantedUrl := urlSet.(*skip.SkipList).Get(comparator(*url)) + wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url)) if len(wantedUrl) > 0 && wantedUrl[0] != nil { mts.lock.RUnlock() return false @@ -88,12 +90,12 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { } mts.lock.Lock() //double chk - wantedUrl := urlSet.(*skip.SkipList).Get(comparator(*url)) + wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url)) if len(wantedUrl) > 0 && wantedUrl[0] != nil { mts.lock.Unlock() return false } - urlSet.(*skip.SkipList).Insert(comparator(*url)) + urlSet.(*skip.SkipList).Insert(Comparator(*url)) mts.lock.Unlock() return true } @@ -102,7 +104,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) { if value, loaded := targetMap.Load(url.ServiceKey()); loaded { mts.lock.Lock() - value.(*skip.SkipList).Delete(comparator(*url)) + value.(*skip.SkipList).Delete(Comparator(*url)) mts.lock.Unlock() mts.lock.RLock() defer mts.lock.RUnlock() @@ -118,9 +120,9 @@ func (mts *MetadataService) getAllService(services *sync.Map) *skip.SkipList { services.Range(func(key, value interface{}) bool { urls := value.(*skip.SkipList) for i := uint64(0); i < urls.Len(); i++ { - url := common.URL(urls.ByPosition(i).(comparator)) - if url.GetParam(constant.INTERFACE_KEY, url.Path) != "MetadataService" { - skipList.Insert(comparator(url)) + url := common.URL(urls.ByPosition(i).(Comparator)) + if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.SIMPLE_METADATA_SERVICE_NAME { + skipList.Insert(Comparator(url)) } } return true @@ -135,9 +137,9 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s if loaded { urls := serviceList.(*skip.SkipList) for i := uint64(0); i < urls.Len(); i++ { - url := common.URL(urls.ByPosition(i).(comparator)) + url := common.URL(urls.ByPosition(i).(Comparator)) if len(protocol) == 0 || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol { - skipList.Insert(comparator(url)) + skipList.Insert(Comparator(url)) } } } @@ -182,9 +184,9 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { // //TODO:generate the service definition and store it //} sd := definition.BuildServiceDefinition(*service, url) - data, err := json.Marshal(sd) + data, err := sd.ToBytes() if err != nil { - logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error: ", url, err) + logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error:%v ", url, err) } mts.serviceDefinitions.Store(url.ServiceKey(), string(data)) return nil @@ -221,12 +223,12 @@ func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) return v.(string), nil } -// Version will return the version of metadata service -func (mts *MetadataService) Version() string { - return "1.0.0" +// RefreshMetadata will always return true because it will be implement by remote service +func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { + return true } // Version will return the version of metadata service -func (mts *MetadataService) Reference() string { - return "MetadataService" +func (mts *MetadataService) Version() string { + return version } diff --git a/metadata/service/inmemory/service_test.go b/metadata/service/inmemory/service_test.go index 9e593db282e7f4fa55d52c49129a15a9b389c67f..fc0410ecca9f83502017bca94ba706abb8ee14be 100644 --- a/metadata/service/inmemory/service_test.go +++ b/metadata/service/inmemory/service_test.go @@ -18,10 +18,8 @@ package inmemory import ( - "context" "fmt" "testing" - "time" ) import ( @@ -33,29 +31,6 @@ import ( "github.com/apache/dubbo-go/metadata/definition" ) -type User struct { - Id string - Name string - Age int32 - Time time.Time -} - -type UserProvider struct { -} - -func (u *UserProvider) GetUser(ctx context.Context, req []interface{}) (*User, error) { - rsp := User{"A001", "Alex Stocks", 18, time.Now()} - return &rsp, nil -} - -func (u *UserProvider) Reference() string { - return "UserProvider" -} - -func (u User) JavaClassName() string { - return "com.ikurento.user.User" -} - func TestMetadataService(t *testing.T) { mts := NewMetadataService() serviceName := "com.ikurento.user.UserProvider" @@ -111,7 +86,7 @@ func TestMetadataService(t *testing.T) { list4, _ := mts.GetSubscribedURLs() assert.Equal(t, uint64(0), list4.Len()) - userProvider := &UserProvider{} + userProvider := &definition.UserProvider{} common.ServiceMap.Register(serviceName, protocol, userProvider) mts.PublishServiceDefinition(u) expected := "{\"CanonicalName\":\"com.ikurento.user.UserProvider\",\"CodeSource\":\"\"," + diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go new file mode 100644 index 0000000000000000000000000000000000000000..f55c482ad846d801e57e2a98436161c6c70165c4 --- /dev/null +++ b/metadata/service/remote/service.go @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package remote + +import ( + "github.com/Workiva/go-datastructures/slice/skip" + "go.uber.org/atomic" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report/delegate" + "github.com/apache/dubbo-go/metadata/service" + "github.com/apache/dubbo-go/metadata/service/inmemory" +) + +// version will be used by Version func +const version = "1.0.0" + +// MetadataService is a implement of metadata service which will delegate the remote metadata report +type MetadataService struct { + service.BaseMetadataService + inMemoryMetadataService *inmemory.MetadataService + exportedRevision atomic.String + subscribedRevision atomic.String + delegateReport *delegate.MetadataReport +} + +// NewMetadataService will create a new remote MetadataService instance +func NewMetadataService() (*MetadataService, error) { + mr, err := delegate.NewMetadataReport() + if err != nil { + return nil, err + } + return &MetadataService{ + inMemoryMetadataService: inmemory.NewMetadataService(), + delegateReport: mr, + }, nil +} + +// setInMemoryMetadataService will replace the in memory metadata service by the specific param +func (mts *MetadataService) setInMemoryMetadataService(metadata *inmemory.MetadataService) { + mts.inMemoryMetadataService = metadata +} + +// ExportURL will be implemented by in memory service +func (mts *MetadataService) ExportURL(url common.URL) (bool, error) { + return true, nil +} + +// UnexportURL +func (mts *MetadataService) UnexportURL(url common.URL) error { + smi := identifier.NewServiceMetadataIdentifier(url) + smi.Revision = mts.exportedRevision.Load() + return mts.delegateReport.RemoveServiceMetadata(smi) +} + +// SubscribeURL will be implemented by in memory service +func (MetadataService) SubscribeURL(url common.URL) (bool, error) { + return true, nil +} + +// UnsubscribeURL will be implemented by in memory service +func (MetadataService) UnsubscribeURL(url common.URL) error { + return nil +} + +// PublishServiceDefinition will call remote metadata's StoreProviderMetadata to store url info and service definition +func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { + interfaceName := url.GetParam(constant.INTERFACE_KEY, "") + isGeneric := url.GetParamBool(constant.GENERIC_KEY, false) + if len(interfaceName) > 0 && !isGeneric { + service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + sd := definition.BuildServiceDefinition(*service, url) + id := &identifier.MetadataIdentifier{ + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: interfaceName, + Version: url.GetParam(constant.VERSION_KEY, ""), + Group: url.GetParam(constant.GROUP_KEY, ""), + }, + } + mts.delegateReport.StoreProviderMetadata(id, sd) + } + logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) + return nil +} + +// GetExportedURLs will be implemented by in memory service +func (MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) { + return nil, nil +} + +// GetSubscribedURLs will be implemented by in memory service +func (MetadataService) GetSubscribedURLs() (*skip.SkipList, error) { + return nil, nil +} + +// GetServiceDefinition will be implemented by in memory service +func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { + return "", nil +} + +// GetServiceDefinitionByServiceKey will be implemented by in memory service +func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { + return "", nil +} + +// RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service +func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { + result := true + if len(exportedRevision) != 0 && exportedRevision != mts.exportedRevision.Load() { + mts.exportedRevision.Store(exportedRevision) + urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "") + if err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) + result = false + } + iterator := urls.Iter(inmemory.Comparator{}) + logger.Infof("urls length = %v", urls.Len()) + for { + if !iterator.Next() { + break + } + url := iterator.Value().(inmemory.Comparator) + id := identifier.NewServiceMetadataIdentifier(common.URL(url)) + id.Revision = mts.exportedRevision.Load() + if err := mts.delegateReport.SaveServiceMetadata(id, common.URL(url)); err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) + result = false + } + } + } + + if len(subscribedRevision) != 0 && subscribedRevision != mts.subscribedRevision.Load() { + mts.subscribedRevision.Store(subscribedRevision) + urls, err := mts.inMemoryMetadataService.GetSubscribedURLs() + if err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err) + result = false + } + if urls != nil && urls.Len() > 0 { + id := &identifier.SubscriberMetadataIdentifier{ + MetadataIdentifier: identifier.MetadataIdentifier{ + Application: config.GetApplicationConfig().Name, + }, + Revision: subscribedRevision, + } + if err := mts.delegateReport.SaveSubscribedData(id, convertUrls(urls)); err != nil { + logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err) + result = false + } + } + } + return result +} + +// Version will return the remote service version +func (MetadataService) Version() string { + return version +} + +// convertUrls will convert the skip list to slice +func convertUrls(list *skip.SkipList) []common.URL { + urls := make([]common.URL, list.Len()) + iterator := list.Iter(inmemory.Comparator{}) + for { + if iterator.Value() == nil { + break + } + url := iterator.Value().(inmemory.Comparator) + urls = append(urls, common.URL(url)) + if !iterator.Next() { + break + } + } + return urls +} diff --git a/metadata/service/remote/service_test.go b/metadata/service/remote/service_test.go new file mode 100644 index 0000000000000000000000000000000000000000..308c631e413be9c3c6735f31c56da2e8f0697333 --- /dev/null +++ b/metadata/service/remote/service_test.go @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package remote + +import ( + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config/instance" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report" + "github.com/apache/dubbo-go/metadata/report/factory" + "github.com/apache/dubbo-go/metadata/service/inmemory" +) + +var serviceMetadata = make(map[*identifier.ServiceMetadataIdentifier]common.URL, 4) +var subscribedMetadata = make(map[*identifier.SubscriberMetadataIdentifier][]common.URL, 4) + +func getMetadataReportFactory() factory.MetadataReportFactory { + return &metadataReportFactory{} +} + +type metadataReportFactory struct { +} + +func (mrf *metadataReportFactory) CreateMetadataReport(*common.URL) report.MetadataReport { + return &metadataReport{} +} + +type metadataReport struct { +} + +func (metadataReport) StoreProviderMetadata(*identifier.MetadataIdentifier, string) error { + return nil +} + +func (metadataReport) StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error { + return nil +} + +func (mr *metadataReport) SaveServiceMetadata(id *identifier.ServiceMetadataIdentifier, url common.URL) error { + logger.Infof("SaveServiceMetadata , url is %v", url) + serviceMetadata[id] = url + return nil +} + +func (metadataReport) RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error { + return nil +} + +func (metadataReport) GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string { + return nil +} + +func (mr *metadataReport) SaveSubscribedData(id *identifier.SubscriberMetadataIdentifier, urls []common.URL) error { + logger.Infof("SaveSubscribedData, , url is %v", urls) + subscribedMetadata[id] = urls + return nil +} + +func (metadataReport) GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string { + return nil +} + +func (metadataReport) GetServiceDefinition(*identifier.MetadataIdentifier) string { + return "" +} + +func TestMetadataService(t *testing.T) { + extension.SetMetadataReportFactory("mock", getMetadataReportFactory) + u, err := common.NewURL(fmt.Sprintf( + "mock://127.0.0.1:20000/?sync.report=true")) + assert.NoError(t, err) + instance.GetMetadataReportInstance(&u) + mts, err := NewMetadataService() + assert.NoError(t, err) + mts.setInMemoryMetadataService(mockInmemoryProc(t)) + mts.RefreshMetadata("0.0.1", "0.0.1") + assert.Equal(t, 1, len(serviceMetadata)) + assert.Equal(t, 1, len(subscribedMetadata)) +} + +func mockInmemoryProc(t *testing.T) *inmemory.MetadataService { + mts := inmemory.NewMetadataService() + serviceName := "com.ikurento.user.UserProvider" + group := "group1" + version := "0.0.1" + protocol := "dubbo" + beanName := "UserProvider" + + u, err := common.NewURL(fmt.Sprintf( + "%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+ + "owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v", + protocol, serviceName, group, version, beanName)) + assert.NoError(t, err) + mts.ExportURL(u) + + mts.SubscribeURL(u) + + userProvider := &definition.UserProvider{} + common.ServiceMap.Register(serviceName, protocol, userProvider) + mts.PublishServiceDefinition(u) + expected := "{\"CanonicalName\":\"com.ikurento.user.UserProvider\",\"CodeSource\":\"\"," + + "\"Methods\":[{\"Name\":\"GetUser\",\"ParameterTypes\":[\"slice\"],\"ReturnType\":\"ptr\"," + + "\"Parameters\":null}],\"Types\":null}" + def1, _ := mts.GetServiceDefinition(serviceName, group, version) + assert.Equal(t, expected, def1) + serviceKey := definition.ServiceDescriperBuild(serviceName, group, version) + def2, _ := mts.GetServiceDefinitionByServiceKey(serviceKey) + assert.Equal(t, expected, def2) + return mts +} diff --git a/metadata/service/service.go b/metadata/service/service.go index bc526c5411383f0d5cee971cef4f84d6f4f48f59..13464087ed83a79064410e5b37cde79e09317c38 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -23,6 +23,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/config" ) @@ -49,6 +50,8 @@ type MetadataService interface { GetServiceDefinition(interfaceName string, group string, version string) (string, error) // GetServiceDefinition will get the target service info store in metadata by service key GetServiceDefinitionByServiceKey(serviceKey string) (string, error) + // RefreshMetadata will refresh the metadata + RefreshMetadata(exportedRevision string, subscribedRevision string) bool // Version will return the metadata service version Version() string } @@ -61,3 +64,8 @@ type BaseMetadataService struct { func (mts *BaseMetadataService) ServiceName() (string, error) { return config.GetApplicationConfig().Name, nil } + +// Version will return the version of metadata service +func (mts *BaseMetadataService) Reference() string { + return constant.SIMPLE_METADATA_SERVICE_NAME +} diff --git a/registry/base_registry.go b/registry/base_registry.go index 3e1bddf233310871182544b6415c10c8df27e622..ad1a3b61741e003625612ad58409eb8615271a84 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -56,6 +56,8 @@ func init() { localIP, _ = gxnet.GetLocalIP() } +type createPathFunc func(dubboPath string) error + /* * -----------------------------------NOTICE--------------------------------------------- * If there is no special case, you'd better inherit BaseRegistry and implement the @@ -74,8 +76,12 @@ type FacadeBasedRegistry interface { CreatePath(string) error // DoRegister actually do the register job DoRegister(string, string) error + // DoUnregister do the unregister job + DoUnregister(string, string) error // DoSubscribe actually subscribe the URL DoSubscribe(conf *common.URL) (Listener, error) + // DoUnsubscribe does unsubscribe the URL + DoUnsubscribe(conf *common.URL) (Listener, error) // CloseAndNilClient close the client and then reset the client in registry to nil // you should notice that this method will be invoked inside a lock. // So you should implement this method as light weighted as you can. @@ -94,7 +100,7 @@ type BaseRegistry struct { birth int64 // time of file birth, seconds since Epoch; 0 if unknown wg sync.WaitGroup // wg+done for zk restart done chan struct{} - cltLock sync.Mutex //ctl lock is a lock for services map + cltLock sync.RWMutex //ctl lock is a lock for services map services map[string]common.URL // service name + protocol -> service config, for store the service registered } @@ -154,6 +160,43 @@ func (r *BaseRegistry) Register(conf common.URL) error { return nil } +// UnRegister implement interface registry to unregister +func (r *BaseRegistry) UnRegister(conf common.URL) error { + var ( + ok bool + err error + oldURL common.URL + ) + + func() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + oldURL, ok = r.services[conf.Key()] + + if !ok { + err = perrors.Errorf("Path{%s} has not registered", conf.Key()) + } + + delete(r.services, conf.Key()) + }() + + if err != nil { + return err + } + + err = r.unregister(conf) + if err != nil { + func() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + r.services[conf.Key()] = oldURL + }() + return perrors.WithMessagef(err, "register(conf:%+v)", conf) + } + + return nil +} + // service is for getting service path stored in url func (r *BaseRegistry) service(c common.URL) string { return url.QueryEscape(c.Service()) @@ -189,6 +232,18 @@ func (r *BaseRegistry) RestartCallBack() bool { // register for register url to registry, include init params func (r *BaseRegistry) register(c common.URL) error { + return r.processURL(c, r.facadeBasedRegistry.DoRegister, r.createPath) +} + +// unregister for unregister url to registry, include init params +func (r *BaseRegistry) unregister(c common.URL) error { + return r.processURL(c, r.facadeBasedRegistry.DoUnregister, nil) +} + +func (r *BaseRegistry) processURL(c common.URL, f func(string, string) error, cpf createPathFunc) error { + if f == nil { + panic(" Must provide a `function(string, string) error` to process URL. ") + } var ( err error //revision string @@ -213,15 +268,15 @@ func (r *BaseRegistry) register(c common.URL) error { switch role { case common.PROVIDER: - dubboPath, rawURL, err = r.providerRegistry(c, params) + dubboPath, rawURL, err = r.providerRegistry(c, params, cpf) case common.CONSUMER: - dubboPath, rawURL, err = r.consumerRegistry(c, params) + dubboPath, rawURL, err = r.consumerRegistry(c, params, cpf) default: return perrors.Errorf("@c{%v} type is not referencer or provider", c) } encodedURL = url.QueryEscape(rawURL) dubboPath = strings.ReplaceAll(dubboPath, "$", "%24") - err = r.facadeBasedRegistry.DoRegister(dubboPath, encodedURL) + err = f(dubboPath, encodedURL) if err != nil { return perrors.WithMessagef(err, "register Node(path:%s, url:%s)", dubboPath, rawURL) @@ -229,8 +284,15 @@ func (r *BaseRegistry) register(c common.URL) error { return nil } +// createPath will create dubbo path in register +func (r *BaseRegistry) createPath(dubboPath string) error { + r.cltLock.Lock() + defer r.cltLock.Unlock() + return r.facadeBasedRegistry.CreatePath(dubboPath) +} + // providerRegistry for provider role do -func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string, string, error) { +func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values, f createPathFunc) (string, string, error) { var ( dubboPath string rawURL string @@ -240,11 +302,9 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) } dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - }() + if f != nil { + err = f(dubboPath) + } if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err)) return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath) @@ -274,7 +334,7 @@ func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string } // consumerRegistry for consumer role do -func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string, string, error) { +func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values, f createPathFunc) (string, string, error) { var ( dubboPath string rawURL string @@ -282,23 +342,18 @@ func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string ) dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER]) - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - - }() + if f != nil { + err = f(dubboPath) + } if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) return "", "", perrors.WithStack(err) } dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER]) - func() { - r.cltLock.Lock() - defer r.cltLock.Unlock() - err = r.facadeBasedRegistry.CreatePath(dubboPath) - }() + if f != nil { + err = f(dubboPath) + } if err != nil { logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) @@ -323,20 +378,20 @@ func sleepWait(n int) { } // Subscribe :subscribe from registry, event will notify by notifyListener -func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { +func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error { n := 0 for { n++ if !r.IsAvailable() { logger.Warnf("event listener game over.") - return + return perrors.New("BaseRegistry is not available.") } listener, err := r.facadeBasedRegistry.DoSubscribe(url) if err != nil { if !r.IsAvailable() { logger.Warnf("event listener game over.") - return + return err } logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) time.Sleep(time.Duration(RegistryConnDelay) * time.Second) @@ -358,6 +413,37 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) } } +// UnSubscribe URL +func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return perrors.New("BaseRegistry is not available.") + } + + listener, err := r.facadeBasedRegistry.DoUnsubscribe(url) + if err != nil { + if !r.IsAvailable() { + logger.Warnf("event listener game over.") + return perrors.New("BaseRegistry is not available.") + } + logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) + return perrors.WithStack(err) + } + + for { + if serviceEvent, err := listener.Next(); err != nil { + logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) + listener.Close() + break + } else { + logger.Infof("update begin, service event: %v", serviceEvent.String()) + notifyListener.Notify(serviceEvent) + } + + } + return nil +} + // closeRegisters close and remove registry client and reset services map func (r *BaseRegistry) closeRegisters() { logger.Infof("begin to close provider client") diff --git a/registry/common/event_publishing_service_deiscovery_test.go b/registry/common/event_publishing_service_deiscovery_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1e08335e04232c9b5944e34133e4e979bee9ee74 --- /dev/null +++ b/registry/common/event_publishing_service_deiscovery_test.go @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "reflect" + "testing" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + dispatcher2 "github.com/apache/dubbo-go/common/observer/dispatcher" + "github.com/apache/dubbo-go/registry" +) + +func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) { + dc := NewEventPublishingServiceDiscovery(&ServiceDiscoveryA{}) + tsd := &TestServiceDiscoveryDestroyingEventListener{} + tsd.SetT(t) + tsi := &TestServiceInstancePreRegisteredEventListener{} + tsi.SetT(t) + extension.AddEventListener(tsd) + extension.AddEventListener(tsi) + extension.SetEventDispatcher("direct", dispatcher2.NewDirectEventDispatcher) + extension.SetAndInitGlobalDispatcher("direct") + err := dc.Destroy() + assert.Nil(t, err) + si := ®istry.DefaultServiceInstance{Id: "testServiceInstance"} + err = dc.Register(si) + assert.Nil(t, err) + +} + +type TestServiceDiscoveryDestroyingEventListener struct { + suite.Suite + observer.BaseListenable +} + +func (tel *TestServiceDiscoveryDestroyingEventListener) OnEvent(e observer.Event) error { + e1, ok := e.(*ServiceDiscoveryDestroyingEvent) + assert.Equal(tel.T(), ok, true) + assert.Equal(tel.T(), "testServiceDiscovery", e1.GetOriginal().String()) + assert.Equal(tel.T(), "testServiceDiscovery", e1.GetServiceDiscovery().String()) + return nil +} + +func (tel *TestServiceDiscoveryDestroyingEventListener) GetPriority() int { + return -1 +} + +func (tel *TestServiceDiscoveryDestroyingEventListener) GetEventType() reflect.Type { + return reflect.TypeOf(ServiceDiscoveryDestroyingEvent{}) +} + +type TestServiceInstancePreRegisteredEventListener struct { + suite.Suite + observer.BaseListenable +} + +func (tel *TestServiceInstancePreRegisteredEventListener) OnEvent(e observer.Event) error { + e1, ok := e.(*ServiceInstancePreRegisteredEvent) + assert.Equal(tel.T(), ok, true) + assert.Equal(tel.T(), "testServiceInstance", e1.getServiceInstance().GetId()) + return nil +} + +func (tel *TestServiceInstancePreRegisteredEventListener) GetPriority() int { + return -1 +} + +func (tel *TestServiceInstancePreRegisteredEventListener) GetEventType() reflect.Type { + return reflect.TypeOf(ServiceInstancePreRegisteredEvent{}) +} + +type ServiceDiscoveryA struct { +} + +// String return mockServiceDiscovery +func (msd *ServiceDiscoveryA) String() string { + return "testServiceDiscovery" +} + +// Destroy do nothing +func (msd *ServiceDiscoveryA) Destroy() error { + return nil +} + +func (msd *ServiceDiscoveryA) Register(instance registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) Update(instance registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) Unregister(instance registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) GetDefaultPageSize() int { + return 1 +} + +func (msd *ServiceDiscoveryA) GetServices() *gxset.HashSet { + return nil +} + +func (msd *ServiceDiscoveryA) GetInstances(serviceName string) []registry.ServiceInstance { + return nil +} + +func (msd *ServiceDiscoveryA) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + return nil +} + +func (msd *ServiceDiscoveryA) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + return nil +} + +func (msd *ServiceDiscoveryA) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + return nil +} + +func (msd *ServiceDiscoveryA) AddListener(listener *registry.ServiceInstancesChangedListener) error { + return nil +} + +func (msd *ServiceDiscoveryA) DispatchEventByServiceName(serviceName string) error { + return nil +} + +func (msd *ServiceDiscoveryA) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + return nil +} + +func (msd *ServiceDiscoveryA) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + return nil +} diff --git a/registry/common/event_publishing_service_discovery.go b/registry/common/event_publishing_service_discovery.go new file mode 100644 index 0000000000000000000000000000000000000000..f61dd84690f4878eaaf7fb29890d6dab2210ef8f --- /dev/null +++ b/registry/common/event_publishing_service_discovery.go @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" +) + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/registry" +) + +// EventPublishingServiceDiscovery will enhance Service Discovery +// Publish some event about service discovery +type EventPublishingServiceDiscovery struct { + serviceDiscovery registry.ServiceDiscovery +} + +// NewEventPublishingServiceDiscovery is a constructor +func NewEventPublishingServiceDiscovery(serviceDiscovery registry.ServiceDiscovery) *EventPublishingServiceDiscovery { + return &EventPublishingServiceDiscovery{ + serviceDiscovery: serviceDiscovery, + } +} + +// String +func (epsd *EventPublishingServiceDiscovery) String() string { + return epsd.serviceDiscovery.String() +} + +// Destroy delegate function +func (epsd *EventPublishingServiceDiscovery) Destroy() error { + f := func() error { + return epsd.serviceDiscovery.Destroy() + } + return epsd.executeWithEvents(NewServiceDiscoveryDestroyingEvent(epsd, epsd.serviceDiscovery), + f, NewServiceDiscoveryDestroyedEvent(epsd, epsd.serviceDiscovery)) +} + +// Register delegate function +func (epsd *EventPublishingServiceDiscovery) Register(instance registry.ServiceInstance) error { + f := func() error { + return epsd.serviceDiscovery.Register(instance) + } + return epsd.executeWithEvents(NewServiceInstancePreRegisteredEvent(epsd.serviceDiscovery, instance), + f, NewServiceInstanceRegisteredEvent(epsd.serviceDiscovery, instance)) + +} + +// Update delegate function +func (epsd *EventPublishingServiceDiscovery) Update(instance registry.ServiceInstance) error { + f := func() error { + return epsd.serviceDiscovery.Update(instance) + } + return epsd.executeWithEvents(nil, f, nil) +} + +// Unregister delegate function +func (epsd *EventPublishingServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + f := func() error { + return epsd.serviceDiscovery.Unregister(instance) + } + return epsd.executeWithEvents(NewServiceInstancePreUnregisteredEvent(epsd.serviceDiscovery, instance), + f, NewServiceInstanceUnregisteredEvent(epsd.serviceDiscovery, instance)) +} + +func (epsd *EventPublishingServiceDiscovery) GetDefaultPageSize() int { + return epsd.serviceDiscovery.GetDefaultPageSize() +} + +func (epsd *EventPublishingServiceDiscovery) GetServices() *gxset.HashSet { + return epsd.serviceDiscovery.GetServices() +} + +func (epsd *EventPublishingServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + return epsd.serviceDiscovery.GetInstances(serviceName) +} + +func (epsd *EventPublishingServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + return epsd.serviceDiscovery.GetInstancesByPage(serviceName, offset, pageSize) +} + +func (epsd *EventPublishingServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + return epsd.serviceDiscovery.GetHealthyInstancesByPage(serviceName, offset, pageSize, healthy) +} + +func (epsd *EventPublishingServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + return epsd.serviceDiscovery.GetRequestInstances(serviceNames, offset, requestedSize) +} + +// AddListener add event listener +func (epsd *EventPublishingServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + extension.GetGlobalDispatcher().AddEventListener(listener) + return epsd.serviceDiscovery.AddListener(listener) +} + +func (epsd *EventPublishingServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + return epsd.DispatchEventByServiceName(serviceName) +} + +func (epsd *EventPublishingServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + return epsd.serviceDiscovery.DispatchEventForInstances(serviceName, instances) +} + +func (epsd *EventPublishingServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + return epsd.serviceDiscovery.DispatchEvent(event) +} + +// executeWithEvents dispatch before event and after event if return error will dispatch exception event +func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent observer.Event, f func() error, afterEvent observer.Event) error { + globalDispatcher := extension.GetGlobalDispatcher() + if beforeEvent != nil { + globalDispatcher.Dispatch(beforeEvent) + } + if err := f(); err != nil { + globalDispatcher.Dispatch(NewServiceDiscoveryExceptionEvent(epsd, epsd.serviceDiscovery, err)) + return err + } + if afterEvent != nil { + globalDispatcher.Dispatch(afterEvent) + } + return nil +} diff --git a/registry/common/service_discovery_event.go b/registry/common/service_discovery_event.go new file mode 100644 index 0000000000000000000000000000000000000000..a60ca56a39016738daf6e992b286f2b04c2fdec8 --- /dev/null +++ b/registry/common/service_discovery_event.go @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/registry" +) + +type ServiceDiscoveryEvent struct { + observer.BaseEvent + original registry.ServiceDiscovery +} + +func NewServiceDiscoveryEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryEvent { + return &ServiceDiscoveryEvent{ + BaseEvent: *observer.NewBaseEvent(discovery), + original: original, + } +} + +func (sde *ServiceDiscoveryEvent) GetServiceDiscovery() registry.ServiceDiscovery { + return sde.GetSource().(registry.ServiceDiscovery) +} + +func (sde *ServiceDiscoveryEvent) GetOriginal() registry.ServiceDiscovery { + return sde.original +} + +// ServiceDiscoveryDestroyingEvent +// this event will be dispatched before service discovery be destroyed +type ServiceDiscoveryDestroyingEvent struct { + ServiceDiscoveryEvent +} + +// ServiceDiscoveryExceptionEvent +// this event will be dispatched when the error occur in service discovery +type ServiceDiscoveryExceptionEvent struct { + ServiceDiscoveryEvent + err error +} + +// ServiceDiscoveryInitializedEvent +// this event will be dispatched after service discovery initialize +type ServiceDiscoveryInitializedEvent struct { + ServiceDiscoveryEvent +} + +// ServiceDiscoveryInitializingEvent +// this event will be dispatched before service discovery initialize +type ServiceDiscoveryInitializingEvent struct { + ServiceDiscoveryEvent +} + +// ServiceDiscoveryDestroyedEvent +// this event will be dispatched after service discovery be destroyed +type ServiceDiscoveryDestroyedEvent struct { + ServiceDiscoveryEvent +} + +// NewServiceDiscoveryDestroyingEvent create a ServiceDiscoveryDestroyingEvent +func NewServiceDiscoveryDestroyingEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryDestroyingEvent { + return &ServiceDiscoveryDestroyingEvent{*NewServiceDiscoveryEvent(discovery, original)} +} + +// NewServiceDiscoveryExceptionEvent create a ServiceDiscoveryExceptionEvent +func NewServiceDiscoveryExceptionEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery, err error) *ServiceDiscoveryExceptionEvent { + return &ServiceDiscoveryExceptionEvent{*NewServiceDiscoveryEvent(discovery, original), err} +} + +// NewServiceDiscoveryInitializedEvent create a ServiceDiscoveryInitializedEvent +func NewServiceDiscoveryInitializedEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryInitializedEvent { + return &ServiceDiscoveryInitializedEvent{*NewServiceDiscoveryEvent(discovery, original)} +} + +// NewServiceDiscoveryInitializingEvent create a ServiceDiscoveryInitializingEvent +func NewServiceDiscoveryInitializingEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryInitializingEvent { + return &ServiceDiscoveryInitializingEvent{*NewServiceDiscoveryEvent(discovery, original)} +} + +// NewServiceDiscoveryDestroyedEvent create a ServiceDiscoveryDestroyedEvent +func NewServiceDiscoveryDestroyedEvent(discovery registry.ServiceDiscovery, original registry.ServiceDiscovery) *ServiceDiscoveryDestroyedEvent { + return &ServiceDiscoveryDestroyedEvent{*NewServiceDiscoveryEvent(discovery, original)} +} diff --git a/registry/common/service_instance_event.go b/registry/common/service_instance_event.go new file mode 100644 index 0000000000000000000000000000000000000000..f70e7ee0ff12bf41c761f6bf7c239228df046980 --- /dev/null +++ b/registry/common/service_instance_event.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package common + +import ( + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/registry" +) + +type ServiceInstanceEvent struct { + observer.BaseEvent + serviceInstance registry.ServiceInstance +} + +// NewServiceInstanceEvent create a ServiceInstanceEvent +func NewServiceInstanceEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceEvent { + return &ServiceInstanceEvent{ + BaseEvent: *observer.NewBaseEvent(source), + serviceInstance: instance, + } +} + +func (sie *ServiceInstanceEvent) getServiceInstance() registry.ServiceInstance { + return sie.serviceInstance +} + +// ServiceInstancePreRegisteredEvent +// this event will be dispatched before service instance be registered +type ServiceInstancePreRegisteredEvent struct { + ServiceInstanceEvent +} + +// ServiceInstancePreUnregisteredEvent +// this event will be dispatched before service instance be unregistered +type ServiceInstancePreUnregisteredEvent struct { + ServiceInstanceEvent +} + +// ServiceInstanceRegisteredEvent +// this event will be dispatched after service instance be registered +type ServiceInstanceRegisteredEvent struct { + ServiceInstanceEvent +} + +// ServiceInstanceRegisteredEvent +// this event will be dispatched after service instance be unregistered +type ServiceInstanceUnregisteredEvent struct { + ServiceInstanceEvent +} + +// NewServiceInstancePreRegisteredEvent create a ServiceInstancePreRegisteredEvent +func NewServiceInstancePreRegisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstancePreRegisteredEvent { + return &ServiceInstancePreRegisteredEvent{*NewServiceInstanceEvent(source, instance)} +} + +// NewServiceInstancePreUnregisteredEvent create a ServiceInstancePreUnregisteredEvent +func NewServiceInstancePreUnregisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstancePreUnregisteredEvent { + return &ServiceInstancePreUnregisteredEvent{*NewServiceInstanceEvent(source, instance)} +} + +// NewServiceInstanceRegisteredEvent create a ServiceInstanceRegisteredEvent +func NewServiceInstanceRegisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceRegisteredEvent { + return &ServiceInstanceRegisteredEvent{*NewServiceInstanceEvent(source, instance)} +} + +// NewServiceInstanceUnregisteredEvent create a ServiceInstanceUnregisteredEvent +func NewServiceInstanceUnregisteredEvent(source interface{}, instance registry.ServiceInstance) *ServiceInstanceUnregisteredEvent { + return &ServiceInstanceUnregisteredEvent{*NewServiceInstanceEvent(source, instance)} +} diff --git a/registry/consul/registry.go b/registry/consul/registry.go index c5b8510a6c87068a5b4f1ce52203d401a896a6c2..c9e0718346258b6b38f2a793dc215bcf8e65cdb7 100644 --- a/registry/consul/registry.go +++ b/registry/consul/registry.go @@ -95,7 +95,7 @@ func (r *consulRegistry) register(url common.URL) error { return r.client.Agent().ServiceRegister(service) } -func (r *consulRegistry) Unregister(url common.URL) error { +func (r *consulRegistry) UnRegister(url common.URL) error { var err error role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) @@ -112,11 +112,17 @@ func (r *consulRegistry) unregister(url common.URL) error { return r.client.Agent().ServiceDeregister(buildId(url)) } -func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { +func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error { role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) if role == common.CONSUMER { r.subscribe(url, notifyListener) } + return nil +} + +// UnSubscribe : +func (r *consulRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error { + return perrors.New("UnSubscribe not support in consulRegistry") } func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.NotifyListener) { diff --git a/registry/consul/registry_test.go b/registry/consul/registry_test.go index bb6842cd8fb67dd2cc70b1a7530fbb94f618a9b0..94718f5ab657c198882f065a50e5d5a2c9d4bc6f 100644 --- a/registry/consul/registry_test.go +++ b/registry/consul/registry_test.go @@ -44,7 +44,7 @@ func (suite *consulRegistryTestSuite) testRegister() { func (suite *consulRegistryTestSuite) testUnregister() { consulProviderRegistry, _ := suite.providerRegistry.(*consulRegistry) - err := consulProviderRegistry.Unregister(suite.providerUrl) + err := consulProviderRegistry.UnRegister(suite.providerUrl) assert.NoError(suite.t, err) } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 5d389c36374fe9de5561418bc90d44a7d780fd48..a65d090349b40d473c769e3130e4f000ee03bd00 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -114,6 +114,10 @@ func (r *etcdV3Registry) DoRegister(root string, node string) error { return r.client.Create(path.Join(root, node), "") } +func (r *etcdV3Registry) DoUnregister(root string, node string) error { + return perrors.New("DoUnregister is not support in etcdV3Registry") +} + func (r *etcdV3Registry) CloseAndNilClient() { r.client.Close() r.client = nil @@ -168,3 +172,7 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) return configListener, nil } + +func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { + return nil, perrors.New("DoUnsubscribe is not support in etcdV3Registry") +} diff --git a/registry/event_listener.go b/registry/event_listener.go index 34fd81de74aba98dddcce9e99afbd858f47112ee..1cd5ad43a66acc70c6a7938f8d6532346fd6410d 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -25,34 +25,32 @@ import ( "github.com/apache/dubbo-go/common/observer" ) -//The Service Discovery Changed Event Listener +// The Service Discovery Changed Event Listener type ServiceInstancesChangedListener struct { - ServiceName string - observer.ConditionalEventListener - ChangedNotify ChangedNotify + ServiceName string + ChangedNotify observer.ChangedNotify } -//On ServiceInstancesChangedEvent the service instances change event -func (sicl *ServiceInstancesChangedListener) OnEvent(e ServiceInstancesChangedEvent) error { - sicl.ChangedNotify.Notify(e) +// On ServiceInstancesChangedEvent the service instances change event +func (lstn *ServiceInstancesChangedListener) OnEvent(e observer.Event) error { + lstn.ChangedNotify.Notify(e) return nil } -//get listener priority -func (sicl *ServiceInstancesChangedListener) GetPriority() int { - return -1 -} - -//get event type -func (sicl *ServiceInstancesChangedListener) GetEventType() reflect.Type { - return reflect.TypeOf(&ServiceInstancesChangedEvent{}) +// return true if the name is the same +func (lstn *ServiceInstancesChangedListener) Accept(e observer.Event) bool { + if ce, ok := e.(*ServiceInstancesChangedEvent); ok { + return ce.ServiceName == lstn.ServiceName + } + return false } -//If service name matches,return true or false -func (sicl *ServiceInstancesChangedListener) Accept(e ServiceInstancesChangedEvent) bool { - return e.ServiceName == sicl.ServiceName +// get listener priority +func (lstn *ServiceInstancesChangedListener) GetPriority() int { + return -1 } -type ChangedNotify interface { - Notify(e ServiceInstancesChangedEvent) +// get event type +func (lstn *ServiceInstancesChangedListener) GetEventType() reflect.Type { + return reflect.TypeOf(&ServiceInstancesChangedEvent{}) } diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index 8a02d0e3e693b58946a97e7b47238e0be4272dcf..7ee0f6b0eeb83181bfd20e1abe4685e8319cd09b 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -107,6 +107,10 @@ func (r *kubernetesRegistry) DoRegister(root string, node string) error { return r.client.Create(path.Join(root, node), "") } +func (r *kubernetesRegistry) DoUnregister(root string, node string) error { + return perrors.New("DoUnregister is not support in kubernetesRegistry") +} + func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) { var ( @@ -139,6 +143,10 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er return configListener, nil } +func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { + return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry") +} + func (r *kubernetesRegistry) InitListeners() { r.listener = kubernetes.NewEventListener(r.client) r.configListener = NewConfigurationListener(r) diff --git a/registry/mock_registry.go b/registry/mock_registry.go index 9591928eebd22bf2a99ec9dcfeb285c4519a3b90..f39490a26755a96aab1438d965bd8ee6fc75006f 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -51,6 +51,11 @@ func (*MockRegistry) Register(url common.URL) error { return nil } +// UnRegister +func (r *MockRegistry) UnRegister(conf common.URL) error { + return nil +} + // Destroy ... func (r *MockRegistry) Destroy() { if r.destroyed.CAS(false, true) { @@ -72,7 +77,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) { } // Subscribe ... -func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { +func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error { go func() { for { if !r.IsAvailable() { @@ -104,6 +109,12 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) } } }() + return nil +} + +// UnSubscribe : +func (r *MockRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error { + return nil } type listener struct { diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 61a0fee8f660106379cc505f491ee432de416890..2b6cab45c2f5b552738bce3a352e774aa4b8cbcd 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -131,23 +131,28 @@ func (nr *nacosRegistry) Register(url common.URL) error { return nil } +// UnRegister +func (nr *nacosRegistry) UnRegister(conf common.URL) error { + return perrors.New("UnRegister is not support in nacosRegistry") +} + func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) { return NewNacosListener(*conf, nr.namingClient) } // subscribe from registry -func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { +func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error { for { if !nr.IsAvailable() { logger.Warnf("event listener game over.") - return + return perrors.New("nacosRegistry is not available.") } listener, err := nr.subscribe(url) if err != nil { if !nr.IsAvailable() { logger.Warnf("event listener game over.") - return + return err } logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) time.Sleep(time.Duration(RegistryConnDelay) * time.Second) @@ -159,7 +164,7 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti if err != nil { logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) listener.Close() - return + return err } logger.Infof("update begin, service event: %v", serviceEvent.String()) @@ -167,6 +172,12 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti } } + return nil +} + +// UnSubscribe : +func (nr *nacosRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error { + return perrors.New("UnSubscribe not support in nacosRegistry") } func (nr *nacosRegistry) GetUrl() common.URL { diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 7d3406cac233dd5293c7522b4f12148fdcdd704e..e9230195f6fe4191064c7aa308db1494d8635eec 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -18,15 +18,21 @@ package nacos import ( + "fmt" + "sync" + "github.com/dubbogo/gost/container/set" "github.com/dubbogo/gost/page" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" "github.com/nacos-group/nacos-sdk-go/model" "github.com/nacos-group/nacos-sdk-go/vo" perrors "github.com/pkg/errors" + + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/remoting/nacos" ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -47,8 +53,12 @@ func init() { // There is a problem, the go client for nacos does not support the id field. // we will use the metadata to store the id of ServiceInstance type nacosServiceDiscovery struct { - nacosBaseRegistry group string + // descriptor is a short string about the basic information of this instance + descriptor string + + // namingClient is the Nacos' client + namingClient naming_client.INamingClient } // Destroy will close the service discovery. @@ -237,7 +247,7 @@ func (n *nacosServiceDiscovery) DispatchEventForInstances(serviceName string, in // DispatchEvent will dispatch the event func (n *nacosServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { - // TODO(waiting for event dispatcher, another task) + extension.GetGlobalDispatcher().Dispatch(event) return nil } @@ -271,15 +281,58 @@ func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceIn } } -// toDeregisterInstance will create new service discovery instance -func newNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { +func (n *nacosServiceDiscovery) String() string { + return n.descriptor +} + +var ( + // 16 would be enough. We won't use concurrentMap because in most cases, there are not race condition + instanceMap = make(map[string]registry.ServiceDiscovery, 16) + initLock sync.Mutex +) + +// newNacosServiceDiscovery will create new service discovery instance +// use double-check pattern to reduce race condition +func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) { + + instance, ok := instanceMap[name] + if ok { + return instance, nil + } + + initLock.Lock() + defer initLock.Unlock() - base, err := newBaseRegistry(url) + // double check + instance, ok = instanceMap[name] + if ok { + return instance, nil + } + + sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name) + if !ok || len(sdc.RemoteRef) == 0 { + return nil, perrors.New("could not init the instance because the config is invalid") + } + + remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef) + if !ok { + return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef) + } + group := sdc.Group + if len(group) == 0 { + group = defaultGroup + } + + client, err := nacos.NewNacosClient(remoteConfig) if err != nil { - return nil, perrors.WithStack(err) + return nil, perrors.WithMessage(err, "create nacos client failed.") } + + descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address) + return &nacosServiceDiscovery{ - nacosBaseRegistry: base, - group: url.GetParam(constant.NACOS_GROUP, defaultGroup), + group: group, + namingClient: client, + descriptor: descriptor, }, nil } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 52a7dcbfc77fd576ef8d2917ce51cc09f3cd0b97..aa8fbcbe7d6eca682892d4627878fe6bfc3756fe 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -22,9 +22,8 @@ import ( "strings" "sync" ) - import ( - "github.com/dubbogo/gost/container/set" + gxset "github.com/dubbogo/gost/container/set" ) import ( @@ -96,8 +95,24 @@ func getRegistry(regUrl *common.URL) registry.Registry { func getUrlToRegistry(providerUrl *common.URL, registryUrl *common.URL) *common.URL { if registryUrl.GetParamBool("simplified", false) { return providerUrl.CloneWithParams(reserveParams) + } else { + return filterHideKey(providerUrl) + } +} + +// filterHideKey filter the parameters that do not need to be output in url(Starting with .) +func filterHideKey(url *common.URL) *common.URL { + + //be careful params maps in url is map type + cloneURL := url.Clone() + removeSet := gxset.NewSet() + for k, _ := range cloneURL.GetParams() { + if strings.HasPrefix(k, ".") { + removeSet.Add(k) + } } - return providerUrl + cloneURL.RemoveParams(removeSet) + return cloneURL } func (proto *registryProtocol) initConfigurationListeners() { diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 6adc0fc7d7cfa3a9c966166e622550ad05d7da45..2d6e0248fddb88bfa5ce19546fb7aed703b0fd3c 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -287,3 +287,12 @@ func TestExportWithApplicationConfig(t *testing.T) { v2, _ := regProtocol.bounds.Load(getCacheKey(newUrl)) assert.NotNil(t, v2) } + +func TestGetProviderUrlWithHideKey(t *testing.T) { + url, _ := common.NewURL("dubbo://127.0.0.1:1111?a=a1&b=b1&.c=c1&.d=d1&e=e1&protocol=registry") + providerUrl := getUrlToRegistry(&url, &url) + assert.NotContains(t, providerUrl.GetParams(), ".c") + assert.NotContains(t, providerUrl.GetParams(), ".d") + assert.Contains(t, providerUrl.GetParams(), "a") + +} diff --git a/registry/registry.go b/registry/registry.go index d673864700e6ba99e8f0283247d53760b85598aa..74e63aa66ebdc674261ce4109b27a067ce769007 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -34,6 +34,12 @@ type Registry interface { //And it is also used for service consumer calling , register services cared about ,for dubbo's admin monitoring. Register(url common.URL) error + // UnRegister is required to support the contract: + // 1. If it is the persistent stored data of dynamic=false, the registration data can not be found, then the IllegalStateException is thrown, otherwise it is ignored. + // 2. Unregister according to the full url match. + // url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin + UnRegister(url common.URL) error + //When creating new registry extension,pls select one of the following modes. //Will remove in dubbogo version v1.1.0 //mode1 : return Listener with Next function which can return subscribe service event from registry @@ -42,7 +48,14 @@ type Registry interface { //Will relace mode1 in dubbogo version v1.1.0 //mode2 : callback mode, subscribe with notify(notify listener). - Subscribe(*common.URL, NotifyListener) + Subscribe(*common.URL, NotifyListener) error + + // UnSubscribe is required to support the contract: + // 1. If don't subscribe, ignore it directly. + // 2. Unsubscribe by full URL match. + // url Subscription condition, not allowed to be empty, e.g. consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin + // listener A listener of the change event, not allowed to be empty + UnSubscribe(*common.URL, NotifyListener) error } // NotifyListener ... diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index d32227b4f96c75d5ce526745f42317c9decb399d..6b25bc60ef1e8cae5ed0e0c1ae3cbe492df1c107 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -24,21 +24,15 @@ import ( "strings" "sync" - perrors "github.com/pkg/errors" - - "github.com/apache/dubbo-go/config" -) - -import ( cm "github.com/Workiva/go-datastructures/common" gxset "github.com/dubbogo/gost/container/set" -) + perrors "github.com/pkg/errors" -import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/observer" "github.com/apache/dubbo-go/metadata/mapping" "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/metadata/service/inmemory" @@ -52,74 +46,60 @@ const ( protocolName = "service-discovery" ) -var ( - registryInstance *serviceDiscoveryRegistry - registryInitOnce sync.Once -) - func init() { extension.SetRegistry(protocolName, newServiceDiscoveryRegistry) } // serviceDiscoveryRegistry is the implementation of application-level registry. // It's completely different from other registry implementations -// The serviceDiscoveryRegistry should be singleton // This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping // In order to keep compatible with interface-level registry, // 1. when we registry the service, we should create the mapping from service name to application name -// 2. when we subscribe the service, we should find out related application and then find application's information +// 2. when we sub type serviceDiscoveryRegistry struct { - lock sync.RWMutex - url *common.URL - serviceDiscovery registry.ServiceDiscovery - subscribedServices *gxset.HashSet - serviceNameMapping mapping.ServiceNameMapping - metaDataService service.MetadataService - // cache the registered listen - registeredListeners *gxset.HashSet - // all synthesize - subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer - // cache exported urls, serviceName->revision->[]URL + lock sync.RWMutex + url *common.URL + serviceDiscovery registry.ServiceDiscovery + subscribedServices *gxset.HashSet + serviceNameMapping mapping.ServiceNameMapping + metaDataService service.MetadataService + registeredListeners *gxset.HashSet + subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer serviceRevisionExportedURLsCache map[string]map[string][]common.URL } -// newServiceDiscoveryRegistry will return the instance -// if not found, it will create one -func newServiceDiscoveryRegistry(url *common.URL) (res registry.Registry, err error) { +func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { + serviceDiscovery, err := creatServiceDiscovery(url) + if err != nil { + return nil, err + } + subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) + subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() + serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) + // TODO it's need to get implement by factory + metaDataService := inmemory.NewMetadataService() + return &serviceDiscoveryRegistry{ + url: url, + serviceDiscovery: serviceDiscovery, + subscribedServices: subscribedServices, + subscribedURLsSynthesizers: subscribedURLsSynthesizers, + registeredListeners: gxset.NewSet(), + serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL), + serviceNameMapping: serviceNameMapping, + metaDataService: metaDataService, + }, nil +} - registryInitOnce.Do(func() { - serviceDiscovery, err := creatServiceDiscovery(url) - if err != nil { - return - } - subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) - subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() - serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) - // TODO it's need to get implement by factory - metaDataService := inmemory.NewMetadataService() - registryInstance = &serviceDiscoveryRegistry{ - url: url, - serviceDiscovery: serviceDiscovery, - subscribedServices: subscribedServices, - subscribedURLsSynthesizers: subscribedURLsSynthesizers, - registeredListeners: gxset.NewSet(), - serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL), - serviceNameMapping: serviceNameMapping, - metaDataService: metaDataService, - } - }) - res = registryInstance - return +func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error { + panic("implement me") } -func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { - discovery := url.GetParam(constant.SERVICE_DISCOVERY_KEY, constant.DEFAULT_KEY) - sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(discovery) +func (s *serviceDiscoveryRegistry) UnSubscribe(*common.URL, registry.NotifyListener) error { + panic("implement me") +} - if !ok { - return nil, perrors.New("could not find the ServiceDiscoverConfig with name: " + discovery) - } - return extension.GetServiceDiscovery(sdc.Protocol, discovery) +func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { + return extension.GetServiceDiscovery(url.Protocol, "TODO") } func parseServices(literalServices string) *gxset.HashSet { @@ -136,22 +116,18 @@ func parseServices(literalServices string) *gxset.HashSet { return set } -// GetServiceDiscovery for get serviceDiscovery of the registry func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery { return s.serviceDiscovery } -// GetUrl for get url of the registry func (s *serviceDiscoveryRegistry) GetUrl() common.URL { return *s.url } -// IsAvailable for make sure is't available func (s *serviceDiscoveryRegistry) IsAvailable() bool { return true } -// Destroy for destroy graceful down func (s *serviceDiscoveryRegistry) Destroy() { err := s.serviceDiscovery.Destroy() if err != nil { @@ -185,21 +161,18 @@ func shouldRegister(url common.URL) bool { return false } -// Subscribe for listen the change of services that from the exported url -func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) { +func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error { if !shouldSubscribe(*url) { - return + return nil } _, err := s.metaDataService.SubscribeURL(*url) if err != nil { - logger.Errorf("subscribe url[%s] catch error:%s", url.String(), err.Error()) - return + return perrors.WithMessage(err, "subscribe url error: "+url.String()) } services := s.getServices(*url) if services.Empty() { - logger.Errorf("Should has at least one way to know which services this interface belongs to, "+ + return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+ "subscription url:%s", url.String()) - return } for _, srv := range services.Values() { serviceName := srv.(string) @@ -215,6 +188,7 @@ func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.No s.registerServiceInstancesChangedListener(*url, listener) } + return nil } func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) { listenerId := listener.ServiceName + ":" + getUrlKey(url) @@ -384,17 +358,17 @@ func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registr } func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { + s.lock.Lock() // 1. expunge stale s.expungeStaleRevisionExportedURLs(serviceInstances) // 2. Initialize s.initRevisionExportedURLs(serviceInstances) + s.lock.Unlock() } func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) { serviceName := serviceInstances[0].GetServiceName() - s.lock.Lock() revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName] - s.lock.Unlock() if !exist { return } @@ -405,7 +379,7 @@ func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInsta currentRevision := gxset.NewSet() for _, s := range serviceInstances { rv := getExportedServicesRevision(s) - if len(rv) > 0 { + if len(rv) != 0 { currentRevision.Add(rv) } } @@ -461,9 +435,7 @@ func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstanc } serviceName := serviceInstance.GetServiceName() revision := getExportedServicesRevision(serviceInstance) - s.lock.Lock() revisionExportedURLsMap := s.serviceRevisionExportedURLsCache[serviceName] - s.lock.Unlock() revisionExportedURLs := revisionExportedURLsMap[revision] firstGet := false if revisionExportedURLs == nil || len(revisionExportedURLs) == 0 { @@ -613,7 +585,11 @@ type InstanceChangeNotify struct { serviceDiscoveryRegistry *serviceDiscoveryRegistry } -func (icn *InstanceChangeNotify) Notify(event registry.ServiceInstancesChangedEvent) { - sdr := icn.serviceDiscoveryRegistry - sdr.subscribe(sdr.url, icn.notify, event.ServiceName, event.Instances) +func (icn *InstanceChangeNotify) Notify(event observer.Event) { + + if se, ok := event.(*registry.ServiceInstancesChangedEvent); ok { + sdr := icn.serviceDiscoveryRegistry + sdr.subscribe(sdr.url, icn.notify, se.ServiceName, se.Instances) + } + } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index c5b2f33c6107e82aa172c818c0d8aca1483248c6..ec82fa0309118fba4b5c21772d4dfd356f3b0c5c 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -37,7 +37,7 @@ import ( // RegistryDataListener contains all URL information subscribed by zookeeper registry type RegistryDataListener struct { - subscribed map[*common.URL]config_center.ConfigurationListener + subscribed map[string]config_center.ConfigurationListener mutex sync.Mutex closed bool } @@ -45,7 +45,7 @@ type RegistryDataListener struct { // NewRegistryDataListener constructs a new RegistryDataListener func NewRegistryDataListener() *RegistryDataListener { return &RegistryDataListener{ - subscribed: make(map[*common.URL]config_center.ConfigurationListener)} + subscribed: make(map[string]config_center.ConfigurationListener)} } // SubscribeURL is used to set a watch listener for url @@ -53,7 +53,17 @@ func (l *RegistryDataListener) SubscribeURL(url *common.URL, listener config_cen if l.closed { return } - l.subscribed[url] = listener + l.subscribed[url.ServiceKey()] = listener +} + +// UnSubscribeURL is used to set a watch listener for url +func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener { + if l.closed { + return nil + } + listener := l.subscribed[url.ServiceKey()] + delete(l.subscribed, url.ServiceKey()) + return listener } // DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing @@ -75,8 +85,8 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool { if l.closed { return false } - for url, listener := range l.subscribed { - if serviceURL.URLEqual(*url) { + for serviceKey, listener := range l.subscribed { + if serviceURL.ServiceKey() == serviceKey { listener.Process( &config_center.ConfigChangeEvent{ Key: eventType.Path, @@ -101,18 +111,25 @@ func (l *RegistryDataListener) Close() { // RegistryConfigurationListener represent the processor of zookeeper watcher type RegistryConfigurationListener struct { - client *zk.ZookeeperClient - registry *zkRegistry - events chan *config_center.ConfigChangeEvent - isClosed bool - close chan struct{} - closeOnce sync.Once + client *zk.ZookeeperClient + registry *zkRegistry + events chan *config_center.ConfigChangeEvent + isClosed bool + close chan struct{} + closeOnce sync.Once + subscribeURL *common.URL } // NewRegistryConfigurationListener for listening the event of zk. -func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { +func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry, conf *common.URL) *RegistryConfigurationListener { reg.WaitGroup().Add(1) - return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false, close: make(chan struct{}, 1)} + return &RegistryConfigurationListener{ + client: client, + registry: reg, + events: make(chan *config_center.ConfigChangeEvent, 32), + isClosed: false, + close: make(chan struct{}, 1), + subscribeURL: conf} } // Process submit the ConfigChangeEvent to the event chan to notify all observer diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 88d5d6221b4bc7136ba4c3e7c95fb53ba35a9a58..5d5f9e0526b7b8a9c5a2e2524f27f03573d758a8 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -20,6 +20,7 @@ package zookeeper import ( "fmt" "net/url" + "path" "sync" "time" ) @@ -128,12 +129,17 @@ func (r *zkRegistry) InitListeners() { recoverd := r.dataListener.subscribed if recoverd != nil && len(recoverd) > 0 { // recover all subscribed url - for conf, oldListener := range recoverd { - if regConfigListener, ok := oldListener.(*RegistryConfigurationListener); ok { + for _, oldListener := range recoverd { + var ( + regConfigListener *RegistryConfigurationListener + ok bool + ) + + if regConfigListener, ok = oldListener.(*RegistryConfigurationListener); ok { regConfigListener.Close() } - newDataListener.SubscribeURL(conf, NewRegistryConfigurationListener(r.client, r)) - go r.listener.ListenServiceEvent(conf, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(conf.Service())), newDataListener) + newDataListener.SubscribeURL(regConfigListener.subscribeURL, NewRegistryConfigurationListener(r.client, r, regConfigListener.subscribeURL)) + go r.listener.ListenServiceEvent(regConfigListener.subscribeURL, fmt.Sprintf("/dubbo/%s/"+constant.DEFAULT_CATEGORY, url.QueryEscape(regConfigListener.subscribeURL.Service())), newDataListener) } } @@ -149,10 +155,23 @@ func (r *zkRegistry) DoRegister(root string, node string) error { return r.registerTempZookeeperNode(root, node) } +func (r *zkRegistry) DoUnregister(root string, node string) error { + r.cltLock.Lock() + defer r.cltLock.Unlock() + if !r.ZkClient().ZkConnValid() { + return perrors.Errorf("zk client is not valid.") + } + return r.ZkClient().Delete(path.Join(root, node)) +} + func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) { return r.getListener(conf) } +func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { + return r.getCloseListener(conf) +} + func (r *zkRegistry) CloseAndNilClient() { r.client.Close() r.client = nil @@ -217,9 +236,9 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen dataListener := r.dataListener dataListener.mutex.Lock() defer dataListener.mutex.Unlock() - if r.dataListener.subscribed[conf] != nil { + if r.dataListener.subscribed[conf.ServiceKey()] != nil { - zkListener, _ := r.dataListener.subscribed[conf].(*RegistryConfigurationListener) + zkListener, _ := r.dataListener.subscribed[conf.ServiceKey()].(*RegistryConfigurationListener) if zkListener != nil { r.listenerLock.Lock() defer r.listenerLock.Unlock() @@ -231,7 +250,7 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen } } - zkListener = NewRegistryConfigurationListener(r.client, r) + zkListener = NewRegistryConfigurationListener(r.client, r, conf) if r.listener == nil { r.cltLock.Lock() client := r.client @@ -255,3 +274,37 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen return zkListener, nil } + +func (r *zkRegistry) getCloseListener(conf *common.URL) (*RegistryConfigurationListener, error) { + + var zkListener *RegistryConfigurationListener + r.dataListener.mutex.Lock() + configurationListener := r.dataListener.subscribed[conf.ServiceKey()] + if configurationListener != nil { + + zkListener, _ := configurationListener.(*RegistryConfigurationListener) + if zkListener != nil { + if zkListener.isClosed { + return nil, perrors.New("configListener already been closed") + } + } + } + + zkListener = r.dataListener.UnSubscribeURL(conf).(*RegistryConfigurationListener) + r.dataListener.mutex.Unlock() + + if r.listener == nil { + return nil, perrors.New("listener is null can not close.") + } + + //Interested register to dataconfig. + r.listenerLock.Lock() + listener := r.listener + r.listener = nil + r.listenerLock.Unlock() + + r.dataListener.Close() + listener.Close() + + return zkListener, nil +} diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index 688deccfbec67771c4071f6307802a16e4e0fc8b..d915fc2ce10359f0dd1970daf019746ce066f511 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -45,6 +45,31 @@ func Test_Register(t *testing.T) { assert.NoError(t, err) } +func Test_UnRegister(t *testing.T) { + // register + regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + + ts, reg, _ := newMockZkRegistry(®url) + defer ts.Stop() + err := reg.Register(url) + children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") + assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children) + assert.NoError(t, err) + + err = reg.UnRegister(url) + children, err = reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") + assert.Equal(t, 0, len(children)) + assert.Error(t, err) + assert.True(t, reg.IsAvailable()) + + err = reg.Register(url) + children, _ = reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") + assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26cluster%3Dmock%26.*.serviceid%3Dsoa.mock", children) + assert.NoError(t, err) + +} + func Test_Subscribe(t *testing.T) { regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) @@ -74,6 +99,39 @@ func Test_Subscribe(t *testing.T) { defer ts.Stop() } +func Test_UnSubscribe(t *testing.T) { + regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + ts, reg, _ := newMockZkRegistry(®url) + + //provider register + err := reg.Register(url) + assert.NoError(t, err) + + if err != nil { + return + } + + //consumer register + regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)) + _, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts)) + + reg2.Register(url) + listener, _ := reg2.DoSubscribe(&url) + + serviceEvent, _ := listener.Next() + assert.NoError(t, err) + if err != nil { + return + } + assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String()) + + reg2.UnSubscribe(&url, nil) + assert.Nil(t, reg2.listener) + + defer ts.Stop() +} + func Test_ConsumerDestory(t *testing.T) { regurl, _ := common.NewURL("registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))) url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) diff --git a/remoting/nacos/builder.go b/remoting/nacos/builder.go index 578fef49eaad0caae608dabcc69f1bd2d1e45209..170f23f60cfb0ed12e311dd6011c3a177e7f93f6 100644 --- a/remoting/nacos/builder.go +++ b/remoting/nacos/builder.go @@ -21,16 +21,81 @@ import ( "net" "strconv" "strings" + "time" + "github.com/apache/dubbo-go/config" +) + +import ( "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/config_client" "github.com/nacos-group/nacos-sdk-go/clients/naming_client" nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" perrors "github.com/pkg/errors" +) +import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/config" ) +func NewNacosNamingClient(url *common.URL) (naming_client.INamingClient, error) { + nacosConfig, err := getNacosConfig(url) + if err != nil { + return nil, err + } + return clients.CreateNamingClient(nacosConfig) +} + +func NewNacosConfigClient(url *common.URL) (config_client.IConfigClient, error) { + nacosConfig, err := getNacosConfig(url) + if err != nil { + return nil, err + } + return clients.CreateConfigClient(nacosConfig) +} + +// getNacosConfig will return the nacos config +func getNacosConfig(url *common.URL) (map[string]interface{}, error) { + if url == nil { + return nil, perrors.New("url is empty!") + } + if len(url.Location) == 0 { + return nil, perrors.New("url.location is empty!") + } + configMap := make(map[string]interface{}, 2) + + addresses := strings.Split(url.Location, ",") + serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) + for _, addr := range addresses { + ip, portStr, err := net.SplitHostPort(addr) + if err != nil { + return nil, perrors.WithMessagef(err, "split [%s] ", addr) + } + port, _ := strconv.Atoi(portStr) + serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{ + IpAddr: ip, + Port: uint64(port), + }) + } + configMap["serverConfigs"] = serverConfigs + + var clientConfig nacosConstant.ClientConfig + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + return nil, err + } + clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) + clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs + clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") + clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") + clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") + clientConfig.NotLoadCacheAtStart = true + configMap["clientConfig"] = clientConfig + + return configMap, nil +} + func NewNacosClient(rc *config.RemoteConfig) (naming_client.INamingClient, error) { if len(rc.Address) == 0 { return nil, perrors.New("nacos address is empty!") diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 84877667763ce870e76202844e9dc9dc1c3f008c..097106acf6b44d03708362d587b5faa8281edeab 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -314,7 +314,8 @@ func (l *ZkEventListener) valid() bool { return l.client.ZkConnValid() } -// Close ... +// Close will let client listen exit func (l *ZkEventListener) Close() { + close(l.client.exit) l.wg.Wait() }