From 4eb202d73a4cacc2d00141b25e35147027cf7bd3 Mon Sep 17 00:00:00 2001
From: reusee <reusee@gmail.com>
Date: Mon, 29 Aug 2022 15:07:23 +0800
Subject: [PATCH] fileservice: add Path, ParsePath, ParsePathAtService (#4748)

fileservice: add Path, ParsePath, ParsePathAtService

Approved by: @zhangxu19830126
---
 pkg/fileservice/file_services.go | 36 +++++++++++++-------------
 pkg/fileservice/local_etl_fs.go  | 24 ++++++++---------
 pkg/fileservice/local_fs.go      | 24 ++++++++---------
 pkg/fileservice/memory_fs.go     | 24 ++++++++---------
 pkg/fileservice/path.go          | 44 ++++++++++++++++++++++++--------
 pkg/fileservice/s3_fs.go         | 28 ++++++++++----------
 6 files changed, 102 insertions(+), 78 deletions(-)

diff --git a/pkg/fileservice/file_services.go b/pkg/fileservice/file_services.go
index 3fc7de2f2..d8a85362c 100644
--- a/pkg/fileservice/file_services.go
+++ b/pkg/fileservice/file_services.go
@@ -43,33 +43,33 @@ func NewFileServices(defaultName string, fss ...FileService) (*FileServices, err
 var _ FileService = &FileServices{}
 
 func (f *FileServices) Delete(ctx context.Context, filePath string) error {
-	name, path, err := splitPath("", filePath)
+	path, err := ParsePathAtService(filePath, "")
 	if err != nil {
 		return err
 	}
-	if name == "" {
-		name = f.defaultName
+	if path.Service == "" {
+		path.Service = f.defaultName
 	}
-	fs, err := Get[FileService](f, name)
+	fs, err := Get[FileService](f, path.Service)
 	if err != nil {
 		return err
 	}
-	return fs.Delete(ctx, path)
+	return fs.Delete(ctx, path.Full)
 }
 
 func (f *FileServices) List(ctx context.Context, dirPath string) ([]DirEntry, error) {
-	name, path, err := splitPath("", dirPath)
+	path, err := ParsePathAtService(dirPath, "")
 	if err != nil {
 		return nil, err
 	}
-	if name == "" {
-		name = f.defaultName
+	if path.Service == "" {
+		path.Service = f.defaultName
 	}
-	fs, err := Get[FileService](f, name)
+	fs, err := Get[FileService](f, path.Service)
 	if err != nil {
 		return nil, err
 	}
-	return fs.List(ctx, path)
+	return fs.List(ctx, path.Full)
 }
 
 func (f *FileServices) Name() string {
@@ -77,14 +77,14 @@ func (f *FileServices) Name() string {
 }
 
 func (f *FileServices) Read(ctx context.Context, vector *IOVector) error {
-	name, _, err := splitPath("", vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, "")
 	if err != nil {
 		return err
 	}
-	if name == "" {
-		name = f.defaultName
+	if path.Service == "" {
+		path.Service = f.defaultName
 	}
-	fs, err := Get[FileService](f, name)
+	fs, err := Get[FileService](f, path.Service)
 	if err != nil {
 		return err
 	}
@@ -92,14 +92,14 @@ func (f *FileServices) Read(ctx context.Context, vector *IOVector) error {
 }
 
 func (f *FileServices) Write(ctx context.Context, vector IOVector) error {
-	name, _, err := splitPath("", vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, "")
 	if err != nil {
 		return err
 	}
-	if name == "" {
-		name = f.defaultName
+	if path.Service == "" {
+		path.Service = f.defaultName
 	}
-	fs, err := Get[FileService](f, name)
+	fs, err := Get[FileService](f, path.Service)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/fileservice/local_etl_fs.go b/pkg/fileservice/local_etl_fs.go
index 264e0c5ad..5b9f55d8f 100644
--- a/pkg/fileservice/local_etl_fs.go
+++ b/pkg/fileservice/local_etl_fs.go
@@ -58,11 +58,11 @@ func (l *LocalETLFS) ensureTempDir() (err error) {
 }
 
 func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error {
-	_, path, err := splitPath(l.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, l.name)
 	if err != nil {
 		return err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 
 	// check existence
 	_, err = os.Stat(nativePath)
@@ -75,11 +75,11 @@ func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error {
 }
 
 func (l *LocalETLFS) write(ctx context.Context, vector IOVector) error {
-	_, path, err := splitPath(l.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, l.name)
 	if err != nil {
 		return err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 
 	// sort
 	sort.Slice(vector.Entries, func(i, j int) bool {
@@ -140,11 +140,11 @@ func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error {
 		return ErrEmptyVector
 	}
 
-	_, path, err := splitPath(l.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, l.name)
 	if err != nil {
 		return err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 
 	_, err = os.Stat(nativePath)
 	if os.IsNotExist(err) {
@@ -302,11 +302,11 @@ func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error {
 
 func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error) {
 
-	_, path, err := splitPath(l.name, dirPath)
+	path, err := ParsePathAtService(dirPath, l.name)
 	if err != nil {
 		return nil, err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 
 	f, err := os.Open(nativePath)
 	if os.IsNotExist(err) {
@@ -347,11 +347,11 @@ func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry,
 }
 
 func (l *LocalETLFS) Delete(ctx context.Context, filePath string) error {
-	_, path, err := splitPath(l.name, filePath)
+	path, err := ParsePathAtService(filePath, l.name)
 	if err != nil {
 		return err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 
 	_, err = os.Stat(nativePath)
 	if os.IsNotExist(err) {
@@ -456,11 +456,11 @@ func (l *LocalETLFS) ETLCompatible() {}
 var _ MutableFileService = new(LocalETLFS)
 
 func (l *LocalETLFS) NewMutator(filePath string) (Mutator, error) {
-	_, path, err := splitPath(l.name, filePath)
+	path, err := ParsePathAtService(filePath, l.name)
 	if err != nil {
 		return nil, err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 	f, err := os.OpenFile(nativePath, os.O_RDWR, 0644)
 	if os.IsNotExist(err) {
 		return nil, ErrFileNotFound
diff --git a/pkg/fileservice/local_fs.go b/pkg/fileservice/local_fs.go
index 803089142..24267b2e7 100644
--- a/pkg/fileservice/local_fs.go
+++ b/pkg/fileservice/local_fs.go
@@ -109,11 +109,11 @@ func (l *LocalFS) Name() string {
 }
 
 func (l *LocalFS) Write(ctx context.Context, vector IOVector) error {
-	_, path, err := splitPath(l.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, l.name)
 	if err != nil {
 		return err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 
 	// check existence
 	_, err = os.Stat(nativePath)
@@ -126,11 +126,11 @@ func (l *LocalFS) Write(ctx context.Context, vector IOVector) error {
 }
 
 func (l *LocalFS) write(ctx context.Context, vector IOVector) error {
-	_, path, err := splitPath(l.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, l.name)
 	if err != nil {
 		return err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 
 	// sort
 	sort.Slice(vector.Entries, func(i, j int) bool {
@@ -206,11 +206,11 @@ func (l *LocalFS) Read(ctx context.Context, vector *IOVector) error {
 
 func (l *LocalFS) read(ctx context.Context, vector *IOVector) error {
 
-	_, path, err := splitPath(l.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, l.name)
 	if err != nil {
 		return err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 
 	file, err := os.Open(nativePath)
 	if os.IsNotExist(err) {
@@ -363,11 +363,11 @@ func (l *LocalFS) read(ctx context.Context, vector *IOVector) error {
 
 func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error) {
 
-	_, path, err := splitPath(l.name, dirPath)
+	path, err := ParsePathAtService(dirPath, l.name)
 	if err != nil {
 		return nil, err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 
 	f, err := os.Open(nativePath)
 	if os.IsNotExist(err) {
@@ -408,11 +408,11 @@ func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err
 }
 
 func (l *LocalFS) Delete(ctx context.Context, filePath string) error {
-	_, path, err := splitPath(l.name, filePath)
+	path, err := ParsePathAtService(filePath, l.name)
 	if err != nil {
 		return err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 
 	_, err = os.Stat(nativePath)
 	if os.IsNotExist(err) {
@@ -515,11 +515,11 @@ func (l *LocalFS) toNativeFilePath(filePath string) string {
 var _ MutableFileService = new(LocalFS)
 
 func (l *LocalFS) NewMutator(filePath string) (Mutator, error) {
-	_, path, err := splitPath(l.name, filePath)
+	path, err := ParsePathAtService(filePath, l.name)
 	if err != nil {
 		return nil, err
 	}
-	nativePath := l.toNativeFilePath(path)
+	nativePath := l.toNativeFilePath(path.File)
 	f, err := os.OpenFile(nativePath, os.O_RDWR, 0644)
 	if os.IsNotExist(err) {
 		return nil, ErrFileNotFound
diff --git a/pkg/fileservice/memory_fs.go b/pkg/fileservice/memory_fs.go
index 6edbaf8ae..3143f4305 100644
--- a/pkg/fileservice/memory_fs.go
+++ b/pkg/fileservice/memory_fs.go
@@ -51,7 +51,7 @@ func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry
 	m.RLock()
 	defer m.RUnlock()
 
-	_, dirPath, err = splitPath(m.name, dirPath)
+	path, err := ParsePathAtService(dirPath, m.name)
 	if err != nil {
 		return nil, err
 	}
@@ -60,15 +60,15 @@ func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry
 	defer iter.Release()
 
 	pivot := &_MemFSEntry{
-		FilePath: dirPath,
+		FilePath: path.File,
 	}
 	for ok := iter.Seek(pivot); ok; ok = iter.Next() {
 		item := iter.Item()
-		if !strings.HasPrefix(item.FilePath, dirPath) {
+		if !strings.HasPrefix(item.FilePath, path.File) {
 			break
 		}
 
-		relPath := strings.TrimPrefix(item.FilePath, dirPath)
+		relPath := strings.TrimPrefix(item.FilePath, path.File)
 		relPath = strings.Trim(relPath, "/")
 		parts := strings.Split(relPath, "/")
 		isDir := len(parts) > 1
@@ -90,13 +90,13 @@ func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error {
 	m.Lock()
 	defer m.Unlock()
 
-	_, path, err := splitPath(m.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, m.name)
 	if err != nil {
 		return err
 	}
 
 	pivot := &_MemFSEntry{
-		FilePath: path,
+		FilePath: path.File,
 	}
 	_, ok := m.tree.Get(pivot)
 	if ok {
@@ -108,7 +108,7 @@ func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error {
 
 func (m *MemoryFS) write(ctx context.Context, vector IOVector) error {
 
-	_, path, err := splitPath(m.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, m.name)
 	if err != nil {
 		return err
 	}
@@ -133,7 +133,7 @@ func (m *MemoryFS) write(ctx context.Context, vector IOVector) error {
 		return err
 	}
 	entry := &_MemFSEntry{
-		FilePath: path,
+		FilePath: path.File,
 		Data:     data,
 	}
 	m.tree.Set(entry)
@@ -143,7 +143,7 @@ func (m *MemoryFS) write(ctx context.Context, vector IOVector) error {
 
 func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) error {
 
-	_, path, err := splitPath(m.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, m.name)
 	if err != nil {
 		return err
 	}
@@ -156,7 +156,7 @@ func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) error {
 	defer m.RUnlock()
 
 	pivot := &_MemFSEntry{
-		FilePath: path,
+		FilePath: path.File,
 	}
 
 	fsEntry, ok := m.tree.Get(pivot)
@@ -217,13 +217,13 @@ func (m *MemoryFS) Delete(ctx context.Context, filePath string) error {
 	m.Lock()
 	defer m.Unlock()
 
-	_, path, err := splitPath(m.name, filePath)
+	path, err := ParsePathAtService(filePath, m.name)
 	if err != nil {
 		return err
 	}
 
 	pivot := &_MemFSEntry{
-		FilePath: path,
+		FilePath: path.File,
 	}
 	m.tree.Delete(pivot)
 
diff --git a/pkg/fileservice/path.go b/pkg/fileservice/path.go
index 251289b8e..2c87e4923 100644
--- a/pkg/fileservice/path.go
+++ b/pkg/fileservice/path.go
@@ -21,17 +21,41 @@ import (
 
 const ServiceNameSeparator = ":"
 
-func splitPath(serviceName string, path string) (string, string, error) {
-	parts := strings.SplitN(path, ServiceNameSeparator, 2)
-	if len(parts) == 2 {
-		if serviceName != "" &&
-			parts[0] != "" &&
-			!strings.EqualFold(parts[0], serviceName) {
-			return "", "", fmt.Errorf("wrong file service name, expecting %s, got %s", serviceName, parts[0])
-		}
-		return parts[0], parts[1], nil
+type Path struct {
+	Full    string
+	Service string
+	File    string
+}
+
+func ParsePath(s string) (path Path, err error) {
+	parts := strings.SplitN(s, ServiceNameSeparator, 2)
+	switch len(parts) {
+	case 1:
+		// no service
+		path.File = parts[0]
+	case 2:
+		// with service
+		path.Service = parts[0]
+		path.File = parts[1]
+	default:
+		panic("impossible")
+	}
+	path.Full = joinPath(path.Service, path.File)
+	return
+}
+
+func ParsePathAtService(s string, serviceName string) (path Path, err error) {
+	path, err = ParsePath(s)
+	if err != nil {
+		return
+	}
+	if serviceName != "" &&
+		path.Service != "" &&
+		!strings.EqualFold(path.Service, serviceName) {
+		err = fmt.Errorf("wrong file service name, expecting %s, got %s", serviceName, path.Service)
+		return
 	}
-	return "", parts[0], nil
+	return
 }
 
 func joinPath(serviceName string, path string) string {
diff --git a/pkg/fileservice/s3_fs.go b/pkg/fileservice/s3_fs.go
index 3b138efd2..105f0bfa2 100644
--- a/pkg/fileservice/s3_fs.go
+++ b/pkg/fileservice/s3_fs.go
@@ -22,7 +22,7 @@ import (
 	"io"
 	"math"
 	"net/url"
-	"path"
+	pathpkg "path"
 	"sort"
 	"strings"
 	"time"
@@ -170,11 +170,11 @@ func (s *S3FS) Name() string {
 
 func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error) {
 
-	_, p, err := splitPath(s.name, dirPath)
+	path, err := ParsePathAtService(dirPath, s.name)
 	if err != nil {
 		return nil, err
 	}
-	prefix := s.pathToKey(p)
+	prefix := s.pathToKey(path.File)
 	if prefix != "" {
 		prefix += "/"
 	}
@@ -197,7 +197,7 @@ func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, er
 		for _, obj := range output.Contents {
 			filePath := s.keyToPath(*obj.Key)
 			filePath = strings.TrimRight(filePath, "/")
-			_, name := path.Split(filePath)
+			_, name := pathpkg.Split(filePath)
 			entries = append(entries, DirEntry{
 				Name:  name,
 				IsDir: false,
@@ -227,11 +227,11 @@ func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, er
 func (s *S3FS) Write(ctx context.Context, vector IOVector) error {
 
 	// check existence
-	_, path, err := splitPath(s.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, s.name)
 	if err != nil {
 		return err
 	}
-	key := s.pathToKey(path)
+	key := s.pathToKey(path.File)
 	output, err := s.client.HeadObject(
 		ctx,
 		&s3.HeadObjectInput{
@@ -257,11 +257,11 @@ func (s *S3FS) Write(ctx context.Context, vector IOVector) error {
 }
 
 func (s *S3FS) write(ctx context.Context, vector IOVector) error {
-	_, path, err := splitPath(s.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, s.name)
 	if err != nil {
 		return err
 	}
-	key := s.pathToKey(path)
+	key := s.pathToKey(path.File)
 
 	// sort
 	sort.Slice(vector.Entries, func(i, j int) bool {
@@ -315,7 +315,7 @@ func (s *S3FS) Read(ctx context.Context, vector *IOVector) error {
 }
 
 func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
-	_, path, err := splitPath(s.name, vector.FilePath)
+	path, err := ParsePathAtService(vector.FilePath, s.name)
 	if err != nil {
 		return err
 	}
@@ -347,7 +347,7 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
 			ctx,
 			&s3.GetObjectInput{
 				Bucket: ptrTo(s.bucket),
-				Key:    ptrTo(s.pathToKey(path)),
+				Key:    ptrTo(s.pathToKey(path.File)),
 				Range:  ptrTo(rang),
 			},
 		)
@@ -368,7 +368,7 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
 			ctx,
 			&s3.GetObjectInput{
 				Bucket: ptrTo(s.bucket),
-				Key:    ptrTo(s.pathToKey(path)),
+				Key:    ptrTo(s.pathToKey(path.File)),
 				Range:  ptrTo(rang),
 			},
 		)
@@ -444,7 +444,7 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
 
 func (s *S3FS) Delete(ctx context.Context, filePath string) error {
 
-	_, path, err := splitPath(s.name, filePath)
+	path, err := ParsePathAtService(filePath, s.name)
 	if err != nil {
 		return err
 	}
@@ -452,7 +452,7 @@ func (s *S3FS) Delete(ctx context.Context, filePath string) error {
 		ctx,
 		&s3.DeleteObjectInput{
 			Bucket: ptrTo(s.bucket),
-			Key:    ptrTo(s.pathToKey(path)),
+			Key:    ptrTo(s.pathToKey(path.File)),
 		},
 	)
 	if err != nil {
@@ -463,7 +463,7 @@ func (s *S3FS) Delete(ctx context.Context, filePath string) error {
 }
 
 func (s *S3FS) pathToKey(filePath string) string {
-	return path.Join(s.keyPrefix, filePath)
+	return pathpkg.Join(s.keyPrefix, filePath)
 }
 
 func (s *S3FS) keyToPath(key string) string {
-- 
GitLab