Skip to content
Snippets Groups Projects
Unverified Commit 6ffed88c authored by daviszhen's avatar daviszhen Committed by GitHub
Browse files

Remove epoch persistence (#2880)

parent 832401b1
No related branches found
No related tags found
No related merge requests found
......@@ -52,20 +52,11 @@ type PDCallbackImpl struct {
chan may be blocked.
temporal scheme
*/
msgChan chan *ChanMessage
//ms
periodOfTimer int
timerClose CloseFlag
/*
0 : for the persistence of the cluster epoch
1 : for the persistence of the server info
2 : for the persistence of the minimumRemovableEpoch
*/
persistTimeout []*Timeout
persistClose CloseFlag
//second
periodOfDDLDelete int
ddlDeleteClose CloseFlag
......@@ -109,8 +100,6 @@ type PDCallbackImpl struct {
the limit of cube load
*/
limitOfCubeLoad int
mutex sync.Mutex
}
/*
......@@ -118,15 +107,9 @@ NewPDCallbackImpl
*/
func NewPDCallbackImpl(pu *PDCallbackParameterUnit) *PDCallbackImpl {
return &PDCallbackImpl{
cluster_epoch: 1,
serverInfo: make(map[uint64]uint64),
msgChan: make(chan *ChanMessage),
periodOfTimer: pu.timerPeriod,
persistTimeout: []*Timeout{
NewTimeout(time.Duration(pu.persistencePeriod)*time.Second, true),
NewTimeout(time.Duration(pu.persistencePeriod)*time.Second, true),
NewTimeout(time.Duration(pu.persistencePeriod)*time.Second, true),
},
cluster_epoch: 1,
serverInfo: make(map[uint64]uint64),
periodOfTimer: pu.timerPeriod,
periodOfDDLDelete: pu.ddlDeletePeriod,
epoch_info: make(map[uint64]uint64),
ddl_info: make(map[uint64]uint64),
......@@ -191,21 +174,6 @@ func NewPDCallbackParameterUnit(tp, pp, ddp, ht int, enableLog bool, lt int) *PD
}
}
type MsgType int
const (
MSG_TYPE_SERVER_INFO MsgType = iota + 1
MSG_TYPE_MINI_REM_EPOCH
MSG_TYPE_CLUSTER_EPOCH
)
type ChanMessage struct {
tp MsgType
body []byte
body2 [][]byte
body3 [][]byte
}
const (
META_TYPE_TABLE int = iota
META_TYPE_DATABASE
......@@ -268,25 +236,10 @@ func (pci *PDCallbackImpl) Start(kv storage.Storage) error {
defer logutil.Infof("-------PDC Start exit\n")
}
//TODO:When the cluster runs initially, there is not keys any more.
//load cluster_epoch
//load minimumRemovableEpoch
//load kv<server,maximumRemovableEpoch>
err := kv.LoadCustomData(int64(pci.limitOfCubeLoad), pci.getCustomData)
if err != nil {
return err
}
//make new channel
if pci.msgChan == nil {
pci.msgChan = make(chan *ChanMessage)
}
//start timer for epoch increment
go pci.IncrementEpochPeriodlyRoutine(pci.periodOfTimer)
//start persistent worker
go pci.PersistentWorkerRoutine(pci.msgChan, kv)
//start delete ddl worker
go pci.DeleteDDLPeriodicallyRoutine()
......@@ -318,13 +271,7 @@ func (pci *PDCallbackImpl) Stop(kv storage.Storage) error {
if pci.enableLog {
logutil.Infof("-------PDC Stop close channel\n")
}
closeChan := func() {
pci.mutex.Lock()
defer pci.mutex.Unlock()
close(pci.msgChan)
pci.msgChan = nil
}
pci.closeOnce.Do(closeChan)
if pci.enableLog {
logutil.Infof("-------PDC Stop close channel done\n")
}
......@@ -371,63 +318,15 @@ func (pci *PDCallbackImpl) HandleHeartbeatReq(id uint64, data []byte, kv storage
When a server leaves, its info will exist in the server_info in next several epochs.
It has no side effects also.
*/
if pci.persistTimeout[1].isTimeout() {
var keys [][]byte = nil
var b2 [][]byte = nil
var kk [8]byte
for k, v := range pci.serverInfo {
minRE = MinUint64(minRE, v)
var k_buf []byte = nil
k_buf = append(k_buf, SERVER_PREFIX...)
binary.BigEndian.PutUint64(kk[:], k)
k_buf = append(k_buf, kk[:]...)
v_buf := make([]byte, 8)
binary.BigEndian.PutUint64(v_buf, v)
keys = append(keys, k_buf)
b2 = append(b2, v_buf)
}
if pci.enableLog {
logutil.Infof("-------PDC HandleHeartbeatReq server_info to channel\n")
}
//step 3: put these values into the worker
pci.msgChan <- &ChanMessage{
tp: MSG_TYPE_SERVER_INFO,
body: nil,
body2: keys,
body3: b2,
}
if pci.enableLog {
logutil.Infof("-------PDC HandleHeartbeatReq enter server_info to channel done\n")
}
} else {
for _, v := range pci.serverInfo {
minRE = MinUint64(minRE, v)
}
for _, v := range pci.serverInfo {
minRE = MinUint64(minRE, v)
}
pci.cluster_minimumRemovableEpoch = minRE
//logutil.Infof("node %d maxre %d minRe %d \n",id,maxre,minRE)
if pci.persistTimeout[2].isTimeout() {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, pci.cluster_minimumRemovableEpoch)
if pci.enableLog {
logutil.Infof("-------PDC HandleHeartbeatReq epoch to channel\n")
}
pci.msgChan <- &ChanMessage{
tp: MSG_TYPE_MINI_REM_EPOCH,
body: buf,
}
if pci.enableLog {
logutil.Infof("-------PDC HandleHeartbeatReq epoch to channel done\n")
}
}
//step 4: response to the server
var rsp []byte = make([]byte, 16)
ce := atomic.LoadUint64(&pci.cluster_epoch)
......@@ -445,68 +344,12 @@ func (pci *PDCallbackImpl) IncrementEpochPeriodlyRoutine(period int) {
for pci.timerClose.IsOpened() {
//step 1: incr cluster_epoch
ce := atomic.AddUint64(&pci.cluster_epoch, 1)
if pci.persistTimeout[0].isTimeout() {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, ce)
if pci.enableLog {
logutil.Infof("------- gen epoch %d\n", ce)
}
//step 2: put these values into the worker
pci.msgChan <- &ChanMessage{
tp: MSG_TYPE_CLUSTER_EPOCH,
body: buf,
}
}
atomic.AddUint64(&pci.cluster_epoch, 1)
time.Sleep(time.Duration(pci.periodOfTimer) * time.Second)
}
}
/*
store the message into the kv
*/
func (pci *PDCallbackImpl) PersistentWorkerRoutine(msgChan chan *ChanMessage, kv storage.Storage) {
pci.persistClose.Open()
//get the message
//put the body into kv
pci.mutex.Lock()
defer pci.mutex.Unlock()
for msg := range pci.msgChan {
switch msg.tp {
case MSG_TYPE_CLUSTER_EPOCH:
if pci.enableLog {
logutil.Infof("-------cluster epoch %v \n", msg.body)
}
err := kv.PutCustomData(CLUSTER_EPOCH_KEY, msg.body)
if err != nil {
logutil.Errorf(err.Error())
}
case MSG_TYPE_SERVER_INFO:
if pci.enableLog {
logutil.Infof("-------server info %v \n", msg.body2)
}
//save kv<server,maximumRemovableEpoch>
err := kv.BatchPutCustomData(msg.body2, msg.body3)
if err != nil {
logutil.Errorf(err.Error())
}
case MSG_TYPE_MINI_REM_EPOCH:
if pci.enableLog {
logutil.Infof("-------minimum removable epoch %v \n", msg.body)
}
//save minimumRemovableEpoch
err := kv.PutCustomData(MINI_REM_EPOCH_KEY, msg.body)
if err != nil {
logutil.Errorf(err.Error())
}
}
}
}
/**
pd leader start the routine.
*/
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment