From a78cd3e2a7a861ac35cc18d9a0f0c7e6015a8af6 Mon Sep 17 00:00:00 2001
From: sunby <bingyi.sun@zilliz.com>
Date: Sat, 27 Mar 2021 14:01:52 +0800
Subject: [PATCH] Add proxy service timetick loop to expire allcoations

Signed-off-by: sunby <bingyi.sun@zilliz.com>
---
 internal/dataservice/param.go  | 10 ++++++++++
 internal/dataservice/server.go | 33 ++++++++++++++++++++++++++++++++-
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go
index 15f7c2d44..c5e9de59d 100644
--- a/internal/dataservice/param.go
+++ b/internal/dataservice/param.go
@@ -34,6 +34,7 @@ type ParamTable struct {
 	SegmentInfoChannelName      string
 	DataServiceSubscriptionName string
 	K2SChannelNames             []string
+	ProxyTimeTickChannelName    string
 
 	SegmentFlushMetaPath string
 	Log                  log.Config
@@ -73,6 +74,7 @@ func (p *ParamTable) Init() {
 		p.initK2SChannelNames()
 		p.initSegmentFlushMetaPath()
 		p.initLogCfg()
+		p.initProxyServiceTimeTickChannelName()
 	})
 }
 
@@ -243,3 +245,11 @@ func (p *ParamTable) initLogCfg() {
 		p.Log.File.Filename = ""
 	}
 }
+
+func (p *ParamTable) initProxyServiceTimeTickChannelName() {
+	ch, err := p.Load("msgChannel.chanNamePrefix.proxyServiceTimeTick")
+	if err != nil {
+		panic(err)
+	}
+	p.ProxyTimeTickChannelName = ch
+}
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index 0cc29b52f..470ba78b9 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -288,9 +288,10 @@ func (s *Server) checkMasterIsHealthy() error {
 
 func (s *Server) startServerLoop() {
 	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
-	s.serverLoopWg.Add(2)
+	s.serverLoopWg.Add(3)
 	go s.startStatsChannel(s.serverLoopCtx)
 	go s.startSegmentFlushChannel(s.serverLoopCtx)
+	go s.startProxyServiceTimeTickLoop(s.serverLoopCtx)
 }
 
 func (s *Server) startStatsChannel(ctx context.Context) {
@@ -354,6 +355,36 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
 	}
 }
 
+func (s *Server) startProxyServiceTimeTickLoop(ctx context.Context) {
+	defer logutil.LogPanic()
+	defer s.serverLoopWg.Done()
+	flushStream, _ := s.msFactory.NewMsgStream(ctx)
+	flushStream.AsConsumer([]string{Params.ProxyTimeTickChannelName}, Params.DataServiceSubscriptionName)
+	flushStream.Start()
+	defer flushStream.Close()
+	for {
+		select {
+		case <-ctx.Done():
+			log.Debug("Proxy service timetick loop shut down")
+		default:
+		}
+		msgPack := flushStream.Consume()
+		s.allocMu.Lock()
+		for _, msg := range msgPack.Msgs {
+			if msg.Type() != commonpb.MsgType_TimeTick {
+				log.Warn("receive unknown msg from proxy service timetick", zap.Stringer("msgType", msg.Type()))
+				continue
+			}
+			tMsg := msg.(*msgstream.TimeTickMsg)
+			traceCtx := context.TODO()
+			if err := s.segAllocator.ExpireAllocations(traceCtx, tMsg.Base.Timestamp); err != nil {
+				log.Error("expire allocations error", zap.Error(err))
+			}
+		}
+		s.allocMu.Unlock()
+	}
+}
+
 func (s *Server) startDDChannel(ctx context.Context) {
 	defer s.serverLoopWg.Done()
 	ddStream, _ := s.msFactory.NewMsgStream(ctx)
-- 
GitLab