Skip to content
Snippets Groups Projects
Commit ed6b59bb authored by flycash's avatar flycash Committed by lzp0412
Browse files

Add events and eventListener

parent 34775e54
No related branches found
No related tags found
No related merge requests found
Showing
with 181 additions and 92 deletions
......@@ -78,3 +78,7 @@ const (
const (
SIMPLE_METADATA_SERVICE_NAME = "MetadataService"
)
const (
SERVICE_DISCOVERY_DEFAULT_GROUP = "DEFAULT_GROUP"
)
\ No newline at end of file
......@@ -273,9 +273,6 @@ const (
METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params"
METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls"
// used by URL
// SERVICE_NAME_MAPPING_KEY indicate that which service name mapping instance will be used
SERVICE_NAME_MAPPING_KEY = "name_mapping"
// SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used
SERVICE_DISCOVERY_KEY = "service_discovery"
)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"sort"
"github.com/apache/dubbo-go/registry"
)
var (
customizers = make([]registry.ServiceInstanceCustomizer, 0, 8)
)
// AddCustomizers will put the customizer into slices and then sort them;
// this method will be invoked several time, so we sort them here.
func AddCustomizers(cus registry.ServiceInstanceCustomizer) {
customizers = append(customizers, cus)
sort.Stable(customizerSlice(customizers))
}
// GetCustomizers will return the sorted customizer
func GetCustomizers() []registry.ServiceInstanceCustomizer {
return customizers
}
type customizerSlice []registry.ServiceInstanceCustomizer
func (c customizerSlice) Len() int {
return len(c)
}
func (c customizerSlice) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c customizerSlice) Less(i, j int) bool { return c[i].GetPriority() < c[j].GetPriority() }
......@@ -20,17 +20,13 @@ package extension
import "github.com/apache/dubbo-go/metadata/mapping"
var (
nameMappings = make(map[string]func() mapping.ServiceNameMapping)
globalNameMapping mapping.ServiceNameMapping
)
func SetServiceNameMapping(name string, creator func() mapping.ServiceNameMapping) {
nameMappings[name] = creator
func SetGlobalServiceNameMapping(nameMapping mapping.ServiceNameMapping) {
globalNameMapping = nameMapping
}
func GetServiceNameMapping(name string) mapping.ServiceNameMapping {
creator, ok := nameMappings[name]
if !ok {
panic("Can not find the target service name mapping: " + name)
}
return creator()
func GetGlobalServiceNameMapping() mapping.ServiceNameMapping {
return globalNameMapping
}
......@@ -32,6 +32,7 @@ type EventListener interface {
// OnEvent handle this event
OnEvent(e Event) error
// GetEventType listen which event type
// return nil if the implementation want to listen any event
GetEventType() reflect.Type
}
......
......@@ -137,7 +137,7 @@ func TestSuiteMethod(t *testing.T) {
assert.True(t, ok)
methodType := suiteMethod(method)
method = methodType.Method()
assert.Equal(t, "func(*common.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String())
assert.Equal(t, "func(*event.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String())
at := methodType.ArgsType()
assert.Equal(t, "interface {}", at[0].String())
assert.Equal(t, "interface {}", at[1].String())
......@@ -151,7 +151,7 @@ func TestSuiteMethod(t *testing.T) {
assert.True(t, ok)
methodType = suiteMethod(method)
method = methodType.Method()
assert.Equal(t, "func(*common.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String())
assert.Equal(t, "func(*event.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String())
at = methodType.ArgsType()
assert.Equal(t, "interface {}", at[0].String())
assert.Equal(t, "interface {}", at[1].String())
......@@ -164,7 +164,7 @@ func TestSuiteMethod(t *testing.T) {
assert.True(t, ok)
methodType = suiteMethod(method)
method = methodType.Method()
assert.Equal(t, "func(*common.TestService) error", method.Type.String())
assert.Equal(t, "func(*event.TestService) error", method.Type.String())
at = methodType.ArgsType()
assert.Equal(t, 0, len(at))
assert.Nil(t, methodType.CtxType())
......
......@@ -33,7 +33,8 @@ type ApplicationConfig struct {
Version string `yaml:"version" json:"version,omitempty" property:"version"`
Owner string `yaml:"owner" json:"owner,omitempty" property:"owner"`
Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"`
MetadataType string `default:"local" yaml:"metadataType" json:"metadataType,omitempty" property:"metadataType"` //field for metadata report
// the metadata type. remote or local
MetadataType string `default:"local" yaml:"metadataType" json:"metadataType,omitempty" property:"metadataType"`
}
// Prefix ...
......
......@@ -40,7 +40,7 @@ type multiConfiger interface {
Prefix() string
}
// BaseConfig is the common configuration for provider and consumer
// BaseConfig is the event configuration for provider and consumer
type BaseConfig struct {
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
Remotes map[string]*RemoteConfig `yaml:"remote" json:"remote,omitempty"`
......@@ -51,7 +51,7 @@ type BaseConfig struct {
configCenterUrl *common.URL
prefix string
fatherConfig interface{}
eventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"`
EventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"`
MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"`
fileStream *bytes.Buffer
}
......@@ -376,7 +376,6 @@ func initializeStruct(t reflect.Type, v reflect.Value) {
default:
}
}
}
}
}
......@@ -218,15 +218,23 @@ func Load() {
// init router
initRouter()
// event part
extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)
// start the metadata report if config set
if err := startMetadataReport(providerConfig.ApplicationConfig.MetadataType, providerConfig.MetadataReportConfig); err != nil {
logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
return
}
logger.Debugf("provider config{%#v}\n", providerConfig)
// reference config
loadConsumerConfig()
// service config
loadProviderConfig()
// common part
extension.SetAndInitGlobalDispatcher(providerConfig.eventDispatcherType)
// init the shutdown callback
GracefulShutdownInit()
}
......
......@@ -93,7 +93,7 @@ func (c *MetadataReportConfig) IsValid() bool {
// StartMetadataReport: The entry of metadata report start
func startMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error {
if metadataReportConfig == nil || metadataReportConfig.IsValid() {
if metadataReportConfig == nil || !metadataReportConfig.IsValid() {
return nil
}
......
......@@ -28,7 +28,6 @@ import (
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/yaml"
)
......@@ -95,11 +94,6 @@ func ProviderInit(confProFile string) error {
n.InterfaceId = k
}
}
// start the metadata report if config set
if err := startMetadataReport(providerConfig.ApplicationConfig.MetadataType, providerConfig.MetadataReportConfig); err != nil {
return perrors.WithMessagef(err, "Provider starts metadata report error, and the error is {%#v}", err)
}
logger.Debugf("provider config{%#v}\n", providerConfig)
return nil
}
......
......@@ -26,7 +26,7 @@ import (
* RejectedExecutionHandler
* If the invocation cannot pass any validation in filter, like ExecuteLimitFilter and TpsLimitFilter,
* the implementation will be used.
* The common case is that sometimes you want to return the default value when the request was rejected.
* The event case is that sometimes you want to return the default value when the request was rejected.
* Or you want to be warned if any request was rejected.
* In such situation, implement this interface and register it by invoking extension.SetRejectedExecutionHandler.
*/
......
......@@ -18,11 +18,10 @@
package dynamic
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/metadata/mapping"
"strconv"
"sync"
"time"
"github.com/apache/dubbo-go/common/extension"
)
import (
......@@ -43,7 +42,8 @@ const (
)
func init() {
extension.SetServiceNameMapping("dynamic", GetServiceNameMappingInstance)
dc := common_cfg.GetEnvInstance().GetDynamicConfiguration()
extension.SetGlobalServiceNameMapping(&DynamicConfigurationServiceNameMapping{dc: dc})
}
// DynamicConfigurationServiceNameMapping is the implementation based on config center
......@@ -82,17 +82,3 @@ func (d *DynamicConfigurationServiceNameMapping) buildGroup(serviceInterface str
// so other params are ignored and remove, including group string, version string, protocol string
return defaultGroup + slash + serviceInterface
}
var (
serviceNameMappingInstance *DynamicConfigurationServiceNameMapping
serviceNameMappingInitOnce sync.Once
)
// GetServiceNameMappingInstance will return an instance of DynamicConfigurationServiceNameMapping
func GetServiceNameMappingInstance() mapping.ServiceNameMapping {
serviceNameMappingInitOnce.Do(func() {
dc := common_cfg.GetEnvInstance().GetDynamicConfiguration()
serviceNameMappingInstance = &DynamicConfigurationServiceNameMapping{dc: dc}
})
return serviceNameMappingInstance
}
......@@ -17,11 +17,6 @@
package memory
import (
"github.com/apache/dubbo-go/metadata/mapping"
"sync"
)
import (
gxset "github.com/dubbogo/gost/container/set"
)
......@@ -32,7 +27,7 @@ import (
)
func init() {
extension.SetServiceNameMapping("in-memory", GetInMemoryServiceNameMappingInstance)
extension.SetGlobalServiceNameMapping(&InMemoryServiceNameMapping{})
}
type InMemoryServiceNameMapping struct{}
......@@ -44,15 +39,3 @@ func (i InMemoryServiceNameMapping) Map(serviceInterface string, group string, v
func (i InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) {
return gxset.NewSet(config.GetApplicationConfig().Name), nil
}
var (
nameMappingInstance *InMemoryServiceNameMapping
nameMappingInitOnce sync.Once
)
func GetInMemoryServiceNameMappingInstance() mapping.ServiceNameMapping {
nameMappingInitOnce.Do(func() {
nameMappingInstance = &InMemoryServiceNameMapping{}
})
return nameMappingInstance
}
......@@ -175,12 +175,12 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
if len(interfaceName) > 0 && !isGeneric {
//judge is consumer or provider
//side := url.GetParam(constant.SIDE_KEY, "")
//var service common.RPCService
//var service event.RPCService
service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
//if side == common.RoleType(common.CONSUMER).Role() {
//if side == event.RoleType(event.CONSUMER).Role() {
// //TODO:generate the service definition and store it
//
//} else if side == common.RoleType(common.PROVIDER).Role() {
//} else if side == event.RoleType(event.PROVIDER).Role() {
// //TODO:generate the service definition and store it
//}
sd := definition.BuildServiceDefinition(*service, url)
......
......@@ -18,6 +18,8 @@
package remote
import (
"sync"
"github.com/Workiva/go-datastructures/slice/skip"
"go.uber.org/atomic"
)
......@@ -38,6 +40,7 @@ import (
const version = "1.0.0"
// MetadataService is a implement of metadata service which will delegate the remote metadata report
// This is singleton
type MetadataService struct {
service.BaseMetadataService
inMemoryMetadataService *inmemory.MetadataService
......@@ -46,16 +49,26 @@ type MetadataService struct {
delegateReport *delegate.MetadataReport
}
var (
metadataServiceOnce sync.Once
metadataServiceInstance *MetadataService
)
// NewMetadataService will create a new remote MetadataService instance
func NewMetadataService() (*MetadataService, error) {
mr, err := delegate.NewMetadataReport()
if err != nil {
return nil, err
}
return &MetadataService{
inMemoryMetadataService: inmemory.NewMetadataService(),
delegateReport: mr,
}, nil
var err error
metadataServiceOnce.Do(func() {
var mr *delegate.MetadataReport
mr, err = delegate.NewMetadataReport()
if err != nil {
return
}
metadataServiceInstance = &MetadataService{
inMemoryMetadataService: inmemory.NewMetadataService(),
delegateReport: mr,
}
})
return metadataServiceInstance, err
}
// setInMemoryMetadataService will replace the in memory metadata service by the specific param
......@@ -90,39 +103,41 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
if len(interfaceName) > 0 && !isGeneric {
service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
sd := definition.BuildServiceDefinition(*service, url)
sv := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
sd := definition.BuildServiceDefinition(*sv, url)
id := &identifier.MetadataIdentifier{
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: interfaceName,
Version: url.GetParam(constant.VERSION_KEY, ""),
Group: url.GetParam(constant.GROUP_KEY, ""),
// Group: url.GetParam(constant.GROUP_KEY, constant.SERVICE_DISCOVERY_DEFAULT_GROUP),
Group: url.GetParam(constant.GROUP_KEY, "test"),
},
}
mts.delegateReport.StoreProviderMetadata(id, sd)
return nil
}
logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
return nil
}
// GetExportedURLs will be implemented by in memory service
func (MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) {
return nil, nil
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) {
return mts.inMemoryMetadataService.GetExportedURLs(serviceInterface, group, version, protocol)
}
// GetSubscribedURLs will be implemented by in memory service
func (MetadataService) GetSubscribedURLs() (*skip.SkipList, error) {
return nil, nil
func (mts *MetadataService) GetSubscribedURLs() (*skip.SkipList, error) {
return mts.inMemoryMetadataService.GetSubscribedURLs()
}
// GetServiceDefinition will be implemented by in memory service
func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
return "", nil
func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
return mts.inMemoryMetadataService.GetServiceDefinition(interfaceName, group, version)
}
// GetServiceDefinitionByServiceKey will be implemented by in memory service
func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
return "", nil
func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
return mts.inMemoryMetadataService.GetServiceDefinitionByServiceKey(serviceKey)
}
// RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service
......
......@@ -27,7 +27,8 @@ import (
"github.com/apache/dubbo-go/config"
)
// Metadataservice is used to define meta data related behaviors
// MetadataService is used to define meta data related behaviors
// usually the implementation should be singleton
type MetadataService interface {
common.RPCService
// ServiceName will get the service's name in meta service , which is application name
......@@ -56,7 +57,7 @@ type MetadataService interface {
Version() string
}
// BaseMetadataService is used for the common logic for struct who will implement interface MetadataService
// BaseMetadataService is used for the event logic for struct who will implement interface MetadataService
type BaseMetadataService struct {
}
......
......@@ -92,7 +92,7 @@ type FacadeBasedRegistry interface {
InitListeners()
}
// BaseRegistry is a common logic abstract for registry. It implement Registry interface.
// BaseRegistry is a event logic abstract for registry. It implement Registry interface.
type BaseRegistry struct {
context context.Context
facadeBasedRegistry FacadeBasedRegistry
......
......@@ -64,7 +64,7 @@ func TestSubscribe(t *testing.T) {
// registryDirectory, mockRegistry := normalRegistryDir()
// time.Sleep(1e9)
// assert.Len(t, registryDirectory.cacheInvokers, 3)
// mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeDel, Service: *common.NewURLWithOptions(common.WithPath("TEST0"), common.WithProtocol("dubbo"))})
// mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeDel, Service: *event.NewURLWithOptions(event.WithPath("TEST0"), event.WithProtocol("dubbo"))})
// time.Sleep(1e9)
// assert.Len(t, registryDirectory.cacheInvokers, 2)
//}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"reflect"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
)
func init() {
extension.AddEventListener(&customizableServiceInstanceListener{})
}
type customizableServiceInstanceListener struct {
}
// GetPriority return priority 9999,
// 9999 is big enough to make sure it will be last invoked
func (c *customizableServiceInstanceListener) GetPriority() int {
return 9999
}
// OnEvent if the event is ServiceInstancePreRegisteredEvent
// it will iterate all ServiceInstanceCustomizer instances
// or it will do nothing
func (c *customizableServiceInstanceListener) OnEvent(e observer.Event) error {
if preRegEvent, ok := e.(*ServiceInstancePreRegisteredEvent); ok {
for _, cus := range extension.GetCustomizers() {
cus.Customize(preRegEvent.serviceInstance)
}
}
return nil
}
// GetEventType will return ServiceInstancePreRegisteredEvent
func (c *customizableServiceInstanceListener) GetEventType() reflect.Type {
return reflect.TypeOf(&ServiceInstancePreRegisteredEvent{})
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment