Skip to content
Snippets Groups Projects
Unverified Commit 9984ed19 authored by reusee's avatar reusee Committed by GitHub
Browse files

mo-service: unify file service initialization in cnservice, dnservice, logservice (#4770)

mo-service: unify file service initialization in cnservice, dnservice, logservice

Approved by: @zhangxu19830126, @nnsgmsone, @lni
parent 4f433842
No related branches found
No related tags found
No related merge requests found
Showing
with 160 additions and 147 deletions
......@@ -34,6 +34,9 @@ const (
dnServiceType = "DN"
logServiceType = "LOG"
standaloneServiceType = "STANDALONE"
s3FileServiceName = "S3"
localFileServiceName = "LOCAL"
)
var ErrInvalidConfig = moerr.NewError(moerr.BAD_CONFIGURATION, "invalid log configuration")
......@@ -110,7 +113,7 @@ func (c *Config) createFileService(defaultName string) (*fileservice.FileService
}
// create FileServices
s, err := fileservice.NewFileServices(
fs, err := fileservice.NewFileServices(
defaultName,
services...,
)
......@@ -119,12 +122,24 @@ func (c *Config) createFileService(defaultName string) (*fileservice.FileService
}
// validate default name
_, err = fileservice.Get[fileservice.FileService](s, defaultName)
_, err = fileservice.Get[fileservice.FileService](fs, defaultName)
if err != nil {
return nil, err
}
// ensure local exists
_, err = fileservice.Get[fileservice.FileService](fs, localFileServiceName)
if err != nil {
return nil, err
}
// ensure s3 exists
_, err = fileservice.Get[fileservice.FileService](fs, s3FileServiceName)
if err != nil {
return nil, err
}
return s, nil
return fs, nil
}
func (c *Config) getLogServiceConfig() logservice.Config {
......
......@@ -75,14 +75,18 @@ func TestFileServiceFactory(t *testing.T) {
Name: "a",
Backend: "MEM",
})
c.FileServices = append(c.FileServices, fileservice.Config{
Name: localFileServiceName,
Backend: "MEM",
})
c.FileServices = append(c.FileServices, fileservice.Config{
Name: s3FileServiceName,
Backend: "MEM",
})
fs, err := c.createFileService("A")
assert.NoError(t, err)
assert.NotNil(t, fs)
fs, err = c.createFileService("B")
assert.Error(t, err)
assert.Nil(t, fs)
}
func TestResolveGossipSeedAddresses(t *testing.T) {
......
......@@ -19,8 +19,6 @@ import (
"errors"
"flag"
"fmt"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
"github.com/matrixorigin/matrixone/pkg/sql/compile"
"math/rand"
"os"
"os/signal"
......@@ -28,6 +26,10 @@ import (
"syscall"
"time"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/sql/compile"
"github.com/matrixorigin/matrixone/pkg/cnservice"
"github.com/matrixorigin/matrixone/pkg/common/stopper"
......@@ -72,28 +74,37 @@ func waitSignalToStop(stopper *stopper.Stopper) {
}
func startService(cfg *Config, stopper *stopper.Stopper) error {
// TODO: start other service
fs, err := cfg.createFileService(localFileServiceName)
if err != nil {
return err
}
switch strings.ToUpper(cfg.ServiceType) {
case cnServiceType:
return startCNService(cfg, stopper)
return startCNService(cfg, stopper, fs)
case dnServiceType:
return startDNService(cfg, stopper)
return startDNService(cfg, stopper, fs)
case logServiceType:
return startLogService(cfg, stopper)
return startLogService(cfg, stopper, fs)
case standaloneServiceType:
return startStandalone(cfg, stopper)
return startStandalone(cfg, stopper, fs)
default:
panic("unknown service type")
}
}
func startCNService(cfg *Config, stopper *stopper.Stopper) error {
func startCNService(
cfg *Config,
stopper *stopper.Stopper,
fileService fileservice.FileService,
) error {
return stopper.RunNamedTask("cn-service", func(ctx context.Context) {
c := cfg.getCNServiceConfig()
s, err := cnservice.NewService(
&c,
ctx,
cfg.createFileService,
fileService,
cnservice.WithMessageHandle(compile.CnServerMessageHandler),
)
if err != nil {
......@@ -114,11 +125,16 @@ func startCNService(cfg *Config, stopper *stopper.Stopper) error {
})
}
func startDNService(cfg *Config, stopper *stopper.Stopper) error {
func startDNService(
cfg *Config,
stopper *stopper.Stopper,
fileService fileservice.FileService,
) error {
return stopper.RunNamedTask("dn-service", func(ctx context.Context) {
c := cfg.getDNServiceConfig()
s, err := dnservice.NewService(&c,
cfg.createFileService,
s, err := dnservice.NewService(
&c,
fileService,
dnservice.WithLogger(logutil.GetGlobalLogger().Named("dn-service")))
if err != nil {
panic(err)
......@@ -134,9 +150,13 @@ func startDNService(cfg *Config, stopper *stopper.Stopper) error {
})
}
func startLogService(cfg *Config, stopper *stopper.Stopper) error {
func startLogService(
cfg *Config,
stopper *stopper.Stopper,
fileService fileservice.FileService,
) error {
lscfg := cfg.getLogServiceConfig()
s, err := logservice.NewService(lscfg)
s, err := logservice.NewService(lscfg, fileService)
if err != nil {
panic(err)
}
......@@ -158,10 +178,14 @@ func startLogService(cfg *Config, stopper *stopper.Stopper) error {
})
}
func startStandalone(cfg *Config, stopper *stopper.Stopper) error {
func startStandalone(
cfg *Config,
stopper *stopper.Stopper,
fileService fileservice.FileService,
) error {
// start log service
if err := startLogService(cfg, stopper); err != nil {
if err := startLogService(cfg, stopper, fileService); err != nil {
return err
}
......@@ -185,7 +209,7 @@ func startStandalone(cfg *Config, stopper *stopper.Stopper) error {
}
// start DN
if err := startDNService(cfg, stopper); err != nil {
if err := startDNService(cfg, stopper, fileService); err != nil {
return err
}
......@@ -213,7 +237,7 @@ func startStandalone(cfg *Config, stopper *stopper.Stopper) error {
}
// start CN
if err := startCNService(cfg, stopper); err != nil {
if err := startCNService(cfg, stopper, fileService); err != nil {
return err
}
......
......@@ -37,17 +37,12 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
)
const (
s3FileServiceName = "S3"
localFileServiceName = "LOCAL"
)
type Options func(*service)
func NewService(
cfg *Config,
ctx context.Context,
newFS fileservice.NewFileServicesFunc,
fileService fileservice.FileService,
options ...Options,
) (Service, error) {
......@@ -56,8 +51,8 @@ func NewService(
}
srv := &service{
cfg: cfg,
newFS: newFS,
cfg: cfg,
fileService: fileService,
}
srv.logger = logutil.Adjust(srv.logger)
srv.responsePool = &sync.Pool{
......@@ -127,11 +122,7 @@ func (s *service) initMOServer(ctx context.Context, pu *config.ParameterUnit) er
pu.HostMmu = host.New(pu.SV.HostMmuLimitation)
fs, err := s.getFileService()
if err != nil {
return err
}
pu.FileService = fs
pu.FileService = s.fileService
logutil.Info("Initialize the engine ...")
err = s.initEngine(ctx, cancelMoServerCtx, pu)
......@@ -268,33 +259,3 @@ func WithMessageHandle(f func(ctx context.Context, message morpc.Message, cs mor
s.requestHandler = f
}
}
func (s *service) getFileService() (fs fileservice.FileService, err error) {
s.initFileServiceOnce.Do(func() {
// create
fs, err = s.newFS(localFileServiceName)
if err != nil {
return
}
// ensure local exists
_, err := fileservice.Get[fileservice.FileService](fs, localFileServiceName)
if err != nil {
return
}
// ensure s3 exists
_, err = fileservice.Get[fileservice.FileService](fs, s3FileServiceName)
if err != nil {
return
}
// set
s._fileService = fs
})
fs = s._fileService
return
}
......@@ -119,7 +119,5 @@ type service struct {
_txnSender rpc.TxnSender
initTxnClientOnce sync.Once
_txnClient client.TxnClient
initFileServiceOnce sync.Once
_fileService fileservice.FileService
newFS fileservice.NewFileServicesFunc
fileService fileservice.FileService
}
......@@ -133,5 +133,5 @@ func (s *store) newMemKVStorage(shard metadata.DNShard, logClient logservice.Cli
}
func (s *store) newTAEStorage(shard metadata.DNShard, logClient logservice.Client) (storage.TxnStorage, error) {
return taestorage.New(shard, logClient, s.fs, s.clock)
return taestorage.New(shard, logClient, s.fileService, s.clock)
}
......@@ -75,17 +75,16 @@ func WithLogServiceClientFactory(factory func(metadata.DNShard) (logservice.Clie
}
type store struct {
cfg *Config
logger *zap.Logger
clock clock.Clock
sender rpc.TxnSender
server rpc.TxnServer
hakeeperClient logservice.DNHAKeeperClient
newFS fileservice.NewFileServicesFunc
fs fileservice.FileService
metadataFS fileservice.ReplaceableFileService
replicas *sync.Map
stopper *stopper.Stopper
cfg *Config
logger *zap.Logger
clock clock.Clock
sender rpc.TxnSender
server rpc.TxnServer
hakeeperClient logservice.DNHAKeeperClient
fileService fileservice.FileService
metadataFileService fileservice.ReplaceableFileService
replicas *sync.Map
stopper *stopper.Stopper
options struct {
logServiceClientFactory func(metadata.DNShard) (logservice.Client, error)
......@@ -102,15 +101,22 @@ type store struct {
// NewService create DN Service
func NewService(cfg *Config,
newFS fileservice.NewFileServicesFunc,
fileService fileservice.FileService,
opts ...Option) (Service, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
// get metadata fs
metadataFS, err := fileservice.Get[fileservice.ReplaceableFileService](fileService, localFileServiceName)
if err != nil {
return nil, err
}
s := &store{
cfg: cfg,
newFS: newFS,
cfg: cfg,
fileService: fileService,
metadataFileService: metadataFS,
}
for _, opt := range opts {
opt(s)
......@@ -135,9 +141,6 @@ func NewService(cfg *Config,
if err := s.initTxnServer(); err != nil {
return nil, err
}
if err := s.initFileService(); err != nil {
return nil, err
}
if err := s.initMetadata(); err != nil {
return nil, err
}
......@@ -377,35 +380,3 @@ func (s *store) initHAKeeperClient() error {
s.hakeeperClient = client
return nil
}
func (s *store) initFileService() error {
// create
fs, err := s.newFS(localFileServiceName)
if err != nil {
return err
}
// ensure local exists
localFS, err := fileservice.Get[fileservice.FileService](fs, localFileServiceName)
if err != nil {
return err
}
// ensure s3 exists
_, err = fileservice.Get[fileservice.FileService](fs, s3FileServiceName)
if err != nil {
return err
}
// get metadata fs
metadataFS, err := fileservice.Get[fileservice.ReplaceableFileService](localFS, localFileServiceName)
if err != nil {
return err
}
// set
s.fs = fs
s.metadataFS = metadataFS
return nil
}
......@@ -41,7 +41,7 @@ func (s *store) initMetadata() error {
},
},
}
if err := s.metadataFS.Read(ctx, vec); err != nil {
if err := s.metadataFileService.Read(ctx, vec); err != nil {
if err == fileservice.ErrFileNotFound {
return nil
}
......@@ -100,7 +100,7 @@ func (s *store) mustUpdateMetadataLocked() {
},
},
}
if err := s.metadataFS.Replace(ctx, vec); err != nil {
if err := s.metadataFileService.Replace(ctx, vec); err != nil {
s.logger.Fatal("update metadata to local file failed",
zap.Error(err))
}
......
......@@ -29,7 +29,7 @@ func TestInitMetadata(t *testing.T) {
fs, err := fileservice.NewMemoryFS(localFileServiceName)
assert.NoError(t, err)
s := &store{logger: logutil.GetPanicLogger(), metadataFS: fs}
s := &store{logger: logutil.GetPanicLogger(), metadataFileService: fs}
assert.NoError(t, s.initMetadata())
}
......@@ -58,7 +58,7 @@ func TestInitMetadataWithExistData(t *testing.T) {
},
}))
s := &store{logger: logutil.GetPanicLogger(), metadataFS: fs}
s := &store{logger: logutil.GetPanicLogger(), metadataFileService: fs}
s.mu.metadata.UUID = "dn1"
assert.NoError(t, s.initMetadata())
assert.Equal(t, value, s.mu.metadata)
......@@ -88,6 +88,6 @@ func TestInitMetadataWithInvalidUUIDWillPanic(t *testing.T) {
},
}))
s := &store{logger: logutil.GetPanicLogger(), metadataFS: fs}
s := &store{logger: logutil.GetPanicLogger(), metadataFileService: fs}
assert.NoError(t, s.initMetadata())
}
......@@ -56,6 +56,7 @@ func TestAddReplica(t *testing.T) {
func TestStartWithReplicas(t *testing.T) {
localFS, err := fileservice.NewMemoryFS(localFileServiceName)
assert.NoError(t, err)
factory := func(name string) (*fileservice.FileServices, error) {
s3fs, err := fileservice.NewMemoryFS(s3FileServiceName)
if err != nil {
......@@ -228,7 +229,9 @@ func newTestStore(
ListenAddress: testDNStoreAddr,
}
c.Txn.Clock.MaxClockOffset.Duration = time.Duration(math.MaxInt64)
s, err := NewService(c, fsFactory, options...)
fs, err := fsFactory(localFileServiceName)
assert.Nil(t, err)
s, err := NewService(c, fs, options...)
assert.NoError(t, err)
return s.(*store)
}
......
......@@ -30,6 +30,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/morpc"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/testutil"
)
func runClientTest(t *testing.T,
......@@ -38,6 +39,7 @@ func runClientTest(t *testing.T,
cfg := getServiceTestConfig()
defer vfs.ReportLeakedFD(cfg.FS, t)
service, err := NewService(cfg,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......@@ -125,6 +127,7 @@ func TestClientCanBeConnectedByReverseProxy(t *testing.T) {
cfg := getServiceTestConfig()
defer vfs.ReportLeakedFD(cfg.FS, t)
service, err := NewService(cfg,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......
......@@ -32,6 +32,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/hakeeper"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/testutil"
)
func TestHAKeeperClientConfigIsValidated(t *testing.T) {
......@@ -305,6 +306,7 @@ func testNotHAKeeperErrorIsHandled(t *testing.T, fn func(*testing.T, *managedHAK
}
cfg1.Fill()
service1, err := NewService(cfg1,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......@@ -315,6 +317,7 @@ func testNotHAKeeperErrorIsHandled(t *testing.T, fn func(*testing.T, *managedHAK
}()
cfg2.Fill()
service2, err := NewService(cfg2,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......
......@@ -28,6 +28,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/stopper"
"github.com/matrixorigin/matrixone/pkg/fileservice"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
)
......@@ -57,13 +58,14 @@ func firstError(err1 error, err2 error) error {
// clients owned by DN nodes and the HAKeeper service via network, it can
// be considered as the interface layer of the LogService.
type Service struct {
cfg Config
store *store
server morpc.RPCServer
pool *sync.Pool
respPool *sync.Pool
stopper *stopper.Stopper
haClient LogHAKeeperClient
cfg Config
store *store
server morpc.RPCServer
pool *sync.Pool
respPool *sync.Pool
stopper *stopper.Stopper
haClient LogHAKeeperClient
fileService fileservice.FileService
options struct {
// morpc client would filter remote backend via this
......@@ -71,7 +73,11 @@ type Service struct {
}
}
func NewService(cfg Config, opts ...Option) (*Service, error) {
func NewService(
cfg Config,
fileService fileservice.FileService,
opts ...Option,
) (*Service, error) {
cfg.Fill()
if err := cfg.Validate(); err != nil {
return nil, err
......@@ -108,12 +114,13 @@ func NewService(cfg Config, opts ...Option) (*Service, error) {
return nil, err
}
service := &Service{
cfg: cfg,
store: store,
server: server,
pool: pool,
respPool: respPool,
stopper: stopper.NewStopper("log-service"),
cfg: cfg,
store: store,
server: server,
pool: pool,
respPool: respPool,
stopper: stopper.NewStopper("log-service"),
fileService: fileService,
}
for _, opt := range opts {
opt(service)
......
......@@ -27,6 +27,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/morpc"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/testutil"
)
func TestBackgroundTickAndHeartbeat(t *testing.T) {
......@@ -48,6 +49,7 @@ func TestBackgroundTickAndHeartbeat(t *testing.T) {
cfg.HAKeeperClientConfig.ServiceAddresses = []string{"127.0.0.1:9002"}
cfg.Fill()
service, err := NewService(cfg,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......
......@@ -31,6 +31,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/morpc"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/testutil"
)
const (
......@@ -63,6 +64,7 @@ func runServiceTest(t *testing.T,
cfg := getServiceTestConfig()
defer vfs.ReportLeakedFD(cfg.FS, t)
service, err := NewService(cfg,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......@@ -90,6 +92,7 @@ func TestNewService(t *testing.T) {
cfg := getServiceTestConfig()
defer vfs.ReportLeakedFD(cfg.FS, t)
service, err := NewService(cfg,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......@@ -537,6 +540,7 @@ func TestShardInfoCanBeQueried(t *testing.T) {
}
cfg1.Fill()
service1, err := NewService(cfg1,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......@@ -550,6 +554,7 @@ func TestShardInfoCanBeQueried(t *testing.T) {
assert.NoError(t, service1.store.startReplica(1, 1, peers1, false))
cfg2.Fill()
service2, err := NewService(cfg2,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......@@ -666,6 +671,7 @@ func TestGossipInSimulatedCluster(t *testing.T) {
cfg.GossipProbeInterval.Duration = 350 * time.Millisecond
configs = append(configs, cfg)
service, err := NewService(cfg,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......@@ -773,6 +779,7 @@ func TestGossipInSimulatedCluster(t *testing.T) {
services[12] = nil
time.Sleep(2 * time.Second)
service, err := NewService(configs[12],
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......
......@@ -16,6 +16,7 @@ package logservice
import (
"github.com/lni/dragonboat/v4"
"github.com/matrixorigin/matrixone/pkg/fileservice"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
)
......@@ -23,8 +24,12 @@ type WrappedService struct {
svc *Service
}
func NewWrappedService(c Config, opts ...Option) (*WrappedService, error) {
svc, err := NewService(c, opts...)
func NewWrappedService(
c Config,
fileService fileservice.FileService,
opts ...Option,
) (*WrappedService, error) {
svc, err := NewService(c, fileService, opts...)
if err != nil {
return nil, err
}
......
......@@ -29,6 +29,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/hakeeper"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/testutil"
)
func TestIDAllocatorDefaultState(t *testing.T) {
......@@ -196,6 +197,7 @@ func runHAKeeperClusterTest(t *testing.T, fn func(*testing.T, []*Service)) {
cfg4.HAKeeperConfig.DNStoreTimeout.Duration = 10 * time.Second
cfg1.Fill()
service1, err := NewService(cfg1,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......@@ -206,6 +208,7 @@ func runHAKeeperClusterTest(t *testing.T, fn func(*testing.T, []*Service)) {
}()
cfg2.Fill()
service2, err := NewService(cfg2,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......@@ -216,6 +219,7 @@ func runHAKeeperClusterTest(t *testing.T, fn func(*testing.T, []*Service)) {
}()
cfg3.Fill()
service3, err := NewService(cfg3,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......@@ -226,6 +230,7 @@ func runHAKeeperClusterTest(t *testing.T, fn func(*testing.T, []*Service)) {
}()
cfg4.Fill()
service4, err := NewService(cfg4,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......
......@@ -21,6 +21,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/morpc"
pb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
"github.com/matrixorigin/matrixone/pkg/testutil"
)
func NewTestService(fs vfs.FS) (*Service, ClientConfig, error) {
......@@ -37,6 +38,7 @@ func NewTestService(fs vfs.FS) (*Service, ClientConfig, error) {
}
cfg.Fill()
service, err := NewService(cfg,
testutil.NewFS(),
WithBackendFilter(func(msg morpc.Message, backendAddr string) bool {
return true
}),
......
......@@ -126,10 +126,10 @@ type dnOptions []dnservice.Option
// newDNService initializes an instance of `DNService`.
func newDNService(
cfg *dnservice.Config,
factory fileservice.NewFileServicesFunc,
fs fileservice.FileService,
opts dnOptions,
) (DNService, error) {
svc, err := dnservice.NewService(cfg, factory, opts...)
svc, err := dnservice.NewService(cfg, fs, opts...)
if err != nil {
return nil, err
}
......
......@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/logservice"
logpb "github.com/matrixorigin/matrixone/pkg/pb/logservice"
)
......@@ -135,9 +136,11 @@ type logOptions []logservice.Option
// newLogService constructs an instance of `LogService`.
func newLogService(
cfg logservice.Config, opts logOptions,
cfg logservice.Config,
fs fileservice.FileService,
opts logOptions,
) (LogService, error) {
svc, err := logservice.NewWrappedService(cfg, opts...)
svc, err := logservice.NewWrappedService(cfg, fs, opts...)
if err != nil {
return nil, err
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment