diff --git a/internal/proxynode/channels_mgr.go b/internal/proxynode/channels_mgr.go index 2e43557cc9c36b5c704da6814542c972e9d2c763..7429413888228a3f0df8ed62064daf974c567a36 100644 --- a/internal/proxynode/channels_mgr.go +++ b/internal/proxynode/channels_mgr.go @@ -64,16 +64,16 @@ func getUniqueIntGeneratorIns() uniqueIntGenerator { type getChannelsFuncType = func(collectionID UniqueID) (map[vChan]pChan, error) -type masterService interface { +type getChannelsService interface { GetChannels(collectionID UniqueID) (map[vChan]pChan, error) } -type mockMaster struct { +type mockGetChannelsService struct { collectionID2Channels map[UniqueID]map[vChan]pChan } -func newMockMaster() *mockMaster { - return &mockMaster{ +func newMockGetChannelsService() *mockGetChannelsService { + return &mockGetChannelsService{ collectionID2Channels: make(map[UniqueID]map[vChan]pChan), } } @@ -87,37 +87,7 @@ func genUniqueStr() string { return fmt.Sprintf("%X", b) } -func (m *mockMaster) GetChannels(collectionID UniqueID) (map[vChan]pChan, error) { - channels, ok := m.collectionID2Channels[collectionID] - if ok { - return channels, nil - } - - channels = make(map[vChan]pChan) - l := rand.Uint64()%10 + 1 - for i := 0; uint64(i) < l; i++ { - channels[genUniqueStr()] = genUniqueStr() - } - - m.collectionID2Channels[collectionID] = channels - return channels, nil -} - -type queryService interface { - GetChannels(collectionID UniqueID) (map[vChan]pChan, error) -} - -type mockQueryService struct { - collectionID2Channels map[UniqueID]map[vChan]pChan -} - -func newMockQueryService() *mockQueryService { - return &mockQueryService{ - collectionID2Channels: make(map[UniqueID]map[vChan]pChan), - } -} - -func (m *mockQueryService) GetChannels(collectionID UniqueID) (map[vChan]pChan, error) { +func (m *mockGetChannelsService) GetChannels(collectionID UniqueID) (map[vChan]pChan, error) { channels, ok := m.collectionID2Channels[collectionID] if ok { return channels, nil @@ -475,9 +445,9 @@ func (mgr *channelsMgrImpl) removeAllDMLStream() error { return mgr.dmlChannelsMgr.removeAllStream() } -func newChannelsMgr(master masterService, query queryService, msgStreamFactory msgstream.Factory) channelsMgr { +func newChannelsMgr(getDmlChannelsFunc getChannelsFuncType, getDqlChannelsFunc getChannelsFuncType, msgStreamFactory msgstream.Factory) channelsMgr { return &channelsMgrImpl{ - dmlChannelsMgr: newSingleTypeChannelsMgr(master.GetChannels, msgStreamFactory), - dqlChannelsMgr: newSingleTypeChannelsMgr(query.GetChannels, msgStreamFactory), + dmlChannelsMgr: newSingleTypeChannelsMgr(getDmlChannelsFunc, msgStreamFactory), + dqlChannelsMgr: newSingleTypeChannelsMgr(getDqlChannelsFunc, msgStreamFactory), } } diff --git a/internal/proxynode/channels_mgr_test.go b/internal/proxynode/channels_mgr_test.go index bd8574c9fd894fc82418ec714d3ee1dc7f5ce751..05c5e261c0a18ba3da00dbd2e04a8c2e891af618 100644 --- a/internal/proxynode/channels_mgr_test.go +++ b/internal/proxynode/channels_mgr_test.go @@ -23,10 +23,10 @@ func TestNaiveUniqueIntGenerator_get(t *testing.T) { } func TestChannelsMgrImpl_getChannels(t *testing.T) { - master := newMockMaster() - query := newMockQueryService() + master := newMockGetChannelsService() + query := newMockGetChannelsService() factory := msgstream.NewSimpleMsgStreamFactory() - mgr := newChannelsMgr(master, query, factory) + mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory) defer mgr.removeAllDMLStream() collID := UniqueID(getUniqueIntGeneratorIns().get()) @@ -41,10 +41,10 @@ func TestChannelsMgrImpl_getChannels(t *testing.T) { } func TestChannelsMgrImpl_getVChannels(t *testing.T) { - master := newMockMaster() - query := newMockQueryService() + master := newMockGetChannelsService() + query := newMockGetChannelsService() factory := msgstream.NewSimpleMsgStreamFactory() - mgr := newChannelsMgr(master, query, factory) + mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory) defer mgr.removeAllDMLStream() collID := UniqueID(getUniqueIntGeneratorIns().get()) @@ -59,10 +59,10 @@ func TestChannelsMgrImpl_getVChannels(t *testing.T) { } func TestChannelsMgrImpl_createDMLMsgStream(t *testing.T) { - master := newMockMaster() - query := newMockQueryService() + master := newMockGetChannelsService() + query := newMockGetChannelsService() factory := msgstream.NewSimpleMsgStreamFactory() - mgr := newChannelsMgr(master, query, factory) + mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory) defer mgr.removeAllDMLStream() collID := UniqueID(getUniqueIntGeneratorIns().get()) @@ -81,10 +81,10 @@ func TestChannelsMgrImpl_createDMLMsgStream(t *testing.T) { } func TestChannelsMgrImpl_getDMLMsgStream(t *testing.T) { - master := newMockMaster() - query := newMockQueryService() + master := newMockGetChannelsService() + query := newMockGetChannelsService() factory := msgstream.NewSimpleMsgStreamFactory() - mgr := newChannelsMgr(master, query, factory) + mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory) defer mgr.removeAllDMLStream() collID := UniqueID(getUniqueIntGeneratorIns().get()) @@ -99,10 +99,10 @@ func TestChannelsMgrImpl_getDMLMsgStream(t *testing.T) { } func TestChannelsMgrImpl_removeDMLMsgStream(t *testing.T) { - master := newMockMaster() - query := newMockQueryService() + master := newMockGetChannelsService() + query := newMockGetChannelsService() factory := msgstream.NewSimpleMsgStreamFactory() - mgr := newChannelsMgr(master, query, factory) + mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory) defer mgr.removeAllDMLStream() collID := UniqueID(getUniqueIntGeneratorIns().get()) @@ -126,10 +126,10 @@ func TestChannelsMgrImpl_removeDMLMsgStream(t *testing.T) { } func TestChannelsMgrImpl_removeAllDMLMsgStream(t *testing.T) { - master := newMockMaster() - query := newMockQueryService() + master := newMockGetChannelsService() + query := newMockGetChannelsService() factory := msgstream.NewSimpleMsgStreamFactory() - mgr := newChannelsMgr(master, query, factory) + mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory) defer mgr.removeAllDMLStream() num := 10 @@ -141,11 +141,11 @@ func TestChannelsMgrImpl_removeAllDMLMsgStream(t *testing.T) { } func TestChannelsMgrImpl_createDQLMsgStream(t *testing.T) { - master := newMockMaster() - query := newMockQueryService() + master := newMockGetChannelsService() + query := newMockGetChannelsService() factory := msgstream.NewSimpleMsgStreamFactory() - mgr := newChannelsMgr(master, query, factory) - defer mgr.removeAllDQLStream() + mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory) + defer mgr.removeAllDMLStream() collID := UniqueID(getUniqueIntGeneratorIns().get()) _, err := mgr.getChannels(collID) @@ -163,11 +163,11 @@ func TestChannelsMgrImpl_createDQLMsgStream(t *testing.T) { } func TestChannelsMgrImpl_getDQLMsgStream(t *testing.T) { - master := newMockMaster() - query := newMockQueryService() + master := newMockGetChannelsService() + query := newMockGetChannelsService() factory := msgstream.NewSimpleMsgStreamFactory() - mgr := newChannelsMgr(master, query, factory) - defer mgr.removeAllDQLStream() + mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory) + defer mgr.removeAllDMLStream() collID := UniqueID(getUniqueIntGeneratorIns().get()) _, err := mgr.getDQLStream(collID) @@ -181,11 +181,11 @@ func TestChannelsMgrImpl_getDQLMsgStream(t *testing.T) { } func TestChannelsMgrImpl_removeDQLMsgStream(t *testing.T) { - master := newMockMaster() - query := newMockQueryService() + master := newMockGetChannelsService() + query := newMockGetChannelsService() factory := msgstream.NewSimpleMsgStreamFactory() - mgr := newChannelsMgr(master, query, factory) - defer mgr.removeAllDQLStream() + mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory) + defer mgr.removeAllDMLStream() collID := UniqueID(getUniqueIntGeneratorIns().get()) _, err := mgr.getDQLStream(collID) @@ -208,11 +208,11 @@ func TestChannelsMgrImpl_removeDQLMsgStream(t *testing.T) { } func TestChannelsMgrImpl_removeAllDQLMsgStream(t *testing.T) { - master := newMockMaster() - query := newMockQueryService() + master := newMockGetChannelsService() + query := newMockGetChannelsService() factory := msgstream.NewSimpleMsgStreamFactory() - mgr := newChannelsMgr(master, query, factory) - defer mgr.removeAllDQLStream() + mgr := newChannelsMgr(master.GetChannels, query.GetChannels, factory) + defer mgr.removeAllDMLStream() num := 10 for i := 0; i < num; i++ { diff --git a/internal/proxynode/channels_time_ticker.go b/internal/proxynode/channels_time_ticker.go index aa7877d9e1c8e198eb5bc05d477d160e547d5561..19a890c412fb7fa2d1f416ea1e4ae58c0ecdaaff 100644 --- a/internal/proxynode/channels_time_ticker.go +++ b/internal/proxynode/channels_time_ticker.go @@ -35,19 +35,24 @@ type channelsTimeTicker interface { close() error addPChan(pchan pChan) error getLastTick(pchan pChan) (Timestamp, error) + getMinTsStatistics() (map[pChan]Timestamp, error) } type channelsTimeTickerImpl struct { - interval time.Duration // interval to synchronize - minTsStatistics map[pChan]Timestamp // pchan -> min Timestamp - statisticsMtx sync.RWMutex - getStatistics getPChanStatisticsFuncType - tso tsoAllocator - currents map[pChan]Timestamp - currentsMtx sync.RWMutex - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc + interval time.Duration // interval to synchronize + minTsStatistics map[pChan]Timestamp // pchan -> min Timestamp + statisticsMtx sync.RWMutex + getStatisticsFunc getPChanStatisticsFuncType + tso tsoAllocator + currents map[pChan]Timestamp + currentsMtx sync.RWMutex + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc +} + +func (ticker *channelsTimeTickerImpl) getMinTsStatistics() (map[pChan]Timestamp, error) { + panic("implement me") } func (ticker *channelsTimeTickerImpl) initStatistics() { @@ -86,7 +91,7 @@ func (ticker *channelsTimeTickerImpl) tick() error { for pchan := range ticker.currents { current := ticker.currents[pchan] - stats, err := ticker.getStatistics(pchan) + stats, err := ticker.getStatisticsFunc(pchan) if err != nil { continue } @@ -174,20 +179,20 @@ func newChannelsTimeTicker( ctx context.Context, interval time.Duration, pchans []pChan, - getStatistics getPChanStatisticsFuncType, + getStatisticsFunc getPChanStatisticsFuncType, tso tsoAllocator, ) *channelsTimeTickerImpl { ctx1, cancel := context.WithCancel(ctx) ticker := &channelsTimeTickerImpl{ - interval: interval, - minTsStatistics: make(map[pChan]Timestamp), - getStatistics: getStatistics, - tso: tso, - currents: make(map[pChan]Timestamp), - ctx: ctx1, - cancel: cancel, + interval: interval, + minTsStatistics: make(map[pChan]Timestamp), + getStatisticsFunc: getStatisticsFunc, + tso: tso, + currents: make(map[pChan]Timestamp), + ctx: ctx1, + cancel: cancel, } for _, pchan := range pchans { diff --git a/internal/proxynode/channels_time_ticker_test.go b/internal/proxynode/channels_time_ticker_test.go index 973ff7d2f9f2fc0b937f692c308e9f9eaedc3d80..3ade827069a61cf2aec4ff4089a121af662d3764 100644 --- a/internal/proxynode/channels_time_ticker_test.go +++ b/internal/proxynode/channels_time_ticker_test.go @@ -152,3 +152,50 @@ func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) { time.Sleep(time.Second) } + +func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) { + interval := time.Millisecond * 10 + pchanNum := rand.Uint64()%10 + 1 + pchans := make([]pChan, 0, pchanNum) + for i := 0; uint64(i) < pchanNum; i++ { + pchans = append(pchans, genUniqueStr()) + } + tso := newMockTsoAllocator() + ctx := context.Background() + + ticker := newChannelsTimeTicker(ctx, interval, pchans, getStatistics, tso) + err := ticker.start() + assert.Equal(t, nil, err) + + var wg sync.WaitGroup + wg.Add(1) + b := make(chan struct{}, 1) + go func() { + defer wg.Done() + timer := time.NewTicker(interval * 40) + for { + select { + case <-b: + return + case <-timer.C: + stats, err := ticker.getMinTsStatistics() + assert.Equal(t, nil, err) + for pchan, ts := range stats { + log.Debug("TestChannelsTimeTickerImpl_getLastTick", + zap.Any("pchan", pchan), + zap.Any("minTs", ts)) + } + } + } + }() + time.Sleep(time.Second) + b <- struct{}{} + wg.Wait() + + defer func() { + err := ticker.close() + assert.Equal(t, nil, err) + }() + + time.Sleep(time.Second) +} diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index bf8a8262a4ecd7166f2a5b2b5f8c87997983436f..110fc4ecef62132e10d88320ab4aa506dd57969f 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -1064,6 +1064,7 @@ func (node *ProxyNode) Insert(ctx context.Context, request *milvuspb.InsertReque rowIDAllocator: node.idAllocator, segIDAssigner: node.segAssigner, chMgr: node.chMgr, + chTicker: node.chTicker, } if len(it.PartitionName) <= 0 { it.PartitionName = Params.DefaultPartitionName diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 09e694d542121cf2fcfb56fe62060dd35a1c891a..f173e204bf4554a63d61b7a9cabde0a0712445bb 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -14,11 +14,14 @@ package proxynode import ( "context" "errors" + "fmt" "math/rand" "sync" "sync/atomic" "time" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/allocator" @@ -228,10 +231,55 @@ func (node *ProxyNode) Init() error { node.segAssigner = segAssigner node.segAssigner.PeerID = Params.ProxyID - // TODO(dragondriver): use real master service & query service instance - mockMasterIns := newMockMaster() - mockQueryIns := newMockQueryService() - chMgr := newChannelsMgr(mockMasterIns, mockQueryIns, node.msFactory) + getDmlChannelsFunc := func(collectionID UniqueID) (map[vChan]pChan, error) { + req := &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeCollection, + MsgID: 0, // todo + Timestamp: 0, // todo + SourceID: 0, // todo + }, + DbName: "", // todo + CollectionName: "", // todo + CollectionID: collectionID, + TimeStamp: 0, // todo + } + resp, err := node.masterService.DescribeCollection(node.ctx, req) + if err != nil { + log.Warn("DescribeCollection", zap.Error(err)) + return nil, err + } + if resp.Status.ErrorCode != 0 { + log.Warn("DescribeCollection", + zap.Any("ErrorCode", resp.Status.ErrorCode), + zap.Any("Reason", resp.Status.Reason)) + return nil, err + } + if len(resp.VirtualChannelNames) != len(resp.PhysicalChannelNames) { + err := fmt.Errorf( + "len(VirtualChannelNames): %v, len(PhysicalChannelNames): %v", + len(resp.VirtualChannelNames), + len(resp.PhysicalChannelNames)) + log.Warn("GetDmlChannels", zap.Error(err)) + return nil, err + } + + ret := make(map[vChan]pChan) + for idx, name := range resp.VirtualChannelNames { + if _, ok := ret[name]; ok { + err := fmt.Errorf( + "duplicated virtual channel found, vchan: %v, pchan: %v", + name, + resp.PhysicalChannelNames[idx]) + return nil, err + } + ret[name] = resp.PhysicalChannelNames[idx] + } + + return ret, nil + } + mockQueryService := newMockGetChannelsService() + chMgr := newChannelsMgr(getDmlChannelsFunc, mockQueryService.GetChannels, node.msFactory) node.chMgr = chMgr node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory) @@ -243,15 +291,61 @@ func (node *ProxyNode) Init() error { // TODO(dragondriver): read this from config interval := time.Millisecond * 200 - // TODO(dragondriver): use scheduler's method - getStats := func(ch pChan) (pChanStatistics, error) { - return pChanStatistics{}, nil - } - node.chTicker = newChannelsTimeTicker(node.ctx, interval, []string{}, getStats, tsoAllocator) + node.chTicker = newChannelsTimeTicker(node.ctx, interval, []string{}, node.sched.getPChanStatistics, tsoAllocator) return nil } +func (node *ProxyNode) sendChannelsTimeTickLoop() { + node.wg.Add(1) + go func() { + defer node.wg.Done() + + // TODO(dragondriver): read this from config + interval := time.Millisecond * 200 + timer := time.NewTicker(interval) + + for { + select { + case <-node.ctx.Done(): + return + case <-timer.C: + stats, err := node.chTicker.getMinTsStatistics() + if err != nil { + log.Warn("sendChannelsTimeTickLoop.getMinTsStatistics", zap.Error(err)) + continue + } + + channels := make([]pChan, len(stats)) + tss := make([]Timestamp, len(stats)) + + req := &internalpb.ChannelTimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Undefined, // todo + MsgID: 0, // todo + Timestamp: 0, // todo + SourceID: node.session.ServerID, + }, + ChannelNames: channels, + Timestamps: tss, + } + + status, err := node.masterService.UpdateChannelTimeTick(node.ctx, req) + if err != nil { + log.Warn("sendChannelsTimeTickLoop.UpdateChannelTimeTick", zap.Error(err)) + continue + } + if status.ErrorCode != 0 { + log.Warn("sendChannelsTimeTickLoop.UpdateChannelTimeTick", + zap.Any("ErrorCode", status.ErrorCode), + zap.Any("Reason", status.Reason)) + continue + } + } + } + }() +} + func (node *ProxyNode) Start() error { err := InitMetaCache(node.masterService) if err != nil { @@ -283,6 +377,8 @@ func (node *ProxyNode) Start() error { } log.Debug("start channelsTimeTicker") + node.sendChannelsTimeTickLoop() + // Start callbacks for _, cb := range node.startCallbacks { cb() diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index 69562cece41350290681ffaa4bd2ca00ebd39b78..b788cd626f125d9b364005f9e78135a84b568c87 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -116,6 +116,7 @@ type InsertTask struct { rowIDAllocator *allocator.IDAllocator segIDAssigner *SegIDAssigner chMgr channelsMgr + chTicker channelsTimeTicker } func (it *InsertTask) TraceCtx() context.Context { @@ -739,6 +740,14 @@ func (it *InsertTask) Execute(ctx context.Context) error { return err } + pchans, err := it.chMgr.getChannels(collID) + if err != nil { + return err + } + for _, pchan := range pchans { + _ = it.chTicker.addPChan(pchan) + } + // Assign SegmentID var pack *msgstream.MsgPack pack, err = it._assignSegmentID(stream, &msgPack)