diff --git a/configs/advanced/data_service.yaml b/configs/advanced/data_service.yaml
index addbe35bb8b2f0a12f821fa888ee99db8d4d042e..c82a0cea08e8932cbc6164fab4d13087a34be8ea 100644
--- a/configs/advanced/data_service.yaml
+++ b/configs/advanced/data_service.yaml
@@ -9,5 +9,5 @@ dataservice:
     defaultSizePerRecord: 1024
     # old name: segmentExpireDuration: 2000
     IDAssignExpiration: 2000 # ms
-  insertChannelNum: 16
+  insertChannelNum: 2
   dataNodeNum: 1
\ No newline at end of file
diff --git a/docs/developer_guides/chap03_index_service.md b/docs/developer_guides/chap03_index_service.md
index f114c9f05ab6891b0eb64e27b252262436092bd0..5add6dc82de4bebc56fa5ea8d581784db2782984 100644
--- a/docs/developer_guides/chap03_index_service.md
+++ b/docs/developer_guides/chap03_index_service.md
@@ -13,10 +13,12 @@
 ```go
 type IndexService interface {
   Service
-  RegisterNode(req RegisterNodeRequest) (RegisterNodeResponse, error)
-  BuildIndex(req BuildIndexRequest) (BuildIndexResponse, error)
-	GetIndexStates(req IndexStatesRequest) (IndexStatesResponse, error)
-  GetIndexFilePaths(req IndexFilePathRequest) (IndexFilePathsResponse, error)
+  RegisterNode(RegisterNodeRequest) (RegisterNodeResponse, error)
+  BuildIndex(BuildIndexRequest) (BuildIndexResponse, error)
+  GetIndexStates(IndexStatesRequest) (IndexStatesResponse, error)
+  GetIndexFilePaths(IndexFilePathRequest) (IndexFilePathsResponse, error)
+  GetTimeTickChannel() (StringResponse, error)
+  GetStatisticsChannel() (StringResponse, error)
   NotifyTaskState(TaskStateNotification) error
 
 }
diff --git a/docs/developer_guides/chap05_proxy.md b/docs/developer_guides/chap05_proxy.md
index 6689f187ede62d4aa5f12cf5e8d1a40ad7f76ff4..e73fe9d658edd10f2eb1ac7ce61d48ae29ad7b80 100644
--- a/docs/developer_guides/chap05_proxy.md
+++ b/docs/developer_guides/chap05_proxy.md
@@ -13,7 +13,7 @@ type ProxyService interface {
   Service
   RegisterLink() (RegisterLinkResponse, error)
   RegisterNode(req RegisterNodeRequest) (RegisterNodeResponse, error)
-  InvalidateCollectionMetaCache(req InvalidateCollMetaCacheRequest) error
+  InvalidateCollectionMetaCache(req InvalidateCollMetaCacheRequest) (Status, error)
 }
 ```
 
@@ -72,8 +72,10 @@ type ProxyNode interface {
   Service
   //SetTimeTickChannel(channelName string) error
   //SetStatsChannel(channelName string) error
+
+  InvalidateCollectionMetaCache(request InvalidateCollMetaCacheRequest) (Status, error)
   
-	CreateCollection(req CreateCollectionRequest) error
+  CreateCollection(req CreateCollectionRequest) error
   DropCollection(req DropCollectionRequest) error
   HasCollection(req HasCollectionRequest) (bool, error)
   LoadCollection(req LoadCollectionRequest) error
@@ -92,12 +94,16 @@ type ProxyNode interface {
   
   CreateIndex(req CreateIndexRequest) error
   DescribeIndex(DescribeIndexRequest) (DescribeIndexResponse, error)
-  
+  GetIndexState(IndexStateRequest) (IndexStateResponse, error)
+
   Insert(req InsertRequest) (InsertResponse, error)
   Search(req SearchRequest) (SearchResults, error)
   Flush(req FlushRequest) error
-  GetPersistentSegmentInfo(req PersistentSegmentInfoRequest) (PersistentSegmentInfoResponse, error)
 
+  GetDdChannel(Empty) (StringResponse, error)
+
+  GetQuerySegmentInfo(QuerySegmentInfoRequest) (QuerySegmentInfoResponse, error)
+  GetPersistentSegmentInfo(PersistentSegmentInfoRequest) (PersistentSegmentInfoResponse, error)
 }
 ```
 
diff --git a/docs/developer_guides/chap09_data_service.md b/docs/developer_guides/chap09_data_service.md
index d92c882841e03f10d204779d38e5391b106ad870..a2f69322807642616bbdb5ca2308dfa4e7a9ea6b 100644
--- a/docs/developer_guides/chap09_data_service.md
+++ b/docs/developer_guides/chap09_data_service.md
@@ -23,8 +23,9 @@ type DataService interface {
   GetSegmentInfo(req SegmentInfoRequest) (SegmentInfoResponse, error)
 
   GetInsertBinlogPaths(req InsertBinlogPathRequest) (InsertBinlogPathsResponse, error)
-  
-  GetInsertChannels(req InsertChannelRequest) ([]string, error)
+
+  GetSegmentInfoChannel(req InsertChannelRequest) (StringResponse, error)
+  GetInsertChannels(req InsertChannelRequest) (StringList, error)
 
   GetCollectionStatistics(req CollectionStatsRequest) (CollectionStatsResponse, error)
   GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
@@ -253,16 +254,38 @@ type InsertRequest struct {
 ```go
 type DataNode interface {
   Service
+
+  GetComponentStates() (ComponentStates, error)
+  GetTimeTickChannel() (StringResponse, error)
+  GetStatisticsChannel() (StringResponse, error)
   
-  WatchDmChannels(req WatchDmChannelRequest) error
+  WatchDmChannels(WatchDmChannelRequest) error
+  FlushSegments(FlushSegRequest) (Status, error)
   //WatchDdChannel(channelName string) error
   //SetTimeTickChannel(channelName string) error
   //SetStatisticsChannel(channelName string) error
   
-  FlushSegments(req FlushSegRequest) error
+  SetMasterServiceInterface(MasterServiceInterface) error
+  SetDataServiceInterface(DataServiceInterface) error
 }
 ```
 
+```go
+type DataServiceInterface interface {
+  GetComponentStates() (ComponentStates, error)
+  RegisterNode(RegisterNodeRequest) (RegisterNodeResponse, error)
+}
+```
+```go
+type MasterServiceInterface interface {
+  GetComponentStates() (ComponentStates, error)
+  AllocID(IDRequest) (IDResponse, error)
+  ShowCollections(ShowCollectionRequest) (ShowCollectionResponse, error)
+  DescribeCollection(DescribeCollectionRequest) (DescribeCollectionResponse, error)
+}
+
+```
+
 
 * *WatchDmChannels*
 
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index 5599216bede48787d937a7edb66382dbf817e670..b86f64e08a8cf017327d36221ba4c930bb78af00 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -7,10 +7,6 @@ import (
 	"log"
 	"time"
 
-	"github.com/opentracing/opentracing-go"
-	"github.com/uber/jaeger-client-go"
-	"github.com/uber/jaeger-client-go/config"
-
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
@@ -33,8 +29,8 @@ type (
 
 		// Component
 		GetComponentStates() (*internalpb2.ComponentStates, error)
-		GetTimeTickChannel() (string, error)   // This function has no effect
-		GetStatisticsChannel() (string, error) // This function has no effect
+		GetTimeTickChannel() (*milvuspb.StringResponse, error)   // This function has no effect
+		GetStatisticsChannel() (*milvuspb.StringResponse, error) // This function has no effect
 
 		WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
 		FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
@@ -72,7 +68,6 @@ type (
 		flushChan chan *flushMsg
 		replica   collectionReplica
 
-		tracer opentracing.Tracer
 		closer io.Closer
 	}
 )
@@ -177,25 +172,6 @@ func (node *DataNode) Init() error {
 
 	node.replica = replica
 
-	// --- Opentracing ---
-	cfg := &config.Configuration{
-		ServiceName: "data_node",
-		Sampler: &config.SamplerConfig{
-			Type:  "const",
-			Param: 1,
-		},
-		Reporter: &config.ReporterConfig{
-			LogSpans: true,
-		},
-	}
-	tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
-	if err != nil {
-		return errors.Errorf("ERROR: cannot init Jaeger: %v\n", err)
-	}
-	node.tracer = tracer
-	node.closer = closer
-	opentracing.SetGlobalTracer(node.tracer)
-
 	return nil
 }
 
diff --git a/internal/datanode/flow_graph_filter_dm_node.go b/internal/datanode/flow_graph_filter_dm_node.go
index 4663635a4165ff3b3f6cc30212407f9169aed984..c5171a6be331a0c4f154e2720e6e28faa83dbea5 100644
--- a/internal/datanode/flow_graph_filter_dm_node.go
+++ b/internal/datanode/flow_graph_filter_dm_node.go
@@ -1,12 +1,9 @@
 package datanode
 
 import (
-	"context"
 	"log"
 	"math"
 
-	"github.com/opentracing/opentracing-go"
-
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -35,28 +32,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
 		// TODO: add error handling
 	}
 
-	var childs []opentracing.Span
-	tracer := opentracing.GlobalTracer()
-	if tracer != nil {
-		for _, msg := range msgStreamMsg.TsMessages() {
-			if msg.Type() == commonpb.MsgType_kInsert {
-				var child opentracing.Span
-				ctx := msg.GetMsgContext()
-				if parent := opentracing.SpanFromContext(ctx); parent != nil {
-					child = tracer.StartSpan("pass filter node",
-						opentracing.FollowsFrom(parent.Context()))
-				} else {
-					child = tracer.StartSpan("pass filter node")
-				}
-				child.SetTag("hash keys", msg.HashKeys())
-				child.SetTag("start time", msg.BeginTs())
-				child.SetTag("end time", msg.EndTs())
-				msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child))
-				childs = append(childs, child)
-			}
-		}
-	}
-
 	ddMsg, ok := (*in[1]).(*ddMsg)
 	if !ok {
 		log.Println("type assertion failed for ddMsg")
@@ -77,20 +52,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
 
 	iMsg.flushMessages = append(iMsg.flushMessages, ddMsg.flushMessages...)
 
-	for key, msg := range msgStreamMsg.TsMessages() {
+	for _, msg := range msgStreamMsg.TsMessages() {
 		switch msg.Type() {
 		case commonpb.MsgType_kInsert:
-			var ctx2 context.Context
-			if childs != nil {
-				if childs[key] != nil {
-					ctx2 = opentracing.ContextWithSpan(msg.GetMsgContext(), childs[key])
-				} else {
-					ctx2 = context.Background()
-				}
-			}
 			resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
 			if resMsg != nil {
-				resMsg.SetMsgContext(ctx2)
 				iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
 			}
 		// case commonpb.MsgType_kDelete:
@@ -103,9 +69,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
 	iMsg.startPositions = append(iMsg.startPositions, msgStreamMsg.StartPositions()...)
 	iMsg.gcRecord = ddMsg.gcRecord
 	var res Msg = &iMsg
-	for _, child := range childs {
-		child.Finish()
-	}
 	return []*Msg{&res}
 }
 
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 1b5336926baeb6ab9f33e4ae2a1cd976b22e0e0a..59a292a91363fc687d048420e0f1449ff9245d66 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"context"
 	"encoding/binary"
-	"fmt"
 	"log"
 	"path"
 	"strconv"
@@ -12,9 +11,6 @@ import (
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 
-	"github.com/opentracing/opentracing-go"
-	oplog "github.com/opentracing/opentracing-go/log"
-
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/kv"
 	miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
@@ -155,23 +151,12 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 	// iMsg is insertMsg
 	// 1. iMsg -> buffer
 	for _, msg := range iMsg.insertMessages {
-		ctx := msg.GetMsgContext()
-		var span opentracing.Span
-		if ctx != nil {
-			span, _ = opentracing.StartSpanFromContext(ctx, fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs()))
-		} else {
-			span = opentracing.StartSpan(fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs()))
-		}
-		span.SetTag("hash keys", msg.HashKeys())
-		span.SetTag("start time", msg.BeginTs())
-		span.SetTag("end time", msg.EndTs())
 		if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
 			log.Println("Error: misaligned messages detected")
 			continue
 		}
 		currentSegID := msg.GetSegmentID()
 		collectionID := msg.GetCollectionID()
-		span.LogFields(oplog.Int("segment id", int(currentSegID)))
 
 		idata, ok := ibNode.insertBuffer.insertData[currentSegID]
 		if !ok {
@@ -426,11 +411,9 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 
 		// 1.3 store in buffer
 		ibNode.insertBuffer.insertData[currentSegID] = idata
-		span.LogFields(oplog.String("store in buffer", "store in buffer"))
 
 		// 1.4 if full
 		//   1.4.1 generate binlogs
-		span.LogFields(oplog.String("generate binlogs", "generate binlogs"))
 		if ibNode.insertBuffer.full(currentSegID) {
 			log.Printf(". Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID))
 
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index 5b1c54e09e8accdb6ef8f923fa7103f0ddc751eb..2116478e9a61c3e7ee7ff24035641c8cb7121a6f 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -44,8 +44,8 @@ type DataService interface {
 	ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error)
 	GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
 	GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error)
-	GetSegmentInfoChannel() (string, error)
-	GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
+	GetSegmentInfoChannel() (*milvuspb.StringResponse, error)
+	GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
 	GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
 	GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error)
 	GetComponentStates() (*internalpb2.ComponentStates, error)
@@ -438,12 +438,22 @@ func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
 	return resp, nil
 }
 
-func (s *Server) GetTimeTickChannel() (string, error) {
-	return Params.TimeTickChannelName, nil
+func (s *Server) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+		Value: Params.TimeTickChannelName,
+	}, nil
 }
 
-func (s *Server) GetStatisticsChannel() (string, error) {
-	return Params.StatisticsChannelName, nil
+func (s *Server) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+		Value: Params.StatisticsChannelName,
+	}, nil
 }
 
 func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
@@ -687,8 +697,13 @@ func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat
 	return resp, nil
 }
 
-func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
-	return s.insertChannels, nil
+func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
+	return &internalpb2.StringList{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+		Values: s.insertChannels,
+	}, nil
 }
 
 func (s *Server) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
@@ -718,8 +733,13 @@ func (s *Server) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*dat
 	return nil, nil
 }
 
-func (s *Server) GetSegmentInfoChannel() (string, error) {
-	return Params.SegmentInfoChannelName, nil
+func (s *Server) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+		Value: Params.SegmentInfoChannelName,
+	}, nil
 }
 
 func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
diff --git a/internal/distributed/dataservice/client.go b/internal/distributed/dataservice/client.go
index 4507502ef070ae4871424879158f1646bbc55073..cc70a3ed36081b85349ac8cbba3ad25b113c54d7 100644
--- a/internal/distributed/dataservice/client.go
+++ b/internal/distributed/dataservice/client.go
@@ -2,9 +2,10 @@ package dataservice
 
 import (
 	"context"
-	"errors"
 	"time"
 
+	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
+
 	"google.golang.org/grpc"
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@@ -58,26 +59,12 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
 	return c.grpcClient.GetComponentStates(context.Background(), &commonpb.Empty{})
 }
 
-func (c *Client) GetTimeTickChannel() (string, error) {
-	resp, err := c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{})
-	if err != nil {
-		return "", err
-	}
-	if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
-		return "", errors.New(resp.Status.Reason)
-	}
-	return resp.Value, nil
+func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
+	return c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{})
 }
 
-func (c *Client) GetStatisticsChannel() (string, error) {
-	resp, err := c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{})
-	if err != nil {
-		return "", err
-	}
-	if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
-		return "", errors.New(resp.Status.Reason)
-	}
-	return resp.Value, nil
+func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
+	return c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{})
 }
 
 func (c *Client) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
@@ -104,15 +91,8 @@ func (c *Client) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat
 	return c.grpcClient.GetInsertBinlogPaths(context.Background(), req)
 }
 
-func (c *Client) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
-	resp, err := c.grpcClient.GetInsertChannels(context.Background(), req)
-	if err != nil {
-		return nil, err
-	}
-	if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
-		return nil, errors.New(resp.Status.Reason)
-	}
-	return resp.Values, nil
+func (c *Client) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
+	return c.grpcClient.GetInsertChannels(context.Background(), req)
 }
 
 func (c *Client) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
@@ -123,15 +103,8 @@ func (c *Client) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*dat
 	return c.grpcClient.GetPartitionStatistics(context.Background(), req)
 }
 
-func (c *Client) GetSegmentInfoChannel() (string, error) {
-	resp, err := c.grpcClient.GetSegmentInfoChannel(context.Background(), &commonpb.Empty{})
-	if err != nil {
-		return "", err
-	}
-	if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
-		return "", errors.New(resp.Status.Reason)
-	}
-	return resp.Value, nil
+func (c *Client) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) {
+	return c.grpcClient.GetSegmentInfoChannel(context.Background(), &commonpb.Empty{})
 }
 
 func (c *Client) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go
index ff39512820b320dc86104938b5b721757774716f..13a8fab20dff7ce7fa2340fcf1b5a23bc1155aca 100644
--- a/internal/distributed/dataservice/grpc_service.go
+++ b/internal/distributed/dataservice/grpc_service.go
@@ -105,20 +105,7 @@ func (s *Service) GetInsertBinlogPaths(ctx context.Context, request *datapb.Inse
 }
 
 func (s *Service) GetInsertChannels(ctx context.Context, request *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
-	resp := &internalpb2.StringList{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_SUCCESS,
-		},
-	}
-	channels, err := s.server.GetInsertChannels(request)
-	if err != nil {
-		resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
-		resp.Status.Reason = err.Error()
-		return resp, nil
-	}
-
-	resp.Values = channels
-	return resp, nil
+	return s.server.GetInsertChannels(request)
 }
 
 func (s *Service) GetCollectionStatistics(ctx context.Context, request *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
@@ -134,53 +121,15 @@ func (s *Service) GetComponentStates(ctx context.Context, empty *commonpb.Empty)
 }
 
 func (s *Service) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
-	resp := &milvuspb.StringResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_SUCCESS,
-		},
-	}
-	channel, err := s.server.GetTimeTickChannel()
-	if err != nil {
-		resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
-		resp.Status.Reason = err.Error()
-		return resp, nil
-	}
-
-	resp.Value = channel
-	return resp, nil
+	return s.server.GetTimeTickChannel()
 }
 
 func (s *Service) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
-	resp := &milvuspb.StringResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_SUCCESS,
-		},
-	}
-	channel, err := s.server.GetStatisticsChannel()
-	if err != nil {
-		resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
-		resp.Status.Reason = err.Error()
-		return resp, nil
-	}
-
-	resp.Value = channel
-	return resp, nil
+	return s.server.GetStatisticsChannel()
 }
 
 func (s *Service) GetSegmentInfoChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
-	resp := &milvuspb.StringResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_SUCCESS,
-		},
-	}
-	channel, err := s.server.GetSegmentInfoChannel()
-	if err != nil {
-		resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
-		resp.Status.Reason = err.Error()
-		return resp, nil
-	}
-	resp.Value = channel
-	return resp, nil
+	return s.server.GetSegmentInfoChannel()
 }
 
 func (s *Service) GetCount(ctx context.Context, request *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go
index 201bcb2e84fb210dd507ee9e264800b83bc84d2b..05dbc7957d5c4d08790a6db3658334c609f7605b 100644
--- a/internal/distributed/indexnode/client/client.go
+++ b/internal/distributed/indexnode/client/client.go
@@ -2,9 +2,10 @@ package grpcindexnodeclient
 
 import (
 	"context"
-	"errors"
 	"time"
 
+	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
+
 	"github.com/zilliztech/milvus-distributed/internal/util/retry"
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@@ -47,28 +48,12 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
 	return c.grpcClient.GetComponentStates(context.Background(), &commonpb.Empty{})
 }
 
-func (c *Client) GetTimeTickChannel() (string, error) {
-	resp, err := c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{})
-
-	if err != nil {
-		return "", err
-	}
-	if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
-		return "", errors.New(resp.Status.Reason)
-	}
-	return resp.Value, nil
+func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
+	return c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{})
 }
 
-func (c *Client) GetStatisticsChannel() (string, error) {
-	resp, err := c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{})
-
-	if err != nil {
-		return "", err
-	}
-	if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
-		return "", errors.New(resp.Status.Reason)
-	}
-	return resp.Value, nil
+func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
+	return c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{})
 }
 
 func (c *Client) BuildIndex(req *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go
index 38cc6c9d36d1eb32ef237ea756cec42ee0080f45..ab9a7300e8a5bb68bfec92ae3b92ecf4bf17686c 100644
--- a/internal/distributed/indexnode/service.go
+++ b/internal/distributed/indexnode/service.go
@@ -142,35 +142,11 @@ func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty)
 }
 
 func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
-	ret, err := s.impl.GetTimeTickChannel()
-	resp := &milvuspb.StringResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_SUCCESS,
-		},
-	}
-	if err != nil {
-		resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
-		resp.Status.Reason = err.Error()
-	} else {
-		resp.Value = ret
-	}
-	return resp, nil
+	return s.impl.GetTimeTickChannel()
 }
 
 func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
-	ret, err := s.impl.GetStatisticsChannel()
-	resp := &milvuspb.StringResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_SUCCESS,
-		},
-	}
-	if err != nil {
-		resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
-		resp.Status.Reason = err.Error()
-	} else {
-		resp.Value = ret
-	}
-	return resp, nil
+	return s.impl.GetStatisticsChannel()
 }
 
 func NewServer(ctx context.Context) (*Server, error) {
diff --git a/internal/distributed/indexservice/client/client.go b/internal/distributed/indexservice/client/client.go
index f3fd99e993106e3ab837454fe85606f5fd688c1f..b84177a44bfc56d6aafbfae62fbf1d04ec9dfd66 100644
--- a/internal/distributed/indexservice/client/client.go
+++ b/internal/distributed/indexservice/client/client.go
@@ -5,6 +5,8 @@ import (
 	"log"
 	"time"
 
+	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
+
 	"google.golang.org/grpc"
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@@ -50,12 +52,20 @@ func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
 	return c.grpcClient.GetComponentStates(ctx, &commonpb.Empty{})
 }
 
-func (c *Client) GetTimeTickChannel() (string, error) {
-	return "", nil
+func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+	}, nil
 }
 
-func (c *Client) GetStatisticsChannel() (string, error) {
-	return "", nil
+func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+	}, nil
 }
 
 func (c *Client) RegisterNode(req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
diff --git a/internal/distributed/proxynode/client/client.go b/internal/distributed/proxynode/client/client.go
index 637f912d12569ebfd01c2422f4d5372c56fe6d8d..a915c2767dc1d04c6dd38948f8bd333aa0026c81 100644
--- a/internal/distributed/proxynode/client/client.go
+++ b/internal/distributed/proxynode/client/client.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"time"
 
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+
 	"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
 	"github.com/zilliztech/milvus-distributed/internal/util/retry"
 	"google.golang.org/grpc"
@@ -39,9 +41,8 @@ func (c *Client) Stop() error {
 	return nil
 }
 
-func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error {
-	_, err := c.grpcClient.InvalidateCollectionMetaCache(c.ctx, request)
-	return err
+func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
+	return c.grpcClient.InvalidateCollectionMetaCache(c.ctx, request)
 }
 
 func NewClient(ctx context.Context, address string) *Client {
diff --git a/internal/distributed/proxyservice/client/client.go b/internal/distributed/proxyservice/client/client.go
index 13621a2db483d742edb90327c77161f5e47f5d62..99ad11e5c438e89dcd9423a661a86d5561cfd0e6 100644
--- a/internal/distributed/proxyservice/client/client.go
+++ b/internal/distributed/proxyservice/client/client.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"time"
 
+	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
+
 	"google.golang.org/grpc"
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@@ -46,25 +48,25 @@ func (c *Client) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.Re
 	return c.proxyServiceClient.RegisterNode(c.ctx, request)
 }
 
-func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error {
+func (c *Client) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
 	_, err := c.proxyServiceClient.InvalidateCollectionMetaCache(c.ctx, request)
-	return err
+	return nil, err
 }
 
-func (c *Client) GetTimeTickChannel() (string, error) {
-	response, err := c.proxyServiceClient.GetTimeTickChannel(c.ctx, &commonpb.Empty{})
-	if err != nil {
-		return "", err
-	}
-	return response.Value, nil
+func (c *Client) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
+	return c.proxyServiceClient.GetTimeTickChannel(c.ctx, &commonpb.Empty{})
 }
 
 func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
 	return c.proxyServiceClient.GetComponentStates(c.ctx, &commonpb.Empty{})
 }
 
-func (c *Client) GetStatisticsChannel() (string, error) {
-	return "", nil
+func (c *Client) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+	}, nil
 }
 
 func NewClient(address string) *Client {
diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go
index 565905ff887a97d73b80b7d34d06ff956be93ba6..19fe18ecde050419c668301d50c905517e9713bf 100644
--- a/internal/distributed/proxyservice/service.go
+++ b/internal/distributed/proxyservice/service.go
@@ -133,23 +133,11 @@ func (s *Server) RegisterNode(ctx context.Context, request *proxypb.RegisterNode
 }
 
 func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
-	return &commonpb.Status{}, s.impl.InvalidateCollectionMetaCache(request)
+	return s.impl.InvalidateCollectionMetaCache(request)
 }
 
 func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
-	channel, err := s.impl.GetTimeTickChannel()
-	if err != nil {
-		return &milvuspb.StringResponse{
-			Status: &commonpb.Status{
-				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
-				Reason:    err.Error(),
-			},
-			Value: "",
-		}, nil
-	}
-	return &milvuspb.StringResponse{
-		Value: channel,
-	}, nil
+	return s.impl.GetTimeTickChannel()
 }
 
 func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go
index 11af320e42d845de728f53af630e37b6fdb55b28..97b1ad55b613876815ed491141ee8c5a0f2e408b 100644
--- a/internal/indexnode/indexnode.go
+++ b/internal/indexnode/indexnode.go
@@ -5,6 +5,8 @@ import (
 	"log"
 	"time"
 
+	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
+
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/kv"
 	miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
@@ -209,10 +211,18 @@ func (i *NodeImpl) GetComponentStates() (*internalpb2.ComponentStates, error) {
 	return ret, nil
 }
 
-func (i *NodeImpl) GetTimeTickChannel() (string, error) {
-	return "", nil
+func (i *NodeImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+	}, nil
 }
 
-func (i *NodeImpl) GetStatisticsChannel() (string, error) {
-	return "", nil
+func (i *NodeImpl) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+	}, nil
 }
diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go
index 545c0c401f7576523b4b5720da123b4cba760126..8f7e5f73a136e098b8652a04ab42631ecaf19d2e 100644
--- a/internal/masterservice/master_service.go
+++ b/internal/masterservice/master_service.go
@@ -34,13 +34,13 @@ import (
 //  masterpb2 -> masterpb (master_service)
 
 type ProxyServiceInterface interface {
-	GetTimeTickChannel() (string, error)
-	InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error
+	GetTimeTickChannel() (*milvuspb.StringResponse, error)
+	InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
 }
 
 type DataServiceInterface interface {
 	GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error)
-	GetSegmentInfoChannel() (string, error)
+	GetSegmentInfoChannel() (*milvuspb.StringResponse, error)
 }
 
 type IndexServiceInterface interface {
@@ -608,11 +608,11 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error {
 	if err != nil {
 		return err
 	}
-	Params.ProxyTimeTickChannel = rsp
+	Params.ProxyTimeTickChannel = rsp.Value
 	log.Printf("proxy time tick channel name = %s", Params.ProxyTimeTickChannel)
 
 	c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error {
-		err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{
+		status, _ := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   0, //TODO,MsgType
 				MsgID:     0,
@@ -622,8 +622,11 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error {
 			DbName:         dbName,
 			CollectionName: collectionName,
 		})
-		if err != nil {
-			return err
+		if status == nil {
+			return errors.New("invalidate collection metacache resp is nil")
+		}
+		if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+			return errors.New(status.Reason)
 		}
 		return nil
 	}
@@ -635,7 +638,7 @@ func (c *Core) SetDataService(s DataServiceInterface) error {
 	if err != nil {
 		return err
 	}
-	Params.DataServiceSegmentChannel = rsp
+	Params.DataServiceSegmentChannel = rsp.Value
 	log.Printf("data service segment channel name = %s", Params.DataServiceSegmentChannel)
 
 	c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go
index 5333dbf45122d739968c146c8f0ec19541e7debe..0b42e4020cfa04003d015d1041daa60966284d54 100644
--- a/internal/masterservice/master_service_test.go
+++ b/internal/masterservice/master_service_test.go
@@ -28,14 +28,21 @@ type proxyMock struct {
 	mutex     sync.Mutex
 }
 
-func (p *proxyMock) GetTimeTickChannel() (string, error) {
-	return fmt.Sprintf("proxy-time-tick-%d", p.randVal), nil
+func (p *proxyMock) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+		Value: fmt.Sprintf("proxy-time-tick-%d", p.randVal),
+	}, nil
 }
-func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error {
+func (p *proxyMock) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 	p.collArray = append(p.collArray, request.CollectionName)
-	return nil
+	return &commonpb.Status{
+		ErrorCode: commonpb.ErrorCode_SUCCESS,
+	}, nil
 }
 func (p *proxyMock) GetCollArray() []string {
 	p.mutex.Lock()
@@ -72,8 +79,13 @@ func (d *dataMock) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*d
 	return rst, nil
 }
 
-func (d *dataMock) GetSegmentInfoChannel() (string, error) {
-	return fmt.Sprintf("segment-info-channel-%d", d.randVal), nil
+func (d *dataMock) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+		Value: fmt.Sprintf("segment-info-channel-%d", d.randVal),
+	}, nil
 }
 
 type indexMock struct {
diff --git a/internal/msgstream/msg.go b/internal/msgstream/msg.go
index 6e9a5ae2edc68b552082e93003bccdbd7fb25431..f23bf92e939e71fcc7948df72ccc6dd109b11004 100644
--- a/internal/msgstream/msg.go
+++ b/internal/msgstream/msg.go
@@ -2,9 +2,9 @@ package msgstream
 
 import (
 	"context"
+	"errors"
 
 	"github.com/golang/protobuf/proto"
-	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -14,8 +14,6 @@ type MsgType = commonpb.MsgType
 type MarshalType = interface{}
 
 type TsMsg interface {
-	GetMsgContext() context.Context
-	SetMsgContext(context.Context)
 	BeginTs() Timestamp
 	EndTs() Timestamp
 	Type() MsgType
@@ -59,7 +57,7 @@ func ConvertToByteArray(input interface{}) ([]byte, error) {
 	case []byte:
 		return output, nil
 	default:
-		return nil, errors.New("Cannot convert interface{} to []byte")
+		return nil, errors.New("cannot convert interface{} to []byte")
 	}
 }
 
@@ -73,14 +71,6 @@ func (it *InsertMsg) Type() MsgType {
 	return it.Base.MsgType
 }
 
-func (it *InsertMsg) GetMsgContext() context.Context {
-	return it.MsgCtx
-}
-
-func (it *InsertMsg) SetMsgContext(ctx context.Context) {
-	it.MsgCtx = ctx
-}
-
 func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error) {
 	insertMsg := input.(*InsertMsg)
 	insertRequest := &insertMsg.InsertRequest
@@ -129,13 +119,6 @@ func (fl *FlushCompletedMsg) Type() MsgType {
 	return fl.Base.MsgType
 }
 
-func (fl *FlushCompletedMsg) GetMsgContext() context.Context {
-	return fl.MsgCtx
-}
-func (fl *FlushCompletedMsg) SetMsgContext(ctx context.Context) {
-	fl.MsgCtx = ctx
-}
-
 func (fl *FlushCompletedMsg) Marshal(input TsMsg) (MarshalType, error) {
 	flushCompletedMsgTask := input.(*FlushCompletedMsg)
 	flushCompletedMsg := &flushCompletedMsgTask.SegmentFlushCompletedMsg
@@ -174,13 +157,6 @@ func (fl *FlushMsg) Type() MsgType {
 	return fl.Base.MsgType
 }
 
-func (fl *FlushMsg) GetMsgContext() context.Context {
-	return fl.MsgCtx
-}
-func (fl *FlushMsg) SetMsgContext(ctx context.Context) {
-	fl.MsgCtx = ctx
-}
-
 func (fl *FlushMsg) Marshal(input TsMsg) (MarshalType, error) {
 	flushMsgTask := input.(*FlushMsg)
 	flushMsg := &flushMsgTask.FlushMsg
@@ -218,14 +194,6 @@ func (dt *DeleteMsg) Type() MsgType {
 	return dt.Base.MsgType
 }
 
-func (dt *DeleteMsg) GetMsgContext() context.Context {
-	return dt.MsgCtx
-}
-
-func (dt *DeleteMsg) SetMsgContext(ctx context.Context) {
-	dt.MsgCtx = ctx
-}
-
 func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error) {
 	deleteMsg := input.(*DeleteMsg)
 	deleteRequest := &deleteMsg.DeleteRequest
@@ -275,14 +243,6 @@ func (st *SearchMsg) Type() MsgType {
 	return st.Base.MsgType
 }
 
-func (st *SearchMsg) GetMsgContext() context.Context {
-	return st.MsgCtx
-}
-
-func (st *SearchMsg) SetMsgContext(ctx context.Context) {
-	st.MsgCtx = ctx
-}
-
 func (st *SearchMsg) Marshal(input TsMsg) (MarshalType, error) {
 	searchTask := input.(*SearchMsg)
 	searchRequest := &searchTask.SearchRequest
@@ -320,14 +280,6 @@ func (srt *SearchResultMsg) Type() MsgType {
 	return srt.Base.MsgType
 }
 
-func (srt *SearchResultMsg) GetMsgContext() context.Context {
-	return srt.MsgCtx
-}
-
-func (srt *SearchResultMsg) SetMsgContext(ctx context.Context) {
-	srt.MsgCtx = ctx
-}
-
 func (srt *SearchResultMsg) Marshal(input TsMsg) (MarshalType, error) {
 	searchResultTask := input.(*SearchResultMsg)
 	searchResultRequest := &searchResultTask.SearchResults
@@ -365,14 +317,6 @@ func (tst *TimeTickMsg) Type() MsgType {
 	return tst.Base.MsgType
 }
 
-func (tst *TimeTickMsg) GetMsgContext() context.Context {
-	return tst.MsgCtx
-}
-
-func (tst *TimeTickMsg) SetMsgContext(ctx context.Context) {
-	tst.MsgCtx = ctx
-}
-
 func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error) {
 	timeTickTask := input.(*TimeTickMsg)
 	timeTick := &timeTickTask.TimeTickMsg
@@ -411,14 +355,6 @@ func (qs *QueryNodeStatsMsg) Type() MsgType {
 	return qs.Base.MsgType
 }
 
-func (qs *QueryNodeStatsMsg) GetMsgContext() context.Context {
-	return qs.MsgCtx
-}
-
-func (qs *QueryNodeStatsMsg) SetMsgContext(ctx context.Context) {
-	qs.MsgCtx = ctx
-}
-
 func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) (MarshalType, error) {
 	queryNodeSegStatsTask := input.(*QueryNodeStatsMsg)
 	queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeStats
@@ -454,14 +390,6 @@ func (ss *SegmentStatisticsMsg) Type() MsgType {
 	return ss.Base.MsgType
 }
 
-func (ss *SegmentStatisticsMsg) GetMsgContext() context.Context {
-	return ss.MsgCtx
-}
-
-func (ss *SegmentStatisticsMsg) SetMsgContext(ctx context.Context) {
-	ss.MsgCtx = ctx
-}
-
 func (ss *SegmentStatisticsMsg) Marshal(input TsMsg) (MarshalType, error) {
 	segStatsTask := input.(*SegmentStatisticsMsg)
 	segStats := &segStatsTask.SegmentStatistics
@@ -507,14 +435,6 @@ func (cc *CreateCollectionMsg) Type() MsgType {
 	return cc.Base.MsgType
 }
 
-func (cc *CreateCollectionMsg) GetMsgContext() context.Context {
-	return cc.MsgCtx
-}
-
-func (cc *CreateCollectionMsg) SetMsgContext(ctx context.Context) {
-	cc.MsgCtx = ctx
-}
-
 func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error) {
 	createCollectionMsg := input.(*CreateCollectionMsg)
 	createCollectionRequest := &createCollectionMsg.CreateCollectionRequest
@@ -551,13 +471,6 @@ type DropCollectionMsg struct {
 func (dc *DropCollectionMsg) Type() MsgType {
 	return dc.Base.MsgType
 }
-func (dc *DropCollectionMsg) GetMsgContext() context.Context {
-	return dc.MsgCtx
-}
-
-func (dc *DropCollectionMsg) SetMsgContext(ctx context.Context) {
-	dc.MsgCtx = ctx
-}
 
 func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error) {
 	dropCollectionMsg := input.(*DropCollectionMsg)
@@ -592,14 +505,6 @@ type CreatePartitionMsg struct {
 	internalpb2.CreatePartitionRequest
 }
 
-func (cc *CreatePartitionMsg) GetMsgContext() context.Context {
-	return cc.MsgCtx
-}
-
-func (cc *CreatePartitionMsg) SetMsgContext(ctx context.Context) {
-	cc.MsgCtx = ctx
-}
-
 func (cc *CreatePartitionMsg) Type() MsgType {
 	return cc.Base.MsgType
 }
@@ -637,14 +542,6 @@ type DropPartitionMsg struct {
 	internalpb2.DropPartitionRequest
 }
 
-func (dc *DropPartitionMsg) GetMsgContext() context.Context {
-	return dc.MsgCtx
-}
-
-func (dc *DropPartitionMsg) SetMsgContext(ctx context.Context) {
-	dc.MsgCtx = ctx
-}
-
 func (dc *DropPartitionMsg) Type() MsgType {
 	return dc.Base.MsgType
 }
@@ -729,14 +626,6 @@ func (sim *SegmentInfoMsg) Type() MsgType {
 	return sim.Base.MsgType
 }
 
-func (sim *SegmentInfoMsg) GetMsgContext() context.Context {
-	return sim.MsgCtx
-}
-
-func (sim *SegmentInfoMsg) SetMsgContext(ctx context.Context) {
-	sim.MsgCtx = ctx
-}
-
 func (sim *SegmentInfoMsg) Marshal(input TsMsg) (MarshalType, error) {
 	segInfoMsg := input.(*SegmentInfoMsg)
 	mb, err := proto.Marshal(&segInfoMsg.SegmentMsg)
diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go
index aedca490ca89b13c2faf547881e1e55fd5ed4c51..f70ce3d914aaf25495de14edaba0384bd02b2562 100644
--- a/internal/msgstream/pulsarms/pulsar_msgstream.go
+++ b/internal/msgstream/pulsarms/pulsar_msgstream.go
@@ -12,10 +12,6 @@ import (
 
 	"github.com/apache/pulsar-client-go/pulsar"
 	"github.com/golang/protobuf/proto"
-	"github.com/opentracing/opentracing-go"
-	"github.com/opentracing/opentracing-go/ext"
-	oplog "github.com/opentracing/opentracing-go/log"
-
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
@@ -247,49 +243,12 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
 
 			msg := &pulsar.ProducerMessage{Payload: m}
 
-			var child opentracing.Span
-			if v.Msgs[i].Type() == commonpb.MsgType_kInsert ||
-				v.Msgs[i].Type() == commonpb.MsgType_kSearch ||
-				v.Msgs[i].Type() == commonpb.MsgType_kSearchResult {
-				tracer := opentracing.GlobalTracer()
-				ctx := v.Msgs[i].GetMsgContext()
-				if ctx == nil {
-					ctx = context.Background()
-				}
-
-				if parent := opentracing.SpanFromContext(ctx); parent != nil {
-					child = tracer.StartSpan("start send pulsar msg",
-						opentracing.FollowsFrom(parent.Context()))
-				} else {
-					child = tracer.StartSpan("start send pulsar msg")
-				}
-				child.SetTag("hash keys", v.Msgs[i].HashKeys())
-				child.SetTag("start time", v.Msgs[i].BeginTs())
-				child.SetTag("end time", v.Msgs[i].EndTs())
-				child.SetTag("msg type", v.Msgs[i].Type())
-				msg.Properties = make(map[string]string)
-				err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
-				if err != nil {
-					child.LogFields(oplog.Error(err))
-					child.Finish()
-					return err
-				}
-				child.LogFields(oplog.String("inject success", "inject success"))
-			}
-
 			if _, err := ms.producers[k].Send(
 				context.Background(),
 				msg,
 			); err != nil {
-				if child != nil {
-					child.LogFields(oplog.Error(err))
-					child.Finish()
-				}
 				return err
 			}
-			if child != nil {
-				child.Finish()
-			}
 		}
 	}
 	return nil
@@ -309,49 +268,14 @@ func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error {
 		}
 
 		msg := &pulsar.ProducerMessage{Payload: m}
-		var child opentracing.Span
-		if v.Type() == commonpb.MsgType_kInsert ||
-			v.Type() == commonpb.MsgType_kSearch ||
-			v.Type() == commonpb.MsgType_kSearchResult {
-			tracer := opentracing.GlobalTracer()
-			ctx := v.GetMsgContext()
-			if ctx == nil {
-				ctx = context.Background()
-			}
-			if parent := opentracing.SpanFromContext(ctx); parent != nil {
-				child = tracer.StartSpan("start send pulsar msg",
-					opentracing.FollowsFrom(parent.Context()))
-			} else {
-				child = tracer.StartSpan("start send pulsar msg, start time: %d")
-			}
-			child.SetTag("hash keys", v.HashKeys())
-			child.SetTag("start time", v.BeginTs())
-			child.SetTag("end time", v.EndTs())
-			child.SetTag("msg type", v.Type())
-			msg.Properties = make(map[string]string)
-			err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
-			if err != nil {
-				child.LogFields(oplog.Error(err))
-				child.Finish()
-				return err
-			}
-			child.LogFields(oplog.String("inject success", "inject success"))
-		}
 		for i := 0; i < producerLen; i++ {
 			if _, err := ms.producers[i].Send(
 				context.Background(),
 				msg,
 			); err != nil {
-				if child != nil {
-					child.LogFields(oplog.Error(err))
-					child.Finish()
-				}
 				return err
 			}
 		}
-		if child != nil {
-			child.Finish()
-		}
 	}
 	return nil
 }
@@ -411,23 +335,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
 					continue
 				}
 				tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType)
-				if tsMsg.Type() == commonpb.MsgType_kSearch ||
-					tsMsg.Type() == commonpb.MsgType_kSearchResult {
-					tracer := opentracing.GlobalTracer()
-					spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
-					if err != nil {
-						log.Println("extract message err")
-						log.Println(err.Error())
-					}
-					span := opentracing.StartSpan("pulsar msg received",
-						ext.RPCServerOption(spanContext))
-					span.SetTag("msg type", tsMsg.Type())
-					span.SetTag("hash keys", tsMsg.HashKeys())
-					span.SetTag("start time", tsMsg.BeginTs())
-					span.SetTag("end time", tsMsg.EndTs())
-					tsMsg.SetMsgContext(opentracing.ContextWithSpan(context.Background(), span))
-					span.Finish()
-				}
 				if err != nil {
 					log.Printf("Failed to unmarshal tsMsg, error = %v", err)
 					continue
@@ -521,8 +428,6 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
 	ms.unsolvedBuf = make(map[Consumer][]TsMsg)
 	isChannelReady := make(map[Consumer]bool)
 	eofMsgTimeStamp := make(map[Consumer]Timestamp)
-	spans := make(map[Timestamp]opentracing.Span)
-	ctxs := make(map[Timestamp]context.Context)
 	for _, consumer := range ms.consumers {
 		ms.unsolvedBuf[consumer] = make([]TsMsg, 0)
 	}
@@ -558,22 +463,8 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
 						timeTickMsg = v
 						continue
 					}
-					var ctx context.Context
-					var span opentracing.Span
-					if v.Type() == commonpb.MsgType_kInsert {
-						if _, ok := spans[v.BeginTs()]; !ok {
-							span, ctx = opentracing.StartSpanFromContext(v.GetMsgContext(), "after find time tick")
-							ctxs[v.BeginTs()] = ctx
-							spans[v.BeginTs()] = span
-						}
-					}
 					if v.EndTs() <= timeStamp {
 						timeTickBuf = append(timeTickBuf, v)
-						if v.Type() == commonpb.MsgType_kInsert {
-							v.SetMsgContext(ctxs[v.BeginTs()])
-							spans[v.BeginTs()].Finish()
-							delete(spans, v.BeginTs())
-						}
 					} else {
 						tempBuffer = append(tempBuffer, v)
 					}
@@ -643,23 +534,6 @@ func (ms *PulsarTtMsgStream) findTimeTick(consumer Consumer,
 				MsgID:       typeutil.PulsarMsgIDToString(pulsarMsg.ID()),
 			})
 
-			if tsMsg.Type() == commonpb.MsgType_kInsert {
-				tracer := opentracing.GlobalTracer()
-				spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
-				if err != nil {
-					log.Println("extract message err")
-					log.Println(err.Error())
-				}
-				span := opentracing.StartSpan("pulsar msg received",
-					ext.RPCServerOption(spanContext))
-				span.SetTag("hash keys", tsMsg.HashKeys())
-				span.SetTag("start time", tsMsg.BeginTs())
-				span.SetTag("end time", tsMsg.EndTs())
-				span.SetTag("msg type", tsMsg.Type())
-				tsMsg.SetMsgContext(opentracing.ContextWithSpan(context.Background(), span))
-				span.Finish()
-			}
-
 			mu.Lock()
 			ms.unsolvedBuf[consumer] = append(ms.unsolvedBuf[consumer], tsMsg)
 			mu.Unlock()
diff --git a/internal/msgstream/util/repack_func.go b/internal/msgstream/util/repack_func.go
index 1767780657e4b2af6a0293d26790e069f7650b57..45e58f70c8cd4c00e825f2e59710fe2d4fc5eaa0 100644
--- a/internal/msgstream/util/repack_func.go
+++ b/internal/msgstream/util/repack_func.go
@@ -53,9 +53,6 @@ func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
 			}
 
 			insertMsg := &msgstream.InsertMsg{
-				BaseMsg: BaseMsg{
-					MsgCtx: request.GetMsgContext(),
-				},
 				InsertRequest: sliceRequest,
 			}
 			result[key].Msgs = append(result[key].Msgs, insertMsg)
@@ -102,9 +99,6 @@ func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
 			}
 
 			deleteMsg := &msgstream.DeleteMsg{
-				BaseMsg: BaseMsg{
-					MsgCtx: request.GetMsgContext(),
-				},
 				DeleteRequest: sliceRequest,
 			}
 			result[key].Msgs = append(result[key].Msgs, deleteMsg)
diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go
index 6e951f477cfe36034e5c0bddc508a24d89d5cdcd..de9bfac8282996b5b871810727dc11439e9ec7ba 100644
--- a/internal/proxynode/impl.go
+++ b/internal/proxynode/impl.go
@@ -2,21 +2,18 @@ package proxynode
 
 import (
 	"context"
-	"errors"
 	"log"
 	"strconv"
 	"time"
 
-	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
-
-	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
-
+	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
-	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
-
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
 )
 
 const (
diff --git a/internal/proxynode/interface.go b/internal/proxynode/interface.go
index fcb44aa13e5426ab9c00d1b6e4d0203c1fe7c8be..509c73c6cf4c3d9b204c1494bbbc60c1b03f30a5 100644
--- a/internal/proxynode/interface.go
+++ b/internal/proxynode/interface.go
@@ -49,7 +49,7 @@ type QueryServiceClient interface {
 
 type DataServiceClient interface {
 	AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error)
-	GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
+	GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
 	Flush(req *datapb.FlushRequest) (*commonpb.Status, error)
 	GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
 
@@ -58,7 +58,7 @@ type DataServiceClient interface {
 }
 
 type ProxyServiceClient interface {
-	GetTimeTickChannel() (string, error)
+	GetTimeTickChannel() (*milvuspb.StringResponse, error)
 	RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
 	GetComponentStates() (*internalpb2.ComponentStates, error)
 }
diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go
index ee999d4d22ee885d76b3b7c31eaf53a8f271a047..850027122d59afa0a65da21ee7b3f303ab47b028 100644
--- a/internal/proxynode/proxy_node.go
+++ b/internal/proxynode/proxy_node.go
@@ -19,9 +19,6 @@ import (
 
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 
-	"github.com/opentracing/opentracing-go"
-	"github.com/uber/jaeger-client-go/config"
-
 	"github.com/zilliztech/milvus-distributed/internal/allocator"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
@@ -57,7 +54,6 @@ type NodeImpl struct {
 	manipulationMsgStream msgstream.MsgStream
 	queryMsgStream        msgstream.MsgStream
 
-	tracer opentracing.Tracer
 	closer io.Closer
 
 	// Add callback functions at different stages
@@ -106,7 +102,6 @@ func (node *NodeImpl) waitForServiceReady(service Component, serviceName string)
 }
 
 func (node *NodeImpl) Init() error {
-	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchBufSize, 1024)
 
 	// todo wait for proxyservice state changed to Healthy
 
@@ -136,6 +131,8 @@ func (node *NodeImpl) Init() error {
 		return err
 	}
 
+	factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchBufSize, 1024)
+
 	// wait for dataservice state changed to Healthy
 	if node.dataServiceClient != nil {
 		err = node.waitForServiceReady(node.dataServiceClient, "DataService")
@@ -182,19 +179,6 @@ func (node *NodeImpl) Init() error {
 	//	return err
 	//}
 
-	cfg := &config.Configuration{
-		ServiceName: "proxynode",
-		Sampler: &config.SamplerConfig{
-			Type:  "const",
-			Param: 1,
-		},
-	}
-	node.tracer, node.closer, err = cfg.NewTracer()
-	if err != nil {
-		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
-	}
-	opentracing.SetGlobalTracer(node.tracer)
-
 	node.queryMsgStream, _ = factory.NewMsgStream(node.ctx)
 	node.queryMsgStream.AsProducer(Params.SearchChannelNames)
 	log.Println("create query message stream ...")
diff --git a/internal/proxynode/repack_func.go b/internal/proxynode/repack_func.go
index 01a1a67ada0590e7db5cd6f73d89491948968bbb..3837338f1b73d90a675f02017a3a588a96ccbf19 100644
--- a/internal/proxynode/repack_func.go
+++ b/internal/proxynode/repack_func.go
@@ -214,7 +214,6 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
 			insertMsg := &msgstream.InsertMsg{
 				InsertRequest: sliceRequest,
 			}
-			insertMsg.SetMsgContext(request.GetMsgContext())
 			if together { // all rows with same hash value are accumulated to only one message
 				if len(result[key].Msgs) <= 0 {
 					result[key].Msgs = append(result[key].Msgs, insertMsg)
diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go
index bfa92e0ed24580e6cdc10343e61b04db1ac6b358..9aff4e2578ace81a57c216260f7937ec683aeedb 100644
--- a/internal/proxynode/task.go
+++ b/internal/proxynode/task.go
@@ -1,6 +1,7 @@
 package proxynode
 
 import (
+	"context"
 	"errors"
 	"log"
 	"math"
@@ -146,7 +147,6 @@ func (it *InsertTask) Execute() error {
 		EndTs:   it.EndTs(),
 		Msgs:    make([]msgstream.TsMsg, 1),
 	}
-	tsMsg.SetMsgContext(it.Ctx())
 
 	it.result = &milvuspb.InsertResponse{
 		Status: &commonpb.Status{
@@ -160,7 +160,7 @@ func (it *InsertTask) Execute() error {
 
 	stream, err := globalInsertChannelsMap.getInsertMsgStream(collID)
 	if err != nil {
-		collectionInsertChannels, err := it.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{
+		resp, _ := it.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kInsert, // todo
 				MsgID:     it.Base.MsgID,            // todo
@@ -170,10 +170,13 @@ func (it *InsertTask) Execute() error {
 			DbID:         0, // todo
 			CollectionID: collID,
 		})
-		if err != nil {
-			return err
+		if resp == nil {
+			return errors.New("get insert channels resp is nil")
+		}
+		if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+			return errors.New(resp.Status.Reason)
 		}
-		err = globalInsertChannelsMap.createInsertMsgStream(collID, collectionInsertChannels)
+		err = globalInsertChannelsMap.createInsertMsgStream(collID, resp.Values)
 		if err != nil {
 			return err
 		}
@@ -315,7 +318,7 @@ func (cct *CreateCollectionTask) Execute() error {
 		if err != nil {
 			return err
 		}
-		collectionInsertChannels, err := cct.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{
+		resp, _ := cct.dataServiceClient.GetInsertChannels(&datapb.InsertChannelRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kInsert, // todo
 				MsgID:     cct.Base.MsgID,           // todo
@@ -325,10 +328,13 @@ func (cct *CreateCollectionTask) Execute() error {
 			DbID:         0, // todo
 			CollectionID: collID,
 		})
-		if err != nil {
-			return err
+		if resp == nil {
+			return errors.New("get insert channels resp is nil")
+		}
+		if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+			return errors.New(resp.Status.Reason)
 		}
-		err = globalInsertChannelsMap.createInsertMsgStream(collID, collectionInsertChannels)
+		err = globalInsertChannelsMap.createInsertMsgStream(collID, resp.Values)
 		if err != nil {
 			return err
 		}
@@ -387,19 +393,19 @@ func (dct *DropCollectionTask) PreExecute() error {
 }
 
 func (dct *DropCollectionTask) Execute() error {
-	var err error
 	collID, err := globalMetaCache.GetCollectionID(dct.CollectionName)
 	if err != nil {
 		return err
 	}
-	dct.result, err = dct.masterClient.DropCollection(dct.DropCollectionRequest)
-	if dct.result.ErrorCode == commonpb.ErrorCode_SUCCESS {
-		err = globalInsertChannelsMap.closeInsertMsgStream(collID)
-		if err != nil {
-			return err
-		}
+	dct.result, _ = dct.masterClient.DropCollection(dct.DropCollectionRequest)
+	if dct.result.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(dct.result.Reason)
 	}
-	return err
+	err = globalInsertChannelsMap.closeInsertMsgStream(collID)
+	if err != nil {
+		return err
+	}
+	return nil
 }
 
 func (dct *DropCollectionTask) PostExecute() error {
@@ -507,7 +513,6 @@ func (st *SearchTask) Execute() error {
 		EndTs:   st.Base.Timestamp,
 		Msgs:    make([]msgstream.TsMsg, 1),
 	}
-	tsMsg.SetMsgContext(st.Ctx())
 	msgPack.Msgs[0] = tsMsg
 	err := st.queryMsgStream.Produce(msgPack)
 	log.Printf("[NodeImpl] length of searchMsg: %v", len(msgPack.Msgs))
@@ -719,6 +724,12 @@ func (hct *HasCollectionTask) PreExecute() error {
 func (hct *HasCollectionTask) Execute() error {
 	var err error
 	hct.result, err = hct.masterClient.HasCollection(hct.HasCollectionRequest)
+	if hct.result == nil {
+		return errors.New("has collection resp is nil")
+	}
+	if hct.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(hct.result.Status.Reason)
+	}
 	return err
 }
 
@@ -775,10 +786,13 @@ func (dct *DescribeCollectionTask) PreExecute() error {
 func (dct *DescribeCollectionTask) Execute() error {
 	var err error
 	dct.result, err = dct.masterClient.DescribeCollection(dct.DescribeCollectionRequest)
-	if err != nil {
-		return err
+	if dct.result == nil {
+		return errors.New("has collection resp is nil")
 	}
-	return nil
+	if dct.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(dct.result.Status.Reason)
+	}
+	return err
 }
 
 func (dct *DescribeCollectionTask) PostExecute() error {
@@ -842,9 +856,12 @@ func (g *GetCollectionsStatisticsTask) Execute() error {
 		CollectionID: collID,
 	}
 
-	result, err := g.dataServiceClient.GetCollectionStatistics(req)
-	if err != nil {
-		return err
+	result, _ := g.dataServiceClient.GetCollectionStatistics(req)
+	if result == nil {
+		return errors.New("get collection statistics resp is nil")
+	}
+	if result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(result.Status.Reason)
 	}
 	g.result = &milvuspb.CollectionStatsResponse{
 		Status: &commonpb.Status{
@@ -865,6 +882,7 @@ type ShowCollectionsTask struct {
 	*milvuspb.ShowCollectionRequest
 	masterClient MasterClient
 	result       *milvuspb.ShowCollectionResponse
+	ctx          context.Context
 }
 
 func (sct *ShowCollectionsTask) OnEnqueue() error {
@@ -906,6 +924,12 @@ func (sct *ShowCollectionsTask) PreExecute() error {
 func (sct *ShowCollectionsTask) Execute() error {
 	var err error
 	sct.result, err = sct.masterClient.ShowCollections(sct.ShowCollectionRequest)
+	if sct.result == nil {
+		return errors.New("get collection statistics resp is nil")
+	}
+	if sct.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(sct.result.Status.Reason)
+	}
 	return err
 }
 
@@ -968,6 +992,12 @@ func (cpt *CreatePartitionTask) PreExecute() error {
 
 func (cpt *CreatePartitionTask) Execute() (err error) {
 	cpt.result, err = cpt.masterClient.CreatePartition(cpt.CreatePartitionRequest)
+	if cpt.result == nil {
+		return errors.New("get collection statistics resp is nil")
+	}
+	if cpt.result.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(cpt.result.Reason)
+	}
 	return err
 }
 
@@ -1030,6 +1060,12 @@ func (dpt *DropPartitionTask) PreExecute() error {
 
 func (dpt *DropPartitionTask) Execute() (err error) {
 	dpt.result, err = dpt.masterClient.DropPartition(dpt.DropPartitionRequest)
+	if dpt.result == nil {
+		return errors.New("get collection statistics resp is nil")
+	}
+	if dpt.result.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(dpt.result.Reason)
+	}
 	return err
 }
 
@@ -1091,6 +1127,12 @@ func (hpt *HasPartitionTask) PreExecute() error {
 
 func (hpt *HasPartitionTask) Execute() (err error) {
 	hpt.result, err = hpt.masterClient.HasPartition(hpt.HasPartitionRequest)
+	if hpt.result == nil {
+		return errors.New("get collection statistics resp is nil")
+	}
+	if hpt.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(hpt.result.Status.Reason)
+	}
 	return err
 }
 
@@ -1147,10 +1189,13 @@ func (spt *ShowPartitionsTask) PreExecute() error {
 func (spt *ShowPartitionsTask) Execute() error {
 	var err error
 	spt.result, err = spt.masterClient.ShowPartitions(spt.ShowPartitionRequest)
-	if err != nil {
-		return err
+	if spt.result == nil {
+		return errors.New("get collection statistics resp is nil")
 	}
-	return nil
+	if spt.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(spt.result.Status.Reason)
+	}
+	return err
 }
 
 func (spt *ShowPartitionsTask) PostExecute() error {
@@ -1210,8 +1255,15 @@ func (cit *CreateIndexTask) PreExecute() error {
 	return nil
 }
 
-func (cit *CreateIndexTask) Execute() (err error) {
+func (cit *CreateIndexTask) Execute() error {
+	var err error
 	cit.result, err = cit.masterClient.CreateIndex(cit.CreateIndexRequest)
+	if cit.result == nil {
+		return errors.New("get collection statistics resp is nil")
+	}
+	if cit.result.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(cit.result.Reason)
+	}
 	return err
 }
 
@@ -1275,6 +1327,12 @@ func (dit *DescribeIndexTask) PreExecute() error {
 func (dit *DescribeIndexTask) Execute() error {
 	var err error
 	dit.result, err = dit.masterClient.DescribeIndex(dit.DescribeIndexRequest)
+	if dit.result == nil {
+		return errors.New("get collection statistics resp is nil")
+	}
+	if dit.result.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		return errors.New(dit.result.Status.Reason)
+	}
 	return err
 }
 
@@ -1495,9 +1553,9 @@ func (ft *FlushTask) Execute() error {
 			CollectionID: collID,
 		}
 		var status *commonpb.Status
-		status, err = ft.dataServiceClient.Flush(flushReq)
-		if err != nil {
-			return nil
+		status, _ = ft.dataServiceClient.Flush(flushReq)
+		if status == nil {
+			return errors.New("flush resp is nil")
 		}
 		if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
 			return errors.New(status.Reason)
diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go
index f03afa130e41025ac284685f937864103300acae..2f49f671743d9fd6061a17432afbf2a2a60f4ab3 100644
--- a/internal/proxyservice/impl.go
+++ b/internal/proxyservice/impl.go
@@ -114,7 +114,7 @@ func (s *ServiceImpl) Init() error {
 	}
 	insertTickMsgStream, _ := factory.NewMsgStream(s.ctx)
 	insertTickMsgStream.AsProducer(channels)
-	log.Println("create service time tick producer channel: ", channels)
+	log.Println("create insert time tick producer channel: ", channels)
 
 	nodeTimeTickMsgStream, _ := factory.NewMsgStream(s.ctx)
 	nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel,
@@ -175,11 +175,16 @@ func (s *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) {
 	s.stateCode = code
 }
 
-func (s *ServiceImpl) GetTimeTickChannel() (string, error) {
-	return Params.ServiceTimeTickChannel, nil
+func (s *ServiceImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
+	return &milvuspb.StringResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+		Value: Params.ServiceTimeTickChannel,
+	}, nil
 }
 
-func (s *ServiceImpl) GetStatisticsChannel() (string, error) {
+func (s *ServiceImpl) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
 	panic("implement me")
 }
 
@@ -260,7 +265,7 @@ func (s *ServiceImpl) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxy
 	return t.response, nil
 }
 
-func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error {
+func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
 	log.Println("InvalidateCollectionMetaCache")
 	ctx, cancel := context.WithTimeout(s.ctx, timeoutInterval)
 	defer cancel()
@@ -275,13 +280,13 @@ func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateC
 
 	err = s.sched.InvalidateCollectionMetaCacheTaskQueue.Enqueue(t)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	err = t.WaitToFinish()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	return nil
+	return nil, nil
 }
diff --git a/internal/proxyservice/interface.go b/internal/proxyservice/interface.go
index 7171a4465a9b16fbcfd58f1b8fc61e0f20b3de10..0f21af5b9786b6165b79a5fcde99ab0199a44ab1 100644
--- a/internal/proxyservice/interface.go
+++ b/internal/proxyservice/interface.go
@@ -1,6 +1,7 @@
 package proxyservice
 
 import (
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
@@ -15,5 +16,5 @@ type ProxyService interface {
 	RegisterLink() (*milvuspb.RegisterLinkResponse, error)
 	RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
 	// TODO: i'm sure it's not a best way to keep consistency, fix me
-	InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error
+	InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
 }
diff --git a/internal/proxyservice/node_info.go b/internal/proxyservice/node_info.go
index 0bceb22fbe6300178b681b1dd7db59e22fa2ad1d..9b51c58923279836548db66be0a05e3c83d06648 100644
--- a/internal/proxyservice/node_info.go
+++ b/internal/proxyservice/node_info.go
@@ -8,6 +8,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+
 	grpcproxynodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode/client"
 
 	"github.com/zilliztech/milvus-distributed/internal/errors"
@@ -25,7 +27,7 @@ type NodeClient interface {
 	Start() error
 	Stop() error
 
-	InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) error
+	InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
 }
 
 type GlobalNodeInfoTable struct {
diff --git a/internal/proxyservice/task.go b/internal/proxyservice/task.go
index 1195eb39f78b00abe2ad60ba7085a8aed3d97357..17fab676bdf64107557fc4f7f62f532b1b7729b5 100644
--- a/internal/proxyservice/task.go
+++ b/internal/proxyservice/task.go
@@ -149,9 +149,12 @@ func (t *InvalidateCollectionMetaCacheTask) Execute() error {
 		return err
 	}
 	for _, c := range clients {
-		err = c.InvalidateCollectionMetaCache(t.request)
-		if err != nil {
-			return err
+		status, _ := c.InvalidateCollectionMetaCache(t.request)
+		if status == nil {
+			return errors.New("invalidate collection meta cache error")
+		}
+		if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+			return errors.New(status.Reason)
 		}
 	}
 	return nil
diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go
index d166d11f54f1b37266e4727d29a544b7a5c8f75e..171875b5d8e2f63444e32712df0a1ad938917319 100644
--- a/internal/querynode/flow_graph_filter_dm_node.go
+++ b/internal/querynode/flow_graph_filter_dm_node.go
@@ -1,11 +1,9 @@
 package querynode
 
 import (
-	"context"
 	"log"
 	"math"
 
-	"github.com/opentracing/opentracing-go"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 )
@@ -34,28 +32,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
 		// TODO: add error handling
 	}
 
-	var childs []opentracing.Span
-	tracer := opentracing.GlobalTracer()
-	if tracer != nil && msgStreamMsg != nil {
-		for _, msg := range msgStreamMsg.TsMessages() {
-			if msg.Type() == commonpb.MsgType_kInsert || msg.Type() == commonpb.MsgType_kSearch {
-				var child opentracing.Span
-				ctx := msg.GetMsgContext()
-				if parent := opentracing.SpanFromContext(ctx); parent != nil {
-					child = tracer.StartSpan("pass filter node",
-						opentracing.FollowsFrom(parent.Context()))
-				} else {
-					child = tracer.StartSpan("pass filter node")
-				}
-				child.SetTag("hash keys", msg.HashKeys())
-				child.SetTag("start time", msg.BeginTs())
-				child.SetTag("end time", msg.EndTs())
-				msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child))
-				childs = append(childs, child)
-			}
-		}
-	}
-
 	ddMsg, ok := (*in[1]).(*ddMsg)
 	if !ok {
 		log.Println("type assertion failed for ddMsg")
@@ -70,20 +46,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
 			timestampMax: msgStreamMsg.TimestampMax(),
 		},
 	}
-	for key, msg := range msgStreamMsg.TsMessages() {
+	for _, msg := range msgStreamMsg.TsMessages() {
 		switch msg.Type() {
 		case commonpb.MsgType_kInsert:
-			var ctx2 context.Context
-			if childs != nil {
-				if childs[key] != nil {
-					ctx2 = opentracing.ContextWithSpan(msg.GetMsgContext(), childs[key])
-				} else {
-					ctx2 = context.Background()
-				}
-			}
 			resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
 			if resMsg != nil {
-				resMsg.SetMsgContext(ctx2)
 				iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
 			}
 		// case commonpb.MsgType_kDelete:
@@ -96,9 +63,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
 	iMsg.gcRecord = ddMsg.gcRecord
 	var res Msg = &iMsg
 
-	for _, child := range childs {
-		child.Finish()
-	}
 	return []*Msg{&res}
 }
 
diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go
index dcb6216ae8d530d9dcded3963043e4d2904db533..a47facef0dc9dcddcc3e82a2f7cbf4b6da9d2329 100644
--- a/internal/querynode/flow_graph_insert_node.go
+++ b/internal/querynode/flow_graph_insert_node.go
@@ -6,8 +6,6 @@ import (
 	"log"
 	"sync"
 
-	"github.com/opentracing/opentracing-go"
-	oplog "github.com/opentracing/opentracing-go/log"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 )
 
@@ -42,30 +40,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
 		// TODO: add error handling
 	}
 
-	var childs []opentracing.Span
-	tracer := opentracing.GlobalTracer()
-	if tracer != nil && iMsg != nil {
-		for _, msg := range iMsg.insertMessages {
-			if msg.Type() == commonpb.MsgType_kInsert || msg.Type() == commonpb.MsgType_kSearch {
-				var child opentracing.Span
-				ctx := msg.GetMsgContext()
-				if parent := opentracing.SpanFromContext(ctx); parent != nil {
-					child = tracer.StartSpan("pass insert node",
-						opentracing.FollowsFrom(parent.Context()))
-				} else {
-					child = tracer.StartSpan("pass insert node")
-				}
-				child.SetTag("hash keys", msg.HashKeys())
-				child.SetTag("start time", msg.BeginTs())
-				child.SetTag("end time", msg.EndTs())
-				msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child))
-				childs = append(childs, child)
-			}
-		}
-	}
-
 	insertData := InsertData{
-		insertContext:    make(map[int64]context.Context),
 		insertIDs:        make(map[int64][]int64),
 		insertTimestamps: make(map[int64][]uint64),
 		insertRecords:    make(map[int64][]*commonpb.Blob),
@@ -74,7 +49,6 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
 
 	// 1. hash insertMessages to insertData
 	for _, task := range iMsg.insertMessages {
-		insertData.insertContext[task.SegmentID] = task.GetMsgContext()
 		insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
 		insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
 		insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
@@ -108,7 +82,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
 	wg := sync.WaitGroup{}
 	for segmentID := range insertData.insertRecords {
 		wg.Add(1)
-		go iNode.insert(insertData.insertContext[segmentID], &insertData, segmentID, &wg)
+		go iNode.insert(&insertData, segmentID, &wg)
 	}
 	wg.Wait()
 
@@ -116,21 +90,15 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
 		gcRecord:  iMsg.gcRecord,
 		timeRange: iMsg.timeRange,
 	}
-	for _, child := range childs {
-		child.Finish()
-	}
 	return []*Msg{&res}
 }
 
-func (iNode *insertNode) insert(ctx context.Context, insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
-	span, _ := opentracing.StartSpanFromContext(ctx, "insert node insert function")
-	defer span.Finish()
+func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
 	var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
 	if err != nil {
 		log.Println("cannot find segment:", segmentID)
 		// TODO: add error handling
 		wg.Done()
-		span.LogFields(oplog.Error(err))
 		return
 	}
 
@@ -144,7 +112,6 @@ func (iNode *insertNode) insert(ctx context.Context, insertData *InsertData, seg
 		log.Println(err)
 		// TODO: add error handling
 		wg.Done()
-		span.LogFields(oplog.Error(err))
 		return
 	}
 
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 9f4629c1b924392a036df8ae4f6ccf75dbc80f7e..2be99ad9530b9d8d435434bc985cf606e95b2be1 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -20,9 +20,6 @@ import (
 	"log"
 	"sync/atomic"
 
-	"github.com/opentracing/opentracing-go"
-	"github.com/uber/jaeger-client-go/config"
-
 	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -62,7 +59,6 @@ type QueryNode struct {
 	statsService    *statsService
 
 	//opentracing
-	tracer opentracing.Tracer
 	closer io.Closer
 
 	// clients
@@ -85,20 +81,6 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
 		statsService:    nil,
 	}
 
-	var err error
-	cfg := &config.Configuration{
-		ServiceName: "query_node",
-		Sampler: &config.SamplerConfig{
-			Type:  "const",
-			Param: 1,
-		},
-	}
-	node.tracer, node.closer, err = cfg.NewTracer()
-	if err != nil {
-		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
-	}
-	opentracing.SetGlobalTracer(node.tracer)
-
 	segmentsMap := make(map[int64]*Segment)
 	collections := make([]*Collection, 0)
 
@@ -126,20 +108,6 @@ func NewQueryNodeWithoutID(ctx context.Context) *QueryNode {
 		statsService:    nil,
 	}
 
-	var err error
-	cfg := &config.Configuration{
-		ServiceName: "query_node",
-		Sampler: &config.SamplerConfig{
-			Type:  "const",
-			Param: 1,
-		},
-	}
-	node.tracer, node.closer, err = cfg.NewTracer()
-	if err != nil {
-		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
-	}
-	opentracing.SetGlobalTracer(node.tracer)
-
 	segmentsMap := make(map[int64]*Segment)
 	collections := make([]*Collection, 0)
 
diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go
index c44e9406db831ace75c5568eda195426ce4c7a2f..b663d9e9071c2ba268c415e911de7f86aeb1fe1e 100644
--- a/internal/querynode/search_service.go
+++ b/internal/querynode/search_service.go
@@ -9,9 +9,6 @@ import (
 	"strconv"
 	"sync"
 
-	"github.com/opentracing/opentracing-go"
-	oplog "github.com/opentracing/opentracing-go/log"
-
 	"github.com/golang/protobuf/proto"
 
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
@@ -145,19 +142,14 @@ func (ss *searchService) receiveSearchMsg() {
 				searchMsg = append(searchMsg, msgPack.Msgs[i])
 			}
 			for _, msg := range searchMsg {
-				span, ctx := opentracing.StartSpanFromContext(msg.GetMsgContext(), "receive search msg")
-				msg.SetMsgContext(ctx)
 				err := ss.search(msg)
 				if err != nil {
 					log.Println(err)
-					span.LogFields(oplog.Error(err))
 					err2 := ss.publishFailedSearchResult(msg, err.Error())
 					if err2 != nil {
-						span.LogFields(oplog.Error(err2))
 						log.Println("publish FailedSearchResult failed, error message: ", err2)
 					}
 				}
-				span.Finish()
 			}
 			log.Println("ReceiveSearchMsg, do search done, num of searchMsg = ", len(searchMsg))
 		}
@@ -219,12 +211,8 @@ func (ss *searchService) doUnsolvedMsgSearch() {
 // TODO:: cache map[dsl]plan
 // TODO: reBatched search requests
 func (ss *searchService) search(msg msgstream.TsMsg) error {
-	span, ctx := opentracing.StartSpanFromContext(msg.GetMsgContext(), "do search")
-	defer span.Finish()
-	msg.SetMsgContext(ctx)
 	searchMsg, ok := msg.(*msgstream.SearchMsg)
 	if !ok {
-		span.LogFields(oplog.Error(errors.New("invalid request type = " + string(msg.Type()))))
 		return errors.New("invalid request type = " + string(msg.Type()))
 	}
 
@@ -233,25 +221,21 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
 	query := milvuspb.SearchRequest{}
 	err := proto.Unmarshal(queryBlob, &query)
 	if err != nil {
-		span.LogFields(oplog.Error(err))
 		return errors.New("unmarshal query failed")
 	}
 	collectionID := searchMsg.CollectionID
 	collection, err := ss.replica.getCollectionByID(collectionID)
 	if err != nil {
-		span.LogFields(oplog.Error(err))
 		return err
 	}
 	dsl := query.Dsl
 	plan, err := createPlan(*collection, dsl)
 	if err != nil {
-		span.LogFields(oplog.Error(err))
 		return err
 	}
 	placeHolderGroupBlob := query.PlaceholderGroup
 	placeholderGroup, err := parserPlaceholderGroup(plan, placeHolderGroupBlob)
 	if err != nil {
-		span.LogFields(oplog.Error(err))
 		return err
 	}
 	placeholderGroups := make([]*PlaceholderGroup, 0)
@@ -290,7 +274,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
 			searchResult, err := segment.segmentSearch(plan, placeholderGroups, []Timestamp{searchTimestamp})
 
 			if err != nil {
-				span.LogFields(oplog.Error(err))
 				return err
 			}
 			searchResults = append(searchResults, searchResult)
@@ -306,7 +289,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
 			for i := 0; i < int(nq); i++ {
 				bs, err := proto.Marshal(hit)
 				if err != nil {
-					span.LogFields(oplog.Error(err))
 					return err
 				}
 				nilHits[i] = bs
@@ -329,7 +311,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
 			}
 			err = ss.publishSearchResult(searchResultMsg)
 			if err != nil {
-				span.LogFields(oplog.Error(err))
 				return err
 			}
 			return nil
@@ -340,22 +321,18 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
 	numSegment := int64(len(searchResults))
 	err2 := reduceSearchResults(searchResults, numSegment, inReduced)
 	if err2 != nil {
-		span.LogFields(oplog.Error(err2))
 		return err2
 	}
 	err = fillTargetEntry(plan, searchResults, matchedSegments, inReduced)
 	if err != nil {
-		span.LogFields(oplog.Error(err))
 		return err
 	}
 	marshaledHits, err := reorganizeQueryResults(plan, placeholderGroups, searchResults, numSegment, inReduced)
 	if err != nil {
-		span.LogFields(oplog.Error(err))
 		return err
 	}
 	hitsBlob, err := marshaledHits.getHitsBlob()
 	if err != nil {
-		span.LogFields(oplog.Error(err))
 		return err
 	}
 
@@ -407,7 +384,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
 		//}
 		err = ss.publishSearchResult(searchResultMsg)
 		if err != nil {
-			span.LogFields(oplog.Error(err))
 			return err
 		}
 	}
diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go
index cb55a836db5f7d81166c46e3732bb3f54be3ad03..f80b92e6ac20709d0482e2dcc802b1eab12a9f5f 100644
--- a/internal/queryservice/queryservice.go
+++ b/internal/queryservice/queryservice.go
@@ -24,7 +24,7 @@ type MasterServiceInterface interface {
 
 type DataServiceInterface interface {
 	GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
-	GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
+	GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
 }
 
 type QueryNodeInterface interface {
@@ -188,10 +188,17 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
 		DbID:         req.DbID,
 		CollectionID: req.CollectionID,
 	}
-	dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
+	resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
+	if resp == nil {
+		err = errors.New("get insert channels resp is nil")
+	}
+	if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+		err = errors.New(resp.Status.Reason)
+	}
 	if err != nil {
 		return fn(err), err
 	}
+	dmChannels := resp.Values
 
 	// get partitionIDs
 	showPartitionRequest := &milvuspb.ShowPartitionRequest{
@@ -428,10 +435,16 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
 			CollectionID: collectionID,
 		}
 
-		dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
-		if err != nil {
+		resp, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
+		if resp == nil {
+			err = errors.New("get insert channels resp is nil")
+			return fn(err), err
+		}
+		if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+			err = errors.New(resp.Status.Reason)
 			return fn(err), err
 		}
+		dmChannels := resp.Values
 		for _, partitionID := range partitionIDs {
 			loadSegmentRequest := &querypb.LoadSegmentRequest{
 				CollectionID: collectionID,
diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go
index a6e36bbee1c1cddb17b93dd8f30cc539b969ccc1..3c7ca361614f48dd96214b7b21ae513367e2cb04 100644
--- a/internal/queryservice/queryservice_test.go
+++ b/internal/queryservice/queryservice_test.go
@@ -155,8 +155,13 @@ func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datap
 
 	return ret, nil
 }
-func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
-	return []string{"test-insert"}, nil
+func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
+	return &internalpb2.StringList{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_SUCCESS,
+		},
+		Values: []string{"test-insert"},
+	}, nil
 }
 
 func TestQueryService_Init(t *testing.T) {
diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go
index e79f350b85517e0318eac4c7061afd2f25cf2b2a..0c0730f7f22ce68b7b6751722ef08572275ff695 100644
--- a/internal/util/flowgraph/input_node.go
+++ b/internal/util/flowgraph/input_node.go
@@ -1,12 +1,8 @@
 package flowgraph
 
 import (
-	"fmt"
 	"log"
 
-	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
-
-	"github.com/opentracing/opentracing-go"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 )
 
@@ -34,28 +30,6 @@ func (inNode *InputNode) Operate([]*Msg) []*Msg {
 
 	msgPack := (*inNode.inStream).Consume()
 
-	var childs []opentracing.Span
-	tracer := opentracing.GlobalTracer()
-	if tracer != nil && msgPack != nil {
-		for _, msg := range msgPack.Msgs {
-			if msg.Type() == commonpb.MsgType_kInsert {
-				var child opentracing.Span
-				ctx := msg.GetMsgContext()
-				if parent := opentracing.SpanFromContext(ctx); parent != nil {
-					child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs()),
-						opentracing.FollowsFrom(parent.Context()))
-				} else {
-					child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs()))
-				}
-				child.SetTag("hash keys", msg.HashKeys())
-				child.SetTag("start time", msg.BeginTs())
-				child.SetTag("end time", msg.EndTs())
-				msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child))
-				childs = append(childs, child)
-			}
-		}
-	}
-
 	// TODO: add status
 	if msgPack == nil {
 		log.Println("null msg pack")
@@ -69,10 +43,6 @@ func (inNode *InputNode) Operate([]*Msg) []*Msg {
 		startPositions: msgPack.StartPositions,
 	}
 
-	for _, child := range childs {
-		child.Finish()
-	}
-
 	return []*Msg{&msgStreamMsg}
 }
 
diff --git a/internal/util/typeutil/interface.go b/internal/util/typeutil/interface.go
index 2116a9e09bdaf50c82e9879db1cef3f79c261db9..dc9c3f52b44fb66b097baa8da825888b25ca0aa3 100644
--- a/internal/util/typeutil/interface.go
+++ b/internal/util/typeutil/interface.go
@@ -4,6 +4,7 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
 )
 
@@ -15,8 +16,8 @@ type Service interface {
 
 type Component interface {
 	GetComponentStates() (*internalpb2.ComponentStates, error)
-	GetTimeTickChannel() (string, error)
-	GetStatisticsChannel() (string, error)
+	GetTimeTickChannel() (*milvuspb.StringResponse, error)
+	GetStatisticsChannel() (*milvuspb.StringResponse, error)
 }
 
 type IndexNodeInterface interface {