diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 572563129a0bb9dde89e512f9e4fc3e916c7962b..c6f49c1550223480f4fa16ed38d7df7cf1204465 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -61,6 +61,7 @@ type Server struct { insertChannels []string msFactory msgstream.Factory ttBarrier timesync.TimeTickBarrier + allocMu sync.RWMutex } func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { @@ -499,6 +500,8 @@ func (s *Server) newDataNode(ip string, port int64, id UniqueID) (*dataNode, err } func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error) { + s.allocMu.Lock() + defer s.allocMu.Unlock() if !s.checkStateIsHealthy() { return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -512,6 +515,8 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb } func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { + s.allocMu.Lock() + defer s.allocMu.Unlock() resp := &datapb.AssignSegmentIDResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success,