Skip to content
Snippets Groups Projects
collection_replica.go 5.49 KiB
Newer Older
XuanYang-cn's avatar
XuanYang-cn committed
package datanode

import (
XuanYang-cn's avatar
XuanYang-cn committed
	"sync"

	"github.com/zilliztech/milvus-distributed/internal/errors"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
type Replica interface {
XuanYang-cn's avatar
XuanYang-cn committed

	// collection
	getCollectionNum() int
	addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
XuanYang-cn's avatar
XuanYang-cn committed
	removeCollection(collectionID UniqueID) error
	getCollectionByID(collectionID UniqueID) (*Collection, error)
	hasCollection(collectionID UniqueID) bool

XuanYang-cn's avatar
XuanYang-cn committed
	addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID, channelName string) error
	removeSegment(segmentID UniqueID) error
	hasSegment(segmentID UniqueID) bool
	updateStatistics(segmentID UniqueID, numRows int64) error
	getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb2.SegmentStatisticsUpdates, error)
XuanYang-cn's avatar
XuanYang-cn committed
	getSegmentByID(segmentID UniqueID) (*Segment, error)
type (
	Segment struct {
XuanYang-cn's avatar
XuanYang-cn committed
		segmentID     UniqueID
		collectionID  UniqueID
		partitionID   UniqueID
		numRows       int64
		memorySize    int64
		isNew         bool
		createTime    Timestamp // not using
		endTime       Timestamp // not using
		startPosition *internalpb2.MsgPosition
		endPosition   *internalpb2.MsgPosition // not using
	ReplicaImpl struct {
		mu          sync.RWMutex
		segments    []*Segment
		collections map[UniqueID]*Collection
func newReplica() Replica {
	segments := make([]*Segment, 0)
	collections := make(map[UniqueID]*Collection)
	var replica Replica = &ReplicaImpl{
		segments:    segments,
		collections: collections,
// --- segment ---
func (replica *ReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	for _, segment := range replica.segments {
XuanYang-cn's avatar
XuanYang-cn committed
		if segment.segmentID == segmentID {
			return segment, nil
		}
	}
	return nil, errors.Errorf("Cannot find segment, id = %v", segmentID)
XuanYang-cn's avatar
XuanYang-cn committed
}

func (replica *ReplicaImpl) addSegment(
XuanYang-cn's avatar
XuanYang-cn committed
	segmentID UniqueID,
	collID UniqueID,
	partitionID UniqueID,
	channelName string) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	log.Println("Add Segment", segmentID)
XuanYang-cn's avatar
XuanYang-cn committed

XuanYang-cn's avatar
XuanYang-cn committed
	position := &internalpb2.MsgPosition{
		ChannelName: channelName,
	}

XuanYang-cn's avatar
XuanYang-cn committed
		segmentID:     segmentID,
		collectionID:  collID,
		partitionID:   partitionID,
		isNew:         true,
		createTime:    0,
		startPosition: position,
		endPosition:   new(internalpb2.MsgPosition),
	replica.segments = append(replica.segments, seg)
func (replica *ReplicaImpl) removeSegment(segmentID UniqueID) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	for index, ele := range replica.segments {
		if ele.segmentID == segmentID {
			log.Println("Removing segment:", segmentID)
			numOfSegs := len(replica.segments)
			replica.segments[index] = replica.segments[numOfSegs-1]
			replica.segments = replica.segments[:numOfSegs-1]
			return nil
		}
	}
	return errors.Errorf("Error, there's no segment %v", segmentID)
}

func (replica *ReplicaImpl) hasSegment(segmentID UniqueID) bool {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	for _, ele := range replica.segments {
		if ele.segmentID == segmentID {
			return true
		}
	}
	return false
}

func (replica *ReplicaImpl) updateStatistics(segmentID UniqueID, numRows int64) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	for _, ele := range replica.segments {
		if ele.segmentID == segmentID {
			log.Printf("updating segment(%v) row nums: (%v)", segmentID, numRows)
			ele.memorySize = 0
			ele.numRows += numRows
			return nil
		}
	}
	return errors.Errorf("Error, there's no segment %v", segmentID)
}

func (replica *ReplicaImpl) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb2.SegmentStatisticsUpdates, error) {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	for _, ele := range replica.segments {
		if ele.segmentID == segmentID {
			updates := &internalpb2.SegmentStatisticsUpdates{
				SegmentID:    segmentID,
				MemorySize:   ele.memorySize,
				NumRows:      ele.numRows,
				IsNewSegment: ele.isNew,
			}

			if ele.isNew {
XuanYang-cn's avatar
XuanYang-cn committed
				updates.StartPosition = ele.startPosition
				ele.isNew = false
			}
			return updates, nil
		}
	}
	return nil, errors.Errorf("Error, there's no segment %v", segmentID)
// --- collection ---
func (replica *ReplicaImpl) getCollectionNum() int {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	return len(replica.collections)
func (replica *ReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
XuanYang-cn's avatar
XuanYang-cn committed

	if _, ok := replica.collections[collectionID]; ok {
		return errors.Errorf("Create an existing collection=%s", schema.GetName())
XuanYang-cn's avatar
XuanYang-cn committed
	}
XuanYang-cn's avatar
XuanYang-cn committed

	newCollection, err := newCollection(collectionID, schema)
	if err != nil {
		return err
XuanYang-cn's avatar
XuanYang-cn committed
	}
	replica.collections[collectionID] = newCollection
	log.Println("Create collection:", newCollection.GetName())

	return nil
func (replica *ReplicaImpl) removeCollection(collectionID UniqueID) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
XuanYang-cn's avatar
XuanYang-cn committed

	delete(replica.collections, collectionID)

	return nil
func (replica *ReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	coll, ok := replica.collections[collectionID]
	if !ok {
		return nil, errors.Errorf("Cannot get collection %d by ID: not exist", collectionID)
	return coll, nil
func (replica *ReplicaImpl) hasCollection(collectionID UniqueID) bool {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	_, ok := replica.collections[collectionID]
	return ok
XuanYang-cn's avatar
XuanYang-cn committed
}