diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index 40ef43c8c125fdcad481f1436e64006149ca3611..a20fb8d48bb5e65e8ec1113c60dcb12e8f0393b5 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -71,7 +71,7 @@ jobs: - name: Integrate Test run: | - chmod +x integrate_test.sh && ./integrate_test.sh + chmod +x integrate_test.sh && ./integrate_test.sh ${{github.event.pull_request.head.repo.full_name}} ${{github.event.pull_request.head.sha}} - name: Post Coverage run: bash <(curl -s https://codecov.io/bash) diff --git a/README.md b/README.md index 3afeb1df8dda6abe5ea8baf49d1ff1e79bc2e9ea..c77a0b47665db4d82064714426995834e6a778d0 100644 --- a/README.md +++ b/README.md @@ -179,21 +179,8 @@ If you are willing to do some code contributions and document contributions to [ If u want to communicate with our community, pls scan the following [dubbobo Ding-Ding QR code](https://mmbiz.qpic.cn/mmbiz_jpg/yvBJb5IiafvnHVBdtia30dxA2hKotr9DEckWsZ7aOJcDWDaSVMGwLmYv8GRgIQtqb4C2svicp8nVkMmGy7yKC5tyA/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) or search our commnity DingDing group code 31363295. -<div> -<table> - <tbody> - <tr></tr> - <tr> - <td align="center" valign="middle"> - <a href="http://alexstocks.github.io/html/dubbogo.html" target="_blank"> - <img width="80px" height="85px" src="./doc/pic/misc/dubbogo-dingding.png"> - </a> - </td> - </tr> - <tr></tr> - </tbody> -</table> -</div> +<a href="http://alexstocks.github.io/html/dubbogo.html"> +<img src="./doc/pic/misc/dubbogo-dingding.png" height="80" width="80"></a> If u want to visit the wechat group, pls add my wechat AlexanderStocks. diff --git a/README_CN.md b/README_CN.md index 9f3b13cd53dcb075298c5651d0b520e0eabba7e0..bc3b54bf4c9198a74f7a4bd5d2a5c038f408aae0 100644 --- a/README_CN.md +++ b/README_CN.md @@ -177,25 +177,12 @@ make test 如果想访问官方钉钉群,请在钉钉中搜索社区群号 31363295 或者 扫描如下[二维码](https://mmbiz.qpic.cn/mmbiz_jpg/yvBJb5IiafvnHVBdtia30dxA2hKotr9DEckWsZ7aOJcDWDaSVMGwLmYv8GRgIQtqb4C2svicp8nVkMmGy7yKC5tyA/640?wx_fmt=jpeg&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1)。 -<div> -<table> - <tbody> - <tr></tr> - <tr> - <td align="center" valign="middle"> - <a href="http://alexstocks.github.io/html/dubbogo.html" target="_blank"> - <img width="80px" height="85px" src="./doc/pic/misc/dubbogo-dingding.png"> - </a> - </td> - </tr> - <tr></tr> - </tbody> -</table> -</div> +<a href="http://alexstocks.github.io/html/dubbogo.html"> +<img src="./doc/pic/misc/dubbogo-dingding.png" height="80" width="80"></a> -如果想加入到社区微信群,可以先添加社区负责人 于雨 的微信 AlexanderStocks 。添加微信之前,请先给 dubbo-go 点 star 作为对项目的支持,添加好友时请报上 github ID 以进行验证。 +如果想加入到社区微信群,可以先添加社区负责人 于雨 的微信 AlexanderStocks 。 -作为一个维护已经帮助构建了经受多家大型微服务系统的社区,我们足以为现有的成绩感到自豪。社区欢迎能提出建设性意见者,只知索取者和喷子请绕行。 +作为一个维护已经帮助构建了经受多家大型微服务系统的社区,我们足以为现有的成绩感到自豪。社区欢迎能提出建设性意见者。 ## 性能测试 ## diff --git a/common/extension/metadata_service.go b/common/extension/metadata_service.go index e35677d148eee121c3a6c018a128b5d372c6f2c7..08ddbc333e85fbfc328bec15aa76d2a588b11afb 100644 --- a/common/extension/metadata_service.go +++ b/common/extension/metadata_service.go @@ -26,12 +26,15 @@ import ( ) import ( + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/metadata/service" ) var ( // there will be two types: local or remote metadataServiceInsMap = make(map[string]func() (service.MetadataService, error), 2) + // remoteMetadataService + remoteMetadataService service.MetadataService ) // SetMetadataService will store the msType => creator pair @@ -48,3 +51,17 @@ func GetMetadataService(msType string) (service.MetadataService, error) { "local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+ "remote - github.com/apache/dubbo-go/metadata/service/remote", msType)) } + +// GetRemoteMetadataService will get a RemoteMetadataService instance +func GetRemoteMetadataService() (service.MetadataService, error) { + if remoteMetadataService != nil { + return remoteMetadataService, nil + } + if creator, ok := metadataServiceInsMap["remote"]; ok { + var err error + remoteMetadataService, err = creator() + return remoteMetadataService, err + } + logger.Warn("could not find the metadata service creator for metadataType: remote") + return nil, perrors.New(fmt.Sprintf("could not find the metadata service creator for metadataType: remote")) +} diff --git a/config/reference_config.go b/config/reference_config.go index 431ec0e2eb3c03b27cca40acd7b721cf6b8f9755..895ab9df26399c1c9a1296e44fba8a284ba2fb6c 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -167,7 +167,8 @@ func (c *ReferenceConfig) Refer(_ interface{}) { // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } - + // publish consumer metadata + publishConsumerDefinition(cfgURL) // create proxy if c.Async { callback := GetCallback(c.id) @@ -257,6 +258,12 @@ func (c *ReferenceConfig) GenericLoad(id string) { c.Implement(genericService) } +func publishConsumerDefinition(url *common.URL) { + if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil { + remoteMetadataService.PublishServiceDefinition(url) + } +} + // postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfig. func (c *ReferenceConfig) postProcessConfig(url *common.URL) { for _, p := range extension.GetConfigPostProcessors() { diff --git a/config/service_config.go b/config/service_config.go index fd49390aa2b15938a8a4aa321d47ceafef4d9be9..e8523bdea7aa014836580b7ce33fe215c145289d 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -234,6 +234,7 @@ func (c *ServiceConfig) Export() error { } c.exporters = append(c.exporters, exporter) } + publishServiceDefinition(ivkURL) } c.exported.Store(true) return nil @@ -347,6 +348,12 @@ func (c *ServiceConfig) GetExportedUrls() []*common.URL { return nil } +func publishServiceDefinition(url *common.URL) { + if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil { + remoteMetadataService.PublishServiceDefinition(url) + } +} + // postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig. func (c *ServiceConfig) postProcessConfig(url *common.URL) { for _, p := range extension.GetConfigPostProcessors() { diff --git a/go.mod b/go.mod index da02bf7b71493e6da650bb420d80a8f2bbc2f4dc..2dda6cffad082dac4986d318277913c980203d55 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/coreos/etcd v3.3.25+incompatible github.com/creasty/defaults v1.5.1 github.com/dubbogo/go-zookeeper v1.0.2 - github.com/dubbogo/gost v1.10.1 + github.com/dubbogo/gost v1.11.0 github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect github.com/emicklei/go-restful/v3 v3.4.0 github.com/frankban/quicktest v1.4.1 // indirect @@ -30,9 +30,9 @@ require ( github.com/hashicorp/vault/sdk v0.1.14-0.20191112033314-390e96e22eb2 github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8 github.com/magiconair/properties v1.8.4 - github.com/mitchellh/mapstructure v1.4.0 + github.com/mitchellh/mapstructure v1.4.1 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd - github.com/nacos-group/nacos-sdk-go v1.0.3 + github.com/nacos-group/nacos-sdk-go v1.0.5 github.com/opentracing/opentracing-go v1.2.0 github.com/pierrec/lz4 v2.2.6+incompatible // indirect github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 00f7deeed3f4a69f7f77e40b16344b4ff6240913..6cc41e6b583077ffa0e48d70d7d840831d5561a6 100644 --- a/go.sum +++ b/go.sum @@ -176,6 +176,8 @@ github.com/dubbogo/go-zookeeper v1.0.2/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4D github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.10.1 h1:39kF9Cd5JOiMpmwG6dX1/aLWNFqFv9gHp8HrhzMmjLY= github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI= +github.com/dubbogo/gost v1.11.0 h1:9KtyWQz1gMlAfwzen5iyhMdoe08SPBBUVhco4rdgJ9I= +github.com/dubbogo/gost v1.11.0/go.mod h1:w8Yw29eDWtRVo3tx9nPpHkNZnOi4SRx1fZf7eVlAAU4= github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= @@ -474,7 +476,9 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= +github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40= github.com/k0kubun/pp v3.0.1+incompatible/go.mod h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= @@ -546,8 +550,8 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.2.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/mitchellh/mapstructure v1.4.0 h1:7ks8ZkOP5/ujthUsT07rNv+nkLXCQWKNHuwzOAesEks= -github.com/mitchellh/mapstructure v1.4.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= +github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/pointerstructure v1.0.0 h1:ATSdz4NWrmWPOF1CeCBU4sMCno2hgqdbSrRPFWQSVZI= github.com/mitchellh/pointerstructure v1.0.0/go.mod h1:k4XwG94++jLVsSiTxo7qdIfXA9pj9EAeo0QsNNJOLZ8= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= @@ -566,8 +570,8 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= -github.com/nacos-group/nacos-sdk-go v1.0.3 h1:A2tCWcjuP6bSEjEfNwNnrY+9M0h13XRMDyLY+DPqHMI= -github.com/nacos-group/nacos-sdk-go v1.0.3/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= +github.com/nacos-group/nacos-sdk-go v1.0.5 h1:xwLS0Ao7fSB3HoDFR+JRZ1fh0HuvgHPOWBdals37Oxc= +github.com/nacos-group/nacos-sdk-go v1.0.5/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= diff --git a/integrate_test.sh b/integrate_test.sh index deccda756a211821978e35b92a1f0865858ff59a..e1bbc8c11b0195f7cc4f08946d57464c88e74f19 100644 --- a/integrate_test.sh +++ b/integrate_test.sh @@ -26,12 +26,12 @@ ROOT_DIR=$(pwd) echo "integrate-test root work-space -> ${ROOT_DIR}" # show all travis-env -echo "travis current commit id -> ${TRAVIS_COMMIT}" -echo "travis pull request -> ${TRAVIS_PULL_REQUEST}" -echo "travis pull request branch -> ${TRAVIS_PULL_REQUEST_BRANCH}" -echo "travis pull request slug -> ${TRAVIS_PULL_REQUEST_SLUG}" -echo "travis pull request sha -> ${TRAVIS_PULL_REQUEST_SHA}" -echo "travis pull request repo slug -> ${TRAVIS_REPO_SLUG}" +echo "travis current commit id -> $2" +echo "travis pull request branch -> ${GITHUB_REF}" +echo "travis pull request slug -> ${GITHUB_REPOSITORY}" +echo "travis pull request repo slug -> ${GITHUB_REPOSITORY}" +echo "travis pull request actor -> ${GITHUB_ACTOR}" +echo "travis pull request repo param -> $1" # #start etcd registry insecure listen in [:]:2379 @@ -53,13 +53,13 @@ echo "zookeeper listen in [:]2181" # build go-server image cd ./test/integrate/dubbo/go-server -docker build . -t ci-provider --build-arg PR_ORIGIN_REPO=${TRAVIS_PULL_REQUEST_SLUG} --build-arg PR_ORIGIN_COMMITID=${TRAVIS_PULL_REQUEST_SHA} +docker build . -t ci-provider --build-arg PR_ORIGIN_REPO=$1 --build-arg PR_ORIGIN_COMMITID=$2 cd ${ROOT_DIR} docker run -d --network host ci-provider # build go-client image cd ./test/integrate/dubbo/go-client -docker build . -t ci-consumer --build-arg PR_ORIGIN_REPO=${TRAVIS_PULL_REQUEST_SLUG} --build-arg PR_ORIGIN_COMMITID=${TRAVIS_PULL_REQUEST_SHA} +docker build . -t ci-consumer --build-arg PR_ORIGIN_REPO=$1 --build-arg PR_ORIGIN_COMMITID=$2 cd ${ROOT_DIR} # run provider # check consumer status diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go index f364087fdc8c2fb2a595190d7ca0d9564ac5d171..56a22de8f8e56c040260edf86859a4499f1b2f39 100644 --- a/metadata/report/delegate/delegate_report.go +++ b/metadata/report/delegate/delegate_report.go @@ -109,6 +109,10 @@ type MetadataReport struct { // NewMetadataReport will create a MetadataReport with initiation func NewMetadataReport() (*MetadataReport, error) { url := instance.GetMetadataReportUrl() + if url == nil { + logger.Warn("the metadataReport URL is not configured, you should configure it.") + return nil, perrors.New("the metadataReport URL is not configured, you should configure it.") + } bmr := &MetadataReport{ reportUrl: url, syncReport: url.GetParamBool(constant.SYNC_REPORT_KEY, false), diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index d21cc88a3d31ec2920a78c608ed13a491fd6c634..bb520d3df72c369eb4022246f20bb9e9aa1bd8b8 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -116,22 +116,40 @@ func (mts *MetadataService) UnsubscribeURL(url *common.URL) error { func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error { interfaceName := url.GetParam(constant.INTERFACE_KEY, "") isGeneric := url.GetParamBool(constant.GENERIC_KEY, false) - if len(interfaceName) > 0 && !isGeneric { - sv := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey()) - sd := definition.BuildServiceDefinition(*sv, url) + if common.RoleType(common.PROVIDER).Role() == url.GetParam(constant.SIDE_KEY, "") { + if len(interfaceName) > 0 && !isGeneric { + sv := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey()) + sd := definition.BuildServiceDefinition(*sv, url) + id := &identifier.MetadataIdentifier{ + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: interfaceName, + Version: url.GetParam(constant.VERSION_KEY, ""), + Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO), + Side: url.GetParam(constant.SIDE_KEY, constant.PROVIDER_PROTOCOL), + }, + } + mts.delegateReport.StoreProviderMetadata(id, sd) + return nil + } + logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) + } else { + params := make(map[string]string, len(url.GetParams())) + url.RangeParams(func(key, value string) bool { + params[key] = value + return true + }) id := &identifier.MetadataIdentifier{ BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ ServiceInterface: interfaceName, Version: url.GetParam(constant.VERSION_KEY, ""), - // Group: url.GetParam(constant.GROUP_KEY, constant.SERVICE_DISCOVERY_DEFAULT_GROUP), - Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO), - Side: url.GetParam(constant.SIDE_KEY, "provider"), + Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO), + Side: url.GetParam(constant.SIDE_KEY, "consumer"), }, } - mts.delegateReport.StoreProviderMetadata(id, sd) + mts.delegateReport.StoreConsumerMetadata(id, params) return nil } - logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) + return nil } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 679036f20e941866652bff3bf28bc75f690908c8..448bd6f5a90ae0fe3732845d9ce9e2fa238e8e74 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -59,7 +59,11 @@ func (pfw *ProtocolFilterWrapper) Refer(url *common.URL) protocol.Invoker { if pfw.protocol == nil { pfw.protocol = extension.GetProtocol(url.Protocol) } - return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY) + invoker := pfw.protocol.Refer(url) + if invoker == nil { + return nil + } + return buildInvokerChain(invoker, constant.REFERENCE_FILTER_KEY) } // Destroy will destroy all invoker and exporter. diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index 4bc387cafaa3c64539efb29a181a2198fbe8a30f..f900495f97d3acfd9a2ae232264910b791961bc8 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -19,6 +19,7 @@ package etcdv3 import ( "strings" + "sync" ) import ( @@ -79,8 +80,9 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool { } type configurationListener struct { - registry *etcdV3Registry - events chan *config_center.ConfigChangeEvent + registry *etcdV3Registry + events chan *config_center.ConfigChangeEvent + closeOnce sync.Once } // NewConfigurationListener for listening the event of etcdv3. @@ -120,5 +122,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { // Close etcd registry center func (l *configurationListener) Close() { - l.registry.WaitGroup().Done() + l.closeOnce.Do(func() { + l.registry.WaitGroup().Done() + }) } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index f3cc379bd8e94b15b678f0ac1d5ed5b6c917da6a..7ccf32661c75ed4cd27fddd1fc020ded56066f37 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -51,7 +51,7 @@ type etcdV3Registry struct { registry.BaseRegistry cltLock sync.Mutex client *etcdv3.Client - listenerLock sync.Mutex + listenerLock sync.RWMutex listener *etcdv3.EventListener dataListener *dataListener configListener *configurationListener @@ -150,14 +150,9 @@ func (r *etcdV3Registry) CreatePath(k string) error { // DoSubscribe actually subscribe the provider URL func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) { - - var ( - configListener *configurationListener - ) - - r.listenerLock.Lock() - configListener = r.configListener - r.listenerLock.Unlock() + r.listenerLock.RLock() + configListener := r.configListener + r.listenerLock.RUnlock() if r.listener == nil { r.cltLock.Lock() client := r.client @@ -165,12 +160,8 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) if client == nil { return nil, perrors.New("etcd client broken") } - - // new client & listener - listener := etcdv3.NewEventListener(r.client) - r.listenerLock.Lock() - r.listener = listener + r.listener = etcdv3.NewEventListener(r.client) // new client & listener r.listenerLock.Unlock() } diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 2ca4b8015bc8dbc51a47a46c1bec0301ac961339..c97a7f7b51d2344f1a3fc0c59582e089f1e63b62 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -174,10 +174,6 @@ func (s *serviceDiscoveryRegistry) Register(url *common.URL) error { logger.Warnf("The URL[%s] has been registry!", url.String()) } - err = s.metaDataService.PublishServiceDefinition(url) - if err != nil { - return perrors.WithMessage(err, "publish the service definition failed. ") - } return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""), url.GetParam(constant.GROUP_KEY, ""), url.GetParam(constant.Version, ""), diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index c66928a6367cb2449de79a51b59d122a74a79911..fd6f9585979fabeb86d7a74d5817b6992b6a0ad4 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -36,7 +36,7 @@ import ( // nolint type EventListener struct { client *Client - keyMapLock sync.Mutex + keyMapLock sync.RWMutex keyMap map[string]struct{} wg sync.WaitGroup } @@ -181,9 +181,9 @@ func timeSecondDuration(sec int) time.Duration { // --------> listenServiceNodeEvent func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) { - l.keyMapLock.Lock() + l.keyMapLock.RLock() _, ok := l.keyMap[key] - l.keyMapLock.Unlock() + l.keyMapLock.RUnlock() if ok { logger.Warnf("etcdv3 key %s has already been listened.", key) return diff --git a/remoting/getty/getty_client.go b/remoting/getty/getty_client.go index 57221cc6d7f2b9668e6a0518b6678b37e4924bf1..414f045d1e0da91de0b6a184bee10647d3648ff1 100644 --- a/remoting/getty/getty_client.go +++ b/remoting/getty/getty_client.go @@ -19,6 +19,7 @@ package getty import ( "math/rand" + "sync" "time" ) @@ -116,6 +117,7 @@ type Client struct { addr string opts Options conf ClientConfig + mux sync.RWMutex pool *gettyRPCClientPool codec remoting.Codec ExchangeClient *remoting.ExchangeClient @@ -161,10 +163,13 @@ func (c *Client) Connect(url *common.URL) error { // close network connection func (c *Client) Close() { - if c.pool != nil { - c.pool.close() - } + c.mux.Lock() + p := c.pool c.pool = nil + c.mux.Unlock() + if p != nil { + p.close() + } } // send request @@ -204,6 +209,11 @@ func (c *Client) IsAvailable() bool { } func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) { + c.mux.RLock() + defer c.mux.RUnlock() + if c.pool == nil { + return nil, nil, perrors.New("client pool have been closed") + } rpcClient, err := c.pool.getGettyRpcClient(addr) if err != nil { return nil, nil, perrors.WithStack(err)