From 4d80a8184e2a3881c20507c1aa35a482e36eb8e4 Mon Sep 17 00:00:00 2001
From: XuPeng-SH <xupeng3112@163.com>
Date: Thu, 30 Sep 2021 15:59:46 +0800
Subject: [PATCH] (AOE): Add first version of logstore (#893)

---
 .../engine/aoe/storage/logstore/buffered.go   | 238 ++++++++++++++++++
 pkg/vm/engine/aoe/storage/logstore/codec.go   |  55 ++++
 pkg/vm/engine/aoe/storage/logstore/entry.go   | 213 ++++++++++++++++
 .../engine/aoe/storage/logstore/replayer.go   | 152 +++++++++++
 .../engine/aoe/storage/logstore/store_test.go | 211 ++++++++++++++++
 pkg/vm/engine/aoe/storage/logstore/types.go   | 155 ++++++++++++
 6 files changed, 1024 insertions(+)
 create mode 100644 pkg/vm/engine/aoe/storage/logstore/buffered.go
 create mode 100644 pkg/vm/engine/aoe/storage/logstore/codec.go
 create mode 100644 pkg/vm/engine/aoe/storage/logstore/entry.go
 create mode 100644 pkg/vm/engine/aoe/storage/logstore/replayer.go
 create mode 100644 pkg/vm/engine/aoe/storage/logstore/store_test.go
 create mode 100644 pkg/vm/engine/aoe/storage/logstore/types.go

diff --git a/pkg/vm/engine/aoe/storage/logstore/buffered.go b/pkg/vm/engine/aoe/storage/logstore/buffered.go
new file mode 100644
index 000000000..7178c6ed4
--- /dev/null
+++ b/pkg/vm/engine/aoe/storage/logstore/buffered.go
@@ -0,0 +1,238 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logstore
+
+import (
+	"errors"
+	"sync/atomic"
+	"time"
+
+	"matrixone/pkg/logutil"
+	ops "matrixone/pkg/vm/engine/aoe/storage/worker"
+	"matrixone/pkg/vm/engine/aoe/storage/worker/base"
+)
+
+// state is the checkpoint state of a bufferedStore. It is an alias of uint32
+// so that it can be updated with sync/atomic.
+type state = uint32
+
+const (
+	// stInited means that no checkpoint is in flight.
+	stInited state = iota
+	// stPreCheckpoint means that a checkpoint is being written.
+	stPreCheckpoint
+)
+
+// ErrCheckpointRunning is returned by Checkpoint when another checkpoint is
+// already in flight on the same store. The message is kept as originally
+// worded for callers that match on the text.
+var ErrCheckpointRunning = errors.New("Another checkpoint job is running")
+
+// SyncerCfg configures the background syncer of a buffered store.
+type SyncerCfg struct {
+	// Factory builds the heartbeat handle run by the syncer. When nil, the
+	// default handle, which calls Sync on every beat, is used.
+	Factory HBHandleFactory
+	// Interval is the heartbeat period. Non-positive values fall back to
+	// DefaultHBInterval.
+	Interval time.Duration
+}
+
+// HBHandleFactory builds the heartbeat handle for the given store.
+type HBHandleFactory = func(BufferedStore) base.IHBHandle
+
+// DefaultHBInterval is the heartbeat period used when none is configured.
+var DefaultHBInterval = 100 * time.Millisecond
+
+// BufferedStore is a Store that is synced by a background heartbeat and that
+// tracks the latest appended, synced and checkpointed ids.
+type BufferedStore interface {
+	Store
+	// Start starts the background syncer.
+	Start()
+	// AppendEntryWithCommitId appends the entry and records the given id as
+	// the latest appended one. Flush entries are synced right away.
+	AppendEntryWithCommitId(Entry, uint64) error
+	// Checkpoint appends and syncs the entry, then records the given id as
+	// the latest checkpointed one. Ids that are not newer are ignored.
+	Checkpoint(Entry, uint64) error
+	// GetSyncedId returns the latest id known to be synced.
+	GetSyncedId() uint64
+	// SetSyncedId overwrites the synced id.
+	SetSyncedId(uint64)
+	// GetCheckpointId returns the latest checkpointed id.
+	GetCheckpointId() uint64
+}
+
+var _ BufferedStore = (*bufferedStore)(nil)
+
+// syncHandler is the default heartbeat handle: it syncs the store on every
+// beat and once more when the heartbeater stops.
+type syncHandler struct {
+	store *bufferedStore
+}
+
+// OnExec syncs the store. The callback has no way to return an error, so a
+// failed sync is logged instead of being dropped silently.
+func (h *syncHandler) OnExec() {
+	if err := h.store.Sync(); err != nil {
+		logutil.Infof("syncHandler sync failed: %v", err)
+	}
+}
+
+// OnStopped runs a last sync and logs the synced id.
+func (h *syncHandler) OnStopped() {
+	if err := h.store.Sync(); err != nil {
+		logutil.Infof("syncHandler sync failed: %v", err)
+	}
+	logutil.Infof("syncHandler Stoped at: %d", h.store.GetSyncedId())
+}
+
+// bufferedStore is the BufferedStore implementation backed by a store.
+//
+// The 64-bit counters are declared first: sync/atomic requires them to be
+// 64-bit aligned, which on 32-bit platforms is only guaranteed for the first
+// word of an allocated struct.
+type bufferedStore struct {
+	committed    uint64
+	uncommitted  uint64
+	checkpointed uint64
+	state        state
+	syncer       base.IHeartbeater
+	// The store is held by pointer so that its mutex is never copied.
+	*store
+}
+
+// NewBufferedStore opens the log file name under dir and wraps it into a
+// buffered store. syncerCfg may be nil; missing settings fall back to the
+// default handle and to DefaultHBInterval. The syncer is not running until
+// Start is called.
+func NewBufferedStore(dir, name string, syncerCfg *SyncerCfg) (*bufferedStore, error) {
+	ss, err := New(dir, name)
+	if err != nil {
+		return nil, err
+	}
+	s := &bufferedStore{store: ss}
+
+	// Resolve the defaults on a copy so the caller's config is not mutated.
+	var cfg SyncerCfg
+	if syncerCfg != nil {
+		cfg = *syncerCfg
+	}
+	if cfg.Interval <= 0 {
+		cfg.Interval = DefaultHBInterval
+	}
+	var handle base.IHBHandle
+	if cfg.Factory != nil {
+		handle = cfg.Factory(s)
+	} else {
+		handle = &syncHandler{store: s}
+	}
+	s.syncer = ops.NewHeartBeater(cfg.Interval, handle)
+	return s, nil
+}
+
+// Start starts the background syncer.
+func (s *bufferedStore) Start() {
+	s.syncer.Start()
+}
+
+// Close stops the syncer, then syncs and closes the underlying file.
+func (s *bufferedStore) Close() error {
+	s.syncer.Stop()
+	return s.store.Close()
+}
+
+// Checkpoint appends entry, syncs it and records id as the latest
+// checkpointed id. It returns nil without writing anything when id is not
+// newer than the current checkpoint, and ErrCheckpointRunning when another
+// checkpoint is in flight.
+func (s *bufferedStore) Checkpoint(entry Entry, id uint64) error {
+	if !atomic.CompareAndSwapUint32(&s.state, stInited, stPreCheckpoint) {
+		return ErrCheckpointRunning
+	}
+	defer atomic.StoreUint32(&s.state, stInited)
+	if id <= atomic.LoadUint64(&s.checkpointed) {
+		return nil
+	}
+	if err := s.store.AppendEntry(entry); err != nil {
+		return err
+	}
+	if err := s.Sync(); err != nil {
+		return err
+	}
+	atomic.StoreUint64(&s.checkpointed, id)
+	return nil
+}
+
+// GetCheckpointId returns the latest checkpointed id.
+func (s *bufferedStore) GetCheckpointId() uint64 {
+	return atomic.LoadUint64(&s.checkpointed)
+}
+
+// GetSyncedId returns the latest id known to be synced.
+func (s *bufferedStore) GetSyncedId() uint64 {
+	return atomic.LoadUint64(&s.committed)
+}
+
+// SetSyncedId overwrites the synced id.
+func (s *bufferedStore) SetSyncedId(id uint64) {
+	atomic.StoreUint64(&s.committed, id)
+}
+
+// AppendEntry is not supported on a buffered store and always panics; use
+// AppendEntryWithCommitId instead.
+func (s *bufferedStore) AppendEntry(entry Entry) error {
+	panic("not supported")
+}
+
+// AppendEntryWithCommitId appends entry and records commitId as the latest
+// appended id. A flush entry triggers an immediate Sync.
+func (s *bufferedStore) AppendEntryWithCommitId(entry Entry, commitId uint64) error {
+	if err := s.store.AppendEntry(entry); err != nil {
+		return err
+	}
+	atomic.StoreUint64(&s.uncommitted, commitId)
+	if entry.GetMeta().IsFlush() {
+		return s.Sync()
+	}
+	return nil
+}
+
+// Sync fsyncs the file and advances the synced id to the latest id that was
+// appended before the fsync started. Unlike store.Sync, it does not append a
+// flush entry.
+func (s *bufferedStore) Sync() error {
+	// Take the snapshot first: entries appended while the fsync is running
+	// are not guaranteed to be covered by it.
+	uncommitted := atomic.LoadUint64(&s.uncommitted)
+	if err := s.file.Sync(); err != nil {
+		return err
+	}
+	s.advanceSyncedId(uncommitted)
+	return nil
+}
+
+// advanceSyncedId raises the synced id to id and never lowers it, so that
+// concurrent syncs, or a sync issued before the first append, cannot move it
+// backwards.
+func (s *bufferedStore) advanceSyncedId(id uint64) {
+	for {
+		curr := atomic.LoadUint64(&s.committed)
+		if id <= curr || atomic.CompareAndSwapUint64(&s.committed, curr, id) {
+			return
+		}
+	}
+}
diff --git a/pkg/vm/engine/aoe/storage/logstore/codec.go b/pkg/vm/engine/aoe/storage/logstore/codec.go
new file mode 100644
index 000000000..b2b7d98f7
--- /dev/null
+++ b/pkg/vm/engine/aoe/storage/logstore/codec.go
@@ -0,0 +1,55 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logstore
+
+import "encoding/binary"
+
+// Entry headers are encoded big-endian: the entry type takes 2 bytes and the
+// payload size takes 4 bytes. Every helper below panics if the buffer it is
+// given is shorter than the encoded value.
+
+// UnmarshallEntryType decodes an entry type from the first 2 bytes of buf.
+func UnmarshallEntryType(buf []byte) EntryType {
+	return binary.BigEndian.Uint16(buf)
+}
+
+// UnmarshallEntrySize decodes a payload size from the first 4 bytes of buf.
+func UnmarshallEntrySize(buf []byte) uint32 {
+	return binary.BigEndian.Uint32(buf)
+}
+
+// MarshallEntryTypeWithBuf encodes typ into the first 2 bytes of buf.
+func MarshallEntryTypeWithBuf(buf []byte, typ EntryType) {
+	binary.BigEndian.PutUint16(buf, typ)
+}
+
+// MarshallEntrySizeWithBuf encodes size into the first 4 bytes of buf.
+func MarshallEntrySizeWithBuf(buf []byte, size uint32) {
+	binary.BigEndian.PutUint32(buf, size)
+}
+
+// MarshallEntryType encodes typ into a new EntryTypeSize-byte buffer.
+func MarshallEntryType(typ EntryType) []byte {
+	buf := make([]byte, EntryTypeSize)
+	MarshallEntryTypeWithBuf(buf, typ)
+	return buf
+}
+
+// MarshallEntrySize encodes size into a new EntrySizeSize-byte buffer.
+func MarshallEntrySize(size uint32) []byte {
+	buf := make([]byte, EntrySizeSize)
+	MarshallEntrySizeWithBuf(buf, size)
+	return buf
+}
diff --git a/pkg/vm/engine/aoe/storage/logstore/entry.go b/pkg/vm/engine/aoe/storage/logstore/entry.go
new file mode 100644
index 000000000..9006ba1b1
--- /dev/null
+++ b/pkg/vm/engine/aoe/storage/logstore/entry.go
@@ -0,0 +1,213 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logstore
+
+import (
+	"fmt"
+	"io"
+	"sync"
+	"unsafe"
+)
+
+// Entry is a log record: a fixed-size header followed by a payload.
+//
+// ReadFrom and WriteTo do not match the io.ReaderFrom and io.WriterTo
+// signatures, which go vet's stdmethods check reports; they are kept as they
+// are so that existing implementations keep compiling.
+type Entry interface {
+	// GetMeta returns the entry header.
+	GetMeta() *EntryMeta
+	// SetMeta replaces the entry header.
+	SetMeta(*EntryMeta)
+	// GetPayload returns the entry payload.
+	GetPayload() []byte
+	// Unmarshal loads the payload from the given bytes.
+	Unmarshal([]byte) error
+	// ReadFrom reads the payload and returns the number of bytes read.
+	ReadFrom(io.Reader) (int, error)
+	// WriteTo writes the header and the payload while holding the locker
+	// and returns the number of bytes written.
+	WriteTo(io.Writer, sync.Locker) (int, error)
+}
+
+var _ Entry = (*BaseEntry)(nil)
+
+// EntryType identifies the kind of an entry.
+type EntryType = uint16
+
+const (
+	// ETInvalid is the zero value of EntryType.
+	ETInvalid EntryType = iota
+	// ETFlush is the type of FlushEntry; on replay it commits the entries
+	// read since the previous flush.
+	ETFlush
+	// ETCheckpoint is the type reported by EntryMeta.IsCheckpoint.
+	ETCheckpoint
+	// ETCustomizeStart follows the built-in types. Presumably custom types
+	// are meant to be allocated above it, as the package test does.
+	ETCustomizeStart
+)
+
+var (
+	// EntryTypeSize is the encoded size of an entry type, in bytes.
+	EntryTypeSize = int(unsafe.Sizeof(ETFlush))
+	// EntrySizeSize is the encoded size of a payload size, in bytes.
+	EntrySizeSize = int(unsafe.Sizeof(uint32(0)))
+	// EntryMetaSize is the encoded size of an entry header, in bytes.
+	EntryMetaSize = EntryTypeSize + EntrySizeSize
+
+	// FlushEntry is the shared flush entry, with an empty payload, that
+	// Store.Sync appends. It must not be modified.
+	FlushEntry = newFlushEntry()
+)
+
+// newFlushEntry builds the value of FlushEntry.
+func newFlushEntry() *BaseEntry {
+	meta := NewEntryMeta()
+	meta.SetType(ETFlush)
+	meta.SetPayloadSize(0)
+	return NewBaseEntryWithMeta(meta)
+}
+
+// EntryMeta is the entry header. Buf holds the entry type followed by the
+// payload size, both big-endian.
+type EntryMeta struct {
+	Buf []byte
+}
+
+// NewEntryMeta returns a zeroed header: type ETInvalid, payload size 0.
+func NewEntryMeta() *EntryMeta {
+	return &EntryMeta{Buf: make([]byte, EntryMetaSize)}
+}
+
+// SetType sets the entry type.
+func (meta *EntryMeta) SetType(typ EntryType) {
+	MarshallEntryTypeWithBuf(meta.Buf[:EntryTypeSize], typ)
+}
+
+// SetPayloadSize sets the payload size.
+func (meta *EntryMeta) SetPayloadSize(size uint32) {
+	MarshallEntrySizeWithBuf(meta.Buf[EntryTypeSize:], size)
+}
+
+// GetType returns the entry type.
+func (meta *EntryMeta) GetType() EntryType {
+	return UnmarshallEntryType(meta.Buf[:EntryTypeSize])
+}
+
+// PayloadSize returns the payload size.
+func (meta *EntryMeta) PayloadSize() uint32 {
+	return UnmarshallEntrySize(meta.Buf[EntryTypeSize:])
+}
+
+// Size returns the encoded size of the header, in bytes.
+func (meta *EntryMeta) Size() uint32 {
+	return uint32(EntryMetaSize)
+}
+
+// IsFlush reports whether the entry type is ETFlush.
+func (meta *EntryMeta) IsFlush() bool {
+	return meta.GetType() == ETFlush
+}
+
+// IsCheckpoint reports whether the entry type is ETCheckpoint.
+func (meta *EntryMeta) IsCheckpoint() bool {
+	return meta.GetType() == ETCheckpoint
+}
+
+// WriteTo writes the encoded header to w.
+func (meta *EntryMeta) WriteTo(w io.Writer) (int, error) {
+	return w.Write(meta.Buf)
+}
+
+// String returns a printable form of the header.
+func (meta *EntryMeta) String() string {
+	return fmt.Sprintf("<EntryMeta(%d,%d)>", meta.GetType(), meta.PayloadSize())
+}
+
+// ReadFrom fills the header from r, allocating Buf when it is nil. It
+// returns io.EOF when r is exhausted before any byte is read and
+// io.ErrUnexpectedEOF when the header is truncated.
+func (meta *EntryMeta) ReadFrom(r io.Reader) (int, error) {
+	if meta.Buf == nil {
+		meta.Buf = make([]byte, EntryMetaSize)
+	}
+	// A single Read may return fewer bytes than requested without an error;
+	// ReadFull keeps a short read from yielding a half-filled header.
+	return io.ReadFull(r, meta.Buf)
+}
+
+// BaseEntry is the basic Entry implementation: a header and raw payload
+// bytes.
+type BaseEntry struct {
+	Meta    *EntryMeta
+	Payload []byte
+}
+
+// NewBaseEntry returns an entry with a zeroed header and an empty payload.
+func NewBaseEntry() *BaseEntry {
+	return NewBaseEntryWithMeta(NewEntryMeta())
+}
+
+// NewBaseEntryWithMeta returns an entry with the given header and an empty
+// payload.
+func NewBaseEntryWithMeta(meta *EntryMeta) *BaseEntry {
+	return &BaseEntry{
+		Meta:    meta,
+		Payload: make([]byte, 0),
+	}
+}
+
+// GetMeta returns the entry header.
+func (e *BaseEntry) GetMeta() *EntryMeta { return e.Meta }
+
+// SetMeta replaces the entry header.
+func (e *BaseEntry) SetMeta(meta *EntryMeta) { e.Meta = meta }
+
+// GetPayload returns the entry payload.
+func (e *BaseEntry) GetPayload() []byte { return e.Payload }
+
+// Unmarshal sets the payload to a copy of buf and records its length in the
+// header. It always returns nil.
+func (e *BaseEntry) Unmarshal(buf []byte) error {
+	e.Payload = make([]byte, len(buf))
+	copy(e.Payload, buf)
+	e.Meta.SetPayloadSize(uint32(len(buf)))
+	return nil
+}
+
+// ReadFrom reads the payload from r; the header must already hold the
+// payload size. It returns io.ErrUnexpectedEOF when the payload is
+// truncated, or io.EOF when a non-empty payload is missing entirely.
+func (e *BaseEntry) ReadFrom(r io.Reader) (int, error) {
+	// NOTE(review): the size is taken from the log as is; a corrupted header
+	// can make this allocate up to 4 GiB.
+	e.Payload = make([]byte, e.Meta.PayloadSize())
+	return io.ReadFull(r, e.Payload)
+}
+
+// WriteTo writes the header and then the payload to w while holding locker,
+// so that writers sharing the locker cannot interleave their entries. It
+// returns the total number of bytes written, also when a write fails.
+func (e *BaseEntry) WriteTo(w io.Writer, locker sync.Locker) (int, error) {
+	locker.Lock()
+	defer locker.Unlock()
+	n, err := e.Meta.WriteTo(w)
+	if err != nil {
+		return n, err
+	}
+	m, err := w.Write(e.Payload)
+	return n + m, err
+}
diff --git a/pkg/vm/engine/aoe/storage/logstore/replayer.go b/pkg/vm/engine/aoe/storage/logstore/replayer.go
new file mode 100644
index 000000000..f3ecdf695
--- /dev/null
+++ b/pkg/vm/engine/aoe/storage/logstore/replayer.go
@@ -0,0 +1,152 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logstore
+
+import (
+	"errors"
+	"fmt"
+	"io"
+
+	"matrixone/pkg/logutil"
+)
+
+// EntryHandler reads from the reader the payload of the entry described by
+// the meta. It returns the entry and the number of payload bytes it read.
+type EntryHandler = func(io.Reader, *EntryMeta) (Entry, int64, error)
+
+// Replayer reads back the entries of a Store.
+type Replayer interface {
+	// Replay reads the entries of the store.
+	Replay(Store) error
+	// Truncate cuts the store at GetOffset.
+	Truncate(Store) error
+	// RegisterEntryHandler registers the handler of an entry type.
+	RegisterEntryHandler(EntryType, EntryHandler) error
+	// GetOffset returns the offset at which Truncate cuts the store.
+	GetOffset() int64
+}
+
+var _ Replayer = (*simpleReplayer)(nil)
+
+// simpleReplayer is a Replayer that keeps the replayed entries in memory.
+// Entries stay in uncommitted until a flush entry is read, which moves them
+// to committed and sets truncOffset to the end of that flush entry. offset
+// is the number of bytes consumed so far and count the number of entries
+// read, flush entries included.
+type simpleReplayer struct {
+	uncommitted []Entry
+	committed   []Entry
+	handlers    map[EntryType]EntryHandler
+	count       int
+	offset      int64
+	truncOffset int64
+}
+
+// NewSimpleReplayer returns a replayer that only knows how to read flush
+// entries; handlers for other types are added with RegisterEntryHandler.
+func NewSimpleReplayer() *simpleReplayer {
+	rp := &simpleReplayer{
+		uncommitted: make([]Entry, 0),
+		committed:   make([]Entry, 0),
+		handlers:    make(map[EntryType]EntryHandler),
+	}
+	rp.handlers[ETFlush] = rp.onFlush
+	return rp
+}
+
+// onFlush is the built-in handler of flush entries.
+func (rp *simpleReplayer) onFlush(r io.Reader, meta *EntryMeta) (Entry, int64, error) {
+	entry := NewBaseEntryWithMeta(meta)
+	n, err := entry.ReadFrom(r)
+	if err != nil {
+		return nil, int64(n), err
+	}
+	return entry, int64(n), nil
+}
+
+// GetOffset returns the end offset of the last flush entry replayed, or 0 if
+// there was none.
+func (rp *simpleReplayer) GetOffset() int64 {
+	return rp.truncOffset
+}
+
+// Truncate cuts s at GetOffset, dropping whatever follows the last flush
+// entry.
+func (rp *simpleReplayer) Truncate(s Store) error {
+	return s.Truncate(rp.truncOffset)
+}
+
+// RegisterEntryHandler registers handler for eType. It fails if handler is
+// nil or if eType already has a handler.
+func (rp *simpleReplayer) RegisterEntryHandler(eType EntryType, handler EntryHandler) error {
+	if handler == nil {
+		return fmt.Errorf("nil handler for %d", eType)
+	}
+	if _, ok := rp.handlers[eType]; ok {
+		return fmt.Errorf("duplicate handler found for %d", eType)
+	}
+	rp.handlers[eType] = handler
+	return nil
+}
+
+// doReplay reads one entry from r and files it as committed or uncommitted.
+// Inconsistent input is reported as an error.
+func (rp *simpleReplayer) doReplay(r io.Reader) error {
+	meta := NewEntryMeta()
+	n, err := meta.ReadFrom(r)
+	if err != nil {
+		return err
+	}
+	rp.offset += int64(n)
+	eType := meta.GetType()
+	handler := rp.handlers[eType]
+	if handler == nil {
+		logutil.Infof("Replaying (%d, %d, %d) - %d", eType, meta.PayloadSize(), rp.offset, rp.count)
+		return fmt.Errorf("no handler for type: %d", eType)
+	}
+	entry, read, err := handler(r, meta)
+	if err != nil {
+		return err
+	}
+	if entry == nil {
+		return fmt.Errorf("no entry returned for type: %d", eType)
+	}
+	if read != int64(meta.PayloadSize()) {
+		return fmt.Errorf("bad %d, %d for type %d", read, meta.PayloadSize(), eType)
+	}
+	rp.offset += read
+	if entry.GetMeta().IsFlush() {
+		// A flush commits everything read since the previous one.
+		rp.committed = append(rp.committed, rp.uncommitted...)
+		rp.uncommitted = rp.uncommitted[:0]
+		rp.truncOffset = rp.offset
+	} else {
+		rp.uncommitted = append(rp.uncommitted, entry)
+	}
+	rp.count++
+	return nil
+}
+
+// Replay reads s entry by entry until its end. A log that ends in the middle
+// of an entry, as left behind by an interrupted write, is not an error: the
+// partial entry lies after GetOffset and is dropped by Truncate.
+func (rp *simpleReplayer) Replay(s Store) error {
+	err := s.ForLoopEntries(rp.doReplay)
+	logutil.Infof("replay count: %d", rp.count)
+	if errors.Is(err, io.ErrUnexpectedEOF) {
+		return nil
+	}
+	return err
+}
diff --git a/pkg/vm/engine/aoe/storage/logstore/store_test.go b/pkg/vm/engine/aoe/storage/logstore/store_test.go
new file mode 100644
index 000000000..bb74c5ddd
--- /dev/null
+++ b/pkg/vm/engine/aoe/storage/logstore/store_test.go
@@ -0,0 +1,211 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logstore
+
+import (
+	"encoding/binary"
+	"io"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+var (
+	mockETDDL = ETCustomizeStart + 1
+)
+
+type mockDDLOp uint8
+
+const (
+	mockCreateOp mockDDLOp = iota
+	mockDropOp
+)
+
+type mockDDLEntry struct {
+	BaseEntry
+}
+
+// newMockDDLEntry returns a mock entry whose payload is the op byte followed
+// by data.
+func newMockDDLEntry(op mockDDLOp, data []byte) *mockDDLEntry {
+	payload := make([]byte, len(data)+1)
+	payload[0] = byte(op)
+	copy(payload[1:], data)
+	entry := newEmptyDDLEntry(nil)
+	// BaseEntry.Unmarshal never fails; it also records the payload size.
+	_ = entry.Unmarshal(payload)
+	return entry
+}
+
+// newEmptyDDLEntry returns a mock entry without payload. A nil meta gets
+// replaced by a fresh one of type mockETDDL.
+func newEmptyDDLEntry(meta *EntryMeta) *mockDDLEntry {
+	entry := new(mockDDLEntry)
+	if meta == nil {
+		entry.BaseEntry = *NewBaseEntry()
+		entry.Meta.SetType(mockETDDL)
+	} else {
+		entry.BaseEntry = *NewBaseEntryWithMeta(meta)
+	}
+	return entry
+}
+
+// mockETDDLHandler is the replay handler of mock entries.
+func mockETDDLHandler(r io.Reader, meta *EntryMeta) (Entry, int64, error) {
+	entry := newEmptyDDLEntry(meta)
+	n, err := entry.ReadFrom(r)
+	if err != nil {
+		return nil, int64(n), err
+	}
+	return entry, int64(n), nil
+}
+
+// openTestStore opens the store or fails the test.
+func openTestStore(t *testing.T, dir, name string) *store {
+	t.Helper()
+	s, err := New(dir, name)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return s
+}
+
+// appendMockEntries appends count mock entries to s.
+func appendMockEntries(t *testing.T, s Store, count int) {
+	t.Helper()
+	buf := make([]byte, 8)
+	for i := 0; i < count; i++ {
+		binary.BigEndian.PutUint64(buf, uint64(i))
+		if err := s.AppendEntry(newMockDDLEntry(mockCreateOp, buf)); err != nil {
+			t.Fatal(err)
+		}
+	}
+}
+
+// replayTestStore replays s with the mock handler registered.
+func replayTestStore(t *testing.T, s Store) *simpleReplayer {
+	t.Helper()
+	replayer := NewSimpleReplayer()
+	if err := replayer.RegisterEntryHandler(mockETDDL, mockETDDLHandler); err != nil {
+		t.Fatal(err)
+	}
+	if err := replayer.Replay(s); err != nil {
+		t.Fatal(err)
+	}
+	return replayer
+}
+
+func TestStore(t *testing.T) {
+	// A per-test directory instead of a fixed path under /tmp; the extra
+	// level makes New create the directory itself.
+	dir := filepath.Join(t.TempDir(), "teststore")
+	name := "sstore"
+	store := openTestStore(t, dir, name)
+
+	buf := make([]byte, 8)
+	step := 5
+	uncommitted := make([]Entry, 0)
+	committed := make([]Entry, 0)
+	for i := 0; i < 13; i++ {
+		binary.BigEndian.PutUint64(buf, uint64(i))
+		e := newMockDDLEntry(mockCreateOp, buf)
+		err := store.AppendEntry(e)
+		assert.Nil(t, err)
+		uncommitted = append(uncommitted, e)
+		if i%step == step-1 {
+			err = store.Sync()
+			assert.Nil(t, err)
+			committed = append(committed, uncommitted...)
+			uncommitted = uncommitted[:0]
+		}
+	}
+	assert.Nil(t, store.Close())
+
+	store = openTestStore(t, dir, name)
+	defer store.Close()
+
+	replayer := replayTestStore(t, store)
+	t.Log(replayer.GetOffset())
+	assert.Equal(t, len(committed), len(replayer.committed))
+	assert.Equal(t, len(uncommitted), len(replayer.uncommitted))
+
+	err := replayer.Truncate(store)
+	assert.Nil(t, err)
+}
+
+// TestAppendAfterReplay checks that entries appended after a replay and a
+// truncation directly follow the last committed entry.
+func TestAppendAfterReplay(t *testing.T) {
+	dir := filepath.Join(t.TempDir(), "teststore")
+	name := "sstore"
+
+	// 5 committed entries followed by 3 uncommitted ones.
+	s := openTestStore(t, dir, name)
+	appendMockEntries(t, s, 5)
+	assert.Nil(t, s.Sync())
+	appendMockEntries(t, s, 3)
+	assert.Nil(t, s.Close())
+
+	// Drop the uncommitted tail, then commit 2 more entries.
+	s = openTestStore(t, dir, name)
+	replayer := replayTestStore(t, s)
+	assert.Equal(t, 5, len(replayer.committed))
+	assert.Nil(t, replayer.Truncate(s))
+	appendMockEntries(t, s, 2)
+	assert.Nil(t, s.Sync())
+	assert.Nil(t, s.Close())
+
+	s = openTestStore(t, dir, name)
+	defer s.Close()
+	replayer = replayTestStore(t, s)
+	assert.Equal(t, 7, len(replayer.committed))
+	assert.Equal(t, 0, len(replayer.uncommitted))
+}
+
+// TestReplayTornTail checks that a log ending in the middle of an entry
+// replays up to the last flush and can be truncated there.
+func TestReplayTornTail(t *testing.T) {
+	dir := filepath.Join(t.TempDir(), "teststore")
+	name := "sstore"
+
+	s := openTestStore(t, dir, name)
+	appendMockEntries(t, s, 5)
+	assert.Nil(t, s.Sync())
+	assert.Nil(t, s.Close())
+
+	// Leave half an entry header behind, as an interrupted write would.
+	path := filepath.Join(dir, name)
+	f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = f.Write(make([]byte, EntryMetaSize/2))
+	assert.Nil(t, err)
+	assert.Nil(t, f.Close())
+
+	s = openTestStore(t, dir, name)
+	defer s.Close()
+	replayer := replayTestStore(t, s)
+	assert.Equal(t, 5, len(replayer.committed))
+	assert.Nil(t, replayer.Truncate(s))
+
+	stats, err := os.Stat(path)
+	if err != nil {
+		t.Fatal(err)
+	}
+	assert.Equal(t, replayer.GetOffset(), stats.Size())
+}
diff --git a/pkg/vm/engine/aoe/storage/logstore/types.go b/pkg/vm/engine/aoe/storage/logstore/types.go
new file mode 100644
index 000000000..83f05c28f
--- /dev/null
+++ b/pkg/vm/engine/aoe/storage/logstore/types.go
@@ -0,0 +1,155 @@
+// Copyright 2021 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logstore
+
+import (
+	"errors"
+	"io"
+	"os"
+	"path/filepath"
+	"sync"
+)
+
+// StoreEntryHandler consumes one entry from the reader. Returning io.EOF
+// ends ForLoopEntries without an error.
+type StoreEntryHandler = func(io.Reader) error
+
+// Store is an append-only log of entries.
+type Store interface {
+	io.Closer
+	// AppendEntry appends the entry to the log.
+	AppendEntry(Entry) error
+	// Sync makes the entries appended so far durable.
+	Sync() error
+	// ForLoopEntries calls the handler repeatedly until it returns an
+	// error; io.EOF is reported as success.
+	ForLoopEntries(StoreEntryHandler) error
+	// Truncate cuts the log at the given size, in bytes.
+	Truncate(int64) error
+}
+
+var _ Store = (*store)(nil)
+
+// store is a Store backed by a single file.
+//
+// Reads go through reader and follow the file offset, which starts at the
+// beginning of the file. Writes go through writer and land at pos, the end
+// of the log, so that appending neither depends on nor disturbs the read
+// offset. mu guards pos and serializes the writes.
+type store struct {
+	mu     sync.RWMutex
+	dir    string
+	name   string
+	file   *os.File
+	pos    int64
+	writer io.Writer
+	reader io.Reader
+}
+
+// tailWriter is the io.Writer handed to the entries: it writes at the pos of
+// its store and moves pos forward by the number of bytes written.
+type tailWriter struct {
+	s *store
+}
+
+// Write implements io.Writer. The caller must hold the store mutex, which
+// BaseEntry.WriteTo does through the locker it is given.
+func (w *tailWriter) Write(p []byte) (int, error) {
+	n, err := w.s.file.WriteAt(p, w.s.pos)
+	w.s.pos += int64(n)
+	return n, err
+}
+
+// New opens the log file name under dir, creating the directory and the file
+// as needed. Appends continue at the end of an existing file.
+func New(dir, name string) (*store, error) {
+	// MkdirAll does nothing when the directory already exists.
+	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
+		return nil, err
+	}
+	f, err := os.OpenFile(filepath.Join(dir, name), os.O_RDWR|os.O_CREATE, os.ModePerm)
+	if err != nil {
+		return nil, err
+	}
+	stats, err := f.Stat()
+	if err != nil {
+		f.Close()
+		return nil, err
+	}
+	s := &store{
+		dir:    dir,
+		name:   name,
+		file:   f,
+		pos:    stats.Size(),
+		reader: f,
+	}
+	s.writer = &tailWriter{s: s}
+	return s, nil
+}
+
+// Close syncs and closes the file. The file is closed even when the sync
+// fails, in which case the sync error is the one returned.
+func (s *store) Close() error {
+	syncErr := s.file.Sync()
+	closeErr := s.file.Close()
+	if syncErr != nil {
+		return syncErr
+	}
+	return closeErr
+}
+
+// Truncate cuts the file at size bytes and continues appending from there.
+func (s *store) Truncate(size int64) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if err := s.file.Truncate(size); err != nil {
+		return err
+	}
+	s.pos = size
+	return nil
+}
+
+// AppendEntry writes entry at the end of the log. The entry is given the
+// store mutex to hold while it writes.
+func (s *store) AppendEntry(entry Entry) error {
+	_, err := entry.WriteTo(s.writer, &s.mu)
+	return err
+}
+
+// Sync appends a flush entry, which on replay commits the entries before
+// it, and fsyncs the file.
+func (s *store) Sync() error {
+	if err := s.AppendEntry(FlushEntry); err != nil {
+		return err
+	}
+	return s.file.Sync()
+}
+
+// ForLoopEntries calls handler with the file reader until handler returns an
+// error. io.EOF ends the loop successfully; any other error is returned.
+func (s *store) ForLoopEntries(handler StoreEntryHandler) error {
+	if handler == nil {
+		return errors.New("nil entry handler")
+	}
+	for {
+		err := handler(s.reader)
+		if errors.Is(err, io.EOF) {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+	}
+}
-- 
GitLab