diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go
index a683e4a28f79e99bc612ede3cbade3305ec34399..e1886b77aaaf0b7e7d58de657b2e13b5ea58676e 100644
--- a/cmd/masterservice/main.go
+++ b/cmd/masterservice/main.go
@@ -36,9 +36,6 @@ func main() {
 	psc.Params.Init()
 	log.Printf("proxy service address : %s", psc.Params.ServiceAddress)
 	proxyService := psc.NewClient(psc.Params.ServiceAddress)
-	if err = proxyService.Init(); err != nil {
-		panic(err)
-	}
 
 	for cnt = 0; cnt < reTryCnt; cnt++ {
 		pxStates, err := proxyService.GetComponentStates()
diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go
index 48827b965be801df7f105dcbcdad57a77548c0fb..83400a8ea206af8f18e99faaab15545048f5d81a 100644
--- a/internal/datanode/allocator.go
+++ b/internal/datanode/allocator.go
@@ -24,7 +24,7 @@ func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl {
 func (alloc *allocatorImpl) allocID() (UniqueID, error) {
 	resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{
 		Base: &commonpb.MsgBase{
-			MsgType:   commonpb.MsgType_kRequestID,
+			MsgType:   commonpb.MsgType_kShowCollections,
 			MsgID:     1, // GOOSE TODO
 			Timestamp: 0, // GOOSE TODO
 			SourceID:  Params.NodeID,
diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go
index 7de7101c190304d4e28d5bdac0ee70672e22dcb9..1cbdf38d3c6c994ae577548db6819518fb4772f3 100644
--- a/internal/datanode/data_node_test.go
+++ b/internal/datanode/data_node_test.go
@@ -1,15 +1,20 @@
 package datanode
 
 import (
+	"context"
+	"fmt"
 	"log"
 	"math/rand"
 	"os"
 	"strconv"
 	"testing"
+	"time"
 
 	"go.etcd.io/etcd/clientv3"
+	"go.uber.org/zap"
 
 	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
+	"github.com/zilliztech/milvus-distributed/internal/master"
 )
 
 func makeNewChannelNames(names []string, suffix string) []string {
@@ -31,10 +36,52 @@ func refreshChannelNames() {
 	Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix)
 }
 
+func startMaster(ctx context.Context) {
+	master.Init()
+	etcdAddr := master.Params.EtcdAddress
+	metaRootPath := master.Params.MetaRootPath
+
+	etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
+	if err != nil {
+		panic(err)
+	}
+	_, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix())
+	if err != nil {
+		panic(err)
+	}
+
+	masterPort := 53101
+	master.Params.Port = masterPort
+	svr, err := master.CreateServer(ctx)
+	if err != nil {
+		log.Fatal("create server failed", zap.Error(err))
+	}
+	if err := svr.Run(int64(master.Params.Port)); err != nil {
+		log.Fatal("run server failed", zap.Error(err))
+	}
+
+	fmt.Println("Waiting for server!", svr.IsServing())
+	Params.MasterAddress = master.Params.Address + ":" + strconv.Itoa(masterPort)
+}
+
 func TestMain(m *testing.M) {
 	Params.Init()
 
 	refreshChannelNames()
+	const ctxTimeInMillisecond = 2000
+	const closeWithDeadline = true
+	var ctx context.Context
+
+	if closeWithDeadline {
+		var cancel context.CancelFunc
+		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
+		ctx, cancel = context.WithDeadline(context.Background(), d)
+		defer cancel()
+	} else {
+		ctx = context.Background()
+	}
+
+	startMaster(ctx)
 	exitCode := m.Run()
 	os.Exit(exitCode)
 }
diff --git a/internal/datanode/meta_table.go b/internal/datanode/meta_table.go
index 6a9128ff8690a40e149db17f082ad3a21b2c2e6a..898a6aa64a0426d5f578d32e1a808882f953d8c6 100644
--- a/internal/datanode/meta_table.go
+++ b/internal/datanode/meta_table.go
@@ -12,9 +12,9 @@ import (
 )
 
 type metaTable struct {
-	client          kv.Base //
+	client          kv.TxnBase //
 	segID2FlushMeta map[UniqueID]*datapb.SegmentFlushMeta
-	collID2DdlMeta  map[UniqueID]*datapb.DDLFlushMeta
+	collID2DdlMeta  map[UniqueID]*datapb.DDLFlushMeta // GOOSE TODO: addDDLFlush and hasDDLFlush
 
 	lock sync.RWMutex
 }
@@ -36,6 +36,24 @@ func NewMetaTable(kv kv.TxnBase) (*metaTable, error) {
 	return mt, nil
 }
 
+func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error {
+	mt.lock.Lock()
+	defer mt.lock.Unlock()
+
+	_, ok := mt.collID2DdlMeta[collID]
+	if !ok {
+		mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{
+			CollectionID: collID,
+			BinlogPaths:  make([]string, 0),
+		}
+	}
+
+	meta := mt.collID2DdlMeta[collID]
+	meta.BinlogPaths = append(meta.BinlogPaths, paths...)
+
+	return mt.saveDDLFlushMeta(meta)
+}
+
 func (mt *metaTable) AppendSegBinlogPaths(segmentID UniqueID, fieldID int64, dataPaths []string) error {
 	_, ok := mt.segID2FlushMeta[segmentID]
 	if !ok {
@@ -79,23 +97,31 @@ func (mt *metaTable) CompleteFlush(segmentID UniqueID) error {
 	return mt.saveSegFlushMeta(meta)
 }
 
-func (mt *metaTable) reloadSegMetaFromKV() error {
-	mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta)
+// The caller must hold metaTable.lock before calling this function.
+func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error {
+	value := proto.MarshalTextString(meta)
 
-	_, values, err := mt.client.LoadWithPrefix(Params.SegFlushMetaSubPath)
+	mt.collID2DdlMeta[meta.CollectionID] = meta
+	prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10))
+
+	return mt.client.Save(prefix, value)
+}
+
+func (mt *metaTable) reloadDdlMetaFromKV() error {
+	mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta)
+	_, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath)
 	if err != nil {
 		return err
 	}
 
 	for _, value := range values {
-		flushMeta := &datapb.SegmentFlushMeta{}
-		err = proto.UnmarshalText(value, flushMeta)
+		ddlMeta := &datapb.DDLFlushMeta{}
+		err = proto.UnmarshalText(value, ddlMeta)
 		if err != nil {
 			return err
 		}
-		mt.segID2FlushMeta[flushMeta.SegmentID] = flushMeta
+		mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta
 	}
-
 	return nil
 }
 
@@ -109,6 +135,26 @@ func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error {
 	return mt.client.Save(prefix, value)
 }
 
+func (mt *metaTable) reloadSegMetaFromKV() error {
+	mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta)
+
+	_, values, err := mt.client.LoadWithPrefix(Params.SegFlushMetaSubPath)
+	if err != nil {
+		return err
+	}
+
+	for _, value := range values {
+		flushMeta := &datapb.SegmentFlushMeta{}
+		err = proto.UnmarshalText(value, flushMeta)
+		if err != nil {
+			return err
+		}
+		mt.segID2FlushMeta[flushMeta.SegmentID] = flushMeta
+	}
+
+	return nil
+}
+
 func (mt *metaTable) addSegmentFlush(segmentID UniqueID) error {
 	mt.lock.Lock()
 	defer mt.lock.Unlock()
@@ -151,61 +197,6 @@ func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string,
 	return ret, nil
 }
 
-// --- DDL ---
-func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error {
-	mt.lock.Lock()
-	defer mt.lock.Unlock()
-
-	_, ok := mt.collID2DdlMeta[collID]
-	if !ok {
-		mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{
-			CollectionID: collID,
-			BinlogPaths:  make([]string, 0),
-		}
-	}
-
-	meta := mt.collID2DdlMeta[collID]
-	meta.BinlogPaths = append(meta.BinlogPaths, paths...)
-
-	return mt.saveDDLFlushMeta(meta)
-}
-
-func (mt *metaTable) hasDDLFlushMeta(collID UniqueID) bool {
-	mt.lock.RLock()
-	defer mt.lock.RUnlock()
-
-	_, ok := mt.collID2DdlMeta[collID]
-	return ok
-}
-
-// metaTable.lock.Lock() before call this function
-func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error {
-	value := proto.MarshalTextString(meta)
-
-	mt.collID2DdlMeta[meta.CollectionID] = meta
-	prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10))
-
-	return mt.client.Save(prefix, value)
-}
-
-func (mt *metaTable) reloadDdlMetaFromKV() error {
-	mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta)
-	_, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath)
-	if err != nil {
-		return err
-	}
-
-	for _, value := range values {
-		ddlMeta := &datapb.DDLFlushMeta{}
-		err = proto.UnmarshalText(value, ddlMeta)
-		if err != nil {
-			return err
-		}
-		mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta
-	}
-	return nil
-}
-
 func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, error) {
 	mt.lock.RLock()
 	defer mt.lock.RUnlock()
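
A minimal sketch, separate from the patch, of the key/value layout the new saveDDLFlushMeta produces: one key per collection under Params.DDLFlushMetaSubPath, holding the text-marshaled DDLFlushMeta, so every AppendDDLBinlogPaths call rewrites that collection's single entry. The prefix below is a hypothetical stand-in.

package main

import (
	"fmt"
	"path"
	"strconv"
)

func main() {
	// Hypothetical value; the real prefix comes from Params.DDLFlushMetaSubPath.
	const ddlFlushMetaSubPath = "datanode/ddl_flush_meta"

	collID := int64(301)
	key := path.Join(ddlFlushMetaSubPath, strconv.FormatInt(collID, 10))
	// The value stored under this key is proto.MarshalTextString(&datapb.DDLFlushMeta{...}),
	// i.e. the full, cumulative list of DDL binlog paths for the collection.
	fmt.Println(key) // datanode/ddl_flush_meta/301
}
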
diff --git a/internal/datanode/meta_table_test.go b/internal/datanode/meta_table_test.go
index 247cbff51f82542f7ca7e5597787686abfb856e7..dd5a4251dc5baaf8c98a9727e5d4165d76edc187 100644
--- a/internal/datanode/meta_table_test.go
+++ b/internal/datanode/meta_table_test.go
@@ -1,16 +1,26 @@
 package datanode
 
 import (
+	"context"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
-	memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
+	"github.com/stretchr/testify/require"
+	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
+	"go.etcd.io/etcd/clientv3"
 )
 
-func TestMetaTable_SegmentFlush(t *testing.T) {
+func TestMetaTable_all(t *testing.T) {
 
-	kvMock := memkv.NewMemoryKV()
-	meta, err := NewMetaTable(kvMock)
+	etcdAddr := Params.EtcdAddress
+	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
+	require.NoError(t, err)
+	etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/meta/root")
+
+	_, err = cli.Delete(context.TODO(), "/etcd/test/meta/root", clientv3.WithPrefix())
+	require.NoError(t, err)
+
+	meta, err := NewMetaTable(etcdKV)
 	assert.NoError(t, err)
 	defer meta.client.Close()
 
@@ -55,37 +65,8 @@ func TestMetaTable_SegmentFlush(t *testing.T) {
 			ret)
 	})
 
-	t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) {
-
-		var segmentID UniqueID = 401
-
-		err := meta.addSegmentFlush(segmentID)
-		assert.NoError(t, err)
-
-		ret, err := meta.checkFlushComplete(segmentID)
-		assert.NoError(t, err)
-		assert.Equal(t, false, ret)
-
-		meta.CompleteFlush(segmentID)
-
-		ret, err = meta.checkFlushComplete(segmentID)
-		assert.NoError(t, err)
-		assert.Equal(t, true, ret)
-	})
-
-}
-
-func TestMetaTable_DDLFlush(t *testing.T) {
-	kvMock := memkv.NewMemoryKV()
-	meta, err := NewMetaTable(kvMock)
-	assert.NoError(t, err)
-	defer meta.client.Close()
-
 	t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) {
 
-		assert.False(t, meta.hasDDLFlushMeta(301))
-		assert.False(t, meta.hasDDLFlushMeta(302))
-
 		collID2Paths := map[UniqueID][]string{
 			301: {"a", "b", "c"},
 			302: {"c", "b", "a"},
@@ -103,8 +84,24 @@ func TestMetaTable_DDLFlush(t *testing.T) {
 			assert.Nil(t, err)
 			assert.Equal(t, map[UniqueID][]string{k: v}, ret)
 		}
+	})
+
+	t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) {
+
+		var segmentID UniqueID = 401
+
+		err := meta.addSegmentFlush(segmentID)
+		assert.NoError(t, err)
+
+		ret, err := meta.checkFlushComplete(segmentID)
+		assert.NoError(t, err)
+		assert.Equal(t, false, ret)
 
-		assert.True(t, meta.hasDDLFlushMeta(301))
-		assert.True(t, meta.hasDDLFlushMeta(302))
+		meta.CompleteFlush(segmentID)
+
+		ret, err = meta.checkFlushComplete(segmentID)
+		assert.NoError(t, err)
+		assert.Equal(t, true, ret)
 	})
+
 }
diff --git a/internal/dataservice/channel.go b/internal/dataservice/channel.go
index fb9a2c6a10d094bd7be09fb82c8a9dfca7232f65..e88b9030b0f1ab4480989aa6fd6aed35ce795aeb 100644
--- a/internal/dataservice/channel.go
+++ b/internal/dataservice/channel.go
@@ -49,13 +49,14 @@ func (cm *insertChannelManager) AllocChannels(collectionID UniqueID, groupNum in
 			group = make([]string, m)
 		}
 		for k := 0; k < len(group); k++ {
-			group = append(group, Params.InsertChannelPrefixName+strconv.Itoa(cm.count))
+			group[k] = Params.InsertChannelPrefixName + strconv.Itoa(cm.count)
 			cm.count++
 		}
 		i += int64(len(group))
 		j++
 		cg = append(cg, group)
 	}
+	cm.channelGroups[collectionID] = cg
 	return cg, nil
 }
 
diff --git a/internal/dataservice/channel_test.go b/internal/dataservice/channel_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..e05884c3681c9aa0a6334232d99effa46049dc27
--- /dev/null
+++ b/internal/dataservice/channel_test.go
@@ -0,0 +1,38 @@
+package dataservice
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestChannelAllocation(t *testing.T) {
+	Params.Init()
+	Params.InsertChannelNumPerCollection = 4
+	manager := newInsertChannelManager()
+	cases := []struct {
+		collectionID   UniqueID
+		groupNum       int
+		expectGroupNum int
+		success        bool
+	}{
+		{1, 4, 4, true},
+		{1, 4, 4, false},
+		{2, 1, 1, true},
+		{3, 5, 4, true},
+	}
+	for _, c := range cases {
+		channels, err := manager.AllocChannels(c.collectionID, c.groupNum)
+		if !c.success {
+			assert.NotNil(t, err)
+			continue
+		}
+		assert.Nil(t, err)
+		assert.EqualValues(t, c.expectGroupNum, len(channels))
+		total := 0
+		for _, channel := range channels {
+			total += len(channel)
+		}
+		assert.EqualValues(t, Params.InsertChannelNumPerCollection, total)
+	}
+}
diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go
index a7434d108a90ab19f81565023bc44dfc14656cf5..9964b345045c98fe64ee28934fd405f5294e1619 100644
--- a/internal/dataservice/cluster.go
+++ b/internal/dataservice/cluster.go
@@ -64,8 +64,8 @@ func (c *dataNodeCluster) GetNodeIDs() []int64 {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	ret := make([]int64, len(c.nodes))
-	for _, node := range c.nodes {
-		ret = append(ret, node.id)
+	for i, node := range c.nodes {
+		ret[i] = node.id
 	}
 	return ret
 }
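
Both this hunk and the channel.go change above fix the same pitfall, also corrected in server.go below: appending into a slice created with make([]T, n) grows it past n and leaves zero-value entries at the front. A minimal sketch, separate from the patch:

package main

import "fmt"

func main() {
	wrong := make([]string, 3)
	for i := 0; i < 3; i++ {
		wrong = append(wrong, fmt.Sprintf("ch-%d", i)) // grows the slice instead of filling it
	}
	fmt.Println(len(wrong), wrong) // len 6, first three entries are empty strings

	right := make([]string, 3)
	for i := range right {
		right[i] = fmt.Sprintf("ch-%d", i) // fills the pre-sized slice in place
	}
	fmt.Println(len(right), right) // 3 [ch-0 ch-1 ch-2]
}
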
diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go
index 53e81b25d98cbd5a09a6624acf3c684457c3679d..f9d13892a3eeec9b77d52f379a4a3c404eac7b51 100644
--- a/internal/dataservice/segment_allocator.go
+++ b/internal/dataservice/segment_allocator.go
@@ -84,6 +84,8 @@ func newSegmentAllocator(meta *meta, allocator allocator) (*segmentAllocatorImpl
 }
 
 func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
+	allocator.mu.Lock()
+	defer allocator.mu.Unlock()
 	if _, ok := allocator.segments[segmentInfo.SegmentID]; ok {
 		return fmt.Errorf("segment %d already exist", segmentInfo.SegmentID)
 	}
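
The two added lines make OpenSegment guard the shared segments map with the allocator's mutex, so concurrent callers cannot race on it. A minimal sketch of the pattern with simplified, hypothetical types, separate from the patch:

package main

import (
	"fmt"
	"sync"
)

type allocatorSketch struct {
	mu       sync.RWMutex
	segments map[int64]string
}

func (a *allocatorSketch) OpenSegment(id int64, name string) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	if _, ok := a.segments[id]; ok {
		return fmt.Errorf("segment %d already exists", id)
	}
	a.segments[id] = name
	return nil
}

func main() {
	a := &allocatorSketch{segments: map[int64]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int64) {
			defer wg.Done()
			_ = a.OpenSegment(n%2, fmt.Sprintf("seg-%d", n)) // safe under -race thanks to the mutex
		}(int64(i))
	}
	wg.Wait()
	fmt.Println(len(a.segments)) // 2
}
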
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index ec54db9403b45541bffbca92c56f69ad00576a44..562653fb8850c5d24fe46016c7cffa2c30b528c3 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -652,9 +652,9 @@ func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat
 	}
 	fields := make([]UniqueID, len(flushMeta.Fields))
 	paths := make([]*internalpb2.StringList, len(flushMeta.Fields))
-	for _, field := range flushMeta.Fields {
-		fields = append(fields, field.FieldID)
-		paths = append(paths, &internalpb2.StringList{Values: field.BinlogPaths})
+	for i, field := range flushMeta.Fields {
+		fields[i] = field.FieldID
+		paths[i] = &internalpb2.StringList{Values: field.BinlogPaths}
 	}
 	resp.FieldIDs = fields
 	resp.Paths = paths
@@ -674,7 +674,7 @@ func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string,
 		return nil, err
 	}
 
-	channels := make([]string, Params.InsertChannelNumPerCollection)
+	channels := make([]string, 0)
 	for _, group := range channelGroups {
 		channels = append(channels, group...)
 	}
diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go
index 9246ba702b84e232e2ec43727283209d5ce15b14..490ae036cabe98f04eb9e14d590543af81ccfa56 100644
--- a/internal/dataservice/watcher.go
+++ b/internal/dataservice/watcher.go
@@ -69,40 +69,47 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context)
 			log.Println("data node time tick watcher closed")
 			return
 		case msg := <-watcher.msgQueue:
-			segments, err := watcher.allocator.GetSealedSegments()
+			if err := watcher.handleTimeTickMsg(msg); err != nil {
+				log.Println(err.Error())
+				continue
+			}
+		}
+	}
+}
+
+func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTickMsg) error {
+	segments, err := watcher.allocator.GetSealedSegments()
+	if err != nil {
+		return err
+	}
+	for _, id := range segments {
+		expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Base.Timestamp)
+		if err != nil {
+			log.Printf("check allocations expired error %s", err.Error())
+			continue
+		}
+		if expired {
+			segmentInfo, err := watcher.meta.GetSegment(id)
 			if err != nil {
-				log.Printf("get sealed segments error %s", err.Error())
+				log.Println(err.Error())
 				continue
 			}
-			for _, id := range segments {
-				expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Base.Timestamp)
-				if err != nil {
-					log.Printf("check allocations expired error %s", err.Error())
-					continue
-				}
-				if expired {
-					segmentInfo, err := watcher.meta.GetSegment(id)
-					if err != nil {
-						log.Println(err.Error())
-						continue
-					}
-					if err = watcher.meta.SetSegmentState(id, datapb.SegmentState_SegmentSealed); err != nil {
-						log.Println(err.Error())
-						continue
-					}
-					watcher.cluster.FlushSegment(&datapb.FlushSegRequest{
-						Base: &commonpb.MsgBase{
-							MsgType:   commonpb.MsgType_kShowCollections,
-							MsgID:     -1, // todo add msg id
-							Timestamp: 0,  // todo
-							SourceID:  -1, // todo
-						},
-						CollectionID: segmentInfo.CollectionID,
-						SegmentIDs:   []int64{segmentInfo.SegmentID},
-					})
-					watcher.allocator.DropSegment(id)
-				}
+			if err = watcher.meta.SetSegmentState(id, datapb.SegmentState_SegmentSealed); err != nil {
+				log.Println(err.Error())
+				continue
 			}
+			watcher.cluster.FlushSegment(&datapb.FlushSegRequest{
+				Base: &commonpb.MsgBase{
+					MsgType:   commonpb.MsgType_kShowCollections,
+					MsgID:     -1, // todo add msg id
+					Timestamp: 0,  // todo
+					SourceID:  Params.NodeID,
+				},
+				CollectionID: segmentInfo.CollectionID,
+				SegmentIDs:   []int64{segmentInfo.SegmentID},
+			})
+			watcher.allocator.DropSegment(id)
 		}
 	}
+	return nil
 }
diff --git a/internal/dataservice/watcher_test.go b/internal/dataservice/watcher_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..85af18e53026656e9fc32684d7f7d1d17fe3ee3d
--- /dev/null
+++ b/internal/dataservice/watcher_test.go
@@ -0,0 +1,97 @@
+package dataservice
+
+import (
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestDataNodeTTWatcher(t *testing.T) {
+	Params.Init()
+	c := make(chan struct{})
+	cluster := newDataNodeCluster(c)
+	defer cluster.ShutDownClients()
+	schema := newTestSchema()
+	allocator := newMockAllocator()
+	meta, err := newMemoryMeta(allocator)
+	assert.Nil(t, err)
+	segAllocator, err := newSegmentAllocator(meta, allocator)
+	assert.Nil(t, err)
+	watcher := newDataNodeTimeTickWatcher(meta, segAllocator, cluster)
+
+	id, err := allocator.allocID()
+	assert.Nil(t, err)
+	err = meta.AddCollection(&collectionInfo{
+		Schema: schema,
+		ID:     id,
+	})
+	assert.Nil(t, err)
+
+	cases := []struct {
+		sealed     bool
+		allocation bool
+		expired    bool
+		expected   bool
+	}{
+		{false, false, true, false},
+		{false, true, true, false},
+		{false, true, false, false},
+		{true, false, true, true},
+		{true, true, false, false},
+		{true, true, true, true},
+	}
+
+	segmentIDs := make([]UniqueID, len(cases))
+	for i, c := range cases {
+		segID, err := allocator.allocID()
+		segmentIDs[i] = segID
+		assert.Nil(t, err)
+		segmentInfo, err := BuildSegment(id, 100, segID, []string{"channel" + strconv.Itoa(i)})
+		assert.Nil(t, err)
+		err = meta.AddSegment(segmentInfo)
+		assert.Nil(t, err)
+		err = segAllocator.OpenSegment(segmentInfo)
+		assert.Nil(t, err)
+		if c.allocation && c.expired {
+			_, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100)
+			assert.Nil(t, err)
+		}
+	}
+
+	time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond)
+	for i, c := range cases {
+		if c.allocation && !c.expired {
+			_, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100)
+			assert.Nil(t, err)
+		}
+		if c.sealed {
+			err := segAllocator.SealSegment(segmentIDs[i])
+			assert.Nil(t, err)
+		}
+	}
+	ts, err := allocator.allocTimestamp()
+	assert.Nil(t, err)
+
+	err = watcher.handleTimeTickMsg(&msgstream.TimeTickMsg{
+		BaseMsg: msgstream.BaseMsg{
+			HashValues: []uint32{0},
+		},
+		TimeTickMsg: internalpb2.TimeTickMsg{
+			Base: &commonpb.MsgBase{
+				MsgType:   commonpb.MsgType_kTimeTick,
+				Timestamp: ts,
+			},
+		},
+	})
+	assert.Nil(t, err)
+	for i, c := range cases {
+		_, ok := segAllocator.segments[segmentIDs[i]]
+		assert.EqualValues(t, !c.expected, ok)
+	}
+}
diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go
index d2d8ab7613d93607d6df72f2dab4e3b891d59db4..89d7b10749f3fd24b0405c386ded475d4fa1e5ba 100644
--- a/internal/masterservice/master_service.go
+++ b/internal/masterservice/master_service.go
@@ -247,9 +247,7 @@ func (c *Core) checkInit() error {
 	if c.DataNodeSegmentFlushCompletedChan == nil {
 		return errors.Errorf("DataNodeSegmentFlushCompletedChan is nil")
 	}
-	log.Printf("master node id = %d", Params.NodeID)
-	log.Printf("master dd channel name = %s", Params.DdChannel)
-	log.Printf("master time ticke channel name = %s", Params.TimeTickChannel)
+	log.Printf("master node id = %d\n", Params.NodeID)
 	return nil
 }
 
@@ -609,7 +607,6 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error {
 		return err
 	}
 	Params.ProxyTimeTickChannel = rsp
-	log.Printf("proxy time tick channel name = %s", Params.ProxyTimeTickChannel)
 
 	c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error {
 		err := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{
@@ -636,8 +633,6 @@ func (c *Core) SetDataService(s DataServiceInterface) error {
 		return err
 	}
 	Params.DataServiceSegmentChannel = rsp
-	log.Printf("data service segment channel name = %s", Params.DataServiceSegmentChannel)
-
 	c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) {
 		ts, err := c.tsoAllocator.Alloc(1)
 		if err != nil {
diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh
index 1ada91db2d028ddf10d4ed5a4151a69fcb095326..d91ee4d3266287f7661667a27356342ff4d50493 100755
--- a/scripts/run_go_unittest.sh
+++ b/scripts/run_go_unittest.sh
@@ -18,7 +18,7 @@ go test -race -cover "${MILVUS_DIR}/kv/..." -failfast
 # TODO: remove to distributed
 #go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast
 #go test -race -cover "${MILVUS_DIR}/writenode/..." -failfast
-go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast
+#go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast
 #go test -race -cover "${MILVUS_DIR}/master/..." -failfast
 #go test -race -cover "${MILVUS_DIR}/indexnode/..." -failfast
 #go test -race -cover "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage"   "${MILVUS_DIR}/util/..." -failfast
@@ -28,3 +28,4 @@ go test -race -cover "${MILVUS_DIR}/msgstream/..." -failfast
 
 go test -race -cover -v "${MILVUS_DIR}/masterservice" "${MILVUS_DIR}/distributed/masterservice" -failfast
 #go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast
+go test -race -cover "${MILVUS_DIR}/dataservice/..." -failfast