Skip to content
Snippets Groups Projects
collection_replica.go 10 KiB
Newer Older
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

XuanYang-cn's avatar
XuanYang-cn committed
package datanode

import (
XuanYang-cn's avatar
XuanYang-cn committed
	"sync"
	"sync/atomic"
XuanYang-cn's avatar
XuanYang-cn committed

XuanYang-cn's avatar
XuanYang-cn committed
	"go.uber.org/zap"

Xiangyu Wang's avatar
Xiangyu Wang committed
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
type Replica interface {
XuanYang-cn's avatar
XuanYang-cn committed

	// collection
	getCollectionNum() int
	addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
XuanYang-cn's avatar
XuanYang-cn committed
	removeCollection(collectionID UniqueID) error
	getCollectionByID(collectionID UniqueID) (*Collection, error)
	hasCollection(collectionID UniqueID) bool

XuanYang-cn's avatar
XuanYang-cn committed
	addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID, channelName string) error
	removeSegment(segmentID UniqueID) error
	hasSegment(segmentID UniqueID) bool
	updateStatistics(segmentID UniqueID, numRows int64) error
陈庆祥's avatar
陈庆祥 committed
	getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
XuanYang-cn's avatar
XuanYang-cn committed
	getSegmentByID(segmentID UniqueID) (*Segment, error)
	bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error
	getBufferPaths(segID UniqueID) (map[UniqueID][]string, error)
	getChannelName(segID UniqueID) (string, error)
	setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error
	setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error
	getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition)
neza2017's avatar
neza2017 committed
	listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64)
// Segment is the data structure of segments in data node replica.
type Segment struct {
	segmentID    UniqueID
	collectionID UniqueID
	partitionID  UniqueID
	numRows      int64
	memorySize   int64
	isNew        atomic.Value // bool
	channelName  string
	field2Paths  map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered.
// CollectionSegmentReplica is the data replication of persistent data in datanode.
// It implements `Replica` interface.
type CollectionSegmentReplica struct {
	mu          sync.RWMutex
	segments    map[UniqueID]*Segment
	collections map[UniqueID]*Collection
neza2017's avatar
neza2017 committed
	posMu          sync.Mutex
	startPositions map[UniqueID][]*internalpb.MsgPosition
	endPositions   map[UniqueID][]*internalpb.MsgPosition
// Compile-time assertion that *CollectionSegmentReplica implements Replica.
var _ Replica = (*CollectionSegmentReplica)(nil)

func newReplica() Replica {
	segments := make(map[UniqueID]*Segment)
	collections := make(map[UniqueID]*Collection)
	var replica Replica = &CollectionSegmentReplica{
neza2017's avatar
neza2017 committed
		segments:       segments,
		collections:    collections,
		startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
		endPositions:   make(map[UniqueID][]*internalpb.MsgPosition),
// getChannelName returns the channel name stored on the segment identified by segID.
// An error is returned when the replica holds no segment with that ID.
func (replica *CollectionSegmentReplica) getChannelName(segID UniqueID) (string, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	if segment, found := replica.segments[segID]; found {
		return segment.channelName, nil
	}

	return "", fmt.Errorf("Cannot find segment, id = %v", segID)
}

// bufferAutoFlushBinlogPaths buffers binlog paths generated by auto-flush.
//
// For every (fieldID, path) pair in field2Path, path is appended to the paths
// already buffered for that field on segment segID. An error is returned when
// the replica holds no segment with that ID.
func (replica *CollectionSegmentReplica) bufferAutoFlushBinlogPaths(segID UniqueID, field2Path map[UniqueID]string) error {
	// The write lock is required here: the loop below writes to the segment's
	// field2Paths map, and map writes racing with each other (or with a reader
	// such as getBufferPaths) are not safe under a shared read lock.
	replica.mu.Lock()
	defer replica.mu.Unlock()

	seg, ok := replica.segments[segID]
	if !ok {
		return fmt.Errorf("Cannot find segment, id = %v", segID)
	}

	for fieldID, path := range field2Path {
		// Appending to a missing key starts from a nil slice, so no explicit
		// initialisation of the per-field slice is needed.
		seg.field2Paths[fieldID] = append(seg.field2Paths[fieldID], path)
	}
	log.Info("Buffer auto flush binlog paths", zap.Int64("segment ID", segID))

	return nil
}

// getBufferPaths returns the fieldID-to-binlog-paths map buffered on segment segID.
// The returned map is the segment's own map, not a copy.
// An error is returned when the replica holds no segment with that ID.
func (replica *CollectionSegmentReplica) getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	segment, found := replica.segments[segID]
	if !found {
		return nil, fmt.Errorf("Cannot find segment, id = %v", segID)
	}

	return segment.field2Paths, nil
}

func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	if seg, ok := replica.segments[segmentID]; ok {
		return seg, nil
XuanYang-cn's avatar
XuanYang-cn committed
	}
	return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID)
// `addSegment` adds a new segment into the replica when the data node sees the segment
func (replica *CollectionSegmentReplica) addSegment(
XuanYang-cn's avatar
XuanYang-cn committed
	segmentID UniqueID,
	collID UniqueID,
	partitionID UniqueID,
	channelName string) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
XuanYang-cn's avatar
XuanYang-cn committed
	log.Debug("Add Segment", zap.Int64("Segment ID", segmentID))
XuanYang-cn's avatar
XuanYang-cn committed

		segmentID:    segmentID,
		collectionID: collID,
		partitionID:  partitionID,
		channelName:  channelName,
		field2Paths:  make(map[UniqueID][]string),

	seg.isNew.Store(true)

	replica.segments[segmentID] = seg
func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error {
	replica.mu.Lock()
	delete(replica.segments, segmentID)
	replica.mu.Unlock()

	replica.posMu.Lock()
	delete(replica.startPositions, segmentID)
	delete(replica.endPositions, segmentID)
	replica.posMu.Unlock()

	return nil
// hasSegment reports whether the replica currently holds a segment with the given ID.
func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	_, exists := replica.segments[segmentID]
	return exists
}

// `updateStatistics` updates the number of rows of a segment in replica.
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	if seg, ok := replica.segments[segmentID]; ok {
		log.Debug("updating segment", zap.Int64("Segment ID", segmentID), zap.Int64("numRows", numRows))
		seg.memorySize = 0
		seg.numRows += numRows
		return nil

	return fmt.Errorf("There's no segment %v", segmentID)
// `getSegmentStatisticsUpdates` gives current segment's statistics updates.
陈庆祥's avatar
陈庆祥 committed
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
	replica.mu.Lock()
	defer replica.mu.Unlock()
	if seg, ok := replica.segments[segmentID]; ok {
		updates := &internalpb.SegmentStatisticsUpdates{
			SegmentID:  segmentID,
			MemorySize: seg.memorySize,
			NumRows:    seg.numRows,

		return updates, nil
	return nil, fmt.Errorf("Error, there's no segment %v", segmentID)
// --- collection ---
func (replica *CollectionSegmentReplica) getCollectionNum() int {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	return len(replica.collections)
func (replica *CollectionSegmentReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
XuanYang-cn's avatar
XuanYang-cn committed

	if _, ok := replica.collections[collectionID]; ok {
		return fmt.Errorf("Create an existing collection=%s", schema.GetName())
XuanYang-cn's avatar
XuanYang-cn committed
	}
XuanYang-cn's avatar
XuanYang-cn committed

	newCollection, err := newCollection(collectionID, schema)
	if err != nil {
		return err
XuanYang-cn's avatar
XuanYang-cn committed
	}
	replica.collections[collectionID] = newCollection
XuanYang-cn's avatar
XuanYang-cn committed
	log.Debug("Create collection", zap.String("collection name", newCollection.GetName()))
func (replica *CollectionSegmentReplica) removeCollection(collectionID UniqueID) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()
XuanYang-cn's avatar
XuanYang-cn committed

	delete(replica.collections, collectionID)

	return nil
func (replica *CollectionSegmentReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	coll, ok := replica.collections[collectionID]
	if !ok {
		return nil, fmt.Errorf("Cannot get collection %d by ID: not exist", collectionID)
	return coll, nil
func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bool {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
XuanYang-cn's avatar
XuanYang-cn committed

	_, ok := replica.collections[collectionID]
	return ok
XuanYang-cn's avatar
XuanYang-cn committed
}
// getSegmentsCheckpoints is meant to get the checkpoints of the currently open segments.
// It is not implemented yet: it only takes and releases the replica read lock.
func (replica *CollectionSegmentReplica) getSegmentsCheckpoints() {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	// TODO: walk replica.segments and collect the checkpoint of each open segment:
	//for segID, segment := range replica.segments {
	//	if segment
	//}
}

// setStartPositions records the `Start Position` of a segment, i.e. the
// `startPositions` taken from the MsgPack in which the segment was first found.
// Any positions previously stored for segID are replaced. It always returns nil.
func (replica *CollectionSegmentReplica) setStartPositions(segID UniqueID, startPositions []*internalpb.MsgPosition) error {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	replica.startPositions[segID] = startPositions

	return nil
}

// setEndPositions records the `End Position` of a segment, i.e. the
// `endPositions` taken from the MsgPack at which the segment needs to be flushed.
// Any positions previously stored for segID are replaced. It always returns nil.
func (replica *CollectionSegmentReplica) setEndPositions(segID UniqueID, endPositions []*internalpb.MsgPosition) error {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	replica.endPositions[segID] = endPositions

	return nil
}

// getSegmentPositions returns the stored start and end positions of segment segID,
// in that order. A segment with no stored positions yields nil for that value.
// To be noted: the start and end positions do NOT come from one single MsgPack;
// they are taken from different MsgPacks (see the comments on setStartPositions
// and setEndPositions).
func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	return replica.startPositions[segID], replica.endPositions[segID]
}
neza2017's avatar
neza2017 committed

func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()
	r1 := make(map[UniqueID]internalpb.MsgPosition)
	r2 := make(map[UniqueID]int64)
neza2017's avatar
neza2017 committed
	for _, seg := range segs {
		r1[seg] = *replica.endPositions[seg][0]
		r2[seg] = replica.segments[seg].numRows