diff --git a/config/config_loader.go b/config/config_loader.go index d5f8c68c1bf35c40c09d7d15bae4b6b9f161e9e7..9a1c8fe9cd93c816294594ec11ae7858a29fea6a 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -21,6 +21,7 @@ import ( "fmt" "log" "os" + "strconv" "sync" "time" ) @@ -35,6 +36,7 @@ import ( "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" _ "github.com/apache/dubbo-go/common/observer/dispatcher" + "github.com/apache/dubbo-go/registry" ) var ( @@ -205,6 +207,98 @@ func loadProviderConfig() { panic(fmt.Sprintf("service %s export failed! err: %#v", key, err)) } } + registerServiceInstance() +} + +// registerServiceInstance register service instance +func registerServiceInstance(){ + url := selectMetadataServiceExportedURL() + if url == nil { + return + } + instance, err := createInstance(*url) + if err != nil { + panic(err) + } + p := extension.GetProtocol(constant.REGISTRY_KEY) + var rp registry.RegistryFactory + var ok bool + if rp, ok = p.(registry.RegistryFactory); !ok { + panic("dubbo registry protocol is invalid") + } + rs := rp.GetRegistries() + for _, r := range rs { + var sdr registry.ServiceDiscoveryFactory + if sdr, ok = r.(registry.ServiceDiscoveryFactory); !ok { + continue + } + err := sdr.GetServiceDiscovery().Register(instance) + if err != nil { + panic(err) + } + } +} + +// createInstance +func createInstance(url common.URL) (registry.ServiceInstance, error) { + appConfig := GetApplicationConfig() + port, err := strconv.ParseInt(url.Port, 10, 32) + if err != nil { + return nil, perrors.WithMessage(err, "invalid port: "+url.Port) + } + + host := url.Ip + if len(host) == 0 { + host, err = gxnet.GetLocalIP() + if err != nil { + return nil, perrors.WithMessage(err, "could not get the local Ip") + } + } + + // usually we will add more metadata + metadata := make(map[string]string, 8) + metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType + + return ®istry.DefaultServiceInstance{ + ServiceName: appConfig.Name, + Host: host, + Port: int(port), + Id: host + constant.KEY_SEPARATOR + url.Port, + Enable: true, + Healthy: true, + Metadata: metadata, + }, nil +} + +// selectMetadataServiceExportedURL get already be exported url +func selectMetadataServiceExportedURL() *common.URL { + var selectedUrl common.URL + metaDataService, err := extension.GetMetadataService(GetApplicationConfig().MetadataType) + if err != nil { + panic(err) + } + list, err := metaDataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE) + if err != nil { + panic(err) + } + if len(list) == 0 { + return nil + } + for _, urlStr := range list { + url, err := common.NewURL(urlStr.(string)) + if err != nil { + logger.Errorf("url format error {%v}", url) + continue + } + // rest first + if url.Protocol == "rest" { + selectedUrl = url + break + } else { + selectedUrl = url + } + } + return &selectedUrl } func initRouter() { diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 4c669b2cee74b95ceb3bc8287f145ccd6b99bc0b..963a3cba95a8641ebce6d8dc358bdfc70de40335 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -43,7 +43,7 @@ import ( ) var ( - regProtocol *registryProtocol + regProtocol *RegistryProtocol once sync.Once reserveParams = []string{ "application", "codec", "exchanger", "serialization", "cluster", "connections", "deprecated", "group", @@ -52,7 +52,7 @@ var ( } ) -type registryProtocol struct { +type RegistryProtocol struct { invokers []protocol.Invoker // Registry Map<RegistryAddress, Registry> registries *sync.Map @@ -74,8 +74,8 @@ func getCacheKey(url *common.URL) string { return url.CloneExceptParams(delKeys).String() } -func newRegistryProtocol() *registryProtocol { - return ®istryProtocol{ +func newRegistryProtocol() *RegistryProtocol { + return &RegistryProtocol{ registries: &sync.Map{}, bounds: &sync.Map{}, } @@ -111,14 +111,25 @@ func filterHideKey(url *common.URL) *common.URL { return url.CloneExceptParams(removeSet) } -func (proto *registryProtocol) initConfigurationListeners() { +func (proto *RegistryProtocol) initConfigurationListeners() { proto.overrideListeners = &sync.Map{} proto.serviceConfigurationListeners = &sync.Map{} proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners) } +func (proto *RegistryProtocol) GetRegistries() []registry.Registry{ + var rs []registry.Registry + proto.registries.Range(func(_, v interface{}) bool { + if r, ok := v.(registry.Registry); ok { + rs = append(rs, r) + } + return true + }) + return rs +} + // Refer provider service from registry center -func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { +func (proto *RegistryProtocol) Refer(url common.URL) protocol.Invoker { var registryUrl = url var serviceUrl = registryUrl.SubURL if registryUrl.Protocol == constant.REGISTRY_PROTOCOL { @@ -158,7 +169,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { } // Export provider service to registry center -func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { +func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { proto.once.Do(func() { proto.initConfigurationListeners() }) @@ -207,7 +218,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte } -func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) { +func (proto *RegistryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) { url := getProviderUrl(invoker) key := getCacheKey(url) if oldExporter, loaded := proto.bounds.Load(key); loaded { @@ -223,11 +234,11 @@ func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common type overrideSubscribeListener struct { url *common.URL originInvoker protocol.Invoker - protocol *registryProtocol + protocol *RegistryProtocol configurator config_center.Configurator } -func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Invoker, proto *registryProtocol) *overrideSubscribeListener { +func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Invoker, proto *RegistryProtocol) *overrideSubscribeListener { return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto} } @@ -329,7 +340,7 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL { } // Destroy registry protocol -func (proto *registryProtocol) Destroy() { +func (proto *RegistryProtocol) Destroy() { for _, ivk := range proto.invokers { ivk.Destroy() } diff --git a/registry/registry_factory.go b/registry/registry_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..58fbe395535f5ff82a4314ab93cf7ed4047737b9 --- /dev/null +++ b/registry/registry_factory.go @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +// RegistryFactory +type RegistryFactory interface { + // GetRegistries get registries + GetRegistries() []Registry +} diff --git a/registry/service_discovery_factory.go b/registry/service_discovery_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..6382403a45f8290142949546bb144e41b315f28c --- /dev/null +++ b/registry/service_discovery_factory.go @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +// ServiceDiscoveryFactory +type ServiceDiscoveryFactory interface { + // GetServiceDiscovery get service discovery + GetServiceDiscovery() ServiceDiscovery +} diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 061d832b0328a5e1754c7804bf40cf83ac216a8b..cdb586c137b16a309be0cb6f8bf6dfdb6293041d 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -28,7 +28,6 @@ import ( import ( cm "github.com/Workiva/go-datastructures/common" gxset "github.com/dubbogo/gost/container/set" - gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" "go.uber.org/atomic" ) @@ -176,18 +175,6 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error { logger.Warnf("The URL[%s] has been registry!", url.String()) } - // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap - // But we don't want to design a similar bootstrap class. - ins, err := createInstance(url) - if err != nil { - return perrors.WithMessage(err, "could not create servcie instance, please check your service url") - } - - err = s.serviceDiscovery.Register(ins) - if err != nil { - return perrors.WithMessage(err, "register the service failed") - } - err = s.metaDataService.PublishServiceDefinition(url) if err != nil { return perrors.WithMessage(err, "publish the service definition failed. ") @@ -198,36 +185,6 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error { url.Protocol) } -func createInstance(url common.URL) (registry.ServiceInstance, error) { - appConfig := config.GetApplicationConfig() - port, err := strconv.ParseInt(url.Port, 10, 32) - if err != nil { - return nil, perrors.WithMessage(err, "invalid port: "+url.Port) - } - - host := url.Ip - if len(host) == 0 { - host, err = gxnet.GetLocalIP() - if err != nil { - return nil, perrors.WithMessage(err, "could not get the local Ip") - } - } - - // usually we will add more metadata - metadata := make(map[string]string, 8) - metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType - - return ®istry.DefaultServiceInstance{ - ServiceName: appConfig.Name, - Host: host, - Port: int(port), - Id: host + constant.KEY_SEPARATOR + url.Port, - Enable: true, - Healthy: true, - Metadata: metadata, - }, nil -} - func shouldRegister(url common.URL) bool { side := url.GetParam(constant.SIDE_KEY, "") if side == constant.PROVIDER_PROTOCOL {