diff --git a/build/docker/test/docker-compose.yml b/build/docker/test/docker-compose.yml
index 67a2be03b76e6b9445094393fdca8403e64ec238..caf095f411a32bda3fef3d01c85a85300a760343 100644
--- a/build/docker/test/docker-compose.yml
+++ b/build/docker/test/docker-compose.yml
@@ -12,7 +12,7 @@ services:
       - ../../..:/milvus-distributed:delegated
     working_dir: "/milvus-distributed/tests/python"
     command: >
-      /bin/bash -c "pytest --ip proxyservice"
+      /bin/bash -c "pytest --ip proxyservice -n 4"
     networks:
       - milvus
diff --git a/internal/indexnode/grpc_service.go b/internal/indexnode/grpc_service.go
index abd54d4eb07c3302a0a88ce948b532f8e9a022ab..496fedde9fe5063ea1b228d0f14ccfd7745869fe 100644
--- a/internal/indexnode/grpc_service.go
+++ b/internal/indexnode/grpc_service.go
@@ -60,7 +60,9 @@ func (b *Builder) GetIndexStates(ctx context.Context, request *indexpb.IndexStat
 	var indexStates []*indexpb.IndexInfo
 	for _, indexID := range request.IndexID {
 		indexState, err := b.metaTable.GetIndexStates(indexID)
-		log.Println("GetIndexStates error, err=", err)
+		if err != nil {
+			log.Println("GetIndexStates error, err=", err)
+		}
 		indexStates = append(indexStates, indexState)
 	}
 	ret := &indexpb.IndexStatesResponse{
diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go
index cd41fb9b0a939e743c2d90343417c8ab84597505..7614d0bfcc461cc67979ddf77c80a00b79530be6 100644
--- a/internal/indexnode/task.go
+++ b/internal/indexnode/task.go
@@ -173,12 +173,11 @@ func (it *IndexBuildTask) Execute() error {
 		indexParams[key] = value
 	}
 
-	fmt.Println("before NewCIndex ..........................")
 	it.index, err = NewCIndex(typeParams, indexParams)
 	if err != nil {
+		fmt.Println("NewCIndex err:", err.Error())
 		return err
 	}
-	fmt.Println("after NewCIndex ..........................")
 
 	getKeyByPathNaive := func(path string) string {
 		// splitElements := strings.Split(path, "/")
@@ -221,6 +220,7 @@ func (it *IndexBuildTask) Execute() error {
 	storageBlobs := getStorageBlobs(blobs)
 	var insertCodec storage.InsertCodec
 	partitionID, segmentID, insertData, err2 := insertCodec.Deserialize(storageBlobs)
+	//fmt.Println("IndexBuilder for segmentID,", segmentID)
 	if err2 != nil {
 		return err2
 	}
@@ -230,11 +230,11 @@ func (it *IndexBuildTask) Execute() error {
 	for _, value := range insertData.Data {
 		// TODO: BinaryVectorFieldData
-		fmt.Println("before build index ..................................")
 		floatVectorFieldData, fOk := value.(*storage.FloatVectorFieldData)
 		if fOk {
 			err = it.index.BuildFloatVecIndexWithoutIds(floatVectorFieldData.Data)
 			if err != nil {
+				fmt.Println("BuildFloatVecIndexWithoutIds, error:", err.Error())
 				return err
 			}
 		}
@@ -243,19 +243,19 @@ func (it *IndexBuildTask) Execute() error {
 		if bOk {
 			err = it.index.BuildBinaryVecIndexWithoutIds(binaryVectorFieldData.Data)
 			if err != nil {
+				fmt.Println("BuildBinaryVecIndexWithoutIds, err:", err.Error())
 				return err
 			}
 		}
 
-		fmt.Println("after build index ..................................")
 		if !fOk && !bOk {
 			return errors.New("we expect FloatVectorFieldData or BinaryVectorFieldData")
 		}
 
-		fmt.Println("before serialize .............................................")
 		indexBlobs, err := it.index.Serialize()
-		fmt.Println("after serialize .............................................")
 		if err != nil {
+			fmt.Println("serialize ... err:", err.Error())
+
 			return err
 		}
@@ -284,8 +284,8 @@ func (it *IndexBuildTask) Execute() error {
 			it.savePaths = append(it.savePaths, savePath)
 		}
 	}
-
-	return it.index.Delete()
+	it.index.Delete()
+	return nil
 }
 
 func (it *IndexBuildTask) PostExecute() error {
diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go
index fbef8b722d89bd43ac36547386243af6b51f452c..f2ccba0c95728acc6ab5a31f132d3c2244409405 100644
--- a/internal/indexnode/task_scheduler.go
+++ b/internal/indexnode/task_scheduler.go
@@ -116,7 +116,7 @@ func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
 
 func (queue *BaseTaskQueue) Enqueue(t task) error {
 	tID, _ := queue.sched.idAllocator.AllocOne()
-	log.Printf("[Builder] allocate reqID: %v", tID)
+	// log.Printf("[Builder] allocate reqID: %v", tID)
 	t.SetID(tID)
 	err := t.OnEnqueue()
 	if err != nil {
@@ -209,17 +209,17 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
 
 	defer func() {
 		t.Notify(err)
-		log.Printf("notify with error: %v", err)
+		// log.Printf("notify with error: %v", err)
 	}()
 	if err != nil {
 		return
 	}
 
 	q.AddActiveTask(t)
-	log.Printf("task add to active list ...")
+	// log.Printf("task add to active list ...")
 	defer func() {
 		q.PopActiveTask(t.ID())
-		log.Printf("pop from active list ...")
+		// log.Printf("pop from active list ...")
 	}()
 
 	err = t.Execute()
@@ -227,9 +227,9 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
 		log.Printf("execute definition task failed, error = %v", err)
 		return
 	}
-	log.Printf("task execution done ...")
+	// log.Printf("task execution done ...")
 	err = t.PostExecute()
-	log.Printf("post execute task done ...")
+	// log.Printf("post execute task done ...")
 }
 
 func (sched *TaskScheduler) indexBuildLoop() {
diff --git a/internal/master/flush_scheduler.go b/internal/master/flush_scheduler.go
index 7ed3d9a5689383f05d10a9ab598f9e4f293540c5..a595b5c51cea2d75a0fd9d0fe37850635ee748f7 100644
--- a/internal/master/flush_scheduler.go
+++ b/internal/master/flush_scheduler.go
@@ -50,11 +50,11 @@ func (scheduler *FlushScheduler) schedule(id interface{}) error {
 	}
 	// todo set corrent timestamp
 	err = scheduler.client.FlushSegment(segmentID, segmentMeta.CollectionID, segmentMeta.PartitionTag, ts)
-	log.Printf("flush segment %d", segmentID)
 	if err != nil {
+		log.Println("flushsegment: ", segmentID, " error :", err.Error())
 		return err
 	}
-
+	//log.Printf("flush segment %d", segmentID)
 	scheduler.segmentDescribeChan <- segmentID
 
 	return nil
@@ -78,6 +78,7 @@ func (scheduler *FlushScheduler) describe() error {
 				continue
 			}
 			if !description.IsClosed {
+				//log.Println("describe segment ", singleSegmentID, " IsClosed :False")
 				continue
 			}
 
diff --git a/internal/master/index_builder_scheduler.go b/internal/master/index_builder_scheduler.go
index 450821d46adc1249909c3e763d9b60c7384973e4..80cce68f46a9d022e5d9d3e6514c577a4aa013db 100644
--- a/internal/master/index_builder_scheduler.go
+++ b/internal/master/index_builder_scheduler.go
@@ -2,6 +2,7 @@ package master
 
 import (
 	"context"
+	"fmt"
 	"log"
 	"time"
 
@@ -60,6 +61,7 @@ func (scheduler *IndexBuildScheduler) schedule(info interface{}) error {
 		return err
 	}
 	indexParams, err := scheduler.metaTable.GetFieldIndexParams(segMeta.CollectionID, indexBuildInfo.fieldID)
+
 	if err != nil {
 		return err
 	}
@@ -73,8 +75,8 @@ func (scheduler *IndexBuildScheduler) schedule(info interface{}) error {
 	}
 
 	indexID, err := scheduler.client.BuildIndex(indexBuildInfo.binlogFilePath, typeParamsMap, indexParamsMap)
-	log.Printf("build index for segment %d field %d", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
 	if err != nil {
+		log.Printf("build index for segment %d field %d, failed:%s", indexBuildInfo.segmentID, indexBuildInfo.fieldID, err.Error())
 		return err
 	}
 
@@ -151,6 +153,7 @@ func (scheduler *IndexBuildScheduler) describe() error {
 				IndexFilePaths: filePaths,
 			})
 			if err != nil {
+				fmt.Println("indexbuilder scheduler updateFiledIndexMetaFailed", indexBuildInfo.segmentID)
 				return err
 			}
 
diff --git a/internal/master/index_task.go b/internal/master/index_task.go
index 14e9c1ff86047ec51fb79388fc95331d5ee3d683..660a6b173330c736577500be8f82ea76d995c926 100644
--- a/internal/master/index_task.go
+++ b/internal/master/index_task.go
@@ -195,6 +195,7 @@ func (task *getIndexStateTask) Execute() error {
 			ErrorCode: commonpb.ErrorCode_SUCCESS,
 		},
 	}
+
 	if int64(totalSegmentNums) == relatedSegments {
 		task.resp.State = commonpb.IndexState_FINISHED
 	} else {
diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go
index 609fefaa2eaf4ec4376a23f390103366768021f3..37dc99332cad3539547cab3f7b8001c8449cb2a2 100644
--- a/internal/proxynode/impl.go
+++ b/internal/proxynode/impl.go
@@ -499,7 +499,7 @@ func (node *NodeImpl) DescribeIndex(ctx context.Context, request *milvuspb.Descr
 }
 
 func (node *NodeImpl) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
-	log.Println("Describe index progress for: ", request)
+	// log.Println("Describe index progress for: ", request)
 	dipt := &GetIndexStateTask{
 		Condition: NewTaskCondition(ctx),
 		IndexStateRequest: request,
diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go
index b9334f453bace04a747fd82490ab66524d56f3ff..ac52869fcc19eb128c1a63db00c47ff60d46df58 100644
--- a/internal/proxynode/proxy_node.go
+++ b/internal/proxynode/proxy_node.go
@@ -121,7 +121,7 @@ func (node *NodeImpl) Init() error {
 	node.manipulationMsgStream.SetPulsarClient(pulsarAddress)
 	node.manipulationMsgStream.CreatePulsarProducers(Params.InsertChannelNames())
 	repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
-		return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, false)
+		return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true)
 	}
 	node.manipulationMsgStream.SetRepackFunc(repackFuncImpl)
diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go
index 5c3a86a255f69a4c661b2dc1ea1edcda6bc0ee74..2d91b0dadffe30d0c490652bf82bcfb0e00a1f42 100644
--- a/internal/proxynode/task.go
+++ b/internal/proxynode/task.go
@@ -3,7 +3,6 @@ package proxynode
 import (
 	"context"
 	"errors"
-	"fmt"
 	"log"
 	"math"
 	"strconv"
@@ -491,11 +490,11 @@ func (st *SearchTask) PostExecute() error {
 	for {
 		select {
 		case <-st.ctx.Done():
-			log.Print("wait to finish failed, timeout!")
+			log.Print("SearchTask: wait to finish failed, timeout!, taskID:", st.ID())
 			span.LogFields(oplog.String("wait to finish failed, timeout", "wait to finish failed, timeout"))
-			return errors.New("wait to finish failed, timeout")
+			return errors.New("SearchTask:wait to finish failed, timeout:" + strconv.FormatInt(st.ID(), 10))
 		case searchResults := <-st.resultBuf:
-			fmt.Println("searchResults: ", searchResults)
+			// fmt.Println("searchResults: ", searchResults)
 			span.LogFields(oplog.String("receive result", "receive result"))
 			filterSearchResult := make([]*internalpb2.SearchResults, 0)
 			var filterReason string
diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go
index 8f551ecbea3a198de64335990dce57eaa9468267..4de0d5986aa1392dab03a927e86d535333dc08f4 100644
--- a/internal/proxynode/task_scheduler.go
+++ b/internal/proxynode/task_scheduler.go
@@ -4,7 +4,9 @@ import (
 	"container/list"
 	"context"
 	"errors"
+	"fmt"
 	"log"
+	"strconv"
 	"sync"
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -170,11 +172,11 @@ func (queue *BaseTaskQueue) Enqueue(t task) error {
 	}
 
 	ts, _ := queue.sched.tsoAllocator.AllocOne()
-	log.Printf("[NodeImpl] allocate timestamp: %v", ts)
+	// log.Printf("[NodeImpl] allocate timestamp: %v", ts)
 	t.SetTs(ts)
 
 	reqID, _ := queue.sched.idAllocator.AllocOne()
-	log.Printf("[NodeImpl] allocate reqID: %v", reqID)
+	// log.Printf("[NodeImpl] allocate reqID: %v", reqID)
 	t.SetID(reqID)
 
 	return queue.addUnissuedTask(t)
@@ -296,17 +298,17 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
 
 	defer func() {
 		t.Notify(err)
-		log.Printf("notify with error: %v", err)
+		// log.Printf("notify with error: %v", err)
 	}()
 	if err != nil {
 		return
 	}
 
 	q.AddActiveTask(t)
-	log.Printf("task add to active list ...")
+	// log.Printf("task add to active list ...")
 	defer func() {
 		q.PopActiveTask(t.EndTs())
-		log.Printf("pop from active list ...")
+		// log.Printf("pop from active list ...")
 	}()
 
 	err = t.Execute()
@@ -314,9 +316,9 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
 		log.Printf("execute definition task failed, error = %v", err)
 		return
 	}
-	log.Printf("task execution done ...")
+	// log.Printf("task execution done ...")
 	err = t.PostExecute()
-	log.Printf("post execute task done ...")
+	// log.Printf("post execute task done ...")
 }
 
 func (sched *TaskScheduler) definitionLoop() {
@@ -357,7 +359,7 @@ func (sched *TaskScheduler) queryLoop() {
 		case <-sched.ctx.Done():
 			return
 		case <-sched.DqQueue.utChan():
-			log.Print("scheduler receive query request ...")
+			// log.Print("scheduler receive query request ...")
 			if !sched.DqQueue.UTEmpty() {
 				t := sched.scheduleDqTask()
 				go sched.processTask(t, sched.DqQueue)
@@ -398,11 +400,25 @@ func (sched *TaskScheduler) queryResultLoop() {
 			for _, tsMsg := range msgPack.Msgs {
 				searchResultMsg, _ := tsMsg.(*msgstream.SearchResultMsg)
 				reqID := searchResultMsg.Base.MsgID
+				reqIDStr := strconv.FormatInt(reqID, 10)
+				t := sched.getTaskByReqID(reqID)
+				if t == nil {
+					log.Println(fmt.Sprint("QueryResult:czs:GetTaskByReqID failed, reqID:", reqIDStr))
+					delete(queryResultBuf, reqID)
+					continue
+				}
+
 				_, ok = queryResultBuf[reqID]
 				if !ok {
 					queryResultBuf[reqID] = make([]*internalpb2.SearchResults, 0)
 				}
 				queryResultBuf[reqID] = append(queryResultBuf[reqID], &searchResultMsg.SearchResults)
+
+				//t := sched.getTaskByReqID(reqID)
+				{
+					colName := t.(*SearchTask).query.CollectionName
+					fmt.Println("ljq getCollection: ", colName, " reqID: ", reqIDStr, " answer cnt:", len(queryResultBuf[reqID]))
+				}
 				if len(queryResultBuf[reqID]) == queryNodeNum {
 					t := sched.getTaskByReqID(reqID)
 					if t != nil {
@@ -413,7 +429,8 @@ func (sched *TaskScheduler) queryResultLoop() {
 						delete(queryResultBuf, reqID)
 					}
 				} else {
-					log.Printf("task with reqID %v is nil", reqID)
+
+					// log.Printf("task with reqID %v is nil", reqID)
 				}
 			}
 		}
diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go
index 98b80665c184563e7782989e5be4ec676d4349e3..040a858aabbfec4a7d19fea5c113d8c93f779b49 100644
--- a/internal/querynode/search_service.go
+++ b/internal/querynode/search_service.go
@@ -4,7 +4,6 @@ import "C"
 import (
 	"context"
 	"errors"
-	"fmt"
 	"log"
 	"regexp"
 	"strconv"
@@ -240,9 +239,7 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
 		return errors.New("unmarshal query failed")
 	}
 	collectionName := query.CollectionName
-	fmt.Println("[ljq collection name]: ", collectionName)
 	partitionTagsInQuery := query.PartitionNames
-	fmt.Println("[search service ljq] query: ", query)
 	collection, err := ss.replica.getCollectionByName(collectionName)
 	if err != nil {
 		span.LogFields(oplog.Error(err))
@@ -267,7 +264,7 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
 	searchResults := make([]*SearchResult, 0)
 	matchedSegments := make([]*Segment, 0)
 
-	fmt.Println("search msg's partitionTag = ", partitionTagsInQuery)
+	//fmt.Println("search msg's partitionTag = ", partitionTagsInQuery)
 
 	var partitionTagsInCol []string
 	for _, partition := range collection.partitions {
@@ -411,7 +408,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
 	// fmt.Println(testHits.IDs)
 	// fmt.Println(testHits.Scores)
 	//}
-
 	err = ss.publishSearchResult(searchResultMsg)
 	if err != nil {
 		span.LogFields(oplog.Error(err))
@@ -430,7 +426,6 @@ func (ss *searchService) publishSearchResult(msg msgstream.TsMsg) error {
 	// span, ctx := opentracing.StartSpanFromContext(msg.GetMsgContext(), "publish search result")
 	// defer span.Finish()
 	// msg.SetMsgContext(ctx)
-	fmt.Println("Public SearchResult", msg.HashKeys())
 	msgPack := msgstream.MsgPack{}
 	msgPack.Msgs = append(msgPack.Msgs, msg)
 	err := ss.searchResultMsgStream.Produce(&msgPack)
diff --git a/internal/writenode/client/client.go b/internal/writenode/client/client.go
index 31591ce89c33da0fc5060dd8e1462725d39c7b21..4bb69b8feab91395907308e52dcc52b90979c122 100644
--- a/internal/writenode/client/client.go
+++ b/internal/writenode/client/client.go
@@ -95,7 +95,6 @@ func (c *Client) DescribeSegment(segmentID UniqueID) (*SegmentDescription, error
 	if err != nil {
 		return nil, err
 	}
-
 	if count <= 0 {
 		ret.IsClosed = false
 		return ret, nil
@@ -103,10 +102,11 @@ func (c *Client) DescribeSegment(segmentID UniqueID) (*SegmentDescription, error
 
 	value, err := c.kvClient.Load(key)
 	if err != nil {
-		return ret, err
+		return nil, err
 	}
 
 	flushMeta := pb.SegmentFlushMeta{}
+
 	err = proto.UnmarshalText(value, &flushMeta)
 	if err != nil {
 		return ret, err
diff --git a/internal/writenode/flush_sync_service.go b/internal/writenode/flush_sync_service.go
index 77e17bda86b2e7db6e4bea3add0d73e34e4f35e2..95c60c58ff4b1c5c04c83ae67b1707575d082791 100644
--- a/internal/writenode/flush_sync_service.go
+++ b/internal/writenode/flush_sync_service.go
@@ -63,6 +63,22 @@ func (fService *flushSyncService) completeInsertFlush(segID UniqueID) {
 	fService.insertFlushed[segID] = true
 }
 
+func (fService *flushSyncService) InsertFlushCompleted(segID UniqueID) bool {
+	isinsertFlushed, ok := fService.insertFlushed[segID]
+	if !ok {
+		return false
+	}
+	return isinsertFlushed
+}
+
+func (fService *flushSyncService) DDFlushCompleted(segID UniqueID) bool {
+	isddFlushed, ok := fService.ddFlushed[segID]
+	if !ok {
+		return false
+	}
+	return isddFlushed
+}
+
 func (fService *flushSyncService) FlushCompleted(segID UniqueID) bool {
 	isddFlushed, ok := fService.ddFlushed[segID]
 	if !ok {
@@ -95,12 +111,18 @@ func (fService *flushSyncService) start() {
 				continue
 			}
 			fService.completeDDFlush(ddFlushMsg.segID)
+			if fService.FlushCompleted(ddFlushMsg.segID) {
+				//log.Printf("DD:Seg(%d) flush completed.", ddFlushMsg.segID)
+				fService.metaTable.CompleteFlush(Timestamp(0), ddFlushMsg.segID)
+			}
 
 		case insertFlushMsg := <-fService.insertChan:
 			if insertFlushMsg == nil {
 				continue
 			}
+			//log.Println("FlushSyncService insertFlushMsg ", insertFlushMsg.segID)
 			if !insertFlushMsg.flushCompleted {
+				//log.Println("FlushSyncService", insertFlushMsg.segID, " not flushCompleted")
 				err := fService.metaTable.AppendSegBinlogPaths(insertFlushMsg.ts, insertFlushMsg.segID,
 					insertFlushMsg.fieldID, insertFlushMsg.paths)
 				if err != nil {
@@ -109,6 +131,7 @@ func (fService *flushSyncService) start() {
 				}
 				continue
 			}
+
 			fService.completeInsertFlush(insertFlushMsg.segID)
 
 			if fService.FlushCompleted(insertFlushMsg.segID) {
diff --git a/internal/writenode/meta_table.go b/internal/writenode/meta_table.go
index ea7828874f935da9e41880c46b87d55595f222b5..3b5d6bd0173731098b8e5b000d449850f507fec9 100644
--- a/internal/writenode/meta_table.go
+++ b/internal/writenode/meta_table.go
@@ -131,7 +131,6 @@ func (mt *metaTable) saveSegFlushMeta(meta *pb.SegmentFlushMeta) error {
 	value := proto.MarshalTextString(meta)
 
 	mt.segID2FlushMeta[meta.SegmentID] = *meta
-
 	return mt.client.Save(Params.WriteNodeSegKvSubPath+strconv.FormatInt(meta.SegmentID, 10), value)
 }
 
diff --git a/tests/python/requirements.txt b/tests/python/requirements.txt
index f41ff379e7b82ced201787d815d4cc608eadf304..a4136e23e77e33a092b21452c4cfad575d97d6b7 100644
--- a/tests/python/requirements.txt
+++ b/tests/python/requirements.txt
@@ -1,8 +1,13 @@
 grpcio==1.26.0
 grpcio-tools==1.26.0
 numpy==1.18.1
-pytest==5.3.4
 pytest-cov==2.8.1
-pytest-timeout==1.3.4
 pymilvus-distributed==0.0.17
 sklearn==0.0
+pytest==4.5.0
+pytest-timeout==1.3.3
+pytest-repeat==0.8.0
+allure-pytest==2.7.0
+pytest-print==0.1.2
+pytest-level==0.1.1
+pytest-xdist==1.23.2
diff --git a/tests/python/test_list_collections.py b/tests/python/test_list_collections.py
index 1572790e143f7da3993369e74d3994f1f5e80120..2c363595c9e4b4a16286da13ead8d3e9cc6d1704 100644
--- a/tests/python/test_list_collections.py
+++ b/tests/python/test_list_collections.py
@@ -50,7 +50,9 @@ class TestListCollections:
         collection_name = gen_unique_str(uid)
         assert collection_name not in connect.list_collections()
 
+    @pytest.mark.level(2)
+    @pytest.mark.skip("can't run in parallel")
     def test_list_collections_no_collection(self, connect):
         '''
         target: test show collections is correct or not, if no collection in db