diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go index c75ee43e89b344fe4f240b2316c8f2b557ca59e1..7110a7c63342ee468cb8cc78ff1a219d3d30e9fb 100644 --- a/config_center/nacos/client.go +++ b/config_center/nacos/client.go @@ -19,52 +19,58 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -const ( - ConnDelay = 3 - MaxFailTimes = 15 -) - +// NacosClient Nacos client type NacosClient struct { name string NacosAddrs []string sync.Mutex // for Client - Client *config_client.IConfigClient + client *config_client.IConfigClient exit chan struct{} Timeout time.Duration once sync.Once onceClose func() } -type Option func(*Options) +// Client Get Client +func (n *NacosClient) Client() *config_client.IConfigClient { + return n.client +} + +// SetClient Set client +func (n *NacosClient) SetClient(client *config_client.IConfigClient) { + n.Lock() + n.client = client + n.Unlock() +} + +type option func(*options) -type Options struct { +type options struct { nacosName string client *NacosClient } -func WithNacosName(name string) Option { - return func(opt *Options) { +// WithNacosName Set nacos name +func WithNacosName(name string) option { + return func(opt *options) { opt.nacosName = name } } -func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { +// ValidateNacosClient Validate nacos client , if null then create it +func ValidateNacosClient(container nacosClientFacade, opts ...option) error { var ( err error ) - opions := &Options{} + os := &options{} for _, opt := range opts { - opt(opions) + opt(os) } err = nil - lock := container.NacosClientLock() url := container.GetUrl() - lock.Lock() - defer lock.Unlock() - if container.NacosClient() == nil { //in dubbo ,every registry only connect one node ,so this is []string{r.Address} timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) @@ -74,16 +80,16 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) } nacosAddresses := strings.Split(url.Location, ",") - newClient, err := newNacosClient(opions.nacosName, nacosAddresses, timeout) + newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout) if err != nil { logger.Warnf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}", - opions.nacosName, url.Location, timeout.String(), err) + os.nacosName, url.Location, timeout.String(), err) return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) } container.SetNacosClient(newClient) } - if container.NacosClient().Client == nil { + if container.NacosClient().Client() == nil { svrConfList := []nacosconst.ServerConfig{} for _, nacosAddr := range container.NacosClient().NacosAddrs { split := strings.Split(nacosAddr, ":") @@ -108,7 +114,8 @@ func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { LogDir: "logs/nacos/log", }, }) - container.NacosClient().Client = &client + + container.NacosClient().SetClient(&client) if err != nil { logger.Errorf("nacos create config client error:%v", err) } @@ -155,7 +162,7 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N LogDir: "logs/nacos/log", }, }) - n.Client = &client + n.SetClient(&client) if err != nil { return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs) } @@ -163,6 +170,7 @@ func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*N return n, nil } +// Done Get nacos client exit signal func (n *NacosClient) Done() <-chan struct{} { return n.exit } @@ -178,6 +186,7 @@ func (n *NacosClient) stop() bool { return false } +// NacosClientValid Get nacos client valid status func (n *NacosClient) NacosClientValid() bool { select { case <-n.exit: @@ -187,7 +196,7 @@ func (n *NacosClient) NacosClientValid() bool { valid := true n.Lock() - if n.Client == nil { + if n.Client() == nil { valid = false } n.Unlock() @@ -195,14 +204,13 @@ func (n *NacosClient) NacosClientValid() bool { return valid } +// Close Close nacos client , then set null func (n *NacosClient) Close() { if n == nil { return } n.stop() - n.Lock() - n.Client = nil - n.Unlock() + n.SetClient(nil) logger.Warnf("nacosClient{name:%s, nacos addr:%s} exit now.", n.name, n.NacosAddrs) } diff --git a/config_center/nacos/facade.go b/config_center/nacos/facade.go index e4f8ab7850ed008aa4788417ca0dff5a8e69cf48..fc83e14eac7fcc51025b54f6daff2553f309312c 100644 --- a/config_center/nacos/facade.go +++ b/config_center/nacos/facade.go @@ -31,13 +31,18 @@ import ( "github.com/apache/dubbo-go/common/logger" ) +const ( + connDelay = 3 + maxFailTimes = 15 +) + type nacosClientFacade interface { NacosClient() *NacosClient SetNacosClient(*NacosClient) - NacosClientLock() *sync.Mutex - WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container - GetDone() chan struct{} //for nacos client control - RestartCallBack() bool + // WaitGroup for wait group control, zk client listener & zk client container + WaitGroup() *sync.WaitGroup + // GetDone For nacos client control RestartCallBack() bool + GetDone() chan struct{} common.Node } @@ -45,7 +50,7 @@ func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second } -//TODO nacos HandleClientRestart +// HandleClientRestart Restart client handler func HandleClientRestart(r nacosClientFacade) { var ( err error @@ -62,12 +67,10 @@ LOOP: break LOOP // re-register all services case <-r.NacosClient().Done(): - r.NacosClientLock().Lock() r.NacosClient().Close() nacosName := r.NacosClient().name nacosAddress := r.NacosClient().NacosAddrs r.SetNacosClient(nil) - r.NacosClientLock().Unlock() // Connect nacos until success. failTimes = 0 @@ -76,19 +79,17 @@ LOOP: case <-r.GetDone(): logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...") break LOOP - case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection nacos. + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * connDelay)): // Prevent crazy reconnection nacos. } err = ValidateNacosClient(r, WithNacosName(nacosName)) logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}", nacosAddress, perrors.WithStack(err)) if err == nil { - if r.RestartCallBack() { - break - } + break } failTimes++ - if MaxFailTimes <= failTimes { - failTimes = MaxFailTimes + if maxFailTimes <= failTimes { + failTimes = maxFailTimes } } } diff --git a/config_center/nacos/factory.go b/config_center/nacos/factory.go index 62c8835fa534ed0f144138c87abf0ba8a449a2c1..3de91ea013df0c6bef8d70c741ff840ba3b77572 100644 --- a/config_center/nacos/factory.go +++ b/config_center/nacos/factory.go @@ -31,6 +31,7 @@ func init() { type nacosDynamicConfigurationFactory struct { } +// GetDynamicConfiguration Get Configuration with URL func (f *nacosDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) { dynamicConfiguration, err := newNacosDynamicConfiguration(url) if err != nil { diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go index 9519523b17095178d03d5b9c6a588f8654cf18b6..d4ff3d9e9da2ce4c57ea4c10f885815de4f3c168 100644 --- a/config_center/nacos/impl.go +++ b/config_center/nacos/impl.go @@ -34,7 +34,7 @@ import ( "github.com/apache/dubbo-go/config_center/parser" ) -const NacosClientName = "nacos config_center" +const nacosClientName = "nacos config_center" type nacosDynamicConfiguration struct { url *common.URL @@ -53,7 +53,7 @@ func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, url: url, done: make(chan struct{}), } - err := ValidateNacosClient(c, WithNacosName(NacosClientName)) + err := ValidateNacosClient(c, WithNacosName(nacosClientName)) if err != nil { logger.Errorf("nacos client start error ,error message is %v", err) return nil, err @@ -64,10 +64,12 @@ func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, } +// AddListener Add listener func (n *nacosDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { n.addListener(key, listener) } +// RemoveListener Remove listener func (n *nacosDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { n.removeListener(key, listener) } @@ -79,7 +81,7 @@ func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_cen for _, opt := range opts { opt(tmpOpts) } - content, err := (*n.client.Client).GetConfig(vo.ConfigParam{ + content, err := (*n.client.Client()).GetConfig(vo.ConfigParam{ DataId: key, Group: tmpOpts.Group, }) @@ -91,52 +93,61 @@ func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_cen } +// GetInternalProperty Get properties value by key func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { return n.GetProperties(key, opts...) } +// GetRule Get router rule func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { return n.GetProperties(key, opts...) } +// Parser Get Parser func (n *nacosDynamicConfiguration) Parser() parser.ConfigurationParser { return n.parser } +// SetParser Set Parser func (n *nacosDynamicConfiguration) SetParser(p parser.ConfigurationParser) { n.parser = p } +// NacosClient Get Nacos Client func (n *nacosDynamicConfiguration) NacosClient() *NacosClient { return n.client } +// SetNacosClient Set Nacos Client func (n *nacosDynamicConfiguration) SetNacosClient(client *NacosClient) { + n.cltLock.Lock() n.client = client + n.cltLock.Unlock() } -func (n *nacosDynamicConfiguration) NacosClientLock() *sync.Mutex { - return &n.cltLock -} - +// WaitGroup for wait group control, zk client listener & zk client container func (n *nacosDynamicConfiguration) WaitGroup() *sync.WaitGroup { return &n.wg } +// GetDone For nacos client control RestartCallBack() bool func (n *nacosDynamicConfiguration) GetDone() chan struct{} { return n.done } +// GetUrl Get Url func (n *nacosDynamicConfiguration) GetUrl() common.URL { return *n.url } +// Destroy Destroy configuration instance func (n *nacosDynamicConfiguration) Destroy() { close(n.done) n.wg.Wait() n.closeConfigs() } +// IsAvailable Get available status func (n *nacosDynamicConfiguration) IsAvailable() bool { select { case <-n.done: @@ -154,7 +165,3 @@ func (r *nacosDynamicConfiguration) closeConfigs() { r.client.Close() r.client = nil } - -func (r *nacosDynamicConfiguration) RestartCallBack() bool { - return true -} diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go index ea03e718b7e284aafbe542d2a3f38c19f28dd40a..90af27dc0bc0cc0d226489ac62154c1060912b0b 100644 --- a/config_center/nacos/listener.go +++ b/config_center/nacos/listener.go @@ -31,7 +31,7 @@ import ( "github.com/apache/dubbo-go/remoting" ) -func callback(ctx context.Context, listener config_center.ConfigurationListener, namespace, group, dataId, data string) { +func callback(listener config_center.ConfigurationListener, namespace, group, dataId, data string) { listener.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) } @@ -39,13 +39,14 @@ func (l *nacosDynamicConfiguration) addListener(key string, listener config_cent _, loaded := l.keyListeners.Load(key) if !loaded { _, cancel := context.WithCancel(context.Background()) - (*l.client.Client).ListenConfig(vo.ConfigParam{ + err := (*l.client.Client()).ListenConfig(vo.ConfigParam{ DataId: key, Group: "dubbo", OnChange: func(namespace, group, dataId, data string) { - go callback(context.TODO(), listener, namespace, group, dataId, data) + go callback(listener, namespace, group, dataId, data) }, }) + logger.Errorf("nacos : listen config fail, error:%v ", err) newListener := make(map[config_center.ConfigurationListener]context.CancelFunc) newListener[listener] = cancel l.keyListeners.Store(key, newListener)