Skip to content
Snippets Groups Projects
Commit 02df310f authored by zhenshan.cao's avatar zhenshan.cao Committed by yefu.chen
Browse files

Fix data race


Signed-off-by: default avatarzhenshan.cao <zhenshan.cao@zilliz.com>
parent f4566731
No related branches found
No related tags found
No related merge requests found
package master
import (
"sync"
"time"
buildindexclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client"
......@@ -20,9 +21,12 @@ type MockWriteNodeClient struct {
partitionTag string
timestamp Timestamp
collectionID UniqueID
lock sync.RWMutex
}
func (m *MockWriteNodeClient) FlushSegment(segmentID UniqueID, collectionID UniqueID, partitionTag string, timestamp Timestamp) error {
m.lock.Lock()
defer m.lock.Unlock()
m.flushTime = time.Now()
m.segmentID = segmentID
m.collectionID = collectionID
......@@ -33,6 +37,8 @@ func (m *MockWriteNodeClient) FlushSegment(segmentID UniqueID, collectionID Uniq
func (m *MockWriteNodeClient) DescribeSegment(segmentID UniqueID) (*writerclient.SegmentDescription, error) {
now := time.Now()
m.lock.RLock()
defer m.lock.RUnlock()
if now.Sub(m.flushTime).Seconds() > 2 {
return &writerclient.SegmentDescription{
SegmentID: segmentID,
......
......@@ -272,6 +272,8 @@ func TestSegmentManager_SycnWritenode(t *testing.T) {
syncWriteChan <- tsMsg
time.Sleep(300 * time.Millisecond)
segManager.mu.RLock()
defer segManager.mu.RUnlock()
status := segManager.collStatus[collID]
assert.Empty(t, status.segments)
}
......@@ -81,7 +81,7 @@ func (ttBarrier *softTimeTickBarrier) Start() error {
// get a legal Timestamp
ts := ttBarrier.minTimestamp()
lastTt := atomic.LoadInt64(&(ttBarrier.lastTt))
if ttBarrier.lastTt != 0 && ttBarrier.minTtInterval > ts-Timestamp(lastTt) {
if lastTt != 0 && ttBarrier.minTtInterval > ts-Timestamp(lastTt) {
continue
}
ttBarrier.outTt <- ts
......
......@@ -526,8 +526,6 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
log.Fatalf("broadcast error = %v", err)
}
receiveMsg(outputStream, len(msgPack1.Msgs))
outputTtStream := (*outputStream).(*PulsarTtMsgStream)
fmt.Printf("timestamp = %v", outputTtStream.lastTimeStamp)
(*inputStream).Close()
(*outputStream).Close()
}
......@@ -210,6 +210,7 @@ func TestProxy_CreateCollection(t *testing.T) {
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
println("collectionName:", collectionName)
createCollection(t, collectionName)
dropCollection(t, collectionName)
}(&wg)
......
......@@ -14,7 +14,7 @@ import (
type TaskQueue interface {
utChan() <-chan int
utEmpty() bool
UTEmpty() bool
utFull() bool
addUnissuedTask(t task) error
FrontUnissuedTask() task
......@@ -44,7 +44,9 @@ func (queue *BaseTaskQueue) utChan() <-chan int {
return queue.utBufChan
}
func (queue *BaseTaskQueue) utEmpty() bool {
func (queue *BaseTaskQueue) UTEmpty() bool {
queue.utLock.Lock()
defer queue.utLock.Unlock()
return queue.unissuedTasks.Len() == 0
}
......@@ -316,7 +318,7 @@ func (sched *TaskScheduler) definitionLoop() {
case <-sched.ctx.Done():
return
case <-sched.DdQueue.utChan():
if !sched.DdQueue.utEmpty() {
if !sched.DdQueue.UTEmpty() {
t := sched.scheduleDdTask()
sched.processTask(t, sched.DdQueue)
}
......@@ -331,7 +333,7 @@ func (sched *TaskScheduler) manipulationLoop() {
case <-sched.ctx.Done():
return
case <-sched.DmQueue.utChan():
if !sched.DmQueue.utEmpty() {
if !sched.DmQueue.UTEmpty() {
t := sched.scheduleDmTask()
go sched.processTask(t, sched.DmQueue)
}
......@@ -348,7 +350,7 @@ func (sched *TaskScheduler) queryLoop() {
return
case <-sched.DqQueue.utChan():
log.Print("scheduler receive query request ...")
if !sched.DqQueue.utEmpty() {
if !sched.DqQueue.UTEmpty() {
t := sched.scheduleDqTask()
go sched.processTask(t, sched.DqQueue)
} else {
......
......@@ -24,12 +24,12 @@ type timeTick struct {
tsoAllocator *allocator.TimestampAllocator
tickMsgStream *msgstream.PulsarMsgStream
peerID UniqueID
wg sync.WaitGroup
ctx context.Context
cancel func()
timer *time.Ticker
peerID UniqueID
wg sync.WaitGroup
ctx context.Context
cancel func()
timer *time.Ticker
tickLock sync.RWMutex
checkFunc tickCheckFunc
}
......@@ -85,6 +85,8 @@ func (tt *timeTick) tick() error {
} else {
//log.Printf("proxy send time tick message")
}
tt.tickLock.Lock()
defer tt.tickLock.Unlock()
tt.lastTick = tt.currentTick
return nil
}
......@@ -105,6 +107,8 @@ func (tt *timeTick) tickLoop() {
}
func (tt *timeTick) LastTick() Timestamp {
tt.tickLock.RLock()
defer tt.tickLock.RUnlock()
return tt.lastTick
}
......
......@@ -332,11 +332,11 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb.Se
SegmentID: segmentID,
MemorySize: currentMemSize,
NumRows: segmentNumOfRows,
RecentlyModified: segment.recentlyModified,
RecentlyModified: segment.GetRecentlyModified(),
}
statisticData = append(statisticData, &stat)
segment.recentlyModified = false
segment.SetRecentlyModified(false)
}
return statisticData
......
......@@ -13,6 +13,7 @@ package querynode
import "C"
import (
"strconv"
"sync"
"unsafe"
"github.com/stretchr/testify/assert"
......@@ -28,6 +29,7 @@ type Segment struct {
collectionID UniqueID
lastMemSize int64
lastRowCount int64
mu sync.Mutex
recentlyModified bool
}
......@@ -35,6 +37,18 @@ func (s *Segment) ID() UniqueID {
return s.segmentID
}
func (s *Segment) SetRecentlyModified(modify bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.recentlyModified = modify
}
func (s *Segment) GetRecentlyModified() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.recentlyModified
}
//-------------------------------------------------------------------------------------- constructor and destructor
func newSegment(collection *Collection, segmentID int64, partitionTag string, collectionID UniqueID) *Segment {
/*
......@@ -161,7 +175,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
return errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
}
s.recentlyModified = true
s.SetRecentlyModified(true)
return nil
}
......
......@@ -44,6 +44,8 @@ func newTSafe() tSafe {
}
func (ts *tSafeImpl) registerTSafeWatcher(t *tSafeWatcher) {
ts.tSafeMu.Lock()
defer ts.tSafeMu.Unlock()
ts.watcherList = append(ts.watcherList, t)
}
......@@ -55,8 +57,9 @@ func (ts *tSafeImpl) get() Timestamp {
func (ts *tSafeImpl) set(t Timestamp) {
ts.tSafeMu.Lock()
defer ts.tSafeMu.Unlock()
ts.tSafe = t
ts.tSafeMu.Unlock()
for _, watcher := range ts.watcherList {
watcher.notify()
}
......
......@@ -171,6 +171,8 @@ func (mt *metaTable) addSegmentFlush(segmentID UniqueID, timestamp Timestamp) er
}
func (mt *metaTable) getFlushCloseTime(segmentID UniqueID) (Timestamp, error) {
mt.lock.RLock()
defer mt.lock.RUnlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
......@@ -179,6 +181,8 @@ func (mt *metaTable) getFlushCloseTime(segmentID UniqueID) (Timestamp, error) {
}
func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) {
mt.lock.RLock()
defer mt.lock.RUnlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
......@@ -187,6 +191,8 @@ func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) {
}
func (mt *metaTable) checkFlushComplete(segmentID UniqueID) (bool, error) {
mt.lock.RLock()
defer mt.lock.RUnlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return false, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
......@@ -195,7 +201,8 @@ func (mt *metaTable) checkFlushComplete(segmentID UniqueID) (bool, error) {
}
func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string, error) {
mt.lock.RLock()
defer mt.lock.RUnlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return nil, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
......@@ -208,7 +215,8 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string,
}
func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, error) {
mt.lock.RLock()
defer mt.lock.RUnlock()
meta, ok := mt.collID2DdlMeta[collID]
if !ok {
return nil, errors.Errorf("collection not exists with ID = " + strconv.FormatInt(collID, 10))
......
......@@ -16,5 +16,6 @@ echo $MILVUS_DIR
go test -cover "${MILVUS_DIR}/kv/..." -failfast
go test -cover "${MILVUS_DIR}/proxy/..." -failfast
go test -cover "${MILVUS_DIR}/writenode/..." -failfast
go test -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast
go test -cover "${MILVUS_DIR}/master/..." -failfast
go test -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/util/..." -failfast
#go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment