Skip to content
Snippets Groups Projects
Commit 5ffdb542 authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #250 from zouyx/feature/apollo

Ftr: config center apollo
parents ab4d0468 2f0e0361
No related branches found
No related tags found
No related merge requests found
......@@ -100,6 +100,9 @@ const (
const (
CONFIG_NAMESPACE_KEY = "config.namespace"
CONFIG_GROUP_KEY = "config.group"
CONFIG_CLUSTER_KEY = "config.cluster"
CONFIG_CHECK_KEY = "config.check"
CONFIG_TIMEOUT_KET = "config.timeout"
CONFIG_VERSION_KEY = "configVersion"
COMPATIBLE_CONFIG_KEY = "compatible_config"
......
......@@ -47,7 +47,7 @@ type BaseConfig struct {
}
func (c *BaseConfig) startConfigCenter(ctx context.Context) error {
url, err := common.NewURL(ctx, c.ConfigCenterConfig.Address, common.WithProtocol(c.ConfigCenterConfig.Protocol))
url, err := common.NewURL(ctx, c.ConfigCenterConfig.Address, common.WithProtocol(c.ConfigCenterConfig.Protocol), common.WithParams(c.ConfigCenterConfig.GetUrlMap()))
if err != nil {
return err
}
......
......@@ -29,6 +29,7 @@ import (
"github.com/apache/dubbo-go/common/config"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/apollo"
)
func Test_refresh(t *testing.T) {
......
......@@ -19,6 +19,7 @@ package config
import (
"context"
"net/url"
"time"
)
......@@ -26,6 +27,10 @@ import (
"github.com/creasty/defaults"
)
import (
"github.com/apache/dubbo-go/common/constant"
)
type ConfigCenterConfig struct {
context context.Context
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty"`
......@@ -35,6 +40,7 @@ type ConfigCenterConfig struct {
Username string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
ConfigFile string `default:"dubbo.properties" yaml:"config_file" json:"config_file,omitempty"`
Namespace string `default:"dubbo.properties" yaml:"namespace" json:"namespace,omitempty"`
AppConfigFile string `default:"dubbo.properties" yaml:"app_config_file" json:"app_config_file,omitempty"`
TimeoutStr string `yaml:"timeout" json:"timeout,omitempty"`
timeout time.Duration
......@@ -50,3 +56,11 @@ func (c *ConfigCenterConfig) UnmarshalYAML(unmarshal func(interface{}) error) er
}
return nil
}
func (c *ConfigCenterConfig) GetUrlMap() url.Values {
urlMap := url.Values{}
urlMap.Set(constant.CONFIG_NAMESPACE_KEY, c.Namespace)
urlMap.Set(constant.CONFIG_GROUP_KEY, c.Group)
urlMap.Set(constant.CONFIG_CLUSTER_KEY, c.Cluster)
return urlMap
}
filter=
request_timeout=100ms
connect_timeout=100ms
check=true
application.organization=ikurento.com
application.name=BDTService
application.module=dubbogo user-info client
application.version=0.0.1
application.owner=ZX
application.environment=dev
registries.hangzhouzk.protocol=zookeeper
registries.hangzhouzk.timeout=3s
registries.hangzhouzk.address=127.0.0.1:2181
registries.hangzhouzk.username=
registries.hangzhouzk.password=
registries.shanghaizk.protocol=zookeeper
registries.shanghaizk.timeout=3s
registries.shanghaizk.address=127.0.0.1:2182
registries.shanghaizk.username=
registries.shanghaizk.password=
references.UserProvider.registry=hangzhouzk,shanghaizk
references.UserProvider.filter=
references.UserProvider.version=1.0
references.UserProvider.group=as
references.UserProvider.interface=com.ikurento.user.UserProvider
references.UserProvider.url=dubbo://127.0.0.1:20000/UserProvider
references.UserProvider.cluster=failover
references.UserProvider.methods[0].name=GetUser
references.UserProvider.methods[0].retries=3
references.UserProvider.params.serviceid=soa.com.ikurento.user.UserProvider
references.UserProvider.params.forks=5
protocol_conf.dubbo.reconnect_interval=0
protocol_conf.dubbo.connection_number=2
protocol_conf.dubbo.heartbeat_period=5s
protocol_conf.dubbo.session_timeout=20s
protocol_conf.dubbo.pool_size=64
protocol_conf.dubbo.pool_ttl=600
protocol_conf.dubbo.gr_pool_size=1200
protocol_conf.dubbo.queue_len=64
protocol_conf.dubbo.queue_number=60
protocol_conf.dubbo.getty_session_param.compress_encoding=false
protocol_conf.dubbo.getty_session_param.tcp_no_delay=true
protocol_conf.dubbo.getty_session_param.tcp_keep_alive=true
protocol_conf.dubbo.getty_session_param.keep_alive_period=120s
protocol_conf.dubbo.getty_session_param.tcp_r_buf_size=262144
protocol_conf.dubbo.getty_session_param.tcp_w_buf_size=65536
protocol_conf.dubbo.getty_session_param.pkg_wq_size=512
protocol_conf.dubbo.getty_session_param.tcp_read_timeout=1s
protocol_conf.dubbo.getty_session_param.tcp_write_timeout=5s
protocol_conf.dubbo.getty_session_param.wait_timeout=1s
protocol_conf.dubbo.getty_session_param.max_msg_len=1024
protocol_conf.dubbo.getty_session_param.session_name=client
\ No newline at end of file
# use apollo config center for fetch config file
# default config file namespace is dubbo.properties
# consumer config file Ref:consumer_config.properties
# provider config file Ref:provider_config.properties
config_center:
protocol: apollo
address: 106.12.25.204:8080
group: testApplication_yang
cluster: dev
# 'namespace' can be used for router rule , default value is dubbo.properties
# but if you want to change router rule config file ,just open this item
# namespace: governance.properties
# 'config_file' is not necessary ,default : dubbo.properties
# but if you want to change config file ,just open this item
# config_file: mockDubbog.properties
# application config required
application:
organization: "ikurento.com"
name: "BDTService"
module: "dubbogo user-info server"
version: "0.0.1"
owner: "ZX"
environment: "dev"
\ No newline at end of file
filter=
application.organization=ikurento.com
application.name=BDTService
application.module=dubbogo user-info server
application.version=0.0.1
application.owner=ZX
application.environment=dev
registries.hangzhouzk.protocol=zookeeper
registries.hangzhouzk.timeout=3s
registries.hangzhouzk.address=127.0.0.1:2181
registries.hangzhouzk.username=
registries.hangzhouzk.password=
registries.shanghaizk.protocol=zookeeper
registries.shanghaizk.timeout=3s
registries.shanghaizk.address=127.0.0.1:2182
registries.shanghaizk.username=
registries.shanghaizk.password=
services.UserProvider.registry=hangzhouzk,shanghaizk
services.UserProvider.filter=
services.UserProvider.tps.limiter=default
services.UserProvider.tps.limit.interval=60000
services.UserProvider.tps.limit.rate=200
services.UserProvider.tps.limit.strategy=slidingWindow
services.UserProvider.tps.limit.rejected.handler=default
services.UserProvider.execute.limit=200
services.UserProvider.execute.limit.rejected.handler=default
services.UserProvider.protocol=dubbo
services.UserProvider.interface=com.ikurento.user.UserProvider
services.UserProvider.loadbalance=random
services.UserProvider.version=1.0
services.UserProvider.group=as
services.UserProvider.warmup=100
services.UserProvider.cluster=failover
services.UserProvider.methods[0].name=GetUser
services.UserProvider.methods[0].retries=1
services.UserProvider.methods[0].loadbalance=random
services.UserProvider.methods[0].execute.limit=200
services.UserProvider.methods[0].execute.limit.rejected.handler=default
protocols.dubbo.name=dubbo
protocols.dubbo.ip=127.0.0.1
protocols.dubbo.port=20000
protocol_conf.dubbo.session_number=700
protocol_conf.dubbo.session_timeout=20s
protocol_conf.dubbo.gr_pool_size=120
protocol_conf.dubbo.queue_len=64
protocol_conf.dubbo.queue_number=6
protocol_conf.dubbo.getty_session_param.compress_encoding=false
protocol_conf.dubbo.getty_session_param.tcp_no_delay=true
protocol_conf.dubbo.getty_session_param.tcp_keep_alive=true
protocol_conf.dubbo.getty_session_param.keep_alive_period=120s
protocol_conf.dubbo.getty_session_param.tcp_r_buf_size=262144
protocol_conf.dubbo.getty_session_param.tcp_w_buf_size=65536
protocol_conf.dubbo.getty_session_param.pkg_wq_size=512
protocol_conf.dubbo.getty_session_param.tcp_read_timeout=1s
protocol_conf.dubbo.getty_session_param.tcp_write_timeout=5s
protocol_conf.dubbo.getty_session_param.wait_timeout=1s
protocol_conf.dubbo.getty_session_param.max_msg_len=1024
protocol_conf.dubbo.getty_session_param.session_name=server
\ No newline at end of file
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apollo
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
. "github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
func init() {
extension.SetConfigCenterFactory("apollo", createDynamicConfigurationFactory)
}
func createDynamicConfigurationFactory() DynamicConfigurationFactory {
return &apolloConfigurationFactory{}
}
type apolloConfigurationFactory struct{}
func (f *apolloConfigurationFactory) GetDynamicConfiguration(url *common.URL) (DynamicConfiguration, error) {
dynamicConfiguration, err := newApolloConfiguration(url)
if err != nil {
return nil, err
}
dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{})
return dynamicConfiguration, err
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apollo
import (
"fmt"
"regexp"
"strings"
"sync"
)
import (
"github.com/pkg/errors"
"github.com/zouyx/agollo"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
. "github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
"github.com/apache/dubbo-go/remoting"
)
const (
apolloProtocolPrefix = "http://"
apolloConfigFormat = "%s.%s"
)
type apolloConfiguration struct {
url *common.URL
listeners sync.Map
appConf *agollo.AppConfig
parser parser.ConfigurationParser
}
func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) {
c := &apolloConfiguration{
url: url,
}
configAddr := c.getAddressWithProtocolPrefix(url)
configCluster := url.GetParam(constant.CONFIG_CLUSTER_KEY, "")
appId := url.GetParam(constant.CONFIG_GROUP_KEY, DEFAULT_GROUP)
namespaces := url.GetParam(constant.CONFIG_NAMESPACE_KEY, getProperties(DEFAULT_GROUP))
c.appConf = &agollo.AppConfig{
AppId: appId,
Cluster: configCluster,
NamespaceName: namespaces,
Ip: configAddr,
}
agollo.InitCustomConfig(func() (*agollo.AppConfig, error) {
return c.appConf, nil
})
return c, agollo.Start()
}
func getChangeType(change agollo.ConfigChangeType) remoting.EventType {
switch change {
case agollo.ADDED:
return remoting.EventTypeAdd
case agollo.DELETED:
return remoting.EventTypeDel
default:
return remoting.EventTypeUpdate
}
}
func (c *apolloConfiguration) AddListener(key string, listener ConfigurationListener, opts ...Option) {
k := &Options{}
for _, opt := range opts {
opt(k)
}
key = k.Group + key
l, _ := c.listeners.LoadOrStore(key, NewApolloListener())
l.(*apolloListener).AddListener(listener)
}
func (c *apolloConfiguration) RemoveListener(key string, listener ConfigurationListener, opts ...Option) {
k := &Options{}
for _, opt := range opts {
opt(k)
}
key = k.Group + key
l, ok := c.listeners.Load(key)
if ok {
l.(*apolloListener).RemoveListener(listener)
}
}
func getProperties(namespace string) string {
return getNamespaceName(namespace, agollo.Properties)
}
func getNamespaceName(namespace string, configFileFormat agollo.ConfigFileFormat) string {
return fmt.Sprintf(apolloConfigFormat, namespace, configFileFormat)
}
func (c *apolloConfiguration) GetConfig(key string, opts ...Option) (string, error) {
k := &Options{}
for _, opt := range opts {
opt(k)
}
/**
* when group is not null, we are getting startup configs(config file) from Config Center, for example:
* key=dubbo.propertie
*/
if len(k.Group) != 0 {
config := agollo.GetConfig(key)
if config == nil {
return "", errors.New(fmt.Sprintf("nothiing in namespace:%s ", key))
}
return config.GetContent(agollo.Properties), nil
}
/**
* when group is null, we are fetching governance rules(config item) from Config Center, for example:
* namespace=use default, key =application.organization
*/
config := agollo.GetConfig(c.appConf.NamespaceName)
if config == nil {
return "", errors.New(fmt.Sprintf("nothiing in namespace:%s ", key))
}
return config.GetStringValue(key, ""), nil
}
func (c *apolloConfiguration) getAddressWithProtocolPrefix(url *common.URL) string {
address := url.Location
converted := address
if len(address) != 0 {
reg := regexp.MustCompile("\\s+")
address = reg.ReplaceAllString(address, "")
parts := strings.Split(address, ",")
addrs := make([]string, 0)
for _, part := range parts {
addr := part
if !strings.HasPrefix(part, apolloProtocolPrefix) {
addr = apolloProtocolPrefix + part
}
addrs = append(addrs, addr)
}
converted = strings.Join(addrs, ",")
}
return converted
}
func (c *apolloConfiguration) Parser() parser.ConfigurationParser {
return c.parser
}
func (c *apolloConfiguration) SetParser(p parser.ConfigurationParser) {
c.parser = p
}
func (c *apolloConfiguration) GetConfigs(key string, opts ...Option) (string, error) {
return c.GetConfig(key, opts...)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package apollo
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
"github.com/apache/dubbo-go/remoting"
)
const (
mockAppId = "testApplication_yang"
mockCluster = "dev"
mockNamespace = "mockDubbog.properties"
mockNotifyRes = `[{
"namespaceName": "mockDubbog.properties",
"notificationId": 53050,
"messages": {
"details": {
"testApplication_yang+default+mockDubbog": 53050
}
}
}]`
mockServiceConfigRes = `[{
"appName": "APOLLO-CONFIGSERVICE",
"instanceId": "instance-300408ep:apollo-configservice:8080",
"homepageUrl": "http://localhost:8080"
}]`
)
var (
mockConfigRes = `{
"appId": "testApplication_yang",
"cluster": "default",
"namespaceName": "mockDubbog.properties",
"configurations": {
"registries.hangzhouzk.username": "",
"application.owner": "ZX",
"registries.shanghaizk.username": "",
"protocols.dubbo.ip": "127.0.0.1",
"protocol_conf.dubbo.getty_session_param.tcp_write_timeout": "5s",
"services.UserProvider.cluster": "failover",
"application.module": "dubbogo user-info server",
"services.UserProvider.interface": "com.ikurento.user.UserProvider",
"protocol_conf.dubbo.getty_session_param.compress_encoding": "false",
"registries.shanghaizk.address": "127.0.0.1:2182",
"protocol_conf.dubbo.session_timeout": "20s",
"registries.shanghaizk.timeout": "3s",
"protocol_conf.dubbo.getty_session_param.keep_alive_period": "120s",
"services.UserProvider.warmup": "100",
"application.version": "0.0.1",
"registries.hangzhouzk.protocol": "zookeeper",
"registries.hangzhouzk.password": "",
"protocols.dubbo.name": "dubbo",
"protocol_conf.dubbo.getty_session_param.wait_timeout": "1s",
"protocols.dubbo.port": "20000",
"application_config.owner": "demo",
"application_config.name": "demo",
"application_config.version": "0.0.1",
"application_config.environment": "dev",
"protocol_conf.dubbo.getty_session_param.session_name": "server",
"application.name": "BDTService",
"registries.hangzhouzk.timeout": "3s",
"protocol_conf.dubbo.getty_session_param.tcp_read_timeout": "1s",
"services.UserProvider.loadbalance": "random",
"protocol_conf.dubbo.session_number": "700",
"protocol_conf.dubbo.getty_session_param.max_msg_len": "1024",
"services.UserProvider.registry": "hangzhouzk",
"application_config.module": "demo",
"services.UserProvider.methods[0].name": "GetUser",
"protocol_conf.dubbo.getty_session_param.tcp_no_delay": "true",
"services.UserProvider.methods[0].retries": "1",
"protocol_conf.dubbo.getty_session_param.tcp_w_buf_size": "65536",
"protocol_conf.dubbo.getty_session_param.tcp_r_buf_size": "262144",
"registries.shanghaizk.password": "",
"application_config.organization": "demo",
"registries.shanghaizk.protocol": "zookeeper",
"protocol_conf.dubbo.getty_session_param.tcp_keep_alive": "true",
"registries.hangzhouzk.address": "127.0.0.1:2181",
"application.environment": "dev",
"services.UserProvider.protocol": "dubbo",
"application.organization": "ikurento.com",
"protocol_conf.dubbo.getty_session_param.pkg_wq_size": "512",
"services.UserProvider.methods[0].loadbalance": "random"
},
"releaseKey": "20191104105242-0f13805d89f834a4"
}`
)
func initApollo() *httptest.Server {
handlerMap := make(map[string]func(http.ResponseWriter, *http.Request), 1)
handlerMap[mockNamespace] = configResponse
return runMockConfigServer(handlerMap, notifyResponse)
}
func configResponse(rw http.ResponseWriter, req *http.Request) {
result := fmt.Sprintf(mockConfigRes)
fmt.Fprintf(rw, "%s", result)
}
func notifyResponse(rw http.ResponseWriter, req *http.Request) {
result := fmt.Sprintf(mockNotifyRes)
fmt.Fprintf(rw, "%s", result)
}
func serviceConfigResponse(rw http.ResponseWriter, req *http.Request) {
result := fmt.Sprintf(mockServiceConfigRes)
fmt.Fprintf(rw, "%s", result)
}
//run mock config server
func runMockConfigServer(handlerMap map[string]func(http.ResponseWriter, *http.Request),
notifyHandler func(http.ResponseWriter, *http.Request)) *httptest.Server {
uriHandlerMap := make(map[string]func(http.ResponseWriter, *http.Request), 0)
for namespace, handler := range handlerMap {
uri := fmt.Sprintf("/configs/%s/%s/%s", mockAppId, mockCluster, namespace)
uriHandlerMap[uri] = handler
}
uriHandlerMap["/notifications/v2"] = notifyHandler
uriHandlerMap["/services/config"] = serviceConfigResponse
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
uri := r.RequestURI
for path, handler := range uriHandlerMap {
if strings.HasPrefix(uri, path) {
handler(w, r)
break
}
}
}))
return ts
}
func Test_GetConfig(t *testing.T) {
configuration := initMockApollo(t)
configs, err := configuration.GetConfig(mockNamespace, config_center.WithGroup("dubbo"))
assert.NoError(t, err)
configuration.SetParser(&parser.DefaultConfigurationParser{})
mapContent, err := configuration.Parser().Parse(configs)
assert.NoError(t, err)
assert.Equal(t, "ikurento.com", mapContent["application.organization"])
}
func Test_GetConfigItem(t *testing.T) {
configuration := initMockApollo(t)
configs, err := configuration.GetConfig("application.organization")
assert.NoError(t, err)
configuration.SetParser(&parser.DefaultConfigurationParser{})
assert.NoError(t, err)
assert.Equal(t, "ikurento.com", configs)
}
func initMockApollo(t *testing.T) *apolloConfiguration {
c := &config.BaseConfig{ConfigCenterConfig: &config.ConfigCenterConfig{
Protocol: "apollo",
Address: "106.12.25.204:8080",
Group: "testApplication_yang",
Cluster: "dev",
Namespace: "mockDubbog.properties",
}}
apollo := initApollo()
apolloUrl := strings.ReplaceAll(apollo.URL, "http", "apollo")
url, err := common.NewURL(context.TODO(), apolloUrl, common.WithParams(c.ConfigCenterConfig.GetUrlMap()))
assert.NoError(t, err)
configuration, err := newApolloConfiguration(&url)
assert.NoError(t, err)
return configuration
}
func TestAddListener(t *testing.T) {
listener := &apolloDataListener{}
listener.wg.Add(1)
apollo := initMockApollo(t)
mockConfigRes = `{
"appId": "testApplication_yang",
"cluster": "default",
"namespaceName": "mockDubbog.properties",
"configurations": {
"registries.hangzhouzk.username": "11111"
},
"releaseKey": "20191104105242-0f13805d89f834a4"
}`
apollo.AddListener(mockNamespace, listener)
listener.wg.Wait()
assert.Equal(t, "registries.hangzhouzk.username", listener.event)
assert.Greater(t, listener.count, 0)
}
func TestRemoveListener(t *testing.T) {
listener := &apolloDataListener{}
apollo := initMockApollo(t)
mockConfigRes = `{
"appId": "testApplication_yang",
"cluster": "default",
"namespaceName": "mockDubbog.properties",
"configurations": {
"registries.hangzhouzk.username": "11111"
},
"releaseKey": "20191104105242-0f13805d89f834a4"
}`
apollo.AddListener(mockNamespace, listener)
apollo.RemoveListener(mockNamespace, listener)
assert.Equal(t, "", listener.event)
listenerCount := 0
apollo.listeners.Range(func(key, value interface{}) bool {
apolloListener := value.(*apolloListener)
for e := range apolloListener.listeners {
fmt.Println(e)
listenerCount++
}
return true
})
assert.Equal(t, listenerCount, 0)
assert.Equal(t, listener.count, 0)
}
type apolloDataListener struct {
wg sync.WaitGroup
count int
event string
}
func (l *apolloDataListener) Process(configType *config_center.ConfigChangeEvent) {
if configType.ConfigType != remoting.EventTypeUpdate {
return
}
l.wg.Done()
l.count++
l.event = configType.Key
}
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apollo
import (
"github.com/zouyx/agollo"
)
import (
"github.com/apache/dubbo-go/config_center"
)
type apolloListener struct {
listeners map[config_center.ConfigurationListener]struct{}
}
func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) {
for key, change := range changeEvent.Changes {
for listener := range a.listeners {
listener.Process(&config_center.ConfigChangeEvent{
ConfigType: getChangeType(change.ChangeType),
Key: key,
Value: change.NewValue,
})
}
}
}
func NewApolloListener() *apolloListener {
return &apolloListener{
listeners: make(map[config_center.ConfigurationListener]struct{}, 0),
}
}
func (al *apolloListener) AddListener(l config_center.ConfigurationListener) {
if _, ok := al.listeners[l]; !ok {
al.listeners[l] = struct{}{}
agollo.AddChangeListener(al)
}
}
func (al *apolloListener) RemoveListener(l config_center.ConfigurationListener) {
delete(al.listeners, l)
}
......@@ -17,10 +17,6 @@
package zookeeper
import (
"sync"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
......@@ -35,14 +31,8 @@ func init() {
type zookeeperDynamicConfigurationFactory struct {
}
var once sync.Once
var dynamicConfiguration *zookeeperDynamicConfiguration
func (f *zookeeperDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) {
var err error
once.Do(func() {
dynamicConfiguration, err = newZookeeperDynamicConfiguration(url)
})
dynamicConfiguration, err := newZookeeperDynamicConfiguration(url)
if err != nil {
return nil, err
}
......
......@@ -44,6 +44,7 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8
go.etcd.io/bbolt v1.3.3 // indirect
go.etcd.io/etcd v3.3.13+incompatible
go.uber.org/atomic v1.4.0
......
......@@ -453,6 +453,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto=
github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ=
github.com/tent/http-link-go v0.0.0-20130702225549-ac974c61c2f9/go.mod h1:RHkNRtSLfOK7qBTHaeSX1D6BNpI3qw7NTxsmNr4RvN8=
github.com/tevid/gohamcrest v1.1.1/go.mod h1:3UvtWlqm8j5JbwYZh80D/PVBt0mJ1eJiYgZMibh0H/k=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D4Jj5eZ41Zm3IH/J8OElK1Qtd7tVKAwLk=
......@@ -463,6 +464,8 @@ github.com/vmware/govmomi v0.18.0 h1:f7QxSmP7meCtoAmiKZogvVbLInT+CZx6Px6K5rYsJZo
github.com/vmware/govmomi v0.18.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8 h1:k8TV7Gz7cpWpOw/dz71fx8cCZdWoPuckHJ/wkJl+meg=
github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8/go.mod h1:S1cAa98KMFv4Sa8SbJ6ZtvOmf0VlgH0QJ1gXI0lBfBY=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v3.3.13+incompatible h1:jCejD5EMnlGxFvcGRyEV4VGlENZc7oPQX6o0t7n3xbw=
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment