Skip to content
Snippets Groups Projects
Unverified Commit 669301f4 authored by Ming Deng's avatar Ming Deng Committed by GitHub
Browse files

Merge pull request #522 from flycash/nacos-meta

Nacos MetadataReport implementation
parents ff9eec75 d997d51e
No related branches found
No related tags found
No related merge requests found
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"encoding/json"
"net/url"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/metadata/report/factory"
"github.com/apache/dubbo-go/remoting/nacos"
)
func init() {
ftry := &nacosMetadataReportFactory{}
extension.SetMetadataReportFactory("nacos", func() factory.MetadataReportFactory {
return ftry
})
}
// nacosMetadataReport is the implementation of MetadataReport based Nacos
type nacosMetadataReport struct {
client config_client.IConfigClient
}
// StoreProviderMetadata will store the metadata
// metadata including the basic info of the server, provider info, and other user custom info
func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: providerIdentifier.GetIdentifierKey(),
Group: providerIdentifier.Group,
Content: serviceDefinitions,
})
}
// StoreConsumerMetadata will store the metadata
// metadata including the basic info of the server, consumer info, and other user custom info
func (n *nacosMetadataReport) StoreConsumerMetadata(consumerMetadataIdentifier *identifier.MetadataIdentifier, serviceParameterString string) error {
return n.storeMetadata(vo.ConfigParam{
DataId: consumerMetadataIdentifier.GetIdentifierKey(),
Group: consumerMetadataIdentifier.Group,
Content: serviceParameterString,
})
}
// SaveServiceMetadata will store the metadata
// metadata including the basic info of the server, service info, and other user custom info
func (n *nacosMetadataReport) SaveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
return n.storeMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
Content: url.String(),
})
}
// RemoveServiceMetadata will remove the service metadata
func (n *nacosMetadataReport) RemoveServiceMetadata(metadataIdentifier *identifier.ServiceMetadataIdentifier) error {
return n.deleteMetadata(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
})
}
// GetExportedURLs will look up the exported urls.
// if not found, an empty list will be returned.
func (n *nacosMetadataReport) GetExportedURLs(metadataIdentifier *identifier.ServiceMetadataIdentifier) []string {
return n.getConfigAsArray(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
})
}
// SaveSubscribedData will convert the urlList to json array and then store it
func (n *nacosMetadataReport) SaveSubscribedData(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier, urlList []common.URL) error {
if len(urlList) == 0 {
logger.Warnf("The url list is empty")
return nil
}
urlStrList := make([]string, 0, len(urlList))
for _, e := range urlList {
urlStrList = append(urlStrList, e.String())
}
bytes, err := json.Marshal(urlStrList)
if err != nil {
return perrors.WithMessage(err, "Could not convert the array to json")
}
return n.storeMetadata(vo.ConfigParam{
DataId: subscriberMetadataIdentifier.GetIdentifierKey(),
Group: subscriberMetadataIdentifier.Group,
Content: string(bytes),
})
}
// GetSubscribedURLs will lookup the url
// if not found, an empty list will be returned
func (n *nacosMetadataReport) GetSubscribedURLs(subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier) []string {
return n.getConfigAsArray(vo.ConfigParam{
DataId: subscriberMetadataIdentifier.GetIdentifierKey(),
Group: subscriberMetadataIdentifier.Group,
})
}
// GetServiceDefinition will lookup the service definition
func (n *nacosMetadataReport) GetServiceDefinition(metadataIdentifier *identifier.MetadataIdentifier) string {
return n.getConfig(vo.ConfigParam{
DataId: metadataIdentifier.GetIdentifierKey(),
Group: metadataIdentifier.Group,
})
}
// storeMetadata will publish the metadata to Nacos
// if failed or error is not nil, error will be returned
func (n *nacosMetadataReport) storeMetadata(param vo.ConfigParam) error {
res, err := n.client.PublishConfig(param)
if err != nil {
return perrors.WithMessage(err, "Could not publish the metadata")
}
if !res {
return perrors.New("Publish the metadata failed.")
}
return nil
}
// deleteMetadata will delete the metadata
func (n *nacosMetadataReport) deleteMetadata(param vo.ConfigParam) error {
res, err := n.client.DeleteConfig(param)
if err != nil {
return perrors.WithMessage(err, "Could not delete the metadata")
}
if !res {
return perrors.New("Deleting the metadata failed.")
}
return nil
}
// getConfigAsArray will read the config and then convert it as an one-element array
// error or config not found, an empty list will be returned.
func (n *nacosMetadataReport) getConfigAsArray(param vo.ConfigParam) []string {
cfg := n.getConfig(param)
res := make([]string, 0, 1)
if len(cfg) == 0 {
return res
}
decodeCfg, err := url.QueryUnescape(cfg)
if err != nil {
logger.Errorf("The config is invalid: %s", cfg)
return res
}
res = append(res, decodeCfg)
return res
}
// getConfig will read the config
func (n *nacosMetadataReport) getConfig(param vo.ConfigParam) string {
cfg, err := n.client.GetConfig(param)
if err != nil {
logger.Errorf("Finding the configuration failed: %v", param)
}
return cfg
}
type nacosMetadataReportFactory struct {
}
func (n *nacosMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
client, err := nacos.NewNacosConfigClient(url)
if err != nil {
logger.Errorf("Could not create nacos metadata report. URL: %s", url.String())
return nil
}
return &nacosMetadataReport{client: client}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"strconv"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
)
func TestNacosMetadataReport_CRUD(t *testing.T) {
rpt := newTestReport()
assert.NotNil(t, rpt)
providerMi := newMetadataIdentifier("server")
providerMeta := "provider"
err := rpt.StoreProviderMetadata(providerMi, providerMeta)
consumerMi := newMetadataIdentifier("client")
consumerMeta := "consumer"
err = rpt.StoreConsumerMetadata(consumerMi, consumerMeta)
assert.Nil(t, err)
serviceMi := newServiceMetadataIdentifier()
serviceUrl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
err = rpt.SaveServiceMetadata(serviceMi, serviceUrl)
assert.Nil(t, err)
exportedUrls := rpt.GetExportedURLs(serviceMi)
assert.Equal(t, 1, len(exportedUrls))
subMi := newSubscribeMetadataIdentifier()
urlList := make([]common.URL, 0, 1)
urlList = append(urlList, serviceUrl)
err = rpt.SaveSubscribedData(subMi, urlList)
assert.Nil(t, err)
subscribeUrl := rpt.GetSubscribedURLs(subMi)
assert.Equal(t, 1, len(subscribeUrl))
err = rpt.RemoveServiceMetadata(serviceMi)
assert.Nil(t, err)
}
func newSubscribeMetadataIdentifier() *identifier.SubscriberMetadataIdentifier {
return &identifier.SubscriberMetadataIdentifier{
Revision: "subscribe",
MetadataIdentifier: *newMetadataIdentifier("provider"),
}
}
func newServiceMetadataIdentifier() *identifier.ServiceMetadataIdentifier {
return &identifier.ServiceMetadataIdentifier{
Protocol: "nacos",
Revision: "a",
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: "com.test.MyTest",
Version: "1.0.0",
Group: "test_group",
Side: "service",
},
}
}
func newMetadataIdentifier(side string) *identifier.MetadataIdentifier {
return &identifier.MetadataIdentifier{
Application: "test",
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: "com.test.MyTest",
Version: "1.0.0",
Group: "test_group",
Side: side,
},
}
}
func TestNacosMetadataReportFactory_CreateMetadataReport(t *testing.T) {
res := newTestReport()
assert.NotNil(t, res)
}
func newTestReport() report.MetadataReport {
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
res := extension.GetMetadataReportFactory("nacos").CreateMetadataReport(&regurl)
return res
}
......@@ -18,22 +18,12 @@
package nacos
import (
"net"
"strconv"
"strings"
"time"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/remoting/nacos"
)
// baseRegistry is the parent of both interface-level registry
......@@ -45,11 +35,7 @@ type nacosBaseRegistry struct {
// newBaseRegistry will create new instance
func newBaseRegistry(url *common.URL) (nacosBaseRegistry, error) {
nacosConfig, err := getNacosConfig(url)
if err != nil {
return nacosBaseRegistry{}, err
}
client, err := clients.CreateNamingClient(nacosConfig)
client, err := nacos.NewNacosNamingClient(url)
if err != nil {
return nacosBaseRegistry{}, err
}
......@@ -59,44 +45,3 @@ func newBaseRegistry(url *common.URL) (nacosBaseRegistry, error) {
}
return registry, nil
}
// getNacosConfig will return the nacos config
func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
if url == nil {
return nil, perrors.New("url is empty!")
}
if len(url.Location) == 0 {
return nil, perrors.New("url.location is empty!")
}
configMap := make(map[string]interface{}, 2)
addresses := strings.Split(url.Location, ",")
serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
for _, addr := range addresses {
ip, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, perrors.WithMessagef(err, "split [%s] ", addr)
}
port, _ := strconv.Atoi(portStr)
serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
IpAddr: ip,
Port: uint64(port),
})
}
configMap["serverConfigs"] = serverConfigs
var clientConfig nacosConstant.ClientConfig
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
return nil, err
}
clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000)
clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
clientConfig.NotLoadCacheAtStart = true
configMap["clientConfig"] = clientConfig
return configMap, nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"net"
"strconv"
"strings"
"time"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
func NewNacosNamingClient(url *common.URL) (naming_client.INamingClient, error) {
nacosConfig, err := getNacosConfig(url)
if err != nil {
return nil, err
}
return clients.CreateNamingClient(nacosConfig)
}
func NewNacosConfigClient(url *common.URL) (config_client.IConfigClient, error) {
nacosConfig, err := getNacosConfig(url)
if err != nil {
return nil, err
}
return clients.CreateConfigClient(nacosConfig)
}
// getNacosConfig will return the nacos config
func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
if url == nil {
return nil, perrors.New("url is empty!")
}
if len(url.Location) == 0 {
return nil, perrors.New("url.location is empty!")
}
configMap := make(map[string]interface{}, 2)
addresses := strings.Split(url.Location, ",")
serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
for _, addr := range addresses {
ip, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, perrors.WithMessagef(err, "split [%s] ", addr)
}
port, err := strconv.Atoi(portStr)
if err != nil {
return configMap, perrors.WithMessage(err, "the port string is invalid. "+portStr)
}
serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
IpAddr: ip,
Port: uint64(port),
})
}
configMap["serverConfigs"] = serverConfigs
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
return nil, err
}
timeoutMs := uint64(timeout.Nanoseconds() / constant.MsToNanoRate)
configMap["clientConfig"] = nacosConstant.ClientConfig{
TimeoutMs: timeoutMs,
ListenInterval: 2 * timeoutMs,
CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""),
LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, ""),
Endpoint: url.GetParam(constant.NACOS_ENDPOINT, ""),
NotLoadCacheAtStart: true,
}
return configMap, nil
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment