Skip to content
Snippets Groups Projects
Commit c6e8b0c6 authored by flycash's avatar flycash
Browse files

Fix BUG

parent 9d5d9e3b
No related branches found
No related tags found
No related merge requests found
Showing
with 127 additions and 63 deletions
......@@ -277,4 +277,6 @@ const (
// SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used
SERVICE_DISCOVERY_KEY = "service_discovery"
LANGUAGE_KEY = "language"
GO_LANG = "golang"
)
......@@ -343,6 +343,10 @@ func suiteMethod(method reflect.Method) *MethodType {
argsType []reflect.Type
)
if mname == "Reference" {
return nil
}
if outNum != 1 && outNum != 2 {
logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",
mname, mtype.String(), outNum)
......
......@@ -189,6 +189,7 @@ func loadProviderConfig() {
logger.Errorf("[provider config center refresh] %#v", err)
}
checkRegistries(providerConfig.Registries, providerConfig.Registry)
for key, svs := range providerConfig.Services {
rpcService := GetProviderService(key)
if rpcService == nil {
......
......@@ -55,6 +55,8 @@ func (exporter *MetadataServiceExporter) Export() error {
constant.DEFAULT_PROTOCOL: generateMetadataProtocol(),
}
serviceConfig.InterfaceName = constant.METADATA_SERVICE_NAME
// identify this is a golang server
serviceConfig.Params = map[string]string{constant.LANGUAGE_KEY: constant.GO_LANG}
serviceConfig.Group = config.GetApplicationConfig().Name
// now the error will always be nil
serviceConfig.Version, _ = exporter.metadataService.Version()
......
......@@ -41,9 +41,14 @@ func createProxy(ins registry.ServiceInstance) service.MetadataService {
logger.Errorf("metadata service urls not found, %v", ins)
return nil
}
p := extension.GetProtocol(urls[0].Protocol)
invoker := p.Refer(*urls[0])
return &MetadataServiceProxy{invkr: invoker}
u := urls[0]
p := extension.GetProtocol(u.Protocol)
invoker := p.Refer(*u)
golang := u.GetParam(constant.LANGUAGE_KEY, "")
return &MetadataServiceProxy{invkr: invoker,
golangServer: golang == constant.GO_LANG,
}
}
// buildStandardMetadataServiceURL will use standard format to build the metadata service url.
......
......@@ -143,7 +143,7 @@ func (mts *MetadataService) getAllService(services *sync.Map) []common.URL {
urls := value.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := common.URL(urls.ByPosition(i).(Comparator))
if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.SIMPLE_METADATA_SERVICE_NAME {
if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.METADATA_SERVICE_NAME {
res = append(res, url)
}
}
......@@ -220,12 +220,12 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
}
// GetExportedURLs get all exported urls
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) {
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
if serviceInterface == constant.ANY_VALUE {
return mts.getAllService(mts.exportedServiceURLs), nil
return service.ConvertURLArrToIntfArr(mts.getAllService(mts.exportedServiceURLs)), nil
} else {
serviceKey := definition.ServiceDescriperBuild(serviceInterface, group, version)
return mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol), nil
return service.ConvertURLArrToIntfArr(mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol)), nil
}
}
......
......@@ -20,34 +20,59 @@ package inmemory
import (
"context"
"reflect"
"time"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
// actually it's RPC stub
// this will only be used by client-side
// if the metadata service is "local" metadata service in server side,
// which means that metadata service is RPC service too.
// so in client-side, if we want to get the metadata information,
// we must call metadata service
// this is the stub, or proxy
// for now, only GetExportedURLs will be implemented
type MetadataServiceProxy struct {
invkr protocol.Invoker
invkr protocol.Invoker
golangServer bool
}
func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) {
func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
siV := reflect.ValueOf(serviceInterface)
gV := reflect.ValueOf(group)
vV := reflect.ValueOf(version)
pV := reflect.ValueOf(protocol)
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getExportedURLs"),
// this is a strange logic
// we should notice that
// when we call java server, the method was register as "getExportedURLs"
// however, if we call golang server, the method was register as "GetExportedURLs"
// it's so tricky...
methodName := "getExportedURLs"
if m.golangServer {
methodName = "GetExportedURLs"
}
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName(methodName),
invocation.WithArguments([]interface{}{siV.Interface(), gV.Interface(), vV.Interface(), pV.Interface()}),
invocation.WithReply(reflect.ValueOf(&[]interface{}{}).Interface()),
invocation.WithAttachments(map[string]string{constant.ASYNC_KEY: "false"}),
invocation.WithParameterValues([]reflect.Value{siV, gV, vV, pV}))
start := time.Now()
res := m.invkr.Invoke(context.Background(), inv)
end := time.Now()
logger.Infof("duration: ", (end.Sub(start)).String())
if res.Error() != nil {
logger.Errorf("could not get the metadata service from remote provider: %v", res.Error())
return []interface{}{}, nil
}
urlStrs := res.Result().(*[]interface{})
......@@ -62,61 +87,65 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st
}
ret = append(ret, u)
}
return ret, nil
return service.ConvertURLArrToIntfArr(ret), nil
}
func (m *MetadataServiceProxy) Reference() string {
panic("implement me")
logger.Error("you should never invoke this implementation")
return constant.METADATA_SERVICE_NAME
}
func (m *MetadataServiceProxy) ServiceName() (string, error) {
panic("implement me")
logger.Error("you should never invoke this implementation")
return "", nil
}
func (m *MetadataServiceProxy) ExportURL(url common.URL) (bool, error) {
panic("implement me")
logger.Error("you should never invoke this implementation")
return false, nil
}
func (m *MetadataServiceProxy) UnexportURL(url common.URL) error {
panic("implement me")
logger.Error("you should never invoke this implementation")
return nil
}
func (m *MetadataServiceProxy) SubscribeURL(url common.URL) (bool, error) {
panic("implement me")
logger.Error("you should never invoke this implementation")
return false, nil
}
func (m *MetadataServiceProxy) UnsubscribeURL(url common.URL) error {
panic("implement me")
logger.Error("you should never invoke this implementation")
return nil
}
func (m *MetadataServiceProxy) PublishServiceDefinition(url common.URL) error {
panic("implement me")
logger.Error("you should never invoke this implementation")
return nil
}
func (m *MetadataServiceProxy) GetSubscribedURLs() ([]common.URL, error) {
panic("implement me")
logger.Error("you should never invoke this implementation")
return []common.URL{}, nil
}
func (m *MetadataServiceProxy) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
panic("implement me")
logger.Error("you should never invoke this implementation")
return "", nil
}
func (m *MetadataServiceProxy) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
panic("implement me")
logger.Error("you should never invoke this implementation")
return "", nil
}
func (m *MetadataServiceProxy) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) {
panic("implement me")
logger.Error("you should never invoke this implementation")
return false, nil
}
func (m *MetadataServiceProxy) Version() (string, error) {
panic("implement me")
}
type MetadataServiceStub struct {
GetExportedURLs func(serviceInterface string, group string, version string, protocol string) ([]interface{}, error)
}
func (m *MetadataServiceStub) Reference() string {
return constant.METADATA_SERVICE_NAME
logger.Error("you should never invoke this implementation")
return "", nil
}
......@@ -132,7 +132,7 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
}
// GetExportedURLs will be implemented by in memory service
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) {
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
return mts.inMemoryMetadataService.GetExportedURLs(serviceInterface, group, version, protocol)
}
......@@ -162,9 +162,10 @@ func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedR
}
logger.Infof("urls length = %v", len(urls))
for _, u := range urls {
id := identifier.NewServiceMetadataIdentifier(u)
id := identifier.NewServiceMetadataIdentifier(u.(common.URL))
id.Revision = mts.exportedRevision.Load()
if err := mts.delegateReport.SaveServiceMetadata(id, u); err != nil {
if err := mts.delegateReport.SaveServiceMetadata(id, u.(common.URL)); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
return false, err
}
......
......@@ -45,31 +45,31 @@ func (m *metadataServiceProxy) ServiceName() (string, error) {
}
func (m *metadataServiceProxy) ExportURL(url common.URL) (bool, error) {
logger.Errorf("you should never invoke this implementation")
logger.Error("you should never invoke this implementation")
return true, nil
}
func (m *metadataServiceProxy) UnexportURL(url common.URL) error {
logger.Errorf("you should never invoke this implementation")
logger.Error("you should never invoke this implementation")
return nil
}
func (m *metadataServiceProxy) SubscribeURL(url common.URL) (bool, error) {
logger.Errorf("you should never invoke this implementation")
logger.Error("you should never invoke this implementation")
return true, nil
}
func (m *metadataServiceProxy) UnsubscribeURL(url common.URL) error {
logger.Errorf("you should never invoke this implementation")
logger.Error("you should never invoke this implementation")
return nil
}
func (m *metadataServiceProxy) PublishServiceDefinition(url common.URL) error {
logger.Errorf("you should never invoke this implementation")
logger.Error("you should never invoke this implementation")
return nil
}
func (m *metadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) {
func (m *metadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
urls := m.report.GetExportedURLs(&identifier.ServiceMetadataIdentifier{
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: serviceInterface,
......@@ -89,11 +89,11 @@ func (m *metadataServiceProxy) GetExportedURLs(serviceInterface string, group st
}
res = append(res, u)
}
return res, nil
return service.ConvertURLArrToIntfArr(res), nil
}
func (m *metadataServiceProxy) GetSubscribedURLs() ([]common.URL, error) {
logger.Errorf("you should never invoke this implementation")
logger.Error("you should never invoke this implementation")
return []common.URL{}, nil
}
......@@ -115,7 +115,7 @@ func (m *metadataServiceProxy) GetServiceDefinitionByServiceKey(serviceKey strin
}
func (m *metadataServiceProxy) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) {
logger.Errorf("you should never invoke this implementation")
logger.Error("you should never invoke this implementation")
return true, nil
}
......
......@@ -43,7 +43,8 @@ type MetadataService interface {
PublishServiceDefinition(url common.URL) error
// GetExportedURLs will get the target exported url in metadata
// the url should be unique
GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error)
// due to dubbo-go only support return array []interface{} in RPCService, so we should declare the return type as []interface{}
GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error)
// GetExportedURLs will get the target subscribed url in metadata
// the url should be unique
GetSubscribedURLs() ([]common.URL, error)
......@@ -108,3 +109,15 @@ func getExportedServicesRevision(serviceInstance registry.ServiceInstance) strin
metaData := serviceInstance.GetMetadata()
return metaData[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME]
}
func ConvertURLArrToIntfArr(urls []common.URL) []interface{} {
if len(urls) == 0 {
return []interface{}{}
}
res := make([]interface{}, 0, len(urls))
for _, u := range urls {
res = append(res, u)
}
return res
}
\ No newline at end of file
......@@ -74,13 +74,14 @@ func (m *metadataServiceURLParamsMetadataCustomizer) Customize(instance registry
instance.GetMetadata()[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME] = string(str)
}
func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(urls []common.URL) map[string]map[string]string {
func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(urls []interface{}) map[string]map[string]string {
// usually there will be only one protocol
res := make(map[string]map[string]string, 1)
// those keys are useless
for _, u := range urls {
for _, ui := range urls {
u := ui.(common.URL)
p := make(map[string]string, len(u.GetParams()))
for k, v := range u.GetParams() {
// we will ignore that
......
......@@ -21,6 +21,7 @@ import (
"encoding/json"
"strconv"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/registry"
......@@ -52,7 +53,8 @@ func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceIns
return
}
for _, u := range list {
for _, ui := range list {
u := ui.(common.URL)
if len(u.Protocol) == 0 {
continue
}
......
......@@ -26,6 +26,7 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/registry"
)
......@@ -85,7 +86,7 @@ func (e *subscribedServicesRevisionMetadataCustomizer) Customize(instance regist
logger.Errorf("could not find the subscribed url", err)
}
revision := resolveRevision(urls)
revision := resolveRevision(service.ConvertURLArrToIntfArr(urls))
if len(revision) == 0 {
revision = defaultRevision
}
......@@ -96,13 +97,14 @@ func (e *subscribedServicesRevisionMetadataCustomizer) Customize(instance regist
// so that we could use interface + method name as identifier and ignore the method params
// per my understanding, it's enough because Dubbo actually ignore the url params.
// please refer org.apache.dubbo.common.URL#toParameterString(java.lang.String...)
func resolveRevision(urls []common.URL) string {
func resolveRevision(urls []interface{}) string {
if len(urls) == 0 {
return ""
}
candidates := make([]string, 0, len(urls))
for _, u := range urls {
for _, ui := range urls {
u := ui.(common.URL)
sk := u.GetParam(constant.INTERFACE_KEY, "")
for _, m := range u.Methods {
// methods are part of candidates
......
......@@ -56,8 +56,7 @@ func init() {
// It's completely different from other registry implementations
// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
// In order to keep compatible with interface-level registry,
// 1. when we registry the service, we should create the mapping from service name to application name
// 2. when we sub
// this implementation is
type serviceDiscoveryRegistry struct {
lock sync.RWMutex
url *common.URL
......@@ -72,13 +71,7 @@ type serviceDiscoveryRegistry struct {
func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
// the metadata service is exported in DubboBootstrap of Java Dubbo
// but I don't want to introduce similar structure because we has less logic to do
// so I codes the related logic here.
// If necessary we need to think about moving there codes to somewhere else.
// init and expose metadata service
initMetadataService()
tryInitMetadataService()
serviceDiscovery, err := creatServiceDiscovery(url)
if err != nil {
......@@ -419,7 +412,12 @@ func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registr
logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance)
return urls
}
return result
ret := make([]common.URL, 0, len(result))
for _, ui := range result {
ret = append(ret, ui.(common.URL))
}
return ret
}
func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
......@@ -663,17 +661,21 @@ func (icn *InstanceChangeNotify) Notify(event observer.Event) {
}
}
func initMetadataService() {
var exporting = false
func tryInitMetadataService() {
ms, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
if err != nil {
logger.Errorf("could not init metadata service", err)
}
// we don't need to expose the metadata service since this is a pure consumer application
if !config.IsProvider() {
if !config.IsProvider() || exporting {
return
}
exporting = true
expt := configurable.NewMetadataServiceExporter(ms)
err = expt.Export()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment