Skip to content
Snippets Groups Projects
Commit 0b41031c authored by dragondriver's avatar dragondriver Committed by zhenshan.cao
Browse files

Add channelsTimeTicker to proxy (#5462)


Signed-off-by: default avatardragondriver <jiquan.long@zilliz.com>
parent a2dd1647
No related branches found
No related tags found
No related merge requests found
......@@ -59,6 +59,8 @@ type ProxyNode struct {
sched *TaskScheduler
tick *timeTick
chTicker channelsTimeTicker
idAllocator *allocator.IDAllocator
tsoAllocator *TimestampAllocator
segAssigner *SegIDAssigner
......@@ -238,6 +240,14 @@ func (node *ProxyNode) Init() error {
node.tick = newTimeTick(node.ctx, node.tsoAllocator, time.Millisecond*200, node.sched.TaskDoneTest, node.msFactory)
// TODO(dragondriver): read this from config
interval := time.Millisecond * 200
// TODO(dragondriver): use scheduler's method
getStats := func(ch pChan) (pChanStatistics, error) {
return pChanStatistics{}, nil
}
node.chTicker = newChannelsTimeTicker(node.ctx, interval, []string{}, getStats, tsoAllocator)
return nil
}
......@@ -266,6 +276,12 @@ func (node *ProxyNode) Start() error {
node.tick.Start()
log.Debug("start time tick ...")
err = node.chTicker.start()
if err != nil {
return err
}
log.Debug("start channelsTimeTicker")
// Start callbacks
for _, cb := range node.startCallbacks {
cb()
......@@ -286,6 +302,10 @@ func (node *ProxyNode) Stop() error {
node.sched.Close()
node.queryMsgStream.Close()
node.tick.Close()
err := node.chTicker.close()
if err != nil {
return err
}
node.wg.Wait()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment