Skip to content
Snippets Groups Projects
Unverified Commit 5a121370 authored by panty's avatar panty Committed by GitHub
Browse files

Merge pull request #633 from dubbo-x/metadata

Ftr:  zk/consul Metadata
parents 024f7b2e 302dbc22
No related branches found
No related tags found
No related merge requests found
Showing
with 822 additions and 145 deletions
......@@ -20,15 +20,13 @@ classes
# go mod, go test
vendor/
coverage.txt
logs/
.vscode/
coverage.txt
# unit test
remoting/zookeeper/zookeeper-4unittest/
config_center/zookeeper/zookeeper-4unittest/
registry/zookeeper/zookeeper-4unittest/
metadata/report/zookeeper/zookeeper-4unittest/
registry/consul/agent*
config_center/apollo/mockDubbog.properties.json
......@@ -34,4 +34,7 @@ md cluster\router\chain\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "cluster/router/chain/zookeeper-4unittest/contrib/fatjar/"
md cluster\router\condition\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/"
\ No newline at end of file
xcopy /f "%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/"
md metadata\report\zookeeper\zookeeper-4unittest\contrib\fatjar
xcopy /f "%zkJar%" "metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar/"
\ No newline at end of file
......@@ -25,13 +25,16 @@ if [ ! -f "${zkJar}" ]; then
fi
mkdir -p config_center/zookeeper/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/
cp ${zkJar} config_center/zookeeper/zookeeper-4unittest/contrib/fatjar
mkdir -p registry/zookeeper/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} registry/zookeeper/zookeeper-4unittest/contrib/fatjar/
cp ${zkJar} registry/zookeeper/zookeeper-4unittest/contrib/fatjar
mkdir -p cluster/router/chain/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/chain/zookeeper-4unittest/contrib/fatjar
mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar
\ No newline at end of file
cp ${zkJar} cluster/router/condition/zookeeper-4unittest/contrib/fatjar
mkdir -p metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
cp ${zkJar} metadata/report/zookeeper/zookeeper-4unittest/contrib/fatjar
\ No newline at end of file
......@@ -11,7 +11,7 @@ require (
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/creasty/defaults v1.3.0
github.com/dubbogo/getty v1.3.5
github.com/dubbogo/go-zookeeper v1.0.0
github.com/dubbogo/go-zookeeper v1.0.1
github.com/dubbogo/gost v1.9.0
github.com/emicklei/go-restful/v3 v3.0.0
github.com/go-co-op/gocron v0.1.1
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
consul "github.com/hashicorp/consul/api"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/metadata/report/factory"
)
var (
emptyStrSlice = make([]string, 0)
)
func init() {
mf := &consulMetadataReportFactory{}
extension.SetMetadataReportFactory("consul", func() factory.MetadataReportFactory {
return mf
})
}
// consulMetadataReport is the implementation of
// MetadataReport based on consul.
type consulMetadataReport struct {
client *consul.Client
}
// StoreProviderMetadata stores the metadata.
func (m *consulMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
kv := &consul.KVPair{Key: providerIdentifier.GetIdentifierKey(), Value: []byte(serviceDefinitions)}
_, err := m.client.KV().Put(kv, nil)
return err
}
// StoreConsumerMetadata stores the metadata.
func (m *consulMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
kv := &consul.KVPair{Key: consumerMetadataIdentifier.GetIdentifierKey(), Value: []byte(serviceParameterString)}
_, err := m.client.KV().Put(kv, nil)
return err
}
// SaveServiceMetadata saves the metadata.
func (m *consulMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
kv := &consul.KVPair{Key: metadataIdentifier.GetIdentifierKey(), Value: []byte(url.String())}
_, err := m.client.KV().Put(kv, nil)
return err
}
// RemoveServiceMetadata removes the metadata.
func (m *consulMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
k := metadataIdentifier.GetIdentifierKey()
_, err := m.client.KV().Delete(k, nil)
return err
}
// GetExportedURLs gets the urls.
func (m *consulMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
k := metadataIdentifier.GetIdentifierKey()
kv, _, err := m.client.KV().Get(k, nil)
if err != nil || kv == nil {
return emptyStrSlice, err
}
return []string{string(kv.Value)}, nil
}
// SaveSubscribedData saves the urls.
func (m *consulMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urls string) error {
kv := &consul.KVPair{Key: subscriberMetadataIdentifier.GetIdentifierKey(), Value: []byte(urls)}
_, err := m.client.KV().Put(kv, nil)
return err
}
// GetSubscribedURLs gets the urls.
func (m *consulMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
k := subscriberMetadataIdentifier.GetIdentifierKey()
kv, _, err := m.client.KV().Get(k, nil)
if err != nil || kv == nil {
return emptyStrSlice, err
}
return []string{string(kv.Value)}, nil
}
// GetServiceDefinition gets the service definition.
func (m *consulMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) (string, error) {
k := metadataIdentifier.GetIdentifierKey()
kv, _, err := m.client.KV().Get(k, nil)
if err != nil || kv == nil {
return "", err
}
return string(kv.Value), nil
}
type consulMetadataReportFactory struct {
}
// nolint
func (mf *consulMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
config := &consul.Config{Address: url.Location}
client, err := consul.NewClient(config)
if err != nil {
panic(err)
}
return &consulMetadataReport{client: client}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
"encoding/json"
"net/url"
"strconv"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/remoting/consul"
)
func newProviderRegistryUrl(host string, port int) *common.URL {
return common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)),
)
}
func newBaseMetadataIdentifier(side string) *identifier.BaseMetadataIdentifier {
return &identifier.BaseMetadataIdentifier{
ServiceInterface: "org.apache.HelloWorld",
Version: "1.0.0",
Group: "group",
Side: side,
}
}
func newMetadataIdentifier(side string) *identifier.MetadataIdentifier {
return &identifier.MetadataIdentifier{
Application: "application",
BaseMetadataIdentifier: *newBaseMetadataIdentifier(side),
}
}
func newServiceMetadataIdentifier(side string) *identifier.ServiceMetadataIdentifier {
return &identifier.ServiceMetadataIdentifier{
Revision: "1.0",
Protocol: "dubbo",
BaseMetadataIdentifier: *newBaseMetadataIdentifier(side),
}
}
func newSubscribeMetadataIdentifier(side string) *identifier.SubscriberMetadataIdentifier {
return &identifier.SubscriberMetadataIdentifier{
Revision: "1.0",
MetadataIdentifier: *newMetadataIdentifier(side),
}
}
type consulMetadataReportTestSuite struct {
t *testing.T
m report.MetadataReport
}
func newConsulMetadataReportTestSuite(t *testing.T, m report.MetadataReport) *consulMetadataReportTestSuite {
return &consulMetadataReportTestSuite{t: t, m: m}
}
func (suite *consulMetadataReportTestSuite) testStoreProviderMetadata() {
providerMi := newMetadataIdentifier("provider")
providerMeta := "provider"
err := suite.m.StoreProviderMetadata(providerMi, providerMeta)
assert.NoError(suite.t, err)
}
func (suite *consulMetadataReportTestSuite) testStoreConsumerMetadata() {
consumerMi := newMetadataIdentifier("consumer")
consumerMeta := "consumer"
err := suite.m.StoreProviderMetadata(consumerMi, consumerMeta)
assert.NoError(suite.t, err)
}
func (suite *consulMetadataReportTestSuite) testSaveServiceMetadata(url common.URL) {
serviceMi := newServiceMetadataIdentifier("provider")
err := suite.m.SaveServiceMetadata(serviceMi, url)
assert.NoError(suite.t, err)
}
func (suite *consulMetadataReportTestSuite) testRemoveServiceMetadata() {
serviceMi := newServiceMetadataIdentifier("provider")
err := suite.m.RemoveServiceMetadata(serviceMi)
assert.NoError(suite.t, err)
}
func (suite *consulMetadataReportTestSuite) testGetExportedURLs() {
serviceMi := newServiceMetadataIdentifier("provider")
urls, err := suite.m.GetExportedURLs(serviceMi)
assert.Equal(suite.t, 1, len(urls))
assert.NoError(suite.t, err)
}
func (suite *consulMetadataReportTestSuite) testSaveSubscribedData(url common.URL) {
subscribeMi := newSubscribeMetadataIdentifier("provider")
urls := []string{url.String()}
bytes, _ := json.Marshal(urls)
err := suite.m.SaveSubscribedData(subscribeMi, string(bytes))
assert.Nil(suite.t, err)
}
func (suite *consulMetadataReportTestSuite) testGetSubscribedURLs() {
subscribeMi := newSubscribeMetadataIdentifier("provider")
urls, err := suite.m.GetSubscribedURLs(subscribeMi)
assert.Equal(suite.t, 1, len(urls))
assert.NoError(suite.t, err)
}
func (suite *consulMetadataReportTestSuite) testGetServiceDefinition() {
providerMi := newMetadataIdentifier("provider")
providerMeta, err := suite.m.GetServiceDefinition(providerMi)
assert.Equal(suite.t, "provider", providerMeta)
assert.NoError(suite.t, err)
}
func test1(t *testing.T) {
consulAgent := consul.NewConsulAgent(t, 8500)
defer consulAgent.Close()
url := newProviderRegistryUrl("localhost", 8500)
mf := extension.GetMetadataReportFactory("consul")
m := mf.CreateMetadataReport(url)
suite := newConsulMetadataReportTestSuite(t, m)
suite.testStoreProviderMetadata()
suite.testStoreConsumerMetadata()
suite.testSaveServiceMetadata(*url)
suite.testGetExportedURLs()
suite.testRemoveServiceMetadata()
suite.testSaveSubscribedData(*url)
suite.testGetSubscribedURLs()
suite.testGetServiceDefinition()
}
func TestConsulMetadataReport(t *testing.T) {
t.Run("test1", test1)
}
......@@ -26,6 +26,7 @@ import (
import (
"github.com/go-co-op/gocron"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
......@@ -234,29 +235,38 @@ func (mr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMe
}
// GetExportedURLs will delegate to call remote metadata's sdk to get exported urls
func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string {
func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
report := instance.GetMetadataReportInstance()
return report.GetExportedURLs(identifier)
}
// SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data
func (mr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
urlStrList := make([]string, 0, len(urls))
for _, url := range urls {
urlStrList = append(urlStrList, url.String())
}
bytes, err := json.Marshal(urlStrList)
if err != nil {
return perrors.WithMessage(err, "Could not convert the array to json")
}
report := instance.GetMetadataReportInstance()
if mr.syncReport {
return report.SaveSubscribedData(identifier, urls)
return report.SaveSubscribedData(identifier, string(bytes))
}
go report.SaveSubscribedData(identifier, urls)
go report.SaveSubscribedData(identifier, string(bytes))
return nil
}
// GetSubscribedURLs will delegate to call remote metadata's sdk to get subscribed urls
func (MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) []string {
func (MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
report := instance.GetMetadataReportInstance()
return report.GetSubscribedURLs(identifier)
}
// GetServiceDefinition will delegate to call remote metadata's sdk to get service definitions
func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) string {
func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) (string, error) {
report := instance.GetMetadataReportInstance()
return report.GetServiceDefinition(identifier)
}
......
......@@ -75,14 +75,14 @@ func TestMetadataReport_MetadataReportRetryWithLimit(t *testing.T) {
func mockNewMetadataReport(t *testing.T) *MetadataReport {
syncReportKey := "false"
retryPeroidKey := "3"
retryPeriodKey := "3"
retryTimesKey := "100"
cycleReportKey := "true"
url, err := common.NewURL(fmt.Sprintf(
"test://127.0.0.1:20000/?"+constant.SYNC_REPORT_KEY+"=%v&"+constant.RETRY_PERIOD_KEY+"=%v&"+
constant.RETRY_TIMES_KEY+"=%v&"+constant.CYCLE_REPORT_KEY+"=%v",
syncReportKey, retryPeroidKey, retryTimesKey, cycleReportKey))
syncReportKey, retryPeriodKey, retryTimesKey, cycleReportKey))
assert.NoError(t, err)
instance.SetMetadataReportUrl(url)
mtr, err := NewMetadataReport()
......
......@@ -22,10 +22,6 @@ import (
"github.com/apache/dubbo-go/metadata/report"
)
var (
MetadataReportInstance report.MetadataReport
)
// MetadataReportFactory interface will create metadata report
type MetadataReportFactory interface {
CreateMetadataReport(*common.URL) report.MetadataReport
......
......@@ -18,7 +18,6 @@
package nacos
import (
"encoding/json"
"net/url"
)
......@@ -39,19 +38,19 @@ import (
)
func init() {
ftry := &nacosMetadataReportFactory{}
mf := &nacosMetadataReportFactory{}
extension.SetMetadataReportFactory("nacos", func() factory.MetadataReportFactory {
return ftry
return mf
})
}
// nacosMetadataReport is the implementation of MetadataReport based Nacos
// nacosMetadataReport is the implementation
// of MetadataReport based on nacos.
type nacosMetadataReport struct {
client config_client.IConfigClient
}
// StoreProviderMetadata will store the metadata
// metadata including the basic info of the server, provider info, and other user custom info
// StoreProviderMetadata stores the metadata.
func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: providerIdentifier.GetIdentifierKey(),
......@@ -60,8 +59,7 @@ func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifi
})
}
// StoreConsumerMetadata will store the metadata
// metadata including the basic info of the server, consumer info, and other user custom info
// StoreConsumerMetadata stores the metadata.
func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: consumerMetadataIdentifier.GetIdentifierKey(),
......@@ -70,8 +68,7 @@ func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *
})
}
// SaveServiceMetadata will store the metadata
// metadata including the basic info of the server, service info, and other user custom info
// SaveServiceMetadata saves the metadata.
func (n *nacosMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
return n.storeMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
......@@ -80,7 +77,7 @@ func (n *nacosMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier
})
}
// RemoveServiceMetadata will remove the service metadata
// RemoveServiceMetadata removes the metadata.
func (n *nacosMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
return n.deleteMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
......@@ -88,50 +85,33 @@ func (n *nacosMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifi
})
}
// GetExportedURLs will look up the exported urls.
// if not found, an empty list will be returned.
func (n *nacosMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) []string {
// GetExportedURLs gets the urls.
func (n *nacosMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
return n.getConfigAsArray(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
})
}
// SaveSubscribedData will convert the urlList to json array and then store it
func (n *nacosMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urlList []common.URL) error {
if len(urlList) == 0 {
logger.Warnf("The url list is empty")
return nil
}
urlStrList := make([]string, 0, len(urlList))
for _, e := range urlList {
urlStrList = append(urlStrList, e.String())
}
bytes, err := json.Marshal(urlStrList)
if err != nil {
return perrors.WithMessage(err, "Could not convert the array to json")
}
// SaveSubscribedData saves the urls.
func (n *nacosMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urls string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: subscriberMetadataIdentifier.GetIdentifierKey(),
Group: subscriberMetadataIdentifier.Group,
Content: string(bytes),
Content: urls,
})
}
// GetSubscribedURLs will lookup the url
// if not found, an empty list will be returned
func (n *nacosMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) []string {
// GetSubscribedURLs gets the urls.
func (n *nacosMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
return n.getConfigAsArray(vo.ConfigParam{
DataId: subscriberMetadataIdentifier.GetIdentifierKey(),
Group: subscriberMetadataIdentifier.Group,
})
}
// GetServiceDefinition will lookup the service definition
func (n *nacosMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) string {
// GetServiceDefinition gets the service definition.
func (n *nacosMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) (string, error) {
return n.getConfig(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
......@@ -165,33 +145,38 @@ func (n *nacosMetadataReport) deleteMetadata(param vo.ConfigParam) error {
// getConfigAsArray will read the config and then convert it as an one-element array
// error or config not found, an empty list will be returned.
func (n *nacosMetadataReport) getConfigAsArray(param vo.ConfigParam) []string {
cfg := n.getConfig(param)
func (n *nacosMetadataReport) getConfigAsArray(param vo.ConfigParam) ([]string, error) {
res := make([]string, 0, 1)
if len(cfg) == 0 {
return res
cfg, err := n.getConfig(param)
if err != nil || len(cfg) == 0 {
return res, err
}
decodeCfg, err := url.QueryUnescape(cfg)
if err != nil {
logger.Errorf("The config is invalid: %s", cfg)
return res
return res, err
}
res = append(res, decodeCfg)
return res
return res, nil
}
// getConfig will read the config
func (n *nacosMetadataReport) getConfig(param vo.ConfigParam) string {
func (n *nacosMetadataReport) getConfig(param vo.ConfigParam) (string, error) {
cfg, err := n.client.GetConfig(param)
if err != nil {
logger.Errorf("Finding the configuration failed: %v", param)
return "", err
}
return cfg
return cfg, nil
}
type nacosMetadataReportFactory struct {
}
// nolint
func (n *nacosMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
client, err := nacos.NewNacosConfigClient(url)
if err != nil {
......
......@@ -18,6 +18,7 @@
package nacos
import (
"encoding/json"
"strconv"
"testing"
)
......@@ -41,6 +42,7 @@ func TestNacosMetadataReport_CRUD(t *testing.T) {
providerMi := newMetadataIdentifier("server")
providerMeta := "provider"
err := rpt.StoreProviderMetadata(providerMi, providerMeta)
assert.Nil(t, err)
consumerMi := newMetadataIdentifier("client")
consumerMeta := "consumer"
......@@ -49,25 +51,25 @@ func TestNacosMetadataReport_CRUD(t *testing.T) {
serviceMi := newServiceMetadataIdentifier()
serviceUrl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
err = rpt.SaveServiceMetadata(serviceMi, serviceUrl)
assert.Nil(t, err)
exportedUrls := rpt.GetExportedURLs(serviceMi)
exportedUrls, err := rpt.GetExportedURLs(serviceMi)
assert.Equal(t, 1, len(exportedUrls))
assert.Nil(t, err)
subMi := newSubscribeMetadataIdentifier()
urlList := make([]common.URL, 0, 1)
urlList = append(urlList, serviceUrl)
err = rpt.SaveSubscribedData(subMi, urlList)
urls := []string{serviceUrl.String()}
bytes, _ := json.Marshal(urls)
err = rpt.SaveSubscribedData(subMi, string(bytes))
assert.Nil(t, err)
subscribeUrl := rpt.GetSubscribedURLs(subMi)
subscribeUrl, err := rpt.GetSubscribedURLs(subMi)
assert.Equal(t, 1, len(subscribeUrl))
assert.Nil(t, err)
err = rpt.RemoveServiceMetadata(serviceMi)
assert.Nil(t, err)
}
func newSubscribeMetadataIdentifier() *identifier.SubscriberMetadataIdentifier {
......@@ -75,7 +77,6 @@ func newSubscribeMetadataIdentifier() *identifier.SubscriberMetadataIdentifier {
Revision: "subscribe",
MetadataIdentifier: *newMetadataIdentifier("provider"),
}
}
func newServiceMetadataIdentifier() *identifier.ServiceMetadataIdentifier {
......
......@@ -22,14 +22,39 @@ import (
"github.com/apache/dubbo-go/metadata/identifier"
)
// MetadataReport is an interface of remote metadata report
// MetadataReport is an interface of
// remote metadata report.
type MetadataReport interface {
// StoreProviderMetadata stores the metadata.
// Metadata includes the basic info of the server,
// provider info, and other user custom info.
StoreProviderMetadata(*identifier.MetadataIdentifier, string) error
// StoreConsumerMetadata stores the metadata.
// Metadata includes the basic info of the server,
// consumer info, and other user custom info.
StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error
// SaveServiceMetadata saves the metadata.
// Metadata includes the basic info of the server,
// service info, and other user custom info.
SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, common.URL) error
// RemoveServiceMetadata removes the metadata.
RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error
GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string
SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []common.URL) error
GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string
GetServiceDefinition(*identifier.MetadataIdentifier) string
// GetExportedURLs gets the urls.
// If not found, an empty list will be returned.
GetExportedURLs(*identifier.ServiceMetadataIdentifier) ([]string, error)
// SaveSubscribedData saves the urls.
// If not found, an empty str will be returned.
SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, string) error
// GetSubscribedURLs gets the urls.
// If not found, an empty list will be returned.
GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) ([]string, error)
// GetServiceDefinition gets the service definition.
GetServiceDefinition(*identifier.MetadataIdentifier) (string, error)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"strings"
"time"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/metadata/report/factory"
"github.com/apache/dubbo-go/remoting/zookeeper"
)
var (
emptyStrSlice = make([]string, 0)
)
func init() {
mf := &zookeeperMetadataReportFactory{}
extension.SetMetadataReportFactory("zookeeper", func() factory.MetadataReportFactory {
return mf
})
}
// zookeeperMetadataReport is the implementation of
// MetadataReport based on zookeeper.
type zookeeperMetadataReport struct {
client *zookeeper.ZookeeperClient
rootDir string
}
// StoreProviderMetadata stores the metadata.
func (m *zookeeperMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
k := m.rootDir + providerIdentifier.GetFilePathKey()
return m.client.CreateWithValue(k, []byte(serviceDefinitions))
}
// StoreConsumerMetadata stores the metadata.
func (m *zookeeperMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
k := m.rootDir + consumerMetadataIdentifier.GetFilePathKey()
return m.client.CreateWithValue(k, []byte(serviceParameterString))
}
// SaveServiceMetadata saves the metadata.
func (m *zookeeperMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
k := m.rootDir + metadataIdentifier.GetFilePathKey()
return m.client.CreateWithValue(k, []byte(url.String()))
}
// RemoveServiceMetadata removes the metadata.
func (m *zookeeperMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
k := m.rootDir + metadataIdentifier.GetFilePathKey()
return m.client.Delete(k)
}
// GetExportedURLs gets the urls.
func (m *zookeeperMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) ([]string, error) {
k := m.rootDir + metadataIdentifier.GetFilePathKey()
v, _, err := m.client.GetContent(k)
if err != nil || len(v) == 0 {
return emptyStrSlice, err
}
return []string{string(v)}, nil
}
// SaveSubscribedData saves the urls.
func (m *zookeeperMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urls string) error {
k := m.rootDir + subscriberMetadataIdentifier.GetFilePathKey()
return m.client.CreateWithValue(k, []byte(urls))
}
// GetSubscribedURLs gets the urls.
func (m *zookeeperMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) ([]string, error) {
k := m.rootDir + subscriberMetadataIdentifier.GetFilePathKey()
v, _, err := m.client.GetContent(k)
if err != nil || len(v) == 0 {
return emptyStrSlice, err
}
return []string{string(v)}, nil
}
// GetServiceDefinition gets the service definition.
func (m *zookeeperMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) (string, error) {
k := m.rootDir + metadataIdentifier.GetFilePathKey()
v, _, err := m.client.GetContent(k)
return string(v), err
}
type zookeeperMetadataReportFactory struct {
}
// nolint
func (mf *zookeeperMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
client, err := zookeeper.NewZookeeperClient(
"zookeeperMetadataReport",
strings.Split(url.Location, ","),
15*time.Second,
)
if err != nil {
panic(err)
}
rootDir := url.GetParam(constant.GROUP_KEY, "dubbo")
if !strings.HasPrefix(rootDir, constant.PATH_SEPARATOR) {
rootDir = constant.PATH_SEPARATOR + rootDir
}
if rootDir != constant.PATH_SEPARATOR {
rootDir = rootDir + constant.PATH_SEPARATOR
}
return &zookeeperMetadataReport{client: client, rootDir: rootDir}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"encoding/json"
"net/url"
"strconv"
"testing"
)
import (
"github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
)
func newProviderRegistryUrl(host string, port int) *common.URL {
return common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)),
)
}
func newBaseMetadataIdentifier(side string) *identifier.BaseMetadataIdentifier {
return &identifier.BaseMetadataIdentifier{
ServiceInterface: "org.apache.HelloWorld",
Version: "1.0.0",
Group: "group",
Side: side,
}
}
func newMetadataIdentifier(side string) *identifier.MetadataIdentifier {
return &identifier.MetadataIdentifier{
Application: "application",
BaseMetadataIdentifier: *newBaseMetadataIdentifier(side),
}
}
func newServiceMetadataIdentifier(side string) *identifier.ServiceMetadataIdentifier {
return &identifier.ServiceMetadataIdentifier{
Revision: "1.0",
Protocol: "dubbo",
BaseMetadataIdentifier: *newBaseMetadataIdentifier(side),
}
}
func newSubscribeMetadataIdentifier(side string) *identifier.SubscriberMetadataIdentifier {
return &identifier.SubscriberMetadataIdentifier{
Revision: "1.0",
MetadataIdentifier: *newMetadataIdentifier(side),
}
}
type zookeeperMetadataReportTestSuite struct {
t *testing.T
m report.MetadataReport
}
func newZookeeperMetadataReportTestSuite(t *testing.T, m report.MetadataReport) *zookeeperMetadataReportTestSuite {
return &zookeeperMetadataReportTestSuite{t: t, m: m}
}
func (suite *zookeeperMetadataReportTestSuite) testStoreProviderMetadata() {
providerMi := newMetadataIdentifier("provider")
providerMeta := "provider"
err := suite.m.StoreProviderMetadata(providerMi, providerMeta)
assert.NoError(suite.t, err)
}
func (suite *zookeeperMetadataReportTestSuite) testStoreConsumerMetadata() {
consumerMi := newMetadataIdentifier("consumer")
consumerMeta := "consumer"
err := suite.m.StoreProviderMetadata(consumerMi, consumerMeta)
assert.NoError(suite.t, err)
}
func (suite *zookeeperMetadataReportTestSuite) testSaveServiceMetadata(url common.URL) {
serviceMi := newServiceMetadataIdentifier("provider")
err := suite.m.SaveServiceMetadata(serviceMi, url)
assert.NoError(suite.t, err)
}
func (suite *zookeeperMetadataReportTestSuite) testRemoveServiceMetadata() {
serviceMi := newServiceMetadataIdentifier("provider")
err := suite.m.RemoveServiceMetadata(serviceMi)
assert.NoError(suite.t, err)
}
func (suite *zookeeperMetadataReportTestSuite) testGetExportedURLs() {
serviceMi := newServiceMetadataIdentifier("provider")
urls, err := suite.m.GetExportedURLs(serviceMi)
assert.Equal(suite.t, 1, len(urls))
assert.NoError(suite.t, err)
}
func (suite *zookeeperMetadataReportTestSuite) testSaveSubscribedData(url common.URL) {
subscribeMi := newSubscribeMetadataIdentifier("provider")
urls := []string{url.String()}
bytes, _ := json.Marshal(urls)
err := suite.m.SaveSubscribedData(subscribeMi, string(bytes))
assert.Nil(suite.t, err)
}
func (suite *zookeeperMetadataReportTestSuite) testGetSubscribedURLs() {
subscribeMi := newSubscribeMetadataIdentifier("provider")
urls, err := suite.m.GetSubscribedURLs(subscribeMi)
assert.Equal(suite.t, 1, len(urls))
assert.NoError(suite.t, err)
}
func (suite *zookeeperMetadataReportTestSuite) testGetServiceDefinition() {
providerMi := newMetadataIdentifier("provider")
providerMeta, err := suite.m.GetServiceDefinition(providerMi)
assert.Equal(suite.t, "provider", providerMeta)
assert.NoError(suite.t, err)
}
func test1(t *testing.T) {
testCluster, err := zk.StartTestCluster(1, nil, nil)
assert.NoError(t, err)
defer testCluster.Stop()
url := newProviderRegistryUrl("127.0.0.1", testCluster.Servers[0].Port)
mf := extension.GetMetadataReportFactory("zookeeper")
m := mf.CreateMetadataReport(url)
suite := newZookeeperMetadataReportTestSuite(t, m)
suite.testStoreProviderMetadata()
suite.testStoreConsumerMetadata()
suite.testSaveServiceMetadata(*url)
suite.testGetExportedURLs()
suite.testRemoveServiceMetadata()
suite.testSaveSubscribedData(*url)
suite.testGetSubscribedURLs()
suite.testGetServiceDefinition()
}
func TestZookeeperMetadataReport(t *testing.T) {
t.Run("test1", test1)
}
......@@ -39,7 +39,7 @@ import (
)
var serviceMetadata = make(map[*identifier.ServiceMetadataIdentifier]common.URL, 4)
var subscribedMetadata = make(map[*identifier.SubscriberMetadataIdentifier][]common.URL, 4)
var subscribedMetadata = make(map[*identifier.SubscriberMetadataIdentifier]string, 4)
func getMetadataReportFactory() factory.MetadataReportFactory {
return &metadataReportFactory{}
......@@ -73,28 +73,27 @@ func (metadataReport) RemoveServiceMetadata(*identifier.ServiceMetadataIdentifie
return nil
}
func (metadataReport) GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string {
return nil
func (metadataReport) GetExportedURLs(*identifier.ServiceMetadataIdentifier) ([]string, error) {
return nil, nil
}
func (mr *metadataReport) SaveSubscribedData(id *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
func (mr *metadataReport) SaveSubscribedData(id *identifier.SubscriberMetadataIdentifier, urls string) error {
logger.Infof("SaveSubscribedData, , url is %v", urls)
subscribedMetadata[id] = urls
return nil
}
func (metadataReport) GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string {
return nil
func (metadataReport) GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) ([]string, error) {
return nil, nil
}
func (metadataReport) GetServiceDefinition(*identifier.MetadataIdentifier) string {
return ""
func (metadataReport) GetServiceDefinition(*identifier.MetadataIdentifier) (string, error) {
return "", nil
}
func TestMetadataService(t *testing.T) {
extension.SetMetadataReportFactory("mock", getMetadataReportFactory)
u, err := common.NewURL(fmt.Sprintf(
"mock://127.0.0.1:20000/?sync.report=true"))
u, err := common.NewURL(fmt.Sprintf("mock://127.0.0.1:20000/?sync.report=true"))
assert.NoError(t, err)
instance.GetMetadataReportInstance(&u)
mts, err := NewMetadataService()
......@@ -112,6 +111,7 @@ func mockInmemoryProc(t *testing.T) *inmemory.MetadataService {
version := "0.0.1"
protocol := "dubbo"
beanName := "UserProvider"
userProvider := &definition.UserProvider{}
u, err := common.NewURL(fmt.Sprintf(
"%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+
......@@ -120,13 +120,17 @@ func mockInmemoryProc(t *testing.T) *inmemory.MetadataService {
"owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000&timestamp=1556509797245&group=%v&version=%v&bean.name=%v",
protocol, serviceName, group, version, beanName))
assert.NoError(t, err)
mts.ExportURL(u)
mts.SubscribeURL(u)
_, err = mts.ExportURL(u)
assert.NoError(t, err)
_, err = mts.SubscribeURL(u)
assert.NoError(t, err)
_, err = common.ServiceMap.Register(serviceName, protocol, userProvider)
assert.NoError(t, err)
err = mts.PublishServiceDefinition(u)
assert.NoError(t, err)
userProvider := &definition.UserProvider{}
common.ServiceMap.Register(serviceName, protocol, userProvider)
mts.PublishServiceDefinition(u)
expected := "{\"CanonicalName\":\"com.ikurento.user.UserProvider\",\"CodeSource\":\"\"," +
"\"Methods\":[{\"Name\":\"GetUser\",\"ParameterTypes\":[\"slice\"],\"ReturnType\":\"ptr\"," +
"\"Parameters\":null}],\"Types\":null}"
......
......@@ -19,24 +19,19 @@ package consul
import (
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"strconv"
"sync"
"testing"
)
import (
"github.com/hashicorp/consul/agent"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/consul"
)
var (
......@@ -51,71 +46,39 @@ var (
)
func newProviderRegistryUrl(host string, port int) *common.URL {
url1 := common.NewURLWithOptions(
return common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)),
)
return url1
}
func newConsumerRegistryUrl(host string, port int) *common.URL {
url1 := common.NewURLWithOptions(
return common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithParams(url.Values{}),
common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER)),
)
return url1
}
func newProviderUrl(host string, port int, service string, protocol string) common.URL {
url1 := common.NewURLWithOptions(
return *common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithPath(service),
common.WithProtocol(protocol),
)
return *url1
}
func newConsumerUrl(host string, port int, service string, protocol string) common.URL {
url1 := common.NewURLWithOptions(
return *common.NewURLWithOptions(
common.WithIp(host),
common.WithPort(strconv.Itoa(port)),
common.WithPath(service),
common.WithProtocol(protocol),
)
return *url1
}
type testConsulAgent struct {
dataDir string
testAgent *agent.TestAgent
}
func newConsulAgent(t *testing.T, port int) *testConsulAgent {
dataDir, _ := ioutil.TempDir("./", "agent")
hcl := `
ports {
http = ` + strconv.Itoa(port) + `
}
data_dir = "` + dataDir + `"
`
testAgent := &agent.TestAgent{Name: t.Name(), DataDir: dataDir, HCL: hcl}
testAgent.Start(t)
consulAgent := &testConsulAgent{
dataDir: dataDir,
testAgent: testAgent,
}
return consulAgent
}
func (consulAgent *testConsulAgent) close() {
consulAgent.testAgent.Shutdown()
os.RemoveAll(consulAgent.dataDir)
}
type testServer struct {
......@@ -184,8 +147,8 @@ func (suite *consulRegistryTestSuite) close() {
// register -> subscribe -> unregister
func test1(t *testing.T) {
consulAgent := newConsulAgent(t, registryPort)
defer consulAgent.close()
consulAgent := consul.NewConsulAgent(t, registryPort)
defer consulAgent.Close()
server := newServer(providerHost, providerPort)
defer server.close()
......@@ -204,8 +167,8 @@ func test1(t *testing.T) {
// subscribe -> register
func test2(t *testing.T) {
consulAgent := newConsulAgent(t, registryPort)
defer consulAgent.close()
consulAgent := consul.NewConsulAgent(t, registryPort)
defer consulAgent.Close()
server := newServer(providerHost, providerPort)
defer server.close()
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
"io/ioutil"
"os"
"strconv"
"testing"
)
import (
"github.com/hashicorp/consul/agent"
)
// Consul agent, used for test, simulates
// an embedded consul server.
type ConsulAgent struct {
dataDir string
testAgent *agent.TestAgent
}
func NewConsulAgent(t *testing.T, port int) *ConsulAgent {
dataDir, _ := ioutil.TempDir("./", "agent")
hcl := `
ports {
http = ` + strconv.Itoa(port) + `
}
data_dir = "` + dataDir + `"
`
testAgent := &agent.TestAgent{Name: t.Name(), DataDir: dataDir, HCL: hcl}
testAgent.Start(t)
consulAgent := &ConsulAgent{
dataDir: dataDir,
testAgent: testAgent,
}
return consulAgent
}
func (consulAgent *ConsulAgent) Close() error {
var err error
err = consulAgent.testAgent.Shutdown()
if err != nil {
return err
}
err = os.RemoveAll(consulAgent.dataDir)
if err != nil {
return err
}
return nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestNewConsulAgent(t *testing.T) {
consulAgent := NewConsulAgent(t, 8500)
err := consulAgent.Close()
assert.NoError(t, err)
}
......@@ -89,8 +89,6 @@ func StateToString(state zk.State) string {
default:
return state.String()
}
return "zookeeper unknown state"
}
// Options ...
......@@ -137,7 +135,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location)
}
zkAddresses := strings.Split(url.Location, ",")
newClient, err := newZookeeperClient(options.zkName, zkAddresses, timeout)
newClient, err := NewZookeeperClient(options.zkName, zkAddresses, timeout)
if err != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}",
options.zkName, url.Location, timeout.String(), err)
......@@ -165,7 +163,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL)
}
func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
func NewZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
var (
err error
event <-chan zk.Event
......@@ -255,7 +253,7 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
case <-z.exit:
return
case event = <-session:
logger.Warnf("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
logger.Infof("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err)
switch (int)(event.State) {
case (int)(zk.StateDisconnected):
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment