From e102328b4e849a9274a0497f1b464d5d382df616 Mon Sep 17 00:00:00 2001 From: Joe Zou <yixian.zou@gmail.com> Date: Tue, 5 Nov 2019 15:30:30 +0800 Subject: [PATCH] update apollo --- config_center/apollo/impl.go | 28 ++------------- config_center/apollo/impl_test.go | 60 +++++++++++++++++++++++++++++-- config_center/apollo/listener.go | 14 ++++++++ 3 files changed, 74 insertions(+), 28 deletions(-) diff --git a/config_center/apollo/impl.go b/config_center/apollo/impl.go index a8c2dc9d5..545294e19 100644 --- a/config_center/apollo/impl.go +++ b/config_center/apollo/impl.go @@ -69,31 +69,7 @@ func newApolloDynamicConfiguration(url *common.URL) (*apolloDynamicConfiguration return c, agollo.Start() } -type apolloChangeListener struct { - c *apolloDynamicConfiguration -} - -func (a *apolloChangeListener) OnChange(event *agollo.ChangeEvent) { - for name, change := range event.Changes { - cfgChangeEvent := &config_center.ConfigChangeEvent{ - Key: name, - Value: change.NewValue, - ConfigType: a.c.getChangeType(change.ChangeType), - } - a.c.listeners.Range(func(key, value interface{}) bool { - for listener, _ := range value.(apolloListener).listeners { - listener.Process(cfgChangeEvent) - } - return true - }) - } -} - -func (c *apolloDynamicConfiguration) start() { - agollo.AddChangeListener(&apolloChangeListener{}) -} - -func (c *apolloDynamicConfiguration) getChangeType(change agollo.ConfigChangeType) remoting.EventType { +func getChangeType(change agollo.ConfigChangeType) remoting.EventType { switch change { case agollo.ADDED: return remoting.EventTypeAdd @@ -102,7 +78,7 @@ func (c *apolloDynamicConfiguration) getChangeType(change agollo.ConfigChangeTyp case agollo.MODIFIED: return remoting.EventTypeUpdate default: - panic("unknow type: " + strconv.Itoa(int(change))) + panic("unknown type: " + strconv.Itoa(int(change))) } } diff --git a/config_center/apollo/impl_test.go b/config_center/apollo/impl_test.go index 41846ae14..4cb740166 100644 --- a/config_center/apollo/impl_test.go +++ b/config_center/apollo/impl_test.go @@ -23,10 +23,12 @@ import ( "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/config_center/parser" + "github.com/apache/dubbo-go/remoting" "github.com/stretchr/testify/assert" "net/http" "net/http/httptest" "strings" + "sync" "testing" ) @@ -34,11 +36,13 @@ const ( mockAppId = "testApplication_yang" mockCluster = "dev" mockNamespace = "mockDubbog.properties" - mockConfigRes = `{"appId":"testApplication_yang","cluster":"default","namespaceName":"mockDubbog.properties","configurations":{"registries.hangzhouzk.username":"","application.owner":"ZX","registries.shanghaizk.username":"","protocols.dubbo.ip":"127.0.0.1","protocol_conf.dubbo.getty_session_param.tcp_write_timeout":"5s","services.UserProvider.cluster":"failover","application.module":"dubbogo user-info server","services.UserProvider.interface":"com.ikurento.user.UserProvider","protocol_conf.dubbo.getty_session_param.compress_encoding":"false","registries.shanghaizk.address":"127.0.0.1:2182","protocol_conf.dubbo.session_timeout":"20s","registries.shanghaizk.timeout":"3s","protocol_conf.dubbo.getty_session_param.keep_alive_period":"120s","services.UserProvider.warmup":"100","application.version":"0.0.1","registries.hangzhouzk.protocol":"zookeeper","registries.hangzhouzk.password":"","protocols.dubbo.name":"dubbo","protocol_conf.dubbo.getty_session_param.wait_timeout":"1s","protocols.dubbo.port":"20000","application_config.owner":"demo","application_config.name":"demo","application_config.version":"0.0.1","application_config.environment":"dev","protocol_conf.dubbo.getty_session_param.session_name":"server","application.name":"BDTService","registries.hangzhouzk.timeout":"3s","protocol_conf.dubbo.getty_session_param.tcp_read_timeout":"1s","services.UserProvider.loadbalance":"random","protocol_conf.dubbo.session_number":"700","protocol_conf.dubbo.getty_session_param.max_msg_len":"1024","services.UserProvider.registry":"hangzhouzk","application_config.module":"demo","services.UserProvider.methods[0].name":"GetUser","protocol_conf.dubbo.getty_session_param.tcp_no_delay":"true","services.UserProvider.methods[0].retries":"1","protocol_conf.dubbo.getty_session_param.tcp_w_buf_size":"65536","protocol_conf.dubbo.getty_session_param.tcp_r_buf_size":"262144","registries.shanghaizk.password":"","application_config.organization":"demo","registries.shanghaizk.protocol":"zookeeper","protocol_conf.dubbo.getty_session_param.tcp_keep_alive":"true","registries.hangzhouzk.address":"127.0.0.1:2181","application.environment":"dev","services.UserProvider.protocol":"dubbo","application.organization":"ikurento.com","protocol_conf.dubbo.getty_session_param.pkg_wq_size":"512","services.UserProvider.methods[0].loadbalance":"random"},"releaseKey":"20191104105242-0f13805d89f834a4"}` mockNotifyRes = `[{"namespaceName":"mockDubbog","notificationId":53050,"messages":{"details":{"testApplication_yang+default+mockDubbog":53050}}}]` mockServiceConfigRes = `[{"appName":"APOLLO-CONFIGSERVICE","instanceId":"instance-300408ep:apollo-configservice:8080","homepageUrl":"http://localhost:8080"}]` - apolloUrl = "apollo://%s" +) + +var ( + mockConfigRes = `{"appId":"testApplication_yang","cluster":"default","namespaceName":"mockDubbog.properties","configurations":{"registries.hangzhouzk.username":"","application.owner":"ZX","registries.shanghaizk.username":"","protocols.dubbo.ip":"127.0.0.1","protocol_conf.dubbo.getty_session_param.tcp_write_timeout":"5s","services.UserProvider.cluster":"failover","application.module":"dubbogo user-info server","services.UserProvider.interface":"com.ikurento.user.UserProvider","protocol_conf.dubbo.getty_session_param.compress_encoding":"false","registries.shanghaizk.address":"127.0.0.1:2182","protocol_conf.dubbo.session_timeout":"20s","registries.shanghaizk.timeout":"3s","protocol_conf.dubbo.getty_session_param.keep_alive_period":"120s","services.UserProvider.warmup":"100","application.version":"0.0.1","registries.hangzhouzk.protocol":"zookeeper","registries.hangzhouzk.password":"","protocols.dubbo.name":"dubbo","protocol_conf.dubbo.getty_session_param.wait_timeout":"1s","protocols.dubbo.port":"20000","application_config.owner":"demo","application_config.name":"demo","application_config.version":"0.0.1","application_config.environment":"dev","protocol_conf.dubbo.getty_session_param.session_name":"server","application.name":"BDTService","registries.hangzhouzk.timeout":"3s","protocol_conf.dubbo.getty_session_param.tcp_read_timeout":"1s","services.UserProvider.loadbalance":"random","protocol_conf.dubbo.session_number":"700","protocol_conf.dubbo.getty_session_param.max_msg_len":"1024","services.UserProvider.registry":"hangzhouzk","application_config.module":"demo","services.UserProvider.methods[0].name":"GetUser","protocol_conf.dubbo.getty_session_param.tcp_no_delay":"true","services.UserProvider.methods[0].retries":"1","protocol_conf.dubbo.getty_session_param.tcp_w_buf_size":"65536","protocol_conf.dubbo.getty_session_param.tcp_r_buf_size":"262144","registries.shanghaizk.password":"","application_config.organization":"demo","registries.shanghaizk.protocol":"zookeeper","protocol_conf.dubbo.getty_session_param.tcp_keep_alive":"true","registries.hangzhouzk.address":"127.0.0.1:2181","application.environment":"dev","services.UserProvider.protocol":"dubbo","application.organization":"ikurento.com","protocol_conf.dubbo.getty_session_param.pkg_wq_size":"512","services.UserProvider.methods[0].loadbalance":"random"},"releaseKey":"20191104105242-0f13805d89f834a4"}` ) func initApollo() *httptest.Server { @@ -88,6 +92,10 @@ func runMockConfigServer(handlerMap map[string]func(http.ResponseWriter, *http.R } func Test_GetConfig(t *testing.T) { + initMockApollo(t) +} + +func initMockApollo(t *testing.T) *apolloDynamicConfiguration { c := &config.BaseConfig{ConfigCenterConfig: &config.ConfigCenterConfig{ Protocol: "apollo", Address: "106.12.25.204:8080", @@ -107,4 +115,52 @@ func Test_GetConfig(t *testing.T) { mapContent, err := configuration.Parser().Parse(configs) assert.NoError(t, err) assert.Equal(t, "ikurento.com", mapContent["application.organization"]) + return configuration +} + +func TestAddListener(t *testing.T) { + listener := &apolloDataListener{} + listener.wg.Add(1) + apollo := initMockApollo(t) + mockConfigRes = `{"appId":"testApplication_yang","cluster":"default","namespaceName":"mockDubbog.properties","configurations":{"registries.hangzhouzk.username":"11111"},"releaseKey":"20191104105242-0f13805d89f834a4"}` + apollo.AddListener(mockNamespace, listener) + listener.wg.Wait() + assert.Equal(t, "registries.hangzhouzk.username", listener.event) + assert.Greater(t, listener.count, 0) +} + +func TestRemoveListener(t *testing.T) { + listener := &apolloDataListener{} + apollo := initMockApollo(t) + mockConfigRes = `{"appId":"testApplication_yang","cluster":"default","namespaceName":"mockDubbog.properties","configurations":{"registries.hangzhouzk.username":"11111"},"releaseKey":"20191104105242-0f13805d89f834a4"}` + apollo.AddListener(mockNamespace, listener) + apollo.RemoveListener(mockNamespace, listener) + assert.Equal(t, "", listener.event) + listenerCount := 0 + apollo.listeners.Range(func(key, value interface{}) bool { + apolloListener := value.(*apolloListener) + for e := range apolloListener.listeners { + fmt.Println(e) + listenerCount++ + } + return true + }) + assert.Equal(t, listenerCount, 0) + assert.Equal(t, listener.count, 0) +} + +type apolloDataListener struct { + wg sync.WaitGroup + count int + event string +} + +func (l *apolloDataListener) Process(configType *config_center.ConfigChangeEvent) { + if configType.ConfigType != remoting.EventTypeUpdate { + return + } + fmt.Println("process!!!!!") + l.wg.Done() + l.count++ + l.event = configType.Key } diff --git a/config_center/apollo/listener.go b/config_center/apollo/listener.go index d9a0fddcb..739b1f609 100644 --- a/config_center/apollo/listener.go +++ b/config_center/apollo/listener.go @@ -19,12 +19,25 @@ package apollo import ( "github.com/apache/dubbo-go/config_center" + "github.com/zouyx/agollo" ) type apolloListener struct { listeners map[config_center.ConfigurationListener]struct{} } +func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) { + for key, change := range changeEvent.Changes { + for listener := range a.listeners { + listener.Process(&config_center.ConfigChangeEvent{ + ConfigType: getChangeType(change.ChangeType), + Key: key, + Value: change.NewValue, + }) + } + } +} + func NewApolloListener() *apolloListener { return &apolloListener{ listeners: make(map[config_center.ConfigurationListener]struct{}, 0), @@ -34,6 +47,7 @@ func NewApolloListener() *apolloListener { func (al *apolloListener) AddListener(l config_center.ConfigurationListener) { if _, ok := al.listeners[l]; !ok { al.listeners[l] = struct{}{} + agollo.AddChangeListener(al) } } -- GitLab