diff --git a/internal/writenode/flow_graph_insert_buffer_node.go b/internal/writenode/flow_graph_insert_buffer_node.go index e1753884a5732ab1162446b3b87f29a33c75af94..e676f5a2e0afdee2118414ebdd0676e3b97fd96a 100644 --- a/internal/writenode/flow_graph_insert_buffer_node.go +++ b/internal/writenode/flow_graph_insert_buffer_node.go @@ -16,7 +16,9 @@ import ( "github.com/zilliztech/milvus-distributed/internal/kv" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/storage" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" @@ -34,12 +36,13 @@ type ( insertBufferNode struct { BaseNode - kvClient *etcdkv.EtcdKV - insertBuffer *insertBuffer - minIOKV kv.Base - minioPrifex string - idAllocator *allocator.IDAllocator - outCh chan *insertFlushSyncMsg + kvClient *etcdkv.EtcdKV + insertBuffer *insertBuffer + minIOKV kv.Base + minioPrifex string + idAllocator *allocator.IDAllocator + outCh chan *insertFlushSyncMsg + pulsarWriteNodeTimeTickStream *msgstream.PulsarMsgStream } insertBuffer struct { @@ -426,11 +429,33 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { // Return } + + if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil { + log.Printf("Error: send hard time tick into pulsar channel failed, %s\n", err.Error()) + } + return nil } -func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg) *insertBufferNode { +func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { + msgPack := msgstream.MsgPack{} + timeTickMsg := msgstream.TimeTickMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: ts, + EndTimestamp: ts, + HashValues: []uint32{0}, + }, + TimeTickMsg: internalpb.TimeTickMsg{ + MsgType: internalpb.MsgType_kTimeTick, + PeerID: Params.WriteNodeID, + Timestamp: ts, + }, + } + msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg) + return ibNode.pulsarWriteNodeTimeTickStream.Produce(&msgPack) +} +func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg) *insertBufferNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -482,13 +507,18 @@ func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg) *i panic(err) } + wTt := msgstream.NewPulsarMsgStream(ctx, 1024) //input stream, write node time tick + wTt.SetPulsarClient(Params.PulsarAddress) + wTt.CreatePulsarProducers([]string{Params.WriteNodeTimeTickChannelName}) + return &insertBufferNode{ - BaseNode: baseNode, - kvClient: kvClient, - insertBuffer: iBuffer, - minIOKV: minIOKV, - minioPrifex: minioPrefix, - idAllocator: idAllocator, - outCh: outCh, + BaseNode: baseNode, + kvClient: kvClient, + insertBuffer: iBuffer, + minIOKV: minIOKV, + minioPrifex: minioPrefix, + idAllocator: idAllocator, + outCh: outCh, + pulsarWriteNodeTimeTickStream: wTt, } } diff --git a/internal/writenode/param_table.go b/internal/writenode/param_table.go index 375733cd072c7f367f8fdd36604ca4e328ed3c1b..c1e9bae2ede8e079bc97797d0bc050565008db35 100644 --- a/internal/writenode/param_table.go +++ b/internal/writenode/param_table.go @@ -260,7 +260,7 @@ func (p *ParamTable) initWriteNodeTimeTickChannelName() { if err != nil { panic(err) } - p.WriteNodeTimeTickChannelName = channels + p.WriteNodeTimeTickChannelName = channels + "-" + strconv.FormatInt(p.WriteNodeID, 10) } func (p *ParamTable) initSliceIndex() { diff --git a/internal/writenode/param_table_test.go b/internal/writenode/param_table_test.go index 10367b596b5c4b9d5fe471fc6d8e15f74913ad58..fe84cb7a2d31a833ba6f4d0088b6a3ddf9f5733a 100644 --- a/internal/writenode/param_table_test.go +++ b/internal/writenode/param_table_test.go @@ -64,7 +64,7 @@ func TestParamTable_WriteNode(t *testing.T) { t.Run("Test timeTickChannelName", func(t *testing.T) { name := Params.WriteNodeTimeTickChannelName - assert.Equal(t, name, "writeNodeTimeTick") + assert.Equal(t, name, "writeNodeTimeTick-3") }) t.Run("Test minioAccessKeyID", func(t *testing.T) {