diff --git a/pkg/fileservice/get.go b/pkg/fileservice/get.go index 5899c1d5e27da8347587e3444692df32b0150f4e..76b5aba3cfde4b5e036723109df51ba93de38b07 100644 --- a/pkg/fileservice/get.go +++ b/pkg/fileservice/get.go @@ -16,6 +16,7 @@ package fileservice import ( "fmt" + "path/filepath" "strings" ) @@ -46,3 +47,27 @@ func Get[T any](fs FileService, name string) (res T, err error) { } return } + +func GetForETL(fs FileService, path string) (res ETLFileService, readPath string, err error) { + fsPath, err := ParsePath(path) + if err != nil { + return nil, "", err + } + if fsPath.Service == "" { + // no service, create local ETL fs + dir, file := filepath.Split(path) + res, err = NewLocalETLFS("etl", dir) + if err != nil { + return nil, "", err + } + readPath = file + } else { + // get etl fs + res, err = Get[ETLFileService](fs, fsPath.Service) + if err != nil { + return nil, "", err + } + readPath = fsPath.Full + } + return +} diff --git a/pkg/sql/colexec/external/external.go b/pkg/sql/colexec/external/external.go index 64d6153ad57ea466a705a911c0fd226a75986ae3..2d39dc87d3f996e8c8044195300916722c3d70c9 100644 --- a/pkg/sql/colexec/external/external.go +++ b/pkg/sql/colexec/external/external.go @@ -27,7 +27,6 @@ import ( "io" "math" "path" - "path/filepath" "strconv" "strings" "sync/atomic" @@ -95,35 +94,15 @@ func Call(_ int, proc *process.Process, arg any) (bool, error) { func ReadDir(param *tree.ExternParam) (fileList []string, err error) { dir, pattern := path.Split(param.Filepath) - - var fs fileservice.ETLFileService - var readPath string - fsPath, err := fileservice.ParsePath(dir) + fs, readPath, err := fileservice.GetForETL(param.FileService, dir+"/") if err != nil { return nil, err } - if fsPath.Service == "" { - // no service, create ETL fs - fs, err = fileservice.NewLocalETLFS("etl", dir) - if err != nil { - return nil, err - } - readPath = "" - } else { - // get etl fs - fs, err = fileservice.Get[fileservice.ETLFileService](param.FileService, fsPath.Service) - if err != nil { - return nil, err - } - readPath = dir - } - ctx := context.TODO() entries, err := fs.List(ctx, readPath) if err != nil { return nil, err } - for _, entry := range entries { matched, _ := path.Match(pattern, entry.Name) if !matched { @@ -131,35 +110,14 @@ func ReadDir(param *tree.ExternParam) (fileList []string, err error) { } fileList = append(fileList, path.Join(dir, entry.Name)) } - return } func ReadFile(param *tree.ExternParam) (io.ReadCloser, error) { - - var fs fileservice.ETLFileService - var readPath string - fsPath, err := fileservice.ParsePath(param.Filepath) + fs, readPath, err := fileservice.GetForETL(param.FileService, param.Filepath) if err != nil { return nil, err } - if fsPath.Service == "" { - // no service, create ETL fs - dir, file := filepath.Split(param.Filepath) - fs, err = fileservice.NewLocalETLFS("etl", dir) - if err != nil { - return nil, err - } - readPath = file - } else { - // get etl fs - fs, err = fileservice.Get[fileservice.ETLFileService](param.FileService, fsPath.Service) - if err != nil { - return nil, err - } - readPath = fsPath.Full - } - var r io.ReadCloser vec := fileservice.IOVector{ FilePath: readPath,