diff --git a/pkg/dnservice/factory.go b/pkg/dnservice/factory.go
index a25956ebe220ad5f5b243e533b1f5c8571e86deb..e370a5e2368bc6c319febd7c66cb137c0aa49779 100644
--- a/pkg/dnservice/factory.go
+++ b/pkg/dnservice/factory.go
@@ -104,5 +104,5 @@ func (s *store) newMemTxnStorage(shard metadata.DNShard, logClient logservice.Cl
}
func (s *store) newTAEStorage(shard metadata.DNShard, logClient logservice.Client) (storage.TxnStorage, error) {
- return taestorage.New(shard, logClient, s.s3FS, s.localFS, s.clock)
+ return taestorage.New(shard, logClient, s.fs, s.clock)
}
diff --git a/pkg/dnservice/store.go b/pkg/dnservice/store.go
index cb3860de3b290294f46cda710804af40acfbb8e3..afbbd7e86a580476841f8712d156c75f168f7acf 100644
--- a/pkg/dnservice/store.go
+++ b/pkg/dnservice/store.go
@@ -20,7 +20,6 @@ import (
"time"
"github.com/fagongzi/goetty/v2"
- "github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/stopper"
"github.com/matrixorigin/matrixone/pkg/fileservice"
@@ -80,8 +79,8 @@ type store struct {
server rpc.TxnServer
hakeeperClient logservice.DNHAKeeperClient
fsFactory fileservice.FileServiceFactory
- localFS fileservice.ReplaceableFileService
- s3FS fileservice.FileService
+ fs fileservice.FileService
+ metadataFS fileservice.ReplaceableFileService
replicas *sync.Map
stopper *stopper.Stopper
@@ -369,21 +368,24 @@ func (s *store) initHAKeeperClient() error {
}
func (s *store) initFileService() error {
- fs, err := s.fsFactory(localFileServiceName)
+ localFS, err := s.fsFactory(localFileServiceName)
if err != nil {
return err
}
- rfs, ok := fs.(fileservice.ReplaceableFileService)
- if !ok {
- return moerr.NewError(moerr.BAD_CONFIGURATION, "local fileservice must implmented ReplaceableFileService")
+ rfs, err := fileservice.Get[fileservice.ReplaceableFileService](
+ localFS, localFileServiceName)
+ if err != nil {
+ return err
}
- s.localFS = rfs
- fs, err = s.fsFactory(s3FileServiceName)
+ s3FS, err := s.fsFactory(s3FileServiceName)
if err != nil {
return err
}
- s.s3FS = fs
+
+ s.fs = fileservice.NewFileServices(localFileServiceName, localFS, s3FS)
+ s.metadataFS = rfs
+
return nil
}
diff --git a/pkg/dnservice/store_metadata.go b/pkg/dnservice/store_metadata.go
index 7f42eabcfb9fb254a5d8b304396c85698b1c13f4..1d7eadc450cbee9701dcd457fa513d7e30ccf0b8 100644
--- a/pkg/dnservice/store_metadata.go
+++ b/pkg/dnservice/store_metadata.go
@@ -41,7 +41,7 @@ func (s *store) initMetadata() error {
},
},
}
- if err := s.localFS.Read(ctx, vec); err != nil {
+ if err := s.metadataFS.Read(ctx, vec); err != nil {
if err == fileservice.ErrFileNotFound {
return nil
}
@@ -100,7 +100,7 @@ func (s *store) mustUpdateMetadataLocked() {
},
},
}
- if err := s.localFS.Replace(ctx, vec); err != nil {
+ if err := s.metadataFS.Replace(ctx, vec); err != nil {
s.logger.Fatal("update metadata to local file failed",
zap.Error(err))
}
diff --git a/pkg/dnservice/store_metadata_test.go b/pkg/dnservice/store_metadata_test.go
index f9911a5b5550cf0964fd82ad2529c977b73505b7..24dc4b39fd9f7da916e3fa2a37b5b1b5c3a42b2f 100644
--- a/pkg/dnservice/store_metadata_test.go
+++ b/pkg/dnservice/store_metadata_test.go
@@ -26,15 +26,15 @@ import (
)
func TestInitMetadata(t *testing.T) {
- fs, err := fileservice.NewMemoryFS()
+ fs, err := fileservice.NewMemoryFS(localFileServiceName)
assert.NoError(t, err)
- s := &store{logger: logutil.GetPanicLogger(), localFS: fs}
+ s := &store{logger: logutil.GetPanicLogger(), metadataFS: fs}
assert.NoError(t, s.initMetadata())
}
func TestInitMetadataWithExistData(t *testing.T) {
- fs, err := fileservice.NewMemoryFS()
+ fs, err := fileservice.NewMemoryFS(localFileServiceName)
assert.NoError(t, err)
value := metadata.DNStore{
UUID: "dn1",
@@ -58,7 +58,7 @@ func TestInitMetadataWithExistData(t *testing.T) {
},
}))
- s := &store{logger: logutil.GetPanicLogger(), localFS: fs}
+ s := &store{logger: logutil.GetPanicLogger(), metadataFS: fs}
s.mu.metadata.UUID = "dn1"
assert.NoError(t, s.initMetadata())
assert.Equal(t, value, s.mu.metadata)
@@ -72,7 +72,7 @@ func TestInitMetadataWithInvalidUUIDWillPanic(t *testing.T) {
assert.Fail(t, "must panic")
}()
- fs, err := fileservice.NewMemoryFS()
+ fs, err := fileservice.NewMemoryFS(localFileServiceName)
assert.NoError(t, err)
value := metadata.DNStore{
UUID: "dn1",
@@ -88,6 +88,6 @@ func TestInitMetadataWithInvalidUUIDWillPanic(t *testing.T) {
},
}))
- s := &store{logger: logutil.GetPanicLogger(), localFS: fs}
+ s := &store{logger: logutil.GetPanicLogger(), metadataFS: fs}
assert.NoError(t, s.initMetadata())
}
diff --git a/pkg/dnservice/store_test.go b/pkg/dnservice/store_test.go
index f9cbf105cad0fa04e47651b076fe4e8b9e8edc50..11eab2b9857393705f518c538e78944bff960d47 100644
--- a/pkg/dnservice/store_test.go
+++ b/pkg/dnservice/store_test.go
@@ -54,13 +54,13 @@ func TestAddReplica(t *testing.T) {
}
func TestStartWithReplicas(t *testing.T) {
- localFS, err := fileservice.NewMemoryFS()
+ localFS, err := fileservice.NewMemoryFS(localFileServiceName)
assert.NoError(t, err)
factory := func(name string) (fileservice.FileService, error) {
if name == localFileServiceName {
return localFS, nil
}
- return fileservice.NewMemoryFS()
+ return fileservice.NewMemoryFS(name)
}
runDNStoreTestWithFileServiceFactory(t, func(s *store) {
@@ -132,7 +132,7 @@ func runDNStoreTest(
testFn func(*store),
opts ...Option) {
runDNStoreTestWithFileServiceFactory(t, testFn, func(name string) (fileservice.FileService, error) {
- return fileservice.NewMemoryFS()
+ return fileservice.NewMemoryFS(name)
}, opts...)
}
@@ -156,7 +156,7 @@ func runDNStoreTestWithFileServiceFactory(
if fsFactory == nil {
fsFactory = func(name string) (fileservice.FileService, error) {
- return fileservice.NewMemoryFS()
+ return fileservice.NewMemoryFS(name)
}
}
s := newTestStore(t, "u1", fsFactory, opts...)
diff --git a/pkg/fileservice/config.go b/pkg/fileservice/config.go
index d7ba9b316b9398b76de57ca4a20b0058c87ead8d..9947d24e179f522c0beeec51204f2724d43fdf62 100644
--- a/pkg/fileservice/config.go
+++ b/pkg/fileservice/config.go
@@ -64,7 +64,7 @@ func NewFileService(cfg Config) (FileService, error) {
}
func newMemFileService(cfg Config) (FileService, error) {
- fs, err := NewMemoryFS()
+ fs, err := NewMemoryFS(cfg.Name)
if err != nil {
return nil, err
}
@@ -73,6 +73,7 @@ func newMemFileService(cfg Config) (FileService, error) {
func newDiskFileService(cfg Config) (FileService, error) {
fs, err := NewLocalFS(
+ cfg.Name,
cfg.DataDir,
int(cfg.CacheMemCapacityBytes),
)
@@ -84,6 +85,7 @@ func newDiskFileService(cfg Config) (FileService, error) {
func newMinioFileService(cfg Config) (FileService, error) {
fs, err := NewS3FSOnMinio(
+ cfg.Name,
cfg.S3.Endpoint,
cfg.S3.Bucket,
cfg.S3.KeyPrefix,
@@ -97,6 +99,7 @@ func newMinioFileService(cfg Config) (FileService, error) {
func newS3FileService(cfg Config) (FileService, error) {
fs, err := NewS3FS(
+ cfg.Name,
cfg.S3.Endpoint,
cfg.S3.Bucket,
cfg.S3.KeyPrefix,
diff --git a/pkg/fileservice/file_service.go b/pkg/fileservice/file_service.go
index db0084da61f2c0dd26f215f96e0b518a2cfa304c..0f5b55efdb13b9ced21e2fe68d60a9da26194476 100644
--- a/pkg/fileservice/file_service.go
+++ b/pkg/fileservice/file_service.go
@@ -21,6 +21,9 @@ import (
// FileService is a write-once file system
type FileService interface {
+ // Name is file service's name
+ Name() string
+
// Write writes a new file
// returns ErrFileExisted if file already existed
// returns ErrSizeNotMatch if provided size does not match data
@@ -44,6 +47,8 @@ type FileService interface {
type IOVector struct {
// path to file, '/' separated
+ // add a file service name prefix to select different service
+ // for example, 's3:a/b/c' refer to the path 'a/b/c' in S3
FilePath string
// io entries
// empty entry not allowed
diff --git a/pkg/fileservice/file_service_test.go b/pkg/fileservice/file_service_test.go
index 2a4609ce079a094b3908811a352825e5ca948769..fca5712f620f2364e9a656934b0d2d0b28e067c3 100644
--- a/pkg/fileservice/file_service_test.go
+++ b/pkg/fileservice/file_service_test.go
@@ -443,6 +443,35 @@ func testFileService(
})
+ t.Run("named path", func(t *testing.T) {
+ ctx := context.Background()
+ fs := newFS()
+
+ err := fs.Write(ctx, IOVector{
+ FilePath: fs.Name() + ":foo",
+ Entries: []IOEntry{
+ {
+ Size: 4,
+ Data: []byte("1234"),
+ },
+ },
+ })
+ assert.Nil(t, err)
+
+ vec := IOVector{
+ FilePath: "foo",
+ Entries: []IOEntry{
+ {
+ Size: -1,
+ },
+ },
+ }
+ err = fs.Read(ctx, &vec)
+ assert.Nil(t, err)
+ assert.Equal(t, []byte("1234"), vec.Entries[0].Data)
+
+ })
+
}
func randomSplit(data []byte, maxLen int) (ret [][]byte) {
diff --git a/pkg/fileservice/file_services.go b/pkg/fileservice/file_services.go
new file mode 100644
index 0000000000000000000000000000000000000000..e8fd6515777c3301a8d4551735ad74abc23b4bc4
--- /dev/null
+++ b/pkg/fileservice/file_services.go
@@ -0,0 +1,106 @@
+// Copyright 2022 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileservice
+
+import (
+ "context"
+ "fmt"
+)
+
+type FileServices struct {
+ defaultName string
+ mappings map[string]FileService
+}
+
+func NewFileServices(defaultName string, fss ...FileService) *FileServices {
+ f := &FileServices{
+ defaultName: defaultName,
+ mappings: make(map[string]FileService),
+ }
+ for _, fs := range fss {
+ name := fs.Name()
+ if _, ok := f.mappings[name]; ok {
+ panic(fmt.Errorf("duplicated file service name: %s", name))
+ }
+ f.mappings[name] = fs
+ }
+ return f
+}
+
+var _ FileService = &FileServices{}
+
+func (f *FileServices) Delete(ctx context.Context, filePath string) error {
+ name, path, err := splitPath("", filePath)
+ if err != nil {
+ return err
+ }
+ if name == "" {
+ name = f.defaultName
+ }
+ fs, err := Get[FileService](f, name)
+ if err != nil {
+ return err
+ }
+ return fs.Delete(ctx, path)
+}
+
+func (f *FileServices) List(ctx context.Context, dirPath string) ([]DirEntry, error) {
+ name, path, err := splitPath("", dirPath)
+ if err != nil {
+ return nil, err
+ }
+ if name == "" {
+ name = f.defaultName
+ }
+ fs, err := Get[FileService](f, name)
+ if err != nil {
+ return nil, err
+ }
+ return fs.List(ctx, path)
+}
+
+func (f *FileServices) Name() string {
+ return f.defaultName
+}
+
+func (f *FileServices) Read(ctx context.Context, vector *IOVector) error {
+ name, _, err := splitPath("", vector.FilePath)
+ if err != nil {
+ return err
+ }
+ if name == "" {
+ name = f.defaultName
+ }
+ fs, err := Get[FileService](f, name)
+ if err != nil {
+ return err
+ }
+ return fs.Read(ctx, vector)
+}
+
+func (f *FileServices) Write(ctx context.Context, vector IOVector) error {
+ name, _, err := splitPath("", vector.FilePath)
+ if err != nil {
+ return err
+ }
+ if name == "" {
+ name = f.defaultName
+ }
+ fs, err := Get[FileService](f, name)
+ if err != nil {
+ return err
+ }
+ return fs.Write(ctx, vector)
+}
diff --git a/pkg/fileservice/file_services_test.go b/pkg/fileservice/file_services_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..0bbfea8568be5b4223283ea4efc6e988559161b5
--- /dev/null
+++ b/pkg/fileservice/file_services_test.go
@@ -0,0 +1,32 @@
+// Copyright 2022 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileservice
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestFileServices(t *testing.T) {
+ t.Run("file service", func(t *testing.T) {
+ testFileService(t, func() FileService {
+ dir := t.TempDir()
+ fs, err := NewLocalFS("local", dir, 0)
+ assert.Nil(t, err)
+ return NewFileServices("local", fs)
+ })
+ })
+}
diff --git a/pkg/fileservice/get.go b/pkg/fileservice/get.go
new file mode 100644
index 0000000000000000000000000000000000000000..b337f7423fd990d92b5e9f13df90f1c36257802a
--- /dev/null
+++ b/pkg/fileservice/get.go
@@ -0,0 +1,44 @@
+// Copyright 2022 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileservice
+
+import "fmt"
+
+func Get[T any](fs FileService, name string) (res T, err error) {
+ if fs, ok := fs.(*FileServices); ok {
+ f, ok := fs.mappings[name]
+ if !ok {
+ err = fmt.Errorf("file service not found: %s", name)
+ return
+ }
+ res, ok = f.(T)
+ if !ok {
+ err = fmt.Errorf("%T does not implement %T", f, res)
+ return
+ }
+ return
+ }
+ var ok bool
+ res, ok = fs.(T)
+ if !ok {
+ err = fmt.Errorf("%T does not implement %T", fs, res)
+ return
+ }
+ if fs.Name() != name {
+ err = fmt.Errorf("file service name not match, expecting %s, got %s", name, fs.Name())
+ return
+ }
+ return
+}
diff --git a/pkg/fileservice/local_etl_fs.go b/pkg/fileservice/local_etl_fs.go
index 12fc86606de99423858aa115bf85e9cd703cc887..264e0c5ad2ef155f3c87092c05442fda9af31546 100644
--- a/pkg/fileservice/local_etl_fs.go
+++ b/pkg/fileservice/local_etl_fs.go
@@ -27,6 +27,7 @@ import (
// LocalETLFS is a FileService implementation backed by local file system and suitable for ETL operations
type LocalETLFS struct {
+ name string
rootPath string
sync.RWMutex
@@ -37,13 +38,18 @@ type LocalETLFS struct {
var _ FileService = new(LocalETLFS)
-func NewLocalETLFS(rootPath string) (*LocalETLFS, error) {
+func NewLocalETLFS(name string, rootPath string) (*LocalETLFS, error) {
return &LocalETLFS{
+ name: name,
rootPath: rootPath,
dirFiles: make(map[string]*os.File),
}, nil
}
+func (l *LocalETLFS) Name() string {
+ return l.name
+}
+
func (l *LocalETLFS) ensureTempDir() (err error) {
l.createTempDirOnce.Do(func() {
err = os.MkdirAll(filepath.Join(l.rootPath, ".tmp"), 0755)
@@ -52,10 +58,14 @@ func (l *LocalETLFS) ensureTempDir() (err error) {
}
func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error {
- nativePath := l.toNativeFilePath(vector.FilePath)
+ _, path, err := splitPath(l.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+ nativePath := l.toNativeFilePath(path)
// check existence
- _, err := os.Stat(nativePath)
+ _, err = os.Stat(nativePath)
if err == nil {
// existed
return ErrFileExisted
@@ -65,8 +75,11 @@ func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error {
}
func (l *LocalETLFS) write(ctx context.Context, vector IOVector) error {
-
- nativePath := l.toNativeFilePath(vector.FilePath)
+ _, path, err := splitPath(l.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+ nativePath := l.toNativeFilePath(path)
// sort
sort.Slice(vector.Entries, func(i, j int) bool {
@@ -127,8 +140,13 @@ func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error {
return ErrEmptyVector
}
- nativePath := l.toNativeFilePath(vector.FilePath)
- _, err := os.Stat(nativePath)
+ _, path, err := splitPath(l.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+ nativePath := l.toNativeFilePath(path)
+
+ _, err = os.Stat(nativePath)
if os.IsNotExist(err) {
return ErrFileNotFound
}
@@ -284,7 +302,12 @@ func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error {
func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error) {
- nativePath := l.toNativeFilePath(dirPath)
+ _, path, err := splitPath(l.name, dirPath)
+ if err != nil {
+ return nil, err
+ }
+ nativePath := l.toNativeFilePath(path)
+
f, err := os.Open(nativePath)
if os.IsNotExist(err) {
err = nil
@@ -324,23 +347,31 @@ func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry,
}
func (l *LocalETLFS) Delete(ctx context.Context, filePath string) error {
- nativePath := l.toNativeFilePath(filePath)
- _, err := os.Stat(nativePath)
+ _, path, err := splitPath(l.name, filePath)
+ if err != nil {
+ return err
+ }
+ nativePath := l.toNativeFilePath(path)
+
+ _, err = os.Stat(nativePath)
if os.IsNotExist(err) {
return ErrFileNotFound
}
if err != nil {
return err
}
+
err = os.Remove(nativePath)
if err != nil {
return err
}
+
parentDir, _ := filepath.Split(nativePath)
err = l.syncDir(parentDir)
if err != nil {
return err
}
+
return nil
}
@@ -425,8 +456,11 @@ func (l *LocalETLFS) ETLCompatible() {}
var _ MutableFileService = new(LocalETLFS)
func (l *LocalETLFS) NewMutator(filePath string) (Mutator, error) {
- // open
- nativePath := l.toNativeFilePath(filePath)
+ _, path, err := splitPath(l.name, filePath)
+ if err != nil {
+ return nil, err
+ }
+ nativePath := l.toNativeFilePath(path)
f, err := os.OpenFile(nativePath, os.O_RDWR, 0644)
if os.IsNotExist(err) {
return nil, ErrFileNotFound
diff --git a/pkg/fileservice/local_etl_fs_test.go b/pkg/fileservice/local_etl_fs_test.go
index e36ba92d7847172e9e1137c7779554635b853026..c1ac2e3b18b5ece7588249ec2e887937b86f7f69 100644
--- a/pkg/fileservice/local_etl_fs_test.go
+++ b/pkg/fileservice/local_etl_fs_test.go
@@ -25,7 +25,7 @@ func TestLocalETLFS(t *testing.T) {
t.Run("file service", func(t *testing.T) {
testFileService(t, func() FileService {
dir := t.TempDir()
- fs, err := NewLocalETLFS(dir)
+ fs, err := NewLocalETLFS("etl", dir)
assert.Nil(t, err)
return fs
})
@@ -34,7 +34,7 @@ func TestLocalETLFS(t *testing.T) {
t.Run("mutable file service", func(t *testing.T) {
testMutableFileService(t, func() MutableFileService {
dir := t.TempDir()
- fs, err := NewLocalETLFS(dir)
+ fs, err := NewLocalETLFS("etl", dir)
assert.Nil(t, err)
return fs
})
diff --git a/pkg/fileservice/local_fs.go b/pkg/fileservice/local_fs.go
index 7234b20c693a1a451fe346e8ce27bd7ed0112873..80308914295fa9735b6cf32c8ab34174fa8a42a1 100644
--- a/pkg/fileservice/local_fs.go
+++ b/pkg/fileservice/local_fs.go
@@ -29,6 +29,7 @@ import (
// LocalFS is a FileService implementation backed by local file system
type LocalFS struct {
+ name string
rootPath string
sync.RWMutex
@@ -40,6 +41,7 @@ type LocalFS struct {
var _ FileService = new(LocalFS)
func NewLocalFS(
+ name string,
rootPath string,
memCacheCapacity int,
) (*LocalFS, error) {
@@ -91,6 +93,7 @@ func NewLocalFS(
}
fs := &LocalFS{
+ name: name,
rootPath: rootPath,
dirFiles: make(map[string]*os.File),
}
@@ -101,11 +104,19 @@ func NewLocalFS(
return fs, nil
}
+func (l *LocalFS) Name() string {
+ return l.name
+}
+
func (l *LocalFS) Write(ctx context.Context, vector IOVector) error {
- nativePath := l.toNativeFilePath(vector.FilePath)
+ _, path, err := splitPath(l.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+ nativePath := l.toNativeFilePath(path)
// check existence
- _, err := os.Stat(nativePath)
+ _, err = os.Stat(nativePath)
if err == nil {
// existed
return ErrFileExisted
@@ -115,7 +126,11 @@ func (l *LocalFS) Write(ctx context.Context, vector IOVector) error {
}
func (l *LocalFS) write(ctx context.Context, vector IOVector) error {
- nativePath := l.toNativeFilePath(vector.FilePath)
+ _, path, err := splitPath(l.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+ nativePath := l.toNativeFilePath(path)
// sort
sort.Slice(vector.Entries, func(i, j int) bool {
@@ -191,7 +206,12 @@ func (l *LocalFS) Read(ctx context.Context, vector *IOVector) error {
func (l *LocalFS) read(ctx context.Context, vector *IOVector) error {
- nativePath := l.toNativeFilePath(vector.FilePath)
+ _, path, err := splitPath(l.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+ nativePath := l.toNativeFilePath(path)
+
file, err := os.Open(nativePath)
if os.IsNotExist(err) {
return ErrFileNotFound
@@ -343,7 +363,12 @@ func (l *LocalFS) read(ctx context.Context, vector *IOVector) error {
func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error) {
- nativePath := l.toNativeFilePath(dirPath)
+ _, path, err := splitPath(l.name, dirPath)
+ if err != nil {
+ return nil, err
+ }
+ nativePath := l.toNativeFilePath(path)
+
f, err := os.Open(nativePath)
if os.IsNotExist(err) {
err = nil
@@ -383,23 +408,31 @@ func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err
}
func (l *LocalFS) Delete(ctx context.Context, filePath string) error {
- nativePath := l.toNativeFilePath(filePath)
- _, err := os.Stat(nativePath)
+ _, path, err := splitPath(l.name, filePath)
+ if err != nil {
+ return err
+ }
+ nativePath := l.toNativeFilePath(path)
+
+ _, err = os.Stat(nativePath)
if os.IsNotExist(err) {
return ErrFileNotFound
}
if err != nil {
return err
}
+
err = os.Remove(nativePath)
if err != nil {
return err
}
+
parentDir, _ := filepath.Split(nativePath)
err = l.syncDir(parentDir)
if err != nil {
return err
}
+
return nil
}
@@ -482,8 +515,11 @@ func (l *LocalFS) toNativeFilePath(filePath string) string {
var _ MutableFileService = new(LocalFS)
func (l *LocalFS) NewMutator(filePath string) (Mutator, error) {
- // open
- nativePath := l.toNativeFilePath(filePath)
+ _, path, err := splitPath(l.name, filePath)
+ if err != nil {
+ return nil, err
+ }
+ nativePath := l.toNativeFilePath(path)
f, err := os.OpenFile(nativePath, os.O_RDWR, 0644)
if os.IsNotExist(err) {
return nil, ErrFileNotFound
diff --git a/pkg/fileservice/local_fs_test.go b/pkg/fileservice/local_fs_test.go
index 95ac73e9a518e153c540482dfd63a3450727897a..52ee9320ea271b17d3b6b92959d09a3def6b93fc 100644
--- a/pkg/fileservice/local_fs_test.go
+++ b/pkg/fileservice/local_fs_test.go
@@ -25,7 +25,7 @@ func TestLocalFS(t *testing.T) {
t.Run("file service", func(t *testing.T) {
testFileService(t, func() FileService {
dir := t.TempDir()
- fs, err := NewLocalFS(dir, 0)
+ fs, err := NewLocalFS("local", dir, 0)
assert.Nil(t, err)
return fs
})
@@ -34,7 +34,7 @@ func TestLocalFS(t *testing.T) {
t.Run("mutable file service", func(t *testing.T) {
testMutableFileService(t, func() MutableFileService {
dir := t.TempDir()
- fs, err := NewLocalFS(dir, 0)
+ fs, err := NewLocalFS("local", dir, 0)
assert.Nil(t, err)
return fs
})
@@ -43,7 +43,7 @@ func TestLocalFS(t *testing.T) {
t.Run("replaceable file service", func(t *testing.T) {
testReplaceableFileService(t, func() ReplaceableFileService {
dir := t.TempDir()
- fs, err := NewLocalFS(dir, 0)
+ fs, err := NewLocalFS("local", dir, 0)
assert.Nil(t, err)
return fs
})
@@ -52,7 +52,7 @@ func TestLocalFS(t *testing.T) {
t.Run("caching file service", func(t *testing.T) {
testCachingFileService(t, func() CachingFileService {
dir := t.TempDir()
- fs, err := NewLocalFS(dir, 128*1024)
+ fs, err := NewLocalFS("local", dir, 128*1024)
assert.Nil(t, err)
return fs
})
@@ -63,7 +63,7 @@ func TestLocalFS(t *testing.T) {
func BenchmarkLocalFS(b *testing.B) {
benchmarkFileService(b, func() FileService {
dir := b.TempDir()
- fs, err := NewLocalFS(dir, 0)
+ fs, err := NewLocalFS("local", dir, 0)
assert.Nil(b, err)
return fs
})
diff --git a/pkg/fileservice/memory_fs.go b/pkg/fileservice/memory_fs.go
index 07256cee43e81fa01643b2a29f05a52396c8be58..6edbaf8ae2c3fa0bc00c5eb7ae306097c6b83db0 100644
--- a/pkg/fileservice/memory_fs.go
+++ b/pkg/fileservice/memory_fs.go
@@ -27,24 +27,35 @@ import (
// MemoryFS is an in-memory FileService implementation
type MemoryFS struct {
+ name string
sync.RWMutex
tree *btree.Generic[*_MemFSEntry]
}
var _ FileService = new(MemoryFS)
-func NewMemoryFS() (*MemoryFS, error) {
+func NewMemoryFS(name string) (*MemoryFS, error) {
return &MemoryFS{
+ name: name,
tree: btree.NewGeneric(func(a, b *_MemFSEntry) bool {
return a.FilePath < b.FilePath
}),
}, nil
}
+func (m *MemoryFS) Name() string {
+ return m.name
+}
+
func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error) {
m.RLock()
defer m.RUnlock()
+ _, dirPath, err = splitPath(m.name, dirPath)
+ if err != nil {
+ return nil, err
+ }
+
iter := m.tree.Iter()
defer iter.Release()
@@ -79,8 +90,13 @@ func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error {
m.Lock()
defer m.Unlock()
+ _, path, err := splitPath(m.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+
pivot := &_MemFSEntry{
- FilePath: vector.FilePath,
+ FilePath: path,
}
_, ok := m.tree.Get(pivot)
if ok {
@@ -92,6 +108,11 @@ func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error {
func (m *MemoryFS) write(ctx context.Context, vector IOVector) error {
+ _, path, err := splitPath(m.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+
if len(vector.Entries) == 0 {
vector.Entries = []IOEntry{
{
@@ -112,7 +133,7 @@ func (m *MemoryFS) write(ctx context.Context, vector IOVector) error {
return err
}
entry := &_MemFSEntry{
- FilePath: vector.FilePath,
+ FilePath: path,
Data: data,
}
m.tree.Set(entry)
@@ -122,6 +143,11 @@ func (m *MemoryFS) write(ctx context.Context, vector IOVector) error {
func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) error {
+ _, path, err := splitPath(m.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+
if len(vector.Entries) == 0 {
return ErrEmptyVector
}
@@ -130,7 +156,7 @@ func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) error {
defer m.RUnlock()
pivot := &_MemFSEntry{
- FilePath: vector.FilePath,
+ FilePath: path,
}
fsEntry, ok := m.tree.Get(pivot)
@@ -191,8 +217,13 @@ func (m *MemoryFS) Delete(ctx context.Context, filePath string) error {
m.Lock()
defer m.Unlock()
+ _, path, err := splitPath(m.name, filePath)
+ if err != nil {
+ return err
+ }
+
pivot := &_MemFSEntry{
- FilePath: filePath,
+ FilePath: path,
}
m.tree.Delete(pivot)
diff --git a/pkg/fileservice/memory_fs_test.go b/pkg/fileservice/memory_fs_test.go
index d5ad5f5ed8402842738888ae7ead03b0755194ed..8958e06e4efd91758621564b6ebbbf55eeaec1ff 100644
--- a/pkg/fileservice/memory_fs_test.go
+++ b/pkg/fileservice/memory_fs_test.go
@@ -24,7 +24,7 @@ func TestMemoryFS(t *testing.T) {
t.Run("file service", func(t *testing.T) {
testFileService(t, func() FileService {
- fs, err := NewMemoryFS()
+ fs, err := NewMemoryFS("memory")
assert.Nil(t, err)
return fs
})
@@ -32,7 +32,7 @@ func TestMemoryFS(t *testing.T) {
t.Run("replaceable file service", func(t *testing.T) {
testReplaceableFileService(t, func() ReplaceableFileService {
- fs, err := NewMemoryFS()
+ fs, err := NewMemoryFS("memory")
assert.Nil(t, err)
return fs
})
@@ -42,7 +42,7 @@ func TestMemoryFS(t *testing.T) {
func BenchmarkMemoryFS(b *testing.B) {
benchmarkFileService(b, func() FileService {
- fs, err := NewMemoryFS()
+ fs, err := NewMemoryFS("memory")
assert.Nil(b, err)
return fs
})
diff --git a/pkg/fileservice/path.go b/pkg/fileservice/path.go
new file mode 100644
index 0000000000000000000000000000000000000000..fb034115352d56f04a507e4fd833c0d96e9cd304
--- /dev/null
+++ b/pkg/fileservice/path.go
@@ -0,0 +1,31 @@
+// Copyright 2022 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileservice
+
+import (
+ "fmt"
+ "strings"
+)
+
+func splitPath(serviceName string, path string) (string, string, error) {
+ parts := strings.SplitN(path, ":", 2)
+ if len(parts) == 2 {
+ if serviceName != "" && parts[0] != "" && parts[0] != serviceName {
+ return "", "", fmt.Errorf("wrong file service name, expecting %s, got %s", serviceName, parts[0])
+ }
+ return parts[0], parts[1], nil
+ }
+ return "", parts[0], nil
+}
diff --git a/pkg/fileservice/s3_fs.go b/pkg/fileservice/s3_fs.go
index a44a137e79c807af2041012561e1abab03b2f4e8..c0149f16708dd86f25b2cca51be7b077b79eac54 100644
--- a/pkg/fileservice/s3_fs.go
+++ b/pkg/fileservice/s3_fs.go
@@ -35,6 +35,7 @@ import (
// S3FS is a FileService implementation backed by S3
type S3FS struct {
+ name string
client *s3.Client
bucket string
keyPrefix string
@@ -48,6 +49,7 @@ type S3FS struct {
var _ FileService = new(S3FS)
func NewS3FS(
+ name string,
endpoint string,
bucket string,
keyPrefix string,
@@ -64,6 +66,7 @@ func NewS3FS(
endpoint = u.String()
return newS3FS(
+ name,
endpoint,
bucket,
keyPrefix,
@@ -77,6 +80,7 @@ func NewS3FS(
// NewS3FSOnMinio creates S3FS on minio server
// this is needed because the URL scheme of minio server does not compatible with AWS'
func NewS3FSOnMinio(
+ name string,
endpoint string,
bucket string,
keyPrefix string,
@@ -93,6 +97,7 @@ func NewS3FSOnMinio(
endpoint = u.String()
return newS3FS(
+ name,
endpoint,
bucket,
keyPrefix,
@@ -118,6 +123,7 @@ func NewS3FSOnMinio(
}
func newS3FS(
+ name string,
endpoint string,
bucket string,
keyPrefix string,
@@ -149,13 +155,22 @@ func newS3FS(
return fs, nil
}
+func (s *S3FS) Name() string {
+ return s.name
+}
+
func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error) {
- var cont *string
- prefix := s.pathToKey(dirPath)
+ _, p, err := splitPath(s.name, dirPath)
+ if err != nil {
+ return nil, err
+ }
+ prefix := s.pathToKey(p)
if prefix != "" {
prefix += "/"
}
+ var cont *string
+
for {
output, err := s.client.ListObjectsV2(
ctx,
@@ -203,7 +218,11 @@ func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, er
func (s *S3FS) Write(ctx context.Context, vector IOVector) error {
// check existence
- key := s.pathToKey(vector.FilePath)
+ _, path, err := splitPath(s.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+ key := s.pathToKey(path)
output, err := s.client.HeadObject(
ctx,
&s3.HeadObjectInput{
@@ -229,7 +248,11 @@ func (s *S3FS) Write(ctx context.Context, vector IOVector) error {
}
func (s *S3FS) write(ctx context.Context, vector IOVector) error {
- key := s.pathToKey(vector.FilePath)
+ _, path, err := splitPath(s.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
+ key := s.pathToKey(path)
// sort
sort.Slice(vector.Entries, func(i, j int) bool {
@@ -283,6 +306,10 @@ func (s *S3FS) Read(ctx context.Context, vector *IOVector) error {
}
func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
+ _, path, err := splitPath(s.name, vector.FilePath)
+ if err != nil {
+ return err
+ }
min := math.MaxInt
max := 0
@@ -311,7 +338,7 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
ctx,
&s3.GetObjectInput{
Bucket: ptrTo(s.bucket),
- Key: ptrTo(s.pathToKey(vector.FilePath)),
+ Key: ptrTo(s.pathToKey(path)),
Range: ptrTo(rang),
},
)
@@ -332,7 +359,7 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
ctx,
&s3.GetObjectInput{
Bucket: ptrTo(s.bucket),
- Key: ptrTo(s.pathToKey(vector.FilePath)),
+ Key: ptrTo(s.pathToKey(path)),
Range: ptrTo(rang),
},
)
@@ -408,11 +435,15 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
func (s *S3FS) Delete(ctx context.Context, filePath string) error {
- _, err := s.client.DeleteObject(
+ _, path, err := splitPath(s.name, filePath)
+ if err != nil {
+ return err
+ }
+ _, err = s.client.DeleteObject(
ctx,
&s3.DeleteObjectInput{
Bucket: ptrTo(s.bucket),
- Key: ptrTo(s.pathToKey(filePath)),
+ Key: ptrTo(s.pathToKey(path)),
},
)
if err != nil {
diff --git a/pkg/fileservice/s3_fs_test.go b/pkg/fileservice/s3_fs_test.go
index 1fc74380f934b628bb71efffe080bd4a30891d51..935c0d1b6a6fd3b4e08b880040dc7a1ec36c81bc 100644
--- a/pkg/fileservice/s3_fs_test.go
+++ b/pkg/fileservice/s3_fs_test.go
@@ -58,6 +58,7 @@ func TestS3FS(t *testing.T) {
testFileService(t, func() FileService {
fs, err := NewS3FS(
+ "s3",
config.Endpoint,
config.Bucket,
time.Now().Format("2006-01-02.15:04:05.000000"),
@@ -71,6 +72,7 @@ func TestS3FS(t *testing.T) {
t.Run("list root", func(t *testing.T) {
fs, err := NewS3FS(
+ "s3",
config.Endpoint,
config.Bucket,
"",
@@ -86,6 +88,7 @@ func TestS3FS(t *testing.T) {
t.Run("caching file service", func(t *testing.T) {
testCachingFileService(t, func() CachingFileService {
fs, err := NewS3FS(
+ "s3",
config.Endpoint,
config.Bucket,
time.Now().Format("2006-01-02.15:04:05.000000"),
@@ -166,6 +169,7 @@ func TestS3FSMinioServer(t *testing.T) {
testFileService(t, func() FileService {
fs, err := NewS3FSOnMinio(
+ "s3",
endpoint,
"test",
time.Now().Format("2006-01-02.15:04:05.000000"),
@@ -199,6 +203,7 @@ func BenchmarkS3FS(b *testing.B) {
benchmarkFileService(b, func() FileService {
fs, err := NewS3FS(
+ "s3",
config.Endpoint,
config.Bucket,
time.Now().Format("2006-01-02.15:04:05.000000"),
diff --git a/pkg/sql/colexec/external/external.go b/pkg/sql/colexec/external/external.go
index 815cd2157e6a8d88f4ebcfdebfef607047065b77..89f8abb90ee6a6981bf71e8df74146c16d452161 100644
--- a/pkg/sql/colexec/external/external.go
+++ b/pkg/sql/colexec/external/external.go
@@ -104,6 +104,7 @@ func ReadFromS3(param *tree.ExternParam) ([]string, error) {
}
fs, err := fileservice.NewS3FS(
+ "s3",
config.Endpoint,
config.Bucket,
config.KeyPrefix,
@@ -152,6 +153,7 @@ func ReadFromS3File(param *tree.ExternParam) (io.ReadCloser, error) {
}
fs, err := fileservice.NewS3FS(
+ "s3",
config.Endpoint,
config.Bucket,
config.KeyPrefix,
@@ -188,7 +190,7 @@ func ReadFromLocal(param *tree.ExternParam) ([]string, error) {
file = string([]byte(param.Filepath)[index+1:])
}
- fs, err := fileservice.NewLocalETLFS(dir)
+ fs, err := fileservice.NewLocalETLFS("etl", dir)
if err != nil {
return nil, err
}
@@ -218,7 +220,7 @@ func ReadFromLocalFile(param *tree.ExternParam) (io.ReadCloser, error) {
file = string([]byte(param.Filepath)[index+1:])
}
- fs, err := fileservice.NewLocalETLFS(dir)
+ fs, err := fileservice.NewLocalETLFS("etl", dir)
if err != nil {
return nil, err
}
diff --git a/pkg/tests/service/fileservice.go b/pkg/tests/service/fileservice.go
index 33c5c70316cdd1061a11dc2a445e8073c447dff9..b3def1e2851cccb60b6f7f46d019445d8ddd3b89 100644
--- a/pkg/tests/service/fileservice.go
+++ b/pkg/tests/service/fileservice.go
@@ -39,12 +39,12 @@ type fileServices struct {
func newFileServices(t *testing.T, dnServiceNum int) *fileServices {
locals := make([]fileservice.FileService, 0, dnServiceNum)
for i := 0; i < dnServiceNum; i++ {
- fs, err := fileservice.NewMemoryFS()
+ fs, err := fileservice.NewMemoryFS("LOCAL")
require.NoError(t, err)
locals = append(locals, fs)
}
- s3fs, err := fileservice.NewMemoryFS()
+ s3fs, err := fileservice.NewMemoryFS("S3")
require.NoError(t, err)
return &fileServices{
diff --git a/pkg/txn/storage/tae/storage.go b/pkg/txn/storage/tae/storage.go
index d4bae9d3a9e006a8ab6a03ff836857f594c3ea37..e641875a882872cb598f22e8cedefe8c45a81f06 100644
--- a/pkg/txn/storage/tae/storage.go
+++ b/pkg/txn/storage/tae/storage.go
@@ -29,24 +29,21 @@ import (
type Storage struct {
shard metadata.DNShard
logClient logservice.Client
- s3FS fileservice.FileService
- localFS fileservice.FileService
+ fs fileservice.FileService
clock clock.Clock
}
func New(
shard metadata.DNShard,
logClient logservice.Client,
- s3FS fileservice.FileService,
- localFS fileservice.FileService,
+ fs fileservice.FileService,
clock clock.Clock,
) (*Storage, error) {
return &Storage{
shard: shard,
logClient: logClient,
- s3FS: s3FS,
- localFS: localFS,
+ fs: fs,
clock: clock,
}, nil
}