Skip to content
Snippets Groups Projects
Commit 62a7e88a authored by Ian Luo's avatar Ian Luo
Browse files

Merge branch 'develop' into bitmap-router

parents ace8f6b5 d0bfafb4
No related branches found
No related tags found
No related merge requests found
# Release Notes
---
## 1.5.1
### New Features
- [Add dynamic tag router](https://github.com/apache/dubbo-go/pull/703)
- [Add TLS support](https://github.com/apache/dubbo-go/pull/685)
- [Add Nearest first for multiple registry](https://github.com/apache/dubbo-go/pull/659)
- [Add application and service level router](https://github.com/apache/dubbo-go/pull/662)
- [Add dynamic tag router](https://github.com/apache/dubbo-go/pull/665)
### Enhancement
- [Avoid init the log twice](https://github.com/apache/dubbo-go/pull/719)
- [Correct words and format codes](https://github.com/apache/dubbo-go/pull/704)
- [Change log stack level from warn to error](https://github.com/apache/dubbo-go/pull/702)
- [Optimize remotes configuration](https://github.com/apache/dubbo-go/pull/687)
### Bugfixes
- [Fix register service instance after provider config load](https://github.com/apache/dubbo-go/pull/694)
- [Fix call subscribe function asynchronously](https://github.com/apache/dubbo-go/pull/721)
- [Fix tag router rule copy](https://github.com/apache/dubbo-go/pull/721)
- [Fix nacos unit test failed](https://github.com/apache/dubbo-go/pull/705)
- [Fix can not inovke nacos destroy when graceful shutdown](https://github.com/apache/dubbo-go/pull/689)
- [Fix zk lost event](https://github.com/apache/dubbo-go/pull/692)
- [Fix k8s ut bug](https://github.com/apache/dubbo-go/pull/693)
Milestone: [https://github.com/apache/dubbo-go/milestone/2?closed=1](https://github.com/apache/dubbo-go/milestone/2?closed=1)
Project: [https://github.com/apache/dubbo-go/projects/8](https://github.com/apache/dubbo-go/projects/8)
## 1.5.0
### New Features
......
......@@ -16,6 +16,8 @@ Apache License, Version 2.0
## Release note ##
[v1.5.1 - Aug 23, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.5.1)
[v1.5.0 - July 24, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.5.0)
[v1.4.0 - Mar 17, 2020](https://github.com/apache/dubbo-go/releases/tag/v1.4.0)
......
......@@ -41,8 +41,8 @@ import (
type RouterRule struct {
router.BaseRouterRule `yaml:",inline"`
Tags []Tag
addressToTagNames map[string][]string
tagNameToAddresses map[string][]string
AddressToTagNames map[string][]string
TagNameToAddresses map[string][]string
}
func getRule(rawRule string) (*RouterRule, error) {
......@@ -58,13 +58,13 @@ func getRule(rawRule string) (*RouterRule, error) {
// parseTags use for flattening tags data to @addressToTagNames and @tagNameToAddresses
func (t *RouterRule) parseTags() {
t.addressToTagNames = make(map[string][]string, 2*len(t.Tags))
t.tagNameToAddresses = make(map[string][]string, len(t.Tags))
t.AddressToTagNames = make(map[string][]string, 2*len(t.Tags))
t.TagNameToAddresses = make(map[string][]string, len(t.Tags))
for _, tag := range t.Tags {
for _, address := range tag.Addresses {
t.addressToTagNames[address] = append(t.addressToTagNames[address], tag.Name)
t.AddressToTagNames[address] = append(t.AddressToTagNames[address], tag.Name)
}
t.tagNameToAddresses[tag.Name] = tag.Addresses
t.TagNameToAddresses[tag.Name] = tag.Addresses
}
}
......@@ -85,15 +85,15 @@ func (t *RouterRule) getTagNames() []string {
}
func (t *RouterRule) hasTag(tag string) bool {
return len(t.tagNameToAddresses[tag]) > 0
return len(t.TagNameToAddresses[tag]) > 0
}
func (t *RouterRule) getAddressToTagNames() map[string][]string {
return t.addressToTagNames
return t.AddressToTagNames
}
func (t *RouterRule) getTagNameToAddresses() map[string][]string {
return t.tagNameToAddresses
return t.TagNameToAddresses
}
func (t *RouterRule) getTags() []Tag {
......
......@@ -21,6 +21,10 @@ import (
"fmt"
)
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/metadata/service"
)
......@@ -36,12 +40,11 @@ func SetMetadataService(msType string, creator func() (service.MetadataService,
}
// GetMetadataService will create a MetadataService instance
// it will panic if msType not found
func GetMetadataService(msType string) (service.MetadataService, error) {
if creator, ok := metadataServiceInsMap[msType]; ok {
return creator()
}
panic(fmt.Sprintf("could not find the metadata service creator for metadataType: %s, please check whether you have imported relative packages, \n"+
return nil, perrors.New(fmt.Sprintf("could not find the metadata service creator for metadataType: %s, please check whether you have imported relative packages, \n"+
"local - github.com/apache/dubbo-go/metadata/service/inmemory, \n"+
"remote - github.com/apache/dubbo-go/metadata/service/remote", msType))
}
......@@ -60,6 +60,10 @@ type Logger interface {
}
func init() {
// forbidden to executing twice.
if logger != nil {
return
}
logConfFile := os.Getenv(constant.APP_LOG_CONF_FILE)
err := InitLog(logConfFile)
if err != nil {
......
......@@ -21,11 +21,14 @@ import (
"fmt"
"log"
"os"
"reflect"
"strconv"
"sync"
"time"
)
import (
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
......@@ -35,6 +38,7 @@ import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
_ "github.com/apache/dubbo-go/common/observer/dispatcher"
"github.com/apache/dubbo-go/registry"
)
var (
......@@ -206,6 +210,97 @@ func loadProviderConfig() {
panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
}
}
registerServiceInstance()
}
// registerServiceInstance register service instance
func registerServiceInstance() {
url := selectMetadataServiceExportedURL()
if url == nil {
return
}
instance, err := createInstance(*url)
if err != nil {
panic(err)
}
p := extension.GetProtocol(constant.REGISTRY_KEY)
var rp registry.RegistryFactory
var ok bool
if rp, ok = p.(registry.RegistryFactory); !ok {
panic("dubbo registry protocol{" + reflect.TypeOf(p).String() + "} is invalid")
}
rs := rp.GetRegistries()
for _, r := range rs {
var sdr registry.ServiceDiscoveryHolder
if sdr, ok = r.(registry.ServiceDiscoveryHolder); !ok {
continue
}
err := sdr.GetServiceDiscovery().Register(instance)
if err != nil {
panic(err)
}
}
}
// nolint
func createInstance(url common.URL) (registry.ServiceInstance, error) {
appConfig := GetApplicationConfig()
port, err := strconv.ParseInt(url.Port, 10, 32)
if err != nil {
return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
}
host := url.Ip
if len(host) == 0 {
host, err = gxnet.GetLocalIP()
if err != nil {
return nil, perrors.WithMessage(err, "could not get the local Ip")
}
}
// usually we will add more metadata
metadata := make(map[string]string, 8)
metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
return &registry.DefaultServiceInstance{
ServiceName: appConfig.Name,
Host: host,
Port: int(port),
Id: host + constant.KEY_SEPARATOR + url.Port,
Enable: true,
Healthy: true,
Metadata: metadata,
}, nil
}
// selectMetadataServiceExportedURL get already be exported url
func selectMetadataServiceExportedURL() *common.URL {
var selectedUrl common.URL
metaDataService, err := extension.GetMetadataService(GetApplicationConfig().MetadataType)
if err != nil {
logger.Warn(err)
return nil
}
list, err := metaDataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
if err != nil {
panic(err)
}
if len(list) == 0 {
return nil
}
for _, urlStr := range list {
url, err := common.NewURL(urlStr.(string))
if err != nil {
logger.Errorf("url format error {%v}", url)
continue
}
selectedUrl = url
// rest first
if url.Protocol == "rest" {
break
}
}
return &selectedUrl
}
func initRouter() {
......
......@@ -19,10 +19,16 @@ package config
import (
"path/filepath"
"sort"
"sync"
"testing"
)
import (
cm "github.com/Workiva/go-datastructures/common"
"github.com/Workiva/go-datastructures/slice/skip"
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)
......@@ -33,8 +39,11 @@ import (
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/registry"
)
const mockConsumerConfigPath = "./testdata/consumer_config.yml"
......@@ -74,7 +83,17 @@ func TestLoad(t *testing.T) {
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
GetApplicationConfig().MetadataType = "mock"
var mm *mockMetadataService
extension.SetMetadataService("mock", func() (metadataService service.MetadataService, err error) {
if mm == nil {
mm = &mockMetadataService{
exportedServiceURLs: new(sync.Map),
lock: new(sync.RWMutex),
}
}
return mm, nil
})
Load()
assert.Equal(t, ms, GetRPCService(ms.Reference()))
......@@ -103,7 +122,17 @@ func TestLoadWithSingleReg(t *testing.T) {
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
var mm *mockMetadataService
GetApplicationConfig().MetadataType = "mock"
extension.SetMetadataService("mock", func() (metadataService service.MetadataService, err error) {
if mm == nil {
mm = &mockMetadataService{
exportedServiceURLs: new(sync.Map),
lock: new(sync.RWMutex),
}
}
return mm, nil
})
Load()
assert.Equal(t, ms, GetRPCService(ms.Reference()))
......@@ -132,7 +161,17 @@ func TestWithNoRegLoad(t *testing.T) {
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
var mm *mockMetadataService
GetApplicationConfig().MetadataType = "mock"
extension.SetMetadataService("mock", func() (metadataService service.MetadataService, err error) {
if mm == nil {
mm = &mockMetadataService{
exportedServiceURLs: new(sync.Map),
lock: new(sync.RWMutex),
}
}
return mm, nil
})
Load()
assert.Equal(t, ms, GetRPCService(ms.Reference()))
......@@ -300,3 +339,234 @@ func mockInitProviderWithSingleRegistry() {
},
}
}
type mockMetadataService struct {
exportedServiceURLs *sync.Map
lock *sync.RWMutex
}
func (m *mockMetadataService) Reference() string {
panic("implement me")
}
func (m *mockMetadataService) ServiceName() (string, error) {
panic("implement me")
}
func (m *mockMetadataService) ExportURL(url common.URL) (bool, error) {
return m.addURL(m.exportedServiceURLs, &url), nil
}
func (m *mockMetadataService) UnexportURL(url common.URL) error {
panic("implement me")
}
func (m *mockMetadataService) SubscribeURL(url common.URL) (bool, error) {
panic("implement me")
}
func (m *mockMetadataService) UnsubscribeURL(url common.URL) error {
panic("implement me")
}
func (m *mockMetadataService) PublishServiceDefinition(url common.URL) error {
return nil
}
func (m *mockMetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
return ConvertURLArrToIntfArr(m.getAllService(m.exportedServiceURLs)), nil
}
func (m *mockMetadataService) MethodMapper() map[string]string {
panic("implement me")
}
func (m *mockMetadataService) GetSubscribedURLs() ([]common.URL, error) {
panic("implement me")
}
func (m *mockMetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
panic("implement me")
}
func (m *mockMetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
panic("implement me")
}
func (m *mockMetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) {
panic("implement me")
}
func (m *mockMetadataService) Version() (string, error) {
panic("implement me")
}
func (mts *mockMetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
var (
urlSet interface{}
loaded bool
)
logger.Debug(url.ServiceKey())
if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), skip.New(uint64(0))); loaded {
mts.lock.RLock()
wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url))
if len(wantedUrl) > 0 && wantedUrl[0] != nil {
mts.lock.RUnlock()
return false
}
mts.lock.RUnlock()
}
mts.lock.Lock()
// double chk
wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url))
if len(wantedUrl) > 0 && wantedUrl[0] != nil {
mts.lock.Unlock()
return false
}
urlSet.(*skip.SkipList).Insert(Comparator(*url))
mts.lock.Unlock()
return true
}
func (m *mockMetadataService) getAllService(services *sync.Map) []common.URL {
// using skip list to dedup and sorting
res := make([]common.URL, 0)
services.Range(func(key, value interface{}) bool {
urls := value.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := common.URL(urls.ByPosition(i).(Comparator))
if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.METADATA_SERVICE_NAME {
res = append(res, url)
}
}
return true
})
sort.Sort(common.URLSlice(res))
return res
}
type Comparator common.URL
// Compare is defined as Comparator for skip list to compare the URL
func (c Comparator) Compare(comp cm.Comparator) int {
a := common.URL(c).String()
b := common.URL(comp.(Comparator)).String()
switch {
case a > b:
return 1
case a < b:
return -1
default:
return 0
}
}
type mockServiceDiscoveryRegistry struct {
}
func (mr *mockServiceDiscoveryRegistry) GetUrl() common.URL {
panic("implement me")
}
func (mr *mockServiceDiscoveryRegistry) IsAvailable() bool {
panic("implement me")
}
func (mr *mockServiceDiscoveryRegistry) Destroy() {
panic("implement me")
}
func (mr *mockServiceDiscoveryRegistry) Register(url common.URL) error {
panic("implement me")
}
func (mr *mockServiceDiscoveryRegistry) UnRegister(url common.URL) error {
panic("implement me")
}
func (mr *mockServiceDiscoveryRegistry) Subscribe(*common.URL, registry.NotifyListener) error {
panic("implement me")
}
func (mr *mockServiceDiscoveryRegistry) UnSubscribe(*common.URL, registry.NotifyListener) error {
panic("implement me")
}
func (s *mockServiceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
return &mockServiceDiscovery{}
}
type mockServiceDiscovery struct {
}
func (m *mockServiceDiscovery) String() string {
panic("implement me")
}
func (m *mockServiceDiscovery) Destroy() error {
panic("implement me")
}
func (m *mockServiceDiscovery) Register(instance registry.ServiceInstance) error {
return nil
}
func (m *mockServiceDiscovery) Update(instance registry.ServiceInstance) error {
panic("implement me")
}
func (m *mockServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
panic("implement me")
}
func (m *mockServiceDiscovery) GetDefaultPageSize() int {
panic("implement me")
}
func (m *mockServiceDiscovery) GetServices() *gxset.HashSet {
panic("implement me")
}
func (m *mockServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
panic("implement me")
}
func (m *mockServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
panic("implement me")
}
func (m *mockServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
panic("implement me")
}
func (m *mockServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
panic("implement me")
}
func (m *mockServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
panic("implement me")
}
func (m *mockServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
panic("implement me")
}
func (m *mockServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
panic("implement me")
}
func (m *mockServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
panic("implement me")
}
func ConvertURLArrToIntfArr(urls []common.URL) []interface{} {
if len(urls) == 0 {
return []interface{}{}
}
res := make([]interface{}, 0, len(urls))
for _, u := range urls {
res = append(res, u.String())
}
return res
}
......@@ -32,6 +32,7 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/registry"
)
var regProtocol protocol.Protocol
......@@ -338,9 +339,37 @@ func (*mockRegistryProtocol) Refer(url common.URL) protocol.Invoker {
}
func (*mockRegistryProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
registryUrl := getRegistryUrl(invoker)
if registryUrl.Protocol == "service-discovery" {
metaDataService, err := extension.GetMetadataService(GetApplicationConfig().MetadataType)
if err != nil {
panic(err)
}
ok, err := metaDataService.ExportURL(*invoker.GetUrl().SubURL.Clone())
if err != nil {
panic(err)
}
if !ok {
panic("The URL has been registry!")
}
}
return protocol.NewBaseExporter("test", invoker, &sync.Map{})
}
func (*mockRegistryProtocol) Destroy() {
// Destroy is a mock function
}
func getRegistryUrl(invoker protocol.Invoker) *common.URL {
// here add * for return a new url
url := invoker.GetUrl()
// if the protocol == registry ,set protocol the registry value in url.params
if url.Protocol == constant.REGISTRY_PROTOCOL {
protocol := url.GetParam(constant.REGISTRY_KEY, "")
url.Protocol = protocol
}
return &url
}
func (p *mockRegistryProtocol) GetRegistries() []registry.Registry {
return []registry.Registry{&mockServiceDiscoveryRegistry{}}
}
......@@ -40,13 +40,33 @@ func doInitProvider() {
Module: "module",
Version: "2.6.0",
Owner: "dubbo",
Environment: "test"},
Environment: "test",
},
Remotes: map[string]*RemoteConfig{
"test1": {
Address: "127.0.0.5:2181",
TimeoutStr: "5s",
Username: "user1",
Password: "pwd1",
Params: nil,
},
},
ServiceDiscoveries: map[string]*ServiceDiscoveryConfig{
"mock_servicediscovery": {
Protocol: "mock",
RemoteRef: "test1",
},
},
MetadataReportConfig: &MetadataReportConfig{
Protocol: "mock",
RemoteRef: "test1",
},
},
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "com.MockService",
Protocol: "mock",
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2",
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2,hangzhou_service_discovery_reg",
Cluster: "failover",
Loadbalance: "random",
Retries: "3",
......@@ -71,7 +91,7 @@ func doInitProvider() {
"MockServiceNoRightProtocol": {
InterfaceName: "com.MockService",
Protocol: "mock1",
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2",
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2,hangzhou_service_discovery_reg",
Cluster: "failover",
Loadbalance: "random",
Retries: "3",
......@@ -128,6 +148,14 @@ func doInitProvider() {
Username: "user1",
Password: "pwd1",
},
"hangzhou_service_discovery_reg": {
Protocol: "service-discovery",
Params: map[string]string{
"service_discovery": "mock_servicediscovery",
"name_mapping": "in-memory",
"metadata": "default",
},
},
},
Protocols: map[string]*ProtocolConfig{
......
......@@ -188,7 +188,8 @@ func (nl *nacosListener) startListen() error {
}
serviceName := getSubscribeName(nl.listenUrl)
nl.subscribeParam = &vo.SubscribeParam{ServiceName: serviceName, SubscribeCallback: nl.Callback}
return nl.namingClient.Subscribe(nl.subscribeParam)
go nl.namingClient.Subscribe(nl.subscribeParam)
return nil
}
func (nl *nacosListener) stopListen() error {
......
......@@ -117,6 +117,18 @@ func (proto *registryProtocol) initConfigurationListeners() {
proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners)
}
// nolint
func (proto *registryProtocol) GetRegistries() []registry.Registry {
var rs []registry.Registry
proto.registries.Range(func(_, v interface{}) bool {
if r, ok := v.(registry.Registry); ok {
rs = append(rs, r)
}
return true
})
return rs
}
// Refer provider service from registry center
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
var registryUrl = url
......@@ -372,7 +384,7 @@ func setProviderUrl(regURL *common.URL, providerURL *common.URL) {
regURL.SubURL = providerURL
}
// GetProtocol return the singleton RegistryProtocol
// GetProtocol return the singleton registryProtocol
func GetProtocol() protocol.Protocol {
once.Do(func() {
regProtocol = newRegistryProtocol()
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package registry
// RegistryFactory
type RegistryFactory interface {
// GetRegistries get registries
GetRegistries() []Registry
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package registry
// ServiceDiscoveryHolder we can get a service discovery
// it always be a service discovery registry
type ServiceDiscoveryHolder interface {
// GetServiceDiscovery get service discovery
GetServiceDiscovery() ServiceDiscovery
}
......@@ -28,7 +28,6 @@ import (
import (
cm "github.com/Workiva/go-datastructures/common"
gxset "github.com/dubbogo/gost/container/set"
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
......@@ -176,18 +175,6 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
logger.Warnf("The URL[%s] has been registry!", url.String())
}
// we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
// But we don't want to design a similar bootstrap class.
ins, err := createInstance(url)
if err != nil {
return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
}
err = s.serviceDiscovery.Register(ins)
if err != nil {
return perrors.WithMessage(err, "register the service failed")
}
err = s.metaDataService.PublishServiceDefinition(url)
if err != nil {
return perrors.WithMessage(err, "publish the service definition failed. ")
......@@ -198,36 +185,6 @@ func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
url.Protocol)
}
func createInstance(url common.URL) (registry.ServiceInstance, error) {
appConfig := config.GetApplicationConfig()
port, err := strconv.ParseInt(url.Port, 10, 32)
if err != nil {
return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
}
host := url.Ip
if len(host) == 0 {
host, err = gxnet.GetLocalIP()
if err != nil {
return nil, perrors.WithMessage(err, "could not get the local Ip")
}
}
// usually we will add more metadata
metadata := make(map[string]string, 8)
metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
return &registry.DefaultServiceInstance{
ServiceName: appConfig.Name,
Host: host,
Port: int(port),
Id: host + constant.KEY_SEPARATOR + url.Port,
Enable: true,
Healthy: true,
Metadata: metadata,
}, nil
}
func shouldRegister(url common.URL) bool {
side := url.GetParam(constant.SIDE_KEY, "")
if side == constant.PROVIDER_PROTOCOL {
......@@ -675,7 +632,7 @@ func (icn *InstanceChangeNotify) Notify(event observer.Event) {
if se, ok := event.(*registry.ServiceInstancesChangedEvent); ok {
sdr := icn.serviceDiscoveryRegistry
sdr.subscribe(sdr.url, icn.notify, se.ServiceName, se.Instances)
sdr.subscribe(sdr.url.SubURL, icn.notify, se.ServiceName, se.Instances)
}
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment