diff --git a/cmd/querynode/query_node.go b/cmd/querynode/query_node.go index 4c7a529277b328dc66501137b0cb798b6e6e1ebf..271dc1c329945afc77bf6b05cb1bf76af19f2f94 100644 --- a/cmd/querynode/query_node.go +++ b/cmd/querynode/query_node.go @@ -10,16 +10,16 @@ import ( "go.uber.org/zap" - "github.com/zilliztech/milvus-distributed/internal/querynode" + querynodeimp "github.com/zilliztech/milvus-distributed/internal/querynode" ) func main() { - querynode.Init() - fmt.Println("QueryNodeID is", querynode.Params.QueryNodeID) + querynodeimp.Init() + fmt.Println("QueryNodeID is", querynodeimp.Params.QueryNodeID) // Creates server. ctx, cancel := context.WithCancel(context.Background()) - svr := querynode.NewQueryNode(ctx, 0) + svr := querynodeimp.NewQueryNode(ctx, 0) sc := make(chan os.Signal, 1) signal.Notify(sc, diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go index c6504686ddea7f1b450a83080ad6fc99ac3f6b0f..e61e047a21515aa34e6c2912cd648e687b005ff3 100644 --- a/cmd/singlenode/main.go +++ b/cmd/singlenode/main.go @@ -17,7 +17,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/indexnode" "github.com/zilliztech/milvus-distributed/internal/master" "github.com/zilliztech/milvus-distributed/internal/proxynode" - "github.com/zilliztech/milvus-distributed/internal/querynode" + querynodeimp "github.com/zilliztech/milvus-distributed/internal/querynode" "github.com/zilliztech/milvus-distributed/internal/writenode" ) @@ -101,11 +101,11 @@ func InitProxy(wg *sync.WaitGroup) { func InitQueryNode(wg *sync.WaitGroup) { defer wg.Done() - querynode.Init() - fmt.Println("QueryNodeID is", querynode.Params.QueryNodeID) + querynodeimp.Init() + fmt.Println("QueryNodeID is", querynodeimp.Params.QueryNodeID) // Creates server. ctx, cancel := context.WithCancel(context.Background()) - svr := querynode.NewQueryNode(ctx, 0) + svr := querynodeimp.NewQueryNode(ctx, 0) sc := make(chan os.Signal, 1) signal.Notify(sc, diff --git a/internal/distributed/querynode/client.go b/internal/distributed/querynode/client.go new file mode 100644 index 0000000000000000000000000000000000000000..6e073c8b0f57463f666b3ea8d3f5d73ba0f6e7b2 --- /dev/null +++ b/internal/distributed/querynode/client.go @@ -0,0 +1,12 @@ +package querynode + +import ( + "context" + + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" +) + +type Client struct { + ctx context.Context + querypb.QueryNodeClient +} diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go new file mode 100644 index 0000000000000000000000000000000000000000..93026851c16e28b0644626abbdaf5ff217cfc2b9 --- /dev/null +++ b/internal/distributed/querynode/service.go @@ -0,0 +1,68 @@ +package querynode + +import ( + "context" + "net" + + "google.golang.org/grpc" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" + querynodeimp "github.com/zilliztech/milvus-distributed/internal/querynode" +) + +type Server struct { + grpcServer *grpc.Server + node querynodeimp.Node +} + +func NewServer(ctx context.Context, queryNodeID uint64) *Server { + return &Server{ + node: querynodeimp.NewQueryNode(ctx, queryNodeID), + } +} + +func (s *Server) StartGrpcServer() { + // TODO: add address + lis, err := net.Listen("tcp", "") + if err != nil { + panic(err) + } + + s.grpcServer = grpc.NewServer() + querypb.RegisterQueryNodeServer(s.grpcServer, s) + if err = s.grpcServer.Serve(lis); err != nil { + panic(err) + } +} + +func (s *Server) Start() { + go s.StartGrpcServer() + if err := s.node.Start(); err != nil { + panic(err) + } +} + +func (s *Server) AddQueryChannel(ctx context.Context, in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) { + return s.node.AddQueryChannel(ctx, in) +} + +func (s *Server) RemoveQueryChannel(ctx context.Context, in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error) { + return s.node.RemoveQueryChannel(ctx, in) +} + +func (s *Server) WatchDmChannels(ctx context.Context, in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { + return s.node.WatchDmChannels(ctx, in) +} + +func (s *Server) LoadSegments(ctx context.Context, in *querypb.LoadSegmentRequest) (*commonpb.Status, error) { + return s.node.LoadSegments(ctx, in) +} + +func (s *Server) ReleaseSegments(ctx context.Context, in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) { + return s.node.ReleaseSegments(ctx, in) +} + +func (s *Server) GetPartitionState(ctx context.Context, in *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) { + return s.node.GetPartitionState(ctx, in) +} diff --git a/internal/distributed/queryservice/client.go b/internal/distributed/queryservice/client.go new file mode 100644 index 0000000000000000000000000000000000000000..34dcaee1ee3b9e376f994860e0931c9344126bba --- /dev/null +++ b/internal/distributed/queryservice/client.go @@ -0,0 +1,12 @@ +package queryservice + +import ( + "context" + + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" +) + +type Client struct { + ctx context.Context + querypb.QueryServiceClient +} diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go new file mode 100644 index 0000000000000000000000000000000000000000..37e0e7577ceb318e67c0379fdc787a873e8154cd --- /dev/null +++ b/internal/distributed/queryservice/service.go @@ -0,0 +1,49 @@ +package queryservice + +import ( + "google.golang.org/grpc" + + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" + queryServiceImpl "github.com/zilliztech/milvus-distributed/internal/queryservice" +) + +type Server struct { + grpcServer *grpc.Server + queryService queryServiceImpl.QueryService +} + +func (s *Server) RegisterNode(req querypb.RegisterNodeRequest) (querypb.RegisterNodeResponse, error) { + return s.queryService.RegisterNode(req) +} + +func (s *Server) ShowCollections(req querypb.ShowCollectionRequest) (querypb.ShowCollectionResponse, error) { + return s.ShowCollections(req) +} + +func (s *Server) LoadCollection(req querypb.LoadCollectionRequest) error { + return s.LoadCollection(req) +} + +func (s *Server) ReleaseCollection(req querypb.ReleaseCollectionRequest) error { + return s.ReleaseCollection(req) +} + +func (s *Server) ShowPartitions(req querypb.ShowPartitionRequest) (querypb.ShowPartitionResponse, error) { + return s.ShowPartitions(req) +} + +func (s *Server) GetPartitionStates(req querypb.PartitionStatesRequest) (querypb.PartitionStatesResponse, error) { + return s.GetPartitionStates(req) +} + +func (s *Server) LoadPartitions(req querypb.LoadPartitionRequest) error { + return s.LoadPartitions(req) +} + +func (s *Server) ReleasePartitions(req querypb.ReleasePartitionRequest) error { + return s.ReleasePartitions(req) +} + +func (s *Server) CreateQueryChannel() (querypb.CreateQueryChannelResponse, error) { + return s.CreateQueryChannel() +} diff --git a/internal/master/master.go b/internal/master/master.go index a9ed79d0fbec12ac0a6dc24ed85aab460787b813..5f092b8ab3854098c38e700b3dce69e77fb245a5 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -189,9 +189,9 @@ func CreateServer(ctx context.Context) (*Master, error) { if err != nil { return nil, err } - loadIndexClient := client.NewLoadIndexClient(ctx, Params.PulsarAddress, Params.LoadIndexChannelNames) + queryNodeClient := client.NewQueryNodeClient(ctx, Params.PulsarAddress, Params.LoadIndexChannelNames) - m.indexLoadSch = NewIndexLoadScheduler(ctx, loadIndexClient, m.metaTable) + m.indexLoadSch = NewIndexLoadScheduler(ctx, queryNodeClient, m.metaTable) m.indexBuildSch = NewIndexBuildScheduler(ctx, buildIndexClient, m.metaTable, m.indexLoadSch) m.flushSch = NewFlushScheduler(ctx, flushClient, m.metaTable, m.indexBuildSch, func() (Timestamp, error) { return m.tsoAllocator.AllocOne() }) diff --git a/internal/proto/query_service.proto b/internal/proto/query_service.proto index 812b5506515b51bbc9122a9c7834fe3900f1df47..a1150b85045994bb42b04ce1d17bda96fc543483 100644 --- a/internal/proto/query_service.proto +++ b/internal/proto/query_service.proto @@ -78,7 +78,7 @@ message PartitionStatesResponse { } -message LoadPartitonRequest { +message LoadPartitionRequest { internal.MsgBase base = 1; int64 dbID = 2; int64 collectionID = 3; @@ -98,20 +98,20 @@ message CreateQueryChannelResponse { } -message AddQueryChannelRequest { +message AddQueryChannelsRequest { internal.MsgBase base = 1; string request_channelID = 2; string result_channelID = 3; } -message RemoveQueryChannelRequest { +message RemoveQueryChannelsRequest { internal.MsgBase base = 1; string request_channelID = 2; string result_channelID = 3; } -message WatchDmChannelRequest { +message WatchDmChannelsRequest { internal.MsgBase base = 1; repeated string channelIDs = 2; } @@ -135,6 +135,21 @@ message ReleaseSegmentRequest { repeated int64 fieldIDs = 6; } +message GetTimeTickChannelResponse { + common.Status status = 1; + string time_tick_channelID = 2; +} + +message GetStatsChannelResponse { + common.Status status = 1; + string stats_channelID = 2; +} + +message ServiceStatesResponse { + common.Status status = 1; + internal.ServiceStates server_states = 2; +} + service QueryService { /** * @brief This method is used to create collection @@ -144,17 +159,27 @@ service QueryService { * @return Status */ rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {} - rpc ShowCollections(ShowCollectionRequest) returns (ShowCollectionResponse) {} - rpc LoadCollection(LoadCollectionRequest) returns (common.Status) {} - rpc ReleaseCollection(ReleaseCollectionRequest) returns (common.Status) {} + + rpc ShowCollections(ShowCollectionRequest) returns (ShowCollectionResponse) {} rpc ShowPartitions(ShowPartitionRequest) returns (ShowPartitionResponse) {} - rpc GetPartitionStates(PartitionStatesRequest) returns (PartitionStatesResponse) {} - rpc LoadPartitions(LoadPartitonRequest) returns (common.Status) {} + + rpc LoadPartitions(LoadPartitionRequest) returns (common.Status) {} rpc ReleasePartitions(ReleasePartitionRequest) returns (common.Status) {} + rpc LoadCollection(LoadCollectionRequest) returns (common.Status) {} + rpc ReleaseCollection(ReleaseCollectionRequest) returns (common.Status) {} + rpc CreateQueryChannel(common.Empty ) returns (CreateQueryChannelResponse) {} + rpc GetTimeTickChannel(common.Empty) returns (GetTimeTickChannelResponse) {} + rpc GetStatsChannel(common.Empty) returns (GetStatsChannelResponse) {} + rpc GetPartitionStates(PartitionStatesRequest) returns (PartitionStatesResponse) {} + rpc GetServiceStates(common.Empty) returns (ServiceStatesResponse) {} } - - - - +service QueryNode { + rpc AddQueryChannel(AddQueryChannelsRequest) returns (common.Status) {} + rpc RemoveQueryChannel(RemoveQueryChannelsRequest) returns (common.Status) {} + rpc WatchDmChannels(WatchDmChannelsRequest) returns (common.Status) {} + rpc LoadSegments(LoadSegmentRequest) returns (common.Status) {} + rpc ReleaseSegments(ReleaseSegmentRequest) returns (common.Status) {} + rpc GetPartitionState(PartitionStatesRequest) returns (PartitionStatesResponse) {} +} diff --git a/internal/proto/querypb/query_service.pb.go b/internal/proto/querypb/query_service.pb.go index 0fb7fe9d6a5388e03f19ac727e8fee9540dad8e2..4745848aee27298f2773db6deaf55cc2829b7747 100644 --- a/internal/proto/querypb/query_service.pb.go +++ b/internal/proto/querypb/query_service.pb.go @@ -591,7 +591,7 @@ func (m *PartitionStatesResponse) GetPartitionDescriptions() []*PartitionStates return nil } -type LoadPartitonRequest struct { +type LoadPartitionRequest struct { Base *internalpb2.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"` CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"` @@ -601,53 +601,53 @@ type LoadPartitonRequest struct { XXX_sizecache int32 `json:"-"` } -func (m *LoadPartitonRequest) Reset() { *m = LoadPartitonRequest{} } -func (m *LoadPartitonRequest) String() string { return proto.CompactTextString(m) } -func (*LoadPartitonRequest) ProtoMessage() {} -func (*LoadPartitonRequest) Descriptor() ([]byte, []int) { +func (m *LoadPartitionRequest) Reset() { *m = LoadPartitionRequest{} } +func (m *LoadPartitionRequest) String() string { return proto.CompactTextString(m) } +func (*LoadPartitionRequest) ProtoMessage() {} +func (*LoadPartitionRequest) Descriptor() ([]byte, []int) { return fileDescriptor_5fcb6756dc1afb8d, []int{11} } -func (m *LoadPartitonRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_LoadPartitonRequest.Unmarshal(m, b) +func (m *LoadPartitionRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_LoadPartitionRequest.Unmarshal(m, b) } -func (m *LoadPartitonRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_LoadPartitonRequest.Marshal(b, m, deterministic) +func (m *LoadPartitionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_LoadPartitionRequest.Marshal(b, m, deterministic) } -func (m *LoadPartitonRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_LoadPartitonRequest.Merge(m, src) +func (m *LoadPartitionRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LoadPartitionRequest.Merge(m, src) } -func (m *LoadPartitonRequest) XXX_Size() int { - return xxx_messageInfo_LoadPartitonRequest.Size(m) +func (m *LoadPartitionRequest) XXX_Size() int { + return xxx_messageInfo_LoadPartitionRequest.Size(m) } -func (m *LoadPartitonRequest) XXX_DiscardUnknown() { - xxx_messageInfo_LoadPartitonRequest.DiscardUnknown(m) +func (m *LoadPartitionRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LoadPartitionRequest.DiscardUnknown(m) } -var xxx_messageInfo_LoadPartitonRequest proto.InternalMessageInfo +var xxx_messageInfo_LoadPartitionRequest proto.InternalMessageInfo -func (m *LoadPartitonRequest) GetBase() *internalpb2.MsgBase { +func (m *LoadPartitionRequest) GetBase() *internalpb2.MsgBase { if m != nil { return m.Base } return nil } -func (m *LoadPartitonRequest) GetDbID() int64 { +func (m *LoadPartitionRequest) GetDbID() int64 { if m != nil { return m.DbID } return 0 } -func (m *LoadPartitonRequest) GetCollectionID() int64 { +func (m *LoadPartitionRequest) GetCollectionID() int64 { if m != nil { return m.CollectionID } return 0 } -func (m *LoadPartitonRequest) GetPartitionIDs() []int64 { +func (m *LoadPartitionRequest) GetPartitionIDs() []int64 { if m != nil { return m.PartitionIDs } @@ -764,7 +764,7 @@ func (m *CreateQueryChannelResponse) GetResultChannel() string { return "" } -type AddQueryChannelRequest struct { +type AddQueryChannelsRequest struct { Base *internalpb2.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` RequestChannelID string `protobuf:"bytes,2,opt,name=request_channelID,json=requestChannelID,proto3" json:"request_channelID,omitempty"` ResultChannelID string `protobuf:"bytes,3,opt,name=result_channelID,json=resultChannelID,proto3" json:"result_channelID,omitempty"` @@ -773,53 +773,53 @@ type AddQueryChannelRequest struct { XXX_sizecache int32 `json:"-"` } -func (m *AddQueryChannelRequest) Reset() { *m = AddQueryChannelRequest{} } -func (m *AddQueryChannelRequest) String() string { return proto.CompactTextString(m) } -func (*AddQueryChannelRequest) ProtoMessage() {} -func (*AddQueryChannelRequest) Descriptor() ([]byte, []int) { +func (m *AddQueryChannelsRequest) Reset() { *m = AddQueryChannelsRequest{} } +func (m *AddQueryChannelsRequest) String() string { return proto.CompactTextString(m) } +func (*AddQueryChannelsRequest) ProtoMessage() {} +func (*AddQueryChannelsRequest) Descriptor() ([]byte, []int) { return fileDescriptor_5fcb6756dc1afb8d, []int{14} } -func (m *AddQueryChannelRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_AddQueryChannelRequest.Unmarshal(m, b) +func (m *AddQueryChannelsRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_AddQueryChannelsRequest.Unmarshal(m, b) } -func (m *AddQueryChannelRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_AddQueryChannelRequest.Marshal(b, m, deterministic) +func (m *AddQueryChannelsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_AddQueryChannelsRequest.Marshal(b, m, deterministic) } -func (m *AddQueryChannelRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_AddQueryChannelRequest.Merge(m, src) +func (m *AddQueryChannelsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_AddQueryChannelsRequest.Merge(m, src) } -func (m *AddQueryChannelRequest) XXX_Size() int { - return xxx_messageInfo_AddQueryChannelRequest.Size(m) +func (m *AddQueryChannelsRequest) XXX_Size() int { + return xxx_messageInfo_AddQueryChannelsRequest.Size(m) } -func (m *AddQueryChannelRequest) XXX_DiscardUnknown() { - xxx_messageInfo_AddQueryChannelRequest.DiscardUnknown(m) +func (m *AddQueryChannelsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_AddQueryChannelsRequest.DiscardUnknown(m) } -var xxx_messageInfo_AddQueryChannelRequest proto.InternalMessageInfo +var xxx_messageInfo_AddQueryChannelsRequest proto.InternalMessageInfo -func (m *AddQueryChannelRequest) GetBase() *internalpb2.MsgBase { +func (m *AddQueryChannelsRequest) GetBase() *internalpb2.MsgBase { if m != nil { return m.Base } return nil } -func (m *AddQueryChannelRequest) GetRequestChannelID() string { +func (m *AddQueryChannelsRequest) GetRequestChannelID() string { if m != nil { return m.RequestChannelID } return "" } -func (m *AddQueryChannelRequest) GetResultChannelID() string { +func (m *AddQueryChannelsRequest) GetResultChannelID() string { if m != nil { return m.ResultChannelID } return "" } -type RemoveQueryChannelRequest struct { +type RemoveQueryChannelsRequest struct { Base *internalpb2.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` RequestChannelID string `protobuf:"bytes,2,opt,name=request_channelID,json=requestChannelID,proto3" json:"request_channelID,omitempty"` ResultChannelID string `protobuf:"bytes,3,opt,name=result_channelID,json=resultChannelID,proto3" json:"result_channelID,omitempty"` @@ -828,53 +828,53 @@ type RemoveQueryChannelRequest struct { XXX_sizecache int32 `json:"-"` } -func (m *RemoveQueryChannelRequest) Reset() { *m = RemoveQueryChannelRequest{} } -func (m *RemoveQueryChannelRequest) String() string { return proto.CompactTextString(m) } -func (*RemoveQueryChannelRequest) ProtoMessage() {} -func (*RemoveQueryChannelRequest) Descriptor() ([]byte, []int) { +func (m *RemoveQueryChannelsRequest) Reset() { *m = RemoveQueryChannelsRequest{} } +func (m *RemoveQueryChannelsRequest) String() string { return proto.CompactTextString(m) } +func (*RemoveQueryChannelsRequest) ProtoMessage() {} +func (*RemoveQueryChannelsRequest) Descriptor() ([]byte, []int) { return fileDescriptor_5fcb6756dc1afb8d, []int{15} } -func (m *RemoveQueryChannelRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_RemoveQueryChannelRequest.Unmarshal(m, b) +func (m *RemoveQueryChannelsRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RemoveQueryChannelsRequest.Unmarshal(m, b) } -func (m *RemoveQueryChannelRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_RemoveQueryChannelRequest.Marshal(b, m, deterministic) +func (m *RemoveQueryChannelsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RemoveQueryChannelsRequest.Marshal(b, m, deterministic) } -func (m *RemoveQueryChannelRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_RemoveQueryChannelRequest.Merge(m, src) +func (m *RemoveQueryChannelsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_RemoveQueryChannelsRequest.Merge(m, src) } -func (m *RemoveQueryChannelRequest) XXX_Size() int { - return xxx_messageInfo_RemoveQueryChannelRequest.Size(m) +func (m *RemoveQueryChannelsRequest) XXX_Size() int { + return xxx_messageInfo_RemoveQueryChannelsRequest.Size(m) } -func (m *RemoveQueryChannelRequest) XXX_DiscardUnknown() { - xxx_messageInfo_RemoveQueryChannelRequest.DiscardUnknown(m) +func (m *RemoveQueryChannelsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_RemoveQueryChannelsRequest.DiscardUnknown(m) } -var xxx_messageInfo_RemoveQueryChannelRequest proto.InternalMessageInfo +var xxx_messageInfo_RemoveQueryChannelsRequest proto.InternalMessageInfo -func (m *RemoveQueryChannelRequest) GetBase() *internalpb2.MsgBase { +func (m *RemoveQueryChannelsRequest) GetBase() *internalpb2.MsgBase { if m != nil { return m.Base } return nil } -func (m *RemoveQueryChannelRequest) GetRequestChannelID() string { +func (m *RemoveQueryChannelsRequest) GetRequestChannelID() string { if m != nil { return m.RequestChannelID } return "" } -func (m *RemoveQueryChannelRequest) GetResultChannelID() string { +func (m *RemoveQueryChannelsRequest) GetResultChannelID() string { if m != nil { return m.ResultChannelID } return "" } -type WatchDmChannelRequest struct { +type WatchDmChannelsRequest struct { Base *internalpb2.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` ChannelIDs []string `protobuf:"bytes,2,rep,name=channelIDs,proto3" json:"channelIDs,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -882,39 +882,39 @@ type WatchDmChannelRequest struct { XXX_sizecache int32 `json:"-"` } -func (m *WatchDmChannelRequest) Reset() { *m = WatchDmChannelRequest{} } -func (m *WatchDmChannelRequest) String() string { return proto.CompactTextString(m) } -func (*WatchDmChannelRequest) ProtoMessage() {} -func (*WatchDmChannelRequest) Descriptor() ([]byte, []int) { +func (m *WatchDmChannelsRequest) Reset() { *m = WatchDmChannelsRequest{} } +func (m *WatchDmChannelsRequest) String() string { return proto.CompactTextString(m) } +func (*WatchDmChannelsRequest) ProtoMessage() {} +func (*WatchDmChannelsRequest) Descriptor() ([]byte, []int) { return fileDescriptor_5fcb6756dc1afb8d, []int{16} } -func (m *WatchDmChannelRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_WatchDmChannelRequest.Unmarshal(m, b) +func (m *WatchDmChannelsRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_WatchDmChannelsRequest.Unmarshal(m, b) } -func (m *WatchDmChannelRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_WatchDmChannelRequest.Marshal(b, m, deterministic) +func (m *WatchDmChannelsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_WatchDmChannelsRequest.Marshal(b, m, deterministic) } -func (m *WatchDmChannelRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_WatchDmChannelRequest.Merge(m, src) +func (m *WatchDmChannelsRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_WatchDmChannelsRequest.Merge(m, src) } -func (m *WatchDmChannelRequest) XXX_Size() int { - return xxx_messageInfo_WatchDmChannelRequest.Size(m) +func (m *WatchDmChannelsRequest) XXX_Size() int { + return xxx_messageInfo_WatchDmChannelsRequest.Size(m) } -func (m *WatchDmChannelRequest) XXX_DiscardUnknown() { - xxx_messageInfo_WatchDmChannelRequest.DiscardUnknown(m) +func (m *WatchDmChannelsRequest) XXX_DiscardUnknown() { + xxx_messageInfo_WatchDmChannelsRequest.DiscardUnknown(m) } -var xxx_messageInfo_WatchDmChannelRequest proto.InternalMessageInfo +var xxx_messageInfo_WatchDmChannelsRequest proto.InternalMessageInfo -func (m *WatchDmChannelRequest) GetBase() *internalpb2.MsgBase { +func (m *WatchDmChannelsRequest) GetBase() *internalpb2.MsgBase { if m != nil { return m.Base } return nil } -func (m *WatchDmChannelRequest) GetChannelIDs() []string { +func (m *WatchDmChannelsRequest) GetChannelIDs() []string { if m != nil { return m.ChannelIDs } @@ -1079,6 +1079,147 @@ func (m *ReleaseSegmentRequest) GetFieldIDs() []int64 { return nil } +type GetTimeTickChannelResponse struct { + Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + TimeTickChannelID string `protobuf:"bytes,2,opt,name=time_tick_channelID,json=timeTickChannelID,proto3" json:"time_tick_channelID,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetTimeTickChannelResponse) Reset() { *m = GetTimeTickChannelResponse{} } +func (m *GetTimeTickChannelResponse) String() string { return proto.CompactTextString(m) } +func (*GetTimeTickChannelResponse) ProtoMessage() {} +func (*GetTimeTickChannelResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_5fcb6756dc1afb8d, []int{19} +} + +func (m *GetTimeTickChannelResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetTimeTickChannelResponse.Unmarshal(m, b) +} +func (m *GetTimeTickChannelResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetTimeTickChannelResponse.Marshal(b, m, deterministic) +} +func (m *GetTimeTickChannelResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetTimeTickChannelResponse.Merge(m, src) +} +func (m *GetTimeTickChannelResponse) XXX_Size() int { + return xxx_messageInfo_GetTimeTickChannelResponse.Size(m) +} +func (m *GetTimeTickChannelResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetTimeTickChannelResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetTimeTickChannelResponse proto.InternalMessageInfo + +func (m *GetTimeTickChannelResponse) GetStatus() *commonpb.Status { + if m != nil { + return m.Status + } + return nil +} + +func (m *GetTimeTickChannelResponse) GetTimeTickChannelID() string { + if m != nil { + return m.TimeTickChannelID + } + return "" +} + +type GetStatsChannelResponse struct { + Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + StatsChannelID string `protobuf:"bytes,2,opt,name=stats_channelID,json=statsChannelID,proto3" json:"stats_channelID,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *GetStatsChannelResponse) Reset() { *m = GetStatsChannelResponse{} } +func (m *GetStatsChannelResponse) String() string { return proto.CompactTextString(m) } +func (*GetStatsChannelResponse) ProtoMessage() {} +func (*GetStatsChannelResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_5fcb6756dc1afb8d, []int{20} +} + +func (m *GetStatsChannelResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_GetStatsChannelResponse.Unmarshal(m, b) +} +func (m *GetStatsChannelResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_GetStatsChannelResponse.Marshal(b, m, deterministic) +} +func (m *GetStatsChannelResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GetStatsChannelResponse.Merge(m, src) +} +func (m *GetStatsChannelResponse) XXX_Size() int { + return xxx_messageInfo_GetStatsChannelResponse.Size(m) +} +func (m *GetStatsChannelResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GetStatsChannelResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GetStatsChannelResponse proto.InternalMessageInfo + +func (m *GetStatsChannelResponse) GetStatus() *commonpb.Status { + if m != nil { + return m.Status + } + return nil +} + +func (m *GetStatsChannelResponse) GetStatsChannelID() string { + if m != nil { + return m.StatsChannelID + } + return "" +} + +type ServiceStatesResponse struct { + Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + ServerStates *internalpb2.ServiceStates `protobuf:"bytes,2,opt,name=server_states,json=serverStates,proto3" json:"server_states,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ServiceStatesResponse) Reset() { *m = ServiceStatesResponse{} } +func (m *ServiceStatesResponse) String() string { return proto.CompactTextString(m) } +func (*ServiceStatesResponse) ProtoMessage() {} +func (*ServiceStatesResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_5fcb6756dc1afb8d, []int{21} +} + +func (m *ServiceStatesResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ServiceStatesResponse.Unmarshal(m, b) +} +func (m *ServiceStatesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ServiceStatesResponse.Marshal(b, m, deterministic) +} +func (m *ServiceStatesResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ServiceStatesResponse.Merge(m, src) +} +func (m *ServiceStatesResponse) XXX_Size() int { + return xxx_messageInfo_ServiceStatesResponse.Size(m) +} +func (m *ServiceStatesResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ServiceStatesResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ServiceStatesResponse proto.InternalMessageInfo + +func (m *ServiceStatesResponse) GetStatus() *commonpb.Status { + if m != nil { + return m.Status + } + return nil +} + +func (m *ServiceStatesResponse) GetServerStates() *internalpb2.ServiceStates { + if m != nil { + return m.ServerStates + } + return nil +} + func init() { proto.RegisterEnum("milvus.proto.query.PartitionState", PartitionState_name, PartitionState_value) proto.RegisterType((*RegisterNodeRequest)(nil), "milvus.proto.query.RegisterNodeRequest") @@ -1092,78 +1233,95 @@ func init() { proto.RegisterType((*PartitionStates)(nil), "milvus.proto.query.PartitionStates") proto.RegisterType((*PartitionStatesRequest)(nil), "milvus.proto.query.PartitionStatesRequest") proto.RegisterType((*PartitionStatesResponse)(nil), "milvus.proto.query.PartitionStatesResponse") - proto.RegisterType((*LoadPartitonRequest)(nil), "milvus.proto.query.LoadPartitonRequest") + proto.RegisterType((*LoadPartitionRequest)(nil), "milvus.proto.query.LoadPartitionRequest") proto.RegisterType((*ReleasePartitionRequest)(nil), "milvus.proto.query.ReleasePartitionRequest") proto.RegisterType((*CreateQueryChannelResponse)(nil), "milvus.proto.query.CreateQueryChannelResponse") - proto.RegisterType((*AddQueryChannelRequest)(nil), "milvus.proto.query.AddQueryChannelRequest") - proto.RegisterType((*RemoveQueryChannelRequest)(nil), "milvus.proto.query.RemoveQueryChannelRequest") - proto.RegisterType((*WatchDmChannelRequest)(nil), "milvus.proto.query.WatchDmChannelRequest") + proto.RegisterType((*AddQueryChannelsRequest)(nil), "milvus.proto.query.AddQueryChannelsRequest") + proto.RegisterType((*RemoveQueryChannelsRequest)(nil), "milvus.proto.query.RemoveQueryChannelsRequest") + proto.RegisterType((*WatchDmChannelsRequest)(nil), "milvus.proto.query.WatchDmChannelsRequest") proto.RegisterType((*LoadSegmentRequest)(nil), "milvus.proto.query.LoadSegmentRequest") proto.RegisterType((*ReleaseSegmentRequest)(nil), "milvus.proto.query.ReleaseSegmentRequest") + proto.RegisterType((*GetTimeTickChannelResponse)(nil), "milvus.proto.query.GetTimeTickChannelResponse") + proto.RegisterType((*GetStatsChannelResponse)(nil), "milvus.proto.query.GetStatsChannelResponse") + proto.RegisterType((*ServiceStatesResponse)(nil), "milvus.proto.query.ServiceStatesResponse") } func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) } var fileDescriptor_5fcb6756dc1afb8d = []byte{ - // 918 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x56, 0x6d, 0x6f, 0xe3, 0x44, - 0x10, 0xae, 0x9b, 0xb6, 0x77, 0x99, 0xe6, 0xdc, 0xdc, 0xf6, 0xd2, 0x2b, 0x06, 0x9d, 0x8e, 0x05, - 0xd4, 0xb4, 0x85, 0x44, 0x0a, 0x12, 0x42, 0x42, 0x02, 0xb5, 0xcd, 0xe9, 0x14, 0x89, 0x2b, 0x65, - 0x2b, 0x04, 0xea, 0x97, 0xe0, 0xd8, 0x43, 0xb2, 0xe0, 0x97, 0x9c, 0x77, 0x5d, 0x68, 0xbf, 0x01, - 0x12, 0x3f, 0x81, 0x5f, 0x00, 0x02, 0xfe, 0x15, 0x12, 0x7f, 0x04, 0x79, 0xd7, 0xf5, 0xd9, 0x89, - 0x43, 0x22, 0x5e, 0x94, 0x8a, 0x6f, 0xde, 0xd9, 0x67, 0x66, 0x9e, 0x79, 0x66, 0xd7, 0x3b, 0xb0, - 0xfd, 0x3c, 0xc6, 0xe8, 0xaa, 0x2f, 0x30, 0xba, 0xe4, 0x0e, 0xb6, 0xc6, 0x51, 0x28, 0x43, 0x42, - 0x7c, 0xee, 0x5d, 0xc6, 0x42, 0xaf, 0x5a, 0x0a, 0x61, 0xd5, 0x9c, 0xd0, 0xf7, 0xc3, 0x40, 0xdb, - 0x2c, 0x93, 0x07, 0x12, 0xa3, 0xc0, 0xf6, 0xf4, 0x9a, 0x7e, 0x6b, 0xc0, 0x36, 0xc3, 0x21, 0x17, - 0x12, 0xa3, 0xd3, 0xd0, 0x45, 0x86, 0xcf, 0x63, 0x14, 0x92, 0x74, 0x60, 0x6d, 0x60, 0x0b, 0xdc, - 0x35, 0x1e, 0x1b, 0xcd, 0xcd, 0xce, 0xa3, 0x56, 0x21, 0x70, 0x16, 0xe3, 0x99, 0x18, 0x1e, 0xdb, - 0x02, 0x99, 0xc2, 0x92, 0x77, 0xe0, 0x8e, 0xed, 0xba, 0x11, 0x0a, 0xb1, 0xbb, 0xaa, 0xdc, 0x5e, - 0x29, 0xba, 0xa5, 0x44, 0x8e, 0x34, 0x86, 0xdd, 0x80, 0xe9, 0x05, 0x3c, 0x28, 0x52, 0x10, 0xe3, - 0x30, 0x10, 0x48, 0x8e, 0x61, 0x93, 0x07, 0x5c, 0xf6, 0xc7, 0x76, 0x64, 0xfb, 0x22, 0xa5, 0xf2, - 0xea, 0x0c, 0x2a, 0xbd, 0x80, 0xcb, 0x33, 0x05, 0x64, 0xc0, 0xb3, 0x6f, 0xda, 0x87, 0xc6, 0xf9, - 0x28, 0xfc, 0xfa, 0x24, 0xf4, 0x3c, 0x74, 0x24, 0x0f, 0x83, 0x7f, 0x52, 0x20, 0x81, 0x35, 0x77, - 0xd0, 0xeb, 0xaa, 0xea, 0x2a, 0x4c, 0x7d, 0xd3, 0xf7, 0x61, 0x67, 0x32, 0x41, 0x4a, 0xff, 0x75, - 0xb8, 0xe7, 0x64, 0xd6, 0x5e, 0x37, 0x29, 0xa0, 0xd2, 0xac, 0xb0, 0xa2, 0x91, 0x7e, 0x6f, 0x40, - 0xe3, 0xc3, 0xd0, 0x76, 0xff, 0x33, 0x86, 0x84, 0x42, 0x2d, 0x9f, 0x72, 0xb7, 0xa2, 0xf6, 0x0a, - 0x36, 0xfa, 0x83, 0x01, 0xbb, 0x0c, 0x3d, 0xb4, 0x05, 0x2e, 0x97, 0xc8, 0x77, 0x06, 0x3c, 0x48, - 0xf4, 0x3c, 0xb3, 0x23, 0xc9, 0x97, 0x45, 0xe2, 0x3d, 0x7d, 0x68, 0x72, 0x1c, 0xd2, 0x96, 0x52, - 0xa8, 0x8d, 0x6f, 0x8c, 0x2f, 0x3a, 0x5a, 0xb0, 0x51, 0x1f, 0xb6, 0x32, 0xc7, 0x73, 0x69, 0x4b, - 0x14, 0xe4, 0x31, 0x6c, 0xe6, 0x20, 0xaa, 0x84, 0x0a, 0xcb, 0x9b, 0xc8, 0xbb, 0xb0, 0x2e, 0x12, - 0xac, 0xa2, 0x6a, 0x76, 0x68, 0x6b, 0xfa, 0x22, 0xb7, 0x8a, 0x51, 0x99, 0x76, 0xa0, 0xbf, 0x1a, - 0xb0, 0x33, 0x91, 0x6f, 0x09, 0x92, 0x4d, 0x29, 0xb3, 0x56, 0xa2, 0x4c, 0x0c, 0x0f, 0xa7, 0x98, - 0xa6, 0xc2, 0x5e, 0xc0, 0x4e, 0x06, 0xed, 0xbb, 0x28, 0x9c, 0x88, 0x8f, 0x93, 0x6f, 0x2d, 0xf1, - 0x66, 0xe7, 0xb5, 0xf9, 0x82, 0x08, 0xd6, 0xc8, 0x42, 0x74, 0x73, 0x11, 0xe8, 0xcf, 0x06, 0x6c, - 0x27, 0x37, 0x4c, 0xc3, 0x97, 0x72, 0xa2, 0x16, 0x92, 0xe7, 0x37, 0x03, 0x1e, 0xa6, 0x77, 0x70, - 0x99, 0xa7, 0x7f, 0x21, 0xae, 0x1e, 0x58, 0x27, 0x11, 0xda, 0x12, 0x3f, 0x4e, 0x3a, 0x71, 0x32, - 0xb2, 0x83, 0x00, 0xbd, 0xac, 0x9b, 0x7b, 0xb0, 0x15, 0x69, 0xe2, 0x7d, 0x47, 0x6f, 0x29, 0xe2, - 0x55, 0x66, 0xa6, 0xe6, 0xd4, 0x81, 0xbc, 0x01, 0x66, 0x84, 0x22, 0xf6, 0x5e, 0xe0, 0x56, 0x15, - 0xee, 0x9e, 0xb6, 0xa6, 0x30, 0xfa, 0x93, 0x01, 0x3b, 0x47, 0xae, 0x5b, 0xcc, 0xf5, 0xf7, 0x85, - 0x39, 0x84, 0xfb, 0x13, 0xf4, 0x52, 0x95, 0xaa, 0xac, 0x5e, 0x24, 0xd8, 0xeb, 0x92, 0x7d, 0xa8, - 0x17, 0x29, 0xa6, 0xaa, 0x55, 0xd9, 0x56, 0x81, 0x64, 0xaf, 0x4b, 0x7f, 0x31, 0xe0, 0x25, 0x86, - 0x7e, 0x78, 0x89, 0xb7, 0x9d, 0xe9, 0x57, 0xd0, 0xf8, 0xd4, 0x96, 0xce, 0xa8, 0xeb, 0xff, 0x0b, - 0x24, 0x1f, 0x01, 0x64, 0x09, 0x93, 0x97, 0xbf, 0xd2, 0xac, 0xb2, 0x9c, 0x85, 0xfe, 0x6e, 0x00, - 0x49, 0xee, 0xdf, 0x39, 0x0e, 0x7d, 0x0c, 0xe4, 0x2d, 0x3d, 0xd2, 0x49, 0x19, 0x42, 0x33, 0x4c, - 0x10, 0xeb, 0x0a, 0x91, 0xb3, 0x10, 0x0b, 0xee, 0x7e, 0xc1, 0xd1, 0x73, 0x93, 0xdd, 0x0d, 0xb5, - 0x9b, 0xad, 0xe9, 0x1f, 0x06, 0x34, 0xd2, 0xab, 0xfb, 0xff, 0xad, 0xf2, 0xe0, 0x1a, 0xcc, 0xe2, - 0x2f, 0x97, 0xd4, 0xe0, 0xee, 0x69, 0x28, 0x9f, 0x7c, 0xc3, 0x85, 0xac, 0xaf, 0x10, 0x13, 0xe0, - 0x34, 0x94, 0x67, 0x11, 0x0a, 0x0c, 0x64, 0xdd, 0x20, 0x00, 0x1b, 0x1f, 0x05, 0xdd, 0x64, 0x6f, - 0x95, 0x6c, 0xa7, 0xaf, 0xa2, 0xed, 0xf5, 0x82, 0x67, 0xe8, 0x87, 0xd1, 0x55, 0xbd, 0x92, 0xb8, - 0x67, 0xab, 0x35, 0x52, 0x87, 0x5a, 0x06, 0x79, 0x7a, 0xf6, 0x49, 0x7d, 0x9d, 0x54, 0x61, 0x5d, - 0x7f, 0x6e, 0x74, 0x7e, 0xbc, 0x03, 0x35, 0x75, 0xab, 0xce, 0xf5, 0xc0, 0x4b, 0x1c, 0xa8, 0xe5, - 0x87, 0x46, 0xb2, 0x57, 0xf6, 0x42, 0x94, 0x4c, 0xb6, 0x56, 0x73, 0x3e, 0x50, 0xff, 0xc6, 0xe8, - 0x0a, 0xf9, 0x12, 0xb6, 0x8a, 0xc3, 0x9d, 0x20, 0xfb, 0x65, 0xee, 0xa5, 0x23, 0xa6, 0x75, 0xb0, - 0x08, 0x34, 0xcb, 0x75, 0x01, 0x66, 0x71, 0x0e, 0x2c, 0x4f, 0x55, 0x3a, 0x2b, 0x5a, 0x2f, 0x97, - 0x4e, 0xda, 0x49, 0x93, 0x62, 0x41, 0x57, 0xc8, 0xe7, 0x70, 0x7f, 0x6a, 0xba, 0x23, 0x6f, 0x96, - 0x0b, 0x51, 0x3e, 0x04, 0xce, 0xcb, 0x30, 0x04, 0xb3, 0x30, 0x32, 0x09, 0xd2, 0x9c, 0x55, 0xfd, - 0xe4, 0xe3, 0x66, 0xed, 0x2f, 0x80, 0xcc, 0x64, 0x0a, 0x81, 0x3c, 0x45, 0x39, 0x39, 0x61, 0x1d, - 0x2c, 0x32, 0x1f, 0xa4, 0xe9, 0x0e, 0x17, 0xc2, 0x66, 0x09, 0x3f, 0xd3, 0x7d, 0xc9, 0x55, 0xb6, - 0x37, 0xab, 0x2f, 0x13, 0x13, 0xc6, 0x3c, 0xcd, 0xfa, 0x59, 0x57, 0x72, 0xc1, 0x0f, 0xff, 0xa2, - 0x2b, 0x53, 0xca, 0xcd, 0x49, 0x30, 0x00, 0x32, 0xfd, 0x4a, 0x13, 0xab, 0xd4, 0xe9, 0x89, 0x3f, - 0x96, 0x57, 0x56, 0xab, 0x2c, 0xfb, 0xec, 0x97, 0x9e, 0xae, 0x1c, 0x1f, 0x5d, 0x7c, 0x30, 0xe4, - 0x72, 0x14, 0x0f, 0x92, 0x40, 0xed, 0x6b, 0xee, 0x79, 0xfc, 0x5a, 0xa2, 0x33, 0x6a, 0xeb, 0x40, - 0x6f, 0xb9, 0x5c, 0xc8, 0x88, 0x0f, 0x62, 0x89, 0x6e, 0xfb, 0xe6, 0x7f, 0xd7, 0x56, 0xd1, 0xdb, - 0x2a, 0xfa, 0x78, 0x30, 0xd8, 0x50, 0xcb, 0xb7, 0xff, 0x0c, 0x00, 0x00, 0xff, 0xff, 0xc0, 0xd9, - 0xfd, 0xa1, 0xd3, 0x0e, 0x00, 0x00, + // 1142 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x57, 0xed, 0x6e, 0x1b, 0x45, + 0x17, 0xf6, 0xc6, 0x49, 0xde, 0xfa, 0xc4, 0xb1, 0x9d, 0xc9, 0xa7, 0xf6, 0x45, 0x55, 0x19, 0x0a, + 0xf9, 0x02, 0x47, 0x72, 0x25, 0x84, 0x84, 0x04, 0x4a, 0xe2, 0x2a, 0xb2, 0x44, 0x43, 0x58, 0xb7, + 0x42, 0xa4, 0x45, 0x66, 0xbd, 0x3b, 0xd8, 0x43, 0xf7, 0xc3, 0xdd, 0x19, 0xa7, 0x24, 0xff, 0x0a, + 0x12, 0xb7, 0xc0, 0x15, 0x80, 0xe0, 0x06, 0xb8, 0x0f, 0xee, 0x00, 0x89, 0x1b, 0x41, 0x33, 0xbb, + 0xde, 0xec, 0xae, 0x67, 0x63, 0x97, 0xb6, 0x4a, 0xc4, 0xbf, 0x9d, 0x33, 0x67, 0xce, 0xf3, 0xcc, + 0x99, 0x73, 0x66, 0x9e, 0x85, 0xe5, 0x67, 0x43, 0x12, 0x9c, 0x77, 0x18, 0x09, 0xce, 0xa8, 0x45, + 0xea, 0x83, 0xc0, 0xe7, 0x3e, 0x42, 0x2e, 0x75, 0xce, 0x86, 0x2c, 0x1c, 0xd5, 0xa5, 0x87, 0x5e, + 0xb6, 0x7c, 0xd7, 0xf5, 0xbd, 0xd0, 0xa6, 0x57, 0xa8, 0xc7, 0x49, 0xe0, 0x99, 0x4e, 0x38, 0xc6, + 0x2f, 0x34, 0x58, 0x36, 0x48, 0x8f, 0x32, 0x4e, 0x82, 0x63, 0xdf, 0x26, 0x06, 0x79, 0x36, 0x24, + 0x8c, 0xa3, 0x06, 0xcc, 0x76, 0x4d, 0x46, 0x36, 0xb4, 0x3b, 0xda, 0xd6, 0x42, 0xe3, 0x76, 0x3d, + 0x15, 0x38, 0x8e, 0xf1, 0x80, 0xf5, 0x0e, 0x4c, 0x46, 0x0c, 0xe9, 0x8b, 0x3e, 0x84, 0xff, 0x99, + 0xb6, 0x1d, 0x10, 0xc6, 0x36, 0x66, 0xe4, 0xb2, 0xb7, 0xd2, 0xcb, 0x22, 0x22, 0xfb, 0xa1, 0x8f, + 0x31, 0x72, 0xc6, 0xa7, 0xb0, 0x92, 0xa6, 0xc0, 0x06, 0xbe, 0xc7, 0x08, 0x3a, 0x80, 0x05, 0xea, + 0x51, 0xde, 0x19, 0x98, 0x81, 0xe9, 0xb2, 0x88, 0xca, 0xdb, 0x39, 0x54, 0x5a, 0x1e, 0xe5, 0x27, + 0xd2, 0xd1, 0x00, 0x1a, 0x7f, 0xe3, 0x0e, 0xac, 0xb6, 0xfb, 0xfe, 0xf3, 0x43, 0xdf, 0x71, 0x88, + 0xc5, 0xa9, 0xef, 0xbd, 0xca, 0x06, 0x11, 0xcc, 0xda, 0xdd, 0x56, 0x53, 0xee, 0xae, 0x68, 0xc8, + 0x6f, 0xfc, 0x09, 0xac, 0x65, 0x01, 0x22, 0xfa, 0x77, 0x61, 0xd1, 0x8a, 0xad, 0xad, 0xa6, 0xd8, + 0x40, 0x71, 0xab, 0x68, 0xa4, 0x8d, 0xf8, 0x47, 0x0d, 0x56, 0x3f, 0xf3, 0x4d, 0xfb, 0x8d, 0x31, + 0x44, 0x18, 0xca, 0x49, 0xc8, 0x8d, 0xa2, 0x9c, 0x4b, 0xd9, 0xf0, 0x4f, 0x1a, 0x6c, 0x18, 0xc4, + 0x21, 0x26, 0x23, 0xd7, 0x4b, 0xe4, 0x07, 0x0d, 0x56, 0x44, 0x3e, 0x4f, 0xcc, 0x80, 0xd3, 0xeb, + 0x22, 0xf1, 0x71, 0x58, 0x34, 0x09, 0x0e, 0xd1, 0x91, 0x62, 0x28, 0x0f, 0x46, 0xc6, 0xcb, 0x13, + 0x4d, 0xd9, 0xb0, 0x0b, 0xd5, 0x78, 0x61, 0x9b, 0x9b, 0x9c, 0x30, 0x74, 0x07, 0x16, 0x12, 0x2e, + 0x72, 0x0b, 0x45, 0x23, 0x69, 0x42, 0x1f, 0xc1, 0x1c, 0x13, 0xbe, 0x92, 0x6a, 0xa5, 0x81, 0xeb, + 0xe3, 0x8d, 0x5c, 0x4f, 0x47, 0x35, 0xc2, 0x05, 0xf8, 0x37, 0x0d, 0xd6, 0x32, 0x78, 0xd7, 0x90, + 0xb2, 0xb1, 0xcc, 0xcc, 0x2a, 0x32, 0x33, 0x84, 0xf5, 0x31, 0xa6, 0x51, 0x62, 0x4f, 0x61, 0x2d, + 0x76, 0xed, 0xd8, 0x84, 0x59, 0x01, 0x1d, 0x88, 0xef, 0x30, 0xc5, 0x0b, 0x8d, 0x77, 0x26, 0x27, + 0x84, 0x19, 0xab, 0x71, 0x88, 0x66, 0x22, 0x02, 0xfe, 0x55, 0x83, 0x15, 0xd1, 0x61, 0xd7, 0x59, + 0x52, 0x53, 0xe5, 0xe7, 0x77, 0x0d, 0xd6, 0xa3, 0x26, 0xbc, 0xf1, 0x5c, 0x1d, 0xd0, 0x0f, 0x03, + 0x62, 0x72, 0xf2, 0x85, 0x38, 0x8a, 0xc3, 0xbe, 0xe9, 0x79, 0xc4, 0x89, 0x8f, 0x73, 0x13, 0xaa, + 0x41, 0x48, 0xbc, 0x63, 0x85, 0x53, 0x92, 0x78, 0xc9, 0xa8, 0x44, 0xe6, 0x68, 0x01, 0x7a, 0x17, + 0x2a, 0x01, 0x61, 0x43, 0xe7, 0xd2, 0x6f, 0x46, 0xfa, 0x2d, 0x86, 0xd6, 0xc8, 0x0d, 0xff, 0xa2, + 0xc1, 0xfa, 0xbe, 0x6d, 0x27, 0xb1, 0x5e, 0xa9, 0xca, 0x77, 0x61, 0x29, 0xc3, 0x2f, 0x4a, 0x53, + 0xc9, 0xa8, 0xa5, 0x19, 0xb6, 0x9a, 0x68, 0x1b, 0x6a, 0x69, 0x8e, 0x51, 0xda, 0x4a, 0x46, 0x35, + 0xc5, 0xb2, 0xd5, 0x14, 0xcd, 0xa8, 0x1b, 0xc4, 0xf5, 0xcf, 0xc8, 0x8d, 0xa7, 0xea, 0xc0, 0xda, + 0x97, 0x26, 0xb7, 0xfa, 0x4d, 0xf7, 0x75, 0xb0, 0xbc, 0x0d, 0x10, 0x23, 0x8a, 0xd7, 0xbf, 0xb8, + 0x55, 0x32, 0x12, 0x16, 0xfc, 0x97, 0x06, 0x48, 0xf4, 0x60, 0x9b, 0xf4, 0x5c, 0xe2, 0xf1, 0x1b, + 0x5a, 0xd5, 0x62, 0x1b, 0x2c, 0x64, 0x28, 0x3c, 0xe6, 0xa4, 0x47, 0xc2, 0x82, 0x74, 0xb8, 0xf5, + 0x2d, 0x25, 0x8e, 0x2d, 0x66, 0xe7, 0xe5, 0x6c, 0x3c, 0xc6, 0x7f, 0x6b, 0xb0, 0x1a, 0x75, 0xef, + 0x7f, 0x78, 0x97, 0x2f, 0x34, 0xd0, 0x8f, 0x08, 0x7f, 0x48, 0x5d, 0xf2, 0x90, 0x5a, 0x4f, 0xb3, + 0x8d, 0x7f, 0x0f, 0xe6, 0xc5, 0xb3, 0x34, 0x1c, 0xa9, 0xb5, 0xff, 0x2b, 0x15, 0x60, 0x5b, 0xba, + 0x18, 0x91, 0x2b, 0xaa, 0xc3, 0x32, 0xa7, 0x2e, 0xe9, 0x70, 0x6a, 0x3d, 0x1d, 0x2b, 0xf2, 0x25, + 0x9e, 0x86, 0x6a, 0x35, 0xf1, 0x73, 0x58, 0x3f, 0x22, 0x5c, 0x04, 0x61, 0xaf, 0x05, 0x7f, 0x13, + 0xaa, 0xe2, 0x8b, 0x8d, 0x61, 0x57, 0x58, 0x02, 0xa3, 0xd5, 0xc4, 0x3f, 0x6b, 0xb0, 0xda, 0x0e, + 0x05, 0x77, 0xe6, 0xfd, 0xfa, 0x57, 0xb8, 0x2d, 0x58, 0x14, 0xf2, 0x9d, 0x04, 0x1d, 0xf9, 0x94, + 0x8f, 0x54, 0xf3, 0xdd, 0x9c, 0x02, 0x49, 0x23, 0x97, 0xc3, 0xa5, 0xe1, 0x68, 0xe7, 0x02, 0x2a, + 0xe9, 0xd7, 0x10, 0x95, 0xe1, 0xd6, 0xb1, 0xcf, 0xef, 0x7f, 0x4f, 0x19, 0xaf, 0x15, 0x50, 0x05, + 0xe0, 0xd8, 0xe7, 0x27, 0x01, 0x61, 0xc4, 0xe3, 0x35, 0x0d, 0x01, 0xcc, 0x7f, 0xee, 0x35, 0xc5, + 0xdc, 0x0c, 0x5a, 0x8e, 0x04, 0x8b, 0xe9, 0xb4, 0xbc, 0x07, 0xc4, 0xf5, 0x83, 0xf3, 0x5a, 0x51, + 0x2c, 0x8f, 0x47, 0xb3, 0xa8, 0x06, 0xe5, 0xd8, 0xe5, 0xe8, 0xe4, 0x51, 0x6d, 0x0e, 0x95, 0x60, + 0x2e, 0xfc, 0x9c, 0x6f, 0xfc, 0x51, 0x82, 0xb2, 0xbc, 0xee, 0x22, 0x82, 0xc8, 0x82, 0x72, 0x52, + 0xcf, 0xa3, 0x4d, 0xd5, 0xe3, 0xad, 0xf8, 0xe9, 0xd0, 0xb7, 0x26, 0x3b, 0x86, 0xf9, 0xc6, 0x05, + 0xf4, 0x1d, 0x54, 0xd3, 0xba, 0x9b, 0xa1, 0x6d, 0xd5, 0x72, 0xa5, 0xfa, 0xd7, 0x77, 0xa6, 0x71, + 0x8d, 0xb1, 0x7a, 0x50, 0x49, 0xe9, 0x41, 0x86, 0xb6, 0xf2, 0xd6, 0x67, 0x1f, 0x6e, 0x7d, 0x7b, + 0x0a, 0xcf, 0x18, 0xe8, 0x2b, 0xa8, 0xa4, 0x94, 0x4a, 0x0e, 0x90, 0x4a, 0xcd, 0xe8, 0x57, 0x95, + 0x1c, 0x2e, 0xa0, 0x0e, 0x2c, 0x65, 0xb5, 0x05, 0x43, 0xbb, 0xea, 0x84, 0x2b, 0x25, 0xc8, 0x24, + 0x80, 0xd3, 0x90, 0xfb, 0x65, 0x02, 0xd5, 0xe7, 0xa1, 0xfc, 0xd7, 0x99, 0x14, 0xfb, 0x9b, 0x98, + 0x7c, 0x22, 0xfc, 0xfb, 0x57, 0x90, 0x7f, 0x69, 0x84, 0x2e, 0xa0, 0x71, 0x3d, 0x83, 0x74, 0xe5, + 0xa2, 0xfb, 0xee, 0x80, 0x9f, 0xeb, 0x75, 0x15, 0x7c, 0xbe, 0x26, 0x0a, 0x31, 0xc6, 0xaf, 0xce, + 0x97, 0xc7, 0xc8, 0xbf, 0x7e, 0x71, 0x01, 0x3d, 0x81, 0x6a, 0xe6, 0x6e, 0xbc, 0x12, 0x60, 0x37, + 0x07, 0x40, 0x75, 0xb9, 0xe2, 0x02, 0xf2, 0xe5, 0x0e, 0xb2, 0xbf, 0x37, 0x3b, 0xd3, 0x88, 0xf3, + 0xe8, 0x18, 0x76, 0xa7, 0xf2, 0x8d, 0x01, 0x1f, 0x43, 0x4d, 0xb0, 0x49, 0xde, 0x7c, 0x57, 0xee, + 0x47, 0xdd, 0x6d, 0xaa, 0x2b, 0x1b, 0x17, 0x1a, 0x7f, 0xce, 0x42, 0x49, 0x1e, 0x95, 0xbc, 0xa5, + 0xbe, 0x86, 0x6a, 0x46, 0x62, 0xaa, 0xdb, 0x23, 0x47, 0x87, 0x4e, 0x2a, 0x30, 0x0b, 0xd0, 0xb8, + 0x32, 0x44, 0x75, 0x75, 0x0d, 0xe7, 0x29, 0xc8, 0x49, 0x20, 0x4f, 0xa0, 0x9a, 0x11, 0x75, 0xea, + 0xc3, 0x51, 0x2b, 0xbf, 0x49, 0xd1, 0x1f, 0x41, 0x39, 0xa1, 0xe1, 0x18, 0x7a, 0x2f, 0xaf, 0xbf, + 0xd3, 0xfa, 0x67, 0x52, 0xd8, 0xc7, 0x50, 0x4d, 0xeb, 0xa6, 0x9c, 0x9b, 0x5c, 0x29, 0xae, 0x26, + 0x05, 0xf7, 0x60, 0x69, 0xac, 0x62, 0xdf, 0x60, 0xc1, 0x1e, 0xec, 0x9f, 0x7e, 0xda, 0xa3, 0xbc, + 0x3f, 0xec, 0x0a, 0x26, 0x7b, 0x17, 0xd4, 0x71, 0xe8, 0x05, 0x27, 0x56, 0x7f, 0x2f, 0x8c, 0xf2, + 0x81, 0x4d, 0x19, 0x0f, 0x68, 0x77, 0xc8, 0x89, 0xbd, 0x37, 0x7a, 0xd9, 0xf7, 0x64, 0xe8, 0x3d, + 0x19, 0x7a, 0xd0, 0xed, 0xce, 0xcb, 0xe1, 0xbd, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x13, 0x6f, + 0x8a, 0xde, 0xe2, 0x13, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1186,13 +1344,16 @@ type QueryServiceClient interface { // @return Status RegisterNode(ctx context.Context, in *RegisterNodeRequest, opts ...grpc.CallOption) (*RegisterNodeResponse, error) ShowCollections(ctx context.Context, in *ShowCollectionRequest, opts ...grpc.CallOption) (*ShowCollectionResponse, error) - LoadCollection(ctx context.Context, in *LoadCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - ReleaseCollection(ctx context.Context, in *ReleaseCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) ShowPartitions(ctx context.Context, in *ShowPartitionRequest, opts ...grpc.CallOption) (*ShowPartitionResponse, error) - GetPartitionStates(ctx context.Context, in *PartitionStatesRequest, opts ...grpc.CallOption) (*PartitionStatesResponse, error) - LoadPartitions(ctx context.Context, in *LoadPartitonRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + LoadPartitions(ctx context.Context, in *LoadPartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) ReleasePartitions(ctx context.Context, in *ReleasePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + LoadCollection(ctx context.Context, in *LoadCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + ReleaseCollection(ctx context.Context, in *ReleaseCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) CreateQueryChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*CreateQueryChannelResponse, error) + GetTimeTickChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*GetTimeTickChannelResponse, error) + GetStatsChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*GetStatsChannelResponse, error) + GetPartitionStates(ctx context.Context, in *PartitionStatesRequest, opts ...grpc.CallOption) (*PartitionStatesResponse, error) + GetServiceStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*ServiceStatesResponse, error) } type queryServiceClient struct { @@ -1221,6 +1382,33 @@ func (c *queryServiceClient) ShowCollections(ctx context.Context, in *ShowCollec return out, nil } +func (c *queryServiceClient) ShowPartitions(ctx context.Context, in *ShowPartitionRequest, opts ...grpc.CallOption) (*ShowPartitionResponse, error) { + out := new(ShowPartitionResponse) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/ShowPartitions", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *queryServiceClient) LoadPartitions(ctx context.Context, in *LoadPartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + out := new(commonpb.Status) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/LoadPartitions", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *queryServiceClient) ReleasePartitions(ctx context.Context, in *ReleasePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + out := new(commonpb.Status) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/ReleasePartitions", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *queryServiceClient) LoadCollection(ctx context.Context, in *LoadCollectionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { out := new(commonpb.Status) err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/LoadCollection", in, out, opts...) @@ -1239,45 +1427,45 @@ func (c *queryServiceClient) ReleaseCollection(ctx context.Context, in *ReleaseC return out, nil } -func (c *queryServiceClient) ShowPartitions(ctx context.Context, in *ShowPartitionRequest, opts ...grpc.CallOption) (*ShowPartitionResponse, error) { - out := new(ShowPartitionResponse) - err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/ShowPartitions", in, out, opts...) +func (c *queryServiceClient) CreateQueryChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*CreateQueryChannelResponse, error) { + out := new(CreateQueryChannelResponse) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/CreateQueryChannel", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *queryServiceClient) GetPartitionStates(ctx context.Context, in *PartitionStatesRequest, opts ...grpc.CallOption) (*PartitionStatesResponse, error) { - out := new(PartitionStatesResponse) - err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/GetPartitionStates", in, out, opts...) +func (c *queryServiceClient) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*GetTimeTickChannelResponse, error) { + out := new(GetTimeTickChannelResponse) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/GetTimeTickChannel", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *queryServiceClient) LoadPartitions(ctx context.Context, in *LoadPartitonRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { - out := new(commonpb.Status) - err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/LoadPartitions", in, out, opts...) +func (c *queryServiceClient) GetStatsChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*GetStatsChannelResponse, error) { + out := new(GetStatsChannelResponse) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/GetStatsChannel", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *queryServiceClient) ReleasePartitions(ctx context.Context, in *ReleasePartitionRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { - out := new(commonpb.Status) - err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/ReleasePartitions", in, out, opts...) +func (c *queryServiceClient) GetPartitionStates(ctx context.Context, in *PartitionStatesRequest, opts ...grpc.CallOption) (*PartitionStatesResponse, error) { + out := new(PartitionStatesResponse) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/GetPartitionStates", in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *queryServiceClient) CreateQueryChannel(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*CreateQueryChannelResponse, error) { - out := new(CreateQueryChannelResponse) - err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/CreateQueryChannel", in, out, opts...) +func (c *queryServiceClient) GetServiceStates(ctx context.Context, in *commonpb.Empty, opts ...grpc.CallOption) (*ServiceStatesResponse, error) { + out := new(ServiceStatesResponse) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryService/GetServiceStates", in, out, opts...) if err != nil { return nil, err } @@ -1294,13 +1482,16 @@ type QueryServiceServer interface { // @return Status RegisterNode(context.Context, *RegisterNodeRequest) (*RegisterNodeResponse, error) ShowCollections(context.Context, *ShowCollectionRequest) (*ShowCollectionResponse, error) - LoadCollection(context.Context, *LoadCollectionRequest) (*commonpb.Status, error) - ReleaseCollection(context.Context, *ReleaseCollectionRequest) (*commonpb.Status, error) ShowPartitions(context.Context, *ShowPartitionRequest) (*ShowPartitionResponse, error) - GetPartitionStates(context.Context, *PartitionStatesRequest) (*PartitionStatesResponse, error) - LoadPartitions(context.Context, *LoadPartitonRequest) (*commonpb.Status, error) + LoadPartitions(context.Context, *LoadPartitionRequest) (*commonpb.Status, error) ReleasePartitions(context.Context, *ReleasePartitionRequest) (*commonpb.Status, error) + LoadCollection(context.Context, *LoadCollectionRequest) (*commonpb.Status, error) + ReleaseCollection(context.Context, *ReleaseCollectionRequest) (*commonpb.Status, error) CreateQueryChannel(context.Context, *commonpb.Empty) (*CreateQueryChannelResponse, error) + GetTimeTickChannel(context.Context, *commonpb.Empty) (*GetTimeTickChannelResponse, error) + GetStatsChannel(context.Context, *commonpb.Empty) (*GetStatsChannelResponse, error) + GetPartitionStates(context.Context, *PartitionStatesRequest) (*PartitionStatesResponse, error) + GetServiceStates(context.Context, *commonpb.Empty) (*ServiceStatesResponse, error) } // UnimplementedQueryServiceServer can be embedded to have forward compatible implementations. @@ -1313,27 +1504,36 @@ func (*UnimplementedQueryServiceServer) RegisterNode(ctx context.Context, req *R func (*UnimplementedQueryServiceServer) ShowCollections(ctx context.Context, req *ShowCollectionRequest) (*ShowCollectionResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ShowCollections not implemented") } -func (*UnimplementedQueryServiceServer) LoadCollection(ctx context.Context, req *LoadCollectionRequest) (*commonpb.Status, error) { - return nil, status.Errorf(codes.Unimplemented, "method LoadCollection not implemented") -} -func (*UnimplementedQueryServiceServer) ReleaseCollection(ctx context.Context, req *ReleaseCollectionRequest) (*commonpb.Status, error) { - return nil, status.Errorf(codes.Unimplemented, "method ReleaseCollection not implemented") -} func (*UnimplementedQueryServiceServer) ShowPartitions(ctx context.Context, req *ShowPartitionRequest) (*ShowPartitionResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ShowPartitions not implemented") } -func (*UnimplementedQueryServiceServer) GetPartitionStates(ctx context.Context, req *PartitionStatesRequest) (*PartitionStatesResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetPartitionStates not implemented") -} -func (*UnimplementedQueryServiceServer) LoadPartitions(ctx context.Context, req *LoadPartitonRequest) (*commonpb.Status, error) { +func (*UnimplementedQueryServiceServer) LoadPartitions(ctx context.Context, req *LoadPartitionRequest) (*commonpb.Status, error) { return nil, status.Errorf(codes.Unimplemented, "method LoadPartitions not implemented") } func (*UnimplementedQueryServiceServer) ReleasePartitions(ctx context.Context, req *ReleasePartitionRequest) (*commonpb.Status, error) { return nil, status.Errorf(codes.Unimplemented, "method ReleasePartitions not implemented") } +func (*UnimplementedQueryServiceServer) LoadCollection(ctx context.Context, req *LoadCollectionRequest) (*commonpb.Status, error) { + return nil, status.Errorf(codes.Unimplemented, "method LoadCollection not implemented") +} +func (*UnimplementedQueryServiceServer) ReleaseCollection(ctx context.Context, req *ReleaseCollectionRequest) (*commonpb.Status, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReleaseCollection not implemented") +} func (*UnimplementedQueryServiceServer) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (*CreateQueryChannelResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method CreateQueryChannel not implemented") } +func (*UnimplementedQueryServiceServer) GetTimeTickChannel(ctx context.Context, req *commonpb.Empty) (*GetTimeTickChannelResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetTimeTickChannel not implemented") +} +func (*UnimplementedQueryServiceServer) GetStatsChannel(ctx context.Context, req *commonpb.Empty) (*GetStatsChannelResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetStatsChannel not implemented") +} +func (*UnimplementedQueryServiceServer) GetPartitionStates(ctx context.Context, req *PartitionStatesRequest) (*PartitionStatesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetPartitionStates not implemented") +} +func (*UnimplementedQueryServiceServer) GetServiceStates(ctx context.Context, req *commonpb.Empty) (*ServiceStatesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetServiceStates not implemented") +} func RegisterQueryServiceServer(s *grpc.Server, srv QueryServiceServer) { s.RegisterService(&_QueryService_serviceDesc, srv) @@ -1375,6 +1575,60 @@ func _QueryService_ShowCollections_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _QueryService_ShowPartitions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ShowPartitionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QueryServiceServer).ShowPartitions(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.query.QueryService/ShowPartitions", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QueryServiceServer).ShowPartitions(ctx, req.(*ShowPartitionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _QueryService_LoadPartitions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LoadPartitionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QueryServiceServer).LoadPartitions(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.query.QueryService/LoadPartitions", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QueryServiceServer).LoadPartitions(ctx, req.(*LoadPartitionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _QueryService_ReleasePartitions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReleasePartitionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QueryServiceServer).ReleasePartitions(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.query.QueryService/ReleasePartitions", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QueryServiceServer).ReleasePartitions(ctx, req.(*ReleasePartitionRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _QueryService_LoadCollection_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(LoadCollectionRequest) if err := dec(in); err != nil { @@ -1411,92 +1665,92 @@ func _QueryService_ReleaseCollection_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } -func _QueryService_ShowPartitions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ShowPartitionRequest) +func _QueryService_CreateQueryChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(commonpb.Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(QueryServiceServer).ShowPartitions(ctx, in) + return srv.(QueryServiceServer).CreateQueryChannel(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/milvus.proto.query.QueryService/ShowPartitions", + FullMethod: "/milvus.proto.query.QueryService/CreateQueryChannel", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(QueryServiceServer).ShowPartitions(ctx, req.(*ShowPartitionRequest)) + return srv.(QueryServiceServer).CreateQueryChannel(ctx, req.(*commonpb.Empty)) } return interceptor(ctx, in, info, handler) } -func _QueryService_GetPartitionStates_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PartitionStatesRequest) +func _QueryService_GetTimeTickChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(commonpb.Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(QueryServiceServer).GetPartitionStates(ctx, in) + return srv.(QueryServiceServer).GetTimeTickChannel(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/milvus.proto.query.QueryService/GetPartitionStates", + FullMethod: "/milvus.proto.query.QueryService/GetTimeTickChannel", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(QueryServiceServer).GetPartitionStates(ctx, req.(*PartitionStatesRequest)) + return srv.(QueryServiceServer).GetTimeTickChannel(ctx, req.(*commonpb.Empty)) } return interceptor(ctx, in, info, handler) } -func _QueryService_LoadPartitions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(LoadPartitonRequest) +func _QueryService_GetStatsChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(commonpb.Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(QueryServiceServer).LoadPartitions(ctx, in) + return srv.(QueryServiceServer).GetStatsChannel(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/milvus.proto.query.QueryService/LoadPartitions", + FullMethod: "/milvus.proto.query.QueryService/GetStatsChannel", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(QueryServiceServer).LoadPartitions(ctx, req.(*LoadPartitonRequest)) + return srv.(QueryServiceServer).GetStatsChannel(ctx, req.(*commonpb.Empty)) } return interceptor(ctx, in, info, handler) } -func _QueryService_ReleasePartitions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ReleasePartitionRequest) +func _QueryService_GetPartitionStates_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PartitionStatesRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(QueryServiceServer).ReleasePartitions(ctx, in) + return srv.(QueryServiceServer).GetPartitionStates(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/milvus.proto.query.QueryService/ReleasePartitions", + FullMethod: "/milvus.proto.query.QueryService/GetPartitionStates", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(QueryServiceServer).ReleasePartitions(ctx, req.(*ReleasePartitionRequest)) + return srv.(QueryServiceServer).GetPartitionStates(ctx, req.(*PartitionStatesRequest)) } return interceptor(ctx, in, info, handler) } -func _QueryService_CreateQueryChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _QueryService_GetServiceStates_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(commonpb.Empty) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(QueryServiceServer).CreateQueryChannel(ctx, in) + return srv.(QueryServiceServer).GetServiceStates(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/milvus.proto.query.QueryService/CreateQueryChannel", + FullMethod: "/milvus.proto.query.QueryService/GetServiceStates", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(QueryServiceServer).CreateQueryChannel(ctx, req.(*commonpb.Empty)) + return srv.(QueryServiceServer).GetServiceStates(ctx, req.(*commonpb.Empty)) } return interceptor(ctx, in, info, handler) } @@ -1513,6 +1767,18 @@ var _QueryService_serviceDesc = grpc.ServiceDesc{ MethodName: "ShowCollections", Handler: _QueryService_ShowCollections_Handler, }, + { + MethodName: "ShowPartitions", + Handler: _QueryService_ShowPartitions_Handler, + }, + { + MethodName: "LoadPartitions", + Handler: _QueryService_LoadPartitions_Handler, + }, + { + MethodName: "ReleasePartitions", + Handler: _QueryService_ReleasePartitions_Handler, + }, { MethodName: "LoadCollection", Handler: _QueryService_LoadCollection_Handler, @@ -1522,24 +1788,276 @@ var _QueryService_serviceDesc = grpc.ServiceDesc{ Handler: _QueryService_ReleaseCollection_Handler, }, { - MethodName: "ShowPartitions", - Handler: _QueryService_ShowPartitions_Handler, + MethodName: "CreateQueryChannel", + Handler: _QueryService_CreateQueryChannel_Handler, + }, + { + MethodName: "GetTimeTickChannel", + Handler: _QueryService_GetTimeTickChannel_Handler, + }, + { + MethodName: "GetStatsChannel", + Handler: _QueryService_GetStatsChannel_Handler, }, { MethodName: "GetPartitionStates", Handler: _QueryService_GetPartitionStates_Handler, }, { - MethodName: "LoadPartitions", - Handler: _QueryService_LoadPartitions_Handler, + MethodName: "GetServiceStates", + Handler: _QueryService_GetServiceStates_Handler, }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "query_service.proto", +} + +// QueryNodeClient is the client API for QueryNode service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type QueryNodeClient interface { + AddQueryChannel(ctx context.Context, in *AddQueryChannelsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + RemoveQueryChannel(ctx context.Context, in *RemoveQueryChannelsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + WatchDmChannels(ctx context.Context, in *WatchDmChannelsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + LoadSegments(ctx context.Context, in *LoadSegmentRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + ReleaseSegments(ctx context.Context, in *ReleaseSegmentRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + GetPartitionState(ctx context.Context, in *PartitionStatesRequest, opts ...grpc.CallOption) (*PartitionStatesResponse, error) +} + +type queryNodeClient struct { + cc *grpc.ClientConn +} + +func NewQueryNodeClient(cc *grpc.ClientConn) QueryNodeClient { + return &queryNodeClient{cc} +} + +func (c *queryNodeClient) AddQueryChannel(ctx context.Context, in *AddQueryChannelsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + out := new(commonpb.Status) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryNode/AddQueryChannel", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *queryNodeClient) RemoveQueryChannel(ctx context.Context, in *RemoveQueryChannelsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + out := new(commonpb.Status) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryNode/RemoveQueryChannel", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *queryNodeClient) WatchDmChannels(ctx context.Context, in *WatchDmChannelsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + out := new(commonpb.Status) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryNode/WatchDmChannels", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *queryNodeClient) LoadSegments(ctx context.Context, in *LoadSegmentRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + out := new(commonpb.Status) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryNode/LoadSegments", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *queryNodeClient) ReleaseSegments(ctx context.Context, in *ReleaseSegmentRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { + out := new(commonpb.Status) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryNode/ReleaseSegments", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *queryNodeClient) GetPartitionState(ctx context.Context, in *PartitionStatesRequest, opts ...grpc.CallOption) (*PartitionStatesResponse, error) { + out := new(PartitionStatesResponse) + err := c.cc.Invoke(ctx, "/milvus.proto.query.QueryNode/GetPartitionState", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// QueryNodeServer is the server API for QueryNode service. +type QueryNodeServer interface { + AddQueryChannel(context.Context, *AddQueryChannelsRequest) (*commonpb.Status, error) + RemoveQueryChannel(context.Context, *RemoveQueryChannelsRequest) (*commonpb.Status, error) + WatchDmChannels(context.Context, *WatchDmChannelsRequest) (*commonpb.Status, error) + LoadSegments(context.Context, *LoadSegmentRequest) (*commonpb.Status, error) + ReleaseSegments(context.Context, *ReleaseSegmentRequest) (*commonpb.Status, error) + GetPartitionState(context.Context, *PartitionStatesRequest) (*PartitionStatesResponse, error) +} + +// UnimplementedQueryNodeServer can be embedded to have forward compatible implementations. +type UnimplementedQueryNodeServer struct { +} + +func (*UnimplementedQueryNodeServer) AddQueryChannel(ctx context.Context, req *AddQueryChannelsRequest) (*commonpb.Status, error) { + return nil, status.Errorf(codes.Unimplemented, "method AddQueryChannel not implemented") +} +func (*UnimplementedQueryNodeServer) RemoveQueryChannel(ctx context.Context, req *RemoveQueryChannelsRequest) (*commonpb.Status, error) { + return nil, status.Errorf(codes.Unimplemented, "method RemoveQueryChannel not implemented") +} +func (*UnimplementedQueryNodeServer) WatchDmChannels(ctx context.Context, req *WatchDmChannelsRequest) (*commonpb.Status, error) { + return nil, status.Errorf(codes.Unimplemented, "method WatchDmChannels not implemented") +} +func (*UnimplementedQueryNodeServer) LoadSegments(ctx context.Context, req *LoadSegmentRequest) (*commonpb.Status, error) { + return nil, status.Errorf(codes.Unimplemented, "method LoadSegments not implemented") +} +func (*UnimplementedQueryNodeServer) ReleaseSegments(ctx context.Context, req *ReleaseSegmentRequest) (*commonpb.Status, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReleaseSegments not implemented") +} +func (*UnimplementedQueryNodeServer) GetPartitionState(ctx context.Context, req *PartitionStatesRequest) (*PartitionStatesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetPartitionState not implemented") +} + +func RegisterQueryNodeServer(s *grpc.Server, srv QueryNodeServer) { + s.RegisterService(&_QueryNode_serviceDesc, srv) +} + +func _QueryNode_AddQueryChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AddQueryChannelsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QueryNodeServer).AddQueryChannel(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.query.QueryNode/AddQueryChannel", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QueryNodeServer).AddQueryChannel(ctx, req.(*AddQueryChannelsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _QueryNode_RemoveQueryChannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RemoveQueryChannelsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QueryNodeServer).RemoveQueryChannel(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.query.QueryNode/RemoveQueryChannel", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QueryNodeServer).RemoveQueryChannel(ctx, req.(*RemoveQueryChannelsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _QueryNode_WatchDmChannels_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(WatchDmChannelsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QueryNodeServer).WatchDmChannels(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.query.QueryNode/WatchDmChannels", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QueryNodeServer).WatchDmChannels(ctx, req.(*WatchDmChannelsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _QueryNode_LoadSegments_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LoadSegmentRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QueryNodeServer).LoadSegments(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.query.QueryNode/LoadSegments", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QueryNodeServer).LoadSegments(ctx, req.(*LoadSegmentRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _QueryNode_ReleaseSegments_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReleaseSegmentRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QueryNodeServer).ReleaseSegments(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.query.QueryNode/ReleaseSegments", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QueryNodeServer).ReleaseSegments(ctx, req.(*ReleaseSegmentRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _QueryNode_GetPartitionState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PartitionStatesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QueryNodeServer).GetPartitionState(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.query.QueryNode/GetPartitionState", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QueryNodeServer).GetPartitionState(ctx, req.(*PartitionStatesRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _QueryNode_serviceDesc = grpc.ServiceDesc{ + ServiceName: "milvus.proto.query.QueryNode", + HandlerType: (*QueryNodeServer)(nil), + Methods: []grpc.MethodDesc{ { - MethodName: "ReleasePartitions", - Handler: _QueryService_ReleasePartitions_Handler, + MethodName: "AddQueryChannel", + Handler: _QueryNode_AddQueryChannel_Handler, }, { - MethodName: "CreateQueryChannel", - Handler: _QueryService_CreateQueryChannel_Handler, + MethodName: "RemoveQueryChannel", + Handler: _QueryNode_RemoveQueryChannel_Handler, + }, + { + MethodName: "WatchDmChannels", + Handler: _QueryNode_WatchDmChannels_Handler, + }, + { + MethodName: "LoadSegments", + Handler: _QueryNode_LoadSegments_Handler, + }, + { + MethodName: "ReleaseSegments", + Handler: _QueryNode_ReleaseSegments_Handler, + }, + { + MethodName: "GetPartitionState", + Handler: _QueryNode_GetPartitionState_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/internal/proxynode/proxy.go b/internal/proxynode/proxy.go index fd91c38e05a46b88c7ee3672f13b2f19604f2152..c6026d884afc1ddc87c17b79aa7d2c8b3d630c33 100644 --- a/internal/proxynode/proxy.go +++ b/internal/proxynode/proxy.go @@ -12,6 +12,7 @@ import ( "time" "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/config" "google.golang.org/grpc" @@ -71,8 +72,11 @@ func CreateProxy(ctx context.Context) (*Proxy, error) { Type: "const", Param: 1, }, + Reporter: &config.ReporterConfig{ + LogSpans: true, + }, } - p.tracer, p.closer, err = cfg.NewTracer() + p.tracer, p.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger)) if err != nil { panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) } diff --git a/internal/querynode/api.go b/internal/querynode/api.go new file mode 100644 index 0000000000000000000000000000000000000000..905d3421c369d01cae66b501d44cb59d43fef867 --- /dev/null +++ b/internal/querynode/api.go @@ -0,0 +1,167 @@ +package querynodeimp + +import ( + "context" + "errors" + + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb" +) + +func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) { + select { + case <-ctx.Done(): + errMsg := "context exceeded" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + default: + searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream) + if !ok { + errMsg := "type assertion failed for search message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream) + if !ok { + errMsg := "type assertion failed for search result message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + // add request channel + pulsarBufSize := Params.SearchPulsarBufSize + consumeChannels := []string{in.RequestChannelID} + consumeSubName := Params.MsgChannelSubName + unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() + searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + + // add result channel + producerChannels := []string{in.ResultChannelID} + resultStream.CreatePulsarProducers(producerChannels) + + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } + return status, nil + } +} + +func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) { + select { + case <-ctx.Done(): + errMsg := "context exceeded" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + default: + searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream) + if !ok { + errMsg := "type assertion failed for search message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream) + if !ok { + errMsg := "type assertion failed for search result message stream" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + } + + // remove request channel + pulsarBufSize := Params.SearchPulsarBufSize + consumeChannels := []string{in.RequestChannelID} + consumeSubName := Params.MsgChannelSubName + unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() + // TODO: searchStream.RemovePulsarConsumers(producerChannels) + searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + + // remove result channel + producerChannels := []string{in.ResultChannelID} + // TODO: resultStream.RemovePulsarProducer(producerChannels) + resultStream.CreatePulsarProducers(producerChannels) + + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } + return status, nil + } +} + +func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) { + select { + case <-ctx.Done(): + errMsg := "context exceeded" + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: errMsg, + } + + return status, errors.New(errMsg) + default: + // TODO: add dmMsgStream reference to dataSyncService + //fgDMMsgStream, ok := node.dataSyncService.dmMsgStream.(*msgstream.PulsarMsgStream) + //if !ok { + // errMsg := "type assertion failed for dm message stream" + // status := &commonpb.Status{ + // ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + // Reason: errMsg, + // } + // + // return status, errors.New(errMsg) + //} + // + //// add request channel + //pulsarBufSize := Params.SearchPulsarBufSize + //consumeChannels := in.ChannelIDs + //consumeSubName := Params.MsgChannelSubName + //unmarshalDispatcher := msgstream.NewUnmarshalDispatcher() + //fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize) + // + //status := &commonpb.Status{ + // ErrorCode: commonpb.ErrorCode_SUCCESS, + //} + //return status, nil + return nil, nil + } +} + +func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) { + // TODO: implement + return nil, nil +} + +func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) { + // TODO: implement + return nil, nil +} + +func (node *QueryNode) GetPartitionState(ctx context.Context, in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) { + // TODO: implement + return nil, nil +} diff --git a/internal/querynode/client/client.go b/internal/querynode/client/client.go index f937f7d9fd85fa48c3bf4c2d4fb387d795103c34..eb1a3bea0ca1655bd1eb524407d1d6265667d1e5 100644 --- a/internal/querynode/client/client.go +++ b/internal/querynode/client/client.go @@ -8,21 +8,29 @@ import ( internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" ) -type LoadIndexClient struct { +type Client struct { inputStream *msgstream.MsgStream } -func NewLoadIndexClient(ctx context.Context, pulsarAddress string, loadIndexChannels []string) *LoadIndexClient { +func NewQueryNodeClient(ctx context.Context, pulsarAddress string, loadIndexChannels []string) *Client { loadIndexStream := msgstream.NewPulsarMsgStream(ctx, 0) loadIndexStream.SetPulsarClient(pulsarAddress) loadIndexStream.CreatePulsarProducers(loadIndexChannels) var input msgstream.MsgStream = loadIndexStream - return &LoadIndexClient{ + return &Client{ inputStream: &input, } } -func (lic *LoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string, indexParams map[string]string) error { +func (c *Client) Close() { + (*c.inputStream).Close() +} + +func (c *Client) LoadIndex(indexPaths []string, + segmentID int64, + fieldID int64, + fieldName string, + indexParams map[string]string) error { baseMsg := msgstream.BaseMsg{ BeginTimestamp: 0, EndTimestamp: 0, @@ -53,10 +61,6 @@ func (lic *LoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fiel msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, loadIndexMsg) - err := (*lic.inputStream).Produce(&msgPack) + err := (*c.inputStream).Produce(&msgPack) return err } - -func (lic *LoadIndexClient) Close() { - (*lic.inputStream).Close() -} diff --git a/internal/querynode/collection.go b/internal/querynode/collection.go index 638afffa4d1edddd978d5d91ee6c1ada24ae9359..646fe33926cdf947470cbdb62dc363b34c824b6e 100644 --- a/internal/querynode/collection.go +++ b/internal/querynode/collection.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp /* diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index c035069146627edf0c4af0282eb5e6d8b68ad10d..115daf00ffa41b7d90683a8b455a78309d6bd77c 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp /* diff --git a/internal/querynode/collection_replica_test.go b/internal/querynode/collection_replica_test.go index 1592ab1aab8774995a7d96ad99820e88a89f925f..fe328fea5b64495b0ee50a0ddc3ffec3c6b44dbd 100644 --- a/internal/querynode/collection_replica_test.go +++ b/internal/querynode/collection_replica_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "testing" @@ -8,20 +8,20 @@ import ( //----------------------------------------------------------------------------------------------------- collection func TestCollectionReplica_getCollectionNum(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() initTestMeta(t, node, "collection0", 0, 0) assert.Equal(t, node.replica.getCollectionNum(), 1) node.Close() } func TestCollectionReplica_addCollection(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() initTestMeta(t, node, "collection0", 0, 0) node.Close() } func TestCollectionReplica_removeCollection(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() initTestMeta(t, node, "collection0", 0, 0) assert.Equal(t, node.replica.getCollectionNum(), 1) @@ -32,7 +32,7 @@ func TestCollectionReplica_removeCollection(t *testing.T) { } func TestCollectionReplica_getCollectionByID(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -45,7 +45,7 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) { } func TestCollectionReplica_getCollectionByName(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -60,7 +60,7 @@ func TestCollectionReplica_getCollectionByName(t *testing.T) { } func TestCollectionReplica_hasCollection(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -75,7 +75,7 @@ func TestCollectionReplica_hasCollection(t *testing.T) { //----------------------------------------------------------------------------------------------------- partition func TestCollectionReplica_getPartitionNum(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -96,7 +96,7 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) { } func TestCollectionReplica_addPartition(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -113,7 +113,7 @@ func TestCollectionReplica_addPartition(t *testing.T) { } func TestCollectionReplica_removePartition(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -133,7 +133,7 @@ func TestCollectionReplica_removePartition(t *testing.T) { } func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -157,7 +157,7 @@ func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) { } func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -182,7 +182,7 @@ func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) { } func TestCollectionReplica_getPartitionByTag(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -201,7 +201,7 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) { } func TestCollectionReplica_hasPartition(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -218,7 +218,7 @@ func TestCollectionReplica_hasPartition(t *testing.T) { //----------------------------------------------------------------------------------------------------- segment func TestCollectionReplica_addSegment(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -237,7 +237,7 @@ func TestCollectionReplica_addSegment(t *testing.T) { } func TestCollectionReplica_removeSegment(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -259,7 +259,7 @@ func TestCollectionReplica_removeSegment(t *testing.T) { } func TestCollectionReplica_getSegmentByID(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -279,7 +279,7 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) { } func TestCollectionReplica_hasSegment(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -303,7 +303,7 @@ func TestCollectionReplica_hasSegment(t *testing.T) { } func TestCollectionReplica_freeAll(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) diff --git a/internal/querynode/collection_test.go b/internal/querynode/collection_test.go index dc95dafcf9e0547d779a1a783b6af46d7fd542a3..2c4c5e96012577847d67c52dd6ba71c36321627c 100644 --- a/internal/querynode/collection_test.go +++ b/internal/querynode/collection_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "testing" @@ -8,7 +8,7 @@ import ( ) func TestCollection_Partitions(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index 5ed2caa0675b53c4aa9a895a6983cd3f98dc0057..3cdf6fa562e6e3f721ee354690368181586c35ae 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "context" @@ -40,15 +40,15 @@ func (dsService *dataSyncService) initNodes() { dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) - var dmStreamNode Node = newDmInputNode(dsService.ctx) - var ddStreamNode Node = newDDInputNode(dsService.ctx) + var dmStreamNode node = newDmInputNode(dsService.ctx) + var ddStreamNode node = newDDInputNode(dsService.ctx) - var filterDmNode Node = newFilteredDmNode() - var ddNode Node = newDDNode(dsService.replica) + var filterDmNode node = newFilteredDmNode() + var ddNode node = newDDNode(dsService.replica) - var insertNode Node = newInsertNode(dsService.replica) - var serviceTimeNode Node = newServiceTimeNode(dsService.replica) - var gcNode Node = newGCNode(dsService.replica) + var insertNode node = newInsertNode(dsService.replica) + var serviceTimeNode node = newServiceTimeNode(dsService.replica) + var gcNode node = newGCNode(dsService.replica) dsService.fg.AddNode(&dmStreamNode) dsService.fg.AddNode(&ddStreamNode) diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 36de0523562210dd48095822e92eb35f3fdbacb0..c5f429abe32d9e7a3c9f77e073887dcd3073fdf6 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "encoding/binary" @@ -14,7 +14,7 @@ import ( // NOTE: start pulsar before test func TestDataSyncService_Start(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() initTestMeta(t, node, "collection0", 0, 0) // test data generate const msgLength = 10 diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go index a7a2ac73201ff48439090ad0ef5f58d9c1b38e1d..880137dc6d487bdf7ff30e7f45751b31b131a00c 100644 --- a/internal/querynode/flow_graph_dd_node.go +++ b/internal/querynode/flow_graph_dd_node.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "log" @@ -12,7 +12,7 @@ import ( ) type ddNode struct { - BaseNode + baseNode ddMsg *ddMsg replica collectionReplica } @@ -179,12 +179,12 @@ func newDDNode(replica collectionReplica) *ddNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - baseNode := BaseNode{} + baseNode := baseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &ddNode{ - BaseNode: baseNode, + baseNode: baseNode, replica: replica, } } diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go index 0366e7a65a107cf6cc3ac14873e89cd8962f4df0..d873c487fb9f517906b2d1625126e2cf65bd72f8 100644 --- a/internal/querynode/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -1,7 +1,7 @@ -package querynode +package querynodeimp type deleteNode struct { - BaseNode + baseNode deleteMsg deleteMsg } @@ -17,11 +17,11 @@ func newDeleteNode() *deleteNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - baseNode := BaseNode{} + baseNode := baseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &deleteNode{ - BaseNode: baseNode, + baseNode: baseNode, } } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 3368e9e31f5f5b0b5ceeb3da1c382c960e947961..26d4658e82c6c3c3d1e74f3d424d0eec2367b5c8 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "context" @@ -12,7 +12,7 @@ import ( ) type filterDmNode struct { - BaseNode + baseNode ddMsg *ddMsg } @@ -159,11 +159,11 @@ func newFilteredDmNode() *filterDmNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - baseNode := BaseNode{} + baseNode := baseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &filterDmNode{ - BaseNode: baseNode, + baseNode: baseNode, } } diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go index cd0a9b984e7cf991436f3a3195935133e45c4c9a..aae7ddea9942ed49b9160cd70db44f1e7ba25b6f 100644 --- a/internal/querynode/flow_graph_gc_node.go +++ b/internal/querynode/flow_graph_gc_node.go @@ -1,11 +1,11 @@ -package querynode +package querynodeimp import ( "log" ) type gcNode struct { - BaseNode + baseNode replica collectionReplica } @@ -50,12 +50,12 @@ func newGCNode(replica collectionReplica) *gcNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - baseNode := BaseNode{} + baseNode := baseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &gcNode{ - BaseNode: baseNode, + baseNode: baseNode, replica: replica, } } diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index f375c824023be164ba3e933104483dded936883f..57f6500b1ce0f6d47f9d5fc358df0072d43476ef 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "context" @@ -13,7 +13,7 @@ import ( ) type insertNode struct { - BaseNode + baseNode replica collectionReplica } @@ -162,12 +162,12 @@ func newInsertNode(replica collectionReplica) *insertNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - baseNode := BaseNode{} + baseNode := baseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &insertNode{ - BaseNode: baseNode, + baseNode: baseNode, replica: replica, } } diff --git a/internal/querynode/flow_graph_key2seg_node.go b/internal/querynode/flow_graph_key2seg_node.go index 77b84ad0fa6a46ca301300da10b3abc453db6e07..ff3b966a5bf7751f15a652bd6e95a2793a67e22d 100644 --- a/internal/querynode/flow_graph_key2seg_node.go +++ b/internal/querynode/flow_graph_key2seg_node.go @@ -1,7 +1,7 @@ -package querynode +package querynodeimp type key2SegNode struct { - BaseNode + baseNode key2SegMsg key2SegMsg } @@ -17,12 +17,12 @@ func newKey2SegNode() *key2SegNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - baseNode := BaseNode{} + baseNode := baseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &key2SegNode{ - BaseNode: baseNode, + baseNode: baseNode, } } diff --git a/internal/querynode/flow_graph_message.go b/internal/querynode/flow_graph_message.go index 451f9b6952ad003a3f687ee44550808cce6bc481..58d4eb5283a5bc2fa96ef89fc0282feca61d0d4e 100644 --- a/internal/querynode/flow_graph_message.go +++ b/internal/querynode/flow_graph_message.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "github.com/zilliztech/milvus-distributed/internal/msgstream" diff --git a/internal/querynode/flow_graph_msg_stream_input_nodes.go b/internal/querynode/flow_graph_msg_stream_input_nodes.go index 9ea29ab7b1d040c58c0db227dbe77537ff37de3b..d71902bd57915fce9b72b063b89bf38353ec26f2 100644 --- a/internal/querynode/flow_graph_msg_stream_input_nodes.go +++ b/internal/querynode/flow_graph_msg_stream_input_nodes.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "context" diff --git a/internal/querynode/flow_graph_node.go b/internal/querynode/flow_graph_node.go index e585fcb07d870a0e630b8076ca42e54141d30513..d6e9f8b0480b688e226e016ef026764b185a2cbb 100644 --- a/internal/querynode/flow_graph_node.go +++ b/internal/querynode/flow_graph_node.go @@ -1,7 +1,7 @@ -package querynode +package querynodeimp import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" -type BaseNode = flowgraph.BaseNode -type Node = flowgraph.Node -type InputNode = flowgraph.InputNode +type baseNode = flowgraph.BaseNode +type node = flowgraph.Node +type inputNode = flowgraph.InputNode diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 275666560d589c95f57b1f13ddefa90b5eb76ee6..8262909d98f078c073da17143de6e4303c2f02f0 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -1,11 +1,11 @@ -package querynode +package querynodeimp import ( "log" ) type serviceTimeNode struct { - BaseNode + baseNode replica collectionReplica } @@ -42,12 +42,12 @@ func newServiceTimeNode(replica collectionReplica) *serviceTimeNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism - baseNode := BaseNode{} + baseNode := baseNode{} baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) return &serviceTimeNode{ - BaseNode: baseNode, + baseNode: baseNode, replica: replica, } } diff --git a/internal/querynode/index.go b/internal/querynode/index.go index 8c8f84b17e6e7c59dc0dbe5e3c1a2179f3bfbfb3..fdea5e83669e8b1fcdb7905440bc22a935fd9305 100644 --- a/internal/querynode/index.go +++ b/internal/querynode/index.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp /* diff --git a/internal/querynode/load_index_info.go b/internal/querynode/load_index_info.go index d56cca4f21dbf1972c79ebe331b29ffacd4a33e9..a654d9628c244cb5b12378440d6ccaa5104bdec6 100644 --- a/internal/querynode/load_index_info.go +++ b/internal/querynode/load_index_info.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp /* #cgo CFLAGS: -I${SRCDIR}/../core/output/include diff --git a/internal/querynode/load_index_info_test.go b/internal/querynode/load_index_info_test.go index 95261c7002eb7c9012598afab0fe59d66d69840d..64bf1966a5f9e2d48eb147427e734c58033a5c3d 100644 --- a/internal/querynode/load_index_info_test.go +++ b/internal/querynode/load_index_info_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "testing" diff --git a/internal/querynode/load_index_service.go b/internal/querynode/load_index_service.go index 10857b4f3a4d313c7073e4841c97750e50055cfb..e6c44533173a63518107568cd259968839f3b4a5 100644 --- a/internal/querynode/load_index_service.go +++ b/internal/querynode/load_index_service.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "context" diff --git a/internal/querynode/load_index_service_test.go b/internal/querynode/load_index_service_test.go index 71d79ba3ac7ccca453a92d482b3c55a5971007d2..be6c6b0bbb970a4cd1d5b92cfb8afb95a7f752f0 100644 --- a/internal/querynode/load_index_service_test.go +++ b/internal/querynode/load_index_service_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "encoding/binary" @@ -23,7 +23,7 @@ import ( ) func TestLoadIndexService_FloatVector(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionID := rand.Int63n(1000000) segmentID := rand.Int63n(1000000) initTestMeta(t, node, "collection0", collectionID, segmentID) @@ -276,7 +276,7 @@ func TestLoadIndexService_FloatVector(t *testing.T) { // create loadIndexClient fieldID := UniqueID(100) loadIndexChannelNames := Params.LoadIndexChannelNames - client := client.NewLoadIndexClient(node.queryNodeLoopCtx, Params.PulsarAddress, loadIndexChannelNames) + client := client.NewQueryNodeClient(node.queryNodeLoopCtx, Params.PulsarAddress, loadIndexChannelNames) client.LoadIndex(indexPaths, segmentID, fieldID, "vec", indexParams) // init message stream consumer and do checks @@ -341,7 +341,7 @@ func TestLoadIndexService_FloatVector(t *testing.T) { } func TestLoadIndexService_BinaryVector(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionID := rand.Int63n(1000000) segmentID := rand.Int63n(1000000) initTestMeta(t, node, "collection0", collectionID, segmentID, true) @@ -584,7 +584,7 @@ func TestLoadIndexService_BinaryVector(t *testing.T) { // create loadIndexClient fieldID := UniqueID(100) loadIndexChannelNames := Params.LoadIndexChannelNames - client := client.NewLoadIndexClient(node.queryNodeLoopCtx, Params.PulsarAddress, loadIndexChannelNames) + client := client.NewQueryNodeClient(node.queryNodeLoopCtx, Params.PulsarAddress, loadIndexChannelNames) client.LoadIndex(indexPaths, segmentID, fieldID, "vec", indexParams) // init message stream consumer and do checks diff --git a/internal/querynode/meta_service.go b/internal/querynode/meta_service.go index f24dc1433aa1cb8e517c19ad0ace93e86e2dbfd6..ea0880e48665b7a936e0a35236893c45d4339130 100644 --- a/internal/querynode/meta_service.go +++ b/internal/querynode/meta_service.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "context" diff --git a/internal/querynode/meta_service_test.go b/internal/querynode/meta_service_test.go index 01338f36e08a0fb0941705c3927bf27fd6f8d2b9..05aefca6b0c8a74ae71eb16c76d8f03d7620dd29 100644 --- a/internal/querynode/meta_service_test.go +++ b/internal/querynode/meta_service_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "math" @@ -10,7 +10,7 @@ import ( ) func TestMetaService_start(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) node.metaService.start() @@ -118,7 +118,7 @@ func TestMetaService_printSegmentStruct(t *testing.T) { } func TestMetaService_processCollectionCreate(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) id := "0" @@ -163,7 +163,7 @@ func TestMetaService_processCollectionCreate(t *testing.T) { } func TestMetaService_processSegmentCreate(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -185,7 +185,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) { } func TestMetaService_processCreate(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) key1 := Params.MetaRootPath + "/collection/0" @@ -241,7 +241,7 @@ func TestMetaService_processCreate(t *testing.T) { } func TestMetaService_processSegmentModify(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) segmentID := UniqueID(0) @@ -275,7 +275,7 @@ func TestMetaService_processSegmentModify(t *testing.T) { } func TestMetaService_processCollectionModify(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) id := "0" @@ -383,7 +383,7 @@ func TestMetaService_processCollectionModify(t *testing.T) { } func TestMetaService_processModify(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) key1 := Params.MetaRootPath + "/collection/0" @@ -516,7 +516,7 @@ func TestMetaService_processModify(t *testing.T) { } func TestMetaService_processSegmentDelete(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) @@ -541,7 +541,7 @@ func TestMetaService_processSegmentDelete(t *testing.T) { } func TestMetaService_processCollectionDelete(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) id := "0" @@ -589,7 +589,7 @@ func TestMetaService_processCollectionDelete(t *testing.T) { } func TestMetaService_processDelete(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) key1 := Params.MetaRootPath + "/collection/0" @@ -652,7 +652,7 @@ func TestMetaService_processDelete(t *testing.T) { } func TestMetaService_processResp(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) metaChan := (*node.metaService).kvBase.WatchWithPrefix("") @@ -667,7 +667,7 @@ func TestMetaService_processResp(t *testing.T) { } func TestMetaService_loadCollections(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) err2 := (*node.metaService).loadCollections() @@ -676,7 +676,7 @@ func TestMetaService_loadCollections(t *testing.T) { } func TestMetaService_loadSegments(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) err2 := (*node.metaService).loadSegments() diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index 663134dc153eb1ec688e771cc1afdfac8dd38548..24caf25d1ecc37550b6764924b7f11f42f9fc78b 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "log" diff --git a/internal/querynode/param_table_test.go b/internal/querynode/param_table_test.go index 461073146f8597d88c29076365c7bbb8e45dd0a8..bceb0ff7d74077b5b40e822d20eb4148c9f6741b 100644 --- a/internal/querynode/param_table_test.go +++ b/internal/querynode/param_table_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "fmt" diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go index e2dc4593d792425db5ba927b18497f4984ae2972..538e5732d810ce406e7fc123bde45fbc2c323704 100644 --- a/internal/querynode/partition.go +++ b/internal/querynode/partition.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp /* diff --git a/internal/querynode/partition_test.go b/internal/querynode/partition_test.go index 8d27b88e6b5ec869c4b40ce93a72d24a7bd0282b..aeccc6cccda7675e0ccf24b29d0532fbabf500d9 100644 --- a/internal/querynode/partition_test.go +++ b/internal/querynode/partition_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "testing" @@ -7,7 +7,7 @@ import ( ) func TestPartition_Segments(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() collectionName := "collection0" collectionID := UniqueID(0) initTestMeta(t, node, collectionName, collectionID, 0) diff --git a/internal/querynode/plan.go b/internal/querynode/plan.go index 0909406189a66b1ce2c74865bc2bbb25237f9463..b0d695576da8ceea1534445c4eabf1edf2fdf78e 100644 --- a/internal/querynode/plan.go +++ b/internal/querynode/plan.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp /* #cgo CFLAGS: -I${SRCDIR}/../core/output/include diff --git a/internal/querynode/plan_test.go b/internal/querynode/plan_test.go index e707f404d183ed8825b8d0cfaa060129361a2f51..88913550d3753e1b06cc3d2db7c9794014fc5c84 100644 --- a/internal/querynode/plan_test.go +++ b/internal/querynode/plan_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "encoding/binary" diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index bab8e47afb07ad16c221e289f29a795d7a1376e3..38ad79bee9aed7a30f8f75990d662087d2394743 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp /* @@ -18,18 +18,36 @@ import ( "io" "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/config" + "google.golang.org/grpc" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) +type Node interface { + Start() error + Close() + + AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) + RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) + WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) + LoadSegments(ctx context.Context, in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) + ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) + GetPartitionState(ctx context.Context, in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) +} + type QueryNode struct { queryNodeLoopCtx context.Context queryNodeLoopCancel context.CancelFunc QueryNodeID uint64 + grpcServer *grpc.Server replica collectionReplica - // services + // internal services dataSyncService *dataSyncService metaService *metaService searchService *searchService @@ -45,7 +63,12 @@ func Init() { Params.Init() } -func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { +func NewQueryNode(ctx context.Context, queryNodeID uint64) Node { + var node Node = newQueryNode(ctx, queryNodeID) + return node +} + +func newQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { ctx1, cancel := context.WithCancel(ctx) q := &QueryNode{ @@ -66,8 +89,11 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { Type: "const", Param: 1, }, + Reporter: &config.ReporterConfig{ + LogSpans: true, + }, } - q.tracer, q.closer, err = cfg.NewTracer() + q.tracer, q.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger)) if err != nil { panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) } diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 1217fa3da34d8069dd5a41bb08f5af2d557651bc..c7d57803f6d3048049784012f0bb7b03c8c6ca8f 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "context" @@ -118,7 +118,7 @@ func initTestMeta(t *testing.T, node *QueryNode, collectionName string, collecti assert.NoError(t, err) } -func newQueryNode() *QueryNode { +func newQueryNodeMock() *QueryNode { var ctx context.Context @@ -134,7 +134,7 @@ func newQueryNode() *QueryNode { ctx = context.Background() } - svr := NewQueryNode(ctx, 0) + svr := newQueryNode(ctx, 0) return svr } @@ -166,7 +166,7 @@ func TestMain(m *testing.M) { // NOTE: start pulsar and etcd before test func TestQueryNode_Start(t *testing.T) { - localNode := newQueryNode() + localNode := newQueryNodeMock() err := localNode.Start() assert.Nil(t, err) localNode.Close() diff --git a/internal/querynode/reduce.go b/internal/querynode/reduce.go index 8fa56bf7fbaf4dd2ade4cae0b966bee92f61b8fa..d6a9e858ea354834799871dbf49ab987581c3d24 100644 --- a/internal/querynode/reduce.go +++ b/internal/querynode/reduce.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp /* #cgo CFLAGS: -I${SRCDIR}/../core/output/include diff --git a/internal/querynode/reduce_test.go b/internal/querynode/reduce_test.go index 8667c68f4a03a9176227d6ee9e148246a8cb7ebc..0dadbdc7025903781c00016350b913fabc659e78 100644 --- a/internal/querynode/reduce_test.go +++ b/internal/querynode/reduce_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "encoding/binary" diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index 5a6ce44a55ba9d171912a0d13cd929039e97be87..689aa7135063e9dce53a39b96b2bd8f981063500 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import "C" import ( diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go index d3dfcf316ae11201e54a7a6167ff750f4a40d093..789b9348388167f65465c240412395e42006fd7c 100644 --- a/internal/querynode/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "context" @@ -18,7 +18,7 @@ import ( ) func TestSearch_Search(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() initTestMeta(t, node, "collection0", 0, 0) pulsarURL := Params.PulsarAddress @@ -204,7 +204,7 @@ func TestSearch_Search(t *testing.T) { } func TestSearch_SearchMultiSegments(t *testing.T) { - node := NewQueryNode(context.Background(), 0) + node := newQueryNode(context.Background(), 0) initTestMeta(t, node, "collection0", 0, 0) pulsarURL := Params.PulsarAddress diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 2e00200a2fb6f5af96e5886ffb80cd8b531dd9bc..bd9380b0ad6cffaa68fd5e552f3465b910ecc5ba 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp /* diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go index 52689704b7219fdb333357bac364db06abd20be1..3de321fc771282374faf79ef5f70242543acee70 100644 --- a/internal/querynode/segment_test.go +++ b/internal/querynode/segment_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "encoding/binary" diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go index 17c8bd9473e927ddc6d2402e2dabbdebd86e4901..c5e27f9b88d3d738aefa9bed0692da460b12615c 100644 --- a/internal/querynode/stats_service.go +++ b/internal/querynode/stats_service.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "context" diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go index c7ff4b3f945d151c58a3e9776e7fe4c5dae2d405..574d258bcca6a8feca559946654b2ca152e51860 100644 --- a/internal/querynode/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "testing" @@ -8,7 +8,7 @@ import ( // NOTE: start pulsar before test func TestStatsService_start(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() initTestMeta(t, node, "collection0", 0, 0) node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil) node.statsService.start() @@ -17,7 +17,7 @@ func TestStatsService_start(t *testing.T) { //NOTE: start pulsar before test func TestSegmentManagement_sendSegmentStatistic(t *testing.T) { - node := newQueryNode() + node := newQueryNodeMock() initTestMeta(t, node, "collection0", 0, 0) const receiveBufSize = 1024 diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go index 60529a3c9868837c3e651dc057b8d407281edeb7..eab7dd19668ea9dc9ae3ddb60a78ba0bc37f00b9 100644 --- a/internal/querynode/tsafe.go +++ b/internal/querynode/tsafe.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "sync" diff --git a/internal/querynode/tsafe_test.go b/internal/querynode/tsafe_test.go index 1ae166f7f7a19f1f72df35c3c9d3a6cae016d569..88021d1036071046d32052069802cd55d6b5ed97 100644 --- a/internal/querynode/tsafe_test.go +++ b/internal/querynode/tsafe_test.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import ( "testing" diff --git a/internal/querynode/type_def.go b/internal/querynode/type_def.go index 6cbd347791bb36cc3a76b997fde3ff562696a578..7b6fb06efb28d2da31ff2e4139a1fbede475c99f 100644 --- a/internal/querynode/type_def.go +++ b/internal/querynode/type_def.go @@ -1,4 +1,4 @@ -package querynode +package querynodeimp import "github.com/zilliztech/milvus-distributed/internal/util/typeutil" diff --git a/internal/queryservice/interface.go b/internal/queryservice/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..7f2013b3167caeb43ec550a7f1ddc33094589237 --- /dev/null +++ b/internal/queryservice/interface.go @@ -0,0 +1,18 @@ +package queryserviceimpl + +import "github.com/zilliztech/milvus-distributed/internal/proto/querypb" + +type Interface interface { + RegisterNode(req querypb.RegisterNodeRequest) (querypb.RegisterNodeResponse, error) + + ShowCollections(req querypb.ShowCollectionRequest) (querypb.ShowCollectionResponse, error) + LoadCollection(req querypb.LoadCollectionRequest) error + ReleaseCollection(req querypb.ReleaseCollectionRequest) error + + ShowPartitions(req querypb.ShowPartitionRequest) (querypb.ShowPartitionResponse, error) + GetPartitionStates(req querypb.PartitionStatesRequest) (querypb.PartitionStatesResponse, error) + LoadPartitions(req querypb.LoadPartitionRequest) error + ReleasePartitions(req querypb.ReleasePartitionRequest) error + + CreateQueryChannel() (querypb.CreateQueryChannelResponse, error) +} diff --git a/internal/queryservice/query_service.go b/internal/queryservice/query_service.go new file mode 100644 index 0000000000000000000000000000000000000000..3d2a84421451be42fa856f68e43e04dc0fcef8c7 --- /dev/null +++ b/internal/queryservice/query_service.go @@ -0,0 +1,42 @@ +package queryserviceimpl + +import "github.com/zilliztech/milvus-distributed/internal/proto/querypb" + +type QueryService struct { +} + +func (qs *QueryService) RegisterNode(req querypb.RegisterNodeRequest) (querypb.RegisterNodeResponse, error) { + return querypb.RegisterNodeResponse{}, nil +} + +func (qs *QueryService) ShowCollections(req querypb.ShowCollectionRequest) (querypb.ShowCollectionResponse, error) { + return querypb.ShowCollectionResponse{}, nil +} + +func (qs *QueryService) LoadCollection(req querypb.LoadCollectionRequest) error { + return nil +} + +func (qs *QueryService) ReleaseCollection(req querypb.ReleaseCollectionRequest) error { + return nil +} + +func (qs *QueryService) ShowPartitions(req querypb.ShowPartitionRequest) (querypb.ShowPartitionResponse, error) { + return querypb.ShowPartitionResponse{}, nil +} + +func (qs *QueryService) GetPartitionStates(req querypb.PartitionStatesRequest) (querypb.PartitionStatesResponse, error) { + return querypb.PartitionStatesResponse{}, nil +} + +func (qs *QueryService) LoadPartitions(req querypb.LoadPartitionRequest) error { + return nil +} + +func (qs *QueryService) ReleasePartitions(req querypb.ReleasePartitionRequest) error { + return nil +} + +func (qs *QueryService) CreateQueryChannel() (querypb.CreateQueryChannelResponse, error) { + return querypb.CreateQueryChannelResponse{}, nil +} diff --git a/internal/writenode/write_node.go b/internal/writenode/write_node.go index 280a3298c0aa9792d83fe1c7da1d680bc7b01746..5199148c96cc1476e3bf9525f12cdf649b776c08 100644 --- a/internal/writenode/write_node.go +++ b/internal/writenode/write_node.go @@ -6,6 +6,7 @@ import ( "io" "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/config" ) @@ -51,9 +52,12 @@ func (node *WriteNode) Start() error { Type: "const", Param: 1, }, + Reporter: &config.ReporterConfig{ + LogSpans: true, + }, } var err error - node.tracer, node.closer, err = cfg.NewTracer() + node.tracer, node.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger)) if err != nil { panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) }