Skip to content
Snippets Groups Projects
Unverified Commit baec13a6 authored by reusee's avatar reusee Committed by GitHub
Browse files

fileservice: add BlockMapper (#4496)

fileservice: add BlockMapper

Approved by: @fengttt
parent 61889ac6
No related branches found
No related tags found
No related merge requests found
// Copyright 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileservice
import (
"encoding/binary"
"errors"
"hash/crc64"
"io"
"os"
)
// BlockMapper maps file content to blocks with CRC checksum
type BlockMapper struct {
file *os.File
blockSize int
blockContentSize int
contentOffset int64
}
const (
_ChecksumSize = 8
)
var (
crc64Table = crc64.MakeTable(crc64.ECMA)
ErrChecksumNotMatch = errors.New("checksum not match")
)
func NewBlockMapper(
file *os.File,
blockContentSize int,
) *BlockMapper {
return &BlockMapper{
file: file,
blockSize: blockContentSize + _ChecksumSize,
blockContentSize: blockContentSize,
}
}
var _ io.ReadWriteSeeker = new(BlockMapper)
func (b *BlockMapper) Read(buf []byte) (n int, err error) {
blockOffset, offsetInBlock := b.contentOffsetToBlockOffset(b.contentOffset)
data, err := b.readBlock(blockOffset)
if err != nil {
return 0, err
}
data = data[offsetInBlock:]
n = copy(buf, data)
if len(data) < b.blockContentSize && n == len(data) {
err = io.EOF
}
b.contentOffset += int64(n)
return
}
func (b *BlockMapper) Write(buf []byte) (n int, err error) {
for len(buf) > 0 {
blockOffset, offsetInBlock := b.contentOffsetToBlockOffset(b.contentOffset)
data, err := b.readBlock(blockOffset)
if err != nil {
return 0, err
}
if len(data[offsetInBlock:]) == 0 {
nAppend := len(buf)
if nAppend+len(data) > b.blockContentSize {
nAppend = b.blockContentSize - len(data)
}
data = append(data, make([]byte, nAppend)...)
}
nBytes := copy(data[offsetInBlock:], buf)
buf = buf[nBytes:]
checksum := crc64.Checksum(data, crc64Table)
checksumBytes := make([]byte, _ChecksumSize)
binary.LittleEndian.PutUint64(checksumBytes, checksum)
if _, err := b.file.WriteAt(checksumBytes, blockOffset); err != nil {
return n, err
}
if _, err := b.file.WriteAt(data, blockOffset+_ChecksumSize); err != nil {
return n, err
}
n += nBytes
b.contentOffset += int64(nBytes)
}
return
}
func (b *BlockMapper) Seek(offset int64, whence int) (int64, error) {
fileSize, err := b.file.Seek(0, io.SeekEnd)
if err != nil {
return 0, err
}
contentSize := fileSize
nBlock := ceilingDiv(contentSize, int64(b.blockSize))
contentSize -= _ChecksumSize * nBlock
switch whence {
case io.SeekStart:
b.contentOffset = offset
case io.SeekCurrent:
b.contentOffset += offset
case io.SeekEnd:
b.contentOffset += offset
}
if b.contentOffset < 0 {
b.contentOffset = 0
}
if b.contentOffset > contentSize {
b.contentOffset = contentSize
}
return b.contentOffset, nil
}
func (b *BlockMapper) contentOffsetToBlockOffset(
contentOffset int64,
) (
blockOffset int64,
offsetInBlock int64,
) {
nBlock := contentOffset / int64(b.blockContentSize)
blockOffset += nBlock * int64(b.blockSize)
offsetInBlock = contentOffset % int64(b.blockContentSize)
return
}
func (b *BlockMapper) readBlock(offset int64) ([]byte, error) {
buf := make([]byte, b.blockSize)
n, err := b.file.ReadAt(buf, offset)
if err == io.EOF {
buf = buf[:n]
} else if err != nil {
return nil, err
}
if n < _ChecksumSize {
// empty
return nil, nil
}
data := buf[_ChecksumSize:]
expectedChecksum := crc64.Checksum(data, crc64Table)
checksum := binary.LittleEndian.Uint64(buf[:_ChecksumSize])
if checksum != expectedChecksum {
return nil, ErrChecksumNotMatch
}
return data, nil
}
// Copyright 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileservice
import (
"crypto/rand"
"io"
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func TestContentOffsetToBlockOffset(t *testing.T) {
mapper := NewBlockMapper(nil, 64)
blockOffset, offsetInBlock := mapper.contentOffsetToBlockOffset(0)
assert.Equal(t, int64(0), blockOffset)
assert.Equal(t, int64(0), offsetInBlock)
blockOffset, offsetInBlock = mapper.contentOffsetToBlockOffset(1)
assert.Equal(t, int64(0), blockOffset)
assert.Equal(t, int64(1), offsetInBlock)
blockOffset, offsetInBlock = mapper.contentOffsetToBlockOffset(int64(mapper.blockContentSize))
assert.Equal(t, int64(mapper.blockSize), blockOffset)
assert.Equal(t, int64(0), offsetInBlock)
blockOffset, offsetInBlock = mapper.contentOffsetToBlockOffset(int64(mapper.blockContentSize) + 1)
assert.Equal(t, int64(mapper.blockSize), blockOffset)
assert.Equal(t, int64(1), offsetInBlock)
blockOffset, offsetInBlock = mapper.contentOffsetToBlockOffset(int64(mapper.blockContentSize)*2 + 1)
assert.Equal(t, int64(mapper.blockSize*2), blockOffset)
assert.Equal(t, int64(1), offsetInBlock)
blockOffset, offsetInBlock = mapper.contentOffsetToBlockOffset(int64(mapper.blockContentSize)*3 + 1)
assert.Equal(t, int64(mapper.blockSize*3), blockOffset)
assert.Equal(t, int64(1), offsetInBlock)
}
func TestBlockMapper(t *testing.T) {
blockContentSize := 8
tempDir := t.TempDir()
for i := 0; i < blockContentSize*4; i++ {
f, err := os.CreateTemp(tempDir, "*")
assert.Nil(t, err)
defer f.Close()
mapper := NewBlockMapper(f, blockContentSize)
data := make([]byte, i)
_, err = rand.Read(data)
assert.Nil(t, err)
n, err := mapper.Write(data)
assert.Nil(t, err)
assert.Equal(t, i, n)
pos, err := mapper.Seek(0, io.SeekStart)
assert.Nil(t, err)
assert.Equal(t, int64(0), pos)
content, err := io.ReadAll(mapper)
assert.Nil(t, err)
assert.Equal(t, data, content)
stat, err := f.Stat()
assert.Nil(t, err)
expectedSize := len(data) / blockContentSize * (blockContentSize + _ChecksumSize)
mod := len(data) % blockContentSize
if mod != 0 {
expectedSize += _ChecksumSize + mod
}
assert.Equal(t, expectedSize, int(stat.Size()))
for j := 0; j < len(data); j++ {
_, err = rand.Read(data[j:])
assert.Nil(t, err)
pos, err = mapper.Seek(int64(j), io.SeekStart)
assert.Nil(t, err)
assert.Equal(t, int64(j), pos)
n, err = mapper.Write(data[j:])
assert.Nil(t, err)
assert.Equal(t, len(data[j:]), n)
pos, err = mapper.Seek(0, io.SeekStart)
assert.Nil(t, err)
assert.Equal(t, int64(0), pos)
content, err = io.ReadAll(mapper)
assert.Nil(t, err)
assert.Equal(t, data, content)
}
}
}
......@@ -17,3 +17,11 @@ package fileservice
func ptrTo[T any](v T) *T {
return &v
}
func ceilingDiv[T int | int64](n, by T) T {
res := n / by
if n%by == 0 {
return res
}
return res + 1
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment