diff --git a/.gitignore b/.gitignore
index 568e9f24541dd6f02dd8670436fd48db481b7f21..f7622f8ac9cc1ae42ea5203df70c5327f09bf300 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,15 +20,13 @@ classes
# go mod, go test
vendor/
-coverage.txt
-
logs/
.vscode/
-coverage.txt
# unit test
remoting/zookeeper/zookeeper-4unittest/
config_center/zookeeper/zookeeper-4unittest/
registry/zookeeper/zookeeper-4unittest/
+metadata/report/zookeeper/zookeeper-4unittest/
registry/consul/agent*
config_center/apollo/mockDubbog.properties.json
diff --git a/before_ut.bat b/before_ut.bat
index 5e1c877af229b2b30bffc8b802cc35b6aab6c80a..b7c70e8d13dbc7928dd4def843cc70d168d9cc2c 100644
--- a/before_ut.bat
+++ b/before_ut.bat
@@ -34,4 +34,7 @@ md cluster\router\chain\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "cluster/router/chain/zookeeper-4unittest/contrib/fatjar/"
md cluster\router\condition\zookeeper-4unittest\contrib\fatjar
-xcopy /f "%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/"
\ No newline at end of file
+xcopy /f "%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/"
+
+md metadata\report\zookeeper\zookeeper-4unittest\contrib\fatjar
+xcopy /f "%zkJar%" "metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar/"
\ No newline at end of file
diff --git a/before_ut.sh b/before_ut.sh
index 7ee92e57a26cbdbb1d1a0b3e792726ad5e1954f8..210e9e723ba9e2118cf642729359808b78fddb8d 100755
--- a/before_ut.sh
+++ b/before_ut.sh
@@ -25,13 +25,16 @@ if [ ! -f "${zkJar}" ]; then
fi
mkdir -p config_center/zookeeper/zookeeper-4unittest/contrib/fatjar
-cp ${zkJar} config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/
+cp ${zkJar} config_center/zookeeper/zookeeper-4unittest/contrib/fatjar
mkdir -p registry/zookeeper/zookeeper-4unittest/contrib/fatjar
-cp ${zkJar} registry/zookeeper/zookeeper-4unittest/contrib/fatjar/
+cp ${zkJar} registry/zookeeper/zookeeper-4unittest/contrib/fatjar
mkdir -p cluster/router/chain/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/chain/zookeeper-4unittest/contrib/fatjar
mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar
-cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar
\ No newline at end of file
+cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar
+
+mkdir -p metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
+cp ${zkJar} metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
\ No newline at end of file
diff --git a/go.mod b/go.mod
index ba3cf3e2190de476592278c13ba09c9adaaf947d..85efde193c1835797c310b66b3cd3750ccf2cb59 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,7 @@ require (
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/creasty/defaults v1.3.0
github.com/dubbogo/getty v1.3.5
- github.com/dubbogo/go-zookeeper v1.0.0
+ github.com/dubbogo/go-zookeeper v1.0.1
github.com/dubbogo/gost v1.9.0
github.com/emicklei/go-restful/v3 v3.0.0
github.com/go-co-op/gocron v0.1.1
diff --git a/go.sum b/go.sum
index eb84bde1fb26d84ff5a3a3e36ebe178a63cfb4c9..00b1eb02ee39ff96ca8ab68b11a24ee1d59f9d14 100644
--- a/go.sum
+++ b/go.sum
@@ -106,8 +106,8 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk
github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dubbogo/getty v1.3.5 h1:xJxdDj9jm7wlrRSsVZSk2TDNxJbbac5GpxV0QpjO+Tw=
github.com/dubbogo/getty v1.3.5/go.mod h1:T55vN8Q6tZjf2AQZiGmkujneD3LfqYbv2b3QjacwYOY=
-github.com/dubbogo/go-zookeeper v1.0.0 h1:RsYdlGwhDW+iKXM3eIIcvt34P2swLdmQfuIJxsHlGoM=
-github.com/dubbogo/go-zookeeper v1.0.0/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
+github.com/dubbogo/go-zookeeper v1.0.1 h1:irLzvOsDOTNsN8Sv9tvYYxVu6DCQfLtziZQtUHmZgz8=
+github.com/dubbogo/go-zookeeper v1.0.1/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.9.0 h1:UT+dWwvLyJiDotxJERO75jB3Yxgsdy10KztR5ycxRAk=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
diff --git a/metadata/report/consul/report.go b/metadata/report/consul/report.go
new file mode 100644
index 0000000000000000000000000000000000000000..eb2bdc25ecec596a8f89abb80856b8d6e7be70a4
--- /dev/null
+++ b/metadata/report/consul/report.go
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+ consul "github.com/hashicorp/consul/api"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/metadata/identifier"
+ "github.com/apache/dubbo-go/metadata/report"
+ "github.com/apache/dubbo-go/metadata/report/factory"
+)
+
+var (
+ emptyStrSlice = make([]string, 0)
+)
+
+func init() {
+ mf := &consulMetadataReportFactory{}
+ extension.SetMetadataReportFactory("consul", func() factory.MetadataReportFactory {
+ return mf
+ })
+}
+
+// consulMetadataReport is the implementation of
+// MetadataReport based on consul.
+type consulMetadataReport struct {
+ client *consul.Client
+}
+
+// StoreProviderMetadata stores the metadata.
+func (m *consulMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
+ kv := &consul.KVPair{Key: providerIdentifier.GetIdentifierKey(), Value: []byte(serviceDefinitions)}
+ _, err := m.client.KV().Put(kv, nil)
+ return err
+}
+
+// StoreConsumerMetadata stores the metadata.
+func (m *consulMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
+ kv := &consul.KVPair{Key: consumerMetadataIdentifier.GetIdentifierKey(), Value: []byte(serviceParameterString)}
+ _, err := m.client.KV().Put(kv, nil)
+ return err
+}
+
+// SaveServiceMetadata saves the metadata.
+func (m *consulMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
+ kv := &consul.KVPair{Key: metadataIdentifier.GetIdentifierKey(), Value: []byte(url.String())}
+ _, err := m.client.KV().Put(kv, nil)
+ return err
+}
+
+// RemoveServiceMetadata removes the metadata.
+func (m *consulMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
+ k := metadataIdentifier.GetIdentifierKey()
+ _, err := m.client.KV().Delete(k, nil)
+ return err
+}
+
+// GetExportedURLs gets the urls.
+func (m *consulMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
+ k := metadataIdentifier.GetIdentifierKey()
+ kv, _, err := m.client.KV().Get(k, nil)
+ if err != nil || kv == nil {
+ return emptyStrSlice, err
+ }
+ return []string{string(kv.Value)}, nil
+}
+
+// SaveSubscribedData saves the urls.
+func (m *consulMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urls string) error {
+ kv := &consul.KVPair{Key: subscriberMetadataIdentifier.GetIdentifierKey(), Value: []byte(urls)}
+ _, err := m.client.KV().Put(kv, nil)
+ return err
+}
+
+// GetSubscribedURLs gets the urls.
+func (m *consulMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
+ k := subscriberMetadataIdentifier.GetIdentifierKey()
+ kv, _, err := m.client.KV().Get(k, nil)
+ if err != nil || kv == nil {
+ return emptyStrSlice, err
+ }
+ return []string{string(kv.Value)}, nil
+}
+
+// GetServiceDefinition gets the service definition.
+func (m *consulMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) (string, error) {
+ k := metadataIdentifier.GetIdentifierKey()
+ kv, _, err := m.client.KV().Get(k, nil)
+ if err != nil || kv == nil {
+ return "", err
+ }
+ return string(kv.Value), nil
+}
+
+type consulMetadataReportFactory struct {
+}
+
+// nolint
+func (mf *consulMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
+ config := &consul.Config{Address: url.Location}
+ client, err := consul.NewClient(config)
+ if err != nil {
+ panic(err)
+ }
+ return &consulMetadataReport{client: client}
+}
diff --git a/metadata/report/consul/report_test.go b/metadata/report/consul/report_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..34ee29de945f2b9ac6978a55008048e62f4c6812
--- /dev/null
+++ b/metadata/report/consul/report_test.go
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+ "encoding/json"
+ "net/url"
+ "strconv"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/metadata/identifier"
+ "github.com/apache/dubbo-go/metadata/report"
+ "github.com/apache/dubbo-go/remoting/consul"
+)
+
+func newProviderRegistryUrl(host string, port int) *common.URL {
+ return common.NewURLWithOptions(
+ common.WithIp(host),
+ common.WithPort(strconv.Itoa(port)),
+ common.WithParams(url.Values{}),
+ common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)),
+ )
+}
+
+func newBaseMetadataIdentifier(side string) *identifier.BaseMetadataIdentifier {
+ return &identifier.BaseMetadataIdentifier{
+ ServiceInterface: "org.apache.HelloWorld",
+ Version: "1.0.0",
+ Group: "group",
+ Side: side,
+ }
+}
+
+func newMetadataIdentifier(side string) *identifier.MetadataIdentifier {
+ return &identifier.MetadataIdentifier{
+ Application: "application",
+ BaseMetadataIdentifier: *newBaseMetadataIdentifier(side),
+ }
+}
+
+func newServiceMetadataIdentifier(side string) *identifier.ServiceMetadataIdentifier {
+ return &identifier.ServiceMetadataIdentifier{
+ Revision: "1.0",
+ Protocol: "dubbo",
+ BaseMetadataIdentifier: *newBaseMetadataIdentifier(side),
+ }
+}
+
+func newSubscribeMetadataIdentifier(side string) *identifier.SubscriberMetadataIdentifier {
+ return &identifier.SubscriberMetadataIdentifier{
+ Revision: "1.0",
+ MetadataIdentifier: *newMetadataIdentifier(side),
+ }
+}
+
+type consulMetadataReportTestSuite struct {
+ t *testing.T
+ m report.MetadataReport
+}
+
+func newConsulMetadataReportTestSuite(t *testing.T, m report.MetadataReport) *consulMetadataReportTestSuite {
+ return &consulMetadataReportTestSuite{t: t, m: m}
+}
+
+func (suite *consulMetadataReportTestSuite) testStoreProviderMetadata() {
+ providerMi := newMetadataIdentifier("provider")
+ providerMeta := "provider"
+ err := suite.m.StoreProviderMetadata(providerMi, providerMeta)
+ assert.NoError(suite.t, err)
+}
+
+func (suite *consulMetadataReportTestSuite) testStoreConsumerMetadata() {
+ consumerMi := newMetadataIdentifier("consumer")
+ consumerMeta := "consumer"
+ err := suite.m.StoreProviderMetadata(consumerMi, consumerMeta)
+ assert.NoError(suite.t, err)
+}
+
+func (suite *consulMetadataReportTestSuite) testSaveServiceMetadata(url common.URL) {
+ serviceMi := newServiceMetadataIdentifier("provider")
+ err := suite.m.SaveServiceMetadata(serviceMi, url)
+ assert.NoError(suite.t, err)
+}
+
+func (suite *consulMetadataReportTestSuite) testRemoveServiceMetadata() {
+ serviceMi := newServiceMetadataIdentifier("provider")
+ err := suite.m.RemoveServiceMetadata(serviceMi)
+ assert.NoError(suite.t, err)
+}
+
+func (suite *consulMetadataReportTestSuite) testGetExportedURLs() {
+ serviceMi := newServiceMetadataIdentifier("provider")
+ urls, err := suite.m.GetExportedURLs(serviceMi)
+ assert.Equal(suite.t, 1, len(urls))
+ assert.NoError(suite.t, err)
+}
+
+func (suite *consulMetadataReportTestSuite) testSaveSubscribedData(url common.URL) {
+ subscribeMi := newSubscribeMetadataIdentifier("provider")
+ urls := []string{url.String()}
+ bytes, _ := json.Marshal(urls)
+ err := suite.m.SaveSubscribedData(subscribeMi, string(bytes))
+ assert.Nil(suite.t, err)
+}
+
+func (suite *consulMetadataReportTestSuite) testGetSubscribedURLs() {
+ subscribeMi := newSubscribeMetadataIdentifier("provider")
+ urls, err := suite.m.GetSubscribedURLs(subscribeMi)
+ assert.Equal(suite.t, 1, len(urls))
+ assert.NoError(suite.t, err)
+}
+
+func (suite *consulMetadataReportTestSuite) testGetServiceDefinition() {
+ providerMi := newMetadataIdentifier("provider")
+ providerMeta, err := suite.m.GetServiceDefinition(providerMi)
+ assert.Equal(suite.t, "provider", providerMeta)
+ assert.NoError(suite.t, err)
+}
+
+func test1(t *testing.T) {
+ consulAgent := consul.NewConsulAgent(t, 8500)
+ defer consulAgent.Close()
+
+ url := newProviderRegistryUrl("localhost", 8500)
+ mf := extension.GetMetadataReportFactory("consul")
+ m := mf.CreateMetadataReport(url)
+
+ suite := newConsulMetadataReportTestSuite(t, m)
+ suite.testStoreProviderMetadata()
+ suite.testStoreConsumerMetadata()
+ suite.testSaveServiceMetadata(*url)
+ suite.testGetExportedURLs()
+ suite.testRemoveServiceMetadata()
+ suite.testSaveSubscribedData(*url)
+ suite.testGetSubscribedURLs()
+ suite.testGetServiceDefinition()
+}
+
+func TestConsulMetadataReport(t *testing.T) {
+ t.Run("test1", test1)
+}
diff --git a/metadata/report/delegate/delegate_report.go b/metadata/report/delegate/delegate_report.go
index cb7e42030b2dec32b0537b20e2f825e638f228d0..64103b259ac0107e5b30aa64a21d80e34850ed45 100644
--- a/metadata/report/delegate/delegate_report.go
+++ b/metadata/report/delegate/delegate_report.go
@@ -26,6 +26,7 @@ import (
import (
"github.com/go-co-op/gocron"
+ perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
@@ -234,29 +235,38 @@ func (mr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMe
}
// GetExportedURLs will delegate to call remote metadata's sdk to get exported urls
-func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string {
+func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
report := instance.GetMetadataReportInstance()
return report.GetExportedURLs(identifier)
}
// SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data
func (mr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
+ urlStrList := make([]string, 0, len(urls))
+ for _, url := range urls {
+ urlStrList = append(urlStrList, url.String())
+ }
+ bytes, err := json.Marshal(urlStrList)
+ if err != nil {
+ return perrors.WithMessage(err, "Could not convert the array to json")
+ }
+
report := instance.GetMetadataReportInstance()
if mr.syncReport {
- return report.SaveSubscribedData(identifier, urls)
+ return report.SaveSubscribedData(identifier, string(bytes))
}
- go report.SaveSubscribedData(identifier, urls)
+ go report.SaveSubscribedData(identifier, string(bytes))
return nil
}
// GetSubscribedURLs will delegate to call remote metadata's sdk to get subscribed urls
-func (MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) []string {
+func (MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
report := instance.GetMetadataReportInstance()
return report.GetSubscribedURLs(identifier)
}
// GetServiceDefinition will delegate to call remote metadata's sdk to get service definitions
-func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) string {
+func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) (string, error) {
report := instance.GetMetadataReportInstance()
return report.GetServiceDefinition(identifier)
}
diff --git a/metadata/report/delegate/delegate_report_test.go b/metadata/report/delegate/delegate_report_test.go
index 04c9e6483929d3ed58fd85337db6ccb4ebd53d00..3dfca577ba06598b90c553048777951c8823b256 100644
--- a/metadata/report/delegate/delegate_report_test.go
+++ b/metadata/report/delegate/delegate_report_test.go
@@ -75,14 +75,14 @@ func TestMetadataReport_MetadataReportRetryWithLimit(t *testing.T) {
func mockNewMetadataReport(t *testing.T) *MetadataReport {
syncReportKey := "false"
- retryPeroidKey := "3"
+ retryPeriodKey := "3"
retryTimesKey := "100"
cycleReportKey := "true"
url, err := common.NewURL(fmt.Sprintf(
"test://127.0.0.1:20000/?"+constant.SYNC_REPORT_KEY+"=%v&"+constant.RETRY_PERIOD_KEY+"=%v&"+
constant.RETRY_TIMES_KEY+"=%v&"+constant.CYCLE_REPORT_KEY+"=%v",
- syncReportKey, retryPeroidKey, retryTimesKey, cycleReportKey))
+ syncReportKey, retryPeriodKey, retryTimesKey, cycleReportKey))
assert.NoError(t, err)
instance.SetMetadataReportUrl(url)
mtr, err := NewMetadataReport()
diff --git a/metadata/report/factory/report_factory.go b/metadata/report/factory/report_factory.go
index 8769ebdd2fd1f088415232bd4463d02f7ebd730f..9f00007cefbd5737c9c53d69924eba1d556c0023 100644
--- a/metadata/report/factory/report_factory.go
+++ b/metadata/report/factory/report_factory.go
@@ -22,10 +22,6 @@ import (
"github.com/apache/dubbo-go/metadata/report"
)
-var (
- MetadataReportInstance report.MetadataReport
-)
-
// MetadataReportFactory interface will create metadata report
type MetadataReportFactory interface {
CreateMetadataReport(*common.URL) report.MetadataReport
diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go
index 8f29c7de0f263de206041464701819ba0fc9e133..d69913bd8fbb04da2d50770c1196917cb1efdaa5 100644
--- a/metadata/report/nacos/report.go
+++ b/metadata/report/nacos/report.go
@@ -18,7 +18,6 @@
package nacos
import (
- "encoding/json"
"net/url"
)
@@ -39,19 +38,19 @@ import (
)
func init() {
- ftry := &nacosMetadataReportFactory{}
+ mf := &nacosMetadataReportFactory{}
extension.SetMetadataReportFactory("nacos", func() factory.MetadataReportFactory {
- return ftry
+ return mf
})
}
-// nacosMetadataReport is the implementation of MetadataReport based Nacos
+// nacosMetadataReport is the implementation
+// of MetadataReport based on nacos.
type nacosMetadataReport struct {
client config_client.IConfigClient
}
-// StoreProviderMetadata will store the metadata
-// metadata including the basic info of the server, provider info, and other user custom info
+// StoreProviderMetadata stores the metadata.
func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: providerIdentifier.GetIdentifierKey(),
@@ -60,8 +59,7 @@ func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifi
})
}
-// StoreConsumerMetadata will store the metadata
-// metadata including the basic info of the server, consumer info, and other user custom info
+// StoreConsumerMetadata stores the metadata.
func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: consumerMetadataIdentifier.GetIdentifierKey(),
@@ -70,8 +68,7 @@ func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *
})
}
-// SaveServiceMetadata will store the metadata
-// metadata including the basic info of the server, service info, and other user custom info
+// SaveServiceMetadata saves the metadata.
func (n *nacosMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
return n.storeMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
@@ -80,7 +77,7 @@ func (n *nacosMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier
})
}
-// RemoveServiceMetadata will remove the service metadata
+// RemoveServiceMetadata removes the metadata.
func (n *nacosMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
return n.deleteMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
@@ -88,50 +85,33 @@ func (n *nacosMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifi
})
}
-// GetExportedURLs will look up the exported urls.
-// if not found, an empty list will be returned.
-func (n *nacosMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) []string {
+// GetExportedURLs gets the urls.
+func (n *nacosMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
return n.getConfigAsArray(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
})
}
-// SaveSubscribedData will convert the urlList to json array and then store it
-func (n *nacosMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urlList []common.URL) error {
- if len(urlList) == 0 {
- logger.Warnf("The url list is empty")
- return nil
- }
- urlStrList := make([]string, 0, len(urlList))
-
- for _, e := range urlList {
- urlStrList = append(urlStrList, e.String())
- }
-
- bytes, err := json.Marshal(urlStrList)
-
- if err != nil {
- return perrors.WithMessage(err, "Could not convert the array to json")
- }
+// SaveSubscribedData saves the urls.
+func (n *nacosMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urls string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: subscriberMetadataIdentifier.GetIdentifierKey(),
Group: subscriberMetadataIdentifier.Group,
- Content: string(bytes),
+ Content: urls,
})
}
-// GetSubscribedURLs will lookup the url
-// if not found, an empty list will be returned
-func (n *nacosMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) []string {
+// GetSubscribedURLs gets the urls.
+func (n *nacosMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
return n.getConfigAsArray(vo.ConfigParam{
DataId: subscriberMetadataIdentifier.GetIdentifierKey(),
Group: subscriberMetadataIdentifier.Group,
})
}
-// GetServiceDefinition will lookup the service definition
-func (n *nacosMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) string {
+// GetServiceDefinition gets the service definition.
+func (n *nacosMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) (string, error) {
return n.getConfig(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
@@ -165,33 +145,38 @@ func (n *nacosMetadataReport) deleteMetadata(param vo.ConfigParam) error {
// getConfigAsArray will read the config and then convert it as an one-element array
// error or config not found, an empty list will be returned.
-func (n *nacosMetadataReport) getConfigAsArray(param vo.ConfigParam) []string {
- cfg := n.getConfig(param)
+func (n *nacosMetadataReport) getConfigAsArray(param vo.ConfigParam) ([]string, error) {
res := make([]string, 0, 1)
- if len(cfg) == 0 {
- return res
+
+ cfg, err := n.getConfig(param)
+ if err != nil || len(cfg) == 0 {
+ return res, err
}
+
decodeCfg, err := url.QueryUnescape(cfg)
if err != nil {
logger.Errorf("The config is invalid: %s", cfg)
- return res
+ return res, err
}
+
res = append(res, decodeCfg)
- return res
+ return res, nil
}
// getConfig will read the config
-func (n *nacosMetadataReport) getConfig(param vo.ConfigParam) string {
+func (n *nacosMetadataReport) getConfig(param vo.ConfigParam) (string, error) {
cfg, err := n.client.GetConfig(param)
if err != nil {
logger.Errorf("Finding the configuration failed: %v", param)
+ return "", err
}
- return cfg
+ return cfg, nil
}
type nacosMetadataReportFactory struct {
}
+// nolint
func (n *nacosMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
client, err := nacos.NewNacosConfigClient(url)
if err != nil {
diff --git a/metadata/report/nacos/report_test.go b/metadata/report/nacos/report_test.go
index 19ca2e5a480766e2c0ae8d8dbd7d9b294ce4b905..be01eb22f7e95966c3bf816fdf648629b64380a3 100644
--- a/metadata/report/nacos/report_test.go
+++ b/metadata/report/nacos/report_test.go
@@ -18,6 +18,7 @@
package nacos
import (
+ "encoding/json"
"strconv"
"testing"
)
@@ -41,6 +42,7 @@ func TestNacosMetadataReport_CRUD(t *testing.T) {
providerMi := newMetadataIdentifier("server")
providerMeta := "provider"
err := rpt.StoreProviderMetadata(providerMi, providerMeta)
+ assert.Nil(t, err)
consumerMi := newMetadataIdentifier("client")
consumerMeta := "consumer"
@@ -49,25 +51,25 @@ func TestNacosMetadataReport_CRUD(t *testing.T) {
serviceMi := newServiceMetadataIdentifier()
serviceUrl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
-
err = rpt.SaveServiceMetadata(serviceMi, serviceUrl)
assert.Nil(t, err)
- exportedUrls := rpt.GetExportedURLs(serviceMi)
+ exportedUrls, err := rpt.GetExportedURLs(serviceMi)
assert.Equal(t, 1, len(exportedUrls))
+ assert.Nil(t, err)
subMi := newSubscribeMetadataIdentifier()
- urlList := make([]common.URL, 0, 1)
- urlList = append(urlList, serviceUrl)
- err = rpt.SaveSubscribedData(subMi, urlList)
+ urls := []string{serviceUrl.String()}
+ bytes, _ := json.Marshal(urls)
+ err = rpt.SaveSubscribedData(subMi, string(bytes))
assert.Nil(t, err)
- subscribeUrl := rpt.GetSubscribedURLs(subMi)
+ subscribeUrl, err := rpt.GetSubscribedURLs(subMi)
assert.Equal(t, 1, len(subscribeUrl))
+ assert.Nil(t, err)
err = rpt.RemoveServiceMetadata(serviceMi)
assert.Nil(t, err)
-
}
func newSubscribeMetadataIdentifier() *identifier.SubscriberMetadataIdentifier {
@@ -75,7 +77,6 @@ func newSubscribeMetadataIdentifier() *identifier.SubscriberMetadataIdentifier {
Revision: "subscribe",
MetadataIdentifier: *newMetadataIdentifier("provider"),
}
-
}
func newServiceMetadataIdentifier() *identifier.ServiceMetadataIdentifier {
diff --git a/metadata/report/report.go b/metadata/report/report.go
index 61cdda1f9663a9f4eaed157d7c0232e4e911c80d..62a9055e843297bd0d69ad94cb09ece64efda85f 100644
--- a/metadata/report/report.go
+++ b/metadata/report/report.go
@@ -22,14 +22,39 @@ import (
"github.com/apache/dubbo-go/metadata/identifier"
)
-// MetadataReport is an interface of remote metadata report
+// MetadataReport is an interface of
+// remote metadata report.
type MetadataReport interface {
+ // StoreProviderMetadata stores the metadata.
+ // Metadata includes the basic info of the server,
+ // provider info, and other user custom info.
StoreProviderMetadata(*identifier.MetadataIdentifier, string) error
+
+ // StoreConsumerMetadata stores the metadata.
+ // Metadata includes the basic info of the server,
+ // consumer info, and other user custom info.
StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error
+
+ // SaveServiceMetadata saves the metadata.
+ // Metadata includes the basic info of the server,
+ // service info, and other user custom info.
SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, common.URL) error
+
+ // RemoveServiceMetadata removes the metadata.
RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error
- GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string
- SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []common.URL) error
- GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string
- GetServiceDefinition(*identifier.MetadataIdentifier) string
+
+ // GetExportedURLs gets the urls.
+ // If not found, an empty list will be returned.
+ GetExportedURLs(*identifier.ServiceMetadataIdentifier) ([]string, error)
+
+ // SaveSubscribedData saves the urls.
+ // If not found, an empty str will be returned.
+ SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, string) error
+
+ // GetSubscribedURLs gets the urls.
+ // If not found, an empty list will be returned.
+ GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) ([]string, error)
+
+ // GetServiceDefinition gets the service definition.
+ GetServiceDefinition(*identifier.MetadataIdentifier) (string, error)
}
diff --git a/metadata/report/zookeeper/report.go b/metadata/report/zookeeper/report.go
new file mode 100644
index 0000000000000000000000000000000000000000..8f46bb023054ec27ca0dbff2c249dc0135af07cd
--- /dev/null
+++ b/metadata/report/zookeeper/report.go
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "strings"
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/metadata/identifier"
+ "github.com/apache/dubbo-go/metadata/report"
+ "github.com/apache/dubbo-go/metadata/report/factory"
+ "github.com/apache/dubbo-go/remoting/zookeeper"
+)
+
+var (
+ emptyStrSlice = make([]string, 0)
+)
+
+func init() {
+ mf := &zookeeperMetadataReportFactory{}
+ extension.SetMetadataReportFactory("zookeeper", func() factory.MetadataReportFactory {
+ return mf
+ })
+}
+
+// zookeeperMetadataReport is the implementation of
+// MetadataReport based on zookeeper.
+type zookeeperMetadataReport struct {
+ client *zookeeper.ZookeeperClient
+ rootDir string
+}
+
+// StoreProviderMetadata stores the metadata.
+func (m *zookeeperMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
+ k := m.rootDir + providerIdentifier.GetFilePathKey()
+ return m.client.CreateWithValue(k, []byte(serviceDefinitions))
+}
+
+// StoreConsumerMetadata stores the metadata.
+func (m *zookeeperMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
+ k := m.rootDir + consumerMetadataIdentifier.GetFilePathKey()
+ return m.client.CreateWithValue(k, []byte(serviceParameterString))
+}
+
+// SaveServiceMetadata saves the metadata.
+func (m *zookeeperMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
+ k := m.rootDir + metadataIdentifier.GetFilePathKey()
+ return m.client.CreateWithValue(k, []byte(url.String()))
+}
+
+// RemoveServiceMetadata removes the metadata.
+func (m *zookeeperMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
+ k := m.rootDir + metadataIdentifier.GetFilePathKey()
+ return m.client.Delete(k)
+}
+
+// GetExportedURLs gets the urls.
+func (m *zookeeperMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
+ k := m.rootDir + metadataIdentifier.GetFilePathKey()
+ v, _, err := m.client.GetContent(k)
+ if err != nil || len(v) == 0 {
+ return emptyStrSlice, err
+ }
+ return []string{string(v)}, nil
+}
+
+// SaveSubscribedData saves the urls.
+func (m *zookeeperMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urls string) error {
+ k := m.rootDir + subscriberMetadataIdentifier.GetFilePathKey()
+ return m.client.CreateWithValue(k, []byte(urls))
+}
+
+// GetSubscribedURLs gets the urls.
+func (m *zookeeperMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
+ k := m.rootDir + subscriberMetadataIdentifier.GetFilePathKey()
+ v, _, err := m.client.GetContent(k)
+ if err != nil || len(v) == 0 {
+ return emptyStrSlice, err
+ }
+ return []string{string(v)}, nil
+}
+
+// GetServiceDefinition gets the service definition.
+func (m *zookeeperMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) (string, error) {
+ k := m.rootDir + metadataIdentifier.GetFilePathKey()
+ v, _, err := m.client.GetContent(k)
+ return string(v), err
+}
+
+type zookeeperMetadataReportFactory struct {
+}
+
+// nolint
+func (mf *zookeeperMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
+ client, err := zookeeper.NewZookeeperClient(
+ "zookeeperMetadataReport",
+ strings.Split(url.Location, ","),
+ 15*time.Second,
+ )
+ if err != nil {
+ panic(err)
+ }
+
+ rootDir := url.GetParam(constant.GROUP_KEY, "dubbo")
+ if !strings.HasPrefix(rootDir, constant.PATH_SEPARATOR) {
+ rootDir = constant.PATH_SEPARATOR + rootDir
+ }
+ if rootDir != constant.PATH_SEPARATOR {
+ rootDir = rootDir + constant.PATH_SEPARATOR
+ }
+
+ return &zookeeperMetadataReport{client: client, rootDir: rootDir}
+}
diff --git a/metadata/report/zookeeper/report_test.go b/metadata/report/zookeeper/report_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..a1e46e2e8d019c0415699ee409833b392a85b504
--- /dev/null
+++ b/metadata/report/zookeeper/report_test.go
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "encoding/json"
+ "net/url"
+ "strconv"
+ "testing"
+)
+
+import (
+ "github.com/dubbogo/go-zookeeper/zk"
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/extension"
+ "github.com/apache/dubbo-go/metadata/identifier"
+ "github.com/apache/dubbo-go/metadata/report"
+)
+
+func newProviderRegistryUrl(host string, port int) *common.URL {
+ return common.NewURLWithOptions(
+ common.WithIp(host),
+ common.WithPort(strconv.Itoa(port)),
+ common.WithParams(url.Values{}),
+ common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)),
+ )
+}
+
+func newBaseMetadataIdentifier(side string) *identifier.BaseMetadataIdentifier {
+ return &identifier.BaseMetadataIdentifier{
+ ServiceInterface: "org.apache.HelloWorld",
+ Version: "1.0.0",
+ Group: "group",
+ Side: side,
+ }
+}
+
+func newMetadataIdentifier(side string) *identifier.MetadataIdentifier {
+ return &identifier.MetadataIdentifier{
+ Application: "application",
+ BaseMetadataIdentifier: *newBaseMetadataIdentifier(side),
+ }
+}
+
+func newServiceMetadataIdentifier(side string) *identifier.ServiceMetadataIdentifier {
+ return &identifier.ServiceMetadataIdentifier{
+ Revision: "1.0",
+ Protocol: "dubbo",
+ BaseMetadataIdentifier: *newBaseMetadataIdentifier(side),
+ }
+}
+
+func newSubscribeMetadataIdentifier(side string) *identifier.SubscriberMetadataIdentifier {
+ return &identifier.SubscriberMetadataIdentifier{
+ Revision: "1.0",
+ MetadataIdentifier: *newMetadataIdentifier(side),
+ }
+}
+
+type zookeeperMetadataReportTestSuite struct {
+ t *testing.T
+ m report.MetadataReport
+}
+
+func newZookeeperMetadataReportTestSuite(t *testing.T, m report.MetadataReport) *zookeeperMetadataReportTestSuite {
+ return &zookeeperMetadataReportTestSuite{t: t, m: m}
+}
+
+func (suite *zookeeperMetadataReportTestSuite) testStoreProviderMetadata() {
+ providerMi := newMetadataIdentifier("provider")
+ providerMeta := "provider"
+ err := suite.m.StoreProviderMetadata(providerMi, providerMeta)
+ assert.NoError(suite.t, err)
+}
+
+func (suite *zookeeperMetadataReportTestSuite) testStoreConsumerMetadata() {
+ consumerMi := newMetadataIdentifier("consumer")
+ consumerMeta := "consumer"
+ err := suite.m.StoreProviderMetadata(consumerMi, consumerMeta)
+ assert.NoError(suite.t, err)
+}
+
+func (suite *zookeeperMetadataReportTestSuite) testSaveServiceMetadata(url common.URL) {
+ serviceMi := newServiceMetadataIdentifier("provider")
+ err := suite.m.SaveServiceMetadata(serviceMi, url)
+ assert.NoError(suite.t, err)
+}
+
+func (suite *zookeeperMetadataReportTestSuite) testRemoveServiceMetadata() {
+ serviceMi := newServiceMetadataIdentifier("provider")
+ err := suite.m.RemoveServiceMetadata(serviceMi)
+ assert.NoError(suite.t, err)
+}
+
+func (suite *zookeeperMetadataReportTestSuite) testGetExportedURLs() {
+ serviceMi := newServiceMetadataIdentifier("provider")
+ urls, err := suite.m.GetExportedURLs(serviceMi)
+ assert.Equal(suite.t, 1, len(urls))
+ assert.NoError(suite.t, err)
+}
+
+func (suite *zookeeperMetadataReportTestSuite) testSaveSubscribedData(url common.URL) {
+ subscribeMi := newSubscribeMetadataIdentifier("provider")
+ urls := []string{url.String()}
+ bytes, _ := json.Marshal(urls)
+ err := suite.m.SaveSubscribedData(subscribeMi, string(bytes))
+ assert.Nil(suite.t, err)
+}
+
+func (suite *zookeeperMetadataReportTestSuite) testGetSubscribedURLs() {
+ subscribeMi := newSubscribeMetadataIdentifier("provider")
+ urls, err := suite.m.GetSubscribedURLs(subscribeMi)
+ assert.Equal(suite.t, 1, len(urls))
+ assert.NoError(suite.t, err)
+}
+
+func (suite *zookeeperMetadataReportTestSuite) testGetServiceDefinition() {
+ providerMi := newMetadataIdentifier("provider")
+ providerMeta, err := suite.m.GetServiceDefinition(providerMi)
+ assert.Equal(suite.t, "provider", providerMeta)
+ assert.NoError(suite.t, err)
+}
+
+func test1(t *testing.T) {
+ testCluster, err := zk.StartTestCluster(1, nil, nil)
+ assert.NoError(t, err)
+ defer testCluster.Stop()
+
+ url := newProviderRegistryUrl("127.0.0.1", testCluster.Servers[0].Port)
+ mf := extension.GetMetadataReportFactory("zookeeper")
+ m := mf.CreateMetadataReport(url)
+
+ suite := newZookeeperMetadataReportTestSuite(t, m)
+ suite.testStoreProviderMetadata()
+ suite.testStoreConsumerMetadata()
+ suite.testSaveServiceMetadata(*url)
+ suite.testGetExportedURLs()
+ suite.testRemoveServiceMetadata()
+ suite.testSaveSubscribedData(*url)
+ suite.testGetSubscribedURLs()
+ suite.testGetServiceDefinition()
+}
+
+func TestZookeeperMetadataReport(t *testing.T) {
+ t.Run("test1", test1)
+}
diff --git a/metadata/service/remote/service_test.go b/metadata/service/remote/service_test.go
index 308c631e413be9c3c6735f31c56da2e8f0697333..3b4a12045c8ca8dc154b0653528b0d34e5220723 100644
--- a/metadata/service/remote/service_test.go
+++ b/metadata/service/remote/service_test.go
@@ -39,7 +39,7 @@ import (
)
var serviceMetadata = make(map[*identifier.ServiceMetadataIdentifier]common.URL, 4)
-var subscribedMetadata = make(map[*identifier.SubscriberMetadataIdentifier][]common.URL, 4)
+var subscribedMetadata = make(map[*identifier.SubscriberMetadataIdentifier]string, 4)
func getMetadataReportFactory() factory.MetadataReportFactory {
return &metadataReportFactory{}
@@ -73,28 +73,27 @@ func (metadataReport) RemoveServiceMetadata(*identifier.ServiceMetadataIdentifie
return nil
}
-func (metadataReport) GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string {
- return nil
+func (metadataReport) GetExportedURLs(*identifier.ServiceMetadataIdentifier) ([]string, error) {
+ return nil, nil
}
-func (mr *metadataReport) SaveSubscribedData(id *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
+func (mr *metadataReport) SaveSubscribedData(id *identifier.SubscriberMetadataIdentifier, urls string) error {
logger.Infof("SaveSubscribedData, , url is %v", urls)
subscribedMetadata[id] = urls
return nil
}
-func (metadataReport) GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string {
- return nil
+func (metadataReport) GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) ([]string, error) {
+ return nil, nil
}
-func (metadataReport) GetServiceDefinition(*identifier.MetadataIdentifier) string {
- return ""
+func (metadataReport) GetServiceDefinition(*identifier.MetadataIdentifier) (string, error) {
+ return "", nil
}
func TestMetadataService(t *testing.T) {
extension.SetMetadataReportFactory("mock", getMetadataReportFactory)
- u, err := common.NewURL(fmt.Sprintf(
- "mock://127.0.0.1:20000/?sync.report=true"))
+ u, err := common.NewURL(fmt.Sprintf("mock://127.0.0.1:20000/?sync.report=true"))
assert.NoError(t, err)
instance.GetMetadataReportInstance(&u)
mts, err := NewMetadataService()
@@ -112,6 +111,7 @@ func mockInmemoryProc(t *testing.T) *inmemory.MetadataService {
version := "0.0.1"
protocol := "dubbo"
beanName := "UserProvider"
+ userProvider := &definition.UserProvider{}
u, err := common.NewURL(fmt.Sprintf(
"%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+
@@ -120,13 +120,17 @@ func mockInmemoryProc(t *testing.T) *inmemory.MetadataService {
"owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v",
protocol, serviceName, group, version, beanName))
assert.NoError(t, err)
- mts.ExportURL(u)
- mts.SubscribeURL(u)
+ _, err = mts.ExportURL(u)
+ assert.NoError(t, err)
+ _, err = mts.SubscribeURL(u)
+ assert.NoError(t, err)
+
+ _, err = common.ServiceMap.Register(serviceName, protocol, userProvider)
+ assert.NoError(t, err)
+ err = mts.PublishServiceDefinition(u)
+ assert.NoError(t, err)
- userProvider := &definition.UserProvider{}
- common.ServiceMap.Register(serviceName, protocol, userProvider)
- mts.PublishServiceDefinition(u)
expected := "{\"CanonicalName\":\"com.ikurento.user.UserProvider\",\"CodeSource\":\"\"," +
"\"Methods\":[{\"Name\":\"GetUser\",\"ParameterTypes\":[\"slice\"],\"ReturnType\":\"ptr\"," +
"\"Parameters\":null}],\"Types\":null}"
diff --git a/registry/consul/utils_test.go b/registry/consul/utils_test.go
index d66600b773ee78b43ac3da4edf8849d0019c744d..327dd95f7181907f6635c7fe89ef726bdcef5204 100644
--- a/registry/consul/utils_test.go
+++ b/registry/consul/utils_test.go
@@ -19,24 +19,19 @@ package consul
import (
"fmt"
- "io/ioutil"
"net"
"net/url"
- "os"
"strconv"
"sync"
"testing"
)
-import (
- "github.com/hashicorp/consul/agent"
-)
-
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
+ "github.com/apache/dubbo-go/remoting/consul"
)
var (
@@ -51,71 +46,39 @@ var (
)
func newProviderRegistryUrl(host string, port int) *common.URL {
- url1 := common.NewURLWithOptions(
+ return common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)),
)
- return url1
}
func newConsumerRegistryUrl(host string, port int) *common.URL {
- url1 := common.NewURLWithOptions(
+ return common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)),
)
- return url1
}
func newProviderUrl(host string, port int, service string, protocol string) common.URL {
- url1 := common.NewURLWithOptions(
+ return *common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithPath(service),
common.WithProtocol(protocol),
)
- return *url1
}
func newConsumerUrl(host string, port int, service string, protocol string) common.URL {
- url1 := common.NewURLWithOptions(
+ return *common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithPath(service),
common.WithProtocol(protocol),
)
- return *url1
-}
-
-type testConsulAgent struct {
- dataDir string
- testAgent *agent.TestAgent
-}
-
-func newConsulAgent(t *testing.T, port int) *testConsulAgent {
- dataDir, _ := ioutil.TempDir("./", "agent")
- hcl := `
- ports {
- http = ` + strconv.Itoa(port) + `
- }
- data_dir = "` + dataDir + `"
- `
- testAgent := &agent.TestAgent{Name: t.Name(), DataDir: dataDir, HCL: hcl}
- testAgent.Start(t)
-
- consulAgent := &testConsulAgent{
- dataDir: dataDir,
- testAgent: testAgent,
- }
- return consulAgent
-}
-
-func (consulAgent *testConsulAgent) close() {
- consulAgent.testAgent.Shutdown()
- os.RemoveAll(consulAgent.dataDir)
}
type testServer struct {
@@ -184,8 +147,8 @@ func (suite *consulRegistryTestSuite) close() {
// register -> subscribe -> unregister
func test1(t *testing.T) {
- consulAgent := newConsulAgent(t, registryPort)
- defer consulAgent.close()
+ consulAgent := consul.NewConsulAgent(t, registryPort)
+ defer consulAgent.Close()
server := newServer(providerHost, providerPort)
defer server.close()
@@ -204,8 +167,8 @@ func test1(t *testing.T) {
// subscribe -> register
func test2(t *testing.T) {
- consulAgent := newConsulAgent(t, registryPort)
- defer consulAgent.close()
+ consulAgent := consul.NewConsulAgent(t, registryPort)
+ defer consulAgent.Close()
server := newServer(providerHost, providerPort)
defer server.close()
diff --git a/remoting/consul/test_agent.go b/remoting/consul/test_agent.go
new file mode 100644
index 0000000000000000000000000000000000000000..fd0694bde37b84cd59eed29e0c2ee3136f4ef51b
--- /dev/null
+++ b/remoting/consul/test_agent.go
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+ "io/ioutil"
+ "os"
+ "strconv"
+ "testing"
+)
+
+import (
+ "github.com/hashicorp/consul/agent"
+)
+
+// Consul agent, used for test, simulates
+// an embedded consul server.
+type ConsulAgent struct {
+ dataDir string
+ testAgent *agent.TestAgent
+}
+
+func NewConsulAgent(t *testing.T, port int) *ConsulAgent {
+ dataDir, _ := ioutil.TempDir("./", "agent")
+ hcl := `
+ ports {
+ http = ` + strconv.Itoa(port) + `
+ }
+ data_dir = "` + dataDir + `"
+ `
+ testAgent := &agent.TestAgent{Name: t.Name(), DataDir: dataDir, HCL: hcl}
+ testAgent.Start(t)
+
+ consulAgent := &ConsulAgent{
+ dataDir: dataDir,
+ testAgent: testAgent,
+ }
+ return consulAgent
+}
+
+func (consulAgent *ConsulAgent) Close() error {
+ var err error
+
+ err = consulAgent.testAgent.Shutdown()
+ if err != nil {
+ return err
+ }
+
+ err = os.RemoveAll(consulAgent.dataDir)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/remoting/consul/test_agent_test.go b/remoting/consul/test_agent_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..8cf0ac6cd80e517ab7bc1b52cc7774a708082d5e
--- /dev/null
+++ b/remoting/consul/test_agent_test.go
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package consul
+
+import (
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewConsulAgent(t *testing.T) {
+ consulAgent := NewConsulAgent(t, 8500)
+ err := consulAgent.Close()
+ assert.NoError(t, err)
+}
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index 7904dc74e878ac6dfc99f52d6d0a59f99812e5dc..feceeb1197f3f20c8185f09bdffd2c94cabbab82 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -89,8 +89,6 @@ func StateToString(state zk.State) string {
default:
return state.String()
}
-
- return "zookeeper unknown state"
}
// Options ...
@@ -137,7 +135,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
zkAddresses := strings.Split(url.Location, ",")
- newClient, err := newZookeeperClient(options.zkName, zkAddresses, timeout)
+ newClient, err := NewZookeeperClient(options.zkName, zkAddresses, timeout)
if err != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}",
options.zkName, url.Location, timeout.String(), err)
@@ -165,7 +163,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
}
-func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
+func NewZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
var (
err error
event <-chan zk.Event
@@ -255,7 +253,7 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
case <-z.exit:
return
case event = <-session:
- logger.Warnf("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
+ logger.Infof("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err)
switch (int)(event.State) {
case (int)(zk.StateDisconnected):