From 71b6c88c6e3b95bcc6b6303485eaeed222da47e5 Mon Sep 17 00:00:00 2001
From: sunby <bingyi.sun@zilliz.com>
Date: Thu, 4 Feb 2021 11:52:10 +0800
Subject: [PATCH] Defer lock's unlock in meta::DropSegment

Signed-off-by: sunby <bingyi.sun@zilliz.com>
---
 internal/datanode/data_sync_service_test.go   | 14 +++---
 .../datanode/flow_graph_insert_buffer_node.go | 17 ++++---
 .../flow_graph_msg_stream_input_node.go       | 15 +++---
 internal/dataservice/meta.go                  |  2 +-
 internal/dataservice/segment_allocator.go     |  4 +-
 .../dataservice/segment_allocator_test.go     |  9 ++--
 internal/dataservice/server.go                | 45 +++++++++--------
 internal/dataservice/watcher_test.go          |  2 +-
 internal/masterservice/master_service.go      | 23 +++++----
 internal/masterservice/master_service_test.go | 26 ++++++----
 internal/msgstream/msgstream.go               | 10 ----
 internal/msgstream/pulsarms/factory.go        | 32 ------------
 internal/msgstream/pulsarms/msg_test.go       | 12 +++--
 .../msgstream/pulsarms/pulsar_msgstream.go    | 37 ++++----------
 .../pulsarms/pulsar_msgstream_test.go         | 50 +++++++++++--------
 internal/proxynode/insert_channels.go         |  8 +--
 internal/proxynode/proxy_node.go              | 18 ++++---
 internal/proxynode/task.go                    |  3 +-
 internal/proxynode/task_scheduler.go          |  7 +--
 internal/proxynode/timetick.go                | 11 ++--
 internal/proxyservice/impl.go                 | 18 ++++---
 internal/querynode/data_sync_service_test.go  | 14 +++---
 .../flow_graph_msg_stream_input_nodes.go      | 22 +++++---
 .../querynode/flow_graph_service_time_node.go |  9 ++--
 internal/querynode/load_service_test.go       | 48 ++++++++++--------
 internal/querynode/query_node.go              | 10 ++--
 internal/querynode/search_service.go          | 12 +++--
 internal/querynode/search_service_test.go     | 34 +++++++------
 internal/querynode/stats_service.go           |  9 ++--
 internal/querynode/stats_service_test.go      | 10 ++--
 internal/queryservice/load_test.go            |  6 +--
 31 files changed, 274 insertions(+), 263 deletions(-)
 delete mode 100644 internal/msgstream/pulsarms/factory.go

diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go
index 7973aecb7..e5aa64d1e 100644
--- a/internal/datanode/data_sync_service_test.go
+++ b/internal/datanode/data_sync_service_test.go
@@ -82,12 +82,14 @@ func TestDataSyncService_Start(t *testing.T) {
 	insertChannels := Params.InsertChannelNames
 	ddChannels := Params.DDChannelNames
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	insertStream, _ := factory.NewMsgStream(ctx)
-	insertStream.AsProducer(insertChannels)
-
-	ddStream, _ := factory.NewMsgStream(ctx)
-	ddStream.AsProducer(ddChannels)
+	factory := msgstream.ProtoUDFactory{}
+	insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	insertStream.SetPulsarClient(pulsarURL)
+	insertStream.CreatePulsarProducers(insertChannels)
+
+	ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	ddStream.SetPulsarClient(pulsarURL)
+	ddStream.CreatePulsarProducers(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
 	insertMsgStream.Start()
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 1b5336926..96b96814c 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -664,23 +664,26 @@ func newInsertBufferNode(ctx context.Context, flushMeta *metaTable,
 	}
 	minioPrefix := Params.InsertBinlogRootPath
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
+	factory := msgstream.ProtoUDFactory{}
 
 	//input stream, data node time tick
-	wTt, _ := factory.NewMsgStream(ctx)
-	wTt.AsProducer([]string{Params.TimeTickChannelName})
+	wTt := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	wTt.SetPulsarClient(Params.PulsarAddress)
+	wTt.CreatePulsarProducers([]string{Params.TimeTickChannelName})
 	var wTtMsgStream msgstream.MsgStream = wTt
 	wTtMsgStream.Start()
 
 	// update statistics channel
-	segS, _ := factory.NewMsgStream(ctx)
-	segS.AsProducer([]string{Params.SegmentStatisticsChannelName})
+	segS := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	segS.SetPulsarClient(Params.PulsarAddress)
+	segS.CreatePulsarProducers([]string{Params.SegmentStatisticsChannelName})
 	var segStatisticsMsgStream msgstream.MsgStream = segS
 	segStatisticsMsgStream.Start()
 
 	// segment flush completed channel
-	cf, _ := factory.NewMsgStream(ctx)
-	cf.AsProducer([]string{Params.CompleteFlushChannelName})
+	cf := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	cf.SetPulsarClient(Params.PulsarAddress)
+	cf.CreatePulsarProducers([]string{Params.CompleteFlushChannelName})
 	var completeFlushStream msgstream.MsgStream = cf
 	completeFlushStream.Start()
 
diff --git a/internal/datanode/flow_graph_msg_stream_input_node.go b/internal/datanode/flow_graph_msg_stream_input_node.go
index 0476ce4d6..649f6e5d2 100644
--- a/internal/datanode/flow_graph_msg_stream_input_node.go
+++ b/internal/datanode/flow_graph_msg_stream_input_node.go
@@ -15,9 +15,11 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
 	consumeChannels := Params.InsertChannelNames
 	consumeSubName := Params.MsgChannelSubName
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	insertStream, _ := factory.NewTtMsgStream(ctx)
-	insertStream.AsConsumer(consumeChannels, consumeSubName)
+	factory := msgstream.ProtoUDFactory{}
+	insertStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	insertStream.SetPulsarClient(Params.PulsarAddress)
+
+	insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
 
 	var stream msgstream.MsgStream = insertStream
 	node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
@@ -30,9 +32,10 @@ func newDDInputNode(ctx context.Context) *flowgraph.InputNode {
 	maxParallelism := Params.FlowGraphMaxParallelism
 	consumeSubName := Params.MsgChannelSubName
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	tmpStream, _ := factory.NewTtMsgStream(ctx)
-	tmpStream.AsConsumer(Params.DDChannelNames, consumeSubName)
+	factory := msgstream.ProtoUDFactory{}
+	tmpStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	tmpStream.SetPulsarClient(Params.PulsarAddress)
+	tmpStream.CreatePulsarConsumers(Params.DDChannelNames, consumeSubName)
 
 	var stream msgstream.MsgStream = tmpStream
 	node := flowgraph.NewInputNode(&stream, "ddInputNode", maxQueueLength, maxParallelism)
diff --git a/internal/dataservice/meta.go b/internal/dataservice/meta.go
index b69cc746e..61b67842b 100644
--- a/internal/dataservice/meta.go
+++ b/internal/dataservice/meta.go
@@ -183,7 +183,7 @@ func (meta *meta) UpdateSegment(segmentInfo *datapb.SegmentInfo) error {
 
 func (meta *meta) DropSegment(segmentID UniqueID) error {
 	meta.ddLock.Lock()
-	meta.ddLock.Unlock()
+	defer meta.ddLock.Unlock()
 
 	if _, ok := meta.segID2Info[segmentID]; !ok {
 		return newErrSegmentNotFound(segmentID)
diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go
index d15d3b66a..a85dd4111 100644
--- a/internal/dataservice/segment_allocator.go
+++ b/internal/dataservice/segment_allocator.go
@@ -71,7 +71,7 @@ type (
 	}
 )
 
-func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl, error) {
+func newSegmentAllocator(meta *meta, allocator allocator) *segmentAllocatorImpl {
 	segmentAllocator := &segmentAllocatorImpl{
 		mt:                     meta,
 		segments:               make(map[UniqueID]*segmentStatus),
@@ -80,7 +80,7 @@ func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl
 		segmentThresholdFactor: Params.SegmentSizeFactor,
 		allocator:              allocator,
 	}
-	return segmentAllocator, nil
+	return segmentAllocator
 }
 
 func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
diff --git a/internal/dataservice/segment_allocator_test.go b/internal/dataservice/segment_allocator_test.go
index d0e248edc..9f81783ef 100644
--- a/internal/dataservice/segment_allocator_test.go
+++ b/internal/dataservice/segment_allocator_test.go
@@ -17,8 +17,7 @@ func TestAllocSegment(t *testing.T) {
 	mockAllocator := newMockAllocator()
 	meta, err := newMemoryMeta(mockAllocator)
 	assert.Nil(t, err)
-	segAllocator, err := newSegmentAllocator(meta, mockAllocator)
-	assert.Nil(t, err)
+	segAllocator := newSegmentAllocator(meta, mockAllocator)
 
 	schema := newTestSchema()
 	collID, err := mockAllocator.allocID()
@@ -68,8 +67,7 @@ func TestSealSegment(t *testing.T) {
 	mockAllocator := newMockAllocator()
 	meta, err := newMemoryMeta(mockAllocator)
 	assert.Nil(t, err)
-	segAllocator, err := newSegmentAllocator(meta, mockAllocator)
-	assert.Nil(t, err)
+	segAllocator := newSegmentAllocator(meta, mockAllocator)
 
 	schema := newTestSchema()
 	collID, err := mockAllocator.allocID()
@@ -105,8 +103,7 @@ func TestExpireSegment(t *testing.T) {
 	mockAllocator := newMockAllocator()
 	meta, err := newMemoryMeta(mockAllocator)
 	assert.Nil(t, err)
-	segAllocator, err := newSegmentAllocator(meta, mockAllocator)
-	assert.Nil(t, err)
+	segAllocator := newSegmentAllocator(meta, mockAllocator)
 
 	schema := newTestSchema()
 	collID, err := mockAllocator.allocID()
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index a5537e11c..afbbdfd24 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -134,10 +134,7 @@ func (s *Server) Start() error {
 		return err
 	}
 	s.statsHandler = newStatsHandler(s.meta)
-	s.segAllocator, err = newSegmentAllocator(s.meta, s.allocator)
-	if err != nil {
-		return err
-	}
+	s.segAllocator = newSegmentAllocator(s.meta, s.allocator)
 	s.ddHandler = newDDHandler(s.meta, s.segAllocator)
 	s.initSegmentInfoChannel()
 	if err = s.loadMetaFromMaster(); err != nil {
@@ -173,22 +170,25 @@ func (s *Server) initMeta() error {
 }
 
 func (s *Server) initSegmentInfoChannel() {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	segmentInfoStream, _ := factory.NewMsgStream(s.ctx)
-	segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
+	factory := msgstream.ProtoUDFactory{}
+	segmentInfoStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	segmentInfoStream.SetPulsarClient(Params.PulsarAddress)
+	segmentInfoStream.CreatePulsarProducers([]string{Params.SegmentInfoChannelName})
 	s.segmentInfoStream = segmentInfoStream
 	s.segmentInfoStream.Start()
 }
 func (s *Server) initMsgProducer() error {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	ttMsgStream, _ := factory.NewMsgStream(s.ctx)
-	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
+	factory := msgstream.ProtoUDFactory{}
+	ttMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	ttMsgStream.SetPulsarClient(Params.PulsarAddress)
+	ttMsgStream.CreatePulsarConsumers([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
 	s.ttMsgStream = ttMsgStream
 	s.ttMsgStream.Start()
 	timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
 	dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
-	k2sStream, _ := factory.NewMsgStream(s.ctx)
-	k2sStream.AsProducer(Params.K2SChannelNames)
+	k2sStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	k2sStream.SetPulsarClient(Params.PulsarAddress)
+	k2sStream.CreatePulsarProducers(Params.K2SChannelNames)
 	s.k2sMsgStream = k2sStream
 	s.k2sMsgStream.Start()
 	k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
@@ -308,9 +308,10 @@ func (s *Server) startServerLoop() {
 
 func (s *Server) startStatsChannel(ctx context.Context) {
 	defer s.serverLoopWg.Done()
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	statsStream, _ := factory.NewMsgStream(ctx)
-	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
+	factory := msgstream.ProtoUDFactory{}
+	statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	statsStream.SetPulsarClient(Params.PulsarAddress)
+	statsStream.CreatePulsarConsumers([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
 	statsStream.Start()
 	defer statsStream.Close()
 	for {
@@ -334,9 +335,10 @@ func (s *Server) startStatsChannel(ctx context.Context) {
 
 func (s *Server) startSegmentFlushChannel(ctx context.Context) {
 	defer s.serverLoopWg.Done()
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	flushStream, _ := factory.NewMsgStream(ctx)
-	flushStream.AsConsumer([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName)
+	factory := msgstream.ProtoUDFactory{}
+	flushStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	flushStream.SetPulsarClient(Params.PulsarAddress)
+	flushStream.CreatePulsarConsumers([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName)
 	flushStream.Start()
 	defer flushStream.Close()
 	for {
@@ -369,9 +371,10 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
 
 func (s *Server) startDDChannel(ctx context.Context) {
 	defer s.serverLoopWg.Done()
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	ddStream, _ := factory.NewMsgStream(ctx)
-	ddStream.AsConsumer([]string{s.ddChannelName}, Params.DataServiceSubscriptionName)
+	factory := msgstream.ProtoUDFactory{}
+	ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	ddStream.SetPulsarClient(Params.PulsarAddress)
+	ddStream.CreatePulsarConsumers([]string{s.ddChannelName}, Params.DataServiceSubscriptionName)
 	ddStream.Start()
 	defer ddStream.Close()
 	for {
diff --git a/internal/dataservice/watcher_test.go b/internal/dataservice/watcher_test.go
index e476f9f02..11e8fbe5f 100644
--- a/internal/dataservice/watcher_test.go
+++ b/internal/dataservice/watcher_test.go
@@ -21,7 +21,7 @@ func TestDataNodeTTWatcher(t *testing.T) {
 	allocator := newMockAllocator()
 	meta, err := newMemoryMeta(allocator)
 	assert.Nil(t, err)
-	segAllocator, err := newSegmentAllocator(meta, allocator)
+	segAllocator := newSegmentAllocator(meta, allocator)
 	assert.Nil(t, err)
 	watcher := newDataNodeTimeTickWatcher(meta, segAllocator, cluster)
 
diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go
index 545c0c401..53040b408 100644
--- a/internal/masterservice/master_service.go
+++ b/internal/masterservice/master_service.go
@@ -402,6 +402,7 @@ func (c *Core) tsLoop() {
 	}
 }
 func (c *Core) setMsgStreams() error {
+	dispatcherFactory := ms.ProtoUDFactory{}
 
 	if Params.PulsarAddress == "" {
 		return errors.Errorf("PulsarAddress is empty")
@@ -414,25 +415,26 @@ func (c *Core) setMsgStreams() error {
 	if Params.ProxyTimeTickChannel == "" {
 		return errors.Errorf("ProxyTimeTickChannel is empty")
 	}
-
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	proxyTimeTickStream, _ := factory.NewMsgStream(c.ctx)
-	proxyTimeTickStream.AsConsumer([]string{Params.ProxyTimeTickChannel}, Params.MsgChannelSubName)
+	proxyTimeTickStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
+	proxyTimeTickStream.SetPulsarClient(Params.PulsarAddress)
+	proxyTimeTickStream.CreatePulsarConsumers([]string{Params.ProxyTimeTickChannel}, Params.MsgChannelSubName)
 	proxyTimeTickStream.Start()
 
 	// master time tick channel
 	if Params.TimeTickChannel == "" {
 		return errors.Errorf("TimeTickChannel is empty")
 	}
-	timeTickStream, _ := factory.NewMsgStream(c.ctx)
-	timeTickStream.AsProducer([]string{Params.TimeTickChannel})
+	timeTickStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
+	timeTickStream.SetPulsarClient(Params.PulsarAddress)
+	timeTickStream.CreatePulsarProducers([]string{Params.TimeTickChannel})
 
 	// master dd channel
 	if Params.DdChannel == "" {
 		return errors.Errorf("DdChannel is empty")
 	}
-	ddStream, _ := factory.NewMsgStream(c.ctx)
-	ddStream.AsProducer([]string{Params.DdChannel})
+	ddStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
+	ddStream.SetPulsarClient(Params.PulsarAddress)
+	ddStream.CreatePulsarProducers([]string{Params.DdChannel})
 
 	c.SendTimeTick = func(t typeutil.Timestamp) error {
 		msgPack := ms.MsgPack{}
@@ -565,8 +567,9 @@ func (c *Core) setMsgStreams() error {
 	if Params.DataServiceSegmentChannel == "" {
 		return errors.Errorf("DataServiceSegmentChannel is empty")
 	}
-	dataServiceStream, _ := factory.NewMsgStream(c.ctx)
-	dataServiceStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName)
+	dataServiceStream := pulsarms.NewPulsarMsgStream(c.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
+	dataServiceStream.SetPulsarClient(Params.PulsarAddress)
+	dataServiceStream.CreatePulsarConsumers([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName)
 	dataServiceStream.Start()
 	c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024)
 	c.DataNodeSegmentFlushCompletedChan = make(chan typeutil.UniqueID, 1024)
diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go
index 9c7a83706..a5584e643 100644
--- a/internal/masterservice/master_service_test.go
+++ b/internal/masterservice/master_service_test.go
@@ -146,19 +146,23 @@ func TestMasterService(t *testing.T) {
 	err = core.Start()
 	assert.Nil(t, err)
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	proxyTimeTickStream, _ := factory.NewMsgStream(ctx)
-	proxyTimeTickStream.AsProducer([]string{Params.ProxyTimeTickChannel})
-
-	dataServiceSegmentStream, _ := factory.NewMsgStream(ctx)
-	dataServiceSegmentStream.AsProducer([]string{Params.DataServiceSegmentChannel})
-
-	timeTickStream, _ := factory.NewMsgStream(ctx)
-	timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName)
+	factory := ms.ProtoUDFactory{}
+	proxyTimeTickStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	proxyTimeTickStream.SetPulsarClient(Params.PulsarAddress)
+	proxyTimeTickStream.CreatePulsarProducers([]string{Params.ProxyTimeTickChannel})
+
+	dataServiceSegmentStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	dataServiceSegmentStream.SetPulsarClient(Params.PulsarAddress)
+	dataServiceSegmentStream.CreatePulsarProducers([]string{Params.DataServiceSegmentChannel})
+
+	timeTickStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	timeTickStream.SetPulsarClient(Params.PulsarAddress)
+	timeTickStream.CreatePulsarConsumers([]string{Params.TimeTickChannel}, Params.MsgChannelSubName)
 	timeTickStream.Start()
 
-	ddStream, _ := factory.NewMsgStream(ctx)
-	ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName)
+	ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024, 1024, factory.NewUnmarshalDispatcher())
+	ddStream.SetPulsarClient(Params.PulsarAddress)
+	ddStream.CreatePulsarConsumers([]string{Params.DdChannel}, Params.MsgChannelSubName)
 	ddStream.Start()
 
 	time.Sleep(time.Second)
diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go
index e62e00595..dd11dba3b 100644
--- a/internal/msgstream/msgstream.go
+++ b/internal/msgstream/msgstream.go
@@ -1,8 +1,6 @@
 package msgstream
 
 import (
-	"context"
-
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )
@@ -26,17 +24,9 @@ type MsgStream interface {
 	Start()
 	Close()
 	Chan() <-chan *MsgPack
-	AsProducer(channels []string)
-	AsConsumer(channels []string, subName string)
-	SetRepackFunc(repackFunc RepackFunc)
 
 	Produce(*MsgPack) error
 	Broadcast(*MsgPack) error
 	Consume() *MsgPack
 	Seek(offset *MsgPosition) error
 }
-
-type Factory interface {
-	NewMsgStream(ctx context.Context) (MsgStream, error)
-	NewTtMsgStream(ctx context.Context) (MsgStream, error)
-}
diff --git a/internal/msgstream/pulsarms/factory.go b/internal/msgstream/pulsarms/factory.go
deleted file mode 100644
index 8e10c2e3f..000000000
--- a/internal/msgstream/pulsarms/factory.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package pulsarms
-
-import (
-	"context"
-
-	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-)
-
-type Factory struct {
-	dispatcherFactory msgstream.ProtoUDFactory
-	address           string
-	receiveBufSize    int64
-	pulsarBufSize     int64
-}
-
-func (f *Factory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
-	return newPulsarMsgStream(ctx, f.address, f.receiveBufSize, f.pulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
-}
-
-func (f *Factory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
-	return NewPulsarTtMsgStream(ctx, f.address, f.receiveBufSize, f.pulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
-}
-
-func NewFactory(address string, receiveBufSize int64, pulsarBufSize int64) *Factory {
-	f := &Factory{
-		dispatcherFactory: msgstream.ProtoUDFactory{},
-		address:           address,
-		receiveBufSize:    receiveBufSize,
-		pulsarBufSize:     pulsarBufSize,
-	}
-	return f
-}
diff --git a/internal/msgstream/pulsarms/msg_test.go b/internal/msgstream/pulsarms/msg_test.go
index 51d999c4d..a1051e231 100644
--- a/internal/msgstream/pulsarms/msg_test.go
+++ b/internal/msgstream/pulsarms/msg_test.go
@@ -141,16 +141,18 @@ func TestStream_task_Insert(t *testing.T) {
 	msgPack.Msgs = append(msgPack.Msgs, getInsertTask(3, 3))
 
 	factory := msgstream.ProtoUDFactory{}
-	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	inputStream.AsProducer(producerChannels)
+	inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	inputStream.SetPulsarClient(pulsarAddress)
+	inputStream.CreatePulsarProducers(producerChannels)
 	inputStream.SetRepackFunc(newRepackFunc)
 	inputStream.Start()
 
 	dispatcher := factory.NewUnmarshalDispatcher()
-	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, dispatcher)
+	outputStream := NewPulsarMsgStream(context.Background(), 100, 100, dispatcher)
+	outputStream.SetPulsarClient(pulsarAddress)
 	testTask := InsertTask{}
 	dispatcher.AddMsgTemplate(commonpb.MsgType_kInsert, testTask.Unmarshal)
-	outputStream.AsConsumer(consumerChannels, consumerSubName)
+	outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
 	outputStream.Start()
 
 	err := inputStream.Produce(&msgPack)
@@ -159,7 +161,7 @@ func TestStream_task_Insert(t *testing.T) {
 	}
 	receiveCount := 0
 	for {
-		result := outputStream.Consume()
+		result := (*outputStream).Consume()
 		if len(result.Msgs) > 0 {
 			msgs := result.Msgs
 			for _, v := range msgs {
diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go
index 5f9986861..523426669 100644
--- a/internal/msgstream/pulsarms/pulsar_msgstream.go
+++ b/internal/msgstream/pulsarms/pulsar_msgstream.go
@@ -52,12 +52,7 @@ type PulsarMsgStream struct {
 	pulsarBufSize    int64
 }
 
-func newPulsarMsgStream(ctx context.Context,
-	address string,
-	receiveBufSize int64,
-	pulsarBufSize int64,
-	unmarshal UnmarshalDispatcher) (*PulsarMsgStream, error) {
-
+func NewPulsarMsgStream(ctx context.Context, receiveBufSize int64, pulsarBufSize int64, unmarshal UnmarshalDispatcher) *PulsarMsgStream {
 	streamCtx, streamCancel := context.WithCancel(ctx)
 	producers := make([]Producer, 0)
 	consumers := make([]Consumer, 0)
@@ -71,17 +66,19 @@ func newPulsarMsgStream(ctx context.Context,
 		unmarshal:        unmarshal,
 		pulsarBufSize:    pulsarBufSize,
 	}
+	stream.receiveBuf = make(chan *MsgPack, receiveBufSize)
+	return stream
+}
+
+func (ms *PulsarMsgStream) SetPulsarClient(address string) {
 	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address})
 	if err != nil {
 		log.Printf("Set pulsar client failed, error = %v", err)
-		return nil, err
 	}
-	stream.client = client
-	stream.receiveBuf = make(chan *MsgPack, receiveBufSize)
-	return stream, nil
+	ms.client = client
 }
 
-func (ms *PulsarMsgStream) AsProducer(channels []string) {
+func (ms *PulsarMsgStream) CreatePulsarProducers(channels []string) {
 	for i := 0; i < len(channels); i++ {
 		fn := func() error {
 			pp, err := ms.client.CreateProducer(pulsar.ProducerOptions{Topic: channels[i]})
@@ -103,7 +100,7 @@ func (ms *PulsarMsgStream) AsProducer(channels []string) {
 	}
 }
 
-func (ms *PulsarMsgStream) AsConsumer(channels []string,
+func (ms *PulsarMsgStream) CreatePulsarConsumers(channels []string,
 	subName string) {
 	for i := 0; i < len(channels); i++ {
 		fn := func() error {
@@ -482,12 +479,7 @@ type PulsarTtMsgStream struct {
 	lastTimeStamp Timestamp
 }
 
-func NewPulsarTtMsgStream(ctx context.Context,
-	address string,
-	receiveBufSize int64,
-	pulsarBufSize int64,
-	unmarshal msgstream.UnmarshalDispatcher) (*PulsarTtMsgStream, error) {
-
+func NewPulsarTtMsgStream(ctx context.Context, receiveBufSize int64, pulsarBufSize int64, unmarshal msgstream.UnmarshalDispatcher) *PulsarTtMsgStream {
 	streamCtx, streamCancel := context.WithCancel(ctx)
 	pulsarMsgStream := PulsarMsgStream{
 		ctx:           streamCtx,
@@ -495,17 +487,10 @@ func NewPulsarTtMsgStream(ctx context.Context,
 		pulsarBufSize: pulsarBufSize,
 		unmarshal:     unmarshal,
 	}
-
-	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address})
-	if err != nil {
-		log.Printf("Set pulsar client failed, error = %v", err)
-		return nil, err
-	}
-	pulsarMsgStream.client = client
 	pulsarMsgStream.receiveBuf = make(chan *MsgPack, receiveBufSize)
 	return &PulsarTtMsgStream{
 		PulsarMsgStream: pulsarMsgStream,
-	}, nil
+	}
 }
 
 func (ms *PulsarTtMsgStream) Start() {
diff --git a/internal/msgstream/pulsarms/pulsar_msgstream_test.go b/internal/msgstream/pulsarms/pulsar_msgstream_test.go
index 2fcfad927..f9e9bb309 100644
--- a/internal/msgstream/pulsarms/pulsar_msgstream_test.go
+++ b/internal/msgstream/pulsarms/pulsar_msgstream_test.go
@@ -176,8 +176,9 @@ func initPulsarStream(pulsarAddress string,
 	factory := msgstream.ProtoUDFactory{}
 
 	// set input stream
-	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	inputStream.AsProducer(producerChannels)
+	inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	inputStream.SetPulsarClient(pulsarAddress)
+	inputStream.CreatePulsarProducers(producerChannels)
 	for _, opt := range opts {
 		inputStream.SetRepackFunc(opt)
 	}
@@ -185,8 +186,9 @@ func initPulsarStream(pulsarAddress string,
 	var input msgstream.MsgStream = inputStream
 
 	// set output stream
-	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	outputStream.AsConsumer(consumerChannels, consumerSubName)
+	outputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	outputStream.SetPulsarClient(pulsarAddress)
+	outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
 	outputStream.Start()
 	var output msgstream.MsgStream = outputStream
 
@@ -201,8 +203,9 @@ func initPulsarTtStream(pulsarAddress string,
 	factory := msgstream.ProtoUDFactory{}
 
 	// set input stream
-	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	inputStream.AsProducer(producerChannels)
+	inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	inputStream.SetPulsarClient(pulsarAddress)
+	inputStream.CreatePulsarProducers(producerChannels)
 	for _, opt := range opts {
 		inputStream.SetRepackFunc(opt)
 	}
@@ -210,8 +213,9 @@ func initPulsarTtStream(pulsarAddress string,
 	var input msgstream.MsgStream = inputStream
 
 	// set output stream
-	outputStream, _ := NewPulsarTtMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	outputStream.AsConsumer(consumerChannels, consumerSubName)
+	outputStream := NewPulsarTtMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	outputStream.SetPulsarClient(pulsarAddress)
+	outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
 	outputStream.Start()
 	var output msgstream.MsgStream = outputStream
 
@@ -413,12 +417,14 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
 	msgPack.Msgs = append(msgPack.Msgs, insertMsg)
 
 	factory := msgstream.ProtoUDFactory{}
-	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	inputStream.AsProducer(producerChannels)
+	inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	inputStream.SetPulsarClient(pulsarAddress)
+	inputStream.CreatePulsarProducers(producerChannels)
 	inputStream.Start()
 
-	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	outputStream.AsConsumer(consumerChannels, consumerSubName)
+	outputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	outputStream.SetPulsarClient(pulsarAddress)
+	outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
 	outputStream.Start()
 	var output msgstream.MsgStream = outputStream
 
@@ -464,12 +470,14 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
 	msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
 
 	factory := msgstream.ProtoUDFactory{}
-	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	inputStream.AsProducer(producerChannels)
+	inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	inputStream.SetPulsarClient(pulsarAddress)
+	inputStream.CreatePulsarProducers(producerChannels)
 	inputStream.Start()
 
-	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	outputStream.AsConsumer(consumerChannels, consumerSubName)
+	outputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	outputStream.SetPulsarClient(pulsarAddress)
+	outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
 	outputStream.Start()
 	var output msgstream.MsgStream = outputStream
 
@@ -495,12 +503,14 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
 	msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kQueryNodeStats, 4, 4))
 
 	factory := msgstream.ProtoUDFactory{}
-	inputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	inputStream.AsProducer(producerChannels)
+	inputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	inputStream.SetPulsarClient(pulsarAddress)
+	inputStream.CreatePulsarProducers(producerChannels)
 	inputStream.Start()
 
-	outputStream, _ := newPulsarMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
-	outputStream.AsConsumer(consumerChannels, consumerSubName)
+	outputStream := NewPulsarMsgStream(context.Background(), 100, 100, factory.NewUnmarshalDispatcher())
+	outputStream.SetPulsarClient(pulsarAddress)
+	outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName)
 	outputStream.Start()
 	var output msgstream.MsgStream = outputStream
 
diff --git a/internal/proxynode/insert_channels.go b/internal/proxynode/insert_channels.go
index deda7ef03..44b0dce3e 100644
--- a/internal/proxynode/insert_channels.go
+++ b/internal/proxynode/insert_channels.go
@@ -83,6 +83,7 @@ type InsertChannelsMap struct {
 func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []string) error {
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
+	factory := msgstream.ProtoUDFactory{}
 
 	_, ok := m.collectionID2InsertChannels[collID]
 	if ok {
@@ -100,10 +101,9 @@ func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []st
 	}
 	m.insertChannels = append(m.insertChannels, channels)
 	m.collectionID2InsertChannels[collID] = len(m.insertChannels) - 1
-
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamInsertBufSize, 1024)
-	stream, _ := factory.NewMsgStream(context.Background())
-	stream.AsProducer(channels)
+	stream := pulsarms.NewPulsarMsgStream(context.Background(), Params.MsgStreamInsertBufSize, 1024, factory.NewUnmarshalDispatcher())
+	stream.SetPulsarClient(Params.PulsarAddress)
+	stream.CreatePulsarProducers(channels)
 	repack := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
 		return insertRepackFunc(tsMsgs, hashKeys, m.nodeInstance.segAssigner, true)
 	}
diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go
index ee999d4d2..8b00c66c6 100644
--- a/internal/proxynode/proxy_node.go
+++ b/internal/proxynode/proxy_node.go
@@ -54,8 +54,8 @@ type NodeImpl struct {
 	tsoAllocator *allocator.TimestampAllocator
 	segAssigner  *SegIDAssigner
 
-	manipulationMsgStream msgstream.MsgStream
-	queryMsgStream        msgstream.MsgStream
+	manipulationMsgStream *pulsarms.PulsarMsgStream
+	queryMsgStream        *pulsarms.PulsarMsgStream
 
 	tracer opentracing.Tracer
 	closer io.Closer
@@ -106,7 +106,7 @@ func (node *NodeImpl) waitForServiceReady(service Component, serviceName string)
 }
 
 func (node *NodeImpl) Init() error {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchBufSize, 1024)
+	factory := msgstream.ProtoUDFactory{}
 
 	// todo wait for proxyservice state changed to Healthy
 
@@ -195,8 +195,11 @@ func (node *NodeImpl) Init() error {
 	}
 	opentracing.SetGlobalTracer(node.tracer)
 
-	node.queryMsgStream, _ = factory.NewMsgStream(node.ctx)
-	node.queryMsgStream.AsProducer(Params.SearchChannelNames)
+	pulsarAddress := Params.PulsarAddress
+
+	node.queryMsgStream = pulsarms.NewPulsarMsgStream(node.ctx, Params.MsgStreamSearchBufSize, 1024, factory.NewUnmarshalDispatcher())
+	node.queryMsgStream.SetPulsarClient(pulsarAddress)
+	node.queryMsgStream.CreatePulsarProducers(Params.SearchChannelNames)
 	log.Println("create query message stream ...")
 
 	masterAddr := Params.MasterAddress
@@ -222,8 +225,9 @@ func (node *NodeImpl) Init() error {
 	node.segAssigner = segAssigner
 	node.segAssigner.PeerID = Params.ProxyID
 
-	node.manipulationMsgStream, _ = factory.NewMsgStream(node.ctx)
-	node.manipulationMsgStream.AsProducer(Params.InsertChannelNames)
+	node.manipulationMsgStream = pulsarms.NewPulsarMsgStream(node.ctx, Params.MsgStreamInsertBufSize, 1024, factory.NewUnmarshalDispatcher())
+	node.manipulationMsgStream.SetPulsarClient(pulsarAddress)
+	node.manipulationMsgStream.CreatePulsarProducers(Params.InsertChannelNames)
 	repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
 		return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true)
 	}
diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go
index c915f3a0a..f29de8e2c 100644
--- a/internal/proxynode/task.go
+++ b/internal/proxynode/task.go
@@ -11,6 +11,7 @@ import (
 	"github.com/golang/protobuf/proto"
 	"github.com/zilliztech/milvus-distributed/internal/allocator"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
@@ -409,7 +410,7 @@ func (dct *DropCollectionTask) PostExecute() error {
 type SearchTask struct {
 	Condition
 	internalpb2.SearchRequest
-	queryMsgStream msgstream.MsgStream
+	queryMsgStream *pulsarms.PulsarMsgStream
 	resultBuf      chan []*internalpb2.SearchResults
 	result         *milvuspb.SearchResults
 	query          *milvuspb.SearchRequest
diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go
index 24cbc5771..6a61633cc 100644
--- a/internal/proxynode/task_scheduler.go
+++ b/internal/proxynode/task_scheduler.go
@@ -371,10 +371,11 @@ func (sched *TaskScheduler) queryLoop() {
 
 func (sched *TaskScheduler) queryResultLoop() {
 	defer sched.wg.Done()
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchResultBufSize, 1024)
+	factory := msgstream.ProtoUDFactory{}
 
-	queryResultMsgStream, _ := factory.NewMsgStream(sched.ctx)
-	queryResultMsgStream.AsConsumer(Params.SearchResultChannelNames,
+	queryResultMsgStream := pulsarms.NewPulsarMsgStream(sched.ctx, Params.MsgStreamSearchResultBufSize, 1024, factory.NewUnmarshalDispatcher())
+	queryResultMsgStream.SetPulsarClient(Params.PulsarAddress)
+	queryResultMsgStream.CreatePulsarConsumers(Params.SearchResultChannelNames,
 		Params.ProxySubName)
 	queryNodeNum := Params.QueryNodeNum
 
diff --git a/internal/proxynode/timetick.go b/internal/proxynode/timetick.go
index 30861feb2..8153c8633 100644
--- a/internal/proxynode/timetick.go
+++ b/internal/proxynode/timetick.go
@@ -26,7 +26,7 @@ type timeTick struct {
 	pulsarProducer pulsar.Producer
 
 	tsoAllocator  *allocator.TimestampAllocator
-	tickMsgStream msgstream.MsgStream
+	tickMsgStream *pulsarms.PulsarMsgStream
 
 	peerID    UniqueID
 	wg        sync.WaitGroup
@@ -51,9 +51,12 @@ func newTimeTick(ctx context.Context,
 		checkFunc:    checkFunc,
 	}
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamTimeTickBufSize, 1024)
-	t.tickMsgStream, _ = factory.NewMsgStream(t.ctx)
-	t.tickMsgStream.AsProducer(Params.ProxyTimeTickChannelNames)
+	factory := msgstream.ProtoUDFactory{}
+	t.tickMsgStream = pulsarms.NewPulsarMsgStream(t.ctx, Params.MsgStreamTimeTickBufSize, 1024, factory.NewUnmarshalDispatcher())
+	pulsarAddress := Params.PulsarAddress
+
+	t.tickMsgStream.SetPulsarClient(pulsarAddress)
+	t.tickMsgStream.CreatePulsarProducers(Params.ProxyTimeTickChannelNames)
 	return t
 }
 
diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go
index f03afa130..d4951c28e 100644
--- a/internal/proxyservice/impl.go
+++ b/internal/proxyservice/impl.go
@@ -10,6 +10,7 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@@ -95,7 +96,7 @@ func (s *ServiceImpl) fillNodeInitParams() error {
 }
 
 func (s *ServiceImpl) Init() error {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
+	dispatcherFactory := msgstream.ProtoUDFactory{}
 
 	err := s.fillNodeInitParams()
 	if err != nil {
@@ -103,8 +104,9 @@ func (s *ServiceImpl) Init() error {
 	}
 	log.Println("fill node init params ...")
 
-	serviceTimeTickMsgStream, _ := factory.NewTtMsgStream(s.ctx)
-	serviceTimeTickMsgStream.AsProducer([]string{Params.ServiceTimeTickChannel})
+	serviceTimeTickMsgStream := pulsarms.NewPulsarTtMsgStream(s.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
+	serviceTimeTickMsgStream.SetPulsarClient(Params.PulsarAddress)
+	serviceTimeTickMsgStream.CreatePulsarProducers([]string{Params.ServiceTimeTickChannel})
 	log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel})
 
 	channels := make([]string, Params.InsertChannelNum)
@@ -112,12 +114,14 @@ func (s *ServiceImpl) Init() error {
 	for ; i < Params.InsertChannelNum; i++ {
 		channels[i] = Params.InsertChannelPrefixName + strconv.FormatInt(i, 10)
 	}
-	insertTickMsgStream, _ := factory.NewMsgStream(s.ctx)
-	insertTickMsgStream.AsProducer(channels)
+	insertTickMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
+	insertTickMsgStream.SetPulsarClient(Params.PulsarAddress)
+	insertTickMsgStream.CreatePulsarProducers(channels)
 	log.Println("create service time tick producer channel: ", channels)
 
-	nodeTimeTickMsgStream, _ := factory.NewMsgStream(s.ctx)
-	nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel,
+	nodeTimeTickMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024, 1024, dispatcherFactory.NewUnmarshalDispatcher())
+	nodeTimeTickMsgStream.SetPulsarClient(Params.PulsarAddress)
+	nodeTimeTickMsgStream.CreatePulsarConsumers(Params.NodeTimeTickChannel,
 		"proxyservicesub") // TODO: add config
 	log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel)
 
diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go
index dc961aa87..b8cd27b7a 100644
--- a/internal/querynode/data_sync_service_test.go
+++ b/internal/querynode/data_sync_service_test.go
@@ -109,12 +109,14 @@ func TestDataSyncService_Start(t *testing.T) {
 	ddChannels := Params.DDChannelNames
 	pulsarURL := Params.PulsarAddress
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
-	insertStream.AsProducer(insertChannels)
-
-	ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
-	ddStream.AsProducer(ddChannels)
+	factory := msgstream.ProtoUDFactory{}
+	insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	insertStream.SetPulsarClient(pulsarURL)
+	insertStream.CreatePulsarProducers(insertChannels)
+
+	ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	ddStream.SetPulsarClient(pulsarURL)
+	ddStream.CreatePulsarProducers(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
 	insertMsgStream.Start()
diff --git a/internal/querynode/flow_graph_msg_stream_input_nodes.go b/internal/querynode/flow_graph_msg_stream_input_nodes.go
index 0d229606b..046d411ad 100644
--- a/internal/querynode/flow_graph_msg_stream_input_nodes.go
+++ b/internal/querynode/flow_graph_msg_stream_input_nodes.go
@@ -9,13 +9,18 @@ import (
 )
 
 func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph.InputNode {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.InsertReceiveBufSize, Params.InsertPulsarBufSize)
+	factory := msgstream.ProtoUDFactory{}
+	receiveBufSize := Params.InsertReceiveBufSize
+	pulsarBufSize := Params.InsertPulsarBufSize
+
+	msgStreamURL := Params.PulsarAddress
 
 	consumeChannels := Params.InsertChannelNames
 	consumeSubName := Params.MsgChannelSubName
 
-	insertStream, _ := factory.NewTtMsgStream(ctx)
-	insertStream.AsConsumer(consumeChannels, consumeSubName)
+	insertStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize, pulsarBufSize, factory.NewUnmarshalDispatcher())
+	insertStream.SetPulsarClient(msgStreamURL)
+	insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
 
 	var stream msgstream.MsgStream = insertStream
 	dsService.dmStream = stream
@@ -28,13 +33,18 @@ func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph
 }
 
 func (dsService *dataSyncService) newDDInputNode(ctx context.Context) *flowgraph.InputNode {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.DDReceiveBufSize, Params.DDPulsarBufSize)
+	factory := msgstream.ProtoUDFactory{}
+	receiveBufSize := Params.DDReceiveBufSize
+	pulsarBufSize := Params.DDPulsarBufSize
+
+	msgStreamURL := Params.PulsarAddress
 
 	consumeChannels := Params.DDChannelNames
 	consumeSubName := Params.MsgChannelSubName
 
-	ddStream, _ := factory.NewTtMsgStream(ctx)
-	ddStream.AsConsumer(consumeChannels, consumeSubName)
+	ddStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize, pulsarBufSize, factory.NewUnmarshalDispatcher())
+	ddStream.SetPulsarClient(msgStreamURL)
+	ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
 
 	var stream msgstream.MsgStream = ddStream
 	dsService.ddStream = stream
diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go
index 651be6fbc..4ee39a8d7 100644
--- a/internal/querynode/flow_graph_service_time_node.go
+++ b/internal/querynode/flow_graph_service_time_node.go
@@ -13,7 +13,7 @@ import (
 type serviceTimeNode struct {
 	baseNode
 	replica           collectionReplica
-	timeTickMsgStream msgstream.MsgStream
+	timeTickMsgStream *pulsarms.PulsarMsgStream
 }
 
 func (stNode *serviceTimeNode) Name() string {
@@ -78,9 +78,10 @@ func newServiceTimeNode(ctx context.Context, replica collectionReplica) *service
 	baseNode.SetMaxQueueLength(maxQueueLength)
 	baseNode.SetMaxParallelism(maxParallelism)
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.SearchReceiveBufSize, 1024)
-	timeTimeMsgStream, _ := factory.NewMsgStream(ctx)
-	timeTimeMsgStream.AsProducer([]string{Params.QueryTimeTickChannelName})
+	factory := msgstream.ProtoUDFactory{}
+	timeTimeMsgStream := pulsarms.NewPulsarMsgStream(ctx, Params.SearchReceiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	timeTimeMsgStream.SetPulsarClient(Params.PulsarAddress)
+	timeTimeMsgStream.CreatePulsarProducers([]string{Params.QueryTimeTickChannelName})
 
 	return &serviceTimeNode{
 		baseNode:          baseNode,
diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go
index 4bcd10052..5e00e4f09 100644
--- a/internal/querynode/load_service_test.go
+++ b/internal/querynode/load_service_test.go
@@ -130,10 +130,10 @@ import (
 //
 //	insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
 //	insertStream.SetPulsarClient(Params.PulsarAddress)
-//	insertStream.AsProducer(insertChannels)
+//	insertStream.CreatePulsarProducers(insertChannels)
 //	ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
 //	ddStream.SetPulsarClient(Params.PulsarAddress)
-//	ddStream.AsProducer(ddChannels)
+//	ddStream.CreatePulsarProducers(ddChannels)
 //
 //	var insertMsgStream msgstream.MsgStream = insertStream
 //	insertMsgStream.Start()
@@ -206,7 +206,7 @@ import (
 //	}
 //	searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
 //	searchStream.SetPulsarClient(Params.PulsarAddress)
-//	searchStream.AsProducer(newSearchChannelNames)
+//	searchStream.CreatePulsarProducers(newSearchChannelNames)
 //	searchStream.Start()
 //	err = searchStream.Produce(fn(1))
 //	assert.NoError(t, err)
@@ -215,7 +215,7 @@ import (
 //	searchResultStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
 //	searchResultStream.SetPulsarClient(Params.PulsarAddress)
 //	unmarshalDispatcher := util.NewUnmarshalDispatcher()
-//	searchResultStream.AsConsumer(newSearchResultChannelNames, "loadIndexTestSubSearchResult", unmarshalDispatcher, receiveBufSize)
+//	searchResultStream.CreatePulsarConsumers(newSearchResultChannelNames, "loadIndexTestSubSearchResult", unmarshalDispatcher, receiveBufSize)
 //	searchResultStream.Start()
 //	searchResult := searchResultStream.Consume()
 //	assert.NotNil(t, searchResult)
@@ -295,7 +295,7 @@ import (
 //	// init message stream consumer and do checks
 //	statsMs := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
 //	statsMs.SetPulsarClient(Params.PulsarAddress)
-//	statsMs.AsConsumer([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
+//	statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
 //	statsMs.Start()
 //
 //	findFiledStats := false
@@ -464,10 +464,10 @@ import (
 //
 //	insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
 //	insertStream.SetPulsarClient(Params.PulsarAddress)
-//	insertStream.AsProducer(insertChannels)
+//	insertStream.CreatePulsarProducers(insertChannels)
 //	ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
 //	ddStream.SetPulsarClient(Params.PulsarAddress)
-//	ddStream.AsProducer(ddChannels)
+//	ddStream.CreatePulsarProducers(ddChannels)
 //
 //	var insertMsgStream msgstream.MsgStream = insertStream
 //	insertMsgStream.Start()
@@ -529,7 +529,7 @@ import (
 //	}
 //	searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
 //	searchStream.SetPulsarClient(Params.PulsarAddress)
-//	searchStream.AsProducer(newSearchChannelNames)
+//	searchStream.CreatePulsarProducers(newSearchChannelNames)
 //	searchStream.Start()
 //	err = searchStream.Produce(fn(1))
 //	assert.NoError(t, err)
@@ -538,7 +538,7 @@ import (
 //	searchResultStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize)
 //	searchResultStream.SetPulsarClient(Params.PulsarAddress)
 //	unmarshalDispatcher := util.NewUnmarshalDispatcher()
-//	searchResultStream.AsConsumer(newSearchResultChannelNames, "loadIndexTestSubSearchResult2", unmarshalDispatcher, receiveBufSize)
+//	searchResultStream.CreatePulsarConsumers(newSearchResultChannelNames, "loadIndexTestSubSearchResult2", unmarshalDispatcher, receiveBufSize)
 //	searchResultStream.Start()
 //	searchResult := searchResultStream.Consume()
 //	assert.NotNil(t, searchResult)
@@ -612,7 +612,7 @@ import (
 //	// init message stream consumer and do checks
 //	statsMs := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, Params.StatsReceiveBufSize)
 //	statsMs.SetPulsarClient(Params.PulsarAddress)
-//	statsMs.AsConsumer([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
+//	statsMs.CreatePulsarConsumers([]string{Params.StatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), Params.StatsReceiveBufSize)
 //	statsMs.Start()
 //
 //	findFiledStats := false
@@ -1016,13 +1016,15 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID,
 	ddChannels := Params.DDChannelNames
 	pulsarURL := Params.PulsarAddress
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	insertStream, _ := factory.NewMsgStream(ctx)
-	insertStream.AsProducer(insertChannels)
-	insertStream.AsConsumer(insertChannels, Params.MsgChannelSubName)
+	factory := msgstream.ProtoUDFactory{}
+	insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	insertStream.SetPulsarClient(pulsarURL)
+	insertStream.CreatePulsarProducers(insertChannels)
+	insertStream.CreatePulsarConsumers(insertChannels, Params.MsgChannelSubName)
 
-	ddStream, _ := factory.NewMsgStream(ctx)
-	ddStream.AsProducer(ddChannels)
+	ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	ddStream.SetPulsarClient(pulsarURL)
+	ddStream.CreatePulsarProducers(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
 	insertMsgStream.Start()
@@ -1074,13 +1076,15 @@ func sentTimeTick(ctx context.Context) error {
 	ddChannels := Params.DDChannelNames
 	pulsarURL := Params.PulsarAddress
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	insertStream, _ := factory.NewMsgStream(ctx)
-	insertStream.AsProducer(insertChannels)
-	insertStream.AsConsumer(insertChannels, Params.MsgChannelSubName)
+	factory := msgstream.ProtoUDFactory{}
+	insertStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	insertStream.SetPulsarClient(pulsarURL)
+	insertStream.CreatePulsarProducers(insertChannels)
+	insertStream.CreatePulsarConsumers(insertChannels, Params.MsgChannelSubName)
 
-	ddStream, _ := factory.NewMsgStream(ctx)
-	ddStream.AsProducer(ddChannels)
+	ddStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	ddStream.SetPulsarClient(pulsarURL)
+	ddStream.CreatePulsarProducers(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
 	insertMsgStream.Start()
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index ff9545780..191dab46e 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -333,11 +333,11 @@ func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*co
 	// add request channel
 	consumeChannels := []string{in.RequestChannelID}
 	consumeSubName := Params.MsgChannelSubName
-	searchStream.AsConsumer(consumeChannels, consumeSubName)
+	searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
 
 	// add result channel
 	producerChannels := []string{in.ResultChannelID}
-	resultStream.AsProducer(producerChannels)
+	resultStream.CreatePulsarProducers(producerChannels)
 
 	status := &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,
@@ -382,12 +382,12 @@ func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest
 	consumeChannels := []string{in.RequestChannelID}
 	consumeSubName := Params.MsgChannelSubName
 	// TODO: searchStream.RemovePulsarConsumers(producerChannels)
-	searchStream.AsConsumer(consumeChannels, consumeSubName)
+	searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
 
 	// remove result channel
 	producerChannels := []string{in.ResultChannelID}
 	// TODO: resultStream.RemovePulsarProducer(producerChannels)
-	resultStream.AsProducer(producerChannels)
+	resultStream.CreatePulsarProducers(producerChannels)
 
 	status := &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,
@@ -420,7 +420,7 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*com
 	// add request channel
 	consumeChannels := in.ChannelIDs
 	consumeSubName := Params.MsgChannelSubName
-	fgDMMsgStream.AsConsumer(consumeChannels, consumeSubName)
+	fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
 
 	status := &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,
diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go
index c44e9406d..217a2c881 100644
--- a/internal/querynode/search_service.go
+++ b/internal/querynode/search_service.go
@@ -47,17 +47,19 @@ func newSearchService(ctx context.Context, replica collectionReplica) *searchSer
 
 	msgStreamURL := Params.PulsarAddress
 
-	factory := pulsarms.NewFactory(msgStreamURL, receiveBufSize, pulsarBufSize)
+	factory := msgstream.ProtoUDFactory{}
 
 	consumeChannels := Params.SearchChannelNames
 	consumeSubName := Params.MsgChannelSubName
-	searchStream, _ := factory.NewMsgStream(ctx)
-	searchStream.AsConsumer(consumeChannels, consumeSubName)
+	searchStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, pulsarBufSize, factory.NewUnmarshalDispatcher())
+	searchStream.SetPulsarClient(msgStreamURL)
+	searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName)
 	var inputStream msgstream.MsgStream = searchStream
 
 	producerChannels := Params.SearchResultChannelNames
-	searchResultStream, _ := factory.NewMsgStream(ctx)
-	searchResultStream.AsProducer(producerChannels)
+	searchResultStream := pulsarms.NewPulsarMsgStream(ctx, receiveBufSize, pulsarBufSize, factory.NewUnmarshalDispatcher())
+	searchResultStream.SetPulsarClient(msgStreamURL)
+	searchResultStream.CreatePulsarProducers(producerChannels)
 	var outputStream msgstream.MsgStream = searchResultStream
 
 	searchServiceCtx, searchServiceCancel := context.WithCancel(ctx)
diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go
index ca33a0ffc..759d4e23b 100644
--- a/internal/querynode/search_service_test.go
+++ b/internal/querynode/search_service_test.go
@@ -93,9 +93,10 @@ func TestSearch_Search(t *testing.T) {
 	msgPackSearch := msgstream.MsgPack{}
 	msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg)
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	searchStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
-	searchStream.AsProducer(searchProducerChannels)
+	factory := msgstream.ProtoUDFactory{}
+	searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	searchStream.SetPulsarClient(pulsarURL)
+	searchStream.CreatePulsarProducers(searchProducerChannels)
 	searchStream.Start()
 	err = searchStream.Produce(&msgPackSearch)
 	assert.NoError(t, err)
@@ -179,11 +180,13 @@ func TestSearch_Search(t *testing.T) {
 	insertChannels := Params.InsertChannelNames
 	ddChannels := Params.DDChannelNames
 
-	insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
-	insertStream.AsProducer(insertChannels)
+	insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	insertStream.SetPulsarClient(pulsarURL)
+	insertStream.CreatePulsarProducers(insertChannels)
 
-	ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
-	ddStream.AsProducer(ddChannels)
+	ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	ddStream.SetPulsarClient(pulsarURL)
+	ddStream.CreatePulsarProducers(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
 	insertMsgStream.Start()
@@ -283,9 +286,10 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
 	msgPackSearch := msgstream.MsgPack{}
 	msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg)
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	searchStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
-	searchStream.AsProducer(searchProducerChannels)
+	factory := msgstream.ProtoUDFactory{}
+	searchStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	searchStream.SetPulsarClient(pulsarURL)
+	searchStream.CreatePulsarProducers(searchProducerChannels)
 	searchStream.Start()
 	err = searchStream.Produce(&msgPackSearch)
 	assert.NoError(t, err)
@@ -373,11 +377,13 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
 	insertChannels := Params.InsertChannelNames
 	ddChannels := Params.DDChannelNames
 
-	insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
-	insertStream.AsProducer(insertChannels)
+	insertStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	insertStream.SetPulsarClient(pulsarURL)
+	insertStream.CreatePulsarProducers(insertChannels)
 
-	ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
-	ddStream.AsProducer(ddChannels)
+	ddStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	ddStream.SetPulsarClient(pulsarURL)
+	ddStream.CreatePulsarProducers(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
 	insertMsgStream.Start()
diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go
index 9608128e8..872476ae9 100644
--- a/internal/querynode/stats_service.go
+++ b/internal/querynode/stats_service.go
@@ -36,13 +36,16 @@ func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsC
 
 func (sService *statsService) start() {
 	sleepTimeInterval := Params.StatsPublishInterval
+	receiveBufSize := Params.StatsReceiveBufSize
 
 	// start pulsar
+	msgStreamURL := Params.PulsarAddress
 	producerChannels := []string{Params.StatsChannelName}
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.StatsReceiveBufSize, 1024)
-	statsStream, _ := factory.NewMsgStream(sService.ctx)
-	statsStream.AsProducer(producerChannels)
+	factory := msgstream.ProtoUDFactory{}
+	statsStream := pulsarms.NewPulsarMsgStream(sService.ctx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	statsStream.SetPulsarClient(msgStreamURL)
+	statsStream.CreatePulsarProducers(producerChannels)
 
 	var statsMsgStream msgstream.MsgStream = statsStream
 
diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go
index d21230fd0..0ddfef1d5 100644
--- a/internal/querynode/stats_service_test.go
+++ b/internal/querynode/stats_service_test.go
@@ -3,7 +3,6 @@ package querynode
 import (
 	"testing"
 
-	"github.com/stretchr/testify/assert"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 )
@@ -27,10 +26,11 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
 	producerChannels := []string{Params.StatsChannelName}
 
 	pulsarURL := Params.PulsarAddress
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	statsStream, err := factory.NewMsgStream(node.queryNodeLoopCtx)
-	assert.Nil(t, err)
-	statsStream.AsProducer(producerChannels)
+
+	factory := msgstream.ProtoUDFactory{}
+	statsStream := pulsarms.NewPulsarMsgStream(node.queryNodeLoopCtx, receiveBufSize, 1024, factory.NewUnmarshalDispatcher())
+	statsStream.SetPulsarClient(pulsarURL)
+	statsStream.CreatePulsarProducers(producerChannels)
 
 	var statsMsgStream msgstream.MsgStream = statsStream
 
diff --git a/internal/queryservice/load_test.go b/internal/queryservice/load_test.go
index 8af9a6289..9d73d5168 100644
--- a/internal/queryservice/load_test.go
+++ b/internal/queryservice/load_test.go
@@ -228,10 +228,10 @@ func TestLoadCollection(t *testing.T) {
 	//
 	//insertStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize)
 	//insertStream.SetPulsarClient(pulsarAddress)
-	//insertStream.AsProducer(insertChannels)
+	//insertStream.CreatePulsarProducers(insertChannels)
 	//ddStream := pulsarms.NewPulsarMsgStream(context.Background(), receiveBufSize)
 	//ddStream.SetPulsarClient(pulsarAddress)
-	//ddStream.AsProducer(ddChannels)
+	//ddStream.CreatePulsarProducers(ddChannels)
 	//
 	//var insertMsgStream msgstream.MsgStream = insertStream
 	//insertMsgStream.Start()
@@ -246,7 +246,7 @@ func TestLoadCollection(t *testing.T) {
 	//consumeStream := pulsarms.NewPulsarTtMsgStream(context.Background(), receiveBufSize)
 	//consumeStream.SetPulsarClient(pulsarAddress)
 	//unmarshalDispatcher := util.NewUnmarshalDispatcher()
-	//consumeStream.AsConsumer(insertChannels, "test", unmarshalDispatcher, pulsarBufSize)
+	//consumeStream.CreatePulsarConsumers(insertChannels, "test", unmarshalDispatcher, pulsarBufSize)
 	//consumeStream.Start()
 	//
 	//for i := 0; i < 10; i++ {
-- 
GitLab