Skip to content
Snippets Groups Projects
collection_replica.go 7.17 KiB
Newer Older
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

XuanYang-cn's avatar
XuanYang-cn committed
package datanode

import (
XuanYang-cn's avatar
XuanYang-cn committed
	"sync"
	"sync/atomic"
XuanYang-cn's avatar
XuanYang-cn committed

XuanYang-cn's avatar
XuanYang-cn committed
	"go.uber.org/zap"

Xiangyu Wang's avatar
Xiangyu Wang committed
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
type Replica interface {
XuanYang-cn's avatar
XuanYang-cn committed

	// collection
	getCollectionNum() int
	addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
XuanYang-cn's avatar
XuanYang-cn committed
	removeCollection(collectionID UniqueID) error
	getCollectionByID(collectionID UniqueID) (*Collection, error)
	hasCollection(collectionID UniqueID) bool

XuanYang-cn's avatar
XuanYang-cn committed
	addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID, channelName string) error
	removeSegment(segmentID UniqueID) error
	hasSegment(segmentID UniqueID) bool
	setIsFlushed(segmentID UniqueID) error
	setStartPosition(segmentID UniqueID, startPos *internalpb.MsgPosition) error
	setEndPosition(segmentID UniqueID, endPos *internalpb.MsgPosition) error
	updateStatistics(segmentID UniqueID, numRows int64) error
陈庆祥's avatar
陈庆祥 committed
	getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
XuanYang-cn's avatar
XuanYang-cn committed
	getSegmentByID(segmentID UniqueID) (*Segment, error)
type Segment struct {
	segmentID    UniqueID
	collectionID UniqueID
	partitionID  UniqueID
	numRows      int64
	memorySize   int64
	isNew        atomic.Value // bool
	isFlushed    bool

	createTime    Timestamp // not using
	endTime       Timestamp // not using
陈庆祥's avatar
陈庆祥 committed
	startPosition *internalpb.MsgPosition
	endPosition   *internalpb.MsgPosition // not using
	channelName   string
type CollectionSegmentReplica struct {
	mu          sync.RWMutex
	segments    map[UniqueID]*Segment
	collections map[UniqueID]*Collection
}
func newReplica() Replica {
	segments := make(map[UniqueID]*Segment)
	collections := make(map[UniqueID]*Collection)
	var replica Replica = &CollectionSegmentReplica{
		segments:    segments,
		collections: collections,
// --- segment ---
func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	if seg, ok := replica.segments[segmentID]; ok {
		return seg, nil
XuanYang-cn's avatar
XuanYang-cn committed
	}
	return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID)
XuanYang-cn's avatar
XuanYang-cn committed
}

func (replica *CollectionSegmentReplica) addSegment(
XuanYang-cn's avatar
XuanYang-cn committed
	segmentID UniqueID,
	collID UniqueID,
	partitionID UniqueID,
	channelName string) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
XuanYang-cn's avatar
XuanYang-cn committed
	log.Debug("Add Segment", zap.Int64("Segment ID", segmentID))
XuanYang-cn's avatar
XuanYang-cn committed

陈庆祥's avatar
陈庆祥 committed
	position := &internalpb.MsgPosition{
XuanYang-cn's avatar
XuanYang-cn committed
		ChannelName: channelName,
	}

XuanYang-cn's avatar
XuanYang-cn committed
		segmentID:     segmentID,
		collectionID:  collID,
		partitionID:   partitionID,
		isFlushed:     false,
XuanYang-cn's avatar
XuanYang-cn committed
		createTime:    0,
		startPosition: position,
陈庆祥's avatar
陈庆祥 committed
		endPosition:   new(internalpb.MsgPosition),
		channelName:   channelName,

	seg.isNew.Store(true)

	replica.segments[segmentID] = seg
func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	delete(replica.segments, segmentID)

	return nil
func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	_, ok := replica.segments[segmentID]
	return ok
}

func (replica *CollectionSegmentReplica) setIsFlushed(segmentID UniqueID) error {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	if seg, ok := replica.segments[segmentID]; ok {
		seg.isFlushed = true
		return nil

	return fmt.Errorf("There's no segment %v", segmentID)
}

func (replica *CollectionSegmentReplica) setStartPosition(segmentID UniqueID, startPos *internalpb.MsgPosition) error {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	if startPos == nil {
		return fmt.Errorf("Nil MsgPosition")
	}

	if seg, ok := replica.segments[segmentID]; ok {
		seg.startPosition = startPos
		return nil
	}
	return fmt.Errorf("There's no segment %v", segmentID)
}

func (replica *CollectionSegmentReplica) setEndPosition(segmentID UniqueID, endPos *internalpb.MsgPosition) error {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	if endPos == nil {
		return fmt.Errorf("Nil MsgPosition")
	}

	if seg, ok := replica.segments[segmentID]; ok {
		seg.endPosition = endPos
		return nil
	}
	return fmt.Errorf("There's no segment %v", segmentID)
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	if seg, ok := replica.segments[segmentID]; ok {
		log.Debug("updating segment", zap.Int64("Segment ID", segmentID), zap.Int64("numRows", numRows))
		seg.memorySize = 0
		seg.numRows += numRows
		return nil

	return fmt.Errorf("There's no segment %v", segmentID)
陈庆祥's avatar
陈庆祥 committed
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	if seg, ok := replica.segments[segmentID]; ok {
		updates := &internalpb.SegmentStatisticsUpdates{
			SegmentID:  segmentID,
			MemorySize: seg.memorySize,
			NumRows:    seg.numRows,

		if seg.isNew.Load() == true {
			updates.StartPosition = seg.startPosition
			seg.isNew.Store(false)
		}

		if seg.isFlushed {
			updates.EndPosition = seg.endPosition
		}

		return updates, nil
	return nil, fmt.Errorf("Error, there's no segment %v", segmentID)
// --- collection ---
func (replica *CollectionSegmentReplica) getCollectionNum() int {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	return len(replica.collections)
func (replica *CollectionSegmentReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
XuanYang-cn's avatar
XuanYang-cn committed

	if _, ok := replica.collections[collectionID]; ok {
		return fmt.Errorf("Create an existing collection=%s", schema.GetName())
XuanYang-cn's avatar
XuanYang-cn committed
	}
XuanYang-cn's avatar
XuanYang-cn committed

	newCollection, err := newCollection(collectionID, schema)
	if err != nil {
		return err
XuanYang-cn's avatar
XuanYang-cn committed
	}
	replica.collections[collectionID] = newCollection
XuanYang-cn's avatar
XuanYang-cn committed
	log.Debug("Create collection", zap.String("collection name", newCollection.GetName()))
func (replica *CollectionSegmentReplica) removeCollection(collectionID UniqueID) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
XuanYang-cn's avatar
XuanYang-cn committed

	delete(replica.collections, collectionID)

	return nil
func (replica *CollectionSegmentReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	coll, ok := replica.collections[collectionID]
	if !ok {
		return nil, fmt.Errorf("Cannot get collection %d by ID: not exist", collectionID)
	return coll, nil
func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bool {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	_, ok := replica.collections[collectionID]
	return ok
XuanYang-cn's avatar
XuanYang-cn committed
}