diff --git a/pkg/fileservice/local_etl_fs.go b/pkg/fileservice/local_etl_fs.go index 03b29713cb07816d081b90d02e0f1a0f719967da..12fc86606de99423858aa115bf85e9cd703cc887 100644 --- a/pkg/fileservice/local_etl_fs.go +++ b/pkg/fileservice/local_etl_fs.go @@ -421,3 +421,70 @@ func (l *LocalETLFS) toNativeFilePath(filePath string) string { var _ ETLFileService = new(LocalETLFS) func (l *LocalETLFS) ETLCompatible() {} + +var _ MutableFileService = new(LocalETLFS) + +func (l *LocalETLFS) NewMutator(filePath string) (Mutator, error) { + // open + nativePath := l.toNativeFilePath(filePath) + f, err := os.OpenFile(nativePath, os.O_RDWR, 0644) + if os.IsNotExist(err) { + return nil, ErrFileNotFound + } + return &_LocalETLFSMutator{ + osFile: f, + }, nil +} + +type _LocalETLFSMutator struct { + osFile *os.File +} + +func (l *_LocalETLFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error { + + // write + for _, entry := range entries { + + if entry.ReaderForWrite != nil { + // seek and copy + _, err := l.osFile.Seek(int64(entry.Offset), 0) + if err != nil { + return err + } + n, err := io.Copy(l.osFile, entry.ReaderForWrite) + if err != nil { + return err + } + if int(n) != entry.Size { + return ErrSizeNotMatch + } + + } else { + // WriteAt + n, err := l.osFile.WriteAt(entry.Data, int64(entry.Offset)) + if err != nil { + return err + } + if int(n) != entry.Size { + return ErrSizeNotMatch + } + } + + } + + return nil +} + +func (l *_LocalETLFSMutator) Close() error { + // sync + if err := l.osFile.Sync(); err != nil { + return err + } + + // close + if err := l.osFile.Close(); err != nil { + return err + } + + return nil +} diff --git a/pkg/fileservice/local_etl_fs_test.go b/pkg/fileservice/local_etl_fs_test.go index 3de1f30eda17dac25092458de22f98d1befbd6f6..e36ba92d7847172e9e1137c7779554635b853026 100644 --- a/pkg/fileservice/local_etl_fs_test.go +++ b/pkg/fileservice/local_etl_fs_test.go @@ -31,4 +31,13 @@ func TestLocalETLFS(t *testing.T) { }) }) + t.Run("mutable file service", func(t *testing.T) { + testMutableFileService(t, func() MutableFileService { + dir := t.TempDir() + fs, err := NewLocalETLFS(dir) + assert.Nil(t, err) + return fs + }) + }) + }