From aaac839075e80c564555555281113b3be4be0f2d Mon Sep 17 00:00:00 2001 From: xige-16 <xi.ge@zilliz.com> Date: Thu, 4 Feb 2021 17:09:18 +0800 Subject: [PATCH] Joint debug query service success with other components Signed-off-by: xige-16 <xi.ge@zilliz.com> --- internal/distributed/queryservice/service.go | 2 +- internal/proto/query_service.proto | 2 +- internal/proto/querypb/query_service.pb.go | 189 ++++++----- internal/querynode/query_node.go | 22 +- internal/queryservice/meta_replica.go | 88 +++++- internal/queryservice/querynode.go | 20 +- internal/queryservice/queryservice.go | 312 ++++++++++++++----- internal/queryservice/queryservice_test.go | 3 + 8 files changed, 432 insertions(+), 206 deletions(-) diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index b6ddde255..06687170d 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -35,7 +35,7 @@ type Server struct { func (s *Server) Init() error { log.Println("query service init") if err := s.queryService.Init(); err != nil { - panic(err) + return err } s.queryService.SetEnableGrpc(true) return nil diff --git a/internal/proto/query_service.proto b/internal/proto/query_service.proto index 7a8b837b4..aeba557f5 100644 --- a/internal/proto/query_service.proto +++ b/internal/proto/query_service.proto @@ -124,7 +124,7 @@ message LoadSegmentRequest { int64 partitionID = 4; repeated int64 segmentIDs = 5; repeated int64 fieldIDs = 6; - data.SegmentStateInfo last_segment_state = 7; + repeated data.SegmentStateInfo segment_states = 7; } message ReleaseSegmentRequest { diff --git a/internal/proto/querypb/query_service.pb.go b/internal/proto/querypb/query_service.pb.go index 64d1bb5b4..833984a93 100644 --- a/internal/proto/querypb/query_service.pb.go +++ b/internal/proto/querypb/query_service.pb.go @@ -972,16 +972,16 @@ func (m *WatchDmChannelsRequest) GetChannelIDs() []string { } type LoadSegmentRequest struct { - Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` - DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"` - CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"` - PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"` - SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"` - FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"` - LastSegmentState *datapb.SegmentStateInfo `protobuf:"bytes,7,opt,name=last_segment_state,json=lastSegmentState,proto3" json:"last_segment_state,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"` + CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"` + PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"` + SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"` + FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"` + SegmentStates []*datapb.SegmentStateInfo `protobuf:"bytes,7,rep,name=segment_states,json=segmentStates,proto3" json:"segment_states,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *LoadSegmentRequest) Reset() { *m = LoadSegmentRequest{} } @@ -1051,9 +1051,9 @@ func (m *LoadSegmentRequest) GetFieldIDs() []int64 { return nil } -func (m *LoadSegmentRequest) GetLastSegmentState() *datapb.SegmentStateInfo { +func (m *LoadSegmentRequest) GetSegmentStates() []*datapb.SegmentStateInfo { if m != nil { - return m.LastSegmentState + return m.SegmentStates } return nil } @@ -1395,91 +1395,90 @@ func init() { func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) } var fileDescriptor_5fcb6756dc1afb8d = []byte{ - // 1331 bytes of a gzipped FileDescriptorProto + // 1324 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x5f, 0x73, 0xdb, 0x44, 0x10, 0xb7, 0x6c, 0xc7, 0xa9, 0x37, 0xae, 0xed, 0x5e, 0xfe, 0x19, 0x51, 0x4a, 0x39, 0xa0, 0x4d, - 0x13, 0x70, 0x98, 0x74, 0x60, 0x78, 0x82, 0x49, 0xe2, 0x4e, 0xc6, 0x33, 0x34, 0xa4, 0x72, 0x3a, - 0x1d, 0x02, 0x1d, 0x23, 0x4b, 0x17, 0xe7, 0x5a, 0xfd, 0x71, 0x75, 0xe7, 0xa4, 0xc9, 0x0b, 0x30, - 0xc3, 0x23, 0x03, 0x9f, 0x81, 0x81, 0x81, 0x19, 0x5e, 0xf8, 0x36, 0xbc, 0xf0, 0x02, 0xdf, 0x84, - 0xd1, 0x49, 0x56, 0x24, 0x59, 0x8e, 0x9c, 0xba, 0x69, 0x78, 0xd3, 0x9d, 0xf6, 0xf6, 0xb7, 0xfb, - 0xdb, 0xbd, 0xbd, 0x5d, 0x98, 0x7d, 0xd6, 0x27, 0xce, 0x71, 0x9b, 0x11, 0xe7, 0x90, 0x6a, 0xa4, - 0xde, 0x73, 0x6c, 0x6e, 0x23, 0x64, 0x52, 0xe3, 0xb0, 0xcf, 0xbc, 0x55, 0x5d, 0x48, 0xc8, 0x25, - 0xcd, 0x36, 0x4d, 0xdb, 0xf2, 0xf6, 0xe4, 0x52, 0x58, 0x42, 0x2e, 0x53, 0x8b, 0x13, 0xc7, 0x52, - 0x0d, 0x7f, 0x8d, 0x74, 0x95, 0xab, 0x51, 0x9d, 0xf8, 0x1b, 0x98, 0x55, 0x48, 0x97, 0x32, 0x4e, - 0x9c, 0x6d, 0x5b, 0x27, 0x0a, 0x79, 0xd6, 0x27, 0x8c, 0xa3, 0x0f, 0x20, 0xdf, 0x51, 0x19, 0xa9, - 0x49, 0x37, 0xa5, 0xa5, 0x99, 0xb5, 0xeb, 0xf5, 0x08, 0xb2, 0x0f, 0x79, 0x9f, 0x75, 0x37, 0x54, - 0x46, 0x14, 0x21, 0x89, 0x3e, 0x82, 0x69, 0x55, 0xd7, 0x1d, 0xc2, 0x58, 0x2d, 0x7b, 0xc6, 0xa1, - 0x75, 0x4f, 0x46, 0x19, 0x08, 0xe3, 0x9f, 0x24, 0x98, 0x8b, 0x5a, 0xc0, 0x7a, 0xb6, 0xc5, 0x08, - 0xba, 0x0b, 0x05, 0xc6, 0x55, 0xde, 0x67, 0xbe, 0x11, 0xaf, 0x27, 0xea, 0x6b, 0x09, 0x11, 0xc5, - 0x17, 0x45, 0x1b, 0x30, 0x43, 0x2d, 0xca, 0xdb, 0x3d, 0xd5, 0x51, 0xcd, 0x81, 0x25, 0x6f, 0x45, - 0x4f, 0x06, 0xac, 0x34, 0x2d, 0xca, 0x77, 0x84, 0xa0, 0x02, 0x34, 0xf8, 0xc6, 0x8f, 0x61, 0xbe, - 0x75, 0x60, 0x1f, 0x6d, 0xda, 0x86, 0x41, 0x34, 0x4e, 0x6d, 0xeb, 0xc5, 0x49, 0x41, 0x90, 0xd7, - 0x3b, 0xcd, 0x86, 0xb0, 0x23, 0xa7, 0x88, 0x6f, 0xcc, 0x60, 0x21, 0xae, 0x7e, 0x12, 0x8f, 0xdf, - 0x81, 0xab, 0x5a, 0xa0, 0xaa, 0xd9, 0x70, 0x7d, 0xce, 0x2d, 0xe5, 0x94, 0xe8, 0x26, 0xfe, 0x4e, - 0x82, 0xf9, 0xcf, 0x6c, 0x55, 0xbf, 0x20, 0xa7, 0x10, 0x86, 0x52, 0x18, 0xb0, 0x96, 0x13, 0xff, - 0x22, 0x7b, 0xf8, 0x7b, 0x09, 0x6a, 0x0a, 0x31, 0x88, 0xca, 0xc8, 0x65, 0x9a, 0xf1, 0xad, 0x04, - 0x73, 0x6e, 0x00, 0x76, 0x54, 0x87, 0xd3, 0xcb, 0x31, 0xa1, 0xe7, 0x65, 0x58, 0xc8, 0x82, 0x49, - 0x32, 0x00, 0x43, 0xa9, 0x37, 0xd0, 0x74, 0x9a, 0x00, 0x91, 0x3d, 0x6c, 0x42, 0x25, 0x40, 0x73, - 0x8f, 0x13, 0x86, 0x6e, 0xc2, 0x4c, 0x48, 0x44, 0x00, 0xe6, 0x94, 0xf0, 0x16, 0xfa, 0x18, 0xa6, - 0x5c, 0x08, 0x22, 0xfc, 0x2b, 0xaf, 0xe1, 0xfa, 0x70, 0xfd, 0xa9, 0x47, 0xb5, 0x2a, 0xde, 0x01, - 0xfc, 0x9b, 0x04, 0x0b, 0x31, 0xbc, 0x57, 0xce, 0xf2, 0x10, 0x2f, 0xf9, 0x04, 0x5e, 0xfe, 0x90, - 0x60, 0x71, 0xc8, 0xd0, 0x49, 0x82, 0xb1, 0x07, 0x0b, 0x01, 0x40, 0x5b, 0x27, 0x4c, 0x73, 0x68, - 0xcf, 0xfd, 0xf6, 0xc2, 0x32, 0xb3, 0xf6, 0x76, 0x3a, 0x89, 0x4c, 0x99, 0x0f, 0x54, 0x34, 0x42, - 0x1a, 0xf0, 0xaf, 0x12, 0xcc, 0xb9, 0x97, 0xf8, 0xf2, 0x32, 0x77, 0x2c, 0x4e, 0x7f, 0x97, 0x60, - 0xd1, 0xbf, 0xe7, 0xff, 0x73, 0x4b, 0x7f, 0x96, 0x40, 0xde, 0x74, 0x88, 0xca, 0xc9, 0x03, 0x37, - 0x0e, 0x9b, 0x07, 0xaa, 0x65, 0x11, 0x63, 0xb2, 0x04, 0xb8, 0x0d, 0x15, 0xc7, 0x73, 0xb6, 0xad, - 0x79, 0xfa, 0x84, 0xe9, 0x45, 0xa5, 0xec, 0x6f, 0xfb, 0x28, 0xe8, 0x5d, 0x28, 0x3b, 0x84, 0xf5, - 0x8d, 0x53, 0xb9, 0x9c, 0x90, 0xbb, 0xea, 0xed, 0xfa, 0x62, 0xf8, 0x17, 0x09, 0x16, 0xd7, 0x75, - 0x3d, 0x6c, 0xe0, 0x04, 0x77, 0x69, 0x05, 0xae, 0xc5, 0xac, 0xf3, 0xa9, 0x2d, 0x2a, 0xd5, 0xa8, - 0x7d, 0xcd, 0x06, 0xba, 0x03, 0xd5, 0xa8, 0x85, 0x3e, 0xd5, 0x45, 0xa5, 0x12, 0xb1, 0xb1, 0xd9, - 0xc0, 0x7f, 0x4b, 0x20, 0x2b, 0xc4, 0xb4, 0x0f, 0x49, 0xa2, 0xa1, 0x2f, 0xc4, 0xe4, 0xc0, 0xbb, - 0xec, 0x64, 0xde, 0xe5, 0xce, 0xe1, 0x5d, 0x3e, 0xd9, 0xbb, 0x27, 0xb0, 0xf0, 0x48, 0xe5, 0xda, - 0x41, 0xc3, 0x9c, 0x3c, 0x02, 0x37, 0x00, 0x02, 0x3c, 0xaf, 0x28, 0x14, 0x95, 0xd0, 0x0e, 0xfe, - 0x33, 0x0b, 0xc8, 0xbd, 0xe4, 0x2d, 0xd2, 0x35, 0x89, 0xc5, 0x5f, 0xfd, 0xc5, 0x89, 0xbd, 0x0b, - 0xf9, 0xe1, 0x77, 0xe1, 0x06, 0x00, 0xf3, 0xac, 0x73, 0x5d, 0x98, 0x12, 0x17, 0x2b, 0xb4, 0x83, - 0x64, 0xb8, 0xb2, 0x4f, 0x89, 0xa1, 0xbb, 0x7f, 0x0b, 0xe2, 0x6f, 0xb0, 0x46, 0x0f, 0x00, 0x19, - 0x2a, 0xe3, 0x6d, 0x5f, 0xbc, 0xed, 0x3d, 0x30, 0xd3, 0xc2, 0xab, 0x58, 0x6d, 0x74, 0xbb, 0xd5, - 0xba, 0x4f, 0x83, 0x28, 0x8c, 0x4d, 0x6b, 0xdf, 0x56, 0xaa, 0xee, 0xf1, 0xf0, 0x2e, 0xfe, 0x57, - 0x82, 0x79, 0xbf, 0xde, 0x5c, 0x1a, 0x69, 0x63, 0x54, 0x9b, 0x49, 0x68, 0xc3, 0x3f, 0x4a, 0xb0, - 0xb8, 0x69, 0x9b, 0x3d, 0xdb, 0x1a, 0xb8, 0x3d, 0xe1, 0x3b, 0xf5, 0x89, 0x77, 0x88, 0x0c, 0x7a, - 0xe4, 0x5b, 0x23, 0x7a, 0xe4, 0x38, 0xa8, 0x7f, 0x0a, 0xff, 0x23, 0xc1, 0x8c, 0xcf, 0xb6, 0x1b, - 0x16, 0x74, 0x1d, 0x8a, 0x81, 0x2b, 0x7e, 0x2f, 0x71, 0xba, 0x31, 0x44, 0x61, 0x36, 0x3d, 0xef, - 0x72, 0xc3, 0x79, 0xf7, 0x1a, 0x5c, 0x31, 0x89, 0xd9, 0x66, 0xf4, 0x84, 0xf8, 0x69, 0x39, 0x6d, - 0x12, 0xb3, 0x45, 0x4f, 0x88, 0xfb, 0xcb, 0xea, 0x9b, 0x6d, 0xc7, 0x3e, 0x72, 0x99, 0x15, 0xbf, - 0xac, 0xbe, 0xa9, 0xd8, 0x47, 0x0c, 0xbd, 0x01, 0x40, 0x2d, 0x9d, 0x3c, 0x6f, 0x5b, 0xaa, 0x49, - 0x6a, 0x05, 0x71, 0xc3, 0x8b, 0x62, 0x67, 0x5b, 0x35, 0x09, 0xaa, 0xc1, 0xb4, 0x58, 0x34, 0x1b, - 0x22, 0x0b, 0x73, 0xca, 0x60, 0x89, 0xf7, 0x01, 0x85, 0x3c, 0x9c, 0xe8, 0xc6, 0x87, 0xe2, 0x9e, - 0x8d, 0xc7, 0xdd, 0xed, 0xcd, 0x67, 0x23, 0x40, 0x93, 0xc4, 0xf5, 0x43, 0x98, 0xa2, 0xd6, 0xbe, - 0x3d, 0x68, 0x37, 0xde, 0x4c, 0x6a, 0x37, 0xc2, 0x60, 0x9e, 0xf4, 0xf2, 0x09, 0x94, 0xa3, 0x4d, - 0x08, 0x2a, 0xc1, 0x95, 0x6d, 0x9b, 0xdf, 0x7b, 0x4e, 0x19, 0xaf, 0x66, 0x50, 0x19, 0x60, 0xdb, - 0xe6, 0x3b, 0x0e, 0x61, 0xc4, 0xe2, 0x55, 0x09, 0x01, 0x14, 0x3e, 0xb7, 0x1a, 0x94, 0x3d, 0xad, - 0x66, 0xd1, 0xac, 0xdf, 0x5b, 0xaa, 0x46, 0xd3, 0xba, 0x4f, 0x4c, 0xdb, 0x39, 0xae, 0xe6, 0xdc, - 0xe3, 0xc1, 0x2a, 0x8f, 0xaa, 0x50, 0x0a, 0x44, 0xb6, 0x76, 0x1e, 0x56, 0xa7, 0x50, 0x11, 0xa6, - 0xbc, 0xcf, 0xc2, 0xda, 0x0f, 0x00, 0x25, 0xf1, 0x6a, 0xb4, 0xbc, 0xc9, 0x14, 0x69, 0x50, 0x0a, - 0x4f, 0x84, 0xe8, 0x76, 0x92, 0x13, 0x09, 0x53, 0xab, 0xbc, 0x94, 0x2e, 0xe8, 0x71, 0x8b, 0x33, - 0xe8, 0x09, 0x54, 0xa2, 0x63, 0x18, 0x43, 0x77, 0x12, 0xc9, 0x4a, 0x1a, 0x05, 0xe5, 0xe5, 0x71, - 0x44, 0x03, 0xac, 0x2e, 0x94, 0x23, 0xfd, 0x3e, 0x43, 0x4b, 0xa3, 0xce, 0xc7, 0x3b, 0x26, 0xf9, - 0xce, 0x18, 0x92, 0x01, 0xd0, 0x17, 0x50, 0x8e, 0x34, 0x88, 0x23, 0x80, 0x92, 0x9a, 0x48, 0xf9, - 0xac, 0xf4, 0xc2, 0x19, 0xd4, 0x86, 0x6b, 0xf1, 0xa6, 0x8e, 0xa1, 0x95, 0x64, 0xc2, 0x13, 0x7b, - 0xbf, 0x34, 0x80, 0x3d, 0xcf, 0xf6, 0x53, 0x02, 0x93, 0xe3, 0x91, 0x38, 0xc5, 0xa6, 0xe9, 0xfe, - 0x3a, 0x30, 0x3e, 0xa4, 0xfe, 0xbd, 0x33, 0x8c, 0x3f, 0x37, 0x42, 0x07, 0xd0, 0x70, 0x27, 0x89, - 0xe4, 0xc4, 0x43, 0xf7, 0xcc, 0x1e, 0x3f, 0x96, 0xeb, 0x49, 0xf0, 0xa3, 0xbb, 0x51, 0x9c, 0x41, - 0x8f, 0x00, 0x6d, 0x11, 0xbe, 0x4b, 0x4d, 0xb2, 0x4b, 0xb5, 0xa7, 0xe3, 0x60, 0xc4, 0x5e, 0x54, - 0x7f, 0xd1, 0xe2, 0x0e, 0xb5, 0xba, 0x91, 0xb4, 0x99, 0xdb, 0x22, 0xa2, 0xc2, 0x53, 0xc6, 0xa9, - 0xc6, 0x5e, 0xa2, 0x6a, 0x5b, 0xd8, 0x1c, 0x9f, 0x3d, 0x97, 0xc7, 0x99, 0x82, 0x7c, 0xe2, 0x57, - 0xc6, 0x92, 0x0d, 0x00, 0xf7, 0x04, 0x60, 0xec, 0xd9, 0x3a, 0xd3, 0x93, 0x31, 0x9f, 0x3e, 0x9c, - 0x41, 0x1a, 0x94, 0x5d, 0x9e, 0x42, 0xcf, 0xde, 0xad, 0xb4, 0xfa, 0xea, 0x3b, 0x71, 0x3b, 0x55, - 0x6e, 0xe0, 0xc0, 0xda, 0x5f, 0x05, 0x28, 0x8a, 0x04, 0x10, 0xb5, 0xef, 0xc2, 0x62, 0xbe, 0x0b, - 0x15, 0x3f, 0xe6, 0x2f, 0x33, 0xdc, 0xed, 0x73, 0xb3, 0x9f, 0x18, 0xde, 0x11, 0xad, 0x0e, 0xce, - 0xa0, 0xc7, 0x50, 0x89, 0x4d, 0x43, 0xc9, 0x45, 0x68, 0xc4, 0xc8, 0x94, 0x76, 0x8d, 0x35, 0x40, - 0xc3, 0x63, 0x0c, 0xaa, 0x27, 0x57, 0x8a, 0x51, 0xe3, 0x4e, 0x1a, 0xc8, 0x57, 0x50, 0x89, 0x8d, - 0x13, 0xc9, 0x17, 0x22, 0x79, 0xe6, 0x48, 0xd3, 0xfe, 0x10, 0x4a, 0xa1, 0xf9, 0x81, 0x25, 0xa7, - 0xe8, 0xf0, 0x84, 0x91, 0xa6, 0xf6, 0x4b, 0xa8, 0x44, 0x9b, 0xec, 0x11, 0xef, 0x65, 0x62, 0x27, - 0x9e, 0x4e, 0xfb, 0xc5, 0x5f, 0xac, 0x8d, 0xf5, 0xbd, 0x4f, 0xbb, 0x94, 0x1f, 0xf4, 0x3b, 0x2e, - 0xfc, 0xea, 0x09, 0x35, 0x0c, 0x7a, 0xc2, 0x89, 0x76, 0xb0, 0xea, 0x69, 0x78, 0x5f, 0xa7, 0x8c, - 0x3b, 0xb4, 0xd3, 0xe7, 0x44, 0x5f, 0x1d, 0x14, 0x81, 0x55, 0xa1, 0x76, 0x55, 0xa8, 0xed, 0x75, - 0x3a, 0x05, 0xb1, 0xbc, 0xfb, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe1, 0xdf, 0xec, 0xde, 0x9f, - 0x17, 0x00, 0x00, + 0x5b, 0x70, 0x98, 0x74, 0x60, 0x78, 0x82, 0x69, 0xe3, 0x4e, 0xc6, 0x0c, 0x0d, 0x41, 0x4e, 0xa7, + 0x43, 0xa0, 0x63, 0x64, 0xe9, 0xe2, 0x5c, 0x6b, 0x49, 0xae, 0xee, 0x9c, 0x34, 0x79, 0x01, 0x66, + 0x78, 0x64, 0xe0, 0x33, 0x30, 0x30, 0xc0, 0xf0, 0x81, 0x78, 0xe1, 0x05, 0xbe, 0x09, 0xa3, 0x93, + 0xac, 0x48, 0xf2, 0x39, 0x72, 0xea, 0xa6, 0xe1, 0x4d, 0x77, 0xda, 0xdb, 0xdf, 0xee, 0x6f, 0xf7, + 0xf6, 0x76, 0x61, 0xfe, 0xe9, 0x80, 0xb8, 0x87, 0x6d, 0x46, 0xdc, 0x7d, 0x6a, 0x90, 0x7a, 0xdf, + 0x75, 0xb8, 0x83, 0x90, 0x45, 0x7b, 0xfb, 0x03, 0xe6, 0xaf, 0xea, 0x42, 0x42, 0x2d, 0x19, 0x8e, + 0x65, 0x39, 0xb6, 0xbf, 0xa7, 0x96, 0xa2, 0x12, 0x6a, 0x99, 0xda, 0x9c, 0xb8, 0xb6, 0xde, 0x0b, + 0xd6, 0xc8, 0xd4, 0xb9, 0x1e, 0xd7, 0x89, 0xbf, 0x81, 0x79, 0x8d, 0x74, 0x29, 0xe3, 0xc4, 0xdd, + 0x74, 0x4c, 0xa2, 0x91, 0xa7, 0x03, 0xc2, 0x38, 0x7a, 0x0f, 0xf2, 0x1d, 0x9d, 0x91, 0x9a, 0x72, + 0x55, 0x59, 0x99, 0x5b, 0xbb, 0x5c, 0x8f, 0x21, 0x07, 0x90, 0xf7, 0x59, 0xf7, 0xae, 0xce, 0x88, + 0x26, 0x24, 0xd1, 0x07, 0x30, 0xab, 0x9b, 0xa6, 0x4b, 0x18, 0xab, 0x65, 0x4f, 0x38, 0x74, 0xc7, + 0x97, 0xd1, 0x86, 0xc2, 0xf8, 0x27, 0x05, 0x16, 0xe2, 0x16, 0xb0, 0xbe, 0x63, 0x33, 0x82, 0x6e, + 0x43, 0x81, 0x71, 0x9d, 0x0f, 0x58, 0x60, 0xc4, 0xab, 0x52, 0x7d, 0x2d, 0x21, 0xa2, 0x05, 0xa2, + 0xe8, 0x2e, 0xcc, 0x51, 0x9b, 0xf2, 0x76, 0x5f, 0x77, 0x75, 0x6b, 0x68, 0xc9, 0x1b, 0xf1, 0x93, + 0x21, 0x2b, 0x4d, 0x9b, 0xf2, 0x2d, 0x21, 0xa8, 0x01, 0x0d, 0xbf, 0xf1, 0x23, 0x58, 0x6c, 0xed, + 0x39, 0x07, 0xeb, 0x4e, 0xaf, 0x47, 0x0c, 0x4e, 0x1d, 0xfb, 0xf9, 0x49, 0x41, 0x90, 0x37, 0x3b, + 0xcd, 0x86, 0xb0, 0x23, 0xa7, 0x89, 0x6f, 0xcc, 0x60, 0x29, 0xa9, 0x7e, 0x1a, 0x8f, 0xdf, 0x82, + 0x8b, 0x46, 0xa8, 0xaa, 0xd9, 0xf0, 0x7c, 0xce, 0xad, 0xe4, 0xb4, 0xf8, 0x26, 0xfe, 0x4e, 0x81, + 0xc5, 0x4f, 0x1d, 0xdd, 0x3c, 0x23, 0xa7, 0x10, 0x86, 0x52, 0x14, 0xb0, 0x96, 0x13, 0xff, 0x62, + 0x7b, 0xf8, 0x7b, 0x05, 0x6a, 0x1a, 0xe9, 0x11, 0x9d, 0x91, 0xf3, 0x34, 0xe3, 0x5b, 0x05, 0x16, + 0xbc, 0x00, 0x6c, 0xe9, 0x2e, 0xa7, 0xe7, 0x63, 0x42, 0xdf, 0xcf, 0xb0, 0x88, 0x05, 0xd3, 0x64, + 0x00, 0x86, 0x52, 0x7f, 0xa8, 0xe9, 0x38, 0x01, 0x62, 0x7b, 0xd8, 0x82, 0x4a, 0x88, 0xe6, 0x1d, + 0x27, 0x0c, 0x5d, 0x85, 0xb9, 0x88, 0x88, 0x00, 0xcc, 0x69, 0xd1, 0x2d, 0xf4, 0x21, 0xcc, 0x78, + 0x10, 0x44, 0xf8, 0x57, 0x5e, 0xc3, 0xf5, 0xd1, 0xfa, 0x53, 0x8f, 0x6b, 0xd5, 0xfc, 0x03, 0xf8, + 0x37, 0x05, 0x96, 0x12, 0x78, 0x2f, 0x9d, 0xe5, 0x11, 0x5e, 0xf2, 0x12, 0x5e, 0xfe, 0x54, 0x60, + 0x79, 0xc4, 0xd0, 0x69, 0x82, 0xb1, 0x03, 0x4b, 0x21, 0x40, 0xdb, 0x24, 0xcc, 0x70, 0x69, 0xdf, + 0xfb, 0xf6, 0xc3, 0x32, 0xb7, 0xf6, 0x66, 0x3a, 0x89, 0x4c, 0x5b, 0x0c, 0x55, 0x34, 0x22, 0x1a, + 0xf0, 0xaf, 0x0a, 0x2c, 0x78, 0x97, 0xf8, 0xfc, 0x32, 0x77, 0x22, 0x4e, 0x7f, 0x57, 0x60, 0x39, + 0xb8, 0xe7, 0xff, 0x73, 0x4b, 0x7f, 0x56, 0x40, 0x5d, 0x77, 0x89, 0xce, 0xc9, 0xe7, 0x5e, 0x1c, + 0xd6, 0xf7, 0x74, 0xdb, 0x26, 0xbd, 0xe9, 0x12, 0xe0, 0x3a, 0x54, 0x5c, 0xdf, 0xd9, 0xb6, 0xe1, + 0xeb, 0x13, 0xa6, 0x17, 0xb5, 0x72, 0xb0, 0x1d, 0xa0, 0xa0, 0xb7, 0xa1, 0xec, 0x12, 0x36, 0xe8, + 0x1d, 0xcb, 0xe5, 0x84, 0xdc, 0x45, 0x7f, 0x37, 0x10, 0xc3, 0xbf, 0x28, 0xb0, 0x7c, 0xc7, 0x34, + 0xa3, 0x06, 0x4e, 0x71, 0x97, 0x6e, 0xc1, 0xa5, 0x84, 0x75, 0x01, 0xb5, 0x45, 0xad, 0x1a, 0xb7, + 0xaf, 0xd9, 0x40, 0x37, 0xa0, 0x1a, 0xb7, 0x30, 0xa0, 0xba, 0xa8, 0x55, 0x62, 0x36, 0x36, 0x1b, + 0xf8, 0x6f, 0x05, 0x54, 0x8d, 0x58, 0xce, 0x3e, 0x91, 0x1a, 0xfa, 0x5c, 0x4c, 0x0e, 0xbd, 0xcb, + 0x4e, 0xe7, 0x5d, 0xee, 0x14, 0xde, 0xe5, 0xe5, 0xde, 0x3d, 0x86, 0xa5, 0x87, 0x3a, 0x37, 0xf6, + 0x1a, 0xd6, 0xf4, 0x11, 0xb8, 0x02, 0x10, 0xe2, 0xf9, 0x45, 0xa1, 0xa8, 0x45, 0x76, 0xf0, 0x1f, + 0x59, 0x40, 0xde, 0x25, 0x6f, 0x91, 0xae, 0x45, 0x6c, 0xfe, 0xf2, 0x2f, 0x4e, 0xe2, 0x5d, 0xc8, + 0x8f, 0xbe, 0x0b, 0x57, 0x00, 0x98, 0x6f, 0x9d, 0xe7, 0xc2, 0x8c, 0xb8, 0x58, 0x91, 0x1d, 0xa4, + 0xc2, 0x85, 0x5d, 0x4a, 0x7a, 0xa6, 0xf7, 0xb7, 0x20, 0xfe, 0x86, 0x6b, 0xf4, 0x09, 0x94, 0x03, + 0xc9, 0xb6, 0x78, 0x2a, 0x58, 0x6d, 0x56, 0x56, 0x17, 0xbd, 0x4e, 0xb5, 0x1e, 0x50, 0x20, 0x8a, + 0x62, 0xd3, 0xde, 0x75, 0xb4, 0x8b, 0x2c, 0xb2, 0xc3, 0xf0, 0xbf, 0x0a, 0x2c, 0x06, 0x85, 0xe6, + 0xdc, 0xd8, 0x9a, 0xa0, 0xcc, 0x4c, 0xc3, 0x17, 0xfe, 0x51, 0x81, 0xe5, 0x75, 0xc7, 0xea, 0x3b, + 0x76, 0xe8, 0xf7, 0x74, 0xf5, 0xe9, 0x23, 0xff, 0x10, 0x19, 0x36, 0xc7, 0xd7, 0xc6, 0x34, 0xc7, + 0x49, 0xd0, 0xe0, 0x14, 0xfe, 0x47, 0x81, 0xb9, 0x80, 0x6d, 0x2f, 0x26, 0xe8, 0x32, 0x14, 0x43, + 0x57, 0x82, 0x26, 0xe2, 0x78, 0x63, 0x84, 0xc2, 0x6c, 0x7a, 0xc2, 0xe5, 0x46, 0x13, 0xee, 0x15, + 0xb8, 0x60, 0x11, 0xab, 0xcd, 0xe8, 0x11, 0x09, 0xf2, 0x71, 0xd6, 0x22, 0x56, 0x8b, 0x1e, 0x11, + 0xef, 0x97, 0x3d, 0xb0, 0xda, 0xae, 0x73, 0xe0, 0x31, 0x2b, 0x7e, 0xd9, 0x03, 0x4b, 0x73, 0x0e, + 0x18, 0x7a, 0x0d, 0x80, 0xda, 0x26, 0x79, 0xd6, 0xb6, 0x75, 0x8b, 0xd4, 0x0a, 0xe2, 0x6a, 0x17, + 0xc5, 0xce, 0xa6, 0x6e, 0x11, 0x54, 0x83, 0x59, 0xb1, 0x68, 0x36, 0x6a, 0xb3, 0xfe, 0xc1, 0x60, + 0x89, 0x77, 0x01, 0x45, 0x3c, 0x9c, 0xea, 0xaa, 0x47, 0xe2, 0x9e, 0x4d, 0xc6, 0xdd, 0x6b, 0xca, + 0xe7, 0x63, 0x40, 0xd3, 0xc4, 0xf5, 0x7d, 0x98, 0xa1, 0xf6, 0xae, 0x33, 0xec, 0x33, 0x5e, 0x97, + 0xf5, 0x19, 0x51, 0x30, 0x5f, 0xfa, 0xe6, 0x11, 0x94, 0xe3, 0xdd, 0x07, 0x2a, 0xc1, 0x85, 0x4d, + 0x87, 0xdf, 0x7b, 0x46, 0x19, 0xaf, 0x66, 0x50, 0x19, 0x60, 0xd3, 0xe1, 0x5b, 0x2e, 0x61, 0xc4, + 0xe6, 0x55, 0x05, 0x01, 0x14, 0x3e, 0xb3, 0x1b, 0x94, 0x3d, 0xa9, 0x66, 0xd1, 0x7c, 0xd0, 0x54, + 0xea, 0xbd, 0xa6, 0x7d, 0x9f, 0x58, 0x8e, 0x7b, 0x58, 0xcd, 0x79, 0xc7, 0xc3, 0x55, 0x1e, 0x55, + 0xa1, 0x14, 0x8a, 0x6c, 0x6c, 0x3d, 0xa8, 0xce, 0xa0, 0x22, 0xcc, 0xf8, 0x9f, 0x85, 0xb5, 0x1f, + 0x00, 0x4a, 0xe2, 0xb9, 0x68, 0xf9, 0x23, 0x29, 0x32, 0xa0, 0x14, 0x1d, 0x05, 0xd1, 0x75, 0x99, + 0x13, 0x92, 0x71, 0x55, 0x5d, 0x49, 0x17, 0xf4, 0xb9, 0xc5, 0x19, 0xf4, 0x18, 0x2a, 0xf1, 0xf9, + 0x8b, 0xa1, 0x1b, 0x52, 0xb2, 0x64, 0x33, 0xa0, 0x7a, 0x73, 0x12, 0xd1, 0x10, 0xab, 0x0b, 0xe5, + 0x58, 0xa3, 0xcf, 0xd0, 0xca, 0xb8, 0xf3, 0xc9, 0x56, 0x49, 0xbd, 0x31, 0x81, 0x64, 0x08, 0xf4, + 0x05, 0x94, 0x63, 0x9d, 0xe1, 0x18, 0x20, 0x59, 0xf7, 0xa8, 0x9e, 0x94, 0x5e, 0x38, 0x83, 0xda, + 0x70, 0x29, 0xd9, 0xcd, 0x31, 0x74, 0x4b, 0x4e, 0xb8, 0xb4, 0xe9, 0x4b, 0x03, 0xd8, 0xf1, 0x6d, + 0x3f, 0x26, 0x50, 0x1e, 0x0f, 0xe9, 0xf8, 0x9a, 0xa6, 0xfb, 0xeb, 0xd0, 0xf8, 0x88, 0xfa, 0x77, + 0x4e, 0x30, 0xfe, 0xd4, 0x08, 0x1d, 0x40, 0xa3, 0x2d, 0x24, 0x52, 0xa5, 0x87, 0xee, 0x59, 0x7d, + 0x7e, 0xa8, 0xd6, 0x65, 0xf0, 0xe3, 0xdb, 0x50, 0x9c, 0x41, 0x0f, 0x01, 0x6d, 0x10, 0xbe, 0x4d, + 0x2d, 0xb2, 0x4d, 0x8d, 0x27, 0x93, 0x60, 0x24, 0x9e, 0xd3, 0x60, 0xd1, 0xe2, 0x2e, 0xb5, 0xbb, + 0xb1, 0xb4, 0x59, 0xd8, 0x20, 0xa2, 0xc2, 0x53, 0xc6, 0xa9, 0xc1, 0x5e, 0xa0, 0x6a, 0x47, 0xd8, + 0x9c, 0x1c, 0x3a, 0x6f, 0x4e, 0x32, 0xfe, 0x04, 0xc4, 0xdf, 0x9a, 0x48, 0x36, 0x04, 0xdc, 0x11, + 0x80, 0x89, 0x67, 0xeb, 0x44, 0x4f, 0x26, 0x7c, 0xfa, 0x70, 0x06, 0x19, 0x50, 0xf6, 0x78, 0x8a, + 0x3c, 0x7b, 0xd7, 0xd2, 0xea, 0x6b, 0xe0, 0xc4, 0xf5, 0x54, 0xb9, 0xa1, 0x03, 0x6b, 0x7f, 0x15, + 0xa0, 0x28, 0x12, 0x40, 0xd4, 0xbe, 0x33, 0x8b, 0xf9, 0x36, 0x54, 0x82, 0x98, 0xbf, 0xc8, 0x70, + 0xb7, 0x4f, 0xcd, 0xbe, 0x34, 0xbc, 0x63, 0x5a, 0x1d, 0x9c, 0x41, 0x8f, 0xa0, 0x92, 0x18, 0x83, + 0xe4, 0x45, 0x68, 0xcc, 0xac, 0x94, 0x76, 0x8d, 0x0d, 0x40, 0xa3, 0xf3, 0x0b, 0xaa, 0xcb, 0x2b, + 0xc5, 0xb8, 0x39, 0x27, 0x0d, 0xe4, 0x2b, 0xa8, 0x24, 0xe6, 0x08, 0xf9, 0x85, 0x90, 0x0f, 0x1b, + 0x69, 0xda, 0x1f, 0x40, 0x29, 0x32, 0x38, 0x30, 0x79, 0x8a, 0x8e, 0x8e, 0x16, 0x69, 0x6a, 0xbf, + 0x84, 0x4a, 0xbc, 0xc9, 0x1e, 0xf3, 0x5e, 0x4a, 0x3b, 0xf1, 0x74, 0xda, 0xcf, 0xfe, 0x62, 0xdd, + 0xbd, 0xb3, 0xf3, 0x71, 0x97, 0xf2, 0xbd, 0x41, 0xc7, 0x83, 0x5f, 0x3d, 0xa2, 0xbd, 0x1e, 0x3d, + 0xe2, 0xc4, 0xd8, 0x5b, 0xf5, 0x35, 0xbc, 0x6b, 0x52, 0xc6, 0x5d, 0xda, 0x19, 0x70, 0x62, 0xae, + 0x0e, 0x8b, 0xc0, 0xaa, 0x50, 0xbb, 0x2a, 0xd4, 0xf6, 0x3b, 0x9d, 0x82, 0x58, 0xde, 0xfe, 0x2f, + 0x00, 0x00, 0xff, 0xff, 0x36, 0xce, 0x45, 0x9b, 0x98, 0x17, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index ff9545780..9f4629c1b 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -445,18 +445,20 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S } // segments are ordered before LoadSegments calling - if in.LastSegmentState.State == commonpb.SegmentState_SegmentGrowing { - segmentNum := len(segmentIDs) - position := in.LastSegmentState.StartPosition - err = node.loadService.seekSegment(position) - if err != nil { - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), + for i, state := range in.SegmentStates { + if state.State == commonpb.SegmentState_SegmentGrowing { + position := state.StartPosition + err = node.loadService.seekSegment(position) + if err != nil { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + return status, err } - return status, err + segmentIDs = segmentIDs[:i] + break } - segmentIDs = segmentIDs[:segmentNum-1] } err = node.loadService.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs) diff --git a/internal/queryservice/meta_replica.go b/internal/queryservice/meta_replica.go index 2d165ad46..c7c97085a 100644 --- a/internal/queryservice/meta_replica.go +++ b/internal/queryservice/meta_replica.go @@ -13,6 +13,9 @@ type metaReplica interface { loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) error getPartitionStates(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) ([]*querypb.PartitionStates, error) + releaseCollection(dbID UniqueID, collectionID UniqueID) error + releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error + addDmChannels(collectionID UniqueID, channels []string) error } type segment struct { @@ -26,9 +29,9 @@ type partition struct { } type collection struct { - id UniqueID - partitions map[UniqueID]*partition - node2channel map[int][]string + id UniqueID + partitions map[UniqueID]*partition + dmChannelNames []string } type metaReplicaImpl struct { @@ -48,18 +51,18 @@ func newMetaReplica() metaReplica { } func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) { + //TODO:: assert dbID = 0 exist if _, ok := mp.db2collections[dbID]; ok { partitions := make(map[UniqueID]*partition) - node2channel := make(map[int][]string) newCollection := &collection{ - id: collectionID, - partitions: partitions, - node2channel: node2channel, + id: collectionID, + partitions: partitions, } mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection) return newCollection, nil } - return nil, errors.New("can't find dbID when add collection") + + return nil, errors.New("addCollection: can't find dbID when add collection") } func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) { @@ -78,7 +81,7 @@ func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, pa } } } - return nil, errors.New("can't find collection when add partition") + return nil, errors.New("addPartition: can't find collection when add partition") } func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) { @@ -86,7 +89,7 @@ func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) return collections, nil } - return nil, errors.New("can't find collectionID") + return nil, errors.New("getCollections: can't find collectionID") } func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) { @@ -102,7 +105,7 @@ func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ( } } - return nil, errors.New("can't find partitionIDs") + return nil, errors.New("getPartitions: can't find partitionIDs") } func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) { @@ -119,7 +122,7 @@ func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, par } } } - return nil, errors.New("can't find segmentID") + return nil, errors.New("getSegments: can't find segmentID") } func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) { @@ -127,14 +130,16 @@ func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) if collections, err := mp.getCollections(dbID); err == nil { for _, collection := range collections { if collectionID == collection.id { - return res, nil + res = collection } } - } else { - res, err = mp.addCollection(dbID, collectionID) + } + if res == nil { + collection, err := mp.addCollection(dbID, collectionID) if err != nil { return nil, err } + res = collection } return res, nil } @@ -177,7 +182,7 @@ func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID, } } } - return errors.New("update partition state fail") + return errors.New("updatePartitionState: update partition state fail") } func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID, @@ -203,3 +208,54 @@ func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID, } return partitionStates, nil } + +func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueID) error { + if collections, ok := mp.db2collections[dbID]; ok { + for i, collection := range collections { + if collectionID == collection.id { + collections = append(collections[:i], collections[i+1:]...) + return nil + } + } + } + return errors.New("releaseCollection: can't find dbID or collectionID") +} + +func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error { + if collections, ok := mp.db2collections[dbID]; ok { + for _, collection := range collections { + if collectionID == collection.id { + if _, ok := collection.partitions[partitionID]; ok { + delete(collection.partitions, partitionID) + return nil + } + } + } + } + return errors.New("releasePartition: can't find dbID or collectionID or partitionID") +} + +func (mp *metaReplicaImpl) addDmChannels(collectionID UniqueID, channels []string) error { + //TODO :: use dbID + if collections, ok := mp.db2collections[0]; ok { + for _, collection := range collections { + if collectionID == collection.id { + dmChannels := collection.dmChannelNames + for _, channel := range channels { + match := false + for _, existedChannel := range dmChannels { + if channel == existedChannel { + match = true + break + } + } + if !match { + dmChannels = append(dmChannels, channel) + } + } + return nil + } + } + } + return errors.New("addDmChannels: can't find dbID or collectionID") +} diff --git a/internal/queryservice/querynode.go b/internal/queryservice/querynode.go index bc43fa7c0..46b03fa41 100644 --- a/internal/queryservice/querynode.go +++ b/internal/queryservice/querynode.go @@ -8,8 +8,6 @@ import ( type queryNodeInfo struct { client QueryNodeInterface - insertChannels string - nodeID uint64 segments []UniqueID dmChannelNames []string } @@ -25,3 +23,21 @@ func (qn *queryNodeInfo) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb func (qn *queryNodeInfo) GetSegmentInfo(in *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) { return qn.client.GetSegmentInfo(in) } + +func (qn *queryNodeInfo) WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { + return qn.client.WatchDmChannels(in) +} + +func (qn *queryNodeInfo) AddDmChannels(channels []string) { + qn.dmChannelNames = append(qn.dmChannelNames, channels...) +} + +func newQueryNodeInfo(client QueryNodeInterface) *queryNodeInfo { + segments := make([]UniqueID, 0) + dmChannelNames := make([]string, 0) + return &queryNodeInfo{ + client: client, + segments: segments, + dmChannelNames: dmChannelNames, + } +} diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 01b60046b..cb55a836d 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -3,7 +3,6 @@ package queryservice import ( "context" "fmt" - "log" "sort" "strconv" "sync/atomic" @@ -25,6 +24,7 @@ type MasterServiceInterface interface { type DataServiceInterface interface { GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) + GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) } type QueryNodeInterface interface { @@ -47,7 +47,7 @@ type QueryService struct { dataServiceClient DataServiceInterface masterServiceClient MasterServiceInterface - queryNodes []*queryNodeInfo + queryNodes map[UniqueID]*queryNodeInfo numRegisterNode uint64 numQueryChannel uint64 @@ -87,7 +87,7 @@ func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, erro componentStates, err := node.GetComponentStates() if err != nil { subComponentInfos = append(subComponentInfos, &internalpb2.ComponentInfo{ - NodeID: int64(nodeID), + NodeID: nodeID, StateCode: internalpb2.StateCode_ABNORMAL, }) continue @@ -111,28 +111,21 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) { return Params.StatsChannelName, nil } -// TODO:: do addWatchDmChannel to query node after registerNode func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { fmt.Println("register query node =", req.Address) // TODO:: add mutex - allocatedID := uint64(len(qs.queryNodes)) + allocatedID := len(qs.queryNodes) registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10) var node *queryNodeInfo if qs.enableGrpc { client := nodeclient.NewClient(registerNodeAddress) - node = &queryNodeInfo{ - client: client, - nodeID: allocatedID, - } + node = newQueryNodeInfo(client) } else { - client := querynode.NewQueryNode(qs.loopCtx, allocatedID) - node = &queryNodeInfo{ - client: client, - nodeID: allocatedID, - } + client := querynode.NewQueryNode(qs.loopCtx, uint64(allocatedID)) + node = newQueryNodeInfo(client) } - qs.queryNodes = append(qs.queryNodes, node) + qs.queryNodes[UniqueID(allocatedID)] = node //TODO::return init params to queryNode return &querypb.RegisterNodeResponse{ @@ -186,10 +179,20 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com if err != nil { return fn(err), err } - if collection == nil { + + if len(collection.dmChannelNames) != 0 { return fn(nil), nil } + channelRequest := datapb.InsertChannelRequest{ + DbID: req.DbID, + CollectionID: req.CollectionID, + } + dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) + if err != nil { + return fn(err), err + } + // get partitionIDs showPartitionRequest := &milvuspb.ShowPartitionRequest{ Base: &commonpb.MsgBase{ @@ -207,6 +210,21 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com } partitionIDs := showPartitionResponse.PartitionIDs + if len(partitionIDs) == 0 { + loadSegmentRequest := &querypb.LoadSegmentRequest{ + CollectionID: collectionID, + } + for _, node := range qs.queryNodes { + _, err := node.LoadSegments(loadSegmentRequest) + if err != nil { + return fn(err), err + } + } + nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels) + err = qs.watchDmChannels(dmChannels, nodeIDs, collection) + return fn(err), err + } + loadPartitionsRequest := &querypb.LoadPartitionRequest{ Base: req.Base, DbID: dbID, @@ -244,6 +262,14 @@ func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest) status, err := qs.ReleasePartitions(releasePartitionRequest) + err = qs.replica.releaseCollection(dbID, collectionID) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, err + } + //TODO:: queryNode cancel subscribe dmChannels return status, err } @@ -276,16 +302,40 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm dbID := req.DbID collectionID := req.CollectionID partitionIDs := req.PartitionIDs - qs.replica.loadPartition(dbID, collectionID, partitionIDs[0]) + fn := func(err error) *commonpb.Status { + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + } return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), + ErrorCode: commonpb.ErrorCode_SUCCESS, } } - // get segments and load segment to query node + if len(partitionIDs) == 0 { + err := errors.New("partitionIDs are empty") + return fn(err), err + } + + var collection *collection = nil + var err error + if collection, err = qs.replica.loadCollection(dbID, collectionID); err != nil { + return fn(err), err + } + for _, partitionID := range partitionIDs { + partition, err := qs.replica.loadPartition(dbID, collectionID, partitionID) + if err != nil { + return fn(err), err + } + + if partition == nil { + return fn(err), nil + } + showSegmentRequest := &milvuspb.ShowSegmentRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowSegment, @@ -297,92 +347,110 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm if err != nil { return fn(err), err } - if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Fatal("showSegment fail, v%", showSegmentResponse.Status.Reason) - } + segmentIDs := showSegmentResponse.SegmentIDs segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo) - channel2id := make(map[string]int) - //id2channels := make(map[int][]string) - id2segs := make(map[int][]UniqueID) - offset := 0 + channel2segs := make(map[string][]UniqueID) resp, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{ SegmentIDs: segmentIDs, }) if err != nil { - log.Fatal("get segment states fail") + return fn(err), err } for _, state := range resp.States { segmentID := state.SegmentID segmentStates[segmentID] = state - var flatChannelName string - // channelNames := make([]string, 0) - // for i, str := range state.StartPositions { - // flatChannelName += str.ChannelName - // channelNames = append(channelNames, str.ChannelName) - // if i+1 < len(state.StartPositions) { - // flatChannelName += "/" - // } - // } - if flatChannelName == "" { - log.Fatal("segmentState's channel name is empty") - } - if _, ok := channel2id[flatChannelName]; !ok { - channel2id[flatChannelName] = offset - //id2channels[offset] = channelNames - id2segs[offset] = make([]UniqueID, 0) - id2segs[offset] = append(id2segs[offset], segmentID) - offset++ + + channelName := state.StartPosition.ChannelName + + if _, ok := channel2segs[channelName]; !ok { + segments := make([]UniqueID, 0) + segments = append(segments, segmentID) + channel2segs[channelName] = segments } else { - //TODO::check channel name - id := channel2id[flatChannelName] - id2segs[id] = append(id2segs[id], segmentID) + channel2segs[channelName] = append(channel2segs[channelName], segmentID) } } - for key, value := range id2segs { - sort.Slice(value, func(i, j int) bool { return segmentStates[value[i]].CreateTime < segmentStates[value[j]].CreateTime }) - selectedSegs := make([]UniqueID, 0) - for i, v := range value { - if segmentStates[v].State == commonpb.SegmentState_SegmentFlushed { - selectedSegs = append(selectedSegs, v) - } else { - if i > 0 && segmentStates[selectedSegs[i-1]].State != commonpb.SegmentState_SegmentFlushed { + + qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory) + for channel, segmentIDs := range channel2segs { + sort.Slice(segmentIDs, func(i, j int) bool { + return segmentStates[segmentIDs[i]].StartPosition.Timestamp < segmentStates[segmentIDs[j]].StartPosition.Timestamp + }) + var channelLoadDone = false + for _, node := range qs.queryNodes { + channels2node := node.dmChannelNames + for _, ch := range channels2node { + if channel == ch { + channelLoadDone = true break } - selectedSegs = append(selectedSegs, v) + } + if channelLoadDone { + break + } + } + if !channelLoadDone { + states := make([]*datapb.SegmentStateInfo, 0) + for _, id := range segmentIDs { + states = append(states, segmentStates[id]) + } + loadSegmentRequest := &querypb.LoadSegmentRequest{ + CollectionID: collectionID, + PartitionID: partitionID, + SegmentIDs: segmentIDs, + SegmentStates: states, + } + dmChannels := []string{channel} + nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels) + err = qs.watchDmChannels(dmChannels, nodeIDs, collection) + if err != nil { + return fn(err), err + } + queryNode := qs.queryNodes[nodeIDs[0]] + //TODO:: seek when loadSegment may cause more msgs consumed + status, err := queryNode.LoadSegments(loadSegmentRequest) + if err != nil { + return status, err } } - id2segs[key] = selectedSegs } - qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory) + qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory) + } - // TODO:: filter channel for query node - for channels, i := range channel2id { - for key, node := range qs.queryNodes { - if channels == node.insertChannels { - statesID := id2segs[i][len(id2segs[i])-1] - //TODO :: should be start position - // position := segmentStates[statesID-1].StartPositions - // segmentStates[statesID].StartPositions = position - loadSegmentRequest := &querypb.LoadSegmentRequest{ - CollectionID: collectionID, - PartitionID: partitionID, - SegmentIDs: id2segs[i], - LastSegmentState: segmentStates[statesID], - } - status, err := qs.queryNodes[key].LoadSegments(loadSegmentRequest) - if err != nil { - return status, err - } + if len(collection.dmChannelNames) == 0 { + channelRequest := datapb.InsertChannelRequest{ + DbID: dbID, + CollectionID: collectionID, + } + + dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) + if err != nil { + return fn(err), err + } + for _, partitionID := range partitionIDs { + loadSegmentRequest := &querypb.LoadSegmentRequest{ + CollectionID: collectionID, + PartitionID: partitionID, + } + for _, node := range qs.queryNodes { + _, err := node.LoadSegments(loadSegmentRequest) + if err != nil { + return fn(err), nil } } + nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels) + err = qs.watchDmChannels(dmChannels, nodeIDs, collection) + } + if err != nil { + return fn(err), err } - qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory) } + return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, }, nil @@ -407,6 +475,13 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest) } segmentIDs = append(segmentIDs, res...) + err = qs.replica.releasePartition(dbID, collectionID, partitionID) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, err + } } releaseSegmentRequest := &querypb.ReleaseSegmentRequest{ Base: req.Base, @@ -423,6 +498,7 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest) } } + //TODO:: queryNode cancel subscribe dmChannels return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, }, nil @@ -486,14 +562,14 @@ func (qs *QueryService) GetSegmentInfo(req *querypb.SegmentInfoRequest) (*queryp } func NewQueryService(ctx context.Context) (*QueryService, error) { - nodes := make([]*queryNodeInfo, 0) + nodes := make(map[UniqueID]*queryNodeInfo) ctx1, cancel := context.WithCancel(ctx) replica := newMetaReplica() service := &QueryService{ loopCtx: ctx1, loopCancel: cancel, - queryNodes: nodes, replica: replica, + queryNodes: nodes, numRegisterNode: 0, numQueryChannel: 0, enableGrpc: false, @@ -514,3 +590,77 @@ func (qs *QueryService) SetDataService(dataService DataServiceInterface) { func (qs *QueryService) SetEnableGrpc(en bool) { qs.enableGrpc = en } + +func (qs *QueryService) watchDmChannels(dmChannels []string, assignedNodeIDs []UniqueID, collection *collection) error { + err := qs.replica.addDmChannels(collection.id, dmChannels) + if err != nil { + return err + } + node2channels := make(map[UniqueID][]string) + for i, channel := range dmChannels { + nodeID := assignedNodeIDs[i] + findChannel := false + for _, ch := range collection.dmChannelNames { + if channel == ch { + findChannel = true + } + } + if !findChannel { + if _, ok := node2channels[nodeID]; ok { + node2channels[nodeID] = append(node2channels[nodeID], channel) + } else { + channels := make([]string, 0) + channels = append(channels, channel) + node2channels[nodeID] = channels + } + } + } + + for nodeID, channels := range node2channels { + node := qs.queryNodes[nodeID] + request := &querypb.WatchDmChannelsRequest{ + ChannelIDs: channels, + } + _, err := node.WatchDmChannels(request) + node.AddDmChannels(channels) + if err != nil { + return err + } + } + + return nil +} + +func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) []UniqueID { + maxNumDMChannel := 0 + res := make([]UniqueID, 0) + node2lens := make(map[UniqueID]int) + for id, node := range qs.queryNodes { + node2lens[id] = len(node.dmChannelNames) + } + offset := 0 + for { + lastOffset := offset + for id, len := range node2lens { + if len >= maxNumDMChannel { + maxNumDMChannel = len + } else { + res = append(res, id) + node2lens[id]++ + offset++ + } + } + if lastOffset == offset { + for id := range node2lens { + res = append(res, id) + node2lens[id]++ + offset++ + break + } + } + if offset == len(dmChannels) { + break + } + } + return res +} diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go index e0f26c162..a6e36bbee 100644 --- a/internal/queryservice/queryservice_test.go +++ b/internal/queryservice/queryservice_test.go @@ -155,6 +155,9 @@ func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datap return ret, nil } +func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) { + return []string{"test-insert"}, nil +} func TestQueryService_Init(t *testing.T) { service, err := NewQueryService(context.Background()) -- GitLab