Newer
Older
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
removeCollection(collectionID UniqueID) error
getCollectionByID(collectionID UniqueID) (*Collection, error)
hasCollection(collectionID UniqueID) bool
addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID, channelName string) error
removeSegment(segmentID UniqueID) error
hasSegment(segmentID UniqueID) bool
setIsFlushed(segmentID UniqueID) error
setStartPosition(segmentID UniqueID, startPos *internalpb.MsgPosition) error
setEndPosition(segmentID UniqueID, endPos *internalpb.MsgPosition) error
updateStatistics(segmentID UniqueID, numRows int64) error
getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error
getBufferPaths(segID UniqueID) (map[UniqueID][]string, error)
// Segment is the data structure of segments in data node replica.
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
numRows int64
memorySize int64
isNew atomic.Value // bool
isFlushed bool
createTime Timestamp // not using
endTime Timestamp // not using
startPosition *internalpb.MsgPosition
endPosition *internalpb.MsgPosition // not using
field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered.
// CollectionSegmentReplica is the data replication of persistent data in datanode.
// It implements `Replica` interface.
type CollectionSegmentReplica struct {
mu sync.RWMutex
collections map[UniqueID]*Collection
}
var _ Replica = &CollectionSegmentReplica{}
collections := make(map[UniqueID]*Collection)
var replica Replica = &CollectionSegmentReplica{
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// bufferAutoFlushBinlogPaths buffers binlog paths generated by auto-flush
func (replica *CollectionSegmentReplica) bufferAutoFlushBinlogPaths(segID UniqueID, field2Path map[UniqueID]string) error {
replica.mu.RLock()
defer replica.mu.RUnlock()
seg, ok := replica.segments[segID]
if !ok {
return fmt.Errorf("Cannot find segment, id = %v", segID)
}
for fieldID, path := range field2Path {
buffpaths, ok := seg.field2Paths[fieldID]
if !ok {
buffpaths = make([]string, 0)
}
buffpaths = append(buffpaths, path)
seg.field2Paths[fieldID] = buffpaths
}
log.Info("Buffer auto flush binlog paths", zap.Int64("segment ID", segID))
return nil
}
func (replica *CollectionSegmentReplica) getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
if seg, ok := replica.segments[segID]; ok {
return seg.field2Paths, nil
}
return nil, fmt.Errorf("Cannot find segment, id = %v", segID)
}
func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
if seg, ok := replica.segments[segmentID]; ok {
return seg, nil
return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID)
// `addSegment` add a new segment into replica when data node see the segment
// for the first time in insert channels. It sets the startPosition of a segment, and
// flags `isNew=true`
func (replica *CollectionSegmentReplica) addSegment(
segmentID UniqueID,
collID UniqueID,
partitionID UniqueID,
channelName string) error {
replica.mu.Lock()
defer replica.mu.Unlock()
log.Debug("Add Segment", zap.Int64("Segment ID", segmentID))
segmentID: segmentID,
collectionID: collID,
partitionID: partitionID,
field2Paths: make(map[UniqueID][]string),
seg.isNew.Store(true)
replica.segments[segmentID] = seg
func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error {
replica.mu.Lock()
defer replica.mu.Unlock()
delete(replica.segments, segmentID)
return nil
func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
replica.mu.RLock()
defer replica.mu.RUnlock()
_, ok := replica.segments[segmentID]
return ok
}
func (replica *CollectionSegmentReplica) setIsFlushed(segmentID UniqueID) error {
replica.mu.RLock()
defer replica.mu.RUnlock()
if seg, ok := replica.segments[segmentID]; ok {
seg.isFlushed = true
return nil
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
return fmt.Errorf("There's no segment %v", segmentID)
}
func (replica *CollectionSegmentReplica) setStartPosition(segmentID UniqueID, startPos *internalpb.MsgPosition) error {
replica.mu.RLock()
defer replica.mu.RUnlock()
if startPos == nil {
return fmt.Errorf("Nil MsgPosition")
}
if seg, ok := replica.segments[segmentID]; ok {
seg.startPosition = startPos
return nil
}
return fmt.Errorf("There's no segment %v", segmentID)
}
func (replica *CollectionSegmentReplica) setEndPosition(segmentID UniqueID, endPos *internalpb.MsgPosition) error {
replica.mu.RLock()
defer replica.mu.RUnlock()
if endPos == nil {
return fmt.Errorf("Nil MsgPosition")
}
if seg, ok := replica.segments[segmentID]; ok {
seg.endPosition = endPos
return nil
}
return fmt.Errorf("There's no segment %v", segmentID)
// `updateStatistics` updates the number of rows of a segment in replica.
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
replica.mu.Lock()
defer replica.mu.Unlock()
if seg, ok := replica.segments[segmentID]; ok {
log.Debug("updating segment", zap.Int64("Segment ID", segmentID), zap.Int64("numRows", numRows))
seg.memorySize = 0
seg.numRows += numRows
return nil
return fmt.Errorf("There's no segment %v", segmentID)
// `getSegmentStatisticsUpdates` gives current segment's statistics updates.
// if the segment's flag `isNew` is true, updates will contain a valid start position.
// if the segment's flag `isFlushed` is true, updates will contain a valid end position.
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
replica.mu.Lock()
defer replica.mu.Unlock()
if seg, ok := replica.segments[segmentID]; ok {
updates := &internalpb.SegmentStatisticsUpdates{
SegmentID: segmentID,
MemorySize: seg.memorySize,
NumRows: seg.numRows,
if seg.isNew.Load() == true {
updates.StartPosition = seg.startPosition
seg.isNew.Store(false)
}
if seg.isFlushed {
updates.EndPosition = seg.endPosition
}
return updates, nil
return nil, fmt.Errorf("Error, there's no segment %v", segmentID)
func (replica *CollectionSegmentReplica) getCollectionNum() int {
replica.mu.RLock()
defer replica.mu.RUnlock()
func (replica *CollectionSegmentReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
replica.mu.Lock()
defer replica.mu.Unlock()
if _, ok := replica.collections[collectionID]; ok {
return fmt.Errorf("Create an existing collection=%s", schema.GetName())
newCollection, err := newCollection(collectionID, schema)
if err != nil {
return err
replica.collections[collectionID] = newCollection
log.Debug("Create collection", zap.String("collection name", newCollection.GetName()))
func (replica *CollectionSegmentReplica) removeCollection(collectionID UniqueID) error {
replica.mu.Lock()
defer replica.mu.Unlock()
delete(replica.collections, collectionID)
return nil
func (replica *CollectionSegmentReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
coll, ok := replica.collections[collectionID]
if !ok {
return nil, fmt.Errorf("Cannot get collection %d by ID: not exist", collectionID)
func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bool {
replica.mu.RLock()
defer replica.mu.RUnlock()
_, ok := replica.collections[collectionID]
return ok