diff --git a/internal/writenode/data_sync_service_test.go b/internal/writenode/data_sync_service_test.go
index 7a789df98258b7ff062d102c7dd68e3a46b71e57..4abf1aff91811bae0ca09e72821072f03f718f13 100644
--- a/internal/writenode/data_sync_service_test.go
+++ b/internal/writenode/data_sync_service_test.go
@@ -3,7 +3,6 @@ package writenode
import (
"context"
"encoding/binary"
- "fmt"
"math"
"strconv"
"testing"
@@ -343,8 +342,6 @@ func newMeta() {
collBytes := proto.MarshalTextString(&collection)
kvClient.Save("/collection/"+strconv.FormatInt(collection.ID, 10), collBytes)
- value, _ := kvClient.Load("/collection/1")
- fmt.Println("========value: ", value)
segSch := etcdpb.SegmentMeta{
SegmentID: UniqueID(1),
diff --git a/internal/writenode/flow_graph_dd_node.go b/internal/writenode/flow_graph_dd_node.go
index 89c0bd61a118366a656347eeebb09ccad13e8ac3..96bb0ebbba2bfca0239069c7e9250beff9274ce2 100644
--- a/internal/writenode/flow_graph_dd_node.go
+++ b/internal/writenode/flow_graph_dd_node.go
@@ -38,7 +38,7 @@ type ddData struct {
}
type ddBuffer struct {
- ddData map[UniqueID]*ddData
+ ddData map[UniqueID]*ddData // collection ID
maxSize int
}
@@ -88,6 +88,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
timestampMin: msMsg.TimestampMin(),
timestampMax: msMsg.TimestampMax(),
},
+ flushMessages: make([]*msgstream.FlushMsg, 0),
}
ddNode.ddMsg = &ddMsg
@@ -98,6 +99,8 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
return tsMessages[i].BeginTs() < tsMessages[j].BeginTs()
})
+ var flush bool = false
+ var flushSegID UniqueID
// do dd tasks
for _, msg := range tsMessages {
switch msg.Type() {
@@ -109,13 +112,19 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg))
case internalPb.MsgType_kDropPartition:
ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg))
+ case internalPb.MsgType_kFlush:
+ fMsg := msg.(*msgstream.FlushMsg)
+ flush = true
+ flushSegID = fMsg.SegmentID
+ ddMsg.flushMessages = append(ddMsg.flushMessages, fMsg)
default:
log.Println("Non supporting message type:", msg.Type())
}
}
// generate binlog
- if ddNode.ddBuffer.full() {
+ if ddNode.ddBuffer.full() || flush {
+ log.Println(". dd buffer full or receive Flush msg ...")
ddCodec := &storage.DataDefinitionCodec{}
for collectionID, data := range ddNode.ddBuffer.ddData {
// buffer data to binlog
@@ -135,6 +144,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
log.Println("illegal ddBuffer, failed to save binlog")
continue
} else {
+ log.Println(".. dd buffer flushing ...")
// Blob key example:
// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
@@ -163,11 +173,35 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
log.Println(err)
}
log.Println("save dd binlog, key = ", ddKey)
+
+ ddlFlushMsg := &ddlFlushSyncMsg{
+ flushCompleted: false,
+ ddlBinlogPathMsg: ddlBinlogPathMsg{
+ collID: collectionID,
+ paths: []string{timestampKey, ddKey},
+ },
+ }
+
+ ddNode.outCh <- ddlFlushMsg
}
+
}
// clear buffer
ddNode.ddBuffer.ddData = make(map[UniqueID]*ddData)
- log.Println("dd buffer flushed")
+ }
+
+ if flush {
+
+ log.Println(".. manual flush completed ...")
+ ddlFlushMsg := &ddlFlushSyncMsg{
+ flushCompleted: true,
+ ddlBinlogPathMsg: ddlBinlogPathMsg{
+ segID: flushSegID,
+ },
+ }
+
+ ddNode.outCh <- ddlFlushMsg
+
}
var res Msg = ddNode.ddMsg
diff --git a/internal/writenode/flow_graph_dd_node_test.go b/internal/writenode/flow_graph_dd_node_test.go
index 2e5fabeb597dae44790cae95c12d2ee486d1ce7c..25de3697eae6eea564feabea3a41469417029de7 100644
--- a/internal/writenode/flow_graph_dd_node_test.go
+++ b/internal/writenode/flow_graph_dd_node_test.go
@@ -5,6 +5,10 @@ import (
"testing"
"time"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
@@ -13,6 +17,7 @@ import (
)
func TestFlowGraphDDNode_Operate(t *testing.T) {
+ newMeta()
const ctxTimeInMillisecond = 2000
const closeWithDeadline = false
var ctx context.Context
@@ -26,9 +31,22 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
ctx = context.Background()
}
+ ddChan := make(chan *ddlFlushSyncMsg, 10)
+ defer close(ddChan)
+ insertChan := make(chan *insertFlushSyncMsg, 10)
+ defer close(insertChan)
+
+ testPath := "/test/writenode/root/meta"
+ err := clearEtcd(testPath)
+ require.NoError(t, err)
+ Params.MetaRootPath = testPath
+ fService := newFlushSyncService(ctx, ddChan, insertChan)
+ assert.Equal(t, testPath, fService.metaTable.client.(*etcdkv.EtcdKV).GetPath("."))
+ go fService.start()
+
Params.FlushDdBufSize = 4
- ddNode := newDDNode(ctx, nil)
+ ddNode := newDDNode(ctx, ddChan)
colID := UniqueID(0)
colName := "col-test-0"
@@ -114,11 +132,25 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
DropPartitionRequest: dropPartitionReq,
}
+ flushMsg := msgstream.FlushMsg{
+ BaseMsg: msgstream.BaseMsg{
+ BeginTimestamp: Timestamp(5),
+ EndTimestamp: Timestamp(5),
+ HashValues: []uint32{uint32(0)},
+ },
+ FlushMsg: internalpb.FlushMsg{
+ MsgType: internalpb.MsgType_kFlush,
+ SegmentID: 1,
+ Timestamp: Timestamp(6),
+ },
+ }
+
tsMessages := make([]msgstream.TsMsg, 0)
tsMessages = append(tsMessages, msgstream.TsMsg(&createColMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropColMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg))
+ tsMessages = append(tsMessages, msgstream.TsMsg(&flushMsg))
msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3))
var inMsg Msg = msgStream
ddNode.Operate([]*Msg{&inMsg})
diff --git a/internal/writenode/flow_graph_filter_dm_node.go b/internal/writenode/flow_graph_filter_dm_node.go
index 312c79725c0ef1db5c0abd8ee4e25fa869f48a4d..9bac1b46ecdf9146f5d0df5b2667f7c83873ad08 100644
--- a/internal/writenode/flow_graph_filter_dm_node.go
+++ b/internal/writenode/flow_graph_filter_dm_node.go
@@ -46,6 +46,16 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
timestampMax: msgStreamMsg.TimestampMax(),
},
}
+
+ for _, fmsg := range ddMsg.flushMessages {
+ switch fmsg.Type() {
+ case internalPb.MsgType_kFlush:
+ iMsg.flushMessages = append(iMsg.flushMessages, fmsg)
+ default:
+ log.Println("Non supporting message type:", fmsg.Type())
+ }
+ }
+
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case internalPb.MsgType_kInsert:
@@ -53,8 +63,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
if resMsg != nil {
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
}
- case internalPb.MsgType_kFlush:
- iMsg.flushMessages = append(iMsg.flushMessages, msg.(*msgstream.FlushMsg))
// case internalPb.MsgType_kDelete:
// dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask))
default:
diff --git a/internal/writenode/flow_graph_insert_buffer_node.go b/internal/writenode/flow_graph_insert_buffer_node.go
index e676f5a2e0afdee2118414ebdd0676e3b97fd96a..9c4fcf08e8e60bc022616d1220165a71140baf08 100644
--- a/internal/writenode/flow_graph_insert_buffer_node.go
+++ b/internal/writenode/flow_graph_insert_buffer_node.go
@@ -62,13 +62,13 @@ func (ib *insertBuffer) size(segmentID UniqueID) int {
maxSize := 0
for _, data := range idata.Data {
- fdata, ok := data.(storage.FloatVectorFieldData)
- if ok && len(fdata.Data) > maxSize {
+ fdata, ok := data.(*storage.FloatVectorFieldData)
+ if ok && fdata.NumRows > maxSize {
maxSize = len(fdata.Data)
}
- bdata, ok := data.(storage.BinaryVectorFieldData)
- if ok && len(bdata.Data) > maxSize {
+ bdata, ok := data.(*storage.BinaryVectorFieldData)
+ if ok && bdata.NumRows > maxSize {
maxSize = len(bdata.Data)
}
@@ -77,7 +77,6 @@ func (ib *insertBuffer) size(segmentID UniqueID) int {
}
func (ib *insertBuffer) full(segmentID UniqueID) bool {
- // GOOSE TODO
return ib.size(segmentID) >= ib.maxSize
}
@@ -131,23 +130,10 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
tsData.NumRows += len(msg.Timestamps)
// 1.1 Get CollectionMeta from etcd
- segMeta := etcdpb.SegmentMeta{}
-
- key := path.Join(SegmentPrefix, strconv.FormatInt(currentSegID, 10))
- value, _ := ibNode.kvClient.Load(key)
- err := proto.UnmarshalText(value, &segMeta)
+ segMeta, collMeta, err := ibNode.getMeta(currentSegID)
if err != nil {
- log.Println("Load segMeta error")
- // TODO: add error handling
- }
-
- collMeta := etcdpb.CollectionMeta{}
- key = path.Join(CollectionPrefix, strconv.FormatInt(segMeta.GetCollectionID(), 10))
- value, _ = ibNode.kvClient.Load(key)
- err = proto.UnmarshalText(value, &collMeta)
- if err != nil {
- log.Println("Load collMeta error")
- // TODO: add error handling
+ // GOOSE TODO add error handler
+ log.Println("Get meta wrong")
}
// 1.2 Get Fields
@@ -378,6 +364,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
// 1.5 if full
// 1.5.1 generate binlogs
if ibNode.insertBuffer.full(currentSegID) {
+ log.Println("Insert Buffer full, auto flushing ...")
// partitionTag -> partitionID
partitionTag := msg.GetPartitionTag()
partitionID, err := typeutil.Hash32String(partitionTag)
@@ -385,20 +372,17 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
log.Println("partitionTag to partitionID Wrong")
}
- inCodec := storage.NewInsertCodec(&collMeta)
+ inCodec := storage.NewInsertCodec(collMeta)
// buffer data to binlogs
binLogs, err := inCodec.Serialize(partitionID,
currentSegID, ibNode.insertBuffer.insertData[currentSegID])
- for _, v := range binLogs {
- log.Println("key ", v.Key, "- value ", v.Value)
- }
+
if err != nil {
log.Println("generate binlog wrong")
}
// clear buffer
- log.Println("=========", binLogs)
delete(ibNode.insertBuffer.insertData, currentSegID)
// 1.5.2 binLogs -> minIO/S3
@@ -420,14 +404,117 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
log.Println("Save to MinIO failed")
// GOOSE TODO error handle
}
+ log.Println(".. Saving binlogs to MinIO ...")
+
+ fieldID, err := strconv.ParseInt(blob.Key, 10, 32)
+ if err != nil {
+ log.Println("string to fieldID wrong")
+ // GOOSE TODO error handle
+ }
+
+ inBinlogMsg := &insertFlushSyncMsg{
+ flushCompleted: false,
+ insertBinlogPathMsg: insertBinlogPathMsg{
+ ts: iMsg.timeRange.timestampMax,
+ segID: currentSegID,
+ fieldID: int32(fieldID),
+ paths: []string{key},
+ },
+ }
+
+ log.Println(".. Appending binlog paths ...")
+ ibNode.outCh <- inBinlogMsg
}
+
}
}
-
// iMsg is Flush() msg from master
// 1. insertBuffer(not empty) -> binLogs -> minIO/S3
- // Return
+ for _, msg := range iMsg.flushMessages {
+ currentSegID := msg.GetSegmentID()
+ flushTs := msg.GetTimestamp()
+
+ log.Printf(". Receiving flush message segID(%v)...", currentSegID)
+
+ if ibNode.insertBuffer.size(currentSegID) > 0 {
+ log.Println(".. Buffer not empty, flushing ...")
+ segMeta, collMeta, err := ibNode.getMeta(currentSegID)
+ if err != nil {
+ // GOOSE TODO add error handler
+ log.Println("Get meta wrong")
+ }
+ inCodec := storage.NewInsertCodec(collMeta)
+
+ // partitionTag -> partitionID
+ partitionTag := segMeta.GetPartitionTag()
+ partitionID, err := typeutil.Hash32String(partitionTag)
+ if err != nil {
+ // GOOSE TODO add error handler
+ log.Println("partitionTag to partitionID Wrong")
+ }
+ // buffer data to binlogs
+ binLogs, err := inCodec.Serialize(partitionID,
+ currentSegID, ibNode.insertBuffer.insertData[currentSegID])
+ if err != nil {
+ log.Println("generate binlog wrong")
+ }
+
+ // clear buffer
+ delete(ibNode.insertBuffer.insertData, currentSegID)
+
+ // binLogs -> minIO/S3
+ collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10)
+ partitionIDStr := strconv.FormatInt(partitionID, 10)
+ segIDStr := strconv.FormatInt(currentSegID, 10)
+ keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr)
+
+ for _, blob := range binLogs {
+ uid, err := ibNode.idAllocator.AllocOne()
+ if err != nil {
+ log.Println("Allocate Id failed")
+ // GOOSE TODO error handler
+ }
+
+ key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
+ err = ibNode.minIOKV.Save(key, string(blob.Value[:]))
+ if err != nil {
+ log.Println("Save to MinIO failed")
+ // GOOSE TODO error handler
+ }
+
+ fieldID, err := strconv.ParseInt(blob.Key, 10, 32)
+ if err != nil {
+ log.Println("string to fieldID wrong")
+ // GOOSE TODO error handler
+ }
+
+ // Append binlogs
+ inBinlogMsg := &insertFlushSyncMsg{
+ flushCompleted: false,
+ insertBinlogPathMsg: insertBinlogPathMsg{
+ ts: flushTs,
+ segID: currentSegID,
+ fieldID: int32(fieldID),
+ paths: []string{key},
+ },
+ }
+ ibNode.outCh <- inBinlogMsg
+ }
+ }
+
+ // Flushed
+ log.Println(".. Flush finished ...")
+ inBinlogMsg := &insertFlushSyncMsg{
+ flushCompleted: true,
+ insertBinlogPathMsg: insertBinlogPathMsg{
+ ts: flushTs,
+ segID: currentSegID,
+ },
+ }
+
+ ibNode.outCh <- inBinlogMsg
+ }
}
if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
@@ -437,6 +524,27 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
return nil
}
+func (ibNode *insertBufferNode) getMeta(segID UniqueID) (*etcdpb.SegmentMeta, *etcdpb.CollectionMeta, error) {
+
+ segMeta := &etcdpb.SegmentMeta{}
+
+ key := path.Join(SegmentPrefix, strconv.FormatInt(segID, 10))
+ value, _ := ibNode.kvClient.Load(key)
+ err := proto.UnmarshalText(value, segMeta)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ collMeta := &etcdpb.CollectionMeta{}
+ key = path.Join(CollectionPrefix, strconv.FormatInt(segMeta.GetCollectionID(), 10))
+ value, _ = ibNode.kvClient.Load(key)
+ err = proto.UnmarshalText(value, collMeta)
+ if err != nil {
+ return nil, nil, err
+ }
+ return segMeta, collMeta, nil
+}
+
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
msgPack := msgstream.MsgPack{}
timeTickMsg := msgstream.TimeTickMsg{
@@ -506,6 +614,10 @@ func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg) *i
if err != nil {
panic(err)
}
+ err = idAllocator.Start()
+ if err != nil {
+ panic(err)
+ }
wTt := msgstream.NewPulsarMsgStream(ctx, 1024) //input stream, write node time tick
wTt.SetPulsarClient(Params.PulsarAddress)
diff --git a/internal/writenode/flow_graph_insert_buffer_node_test.go b/internal/writenode/flow_graph_insert_buffer_node_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..b9f1b5ae68b5d4880efc040876f2048c3b9678f3
--- /dev/null
+++ b/internal/writenode/flow_graph_insert_buffer_node_test.go
@@ -0,0 +1,187 @@
+package writenode
+
+import (
+ "context"
+ "encoding/binary"
+ "math"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
+
+ "github.com/zilliztech/milvus-distributed/internal/msgstream"
+ "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+ "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
+ "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
+)
+
+func TestFlowGraphInputBufferNode_Operate(t *testing.T) {
+ const ctxTimeInMillisecond = 2000
+ const closeWithDeadline = false
+ var ctx context.Context
+
+ if closeWithDeadline {
+ var cancel context.CancelFunc
+ d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
+ ctx, cancel = context.WithDeadline(context.Background(), d)
+ defer cancel()
+ } else {
+ ctx = context.Background()
+ }
+
+ ddChan := make(chan *ddlFlushSyncMsg, 10)
+ defer close(ddChan)
+ insertChan := make(chan *insertFlushSyncMsg, 10)
+ defer close(insertChan)
+
+ testPath := "/test/writenode/root/meta"
+ err := clearEtcd(testPath)
+ require.NoError(t, err)
+ Params.MetaRootPath = testPath
+ fService := newFlushSyncService(ctx, ddChan, insertChan)
+ assert.Equal(t, testPath, fService.metaTable.client.(*etcdkv.EtcdKV).GetPath("."))
+ go fService.start()
+
+ // Params.FlushInsertBufSize = 2
+ iBNode := newInsertBufferNode(ctx, insertChan)
+
+ newMeta()
+ inMsg := genInsertMsg()
+ var iMsg flowgraph.Msg = &inMsg
+ iBNode.Operate([]*flowgraph.Msg{&iMsg})
+}
+
+func genInsertMsg() insertMsg {
+ // test data generate
+ // GOOSE TODO orgnize
+ const DIM = 2
+ const N = 1
+ var rawData []byte
+
+ // Float vector
+ var fvector = [DIM]float32{1, 2}
+ for _, ele := range fvector {
+ buf := make([]byte, 4)
+ binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
+ rawData = append(rawData, buf...)
+ }
+
+ // Binary vector
+ // Dimension of binary vector is 32
+ var bvector = [4]byte{255, 255, 255, 0}
+ for _, ele := range bvector {
+ bs := make([]byte, 4)
+ binary.LittleEndian.PutUint32(bs, uint32(ele))
+ rawData = append(rawData, bs...)
+ }
+
+ // Bool
+ bb := make([]byte, 4)
+ var fieldBool = true
+ var fieldBoolInt uint32
+ if fieldBool {
+ fieldBoolInt = 1
+ } else {
+ fieldBoolInt = 0
+ }
+
+ binary.LittleEndian.PutUint32(bb, fieldBoolInt)
+ rawData = append(rawData, bb...)
+
+ // int8
+ var dataInt8 int8 = 100
+ bint8 := make([]byte, 4)
+ binary.LittleEndian.PutUint32(bint8, uint32(dataInt8))
+ rawData = append(rawData, bint8...)
+
+ // int16
+ var dataInt16 int16 = 200
+ bint16 := make([]byte, 4)
+ binary.LittleEndian.PutUint32(bint16, uint32(dataInt16))
+ rawData = append(rawData, bint16...)
+
+ // int32
+ var dataInt32 int32 = 300
+ bint32 := make([]byte, 4)
+ binary.LittleEndian.PutUint32(bint32, uint32(dataInt32))
+ rawData = append(rawData, bint32...)
+
+ // int64
+ var dataInt64 int64 = 300
+ bint64 := make([]byte, 4)
+ binary.LittleEndian.PutUint32(bint64, uint32(dataInt64))
+ rawData = append(rawData, bint64...)
+
+ // float32
+ var datafloat float32 = 1.1
+ bfloat32 := make([]byte, 4)
+ binary.LittleEndian.PutUint32(bfloat32, math.Float32bits(datafloat))
+ rawData = append(rawData, bfloat32...)
+
+ // float64
+ var datafloat64 float64 = 2.2
+ bfloat64 := make([]byte, 8)
+ binary.LittleEndian.PutUint64(bfloat64, math.Float64bits(datafloat64))
+ rawData = append(rawData, bfloat64...)
+
+ timeRange := TimeRange{
+ timestampMin: 0,
+ timestampMax: math.MaxUint64,
+ }
+
+ var iMsg = &insertMsg{
+ insertMessages: make([]*msgstream.InsertMsg, 0),
+ flushMessages: make([]*msgstream.FlushMsg, 0),
+ timeRange: TimeRange{
+ timestampMin: timeRange.timestampMin,
+ timestampMax: timeRange.timestampMax,
+ },
+ }
+
+ // messages generate
+ const MSGLENGTH = 1
+ // insertMessages := make([]msgstream.TsMsg, 0)
+ for i := 0; i < MSGLENGTH; i++ {
+ var msg = &msgstream.InsertMsg{
+ BaseMsg: msgstream.BaseMsg{
+ HashValues: []uint32{
+ uint32(i),
+ },
+ },
+ InsertRequest: internalpb.InsertRequest{
+ MsgType: internalpb.MsgType_kInsert,
+ ReqID: UniqueID(0),
+ CollectionName: "coll1",
+ PartitionTag: "default",
+ SegmentID: UniqueID(1),
+ ChannelID: UniqueID(0),
+ ProxyID: UniqueID(0),
+ Timestamps: []Timestamp{Timestamp(i + 1000)},
+ RowIDs: []UniqueID{UniqueID(i)},
+
+ RowData: []*commonpb.Blob{
+ {Value: rawData},
+ },
+ },
+ }
+ iMsg.insertMessages = append(iMsg.insertMessages, msg)
+ }
+
+ var fmsg msgstream.FlushMsg = msgstream.FlushMsg{
+ BaseMsg: msgstream.BaseMsg{
+ HashValues: []uint32{
+ uint32(10),
+ },
+ },
+ FlushMsg: internalpb.FlushMsg{
+ MsgType: internalpb.MsgType_kFlush,
+ SegmentID: UniqueID(1),
+ Timestamp: Timestamp(2000),
+ },
+ }
+ iMsg.flushMessages = append(iMsg.flushMessages, &fmsg)
+ return *iMsg
+
+}
diff --git a/internal/writenode/flow_graph_message.go b/internal/writenode/flow_graph_message.go
index 3364b36869024858c665a23c4ec641112ae93726..147822e7c5035b8574817dc5911c845727a92078 100644
--- a/internal/writenode/flow_graph_message.go
+++ b/internal/writenode/flow_graph_message.go
@@ -21,6 +21,7 @@ type (
collectionRecords map[string][]metaOperateRecord
// TODO: use partition id
partitionRecords map[string][]metaOperateRecord
+ flushMessages []*msgstream.FlushMsg
timeRange TimeRange
}
diff --git a/internal/writenode/flush_sync_service.go b/internal/writenode/flush_sync_service.go
index f90db89ccd655b94843111deda384a376c02732b..7b6587b711c5d79c3bde04573e889d2024c7b287 100644
--- a/internal/writenode/flush_sync_service.go
+++ b/internal/writenode/flush_sync_service.go
@@ -12,10 +12,10 @@ type (
flushSyncService struct {
ctx context.Context
metaTable *metaTable
- ddChan chan *ddlFlushSyncMsg // TODO GOOSE Init Size??
- insertChan chan *insertFlushSyncMsg // TODO GOOSE Init Size??
- ddFlushed map[UniqueID]bool // Segment ID
- insertFlushed map[UniqueID]bool // Segment ID
+ ddChan chan *ddlFlushSyncMsg
+ insertChan chan *insertFlushSyncMsg
+ ddFlushed map[UniqueID]bool // Segment ID
+ insertFlushed map[UniqueID]bool // Segment ID
}
)
diff --git a/internal/writenode/flush_sync_service_test.go b/internal/writenode/flush_sync_service_test.go
index f279fc21abb9e6e26d4aac90648801b9b0ae7d0b..3fe541a610d8dd15dfe09d24ff9b32cdec317712 100644
--- a/internal/writenode/flush_sync_service_test.go
+++ b/internal/writenode/flush_sync_service_test.go
@@ -81,12 +81,12 @@ func TestFlushSyncService_Start(t *testing.T) {
for _, msg := range ddMsgs {
ddChan <- msg
- time.Sleep(time.Millisecond * 10)
+ time.Sleep(time.Millisecond * 50)
}
for _, msg := range insertMsgs {
insertChan <- msg
- time.Sleep(time.Millisecond * 10)
+ time.Sleep(time.Millisecond * 50)
}
ret, err := fService.metaTable.getSegBinlogPaths(SegID)
diff --git a/internal/writenode/write_node.go b/internal/writenode/write_node.go
index 137f033c74d2beaad5f5c9ac726f68e7521bea4e..606b8af013b39bfb1301493f704d9ee0fe397cff 100644
--- a/internal/writenode/write_node.go
+++ b/internal/writenode/write_node.go
@@ -25,8 +25,10 @@ func NewWriteNode(ctx context.Context, writeNodeID uint64) *WriteNode {
func (node *WriteNode) Start() {
- ddChan := make(chan *ddlFlushSyncMsg, 5)
- insertChan := make(chan *insertFlushSyncMsg, 5)
+ // TODO GOOSE Init Size??
+ chanSize := 100
+ ddChan := make(chan *ddlFlushSyncMsg, chanSize)
+ insertChan := make(chan *insertFlushSyncMsg, chanSize)
node.flushSyncService = newFlushSyncService(node.ctx, ddChan, insertChan)
node.dataSyncService = newDataSyncService(node.ctx, ddChan, insertChan)