Skip to content
Snippets Groups Projects
Unverified Commit 5e050c56 authored by XuPeng-SH's avatar XuPeng-SH Committed by GitHub
Browse files

(tae): update Rows for system virtual table (#2885)


Co-authored-by: default avatarJin Hai <haijin.chn@gmail.com>
parent b2945213
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,7 @@ import (
"bytes"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/matrixorigin/matrixone/pkg/logutil"
......@@ -56,6 +57,9 @@ type Catalog struct {
link *common.Link
nodesMu sync.RWMutex
tableCnt int32
columnCnt int32
}
func MockCatalog(dir, name string, cfg *store.StoreCfg, scheduler tasks.TaskScheduler) *Catalog {
......@@ -500,6 +504,28 @@ func (catalog *Catalog) CoarseDBCnt() int {
return len(catalog.entries)
}
func (catalog *Catalog) CoarseTableCnt() int {
return int(atomic.LoadInt32(&catalog.tableCnt))
}
func (catalog *Catalog) CoarseColumnCnt() int {
return int(atomic.LoadInt32(&catalog.columnCnt))
}
func (catalog *Catalog) AddTableCnt(cnt int) {
n := atomic.AddInt32(&catalog.tableCnt, int32(cnt))
if n < 0 {
panic("logic error")
}
}
func (catalog *Catalog) AddColumnCnt(cnt int) {
n := atomic.AddInt32(&catalog.columnCnt, int32(cnt))
if n < 0 {
panic("logic error")
}
}
func (catalog *Catalog) GetScheduler() tasks.TaskScheduler { return catalog.scheduler }
func (catalog *Catalog) GetDatabaseByID(id uint64) (db *DBEntry, err error) {
catalog.RLock()
......
......@@ -200,6 +200,12 @@ func (e *DBEntry) CreateTableEntry(schema *Schema, txnCtx txnif.AsyncTxn, dataFa
}
func (e *DBEntry) RemoveEntry(table *TableEntry) (err error) {
defer func() {
if err == nil {
e.catalog.AddTableCnt(-1)
e.catalog.AddColumnCnt(-1 * len(table.schema.ColDefs))
}
}()
logutil.Infof("Removing: %s", table.String())
e.Lock()
defer e.Unlock()
......@@ -217,7 +223,13 @@ func (e *DBEntry) RemoveEntry(table *TableEntry) (err error) {
return
}
func (e *DBEntry) AddEntryLocked(table *TableEntry) error {
func (e *DBEntry) AddEntryLocked(table *TableEntry) (err error) {
defer func() {
if err == nil {
e.catalog.AddTableCnt(1)
e.catalog.AddColumnCnt(len(table.schema.ColDefs))
}
}()
nn := e.nameNodes[table.schema.Name]
if nn == nil {
n := e.link.Insert(table)
......@@ -231,26 +243,28 @@ func (e *DBEntry) AddEntryLocked(table *TableEntry) error {
node := nn.GetTableNode()
record := node.GetPayload().(*TableEntry)
record.RLock()
err := record.PrepareWrite(table.GetTxn(), record.RWMutex)
err = record.PrepareWrite(table.GetTxn(), record.RWMutex)
if err != nil {
record.RUnlock()
return err
return
}
if record.HasActiveTxn() {
if !record.IsDroppedUncommitted() {
record.RUnlock()
return ErrDuplicate
err = ErrDuplicate
return
}
} else if !record.HasDropped() {
record.RUnlock()
return ErrDuplicate
err = ErrDuplicate
return
}
record.RUnlock()
n := e.link.Insert(table)
e.entries[table.GetID()] = n
nn.CreateNode(table.GetID())
}
return nil
return
}
func (e *DBEntry) MakeCommand(id uint32) (txnif.TxnCmd, error) {
......
......@@ -219,6 +219,9 @@ func TestGCDB(t *testing.T) {
opts := config.WithQuickScanAndCKPOpts(nil)
tae := initDB(t, opts)
defer tae.Close()
dbCnt := tae.Catalog.CoarseDBCnt()
tableCnt := tae.Catalog.CoarseTableCnt()
columnCnt := tae.Catalog.CoarseColumnCnt()
schema1 := catalog.MockSchema(13, 12)
schema1.BlockMaxRows = 10
......@@ -318,4 +321,7 @@ func TestGCDB(t *testing.T) {
assert.Equal(t, 0, len(names))
t.Log(tae.Catalog.SimplePPString(common.PPL1))
printCheckpointStats(t, tae)
assert.Equal(t, dbCnt, tae.Catalog.CoarseDBCnt())
assert.Equal(t, tableCnt, tae.Catalog.CoarseTableCnt())
assert.Equal(t, columnCnt, tae.Catalog.CoarseColumnCnt())
}
......@@ -136,11 +136,23 @@ func (h *txnRelation) SimplePPString(level common.PPLevel) string {
return s
}
func (h *txnRelation) Close() error { return nil }
func (h *txnRelation) GetMeta() any { return h.table.entry }
func (h *txnRelation) GetSchema() any { return h.table.entry.GetSchema() }
func (h *txnRelation) Close() error { return nil }
func (h *txnRelation) Rows() int64 { return int64(h.table.entry.GetRows()) }
func (h *txnRelation) Rows() int64 {
if h.table.entry.GetDB().IsSystemDB() && h.table.entry.IsVirtual() {
if h.table.entry.GetSchema().Name == catalog.SystemTable_DB_Name {
return int64(h.table.entry.GetCatalog().CoarseDBCnt())
} else if h.table.entry.GetSchema().Name == catalog.SystemTable_Table_Name {
return int64(h.table.entry.GetCatalog().CoarseTableCnt())
} else if h.table.entry.GetSchema().Name == catalog.SystemTable_Columns_Name {
return int64(h.table.entry.GetCatalog().CoarseColumnCnt())
}
panic("logic error")
}
return int64(h.table.entry.GetRows())
}
func (h *txnRelation) Size(attr string) int64 { return 0 }
func (h *txnRelation) GetCardinality(attr string) int64 { return 0 }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment