diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index 15f7c2d44960ef28bb6b2442c2f9603a7de06876..c5e9de59d46ecca431711e2a2e39e39bd42e1949 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -34,6 +34,7 @@ type ParamTable struct { SegmentInfoChannelName string DataServiceSubscriptionName string K2SChannelNames []string + ProxyTimeTickChannelName string SegmentFlushMetaPath string Log log.Config @@ -73,6 +74,7 @@ func (p *ParamTable) Init() { p.initK2SChannelNames() p.initSegmentFlushMetaPath() p.initLogCfg() + p.initProxyServiceTimeTickChannelName() }) } @@ -243,3 +245,11 @@ func (p *ParamTable) initLogCfg() { p.Log.File.Filename = "" } } + +func (p *ParamTable) initProxyServiceTimeTickChannelName() { + ch, err := p.Load("msgChannel.chanNamePrefix.proxyServiceTimeTick") + if err != nil { + panic(err) + } + p.ProxyTimeTickChannelName = ch +} diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 0cc29b52f574d8ab85506a24362c896b4e74a917..470ba78b96534818629cb8a96aa92e341fb58540 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -288,9 +288,10 @@ func (s *Server) checkMasterIsHealthy() error { func (s *Server) startServerLoop() { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - s.serverLoopWg.Add(2) + s.serverLoopWg.Add(3) go s.startStatsChannel(s.serverLoopCtx) go s.startSegmentFlushChannel(s.serverLoopCtx) + go s.startProxyServiceTimeTickLoop(s.serverLoopCtx) } func (s *Server) startStatsChannel(ctx context.Context) { @@ -354,6 +355,36 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { } } +func (s *Server) startProxyServiceTimeTickLoop(ctx context.Context) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + flushStream, _ := s.msFactory.NewMsgStream(ctx) + flushStream.AsConsumer([]string{Params.ProxyTimeTickChannelName}, Params.DataServiceSubscriptionName) + flushStream.Start() + defer flushStream.Close() + for { + select { + case <-ctx.Done(): + log.Debug("Proxy service timetick loop shut down") + default: + } + msgPack := flushStream.Consume() + s.allocMu.Lock() + for _, msg := range msgPack.Msgs { + if msg.Type() != commonpb.MsgType_TimeTick { + log.Warn("receive unknown msg from proxy service timetick", zap.Stringer("msgType", msg.Type())) + continue + } + tMsg := msg.(*msgstream.TimeTickMsg) + traceCtx := context.TODO() + if err := s.segAllocator.ExpireAllocations(traceCtx, tMsg.Base.Timestamp); err != nil { + log.Error("expire allocations error", zap.Error(err)) + } + } + s.allocMu.Unlock() + } +} + func (s *Server) startDDChannel(ctx context.Context) { defer s.serverLoopWg.Done() ddStream, _ := s.msFactory.NewMsgStream(ctx)