Skip to content
Snippets Groups Projects
Unverified Commit 4b792839 authored by reusee, committed by GitHub
Browse files

fileservice: various changes (#4511)

fileservice: refactor testBlockMapper

fileservice: use CRC32 since most CPU has crc32 instructions

fileservice: rename BlockMappable to FileLike

fileservice: rename BlockMapper to FileWithChecksum

fileservice: ignore ignored io entries in S3 bytes-range calculation

Approved by: @lni
parent 19d6c553
No related branches found
No related tags found
No related merge requests found
......@@ -14,34 +14,10 @@
package fileservice
import "math"
import "io"
// IOVector describes a batch of IO operations against a single file:
// one target path plus the entries (byte ranges) to read or write.
type IOVector struct {
	// path to file, '/' separated
	FilePath string
	// io entries to apply to the file at FilePath;
	// empty entry not allowed
	Entries []IOEntry
}
func (i IOVector) offsetRange() (
min int,
max int,
readToEnd bool,
) {
min = math.MaxInt
max = 0
for _, entry := range i.Entries {
if entry.Offset < min {
min = entry.Offset
}
if entry.Size < 0 {
entry.Size = 0
readToEnd = true
}
if end := entry.Offset + entry.Size; end > max {
max = end
}
}
return
// FileLike is a value that behaves like a file: it supports both
// sequential access (Read/Write/Seek) and random access (ReadAt/WriteAt).
// *os.File satisfies it, as does FileWithChecksum, so implementations
// can be stacked.
type FileLike interface {
	io.ReadWriteSeeker
	io.WriterAt
	io.ReaderAt
}
......@@ -42,6 +42,14 @@ type FileService interface {
Delete(ctx context.Context, filePath string) error
}
// IOVector describes a batch of IO operations against a single file:
// one target path plus the entries (byte ranges) to read or write.
type IOVector struct {
	// path to file, '/' separated
	FilePath string
	// io entries to apply to the file at FilePath;
	// empty entry not allowed
	Entries []IOEntry
}
type IOEntry struct {
// offset in file, [0, len(file) - 1]
Offset int
......
......@@ -17,13 +17,13 @@ package fileservice
import (
"encoding/binary"
"errors"
"hash/crc64"
"hash/crc32"
"io"
"os"
)
// BlockMapper maps file content to blocks with CRC checksum
type BlockMapper[T BlockMappable] struct {
// FileWithChecksum maps file contents to blocks with checksum
type FileWithChecksum[T FileLike] struct {
underlying T
blockSize int
blockContentSize int
......@@ -31,40 +31,34 @@ type BlockMapper[T BlockMappable] struct {
}
const (
	// _ChecksumSize is the per-block checksum length in bytes (CRC32, 4 bytes).
	_ChecksumSize = crc32.Size

	// _BlockContentSize is the number of content bytes per block; each
	// on-disk block is 2048 bytes, checksum first, content after.
	_BlockContentSize = 2048 - _ChecksumSize
)

var (
	// crcTable uses the Castagnoli polynomial, which most modern CPUs
	// accelerate with dedicated CRC32 instructions.
	crcTable = crc32.MakeTable(crc32.Castagnoli)

	// ErrChecksumNotMatch is returned when a block's stored checksum
	// does not match the checksum computed from its content.
	ErrChecksumNotMatch = errors.New("checksum not match")
)
// BlockMappable is a file-like value supporting sequential access
// (Read/Write/Seek) and random access (ReadAt/WriteAt).
// NOTE(review): this commit renames BlockMappable to FileLike; the two
// declarations are identical and only one should remain in the file.
type BlockMappable interface {
	io.ReadWriteSeeker
	io.WriterAt
	io.ReaderAt
}
func NewBlockMapper[T BlockMappable](
func NewFileWithChecksum[T FileLike](
underlying T,
blockContentSize int,
) *BlockMapper[T] {
return &BlockMapper[T]{
) *FileWithChecksum[T] {
return &FileWithChecksum[T]{
underlying: underlying,
blockSize: blockContentSize + _ChecksumSize,
blockContentSize: blockContentSize,
}
}
var _ BlockMappable = new(BlockMapper[*os.File])
var _ FileLike = new(FileWithChecksum[*os.File])
func (b *BlockMapper[T]) ReadAt(buf []byte, offset int64) (n int, err error) {
func (f *FileWithChecksum[T]) ReadAt(buf []byte, offset int64) (n int, err error) {
for len(buf) > 0 {
blockOffset, offsetInBlock := b.contentOffsetToBlockOffset(offset)
blockOffset, offsetInBlock := f.contentOffsetToBlockOffset(offset)
var data []byte
data, err = b.readBlock(blockOffset)
data, err = f.readBlock(blockOffset)
if err != nil && err != io.EOF {
// read error
return
......@@ -86,25 +80,25 @@ func (b *BlockMapper[T]) ReadAt(buf []byte, offset int64) (n int, err error) {
return
}
func (b *BlockMapper[T]) Read(buf []byte) (n int, err error) {
n, err = b.ReadAt(buf, b.contentOffset)
b.contentOffset += int64(n)
func (f *FileWithChecksum[T]) Read(buf []byte) (n int, err error) {
n, err = f.ReadAt(buf, f.contentOffset)
f.contentOffset += int64(n)
return
}
func (b *BlockMapper[T]) WriteAt(buf []byte, offset int64) (n int, err error) {
func (f *FileWithChecksum[T]) WriteAt(buf []byte, offset int64) (n int, err error) {
for len(buf) > 0 {
blockOffset, offsetInBlock := b.contentOffsetToBlockOffset(offset)
data, err := b.readBlock(blockOffset)
blockOffset, offsetInBlock := f.contentOffsetToBlockOffset(offset)
data, err := f.readBlock(blockOffset)
if err != nil && err != io.EOF {
return 0, err
}
if len(data[offsetInBlock:]) == 0 {
nAppend := len(buf)
if nAppend+len(data) > b.blockContentSize {
nAppend = b.blockContentSize - len(data)
if nAppend+len(data) > f.blockContentSize {
nAppend = f.blockContentSize - len(data)
}
data = append(data, make([]byte, nAppend)...)
}
......@@ -112,14 +106,14 @@ func (b *BlockMapper[T]) WriteAt(buf []byte, offset int64) (n int, err error) {
nBytes := copy(data[offsetInBlock:], buf)
buf = buf[nBytes:]
checksum := crc64.Checksum(data, crc64Table)
checksum := crc32.Checksum(data, crcTable)
checksumBytes := make([]byte, _ChecksumSize)
binary.LittleEndian.PutUint64(checksumBytes, checksum)
if _, err := b.underlying.WriteAt(checksumBytes, blockOffset); err != nil {
binary.LittleEndian.PutUint32(checksumBytes, checksum)
if _, err := f.underlying.WriteAt(checksumBytes, blockOffset); err != nil {
return n, err
}
if _, err := b.underlying.WriteAt(data, blockOffset+_ChecksumSize); err != nil {
if _, err := f.underlying.WriteAt(data, blockOffset+_ChecksumSize); err != nil {
return n, err
}
......@@ -130,61 +124,61 @@ func (b *BlockMapper[T]) WriteAt(buf []byte, offset int64) (n int, err error) {
return
}
func (b *BlockMapper[T]) Write(buf []byte) (n int, err error) {
n, err = b.WriteAt(buf, b.contentOffset)
b.contentOffset += int64(n)
func (f *FileWithChecksum[T]) Write(buf []byte) (n int, err error) {
n, err = f.WriteAt(buf, f.contentOffset)
f.contentOffset += int64(n)
return
}
func (b *BlockMapper[T]) Seek(offset int64, whence int) (int64, error) {
func (f *FileWithChecksum[T]) Seek(offset int64, whence int) (int64, error) {
fileSize, err := b.underlying.Seek(0, io.SeekEnd)
fileSize, err := f.underlying.Seek(0, io.SeekEnd)
if err != nil {
return 0, err
}
contentSize := fileSize
nBlock := ceilingDiv(contentSize, int64(b.blockSize))
nBlock := ceilingDiv(contentSize, int64(f.blockSize))
contentSize -= _ChecksumSize * nBlock
switch whence {
case io.SeekStart:
b.contentOffset = offset
f.contentOffset = offset
case io.SeekCurrent:
b.contentOffset += offset
f.contentOffset += offset
case io.SeekEnd:
b.contentOffset = contentSize + offset
f.contentOffset = contentSize + offset
}
if b.contentOffset < 0 {
b.contentOffset = 0
if f.contentOffset < 0 {
f.contentOffset = 0
}
if b.contentOffset > contentSize {
b.contentOffset = contentSize
if f.contentOffset > contentSize {
f.contentOffset = contentSize
}
return b.contentOffset, nil
return f.contentOffset, nil
}
func (b *BlockMapper[T]) contentOffsetToBlockOffset(
func (f *FileWithChecksum[T]) contentOffsetToBlockOffset(
contentOffset int64,
) (
blockOffset int64,
offsetInBlock int64,
) {
nBlock := contentOffset / int64(b.blockContentSize)
blockOffset += nBlock * int64(b.blockSize)
nBlock := contentOffset / int64(f.blockContentSize)
blockOffset += nBlock * int64(f.blockSize)
offsetInBlock = contentOffset % int64(b.blockContentSize)
offsetInBlock = contentOffset % int64(f.blockContentSize)
return
}
func (b *BlockMapper[T]) readBlock(offset int64) (data []byte, err error) {
func (f *FileWithChecksum[T]) readBlock(offset int64) (data []byte, err error) {
data = make([]byte, b.blockSize)
n, err := b.underlying.ReadAt(data, offset)
data = make([]byte, f.blockSize)
n, err := f.underlying.ReadAt(data, offset)
data = data[:n]
if err != nil && err != io.EOF {
return nil, err
......@@ -195,10 +189,10 @@ func (b *BlockMapper[T]) readBlock(offset int64) (data []byte, err error) {
return
}
checksum := binary.LittleEndian.Uint64(data[:_ChecksumSize])
checksum := binary.LittleEndian.Uint32(data[:_ChecksumSize])
data = data[_ChecksumSize:]
expectedChecksum := crc64.Checksum(data, crc64Table)
expectedChecksum := crc32.Checksum(data, crcTable)
if checksum != expectedChecksum {
return nil, ErrChecksumNotMatch
}
......
......@@ -24,42 +24,42 @@ import (
"github.com/stretchr/testify/assert"
)
func TestContentOffsetToBlockOffset(t *testing.T) {
mapper := NewBlockMapper[*os.File](nil, 64)
func TestFileWithChecksumOffsets(t *testing.T) {
f := NewFileWithChecksum[*os.File](nil, 64)
blockOffset, offsetInBlock := mapper.contentOffsetToBlockOffset(0)
blockOffset, offsetInBlock := f.contentOffsetToBlockOffset(0)
assert.Equal(t, int64(0), blockOffset)
assert.Equal(t, int64(0), offsetInBlock)
blockOffset, offsetInBlock = mapper.contentOffsetToBlockOffset(1)
blockOffset, offsetInBlock = f.contentOffsetToBlockOffset(1)
assert.Equal(t, int64(0), blockOffset)
assert.Equal(t, int64(1), offsetInBlock)
blockOffset, offsetInBlock = mapper.contentOffsetToBlockOffset(int64(mapper.blockContentSize))
assert.Equal(t, int64(mapper.blockSize), blockOffset)
blockOffset, offsetInBlock = f.contentOffsetToBlockOffset(int64(f.blockContentSize))
assert.Equal(t, int64(f.blockSize), blockOffset)
assert.Equal(t, int64(0), offsetInBlock)
blockOffset, offsetInBlock = mapper.contentOffsetToBlockOffset(int64(mapper.blockContentSize) + 1)
assert.Equal(t, int64(mapper.blockSize), blockOffset)
blockOffset, offsetInBlock = f.contentOffsetToBlockOffset(int64(f.blockContentSize) + 1)
assert.Equal(t, int64(f.blockSize), blockOffset)
assert.Equal(t, int64(1), offsetInBlock)
blockOffset, offsetInBlock = mapper.contentOffsetToBlockOffset(int64(mapper.blockContentSize)*2 + 1)
assert.Equal(t, int64(mapper.blockSize*2), blockOffset)
blockOffset, offsetInBlock = f.contentOffsetToBlockOffset(int64(f.blockContentSize)*2 + 1)
assert.Equal(t, int64(f.blockSize*2), blockOffset)
assert.Equal(t, int64(1), offsetInBlock)
blockOffset, offsetInBlock = mapper.contentOffsetToBlockOffset(int64(mapper.blockContentSize)*3 + 1)
assert.Equal(t, int64(mapper.blockSize*3), blockOffset)
blockOffset, offsetInBlock = f.contentOffsetToBlockOffset(int64(f.blockContentSize)*3 + 1)
assert.Equal(t, int64(f.blockSize*3), blockOffset)
assert.Equal(t, int64(1), offsetInBlock)
}
func TestBlockMapper(t *testing.T) {
func TestFileWithChecksum(t *testing.T) {
blockContentSize := 8
tempDir := t.TempDir()
testBlockMapper(
testFileWithChecksum(
t,
blockContentSize,
func() BlockMappable {
func() FileLike {
f, err := os.CreateTemp(tempDir, "*")
assert.Nil(t, err)
t.Cleanup(func() {
......@@ -70,16 +70,36 @@ func TestBlockMapper(t *testing.T) {
)
}
func testBlockMapper(
func testFileWithChecksum(
t *testing.T,
blockContentSize int,
newUnderlying func() BlockMappable,
newUnderlying func() FileLike,
) {
for i := 0; i < blockContentSize*4; i++ {
underlying := newUnderlying()
mapper := NewBlockMapper(underlying, blockContentSize)
fileWithChecksum := NewFileWithChecksum(underlying, blockContentSize)
check := func(data []byte) {
// check content
pos, err := fileWithChecksum.Seek(0, io.SeekStart)
assert.Nil(t, err)
assert.Equal(t, int64(0), pos)
content, err := io.ReadAll(fileWithChecksum)
assert.Nil(t, err)
assert.Equal(t, data, content)
// iotest
pos, err = fileWithChecksum.Seek(0, io.SeekStart)
assert.Nil(t, err)
assert.Equal(t, int64(0), pos)
err = iotest.TestReader(fileWithChecksum, data)
if err != nil {
t.Logf("%s", err)
}
assert.Nil(t, err)
}
// random bytes
data := make([]byte, i)
......@@ -87,18 +107,10 @@ func testBlockMapper(
assert.Nil(t, err)
// write
n, err := mapper.Write(data)
n, err := fileWithChecksum.Write(data)
assert.Nil(t, err)
assert.Equal(t, i, n)
// check content
pos, err := mapper.Seek(0, io.SeekStart)
assert.Nil(t, err)
assert.Equal(t, int64(0), pos)
content, err := io.ReadAll(mapper)
assert.Nil(t, err)
assert.Equal(t, data, content)
// underlying size
underlyingSize, err := underlying.Seek(0, io.SeekEnd)
assert.Nil(t, err)
......@@ -109,66 +121,52 @@ func testBlockMapper(
}
assert.Equal(t, expectedSize, int(underlyingSize))
// iotest
pos, err = mapper.Seek(0, io.SeekStart)
assert.Nil(t, err)
assert.Equal(t, int64(0), pos)
err = iotest.TestReader(mapper, data)
if err != nil {
t.Logf("%s", err)
}
assert.Nil(t, err)
check(data)
for j := 0; j < len(data); j++ {
// seek and write random bytes
_, err = rand.Read(data[j:])
assert.Nil(t, err)
pos, err = mapper.Seek(int64(j), io.SeekStart)
pos, err := fileWithChecksum.Seek(int64(j), io.SeekStart)
assert.Nil(t, err)
assert.Equal(t, int64(j), pos)
n, err = mapper.Write(data[j:])
n, err = fileWithChecksum.Write(data[j:])
assert.Nil(t, err)
assert.Equal(t, len(data[j:]), n)
// check content
pos, err = mapper.Seek(0, io.SeekStart)
assert.Nil(t, err)
assert.Equal(t, int64(0), pos)
content, err = io.ReadAll(mapper)
assert.Nil(t, err)
assert.Equal(t, data, content)
// seek and read
pos, err = mapper.Seek(int64(j), io.SeekStart)
pos, err = fileWithChecksum.Seek(int64(j), io.SeekStart)
assert.Nil(t, err)
assert.Equal(t, int64(j), pos)
content, err = io.ReadAll(mapper)
content, err := io.ReadAll(fileWithChecksum)
assert.Nil(t, err)
assert.Equal(t, data[j:], content)
check(data)
}
}
}
func TestMultiLayerBlockMapper(t *testing.T) {
func TestMultiLayerFileWithChecksum(t *testing.T) {
blockContentSize := 8
tempDir := t.TempDir()
testBlockMapper(
testFileWithChecksum(
t,
blockContentSize,
func() BlockMappable {
func() FileLike {
f, err := os.CreateTemp(tempDir, "*")
assert.Nil(t, err)
t.Cleanup(func() {
f.Close()
})
m := NewBlockMapper(f, blockContentSize)
m2 := NewBlockMapper(m, blockContentSize)
m3 := NewBlockMapper(m2, blockContentSize)
return m3
f2 := NewFileWithChecksum(f, blockContentSize)
f3 := NewFileWithChecksum(f2, blockContentSize)
f4 := NewFileWithChecksum(f3, blockContentSize)
return f4
},
)
}
......@@ -137,8 +137,8 @@ func (l *LocalFS) write(ctx context.Context, vector IOVector) error {
if err != nil {
return err
}
mapper := NewBlockMapper(f, _BlockContentSize)
n, err := io.Copy(mapper, newIOEntriesReader(vector.Entries))
fileWithChecksum := NewFileWithChecksum(f, _BlockContentSize)
n, err := io.Copy(fileWithChecksum, newIOEntriesReader(vector.Entries))
if err != nil {
return err
}
......@@ -211,15 +211,15 @@ func (l *LocalFS) read(ctx context.Context, vector *IOVector) error {
}
if entry.WriterForRead != nil {
mapper := NewBlockMapper(file, _BlockContentSize)
fileWithChecksum := NewFileWithChecksum(file, _BlockContentSize)
if entry.Offset > 0 {
_, err := mapper.Seek(int64(entry.Offset), io.SeekStart)
_, err := fileWithChecksum.Seek(int64(entry.Offset), io.SeekStart)
if err != nil {
return err
}
}
r := (io.Reader)(mapper)
r := (io.Reader)(fileWithChecksum)
if entry.Size > 0 {
r = io.LimitReader(r, int64(entry.Size))
}
......@@ -257,15 +257,15 @@ func (l *LocalFS) read(ctx context.Context, vector *IOVector) error {
if err != nil {
return nil
}
mapper := NewBlockMapper(file, _BlockContentSize)
fileWithChecksum := NewFileWithChecksum(file, _BlockContentSize)
if entry.Offset > 0 {
_, err := mapper.Seek(int64(entry.Offset), io.SeekStart)
_, err := fileWithChecksum.Seek(int64(entry.Offset), io.SeekStart)
if err != nil {
return err
}
}
r := (io.Reader)(mapper)
r := (io.Reader)(fileWithChecksum)
if entry.Size > 0 {
r = io.LimitReader(r, int64(entry.Size))
}
......@@ -294,15 +294,15 @@ func (l *LocalFS) read(ctx context.Context, vector *IOVector) error {
}
} else {
mapper := NewBlockMapper(file, _BlockContentSize)
fileWithChecksum := NewFileWithChecksum(file, _BlockContentSize)
if entry.Offset > 0 {
_, err := mapper.Seek(int64(entry.Offset), io.SeekStart)
_, err := fileWithChecksum.Seek(int64(entry.Offset), io.SeekStart)
if err != nil {
return err
}
}
r := (io.Reader)(mapper)
r := (io.Reader)(fileWithChecksum)
if entry.Size > 0 {
r = io.LimitReader(r, int64(entry.Size))
}
......@@ -489,14 +489,14 @@ func (l *LocalFS) NewMutator(filePath string) (Mutator, error) {
return nil, ErrFileNotFound
}
return &_LocalFSMutator{
f: f,
mapper: NewBlockMapper(f, _BlockContentSize),
osFile: f,
fileWithChecksum: NewFileWithChecksum(f, _BlockContentSize),
}, nil
}
type _LocalFSMutator struct {
f *os.File
mapper *BlockMapper[*os.File]
osFile *os.File
fileWithChecksum *FileWithChecksum[*os.File]
}
func (l *_LocalFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error {
......@@ -506,11 +506,11 @@ func (l *_LocalFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error
if entry.ReaderForWrite != nil {
// seek and copy
_, err := l.mapper.Seek(int64(entry.Offset), 0)
_, err := l.fileWithChecksum.Seek(int64(entry.Offset), 0)
if err != nil {
return err
}
n, err := io.Copy(l.mapper, entry.ReaderForWrite)
n, err := io.Copy(l.fileWithChecksum, entry.ReaderForWrite)
if err != nil {
return err
}
......@@ -520,7 +520,7 @@ func (l *_LocalFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error
} else {
// WriteAt
n, err := l.mapper.WriteAt(entry.Data, int64(entry.Offset))
n, err := l.fileWithChecksum.WriteAt(entry.Data, int64(entry.Offset))
if err != nil {
return err
}
......@@ -536,12 +536,12 @@ func (l *_LocalFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error
func (l *_LocalFSMutator) Close() error {
// sync
if err := l.f.Sync(); err != nil {
if err := l.osFile.Sync(); err != nil {
return err
}
// close
if err := l.f.Close(); err != nil {
if err := l.osFile.Close(); err != nil {
return err
}
......
......@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"math"
"net/url"
"path"
"sort"
......@@ -283,7 +284,24 @@ func (s *S3FS) Read(ctx context.Context, vector *IOVector) error {
func (s *S3FS) read(ctx context.Context, vector *IOVector) error {
min, max, readToEnd := vector.offsetRange()
min := math.MaxInt
max := 0
readToEnd := false
for _, entry := range vector.Entries {
if entry.ignore {
continue
}
if entry.Offset < min {
min = entry.Offset
}
if entry.Size < 0 {
entry.Size = 0
readToEnd = true
}
if end := entry.Offset + entry.Size; end > max {
max = end
}
}
var content []byte
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment