// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
removeCollection(collectionID UniqueID) error
getCollectionByID(collectionID UniqueID) (*Collection, error)
hasCollection(collectionID UniqueID) bool
addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID, channelName string) error
removeSegment(segmentID UniqueID) error
hasSegment(segmentID UniqueID) bool
setIsFlushed(segmentID UniqueID) error
setStartPosition(segmentID UniqueID, startPos *internalpb.MsgPosition) error
setEndPosition(segmentID UniqueID, endPos *internalpb.MsgPosition) error
updateStatistics(segmentID UniqueID, numRows int64) error
getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
numRows int64
memorySize int64
isNew atomic.Value // bool
isFlushed bool
createTime Timestamp // not using
endTime Timestamp // not using
startPosition *internalpb.MsgPosition
endPosition *internalpb.MsgPosition // not using
// CollectionSegmentReplica is the receiver type of the replica methods in this
// file (addSegment, hasSegment, addCollection, getCollectionByID, ...).
type CollectionSegmentReplica struct {
// mu is the read-write lock each replica method acquires before touching
// the replica's maps.
mu sync.RWMutex
// collections maps a collection ID to its Collection.
collections map[UniqueID]*Collection
}
collections := make(map[UniqueID]*Collection)
var replica Replica = &CollectionSegmentReplica{
// getSegmentByID looks up segmentID in replica.segments while holding the read
// lock. It returns the stored *Segment when an entry exists, and otherwise a
// nil segment together with an error that names the missing ID.
func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
if seg, ok := replica.segments[segmentID]; ok {
return seg, nil
return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID)
// addSegment registers a segment under segmentID while holding the write lock.
// The visible code logs the segment ID at debug level, fills in the segment's
// segmentID, collectionID and partitionID, flags it as new via isNew.Store(true)
// and stores it in replica.segments.
//
// NOTE(review): how channelName is consumed is not visible in this view — confirm
// against the full file.
func (replica *CollectionSegmentReplica) addSegment(
segmentID UniqueID,
collID UniqueID,
partitionID UniqueID,
channelName string) error {
replica.mu.Lock()
defer replica.mu.Unlock()
log.Debug("Add Segment", zap.Int64("Segment ID", segmentID))
segmentID: segmentID,
collectionID: collID,
partitionID: partitionID,
// Mark the segment as new; getSegmentStatisticsUpdates reads and clears this flag.
seg.isNew.Store(true)
// An existing entry with the same ID is overwritten.
replica.segments[segmentID] = seg
// removeSegment deletes segmentID from replica.segments while holding the write
// lock. It always returns nil: deleting a key that is not in the map is a no-op,
// so an unknown ID is not reported as an error.
func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error {
replica.mu.Lock()
defer replica.mu.Unlock()
delete(replica.segments, segmentID)
return nil
// hasSegment reports whether an entry for segmentID is present in
// replica.segments. The lookup is performed under the read lock.
func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
	replica.mu.RLock()
	_, found := replica.segments[segmentID]
	replica.mu.RUnlock()

	return found
}
func (replica *CollectionSegmentReplica) setIsFlushed(segmentID UniqueID) error {
replica.mu.RLock()
defer replica.mu.RUnlock()
if seg, ok := replica.segments[segmentID]; ok {
seg.isFlushed = true
return nil
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
return fmt.Errorf("There's no segment %v", segmentID)
}
// setStartPosition records startPos as the start position of the segment
// identified by segmentID.
//
// A nil startPos is rejected before any lock is taken. The write lock is then
// acquired because the segment's startPosition field is mutated: under a read
// lock several callers could hold the lock at once and race on the same field.
//
// It returns an error when startPos is nil or when segmentID is not present in
// replica.segments.
func (replica *CollectionSegmentReplica) setStartPosition(segmentID UniqueID, startPos *internalpb.MsgPosition) error {
	if startPos == nil {
		return fmt.Errorf("Nil MsgPosition")
	}

	replica.mu.Lock()
	defer replica.mu.Unlock()

	if seg, ok := replica.segments[segmentID]; ok {
		seg.startPosition = startPos
		return nil
	}

	return fmt.Errorf("There's no segment %v", segmentID)
}
func (replica *CollectionSegmentReplica) setEndPosition(segmentID UniqueID, endPos *internalpb.MsgPosition) error {
replica.mu.RLock()
defer replica.mu.RUnlock()
if endPos == nil {
return fmt.Errorf("Nil MsgPosition")
}
if seg, ok := replica.segments[segmentID]; ok {
seg.endPosition = endPos
return nil
}
return fmt.Errorf("There's no segment %v", segmentID)
// updateStatistics adds numRows to the row count of the segment identified by
// segmentID and sets its memorySize to 0, all under the write lock. It returns
// an error when segmentID is not present in replica.segments.
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
replica.mu.Lock()
defer replica.mu.Unlock()
if seg, ok := replica.segments[segmentID]; ok {
log.Debug("updating segment", zap.Int64("Segment ID", segmentID), zap.Int64("numRows", numRows))
// NOTE(review): memorySize is reset to zero on every update rather than
// accumulated — presumably it is not tracked yet; confirm.
seg.memorySize = 0
// numRows is a delta added to the running total, not an absolute count.
seg.numRows += numRows
return nil
return fmt.Errorf("There's no segment %v", segmentID)
// getSegmentStatisticsUpdates builds a SegmentStatisticsUpdates message for the
// segment identified by segmentID, carrying its ID, memory size and row count.
// StartPosition is attached only while the segment is still flagged as new, and
// EndPosition only once the segment is flagged as flushed. It returns an error
// when segmentID is not present in replica.segments.
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
// The write lock (not the read lock) is held for the whole call.
replica.mu.Lock()
defer replica.mu.Unlock()
if seg, ok := replica.segments[segmentID]; ok {
updates := &internalpb.SegmentStatisticsUpdates{
SegmentID: segmentID,
MemorySize: seg.memorySize,
NumRows: seg.numRows,
// Report the start position only while the isNew flag is still set, then
// clear the flag so later calls omit it.
if seg.isNew.Load() == true {
updates.StartPosition = seg.startPosition
seg.isNew.Store(false)
}
// The end position is reported only for segments marked as flushed.
if seg.isFlushed {
updates.EndPosition = seg.endPosition
}
return updates, nil
return nil, fmt.Errorf("Error, there's no segment %v", segmentID)
// getCollectionNum returns an int computed while holding the read lock.
//
// NOTE(review): the statement that produces the result is not visible in this
// view — presumably the number of entries in replica.collections; confirm.
func (replica *CollectionSegmentReplica) getCollectionNum() int {
replica.mu.RLock()
defer replica.mu.RUnlock()
// addCollection builds a Collection from collectionID and schema via
// newCollection and stores it in replica.collections under collectionID, all
// while holding the write lock. It returns an error when collectionID is already
// registered, or when newCollection fails.
func (replica *CollectionSegmentReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
replica.mu.Lock()
defer replica.mu.Unlock()
// Refuse to overwrite a collection that is already registered under this ID.
if _, ok := replica.collections[collectionID]; ok {
return fmt.Errorf("Create an existing collection=%s", schema.GetName())
// From here on the local variable newCollection shadows the function of the same name.
newCollection, err := newCollection(collectionID, schema)
if err != nil {
return err
replica.collections[collectionID] = newCollection
log.Debug("Create collection", zap.String("collection name", newCollection.GetName()))
// removeCollection deletes collectionID from replica.collections while holding
// the write lock. It always returns nil: deleting a key that is not in the map
// is a no-op, so an unknown ID is not reported as an error.
func (replica *CollectionSegmentReplica) removeCollection(collectionID UniqueID) error {
replica.mu.Lock()
defer replica.mu.Unlock()
delete(replica.collections, collectionID)
return nil
// getCollectionByID looks up collectionID in replica.collections while holding
// the read lock. When no entry exists it returns a nil collection and an error
// that names the ID.
//
// NOTE(review): the success-path return is not visible in this view —
// presumably it returns the looked-up collection with a nil error; confirm.
func (replica *CollectionSegmentReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
coll, ok := replica.collections[collectionID]
if !ok {
return nil, fmt.Errorf("Cannot get collection %d by ID: not exist", collectionID)
func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bool {
replica.mu.RLock()
defer replica.mu.RUnlock()
_, ok := replica.collections[collectionID]
return ok