diff --git a/pkg/vm/engine/aoe/storage/aoedb/v1/shard_test.go b/pkg/vm/engine/aoe/storage/aoedb/v1/shard_test.go index b7311d1c611b283065cb175f1a50983d3eaf8ce1..548e2abe3156be31d30c9a7013705410df728888 100644 --- a/pkg/vm/engine/aoe/storage/aoedb/v1/shard_test.go +++ b/pkg/vm/engine/aoe/storage/aoedb/v1/shard_test.go @@ -325,7 +325,7 @@ func TestShard2(t *testing.T) { } assert.Equal(t, 0, total) for _, shard := range shards { - testutils.WaitExpect(400, func() bool { + testutils.WaitExpect(800, func() bool { return shard.gen.Get() == shard.getSafeId() }) } diff --git a/pkg/vm/engine/aoe/storage/db/split.go b/pkg/vm/engine/aoe/storage/db/split.go index e753a7605ddc5c2a6ec3c3b20c8575e4b8150c65..173fd45312adb14912c5e981615cc7d220432564 100644 --- a/pkg/vm/engine/aoe/storage/db/split.go +++ b/pkg/vm/engine/aoe/storage/db/split.go @@ -177,10 +177,12 @@ func (splitter *Splitter) Prepare() error { } func (splitter *Splitter) Commit() error { - err := splitter.msplitter.Commit() - if err == nil { - splitter.dbImpl.Opts.EventListener.OnDatabaseSplitted(splitter.event) + err := splitter.dbImpl.Opts.EventListener.OnPreSplit(splitter.event) + if err != nil { + panic(err) } + err = splitter.msplitter.Commit() + splitter.dbImpl.Opts.EventListener.OnPostSplit(err, splitter.event) return err } diff --git a/pkg/vm/engine/aoe/storage/event/listeners.go b/pkg/vm/engine/aoe/storage/event/listeners.go index bee6e09120bae5085a28a7597ec1b32caf176c27..9b4350a167e8b9765be193b12258bcdf3ac5a6f6 100644 --- a/pkg/vm/engine/aoe/storage/event/listeners.go +++ b/pkg/vm/engine/aoe/storage/event/listeners.go @@ -12,10 +12,20 @@ func NewListensers(l1, l2 Listener) *listensers { return s } -func (s *listensers) OnDatabaseSplitted(event *SplitEvent) error { +func (s *listensers) OnPreSplit(event *SplitEvent) error { var err error for _, l := range s.ls { - if err = l.OnDatabaseSplitted(event); err != nil { + if err = l.OnPreSplit(event); err != nil { + break + } + } + return err +} + +func (s *listensers) OnPostSplit(res error, event *SplitEvent) error { + var err error + for _, l := range s.ls { + if err = l.OnPostSplit(res, event); err != nil { break } } diff --git a/pkg/vm/engine/aoe/storage/event/logging.go b/pkg/vm/engine/aoe/storage/event/logging.go index 518a296b9d3bdad9af54597e6eff243c287781b4..7803343a37840ebfa52bdedc1e1aa9173e46d23b 100644 --- a/pkg/vm/engine/aoe/storage/event/logging.go +++ b/pkg/vm/engine/aoe/storage/event/logging.go @@ -24,8 +24,18 @@ func NewLoggingListener() *loggingListener { return new(loggingListener) } -func (l *loggingListener) OnDatabaseSplitted(event *SplitEvent) error { - logutil.Info(event.String()) +func (l *loggingListener) OnPreSplit(event *SplitEvent) error { + return nil +} + +func (l *loggingListener) OnPostSplit(res error, event *SplitEvent) error { + var state string + if res != nil { + state = res.Error() + } else { + state = "Done" + } + logutil.Infof("%s | %s", event.String(), state) return nil } diff --git a/pkg/vm/engine/aoe/storage/event/types.go b/pkg/vm/engine/aoe/storage/event/types.go index 51ed50bf130b544ee644d945e2f33961b0589570..315d37b3b7cbc3eb6ca2efedc38d21a1063a1b04 100644 --- a/pkg/vm/engine/aoe/storage/event/types.go +++ b/pkg/vm/engine/aoe/storage/event/types.go @@ -14,7 +14,9 @@ package event -import "fmt" +import ( + "fmt" +) type SplitEvent struct { DB string @@ -22,7 +24,7 @@ type SplitEvent struct { } func (e *SplitEvent) String() string { - info := fmt.Sprintf("DB<\"%s\"> Splitted | {", e.DB) + info := fmt.Sprintf("DB<\"%s\"> Splitting | {", e.DB) for db, tables := range e.Names { dbInfo := fmt.Sprintf("[\"%s\"]: ", db) for _, table := range tables { @@ -36,7 +38,8 @@ func (e *SplitEvent) String() string { } type Listener interface { - OnDatabaseSplitted(*SplitEvent) error + OnPreSplit(*SplitEvent) error + OnPostSplit(error, *SplitEvent) error OnBackgroundError(error) error } diff --git a/pkg/vm/engine/aoe/storage/metadata/v1/database.go b/pkg/vm/engine/aoe/storage/metadata/v1/database.go index 83d670ee580c5d70a54e9b9fb867abb247853788..fb9abffbdb7272e87825ce4a0fa01c6a4233db89 100644 --- a/pkg/vm/engine/aoe/storage/metadata/v1/database.go +++ b/pkg/vm/engine/aoe/storage/metadata/v1/database.go @@ -870,7 +870,7 @@ func (db *Database) onNewTable(entry *Table) error { if nn != nil { e := nn.GetTable() // Conflict checks all committed and uncommitted entries. - if !e.IsDeletedLocked() { + if !e.IsDeleted() { return DuplicateErr } db.TableSet[entry.Id] = entry