From ed6b59bbc74e121736d287599d99449388b791bc Mon Sep 17 00:00:00 2001 From: flycash <mingflycash@gmail.com> Date: Tue, 2 Jun 2020 11:36:07 +0800 Subject: [PATCH] Add events and eventListener --- common/constant/default.go | 4 ++ common/constant/key.go | 3 - .../extension/service_instance_customizer.go | 49 +++++++++++++++++ common/extension/service_name_mapping.go | 14 ++--- common/observer/event_listener.go | 1 + common/rpc_service_test.go | 6 +- config/application_config.go | 3 +- config/base_config.go | 5 +- config/config_loader.go | 14 ++++- config/metadata_report_config.go | 2 +- config/provider_config.go | 6 -- filter/rejected_execution_handler.go | 2 +- .../mapping/dynamic/service_name_mapping.go | 22 ++------ .../mapping/memory/service_name_mapping.go | 19 +------ metadata/service/inmemory/service.go | 6 +- metadata/service/remote/service.go | 53 +++++++++++------- metadata/service/service.go | 5 +- registry/base_registry.go | 2 +- registry/directory/directory_test.go | 2 +- .../customizable_service_instance_listener.go | 55 +++++++++++++++++++ ...vent_publishing_service_deiscovery_test.go | 2 +- .../event_publishing_service_discovery.go | 2 +- registry/event/log_event_listener.go | 48 ++++++++++++++++ registry/event/log_event_listener_test.go | 31 +++++++++++ .../protocol_ports_metadata_customizer.go | 52 ++++++++++++++++++ .../event/service_config_exported_event.go | 26 +++++++++ .../service_discovery_event.go | 2 +- .../service_instance_event.go | 2 +- .../event/service_name_mapping_listener.go | 48 ++++++++++++++++ registry/event_listener.go | 2 +- registry/nacos/service_discovery.go | 2 +- registry/registry.go | 2 +- registry/service_instance.go | 10 ++++ .../service_discovery_registry.go | 50 ++++++++++++++--- .../service_discovery_registry_test.go | 8 +-- test/integrate/dubbo/go-client/client.go | 2 +- test/integrate/dubbo/go-server/server.go | 2 +- 37 files changed, 452 insertions(+), 112 deletions(-) create mode 100644 common/extension/service_instance_customizer.go create mode 100644 registry/event/customizable_service_instance_listener.go rename registry/{common => event}/event_publishing_service_deiscovery_test.go (99%) rename registry/{common => event}/event_publishing_service_discovery.go (99%) create mode 100644 registry/event/log_event_listener.go create mode 100644 registry/event/log_event_listener_test.go create mode 100644 registry/event/protocol_ports_metadata_customizer.go create mode 100644 registry/event/service_config_exported_event.go rename registry/{common => event}/service_discovery_event.go (99%) rename registry/{common => event}/service_instance_event.go (99%) create mode 100644 registry/event/service_name_mapping_listener.go diff --git a/common/constant/default.go b/common/constant/default.go index 6b9d914c8..8442609c5 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -78,3 +78,7 @@ const ( const ( SIMPLE_METADATA_SERVICE_NAME = "MetadataService" ) + +const ( + SERVICE_DISCOVERY_DEFAULT_GROUP = "DEFAULT_GROUP" +) \ No newline at end of file diff --git a/common/constant/key.go b/common/constant/key.go index 15496628e..394f2d8c0 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -273,9 +273,6 @@ const ( METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params" METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls" - // used by URL - // SERVICE_NAME_MAPPING_KEY indicate that which service name mapping instance will be used - SERVICE_NAME_MAPPING_KEY = "name_mapping" // SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used SERVICE_DISCOVERY_KEY = "service_discovery" ) diff --git a/common/extension/service_instance_customizer.go b/common/extension/service_instance_customizer.go new file mode 100644 index 000000000..40f8ca0c5 --- /dev/null +++ b/common/extension/service_instance_customizer.go @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "sort" + + "github.com/apache/dubbo-go/registry" +) + +var ( + customizers = make([]registry.ServiceInstanceCustomizer, 0, 8) +) + +// AddCustomizers will put the customizer into slices and then sort them; +// this method will be invoked several time, so we sort them here. +func AddCustomizers(cus registry.ServiceInstanceCustomizer) { + customizers = append(customizers, cus) + sort.Stable(customizerSlice(customizers)) +} + +// GetCustomizers will return the sorted customizer +func GetCustomizers() []registry.ServiceInstanceCustomizer { + return customizers +} + +type customizerSlice []registry.ServiceInstanceCustomizer + +func (c customizerSlice) Len() int { + return len(c) +} + +func (c customizerSlice) Swap(i, j int) { c[i], c[j] = c[j], c[i] } +func (c customizerSlice) Less(i, j int) bool { return c[i].GetPriority() < c[j].GetPriority() } diff --git a/common/extension/service_name_mapping.go b/common/extension/service_name_mapping.go index 2935f49d8..0330b8a4d 100644 --- a/common/extension/service_name_mapping.go +++ b/common/extension/service_name_mapping.go @@ -20,17 +20,13 @@ package extension import "github.com/apache/dubbo-go/metadata/mapping" var ( - nameMappings = make(map[string]func() mapping.ServiceNameMapping) + globalNameMapping mapping.ServiceNameMapping ) -func SetServiceNameMapping(name string, creator func() mapping.ServiceNameMapping) { - nameMappings[name] = creator +func SetGlobalServiceNameMapping(nameMapping mapping.ServiceNameMapping) { + globalNameMapping = nameMapping } -func GetServiceNameMapping(name string) mapping.ServiceNameMapping { - creator, ok := nameMappings[name] - if !ok { - panic("Can not find the target service name mapping: " + name) - } - return creator() +func GetGlobalServiceNameMapping() mapping.ServiceNameMapping { + return globalNameMapping } diff --git a/common/observer/event_listener.go b/common/observer/event_listener.go index fabad3a6f..3f8eeffaf 100644 --- a/common/observer/event_listener.go +++ b/common/observer/event_listener.go @@ -32,6 +32,7 @@ type EventListener interface { // OnEvent handle this event OnEvent(e Event) error // GetEventType listen which event type + // return nil if the implementation want to listen any event GetEventType() reflect.Type } diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go index 2311205d0..19a1d7b03 100644 --- a/common/rpc_service_test.go +++ b/common/rpc_service_test.go @@ -137,7 +137,7 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType := suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*common.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String()) + assert.Equal(t, "func(*event.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String()) at := methodType.ArgsType() assert.Equal(t, "interface {}", at[0].String()) assert.Equal(t, "interface {}", at[1].String()) @@ -151,7 +151,7 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType = suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*common.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String()) + assert.Equal(t, "func(*event.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String()) at = methodType.ArgsType() assert.Equal(t, "interface {}", at[0].String()) assert.Equal(t, "interface {}", at[1].String()) @@ -164,7 +164,7 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType = suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*common.TestService) error", method.Type.String()) + assert.Equal(t, "func(*event.TestService) error", method.Type.String()) at = methodType.ArgsType() assert.Equal(t, 0, len(at)) assert.Nil(t, methodType.CtxType()) diff --git a/config/application_config.go b/config/application_config.go index 33b47c81d..af6637c4e 100644 --- a/config/application_config.go +++ b/config/application_config.go @@ -33,7 +33,8 @@ type ApplicationConfig struct { Version string `yaml:"version" json:"version,omitempty" property:"version"` Owner string `yaml:"owner" json:"owner,omitempty" property:"owner"` Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"` - MetadataType string `default:"local" yaml:"metadataType" json:"metadataType,omitempty" property:"metadataType"` //field for metadata report + // the metadata type. remote or local + MetadataType string `default:"local" yaml:"metadataType" json:"metadataType,omitempty" property:"metadataType"` } // Prefix ... diff --git a/config/base_config.go b/config/base_config.go index 2f51690d3..dad4d7f7e 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -40,7 +40,7 @@ type multiConfiger interface { Prefix() string } -// BaseConfig is the common configuration for provider and consumer +// BaseConfig is the event configuration for provider and consumer type BaseConfig struct { ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` Remotes map[string]*RemoteConfig `yaml:"remote" json:"remote,omitempty"` @@ -51,7 +51,7 @@ type BaseConfig struct { configCenterUrl *common.URL prefix string fatherConfig interface{} - eventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"` + EventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"` MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` fileStream *bytes.Buffer } @@ -376,7 +376,6 @@ func initializeStruct(t reflect.Type, v reflect.Value) { default: } } - } } } diff --git a/config/config_loader.go b/config/config_loader.go index a4dc62ffb..4a293e7a9 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -218,15 +218,23 @@ func Load() { // init router initRouter() + // event part + extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType) + + // start the metadata report if config set + if err := startMetadataReport(providerConfig.ApplicationConfig.MetadataType, providerConfig.MetadataReportConfig); err != nil { + logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err) + return + } + + logger.Debugf("provider config{%#v}\n", providerConfig) + // reference config loadConsumerConfig() // service config loadProviderConfig() - // common part - extension.SetAndInitGlobalDispatcher(providerConfig.eventDispatcherType) - // init the shutdown callback GracefulShutdownInit() } diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go index 66b88d5e9..9999371ab 100644 --- a/config/metadata_report_config.go +++ b/config/metadata_report_config.go @@ -93,7 +93,7 @@ func (c *MetadataReportConfig) IsValid() bool { // StartMetadataReport: The entry of metadata report start func startMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error { - if metadataReportConfig == nil || metadataReportConfig.IsValid() { + if metadataReportConfig == nil || !metadataReportConfig.IsValid() { return nil } diff --git a/config/provider_config.go b/config/provider_config.go index f0d3c4cf7..0f14c9f1d 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -28,7 +28,6 @@ import ( import ( "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/yaml" ) @@ -95,11 +94,6 @@ func ProviderInit(confProFile string) error { n.InterfaceId = k } } - // start the metadata report if config set - if err := startMetadataReport(providerConfig.ApplicationConfig.MetadataType, providerConfig.MetadataReportConfig); err != nil { - return perrors.WithMessagef(err, "Provider starts metadata report error, and the error is {%#v}", err) - } - logger.Debugf("provider config{%#v}\n", providerConfig) return nil } diff --git a/filter/rejected_execution_handler.go b/filter/rejected_execution_handler.go index d02481b98..9855bd535 100644 --- a/filter/rejected_execution_handler.go +++ b/filter/rejected_execution_handler.go @@ -26,7 +26,7 @@ import ( * RejectedExecutionHandler * If the invocation cannot pass any validation in filter, like ExecuteLimitFilter and TpsLimitFilter, * the implementation will be used. - * The common case is that sometimes you want to return the default value when the request was rejected. + * The event case is that sometimes you want to return the default value when the request was rejected. * Or you want to be warned if any request was rejected. * In such situation, implement this interface and register it by invoking extension.SetRejectedExecutionHandler. */ diff --git a/metadata/mapping/dynamic/service_name_mapping.go b/metadata/mapping/dynamic/service_name_mapping.go index 3d502b618..56ef2a0f0 100644 --- a/metadata/mapping/dynamic/service_name_mapping.go +++ b/metadata/mapping/dynamic/service_name_mapping.go @@ -18,11 +18,10 @@ package dynamic import ( - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/metadata/mapping" "strconv" - "sync" "time" + + "github.com/apache/dubbo-go/common/extension" ) import ( @@ -43,7 +42,8 @@ const ( ) func init() { - extension.SetServiceNameMapping("dynamic", GetServiceNameMappingInstance) + dc := common_cfg.GetEnvInstance().GetDynamicConfiguration() + extension.SetGlobalServiceNameMapping(&DynamicConfigurationServiceNameMapping{dc: dc}) } // DynamicConfigurationServiceNameMapping is the implementation based on config center @@ -82,17 +82,3 @@ func (d *DynamicConfigurationServiceNameMapping) buildGroup(serviceInterface str // so other params are ignored and remove, including group string, version string, protocol string return defaultGroup + slash + serviceInterface } - -var ( - serviceNameMappingInstance *DynamicConfigurationServiceNameMapping - serviceNameMappingInitOnce sync.Once -) - -// GetServiceNameMappingInstance will return an instance of DynamicConfigurationServiceNameMapping -func GetServiceNameMappingInstance() mapping.ServiceNameMapping { - serviceNameMappingInitOnce.Do(func() { - dc := common_cfg.GetEnvInstance().GetDynamicConfiguration() - serviceNameMappingInstance = &DynamicConfigurationServiceNameMapping{dc: dc} - }) - return serviceNameMappingInstance -} diff --git a/metadata/mapping/memory/service_name_mapping.go b/metadata/mapping/memory/service_name_mapping.go index 747753a14..293d97c84 100644 --- a/metadata/mapping/memory/service_name_mapping.go +++ b/metadata/mapping/memory/service_name_mapping.go @@ -17,11 +17,6 @@ package memory -import ( - "github.com/apache/dubbo-go/metadata/mapping" - "sync" -) - import ( gxset "github.com/dubbogo/gost/container/set" ) @@ -32,7 +27,7 @@ import ( ) func init() { - extension.SetServiceNameMapping("in-memory", GetInMemoryServiceNameMappingInstance) + extension.SetGlobalServiceNameMapping(&InMemoryServiceNameMapping{}) } type InMemoryServiceNameMapping struct{} @@ -44,15 +39,3 @@ func (i InMemoryServiceNameMapping) Map(serviceInterface string, group string, v func (i InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) { return gxset.NewSet(config.GetApplicationConfig().Name), nil } - -var ( - nameMappingInstance *InMemoryServiceNameMapping - nameMappingInitOnce sync.Once -) - -func GetInMemoryServiceNameMappingInstance() mapping.ServiceNameMapping { - nameMappingInitOnce.Do(func() { - nameMappingInstance = &InMemoryServiceNameMapping{} - }) - return nameMappingInstance -} diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index 4b6f4330a..31492a322 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -175,12 +175,12 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { if len(interfaceName) > 0 && !isGeneric { //judge is consumer or provider //side := url.GetParam(constant.SIDE_KEY, "") - //var service common.RPCService + //var service event.RPCService service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) - //if side == common.RoleType(common.CONSUMER).Role() { + //if side == event.RoleType(event.CONSUMER).Role() { // //TODO:generate the service definition and store it // - //} else if side == common.RoleType(common.PROVIDER).Role() { + //} else if side == event.RoleType(event.PROVIDER).Role() { // //TODO:generate the service definition and store it //} sd := definition.BuildServiceDefinition(*service, url) diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index cd8a2dc83..b616e2a4d 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -18,6 +18,8 @@ package remote import ( + "sync" + "github.com/Workiva/go-datastructures/slice/skip" "go.uber.org/atomic" ) @@ -38,6 +40,7 @@ import ( const version = "1.0.0" // MetadataService is a implement of metadata service which will delegate the remote metadata report +// This is singleton type MetadataService struct { service.BaseMetadataService inMemoryMetadataService *inmemory.MetadataService @@ -46,16 +49,26 @@ type MetadataService struct { delegateReport *delegate.MetadataReport } +var ( + metadataServiceOnce sync.Once + metadataServiceInstance *MetadataService +) + // NewMetadataService will create a new remote MetadataService instance func NewMetadataService() (*MetadataService, error) { - mr, err := delegate.NewMetadataReport() - if err != nil { - return nil, err - } - return &MetadataService{ - inMemoryMetadataService: inmemory.NewMetadataService(), - delegateReport: mr, - }, nil + var err error + metadataServiceOnce.Do(func() { + var mr *delegate.MetadataReport + mr, err = delegate.NewMetadataReport() + if err != nil { + return + } + metadataServiceInstance = &MetadataService{ + inMemoryMetadataService: inmemory.NewMetadataService(), + delegateReport: mr, + } + }) + return metadataServiceInstance, err } // setInMemoryMetadataService will replace the in memory metadata service by the specific param @@ -90,39 +103,41 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { interfaceName := url.GetParam(constant.INTERFACE_KEY, "") isGeneric := url.GetParamBool(constant.GENERIC_KEY, false) if len(interfaceName) > 0 && !isGeneric { - service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) - sd := definition.BuildServiceDefinition(*service, url) + sv := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + sd := definition.BuildServiceDefinition(*sv, url) id := &identifier.MetadataIdentifier{ BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ ServiceInterface: interfaceName, Version: url.GetParam(constant.VERSION_KEY, ""), - Group: url.GetParam(constant.GROUP_KEY, ""), + // Group: url.GetParam(constant.GROUP_KEY, constant.SERVICE_DISCOVERY_DEFAULT_GROUP), + Group: url.GetParam(constant.GROUP_KEY, "test"), }, } mts.delegateReport.StoreProviderMetadata(id, sd) + return nil } logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) return nil } // GetExportedURLs will be implemented by in memory service -func (MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) { - return nil, nil +func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) { + return mts.inMemoryMetadataService.GetExportedURLs(serviceInterface, group, version, protocol) } // GetSubscribedURLs will be implemented by in memory service -func (MetadataService) GetSubscribedURLs() (*skip.SkipList, error) { - return nil, nil +func (mts *MetadataService) GetSubscribedURLs() (*skip.SkipList, error) { + return mts.inMemoryMetadataService.GetSubscribedURLs() } // GetServiceDefinition will be implemented by in memory service -func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { - return "", nil +func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { + return mts.inMemoryMetadataService.GetServiceDefinition(interfaceName, group, version) } // GetServiceDefinitionByServiceKey will be implemented by in memory service -func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { - return "", nil +func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { + return mts.inMemoryMetadataService.GetServiceDefinitionByServiceKey(serviceKey) } // RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service diff --git a/metadata/service/service.go b/metadata/service/service.go index 13464087e..2f8a98d18 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -27,7 +27,8 @@ import ( "github.com/apache/dubbo-go/config" ) -// Metadataservice is used to define meta data related behaviors +// MetadataService is used to define meta data related behaviors +// usually the implementation should be singleton type MetadataService interface { common.RPCService // ServiceName will get the service's name in meta service , which is application name @@ -56,7 +57,7 @@ type MetadataService interface { Version() string } -// BaseMetadataService is used for the common logic for struct who will implement interface MetadataService +// BaseMetadataService is used for the event logic for struct who will implement interface MetadataService type BaseMetadataService struct { } diff --git a/registry/base_registry.go b/registry/base_registry.go index ad1a3b617..504e08709 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -92,7 +92,7 @@ type FacadeBasedRegistry interface { InitListeners() } -// BaseRegistry is a common logic abstract for registry. It implement Registry interface. +// BaseRegistry is a event logic abstract for registry. It implement Registry interface. type BaseRegistry struct { context context.Context facadeBasedRegistry FacadeBasedRegistry diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 04fae049d..f2b2f8edd 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -64,7 +64,7 @@ func TestSubscribe(t *testing.T) { // registryDirectory, mockRegistry := normalRegistryDir() // time.Sleep(1e9) // assert.Len(t, registryDirectory.cacheInvokers, 3) -// mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeDel, Service: *common.NewURLWithOptions(common.WithPath("TEST0"), common.WithProtocol("dubbo"))}) +// mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeDel, Service: *event.NewURLWithOptions(event.WithPath("TEST0"), event.WithProtocol("dubbo"))}) // time.Sleep(1e9) // assert.Len(t, registryDirectory.cacheInvokers, 2) //} diff --git a/registry/event/customizable_service_instance_listener.go b/registry/event/customizable_service_instance_listener.go new file mode 100644 index 000000000..cd4a8d2db --- /dev/null +++ b/registry/event/customizable_service_instance_listener.go @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "reflect" + + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" +) + +func init() { + extension.AddEventListener(&customizableServiceInstanceListener{}) +} + +type customizableServiceInstanceListener struct { +} + +// GetPriority return priority 9999, +// 9999 is big enough to make sure it will be last invoked +func (c *customizableServiceInstanceListener) GetPriority() int { + return 9999 +} + +// OnEvent if the event is ServiceInstancePreRegisteredEvent +// it will iterate all ServiceInstanceCustomizer instances +// or it will do nothing +func (c *customizableServiceInstanceListener) OnEvent(e observer.Event) error { + if preRegEvent, ok := e.(*ServiceInstancePreRegisteredEvent); ok { + for _, cus := range extension.GetCustomizers() { + cus.Customize(preRegEvent.serviceInstance) + } + } + return nil +} + +// GetEventType will return ServiceInstancePreRegisteredEvent +func (c *customizableServiceInstanceListener) GetEventType() reflect.Type { + return reflect.TypeOf(&ServiceInstancePreRegisteredEvent{}) +} diff --git a/registry/common/event_publishing_service_deiscovery_test.go b/registry/event/event_publishing_service_deiscovery_test.go similarity index 99% rename from registry/common/event_publishing_service_deiscovery_test.go rename to registry/event/event_publishing_service_deiscovery_test.go index 1e08335e0..27d2482d0 100644 --- a/registry/common/event_publishing_service_deiscovery_test.go +++ b/registry/event/event_publishing_service_deiscovery_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package common +package event import ( "reflect" diff --git a/registry/common/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go similarity index 99% rename from registry/common/event_publishing_service_discovery.go rename to registry/event/event_publishing_service_discovery.go index f61dd8469..76fdf7237 100644 --- a/registry/common/event_publishing_service_discovery.go +++ b/registry/event/event_publishing_service_discovery.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package common +package event import ( gxset "github.com/dubbogo/gost/container/set" diff --git a/registry/event/log_event_listener.go b/registry/event/log_event_listener.go new file mode 100644 index 000000000..282b18de7 --- /dev/null +++ b/registry/event/log_event_listener.go @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "reflect" + + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/observer" +) + +func init() { + extension.AddEventListener(&logEventListener{}) +} + +type logEventListener struct { + +} + +func (l *logEventListener) GetPriority() int { + return 0 +} + +func (l *logEventListener) OnEvent(e observer.Event) error { + logger.Info("Event happen: " + e.String()) + return nil +} + +func (l *logEventListener) GetEventType() reflect.Type { + return nil +} + diff --git a/registry/event/log_event_listener_test.go b/registry/event/log_event_listener_test.go new file mode 100644 index 000000000..e7c0ba0df --- /dev/null +++ b/registry/event/log_event_listener_test.go @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLogEventListener(t *testing.T) { + l := &logEventListener{} + assert.Equal(t, 0, l.GetPriority()) + assert.Nil(t, l.GetEventType()) + assert.Nil(t, l.OnEvent(&ServiceDiscoveryDestroyedEvent{})) +} diff --git a/registry/event/protocol_ports_metadata_customizer.go b/registry/event/protocol_ports_metadata_customizer.go new file mode 100644 index 000000000..5ae0ea91f --- /dev/null +++ b/registry/event/protocol_ports_metadata_customizer.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/metadata/service/remote" + "github.com/apache/dubbo-go/registry" +) + +// ProtocolPortsMetadataCustomizer will update +type ProtocolPortsMetadataCustomizer struct { +} + +// GetPriority will return 0, which means it will be invoked at the beginning +func (p *ProtocolPortsMetadataCustomizer) GetPriority() int { + return 0 +} + +// Customize will +func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceInstance) { + metadataService, err := remote.NewMetadataService() + if err != nil { + logger.Errorf("Could not init the MetadataService", err) + return + } + + // 4 is enough... + protocolMap := make(map[string]int, 4) + + list, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE,constant.ANY_VALUE) + if err != nil { + logger.Errorf("Could", err) + return + } +} diff --git a/registry/event/service_config_exported_event.go b/registry/event/service_config_exported_event.go new file mode 100644 index 000000000..3d3b0c8f0 --- /dev/null +++ b/registry/event/service_config_exported_event.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "github.com/apache/dubbo-go/common/observer" +) + +type ServiceConfigExportedEvent struct { + observer.BaseEvent +} diff --git a/registry/common/service_discovery_event.go b/registry/event/service_discovery_event.go similarity index 99% rename from registry/common/service_discovery_event.go rename to registry/event/service_discovery_event.go index a60ca56a3..74f6c5f19 100644 --- a/registry/common/service_discovery_event.go +++ b/registry/event/service_discovery_event.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package common +package event import ( "github.com/apache/dubbo-go/common/observer" diff --git a/registry/common/service_instance_event.go b/registry/event/service_instance_event.go similarity index 99% rename from registry/common/service_instance_event.go rename to registry/event/service_instance_event.go index f70e7ee0f..650b2e8e2 100644 --- a/registry/common/service_instance_event.go +++ b/registry/event/service_instance_event.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package common +package event import ( "github.com/apache/dubbo-go/common/observer" diff --git a/registry/event/service_name_mapping_listener.go b/registry/event/service_name_mapping_listener.go new file mode 100644 index 000000000..71abca6f8 --- /dev/null +++ b/registry/event/service_name_mapping_listener.go @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "reflect" + + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/metadata/mapping" +) + +func init() { + extension.AddEventListener(&serviceNameMappingListener{ + nameMapping: extension.GetGlobalServiceNameMapping(), + }) +} +type serviceNameMappingListener struct { + nameMapping mapping.ServiceNameMapping +} + +func (s *serviceNameMappingListener) GetPriority() int { + panic("implement me") +} + +func (s *serviceNameMappingListener) OnEvent(e observer.Event) error { + // TODO + panic("implement me") +} + +func (s *serviceNameMappingListener) GetEventType() reflect.Type { + return reflect.TypeOf(&ServiceConfigExportedEvent{}) +} diff --git a/registry/event_listener.go b/registry/event_listener.go index 1cd5ad43a..8a2bc8895 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -53,4 +53,4 @@ func (lstn *ServiceInstancesChangedListener) GetPriority() int { // get event type func (lstn *ServiceInstancesChangedListener) GetEventType() reflect.Type { return reflect.TypeOf(&ServiceInstancesChangedEvent{}) -} +} \ No newline at end of file diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index e9230195f..6b1d3c23d 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -40,7 +40,7 @@ import ( ) const ( - defaultGroup = "DEFAULT_GROUP" + defaultGroup = constant.SERVICE_DISCOVERY_DEFAULT_GROUP idKey = "id" ) diff --git a/registry/registry.go b/registry/registry.go index 74e63aa66..ce214c497 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -44,7 +44,7 @@ type Registry interface { //Will remove in dubbogo version v1.1.0 //mode1 : return Listener with Next function which can return subscribe service event from registry //Deprecated! - //subscribe(common.URL) (Listener, error) + //subscribe(event.URL) (Listener, error) //Will relace mode1 in dubbogo version v1.1.0 //mode2 : callback mode, subscribe with notify(notify listener). diff --git a/registry/service_instance.go b/registry/service_instance.go index 2cc229ee3..f9fee9f45 100644 --- a/registry/service_instance.go +++ b/registry/service_instance.go @@ -17,6 +17,10 @@ package registry +import ( + gxsort "github.com/dubbogo/gost/sort" +) + type ServiceInstance interface { // GetId will return this instance's id. It should be unique. @@ -87,3 +91,9 @@ func (d *DefaultServiceInstance) IsHealthy() bool { func (d *DefaultServiceInstance) GetMetadata() map[string]string { return d.Metadata } + +type ServiceInstanceCustomizer interface { + gxsort.Prioritizer + + Customize(instance ServiceInstance) +} diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 949010431..9525a7d41 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -26,6 +26,7 @@ import ( cm "github.com/Workiva/go-datastructures/common" gxset "github.com/dubbogo/gost/container/set" + gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" "github.com/apache/dubbo-go/common" @@ -38,7 +39,7 @@ import ( "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/metadata/service/remote" "github.com/apache/dubbo-go/registry" - registryCommon "github.com/apache/dubbo-go/registry/common" + registryCommon "github.com/apache/dubbo-go/registry/event" "github.com/apache/dubbo-go/registry/servicediscovery/proxy" "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer" "github.com/apache/dubbo-go/remoting" @@ -77,7 +78,7 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { } subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() - serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) + serviceNameMapping := extension.GetGlobalServiceNameMapping() metaDataService, err := remote.NewMetadataService() if err != nil { return nil, perrors.WithMessage(err, "could not init metadata service") @@ -160,17 +161,52 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error { return nil } ok, err := s.metaDataService.ExportURL(url) - s.metaDataService.PublishServiceDefinition(url) + if err != nil { logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error()) return err } - if ok { - logger.Infof("The URL[%s] registry successfully!", url.String()) - } else { + if !ok { logger.Warnf("The URL[%s] has been registry!", url.String()) } - return nil + + // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap + // But we don't want to design a similar bootstrap class. + ins, err := createInstance(url) + if err != nil { + return perrors.WithMessage(err, "could not create servcie instance, please check your service url") + } + return s.serviceDiscovery.Register(ins) +} + +func createInstance(url common.URL) (registry.ServiceInstance, error) { + appConfig := config.GetApplicationConfig() + port, err := strconv.ParseInt(url.Port, 10, 32) + if err != nil { + return nil, perrors.WithMessage(err, "invalid port: "+url.Port) + } + + host := url.Ip + if len(host) == 0 { + host, err = gxnet.GetLocalIP() + if err != nil { + return nil, perrors.WithMessage(err, "could not get the local Ip") + } + } + + // usually we will add more metadata + metadata := make(map[string]string, 8) + metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType + + return ®istry.DefaultServiceInstance{ + ServiceName: appConfig.Name, + Host: host, + Port: int(port), + Id: host + constant.KEY_SEPARATOR + url.Port, + Enable: true, + Healthy: true, + Metadata: metadata, + }, nil } func shouldRegister(url common.URL) bool { diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go index 90a8c0e25..4aa52f55c 100644 --- a/registry/servicediscovery/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -28,10 +28,10 @@ var ( ) func TestServiceDiscoveryRegistry_Register(t *testing.T) { - //registryURL,_:=common.NewURL("in-memory://localhost:12345", - // common.WithParamsValue("registry-type","service"), - // common.WithParamsValue("subscribed-services","a, b , c,d,e ,")) - //url,_:=common.NewURL("dubbo://192.168.0.102:20880/"+ SERVICE_INTERFACE + + //registryURL,_:=event.NewURL("in-memory://localhost:12345", + // event.WithParamsValue("registry-type","service"), + // event.WithParamsValue("subscribed-services","a, b , c,d,e ,")) + //url,_:=event.NewURL("dubbo://192.168.0.102:20880/"+ SERVICE_INTERFACE + // "?&application=" + GROUP + // "&interface=" + SERVICE_INTERFACE + // "&group=" + GROUP + diff --git a/test/integrate/dubbo/go-client/client.go b/test/integrate/dubbo/go-client/client.go index 4c62674d3..c075ec22c 100644 --- a/test/integrate/dubbo/go-client/client.go +++ b/test/integrate/dubbo/go-client/client.go @@ -25,7 +25,7 @@ import ( import ( hessian "github.com/apache/dubbo-go-hessian2" - _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" + _ "github.com/apache/dubbo-go/event/proxy/proxy_factory" "github.com/apache/dubbo-go/config" _ "github.com/apache/dubbo-go/protocol/dubbo" _ "github.com/apache/dubbo-go/registry/protocol" diff --git a/test/integrate/dubbo/go-server/server.go b/test/integrate/dubbo/go-server/server.go index 115bf0a4d..4cc6c4908 100644 --- a/test/integrate/dubbo/go-server/server.go +++ b/test/integrate/dubbo/go-server/server.go @@ -25,7 +25,7 @@ import ( hessian "github.com/apache/dubbo-go-hessian2" _ "github.com/apache/dubbo-go/cluster/cluster_impl" _ "github.com/apache/dubbo-go/cluster/loadbalance" - _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" + _ "github.com/apache/dubbo-go/event/proxy/proxy_factory" "github.com/apache/dubbo-go/config" _ "github.com/apache/dubbo-go/filter/filter_impl" _ "github.com/apache/dubbo-go/protocol/dubbo" -- GitLab