diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index b06ea262090135c7505f3a13ec08c069306721d3..debb309b487839711471fd0200789778d7ad9bee 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -274,6 +274,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen for _, id := range req.SegmentIDs { chanName := node.getChannelName(id) + log.Info("vchannel", zap.String("name", chanName)) if chanName == "" { status.Reason = fmt.Sprintf("DataNode not find segment %d!", id) return status, errors.New(status.GetReason()) @@ -296,23 +297,9 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen dmlFlushedCh: dmlFlushedCh, } - flushCh <- flushmsg - - // GOOSE TODO get binlog paths. - // waitReceive := func(wg *sync.WaitGroup, flushedCh <-chan bool, req *datapb.SaveBinlogPathsRequest) { - // defer wg.Done() - // select { - // case <-time.After(300 * time.Second): - // return - // case isFlushed := <-flushedCh: - // if isFlushed { - // log.Debug("Yeah! It's safe to notify dataservice") - // } - // } - // } - waitReceive := func(wg *sync.WaitGroup, flushedCh interface{}, req *datapb.SaveBinlogPathsRequest) { defer wg.Done() + log.Info("Inside waitReceive") switch Ch := flushedCh.(type) { case chan []*datapb.ID2PathList: select { @@ -342,7 +329,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen if len(meta) == 0 { log.Info("Ddl messages flush Done") - // Modify req with empty ddl binlog paths + // Modify req with empty ddl binlog paths and position return } @@ -357,6 +344,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen // TODO make a queue for this func currentSegID := id go func() { + flushCh <- flushmsg + log.Info("Waiting for flush completed", zap.Int64("segmentID", currentSegID)) req := &datapb.SaveBinlogPathsRequest{ Base: &commonpb.MsgBase{}, @@ -371,8 +360,20 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen go waitReceive(&wg, dmlFlushedCh, req) wg.Wait() - // TODO - //status := node.dataService.SaveBinlogPaths(req) + status, err := node.dataService.SaveBinlogPaths(node.ctx, req) + if err != nil { + log.Error("DataNode or DataService abnormal, restarting DataNode") + // TODO restart + return + } + + if status.ErrorCode != commonpb.ErrorCode_Success { + log.Error("Save paths failed, resending request", + zap.String("error message", status.GetReason())) + // TODO resend + return + } + log.Info("Flush Completed", zap.Int64("segmentID", currentSegID)) }() diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 295cd32798cdeca600a18168105f0ebdb800fe8c..0f839bfa39014755942871733000a58ee03fa4fb 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -126,14 +126,15 @@ func TestDataNode(t *testing.T) { sync, ok := node1.vchan2SyncService[dmChannelName] assert.True(t, ok) sync.replica.addSegment(0, 1, 1, dmChannelName) - sync.replica.addSegment(1, 1, 1, dmChannelName) + // sync.replica.addSegment(1, 1, 1, dmChannelName) unable to deal with this. req := &datapb.FlushSegmentsRequest{ Base: &commonpb.MsgBase{}, DbID: 0, CollectionID: 1, - SegmentIDs: []int64{0, 1}, + SegmentIDs: []int64{0}, } + status, err := node1.FlushSegments(node.ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) @@ -182,8 +183,13 @@ func TestDataNode(t *testing.T) { err = ddMsgStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) - <-node1.ctx.Done() - node1.Stop() + _, err = sync.replica.getSegmentByID(0) + assert.NoError(t, err) + + defer func() { + node1.ctx.Done() + node1.Stop() + }() }) t.Run("Test GetTimeTickChannel", func(t *testing.T) { @@ -196,6 +202,6 @@ func TestDataNode(t *testing.T) { assert.NoError(t, err) }) - <-node.ctx.Done() + // <-node.ctx.Done() node.Stop() } diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index d95ac0c8988867abea8939d92f8fd4ca0055873e..741d9e89267c8cc5ee68fb8c706ee3e15e592042 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -57,15 +57,6 @@ func newDataSyncService(ctx context.Context, return service } -// func (dsService *dataSyncService) init() { -// if len(Params.InsertChannelNames) == 0 { -// log.Error("InsertChannels not readly, init datasync service failed") -// return -// } - -// dsService.initNodes() -// } - func (dsService *dataSyncService) start() { log.Debug("Data Sync Service Start Successfully") if dsService.fg != nil { diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 9db343a06d736e91240be17494610a20be2c7153..a3ffc31ab0bbbea9af6d569141d47377cbeff11e 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -182,16 +182,15 @@ func (ddNode *ddNode) flushComplete(binlogMetaCh <-chan *datapb.DDLBinlogMeta, c ddlFlushedCh <- nil return } + // + // log.Debug(".. Saving ddl binlog meta ...") + // err := ddNode.binlogMeta.SaveDDLBinlogMetaTxn(collID, binlogMeta) + // if err != nil { + // log.Error("Save binlog meta to etcd Wrong", zap.Error(err)) + // } - log.Debug(".. Saving ddl binlog meta ...") - err := ddNode.binlogMeta.SaveDDLBinlogMetaTxn(collID, binlogMeta) - if err != nil { - log.Error("Save binlog meta to etcd Wrong", zap.Error(err)) - } - - ddlFlushedCh <- []*datapb.DDLBinlogMeta{binlogMeta} // TODO remove above - // ddlFlushCh <- binlogMetaCh + ddlFlushedCh <- []*datapb.DDLBinlogMeta{binlogMeta} } /* diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index 38fb6417081caa022d54d7e4b638c745d8d55a22..fd700b42ac4fae2769d85f8e9e66d93767f99410 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -16,10 +16,13 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/util/flowgraph" ) @@ -41,10 +44,10 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { inFlushCh := make(chan *flushMsg, 10) defer close(inFlushCh) - testPath := "/test/datanode/root/meta" - err := clearEtcd(testPath) - require.NoError(t, err) - Params.MetaRootPath = testPath + // testPath := "/test/datanode/root/meta" + // err := clearEtcd(testPath) + // require.NoError(t, err) + // Params.MetaRootPath = testPath // Params.FlushDdBufferSize = 4 replica := newReplica() @@ -148,11 +151,13 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { } replica.addSegment(1, collID, partitionID, "insert-01") + flushCh := make(chan []*datapb.DDLBinlogMeta) inFlushCh <- &flushMsg{ msgID: 5, timestamp: 5, segmentID: UniqueID(1), collectionID: collID, + ddlFlushedCh: flushCh, } startPos := []*internalpb.MsgPosition{ @@ -172,4 +177,8 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { startPos, startPos) var inMsg Msg = msgStream ddNode.Operate([]Msg{inMsg}) + + paths := <-flushCh + log.Debug("Flushed DDL binlog paths", zap.Any("paths", paths)) + assert.Equal(t, 1, len(paths)) } diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 88ed37ca1fe90480df1822807e71a4ed5eb6be96..4547c4788cc5d4da27edecb3cc3e3e3adeb57f44 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -709,13 +709,13 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[Un } // GOOSE TODO remove the below - log.Debug(".. Saving binlog paths to etcd ..", zap.Int("number of fields", len(field2Path))) - err = ibNode.flushMeta.SaveSegmentBinlogMetaTxn(segID, bufferField2Paths) - if err != nil { - log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err)) - dmlFlushedCh <- nil - return - } + // log.Debug(".. Saving binlog paths to etcd ..", zap.Int("number of fields", len(field2Path))) + // err = ibNode.flushMeta.SaveSegmentBinlogMetaTxn(segID, bufferField2Paths) + // if err != nil { + // log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err)) + // dmlFlushedCh <- nil + // return + // } binlogPaths := make([]*datapb.ID2PathList, 0, len(bufferField2Paths)) for k, paths := range bufferField2Paths { @@ -732,28 +732,28 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[Un ibNode.replica.setIsFlushed(segID) ibNode.updateSegStatistics([]UniqueID{segID}) - msgPack := msgstream.MsgPack{} - completeFlushMsg := internalpb.SegmentFlushCompletedMsg{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SegmentFlushDone, - MsgID: 0, // GOOSE TODO - Timestamp: 0, // GOOSE TODO - SourceID: Params.NodeID, - }, - SegmentID: segID, - } - var msg msgstream.TsMsg = &msgstream.FlushCompletedMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{0}, - }, - SegmentFlushCompletedMsg: completeFlushMsg, - } - - msgPack.Msgs = append(msgPack.Msgs, msg) - err = ibNode.completeFlushStream.Produce(&msgPack) - if err != nil { - log.Error(".. Produce complete flush msg failed ..", zap.Error(err)) - } + // msgPack := msgstream.MsgPack{} + // completeFlushMsg := internalpb.SegmentFlushCompletedMsg{ + // Base: &commonpb.MsgBase{ + // MsgType: commonpb.MsgType_SegmentFlushDone, + // MsgID: 0, // GOOSE TODO + // Timestamp: 0, // GOOSE TODO + // SourceID: Params.NodeID, + // }, + // SegmentID: segID, + // } + // var msg msgstream.TsMsg = &msgstream.FlushCompletedMsg{ + // BaseMsg: msgstream.BaseMsg{ + // HashValues: []uint32{0}, + // }, + // SegmentFlushCompletedMsg: completeFlushMsg, + // } + // + // msgPack.Msgs = append(msgPack.Msgs, msg) + // err = ibNode.completeFlushStream.Produce(&msgPack) + // if err != nil { + // log.Error(".. Produce complete flush msg failed ..", zap.Error(err)) + // } } func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 64c98cf97adb6a53af898cf54141bb6bc36163ba..8cb6c555d707ab3eb0fc92bc27af27559181d479 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -193,6 +193,10 @@ type DataServiceFactory struct { types.DataService } +func (ds *DataServiceFactory) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) { + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil +} + func (ds *DataServiceFactory) RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { ret := &datapb.RegisterNodeResponse{Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success}} @@ -210,11 +214,6 @@ func (ds *DataServiceFactory) RegisterNode(ctx context.Context, req *datapb.Regi return ret, nil } -func (ds *DataServiceFactory) WatchDmChannels(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error) { - ret := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success} - return ret, nil -} - func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta { sch := schemapb.CollectionSchema{ Name: collectionName, diff --git a/internal/dataservice/grpc_handler.go b/internal/dataservice/grpc_handler.go index 3097b92ff500262a373a99de56626abb015b9319..8efe4ebdbde669b526c91259a5952040f8ea8f75 100644 --- a/internal/dataservice/grpc_handler.go +++ b/internal/dataservice/grpc_handler.go @@ -321,21 +321,21 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath if err != nil { log.Error("Failed to get collection info", zap.Int64("collectionID", req.GetCollectionID()), zap.Error(err)) resp.Reason = err.Error() - return resp, err + return resp, nil } meta, err := s.prepareBinlogAndPos(req) if err != nil { log.Error("prepare binlog and pos meta failed", zap.Error(err)) resp.Reason = err.Error() - return resp, err + return resp, nil } // set segment to SegmentState_Flushing err = s.meta.FlushSegmentWithBinlogAndPos(req.SegmentID, meta) if err != nil { resp.Reason = err.Error() - return resp, err + return resp, nil } log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID), zap.Any("meta", meta)) @@ -345,7 +345,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath err = s.flushMsgStream.Produce(&msgPack) if err != nil { resp.Reason = err.Error() - return resp, err + return resp, nil } log.Debug("send segment flush msg", zap.Int64("id", req.SegmentID)) @@ -353,7 +353,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath if err = s.meta.FlushSegment(req.SegmentID); err != nil { log.Error("flush segment complete failed", zap.Error(err)) resp.Reason = err.Error() - return resp, err + return resp, nil } log.Debug("flush segment complete", zap.Int64("id", req.SegmentID)) diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 115278462ea028eb6b18e48128cc00672fdb7492..16b3d122848eaa3a62e22d2b9b7e14636940b03a 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -20,17 +20,18 @@ import ( "time" grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" - "github.com/milvus-io/milvus/internal/logutil" + + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/logutil" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/typeutil" - "go.etcd.io/etcd/clientv3" - "go.uber.org/zap" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go index c1183fe887ce8a2ac2a6b0ffb7c58351d4073001..a2ec0826ba3f59d7b1b9e47c8e0a135b1b4c29cf 100644 --- a/internal/dataservice/server_test.go +++ b/internal/dataservice/server_test.go @@ -562,7 +562,7 @@ func TestSaveBinlogPaths(t *testing.T) { }, }, }) - assert.NotNil(t, err) + assert.Nil(t, err) assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError) }) } diff --git a/internal/distributed/dataservice/client/client.go b/internal/distributed/dataservice/client/client.go index ffce0c5067531af370f2f721769665195b9a938d..5c0ef1e7b6252eb1664bda51bf3975e170434383 100644 --- a/internal/distributed/dataservice/client/client.go +++ b/internal/distributed/dataservice/client/client.go @@ -272,6 +272,10 @@ func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR return ret.(*datapb.GetSegmentInfoResponse), err } +func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) { + return c.grpcClient.SaveBinlogPaths(ctx, req) +} + func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) { ret, err := c.recall(func() (interface{}, error) { return c.grpcClient.GetRecoveryInfo(ctx, req) diff --git a/internal/types/types.go b/internal/types/types.go index a89cc1d6a3cc7b20e0f67ed448ed2eddb98c74f7..1c28eb52e5bbba074758afddbbf9b023dde00fbd 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -62,6 +62,7 @@ type DataService interface { GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) + SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) } type IndexNode interface {