Skip to content
Snippets Groups Projects
Commit 2997c11d authored by flycash's avatar flycash
Browse files

refactor MetadataService

parent 59eee390
No related branches found
No related tags found
No related merge requests found
Showing
with 160 additions and 63 deletions
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"fmt"
"github.com/apache/dubbo-go/metadata/service"
)
var (
// there will be two types: local or remote
metadataServiceInsMap = make(map[string]func() (service.MetadataService, error), 2)
)
func SetMetadataService(msType string, creator func() (service.MetadataService, error)) {
metadataServiceInsMap[msType] = creator
}
func GetMetadataService(msType string) (service.MetadataService, error) {
if creator, ok := metadataServiceInsMap[msType]; ok {
return creator()
}
panic(fmt.Sprintf("could not find the creator for metadataType: %s, please check whether you have imported relative packages, \n"+
"local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+
"remote - github.com/apache/dubbo-go/metadata/service/remote", msType))
}
......@@ -137,7 +137,7 @@ func TestSuiteMethod(t *testing.T) {
assert.True(t, ok)
methodType := suiteMethod(method)
method = methodType.Method()
assert.Equal(t, "func(*event.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String())
assert.Equal(t, "func(*common.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String())
at := methodType.ArgsType()
assert.Equal(t, "interface {}", at[0].String())
assert.Equal(t, "interface {}", at[1].String())
......@@ -151,7 +151,7 @@ func TestSuiteMethod(t *testing.T) {
assert.True(t, ok)
methodType = suiteMethod(method)
method = methodType.Method()
assert.Equal(t, "func(*event.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String())
assert.Equal(t, "func(*common.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String())
at = methodType.ArgsType()
assert.Equal(t, "interface {}", at[0].String())
assert.Equal(t, "interface {}", at[1].String())
......@@ -164,7 +164,7 @@ func TestSuiteMethod(t *testing.T) {
assert.True(t, ok)
methodType = suiteMethod(method)
method = methodType.Method()
assert.Equal(t, "func(*event.TestService) error", method.Type.String())
assert.Equal(t, "func(*common.TestService) error", method.Type.String())
at = methodType.ArgsType()
assert.Equal(t, 0, len(at))
assert.Nil(t, methodType.CtxType())
......
......@@ -44,7 +44,9 @@ import (
"github.com/apache/dubbo-go/protocol/protocolwrapper"
)
// ServiceConfig ...
// ServiceConfig is a newest structure to support Dubbo 2.7.5
// But I think it's not very necessary,
// we should think about how to reuse current ProviderConfig rather than use this
type ServiceConfig struct {
context context.Context
id string
......
......@@ -21,7 +21,9 @@ import (
"strconv"
"time"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
)
import (
......@@ -55,7 +57,8 @@ type DynamicConfigurationServiceNameMapping struct {
func (d *DynamicConfigurationServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error {
// metadata service is admin service, should not be mapped
if constant.METADATA_SERVICE_NAME == serviceInterface {
return perrors.New("try to map the metadata service, will be ignored")
logger.Info("try to map the metadata service, will be ignored")
return nil
}
appName := config.GetApplicationConfig().Name
......
......@@ -48,7 +48,7 @@ func TestDynamicConfigurationServiceNameMapping(t *testing.T) {
protocol := "myProtocol"
err = mapping.Map(intf, group, version, protocol)
assert.NotNil(t, err)
assert.Nil(t, err)
intf = "MyService"
err = mapping.Map(intf, group, version, protocol)
assert.Nil(t, err)
......
......@@ -181,7 +181,7 @@ func (n *nacosMetadataReport) getConfigAsArray(param vo.ConfigParam) []string {
func (n *nacosMetadataReport) getConfig(param vo.ConfigParam) string {
cfg, err := n.client.GetConfig(param)
if err != nil {
logger.Errorf("Finding the configuration failed: %v", param)
logger.Errorf("Finding the configuration failed: %v, err: %v", param, err)
}
return cfg
}
......
......@@ -44,7 +44,7 @@ func TestNacosMetadataReport_CRUD(t *testing.T) {
assert.Nil(t, err)
serviceMi := newServiceMetadataIdentifier()
serviceUrl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
serviceUrl, _ := common.NewURL("registry://localhost:8848", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
err = rpt.SaveServiceMetadata(serviceMi, serviceUrl)
assert.Nil(t, err)
......
......@@ -33,7 +33,7 @@ import (
// MetadataServiceExporter is the ConfigurableMetadataServiceExporter which implement MetadataServiceExporter interface
type MetadataServiceExporter struct {
serviceConfig *config.ServiceConfig
ServiceConfig *config.ServiceConfig
lock sync.RWMutex
metadataService service.MetadataService
}
......@@ -56,42 +56,43 @@ func (exporter *MetadataServiceExporter) Export() error {
}
serviceConfig.InterfaceName = constant.METADATA_SERVICE_NAME
serviceConfig.Group = config.GetApplicationConfig().Name
serviceConfig.Version = exporter.metadataService.Version()
// now the error will always be nil
serviceConfig.Version, _ = exporter.metadataService.Version()
var err error
func() {
exporter.lock.Lock()
defer exporter.lock.Unlock()
exporter.serviceConfig = serviceConfig
exporter.serviceConfig.Implement(exporter.metadataService)
err = exporter.serviceConfig.Export()
exporter.ServiceConfig = serviceConfig
exporter.ServiceConfig.Implement(exporter.metadataService)
err = exporter.ServiceConfig.Export()
}()
logger.Infof("The MetadataService exports urls : %v ", exporter.serviceConfig.GetExportedUrls())
logger.Infof("The MetadataService exports urls : %v ", exporter.ServiceConfig.GetExportedUrls())
return err
}
logger.Warnf("The MetadataService has been exported : %v ", exporter.serviceConfig.GetExportedUrls())
logger.Warnf("The MetadataService has been exported : %v ", exporter.ServiceConfig.GetExportedUrls())
return nil
}
// Unexport will unexport the metadataService
func (exporter *MetadataServiceExporter) Unexport() {
if exporter.IsExported() {
exporter.serviceConfig.Unexport()
exporter.ServiceConfig.Unexport()
}
}
// GetExportedURLs will return the urls that export use.
// Notice!The exported url is not same as url in registry , for example it lack the ip.
func (exporter *MetadataServiceExporter) GetExportedURLs() []*common.URL {
return exporter.serviceConfig.GetExportedUrls()
return exporter.ServiceConfig.GetExportedUrls()
}
// isExported will return is metadataServiceExporter exported or not
func (exporter *MetadataServiceExporter) IsExported() bool {
exporter.lock.RLock()
defer exporter.lock.RUnlock()
return exporter.serviceConfig != nil && exporter.serviceConfig.IsExport()
return exporter.ServiceConfig != nil && exporter.ServiceConfig.IsExport()
}
// generateMetadataProtocol will return a default ProtocolConfig
......
......@@ -53,7 +53,7 @@ func TestConfigurableExporter(t *testing.T) {
SessionName: "server",
}})
mockInitProviderWithSingleRegistry()
metadataService := inmemory.NewMetadataService()
metadataService, _ := inmemory.NewMetadataService()
exported := NewMetadataServiceExporter(metadataService)
assert.Equal(t, false, exported.IsExported())
assert.NoError(t, exported.Export())
......
......@@ -19,6 +19,9 @@ package inmemory
import (
"sort"
"sync"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config"
)
import (
......@@ -34,6 +37,10 @@ import (
"github.com/apache/dubbo-go/metadata/service"
)
func init() {
extension.SetMetadataService("local", NewMetadataService)
}
// version will be used by Version func
const version = "1.0.0"
......@@ -47,13 +54,14 @@ type MetadataService struct {
}
// NewMetadataService: initiate a metadata service
func NewMetadataService() *MetadataService {
func NewMetadataService() (service.MetadataService, error) {
return &MetadataService{
BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name),
exportedServiceURLs: &sync.Map{},
subscribedServiceURLs: &sync.Map{},
serviceDefinitions: &sync.Map{},
lock: &sync.RWMutex{},
}
}, nil
}
// Comparator is defined as Comparator for skip list to compare the URL
......@@ -228,11 +236,11 @@ func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string)
}
// RefreshMetadata will always return true because it will be implement by remote service
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool {
return true
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) {
return true, nil
}
// Version will return the version of metadata service
func (mts *MetadataService) Version() string {
return version
func (mts *MetadataService) Version() (string, error) {
return version, nil
}
......@@ -32,7 +32,7 @@ import (
)
func TestMetadataService(t *testing.T) {
mts := NewMetadataService()
mts, _ := NewMetadataService()
serviceName := "com.ikurento.user.UserProvider"
group := "group1"
version := "0.0.1"
......@@ -66,25 +66,20 @@ func TestMetadataService(t *testing.T) {
assert.NoError(t, err)
mts.ExportURL(u)
list, _ := mts.GetExportedURLs(serviceName, group, version, protocol)
assert.Equal(t, uint64(3), list.Len())
iter := list.IterAtPosition(0)
for iter.Next() {
comparator := iter.Value()
fmt.Println(comparator)
}
assert.Equal(t, 3, len(list))
mts.SubscribeURL(u)
mts.SubscribeURL(u)
list2, _ := mts.GetSubscribedURLs()
assert.Equal(t, uint64(1), list2.Len())
assert.Equal(t, 1, len(list2))
mts.UnexportURL(u)
list3, _ := mts.GetExportedURLs(serviceName, group, version, protocol)
assert.Equal(t, uint64(2), list3.Len())
assert.Equal(t, 2, len(list3))
mts.UnsubscribeURL(u)
list4, _ := mts.GetSubscribedURLs()
assert.Equal(t, uint64(0), list4.Len())
assert.Equal(t, 0, len(list4))
userProvider := &definition.UserProvider{}
common.ServiceMap.Register(serviceName, protocol, userProvider)
......
......@@ -21,6 +21,8 @@ import (
"sync"
"go.uber.org/atomic"
"github.com/apache/dubbo-go/common/extension"
)
import (
......@@ -38,6 +40,10 @@ import (
// version will be used by Version func
const version = "1.0.0"
func init() {
extension.SetMetadataService("remote", newMetadataService)
}
// MetadataService is a implement of metadata service which will delegate the remote metadata report
// This is singleton
type MetadataService struct {
......@@ -53,8 +59,8 @@ var (
metadataServiceInstance *MetadataService
)
// NewMetadataService will create a new remote MetadataService instance
func NewMetadataService() (*MetadataService, error) {
// newMetadataService will create a new remote MetadataService instance
func newMetadataService() (service.MetadataService, error) {
var err error
metadataServiceOnce.Do(func() {
var mr *delegate.MetadataReport
......@@ -62,8 +68,11 @@ func NewMetadataService() (*MetadataService, error) {
if err != nil {
return
}
// it will never return error
inms, _ := inmemory.NewMetadataService()
metadataServiceInstance = &MetadataService{
inMemoryMetadataService: inmemory.NewMetadataService(),
BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name),
inMemoryMetadataService: inms.(*inmemory.MetadataService),
delegateReport: mr,
}
})
......@@ -140,14 +149,13 @@ func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string)
}
// RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool {
result := true
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) {
if len(exportedRevision) != 0 && exportedRevision != mts.exportedRevision.Load() {
mts.exportedRevision.Store(exportedRevision)
urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "")
if err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
return false, err
}
logger.Infof("urls length = %v", len(urls))
for _, u := range urls {
......@@ -155,7 +163,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
id.Revision = mts.exportedRevision.Load()
if err := mts.delegateReport.SaveServiceMetadata(id, u); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
return false, err
}
}
}
......@@ -165,7 +173,7 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
urls, err := mts.inMemoryMetadataService.GetSubscribedURLs()
if err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err)
result = false
return false, err
}
if urls != nil && len(urls) > 0 {
id := &identifier.SubscriberMetadataIdentifier{
......@@ -176,14 +184,14 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
}
if err := mts.delegateReport.SaveSubscribedData(id, urls); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
return false, err
}
}
}
return result
return true, nil
}
// Version will return the remote service version
func (MetadataService) Version() string {
return version
func (MetadataService) Version() (string, error) {
return version, nil
}
......@@ -97,16 +97,16 @@ func TestMetadataService(t *testing.T) {
"mock://127.0.0.1:20000/?sync.report=true"))
assert.NoError(t, err)
instance.GetMetadataReportInstance(&u)
mts, err := NewMetadataService()
mts, err := newMetadataService()
assert.NoError(t, err)
mts.setInMemoryMetadataService(mockInmemoryProc(t))
mts.(*MetadataService).setInMemoryMetadataService(mockInmemoryProc(t))
mts.RefreshMetadata("0.0.1", "0.0.1")
assert.Equal(t, 1, len(serviceMetadata))
assert.Equal(t, 1, len(subscribedMetadata))
}
func mockInmemoryProc(t *testing.T) *inmemory.MetadataService {
mts := inmemory.NewMetadataService()
mts, _ := inmemory.NewMetadataService()
serviceName := "com.ikurento.user.UserProvider"
group := "group1"
version := "0.0.1"
......@@ -135,5 +135,5 @@ func mockInmemoryProc(t *testing.T) *inmemory.MetadataService {
serviceKey := definition.ServiceDescriperBuild(serviceName, group, version)
def2, _ := mts.GetServiceDefinitionByServiceKey(serviceKey)
assert.Equal(t, expected, def2)
return mts
return mts.(*inmemory.MetadataService)
}
......@@ -20,7 +20,6 @@ package service
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/config"
)
// MetadataService is used to define meta data related behaviors
......@@ -50,18 +49,25 @@ type MetadataService interface {
// GetServiceDefinition will get the target service info store in metadata by service key
GetServiceDefinitionByServiceKey(serviceKey string) (string, error)
// RefreshMetadata will refresh the metadata
RefreshMetadata(exportedRevision string, subscribedRevision string) bool
RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error)
// Version will return the metadata service version
Version() string
Version() (string, error)
}
// BaseMetadataService is used for the event logic for struct who will implement interface MetadataService
type BaseMetadataService struct {
serviceName string
}
func NewBaseMetadataService(serviceName string) BaseMetadataService {
return BaseMetadataService{
serviceName: serviceName,
}
}
// ServiceName can get the service's name in meta service , which is application name
func (mts *BaseMetadataService) ServiceName() (string, error) {
return config.GetApplicationConfig().Name, nil
return mts.serviceName, nil
}
// Version will return the version of metadata service
......
......@@ -20,6 +20,9 @@ package event
import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/service"
)
import (
......@@ -139,3 +142,7 @@ func (epsd *EventPublishingServiceDiscovery) executeWithEvents(beforeEvent obser
}
return nil
}
func getMetadataService() (service.MetadataService, error) {
return extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
}
......@@ -26,6 +26,5 @@ import (
func TestLogEventListener(t *testing.T) {
l := &logEventListener{}
assert.Equal(t, 0, l.GetPriority())
assert.Nil(t, l.GetEventType())
assert.Nil(t, l.OnEvent(&ServiceDiscoveryDestroyedEvent{}))
}
......@@ -26,7 +26,6 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/service/remote"
"github.com/apache/dubbo-go/registry"
)
......@@ -52,17 +51,18 @@ func (m *metadataServiceURLParamsMetadataCustomizer) GetPriority() int {
}
func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance registry.ServiceInstance) {
ms, err := remote.NewMetadataService()
ms, err := getMetadataService()
if err != nil {
logger.Errorf("could not find the metadata service", err)
return
}
serviceName := constant.METADATA_SERVICE_NAME
version := ms.Version()
// error always is nil
version, _ := ms.Version()
group := instance.GetServiceName()
urls, err := ms.GetExportedURLs(serviceName, group, version, constant.ANY_VALUE)
if err != nil || len(urls) == 0 {
logger.Errorf("could not find the exported urls", err)
logger.Info("could not find the exported urls", err)
return
}
ps := m.convertToParams(urls)
......
......@@ -23,7 +23,6 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/service/remote"
"github.com/apache/dubbo-go/registry"
)
......@@ -38,7 +37,7 @@ func (p *ProtocolPortsMetadataCustomizer) GetPriority() int {
// Customize will
func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceInstance) {
metadataService, err := remote.NewMetadataService()
metadataService, err := getMetadataService()
if err != nil {
logger.Errorf("Could not init the MetadataService", err)
return
......
......@@ -18,11 +18,24 @@
package event
import (
"time"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/config"
)
type ServiceConfigExportedEvent struct {
observer.BaseEvent
ServiceConfig config.ServiceConfig
ServiceConfig *config.ServiceConfig
}
func NewServiceConfigExportedEvent(serviceConfig *config.ServiceConfig) *ServiceConfigExportedEvent {
return &ServiceConfigExportedEvent{
BaseEvent: observer.BaseEvent{
Source:serviceConfig,
Timestamp:time.Now(),
},
ServiceConfig: serviceConfig,
}
}
......@@ -20,6 +20,9 @@ package event
import (
"reflect"
perrors "github.com/pkg/errors"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/metadata/mapping"
......@@ -43,6 +46,17 @@ func (s *serviceNameMappingListener) GetPriority() int {
func (s *serviceNameMappingListener) OnEvent(e observer.Event) error {
if ex, ok := e.(*ServiceConfigExportedEvent); ok {
sc := ex.ServiceConfig
urls := sc.GetExportedUrls()
for _, u := range urls {
err := s.nameMapping.Map(u.GetParam(constant.INTERFACE_KEY, ""),
u.GetParam(constant.GROUP_KEY, ""),
u.GetParam(constant.Version, ""),
u.Protocol)
if err != nil {
return perrors.WithMessage(err, "could not map the service: "+u.String())
}
}
}
return nil
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment