Skip to content
Snippets Groups Projects
Unverified Commit 4d80a818 authored by XuPeng-SH's avatar XuPeng-SH Committed by GitHub
Browse files

(AOE): Add first version of logstore (#893)

parent 8c4837eb
Branches
Tags
No related merge requests found
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logstore
import (
"errors"
"matrixone/pkg/logutil"
ops "matrixone/pkg/vm/engine/aoe/storage/worker"
"matrixone/pkg/vm/engine/aoe/storage/worker/base"
"sync/atomic"
"time"
)
type state = uint32
const (
stInited state = iota
stPreCheckpoint
)
type SyncerCfg struct {
Factory HBHandleFactory
Interval time.Duration
}
type HBHandleFactory = func(BufferedStore) base.IHBHandle
var (
DefaultHBInterval = time.Duration(100) * time.Millisecond
)
type BufferedStore interface {
Store
Start()
AppendEntryWithCommitId(Entry, uint64) error
Checkpoint(Entry, uint64) error
GetSyncedId() uint64
SetSyncedId(uint64)
GetCheckpointId() uint64
}
type syncHandler struct {
store *bufferedStore
}
func (h *syncHandler) OnExec() { h.store.Sync() }
func (h *syncHandler) OnStopped() {
h.store.Sync()
logutil.Infof("syncHandler Stoped at: %d", h.store.GetSyncedId())
}
type bufferedStore struct {
store
committed uint64
uncommitted uint64
checkpointed uint64
state state
syncer base.IHeartbeater
}
func NewBufferedStore(dir, name string, syncerCfg *SyncerCfg) (*bufferedStore, error) {
ss, err := New(dir, name)
if err != nil {
return nil, err
}
s := &bufferedStore{
store: *ss,
}
var handle base.IHBHandle
if syncerCfg == nil {
syncerCfg = &SyncerCfg{
Interval: DefaultHBInterval,
}
handle = &syncHandler{store: s}
} else {
handle = syncerCfg.Factory(s)
}
s.syncer = ops.NewHeartBeater(syncerCfg.Interval, handle)
return s, nil
}
func (s *bufferedStore) Start() {
s.syncer.Start()
}
func (s *bufferedStore) Close() error {
s.syncer.Stop()
return s.store.Close()
}
func (s *bufferedStore) Checkpoint(entry Entry, id uint64) error {
if !atomic.CompareAndSwapUint32(&s.state, stInited, stPreCheckpoint) {
return errors.New("Another checkpoint job is running")
}
defer atomic.StoreUint32(&s.state, stInited)
curr := atomic.LoadUint64(&s.checkpointed)
if id <= curr {
return nil
}
if err := s.store.AppendEntry(entry); err != nil {
return err
}
if err := s.Sync(); err != nil {
return err
}
atomic.StoreUint64(&s.checkpointed, id)
return nil
}
func (s *bufferedStore) GetCheckpointId() uint64 {
return atomic.LoadUint64(&s.checkpointed)
}
func (s *bufferedStore) GetSyncedId() uint64 {
return atomic.LoadUint64(&s.committed)
}
func (s *bufferedStore) SetSyncedId(id uint64) {
atomic.StoreUint64(&s.committed, id)
}
func (s *bufferedStore) AppendEntry(entry Entry) error {
panic("not supported")
}
func (s *bufferedStore) AppendEntryWithCommitId(entry Entry, commitId uint64) error {
err := s.store.AppendEntry(entry)
if err != nil {
return err
}
atomic.StoreUint64(&s.uncommitted, commitId)
if entry.GetMeta().IsFlush() {
return s.Sync()
}
return nil
}
func (s *bufferedStore) Sync() error {
uncommitted := atomic.LoadUint64(&s.uncommitted)
// if uncommitted == s.GetSyncedId() {
// return nil
// }
// if err := s.writer.Flush(); err != nil {
// return err
// }
if err := s.file.Sync(); err != nil {
return err
}
s.SetSyncedId(uncommitted)
return nil
}
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logstore
import "encoding/binary"
func UnmarshallEntryType(buf []byte) EntryType {
return binary.BigEndian.Uint16(buf)
}
func UnmarshallEntrySize(buf []byte) uint32 {
return binary.BigEndian.Uint32(buf)
}
func MarshallEntryTypeWithBuf(buf []byte, typ EntryType) {
binary.BigEndian.PutUint16(buf, typ)
}
func MarshallEntrySizeWithBuf(buf []byte, size uint32) {
binary.BigEndian.PutUint32(buf, size)
}
func MarshallEntryType(typ EntryType) []byte {
buf := make([]byte, EntryTypeSize)
binary.BigEndian.PutUint16(buf, typ)
return buf
}
func MarshallEntrySize(size uint32) []byte {
buf := make([]byte, EntrySizeSize)
binary.BigEndian.PutUint32(buf, size)
return buf
}
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logstore
import (
"fmt"
"io"
"sync"
"unsafe"
)
type Entry interface {
GetMeta() *EntryMeta
SetMeta(*EntryMeta)
GetPayload() []byte
Unmarshal([]byte) error
ReadFrom(io.Reader) (int, error)
WriteTo(io.Writer, sync.Locker) (int, error)
}
type EntryType = uint16
const (
ETInvalid EntryType = iota
ETFlush
ETCheckpoint
ETCustomizeStart
)
var (
EntryTypeSize = int(unsafe.Sizeof(ETFlush))
EntrySizeSize = int(unsafe.Sizeof(uint32(0)))
EntryMetaSize = EntryTypeSize + EntrySizeSize
FlushEntry *BaseEntry
)
func init() {
meta := &EntryMeta{
Buf: make([]byte, EntryMetaSize),
}
meta.SetType(ETFlush)
meta.SetPayloadSize(uint32(0))
FlushEntry = &BaseEntry{
Meta: meta,
Payload: make([]byte, 0),
}
}
type EntryMeta struct {
Buf []byte
}
func NewEntryMeta() *EntryMeta {
meta := &EntryMeta{
Buf: make([]byte, EntryMetaSize),
}
return meta
}
func (meta *EntryMeta) SetType(typ EntryType) {
MarshallEntryTypeWithBuf(meta.Buf[:EntryTypeSize], typ)
}
func (meta *EntryMeta) SetPayloadSize(size uint32) {
MarshallEntrySizeWithBuf(meta.Buf[EntryTypeSize:], size)
}
func (meta *EntryMeta) GetType() EntryType {
return UnmarshallEntryType(meta.Buf[:EntryTypeSize])
}
func (meta *EntryMeta) PayloadSize() uint32 {
return UnmarshallEntrySize(meta.Buf[EntryTypeSize:])
}
func (meta *EntryMeta) Size() uint32 {
return uint32(EntryMetaSize)
}
func (meta *EntryMeta) IsFlush() bool {
typ := meta.GetType()
return typ == ETFlush
}
func (meta *EntryMeta) IsCheckpoint() bool {
typ := meta.GetType()
return typ == ETCheckpoint
}
func (meta *EntryMeta) WriteTo(w io.Writer) (int, error) {
// logutil.Info(meta.String())
return w.Write(meta.Buf)
}
func (meta *EntryMeta) String() string {
s := fmt.Sprintf("<EntryMeta(%d,%d)>", meta.GetType(), meta.PayloadSize())
return s
}
func (meta *EntryMeta) ReadFrom(r io.Reader) (int, error) {
if meta.Buf == nil {
meta.Buf = make([]byte, EntryMetaSize)
}
return r.Read(meta.Buf)
}
type BaseEntry struct {
Meta *EntryMeta
Payload []byte
}
func NewBaseEntry() *BaseEntry {
e := &BaseEntry{
Meta: NewEntryMeta(),
Payload: make([]byte, 0),
}
return e
}
func NewBaseEntryWithMeta(meta *EntryMeta) *BaseEntry {
e := &BaseEntry{
Meta: meta,
Payload: make([]byte, 0),
}
return e
}
func (e *BaseEntry) GetMeta() *EntryMeta { return e.Meta }
func (e *BaseEntry) SetMeta(meta *EntryMeta) { e.Meta = meta }
func (e *BaseEntry) GetPayload() []byte { return e.Payload }
func (e *BaseEntry) Unmarshal(buf []byte) error {
e.Payload = make([]byte, len(buf))
copy(e.Payload, buf)
e.Meta.SetPayloadSize(uint32(len(buf)))
return nil
}
func (e *BaseEntry) ReadFrom(r io.Reader) (int, error) {
size := e.Meta.PayloadSize()
e.Payload = make([]byte, size)
return r.Read(e.Payload)
}
func (e *BaseEntry) WriteTo(w io.Writer, locker sync.Locker) (int, error) {
locker.Lock()
defer locker.Unlock()
n1, err := e.Meta.WriteTo(w)
if err != nil {
return n1, err
}
n2, err := w.Write(e.Payload)
if err != nil {
return n2, err
}
return n1 + n2, err
}
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logstore
import (
"errors"
"fmt"
"io"
"matrixone/pkg/logutil"
)
type EntryHandler = func(io.Reader, *EntryMeta) (Entry, int64, error)
type Replayer interface {
Replay(Store) error
Truncate(Store) error
RegisterEntryHandler(EntryType, EntryHandler) error
GetOffset() int64
}
type simpleReplayer struct {
uncommitted []Entry
committed []Entry
handlers map[EntryType]EntryHandler
count int
offset int64
truncOffset int64
}
func NewSimpleReplayer() *simpleReplayer {
replayer := &simpleReplayer{
uncommitted: make([]Entry, 0),
committed: make([]Entry, 0),
handlers: make(map[EntryType]EntryHandler),
}
replayer.handlers[ETFlush] = replayer.onFlush
return replayer
}
func (replayer *simpleReplayer) onFlush(r io.Reader, meta *EntryMeta) (Entry, int64, error) {
entry := NewBaseEntryWithMeta(meta)
n, err := entry.ReadFrom(r)
if err != nil {
return nil, int64(n), err
}
return entry, int64(n), nil
}
func (replayer *simpleReplayer) GetOffset() int64 {
return replayer.truncOffset
}
func (replayer *simpleReplayer) Truncate(s Store) error {
return s.Truncate(replayer.truncOffset)
}
func (replayer *simpleReplayer) RegisterEntryHandler(eType EntryType, handler EntryHandler) error {
duplicate := replayer.handlers[eType]
if duplicate != nil {
return errors.New(fmt.Sprintf("duplicate handler found for %d", eType))
}
replayer.handlers[eType] = handler
return nil
}
func (replayer *simpleReplayer) doReplay(r io.Reader) error {
meta := NewEntryMeta()
n, err := meta.ReadFrom(r)
if err != nil {
return err
}
eType := meta.GetType()
replayer.offset += int64(n)
handler := replayer.handlers[eType]
if handler == nil {
logutil.Infof("Replaying (%d, %d, %d) - %d", eType, meta.PayloadSize(), replayer.offset, replayer.count)
return errors.New(fmt.Sprintf("no handler for type: %d", eType))
}
if entry, n, err := handler(r, meta); err != nil {
return err
} else {
if n != int64(meta.PayloadSize()) {
panic(fmt.Sprintf("bad %d, %d for type %d", n, meta.PayloadSize(), eType))
}
replayer.offset += n
if !entry.GetMeta().IsFlush() {
replayer.uncommitted = append(replayer.uncommitted, entry)
} else {
replayer.committed = append(replayer.committed, replayer.uncommitted...)
replayer.uncommitted = replayer.uncommitted[:0]
replayer.truncOffset = replayer.offset
}
}
replayer.count++
return nil
}
func (replayer *simpleReplayer) Replay(s Store) error {
err := s.ForLoopEntries(replayer.doReplay)
logutil.Infof("replay count: %d", replayer.count)
return err
}
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logstore
import (
"encoding/binary"
"io"
"os"
"testing"
"github.com/stretchr/testify/assert"
)
var (
mockETDDL = ETCustomizeStart + 1
)
type mockDDLOp uint8
const (
mockCreateOp mockDDLOp = iota
mockDropOp
)
type mockDDLEntry struct {
BaseEntry
}
func newMockDDLEntry(op mockDDLOp, data []byte) *mockDDLEntry {
payload := make([]byte, len(data)+1)
payload[0] = byte(op)
copy(payload[1:], data)
entry := newEmptyDDLEntry(nil)
entry.Unmarshal(payload)
entry.Meta.SetType(mockETDDL)
entry.Meta.SetPayloadSize(uint32(len(payload)))
return entry
}
func newEmptyDDLEntry(meta *EntryMeta) *mockDDLEntry {
entry := new(mockDDLEntry)
if meta == nil {
entry.BaseEntry = *NewBaseEntry()
entry.Meta.SetType(mockETDDL)
} else {
entry.BaseEntry = *NewBaseEntryWithMeta(meta)
}
return entry
}
func mockETDDLHandler(r io.Reader, meta *EntryMeta) (Entry, int64, error) {
entry := newEmptyDDLEntry(meta)
n, err := entry.ReadFrom(r)
if err != nil {
return nil, int64(n), err
}
return entry, int64(n), err
}
func TestStore(t *testing.T) {
dir := "/tmp/teststore"
os.RemoveAll(dir)
name := "sstore"
store, err := New(dir, name)
assert.Nil(t, err)
buf := make([]byte, 8)
step := 5
uncommitted := make([]Entry, 0)
committed := make([]Entry, 0)
for i := 0; i < 13; i++ {
binary.BigEndian.PutUint64(buf, uint64(i))
e := newMockDDLEntry(mockCreateOp, buf)
err = store.AppendEntry(e)
assert.Nil(t, err)
uncommitted = append(uncommitted, e)
if i%step == step-1 {
err = store.Sync()
assert.Nil(t, err)
committed = append(committed, uncommitted...)
uncommitted = uncommitted[:0]
}
}
store.Close()
store, err = New(dir, name)
assert.Nil(t, err)
defer store.Close()
replayer := NewSimpleReplayer()
err = replayer.RegisterEntryHandler(mockETDDL, mockETDDLHandler)
assert.Nil(t, err)
err = replayer.Replay(store)
assert.Nil(t, err)
t.Log(replayer.GetOffset())
assert.Equal(t, len(committed), len(replayer.committed))
err = replayer.Truncate(store)
assert.Nil(t, err)
}
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logstore
import (
"errors"
"io"
"os"
"path/filepath"
"sync"
)
type StoreEntryHandler = func(io.Reader) error
type Store interface {
io.Closer
AppendEntry(Entry) error
Sync() error
ForLoopEntries(StoreEntryHandler) error
Truncate(int64) error
}
type store struct {
mu sync.RWMutex
dir string
name string
file *os.File
pos int64
writer io.Writer
// reader *bufio.Reader
reader io.Reader
}
func New(dir, name string) (*store, error) {
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err = os.MkdirAll(dir, os.ModePerm); err != nil {
return nil, err
}
}
f, err := os.OpenFile(filepath.Join(dir, name), os.O_RDWR|os.O_CREATE, os.ModePerm)
if err != nil {
return nil, err
}
s := &store{
dir: dir,
name: name,
file: f,
}
stats, err := s.file.Stat()
if err != nil {
f.Close()
return nil, err
}
s.pos = stats.Size()
// s.writer = bufio.NewWriter(s.file)
s.writer = s.file
// s.reader = bufio.NewReader(s.file)
s.reader = s.file
return s, err
}
func (s *store) Close() error {
// if err := s.writer.Flush(); err != nil {
// return err
// }
if err := s.file.Sync(); err != nil {
return err
}
return s.file.Close()
}
func (s *store) Truncate(size int64) error {
return s.file.Truncate(size)
}
func (s *store) AppendEntry(entry Entry) error {
if _, err := entry.WriteTo(s.writer, &s.mu); err != nil {
return err
}
// logutil.Infof("WriteEntry %d, Size=%d", entry.Type(), entry.Size())
return nil
}
func (s *store) Sync() error {
if err := s.AppendEntry(FlushEntry); err != nil {
return err
}
// if err := s.writer.Flush(); err != nil {
// return err
// }
err := s.file.Sync()
return err
}
func (s *store) ForLoopEntries(handler StoreEntryHandler) error {
for {
if err := handler(s.reader); err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment