From 9fe672ab86010b814b1320caa7a3fbe4f20287e9 Mon Sep 17 00:00:00 2001
From: XuanYang-cn <xuan.yang@zilliz.com>
Date: Sun, 24 Jan 2021 21:20:11 +0800
Subject: [PATCH] Enchance datanode interface

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
---
 configs/milvus.yaml                           |   4 +
 internal/datanode/allocator.go                |  38 ++
 internal/datanode/collection_replica_test.go  |   7 +-
 internal/datanode/collection_test.go          |   6 +-
 internal/datanode/data_node.go                | 153 ++++++--
 internal/datanode/data_node_test.go           |  26 +-
 internal/datanode/data_sync_service.go        |  24 +-
 internal/datanode/data_sync_service_test.go   |  19 +-
 internal/datanode/{factory => }/factory.go    |  14 +-
 internal/datanode/flow_graph_dd_node.go       |  21 +-
 internal/datanode/flow_graph_dd_node_test.go  |   5 +-
 .../datanode/flow_graph_insert_buffer_node.go |  34 +-
 .../flow_graph_insert_buffer_node_test.go     |   6 +-
 .../flow_graph_msg_stream_input_node.go       |  10 +-
 internal/datanode/interface.go                |  14 -
 internal/datanode/param_table.go              | 142 ++++----
 internal/datanode/param_table_test.go         |  26 +-
 internal/dataservice/cluster.go               |   4 +-
 internal/distributed/datanode/client.go       |  41 ++-
 internal/distributed/datanode/service.go      |  64 +++-
 internal/proto/data_service.proto             |   6 +
 internal/proto/datapb/data_service.pb.go      | 337 +++++++++++++-----
 22 files changed, 645 insertions(+), 356 deletions(-)
 create mode 100644 internal/datanode/allocator.go
 rename internal/datanode/{factory => }/factory.go (95%)
 delete mode 100644 internal/datanode/interface.go

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 05c06eed8..9c7db935f 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -73,3 +73,7 @@ indexNode:
 indexServer:
   address: localhost
   port: 21118
+
+dataNode:
+  address: localhost
+  port: 21124
diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go
new file mode 100644
index 000000000..0cad7a46c
--- /dev/null
+++ b/internal/datanode/allocator.go
@@ -0,0 +1,38 @@
+package datanode
+
+import (
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
+)
+
+type (
+	allocator interface {
+		allocID() (UniqueID, error)
+	}
+
+	allocatorImpl struct {
+		masterService MasterServiceInterface
+	}
+)
+
+func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl {
+	return &allocatorImpl{
+		masterService: s,
+	}
+}
+
+func (alloc *allocatorImpl) allocID() (UniqueID, error) {
+	resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{
+		Base: &commonpb.MsgBase{
+			MsgType:   commonpb.MsgType_kShowCollections,
+			MsgID:     1, // GOOSE TODO add msg id
+			Timestamp: 0, // GOOSE TODO
+			SourceID:  Params.NodeID,
+		},
+		Count: 1,
+	})
+	if err != nil {
+		return 0, err
+	}
+	return resp.ID, nil
+}
diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go
index 897168c94..25869712d 100644
--- a/internal/datanode/collection_replica_test.go
+++ b/internal/datanode/collection_replica_test.go
@@ -4,24 +4,23 @@ import (
 	"testing"
 
 	"github.com/golang/protobuf/proto"
-
-	"github.com/zilliztech/milvus-distributed/internal/datanode/factory"
-
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
 func newReplica() collectionReplica {
 	collections := make([]*Collection, 0)
+	segments := make([]*Segment, 0)
 
 	var replica collectionReplica = &collectionReplicaImpl{
 		collections: collections,
+		segments:    segments,
 	}
 	return replica
 }
 
 func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName string, collectionID UniqueID, segmentID UniqueID) {
-	Factory := &factory.MetaFactory{}
+	Factory := &MetaFactory{}
 	collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
 
 	schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
diff --git a/internal/datanode/collection_test.go b/internal/datanode/collection_test.go
index f5c7cd1e1..cce9bc1dc 100644
--- a/internal/datanode/collection_test.go
+++ b/internal/datanode/collection_test.go
@@ -5,14 +5,12 @@ import (
 
 	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/assert"
-
-	"github.com/zilliztech/milvus-distributed/internal/datanode/factory"
 )
 
 func TestCollection_newCollection(t *testing.T) {
 	collectionName := "collection0"
 	collectionID := UniqueID(1)
-	Factory := &factory.MetaFactory{}
+	Factory := &MetaFactory{}
 	collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
 
 	schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
@@ -26,7 +24,7 @@ func TestCollection_newCollection(t *testing.T) {
 func TestCollection_deleteCollection(t *testing.T) {
 	collectionName := "collection0"
 	collectionID := UniqueID(1)
-	Factory := &factory.MetaFactory{}
+	Factory := &MetaFactory{}
 	collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
 
 	schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go
index edded7907..4be2c78d9 100644
--- a/internal/datanode/data_node.go
+++ b/internal/datanode/data_node.go
@@ -4,51 +4,130 @@ import (
 	"context"
 	"io"
 	"log"
+	"time"
 
 	"github.com/opentracing/opentracing-go"
 	"github.com/uber/jaeger-client-go"
 	"github.com/uber/jaeger-client-go/config"
+
+	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
+	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )
 
-type DataNode struct {
-	ctx             context.Context
-	DataNodeID      uint64
-	dataSyncService *dataSyncService
-	metaService     *metaService
+const (
+	RPCConnectionTimeout = 30 * time.Second
+)
 
-	replica collectionReplica
+type (
+	Inteface interface {
+		typeutil.Service
 
-	tracer opentracing.Tracer
-	closer io.Closer
-}
+		GetComponentStates() (*internalpb2.ComponentStates, error)
+		WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
+		FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
+	}
 
-func NewDataNode(ctx context.Context, dataNodeID uint64) *DataNode {
+	DataServiceInterface interface {
+		RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
+	}
 
-	collections := make([]*Collection, 0)
+	MasterServiceInterface interface {
+		AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error)
+		ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
+		DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
+	}
 
-	var replica collectionReplica = &collectionReplicaImpl{
-		collections: collections,
+	DataNode struct {
+		ctx    context.Context
+		NodeID UniqueID
+		Role   string
+		State  internalpb2.StateCode
+
+		dataSyncService *dataSyncService
+		metaService     *metaService
+
+		masterService MasterServiceInterface
+		dataService   DataServiceInterface
+
+		replica collectionReplica
+
+		tracer opentracing.Tracer
+		closer io.Closer
 	}
+)
+
+func NewDataNode(ctx context.Context, nodeID UniqueID, masterService MasterServiceInterface,
+	dataService DataServiceInterface) *DataNode {
 
+	Params.Init()
 	node := &DataNode{
 		ctx:             ctx,
-		DataNodeID:      dataNodeID,
+		NodeID:          nodeID,     // GOOSE TODO
+		Role:            "DataNode", // GOOSE TODO
+		State:           internalpb2.StateCode_INITIALIZING,
 		dataSyncService: nil,
-		// metaService:     nil,
-		replica: replica,
+		metaService:     nil,
+		masterService:   masterService,
+		dataService:     dataService,
+		replica:         nil,
 	}
 
 	return node
 }
 
 func (node *DataNode) Init() error {
-	Params.Init()
-	return nil
-}
 
-func (node *DataNode) Start() error {
+	req := &datapb.RegisterNodeRequest{
+		Base: &commonpb.MsgBase{
+			MsgType:  commonpb.MsgType_kNone, //GOOSE TODO
+			SourceID: node.NodeID,
+		},
+		Address: &commonpb.Address{
+			Ip:   Params.IP,
+			Port: Params.Port,
+		},
+	}
+
+	resp, err := node.dataService.RegisterNode(req)
+	if err != nil {
+		return errors.Errorf("Init failed: %v", err)
+	}
+
+	for _, kv := range resp.InitParams.StartParams {
+		log.Println(kv)
+		switch kv.Key {
+		case "DDChannelName":
+			Params.DDChannelNames = []string{kv.Value}
+		case "SegmentStatisticsChannelName":
+			Params.SegmentStatisticsChannelName = kv.Value
+		case "TimeTickChannelName":
+			Params.TimeTickChannelName = kv.Value
+		case "CompleteFlushChannelName":
+			Params.CompleteFlushChannelName = kv.Value
+		default:
+			return errors.Errorf("Invalid key: %v", kv.Key)
+		}
+
+	}
+
+	var replica collectionReplica = &collectionReplicaImpl{
+		collections: make([]*Collection, 0),
+		segments:    make([]*Segment, 0),
+	}
+
+	var alloc allocator = newAllocatorImpl(node.masterService)
+	chanSize := 100
+	flushChan := make(chan *flushMsg, chanSize)
+	node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc)
+	node.metaService = newMetaService(node.ctx, replica)
+	node.replica = replica
+
+	// Opentracing
 	cfg := &config.Configuration{
 		ServiceName: "data_node",
 		Sampler: &config.SamplerConfig{
@@ -59,24 +138,22 @@ func (node *DataNode) Start() error {
 			LogSpans: true,
 		},
 	}
-
-	var err error
-	node.tracer, node.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
+	tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger))
 	if err != nil {
-		log.Printf("ERROR: cannot init Jaeger: %v\n", err)
-	} else {
-		opentracing.SetGlobalTracer(node.tracer)
+		return errors.Errorf("ERROR: cannot init Jaeger: %v\n", err)
 	}
+	node.tracer = tracer
+	node.closer = closer
 
-	// TODO GOOSE Init Size??
-	chanSize := 100
-	flushChan := make(chan *flushMsg, chanSize)
+	opentracing.SetGlobalTracer(node.tracer)
 
-	node.dataSyncService = newDataSyncService(node.ctx, flushChan, node.replica)
-	node.metaService = newMetaService(node.ctx, node.replica)
+	node.State = internalpb2.StateCode_HEALTHY
+	return nil
+}
+
+func (node *DataNode) Start() error {
 
 	go node.dataSyncService.start()
-	// go node.flushSyncService.start()
 	node.metaService.start()
 
 	return nil
@@ -88,8 +165,16 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) error {
 }
 
 func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) {
-	// GOOSE TODO: Implement me
-	return nil, nil
+	states := &internalpb2.ComponentStates{
+		State: &internalpb2.ComponentInfo{
+			NodeID:    Params.NodeID,
+			Role:      node.Role,
+			StateCode: node.State,
+		},
+		SubcomponentStates: make([]*internalpb2.ComponentInfo, 0),
+		Status:             &commonpb.Status{},
+	}
+	return states, nil
 }
 
 func (node *DataNode) FlushSegments(in *datapb.FlushSegRequest) error {
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index c71fb8677..da9f49684 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -26,6 +26,10 @@ func makeNewChannelNames(names []string, suffix string) []string {
 }
 
 func refreshChannelNames() {
+	Params.DDChannelNames = []string{"datanode-test"}
+	Params.SegmentStatisticsChannelName = "segtment-statistics"
+	Params.CompleteFlushChannelName = "flush-completed"
+	Params.TimeTickChannelName = "hard-timetick"
 	suffix := "-test-data-node" + strconv.FormatInt(rand.Int63n(100), 10)
 	Params.DDChannelNames = makeNewChannelNames(Params.DDChannelNames, suffix)
 	Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
@@ -81,28 +85,6 @@ func TestMain(m *testing.M) {
 	os.Exit(exitCode)
 }
 
-func newDataNode() *DataNode {
-
-	const ctxTimeInMillisecond = 2000
-	const closeWithDeadline = true
-	var ctx context.Context
-
-	if closeWithDeadline {
-		var cancel context.CancelFunc
-		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
-		ctx, cancel = context.WithDeadline(context.Background(), d)
-		go func() {
-			<-ctx.Done()
-			cancel()
-		}()
-	} else {
-		ctx = context.Background()
-	}
-
-	svr := NewDataNode(ctx, 0)
-	return svr
-}
-
 func newMetaTable() *metaTable {
 	etcdClient, _ := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
 
diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go
index 40cf55371..fa8b09ec0 100644
--- a/internal/datanode/data_sync_service.go
+++ b/internal/datanode/data_sync_service.go
@@ -10,20 +10,22 @@ import (
 )
 
 type dataSyncService struct {
-	ctx       context.Context
-	fg        *flowgraph.TimeTickedFlowGraph
-	flushChan chan *flushMsg
-	replica   collectionReplica
+	ctx         context.Context
+	fg          *flowgraph.TimeTickedFlowGraph
+	flushChan   chan *flushMsg
+	replica     collectionReplica
+	idAllocator allocator
 }
 
 func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
-	replica collectionReplica) *dataSyncService {
+	replica collectionReplica, alloc allocator) *dataSyncService {
 
 	return &dataSyncService{
-		ctx:       ctx,
-		fg:        nil,
-		flushChan: flushChan,
-		replica:   replica,
+		ctx:         ctx,
+		fg:          nil,
+		flushChan:   flushChan,
+		replica:     replica,
+		idAllocator: alloc,
 	}
 }
 
@@ -59,8 +61,8 @@ func (dsService *dataSyncService) initNodes() {
 
 	var filterDmNode Node = newFilteredDmNode()
 
-	var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica)
-	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica)
+	var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator)
+	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator)
 	var gcNode Node = newGCNode(dsService.replica)
 
 	dsService.fg.AddNode(&dmStreamNode)
diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go
index e6fdde38b..0813dde84 100644
--- a/internal/datanode/data_sync_service_test.go
+++ b/internal/datanode/data_sync_service_test.go
@@ -10,7 +10,6 @@ import (
 	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/assert"
 
-	"github.com/zilliztech/milvus-distributed/internal/datanode/factory"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@@ -35,10 +34,16 @@ func TestDataSyncService_Start(t *testing.T) {
 	// init data node
 	pulsarURL := Params.PulsarAddress
 
-	Factory := &factory.MetaFactory{}
+	Factory := &MetaFactory{}
 	collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
-	node := NewDataNode(ctx, 0)
-	node.replica.addCollection(collMeta.ID, proto.MarshalTextString(collMeta.Schema))
+
+	chanSize := 100
+	flushChan := make(chan *flushMsg, chanSize)
+	replica := newReplica()
+	allocFactory := AllocatorFactory{}
+	sync := newDataSyncService(ctx, flushChan, replica, allocFactory)
+	sync.replica.addCollection(collMeta.ID, proto.MarshalTextString(collMeta.Schema))
+	go sync.start()
 
 	// test data generate
 	// GOOSE TODO orgnize
@@ -204,10 +209,6 @@ func TestDataSyncService_Start(t *testing.T) {
 
 	// dataSync
 	Params.FlushInsertBufferSize = 1
-	node.dataSyncService = newDataSyncService(node.ctx, nil, node.replica)
-	go node.dataSyncService.start()
-
-	node.Stop()
 
-	<-ctx.Done()
+	sync.close()
 }
diff --git a/internal/datanode/factory/factory.go b/internal/datanode/factory.go
similarity index 95%
rename from internal/datanode/factory/factory.go
rename to internal/datanode/factory.go
index 1a170c4ab..d8a462a9e 100644
--- a/internal/datanode/factory/factory.go
+++ b/internal/datanode/factory.go
@@ -1,21 +1,20 @@
-package factory
+package datanode
 
 import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
-	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )
 
 type (
-	UniqueID  = typeutil.UniqueID
-	Timestamp = typeutil.Timestamp
-
 	Factory interface {
 	}
 
 	MetaFactory struct {
 	}
+
+	AllocatorFactory struct {
+	}
 )
 
 func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
@@ -152,3 +151,8 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa
 	}
 	return &collection
 }
+
+func (alloc AllocatorFactory) allocID() (UniqueID, error) {
+	// GOOSE TODO: random ID generate
+	return UniqueID(0), nil
+}
diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go
index 5b5c6df95..d7432ae16 100644
--- a/internal/datanode/flow_graph_dd_node.go
+++ b/internal/datanode/flow_graph_dd_node.go
@@ -9,7 +9,6 @@ import (
 	"strconv"
 
 	"github.com/golang/protobuf/proto"
-	"github.com/zilliztech/milvus-distributed/internal/allocator"
 	"github.com/zilliztech/milvus-distributed/internal/kv"
 	miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
@@ -25,7 +24,7 @@ type ddNode struct {
 	ddBuffer  *ddBuffer
 	inFlushCh chan *flushMsg
 
-	idAllocator *allocator.IDAllocator
+	idAllocator allocator
 	kv          kv.Base
 	replica     collectionReplica
 	flushMeta   *metaTable
@@ -174,7 +173,7 @@ func (ddNode *ddNode) flush() {
 			keyCommon := path.Join(Params.DdBinlogRootPath, strconv.FormatInt(collectionID, 10))
 
 			// save ts binlog
-			timestampLogIdx, err := ddNode.idAllocator.AllocOne()
+			timestampLogIdx, err := ddNode.idAllocator.allocID()
 			if err != nil {
 				log.Println(err)
 			}
@@ -186,7 +185,7 @@ func (ddNode *ddNode) flush() {
 			log.Println("save ts binlog, key = ", timestampKey)
 
 			// save dd binlog
-			ddLogIdx, err := ddNode.idAllocator.AllocOne()
+			ddLogIdx, err := ddNode.idAllocator.allocID()
 			if err != nil {
 				log.Println(err)
 			}
@@ -370,7 +369,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
 }
 
 func newDDNode(ctx context.Context, flushMeta *metaTable,
-	inFlushCh chan *flushMsg, replica collectionReplica) *ddNode {
+	inFlushCh chan *flushMsg, replica collectionReplica, alloc allocator) *ddNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
@@ -397,15 +396,6 @@ func newDDNode(ctx context.Context, flushMeta *metaTable,
 		panic(err)
 	}
 
-	idAllocator, err := allocator.NewIDAllocator(ctx, Params.MasterAddress)
-	if err != nil {
-		panic(err)
-	}
-	err = idAllocator.Start()
-	if err != nil {
-		panic(err)
-	}
-
 	return &ddNode{
 		BaseNode:  baseNode,
 		ddRecords: ddRecords,
@@ -413,10 +403,9 @@ func newDDNode(ctx context.Context, flushMeta *metaTable,
 			ddData:  make(map[UniqueID]*ddData),
 			maxSize: Params.FlushDdBufferSize,
 		},
-		// outCh:     outCh,
 		inFlushCh: inFlushCh,
 
-		idAllocator: idAllocator,
+		idAllocator: alloc,
 		kv:          minioKV,
 		replica:     replica,
 		flushMeta:   flushMeta,
diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go
index a255bb103..c0ce5d52f 100644
--- a/internal/datanode/flow_graph_dd_node_test.go
+++ b/internal/datanode/flow_graph_dd_node_test.go
@@ -15,7 +15,7 @@ import (
 
 func TestFlowGraphDDNode_Operate(t *testing.T) {
 	const ctxTimeInMillisecond = 2000
-	const closeWithDeadline = false
+	const closeWithDeadline = true
 	var ctx context.Context
 
 	if closeWithDeadline {
@@ -37,7 +37,8 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
 
 	Params.FlushDdBufferSize = 4
 	replica := newReplica()
-	ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica)
+	idFactory := AllocatorFactory{}
+	ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, idFactory)
 
 	colID := UniqueID(0)
 	colName := "col-test-0"
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index 7e7d3da2e..b342d7eed 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -15,7 +15,6 @@ import (
 	"github.com/opentracing/opentracing-go"
 	oplog "github.com/opentracing/opentracing-go/log"
 
-	"github.com/zilliztech/milvus-distributed/internal/allocator"
 	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/kv"
 	miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
@@ -45,7 +44,7 @@ type (
 		minIOKV     kv.Base
 		minioPrefix string
 
-		idAllocator *allocator.IDAllocator
+		idAllocator allocator
 
 		timeTickStream          msgstream.MsgStream
 		segmentStatisticsStream msgstream.MsgStream
@@ -514,7 +513,7 @@ func (ibNode *insertBufferNode) flushSegment(segID UniqueID, partitionID UniqueI
 
 	log.Printf(".. Saving (%v) binlogs to MinIO ...", len(binLogs))
 	for index, blob := range binLogs {
-		uid, err := ibNode.idAllocator.AllocOne()
+		uid, err := ibNode.idAllocator.allocID()
 		if err != nil {
 			return errors.Errorf("Allocate Id failed, %v", err)
 		}
@@ -543,7 +542,7 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID) error {
 			MsgType:   commonpb.MsgType_kSegmentFlushDone,
 			MsgID:     0, // GOOSE TODO
 			Timestamp: 0, // GOOSE TODO
-			SourceID:  Params.DataNodeID,
+			SourceID:  Params.NodeID,
 		},
 		SegmentID: segID,
 	}
@@ -572,7 +571,7 @@ func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
 				MsgType:   commonpb.MsgType_kTimeTick,
 				MsgID:     0,  // GOOSE TODO
 				Timestamp: ts, // GOOSE TODO
-				SourceID:  Params.DataNodeID,
+				SourceID:  Params.NodeID,
 			},
 		},
 	}
@@ -597,7 +596,7 @@ func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
 			MsgType:   commonpb.MsgType_kSegmentStatistics,
 			MsgID:     UniqueID(0),  // GOOSE TODO
 			Timestamp: Timestamp(0), // GOOSE TODO
-			SourceID:  Params.DataNodeID,
+			SourceID:  Params.NodeID,
 		},
 		SegStats: statsUpdates,
 	}
@@ -623,8 +622,8 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (
 	return ret.schema, nil
 }
 
-func newInsertBufferNode(ctx context.Context,
-	flushMeta *metaTable, replica collectionReplica) *insertBufferNode {
+func newInsertBufferNode(ctx context.Context, flushMeta *metaTable,
+	replica collectionReplica, alloc allocator) *insertBufferNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
 
@@ -654,42 +653,33 @@ func newInsertBufferNode(ctx context.Context,
 	}
 	minioPrefix := Params.InsertBinlogRootPath
 
-	idAllocator, err := allocator.NewIDAllocator(ctx, Params.MasterAddress)
-	if err != nil {
-		panic(err)
-	}
-	err = idAllocator.Start()
-	if err != nil {
-		panic(err)
-	}
-
 	//input stream, data node time tick
 	wTt := pulsarms.NewPulsarMsgStream(ctx, 1024)
 	wTt.SetPulsarClient(Params.PulsarAddress)
 	wTt.CreatePulsarProducers([]string{Params.TimeTickChannelName})
 	var wTtMsgStream msgstream.MsgStream = wTt
-	wTtMsgStream.Start() // GOOSE TODO remove
+	wTtMsgStream.Start()
 
 	// update statistics channel
-	segS := pulsarms.NewPulsarMsgStream(ctx, Params.SegmentStatisticsBufSize)
+	segS := pulsarms.NewPulsarMsgStream(ctx, 1024)
 	segS.SetPulsarClient(Params.PulsarAddress)
 	segS.CreatePulsarProducers([]string{Params.SegmentStatisticsChannelName})
 	var segStatisticsMsgStream msgstream.MsgStream = segS
-	segStatisticsMsgStream.Start() // GOOSE TODO remove
+	segStatisticsMsgStream.Start()
 
 	// segment flush completed channel
 	cf := pulsarms.NewPulsarMsgStream(ctx, 1024)
 	cf.SetPulsarClient(Params.PulsarAddress)
 	cf.CreatePulsarProducers([]string{Params.CompleteFlushChannelName})
 	var completeFlushStream msgstream.MsgStream = cf
-	completeFlushStream.Start() // GOOSE TODO remove
+	completeFlushStream.Start()
 
 	return &insertBufferNode{
 		BaseNode:                baseNode,
 		insertBuffer:            iBuffer,
 		minIOKV:                 minIOKV,
 		minioPrefix:             minioPrefix,
-		idAllocator:             idAllocator,
+		idAllocator:             alloc,
 		timeTickStream:          wTtMsgStream,
 		segmentStatisticsStream: segStatisticsMsgStream,
 		completeFlushStream:     completeFlushStream,
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 0fb73b341..78b08901e 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -12,7 +12,6 @@ import (
 	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/require"
 
-	"github.com/zilliztech/milvus-distributed/internal/datanode/factory"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -38,7 +37,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	require.NoError(t, err)
 	Params.MetaRootPath = testPath
 
-	Factory := &factory.MetaFactory{}
+	Factory := &MetaFactory{}
 	collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
 	schemaBlob := proto.MarshalTextString(collMeta.Schema)
 	require.NotEqual(t, "", schemaBlob)
@@ -48,7 +47,8 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
 	require.NoError(t, err)
 
 	// Params.FlushInsertBufSize = 2
-	iBNode := newInsertBufferNode(ctx, newMetaTable(), replica)
+	idFactory := AllocatorFactory{}
+	iBNode := newInsertBufferNode(ctx, newMetaTable(), replica, idFactory)
 	inMsg := genInsertMsg()
 	var iMsg flowgraph.Msg = &inMsg
 	iBNode.Operate([]*flowgraph.Msg{&iMsg})
diff --git a/internal/datanode/flow_graph_msg_stream_input_node.go b/internal/datanode/flow_graph_msg_stream_input_node.go
index 413d50796..56e711f0e 100644
--- a/internal/datanode/flow_graph_msg_stream_input_node.go
+++ b/internal/datanode/flow_graph_msg_stream_input_node.go
@@ -35,18 +35,14 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
 }
 
 func newDDInputNode(ctx context.Context) *flowgraph.InputNode {
-	receiveBufSize := Params.DDReceiveBufSize
-	pulsarBufSize := Params.DDPulsarBufSize
-
-	msgStreamURL := Params.PulsarAddress
 
 	consumeChannels := Params.DDChannelNames
 	consumeSubName := Params.MsgChannelSubName
 
-	ddStream := pulsarms.NewPulsarTtMsgStream(ctx, receiveBufSize)
-	ddStream.SetPulsarClient(msgStreamURL)
+	ddStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
+	ddStream.SetPulsarClient(Params.PulsarAddress)
 	unmarshalDispatcher := util.NewUnmarshalDispatcher()
-	ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
+	ddStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, 1024)
 
 	var stream msgstream.MsgStream = ddStream
 
diff --git a/internal/datanode/interface.go b/internal/datanode/interface.go
deleted file mode 100644
index 47046f4b6..000000000
--- a/internal/datanode/interface.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package datanode
-
-import (
-	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
-)
-
-type Interface interface {
-	Init() error
-	Start() error
-	Stop() error
-
-	WatchDmChannels(in *datapb.WatchDmChannelRequest) error
-	FlushSegments(req *datapb.FlushSegRequest) error
-}
diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go
index 5c7866bec..a8e8526cc 100644
--- a/internal/datanode/param_table.go
+++ b/internal/datanode/param_table.go
@@ -16,7 +16,9 @@ type ParamTable struct {
 	paramtable.BaseTable
 
 	// === DataNode Internal Components Configs ===
-	DataNodeID              UniqueID
+	NodeID                  UniqueID
+	IP                      string // GOOSE TODO load from config file
+	Port                    int64
 	FlowGraphMaxQueueLength int32
 	FlowGraphMaxParallelism int32
 	FlushInsertBufferSize   int32
@@ -25,8 +27,9 @@ type ParamTable struct {
 	DdBinlogRootPath        string
 
 	// === DataNode External Components Configs ===
-	// --- Master ---
-	MasterAddress string
+	// --- External Client Address ---
+	MasterAddress  string
+	ServiceAddress string // GOOSE TODO: init from config file
 
 	// --- Pulsar ---
 	PulsarAddress string
@@ -38,20 +41,20 @@ type ParamTable struct {
 	InsertPulsarBufSize  int64
 
 	// - dd channel -
-	DDChannelNames   []string
-	DDReceiveBufSize int64
-	DDPulsarBufSize  int64
+	DDChannelNames []string // GOOSE TODO, set after Init
+	// DDReceiveBufSize int64
+	// DDPulsarBufSize  int64
 
 	// - seg statistics channel -
-	SegmentStatisticsChannelName    string
-	SegmentStatisticsBufSize        int64
-	SegmentStatisticsUpdateInterval int //  GOOSE TODO remove
+	SegmentStatisticsChannelName string // GOOSE TODO, set after init
+	// SegmentStatisticsBufSize        int64
+	// SegmentStatisticsUpdateInterval int //  GOOSE TODO remove
 
 	// - timetick channel -
-	TimeTickChannelName string
+	TimeTickChannelName string // GOOSE TODO: set after init
 
 	// - complete flush channel -
-	CompleteFlushChannelName string
+	CompleteFlushChannelName string // GOOSE TODO: set after init
 
 	// - channel subname -
 	MsgChannelSubName string
@@ -82,7 +85,9 @@ func (p *ParamTable) Init() {
 	}
 
 	// === DataNode Internal Components Configs ===
-	p.initDataNodeID()
+	p.initNodeID()
+	p.initIP()
+	p.initPort()
 	p.initFlowGraphMaxQueueLength()
 	p.initFlowGraphMaxParallelism()
 	p.initFlushInsertBufferSize()
@@ -104,20 +109,20 @@ func (p *ParamTable) Init() {
 	p.initInsertPulsarBufSize()
 
 	// - dd channel -
-	p.initDDChannelNames()
-	p.initDDReceiveBufSize()
-	p.initDDPulsarBufSize()
+	// p.initDDChannelNames()
+	// p.initDDReceiveBufSize()
+	// p.initDDPulsarBufSize()
 
 	// - seg statistics channel -
-	p.initSegmentStatisticsChannelName()
-	p.initSegmentStatisticsBufSize()
-	p.initSegmentStatisticsUpdateInterval()
+	// p.initSegmentStatisticsChannelName()
+	// p.initSegmentStatisticsBufSize()
+	// p.initSegmentStatisticsUpdateInterval()
 
 	// - timetick channel -
-	p.initTimeTickChannelName()
+	// p.initTimeTickChannelName()
 
 	// - flush completed channel -
-	p.initCompleteFlushChannelName()
+	// p.initCompleteFlushChannelName()
 
 	// - channel subname -
 	p.initMsgChannelSubName()
@@ -141,7 +146,7 @@ func (p *ParamTable) Init() {
 }
 
 // ==== DataNode internal components configs ====
-func (p *ParamTable) initDataNodeID() {
+func (p *ParamTable) initNodeID() {
 	p.dataNodeIDList = p.DataNodeIDList()
 	dataNodeIDStr := os.Getenv("DATA_NODE_ID")
 	if dataNodeIDStr == "" {
@@ -156,7 +161,20 @@ func (p *ParamTable) initDataNodeID() {
 		panic(err)
 	}
 
-	p.DataNodeID = p.ParseInt64("_dataNodeID")
+	p.NodeID = p.ParseInt64("_dataNodeID")
+}
+
+func (p *ParamTable) initIP() {
+	addr, err := p.Load("dataNode.address")
+	if err != nil {
+		panic(err)
+	}
+	p.IP = addr
+}
+
+func (p *ParamTable) initPort() {
+	port := p.ParseInt64("dataNode.port")
+	p.Port = port
 }
 
 // ---- flowgraph configs ----
@@ -257,7 +275,7 @@ func (p *ParamTable) initInsertPulsarBufSize() {
 	p.InsertPulsarBufSize = p.ParseInt64("dataNode.msgStream.insert.pulsarBufSize")
 }
 
-// - dd channel -
+// - dd channel - GOOSE TODO: remove
 func (p *ParamTable) initDDChannelNames() {
 	prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition")
 	if err != nil {
@@ -276,31 +294,31 @@ func (p *ParamTable) initDDChannelNames() {
 	p.DDChannelNames = ret
 }
 
-func (p *ParamTable) initDDReceiveBufSize() {
-	revBufSize, err := p.Load("dataNode.msgStream.dataDefinition.recvBufSize")
-	if err != nil {
-		panic(err)
-	}
-	bufSize, err := strconv.Atoi(revBufSize)
-	if err != nil {
-		panic(err)
-	}
-	p.DDReceiveBufSize = int64(bufSize)
-}
-
-func (p *ParamTable) initDDPulsarBufSize() {
-	pulsarBufSize, err := p.Load("dataNode.msgStream.dataDefinition.pulsarBufSize")
-	if err != nil {
-		panic(err)
-	}
-	bufSize, err := strconv.Atoi(pulsarBufSize)
-	if err != nil {
-		panic(err)
-	}
-	p.DDPulsarBufSize = int64(bufSize)
-}
-
-// - seg statistics channel -
+// func (p *ParamTable) initDDReceiveBufSize() {
+//     revBufSize, err := p.Load("dataNode.msgStream.dataDefinition.recvBufSize")
+//     if err != nil {
+//         panic(err)
+//     }
+//     bufSize, err := strconv.Atoi(revBufSize)
+//     if err != nil {
+//         panic(err)
+//     }
+//     p.DDReceiveBufSize = int64(bufSize)
+// }
+
+// func (p *ParamTable) initDDPulsarBufSize() {
+//     pulsarBufSize, err := p.Load("dataNode.msgStream.dataDefinition.pulsarBufSize")
+//     if err != nil {
+//         panic(err)
+//     }
+//     bufSize, err := strconv.Atoi(pulsarBufSize)
+//     if err != nil {
+//         panic(err)
+//     }
+//     p.DDPulsarBufSize = int64(bufSize)
+// }
+
+// - seg statistics channel - GOOSE TODO: remove
 func (p *ParamTable) initSegmentStatisticsChannelName() {
 
 	channelName, err := p.Load("msgChannel.chanNamePrefix.dataNodeSegStatistics")
@@ -311,28 +329,26 @@ func (p *ParamTable) initSegmentStatisticsChannelName() {
 	p.SegmentStatisticsChannelName = channelName
 }
 
-func (p *ParamTable) initSegmentStatisticsBufSize() {
-	p.SegmentStatisticsBufSize = p.ParseInt64("dataNode.msgStream.segStatistics.recvBufSize")
-}
-
-func (p *ParamTable) initSegmentStatisticsUpdateInterval() {
-	p.SegmentStatisticsUpdateInterval = p.ParseInt("dataNode.msgStream.segStatistics.updateInterval")
-}
-
-// - flush completed channel -
+// func (p *ParamTable) initSegmentStatisticsBufSize() {
+//     p.SegmentStatisticsBufSize = p.ParseInt64("dataNode.msgStream.segStatistics.recvBufSize")
+// }
+//
+// func (p *ParamTable) initSegmentStatisticsUpdateInterval() {
+//     p.SegmentStatisticsUpdateInterval = p.ParseInt("dataNode.msgStream.segStatistics.updateInterval")
+// }
 
+// - flush completed channel - GOOSE TODO: remove
 func (p *ParamTable) initCompleteFlushChannelName() {
-	// GOOSE TODO
 	p.CompleteFlushChannelName = "flush-completed"
 }
 
-// - Timetick channel -
+// - Timetick channel - GOOSE TODO: remove
 func (p *ParamTable) initTimeTickChannelName() {
 	channels, err := p.Load("msgChannel.chanNamePrefix.dataNodeTimeTick")
 	if err != nil {
 		panic(err)
 	}
-	p.TimeTickChannelName = channels + "-" + strconv.FormatInt(p.DataNodeID, 10)
+	p.TimeTickChannelName = channels + "-" + strconv.FormatInt(p.NodeID, 10)
 }
 
 // - msg channel subname -
@@ -341,7 +357,7 @@ func (p *ParamTable) initMsgChannelSubName() {
 	if err != nil {
 		log.Panic(err)
 	}
-	p.MsgChannelSubName = name + "-" + strconv.FormatInt(p.DataNodeID, 10)
+	p.MsgChannelSubName = name + "-" + strconv.FormatInt(p.NodeID, 10)
 }
 
 func (p *ParamTable) initDefaultPartitionName() {
@@ -431,8 +447,8 @@ func (p *ParamTable) initMinioBucketName() {
 }
 
 func (p *ParamTable) sliceIndex() int {
-	dataNodeID := p.DataNodeID
-	dataNodeIDList := p.DataNodeIDList()
+	dataNodeID := p.NodeID
+	dataNodeIDList := p.dataNodeIDList
 	for i := 0; i < len(dataNodeIDList); i++ {
 		if dataNodeID == dataNodeIDList[i] {
 			return i
diff --git a/internal/datanode/param_table_test.go b/internal/datanode/param_table_test.go
index 9fb934268..6fa47e337 100644
--- a/internal/datanode/param_table_test.go
+++ b/internal/datanode/param_table_test.go
@@ -9,9 +9,9 @@ func TestParamTable_DataNode(t *testing.T) {
 
 	Params.Init()
 
-	t.Run("Test DataNodeID", func(t *testing.T) {
-		id := Params.DataNodeID
-		log.Println("DataNodeID:", id)
+	t.Run("Test NodeID", func(t *testing.T) {
+		id := Params.NodeID
+		log.Println("NodeID:", id)
 	})
 
 	t.Run("Test flowGraphMaxQueueLength", func(t *testing.T) {
@@ -79,31 +79,11 @@ func TestParamTable_DataNode(t *testing.T) {
 		log.Println("DDChannelNames:", names)
 	})
 
-	t.Run("Test DdMsgStreamReceiveBufSize", func(t *testing.T) {
-		bufSize := Params.DDReceiveBufSize
-		log.Println("DDReceiveBufSize:", bufSize)
-	})
-
-	t.Run("Test DdPulsarBufSize", func(t *testing.T) {
-		bufSize := Params.DDPulsarBufSize
-		log.Println("DDPulsarBufSize:", bufSize)
-	})
-
 	t.Run("Test SegmentStatisticsChannelName", func(t *testing.T) {
 		name := Params.SegmentStatisticsChannelName
 		log.Println("SegmentStatisticsChannelName:", name)
 	})
 
-	t.Run("Test SegmentStatisticsBufSize", func(t *testing.T) {
-		size := Params.SegmentStatisticsBufSize
-		log.Println("SegmentStatisticsBufSize:", size)
-	})
-
-	t.Run("Test SegmentStatisticsUpdateInterval", func(t *testing.T) {
-		interval := Params.SegmentStatisticsUpdateInterval
-		log.Println("SegmentStatisticsUpdateInterval:", interval)
-	})
-
 	t.Run("Test timeTickChannelName", func(t *testing.T) {
 		name := Params.TimeTickChannelName
 		log.Println("TimeTickChannelName:", name)
diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go
index 4ce264253..aea5cf076 100644
--- a/internal/dataservice/cluster.go
+++ b/internal/dataservice/cluster.go
@@ -84,7 +84,7 @@ func (c *dataNodeCluster) WatchInsertChannels(groups []channelGroup) {
 	defer c.mu.Unlock()
 	sort.Slice(c.nodes, func(i, j int) bool { return c.nodes[i].channelNum < c.nodes[j].channelNum })
 	for i, group := range groups {
-		err := c.nodes[i%len(c.nodes)].client.WatchDmChannels(&datapb.WatchDmChannelRequest{
+		_, err := c.nodes[i%len(c.nodes)].client.WatchDmChannels(&datapb.WatchDmChannelRequest{
 			Base: &commonpb.MsgBase{
 				MsgType:   commonpb.MsgType_kDescribeCollection,
 				MsgID:     -1, // todo
@@ -119,7 +119,7 @@ func (c *dataNodeCluster) FlushSegment(request *datapb.FlushSegRequest) {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	for _, node := range c.nodes {
-		if err := node.client.FlushSegments(request); err != nil {
+		if _, err := node.client.FlushSegments(request); err != nil {
 			log.Println(err.Error())
 			continue
 		}
diff --git a/internal/distributed/datanode/client.go b/internal/distributed/datanode/client.go
index 2444a932a..4957d6dbf 100644
--- a/internal/distributed/datanode/client.go
+++ b/internal/distributed/datanode/client.go
@@ -6,34 +6,55 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+
+	"google.golang.org/grpc"
 )
 
 type Client struct {
-	ctx context.Context
+	ctx     context.Context
+	grpc    datapb.DataNodeClient
+	conn    *grpc.ClientConn
+	address string
+}
 
-	// GOOSE TODO: add DataNodeClient
+func NewClient(address string) *Client {
+	return &Client{
+		address: address,
+	}
 }
 
 func (c *Client) Init() error {
-	panic("implement me")
+	ctx, cancel := context.WithTimeout(context.Background(), RPCConnectionTimeout)
+	defer cancel()
+	var err error
+	for i := 0; i < Retry; i++ {
+		if c.conn, err = grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock()); err == nil {
+			break
+		}
+	}
+	if err != nil {
+		return err
+	}
+	c.grpc = datapb.NewDataNodeClient(c.conn)
+	return nil
 }
 
 func (c *Client) Start() error {
-	panic("implement me")
+	return nil
 }
 
 func (c *Client) Stop() error {
-	panic("implement me")
+	return c.conn.Close()
 }
 
 func (c *Client) GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
-	panic("implement me")
+	return c.grpc.GetComponentStates(context.Background(), empty)
 }
 
-func (c *Client) WatchDmChannels(in *datapb.WatchDmChannelRequest) error {
-	panic("implement me")
+func (c *Client) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
+	return c.grpc.WatchDmChannels(context.Background(), in)
 }
 
-func (c *Client) FlushSegments(in *datapb.FlushSegRequest) error {
-	panic("implement me")
+func (c *Client) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) {
+	return c.grpc.FlushSegments(context.Background(), in)
 }
diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go
index 9a192a60f..e39a4b8fc 100644
--- a/internal/distributed/datanode/service.go
+++ b/internal/distributed/datanode/service.go
@@ -2,9 +2,13 @@ package datanode
 
 import (
 	"context"
+	"net"
+	"strconv"
 	"sync"
+	"time"
 
-	"github.com/zilliztech/milvus-distributed/internal/datanode"
+	dn "github.com/zilliztech/milvus-distributed/internal/datanode"
+	"github.com/zilliztech/milvus-distributed/internal/errors"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@@ -12,9 +16,13 @@ import (
 	"google.golang.org/grpc"
 )
 
+const (
+	RPCConnectionTimeout = 30 * time.Second
+	Retry                = 3
+)
+
 type Server struct {
-	node datanode.Interface
-	core datanode.DataNode
+	core *dn.DataNode
 
 	grpcServer *grpc.Server
 	grpcError  error
@@ -24,11 +32,43 @@ type Server struct {
 	cancel context.CancelFunc
 }
 
-func NewGrpcServer() (*Server, error) {
-	panic("implement me")
+func New(masterService dn.MasterServiceInterface, dataService dn.DataServiceInterface) (*Server, error) {
+	var s = &Server{}
+
+	s.ctx, s.cancel = context.WithCancel(context.Background())
+	s.core = dn.NewDataNode(s.ctx, 0, masterService, dataService)
+	s.grpcServer = grpc.NewServer()
+	datapb.RegisterDataNodeServer(s.grpcServer, s)
+	addr := dn.Params.IP + ":" + strconv.FormatInt(dn.Params.Port, 10)
+	lis, err := net.Listen("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+
+	go func() {
+		if err = s.grpcServer.Serve(lis); err != nil {
+			s.grpcErrMux.Lock()
+			defer s.grpcErrMux.Unlock()
+			s.grpcError = err
+		}
+	}()
+
+	s.grpcErrMux.Lock()
+	err = s.grpcError
+	s.grpcErrMux.Unlock()
+
+	if err != nil {
+		return nil, err
+	}
+	return s, nil
 }
 
 func (s *Server) Init() error {
+	err := s.core.Init()
+	if err != nil {
+		return errors.Errorf("Init failed: %v", err)
+	}
+
 	return s.core.Init()
 }
 
@@ -41,13 +81,17 @@ func (s *Server) Stop() error {
 }
 
 func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
-	return nil, nil
+	return s.core.GetComponentStates()
 }
 
-func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) error {
-	return s.core.WatchDmChannels(in)
+func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
+	return &commonpb.Status{
+		ErrorCode: commonpb.ErrorCode_SUCCESS,
+	}, s.core.WatchDmChannels(in)
 }
 
-func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) error {
-	return s.core.FlushSegments(in)
+func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) (*commonpb.Status, error) {
+	return &commonpb.Status{
+		ErrorCode: commonpb.ErrorCode_SUCCESS,
+	}, s.core.FlushSegments(in)
 }
diff --git a/internal/proto/data_service.proto b/internal/proto/data_service.proto
index de8238ccd..3df32d9ea 100644
--- a/internal/proto/data_service.proto
+++ b/internal/proto/data_service.proto
@@ -202,3 +202,9 @@ service DataService {
    rpc GetTimeTickChannel(common.Empty) returns(milvus.StringResponse) {}
    rpc GetStatisticsChannel(common.Empty) returns(milvus.StringResponse){}
 }
+
+service DataNode {
+  rpc GetComponentStates(common.Empty) returns (internal.ComponentStates) {}
+  rpc WatchDmChannels(WatchDmChannelRequest) returns (common.Status) {}
+  rpc FlushSegments(FlushSegRequest) returns(common.Status) {}
+}
diff --git a/internal/proto/datapb/data_service.pb.go b/internal/proto/datapb/data_service.pb.go
index 00ccde10f..5669f13f4 100644
--- a/internal/proto/datapb/data_service.pb.go
+++ b/internal/proto/datapb/data_service.pb.go
@@ -1548,101 +1548,104 @@ func init() {
 func init() { proto.RegisterFile("data_service.proto", fileDescriptor_3385cd32ad6cfe64) }
 
 var fileDescriptor_3385cd32ad6cfe64 = []byte{
-	// 1496 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0xdd, 0x6e, 0x1b, 0xc5,
-	0x1e, 0xcf, 0x7a, 0xfd, 0x11, 0xff, 0xed, 0x38, 0xee, 0x24, 0x4d, 0x52, 0xb7, 0xa7, 0x4d, 0xf6,
-	0xa8, 0x4d, 0x5a, 0x9d, 0x93, 0x1c, 0xb5, 0x3a, 0x14, 0x6e, 0x10, 0x4d, 0xdd, 0x46, 0x56, 0x9b,
-	0x28, 0x1a, 0x17, 0x2a, 0x7a, 0x63, 0xad, 0xbd, 0x13, 0x67, 0x60, 0x3f, 0xcc, 0xce, 0xb8, 0x49,
-	0x73, 0x03, 0x57, 0x20, 0x21, 0x24, 0xb8, 0x86, 0x6b, 0x5e, 0x00, 0x1e, 0x80, 0x57, 0xe0, 0x31,
-	0x78, 0x0a, 0x84, 0x76, 0x66, 0xf6, 0xcb, 0xde, 0xc4, 0xc6, 0x6d, 0xe9, 0x9d, 0xe7, 0xbf, 0xbf,
-	0xf9, 0x7f, 0x7f, 0x8d, 0x01, 0x59, 0x26, 0x37, 0x3b, 0x8c, 0xf8, 0x2f, 0x69, 0x8f, 0x6c, 0x0f,
-	0x7c, 0x8f, 0x7b, 0xe8, 0x92, 0x43, 0xed, 0x97, 0x43, 0x26, 0x4f, 0xdb, 0x01, 0xa0, 0x51, 0xed,
-	0x79, 0x8e, 0xe3, 0xb9, 0x92, 0xd4, 0xa8, 0x51, 0x97, 0x13, 0xdf, 0x35, 0x6d, 0x75, 0xae, 0x26,
-	0x2f, 0x18, 0x5f, 0xc2, 0x12, 0x26, 0x7d, 0xca, 0x38, 0xf1, 0x0f, 0x3c, 0x8b, 0x60, 0xf2, 0xc5,
-	0x90, 0x30, 0x8e, 0xfe, 0x07, 0xf9, 0xae, 0xc9, 0xc8, 0x9a, 0xb6, 0xae, 0x6d, 0x55, 0xee, 0x5e,
-	0xdb, 0x4e, 0x09, 0x51, 0xec, 0xf7, 0x59, 0x7f, 0xd7, 0x64, 0x04, 0x0b, 0x24, 0x7a, 0x0f, 0x4a,
-	0xa6, 0x65, 0xf9, 0x84, 0xb1, 0xb5, 0xdc, 0x05, 0x97, 0x1e, 0x48, 0x0c, 0x0e, 0xc1, 0xc6, 0xf7,
-	0x1a, 0x2c, 0xa7, 0x35, 0x60, 0x03, 0xcf, 0x65, 0x04, 0xed, 0x42, 0x85, 0xba, 0x94, 0x77, 0x06,
-	0xa6, 0x6f, 0x3a, 0x4c, 0x69, 0xb2, 0x91, 0x66, 0x1a, 0x99, 0xd6, 0x72, 0x29, 0x3f, 0x14, 0x40,
-	0x0c, 0x34, 0xfa, 0x8d, 0xee, 0x41, 0x91, 0x71, 0x93, 0x0f, 0x43, 0x9d, 0xae, 0x66, 0xea, 0xd4,
-	0x16, 0x10, 0xac, 0xa0, 0xc6, 0xef, 0x1a, 0x54, 0xdb, 0xa4, 0xdf, 0x6a, 0x86, 0xce, 0x58, 0x86,
-	0x42, 0xcf, 0x1b, 0xba, 0x5c, 0xe8, 0xb0, 0x80, 0xe5, 0x01, 0xad, 0x43, 0xa5, 0x77, 0x6c, 0xba,
-	0x2e, 0xb1, 0x0f, 0x4c, 0x87, 0x08, 0x01, 0x65, 0x9c, 0x24, 0x21, 0x03, 0xaa, 0x3d, 0xcf, 0xb6,
-	0x49, 0x8f, 0x53, 0xcf, 0x6d, 0x35, 0xd7, 0xf4, 0x75, 0x6d, 0x4b, 0xc7, 0x29, 0x5a, 0xc0, 0x65,
-	0x60, 0xfa, 0x9c, 0x2a, 0x48, 0x5e, 0x40, 0x92, 0x24, 0x74, 0x15, 0xca, 0xc1, 0x8d, 0x8e, 0x1b,
-	0x48, 0x29, 0x08, 0x29, 0xf3, 0x01, 0x41, 0x88, 0xb8, 0x09, 0xb5, 0x08, 0x2b, 0x11, 0x45, 0x81,
-	0x58, 0x88, 0xa8, 0x01, 0xcc, 0xf8, 0x41, 0x03, 0xf4, 0x80, 0x31, 0xda, 0x77, 0x53, 0x86, 0xad,
-	0x40, 0xd1, 0xf5, 0x2c, 0xd2, 0x6a, 0x0a, 0xcb, 0x74, 0xac, 0x4e, 0x81, 0xc8, 0x01, 0x21, 0x7e,
-	0xc7, 0xf7, 0xec, 0xd0, 0xb0, 0xf9, 0x80, 0x80, 0x3d, 0x9b, 0xa0, 0x47, 0xb0, 0xc0, 0x12, 0x4c,
-	0xd8, 0x9a, 0xbe, 0xae, 0x6f, 0x55, 0xee, 0xde, 0xd8, 0x1e, 0x4b, 0xc4, 0xed, 0xa4, 0x30, 0x9c,
-	0xbe, 0x65, 0xfc, 0x96, 0x83, 0x45, 0xf1, 0x5d, 0xea, 0xe5, 0x10, 0x57, 0x38, 0x5a, 0x80, 0x94,
-	0x3a, 0xf2, 0x30, 0x85, 0xa3, 0xa3, 0x00, 0xe9, 0xc9, 0x00, 0x8d, 0xba, 0x3f, 0x3f, 0xd9, 0xfd,
-	0x85, 0x71, 0xf7, 0xdf, 0x80, 0x0a, 0x39, 0x1d, 0x50, 0x9f, 0x74, 0x38, 0x55, 0xee, 0xcd, 0x63,
-	0x90, 0xa4, 0x67, 0xd4, 0x21, 0x89, 0x1c, 0x2b, 0x4d, 0x9d, 0x63, 0xe9, 0xa0, 0xce, 0x4f, 0x0c,
-	0x6a, 0x39, 0x2b, 0xa8, 0x3f, 0x6a, 0xb0, 0x94, 0x0a, 0xaa, 0x2a, 0x9c, 0x03, 0xa8, 0xb3, 0xb4,
-	0x63, 0x83, 0xea, 0x09, 0x62, 0x64, 0x9c, 0x17, 0xa3, 0x18, 0x8a, 0xc7, 0xee, 0xce, 0x56, 0x44,
-	0xa7, 0x50, 0x7d, 0x6c, 0x0f, 0xd9, 0xf1, 0xec, 0x0d, 0x05, 0x41, 0xde, 0xea, 0xb6, 0x9a, 0x42,
-	0xa8, 0x8e, 0xc5, 0xef, 0x69, 0x42, 0x6a, 0x7c, 0xa7, 0x01, 0x6a, 0x1f, 0x7b, 0x27, 0x6d, 0xd2,
-	0x17, 0x06, 0xcd, 0xac, 0xc0, 0xa8, 0xb0, 0xdc, 0xe4, 0xfc, 0xd1, 0xc7, 0xf2, 0xc7, 0xf8, 0x0c,
-	0x96, 0x52, 0xda, 0xa8, 0x20, 0x5d, 0x07, 0x60, 0x92, 0xd4, 0x6a, 0xca, 0xf0, 0xe8, 0x38, 0x41,
-	0x99, 0xcd, 0xe9, 0x47, 0xb0, 0xac, 0xe4, 0x04, 0x1f, 0x08, 0x9b, 0xdd, 0xf6, 0x6b, 0x50, 0x8e,
-	0x94, 0x51, 0x86, 0xc7, 0x04, 0xe3, 0xcf, 0x1c, 0x5c, 0x1e, 0x11, 0xa4, 0xcc, 0xfa, 0x3f, 0x14,
-	0x02, 0x5d, 0xa4, 0xa8, 0xda, 0x79, 0x4d, 0x21, 0xba, 0x88, 0x25, 0x3a, 0x28, 0xb2, 0x9e, 0x4f,
-	0x4c, 0xae, 0x8a, 0x2c, 0x27, 0x8b, 0x4c, 0x92, 0x44, 0x91, 0xdd, 0x80, 0x0a, 0x23, 0xa6, 0x4d,
-	0x2c, 0x09, 0xd0, 0x25, 0x40, 0x92, 0x04, 0x60, 0x03, 0xaa, 0x47, 0x41, 0xbe, 0x85, 0x88, 0xbc,
-	0x40, 0x54, 0x14, 0x4d, 0x40, 0x9e, 0xc0, 0x22, 0xe3, 0xa6, 0xcf, 0x3b, 0x03, 0x8f, 0x89, 0xe8,
-	0xb0, 0xb5, 0x42, 0x56, 0x59, 0x44, 0x43, 0x65, 0x9f, 0xf5, 0x0f, 0x15, 0x14, 0xd7, 0xc4, 0xd5,
-	0xf0, 0xc8, 0xd0, 0x1e, 0x2c, 0x10, 0xd7, 0x4a, 0xb0, 0x2a, 0x4e, 0xcd, 0xaa, 0x4a, 0x5c, 0x2b,
-	0x66, 0x34, 0x4b, 0xfb, 0x30, 0x28, 0xac, 0xb6, 0x5c, 0x46, 0x7c, 0xbe, 0x4b, 0x5d, 0xdb, 0xeb,
-	0x1f, 0x9a, 0xfc, 0xf8, 0x6d, 0xc5, 0xfa, 0x67, 0x0d, 0xae, 0x8c, 0xca, 0x8a, 0xe3, 0xdd, 0x80,
-	0xf9, 0x23, 0x4a, 0x6c, 0x2b, 0x4e, 0xe2, 0xe8, 0x8c, 0xee, 0x43, 0x61, 0x10, 0x80, 0xd7, 0x72,
-	0xc2, 0x35, 0xe7, 0x8d, 0xee, 0x36, 0xf7, 0xa9, 0xdb, 0x7f, 0x4a, 0x19, 0xc7, 0x12, 0x9f, 0x70,
-	0x89, 0x3e, 0xbd, 0x4b, 0xbe, 0xd2, 0x60, 0x59, 0xea, 0xf9, 0x50, 0x4e, 0x86, 0xb7, 0xdb, 0x79,
-	0x32, 0x66, 0xb9, 0xe1, 0xc0, 0xe5, 0xe7, 0x26, 0xef, 0x1d, 0x37, 0x9d, 0xd7, 0x56, 0x21, 0x10,
-	0x17, 0x0f, 0x38, 0xe9, 0xc2, 0x32, 0x4e, 0xd1, 0x8c, 0x9f, 0x34, 0x58, 0x14, 0x3d, 0xb6, 0x4d,
-	0xfa, 0xff, 0xb8, 0xb1, 0x23, 0x0d, 0x2c, 0x3f, 0xda, 0xc0, 0x8c, 0x3f, 0x72, 0x50, 0x51, 0xa5,
-	0xde, 0x72, 0x8f, 0xbc, 0x74, 0x96, 0x69, 0x23, 0x59, 0xf6, 0x66, 0x7a, 0x2d, 0xda, 0x84, 0x45,
-	0x2a, 0x52, 0xa0, 0xa3, 0x1c, 0x25, 0x15, 0x2b, 0xe3, 0x1a, 0x4d, 0x66, 0x86, 0x18, 0xbf, 0xde,
-	0x80, 0xb8, 0xb2, 0x55, 0x14, 0x44, 0xab, 0x98, 0x0f, 0x08, 0x59, 0xbd, 0xa6, 0x38, 0xb1, 0xd7,
-	0x94, 0xc6, 0x7b, 0xcd, 0x15, 0x98, 0x77, 0x87, 0x4e, 0xc7, 0xf7, 0x4e, 0x98, 0x18, 0xef, 0x3a,
-	0x2e, 0xb9, 0x43, 0x07, 0x7b, 0x27, 0x2c, 0xf8, 0xe4, 0x10, 0xa7, 0xc3, 0xe8, 0x99, 0x9c, 0xeb,
-	0x3a, 0x2e, 0x39, 0xc4, 0x69, 0xd3, 0xb3, 0x44, 0xf7, 0x84, 0xbf, 0xd3, 0x3d, 0x8d, 0x53, 0x00,
-	0x45, 0xde, 0x67, 0xfd, 0x19, 0x52, 0xe0, 0x7d, 0x28, 0xa9, 0x48, 0xa8, 0x61, 0x73, 0xfd, 0x7c,
-	0xc1, 0x41, 0x2c, 0x71, 0x08, 0x0f, 0x66, 0xed, 0xca, 0xc3, 0x28, 0x46, 0x81, 0x52, 0xaf, 0x31,
-	0x73, 0x56, 0xa1, 0x64, 0x75, 0xe5, 0xbe, 0x23, 0x77, 0xbc, 0xa2, 0xd5, 0x15, 0xfb, 0xd0, 0x26,
-	0x2c, 0xc6, 0x89, 0x20, 0x01, 0xba, 0x00, 0xd4, 0x62, 0xb2, 0xd8, 0x88, 0xbe, 0xd1, 0x60, 0x75,
-	0x4c, 0x1d, 0xd5, 0xa9, 0xee, 0x4b, 0xdf, 0x86, 0xab, 0xd0, 0x46, 0xa6, 0x42, 0x4f, 0xc8, 0xab,
-	0x4f, 0x4c, 0x7b, 0x48, 0x0e, 0x4d, 0xea, 0x4b, 0xef, 0xce, 0x38, 0x89, 0x7f, 0xd1, 0xe0, 0xf2,
-	0x61, 0x98, 0x99, 0xef, 0xda, 0x2f, 0x19, 0x0b, 0x65, 0x3e, 0x6b, 0xa1, 0xfc, 0x5a, 0x83, 0x95,
-	0x51, 0xa5, 0xdf, 0x89, 0xf7, 0xf6, 0xa1, 0xf6, 0x38, 0x98, 0x22, 0xa2, 0xbb, 0xed, 0x13, 0x6e,
-	0xa2, 0x35, 0x28, 0xa9, 0xb9, 0xa2, 0x7a, 0x47, 0x78, 0x0c, 0x8a, 0xb1, 0x2b, 0x06, 0x53, 0x27,
-	0x1e, 0x36, 0x65, 0x5c, 0xe9, 0xc6, 0xc3, 0xca, 0xf8, 0x56, 0x83, 0xba, 0x4a, 0xdf, 0x98, 0xe3,
-	0xc5, 0xfd, 0xe8, 0x5f, 0x00, 0x94, 0x75, 0x54, 0x45, 0x0b, 0xd5, 0xe7, 0x71, 0x99, 0xb2, 0xc7,
-	0x92, 0x80, 0x3e, 0x80, 0xa2, 0x90, 0x1f, 0x6e, 0x10, 0x1b, 0x19, 0x05, 0x93, 0xb6, 0x00, 0xab,
-	0x0b, 0xc6, 0xc7, 0x50, 0x6d, 0x36, 0x9f, 0xc6, 0x7a, 0x8c, 0x76, 0x3e, 0x2d, 0xa3, 0xf3, 0x4d,
-	0xb6, 0xf1, 0x8e, 0x27, 0xde, 0xac, 0x51, 0x6b, 0x40, 0x8b, 0x51, 0xf7, 0x3d, 0xf0, 0x5c, 0x52,
-	0x9f, 0x43, 0x4b, 0xe2, 0xb9, 0x25, 0x09, 0xfc, 0xd1, 0x29, 0x65, 0xbc, 0xae, 0x21, 0x04, 0x35,
-	0x45, 0xdc, 0xf3, 0xbd, 0x13, 0xea, 0xf6, 0xeb, 0x39, 0x74, 0x09, 0x16, 0x42, 0x4e, 0xa2, 0xe5,
-	0xd5, 0xf5, 0x04, 0x4c, 0x39, 0xa0, 0x9e, 0xbf, 0xfb, 0x6b, 0x19, 0x2a, 0x4d, 0x93, 0x9b, 0x6d,
-	0xf9, 0x6f, 0x04, 0x32, 0xa1, 0x9a, 0x7c, 0xc6, 0xa3, 0x5b, 0x19, 0x2e, 0xc9, 0xf8, 0xa7, 0xa1,
-	0xb1, 0x39, 0x11, 0x27, 0x53, 0xd0, 0x98, 0x43, 0x7b, 0x50, 0x10, 0xf2, 0x51, 0x56, 0x63, 0x4c,
-	0xbe, 0x36, 0x1a, 0x17, 0x65, 0x99, 0x31, 0x87, 0xba, 0xb0, 0x18, 0x3d, 0x9c, 0x54, 0xc0, 0x6f,
-	0x66, 0xb0, 0x1c, 0x7f, 0x31, 0x37, 0x6e, 0x4d, 0x82, 0x45, 0xca, 0x76, 0xa0, 0x9a, 0xd8, 0xfb,
-	0x59, 0xa6, 0x80, 0xf1, 0x67, 0x4a, 0xa6, 0x80, 0x8c, 0xf7, 0x83, 0x31, 0x87, 0xfa, 0x50, 0xdf,
-	0x23, 0x3c, 0xb5, 0x86, 0xa3, 0xcd, 0x09, 0x13, 0x23, 0xec, 0x42, 0x8d, 0xad, 0xc9, 0xc0, 0x48,
-	0x90, 0x0f, 0xcb, 0x7b, 0x84, 0x8f, 0xed, 0x80, 0xe8, 0x4e, 0x06, 0x8f, 0x73, 0xb6, 0xd2, 0xc6,
-	0x7f, 0xa6, 0xc0, 0x26, 0x65, 0x9a, 0x70, 0x29, 0x92, 0x19, 0x4d, 0xed, 0xcd, 0x73, 0x99, 0xa4,
-	0xf7, 0xad, 0xc6, 0xe4, 0x55, 0x53, 0x98, 0xb5, 0xba, 0x47, 0x78, 0x7a, 0x5c, 0x50, 0xc6, 0x69,
-	0x8f, 0xa1, 0xdb, 0x19, 0x82, 0xb2, 0xc7, 0x5c, 0xe3, 0xce, 0x34, 0xd0, 0xc8, 0x2c, 0x0f, 0x56,
-	0xf6, 0x08, 0x4f, 0xf5, 0x58, 0x25, 0x32, 0x2b, 0x20, 0x99, 0x03, 0xa4, 0x71, 0x7b, 0x0a, 0x64,
-	0x24, 0xf0, 0x05, 0x20, 0x61, 0xa4, 0x33, 0xf0, 0xdc, 0x38, 0x4d, 0x1a, 0x99, 0xe5, 0xf1, 0xc8,
-	0x19, 0xf0, 0x57, 0xa3, 0x09, 0x18, 0xf9, 0x6e, 0x84, 0x87, 0x31, 0x87, 0x9e, 0x0b, 0xde, 0xc1,
-	0xba, 0xf3, 0x8c, 0xf6, 0x3e, 0x57, 0x21, 0xb8, 0x90, 0xf7, 0xbf, 0xd3, 0xdf, 0xd4, 0x41, 0x46,
-	0x25, 0xa1, 0xf4, 0xa7, 0x22, 0xe1, 0x62, 0xe7, 0xbc, 0x39, 0xd6, 0xbb, 0x1f, 0xbd, 0xf8, 0xb0,
-	0x4f, 0xf9, 0xf1, 0xb0, 0x1b, 0xdc, 0xde, 0x39, 0xa3, 0xb6, 0x4d, 0xcf, 0x38, 0xe9, 0x1d, 0xef,
-	0xc8, 0x0b, 0xff, 0xb5, 0x28, 0xe3, 0x3e, 0xed, 0x0e, 0x39, 0xb1, 0x76, 0x42, 0xd3, 0x77, 0x04,
-	0xcb, 0x9d, 0xc0, 0xd1, 0x83, 0x6e, 0xb7, 0x28, 0x4e, 0xf7, 0xfe, 0x0a, 0x00, 0x00, 0xff, 0xff,
-	0x6d, 0x7e, 0x0b, 0xe1, 0x8c, 0x15, 0x00, 0x00,
+	// 1541 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0xdf, 0x6e, 0x1b, 0x45,
+	0x17, 0xcf, 0x7a, 0xfd, 0xf7, 0xd8, 0xb1, 0xdd, 0x49, 0x9a, 0xa4, 0x6e, 0xbf, 0x36, 0xd9, 0x4f,
+	0x6d, 0xd2, 0xea, 0xfb, 0x12, 0x94, 0x0a, 0x0a, 0x37, 0x88, 0xa6, 0x6e, 0x23, 0xab, 0x4d, 0x14,
+	0x8d, 0x0b, 0x15, 0xbd, 0xb1, 0xd6, 0xf6, 0xc4, 0x19, 0xf0, 0xee, 0x9a, 0x9d, 0x71, 0x93, 0xe6,
+	0x06, 0xae, 0x40, 0x42, 0x48, 0x70, 0xc5, 0x05, 0x5c, 0xf3, 0x02, 0xf0, 0x00, 0xbc, 0x02, 0x8f,
+	0xc1, 0x53, 0x20, 0xb4, 0x33, 0xb3, 0xff, 0xec, 0x75, 0x6c, 0xdc, 0x96, 0xde, 0x79, 0xce, 0xfe,
+	0xe6, 0x9c, 0x33, 0xe7, 0x9c, 0xf9, 0x9d, 0x33, 0x06, 0xd4, 0x35, 0xb9, 0xd9, 0x62, 0xc4, 0x7d,
+	0x41, 0x3b, 0x64, 0x7b, 0xe0, 0x3a, 0xdc, 0x41, 0x97, 0x2c, 0xda, 0x7f, 0x31, 0x64, 0x72, 0xb5,
+	0xed, 0x01, 0x6a, 0xa5, 0x8e, 0x63, 0x59, 0x8e, 0x2d, 0x45, 0xb5, 0x32, 0xb5, 0x39, 0x71, 0x6d,
+	0xb3, 0xaf, 0xd6, 0xa5, 0xe8, 0x06, 0xe3, 0x4b, 0x58, 0xc2, 0xa4, 0x47, 0x19, 0x27, 0xee, 0xa1,
+	0xd3, 0x25, 0x98, 0x7c, 0x31, 0x24, 0x8c, 0xa3, 0x77, 0x20, 0xdd, 0x36, 0x19, 0x59, 0xd3, 0xd6,
+	0xb5, 0xad, 0xe2, 0xee, 0xb5, 0xed, 0x98, 0x11, 0xa5, 0xfe, 0x80, 0xf5, 0xf6, 0x4c, 0x46, 0xb0,
+	0x40, 0xa2, 0xf7, 0x20, 0x67, 0x76, 0xbb, 0x2e, 0x61, 0x6c, 0x2d, 0x75, 0xc1, 0xa6, 0xfb, 0x12,
+	0x83, 0x7d, 0xb0, 0xf1, 0xbd, 0x06, 0xcb, 0x71, 0x0f, 0xd8, 0xc0, 0xb1, 0x19, 0x41, 0x7b, 0x50,
+	0xa4, 0x36, 0xe5, 0xad, 0x81, 0xe9, 0x9a, 0x16, 0x53, 0x9e, 0x6c, 0xc4, 0x95, 0x06, 0x47, 0x6b,
+	0xd8, 0x94, 0x1f, 0x09, 0x20, 0x06, 0x1a, 0xfc, 0x46, 0x77, 0x21, 0xcb, 0xb8, 0xc9, 0x87, 0xbe,
+	0x4f, 0x57, 0x13, 0x7d, 0x6a, 0x0a, 0x08, 0x56, 0x50, 0xe3, 0x0f, 0x0d, 0x4a, 0x4d, 0xd2, 0x6b,
+	0xd4, 0xfd, 0x60, 0x2c, 0x43, 0xa6, 0xe3, 0x0c, 0x6d, 0x2e, 0x7c, 0x58, 0xc4, 0x72, 0x81, 0xd6,
+	0xa1, 0xd8, 0x39, 0x31, 0x6d, 0x9b, 0xf4, 0x0f, 0x4d, 0x8b, 0x08, 0x03, 0x05, 0x1c, 0x15, 0x21,
+	0x03, 0x4a, 0x1d, 0xa7, 0xdf, 0x27, 0x1d, 0x4e, 0x1d, 0xbb, 0x51, 0x5f, 0xd3, 0xd7, 0xb5, 0x2d,
+	0x1d, 0xc7, 0x64, 0x9e, 0x96, 0x81, 0xe9, 0x72, 0xaa, 0x20, 0x69, 0x01, 0x89, 0x8a, 0xd0, 0x55,
+	0x28, 0x78, 0x3b, 0x5a, 0xb6, 0x67, 0x25, 0x23, 0xac, 0xe4, 0x3d, 0x81, 0x30, 0x71, 0x13, 0xca,
+	0x01, 0x56, 0x22, 0xb2, 0x02, 0xb1, 0x18, 0x48, 0x3d, 0x98, 0xf1, 0x83, 0x06, 0xe8, 0x3e, 0x63,
+	0xb4, 0x67, 0xc7, 0x0e, 0xb6, 0x02, 0x59, 0xdb, 0xe9, 0x92, 0x46, 0x5d, 0x9c, 0x4c, 0xc7, 0x6a,
+	0xe5, 0x99, 0x1c, 0x10, 0xe2, 0xb6, 0x5c, 0xa7, 0xef, 0x1f, 0x2c, 0xef, 0x09, 0xb0, 0xd3, 0x27,
+	0xe8, 0x21, 0x2c, 0xb2, 0x88, 0x12, 0xb6, 0xa6, 0xaf, 0xeb, 0x5b, 0xc5, 0xdd, 0x1b, 0xdb, 0x63,
+	0x85, 0xb8, 0x1d, 0x35, 0x86, 0xe3, 0xbb, 0x8c, 0xdf, 0x53, 0x50, 0x11, 0xdf, 0xa5, 0x5f, 0x16,
+	0xb1, 0x45, 0xa0, 0x05, 0x48, 0xb9, 0x23, 0x17, 0x33, 0x04, 0x3a, 0x48, 0x90, 0x1e, 0x4d, 0xd0,
+	0x68, 0xf8, 0xd3, 0xd3, 0xc3, 0x9f, 0x19, 0x0f, 0xff, 0x0d, 0x28, 0x92, 0xb3, 0x01, 0x75, 0x49,
+	0x8b, 0x53, 0x15, 0xde, 0x34, 0x06, 0x29, 0x7a, 0x4a, 0x2d, 0x12, 0xa9, 0xb1, 0xdc, 0xcc, 0x35,
+	0x16, 0x4f, 0x6a, 0x7e, 0x6a, 0x52, 0x0b, 0x49, 0x49, 0xfd, 0x49, 0x83, 0xa5, 0x58, 0x52, 0xd5,
+	0xc5, 0x39, 0x84, 0x2a, 0x8b, 0x07, 0xd6, 0xbb, 0x3d, 0x5e, 0x8e, 0x8c, 0x49, 0x39, 0x0a, 0xa1,
+	0x78, 0x6c, 0xef, 0x7c, 0x97, 0xe8, 0x0c, 0x4a, 0x8f, 0xfa, 0x43, 0x76, 0x32, 0x3f, 0xa1, 0x20,
+	0x48, 0x77, 0xdb, 0x8d, 0xba, 0x30, 0xaa, 0x63, 0xf1, 0x7b, 0x96, 0x94, 0x1a, 0xdf, 0x69, 0x80,
+	0x9a, 0x27, 0xce, 0x69, 0x93, 0xf4, 0xc4, 0x81, 0xe6, 0x76, 0x60, 0xd4, 0x58, 0x6a, 0x7a, 0xfd,
+	0xe8, 0x63, 0xf5, 0x63, 0x7c, 0x06, 0x4b, 0x31, 0x6f, 0x54, 0x92, 0xae, 0x03, 0x30, 0x29, 0x6a,
+	0xd4, 0x65, 0x7a, 0x74, 0x1c, 0x91, 0xcc, 0x17, 0xf4, 0x63, 0x58, 0x56, 0x76, 0xbc, 0x0f, 0x84,
+	0xcd, 0x7f, 0xf6, 0x6b, 0x50, 0x08, 0x9c, 0x51, 0x07, 0x0f, 0x05, 0xc6, 0x5f, 0x29, 0xb8, 0x3c,
+	0x62, 0x48, 0x1d, 0xeb, 0x5d, 0xc8, 0x78, 0xbe, 0x48, 0x53, 0xe5, 0x49, 0xa4, 0x10, 0x6c, 0xc4,
+	0x12, 0xed, 0x5d, 0xb2, 0x8e, 0x4b, 0x4c, 0xae, 0x2e, 0x59, 0x4a, 0x5e, 0x32, 0x29, 0x12, 0x97,
+	0xec, 0x06, 0x14, 0x19, 0x31, 0xfb, 0xa4, 0x2b, 0x01, 0xba, 0x04, 0x48, 0x91, 0x00, 0x6c, 0x40,
+	0xe9, 0xd8, 0xab, 0x37, 0x1f, 0x91, 0x16, 0x88, 0xa2, 0x92, 0x09, 0xc8, 0x63, 0xa8, 0x30, 0x6e,
+	0xba, 0xbc, 0x35, 0x70, 0x98, 0xc8, 0x0e, 0x5b, 0xcb, 0x24, 0x5d, 0x8b, 0xa0, 0xa9, 0x1c, 0xb0,
+	0xde, 0x91, 0x82, 0xe2, 0xb2, 0xd8, 0xea, 0x2f, 0x19, 0xda, 0x87, 0x45, 0x62, 0x77, 0x23, 0xaa,
+	0xb2, 0x33, 0xab, 0x2a, 0x11, 0xbb, 0x1b, 0x2a, 0x9a, 0x87, 0x3e, 0x0c, 0x0a, 0xab, 0x0d, 0x9b,
+	0x11, 0x97, 0xef, 0x51, 0xbb, 0xef, 0xf4, 0x8e, 0x4c, 0x7e, 0xf2, 0xa6, 0x72, 0xfd, 0x8b, 0x06,
+	0x57, 0x46, 0x6d, 0x85, 0xf9, 0xae, 0x41, 0xfe, 0x98, 0x92, 0x7e, 0x37, 0x2c, 0xe2, 0x60, 0x8d,
+	0xee, 0x41, 0x66, 0xe0, 0x81, 0xd7, 0x52, 0x22, 0x34, 0x93, 0x5a, 0x77, 0x93, 0xbb, 0xd4, 0xee,
+	0x3d, 0xa1, 0x8c, 0x63, 0x89, 0x8f, 0x84, 0x44, 0x9f, 0x3d, 0x24, 0x5f, 0x69, 0xb0, 0x2c, 0xfd,
+	0x7c, 0x20, 0x3b, 0xc3, 0x9b, 0x65, 0x9e, 0x84, 0x5e, 0x6e, 0x58, 0x70, 0xf9, 0x99, 0xc9, 0x3b,
+	0x27, 0x75, 0xeb, 0x95, 0x5d, 0xf0, 0xcc, 0x85, 0x0d, 0x4e, 0x86, 0xb0, 0x80, 0x63, 0x32, 0xe3,
+	0x67, 0x0d, 0x2a, 0x82, 0x63, 0x9b, 0xa4, 0xf7, 0xaf, 0x1f, 0x76, 0x84, 0xc0, 0xd2, 0xa3, 0x04,
+	0x66, 0xfc, 0x99, 0x82, 0xa2, 0xba, 0xea, 0x0d, 0xfb, 0xd8, 0x89, 0x57, 0x99, 0x36, 0x52, 0x65,
+	0xaf, 0x87, 0x6b, 0xd1, 0x26, 0x54, 0xa8, 0x28, 0x81, 0x96, 0x0a, 0x94, 0x74, 0xac, 0x80, 0xcb,
+	0x34, 0x5a, 0x19, 0xa2, 0xfd, 0x3a, 0x03, 0x62, 0x4b, 0xaa, 0xc8, 0x08, 0xaa, 0xc8, 0x7b, 0x82,
+	0x24, 0xae, 0xc9, 0x4e, 0xe5, 0x9a, 0xdc, 0x38, 0xd7, 0x5c, 0x81, 0xbc, 0x3d, 0xb4, 0x5a, 0xae,
+	0x73, 0xca, 0x44, 0x7b, 0xd7, 0x71, 0xce, 0x1e, 0x5a, 0xd8, 0x39, 0x65, 0xde, 0x27, 0x8b, 0x58,
+	0x2d, 0x46, 0xcf, 0x65, 0x5f, 0xd7, 0x71, 0xce, 0x22, 0x56, 0x93, 0x9e, 0x47, 0xd8, 0x13, 0xfe,
+	0x09, 0x7b, 0x1a, 0x67, 0x00, 0x4a, 0x7c, 0xc0, 0x7a, 0x73, 0x94, 0xc0, 0xfb, 0x90, 0x53, 0x99,
+	0x50, 0xcd, 0xe6, 0xfa, 0x64, 0xc3, 0x5e, 0x2e, 0xb1, 0x0f, 0xf7, 0x7a, 0xed, 0xca, 0x83, 0x20,
+	0x47, 0x9e, 0x53, 0xaf, 0xd0, 0x73, 0x56, 0x21, 0xd7, 0x6d, 0xcb, 0x79, 0x47, 0xce, 0x78, 0xd9,
+	0x6e, 0x5b, 0xcc, 0x43, 0x9b, 0x50, 0x09, 0x0b, 0x41, 0x02, 0x74, 0x01, 0x28, 0x87, 0x62, 0x31,
+	0x11, 0x7d, 0xa3, 0xc1, 0xea, 0x98, 0x3b, 0x8a, 0xa9, 0xee, 0xc9, 0xd8, 0xfa, 0xa3, 0xd0, 0x46,
+	0xa2, 0x43, 0x8f, 0xc9, 0xcb, 0x4f, 0xcc, 0xfe, 0x90, 0x1c, 0x99, 0xd4, 0x95, 0xd1, 0x9d, 0xb3,
+	0x13, 0xff, 0xaa, 0xc1, 0xe5, 0x23, 0xbf, 0x32, 0xdf, 0x76, 0x5c, 0x12, 0x06, 0xca, 0x74, 0xd2,
+	0x40, 0xf9, 0xb5, 0x06, 0x2b, 0xa3, 0x4e, 0xbf, 0x95, 0xe8, 0x1d, 0x40, 0xf9, 0x91, 0xd7, 0x45,
+	0x04, 0xbb, 0x1d, 0x10, 0x6e, 0xa2, 0x35, 0xc8, 0xa9, 0xbe, 0xa2, 0xb8, 0xc3, 0x5f, 0x7a, 0x97,
+	0xb1, 0x2d, 0x1a, 0x53, 0x2b, 0x6c, 0x36, 0x05, 0x5c, 0x6c, 0x87, 0xcd, 0xca, 0xf8, 0x56, 0x83,
+	0xaa, 0x2a, 0xdf, 0x50, 0xe3, 0xc5, 0x7c, 0xf4, 0x1f, 0x00, 0xca, 0x5a, 0xea, 0x46, 0x0b, 0xd7,
+	0xf3, 0xb8, 0x40, 0xd9, 0x23, 0x29, 0x40, 0x1f, 0x40, 0x56, 0xd8, 0xf7, 0x27, 0x88, 0x8d, 0x84,
+	0x0b, 0x13, 0x3f, 0x01, 0x56, 0x1b, 0x8c, 0x8f, 0xa1, 0x54, 0xaf, 0x3f, 0x09, 0xfd, 0x18, 0x65,
+	0x3e, 0x2d, 0x81, 0xf9, 0xa6, 0x9f, 0xf1, 0x8e, 0x23, 0xde, 0xac, 0x01, 0x35, 0xa0, 0x4a, 0xc0,
+	0xbe, 0x87, 0x8e, 0x4d, 0xaa, 0x0b, 0x68, 0x49, 0x3c, 0xb7, 0xa4, 0x80, 0x3f, 0x3c, 0xa3, 0x8c,
+	0x57, 0x35, 0x84, 0xa0, 0xac, 0x84, 0xfb, 0xae, 0x73, 0x4a, 0xed, 0x5e, 0x35, 0x85, 0x2e, 0xc1,
+	0xa2, 0xaf, 0x49, 0x50, 0x5e, 0x55, 0x8f, 0xc0, 0x54, 0x00, 0xaa, 0xe9, 0xdd, 0xdf, 0x0a, 0x50,
+	0xac, 0x9b, 0xdc, 0x6c, 0xca, 0x7f, 0x23, 0x90, 0x09, 0xa5, 0xe8, 0x33, 0x1e, 0xdd, 0x4a, 0x08,
+	0x49, 0xc2, 0x3f, 0x0d, 0xb5, 0xcd, 0xa9, 0x38, 0x59, 0x82, 0xc6, 0x02, 0xda, 0x87, 0x8c, 0xb0,
+	0x8f, 0x92, 0x88, 0x31, 0xfa, 0xda, 0xa8, 0x5d, 0x54, 0x65, 0xc6, 0x02, 0x6a, 0x43, 0x25, 0x78,
+	0x38, 0xa9, 0x84, 0xdf, 0x4c, 0x50, 0x39, 0xfe, 0x62, 0xae, 0xdd, 0x9a, 0x06, 0x0b, 0x9c, 0x6d,
+	0x41, 0x29, 0x32, 0xf7, 0xb3, 0x44, 0x03, 0xe3, 0xcf, 0x94, 0x44, 0x03, 0x09, 0xef, 0x07, 0x63,
+	0x01, 0xf5, 0xa0, 0xba, 0x4f, 0x78, 0x6c, 0x0c, 0x47, 0x9b, 0x53, 0x3a, 0x86, 0xcf, 0x42, 0xb5,
+	0xad, 0xe9, 0xc0, 0xc0, 0x90, 0x0b, 0xcb, 0xfb, 0x84, 0x8f, 0xcd, 0x80, 0xe8, 0x4e, 0x82, 0x8e,
+	0x09, 0x53, 0x69, 0xed, 0x7f, 0x33, 0x60, 0xa3, 0x36, 0x4d, 0xb8, 0x14, 0xd8, 0x0c, 0xba, 0xf6,
+	0xe6, 0x44, 0x25, 0xf1, 0x79, 0xab, 0x36, 0x7d, 0xd4, 0x14, 0xc7, 0x5a, 0xdd, 0x27, 0x3c, 0xde,
+	0x2e, 0x28, 0xe3, 0xb4, 0xc3, 0xd0, 0xed, 0x04, 0x43, 0xc9, 0x6d, 0xae, 0x76, 0x67, 0x16, 0x68,
+	0x70, 0x2c, 0x07, 0x56, 0xf6, 0x09, 0x8f, 0x71, 0xac, 0x32, 0x99, 0x94, 0x90, 0xc4, 0x06, 0x52,
+	0xbb, 0x3d, 0x03, 0x32, 0x30, 0xf8, 0x1c, 0x90, 0x38, 0xa4, 0x35, 0x70, 0xec, 0xb0, 0x4c, 0x6a,
+	0x89, 0xd7, 0xe3, 0xa1, 0x35, 0xe0, 0x2f, 0x47, 0x0b, 0x30, 0x88, 0xdd, 0x88, 0x0e, 0x63, 0x01,
+	0x3d, 0x13, 0xba, 0xbd, 0x71, 0xe7, 0x29, 0xed, 0x7c, 0xae, 0x52, 0x70, 0xa1, 0xee, 0xff, 0xc6,
+	0xbf, 0xa9, 0x85, 0xcc, 0x4a, 0xc4, 0xe9, 0x4f, 0x45, 0xc1, 0x85, 0xc1, 0x79, 0x7d, 0xaa, 0x77,
+	0x7f, 0x4c, 0x41, 0xde, 0x63, 0x2d, 0x41, 0x51, 0x6f, 0x32, 0x38, 0xcf, 0xa1, 0x12, 0x7f, 0x0b,
+	0x24, 0xa7, 0x38, 0xf1, 0xbd, 0x30, 0x8d, 0xbe, 0x30, 0x2c, 0xfa, 0x73, 0xbf, 0xe4, 0x16, 0x63,
+	0x12, 0x1f, 0x86, 0x2f, 0x83, 0x29, 0x3a, 0xf7, 0x3e, 0x7a, 0xfe, 0x61, 0x8f, 0xf2, 0x93, 0x61,
+	0xdb, 0xfb, 0xb2, 0x73, 0x4e, 0xfb, 0x7d, 0x7a, 0xce, 0x49, 0xe7, 0x64, 0x47, 0xee, 0xfa, 0x7f,
+	0x97, 0x32, 0xee, 0xd2, 0xf6, 0x90, 0x93, 0xee, 0x8e, 0x7f, 0xec, 0x1d, 0xa1, 0x6a, 0xc7, 0x33,
+	0x37, 0x68, 0xb7, 0xb3, 0x62, 0x75, 0xf7, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x78, 0xd6, 0xcd,
+	0x18, 0xa5, 0x16, 0x00, 0x00,
 }
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -2120,3 +2123,147 @@ var _DataService_serviceDesc = grpc.ServiceDesc{
 	Streams:  []grpc.StreamDesc{},
 	Metadata: "data_service.proto",
 }
+
+// DataNodeClient is the client API for DataNode service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type DataNodeClient interface {
+	GetComponentStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*internalpb2.ComponentStates, error)
+	WatchDmChannels(ctx context.Context, in *WatchDmChannelRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
+	FlushSegments(ctx context.Context, in *FlushSegRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
+}
+
+type dataNodeClient struct {
+	cc *grpc.ClientConn
+}
+
+func NewDataNodeClient(cc *grpc.ClientConn) DataNodeClient {
+	return &dataNodeClient{cc}
+}
+
+func (c *dataNodeClient) GetComponentStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*internalpb2.ComponentStates, error) {
+	out := new(internalpb2.ComponentStates)
+	err := c.cc.Invoke(ctx, "/milvus.proto.data.DataNode/GetComponentStates", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+func (c *dataNodeClient) WatchDmChannels(ctx context.Context, in *WatchDmChannelRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
+	out := new(commonpb.Status)
+	err := c.cc.Invoke(ctx, "/milvus.proto.data.DataNode/WatchDmChannels", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+func (c *dataNodeClient) FlushSegments(ctx context.Context, in *FlushSegRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
+	out := new(commonpb.Status)
+	err := c.cc.Invoke(ctx, "/milvus.proto.data.DataNode/FlushSegments", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// DataNodeServer is the server API for DataNode service.
+type DataNodeServer interface {
+	GetComponentStates(context.Context, *commonpb.Empty) (*internalpb2.ComponentStates, error)
+	WatchDmChannels(context.Context, *WatchDmChannelRequest) (*commonpb.Status, error)
+	FlushSegments(context.Context, *FlushSegRequest) (*commonpb.Status, error)
+}
+
+// UnimplementedDataNodeServer can be embedded to have forward compatible implementations.
+type UnimplementedDataNodeServer struct {
+}
+
+func (*UnimplementedDataNodeServer) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*internalpb2.ComponentStates, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method GetComponentStates not implemented")
+}
+func (*UnimplementedDataNodeServer) WatchDmChannels(ctx context.Context, req *WatchDmChannelRequest) (*commonpb.Status, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method WatchDmChannels not implemented")
+}
+func (*UnimplementedDataNodeServer) FlushSegments(ctx context.Context, req *FlushSegRequest) (*commonpb.Status, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method FlushSegments not implemented")
+}
+
+func RegisterDataNodeServer(s *grpc.Server, srv DataNodeServer) {
+	s.RegisterService(&_DataNode_serviceDesc, srv)
+}
+
+func _DataNode_GetComponentStates_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(commonpb.Empty)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(DataNodeServer).GetComponentStates(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/milvus.proto.data.DataNode/GetComponentStates",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(DataNodeServer).GetComponentStates(ctx, req.(*commonpb.Empty))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+func _DataNode_WatchDmChannels_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(WatchDmChannelRequest)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(DataNodeServer).WatchDmChannels(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/milvus.proto.data.DataNode/WatchDmChannels",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(DataNodeServer).WatchDmChannels(ctx, req.(*WatchDmChannelRequest))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+func _DataNode_FlushSegments_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(FlushSegRequest)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(DataNodeServer).FlushSegments(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/milvus.proto.data.DataNode/FlushSegments",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(DataNodeServer).FlushSegments(ctx, req.(*FlushSegRequest))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _DataNode_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "milvus.proto.data.DataNode",
+	HandlerType: (*DataNodeServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "GetComponentStates",
+			Handler:    _DataNode_GetComponentStates_Handler,
+		},
+		{
+			MethodName: "WatchDmChannels",
+			Handler:    _DataNode_WatchDmChannels_Handler,
+		},
+		{
+			MethodName: "FlushSegments",
+			Handler:    _DataNode_FlushSegments_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: "data_service.proto",
+}
-- 
GitLab