diff --git a/config/config_loader.go b/config/config_loader.go
index 8b196305b9cf7d33cb149def149af178f0e2808c..befa01b0a670ecaf24e1fcfa24e117b04af2ca7c 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -21,6 +21,7 @@ import (
"fmt"
"log"
"os"
+ "strconv"
"sync"
"time"
)
@@ -35,6 +36,7 @@ import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
_ "github.com/apache/dubbo-go/common/observer/dispatcher"
+ "github.com/apache/dubbo-go/registry"
)
var (
@@ -206,6 +208,98 @@ func loadProviderConfig() {
panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
}
}
+ registerServiceInstance()
+}
+
+// registerServiceInstance register service instance
+func registerServiceInstance(){
+ url := selectMetadataServiceExportedURL()
+ if url == nil {
+ return
+ }
+ instance, err := createInstance(*url)
+ if err != nil {
+ panic(err)
+ }
+ p := extension.GetProtocol(constant.REGISTRY_KEY)
+ var rp registry.RegistryFactory
+ var ok bool
+ if rp, ok = p.(registry.RegistryFactory); !ok {
+ panic("dubbo registry protocol is invalid")
+ }
+ rs := rp.GetRegistries()
+ for _, r := range rs {
+ var sdr registry.ServiceDiscoveryFactory
+ if sdr, ok = r.(registry.ServiceDiscoveryFactory); !ok {
+ continue
+ }
+ err := sdr.GetServiceDiscovery().Register(instance)
+ if err != nil {
+ panic(err)
+ }
+ }
+}
+
+// createInstance
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+ appConfig := GetApplicationConfig()
+ port, err := strconv.ParseInt(url.Port, 10, 32)
+ if err != nil {
+ return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+ }
+
+ host := url.Ip
+ if len(host) == 0 {
+ host, err = gxnet.GetLocalIP()
+ if err != nil {
+ return nil, perrors.WithMessage(err, "could not get the local Ip")
+ }
+ }
+
+ // usually we will add more metadata
+ metadata := make(map[string]string, 8)
+ metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+ return ®istry.DefaultServiceInstance{
+ ServiceName: appConfig.Name,
+ Host: host,
+ Port: int(port),
+ Id: host + constant.KEY_SEPARATOR + url.Port,
+ Enable: true,
+ Healthy: true,
+ Metadata: metadata,
+ }, nil
+}
+
+// selectMetadataServiceExportedURL get already be exported url
+func selectMetadataServiceExportedURL() *common.URL {
+ var selectedUrl common.URL
+ metaDataService, err := extension.GetMetadataService(GetApplicationConfig().MetadataType)
+ if err != nil {
+ panic(err)
+ }
+ list, err := metaDataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
+ if err != nil {
+ panic(err)
+ }
+ if len(list) == 0 {
+ return nil
+ }
+ for _, urlStr := range list {
+ url, err := common.NewURL(urlStr.(string))
+ if err != nil {
+ logger.Errorf("url format error {%v}", url)
+ continue
+ }
+ // rest first
+ if url.Protocol == "rest" {
+ selectedUrl = url
+ break
+ } else {
+ selectedUrl = url
+ }
+ }
+ return &selectedUrl
}
func initRouter() {
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index 4c669b2cee74b95ceb3bc8287f145ccd6b99bc0b..963a3cba95a8641ebce6d8dc358bdfc70de40335 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -43,7 +43,7 @@ import (
)
var (
- regProtocol *registryProtocol
+ regProtocol *RegistryProtocol
once sync.Once
reserveParams = []string{
"application", "codec", "exchanger", "serialization", "cluster", "connections", "deprecated", "group",
@@ -52,7 +52,7 @@ var (
}
)
-type registryProtocol struct {
+type RegistryProtocol struct {
invokers []protocol.Invoker
// Registry Map<RegistryAddress, Registry>
registries *sync.Map
@@ -74,8 +74,8 @@ func getCacheKey(url *common.URL) string {
return url.CloneExceptParams(delKeys).String()
}
-func newRegistryProtocol() *registryProtocol {
- return ®istryProtocol{
+func newRegistryProtocol() *RegistryProtocol {
+ return &RegistryProtocol{
registries: &sync.Map{},
bounds: &sync.Map{},
}
@@ -111,14 +111,25 @@ func filterHideKey(url *common.URL) *common.URL {
return url.CloneExceptParams(removeSet)
}
-func (proto *registryProtocol) initConfigurationListeners() {
+func (proto *RegistryProtocol) initConfigurationListeners() {
proto.overrideListeners = &sync.Map{}
proto.serviceConfigurationListeners = &sync.Map{}
proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners)
}
+func (proto *RegistryProtocol) GetRegistries() []registry.Registry{
+ var rs []registry.Registry
+ proto.registries.Range(func(_, v interface{}) bool {
+ if r, ok := v.(registry.Registry); ok {
+ rs = append(rs, r)
+ }
+ return true
+ })
+ return rs
+}
+
// Refer provider service from registry center
-func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
+func (proto *RegistryProtocol) Refer(url common.URL) protocol.Invoker {
var registryUrl = url
var serviceUrl = registryUrl.SubURL
if registryUrl.Protocol == constant.REGISTRY_PROTOCOL {
@@ -158,7 +169,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
}
// Export provider service to registry center
-func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
+func (proto *RegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
proto.once.Do(func() {
proto.initConfigurationListeners()
})
@@ -207,7 +218,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
}
-func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) {
+func (proto *RegistryProtocol) reExport(invoker protocol.Invoker, newUrl *common.URL) {
url := getProviderUrl(invoker)
key := getCacheKey(url)
if oldExporter, loaded := proto.bounds.Load(key); loaded {
@@ -223,11 +234,11 @@ func (proto *registryProtocol) reExport(invoker protocol.Invoker, newUrl *common
type overrideSubscribeListener struct {
url *common.URL
originInvoker protocol.Invoker
- protocol *registryProtocol
+ protocol *RegistryProtocol
configurator config_center.Configurator
}
-func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Invoker, proto *registryProtocol) *overrideSubscribeListener {
+func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Invoker, proto *RegistryProtocol) *overrideSubscribeListener {
return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto}
}
@@ -329,7 +340,7 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL {
}
// Destroy registry protocol
-func (proto *registryProtocol) Destroy() {
+func (proto *RegistryProtocol) Destroy() {
for _, ivk := range proto.invokers {
ivk.Destroy()
}
diff --git a/registry/registry_factory.go b/registry/registry_factory.go
new file mode 100644
index 0000000000000000000000000000000000000000..58fbe395535f5ff82a4314ab93cf7ed4047737b9
--- /dev/null
+++ b/registry/registry_factory.go
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+// RegistryFactory
+type RegistryFactory interface {
+ // GetRegistries get registries
+ GetRegistries() []Registry
+}
diff --git a/registry/service_discovery_factory.go b/registry/service_discovery_factory.go
new file mode 100644
index 0000000000000000000000000000000000000000..6382403a45f8290142949546bb144e41b315f28c
--- /dev/null
+++ b/registry/service_discovery_factory.go
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+// ServiceDiscoveryFactory
+type ServiceDiscoveryFactory interface {
+ // GetServiceDiscovery get service discovery
+ GetServiceDiscovery() ServiceDiscovery
+}
diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go
index 061d832b0328a5e1754c7804bf40cf83ac216a8b..cdb586c137b16a309be0cb6f8bf6dfdb6293041d 100644
--- a/registry/servicediscovery/service_discovery_registry.go
+++ b/registry/servicediscovery/service_discovery_registry.go
@@ -28,7 +28,6 @@ import (
import (
cm "github.com/Workiva/go-datastructures/common"
gxset "github.com/dubbogo/gost/container/set"
- gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
@@ -176,18 +175,6 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
logger.Warnf("The URL[%s] has been registry!", url.String())
}
- // we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
- // But we don't want to design a similar bootstrap class.
- ins, err := createInstance(url)
- if err != nil {
- return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
- }
-
- err = s.serviceDiscovery.Register(ins)
- if err != nil {
- return perrors.WithMessage(err, "register the service failed")
- }
-
err = s.metaDataService.PublishServiceDefinition(url)
if err != nil {
return perrors.WithMessage(err, "publish the service definition failed. ")
@@ -198,36 +185,6 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
url.Protocol)
}
-func createInstance(url common.URL) (registry.ServiceInstance, error) {
- appConfig := config.GetApplicationConfig()
- port, err := strconv.ParseInt(url.Port, 10, 32)
- if err != nil {
- return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
- }
-
- host := url.Ip
- if len(host) == 0 {
- host, err = gxnet.GetLocalIP()
- if err != nil {
- return nil, perrors.WithMessage(err, "could not get the local Ip")
- }
- }
-
- // usually we will add more metadata
- metadata := make(map[string]string, 8)
- metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
-
- return ®istry.DefaultServiceInstance{
- ServiceName: appConfig.Name,
- Host: host,
- Port: int(port),
- Id: host + constant.KEY_SEPARATOR + url.Port,
- Enable: true,
- Healthy: true,
- Metadata: metadata,
- }, nil
-}
-
func shouldRegister(url common.URL) bool {
side := url.GetParam(constant.SIDE_KEY, "")
if side == constant.PROVIDER_PROTOCOL {