Skip to content
Snippets Groups Projects
Commit 3d172bfd authored by LaurenceLiZhixin's avatar LaurenceLiZhixin
Browse files

âfeat: support v3 remote zk metadata report

parent abbd985f
No related branches found
No related tags found
No related merge requests found
Showing
with 634 additions and 383 deletions
......@@ -313,11 +313,12 @@ const (
const (
SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services"
PROVIDER_BY = "provided-by"
EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision"
EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.metadata.revision"
SUBSCRIBED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.subscribed-services.revision"
SERVICE_INSTANCE_SELECTOR = "service-instance-selector"
METADATA_STORAGE_TYPE_PROPERTY_NAME = "dubbo.metadata.storage-type"
DEFAULT_METADATA_STORAGE_TYPE = "local"
REMOTE_METADATA_STORAGE_TYPE = "remote"
SERVICE_INSTANCE_ENDPOINTS = "dubbo.endpoints"
METADATA_SERVICE_PREFIX = "dubbo.metadata-service."
METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params"
......
package extension
import (
"fmt"
"github.com/apache/dubbo-go/metadata/remote"
perrors "github.com/pkg/errors"
)
var (
creator func() (remote.RemoteMetadataService, error)
)
// SetMetadataRemoteService will store the
func SetMetadataRemoteService(creatorFunc func() (remote.RemoteMetadataService, error)) {
creator = creatorFunc
}
// GetRemoteMetadataServiceFactory will create a MetadataService instance
func GetRemoteMetadataService() (remote.RemoteMetadataService, error) {
if creator != nil {
return creator()
}
return nil, perrors.New(fmt.Sprintf("could not find the metadata service creator for metadataType: remote, please check whether you have imported relative packages, \n" +
"remote - github.com/apache/dubbo-go/metadata/remote/impl"))
}
......@@ -26,7 +26,6 @@ import (
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/service"
)
......@@ -51,17 +50,3 @@ func GetMetadataService(msType string) (service.MetadataService, error) {
"local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+
"remote - github.com/apache/dubbo-go/metadata/service/remote", msType))
}
// GetRemoteMetadataService will get a RemoteMetadataService instance
func GetRemoteMetadataService() (service.MetadataService, error) {
if remoteMetadataService != nil {
return remoteMetadataService, nil
}
if creator, ok := metadataServiceInsMap["remote"]; ok {
var err error
remoteMetadataService, err = creator()
return remoteMetadataService, err
}
logger.Warn("could not find the metadata service creator for metadataType: remote")
return nil, perrors.New(fmt.Sprintf("could not find the metadata service creator for metadataType: remote"))
}
......@@ -41,6 +41,5 @@ func GetMetadataServiceProxyFactory(name string) service.MetadataServiceProxyFac
return f()
}
panic(fmt.Sprintf("could not find the metadata service factory creator for name: %s, please check whether you have imported relative packages, \n" +
"local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+
"remote - github.com/apache/dubbo-go/metadata/service/remote", name))
"local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"))
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package common
import (
"fmt"
"github.com/apache/dubbo-go/common/constant"
gxset "github.com/dubbogo/gost/container/set"
"go.uber.org/atomic"
"hash/crc32"
"net/url"
"sort"
"strings"
)
var IncludeKeys = gxset.NewSet(
constant.APPLICATION_KEY,
constant.GROUP_KEY, constant.TIMESTAMP_KEY, constant.SERIALIZATION_KEY, constant.CLUSTER_KEY,
constant.LOADBALANCE_KEY, constant.PATH_KEY, constant.TIMEOUT_KEY,
constant.TOKEN_KEY, constant.VERSION_KEY, constant.WARMUP_KEY,
constant.WEIGHT_KEY, constant.RELEASE_KEY)
type MetadataInfo struct {
App string `json:"app"`
Revision string `json:"revision"`
Services map[string]*ServiceInfo `json:"services"`
reported *atomic.Bool `json:"-"`
}
func NewMetadataInfWithApp(app string) *MetadataInfo {
return NewMetadataInfo(app, "", make(map[string]*ServiceInfo))
}
func NewMetadataInfo(app string, revision string, services map[string]*ServiceInfo) *MetadataInfo {
return &MetadataInfo{
App: app,
Revision: revision,
Services: services,
reported: atomic.NewBool(false),
}
}
// CalAndGetRevision is different from Dubbo because golang doesn't support overload
// so that we could use interface + method name as identifier and ignore the method params
// per my understanding, it's enough because Dubbo actually ignore the url params.
// please refer org.apache.dubbo.common.URL#toParameterString(java.lang.String...)
func (mi *MetadataInfo) CalAndGetRevision() string {
if mi.Revision != "" && mi.reported.Load() {
return mi.Revision
}
if len(mi.Services) == 0 {
return "0"
}
candidates := make([]string, 8)
for _, s := range mi.Services {
sk := s.serviceKey
ms := s.url.Methods
if len(ms) == 0 {
candidates = append(candidates, sk)
} else {
for _, m := range ms {
// methods are part of candidates
candidates = append(candidates, sk+constant.KEY_SEPARATOR+m)
}
}
// append url params if we need it
}
sort.Strings(candidates)
// it's nearly impossible to be overflow
res := uint64(0)
for _, c := range candidates {
res += uint64(crc32.ChecksumIEEE([]byte(c)))
}
mi.Revision = fmt.Sprint(res)
return mi.Revision
}
func (mi *MetadataInfo) HasReported() bool {
return mi.reported.Load()
}
func (mi *MetadataInfo) MarkReported() {
mi.reported.CAS(false, true)
}
func (mi *MetadataInfo) AddService(service *ServiceInfo) {
if service == nil {
return
}
mi.Services[service.GetMatchKey()] = service
}
func (mi *MetadataInfo) RemoveService(service *ServiceInfo) {
if service == nil {
return
}
delete(mi.Services, service.matchKey)
}
type ServiceInfo struct {
Name string `json:"name"`
Group string `json:"group"`
Version string `json:"version"`
Protocol string `json:"protocol"`
Path string `json:"path"`
Params map[string]string `json:"params"`
serviceKey string `json:"-"`
matchKey string `json:"-"`
url *URL `json:"-"`
}
func NewServiceInfoWithUrl(url *URL) *ServiceInfo {
service := NewServiceInfo(url.Service(), url.Group(), url.Version(), url.Protocol, url.Path, nil)
service.url = url
// TODO includeKeys load dynamic
p := make(map[string]string, 8)
for _, keyInter := range IncludeKeys.Values() {
key := keyInter.(string)
value := url.GetParam(key, "")
if len(value) != 0 {
p[key] = value
}
for _, method := range url.Methods {
value = url.GetMethodParam(method, key, "")
if len(value) != 0 {
p[method+"."+key] = value
}
}
}
service.Params = p
return service
}
func NewServiceInfo(name string, group string, version string, protocol string, path string, params map[string]string) *ServiceInfo {
serviceKey := ServiceKey(name, group, version)
matchKey := MatchKey(serviceKey, protocol)
return &ServiceInfo{
Name: name,
Group: group,
Version: version,
Protocol: protocol,
Path: path,
Params: params,
serviceKey: serviceKey,
matchKey: matchKey,
}
}
func (si *ServiceInfo) GetMethods() []string {
if si.Params[constant.METHODS_KEY] != "" {
s := si.Params[constant.METHODS_KEY]
return strings.Split(s, ",")
}
methods := make([]string, 8)
for k, _ := range si.Params {
ms := strings.Index(k, ".")
if ms > 0 {
methods = append(methods, k[0:ms])
}
}
return methods
}
func (si *ServiceInfo) GetParams() url.Values {
v := url.Values{}
for k, p := range si.Params {
ms := strings.Index(k, ".")
if ms > 0 {
v.Set("methods."+k, p)
} else {
v.Set(k, p)
}
}
return v
}
func (si *ServiceInfo) GetMatchKey() string {
if si.matchKey != "" {
return si.matchKey
}
serviceKey := si.GetServiceKey()
si.matchKey = MatchKey(serviceKey, si.Protocol)
return si.matchKey
}
func (si *ServiceInfo) GetServiceKey() string {
if si.serviceKey != "" {
return si.serviceKey
}
si.serviceKey = ServiceKey(si.Name, si.Group, si.Version)
return si.serviceKey
}
......@@ -277,6 +277,20 @@ func NewURL(urlString string, opts ...Option) (*URL, error) {
return &s, nil
}
func MatchKey(serviceKey string, protocol string) string {
return serviceKey + ":" + protocol
}
// Group get group
func (c *URL) Group() string {
return c.GetParam(constant.GROUP_KEY, "")
}
// Version get group
func (c *URL) Version() string {
return c.GetParam(constant.VERSION_KEY, "")
}
// URLEqual judge @url and @c is equal or not.
func (c *URL) URLEqual(url *URL) bool {
tmpC := c.Clone()
......@@ -663,7 +677,7 @@ func MergeUrl(serviceUrl *URL, referenceUrl *URL) *URL {
mergedUrl := serviceUrl.Clone()
params := mergedUrl.GetParams()
// iterator the referenceUrl if serviceUrl not have the key ,merge in
// referenceUrl usually will not changed. so change RangeParams to GetParams to avoid the string value copy.
// referenceUrl usually will not changed. so change RangeParams to GetParams to avoid the string value copy.// Group get group
for key, value := range referenceUrl.GetParams() {
if v := mergedUrl.GetParam(key, ""); len(v) == 0 {
if len(value) > 0 {
......
......@@ -21,7 +21,6 @@ import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"reflect"
"strconv"
......@@ -299,6 +298,10 @@ func registerServiceInstance() {
panic(err)
}
}
// todo publish metadata to remote
if remoteMetadataServiceImpl, err := extension.GetRemoteMetadataService(); err == nil {
remoteMetadataServiceImpl.PublishMetadata(GetApplicationConfig().Name)
}
}
// nolint
......@@ -337,19 +340,14 @@ func selectMetadataServiceExportedURL() *common.URL {
logger.Warn(err)
return nil
}
list, err := metaDataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
urlList, err := metaDataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
if err != nil {
panic(err)
}
if len(list) == 0 {
if len(urlList) == 0 {
return nil
}
for _, urlStr := range list {
url, err := common.NewURL(urlStr.(string))
if err != nil {
logger.Errorf("url format error {%v}", url)
continue
}
for _, url := range urlList {
selectedUrl = url
// rest first
if url.Protocol == "rest" {
......
......@@ -117,7 +117,7 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
serviceURL.Path = "/" + c.InterfaceName
}
// merge url need to do
newURL := common.MergeURL(serviceURL, cfgURL)
newURL := common.MergeUrl(serviceURL, cfgURL)
c.urls = append(c.urls, newURL)
}
}
......@@ -152,7 +152,7 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
// not a registry url, must be direct invoke.
hitClu = constant.FAILOVER_CLUSTER_NAME
if len(invokers) > 0 {
u := invokers[0].GetURL()
u := invokers[0].GetUrl()
if nil != &u {
hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME)
}
......@@ -262,8 +262,8 @@ func (c *ReferenceConfig) GetInvoker() protocol.Invoker {
}
func publishConsumerDefinition(url *common.URL) {
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
remoteMetadataService.PublishServiceDefinition(url)
if remoteMetadataServiceImpl, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataServiceImpl != nil {
remoteMetadataServiceImpl.PublishServiceDefinition(url)
}
}
......
......@@ -349,8 +349,9 @@ func (c *ServiceConfig) GetExportedUrls() []*common.URL {
}
func publishServiceDefinition(url *common.URL) {
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
remoteMetadataService.PublishServiceDefinition(url)
if remoteMetadataServiceImpl, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataServiceImpl != nil {
remoteMetadataServiceImpl.PublishServiceDefinition(url)
}
}
......
......@@ -32,3 +32,18 @@ func (mdi *SubscriberMetadataIdentifier) GetIdentifierKey() string {
func (mdi *SubscriberMetadataIdentifier) GetFilePathKey() string {
return mdi.BaseMetadataIdentifier.getFilePathKey(mdi.Revision)
}
func NewSubscriberMetadataIdentifier(application string, revision string) *SubscriberMetadataIdentifier {
return &SubscriberMetadataIdentifier{
Revision: revision,
MetadataIdentifier: MetadataIdentifier{
Application: application,
BaseMetadataIdentifier: BaseMetadataIdentifier{
ServiceInterface: "",
Version: "",
Group: "",
Side: "",
},
},
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment