Skip to content
Snippets Groups Projects
Unverified Commit 4eb202d7 authored by reusee's avatar reusee Committed by GitHub
Browse files

fileservice: add Path, ParsePath, ParsePathAtService (#4748)

fileservice: add Path, ParsePath, ParsePathAtService

Approved by: @zhangxu19830126
parent 083897b4
No related branches found
No related tags found
No related merge requests found
......@@ -43,33 +43,33 @@ func NewFileServices(defaultName string, fss ...FileService) (*FileServices, err
var _ FileService = &FileServices{}
func (f *FileServices) Delete(ctx context.Context, filePath string) error {
name, path, err := splitPath("", filePath)
path, err := ParsePathAtService(filePath, "")
if err != nil {
return err
}
if name == "" {
name = f.defaultName
if path.Service == "" {
path.Service = f.defaultName
}
fs, err := Get[FileService](f, name)
fs, err := Get[FileService](f, path.Service)
if err != nil {
return err
}
return fs.Delete(ctx, path)
return fs.Delete(ctx, path.Full)
}
func (f *FileServices) List(ctx context.Context, dirPath string) ([]DirEntry, error) {
name, path, err := splitPath("", dirPath)
path, err := ParsePathAtService(dirPath, "")
if err != nil {
return nil, err
}
if name == "" {
name = f.defaultName
if path.Service == "" {
path.Service = f.defaultName
}
fs, err := Get[FileService](f, name)
fs, err := Get[FileService](f, path.Service)
if err != nil {
return nil, err
}
return fs.List(ctx, path)
return fs.List(ctx, path.Full)
}
func (f *FileServices) Name() string {
......@@ -77,14 +77,14 @@ func (f *FileServices) Name() string {
}
func (f *FileServices) Read(ctx context.Context, vector *IOVector) error {
name, _, err := splitPath("", vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, "")
if err != nil {
return err
}
if name == "" {
name = f.defaultName
if path.Service == "" {
path.Service = f.defaultName
}
fs, err := Get[FileService](f, name)
fs, err := Get[FileService](f, path.Service)
if err != nil {
return err
}
......@@ -92,14 +92,14 @@ func (f *FileServices) Read(ctx context.Context, vector *IOVector) error {
}
func (f *FileServices) Write(ctx context.Context, vector IOVector) error {
name, _, err := splitPath("", vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, "")
if err != nil {
return err
}
if name == "" {
name = f.defaultName
if path.Service == "" {
path.Service = f.defaultName
}
fs, err := Get[FileService](f, name)
fs, err := Get[FileService](f, path.Service)
if err != nil {
return err
}
......
......@@ -58,11 +58,11 @@ func (l *LocalETLFS) ensureTempDir() (err error) {
}
func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error {
_, path, err := splitPath(l.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, l.name)
if err != nil {
return err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
// check existence
_, err = os.Stat(nativePath)
......@@ -75,11 +75,11 @@ func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error {
}
func (l *LocalETLFS) write(ctx context.Context, vector IOVector) error {
_, path, err := splitPath(l.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, l.name)
if err != nil {
return err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
// sort
sort.Slice(vector.Entries, func(i, j int) bool {
......@@ -140,11 +140,11 @@ func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error {
return ErrEmptyVector
}
_, path, err := splitPath(l.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, l.name)
if err != nil {
return err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
_, err = os.Stat(nativePath)
if os.IsNotExist(err) {
......@@ -302,11 +302,11 @@ func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error {
func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error) {
_, path, err := splitPath(l.name, dirPath)
path, err := ParsePathAtService(dirPath, l.name)
if err != nil {
return nil, err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
f, err := os.Open(nativePath)
if os.IsNotExist(err) {
......@@ -347,11 +347,11 @@ func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry,
}
func (l *LocalETLFS) Delete(ctx context.Context, filePath string) error {
_, path, err := splitPath(l.name, filePath)
path, err := ParsePathAtService(filePath, l.name)
if err != nil {
return err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
_, err = os.Stat(nativePath)
if os.IsNotExist(err) {
......@@ -456,11 +456,11 @@ func (l *LocalETLFS) ETLCompatible() {}
var _ MutableFileService = new(LocalETLFS)
func (l *LocalETLFS) NewMutator(filePath string) (Mutator, error) {
_, path, err := splitPath(l.name, filePath)
path, err := ParsePathAtService(filePath, l.name)
if err != nil {
return nil, err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
f, err := os.OpenFile(nativePath, os.O_RDWR, 0644)
if os.IsNotExist(err) {
return nil, ErrFileNotFound
......
......@@ -109,11 +109,11 @@ func (l *LocalFS) Name() string {
}
func (l *LocalFS) Write(ctx context.Context, vector IOVector) error {
_, path, err := splitPath(l.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, l.name)
if err != nil {
return err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
// check existence
_, err = os.Stat(nativePath)
......@@ -126,11 +126,11 @@ func (l *LocalFS) Write(ctx context.Context, vector IOVector) error {
}
func (l *LocalFS) write(ctx context.Context, vector IOVector) error {
_, path, err := splitPath(l.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, l.name)
if err != nil {
return err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
// sort
sort.Slice(vector.Entries, func(i, j int) bool {
......@@ -206,11 +206,11 @@ func (l *LocalFS) Read(ctx context.Context, vector *IOVector) error {
func (l *LocalFS) read(ctx context.Context, vector *IOVector) error {
_, path, err := splitPath(l.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, l.name)
if err != nil {
return err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
file, err := os.Open(nativePath)
if os.IsNotExist(err) {
......@@ -363,11 +363,11 @@ func (l *LocalFS) read(ctx context.Context, vector *IOVector) error {
func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error) {
_, path, err := splitPath(l.name, dirPath)
path, err := ParsePathAtService(dirPath, l.name)
if err != nil {
return nil, err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
f, err := os.Open(nativePath)
if os.IsNotExist(err) {
......@@ -408,11 +408,11 @@ func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err
}
func (l *LocalFS) Delete(ctx context.Context, filePath string) error {
_, path, err := splitPath(l.name, filePath)
path, err := ParsePathAtService(filePath, l.name)
if err != nil {
return err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
_, err = os.Stat(nativePath)
if os.IsNotExist(err) {
......@@ -515,11 +515,11 @@ func (l *LocalFS) toNativeFilePath(filePath string) string {
var _ MutableFileService = new(LocalFS)
func (l *LocalFS) NewMutator(filePath string) (Mutator, error) {
_, path, err := splitPath(l.name, filePath)
path, err := ParsePathAtService(filePath, l.name)
if err != nil {
return nil, err
}
nativePath := l.toNativeFilePath(path)
nativePath := l.toNativeFilePath(path.File)
f, err := os.OpenFile(nativePath, os.O_RDWR, 0644)
if os.IsNotExist(err) {
return nil, ErrFileNotFound
......
......@@ -51,7 +51,7 @@ func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry
m.RLock()
defer m.RUnlock()
_, dirPath, err = splitPath(m.name, dirPath)
path, err := ParsePathAtService(dirPath, m.name)
if err != nil {
return nil, err
}
......@@ -60,15 +60,15 @@ func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry
defer iter.Release()
pivot := &_MemFSEntry{
FilePath: dirPath,
FilePath: path.File,
}
for ok := iter.Seek(pivot); ok; ok = iter.Next() {
item := iter.Item()
if !strings.HasPrefix(item.FilePath, dirPath) {
if !strings.HasPrefix(item.FilePath, path.File) {
break
}
relPath := strings.TrimPrefix(item.FilePath, dirPath)
relPath := strings.TrimPrefix(item.FilePath, path.File)
relPath = strings.Trim(relPath, "/")
parts := strings.Split(relPath, "/")
isDir := len(parts) > 1
......@@ -90,13 +90,13 @@ func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error {
m.Lock()
defer m.Unlock()
_, path, err := splitPath(m.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, m.name)
if err != nil {
return err
}
pivot := &_MemFSEntry{
FilePath: path,
FilePath: path.File,
}
_, ok := m.tree.Get(pivot)
if ok {
......@@ -108,7 +108,7 @@ func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error {
func (m *MemoryFS) write(ctx context.Context, vector IOVector) error {
_, path, err := splitPath(m.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, m.name)
if err != nil {
return err
}
......@@ -133,7 +133,7 @@ func (m *MemoryFS) write(ctx context.Context, vector IOVector) error {
return err
}
entry := &_MemFSEntry{
FilePath: path,
FilePath: path.File,
Data: data,
}
m.tree.Set(entry)
......@@ -143,7 +143,7 @@ func (m *MemoryFS) write(ctx context.Context, vector IOVector) error {
func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) error {
_, path, err := splitPath(m.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, m.name)
if err != nil {
return err
}
......@@ -156,7 +156,7 @@ func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) error {
defer m.RUnlock()
pivot := &_MemFSEntry{
FilePath: path,
FilePath: path.File,
}
fsEntry, ok := m.tree.Get(pivot)
......@@ -217,13 +217,13 @@ func (m *MemoryFS) Delete(ctx context.Context, filePath string) error {
m.Lock()
defer m.Unlock()
_, path, err := splitPath(m.name, filePath)
path, err := ParsePathAtService(filePath, m.name)
if err != nil {
return err
}
pivot := &_MemFSEntry{
FilePath: path,
FilePath: path.File,
}
m.tree.Delete(pivot)
......
......@@ -21,17 +21,41 @@ import (
const ServiceNameSeparator = ":"
func splitPath(serviceName string, path string) (string, string, error) {
parts := strings.SplitN(path, ServiceNameSeparator, 2)
if len(parts) == 2 {
if serviceName != "" &&
parts[0] != "" &&
!strings.EqualFold(parts[0], serviceName) {
return "", "", fmt.Errorf("wrong file service name, expecting %s, got %s", serviceName, parts[0])
}
return parts[0], parts[1], nil
type Path struct {
Full string
Service string
File string
}
func ParsePath(s string) (path Path, err error) {
parts := strings.SplitN(s, ServiceNameSeparator, 2)
switch len(parts) {
case 1:
// no service
path.File = parts[0]
case 2:
// with service
path.Service = parts[0]
path.File = parts[1]
default:
panic("impossible")
}
path.Full = joinPath(path.Service, path.File)
return
}
func ParsePathAtService(s string, serviceName string) (path Path, err error) {
path, err = ParsePath(s)
if err != nil {
return
}
if serviceName != "" &&
path.Service != "" &&
!strings.EqualFold(path.Service, serviceName) {
err = fmt.Errorf("wrong file service name, expecting %s, got %s", serviceName, path.Service)
return
}
return "", parts[0], nil
return
}
func joinPath(serviceName string, path string) string {
......
......@@ -22,7 +22,7 @@ import (
"io"
"math"
"net/url"
"path"
pathpkg "path"
"sort"
"strings"
"time"
......@@ -170,11 +170,11 @@ func (s *S3FS) Name() string {
func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error) {
_, p, err := splitPath(s.name, dirPath)
path, err := ParsePathAtService(dirPath, s.name)
if err != nil {
return nil, err
}
prefix := s.pathToKey(p)
prefix := s.pathToKey(path.File)
if prefix != "" {
prefix += "/"
}
......@@ -197,7 +197,7 @@ func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, er
for _, obj := range output.Contents {
filePath := s.keyToPath(*obj.Key)
filePath = strings.TrimRight(filePath, "/")
_, name := path.Split(filePath)
_, name := pathpkg.Split(filePath)
entries = append(entries, DirEntry{
Name: name,
IsDir: false,
......@@ -227,11 +227,11 @@ func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, er
func (s *S3FS) Write(ctx context.Context, vector IOVector) error {
// check existence
_, path, err := splitPath(s.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, s.name)
if err != nil {
return err
}
key := s.pathToKey(path)
key := s.pathToKey(path.File)
output, err := s.client.HeadObject(
ctx,
&s3.HeadObjectInput{
......@@ -257,11 +257,11 @@ func (s *S3FS) Write(ctx context.Context, vector IOVector) error {
}
func (s *S3FS) write(ctx context.Context, vector IOVector) error {
_, path, err := splitPath(s.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, s.name)
if err != nil {
return err
}
key := s.pathToKey(path)
key := s.pathToKey(path.File)
// sort
sort.Slice(vector.Entries, func(i, j int) bool {
......@@ -315,7 +315,7 @@ func (s *S3FS) Read(ctx context.Context, vector *IOVector) error {
}
func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
_, path, err := splitPath(s.name, vector.FilePath)
path, err := ParsePathAtService(vector.FilePath, s.name)
if err != nil {
return err
}
......@@ -347,7 +347,7 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
ctx,
&s3.GetObjectInput{
Bucket: ptrTo(s.bucket),
Key: ptrTo(s.pathToKey(path)),
Key: ptrTo(s.pathToKey(path.File)),
Range: ptrTo(rang),
},
)
......@@ -368,7 +368,7 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
ctx,
&s3.GetObjectInput{
Bucket: ptrTo(s.bucket),
Key: ptrTo(s.pathToKey(path)),
Key: ptrTo(s.pathToKey(path.File)),
Range: ptrTo(rang),
},
)
......@@ -444,7 +444,7 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
func (s *S3FS) Delete(ctx context.Context, filePath string) error {
_, path, err := splitPath(s.name, filePath)
path, err := ParsePathAtService(filePath, s.name)
if err != nil {
return err
}
......@@ -452,7 +452,7 @@ func (s *S3FS) Delete(ctx context.Context, filePath string) error {
ctx,
&s3.DeleteObjectInput{
Bucket: ptrTo(s.bucket),
Key: ptrTo(s.pathToKey(path)),
Key: ptrTo(s.pathToKey(path.File)),
},
)
if err != nil {
......@@ -463,7 +463,7 @@ func (s *S3FS) Delete(ctx context.Context, filePath string) error {
}
func (s *S3FS) pathToKey(filePath string) string {
return path.Join(s.keyPrefix, filePath)
return pathpkg.Join(s.keyPrefix, filePath)
}
func (s *S3FS) keyToPath(key string) string {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment