Skip to content
Snippets Groups Projects
Select Git revision
  • b8b1a2e5eca6bbf20e3a29c5f9db8331ec52af2d
  • openEuler-1.0-LTS default protected
  • openEuler-22.09
  • OLK-5.10
  • openEuler-22.03-LTS
  • openEuler-22.03-LTS-Ascend
  • master
  • openEuler-22.03-LTS-LoongArch-NW
  • openEuler-22.09-HCK
  • openEuler-20.03-LTS-SP3
  • openEuler-21.09
  • openEuler-21.03
  • openEuler-20.09
  • 4.19.90-2210.5.0
  • 5.10.0-123.0.0
  • 5.10.0-60.63.0
  • 5.10.0-60.62.0
  • 4.19.90-2210.4.0
  • 5.10.0-121.0.0
  • 5.10.0-60.61.0
  • 4.19.90-2210.3.0
  • 5.10.0-60.60.0
  • 5.10.0-120.0.0
  • 5.10.0-60.59.0
  • 5.10.0-119.0.0
  • 4.19.90-2210.2.0
  • 4.19.90-2210.1.0
  • 5.10.0-118.0.0
  • 5.10.0-106.19.0
  • 5.10.0-60.58.0
  • 4.19.90-2209.6.0
  • 5.10.0-106.18.0
  • 5.10.0-106.17.0
33 results

cgroup-defs.h

Blame
  • stats_processor.go 2.27 KiB
    package master
    
    import (
    	"github.com/zilliztech/milvus-distributed/internal/errors"
    	"github.com/zilliztech/milvus-distributed/internal/msgstream"
    	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
    )
    
    type StatsProcessor struct {
    	metaTable    *metaTable
    	runtimeStats *RuntimeStats
    
    	segmentThreshold       float64
    	segmentThresholdFactor float64
    	globalTSOAllocator     func() (Timestamp, error)
    }
    
    func (processor *StatsProcessor) ProcessQueryNodeStats(msgPack *msgstream.MsgPack) error {
    	for _, msg := range msgPack.Msgs {
    		statsMsg, ok := msg.(*msgstream.QueryNodeStatsMsg)
    		if !ok {
    			return errors.Errorf("Type of message is not QueryNodeSegStatsMsg")
    		}
    
    		for _, segStat := range statsMsg.GetSegStats() {
    			if err := processor.processSegmentStat(segStat); err != nil {
    				return err
    			}
    		}
    
    		for _, fieldStat := range statsMsg.GetFieldStats() {
    			if err := processor.processFieldStat(statsMsg.PeerID, fieldStat); err != nil {
    				return err
    			}
    		}
    
    	}
    
    	return nil
    }
    
    func (processor *StatsProcessor) processSegmentStat(segStats *internalpb.SegmentStats) error {
    	if !segStats.GetRecentlyModified() {
    		return nil
    	}
    
    	segID := segStats.GetSegmentID()
    	segMeta, err := processor.metaTable.GetSegmentByID(segID)
    	if err != nil {
    		return err
    	}
    
    	segMeta.NumRows = segStats.NumRows
    	segMeta.MemSize = segStats.MemorySize
    
    	return processor.metaTable.UpdateSegment(segMeta)
    }
    
    func (processor *StatsProcessor) processFieldStat(peerID int64, fieldStats *internalpb.FieldStats) error {
    	collID := fieldStats.CollectionID
    	fieldID := fieldStats.FieldID
    
    	for _, stat := range fieldStats.IndexStats {
    		fieldStats := &FieldIndexRuntimeStats{
    			peerID:               peerID,
    			indexParams:          stat.IndexParams,
    			numOfRelatedSegments: stat.NumRelatedSegments,
    		}
    
    		if err := processor.runtimeStats.UpdateFieldStat(collID, fieldID, fieldStats); err != nil {
    			return err
    		}
    	}
    	return nil
    }
    
    func NewStatsProcessor(mt *metaTable, runTimeStats *RuntimeStats, globalTSOAllocator func() (Timestamp, error)) *StatsProcessor {
    	return &StatsProcessor{
    		metaTable:              mt,
    		runtimeStats:           runTimeStats,
    		segmentThreshold:       Params.SegmentSize * 1024 * 1024,
    		segmentThresholdFactor: Params.SegmentSizeFactor,
    		globalTSOAllocator:     globalTSOAllocator,
    	}
    }