Skip to content
Snippets Groups Projects
Unverified Commit 5a135e6c authored by gaoxinge's avatar gaoxinge Committed by GitHub
Browse files

Merge pull request #716 from zouyx/feature/merge1.4

Rft : merge 1.4
parents e3dd46ff a007d324
No related branches found
No related tags found
No related merge requests found
...@@ -24,7 +24,6 @@ import ( ...@@ -24,7 +24,6 @@ import (
import ( import (
"github.com/creasty/defaults" "github.com/creasty/defaults"
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
) )
...@@ -34,6 +33,10 @@ import ( ...@@ -34,6 +33,10 @@ import (
"github.com/apache/dubbo-go/common/yaml" "github.com/apache/dubbo-go/common/yaml"
) )
const (
MaxWheelTimeSpan = 900e9 // 900s, 15 minute
)
///////////////////////// /////////////////////////
// consumerConfig // consumerConfig
///////////////////////// /////////////////////////
...@@ -107,9 +110,9 @@ func ConsumerInit(confConFile string) error { ...@@ -107,9 +110,9 @@ func ConsumerInit(confConFile string) error {
if consumerConfig.RequestTimeout, err = time.ParseDuration(consumerConfig.Request_Timeout); err != nil { if consumerConfig.RequestTimeout, err = time.ParseDuration(consumerConfig.Request_Timeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(Request_Timeout{%#v})", consumerConfig.Request_Timeout) return perrors.WithMessagef(err, "time.ParseDuration(Request_Timeout{%#v})", consumerConfig.Request_Timeout)
} }
if consumerConfig.RequestTimeout >= time.Duration(getty.MaxWheelTimeSpan) { if consumerConfig.RequestTimeout >= time.Duration(MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "request_timeout %s should be less than %s", return perrors.WithMessagef(err, "request_timeout %s should be less than %s",
consumerConfig.Request_Timeout, time.Duration(getty.MaxWheelTimeSpan)) consumerConfig.Request_Timeout, time.Duration(MaxWheelTimeSpan))
} }
} }
if consumerConfig.Connect_Timeout != "" { if consumerConfig.Connect_Timeout != "" {
......
...@@ -27,7 +27,9 @@ import ( ...@@ -27,7 +27,9 @@ import (
import ( import (
gxset "github.com/dubbogo/gost/container/set" gxset "github.com/dubbogo/gost/container/set"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
"github.com/zouyx/agollo" "github.com/zouyx/agollo/v3"
agolloConstant "github.com/zouyx/agollo/v3/constant"
"github.com/zouyx/agollo/v3/env/config"
) )
import ( import (
...@@ -35,19 +37,18 @@ import ( ...@@ -35,19 +37,18 @@ import (
"github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/constant"
cc "github.com/apache/dubbo-go/config_center" cc "github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser" "github.com/apache/dubbo-go/config_center/parser"
"github.com/apache/dubbo-go/remoting"
) )
const ( const (
apolloProtocolPrefix = "http://" apolloProtocolPrefix = "http://"
apolloConfigFormat = "%s.%s" apolloConfigFormat = "%s%s"
) )
type apolloConfiguration struct { type apolloConfiguration struct {
url *common.URL url *common.URL
listeners sync.Map listeners sync.Map
appConf *agollo.AppConfig appConf *config.AppConfig
parser parser.ConfigurationParser parser parser.ConfigurationParser
} }
...@@ -60,31 +61,20 @@ func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) { ...@@ -60,31 +61,20 @@ func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) {
appId := url.GetParam(constant.CONFIG_APP_ID_KEY, "") appId := url.GetParam(constant.CONFIG_APP_ID_KEY, "")
namespaces := getProperties(url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DEFAULT_GROUP)) namespaces := getProperties(url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DEFAULT_GROUP))
c.appConf = &agollo.AppConfig{ c.appConf = &config.AppConfig{
AppId: appId, AppID: appId,
Cluster: configCluster, Cluster: configCluster,
NamespaceName: namespaces, NamespaceName: namespaces,
Ip: configAddr, IP: configAddr,
} }
agollo.InitCustomConfig(func() (*agollo.AppConfig, error) { agollo.InitCustomConfig(func() (*config.AppConfig, error) {
return c.appConf, nil return c.appConf, nil
}) })
return c, agollo.Start() return c, agollo.Start()
} }
func getChangeType(change agollo.ConfigChangeType) remoting.EventType {
switch change {
case agollo.ADDED:
return remoting.EventTypeAdd
case agollo.DELETED:
return remoting.EventTypeDel
default:
return remoting.EventTypeUpdate
}
}
func (c *apolloConfiguration) AddListener(key string, listener cc.ConfigurationListener, opts ...cc.Option) { func (c *apolloConfiguration) AddListener(key string, listener cc.ConfigurationListener, opts ...cc.Option) {
k := &cc.Options{} k := &cc.Options{}
for _, opt := range opts { for _, opt := range opts {
...@@ -92,7 +82,7 @@ func (c *apolloConfiguration) AddListener(key string, listener cc.ConfigurationL ...@@ -92,7 +82,7 @@ func (c *apolloConfiguration) AddListener(key string, listener cc.ConfigurationL
} }
key = k.Group + key key = k.Group + key
l, _ := c.listeners.LoadOrStore(key, NewApolloListener()) l, _ := c.listeners.LoadOrStore(key, newApolloListener())
l.(*apolloListener).AddListener(listener) l.(*apolloListener).AddListener(listener)
} }
...@@ -110,10 +100,10 @@ func (c *apolloConfiguration) RemoveListener(key string, listener cc.Configurati ...@@ -110,10 +100,10 @@ func (c *apolloConfiguration) RemoveListener(key string, listener cc.Configurati
} }
func getProperties(namespace string) string { func getProperties(namespace string) string {
return getNamespaceName(namespace, agollo.Properties) return getNamespaceName(namespace, agolloConstant.Properties)
} }
func getNamespaceName(namespace string, configFileFormat agollo.ConfigFileFormat) string { func getNamespaceName(namespace string, configFileFormat agolloConstant.ConfigFileFormat) string {
return fmt.Sprintf(apolloConfigFormat, namespace, configFileFormat) return fmt.Sprintf(apolloConfigFormat, namespace, configFileFormat)
} }
...@@ -148,7 +138,7 @@ func (c *apolloConfiguration) GetProperties(key string, opts ...cc.Option) (stri ...@@ -148,7 +138,7 @@ func (c *apolloConfiguration) GetProperties(key string, opts ...cc.Option) (stri
if config == nil { if config == nil {
return "", perrors.New(fmt.Sprintf("nothing in namespace:%s ", key)) return "", perrors.New(fmt.Sprintf("nothing in namespace:%s ", key))
} }
return config.GetContent(agollo.Properties), nil return config.GetContent(), nil
} }
func (c *apolloConfiguration) getAddressWithProtocolPrefix(url *common.URL) string { func (c *apolloConfiguration) getAddressWithProtocolPrefix(url *common.URL) string {
......
...@@ -202,9 +202,9 @@ func initMockApollo(t *testing.T) *apolloConfiguration { ...@@ -202,9 +202,9 @@ func initMockApollo(t *testing.T) *apolloConfiguration {
return configuration return configuration
} }
func TestAddListener(t *testing.T) { func TestListener(t *testing.T) {
listener := &apolloDataListener{} listener := &apolloDataListener{}
listener.wg.Add(1) listener.wg.Add(2)
apollo := initMockApollo(t) apollo := initMockApollo(t)
mockConfigRes = `{ mockConfigRes = `{
"appId": "testApplication_yang", "appId": "testApplication_yang",
...@@ -215,28 +215,14 @@ func TestAddListener(t *testing.T) { ...@@ -215,28 +215,14 @@ func TestAddListener(t *testing.T) {
}, },
"releaseKey": "20191104105242-0f13805d89f834a4" "releaseKey": "20191104105242-0f13805d89f834a4"
}` }`
//test add
apollo.AddListener(mockNamespace, listener) apollo.AddListener(mockNamespace, listener)
listener.wg.Wait() listener.wg.Wait()
assert.Equal(t, "registries.hangzhouzk.username", listener.event) assert.Equal(t, "mockDubbog.properties", listener.event)
assert.Greater(t, listener.count, 0) assert.Greater(t, listener.count, 0)
deleteMockJson(t)
}
func TestRemoveListener(t *testing.T) { //test remove
listener := &apolloDataListener{}
apollo := initMockApollo(t)
mockConfigRes = `{
"appId": "testApplication_yang",
"cluster": "default",
"namespaceName": "mockDubbog.properties",
"configurations": {
"registries.hangzhouzk.username": "11111"
},
"releaseKey": "20191104105242-0f13805d89f834a4"
}`
apollo.AddListener(mockNamespace, listener)
apollo.RemoveListener(mockNamespace, listener) apollo.RemoveListener(mockNamespace, listener)
assert.Equal(t, "", listener.event)
listenerCount := 0 listenerCount := 0
apollo.listeners.Range(func(_, value interface{}) bool { apollo.listeners.Range(func(_, value interface{}) bool {
apolloListener := value.(*apolloListener) apolloListener := value.(*apolloListener)
...@@ -247,7 +233,6 @@ func TestRemoveListener(t *testing.T) { ...@@ -247,7 +233,6 @@ func TestRemoveListener(t *testing.T) {
return true return true
}) })
assert.Equal(t, listenerCount, 0) assert.Equal(t, listenerCount, 0)
assert.Equal(t, listener.count, 0)
deleteMockJson(t) deleteMockJson(t)
} }
......
...@@ -18,34 +18,48 @@ ...@@ -18,34 +18,48 @@
package apollo package apollo
import ( import (
"github.com/zouyx/agollo" "github.com/zouyx/agollo/v3"
"github.com/zouyx/agollo/v3/storage"
"gopkg.in/yaml.v2"
) )
import ( import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center" "github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
) )
type apolloListener struct { type apolloListener struct {
listeners map[config_center.ConfigurationListener]struct{} listeners map[config_center.ConfigurationListener]struct{}
} }
// NewApolloListener creates a new apolloListener // nolint
func NewApolloListener() *apolloListener { func newApolloListener() *apolloListener {
return &apolloListener{ return &apolloListener{
listeners: make(map[config_center.ConfigurationListener]struct{}, 0), listeners: make(map[config_center.ConfigurationListener]struct{}, 0),
} }
} }
// OnChange process each listener // OnChange process each listener
func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) { func (a *apolloListener) OnChange(changeEvent *storage.ChangeEvent) {
for key, change := range changeEvent.Changes {
for listener := range a.listeners { }
listener.Process(&config_center.ConfigChangeEvent{
ConfigType: getChangeType(change.ChangeType), // OnNewestChange process each listener by all changes
Key: key, func (a *apolloListener) OnNewestChange(changeEvent *storage.FullChangeEvent) {
Value: change.NewValue, b, err := yaml.Marshal(changeEvent.Changes)
}) if err != nil {
} logger.Errorf("apollo onNewestChange err %+v",
err)
return
}
content := string(b)
for listener := range a.listeners {
listener.Process(&config_center.ConfigChangeEvent{
ConfigType: remoting.EventTypeUpdate,
Key: changeEvent.Namespace,
Value: content,
})
} }
} }
......
...@@ -98,6 +98,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error { ...@@ -98,6 +98,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
} }
nacosAddresses := strings.Split(url.Location, ",") nacosAddresses := strings.Split(url.Location, ",")
if container.NacosClient() == nil { if container.NacosClient() == nil {
//in dubbo ,every registry only connect one node ,so this is []string{r.Address}
newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout, url) newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout, url)
if err != nil { if err != nil {
logger.Errorf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}", logger.Errorf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}",
...@@ -115,6 +116,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error { ...@@ -115,6 +116,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
} }
container.NacosClient().SetClient(&configClient) container.NacosClient().SetClient(&configClient)
} }
return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL) return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL)
...@@ -167,8 +169,9 @@ func initNacosConfigClient(nacosAddrs []string, timeout time.Duration, url commo ...@@ -167,8 +169,9 @@ func initNacosConfigClient(nacosAddrs []string, timeout time.Duration, url commo
"serverConfigs": svrConfList, "serverConfigs": svrConfList,
"clientConfig": nacosconst.ClientConfig{ "clientConfig": nacosconst.ClientConfig{
TimeoutMs: uint64(int32(timeout / time.Millisecond)), TimeoutMs: uint64(int32(timeout / time.Millisecond)),
ListenInterval: uint64(int32(timeout / time.Millisecond)),
NotLoadCacheAtStart: true, NotLoadCacheAtStart: true,
LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, logDir), LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, ""),
CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""), CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""),
Endpoint: url.GetParam(constant.NACOS_ENDPOINT, ""), Endpoint: url.GetParam(constant.NACOS_ENDPOINT, ""),
Username: url.GetParam(constant.NACOS_USERNAME, ""), Username: url.GetParam(constant.NACOS_USERNAME, ""),
......
...@@ -48,6 +48,7 @@ func (n *nacosDynamicConfiguration) addListener(key string, listener config_cent ...@@ -48,6 +48,7 @@ func (n *nacosDynamicConfiguration) addListener(key string, listener config_cent
}) })
if err != nil { if err != nil {
logger.Errorf("nacos : listen config fail, error:%v ", err) logger.Errorf("nacos : listen config fail, error:%v ", err)
return
} }
newListener := make(map[config_center.ConfigurationListener]context.CancelFunc) newListener := make(map[config_center.ConfigurationListener]context.CancelFunc)
newListener[listener] = cancel newListener[listener] = cancel
......
module github.com/apache/dubbo-go module github.com/apache/dubbo-go
require ( require (
cloud.google.com/go v0.39.0 // indirect
github.com/Microsoft/go-winio v0.4.13 // indirect github.com/Microsoft/go-winio v0.4.13 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/Workiva/go-datastructures v1.0.50 github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/apache/dubbo-go-hessian2 v1.6.1 github.com/apache/dubbo-go-hessian2 v1.6.2
github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/creasty/defaults v1.3.0 github.com/creasty/defaults v1.3.0
github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-connections v0.4.0 // indirect
...@@ -47,14 +45,12 @@ require ( ...@@ -47,14 +45,12 @@ require (
github.com/prometheus/client_golang v1.1.0 github.com/prometheus/client_golang v1.1.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/shirou/gopsutil v2.19.9+incompatible // indirect github.com/shirou/gopsutil v2.19.9+incompatible // indirect
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect
github.com/stretchr/objx v0.2.0 // indirect github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.5.1 github.com/stretchr/testify v1.5.1
github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8 github.com/zouyx/agollo/v3 v3.4.4
go.etcd.io/bbolt v1.3.4 // indirect go.etcd.io/bbolt v1.3.4 // indirect
go.uber.org/atomic v1.6.0 go.uber.org/atomic v1.6.0
go.uber.org/zap v1.15.0 go.uber.org/zap v1.15.0
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 // indirect
google.golang.org/grpc v1.23.0 google.golang.org/grpc v1.23.0
gopkg.in/yaml.v2 v2.2.8 gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.16.9 k8s.io/api v0.16.9
......
This diff is collapsed.
...@@ -22,10 +22,13 @@ import ( ...@@ -22,10 +22,13 @@ import (
) )
import ( import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
) )
import (
"github.com/apache/dubbo-go/config"
)
type ( type (
// GettySessionParam is session configuration for getty. // GettySessionParam is session configuration for getty.
GettySessionParam struct { GettySessionParam struct {
...@@ -178,9 +181,9 @@ func (c *ClientConfig) CheckValidity() error { ...@@ -178,9 +181,9 @@ func (c *ClientConfig) CheckValidity() error {
return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod) return perrors.WithMessagef(err, "time.ParseDuration(HeartbeatPeroid{%#v})", c.HeartbeatPeriod)
} }
if c.heartbeatPeriod >= time.Duration(getty.MaxWheelTimeSpan) { if c.heartbeatPeriod >= time.Duration(config.MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "heartbeat_period %s should be less than %s", return perrors.WithMessagef(err, "heartbeat_period %s should be less than %s",
c.HeartbeatPeriod, time.Duration(getty.MaxWheelTimeSpan)) c.HeartbeatPeriod, time.Duration(config.MaxWheelTimeSpan))
} }
if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil { if c.sessionTimeout, err = time.ParseDuration(c.SessionTimeout); err != nil {
...@@ -198,9 +201,9 @@ func (c *ServerConfig) CheckValidity() error { ...@@ -198,9 +201,9 @@ func (c *ServerConfig) CheckValidity() error {
return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout) return perrors.WithMessagef(err, "time.ParseDuration(SessionTimeout{%#v})", c.SessionTimeout)
} }
if c.sessionTimeout >= time.Duration(getty.MaxWheelTimeSpan) { if c.sessionTimeout >= time.Duration(config.MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "session_timeout %s should be less than %s", return perrors.WithMessagef(err, "session_timeout %s should be less than %s",
c.SessionTimeout, time.Duration(getty.MaxWheelTimeSpan)) c.SessionTimeout, time.Duration(config.MaxWheelTimeSpan))
} }
return perrors.WithStack(c.GettySessionParam.CheckValidity()) return perrors.WithStack(c.GettySessionParam.CheckValidity())
......
...@@ -178,6 +178,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte ...@@ -178,6 +178,7 @@ func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporte
if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
reg = getRegistry(registryUrl) reg = getRegistry(registryUrl)
proto.registries.Store(registryUrl.Key(), reg) proto.registries.Store(registryUrl.Key(), reg)
logger.Infof("Export proto:%p registries address:%p", proto, proto.registries)
} else { } else {
reg = regI.(registry.Registry) reg = regI.(registry.Registry)
} }
...@@ -334,14 +335,12 @@ func (proto *registryProtocol) Destroy() { ...@@ -334,14 +335,12 @@ func (proto *registryProtocol) Destroy() {
ivk.Destroy() ivk.Destroy()
} }
proto.invokers = []protocol.Invoker{} proto.invokers = []protocol.Invoker{}
proto.bounds.Range(func(key, value interface{}) bool { proto.bounds.Range(func(key, value interface{}) bool {
exporter := value.(protocol.Exporter) exporter := value.(protocol.Exporter)
exporter.Unexport() exporter.Unexport()
proto.bounds.Delete(key) proto.bounds.Delete(key)
return true return true
}) })
proto.registries.Range(func(key, value interface{}) bool { proto.registries.Range(func(key, value interface{}) bool {
reg := value.(registry.Registry) reg := value.(registry.Registry)
if reg.IsAvailable() { if reg.IsAvailable() {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment