Skip to content
Snippets Groups Projects
Commit b26354db authored by Patrick's avatar Patrick
Browse files

3.0 service discovery optimize

parent 5c760406
No related branches found
No related tags found
No related merge requests found
Showing
with 501 additions and 370 deletions
......@@ -314,17 +314,18 @@ const (
// service discovery
const (
SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services"
PROVIDER_BY = "provided-by"
EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision"
SUBSCRIBED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.subscribed-services.revision"
SERVICE_INSTANCE_SELECTOR = "service-instance-selector"
METADATA_STORAGE_TYPE_PROPERTY_NAME = "dubbo.metadata.storage-type"
DEFAULT_METADATA_STORAGE_TYPE = "local"
SERVICE_INSTANCE_ENDPOINTS = "dubbo.endpoints"
METADATA_SERVICE_PREFIX = "dubbo.metadata-service."
METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params"
METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls"
SUBSCRIBED_SERVICE_NAMES_KEY = "subscribed-services"
PROVIDER_BY = "provided-by"
EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.metadata.revision"
//SUBSCRIBED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.subscribed-services.revision"
SERVICE_INSTANCE_SELECTOR = "service-instance-selector"
METADATA_STORAGE_TYPE_PROPERTY_NAME = "dubbo.metadata.storage-type"
DEFAULT_METADATA_STORAGE_TYPE = "local"
REMOTE_METADATA_STORAGE_TYPE = "remote"
SERVICE_INSTANCE_ENDPOINTS = "dubbo.endpoints"
METADATA_SERVICE_PREFIX = "dubbo.metadata-service."
METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params"
METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls"
// SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used
SERVICE_DISCOVERY_KEY = "service_discovery"
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package common
import (
"fmt"
"github.com/apache/dubbo-go/common/constant"
gxset "github.com/dubbogo/gost/container/set"
"go.uber.org/atomic"
"hash/crc32"
"net/url"
"sort"
"strings"
)
var IncludeKeys = gxset.NewSet(
constant.APPLICATION_KEY,
constant.GROUP_KEY, constant.TIMESTAMP_KEY, constant.SERIALIZATION_KEY, constant.CLUSTER_KEY,
constant.LOADBALANCE_KEY, constant.PATH_KEY, constant.TIMEOUT_KEY,
constant.TOKEN_KEY, constant.VERSION_KEY, constant.WARMUP_KEY,
constant.WEIGHT_KEY, constant.RELEASE_KEY)
type MetadataInfo struct {
App string `json:"app"`
Revision string `json:"revision"`
Services map[string]*ServiceInfo `json:"services"`
reported *atomic.Bool `json:"-"`
}
func NewMetadataInfWithApp(app string) *MetadataInfo {
return NewMetadataInfo(app, "", make(map[string]*ServiceInfo))
}
func NewMetadataInfo(app string, revision string, services map[string]*ServiceInfo) *MetadataInfo {
return &MetadataInfo{
App: app,
Revision: revision,
Services: services,
reported: atomic.NewBool(false),
}
}
// CalAndGetRevision is different from Dubbo because golang doesn't support overload
// so that we could use interface + method name as identifier and ignore the method params
// per my understanding, it's enough because Dubbo actually ignore the url params.
// please refer org.apache.dubbo.common.URL#toParameterString(java.lang.String...)
func (mi *MetadataInfo) CalAndGetRevision() string {
if mi.Revision != "" && mi.reported.Load() {
return mi.Revision
}
if len(mi.Services) == 0 {
return "0"
}
candidates := make([]string, 8)
for _, s := range mi.Services {
sk := s.serviceKey
ms := s.GetMethods()
if len(ms) == 0 {
candidates = append(candidates, sk)
} else {
for _, m := range ms {
// methods are part of candidates
candidates = append(candidates, sk+constant.KEY_SEPARATOR+m)
}
}
// append url params if we need it
}
sort.Strings(candidates)
// it's nearly impossible to be overflow
res := uint64(0)
for _, c := range candidates {
res += uint64(crc32.ChecksumIEEE([]byte(c)))
}
mi.Revision = fmt.Sprint(res)
return mi.Revision
}
func (mi *MetadataInfo) HasReported() bool {
return mi.reported.Load()
}
func (mi *MetadataInfo) MarkReported() {
mi.reported.CAS(false, true)
}
func (mi *MetadataInfo) AddService(service *ServiceInfo) {
if service == nil {
return
}
mi.Services[service.GetMatchKey()] = service
}
func (mi *MetadataInfo) RemoveService(service *ServiceInfo) {
if service == nil {
return
}
delete(mi.Services, service.matchKey)
}
type ServiceInfo struct {
Name string `json:"name"`
Group string `json:"group"`
Version string `json:"version"`
Protocol string `json:"protocol"`
Path string `json:"path"`
Params map[string]string `json:"params"`
serviceKey string `json:"-"`
matchKey string `json:"-"`
url *URL `json:"-"`
}
func NewServiceInfoWithUrl(url *URL) *ServiceInfo {
service := NewServiceInfo(url.Service(), url.Group(), url.Version(), url.Protocol, url.Path, nil)
service.url = url
// TODO includeKeys load dynamic
p := make(map[string]string, 8)
for _, keyInter := range IncludeKeys.Values() {
key := keyInter.(string)
value := url.GetParam(key, "")
if len(value) != 0 {
p[key] = value
}
for _, method := range url.Methods {
value = url.GetMethodParam(method, key, "")
if len(value) != 0 {
p[method+"."+key] = value
}
}
}
service.Params = p
return service
}
func NewServiceInfo(name string, group string, version string, protocol string, path string, params map[string]string) *ServiceInfo {
serviceKey := ServiceKey(name, group, version)
matchKey := MatchKey(serviceKey, protocol)
return &ServiceInfo{
Name: name,
Group: group,
Version: version,
Protocol: protocol,
Path: path,
Params: params,
serviceKey: serviceKey,
matchKey: matchKey,
}
}
func (si *ServiceInfo) GetMethods() []string {
if si.Params[constant.METHODS_KEY] != "" {
s := si.Params[constant.METHODS_KEY]
return strings.Split(s, ",")
}
methods := make([]string, 8)
for k, _ := range si.Params {
ms := strings.Index(k, ".")
if ms > 0 {
methods = append(methods, k[0:ms])
}
}
return methods
}
func (si *ServiceInfo) GetParams() url.Values {
v := url.Values{}
for k, p := range si.Params {
ms := strings.Index(k, ".")
if ms > 0 {
v.Set("methods."+k, p)
} else {
v.Set(k, p)
}
}
return v
}
func (si *ServiceInfo) GetMatchKey() string {
if si.matchKey != "" {
return si.matchKey
}
serviceKey := si.GetServiceKey()
si.matchKey = MatchKey(serviceKey, si.Protocol)
return si.matchKey
}
func (si *ServiceInfo) GetServiceKey() string {
if si.serviceKey != "" {
return si.serviceKey
}
si.serviceKey = ServiceKey(si.Name, si.Group, si.Version)
return si.serviceKey
}
......@@ -371,6 +371,10 @@ func ServiceKey(intf string, group string, version string) string {
return buf.String()
}
func MatchKey(serviceKey string, protocol string) string {
return serviceKey + ":" + protocol
}
// ColonSeparatedKey
// The format is "{interface}:[version]:[group]"
func (c *URL) ColonSeparatedKey() string {
......@@ -413,6 +417,16 @@ func (c *URL) Service() string {
return ""
}
// Group get group
func (c *URL) Group() string {
return c.GetParam(constant.GROUP_KEY, "")
}
// Version get group
func (c *URL) Version() string {
return c.GetParam(constant.VERSION_KEY, "")
}
// AddParam will add the key-value pair
func (c *URL) AddParam(key string, value string) {
c.paramsLock.Lock()
......
......@@ -23,6 +23,21 @@ type SubscriberMetadataIdentifier struct {
MetadataIdentifier
}
func NewSubscriberMetadataIdentifier(application string, revision string) *SubscriberMetadataIdentifier {
return &SubscriberMetadataIdentifier{
Revision: revision,
MetadataIdentifier: MetadataIdentifier{
Application: application,
BaseMetadataIdentifier: BaseMetadataIdentifier{
ServiceInterface: "",
Version: "",
Group: "",
Side: "",
},
},
}
}
// GetIdentifierKey returns string that format is service:Version:Group:Side:Revision
func (mdi *SubscriberMetadataIdentifier) GetIdentifierKey() string {
return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.Revision)
......
......@@ -46,6 +46,14 @@ type consulMetadataReport struct {
client *consul.Client
}
func (m *consulMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
panic("implement me")
}
func (m *consulMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error {
panic("implement me")
}
// StoreProviderMetadata stores the metadata.
func (m *consulMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
kv := &consul.KVPair{Key: providerIdentifier.GetIdentifierKey(), Value: []byte(serviceDefinitions)}
......
......@@ -302,3 +302,13 @@ func (mr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifie
}
return false
}
func (mr *MetadataReport) PublishAppMetadata(identifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error {
report := instance.GetMetadataReportInstance()
return report.PublishAppMetadata(identifier, info)
}
func (mr *MetadataReport) GetAppMetadata(identifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
report := instance.GetMetadataReportInstance()
return report.GetAppMetadata(identifier)
}
......@@ -47,6 +47,16 @@ type etcdMetadataReport struct {
root string
}
func (e *etcdMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
// TODO will implement
panic("implement me")
}
func (e *etcdMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error {
// TODO will implement
panic("implement me")
}
// StoreProviderMetadata will store the metadata
// metadata including the basic info of the server, provider info, and other user custom info
func (e *etcdMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
......
......@@ -50,6 +50,16 @@ type nacosMetadataReport struct {
client config_client.IConfigClient
}
func (n *nacosMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
// TODO will implement
panic("implement me")
}
func (n *nacosMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error {
// TODO will implement
panic("implement me")
}
// StoreProviderMetadata stores the metadata.
func (n *nacosMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
return n.storeMetadata(vo.ConfigParam{
......
......@@ -57,4 +57,8 @@ type MetadataReport interface {
// GetServiceDefinition gets the service definition.
GetServiceDefinition(*identifier.MetadataIdentifier) (string, error)
GetAppMetadata(*identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error)
PublishAppMetadata(*identifier.SubscriberMetadataIdentifier, *common.MetadataInfo) error
}
......@@ -18,6 +18,9 @@
package zookeeper
import (
"encoding/json"
"github.com/apache/dubbo-go/common/logger"
"github.com/dubbogo/go-zookeeper/zk"
"strings"
"time"
)
......@@ -50,6 +53,34 @@ type zookeeperMetadataReport struct {
rootDir string
}
func (m *zookeeperMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
k := m.rootDir + metadataIdentifier.GetFilePathKey()
data, _, err := m.client.GetContent(k)
if err != nil {
return nil, err
}
var metadataInfo common.MetadataInfo
err = json.Unmarshal(data, &metadataInfo)
if err != nil {
return nil, err
}
return &metadataInfo, nil
}
func (m *zookeeperMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error {
k := m.rootDir + metadataIdentifier.GetFilePathKey()
data, err := json.Marshal(metadataIdentifier)
if err != nil {
return err
}
err = m.client.CreateTempWithValue(k, data)
if err == zk.ErrNodeExists {
logger.Debugf("Try to create the node data failed. In most cases, it's not a problem. ")
return nil
}
return err
}
// StoreProviderMetadata stores the metadata.
func (m *zookeeperMetadataReport) StoreProviderMetadata(providerIdentifier *identifier.MetadataIdentifier, serviceDefinitions string) error {
k := m.rootDir + providerIdentifier.GetFilePathKey()
......
......@@ -19,6 +19,7 @@ package inmemory
import (
"encoding/json"
"sync"
)
import (
......@@ -37,6 +38,17 @@ func init() {
})
}
var factory service.MetadataServiceProxyFactory
var once *sync.Once
func GetInMemoryMetadataServiceProxyFactory() service.MetadataServiceProxyFactory {
once.Do(func() {
factory = service.NewBaseMetadataServiceProxyFactory(createProxy)
})
return factory
}
// createProxy creates an instance of MetadataServiceProxy
// we read the metadata from ins.Metadata()
// and then create an Invoker instance
......
......@@ -28,7 +28,6 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/definition"
......@@ -41,10 +40,6 @@ const (
local = "local"
)
func init() {
extension.SetMetadataService(local, NewMetadataService)
}
// MetadataService is store and query the metadata info in memory when each service registry
type MetadataService struct {
service.BaseMetadataService
......@@ -52,6 +47,9 @@ type MetadataService struct {
subscribedServiceURLs *sync.Map
serviceDefinitions *sync.Map
lock *sync.RWMutex
mlock *sync.Mutex
metadataInfo *common.MetadataInfo
metadataServiceURL *common.URL
}
var (
......@@ -61,7 +59,7 @@ var (
// NewMetadataService: initiate a metadata service
// it should be singleton
func NewMetadataService() (service.MetadataService, error) {
func GetInMemoryMetadataService() (service.MetadataService, error) {
metadataServiceInitOnce.Do(func() {
metadataServiceInstance = &MetadataService{
BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name),
......@@ -69,6 +67,7 @@ func NewMetadataService() (service.MetadataService, error) {
subscribedServiceURLs: &sync.Map{},
serviceDefinitions: &sync.Map{},
lock: &sync.RWMutex{},
metadataInfo: nil,
}
})
return metadataServiceInstance, nil
......@@ -153,11 +152,28 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s
// ExportURL can store the in memory
func (mts *MetadataService) ExportURL(url *common.URL) (bool, error) {
if constant.METADATA_SERVICE_NAME == url.GetParam(constant.INTERFACE_KEY, "") {
mts.metadataServiceURL = url
return true, nil
}
mts.mlock.Lock()
if mts.metadataInfo == nil {
mts.metadataInfo = common.NewMetadataInfWithApp(config.GetApplicationConfig().Name)
}
mts.mlock.Unlock()
mts.metadataInfo.AddService(common.NewServiceInfoWithUrl(url))
return mts.addURL(mts.exportedServiceURLs, url), nil
}
// UnexportURL can remove the url store in memory
func (mts *MetadataService) UnexportURL(url *common.URL) error {
if constant.METADATA_SERVICE_NAME == url.GetParam(constant.INTERFACE_KEY, "") {
mts.metadataServiceURL = nil
return nil
}
if mts.metadataInfo != nil {
mts.metadataInfo.RemoveService(common.NewServiceInfoWithUrl(url))
}
mts.removeURL(mts.exportedServiceURLs, url)
return nil
}
......@@ -220,6 +236,20 @@ func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string)
return v.(string), nil
}
func (mts *MetadataService) GetMetadataInfo(revision string) *common.MetadataInfo {
if revision == "" {
return mts.metadataInfo
}
if mts.metadataInfo.CalAndGetRevision() != revision {
return nil
}
return mts.metadataInfo
}
func (mts *MetadataService) GetExportedServiceURLs() []*common.URL {
return mts.getAllService(mts.exportedServiceURLs)
}
// RefreshMetadata will always return true because it will be implement by remote service
func (mts *MetadataService) RefreshMetadata(string, string) (bool, error) {
return true, nil
......@@ -229,3 +259,7 @@ func (mts *MetadataService) RefreshMetadata(string, string) (bool, error) {
func (mts *MetadataService) Version() (string, error) {
return version, nil
}
func (mts *MetadataService) GetMetadataServiceURL() *common.URL {
return mts.metadataServiceURL
}
......@@ -70,6 +70,15 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st
return append(ret, *urlStrs...), nil
}
func (m *MetadataServiceProxy) GetExportedServiceURLs() []*common.URL {
logger.Error("you should never invoke this implementation")
return nil
}
func (m *MetadataServiceProxy) GetMetadataServiceURL() *common.URL {
return nil
}
func (m *MetadataServiceProxy) MethodMapper() map[string]string {
return map[string]string{}
}
......@@ -133,3 +142,8 @@ func (m *MetadataServiceProxy) Version() (string, error) {
logger.Error("you should never invoke this implementation")
return "", nil
}
func (m *MetadataServiceProxy) GetMetadataInfo(revision string) *common.MetadataInfo {
logger.Error("you should never invoke this implementation")
return nil
}
......@@ -18,6 +18,7 @@
package remote
import (
"github.com/apache/dubbo-go/registry"
"sync"
)
......@@ -28,13 +29,10 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/definition"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report/delegate"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/metadata/service/inmemory"
)
......@@ -44,14 +42,9 @@ const (
remote = "remote"
)
func init() {
extension.SetMetadataService(remote, newMetadataService)
}
// MetadataService is a implement of metadata service which will delegate the remote metadata report
// This is singleton
type MetadataService struct {
service.BaseMetadataService
type RemoteMetadataService struct {
inMemoryMetadataService *inmemory.MetadataService
exportedRevision atomic.String
subscribedRevision atomic.String
......@@ -60,11 +53,11 @@ type MetadataService struct {
var (
metadataServiceOnce sync.Once
metadataServiceInstance *MetadataService
metadataServiceInstance *RemoteMetadataService
)
// newMetadataService will create a new remote MetadataService instance
func newMetadataService() (service.MetadataService, error) {
// GetRemoteMetadataService will create a new remote MetadataService instance
func GetRemoteMetadataService() (*RemoteMetadataService, error) {
var err error
metadataServiceOnce.Do(func() {
var mr *delegate.MetadataReport
......@@ -73,9 +66,8 @@ func newMetadataService() (service.MetadataService, error) {
return
}
// it will never return error
inms, _ := inmemory.NewMetadataService()
metadataServiceInstance = &MetadataService{
BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name),
inms, _ := inmemory.GetInMemoryMetadataService()
metadataServiceInstance = &RemoteMetadataService{
inMemoryMetadataService: inms.(*inmemory.MetadataService),
delegateReport: mr,
}
......@@ -83,37 +75,30 @@ func newMetadataService() (service.MetadataService, error) {
return metadataServiceInstance, err
}
// setInMemoryMetadataService will replace the in memory metadata service by the specific param
func (mts *MetadataService) setInMemoryMetadataService(metadata *inmemory.MetadataService) {
mts.inMemoryMetadataService = metadata
}
// ExportURL will be implemented by in memory service
func (mts *MetadataService) ExportURL(url *common.URL) (bool, error) {
return mts.inMemoryMetadataService.ExportURL(url)
}
// UnexportURL remove @url's metadata
func (mts *MetadataService) UnexportURL(url *common.URL) error {
smi := identifier.NewServiceMetadataIdentifier(url)
smi.Revision = mts.exportedRevision.Load()
return mts.delegateReport.RemoveServiceMetadata(smi)
}
// SubscribeURL will be implemented by in memory service
func (mts *MetadataService) SubscribeURL(url *common.URL) (bool, error) {
return mts.inMemoryMetadataService.SubscribeURL(url)
// publishMetadata
func (mts *RemoteMetadataService) PublishMetadata(service string) {
info := mts.inMemoryMetadataService.GetMetadataInfo("")
if info.HasReported() {
return
}
id := identifier.NewSubscriberMetadataIdentifier(service, info.CalAndGetRevision())
err := mts.delegateReport.PublishAppMetadata(id, info)
if err != nil {
logger.Errorf("Publishing metadata to error[%v]", err)
return
}
info.MarkReported()
}
// UnsubscribeURL will be implemented by in memory service
func (mts *MetadataService) UnsubscribeURL(url *common.URL) error {
// TODO remove call self.
return nil
//return mts.UnsubscribeURL(url)
// publishMetadata
func (mts *RemoteMetadataService) GetMetadata(instance registry.ServiceInstance) (*common.MetadataInfo, error) {
revision := instance.GetMetadata()[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME]
id := identifier.NewSubscriberMetadataIdentifier(instance.GetServiceName(), revision)
return mts.delegateReport.GetAppMetadata(id)
}
// PublishServiceDefinition will call remote metadata's StoreProviderMetadata to store url info and service definition
func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error {
func (mts *RemoteMetadataService) PublishServiceDefinition(url *common.URL) error {
interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
if common.RoleType(common.PROVIDER).Role() == url.GetParam(constant.SIDE_KEY, "") {
......@@ -154,75 +139,21 @@ func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error {
}
// GetExportedURLs will be implemented by in memory service
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
func (mts *RemoteMetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
return mts.inMemoryMetadataService.GetExportedURLs(serviceInterface, group, version, protocol)
}
// GetSubscribedURLs will be implemented by in memory service
func (mts *MetadataService) GetSubscribedURLs() ([]*common.URL, error) {
func (mts *RemoteMetadataService) GetSubscribedURLs() ([]*common.URL, error) {
return mts.inMemoryMetadataService.GetSubscribedURLs()
}
// GetServiceDefinition will be implemented by in memory service
func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
func (mts *RemoteMetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
return mts.inMemoryMetadataService.GetServiceDefinition(interfaceName, group, version)
}
// GetServiceDefinitionByServiceKey will be implemented by in memory service
func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
func (mts *RemoteMetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
return mts.inMemoryMetadataService.GetServiceDefinitionByServiceKey(serviceKey)
}
// RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) {
if len(exportedRevision) != 0 && exportedRevision != mts.exportedRevision.Load() {
mts.exportedRevision.Store(exportedRevision)
urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "")
if err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
return false, err
}
logger.Infof("urls length = %v", len(urls))
for _, ui := range urls {
u, err := common.NewURL(ui.(string))
if err != nil {
logger.Errorf("this is not valid url string: %s ", ui.(string))
continue
}
id := identifier.NewServiceMetadataIdentifier(u)
id.Revision = mts.exportedRevision.Load()
if err := mts.delegateReport.SaveServiceMetadata(id, u); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
return false, err
}
}
}
if len(subscribedRevision) != 0 && subscribedRevision != mts.subscribedRevision.Load() {
mts.subscribedRevision.Store(subscribedRevision)
urls, err := mts.inMemoryMetadataService.GetSubscribedURLs()
if err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err)
return false, err
}
if len(urls) > 0 {
id := &identifier.SubscriberMetadataIdentifier{
MetadataIdentifier: identifier.MetadataIdentifier{
Application: config.GetApplicationConfig().Name,
},
Revision: subscribedRevision,
}
if err := mts.delegateReport.SaveSubscribedData(id, urls); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
return false, err
}
}
}
return true, nil
}
// Version will return the remote service version
func (MetadataService) Version() (string, error) {
return version, nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remote
import (
"strings"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config/instance"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/registry"
)
type metadataServiceProxy struct {
serviceName string
revision string
report report.MetadataReport
}
func (m *metadataServiceProxy) Reference() string {
return constant.METADATA_SERVICE_NAME
}
func (m *metadataServiceProxy) ServiceName() (string, error) {
return m.serviceName, nil
}
func (m *metadataServiceProxy) ExportURL(url *common.URL) (bool, error) {
logger.Error("you should never invoke this implementation")
return true, nil
}
func (m *metadataServiceProxy) UnexportURL(url *common.URL) error {
logger.Error("you should never invoke this implementation")
return nil
}
func (m *metadataServiceProxy) SubscribeURL(url *common.URL) (bool, error) {
logger.Error("you should never invoke this implementation")
return true, nil
}
func (m *metadataServiceProxy) UnsubscribeURL(url *common.URL) error {
logger.Error("you should never invoke this implementation")
return nil
}
func (m *metadataServiceProxy) PublishServiceDefinition(url *common.URL) error {
logger.Error("you should never invoke this implementation")
return nil
}
func (m *metadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
urls, err := m.report.GetExportedURLs(&identifier.ServiceMetadataIdentifier{
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: serviceInterface,
Version: version,
Group: group,
Side: constant.PROVIDER_PROTOCOL,
},
Revision: m.revision,
Protocol: protocol,
})
if err != nil {
return []interface{}{}, nil
}
var res []*common.URL
for _, s := range urls {
u, err := common.NewURL(s)
if err != nil {
logger.Errorf("could not parse the url string to URL structure", err)
continue
}
res = append(res, u)
}
return service.ConvertURLArrToIntfArr(res), nil
}
func (m *metadataServiceProxy) MethodMapper() map[string]string {
return map[string]string{}
}
func (m *metadataServiceProxy) GetSubscribedURLs() ([]*common.URL, error) {
logger.Error("you should never invoke this implementation")
return nil, nil
}
func (m *metadataServiceProxy) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
return m.report.GetServiceDefinition(&identifier.MetadataIdentifier{
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: interfaceName,
Group: group,
Version: version,
Side: constant.PROVIDER_PROTOCOL,
},
Application: m.serviceName,
})
}
func (m *metadataServiceProxy) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
params := parse(serviceKey)
return m.GetServiceDefinition(params[0], params[1], params[2])
}
func (m *metadataServiceProxy) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) {
logger.Error("you should never invoke this implementation")
return true, nil
}
func (m metadataServiceProxy) Version() (string, error) {
return version, nil
}
func newMetadataServiceProxy(ins registry.ServiceInstance) service.MetadataService {
revision := ins.GetMetadata()[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME]
if len(revision) == 0 {
revision = constant.DEFAULT_REVISION
}
return &metadataServiceProxy{
serviceName: ins.GetServiceName(),
revision: revision,
report: instance.GetMetadataReportInstance(),
}
}
func parse(key string) []string {
arr := make([]string, 3)
tmp := strings.SplitN(key, "/", 2)
if len(tmp) > 1 {
arr[0] = tmp[0]
key = tmp[1]
}
tmp = strings.SplitN(key, "/", 2)
if len(tmp) > 1 {
arr[2] = tmp[1]
key = tmp[0]
}
arr[1] = key
return arr
}
......@@ -112,6 +112,14 @@ func (m *mockMetadataReportFactory) CreateMetadataReport(*common.URL) report.Met
type mockMetadataReport struct {
}
func (m mockMetadataReport) GetAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier) (*common.MetadataInfo, error) {
panic("implement me")
}
func (m mockMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.SubscriberMetadataIdentifier, info *common.MetadataInfo) error {
panic("implement me")
}
func (m mockMetadataReport) StoreProviderMetadata(*identifier.MetadataIdentifier, string) error {
panic("implement me")
}
......
......@@ -62,6 +62,12 @@ type MetadataService interface {
RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error)
// Version will return the metadata service version
Version() (string, error)
GetMetadataInfo(revision string) *common.MetadataInfo
GetExportedServiceURLs() []*common.URL
GetMetadataServiceURL() *common.URL
}
// BaseMetadataService is used for the event logic for struct who will implement interface MetadataService
......
......@@ -370,59 +370,61 @@ func (csd *consulServiceDiscovery) GetRequestInstances(serviceNames []string, of
}
func (csd *consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
params := make(map[string]interface{}, 8)
params[watch_type] = watch_type_service
params[watch_service] = listener.ServiceName
params[watch_passingonly] = watch_passingonly_true
plan, err := watch.Parse(params)
if err != nil {
logger.Errorf("add listener for service %s,error:%v", listener.ServiceName, err)
return err
}
plan.Handler = func(idx uint64, raw interface{}) {
services, ok := raw.([]*consul.ServiceEntry)
if !ok {
err = perrors.New("handler get non ServiceEntry type parameter")
return
for _, v := range listener.ServiceNames.Values() {
serviceName := v.(string)
params := make(map[string]interface{}, 8)
params[watch_type] = watch_type_service
params[watch_service] = serviceName
params[watch_passingonly] = watch_passingonly_true
plan, err := watch.Parse(params)
if err != nil {
logger.Errorf("add listener for service %s,error:%v", serviceName, err)
return err
}
instances := make([]registry.ServiceInstance, 0, len(services))
for _, ins := range services {
metadata := ins.Service.Meta
// enable status
enableStr := metadata[enable]
delete(metadata, enable)
enable, _ := strconv.ParseBool(enableStr)
// health status
status := ins.Checks.AggregatedStatus()
healthy := false
if status == consul.HealthPassing {
healthy = true
plan.Handler = func(idx uint64, raw interface{}) {
services, ok := raw.([]*consul.ServiceEntry)
if !ok {
err = perrors.New("handler get non ServiceEntry type parameter")
return
}
instances := make([]registry.ServiceInstance, 0, len(services))
for _, ins := range services {
metadata := ins.Service.Meta
// enable status
enableStr := metadata[enable]
delete(metadata, enable)
enable, _ := strconv.ParseBool(enableStr)
// health status
status := ins.Checks.AggregatedStatus()
healthy := false
if status == consul.HealthPassing {
healthy = true
}
instances = append(instances, &registry.DefaultServiceInstance{
Id: ins.Service.ID,
ServiceName: ins.Service.Service,
Host: ins.Service.Address,
Port: ins.Service.Port,
Enable: enable,
Healthy: healthy,
Metadata: metadata,
})
}
e := csd.DispatchEventForInstances(serviceName, instances)
if e != nil {
logger.Errorf("Dispatching event got exception, service name: %s, err: %v", serviceName, err)
}
instances = append(instances, &registry.DefaultServiceInstance{
Id: ins.Service.ID,
ServiceName: ins.Service.Service,
Host: ins.Service.Address,
Port: ins.Service.Port,
Enable: enable,
Healthy: healthy,
Metadata: metadata,
})
}
e := csd.DispatchEventForInstances(listener.ServiceName, instances)
if e != nil {
logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err)
}
go func() {
err = plan.RunWithConfig(csd.Config.Address, csd.Config)
if err != nil {
logger.Error("consul plan run failure!error:%v", err)
}
}()
}
go func() {
err = plan.RunWithConfig(csd.Config.Address, csd.Config)
if err != nil {
logger.Error("consul plan run failure!error:%v", err)
}
}()
return nil
}
......
......@@ -18,6 +18,7 @@
package event
import (
"github.com/apache/dubbo-go/metadata/service/inmemory"
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/hash/page"
)
......@@ -25,7 +26,6 @@ import (
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/registry"
)
......@@ -153,5 +153,5 @@ func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent obser
// getMetadataService returns metadata service instance
func getMetadataService() (service.MetadataService, error) {
return extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
return inmemory.GetInMemoryMetadataService()
}
......@@ -28,23 +28,10 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
)
func init() {
exceptKeys := gxset.NewSet(
// remove APPLICATION_KEY because service name must be present
constant.APPLICATION_KEY,
// remove GROUP_KEY, always uses service name.
constant.GROUP_KEY,
// remove TIMESTAMP_KEY because it's nonsense
constant.TIMESTAMP_KEY)
extension.AddCustomizers(&metadataServiceURLParamsMetadataCustomizer{exceptKeys: exceptKeys})
}
type metadataServiceURLParamsMetadataCustomizer struct {
exceptKeys *gxset.HashSet
}
......@@ -60,16 +47,8 @@ func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance registry
logger.Errorf("could not find the metadata service", err)
return
}
serviceName := constant.METADATA_SERVICE_NAME
// error always is nil
version, _ := ms.Version()
group := instance.GetServiceName()
urls, err := ms.GetExportedURLs(serviceName, group, version, constant.ANY_VALUE)
if err != nil || len(urls) == 0 {
logger.Info("could not find the exported urls", err)
return
}
ps := m.convertToParams(urls)
url := ms.GetMetadataServiceURL()
ps := m.convertToParams(url)
str, err := json.Marshal(ps)
if err != nil {
logger.Errorf("could not transfer the map to json", err)
......@@ -78,28 +57,19 @@ func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance registry
instance.GetMetadata()[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME] = string(str)
}
func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(urls []interface{}) map[string]map[string]string {
func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(url *common.URL) map[string]string {
// usually there will be only one protocol
res := make(map[string]map[string]string, 1)
res := make(map[string]string, 1)
// those keys are useless
for _, ui := range urls {
u, err := common.NewURL(ui.(string))
if err != nil {
logger.Errorf("could not parse the string to url: %s", ui.(string), err)
p := make(map[string]string, len(url.GetParams()))
for k, v := range url.GetParams() {
// we will ignore that
if !common.IncludeKeys.Contains(k) || len(v) == 0 || len(v[0]) == 0 {
continue
}
p := make(map[string]string, len(u.GetParams()))
for k, v := range u.GetParams() {
// we will ignore that
if m.exceptKeys.Contains(k) || len(v) == 0 || len(v[0]) == 0 {
continue
}
p[k] = v[0]
}
p[constant.PORT_KEY] = u.Port
res[u.Protocol] = p
p[k] = v[0]
}
p[constant.PORT_KEY] = url.Port
p[constant.PROTOCOL_KEY] = url.Protocol
return res
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment