From 65605ac66aa455bebb65354c2bcb752b7afcee8c Mon Sep 17 00:00:00 2001 From: XuPeng-SH <xupeng3112@163.com> Date: Wed, 8 Dec 2021 11:21:47 +0800 Subject: [PATCH] Update event listener and related dependency (#1304) --- pkg/vm/engine/aoe/storage/aoedb/v1/shard_test.go | 2 +- pkg/vm/engine/aoe/storage/db/split.go | 8 +++++--- pkg/vm/engine/aoe/storage/event/listeners.go | 14 ++++++++++++-- pkg/vm/engine/aoe/storage/event/logging.go | 14 ++++++++++++-- pkg/vm/engine/aoe/storage/event/types.go | 9 ++++++--- pkg/vm/engine/aoe/storage/metadata/v1/database.go | 2 +- 6 files changed, 37 insertions(+), 12 deletions(-) diff --git a/pkg/vm/engine/aoe/storage/aoedb/v1/shard_test.go b/pkg/vm/engine/aoe/storage/aoedb/v1/shard_test.go index b7311d1c6..548e2abe3 100644 --- a/pkg/vm/engine/aoe/storage/aoedb/v1/shard_test.go +++ b/pkg/vm/engine/aoe/storage/aoedb/v1/shard_test.go @@ -325,7 +325,7 @@ func TestShard2(t *testing.T) { } assert.Equal(t, 0, total) for _, shard := range shards { - testutils.WaitExpect(400, func() bool { + testutils.WaitExpect(800, func() bool { return shard.gen.Get() == shard.getSafeId() }) } diff --git a/pkg/vm/engine/aoe/storage/db/split.go b/pkg/vm/engine/aoe/storage/db/split.go index e753a7605..173fd4531 100644 --- a/pkg/vm/engine/aoe/storage/db/split.go +++ b/pkg/vm/engine/aoe/storage/db/split.go @@ -177,10 +177,12 @@ func (splitter *Splitter) Prepare() error { } func (splitter *Splitter) Commit() error { - err := splitter.msplitter.Commit() - if err == nil { - splitter.dbImpl.Opts.EventListener.OnDatabaseSplitted(splitter.event) + err := splitter.dbImpl.Opts.EventListener.OnPreSplit(splitter.event) + if err != nil { + panic(err) } + err = splitter.msplitter.Commit() + splitter.dbImpl.Opts.EventListener.OnPostSplit(err, splitter.event) return err } diff --git a/pkg/vm/engine/aoe/storage/event/listeners.go b/pkg/vm/engine/aoe/storage/event/listeners.go index bee6e0912..9b4350a16 100644 --- a/pkg/vm/engine/aoe/storage/event/listeners.go +++ b/pkg/vm/engine/aoe/storage/event/listeners.go @@ -12,10 +12,20 @@ func NewListensers(l1, l2 Listener) *listensers { return s } -func (s *listensers) OnDatabaseSplitted(event *SplitEvent) error { +func (s *listensers) OnPreSplit(event *SplitEvent) error { var err error for _, l := range s.ls { - if err = l.OnDatabaseSplitted(event); err != nil { + if err = l.OnPreSplit(event); err != nil { + break + } + } + return err +} + +func (s *listensers) OnPostSplit(res error, event *SplitEvent) error { + var err error + for _, l := range s.ls { + if err = l.OnPostSplit(res, event); err != nil { break } } diff --git a/pkg/vm/engine/aoe/storage/event/logging.go b/pkg/vm/engine/aoe/storage/event/logging.go index 518a296b9..7803343a3 100644 --- a/pkg/vm/engine/aoe/storage/event/logging.go +++ b/pkg/vm/engine/aoe/storage/event/logging.go @@ -24,8 +24,18 @@ func NewLoggingListener() *loggingListener { return new(loggingListener) } -func (l *loggingListener) OnDatabaseSplitted(event *SplitEvent) error { - logutil.Info(event.String()) +func (l *loggingListener) OnPreSplit(event *SplitEvent) error { + return nil +} + +func (l *loggingListener) OnPostSplit(res error, event *SplitEvent) error { + var state string + if res != nil { + state = res.Error() + } else { + state = "Done" + } + logutil.Infof("%s | %s", event.String(), state) return nil } diff --git a/pkg/vm/engine/aoe/storage/event/types.go b/pkg/vm/engine/aoe/storage/event/types.go index 51ed50bf1..315d37b3b 100644 --- a/pkg/vm/engine/aoe/storage/event/types.go +++ b/pkg/vm/engine/aoe/storage/event/types.go @@ -14,7 +14,9 @@ package event -import "fmt" +import ( + "fmt" +) type SplitEvent struct { DB string @@ -22,7 +24,7 @@ type SplitEvent struct { } func (e *SplitEvent) String() string { - info := fmt.Sprintf("DB<\"%s\"> Splitted | {", e.DB) + info := fmt.Sprintf("DB<\"%s\"> Splitting | {", e.DB) for db, tables := range e.Names { dbInfo := fmt.Sprintf("[\"%s\"]: ", db) for _, table := range tables { @@ -36,7 +38,8 @@ func (e *SplitEvent) String() string { } type Listener interface { - OnDatabaseSplitted(*SplitEvent) error + OnPreSplit(*SplitEvent) error + OnPostSplit(error, *SplitEvent) error OnBackgroundError(error) error } diff --git a/pkg/vm/engine/aoe/storage/metadata/v1/database.go b/pkg/vm/engine/aoe/storage/metadata/v1/database.go index 83d670ee5..fb9abffbd 100644 --- a/pkg/vm/engine/aoe/storage/metadata/v1/database.go +++ b/pkg/vm/engine/aoe/storage/metadata/v1/database.go @@ -870,7 +870,7 @@ func (db *Database) onNewTable(entry *Table) error { if nn != nil { e := nn.GetTable() // Conflict checks all committed and uncommitted entries. - if !e.IsDeletedLocked() { + if !e.IsDeleted() { return DuplicateErr } db.TableSet[entry.Id] = entry -- GitLab