diff --git a/common/constant/key.go b/common/constant/key.go index 8c84a2a8884d909bdd743f738f3d69dde399cc22..1479af2305b1ce4280c2aa7f4016e314ac358513 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -131,6 +131,7 @@ const ( ProviderConfigPrefix = "dubbo.provider." ConsumerConfigPrefix = "dubbo.consumer." ShutdownConfigPrefix = "dubbo.shutdown." + MetadataReportPrefix = "dubbo.metadata-report." RouterConfigPrefix = "dubbo.router." ) @@ -214,6 +215,19 @@ const ( SECRET_ACCESS_KEY_KEY = "secretAccessKey" ) +// metadata report + +const ( + METACONFIG_REMOTE = "remote" + METACONFIG_LOCAL = "local" + KEY_SEPARATOR = ":" + DEFAULT_PATH_TAG = "metadata" + KEY_REVISON_PREFIX = "revision" + + // metadata service + METADATA_SERVICE_NAME = "org.apache.dubbo.metadata.MetadataService" +) + // HealthCheck Router const ( // The key of HealthCheck SPI @@ -235,3 +249,9 @@ const ( // The default time window of circuit-tripped in millisecond if not specfied MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 ) + +// service discovery + +const ( + NACOS_GROUP = "nacos.group" +) diff --git a/common/extension/metadata_report_factory.go b/common/extension/metadata_report_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..0ae0793bb4459767cb42fb1860fc484388aae1a3 --- /dev/null +++ b/common/extension/metadata_report_factory.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/metadata" +) + +var ( + metaDataReportFactories = make(map[string]func() metadata.MetadataReportFactory, 8) +) + +// SetMetadataReportFactory ... +func SetMetadataReportFactory(name string, v func() metadata.MetadataReportFactory) { + metaDataReportFactories[name] = v +} + +// GetMetadataReportFactory ... +func GetMetadataReportFactory(name string) metadata.MetadataReportFactory { + if metaDataReportFactories[name] == nil { + panic("metadata report for " + name + " is not existing, make sure you have import the package.") + } + return metaDataReportFactories[name]() +} diff --git a/common/extension/service_discovery.go b/common/extension/service_discovery.go new file mode 100644 index 0000000000000000000000000000000000000000..25b80cf3353505c058bea40cc4c80712ad923d2d --- /dev/null +++ b/common/extension/service_discovery.go @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + perrors "github.com/pkg/errors" +) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/registry" +) + +var ( + discoveryCreatorMap = make(map[string]func(url *common.URL) (registry.ServiceDiscovery, error), 4) +) + +// SetServiceDiscovery will store the creator and name +func SetServiceDiscovery(name string, creator func(url *common.URL) (registry.ServiceDiscovery, error)) { + discoveryCreatorMap[name] = creator +} + +// GetServiceDiscovery will return the registry.ServiceDiscovery +// if not found, or initialize instance failed, it will return error. +func GetServiceDiscovery(name string, url *common.URL) (registry.ServiceDiscovery, error) { + creator, ok := discoveryCreatorMap[name] + if !ok { + return nil, perrors.New("Could not find the service discovery with name: " + name) + } + return creator(url) +} diff --git a/config/application_config.go b/config/application_config.go index 23ab7d34aceaba02d7f592906d6f4e3d6cf36dae..33b47c81dd0da9959984cd1f53648167863cb713 100644 --- a/config/application_config.go +++ b/config/application_config.go @@ -33,6 +33,7 @@ type ApplicationConfig struct { Version string `yaml:"version" json:"version,omitempty" property:"version"` Owner string `yaml:"owner" json:"owner,omitempty" property:"owner"` Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"` + MetadataType string `default:"local" yaml:"metadataType" json:"metadataType,omitempty" property:"metadataType"` //field for metadata report } // Prefix ... diff --git a/config/consumer_config.go b/config/consumer_config.go index 1fa68415bfc3c7e622c0b455e9945c926fed4df2..debcd79fa281c40e5526f60f5c5cdb66688688f4 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -44,6 +44,7 @@ type ConsumerConfig struct { Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` // application ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` + // client Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty" property:"connect_timeout"` ConnectTimeout time.Duration @@ -117,6 +118,7 @@ func ConsumerInit(confConFile string) error { return perrors.WithMessagef(err, "time.ParseDuration(Connect_Timeout{%#v})", consumerConfig.Connect_Timeout) } } + logger.Debugf("consumer config{%#v}\n", consumerConfig) return nil diff --git a/config/instance/metedata_report.go b/config/instance/metedata_report.go new file mode 100644 index 0000000000000000000000000000000000000000..cd54b0a7940df166c88f02234ab1a4e3bf384163 --- /dev/null +++ b/config/instance/metedata_report.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package instance + +import ( + "sync" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/metadata" +) + +var ( + instance metadata.MetadataReport + once sync.Once +) + +// GetMetadataReportInstance ... +func GetMetadataReportInstance(url *common.URL) metadata.MetadataReport { + once.Do(func() { + instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url) + }) + return instance +} diff --git a/config/metadata_report_config.go b/config/metadata_report_config.go new file mode 100644 index 0000000000000000000000000000000000000000..41fb6b4769e59784d8d18c3f82b956fd029d4ff7 --- /dev/null +++ b/config/metadata_report_config.go @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "net/url" +) + +import ( + "github.com/creasty/defaults" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/config/instance" +) + +// MethodConfig ... +type MetadataReportConfig struct { + Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"` + Address string `yaml:"address" json:"address,omitempty" property:"address"` + Username string `yaml:"username" json:"username,omitempty" property:"username"` + Password string `yaml:"password" json:"password,omitempty" property:"password"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + TimeoutStr string `yaml:"timeout" default:"5s" json:"timeout,omitempty" property:"timeout"` // unit: second + Group string `yaml:"group" json:"group,omitempty" property:"group"` +} + +// Prefix ... +func (c *MetadataReportConfig) Prefix() string { + return constant.MetadataReportPrefix +} + +// UnmarshalYAML ... +func (c *MetadataReportConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(c); err != nil { + return perrors.WithStack(err) + } + type plain MetadataReportConfig + if err := unmarshal((*plain)(c)); err != nil { + return perrors.WithStack(err) + } + return nil +} + +// ToUrl ... +func (c *MetadataReportConfig) ToUrl() (*common.URL, error) { + urlMap := make(url.Values) + + if c.Params != nil { + for k, v := range c.Params { + urlMap.Set(k, v) + } + } + + url, err := common.NewURL(c.Address, + common.WithParams(urlMap), + common.WithUsername(c.Username), + common.WithPassword(c.Password), + common.WithLocation(c.Address), + common.WithProtocol(c.Protocol), + ) + if err != nil || len(url.Protocol) == 0 { + return nil, perrors.New("Invalid MetadataReportConfig.") + } + url.SetParam("metadata", url.Protocol) + return &url, nil +} + +func (c *MetadataReportConfig) IsValid() bool { + return len(c.Protocol) != 0 +} + +// StartMetadataReport: The entry of metadata report start +func startMetadataReport(metadataType string, metadataReportConfig *MetadataReportConfig) error { + if metadataReportConfig == nil || metadataReportConfig.IsValid() { + return nil + } + + if metadataType == constant.METACONFIG_REMOTE { + return perrors.New("No MetadataConfig found, you must specify the remote Metadata Center address when 'metadata=remote' is enabled.") + } else if metadataType == constant.METACONFIG_REMOTE && len(metadataReportConfig.Address) == 0 { + return perrors.New("MetadataConfig address can not be empty.") + } + + if url, err := metadataReportConfig.ToUrl(); err == nil { + instance.GetMetadataReportInstance(url) + } else { + return perrors.New("MetadataConfig is invalid!") + } + + return nil +} diff --git a/config/metadata_report_config_test.go b/config/metadata_report_config_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d6b08d5fb0c51495940d4dc021a0796c1d577923 --- /dev/null +++ b/config/metadata_report_config_test.go @@ -0,0 +1,30 @@ +package config + +import "testing" + +import ( + "github.com/stretchr/testify/assert" +) + +func TestMetadataReportConfig_ToUrl(t *testing.T) { + metadataReportConfig := MetadataReportConfig{ + Protocol: "mock", + Address: "127.0.0.1:2181", + Username: "test", + Password: "test", + TimeoutStr: "3s", + Params: map[string]string{ + "k": "v", + }, + } + url, error := metadataReportConfig.ToUrl() + assert.NoError(t, error) + assert.Equal(t, "mock", url.Protocol) + assert.Equal(t, "127.0.0.1:2181", url.Location) + assert.Equal(t, "127.0.0.1", url.Ip) + assert.Equal(t, "2181", url.Port) + assert.Equal(t, "test", url.Username) + assert.Equal(t, "test", url.Password) + assert.Equal(t, "v", url.GetParam("k", "")) + assert.Equal(t, "mock", url.GetParam("metadata", "")) +} diff --git a/config/provider_config.go b/config/provider_config.go index 14b77cafb3487754b9583d3b4e64ff605394b7db..79569917455773653750d1d5921a722daf079b0a 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -41,6 +41,8 @@ type ProviderConfig struct { BaseConfig `yaml:",inline"` Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` ProxyFactory string `yaml:"proxy_factory" default:"default" json:"proxy_factory,omitempty" property:"proxy_factory"` + // metadata-report + MetadataReportConfig *MetadataReportConfig `yaml:"metadata_report" json:"metadata_report,omitempty" property:"metadata_report"` ApplicationConfig *ApplicationConfig `yaml:"application" json:"application,omitempty" property:"application"` Registry *RegistryConfig `yaml:"registry" json:"registry,omitempty" property:"registry"` @@ -95,7 +97,10 @@ func ProviderInit(confProFile string) error { n.InterfaceId = k } } - + //start the metadata report if config set + if err := startMetadataReport(providerConfig.ApplicationConfig.MetadataType, providerConfig.MetadataReportConfig); err != nil { + return perrors.WithMessagef(err, "Provider starts metadata report error, and the error is {%#v}", err) + } logger.Debugf("provider config{%#v}\n", providerConfig) return nil diff --git a/config/registry_config.go b/config/registry_config.go index 4e4b6e97d79a9402616b6cac954f7a09b2973dcc..f3d22311b86d4cc3b66f12e9926dff9565ae4cd6 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -93,7 +93,7 @@ func loadRegistries(targetRegistries string, registries map[string]*RegistryConf addresses := strings.Split(registryConf.Address, ",") address := addresses[0] - address = traslateRegistryConf(address, registryConf) + address = translateRegistryConf(address, registryConf) url, err = common.NewURL(constant.REGISTRY_PROTOCOL+"://"+address, common.WithParams(registryConf.getUrlMap(roleType)), common.WithUsername(registryConf.Username), @@ -127,7 +127,7 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { return urlMap } -func traslateRegistryConf(address string, registryConf *RegistryConfig) string { +func translateRegistryConf(address string, registryConf *RegistryConfig) string { if strings.Contains(address, "://") { translatedUrl, err := url.Parse(address) if err != nil { diff --git a/config_center/apollo/impl.go b/config_center/apollo/impl.go index 3b5d1f4ebe8e728eb4d21e78ab675036c1ca9f63..b049d334bca7e5191caaf9674734e731bc709ba2 100644 --- a/config_center/apollo/impl.go +++ b/config_center/apollo/impl.go @@ -25,7 +25,8 @@ import ( ) import ( - "github.com/pkg/errors" + gxset "github.com/dubbogo/gost/container/set" + perrors "github.com/pkg/errors" "github.com/zouyx/agollo" ) @@ -119,7 +120,7 @@ func getNamespaceName(namespace string, configFileFormat agollo.ConfigFileFormat func (c *apolloConfiguration) GetInternalProperty(key string, opts ...cc.Option) (string, error) { config := agollo.GetConfig(c.appConf.NamespaceName) if config == nil { - return "", errors.New(fmt.Sprintf("nothing in namespace:%s ", key)) + return "", perrors.New(fmt.Sprintf("nothing in namespace:%s ", key)) } return config.GetStringValue(key, ""), nil } @@ -128,6 +129,16 @@ func (c *apolloConfiguration) GetRule(key string, opts ...cc.Option) (string, er return c.GetInternalProperty(key, opts...) } +// PublishConfig will publish the config with the (key, group, value) pair +func (c *apolloConfiguration) PublishConfig(string, string, string) error { + return perrors.New("unsupport operation") +} + +// GetConfigKeysByGroup will return all keys with the group +func (c *apolloConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) { + return nil, perrors.New("unsupport operation") +} + func (c *apolloConfiguration) GetProperties(key string, opts ...cc.Option) (string, error) { /** * when group is not null, we are getting startup configs(config file) from Config Center, for example: @@ -135,7 +146,7 @@ func (c *apolloConfiguration) GetProperties(key string, opts ...cc.Option) (stri */ config := agollo.GetConfig(key) if config == nil { - return "", errors.New(fmt.Sprintf("nothing in namespace:%s ", key)) + return "", perrors.New(fmt.Sprintf("nothing in namespace:%s ", key)) } return config.GetContent(agollo.Properties), nil } diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index d6c3b06b327f16c709b09121e589db6694d3663e..9013d7140e757520f2e8f048ce53a5ac2a13f982 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -21,14 +21,18 @@ import ( "time" ) +import ( + gxset "github.com/dubbogo/gost/container/set" +) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/config_center/parser" ) -////////////////////////////////////////// +// //////////////////////////////////////// // DynamicConfiguration -////////////////////////////////////////// +// //////////////////////////////////////// const ( // DEFAULT_GROUP: default group DEFAULT_GROUP = "dubbo" @@ -42,14 +46,20 @@ type DynamicConfiguration interface { SetParser(parser.ConfigurationParser) AddListener(string, ConfigurationListener, ...Option) RemoveListener(string, ConfigurationListener, ...Option) - //GetProperties get properties file + // GetProperties get properties file GetProperties(string, ...Option) (string, error) - //GetRule get Router rule properties file + // GetRule get Router rule properties file GetRule(string, ...Option) (string, error) - //GetInternalProperty get value by key in Default properties file(dubbo.properties) + // GetInternalProperty get value by key in Default properties file(dubbo.properties) GetInternalProperty(string, ...Option) (string, error) + + // PublishConfig will publish the config with the (key, group, value) pair + PublishConfig(string, string, string) error + + // GetConfigKeysByGroup will return all keys with the group + GetConfigKeysByGroup(group string) (*gxset.HashSet, error) } // Options ... @@ -75,7 +85,7 @@ func WithTimeout(time time.Duration) Option { } } -//GetRuleKey The format is '{interfaceName}:[version]:[group]' +// GetRuleKey The format is '{interfaceName}:[version]:[group]' func GetRuleKey(url common.URL) string { return url.ColonSeparatedKey() } diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go index 4d972b629abb7abd7cc0d0018026e4ccc04a1e4f..59c788b65bce2a4773975ea1a96a314649781832 100644 --- a/config_center/mock_dynamic_config.go +++ b/config_center/mock_dynamic_config.go @@ -22,6 +22,7 @@ import ( ) import ( + gxset "github.com/dubbogo/gost/container/set" "gopkg.in/yaml.v2" ) @@ -81,6 +82,16 @@ func (f *MockDynamicConfigurationFactory) GetDynamicConfiguration(_ *common.URL) } +// PublishConfig will publish the config with the (key, group, value) pair +func (c *MockDynamicConfiguration) PublishConfig(string, string, string) error { + return nil +} + +// GetConfigKeysByGroup will return all keys with the group +func (c *MockDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) { + return gxset.NewSet(c.content), nil +} + // MockDynamicConfiguration ... type MockDynamicConfiguration struct { parser parser.ConfigurationParser diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index 60ab89b003ff62016b9137223425c1051356975f..007b8be142274b63ceb56dd00399cdaf29c3746d 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -18,10 +18,12 @@ package nacos import ( + "strings" "sync" ) import ( + gxset "github.com/dubbogo/gost/container/set" "github.com/nacos-group/nacos-sdk-go/vo" perrors "github.com/pkg/errors" ) @@ -74,7 +76,7 @@ func (n *nacosDynamicConfiguration) RemoveListener(key string, listener config_c n.removeListener(key, listener) } -//nacos distinguishes configuration files based on group and dataId. defalut group = "dubbo" and dataId = key +// GetProperties nacos distinguishes configuration files based on group and dataId. defalut group = "dubbo" and dataId = key func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) { return n.GetRule(key, opts...) } @@ -84,6 +86,33 @@ func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...conf return n.GetProperties(key, opts...) } +// PublishConfig will publish the config with the (key, group, value) pair +func (n *nacosDynamicConfiguration) PublishConfig(key string, group string, value string) error { + + group = n.resolvedGroup(group) + + ok, err := (*n.client.Client()).PublishConfig(vo.ConfigParam{ + DataId: key, + Group: group, + Content: value, + }) + + if err != nil { + return perrors.WithStack(err) + } + if !ok { + return perrors.New("publish config to Nocos failed") + } + return nil +} + +// GetConfigKeysByGroup will return all keys with the group +func (n *nacosDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) { + // TODO (the golang client of nacos does not support batch API) + // we should build a issue and then think about how to resolve this problem + return nil, perrors.New("unsupport operation, wait for implement") +} + // GetRule Get router rule func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { tmpOpts := &config_center.Options{} @@ -92,12 +121,12 @@ func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Op } content, err := (*n.client.Client()).GetConfig(vo.ConfigParam{ DataId: key, - Group: tmpOpts.Group, + Group: n.resolvedGroup(tmpOpts.Group), }) if err != nil { return "", perrors.WithStack(err) } else { - return string(content), nil + return content, nil } } @@ -145,6 +174,15 @@ func (n *nacosDynamicConfiguration) Destroy() { n.closeConfigs() } +// resolvedGroup will regular the group. Now, it will replace the '/' with '-'. +// '/' is a special character for nacos +func (n *nacosDynamicConfiguration) resolvedGroup(group string) string { + if len(group) <= 0 { + return group + } + return strings.ReplaceAll(group, "/", "-") +} + // IsAvailable Get available status func (n *nacosDynamicConfiguration) IsAvailable() bool { select { @@ -155,12 +193,12 @@ func (n *nacosDynamicConfiguration) IsAvailable() bool { } } -func (r *nacosDynamicConfiguration) closeConfigs() { - r.cltLock.Lock() - client := r.client - r.client = nil - r.cltLock.Unlock() +func (n *nacosDynamicConfiguration) closeConfigs() { + n.cltLock.Lock() + client := n.client + n.client = nil + n.cltLock.Unlock() // Close the old client first to close the tmp node client.Close() - logger.Infof("begin to close provider nacos client") + logger.Infof("begin to close provider n client") } diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go index b4e6f1d0259979eba28dd81e8f480ab4ae03a39f..4032c91cda512b649140db6eea3dc11eeb482f27 100644 --- a/config_center/nacos/impl_test.go +++ b/config_center/nacos/impl_test.go @@ -60,12 +60,7 @@ func runMockConfigServer(configHandler func(http.ResponseWriter, *http.Request), func mockCommonNacosServer() *httptest.Server { return runMockConfigServer(func(writer http.ResponseWriter, request *http.Request) { - data := ` - dubbo.service.com.ikurento.user.UserProvider.cluster=failback - dubbo.service.com.ikurento.user.UserProvider.protocol=myDubbo1 - dubbo.protocols.myDubbo.port=20000 - dubbo.protocols.myDubbo.name=dubbo -` + data := "true" fmt.Fprintf(writer, "%s", data) }, func(writer http.ResponseWriter, request *http.Request) { data := `dubbo.properties%02dubbo%02dubbo.service.com.ikurento.user.UserProvider.cluster=failback` @@ -93,6 +88,16 @@ func Test_GetConfig(t *testing.T) { assert.NoError(t, err) } +func TestNacosDynamicConfiguration_PublishConfig(t *testing.T) { + nacos, err := initNacosData(t) + assert.Nil(t, err) + key := "myKey" + group := "/custom/a/b" + value := "MyValue" + err = nacos.PublishConfig(key, group, value) + assert.Nil(t, err) +} + func Test_AddListener(t *testing.T) { nacos, err := initNacosData(t) assert.NoError(t, err) diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index 25c586586c7202e42ff44d6104e8132961add25a..de74cff8f64683a47278825b670352a04b69b791 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -35,11 +35,11 @@ func callback(listener config_center.ConfigurationListener, namespace, group, da listener.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) } -func (l *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { - _, loaded := l.keyListeners.Load(key) +func (n *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { + _, loaded := n.keyListeners.Load(key) if !loaded { _, cancel := context.WithCancel(context.Background()) - err := (*l.client.Client()).ListenConfig(vo.ConfigParam{ + err := (*n.client.Client()).ListenConfig(vo.ConfigParam{ DataId: key, Group: "dubbo", OnChange: func(namespace, group, dataId, data string) { @@ -49,14 +49,14 @@ func (l *nacosDynamicConfiguration) addListener(key string, listener config_cent logger.Errorf("nacos : listen config fail, error:%v ", err) newListener := make(map[config_center.ConfigurationListener]context.CancelFunc) newListener[listener] = cancel - l.keyListeners.Store(key, newListener) + n.keyListeners.Store(key, newListener) } else { // TODO check goroutine alive, but this version of go_nacos_sdk is not support. logger.Infof("profile:%s. this profile is already listening", key) } } -func (l *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { +func (n *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { // TODO: not supported in current go_nacos_sdk version logger.Warn("not supported in current go_nacos_sdk version") } diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go index 5ffbb5f95dd5830534599c689681f94db4365660..0a1ce35306dab98363ca475cd5d1b0648e924b90 100644 --- a/config_center/zookeeper/impl.go +++ b/config_center/zookeeper/impl.go @@ -25,6 +25,7 @@ import ( import ( "github.com/dubbogo/go-zookeeper/zk" + gxset "github.com/dubbogo/gost/container/set" perrors "github.com/pkg/errors" ) @@ -39,8 +40,9 @@ import ( const ( // ZkClient - //zookeeper client name - ZkClient = "zk config_center" + // zookeeper client name + ZkClient = "zk config_center" + pathSeparator = "/" ) type zookeeperDynamicConfiguration struct { @@ -143,11 +145,39 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config return string(content), nil } -//For zookeeper, getConfig and getConfigs have the same meaning. +// GetInternalProperty For zookeeper, getConfig and getConfigs have the same meaning. func (c *zookeeperDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { return c.GetProperties(key, opts...) } +// PublishConfig will put the value into Zk with specific path +func (c *zookeeperDynamicConfiguration) PublishConfig(key string, group string, value string) error { + path := c.getPath(key, group) + err := c.client.CreateWithValue(path, []byte(value)) + if err != nil { + return perrors.WithStack(err) + } + return nil +} + +// GetConfigKeysByGroup will return all keys with the group +func (c *zookeeperDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) { + path := c.getPath("", group) + result, err := c.client.GetChildren(path) + if err != nil { + return nil, perrors.WithStack(err) + } + + if len(result) == 0 { + return nil, perrors.New("could not find keys with group: " + group) + } + set := gxset.NewSet() + for _, e := range result { + set.Add(e) + } + return set, nil +} + func (c *zookeeperDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { return c.GetProperties(key, opts...) } @@ -214,3 +244,17 @@ func (c *zookeeperDynamicConfiguration) closeConfigs() { func (c *zookeeperDynamicConfiguration) RestartCallBack() bool { return true } + +func (c *zookeeperDynamicConfiguration) getPath(key string, group string) string { + if len(key) == 0 { + return c.buildPath(group) + } + return c.buildPath(group) + pathSeparator + key +} + +func (c *zookeeperDynamicConfiguration) buildPath(group string) string { + if len(group) == 0 { + group = config_center.DEFAULT_GROUP + } + return c.rootPath + pathSeparator + group +} diff --git a/config_center/zookeeper/impl_test.go b/config_center/zookeeper/impl_test.go index 22e15193cba1b533a2b1b965a44bf9665a6a4e5e..30389122a3a06ee260f2ed8b21057523137995d5 100644 --- a/config_center/zookeeper/impl_test.go +++ b/config_center/zookeeper/impl_test.go @@ -24,6 +24,7 @@ import ( import ( "github.com/dubbogo/go-zookeeper/zk" + gxset "github.com/dubbogo/gost/container/set" "github.com/stretchr/testify/assert" ) @@ -156,6 +157,26 @@ func Test_RemoveListener(t *testing.T) { assert.Equal(t, "", listener.event) } +func TestZookeeperDynamicConfiguration_PublishConfig(t *testing.T) { + value := "Test Data" + customGroup := "Custom Group" + key := "myKey" + ts, zk := initZkData(config_center.DEFAULT_GROUP, t) + defer ts.Stop() + err := zk.PublishConfig(key, customGroup, value) + assert.Nil(t, err) + result, err := zk.GetInternalProperty("myKey", config_center.WithGroup(customGroup)) + assert.Nil(t, err) + assert.Equal(t, value, result) + + var keys *gxset.HashSet + keys, err = zk.GetConfigKeysByGroup(customGroup) + assert.Nil(t, err) + assert.Equal(t, 1, keys.Size()) + assert.True(t, keys.Contains(key)) + +} + type mockDataListener struct { wg sync.WaitGroup event string diff --git a/go.mod b/go.mod index 67ed1eb402c34eecc5214411531b0a4ab68fda0c..54d532eac06a56057f815e3c8e91fdd267c9c6ad 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/magiconair/properties v1.8.1 github.com/mitchellh/mapstructure v1.1.2 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd - github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb + github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c github.com/opentracing/opentracing-go v1.1.0 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.1.0 diff --git a/go.sum b/go.sum index bbbaf34640b63d6b7c423d8c7f9c4419025842cf..e499992eb0a0335ca0a1e1f746caca3418af7655 100644 --- a/go.sum +++ b/go.sum @@ -113,6 +113,8 @@ github.com/dubbogo/go-zookeeper v1.0.0/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4D github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8= github.com/dubbogo/gost v1.5.2/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= +github.com/dubbogo/gost v1.7.0 h1:lWNBIE2hk1Aj2be2uXkyRTpZG0RQZj0/xbXnkIq6EHE= +github.com/dubbogo/gost v1.7.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/dubbogo/gost v1.8.0 h1:9ACbQe5OwMjqtinQcNJC5xp16kky27OsfSGw5L9A6vw= github.com/dubbogo/gost v1.8.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8= github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIhn2R6oXQbgW5yHfS+d6YqyMfXiu2L55rFZC4UD/M= @@ -385,6 +387,8 @@ github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb h1:lbmvw8r9W55w+aQgWn35W1nuleRIECMoqUrmwAOAvoI= github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo= +github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c h1:WoCa3AvgQMVKNs+RIFlWPRgY9QVJwUxJDrGxHs0fcRo= +github.com/nacos-group/nacos-sdk-go v0.0.0-20191128082542-fe1b325b125c/go.mod h1:CEkSvEpoveoYjA81m4HNeYQ0sge0LFGKSEqO3JKHllo= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2 h1:BQ1HW7hr4IVovMwWg0E0PYcyW8CzqDcVmaew9cujU4s= github.com/nicolai86/scaleway-sdk v1.10.2-0.20180628010248-798f60e20bb2/go.mod h1:TLb2Sg7HQcgGdloNxkrmtgDNR9uVYF3lfdFIN4Ro6Sk= github.com/oklog/run v0.0.0-20180308005104-6934b124db28 h1:Hbr3fbVPXea52oPQeP7KLSxP52g6SFaNY1IqAmUyEW0= diff --git a/metadata/definition/definition.go b/metadata/definition/definition.go new file mode 100644 index 0000000000000000000000000000000000000000..ead984345efde1ddd1d54b7599fd9d5584947ea2 --- /dev/null +++ b/metadata/definition/definition.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package definition + +type ServiceDefinition struct { + CanonicalName string + CodeSource string + Methods []MethodDefinition + Types []TypeDefinition +} + +type MethodDefinition struct { + Name string + ParameterTypes []string + ReturnType string + Parameters []TypeDefinition +} + +type TypeDefinition struct { + Id string + Type string + Items []TypeDefinition + Enums []string + Properties map[string]TypeDefinition + TypeBuilderName string +} diff --git a/metadata/exporter.go b/metadata/exporter.go new file mode 100644 index 0000000000000000000000000000000000000000..5d47f8bd808ec802ba73c7db73d22c78c675d12a --- /dev/null +++ b/metadata/exporter.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + "github.com/apache/dubbo-go/common" +) + +type MetadataExporter interface { + Export() MetadataExporter + Unexport() MetadataExporter + GetExportedURLs() []*common.URL + IsExported() bool +} diff --git a/metadata/identifier/base_metadata_identifier.go b/metadata/identifier/base_metadata_identifier.go new file mode 100644 index 0000000000000000000000000000000000000000..a314671055be523844fd7d8f9589b8b6031632bc --- /dev/null +++ b/metadata/identifier/base_metadata_identifier.go @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package identifier + +import ( + "encoding/base64" +) + +import ( + "github.com/apache/dubbo-go/common/constant" +) + +type BaseMetadataIdentifier interface { + getFilePathKey(params ...string) string + getIdentifierKey(params ...string) string +} + +type BaseServiceMetadataIdentifier struct { + serviceInterface string + version string + group string + side string +} + +// joinParams... +func joinParams(joinChar string, params []string) string { + var joinedStr string + for _, param := range params { + joinedStr += joinChar + joinedStr += param + } + return joinedStr +} + +// getIdentifierKey... +func (mdi *BaseServiceMetadataIdentifier) getIdentifierKey(params ...string) string { + return mdi.serviceInterface + + constant.KEY_SEPARATOR + mdi.version + + constant.KEY_SEPARATOR + mdi.group + + constant.KEY_SEPARATOR + mdi.side + + joinParams(constant.KEY_SEPARATOR, params) +} + +// getFilePathKey... +func (mdi *BaseServiceMetadataIdentifier) getFilePathKey(params ...string) string { + path := serviceToPath(mdi.serviceInterface) + + return constant.DEFAULT_PATH_TAG + + withPathSeparator(path) + + withPathSeparator(mdi.version) + + withPathSeparator(mdi.group) + + withPathSeparator(mdi.side) + + joinParams(constant.PATH_SEPARATOR, params) + +} + +// serviceToPath... +func serviceToPath(serviceInterface string) string { + if serviceInterface == constant.ANY_VALUE { + return "" + } else { + decoded, err := base64.URLEncoding.DecodeString(serviceInterface) + if err != nil { + return "" + } + return string(decoded) + } + +} + +//withPathSeparator... +func withPathSeparator(path string) string { + if len(path) != 0 { + path = constant.PATH_SEPARATOR + path + } + return path +} diff --git a/metadata/identifier/metadata_identifier.go b/metadata/identifier/metadata_identifier.go new file mode 100644 index 0000000000000000000000000000000000000000..f3df8f36546093a826279c4e9ec1546f78d444bd --- /dev/null +++ b/metadata/identifier/metadata_identifier.go @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package identifier + +type MetadataIdentifier struct { + application string + BaseMetadataIdentifier +} + +// getIdentifierKey... +func (mdi *MetadataIdentifier) getIdentifierKey(params ...string) string { + return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.application) +} + +// getIdentifierKey... +func (mdi *MetadataIdentifier) getFilePathKey(params ...string) string { + return mdi.BaseMetadataIdentifier.getFilePathKey(mdi.application) +} diff --git a/metadata/identifier/service_metadata_identifier.go b/metadata/identifier/service_metadata_identifier.go new file mode 100644 index 0000000000000000000000000000000000000000..373df0130dd1f87e3175918bde50060c4be89616 --- /dev/null +++ b/metadata/identifier/service_metadata_identifier.go @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package identifier + +import ( + "github.com/apache/dubbo-go/common/constant" +) + +type ServiceMetadataIdentifier struct { + revision string + protocol string + BaseMetadataIdentifier +} + +// getIdentifierKey... +func (mdi *ServiceMetadataIdentifier) getIdentifierKey(params ...string) string { + return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.protocol + constant.KEY_REVISON_PREFIX + mdi.revision) +} + +// getIdentifierKey... +func (mdi *ServiceMetadataIdentifier) getFilePathKey(params ...string) string { + return mdi.BaseMetadataIdentifier.getFilePathKey(mdi.protocol + constant.KEY_REVISON_PREFIX + mdi.revision) +} diff --git a/metadata/identifier/subscribe_metadata_identifier.go b/metadata/identifier/subscribe_metadata_identifier.go new file mode 100644 index 0000000000000000000000000000000000000000..fd3a290b41e870674366943e12a396c3dae7e238 --- /dev/null +++ b/metadata/identifier/subscribe_metadata_identifier.go @@ -0,0 +1,16 @@ +package identifier + +type SubscriberMetadataIdentifier struct { + revision string + BaseMetadataIdentifier +} + +// getIdentifierKey... +func (mdi *SubscriberMetadataIdentifier) getIdentifierKey(params ...string) string { + return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.revision) +} + +// getIdentifierKey... +func (mdi *SubscriberMetadataIdentifier) getFilePathKey(params ...string) string { + return mdi.BaseMetadataIdentifier.getFilePathKey(mdi.revision) +} diff --git a/metadata/namemapping/dynamic/service_name_mapping.go b/metadata/namemapping/dynamic/service_name_mapping.go new file mode 100644 index 0000000000000000000000000000000000000000..e93c256fe093b4a3e3c431e1d012038b2bb7976b --- /dev/null +++ b/metadata/namemapping/dynamic/service_name_mapping.go @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dynamic + +import ( + "strconv" + "time" +) + +import ( + "github.com/dubbogo/gost/container/set" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/metadata" +) + +const ( + defaultGroup = config_center.DEFAULT_GROUP + slash = "/" +) + +// DynamicConfigurationServiceNameMapping is the implementation based on config center +type DynamicConfigurationServiceNameMapping struct { + dc config_center.DynamicConfiguration +} + +// Map will map the service to this application-level service +func (d *DynamicConfigurationServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error { + // metadata service is admin service, should not be mapped + if constant.METADATA_SERVICE_NAME == serviceInterface { + return perrors.New("try to map the metadata service, will be ignored") + } + + appName := config.GetApplicationConfig().Name + value := time.Now().UnixNano() + + err := d.dc.PublishConfig(appName, + d.buildGroup(serviceInterface), + strconv.FormatInt(value, 10)) + if err != nil { + return perrors.WithStack(err) + } + return nil +} + +// Get will return the application-level services. If not found, the empty set will be returned. +// if the dynamic configuration got error, the error will return +func (d *DynamicConfigurationServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) { + return d.dc.GetConfigKeysByGroup(d.buildGroup(serviceInterface)) +} + +// buildGroup will return group, now it looks like defaultGroup/serviceInterface +func (d *DynamicConfigurationServiceNameMapping) buildGroup(serviceInterface string) string { + // the issue : https://github.com/apache/dubbo/issues/4671 + // so other params are ignored and remove, including group string, version string, protocol string + return defaultGroup + slash + serviceInterface +} + +// NewServiceNameMapping will create an instance of DynamicConfigurationServiceNameMapping +func NewServiceNameMapping(dc config_center.DynamicConfiguration) metadata.ServiceNameMapping { + return &DynamicConfigurationServiceNameMapping{dc: dc} +} diff --git a/metadata/namemapping/dynamic/service_name_mapping_test.go b/metadata/namemapping/dynamic/service_name_mapping_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e3d620cd738421c256d8fd232b1afcfd425ca989 --- /dev/null +++ b/metadata/namemapping/dynamic/service_name_mapping_test.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dynamic + +import ( + "testing" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/config_center" +) + +func TestDynamicConfigurationServiceNameMapping(t *testing.T) { + + // mock data + appName := "myApp" + dc, err := (&config_center.MockDynamicConfigurationFactory{ + Content: appName, + }).GetDynamicConfiguration(nil) + config.GetApplicationConfig().Name = appName + + mapping := NewServiceNameMapping(dc) + intf := constant.METADATA_SERVICE_NAME + group := "myGroup" + version := "myVersion" + protocol := "myProtocol" + + err = mapping.Map(intf, group, version, protocol) + assert.NotNil(t, err) + intf = "MyService" + err = mapping.Map(intf, group, version, protocol) + assert.Nil(t, err) + + var result *gxset.HashSet + result, err = mapping.Get(intf, group, version, protocol) + assert.Nil(t, err) + assert.Equal(t, 1, result.Size()) + assert.True(t, result.Contains(appName)) +} diff --git a/metadata/namemapping/memory/service_name_mapping.go b/metadata/namemapping/memory/service_name_mapping.go new file mode 100644 index 0000000000000000000000000000000000000000..8a891491bdb97808b77422092a1043c1c0ffafbf --- /dev/null +++ b/metadata/namemapping/memory/service_name_mapping.go @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package memory + +import ( + gxset "github.com/dubbogo/gost/container/set" +) + +import ( + "github.com/apache/dubbo-go/config" +) + +type InMemoryServiceNameMapping struct{} + +func (i InMemoryServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error { + return nil +} + +func (i InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) { + return gxset.NewSet(config.GetApplicationConfig().Name), nil +} diff --git a/metadata/report.go b/metadata/report.go new file mode 100644 index 0000000000000000000000000000000000000000..3fcc71241411d4a8f9577bb5fb3233e67942cd52 --- /dev/null +++ b/metadata/report.go @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/metadata/definition" + "github.com/apache/dubbo-go/metadata/identifier" +) + +type MetadataReport interface { + StoreProviderMetadata(*identifier.MetadataIdentifier, *definition.ServiceDefinition) + StoreConsumeretadata(*identifier.MetadataIdentifier, map[string]string) + SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, *common.URL) + RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) + GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string + SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []*common.URL) + GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string + GetServiceDefinition(*identifier.MetadataIdentifier) +} diff --git a/metadata/report_factory.go b/metadata/report_factory.go new file mode 100644 index 0000000000000000000000000000000000000000..19b1004eee57073acec13c7f114179c47c73f145 --- /dev/null +++ b/metadata/report_factory.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + "github.com/apache/dubbo-go/common" +) + +var ( + MetadataReportInstance MetadataReport +) + +type MetadataReportFactory interface { + CreateMetadataReport(*common.URL) MetadataReport +} diff --git a/metadata/service.go b/metadata/service.go new file mode 100644 index 0000000000000000000000000000000000000000..d85703c95a57183d5c0a5b2445839e946dc6a59b --- /dev/null +++ b/metadata/service.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + "github.com/apache/dubbo-go/common" + gxset "github.com/dubbogo/gost/container/set" +) + +type MetadataService interface { + ServiceName() string + ExportURL(url *common.URL) bool + UnexportURL(url *common.URL) bool + RefreshMetadata(exportedRevision string, subscribedRevision string) bool + SubscribeURL(url *common.URL) bool + UnsubscribeURL(url *common.URL) bool + PublishServiceDefinition(url *common.URL) + + GetExportedURLs(serviceInterface string, group string, version string, protocol string) gxset.HashSet + GetServiceDefinition(interfaceName string, version string, group string) string + GetServiceDefinitionByServiceKey(serviceKey string) string +} diff --git a/metadata/service_name_mapping.go b/metadata/service_name_mapping.go new file mode 100644 index 0000000000000000000000000000000000000000..c14e8ce2e7c40d1573897dfd6ba64c16e18acac7 --- /dev/null +++ b/metadata/service_name_mapping.go @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + gxset "github.com/dubbogo/gost/container/set" +) + +// ServiceNameMapping try to build the mapping between application-level service and interface-level service. +type ServiceNameMapping interface { + + // Map will map the service to this application-level service + Map(serviceInterface string, group string, version string, protocol string) error + + // Get will return the application-level services + Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) +} diff --git a/registry/event.go b/registry/event.go index 37d863d2162cb3b9d6a9f7eba8823286eb99441c..be9f11d00bb5a70b0d787d15bcdc98471aad0a4b 100644 --- a/registry/event.go +++ b/registry/event.go @@ -32,9 +32,9 @@ func init() { rand.Seed(time.Now().UnixNano()) } -////////////////////////////////////////// +// //////////////////////////////////////// // service event -////////////////////////////////////////// +// //////////////////////////////////////// // ServiceEvent ... type ServiceEvent struct { @@ -42,6 +42,69 @@ type ServiceEvent struct { Service common.URL } +// String return the description of event func (e ServiceEvent) String() string { return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}}", e.Action, e.Service) } + +// Event is align with Event interface in Java. +// it's the top abstraction +// Align with 2.7.5 +type Event interface { + fmt.Stringer + GetSource() interface{} + GetTimestamp() time.Time +} + +// baseEvent is the base implementation of Event +// You should never use it directly +type baseEvent struct { + source interface{} + timestamp time.Time +} + +// GetSource return the source +func (b *baseEvent) GetSource() interface{} { + return b.source +} + +// GetTimestamp return the timestamp when the event is created +func (b *baseEvent) GetTimestamp() time.Time { + return b.timestamp +} + +// String return a human readable string representing this event +func (b *baseEvent) String() string { + return fmt.Sprintf("baseEvent[source = %#v]", b.source) +} + +func newBaseEvent(source interface{}) *baseEvent { + return &baseEvent{ + source: source, + timestamp: time.Now(), + } +} + +// ServiceInstancesChangedEvent represents service instances make some changing +type ServiceInstancesChangedEvent struct { + baseEvent + ServiceName string + Instances []ServiceInstance +} + +// String return the description of the event +func (s *ServiceInstancesChangedEvent) String() string { + return fmt.Sprintf("ServiceInstancesChangedEvent[source=%s]", s.ServiceName) +} + +// NewServiceInstancesChangedEvent will create the ServiceInstanceChangedEvent instance +func NewServiceInstancesChangedEvent(serviceName string, instances []ServiceInstance) *ServiceInstancesChangedEvent { + return &ServiceInstancesChangedEvent{ + baseEvent: baseEvent{ + source: serviceName, + timestamp: time.Now(), + }, + ServiceName: serviceName, + Instances: instances, + } +} diff --git a/registry/event_listener.go b/registry/event_listener.go new file mode 100644 index 0000000000000000000000000000000000000000..b8d6148442d9e10e210958dead690c4a95b33fb6 --- /dev/null +++ b/registry/event_listener.go @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +import ( + gxsort "github.com/dubbogo/gost/sort" +) + +// EventListener is an new interface used to align with dubbo 2.7.5 +// It contains the Prioritized means that the listener has its priority +type EventListener interface { + gxsort.Prioritizer + // OnEvent handle this event + OnEvent(e Event) error +} + +// ConditionalEventListener only handle the event which it can handle +type ConditionalEventListener interface { + EventListener + // Accept will make the decision whether it should handle this event + Accept(e Event) bool +} + +// TODO (implement ConditionalEventListener) +type ServiceInstancesChangedListener struct { + ServiceName string +} diff --git a/registry/nacos/base_registry.go b/registry/nacos/base_registry.go new file mode 100644 index 0000000000000000000000000000000000000000..63f4999675470853d0f48d1a22b709efdc1c9d26 --- /dev/null +++ b/registry/nacos/base_registry.go @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "net" + "strconv" + "strings" + "time" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +// baseRegistry is the parent of both interface-level registry +// and service discovery(related to application-level registry) +type nacosBaseRegistry struct { + *common.URL + namingClient naming_client.INamingClient +} + +// newBaseRegistry will create new instance +func newBaseRegistry(url *common.URL) (nacosBaseRegistry, error) { + nacosConfig, err := getNacosConfig(url) + if err != nil { + return nacosBaseRegistry{}, err + } + client, err := clients.CreateNamingClient(nacosConfig) + if err != nil { + return nacosBaseRegistry{}, err + } + registry := nacosBaseRegistry{ + URL: url, + namingClient: client, + } + return registry, nil +} + +// getNacosConfig will return the nacos config +func getNacosConfig(url *common.URL) (map[string]interface{}, error) { + if url == nil { + return nil, perrors.New("url is empty!") + } + if len(url.Location) == 0 { + return nil, perrors.New("url.location is empty!") + } + configMap := make(map[string]interface{}, 2) + + addresses := strings.Split(url.Location, ",") + serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) + for _, addr := range addresses { + ip, portStr, err := net.SplitHostPort(addr) + if err != nil { + return nil, perrors.WithMessagef(err, "split [%s] ", addr) + } + port, _ := strconv.Atoi(portStr) + serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{ + IpAddr: ip, + Port: uint64(port), + }) + } + configMap["serverConfigs"] = serverConfigs + + var clientConfig nacosConstant.ClientConfig + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + return nil, err + } + clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) + clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs + clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") + clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") + clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") + clientConfig.NotLoadCacheAtStart = true + configMap["clientConfig"] = clientConfig + + return configMap, nil +} diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 965e91e894ac61562bfd25c8f564f789afd6c8a1..a436b85064829b9f42c9dcc45545e5bf2fd2fefe 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -19,7 +19,6 @@ package nacos import ( "bytes" - "net" "strconv" "strings" "time" @@ -27,9 +26,6 @@ import ( import ( gxnet "github.com/dubbogo/gost/net" - "github.com/nacos-group/nacos-sdk-go/clients" - "github.com/nacos-group/nacos-sdk-go/clients/naming_client" - nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant" "github.com/nacos-group/nacos-sdk-go/vo" perrors "github.com/pkg/errors" ) @@ -57,64 +53,18 @@ func init() { } type nacosRegistry struct { - *common.URL - namingClient naming_client.INamingClient -} - -func getNacosConfig(url *common.URL) (map[string]interface{}, error) { - if url == nil { - return nil, perrors.New("url is empty!") - } - if len(url.Location) == 0 { - return nil, perrors.New("url.location is empty!") - } - configMap := make(map[string]interface{}, 2) - - addresses := strings.Split(url.Location, ",") - serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses)) - for _, addr := range addresses { - ip, portStr, err := net.SplitHostPort(addr) - if err != nil { - return nil, perrors.WithMessagef(err, "split [%s] ", addr) - } - port, _ := strconv.Atoi(portStr) - serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{ - IpAddr: ip, - Port: uint64(port), - }) - } - configMap["serverConfigs"] = serverConfigs - - var clientConfig nacosConstant.ClientConfig - timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) - if err != nil { - return nil, err - } - clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000) - clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs - clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "") - clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "") - clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "") - clientConfig.NotLoadCacheAtStart = true - configMap["clientConfig"] = clientConfig - - return configMap, nil + nacosBaseRegistry } +// newNacosRegistry will create an instance func newNacosRegistry(url *common.URL) (registry.Registry, error) { - nacosConfig, err := getNacosConfig(url) + base, err := newBaseRegistry(url) if err != nil { - return nil, err - } - client, err := clients.CreateNamingClient(nacosConfig) - if err != nil { - return nil, err - } - registry := nacosRegistry{ - URL: url, - namingClient: client, + return nil, perrors.WithStack(err) } - return ®istry, nil + return &nacosRegistry{ + base, + }, nil } func getCategory(url common.URL) string { diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go new file mode 100644 index 0000000000000000000000000000000000000000..8ef72c1b11dca257103c155534b68cbd522d358f --- /dev/null +++ b/registry/nacos/service_discovery.go @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "github.com/dubbogo/gost/container/set" + "github.com/dubbogo/gost/page" + "github.com/nacos-group/nacos-sdk-go/model" + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/registry" +) + +const ( + defaultGroup = "DEFAULT_GROUP" + idKey = "id" +) + +// init will put the service discovery into extension +func init() { + extension.SetServiceDiscovery(constant.NACOS_KEY, newNacosServiceDiscovery) +} + +// nacosServiceDiscovery is the implementation of service discovery based on nacos. +// There is a problem, the go client for nacos does not support the id field. +// we will use the metadata to store the id of ServiceInstance +type nacosServiceDiscovery struct { + nacosBaseRegistry + group string +} + +// Destroy will close the service discovery. +// Actually, it only marks the naming client as null and then return +func (n *nacosServiceDiscovery) Destroy() error { + n.namingClient = nil + return nil +} + +// Register will register the service to nacos +func (n *nacosServiceDiscovery) Register(instance registry.ServiceInstance) error { + ins := n.toRegisterInstance(instance) + ok, err := n.namingClient.RegisterInstance(ins) + if err != nil || !ok { + return perrors.WithMessage(err, "Could not register the instance. "+instance.GetServiceName()) + } + return nil +} + +// Update will update the information +// However, because nacos client doesn't support the update API, +// so we should unregister the instance and then register it again. +// the error handling is hard to implement +func (n *nacosServiceDiscovery) Update(instance registry.ServiceInstance) error { + // TODO(wait for nacos support) + err := n.Unregister(instance) + if err != nil { + return perrors.WithStack(err) + } + return n.Register(instance) +} + +// Unregister will unregister the instance +func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) error { + ok, err := n.namingClient.DeregisterInstance(n.toDeregisterInstance(instance)) + if err != nil || !ok { + return perrors.WithMessage(err, "Could not unregister the instance. "+instance.GetServiceName()) + } + return nil +} + +// GetDefaultPageSize will return the constant registry.DefaultPageSize +func (n *nacosServiceDiscovery) GetDefaultPageSize() int { + return registry.DefaultPageSize +} + +// GetServices will return the all services +func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet { + services, err := n.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{ + GroupName: n.group, + }) + + res := gxset.NewSet() + if err != nil { + logger.Errorf("Could not query the services: %v", err) + return res + } + + for _, e := range services { + res.Add(e.Name) + } + return res +} + +// GetInstances will return the instances of serviceName and the group +func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance { + instances, err := n.namingClient.SelectAllInstances(vo.SelectAllInstancesParam{ + ServiceName: serviceName, + GroupName: n.group, + }) + if err != nil { + logger.Errorf("Could not query the instances for service: " + serviceName + ", group: " + n.group) + return make([]registry.ServiceInstance, 0, 0) + } + res := make([]registry.ServiceInstance, 0, len(instances)) + for _, ins := range instances { + metadata := ins.Metadata + id := metadata[idKey] + + delete(metadata, idKey) + + res = append(res, ®istry.DefaultServiceInstance{ + Id: id, + ServiceName: ins.ServiceName, + Host: ins.Ip, + Port: int(ins.Port), + Enable: ins.Enable, + Healthy: ins.Healthy, + Metadata: metadata, + }) + } + + return res +} + +// GetInstancesByPage will return the instances +// Due to nacos client does not support pagination, so we have to query all instances and then return part of them +func (n *nacosServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager { + all := n.GetInstances(serviceName) + res := make([]interface{}, 0, pageSize) + // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance + for i := offset; i < len(all) && i < offset+pageSize; i++ { + res = append(res, all[i]) + } + return gxpage.New(offset, pageSize, res, len(all)) +} + +// GetHealthyInstancesByPage will return the instance +// The nacos client has an API SelectInstances, which has a parameter call HealthyOnly. +// However, the healthy parameter in this method maybe false. So we can not use that API. +// Thus, we must query all instances and then do filter +func (n *nacosServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager { + all := n.GetInstances(serviceName) + res := make([]interface{}, 0, pageSize) + // could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance + var ( + i = offset + count = 0 + ) + for i < len(all) && count < pageSize { + ins := all[i] + if ins.IsHealthy() == healthy { + res = append(res, all[i]) + count++ + } + i++ + } + return gxpage.New(offset, pageSize, res, len(all)) +} + +// GetRequestInstances will return the instances +// The nacos client doesn't have batch API, so we should query those serviceNames one by one. +func (n *nacosServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager { + res := make(map[string]gxpage.Pager, len(serviceNames)) + for _, name := range serviceNames { + res[name] = n.GetInstancesByPage(name, offset, requestedSize) + } + return res +} + +// AddListener will add a listener +func (n *nacosServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error { + return n.namingClient.Subscribe(&vo.SubscribeParam{ + ServiceName: listener.ServiceName, + SubscribeCallback: func(services []model.SubscribeService, err error) { + if err != nil { + logger.Errorf("Could not handle the subscribe notification because the err is not nil."+ + " service name: %s, err: %v", listener.ServiceName, err) + } + instances := make([]registry.ServiceInstance, 0, len(services)) + for _, service := range services { + // we won't use the nacos instance id here but use our instance id + metadata := service.Metadata + id := metadata[idKey] + + delete(metadata, idKey) + + instances = append(instances, ®istry.DefaultServiceInstance{ + Id: id, + ServiceName: service.ServiceName, + Host: service.Ip, + Port: int(service.Port), + Enable: service.Enable, + Healthy: true, + Metadata: metadata, + }) + } + + e := n.DispatchEventForInstances(listener.ServiceName, instances) + if e != nil { + logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err) + } + }, + }) +} + +// DispatchEventByServiceName will dispatch the event for the service with the service name +func (n *nacosServiceDiscovery) DispatchEventByServiceName(serviceName string) error { + return n.DispatchEventForInstances(serviceName, n.GetInstances(serviceName)) +} + +// DispatchEventForInstances will dispatch the event to those instances +func (n *nacosServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error { + return n.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances)) +} + +// DispatchEvent will dispatch the event +func (n *nacosServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error { + // TODO(waiting for event dispatcher, another task) + return nil +} + +// toRegisterInstance convert the ServiceInstance to RegisterInstanceParam +// the Ephemeral will be true +func (n *nacosServiceDiscovery) toRegisterInstance(instance registry.ServiceInstance) vo.RegisterInstanceParam { + metadata := instance.GetMetadata() + if metadata == nil { + metadata = make(map[string]string, 1) + } + metadata[idKey] = instance.GetId() + return vo.RegisterInstanceParam{ + ServiceName: instance.GetServiceName(), + Ip: instance.GetHost(), + Port: uint64(instance.GetPort()), + Metadata: metadata, + Enable: instance.IsEnable(), + Healthy: instance.IsHealthy(), + GroupName: n.group, + Ephemeral: true, + } +} + +// toDeregisterInstance will convert the ServiceInstance to DeregisterInstanceParam +func (n *nacosServiceDiscovery) toDeregisterInstance(instance registry.ServiceInstance) vo.DeregisterInstanceParam { + return vo.DeregisterInstanceParam{ + ServiceName: instance.GetServiceName(), + Ip: instance.GetHost(), + Port: uint64(instance.GetPort()), + GroupName: n.group, + } +} + +// toDeregisterInstance will create new service discovery instance +func newNacosServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) { + + base, err := newBaseRegistry(url) + if err != nil { + return nil, perrors.WithStack(err) + } + return &nacosServiceDiscovery{ + nacosBaseRegistry: base, + group: url.GetParam(constant.NACOS_GROUP, defaultGroup), + }, nil +} diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a756e8669301919d406a4bcf0e1c962cf532a5c6 --- /dev/null +++ b/registry/nacos/service_discovery_test.go @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "strconv" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/registry" +) + +func TestNacosServiceDiscovery_Destroy(t *testing.T) { + serviceDiscovry, err := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + assert.Nil(t, err) + assert.NotNil(t, serviceDiscovry) + err = serviceDiscovry.Destroy() + assert.Nil(t, err) + assert.Nil(t, serviceDiscovry.(*nacosServiceDiscovery).namingClient) +} + +func TestNacosServiceDiscovery_CRUD(t *testing.T) { + serviceName := "service-name" + id := "id" + host := "host" + port := 123 + instance := ®istry.DefaultServiceInstance{ + Id: id, + ServiceName: serviceName, + Host: host, + Port: port, + Enable: true, + Healthy: true, + Metadata: nil, + } + + // clean data + + serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + + // clean data for local test + serviceDiscovry.Unregister(®istry.DefaultServiceInstance{ + Id: id, + ServiceName: serviceName, + Host: host, + Port: port, + }) + + err := serviceDiscovry.Register(instance) + assert.Nil(t, err) + + page := serviceDiscovry.GetHealthyInstancesByPage(serviceName, 0, 10, true) + assert.NotNil(t, page) + + assert.Equal(t, 0, page.GetOffset()) + assert.Equal(t, 10, page.GetPageSize()) + assert.Equal(t, 1, page.GetDataSize()) + + instance = page.GetData()[0].(*registry.DefaultServiceInstance) + assert.NotNil(t, instance) + assert.Equal(t, id, instance.GetId()) + assert.Equal(t, host, instance.GetHost()) + assert.Equal(t, port, instance.GetPort()) + assert.Equal(t, serviceName, instance.GetServiceName()) + assert.Equal(t, 0, len(instance.GetMetadata())) + + instance.Metadata["a"] = "b" + + err = serviceDiscovry.Update(instance) + assert.Nil(t, err) + + pageMap := serviceDiscovry.GetRequestInstances([]string{serviceName}, 0, 1) + assert.Equal(t, 1, len(pageMap)) + page = pageMap[serviceName] + assert.NotNil(t, page) + assert.Equal(t, 1, len(page.GetData())) + + instance = page.GetData()[0].(*registry.DefaultServiceInstance) + v, _ := instance.Metadata["a"] + assert.Equal(t, "b", v) + + // test dispatcher event + err = serviceDiscovry.DispatchEventByServiceName(serviceName) + assert.Nil(t, err) + + // test AddListener + err = serviceDiscovry.AddListener(®istry.ServiceInstancesChangedListener{}) + assert.Nil(t, err) +} + +func TestNacosServiceDiscovery_GetDefaultPageSize(t *testing.T) { + serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) + assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize()) +} + +func mockUrl() *common.URL { + regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) + return ®url +} diff --git a/registry/service_discovery.go b/registry/service_discovery.go new file mode 100644 index 0000000000000000000000000000000000000000..a8228a4abe8ed07e3c5afda300702f778daea4ae --- /dev/null +++ b/registry/service_discovery.go @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +import ( + "fmt" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" +) + +const DefaultPageSize = 100 + +type ServiceDiscovery interface { + fmt.Stringer + + // ----------------- lifecycle ------------------- + + // Destroy will destroy the service discovery. + // If the discovery cannot be destroy, it will return an error. + Destroy() error + + // ----------------- registration ---------------- + + // Register will register an instance of ServiceInstance to registry + Register(instance ServiceInstance) error + + // Update will update the data of the instance in registry + Update(instance ServiceInstance) error + + // Unregister will unregister this instance from registry + Unregister(instance ServiceInstance) error + + // ----------------- discovery ------------------- + // GetDefaultPageSize will return the default page size + GetDefaultPageSize() int + + // GetServices will return the all service names. + GetServices() *gxset.HashSet + + // GetInstances will return all service instances with serviceName + GetInstances(serviceName string) []ServiceInstance + + // GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName + // the page will start at offset + GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager + + // GetHealthyInstancesByPage will return a page containing instances of ServiceInstance. + // The param healthy indices that the instance should be healthy or not. + // The page will start at offset + GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager + + // Batch get all instances by the specified service names + GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager + + // ----------------- event ---------------------- + // AddListener adds a new ServiceInstancesChangedListener + // see addServiceInstancesChangedListener in Java + AddListener(listener *ServiceInstancesChangedListener) error + + // DispatchEventByServiceName dispatches the ServiceInstancesChangedEvent to service instance whose name is serviceName + DispatchEventByServiceName(serviceName string) error + + // DispatchEventForInstances dispatches the ServiceInstancesChangedEvent to target instances + DispatchEventForInstances(serviceName string, instances []ServiceInstance) error + + // DispatchEvent dispatches the event + DispatchEvent(event *ServiceInstancesChangedEvent) error +} diff --git a/registry/service_instance.go b/registry/service_instance.go new file mode 100644 index 0000000000000000000000000000000000000000..2cc229ee3b056da2d9f1a1b70d3e0f5858c9da5f --- /dev/null +++ b/registry/service_instance.go @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package registry + +type ServiceInstance interface { + + // GetId will return this instance's id. It should be unique. + GetId() string + + // GetServiceName will return the serviceName + GetServiceName() string + + // GetHost will return the hostname + GetHost() string + + // GetPort will return the port. + GetPort() int + + // IsEnable will return the enable status of this instance + IsEnable() bool + + // IsHealthy will return the value represent the instance whether healthy or not + IsHealthy() bool + + // GetMetadata will return the metadata + GetMetadata() map[string]string +} + +// DefaultServiceInstance the default implementation of ServiceInstance +// or change the ServiceInstance to be struct??? +type DefaultServiceInstance struct { + Id string + ServiceName string + Host string + Port int + Enable bool + Healthy bool + Metadata map[string]string +} + +// GetId will return this instance's id. It should be unique. +func (d *DefaultServiceInstance) GetId() string { + return d.Id +} + +// GetServiceName will return the serviceName +func (d *DefaultServiceInstance) GetServiceName() string { + return d.ServiceName +} + +// GetHost will return the hostname +func (d *DefaultServiceInstance) GetHost() string { + return d.Host +} + +// GetPort will return the port. +func (d *DefaultServiceInstance) GetPort() int { + return d.Port +} + +// IsEnable will return the enable status of this instance +func (d *DefaultServiceInstance) IsEnable() bool { + return d.Enable +} + +// IsHealthy will return the value represent the instance whether healthy or not +func (d *DefaultServiceInstance) IsHealthy() bool { + return d.Healthy +} + +// GetMetadata will return the metadata +func (d *DefaultServiceInstance) GetMetadata() map[string]string { + return d.Metadata +} diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index c788bc4c1157bc0c8e10fa5bb723cc0cc56f791d..bd1da547766abb12dc742234787262212e3db314 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -400,8 +400,16 @@ func (z *ZookeeperClient) Close() { logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs) } -// Create ... +// Create will create the node recursively, which means that if the parent node is absent, +// it will create parent node first. +// And the value for the basePath is "" func (z *ZookeeperClient) Create(basePath string) error { + return z.CreateWithValue(basePath, []byte("")) +} + +// CreateWithValue will create the node recursively, which means that if the parent node is absent, +// it will create parent node first. +func (z *ZookeeperClient) CreateWithValue(basePath string, value []byte) error { var ( err error tmpPath string @@ -415,7 +423,7 @@ func (z *ZookeeperClient) Create(basePath string) error { conn := z.Conn z.Unlock() if conn != nil { - _, err = conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll)) + _, err = conn.Create(tmpPath, value, 0, zk.WorldACL(zk.PermAll)) } if err != nil {