Skip to content
Snippets Groups Projects
Commit ef563fda authored by yangxuan's avatar yangxuan Committed by zhenshan.cao
Browse files

Updata flushe procedure


Signed-off-by: default avataryangxuan <xuan.yang@zilliz.com>
parent c53afee6
No related branches found
No related tags found
No related merge requests found
......@@ -284,8 +284,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
return status, errors.New(status.GetReason())
}
ddlFlushedCh := make(chan bool)
dmlFlushedCh := make(chan bool)
ddlFlushedCh := make(chan []*datapb.DDLBinlogMeta)
dmlFlushedCh := make(chan []*datapb.ID2PathList)
flushmsg := &flushMsg{
msgID: req.Base.MsgID,
......@@ -299,15 +299,58 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
flushCh <- flushmsg
// GOOSE TODO get binlog paths.
waitReceive := func(wg *sync.WaitGroup, flushedCh <-chan bool, req *datapb.SaveBinlogPathsRequest) {
// waitReceive := func(wg *sync.WaitGroup, flushedCh <-chan bool, req *datapb.SaveBinlogPathsRequest) {
// defer wg.Done()
// select {
// case <-time.After(300 * time.Second):
// return
// case isFlushed := <-flushedCh:
// if isFlushed {
// log.Debug("Yeah! It's safe to notify dataservice")
// }
// }
// }
waitReceive := func(wg *sync.WaitGroup, flushedCh interface{}, req *datapb.SaveBinlogPathsRequest) {
defer wg.Done()
select {
case <-time.After(300 * time.Second):
return
case isFlushed := <-flushedCh:
if isFlushed {
log.Debug("Yeah! It's safe to notify dataservice")
switch flushedCh.(type) {
case chan []*datapb.ID2PathList:
select {
case <-time.After(300 * time.Second):
return
case meta := <-flushedCh.(chan []*datapb.ID2PathList):
if meta == nil {
log.Info("Dml messages flush failed!")
// Modify req to confirm failure
return
}
// Modify req with valid dml binlog paths
log.Info("Insert messeges flush done!", zap.Any("Binlog paths", meta))
}
case chan []*datapb.DDLBinlogMeta:
select {
case <-time.After(300 * time.Second):
return
case meta := <-flushedCh.(chan []*datapb.DDLBinlogMeta):
if meta == nil {
log.Info("Ddl messages flush failed!")
// Modify req to confirm failure
return
}
if len(meta) == 0 {
log.Info("Ddl messages flush Done")
// Modify req with empty ddl binlog paths
return
}
// Modify req with valid ddl binlog paths
log.Info("Ddl messages flush done!", zap.Any("Binlog paths", meta))
}
default:
log.Error("Not supported type")
}
}
......
......@@ -159,7 +159,7 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
go ddNode.flushComplete(binlogMetaCh, collID, fmsg.ddlFlushedCh)
} else {
// GOOSE TODO newest position
fmsg.ddlFlushedCh <- true
fmsg.ddlFlushedCh <- make([]*datapb.DDLBinlogMeta, 0)
}
log.Debug(".. notifying insertbuffer ...")
......@@ -176,9 +176,10 @@ func (ddNode *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
return []Msg{res}
}
func (ddNode *ddNode) flushComplete(binlogMetaCh <-chan *datapb.DDLBinlogMeta, collID UniqueID, ddlFlushedCh chan<- bool) {
func (ddNode *ddNode) flushComplete(binlogMetaCh <-chan *datapb.DDLBinlogMeta, collID UniqueID, ddlFlushedCh chan<- []*datapb.DDLBinlogMeta) {
binlogMeta := <-binlogMetaCh
if binlogMeta == nil {
ddlFlushedCh <- nil
return
}
......@@ -188,7 +189,7 @@ func (ddNode *ddNode) flushComplete(binlogMetaCh <-chan *datapb.DDLBinlogMeta, c
log.Error("Save binlog meta to etcd Wrong", zap.Error(err))
}
ddlFlushedCh <- true
ddlFlushedCh <- []*datapb.DDLBinlogMeta{binlogMeta}
// TODO remove above
// ddlFlushCh <- binlogMetaCh
}
......
......@@ -33,6 +33,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
......@@ -688,14 +689,15 @@ func (ibNode *insertBufferNode) bufferAutoFlushPaths(wait <-chan map[UniqueID]st
return ibNode.replica.bufferAutoFlushBinlogPaths(segID, field2Path)
}
func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[UniqueID]string, dmlFlushedCh chan<- bool) {
func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[UniqueID]string, dmlFlushedCh chan<- []*datapb.ID2PathList) {
field2Path := <-wait
if field2Path == nil {
dmlFlushedCh <- nil
return
}
dmlFlushedCh <- true
// dmlFlushedCh <- true
// TODO Call DataService RPC SaveBinlogPaths
// TODO GetBufferedAutoFlushBinlogPaths
......@@ -703,6 +705,7 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[Un
bufferField2Paths, err := ibNode.replica.getBufferPaths(segID)
if err != nil {
log.Error("Flush failed ... cannot get buffered paths", zap.Error(err))
dmlFlushedCh <- nil
}
// GOOSE TODO remove the below
......@@ -710,9 +713,21 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[Un
err = ibNode.flushMeta.SaveSegmentBinlogMetaTxn(segID, bufferField2Paths)
if err != nil {
log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err))
dmlFlushedCh <- nil
return
}
binlogPaths := make([]*datapb.ID2PathList, 0, len(bufferField2Paths))
for k, paths := range bufferField2Paths {
binlogPaths = append(binlogPaths, &datapb.ID2PathList{
ID: k,
Paths: paths,
})
}
dmlFlushedCh <- binlogPaths
log.Debug(".. Segment flush completed ..")
ibNode.replica.setIsFlushed(segID)
ibNode.updateSegStatistics([]UniqueID{segID})
......
......@@ -22,10 +22,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
......@@ -71,18 +74,19 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
iBNode := newInsertBufferNode(ctx, newBinlogMeta(), replica, msFactory, NewAllocatorFactory())
ddlFlushedCh := make(chan bool)
dmlFlushedCh := make(chan bool)
ddlFlushedCh := make(chan []*datapb.DDLBinlogMeta)
dmlFlushedCh := make(chan []*datapb.ID2PathList)
inMsg := genInsertMsg(ddlFlushedCh, dmlFlushedCh)
var iMsg flowgraph.Msg = &inMsg
iBNode.Operate([]flowgraph.Msg{iMsg})
isflushed := <-dmlFlushedCh
assert.True(t, isflushed)
assert.NotNil(t, isflushed)
log.Debug("DML binlog paths", zap.Any("paths", isflushed))
}
func genInsertMsg(ddlFlushedCh, dmlFlushedCh chan<- bool) insertMsg {
func genInsertMsg(ddlFlushedCh chan<- []*datapb.DDLBinlogMeta, dmlFlushedCh chan<- []*datapb.ID2PathList) insertMsg {
timeRange := TimeRange{
timestampMin: 0,
......
......@@ -13,6 +13,7 @@ package datanode
import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
......@@ -68,8 +69,10 @@ type flushMsg struct {
timestamp Timestamp
segmentID UniqueID
collectionID UniqueID
ddlFlushedCh chan<- bool
dmlFlushedCh chan<- bool
// ddlFlushedCh chan<- bool
// dmlFlushedCh chan<- bool
ddlFlushedCh chan<- []*datapb.DDLBinlogMeta
dmlFlushedCh chan<- []*datapb.ID2PathList
}
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
......
......@@ -254,7 +254,7 @@ message SaveBinlogPathsRequest {
common.MsgBase base = 1;
int64 segmentID = 2;
int64 collectionID = 3;
ID2PathList field2BinlogPaths = 4;
repeated ID2PathList field2BinlogPaths = 4;
repeated DDLBinlogMeta ddlBinlogPaths = 5;
PositionPair dml_position = 6;
PositionPair ddl_position =7;
......
This diff is collapsed.
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment