Skip to content
Snippets Groups Projects
Commit e204bc23 authored by vito.he's avatar vito.he
Browse files

Add: add exporter

parent 9276f467
No related branches found
No related tags found
No related merge requests found
...@@ -132,6 +132,11 @@ func (s *Service) Method() map[string]*MethodType { ...@@ -132,6 +132,11 @@ func (s *Service) Method() map[string]*MethodType {
return s.methods return s.methods
} }
// Method ...
func (s *Service) Name() string {
return s.name
}
// RcvrType ... // RcvrType ...
func (s *Service) RcvrType() reflect.Type { func (s *Service) RcvrType() reflect.Type {
return s.rcvrType return s.rcvrType
......
...@@ -198,6 +198,7 @@ func Load() { ...@@ -198,6 +198,7 @@ func Load() {
} }
svs.id = key svs.id = key
svs.Implement(rpcService) svs.Implement(rpcService)
svs.Protocols = providerConfig.Protocols
if err := svs.Export(); err != nil { if err := svs.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! ", key)) panic(fmt.Sprintf("service %s export failed! ", key))
} }
......
...@@ -89,7 +89,7 @@ func TestLoad(t *testing.T) { ...@@ -89,7 +89,7 @@ func TestLoad(t *testing.T) {
func TestLoadWithSingleReg(t *testing.T) { func TestLoadWithSingleReg(t *testing.T) {
doInitConsumerWithSingleRegistry() doInitConsumerWithSingleRegistry()
doInitProviderWithSingleRegistry() MockInitProviderWithSingleRegistry()
ms := &MockService{} ms := &MockService{}
SetConsumerService(ms) SetConsumerService(ms)
......
...@@ -71,11 +71,13 @@ type ServiceConfig struct { ...@@ -71,11 +71,13 @@ type ServiceConfig struct {
ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"` ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"`
Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"` Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"`
Protocols map[string]*ProtocolConfig
unexported *atomic.Bool unexported *atomic.Bool
exported *atomic.Bool exported *atomic.Bool
rpcService common.RPCService rpcService common.RPCService
cacheProtocol protocol.Protocol cacheProtocol protocol.Protocol
cacheMutex sync.Mutex cacheMutex sync.Mutex
exporters []protocol.Exporter
} }
// Prefix ... // Prefix ...
...@@ -92,6 +94,8 @@ func (c *ServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { ...@@ -92,6 +94,8 @@ func (c *ServiceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
if err := unmarshal((*plain)(c)); err != nil { if err := unmarshal((*plain)(c)); err != nil {
return err return err
} }
c.exported = atomic.NewBool(false)
c.unexported = atomic.NewBool(false)
return nil return nil
} }
...@@ -105,6 +109,11 @@ func NewServiceConfig(id string, context context.Context) *ServiceConfig { ...@@ -105,6 +109,11 @@ func NewServiceConfig(id string, context context.Context) *ServiceConfig {
} }
} }
// IsExport will return whether the service config is exported or not
func (c *ServiceConfig) IsExport() bool {
return c.exported.Load()
}
// Export ... // Export ...
func (c *ServiceConfig) Export() error { func (c *ServiceConfig) Export() error {
// TODO: config center start here // TODO: config center start here
...@@ -122,7 +131,7 @@ func (c *ServiceConfig) Export() error { ...@@ -122,7 +131,7 @@ func (c *ServiceConfig) Export() error {
regUrls := loadRegistries(c.Registry, providerConfig.Registries, common.PROVIDER) regUrls := loadRegistries(c.Registry, providerConfig.Registries, common.PROVIDER)
urlMap := c.getUrlMap() urlMap := c.getUrlMap()
protocolConfigs := loadProtocol(c.Protocol, providerConfig.Protocols) protocolConfigs := loadProtocol(c.Protocol, c.Protocols)
if len(protocolConfigs) == 0 { if len(protocolConfigs) == 0 {
logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", c.InterfaceName, c.Protocol) logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs ", c.InterfaceName, c.Protocol)
return nil return nil
...@@ -148,6 +157,9 @@ func (c *ServiceConfig) Export() error { ...@@ -148,6 +157,9 @@ func (c *ServiceConfig) Export() error {
if len(c.Tag) > 0 { if len(c.Tag) > 0 {
ivkURL.AddParam(constant.Tagkey, c.Tag) ivkURL.AddParam(constant.Tagkey, c.Tag)
} }
var exporter protocol.Exporter
if len(regUrls) > 0 { if len(regUrls) > 0 {
for _, regUrl := range regUrls { for _, regUrl := range regUrls {
regUrl.SubURL = ivkURL regUrl.SubURL = ivkURL
...@@ -160,22 +172,39 @@ func (c *ServiceConfig) Export() error { ...@@ -160,22 +172,39 @@ func (c *ServiceConfig) Export() error {
c.cacheMutex.Unlock() c.cacheMutex.Unlock()
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl) invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*regUrl)
exporter := c.cacheProtocol.Export(invoker) exporter = c.cacheProtocol.Export(invoker)
if exporter == nil { if exporter == nil {
panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, ivkURL))) panic(perrors.New(fmt.Sprintf("Registry protocol new exporter error,registry is {%v},url is {%v}", regUrl, ivkURL)))
} }
} }
} else { } else {
invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*ivkURL) invoker := extension.GetProxyFactory(providerConfig.ProxyFactory).GetInvoker(*ivkURL)
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker) exporter = extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil { if exporter == nil {
panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", ivkURL))) panic(perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error,url is {%v}", ivkURL)))
} }
} }
c.exporters = append(c.exporters, exporter)
} }
c.exported.Store(true)
return nil return nil
} }
// Unexport will call unexport of all exporters service config exported
func (c *ServiceConfig) Unexport() {
if !c.exported.Load() {
return
}
if c.unexported.Load() {
return
}
for _, exporter := range c.exporters {
exporter.Unexport()
}
c.exporters = nil
c.unexported.Store(true)
}
// Implement ... // Implement ...
func (c *ServiceConfig) Implement(s common.RPCService) { func (c *ServiceConfig) Implement(s common.RPCService) {
c.rpcService = s c.rpcService = s
...@@ -246,3 +275,67 @@ func (c *ServiceConfig) getUrlMap() url.Values { ...@@ -246,3 +275,67 @@ func (c *ServiceConfig) getUrlMap() url.Values {
return urlMap return urlMap
} }
// GetExportedUrls will return the url in service config's exporter
func (c *ServiceConfig) GetExportedUrls() []*common.URL {
if c.exported.Load() {
var urls []*common.URL
for _, exporter := range c.exporters {
url := exporter.GetInvoker().GetUrl()
urls = append(urls, &url)
}
return urls
}
return nil
}
// MockInitProviderWithSingleRegistry will init a mocked providerConfig
func MockInitProviderWithSingleRegistry() {
providerConfig = &ProviderConfig{
ApplicationConfig: &ApplicationConfig{
Organization: "dubbo_org",
Name: "dubbo",
Module: "module",
Version: "2.6.0",
Owner: "dubbo",
Environment: "test"},
Registry: &RegistryConfig{
Address: "mock://127.0.0.1:2181",
Username: "user1",
Password: "pwd1",
},
Registries: map[string]*RegistryConfig{},
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "com.MockService",
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
Retries: "3",
Group: "huadong_idc",
Version: "1.0.0",
Methods: []*MethodConfig{
{
Name: "GetUser",
Retries: "2",
Loadbalance: "random",
Weight: 200,
},
{
Name: "GetUser1",
Retries: "2",
Loadbalance: "random",
Weight: 200,
},
},
},
},
Protocols: map[string]*ProtocolConfig{
"mock": {
Name: "mock",
Ip: "127.0.0.1",
Port: "20000",
},
},
}
}
...@@ -128,56 +128,6 @@ func doInitProvider() { ...@@ -128,56 +128,6 @@ func doInitProvider() {
} }
} }
func doInitProviderWithSingleRegistry() {
providerConfig = &ProviderConfig{
ApplicationConfig: &ApplicationConfig{
Organization: "dubbo_org",
Name: "dubbo",
Module: "module",
Version: "2.6.0",
Owner: "dubbo",
Environment: "test"},
Registry: &RegistryConfig{
Address: "mock://127.0.0.1:2181",
Username: "user1",
Password: "pwd1",
},
Registries: map[string]*RegistryConfig{},
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "com.MockService",
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
Retries: "3",
Group: "huadong_idc",
Version: "1.0.0",
Methods: []*MethodConfig{
{
Name: "GetUser",
Retries: "2",
Loadbalance: "random",
Weight: 200,
},
{
Name: "GetUser1",
Retries: "2",
Loadbalance: "random",
Weight: 200,
},
},
},
},
Protocols: map[string]*ProtocolConfig{
"mock": {
Name: "mock",
Ip: "127.0.0.1",
Port: "20000",
},
},
}
}
func Test_Export(t *testing.T) { func Test_Export(t *testing.T) {
doInitProvider() doInitProvider()
extension.SetProtocol("registry", GetProtocol) extension.SetProtocol("registry", GetProtocol)
......
...@@ -19,8 +19,10 @@ package definition ...@@ -19,8 +19,10 @@ package definition
import ( import (
"bytes" "bytes"
"github.com/apache/dubbo-go/common"
) )
// ServiceDefinition is the describer of service definition
type ServiceDefinition struct { type ServiceDefinition struct {
CanonicalName string CanonicalName string
CodeSource string CodeSource string
...@@ -28,6 +30,7 @@ type ServiceDefinition struct { ...@@ -28,6 +30,7 @@ type ServiceDefinition struct {
Types []TypeDefinition Types []TypeDefinition
} }
// MethodDefinition is the describer of method definition
type MethodDefinition struct { type MethodDefinition struct {
Name string Name string
ParameterTypes []string ParameterTypes []string
...@@ -35,6 +38,7 @@ type MethodDefinition struct { ...@@ -35,6 +38,7 @@ type MethodDefinition struct {
Parameters []TypeDefinition Parameters []TypeDefinition
} }
// TypeDefinition is the describer of type definition
type TypeDefinition struct { type TypeDefinition struct {
Id string Id string
Type string Type string
...@@ -44,9 +48,23 @@ type TypeDefinition struct { ...@@ -44,9 +48,23 @@ type TypeDefinition struct {
TypeBuilderName string TypeBuilderName string
} }
// name... // BuildServiceDefinition can build service definition which will be used to describe a service
func ServiceDefinitionBuild() *ServiceDefinition { func BuildServiceDefinition(service common.Service, url common.URL) ServiceDefinition {
sd := &ServiceDefinition{} sd := ServiceDefinition{}
sd.CanonicalName = url.Service()
for k, m := range service.Method() {
var paramTypes []string
for _, t := range m.ArgsType() {
paramTypes = append(paramTypes, t.Kind().String())
}
methodD := MethodDefinition{
Name: k,
ParameterTypes: paramTypes,
ReturnType: m.ReplyType().Kind().String(),
}
sd.Methods = append(sd.Methods, methodD)
}
return sd return sd
} }
......
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
"github.com/apache/dubbo-go/metadata/identifier" "github.com/apache/dubbo-go/metadata/identifier"
) )
// MetadataReport is an interface of remote metadata report
type MetadataReport interface { type MetadataReport interface {
StoreProviderMetadata(*identifier.MetadataIdentifier, *definition.ServiceDefinition) StoreProviderMetadata(*identifier.MetadataIdentifier, *definition.ServiceDefinition)
StoreConsumeretadata(*identifier.MetadataIdentifier, map[string]string) StoreConsumeretadata(*identifier.MetadataIdentifier, map[string]string)
......
...@@ -17,11 +17,11 @@ ...@@ -17,11 +17,11 @@
package inmemory package inmemory
import ( import (
"encoding/json"
"sync" "sync"
) )
import ( import (
"github.com/apache/dubbo-go/common/logger"
"github.com/emirpasic/gods/sets" "github.com/emirpasic/gods/sets"
"github.com/emirpasic/gods/sets/treeset" "github.com/emirpasic/gods/sets/treeset"
"github.com/emirpasic/gods/utils" "github.com/emirpasic/gods/utils"
...@@ -30,15 +30,17 @@ import ( ...@@ -30,15 +30,17 @@ import (
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/definition" "github.com/apache/dubbo-go/metadata/definition"
"github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/metadata/service"
) )
// InMemoryMetadataService is store and query the metadata info in memory when each service registry // MetadataService is store and query the metadata info in memory when each service registry
type MetadataService struct { type MetadataService struct {
service.BaseMetadataService service.BaseMetadataService
exportedServiceURLs *sync.Map exportedServiceURLs *sync.Map
subscribedServiceURLs *sync.Map subscribedServiceURLs *sync.Map
serviceDefinitions *sync.Map
lock *sync.RWMutex lock *sync.RWMutex
} }
...@@ -47,11 +49,12 @@ func NewMetadataService() *MetadataService { ...@@ -47,11 +49,12 @@ func NewMetadataService() *MetadataService {
return &MetadataService{ return &MetadataService{
exportedServiceURLs: new(sync.Map), exportedServiceURLs: new(sync.Map),
subscribedServiceURLs: new(sync.Map), subscribedServiceURLs: new(sync.Map),
serviceDefinitions: new(sync.Map),
lock: new(sync.RWMutex), lock: new(sync.RWMutex),
} }
} }
// urlComparator: defined as utils.Comparator for treeset to compare the URL // urlComparator is defined as utils.Comparator for treeset to compare the URL
func urlComparator(a, b interface{}) int { func urlComparator(a, b interface{}) int {
url1 := a.(*common.URL) url1 := a.(*common.URL)
url2 := b.(*common.URL) url2 := b.(*common.URL)
...@@ -65,7 +68,7 @@ func urlComparator(a, b interface{}) int { ...@@ -65,7 +68,7 @@ func urlComparator(a, b interface{}) int {
} }
} }
// addURL: add URL in memory // addURL will add URL in memory
func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
var ( var (
urlSet interface{} urlSet interface{}
...@@ -91,7 +94,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool { ...@@ -91,7 +94,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
return true return true
} }
// removeURL: used to remove specified url // removeURL is used to remove specified url
func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) { func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) {
if value, loaded := targetMap.Load(url.ServiceKey()); loaded { if value, loaded := targetMap.Load(url.ServiceKey()); loaded {
mts.lock.Lock() mts.lock.Lock()
...@@ -105,7 +108,7 @@ func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) { ...@@ -105,7 +108,7 @@ func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) {
} }
} }
// getAllService: return all the exportedUrlString except for metadataService // getAllService can return all the exportedUrlString except for metadataService
func (mts *MetadataService) getAllService(services *sync.Map) sets.Set { func (mts *MetadataService) getAllService(services *sync.Map) sets.Set {
sets := treeset.NewWith(utils.StringComparator) sets := treeset.NewWith(utils.StringComparator)
services.Range(func(key, value interface{}) bool { services.Range(func(key, value interface{}) bool {
...@@ -121,7 +124,7 @@ func (mts *MetadataService) getAllService(services *sync.Map) sets.Set { ...@@ -121,7 +124,7 @@ func (mts *MetadataService) getAllService(services *sync.Map) sets.Set {
return sets return sets
} }
// getSpecifiedService: return specified service url by serviceKey // getSpecifiedService can return specified service url by serviceKey
func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey string, protocol string) sets.Set { func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey string, protocol string) sets.Set {
targetSets := treeset.NewWith(utils.StringComparator) targetSets := treeset.NewWith(utils.StringComparator)
serviceSet, loaded := services.Load(serviceKey) serviceSet, loaded := services.Load(serviceKey)
...@@ -136,63 +139,89 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s ...@@ -136,63 +139,89 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s
return targetSets return targetSets
} }
// ExportURL: store the in memory treeset // ExportURL can store the in memory treeset
func (mts *MetadataService) ExportURL(url common.URL) bool { func (mts *MetadataService) ExportURL(url common.URL) (bool, error) {
return mts.addURL(mts.exportedServiceURLs, &url) return mts.addURL(mts.exportedServiceURLs, &url), nil
} }
// UnexportURL: remove the url store in memory treeset // UnexportURL can remove the url store in memory treeset
func (mts *MetadataService) UnexportURL(url common.URL) { func (mts *MetadataService) UnexportURL(url common.URL) error {
mts.removeURL(mts.exportedServiceURLs, &url) mts.removeURL(mts.exportedServiceURLs, &url)
return nil
} }
// SubscribeURL... // SubscribeURL can store the in memory treeset
func (mts *MetadataService) SubscribeURL(url common.URL) bool { func (mts *MetadataService) SubscribeURL(url common.URL) (bool, error) {
return mts.addURL(mts.subscribedServiceURLs, &url) return mts.addURL(mts.subscribedServiceURLs, &url), nil
} }
// UnsubscribeURL... // UnsubscribeURL can remove the url store in memory treeset
func (mts *MetadataService) UnsubscribeURL(url common.URL) { func (mts *MetadataService) UnsubscribeURL(url common.URL) error {
mts.removeURL(mts.subscribedServiceURLs, &url) mts.removeURL(mts.subscribedServiceURLs, &url)
return nil
} }
// PublishServiceDefinition: publish url's service metadata info, and write into memory // PublishServiceDefinition: publish url's service metadata info, and write into memory
func (MetadataService) PublishServiceDefinition(url common.URL) { func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
interfaceName := url.GetParam(constant.INTERFACE_KEY, "") interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
isGeneric := url.GetParamBool(constant.GENERIC_KEY, false) isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
if len(interfaceName) > 0 && !isGeneric { if len(interfaceName) > 0 && !isGeneric {
//judge is consumer or provider //judge is consumer or provider
role := url.GetParam(constant.SIDE_KEY, "") //side := url.GetParam(constant.SIDE_KEY, "")
//var service common.RPCService //var service common.RPCService
if role == common.RoleType(common.CONSUMER).Role() { service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
//if side == common.RoleType(common.CONSUMER).Role() {
//TODO:BOSS FANG // //TODO:generate the service definition and store it
} else if role == common.RoleType(common.PROVIDER).Role() { //
//TODO:BOSS FANG //} else if side == common.RoleType(common.PROVIDER).Role() {
// //TODO:generate the service definition and store it
//}
sd := definition.BuildServiceDefinition(*service, url)
data, err := json.Marshal(sd)
if err != nil {
logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error: ", url, err)
} }
mts.serviceDefinitions.Store(url.ServiceKey(), string(data))
return nil
} }
logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
return nil
} }
// GetExportedURLs get all exported urls // GetExportedURLs get all exported urls
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) sets.Set { func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (sets.Set, error) {
if serviceInterface == constant.ANY_VALUE { if serviceInterface == constant.ANY_VALUE {
return mts.getAllService(mts.exportedServiceURLs) return mts.getAllService(mts.exportedServiceURLs), nil
} else { } else {
serviceKey := definition.ServiceDescriperBuild(serviceInterface, group, version) serviceKey := definition.ServiceDescriperBuild(serviceInterface, group, version)
return mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol) return mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol), nil
} }
} }
// GetSubscribedURLs get all subscribedUrl // GetSubscribedURLs get all subscribedUrl
func (mts *MetadataService) GetSubscribedURLs() sets.Set { func (mts *MetadataService) GetSubscribedURLs() (sets.Set, error) {
return mts.getAllService(mts.subscribedServiceURLs) return mts.getAllService(mts.subscribedServiceURLs), nil
}
// GetServiceDefinition can get service definition by interfaceName, group and version
func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
serviceKey := definition.ServiceDescriperBuild(interfaceName, group, version)
v, _ := mts.serviceDefinitions.Load(serviceKey)
return v.(string), nil
}
// GetServiceDefinition can get service definition by serviceKey
func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
v, _ := mts.serviceDefinitions.Load(serviceKey)
return v.(string), nil
} }
func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) string { // Version will return the version of metadata service
panic("implement me") func (mts *MetadataService) Version() string {
return "1.0.0"
} }
func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) string { // Version will return the version of metadata service
panic("implement me") func (mts *MetadataService) Reference() string {
return "MetadataService"
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inmemory package inmemory
import ( import (
"context"
"fmt" "fmt"
"github.com/apache/dubbo-go/common"
"github.com/bmizerany/assert"
"testing" "testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
) )
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/metadata/definition"
)
type User struct {
Id string
Name string
Age int32
Time time.Time
}
type UserProvider struct {
}
func (u *UserProvider) GetUser(ctx context.Context, req []interface{}) (*User, error) {
rsp := User{"A001", "Alex Stocks", 18, time.Now()}
return &rsp, nil
}
func (u *UserProvider) Reference() string {
return "UserProvider"
}
func (u User) JavaClassName() string {
return "com.ikurento.user.User"
}
func TestMetadataService(t *testing.T) { func TestMetadataService(t *testing.T) {
mts := NewMetadataService() mts := NewMetadataService()
serviceName := "com.ikurento.user.UserProvider" serviceName := "com.ikurento.user.UserProvider"
group := "group1" group := "group1"
version := "0.0.1" version := "0.0.1"
protocol := "dubbo" protocol := "dubbo"
beanName := "UserProvider"
u, _ := common.NewURL(fmt.Sprintf("%v://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ u, _ := common.NewURL(fmt.Sprintf("%v://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+
"environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&"+ "environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&"+
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+
"side=provider&timeout=3000&timestamp=1556509797245&group=%v&version=%v", protocol, serviceName, group, version)) "side=provider&timeout=3000&timestamp=1556509797245&group=%v&version=%v&bean.name=%v", protocol, serviceName, group, version, beanName))
mts.ExportURL(u) mts.ExportURL(u)
sets := mts.GetExportedURLs(serviceName, group, version, protocol) sets := mts.GetExportedURLs(serviceName, group, version, protocol)
assert.Equal(t, 1, sets.Size()) assert.Equal(t, 1, sets.Size())
...@@ -34,4 +84,12 @@ func TestMetadataService(t *testing.T) { ...@@ -34,4 +84,12 @@ func TestMetadataService(t *testing.T) {
mts.UnsubscribeURL(u) mts.UnsubscribeURL(u)
sets22 := mts.GetSubscribedURLs() sets22 := mts.GetSubscribedURLs()
assert.Equal(t, 0, sets22.Size()) assert.Equal(t, 0, sets22.Size())
userProvider := &UserProvider{}
common.ServiceMap.Register(protocol, userProvider)
mts.PublishServiceDefinition(u)
expected := `{"CanonicalName":"com.ikurento.user.UserProvider","CodeSource":"","Methods":[{"Name":"GetUser","ParameterTypes":["slice"],"ReturnType":"ptr","Parameters":null}],"Types":null}`
assert.Equal(t, mts.GetServiceDefinition(serviceName, group, version), expected)
serviceKey := definition.ServiceDescriperBuild(serviceName, group, version)
assert.Equal(t, mts.GetServiceDefinitionByServiceKey(serviceKey), expected)
} }
...@@ -28,30 +28,32 @@ import ( ...@@ -28,30 +28,32 @@ import (
// Metadataservice is used to define meta data related behaviors // Metadataservice is used to define meta data related behaviors
type MetadataService interface { type MetadataService interface {
ServiceName() string ServiceName() (string, error)
ExportURL(url common.URL) bool ExportURL(url common.URL) (bool, error)
UnexportURL(url common.URL) UnexportURL(url common.URL) error
RefreshMetadata(exportedRevision string, subscribedRevision string) bool //RefreshMetadata(exportedRevision string, subscribedRevision string) bool
SubscribeURL(url common.URL) bool SubscribeURL(url common.URL) (bool, error)
UnsubscribeURL(url common.URL) UnsubscribeURL(url common.URL) error
PublishServiceDefinition(url common.URL) PublishServiceDefinition(url common.URL) error
GetExportedURLs(serviceInterface string, group string, version string, protocol string) sets.Set GetExportedURLs(serviceInterface string, group string, version string, protocol string) (sets.Set, error)
GetSubscribedURLs() sets.Set GetSubscribedURLs() (sets.Set, error)
GetServiceDefinition(interfaceName string, group string, version string) string GetServiceDefinition(interfaceName string, group string, version string) (string, error)
GetServiceDefinitionByServiceKey(serviceKey string) string GetServiceDefinitionByServiceKey(serviceKey string) (string, error)
Version() string
common.RPCService
} }
// BaseMetadataService: is used for the common logic for struct who will implement interface MetadataService // BaseMetadataService is used for the common logic for struct who will implement interface MetadataService
type BaseMetadataService struct { type BaseMetadataService struct {
} }
// ServiceName: get the service's name in meta service , which is application name // ServiceName can get the service's name in meta service , which is application name
func (mts *BaseMetadataService) ServiceName() string { func (mts *BaseMetadataService) ServiceName() (string, error) {
return config.GetApplicationConfig().Name return config.GetApplicationConfig().Name, nil
} }
// RefreshMetadata: used for event listener's calling, to refresh metadata // RefreshMetadata is used for event listener's calling, to refresh metadata
func (mts *BaseMetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool { //func (mts *BaseMetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool {
return true // return true
} //}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package configurable
import (
"context"
"sync"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/metadata/service_exporter"
)
// MetadataServiceExporter is the ConfigurableMetadataServiceExporter which implement MetadataServiceExporter interface
type MetadataServiceExporter struct {
serviceConfig *config.ServiceConfig
lock sync.RWMutex
metadataService service.MetadataService
}
// NewMetadataServiceExporter will return a service_exporter.MetadataServiceExporter with the specified metadata service
func NewMetadataServiceExporter(metadataService service.MetadataService) service_exporter.MetadataServiceExporter {
return &MetadataServiceExporter{
metadataService: metadataService,
}
}
// Export will export the metadataService
func (exporter *MetadataServiceExporter) Export() error {
if !exporter.IsExported() {
exporter.lock.Lock()
defer exporter.lock.Unlock()
exporter.serviceConfig = config.NewServiceConfig("MetadataService", context.Background())
exporter.serviceConfig.Protocol = constant.DEFAULT_PROTOCOL
exporter.serviceConfig.Protocols = map[string]*config.ProtocolConfig{
constant.DEFAULT_PROTOCOL: generateMetadataProtocol(),
}
exporter.serviceConfig.InterfaceName = constant.METADATA_SERVICE_NAME
exporter.serviceConfig.Group = config.GetApplicationConfig().Name
exporter.serviceConfig.Version = exporter.metadataService.Version()
exporter.serviceConfig.Implement(exporter.metadataService)
err := exporter.serviceConfig.Export()
logger.Infof("The MetadataService exports urls : %v ", exporter.serviceConfig.GetExportedUrls())
return err
} else {
logger.Warnf("The MetadataService has been exported : %v ", exporter.serviceConfig.GetExportedUrls())
return nil
}
}
// Unexport will unexport the metadataService
func (exporter *MetadataServiceExporter) Unexport() {
if exporter.IsExported() {
exporter.lock.Lock()
defer exporter.lock.Unlock()
exporter.serviceConfig.Unexport()
}
}
// GetExportedURLs will return the urls that export use.
// Notice!The exported url is not same as url in registry , for example it lack the ip.
func (exporter *MetadataServiceExporter) GetExportedURLs() []*common.URL {
return exporter.serviceConfig.GetExportedUrls()
}
// isExported will return is metadataServiceExporter exported or not
func (exporter *MetadataServiceExporter) IsExported() bool {
exporter.lock.RLock()
defer exporter.lock.RUnlock()
return exporter.serviceConfig != nil && exporter.serviceConfig.IsExport()
}
// generateMetadataProtocol will return a default ProtocolConfig
func generateMetadataProtocol() *config.ProtocolConfig {
return &config.ProtocolConfig{
Name: constant.DEFAULT_PROTOCOL,
Port: "20000",
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package configurable
import (
"fmt"
_ "github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/config"
_ "github.com/apache/dubbo-go/filter/filter_impl"
"github.com/apache/dubbo-go/metadata/service/inmemory"
"github.com/apache/dubbo-go/protocol/dubbo"
_ "github.com/apache/dubbo-go/protocol/dubbo"
"github.com/stretchr/testify/assert"
"testing"
)
func TestConfigurableExporter(t *testing.T) {
dubbo.SetServerConfig(dubbo.ServerConfig{
SessionNumber: 700,
SessionTimeout: "20s",
GettySessionParam: dubbo.GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
TcpKeepAlive: true,
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
WaitTimeout: "1s",
MaxMsgLen: 10240000000,
SessionName: "server",
}})
config.MockInitProviderWithSingleRegistry()
metadataService := inmemory.NewMetadataService()
exported := NewMetadataServiceExporter(metadataService)
assert.Equal(t, false, exported.IsExported())
assert.NoError(t, exported.Export())
assert.Equal(t, true, exported.IsExported())
fmt.Println(exported.GetExportedURLs())
}
...@@ -15,15 +15,15 @@ ...@@ -15,15 +15,15 @@
* limitations under the License. * limitations under the License.
*/ */
package exporter package service_exporter
import ( import (
"github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common"
) )
type MetadataExporter interface { type MetadataServiceExporter interface {
Export() MetadataExporter Export() error
Unexport() MetadataExporter Unexport()
GetExportedURLs() []*common.URL GetExportedURLs() []*common.URL
IsExported() bool IsExported() bool
} }
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment