From 117f85f2cbfa3f7ba59d496e614d34a40c7926ee Mon Sep 17 00:00:00 2001 From: "xg.gao" <xg.gao@tianrang-inc.com> Date: Fri, 26 Jun 2020 18:36:13 +0800 Subject: [PATCH] zookeeper metadata report --- metadata/report/consul/report.go | 10 +- metadata/report/nacos/report.go | 6 +- metadata/report/zookeeper/report.go | 126 +++++++++++++++++++++++ metadata/report/zookeeper/report_test.go | 18 ++++ remoting/zookeeper/client.go | 6 +- 5 files changed, 154 insertions(+), 12 deletions(-) create mode 100644 metadata/report/zookeeper/report.go create mode 100644 metadata/report/zookeeper/report_test.go diff --git a/metadata/report/consul/report.go b/metadata/report/consul/report.go index 8225b22a0..ca57f4036 100644 --- a/metadata/report/consul/report.go +++ b/metadata/report/consul/report.go @@ -37,7 +37,7 @@ func init() { } // consulMetadataReport is the implementation of -// MetadataReport based on Consul. +// MetadataReport based on consul. type consulMetadataReport struct { client *consul.Client } @@ -85,8 +85,8 @@ func (m *consulMetadataReport) GetExportedURLs(metadataIdentifier *identifier.Se } // SaveSubscribedData saves the urls. -func (m *consulMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urlListStr string) error { - kv := &consul.KVPair{Key: subscriberMetadataIdentifier.GetIdentifierKey(), Value: []byte(urlListStr)} +func (m *consulMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urls string) error { + kv := &consul.KVPair{Key: subscriberMetadataIdentifier.GetIdentifierKey(), Value: []byte(urls)} _, err := m.client.KV().Put(kv, nil) return err } @@ -122,11 +122,11 @@ func (m *consulMetadataReport) GetServiceDefinition(metadataIdentifier *identifi type consulMetadataReportFactory struct { } -func (m *consulMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport { +func (mf *consulMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport { config := &consul.Config{Address: url.Location} client, err := consul.NewClient(config) if err != nil { panic(err) } return &consulMetadataReport{client: client} -} \ No newline at end of file +} diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go index f5a065b27..7db4c85d9 100644 --- a/metadata/report/nacos/report.go +++ b/metadata/report/nacos/report.go @@ -45,7 +45,7 @@ func init() { } // nacosMetadataReport is the implementation -// of MetadataReport based on Nacos. +// of MetadataReport based on nacos. type nacosMetadataReport struct { client config_client.IConfigClient } @@ -94,11 +94,11 @@ func (n *nacosMetadataReport) GetExportedURLs(metadataIdentifier *identifier.Ser } // SaveSubscribedData saves the urls. -func (n *nacosMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urlListStr string) error { +func (n *nacosMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urls string) error { return n.storeMetadata(vo.ConfigParam{ DataId: subscriberMetadataIdentifier.GetIdentifierKey(), Group: subscriberMetadataIdentifier.Group, - Content: urlListStr, + Content: urls, }) } diff --git a/metadata/report/zookeeper/report.go b/metadata/report/zookeeper/report.go new file mode 100644 index 000000000..76948379e --- /dev/null +++ b/metadata/report/zookeeper/report.go @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zookeeper + +import ( + "strings" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report" + "github.com/apache/dubbo-go/metadata/report/factory" + "github.com/apache/dubbo-go/remoting/zookeeper" +) + +func init() { + mf := &zookeeperMetadataReportFactory{} + extension.SetMetadataReportFactory("consul", func() factory.MetadataReportFactory { + return mf + }) +} + +// zookeeperMetadataReport is the implementation of +// MetadataReport based on zookeeper. +type zookeeperMetadataReport struct { + client *zookeeper.ZookeeperClient + rootDir string +} + +// StoreProviderMetadata stores the metadata. +func (m *zookeeperMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error { + k := m.rootDir + providerIdentifier.GetFilePathKey() + return m.client.CreateWithValue(k, []byte(serviceDefinitions)) +} + +// StoreConsumerMetadata stores the metadata. +func (m *zookeeperMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error { + k := m.rootDir + consumerMetadataIdentifier.GetFilePathKey() + return m.client.CreateWithValue(k, []byte(serviceParameterString)) +} + +// SaveServiceMetadata saves the metadata. +func (m *zookeeperMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error { + k := m.rootDir + metadataIdentifier.GetFilePathKey() + return m.client.CreateWithValue(k, []byte(url.String())) +} + +// RemoveServiceMetadata removes the metadata. +func (m *zookeeperMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error { + k := m.rootDir + metadataIdentifier.GetFilePathKey() + return m.client.Delete(k) +} + +// GetExportedURLs gets the urls. +func (m *zookeeperMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) []string { + k := m.rootDir + metadataIdentifier.GetFilePathKey() + v, _, err := m.client.GetContent(k) + if err != nil { + panic(err) + } + return []string{string(v)} +} + +// SaveSubscribedData saves the urls. +func (m *zookeeperMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urls string) error { + k := m.rootDir + subscriberMetadataIdentifier.GetFilePathKey() + return m.client.CreateWithValue(k, []byte(urls)) +} + +// GetSubscribedURLs gets the urls. +func (m *zookeeperMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) []string { + k := m.rootDir + subscriberMetadataIdentifier.GetFilePathKey() + v, _, err := m.client.GetContent(k) + if err != nil { + panic(err) + } + return []string{string(v)} +} + +// GetServiceDefinition gets the service definition. +func (m *zookeeperMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) string { + k := m.rootDir + metadataIdentifier.GetFilePathKey() + v, _, err := m.client.GetContent(k) + if err != nil { + panic(err) + } + return string(v) +} + +type zookeeperMetadataReportFactory struct { +} + +func (mf *zookeeperMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport { + client, err := zookeeper.NewZookeeperClient("zookeeperMetadataReport", strings.Split(url.Location, ","), 15) + if err != nil { + panic(err) + } + + rootDir := url.GetParam(constant.GROUP_KEY, "dubbo") + if !strings.HasPrefix(rootDir, constant.PATH_SEPARATOR) { + rootDir = constant.PATH_SEPARATOR + rootDir + } + if rootDir != constant.PATH_SEPARATOR { + rootDir = rootDir + constant.PATH_SEPARATOR + } + + return &zookeeperMetadataReport{client: client, rootDir: rootDir} +} diff --git a/metadata/report/zookeeper/report_test.go b/metadata/report/zookeeper/report_test.go new file mode 100644 index 000000000..e386d95ef --- /dev/null +++ b/metadata/report/zookeeper/report_test.go @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zookeeper diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index 7904dc74e..b6c49e705 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -89,8 +89,6 @@ func StateToString(state zk.State) string { default: return state.String() } - - return "zookeeper unknown state" } // Options ... @@ -137,7 +135,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.Location) } zkAddresses := strings.Split(url.Location, ",") - newClient, err := newZookeeperClient(options.zkName, zkAddresses, timeout) + newClient, err := NewZookeeperClient(options.zkName, zkAddresses, timeout) if err != nil { logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}", options.zkName, url.Location, timeout.String(), err) @@ -165,7 +163,7 @@ func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error { return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL) } -func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) { +func NewZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) { var ( err error event <-chan zk.Event -- GitLab