diff --git a/common/constant/default.go b/common/constant/default.go index 6b9d914c87d3d7c1018647756c59aa874e1d63f1..8442609c59192a2388e68b73a3c6a0f4ecb8d3ef 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -78,3 +78,7 @@ const ( const ( SIMPLE_METADATA_SERVICE_NAME = "MetadataService" ) + +const ( + SERVICE_DISCOVERY_DEFAULT_GROUP = "DEFAULT_GROUP" +) \ No newline at end of file diff --git a/common/constant/key.go b/common/constant/key.go index 15496628ebef49517abc2e6811822a34b94caddf..394f2d8c0426d21f6eaae79a732fbea34aec597b 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -273,9 +273,6 @@ const ( METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "url-params" METADATA_SERVICE_URLS_PROPERTY_NAME = METADATA_SERVICE_PREFIX + "urls" - // used by URL - // SERVICE_NAME_MAPPING_KEY indicate that which service name mapping instance will be used - SERVICE_NAME_MAPPING_KEY = "name_mapping" // SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used SERVICE_DISCOVERY_KEY = "service_discovery" ) diff --git a/common/extension/service_instance_customizer.go b/common/extension/service_instance_customizer.go new file mode 100644 index 0000000000000000000000000000000000000000..40f8ca0c5dd8fbf16830df8ff2eb1f88fcb7552e --- /dev/null +++ b/common/extension/service_instance_customizer.go @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "sort" + + "github.com/apache/dubbo-go/registry" +) + +var ( + customizers = make([]registry.ServiceInstanceCustomizer, 0, 8) +) + +// AddCustomizers will put the customizer into slices and then sort them; +// this method will be invoked several time, so we sort them here. +func AddCustomizers(cus registry.ServiceInstanceCustomizer) { + customizers = append(customizers, cus) + sort.Stable(customizerSlice(customizers)) +} + +// GetCustomizers will return the sorted customizer +func GetCustomizers() []registry.ServiceInstanceCustomizer { + return customizers +} + +type customizerSlice []registry.ServiceInstanceCustomizer + +func (c customizerSlice) Len() int { + return len(c) +} + +func (c customizerSlice) Swap(i, j int) { c[i], c[j] = c[j], c[i] } +func (c customizerSlice) Less(i, j int) bool { return c[i].GetPriority() < c[j].GetPriority() } diff --git a/common/extension/service_name_mapping.go b/common/extension/service_name_mapping.go index 2935f49d8122775f6ad5dc3a8a37f94f533653c4..0330b8a4d373fae917d40ac4a6b2b62e513216d3 100644 --- a/common/extension/service_name_mapping.go +++ b/common/extension/service_name_mapping.go @@ -20,17 +20,13 @@ package extension import "github.com/apache/dubbo-go/metadata/mapping" var ( - nameMappings = make(map[string]func() mapping.ServiceNameMapping) + globalNameMapping mapping.ServiceNameMapping ) -func SetServiceNameMapping(name string, creator func() mapping.ServiceNameMapping) { - nameMappings[name] = creator +func SetGlobalServiceNameMapping(nameMapping mapping.ServiceNameMapping) { + globalNameMapping = nameMapping } -func GetServiceNameMapping(name string) mapping.ServiceNameMapping { - creator, ok := nameMappings[name] - if !ok { - panic("Can not find the target service name mapping: " + name) - } - return creator() +func GetGlobalServiceNameMapping() mapping.ServiceNameMapping { + return globalNameMapping } diff --git a/common/observer/event_listener.go b/common/observer/event_listener.go index fabad3a6ffb10cb27a7cc31b1c08cffe5ed853e0..3f8eeffaf97deb86cd0bbde56012f82aacdd4746 100644 --- a/common/observer/event_listener.go +++ b/common/observer/event_listener.go @@ -32,6 +32,7 @@ type EventListener interface { // OnEvent handle this event OnEvent(e Event) error // GetEventType listen which event type + // return nil if the implementation want to listen any event GetEventType() reflect.Type } diff --git a/common/rpc_service_test.go b/common/rpc_service_test.go index 2311205d0ec0c2fd4642a4d8639c0bf871fe1d17..19a1d7b03b48d16d9d75b649d603d5d22ee7ba56 100644 --- a/common/rpc_service_test.go +++ b/common/rpc_service_test.go @@ -137,7 +137,7 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType := suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*common.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String()) + assert.Equal(t, "func(*event.TestService, context.Context, interface {}, interface {}, interface {}) error", method.Type.String()) at := methodType.ArgsType() assert.Equal(t, "interface {}", at[0].String()) assert.Equal(t, "interface {}", at[1].String()) @@ -151,7 +151,7 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType = suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*common.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String()) + assert.Equal(t, "func(*event.TestService, interface {}, interface {}, interface {}) (interface {}, error)", method.Type.String()) at = methodType.ArgsType() assert.Equal(t, "interface {}", at[0].String()) assert.Equal(t, "interface {}", at[1].String()) @@ -164,7 +164,7 @@ func TestSuiteMethod(t *testing.T) { assert.True(t, ok) methodType = suiteMethod(method) method = methodType.Method() - assert.Equal(t, "func(*common.TestService) error", method.Type.String()) + assert.Equal(t, "func(*event.TestService) error", method.Type.String()) at = methodType.ArgsType() assert.Equal(t, 0, len(at)) assert.Nil(t, methodType.CtxType()) diff --git a/config/application_config.go b/config/application_config.go index 33b47c81dd0da9959984cd1f53648167863cb713..af6637c4e08df0d7160f10881bd1fe588b95f607 100644 --- a/config/application_config.go +++ b/config/application_config.go @@ -33,7 +33,8 @@ type ApplicationConfig struct { Version string `yaml:"version" json:"version,omitempty" property:"version"` Owner string `yaml:"owner" json:"owner,omitempty" property:"owner"` Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"` - MetadataType string `default:"local" yaml:"metadataType" json:"metadataType,omitempty" property:"metadataType"` //field for metadata report + // the metadata type. remote or local + MetadataType string `default:"local" yaml:"metadataType" json:"metadataType,omitempty" property:"metadataType"` } // Prefix ... diff --git a/config/base_config.go b/config/base_config.go index 2f51690d30d847871fe547055b76309a93f88066..dad4d7f7ef9a44e125556974480731519249293f 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -40,7 +40,7 @@ type multiConfiger interface { Prefix() string } -// BaseConfig is the common configuration for provider and consumer +// BaseConfig is the event configuration for provider and consumer type BaseConfig struct { ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` Remotes map[string]*RemoteConfig `yaml:"remote" json:"remote,omitempty"` @@ -51,7 +51,7 @@ type BaseConfig struct { configCenterUrl *common.URL prefix string fatherConfig interface{} - eventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"` + EventDispatcherType string `default:"direct" yaml:"event_dispatcher_type" json:"event_dispatcher_type,omitempty"` MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` fileStream *bytes.Buffer } @@ -376,7 +376,6 @@ func initializeStruct(t reflect.Type, v reflect.Value) { default: } } - } } } diff --git a/config/config_loader.go b/config/config_loader.go index a4dc62ffbd0decbf83bc8b9f5ff9f0b17718277d..4a293e7a9f7696641fc9280ca2b0ae5452e8914c 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -218,15 +218,23 @@ func Load() { // init router initRouter() + // event part + extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType) + + // start the metadata report if config set + if err := startMetadataReport(providerConfig.ApplicationConfig.MetadataType, providerConfig.MetadataReportConfig); err != nil { + logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err) + return + } + + logger.Debugf("provider config{%#v}\n", providerConfig) + // reference config loadConsumerConfig() // service config loadProviderConfig() - // common part - extension.SetAndInitGlobalDispatcher(providerConfig.eventDispatcherType) - // init the shutdown callback GracefulShutdownInit() } diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go index 66b88d5e9acb7c7f737104c53fe3a04982b92d9b..9999371ab887fbf7d3c5857aea02b1dd4faad2da 100644 --- a/config/metadata_report_config.go +++ b/config/metadata_report_config.go @@ -93,7 +93,7 @@ func (c *MetadataReportConfig) IsValid() bool { // StartMetadataReport: The entry of metadata report start func startMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error { - if metadataReportConfig == nil || metadataReportConfig.IsValid() { + if metadataReportConfig == nil || !metadataReportConfig.IsValid() { return nil } diff --git a/config/provider_config.go b/config/provider_config.go index f0d3c4cf7f76a7d57ed0cfe4b6def2965003b868..0f14c9f1d36219c72fdd81124ab773c8bdfcb09b 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -28,7 +28,6 @@ import ( import ( "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/common/yaml" ) @@ -95,11 +94,6 @@ func ProviderInit(confProFile string) error { n.InterfaceId = k } } - // start the metadata report if config set - if err := startMetadataReport(providerConfig.ApplicationConfig.MetadataType, providerConfig.MetadataReportConfig); err != nil { - return perrors.WithMessagef(err, "Provider starts metadata report error, and the error is {%#v}", err) - } - logger.Debugf("provider config{%#v}\n", providerConfig) return nil } diff --git a/filter/rejected_execution_handler.go b/filter/rejected_execution_handler.go index d02481b98d2937ac58d277becdb1240b8a4e9b0f..9855bd5354d0582bffaae177dfe8d8ddb95ed985 100644 --- a/filter/rejected_execution_handler.go +++ b/filter/rejected_execution_handler.go @@ -26,7 +26,7 @@ import ( * RejectedExecutionHandler * If the invocation cannot pass any validation in filter, like ExecuteLimitFilter and TpsLimitFilter, * the implementation will be used. - * The common case is that sometimes you want to return the default value when the request was rejected. + * The event case is that sometimes you want to return the default value when the request was rejected. * Or you want to be warned if any request was rejected. * In such situation, implement this interface and register it by invoking extension.SetRejectedExecutionHandler. */ diff --git a/metadata/mapping/dynamic/service_name_mapping.go b/metadata/mapping/dynamic/service_name_mapping.go index 3d502b618481638e90cca10e437b1a0f325eb791..56ef2a0f0d30ebb8f68a480d7631fbe2068e30bc 100644 --- a/metadata/mapping/dynamic/service_name_mapping.go +++ b/metadata/mapping/dynamic/service_name_mapping.go @@ -18,11 +18,10 @@ package dynamic import ( - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/metadata/mapping" "strconv" - "sync" "time" + + "github.com/apache/dubbo-go/common/extension" ) import ( @@ -43,7 +42,8 @@ const ( ) func init() { - extension.SetServiceNameMapping("dynamic", GetServiceNameMappingInstance) + dc := common_cfg.GetEnvInstance().GetDynamicConfiguration() + extension.SetGlobalServiceNameMapping(&DynamicConfigurationServiceNameMapping{dc: dc}) } // DynamicConfigurationServiceNameMapping is the implementation based on config center @@ -82,17 +82,3 @@ func (d *DynamicConfigurationServiceNameMapping) buildGroup(serviceInterface str // so other params are ignored and remove, including group string, version string, protocol string return defaultGroup + slash + serviceInterface } - -var ( - serviceNameMappingInstance *DynamicConfigurationServiceNameMapping - serviceNameMappingInitOnce sync.Once -) - -// GetServiceNameMappingInstance will return an instance of DynamicConfigurationServiceNameMapping -func GetServiceNameMappingInstance() mapping.ServiceNameMapping { - serviceNameMappingInitOnce.Do(func() { - dc := common_cfg.GetEnvInstance().GetDynamicConfiguration() - serviceNameMappingInstance = &DynamicConfigurationServiceNameMapping{dc: dc} - }) - return serviceNameMappingInstance -} diff --git a/metadata/mapping/memory/service_name_mapping.go b/metadata/mapping/memory/service_name_mapping.go index 747753a1409942a13cf89192d9177b9b7e28d6bb..293d97c842820063a8c606773e9d22dfa570b8ea 100644 --- a/metadata/mapping/memory/service_name_mapping.go +++ b/metadata/mapping/memory/service_name_mapping.go @@ -17,11 +17,6 @@ package memory -import ( - "github.com/apache/dubbo-go/metadata/mapping" - "sync" -) - import ( gxset "github.com/dubbogo/gost/container/set" ) @@ -32,7 +27,7 @@ import ( ) func init() { - extension.SetServiceNameMapping("in-memory", GetInMemoryServiceNameMappingInstance) + extension.SetGlobalServiceNameMapping(&InMemoryServiceNameMapping{}) } type InMemoryServiceNameMapping struct{} @@ -44,15 +39,3 @@ func (i InMemoryServiceNameMapping) Map(serviceInterface string, group string, v func (i InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) { return gxset.NewSet(config.GetApplicationConfig().Name), nil } - -var ( - nameMappingInstance *InMemoryServiceNameMapping - nameMappingInitOnce sync.Once -) - -func GetInMemoryServiceNameMappingInstance() mapping.ServiceNameMapping { - nameMappingInitOnce.Do(func() { - nameMappingInstance = &InMemoryServiceNameMapping{} - }) - return nameMappingInstance -} diff --git a/metadata/service/inmemory/service.go b/metadata/service/inmemory/service.go index 4b6f4330a1b88ecaad094df052bb7b53a96c6413..31492a32249ea61f7832af5e267ed0e9569bb091 100644 --- a/metadata/service/inmemory/service.go +++ b/metadata/service/inmemory/service.go @@ -175,12 +175,12 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { if len(interfaceName) > 0 && !isGeneric { //judge is consumer or provider //side := url.GetParam(constant.SIDE_KEY, "") - //var service common.RPCService + //var service event.RPCService service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) - //if side == common.RoleType(common.CONSUMER).Role() { + //if side == event.RoleType(event.CONSUMER).Role() { // //TODO:generate the service definition and store it // - //} else if side == common.RoleType(common.PROVIDER).Role() { + //} else if side == event.RoleType(event.PROVIDER).Role() { // //TODO:generate the service definition and store it //} sd := definition.BuildServiceDefinition(*service, url) diff --git a/metadata/service/remote/service.go b/metadata/service/remote/service.go index cd8a2dc837caaef341d34a720070ce4e4e6f6bcd..b616e2a4d45910a7bcc9f76b854727bd97ad87a4 100644 --- a/metadata/service/remote/service.go +++ b/metadata/service/remote/service.go @@ -18,6 +18,8 @@ package remote import ( + "sync" + "github.com/Workiva/go-datastructures/slice/skip" "go.uber.org/atomic" ) @@ -38,6 +40,7 @@ import ( const version = "1.0.0" // MetadataService is a implement of metadata service which will delegate the remote metadata report +// This is singleton type MetadataService struct { service.BaseMetadataService inMemoryMetadataService *inmemory.MetadataService @@ -46,16 +49,26 @@ type MetadataService struct { delegateReport *delegate.MetadataReport } +var ( + metadataServiceOnce sync.Once + metadataServiceInstance *MetadataService +) + // NewMetadataService will create a new remote MetadataService instance func NewMetadataService() (*MetadataService, error) { - mr, err := delegate.NewMetadataReport() - if err != nil { - return nil, err - } - return &MetadataService{ - inMemoryMetadataService: inmemory.NewMetadataService(), - delegateReport: mr, - }, nil + var err error + metadataServiceOnce.Do(func() { + var mr *delegate.MetadataReport + mr, err = delegate.NewMetadataReport() + if err != nil { + return + } + metadataServiceInstance = &MetadataService{ + inMemoryMetadataService: inmemory.NewMetadataService(), + delegateReport: mr, + } + }) + return metadataServiceInstance, err } // setInMemoryMetadataService will replace the in memory metadata service by the specific param @@ -90,39 +103,41 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error { interfaceName := url.GetParam(constant.INTERFACE_KEY, "") isGeneric := url.GetParamBool(constant.GENERIC_KEY, false) if len(interfaceName) > 0 && !isGeneric { - service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) - sd := definition.BuildServiceDefinition(*service, url) + sv := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service())) + sd := definition.BuildServiceDefinition(*sv, url) id := &identifier.MetadataIdentifier{ BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ ServiceInterface: interfaceName, Version: url.GetParam(constant.VERSION_KEY, ""), - Group: url.GetParam(constant.GROUP_KEY, ""), + // Group: url.GetParam(constant.GROUP_KEY, constant.SERVICE_DISCOVERY_DEFAULT_GROUP), + Group: url.GetParam(constant.GROUP_KEY, "test"), }, } mts.delegateReport.StoreProviderMetadata(id, sd) + return nil } logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url) return nil } // GetExportedURLs will be implemented by in memory service -func (MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) { - return nil, nil +func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) { + return mts.inMemoryMetadataService.GetExportedURLs(serviceInterface, group, version, protocol) } // GetSubscribedURLs will be implemented by in memory service -func (MetadataService) GetSubscribedURLs() (*skip.SkipList, error) { - return nil, nil +func (mts *MetadataService) GetSubscribedURLs() (*skip.SkipList, error) { + return mts.inMemoryMetadataService.GetSubscribedURLs() } // GetServiceDefinition will be implemented by in memory service -func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { - return "", nil +func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { + return mts.inMemoryMetadataService.GetServiceDefinition(interfaceName, group, version) } // GetServiceDefinitionByServiceKey will be implemented by in memory service -func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { - return "", nil +func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { + return mts.inMemoryMetadataService.GetServiceDefinitionByServiceKey(serviceKey) } // RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service diff --git a/metadata/service/service.go b/metadata/service/service.go index 13464087ed83a79064410e5b37cde79e09317c38..2f8a98d18536a7f93d79fa552325b19e1ed80508 100644 --- a/metadata/service/service.go +++ b/metadata/service/service.go @@ -27,7 +27,8 @@ import ( "github.com/apache/dubbo-go/config" ) -// Metadataservice is used to define meta data related behaviors +// MetadataService is used to define meta data related behaviors +// usually the implementation should be singleton type MetadataService interface { common.RPCService // ServiceName will get the service's name in meta service , which is application name @@ -56,7 +57,7 @@ type MetadataService interface { Version() string } -// BaseMetadataService is used for the common logic for struct who will implement interface MetadataService +// BaseMetadataService is used for the event logic for struct who will implement interface MetadataService type BaseMetadataService struct { } diff --git a/registry/base_registry.go b/registry/base_registry.go index ad1a3b61741e003625612ad58409eb8615271a84..504e087091ae30264676c90feb49d5690b85c164 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -92,7 +92,7 @@ type FacadeBasedRegistry interface { InitListeners() } -// BaseRegistry is a common logic abstract for registry. It implement Registry interface. +// BaseRegistry is a event logic abstract for registry. It implement Registry interface. type BaseRegistry struct { context context.Context facadeBasedRegistry FacadeBasedRegistry diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 04fae049d49bc2970668dc0ae8360f48da9d49dc..f2b2f8edd2d46950d2e74733b1d869e0de282ec0 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -64,7 +64,7 @@ func TestSubscribe(t *testing.T) { // registryDirectory, mockRegistry := normalRegistryDir() // time.Sleep(1e9) // assert.Len(t, registryDirectory.cacheInvokers, 3) -// mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeDel, Service: *common.NewURLWithOptions(common.WithPath("TEST0"), common.WithProtocol("dubbo"))}) +// mockRegistry.MockEvent(®istry.ServiceEvent{Action: remoting.EventTypeDel, Service: *event.NewURLWithOptions(event.WithPath("TEST0"), event.WithProtocol("dubbo"))}) // time.Sleep(1e9) // assert.Len(t, registryDirectory.cacheInvokers, 2) //} diff --git a/registry/event/customizable_service_instance_listener.go b/registry/event/customizable_service_instance_listener.go new file mode 100644 index 0000000000000000000000000000000000000000..cd4a8d2db644e3be0cc112be661013083dfddf28 --- /dev/null +++ b/registry/event/customizable_service_instance_listener.go @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "reflect" + + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" +) + +func init() { + extension.AddEventListener(&customizableServiceInstanceListener{}) +} + +type customizableServiceInstanceListener struct { +} + +// GetPriority return priority 9999, +// 9999 is big enough to make sure it will be last invoked +func (c *customizableServiceInstanceListener) GetPriority() int { + return 9999 +} + +// OnEvent if the event is ServiceInstancePreRegisteredEvent +// it will iterate all ServiceInstanceCustomizer instances +// or it will do nothing +func (c *customizableServiceInstanceListener) OnEvent(e observer.Event) error { + if preRegEvent, ok := e.(*ServiceInstancePreRegisteredEvent); ok { + for _, cus := range extension.GetCustomizers() { + cus.Customize(preRegEvent.serviceInstance) + } + } + return nil +} + +// GetEventType will return ServiceInstancePreRegisteredEvent +func (c *customizableServiceInstanceListener) GetEventType() reflect.Type { + return reflect.TypeOf(&ServiceInstancePreRegisteredEvent{}) +} diff --git a/registry/common/event_publishing_service_deiscovery_test.go b/registry/event/event_publishing_service_deiscovery_test.go similarity index 99% rename from registry/common/event_publishing_service_deiscovery_test.go rename to registry/event/event_publishing_service_deiscovery_test.go index 1e08335e04232c9b5944e34133e4e979bee9ee74..27d2482d06c6e5339e4ba7f93f54ca8d9378edb3 100644 --- a/registry/common/event_publishing_service_deiscovery_test.go +++ b/registry/event/event_publishing_service_deiscovery_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package common +package event import ( "reflect" diff --git a/registry/common/event_publishing_service_discovery.go b/registry/event/event_publishing_service_discovery.go similarity index 99% rename from registry/common/event_publishing_service_discovery.go rename to registry/event/event_publishing_service_discovery.go index f61dd84690f4878eaaf7fb29890d6dab2210ef8f..76fdf7237a9b566a5d6add3b1e4dae757793098c 100644 --- a/registry/common/event_publishing_service_discovery.go +++ b/registry/event/event_publishing_service_discovery.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package common +package event import ( gxset "github.com/dubbogo/gost/container/set" diff --git a/registry/event/log_event_listener.go b/registry/event/log_event_listener.go new file mode 100644 index 0000000000000000000000000000000000000000..282b18de79be344ba079364af446e3f26f83ce0b --- /dev/null +++ b/registry/event/log_event_listener.go @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "reflect" + + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/common/observer" +) + +func init() { + extension.AddEventListener(&logEventListener{}) +} + +type logEventListener struct { + +} + +func (l *logEventListener) GetPriority() int { + return 0 +} + +func (l *logEventListener) OnEvent(e observer.Event) error { + logger.Info("Event happen: " + e.String()) + return nil +} + +func (l *logEventListener) GetEventType() reflect.Type { + return nil +} + diff --git a/registry/event/log_event_listener_test.go b/registry/event/log_event_listener_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e7c0ba0df56f637cafa9568913d48dc6e562d5a5 --- /dev/null +++ b/registry/event/log_event_listener_test.go @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLogEventListener(t *testing.T) { + l := &logEventListener{} + assert.Equal(t, 0, l.GetPriority()) + assert.Nil(t, l.GetEventType()) + assert.Nil(t, l.OnEvent(&ServiceDiscoveryDestroyedEvent{})) +} diff --git a/registry/event/protocol_ports_metadata_customizer.go b/registry/event/protocol_ports_metadata_customizer.go new file mode 100644 index 0000000000000000000000000000000000000000..5ae0ea91f650978bf172e1cd4a118f277d212744 --- /dev/null +++ b/registry/event/protocol_ports_metadata_customizer.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/metadata/service/remote" + "github.com/apache/dubbo-go/registry" +) + +// ProtocolPortsMetadataCustomizer will update +type ProtocolPortsMetadataCustomizer struct { +} + +// GetPriority will return 0, which means it will be invoked at the beginning +func (p *ProtocolPortsMetadataCustomizer) GetPriority() int { + return 0 +} + +// Customize will +func (p *ProtocolPortsMetadataCustomizer) Customize(instance registry.ServiceInstance) { + metadataService, err := remote.NewMetadataService() + if err != nil { + logger.Errorf("Could not init the MetadataService", err) + return + } + + // 4 is enough... + protocolMap := make(map[string]int, 4) + + list, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE,constant.ANY_VALUE) + if err != nil { + logger.Errorf("Could", err) + return + } +} diff --git a/registry/event/service_config_exported_event.go b/registry/event/service_config_exported_event.go new file mode 100644 index 0000000000000000000000000000000000000000..3d3b0c8f0e9fb662028a194a148cff6c3405d5cb --- /dev/null +++ b/registry/event/service_config_exported_event.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "github.com/apache/dubbo-go/common/observer" +) + +type ServiceConfigExportedEvent struct { + observer.BaseEvent +} diff --git a/registry/common/service_discovery_event.go b/registry/event/service_discovery_event.go similarity index 99% rename from registry/common/service_discovery_event.go rename to registry/event/service_discovery_event.go index a60ca56a39016738daf6e992b286f2b04c2fdec8..74f6c5f19dd4b4cfb5ceaae4010df1d49b03aa41 100644 --- a/registry/common/service_discovery_event.go +++ b/registry/event/service_discovery_event.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package common +package event import ( "github.com/apache/dubbo-go/common/observer" diff --git a/registry/common/service_instance_event.go b/registry/event/service_instance_event.go similarity index 99% rename from registry/common/service_instance_event.go rename to registry/event/service_instance_event.go index f70e7ee0ff12bf41c761f6bf7c239228df046980..650b2e8e29e23498a49f11b58aa53b018ca42e67 100644 --- a/registry/common/service_instance_event.go +++ b/registry/event/service_instance_event.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package common +package event import ( "github.com/apache/dubbo-go/common/observer" diff --git a/registry/event/service_name_mapping_listener.go b/registry/event/service_name_mapping_listener.go new file mode 100644 index 0000000000000000000000000000000000000000..71abca6f870e3e5a635153dc7460424ec9d32d6d --- /dev/null +++ b/registry/event/service_name_mapping_listener.go @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package event + +import ( + "reflect" + + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/observer" + "github.com/apache/dubbo-go/metadata/mapping" +) + +func init() { + extension.AddEventListener(&serviceNameMappingListener{ + nameMapping: extension.GetGlobalServiceNameMapping(), + }) +} +type serviceNameMappingListener struct { + nameMapping mapping.ServiceNameMapping +} + +func (s *serviceNameMappingListener) GetPriority() int { + panic("implement me") +} + +func (s *serviceNameMappingListener) OnEvent(e observer.Event) error { + // TODO + panic("implement me") +} + +func (s *serviceNameMappingListener) GetEventType() reflect.Type { + return reflect.TypeOf(&ServiceConfigExportedEvent{}) +} diff --git a/registry/event_listener.go b/registry/event_listener.go index 1cd5ad43a66acc70c6a7938f8d6532346fd6410d..8a2bc889566356d1619526c6bb0739959ad2e515 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -53,4 +53,4 @@ func (lstn *ServiceInstancesChangedListener) GetPriority() int { // get event type func (lstn *ServiceInstancesChangedListener) GetEventType() reflect.Type { return reflect.TypeOf(&ServiceInstancesChangedEvent{}) -} +} \ No newline at end of file diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index e9230195f6fe4191064c7aa308db1494d8635eec..6b1d3c23d171e115a317ce4d367157680f7369c0 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -40,7 +40,7 @@ import ( ) const ( - defaultGroup = "DEFAULT_GROUP" + defaultGroup = constant.SERVICE_DISCOVERY_DEFAULT_GROUP idKey = "id" ) diff --git a/registry/registry.go b/registry/registry.go index 74e63aa66ebdc674261ce4109b27a067ce769007..ce214c4971b0b00b5e300542ad9172331a0b3d02 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -44,7 +44,7 @@ type Registry interface { //Will remove in dubbogo version v1.1.0 //mode1 : return Listener with Next function which can return subscribe service event from registry //Deprecated! - //subscribe(common.URL) (Listener, error) + //subscribe(event.URL) (Listener, error) //Will relace mode1 in dubbogo version v1.1.0 //mode2 : callback mode, subscribe with notify(notify listener). diff --git a/registry/service_instance.go b/registry/service_instance.go index 2cc229ee3b056da2d9f1a1b70d3e0f5858c9da5f..f9fee9f452a48aaefb7557766329b204c8cfb713 100644 --- a/registry/service_instance.go +++ b/registry/service_instance.go @@ -17,6 +17,10 @@ package registry +import ( + gxsort "github.com/dubbogo/gost/sort" +) + type ServiceInstance interface { // GetId will return this instance's id. It should be unique. @@ -87,3 +91,9 @@ func (d *DefaultServiceInstance) IsHealthy() bool { func (d *DefaultServiceInstance) GetMetadata() map[string]string { return d.Metadata } + +type ServiceInstanceCustomizer interface { + gxsort.Prioritizer + + Customize(instance ServiceInstance) +} diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 949010431f7f08cb5827fd45b799292848f6d9c6..9525a7d4191e92868a9ea4a2f6c5f2f62c27d5a8 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -26,6 +26,7 @@ import ( cm "github.com/Workiva/go-datastructures/common" gxset "github.com/dubbogo/gost/container/set" + gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" "github.com/apache/dubbo-go/common" @@ -38,7 +39,7 @@ import ( "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/metadata/service/remote" "github.com/apache/dubbo-go/registry" - registryCommon "github.com/apache/dubbo-go/registry/common" + registryCommon "github.com/apache/dubbo-go/registry/event" "github.com/apache/dubbo-go/registry/servicediscovery/proxy" "github.com/apache/dubbo-go/registry/servicediscovery/synthesizer" "github.com/apache/dubbo-go/remoting" @@ -77,7 +78,7 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { } subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")) subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer() - serviceNameMapping := extension.GetServiceNameMapping(url.GetParam(constant.SERVICE_NAME_MAPPING_KEY, "")) + serviceNameMapping := extension.GetGlobalServiceNameMapping() metaDataService, err := remote.NewMetadataService() if err != nil { return nil, perrors.WithMessage(err, "could not init metadata service") @@ -160,17 +161,52 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error { return nil } ok, err := s.metaDataService.ExportURL(url) - s.metaDataService.PublishServiceDefinition(url) + if err != nil { logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error()) return err } - if ok { - logger.Infof("The URL[%s] registry successfully!", url.String()) - } else { + if !ok { logger.Warnf("The URL[%s] has been registry!", url.String()) } - return nil + + // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap + // But we don't want to design a similar bootstrap class. + ins, err := createInstance(url) + if err != nil { + return perrors.WithMessage(err, "could not create servcie instance, please check your service url") + } + return s.serviceDiscovery.Register(ins) +} + +func createInstance(url common.URL) (registry.ServiceInstance, error) { + appConfig := config.GetApplicationConfig() + port, err := strconv.ParseInt(url.Port, 10, 32) + if err != nil { + return nil, perrors.WithMessage(err, "invalid port: "+url.Port) + } + + host := url.Ip + if len(host) == 0 { + host, err = gxnet.GetLocalIP() + if err != nil { + return nil, perrors.WithMessage(err, "could not get the local Ip") + } + } + + // usually we will add more metadata + metadata := make(map[string]string, 8) + metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType + + return ®istry.DefaultServiceInstance{ + ServiceName: appConfig.Name, + Host: host, + Port: int(port), + Id: host + constant.KEY_SEPARATOR + url.Port, + Enable: true, + Healthy: true, + Metadata: metadata, + }, nil } func shouldRegister(url common.URL) bool { diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go index 90a8c0e25709e4bf9b624fc7ccff2f3a9e4169f4..4aa52f55cab068919209028f1b504ce0c4ae22e2 100644 --- a/registry/servicediscovery/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -28,10 +28,10 @@ var ( ) func TestServiceDiscoveryRegistry_Register(t *testing.T) { - //registryURL,_:=common.NewURL("in-memory://localhost:12345", - // common.WithParamsValue("registry-type","service"), - // common.WithParamsValue("subscribed-services","a, b , c,d,e ,")) - //url,_:=common.NewURL("dubbo://192.168.0.102:20880/"+ SERVICE_INTERFACE + + //registryURL,_:=event.NewURL("in-memory://localhost:12345", + // event.WithParamsValue("registry-type","service"), + // event.WithParamsValue("subscribed-services","a, b , c,d,e ,")) + //url,_:=event.NewURL("dubbo://192.168.0.102:20880/"+ SERVICE_INTERFACE + // "?&application=" + GROUP + // "&interface=" + SERVICE_INTERFACE + // "&group=" + GROUP + diff --git a/test/integrate/dubbo/go-client/client.go b/test/integrate/dubbo/go-client/client.go index 4c62674d33dba7caca72ca7552e73c4c0fdf14c9..c075ec22c3991aaea1b24ec4f59b3ab7e58520b4 100644 --- a/test/integrate/dubbo/go-client/client.go +++ b/test/integrate/dubbo/go-client/client.go @@ -25,7 +25,7 @@ import ( import ( hessian "github.com/apache/dubbo-go-hessian2" - _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" + _ "github.com/apache/dubbo-go/event/proxy/proxy_factory" "github.com/apache/dubbo-go/config" _ "github.com/apache/dubbo-go/protocol/dubbo" _ "github.com/apache/dubbo-go/registry/protocol" diff --git a/test/integrate/dubbo/go-server/server.go b/test/integrate/dubbo/go-server/server.go index 115bf0a4d78f171eb7f786808def91879ed93947..4cc6c490835d7ba29d139d71892b5e6e19d628e5 100644 --- a/test/integrate/dubbo/go-server/server.go +++ b/test/integrate/dubbo/go-server/server.go @@ -25,7 +25,7 @@ import ( hessian "github.com/apache/dubbo-go-hessian2" _ "github.com/apache/dubbo-go/cluster/cluster_impl" _ "github.com/apache/dubbo-go/cluster/loadbalance" - _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" + _ "github.com/apache/dubbo-go/event/proxy/proxy_factory" "github.com/apache/dubbo-go/config" _ "github.com/apache/dubbo-go/filter/filter_impl" _ "github.com/apache/dubbo-go/protocol/dubbo"