diff --git a/internal/querynode/search_collection.go b/internal/querynode/search_collection.go index baf46f70c03b84d9f42bb6af56107c777dee3271..5d7044a691b613286365fa5bbee044b1b5340031 100644 --- a/internal/querynode/search_collection.go +++ b/internal/querynode/search_collection.go @@ -132,23 +132,34 @@ func (s *searchCollection) receiveSearchMsg() { for { select { case <-s.releaseCtx.Done(): - log.Debug("stop receiveSearchMsg", zap.Int64("collectionID", s.collectionID)) + log.Debug("stop searchCollection's receiveSearchMsg", zap.Int64("collectionID", s.collectionID)) return case sm := <-s.msgBuffer: + log.Debug("get search message from msgBuffer", + zap.Int64("msgID", sm.ID()), + zap.Int64("collectionID", sm.CollectionID)) serviceTime := s.getServiceableTime() if sm.BeginTs() > serviceTime { s.addToUnsolvedMsg(sm) continue } + log.Debug("doing search in receiveSearchMsg...", + zap.Int64("msgID", sm.ID()), + zap.Int64("collectionID", sm.CollectionID)) err := s.search(sm) if err != nil { log.Error(err.Error()) + log.Debug("do search failed in receiveSearchMsg, prepare to publish failed search result", + zap.Int64("msgID", sm.ID()), + zap.Int64("collectionID", sm.CollectionID)) err2 := s.publishFailedSearchResult(sm, err.Error()) if err2 != nil { log.Error("publish FailedSearchResult failed", zap.Error(err2)) } } - log.Debug("ReceiveSearchMsg, do search done, num of searchMsg = 1") + log.Debug("do search done in receiveSearchMsg", + zap.Int64("msgID", sm.ID()), + zap.Int64("collectionID", sm.CollectionID)) } } } @@ -157,7 +168,7 @@ func (s *searchCollection) doUnsolvedMsgSearch() { for { select { case <-s.releaseCtx.Done(): - log.Debug("stop doUnsolvedMsgSearch", zap.Int64("collectionID", s.collectionID)) + log.Debug("stop searchCollection's doUnsolvedMsgSearch", zap.Int64("collectionID", s.collectionID)) return default: serviceTime, err := s.waitNewTSafe() @@ -166,11 +177,17 @@ func (s *searchCollection) doUnsolvedMsgSearch() { // TODO: emptySearch or continue, note: collection has been released continue } + log.Debug("get tSafe from flow graph", + zap.Int64("collectionID", s.collectionID), + zap.Uint64("tSafe", serviceTime)) searchMsg := make([]*msgstream.SearchMsg, 0) tempMsg := s.popAllUnsolvedMsg() for _, sm := range tempMsg { + log.Debug("get search message from unsolvedMsg", + zap.Int64("msgID", sm.ID()), + zap.Int64("collectionID", sm.CollectionID)) if sm.EndTs() <= serviceTime { searchMsg = append(searchMsg, sm) continue @@ -184,15 +201,24 @@ func (s *searchCollection) doUnsolvedMsgSearch() { for _, sm := range searchMsg { sp, ctx := trace.StartSpanFromContext(sm.TraceCtx()) sm.SetTraceCtx(ctx) + log.Debug("doing search in doUnsolvedMsgSearch...", + zap.Int64("msgID", sm.ID()), + zap.Int64("collectionID", sm.CollectionID)) err := s.search(sm) if err != nil { log.Error(err.Error()) + log.Debug("do search failed in doUnsolvedMsgSearch, prepare to publish failed search result", + zap.Int64("msgID", sm.ID()), + zap.Int64("collectionID", sm.CollectionID)) err2 := s.publishFailedSearchResult(sm, err.Error()) if err2 != nil { log.Error("publish FailedSearchResult failed", zap.Error(err2)) } } sp.Finish() + log.Debug("do search done in doUnsolvedMsgSearch", + zap.Int64("msgID", sm.ID()), + zap.Int64("collectionID", sm.CollectionID)) } log.Debug("doUnsolvedMsgSearch, do search done", zap.Int("num of searchMsg", len(searchMsg))) } @@ -304,7 +330,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error { MetricType: plan.getMetricType(), }, } - err = s.publishSearchResult(searchResultMsg) + err = s.publishSearchResult(searchResultMsg, searchMsg.CollectionID) if err != nil { return err } @@ -378,7 +404,7 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error { // fmt.Println(testHits.IDs) // fmt.Println(testHits.Scores) //} - err = s.publishSearchResult(searchResultMsg) + err = s.publishSearchResult(searchResultMsg, searchMsg.CollectionID) if err != nil { return err } @@ -391,13 +417,23 @@ func (s *searchCollection) search(searchMsg *msgstream.SearchMsg) error { return nil } -func (s *searchCollection) publishSearchResult(msg msgstream.TsMsg) error { +func (s *searchCollection) publishSearchResult(msg msgstream.TsMsg, collectionID UniqueID) error { + log.Debug("publishing search result...", + zap.Int64("msgID", msg.ID()), + zap.Int64("collectionID", collectionID)) span, ctx := trace.StartSpanFromContext(msg.TraceCtx()) defer span.Finish() msg.SetTraceCtx(ctx) msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, msg) err := s.searchResultMsgStream.Produce(&msgPack) + if err != nil { + log.Error(err.Error()) + } else { + log.Debug("publish search result done", + zap.Int64("msgID", msg.ID()), + zap.Int64("collectionID", collectionID)) + } return err } diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index be0375704b70852de3cf70046d31017b09cef4dd..ba95ff3e750cab44bed6dc8392443ef49af77b2e 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -82,8 +82,8 @@ func (s *searchService) consumeSearch() { if msgPack == nil || len(msgPack.Msgs) <= 0 { continue } - emptySearchNum := 0 for _, msg := range msgPack.Msgs { + log.Debug("consume search message", zap.Int64("msgID", msg.ID())) sm, ok := msg.(*msgstream.SearchMsg) if !ok { continue @@ -93,17 +93,20 @@ func (s *searchService) consumeSearch() { err := s.collectionCheck(sm.CollectionID) if err != nil { s.emptySearchCollection.emptySearch(sm) - emptySearchNum++ + log.Debug("cannot found collection, do empty search done", + zap.Int64("msgID", sm.ID()), + zap.Int64("collectionID", sm.CollectionID)) continue } sc, ok := s.searchCollections[sm.CollectionID] if !ok { s.startSearchCollection(sm.CollectionID) + log.Debug("new search collection, start search collection service", + zap.Int64("collectionID", sm.CollectionID)) } sc.msgBuffer <- sm sp.Finish() } - log.Debug("do empty search done", zap.Int("num of searchMsg", emptySearchNum)) } } }