diff --git a/pkg/fileservice/local_fs.go b/pkg/fileservice/local_fs.go index 2456254d902ced3a72ccc4dab80c04dbe347d675..32552370abae2c41655273a06c1a266d54524ccf 100644 --- a/pkg/fileservice/local_fs.go +++ b/pkg/fileservice/local_fs.go @@ -101,6 +101,12 @@ func (l *LocalFS) Write(ctx context.Context, vector IOVector) error { return ErrFileExisted } + return l.write(ctx, vector) +} + +func (l *LocalFS) write(ctx context.Context, vector IOVector) error { + nativePath := l.toNativeFilePath(vector.FilePath) + // sort sort.Slice(vector.Entries, func(i, j int) bool { return vector.Entries[i].Offset < vector.Entries[j].Offset @@ -403,3 +409,9 @@ func (l *LocalFS) Mutate(ctx context.Context, vector IOVector) error { return nil } + +var _ ReplaceableFileService = new(LocalFS) + +func (l *LocalFS) Replace(ctx context.Context, vector IOVector) error { + return l.write(ctx, vector) +} diff --git a/pkg/fileservice/local_fs_test.go b/pkg/fileservice/local_fs_test.go index 91bf21ca6d0284c998498132f42adbc2a453b7ea..08829a2668c0ad33e98e15e01da55721c2ee2053 100644 --- a/pkg/fileservice/local_fs_test.go +++ b/pkg/fileservice/local_fs_test.go @@ -40,6 +40,15 @@ func TestLocalFS(t *testing.T) { }) }) + t.Run("replaceable file service", func(t *testing.T) { + testReplaceableFileService(t, func() ReplaceableFileService { + dir := t.TempDir() + fs, err := NewLocalFS(dir) + assert.Nil(t, err) + return fs + }) + }) + } func BenchmarkLocalFS(b *testing.B) { diff --git a/pkg/fileservice/memory_fs.go b/pkg/fileservice/memory_fs.go index 5d4ee2165a3b8350a80ca121d87d04516458d855..811bac013ea684aa913e46d7a813be5763279d28 100644 --- a/pkg/fileservice/memory_fs.go +++ b/pkg/fileservice/memory_fs.go @@ -92,6 +92,11 @@ func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error { return ErrFileExisted } + return m.write(ctx, vector) +} + +func (m *MemoryFS) write(ctx context.Context, vector IOVector) error { + if len(vector.Entries) == 0 { vector.Entries = []IOEntry{ { @@ -205,3 +210,11 @@ func (m *_MemFSEntry) Less(than btree.Item) bool { m2 := than.(*_MemFSEntry) return m.FilePath < m2.FilePath } + +var _ ReplaceableFileService = new(MemoryFS) + +func (m *MemoryFS) Replace(ctx context.Context, vector IOVector) error { + m.Lock() + defer m.Unlock() + return m.write(ctx, vector) +} diff --git a/pkg/fileservice/memory_fs_test.go b/pkg/fileservice/memory_fs_test.go index df7fc78e614a911864105249bb8b9fd449b4c314..d5ad5f5ed8402842738888ae7ead03b0755194ed 100644 --- a/pkg/fileservice/memory_fs_test.go +++ b/pkg/fileservice/memory_fs_test.go @@ -30,6 +30,14 @@ func TestMemoryFS(t *testing.T) { }) }) + t.Run("replaceable file service", func(t *testing.T) { + testReplaceableFileService(t, func() ReplaceableFileService { + fs, err := NewMemoryFS() + assert.Nil(t, err) + return fs + }) + }) + } func BenchmarkMemoryFS(b *testing.B) { diff --git a/pkg/fileservice/replaceable_file_service.go b/pkg/fileservice/replaceable_file_service.go new file mode 100644 index 0000000000000000000000000000000000000000..2fc67779e93e80cba97ca99f7b96b702bb4880d2 --- /dev/null +++ b/pkg/fileservice/replaceable_file_service.go @@ -0,0 +1,24 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileservice + +import "context" + +// ReplaceableFileService is an extension interface to FileService that allow replacing a whole file +type ReplaceableFileService interface { + FileService + + Replace(ctx context.Context, vector IOVector) error +} diff --git a/pkg/fileservice/replaceable_file_service_test.go b/pkg/fileservice/replaceable_file_service_test.go new file mode 100644 index 0000000000000000000000000000000000000000..85098cce2f83f77fb6c65af2a4048c92fbecc806 --- /dev/null +++ b/pkg/fileservice/replaceable_file_service_test.go @@ -0,0 +1,66 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileservice + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func testReplaceableFileService( + t *testing.T, + newFS func() ReplaceableFileService, +) { + + ctx := context.Background() + fs := newFS() + err := fs.Write(ctx, IOVector{ + FilePath: "foo", + Entries: []IOEntry{ + { + Size: 4, + Data: []byte("abcd"), + }, + }, + }) + assert.Nil(t, err) + + err = fs.Replace(ctx, IOVector{ + FilePath: "foo", + Entries: []IOEntry{ + { + Size: 2, + Data: []byte("cd"), + }, + }, + }) + assert.Nil(t, err) + + vec := &IOVector{ + FilePath: "foo", + Entries: []IOEntry{ + { + Size: -1, + }, + }, + } + err = fs.Read(ctx, vec) + assert.Nil(t, err) + + assert.Equal(t, 2, len(vec.Entries[0].Data)) + +} diff --git a/pkg/fileservice/s3_fs.go b/pkg/fileservice/s3_fs.go index 0b906326cc8f4e4c6bfd97308fbabccc5593e909..d38d34b159df8b8f566537ca86cf56d3448de644 100644 --- a/pkg/fileservice/s3_fs.go +++ b/pkg/fileservice/s3_fs.go @@ -149,6 +149,12 @@ func (m *S3FS) Write(ctx context.Context, vector IOVector) error { return ErrFileExisted } + return m.write(ctx, vector) +} + +func (m *S3FS) write(ctx context.Context, vector IOVector) error { + key := m.pathToKey(vector.FilePath) + // sort sort.Slice(vector.Entries, func(i, j int) bool { return vector.Entries[i].Offset < vector.Entries[j].Offset diff --git a/pkg/fileservice/s3_fs_minio.go b/pkg/fileservice/s3_fs_minio.go index f5d4341040b2c7ef7969db02db62e0465c2b63c7..1e3917cde9efc87be0fd1af74688913bdc286436 100644 --- a/pkg/fileservice/s3_fs_minio.go +++ b/pkg/fileservice/s3_fs_minio.go @@ -107,6 +107,13 @@ func (m *S3FSMinio) Write(ctx context.Context, vector IOVector) error { } } + return m.write(ctx, vector) +} + +func (m *S3FSMinio) write(ctx context.Context, vector IOVector) error { + + key := m.pathToKey(vector.FilePath) + // sort sort.Slice(vector.Entries, func(i, j int) bool { return vector.Entries[i].Offset < vector.Entries[j].Offset @@ -120,7 +127,7 @@ func (m *S3FSMinio) Write(ctx context.Context, vector IOVector) error { } // put - _, err = m.client.PutObject( + _, err := m.client.PutObject( ctx, m.config.Bucket, key, diff --git a/pkg/fileservice/s3_fs_minio_test.go b/pkg/fileservice/s3_fs_minio_test.go index 3414fa7c87d0bb940273fbea3564d7f293cd42bf..d92a4b7b23f858cf3754bbe4458b3f4b0b08593b 100644 --- a/pkg/fileservice/s3_fs_minio_test.go +++ b/pkg/fileservice/s3_fs_minio_test.go @@ -36,14 +36,17 @@ func TestS3FSMinio(t *testing.T) { err = json.Unmarshal(content, &sharedConfig) assert.Nil(t, err) - testFileService(t, func() FileService { + t.Run("file service", func(t *testing.T) { + testFileService(t, func() FileService { - config := sharedConfig - config.KeyPrefix = time.Now().Format("2006-01-02T15:04:05") + config := sharedConfig + config.KeyPrefix = time.Now().Format("2006-01-02T15:04:05") - fs, err := NewS3FSMinio(config) - assert.Nil(t, err) + fs, err := NewS3FSMinio(config) + assert.Nil(t, err) - return fs + return fs + }) }) + } diff --git a/pkg/fileservice/s3_fs_test.go b/pkg/fileservice/s3_fs_test.go index 8d50ee970c02232181c58e1f792614181ce2d903..2aa85d5a5efa5a14711f6efc56ab19d120c191ce 100644 --- a/pkg/fileservice/s3_fs_test.go +++ b/pkg/fileservice/s3_fs_test.go @@ -40,18 +40,21 @@ func TestS3FS(t *testing.T) { os.Setenv("AWS_ACCESS_KEY_ID", sharedConfig.APIKey) os.Setenv("AWS_SECRET_ACCESS_KEY", sharedConfig.APISecret) - testFileService(t, func() FileService { + t.Run("file service", func(t *testing.T) { + testFileService(t, func() FileService { - config := sharedConfig - config.KeyPrefix = time.Now().Format("2006-01-02.15:04:05.000000") + config := sharedConfig + config.KeyPrefix = time.Now().Format("2006-01-02.15:04:05.000000") - fs, err := NewS3FS( - config.Endpoint, - config.Bucket, - config.KeyPrefix, - ) - assert.Nil(t, err) + fs, err := NewS3FS( + config.Endpoint, + config.Bucket, + config.KeyPrefix, + ) + assert.Nil(t, err) - return fs + return fs + }) }) + }