Skip to content
Snippets Groups Projects
Commit 4e82b6fa authored by vito.he's avatar vito.he
Browse files

Add: more comment of registry module

parent 9123780c
No related branches found
No related tags found
No related merge requests found
......@@ -84,12 +84,11 @@ type BaseRegistry struct {
context context.Context
facadeBasedRegistry FacadeBasedRegistry
*common.URL
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
wg sync.WaitGroup // wg+done for zk restart
done chan struct{}
cltLock sync.Mutex
services map[string]common.URL // service name + protocol -> service config
birth int64 // time of file birth, seconds since Epoch; 0 if unknown
wg sync.WaitGroup // wg+done for zk restart
done chan struct{}
cltLock sync.Mutex //ctl lock is a lock for services map
services map[string]common.URL // service name + protocol -> service config, for store the service registered
}
func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBasedRegistry) Registry {
......@@ -98,17 +97,22 @@ func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBa
r.done = make(chan struct{})
r.services = make(map[string]common.URL)
r.facadeBasedRegistry = facadeRegistry
r.wg.Add(1)
return r
}
func (r *BaseRegistry) GetUrl() common.URL {
return *r.URL
}
//for graceful down
func (r *BaseRegistry) Destroy() {
//first step close registry's all listeners
r.facadeBasedRegistry.CloseListener()
// then close r.done to notify other program who listen to it
close(r.done)
// wait waitgroup done (wait listeners outside close over)
r.wg.Wait()
//close registry client
r.closeRegisters()
}
......@@ -165,6 +169,7 @@ func (r *BaseRegistry) Register(conf common.URL) error {
return nil
}
//get service path stored in url
func (r *BaseRegistry) service(c common.URL) string {
return url.QueryEscape(c.Service())
}
......@@ -322,7 +327,7 @@ func sleepWait(n int) {
time.Sleep(wait)
}
//subscribe from registry
//Subscribe from registry
func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
n := 0
for {
......@@ -362,11 +367,13 @@ func (r *BaseRegistry) closeRegisters() {
r.cltLock.Lock()
defer r.cltLock.Unlock()
logger.Infof("begin to close provider client")
// Close the old client first to close the tmp node.
// Close and remove(set to nil) the registry client
r.facadeBasedRegistry.CloseAndNilClient()
// reset the services map
r.services = nil
}
//judge to is registry not closed by chan r.done
func (r *BaseRegistry) IsAvailable() bool {
select {
case <-r.done:
......@@ -376,10 +383,12 @@ func (r *BaseRegistry) IsAvailable() bool {
}
}
//open for outside add the waitgroup to add some logic before registry destroyed over(graceful down)
func (r *BaseRegistry) WaitGroup() *sync.WaitGroup {
return &r.wg
}
//open for outside to listen the event of registry Destroy() called.
func (r *BaseRegistry) Done() chan struct{} {
return r.done
}
......@@ -78,7 +78,7 @@ type configurationListener struct {
events chan *config_center.ConfigChangeEvent
}
// NewConfigurationListener ...
// NewConfigurationListener for listening the event of etcdv3.
func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
......
......@@ -95,6 +95,7 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
); err != nil {
return nil, err
}
r.WaitGroup().Add(1) //etcdv3 client start successful, then wg +1
go etcdv3.HandleClientRestart(r)
......
......@@ -85,7 +85,7 @@ type RegistryConfigurationListener struct {
closeOnce sync.Once
}
// NewRegistryConfigurationListener ...
// NewRegistryConfigurationListener for listening the event of zk.
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
reg.WaitGroup().Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false}
......
......@@ -78,6 +78,7 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
if err != nil {
return nil, err
}
r.WaitGroup().Add(1) //zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)
......@@ -112,6 +113,7 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust
if err != nil {
return nil, nil, err
}
r.WaitGroup().Add(1) //zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)
r.InitListeners()
return c, r, nil
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment