diff --git a/registry/base_registry.go b/registry/base_registry.go index 74ec6b4109ef14f22938374489835134ce317cdc..bd3a14bb5c9ed0bbd12b5c9558a94a0777e9d2b8 100644 --- a/registry/base_registry.go +++ b/registry/base_registry.go @@ -84,12 +84,11 @@ type BaseRegistry struct { context context.Context facadeBasedRegistry FacadeBasedRegistry *common.URL - birth int64 // time of file birth, seconds since Epoch; 0 if unknown - wg sync.WaitGroup // wg+done for zk restart - done chan struct{} - - cltLock sync.Mutex - services map[string]common.URL // service name + protocol -> service config + birth int64 // time of file birth, seconds since Epoch; 0 if unknown + wg sync.WaitGroup // wg+done for zk restart + done chan struct{} + cltLock sync.Mutex //ctl lock is a lock for services map + services map[string]common.URL // service name + protocol -> service config, for store the service registered } func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBasedRegistry) Registry { @@ -98,17 +97,22 @@ func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBa r.done = make(chan struct{}) r.services = make(map[string]common.URL) r.facadeBasedRegistry = facadeRegistry - r.wg.Add(1) return r } func (r *BaseRegistry) GetUrl() common.URL { return *r.URL } + +//for graceful down func (r *BaseRegistry) Destroy() { + //first step close registry's all listeners r.facadeBasedRegistry.CloseListener() + // then close r.done to notify other program who listen to it close(r.done) + // wait waitgroup done (wait listeners outside close over) r.wg.Wait() + //close registry client r.closeRegisters() } @@ -165,6 +169,7 @@ func (r *BaseRegistry) Register(conf common.URL) error { return nil } +//get service path stored in url func (r *BaseRegistry) service(c common.URL) string { return url.QueryEscape(c.Service()) } @@ -322,7 +327,7 @@ func sleepWait(n int) { time.Sleep(wait) } -//subscribe from registry +//Subscribe from registry func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { n := 0 for { @@ -362,11 +367,13 @@ func (r *BaseRegistry) closeRegisters() { r.cltLock.Lock() defer r.cltLock.Unlock() logger.Infof("begin to close provider client") - // Close the old client first to close the tmp node. + // Close and remove(set to nil) the registry client r.facadeBasedRegistry.CloseAndNilClient() + // reset the services map r.services = nil } +//judge to is registry not closed by chan r.done func (r *BaseRegistry) IsAvailable() bool { select { case <-r.done: @@ -376,10 +383,12 @@ func (r *BaseRegistry) IsAvailable() bool { } } +//open for outside add the waitgroup to add some logic before registry destroyed over(graceful down) func (r *BaseRegistry) WaitGroup() *sync.WaitGroup { return &r.wg } +//open for outside to listen the event of registry Destroy() called. func (r *BaseRegistry) Done() chan struct{} { return r.done } diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index de769e8ad68a555e5c4d7c5eba223528ba45c18b..5ed56f6ccf6968222712491c9c432674a04a6c18 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -78,7 +78,7 @@ type configurationListener struct { events chan *config_center.ConfigChangeEvent } -// NewConfigurationListener ... +// NewConfigurationListener for listening the event of etcdv3. func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { // add a new waiter reg.WaitGroup().Add(1) diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index f66abe3ef57f7779d64c0af9970027ed7debb8ed..e1c25768119ea7d7122b9aa22a5f881db44bafd9 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -95,6 +95,7 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { ); err != nil { return nil, err } + r.WaitGroup().Add(1) //etcdv3 client start successful, then wg +1 go etcdv3.HandleClientRestart(r) diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 6dbf588ca0acfc6e1480fb8e4ff35f566e8b2a84..e8952432645eb06a4432eedace5c39367a9f1b7b 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -85,7 +85,7 @@ type RegistryConfigurationListener struct { closeOnce sync.Once } -// NewRegistryConfigurationListener ... +// NewRegistryConfigurationListener for listening the event of zk. func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener { reg.WaitGroup().Add(1) return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false} diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 877c4e3f2320cd2c3f59ff538d22b9b3254aa917..f4e53dcc4219d947fea93a10bccc420811afd2b9 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -78,6 +78,7 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { if err != nil { return nil, err } + r.WaitGroup().Add(1) //zk client start successful, then wg +1 go zookeeper.HandleClientRestart(r) @@ -112,6 +113,7 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust if err != nil { return nil, nil, err } + r.WaitGroup().Add(1) //zk client start successful, then wg +1 go zookeeper.HandleClientRestart(r) r.InitListeners() return c, r, nil