diff --git a/cmd/distributed/components/master_service.go b/cmd/distributed/components/master_service.go index 2bdb31f87e97418dc44b09ea8a47dd8742d99bd1..b059a84631d0c13314387cf4730e993d6bff068a 100644 --- a/cmd/distributed/components/master_service.go +++ b/cmd/distributed/components/master_service.go @@ -109,12 +109,16 @@ func NewMasterService(ctx context.Context) (*MasterService, error) { } qs.Params.Init() - log.Printf("query service address = %s:%d", qs.Params.Address, qs.Params.Port) - queryService, err := qsc.NewClient(fmt.Sprintf("%s:%d", qs.Params.Address, qs.Params.Port), time.Duration(ms.Params.Timeout)*time.Second) + queryService, err := qsc.NewClient(qs.Params.Address, time.Duration(ms.Params.Timeout)*time.Second) if err != nil { return nil, err } - + if err = queryService.Init(); err != nil { + return nil, err + } + if err = queryService.Start(); err != nil { + return nil, err + } if err = svr.SetQueryService(queryService); err != nil { return nil, err } diff --git a/cmd/distributed/components/query_node.go b/cmd/distributed/components/query_node.go index baa96e83a502074f48f4c438414422cb95d2e78e..c9009f282e41259aed328648556a96b5a81ca177 100644 --- a/cmd/distributed/components/query_node.go +++ b/cmd/distributed/components/query_node.go @@ -215,7 +215,6 @@ func NewQueryNode(ctx context.Context) (*QueryNode, error) { indexService: indexService, queryService: queryService, }, nil - } func (q *QueryNode) Run() error { diff --git a/cmd/distributed/components/query_service.go b/cmd/distributed/components/query_service.go index a57ba91d401607c4cc068988714140ae205792a4..78337cd10dc3f35b9946160c7108f58fbdf2070c 100644 --- a/cmd/distributed/components/query_service.go +++ b/cmd/distributed/components/query_service.go @@ -69,7 +69,7 @@ func NewQueryService(ctx context.Context) (*QueryService, error) { if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { continue } - if msStates.State.StateCode != internalpb2.StateCode_INITIALIZING && msStates.State.StateCode != internalpb2.StateCode_HEALTHY { + if msStates.State.StateCode != internalpb2.StateCode_HEALTHY && msStates.State.StateCode != internalpb2.StateCode_INITIALIZING { continue } break diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 9fb4f945878fb83b7892e5913e9d969847e7d933..a9ff30b1e5579ef3da280d109194c42648ca388a 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -546,8 +546,7 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID) error { } msgPack.Msgs = append(msgPack.Msgs, msg) - return ibNode.timeTickStream.Produce(&msgPack) - + return ibNode.completeFlushStream.Produce(&msgPack) } func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { diff --git a/internal/dataservice/dd_handler.go b/internal/dataservice/dd_handler.go index 9e6ad37faa7c7e1efb639d1fafcdd30ee80b497d..b15602e3eaca4ef97ff3f8b99b170939b5298026 100644 --- a/internal/dataservice/dd_handler.go +++ b/internal/dataservice/dd_handler.go @@ -1,8 +1,6 @@ package dataservice import ( - "fmt" - "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -37,7 +35,7 @@ func (handler *ddHandler) HandleDDMsg(msg msgstream.TsMsg) error { realMsg := msg.(*msgstream.DropPartitionMsg) return handler.handleDropPartition(realMsg) default: - return fmt.Errorf("unknown msg type: %v", msg.Type()) + return nil } } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 12d9b8a06e5b5a3ccaa3b40938e97f1ec21e5107..e1e90cfa834fc9936825dac50df9f63f4c4d4b49 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -143,10 +143,10 @@ func (s *Server) Start() error { } s.waitDataNodeRegister() s.cluster.WatchInsertChannels(s.insertChannels) - s.startServerLoop() if err = s.initMsgProducer(); err != nil { return err } + s.startServerLoop() s.state.Store(internalpb2.StateCode_HEALTHY) log.Println("start success") return nil @@ -185,6 +185,8 @@ func (s *Server) initMsgProducer() error { } s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName) s.ttMsgStream.Start() + s.ttBarrier = timesync.NewHardTimeTickBarrier(s.ctx, s.ttMsgStream, s.cluster.GetNodeIDs()) + s.ttBarrier.Start() if s.k2sMsgStream, err = factory.NewMsgStream(s.ctx); err != nil { return err } @@ -298,11 +300,10 @@ func (s *Server) checkMasterIsHealthy() error { func (s *Server) startServerLoop() { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - s.serverLoopWg.Add(4) + s.serverLoopWg.Add(3) go s.startStatsChannel(s.serverLoopCtx) go s.startSegmentFlushChannel(s.serverLoopCtx) go s.startDDChannel(s.serverLoopCtx) - go s.startTTBarrier(s.serverLoopCtx) } func (s *Server) startStatsChannel(ctx context.Context) { @@ -390,12 +391,6 @@ func (s *Server) startDDChannel(ctx context.Context) { } } -func (s *Server) startTTBarrier(ctx context.Context) { - defer s.serverLoopWg.Done() - s.ttBarrier = timesync.NewHardTimeTickBarrier(ctx, s.ttMsgStream, s.cluster.GetNodeIDs()) - s.ttBarrier.StartBackgroundLoop() -} - func (s *Server) waitDataNodeRegister() { log.Println("waiting data node to register") <-s.registerFinishCh @@ -404,6 +399,7 @@ func (s *Server) waitDataNodeRegister() { func (s *Server) Stop() error { s.cluster.ShutDownClients() + s.ttBarrier.Close() s.ttMsgStream.Close() s.k2sMsgStream.Close() s.msgProducer.Close() diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go index 5c7bda36bade97423503dd29d8439d9bb83d9bbb..02159eae587dd7bff6de52a502f16c7264d2d736 100644 --- a/internal/dataservice/watcher.go +++ b/internal/dataservice/watcher.go @@ -100,7 +100,7 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic } watcher.cluster.FlushSegment(&datapb.FlushSegRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kShowCollections, + MsgType: commonpb.MsgType_kFlush, MsgID: -1, // todo add msg id Timestamp: 0, // todo SourceID: Params.NodeID, diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index f659e1a9a291ed026e13f0e33b429bcc8b51ae1e..0a0f6e6152b96eb7568b1cb93c70d5be43703230 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -35,7 +35,7 @@ func NewServer(ctx context.Context) (*Server, error) { qn.Params.Init() s.grpcServer = grpc.NewServer() querypb.RegisterQueryNodeServer(s.grpcServer, s) - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", qn.Params.QueryNodePort)) + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", qn.Params.QueryNodeIP, qn.Params.QueryNodePort)) if err != nil { return nil, err } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index d55b47ba74063ab29cab5b5ca18fe1a0fcc6c370..3bf6c5abcb073318d54b92fdc67a09a3c219fa72 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -109,7 +109,6 @@ func Init() { } func (node *QueryNode) Init() error { - Params.Init() registerReq := &queryPb.RegisterNodeRequest{ Address: &commonpb.Address{ Ip: Params.QueryNodeIP, diff --git a/internal/timesync/timesync.go b/internal/timesync/timesync.go index 9df5c1e93222291ced1b6a0703539ce67b8c0f46..b61b5ea3daf949f731b8a535a9061ad6366fc526 100644 --- a/internal/timesync/timesync.go +++ b/internal/timesync/timesync.go @@ -4,6 +4,7 @@ import ( "context" "log" "math" + "sync" "sync/atomic" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" @@ -18,7 +19,8 @@ type ( TimeTickBarrier interface { GetTimeTick() (Timestamp, error) - StartBackgroundLoop() + Start() + Close() } softTimeTickBarrier struct { @@ -31,10 +33,13 @@ type ( } hardTimeTickBarrier struct { - peer2Tt map[UniqueID]Timestamp - outTt chan Timestamp - ttStream ms.MsgStream - ctx context.Context + peer2Tt map[UniqueID]Timestamp + outTt chan Timestamp + ttStream ms.MsgStream + ctx context.Context + wg sync.WaitGroup + loopCtx context.Context + loopCancel context.CancelFunc } ) @@ -80,7 +85,7 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) { } } -func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop() { +func (ttBarrier *softTimeTickBarrier) Start() { for { select { case <-ttBarrier.ctx.Done(): @@ -137,44 +142,57 @@ func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) { } } -func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop() { +func (ttBarrier *hardTimeTickBarrier) Start() { // Last timestamp synchronized + ttBarrier.wg.Add(1) + ttBarrier.loopCtx, ttBarrier.loopCancel = context.WithCancel(ttBarrier.ctx) state := Timestamp(0) - for { - select { - case <-ttBarrier.ctx.Done(): - log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err()) - return - default: - } - ttmsgs := ttBarrier.ttStream.Consume() - if len(ttmsgs.Msgs) > 0 { - for _, timetickmsg := range ttmsgs.Msgs { - // Suppose ttmsg.Timestamp from stream is always larger than the previous one, - // that `ttmsg.Timestamp > oldT` - ttmsg := timetickmsg.(*ms.TimeTickMsg) - - oldT, ok := ttBarrier.peer2Tt[ttmsg.Base.SourceID] - if !ok { - log.Printf("[hardTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.Base.SourceID) - continue - } + go func(ctx context.Context) { + defer ttBarrier.wg.Done() + for { + select { + case <-ctx.Done(): + log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err()) + return + default: + } + ttmsgs := ttBarrier.ttStream.Consume() + if len(ttmsgs.Msgs) > 0 { + log.Printf("receive tt msg") + for _, timetickmsg := range ttmsgs.Msgs { + // Suppose ttmsg.Timestamp from stream is always larger than the previous one, + // that `ttmsg.Timestamp > oldT` + ttmsg := timetickmsg.(*ms.TimeTickMsg) + + oldT, ok := ttBarrier.peer2Tt[ttmsg.Base.SourceID] + if !ok { + log.Printf("[hardTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.Base.SourceID) + continue + } - if oldT > state { - log.Printf("[hardTimeTickBarrier] Warning: peer(%d) timestamp(%d) ahead\n", - ttmsg.Base.SourceID, ttmsg.Base.Timestamp) - } + if oldT > state { + log.Printf("[hardTimeTickBarrier] Warning: peer(%d) timestamp(%d) ahead\n", + ttmsg.Base.SourceID, ttmsg.Base.Timestamp) + } - ttBarrier.peer2Tt[ttmsg.Base.SourceID] = ttmsg.Base.Timestamp + ttBarrier.peer2Tt[ttmsg.Base.SourceID] = ttmsg.Base.Timestamp - newState := ttBarrier.minTimestamp() - if newState > state { - ttBarrier.outTt <- newState - state = newState + newState := ttBarrier.minTimestamp() + log.Printf("new state %d", newState) + if newState > state { + ttBarrier.outTt <- newState + log.Printf("outtttt") + state = newState + } } } } - } + }(ttBarrier.loopCtx) +} + +func (ttBarrier *hardTimeTickBarrier) Close() { + ttBarrier.loopCancel() + ttBarrier.wg.Wait() } func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {