diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index b48634f2da36fed46e82e5a26ecc47c1a6b5bdcd..046cf7fff31567365a47aea1b258f13e425f81bf 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -360,7 +360,7 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
 
 func (s *Server) startDDChannel(ctx context.Context) {
 	defer s.serverLoopWg.Done()
-	ddStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024)
+	ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024)
 	ddStream.SetPulsarClient(Params.PulsarAddress)
 	ddStream.CreatePulsarConsumers([]string{s.ddChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024)
 	ddStream.Start()