Skip to content
Snippets Groups Projects
Commit e102328b authored by 邹毅贤's avatar 邹毅贤
Browse files

update apollo

parent acad6b41
No related branches found
No related tags found
No related merge requests found
...@@ -69,31 +69,7 @@ func newApolloDynamicConfiguration(url *common.URL) (*apolloDynamicConfiguration ...@@ -69,31 +69,7 @@ func newApolloDynamicConfiguration(url *common.URL) (*apolloDynamicConfiguration
return c, agollo.Start() return c, agollo.Start()
} }
type apolloChangeListener struct { func getChangeType(change agollo.ConfigChangeType) remoting.EventType {
c *apolloDynamicConfiguration
}
func (a *apolloChangeListener) OnChange(event *agollo.ChangeEvent) {
for name, change := range event.Changes {
cfgChangeEvent := &config_center.ConfigChangeEvent{
Key: name,
Value: change.NewValue,
ConfigType: a.c.getChangeType(change.ChangeType),
}
a.c.listeners.Range(func(key, value interface{}) bool {
for listener, _ := range value.(apolloListener).listeners {
listener.Process(cfgChangeEvent)
}
return true
})
}
}
func (c *apolloDynamicConfiguration) start() {
agollo.AddChangeListener(&apolloChangeListener{})
}
func (c *apolloDynamicConfiguration) getChangeType(change agollo.ConfigChangeType) remoting.EventType {
switch change { switch change {
case agollo.ADDED: case agollo.ADDED:
return remoting.EventTypeAdd return remoting.EventTypeAdd
...@@ -102,7 +78,7 @@ func (c *apolloDynamicConfiguration) getChangeType(change agollo.ConfigChangeTyp ...@@ -102,7 +78,7 @@ func (c *apolloDynamicConfiguration) getChangeType(change agollo.ConfigChangeTyp
case agollo.MODIFIED: case agollo.MODIFIED:
return remoting.EventTypeUpdate return remoting.EventTypeUpdate
default: default:
panic("unknow type: " + strconv.Itoa(int(change))) panic("unknown type: " + strconv.Itoa(int(change)))
} }
} }
......
...@@ -23,10 +23,12 @@ import ( ...@@ -23,10 +23,12 @@ import (
"github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser" "github.com/apache/dubbo-go/config_center/parser"
"github.com/apache/dubbo-go/remoting"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strings" "strings"
"sync"
"testing" "testing"
) )
...@@ -34,11 +36,13 @@ const ( ...@@ -34,11 +36,13 @@ const (
mockAppId = "testApplication_yang" mockAppId = "testApplication_yang"
mockCluster = "dev" mockCluster = "dev"
mockNamespace = "mockDubbog.properties" mockNamespace = "mockDubbog.properties"
mockConfigRes = `{"appId":"testApplication_yang","cluster":"default","namespaceName":"mockDubbog.properties","configurations":{"registries.hangzhouzk.username":"","application.owner":"ZX","registries.shanghaizk.username":"","protocols.dubbo.ip":"127.0.0.1","protocol_conf.dubbo.getty_session_param.tcp_write_timeout":"5s","services.UserProvider.cluster":"failover","application.module":"dubbogo user-info server","services.UserProvider.interface":"com.ikurento.user.UserProvider","protocol_conf.dubbo.getty_session_param.compress_encoding":"false","registries.shanghaizk.address":"127.0.0.1:2182","protocol_conf.dubbo.session_timeout":"20s","registries.shanghaizk.timeout":"3s","protocol_conf.dubbo.getty_session_param.keep_alive_period":"120s","services.UserProvider.warmup":"100","application.version":"0.0.1","registries.hangzhouzk.protocol":"zookeeper","registries.hangzhouzk.password":"","protocols.dubbo.name":"dubbo","protocol_conf.dubbo.getty_session_param.wait_timeout":"1s","protocols.dubbo.port":"20000","application_config.owner":"demo","application_config.name":"demo","application_config.version":"0.0.1","application_config.environment":"dev","protocol_conf.dubbo.getty_session_param.session_name":"server","application.name":"BDTService","registries.hangzhouzk.timeout":"3s","protocol_conf.dubbo.getty_session_param.tcp_read_timeout":"1s","services.UserProvider.loadbalance":"random","protocol_conf.dubbo.session_number":"700","protocol_conf.dubbo.getty_session_param.max_msg_len":"1024","services.UserProvider.registry":"hangzhouzk","application_config.module":"demo","services.UserProvider.methods[0].name":"GetUser","protocol_conf.dubbo.getty_session_param.tcp_no_delay":"true","services.UserProvider.methods[0].retries":"1","protocol_conf.dubbo.getty_session_param.tcp_w_buf_size":"65536","protocol_conf.dubbo.getty_session_param.tcp_r_buf_size":"262144","registries.shanghaizk.password":"","application_config.organization":"demo","registries.shanghaizk.protocol":"zookeeper","protocol_conf.dubbo.getty_session_param.tcp_keep_alive":"true","registries.hangzhouzk.address":"127.0.0.1:2181","application.environment":"dev","services.UserProvider.protocol":"dubbo","application.organization":"ikurento.com","protocol_conf.dubbo.getty_session_param.pkg_wq_size":"512","services.UserProvider.methods[0].loadbalance":"random"},"releaseKey":"20191104105242-0f13805d89f834a4"}`
mockNotifyRes = `[{"namespaceName":"mockDubbog","notificationId":53050,"messages":{"details":{"testApplication_yang+default+mockDubbog":53050}}}]` mockNotifyRes = `[{"namespaceName":"mockDubbog","notificationId":53050,"messages":{"details":{"testApplication_yang+default+mockDubbog":53050}}}]`
mockServiceConfigRes = `[{"appName":"APOLLO-CONFIGSERVICE","instanceId":"instance-300408ep:apollo-configservice:8080","homepageUrl":"http://localhost:8080"}]` mockServiceConfigRes = `[{"appName":"APOLLO-CONFIGSERVICE","instanceId":"instance-300408ep:apollo-configservice:8080","homepageUrl":"http://localhost:8080"}]`
apolloUrl = "apollo://%s" )
var (
mockConfigRes = `{"appId":"testApplication_yang","cluster":"default","namespaceName":"mockDubbog.properties","configurations":{"registries.hangzhouzk.username":"","application.owner":"ZX","registries.shanghaizk.username":"","protocols.dubbo.ip":"127.0.0.1","protocol_conf.dubbo.getty_session_param.tcp_write_timeout":"5s","services.UserProvider.cluster":"failover","application.module":"dubbogo user-info server","services.UserProvider.interface":"com.ikurento.user.UserProvider","protocol_conf.dubbo.getty_session_param.compress_encoding":"false","registries.shanghaizk.address":"127.0.0.1:2182","protocol_conf.dubbo.session_timeout":"20s","registries.shanghaizk.timeout":"3s","protocol_conf.dubbo.getty_session_param.keep_alive_period":"120s","services.UserProvider.warmup":"100","application.version":"0.0.1","registries.hangzhouzk.protocol":"zookeeper","registries.hangzhouzk.password":"","protocols.dubbo.name":"dubbo","protocol_conf.dubbo.getty_session_param.wait_timeout":"1s","protocols.dubbo.port":"20000","application_config.owner":"demo","application_config.name":"demo","application_config.version":"0.0.1","application_config.environment":"dev","protocol_conf.dubbo.getty_session_param.session_name":"server","application.name":"BDTService","registries.hangzhouzk.timeout":"3s","protocol_conf.dubbo.getty_session_param.tcp_read_timeout":"1s","services.UserProvider.loadbalance":"random","protocol_conf.dubbo.session_number":"700","protocol_conf.dubbo.getty_session_param.max_msg_len":"1024","services.UserProvider.registry":"hangzhouzk","application_config.module":"demo","services.UserProvider.methods[0].name":"GetUser","protocol_conf.dubbo.getty_session_param.tcp_no_delay":"true","services.UserProvider.methods[0].retries":"1","protocol_conf.dubbo.getty_session_param.tcp_w_buf_size":"65536","protocol_conf.dubbo.getty_session_param.tcp_r_buf_size":"262144","registries.shanghaizk.password":"","application_config.organization":"demo","registries.shanghaizk.protocol":"zookeeper","protocol_conf.dubbo.getty_session_param.tcp_keep_alive":"true","registries.hangzhouzk.address":"127.0.0.1:2181","application.environment":"dev","services.UserProvider.protocol":"dubbo","application.organization":"ikurento.com","protocol_conf.dubbo.getty_session_param.pkg_wq_size":"512","services.UserProvider.methods[0].loadbalance":"random"},"releaseKey":"20191104105242-0f13805d89f834a4"}`
) )
func initApollo() *httptest.Server { func initApollo() *httptest.Server {
...@@ -88,6 +92,10 @@ func runMockConfigServer(handlerMap map[string]func(http.ResponseWriter, *http.R ...@@ -88,6 +92,10 @@ func runMockConfigServer(handlerMap map[string]func(http.ResponseWriter, *http.R
} }
func Test_GetConfig(t *testing.T) { func Test_GetConfig(t *testing.T) {
initMockApollo(t)
}
func initMockApollo(t *testing.T) *apolloDynamicConfiguration {
c := &config.BaseConfig{ConfigCenterConfig: &config.ConfigCenterConfig{ c := &config.BaseConfig{ConfigCenterConfig: &config.ConfigCenterConfig{
Protocol: "apollo", Protocol: "apollo",
Address: "106.12.25.204:8080", Address: "106.12.25.204:8080",
...@@ -107,4 +115,52 @@ func Test_GetConfig(t *testing.T) { ...@@ -107,4 +115,52 @@ func Test_GetConfig(t *testing.T) {
mapContent, err := configuration.Parser().Parse(configs) mapContent, err := configuration.Parser().Parse(configs)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "ikurento.com", mapContent["application.organization"]) assert.Equal(t, "ikurento.com", mapContent["application.organization"])
return configuration
}
func TestAddListener(t *testing.T) {
listener := &apolloDataListener{}
listener.wg.Add(1)
apollo := initMockApollo(t)
mockConfigRes = `{"appId":"testApplication_yang","cluster":"default","namespaceName":"mockDubbog.properties","configurations":{"registries.hangzhouzk.username":"11111"},"releaseKey":"20191104105242-0f13805d89f834a4"}`
apollo.AddListener(mockNamespace, listener)
listener.wg.Wait()
assert.Equal(t, "registries.hangzhouzk.username", listener.event)
assert.Greater(t, listener.count, 0)
}
func TestRemoveListener(t *testing.T) {
listener := &apolloDataListener{}
apollo := initMockApollo(t)
mockConfigRes = `{"appId":"testApplication_yang","cluster":"default","namespaceName":"mockDubbog.properties","configurations":{"registries.hangzhouzk.username":"11111"},"releaseKey":"20191104105242-0f13805d89f834a4"}`
apollo.AddListener(mockNamespace, listener)
apollo.RemoveListener(mockNamespace, listener)
assert.Equal(t, "", listener.event)
listenerCount := 0
apollo.listeners.Range(func(key, value interface{}) bool {
apolloListener := value.(*apolloListener)
for e := range apolloListener.listeners {
fmt.Println(e)
listenerCount++
}
return true
})
assert.Equal(t, listenerCount, 0)
assert.Equal(t, listener.count, 0)
}
type apolloDataListener struct {
wg sync.WaitGroup
count int
event string
}
func (l *apolloDataListener) Process(configType *config_center.ConfigChangeEvent) {
if configType.ConfigType != remoting.EventTypeUpdate {
return
}
fmt.Println("process!!!!!")
l.wg.Done()
l.count++
l.event = configType.Key
} }
...@@ -19,12 +19,25 @@ package apollo ...@@ -19,12 +19,25 @@ package apollo
import ( import (
"github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/config_center"
"github.com/zouyx/agollo"
) )
type apolloListener struct { type apolloListener struct {
listeners map[config_center.ConfigurationListener]struct{} listeners map[config_center.ConfigurationListener]struct{}
} }
func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) {
for key, change := range changeEvent.Changes {
for listener := range a.listeners {
listener.Process(&config_center.ConfigChangeEvent{
ConfigType: getChangeType(change.ChangeType),
Key: key,
Value: change.NewValue,
})
}
}
}
func NewApolloListener() *apolloListener { func NewApolloListener() *apolloListener {
return &apolloListener{ return &apolloListener{
listeners: make(map[config_center.ConfigurationListener]struct{}, 0), listeners: make(map[config_center.ConfigurationListener]struct{}, 0),
...@@ -34,6 +47,7 @@ func NewApolloListener() *apolloListener { ...@@ -34,6 +47,7 @@ func NewApolloListener() *apolloListener {
func (al *apolloListener) AddListener(l config_center.ConfigurationListener) { func (al *apolloListener) AddListener(l config_center.ConfigurationListener) {
if _, ok := al.listeners[l]; !ok { if _, ok := al.listeners[l]; !ok {
al.listeners[l] = struct{}{} al.listeners[l] = struct{}{}
agollo.AddChangeListener(al)
} }
} }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment