From 59a3d83f9772a29fd39ebde5fe127f9c087cc943 Mon Sep 17 00:00:00 2001
From: dragondriver <jiquan.long@zilliz.com>
Date: Thu, 27 May 2021 17:09:50 +0800
Subject: [PATCH] Replace insertChannelsMap with channelsMgr (#5453)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
---
 internal/proxynode/channels_mgr.go      | 36 ++++++++++++++++
 internal/proxynode/channels_mgr_test.go | 37 ----------------
 internal/proxynode/impl.go              |  2 +
 internal/proxynode/proxy_node.go        |  7 +++
 internal/proxynode/task.go              | 57 +++----------------------
 5 files changed, 52 insertions(+), 87 deletions(-)

diff --git a/internal/proxynode/channels_mgr.go b/internal/proxynode/channels_mgr.go
index c8ed8973d..3df91ed47 100644
--- a/internal/proxynode/channels_mgr.go
+++ b/internal/proxynode/channels_mgr.go
@@ -3,6 +3,7 @@ package proxynode
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"runtime"
 	"sort"
 	"sync"
@@ -64,6 +65,41 @@ type masterService interface {
 	GetChannels(collectionID UniqueID) (map[vChan]pChan, error)
 }
 
+type mockMaster struct {
+	collectionID2Channels map[UniqueID]map[vChan]pChan
+}
+
+func newMockMaster() *mockMaster {
+	return &mockMaster{
+		collectionID2Channels: make(map[UniqueID]map[vChan]pChan),
+	}
+}
+
+func genUniqueStr() string {
+	l := rand.Uint64()%100 + 1
+	b := make([]byte, l)
+	if _, err := rand.Read(b); err != nil {
+		return ""
+	}
+	return fmt.Sprintf("%X", b)
+}
+
+func (m *mockMaster) GetChannels(collectionID UniqueID) (map[vChan]pChan, error) {
+	channels, ok := m.collectionID2Channels[collectionID]
+	if ok {
+		return channels, nil
+	}
+
+	channels = make(map[vChan]pChan)
+	l := rand.Uint64()%10 + 1
+	for i := 0; uint64(i) < l; i++ {
+		channels[genUniqueStr()] = genUniqueStr()
+	}
+
+	m.collectionID2Channels[collectionID] = channels
+	return channels, nil
+}
+
 type channelsMgrImpl struct {
 	collectionID2VIDs map[UniqueID][]int // id are sorted
 	collMtx           sync.RWMutex
diff --git a/internal/proxynode/channels_mgr_test.go b/internal/proxynode/channels_mgr_test.go
index 4426a7259..3064f8929 100644
--- a/internal/proxynode/channels_mgr_test.go
+++ b/internal/proxynode/channels_mgr_test.go
@@ -1,8 +1,6 @@
 package proxynode
 
 import (
-	"fmt"
-	"math/rand"
 	"testing"
 
 	"github.com/milvus-io/milvus/internal/msgstream"
@@ -24,41 +22,6 @@ func TestNaiveUniqueIntGenerator_get(t *testing.T) {
 	}
 }
 
-type mockMaster struct {
-	collectionID2Channels map[UniqueID]map[vChan]pChan
-}
-
-func newMockMaster() *mockMaster {
-	return &mockMaster{
-		collectionID2Channels: make(map[UniqueID]map[vChan]pChan),
-	}
-}
-
-func genUniqueStr() string {
-	l := rand.Uint64()%100 + 1
-	b := make([]byte, l)
-	if _, err := rand.Read(b); err != nil {
-		return ""
-	}
-	return fmt.Sprintf("%X", b)
-}
-
-func (m *mockMaster) GetChannels(collectionID UniqueID) (map[vChan]pChan, error) {
-	channels, ok := m.collectionID2Channels[collectionID]
-	if ok {
-		return channels, nil
-	}
-
-	channels = make(map[vChan]pChan)
-	l := rand.Uint64()%10 + 1
-	for i := 0; uint64(i) < l; i++ {
-		channels[genUniqueStr()] = genUniqueStr()
-	}
-
-	m.collectionID2Channels[collectionID] = channels
-	return channels, nil
-}
-
 func TestChannelsMgrImpl_getChannels(t *testing.T) {
 	master := newMockMaster()
 	factory := msgstream.NewSimpleMsgStreamFactory()
diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go
index f91d1ecbf..bf8a8262a 100644
--- a/internal/proxynode/impl.go
+++ b/internal/proxynode/impl.go
@@ -143,6 +143,7 @@ func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.Dro
 		Condition:             NewTaskCondition(ctx),
 		DropCollectionRequest: request,
 		masterService:         node.masterService,
+		chMgr:                 node.chMgr,
 	}
 
 	err := node.sched.DdQueue.Enqueue(dct)
@@ -1062,6 +1063,7 @@ func (node *ProxyNode) Insert(ctx context.Context, request *milvuspb.InsertReque
 		},
 		rowIDAllocator: node.idAllocator,
 		segIDAssigner:  node.segAssigner,
+		chMgr:          node.chMgr,
 	}
 	if len(it.PartitionName) <= 0 {
 		it.PartitionName = Params.DefaultPartitionName
diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go
index e5999ade7..b257da8d2 100644
--- a/internal/proxynode/proxy_node.go
+++ b/internal/proxynode/proxy_node.go
@@ -54,6 +54,8 @@ type ProxyNode struct {
 	proxyService  types.ProxyService
 	queryService  types.QueryService
 
+	chMgr channelsMgr
+
 	sched *TaskScheduler
 	tick  *timeTick
 
@@ -224,6 +226,11 @@ func (node *ProxyNode) Init() error {
 	node.segAssigner = segAssigner
 	node.segAssigner.PeerID = Params.ProxyID
 
+	// TODO(dragondriver): use real master service instance
+	mockMasterIns := newMockMaster()
+	chMgr := newChannelsMgr(mockMasterIns, node.msFactory)
+	node.chMgr = chMgr
+
 	node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory)
 	if err != nil {
 		return err
diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go
index eebbe541f..b8342e767 100644
--- a/internal/proxynode/task.go
+++ b/internal/proxynode/task.go
@@ -106,6 +106,7 @@ type InsertTask struct {
 	result         *milvuspb.InsertResponse
 	rowIDAllocator *allocator.IDAllocator
 	segIDAssigner  *SegIDAssigner
+	chMgr          channelsMgr
 }
 
 func (it *InsertTask) TraceCtx() context.Context {
@@ -684,30 +685,14 @@ func (it *InsertTask) Execute(ctx context.Context) error {
 
 	msgPack.Msgs[0] = tsMsg
 
-	stream, err := globalInsertChannelsMap.GetInsertMsgStream(collID)
+	stream, err := it.chMgr.getDMLStream(collID)
 	if err != nil {
-		resp, _ := it.dataService.GetInsertChannels(ctx, &datapb.GetInsertChannelsRequest{
-			Base: &commonpb.MsgBase{
-				MsgType:   commonpb.MsgType_Insert, // todo
-				MsgID:     it.Base.MsgID,           // todo
-				Timestamp: 0,                       // todo
-				SourceID:  Params.ProxyID,
-			},
-			DbID:         0, // todo
-			CollectionID: collID,
-		})
-		if resp == nil {
-			return errors.New("get insert channels resp is nil")
-		}
-		if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
-			return errors.New(resp.Status.Reason)
-		}
-		err = globalInsertChannelsMap.CreateInsertMsgStream(collID, resp.Values)
+		err = it.chMgr.createDMLMsgStream(collID)
 		if err != nil {
 			return err
 		}
 	}
-	stream, err = globalInsertChannelsMap.GetInsertMsgStream(collID)
+	stream, err = it.chMgr.getDMLStream(collID)
 	if err != nil {
 		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		it.result.Status.Reason = err.Error()
@@ -849,36 +834,7 @@ func (cct *CreateCollectionTask) PreExecute(ctx context.Context) error {
 func (cct *CreateCollectionTask) Execute(ctx context.Context) error {
 	var err error
 	cct.result, err = cct.masterService.CreateCollection(ctx, cct.CreateCollectionRequest)
-	if err != nil {
-		return err
-	}
-	if cct.result.ErrorCode == commonpb.ErrorCode_Success {
-		collID, err := globalMetaCache.GetCollectionID(ctx, cct.CollectionName)
-		if err != nil {
-			return err
-		}
-		resp, _ := cct.dataServiceClient.GetInsertChannels(ctx, &datapb.GetInsertChannelsRequest{
-			Base: &commonpb.MsgBase{
-				MsgType:   commonpb.MsgType_Insert, // todo
-				MsgID:     cct.Base.MsgID,          // todo
-				Timestamp: 0,                       // todo
-				SourceID:  Params.ProxyID,
-			},
-			DbID:         0, // todo
-			CollectionID: collID,
-		})
-		if resp == nil {
-			return errors.New("get insert channels resp is nil")
-		}
-		if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
-			return errors.New(resp.Status.Reason)
-		}
-		err = globalInsertChannelsMap.CreateInsertMsgStream(collID, resp.Values)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return err
 }
 
 func (cct *CreateCollectionTask) PostExecute(ctx context.Context) error {
@@ -891,6 +847,7 @@ type DropCollectionTask struct {
 	ctx           context.Context
 	masterService types.MasterService
 	result        *commonpb.Status
+	chMgr         channelsMgr
 }
 
 func (dct *DropCollectionTask) TraceCtx() context.Context {
@@ -951,7 +908,7 @@ func (dct *DropCollectionTask) Execute(ctx context.Context) error {
 		return err
 	}
 
-	err = globalInsertChannelsMap.CloseInsertMsgStream(collID)
+	err = dct.chMgr.removeDMLStream(collID)
 	if err != nil {
 		return err
 	}
-- 
GitLab