diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index 34559549855f75b3c807f3c6bd897108cca5df76..2c3e07cdd64f501b7858482e4b0f08061f576a8c 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -67,6 +67,8 @@ func (p *ParamTable) Init() { p.initDataNodeNum() p.initSegmentInfoChannelName() p.initDataServiceSubscriptionName() + p.initK2SChannelNames() + p.initSegmentFlushMetaPath() } func (p *ParamTable) initAddress() { diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 562653fb8850c5d24fe46016c7cffa2c30b528c3..b48634f2da36fed46e82e5a26ecc47c1a6b5bdcd 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -360,7 +360,7 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { func (s *Server) startDDChannel(ctx context.Context) { defer s.serverLoopWg.Done() - ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024) + ddStream := pulsarms.NewPulsarTtMsgStream(ctx, 1024) ddStream.SetPulsarClient(Params.PulsarAddress) ddStream.CreatePulsarConsumers([]string{s.ddChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) ddStream.Start() @@ -574,6 +574,9 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha return err } infoMsg := &msgstream.SegmentInfoMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{0}, + }, SegmentMsg: datapb.SegmentMsg{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kSegmentInfo, @@ -594,16 +597,19 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha } func (s *Server) ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) { + resp := &datapb.ShowSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + }, + } if !s.checkStateIsHealthy() { - return &datapb.ShowSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "server is initializing", - }, - }, nil + resp.Status.Reason = "server is initializing" + return resp, nil } ids := s.meta.GetSegmentsByCollectionAndPartitionID(req.CollectionID, req.PartitionID) - return &datapb.ShowSegmentResponse{SegmentIDs: ids}, nil + resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS + resp.SegmentIDs = ids + return resp, nil } func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { @@ -622,6 +628,7 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg resp.Status.Reason = "get segment states error: " + err.Error() return resp, nil } + resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS resp.State = segmentInfo.State resp.CreateTime = segmentInfo.OpenTime resp.SealedTime = segmentInfo.SealedTime @@ -632,7 +639,6 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg } func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) { - // todo resp := &datapb.InsertBinlogPathsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -656,6 +662,7 @@ func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*dat fields[i] = field.FieldID paths[i] = &internalpb2.StringList{Values: field.BinlogPaths} } + resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS resp.FieldIDs = fields resp.Paths = paths return resp, nil