diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go
index a8ebfa6eebb7c9a712d3a56f02857a71debdfee9..aa7ae78ab39c5eb08077a0531a241061a2c864b1 100644
--- a/internal/datacoord/cluster.go
+++ b/internal/datacoord/cluster.go
@@ -14,59 +14,79 @@ package datacoord
 import (
 	"fmt"
 	"sync"
+	"time"
 
+	"github.com/golang/protobuf/proto"
+	grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
+	"github.com/milvus-io/milvus/internal/kv"
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/util/retry"
+	"github.com/milvus-io/milvus/internal/types"
 	"go.uber.org/zap"
 	"golang.org/x/net/context"
 )
 
-type cluster struct {
-	mu               sync.RWMutex
-	ctx              context.Context
-	dataManager      *clusterNodeManager
-	sessionManager   sessionManager
-	candidateManager *candidateManager
-	posProvider      positionProvider
+const clusterPrefix = "cluster-prefix/"
+const clusterBuffer = "cluster-buffer"
+const nodeEventChBufferSize = 1024
 
-	startupPolicy    clusterStartupPolicy
-	registerPolicy   dataNodeRegisterPolicy
-	unregisterPolicy dataNodeUnregisterPolicy
-	assignPolicy     channelAssignPolicy
-}
+const eventTimeout = 5 * time.Second
+
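+// EventType identifies the kind of cluster-level event processed by the Cluster event loop.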
+type EventType int
+
+const (
+	Register      EventType = 1
+	UnRegister    EventType = 2
+	WatchChannel  EventType = 3
+	FlushSegments EventType = 4
+)
 
-type clusterOption struct {
-	apply func(c *cluster)
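+// NodeEventType identifies the kind of request dispatched to a single DataNode.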
+type NodeEventType int
+
+const (
+	Watch NodeEventType = 0
+	Flush NodeEventType = 1
+)
+
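+// Event is a cluster-level event queued by the public Cluster methods and consumed by handleNodeEvent.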
+type Event struct {
+	Type EventType
+	Data interface{}
 }
 
-func withStartupPolicy(p clusterStartupPolicy) clusterOption {
-	return clusterOption{
-		apply: func(c *cluster) { c.startupPolicy = p },
-	}
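+// WatchChannelParams carries the arguments of a WatchChannel event.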
+type WatchChannelParams struct {
+	Channel      string
+	CollectionID UniqueID
 }
 
-func withRegisterPolicy(p dataNodeRegisterPolicy) clusterOption {
-	return clusterOption{
-		apply: func(c *cluster) { c.registerPolicy = p },
-	}
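+// Cluster tracks the DataNode cluster, persists its state through a TxnKV store and
+// serializes register/unregister/watch/flush operations via an event loop.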
+type Cluster struct {
+	ctx              context.Context
+	cancel           context.CancelFunc
+	mu               sync.Mutex
+	wg               sync.WaitGroup
+	nodes            ClusterStore
+	posProvider      positionProvider
+	chanBuffer       []*datapb.ChannelStatus // unwatched channels buffer
+	kv               kv.TxnKV
+	registerPolicy   dataNodeRegisterPolicy
+	unregisterPolicy dataNodeUnregisterPolicy
+	assignPolicy     channelAssignPolicy
+	eventCh          chan *Event
 }
 
-func withUnregistorPolicy(p dataNodeUnregisterPolicy) clusterOption {
-	return clusterOption{
-		apply: func(c *cluster) { c.unregisterPolicy = p },
-	}
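+// ClusterOption customizes a Cluster, e.g. by overriding one of the default policies.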
+type ClusterOption func(c *Cluster)
+
+func withRegisterPolicy(p dataNodeRegisterPolicy) ClusterOption {
+	return func(c *Cluster) { c.registerPolicy = p }
 }
 
-func withAssignPolicy(p channelAssignPolicy) clusterOption {
-	return clusterOption{
-		apply: func(c *cluster) { c.assignPolicy = p },
-	}
+func withUnregistorPolicy(p dataNodeUnregisterPolicy) ClusterOption {
+	return func(c *Cluster) { c.unregisterPolicy = p }
 }
 
-func defaultStartupPolicy() clusterStartupPolicy {
-	return newWatchRestartsStartupPolicy()
+func withAssignPolicy(p channelAssignPolicy) ClusterOption {
+	return func(c *Cluster) { c.assignPolicy = p }
 }
 
 func defaultRegisterPolicy() dataNodeRegisterPolicy {
@@ -81,329 +101,313 @@ func defaultAssignPolicy() channelAssignPolicy {
 	return newBalancedAssignPolicy()
 }
 
-func newCluster(ctx context.Context, dataManager *clusterNodeManager,
-	sessionManager sessionManager, posProvider positionProvider,
-	opts ...clusterOption) *cluster {
-	c := &cluster{
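+// NewCluster creates a Cluster backed by the given TxnKV store and restores any previously persisted state.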
+func NewCluster(ctx context.Context, kv kv.TxnKV, store ClusterStore,
+	posProvider positionProvider, opts ...ClusterOption) (*Cluster, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	c := &Cluster{
 		ctx:              ctx,
-		sessionManager:   sessionManager,
-		dataManager:      dataManager,
+		cancel:           cancel,
+		kv:               kv,
+		nodes:            store,
 		posProvider:      posProvider,
-		startupPolicy:    defaultStartupPolicy(),
+		chanBuffer:       []*datapb.ChannelStatus{},
 		registerPolicy:   defaultRegisterPolicy(),
 		unregisterPolicy: defaultUnregisterPolicy(),
 		assignPolicy:     defaultAssignPolicy(),
+		eventCh:          make(chan *Event, nodeEventChBufferSize),
 	}
 
-	c.candidateManager = newCandidateManager(20, c.validateDataNode, c.enableDataNode)
-
 	for _, opt := range opts {
-		opt.apply(c)
+		opt(c)
 	}
 
-	return c
+	if err := c.loadFromKv(); err != nil {
+		return nil, err
+	}
+	return c, nil
 }
 
-// startup applies statup policy
-func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
-	/*deltaChange := c.dataManager.updateCluster(dataNodes)
-	nodes, chanBuffer := c.dataManager.getDataNodes(false)
-	var rets []*datapb.DataNodeInfo
-	var err error
-	rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
-	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets, err = c.watch(rets)
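+// loadFromKv restores the node infos and the unwatched-channel buffer persisted under
+// clusterPrefix/clusterBuffer and starts the per-node event loops.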
+func (c *Cluster) loadFromKv() error {
+	_, values, err := c.kv.LoadWithPrefix(clusterPrefix)
 	if err != nil {
-		log.Warn("Failed to watch all the status change", zap.Error(err))
-		//does not trigger new another refresh, pending evt will do
+		return err
 	}
-	c.dataManager.updateDataNodes(rets, chanBuffer)
-	return nil*/
-	return c.refresh(dataNodes)
-}
 
-// refresh rough refresh datanode status after event received
-func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	deltaChange := c.dataManager.updateCluster(dataNodes)
-	log.Debug("refresh delta", zap.Any("new", deltaChange.newNodes),
-		zap.Any("restart", deltaChange.restarts),
-		zap.Any("offline", deltaChange.offlines))
-
-	// cannot use startup policy directly separate into three parts:
-	// 1. add new nodes into candidates list
-	for _, dn := range dataNodes {
-		for _, newAddr := range deltaChange.newNodes {
-			if dn.Address == newAddr {
-				c.candidateManager.add(dn)
-			}
+	for _, v := range values {
+		info := &datapb.DataNodeInfo{}
+		if err := proto.UnmarshalText(v, info); err != nil {
+			return err
 		}
-	}
 
-	// 2. restart nodes, disable node&session, execute unregister policy and put node into candidate list
-	restartNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.restarts))
-	for _, node := range deltaChange.restarts {
-		info, ok := c.dataManager.dataNodes[node]
-		if ok {
-			restartNodes = append(restartNodes, info.info)
-			c.dataManager.unregister(node) // remove from cluster
-			c.sessionManager.releaseSession(node)
-		} else {
-			log.Warn("Restart node not in node manager", zap.String("restart_node", node))
-		}
+		node := NewNodeInfo(c.ctx, info)
+		c.nodes.SetNode(info.GetVersion(), node)
+		go c.handleEvent(node)
 	}
-	if len(restartNodes) > 0 {
-		for _, node := range restartNodes {
-			cluster, buffer := c.dataManager.getDataNodes(true)
-			if len(cluster) > 0 {
-				ret := c.unregisterPolicy.apply(cluster, node)
-				c.updateNodeWatch(ret, buffer)
-			} else {
-				// no online node, put all watched channels to buffer
-				buffer = append(buffer, node.Channels...)
-				c.updateNodeWatch([]*datapb.DataNodeInfo{}, buffer)
-			}
-			node.Channels = node.Channels[:0] // clear channels
-			c.candidateManager.add(node)      // put node into candidate list
+	dn, _ := c.kv.Load(clusterBuffer)
+	// TODO: check the load error and distinguish a missing key from other failures
+	if dn != "" {
+		info := &datapb.DataNodeInfo{}
+		if err := proto.UnmarshalText(dn, info); err != nil {
+			return err
 		}
+		c.chanBuffer = info.Channels
 	}
 
-	// 3. offline do unregister
-	unregisterNodes := make([]*datapb.DataNodeInfo, 0, len(deltaChange.offlines)) // possible nodes info to unregister
-	for _, node := range deltaChange.offlines {
-		c.sessionManager.releaseSession(node)
-		info := c.dataManager.unregister(node)
-		if info != nil {
-			unregisterNodes = append(unregisterNodes, info)
-		}
+	return nil
+}
+
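+// Flush queues a FlushSegments event for the given segments.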
+func (c *Cluster) Flush(segments []*datapb.SegmentInfo) {
+	c.eventCh <- &Event{
+		Type: FlushSegments,
+		Data: segments,
 	}
-	for _, node := range unregisterNodes {
-		cluster, buffer := c.dataManager.getDataNodes(true)
-		if len(cluster) > 0 { // cluster has online nodes, migrate channels
-			ret := c.unregisterPolicy.apply(cluster, node)
-			c.updateNodeWatch(ret, buffer)
-		} else {
-			// no online node, put all watched channels to buffer
-			buffer = append(buffer, node.Channels...)
-			c.updateNodeWatch([]*datapb.DataNodeInfo{}, buffer)
-		}
+}
+
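+// Register queues a Register event for the given node.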
+func (c *Cluster) Register(node *NodeInfo) {
+	c.eventCh <- &Event{
+		Type: Register,
+		Data: node,
 	}
+}
 
-	return nil
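+// UnRegister queues an UnRegister event for the given node.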
+func (c *Cluster) UnRegister(node *NodeInfo) {
+	c.eventCh <- &Event{
+		Type: UnRegister,
+		Data: node,
+	}
 }
 
-// updateNodeWatch save nodes uncomplete status and try to watch channels which is unwatched, save the execution result
-func (c *cluster) updateNodeWatch(nodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error {
-	c.dataManager.updateDataNodes(nodes, buffer)
-	rets, err := c.watch(nodes)
-	if err != nil {
-		log.Warn("Failed to watch all the status change", zap.Error(err)) //
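+// Watch queues a WatchChannel event for the given channel and collection.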
+func (c *Cluster) Watch(channel string, collectionID UniqueID) {
+	c.eventCh <- &Event{
+		Type: WatchChannel,
+		Data: &WatchChannelParams{
+			Channel:      channel,
+			CollectionID: collectionID,
+		},
 	}
-	c.dataManager.updateDataNodes(rets, buffer)
-	return err
 }
 
-// paraRun parallel run, with max Parallel limit
-func paraRun(works []func(), maxRunner int) {
-	wg := sync.WaitGroup{}
-	ch := make(chan func())
-	wg.Add(len(works))
-	if maxRunner > len(works) {
-		maxRunner = len(works)
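+// handleNodeEvent is the cluster-level event loop. It serializes Register, UnRegister,
+// WatchChannel and FlushSegments events until the cluster context is canceled.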
+func (c *Cluster) handleNodeEvent() {
+	defer c.wg.Done()
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case e := <-c.eventCh:
+			switch e.Type {
+			case Register:
+				c.handleRegister(e.Data.(*NodeInfo))
+			case UnRegister:
+				c.handleUnRegister(e.Data.(*NodeInfo))
+			case WatchChannel:
+				params := e.Data.(*WatchChannelParams)
+				c.handleWatchChannel(params.Channel, params.CollectionID)
+			case FlushSegments:
+				c.handleFlush(e.Data.([]*datapb.SegmentInfo))
+			default:
+				log.Warn("Unknown node event type")
+			}
+		}
 	}
+}
 
-	for i := 0; i < maxRunner; i++ {
-		go func() {
-			work, ok := <-ch
-			if !ok {
-				return
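+// handleEvent is the per-node event loop. It lazily creates the DataNode client and
+// forwards Watch and Flush requests to it.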
+func (c *Cluster) handleEvent(node *NodeInfo) {
+	ctx := node.ctx
+	ch := node.GetEventChannel()
+	var cli types.DataNode
+	var err error
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case event := <-ch:
+			cli = node.GetClient()
+			if cli == nil {
+				cli, err = createClient(ctx, node.info.GetAddress())
+				if err != nil {
+					log.Warn("failed to create client", zap.Any("node", node), zap.Error(err))
+					continue
+				}
+				c.mu.Lock()
+				c.nodes.SetClient(node.info.GetVersion(), cli)
+				c.mu.Unlock()
 			}
-			work()
-			wg.Done()
-		}()
+			switch event.Type {
+			case Watch:
+				req, ok := event.Req.(*datapb.WatchDmChannelsRequest)
+				if !ok {
+					log.Warn("request type is not Watch")
+					continue
+				}
+				tCtx, cancel := context.WithTimeout(ctx, eventTimeout)
+				resp, err := cli.WatchDmChannels(tCtx, req)
+				cancel()
+				if err = VerifyResponse(resp, err); err != nil {
+					log.Warn("Failed to watch dm channels", zap.String("addr", node.info.GetAddress()), zap.Error(err))
+				}
+				c.mu.Lock()
+				c.nodes.SetWatched(node.info.GetVersion(), parseChannelsFromReq(req))
+				node = c.nodes.GetNode(node.info.GetVersion())
+				c.mu.Unlock()
+				if err = c.saveNode(node); err != nil {
+					log.Warn("failed to save node info", zap.Any("node", node), zap.Error(err))
+					continue
+				}
+			case Flush:
+				req, ok := event.Req.(*datapb.FlushSegmentsRequest)
+				if !ok {
+					log.Warn("request type is not Flush")
+					continue
+				}
+				tCtx, cancel := context.WithTimeout(ctx, eventTimeout)
+				resp, err := cli.FlushSegments(tCtx, req)
+				cancel()
+				if err = VerifyResponse(resp, err); err != nil {
+					log.Warn("Failed to flush segments", zap.String("addr", node.info.GetAddress()), zap.Error(err))
+				}
+			default:
+				log.Warn("Wrong event type", zap.Any("type", event.Type))
+			}
+		}
 	}
-	for _, work := range works {
-		ch <- work
+}
+
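+// parseChannelsFromReq extracts the channel names from a WatchDmChannelsRequest.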
+func parseChannelsFromReq(req *datapb.WatchDmChannelsRequest) []string {
+	channels := make([]string, 0, len(req.GetVchannels()))
+	for _, vc := range req.GetVchannels() {
+		channels = append(channels, vc.ChannelName)
 	}
-	wg.Wait()
-	close(ch)
+	return channels
 }
 
-func (c *cluster) validateDataNode(dn *datapb.DataNodeInfo) error {
-	log.Warn("[CM] start validate candidate", zap.String("addr", dn.Address))
-	_, err := c.sessionManager.getOrCreateSession(dn.Address) // this might take time if address went offline
-	log.Warn("[CM] candidate validation finished", zap.String("addr", dn.Address), zap.Error(err))
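+// createClient dials the DataNode at the given address and initializes and starts the client.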
+func createClient(ctx context.Context, addr string) (types.DataNode, error) {
+	cli, err := grpcdatanodeclient.NewClient(ctx, addr)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	return nil
+	if err := cli.Init(); err != nil {
+		return nil, err
+	}
+	if err := cli.Start(); err != nil {
+		return nil, err
+	}
+	return cli, nil
 }
 
-func (c *cluster) enableDataNode(dn *datapb.DataNodeInfo) error {
-	log.Warn("[CM] enabling candidate", zap.String("addr", dn.Address))
-	c.register(dn)
-	return nil
+// Startup starts the cluster event loop and reconciles the restored cluster with the currently online nodes
+func (c *Cluster) Startup(nodes []*NodeInfo) {
+	c.wg.Add(1)
+	go c.handleNodeEvent()
+	// Before startup, all nodes recorded last time have been restored from the kv store.
+	// Find newly created/offline/restarted nodes and adjust the channel allocation accordingly.
+	addNodes, deleteNodes := c.updateCluster(nodes)
+	for _, node := range addNodes {
+		c.Register(node)
+	}
+
+	for _, node := range deleteNodes {
+		c.UnRegister(node)
+	}
 }
 
-func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, error) {
-	works := make([]func(), 0, len(nodes))
-	mut := sync.Mutex{}
-	errs := make([]error, 0, len(nodes))
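+// updateCluster compares the currently online nodes with the restored cluster view and
+// returns the newly added nodes and the nodes that have gone offline.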
+func (c *Cluster) updateCluster(nodes []*NodeInfo) (newNodes []*NodeInfo, offlines []*NodeInfo) {
+	var onCnt, offCnt float64
+	currentOnline := make(map[int64]struct{})
 	for _, n := range nodes {
-		works = append(works, func() {
-			logMsg := fmt.Sprintf("Begin to watch channels for node %s, channels:", n.Address)
-			uncompletes := make([]vchannel, 0, len(n.Channels))
-			for _, ch := range n.Channels {
-				if ch.State == datapb.ChannelWatchState_Uncomplete {
-					if len(uncompletes) == 0 {
-						logMsg += ch.Name
-					} else {
-						logMsg += "," + ch.Name
-					}
-					uncompletes = append(uncompletes, vchannel{
-						CollectionID: ch.CollectionID,
-						DmlChannel:   ch.Name,
-					})
-				}
-			}
-
-			if len(uncompletes) == 0 {
-				return // all set, just return
-			}
-			log.Debug(logMsg)
-
-			vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
-			if err != nil {
-				log.Warn("get vchannel position failed", zap.Error(err))
-				mut.Lock()
-				errs = append(errs, err)
-				mut.Unlock()
-				return
-			}
-			cli, err := c.sessionManager.getSession(n.Address) //fail fast, don't create session
-			if err != nil {
-				log.Warn("get session failed", zap.String("addr", n.Address), zap.Error(err))
-				mut.Lock()
-				errs = append(errs, err)
-				mut.Unlock()
-				return
-			}
-			req := &datapb.WatchDmChannelsRequest{
-				Base: &commonpb.MsgBase{
-					SourceID: Params.NodeID,
-				},
-				Vchannels: vchanInfos,
-			}
-			resp, err := cli.WatchDmChannels(c.ctx, req)
-			if err != nil {
-				log.Warn("watch dm channel failed", zap.String("addr", n.Address), zap.Error(err))
-				mut.Lock()
-				errs = append(errs, err)
-				mut.Unlock()
-				return
-			}
-			if resp.ErrorCode != commonpb.ErrorCode_Success {
-				log.Warn("watch channels failed", zap.String("address", n.Address), zap.Error(err))
-				mut.Lock()
-				errs = append(errs, fmt.Errorf("watch fail with stat %v, msg:%s", resp.ErrorCode, resp.Reason))
-				mut.Unlock()
-				return
-			}
-			for _, ch := range n.Channels {
-				if ch.State == datapb.ChannelWatchState_Uncomplete {
-					ch.State = datapb.ChannelWatchState_Complete
-				}
-			}
-		})
+		currentOnline[n.info.GetVersion()] = struct{}{}
+		node := c.nodes.GetNode(n.info.GetVersion())
+		if node == nil {
+			newNodes = append(newNodes, n)
+		}
+		onCnt++
 	}
-	paraRun(works, 20)
-	if len(errs) > 0 {
-		return nodes, retry.ErrorList(errs)
+
+	currNodes := c.nodes.GetNodes()
+	for _, node := range currNodes {
+		_, has := currentOnline[node.info.GetVersion()]
+		if !has {
+			offlines = append(offlines, node)
+			offCnt++
+		}
 	}
-	return nodes, nil
+	metrics.DataCoordDataNodeList.WithLabelValues("online").Set(onCnt)
+	metrics.DataCoordDataNodeList.WithLabelValues("offline").Set(offCnt)
+	return
 }
 
-func (c *cluster) register(n *datapb.DataNodeInfo) {
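+// handleRegister applies the register policy to the new node, persists the result and
+// triggers watching of any newly assigned channels.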
+func (c *Cluster) handleRegister(n *NodeInfo) {
 	c.mu.Lock()
-	defer c.mu.Unlock()
-	c.dataManager.register(n)
-	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
-	var rets []*datapb.DataNodeInfo
-	var err error
-	log.Debug("before register policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
-	rets, chanBuffer = c.registerPolicy.apply(cNodes, n, chanBuffer)
-	log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
-	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets, err = c.watch(rets)
-	if err != nil {
-		log.Warn("Failed to watch all the status change", zap.Error(err))
-		//does not trigger new another refresh, pending evt will do
+	cNodes := c.nodes.GetNodes()
+	var nodes []*NodeInfo
+	log.Debug("before register policy applied", zap.Any("n.Channels", n.info.GetChannels()), zap.Any("buffer", c.chanBuffer))
+	nodes, c.chanBuffer = c.registerPolicy(cNodes, n, c.chanBuffer)
+	log.Debug("after register policy applied", zap.Any("ret", nodes), zap.Any("buffer", c.chanBuffer))
+	go c.handleEvent(n)
+	c.txnSaveNodesAndBuffer(nodes, c.chanBuffer)
+	for _, node := range nodes {
+		c.nodes.SetNode(node.info.GetVersion(), node)
+	}
+	c.mu.Unlock()
+	for _, node := range nodes {
+		c.watch(node)
 	}
-	c.dataManager.updateDataNodes(rets, chanBuffer)
 }
 
-func (c *cluster) unregister(n *datapb.DataNodeInfo) {
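+// handleUnRegister disposes the node and re-assigns its channels through the unregister
+// policy, or buffers them when no other node is online.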
+func (c *Cluster) handleUnRegister(n *NodeInfo) {
 	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	c.sessionManager.releaseSession(n.Address)
-	oldNode := c.dataManager.unregister(n.Address)
-	if oldNode != nil {
-		n = oldNode
+	node := c.nodes.GetNode(n.info.GetVersion())
+	if node == nil {
+		c.mu.Unlock()
+		return
 	}
-	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
-	log.Debug("before unregister policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
-	var rets []*datapb.DataNodeInfo
-	var err error
+	node.Dispose()
+	c.nodes.DeleteNode(n.info.GetVersion())
+	cNodes := c.nodes.GetNodes()
+	log.Debug("before unregister policy applied", zap.Any("node.Channels", node.info.GetChannels()), zap.Any("buffer", c.chanBuffer))
+	var rets []*NodeInfo
 	if len(cNodes) == 0 {
-		for _, chStat := range n.Channels {
+		for _, chStat := range node.info.GetChannels() {
 			chStat.State = datapb.ChannelWatchState_Uncomplete
-			chanBuffer = append(chanBuffer, chStat)
+			c.chanBuffer = append(c.chanBuffer, chStat)
 		}
 	} else {
-		rets = c.unregisterPolicy.apply(cNodes, n)
+		rets = c.unregisterPolicy(cNodes, n)
 	}
-	log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
-	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets, err = c.watch(rets)
-	if err != nil {
-		log.Warn("Failed to watch all the status change", zap.Error(err))
-		//does not trigger new another refresh, pending evt will do
+	c.txnSaveNodesAndBuffer(rets, c.chanBuffer)
+	for _, node := range rets {
+		c.nodes.SetNode(node.info.GetVersion(), node)
+	}
+	c.mu.Unlock()
+	for _, node := range rets {
+		c.watch(node)
 	}
-	c.dataManager.updateDataNodes(rets, chanBuffer)
 }
 
-func (c *cluster) watchIfNeeded(channel string, collectionID UniqueID) {
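+// handleWatchChannel assigns the channel to a node using the assign policy, or buffers
+// it when no node is available.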
+func (c *Cluster) handleWatchChannel(channel string, collectionID UniqueID) {
 	c.mu.Lock()
-	defer c.mu.Unlock()
-	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
-	var rets []*datapb.DataNodeInfo
-	var err error
+	cNodes := c.nodes.GetNodes()
+	var rets []*NodeInfo
 	if len(cNodes) == 0 { // no nodes to assign, put into buffer
-		chanBuffer = append(chanBuffer, &datapb.ChannelStatus{
+		c.chanBuffer = append(c.chanBuffer, &datapb.ChannelStatus{
 			Name:         channel,
 			CollectionID: collectionID,
 			State:        datapb.ChannelWatchState_Uncomplete,
 		})
 	} else {
-		rets = c.assignPolicy.apply(cNodes, channel, collectionID)
+		rets = c.assignPolicy(cNodes, channel, collectionID)
 	}
-	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets, err = c.watch(rets)
-	if err != nil {
-		log.Warn("Failed to watch all the status change", zap.Error(err))
-		//does not trigger new another refresh, pending evt will do
+	c.txnSaveNodesAndBuffer(rets, c.chanBuffer)
+	for _, node := range rets {
+		c.nodes.SetNode(node.info.GetVersion(), node)
+	}
+	c.mu.Unlock()
+	for _, node := range rets {
+		c.watch(node)
 	}
-	c.dataManager.updateDataNodes(rets, chanBuffer)
 }
 
-func (c *cluster) flush(segments []*datapb.SegmentInfo) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
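+// handleFlush groups the segments by insert channel and collection and dispatches Flush
+// events to the nodes watching those channels.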
+func (c *Cluster) handleFlush(segments []*datapb.SegmentInfo) {
 	m := make(map[string]map[UniqueID][]UniqueID) // channel-> map[collectionID]segmentIDs
-
 	for _, seg := range segments {
 		if _, ok := m[seg.InsertChannel]; !ok {
 			m[seg.InsertChannel] = make(map[UniqueID][]UniqueID)
@@ -412,12 +416,14 @@ func (c *cluster) flush(segments []*datapb.SegmentInfo) {
 		m[seg.InsertChannel][seg.CollectionID] = append(m[seg.InsertChannel][seg.CollectionID], seg.ID)
 	}
 
-	dataNodes, _ := c.dataManager.getDataNodes(true)
+	c.mu.Lock()
+	dataNodes := c.nodes.GetNodes()
+	c.mu.Unlock()
 
-	channel2Node := make(map[string]string)
+	channel2Node := make(map[string]*NodeInfo)
 	for _, node := range dataNodes {
-		for _, chstatus := range node.Channels {
-			channel2Node[chstatus.Name] = node.Address
+		for _, chstatus := range node.info.GetChannels() {
+			channel2Node[chstatus.Name] = node
 		}
 	}
 
@@ -426,11 +432,6 @@ func (c *cluster) flush(segments []*datapb.SegmentInfo) {
 		if !ok {
 			continue
 		}
-		cli, err := c.sessionManager.getSession(node)
-		if err != nil {
-			log.Warn("get session failed", zap.String("addr", node), zap.Error(err))
-			continue
-		}
 		for coll, segs := range coll2seg {
 			req := &datapb.FlushSegmentsRequest{
 				Base: &commonpb.MsgBase{
@@ -440,23 +441,96 @@ func (c *cluster) flush(segments []*datapb.SegmentInfo) {
 				CollectionID: coll,
 				SegmentIDs:   segs,
 			}
-			resp, err := cli.FlushSegments(c.ctx, req)
-			if err != nil {
-				log.Warn("flush segment failed", zap.String("addr", node), zap.Error(err))
-				continue
+			ch := node.GetEventChannel()
+			e := &NodeEvent{
+				Type: Flush,
+				Req:  req,
 			}
-			if resp.ErrorCode != commonpb.ErrorCode_Success {
-				log.Warn("flush segment failed", zap.String("dataNode", node), zap.Error(err))
-				continue
+			ch <- e
+		}
+	}
+}
+
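+// watch sends a Watch event covering all uncompleted channels of the node, resolving
+// their positions through posProvider.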
+func (c *Cluster) watch(n *NodeInfo) {
+	var logMsg string
+	uncompletes := make([]vchannel, 0, len(n.info.Channels))
+	for _, ch := range n.info.GetChannels() {
+		if ch.State == datapb.ChannelWatchState_Uncomplete {
+			if len(uncompletes) == 0 {
+				logMsg += ch.Name
+			} else {
+				logMsg += "," + ch.Name
 			}
-			log.Debug("flush segments succeed", zap.Any("segmentIDs", segs))
+			uncompletes = append(uncompletes, vchannel{
+				CollectionID: ch.CollectionID,
+				DmlChannel:   ch.Name,
+			})
 		}
 	}
+
+	if len(uncompletes) == 0 {
+		return // all set, just return
+	}
+	log.Debug(logMsg)
+
+	vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
+	if err != nil {
+		log.Warn("get vchannel position failed", zap.Error(err))
+		return
+	}
+	req := &datapb.WatchDmChannelsRequest{
+		Base: &commonpb.MsgBase{
+			SourceID: Params.NodeID,
+		},
+		Vchannels: vchanInfos,
+	}
+	e := &NodeEvent{
+		Type: Watch,
+		Req:  req,
+	}
+	ch := n.GetEventChannel()
+	ch <- e
+}
+
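+// saveNode persists a single node info under clusterPrefix.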
+func (c *Cluster) saveNode(n *NodeInfo) error {
+	key := fmt.Sprintf("%s%d", clusterPrefix, n.info.GetVersion())
+	value := proto.MarshalTextString(n.info)
+	return c.kv.Save(key, value)
+}
+
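+// txnSaveNodesAndBuffer persists the node infos and the unwatched-channel buffer in one transaction.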
+func (c *Cluster) txnSaveNodesAndBuffer(nodes []*NodeInfo, buffer []*datapb.ChannelStatus) error {
+	if len(nodes) == 0 && len(buffer) == 0 {
+		return nil
+	}
+	data := make(map[string]string)
+	for _, n := range nodes {
+		key := fmt.Sprintf("%s%d", clusterPrefix, n.info.GetVersion())
+		value := proto.MarshalTextString(n.info)
+		data[key] = value
+	}
+
+	// shortcut: reuse DataNodeInfo to store the array of channel statuses
+	bufNode := &datapb.DataNodeInfo{
+		Channels: buffer,
+	}
+
+	data[clusterBuffer] = proto.MarshalTextString(bufNode)
+	return c.kv.MultiSave(data)
+}
+
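+// GetNodes returns all nodes currently tracked by the cluster.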
+func (c *Cluster) GetNodes() []*NodeInfo {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.nodes.GetNodes()
 }
 
-func (c *cluster) releaseSessions() {
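+// Close stops the event loops and disposes all node clients.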
+func (c *Cluster) Close() {
+	c.cancel()
+	c.wg.Wait()
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	c.sessionManager.release()
-	c.candidateManager.dispose()
+	nodes := c.nodes.GetNodes()
+	for _, node := range nodes {
+		node.Dispose()
+	}
 }
diff --git a/internal/datacoord/cluster_candidate.go b/internal/datacoord/cluster_candidate.go
deleted file mode 100644
index da8644fad2545994506d3f797724fbec9ab55fbd..0000000000000000000000000000000000000000
--- a/internal/datacoord/cluster_candidate.go
+++ /dev/null
@@ -1,124 +0,0 @@
-// Copyright (C) 2019-2020 Zilliz. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software distributed under the License
-// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-// or implied. See the License for the specific language governing permissions and limitations under the License.
-
-package datacoord
-
-import (
-	"context"
-	"sync"
-
-	"github.com/milvus-io/milvus/internal/log"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"go.uber.org/zap"
-)
-
-// candidateManager manages data node candidates
-type candidateManager struct {
-	candidatePool sync.Map                         // current processing candidates
-	taskQueue     chan candidate                   // task queue to notify workers
-	cancel        func()                           // global cancel func
-	validate      func(*datapb.DataNodeInfo) error // candidate validation
-	enable        func(*datapb.DataNodeInfo) error // enable operation if candidate validate
-}
-
-// candidate stands for datanode info from etcd
-// it needs to be validated before put into cluster
-// since etcd key has a lease timeout of 10 seconds
-type candidate struct {
-	key    string               // key to specify candidate, usually candidate address
-	node   *datapb.DataNodeInfo // node info
-	ctx    context.Context      //context obj to control validation process
-	cancel func()               // cancel func to cancel single candidate
-}
-
-// newCandidateManager create candidate with specified worker number
-func newCandidateManager(wn int, validate, enable func(*datapb.DataNodeInfo) error) *candidateManager {
-	if wn <= 0 {
-		wn = 20
-	}
-	ctx, cancel := context.WithCancel(context.Background())
-	cm := &candidateManager{
-		candidatePool: sync.Map{},
-		cancel:        cancel,
-		taskQueue:     make(chan candidate, wn), // wn * 2 cap, wn worker & wn buffer
-		validate:      validate,
-		enable:        enable,
-	}
-	for i := 0; i < wn; i++ {
-		//start worker
-		go cm.work(ctx)
-	}
-	return cm
-}
-
-// work processes the candidates from channel
-// each task can be cancel by candidate contex or by global cancel fund
-func (cm *candidateManager) work(ctx context.Context) {
-	for {
-		select {
-		case cand := <-cm.taskQueue:
-			ch := make(chan struct{})
-			var err error
-			go func() {
-				err = cm.validate(cand.node)
-				ch <- struct{}{}
-			}()
-			select {
-			case <-ch:
-				if err == nil {
-					cm.enable(cand.node) // success, enable candidate
-				} else {
-					log.Warn("[CM] candidate failed", zap.String("addr", cand.node.Address))
-				}
-			case <-cand.ctx.Done():
-			}
-			cm.candidatePool.Delete(cand.key) // remove from candidatePool
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-// add datanode into candidate pool
-// the operation is non-blocking
-func (cm *candidateManager) add(dn *datapb.DataNodeInfo) {
-	log.Warn("[CM]add new candidate", zap.String("addr", dn.Address))
-	key := dn.Address
-	ctx, cancel := context.WithCancel(context.Background())
-	cand := candidate{
-		key:    key,
-		node:   dn,
-		ctx:    ctx,
-		cancel: cancel,
-	}
-	_, loaded := cm.candidatePool.LoadOrStore(key, cand)
-	if !loaded {
-		go func() { // start goroutine to non-blocking add into queue
-			cm.taskQueue <- cand
-		}()
-	}
-}
-
-// stop the candidate validation process if it exists in the pool
-func (cm *candidateManager) stop(key string) {
-	val, loaded := cm.candidatePool.LoadAndDelete(key)
-	if loaded {
-		cand, ok := val.(candidate)
-		if ok {
-			cand.cancel()
-		}
-	}
-}
-
-// dispose the manager for stopping app
-func (cm *candidateManager) dispose() {
-	cm.cancel()
-}
diff --git a/internal/datacoord/cluster_data_manager.go b/internal/datacoord/cluster_data_manager.go
deleted file mode 100644
index a8c1a340a396734d5d910be1d18bfa4daa603019..0000000000000000000000000000000000000000
--- a/internal/datacoord/cluster_data_manager.go
+++ /dev/null
@@ -1,198 +0,0 @@
-// Copyright (C) 2019-2020 Zilliz. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software distributed under the License
-// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-// or implied. See the License for the specific language governing permissions and limitations under the License.
-package datacoord
-
-import (
-	"github.com/golang/protobuf/proto"
-	"github.com/milvus-io/milvus/internal/kv"
-	"github.com/milvus-io/milvus/internal/metrics"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
-)
-
-const clusterPrefix = "cluster-prefix/"
-const clusterBuffer = "cluster-buffer"
-
-type dataNodeStatus int8
-
-const (
-	online dataNodeStatus = iota
-	offline
-)
-
-type dataNodeInfo struct {
-	info   *datapb.DataNodeInfo
-	status dataNodeStatus
-}
-
-type clusterNodeManager struct {
-	kv         kv.TxnKV
-	dataNodes  map[string]*dataNodeInfo
-	chanBuffer []*datapb.ChannelStatus //Unwatched channels buffer
-}
-
-func newClusterNodeManager(kv kv.TxnKV) (*clusterNodeManager, error) {
-	c := &clusterNodeManager{
-		kv:         kv,
-		dataNodes:  make(map[string]*dataNodeInfo),
-		chanBuffer: []*datapb.ChannelStatus{},
-	}
-	return c, c.loadFromKv()
-}
-
-func (c *clusterNodeManager) loadFromKv() error {
-	_, values, err := c.kv.LoadWithPrefix(clusterPrefix)
-	if err != nil {
-		return err
-	}
-
-	for _, v := range values {
-		info := &datapb.DataNodeInfo{}
-		if err := proto.UnmarshalText(v, info); err != nil {
-			return err
-		}
-
-		node := &dataNodeInfo{
-			info:   info,
-			status: offline,
-		}
-		c.dataNodes[info.Address] = node
-	}
-	dn, _ := c.kv.Load(clusterBuffer)
-	//TODO add not value error check
-	if dn != "" {
-		info := &datapb.DataNodeInfo{}
-		if err := proto.UnmarshalText(dn, info); err != nil {
-			return err
-		}
-		c.chanBuffer = info.Channels
-	}
-
-	return nil
-}
-
-func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *clusterDeltaChange {
-	newNodes := make([]string, 0)
-	offlines := make([]string, 0)
-	restarts := make([]string, 0)
-	var onCnt, offCnt float64
-	currentOnline := make(map[string]struct{})
-	for _, n := range dataNodes {
-		currentOnline[n.Address] = struct{}{}
-		onCnt++
-		node, ok := c.dataNodes[n.Address]
-
-		if ok {
-			node.status = online
-			if node.info.Version != n.Version {
-				restarts = append(restarts, n.Address)
-			}
-			continue
-		}
-
-		newNodes = append(newNodes, n.Address)
-	}
-
-	for nAddr, node := range c.dataNodes {
-		_, has := currentOnline[nAddr]
-		if !has && node.status == online {
-			node.status = offline
-			offCnt++
-			offlines = append(offlines, nAddr)
-		}
-	}
-	metrics.DataCoordDataNodeList.WithLabelValues("online").Set(onCnt)
-	metrics.DataCoordDataNodeList.WithLabelValues("offline").Set(offCnt)
-	return &clusterDeltaChange{
-		newNodes: newNodes,
-		offlines: offlines,
-		restarts: restarts,
-	}
-}
-
-// updateDataNodes update dataNodes input mereged with existing cluster and buffer
-func (c *clusterNodeManager) updateDataNodes(dataNodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error {
-	for _, node := range dataNodes {
-		c.dataNodes[node.Address].info = node
-	}
-
-	return c.txnSaveNodes(dataNodes, buffer)
-}
-
-// getDataNodes get current synced data nodes with buffered channel
-func (c *clusterNodeManager) getDataNodes(onlyOnline bool) (map[string]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
-	ret := make(map[string]*datapb.DataNodeInfo)
-	for k, v := range c.dataNodes {
-		if !onlyOnline || v.status == online {
-			ret[k] = proto.Clone(v.info).(*datapb.DataNodeInfo)
-		}
-	}
-	return ret, c.chanBuffer
-}
-
-func (c *clusterNodeManager) register(n *datapb.DataNodeInfo) {
-	node, ok := c.dataNodes[n.Address]
-	if ok {
-		node.status = online
-		node.info.Version = n.Version
-	} else {
-		c.dataNodes[n.Address] = &dataNodeInfo{
-			info:   n,
-			status: online,
-		}
-	}
-	c.updateMetrics()
-}
-
-// unregister removes node with specified address, returns node info if exists
-func (c *clusterNodeManager) unregister(addr string) *datapb.DataNodeInfo {
-	node, ok := c.dataNodes[addr]
-	if !ok {
-		return nil
-	}
-	delete(c.dataNodes, addr)
-	node.status = offline
-	c.updateMetrics()
-	return node.info
-}
-
-func (c *clusterNodeManager) updateMetrics() {
-	var offCnt, onCnt float64
-	for _, node := range c.dataNodes {
-		if node.status == online {
-			onCnt++
-		} else {
-			offCnt++
-		}
-	}
-	metrics.DataCoordDataNodeList.WithLabelValues("online").Set(onCnt)
-	metrics.DataCoordDataNodeList.WithLabelValues("offline").Set(offCnt)
-}
-
-func (c *clusterNodeManager) txnSaveNodes(nodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error {
-	if len(nodes) == 0 && len(buffer) == 0 {
-		return nil
-	}
-	data := make(map[string]string)
-	for _, n := range nodes {
-		c.dataNodes[n.Address].info = n
-		key := clusterPrefix + n.Address
-		value := proto.MarshalTextString(n)
-		data[key] = value
-	}
-	c.chanBuffer = buffer
-
-	// short cut, reusing datainfo to store array of channel status
-	bufNode := &datapb.DataNodeInfo{
-		Channels: buffer,
-	}
-	data[clusterBuffer] = proto.MarshalTextString(bufNode)
-	return c.kv.MultiSave(data)
-}
diff --git a/internal/datacoord/cluster_session_manager.go b/internal/datacoord/cluster_session_manager.go
deleted file mode 100644
index f0423479cf81fef8c7bece1d6e679ddb886c7fad..0000000000000000000000000000000000000000
--- a/internal/datacoord/cluster_session_manager.go
+++ /dev/null
@@ -1,110 +0,0 @@
-// Copyright (C) 2019-2020 Zilliz. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software distributed under the License
-// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-// or implied. See the License for the specific language governing permissions and limitations under the License.
-package datacoord
-
-import (
-	"context"
-	"errors"
-	"sync"
-
-	"github.com/milvus-io/milvus/internal/types"
-)
-
-type sessionManager interface {
-	// try get session, without retry
-	getSession(addr string) (types.DataNode, error)
-	// try get session from manager with addr, if not exists, create one
-	getOrCreateSession(addr string) (types.DataNode, error)
-	releaseSession(addr string)
-	release()
-}
-
-type clusterSessionManager struct {
-	sync.RWMutex
-	ctx               context.Context
-	sessions          map[string]types.DataNode
-	dataClientCreator dataNodeCreatorFunc
-}
-
-func newClusterSessionManager(ctx context.Context, dataClientCreator dataNodeCreatorFunc) *clusterSessionManager {
-	return &clusterSessionManager{
-		ctx:               ctx,
-		sessions:          make(map[string]types.DataNode),
-		dataClientCreator: dataClientCreator,
-	}
-}
-
-// getSession with out creation if not found
-func (m *clusterSessionManager) getSession(addr string) (types.DataNode, error) {
-	m.RLock()
-	defer m.RUnlock()
-	cli, has := m.sessions[addr]
-	if has {
-		return cli, nil
-	}
-	return nil, errors.New("not found")
-}
-
-func (m *clusterSessionManager) createSession(addr string) (types.DataNode, error) {
-	cli, err := m.dataClientCreator(m.ctx, addr)
-	if err != nil {
-		return nil, err
-	}
-	if err := cli.Init(); err != nil {
-		return nil, err
-	}
-	if err := cli.Start(); err != nil {
-		return nil, err
-	}
-	m.Lock()
-	m.sessions[addr] = cli
-	m.Unlock()
-	return cli, nil
-}
-
-// entry function
-func (m *clusterSessionManager) getOrCreateSession(addr string) (types.DataNode, error) {
-	m.RLock()
-	dn, has := m.sessions[addr]
-	m.RUnlock()
-	if has {
-		return dn, nil
-	}
-	// does not need double check, addr has outer sync.Map
-	dn, err := m.createSession(addr)
-	return dn, err
-}
-
-// // lock acquired
-// func (m *clusterSessionManager) hasSession(addr string) bool {
-// 	_, ok := m.sessions[addr]
-// 	return ok
-// }
-
-func (m *clusterSessionManager) releaseSession(addr string) {
-	m.Lock()
-	defer m.Unlock()
-	cli, ok := m.sessions[addr]
-	if !ok {
-		return
-	}
-	_ = cli.Stop()
-	delete(m.sessions, addr)
-}
-
-func (m *clusterSessionManager) release() {
-	m.Lock()
-	defer m.Unlock()
-	for _, cli := range m.sessions {
-		_ = cli.Stop()
-	}
-	m.sessions = map[string]types.DataNode{}
-}
diff --git a/internal/datacoord/cluster_store.go b/internal/datacoord/cluster_store.go
new file mode 100644
index 0000000000000000000000000000000000000000..fe343847049907b23a9f16285a6f82c45b5dafdd
--- /dev/null
+++ b/internal/datacoord/cluster_store.go
@@ -0,0 +1,187 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+package datacoord
+
+import (
+	"context"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/milvus-io/milvus/internal/metrics"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/types"
+)
+
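+// ClusterStore is the in-memory view of the DataNode cluster maintained by Cluster.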
+type ClusterStore interface {
+	GetNodes() []*NodeInfo
+	SetNode(nodeID UniqueID, node *NodeInfo)
+	DeleteNode(nodeID UniqueID)
+	GetNode(nodeID UniqueID) *NodeInfo
+	SetClient(nodeID UniqueID, client types.DataNode)
+	SetWatched(nodeID UniqueID, channelsName []string)
+}
+
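+// NodeInfo bundles a DataNodeInfo with its client, per-node event channel and lifecycle context.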
+type NodeInfo struct {
+	info    *datapb.DataNodeInfo
+	eventCh chan *NodeEvent
+	client  types.DataNode
+	ctx     context.Context
+	cancel  context.CancelFunc
+}
+
+const eventChBuffer = 1024
+
+type NodeEvent struct {
+	Type NodeEventType
+	Req  interface{}
+}
+
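+// NewNodeInfo creates a NodeInfo with a buffered event channel and a child context of ctx.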
+func NewNodeInfo(ctx context.Context, info *datapb.DataNodeInfo) *NodeInfo {
+	ctx, cancel := context.WithCancel(ctx)
+	return &NodeInfo{
+		info:    info,
+		eventCh: make(chan *NodeEvent, eventChBuffer),
+		ctx:     ctx,
+		cancel:  cancel,
+	}
+}
+
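+// ShadowClone returns a shallow copy sharing the underlying DataNodeInfo and applies the given options.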
+func (n *NodeInfo) ShadowClone(opts ...NodeOpt) *NodeInfo {
+	cloned := &NodeInfo{
+		info:    n.info,
+		eventCh: n.eventCh,
+		client:  n.client,
+		ctx:     n.ctx,
+		cancel:  n.cancel,
+	}
+	for _, opt := range opts {
+		opt(cloned)
+	}
+	return cloned
+}
+
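+// Clone returns a copy with a deep-cloned DataNodeInfo and applies the given options.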
+func (n *NodeInfo) Clone(opts ...NodeOpt) *NodeInfo {
+	info := proto.Clone(n.info).(*datapb.DataNodeInfo)
+	cloned := &NodeInfo{
+		info:    info,
+		eventCh: n.eventCh,
+		client:  n.client,
+		ctx:     n.ctx,
+		cancel:  n.cancel,
+	}
+	for _, opt := range opts {
+		opt(cloned)
+	}
+	return cloned
+}
+
+func (n *NodeInfo) GetEventChannel() chan *NodeEvent {
+	return n.eventCh
+}
+
+func (n *NodeInfo) GetClient() types.DataNode {
+	return n.client
+}
+
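+// Dispose stops the DataNode client, if any, and cancels the node context.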
+func (n *NodeInfo) Dispose() {
+	defer n.cancel()
+	if n.client != nil {
+		n.client.Stop()
+	}
+}
+
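+// NodesInfo is the default in-memory ClusterStore implementation, keyed by node ID.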
+type NodesInfo struct {
+	nodes map[UniqueID]*NodeInfo
+}
+
+func NewNodesInfo() *NodesInfo {
+	c := &NodesInfo{
+		nodes: make(map[UniqueID]*NodeInfo),
+	}
+	return c
+}
+
+func (c *NodesInfo) GetNodes() []*NodeInfo {
+	nodes := make([]*NodeInfo, 0, len(c.nodes))
+	for _, node := range c.nodes {
+		nodes = append(nodes, node)
+	}
+	return nodes
+}
+
+func (c *NodesInfo) SetNode(nodeID UniqueID, node *NodeInfo) {
+	c.nodes[nodeID] = node
+	metrics.DataCoordDataNodeList.WithLabelValues("online").Inc()
+	metrics.DataCoordDataNodeList.WithLabelValues("offline").Dec()
+}
+
+func (c *NodesInfo) DeleteNode(nodeID UniqueID) {
+	delete(c.nodes, nodeID)
+	metrics.DataCoordDataNodeList.WithLabelValues("online").Dec()
+	metrics.DataCoordDataNodeList.WithLabelValues("offline").Inc()
+}
+
+func (c *NodesInfo) GetNode(nodeID UniqueID) *NodeInfo {
+	node, ok := c.nodes[nodeID]
+	if !ok {
+		return nil
+	}
+	return node
+}
+
+func (c *NodesInfo) SetClient(nodeID UniqueID, client types.DataNode) {
+	if node, ok := c.nodes[nodeID]; ok {
+		c.nodes[nodeID] = node.ShadowClone(SetClient(client))
+	}
+}
+
+func (c *NodesInfo) SetWatched(nodeID UniqueID, channelsName []string) {
+	if node, ok := c.nodes[nodeID]; ok {
+		c.nodes[nodeID] = node.Clone(SetWatched(channelsName))
+	}
+}
+
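+// NodeOpt mutates a NodeInfo and is applied through Clone or ShadowClone.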
+type NodeOpt func(n *NodeInfo)
+
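+// SetWatched marks the listed channels as Complete if they are currently Uncomplete.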
+func SetWatched(channelsName []string) NodeOpt {
+	return func(n *NodeInfo) {
+		channelsMap := make(map[string]struct{})
+		for _, channelName := range channelsName {
+			channelsMap[channelName] = struct{}{}
+		}
+		for _, ch := range n.info.Channels {
+			_, ok := channelsMap[ch.GetName()]
+			if !ok {
+				continue
+			}
+			if ch.State == datapb.ChannelWatchState_Uncomplete {
+				ch.State = datapb.ChannelWatchState_Complete
+			}
+		}
+	}
+}
+
+func SetClient(client types.DataNode) NodeOpt {
+	return func(n *NodeInfo) {
+		n.client = client
+	}
+}
+
+func AddChannels(channels []*datapb.ChannelStatus) NodeOpt {
+	return func(n *NodeInfo) {
+		n.info.Channels = append(n.info.Channels, channels...)
+	}
+}
+
+func SetChannels(channels []*datapb.ChannelStatus) NodeOpt {
+	return func(n *NodeInfo) {
+		n.info.Channels = channels
+	}
+}
diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go
index 11146171c73094b9d8cc7cf8fbe351b568fb900a..aec00aa07155c37e68698a4959149d7de390b9a7 100644
--- a/internal/datacoord/cluster_test.go
+++ b/internal/datacoord/cluster_test.go
@@ -12,8 +12,7 @@ package datacoord
 
 import (
 	"context"
-	"errors"
-	"strings"
 	"testing"
 
 	memkv "github.com/milvus-io/milvus/internal/kv/mem"
@@ -21,208 +20,156 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+type SpyClusterStore struct {
+	*NodesInfo
+	ch chan interface{}
+}
+
+func (s *SpyClusterStore) SetNode(nodeID UniqueID, node *NodeInfo) {
+	s.NodesInfo.SetNode(nodeID, node)
+	s.ch <- struct{}{}
+}
+
+func (s *SpyClusterStore) DeleteNode(nodeID UniqueID) {
+	s.NodesInfo.DeleteNode(nodeID)
+	s.ch <- struct{}{}
+}
+
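+// spyWatchPolicy signals on ch when the channel is already assigned to a node, otherwise
+// it assigns the channel to the first node in the cluster.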
+func spyWatchPolicy(ch chan interface{}) channelAssignPolicy {
+	return func(cluster []*NodeInfo, channel string, collectionID UniqueID) []*NodeInfo {
+		for _, node := range cluster {
+			for _, c := range node.info.GetChannels() {
+				if c.GetName() == channel && c.GetCollectionID() == collectionID {
+					ch <- struct{}{}
+					return nil
+				}
+			}
+		}
+		ret := make([]*NodeInfo, 0)
+		c := &datapb.ChannelStatus{
+			Name:         channel,
+			State:        datapb.ChannelWatchState_Uncomplete,
+			CollectionID: collectionID,
+		}
+		n := cluster[0].Clone(AddChannels([]*datapb.ChannelStatus{c}))
+		ret = append(ret, n)
+		return ret
+	}
+}
+
 func TestClusterCreate(t *testing.T) {
-	cPolicy := newMockStartupPolicy()
-	ch := make(chan struct{}, 1)
-	cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch))
-	addr := "localhost:8080"
-	nodes := []*datapb.DataNodeInfo{
-		{
-			Address:  addr,
-			Version:  1,
-			Channels: []*datapb.ChannelStatus{},
-		},
+	ch := make(chan interface{})
+	kv := memkv.NewMemoryKV()
+	spyClusterStore := &SpyClusterStore{
+		NodesInfo: NewNodesInfo(),
+		ch:        ch,
 	}
-	err := cluster.startup(nodes)
+	cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, dummyPosProvider{})
 	assert.Nil(t, err)
+	defer cluster.Close()
+	addr := "localhost:8080"
+	info := &datapb.DataNodeInfo{
+		Address:  addr,
+		Version:  1,
+		Channels: []*datapb.ChannelStatus{},
+	}
+	nodes := []*NodeInfo{NewNodeInfo(context.TODO(), info)}
+	cluster.Startup(nodes)
 	<-ch
-	dataNodes, _ := cluster.dataManager.getDataNodes(true)
+	dataNodes := cluster.GetNodes()
 	assert.EqualValues(t, 1, len(dataNodes))
-	assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
+	assert.EqualValues(t, "localhost:8080", dataNodes[0].info.GetAddress())
 }
 
 func TestRegister(t *testing.T) {
-	cPolicy := newMockStartupPolicy()
 	registerPolicy := newEmptyRegisterPolicy()
-	cluster := createCluster(t, nil, withStartupPolicy(cPolicy), withRegisterPolicy(registerPolicy))
+	ch := make(chan interface{})
+	kv := memkv.NewMemoryKV()
+	spyClusterStore := &SpyClusterStore{
+		NodesInfo: NewNodesInfo(),
+		ch:        ch,
+	}
+	cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, dummyPosProvider{}, withRegisterPolicy(registerPolicy))
+	assert.Nil(t, err)
+	defer cluster.Close()
 	addr := "localhost:8080"
 
-	err := cluster.startup(nil)
-	assert.Nil(t, err)
-	cluster.register(&datapb.DataNodeInfo{
+	cluster.Startup(nil)
+	info := &datapb.DataNodeInfo{
 		Address:  addr,
 		Version:  1,
 		Channels: []*datapb.ChannelStatus{},
-	})
-	dataNodes, _ := cluster.dataManager.getDataNodes(true)
+	}
+	node := NewNodeInfo(context.TODO(), info)
+	cluster.Register(node)
+	<-ch
+	dataNodes := cluster.GetNodes()
 	assert.EqualValues(t, 1, len(dataNodes))
-	assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
+	assert.EqualValues(t, "localhost:8080", dataNodes[0].info.GetAddress())
 }
 
 func TestUnregister(t *testing.T) {
-	cPolicy := newMockStartupPolicy()
 	unregisterPolicy := newEmptyUnregisterPolicy()
-	ch := make(chan struct{}, 1)
-	cluster := createCluster(t, nil, withStartupPolicy(cPolicy), withUnregistorPolicy(unregisterPolicy), mockValidatorOption(ch))
-	addr := "localhost:8080"
-	nodes := []*datapb.DataNodeInfo{
-		{
-			Address:  addr,
-			Version:  1,
-			Channels: []*datapb.ChannelStatus{},
-		},
+	ch := make(chan interface{})
+	kv := memkv.NewMemoryKV()
+	spyClusterStore := &SpyClusterStore{
+		NodesInfo: NewNodesInfo(),
+		ch:        ch,
 	}
-	err := cluster.startup(nodes)
+	cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, dummyPosProvider{}, withUnregistorPolicy(unregisterPolicy))
 	assert.Nil(t, err)
-	<-ch
-	dataNodes, _ := cluster.dataManager.getDataNodes(true)
-	assert.EqualValues(t, 1, len(dataNodes))
-	assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
-	cluster.unregister(&datapb.DataNodeInfo{
+	defer cluster.Close()
+	addr := "localhost:8080"
+	info := &datapb.DataNodeInfo{
 		Address:  addr,
 		Version:  1,
 		Channels: []*datapb.ChannelStatus{},
-	})
-	dataNodes, _ = cluster.dataManager.getDataNodes(false)
-	assert.EqualValues(t, 0, len(dataNodes))
-}
-
-func TestRefresh(t *testing.T) {
-	cPolicy := newMockStartupPolicy()
-	ch := make(chan struct{}, 1)
-	cluster := createCluster(t, nil, withStartupPolicy(cPolicy), clusterOption{
-		apply: func(c *cluster) {
-			c.candidateManager.validate = func(dn *datapb.DataNodeInfo) error {
-				if strings.Contains(dn.Address, "inv") {
-					return errors.New("invalid dn")
-				}
-				return nil
-			}
-			c.candidateManager.enable = func(dn *datapb.DataNodeInfo) error {
-				err := c.enableDataNode(dn)
-				ch <- struct{}{}
-				return err
-			}
-		},
-	})
-	addr := "localhost:8080"
-	nodes := []*datapb.DataNodeInfo{
-		{
-			Address:  addr,
-			Version:  1,
-			Channels: []*datapb.ChannelStatus{},
-		},
-		{
-			Address:  addr + "invalid",
-			Version:  1,
-			Channels: []*datapb.ChannelStatus{},
-		},
 	}
-	err := cluster.startup(nodes)
-	assert.Nil(t, err)
+	nodes := []*NodeInfo{NewNodeInfo(context.TODO(), info)}
+	cluster.Startup(nodes)
 	<-ch
-	dataNodes, _ := cluster.dataManager.getDataNodes(true)
-	if !assert.Equal(t, 1, len(dataNodes)) {
-		t.FailNow()
-	}
-	assert.Equal(t, addr, dataNodes[addr].Address)
-	addr2 := "localhost:8081"
-	nodes = []*datapb.DataNodeInfo{
-		{
-			Address:  addr2,
-			Version:  1,
-			Channels: []*datapb.ChannelStatus{},
-		},
-		{
-			Address:  addr2 + "invalid",
-			Version:  1,
-			Channels: []*datapb.ChannelStatus{},
-		},
-	}
-	err = cluster.refresh(nodes)
-	assert.Nil(t, err)
+	dataNodes := cluster.GetNodes()
+	assert.EqualValues(t, 1, len(dataNodes))
+	assert.EqualValues(t, "localhost:8080", dataNodes[0].info.GetAddress())
+	cluster.UnRegister(nodes[0])
 	<-ch
-	dataNodes, _ = cluster.dataManager.getDataNodes(true)
-	assert.Equal(t, 1, len(dataNodes))
-	_, has := dataNodes[addr]
-	assert.False(t, has)
-	assert.Equal(t, addr2, dataNodes[addr2].Address)
+	dataNodes = cluster.GetNodes()
+	assert.EqualValues(t, 0, len(dataNodes))
 }
 
 func TestWatchIfNeeded(t *testing.T) {
-	cPolicy := newMockStartupPolicy()
-	ch := make(chan struct{}, 1)
-	cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch))
-	addr := "localhost:8080"
-	nodes := []*datapb.DataNodeInfo{
-		{
-			Address:  addr,
-			Version:  1,
-			Channels: []*datapb.ChannelStatus{},
-		},
+	ch := make(chan interface{})
+	kv := memkv.NewMemoryKV()
+	spyClusterStore := &SpyClusterStore{
+		NodesInfo: NewNodesInfo(),
+		ch:        ch,
 	}
-	err := cluster.startup(nodes)
-	assert.Nil(t, err)
-	<-ch
-	dataNodes, _ := cluster.dataManager.getDataNodes(true)
-	assert.EqualValues(t, 1, len(dataNodes))
-	assert.EqualValues(t, "localhost:8080", dataNodes[addr].Address)
 
-	chName := "ch1"
-	cluster.watchIfNeeded(chName, 0)
-	dataNodes, _ = cluster.dataManager.getDataNodes(true)
-	assert.EqualValues(t, 1, len(dataNodes[addr].Channels))
-	assert.EqualValues(t, chName, dataNodes[addr].Channels[0].Name)
-	cluster.watchIfNeeded(chName, 0)
-	assert.EqualValues(t, 1, len(dataNodes[addr].Channels))
-	assert.EqualValues(t, chName, dataNodes[addr].Channels[0].Name)
-}
-
-func TestFlushSegments(t *testing.T) {
-	cPolicy := newMockStartupPolicy()
-	ch := make(chan struct{}, 1)
-	cluster := createCluster(t, nil, withStartupPolicy(cPolicy), mockValidatorOption(ch))
+	pch := make(chan interface{})
+	spyPolicy := spyWatchPolicy(pch)
+	cluster, err := NewCluster(context.TODO(), kv, spyClusterStore, dummyPosProvider{}, withAssignPolicy(spyPolicy))
+	assert.Nil(t, err)
+	defer cluster.Close()
 	addr := "localhost:8080"
-	nodes := []*datapb.DataNodeInfo{
-		{
-			Address:  addr,
-			Version:  1,
-			Channels: []*datapb.ChannelStatus{},
-		},
+	info := &datapb.DataNodeInfo{
+		Address:  addr,
+		Version:  1,
+		Channels: []*datapb.ChannelStatus{},
 	}
-	err := cluster.startup(nodes)
+	node := NewNodeInfo(context.TODO(), info)
+	node.client, err = newMockDataNodeClient(1, make(chan interface{}))
 	assert.Nil(t, err)
+	nodes := []*NodeInfo{node}
+	cluster.Startup(nodes)
 	<-ch
-	segments := []*datapb.SegmentInfo{
-		{
-			ID:            0,
-			CollectionID:  0,
-			InsertChannel: "ch1",
-		},
-	}
-
-	cluster.flush(segments)
-}
-
-func mockValidatorOption(ch chan<- struct{}) clusterOption {
-	return clusterOption{
-		apply: func(c *cluster) {
-			c.candidateManager.validate = func(dn *datapb.DataNodeInfo) error {
-				return nil
-			}
-			c.candidateManager.enable = func(dn *datapb.DataNodeInfo) error {
-				err := c.enableDataNode(dn)
-				ch <- struct{}{}
-				return err
-			}
-		},
-	}
-}
-
-func createCluster(t *testing.T, ch chan interface{}, options ...clusterOption) *cluster {
-	kv := memkv.NewMemoryKV()
-	sessionManager := newMockSessionManager(ch)
-	dataManager, err := newClusterNodeManager(kv)
-	assert.Nil(t, err)
-	return newCluster(context.TODO(), dataManager, sessionManager, dummyPosProvider{}, options...)
+	chName := "ch1"
+	cluster.Watch(chName, 0)
+	<-ch
+	dataNodes := cluster.GetNodes()
+	assert.EqualValues(t, 1, len(dataNodes[0].info.GetChannels()))
+	assert.EqualValues(t, chName, dataNodes[0].info.Channels[0].Name)
+	cluster.Watch(chName, 0)
+	<-pch
 }
diff --git a/internal/datacoord/datanode_helper.go b/internal/datacoord/datanode_helper.go
index 729df3ece5e03b7187dad1a868aba44f3c5f2998..4b60182b921410d8add1c55f827333a26df5797f 100644
--- a/internal/datacoord/datanode_helper.go
+++ b/internal/datacoord/datanode_helper.go
@@ -30,7 +30,7 @@ type dummyPosProvider struct{}
 
 //GetVChanPositions implements positionProvider
 func (dp dummyPosProvider) GetVChanPositions(vchans []vchannel, isAccurate bool) ([]*datapb.VchannelInfo, error) {
-	pairs := make([]*datapb.VchannelInfo, len(vchans))
+	pairs := make([]*datapb.VchannelInfo, 0, len(vchans))
 	for _, vchan := range vchans {
 		pairs = append(pairs, &datapb.VchannelInfo{
 			CollectionID:      vchan.CollectionID,
diff --git a/internal/datacoord/flush_monitor.go b/internal/datacoord/flush_monitor.go
deleted file mode 100644
index 5e3fa4ceae7712943c513122788569df2c88f8f8..0000000000000000000000000000000000000000
--- a/internal/datacoord/flush_monitor.go
+++ /dev/null
@@ -1,150 +0,0 @@
-// Copyright (C) 2019-2020 Zilliz. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software distributed under the License
-// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-// or implied. See the License for the specific language governing permissions and limitations under the License.
-
-package datacoord
-
-import (
-	"sort"
-	"time"
-
-	"github.com/milvus-io/milvus/internal/proto/commonpb"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/internalpb"
-)
-
-// flushMonitor check segments / channels meet the provided flush policy
-type flushMonitor struct {
-	meta          *meta
-	segmentPolicy SegmentFlushPolicy
-	channelPolicy ChannelFlushPolicy
-}
-
-// SegmentFlushPolicy checks segment size and returns whether segment needs to be flushed
-type SegmentFlushPolicy func(*datapb.SegmentInfo) bool
-
-// ChannelFlushPolicy checks segments inside single Vchannel count and returns segment ids needs to be flushed
-type ChannelFlushPolicy func(string, []*datapb.SegmentInfo, *internalpb.MsgPosition) []UniqueID
-
-// emptyFlushMonitor returns empty flush montior
-func emptyFlushMonitor(meta *meta) flushMonitor {
-	return flushMonitor{
-		meta: meta,
-	}
-}
-
-// defaultFlushMonitor generates auto flusher with default policies
-func defaultFlushMonitor(meta *meta) flushMonitor {
-	return flushMonitor{
-		meta: meta,
-		// segmentPolicy: estSegmentSizePolicy(1024, 1024*1024*1536), // row 1024 byte, limit 1.5GiB
-		channelPolicy: channelSizeEpochPolicy(1024, uint64(time.Hour)),
-	}
-}
-
-// CheckSegments check segments meet flush policy, returns segment id needs to flush
-func (f flushMonitor) CheckSegments(segments []*datapb.SegmentInfo) []UniqueID {
-	if f.segmentPolicy == nil {
-		return []UniqueID{}
-	}
-	result := make([]UniqueID, 0, len(segments))
-	for _, segment := range segments {
-		if f.segmentPolicy(segment) {
-			result = append(result, segment.ID)
-		}
-	}
-	return result
-}
-
-// CheckChannels check channels changed, apply `ChannelPolicy`
-func (f flushMonitor) CheckChannels(channels []string, latest *internalpb.MsgPosition) []UniqueID {
-	segHits := make(map[UniqueID]struct{})
-	for _, channel := range channels {
-		segments := f.meta.GetSegmentsByChannel(channel)
-
-		growingSegments := make([]*datapb.SegmentInfo, 0, len(segments))
-		for _, segment := range segments {
-			if segment.State != commonpb.SegmentState_Growing {
-				continue
-			}
-			growingSegments = append(growingSegments, segment)
-			if f.segmentPolicy != nil && f.segmentPolicy(segment) {
-				segHits[segment.ID] = struct{}{}
-			}
-		}
-		if f.channelPolicy != nil {
-			hits := f.channelPolicy(channel, growingSegments, latest)
-			for _, hit := range hits {
-				segHits[hit] = struct{}{}
-			}
-		}
-	}
-
-	result := make([]UniqueID, 0, len(segHits))
-	for segID := range segHits {
-		result = append(result, segID)
-	}
-
-	return result
-}
-
-// deprecated
-func estSegmentSizePolicy(rowSize, limit int64) SegmentFlushPolicy {
-	return func(seg *datapb.SegmentInfo) bool {
-		if seg == nil {
-			return false
-		}
-		if seg.NumOfRows*rowSize > limit {
-			return true
-		}
-		return false
-	}
-}
-
-// channelSizeEpochPolicy policy check channel sizes and segment life time
-// segmentMax is the max number of segment allowed in the channel
-// epochDuration is the max live time segment has
-func channelSizeEpochPolicy(segmentMax int, epochDuration uint64) ChannelFlushPolicy {
-	return func(channel string, segments []*datapb.SegmentInfo, latest *internalpb.MsgPosition) []UniqueID {
-		if len(segments) < segmentMax && latest == nil {
-			return []UniqueID{}
-		}
-		sortSegmentsByDmlPos(segments)
-		result := []UniqueID{}
-		overflow := len(segments) - segmentMax
-		for idx, segment := range segments {
-			if idx < overflow {
-				result = append(result, segment.ID)
-				continue
-			}
-			if latest != nil {
-				if segment.DmlPosition == nil || latest.Timestamp-segment.DmlPosition.Timestamp > epochDuration {
-					result = append(result, segment.ID)
-					continue
-				}
-			}
-			break
-		}
-		return result
-	}
-}
-
-// sortSegmentsByDmlPos sorts input segments in ascending order by `DmlPosition.Timestamp`, nil value is less than 0
-func sortSegmentsByDmlPos(segments []*datapb.SegmentInfo) {
-	sort.Slice(segments, func(i, j int) bool {
-		if segments[i].DmlPosition == nil {
-			return true
-		}
-		if segments[j].DmlPosition == nil {
-			return false
-		}
-		return segments[i].DmlPosition.Timestamp < segments[j].DmlPosition.Timestamp
-	})
-}
diff --git a/internal/datacoord/flush_monitor_test.go b/internal/datacoord/flush_monitor_test.go
deleted file mode 100644
index 2c5da05a8561f53ea972bc1cccf829435a8aa5b0..0000000000000000000000000000000000000000
--- a/internal/datacoord/flush_monitor_test.go
+++ /dev/null
@@ -1,118 +0,0 @@
-// Copyright (C) 2019-2020 Zilliz. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software distributed under the License
-// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-// or implied. See the License for the specific language governing permissions and limitations under the License.
-
-package datacoord
-
-import (
-	"testing"
-	"time"
-
-	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/internalpb"
-	"github.com/stretchr/testify/assert"
-)
-
-func TestFlushMonitor(t *testing.T) {
-	const collID = UniqueID(0)
-	const partID0 = UniqueID(100)
-	const partID1 = UniqueID(101)
-	const channelName = "c1"
-
-	mockAllocator := newMockAllocator()
-	meta, err := newMemoryMeta(mockAllocator)
-	assert.Nil(t, err)
-
-	testSchema := newTestSchema()
-	collInfo := &datapb.CollectionInfo{
-		ID:         collID,
-		Schema:     testSchema,
-		Partitions: []UniqueID{partID0, partID1},
-	}
-
-	meta.AddCollection(collInfo)
-
-	// create seg0 for partition0, seg0/seg1 for partition1
-	segID0_0, err := mockAllocator.allocID()
-	assert.Nil(t, err)
-	segInfo0_0 := buildSegment(collID, partID0, segID0_0, channelName)
-	segID1_0, err := mockAllocator.allocID()
-	assert.Nil(t, err)
-	segInfo1_0 := buildSegment(collID, partID1, segID1_0, channelName)
-	segID1_1, err := mockAllocator.allocID()
-	assert.Nil(t, err)
-	segInfo1_1 := buildSegment(collID, partID1, segID1_1, channelName)
-
-	// check AddSegment
-	err = meta.AddSegment(segInfo0_0)
-	assert.Nil(t, err)
-	err = meta.AddSegment(segInfo1_0)
-	assert.Nil(t, err)
-	err = meta.AddSegment(segInfo1_1)
-	assert.Nil(t, err)
-
-	t.Run("Test empty flush monitor", func(t *testing.T) {
-		fm := emptyFlushMonitor(meta)
-		ids := fm.CheckSegments([]*datapb.SegmentInfo{})
-		assert.Equal(t, 0, len(ids))
-
-		ids = fm.CheckChannels([]string{channelName}, nil)
-		assert.Equal(t, 0, len(ids))
-	})
-
-	t.Run("Test custom segment policy", func(t *testing.T) {
-		fm := emptyFlushMonitor(meta)
-		fm.segmentPolicy = estSegmentSizePolicy(1024*1024, 1024*1024*2) // row size 1Mib Limit 2 MB
-		segID3Rows, err := mockAllocator.allocID()
-		assert.Nil(t, err)
-		segInfo3Rows := buildSegment(collID, partID1, segID3Rows, channelName)
-		segInfo3Rows.NumOfRows = 3
-
-		ids := fm.CheckSegments([]*datapb.SegmentInfo{segInfo3Rows})
-		if assert.Equal(t, 1, len(ids)) {
-			assert.Equal(t, segID3Rows, ids[0])
-		}
-	})
-
-	t.Run("Test custom channel policy", func(t *testing.T) {
-		const channelName2 = `ch2`
-		fm := emptyFlushMonitor(meta)
-		fm.channelPolicy = channelSizeEpochPolicy(100, uint64(time.Hour))
-
-		for i := 0; i < 100; i++ {
-			segID, err := mockAllocator.allocID()
-			assert.Nil(t, err)
-			seg := buildSegment(collID, partID0, segID, channelName2)
-			seg.DmlPosition = &internalpb.MsgPosition{
-				Timestamp: uint64(i + 1),
-			}
-			meta.AddSegment(seg)
-		}
-
-		ids := fm.CheckChannels([]string{channelName2}, nil)
-		assert.Equal(t, 0, len(ids))
-
-		exSegID, err := mockAllocator.allocID()
-		assert.Nil(t, err)
-		seg := buildSegment(collID, partID0, exSegID, channelName2)
-		seg.DmlPosition = &internalpb.MsgPosition{
-			Timestamp: uint64(0), // the oldest
-		}
-		meta.AddSegment(seg)
-
-		ids = fm.CheckChannels([]string{channelName2}, nil)
-		if assert.Equal(t, 1, len(ids)) {
-			assert.Equal(t, exSegID, ids[0])
-		}
-
-		ids = fm.CheckChannels([]string{channelName2}, &internalpb.MsgPosition{Timestamp: uint64(time.Hour + 5)})
-		assert.Equal(t, 5, len(ids))
-	})
-}
diff --git a/internal/datacoord/grpc_services.go b/internal/datacoord/grpc_services.go
index f3a8b6ac88852266c572951effc7448249821d40..2f079af80509caad39413edcf9b92553b5325866 100644
--- a/internal/datacoord/grpc_services.go
+++ b/internal/datacoord/grpc_services.go
@@ -60,6 +60,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
 		resp.Status.Reason = fmt.Sprintf("Failed to flush %d, %s", req.CollectionID, err)
 		return resp, nil
 	}
+	log.Debug("flush response with segments", zap.Any("segments", sealedSegments))
 	resp.Status.ErrorCode = commonpb.ErrorCode_Success
 	resp.DbID = req.GetDbID()
 	resp.CollectionID = req.GetCollectionID()
@@ -110,8 +111,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
 		//assigns = append(assigns, result)
 		//continue
 		//}
-
-		s.cluster.watchIfNeeded(r.ChannelName, r.CollectionID)
+		s.cluster.Watch(r.ChannelName, r.CollectionID)
 
 		segmentID, retCount, expireTs, err := s.segmentManager.AllocSegment(ctx,
 			r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go
index bf4f3d9cdb94b05fe76455b2b36178b56c5da8d2..1775c84528d70d02b6cd0b38067d549713a1a6dd 100644
--- a/internal/datacoord/mock_test.go
+++ b/internal/datacoord/mock_test.go
@@ -16,7 +16,6 @@ import (
 	"time"
 
 	memkv "github.com/milvus-io/milvus/internal/kv/mem"
-	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
 
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
@@ -301,38 +300,3 @@ func (m *mockRootCoordService) SegmentFlushCompleted(ctx context.Context, in *da
 func (m *mockRootCoordService) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error) {
 	panic("not implemented") // TODO: Implement
 }
-
-type mockStartupPolicy struct {
-}
-
-func newMockStartupPolicy() clusterStartupPolicy {
-	return &mockStartupPolicy{}
-}
-
-func (p *mockStartupPolicy) apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
-	return nil, nil
-}
-
-type mockSessionManager struct {
-	ch chan interface{}
-}
-
-func newMockSessionManager(ch chan interface{}) sessionManager {
-	return &mockSessionManager{
-		ch: ch,
-	}
-}
-
-func (m *mockSessionManager) getSession(addr string) (types.DataNode, error) {
-	return newMockDataNodeClient(0, m.ch)
-}
-
-func (m *mockSessionManager) getOrCreateSession(addr string) (types.DataNode, error) {
-	return newMockDataNodeClient(0, m.ch)
-}
-
-func (m *mockSessionManager) releaseSession(addr string) {
-
-}
-func (m *mockSessionManager) release() {
-}
diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go
index 0a2571eaebe70ecbc74b072e72bc19d5bb31734c..ccca3d65091ca8acc88b99e4dfc7eb3c8982a561 100644
--- a/internal/datacoord/policy.go
+++ b/internal/datacoord/policy.go
@@ -27,108 +27,18 @@ type clusterDeltaChange struct {
 	restarts []string
 }
 
-// clusterStartupPolicy defines the behavior when datacoord starts/restarts
-type clusterStartupPolicy interface {
-	// apply accept all nodes and new/offline/restarts nodes and returns datanodes whose status need to be changed
-	apply(oldCluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus)
-}
-
-type watchRestartsStartupPolicy struct {
-}
-
-func newWatchRestartsStartupPolicy() clusterStartupPolicy {
-	return watchRestartStartup
-}
-
-// startup func
-type startupFunc func(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange,
-	buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus)
-
-// implement watchRestartsStartupPolicy for startupFunc
-func (f startupFunc) apply(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange,
-	buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
-	return f(cluster, delta, buffer)
-}
-
-var watchRestartStartup startupFunc = func(cluster map[string]*datapb.DataNodeInfo, delta *clusterDeltaChange,
-	buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
-	ret := make([]*datapb.DataNodeInfo, 0)
-	for _, addr := range delta.restarts {
-		node := cluster[addr]
-		for _, ch := range node.Channels {
-			ch.State = datapb.ChannelWatchState_Uncomplete
-		}
-		ret = append(ret, node)
-	}
-	// put all channels from offline into buffer first
-	for _, addr := range delta.offlines {
-		node := cluster[addr]
-		for _, ch := range node.Channels {
-			ch.State = datapb.ChannelWatchState_Uncomplete
-			buffer = append(buffer, ch)
-		}
-	}
-	// try new nodes first
-	if len(delta.newNodes) > 0 && len(buffer) > 0 {
-		idx := 0
-		for len(buffer) > 0 {
-			node := cluster[delta.newNodes[idx%len(delta.newNodes)]]
-			node.Channels = append(node.Channels, buffer[0])
-			buffer = buffer[1:]
-			if idx < len(delta.newNodes) {
-				ret = append(ret, node)
-			}
-			idx++
-		}
-	}
-	// try online nodes if buffer is not empty
-	if len(buffer) > 0 {
-		online := make([]*datapb.DataNodeInfo, 0, len(cluster))
-		for _, node := range cluster {
-			online = append(online, node)
-		}
-		if len(online) > 0 {
-			idx := 0
-			for len(buffer) > 0 {
-				node := online[idx%len(online)]
-				node.Channels = append(node.Channels, buffer[0])
-				buffer = buffer[1:]
-				if idx < len(online) {
-					ret = append(ret, node)
-				}
-				idx++
-			}
-		}
-	}
-	return ret, buffer
-}
-
-// dataNodeRegisterPolicy defines the behavior when a datanode is registered
-type dataNodeRegisterPolicy interface {
-	// apply accept all online nodes and new created node, returns nodes needed to be changed
-	apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus)
-}
-
 // data node register func, simple func wrapping policy
-type dataNodeRegisterFunc func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus)
-
-// implement dataNodeRegisterPolicy for dataNodeRegisterFunc
-func (f dataNodeRegisterFunc) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo,
-	buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
-	return f(cluster, session, buffer)
-}
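+// dataNodeRegisterPolicy applies when a data node registers, returning the nodes whose
+// channel assignment changed and the remaining unassigned channel buffer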
+type dataNodeRegisterPolicy func(cluster []*NodeInfo, session *NodeInfo, buffer []*datapb.ChannelStatus) ([]*NodeInfo, []*datapb.ChannelStatus)
 
 // test logic, register and do nothing
-var emptyRegister dataNodeRegisterFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo,
-	buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
-	return []*datapb.DataNodeInfo{session}, buffer
+var emptyRegister dataNodeRegisterPolicy = func(cluster []*NodeInfo, session *NodeInfo, buffer []*datapb.ChannelStatus) ([]*NodeInfo, []*datapb.ChannelStatus) {
+	return []*NodeInfo{session}, buffer
 }
 
 // assign existing buffered channels into newly registered data node session
-var registerAssignWithBuffer dataNodeRegisterFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo,
-	buffer []*datapb.ChannelStatus) ([]*datapb.DataNodeInfo, []*datapb.ChannelStatus) {
-	session.Channels = append(session.Channels, buffer...)
-	return []*datapb.DataNodeInfo{session}, []*datapb.ChannelStatus{}
+var registerAssignWithBuffer dataNodeRegisterPolicy = func(cluster []*NodeInfo, session *NodeInfo, buffer []*datapb.ChannelStatus) ([]*NodeInfo, []*datapb.ChannelStatus) {
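+	// assign all buffered channels to the newly registered node via an immutable Clone and drain the buffer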
+	node := session.Clone(AddChannels(buffer))
+	return []*NodeInfo{node}, []*datapb.ChannelStatus{}
 }
 
 func newEmptyRegisterPolicy() dataNodeRegisterPolicy {
@@ -139,41 +49,35 @@ func newAssiggBufferRegisterPolicy() dataNodeRegisterPolicy {
 	return registerAssignWithBuffer
 }
 
-// dataNodeUnregisterPolicy defines the behavior when datanode unregisters
-type dataNodeUnregisterPolicy interface {
-	// apply accept all online nodes and unregistered node, returns nodes needed to be changed
-	apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo
-}
-
 // unregisterNodeFunc, short cut for functions implement policy
-type unregisterNodeFunc func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo
-
-// implement dataNodeUnregisterPolicy for unregisterNodeFunc
-func (f unregisterNodeFunc) apply(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
-	return f(cluster, session)
-}
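+// dataNodeUnregisterPolicy applies when a data node goes offline, returning the nodes
+// whose channel assignment needs to be updated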
+type dataNodeUnregisterPolicy func(cluster []*NodeInfo, session *NodeInfo) []*NodeInfo
 
 // test logic, do nothing when node unregister
-var emptyUnregisterFunc unregisterNodeFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
+var emptyUnregisterFunc dataNodeUnregisterPolicy = func(cluster []*NodeInfo, session *NodeInfo) []*NodeInfo {
 	return nil
 }
 
 // randomly assign channels from unregistered node into existing nodes
 // if there is no nodes online, this func will not be invoked, buffer will be filled outside this func
-var randomAssignRegisterFunc unregisterNodeFunc = func(cluster map[string]*datapb.DataNodeInfo, session *datapb.DataNodeInfo) []*datapb.DataNodeInfo {
+var randomAssignRegisterFunc dataNodeUnregisterPolicy = func(cluster []*NodeInfo, session *NodeInfo) []*NodeInfo {
 	if len(cluster) == 0 || // no available node
 		session == nil ||
-		len(session.Channels) == 0 { // lost node not watching any channels
-		return []*datapb.DataNodeInfo{}
+		len(session.info.GetChannels()) == 0 { // lost node not watching any channels
+		return []*NodeInfo{}
 	}
 
-	appliedNodes := make([]*datapb.DataNodeInfo, 0, len(session.Channels))
+	appliedNodes := make([]*NodeInfo, 0, len(session.info.GetChannels()))
+	channels := session.info.GetChannels()
+	// clear unregistered node's channels
+	node := session.Clone(SetChannels(nil))
+	appliedNodes = append(appliedNodes, node)
+
 	raResult := make(map[int][]*datapb.ChannelStatus)
-	for _, chanSt := range session.Channels {
+	for _, chanSt := range channels {
 		bIdx, err := rand.Int(rand.Reader, big.NewInt(int64(len(cluster))))
 		if err != nil {
 			log.Error("error generated rand idx", zap.Error(err))
-			return []*datapb.DataNodeInfo{}
+			return []*NodeInfo{}
 		}
 		idx := bIdx.Int64()
 		if int(idx) >= len(cluster) {
@@ -193,11 +97,10 @@ var randomAssignRegisterFunc unregisterNodeFunc = func(cluster map[string]*datap
 		cs, ok := raResult[i]
 		i++
 		if ok {
-			node.Channels = append(node.Channels, cs...)
-			appliedNodes = append(appliedNodes, node)
+			n := node.Clone(AddChannels(cs))
+			appliedNodes = append(appliedNodes, n)
 		}
 	}
-
 	return appliedNodes
 }
 
@@ -205,27 +108,16 @@ func newEmptyUnregisterPolicy() dataNodeUnregisterPolicy {
 	return emptyUnregisterFunc
 }
 
-// channelAssignPolicy defines the behavior when a new channel needs to be assigned
-type channelAssignPolicy interface {
-	// apply accept all online nodes and new created channel with collectionID, returns node needed to be changed
-	apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo
-}
-
 // channelAssignFunc, function shortcut for policy
-type channelAssignFunc func(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo
-
-// implement channelAssignPolicy for channelAssign func
-func (f channelAssignFunc) apply(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
-	return f(cluster, channel, collectionID)
-}
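+// channelAssignPolicy picks the nodes that shall watch the given channel of the collection,
+// returning them with the channel appended in Uncomplete state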
+type channelAssignPolicy func(cluster []*NodeInfo, channel string, collectionID UniqueID) []*NodeInfo
 
 // deprecated
 // test logic, assign channel to all existing data node, works fine only when there is only one data node!
-var assignAllFunc channelAssignFunc = func(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
-	ret := make([]*datapb.DataNodeInfo, 0)
+var assignAllFunc channelAssignPolicy = func(cluster []*NodeInfo, channel string, collectionID UniqueID) []*NodeInfo {
+	ret := make([]*NodeInfo, 0)
 	for _, node := range cluster {
 		has := false
-		for _, ch := range node.Channels {
+		for _, ch := range node.info.GetChannels() {
 			if ch.Name == channel {
 				has = true
 				break
@@ -234,45 +126,47 @@ var assignAllFunc channelAssignFunc = func(cluster map[string]*datapb.DataNodeIn
 		if has {
 			continue
 		}
-		node.Channels = append(node.Channels, &datapb.ChannelStatus{
+		c := &datapb.ChannelStatus{
 			Name:         channel,
 			State:        datapb.ChannelWatchState_Uncomplete,
 			CollectionID: collectionID,
-		})
-		ret = append(ret, node)
+		}
+		n := node.Clone(AddChannels([]*datapb.ChannelStatus{c}))
+		ret = append(ret, n)
 	}
 
 	return ret
 }
 
 // balanced assign channel, select the datanode with least amount of channels to assign
-var balancedAssignFunc channelAssignFunc = func(cluster map[string]*datapb.DataNodeInfo, channel string, collectionID UniqueID) []*datapb.DataNodeInfo {
+var balancedAssignFunc channelAssignPolicy = func(cluster []*NodeInfo, channel string, collectionID UniqueID) []*NodeInfo {
 	if len(cluster) == 0 {
-		return []*datapb.DataNodeInfo{}
+		return []*NodeInfo{}
 	}
 	// filter existed channel
 	for _, node := range cluster {
-		for _, c := range node.GetChannels() {
+		for _, c := range node.info.GetChannels() {
 			if c.GetName() == channel && c.GetCollectionID() == collectionID {
 				return nil
 			}
 		}
 	}
-	target, min := "", math.MaxInt32
+	target, min := -1, math.MaxInt32
 	for k, v := range cluster {
-		if len(v.GetChannels()) < min {
+		if len(v.info.GetChannels()) < min {
 			target = k
-			min = len(v.GetChannels())
+			min = len(v.info.GetChannels())
 		}
 	}
 
-	ret := make([]*datapb.DataNodeInfo, 0)
-	cluster[target].Channels = append(cluster[target].Channels, &datapb.ChannelStatus{
+	ret := make([]*NodeInfo, 0)
+	c := &datapb.ChannelStatus{
 		Name:         channel,
 		State:        datapb.ChannelWatchState_Uncomplete,
 		CollectionID: collectionID,
-	})
-	ret = append(ret, cluster[target])
+	}
+	n := cluster[target].Clone(AddChannels([]*datapb.ChannelStatus{c}))
+	ret = append(ret, n)
 	return ret
 }
 
diff --git a/internal/datacoord/policy_test.go b/internal/datacoord/policy_test.go
index 546bccdf1df1f8492999bfcadb254b1599b51129..99e51243319ddda7141dbcdf95846546e2d56f28 100644
--- a/internal/datacoord/policy_test.go
+++ b/internal/datacoord/policy_test.go
@@ -12,96 +12,60 @@
 package datacoord
 
 import (
+	"context"
 	"testing"
 
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/stretchr/testify/assert"
 )
 
-func TestWatchRestartsPolicy(t *testing.T) {
-	p := newWatchRestartsStartupPolicy()
-	c := make(map[string]*datapb.DataNodeInfo)
-	c["localhost:1111"] = &datapb.DataNodeInfo{
-		Address: "localhost:1111",
-		Version: 0,
-		Channels: []*datapb.ChannelStatus{
-			{
-				Name:         "vch1",
-				State:        datapb.ChannelWatchState_Complete,
-				CollectionID: 0,
-			},
-		},
-	}
-
-	c["localhost:2222"] = &datapb.DataNodeInfo{
-		Address: "localhost:2222",
-		Version: 0,
-		Channels: []*datapb.ChannelStatus{
-			{
-				Name:         "vch2",
-				State:        datapb.ChannelWatchState_Complete,
-				CollectionID: 0,
-			},
-		},
-	}
-
-	dchange := &clusterDeltaChange{
-		newNodes: []string{},
-		offlines: []string{},
-		restarts: []string{"localhost:2222"},
-	}
-
-	nodes, _ := p.apply(c, dchange, []*datapb.ChannelStatus{})
-	assert.EqualValues(t, 1, len(nodes))
-	assert.EqualValues(t, datapb.ChannelWatchState_Uncomplete, nodes[0].Channels[0].State)
-}
-
 func TestRandomReassign(t *testing.T) {
 	p := randomAssignRegisterFunc
 
-	clusters := make(map[string]*datapb.DataNodeInfo)
-	clusters["addr1"] = &datapb.DataNodeInfo{
+	clusters := make([]*NodeInfo, 0)
+	info1 := &datapb.DataNodeInfo{
 		Address:  "addr1",
 		Channels: make([]*datapb.ChannelStatus, 0, 10),
 	}
-	clusters["addr2"] = &datapb.DataNodeInfo{
+	info2 := &datapb.DataNodeInfo{
 		Address:  "addr2",
 		Channels: make([]*datapb.ChannelStatus, 0, 10),
 	}
-	clusters["addr3"] = &datapb.DataNodeInfo{
+	info3 := &datapb.DataNodeInfo{
 		Address:  "addr3",
 		Channels: make([]*datapb.ChannelStatus, 0, 10),
 	}
 
-	cases := []*datapb.DataNodeInfo{
-		{
-			Channels: []*datapb.ChannelStatus{},
-		},
-		{
-			Channels: []*datapb.ChannelStatus{
-				{Name: "VChan1", CollectionID: 1},
-				{Name: "VChan2", CollectionID: 2},
-			},
-		},
-		{
-			Channels: []*datapb.ChannelStatus{
-				{Name: "VChan3", CollectionID: 1},
-				{Name: "VChan4", CollectionID: 2},
-			},
+	node1 := NewNodeInfo(context.TODO(), info1)
+	node2 := NewNodeInfo(context.TODO(), info2)
+	node3 := NewNodeInfo(context.TODO(), info3)
+	clusters = append(clusters, node1, node2, node3)
+
+	caseInfo1 := &datapb.DataNodeInfo{
+		Channels: []*datapb.ChannelStatus{},
+	}
+	caseInfo2 := &datapb.DataNodeInfo{
+		Channels: []*datapb.ChannelStatus{
+			{Name: "VChan1", CollectionID: 1},
+			{Name: "VChan2", CollectionID: 2},
 		},
+	}
+	cases := []*NodeInfo{
+		{info: caseInfo1},
+		{info: caseInfo2},
 		nil,
 	}
 
 	for _, ca := range cases {
-		nodes := p.apply(clusters, ca)
-		if ca == nil || len(ca.Channels) == 0 {
+		nodes := p(clusters, ca)
+		if ca == nil || len(ca.info.GetChannels()) == 0 {
 			assert.Equal(t, 0, len(nodes))
 		} else {
-			for _, ch := range ca.Channels {
+			for _, ch := range ca.info.GetChannels() {
 				found := false
 			loop:
 				for _, node := range nodes {
-					for _, nch := range node.Channels {
+					for _, nch := range node.info.GetChannels() {
 						if nch.Name == ch.Name {
 							found = true
 							assert.EqualValues(t, datapb.ChannelWatchState_Uncomplete, nch.State)
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index efb4687464712b96e1cde4d20848df8c6594b53f..ccc161f769906b2d4720a49688948d02a902431b 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -75,7 +75,7 @@ type Server struct {
 	meta            *meta
 	segmentManager  Manager
 	allocator       allocator
-	cluster         *cluster
+	cluster         *Cluster
 	rootCoordClient types.RootCoord
 	ddChannelName   string
 
@@ -169,13 +169,9 @@ func (s *Server) Start() error {
 }
 
 func (s *Server) initCluster() error {
-	dManager, err := newClusterNodeManager(s.kvClient)
-	if err != nil {
-		return err
-	}
-	sManager := newClusterSessionManager(s.ctx, s.dataClientCreator)
-	s.cluster = newCluster(s.ctx, dManager, sManager, s)
-	return nil
+	var err error
+	s.cluster, err = NewCluster(s.ctx, s.kvClient, NewNodesInfo(), s)
+	return err
 }
 
 func (s *Server) initServiceDiscovery() error {
@@ -186,19 +182,18 @@ func (s *Server) initServiceDiscovery() error {
 	}
 	log.Debug("registered sessions", zap.Any("sessions", sessions))
 
-	datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
+	datanodes := make([]*NodeInfo, 0, len(sessions))
 	for _, session := range sessions {
-		datanodes = append(datanodes, &datapb.DataNodeInfo{
+		info := &datapb.DataNodeInfo{
 			Address:  session.Address,
 			Version:  session.ServerID,
 			Channels: []*datapb.ChannelStatus{},
-		})
+		}
+		nodeInfo := NewNodeInfo(s.ctx, info)
+		datanodes = append(datanodes, nodeInfo)
 	}
 
-	if err := s.cluster.startup(datanodes); err != nil {
-		log.Debug("DataCoord loadMetaFromRootCoord failed", zap.Error(err))
-		return err
-	}
+	s.cluster.Startup(datanodes)
 
 	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
 	return nil
@@ -349,7 +344,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
 				segmentInfos = append(segmentInfos, sInfo)
 			}
 			if len(segmentInfos) > 0 {
-				s.cluster.flush(segmentInfos)
+				s.cluster.Flush(segmentInfos)
 			}
 			s.segmentManager.ExpireAllocations(ch, ts)
 		}
@@ -365,24 +360,23 @@ func (s *Server) startWatchService(ctx context.Context) {
 			log.Debug("watch service shutdown")
 			return
 		case event := <-s.eventCh:
-			datanode := &datapb.DataNodeInfo{
+			info := &datapb.DataNodeInfo{
 				Address:  event.Session.Address,
 				Version:  event.Session.ServerID,
 				Channels: []*datapb.ChannelStatus{},
 			}
+			node := NewNodeInfo(ctx, info)
 			switch event.EventType {
 			case sessionutil.SessionAddEvent:
 				log.Info("Received datanode register",
-					zap.String("address", datanode.Address),
-					zap.Int64("serverID", datanode.Version))
-				//s.cluster.register(datanode)
-				s.cluster.refresh(s.loadDataNodes())
+					zap.String("address", info.Address),
+					zap.Int64("serverID", info.Version))
+				s.cluster.Register(node)
 			case sessionutil.SessionDelEvent:
 				log.Info("Received datanode unregister",
-					zap.String("address", datanode.Address),
-					zap.Int64("serverID", datanode.Version))
-				//s.cluster.unregister(datanode)
-				s.cluster.refresh(s.loadDataNodes())
+					zap.String("address", info.Address),
+					zap.Int64("serverID", info.Version))
+				s.cluster.UnRegister(node)
 			default:
 				log.Warn("receive unknown service event type",
 					zap.Any("type", event.EventType))
@@ -482,7 +476,7 @@ func (s *Server) Stop() error {
 	}
 	log.Debug("DataCoord server shutdown")
 	atomic.StoreInt64(&s.isServing, ServerStateStopped)
-	s.cluster.releaseSessions()
+	s.cluster.Close()
 	s.stopServerLoop()
 	return nil
 }
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index 7fcb10aeb4c3768d74e68ab86ce5993dacd754ff..31a0b470935b28088b8e842165d64711ca78e5ef 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -498,8 +498,7 @@ func TestDataNodeTtChannel(t *testing.T) {
 			},
 		}
 	}
-	svr.cluster.sessionManager.getOrCreateSession("localhost:7777") // trigger create session manually
-	svr.cluster.register(&datapb.DataNodeInfo{
+	info := &datapb.DataNodeInfo{
 		Address: "localhost:7777",
 		Version: 0,
 		Channels: []*datapb.ChannelStatus{
@@ -508,7 +507,11 @@ func TestDataNodeTtChannel(t *testing.T) {
 				State: datapb.ChannelWatchState_Complete,
 			},
 		},
-	})
+	}
+	node := NewNodeInfo(context.TODO(), info)
+	node.client, err = newMockDataNodeClient(1, ch)
+	assert.Nil(t, err)
+	svr.cluster.Register(node)
 
 	t.Run("Test segment flush after tt", func(t *testing.T) {
 		resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
diff --git a/internal/datacoord/stream_pos.go b/internal/datacoord/stream_pos.go
deleted file mode 100644
index 8a242c442e86c97a643ccaa8ae0d1724b712456a..0000000000000000000000000000000000000000
--- a/internal/datacoord/stream_pos.go
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright (C) 2019-2020 Zilliz. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software distributed under the License
-// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
-// or implied. See the License for the specific language governing permissions and limitations under the License.
-
-package datacoord
-
-import (
-	"errors"
-
-	"github.com/golang/protobuf/proto"
-	"github.com/milvus-io/milvus/internal/msgstream"
-)
-
-type streamType int
-
-const (
-	_ streamType = iota
-	streamTypeFlush
-	streamTypeStats
-)
-
-var (
-	errInvalidStreamType = errors.New("invalid stream type")
-)
-
-// storeStreamPos store current processed stream pos
-func (s *Server) storeStreamPos(st streamType, pos *msgstream.MsgPosition) error {
-	key := s.streamTypeSubKey(st)
-	if key == "" {
-		return errInvalidStreamType
-	}
-	val := proto.MarshalTextString(pos)
-	err := s.kvClient.Save(key, val)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-// loadStreamLastPos load last successful pos with specified stream type
-func (s *Server) loadStreamLastPos(st streamType) (pos *msgstream.MsgPosition, err error) {
-	key := s.streamTypeSubKey(st)
-	if key == "" {
-		return nil, errInvalidStreamType
-	}
-	var val string
-	pos = &msgstream.MsgPosition{}
-	val, err = s.kvClient.Load(key)
-	if err != nil {
-		return pos, err
-	}
-	err = proto.UnmarshalText(val, pos)
-	return pos, err
-}
-
-// streamTypeSubKey converts stream type to corresponding k-v store key
-func (s *Server) streamTypeSubKey(st streamType) string {
-	switch st {
-	case streamTypeFlush:
-		return Params.FlushStreamPosSubPath
-	case streamTypeStats:
-		return Params.StatsStreamPosSubPath
-	default:
-		return ""
-	}
-}