diff --git a/internal/datanode/binlog_meta.go b/internal/datanode/binlog_meta.go
index c6b48c0da2a90e06d8aabc055b862a733d89bae7..6012a04f6974a9eb43df9a4a4d679fdfcdb48a93 100644
--- a/internal/datanode/binlog_meta.go
+++ b/internal/datanode/binlog_meta.go
@@ -39,6 +39,10 @@ func NewBinlogMeta(kv kv.TxnKV, idAllocator allocatorInterface) (*binlogMeta, er
 	return mt, nil
 }
 
+func (bm *binlogMeta) allocID() (key UniqueID, err error) {
+	return bm.idAllocator.allocID()
+}
+
 // genKey gives a valid key string for lists of UniqueIDs:
 //  if alloc is true, the returned keys will have a generated-unique ID at the end.
 //  if alloc is false, the returned keys will only consist of provided ids.
diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go
index b25c45577af6388e59c4b35724beb2b8ce0430db..a5e92169d87e828718a064517813e90544c4d3c8 100644
--- a/internal/datanode/flow_graph_insert_buffer_node.go
+++ b/internal/datanode/flow_graph_insert_buffer_node.go
@@ -596,7 +596,7 @@ func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID Un
 		return
 	}
 
-	binLogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
+	binLogs, statsBinlogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData))
 	if err != nil {
 		log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err))
 		clearFn(false)
@@ -607,6 +607,9 @@ func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID Un
 	field2Path := make(map[UniqueID]string, len(binLogs))
 	kvs := make(map[string]string, len(binLogs))
 	paths := make([]string, 0, len(binLogs))
+	field2Logidx := make(map[UniqueID]UniqueID, len(binLogs))
+
+	// write insert binlog
 	for _, blob := range binLogs {
 		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
 		if err != nil {
@@ -615,17 +618,39 @@ func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID Un
 			return
 		}
 
-		k, err := idAllocator.genKey(true, collID, partitionID, segID, fieldID)
+		logidx, err := idAllocator.allocID()
 		if err != nil {
 			log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
 			clearFn(false)
 			return
 		}
 
+		// no error is raised when alloc=false
+		k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)
+
 		key := path.Join(Params.InsertBinlogRootPath, k)
 		paths = append(paths, key)
 		kvs[key] = string(blob.Value[:])
 		field2Path[fieldID] = key
+		field2Logidx[fieldID] = logidx
+	}
+
+	// write stats binlog
+	for _, blob := range statsBinlogs {
+		fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)
+		if err != nil {
+			log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))
+			clearFn(false)
+			return
+		}
+
+		logidx := field2Logidx[fieldID]
+
+		// no error is raised when alloc=false
+		k, _ := idAllocator.genKey(false, collID, partitionID, segID, fieldID, logidx)
+
+		key := path.Join(Params.StatsBinlogRootPath, k)
+		kvs[key] = string(blob.Value[:])
 	}
 
 	err = kv.MultiSave(kvs)
diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go
index 55561501c3d1e53f7312b2445341e8e562e5411f..f38d24ea3d2bcc883ed7609f972d0cdf024a2798 100644
--- a/internal/datanode/flow_graph_insert_buffer_node_test.go
+++ b/internal/datanode/flow_graph_insert_buffer_node_test.go
@@ -13,16 +13,24 @@ package datanode
 
 import (
 	"context"
+	"fmt"
 	"math"
+	"path"
+	"sync"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	memkv "github.com/milvus-io/milvus/internal/kv/mem"
 	"github.com/milvus-io/milvus/internal/msgstream"
+	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
+	"github.com/milvus-io/milvus/internal/proto/schemapb"
+	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
-	"github.com/stretchr/testify/assert"
 )
 
 func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
@@ -107,3 +115,108 @@
 
 	return *iMsg
 }
+
+func TestFlushSegmentTxn(t *testing.T) {
+	idAllocMock := NewAllocatorFactory(1)
+	mockMinIO := memkv.NewMemoryKV()
+
+	segmentID, _ := idAllocMock.allocID()
+	partitionID, _ := idAllocMock.allocID()
+	collectionID, _ := idAllocMock.allocID()
+	fmt.Printf("generate segmentID, partitionID, collectionID: %v, %v, %v\n",
+		segmentID, partitionID, collectionID)
+
+	collMeta := genCollectionMeta(collectionID, "test_flush_segment_txn")
+	flushMap := sync.Map{}
+	flushMeta := newBinlogMeta()
+
+	finishCh := make(chan map[UniqueID]string)
+
+	insertData := &InsertData{
+		Data: make(map[storage.FieldID]storage.FieldData),
+	}
+	insertData.Data[0] = &storage.Int64FieldData{
+		NumRows: 10,
+		Data:    []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
+	}
+	insertData.Data[1] = &storage.Int64FieldData{
+		NumRows: 10,
+		Data:    []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+	}
+	insertData.Data[107] = &storage.FloatFieldData{
+		NumRows: 10,
+		Data:    make([]float32, 10),
+	}
+	flushMap.Store(segmentID, insertData)
+
+	go func(wait <-chan map[UniqueID]string) {
+		field2Path := <-wait
+		assert.NotNil(t, field2Path)
+	}(finishCh)
+
+	flushSegment(collMeta,
+		segmentID,
+		partitionID,
+		collectionID,
+		&flushMap,
+		mockMinIO,
+		finishCh,
+		idAllocMock)
+
+	k, _ := flushMeta.genKey(false, collectionID, partitionID, segmentID, 0)
+	key := path.Join(Params.StatsBinlogRootPath, k)
+	_, values, _ := mockMinIO.LoadWithPrefix(key)
+	assert.Equal(t, len(values), 1)
+	assert.Equal(t, values[0], `{"max":9,"min":0}`)
+}
+
+func genCollectionMeta(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
+	sch := schemapb.CollectionSchema{
+		Name:        collectionName,
+		Description: "test collection by meta factory",
+		AutoID:      true,
+		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:     0,
+				Name:        "RowID",
+				Description: "RowID field",
+				DataType:    schemapb.DataType_Int64,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   "f0_tk1",
+						Value: "f0_tv1",
+					},
+				},
+			},
+			{
+				FieldID:     1,
+				Name:        "Timestamp",
+				Description: "Timestamp field",
+				DataType:    schemapb.DataType_Int64,
+				TypeParams: []*commonpb.KeyValuePair{
+					{
+						Key:   "f1_tk1",
+						Value: "f1_tv1",
+					},
+				},
+			},
+			{
+				FieldID:     107,
+				Name:        "float32_field",
+				Description: "field 107",
+				DataType:    schemapb.DataType_Float,
+				TypeParams:  []*commonpb.KeyValuePair{},
+				IndexParams: []*commonpb.KeyValuePair{},
+			},
+		},
+	}
+
+	collection := etcdpb.CollectionMeta{
+		ID:           collectionID,
+		Schema:       &sch,
+		CreateTime:   Timestamp(1),
+		SegmentIDs:   make([]UniqueID, 0),
+		PartitionIDs: []UniqueID{0},
+	}
+	return &collection
+}
diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go
index a33d1807c96d1c1ce00e68e38a53cd5830af15e6..29053c4c5087d637d7116ee1a3d3146afcf3afe3 100644
--- a/internal/datanode/param_table.go
+++ b/internal/datanode/param_table.go
@@ -33,6 +33,7 @@ type ParamTable struct {
 	FlushInsertBufferSize int32
 	InsertBinlogRootPath  string
 	DdlBinlogRootPath     string
+	StatsBinlogRootPath   string
 	Log log.Config
 
 	// === DataNode External Components Configs ===
@@ -89,6 +90,7 @@ func (p *ParamTable) Init() {
 	p.initFlushInsertBufferSize()
 	p.initInsertBinlogRootPath()
 	p.initDdlBinlogRootPath()
+	p.initStatsBinlogRootPath()
 	p.initLogCfg()
 
 	// === DataNode External Components Configs ===
@@ -166,6 +168,14 @@ func (p *ParamTable) initDdlBinlogRootPath() {
 	p.DdlBinlogRootPath = path.Join(rootPath, "data_definition_log")
 }
 
+func (p *ParamTable) initStatsBinlogRootPath() {
+	rootPath, err := p.Load("etcd.rootPath")
+	if err != nil {
+		panic(err)
+	}
+	p.StatsBinlogRootPath = path.Join(rootPath, "stats_log")
+}
+
 // ---- Pulsar ----
 func (p *ParamTable) initPulsarAddress() {
 	url, err := p.Load("_PulsarAddress")
diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go
index dda6a6cf19632c91d3ca9e10b08a59715fd39d8b..1f018638455112664298b61d88b9e44c01d66706 100644
--- a/internal/querynode/load_service_test.go
+++ b/internal/querynode/load_service_test.go
@@ -810,7 +810,7 @@ func generateInsertBinLog(collectionID UniqueID, partitionID UniqueID, segmentID
 		DataType: schemapb.DataType_Int64,
 	})
 	inCodec := storage.NewInsertCodec(collMeta)
-	binLogs, err := inCodec.Serialize(partitionID, segmentID, insertData)
+	binLogs, _, err := inCodec.Serialize(partitionID, segmentID, insertData)
 
 	if err != nil {
 		return nil, nil, err
diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index c08436bcaf55facada893f0f1093a2f132f0b463..83bc9e30d4983874813b9660a9224e8422d04144 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -138,12 +138,13 @@ func NewInsertCodec(schema *etcdpb.CollectionMeta) *InsertCodec {
 	return &InsertCodec{Schema: schema}
 }
 
-func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
+func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, []*Blob, error) {
 	var blobs []*Blob
+	var statsBlobs []*Blob
 	var writer *InsertBinlogWriter
 	timeFieldData, ok := data.Data[ms.TimeStampField]
 	if !ok {
-		return nil, errors.New("data doesn't contains timestamp field")
+		return nil, nil, errors.New("data doesn't contain a timestamp field")
 	}
 	ts := timeFieldData.(*Int64FieldData).Data
 	startTs := ts[0]
@@ -157,11 +158,14 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 
 	for _, field := range insertCodec.Schema.Schema.Fields {
 		singleData := data.Data[field.FieldID]
+
+		// encode fields
 		writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID)
 		eventWriter, err := writer.NextInsertEventWriter()
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
+
 		eventWriter.SetStartTimestamp(typeutil.Timestamp(startTs))
 		eventWriter.SetEndTimestamp(typeutil.Timestamp(endTs))
 		switch field.DataType {
@@ -183,7 +187,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 			for _, singleString := range singleData.(*StringFieldData).Data {
 				err = eventWriter.AddOneStringToPayload(singleString)
 				if err != nil {
-					return nil, err
+					return nil, nil, err
 				}
 			}
 		case schemapb.DataType_BinaryVector:
@@ -191,22 +195,22 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 		case schemapb.DataType_FloatVector:
 			err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim)
 		default:
-			return nil, fmt.Errorf("undefined data type %d", field.DataType)
+			return nil, nil, fmt.Errorf("undefined data type %d", field.DataType)
 		}
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 		writer.SetStartTimeStamp(typeutil.Timestamp(startTs))
 		writer.SetEndTimeStamp(typeutil.Timestamp(endTs))
 
 		err = writer.Close()
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 
 		buffer, err := writer.GetBuffer()
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 		blobKey := fmt.Sprintf("%d", field.FieldID)
 		blobs = append(blobs, &Blob{
@@ -214,9 +218,23 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 			Value: buffer,
 		})
 
+		// stats fields; for now only Int64 fields yield a non-empty stats payload
+		statsWriter := &StatsWriter{}
+		switch field.DataType {
+		case schemapb.DataType_Int64:
+			err = statsWriter.StatsInt64(singleData.(*Int64FieldData).Data)
+		}
+		if err != nil {
+			return nil, nil, err
+		}
+		statsBuffer := statsWriter.GetBuffer()
+		statsBlobs = append(statsBlobs, &Blob{
+			Key:   blobKey,
+			Value: statsBuffer,
+		})
 	}
-	return blobs, nil
+	return blobs, statsBlobs, nil
 }
 
 func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) {
 	if len(blobs) == 0 {
diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go
index e2a14958f721740ec7758152554fee90db612ec9..74e8f81b500cfb21cbea86258c8f13c4d0f30fc8 100644
--- a/internal/storage/data_codec_test.go
+++ b/internal/storage/data_codec_test.go
@@ -228,13 +228,13 @@ func TestInsertCodec(t *testing.T) {
 			},
 		},
 	}
-	firstBlobs, err := insertCodec.Serialize(1, 1, insertDataFirst)
+	firstBlobs, _, err := insertCodec.Serialize(1, 1, insertDataFirst)
 	assert.Nil(t, err)
 	for _, blob := range firstBlobs {
 		blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 100)
 		assert.Equal(t, blob.GetKey(), blob.Key)
 	}
-	secondBlobs, err := insertCodec.Serialize(1, 1, insertDataSecond)
+	secondBlobs, _, err := insertCodec.Serialize(1, 1, insertDataSecond)
 	assert.Nil(t, err)
 	for _, blob := range secondBlobs {
 		blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99)
@@ -353,7 +353,7 @@ func TestIndexCodec(t *testing.T) {
 func TestTsError(t *testing.T) {
 	insertData := &InsertData{}
 	insertCodec := NewInsertCodec(nil)
-	blobs, err := insertCodec.Serialize(1, 1, insertData)
+	blobs, _, err := insertCodec.Serialize(1, 1, insertData)
 	assert.Nil(t, blobs)
 	assert.NotNil(t, err)
 }
@@ -410,7 +410,7 @@ func TestSchemaError(t *testing.T) {
 		},
 	}
 	insertCodec := NewInsertCodec(schema)
-	blobs, err := insertCodec.Serialize(1, 1, insertData)
+	blobs, _, err := insertCodec.Serialize(1, 1, insertData)
 	assert.Nil(t, blobs)
 	assert.NotNil(t, err)
 }
diff --git a/internal/storage/print_binglog_test.go b/internal/storage/print_binglog_test.go
index 7d55a85f17071e214ce2ef9e4939a80db56be070..bf5b07d93fec80976df48fddca7cb11285c66e41 100644
--- a/internal/storage/print_binglog_test.go
+++ b/internal/storage/print_binglog_test.go
@@ -281,7 +281,7 @@ func TestPrintBinlogFiles(t *testing.T) {
 			},
 		},
 	}
-	firstBlobs, err := insertCodec.Serialize(1, 1, insertDataFirst)
+	firstBlobs, _, err := insertCodec.Serialize(1, 1, insertDataFirst)
 	assert.Nil(t, err)
 	var binlogFiles []string
 	for index, blob := range firstBlobs {
@@ -296,7 +296,7 @@ func TestPrintBinlogFiles(t *testing.T) {
 		err = fd.Close()
 		assert.Nil(t, err)
 	}
-	secondBlobs, err := insertCodec.Serialize(1, 1, insertDataSecond)
+	secondBlobs, _, err := insertCodec.Serialize(1, 1, insertDataSecond)
 	assert.Nil(t, err)
 	for index, blob := range secondBlobs {
 		blob.Key = fmt.Sprintf("1/insert_log/2/3/4/5/%d", 99)
diff --git a/internal/storage/stats.go b/internal/storage/stats.go
new file mode 100644
index 0000000000000000000000000000000000000000..d192074d8b3c18bfef7edd1fe34807d0088efc7d
--- /dev/null
+++ b/internal/storage/stats.go
@@ -0,0 +1,63 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+package storage
+
+import (
+	"encoding/json"
+)
+
+type Int64Stats struct {
+	Max int64 `json:"max"`
+	Min int64 `json:"min"`
+}
+
+type StatsWriter struct {
+	buffer []byte
+}
+
+func (sw *StatsWriter) GetBuffer() []byte {
+	return sw.buffer
+}
+
+// StatsInt64 serializes min/max stats for msgs, which are assumed to be sorted ascending.
+func (sw *StatsWriter) StatsInt64(msgs []int64) error {
+	if len(msgs) < 1 {
+		// TODO: return an error; msgs must have at least one element
+		return nil
+	}
+
+	stats := &Int64Stats{
+		Max: msgs[len(msgs)-1],
+		Min: msgs[0],
+	}
+	b, err := json.Marshal(stats)
+	if err != nil {
+		return err
+	}
+	sw.buffer = b
+
+	return nil
+}
+
+type StatsReader struct {
+	buffer []byte
+}
+
+func (sr *StatsReader) SetBuffer(buffer []byte) {
+	sr.buffer = buffer
+}
+
+func (sr *StatsReader) GetInt64Stats() Int64Stats {
+	stats := Int64Stats{}
+	json.Unmarshal(sr.buffer, &stats)
+	return stats
+}
diff --git a/internal/storage/stats_test.go b/internal/storage/stats_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..7275c50ddda4dbe0523efdfd223f425365ba4dba
--- /dev/null
+++ b/internal/storage/stats_test.go
@@ -0,0 +1,37 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+package storage
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestStatsInt64(t *testing.T) {
+	data := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}
+	sw := &StatsWriter{}
+	err := sw.StatsInt64(data)
+	assert.NoError(t, err)
+	b := sw.GetBuffer()
+
+	assert.Equal(t, string(b), `{"max":9,"min":1}`)
+
+	sr := &StatsReader{}
+	sr.SetBuffer(b)
+	stats := sr.GetInt64Stats()
+	expectedStats := Int64Stats{
+		Max: 9,
+		Min: 1,
+	}
+	assert.Equal(t, stats, expectedStats)
+}
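
For review context, a minimal caller-side sketch of the new Serialize contract. reviewSerializeSketch is a hypothetical helper, not part of this patch, assumed to sit inside the storage package; the insert_log/stats_log prefixes stand in for the configured root paths.

package storage

// reviewSerializeSketch drains the two blob lists returned by the new
// Serialize: one insert binlog and one stats blob per schema field.
func reviewSerializeSketch(codec *InsertCodec, partitionID, segmentID UniqueID, data *InsertData) (map[string][]byte, error) {
	binLogs, statsBinlogs, err := codec.Serialize(partitionID, segmentID, data)
	if err != nil {
		return nil, err
	}
	out := make(map[string][]byte, len(binLogs)+len(statsBinlogs))
	for _, b := range binLogs {
		// b.Key is the fieldID rendered in decimal, e.g. "107"
		out["insert_log/"+b.Key] = b.Value
	}
	for _, b := range statsBinlogs {
		// only Int64 fields currently carry a JSON payload such as {"max":9,"min":0}
		out["stats_log/"+b.Key] = b.Value
	}
	return out, nil
}

Both slices are appended in the same per-field loop with the same blobKey, so callers can rely on matching keys between the two lists.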
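The flushSegment change keys each field's insert binlog and stats binlog with the same allocated log index. keyLayoutSketch below is a hypothetical datanode-package helper, not in the patch, that makes the shared layout explicit; it assumes only the binlogMeta.allocID and genKey methods and the Params paths touched here.

package datanode

import "path"

// keyLayoutSketch returns the insert-binlog and stats-binlog keys that
// flushSegment would write for one field: the same ID tuple and logidx
// joined under two different root paths.
func keyLayoutSketch(bm *binlogMeta, collID, partID, segID, fieldID UniqueID) (string, string, error) {
	logidx, err := bm.allocID()
	if err != nil {
		return "", "", err
	}
	// with alloc=false genKey only joins the provided IDs, so the error can be ignored
	k, _ := bm.genKey(false, collID, partID, segID, fieldID, logidx)
	insertKey := path.Join(Params.InsertBinlogRootPath, k)
	statsKey := path.Join(Params.StatsBinlogRootPath, k)
	return insertKey, statsKey, nil
}

This is why TestFlushSegmentTxn can locate the stats blob via LoadWithPrefix on the key built from the same ID tuple without knowing the allocated logidx.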
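Finally, a hypothetical round-trip of the stats payload itself, using only the StatsWriter and StatsReader types from the new stats.go and placed in the storage package. Note that StatsInt64 takes Min from the first element and Max from the last, so the input slice is assumed to be sorted ascending.

package storage

import "fmt"

// statsRoundTripSketch writes an Int64 stats payload and reads it back.
func statsRoundTripSketch() {
	sw := &StatsWriter{}
	if err := sw.StatsInt64([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); err != nil {
		panic(err)
	}
	fmt.Println(string(sw.GetBuffer())) // {"max":9,"min":0}

	sr := &StatsReader{}
	sr.SetBuffer(sw.GetBuffer())
	stats := sr.GetInt64Stats()
	fmt.Println(stats.Min, stats.Max) // 0 9
}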