Skip to content
Snippets Groups Projects
Select Git revision
  • 6d7ec0b6978e13128809158c02715259684b4af3
  • master default
2 results

ParserTest.cpp

Blame
  • load_service.go 10.19 KiB
    package querynode
    
    import (
    	"context"
    	"errors"
    	"fmt"
    	"log"
    	"path"
    	"sort"
    	"strconv"
    	"strings"
    	"time"
    
    	minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
    	"github.com/zilliztech/milvus-distributed/internal/msgstream"
    	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
    	"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
    	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
    	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
    	"github.com/zilliztech/milvus-distributed/internal/storage"
    )
    
    // indexCheckInterval is the polling period, in seconds, that indexListener
    // waits between scans of the sealed segments.
    const indexCheckInterval = 1
    
    // loadService receives load-index requests, fetches the referenced index files
    // from MinIO, applies them to segments held by the replica, and publishes
    // per-field index statistics.
    type loadService struct {
    	// ctx is derived from the caller's context in newLoadService; cancel is
    	// invoked by close and stops the loops in consume, indexListener and start.
    	ctx    context.Context
    	cancel context.CancelFunc
    	// client is the MinIO key-value client that loadIndex uses to load index files.
    	client *minioKV.MinIOKV
    
    	// queryNodeID is copied from Params.QueryNodeID at construction time.
    	queryNodeID UniqueID
    	// replica is used to look up segments by ID and to list sealed segments.
    	replica     collectionReplica
    
    	// fieldIndexes maps the key built by fieldsStatsIDs2Key
    	// ("<collectionID>/<fieldID>") to the index statistics recorded for that field.
    	fieldIndexes   map[string][]*internalpb2.IndexStats
    	// fieldStatsChan receives the statistics snapshot assembled by
    	// sendQueryNodeStats; newLoadService creates it with a capacity of 1.
    	fieldStatsChan chan []*internalpb2.FieldStats
    
    	// loadIndexReqChan carries batches of load-index messages: consume writes to
    	// it, start reads from it, and it is also handed to the segment manager.
    	loadIndexReqChan   chan []msgstream.TsMsg
    	// loadIndexMsgStream is the message stream read by consume; newLoadService
    	// backs it with a Pulsar message stream.
    	loadIndexMsgStream msgstream.MsgStream
    
    	// segManager is queried by indexListener for index info and index paths and
    	// is asked to load indexes for sealed segments.
    	segManager *segmentManager
    }
    
    // consume pumps message packs from loadIndexMsgStream into loadIndexReqChan
    // until the service context is cancelled.
    //
    // Nil or empty packs are logged and skipped. The hand-off to loadIndexReqChan
    // also watches the context: once close() has cancelled it, start() stops
    // draining the channel, and a plain send could block this goroutine forever
    // on a full buffer.
    func (lis *loadService) consume() {
    	for {
    		// Non-blocking cancellation check before pulling the next pack.
    		select {
    		case <-lis.ctx.Done():
    			return
    		default:
    		}
    
    		messages := lis.loadIndexMsgStream.Consume()
    		if messages == nil || len(messages.Msgs) == 0 {
    			log.Println("null msg pack")
    			continue
    		}
    
    		select {
    		case <-lis.ctx.Done():
    			return
    		case lis.loadIndexReqChan <- messages.Msgs:
    		}
    	}
    }
    
    // indexListener periodically walks the sealed segments reported by the replica
    // and asks the segment manager to load the index of each one. It returns when
    // the service context is cancelled.
    //
    // For every (collectionID, segmentID) pair it resolves the index build ID,
    // then the index paths for that build, then triggers the load. A failure at
    // any step is logged and the segment is skipped until the next round.
    //
    // NOTE(review): collectionIDs and segmentIDs are indexed in lockstep, so
    // getSealedSegments is assumed to return slices of equal length.
    func (lis *loadService) indexListener() {
    	for {
    		select {
    		case <-lis.ctx.Done():
    			return
    		case <-time.After(indexCheckInterval * time.Second):
    			collectionIDs, segmentIDs := lis.replica.getSealedSegments()
    			for i := range collectionIDs {
    				// we don't need index id yet
    				_, buildID, err := lis.segManager.getIndexInfo(collectionIDs[i], segmentIDs[i])
    				if err != nil {
    					// Without index info there is no valid build ID to resolve
    					// paths from, so skip this segment for now.
    					log.Println(err)
    					continue
    				}
    				indexPaths, err := lis.segManager.getIndexPaths(buildID)
    				if err != nil {
    					log.Println(err)
    					continue
    				}
    				if err = lis.segManager.loadIndex(segmentIDs[i], indexPaths); err != nil {
    					log.Println(err)
    				}
    			}
    		}
    	}
    }
    
    // start launches the message stream together with the consume and
    // indexListener goroutines, then blocks processing request batches from
    // loadIndexReqChan until the service context is cancelled.
    //
    // Every message of a batch is passed to execute; afterwards the current field
    // statistics are published once via sendQueryNodeStats. Errors from either
    // step are only logged.
    func (lis *loadService) start() {
    	lis.loadIndexMsgStream.Start()
    	go lis.consume()
    	go lis.indexListener()
    
    	for {
    		select {
    		case batch := <-lis.loadIndexReqChan:
    			for _, request := range batch {
    				if execErr := lis.execute(request); execErr != nil {
    					log.Println(execErr)
    				}
    			}
    
    			// sendQueryNodeStats
    			if statsErr := lis.sendQueryNodeStats(); statsErr != nil {
    				log.Println(statsErr)
    			}
    		case <-lis.ctx.Done():
    			return
    		}
    	}
    }
    
    // execute handles a single load-index request.
    //
    // msg must be a *msgstream.LoadIndexMsg; any other type yields an error. The
    // index files named by the message are fetched (retried up to 5 times, 200ms
    // apart). If checkIndexReady reports that the segment already carries an index
    // with matching parameters, nothing more is done and nil is returned.
    // Otherwise the index is applied to the segment and the per-field index
    // statistics are updated.
    func (lis *loadService) execute(msg msgstream.TsMsg) error {
    	indexMsg, ok := msg.(*msgstream.LoadIndexMsg)
    	if !ok {
    		return errors.New("type assertion failed for LoadIndexMsg")
    	}
    	// 1. use msg's index paths to get index bytes
    	var indexBuffer [][]byte
    	var indexParams indexParam
    	fn := func() error {
    		var loadErr error
    		indexBuffer, indexParams, loadErr = lis.loadIndex(indexMsg.IndexPaths)
    		return loadErr
    	}
    	if err := util.Retry(5, time.Millisecond*200, fn); err != nil {
    		return err
    	}
    	ready, err := lis.checkIndexReady(indexParams, indexMsg)
    	if err != nil {
    		return err
    	}
    	if ready {
    		// The matching index is already in place. This is not a failure, so
    		// report success instead of an error with an empty message.
    		return nil
    	}
    	// 2. use index bytes and index path to update segment
    	if err = lis.updateSegmentIndex(indexParams, indexBuffer, indexMsg); err != nil {
    		return err
    	}
    	// 3. update segment index stats
    	if err = lis.updateSegmentIndexStats(indexParams, indexMsg); err != nil {
    		return err
    	}
    	fmt.Println("load index done")
    	return nil
    }
    
    // close shuts the service down: the load-index message stream is closed first
    // (when one is set), then the service context is cancelled.
    func (lis *loadService) close() {
    	if stream := lis.loadIndexMsgStream; stream != nil {
    		stream.Close()
    	}
    	lis.cancel()
    }
    
    // printIndexParams writes a separator line followed by each key/value pair of
    // index, one per line, to standard output.
    func (lis *loadService) printIndexParams(index []*commonpb.KeyValuePair) {
    	fmt.Println("=================================================")
    	for _, pair := range index {
    		fmt.Println(pair)
    	}
    }
    
    // indexParamsEqual reports whether index1 and index2 have the same length and
    // hold the same Key and Value at every position. The comparison is positional,
    // so both slices are expected to be in the same order.
    func (lis *loadService) indexParamsEqual(index1 []*commonpb.KeyValuePair, index2 []*commonpb.KeyValuePair) bool {
    	if len(index1) != len(index2) {
    		return false
    	}
    
    	for pos, left := range index1 {
    		right := index2[pos]
    		if left.Key != right.Key || left.Value != right.Value {
    			return false
    		}
    	}
    
    	return true
    }
    
    // fieldsStatsIDs2Key builds the fieldIndexes map key for a field: the decimal
    // collection ID and field ID joined by a slash.
    func (lis *loadService) fieldsStatsIDs2Key(collectionID UniqueID, fieldID UniqueID) string {
    	return fmt.Sprintf("%d/%d", collectionID, fieldID)
    }
    
    // fieldsStatsKey2IDs is the inverse of fieldsStatsIDs2Key: it splits key on
    // "/" and parses the two parts as base-10 64-bit integers, returning the
    // collection ID followed by the field ID. A key without exactly two parts, or
    // with a part that is not a valid integer, yields an error.
    func (lis *loadService) fieldsStatsKey2IDs(key string) (UniqueID, UniqueID, error) {
    	parts := strings.Split(key, "/")
    	if len(parts) != 2 {
    		return 0, 0, errors.New("illegal fieldsStatsKey")
    	}
    
    	var parsed [2]UniqueID
    	for pos, part := range parts {
    		id, err := strconv.ParseInt(part, 10, 64)
    		if err != nil {
    			return 0, 0, err
    		}
    		parsed[pos] = id
    	}
    	return parsed[0], parsed[1], nil
    }
    
    // updateSegmentIndexStats records in fieldIndexes that one more segment uses
    // the given index parameters for the message's field, then stores the
    // message's index parameters on the target segment.
    //
    // indexParams is converted to a key-sorted list of key/value pairs. Every
    // existing entry for the field whose parameters equal that list has its
    // NumRelatedSegments incremented; if none matches, a new entry with a count
    // of 1 is appended. An error is returned when the segment cannot be found or
    // when setIndexParam fails.
    func (lis *loadService) updateSegmentIndexStats(indexParams indexParam, indexMsg *msgstream.LoadIndexMsg) error {
    	targetSegment, err := lis.replica.getSegmentByID(indexMsg.SegmentID)
    	if err != nil {
    		return err
    	}
    
    	statsKey := lis.fieldsStatsIDs2Key(targetSegment.collectionID, indexMsg.FieldID)
    
    	// Map iteration order is random, so sort the pairs by key to get a stable
    	// order for the positional comparison in indexParamsEqual.
    	sortedParams := make([]*commonpb.KeyValuePair, 0, len(indexParams))
    	for name, value := range indexParams {
    		sortedParams = append(sortedParams, &commonpb.KeyValuePair{Key: name, Value: value})
    	}
    	sort.Slice(sortedParams, func(a, b int) bool { return sortedParams[a].Key < sortedParams[b].Key })
    
    	// Ranging over a missing map entry visits nothing, so a field seen for the
    	// first time falls through to the append below.
    	alreadyKnown := false
    	for _, stats := range lis.fieldIndexes[statsKey] {
    		if lis.indexParamsEqual(sortedParams, stats.IndexParams) {
    			stats.NumRelatedSegments++
    			alreadyKnown = true
    		}
    	}
    	if !alreadyKnown {
    		lis.fieldIndexes[statsKey] = append(lis.fieldIndexes[statsKey], &internalpb2.IndexStats{
    			IndexParams:        sortedParams,
    			NumRelatedSegments: 1,
    		})
    	}
    
    	return targetSegment.setIndexParam(indexMsg.FieldID, indexMsg.IndexParams)
    }
    
    // loadIndex loads every object named in indexPath through the MinIO client and
    // separates the results into raw index pieces and decoded index parameters.
    //
    // A path whose base name equals storage.IndexParamsFile is deserialized with
    // the index codec to obtain the parameters; every other path contributes its
    // bytes, in input order, to the returned slice. An error is returned when a
    // load or the deserialization fails, or when the resulting parameter set is
    // empty.
    func (lis *loadService) loadIndex(indexPath []string) ([][]byte, indexParam, error) {
    	index := make([][]byte, 0, len(indexPath))
    
    	var indexParams indexParam
    	for _, p := range indexPath {
    		// Log the path being loaded rather than the whole list on every iteration.
    		fmt.Println("load path = ", p)
    		indexPiece, err := lis.client.Load(p)
    		if err != nil {
    			return nil, nil, err
    		}
    		// get index params when detecting indexParamPrefix
    		if path.Base(p) != storage.IndexParamsFile {
    			index = append(index, []byte(indexPiece))
    			continue
    		}
    		indexCodec := storage.NewIndexCodec()
    		_, indexParams, err = indexCodec.Deserialize([]*storage.Blob{
    			{
    				Key:   storage.IndexParamsFile,
    				Value: []byte(indexPiece),
    			},
    		})
    		if err != nil {
    			return nil, nil, err
    		}
    	}
    
    	if len(indexParams) == 0 {
    		return nil, nil, errors.New("cannot find index param")
    	}
    	return index, indexParams, nil
    }
    
    // updateSegmentIndex applies a loaded index to the segment named by
    // loadIndexMsg.
    //
    // It builds a load-index-info object carrying the field name/ID, every entry
    // of indexParams, and the index bytes together with their paths, then hands
    // it to the segment. The info object is released when the function returns.
    // The first error encountered is returned unchanged.
    func (lis *loadService) updateSegmentIndex(indexParams indexParam, bytesIndex [][]byte, loadIndexMsg *msgstream.LoadIndexMsg) error {
    	segment, err := lis.replica.getSegmentByID(loadIndexMsg.SegmentID)
    	if err != nil {
    		return err
    	}
    
    	loadIndexInfo, err := newLoadIndexInfo()
    	if err != nil {
    		return err
    	}
    	// Register the cleanup only after construction succeeded: on failure the
    	// returned value is not meaningful and must not be passed to the deleter.
    	defer deleteLoadIndexInfo(loadIndexInfo)
    
    	if err = loadIndexInfo.appendFieldInfo(loadIndexMsg.FieldName, loadIndexMsg.FieldID); err != nil {
    		return err
    	}
    	for k, v := range indexParams {
    		if err = loadIndexInfo.appendIndexParam(k, v); err != nil {
    			return err
    		}
    	}
    	if err = loadIndexInfo.appendIndex(bytesIndex, loadIndexMsg.IndexPaths); err != nil {
    		return err
    	}
    	return segment.updateSegmentIndex(loadIndexInfo)
    }
    
    // sendQueryNodeStats converts fieldIndexes into one FieldStats entry per field
    // and sends the resulting list on fieldStatsChan. The send blocks until the
    // channel can accept the value. An error is returned, and nothing is sent,
    // if any map key cannot be parsed back into its collection and field IDs.
    func (lis *loadService) sendQueryNodeStats() error {
    	snapshot := make([]*internalpb2.FieldStats, 0)
    	for key, stats := range lis.fieldIndexes {
    		collectionID, fieldID, parseErr := lis.fieldsStatsKey2IDs(key)
    		if parseErr != nil {
    			return parseErr
    		}
    		snapshot = append(snapshot, &internalpb2.FieldStats{
    			CollectionID: collectionID,
    			FieldID:      fieldID,
    			IndexStats:   stats,
    		})
    	}
    
    	lis.fieldStatsChan <- snapshot
    	fmt.Println("sent field stats")
    	return nil
    }
    
    // checkIndexReady reports whether the segment named by loadIndexMsg already
    // matches indexParams for the message's field, as decided by the segment's
    // matchIndexParam. An error is returned only when the segment cannot be found.
    func (lis *loadService) checkIndexReady(indexParams indexParam, loadIndexMsg *msgstream.LoadIndexMsg) (bool, error) {
    	segment, lookupErr := lis.replica.getSegmentByID(loadIndexMsg.SegmentID)
    	if lookupErr != nil {
    		return false, lookupErr
    	}
    	return segment.matchIndexParam(loadIndexMsg.FieldID, indexParams), nil
    }
    
    // newLoadService wires up a loadService: a cancellable child context, a MinIO
    // client configured from Params, a Pulsar consumer stream for load-index
    // messages, the request channel, and the segment manager that shares that
    // channel.
    //
    // It panics if the MinIO client cannot be created. The request channel is
    // buffered with Params.LoadIndexReceiveBufSize slots. Note that the Pulsar
    // stream is created with the caller's ctx, while the MinIO client and the
    // segment manager receive the derived, cancellable context.
    func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService {
    	serviceCtx, cancel := context.WithCancel(ctx)
    
    	kvClient, err := minioKV.NewMinIOKV(serviceCtx, &minioKV.Option{
    		Address:           Params.MinioEndPoint,
    		AccessKeyID:       Params.MinioAccessKeyID,
    		SecretAccessKeyID: Params.MinioSecretAccessKey,
    		UseSSL:            Params.MinioUseSSLStr,
    		CreateBucket:      true,
    		BucketName:        Params.MinioBucketName,
    	})
    	if err != nil {
    		panic(err)
    	}
    
    	// init msgStream
    	recvBufSize := Params.LoadIndexReceiveBufSize
    	pulsarStream := pulsarms.NewPulsarMsgStream(ctx, recvBufSize)
    	pulsarStream.SetPulsarClient(Params.PulsarAddress)
    	pulsarStream.CreatePulsarConsumers(
    		Params.LoadIndexChannelNames,
    		Params.MsgChannelSubName,
    		util.NewUnmarshalDispatcher(),
    		Params.LoadIndexPulsarBufSize,
    	)
    
    	// the request channel is sized by the message receive buffer size and is
    	// shared with the segment manager
    	requests := make(chan []msgstream.TsMsg, recvBufSize)
    	manager := newSegmentManager(serviceCtx, masterClient, dataClient, indexClient, replica, dmStream, requests)
    
    	return &loadService{
    		ctx:                serviceCtx,
    		cancel:             cancel,
    		client:             kvClient,
    		queryNodeID:        Params.QueryNodeID,
    		replica:            replica,
    		fieldIndexes:       make(map[string][]*internalpb2.IndexStats),
    		fieldStatsChan:     make(chan []*internalpb2.FieldStats, 1),
    		loadIndexReqChan:   requests,
    		loadIndexMsgStream: pulsarStream,
    		segManager:         manager,
    	}
    }