Skip to content
Snippets Groups Projects
Select Git revision
  • a03ab05c6c9b635813acb158b2df07301e58f3ed
  • master default protected
  • benchmark protected
  • v2.0.0-rc4
  • v2.0.0-rc2
  • v2.0.0-rc1
  • v1.1.1
  • v1.1.0
  • v1.0.0
  • v0.10.6
  • v0.10.5
  • v0.10.4
  • v0.10.3
  • v0.10.2
  • v0.10.1
  • v0.8.1
  • v0.10.0
  • v0.9.1
  • v0.9.0
  • v0.8.0
  • v0.7.1
  • v0.7.0
  • v0.6.0
23 results

stats_processor.go

Blame
  • sunby's avatar
    Bingyi Sun 【孙秉义】 authored and yefu.chen committed
    Signed-off-by: default avatarsunby <bingyi.sun@zilliz.com>
    a03ab05c
    History
    stats_processor.go 2.27 KiB
    package master
    
    import (
    	"github.com/zilliztech/milvus-distributed/internal/errors"
    	"github.com/zilliztech/milvus-distributed/internal/msgstream"
    	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
    )
    
    type StatsProcessor struct {
    	metaTable    *metaTable
    	runtimeStats *RuntimeStats
    
    	segmentThreshold       float64
    	segmentThresholdFactor float64
    	globalTSOAllocator     func() (Timestamp, error)
    }
    
    func (processor *StatsProcessor) ProcessQueryNodeStats(msgPack *msgstream.MsgPack) error {
    	for _, msg := range msgPack.Msgs {
    		statsMsg, ok := msg.(*msgstream.QueryNodeStatsMsg)
    		if !ok {
    			return errors.Errorf("Type of message is not QueryNodeSegStatsMsg")
    		}
    
    		for _, segStat := range statsMsg.GetSegStats() {
    			if err := processor.processSegmentStat(segStat); err != nil {
    				return err
    			}
    		}
    
    		for _, fieldStat := range statsMsg.GetFieldStats() {
    			if err := processor.processFieldStat(statsMsg.PeerID, fieldStat); err != nil {
    				return err
    			}
    		}
    
    	}
    
    	return nil
    }
    
    func (processor *StatsProcessor) processSegmentStat(segStats *internalpb.SegmentStats) error {
    	if !segStats.GetRecentlyModified() {
    		return nil
    	}
    
    	segID := segStats.GetSegmentID()
    	segMeta, err := processor.metaTable.GetSegmentByID(segID)
    	if err != nil {
    		return err
    	}
    
    	segMeta.NumRows = segStats.NumRows
    	segMeta.MemSize = segStats.MemorySize
    
    	return processor.metaTable.UpdateSegment(segMeta)
    }
    
    func (processor *StatsProcessor) processFieldStat(peerID int64, fieldStats *internalpb.FieldStats) error {
    	collID := fieldStats.CollectionID
    	fieldID := fieldStats.FieldID
    
    	for _, stat := range fieldStats.IndexStats {
    		fieldStats := &FieldIndexRuntimeStats{
    			peerID:               peerID,
    			indexParams:          stat.IndexParams,
    			numOfRelatedSegments: stat.NumRelatedSegments,
    		}
    
    		if err := processor.runtimeStats.UpdateFieldStat(collID, fieldID, fieldStats); err != nil {
    			return err
    		}
    	}
    	return nil
    }
    
    func NewStatsProcessor(mt *metaTable, runTimeStats *RuntimeStats, globalTSOAllocator func() (Timestamp, error)) *StatsProcessor {
    	return &StatsProcessor{
    		metaTable:              mt,
    		runtimeStats:           runTimeStats,
    		segmentThreshold:       Params.SegmentSize * 1024 * 1024,
    		segmentThresholdFactor: Params.SegmentSizeFactor,
    		globalTSOAllocator:     globalTSOAllocator,
    	}
    }