From 74154a11a4540ed16e1d50e7c730c3c6c1551330 Mon Sep 17 00:00:00 2001
From: quicksilver <zhifeng.zhang@zilliz.com>
Date: Fri, 5 Mar 2021 16:52:45 +0800
Subject: [PATCH] Fix deploy error during regression stage

Signed-off-by: quicksilver <zhifeng.zhang@zilliz.com>
---
 .../Regression/PythonRegression.groovy        |  1 +
 internal/datanode/allocator.go                | 10 +-
 internal/datanode/collection_replica.go       | 60 ++++++------
 internal/datanode/collection_replica_test.go  |  4 +-
 internal/datanode/data_node.go                |  2 +-
 internal/datanode/data_sync_service.go        |  4 +-
 internal/datanode/flow_graph_dd_node.go       |  4 +-
 .../datanode/flow_graph_insert_buffer_node.go |  4 +-
 internal/dataservice/allocator.go             | 12 +--
 internal/dataservice/dd_handler.go            |  4 +-
 internal/dataservice/mock.go                  |  2 +-
 internal/dataservice/segment_allocator.go     | 76 +++++++--------
 internal/dataservice/server.go                |  8 +-
 internal/dataservice/watcher.go               |  8 +-
 internal/distributed/indexnode/service.go     |  4 +-
 internal/distributed/indexservice/service.go  |  4 +-
 internal/distributed/proxynode/service.go     |  4 +-
 internal/distributed/proxyservice/service.go  |  6 +-
 internal/indexnode/indexnode.go               | 30 +++---
 internal/indexservice/indexservice.go         | 36 +++----
 internal/indexservice/node_mgr.go             | 10 +-
 internal/masterservice/master_service.go      |  5 +-
 internal/masterservice/meta_table_test.go     |  3 +-
 internal/proxynode/impl.go                    | 58 +++++------
 internal/proxynode/insert_channels.go         |  6 +-
 internal/proxynode/interface.go               |  2 +-
 internal/proxynode/proxy_node.go              | 34 +++----
 internal/proxynode/task.go                    |  4 +-
 internal/proxynode/task_scheduler.go          |  4 +-
 internal/proxyservice/impl.go                 | 22 ++---
 internal/proxyservice/interface.go            |  8 +-
 internal/proxyservice/nodeid_allocator.go     | 12 +--
 internal/proxyservice/proxyservice.go         |  8 +-
 internal/proxyservice/timetick.go             | 29 +++---
 internal/querynode/collection_replica.go      | 96 +++++++++----------
 internal/querynode/data_sync_service.go       |  4 +-
 internal/querynode/flow_graph_dd_node.go      |  4 +-
 .../querynode/flow_graph_filter_dm_node.go    |  4 +-
 internal/querynode/flow_graph_gc_node.go      |  4 +-
 internal/querynode/flow_graph_insert_node.go  |  4 +-
 .../querynode/flow_graph_service_time_node.go |  4 +-
 internal/querynode/index_loader.go            |  4 +-
 internal/querynode/load_service.go            |  2 +-
 internal/querynode/meta_service.go            |  4 +-
 internal/querynode/query_node.go              |  6 +-
 internal/querynode/search_service.go          |  4 +-
 internal/querynode/segment_loader.go          |  4 +-
 internal/querynode/stats_service.go           |  4 +-
 internal/querynode/tsafe.go                   | 14 +--
 internal/queryservice/meta_replica.go         | 34 +++----
 internal/queryservice/queryservice.go         | 18 +++-
 internal/util/typeutil/interface.go           |  4 +-
 52 files changed, 355 insertions(+), 351 deletions(-)

diff --git a/.jenkins/modules/Regression/PythonRegression.groovy b/.jenkins/modules/Regression/PythonRegression.groovy
index 2ccb9ed52..0d680d469 100644
--- a/.jenkins/modules/Regression/PythonRegression.groovy
+++ b/.jenkins/modules/Regression/PythonRegression.groovy
@@ -6,6 +6,7 @@ timeout(time: 60, unit: 'MINUTES') {
         sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d etcd'
         sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d minio'
         dir ('build/docker/deploy') {
+            sh 'docker pull ${TARGET_REPO}/milvus-distributed:${TARGET_TAG}'
             if ("${REGRESSION_SERVICE_NAME}" == "regression_distributed") {
                 sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d master'
                 sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d indexservice'
diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go
index bf4dfe3f5..80ebf08f2 100644
--- a/internal/datanode/allocator.go
+++ b/internal/datanode/allocator.go
@@ -8,22 +8,22 @@ import (
 )
 
 type (
-	allocator interface {
+	allocatorInterface interface {
 		allocID() (UniqueID, error)
 	}
 
-	allocatorImpl struct {
+	allocator struct {
 		masterService MasterServiceInterface
 	}
 )
 
-func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl {
-	return &allocatorImpl{
+func newAllocator(s MasterServiceInterface) *allocator {
+	return &allocator{
 		masterService: s,
 	}
 }
 
-func (alloc *allocatorImpl) allocID() (UniqueID, error) {
+func (alloc *allocator) allocID() (UniqueID, error) {
 	ctx := context.TODO()
 	resp, err := alloc.masterService.AllocID(ctx, &masterpb.IDRequest{
 		Base: &commonpb.MsgBase{
diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go
index a83370be9..1e34623d5 100644
--- a/internal/datanode/collection_replica.go
+++ b/internal/datanode/collection_replica.go
@@ -29,32 +29,30 @@ type Replica interface {
 	getSegmentByID(segmentID UniqueID) (*Segment, error)
 }
 
-type (
-	Segment struct {
-		segmentID     UniqueID
-		collectionID  UniqueID
-		partitionID   UniqueID
-		numRows       int64
-		memorySize    int64
-		isNew         bool
-		createTime    Timestamp // not using
-		endTime       Timestamp // not using
-		startPosition *internalpb2.MsgPosition
-		endPosition   *internalpb2.MsgPosition // not using
-	}
+type Segment struct {
+	segmentID     UniqueID
+	collectionID  UniqueID
+	partitionID   UniqueID
+	numRows       int64
+	memorySize    int64
+	isNew         bool
+	createTime    Timestamp // not using
+	endTime       Timestamp // not using
+	startPosition *internalpb2.MsgPosition
+	endPosition   *internalpb2.MsgPosition // not using
+}
 
-	ReplicaImpl struct {
-		mu          sync.RWMutex
-		segments    []*Segment
-		collections map[UniqueID]*Collection
-	}
-)
+type CollectionSegmentReplica struct {
+	mu          sync.RWMutex
+	segments    []*Segment
+	collections map[UniqueID]*Collection
+}
 
 func newReplica() Replica {
 	segments := make([]*Segment, 0)
 	collections := make(map[UniqueID]*Collection)
 
-	var replica Replica = &ReplicaImpl{
+	var replica Replica = &CollectionSegmentReplica{
 		segments:    segments,
 		collections: collections,
 	}
@@ -62,7 +60,7 @@ func newReplica() Replica {
 }
 
 // --- segment ---
-func (replica *ReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
+func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
 	replica.mu.RLock()
 	defer replica.mu.RUnlock()
 
@@ -74,7 +72,7 @@ func (replica *ReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error)
 	return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID)
 }
 
-func (replica *ReplicaImpl) addSegment(
+func (replica *CollectionSegmentReplica) addSegment(
 	segmentID UniqueID,
 	collID UniqueID,
 	partitionID UniqueID,
@@ -101,7 +99,7 @@ func (replica *ReplicaImpl) addSegment(
 	return nil
 }
 
-func (replica *ReplicaImpl) removeSegment(segmentID UniqueID) error {
+func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error {
 	replica.mu.Lock()
 	defer replica.mu.Unlock()
 
@@ -117,7 +115,7 @@ func (replica *ReplicaImpl) removeSegment(segmentID UniqueID) error {
 	return fmt.Errorf("Error, there's no segment %v", segmentID)
 }
 
-func (replica *ReplicaImpl) hasSegment(segmentID UniqueID) bool {
+func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
 	replica.mu.RLock()
 	defer replica.mu.RUnlock()
 
@@ -129,7 +127,7 @@ func (replica *ReplicaImpl) hasSegment(segmentID UniqueID) bool {
 	return false
 }
 
-func (replica *ReplicaImpl) updateStatistics(segmentID UniqueID, numRows int64) error {
+func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
 	replica.mu.Lock()
 	defer replica.mu.Unlock()
 
@@ -144,7 +142,7 @@ func (replica *ReplicaImpl) updateStatistics(segmentID UniqueID, numRows int64)
 	return fmt.Errorf("Error, there's no segment %v", segmentID)
 }
 
-func (replica *ReplicaImpl) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb2.SegmentStatisticsUpdates, error) {
+func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb2.SegmentStatisticsUpdates, error) {
 	replica.mu.Lock()
 	defer replica.mu.Unlock()
 
@@ -169,14 +167,14 @@ func (replica *ReplicaImpl) getSegmentStatisticsUpdates(segmentID UniqueID) (*in
 }
 
 // --- collection ---
-func (replica *ReplicaImpl) getCollectionNum() int {
+func (replica *CollectionSegmentReplica) getCollectionNum() int {
 	replica.mu.RLock()
 	defer replica.mu.RUnlock()
 
 	return len(replica.collections)
 }
 
-func (replica *ReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
+func (replica *CollectionSegmentReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
 	replica.mu.Lock()
 	defer replica.mu.Unlock()
 
@@ -195,7 +193,7 @@ func (replica *ReplicaImpl) addCollection(collectionID UniqueID, schema *schemap
 	return nil
 }
 
-func (replica *ReplicaImpl) removeCollection(collectionID UniqueID) error {
+func (replica *CollectionSegmentReplica) removeCollection(collectionID UniqueID) error {
 	replica.mu.Lock()
 	defer replica.mu.Unlock()
 
@@ -204,7 +202,7 @@ func (replica *ReplicaImpl) removeCollection(collectionID UniqueID) error {
 	return nil
 }
 
-func (replica *ReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
+func (replica *CollectionSegmentReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
 	replica.mu.RLock()
 	defer replica.mu.RUnlock()
 
@@ -216,7 +214,7 @@ func (replica *ReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collectio
 	return coll, nil
 }
 
-func (replica *ReplicaImpl) hasCollection(collectionID UniqueID) bool {
+func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bool {
 	replica.mu.RLock()
 	defer replica.mu.RUnlock()
 
diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go
index 1c14c7b54..0f52d3282 100644
--- a/internal/datanode/collection_replica_test.go
+++ b/internal/datanode/collection_replica_test.go
@@ -16,10 +16,10 @@ func TestReplica_Collection(t *testing.T) {
 		replica := newReplica()
 		assert.Zero(t, replica.getCollectionNum())
 
-		replica = new(ReplicaImpl)
+		replica = new(CollectionSegmentReplica)
 		assert.Zero(t, replica.getCollectionNum())
 
-		replica = &ReplicaImpl{
+		replica = &CollectionSegmentReplica{
 			collections: map[UniqueID]*Collection{
 				0: {id: 0},
 				1: {id: 1},
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index b61e0c98e..99336c912 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -157,7 +157,7 @@ func (node *DataNode) Init() error {
 
 	replica := newReplica()
 
-	var alloc allocator = newAllocatorImpl(node.masterService)
+	var alloc allocatorInterface = newAllocator(node.masterService)
 
 	chanSize := 100
 	node.flushChan = make(chan *flushMsg, chanSize)
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 71276889a..b35383cca 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -19,12 +19,12 @@ type dataSyncService struct {
 	fg          *flowgraph.TimeTickedFlowGraph
 	flushChan   chan *flushMsg
 	replica     Replica
-	idAllocator allocator
+	idAllocator allocatorInterface
 	msFactory   msgstream.Factory
 }
 
 func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
-	replica Replica, alloc allocator, factory msgstream.Factory) *dataSyncService {
+	replica Replica, alloc allocatorInterface, factory msgstream.Factory) *dataSyncService {
 	service := &dataSyncService{
 		ctx:         ctx,
 		fg:          nil,
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index 1295004f5..48b30481d 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -26,7 +26,7 @@ type ddNode struct {
 	ddBuffer  *ddBuffer
 	inFlushCh chan *flushMsg
 
-	idAllocator allocator
+	idAllocator allocatorInterface
 	kv          kv.Base
 	replica     Replica
 	flushMeta   *metaTable
@@ -369,7 +369,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
 }
 
 func newDDNode(ctx context.Context, flushMeta *metaTable,
-	inFlushCh chan *flushMsg, replica Replica, alloc allocator) *ddNode {
+	inFlushCh chan *flushMsg, replica Replica, alloc allocatorInterface) *ddNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 9d1e66af3..686151ba3 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -41,7 +41,7 @@ type (
 		minIOKV     kv.Base
 		minioPrefix string
 
-		idAllocator allocator
+		idAllocator allocatorInterface
 
 		timeTickStream          msgstream.MsgStream
 		segmentStatisticsStream msgstream.MsgStream
@@ -622,7 +622,7 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (
 }
 
 func newInsertBufferNode(ctx context.Context, flushMeta *metaTable,
-	replica Replica, alloc allocator, factory msgstream.Factory) *insertBufferNode {
+	replica Replica, alloc allocatorInterface, factory msgstream.Factory) *insertBufferNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
diff --git a/internal/dataservice/allocator.go b/internal/dataservice/allocator.go
index f095fdaba..0a7cf1a7c 100644
--- a/internal/dataservice/allocator.go
+++ b/internal/dataservice/allocator.go
@@ -7,22 +7,22 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
 )
 
-type allocator interface {
+type allocatorInterface interface {
 	allocTimestamp() (Timestamp, error)
 	allocID() (UniqueID, error)
 }
 
-type allocatorImpl struct {
+type allocator struct {
 	masterClient MasterClient
 }
 
-func newAllocatorImpl(masterClient MasterClient) *allocatorImpl {
-	return &allocatorImpl{
+func newAllocator(masterClient MasterClient) *allocator {
+	return &allocator{
 		masterClient: masterClient,
 	}
 }
 
-func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) {
+func (allocator *allocator) allocTimestamp() (Timestamp, error) {
 	ctx := context.TODO()
 	resp, err := allocator.masterClient.AllocTimestamp(ctx, &masterpb.TsoRequest{
 		Base: &commonpb.MsgBase{
@@ -39,7 +39,7 @@ func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) {
 	return resp.Timestamp, nil
 }
 
-func (allocator *allocatorImpl) allocID() (UniqueID, error) {
+func (allocator *allocator) allocID() (UniqueID, error) {
 	ctx := context.TODO()
 	resp, err := allocator.masterClient.AllocID(ctx, &masterpb.IDRequest{
 		Base: &commonpb.MsgBase{
diff --git a/internal/dataservice/dd_handler.go b/internal/dataservice/dd_handler.go
index b15602e3e..9425224d8 100644
--- a/internal/dataservice/dd_handler.go
+++ b/internal/dataservice/dd_handler.go
@@ -10,10 +10,10 @@ import (
 
 type ddHandler struct {
 	meta             *meta
-	segmentAllocator segmentAllocator
+	segmentAllocator segmentAllocatorInterface
 }
 
-func newDDHandler(meta *meta, allocator segmentAllocator) *ddHandler {
+func newDDHandler(meta *meta, allocator segmentAllocatorInterface) *ddHandler {
 	return &ddHandler{
 		meta:             meta,
 		segmentAllocator: allocator,
diff --git a/internal/dataservice/mock.go b/internal/dataservice/mock.go
index 34a604064..a701083ca 100644
--- a/internal/dataservice/mock.go
+++ b/internal/dataservice/mock.go
@@ -15,7 +15,7 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
 )
 
-func newMemoryMeta(allocator allocator) (*meta, error) {
+func newMemoryMeta(allocator allocatorInterface) (*meta, error) {
 	memoryKV := memkv.NewMemoryKV()
 	return newMeta(memoryKV)
 }
diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go
index a85dd4111..7edcc3eaa 100644
--- a/internal/dataservice/segment_allocator.go
+++ b/internal/dataservice/segment_allocator.go
@@ -26,7 +26,7 @@ func (err errRemainInSufficient) Error() string {
 }
 
 // segmentAllocator is used to allocate rows for segments and record the allocations.
-type segmentAllocator interface {
+type segmentAllocatorInterface interface {
 	// OpenSegment add the segment to allocator and set it allocatable
 	OpenSegment(segmentInfo *datapb.SegmentInfo) error
 	// AllocSegment allocate rows and record the allocation.
@@ -45,34 +45,32 @@ type segmentAllocator interface {
 	IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error)
 }
 
-type (
-	segmentStatus struct {
-		id             UniqueID
-		collectionID   UniqueID
-		partitionID    UniqueID
-		total          int
-		sealed         bool
-		lastExpireTime Timestamp
-		allocations    []*allocation
-		insertChannel  string
-	}
-	allocation struct {
-		rowNums    int
-		expireTime Timestamp
-	}
-	segmentAllocatorImpl struct {
-		mt                     *meta
-		segments               map[UniqueID]*segmentStatus //segment id -> status
-		segmentExpireDuration  int64
-		segmentThreshold       float64
-		segmentThresholdFactor float64
-		mu                     sync.RWMutex
-		allocator              allocator
-	}
-)
+type segmentStatus struct {
+	id             UniqueID
+	collectionID   UniqueID
+	partitionID    UniqueID
+	total          int
+	sealed         bool
+	lastExpireTime Timestamp
+	allocations    []*allocation
+	insertChannel  string
+}
+type allocation struct {
+	rowNums    int
+	expireTime Timestamp
+}
+type segmentAllocator struct {
+	mt                     *meta
+	segments               map[UniqueID]*segmentStatus //segment id -> status
+	segmentExpireDuration  int64
+	segmentThreshold       float64
+	segmentThresholdFactor float64
+	mu                     sync.RWMutex
+	allocator              allocatorInterface
+}
 
-func newSegmentAllocator(meta *meta, allocator allocator) *segmentAllocatorImpl {
-	segmentAllocator := &segmentAllocatorImpl{
+func newSegmentAllocator(meta *meta, allocator allocatorInterface) *segmentAllocator {
+	segmentAllocator := &segmentAllocator{
 		mt:                     meta,
 		segments:               make(map[UniqueID]*segmentStatus),
 		segmentExpireDuration:  Params.SegIDAssignExpiration,
@@ -83,7 +81,7 @@ func newSegmentAllocator(meta *meta, allocator allocator) *segmentAllocatorImpl
 	return segmentAllocator
 }
 
-func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
+func (allocator *segmentAllocator) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	if _, ok := allocator.segments[segmentInfo.SegmentID]; ok {
@@ -105,7 +103,7 @@ func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentIn
 	return nil
 }
 
-func (allocator *segmentAllocatorImpl) AllocSegment(collectionID UniqueID,
+func (allocator *segmentAllocator) AllocSegment(collectionID UniqueID,
 	partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) {
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
@@ -133,7 +131,7 @@ func (allocator *segmentAllocatorImpl) AllocSegment(collectionID UniqueID,
 	return
 }
 
-func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows int) (bool, error) {
+func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) {
 	totalOfAllocations := 0
 	for _, allocation := range segStatus.allocations {
 		totalOfAllocations += allocation.rowNums
@@ -163,7 +161,7 @@ func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows i
 	return true, nil
 }
 
-func (allocator *segmentAllocatorImpl) estimateTotalRows(collectionID UniqueID) (int, error) {
+func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
 	collMeta, err := allocator.mt.GetCollection(collectionID)
 	if err != nil {
 		return -1, err
@@ -175,7 +173,7 @@ func (allocator *segmentAllocatorImpl) estimateTotalRows(collectionID UniqueID)
 	return int(allocator.segmentThreshold / float64(sizePerRecord)), nil
 }
 
-func (allocator *segmentAllocatorImpl) GetSealedSegments() ([]UniqueID, error) {
+func (allocator *segmentAllocator) GetSealedSegments() ([]UniqueID, error) {
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	keys := make([]UniqueID, 0)
@@ -194,7 +192,7 @@ func (allocator *segmentAllocatorImpl) GetSealedSegments() ([]UniqueID, error) {
 	return keys, nil
 }
 
-func (allocator *segmentAllocatorImpl) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
+func (allocator *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
 	segMeta, err := allocator.mt.GetSegment(segStatus.id)
 	if err != nil {
 		return false, err
@@ -202,7 +200,7 @@ func (allocator *segmentAllocatorImpl) checkSegmentSealed(segStatus *segmentStat
 	return float64(segMeta.NumRows) >= allocator.segmentThresholdFactor*float64(segStatus.total), nil
 }
 
-func (allocator *segmentAllocatorImpl) SealSegment(segmentID UniqueID) error {
+func (allocator *segmentAllocator) SealSegment(segmentID UniqueID) error {
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	status, ok := allocator.segments[segmentID]
@@ -213,13 +211,13 @@ func (allocator *segmentAllocatorImpl) SealSegment(segmentID UniqueID) error {
 	return nil
 }
 
-func (allocator *segmentAllocatorImpl) DropSegment(segmentID UniqueID) {
+func (allocator *segmentAllocator) DropSegment(segmentID UniqueID) {
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	delete(allocator.segments, segmentID)
 }
 
-func (allocator *segmentAllocatorImpl) ExpireAllocations(timeTick Timestamp) error {
+func (allocator *segmentAllocator) ExpireAllocations(timeTick Timestamp) error {
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	for _, segStatus := range allocator.segments {
@@ -234,7 +232,7 @@ func (allocator *segmentAllocatorImpl) ExpireAllocations(timeTick Timestamp) err
 	return nil
 }
 
-func (allocator *segmentAllocatorImpl) IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) {
+func (allocator *segmentAllocator) IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) {
 	allocator.mu.RLock()
 	defer allocator.mu.RUnlock()
 	status, ok := allocator.segments[segmentID]
@@ -244,7 +242,7 @@ func (allocator *segmentAllocatorImpl) IsAllocationsExpired(segmentID UniqueID,
 	return status.lastExpireTime <= ts, nil
 }
 
-func (allocator *segmentAllocatorImpl) SealAllSegments(collectionID UniqueID) {
+func (allocator *segmentAllocator) SealAllSegments(collectionID UniqueID) {
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	for _, status := range allocator.segments {
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index 810a32f5c..c4d389ef0 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -32,8 +32,8 @@ import (
 const role = "dataservice"
 
 type DataService interface {
-	typeutil.Service
 	typeutil.Component
+
 	RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
 	Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error)
 
@@ -77,10 +77,10 @@ type (
 		state             atomic.Value
 		client            *etcdkv.EtcdKV
 		meta              *meta
-		segAllocator      segmentAllocator
+		segAllocator      segmentAllocatorInterface
 		statsHandler      *statsHandler
 		ddHandler         *ddHandler
-		allocator         allocator
+		allocator         allocatorInterface
 		cluster           *dataNodeCluster
 		msgProducer       *timesync.MsgProducer
 		registerFinishCh  chan struct{}
@@ -136,7 +136,7 @@ func (s *Server) Start() error {
 		return err
 	}
 
-	s.allocator = newAllocatorImpl(s.masterClient)
+	s.allocator = newAllocator(s.masterClient)
 	if err = s.initMeta(); err != nil {
 		return err
 	}
diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go
index 9a3a33450..00016d013 100644
--- a/internal/dataservice/watcher.go
+++ b/internal/dataservice/watcher.go
@@ -13,18 +13,18 @@ import (
 
 type (
 	proxyTimeTickWatcher struct {
-		allocator segmentAllocator
+		allocator segmentAllocatorInterface
 		msgQueue  chan *msgstream.TimeTickMsg
 	}
 	dataNodeTimeTickWatcher struct {
 		meta      *meta
 		cluster   *dataNodeCluster
-		allocator segmentAllocator
+		allocator segmentAllocatorInterface
 		msgQueue  chan *msgstream.TimeTickMsg
 	}
 )
 
-func newProxyTimeTickWatcher(allocator segmentAllocator) *proxyTimeTickWatcher {
+func newProxyTimeTickWatcher(allocator segmentAllocatorInterface) *proxyTimeTickWatcher {
 	return &proxyTimeTickWatcher{
 		allocator: allocator,
 		msgQueue:  make(chan *msgstream.TimeTickMsg, 1),
@@ -49,7 +49,7 @@ func (watcher *proxyTimeTickWatcher) Watch(msg *msgstream.TimeTickMsg) {
 	watcher.msgQueue <- msg
 }
 
-func newDataNodeTimeTickWatcher(meta *meta, allocator segmentAllocator, cluster *dataNodeCluster) *dataNodeTimeTickWatcher {
+func newDataNodeTimeTickWatcher(meta *meta, allocator segmentAllocatorInterface, cluster *dataNodeCluster) *dataNodeTimeTickWatcher {
 	return &dataNodeTimeTickWatcher{
 		meta:      meta,
 		allocator: allocator,
diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go
index 8ca9e909f..a1af89377 100644
--- a/internal/distributed/indexnode/service.go
+++ b/internal/distributed/indexnode/service.go
@@ -20,7 +20,7 @@ import (
 )
 
 type Server struct {
-	impl *indexnode.NodeImpl
+	impl *indexnode.IndexNode
 
 	grpcServer  *grpc.Server
 	grpcErrChan chan error
@@ -164,7 +164,7 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty
 
 func NewServer(ctx context.Context) (*Server, error) {
 	ctx1, cancel := context.WithCancel(ctx)
-	node, err := indexnode.NewNodeImpl(ctx1)
+	node, err := indexnode.NewIndexNode(ctx1)
 	if err != nil {
 		defer cancel()
 		return nil, err
diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go
index 8432dcafb..9233632d7 100644
--- a/internal/distributed/indexservice/service.go
+++ b/internal/distributed/indexservice/service.go
@@ -26,7 +26,7 @@ type UniqueID = typeutil.UniqueID
 type Timestamp = typeutil.Timestamp
 
 type Server struct {
-	impl *indexservice.ServiceImpl
+	impl *indexservice.IndexService
 
 	grpcServer  *grpc.Server
 	grpcErrChan chan error
@@ -161,7 +161,7 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty
 func NewServer(ctx context.Context) (*Server, error) {
 
 	ctx1, cancel := context.WithCancel(ctx)
-	serverImp, err := indexservice.NewServiceImpl(ctx)
+	serverImp, err := indexservice.NewIndexService(ctx)
 	if err != nil {
 		defer cancel()
 		return nil, err
diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go
index ca1718e8b..aa5d5f91c 100644
--- a/internal/distributed/proxynode/service.go
+++ b/internal/distributed/proxynode/service.go
@@ -37,7 +37,7 @@ const (
 type Server struct {
 	ctx        context.Context
 	wg         sync.WaitGroup
-	impl       *proxynode.NodeImpl
+	impl       *proxynode.ProxyNode
 	grpcServer *grpc.Server
 
 	grpcErrChan chan error
@@ -60,7 +60,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
 		grpcErrChan: make(chan error),
 	}
 
-	server.impl, err = proxynode.NewProxyNodeImpl(server.ctx, factory)
+	server.impl, err = proxynode.NewProxyNode(server.ctx, factory)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go
index fc20da405..201f40070 100644
--- a/internal/distributed/proxyservice/service.go
+++ b/internal/distributed/proxyservice/service.go
@@ -30,7 +30,7 @@ type Server struct {
 	grpcServer  *grpc.Server
 	grpcErrChan chan error
 
-	impl *proxyservice.ServiceImpl
+	impl *proxyservice.ProxyService
 
 	tracer opentracing.Tracer
 	closer io.Closer
@@ -60,7 +60,7 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error)
 	}
 	opentracing.SetGlobalTracer(server.tracer)
 
-	server.impl, err = proxyservice.NewServiceImpl(server.ctx, factory)
+	server.impl, err = proxyservice.NewProxyService(server.ctx, factory)
 	if err != nil {
 		return nil, err
 	}
@@ -131,7 +131,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
 }
 
 func (s *Server) start() error {
-	log.Println("proxy ServiceImpl start ...")
+	log.Println("proxy ProxyService start ...")
 	if err := s.impl.Start(); err != nil {
 		return err
 	}
diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go
index 44e2aad02..d3f06038a 100644
--- a/internal/indexnode/indexnode.go
+++ b/internal/indexnode/indexnode.go
@@ -29,7 +29,7 @@ const (
 type UniqueID = typeutil.UniqueID
 type Timestamp = typeutil.Timestamp
 
-type NodeImpl struct {
+type IndexNode struct {
 	stateCode internalpb2.StateCode
 
 	loopCtx    context.Context
@@ -48,9 +48,9 @@ type NodeImpl struct {
 	closer io.Closer
 }
 
-func NewNodeImpl(ctx context.Context) (*NodeImpl, error) {
+func NewIndexNode(ctx context.Context) (*IndexNode, error) {
 	ctx1, cancel := context.WithCancel(ctx)
-	b := &NodeImpl{
+	b := &IndexNode{
 		loopCtx:    ctx1,
 		loopCancel: cancel,
 	}
@@ -62,7 +62,7 @@ func NewNodeImpl(ctx context.Context) (*NodeImpl, error) {
 	return b, nil
 }
 
-func (i *NodeImpl) Init() error {
+func (i *IndexNode) Init() error {
 	ctx := context.Background()
 	err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 10, time.Second)
 
@@ -125,7 +125,7 @@ func (i *NodeImpl) Init() error {
 	return nil
 }
 
-func (i *NodeImpl) Start() error {
+func (i *IndexNode) Start() error {
 	i.sched.Start()
 
 	// Start callbacks
@@ -136,7 +136,7 @@ func (i *NodeImpl) Start() error {
 }
 
 // Close closes the server.
-func (i *NodeImpl) Stop() error {
+func (i *IndexNode) Stop() error {
 	if err := i.closer.Close(); err != nil {
 		return err
 	}
@@ -151,15 +151,15 @@ func (i *NodeImpl) Stop() error {
 	return nil
 }
 
-func (i *NodeImpl) UpdateStateCode(code internalpb2.StateCode) {
+func (i *IndexNode) UpdateStateCode(code internalpb2.StateCode) {
 	i.stateCode = code
 }
 
-func (i *NodeImpl) SetIndexServiceClient(serviceClient typeutil.IndexServiceInterface) {
+func (i *IndexNode) SetIndexServiceClient(serviceClient typeutil.IndexServiceInterface) {
 	i.serviceClient = serviceClient
 }
 
-func (i *NodeImpl) BuildIndex(ctx context.Context, request *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
+func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
 	t := &IndexBuildTask{
 		BaseTask: BaseTask{
 			ctx:  ctx,
@@ -185,7 +185,7 @@ func (i *NodeImpl) BuildIndex(ctx context.Context, request *indexpb.BuildIndexCm
 	return ret, nil
 }
 
-func (i *NodeImpl) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
+func (i *IndexNode) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
 	i.sched.IndexBuildQueue.tryToRemoveUselessIndexBuildTask(request.IndexID)
 	return &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,
@@ -194,16 +194,16 @@ func (i *NodeImpl) DropIndex(ctx context.Context, request *indexpb.DropIndexRequ
 }
 
 // AddStartCallback adds a callback in the startServer phase.
-func (i *NodeImpl) AddStartCallback(callbacks ...func()) {
+func (i *IndexNode) AddStartCallback(callbacks ...func()) {
 	i.startCallbacks = append(i.startCallbacks, callbacks...)
 }
 
 // AddCloseCallback adds a callback in the Close phase.
-func (i *NodeImpl) AddCloseCallback(callbacks ...func()) {
+func (i *IndexNode) AddCloseCallback(callbacks ...func()) {
 	i.closeCallbacks = append(i.closeCallbacks, callbacks...)
 }
 
-func (i *NodeImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
+func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
 
 	stateInfo := &internalpb2.ComponentInfo{
 		NodeID:    Params.NodeID,
@@ -221,7 +221,7 @@ func (i *NodeImpl) GetComponentStates(ctx context.Context) (*internalpb2.Compone
 	return ret, nil
 }
 
-func (i *NodeImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
+func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
 	return &milvuspb.StringResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_SUCCESS,
@@ -229,7 +229,7 @@ func (i *NodeImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResp
 	}, nil
 }
 
-func (i *NodeImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
+func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
 	return &milvuspb.StringResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_SUCCESS,
diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go
index f61cf4b34..e6cd088f4 100644
--- a/internal/indexservice/indexservice.go
+++ b/internal/indexservice/indexservice.go
@@ -30,7 +30,7 @@ const (
 	reqTimeoutInterval = time.Second * 10
 )
 
-type ServiceImpl struct {
+type IndexService struct {
 	nodeClients *PriorityQueue
 	nodeStates  map[UniqueID]*internalpb2.ComponentStates
 	stateCode   internalpb2.StateCode
@@ -59,9 +59,9 @@ type ServiceImpl struct {
 type UniqueID = typeutil.UniqueID
 type Timestamp = typeutil.Timestamp
 
-func NewServiceImpl(ctx context.Context) (*ServiceImpl, error) {
+func NewIndexService(ctx context.Context) (*IndexService, error) {
 	ctx1, cancel := context.WithCancel(ctx)
-	i := &ServiceImpl{
+	i := &IndexService{
 		loopCtx:     ctx1,
 		loopCancel:  cancel,
 		nodeClients: &PriorityQueue{},
@@ -70,7 +70,7 @@ func NewServiceImpl(ctx context.Context) (*ServiceImpl, error) {
 	return i, nil
 }
 
-func (i *ServiceImpl) Init() error {
+func (i *IndexService) Init() error {
 	etcdAddress := Params.EtcdAddress
 	log.Println("etcd address = ", etcdAddress)
 	connectEtcdFn := func() error {
@@ -125,7 +125,7 @@ func (i *ServiceImpl) Init() error {
 	return nil
 }
 
-func (i *ServiceImpl) Start() error {
+func (i *IndexService) Start() error {
 	i.loopWg.Add(1)
 	go i.tsLoop()
 
@@ -134,12 +134,12 @@ func (i *ServiceImpl) Start() error {
 	for _, cb := range i.startCallbacks {
 		cb()
 	}
-	log.Print("ServiceImpl  start")
+	log.Print("IndexService  start")
 
 	return nil
 }
 
-func (i *ServiceImpl) Stop() error {
+func (i *IndexService) Stop() error {
 	i.loopCancel()
 	i.sched.Close()
 	for _, cb := range i.closeCallbacks {
@@ -148,15 +148,15 @@ func (i *ServiceImpl) Stop() error {
 	return nil
 }
 
-func (i *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) {
+func (i *IndexService) UpdateStateCode(code internalpb2.StateCode) {
 	i.stateCode = code
 }
 
-func (i *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
+func (i *IndexService) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
 
 	stateInfo := &internalpb2.ComponentInfo{
 		NodeID:    i.ID,
-		Role:      "ServiceImpl",
+		Role:      "IndexService",
 		StateCode: i.stateCode,
 	}
 
@@ -170,7 +170,7 @@ func (i *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.Comp
 	return ret, nil
 }
 
-func (i *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
+func (i *IndexService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
 	return &milvuspb.StringResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_SUCCESS,
@@ -180,7 +180,7 @@ func (i *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR
 	}, nil
 }
 
-func (i *ServiceImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
+func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
 	return &milvuspb.StringResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_SUCCESS,
@@ -190,7 +190,7 @@ func (i *ServiceImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
 	}, nil
 }
 
-func (i *ServiceImpl) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
+func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
 	fmt.Println("builder building index ..., indexName = ", req.IndexName, "indexID = ", req.IndexID, "dataPath = ", req.DataPaths)
 	ret := &indexpb.BuildIndexResponse{
 		Status: &commonpb.Status{
@@ -245,7 +245,7 @@ func (i *ServiceImpl) BuildIndex(ctx context.Context, req *indexpb.BuildIndexReq
 	return ret, nil
 }
 
-func (i *ServiceImpl) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) {
+func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) {
 	var indexStates []*indexpb.IndexInfo
 	for _, indexID := range req.IndexBuildIDs {
 		indexState, err := i.metaTable.GetIndexState(indexID)
@@ -263,7 +263,7 @@ func (i *ServiceImpl) GetIndexStates(ctx context.Context, req *indexpb.IndexStat
 	return ret, nil
 }
 
-func (i *ServiceImpl) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
+func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
 	i.sched.IndexAddQueue.tryToRemoveUselessIndexAddTask(req.IndexID)
 
 	err := i.metaTable.MarkIndexAsDeleted(req.IndexID)
@@ -292,7 +292,7 @@ func (i *ServiceImpl) DropIndex(ctx context.Context, req *indexpb.DropIndexReque
 	}, nil
 }
 
-func (i *ServiceImpl) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
+func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
 	var indexPaths []*indexpb.IndexFilePathInfo = nil
 
 	for _, indexID := range req.IndexBuildIDs {
@@ -312,7 +312,7 @@ func (i *ServiceImpl) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexF
 	return ret, nil
 }
 
-func (i *ServiceImpl) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNotification) (*commonpb.Status, error) {
+func (i *IndexService) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNotification) (*commonpb.Status, error) {
 	ret := &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_SUCCESS,
 	}
@@ -327,7 +327,7 @@ func (i *ServiceImpl) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIn
 	return ret, nil
 }
 
-func (i *ServiceImpl) tsLoop() {
+func (i *IndexService) tsLoop() {
 	tsoTicker := time.NewTicker(tso.UpdateTimestampStep)
 	defer tsoTicker.Stop()
 	ctx, cancel := context.WithCancel(i.loopCtx)
diff --git a/internal/indexservice/node_mgr.go b/internal/indexservice/node_mgr.go
index ea6dd78b4..bc036c0b4 100644
--- a/internal/indexservice/node_mgr.go
+++ b/internal/indexservice/node_mgr.go
@@ -12,13 +12,13 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 )
 
-func (i *ServiceImpl) removeNode(nodeID UniqueID) {
+func (i *IndexService) removeNode(nodeID UniqueID) {
 	i.nodeLock.Lock()
 	defer i.nodeLock.Unlock()
 	i.nodeClients.Remove(nodeID)
 }
 
-func (i *ServiceImpl) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) error {
+func (i *IndexService) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) error {
 	i.nodeLock.Lock()
 	defer i.nodeLock.Unlock()
 
@@ -46,7 +46,7 @@ func (i *ServiceImpl) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest)
 	return nil
 }
 
-func (i *ServiceImpl) prepareNodeInitParams() []*commonpb.KeyValuePair {
+func (i *IndexService) prepareNodeInitParams() []*commonpb.KeyValuePair {
 	var params []*commonpb.KeyValuePair
 	params = append(params, &commonpb.KeyValuePair{Key: "minio.address", Value: Params.MinIOAddress})
 	params = append(params, &commonpb.KeyValuePair{Key: "minio.accessKeyID", Value: Params.MinIOAccessKeyID})
@@ -56,7 +56,7 @@ func (i *ServiceImpl) prepareNodeInitParams() []*commonpb.KeyValuePair {
 	return params
 }
 
-func (i *ServiceImpl) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
+func (i *IndexService) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
 	ret := &indexpb.RegisterNodeResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -65,7 +65,7 @@ func (i *ServiceImpl) RegisterNode(ctx context.Context, req *indexpb.RegisterNod
 
 	nodeID, err := i.idAllocator.AllocOne()
 	if err != nil {
-		ret.Status.Reason = "ServiceImpl:RegisterNode Failed to acquire NodeID"
+		ret.Status.Reason = "IndexService:RegisterNode Failed to acquire NodeID"
 		return ret, nil
 	}
 
diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go
index 2ef51384e..d8fe51327 100644
--- a/internal/masterservice/master_service.go
+++ b/internal/masterservice/master_service.go
@@ -60,10 +60,7 @@ type QueryServiceInterface interface {
 
 type Interface interface {
 	//service
-	Init() error
-	Start() error
-	Stop() error
-	GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
+	typeutil.Component
 
 	//DDL request
 	CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go
index a45b7fbb1..bcd59c9a5 100644
--- a/internal/masterservice/meta_table_test.go
+++ b/internal/masterservice/meta_table_test.go
@@ -164,7 +164,8 @@ func TestMetaTable(t *testing.T) {
 			IndexParams: params,
 		}
 
-		_, field, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo)
+		ids, _, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo)
+		assert.Nil(t, ids)
 		assert.NotNil(t, err)
 		seg, field, err := mt.GetNotIndexedSegments("testColl", "field110", idxInfo)
 		assert.Nil(t, err)
diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go
index 1c88955e1..faaca797e 100644
--- a/internal/proxynode/impl.go
+++ b/internal/proxynode/impl.go
@@ -21,11 +21,11 @@ const (
 	reqTimeoutInterval = time.Second * 10
 )
 
-func (node *NodeImpl) UpdateStateCode(code internalpb2.StateCode) {
+func (node *ProxyNode) UpdateStateCode(code internalpb2.StateCode) {
 	node.stateCode.Store(code)
 }
 
-func (node *NodeImpl) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
 	collectionName := request.CollectionName
 	globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
 	return &commonpb.Status{
@@ -34,7 +34,7 @@ func (node *NodeImpl) InvalidateCollectionMetaCache(ctx context.Context, request
 	}, nil
 }
 
-func (node *NodeImpl) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
 	log.Println("create collection: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -66,7 +66,7 @@ func (node *NodeImpl) CreateCollection(ctx context.Context, request *milvuspb.Cr
 	return cct.result, nil
 }
 
-func (node *NodeImpl) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
 	log.Println("drop collection: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -97,7 +97,7 @@ func (node *NodeImpl) DropCollection(ctx context.Context, request *milvuspb.Drop
 	return dct.result, nil
 }
 
-func (node *NodeImpl) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
+func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
 	log.Println("has collection: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -132,7 +132,7 @@ func (node *NodeImpl) HasCollection(ctx context.Context, request *milvuspb.HasCo
 	return hct.result, nil
 }
 
-func (node *NodeImpl) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
 	log.Println("load collection: ", request)
 	//ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	//defer cancel()
@@ -163,7 +163,7 @@ func (node *NodeImpl) LoadCollection(ctx context.Context, request *milvuspb.Load
 	return lct.result, nil
 }
 
-func (node *NodeImpl) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
 	log.Println("release collection: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -194,7 +194,7 @@ func (node *NodeImpl) ReleaseCollection(ctx context.Context, request *milvuspb.R
 	return rct.result, nil
 }
 
-func (node *NodeImpl) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
+func (node *ProxyNode) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
 	log.Println("describe collection: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -229,7 +229,7 @@ func (node *NodeImpl) DescribeCollection(ctx context.Context, request *milvuspb.
 	return dct.result, nil
 }
 
-func (node *NodeImpl) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) {
+func (node *ProxyNode) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) {
 	log.Println("get collection statistics")
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -263,7 +263,7 @@ func (node *NodeImpl) GetCollectionStatistics(ctx context.Context, request *milv
 	return g.result, nil
 }
 
-func (node *NodeImpl) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
+func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
 	log.Println("show collections")
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -297,7 +297,7 @@ func (node *NodeImpl) ShowCollections(ctx context.Context, request *milvuspb.Sho
 	return sct.result, nil
 }
 
-func (node *NodeImpl) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
 	log.Println("create partition", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -326,7 +326,7 @@ func (node *NodeImpl) CreatePartition(ctx context.Context, request *milvuspb.Cre
 	return cpt.result, nil
 }
 
-func (node *NodeImpl) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
 	log.Println("drop partition: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -356,7 +356,7 @@ func (node *NodeImpl) DropPartition(ctx context.Context, request *milvuspb.DropP
 	return dpt.result, nil
 }
 
-func (node *NodeImpl) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
+func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
 	log.Println("has partition: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -392,7 +392,7 @@ func (node *NodeImpl) HasPartition(ctx context.Context, request *milvuspb.HasPar
 	return hpt.result, nil
 }
 
-func (node *NodeImpl) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) {
 	log.Println("load partitions: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -423,7 +423,7 @@ func (node *NodeImpl) LoadPartitions(ctx context.Context, request *milvuspb.Load
 	return lpt.result, nil
 }
 
-func (node *NodeImpl) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) {
 	log.Println("load partitions: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -454,11 +454,11 @@ func (node *NodeImpl) ReleasePartitions(ctx context.Context, request *milvuspb.R
 	return rpt.result, nil
 }
 
-func (node *NodeImpl) GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) {
+func (node *ProxyNode) GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) {
 	panic("implement me")
 }
 
-func (node *NodeImpl) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
+func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
 	log.Println("show partitions: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -493,7 +493,7 @@ func (node *NodeImpl) ShowPartitions(ctx context.Context, request *milvuspb.Show
 	return spt.result, nil
 }
 
-func (node *NodeImpl) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
 	log.Println("create index for: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -523,7 +523,7 @@ func (node *NodeImpl) CreateIndex(ctx context.Context, request *milvuspb.CreateI
 	return cit.result, nil
 }
 
-func (node *NodeImpl) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
+func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
 	log.Println("Describe index for: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -557,7 +557,7 @@ func (node *NodeImpl) DescribeIndex(ctx context.Context, request *milvuspb.Descr
 	return dit.result, nil
 }
 
-func (node *NodeImpl) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
 	log.Println("Drop index for: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -584,7 +584,7 @@ func (node *NodeImpl) DropIndex(ctx context.Context, request *milvuspb.DropIndex
 	return dit.result, nil
 }
 
-func (node *NodeImpl) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
+func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
 	// log.Println("Describe index progress for: ", request)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -619,7 +619,7 @@ func (node *NodeImpl) GetIndexState(ctx context.Context, request *milvuspb.Index
 	return dipt.result, nil
 }
 
-func (node *NodeImpl) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
+func (node *ProxyNode) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
 
@@ -671,7 +671,7 @@ func (node *NodeImpl) Insert(ctx context.Context, request *milvuspb.InsertReques
 	return it.result, nil
 }
 
-func (node *NodeImpl) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
+func (node *ProxyNode) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
 
@@ -713,7 +713,7 @@ func (node *NodeImpl) Search(ctx context.Context, request *milvuspb.SearchReques
 	return qt.result, nil
 }
 
-func (node *NodeImpl) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
+func (node *ProxyNode) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
 	log.Println("AA Flush collections: ", request.CollectionNames)
 	ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
@@ -743,11 +743,11 @@ func (node *NodeImpl) Flush(ctx context.Context, request *milvuspb.FlushRequest)
 	return ft.result, nil
 }
 
-func (node *NodeImpl) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error) {
+func (node *ProxyNode) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error) {
 	panic("implement me")
 }
 
-func (node *NodeImpl) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
+func (node *ProxyNode) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
 	resp := &milvuspb.PersistentSegmentInfoResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -794,7 +794,7 @@ func (node *NodeImpl) GetPersistentSegmentInfo(ctx context.Context, req *milvusp
 	return resp, nil
 }
 
-func (node *NodeImpl) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error) {
+func (node *ProxyNode) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error) {
 	resp := &milvuspb.QuerySegmentInfoResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
@@ -839,7 +839,7 @@ func (node *NodeImpl) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.Que
 	return resp, nil
 }
 
-func (node *NodeImpl) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) {
+func (node *ProxyNode) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) {
 	describeCollectionResponse, err := node.masterClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
 		Base: &commonpb.MsgBase{
 			MsgType:   commonpb.MsgType_kDescribeCollection,
@@ -898,7 +898,7 @@ func (node *NodeImpl) getSegmentsOfCollection(ctx context.Context, dbName string
 	return ret, nil
 }
 
-func (node *NodeImpl) RegisterLink(request *commonpb.Empty) (*milvuspb.RegisterLinkResponse, error) {
+func (node *ProxyNode) RegisterLink(request *commonpb.Empty) (*milvuspb.RegisterLinkResponse, error) {
 	code := node.stateCode.Load().(internalpb2.StateCode)
 	if code != internalpb2.StateCode_HEALTHY {
 		return &milvuspb.RegisterLinkResponse{
diff --git a/internal/proxynode/insert_channels.go b/internal/proxynode/insert_channels.go
index ed3598154..64f2ae3c3 100644
--- a/internal/proxynode/insert_channels.go
+++ b/internal/proxynode/insert_channels.go
@@ -75,7 +75,7 @@ type InsertChannelsMap struct {
 	droppedBitMap               []int                 // 0 -> normal, 1 -> dropped
 	usageHistogram              []int                 // message stream can be closed only when the use count is zero
 	mtx                         sync.RWMutex
-	nodeInstance                *NodeImpl
+	nodeInstance                *ProxyNode
 	msFactory                   msgstream.Factory
 }
 
@@ -188,7 +188,7 @@ func (m *InsertChannelsMap) closeAllMsgStream() {
 	m.usageHistogram = make([]int, 0)
 }
 
-func newInsertChannelsMap(node *NodeImpl) *InsertChannelsMap {
+func newInsertChannelsMap(node *ProxyNode) *InsertChannelsMap {
 	return &InsertChannelsMap{
 		collectionID2InsertChannels: make(map[UniqueID]int),
 		insertChannels:              make([][]string, 0),
@@ -202,6 +202,6 @@ func newInsertChannelsMap(node *NodeImpl) *InsertChannelsMap {
 
 var globalInsertChannelsMap *InsertChannelsMap
 
-func initGlobalInsertChannelsMap(node *NodeImpl) {
+func initGlobalInsertChannelsMap(node *ProxyNode) {
 	globalInsertChannelsMap = newInsertChannelsMap(node)
 }
diff --git a/internal/proxynode/interface.go b/internal/proxynode/interface.go
index ba9f27d6e..4322ce790 100644
--- a/internal/proxynode/interface.go
+++ b/internal/proxynode/interface.go
@@ -67,7 +67,7 @@ type ProxyServiceClient interface {
 	GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
 }
 
-type ProxyNode interface {
+type Service interface {
 	typeutil.Service
 
 	InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go
index 0a3943563..b3f1c054c 100644
--- a/internal/proxynode/proxy_node.go
+++ b/internal/proxynode/proxy_node.go
@@ -26,7 +26,7 @@ import (
 type UniqueID = typeutil.UniqueID
 type Timestamp = typeutil.Timestamp
 
-type NodeImpl struct {
+type ProxyNode struct {
 	ctx    context.Context
 	cancel func()
 	wg     sync.WaitGroup
@@ -59,10 +59,10 @@ type NodeImpl struct {
 	closeCallbacks []func()
 }
 
-func NewProxyNodeImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error) {
+func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, error) {
 	rand.Seed(time.Now().UnixNano())
 	ctx1, cancel := context.WithCancel(ctx)
-	node := &NodeImpl{
+	node := &ProxyNode{
 		ctx:       ctx1,
 		cancel:    cancel,
 		msFactory: factory,
@@ -76,7 +76,7 @@ type Component interface {
 	GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
 }
 
-func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component, serviceName string) error {
+func (node *ProxyNode) waitForServiceReady(ctx context.Context, service Component, serviceName string) error {
 
 	checkFunc := func() error {
 		resp, err := service.GetComponentStates(ctx)
@@ -100,7 +100,7 @@ func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component
 	return nil
 }
 
-func (node *NodeImpl) Init() error {
+func (node *ProxyNode) Init() error {
 	// todo wait for proxyservice state changed to Healthy
 	ctx := context.Background()
 
@@ -211,10 +211,10 @@ func (node *NodeImpl) Init() error {
 
 	node.manipulationMsgStream, _ = node.msFactory.NewMsgStream(node.ctx)
 	node.manipulationMsgStream.AsProducer(Params.InsertChannelNames)
-	repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
+	repackFunc := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
 		return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true)
 	}
-	node.manipulationMsgStream.SetRepackFunc(repackFuncImpl)
+	node.manipulationMsgStream.SetRepackFunc(repackFunc)
 	log.Println("create manipulation message stream ...")
 
 	node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory)
@@ -227,7 +227,7 @@ func (node *NodeImpl) Init() error {
 	return nil
 }
 
-func (node *NodeImpl) Start() error {
+func (node *ProxyNode) Start() error {
 	err := InitMetaCache(node.masterClient)
 	if err != nil {
 		return err
@@ -269,7 +269,7 @@ func (node *NodeImpl) Start() error {
 	return nil
 }
 
-func (node *NodeImpl) Stop() error {
+func (node *ProxyNode) Stop() error {
 	node.cancel()
 
 	globalInsertChannelsMap.closeAllMsgStream()
@@ -291,35 +291,35 @@ func (node *NodeImpl) Stop() error {
 }
 
 // AddStartCallback adds a callback in the startServer phase.
-func (node *NodeImpl) AddStartCallback(callbacks ...func()) {
+func (node *ProxyNode) AddStartCallback(callbacks ...func()) {
 	node.startCallbacks = append(node.startCallbacks, callbacks...)
 }
 
-func (node *NodeImpl) lastTick() Timestamp {
+func (node *ProxyNode) lastTick() Timestamp {
 	return node.tick.LastTick()
 }
 
 // AddCloseCallback adds a callback in the Close phase.
-func (node *NodeImpl) AddCloseCallback(callbacks ...func()) {
+func (node *ProxyNode) AddCloseCallback(callbacks ...func()) {
 	node.closeCallbacks = append(node.closeCallbacks, callbacks...)
 }
 
-func (node *NodeImpl) SetMasterClient(cli MasterClient) {
+func (node *ProxyNode) SetMasterClient(cli MasterClient) {
 	node.masterClient = cli
 }
 
-func (node *NodeImpl) SetIndexServiceClient(cli IndexServiceClient) {
+func (node *ProxyNode) SetIndexServiceClient(cli IndexServiceClient) {
 	node.indexServiceClient = cli
 }
 
-func (node *NodeImpl) SetDataServiceClient(cli DataServiceClient) {
+func (node *ProxyNode) SetDataServiceClient(cli DataServiceClient) {
 	node.dataServiceClient = cli
 }
 
-func (node *NodeImpl) SetProxyServiceClient(cli ProxyServiceClient) {
+func (node *ProxyNode) SetProxyServiceClient(cli ProxyServiceClient) {
 	node.proxyServiceClient = cli
 }
 
-func (node *NodeImpl) SetQueryServiceClient(cli QueryServiceClient) {
+func (node *ProxyNode) SetQueryServiceClient(cli QueryServiceClient) {
 	node.queryServiceClient = cli
 }
diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go
index de4a36503..b6a798ffa 100644
--- a/internal/proxynode/task.go
+++ b/internal/proxynode/task.go
@@ -579,9 +579,9 @@ func (st *SearchTask) Execute(ctx context.Context) error {
 	}
 	msgPack.Msgs[0] = tsMsg
 	err := st.queryMsgStream.Produce(ctx, msgPack)
-	log.Printf("[NodeImpl] length of searchMsg: %v", len(msgPack.Msgs))
+	log.Printf("[ProxyNode] length of searchMsg: %v", len(msgPack.Msgs))
 	if err != nil {
-		log.Printf("[NodeImpl] send search request failed: %v", err)
+		log.Printf("[ProxyNode] send search request failed: %v", err)
 	}
 	return err
 }
diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go
index 21a70a189..d29e27294 100644
--- a/internal/proxynode/task_scheduler.go
+++ b/internal/proxynode/task_scheduler.go
@@ -173,11 +173,11 @@ func (queue *BaseTaskQueue) Enqueue(t task) error {
 	}
 
 	ts, _ := queue.sched.tsoAllocator.AllocOne()
-	// log.Printf("[NodeImpl] allocate timestamp: %v", ts)
+	// log.Printf("[ProxyNode] allocate timestamp: %v", ts)
 	t.SetTs(ts)
 
 	reqID, _ := queue.sched.idAllocator.AllocOne()
-	// log.Printf("[NodeImpl] allocate reqID: %v", reqID)
+	// log.Printf("[ProxyNode] allocate reqID: %v", reqID)
 	t.SetID(reqID)
 
 	return queue.addUnissuedTask(t)
diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go
index bb8957e08..d7a4fb26b 100644
--- a/internal/proxyservice/impl.go
+++ b/internal/proxyservice/impl.go
@@ -30,7 +30,7 @@ const (
 	MilvusYamlContent    = "milvus.yaml"
 )
 
-func (s *ServiceImpl) fillNodeInitParams() error {
+func (s *ProxyService) fillNodeInitParams() error {
 	s.nodeStartParams = make([]*commonpb.KeyValuePair, 0)
 
 	getConfigContentByName := func(fileName string) []byte {
@@ -92,7 +92,7 @@ func (s *ServiceImpl) fillNodeInitParams() error {
 	return nil
 }
 
-func (s *ServiceImpl) Init() error {
+func (s *ProxyService) Init() error {
 	err := s.fillNodeInitParams()
 	if err != nil {
 		return err
@@ -134,14 +134,14 @@ func (s *ServiceImpl) Init() error {
 	return nil
 }
 
-func (s *ServiceImpl) Start() error {
+func (s *ProxyService) Start() error {
 	s.stateCode = internalpb2.StateCode_HEALTHY
 	s.sched.Start()
 	log.Println("start scheduler ...")
 	return s.tick.Start()
 }
 
-func (s *ServiceImpl) Stop() error {
+func (s *ProxyService) Stop() error {
 	s.sched.Close()
 	log.Println("close scheduler ...")
 	s.tick.Close()
@@ -158,7 +158,7 @@ func (s *ServiceImpl) Stop() error {
 	return nil
 }
 
-func (s *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
+func (s *ProxyService) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
 	stateInfo := &internalpb2.ComponentInfo{
 		NodeID:    UniqueID(0),
 		Role:      "ProxyService",
@@ -175,11 +175,11 @@ func (s *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.Comp
 	return ret, nil
 }
 
-func (s *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) {
+func (s *ProxyService) UpdateStateCode(code internalpb2.StateCode) {
 	s.stateCode = code
 }
 
-func (s *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
+func (s *ProxyService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
 	return &milvuspb.StringResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_SUCCESS,
@@ -188,11 +188,11 @@ func (s *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR
 	}, nil
 }
 
-func (s *ServiceImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
+func (s *ProxyService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
 	panic("implement me")
 }
 
-func (s *ServiceImpl) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error) {
+func (s *ProxyService) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error) {
 	log.Println("register link")
 	ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
 	defer cancel()
@@ -230,7 +230,7 @@ func (s *ServiceImpl) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkR
 	return t.response, nil
 }
 
-func (s *ServiceImpl) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
+func (s *ProxyService) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
 	log.Println("RegisterNode: ", request)
 	ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
 	defer cancel()
@@ -271,7 +271,7 @@ func (s *ServiceImpl) RegisterNode(ctx context.Context, request *proxypb.Registe
 	return t.response, nil
 }
 
-func (s *ServiceImpl) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
+func (s *ProxyService) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
 	log.Println("InvalidateCollectionMetaCache")
 	ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
 	defer cancel()
diff --git a/internal/proxyservice/interface.go b/internal/proxyservice/interface.go
index 8894b1475..b4f9d13b1 100644
--- a/internal/proxyservice/interface.go
+++ b/internal/proxyservice/interface.go
@@ -9,12 +9,10 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )
 
-type Component = typeutil.Component
-type Service = typeutil.Service
+type Service interface {
+	typeutil.Component
+	typeutil.TimeTickHandler
 
-type ProxyService interface {
-	Component
-	Service
 	RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error)
 	RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
 	// TODO: i'm sure it's not a best way to keep consistency, fix me
diff --git a/internal/proxyservice/nodeid_allocator.go b/internal/proxyservice/nodeid_allocator.go
index 29dab96f7..a759f70af 100644
--- a/internal/proxyservice/nodeid_allocator.go
+++ b/internal/proxyservice/nodeid_allocator.go
@@ -15,13 +15,13 @@ type NodeIDAllocator interface {
 	AllocOne() UniqueID
 }
 
-type NaiveNodeIDAllocatorImpl struct {
-	impl *allocator.IDAllocator
-	now  UniqueID
-	mtx  sync.Mutex
+type NaiveNodeIDAllocator struct {
+	allocator *allocator.IDAllocator
+	now       UniqueID
+	mtx       sync.Mutex
 }
 
-func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
+func (allocator *NaiveNodeIDAllocator) AllocOne() UniqueID {
 	allocator.mtx.Lock()
 	defer func() {
 		// allocator.now++
@@ -31,7 +31,7 @@ func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
 }
 
 func NewNodeIDAllocator() NodeIDAllocator {
-	return &NaiveNodeIDAllocatorImpl{
+	return &NaiveNodeIDAllocator{
 		now: 1,
 	}
 }
diff --git a/internal/proxyservice/proxyservice.go b/internal/proxyservice/proxyservice.go
index 4d9b03cc7..7a3ee4f51 100644
--- a/internal/proxyservice/proxyservice.go
+++ b/internal/proxyservice/proxyservice.go
@@ -10,10 +10,10 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 )
 
-type ServiceImpl struct {
+type ProxyService struct {
 	allocator NodeIDAllocator
 	sched     *TaskScheduler
-	tick      TimeTick
+	tick      *TimeTick
 	nodeInfos *GlobalNodeInfoTable
 	stateCode internalpb2.StateCode
 
@@ -27,10 +27,10 @@ type ServiceImpl struct {
 	msFactory msgstream.Factory
 }
 
-func NewServiceImpl(ctx context.Context, factory msgstream.Factory) (*ServiceImpl, error) {
+func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyService, error) {
 	rand.Seed(time.Now().UnixNano())
 	ctx1, cancel := context.WithCancel(ctx)
-	s := &ServiceImpl{
+	s := &ProxyService{
 		ctx:       ctx1,
 		cancel:    cancel,
 		msFactory: factory,
diff --git a/internal/proxyservice/timetick.go b/internal/proxyservice/timetick.go
index 92cf0c46f..6d1638423 100644
--- a/internal/proxyservice/timetick.go
+++ b/internal/proxyservice/timetick.go
@@ -10,22 +10,15 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 )
 
-type (
-	TimeTick interface {
-		Start() error
-		Close()
-	}
-
-	TimeTickImpl struct {
-		ttBarrier TimeTickBarrier
-		channels  []msgstream.MsgStream
-		wg        sync.WaitGroup
-		ctx       context.Context
-		cancel    context.CancelFunc
-	}
-)
+type TimeTick struct {
+	ttBarrier TimeTickBarrier
+	channels  []msgstream.MsgStream
+	wg        sync.WaitGroup
+	ctx       context.Context
+	cancel    context.CancelFunc
+}
 
-func (tt *TimeTickImpl) Start() error {
+func (tt *TimeTick) Start() error {
 	log.Println("start time tick ...")
 	tt.wg.Add(1)
 	go func() {
@@ -81,7 +74,7 @@ func (tt *TimeTickImpl) Start() error {
 	return nil
 }
 
-func (tt *TimeTickImpl) Close() {
+func (tt *TimeTick) Close() {
 	for _, channel := range tt.channels {
 		channel.Close()
 	}
@@ -90,7 +83,7 @@ func (tt *TimeTickImpl) Close() {
 	tt.wg.Wait()
 }
 
-func newTimeTick(ctx context.Context, ttBarrier TimeTickBarrier, channels ...msgstream.MsgStream) TimeTick {
+func newTimeTick(ctx context.Context, ttBarrier TimeTickBarrier, channels ...msgstream.MsgStream) *TimeTick {
 	ctx1, cancel := context.WithCancel(ctx)
-	return &TimeTickImpl{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channels: channels}
+	return &TimeTick{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channels: channels}
 }
diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go
index 5b64394d2..edba4bb44 100644
--- a/internal/querynode/collection_replica.go
+++ b/internal/querynode/collection_replica.go
@@ -33,7 +33,7 @@ import (
  * Every replica tracks a value called tSafe which is the maximum timestamp that the replica
  * is up-to-date.
  */
-type collectionReplica interface {
+type ReplicaInterface interface {
 	// collection
 	getCollectionIDs() []UniqueID
 	addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
@@ -69,12 +69,12 @@ type collectionReplica interface {
 	getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
 	replaceGrowingSegmentBySealedSegment(segment *Segment) error
 
-	getTSafe() tSafe
+	getTSafe() tSafer
 	freeAll()
 }
 
-type collectionReplicaImpl struct {
-	tSafe tSafe
+type collectionReplica struct {
+	tSafe tSafer
 
 	mu          sync.RWMutex // guards all
 	collections map[UniqueID]*Collection
@@ -83,7 +83,7 @@ type collectionReplicaImpl struct {
 }
 
 //----------------------------------------------------------------------------------------------------- collection
-func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
+func (colReplica *collectionReplica) getCollectionIDs() []UniqueID {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	collectionIDs := make([]UniqueID, 0)
@@ -93,7 +93,7 @@ func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
 	return collectionIDs
 }
 
-func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
+func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 
@@ -107,13 +107,13 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
+func (colReplica *collectionReplica) removeCollection(collectionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	return colReplica.removeCollectionPrivate(collectionID)
 }
 
-func (colReplica *collectionReplicaImpl) removeCollectionPrivate(collectionID UniqueID) error {
+func (colReplica *collectionReplica) removeCollectionPrivate(collectionID UniqueID) error {
 	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
 	if err != nil {
 		return err
@@ -131,13 +131,13 @@ func (colReplica *collectionReplicaImpl) removeCollectionPrivate(collectionID Un
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
+func (colReplica *collectionReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.getCollectionByIDPrivate(collectionID)
 }
 
-func (colReplica *collectionReplicaImpl) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) {
+func (colReplica *collectionReplica) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) {
 	collection, ok := colReplica.collections[collectionID]
 	if !ok {
 		return nil, fmt.Errorf("cannot find collection, id = %d", collectionID)
@@ -146,24 +146,24 @@ func (colReplica *collectionReplicaImpl) getCollectionByIDPrivate(collectionID U
 	return collection, nil
 }
 
-func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool {
+func (colReplica *collectionReplica) hasCollection(collectionID UniqueID) bool {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.hasCollectionPrivate(collectionID)
 }
 
-func (colReplica *collectionReplicaImpl) hasCollectionPrivate(collectionID UniqueID) bool {
+func (colReplica *collectionReplica) hasCollectionPrivate(collectionID UniqueID) bool {
 	_, ok := colReplica.collections[collectionID]
 	return ok
 }
 
-func (colReplica *collectionReplicaImpl) getCollectionNum() int {
+func (colReplica *collectionReplica) getCollectionNum() int {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return len(colReplica.collections)
 }
 
-func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) {
+func (colReplica *collectionReplica) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -175,7 +175,7 @@ func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID)
 	return collection.partitionIDs, nil
 }
 
-func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
+func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -198,7 +198,7 @@ func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collection
 	return vecFields, nil
 }
 
-func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
+func (colReplica *collectionReplica) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -218,7 +218,7 @@ func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID
 	return targetFields, nil
 }
 
-func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
+func (colReplica *collectionReplica) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
 	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
 	if err != nil {
 		return nil, err
@@ -232,13 +232,13 @@ func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collecti
 }
 
 //----------------------------------------------------------------------------------------------------- partition
-func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error {
+func (colReplica *collectionReplica) addPartition(collectionID UniqueID, partitionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	return colReplica.addPartitionPrivate(collectionID, partitionID)
 }
 
-func (colReplica *collectionReplicaImpl) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
+func (colReplica *collectionReplica) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
 	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
 	if err != nil {
 		return err
@@ -250,13 +250,13 @@ func (colReplica *collectionReplicaImpl) addPartitionPrivate(collectionID Unique
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) removePartition(partitionID UniqueID) error {
+func (colReplica *collectionReplica) removePartition(partitionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	return colReplica.removePartitionPrivate(partitionID)
 }
 
-func (colReplica *collectionReplicaImpl) removePartitionPrivate(partitionID UniqueID) error {
+func (colReplica *collectionReplica) removePartitionPrivate(partitionID UniqueID) error {
 	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
 	if err != nil {
 		return err
@@ -279,13 +279,13 @@ func (colReplica *collectionReplicaImpl) removePartitionPrivate(partitionID Uniq
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) getPartitionByID(partitionID UniqueID) (*Partition, error) {
+func (colReplica *collectionReplica) getPartitionByID(partitionID UniqueID) (*Partition, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.getPartitionByIDPrivate(partitionID)
 }
 
-func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) {
+func (colReplica *collectionReplica) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) {
 	partition, ok := colReplica.partitions[partitionID]
 	if !ok {
 		return nil, fmt.Errorf("cannot find partition, id = %d", partitionID)
@@ -294,30 +294,30 @@ func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(partitionID Uni
 	return partition, nil
 }
 
-func (colReplica *collectionReplicaImpl) hasPartition(partitionID UniqueID) bool {
+func (colReplica *collectionReplica) hasPartition(partitionID UniqueID) bool {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.hasPartitionPrivate(partitionID)
 }
 
-func (colReplica *collectionReplicaImpl) hasPartitionPrivate(partitionID UniqueID) bool {
+func (colReplica *collectionReplica) hasPartitionPrivate(partitionID UniqueID) bool {
 	_, ok := colReplica.partitions[partitionID]
 	return ok
 }
 
-func (colReplica *collectionReplicaImpl) getPartitionNum() int {
+func (colReplica *collectionReplica) getPartitionNum() int {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return len(colReplica.partitions)
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
+func (colReplica *collectionReplica) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.getSegmentIDsPrivate(partitionID)
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
+func (colReplica *collectionReplica) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
 	partition, err2 := colReplica.getPartitionByIDPrivate(partitionID)
 	if err2 != nil {
 		return nil, err2
@@ -325,7 +325,7 @@ func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID Unique
 	return partition.segmentIDs, nil
 }
 
-func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) error {
+func (colReplica *collectionReplica) enablePartition(partitionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 
@@ -338,7 +338,7 @@ func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) e
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID) error {
+func (colReplica *collectionReplica) disablePartition(partitionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 
@@ -351,7 +351,7 @@ func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID)
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []UniqueID {
+func (colReplica *collectionReplica) getEnabledPartitionIDsPrivate() []UniqueID {
 	partitionIDs := make([]UniqueID, 0)
 	for _, partition := range colReplica.partitions {
 		if partition.enable {
@@ -362,13 +362,13 @@ func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []Uniqu
 }
 
 //----------------------------------------------------------------------------------------------------- segment
-func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
+func (colReplica *collectionReplica) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	return colReplica.addSegmentPrivate(segmentID, partitionID, collectionID, segType)
 }
 
-func (colReplica *collectionReplicaImpl) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
+func (colReplica *collectionReplica) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
 	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
 	if err != nil {
 		return err
@@ -389,13 +389,13 @@ func (colReplica *collectionReplicaImpl) addSegmentPrivate(segmentID UniqueID, p
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error {
+func (colReplica *collectionReplica) removeSegment(segmentID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	return colReplica.removeSegmentPrivate(segmentID)
 }
 
-func (colReplica *collectionReplicaImpl) removeSegmentPrivate(segmentID UniqueID) error {
+func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) error {
 	log.Debug("remove segment", zap.Int64("segmentID", segmentID))
 	segment, err := colReplica.getSegmentByIDPrivate(segmentID)
 	if err != nil {
@@ -414,13 +414,13 @@ func (colReplica *collectionReplicaImpl) removeSegmentPrivate(segmentID UniqueID
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
+func (colReplica *collectionReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.getSegmentByIDPrivate(segmentID)
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) {
+func (colReplica *collectionReplica) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) {
 	segment, ok := colReplica.segments[segmentID]
 	if !ok {
 		return nil, errors.New("cannot find segment, id = " + strconv.FormatInt(segmentID, 10))
@@ -429,24 +429,24 @@ func (colReplica *collectionReplicaImpl) getSegmentByIDPrivate(segmentID UniqueI
 	return segment, nil
 }
 
-func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
+func (colReplica *collectionReplica) hasSegment(segmentID UniqueID) bool {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.hasSegmentPrivate(segmentID)
 }
 
-func (colReplica *collectionReplicaImpl) hasSegmentPrivate(segmentID UniqueID) bool {
+func (colReplica *collectionReplica) hasSegmentPrivate(segmentID UniqueID) bool {
 	_, ok := colReplica.segments[segmentID]
 	return ok
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentNum() int {
+func (colReplica *collectionReplica) getSegmentNum() int {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return len(colReplica.segments)
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.SegmentStats {
+func (colReplica *collectionReplica) getSegmentStatistics() []*internalpb2.SegmentStats {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -471,7 +471,7 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S
 	return statisticData
 }
 
-func (colReplica *collectionReplicaImpl) getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
+func (colReplica *collectionReplica) getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -500,7 +500,7 @@ func (colReplica *collectionReplicaImpl) getEnabledSegmentsBySegmentType(segType
 	return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
+func (colReplica *collectionReplica) getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -519,7 +519,7 @@ func (colReplica *collectionReplicaImpl) getSegmentsBySegmentType(segType segmen
 	return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
 }
 
-func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
+func (colReplica *collectionReplica) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	if segment.segmentType != segTypeSealed && segment.segmentType != segTypeIndexing {
@@ -539,11 +539,11 @@ func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(se
 }
 
 //-----------------------------------------------------------------------------------------------------
-func (colReplica *collectionReplicaImpl) getTSafe() tSafe {
+func (colReplica *collectionReplica) getTSafe() tSafer {
 	return colReplica.tSafe
 }
 
-func (colReplica *collectionReplicaImpl) freeAll() {
+func (colReplica *collectionReplica) freeAll() {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 
@@ -556,14 +556,14 @@ func (colReplica *collectionReplicaImpl) freeAll() {
 	colReplica.segments = make(map[UniqueID]*Segment)
 }
 
-func newCollectionReplicaImpl() collectionReplica {
+func newCollectionReplica() ReplicaInterface {
 	collections := make(map[int64]*Collection)
 	partitions := make(map[int64]*Partition)
 	segments := make(map[int64]*Segment)
 
 	tSafe := newTSafe()
 
-	var replica collectionReplica = &collectionReplicaImpl{
+	var replica ReplicaInterface = &collectionReplica{
 		collections: collections,
 		partitions:  partitions,
 		segments:    segments,
diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go
index 68cb575b5..fa816d7f6 100644
--- a/internal/querynode/data_sync_service.go
+++ b/internal/querynode/data_sync_service.go
@@ -18,10 +18,10 @@ type dataSyncService struct {
 	dmStream  msgstream.MsgStream
 	msFactory msgstream.Factory
 
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
-func newDataSyncService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *dataSyncService {
+func newDataSyncService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *dataSyncService {
 	service := &dataSyncService{
 		ctx:       ctx,
 		fg:        nil,
diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go
index 221c1a119..66b80cf11 100644
--- a/internal/querynode/flow_graph_dd_node.go
+++ b/internal/querynode/flow_graph_dd_node.go
@@ -14,7 +14,7 @@ import (
 type ddNode struct {
 	baseNode
 	ddMsg   *ddMsg
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
 func (ddNode *ddNode) Name() string {
@@ -160,7 +160,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
 	})
 }
 
-func newDDNode(replica collectionReplica) *ddNode {
+func newDDNode(replica ReplicaInterface) *ddNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go
index c948dc5a9..50c7ac00f 100644
--- a/internal/querynode/flow_graph_filter_dm_node.go
+++ b/internal/querynode/flow_graph_filter_dm_node.go
@@ -12,7 +12,7 @@ import (
 
 type filterDmNode struct {
 	baseNode
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
 func (fdmNode *filterDmNode) Name() string {
@@ -100,7 +100,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
 	return msg
 }
 
-func newFilteredDmNode(replica collectionReplica) *filterDmNode {
+func newFilteredDmNode(replica ReplicaInterface) *filterDmNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go
index f4aba3250..07ef91b4e 100644
--- a/internal/querynode/flow_graph_gc_node.go
+++ b/internal/querynode/flow_graph_gc_node.go
@@ -10,7 +10,7 @@ import (
 
 type gcNode struct {
 	baseNode
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
 func (gcNode *gcNode) Name() string {
@@ -54,7 +54,7 @@ func (gcNode *gcNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Con
 	return nil, ctx
 }
 
-func newGCNode(replica collectionReplica) *gcNode {
+func newGCNode(replica ReplicaInterface) *gcNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go
index 45ad4edce..9e4933b32 100644
--- a/internal/querynode/flow_graph_insert_node.go
+++ b/internal/querynode/flow_graph_insert_node.go
@@ -12,7 +12,7 @@ import (
 
 type insertNode struct {
 	baseNode
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
 type InsertData struct {
@@ -120,7 +120,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
 	wg.Done()
 }
 
-func newInsertNode(replica collectionReplica) *insertNode {
+func newInsertNode(replica ReplicaInterface) *insertNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go
index 3146a0829..b6e12942d 100644
--- a/internal/querynode/flow_graph_service_time_node.go
+++ b/internal/querynode/flow_graph_service_time_node.go
@@ -13,7 +13,7 @@ import (
 
 type serviceTimeNode struct {
 	baseNode
-	replica           collectionReplica
+	replica           ReplicaInterface
 	timeTickMsgStream msgstream.MsgStream
 }
 
@@ -71,7 +71,7 @@ func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error {
 	return stNode.timeTickMsgStream.Produce(context.TODO(), &msgPack)
 }
 
-func newServiceTimeNode(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *serviceTimeNode {
+func newServiceTimeNode(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *serviceTimeNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go
index 4d2848206..7f9f44f17 100644
--- a/internal/querynode/index_loader.go
+++ b/internal/querynode/index_loader.go
@@ -26,7 +26,7 @@ import (
 )
 
 type indexLoader struct {
-	replica collectionReplica
+	replica ReplicaInterface
 
 	fieldIndexes   map[string][]*internalpb2.IndexStats
 	fieldStatsChan chan []*internalpb2.FieldStats
@@ -389,7 +389,7 @@ func (loader *indexLoader) loadIndexDelayed(collectionID, segmentID UniqueID, in
 	return nil
 }
 
-func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, replica collectionReplica) *indexLoader {
+func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, replica ReplicaInterface) *indexLoader {
 	option := &minioKV.Option{
 		Address:           Params.MinioEndPoint,
 		AccessKeyID:       Params.MinioAccessKeyID,
diff --git a/internal/querynode/load_service.go b/internal/querynode/load_service.go
index 3d78d5707..bbc9b05f5 100644
--- a/internal/querynode/load_service.go
+++ b/internal/querynode/load_service.go
@@ -161,7 +161,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
 	return nil
 }
 
-func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService {
+func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica ReplicaInterface, dmStream msgstream.MsgStream) *loadService {
 	ctx1, cancel := context.WithCancel(ctx)
 
 	segLoader := newSegmentLoader(ctx1, masterClient, indexClient, dataClient, replica, dmStream)
diff --git a/internal/querynode/meta_service.go b/internal/querynode/meta_service.go
index 97628ad93..34daa6ba5 100644
--- a/internal/querynode/meta_service.go
+++ b/internal/querynode/meta_service.go
@@ -27,10 +27,10 @@ const (
 type metaService struct {
 	ctx     context.Context
 	kvBase  *etcdkv.EtcdKV
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
-func newMetaService(ctx context.Context, replica collectionReplica) *metaService {
+func newMetaService(ctx context.Context, replica ReplicaInterface) *metaService {
 	ETCDAddr := Params.ETCDAddress
 	MetaRootPath := Params.MetaRootPath
 	var cli *clientv3.Client
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 32e783b78..52326d442 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -55,7 +55,7 @@ type QueryNode struct {
 	QueryNodeID UniqueID
 	stateCode   atomic.Value
 
-	replica collectionReplica
+	replica ReplicaInterface
 
 	// internal services
 	dataSyncService *dataSyncService
@@ -88,7 +88,7 @@ func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.F
 		msFactory: factory,
 	}
 
-	node.replica = newCollectionReplicaImpl()
+	node.replica = newCollectionReplica()
 	node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
 	return node
 }
@@ -107,7 +107,7 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
 		msFactory: factory,
 	}
 
-	node.replica = newCollectionReplicaImpl()
+	node.replica = newCollectionReplica()
 	node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
 
 	return node
diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go
index 19d4e0643..018d35155 100644
--- a/internal/querynode/search_service.go
+++ b/internal/querynode/search_service.go
@@ -23,7 +23,7 @@ type searchService struct {
 	wait   sync.WaitGroup
 	cancel context.CancelFunc
 
-	replica      collectionReplica
+	replica      ReplicaInterface
 	tSafeWatcher *tSafeWatcher
 
 	serviceableTimeMutex sync.Mutex // guards serviceableTime
@@ -38,7 +38,7 @@ type searchService struct {
 
 type ResultEntityIds []UniqueID
 
-func newSearchService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *searchService {
+func newSearchService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *searchService {
 	receiveBufSize := Params.SearchReceiveBufSize
 
 	searchStream, _ := factory.NewMsgStream(ctx)
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 997a208f6..01c855045 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -17,7 +17,7 @@ import (
 
 // segmentLoader is only responsible for loading the field data from binlog
 type segmentLoader struct {
-	replica collectionReplica
+	replica ReplicaInterface
 
 	dmStream msgstream.MsgStream
 
@@ -191,7 +191,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetField
 	return nil
 }
 
-func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, dataClient DataServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *segmentLoader {
+func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, dataClient DataServiceInterface, replica ReplicaInterface, dmStream msgstream.MsgStream) *segmentLoader {
 	option := &minioKV.Option{
 		Address:           Params.MinioEndPoint,
 		AccessKeyID:       Params.MinioAccessKeyID,
diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go
index ab34823c4..7d08d8528 100644
--- a/internal/querynode/stats_service.go
+++ b/internal/querynode/stats_service.go
@@ -13,14 +13,14 @@ import (
 type statsService struct {
 	ctx context.Context
 
-	replica collectionReplica
+	replica ReplicaInterface
 
 	fieldStatsChan chan []*internalpb2.FieldStats
 	statsStream    msgstream.MsgStream
 	msFactory      msgstream.Factory
 }
 
-func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsChan chan []*internalpb2.FieldStats, factory msgstream.Factory) *statsService {
+func newStatsService(ctx context.Context, replica ReplicaInterface, fieldStatsChan chan []*internalpb2.FieldStats, factory msgstream.Factory) *statsService {
 
 	return &statsService{
 		ctx: ctx,
diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go
index 60529a3c9..103e5200d 100644
--- a/internal/querynode/tsafe.go
+++ b/internal/querynode/tsafe.go
@@ -24,38 +24,38 @@ func (watcher *tSafeWatcher) hasUpdate() {
 	<-watcher.notifyChan
 }
 
-type tSafe interface {
+type tSafer interface {
 	get() Timestamp
 	set(t Timestamp)
 	registerTSafeWatcher(t *tSafeWatcher)
 }
 
-type tSafeImpl struct {
+type tSafe struct {
 	tSafeMu     sync.Mutex // guards all fields
 	tSafe       Timestamp
 	watcherList []*tSafeWatcher
 }
 
-func newTSafe() tSafe {
-	var t tSafe = &tSafeImpl{
+func newTSafe() tSafer {
+	var t tSafer = &tSafe{
 		watcherList: make([]*tSafeWatcher, 0),
 	}
 	return t
 }
 
-func (ts *tSafeImpl) registerTSafeWatcher(t *tSafeWatcher) {
+func (ts *tSafe) registerTSafeWatcher(t *tSafeWatcher) {
 	ts.tSafeMu.Lock()
 	defer ts.tSafeMu.Unlock()
 	ts.watcherList = append(ts.watcherList, t)
 }
 
-func (ts *tSafeImpl) get() Timestamp {
+func (ts *tSafe) get() Timestamp {
 	ts.tSafeMu.Lock()
 	defer ts.tSafeMu.Unlock()
 	return ts.tSafe
 }
 
-func (ts *tSafeImpl) set(t Timestamp) {
+func (ts *tSafe) set(t Timestamp) {
 	ts.tSafeMu.Lock()
 	defer ts.tSafeMu.Unlock()
 
diff --git a/internal/queryservice/meta_replica.go b/internal/queryservice/meta_replica.go
index 15f063153..749124631 100644
--- a/internal/queryservice/meta_replica.go
+++ b/internal/queryservice/meta_replica.go
@@ -9,7 +9,7 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
 )
 
-type metaReplica interface {
+type Replica interface {
 	getCollections(dbID UniqueID) ([]*collection, error)
 	getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error)
 	getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error)
@@ -42,23 +42,23 @@ type collection struct {
 	schema          *schemapb.CollectionSchema
 }
 
-type metaReplicaImpl struct {
+type metaReplica struct {
 	dbID           []UniqueID
 	db2collections map[UniqueID][]*collection
 }
 
-func newMetaReplica() metaReplica {
+func newMetaReplica() Replica {
 	db2collections := make(map[UniqueID][]*collection)
 	db2collections[0] = make([]*collection, 0)
 	dbIDs := make([]UniqueID, 0)
 	dbIDs = append(dbIDs, UniqueID(0))
-	return &metaReplicaImpl{
+	return &metaReplica{
 		dbID:           dbIDs,
 		db2collections: db2collections,
 	}
 }
 
-func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID, schema *schemapb.CollectionSchema) error {
+func (mp *metaReplica) addCollection(dbID UniqueID, collectionID UniqueID, schema *schemapb.CollectionSchema) error {
 	//TODO:: assert dbID = 0 exist
 	if _, ok := mp.db2collections[dbID]; ok {
 		partitions := make(map[UniqueID]*partition)
@@ -76,7 +76,7 @@ func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID, s
 	return errors.New("addCollection: can't find dbID when add collection")
 }
 
-func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
+func (mp *metaReplica) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collection.id == collectionID {
@@ -95,7 +95,7 @@ func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, pa
 	return errors.New("addPartition: can't find collection when add partition")
 }
 
-func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) {
+func (mp *metaReplica) getCollections(dbID UniqueID) ([]*collection, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		return collections, nil
 	}
@@ -103,7 +103,7 @@ func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error)
 	return nil, errors.New("getCollections: can't find collectionID")
 }
 
-func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) {
+func (mp *metaReplica) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -119,7 +119,7 @@ func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) (
 	return nil, errors.New("getPartitions: can't find partitionIDs")
 }
 
-func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) {
+func (mp *metaReplica) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -136,7 +136,7 @@ func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, par
 	return nil, errors.New("getSegments: can't find segmentID")
 }
 
-func (mp *metaReplicaImpl) getCollectionByID(dbID UniqueID, collectionID UniqueID) (*collection, error) {
+func (mp *metaReplica) getCollectionByID(dbID UniqueID, collectionID UniqueID) (*collection, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -148,7 +148,7 @@ func (mp *metaReplicaImpl) getCollectionByID(dbID UniqueID, collectionID UniqueI
 	return nil, errors.New("getCollectionByID: can't find collectionID")
 }
 
-func (mp *metaReplicaImpl) getPartitionByID(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
+func (mp *metaReplica) getPartitionByID(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -163,7 +163,7 @@ func (mp *metaReplicaImpl) getPartitionByID(dbID UniqueID, collectionID UniqueID
 	return nil, errors.New("getPartitionByID: can't find partitionID")
 }
 
-func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
+func (mp *metaReplica) updatePartitionState(dbID UniqueID,
 	collectionID UniqueID,
 	partitionID UniqueID,
 	state querypb.PartitionState) error {
@@ -178,7 +178,7 @@ func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
 	return errors.New("updatePartitionState: update partition state fail")
 }
 
-func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
+func (mp *metaReplica) getPartitionStates(dbID UniqueID,
 	collectionID UniqueID,
 	partitionIDs []UniqueID) ([]*querypb.PartitionStates, error) {
 	partitionStates := make([]*querypb.PartitionStates, 0)
@@ -202,7 +202,7 @@ func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
 	return partitionStates, nil
 }
 
-func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueID) error {
+func (mp *metaReplica) releaseCollection(dbID UniqueID, collectionID UniqueID) error {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for i, collection := range collections {
 			if collectionID == collection.id {
@@ -220,7 +220,7 @@ func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueI
 	return errors.New(errorStr)
 }
 
-func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
+func (mp *metaReplica) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -236,7 +236,7 @@ func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID
 	return errors.New(errorStr)
 }
 
-func (mp *metaReplicaImpl) addDmChannels(dbID UniqueID, collectionID UniqueID, channels2NodeID map[string]int64) error {
+func (mp *metaReplica) addDmChannels(dbID UniqueID, collectionID UniqueID, channels2NodeID map[string]int64) error {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -250,7 +250,7 @@ func (mp *metaReplicaImpl) addDmChannels(dbID UniqueID, collectionID UniqueID, c
 	return errors.New("addDmChannels: can't find dbID or collectionID")
 }
 
-func (mp *metaReplicaImpl) getAssignedNodeIDByChannelName(dbID UniqueID, collectionID UniqueID, channel string) (int64, error) {
+func (mp *metaReplica) getAssignedNodeIDByChannelName(dbID UniqueID, collectionID UniqueID, channel string) (int64, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go
index 2179028cf..dd6d25ebe 100644
--- a/internal/queryservice/queryservice.go
+++ b/internal/queryservice/queryservice.go
@@ -25,8 +25,24 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
 	"github.com/zilliztech/milvus-distributed/internal/util/retry"
+	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )
 
+type Service interface {
+	typeutil.Component
+
+	RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error)
+	ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error)
+	LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error)
+	ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
+	ShowPartitions(ctx context.Context, req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error)
+	LoadPartitions(ctx context.Context, req *querypb.LoadPartitionRequest) (*commonpb.Status, error)
+	ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionRequest) (*commonpb.Status, error)
+	CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error)
+	GetPartitionStates(ctx context.Context, req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error)
+	GetSegmentInfo(ctx context.Context, req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error)
+}
+
 type MasterServiceInterface interface {
 	ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
 	ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
@@ -60,7 +76,7 @@ type QueryService struct {
 	loopCancel context.CancelFunc
 
 	queryServiceID uint64
-	replica        metaReplica
+	replica        Replica
 
 	dataServiceClient   DataServiceInterface
 	masterServiceClient MasterServiceInterface
diff --git a/internal/util/typeutil/interface.go b/internal/util/typeutil/interface.go
index 985b89717..af0fd8964 100644
--- a/internal/util/typeutil/interface.go
+++ b/internal/util/typeutil/interface.go
@@ -10,6 +10,9 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
 )
 
+type TimeTickHandler interface {
+	GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error)
+}
 type Service interface {
 	Init() error
 	Start() error
@@ -18,7 +21,6 @@ type Service interface {
 
 type Component interface {
 	GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
-	GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error)
 	GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error)
 }
 
-- 
GitLab