Newer
Older
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
removeCollection(collectionID UniqueID) error
getCollectionByID(collectionID UniqueID) (*Collection, error)
hasCollection(collectionID UniqueID) bool
addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID, channelName string) error
removeSegment(segmentID UniqueID) error
hasSegment(segmentID UniqueID) bool
updateStatistics(segmentID UniqueID, numRows int64) error
getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error
getBufferPaths(segID UniqueID) (map[UniqueID][]string, error)
getChannelName(segID UniqueID) (string, error)
setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error
setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error
getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition)
listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64)
// Segment is the data structure of segments in data node replica.
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
numRows int64
memorySize int64
isNew atomic.Value // bool
channelName string
field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered.
// CollectionSegmentReplica is the data replication of persistent data in datanode.
// It implements `Replica` interface.
type CollectionSegmentReplica struct {
mu sync.RWMutex
collections map[UniqueID]*Collection
posMu sync.Mutex
startPositions map[UniqueID][]*internalpb.MsgPosition
endPositions map[UniqueID][]*internalpb.MsgPosition
// Compile-time check that *CollectionSegmentReplica implements the Replica interface.
var _ Replica = (*CollectionSegmentReplica)(nil)
collections := make(map[UniqueID]*Collection)
var replica Replica = &CollectionSegmentReplica{
segments: segments,
collections: collections,
startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
endPositions: make(map[UniqueID][]*internalpb.MsgPosition),
// getChannelName returns the channel name recorded for the segment with ID
// segID, or an error if the replica holds no such segment.
func (replica *CollectionSegmentReplica) getChannelName(segID UniqueID) (string, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	if segment, found := replica.segments[segID]; found {
		return segment.channelName, nil
	}
	return "", fmt.Errorf("Cannot find segment, id = %v", segID)
}
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// bufferAutoFlushBinlogPaths buffers the binlog paths produced by an auto-flush
// of segment segID. Every (fieldID -> path) entry of field2Path is appended to
// that field's path list on the segment, so repeated auto-flushes accumulate.
// It returns an error if the replica holds no such segment.
func (replica *CollectionSegmentReplica) bufferAutoFlushBinlogPaths(segID UniqueID, field2Path map[UniqueID]string) error {
	// An exclusive lock is required: this method writes seg.field2Paths, and
	// writing a map while other goroutines hold the read lock is a data race.
	replica.mu.Lock()
	defer replica.mu.Unlock()

	seg, ok := replica.segments[segID]
	if !ok {
		return fmt.Errorf("Cannot find segment, id = %v", segID)
	}
	// Assigning into a nil map panics; initialize lazily in case the segment
	// was created without a path map.
	if seg.field2Paths == nil {
		seg.field2Paths = make(map[UniqueID][]string, len(field2Path))
	}
	for fieldID, path := range field2Path {
		// append on a missing key starts from a nil slice, so no pre-allocation is needed.
		seg.field2Paths[fieldID] = append(seg.field2Paths[fieldID], path)
	}
	log.Info("Buffer auto flush binlog paths", zap.Int64("segment ID", segID))
	return nil
}
// getBufferPaths returns the auto-flush binlog paths buffered for segment
// segID, keyed by field ID, or an error if the replica holds no such segment.
//
// The result is a deep copy. The segment's own map is appended to by
// bufferAutoFlushBinlogPaths, so handing the internal map out would let the
// caller read it after the lock is released, racing with later writers.
func (replica *CollectionSegmentReplica) getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	seg, ok := replica.segments[segID]
	if !ok {
		return nil, fmt.Errorf("Cannot find segment, id = %v", segID)
	}
	paths := make(map[UniqueID][]string, len(seg.field2Paths))
	for fieldID, fieldPaths := range seg.field2Paths {
		// Copy the slice too: a later append on the internal slice could
		// otherwise write into a backing array the caller still sees.
		paths[fieldID] = append([]string(nil), fieldPaths...)
	}
	return paths, nil
}
func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
if seg, ok := replica.segments[segmentID]; ok {
return seg, nil
return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID)
// addSegment adds a new segment to the replica when the data node sees the segment.
func (replica *CollectionSegmentReplica) addSegment(
segmentID UniqueID,
collID UniqueID,
partitionID UniqueID,
channelName string) error {
replica.mu.Lock()
defer replica.mu.Unlock()
log.Debug("Add Segment", zap.Int64("Segment ID", segmentID))
segmentID: segmentID,
collectionID: collID,
partitionID: partitionID,
channelName: channelName,
field2Paths: make(map[UniqueID][]string),
seg.isNew.Store(true)
replica.segments[segmentID] = seg
func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error {
replica.mu.Unlock()
replica.posMu.Lock()
delete(replica.startPositions, segmentID)
delete(replica.endPositions, segmentID)
replica.posMu.Unlock()
// hasSegment reports whether the replica currently holds an entry for segmentID.
func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
	replica.mu.RLock()
	_, exists := replica.segments[segmentID]
	replica.mu.RUnlock()
	return exists
}
// `updateStatistics` updates the number of rows of a segment in replica.
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
replica.mu.Lock()
defer replica.mu.Unlock()
if seg, ok := replica.segments[segmentID]; ok {
log.Debug("updating segment", zap.Int64("Segment ID", segmentID), zap.Int64("numRows", numRows))
seg.memorySize = 0
seg.numRows += numRows
return nil
return fmt.Errorf("There's no segment %v", segmentID)
// `getSegmentStatisticsUpdates` gives current segment's statistics updates.
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
replica.mu.Lock()
defer replica.mu.Unlock()
if seg, ok := replica.segments[segmentID]; ok {
updates := &internalpb.SegmentStatisticsUpdates{
SegmentID: segmentID,
MemorySize: seg.memorySize,
NumRows: seg.numRows,
return nil, fmt.Errorf("Error, there's no segment %v", segmentID)
func (replica *CollectionSegmentReplica) getCollectionNum() int {
replica.mu.RLock()
defer replica.mu.RUnlock()
func (replica *CollectionSegmentReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
replica.mu.Lock()
defer replica.mu.Unlock()
if _, ok := replica.collections[collectionID]; ok {
return fmt.Errorf("Create an existing collection=%s", schema.GetName())
newCollection, err := newCollection(collectionID, schema)
if err != nil {
return err
replica.collections[collectionID] = newCollection
log.Debug("Create collection", zap.String("collection name", newCollection.GetName()))
func (replica *CollectionSegmentReplica) removeCollection(collectionID UniqueID) error {
replica.mu.Lock()
defer replica.mu.Unlock()
delete(replica.collections, collectionID)
return nil
func (replica *CollectionSegmentReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
coll, ok := replica.collections[collectionID]
if !ok {
return nil, fmt.Errorf("Cannot get collection %d by ID: not exist", collectionID)
func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bool {
replica.mu.RLock()
defer replica.mu.RUnlock()
_, ok := replica.collections[collectionID]
return ok
// getSegmentsCheckpoints is meant to collect the checkpoints of the currently
// open segments. It is not implemented yet: it only acquires and releases the
// read lock and produces no result.
func (replica *CollectionSegmentReplica) getSegmentsCheckpoints() {
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	// TODO: iterate replica.segments and gather the checkpoint of each open segment.
}
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// setStartPositions records the start positions of segment segID, i.e. the
// startPositions of the MsgPack in which the segment was first found. Any
// previously stored start positions for segID are replaced. It always returns nil.
func (replica *CollectionSegmentReplica) setStartPositions(segID UniqueID, startPositions []*internalpb.MsgPosition) error {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	byID := replica.startPositions
	byID[segID] = startPositions
	return nil
}
// setEndPositions records the end positions of segment segID, i.e. the
// endPositions of the MsgPack at which the segment needs to be flushed. Any
// previously stored end positions for segID are replaced. It always returns nil.
func (replica *CollectionSegmentReplica) setEndPositions(segID UniqueID, endPositions []*internalpb.MsgPosition) error {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	byID := replica.endPositions
	byID[segID] = endPositions
	return nil
}
// getSegmentPositions returns the start and end positions stored for segment
// segID. Either result is nil when nothing is stored for that segment.
//
// Note: the two results do NOT come from the same MsgPack. The start positions
// are recorded by setStartPositions when the segment is first found; the end
// positions are recorded by setEndPositions when the segment needs to be flushed.
func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()
	return replica.startPositions[segID], replica.endPositions[segID]
}
func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
replica.posMu.Lock()
defer replica.posMu.Unlock()
r1 := make(map[UniqueID]internalpb.MsgPosition)
r2 := make(map[UniqueID]int64)
for _, seg := range segs {
r1[seg] = *replica.endPositions[seg][0]
r2[seg] = replica.segments[seg].numRows