diff --git a/internal/dataservice/dd_handler.go b/internal/dataservice/dd_handler.go
index 644fff1dde6dd893c1681dbe27e7274e445a3f30..ad7ddc3c55e1fe0d1b83bbf4da904eb81f9c2978 100644
--- a/internal/dataservice/dd_handler.go
+++ b/internal/dataservice/dd_handler.go
@@ -1,8 +1,9 @@
 package dataservice
 
 import (
-	"github.com/golang/protobuf/proto"
+	"context"
 
+	"github.com/golang/protobuf/proto"
 	"github.com/zilliztech/milvus-distributed/internal/msgstream"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
@@ -20,20 +21,20 @@ func newDDHandler(meta *meta, allocator segmentAllocatorInterface) *ddHandler {
 	}
 }
 
-func (handler *ddHandler) HandleDDMsg(msg msgstream.TsMsg) error {
+func (handler *ddHandler) HandleDDMsg(ctx context.Context, msg msgstream.TsMsg) error {
 	switch msg.Type() {
 	case commonpb.MsgType_CreateCollection:
 		realMsg := msg.(*msgstream.CreateCollectionMsg)
 		return handler.handleCreateCollection(realMsg)
 	case commonpb.MsgType_DropCollection:
 		realMsg := msg.(*msgstream.DropCollectionMsg)
-		return handler.handleDropCollection(realMsg)
+		return handler.handleDropCollection(ctx, realMsg)
 	case commonpb.MsgType_CreatePartition:
 		realMsg := msg.(*msgstream.CreatePartitionMsg)
 		return handler.handleCreatePartition(realMsg)
 	case commonpb.MsgType_DropPartition:
 		realMsg := msg.(*msgstream.DropPartitionMsg)
-		return handler.handleDropPartition(realMsg)
+		return handler.handleDropPartition(ctx, realMsg)
 	default:
 		return nil
 	}
@@ -54,10 +55,10 @@ func (handler *ddHandler) handleCreateCollection(msg *msgstream.CreateCollection
 	return nil
 }
 
-func (handler *ddHandler) handleDropCollection(msg *msgstream.DropCollectionMsg) error {
+func (handler *ddHandler) handleDropCollection(ctx context.Context, msg *msgstream.DropCollectionMsg) error {
 	segmentsOfCollection := handler.meta.GetSegmentsOfCollection(msg.CollectionID)
 	for _, id := range segmentsOfCollection {
-		handler.segmentAllocator.DropSegment(id)
+		handler.segmentAllocator.DropSegment(ctx, id)
 	}
 	if err := handler.meta.DropCollection(msg.CollectionID); err != nil {
 		return err
@@ -65,10 +66,10 @@ func (handler *ddHandler) handleDropCollection(msg *msgstream.DropCollectionMsg)
 	return nil
 }
 
-func (handler *ddHandler) handleDropPartition(msg *msgstream.DropPartitionMsg) error {
+func (handler *ddHandler) handleDropPartition(ctx context.Context, msg *msgstream.DropPartitionMsg) error {
 	segmentsOfPartition := handler.meta.GetSegmentsOfPartition(msg.CollectionID, msg.PartitionID)
 	for _, id := range segmentsOfPartition {
-		handler.segmentAllocator.DropSegment(id)
+		handler.segmentAllocator.DropSegment(ctx, id)
 	}
 	if err := handler.meta.DropPartition(msg.CollectionID, msg.PartitionID); err != nil {
 		return err
diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go
index 7edcc3eaa20f1f5665494d0425057560c16fc3a5..bf81d95dd9cbe693591871de25e0abb9e75de0c1 100644
--- a/internal/dataservice/segment_allocator.go
+++ b/internal/dataservice/segment_allocator.go
@@ -1,16 +1,17 @@
 package dataservice
 
 import (
+	"context"
 	"fmt"
 	"strconv"
 	"sync"
 	"time"
 
-	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
-
+	"github.com/zilliztech/milvus-distributed/internal/util/trace"
+	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 
-	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
+	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
 )
 
 type errRemainInSufficient struct {
@@ -28,21 +29,21 @@ func (err errRemainInSufficient) Error() string {
 // segmentAllocator is used to allocate rows for segments and record the allocations.
 type segmentAllocatorInterface interface {
 	// OpenSegment add the segment to allocator and set it allocatable
-	OpenSegment(segmentInfo *datapb.SegmentInfo) error
+	OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error
 	// AllocSegment allocate rows and record the allocation.
-	AllocSegment(collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error)
+	AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error)
 	// GetSealedSegments get all sealed segment.
-	GetSealedSegments() ([]UniqueID, error)
+	GetSealedSegments(ctx context.Context) ([]UniqueID, error)
 	// SealSegment set segment sealed, the segment will not be allocated anymore.
-	SealSegment(segmentID UniqueID) error
+	SealSegment(ctx context.Context, segmentID UniqueID) error
 	// DropSegment drop the segment from allocator.
-	DropSegment(segmentID UniqueID)
+	DropSegment(ctx context.Context, segmentID UniqueID)
 	// ExpireAllocations check all allocations' expire time and remove the expired allocation.
-	ExpireAllocations(timeTick Timestamp) error
+	ExpireAllocations(ctx context.Context, timeTick Timestamp) error
 	// SealAllSegments get all opened segment ids of collection. return success and failed segment ids
-	SealAllSegments(collectionID UniqueID)
+	SealAllSegments(ctx context.Context, collectionID UniqueID)
 	// IsAllocationsExpired check all allocations of segment expired.
-	IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error)
+	IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error)
 }
 
 type segmentStatus struct {
@@ -81,7 +82,9 @@ func newSegmentAllocator(meta *meta, allocator allocatorInterface) *segmentAlloc
 	return segmentAllocator
 }
 
-func (allocator *segmentAllocator) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
+func (allocator *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error {
+	sp, _ := trace.StartSpanFromContext(ctx)
+	defer sp.Finish()
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	if _, ok := allocator.segments[segmentInfo.SegmentID]; ok {
@@ -103,8 +106,10 @@ func (allocator *segmentAllocator) OpenSegment(segmentInfo *datapb.SegmentInfo)
 	return nil
 }
 
-func (allocator *segmentAllocator) AllocSegment(collectionID UniqueID,
+func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
 	partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) {
+	sp, _ := trace.StartSpanFromContext(ctx)
+	defer sp.Finish()
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 
@@ -173,7 +178,9 @@ func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int
 	return int(allocator.segmentThreshold / float64(sizePerRecord)), nil
 }
 
-func (allocator *segmentAllocator) GetSealedSegments() ([]UniqueID, error) {
+func (allocator *segmentAllocator) GetSealedSegments(ctx context.Context) ([]UniqueID, error) {
+	sp, _ := trace.StartSpanFromContext(ctx)
+	defer sp.Finish()
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	keys := make([]UniqueID, 0)
@@ -200,7 +207,9 @@ func (allocator *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus)
 	return float64(segMeta.NumRows) >= allocator.segmentThresholdFactor*float64(segStatus.total), nil
 }
 
-func (allocator *segmentAllocator) SealSegment(segmentID UniqueID) error {
+func (allocator *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
+	sp, _ := trace.StartSpanFromContext(ctx)
+	defer sp.Finish()
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	status, ok := allocator.segments[segmentID]
@@ -211,13 +220,17 @@ func (allocator *segmentAllocator) SealSegment(segmentID UniqueID) error {
 	return nil
 }
 
-func (allocator *segmentAllocator) DropSegment(segmentID UniqueID) {
+func (allocator *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
+	sp, _ := trace.StartSpanFromContext(ctx)
+	defer sp.Finish()
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	delete(allocator.segments, segmentID)
 }
 
-func (allocator *segmentAllocator) ExpireAllocations(timeTick Timestamp) error {
+func (allocator *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error {
+	sp, _ := trace.StartSpanFromContext(ctx)
+	defer sp.Finish()
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	for _, segStatus := range allocator.segments {
@@ -232,7 +245,9 @@ func (allocator *segmentAllocator) ExpireAllocations(timeTick Timestamp) error {
 	return nil
 }
 
-func (allocator *segmentAllocator) IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) {
+func (allocator *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) {
+	sp, _ := trace.StartSpanFromContext(ctx)
+	defer sp.Finish()
 	allocator.mu.RLock()
 	defer allocator.mu.RUnlock()
 	status, ok := allocator.segments[segmentID]
@@ -242,7 +257,9 @@ func (allocator *segmentAllocator) IsAllocationsExpired(segmentID UniqueID, ts T
 	return status.lastExpireTime <= ts, nil
 }
 
-func (allocator *segmentAllocator) SealAllSegments(collectionID UniqueID) {
+func (allocator *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) {
+	sp, _ := trace.StartSpanFromContext(ctx)
+	defer sp.Finish()
 	allocator.mu.Lock()
 	defer allocator.mu.Unlock()
 	for _, status := range allocator.segments {
diff --git a/internal/dataservice/segment_allocator_test.go b/internal/dataservice/segment_allocator_test.go
index 9f81783ef6dd86111bc4833f6b7a20e1fe07ae51..cdce69d004d2420ae1a9c92f4bdf7e9739a17205 100644
--- a/internal/dataservice/segment_allocator_test.go
+++ b/internal/dataservice/segment_allocator_test.go
@@ -1,6 +1,7 @@
 package dataservice
 
 import (
+	"context"
 	"log"
 	"math"
 	"strconv"
@@ -13,6 +14,7 @@ import (
 )
 
 func TestAllocSegment(t *testing.T) {
+	ctx := context.Background()
 	Params.Init()
 	mockAllocator := newMockAllocator()
 	meta, err := newMemoryMeta(mockAllocator)
@@ -33,7 +35,7 @@ func TestAllocSegment(t *testing.T) {
 	assert.Nil(t, err)
 	err = meta.AddSegment(segmentInfo)
 	assert.Nil(t, err)
-	err = segAllocator.OpenSegment(segmentInfo)
+	err = segAllocator.OpenSegment(ctx, segmentInfo)
 	assert.Nil(t, err)
 
 	cases := []struct {
@@ -50,7 +52,7 @@ func TestAllocSegment(t *testing.T) {
 		{collID, 100, "c1", math.MaxInt64, false},
 	}
 	for _, c := range cases {
-		id, count, expireTime, err := segAllocator.AllocSegment(c.collectionID, c.partitionID, c.channelName, c.requestRows)
+		id, count, expireTime, err := segAllocator.AllocSegment(ctx, c.collectionID, c.partitionID, c.channelName, c.requestRows)
 		if c.expectResult {
 			assert.Nil(t, err)
 			assert.EqualValues(t, c.requestRows, count)
@@ -63,6 +65,7 @@ func TestAllocSegment(t *testing.T) {
 }
 
 func TestSealSegment(t *testing.T) {
+	ctx := context.Background()
 	Params.Init()
 	mockAllocator := newMockAllocator()
 	meta, err := newMemoryMeta(mockAllocator)
@@ -85,20 +88,21 @@ func TestSealSegment(t *testing.T) {
 		assert.Nil(t, err)
 		err = meta.AddSegment(segmentInfo)
 		assert.Nil(t, err)
-		err = segAllocator.OpenSegment(segmentInfo)
+		err = segAllocator.OpenSegment(ctx, segmentInfo)
 		assert.Nil(t, err)
 		lastSegID = segmentInfo.SegmentID
 	}
 
-	err = segAllocator.SealSegment(lastSegID)
+	err = segAllocator.SealSegment(ctx, lastSegID)
 	assert.Nil(t, err)
-	segAllocator.SealAllSegments(collID)
-	sealedSegments, err := segAllocator.GetSealedSegments()
+	segAllocator.SealAllSegments(ctx, collID)
+	sealedSegments, err := segAllocator.GetSealedSegments(ctx)
 	assert.Nil(t, err)
 	assert.EqualValues(t, 10, len(sealedSegments))
 }
 
 func TestExpireSegment(t *testing.T) {
+	ctx := context.Background()
 	Params.Init()
 	mockAllocator := newMockAllocator()
 	meta, err := newMemoryMeta(mockAllocator)
@@ -119,10 +123,10 @@ func TestExpireSegment(t *testing.T) {
 	assert.Nil(t, err)
 	err = meta.AddSegment(segmentInfo)
 	assert.Nil(t, err)
-	err = segAllocator.OpenSegment(segmentInfo)
+	err = segAllocator.OpenSegment(ctx, segmentInfo)
 	assert.Nil(t, err)
 
-	id1, _, et, err := segAllocator.AllocSegment(collID, 100, "c1", 10)
+	id1, _, et, err := segAllocator.AllocSegment(ctx, collID, 100, "c1", 10)
 	ts2, _ := tsoutil.ParseTS(et)
 	log.Printf("physical ts: %s", ts2.String())
 	assert.Nil(t, err)
@@ -134,9 +138,9 @@ func TestExpireSegment(t *testing.T) {
 	time.Sleep(time.Duration(Params.SegIDAssignExpiration+1000) * time.Millisecond)
 	ts, err = mockAllocator.allocTimestamp()
 	assert.Nil(t, err)
-	err = segAllocator.ExpireAllocations(ts)
+	err = segAllocator.ExpireAllocations(ctx, ts)
 	assert.Nil(t, err)
-	expired, err := segAllocator.IsAllocationsExpired(id1, ts)
+	expired, err := segAllocator.IsAllocationsExpired(ctx, id1, ts)
 	if et > ts {
 		tsPhy, _ := tsoutil.ParseTS(ts)
 		log.Printf("ts %s", tsPhy.String())
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index f9e7ab146faa7515d3b67ba081b66d046acaafde..d5fae749a1fdc9d124ec9549fc012b2cf6582222 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -22,6 +22,7 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/timesync"
 	"github.com/zilliztech/milvus-distributed/internal/types"
 	"github.com/zilliztech/milvus-distributed/internal/util/retry"
+	"github.com/zilliztech/milvus-distributed/internal/util/trace"
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/zap"
@@ -367,9 +368,9 @@ func (s *Server) startDDChannel(ctx context.Context) {
 			return
 		default:
 		}
-		msgPack, _ := ddStream.Consume()
+		msgPack, ctx := ddStream.Consume()
 		for _, msg := range msgPack.Msgs {
-			if err := s.ddHandler.HandleDDMsg(msg); err != nil {
+			if err := s.ddHandler.HandleDDMsg(ctx, msg); err != nil {
 				log.Error("handle dd msg error", zap.Error(err))
 				continue
 			}
@@ -502,7 +503,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb
 			Reason:    "server is initializing",
 		}, nil
 	}
-	s.segAllocator.SealAllSegments(req.CollectionID)
+	s.segAllocator.SealAllSegments(ctx, req.CollectionID)
 	return &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_Success,
 	}, nil
@@ -524,7 +525,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
 	}
 	for _, r := range req.SegmentIDRequests {
 		if !s.meta.HasCollection(r.CollectionID) {
-			if err := s.loadCollectionFromMaster(r.CollectionID); err != nil {
+			if err := s.loadCollectionFromMaster(ctx, r.CollectionID); err != nil {
 				log.Error("load collection from master error", zap.Int64("collectionID", r.CollectionID), zap.Error(err))
 				continue
 			}
@@ -534,7 +535,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
 				ErrorCode: commonpb.ErrorCode_UnexpectedError,
 			},
 		}
-		segmentID, retCount, expireTs, err := s.segAllocator.AllocSegment(r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
+		segmentID, retCount, expireTs, err := s.segAllocator.AllocSegment(ctx, r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
 		if err != nil {
 			if _, ok := err.(errRemainInSufficient); !ok {
 				result.Status.Reason = fmt.Sprintf("allocation of Collection %d, Partition %d, Channel %s, Count %d error:  %s",
@@ -543,14 +544,14 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
 				continue
 			}
 
-			if err = s.openNewSegment(r.CollectionID, r.PartitionID, r.ChannelName); err != nil {
+			if err = s.openNewSegment(ctx, r.CollectionID, r.PartitionID, r.ChannelName); err != nil {
 				result.Status.Reason = fmt.Sprintf("open new segment of Collection %d, Partition %d, Channel %s, Count %d error:  %s",
 					r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
 				resp.SegIDAssignments = append(resp.SegIDAssignments, result)
 				continue
 			}
 
-			segmentID, retCount, expireTs, err = s.segAllocator.AllocSegment(r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
+			segmentID, retCount, expireTs, err = s.segAllocator.AllocSegment(ctx, r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
 			if err != nil {
 				result.Status.Reason = fmt.Sprintf("retry allocation of Collection %d, Partition %d, Channel %s, Count %d error:  %s",
 					r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
@@ -571,8 +572,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
 	return resp, nil
 }
 
-func (s *Server) loadCollectionFromMaster(collectionID int64) error {
-	ctx := context.TODO()
+func (s *Server) loadCollectionFromMaster(ctx context.Context, collectionID int64) error {
 	resp, err := s.masterClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
 		Base: &commonpb.MsgBase{
 			MsgType:  commonpb.MsgType_DescribeCollection,
@@ -591,7 +591,9 @@ func (s *Server) loadCollectionFromMaster(collectionID int64) error {
 	return s.meta.AddCollection(collInfo)
 }
 
-func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, channelName string) error {
+func (s *Server) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) error {
+	sp, _ := trace.StartSpanFromContext(ctx)
+	defer sp.Finish()
 	id, err := s.allocator.allocID()
 	if err != nil {
 		return err
@@ -603,7 +605,7 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
 	if err = s.meta.AddSegment(segmentInfo); err != nil {
 		return err
 	}
-	if err = s.segAllocator.OpenSegment(segmentInfo); err != nil {
+	if err = s.segAllocator.OpenSegment(ctx, segmentInfo); err != nil {
 		return err
 	}
 	infoMsg := &msgstream.SegmentInfoMsg{
@@ -623,7 +625,7 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
 	msgPack := &msgstream.MsgPack{
 		Msgs: []msgstream.TsMsg{infoMsg},
 	}
-	if err = s.segmentInfoStream.Produce(s.ctx, msgPack); err != nil {
+	if err = s.segmentInfoStream.Produce(ctx, msgPack); err != nil {
 		return err
 	}
 	return nil
diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go
index 39dbd5355cd8152f4353b31ae3167bd83829f9ea..ac3a1ff1f80cb3fae6e9cd40b46b0c7d1d2e56ff 100644
--- a/internal/dataservice/watcher.go
+++ b/internal/dataservice/watcher.go
@@ -4,6 +4,7 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/log"
 	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
+	"github.com/zilliztech/milvus-distributed/internal/util/trace"
 	"go.uber.org/zap"
 
 	"golang.org/x/net/context"
@@ -36,7 +37,8 @@ func (watcher *proxyTimeTickWatcher) StartBackgroundLoop(ctx context.Context) {
 			log.Debug("proxy time tick watcher closed")
 			return
 		case msg := <-watcher.msgQueue:
-			if err := watcher.allocator.ExpireAllocations(msg.Base.Timestamp); err != nil {
+			traceCtx := context.TODO()
+			if err := watcher.allocator.ExpireAllocations(traceCtx, msg.Base.Timestamp); err != nil {
 				log.Error("expire allocations error", zap.Error(err))
 			}
 		}
@@ -76,12 +78,15 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context)
 }
 
 func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTickMsg) error {
-	segments, err := watcher.allocator.GetSealedSegments()
+	ctx := context.TODO()
+	sp, _ := trace.StartSpanFromContext(ctx)
+	defer sp.Finish()
+	segments, err := watcher.allocator.GetSealedSegments(ctx)
 	if err != nil {
 		return err
 	}
 	for _, id := range segments {
-		expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Base.Timestamp)
+		expired, err := watcher.allocator.IsAllocationsExpired(ctx, id, msg.Base.Timestamp)
 		if err != nil {
 			log.Error("check allocations expired error", zap.Int64("segmentID", id), zap.Error(err))
 			continue
@@ -106,7 +111,7 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic
 				CollectionID: segmentInfo.CollectionID,
 				SegmentIDs:   []int64{segmentInfo.SegmentID},
 			})
-			watcher.allocator.DropSegment(id)
+			watcher.allocator.DropSegment(ctx, id)
 		}
 	}
 	return nil
diff --git a/internal/dataservice/watcher_test.go b/internal/dataservice/watcher_test.go
index e64a66aa34f0ecef3107c2a128b575f22d14e6bc..c16f2267fa39b72048ed10889479ad145a16c2df 100644
--- a/internal/dataservice/watcher_test.go
+++ b/internal/dataservice/watcher_test.go
@@ -1,6 +1,7 @@
 package dataservice
 
 import (
+	"context"
 	"strconv"
 	"testing"
 	"time"
@@ -13,6 +14,7 @@ import (
 )
 
 func TestDataNodeTTWatcher(t *testing.T) {
+	ctx := context.Background()
 	Params.Init()
 	c := make(chan struct{})
 	cluster := newDataNodeCluster(c)
@@ -56,10 +58,10 @@ func TestDataNodeTTWatcher(t *testing.T) {
 		assert.Nil(t, err)
 		err = meta.AddSegment(segmentInfo)
 		assert.Nil(t, err)
-		err = segAllocator.OpenSegment(segmentInfo)
+		err = segAllocator.OpenSegment(ctx, segmentInfo)
 		assert.Nil(t, err)
 		if c.allocation && c.expired {
-			_, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100)
+			_, _, _, err := segAllocator.AllocSegment(ctx, id, 100, "channel"+strconv.Itoa(i), 100)
 			assert.Nil(t, err)
 		}
 	}
@@ -67,11 +69,11 @@ func TestDataNodeTTWatcher(t *testing.T) {
 	time.Sleep(time.Duration(Params.SegIDAssignExpiration+1000) * time.Millisecond)
 	for i, c := range cases {
 		if c.allocation && !c.expired {
-			_, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100)
+			_, _, _, err := segAllocator.AllocSegment(ctx, id, 100, "channel"+strconv.Itoa(i), 100)
 			assert.Nil(t, err)
 		}
 		if c.sealed {
-			err := segAllocator.SealSegment(segmentIDs[i])
+			err := segAllocator.SealSegment(ctx, segmentIDs[i])
 			assert.Nil(t, err)
 		}
 	}
diff --git a/tests/python_test/test_index.py b/tests/python_test/test_index.py
index dc63abcbb9cc24c6548edcbc57eba2b159b2033e..d02f9e009a4e963289e2ea61cc0b9c760b0124d1 100644
--- a/tests/python_test/test_index.py
+++ b/tests/python_test/test_index.py
@@ -253,6 +253,26 @@ class TestIndexBase:
         # assert index == indexs[-1]
         assert not index    # FLAT is the last index_type, drop all indexes in server
 
+    @pytest.mark.tags(CaseLabel.tags_0331)
+    @pytest.mark.level(2)
+    @pytest.mark.timeout(BUILD_TIMEOUT)
+    def test_create_different_index_repeatedly_B(self, connect, collection):
+        '''
+        target: check if index can be created repeatedly, with the different create_index params
+        method: create another index with different index_params after index have been built
+        expected: return code 0, and describe index result equals with the second index params
+        '''
+        ids = connect.insert(collection, default_entities)
+        connect.flush([collection])
+        indexs = [default_index, {"metric_type": "L2", "index_type": "IVF_SQ8", "params": {"nlist": 1024}}]
+        for index in indexs:
+            connect.create_index(collection, field_name, index)
+            connect.release_collection(collection)
+            connect.load_collection(collection)
+        index = connect.describe_index(collection, field_name)
+        assert index == indexs[-1]
+        # assert not index  # FLAT is the last index_type, drop all indexes in server
+
     @pytest.mark.tags(CaseLabel.tags_0331, CaseLabel.tags_l1, CaseLabel.tags_smoke)
     @pytest.mark.timeout(BUILD_TIMEOUT)
     def test_create_index_ip(self, connect, collection, get_simple_index):