Skip to content
Snippets Groups Projects
Commit 58cbd1db authored by 邹毅贤's avatar 邹毅贤
Browse files

add comment

parent e27fb3b8
No related branches found
No related tags found
No related merge requests found
......@@ -19,52 +19,58 @@ import (
"github.com/apache/dubbo-go/common/logger"
)
const (
ConnDelay = 3
MaxFailTimes = 15
)
// NacosClient Nacos client
type NacosClient struct {
name string
NacosAddrs []string
sync.Mutex // for Client
Client *config_client.IConfigClient
client *config_client.IConfigClient
exit chan struct{}
Timeout time.Duration
once sync.Once
onceClose func()
}
type Option func(*Options)
// Client Get Client
func (n *NacosClient) Client() *config_client.IConfigClient {
return n.client
}
// SetClient Set client
func (n *NacosClient) SetClient(client *config_client.IConfigClient) {
n.Lock()
n.client = client
n.Unlock()
}
type option func(*options)
type Options struct {
type options struct {
nacosName string
client *NacosClient
}
func WithNacosName(name string) Option {
return func(opt *Options) {
// WithNacosName Set nacos name
func WithNacosName(name string) option {
return func(opt *options) {
opt.nacosName = name
}
}
func ValidateNacosClient(container nacosClientFacade, opts ...Option) error {
// ValidateNacosClient Validate nacos client , if null then create it
func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
var (
err error
)
opions := &Options{}
os := &options{}
for _, opt := range opts {
opt(opions)
opt(os)
}
err = nil
lock := container.NacosClientLock()
url := container.GetUrl()
lock.Lock()
defer lock.Unlock()
if container.NacosClient() == nil {
//in dubbo ,every registry only connect one node ,so this is []string{r.Address}
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
......@@ -74,16 +80,16 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error {
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
nacosAddresses := strings.Split(url.Location, ",")
newClient, err := newNacosClient(opions.nacosName, nacosAddresses, timeout)
newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout)
if err != nil {
logger.Warnf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}",
opions.nacosName, url.Location, timeout.String(), err)
os.nacosName, url.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
container.SetNacosClient(newClient)
}
if container.NacosClient().Client == nil {
if container.NacosClient().Client() == nil {
svrConfList := []nacosconst.ServerConfig{}
for _, nacosAddr := range container.NacosClient().NacosAddrs {
split := strings.Split(nacosAddr, ":")
......@@ -108,7 +114,8 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error {
LogDir: "logs/nacos/log",
},
})
container.NacosClient().Client = &client
container.NacosClient().SetClient(&client)
if err != nil {
logger.Errorf("nacos create config client error:%v", err)
}
......@@ -155,7 +162,7 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N
LogDir: "logs/nacos/log",
},
})
n.Client = &client
n.SetClient(&client)
if err != nil {
return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs)
}
......@@ -163,6 +170,7 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N
return n, nil
}
// Done Get nacos client exit signal
func (n *NacosClient) Done() <-chan struct{} {
return n.exit
}
......@@ -178,6 +186,7 @@ func (n *NacosClient) stop() bool {
return false
}
// NacosClientValid Get nacos client valid status
func (n *NacosClient) NacosClientValid() bool {
select {
case <-n.exit:
......@@ -187,7 +196,7 @@ func (n *NacosClient) NacosClientValid() bool {
valid := true
n.Lock()
if n.Client == nil {
if n.Client() == nil {
valid = false
}
n.Unlock()
......@@ -195,14 +204,13 @@ func (n *NacosClient) NacosClientValid() bool {
return valid
}
// Close Close nacos client , then set null
func (n *NacosClient) Close() {
if n == nil {
return
}
n.stop()
n.Lock()
n.Client = nil
n.Unlock()
n.SetClient(nil)
logger.Warnf("nacosClient{name:%s, nacos addr:%s} exit now.", n.name, n.NacosAddrs)
}
......@@ -31,13 +31,18 @@ import (
"github.com/apache/dubbo-go/common/logger"
)
const (
connDelay = 3
maxFailTimes = 15
)
type nacosClientFacade interface {
NacosClient() *NacosClient
SetNacosClient(*NacosClient)
NacosClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
GetDone() chan struct{} //for nacos client control
RestartCallBack() bool
// WaitGroup for wait group control, zk client listener & zk client container
WaitGroup() *sync.WaitGroup
// GetDone For nacos client control RestartCallBack() bool
GetDone() chan struct{}
common.Node
}
......@@ -45,7 +50,7 @@ func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
//TODO nacos HandleClientRestart
// HandleClientRestart Restart client handler
func HandleClientRestart(r nacosClientFacade) {
var (
err error
......@@ -62,12 +67,10 @@ LOOP:
break LOOP
// re-register all services
case <-r.NacosClient().Done():
r.NacosClientLock().Lock()
r.NacosClient().Close()
nacosName := r.NacosClient().name
nacosAddress := r.NacosClient().NacosAddrs
r.SetNacosClient(nil)
r.NacosClientLock().Unlock()
// Connect nacos until success.
failTimes = 0
......@@ -76,19 +79,17 @@ LOOP:
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection nacos.
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * connDelay)): // Prevent crazy reconnection nacos.
}
err = ValidateNacosClient(r, WithNacosName(nacosName))
logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}",
nacosAddress, perrors.WithStack(err))
if err == nil {
if r.RestartCallBack() {
break
}
break
}
failTimes++
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
if maxFailTimes <= failTimes {
failTimes = maxFailTimes
}
}
}
......
......@@ -31,6 +31,7 @@ func init() {
type nacosDynamicConfigurationFactory struct {
}
// GetDynamicConfiguration Get Configuration with URL
func (f *nacosDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) {
dynamicConfiguration, err := newNacosDynamicConfiguration(url)
if err != nil {
......
......@@ -34,7 +34,7 @@ import (
"github.com/apache/dubbo-go/config_center/parser"
)
const NacosClientName = "nacos config_center"
const nacosClientName = "nacos config_center"
type nacosDynamicConfiguration struct {
url *common.URL
......@@ -53,7 +53,7 @@ func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration,
url: url,
done: make(chan struct{}),
}
err := ValidateNacosClient(c, WithNacosName(NacosClientName))
err := ValidateNacosClient(c, WithNacosName(nacosClientName))
if err != nil {
logger.Errorf("nacos client start error ,error message is %v", err)
return nil, err
......@@ -64,10 +64,12 @@ func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration,
}
// AddListener Add listener
func (n *nacosDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
n.addListener(key, listener)
}
// RemoveListener Remove listener
func (n *nacosDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
n.removeListener(key, listener)
}
......@@ -79,7 +81,7 @@ func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_cen
for _, opt := range opts {
opt(tmpOpts)
}
content, err := (*n.client.Client).GetConfig(vo.ConfigParam{
content, err := (*n.client.Client()).GetConfig(vo.ConfigParam{
DataId: key,
Group: tmpOpts.Group,
})
......@@ -91,52 +93,61 @@ func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_cen
}
// GetInternalProperty Get properties value by key
func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) {
return n.GetProperties(key, opts...)
}
// GetRule Get router rule
func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) {
return n.GetProperties(key, opts...)
}
// Parser Get Parser
func (n *nacosDynamicConfiguration) Parser() parser.ConfigurationParser {
return n.parser
}
// SetParser Set Parser
func (n *nacosDynamicConfiguration) SetParser(p parser.ConfigurationParser) {
n.parser = p
}
// NacosClient Get Nacos Client
func (n *nacosDynamicConfiguration) NacosClient() *NacosClient {
return n.client
}
// SetNacosClient Set Nacos Client
func (n *nacosDynamicConfiguration) SetNacosClient(client *NacosClient) {
n.cltLock.Lock()
n.client = client
n.cltLock.Unlock()
}
func (n *nacosDynamicConfiguration) NacosClientLock() *sync.Mutex {
return &n.cltLock
}
// WaitGroup for wait group control, zk client listener & zk client container
func (n *nacosDynamicConfiguration) WaitGroup() *sync.WaitGroup {
return &n.wg
}
// GetDone For nacos client control RestartCallBack() bool
func (n *nacosDynamicConfiguration) GetDone() chan struct{} {
return n.done
}
// GetUrl Get Url
func (n *nacosDynamicConfiguration) GetUrl() common.URL {
return *n.url
}
// Destroy Destroy configuration instance
func (n *nacosDynamicConfiguration) Destroy() {
close(n.done)
n.wg.Wait()
n.closeConfigs()
}
// IsAvailable Get available status
func (n *nacosDynamicConfiguration) IsAvailable() bool {
select {
case <-n.done:
......@@ -154,7 +165,3 @@ func (r *nacosDynamicConfiguration) closeConfigs() {
r.client.Close()
r.client = nil
}
func (r *nacosDynamicConfiguration) RestartCallBack() bool {
return true
}
......@@ -31,7 +31,7 @@ import (
"github.com/apache/dubbo-go/remoting"
)
func callback(ctx context.Context, listener config_center.ConfigurationListener, namespace, group, dataId, data string) {
func callback(listener config_center.ConfigurationListener, namespace, group, dataId, data string) {
listener.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate})
}
......@@ -39,13 +39,14 @@ func (l *nacosDynamicConfiguration) addListener(key string, listener config_cent
_, loaded := l.keyListeners.Load(key)
if !loaded {
_, cancel := context.WithCancel(context.Background())
(*l.client.Client).ListenConfig(vo.ConfigParam{
err := (*l.client.Client()).ListenConfig(vo.ConfigParam{
DataId: key,
Group: "dubbo",
OnChange: func(namespace, group, dataId, data string) {
go callback(context.TODO(), listener, namespace, group, dataId, data)
go callback(listener, namespace, group, dataId, data)
},
})
logger.Errorf("nacos : listen config fail, error:%v ", err)
newListener := make(map[config_center.ConfigurationListener]context.CancelFunc)
newListener[listener] = cancel
l.keyListeners.Store(key, newListener)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment