From 7270d18c98ceeac1e10c7acfa378d93a3b1db15d Mon Sep 17 00:00:00 2001
From: sunby <bingyi.sun@zilliz.com>
Date: Thu, 4 Feb 2021 17:31:02 +0800
Subject: [PATCH] Fix timesync startup bugs

Signed-off-by: sunby <bingyi.sun@zilliz.com>
---
 internal/datanode/data_node.go               |  41 ++-
 internal/dataservice/server.go               |  32 +-
 internal/distributed/datanode/service.go     |   1 +
 internal/distributed/queryservice/service.go |   2 +-
 internal/proto/query_service.proto           |   2 +-
 internal/proto/querypb/query_service.pb.go   | 189 +++++------
 internal/querynode/query_node.go             |  22 +-
 internal/queryservice/meta_replica.go        |  88 +-----
 internal/queryservice/querynode.go           |  20 +-
 internal/queryservice/queryservice.go        | 312 +++++--------------
 internal/queryservice/queryservice_test.go   |   3 -
 internal/timesync/time_sync_producer.go      |   8 +-
 internal/timesync/timesync.go                |  22 +-
 13 files changed, 257 insertions(+), 485 deletions(-)

diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 5599216be..e577c525c 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -56,12 +56,11 @@ type (
 	}
 
 	DataNode struct {
-		ctx     context.Context
-		cancel  context.CancelFunc
-		NodeID  UniqueID
-		Role    string
-		State   internalpb2.StateCode
-		watchDm chan struct{}
+		ctx    context.Context
+		cancel context.CancelFunc
+		NodeID UniqueID
+		Role   string
+		State  internalpb2.StateCode
 
 		dataSyncService *dataSyncService
 		metaService     *metaService
@@ -82,13 +81,11 @@ func NewDataNode(ctx context.Context) *DataNode {
 	Params.Init()
 	ctx2, cancel2 := context.WithCancel(ctx)
 	node := &DataNode{
-		ctx:     ctx2,
-		cancel:  cancel2,
-		NodeID:  Params.NodeID, // GOOSE TODO: How to init
-		Role:    typeutil.DataNodeRole,
-		State:   internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic
-		watchDm: make(chan struct{}),
-
+		ctx:             ctx2,
+		cancel:          cancel2,
+		NodeID:          Params.NodeID, // GOOSE TODO: How to init
+		Role:            typeutil.DataNodeRole,
+		State:           internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic
 		dataSyncService: nil,
 		metaService:     nil,
 		masterService:   nil,
@@ -138,13 +135,6 @@ func (node *DataNode) Init() error {
 		return errors.Errorf("Register node failed: %v", err)
 	}
 
-	select {
-	case <-time.After(RPCConnectionTimeout):
-		return errors.New("Get DmChannels failed in 30 seconds")
-	case <-node.watchDm:
-		log.Println("insert channel names set")
-	}
-
 	for _, kv := range resp.InitParams.StartParams {
 		switch kv.Key {
 		case "DDChannelName":
@@ -172,10 +162,10 @@ func (node *DataNode) Init() error {
 	node.flushChan = make(chan *flushMsg, chanSize)
 
 	node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc)
-	node.dataSyncService.init()
 	node.metaService = newMetaService(node.ctx, replica, node.masterService)
 
 	node.replica = replica
+	node.dataSyncService.initNodes()
 
 	// --- Opentracing ---
 	cfg := &config.Configuration{
@@ -201,9 +191,14 @@ func (node *DataNode) Init() error {
 
 func (node *DataNode) Start() error {
 	node.metaService.init()
+	return nil
+}
+
+// DataNode is HEALTHY until StartSync() is called
+func (node *DataNode) StartSync() {
+	node.dataSyncService.init()
 	go node.dataSyncService.start()
 	node.State = internalpb2.StateCode_HEALTHY
-	return nil
 }
 
 func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
@@ -224,7 +219,7 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*common
 	default:
 		Params.InsertChannelNames = in.GetChannelNames()
 		status.ErrorCode = commonpb.ErrorCode_SUCCESS
-		node.watchDm <- struct{}{}
+		node.StartSync()
 		return status, nil
 	}
 }
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index fcb2c5cce..5b1c54e09 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -94,6 +94,7 @@ type (
 		ddChannelName     string
 		segmentInfoStream msgstream.MsgStream
 		insertChannels    []string
+		ttBarrier         timesync.TimeTickBarrier
 	}
 )
 
@@ -177,23 +178,23 @@ func (s *Server) initSegmentInfoChannel() {
 	s.segmentInfoStream.Start()
 }
 func (s *Server) initMsgProducer() error {
+	var err error
 	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	ttMsgStream, _ := factory.NewMsgStream(s.ctx)
-	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
-	s.ttMsgStream = ttMsgStream
+	if s.ttMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
+		return err
+	}
+	s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
 	s.ttMsgStream.Start()
-	timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
-	dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
-	k2sStream, _ := factory.NewMsgStream(s.ctx)
-	k2sStream.AsProducer(Params.K2SChannelNames)
-	s.k2sMsgStream = k2sStream
+	if s.k2sMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
+		return err
+	}
+	s.k2sMsgStream.AsProducer(Params.K2SChannelNames)
 	s.k2sMsgStream.Start()
+	dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
 	k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
-	producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher, k2sMsgWatcher)
-	if err != nil {
+	if s.msgProducer, err = timesync.NewTimeSyncMsgProducer(s.ttBarrier, dataNodeTTWatcher, k2sMsgWatcher); err != nil {
 		return err
 	}
-	s.msgProducer = producer
 	s.msgProducer.Start(s.ctx)
 	return nil
 }
@@ -297,10 +298,11 @@ func (s *Server) checkMasterIsHealthy() error {
 
 func (s *Server) startServerLoop() {
 	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
-	s.serverLoopWg.Add(3)
+	s.serverLoopWg.Add(4)
 	go s.startStatsChannel(s.serverLoopCtx)
 	go s.startSegmentFlushChannel(s.serverLoopCtx)
 	go s.startDDChannel(s.serverLoopCtx)
+	go s.startTTBarrier(s.serverLoopCtx)
 }
 
 func (s *Server) startStatsChannel(ctx context.Context) {
@@ -388,6 +390,12 @@ func (s *Server) startDDChannel(ctx context.Context) {
 	}
 }
 
+func (s *Server) startTTBarrier(ctx context.Context) {
+	defer s.serverLoopWg.Done()
+	s.ttBarrier = timesync.NewHardTimeTickBarrier(ctx, s.ttMsgStream, s.cluster.GetNodeIDs())
+	s.ttBarrier.StartBackgroundLoop()
+}
+
 func (s *Server) waitDataNodeRegister() {
 	log.Println("waiting data node to register")
 	<-s.registerFinishCh
diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go
index 9821405c9..2c605b93f 100644
--- a/internal/distributed/datanode/service.go
+++ b/internal/distributed/datanode/service.go
@@ -76,6 +76,7 @@ func (s *Server) Start() error {
 
 func (s *Server) Stop() error {
 	err := s.core.Stop()
+	s.cancel()
 	s.grpcServer.GracefulStop()
 	return err
 }
diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go
index 06687170d..b6ddde255 100644
--- a/internal/distributed/queryservice/service.go
+++ b/internal/distributed/queryservice/service.go
@@ -35,7 +35,7 @@ type Server struct {
 func (s *Server) Init() error {
 	log.Println("query service init")
 	if err := s.queryService.Init(); err != nil {
-		return err
+		panic(err)
 	}
 	s.queryService.SetEnableGrpc(true)
 	return nil
diff --git a/internal/proto/query_service.proto b/internal/proto/query_service.proto
index aeba557f5..7a8b837b4 100644
--- a/internal/proto/query_service.proto
+++ b/internal/proto/query_service.proto
@@ -124,7 +124,7 @@ message LoadSegmentRequest {
   int64 partitionID = 4;
   repeated int64 segmentIDs = 5;
   repeated int64 fieldIDs = 6;
-  repeated data.SegmentStateInfo segment_states = 7;
+  data.SegmentStateInfo last_segment_state = 7;
 }
 
 message ReleaseSegmentRequest {
diff --git a/internal/proto/querypb/query_service.pb.go b/internal/proto/querypb/query_service.pb.go
index 833984a93..64d1bb5b4 100644
--- a/internal/proto/querypb/query_service.pb.go
+++ b/internal/proto/querypb/query_service.pb.go
@@ -972,16 +972,16 @@ func (m *WatchDmChannelsRequest) GetChannelIDs() []string {
 }
 
 type LoadSegmentRequest struct {
-	Base                 *commonpb.MsgBase          `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
-	DbID                 int64                      `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"`
-	CollectionID         int64                      `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
-	PartitionID          int64                      `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
-	SegmentIDs           []int64                    `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"`
-	FieldIDs             []int64                    `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"`
-	SegmentStates        []*datapb.SegmentStateInfo `protobuf:"bytes,7,rep,name=segment_states,json=segmentStates,proto3" json:"segment_states,omitempty"`
-	XXX_NoUnkeyedLiteral struct{}                   `json:"-"`
-	XXX_unrecognized     []byte                     `json:"-"`
-	XXX_sizecache        int32                      `json:"-"`
+	Base                 *commonpb.MsgBase        `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
+	DbID                 int64                    `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"`
+	CollectionID         int64                    `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
+	PartitionID          int64                    `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
+	SegmentIDs           []int64                  `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"`
+	FieldIDs             []int64                  `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"`
+	LastSegmentState     *datapb.SegmentStateInfo `protobuf:"bytes,7,opt,name=last_segment_state,json=lastSegmentState,proto3" json:"last_segment_state,omitempty"`
+	XXX_NoUnkeyedLiteral struct{}                 `json:"-"`
+	XXX_unrecognized     []byte                   `json:"-"`
+	XXX_sizecache        int32                    `json:"-"`
 }
 
 func (m *LoadSegmentRequest) Reset()         { *m = LoadSegmentRequest{} }
@@ -1051,9 +1051,9 @@ func (m *LoadSegmentRequest) GetFieldIDs() []int64 {
 	return nil
 }
 
-func (m *LoadSegmentRequest) GetSegmentStates() []*datapb.SegmentStateInfo {
+func (m *LoadSegmentRequest) GetLastSegmentState() *datapb.SegmentStateInfo {
 	if m != nil {
-		return m.SegmentStates
+		return m.LastSegmentState
 	}
 	return nil
 }
@@ -1395,90 +1395,91 @@ func init() {
 func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) }
 
 var fileDescriptor_5fcb6756dc1afb8d = []byte{
-	// 1324 bytes of a gzipped FileDescriptorProto
+	// 1331 bytes of a gzipped FileDescriptorProto
 	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x5f, 0x73, 0xdb, 0x44,
 	0x10, 0xb7, 0x6c, 0xc7, 0xa9, 0x37, 0xae, 0xed, 0x5e, 0xfe, 0x19, 0x51, 0x4a, 0x39, 0xa0, 0x4d,
-	0x5b, 0x70, 0x98, 0x74, 0x60, 0x78, 0x82, 0x69, 0xe3, 0x4e, 0xc6, 0x0c, 0x0d, 0x41, 0x4e, 0xa7,
-	0x43, 0xa0, 0x63, 0x64, 0xe9, 0xe2, 0x5c, 0x6b, 0x49, 0xae, 0xee, 0x9c, 0x34, 0x79, 0x01, 0x66,
-	0x78, 0x64, 0xe0, 0x33, 0x30, 0x30, 0xc0, 0xf0, 0x81, 0x78, 0xe1, 0x05, 0xbe, 0x09, 0xa3, 0x93,
-	0xac, 0x48, 0xf2, 0x39, 0x72, 0xea, 0xa6, 0xe1, 0x4d, 0x77, 0xda, 0xdb, 0xdf, 0xee, 0x6f, 0xf7,
-	0xf6, 0x76, 0x61, 0xfe, 0xe9, 0x80, 0xb8, 0x87, 0x6d, 0x46, 0xdc, 0x7d, 0x6a, 0x90, 0x7a, 0xdf,
-	0x75, 0xb8, 0x83, 0x90, 0x45, 0x7b, 0xfb, 0x03, 0xe6, 0xaf, 0xea, 0x42, 0x42, 0x2d, 0x19, 0x8e,
-	0x65, 0x39, 0xb6, 0xbf, 0xa7, 0x96, 0xa2, 0x12, 0x6a, 0x99, 0xda, 0x9c, 0xb8, 0xb6, 0xde, 0x0b,
-	0xd6, 0xc8, 0xd4, 0xb9, 0x1e, 0xd7, 0x89, 0xbf, 0x81, 0x79, 0x8d, 0x74, 0x29, 0xe3, 0xc4, 0xdd,
-	0x74, 0x4c, 0xa2, 0x91, 0xa7, 0x03, 0xc2, 0x38, 0x7a, 0x0f, 0xf2, 0x1d, 0x9d, 0x91, 0x9a, 0x72,
-	0x55, 0x59, 0x99, 0x5b, 0xbb, 0x5c, 0x8f, 0x21, 0x07, 0x90, 0xf7, 0x59, 0xf7, 0xae, 0xce, 0x88,
-	0x26, 0x24, 0xd1, 0x07, 0x30, 0xab, 0x9b, 0xa6, 0x4b, 0x18, 0xab, 0x65, 0x4f, 0x38, 0x74, 0xc7,
-	0x97, 0xd1, 0x86, 0xc2, 0xf8, 0x27, 0x05, 0x16, 0xe2, 0x16, 0xb0, 0xbe, 0x63, 0x33, 0x82, 0x6e,
-	0x43, 0x81, 0x71, 0x9d, 0x0f, 0x58, 0x60, 0xc4, 0xab, 0x52, 0x7d, 0x2d, 0x21, 0xa2, 0x05, 0xa2,
-	0xe8, 0x2e, 0xcc, 0x51, 0x9b, 0xf2, 0x76, 0x5f, 0x77, 0x75, 0x6b, 0x68, 0xc9, 0x1b, 0xf1, 0x93,
-	0x21, 0x2b, 0x4d, 0x9b, 0xf2, 0x2d, 0x21, 0xa8, 0x01, 0x0d, 0xbf, 0xf1, 0x23, 0x58, 0x6c, 0xed,
-	0x39, 0x07, 0xeb, 0x4e, 0xaf, 0x47, 0x0c, 0x4e, 0x1d, 0xfb, 0xf9, 0x49, 0x41, 0x90, 0x37, 0x3b,
-	0xcd, 0x86, 0xb0, 0x23, 0xa7, 0x89, 0x6f, 0xcc, 0x60, 0x29, 0xa9, 0x7e, 0x1a, 0x8f, 0xdf, 0x82,
-	0x8b, 0x46, 0xa8, 0xaa, 0xd9, 0xf0, 0x7c, 0xce, 0xad, 0xe4, 0xb4, 0xf8, 0x26, 0xfe, 0x4e, 0x81,
-	0xc5, 0x4f, 0x1d, 0xdd, 0x3c, 0x23, 0xa7, 0x10, 0x86, 0x52, 0x14, 0xb0, 0x96, 0x13, 0xff, 0x62,
-	0x7b, 0xf8, 0x7b, 0x05, 0x6a, 0x1a, 0xe9, 0x11, 0x9d, 0x91, 0xf3, 0x34, 0xe3, 0x5b, 0x05, 0x16,
-	0xbc, 0x00, 0x6c, 0xe9, 0x2e, 0xa7, 0xe7, 0x63, 0x42, 0xdf, 0xcf, 0xb0, 0x88, 0x05, 0xd3, 0x64,
-	0x00, 0x86, 0x52, 0x7f, 0xa8, 0xe9, 0x38, 0x01, 0x62, 0x7b, 0xd8, 0x82, 0x4a, 0x88, 0xe6, 0x1d,
-	0x27, 0x0c, 0x5d, 0x85, 0xb9, 0x88, 0x88, 0x00, 0xcc, 0x69, 0xd1, 0x2d, 0xf4, 0x21, 0xcc, 0x78,
-	0x10, 0x44, 0xf8, 0x57, 0x5e, 0xc3, 0xf5, 0xd1, 0xfa, 0x53, 0x8f, 0x6b, 0xd5, 0xfc, 0x03, 0xf8,
-	0x37, 0x05, 0x96, 0x12, 0x78, 0x2f, 0x9d, 0xe5, 0x11, 0x5e, 0xf2, 0x12, 0x5e, 0xfe, 0x54, 0x60,
-	0x79, 0xc4, 0xd0, 0x69, 0x82, 0xb1, 0x03, 0x4b, 0x21, 0x40, 0xdb, 0x24, 0xcc, 0x70, 0x69, 0xdf,
-	0xfb, 0xf6, 0xc3, 0x32, 0xb7, 0xf6, 0x66, 0x3a, 0x89, 0x4c, 0x5b, 0x0c, 0x55, 0x34, 0x22, 0x1a,
-	0xf0, 0xaf, 0x0a, 0x2c, 0x78, 0x97, 0xf8, 0xfc, 0x32, 0x77, 0x22, 0x4e, 0x7f, 0x57, 0x60, 0x39,
-	0xb8, 0xe7, 0xff, 0x73, 0x4b, 0x7f, 0x56, 0x40, 0x5d, 0x77, 0x89, 0xce, 0xc9, 0xe7, 0x5e, 0x1c,
-	0xd6, 0xf7, 0x74, 0xdb, 0x26, 0xbd, 0xe9, 0x12, 0xe0, 0x3a, 0x54, 0x5c, 0xdf, 0xd9, 0xb6, 0xe1,
-	0xeb, 0x13, 0xa6, 0x17, 0xb5, 0x72, 0xb0, 0x1d, 0xa0, 0xa0, 0xb7, 0xa1, 0xec, 0x12, 0x36, 0xe8,
-	0x1d, 0xcb, 0xe5, 0x84, 0xdc, 0x45, 0x7f, 0x37, 0x10, 0xc3, 0xbf, 0x28, 0xb0, 0x7c, 0xc7, 0x34,
-	0xa3, 0x06, 0x4e, 0x71, 0x97, 0x6e, 0xc1, 0xa5, 0x84, 0x75, 0x01, 0xb5, 0x45, 0xad, 0x1a, 0xb7,
-	0xaf, 0xd9, 0x40, 0x37, 0xa0, 0x1a, 0xb7, 0x30, 0xa0, 0xba, 0xa8, 0x55, 0x62, 0x36, 0x36, 0x1b,
-	0xf8, 0x6f, 0x05, 0x54, 0x8d, 0x58, 0xce, 0x3e, 0x91, 0x1a, 0xfa, 0x5c, 0x4c, 0x0e, 0xbd, 0xcb,
-	0x4e, 0xe7, 0x5d, 0xee, 0x14, 0xde, 0xe5, 0xe5, 0xde, 0x3d, 0x86, 0xa5, 0x87, 0x3a, 0x37, 0xf6,
-	0x1a, 0xd6, 0xf4, 0x11, 0xb8, 0x02, 0x10, 0xe2, 0xf9, 0x45, 0xa1, 0xa8, 0x45, 0x76, 0xf0, 0x1f,
-	0x59, 0x40, 0xde, 0x25, 0x6f, 0x91, 0xae, 0x45, 0x6c, 0xfe, 0xf2, 0x2f, 0x4e, 0xe2, 0x5d, 0xc8,
-	0x8f, 0xbe, 0x0b, 0x57, 0x00, 0x98, 0x6f, 0x9d, 0xe7, 0xc2, 0x8c, 0xb8, 0x58, 0x91, 0x1d, 0xa4,
-	0xc2, 0x85, 0x5d, 0x4a, 0x7a, 0xa6, 0xf7, 0xb7, 0x20, 0xfe, 0x86, 0x6b, 0xf4, 0x09, 0x94, 0x03,
-	0xc9, 0xb6, 0x78, 0x2a, 0x58, 0x6d, 0x56, 0x56, 0x17, 0xbd, 0x4e, 0xb5, 0x1e, 0x50, 0x20, 0x8a,
-	0x62, 0xd3, 0xde, 0x75, 0xb4, 0x8b, 0x2c, 0xb2, 0xc3, 0xf0, 0xbf, 0x0a, 0x2c, 0x06, 0x85, 0xe6,
-	0xdc, 0xd8, 0x9a, 0xa0, 0xcc, 0x4c, 0xc3, 0x17, 0xfe, 0x51, 0x81, 0xe5, 0x75, 0xc7, 0xea, 0x3b,
-	0x76, 0xe8, 0xf7, 0x74, 0xf5, 0xe9, 0x23, 0xff, 0x10, 0x19, 0x36, 0xc7, 0xd7, 0xc6, 0x34, 0xc7,
-	0x49, 0xd0, 0xe0, 0x14, 0xfe, 0x47, 0x81, 0xb9, 0x80, 0x6d, 0x2f, 0x26, 0xe8, 0x32, 0x14, 0x43,
-	0x57, 0x82, 0x26, 0xe2, 0x78, 0x63, 0x84, 0xc2, 0x6c, 0x7a, 0xc2, 0xe5, 0x46, 0x13, 0xee, 0x15,
-	0xb8, 0x60, 0x11, 0xab, 0xcd, 0xe8, 0x11, 0x09, 0xf2, 0x71, 0xd6, 0x22, 0x56, 0x8b, 0x1e, 0x11,
-	0xef, 0x97, 0x3d, 0xb0, 0xda, 0xae, 0x73, 0xe0, 0x31, 0x2b, 0x7e, 0xd9, 0x03, 0x4b, 0x73, 0x0e,
-	0x18, 0x7a, 0x0d, 0x80, 0xda, 0x26, 0x79, 0xd6, 0xb6, 0x75, 0x8b, 0xd4, 0x0a, 0xe2, 0x6a, 0x17,
-	0xc5, 0xce, 0xa6, 0x6e, 0x11, 0x54, 0x83, 0x59, 0xb1, 0x68, 0x36, 0x6a, 0xb3, 0xfe, 0xc1, 0x60,
-	0x89, 0x77, 0x01, 0x45, 0x3c, 0x9c, 0xea, 0xaa, 0x47, 0xe2, 0x9e, 0x4d, 0xc6, 0xdd, 0x6b, 0xca,
-	0xe7, 0x63, 0x40, 0xd3, 0xc4, 0xf5, 0x7d, 0x98, 0xa1, 0xf6, 0xae, 0x33, 0xec, 0x33, 0x5e, 0x97,
-	0xf5, 0x19, 0x51, 0x30, 0x5f, 0xfa, 0xe6, 0x11, 0x94, 0xe3, 0xdd, 0x07, 0x2a, 0xc1, 0x85, 0x4d,
-	0x87, 0xdf, 0x7b, 0x46, 0x19, 0xaf, 0x66, 0x50, 0x19, 0x60, 0xd3, 0xe1, 0x5b, 0x2e, 0x61, 0xc4,
-	0xe6, 0x55, 0x05, 0x01, 0x14, 0x3e, 0xb3, 0x1b, 0x94, 0x3d, 0xa9, 0x66, 0xd1, 0x7c, 0xd0, 0x54,
-	0xea, 0xbd, 0xa6, 0x7d, 0x9f, 0x58, 0x8e, 0x7b, 0x58, 0xcd, 0x79, 0xc7, 0xc3, 0x55, 0x1e, 0x55,
-	0xa1, 0x14, 0x8a, 0x6c, 0x6c, 0x3d, 0xa8, 0xce, 0xa0, 0x22, 0xcc, 0xf8, 0x9f, 0x85, 0xb5, 0x1f,
-	0x00, 0x4a, 0xe2, 0xb9, 0x68, 0xf9, 0x23, 0x29, 0x32, 0xa0, 0x14, 0x1d, 0x05, 0xd1, 0x75, 0x99,
-	0x13, 0x92, 0x71, 0x55, 0x5d, 0x49, 0x17, 0xf4, 0xb9, 0xc5, 0x19, 0xf4, 0x18, 0x2a, 0xf1, 0xf9,
-	0x8b, 0xa1, 0x1b, 0x52, 0xb2, 0x64, 0x33, 0xa0, 0x7a, 0x73, 0x12, 0xd1, 0x10, 0xab, 0x0b, 0xe5,
-	0x58, 0xa3, 0xcf, 0xd0, 0xca, 0xb8, 0xf3, 0xc9, 0x56, 0x49, 0xbd, 0x31, 0x81, 0x64, 0x08, 0xf4,
-	0x05, 0x94, 0x63, 0x9d, 0xe1, 0x18, 0x20, 0x59, 0xf7, 0xa8, 0x9e, 0x94, 0x5e, 0x38, 0x83, 0xda,
-	0x70, 0x29, 0xd9, 0xcd, 0x31, 0x74, 0x4b, 0x4e, 0xb8, 0xb4, 0xe9, 0x4b, 0x03, 0xd8, 0xf1, 0x6d,
-	0x3f, 0x26, 0x50, 0x1e, 0x0f, 0xe9, 0xf8, 0x9a, 0xa6, 0xfb, 0xeb, 0xd0, 0xf8, 0x88, 0xfa, 0x77,
-	0x4e, 0x30, 0xfe, 0xd4, 0x08, 0x1d, 0x40, 0xa3, 0x2d, 0x24, 0x52, 0xa5, 0x87, 0xee, 0x59, 0x7d,
-	0x7e, 0xa8, 0xd6, 0x65, 0xf0, 0xe3, 0xdb, 0x50, 0x9c, 0x41, 0x0f, 0x01, 0x6d, 0x10, 0xbe, 0x4d,
-	0x2d, 0xb2, 0x4d, 0x8d, 0x27, 0x93, 0x60, 0x24, 0x9e, 0xd3, 0x60, 0xd1, 0xe2, 0x2e, 0xb5, 0xbb,
-	0xb1, 0xb4, 0x59, 0xd8, 0x20, 0xa2, 0xc2, 0x53, 0xc6, 0xa9, 0xc1, 0x5e, 0xa0, 0x6a, 0x47, 0xd8,
-	0x9c, 0x1c, 0x3a, 0x6f, 0x4e, 0x32, 0xfe, 0x04, 0xc4, 0xdf, 0x9a, 0x48, 0x36, 0x04, 0xdc, 0x11,
-	0x80, 0x89, 0x67, 0xeb, 0x44, 0x4f, 0x26, 0x7c, 0xfa, 0x70, 0x06, 0x19, 0x50, 0xf6, 0x78, 0x8a,
-	0x3c, 0x7b, 0xd7, 0xd2, 0xea, 0x6b, 0xe0, 0xc4, 0xf5, 0x54, 0xb9, 0xa1, 0x03, 0x6b, 0x7f, 0x15,
-	0xa0, 0x28, 0x12, 0x40, 0xd4, 0xbe, 0x33, 0x8b, 0xf9, 0x36, 0x54, 0x82, 0x98, 0xbf, 0xc8, 0x70,
-	0xb7, 0x4f, 0xcd, 0xbe, 0x34, 0xbc, 0x63, 0x5a, 0x1d, 0x9c, 0x41, 0x8f, 0xa0, 0x92, 0x18, 0x83,
-	0xe4, 0x45, 0x68, 0xcc, 0xac, 0x94, 0x76, 0x8d, 0x0d, 0x40, 0xa3, 0xf3, 0x0b, 0xaa, 0xcb, 0x2b,
-	0xc5, 0xb8, 0x39, 0x27, 0x0d, 0xe4, 0x2b, 0xa8, 0x24, 0xe6, 0x08, 0xf9, 0x85, 0x90, 0x0f, 0x1b,
-	0x69, 0xda, 0x1f, 0x40, 0x29, 0x32, 0x38, 0x30, 0x79, 0x8a, 0x8e, 0x8e, 0x16, 0x69, 0x6a, 0xbf,
-	0x84, 0x4a, 0xbc, 0xc9, 0x1e, 0xf3, 0x5e, 0x4a, 0x3b, 0xf1, 0x74, 0xda, 0xcf, 0xfe, 0x62, 0xdd,
-	0xbd, 0xb3, 0xf3, 0x71, 0x97, 0xf2, 0xbd, 0x41, 0xc7, 0x83, 0x5f, 0x3d, 0xa2, 0xbd, 0x1e, 0x3d,
-	0xe2, 0xc4, 0xd8, 0x5b, 0xf5, 0x35, 0xbc, 0x6b, 0x52, 0xc6, 0x5d, 0xda, 0x19, 0x70, 0x62, 0xae,
-	0x0e, 0x8b, 0xc0, 0xaa, 0x50, 0xbb, 0x2a, 0xd4, 0xf6, 0x3b, 0x9d, 0x82, 0x58, 0xde, 0xfe, 0x2f,
-	0x00, 0x00, 0xff, 0xff, 0x36, 0xce, 0x45, 0x9b, 0x98, 0x17, 0x00, 0x00,
+	0x13, 0x70, 0x98, 0x74, 0x60, 0x78, 0x82, 0x49, 0xe2, 0x4e, 0xc6, 0x33, 0x34, 0xa4, 0x72, 0x3a,
+	0x1d, 0x02, 0x1d, 0x23, 0x4b, 0x17, 0xe7, 0x5a, 0xfd, 0x71, 0x75, 0xe7, 0xa4, 0xc9, 0x0b, 0x30,
+	0xc3, 0x23, 0x03, 0x9f, 0x81, 0x81, 0x81, 0x19, 0x5e, 0xf8, 0x36, 0xbc, 0xf0, 0x02, 0xdf, 0x84,
+	0xd1, 0x49, 0x56, 0x24, 0x59, 0x8e, 0x9c, 0xba, 0x69, 0x78, 0xd3, 0x9d, 0xf6, 0xf6, 0xb7, 0xfb,
+	0xdb, 0xbd, 0xbd, 0x5d, 0x98, 0x7d, 0xd6, 0x27, 0xce, 0x71, 0x9b, 0x11, 0xe7, 0x90, 0x6a, 0xa4,
+	0xde, 0x73, 0x6c, 0x6e, 0x23, 0x64, 0x52, 0xe3, 0xb0, 0xcf, 0xbc, 0x55, 0x5d, 0x48, 0xc8, 0x25,
+	0xcd, 0x36, 0x4d, 0xdb, 0xf2, 0xf6, 0xe4, 0x52, 0x58, 0x42, 0x2e, 0x53, 0x8b, 0x13, 0xc7, 0x52,
+	0x0d, 0x7f, 0x8d, 0x74, 0x95, 0xab, 0x51, 0x9d, 0xf8, 0x1b, 0x98, 0x55, 0x48, 0x97, 0x32, 0x4e,
+	0x9c, 0x6d, 0x5b, 0x27, 0x0a, 0x79, 0xd6, 0x27, 0x8c, 0xa3, 0x0f, 0x20, 0xdf, 0x51, 0x19, 0xa9,
+	0x49, 0x37, 0xa5, 0xa5, 0x99, 0xb5, 0xeb, 0xf5, 0x08, 0xb2, 0x0f, 0x79, 0x9f, 0x75, 0x37, 0x54,
+	0x46, 0x14, 0x21, 0x89, 0x3e, 0x82, 0x69, 0x55, 0xd7, 0x1d, 0xc2, 0x58, 0x2d, 0x7b, 0xc6, 0xa1,
+	0x75, 0x4f, 0x46, 0x19, 0x08, 0xe3, 0x9f, 0x24, 0x98, 0x8b, 0x5a, 0xc0, 0x7a, 0xb6, 0xc5, 0x08,
+	0xba, 0x0b, 0x05, 0xc6, 0x55, 0xde, 0x67, 0xbe, 0x11, 0xaf, 0x27, 0xea, 0x6b, 0x09, 0x11, 0xc5,
+	0x17, 0x45, 0x1b, 0x30, 0x43, 0x2d, 0xca, 0xdb, 0x3d, 0xd5, 0x51, 0xcd, 0x81, 0x25, 0x6f, 0x45,
+	0x4f, 0x06, 0xac, 0x34, 0x2d, 0xca, 0x77, 0x84, 0xa0, 0x02, 0x34, 0xf8, 0xc6, 0x8f, 0x61, 0xbe,
+	0x75, 0x60, 0x1f, 0x6d, 0xda, 0x86, 0x41, 0x34, 0x4e, 0x6d, 0xeb, 0xc5, 0x49, 0x41, 0x90, 0xd7,
+	0x3b, 0xcd, 0x86, 0xb0, 0x23, 0xa7, 0x88, 0x6f, 0xcc, 0x60, 0x21, 0xae, 0x7e, 0x12, 0x8f, 0xdf,
+	0x81, 0xab, 0x5a, 0xa0, 0xaa, 0xd9, 0x70, 0x7d, 0xce, 0x2d, 0xe5, 0x94, 0xe8, 0x26, 0xfe, 0x4e,
+	0x82, 0xf9, 0xcf, 0x6c, 0x55, 0xbf, 0x20, 0xa7, 0x10, 0x86, 0x52, 0x18, 0xb0, 0x96, 0x13, 0xff,
+	0x22, 0x7b, 0xf8, 0x7b, 0x09, 0x6a, 0x0a, 0x31, 0x88, 0xca, 0xc8, 0x65, 0x9a, 0xf1, 0xad, 0x04,
+	0x73, 0x6e, 0x00, 0x76, 0x54, 0x87, 0xd3, 0xcb, 0x31, 0xa1, 0xe7, 0x65, 0x58, 0xc8, 0x82, 0x49,
+	0x32, 0x00, 0x43, 0xa9, 0x37, 0xd0, 0x74, 0x9a, 0x00, 0x91, 0x3d, 0x6c, 0x42, 0x25, 0x40, 0x73,
+	0x8f, 0x13, 0x86, 0x6e, 0xc2, 0x4c, 0x48, 0x44, 0x00, 0xe6, 0x94, 0xf0, 0x16, 0xfa, 0x18, 0xa6,
+	0x5c, 0x08, 0x22, 0xfc, 0x2b, 0xaf, 0xe1, 0xfa, 0x70, 0xfd, 0xa9, 0x47, 0xb5, 0x2a, 0xde, 0x01,
+	0xfc, 0x9b, 0x04, 0x0b, 0x31, 0xbc, 0x57, 0xce, 0xf2, 0x10, 0x2f, 0xf9, 0x04, 0x5e, 0xfe, 0x90,
+	0x60, 0x71, 0xc8, 0xd0, 0x49, 0x82, 0xb1, 0x07, 0x0b, 0x01, 0x40, 0x5b, 0x27, 0x4c, 0x73, 0x68,
+	0xcf, 0xfd, 0xf6, 0xc2, 0x32, 0xb3, 0xf6, 0x76, 0x3a, 0x89, 0x4c, 0x99, 0x0f, 0x54, 0x34, 0x42,
+	0x1a, 0xf0, 0xaf, 0x12, 0xcc, 0xb9, 0x97, 0xf8, 0xf2, 0x32, 0x77, 0x2c, 0x4e, 0x7f, 0x97, 0x60,
+	0xd1, 0xbf, 0xe7, 0xff, 0x73, 0x4b, 0x7f, 0x96, 0x40, 0xde, 0x74, 0x88, 0xca, 0xc9, 0x03, 0x37,
+	0x0e, 0x9b, 0x07, 0xaa, 0x65, 0x11, 0x63, 0xb2, 0x04, 0xb8, 0x0d, 0x15, 0xc7, 0x73, 0xb6, 0xad,
+	0x79, 0xfa, 0x84, 0xe9, 0x45, 0xa5, 0xec, 0x6f, 0xfb, 0x28, 0xe8, 0x5d, 0x28, 0x3b, 0x84, 0xf5,
+	0x8d, 0x53, 0xb9, 0x9c, 0x90, 0xbb, 0xea, 0xed, 0xfa, 0x62, 0xf8, 0x17, 0x09, 0x16, 0xd7, 0x75,
+	0x3d, 0x6c, 0xe0, 0x04, 0x77, 0x69, 0x05, 0xae, 0xc5, 0xac, 0xf3, 0xa9, 0x2d, 0x2a, 0xd5, 0xa8,
+	0x7d, 0xcd, 0x06, 0xba, 0x03, 0xd5, 0xa8, 0x85, 0x3e, 0xd5, 0x45, 0xa5, 0x12, 0xb1, 0xb1, 0xd9,
+	0xc0, 0x7f, 0x4b, 0x20, 0x2b, 0xc4, 0xb4, 0x0f, 0x49, 0xa2, 0xa1, 0x2f, 0xc4, 0xe4, 0xc0, 0xbb,
+	0xec, 0x64, 0xde, 0xe5, 0xce, 0xe1, 0x5d, 0x3e, 0xd9, 0xbb, 0x27, 0xb0, 0xf0, 0x48, 0xe5, 0xda,
+	0x41, 0xc3, 0x9c, 0x3c, 0x02, 0x37, 0x00, 0x02, 0x3c, 0xaf, 0x28, 0x14, 0x95, 0xd0, 0x0e, 0xfe,
+	0x33, 0x0b, 0xc8, 0xbd, 0xe4, 0x2d, 0xd2, 0x35, 0x89, 0xc5, 0x5f, 0xfd, 0xc5, 0x89, 0xbd, 0x0b,
+	0xf9, 0xe1, 0x77, 0xe1, 0x06, 0x00, 0xf3, 0xac, 0x73, 0x5d, 0x98, 0x12, 0x17, 0x2b, 0xb4, 0x83,
+	0x64, 0xb8, 0xb2, 0x4f, 0x89, 0xa1, 0xbb, 0x7f, 0x0b, 0xe2, 0x6f, 0xb0, 0x46, 0x0f, 0x00, 0x19,
+	0x2a, 0xe3, 0x6d, 0x5f, 0xbc, 0xed, 0x3d, 0x30, 0xd3, 0xc2, 0xab, 0x58, 0x6d, 0x74, 0xbb, 0xd5,
+	0xba, 0x4f, 0x83, 0x28, 0x8c, 0x4d, 0x6b, 0xdf, 0x56, 0xaa, 0xee, 0xf1, 0xf0, 0x2e, 0xfe, 0x57,
+	0x82, 0x79, 0xbf, 0xde, 0x5c, 0x1a, 0x69, 0x63, 0x54, 0x9b, 0x49, 0x68, 0xc3, 0x3f, 0x4a, 0xb0,
+	0xb8, 0x69, 0x9b, 0x3d, 0xdb, 0x1a, 0xb8, 0x3d, 0xe1, 0x3b, 0xf5, 0x89, 0x77, 0x88, 0x0c, 0x7a,
+	0xe4, 0x5b, 0x23, 0x7a, 0xe4, 0x38, 0xa8, 0x7f, 0x0a, 0xff, 0x23, 0xc1, 0x8c, 0xcf, 0xb6, 0x1b,
+	0x16, 0x74, 0x1d, 0x8a, 0x81, 0x2b, 0x7e, 0x2f, 0x71, 0xba, 0x31, 0x44, 0x61, 0x36, 0x3d, 0xef,
+	0x72, 0xc3, 0x79, 0xf7, 0x1a, 0x5c, 0x31, 0x89, 0xd9, 0x66, 0xf4, 0x84, 0xf8, 0x69, 0x39, 0x6d,
+	0x12, 0xb3, 0x45, 0x4f, 0x88, 0xfb, 0xcb, 0xea, 0x9b, 0x6d, 0xc7, 0x3e, 0x72, 0x99, 0x15, 0xbf,
+	0xac, 0xbe, 0xa9, 0xd8, 0x47, 0x0c, 0xbd, 0x01, 0x40, 0x2d, 0x9d, 0x3c, 0x6f, 0x5b, 0xaa, 0x49,
+	0x6a, 0x05, 0x71, 0xc3, 0x8b, 0x62, 0x67, 0x5b, 0x35, 0x09, 0xaa, 0xc1, 0xb4, 0x58, 0x34, 0x1b,
+	0x22, 0x0b, 0x73, 0xca, 0x60, 0x89, 0xf7, 0x01, 0x85, 0x3c, 0x9c, 0xe8, 0xc6, 0x87, 0xe2, 0x9e,
+	0x8d, 0xc7, 0xdd, 0xed, 0xcd, 0x67, 0x23, 0x40, 0x93, 0xc4, 0xf5, 0x43, 0x98, 0xa2, 0xd6, 0xbe,
+	0x3d, 0x68, 0x37, 0xde, 0x4c, 0x6a, 0x37, 0xc2, 0x60, 0x9e, 0xf4, 0xf2, 0x09, 0x94, 0xa3, 0x4d,
+	0x08, 0x2a, 0xc1, 0x95, 0x6d, 0x9b, 0xdf, 0x7b, 0x4e, 0x19, 0xaf, 0x66, 0x50, 0x19, 0x60, 0xdb,
+	0xe6, 0x3b, 0x0e, 0x61, 0xc4, 0xe2, 0x55, 0x09, 0x01, 0x14, 0x3e, 0xb7, 0x1a, 0x94, 0x3d, 0xad,
+	0x66, 0xd1, 0xac, 0xdf, 0x5b, 0xaa, 0x46, 0xd3, 0xba, 0x4f, 0x4c, 0xdb, 0x39, 0xae, 0xe6, 0xdc,
+	0xe3, 0xc1, 0x2a, 0x8f, 0xaa, 0x50, 0x0a, 0x44, 0xb6, 0x76, 0x1e, 0x56, 0xa7, 0x50, 0x11, 0xa6,
+	0xbc, 0xcf, 0xc2, 0xda, 0x0f, 0x00, 0x25, 0xf1, 0x6a, 0xb4, 0xbc, 0xc9, 0x14, 0x69, 0x50, 0x0a,
+	0x4f, 0x84, 0xe8, 0x76, 0x92, 0x13, 0x09, 0x53, 0xab, 0xbc, 0x94, 0x2e, 0xe8, 0x71, 0x8b, 0x33,
+	0xe8, 0x09, 0x54, 0xa2, 0x63, 0x18, 0x43, 0x77, 0x12, 0xc9, 0x4a, 0x1a, 0x05, 0xe5, 0xe5, 0x71,
+	0x44, 0x03, 0xac, 0x2e, 0x94, 0x23, 0xfd, 0x3e, 0x43, 0x4b, 0xa3, 0xce, 0xc7, 0x3b, 0x26, 0xf9,
+	0xce, 0x18, 0x92, 0x01, 0xd0, 0x17, 0x50, 0x8e, 0x34, 0x88, 0x23, 0x80, 0x92, 0x9a, 0x48, 0xf9,
+	0xac, 0xf4, 0xc2, 0x19, 0xd4, 0x86, 0x6b, 0xf1, 0xa6, 0x8e, 0xa1, 0x95, 0x64, 0xc2, 0x13, 0x7b,
+	0xbf, 0x34, 0x80, 0x3d, 0xcf, 0xf6, 0x53, 0x02, 0x93, 0xe3, 0x91, 0x38, 0xc5, 0xa6, 0xe9, 0xfe,
+	0x3a, 0x30, 0x3e, 0xa4, 0xfe, 0xbd, 0x33, 0x8c, 0x3f, 0x37, 0x42, 0x07, 0xd0, 0x70, 0x27, 0x89,
+	0xe4, 0xc4, 0x43, 0xf7, 0xcc, 0x1e, 0x3f, 0x96, 0xeb, 0x49, 0xf0, 0xa3, 0xbb, 0x51, 0x9c, 0x41,
+	0x8f, 0x00, 0x6d, 0x11, 0xbe, 0x4b, 0x4d, 0xb2, 0x4b, 0xb5, 0xa7, 0xe3, 0x60, 0xc4, 0x5e, 0x54,
+	0x7f, 0xd1, 0xe2, 0x0e, 0xb5, 0xba, 0x91, 0xb4, 0x99, 0xdb, 0x22, 0xa2, 0xc2, 0x53, 0xc6, 0xa9,
+	0xc6, 0x5e, 0xa2, 0x6a, 0x5b, 0xd8, 0x1c, 0x9f, 0x3d, 0x97, 0xc7, 0x99, 0x82, 0x7c, 0xe2, 0x57,
+	0xc6, 0x92, 0x0d, 0x00, 0xf7, 0x04, 0x60, 0xec, 0xd9, 0x3a, 0xd3, 0x93, 0x31, 0x9f, 0x3e, 0x9c,
+	0x41, 0x1a, 0x94, 0x5d, 0x9e, 0x42, 0xcf, 0xde, 0xad, 0xb4, 0xfa, 0xea, 0x3b, 0x71, 0x3b, 0x55,
+	0x6e, 0xe0, 0xc0, 0xda, 0x5f, 0x05, 0x28, 0x8a, 0x04, 0x10, 0xb5, 0xef, 0xc2, 0x62, 0xbe, 0x0b,
+	0x15, 0x3f, 0xe6, 0x2f, 0x33, 0xdc, 0xed, 0x73, 0xb3, 0x9f, 0x18, 0xde, 0x11, 0xad, 0x0e, 0xce,
+	0xa0, 0xc7, 0x50, 0x89, 0x4d, 0x43, 0xc9, 0x45, 0x68, 0xc4, 0xc8, 0x94, 0x76, 0x8d, 0x35, 0x40,
+	0xc3, 0x63, 0x0c, 0xaa, 0x27, 0x57, 0x8a, 0x51, 0xe3, 0x4e, 0x1a, 0xc8, 0x57, 0x50, 0x89, 0x8d,
+	0x13, 0xc9, 0x17, 0x22, 0x79, 0xe6, 0x48, 0xd3, 0xfe, 0x10, 0x4a, 0xa1, 0xf9, 0x81, 0x25, 0xa7,
+	0xe8, 0xf0, 0x84, 0x91, 0xa6, 0xf6, 0x4b, 0xa8, 0x44, 0x9b, 0xec, 0x11, 0xef, 0x65, 0x62, 0x27,
+	0x9e, 0x4e, 0xfb, 0xc5, 0x5f, 0xac, 0x8d, 0xf5, 0xbd, 0x4f, 0xbb, 0x94, 0x1f, 0xf4, 0x3b, 0x2e,
+	0xfc, 0xea, 0x09, 0x35, 0x0c, 0x7a, 0xc2, 0x89, 0x76, 0xb0, 0xea, 0x69, 0x78, 0x5f, 0xa7, 0x8c,
+	0x3b, 0xb4, 0xd3, 0xe7, 0x44, 0x5f, 0x1d, 0x14, 0x81, 0x55, 0xa1, 0x76, 0x55, 0xa8, 0xed, 0x75,
+	0x3a, 0x05, 0xb1, 0xbc, 0xfb, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe1, 0xdf, 0xec, 0xde, 0x9f,
+	0x17, 0x00, 0x00,
 }
 
 // Reference imports to suppress errors if they are not otherwise used.
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 9f4629c1b..ff9545780 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -445,20 +445,18 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
 	}
 
 	// segments are ordered before LoadSegments calling
-	for i, state := range in.SegmentStates {
-		if state.State == commonpb.SegmentState_SegmentGrowing {
-			position := state.StartPosition
-			err = node.loadService.seekSegment(position)
-			if err != nil {
-				status := &commonpb.Status{
-					ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
-					Reason:    err.Error(),
-				}
-				return status, err
+	if in.LastSegmentState.State == commonpb.SegmentState_SegmentGrowing {
+		segmentNum := len(segmentIDs)
+		position := in.LastSegmentState.StartPosition
+		err = node.loadService.seekSegment(position)
+		if err != nil {
+			status := &commonpb.Status{
+				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+				Reason:    err.Error(),
 			}
-			segmentIDs = segmentIDs[:i]
-			break
+			return status, err
 		}
+		segmentIDs = segmentIDs[:segmentNum-1]
 	}
 
 	err = node.loadService.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
diff --git a/internal/queryservice/meta_replica.go b/internal/queryservice/meta_replica.go
index c7c97085a..2d165ad46 100644
--- a/internal/queryservice/meta_replica.go
+++ b/internal/queryservice/meta_replica.go
@@ -13,9 +13,6 @@ type metaReplica interface {
 	loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error)
 	updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) error
 	getPartitionStates(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) ([]*querypb.PartitionStates, error)
-	releaseCollection(dbID UniqueID, collectionID UniqueID) error
-	releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error
-	addDmChannels(collectionID UniqueID, channels []string) error
 }
 
 type segment struct {
@@ -29,9 +26,9 @@ type partition struct {
 }
 
 type collection struct {
-	id             UniqueID
-	partitions     map[UniqueID]*partition
-	dmChannelNames []string
+	id           UniqueID
+	partitions   map[UniqueID]*partition
+	node2channel map[int][]string
 }
 
 type metaReplicaImpl struct {
@@ -51,18 +48,18 @@ func newMetaReplica() metaReplica {
 }
 
 func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) {
-	//TODO:: assert dbID = 0 exist
 	if _, ok := mp.db2collections[dbID]; ok {
 		partitions := make(map[UniqueID]*partition)
+		node2channel := make(map[int][]string)
 		newCollection := &collection{
-			id:         collectionID,
-			partitions: partitions,
+			id:           collectionID,
+			partitions:   partitions,
+			node2channel: node2channel,
 		}
 		mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection)
 		return newCollection, nil
 	}
-
-	return nil, errors.New("addCollection: can't find dbID when add collection")
+	return nil, errors.New("can't find dbID when add collection")
 }
 
 func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
@@ -81,7 +78,7 @@ func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, pa
 			}
 		}
 	}
-	return nil, errors.New("addPartition: can't find collection when add partition")
+	return nil, errors.New("can't find collection when add partition")
 }
 
 func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) {
@@ -89,7 +86,7 @@ func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error)
 		return collections, nil
 	}
 
-	return nil, errors.New("getCollections: can't find collectionID")
+	return nil, errors.New("can't find collectionID")
 }
 
 func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) {
@@ -105,7 +102,7 @@ func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) (
 		}
 	}
 
-	return nil, errors.New("getPartitions: can't find partitionIDs")
+	return nil, errors.New("can't find partitionIDs")
 }
 
 func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) {
@@ -122,7 +119,7 @@ func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, par
 			}
 		}
 	}
-	return nil, errors.New("getSegments: can't find segmentID")
+	return nil, errors.New("can't find segmentID")
 }
 
 func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) {
@@ -130,16 +127,14 @@ func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID)
 	if collections, err := mp.getCollections(dbID); err == nil {
 		for _, collection := range collections {
 			if collectionID == collection.id {
-				res = collection
+				return res, nil
 			}
 		}
-	}
-	if res == nil {
-		collection, err := mp.addCollection(dbID, collectionID)
+	} else {
+		res, err = mp.addCollection(dbID, collectionID)
 		if err != nil {
 			return nil, err
 		}
-		res = collection
 	}
 	return res, nil
 }
@@ -182,7 +177,7 @@ func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
 			}
 		}
 	}
-	return errors.New("updatePartitionState: update partition state fail")
+	return errors.New("update partition state fail")
 }
 
 func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
@@ -208,54 +203,3 @@ func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
 	}
 	return partitionStates, nil
 }
-
-func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueID) error {
-	if collections, ok := mp.db2collections[dbID]; ok {
-		for i, collection := range collections {
-			if collectionID == collection.id {
-				collections = append(collections[:i], collections[i+1:]...)
-				return nil
-			}
-		}
-	}
-	return errors.New("releaseCollection: can't find dbID or collectionID")
-}
-
-func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
-	if collections, ok := mp.db2collections[dbID]; ok {
-		for _, collection := range collections {
-			if collectionID == collection.id {
-				if _, ok := collection.partitions[partitionID]; ok {
-					delete(collection.partitions, partitionID)
-					return nil
-				}
-			}
-		}
-	}
-	return errors.New("releasePartition: can't find dbID or collectionID or partitionID")
-}
-
-func (mp *metaReplicaImpl) addDmChannels(collectionID UniqueID, channels []string) error {
-	//TODO :: use dbID
-	if collections, ok := mp.db2collections[0]; ok {
-		for _, collection := range collections {
-			if collectionID == collection.id {
-				dmChannels := collection.dmChannelNames
-				for _, channel := range channels {
-					match := false
-					for _, existedChannel := range dmChannels {
-						if channel == existedChannel {
-							match = true
-							break
-						}
-					}
-					if !match {
-						dmChannels = append(dmChannels, channel)
-					}
-				}
-				return nil
-			}
-		}
-	}
-	return errors.New("addDmChannels: can't find dbID or collectionID")
-}
diff --git a/internal/queryservice/querynode.go b/internal/queryservice/querynode.go
index 46b03fa41..bc43fa7c0 100644
--- a/internal/queryservice/querynode.go
+++ b/internal/queryservice/querynode.go
@@ -8,6 +8,8 @@ import (
 
 type queryNodeInfo struct {
 	client         QueryNodeInterface
+	insertChannels string
+	nodeID         uint64
 	segments       []UniqueID
 	dmChannelNames []string
 }
@@ -23,21 +25,3 @@ func (qn *queryNodeInfo) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb
 func (qn *queryNodeInfo) GetSegmentInfo(in *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
 	return qn.client.GetSegmentInfo(in)
 }
-
-func (qn *queryNodeInfo) WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
-	return qn.client.WatchDmChannels(in)
-}
-
-func (qn *queryNodeInfo) AddDmChannels(channels []string) {
-	qn.dmChannelNames = append(qn.dmChannelNames, channels...)
-}
-
-func newQueryNodeInfo(client QueryNodeInterface) *queryNodeInfo {
-	segments := make([]UniqueID, 0)
-	dmChannelNames := make([]string, 0)
-	return &queryNodeInfo{
-		client:         client,
-		segments:       segments,
-		dmChannelNames: dmChannelNames,
-	}
-}
diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go
index cb55a836d..01b60046b 100644
--- a/internal/queryservice/queryservice.go
+++ b/internal/queryservice/queryservice.go
@@ -3,6 +3,7 @@ package queryservice
 import (
 	"context"
 	"fmt"
+	"log"
 	"sort"
 	"strconv"
 	"sync/atomic"
@@ -24,7 +25,6 @@ type MasterServiceInterface interface {
 
 type DataServiceInterface interface {
 	GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
-	GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
 }
 
 type QueryNodeInterface interface {
@@ -47,7 +47,7 @@ type QueryService struct {
 
 	dataServiceClient   DataServiceInterface
 	masterServiceClient MasterServiceInterface
-	queryNodes          map[UniqueID]*queryNodeInfo
+	queryNodes          []*queryNodeInfo
 	numRegisterNode     uint64
 	numQueryChannel     uint64
 
@@ -87,7 +87,7 @@ func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, erro
 		componentStates, err := node.GetComponentStates()
 		if err != nil {
 			subComponentInfos = append(subComponentInfos, &internalpb2.ComponentInfo{
-				NodeID:    nodeID,
+				NodeID:    int64(nodeID),
 				StateCode: internalpb2.StateCode_ABNORMAL,
 			})
 			continue
@@ -111,21 +111,28 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) {
 	return Params.StatsChannelName, nil
 }
 
+// TODO:: do addWatchDmChannel to query node after registerNode
 func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
 	fmt.Println("register query node =", req.Address)
 	// TODO:: add mutex
-	allocatedID := len(qs.queryNodes)
+	allocatedID := uint64(len(qs.queryNodes))
 
 	registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
 	var node *queryNodeInfo
 	if qs.enableGrpc {
 		client := nodeclient.NewClient(registerNodeAddress)
-		node = newQueryNodeInfo(client)
+		node = &queryNodeInfo{
+			client: client,
+			nodeID: allocatedID,
+		}
 	} else {
-		client := querynode.NewQueryNode(qs.loopCtx, uint64(allocatedID))
-		node = newQueryNodeInfo(client)
+		client := querynode.NewQueryNode(qs.loopCtx, allocatedID)
+		node = &queryNodeInfo{
+			client: client,
+			nodeID: allocatedID,
+		}
 	}
-	qs.queryNodes[UniqueID(allocatedID)] = node
+	qs.queryNodes = append(qs.queryNodes, node)
 
 	//TODO::return init params to queryNode
 	return &querypb.RegisterNodeResponse{
@@ -179,20 +186,10 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
 	if err != nil {
 		return fn(err), err
 	}
-
-	if len(collection.dmChannelNames) != 0 {
+	if collection == nil {
 		return fn(nil), nil
 	}
 
-	channelRequest := datapb.InsertChannelRequest{
-		DbID:         req.DbID,
-		CollectionID: req.CollectionID,
-	}
-	dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
-	if err != nil {
-		return fn(err), err
-	}
-
 	// get partitionIDs
 	showPartitionRequest := &milvuspb.ShowPartitionRequest{
 		Base: &commonpb.MsgBase{
@@ -210,21 +207,6 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
 	}
 	partitionIDs := showPartitionResponse.PartitionIDs
 
-	if len(partitionIDs) == 0 {
-		loadSegmentRequest := &querypb.LoadSegmentRequest{
-			CollectionID: collectionID,
-		}
-		for _, node := range qs.queryNodes {
-			_, err := node.LoadSegments(loadSegmentRequest)
-			if err != nil {
-				return fn(err), err
-			}
-		}
-		nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
-		err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
-		return fn(err), err
-	}
-
 	loadPartitionsRequest := &querypb.LoadPartitionRequest{
 		Base:         req.Base,
 		DbID:         dbID,
@@ -262,14 +244,6 @@ func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest)
 
 	status, err := qs.ReleasePartitions(releasePartitionRequest)
 
-	err = qs.replica.releaseCollection(dbID, collectionID)
-	if err != nil {
-		return &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
-			Reason:    err.Error(),
-		}, err
-	}
-	//TODO:: queryNode cancel subscribe dmChannels
 	return status, err
 }
 
@@ -302,40 +276,16 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
 	dbID := req.DbID
 	collectionID := req.CollectionID
 	partitionIDs := req.PartitionIDs
-
+	qs.replica.loadPartition(dbID, collectionID, partitionIDs[0])
 	fn := func(err error) *commonpb.Status {
-		if err != nil {
-			return &commonpb.Status{
-				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
-				Reason:    err.Error(),
-			}
-		}
 		return &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_SUCCESS,
+			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+			Reason:    err.Error(),
 		}
 	}
 
-	if len(partitionIDs) == 0 {
-		err := errors.New("partitionIDs are empty")
-		return fn(err), err
-	}
-
-	var collection *collection = nil
-	var err error
-	if collection, err = qs.replica.loadCollection(dbID, collectionID); err != nil {
-		return fn(err), err
-	}
-
+	// get segments and load segment to query node
 	for _, partitionID := range partitionIDs {
-		partition, err := qs.replica.loadPartition(dbID, collectionID, partitionID)
-		if err != nil {
-			return fn(err), err
-		}
-
-		if partition == nil {
-			return fn(err), nil
-		}
-
 		showSegmentRequest := &milvuspb.ShowSegmentRequest{
 			Base: &commonpb.MsgBase{
 				MsgType: commonpb.MsgType_kShowSegment,
@@ -347,110 +297,92 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
 		if err != nil {
 			return fn(err), err
 		}
-
+		if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+			log.Fatal("showSegment fail, v%", showSegmentResponse.Status.Reason)
+		}
 		segmentIDs := showSegmentResponse.SegmentIDs
 		segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
-		channel2segs := make(map[string][]UniqueID)
+		channel2id := make(map[string]int)
+		//id2channels := make(map[int][]string)
+		id2segs := make(map[int][]UniqueID)
+		offset := 0
 
 		resp, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{
 			SegmentIDs: segmentIDs,
 		})
 
 		if err != nil {
-			return fn(err), err
+			log.Fatal("get segment states fail")
 		}
 
 		for _, state := range resp.States {
 			segmentID := state.SegmentID
 			segmentStates[segmentID] = state
-
-			channelName := state.StartPosition.ChannelName
-
-			if _, ok := channel2segs[channelName]; !ok {
-				segments := make([]UniqueID, 0)
-				segments = append(segments, segmentID)
-				channel2segs[channelName] = segments
+			var flatChannelName string
+			// channelNames := make([]string, 0)
+			// for i, str := range state.StartPositions {
+			//     flatChannelName += str.ChannelName
+			//     channelNames = append(channelNames, str.ChannelName)
+			//     if i+1 < len(state.StartPositions) {
+			//         flatChannelName += "/"
+			//     }
+			// }
+			if flatChannelName == "" {
+				log.Fatal("segmentState's channel name is empty")
+			}
+			if _, ok := channel2id[flatChannelName]; !ok {
+				channel2id[flatChannelName] = offset
+				//id2channels[offset] = channelNames
+				id2segs[offset] = make([]UniqueID, 0)
+				id2segs[offset] = append(id2segs[offset], segmentID)
+				offset++
 			} else {
-				channel2segs[channelName] = append(channel2segs[channelName], segmentID)
+				//TODO::check channel name
+				id := channel2id[flatChannelName]
+				id2segs[id] = append(id2segs[id], segmentID)
 			}
 		}
-
-		qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory)
-		for channel, segmentIDs := range channel2segs {
-			sort.Slice(segmentIDs, func(i, j int) bool {
-				return segmentStates[segmentIDs[i]].StartPosition.Timestamp < segmentStates[segmentIDs[j]].StartPosition.Timestamp
-			})
-			var channelLoadDone = false
-			for _, node := range qs.queryNodes {
-				channels2node := node.dmChannelNames
-				for _, ch := range channels2node {
-					if channel == ch {
-						channelLoadDone = true
+		for key, value := range id2segs {
+			sort.Slice(value, func(i, j int) bool { return segmentStates[value[i]].CreateTime < segmentStates[value[j]].CreateTime })
+			selectedSegs := make([]UniqueID, 0)
+			for i, v := range value {
+				if segmentStates[v].State == commonpb.SegmentState_SegmentFlushed {
+					selectedSegs = append(selectedSegs, v)
+				} else {
+					if i > 0 && segmentStates[selectedSegs[i-1]].State != commonpb.SegmentState_SegmentFlushed {
 						break
 					}
-				}
-				if channelLoadDone {
-					break
-				}
-			}
-			if !channelLoadDone {
-				states := make([]*datapb.SegmentStateInfo, 0)
-				for _, id := range segmentIDs {
-					states = append(states, segmentStates[id])
-				}
-				loadSegmentRequest := &querypb.LoadSegmentRequest{
-					CollectionID:  collectionID,
-					PartitionID:   partitionID,
-					SegmentIDs:    segmentIDs,
-					SegmentStates: states,
-				}
-				dmChannels := []string{channel}
-				nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
-				err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
-				if err != nil {
-					return fn(err), err
-				}
-				queryNode := qs.queryNodes[nodeIDs[0]]
-				//TODO:: seek when loadSegment may cause more msgs consumed
-				status, err := queryNode.LoadSegments(loadSegmentRequest)
-				if err != nil {
-					return status, err
+					selectedSegs = append(selectedSegs, v)
 				}
 			}
+			id2segs[key] = selectedSegs
 		}
 
-		qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
-	}
-
-	if len(collection.dmChannelNames) == 0 {
-		channelRequest := datapb.InsertChannelRequest{
-			DbID:         dbID,
-			CollectionID: collectionID,
-		}
+		qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory)
 
-		dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
-		if err != nil {
-			return fn(err), err
-		}
-		for _, partitionID := range partitionIDs {
-			loadSegmentRequest := &querypb.LoadSegmentRequest{
-				CollectionID: collectionID,
-				PartitionID:  partitionID,
-			}
-			for _, node := range qs.queryNodes {
-				_, err := node.LoadSegments(loadSegmentRequest)
-				if err != nil {
-					return fn(err), nil
+		// TODO:: filter channel for query node
+		for channels, i := range channel2id {
+			for key, node := range qs.queryNodes {
+				if channels == node.insertChannels {
+					statesID := id2segs[i][len(id2segs[i])-1]
+					//TODO :: should be start position
+					// position := segmentStates[statesID-1].StartPositions
+					// segmentStates[statesID].StartPositions = position
+					loadSegmentRequest := &querypb.LoadSegmentRequest{
+						CollectionID:     collectionID,
+						PartitionID:      partitionID,
+						SegmentIDs:       id2segs[i],
+						LastSegmentState: segmentStates[statesID],
+					}
+					status, err := qs.queryNodes[key].LoadSegments(loadSegmentRequest)
+					if err != nil {
+						return status, err
+					}
 				}
 			}
-			nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
-			err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
-		}
-		if err != nil {
-			return fn(err), err
 		}
+		qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
 	}
-
 	return &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,
 	}, nil
@@ -475,13 +407,6 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest)
 		}
 
 		segmentIDs = append(segmentIDs, res...)
-		err = qs.replica.releasePartition(dbID, collectionID, partitionID)
-		if err != nil {
-			return &commonpb.Status{
-				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
-				Reason:    err.Error(),
-			}, err
-		}
 	}
 	releaseSegmentRequest := &querypb.ReleaseSegmentRequest{
 		Base:         req.Base,
@@ -498,7 +423,6 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest)
 		}
 	}
 
-	//TODO:: queryNode cancel subscribe dmChannels
 	return &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,
 	}, nil
@@ -562,14 +486,14 @@ func (qs *QueryService) GetSegmentInfo(req *querypb.SegmentInfoRequest) (*queryp
 }
 
 func NewQueryService(ctx context.Context) (*QueryService, error) {
-	nodes := make(map[UniqueID]*queryNodeInfo)
+	nodes := make([]*queryNodeInfo, 0)
 	ctx1, cancel := context.WithCancel(ctx)
 	replica := newMetaReplica()
 	service := &QueryService{
 		loopCtx:         ctx1,
 		loopCancel:      cancel,
-		replica:         replica,
 		queryNodes:      nodes,
+		replica:         replica,
 		numRegisterNode: 0,
 		numQueryChannel: 0,
 		enableGrpc:      false,
@@ -590,77 +514,3 @@ func (qs *QueryService) SetDataService(dataService DataServiceInterface) {
 func (qs *QueryService) SetEnableGrpc(en bool) {
 	qs.enableGrpc = en
 }
-
-func (qs *QueryService) watchDmChannels(dmChannels []string, assignedNodeIDs []UniqueID, collection *collection) error {
-	err := qs.replica.addDmChannels(collection.id, dmChannels)
-	if err != nil {
-		return err
-	}
-	node2channels := make(map[UniqueID][]string)
-	for i, channel := range dmChannels {
-		nodeID := assignedNodeIDs[i]
-		findChannel := false
-		for _, ch := range collection.dmChannelNames {
-			if channel == ch {
-				findChannel = true
-			}
-		}
-		if !findChannel {
-			if _, ok := node2channels[nodeID]; ok {
-				node2channels[nodeID] = append(node2channels[nodeID], channel)
-			} else {
-				channels := make([]string, 0)
-				channels = append(channels, channel)
-				node2channels[nodeID] = channels
-			}
-		}
-	}
-
-	for nodeID, channels := range node2channels {
-		node := qs.queryNodes[nodeID]
-		request := &querypb.WatchDmChannelsRequest{
-			ChannelIDs: channels,
-		}
-		_, err := node.WatchDmChannels(request)
-		node.AddDmChannels(channels)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) []UniqueID {
-	maxNumDMChannel := 0
-	res := make([]UniqueID, 0)
-	node2lens := make(map[UniqueID]int)
-	for id, node := range qs.queryNodes {
-		node2lens[id] = len(node.dmChannelNames)
-	}
-	offset := 0
-	for {
-		lastOffset := offset
-		for id, len := range node2lens {
-			if len >= maxNumDMChannel {
-				maxNumDMChannel = len
-			} else {
-				res = append(res, id)
-				node2lens[id]++
-				offset++
-			}
-		}
-		if lastOffset == offset {
-			for id := range node2lens {
-				res = append(res, id)
-				node2lens[id]++
-				offset++
-				break
-			}
-		}
-		if offset == len(dmChannels) {
-			break
-		}
-	}
-	return res
-}
diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go
index a6e36bbee..e0f26c162 100644
--- a/internal/queryservice/queryservice_test.go
+++ b/internal/queryservice/queryservice_test.go
@@ -155,9 +155,6 @@ func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datap
 
 	return ret, nil
 }
-func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
-	return []string{"test-insert"}, nil
-}
 
 func TestQueryService_Init(t *testing.T) {
 	service, err := NewQueryService(context.Background())
diff --git a/internal/timesync/time_sync_producer.go b/internal/timesync/time_sync_producer.go
index 55f74dfac..1fc4288ff 100644
--- a/internal/timesync/time_sync_producer.go
+++ b/internal/timesync/time_sync_producer.go
@@ -63,19 +63,13 @@ func (producer *MsgProducer) broadcastMsg() {
 
 func (producer *MsgProducer) Start(ctx context.Context) {
 	producer.ctx, producer.cancel = context.WithCancel(ctx)
-	producer.wg.Add(2 + len(producer.watchers))
-	go producer.startTTBarrier()
+	producer.wg.Add(1 + len(producer.watchers))
 	for _, watcher := range producer.watchers {
 		go producer.startWatcher(watcher)
 	}
 	go producer.broadcastMsg()
 }
 
-func (producer *MsgProducer) startTTBarrier() {
-	defer producer.wg.Done()
-	producer.ttBarrier.StartBackgroundLoop(producer.ctx)
-}
-
 func (producer *MsgProducer) startWatcher(watcher TimeTickWatcher) {
 	defer producer.wg.Done()
 	watcher.StartBackgroundLoop(producer.ctx)
diff --git a/internal/timesync/timesync.go b/internal/timesync/timesync.go
index 4f74aca5a..9df5c1e93 100644
--- a/internal/timesync/timesync.go
+++ b/internal/timesync/timesync.go
@@ -18,7 +18,7 @@ type (
 
 	TimeTickBarrier interface {
 		GetTimeTick() (Timestamp, error)
-		StartBackgroundLoop(ctx context.Context)
+		StartBackgroundLoop()
 	}
 
 	softTimeTickBarrier struct {
@@ -38,7 +38,7 @@ type (
 	}
 )
 
-func NewSoftTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
+func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
 	if len(peerIds) <= 0 {
 		log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!\n")
 		return nil
@@ -49,6 +49,7 @@ func NewSoftTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID, minTtInte
 	sttbarrier.ttStream = ttStream
 	sttbarrier.outTt = make(chan Timestamp, 1024)
 	sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp)
+	sttbarrier.ctx = ctx
 	for _, id := range peerIds {
 		sttbarrier.peer2LastTt[id] = Timestamp(0)
 	}
@@ -79,12 +80,11 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) {
 	}
 }
 
-func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
-	ttBarrier.ctx = ctx
+func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop() {
 	for {
 		select {
-		case <-ctx.Done():
-			log.Printf("[TtBarrierStart] %s\n", ctx.Err())
+		case <-ttBarrier.ctx.Done():
+			log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
 			return
 		default:
 		}
@@ -137,14 +137,13 @@ func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) {
 	}
 }
 
-func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
-	ttBarrier.ctx = ctx
+func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop() {
 	// Last timestamp synchronized
 	state := Timestamp(0)
 	for {
 		select {
-		case <-ctx.Done():
-			log.Printf("[TtBarrierStart] %s\n", ctx.Err())
+		case <-ttBarrier.ctx.Done():
+			log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
 			return
 		default:
 		}
@@ -188,7 +187,7 @@ func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {
 	return tempMin
 }
 
-func NewHardTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
+func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
 	if len(peerIds) <= 0 {
 		log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!")
 		return nil
@@ -199,6 +198,7 @@ func NewHardTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID) *hardTime
 	sttbarrier.outTt = make(chan Timestamp, 1024)
 
 	sttbarrier.peer2Tt = make(map[UniqueID]Timestamp)
+	sttbarrier.ctx = ctx
 	for _, id := range peerIds {
 		sttbarrier.peer2Tt[id] = Timestamp(0)
 	}
-- 
GitLab