diff --git a/cmd/mo-service/config.go b/cmd/mo-service/config.go index 6d7431e4ed85d3f8710d6966f2752870b298498f..a7abc16bf71dcd0782ad2794495668bfd871eeab 100644 --- a/cmd/mo-service/config.go +++ b/cmd/mo-service/config.go @@ -98,14 +98,33 @@ func (c *Config) validate() error { return nil } -// FIXME: fileservice created by a config instance is very strange at best -func (c *Config) createFileService(name string) (fileservice.FileService, error) { - for _, cfg := range c.FileServices { - if strings.EqualFold(cfg.Name, name) { - return fileservice.NewFileService(cfg) +func (c *Config) createFileService(defaultName string) (*fileservice.FileServices, error) { + // create all services + services := make([]fileservice.FileService, 0, len(c.FileServices)) + for _, config := range c.FileServices { + service, err := fileservice.NewFileService(config) + if err != nil { + return nil, err } + services = append(services, service) + } + + // create FileServices + s, err := fileservice.NewFileServices( + defaultName, + services..., + ) + if err != nil { + return nil, err } - return nil, fmt.Errorf("file service named %s not set", name) + + // validate default name + _, err = fileservice.Get[fileservice.FileService](s, defaultName) + if err != nil { + return nil, err + } + + return s, nil } func (c *Config) getLogServiceConfig() logservice.Config { diff --git a/pkg/dnservice/store.go b/pkg/dnservice/store.go index b810dec73dc8618d7ae1b7903af7346493b1f330..d6805246dc7fe9bfb9024dfa37d2e9aeab41d23e 100644 --- a/pkg/dnservice/store.go +++ b/pkg/dnservice/store.go @@ -81,7 +81,7 @@ type store struct { sender rpc.TxnSender server rpc.TxnServer hakeeperClient logservice.DNHAKeeperClient - fsFactory fileservice.FileServiceFactory + newFS fileservice.NewFileServicesFunc fs fileservice.FileService metadataFS fileservice.ReplaceableFileService replicas *sync.Map @@ -102,15 +102,15 @@ type store struct { // NewService create DN Service func NewService(cfg *Config, - fsFactory fileservice.FileServiceFactory, + newFS fileservice.NewFileServicesFunc, opts ...Option) (Service, error) { if err := cfg.Validate(); err != nil { return nil, err } s := &store{ - cfg: cfg, - fsFactory: fsFactory, + cfg: cfg, + newFS: newFS, } for _, opt := range opts { opt(s) @@ -379,26 +379,33 @@ func (s *store) initHAKeeperClient() error { } func (s *store) initFileService() error { - localFS, err := s.fsFactory(localFileServiceName) + // create + fs, err := s.newFS(localFileServiceName) if err != nil { return err } - rfs, err := fileservice.Get[fileservice.ReplaceableFileService]( - localFS, localFileServiceName) + + // ensure local exists + localFS, err := fileservice.Get[fileservice.FileService](fs, localFileServiceName) if err != nil { return err } - s3FS, err := s.fsFactory(s3FileServiceName) + // ensure s3 exists + _, err = fileservice.Get[fileservice.FileService](fs, s3FileServiceName) if err != nil { return err } - s.fs, err = fileservice.NewFileServices(localFileServiceName, localFS, s3FS) + // get metadata fs + metadataFS, err := fileservice.Get[fileservice.ReplaceableFileService](localFS, localFileServiceName) if err != nil { return err } - s.metadataFS = rfs + + // set + s.fs = fs + s.metadataFS = metadataFS return nil } diff --git a/pkg/dnservice/store_test.go b/pkg/dnservice/store_test.go index b3a074fde3e7ed6876b9a317122c9182a6139572..1aea87e7d9d236de07db2314cb38ade89d276773 100644 --- a/pkg/dnservice/store_test.go +++ b/pkg/dnservice/store_test.go @@ -56,11 +56,16 @@ func TestAddReplica(t *testing.T) { func TestStartWithReplicas(t *testing.T) { localFS, err := fileservice.NewMemoryFS(localFileServiceName) assert.NoError(t, err) - factory := func(name string) (fileservice.FileService, error) { - if name == localFileServiceName { - return localFS, nil + factory := func(name string) (*fileservice.FileServices, error) { + s3fs, err := fileservice.NewMemoryFS(s3FileServiceName) + if err != nil { + return nil, err } - return fileservice.NewMemoryFS(name) + return fileservice.NewFileServices( + localFileServiceName, + s3fs, + localFS, + ) } runDNStoreTestWithFileServiceFactory(t, func(s *store) { @@ -131,15 +136,27 @@ func runDNStoreTest( t *testing.T, testFn func(*store), opts ...Option) { - runDNStoreTestWithFileServiceFactory(t, testFn, func(name string) (fileservice.FileService, error) { - return fileservice.NewMemoryFS(name) + runDNStoreTestWithFileServiceFactory(t, testFn, func(name string) (*fileservice.FileServices, error) { + local, err := fileservice.NewMemoryFS( + localFileServiceName, + ) + if err != nil { + return nil, err + } + s3, err := fileservice.NewMemoryFS( + s3FileServiceName, + ) + if err != nil { + return nil, err + } + return fileservice.NewFileServices(name, local, s3) }, opts...) } func runDNStoreTestWithFileServiceFactory( t *testing.T, testFn func(*store), - fsFactory fileservice.FileServiceFactory, + fsFactory fileservice.NewFileServicesFunc, opts ...Option) { thc := newTestHAKeeperClient() opts = append(opts, @@ -155,8 +172,12 @@ func runDNStoreTestWithFileServiceFactory( })) if fsFactory == nil { - fsFactory = func(name string) (fileservice.FileService, error) { - return fileservice.NewMemoryFS(name) + fsFactory = func(name string) (*fileservice.FileServices, error) { + fs, err := fileservice.NewMemoryFS(name) + if err != nil { + return nil, err + } + return fileservice.NewFileServices(name, fs) } } s := newTestStore(t, "u1", fsFactory, opts...) @@ -199,7 +220,7 @@ func addTestReplica(t *testing.T, s *store, shardID, replicaID, logShardID uint6 func newTestStore( t *testing.T, uuid string, - fsFactory fileservice.FileServiceFactory, + fsFactory fileservice.NewFileServicesFunc, options ...Option) *store { assert.NoError(t, os.RemoveAll(testDNStoreAddr[7:])) c := &Config{ diff --git a/pkg/fileservice/config.go b/pkg/fileservice/config.go index 3d8dd23b262e7a91609bdada1383c2235903a870..9f6e41c1ff478320784c971aa8a9121011a5c84b 100644 --- a/pkg/fileservice/config.go +++ b/pkg/fileservice/config.go @@ -22,10 +22,11 @@ import ( ) const ( - memFileServiceBackend = "MEM" - diskFileServiceBackend = "DISK" - s3FileServiceBackend = "S3" - minioFileServiceBackend = "MINIO" + memFileServiceBackend = "MEM" + diskFileServiceBackend = "DISK" + diskETLFileServiceBackend = "DISK-ETL" + s3FileServiceBackend = "S3" + minioFileServiceBackend = "MINIO" ) // Config fileService config @@ -42,10 +43,8 @@ type Config struct { DataDir string `toml:"data-dir"` } -// FileServiceFactory returns an instance of fileservice by name. When starting a MO node, multiple -// FileServiceConfig configurations are specified in the configuration file for use in specific scenarios. -// The corresponding instance is obtained according to Name. -type FileServiceFactory func(name string) (FileService, error) +// NewFileServicesFunc creates a new *FileServices +type NewFileServicesFunc = func(defaultName string) (*FileServices, error) // NewFileService create file service from config func NewFileService(cfg Config) (FileService, error) { @@ -54,6 +53,8 @@ func NewFileService(cfg Config) (FileService, error) { return newMemFileService(cfg) case diskFileServiceBackend: return newDiskFileService(cfg) + case diskETLFileServiceBackend: + return newDiskETLFileService(cfg) case minioFileServiceBackend: return newMinioFileService(cfg) case s3FileServiceBackend: @@ -83,6 +84,17 @@ func newDiskFileService(cfg Config) (FileService, error) { return fs, nil } +func newDiskETLFileService(cfg Config) (FileService, error) { + fs, err := NewLocalETLFS( + cfg.Name, + cfg.DataDir, + ) + if err != nil { + return nil, err + } + return fs, nil +} + func newMinioFileService(cfg Config) (FileService, error) { fs, err := NewS3FSOnMinio( cfg.S3.SharedConfigProfile, diff --git a/pkg/tests/service/dnservice.go b/pkg/tests/service/dnservice.go index 60d92cec244f0613e483dcbc03c11a1e6e5308f5..2a283f16db85ff5e8c7aa366a84899936f6d117b 100644 --- a/pkg/tests/service/dnservice.go +++ b/pkg/tests/service/dnservice.go @@ -126,7 +126,7 @@ type dnOptions []dnservice.Option // newDNService initializes an instance of `DNService`. func newDNService( cfg *dnservice.Config, - factory fileservice.FileServiceFactory, + factory fileservice.NewFileServicesFunc, opts dnOptions, ) (DNService, error) { svc, err := dnservice.NewService(cfg, factory, opts...) diff --git a/pkg/tests/service/service.go b/pkg/tests/service/service.go index d974cde2e622a1bdaeffbe4b5857c74ae7178775..44793363894997256cf7a499fc182170aa1422c0 100644 --- a/pkg/tests/service/service.go +++ b/pkg/tests/service/service.go @@ -17,7 +17,6 @@ package service import ( "context" "fmt" - "strings" "sync" "testing" "time" @@ -998,16 +997,13 @@ func (c *testCluster) initDNServices(fileservices *fileServices) []DNService { for i := 0; i < batch; i++ { cfg := c.dn.cfgs[i] opt := c.dn.opts[i] - fsFactory := func(name string) (fileservice.FileService, error) { + fsFactory := func(name string) (*fileservice.FileServices, error) { index := i - switch strings.ToUpper(name) { - case "LOCAL": - return fileservices.getLocalFileService(index), nil - case "S3": - return fileservices.getS3FileService(), nil - default: - return nil, ErrInvalidFSName - } + return fileservice.NewFileServices( + "LOCAL", + fileservices.getLocalFileService(index), + fileservices.getS3FileService(), + ) } ds, err := newDNService(cfg, fsFactory, opt)