diff --git a/internal/proxynode/channels_time_ticker.go b/internal/proxynode/channels_time_ticker.go index 1c821be84137f62eda47ffd3bf366025757ae827..aa7877d9e1c8e198eb5bc05d477d160e547d5561 100644 --- a/internal/proxynode/channels_time_ticker.go +++ b/internal/proxynode/channels_time_ticker.go @@ -20,7 +20,7 @@ type pChanStatistics struct { type channelsTimeTickerCheckFunc func(string, Timestamp) bool // ticker can update ts only when the minTs greater than the ts of ticker, we can use maxTs to update current later -type getPChanStatisticsFunc func(pChan) (pChanStatistics, error) +type getPChanStatisticsFuncType func(pChan) (pChanStatistics, error) // use interface tsoAllocator to keep channelsTimeTickerImpl testable type tsoAllocator interface { @@ -41,7 +41,7 @@ type channelsTimeTickerImpl struct { interval time.Duration // interval to synchronize minTsStatistics map[pChan]Timestamp // pchan -> min Timestamp statisticsMtx sync.RWMutex - getStatistics getPChanStatisticsFunc + getStatistics getPChanStatisticsFuncType tso tsoAllocator currents map[pChan]Timestamp currentsMtx sync.RWMutex @@ -96,6 +96,9 @@ func (ticker *channelsTimeTickerImpl) tick() error { ticker.currents[pchan] = getTs(current+Timestamp(ticker.interval), stats.maxTs, func(ts1, ts2 Timestamp) bool { return ts1 > ts2 }) + //} else if stats.invalid { + // ticker.minTsStatistics[pchan] = current + // ticker.currents[pchan] = current + Timestamp(ticker.interval) } } @@ -171,7 +174,7 @@ func newChannelsTimeTicker( ctx context.Context, interval time.Duration, pchans []pChan, - getStatistics getPChanStatisticsFunc, + getStatistics getPChanStatisticsFuncType, tso tsoAllocator, ) *channelsTimeTickerImpl { diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index b8342e767cfe7736c52da19869ffb6213a94b995..69562cece41350290681ffaa4bd2ca00ebd39b78 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -95,6 +95,15 @@ type task interface { Notify(err error) } +type ddlTask interface { + task +} + +type dmlTask interface { + task + getStatistics(pchan pChan) (pChanStatistics, error) +} + type BaseInsertTask = msgstream.InsertMsg type InsertTask struct { @@ -142,6 +151,37 @@ func (it *InsertTask) EndTs() Timestamp { return it.EndTimestamp } +func (it *InsertTask) getStatistics(pchan pChan) (pChanStatistics, error) { + collID, err := globalMetaCache.GetCollectionID(it.ctx, it.CollectionName) + if err != nil { + return pChanStatistics{invalid: true}, err + } + + _, err = it.chMgr.getChannels(collID) + if err != nil { + err := it.chMgr.createDMLMsgStream(collID) + if err != nil { + return pChanStatistics{invalid: true}, err + } + } + pchans, err := it.chMgr.getChannels(collID) + if err != nil { + return pChanStatistics{invalid: true}, err + } + + for _, ch := range pchans { + if pchan == ch { + return pChanStatistics{ + minTs: it.BeginTimestamp, + maxTs: it.EndTimestamp, + invalid: false, + }, nil + } + } + + return pChanStatistics{invalid: true}, nil +} + func (it *InsertTask) OnEnqueue() error { it.BaseInsertTask.InsertRequest.Base = &commonpb.MsgBase{} return nil diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go index 0b865105a0f643978bebc753d79b5e91202c82e1..187b55251426104873cdd8db6b066f89af9cc2d5 100644 --- a/internal/proxynode/task_scheduler.go +++ b/internal/proxynode/task_scheduler.go @@ -46,8 +46,8 @@ type TaskQueue interface { type BaseTaskQueue struct { unissuedTasks *list.List activeTasks map[Timestamp]task - utLock sync.Mutex - atLock sync.Mutex + utLock sync.RWMutex + atLock sync.RWMutex // maxTaskNum should keep still maxTaskNum int64 @@ -62,8 +62,8 @@ func (queue *BaseTaskQueue) utChan() <-chan int { } func (queue *BaseTaskQueue) utEmpty() bool { - queue.utLock.Lock() - defer queue.utLock.Unlock() + queue.utLock.RLock() + defer queue.utLock.RUnlock() return queue.unissuedTasks.Len() == 0 } @@ -84,8 +84,8 @@ func (queue *BaseTaskQueue) addUnissuedTask(t task) error { } func (queue *BaseTaskQueue) FrontUnissuedTask() task { - queue.utLock.Lock() - defer queue.utLock.Unlock() + queue.utLock.RLock() + defer queue.utLock.RUnlock() if queue.unissuedTasks.Len() <= 0 { log.Warn("sorry, but the unissued task list is empty!") @@ -138,16 +138,16 @@ func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task { } func (queue *BaseTaskQueue) getTaskByReqID(reqID UniqueID) task { - queue.utLock.Lock() - defer queue.utLock.Unlock() + queue.utLock.RLock() + defer queue.utLock.RUnlock() for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() { if e.Value.(task).ID() == reqID { return e.Value.(task) } } - queue.atLock.Lock() - defer queue.atLock.Unlock() + queue.atLock.RLock() + defer queue.atLock.RUnlock() for ats := range queue.activeTasks { if queue.activeTasks[ats].ID() == reqID { return queue.activeTasks[ats] @@ -158,16 +158,16 @@ func (queue *BaseTaskQueue) getTaskByReqID(reqID UniqueID) task { } func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool { - queue.utLock.Lock() - defer queue.utLock.Unlock() + queue.utLock.RLock() + defer queue.utLock.RUnlock() for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() { if e.Value.(task).EndTs() < ts { return false } } - queue.atLock.Lock() - defer queue.atLock.Unlock() + queue.atLock.RLock() + defer queue.atLock.RUnlock() for ats := range queue.activeTasks { if ats < ts { return false @@ -207,6 +207,34 @@ type DmTaskQueue struct { BaseTaskQueue } +func (queue *DmTaskQueue) getPChanStatistics(pchan pChan) (pChanStatistics, error) { + queue.atLock.RLock() + defer queue.atLock.RUnlock() + + stats := pChanStatistics{ + minTs: 0, + maxTs: ^uint64(0), + invalid: true, + } + + for _, t := range queue.activeTasks { + dmlT, _ := t.(dmlTask) + stat, err := dmlT.getStatistics(pchan) + if err != nil { + return pChanStatistics{invalid: true}, nil + } + if stat.minTs < stats.minTs { + stats.minTs = stat.minTs + } + if stat.maxTs > stats.maxTs { + stats.maxTs = stat.maxTs + } + stats.invalid = false + } + + return stats, nil +} + type DqTaskQueue struct { BaseTaskQueue } @@ -255,7 +283,7 @@ func NewDqTaskQueue(sched *TaskScheduler) *DqTaskQueue { type TaskScheduler struct { DdQueue TaskQueue - DmQueue TaskQueue + DmQueue *DmTaskQueue DqQueue TaskQueue idAllocator *allocator.IDAllocator @@ -528,3 +556,7 @@ func (sched *TaskScheduler) TaskDoneTest(ts Timestamp) bool { //dqTaskDone := sched.DqQueue.TaskDoneTest(ts) return ddTaskDone && dmTaskDone && true } + +func (sched *TaskScheduler) getPChanStatistics(pchan pChan) (pChanStatistics, error) { + return sched.DmQueue.getPChanStatistics(pchan) +}