diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go new file mode 100644 index 0000000000000000000000000000000000000000..0d49ff17838e4427613b343d16f00b34bd90a00d --- /dev/null +++ b/metadata/report/etcd/report.go @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package etcd + +import ( + "encoding/json" + "strings" + "time" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/metadata/identifier" + "github.com/apache/dubbo-go/metadata/report" + "github.com/apache/dubbo-go/metadata/report/factory" + "github.com/apache/dubbo-go/remoting/etcdv3" +) + +const DEFAULT_ROOT = "dubbo" + +func init() { + extension.SetMetadataReportFactory("etcd", func() factory.MetadataReportFactory { + return &etcdMetadataReportFactory{} + }) +} + +// etcdMetadataReport is the implementation of MetadataReport based etcd +type etcdMetadataReport struct { + client *etcdv3.Client + root string +} + +// StoreProviderMetadata will store the metadata +// metadata including the basic info of the server, provider info, and other user custom info +func (e *etcdMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error { + key := e.getNodeKey(providerIdentifier) + return e.client.Create(key, serviceDefinitions) +} + +// StoreConsumerMetadata will store the metadata +// metadata including the basic info of the server, consumer info, and other user custom info +func (e *etcdMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error { + key := e.getNodeKey(consumerMetadataIdentifier) + return e.client.Create(key, serviceParameterString) +} + +// SaveServiceMetadata will store the metadata +// metadata including the basic info of the server, service info, and other user custom info +func (e *etcdMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error { + key := e.getNodeKey(metadataIdentifier) + return e.client.Create(key, url.String()) +} + +// RemoveServiceMetadata will remove the service metadata +func (e *etcdMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error { + return e.client.Delete(e.getNodeKey(metadataIdentifier)) +} + +// GetExportedURLs will look up the exported urls. +// if not found, an empty list will be returned. +func (e *etcdMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) []string { + content, err := e.client.Get(e.getNodeKey(metadataIdentifier)) + if err != nil { + logger.Errorf("etcdMetadataReport GetExportedURLs err:{%v}", err.Error()) + return nil + } + if content == "" { + return []string{} + } + return []string{content} +} + +// SaveSubscribedData will convert the urlList to json array and then store it +func (e *etcdMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urlList []common.URL) error { + if len(urlList) == 0 { + logger.Warnf("The url list is empty") + return nil + } + urlStrList := make([]string, 0, len(urlList)) + + for _, e := range urlList { + urlStrList = append(urlStrList, e.String()) + } + + bytes, err := json.Marshal(urlStrList) + + if err != nil { + return perrors.WithMessage(err, "Could not convert the array to json") + } + key := e.getNodeKey(subscriberMetadataIdentifier) + return e.client.Create(key, string(bytes)) +} + +// GetSubscribedURLs will lookup the url +// if not found, an empty list will be returned +func (e *etcdMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) []string { + content, err := e.client.Get(e.getNodeKey(subscriberMetadataIdentifier)) + if err != nil { + logger.Errorf("etcdMetadataReport GetSubscribedURLs err:{%v}", err.Error()) + } + return []string{content} +} + +// GetServiceDefinition will lookup the service definition +func (e *etcdMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) string { + key := e.getNodeKey(metadataIdentifier) + content, err := e.client.Get(key) + if err != nil { + logger.Errorf("etcdMetadataReport GetServiceDefinition err:{%v}", err.Error()) + return "" + } + return content +} + +type etcdMetadataReportFactory struct{} + +// CreateMetadataReport get the MetadataReport instance of etcd +func (e *etcdMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport { + timeout, _ := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + addresses := strings.Split(url.Location, ",") + client, err := etcdv3.NewClient(etcdv3.MetadataETCDV3Client, addresses, timeout, 1) + if err != nil { + logger.Errorf("Could not create etcd metadata report. URL: %s,error:{%v}", url.String(), err) + return nil + } + group := url.GetParam(constant.GROUP_KEY, DEFAULT_ROOT) + group = constant.PATH_SEPARATOR + strings.TrimPrefix(group, constant.PATH_SEPARATOR) + return &etcdMetadataReport{client: client, root: group} +} + +func (e *etcdMetadataReport) getNodeKey(MetadataIdentifier identifier.IMetadataIdentifier) string { + var rootDir string + if e.root == constant.PATH_SEPARATOR { + rootDir = e.root + } else { + rootDir = e.root + constant.PATH_SEPARATOR + } + return rootDir + MetadataIdentifier.GetFilePathKey() +} diff --git a/metadata/report/etcd/report_test.go b/metadata/report/etcd/report_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8219cb83c723b1472f8ed7d999f5c3e3859db65f --- /dev/null +++ b/metadata/report/etcd/report_test.go @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package etcd + +import ( + "net/url" + "strconv" + "testing" +) + +import ( + "github.com/coreos/etcd/embed" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/metadata/identifier" +) + +const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-registry.etcd" + +func initEtcd(t *testing.T) *embed.Etcd { + DefaultListenPeerURLs := "http://localhost:2380" + DefaultListenClientURLs := "http://localhost:2379" + lpurl, _ := url.Parse(DefaultListenPeerURLs) + lcurl, _ := url.Parse(DefaultListenClientURLs) + cfg := embed.NewConfig() + cfg.LPUrls = []url.URL{*lpurl} + cfg.LCUrls = []url.URL{*lcurl} + cfg.Dir = defaultEtcdV3WorkDir + e, err := embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + return e +} + +func TestEtcdMetadataReportFactory_CreateMetadataReport(t *testing.T) { + e := initEtcd(t) + url, err := common.NewURL("registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + if err != nil { + t.Fatal(err) + } + metadataReportFactory := &etcdMetadataReportFactory{} + metadataReport := metadataReportFactory.CreateMetadataReport(&url) + assert.NotNil(t, metadataReport) + e.Close() +} + +func TestEtcdMetadataReport_CRUD(t *testing.T) { + e := initEtcd(t) + url, err := common.NewURL("registry://127.0.0.1:2379", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + if err != nil { + t.Fatal(err) + } + metadataReportFactory := &etcdMetadataReportFactory{} + metadataReport := metadataReportFactory.CreateMetadataReport(&url) + assert.NotNil(t, metadataReport) + + err = metadataReport.StoreConsumerMetadata(newMetadataIdentifier("consumer"), "consumer metadata") + assert.Nil(t, err) + + err = metadataReport.StoreProviderMetadata(newMetadataIdentifier("provider"), "provider metadata") + assert.Nil(t, err) + + serviceMi := newServiceMetadataIdentifier() + serviceUrl, _ := common.NewURL("registry://localhost:8848", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + metadataReport.SaveServiceMetadata(serviceMi, serviceUrl) + assert.Nil(t, err) + + subMi := newSubscribeMetadataIdentifier() + urlList := make([]common.URL, 0, 1) + urlList = append(urlList, serviceUrl) + err = metadataReport.SaveSubscribedData(subMi, urlList) + assert.Nil(t, err) + + err = metadataReport.RemoveServiceMetadata(serviceMi) + assert.Nil(t, err) + + e.Close() +} + +func newSubscribeMetadataIdentifier() *identifier.SubscriberMetadataIdentifier { + return &identifier.SubscriberMetadataIdentifier{ + Revision: "subscribe", + MetadataIdentifier: *newMetadataIdentifier("provider"), + } + +} + +func newServiceMetadataIdentifier() *identifier.ServiceMetadataIdentifier { + return &identifier.ServiceMetadataIdentifier{ + Protocol: "nacos", + Revision: "a", + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: "com.test.MyTest", + Version: "1.0.0", + Group: "test_group", + Side: "service", + }, + } +} + +func newMetadataIdentifier(side string) *identifier.MetadataIdentifier { + return &identifier.MetadataIdentifier{ + Application: "test", + BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ + ServiceInterface: "com.test.MyTest", + Version: "1.0.0", + Group: "test_group", + Side: side, + }, + } +} diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index ba3ea6e864923b1e70cc4a0d31ee98415807699c..731ef0d807a3287e0bcb557951b8640729beae46 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -42,6 +42,8 @@ const ( MaxFailTimes = 15 // RegistryETCDV3Client client name RegistryETCDV3Client = "etcd registry" + // metadataETCDV3Client client name + MetadataETCDV3Client = "etcd metadata" ) var ( @@ -107,7 +109,7 @@ func ValidateClient(container clientFacade, opts ...Option) error { // new Client if container.Client() == nil { - newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat) + newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat) if err != nil { logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", options.name, options.endpoints, options.timeout, err) @@ -119,7 +121,7 @@ func ValidateClient(container clientFacade, opts ...Option) error { // Client lose connection with etcd server if container.Client().rawClient == nil { - newClient, err := newClient(options.name, options.endpoints, options.timeout, options.heartbeat) + newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat) if err != nil { logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", options.name, options.endpoints, options.timeout, err) @@ -149,7 +151,7 @@ type Client struct { Wait sync.WaitGroup } -func newClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) { +func NewClient(name string, endpoints []string, timeout time.Duration, heartbeat int) (*Client, error) { ctx, cancel := context.WithCancel(context.Background()) rawClient, err := clientv3.New(clientv3.Config{ diff --git a/remoting/etcdv3/client_test.go b/remoting/etcdv3/client_test.go index e37b6383df55f1c7e7b64be62fc2eb22d1034616..287d93e30dd125d743bf718dcdab574a4b5a4afc 100644 --- a/remoting/etcdv3/client_test.go +++ b/remoting/etcdv3/client_test.go @@ -120,7 +120,7 @@ func (suite *ClientTestSuite) TearDownSuite() { } func (suite *ClientTestSuite) setUpClient() *Client { - c, err := newClient(suite.etcdConfig.name, + c, err := NewClient(suite.etcdConfig.name, suite.etcdConfig.endpoints, suite.etcdConfig.timeout, suite.etcdConfig.heartbeat)