diff --git a/.gitignore b/.gitignore index 57c914908890e6171c60f107321fc2b31bb50f9f..8ace3c1f3c6f16d23ca31f4cffac0bd1c7822c59 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,4 @@ cwrapper_build **/compile_commands.json **/.lint typescript +**/.pytest_cache/ diff --git a/configs/advanced/write_node.yaml b/configs/advanced/write_node.yaml index 619ece9330a959298bdf0a8fe2629348ffbf02d0..31c57e200955ba92443f6b30795eaa9e60da86dc 100644 --- a/configs/advanced/write_node.yaml +++ b/configs/advanced/write_node.yaml @@ -36,5 +36,5 @@ writeNode: flush: # max buffer size to flush - insertBufSize: 20 + insertBufSize: 500 ddBufSize: 20 diff --git a/internal/allocator/id.go b/internal/allocator/id.go index 65890d92c8c18a62737125ff8d421a8283946250..7ef275e12da6e38805c9cc9196f6a385133cad5f 100644 --- a/internal/allocator/id.go +++ b/internal/allocator/id.go @@ -2,7 +2,6 @@ package allocator import ( "context" - "fmt" "log" "time" @@ -46,7 +45,6 @@ func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error } func (ia *IDAllocator) syncID() bool { - fmt.Println("syncID") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) req := &internalpb.IDRequest{ PeerID: ia.PeerID, diff --git a/internal/master/timesync.go b/internal/master/timesync.go index 9ea8b35a9dac7873ef75f709cff985ab52b6dcd7..49c388a8dd9249349777a99a3f54fd9b5a2ea472 100644 --- a/internal/master/timesync.go +++ b/internal/master/timesync.go @@ -163,7 +163,6 @@ func (ttBarrier *hardTimeTickBarrier) Start() error { // Suppose ttmsg.Timestamp from stream is always larger than the previous one, // that `ttmsg.Timestamp > oldT` ttmsg := timetickmsg.(*ms.TimeTickMsg) - log.Printf("[hardTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerID, ttmsg.Timestamp) oldT, ok := ttBarrier.peer2Tt[ttmsg.PeerID] if !ok { diff --git a/internal/writenode/flow_graph_dd_node.go b/internal/writenode/flow_graph_dd_node.go index 96bb0ebbba2bfca0239069c7e9250beff9274ce2..7dd8e1fd433fda223cab267bbae39bb07ce2122c 100644 --- a/internal/writenode/flow_graph_dd_node.go +++ b/internal/writenode/flow_graph_dd_node.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log" + "path" "sort" "strconv" @@ -148,14 +149,14 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { // Blob key example: // ${tenant}/data_definition_log/${collection_id}/ts/${log_idx} // ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx} - keyCommon := Params.DdLogRootPath + strconv.FormatInt(collectionID, 10) + "/" + keyCommon := path.Join(Params.DdLogRootPath, strconv.FormatInt(collectionID, 10)) // save ts binlog timestampLogIdx, err := ddNode.idAllocator.AllocOne() if err != nil { log.Println(err) } - timestampKey := keyCommon + binLogs[0].GetKey() + "/" + strconv.FormatInt(timestampLogIdx, 10) + timestampKey := path.Join(keyCommon, binLogs[0].GetKey(), strconv.FormatInt(timestampLogIdx, 10)) err = ddNode.kv.Save(timestampKey, string(binLogs[0].GetValue())) if err != nil { log.Println(err) @@ -167,7 +168,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { if err != nil { log.Println(err) } - ddKey := keyCommon + binLogs[1].GetKey() + "/" + strconv.FormatInt(ddLogIdx, 10) + ddKey := path.Join(keyCommon, binLogs[1].GetKey(), strconv.FormatInt(ddLogIdx, 10)) err = ddNode.kv.Save(ddKey, string(binLogs[1].GetValue())) if err != nil { log.Println(err) diff --git a/internal/writenode/flow_graph_insert_buffer_node.go b/internal/writenode/flow_graph_insert_buffer_node.go index 24c59692109ea6d9db976f2edc85588aa1c7f652..4609a784723ea8485a2904842e6e160d496ac9f8 100644 --- a/internal/writenode/flow_graph_insert_buffer_node.go +++ b/internal/writenode/flow_graph_insert_buffer_node.go @@ -64,12 +64,12 @@ func (ib *insertBuffer) size(segmentID UniqueID) int { for _, data := range idata.Data { fdata, ok := data.(*storage.FloatVectorFieldData) if ok && fdata.NumRows > maxSize { - maxSize = len(fdata.Data) + maxSize = fdata.NumRows } bdata, ok := data.(*storage.BinaryVectorFieldData) if ok && bdata.NumRows > maxSize { - maxSize = len(bdata.Data) + maxSize = bdata.NumRows } } @@ -97,424 +97,433 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { log.Println("Error: type assertion failed for insertMsg") // TODO: add error handling } - for _, task := range iMsg.insertMessages { - if len(task.RowIDs) != len(task.Timestamps) || len(task.RowIDs) != len(task.RowData) { + + // iMsg is insertMsg + // 1. iMsg -> buffer + for _, msg := range iMsg.insertMessages { + if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) { log.Println("Error: misaligned messages detected") continue } + currentSegID := msg.GetSegmentID() - // iMsg is insertMsg - // 1. iMsg -> buffer - for _, msg := range iMsg.insertMessages { - currentSegID := msg.GetSegmentID() - - idata, ok := ibNode.insertBuffer.insertData[currentSegID] - if !ok { - idata = &InsertData{ - Data: make(map[UniqueID]storage.FieldData), - } + idata, ok := ibNode.insertBuffer.insertData[currentSegID] + if !ok { + idata = &InsertData{ + Data: make(map[UniqueID]storage.FieldData), } + } - // Timestamps - _, ok = idata.Data[1].(*storage.Int64FieldData) - if !ok { - idata.Data[1] = &storage.Int64FieldData{ - Data: []int64{}, - NumRows: 0, - } - } - tsData := idata.Data[1].(*storage.Int64FieldData) - for _, ts := range msg.Timestamps { - tsData.Data = append(tsData.Data, int64(ts)) + // Timestamps + _, ok = idata.Data[1].(*storage.Int64FieldData) + if !ok { + idata.Data[1] = &storage.Int64FieldData{ + Data: []int64{}, + NumRows: 0, } - tsData.NumRows += len(msg.Timestamps) + } + tsData := idata.Data[1].(*storage.Int64FieldData) + for _, ts := range msg.Timestamps { + tsData.Data = append(tsData.Data, int64(ts)) + } + tsData.NumRows += len(msg.Timestamps) - // 1.1 Get CollectionMeta from etcd - segMeta, collMeta, err := ibNode.getMeta(currentSegID) - if err != nil { - // GOOSE TODO add error handler - log.Println("Get meta wrong") - } + // 1.1 Get CollectionMeta from etcd + segMeta, collMeta, err := ibNode.getMeta(currentSegID) + if err != nil { + // GOOSE TODO add error handler + log.Println("Get meta wrong") + } - // 1.2 Get Fields - var pos = 0 // Record position of blob - for _, field := range collMeta.Schema.Fields { - switch field.DataType { - case schemapb.DataType_VECTOR_FLOAT: - var dim int - for _, t := range field.TypeParams { - if t.Key == "dim" { - dim, err = strconv.Atoi(t.Value) - if err != nil { - log.Println("strconv wrong") - } - break + // 1.2 Get Fields + var pos = 0 // Record position of blob + for _, field := range collMeta.Schema.Fields { + switch field.DataType { + case schemapb.DataType_VECTOR_FLOAT: + var dim int + for _, t := range field.TypeParams { + if t.Key == "dim" { + dim, err = strconv.Atoi(t.Value) + if err != nil { + log.Println("strconv wrong") } + break } - if dim <= 0 { - log.Println("invalid dim") - // TODO: add error handling - } + } + if dim <= 0 { + log.Println("invalid dim") + // TODO: add error handling + } - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.FloatVectorFieldData{ - NumRows: 0, - Data: make([]float32, 0), - Dim: dim, - } + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.FloatVectorFieldData{ + NumRows: 0, + Data: make([]float32, 0), + Dim: dim, } + } - fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData) - - for _, blob := range msg.RowData { - for j := 0; j < dim; j++ { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, math.Float32frombits(v)) - pos++ - } - } - fieldData.NumRows += len(msg.RowIDs) - log.Println("Float vector data:", - idata.Data[field.FieldID].(*storage.FloatVectorFieldData).Data, - "NumRows:", - idata.Data[field.FieldID].(*storage.FloatVectorFieldData).NumRows, - "Dim:", - idata.Data[field.FieldID].(*storage.FloatVectorFieldData).Dim) - - case schemapb.DataType_VECTOR_BINARY: - var dim int - for _, t := range field.TypeParams { - if t.Key == "dim" { - dim, err = strconv.Atoi(t.Value) - if err != nil { - log.Println("strconv wrong") - } - break - } - } - if dim <= 0 { - log.Println("invalid dim") - // TODO: add error handling - } + fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData) - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{ - NumRows: 0, - Data: make([]byte, 0), - Dim: dim, - } + for _, blob := range msg.RowData { + for j := 0; j < dim; j++ { + v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) + fieldData.Data = append(fieldData.Data, math.Float32frombits(v)) + pos++ } - fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData) - - for _, blob := range msg.RowData { - for d := 0; d < dim/8; d++ { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, byte(v)) - pos++ + } + fieldData.NumRows += len(msg.RowIDs) + // log.Println(".Float vector data:\n", + // "..NumRows:", + // idata.Data[field.FieldID].(*storage.FloatVectorFieldData).NumRows, + // "..Dim:", + // idata.Data[field.FieldID].(*storage.FloatVectorFieldData).Dim) + + case schemapb.DataType_VECTOR_BINARY: + var dim int + for _, t := range field.TypeParams { + if t.Key == "dim" { + dim, err = strconv.Atoi(t.Value) + if err != nil { + log.Println("strconv wrong") } + break } + } + if dim <= 0 { + log.Println("invalid dim") + // TODO: add error handling + } - fieldData.NumRows += len(msg.RowData) - log.Println( - "Binary vector data:", - idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).Data, - "NumRows:", - idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).NumRows, - "Dim:", - idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).Dim) - case schemapb.DataType_BOOL: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.BoolFieldData{ - NumRows: 0, - Data: make([]bool, 0), - } + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.BinaryVectorFieldData{ + NumRows: 0, + Data: make([]byte, 0), + Dim: dim, } + } + fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData) - fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData) - for _, blob := range msg.RowData { - boolInt := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - if boolInt == 1 { - fieldData.Data = append(fieldData.Data, true) - } else { - fieldData.Data = append(fieldData.Data, false) - } + for _, blob := range msg.RowData { + for d := 0; d < dim/8; d++ { + v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) + fieldData.Data = append(fieldData.Data, byte(v)) pos++ } + } - fieldData.NumRows += len(msg.RowIDs) - log.Println("Bool data:", - idata.Data[field.FieldID].(*storage.BoolFieldData).Data) - case schemapb.DataType_INT8: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.Int8FieldData{ - NumRows: 0, - Data: make([]int8, 0), - } + fieldData.NumRows += len(msg.RowData) + // log.Println( + // ".Binary vector data:\n", + // "..NumRows:", + // idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).NumRows, + // "..Dim:", + // idata.Data[field.FieldID].(*storage.BinaryVectorFieldData).Dim) + case schemapb.DataType_BOOL: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.BoolFieldData{ + NumRows: 0, + Data: make([]bool, 0), } + } - fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData) - for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, int8(v)) - pos++ - } - fieldData.NumRows += len(msg.RowIDs) - log.Println("Int8 data:", - idata.Data[field.FieldID].(*storage.Int8FieldData).Data) - - case schemapb.DataType_INT16: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.Int16FieldData{ - NumRows: 0, - Data: make([]int16, 0), - } + fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData) + for _, blob := range msg.RowData { + boolInt := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) + if boolInt == 1 { + fieldData.Data = append(fieldData.Data, true) + } else { + fieldData.Data = append(fieldData.Data, false) } + pos++ + } - fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData) - for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, int16(v)) - pos++ + fieldData.NumRows += len(msg.RowIDs) + // log.Println("Bool data:", + // idata.Data[field.FieldID].(*storage.BoolFieldData).Data) + case schemapb.DataType_INT8: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.Int8FieldData{ + NumRows: 0, + Data: make([]int8, 0), } + } - fieldData.NumRows += len(msg.RowIDs) - log.Println("Int16 data:", - idata.Data[field.FieldID].(*storage.Int16FieldData).Data) - case schemapb.DataType_INT32: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.Int32FieldData{ - NumRows: 0, - Data: make([]int32, 0), - } + fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData) + for _, blob := range msg.RowData { + v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) + fieldData.Data = append(fieldData.Data, int8(v)) + pos++ + } + fieldData.NumRows += len(msg.RowIDs) + // log.Println("Int8 data:", + // idata.Data[field.FieldID].(*storage.Int8FieldData).Data) + + case schemapb.DataType_INT16: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.Int16FieldData{ + NumRows: 0, + Data: make([]int16, 0), } + } - fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData) - for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, int32(v)) - pos++ - } - fieldData.NumRows += len(msg.RowIDs) - log.Println("Int32 data:", - idata.Data[field.FieldID].(*storage.Int32FieldData).Data) - - case schemapb.DataType_INT64: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.Int64FieldData{ - NumRows: 0, - Data: make([]int64, 0), - } + fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData) + for _, blob := range msg.RowData { + v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) + fieldData.Data = append(fieldData.Data, int16(v)) + pos++ + } + + fieldData.NumRows += len(msg.RowIDs) + // log.Println("Int16 data:", + // idata.Data[field.FieldID].(*storage.Int16FieldData).Data) + case schemapb.DataType_INT32: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.Int32FieldData{ + NumRows: 0, + Data: make([]int32, 0), } + } - fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData) - for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, int64(v)) - pos++ + fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData) + for _, blob := range msg.RowData { + v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) + fieldData.Data = append(fieldData.Data, int32(v)) + pos++ + } + fieldData.NumRows += len(msg.RowIDs) + // log.Println("Int32 data:", + // idata.Data[field.FieldID].(*storage.Int32FieldData).Data) + + case schemapb.DataType_INT64: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.Int64FieldData{ + NumRows: 0, + Data: make([]int64, 0), } + } - fieldData.NumRows += len(msg.RowIDs) - log.Println("Int64 data:", - idata.Data[field.FieldID].(*storage.Int64FieldData).Data) + fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData) + for _, blob := range msg.RowData { + v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) + fieldData.Data = append(fieldData.Data, int64(v)) + pos++ + } - case schemapb.DataType_FLOAT: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.FloatFieldData{ - NumRows: 0, - Data: make([]float32, 0), - } - } + fieldData.NumRows += len(msg.RowIDs) + // log.Println("Int64 data:", + // idata.Data[field.FieldID].(*storage.Int64FieldData).Data) - fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData) - for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, math.Float32frombits(v)) - pos++ + case schemapb.DataType_FLOAT: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.FloatFieldData{ + NumRows: 0, + Data: make([]float32, 0), } + } - fieldData.NumRows += len(msg.RowIDs) - log.Println("Float32 data:", - idata.Data[field.FieldID].(*storage.FloatFieldData).Data) + fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData) + for _, blob := range msg.RowData { + v := binary.LittleEndian.Uint32(blob.GetValue()[pos*4:]) + fieldData.Data = append(fieldData.Data, math.Float32frombits(v)) + pos++ + } - case schemapb.DataType_DOUBLE: - if _, ok := idata.Data[field.FieldID]; !ok { - idata.Data[field.FieldID] = &storage.DoubleFieldData{ - NumRows: 0, - Data: make([]float64, 0), - } - } + fieldData.NumRows += len(msg.RowIDs) + // log.Println("Float32 data:", + // idata.Data[field.FieldID].(*storage.FloatFieldData).Data) - fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData) - for _, blob := range msg.RowData { - v := binary.LittleEndian.Uint64(blob.GetValue()[pos*4:]) - fieldData.Data = append(fieldData.Data, math.Float64frombits(v)) - pos++ + case schemapb.DataType_DOUBLE: + if _, ok := idata.Data[field.FieldID]; !ok { + idata.Data[field.FieldID] = &storage.DoubleFieldData{ + NumRows: 0, + Data: make([]float64, 0), } + } - fieldData.NumRows += len(msg.RowIDs) - log.Println("Float64 data:", - idata.Data[field.FieldID].(*storage.DoubleFieldData).Data) + fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData) + for _, blob := range msg.RowData { + v := binary.LittleEndian.Uint64(blob.GetValue()[pos*4:]) + fieldData.Data = append(fieldData.Data, math.Float64frombits(v)) + pos++ } + + fieldData.NumRows += len(msg.RowIDs) + // log.Println("Float64 data:", + // idata.Data[field.FieldID].(*storage.DoubleFieldData).Data) + } + } + + // 1.3 store in buffer + ibNode.insertBuffer.insertData[currentSegID] = idata + + // 1.4 if full + // 1.4.1 generate binlogs + if ibNode.insertBuffer.full(currentSegID) { + log.Printf(". Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID)) + // partitionTag -> partitionID + partitionTag := msg.GetPartitionTag() + partitionID, err := typeutil.Hash32String(partitionTag) + if err != nil { + log.Println("partitionTag to partitionID wrong") + // TODO GOOSE add error handler + } + + inCodec := storage.NewInsertCodec(collMeta) + + // buffer data to binlogs + binLogs, err := inCodec.Serialize(partitionID, + currentSegID, ibNode.insertBuffer.insertData[currentSegID]) + + if err != nil { + log.Println("generate binlog wrong") } - // 1.3 store in buffer - ibNode.insertBuffer.insertData[currentSegID] = idata - // 1.4 Send hardTimeTick msg, GOOSE TODO - - // 1.5 if full - // 1.5.1 generate binlogs - if ibNode.insertBuffer.full(currentSegID) { - log.Println("Insert Buffer full, auto flushing ...") - // partitionTag -> partitionID - partitionTag := msg.GetPartitionTag() - partitionID, err := typeutil.Hash32String(partitionTag) + // clear buffer + delete(ibNode.insertBuffer.insertData, currentSegID) + log.Println(".. Clearing buffer") + + // 1.5.2 binLogs -> minIO/S3 + collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10) + partitionIDStr := strconv.FormatInt(partitionID, 10) + segIDStr := strconv.FormatInt(currentSegID, 10) + keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr) + + log.Printf(".. Saving (%v) binlogs to MinIO ...", len(binLogs)) + for index, blob := range binLogs { + uid, err := ibNode.idAllocator.AllocOne() if err != nil { - log.Println("partitionTag to partitionID Wrong") + log.Println("Allocate Id failed") + // GOOSE TODO error handler } - inCodec := storage.NewInsertCodec(collMeta) - - // buffer data to binlogs - binLogs, err := inCodec.Serialize(partitionID, - currentSegID, ibNode.insertBuffer.insertData[currentSegID]) + key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10)) + err = ibNode.minIOKV.Save(key, string(blob.Value[:])) + if err != nil { + log.Println("Save to MinIO failed") + // GOOSE TODO error handler + } + fieldID, err := strconv.ParseInt(blob.Key, 10, 32) if err != nil { - log.Println("generate binlog wrong") + log.Println("string to fieldID wrong") + // GOOSE TODO error handler } - // clear buffer - delete(ibNode.insertBuffer.insertData, currentSegID) + inBinlogMsg := &insertFlushSyncMsg{ + flushCompleted: false, + insertBinlogPathMsg: insertBinlogPathMsg{ + ts: iMsg.timeRange.timestampMax, + segID: currentSegID, + fieldID: fieldID, + paths: []string{key}, + }, + } - // 1.5.2 binLogs -> minIO/S3 - collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10) - partitionIDStr := strconv.FormatInt(partitionID, 10) - segIDStr := strconv.FormatInt(currentSegID, 10) - keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr) + log.Println("... Appending binlog paths ...", index) + ibNode.outCh <- inBinlogMsg + } + } + } - for _, blob := range binLogs { - uid, err := ibNode.idAllocator.AllocOne() - if err != nil { - log.Println("Allocate Id failed") - // GOOSE TODO error handle - } + if len(iMsg.insertMessages) > 0 { + log.Println("---insert buffer status---") + var stopSign int = 0 + for k := range ibNode.insertBuffer.insertData { + if stopSign >= 10 { + break + } + log.Printf("seg(%v) buffer size = (%v)", k, ibNode.insertBuffer.size(k)) + stopSign++ + } + } - key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10)) - err = ibNode.minIOKV.Save(key, string(blob.Value[:])) - if err != nil { - log.Println("Save to MinIO failed") - // GOOSE TODO error handle - } - log.Println(".. Saving binlogs to MinIO ...") + // iMsg is Flush() msg from master + // 1. insertBuffer(not empty) -> binLogs -> minIO/S3 + for _, msg := range iMsg.flushMessages { + currentSegID := msg.GetSegmentID() + flushTs := msg.GetTimestamp() - fieldID, err := strconv.ParseInt(blob.Key, 10, 32) - if err != nil { - log.Println("string to fieldID wrong") - // GOOSE TODO error handle - } + log.Printf(". Receiving flush message segID(%v)...", currentSegID) - inBinlogMsg := &insertFlushSyncMsg{ - flushCompleted: false, - insertBinlogPathMsg: insertBinlogPathMsg{ - ts: iMsg.timeRange.timestampMax, - segID: currentSegID, - fieldID: fieldID, - paths: []string{key}, - }, - } + if ibNode.insertBuffer.size(currentSegID) > 0 { + log.Println(".. Buffer not empty, flushing ...") + segMeta, collMeta, err := ibNode.getMeta(currentSegID) + if err != nil { + // GOOSE TODO add error handler + log.Println("Get meta wrong") + } + inCodec := storage.NewInsertCodec(collMeta) - log.Println(".. Appending binlog paths ...") - ibNode.outCh <- inBinlogMsg - } + // partitionTag -> partitionID + partitionTag := segMeta.GetPartitionTag() + partitionID, err := typeutil.Hash32String(partitionTag) + if err != nil { + // GOOSE TODO add error handler + log.Println("partitionTag to partitionID Wrong") + } + // buffer data to binlogs + binLogs, err := inCodec.Serialize(partitionID, + currentSegID, ibNode.insertBuffer.insertData[currentSegID]) + if err != nil { + log.Println("generate binlog wrong") } - } - // iMsg is Flush() msg from master - // 1. insertBuffer(not empty) -> binLogs -> minIO/S3 - for _, msg := range iMsg.flushMessages { - currentSegID := msg.GetSegmentID() - flushTs := msg.GetTimestamp() - log.Printf(". Receiving flush message segID(%v)...", currentSegID) + // clear buffer + delete(ibNode.insertBuffer.insertData, currentSegID) + + // binLogs -> minIO/S3 + collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10) + partitionIDStr := strconv.FormatInt(partitionID, 10) + segIDStr := strconv.FormatInt(currentSegID, 10) + keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr) - if ibNode.insertBuffer.size(currentSegID) > 0 { - log.Println(".. Buffer not empty, flushing ...") - segMeta, collMeta, err := ibNode.getMeta(currentSegID) + for _, blob := range binLogs { + uid, err := ibNode.idAllocator.AllocOne() if err != nil { - // GOOSE TODO add error handler - log.Println("Get meta wrong") + log.Println("Allocate Id failed") + // GOOSE TODO error handler } - inCodec := storage.NewInsertCodec(collMeta) - // partitionTag -> partitionID - partitionTag := segMeta.GetPartitionTag() - partitionID, err := typeutil.Hash32String(partitionTag) + key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10)) + err = ibNode.minIOKV.Save(key, string(blob.Value[:])) if err != nil { - // GOOSE TODO add error handler - log.Println("partitionTag to partitionID Wrong") + log.Println("Save to MinIO failed") + // GOOSE TODO error handler } - // buffer data to binlogs - binLogs, err := inCodec.Serialize(partitionID, - currentSegID, ibNode.insertBuffer.insertData[currentSegID]) + fieldID, err := strconv.ParseInt(blob.Key, 10, 32) if err != nil { - log.Println("generate binlog wrong") + log.Println("string to fieldID wrong") + // GOOSE TODO error handler } - // clear buffer - delete(ibNode.insertBuffer.insertData, currentSegID) - - // binLogs -> minIO/S3 - collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10) - partitionIDStr := strconv.FormatInt(partitionID, 10) - segIDStr := strconv.FormatInt(currentSegID, 10) - keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr) - - for _, blob := range binLogs { - uid, err := ibNode.idAllocator.AllocOne() - if err != nil { - log.Println("Allocate Id failed") - // GOOSE TODO error handler - } - - key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10)) - err = ibNode.minIOKV.Save(key, string(blob.Value[:])) - if err != nil { - log.Println("Save to MinIO failed") - // GOOSE TODO error handler - } - - fieldID, err := strconv.ParseInt(blob.Key, 10, 32) - if err != nil { - log.Println("string to fieldID wrong") - // GOOSE TODO error handler - } - - // Append binlogs - inBinlogMsg := &insertFlushSyncMsg{ - flushCompleted: false, - insertBinlogPathMsg: insertBinlogPathMsg{ - ts: flushTs, - segID: currentSegID, - fieldID: fieldID, - paths: []string{key}, - }, - } - ibNode.outCh <- inBinlogMsg + // Append binlogs + inBinlogMsg := &insertFlushSyncMsg{ + flushCompleted: false, + insertBinlogPathMsg: insertBinlogPathMsg{ + ts: flushTs, + segID: currentSegID, + fieldID: fieldID, + paths: []string{key}, + }, } + ibNode.outCh <- inBinlogMsg } + } - // Flushed - log.Println(".. Flush finished ...") - inBinlogMsg := &insertFlushSyncMsg{ - flushCompleted: true, - insertBinlogPathMsg: insertBinlogPathMsg{ - ts: flushTs, - segID: currentSegID, - }, - } - - ibNode.outCh <- inBinlogMsg + // Flushed + log.Println(".. Flush finished ...") + inBinlogMsg := &insertFlushSyncMsg{ + flushCompleted: true, + insertBinlogPathMsg: insertBinlogPathMsg{ + ts: flushTs, + segID: currentSegID, + }, } + + ibNode.outCh <- inBinlogMsg } if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil { diff --git a/internal/writenode/param_table_test.go b/internal/writenode/param_table_test.go index fe84cb7a2d31a833ba6f4d0088b6a3ddf9f5733a..d8f79da9864db1cca6b9480f151526520f0138fa 100644 --- a/internal/writenode/param_table_test.go +++ b/internal/writenode/param_table_test.go @@ -89,7 +89,7 @@ func TestParamTable_WriteNode(t *testing.T) { t.Run("Test FlushInsertBufSize", func(t *testing.T) { name := Params.FlushInsertBufSize - assert.Equal(t, name, 20) + assert.Equal(t, name, 500) }) t.Run("Test FlushDdBufSize", func(t *testing.T) {