From 0b0fde8942bca09b4796212419e0fca22c3b9747 Mon Sep 17 00:00:00 2001 From: "vito.he" <hxmhlt@163.com> Date: Thu, 9 Apr 2020 14:09:48 +0800 Subject: [PATCH] Add: add exporter --- common/rpc_service.go | 5 + config/config_loader.go | 1 + config/config_loader_test.go | 2 +- config/service_config.go | 99 ++++++++++++++++- config/service_config_test.go | 50 --------- metadata/definition/definition.go | 24 ++++- metadata/report/report.go | 1 + metadata/service/inmemory/in_memory_test.go | 37 ------- .../inmemory/{in_memory.go => service.go} | 97 +++++++++++------ metadata/service/inmemory/service_test.go | 95 +++++++++++++++++ metadata/service/service.go | 42 ++++---- .../service_exporter/configurable/exporter.go | 100 ++++++++++++++++++ .../configurable/exporter_test.go | 57 ++++++++++ .../exporter.go | 8 +- 14 files changed, 466 insertions(+), 152 deletions(-) delete mode 100644 metadata/service/inmemory/in_memory_test.go rename metadata/service/inmemory/{in_memory.go => service.go} (59%) create mode 100644 metadata/service/inmemory/service_test.go create mode 100644 metadata/service_exporter/configurable/exporter.go create mode 100644 metadata/service_exporter/configurable/exporter_test.go rename metadata/{exporter => service_exporter}/exporter.go (89%) diff --git a/common/rpc_service.go b/common/rpc_service.go index b235c32ab..cb4bc9d96 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -132,6 +132,11 @@ func (s *Service) Method() map[string]*MethodType { return s.methods } +// Method ... +func (s *Service) Name() string { + return s.name +} + // RcvrType ... func (s *Service) RcvrType() reflect.Type { return s.rcvrType diff --git a/config/config_loader.go b/config/config_loader.go index c0687d8fc..814c85b0a 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -198,6 +198,7 @@ func Load() { } svs.id = key svs.Implement(rpcService) + svs.Protocols = providerConfig.Protocols if err := svs.Export(); err != nil { panic(fmt.Sprintf("service %s export failed! ", key)) } diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 498f82678..3105dc2c0 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -89,7 +89,7 @@ func TestLoad(t *testing.T) { func TestLoadWithSingleReg(t *testing.T) { doInitConsumerWithSingleRegistry() - doInitProviderWithSingleRegistry() + MockInitProviderWithSingleRegistry() ms := &MockService{} SetConsumerService(ms) diff --git a/config/service_config.go b/config/service_config.go index 5853146aa..d5d087f42 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -71,11 +71,13 @@ type ServiceConfig struct { ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"` Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"` + Protocols map[string]*ProtocolConfig unexported *atomic.Bool exported *atomic.Bool rpcService common.RPCService cacheProtocol protocol.Protocol cacheMutex sync.Mutex + exporters []protocol.Exporter } // Prefix ... @@ -92,6 +94,8 @@ func (c *ServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { if err := unmarshal((*plain)(c)); err != nil { return err } + c.exported = atomic.NewBool(false) + c.unexported = atomic.NewBool(false) return nil } @@ -105,6 +109,11 @@ func NewServiceConfig(id string, context context.Context) *ServiceConfig { } } +// IsExport will return whether the service config is exported or not +func (c *ServiceConfig) IsExport() bool { + return c.exported.Load() +} + // Export ... func (c *ServiceConfig) Export() error { // TODO: config center start here @@ -122,7 +131,7 @@ func (c *ServiceConfig) Export() error { regUrls := loadRegistries(c.Registry, providerConfig.Registries, common.PROVIDER) urlMap := c.getUrlMap() - protocolConfigs := loadProtocol(c.Protocol, providerConfig.Protocols) + protocolConfigs := loadProtocol(c.Protocol, c.Protocols) if len(protocolConfigs) == 0 { logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", c.InterfaceName, c.Protocol) return nil @@ -148,6 +157,9 @@ func (c *ServiceConfig) Export() error { if len(c.Tag) > 0 { ivkURL.AddParam(constant.Tagkey, c.Tag) } + + var exporter protocol.Exporter + if len(regUrls) > 0 { for _, regUrl := range regUrls { regUrl.SubURL = ivkURL @@ -160,22 +172,39 @@ func (c *ServiceConfig) Export() error { c.cacheMutex.Unlock() invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl) - exporter := c.cacheProtocol.Export(invoker) + exporter = c.cacheProtocol.Export(invoker) if exporter == nil { panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, ivkURL))) } } } else { invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*ivkURL) - exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker) + exporter = extension.GetProtocol(protocolwrapper.FILTER).Export(invoker) if exporter == nil { panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", ivkURL))) } } + c.exporters = append(c.exporters, exporter) } + c.exported.Store(true) return nil } +// Unexport will call unexport of all exporters service config exported +func (c *ServiceConfig) Unexport() { + if !c.exported.Load() { + return + } + if c.unexported.Load() { + return + } + for _, exporter := range c.exporters { + exporter.Unexport() + } + c.exporters = nil + c.unexported.Store(true) +} + // Implement ... func (c *ServiceConfig) Implement(s common.RPCService) { c.rpcService = s @@ -246,3 +275,67 @@ func (c *ServiceConfig) getUrlMap() url.Values { return urlMap } + +// GetExportedUrls will return the url in service config's exporter +func (c *ServiceConfig) GetExportedUrls() []*common.URL { + if c.exported.Load() { + var urls []*common.URL + for _, exporter := range c.exporters { + url := exporter.GetInvoker().GetUrl() + urls = append(urls, &url) + } + return urls + } + return nil +} + +// MockInitProviderWithSingleRegistry will init a mocked providerConfig +func MockInitProviderWithSingleRegistry() { + providerConfig = &ProviderConfig{ + ApplicationConfig: &ApplicationConfig{ + Organization: "dubbo_org", + Name: "dubbo", + Module: "module", + Version: "2.6.0", + Owner: "dubbo", + Environment: "test"}, + Registry: &RegistryConfig{ + Address: "mock://127.0.0.1:2181", + Username: "user1", + Password: "pwd1", + }, + Registries: map[string]*RegistryConfig{}, + Services: map[string]*ServiceConfig{ + "MockService": { + InterfaceName: "com.MockService", + Protocol: "mock", + Cluster: "failover", + Loadbalance: "random", + Retries: "3", + Group: "huadong_idc", + Version: "1.0.0", + Methods: []*MethodConfig{ + { + Name: "GetUser", + Retries: "2", + Loadbalance: "random", + Weight: 200, + }, + { + Name: "GetUser1", + Retries: "2", + Loadbalance: "random", + Weight: 200, + }, + }, + }, + }, + Protocols: map[string]*ProtocolConfig{ + "mock": { + Name: "mock", + Ip: "127.0.0.1", + Port: "20000", + }, + }, + } +} diff --git a/config/service_config_test.go b/config/service_config_test.go index 6f3230890..2b53dc385 100644 --- a/config/service_config_test.go +++ b/config/service_config_test.go @@ -128,56 +128,6 @@ func doInitProvider() { } } -func doInitProviderWithSingleRegistry() { - providerConfig = &ProviderConfig{ - ApplicationConfig: &ApplicationConfig{ - Organization: "dubbo_org", - Name: "dubbo", - Module: "module", - Version: "2.6.0", - Owner: "dubbo", - Environment: "test"}, - Registry: &RegistryConfig{ - Address: "mock://127.0.0.1:2181", - Username: "user1", - Password: "pwd1", - }, - Registries: map[string]*RegistryConfig{}, - Services: map[string]*ServiceConfig{ - "MockService": { - InterfaceName: "com.MockService", - Protocol: "mock", - Cluster: "failover", - Loadbalance: "random", - Retries: "3", - Group: "huadong_idc", - Version: "1.0.0", - Methods: []*MethodConfig{ - { - Name: "GetUser", - Retries: "2", - Loadbalance: "random", - Weight: 200, - }, - { - Name: "GetUser1", - Retries: "2", - Loadbalance: "random", - Weight: 200, - }, - }, - }, - }, - Protocols: map[string]*ProtocolConfig{ - "mock": { - Name: "mock", - Ip: "127.0.0.1", - Port: "20000", - }, - }, - } -} - func Test_Export(t *testing.T) { doInitProvider() extension.SetProtocol("registry", GetProtocol) diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go index 5a983dd6d..4676af3e7 100644 --- a/metadata/definition/definition.go +++ b/metadata/definition/definition.go @@ -19,8 +19,10 @@ package definition import ( "bytes" + "github.com/apache/dubbo-go/common" ) +// ServiceDefinition is the describer of service definition type ServiceDefinition struct { CanonicalName string CodeSource string @@ -28,6 +30,7 @@ type ServiceDefinition struct { Types []TypeDefinition } +// MethodDefinition is the describer of method definition type MethodDefinition struct { Name string ParameterTypes []string @@ -35,6 +38,7 @@ type MethodDefinition struct { Parameters []TypeDefinition } +// TypeDefinition is the describer of type definition type TypeDefinition struct { Id string Type string @@ -44,9 +48,23 @@ type TypeDefinition struct { TypeBuilderName string } -// name... -func ServiceDefinitionBuild() *ServiceDefinition { - sd := &ServiceDefinition{} +// BuildServiceDefinition can build service definition which will be used to describe a service +func BuildServiceDefinition(service common.Service, url common.URL) ServiceDefinition { + sd := ServiceDefinition{} + sd.CanonicalName = url.Service() + + for k, m := range service.Method() { + var paramTypes []string + for _, t := range m.ArgsType() { + paramTypes = append(paramTypes, t.Kind().String()) + } + methodD := MethodDefinition{ + Name: k, + ParameterTypes: paramTypes, + ReturnType: m.ReplyType().Kind().String(), + } + sd.Methods = append(sd.Methods, methodD) + } return sd } diff --git a/metadata/report/report.go b/metadata/report/report.go index d3436101a..6b2efb240 100644 --- a/metadata/report/report.go +++ b/metadata/report/report.go @@ -23,6 +23,7 @@ import ( "github.com/apache/dubbo-go/metadata/identifier" ) +// MetadataReport is an interface of remote metadata report type MetadataReport interface { StoreProviderMetadata(*identifier.MetadataIdentifier, *definition.ServiceDefinition) StoreConsumeretadata(*identifier.MetadataIdentifier, map[string]string) diff --git a/metadata/service/inmemory/in_memory_test.go b/metadata/service/inmemory/in_memory_test.go deleted file mode 100644 index 57d7664f5..000000000 --- a/metadata/service/inmemory/in_memory_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package inmemory - -import ( - "fmt" - "github.com/apache/dubbo-go/common" - "github.com/bmizerany/assert" - "testing" -) - -func TestMetadataService(t *testing.T) { - mts := NewMetadataService() - serviceName := "com.ikurento.user.UserProvider" - group := "group1" - version := "0.0.1" - protocol := "dubbo" - u, _ := common.NewURL(fmt.Sprintf("%v://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ - "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ - "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&"+ - "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ - "side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v", protocol, serviceName, group, version)) - mts.ExportURL(u) - sets := mts.GetExportedURLs(serviceName, group, version, protocol) - assert.Equal(t, 1, sets.Size()) - mts.SubscribeURL(u) - - mts.SubscribeURL(u) - sets2 := mts.GetSubscribedURLs() - assert.Equal(t, 1, sets2.Size()) - - mts.UnexportURL(u) - sets11 := mts.GetExportedURLs(serviceName, group, version, protocol) - assert.Equal(t, 0, sets11.Size()) - - mts.UnsubscribeURL(u) - sets22 := mts.GetSubscribedURLs() - assert.Equal(t, 0, sets22.Size()) -} diff --git a/metadata/service/inmemory/in_memory.go b/metadata/service/inmemory/service.go similarity index 59% rename from metadata/service/inmemory/in_memory.go rename to metadata/service/inmemory/service.go index f61fe3de0..d31e8d862 100644 --- a/metadata/service/inmemory/in_memory.go +++ b/metadata/service/inmemory/service.go @@ -17,11 +17,11 @@ package inmemory import ( + "encoding/json" "sync" ) import ( - "github.com/apache/dubbo-go/common/logger" "github.com/emirpasic/gods/sets" "github.com/emirpasic/gods/sets/treeset" "github.com/emirpasic/gods/utils" @@ -30,15 +30,17 @@ import ( import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/metadata/definition" "github.com/apache/dubbo-go/metadata/service" ) -// InMemoryMetadataService is store and query the metadata info in memory when each service registry +// MetadataService is store and query the metadata info in memory when each service registry type MetadataService struct { service.BaseMetadataService exportedServiceURLs *sync.Map subscribedServiceURLs *sync.Map + serviceDefinitions *sync.Map lock *sync.RWMutex } @@ -47,11 +49,12 @@ func NewMetadataService() *MetadataService { return &MetadataService{ exportedServiceURLs: new(sync.Map), subscribedServiceURLs: new(sync.Map), + serviceDefinitions: new(sync.Map), lock: new(sync.RWMutex), } } -// urlComparator: defined as utils.Comparator for treeset to compare the URL +// urlComparator is defined as utils.Comparator for treeset to compare the URL func urlComparator(a, b interface{}) int { url1 := a.(*common.URL) url2 := b.(*common.URL) @@ -65,7 +68,7 @@ func urlComparator(a, b interface{}) int { } } -// addURL: add URL in memory +// addURL will add URL in memory func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { var ( urlSet interface{} @@ -91,7 +94,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { return true } -// removeURL: used to remove specified url +// removeURL is used to remove specified url func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) { if value, loaded := targetMap.Load(url.ServiceKey()); loaded { mts.lock.Lock() @@ -105,7 +108,7 @@ func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) { } } -// getAllService: return all the exportedUrlString except for metadataService +// getAllService can return all the exportedUrlString except for metadataService func (mts *MetadataService) getAllService(services *sync.Map) sets.Set { sets := treeset.NewWith(utils.StringComparator) services.Range(func(key, value interface{}) bool { @@ -121,7 +124,7 @@ func (mts *MetadataService) getAllService(services *sync.Map) sets.Set { return sets } -// getSpecifiedService: return specified service url by serviceKey +// getSpecifiedService can return specified service url by serviceKey func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey string, protocol string) sets.Set { targetSets := treeset.NewWith(utils.StringComparator) serviceSet, loaded := services.Load(serviceKey) @@ -136,63 +139,89 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s return targetSets } -// ExportURL: store the in memory treeset -func (mts *MetadataService) ExportURL(url common.URL) bool { - return mts.addURL(mts.exportedServiceURLs, &url) +// ExportURL can store the in memory treeset +func (mts *MetadataService) ExportURL(url common.URL) (bool, error) { + return mts.addURL(mts.exportedServiceURLs, &url), nil } -// UnexportURL: remove the url store in memory treeset -func (mts *MetadataService) UnexportURL(url common.URL) { +// UnexportURL can remove the url store in memory treeset +func (mts *MetadataService) UnexportURL(url common.URL) error { mts.removeURL(mts.exportedServiceURLs, &url) + return nil } -// SubscribeURL... -func (mts *MetadataService) SubscribeURL(url common.URL) bool { - return mts.addURL(mts.subscribedServiceURLs, &url) +// SubscribeURL can store the in memory treeset +func (mts *MetadataService) SubscribeURL(url common.URL) (bool, error) { + return mts.addURL(mts.subscribedServiceURLs, &url), nil } -// UnsubscribeURL... -func (mts *MetadataService) UnsubscribeURL(url common.URL) { +// UnsubscribeURL can remove the url store in memory treeset +func (mts *MetadataService) UnsubscribeURL(url common.URL) error { mts.removeURL(mts.subscribedServiceURLs, &url) + return nil } // PublishServiceDefinition: publish url's service metadata info, and write into memory -func (MetadataService) PublishServiceDefinition(url common.URL) { +func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { interfaceName := url.GetParam(constant.INTERFACE_KEY, "") isGeneric := url.GetParamBool(constant.GENERIC_KEY, false) if len(interfaceName) > 0 && !isGeneric { //judge is consumer or provider - role := url.GetParam(constant.SIDE_KEY, "") + //side := url.GetParam(constant.SIDE_KEY, "") //var service common.RPCService - if role == common.RoleType(common.CONSUMER).Role() { - - //TODO:BOSS FANG - } else if role == common.RoleType(common.PROVIDER).Role() { - //TODO:BOSS FANG + service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + //if side == common.RoleType(common.CONSUMER).Role() { + // //TODO:generate the service definition and store it + // + //} else if side == common.RoleType(common.PROVIDER).Role() { + // //TODO:generate the service definition and store it + //} + sd := definition.BuildServiceDefinition(*service, url) + data, err := json.Marshal(sd) + if err != nil { + logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error: ", url, err) } - + mts.serviceDefinitions.Store(url.ServiceKey(), string(data)) + return nil } + logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) + return nil } // GetExportedURLs get all exported urls -func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) sets.Set { +func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (sets.Set, error) { if serviceInterface == constant.ANY_VALUE { - return mts.getAllService(mts.exportedServiceURLs) + return mts.getAllService(mts.exportedServiceURLs), nil } else { serviceKey := definition.ServiceDescriperBuild(serviceInterface, group, version) - return mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol) + return mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol), nil } } // GetSubscribedURLs get all subscribedUrl -func (mts *MetadataService) GetSubscribedURLs() sets.Set { - return mts.getAllService(mts.subscribedServiceURLs) +func (mts *MetadataService) GetSubscribedURLs() (sets.Set, error) { + return mts.getAllService(mts.subscribedServiceURLs), nil +} + +// GetServiceDefinition can get service definition by interfaceName, group and version +func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { + serviceKey := definition.ServiceDescriperBuild(interfaceName, group, version) + v, _ := mts.serviceDefinitions.Load(serviceKey) + return v.(string), nil +} + +// GetServiceDefinition can get service definition by serviceKey +func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { + v, _ := mts.serviceDefinitions.Load(serviceKey) + return v.(string), nil } -func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) string { - panic("implement me") +// Version will return the version of metadata service +func (mts *MetadataService) Version() string { + return "1.0.0" } -func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) string { - panic("implement me") +// Version will return the version of metadata service +func (mts *MetadataService) Reference() string { + return "MetadataService" } diff --git a/metadata/service/inmemory/service_test.go b/metadata/service/inmemory/service_test.go new file mode 100644 index 000000000..93e31b523 --- /dev/null +++ b/metadata/service/inmemory/service_test.go @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package inmemory + +import ( + "context" + "fmt" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/metadata/definition" +) + +type User struct { + Id string + Name string + Age int32 + Time time.Time +} + +type UserProvider struct { +} + +func (u *UserProvider) GetUser(ctx context.Context, req []interface{}) (*User, error) { + rsp := User{"A001", "Alex Stocks", 18, time.Now()} + return &rsp, nil +} + +func (u *UserProvider) Reference() string { + return "UserProvider" +} + +func (u User) JavaClassName() string { + return "com.ikurento.user.User" +} + +func TestMetadataService(t *testing.T) { + mts := NewMetadataService() + serviceName := "com.ikurento.user.UserProvider" + group := "group1" + version := "0.0.1" + protocol := "dubbo" + beanName := "UserProvider" + u, _ := common.NewURL(fmt.Sprintf("%v://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&"+ + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ + "side=provider&timeout=3000×tamp=1556509797245&group=%v&version=%v&bean.name=%v", protocol, serviceName, group, version, beanName)) + mts.ExportURL(u) + sets := mts.GetExportedURLs(serviceName, group, version, protocol) + assert.Equal(t, 1, sets.Size()) + mts.SubscribeURL(u) + + mts.SubscribeURL(u) + sets2 := mts.GetSubscribedURLs() + assert.Equal(t, 1, sets2.Size()) + + mts.UnexportURL(u) + sets11 := mts.GetExportedURLs(serviceName, group, version, protocol) + assert.Equal(t, 0, sets11.Size()) + + mts.UnsubscribeURL(u) + sets22 := mts.GetSubscribedURLs() + assert.Equal(t, 0, sets22.Size()) + + userProvider := &UserProvider{} + common.ServiceMap.Register(protocol, userProvider) + mts.PublishServiceDefinition(u) + expected := `{"CanonicalName":"com.ikurento.user.UserProvider","CodeSource":"","Methods":[{"Name":"GetUser","ParameterTypes":["slice"],"ReturnType":"ptr","Parameters":null}],"Types":null}` + assert.Equal(t, mts.GetServiceDefinition(serviceName, group, version), expected) + serviceKey := definition.ServiceDescriperBuild(serviceName, group, version) + assert.Equal(t, mts.GetServiceDefinitionByServiceKey(serviceKey), expected) +} diff --git a/metadata/service/service.go b/metadata/service/service.go index 4682def41..e3c6f21c5 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -28,30 +28,32 @@ import ( // Metadataservice is used to define meta data related behaviors type MetadataService interface { - ServiceName() string - ExportURL(url common.URL) bool - UnexportURL(url common.URL) - RefreshMetadata(exportedRevision string, subscribedRevision string) bool - SubscribeURL(url common.URL) bool - UnsubscribeURL(url common.URL) - PublishServiceDefinition(url common.URL) - - GetExportedURLs(serviceInterface string, group string, version string, protocol string) sets.Set - GetSubscribedURLs() sets.Set - GetServiceDefinition(interfaceName string, group string, version string) string - GetServiceDefinitionByServiceKey(serviceKey string) string + ServiceName() (string, error) + ExportURL(url common.URL) (bool, error) + UnexportURL(url common.URL) error + //RefreshMetadata(exportedRevision string, subscribedRevision string) bool + SubscribeURL(url common.URL) (bool, error) + UnsubscribeURL(url common.URL) error + PublishServiceDefinition(url common.URL) error + + GetExportedURLs(serviceInterface string, group string, version string, protocol string) (sets.Set, error) + GetSubscribedURLs() (sets.Set, error) + GetServiceDefinition(interfaceName string, group string, version string) (string, error) + GetServiceDefinitionByServiceKey(serviceKey string) (string, error) + Version() string + common.RPCService } -// BaseMetadataService: is used for the common logic for struct who will implement interface MetadataService +// BaseMetadataService is used for the common logic for struct who will implement interface MetadataService type BaseMetadataService struct { } -// ServiceName: get the service's name in meta service , which is application name -func (mts *BaseMetadataService) ServiceName() string { - return config.GetApplicationConfig().Name +// ServiceName can get the service's name in meta service , which is application name +func (mts *BaseMetadataService) ServiceName() (string, error) { + return config.GetApplicationConfig().Name, nil } -// RefreshMetadata: used for event listener's calling, to refresh metadata -func (mts *BaseMetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { - return true -} +// RefreshMetadata is used for event listener's calling, to refresh metadata +//func (mts *BaseMetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { +// return true +//} diff --git a/metadata/service_exporter/configurable/exporter.go b/metadata/service_exporter/configurable/exporter.go new file mode 100644 index 000000000..8033e1642 --- /dev/null +++ b/metadata/service_exporter/configurable/exporter.go @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package configurable + +import ( + "context" + "sync" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/metadata/service" + "github.com/apache/dubbo-go/metadata/service_exporter" +) + +// MetadataServiceExporter is the ConfigurableMetadataServiceExporter which implement MetadataServiceExporter interface +type MetadataServiceExporter struct { + serviceConfig *config.ServiceConfig + lock sync.RWMutex + metadataService service.MetadataService +} + +// NewMetadataServiceExporter will return a service_exporter.MetadataServiceExporter with the specified metadata service +func NewMetadataServiceExporter(metadataService service.MetadataService) service_exporter.MetadataServiceExporter { + return &MetadataServiceExporter{ + metadataService: metadataService, + } +} + +// Export will export the metadataService +func (exporter *MetadataServiceExporter) Export() error { + if !exporter.IsExported() { + exporter.lock.Lock() + defer exporter.lock.Unlock() + exporter.serviceConfig = config.NewServiceConfig("MetadataService", context.Background()) + exporter.serviceConfig.Protocol = constant.DEFAULT_PROTOCOL + exporter.serviceConfig.Protocols = map[string]*config.ProtocolConfig{ + constant.DEFAULT_PROTOCOL: generateMetadataProtocol(), + } + exporter.serviceConfig.InterfaceName = constant.METADATA_SERVICE_NAME + exporter.serviceConfig.Group = config.GetApplicationConfig().Name + exporter.serviceConfig.Version = exporter.metadataService.Version() + exporter.serviceConfig.Implement(exporter.metadataService) + err := exporter.serviceConfig.Export() + logger.Infof("The MetadataService exports urls : %v ", exporter.serviceConfig.GetExportedUrls()) + return err + } else { + logger.Warnf("The MetadataService has been exported : %v ", exporter.serviceConfig.GetExportedUrls()) + return nil + } + +} + +// Unexport will unexport the metadataService +func (exporter *MetadataServiceExporter) Unexport() { + if exporter.IsExported() { + exporter.lock.Lock() + defer exporter.lock.Unlock() + exporter.serviceConfig.Unexport() + } +} + +// GetExportedURLs will return the urls that export use. +// Notice锛乀he exported url is not same as url in registry , for example it lack the ip. +func (exporter *MetadataServiceExporter) GetExportedURLs() []*common.URL { + return exporter.serviceConfig.GetExportedUrls() +} + +// isExported will return is metadataServiceExporter exported or not +func (exporter *MetadataServiceExporter) IsExported() bool { + exporter.lock.RLock() + defer exporter.lock.RUnlock() + return exporter.serviceConfig != nil && exporter.serviceConfig.IsExport() +} + +// generateMetadataProtocol will return a default ProtocolConfig +func generateMetadataProtocol() *config.ProtocolConfig { + return &config.ProtocolConfig{ + Name: constant.DEFAULT_PROTOCOL, + Port: "20000", + } +} diff --git a/metadata/service_exporter/configurable/exporter_test.go b/metadata/service_exporter/configurable/exporter_test.go new file mode 100644 index 000000000..4056a0087 --- /dev/null +++ b/metadata/service_exporter/configurable/exporter_test.go @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package configurable + +import ( + "fmt" + _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" + "github.com/apache/dubbo-go/config" + _ "github.com/apache/dubbo-go/filter/filter_impl" + "github.com/apache/dubbo-go/metadata/service/inmemory" + "github.com/apache/dubbo-go/protocol/dubbo" + _ "github.com/apache/dubbo-go/protocol/dubbo" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestConfigurableExporter(t *testing.T) { + dubbo.SetServerConfig(dubbo.ServerConfig{ + SessionNumber: 700, + SessionTimeout: "20s", + GettySessionParam: dubbo.GettySessionParam{ + CompressEncoding: false, + TcpNoDelay: true, + TcpKeepAlive: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpWBufSize: 65536, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 10240000000, + SessionName: "server", + }}) + config.MockInitProviderWithSingleRegistry() + metadataService := inmemory.NewMetadataService() + exported := NewMetadataServiceExporter(metadataService) + assert.Equal(t, false, exported.IsExported()) + assert.NoError(t, exported.Export()) + assert.Equal(t, true, exported.IsExported()) + fmt.Println(exported.GetExportedURLs()) +} diff --git a/metadata/exporter/exporter.go b/metadata/service_exporter/exporter.go similarity index 89% rename from metadata/exporter/exporter.go rename to metadata/service_exporter/exporter.go index a6290c1ea..2ccf17f4d 100644 --- a/metadata/exporter/exporter.go +++ b/metadata/service_exporter/exporter.go @@ -15,15 +15,15 @@ * limitations under the License. */ -package exporter +package service_exporter import ( "github.com/apache/dubbo-go/common" ) -type MetadataExporter interface { - Export() MetadataExporter - Unexport() MetadataExporter +type MetadataServiceExporter interface { + Export() error + Unexport() GetExportedURLs() []*common.URL IsExported() bool } -- GitLab