Skip to content
Snippets Groups Projects
Commit 16b65004 authored by flycash's avatar flycash
Browse files

it can be found by java consumer

parent 2997c11d
No related branches found
No related tags found
No related merge requests found
Showing
with 133 additions and 46 deletions
......@@ -18,13 +18,16 @@
package extension
import (
"sync"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/observer"
)
var (
globalEventDispatcher observer.EventDispatcher
initEventListeners []observer.EventListener
initEventListeners []func() observer.EventListener
initEventOnce sync.Once
)
var (
......@@ -48,15 +51,20 @@ func SetAndInitGlobalDispatcher(name string) {
panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.")
}
globalEventDispatcher = dispatchers[name]()
globalEventDispatcher.AddEventListeners(initEventListeners)
}
// GetGlobalDispatcher
// GetGlobalDispatcher will init all listener and then return dispatcher
func GetGlobalDispatcher() observer.EventDispatcher {
initEventOnce.Do(func() {
// we should delay to add the listeners to avoid some listeners left
for _, l := range initEventListeners {
globalEventDispatcher.AddEventListener(l())
}
})
return globalEventDispatcher
}
// AddEventListener it will be added in global event dispatcher
func AddEventListener(listener observer.EventListener) {
initEventListeners = append(initEventListeners, listener)
func AddEventListener(creator func() observer.EventListener) {
initEventListeners = append(initEventListeners, creator)
}
......@@ -20,13 +20,13 @@ package extension
import "github.com/apache/dubbo-go/metadata/mapping"
var (
globalNameMapping mapping.ServiceNameMapping
globalNameMappingCreator func() mapping.ServiceNameMapping
)
func SetGlobalServiceNameMapping(nameMapping mapping.ServiceNameMapping) {
globalNameMapping = nameMapping
func SetGlobalServiceNameMapping(nameMappingCreator func() mapping.ServiceNameMapping) {
globalNameMappingCreator = nameMappingCreator
}
func GetGlobalServiceNameMapping() mapping.ServiceNameMapping {
return globalNameMapping
return globalNameMappingCreator()
}
......@@ -29,6 +29,7 @@ import (
// It contains the Prioritized means that the listener has its priority
// Usually the priority of your custom implementation should be between [100, 9000]
// the number outside the range will be though as system reserve number
// usually implementation should be singleton
type EventListener interface {
gxsort.Prioritizer
// OnEvent handle this event
......
......@@ -19,11 +19,12 @@ package dynamic
import (
"strconv"
"sync"
"time"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/metadata/mapping"
)
import (
......@@ -39,16 +40,16 @@ import (
)
const (
defaultGroup = config_center.DEFAULT_GROUP
defaultGroup = "mapping"
slash = "/"
)
func init() {
dc := common_cfg.GetEnvInstance().GetDynamicConfiguration()
extension.SetGlobalServiceNameMapping(&DynamicConfigurationServiceNameMapping{dc: dc})
extension.SetGlobalServiceNameMapping(GetNameMappingInstance)
}
// DynamicConfigurationServiceNameMapping is the implementation based on config center
// it's a singleton
type DynamicConfigurationServiceNameMapping struct {
dc config_center.DynamicConfiguration
}
......@@ -85,3 +86,14 @@ func (d *DynamicConfigurationServiceNameMapping) buildGroup(serviceInterface str
// so other params are ignored and remove, including group string, version string, protocol string
return defaultGroup + slash + serviceInterface
}
var serviceNameMappingInstance *DynamicConfigurationServiceNameMapping
var serviceNameMappingOnce sync.Once
func GetNameMappingInstance() mapping.ServiceNameMapping {
serviceNameMappingOnce.Do(func() {
dc := common_cfg.GetEnvInstance().GetDynamicConfiguration()
serviceNameMappingInstance = &DynamicConfigurationServiceNameMapping{dc: dc}
})
return serviceNameMappingInstance
}
......@@ -18,7 +18,11 @@
package memory
import (
"sync"
gxset "github.com/dubbogo/gost/container/set"
"github.com/apache/dubbo-go/metadata/mapping"
)
import (
......@@ -27,7 +31,7 @@ import (
)
func init() {
extension.SetGlobalServiceNameMapping(&InMemoryServiceNameMapping{})
extension.SetGlobalServiceNameMapping(GetNameMappingInstance)
}
type InMemoryServiceNameMapping struct{}
......@@ -39,3 +43,13 @@ func (i InMemoryServiceNameMapping) Map(serviceInterface string, group string, v
func (i InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) {
return gxset.NewSet(config.GetApplicationConfig().Name), nil
}
var serviceNameMappingInstance *InMemoryServiceNameMapping
var serviceNameMappingOnce sync.Once
func GetNameMappingInstance() mapping.ServiceNameMapping {
serviceNameMappingOnce.Do(func() {
serviceNameMappingInstance = &InMemoryServiceNameMapping{}
})
return serviceNameMappingInstance
}
......@@ -49,18 +49,12 @@ func TestNacosMetadataReport_CRUD(t *testing.T) {
err = rpt.SaveServiceMetadata(serviceMi, serviceUrl)
assert.Nil(t, err)
exportedUrls := rpt.GetExportedURLs(serviceMi)
assert.Equal(t, 1, len(exportedUrls))
subMi := newSubscribeMetadataIdentifier()
urlList := make([]common.URL, 0, 1)
urlList = append(urlList, serviceUrl)
err = rpt.SaveSubscribedData(subMi, urlList)
assert.Nil(t, err)
subscribeUrl := rpt.GetSubscribedURLs(subMi)
assert.Equal(t, 1, len(subscribeUrl))
err = rpt.RemoveServiceMetadata(serviceMi)
assert.Nil(t, err)
......
......@@ -53,15 +53,24 @@ type MetadataService struct {
lock *sync.RWMutex
}
var (
metadataServiceInstance *MetadataService
metadataServiceInitOnce sync.Once
)
// NewMetadataService: initiate a metadata service
// it should be singleton
func NewMetadataService() (service.MetadataService, error) {
return &MetadataService{
BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name),
exportedServiceURLs: &sync.Map{},
subscribedServiceURLs: &sync.Map{},
serviceDefinitions: &sync.Map{},
lock: &sync.RWMutex{},
}, nil
metadataServiceInitOnce.Do(func() {
metadataServiceInstance = &MetadataService{
BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name),
exportedServiceURLs: &sync.Map{},
subscribedServiceURLs: &sync.Map{},
serviceDefinitions: &sync.Map{},
lock: &sync.RWMutex{},
}
})
return metadataServiceInstance, nil
}
// Comparator is defined as Comparator for skip list to compare the URL
......@@ -149,7 +158,7 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s
urls := serviceList.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := common.URL(urls.ByPosition(i).(Comparator))
if len(protocol) == 0 || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol {
if len(protocol) == 0 || protocol == constant.ANY_VALUE || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol {
res = append(res, url)
}
}
......
......@@ -19,15 +19,17 @@ package event
import (
"reflect"
"sync"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
)
func init() {
extension.AddEventListener(&customizableServiceInstanceListener{})
extension.AddEventListener(GetCustomizableServiceInstanceListener)
}
// customizableServiceInstanceListener is singleton
type customizableServiceInstanceListener struct {
}
......@@ -53,3 +55,13 @@ func (c *customizableServiceInstanceListener) OnEvent(e observer.Event) error {
func (c *customizableServiceInstanceListener) GetEventType() reflect.Type {
return reflect.TypeOf(&ServiceInstancePreRegisteredEvent{})
}
var customizableServiceInstanceListenerInstance *customizableServiceInstanceListener
var customizableServiceInstanceListenerOnce sync.Once
func GetCustomizableServiceInstanceListener() observer.EventListener {
customizableServiceInstanceListenerOnce.Do(func() {
customizableServiceInstanceListenerInstance = &customizableServiceInstanceListener{}
})
return customizableServiceInstanceListenerInstance
}
......@@ -20,6 +20,9 @@ package event
import (
"reflect"
"testing"
"github.com/apache/dubbo-go/config"
_ "github.com/apache/dubbo-go/metadata/service/inmemory"
)
import (
......@@ -37,13 +40,22 @@ import (
)
func TestEventPublishingServiceDiscovery_DispatchEvent(t *testing.T) {
// extension.SetMetadataService("local", inmemory.NewMetadataService)
config.GetApplicationConfig().MetadataType = "local"
dc := NewEventPublishingServiceDiscovery(&ServiceDiscoveryA{})
tsd := &TestServiceDiscoveryDestroyingEventListener{}
tsd.SetT(t)
tsi := &TestServiceInstancePreRegisteredEventListener{}
tsi.SetT(t)
extension.AddEventListener(tsd)
extension.AddEventListener(tsi)
extension.AddEventListener(func() observer.EventListener {
return tsd
})
extension.AddEventListener(func() observer.EventListener {
return tsi
})
extension.SetEventDispatcher("direct", dispatcher2.NewDirectEventDispatcher)
extension.SetAndInitGlobalDispatcher("direct")
err := dc.Destroy()
......
......@@ -19,6 +19,7 @@ package event
import (
"reflect"
"sync"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
......@@ -26,9 +27,10 @@ import (
)
func init() {
extension.AddEventListener(&logEventListener{})
extension.AddEventListener(GetLogEventListener)
}
// logEventListener is singleton
type logEventListener struct {
}
......@@ -44,3 +46,13 @@ func (l *logEventListener) OnEvent(e observer.Event) error {
func (l *logEventListener) GetEventType() reflect.Type {
return reflect.TypeOf(&observer.BaseEvent{})
}
var logEventListenerInstance *logEventListener
var logEventListenerOnce sync.Once
func GetLogEventListener() observer.EventListener {
logEventListenerOnce.Do(func() {
logEventListenerInstance = &logEventListener{}
})
return logEventListenerInstance
}
......@@ -84,7 +84,7 @@ func (m *metadataServiceURLParamsMetadataCustomizer) convertToParams(urls []comm
p := make(map[string]string, len(u.GetParams()))
for k, v := range u.GetParams() {
// we will ignore that
if m.exceptKeys.Contains(k) || len(v) == 0 {
if m.exceptKeys.Contains(k) || len(v) == 0 || len(v[0]) == 0 {
continue
}
p[k] = v[0]
......
......@@ -31,11 +31,10 @@ type ServiceConfigExportedEvent struct {
func NewServiceConfigExportedEvent(serviceConfig *config.ServiceConfig) *ServiceConfigExportedEvent {
return &ServiceConfigExportedEvent{
BaseEvent: observer.BaseEvent{
Source:serviceConfig,
Timestamp:time.Now(),
BaseEvent: observer.BaseEvent{
Source: serviceConfig,
Timestamp: time.Now(),
},
ServiceConfig: serviceConfig,
}
}
......@@ -19,6 +19,7 @@ package event
import (
"reflect"
"sync"
perrors "github.com/pkg/errors"
......@@ -29,9 +30,7 @@ import (
)
func init() {
extension.AddEventListener(&serviceNameMappingListener{
nameMapping: extension.GetGlobalServiceNameMapping(),
})
extension.AddEventListener(GetCustomizableServiceInstanceListener)
}
type serviceNameMappingListener struct {
......@@ -64,3 +63,17 @@ func (s *serviceNameMappingListener) OnEvent(e observer.Event) error {
func (s *serviceNameMappingListener) GetEventType() reflect.Type {
return reflect.TypeOf(&ServiceConfigExportedEvent{})
}
var (
serviceNameMappingListenerInstance *serviceNameMappingListener
serviceNameMappingListenerOnce sync.Once
)
func GetServiceNameMappingListener() observer.EventListener {
serviceNameMappingListenerOnce.Do(func() {
serviceNameMappingListenerInstance = &serviceNameMappingListener{
nameMapping: extension.GetGlobalServiceNameMapping(),
}
})
return serviceNameMappingListenerInstance
}
......@@ -264,10 +264,12 @@ func (n *nacosServiceDiscovery) toRegisterInstance(instance registry.ServiceInst
Ip: instance.GetHost(),
Port: uint64(instance.GetPort()),
Metadata: metadata,
Enable: instance.IsEnable(),
Healthy: instance.IsHealthy(),
GroupName: n.group,
Ephemeral: true,
// We must specify the weight since Java nacos client will ignore the instance whose weight is 0
Weight: 1,
Enable: instance.IsEnable(),
Healthy: instance.IsHealthy(),
GroupName: n.group,
Ephemeral: true,
}
}
......
......@@ -38,7 +38,6 @@ import (
"github.com/apache/dubbo-go/metadata/mapping"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/metadata/service/exporter/configurable"
"github.com/apache/dubbo-go/metadata/service/remote"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/registry/event"
"github.com/apache/dubbo-go/registry/servicediscovery/proxy"
......@@ -89,7 +88,7 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
serviceNameMapping := extension.GetGlobalServiceNameMapping()
metaDataService, err := remote.NewMetadataService()
metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
if err != nil {
return nil, perrors.WithMessage(err, "could not init metadata service")
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment