diff --git a/cmd/db-server/main.go b/cmd/db-server/main.go index 64ed04e5ad1ca23a7b5ddc3af70616d076436f0d..88a0b2e5dccb0033108b455307e225f28cfa9205 100644 --- a/cmd/db-server/main.go +++ b/cmd/db-server/main.go @@ -37,7 +37,7 @@ import ( ) var ( - c catalog.Catalog + c *catalog.Catalog mo *frontend.MOServer pci *frontend.PDCallbackImpl ) @@ -181,7 +181,7 @@ func main() { panic(err) } c = catalog.NewCatalog(a) - eng := aoeEngine.New(&c) + eng := aoeEngine.New(c) pci.SetRemoveEpoch(removeEpoch) hm := config.HostMmu diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index fbad54c1d619b4fc3e9b3cdc77b7db36cbe1dd6d..9f7ae8e58b1f106cd41cb4eaa9898fb3a161d400 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -24,7 +24,7 @@ const ( cTableIDPrefix = "TID" cRoutePrefix = "Route" cDeletedTablePrefix = "DeletedTableQueue" - timeout = 600 * time.Millisecond + timeout = 2000 * time.Millisecond idPoolSize = 20 ) @@ -35,11 +35,11 @@ type Catalog struct { dbIdEnd uint64 tidStart uint64 tidEnd uint64 - pLock int32 // + pLock int32 } // NewCatalog creates a Catalog. -func NewCatalog(store driver.CubeDriver) Catalog { +func NewCatalog(store driver.CubeDriver) *Catalog { catalog := Catalog{ Driver: store, } @@ -57,7 +57,7 @@ func NewCatalog(store driver.CubeDriver) Catalog { catalog.tidStart = tmpId - idPoolSize + 1 logutil.Debugf("Catalog initialize finished, db id range is [%d, %d), table id range is [%d, %d)", catalog.dbIdStart, catalog.dbIdEnd, catalog.tidStart, catalog.tidEnd) - return catalog + return &catalog } // CreateDatabase creates a database with db info. @@ -69,7 +69,6 @@ func (c *Catalog) CreateDatabase(epoch uint64, dbName string, typ int) (dbid uin if _, err := c.checkDBNotExists(dbName); err != nil { return 0, err } - //id, err := c.genGlobalUniqIDs([]byte(cDBIDPrefix)) dbid, err = c.allocId(cDBIDPrefix) if err != nil { return 0, err @@ -151,62 +150,11 @@ func (c *Catalog) GetDatabase(dbName string) (*aoe.SchemaInfo, error) { return db, nil } -// CreateTableBak creates a table with tableInfo in database. -func (c *Catalog) CreateTableBak(epoch, dbId uint64, tbl aoe.TableInfo) (uint64, error) { - t0 := time.Now() - defer func() { - logutil.Debugf("CreateTable cost %d ms", time.Since(t0).Milliseconds()) - }() - _, err := c.checkDBExists(dbId) - if err != nil { - return 0, err - } - _, err = c.checkTableNotExists(dbId, tbl.Name) - if err != nil { - return 0, err - } - tbl.Id, err = c.genGlobalUniqIDs([]byte(cTableIDPrefix)) - if err != nil { - return 0, err - } - err = c.Driver.Set(c.tableIDKey(dbId, tbl.Name), codec.Uint642Bytes(tbl.Id)) - if err != nil { - logutil.Errorf("ErrTableCreateFailed, %v", err) - return 0, err - } - tbl.Epoch = epoch - tbl.SchemaId = dbId - if shardId, err := c.getAvailableShard(tbl.Id); err == nil { - rkey := c.routeKey(dbId, tbl.Id, shardId) - if err := c.Driver.CreateTablet(c.encodeTabletName(shardId, tbl.Id), shardId, &tbl); err != nil { - logutil.Errorf("ErrTableCreateFailed, %v", err) - return 0, err - } - if c.Driver.Set(rkey, []byte(tbl.Name)) != nil { - logutil.Errorf("ErrTableCreateFailed, %v", err) - return 0, err - } - tbl.State = aoe.StatePublic - meta, err := helper.EncodeTable(tbl) - if err != nil { - logutil.Errorf("ErrTableCreateFailed, %v", err) - return 0, err - } - err = c.Driver.Set(c.tableKey(dbId, tbl.Id), meta) - if err != nil { - logutil.Errorf("ErrTableCreateFailed, %v", err) - return 0, err - } - return tbl.Id, nil - } - return 0, ErrNoAvailableShard -} - // CreateTable creates a table with tableInfo in database. func (c *Catalog) CreateTable(epoch, dbId uint64, tbl aoe.TableInfo) (tid uint64, err error) { t0 := time.Now() defer func() { - logutil.Debugf("CreateTable finished, table id is %d, cost %d ms", tid, time.Since(t0).Milliseconds()) + logutil.Debugf("CreateTable finished, table name is %v, table id is %d, sid is %d, cost %d ms", tbl.Name, tid, time.Since(t0).Milliseconds()) }() _, err = c.checkDBExists(dbId) if err != nil { @@ -592,13 +540,15 @@ func (c *Catalog) allocId(key string) (id uint64, err error) { err = ErrTableCreateTimeout return default: - if c.dbIdStart <= c.dbIdEnd { - id = c.dbIdStart - atomic.AddUint64(&c.dbIdStart, 1) - logutil.Debugf("alloc db id finished, id is %d, endId is %d", id, c.dbIdEnd) - return + if atomic.LoadInt32(&c.pLock) == 0 { + id = atomic.AddUint64(&c.dbIdStart, 1) - 1 + if id <= atomic.LoadUint64(&c.dbIdEnd) { + logutil.Debugf("alloc db id finished, id is %d, endId is %d", id, c.dbIdEnd) + return + } else { + c.refreshDBIDCache() + } } - c.refreshDBIDCache() time.Sleep(time.Millisecond * 10) } } @@ -612,12 +562,14 @@ func (c *Catalog) allocId(key string) (id uint64, err error) { err = ErrTableCreateTimeout return default: - if c.tidStart <= c.tidEnd { - id = c.tidStart - atomic.AddUint64(&c.tidStart, 1) - logutil.Infof("alloc table id finished, id is %d, endId is %d", id, c.tidEnd) - go c.refreshTableIDCache() - return + if atomic.LoadInt32(&c.pLock) == 0 { + id = atomic.AddUint64(&c.tidStart, 1) - 1 + if id <= atomic.LoadUint64(&c.tidEnd) { + logutil.Debugf("alloc table id finished, id is %d, endId is %d", id, c.tidEnd) + return + } else { + c.refreshTableIDCache() + } } time.Sleep(time.Millisecond * 10) } diff --git a/pkg/frontend/epochgc_test.go b/pkg/frontend/epochgc_test.go index 43b042935584326caee15cefa0b92cba9a96ab13..3d58bfe7b4645d86106f92f7883e99667d5a4543 100644 --- a/pkg/frontend/epochgc_test.go +++ b/pkg/frontend/epochgc_test.go @@ -164,7 +164,7 @@ func TestEpochGCWithMultiServer(t *testing.T) { }() catalog := catalog3.NewCatalog(c.CubeDrivers[0]) - eng := aoe_engine.New(&catalog) + eng := aoe_engine.New(catalog) for i := 0; i < nodeCnt; i++ { pcis[i].SetRemoveEpoch(func(epoch uint64) { diff --git a/pkg/logutil/log.go b/pkg/logutil/log.go index 902a610e646caefe1cfbc583773ac5eef29fb5db..0afaba938fe3f4468bb01c16a00a0ee9fa618399 100644 --- a/pkg/logutil/log.go +++ b/pkg/logutil/log.go @@ -29,7 +29,7 @@ import ( var _globalL, _globalP, _globalS atomic.Value func init() { - conf := &Config{Level: "debug", File: FileLogConfig{}} + conf := &Config{Level: "info", File: FileLogConfig{}} logger, props, _ := InitLogger(conf) ReplaceGlobals(logger, props) } diff --git a/pkg/vm/engine/aoe/engine/engine_test.go b/pkg/vm/engine/aoe/engine/engine_test.go index 8fbba80570e26b641848bba99f7b6f566f48d2ba..caa91dec371c43496ed1a78f7a63238779ec8a3d 100644 --- a/pkg/vm/engine/aoe/engine/engine_test.go +++ b/pkg/vm/engine/aoe/engine/engine_test.go @@ -106,14 +106,14 @@ func TestAOEEngine(t *testing.T) { stdLog.Printf("app started.") time.Sleep(3 * time.Second) - var catalogs []catalog2.Catalog + var catalogs []*catalog2.Catalog for i := 0; i < 3; i++ { catalogs = append(catalogs, catalog2.NewCatalog(c.CubeDrivers[i])) } - //testTableDDL(t, catalogs) + testTableDDL(t, catalogs) - aoeEngine := New(&catalogs[0]) + aoeEngine := New(catalogs[0]) t0 := time.Now() err := aoeEngine.Create(0, testDBName, 0) @@ -229,7 +229,7 @@ func doRestartEngine(t *testing.T) { c.RaftCluster.WaitLeadersByCount(21, time.Second*30) catalog := catalog2.NewCatalog(c.CubeDrivers[0]) - aoeEngine := New(&catalog) + aoeEngine := New(catalog) dbs := aoeEngine.Databases() require.Equal(t, 1, len(dbs)) @@ -248,7 +248,7 @@ func doRestartEngine(t *testing.T) { } } -func testTableDDL(t *testing.T, c []catalog2.Catalog) { +func testTableDDL(t *testing.T, c []*catalog2.Catalog) { //Wait shard state change logutil.Infof("ddl test begin") @@ -317,36 +317,33 @@ func testTableDDL(t *testing.T, c []catalog2.Catalog) { for index := range c { wg.Add(1) - cc := c[index] - i := index - //time.Sleep(100 * time.Millisecond) - go func() { + go func(idx int) { defer func() { wg.Done() }() - for j := 10; j < 50; j++ { - if j%3 != i { + for j := 10; j < 200; j++ { + if j%3 != idx { continue } t1.Name = fmt.Sprintf("t%d", j) - tid, err := cc.CreateTable(uint64(j), dbid, *t1) + tid, err := c[idx].CreateTable(uint64(j), dbid, *t1) if err != nil { - logutil.Infof("create table failed, catalog-%d, j is %d, tid is %d", i, j, tid) + logutil.Infof("create table failed, catalog-%d, j is %d, tid is %d", idx, j, tid) } else { - logutil.Infof("create table finished, catalog-%d, j is %d, tid is %d", i, j, tid) + logutil.Infof("create table finished, catalog-%d, j is %d, tid is %d", idx, j, tid) } require.NoError(t, err) require.Less(t, uint64(0), tid) time.Sleep(100 * time.Millisecond) } - }() + }(index) } wg.Wait() tbls, err = c[0].ListTables(dbid) require.NoError(t, err) - require.Equal(t, 40, len(tbls)) + require.Equal(t, 190, len(tbls)) for i := uint64(10); i < 15; i++ { _, err = c[0].DropTable(20+i, dbid, fmt.Sprintf("t%d", i)) @@ -355,7 +352,7 @@ func testTableDDL(t *testing.T, c []catalog2.Catalog) { tbls, err = c[0].ListTables(dbid) require.NoError(t, err) - require.Equal(t, 35, len(tbls)) + require.Equal(t, 185, len(tbls)) err = c[0].DropDatabase(3, dbName) require.NoError(t, err)