diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index b257da8d2236b3540f6bf4c5a9eea14fdb7ef38d..04a2e6faedd7944ee510c8f8fca91f4c60deec75 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -59,6 +59,8 @@ type ProxyNode struct { sched *TaskScheduler tick *timeTick + chTicker channelsTimeTicker + idAllocator *allocator.IDAllocator tsoAllocator *TimestampAllocator segAssigner *SegIDAssigner @@ -238,6 +240,14 @@ func (node *ProxyNode) Init() error { node.tick = newTimeTick(node.ctx, node.tsoAllocator, time.Millisecond*200, node.sched.TaskDoneTest, node.msFactory) + // TODO(dragondriver): read this from config + interval := time.Millisecond * 200 + // TODO(dragondriver): use scheduler's method + getStats := func(ch pChan) (pChanStatistics, error) { + return pChanStatistics{}, nil + } + node.chTicker = newChannelsTimeTicker(node.ctx, interval, []string{}, getStats, tsoAllocator) + return nil } @@ -266,6 +276,12 @@ func (node *ProxyNode) Start() error { node.tick.Start() log.Debug("start time tick ...") + err = node.chTicker.start() + if err != nil { + return err + } + log.Debug("start channelsTimeTicker") + // Start callbacks for _, cb := range node.startCallbacks { cb() @@ -286,6 +302,10 @@ func (node *ProxyNode) Stop() error { node.sched.Close() node.queryMsgStream.Close() node.tick.Close() + err := node.chTicker.close() + if err != nil { + return err + } node.wg.Wait()