From dc3736281add491d9ba93daec7172e305166567a Mon Sep 17 00:00:00 2001
From: groot <yihua.mo@zilliz.com>
Date: Mon, 8 Feb 2021 14:30:54 +0800
Subject: [PATCH] Integrate message stream

Signed-off-by: groot <yihua.mo@zilliz.com>
---
 cmd/datanode/main.go                          |  5 +-
 cmd/dataservice/main.go                       |  5 +-
 cmd/distributed/components/data_node.go       |  5 +-
 cmd/distributed/components/data_service.go    |  5 +-
 cmd/distributed/components/master_service.go  |  5 +-
 cmd/distributed/components/proxy_node.go      |  6 ++-
 cmd/distributed/components/proxy_service.go   |  6 ++-
 cmd/distributed/components/query_node.go      |  5 +-
 cmd/distributed/components/query_service.go   |  5 +-
 cmd/distributed/main.go                       |  2 +-
 cmd/distributed/roles/roles.go                | 33 +++++++++----
 cmd/masterservice/main.go                     |  4 +-
 cmd/proxy/node/proxy_node.go                  |  4 +-
 cmd/proxy/service/proxy_service.go            |  4 +-
 cmd/querynode/querynode.go                    |  4 +-
 cmd/queryservice/queryservice.go              |  5 +-
 cmd/singlenode/main.go                        |  2 +-
 go.mod                                        |  1 +
 internal/datanode/data_node.go                |  8 +++-
 internal/datanode/data_sync_service.go        | 20 ++++++--
 internal/datanode/data_sync_service_test.go   | 17 ++++---
 .../datanode/flow_graph_insert_buffer_node.go |  5 +-
 .../flow_graph_insert_buffer_node_test.go     | 13 +++++-
 .../flow_graph_msg_stream_input_node.go       |  7 +--
 internal/dataservice/server.go                | 33 +++++++------
 internal/distributed/datanode/service.go      | 12 +++--
 .../distributed/dataservice/grpc_service.go   |  5 +-
 .../masterservice/masterservice_test.go       |  4 +-
 internal/distributed/masterservice/server.go  |  6 ++-
 internal/distributed/proxynode/service.go     |  8 ++--
 internal/distributed/proxyservice/service.go  |  5 +-
 internal/distributed/querynode/service.go     |  5 +-
 internal/distributed/queryservice/service.go  |  8 +++-
 internal/masterservice/master_service.go      | 30 ++++++++----
 internal/masterservice/master_service_test.go | 19 +++++---
 internal/msgstream/msgstream.go               |  1 +
 internal/msgstream/pulsarms/factory.go        | 27 +++++++----
 .../msgstream/pulsarms/pulsar_msgstream.go    |  2 +-
 .../pulsarms/pulsar_msgstream_test.go         |  2 +-
 internal/msgstream/rmqms/factory.go           | 26 +++++++++--
 internal/msgstream/rmqms/rmq_msgstream.go     |  2 +-
 internal/proxynode/insert_channels.go         |  7 ++-
 internal/proxynode/proxy_node.go              | 30 +++++++-----
 internal/proxynode/task_scheduler.go          | 10 ++--
 internal/proxynode/timetick.go                |  9 ++--
 internal/proxynode/timetick_test.go           |  4 +-
 internal/proxyservice/impl.go                 | 19 +++++---
 internal/proxyservice/proxyservice.go         | 11 +++--
 internal/querynode/data_sync_service.go       | 17 +++----
 internal/querynode/data_sync_service_test.go  | 17 +++++--
 .../flow_graph_msg_stream_input_nodes.go      |  9 +---
 .../querynode/flow_graph_service_time_node.go |  4 +-
 internal/querynode/load_service_test.go       | 36 +++++++++++----
 internal/querynode/query_node.go              | 28 +++++++++--
 internal/querynode/query_node_test.go         |  4 +-
 internal/querynode/search_service.go          |  8 +---
 internal/querynode/search_service_test.go     | 46 ++++++++++++-------
 internal/querynode/stats_service.go           |  9 ++--
 internal/querynode/stats_service_test.go      | 23 ++++++++--
 internal/queryservice/queryservice.go         |  8 +++-
 internal/queryservice/queryservice_test.go    |  7 ++-
 61 files changed, 447 insertions(+), 230 deletions(-)

diff --git a/cmd/datanode/main.go b/cmd/datanode/main.go
index 7bde774e9..42e8de857 100644
--- a/cmd/datanode/main.go
+++ b/cmd/datanode/main.go
@@ -8,6 +8,7 @@ import (
 	"syscall"
 
 	distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 )
 
 func main() {
@@ -15,7 +16,9 @@ func main() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	dn, err := distributed.NewDataNode(ctx)
+	msFactory := pulsarms.NewFactory()
+
+	dn, err := distributed.NewDataNode(ctx, msFactory)
 	if err != nil {
 		panic(err)
 	}
diff --git a/cmd/dataservice/main.go b/cmd/dataservice/main.go
index d7d317836..ad1c81e61 100644
--- a/cmd/dataservice/main.go
+++ b/cmd/dataservice/main.go
@@ -8,12 +8,15 @@ import (
 	"syscall"
 
 	"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 )
 
 func main() {
 	ctx, cancel := context.WithCancel(context.Background())
 
-	svr, err := components.NewDataService(ctx)
+	msFactory := pulsarms.NewFactory()
+
+	svr, err := components.NewDataService(ctx, msFactory)
 	if err != nil {
 		panic(err)
 	}
diff --git a/cmd/distributed/components/data_node.go b/cmd/distributed/components/data_node.go
index 9a9ae3836..1b372cdc6 100644
--- a/cmd/distributed/components/data_node.go
+++ b/cmd/distributed/components/data_node.go
@@ -12,6 +12,7 @@ import (
 	dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
 	msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
 
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 )
@@ -24,12 +25,12 @@ type DataNode struct {
 	dataService   *dsc.Client
 }
 
-func NewDataNode(ctx context.Context) (*DataNode, error) {
+func NewDataNode(ctx context.Context, factory msgstream.Factory) (*DataNode, error) {
 
 	const retry = 10
 	const interval = 200
 
-	svr, err := dnc.New(ctx)
+	svr, err := dnc.New(ctx, factory)
 	if err != nil {
 		panic(err)
 	}
diff --git a/cmd/distributed/components/data_service.go b/cmd/distributed/components/data_service.go
index c1b2ee4f2..1205db71a 100644
--- a/cmd/distributed/components/data_service.go
+++ b/cmd/distributed/components/data_service.go
@@ -9,6 +9,7 @@ import (
 
 	ms "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
 	"github.com/zilliztech/milvus-distributed/internal/masterservice"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 
 	"github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
@@ -20,8 +21,8 @@ type DataService struct {
 	masterClient *ms.GrpcClient
 }
 
-func NewDataService(ctx context.Context) (*DataService, error) {
-	service := dataservice.NewGrpcService(ctx)
+func NewDataService(ctx context.Context, factory msgstream.Factory) (*DataService, error) {
+	service := dataservice.NewGrpcService(ctx, factory)
 
 	masterservice.Params.Init()
 	client, err := ms.NewGrpcClient(fmt.Sprintf("%s:%d", masterservice.Params.Address, masterservice.Params.Port), 30*time.Second)
diff --git a/cmd/distributed/components/master_service.go b/cmd/distributed/components/master_service.go
index 9d90c7e13..562171643 100644
--- a/cmd/distributed/components/master_service.go
+++ b/cmd/distributed/components/master_service.go
@@ -15,6 +15,7 @@ import (
 	qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
 	is "github.com/zilliztech/milvus-distributed/internal/indexservice"
 	ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
@@ -30,10 +31,10 @@ type MasterService struct {
 	queryService *qsc.Client
 }
 
-func NewMasterService(ctx context.Context) (*MasterService, error) {
+func NewMasterService(ctx context.Context, factory msgstream.Factory) (*MasterService, error) {
 	const reTryCnt = 3
 
-	svr, err := msc.NewGrpcServer(ctx)
+	svr, err := msc.NewGrpcServer(ctx, factory)
 	if err != nil {
 		return nil, err
 	}
diff --git a/cmd/distributed/components/proxy_node.go b/cmd/distributed/components/proxy_node.go
index da6c5d97c..a3b47cedd 100644
--- a/cmd/distributed/components/proxy_node.go
+++ b/cmd/distributed/components/proxy_node.go
@@ -3,6 +3,8 @@ package components
 import (
 	"context"
 
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+
 	grpcproxynode "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode"
 )
 
@@ -10,9 +12,9 @@ type ProxyNode struct {
 	svr *grpcproxynode.Server
 }
 
-func NewProxyNode(ctx context.Context) (*ProxyNode, error) {
+func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, error) {
 	n := &ProxyNode{}
-	svr, err := grpcproxynode.NewServer(ctx)
+	svr, err := grpcproxynode.NewServer(ctx, factory)
 	if err != nil {
 		return nil, err
 	}
diff --git a/cmd/distributed/components/proxy_service.go b/cmd/distributed/components/proxy_service.go
index f044ae246..4581e729d 100644
--- a/cmd/distributed/components/proxy_service.go
+++ b/cmd/distributed/components/proxy_service.go
@@ -3,6 +3,8 @@ package components
 import (
 	"context"
 
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+
 	grpcproxyservice "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
 )
 
@@ -10,9 +12,9 @@ type ProxyService struct {
 	svr *grpcproxyservice.Server
 }
 
-func NewProxyService(ctx context.Context) (*ProxyService, error) {
+func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyService, error) {
 	service := &ProxyService{}
-	svr, err := grpcproxyservice.NewServer(ctx)
+	svr, err := grpcproxyservice.NewServer(ctx, factory)
 	if err != nil {
 		return nil, err
 	}
diff --git a/cmd/distributed/components/query_node.go b/cmd/distributed/components/query_node.go
index c9009f282..621c3a5f0 100644
--- a/cmd/distributed/components/query_node.go
+++ b/cmd/distributed/components/query_node.go
@@ -11,6 +11,7 @@ import (
 	msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
 	qns "github.com/zilliztech/milvus-distributed/internal/distributed/querynode"
 	qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 
 	ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
 	is "github.com/zilliztech/milvus-distributed/internal/indexservice"
@@ -32,11 +33,11 @@ type QueryNode struct {
 	queryService  *qsc.Client
 }
 
-func NewQueryNode(ctx context.Context) (*QueryNode, error) {
+func NewQueryNode(ctx context.Context, factory msgstream.Factory) (*QueryNode, error) {
 	const retry = 10
 	const interval = 500
 
-	svr, err := qns.NewServer(ctx)
+	svr, err := qns.NewServer(ctx, factory)
 	if err != nil {
 		panic(err)
 	}
diff --git a/cmd/distributed/components/query_service.go b/cmd/distributed/components/query_service.go
index 78337cd10..ca288ffb7 100644
--- a/cmd/distributed/components/query_service.go
+++ b/cmd/distributed/components/query_service.go
@@ -12,6 +12,7 @@ import (
 
 	ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
 	ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/queryservice"
@@ -25,12 +26,12 @@ type QueryService struct {
 	masterService *msc.GrpcClient
 }
 
-func NewQueryService(ctx context.Context) (*QueryService, error) {
+func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryService, error) {
 	const retry = 10
 	const interval = 200
 
 	queryservice.Params.Init()
-	svr, err := qs.NewServer(ctx)
+	svr, err := qs.NewServer(ctx, factory)
 	if err != nil {
 		panic(err)
 	}
diff --git a/cmd/distributed/main.go b/cmd/distributed/main.go
index aacedf752..5e6b50e92 100644
--- a/cmd/distributed/main.go
+++ b/cmd/distributed/main.go
@@ -64,7 +64,7 @@ func run(serverType string) error {
 	default:
 		return errors.Errorf("unknown server type = %s", serverType)
 	}
-	role.Run()
+	role.Run(false)
 	return nil
 }
 
diff --git a/cmd/distributed/roles/roles.go b/cmd/distributed/roles/roles.go
index 5419d9483..ec7036830 100644
--- a/cmd/distributed/roles/roles.go
+++ b/cmd/distributed/roles/roles.go
@@ -9,8 +9,18 @@ import (
 	"syscall"
 
 	"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/rmqms"
 )
 
+func newMsgFactory(localMsg bool) msgstream.Factory {
+	if localMsg {
+		return rmqms.NewFactory()
+	}
+	return pulsarms.NewFactory()
+}
+
 type MilvusRoles struct {
 	EnableMaster           bool `env:"ENABLE_MASTER"`
 	EnableProxyService     bool `env:"ENABLE_PROXY_SERVICE"`
@@ -42,7 +52,7 @@ func (mr *MilvusRoles) EnvValue(env string) bool {
 	return false
 }
 
-func (mr *MilvusRoles) Run() {
+func (mr *MilvusRoles) Run(localMsg bool) {
 	if !mr.HasAnyRole() {
 		log.Printf("set the roles please ...")
 		return
@@ -55,8 +65,9 @@ func (mr *MilvusRoles) Run() {
 	if mr.EnableMaster {
 		log.Print("start as master service")
 		go func() {
+			factory := newMsgFactory(localMsg)
 			var err error
-			masterService, err = components.NewMasterService(ctx)
+			masterService, err = components.NewMasterService(ctx, factory)
 			if err != nil {
 				panic(err)
 			}
@@ -68,8 +79,9 @@ func (mr *MilvusRoles) Run() {
 	if mr.EnableProxyService {
 		log.Print("start as proxy service")
 		go func() {
+			factory := newMsgFactory(localMsg)
 			var err error
-			proxyService, err = components.NewProxyService(ctx)
+			proxyService, err = components.NewProxyService(ctx, factory)
 			if err != nil {
 				panic(err)
 			}
@@ -81,8 +93,9 @@ func (mr *MilvusRoles) Run() {
 	if mr.EnableProxyNode {
 		log.Print("start as proxy node")
 		go func() {
+			factory := newMsgFactory(localMsg)
 			var err error
-			proxyNode, err = components.NewProxyNode(ctx)
+			proxyNode, err = components.NewProxyNode(ctx, factory)
 			if err != nil {
 				panic(err)
 			}
@@ -94,8 +107,9 @@ func (mr *MilvusRoles) Run() {
 	if mr.EnableQueryService {
 		log.Print("start as query service")
 		go func() {
+			factory := newMsgFactory(localMsg)
 			var err error
-			queryService, err = components.NewQueryService(ctx)
+			queryService, err = components.NewQueryService(ctx, factory)
 			if err != nil {
 				panic(err)
 			}
@@ -107,8 +121,9 @@ func (mr *MilvusRoles) Run() {
 	if mr.EnableQueryNode {
 		log.Print("start as query node")
 		go func() {
+			factory := newMsgFactory(localMsg)
 			var err error
-			queryNode, err = components.NewQueryNode(ctx)
+			queryNode, err = components.NewQueryNode(ctx, factory)
 			if err != nil {
 				panic(err)
 			}
@@ -120,8 +135,9 @@ func (mr *MilvusRoles) Run() {
 	if mr.EnableDataService {
 		log.Print("start as data service")
 		go func() {
+			factory := newMsgFactory(localMsg)
 			var err error
-			dataService, err = components.NewDataService(ctx)
+			dataService, err = components.NewDataService(ctx, factory)
 			if err != nil {
 				panic(err)
 			}
@@ -133,8 +149,9 @@ func (mr *MilvusRoles) Run() {
 	if mr.EnableDataNode {
 		log.Print("start as data node")
 		go func() {
+			factory := newMsgFactory(localMsg)
 			var err error
-			dataNode, err = components.NewDataNode(ctx)
+			dataNode, err = components.NewDataNode(ctx, factory)
 			if err != nil {
 				panic(err)
 			}
diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go
index a59d5e1e8..34b146ecd 100644
--- a/cmd/masterservice/main.go
+++ b/cmd/masterservice/main.go
@@ -8,13 +8,15 @@ import (
 	"syscall"
 
 	distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 )
 
 func main() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	ms, err := distributed.NewMasterService(ctx)
+	msFactory := pulsarms.NewFactory()
+	ms, err := distributed.NewMasterService(ctx, msFactory)
 	if err != nil {
 		panic(err)
 	}
diff --git a/cmd/proxy/node/proxy_node.go b/cmd/proxy/node/proxy_node.go
index 9e271920f..e146b9905 100644
--- a/cmd/proxy/node/proxy_node.go
+++ b/cmd/proxy/node/proxy_node.go
@@ -8,13 +8,15 @@ import (
 	"syscall"
 
 	"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 
 	"go.uber.org/zap"
 )
 
 func main() {
 	ctx, cancel := context.WithCancel(context.Background())
-	n, err := components.NewProxyNode(ctx)
+	msFactory := pulsarms.NewFactory()
+	n, err := components.NewProxyNode(ctx, msFactory)
 	if err != nil {
 		log.Print("create server failed", zap.Error(err))
 	}
diff --git a/cmd/proxy/service/proxy_service.go b/cmd/proxy/service/proxy_service.go
index 9086d87b3..fdc1d2171 100644
--- a/cmd/proxy/service/proxy_service.go
+++ b/cmd/proxy/service/proxy_service.go
@@ -8,13 +8,15 @@ import (
 	"syscall"
 
 	"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 
 	"go.uber.org/zap"
 )
 
 func main() {
 	ctx, cancel := context.WithCancel(context.Background())
-	s, err := components.NewProxyService(ctx)
+	msFactory := pulsarms.NewFactory()
+	s, err := components.NewProxyService(ctx, msFactory)
 	if err != nil {
 		log.Fatal("create proxy service error: " + err.Error())
 	}
diff --git a/cmd/querynode/querynode.go b/cmd/querynode/querynode.go
index 86e984cff..179f5618c 100644
--- a/cmd/querynode/querynode.go
+++ b/cmd/querynode/querynode.go
@@ -8,13 +8,15 @@ import (
 	"syscall"
 
 	distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 )
 
 func main() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	svr, err := distributed.NewQueryNode(ctx)
+	msFactory := pulsarms.NewFactory()
+	svr, err := distributed.NewQueryNode(ctx, msFactory)
 
 	if err != nil {
 		panic(err)
diff --git a/cmd/queryservice/queryservice.go b/cmd/queryservice/queryservice.go
index 1390e307f..1cc29da53 100644
--- a/cmd/queryservice/queryservice.go
+++ b/cmd/queryservice/queryservice.go
@@ -8,13 +8,16 @@ import (
 	"syscall"
 
 	distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 )
 
 func main() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	svr, err := distributed.NewQueryService(ctx)
+	msFactory := pulsarms.NewFactory()
+
+	svr, err := distributed.NewQueryService(ctx, msFactory)
 	if err != nil {
 		panic(err)
 	}
diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go
index 40265fd14..c597ab4fd 100644
--- a/cmd/singlenode/main.go
+++ b/cmd/singlenode/main.go
@@ -20,5 +20,5 @@ func initRoles(roles *roles.MilvusRoles) {
 func main() {
 	var roles roles.MilvusRoles
 	initRoles(&roles)
-	roles.Run()
+	roles.Run(false)
 }
diff --git a/go.mod b/go.mod
index dd5cf6093..0f160674e 100644
--- a/go.mod
+++ b/go.mod
@@ -27,6 +27,7 @@ require (
 	github.com/klauspost/compress v1.10.11 // indirect
 	github.com/kr/text v0.2.0 // indirect
 	github.com/minio/minio-go/v7 v7.0.5
+	github.com/mitchellh/mapstructure v1.1.2
 	github.com/modern-go/reflect2 v1.0.1
 	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
 	github.com/oklog/run v1.1.0
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index b97e15db5..c7eb4696e 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -70,10 +71,12 @@ type (
 		replica   Replica
 
 		closer io.Closer
+
+		msFactory msgstream.Factory
 	}
 )
 
-func NewDataNode(ctx context.Context) *DataNode {
+func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
 
 	Params.Init()
 	ctx2, cancel2 := context.WithCancel(ctx)
@@ -89,6 +92,7 @@ func NewDataNode(ctx context.Context) *DataNode {
 		masterService:   nil,
 		dataService:     nil,
 		replica:         nil,
+		msFactory:       factory,
 	}
 
 	node.State.Store(internalpb2.StateCode_INITIALIZING)
@@ -165,7 +169,7 @@ func (node *DataNode) Init() error {
 	chanSize := 100
 	node.flushChan = make(chan *flushMsg, chanSize)
 
-	node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc)
+	node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc, node.msFactory)
 	node.dataSyncService.init()
 	node.metaService = newMetaService(node.ctx, replica, node.masterService)
 
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 0e7146dfb..d883b3003 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -5,6 +5,7 @@ import (
 	"log"
 
 	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
 	"go.etcd.io/etcd/clientv3"
 )
@@ -15,16 +16,18 @@ type dataSyncService struct {
 	flushChan   chan *flushMsg
 	replica     Replica
 	idAllocator allocator
+	msFactory   msgstream.Factory
 }
 
 func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
-	replica Replica, alloc allocator) *dataSyncService {
+	replica Replica, alloc allocator, factory msgstream.Factory) *dataSyncService {
 	service := &dataSyncService{
 		ctx:         ctx,
 		fg:          nil,
 		flushChan:   flushChan,
 		replica:     replica,
 		idAllocator: alloc,
+		msFactory:   factory,
 	}
 	return service
 }
@@ -65,12 +68,21 @@ func (dsService *dataSyncService) initNodes() {
 
 	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
 
-	var dmStreamNode Node = newDmInputNode(dsService.ctx)
-	var ddStreamNode Node = newDDInputNode(dsService.ctx)
+	m := map[string]interface{}{
+		"PulsarAddress":  Params.PulsarAddress,
+		"ReceiveBufSize": 1024,
+		"PulsarBufSize":  1024}
+	err = dsService.msFactory.SetParams(m)
+	if err != nil {
+		panic(err)
+	}
+
+	var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory)
+	var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory)
 
 	var filterDmNode Node = newFilteredDmNode()
 	var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator)
-	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator)
+	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator, dsService.msFactory)
 	var gcNode Node = newGCNode(dsService.replica)
 
 	dsService.fg.AddNode(&dmStreamNode)
diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go
index 7973aecb7..ffa8d893e 100644
--- a/internal/datanode/data_sync_service_test.go
+++ b/internal/datanode/data_sync_service_test.go
@@ -39,7 +39,13 @@ func TestDataSyncService_Start(t *testing.T) {
 	flushChan := make(chan *flushMsg, chanSize)
 	replica := newReplica()
 	allocFactory := AllocatorFactory{}
-	sync := newDataSyncService(ctx, flushChan, replica, allocFactory)
+	msFactory := pulsarms.NewFactory()
+	m := map[string]interface{}{
+		"pulsarAddress":  pulsarURL,
+		"receiveBufSize": 1024,
+		"pulsarBufSize":  1024}
+	err := msFactory.SetParams(m)
+	sync := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory)
 	sync.replica.addCollection(collMeta.ID, collMeta.Schema)
 	sync.init()
 	go sync.start()
@@ -78,15 +84,14 @@ func TestDataSyncService_Start(t *testing.T) {
 	timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
 
 	// pulsar produce
-	const receiveBufSize = 1024
 	insertChannels := Params.InsertChannelNames
 	ddChannels := Params.DDChannelNames
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	insertStream, _ := factory.NewMsgStream(ctx)
+	assert.NoError(t, err)
+	insertStream, _ := msFactory.NewMsgStream(ctx)
 	insertStream.AsProducer(insertChannels)
 
-	ddStream, _ := factory.NewMsgStream(ctx)
+	ddStream, _ := msFactory.NewMsgStream(ctx)
 	ddStream.AsProducer(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
@@ -95,7 +100,7 @@ func TestDataSyncService_Start(t *testing.T) {
 	var ddMsgStream msgstream.MsgStream = ddStream
 	ddMsgStream.Start()
 
-	err := insertMsgStream.Produce(&msgPack)
+	err = insertMsgStream.Produce(&msgPack)
 	assert.NoError(t, err)
 
 	err = insertMsgStream.Broadcast(&timeTickMsgPack)
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index eac2177df..663328094 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -15,7 +15,6 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/kv"
 	miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
@@ -621,7 +620,7 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (
 }
 
 func newInsertBufferNode(ctx context.Context, flushMeta *metaTable,
-	replica Replica, alloc allocator) *insertBufferNode {
+	replica Replica, alloc allocator, factory msgstream.Factory) *insertBufferNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
@@ -651,8 +650,6 @@ func newInsertBufferNode(ctx context.Context, flushMeta *metaTable,
 	}
 	minioPrefix := Params.InsertBinlogRootPath
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-
 	//input stream, data node time tick
 	wTt, _ := factory.NewMsgStream(ctx)
 	wTt.AsProducer([]string{Params.TimeTickChannelName})
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 12c776b98..527f7d577 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -8,7 +8,9 @@ import (
 
 	"github.com/stretchr/testify/require"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
 )
 
@@ -39,7 +41,16 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	require.NoError(t, err)
 
 	idFactory := AllocatorFactory{}
-	iBNode := newInsertBufferNode(ctx, newMetaTable(), replica, idFactory)
+
+	msFactory := pulsarms.NewFactory()
+	m := map[string]interface{}{
+		"receiveBufSize": 1024,
+		"pulsarAddress":  Params.PulsarAddress,
+		"pulsarBufSize":  1024}
+	err = msFactory.SetParams(m)
+	assert.Nil(t, err)
+
+	iBNode := newInsertBufferNode(ctx, newMetaTable(), replica, idFactory, msFactory)
 	inMsg := genInsertMsg()
 	var iMsg flowgraph.Msg = &inMsg
 	iBNode.Operate([]*flowgraph.Msg{&iMsg})
diff --git a/internal/datanode/flow_graph_msg_stream_input_node.go b/internal/datanode/flow_graph_msg_stream_input_node.go
index 0476ce4d6..96b5c2093 100644
--- a/internal/datanode/flow_graph_msg_stream_input_node.go
+++ b/internal/datanode/flow_graph_msg_stream_input_node.go
@@ -4,18 +4,16 @@ import (
 	"context"
 
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
 )
 
-func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
+func newDmInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode {
 
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 	consumeChannels := Params.InsertChannelNames
 	consumeSubName := Params.MsgChannelSubName
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
 	insertStream, _ := factory.NewTtMsgStream(ctx)
 	insertStream.AsConsumer(consumeChannels, consumeSubName)
 
@@ -24,13 +22,12 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
 	return node
 }
 
-func newDDInputNode(ctx context.Context) *flowgraph.InputNode {
+func newDDInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode {
 
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 	consumeSubName := Params.MsgChannelSubName
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
 	tmpStream, _ := factory.NewTtMsgStream(ctx)
 	tmpStream.AsConsumer(Params.DDChannelNames, consumeSubName)
 
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index 5b7921267..318b104de 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -17,7 +17,6 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
 
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 
@@ -94,17 +93,19 @@ type (
 		ddChannelName     string
 		segmentInfoStream msgstream.MsgStream
 		insertChannels    []string
+		msFactory         msgstream.Factory
 		ttBarrier         timesync.TimeTickBarrier
 	}
 )
 
-func CreateServer(ctx context.Context) (*Server, error) {
+func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
 	Params.Init()
 	ch := make(chan struct{})
 	s := &Server{
 		ctx:              ctx,
 		registerFinishCh: ch,
 		cluster:          newDataNodeCluster(ch),
+		msFactory:        factory,
 	}
 	s.insertChannels = s.getInsertChannels()
 	s.state.Store(internalpb2.StateCode_INITIALIZING)
@@ -130,6 +131,15 @@ func (s *Server) Init() error {
 
 func (s *Server) Start() error {
 	var err error
+	m := map[string]interface{}{
+		"PulsarAddress":  Params.PulsarAddress,
+		"ReceiveBufSize": 1024,
+		"PulsarBufSize":  1024}
+	err = s.msFactory.SetParams(m)
+	if err != nil {
+		return err
+	}
+
 	s.allocator = newAllocatorImpl(s.masterClient)
 	if err = s.initMeta(); err != nil {
 		return err
@@ -171,23 +181,21 @@ func (s *Server) initMeta() error {
 }
 
 func (s *Server) initSegmentInfoChannel() {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	segmentInfoStream, _ := factory.NewMsgStream(s.ctx)
+	segmentInfoStream, _ := s.msFactory.NewMsgStream(s.ctx)
 	segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
 	s.segmentInfoStream = segmentInfoStream
 	s.segmentInfoStream.Start()
 }
 func (s *Server) initMsgProducer() error {
 	var err error
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	if s.ttMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
+	if s.ttMsgStream, err = s.msFactory.NewTtMsgStream(s.ctx); err != nil {
 		return err
 	}
 	s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
 	s.ttMsgStream.Start()
 	s.ttBarrier = timesync.NewHardTimeTickBarrier(s.ctx, s.ttMsgStream, s.cluster.GetNodeIDs())
 	s.ttBarrier.Start()
-	if s.k2sMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
+	if s.k2sMsgStream, err = s.msFactory.NewMsgStream(s.ctx); err != nil {
 		return err
 	}
 	s.k2sMsgStream.AsProducer(Params.K2SChannelNames)
@@ -308,8 +316,7 @@ func (s *Server) startServerLoop() {
 
 func (s *Server) startStatsChannel(ctx context.Context) {
 	defer s.serverLoopWg.Done()
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	statsStream, _ := factory.NewMsgStream(ctx)
+	statsStream, _ := s.msFactory.NewMsgStream(ctx)
 	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
 	statsStream.Start()
 	defer statsStream.Close()
@@ -334,8 +341,7 @@ func (s *Server) startStatsChannel(ctx context.Context) {
 
 func (s *Server) startSegmentFlushChannel(ctx context.Context) {
 	defer s.serverLoopWg.Done()
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	flushStream, _ := factory.NewMsgStream(ctx)
+	flushStream, _ := s.msFactory.NewMsgStream(ctx)
 	flushStream.AsConsumer([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName)
 	flushStream.Start()
 	defer flushStream.Close()
@@ -370,8 +376,7 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
 
 func (s *Server) startDDChannel(ctx context.Context) {
 	defer s.serverLoopWg.Done()
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	ddStream, _ := factory.NewMsgStream(ctx)
+	ddStream, _ := s.msFactory.NewMsgStream(ctx)
 	ddStream.AsConsumer([]string{s.ddChannelName}, Params.DataServiceSubscriptionName)
 	ddStream.Start()
 	defer ddStream.Close()
@@ -603,7 +608,7 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
 			Segment: segmentInfo,
 		},
 	}
-	msgPack := &pulsarms.MsgPack{
+	msgPack := &msgstream.MsgPack{
 		Msgs: []msgstream.TsMsg{infoMsg},
 	}
 	if err = s.segmentInfoStream.Produce(msgPack); err != nil {
diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go
index aa3a06717..080fc27eb 100644
--- a/internal/distributed/datanode/service.go
+++ b/internal/distributed/datanode/service.go
@@ -8,6 +8,7 @@ import (
 
 	dn "github.com/zilliztech/milvus-distributed/internal/datanode"
 	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -24,16 +25,19 @@ type Server struct {
 
 	ctx    context.Context
 	cancel context.CancelFunc
+
+	msFactory msgstream.Factory
 }
 
-func New(ctx context.Context) (*Server, error) {
+func New(ctx context.Context, factory msgstream.Factory) (*Server, error) {
 	ctx1, cancel := context.WithCancel(ctx)
 	var s = &Server{
-		ctx:    ctx1,
-		cancel: cancel,
+		ctx:       ctx1,
+		cancel:    cancel,
+		msFactory: factory,
 	}
 
-	s.core = dn.NewDataNode(s.ctx)
+	s.core = dn.NewDataNode(s.ctx, s.msFactory)
 	s.grpcServer = grpc.NewServer()
 	datapb.RegisterDataNodeServer(s.grpcServer, s)
 	addr := dn.Params.IP + ":" + strconv.FormatInt(dn.Params.Port, 10)
diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go
index 13a8fab20..7f3990bf1 100644
--- a/internal/distributed/dataservice/grpc_service.go
+++ b/internal/distributed/dataservice/grpc_service.go
@@ -13,6 +13,7 @@ import (
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -28,11 +29,11 @@ func (s *Service) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInf
 	return s.server.GetSegmentInfo(request)
 }
 
-func NewGrpcService(ctx context.Context) *Service {
+func NewGrpcService(ctx context.Context, factory msgstream.Factory) *Service {
 	s := &Service{}
 	var err error
 	s.ctx = ctx
-	s.server, err = dataservice.CreateServer(s.ctx)
+	s.server, err = dataservice.CreateServer(s.ctx, factory)
 	if err != nil {
 		log.Fatalf("create server error: %s", err.Error())
 		return nil
diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go
index 0a8e75495..f69d7a212 100644
--- a/internal/distributed/masterservice/masterservice_test.go
+++ b/internal/distributed/masterservice/masterservice_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/assert"
 	cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -27,7 +28,8 @@ func TestGrpcService(t *testing.T) {
 	//cms.Params.Address = "127.0.0.1"
 	cms.Params.Port = (randVal % 100) + 10000
 
-	svr, err := NewGrpcServer(context.Background())
+	msFactory := pulsarms.NewFactory()
+	svr, err := NewGrpcServer(context.Background(), msFactory)
 	assert.Nil(t, err)
 
 	// cms.Params.NodeID = 0
diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go
index ebd1b87d2..491b6c5bc 100644
--- a/internal/distributed/masterservice/server.go
+++ b/internal/distributed/masterservice/server.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
@@ -26,13 +27,14 @@ type GrpcServer struct {
 	cancel context.CancelFunc
 }
 
-func NewGrpcServer(ctx context.Context) (*GrpcServer, error) {
+func NewGrpcServer(ctx context.Context, factory msgstream.Factory) (*GrpcServer, error) {
 	s := &GrpcServer{}
 	var err error
 	s.ctx, s.cancel = context.WithCancel(ctx)
-	if s.core, err = cms.NewCore(s.ctx); err != nil {
+	if s.core, err = cms.NewCore(s.ctx, factory); err != nil {
 		return nil, err
 	}
+
 	s.grpcServer = grpc.NewServer()
 	s.grpcError = nil
 	masterpb.RegisterMasterServiceServer(s.grpcServer, s)
diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go
index d69f30782..5963c568f 100644
--- a/internal/distributed/proxynode/service.go
+++ b/internal/distributed/proxynode/service.go
@@ -11,9 +11,9 @@ import (
 
 	grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
 
-	"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
-
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+	"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
 
 	"google.golang.org/grpc"
 
@@ -48,7 +48,7 @@ type Server struct {
 	indexServiceClient  *grpcindexserviceclient.Client
 }
 
-func NewServer(ctx context.Context) (*Server, error) {
+func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
 
 	server := &Server{
 		ctx:         ctx,
@@ -56,7 +56,7 @@ func NewServer(ctx context.Context) (*Server, error) {
 	}
 
 	var err error
-	server.impl, err = proxynode.NewProxyNodeImpl(server.ctx)
+	server.impl, err = proxynode.NewProxyNodeImpl(server.ctx, factory)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go
index 19fe18ecd..e40f5c974 100644
--- a/internal/distributed/proxyservice/service.go
+++ b/internal/distributed/proxyservice/service.go
@@ -7,6 +7,7 @@ import (
 	"strconv"
 	"sync"
 
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
@@ -27,7 +28,7 @@ type Server struct {
 	impl *proxyservice.ServiceImpl
 }
 
-func NewServer(ctx1 context.Context) (*Server, error) {
+func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error) {
 	ctx, cancel := context.WithCancel(ctx1)
 
 	server := &Server{
@@ -37,7 +38,7 @@ func NewServer(ctx1 context.Context) (*Server, error) {
 	}
 
 	var err error
-	server.impl, err = proxyservice.NewServiceImpl(server.ctx)
+	server.impl, err = proxyservice.NewServiceImpl(server.ctx, factory)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go
index 0a0f6e615..2d0d1249c 100644
--- a/internal/distributed/querynode/service.go
+++ b/internal/distributed/querynode/service.go
@@ -9,6 +9,7 @@ import (
 
 	"google.golang.org/grpc"
 
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
@@ -26,10 +27,10 @@ type Server struct {
 	cancel context.CancelFunc
 }
 
-func NewServer(ctx context.Context) (*Server, error) {
+func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
 	s := &Server{
 		ctx:  ctx,
-		node: qn.NewQueryNodeWithoutID(ctx),
+		node: qn.NewQueryNodeWithoutID(ctx, factory),
 	}
 
 	qn.Params.Init()
diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go
index 0fc2208c7..bc2ae5b19 100644
--- a/internal/distributed/queryservice/service.go
+++ b/internal/distributed/queryservice/service.go
@@ -9,6 +9,7 @@ import (
 
 	"google.golang.org/grpc"
 
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
@@ -25,11 +26,13 @@ type Server struct {
 	loopCancel context.CancelFunc
 
 	queryService *qs.QueryService
+
+	msFactory msgstream.Factory
 }
 
-func NewServer(ctx context.Context) (*Server, error) {
+func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
 	ctx1, cancel := context.WithCancel(ctx)
-	service, err := qs.NewQueryService(ctx1)
+	service, err := qs.NewQueryService(ctx1, factory)
 	if err != nil {
 		cancel()
 		return nil, err
@@ -39,6 +42,7 @@ func NewServer(ctx context.Context) (*Server, error) {
 		queryService: service,
 		loopCtx:      ctx1,
 		loopCancel:   cancel,
+		msFactory:    factory,
 	}, nil
 }
 
diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go
index 2862e77df..02095d592 100644
--- a/internal/masterservice/master_service.go
+++ b/internal/masterservice/master_service.go
@@ -12,7 +12,6 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
 	ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
@@ -181,17 +180,20 @@ type Core struct {
 	initOnce  sync.Once
 	startOnce sync.Once
 	isInit    atomic.Value
+
+	msFactory ms.Factory
 }
 
 // --------------------- function --------------------------
 
-func NewCore(c context.Context) (*Core, error) {
+func NewCore(c context.Context, factory ms.Factory) (*Core, error) {
 	ctx, cancel := context.WithCancel(c)
 	rand.Seed(time.Now().UnixNano())
 	Params.Init()
 	core := &Core{
-		ctx:    ctx,
-		cancel: cancel,
+		ctx:       ctx,
+		cancel:    cancel,
+		msFactory: factory,
 	}
 	core.stateCode.Store(internalpb2.StateCode_INITIALIZING)
 	core.isInit.Store(false)
@@ -414,7 +416,6 @@ func (c *Core) tsLoop() {
 	}
 }
 func (c *Core) setMsgStreams() error {
-
 	if Params.PulsarAddress == "" {
 		return errors.Errorf("PulsarAddress is empty")
 	}
@@ -427,8 +428,17 @@ func (c *Core) setMsgStreams() error {
 		return errors.Errorf("ProxyTimeTickChannel is empty")
 	}
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	proxyTimeTickStream, _ := factory.NewMsgStream(c.ctx)
+	var err error
+	m := map[string]interface{}{
+		"PulsarAddress":  Params.PulsarAddress,
+		"ReceiveBufSize": 1024,
+		"PulsarBufSize":  1024}
+	err = c.msFactory.SetParams(m)
+	if err != nil {
+		return err
+	}
+
+	proxyTimeTickStream, _ := c.msFactory.NewMsgStream(c.ctx)
 	proxyTimeTickStream.AsConsumer([]string{Params.ProxyTimeTickChannel}, Params.MsgChannelSubName)
 	proxyTimeTickStream.Start()
 
@@ -436,14 +446,14 @@ func (c *Core) setMsgStreams() error {
 	if Params.TimeTickChannel == "" {
 		return errors.Errorf("TimeTickChannel is empty")
 	}
-	timeTickStream, _ := factory.NewMsgStream(c.ctx)
+	timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx)
 	timeTickStream.AsProducer([]string{Params.TimeTickChannel})
 
 	// master dd channel
 	if Params.DdChannel == "" {
 		return errors.Errorf("DdChannel is empty")
 	}
-	ddStream, _ := factory.NewMsgStream(c.ctx)
+	ddStream, _ := c.msFactory.NewMsgStream(c.ctx)
 	ddStream.AsProducer([]string{Params.DdChannel})
 
 	c.SendTimeTick = func(t typeutil.Timestamp) error {
@@ -577,7 +587,7 @@ func (c *Core) setMsgStreams() error {
 	if Params.DataServiceSegmentChannel == "" {
 		return errors.Errorf("DataServiceSegmentChannel is empty")
 	}
-	dataServiceStream, _ := factory.NewMsgStream(c.ctx)
+	dataServiceStream, _ := c.msFactory.NewMsgStream(c.ctx)
 	dataServiceStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName)
 	dataServiceStream.Start()
 	c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024)
diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go
index de1bebbae..7c1210a8c 100644
--- a/internal/masterservice/master_service_test.go
+++ b/internal/masterservice/master_service_test.go
@@ -148,7 +148,8 @@ func TestMasterService(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	core, err := NewCore(ctx)
+	msFactory := pulsarms.NewFactory()
+	core, err := NewCore(ctx, msFactory)
 	assert.Nil(t, err)
 	randVal := rand.Int()
 
@@ -192,18 +193,24 @@ func TestMasterService(t *testing.T) {
 	err = core.Start()
 	assert.Nil(t, err)
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	proxyTimeTickStream, _ := factory.NewMsgStream(ctx)
+	m := map[string]interface{}{
+		"receiveBufSize": 1024,
+		"pulsarAddress":  Params.PulsarAddress,
+		"pulsarBufSize":  1024}
+	err = msFactory.SetParams(m)
+	assert.Nil(t, err)
+
+	proxyTimeTickStream, _ := msFactory.NewMsgStream(ctx)
 	proxyTimeTickStream.AsProducer([]string{Params.ProxyTimeTickChannel})
 
-	dataServiceSegmentStream, _ := factory.NewMsgStream(ctx)
+	dataServiceSegmentStream, _ := msFactory.NewMsgStream(ctx)
 	dataServiceSegmentStream.AsProducer([]string{Params.DataServiceSegmentChannel})
 
-	timeTickStream, _ := factory.NewMsgStream(ctx)
+	timeTickStream, _ := msFactory.NewMsgStream(ctx)
 	timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName)
 	timeTickStream.Start()
 
-	ddStream, _ := factory.NewMsgStream(ctx)
+	ddStream, _ := msFactory.NewMsgStream(ctx)
 	ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName)
 	ddStream.Start()
 
diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go
index e62e00595..a56ce7463 100644
--- a/internal/msgstream/msgstream.go
+++ b/internal/msgstream/msgstream.go
@@ -37,6 +37,7 @@ type MsgStream interface {
 }
 
 type Factory interface {
+	SetParams(params map[string]interface{}) error
 	NewMsgStream(ctx context.Context) (MsgStream, error)
 	NewTtMsgStream(ctx context.Context) (MsgStream, error)
 }
diff --git a/internal/msgstream/pulsarms/factory.go b/internal/msgstream/pulsarms/factory.go
index 8e10c2e3f..8f5da7e04 100644
--- a/internal/msgstream/pulsarms/factory.go
+++ b/internal/msgstream/pulsarms/factory.go
@@ -3,30 +3,39 @@ package pulsarms
 import (
 	"context"
 
+	"github.com/mitchellh/mapstructure"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 )
 
 type Factory struct {
 	dispatcherFactory msgstream.ProtoUDFactory
-	address           string
-	receiveBufSize    int64
-	pulsarBufSize     int64
+	// the following members must be public, so that mapstructure.Decode() can access them
+	PulsarAddress  string
+	ReceiveBufSize int64
+	PulsarBufSize  int64
+}
+
+func (f *Factory) SetParams(params map[string]interface{}) error {
+	err := mapstructure.Decode(params, f)
+	if err != nil {
+		return err
+	}
+	return nil
 }
 
 func (f *Factory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
-	return newPulsarMsgStream(ctx, f.address, f.receiveBufSize, f.pulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
+	return newPulsarMsgStream(ctx, f.PulsarAddress, f.ReceiveBufSize, f.PulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
 }
 
 func (f *Factory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
-	return NewPulsarTtMsgStream(ctx, f.address, f.receiveBufSize, f.pulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
+	return newPulsarTtMsgStream(ctx, f.PulsarAddress, f.ReceiveBufSize, f.PulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
 }
 
-func NewFactory(address string, receiveBufSize int64, pulsarBufSize int64) *Factory {
+func NewFactory() msgstream.Factory {
 	f := &Factory{
 		dispatcherFactory: msgstream.ProtoUDFactory{},
-		address:           address,
-		receiveBufSize:    receiveBufSize,
-		pulsarBufSize:     pulsarBufSize,
+		ReceiveBufSize:    64,
+		PulsarBufSize:     64,
 	}
 	return f
 }
diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go
index 9e5ada560..f8dbe5dce 100644
--- a/internal/msgstream/pulsarms/pulsar_msgstream.go
+++ b/internal/msgstream/pulsarms/pulsar_msgstream.go
@@ -409,7 +409,7 @@ type PulsarTtMsgStream struct {
 	lastTimeStamp Timestamp
 }
 
-func NewPulsarTtMsgStream(ctx context.Context,
+func newPulsarTtMsgStream(ctx context.Context,
 	address string,
 	receiveBufSize int64,
 	pulsarBufSize int64,
diff --git a/internal/msgstream/pulsarms/pulsar_msgstream_test.go b/internal/msgstream/pulsarms/pulsar_msgstream_test.go
index b406cbb74..255aaf749 100644
--- a/internal/msgstream/pulsarms/pulsar_msgstream_test.go
+++ b/internal/msgstream/pulsarms/pulsar_msgstream_test.go
@@ -210,7 +210,7 @@ func initPulsarTtStream(pulsarAddress string,
 	var input msgstream.MsgStream = inputStream
 
 	// set output stream
-	outputStream, _ := NewPulsarTtMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
+	outputStream, _ := newPulsarTtMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher())
 	outputStream.AsConsumer(consumerChannels, consumerSubName)
 	outputStream.Start()
 	var output msgstream.MsgStream = outputStream
diff --git a/internal/msgstream/rmqms/factory.go b/internal/msgstream/rmqms/factory.go
index 0da978f1c..31f2d730a 100644
--- a/internal/msgstream/rmqms/factory.go
+++ b/internal/msgstream/rmqms/factory.go
@@ -3,23 +3,39 @@ package rmqms
 import (
 	"context"
 
+	"github.com/mitchellh/mapstructure"
+
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 )
 
 type Factory struct {
 	dispatcherFactory msgstream.ProtoUDFactory
-	receiveBufSize    int64
-	rmqBufSize        int64
+	// the following members must be public, so that mapstructure.Decode() can access them
+	ReceiveBufSize int64
+	RmqBufSize     int64
+}
+
+func (f *Factory) SetParams(params map[string]interface{}) error {
+	err := mapstructure.Decode(params, f)
+	if err != nil {
+		return err
+	}
+	return nil
 }
 
 func (f *Factory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
-	return newRmqMsgStream(ctx, f.receiveBufSize, f.rmqBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
+	return newRmqMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
+}
+
+func (f *Factory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
+	return newRmqTtMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, f.dispatcherFactory.NewUnmarshalDispatcher())
 }
 
-func NewFactory(address string, receiveBufSize int64, pulsarBufSize int64) *Factory {
+func NewFactory() msgstream.Factory {
 	f := &Factory{
 		dispatcherFactory: msgstream.ProtoUDFactory{},
-		receiveBufSize:    receiveBufSize,
+		ReceiveBufSize:    64,
+		RmqBufSize:        64,
 	}
 	return f
 }
diff --git a/internal/msgstream/rmqms/rmq_msgstream.go b/internal/msgstream/rmqms/rmq_msgstream.go
index 6bcaa9458..5bfbfbdd0 100644
--- a/internal/msgstream/rmqms/rmq_msgstream.go
+++ b/internal/msgstream/rmqms/rmq_msgstream.go
@@ -350,7 +350,7 @@ type RmqTtMsgStream struct {
 	lastTimeStamp Timestamp
 }
 
-func NewRmqTtMsgStream(ctx context.Context, receiveBufSize int64, rmqBufSize int64,
+func newRmqTtMsgStream(ctx context.Context, receiveBufSize int64, rmqBufSize int64,
 	unmarshal msgstream.UnmarshalDispatcher) (*RmqTtMsgStream, error) {
 	rmqMsgStream, err := newRmqMsgStream(ctx, receiveBufSize, rmqBufSize, unmarshal)
 	if err != nil {
diff --git a/internal/proxynode/insert_channels.go b/internal/proxynode/insert_channels.go
index 3aa1e08e0..ef01ece4a 100644
--- a/internal/proxynode/insert_channels.go
+++ b/internal/proxynode/insert_channels.go
@@ -8,8 +8,6 @@ import (
 	"strconv"
 	"sync"
 
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
-
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
@@ -78,6 +76,7 @@ type InsertChannelsMap struct {
 	usageHistogram              []int                 // message stream can be closed only when the use count is zero
 	mtx                         sync.RWMutex
 	nodeInstance                *NodeImpl
+	msFactory                   msgstream.Factory
 }
 
 func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []string) error {
@@ -101,8 +100,7 @@ func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []st
 	m.insertChannels = append(m.insertChannels, channels)
 	m.collectionID2InsertChannels[collID] = len(m.insertChannels) - 1
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamInsertBufSize, 1024)
-	stream, _ := factory.NewMsgStream(context.Background())
+	stream, _ := m.msFactory.NewMsgStream(context.Background())
 	stream.AsProducer(channels)
 	repack := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
 		return insertRepackFunc(tsMsgs, hashKeys, m.nodeInstance.segAssigner, true)
@@ -198,6 +196,7 @@ func newInsertChannelsMap(node *NodeImpl) *InsertChannelsMap {
 		droppedBitMap:               make([]int, 0),
 		usageHistogram:              make([]int, 0),
 		nodeInstance:                node,
+		msFactory:                   node.msFactory,
 	}
 }
 
diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go
index 850027122..2408eb0e6 100644
--- a/internal/proxynode/proxy_node.go
+++ b/internal/proxynode/proxy_node.go
@@ -15,8 +15,6 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
-
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 
 	"github.com/zilliztech/milvus-distributed/internal/allocator"
@@ -53,6 +51,7 @@ type NodeImpl struct {
 
 	manipulationMsgStream msgstream.MsgStream
 	queryMsgStream        msgstream.MsgStream
+	msFactory             msgstream.Factory
 
 	closer io.Closer
 
@@ -61,12 +60,13 @@ type NodeImpl struct {
 	closeCallbacks []func()
 }
 
-func NewProxyNodeImpl(ctx context.Context) (*NodeImpl, error) {
+func NewProxyNodeImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error) {
 	rand.Seed(time.Now().UnixNano())
 	ctx1, cancel := context.WithCancel(ctx)
 	node := &NodeImpl{
-		ctx:    ctx1,
-		cancel: cancel,
+		ctx:       ctx1,
+		cancel:    cancel,
+		msFactory: factory,
 	}
 
 	return node, nil
@@ -102,7 +102,6 @@ func (node *NodeImpl) waitForServiceReady(service Component, serviceName string)
 }
 
 func (node *NodeImpl) Init() error {
-
 	// todo wait for proxyservice state changed to Healthy
 
 	err := node.waitForServiceReady(node.proxyServiceClient, "ProxyService")
@@ -131,8 +130,6 @@ func (node *NodeImpl) Init() error {
 		return err
 	}
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchBufSize, 1024)
-
 	// wait for dataservice state changed to Healthy
 	if node.dataServiceClient != nil {
 		err = node.waitForServiceReady(node.dataServiceClient, "DataService")
@@ -179,7 +176,16 @@ func (node *NodeImpl) Init() error {
 	//	return err
 	//}
 
-	node.queryMsgStream, _ = factory.NewMsgStream(node.ctx)
+	m := map[string]interface{}{
+		"PulsarAddress":  Params.PulsarAddress,
+		"ReceiveBufSize": Params.MsgStreamSearchBufSize,
+		"PulsarBufSize":  1024}
+	err = node.msFactory.SetParams(m)
+	if err != nil {
+		return err
+	}
+
+	node.queryMsgStream, _ = node.msFactory.NewMsgStream(node.ctx)
 	node.queryMsgStream.AsProducer(Params.SearchChannelNames)
 	log.Println("create query message stream ...")
 
@@ -206,7 +212,7 @@ func (node *NodeImpl) Init() error {
 	node.segAssigner = segAssigner
 	node.segAssigner.PeerID = Params.ProxyID
 
-	node.manipulationMsgStream, _ = factory.NewMsgStream(node.ctx)
+	node.manipulationMsgStream, _ = node.msFactory.NewMsgStream(node.ctx)
 	node.manipulationMsgStream.AsProducer(Params.InsertChannelNames)
 	repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
 		return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true)
@@ -214,12 +220,12 @@ func (node *NodeImpl) Init() error {
 	node.manipulationMsgStream.SetRepackFunc(repackFuncImpl)
 	log.Println("create manipulation message stream ...")
 
-	node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator)
+	node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory)
 	if err != nil {
 		return err
 	}
 
-	node.tick = newTimeTick(node.ctx, node.tsoAllocator, time.Millisecond*200, node.sched.TaskDoneTest)
+	node.tick = newTimeTick(node.ctx, node.tsoAllocator, time.Millisecond*200, node.sched.TaskDoneTest, node.msFactory)
 
 	return nil
 }
diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go
index 24cbc5771..fdfab974b 100644
--- a/internal/proxynode/task_scheduler.go
+++ b/internal/proxynode/task_scheduler.go
@@ -13,7 +13,6 @@ import (
 
 	"github.com/zilliztech/milvus-distributed/internal/allocator"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 )
 
 type TaskQueue interface {
@@ -247,17 +246,21 @@ type TaskScheduler struct {
 	wg     sync.WaitGroup
 	ctx    context.Context
 	cancel context.CancelFunc
+
+	msFactory msgstream.Factory
 }
 
 func NewTaskScheduler(ctx context.Context,
 	idAllocator *allocator.IDAllocator,
-	tsoAllocator *allocator.TimestampAllocator) (*TaskScheduler, error) {
+	tsoAllocator *allocator.TimestampAllocator,
+	factory msgstream.Factory) (*TaskScheduler, error) {
 	ctx1, cancel := context.WithCancel(ctx)
 	s := &TaskScheduler{
 		idAllocator:  idAllocator,
 		tsoAllocator: tsoAllocator,
 		ctx:          ctx1,
 		cancel:       cancel,
+		msFactory:    factory,
 	}
 	s.DdQueue = NewDdTaskQueue(s)
 	s.DmQueue = NewDmTaskQueue(s)
@@ -371,9 +374,8 @@ func (sched *TaskScheduler) queryLoop() {
 
 func (sched *TaskScheduler) queryResultLoop() {
 	defer sched.wg.Done()
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchResultBufSize, 1024)
 
-	queryResultMsgStream, _ := factory.NewMsgStream(sched.ctx)
+	queryResultMsgStream, _ := sched.msFactory.NewMsgStream(sched.ctx)
 	queryResultMsgStream.AsConsumer(Params.SearchResultChannelNames,
 		Params.ProxySubName)
 	queryNodeNum := Params.QueryNodeNum
diff --git a/internal/proxynode/timetick.go b/internal/proxynode/timetick.go
index 30861feb2..1e76fcf6e 100644
--- a/internal/proxynode/timetick.go
+++ b/internal/proxynode/timetick.go
@@ -13,7 +13,6 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar"
 	"github.com/zilliztech/milvus-distributed/internal/allocator"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 )
 
 type tickCheckFunc = func(Timestamp) bool
@@ -27,6 +26,7 @@ type timeTick struct {
 
 	tsoAllocator  *allocator.TimestampAllocator
 	tickMsgStream msgstream.MsgStream
+	msFactory     msgstream.Factory
 
 	peerID    UniqueID
 	wg        sync.WaitGroup
@@ -40,7 +40,8 @@ type timeTick struct {
 func newTimeTick(ctx context.Context,
 	tsoAllocator *allocator.TimestampAllocator,
 	interval time.Duration,
-	checkFunc tickCheckFunc) *timeTick {
+	checkFunc tickCheckFunc,
+	factory msgstream.Factory) *timeTick {
 	ctx1, cancel := context.WithCancel(ctx)
 	t := &timeTick{
 		ctx:          ctx1,
@@ -49,10 +50,10 @@ func newTimeTick(ctx context.Context,
 		interval:     interval,
 		peerID:       Params.ProxyID,
 		checkFunc:    checkFunc,
+		msFactory:    factory,
 	}
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamTimeTickBufSize, 1024)
-	t.tickMsgStream, _ = factory.NewMsgStream(t.ctx)
+	t.tickMsgStream, _ = t.msFactory.NewMsgStream(t.ctx)
 	t.tickMsgStream.AsProducer(Params.ProxyTimeTickChannelNames)
 	return t
 }
diff --git a/internal/proxynode/timetick_test.go b/internal/proxynode/timetick_test.go
index 6520a99fc..af73f5dec 100644
--- a/internal/proxynode/timetick_test.go
+++ b/internal/proxynode/timetick_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/zilliztech/milvus-distributed/internal/allocator"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 )
 
 var trueCnt = 0
@@ -34,7 +35,8 @@ func TestTimeTick_Start2(t *testing.T) {
 	err = tsoAllocator.Start()
 	assert.Nil(t, err)
 
-	tt := newTimeTick(ctx, tsoAllocator, Params.TimeTickInterval, checkFunc)
+	msFactory := pulsarms.NewFactory()
+	tt := newTimeTick(ctx, tsoAllocator, Params.TimeTickInterval, checkFunc, msFactory)
 
 	defer func() {
 		cancel()
diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go
index 379694f3a..6a70511d8 100644
--- a/internal/proxyservice/impl.go
+++ b/internal/proxyservice/impl.go
@@ -10,8 +10,6 @@ import (
 	"strconv"
 	"time"
 
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
-
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
@@ -95,15 +93,22 @@ func (s *ServiceImpl) fillNodeInitParams() error {
 }
 
 func (s *ServiceImpl) Init() error {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-
 	err := s.fillNodeInitParams()
 	if err != nil {
 		return err
 	}
 	log.Println("fill node init params ...")
 
-	serviceTimeTickMsgStream, _ := factory.NewTtMsgStream(s.ctx)
+	m := map[string]interface{}{
+		"PulsarAddress":  Params.PulsarAddress,
+		"ReceiveBufSize": 1024,
+		"PulsarBufSize":  1024}
+	err = s.msFactory.SetParams(m)
+	if err != nil {
+		return err
+	}
+
+	serviceTimeTickMsgStream, _ := s.msFactory.NewTtMsgStream(s.ctx)
 	serviceTimeTickMsgStream.AsProducer([]string{Params.ServiceTimeTickChannel})
 	log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel})
 
@@ -112,11 +117,11 @@ func (s *ServiceImpl) Init() error {
 	for ; i < Params.InsertChannelNum; i++ {
 		channels[i] = Params.InsertChannelPrefixName + strconv.FormatInt(i, 10)
 	}
-	insertTickMsgStream, _ := factory.NewMsgStream(s.ctx)
+	insertTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx)
 	insertTickMsgStream.AsProducer(channels)
 	log.Println("create insert time tick producer channel: ", channels)
 
-	nodeTimeTickMsgStream, _ := factory.NewMsgStream(s.ctx)
+	nodeTimeTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx)
 	nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel,
 		"proxyservicesub") // TODO: add config
 	log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel)
diff --git a/internal/proxyservice/proxyservice.go b/internal/proxyservice/proxyservice.go
index e6e48337b..66cc49f71 100644
--- a/internal/proxyservice/proxyservice.go
+++ b/internal/proxyservice/proxyservice.go
@@ -7,8 +7,8 @@ import (
 
 	"github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
 
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
-
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 )
 
@@ -26,14 +26,17 @@ type ServiceImpl struct {
 
 	ctx    context.Context
 	cancel context.CancelFunc
+
+	msFactory msgstream.Factory
 }
 
-func NewServiceImpl(ctx context.Context) (*ServiceImpl, error) {
+func NewServiceImpl(ctx context.Context, factory msgstream.Factory) (*ServiceImpl, error) {
 	rand.Seed(time.Now().UnixNano())
 	ctx1, cancel := context.WithCancel(ctx)
 	s := &ServiceImpl{
-		ctx:    ctx1,
-		cancel: cancel,
+		ctx:       ctx1,
+		cancel:    cancel,
+		msFactory: factory,
 	}
 
 	s.allocator = NewNodeIDAllocator()
diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go
index 04856af94..e0ebdc83f 100644
--- a/internal/querynode/data_sync_service.go
+++ b/internal/querynode/data_sync_service.go
@@ -12,18 +12,19 @@ type dataSyncService struct {
 	ctx context.Context
 	fg  *flowgraph.TimeTickedFlowGraph
 
-	dmStream msgstream.MsgStream
-	ddStream msgstream.MsgStream
+	dmStream  msgstream.MsgStream
+	ddStream  msgstream.MsgStream
+	msFactory msgstream.Factory
 
 	replica collectionReplica
 }
 
-func newDataSyncService(ctx context.Context, replica collectionReplica) *dataSyncService {
+func newDataSyncService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *dataSyncService {
 	service := &dataSyncService{
-		ctx: ctx,
-		fg:  nil,
-
-		replica: replica,
+		ctx:       ctx,
+		fg:        nil,
+		replica:   replica,
+		msFactory: factory,
 	}
 
 	service.initNodes()
@@ -52,7 +53,7 @@ func (dsService *dataSyncService) initNodes() {
 	var ddNode node = newDDNode(dsService.replica)
 
 	var insertNode node = newInsertNode(dsService.replica)
-	var serviceTimeNode node = newServiceTimeNode(dsService.ctx, dsService.replica)
+	var serviceTimeNode node = newServiceTimeNode(dsService.ctx, dsService.replica, dsService.msFactory)
 	var gcNode node = newGCNode(dsService.replica)
 
 	dsService.fg.AddNode(&dmStreamNode)
diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go
index dc961aa87..7b3ef3288 100644
--- a/internal/querynode/data_sync_service_test.go
+++ b/internal/querynode/data_sync_service_test.go
@@ -109,11 +109,18 @@ func TestDataSyncService_Start(t *testing.T) {
 	ddChannels := Params.DDChannelNames
 	pulsarURL := Params.PulsarAddress
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
+	msFactory := pulsarms.NewFactory()
+	m := map[string]interface{}{
+		"receiveBufSize": receiveBufSize,
+		"pulsarAddress":  pulsarURL,
+		"pulsarBufSize":  1024}
+	err := msFactory.SetParams(m)
+	assert.Nil(t, err)
+
+	insertStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
 	insertStream.AsProducer(insertChannels)
 
-	ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
+	ddStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
 	ddStream.AsProducer(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
@@ -122,7 +129,7 @@ func TestDataSyncService_Start(t *testing.T) {
 	var ddMsgStream msgstream.MsgStream = ddStream
 	ddMsgStream.Start()
 
-	err := insertMsgStream.Produce(&msgPack)
+	err = insertMsgStream.Produce(&msgPack)
 	assert.NoError(t, err)
 
 	err = insertMsgStream.Broadcast(&timeTickMsgPack)
@@ -131,7 +138,7 @@ func TestDataSyncService_Start(t *testing.T) {
 	assert.NoError(t, err)
 
 	// dataSync
-	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
+	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory)
 	go node.dataSyncService.start()
 
 	<-node.queryNodeLoopCtx.Done()
diff --git a/internal/querynode/flow_graph_msg_stream_input_nodes.go b/internal/querynode/flow_graph_msg_stream_input_nodes.go
index 1816d0e83..d91a302a7 100644
--- a/internal/querynode/flow_graph_msg_stream_input_nodes.go
+++ b/internal/querynode/flow_graph_msg_stream_input_nodes.go
@@ -3,15 +3,12 @@ package querynode
 import (
 	"context"
 
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
 )
 
 func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph.InputNode {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.InsertReceiveBufSize, Params.InsertPulsarBufSize)
-
 	// query node doesn't need to consume any topic
-	insertStream, _ := factory.NewTtMsgStream(ctx)
+	insertStream, _ := dsService.msFactory.NewTtMsgStream(ctx)
 	dsService.dmStream = insertStream
 
 	maxQueueLength := Params.FlowGraphMaxQueueLength
@@ -22,12 +19,10 @@ func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph
 }
 
 func (dsService *dataSyncService) newDDInputNode(ctx context.Context) *flowgraph.InputNode {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.DDReceiveBufSize, Params.DDPulsarBufSize)
-
 	consumeChannels := Params.DDChannelNames
 	consumeSubName := Params.MsgChannelSubName
 
-	ddStream, _ := factory.NewTtMsgStream(ctx)
+	ddStream, _ := dsService.msFactory.NewTtMsgStream(ctx)
 	ddStream.AsConsumer(consumeChannels, consumeSubName)
 
 	dsService.ddStream = ddStream
diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go
index 651be6fbc..85e9d8767 100644
--- a/internal/querynode/flow_graph_service_time_node.go
+++ b/internal/querynode/flow_graph_service_time_node.go
@@ -5,7 +5,6 @@ import (
 	"log"
 
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 )
@@ -70,7 +69,7 @@ func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error {
 	return stNode.timeTickMsgStream.Produce(&msgPack)
 }
 
-func newServiceTimeNode(ctx context.Context, replica collectionReplica) *serviceTimeNode {
+func newServiceTimeNode(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *serviceTimeNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
@@ -78,7 +77,6 @@ func newServiceTimeNode(ctx context.Context, replica collectionReplica) *service
 	baseNode.SetMaxQueueLength(maxQueueLength)
 	baseNode.SetMaxParallelism(maxParallelism)
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.SearchReceiveBufSize, 1024)
 	timeTimeMsgStream, _ := factory.NewMsgStream(ctx)
 	timeTimeMsgStream.AsProducer([]string{Params.QueryTimeTickChannelName})
 
diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go
index 247c76b3c..a8f2145da 100644
--- a/internal/querynode/load_service_test.go
+++ b/internal/querynode/load_service_test.go
@@ -1014,14 +1014,22 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID,
 	const receiveBufSize = 1024
 	insertChannels := Params.InsertChannelNames
 	ddChannels := Params.DDChannelNames
-	pulsarURL := Params.PulsarAddress
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	insertStream, _ := factory.NewMsgStream(ctx)
+	msFactory := pulsarms.NewFactory()
+	m := map[string]interface{}{
+		"receiveBufSize": receiveBufSize,
+		"pulsarAddress":  Params.PulsarAddress,
+		"pulsarBufSize":  1024}
+	err := msFactory.SetParams(m)
+	if err != nil {
+		return err
+	}
+
+	insertStream, _ := msFactory.NewMsgStream(ctx)
 	insertStream.AsProducer(insertChannels)
 	insertStream.AsConsumer(insertChannels, Params.MsgChannelSubName)
 
-	ddStream, _ := factory.NewMsgStream(ctx)
+	ddStream, _ := msFactory.NewMsgStream(ctx)
 	ddStream.AsProducer(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
@@ -1030,7 +1038,7 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID,
 	var ddMsgStream msgstream.MsgStream = ddStream
 	ddMsgStream.Start()
 
-	err := insertMsgStream.Produce(&msgPack)
+	err = insertMsgStream.Produce(&msgPack)
 	if err != nil {
 		return err
 	}
@@ -1072,14 +1080,22 @@ func sentTimeTick(ctx context.Context) error {
 	const receiveBufSize = 1024
 	insertChannels := Params.InsertChannelNames
 	ddChannels := Params.DDChannelNames
-	pulsarURL := Params.PulsarAddress
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	insertStream, _ := factory.NewMsgStream(ctx)
+	msFactory := pulsarms.NewFactory()
+	m := map[string]interface{}{
+		"receiveBufSize": receiveBufSize,
+		"pulsarAddress":  Params.PulsarAddress,
+		"pulsarBufSize":  1024}
+	err := msFactory.SetParams(m)
+	if err != nil {
+		return err
+	}
+
+	insertStream, _ := msFactory.NewMsgStream(ctx)
 	insertStream.AsProducer(insertChannels)
 	insertStream.AsConsumer(insertChannels, Params.MsgChannelSubName)
 
-	ddStream, _ := factory.NewMsgStream(ctx)
+	ddStream, _ := msFactory.NewMsgStream(ctx)
 	ddStream.AsProducer(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
@@ -1088,7 +1104,7 @@ func sentTimeTick(ctx context.Context) error {
 	var ddMsgStream msgstream.MsgStream = ddStream
 	ddMsgStream.Start()
 
-	err := insertMsgStream.Broadcast(&timeTickMsgPack)
+	err = insertMsgStream.Broadcast(&timeTickMsgPack)
 	if err != nil {
 		return err
 	}
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 2d79e8a69..3dde735b8 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -16,6 +16,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"io"
 	"log"
 	"sync/atomic"
@@ -66,9 +67,11 @@ type QueryNode struct {
 	queryClient  QueryServiceInterface
 	indexClient  IndexServiceInterface
 	dataClient   DataServiceInterface
+
+	msFactory msgstream.Factory
 }
 
-func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
+func NewQueryNode(ctx context.Context, queryNodeID uint64, factory msgstream.Factory) *QueryNode {
 	ctx1, cancel := context.WithCancel(ctx)
 	node := &QueryNode{
 		queryNodeLoopCtx:    ctx1,
@@ -79,6 +82,8 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
 		metaService:     nil,
 		searchService:   nil,
 		statsService:    nil,
+
+		msFactory: factory,
 	}
 
 	node.replica = newCollectionReplicaImpl()
@@ -86,7 +91,7 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
 	return node
 }
 
-func NewQueryNodeWithoutID(ctx context.Context) *QueryNode {
+func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *QueryNode {
 	ctx1, cancel := context.WithCancel(ctx)
 	node := &QueryNode{
 		queryNodeLoopCtx:    ctx1,
@@ -96,6 +101,8 @@ func NewQueryNodeWithoutID(ctx context.Context) *QueryNode {
 		metaService:     nil,
 		searchService:   nil,
 		statsService:    nil,
+
+		msFactory: factory,
 	}
 
 	node.replica = newCollectionReplicaImpl()
@@ -143,12 +150,23 @@ func (node *QueryNode) Init() error {
 }
 
 func (node *QueryNode) Start() error {
+	var err error
+	m := map[string]interface{}{
+		"PulsarAddress":  Params.PulsarAddress,
+		"ReceiveBufSize": 1024,
+		"PulsarBufSize":  1024}
+	err = node.msFactory.SetParams(m)
+	if err != nil {
+		return err
+	}
+
 	// init services and manager
-	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
-	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
+	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, node.msFactory)
+	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, node.msFactory)
 	//node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
+
 	node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream)
-	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan)
+	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan, node.msFactory)
 
 	// start services
 	go node.dataSyncService.start()
diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go
index 939ed0917..02cb89f5f 100644
--- a/internal/querynode/query_node_test.go
+++ b/internal/querynode/query_node_test.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -133,7 +134,8 @@ func newQueryNodeMock() *QueryNode {
 		}()
 	}
 
-	svr := NewQueryNode(ctx, 0)
+	msFactory := pulsarms.NewFactory()
+	svr := NewQueryNode(ctx, 0, msFactory)
 	err := svr.SetQueryService(&queryServiceMock{})
 	if err != nil {
 		panic(err)
diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go
index f995935f7..f6e55192c 100644
--- a/internal/querynode/search_service.go
+++ b/internal/querynode/search_service.go
@@ -12,7 +12,6 @@ import (
 	"github.com/golang/protobuf/proto"
 
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
@@ -38,13 +37,8 @@ type searchService struct {
 
 type ResultEntityIds []UniqueID
 
-func newSearchService(ctx context.Context, replica collectionReplica) *searchService {
+func newSearchService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *searchService {
 	receiveBufSize := Params.SearchReceiveBufSize
-	pulsarBufSize := Params.SearchPulsarBufSize
-
-	msgStreamURL := Params.PulsarAddress
-
-	factory := pulsarms.NewFactory(msgStreamURL, receiveBufSize, pulsarBufSize)
 
 	consumeChannels := Params.SearchChannelNames
 	consumeSubName := Params.MsgChannelSubName
diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go
index ca33a0ffc..1765e3464 100644
--- a/internal/querynode/search_service_test.go
+++ b/internal/querynode/search_service_test.go
@@ -93,14 +93,21 @@ func TestSearch_Search(t *testing.T) {
 	msgPackSearch := msgstream.MsgPack{}
 	msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg)
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	searchStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
+	msFactory := pulsarms.NewFactory()
+	m := map[string]interface{}{
+		"receiveBufSize": receiveBufSize,
+		"pulsarAddress":  pulsarURL,
+		"pulsarBufSize":  1024}
+	err = msFactory.SetParams(m)
+	assert.Nil(t, err)
+
+	searchStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
 	searchStream.AsProducer(searchProducerChannels)
 	searchStream.Start()
 	err = searchStream.Produce(&msgPackSearch)
 	assert.NoError(t, err)
 
-	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
+	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory)
 	go node.searchService.start()
 
 	// start insert
@@ -179,10 +186,10 @@ func TestSearch_Search(t *testing.T) {
 	insertChannels := Params.InsertChannelNames
 	ddChannels := Params.DDChannelNames
 
-	insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
+	insertStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
 	insertStream.AsProducer(insertChannels)
 
-	ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
+	ddStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
 	ddStream.AsProducer(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
@@ -200,7 +207,7 @@ func TestSearch_Search(t *testing.T) {
 	assert.NoError(t, err)
 
 	// dataSync
-	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
+	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory)
 	go node.dataSyncService.start()
 
 	time.Sleep(1 * time.Second)
@@ -209,14 +216,22 @@ func TestSearch_Search(t *testing.T) {
 }
 
 func TestSearch_SearchMultiSegments(t *testing.T) {
-	node := NewQueryNode(context.Background(), 0)
-	initTestMeta(t, node, 0, 0)
-
 	pulsarURL := Params.PulsarAddress
+	const receiveBufSize = 1024
+
+	msFactory := pulsarms.NewFactory()
+	m := map[string]interface{}{
+		"receiveBufSize": receiveBufSize,
+		"pulsarAddress":  pulsarURL,
+		"pulsarBufSize":  1024}
+	err := msFactory.SetParams(m)
+	assert.Nil(t, err)
+
+	node := NewQueryNode(context.Background(), 0, msFactory)
+	initTestMeta(t, node, 0, 0)
 
 	// test data generate
 	const msgLength = 10
-	const receiveBufSize = 1024
 	const DIM = 16
 	searchProducerChannels := Params.SearchChannelNames
 	var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
@@ -283,14 +298,13 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
 	msgPackSearch := msgstream.MsgPack{}
 	msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg)
 
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	searchStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
+	searchStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
 	searchStream.AsProducer(searchProducerChannels)
 	searchStream.Start()
 	err = searchStream.Produce(&msgPackSearch)
 	assert.NoError(t, err)
 
-	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
+	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory)
 	go node.searchService.start()
 
 	// start insert
@@ -373,10 +387,10 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
 	insertChannels := Params.InsertChannelNames
 	ddChannels := Params.DDChannelNames
 
-	insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
+	insertStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
 	insertStream.AsProducer(insertChannels)
 
-	ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx)
+	ddStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
 	ddStream.AsProducer(ddChannels)
 
 	var insertMsgStream msgstream.MsgStream = insertStream
@@ -394,7 +408,7 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
 	assert.NoError(t, err)
 
 	// dataSync
-	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
+	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory)
 	go node.dataSyncService.start()
 
 	time.Sleep(1 * time.Second)
diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go
index 9608128e8..cf6db549f 100644
--- a/internal/querynode/stats_service.go
+++ b/internal/querynode/stats_service.go
@@ -8,7 +8,6 @@ import (
 	"time"
 
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 )
@@ -20,9 +19,10 @@ type statsService struct {
 
 	fieldStatsChan chan []*internalpb2.FieldStats
 	statsStream    msgstream.MsgStream
+	msFactory      msgstream.Factory
 }
 
-func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsChan chan []*internalpb2.FieldStats) *statsService {
+func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsChan chan []*internalpb2.FieldStats, factory msgstream.Factory) *statsService {
 
 	return &statsService{
 		ctx: ctx,
@@ -31,6 +31,8 @@ func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsC
 
 		fieldStatsChan: fieldStatsChan,
 		statsStream:    nil,
+
+		msFactory: factory,
 	}
 }
 
@@ -40,8 +42,7 @@ func (sService *statsService) start() {
 	// start pulsar
 	producerChannels := []string{Params.StatsChannelName}
 
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.StatsReceiveBufSize, 1024)
-	statsStream, _ := factory.NewMsgStream(sService.ctx)
+	statsStream, _ := sService.msFactory.NewMsgStream(sService.ctx)
 	statsStream.AsProducer(producerChannels)
 
 	var statsMsgStream msgstream.MsgStream = statsStream
diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go
index d21230fd0..1222982ff 100644
--- a/internal/querynode/stats_service_test.go
+++ b/internal/querynode/stats_service_test.go
@@ -12,7 +12,14 @@ import (
 func TestStatsService_start(t *testing.T) {
 	node := newQueryNodeMock()
 	initTestMeta(t, node, 0, 0)
-	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil)
+
+	msFactory := pulsarms.NewFactory()
+	m := map[string]interface{}{
+		"PulsarAddress":  Params.PulsarAddress,
+		"ReceiveBufSize": 1024,
+		"PulsarBufSize":  1024}
+	msFactory.SetParams(m)
+	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil, msFactory)
 	node.statsService.start()
 	node.Stop()
 }
@@ -26,15 +33,21 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) {
 	// start pulsar
 	producerChannels := []string{Params.StatsChannelName}
 
-	pulsarURL := Params.PulsarAddress
-	factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024)
-	statsStream, err := factory.NewMsgStream(node.queryNodeLoopCtx)
+	msFactory := pulsarms.NewFactory()
+	m := map[string]interface{}{
+		"receiveBufSize": receiveBufSize,
+		"pulsarAddress":  Params.PulsarAddress,
+		"pulsarBufSize":  1024}
+	err := msFactory.SetParams(m)
+	assert.Nil(t, err)
+
+	statsStream, err := msFactory.NewMsgStream(node.queryNodeLoopCtx)
 	assert.Nil(t, err)
 	statsStream.AsProducer(producerChannels)
 
 	var statsMsgStream msgstream.MsgStream = statsStream
 
-	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil)
+	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil, msFactory)
 	node.statsService.statsStream = statsMsgStream
 	node.statsService.statsStream.Start()
 
diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go
index ffae64b47..60941772e 100644
--- a/internal/queryservice/queryservice.go
+++ b/internal/queryservice/queryservice.go
@@ -9,6 +9,7 @@ import (
 
 	nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client"
 	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -54,6 +55,8 @@ type QueryService struct {
 	stateCode  atomic.Value
 	isInit     atomic.Value
 	enableGrpc bool
+
+	msFactory msgstream.Factory
 }
 
 func (qs *QueryService) Init() error {
@@ -140,7 +143,7 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb
 		}
 		node = newQueryNodeInfo(client)
 	} else {
-		client := querynode.NewQueryNode(qs.loopCtx, uint64(allocatedID))
+		client := querynode.NewQueryNode(qs.loopCtx, uint64(allocatedID), qs.msFactory)
 		node = newQueryNodeInfo(client)
 	}
 	qs.queryNodes[UniqueID(allocatedID)] = node
@@ -546,7 +549,7 @@ func (qs *QueryService) GetSegmentInfo(req *querypb.SegmentInfoRequest) (*queryp
 	}, nil
 }
 
-func NewQueryService(ctx context.Context) (*QueryService, error) {
+func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryService, error) {
 	nodes := make(map[UniqueID]*queryNodeInfo)
 	ctx1, cancel := context.WithCancel(ctx)
 	replica := newMetaReplica()
@@ -558,6 +561,7 @@ func NewQueryService(ctx context.Context) (*QueryService, error) {
 		numRegisterNode: 0,
 		numQueryChannel: 0,
 		enableGrpc:      false,
+		msFactory:       factory,
 	}
 	service.stateCode.Store(internalpb2.StateCode_INITIALIZING)
 	service.isInit.Store(false)
diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go
index 3c7ca3616..2316d25cb 100644
--- a/internal/queryservice/queryservice_test.go
+++ b/internal/queryservice/queryservice_test.go
@@ -8,6 +8,7 @@ import (
 	"github.com/stretchr/testify/assert"
 
 	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -165,7 +166,8 @@ func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) (*inte
 }
 
 func TestQueryService_Init(t *testing.T) {
-	service, err := NewQueryService(context.Background())
+	msFactory := pulsarms.NewFactory()
+	service, err := NewQueryService(context.Background(), msFactory)
 	assert.Nil(t, err)
 	service.Init()
 	service.Start()
@@ -193,7 +195,8 @@ func TestQueryService_Init(t *testing.T) {
 }
 
 func TestQueryService_load(t *testing.T) {
-	service, err := NewQueryService(context.Background())
+	msFactory := pulsarms.NewFactory()
+	service, err := NewQueryService(context.Background(), msFactory)
 	assert.Nil(t, err)
 	service.Init()
 	service.Start()
-- 
GitLab