Skip to content
Snippets Groups Projects
Unverified Commit cabcaef7 authored by reusee's avatar reusee Committed by GitHub
Browse files

fileservice: add GetForETL (#4788)

fileservice: add GetForETL

Approved by: @nnsgmsone
parent 45399d8f
No related branches found
No related tags found
No related merge requests found
......@@ -16,6 +16,7 @@ package fileservice
import (
"fmt"
"path/filepath"
"strings"
)
......@@ -46,3 +47,27 @@ func Get[T any](fs FileService, name string) (res T, err error) {
}
return
}
func GetForETL(fs FileService, path string) (res ETLFileService, readPath string, err error) {
fsPath, err := ParsePath(path)
if err != nil {
return nil, "", err
}
if fsPath.Service == "" {
// no service, create local ETL fs
dir, file := filepath.Split(path)
res, err = NewLocalETLFS("etl", dir)
if err != nil {
return nil, "", err
}
readPath = file
} else {
// get etl fs
res, err = Get[ETLFileService](fs, fsPath.Service)
if err != nil {
return nil, "", err
}
readPath = fsPath.Full
}
return
}
......@@ -27,7 +27,6 @@ import (
"io"
"math"
"path"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
......@@ -95,35 +94,15 @@ func Call(_ int, proc *process.Process, arg any) (bool, error) {
func ReadDir(param *tree.ExternParam) (fileList []string, err error) {
dir, pattern := path.Split(param.Filepath)
var fs fileservice.ETLFileService
var readPath string
fsPath, err := fileservice.ParsePath(dir)
fs, readPath, err := fileservice.GetForETL(param.FileService, dir+"/")
if err != nil {
return nil, err
}
if fsPath.Service == "" {
// no service, create ETL fs
fs, err = fileservice.NewLocalETLFS("etl", dir)
if err != nil {
return nil, err
}
readPath = ""
} else {
// get etl fs
fs, err = fileservice.Get[fileservice.ETLFileService](param.FileService, fsPath.Service)
if err != nil {
return nil, err
}
readPath = dir
}
ctx := context.TODO()
entries, err := fs.List(ctx, readPath)
if err != nil {
return nil, err
}
for _, entry := range entries {
matched, _ := path.Match(pattern, entry.Name)
if !matched {
......@@ -131,35 +110,14 @@ func ReadDir(param *tree.ExternParam) (fileList []string, err error) {
}
fileList = append(fileList, path.Join(dir, entry.Name))
}
return
}
func ReadFile(param *tree.ExternParam) (io.ReadCloser, error) {
var fs fileservice.ETLFileService
var readPath string
fsPath, err := fileservice.ParsePath(param.Filepath)
fs, readPath, err := fileservice.GetForETL(param.FileService, param.Filepath)
if err != nil {
return nil, err
}
if fsPath.Service == "" {
// no service, create ETL fs
dir, file := filepath.Split(param.Filepath)
fs, err = fileservice.NewLocalETLFS("etl", dir)
if err != nil {
return nil, err
}
readPath = file
} else {
// get etl fs
fs, err = fileservice.Get[fileservice.ETLFileService](param.FileService, fsPath.Service)
if err != nil {
return nil, err
}
readPath = fsPath.Full
}
var r io.ReadCloser
vec := fileservice.IOVector{
FilePath: readPath,
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment