diff --git a/pkg/frontend/epochgc.go b/pkg/frontend/epochgc.go
index a4d629f3250df668951f1d940a3c12acedd20767..3074408bc7eb4ae954f1d5246fc5eecb280830b4 100644
--- a/pkg/frontend/epochgc.go
+++ b/pkg/frontend/epochgc.go
@@ -52,20 +52,11 @@ type PDCallbackImpl struct {
chan may be blocked.
temporal scheme
*/
- msgChan chan *ChanMessage
//ms
periodOfTimer int
timerClose CloseFlag
- /*
- 0 : for the persistence of the cluster epoch
- 1 : for the persistence of the server info
- 2 : for the persistence of the minimumRemovableEpoch
- */
- persistTimeout []*Timeout
- persistClose CloseFlag
-
//second
periodOfDDLDelete int
ddlDeleteClose CloseFlag
@@ -109,8 +100,6 @@ type PDCallbackImpl struct {
the limit of cube load
*/
limitOfCubeLoad int
-
- mutex sync.Mutex
}
/*
@@ -118,15 +107,9 @@ NewPDCallbackImpl
*/
func NewPDCallbackImpl(pu *PDCallbackParameterUnit) *PDCallbackImpl {
return &PDCallbackImpl{
- cluster_epoch: 1,
- serverInfo: make(map[uint64]uint64),
- msgChan: make(chan *ChanMessage),
- periodOfTimer: pu.timerPeriod,
- persistTimeout: []*Timeout{
- NewTimeout(time.Duration(pu.persistencePeriod)*time.Second, true),
- NewTimeout(time.Duration(pu.persistencePeriod)*time.Second, true),
- NewTimeout(time.Duration(pu.persistencePeriod)*time.Second, true),
- },
+ cluster_epoch: 1,
+ serverInfo: make(map[uint64]uint64),
+ periodOfTimer: pu.timerPeriod,
periodOfDDLDelete: pu.ddlDeletePeriod,
epoch_info: make(map[uint64]uint64),
ddl_info: make(map[uint64]uint64),
@@ -191,21 +174,6 @@ func NewPDCallbackParameterUnit(tp, pp, ddp, ht int, enableLog bool, lt int) *PD
}
}
-type MsgType int
-
-const (
- MSG_TYPE_SERVER_INFO MsgType = iota + 1
- MSG_TYPE_MINI_REM_EPOCH
- MSG_TYPE_CLUSTER_EPOCH
-)
-
-type ChanMessage struct {
- tp MsgType
- body []byte
- body2 [][]byte
- body3 [][]byte
-}
-
const (
META_TYPE_TABLE int = iota
META_TYPE_DATABASE
@@ -268,25 +236,10 @@ func (pci *PDCallbackImpl) Start(kv storage.Storage) error {
defer logutil.Infof("-------PDC Start exit\n")
}
//TODO:When the cluster runs initially, there is not keys any more.
- //load cluster_epoch
- //load minimumRemovableEpoch
- //load kv<server,maximumRemovableEpoch>
- err := kv.LoadCustomData(int64(pci.limitOfCubeLoad), pci.getCustomData)
- if err != nil {
- return err
- }
-
- //make new channel
- if pci.msgChan == nil {
- pci.msgChan = make(chan *ChanMessage)
- }
//start timer for epoch increment
go pci.IncrementEpochPeriodlyRoutine(pci.periodOfTimer)
- //start persistent worker
- go pci.PersistentWorkerRoutine(pci.msgChan, kv)
-
//start delete ddl worker
go pci.DeleteDDLPeriodicallyRoutine()
@@ -318,13 +271,7 @@ func (pci *PDCallbackImpl) Stop(kv storage.Storage) error {
if pci.enableLog {
logutil.Infof("-------PDC Stop close channel\n")
}
- closeChan := func() {
- pci.mutex.Lock()
- defer pci.mutex.Unlock()
- close(pci.msgChan)
- pci.msgChan = nil
- }
- pci.closeOnce.Do(closeChan)
+
if pci.enableLog {
logutil.Infof("-------PDC Stop close channel done\n")
}
@@ -371,63 +318,15 @@ func (pci *PDCallbackImpl) HandleHeartbeatReq(id uint64, data []byte, kv storage
When a server leaves, its info will exist in the server_info in next several epochs.
It has no side effects also.
*/
- if pci.persistTimeout[1].isTimeout() {
- var keys [][]byte = nil
- var b2 [][]byte = nil
- var kk [8]byte
- for k, v := range pci.serverInfo {
- minRE = MinUint64(minRE, v)
-
- var k_buf []byte = nil
- k_buf = append(k_buf, SERVER_PREFIX...)
-
- binary.BigEndian.PutUint64(kk[:], k)
- k_buf = append(k_buf, kk[:]...)
- v_buf := make([]byte, 8)
- binary.BigEndian.PutUint64(v_buf, v)
-
- keys = append(keys, k_buf)
- b2 = append(b2, v_buf)
- }
- if pci.enableLog {
- logutil.Infof("-------PDC HandleHeartbeatReq server_info to channel\n")
- }
- //step 3: put these values into the worker
- pci.msgChan <- &ChanMessage{
- tp: MSG_TYPE_SERVER_INFO,
- body: nil,
- body2: keys,
- body3: b2,
- }
- if pci.enableLog {
- logutil.Infof("-------PDC HandleHeartbeatReq enter server_info to channel done\n")
- }
- } else {
- for _, v := range pci.serverInfo {
- minRE = MinUint64(minRE, v)
- }
+ for _, v := range pci.serverInfo {
+ minRE = MinUint64(minRE, v)
}
pci.cluster_minimumRemovableEpoch = minRE
//logutil.Infof("node %d maxre %d minRe %d \n",id,maxre,minRE)
- if pci.persistTimeout[2].isTimeout() {
- buf := make([]byte, 8)
- binary.BigEndian.PutUint64(buf, pci.cluster_minimumRemovableEpoch)
- if pci.enableLog {
- logutil.Infof("-------PDC HandleHeartbeatReq epoch to channel\n")
- }
- pci.msgChan <- &ChanMessage{
- tp: MSG_TYPE_MINI_REM_EPOCH,
- body: buf,
- }
- if pci.enableLog {
- logutil.Infof("-------PDC HandleHeartbeatReq epoch to channel done\n")
- }
- }
-
//step 4: response to the server
var rsp []byte = make([]byte, 16)
ce := atomic.LoadUint64(&pci.cluster_epoch)
@@ -445,68 +344,12 @@ func (pci *PDCallbackImpl) IncrementEpochPeriodlyRoutine(period int) {
for pci.timerClose.IsOpened() {
//step 1: incr cluster_epoch
- ce := atomic.AddUint64(&pci.cluster_epoch, 1)
-
- if pci.persistTimeout[0].isTimeout() {
- buf := make([]byte, 8)
- binary.BigEndian.PutUint64(buf, ce)
- if pci.enableLog {
- logutil.Infof("------- gen epoch %d\n", ce)
- }
- //step 2: put these values into the worker
- pci.msgChan <- &ChanMessage{
- tp: MSG_TYPE_CLUSTER_EPOCH,
- body: buf,
- }
- }
+ atomic.AddUint64(&pci.cluster_epoch, 1)
time.Sleep(time.Duration(pci.periodOfTimer) * time.Second)
}
}
-/*
-store the message into the kv
-*/
-func (pci *PDCallbackImpl) PersistentWorkerRoutine(msgChan chan *ChanMessage, kv storage.Storage) {
- pci.persistClose.Open()
-
- //get the message
- //put the body into kv
- pci.mutex.Lock()
- defer pci.mutex.Unlock()
- for msg := range pci.msgChan {
- switch msg.tp {
- case MSG_TYPE_CLUSTER_EPOCH:
- if pci.enableLog {
- logutil.Infof("-------cluster epoch %v \n", msg.body)
- }
- err := kv.PutCustomData(CLUSTER_EPOCH_KEY, msg.body)
- if err != nil {
- logutil.Errorf(err.Error())
- }
-
- case MSG_TYPE_SERVER_INFO:
- if pci.enableLog {
- logutil.Infof("-------server info %v \n", msg.body2)
- }
- //save kv<server,maximumRemovableEpoch>
- err := kv.BatchPutCustomData(msg.body2, msg.body3)
- if err != nil {
- logutil.Errorf(err.Error())
- }
- case MSG_TYPE_MINI_REM_EPOCH:
- if pci.enableLog {
- logutil.Infof("-------minimum removable epoch %v \n", msg.body)
- }
- //save minimumRemovableEpoch
- err := kv.PutCustomData(MINI_REM_EPOCH_KEY, msg.body)
- if err != nil {
- logutil.Errorf(err.Error())
- }
- }
- }
-}
-
/**
pd leader start the routine.
*/