diff --git a/cmd/datanode/main.go b/cmd/datanode/main.go index 7bde774e98af59b0d83628406d3bfc718914182d..42e8de857ca864f43ce095ca5cf3bf1e10c9db25 100644 --- a/cmd/datanode/main.go +++ b/cmd/datanode/main.go @@ -8,6 +8,7 @@ import ( "syscall" distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" ) func main() { @@ -15,7 +16,9 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dn, err := distributed.NewDataNode(ctx) + msFactory := pulsarms.NewFactory() + + dn, err := distributed.NewDataNode(ctx, msFactory) if err != nil { panic(err) } diff --git a/cmd/dataservice/main.go b/cmd/dataservice/main.go index d7d3178361a208dc8b9fab973c2cb28aa203fe96..ad1c81e61d450474c73cf51f664262c9cfd1c351 100644 --- a/cmd/dataservice/main.go +++ b/cmd/dataservice/main.go @@ -8,12 +8,15 @@ import ( "syscall" "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" ) func main() { ctx, cancel := context.WithCancel(context.Background()) - svr, err := components.NewDataService(ctx) + msFactory := pulsarms.NewFactory() + + svr, err := components.NewDataService(ctx, msFactory) if err != nil { panic(err) } diff --git a/cmd/distributed/components/data_node.go b/cmd/distributed/components/data_node.go index 9a9ae38367bad7958087a53b1603ce98d1f29e58..1b372cdc6c9a3f9689000b0103964713084ad30a 100644 --- a/cmd/distributed/components/data_node.go +++ b/cmd/distributed/components/data_node.go @@ -12,6 +12,7 @@ import ( dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) @@ -24,12 +25,12 @@ type DataNode struct { dataService *dsc.Client } -func NewDataNode(ctx context.Context) (*DataNode, error) { +func NewDataNode(ctx context.Context, factory msgstream.Factory) (*DataNode, error) { const retry = 10 const interval = 200 - svr, err := dnc.New(ctx) + svr, err := dnc.New(ctx, factory) if err != nil { panic(err) } diff --git a/cmd/distributed/components/data_service.go b/cmd/distributed/components/data_service.go index c1b2ee4f205c0640938563517a55e19bf17d6f16..1205db71a5e8c5312f68f000f2d30a7fa02e1710 100644 --- a/cmd/distributed/components/data_service.go +++ b/cmd/distributed/components/data_service.go @@ -9,6 +9,7 @@ import ( ms "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" "github.com/zilliztech/milvus-distributed/internal/masterservice" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" @@ -20,8 +21,8 @@ type DataService struct { masterClient *ms.GrpcClient } -func NewDataService(ctx context.Context) (*DataService, error) { - service := dataservice.NewGrpcService(ctx) +func NewDataService(ctx context.Context, factory msgstream.Factory) (*DataService, error) { + service := dataservice.NewGrpcService(ctx, factory) masterservice.Params.Init() client, err := ms.NewGrpcClient(fmt.Sprintf("%s:%d", masterservice.Params.Address, masterservice.Params.Port), 30*time.Second) diff --git a/cmd/distributed/components/master_service.go b/cmd/distributed/components/master_service.go index 9d90c7e139dc2ded1737d9c77d45203ad9540042..562171643857133a46afa89c4c7714eb8e270d34 100644 --- a/cmd/distributed/components/master_service.go +++ b/cmd/distributed/components/master_service.go @@ -15,6 +15,7 @@ import ( qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" is "github.com/zilliztech/milvus-distributed/internal/indexservice" ms "github.com/zilliztech/milvus-distributed/internal/masterservice" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" qs "github.com/zilliztech/milvus-distributed/internal/queryservice" @@ -30,10 +31,10 @@ type MasterService struct { queryService *qsc.Client } -func NewMasterService(ctx context.Context) (*MasterService, error) { +func NewMasterService(ctx context.Context, factory msgstream.Factory) (*MasterService, error) { const reTryCnt = 3 - svr, err := msc.NewGrpcServer(ctx) + svr, err := msc.NewGrpcServer(ctx, factory) if err != nil { return nil, err } diff --git a/cmd/distributed/components/proxy_node.go b/cmd/distributed/components/proxy_node.go index da6c5d97c1932e8a81f15a85dc23cd2b9502ebce..a3b47cedd3877e8870178a131c9b41e79ad2e63d 100644 --- a/cmd/distributed/components/proxy_node.go +++ b/cmd/distributed/components/proxy_node.go @@ -3,6 +3,8 @@ package components import ( "context" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + grpcproxynode "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode" ) @@ -10,9 +12,9 @@ type ProxyNode struct { svr *grpcproxynode.Server } -func NewProxyNode(ctx context.Context) (*ProxyNode, error) { +func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, error) { n := &ProxyNode{} - svr, err := grpcproxynode.NewServer(ctx) + svr, err := grpcproxynode.NewServer(ctx, factory) if err != nil { return nil, err } diff --git a/cmd/distributed/components/proxy_service.go b/cmd/distributed/components/proxy_service.go index f044ae246dfc5ef96ec90897a1bcc9e36a63dfb4..4581e729d1c3f9618b5e058cdc0351502d57e4dc 100644 --- a/cmd/distributed/components/proxy_service.go +++ b/cmd/distributed/components/proxy_service.go @@ -3,6 +3,8 @@ package components import ( "context" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + grpcproxyservice "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice" ) @@ -10,9 +12,9 @@ type ProxyService struct { svr *grpcproxyservice.Server } -func NewProxyService(ctx context.Context) (*ProxyService, error) { +func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyService, error) { service := &ProxyService{} - svr, err := grpcproxyservice.NewServer(ctx) + svr, err := grpcproxyservice.NewServer(ctx, factory) if err != nil { return nil, err } diff --git a/cmd/distributed/components/query_node.go b/cmd/distributed/components/query_node.go index c9009f282e41259aed328648556a96b5a81ca177..621c3a5f0b7e495e6b451d02c9e5eddadf492851 100644 --- a/cmd/distributed/components/query_node.go +++ b/cmd/distributed/components/query_node.go @@ -11,6 +11,7 @@ import ( msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" qns "github.com/zilliztech/milvus-distributed/internal/distributed/querynode" qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" + "github.com/zilliztech/milvus-distributed/internal/msgstream" ds "github.com/zilliztech/milvus-distributed/internal/dataservice" is "github.com/zilliztech/milvus-distributed/internal/indexservice" @@ -32,11 +33,11 @@ type QueryNode struct { queryService *qsc.Client } -func NewQueryNode(ctx context.Context) (*QueryNode, error) { +func NewQueryNode(ctx context.Context, factory msgstream.Factory) (*QueryNode, error) { const retry = 10 const interval = 500 - svr, err := qns.NewServer(ctx) + svr, err := qns.NewServer(ctx, factory) if err != nil { panic(err) } diff --git a/cmd/distributed/components/query_service.go b/cmd/distributed/components/query_service.go index 78337cd10dc3f35b9946160c7108f58fbdf2070c..ca288ffb7701c58bbd2d602a33856784453d1e18 100644 --- a/cmd/distributed/components/query_service.go +++ b/cmd/distributed/components/query_service.go @@ -12,6 +12,7 @@ import ( ds "github.com/zilliztech/milvus-distributed/internal/dataservice" ms "github.com/zilliztech/milvus-distributed/internal/masterservice" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/queryservice" @@ -25,12 +26,12 @@ type QueryService struct { masterService *msc.GrpcClient } -func NewQueryService(ctx context.Context) (*QueryService, error) { +func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryService, error) { const retry = 10 const interval = 200 queryservice.Params.Init() - svr, err := qs.NewServer(ctx) + svr, err := qs.NewServer(ctx, factory) if err != nil { panic(err) } diff --git a/cmd/distributed/main.go b/cmd/distributed/main.go index aacedf752e54a4aea939971935f0a56f0d58d921..5e6b50e929a043435941f63a5b0064273d80be5f 100644 --- a/cmd/distributed/main.go +++ b/cmd/distributed/main.go @@ -64,7 +64,7 @@ func run(serverType string) error { default: return errors.Errorf("unknown server type = %s", serverType) } - role.Run() + role.Run(false) return nil } diff --git a/cmd/distributed/roles/roles.go b/cmd/distributed/roles/roles.go index 5419d94832d54b7f94d3389e78470df355fd9ce2..ec7036830ba0dfe2dd0776bffc40ac8f4804e007 100644 --- a/cmd/distributed/roles/roles.go +++ b/cmd/distributed/roles/roles.go @@ -9,8 +9,18 @@ import ( "syscall" "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" + "github.com/zilliztech/milvus-distributed/internal/msgstream/rmqms" ) +func newMsgFactory(localMsg bool) msgstream.Factory { + if localMsg { + return rmqms.NewFactory() + } + return pulsarms.NewFactory() +} + type MilvusRoles struct { EnableMaster bool `env:"ENABLE_MASTER"` EnableProxyService bool `env:"ENABLE_PROXY_SERVICE"` @@ -42,7 +52,7 @@ func (mr *MilvusRoles) EnvValue(env string) bool { return false } -func (mr *MilvusRoles) Run() { +func (mr *MilvusRoles) Run(localMsg bool) { if !mr.HasAnyRole() { log.Printf("set the roles please ...") return @@ -55,8 +65,9 @@ func (mr *MilvusRoles) Run() { if mr.EnableMaster { log.Print("start as master service") go func() { + factory := newMsgFactory(localMsg) var err error - masterService, err = components.NewMasterService(ctx) + masterService, err = components.NewMasterService(ctx, factory) if err != nil { panic(err) } @@ -68,8 +79,9 @@ func (mr *MilvusRoles) Run() { if mr.EnableProxyService { log.Print("start as proxy service") go func() { + factory := newMsgFactory(localMsg) var err error - proxyService, err = components.NewProxyService(ctx) + proxyService, err = components.NewProxyService(ctx, factory) if err != nil { panic(err) } @@ -81,8 +93,9 @@ func (mr *MilvusRoles) Run() { if mr.EnableProxyNode { log.Print("start as proxy node") go func() { + factory := newMsgFactory(localMsg) var err error - proxyNode, err = components.NewProxyNode(ctx) + proxyNode, err = components.NewProxyNode(ctx, factory) if err != nil { panic(err) } @@ -94,8 +107,9 @@ func (mr *MilvusRoles) Run() { if mr.EnableQueryService { log.Print("start as query service") go func() { + factory := newMsgFactory(localMsg) var err error - queryService, err = components.NewQueryService(ctx) + queryService, err = components.NewQueryService(ctx, factory) if err != nil { panic(err) } @@ -107,8 +121,9 @@ func (mr *MilvusRoles) Run() { if mr.EnableQueryNode { log.Print("start as query node") go func() { + factory := newMsgFactory(localMsg) var err error - queryNode, err = components.NewQueryNode(ctx) + queryNode, err = components.NewQueryNode(ctx, factory) if err != nil { panic(err) } @@ -120,8 +135,9 @@ func (mr *MilvusRoles) Run() { if mr.EnableDataService { log.Print("start as data service") go func() { + factory := newMsgFactory(localMsg) var err error - dataService, err = components.NewDataService(ctx) + dataService, err = components.NewDataService(ctx, factory) if err != nil { panic(err) } @@ -133,8 +149,9 @@ func (mr *MilvusRoles) Run() { if mr.EnableDataNode { log.Print("start as data node") go func() { + factory := newMsgFactory(localMsg) var err error - dataNode, err = components.NewDataNode(ctx) + dataNode, err = components.NewDataNode(ctx, factory) if err != nil { panic(err) } diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go index a59d5e1e87400566e10813794b8ce46935445343..34b146ecd49bb83ba8c84f66df67f5aa48cc9e0f 100644 --- a/cmd/masterservice/main.go +++ b/cmd/masterservice/main.go @@ -8,13 +8,15 @@ import ( "syscall" distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ms, err := distributed.NewMasterService(ctx) + msFactory := pulsarms.NewFactory() + ms, err := distributed.NewMasterService(ctx, msFactory) if err != nil { panic(err) } diff --git a/cmd/proxy/node/proxy_node.go b/cmd/proxy/node/proxy_node.go index 9e271920f9ff52c9e4223f677b6a1732c4806245..e146b9905f2d322f6b2f58fbda81846d1a21f006 100644 --- a/cmd/proxy/node/proxy_node.go +++ b/cmd/proxy/node/proxy_node.go @@ -8,13 +8,15 @@ import ( "syscall" "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "go.uber.org/zap" ) func main() { ctx, cancel := context.WithCancel(context.Background()) - n, err := components.NewProxyNode(ctx) + msFactory := pulsarms.NewFactory() + n, err := components.NewProxyNode(ctx, msFactory) if err != nil { log.Print("create server failed", zap.Error(err)) } diff --git a/cmd/proxy/service/proxy_service.go b/cmd/proxy/service/proxy_service.go index 9086d87b3355f2f443c16150bfb309c970e1705b..fdc1d21716c75c3388129b0dcc1cb93aae9637b9 100644 --- a/cmd/proxy/service/proxy_service.go +++ b/cmd/proxy/service/proxy_service.go @@ -8,13 +8,15 @@ import ( "syscall" "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "go.uber.org/zap" ) func main() { ctx, cancel := context.WithCancel(context.Background()) - s, err := components.NewProxyService(ctx) + msFactory := pulsarms.NewFactory() + s, err := components.NewProxyService(ctx, msFactory) if err != nil { log.Fatal("create proxy service error: " + err.Error()) } diff --git a/cmd/querynode/querynode.go b/cmd/querynode/querynode.go index 86e984cff9b52e605658d30a8ba8fd72053a705e..179f5618cb986193edc4205c941e8c9298c82c90 100644 --- a/cmd/querynode/querynode.go +++ b/cmd/querynode/querynode.go @@ -8,13 +8,15 @@ import ( "syscall" distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - svr, err := distributed.NewQueryNode(ctx) + msFactory := pulsarms.NewFactory() + svr, err := distributed.NewQueryNode(ctx, msFactory) if err != nil { panic(err) diff --git a/cmd/queryservice/queryservice.go b/cmd/queryservice/queryservice.go index 1390e307fe4bb73e59aa651ab2436b3ca5230b66..1cc29da53fdcbfe2a7c978828db00c8c50a83f58 100644 --- a/cmd/queryservice/queryservice.go +++ b/cmd/queryservice/queryservice.go @@ -8,13 +8,16 @@ import ( "syscall" distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - svr, err := distributed.NewQueryService(ctx) + msFactory := pulsarms.NewFactory() + + svr, err := distributed.NewQueryService(ctx, msFactory) if err != nil { panic(err) } diff --git a/cmd/singlenode/main.go b/cmd/singlenode/main.go index 40265fd14130f38d489576b7a2698e202dc9cd7e..c597ab4fd826ef387f4931fada95b1882665f326 100644 --- a/cmd/singlenode/main.go +++ b/cmd/singlenode/main.go @@ -20,5 +20,5 @@ func initRoles(roles *roles.MilvusRoles) { func main() { var roles roles.MilvusRoles initRoles(&roles) - roles.Run() + roles.Run(false) } diff --git a/go.mod b/go.mod index dd5cf6093d642d8dc6335854d66b194386ad6511..0f160674e9beb6633e0410d4ae8090cc8730d34c 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/klauspost/compress v1.10.11 // indirect github.com/kr/text v0.2.0 // indirect github.com/minio/minio-go/v7 v7.0.5 + github.com/mitchellh/mapstructure v1.1.2 github.com/modern-go/reflect2 v1.0.1 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/oklog/run v1.1.0 diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index b97e15db5d5f717092e526e125b6fd660597b30d..c7eb4696ededbc04b8d9a818133a52103046d52e 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -9,6 +9,7 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -70,10 +71,12 @@ type ( replica Replica closer io.Closer + + msFactory msgstream.Factory } ) -func NewDataNode(ctx context.Context) *DataNode { +func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { Params.Init() ctx2, cancel2 := context.WithCancel(ctx) @@ -89,6 +92,7 @@ func NewDataNode(ctx context.Context) *DataNode { masterService: nil, dataService: nil, replica: nil, + msFactory: factory, } node.State.Store(internalpb2.StateCode_INITIALIZING) @@ -165,7 +169,7 @@ func (node *DataNode) Init() error { chanSize := 100 node.flushChan = make(chan *flushMsg, chanSize) - node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc) + node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc, node.msFactory) node.dataSyncService.init() node.metaService = newMetaService(node.ctx, replica, node.masterService) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 0e7146dfb643e2596ce7977b0478f158cfc0aac8..d883b3003eb3eb2d2065faf9603eaee26025b808 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -5,6 +5,7 @@ import ( "log" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" "go.etcd.io/etcd/clientv3" ) @@ -15,16 +16,18 @@ type dataSyncService struct { flushChan chan *flushMsg replica Replica idAllocator allocator + msFactory msgstream.Factory } func newDataSyncService(ctx context.Context, flushChan chan *flushMsg, - replica Replica, alloc allocator) *dataSyncService { + replica Replica, alloc allocator, factory msgstream.Factory) *dataSyncService { service := &dataSyncService{ ctx: ctx, fg: nil, flushChan: flushChan, replica: replica, idAllocator: alloc, + msFactory: factory, } return service } @@ -65,12 +68,21 @@ func (dsService *dataSyncService) initNodes() { dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) - var dmStreamNode Node = newDmInputNode(dsService.ctx) - var ddStreamNode Node = newDDInputNode(dsService.ctx) + m := map[string]interface{}{ + "PulsarAddress": Params.PulsarAddress, + "ReceiveBufSize": 1024, + "PulsarBufSize": 1024} + err = dsService.msFactory.SetParams(m) + if err != nil { + panic(err) + } + + var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory) + var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory) var filterDmNode Node = newFilteredDmNode() var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator) - var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator) + var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator, dsService.msFactory) var gcNode Node = newGCNode(dsService.replica) dsService.fg.AddNode(&dmStreamNode) diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 7973aecb7afef4f11b54dfdd3dc18352958c68a0..ffa8d893e463126fd5c350a3e95e4f60f4015f94 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -39,7 +39,13 @@ func TestDataSyncService_Start(t *testing.T) { flushChan := make(chan *flushMsg, chanSize) replica := newReplica() allocFactory := AllocatorFactory{} - sync := newDataSyncService(ctx, flushChan, replica, allocFactory) + msFactory := pulsarms.NewFactory() + m := map[string]interface{}{ + "pulsarAddress": pulsarURL, + "receiveBufSize": 1024, + "pulsarBufSize": 1024} + err := msFactory.SetParams(m) + sync := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory) sync.replica.addCollection(collMeta.ID, collMeta.Schema) sync.init() go sync.start() @@ -78,15 +84,14 @@ func TestDataSyncService_Start(t *testing.T) { timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) // pulsar produce - const receiveBufSize = 1024 insertChannels := Params.InsertChannelNames ddChannels := Params.DDChannelNames - factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024) - insertStream, _ := factory.NewMsgStream(ctx) + assert.NoError(t, err) + insertStream, _ := msFactory.NewMsgStream(ctx) insertStream.AsProducer(insertChannels) - ddStream, _ := factory.NewMsgStream(ctx) + ddStream, _ := msFactory.NewMsgStream(ctx) ddStream.AsProducer(ddChannels) var insertMsgStream msgstream.MsgStream = insertStream @@ -95,7 +100,7 @@ func TestDataSyncService_Start(t *testing.T) { var ddMsgStream msgstream.MsgStream = ddStream ddMsgStream.Start() - err := insertMsgStream.Produce(&msgPack) + err = insertMsgStream.Produce(&msgPack) assert.NoError(t, err) err = insertMsgStream.Broadcast(&timeTickMsgPack) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index eac2177dfbef01b71f4d2832ae7cfffe552c6b57..66332809424ac10beed2d9d8e30f2c777779ae83 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -15,7 +15,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/kv" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" @@ -621,7 +620,7 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) ( } func newInsertBufferNode(ctx context.Context, flushMeta *metaTable, - replica Replica, alloc allocator) *insertBufferNode { + replica Replica, alloc allocator, factory msgstream.Factory) *insertBufferNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -651,8 +650,6 @@ func newInsertBufferNode(ctx context.Context, flushMeta *metaTable, } minioPrefix := Params.InsertBinlogRootPath - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) - //input stream, data node time tick wTt, _ := factory.NewMsgStream(ctx) wTt.AsProducer([]string{Params.TimeTickChannelName}) diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 12c776b981dd93def9abf0f62b0dd18b62b09e1c..527f7d577bf9dd691ecb595ff09fb6899e50ae84 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -8,7 +8,9 @@ import ( "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) @@ -39,7 +41,16 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { require.NoError(t, err) idFactory := AllocatorFactory{} - iBNode := newInsertBufferNode(ctx, newMetaTable(), replica, idFactory) + + msFactory := pulsarms.NewFactory() + m := map[string]interface{}{ + "receiveBufSize": 1024, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err = msFactory.SetParams(m) + assert.Nil(t, err) + + iBNode := newInsertBufferNode(ctx, newMetaTable(), replica, idFactory, msFactory) inMsg := genInsertMsg() var iMsg flowgraph.Msg = &inMsg iBNode.Operate([]*flowgraph.Msg{&iMsg}) diff --git a/internal/datanode/flow_graph_msg_stream_input_node.go b/internal/datanode/flow_graph_msg_stream_input_node.go index 0476ce4d6bb08bcd496a51e12ea4646e96952c05..96b5c2093ce07d64e723ab75f14d46ada1fe04e0 100644 --- a/internal/datanode/flow_graph_msg_stream_input_node.go +++ b/internal/datanode/flow_graph_msg_stream_input_node.go @@ -4,18 +4,16 @@ import ( "context" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) -func newDmInputNode(ctx context.Context) *flowgraph.InputNode { +func newDmInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism consumeChannels := Params.InsertChannelNames consumeSubName := Params.MsgChannelSubName - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) insertStream, _ := factory.NewTtMsgStream(ctx) insertStream.AsConsumer(consumeChannels, consumeSubName) @@ -24,13 +22,12 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode { return node } -func newDDInputNode(ctx context.Context) *flowgraph.InputNode { +func newDDInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism consumeSubName := Params.MsgChannelSubName - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) tmpStream, _ := factory.NewTtMsgStream(ctx) tmpStream.AsConsumer(Params.DDChannelNames, consumeSubName) diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 5b792126798a4271e534e1967fb87dd4a3644580..318b104de7e8efb12912e6ca942219ca08be9463 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -17,7 +17,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" @@ -94,17 +93,19 @@ type ( ddChannelName string segmentInfoStream msgstream.MsgStream insertChannels []string + msFactory msgstream.Factory ttBarrier timesync.TimeTickBarrier } ) -func CreateServer(ctx context.Context) (*Server, error) { +func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { Params.Init() ch := make(chan struct{}) s := &Server{ ctx: ctx, registerFinishCh: ch, cluster: newDataNodeCluster(ch), + msFactory: factory, } s.insertChannels = s.getInsertChannels() s.state.Store(internalpb2.StateCode_INITIALIZING) @@ -130,6 +131,15 @@ func (s *Server) Init() error { func (s *Server) Start() error { var err error + m := map[string]interface{}{ + "PulsarAddress": Params.PulsarAddress, + "ReceiveBufSize": 1024, + "PulsarBufSize": 1024} + err = s.msFactory.SetParams(m) + if err != nil { + return err + } + s.allocator = newAllocatorImpl(s.masterClient) if err = s.initMeta(); err != nil { return err @@ -171,23 +181,21 @@ func (s *Server) initMeta() error { } func (s *Server) initSegmentInfoChannel() { - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) - segmentInfoStream, _ := factory.NewMsgStream(s.ctx) + segmentInfoStream, _ := s.msFactory.NewMsgStream(s.ctx) segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName}) s.segmentInfoStream = segmentInfoStream s.segmentInfoStream.Start() } func (s *Server) initMsgProducer() error { var err error - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) - if s.ttMsgStream, err = factory.NewMsgStream(s.ctx); err != nil { + if s.ttMsgStream, err = s.msFactory.NewTtMsgStream(s.ctx); err != nil { return err } s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName) s.ttMsgStream.Start() s.ttBarrier = timesync.NewHardTimeTickBarrier(s.ctx, s.ttMsgStream, s.cluster.GetNodeIDs()) s.ttBarrier.Start() - if s.k2sMsgStream, err = factory.NewMsgStream(s.ctx); err != nil { + if s.k2sMsgStream, err = s.msFactory.NewMsgStream(s.ctx); err != nil { return err } s.k2sMsgStream.AsProducer(Params.K2SChannelNames) @@ -308,8 +316,7 @@ func (s *Server) startServerLoop() { func (s *Server) startStatsChannel(ctx context.Context) { defer s.serverLoopWg.Done() - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) - statsStream, _ := factory.NewMsgStream(ctx) + statsStream, _ := s.msFactory.NewMsgStream(ctx) statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName) statsStream.Start() defer statsStream.Close() @@ -334,8 +341,7 @@ func (s *Server) startStatsChannel(ctx context.Context) { func (s *Server) startSegmentFlushChannel(ctx context.Context) { defer s.serverLoopWg.Done() - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) - flushStream, _ := factory.NewMsgStream(ctx) + flushStream, _ := s.msFactory.NewMsgStream(ctx) flushStream.AsConsumer([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName) flushStream.Start() defer flushStream.Close() @@ -370,8 +376,7 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { func (s *Server) startDDChannel(ctx context.Context) { defer s.serverLoopWg.Done() - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) - ddStream, _ := factory.NewMsgStream(ctx) + ddStream, _ := s.msFactory.NewMsgStream(ctx) ddStream.AsConsumer([]string{s.ddChannelName}, Params.DataServiceSubscriptionName) ddStream.Start() defer ddStream.Close() @@ -603,7 +608,7 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha Segment: segmentInfo, }, } - msgPack := &pulsarms.MsgPack{ + msgPack := &msgstream.MsgPack{ Msgs: []msgstream.TsMsg{infoMsg}, } if err = s.segmentInfoStream.Produce(msgPack); err != nil { diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index aa3a06717a5326261e6b74e4e6cfb605bd84ae94..080fc27eb19122898f74657740f4907d0b5b643a 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -8,6 +8,7 @@ import ( dn "github.com/zilliztech/milvus-distributed/internal/datanode" "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -24,16 +25,19 @@ type Server struct { ctx context.Context cancel context.CancelFunc + + msFactory msgstream.Factory } -func New(ctx context.Context) (*Server, error) { +func New(ctx context.Context, factory msgstream.Factory) (*Server, error) { ctx1, cancel := context.WithCancel(ctx) var s = &Server{ - ctx: ctx1, - cancel: cancel, + ctx: ctx1, + cancel: cancel, + msFactory: factory, } - s.core = dn.NewDataNode(s.ctx) + s.core = dn.NewDataNode(s.ctx, s.msFactory) s.grpcServer = grpc.NewServer() datapb.RegisterDataNodeServer(s.grpcServer, s) addr := dn.Params.IP + ":" + strconv.FormatInt(dn.Params.Port, 10) diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go index 13a8fab20dff7ce7fa2340fcf1b5a23bc1155aca..7f3990bf1f66fb205581f4fe386422125ca7b3b6 100644 --- a/internal/distributed/dataservice/grpc_service.go +++ b/internal/distributed/dataservice/grpc_service.go @@ -13,6 +13,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -28,11 +29,11 @@ func (s *Service) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInf return s.server.GetSegmentInfo(request) } -func NewGrpcService(ctx context.Context) *Service { +func NewGrpcService(ctx context.Context, factory msgstream.Factory) *Service { s := &Service{} var err error s.ctx = ctx - s.server, err = dataservice.CreateServer(s.ctx) + s.server, err = dataservice.CreateServer(s.ctx, factory) if err != nil { log.Fatalf("create server error: %s", err.Error()) return nil diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 0a8e7549553b948739414fd81ed4754ca4613186..f69d7a2124ac524b77b0b24b2321043f28b5c315 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -12,6 +12,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" cms "github.com/zilliztech/milvus-distributed/internal/masterservice" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -27,7 +28,8 @@ func TestGrpcService(t *testing.T) { //cms.Params.Address = "127.0.0.1" cms.Params.Port = (randVal % 100) + 10000 - svr, err := NewGrpcServer(context.Background()) + msFactory := pulsarms.NewFactory() + svr, err := NewGrpcServer(context.Background(), msFactory) assert.Nil(t, err) // cms.Params.NodeID = 0 diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index ebd1b87d2a73b0026acd72e4b248e3047d8bbd1d..491b6c5bc7d163684c937040c1cd44d50ef5ad52 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -8,6 +8,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/errors" cms "github.com/zilliztech/milvus-distributed/internal/masterservice" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -26,13 +27,14 @@ type GrpcServer struct { cancel context.CancelFunc } -func NewGrpcServer(ctx context.Context) (*GrpcServer, error) { +func NewGrpcServer(ctx context.Context, factory msgstream.Factory) (*GrpcServer, error) { s := &GrpcServer{} var err error s.ctx, s.cancel = context.WithCancel(ctx) - if s.core, err = cms.NewCore(s.ctx); err != nil { + if s.core, err = cms.NewCore(s.ctx, factory); err != nil { return nil, err } + s.grpcServer = grpc.NewServer() s.grpcError = nil masterpb.RegisterMasterServiceServer(s.grpcServer, s) diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index d69f30782111cfcc5505c549f9fd5b21edff3e34..5963c568f10f158296e0ef39c33dfe818aee8d92 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -11,9 +11,9 @@ import ( grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client" - "github.com/zilliztech/milvus-distributed/internal/util/funcutil" - + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" "google.golang.org/grpc" @@ -48,7 +48,7 @@ type Server struct { indexServiceClient *grpcindexserviceclient.Client } -func NewServer(ctx context.Context) (*Server, error) { +func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { server := &Server{ ctx: ctx, @@ -56,7 +56,7 @@ func NewServer(ctx context.Context) (*Server, error) { } var err error - server.impl, err = proxynode.NewProxyNodeImpl(server.ctx) + server.impl, err = proxynode.NewProxyNodeImpl(server.ctx, factory) if err != nil { return nil, err } diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index 19fe18ecde050419c668301d50c905517e9713bf..e40f5c9746add7c05bfbb4588c1943b3724a32d1 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -7,6 +7,7 @@ import ( "strconv" "sync" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" @@ -27,7 +28,7 @@ type Server struct { impl *proxyservice.ServiceImpl } -func NewServer(ctx1 context.Context) (*Server, error) { +func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error) { ctx, cancel := context.WithCancel(ctx1) server := &Server{ @@ -37,7 +38,7 @@ func NewServer(ctx1 context.Context) (*Server, error) { } var err error - server.impl, err = proxyservice.NewServiceImpl(server.ctx) + server.impl, err = proxyservice.NewServiceImpl(server.ctx, factory) if err != nil { return nil, err } diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 0a0f6e6152b96eb7568b1cb93c70d5be43703230..2d0d1249c6d6eceb01ae53547aef8ea9e31b643c 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" @@ -26,10 +27,10 @@ type Server struct { cancel context.CancelFunc } -func NewServer(ctx context.Context) (*Server, error) { +func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { s := &Server{ ctx: ctx, - node: qn.NewQueryNodeWithoutID(ctx), + node: qn.NewQueryNodeWithoutID(ctx, factory), } qn.Params.Init() diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 0fc2208c78bb379a3158c75712a4f3ab24f1fd45..bc2ae5b19acb0b814b413246092a561377a42bf3 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" @@ -25,11 +26,13 @@ type Server struct { loopCancel context.CancelFunc queryService *qs.QueryService + + msFactory msgstream.Factory } -func NewServer(ctx context.Context) (*Server, error) { +func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { ctx1, cancel := context.WithCancel(ctx) - service, err := qs.NewQueryService(ctx1) + service, err := qs.NewQueryService(ctx1, factory) if err != nil { cancel() return nil, err @@ -39,6 +42,7 @@ func NewServer(ctx context.Context) (*Server, error) { queryService: service, loopCtx: ctx1, loopCancel: cancel, + msFactory: factory, }, nil } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 2862e77df53800a69faccd79ded80ff0727cb734..02095d5922fb0c9398f914808ad8ea29a04467f3 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -12,7 +12,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/errors" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" @@ -181,17 +180,20 @@ type Core struct { initOnce sync.Once startOnce sync.Once isInit atomic.Value + + msFactory ms.Factory } // --------------------- function -------------------------- -func NewCore(c context.Context) (*Core, error) { +func NewCore(c context.Context, factory ms.Factory) (*Core, error) { ctx, cancel := context.WithCancel(c) rand.Seed(time.Now().UnixNano()) Params.Init() core := &Core{ - ctx: ctx, - cancel: cancel, + ctx: ctx, + cancel: cancel, + msFactory: factory, } core.stateCode.Store(internalpb2.StateCode_INITIALIZING) core.isInit.Store(false) @@ -414,7 +416,6 @@ func (c *Core) tsLoop() { } } func (c *Core) setMsgStreams() error { - if Params.PulsarAddress == "" { return errors.Errorf("PulsarAddress is empty") } @@ -427,8 +428,17 @@ func (c *Core) setMsgStreams() error { return errors.Errorf("ProxyTimeTickChannel is empty") } - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) - proxyTimeTickStream, _ := factory.NewMsgStream(c.ctx) + var err error + m := map[string]interface{}{ + "PulsarAddress": Params.PulsarAddress, + "ReceiveBufSize": 1024, + "PulsarBufSize": 1024} + err = c.msFactory.SetParams(m) + if err != nil { + return err + } + + proxyTimeTickStream, _ := c.msFactory.NewMsgStream(c.ctx) proxyTimeTickStream.AsConsumer([]string{Params.ProxyTimeTickChannel}, Params.MsgChannelSubName) proxyTimeTickStream.Start() @@ -436,14 +446,14 @@ func (c *Core) setMsgStreams() error { if Params.TimeTickChannel == "" { return errors.Errorf("TimeTickChannel is empty") } - timeTickStream, _ := factory.NewMsgStream(c.ctx) + timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx) timeTickStream.AsProducer([]string{Params.TimeTickChannel}) // master dd channel if Params.DdChannel == "" { return errors.Errorf("DdChannel is empty") } - ddStream, _ := factory.NewMsgStream(c.ctx) + ddStream, _ := c.msFactory.NewMsgStream(c.ctx) ddStream.AsProducer([]string{Params.DdChannel}) c.SendTimeTick = func(t typeutil.Timestamp) error { @@ -577,7 +587,7 @@ func (c *Core) setMsgStreams() error { if Params.DataServiceSegmentChannel == "" { return errors.Errorf("DataServiceSegmentChannel is empty") } - dataServiceStream, _ := factory.NewMsgStream(c.ctx) + dataServiceStream, _ := c.msFactory.NewMsgStream(c.ctx) dataServiceStream.AsConsumer([]string{Params.DataServiceSegmentChannel}, Params.MsgChannelSubName) dataServiceStream.Start() c.DataServiceSegmentChan = make(chan *datapb.SegmentInfo, 1024) diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index de1bebbae80df0c7a00dd6f27652032fed684fb8..7c1210a8cb33d83e6f9505d95a7d4250cd144981 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -148,7 +148,8 @@ func TestMasterService(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - core, err := NewCore(ctx) + msFactory := pulsarms.NewFactory() + core, err := NewCore(ctx, msFactory) assert.Nil(t, err) randVal := rand.Int() @@ -192,18 +193,24 @@ func TestMasterService(t *testing.T) { err = core.Start() assert.Nil(t, err) - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) - proxyTimeTickStream, _ := factory.NewMsgStream(ctx) + m := map[string]interface{}{ + "receiveBufSize": 1024, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err = msFactory.SetParams(m) + assert.Nil(t, err) + + proxyTimeTickStream, _ := msFactory.NewMsgStream(ctx) proxyTimeTickStream.AsProducer([]string{Params.ProxyTimeTickChannel}) - dataServiceSegmentStream, _ := factory.NewMsgStream(ctx) + dataServiceSegmentStream, _ := msFactory.NewMsgStream(ctx) dataServiceSegmentStream.AsProducer([]string{Params.DataServiceSegmentChannel}) - timeTickStream, _ := factory.NewMsgStream(ctx) + timeTickStream, _ := msFactory.NewMsgStream(ctx) timeTickStream.AsConsumer([]string{Params.TimeTickChannel}, Params.MsgChannelSubName) timeTickStream.Start() - ddStream, _ := factory.NewMsgStream(ctx) + ddStream, _ := msFactory.NewMsgStream(ctx) ddStream.AsConsumer([]string{Params.DdChannel}, Params.MsgChannelSubName) ddStream.Start() diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go index e62e0059590b891710797941d09ba96fd6ad62f2..a56ce74633f98eb8c3c6eb109e9b15c748f96649 100644 --- a/internal/msgstream/msgstream.go +++ b/internal/msgstream/msgstream.go @@ -37,6 +37,7 @@ type MsgStream interface { } type Factory interface { + SetParams(params map[string]interface{}) error NewMsgStream(ctx context.Context) (MsgStream, error) NewTtMsgStream(ctx context.Context) (MsgStream, error) } diff --git a/internal/msgstream/pulsarms/factory.go b/internal/msgstream/pulsarms/factory.go index 8e10c2e3f01ffdbb7d93feff44ab44245972df11..8f5da7e0481fb44365e3b1892346e41c00e66158 100644 --- a/internal/msgstream/pulsarms/factory.go +++ b/internal/msgstream/pulsarms/factory.go @@ -3,30 +3,39 @@ package pulsarms import ( "context" + "github.com/mitchellh/mapstructure" "github.com/zilliztech/milvus-distributed/internal/msgstream" ) type Factory struct { dispatcherFactory msgstream.ProtoUDFactory - address string - receiveBufSize int64 - pulsarBufSize int64 + // the following members must be public, so that mapstructure.Decode() can access them + PulsarAddress string + ReceiveBufSize int64 + PulsarBufSize int64 +} + +func (f *Factory) SetParams(params map[string]interface{}) error { + err := mapstructure.Decode(params, f) + if err != nil { + return err + } + return nil } func (f *Factory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) { - return newPulsarMsgStream(ctx, f.address, f.receiveBufSize, f.pulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher()) + return newPulsarMsgStream(ctx, f.PulsarAddress, f.ReceiveBufSize, f.PulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher()) } func (f *Factory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) { - return NewPulsarTtMsgStream(ctx, f.address, f.receiveBufSize, f.pulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher()) + return newPulsarTtMsgStream(ctx, f.PulsarAddress, f.ReceiveBufSize, f.PulsarBufSize, f.dispatcherFactory.NewUnmarshalDispatcher()) } -func NewFactory(address string, receiveBufSize int64, pulsarBufSize int64) *Factory { +func NewFactory() msgstream.Factory { f := &Factory{ dispatcherFactory: msgstream.ProtoUDFactory{}, - address: address, - receiveBufSize: receiveBufSize, - pulsarBufSize: pulsarBufSize, + ReceiveBufSize: 64, + PulsarBufSize: 64, } return f } diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index 9e5ada5607477216e33bfb1898c66e386a9fa30c..f8dbe5dced4ca5313d008cd2356d553248caf6bb 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -409,7 +409,7 @@ type PulsarTtMsgStream struct { lastTimeStamp Timestamp } -func NewPulsarTtMsgStream(ctx context.Context, +func newPulsarTtMsgStream(ctx context.Context, address string, receiveBufSize int64, pulsarBufSize int64, diff --git a/internal/msgstream/pulsarms/pulsar_msgstream_test.go b/internal/msgstream/pulsarms/pulsar_msgstream_test.go index b406cbb7429332845d8e6220ffe9a6c27d27b0bb..255aaf749033ded47dab09f0c2253acc59cc4b07 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream_test.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream_test.go @@ -210,7 +210,7 @@ func initPulsarTtStream(pulsarAddress string, var input msgstream.MsgStream = inputStream // set output stream - outputStream, _ := NewPulsarTtMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher()) + outputStream, _ := newPulsarTtMsgStream(context.Background(), pulsarAddress, 100, 100, factory.NewUnmarshalDispatcher()) outputStream.AsConsumer(consumerChannels, consumerSubName) outputStream.Start() var output msgstream.MsgStream = outputStream diff --git a/internal/msgstream/rmqms/factory.go b/internal/msgstream/rmqms/factory.go index 0da978f1ca10dd38e1e0ec14bc6cb4d657c678e8..31f2d730a4f0e1065a9ace4e0c4237fafeafb613 100644 --- a/internal/msgstream/rmqms/factory.go +++ b/internal/msgstream/rmqms/factory.go @@ -3,23 +3,39 @@ package rmqms import ( "context" + "github.com/mitchellh/mapstructure" + "github.com/zilliztech/milvus-distributed/internal/msgstream" ) type Factory struct { dispatcherFactory msgstream.ProtoUDFactory - receiveBufSize int64 - rmqBufSize int64 + // the following members must be public, so that mapstructure.Decode() can access them + ReceiveBufSize int64 + RmqBufSize int64 +} + +func (f *Factory) SetParams(params map[string]interface{}) error { + err := mapstructure.Decode(params, f) + if err != nil { + return err + } + return nil } func (f *Factory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) { - return newRmqMsgStream(ctx, f.receiveBufSize, f.rmqBufSize, f.dispatcherFactory.NewUnmarshalDispatcher()) + return newRmqMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, f.dispatcherFactory.NewUnmarshalDispatcher()) +} + +func (f *Factory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) { + return newRmqTtMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, f.dispatcherFactory.NewUnmarshalDispatcher()) } -func NewFactory(address string, receiveBufSize int64, pulsarBufSize int64) *Factory { +func NewFactory() msgstream.Factory { f := &Factory{ dispatcherFactory: msgstream.ProtoUDFactory{}, - receiveBufSize: receiveBufSize, + ReceiveBufSize: 64, + RmqBufSize: 64, } return f } diff --git a/internal/msgstream/rmqms/rmq_msgstream.go b/internal/msgstream/rmqms/rmq_msgstream.go index 6bcaa945855bb792e81f5d1704d982272dd724be..5bfbfbdd0cd006764a9acc3f6cc000989efb6f67 100644 --- a/internal/msgstream/rmqms/rmq_msgstream.go +++ b/internal/msgstream/rmqms/rmq_msgstream.go @@ -350,7 +350,7 @@ type RmqTtMsgStream struct { lastTimeStamp Timestamp } -func NewRmqTtMsgStream(ctx context.Context, receiveBufSize int64, rmqBufSize int64, +func newRmqTtMsgStream(ctx context.Context, receiveBufSize int64, rmqBufSize int64, unmarshal msgstream.UnmarshalDispatcher) (*RmqTtMsgStream, error) { rmqMsgStream, err := newRmqMsgStream(ctx, receiveBufSize, rmqBufSize, unmarshal) if err != nil { diff --git a/internal/proxynode/insert_channels.go b/internal/proxynode/insert_channels.go index 3aa1e08e071c67b4309456ca439a996e0e2a8db4..ef01ece4a0cdf617c35cbc971404811acc40b648 100644 --- a/internal/proxynode/insert_channels.go +++ b/internal/proxynode/insert_channels.go @@ -8,8 +8,6 @@ import ( "strconv" "sync" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -78,6 +76,7 @@ type InsertChannelsMap struct { usageHistogram []int // message stream can be closed only when the use count is zero mtx sync.RWMutex nodeInstance *NodeImpl + msFactory msgstream.Factory } func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []string) error { @@ -101,8 +100,7 @@ func (m *InsertChannelsMap) createInsertMsgStream(collID UniqueID, channels []st m.insertChannels = append(m.insertChannels, channels) m.collectionID2InsertChannels[collID] = len(m.insertChannels) - 1 - factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamInsertBufSize, 1024) - stream, _ := factory.NewMsgStream(context.Background()) + stream, _ := m.msFactory.NewMsgStream(context.Background()) stream.AsProducer(channels) repack := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) { return insertRepackFunc(tsMsgs, hashKeys, m.nodeInstance.segAssigner, true) @@ -198,6 +196,7 @@ func newInsertChannelsMap(node *NodeImpl) *InsertChannelsMap { droppedBitMap: make([]int, 0), usageHistogram: make([]int, 0), nodeInstance: node, + msFactory: node.msFactory, } } diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 850027122d59afa0a65da21ee7b3f303ab47b028..2408eb0e63f65bdf980aca9d106b4f1a79a2a2a5 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -15,8 +15,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/allocator" @@ -53,6 +51,7 @@ type NodeImpl struct { manipulationMsgStream msgstream.MsgStream queryMsgStream msgstream.MsgStream + msFactory msgstream.Factory closer io.Closer @@ -61,12 +60,13 @@ type NodeImpl struct { closeCallbacks []func() } -func NewProxyNodeImpl(ctx context.Context) (*NodeImpl, error) { +func NewProxyNodeImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error) { rand.Seed(time.Now().UnixNano()) ctx1, cancel := context.WithCancel(ctx) node := &NodeImpl{ - ctx: ctx1, - cancel: cancel, + ctx: ctx1, + cancel: cancel, + msFactory: factory, } return node, nil @@ -102,7 +102,6 @@ func (node *NodeImpl) waitForServiceReady(service Component, serviceName string) } func (node *NodeImpl) Init() error { - // todo wait for proxyservice state changed to Healthy err := node.waitForServiceReady(node.proxyServiceClient, "ProxyService") @@ -131,8 +130,6 @@ func (node *NodeImpl) Init() error { return err } - factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchBufSize, 1024) - // wait for dataservice state changed to Healthy if node.dataServiceClient != nil { err = node.waitForServiceReady(node.dataServiceClient, "DataService") @@ -179,7 +176,16 @@ func (node *NodeImpl) Init() error { // return err //} - node.queryMsgStream, _ = factory.NewMsgStream(node.ctx) + m := map[string]interface{}{ + "PulsarAddress": Params.PulsarAddress, + "ReceiveBufSize": Params.MsgStreamSearchBufSize, + "PulsarBufSize": 1024} + err = node.msFactory.SetParams(m) + if err != nil { + return err + } + + node.queryMsgStream, _ = node.msFactory.NewMsgStream(node.ctx) node.queryMsgStream.AsProducer(Params.SearchChannelNames) log.Println("create query message stream ...") @@ -206,7 +212,7 @@ func (node *NodeImpl) Init() error { node.segAssigner = segAssigner node.segAssigner.PeerID = Params.ProxyID - node.manipulationMsgStream, _ = factory.NewMsgStream(node.ctx) + node.manipulationMsgStream, _ = node.msFactory.NewMsgStream(node.ctx) node.manipulationMsgStream.AsProducer(Params.InsertChannelNames) repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) { return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true) @@ -214,12 +220,12 @@ func (node *NodeImpl) Init() error { node.manipulationMsgStream.SetRepackFunc(repackFuncImpl) log.Println("create manipulation message stream ...") - node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator) + node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory) if err != nil { return err } - node.tick = newTimeTick(node.ctx, node.tsoAllocator, time.Millisecond*200, node.sched.TaskDoneTest) + node.tick = newTimeTick(node.ctx, node.tsoAllocator, time.Millisecond*200, node.sched.TaskDoneTest, node.msFactory) return nil } diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go index 24cbc57717a7c9cf5d2169109b3e18782802a413..fdfab974bd34efafcff1da5f8f107253d8f8c2d9 100644 --- a/internal/proxynode/task_scheduler.go +++ b/internal/proxynode/task_scheduler.go @@ -13,7 +13,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" ) type TaskQueue interface { @@ -247,17 +246,21 @@ type TaskScheduler struct { wg sync.WaitGroup ctx context.Context cancel context.CancelFunc + + msFactory msgstream.Factory } func NewTaskScheduler(ctx context.Context, idAllocator *allocator.IDAllocator, - tsoAllocator *allocator.TimestampAllocator) (*TaskScheduler, error) { + tsoAllocator *allocator.TimestampAllocator, + factory msgstream.Factory) (*TaskScheduler, error) { ctx1, cancel := context.WithCancel(ctx) s := &TaskScheduler{ idAllocator: idAllocator, tsoAllocator: tsoAllocator, ctx: ctx1, cancel: cancel, + msFactory: factory, } s.DdQueue = NewDdTaskQueue(s) s.DmQueue = NewDmTaskQueue(s) @@ -371,9 +374,8 @@ func (sched *TaskScheduler) queryLoop() { func (sched *TaskScheduler) queryResultLoop() { defer sched.wg.Done() - factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamSearchResultBufSize, 1024) - queryResultMsgStream, _ := factory.NewMsgStream(sched.ctx) + queryResultMsgStream, _ := sched.msFactory.NewMsgStream(sched.ctx) queryResultMsgStream.AsConsumer(Params.SearchResultChannelNames, Params.ProxySubName) queryNodeNum := Params.QueryNodeNum diff --git a/internal/proxynode/timetick.go b/internal/proxynode/timetick.go index 30861feb2b9af4b6b2c0b09e5cbd5bc02cbee856..1e76fcf6ec17e0ff169d6c2eaa177b959e250a3a 100644 --- a/internal/proxynode/timetick.go +++ b/internal/proxynode/timetick.go @@ -13,7 +13,6 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" ) type tickCheckFunc = func(Timestamp) bool @@ -27,6 +26,7 @@ type timeTick struct { tsoAllocator *allocator.TimestampAllocator tickMsgStream msgstream.MsgStream + msFactory msgstream.Factory peerID UniqueID wg sync.WaitGroup @@ -40,7 +40,8 @@ type timeTick struct { func newTimeTick(ctx context.Context, tsoAllocator *allocator.TimestampAllocator, interval time.Duration, - checkFunc tickCheckFunc) *timeTick { + checkFunc tickCheckFunc, + factory msgstream.Factory) *timeTick { ctx1, cancel := context.WithCancel(ctx) t := &timeTick{ ctx: ctx1, @@ -49,10 +50,10 @@ func newTimeTick(ctx context.Context, interval: interval, peerID: Params.ProxyID, checkFunc: checkFunc, + msFactory: factory, } - factory := pulsarms.NewFactory(Params.PulsarAddress, Params.MsgStreamTimeTickBufSize, 1024) - t.tickMsgStream, _ = factory.NewMsgStream(t.ctx) + t.tickMsgStream, _ = t.msFactory.NewMsgStream(t.ctx) t.tickMsgStream.AsProducer(Params.ProxyTimeTickChannelNames) return t } diff --git a/internal/proxynode/timetick_test.go b/internal/proxynode/timetick_test.go index 6520a99fcf15911f378fcb0882a7c886debdb3cc..af73f5decd7f160a438e98f06f2ab2adfb48b6a0 100644 --- a/internal/proxynode/timetick_test.go +++ b/internal/proxynode/timetick_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" ) var trueCnt = 0 @@ -34,7 +35,8 @@ func TestTimeTick_Start2(t *testing.T) { err = tsoAllocator.Start() assert.Nil(t, err) - tt := newTimeTick(ctx, tsoAllocator, Params.TimeTickInterval, checkFunc) + msFactory := pulsarms.NewFactory() + tt := newTimeTick(ctx, tsoAllocator, Params.TimeTickInterval, checkFunc, msFactory) defer func() { cancel() diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index 379694f3af853fba59e739984caa8a34d448c11f..6a70511d8b00f44394e1e2bc93a300963d111b69 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -10,8 +10,6 @@ import ( "strconv" "time" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" @@ -95,15 +93,22 @@ func (s *ServiceImpl) fillNodeInitParams() error { } func (s *ServiceImpl) Init() error { - factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) - err := s.fillNodeInitParams() if err != nil { return err } log.Println("fill node init params ...") - serviceTimeTickMsgStream, _ := factory.NewTtMsgStream(s.ctx) + m := map[string]interface{}{ + "PulsarAddress": Params.PulsarAddress, + "ReceiveBufSize": 1024, + "PulsarBufSize": 1024} + err = s.msFactory.SetParams(m) + if err != nil { + return err + } + + serviceTimeTickMsgStream, _ := s.msFactory.NewTtMsgStream(s.ctx) serviceTimeTickMsgStream.AsProducer([]string{Params.ServiceTimeTickChannel}) log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel}) @@ -112,11 +117,11 @@ func (s *ServiceImpl) Init() error { for ; i < Params.InsertChannelNum; i++ { channels[i] = Params.InsertChannelPrefixName + strconv.FormatInt(i, 10) } - insertTickMsgStream, _ := factory.NewMsgStream(s.ctx) + insertTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx) insertTickMsgStream.AsProducer(channels) log.Println("create insert time tick producer channel: ", channels) - nodeTimeTickMsgStream, _ := factory.NewMsgStream(s.ctx) + nodeTimeTickMsgStream, _ := s.msFactory.NewMsgStream(s.ctx) nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel, "proxyservicesub") // TODO: add config log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel) diff --git a/internal/proxyservice/proxyservice.go b/internal/proxyservice/proxyservice.go index e6e48337bbde93cee7c15b4b727798a636467efb..66cc49f7136d7761efe833850f2fcd01af4dbb78 100644 --- a/internal/proxyservice/proxyservice.go +++ b/internal/proxyservice/proxyservice.go @@ -7,8 +7,8 @@ import ( "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) @@ -26,14 +26,17 @@ type ServiceImpl struct { ctx context.Context cancel context.CancelFunc + + msFactory msgstream.Factory } -func NewServiceImpl(ctx context.Context) (*ServiceImpl, error) { +func NewServiceImpl(ctx context.Context, factory msgstream.Factory) (*ServiceImpl, error) { rand.Seed(time.Now().UnixNano()) ctx1, cancel := context.WithCancel(ctx) s := &ServiceImpl{ - ctx: ctx1, - cancel: cancel, + ctx: ctx1, + cancel: cancel, + msFactory: factory, } s.allocator = NewNodeIDAllocator() diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index 04856af94db377c577b82f384fd12985c3df53d2..e0ebdc83f6a35c52d370f0823f9989bdb63d1e94 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -12,18 +12,19 @@ type dataSyncService struct { ctx context.Context fg *flowgraph.TimeTickedFlowGraph - dmStream msgstream.MsgStream - ddStream msgstream.MsgStream + dmStream msgstream.MsgStream + ddStream msgstream.MsgStream + msFactory msgstream.Factory replica collectionReplica } -func newDataSyncService(ctx context.Context, replica collectionReplica) *dataSyncService { +func newDataSyncService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *dataSyncService { service := &dataSyncService{ - ctx: ctx, - fg: nil, - - replica: replica, + ctx: ctx, + fg: nil, + replica: replica, + msFactory: factory, } service.initNodes() @@ -52,7 +53,7 @@ func (dsService *dataSyncService) initNodes() { var ddNode node = newDDNode(dsService.replica) var insertNode node = newInsertNode(dsService.replica) - var serviceTimeNode node = newServiceTimeNode(dsService.ctx, dsService.replica) + var serviceTimeNode node = newServiceTimeNode(dsService.ctx, dsService.replica, dsService.msFactory) var gcNode node = newGCNode(dsService.replica) dsService.fg.AddNode(&dmStreamNode) diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index dc961aa876567a5ac4b13b0809a28d289e87ade5..7b3ef328897a1399447d177c70aeaaa32b0f8caa 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -109,11 +109,18 @@ func TestDataSyncService_Start(t *testing.T) { ddChannels := Params.DDChannelNames pulsarURL := Params.PulsarAddress - factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024) - insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx) + msFactory := pulsarms.NewFactory() + m := map[string]interface{}{ + "receiveBufSize": receiveBufSize, + "pulsarAddress": pulsarURL, + "pulsarBufSize": 1024} + err := msFactory.SetParams(m) + assert.Nil(t, err) + + insertStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) insertStream.AsProducer(insertChannels) - ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx) + ddStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) ddStream.AsProducer(ddChannels) var insertMsgStream msgstream.MsgStream = insertStream @@ -122,7 +129,7 @@ func TestDataSyncService_Start(t *testing.T) { var ddMsgStream msgstream.MsgStream = ddStream ddMsgStream.Start() - err := insertMsgStream.Produce(&msgPack) + err = insertMsgStream.Produce(&msgPack) assert.NoError(t, err) err = insertMsgStream.Broadcast(&timeTickMsgPack) @@ -131,7 +138,7 @@ func TestDataSyncService_Start(t *testing.T) { assert.NoError(t, err) // dataSync - node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) + node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory) go node.dataSyncService.start() <-node.queryNodeLoopCtx.Done() diff --git a/internal/querynode/flow_graph_msg_stream_input_nodes.go b/internal/querynode/flow_graph_msg_stream_input_nodes.go index 1816d0e8360594f8eba0f3e401cf8d020bbb0aa9..d91a302a70cd3d14aa6f91d28d0df7cb5a41f9a2 100644 --- a/internal/querynode/flow_graph_msg_stream_input_nodes.go +++ b/internal/querynode/flow_graph_msg_stream_input_nodes.go @@ -3,15 +3,12 @@ package querynode import ( "context" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" ) func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph.InputNode { - factory := pulsarms.NewFactory(Params.PulsarAddress, Params.InsertReceiveBufSize, Params.InsertPulsarBufSize) - // query node doesn't need to consume any topic - insertStream, _ := factory.NewTtMsgStream(ctx) + insertStream, _ := dsService.msFactory.NewTtMsgStream(ctx) dsService.dmStream = insertStream maxQueueLength := Params.FlowGraphMaxQueueLength @@ -22,12 +19,10 @@ func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph } func (dsService *dataSyncService) newDDInputNode(ctx context.Context) *flowgraph.InputNode { - factory := pulsarms.NewFactory(Params.PulsarAddress, Params.DDReceiveBufSize, Params.DDPulsarBufSize) - consumeChannels := Params.DDChannelNames consumeSubName := Params.MsgChannelSubName - ddStream, _ := factory.NewTtMsgStream(ctx) + ddStream, _ := dsService.msFactory.NewTtMsgStream(ctx) ddStream.AsConsumer(consumeChannels, consumeSubName) dsService.ddStream = ddStream diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 651be6fbc5527797ec1a4f31763171e84909fcea..85e9d8767c410621a76b1dc226cd8c04de2d917d 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -5,7 +5,6 @@ import ( "log" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) @@ -70,7 +69,7 @@ func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error { return stNode.timeTickMsgStream.Produce(&msgPack) } -func newServiceTimeNode(ctx context.Context, replica collectionReplica) *serviceTimeNode { +func newServiceTimeNode(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *serviceTimeNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -78,7 +77,6 @@ func newServiceTimeNode(ctx context.Context, replica collectionReplica) *service baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) - factory := pulsarms.NewFactory(Params.PulsarAddress, Params.SearchReceiveBufSize, 1024) timeTimeMsgStream, _ := factory.NewMsgStream(ctx) timeTimeMsgStream.AsProducer([]string{Params.QueryTimeTickChannelName}) diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go index 247c76b3c4a72bec20c18d56341d82f67e9c4c55..a8f2145daa43f293eb924b7dfb93ac9900de0bad 100644 --- a/internal/querynode/load_service_test.go +++ b/internal/querynode/load_service_test.go @@ -1014,14 +1014,22 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, const receiveBufSize = 1024 insertChannels := Params.InsertChannelNames ddChannels := Params.DDChannelNames - pulsarURL := Params.PulsarAddress - factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024) - insertStream, _ := factory.NewMsgStream(ctx) + msFactory := pulsarms.NewFactory() + m := map[string]interface{}{ + "receiveBufSize": receiveBufSize, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err := msFactory.SetParams(m) + if err != nil { + return err + } + + insertStream, _ := msFactory.NewMsgStream(ctx) insertStream.AsProducer(insertChannels) insertStream.AsConsumer(insertChannels, Params.MsgChannelSubName) - ddStream, _ := factory.NewMsgStream(ctx) + ddStream, _ := msFactory.NewMsgStream(ctx) ddStream.AsProducer(ddChannels) var insertMsgStream msgstream.MsgStream = insertStream @@ -1030,7 +1038,7 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID, var ddMsgStream msgstream.MsgStream = ddStream ddMsgStream.Start() - err := insertMsgStream.Produce(&msgPack) + err = insertMsgStream.Produce(&msgPack) if err != nil { return err } @@ -1072,14 +1080,22 @@ func sentTimeTick(ctx context.Context) error { const receiveBufSize = 1024 insertChannels := Params.InsertChannelNames ddChannels := Params.DDChannelNames - pulsarURL := Params.PulsarAddress - factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024) - insertStream, _ := factory.NewMsgStream(ctx) + msFactory := pulsarms.NewFactory() + m := map[string]interface{}{ + "receiveBufSize": receiveBufSize, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err := msFactory.SetParams(m) + if err != nil { + return err + } + + insertStream, _ := msFactory.NewMsgStream(ctx) insertStream.AsProducer(insertChannels) insertStream.AsConsumer(insertChannels, Params.MsgChannelSubName) - ddStream, _ := factory.NewMsgStream(ctx) + ddStream, _ := msFactory.NewMsgStream(ctx) ddStream.AsProducer(ddChannels) var insertMsgStream msgstream.MsgStream = insertStream @@ -1088,7 +1104,7 @@ func sentTimeTick(ctx context.Context) error { var ddMsgStream msgstream.MsgStream = ddStream ddMsgStream.Start() - err := insertMsgStream.Broadcast(&timeTickMsgPack) + err = insertMsgStream.Broadcast(&timeTickMsgPack) if err != nil { return err } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 2d79e8a69d04afb665a80d0279c394b85bebde0a..3dde735b810f3ddbd46ed8eb13b676764c923f39 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -16,6 +16,7 @@ import ( "context" "errors" "fmt" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "io" "log" "sync/atomic" @@ -66,9 +67,11 @@ type QueryNode struct { queryClient QueryServiceInterface indexClient IndexServiceInterface dataClient DataServiceInterface + + msFactory msgstream.Factory } -func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { +func NewQueryNode(ctx context.Context, queryNodeID uint64, factory msgstream.Factory) *QueryNode { ctx1, cancel := context.WithCancel(ctx) node := &QueryNode{ queryNodeLoopCtx: ctx1, @@ -79,6 +82,8 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { metaService: nil, searchService: nil, statsService: nil, + + msFactory: factory, } node.replica = newCollectionReplicaImpl() @@ -86,7 +91,7 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { return node } -func NewQueryNodeWithoutID(ctx context.Context) *QueryNode { +func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *QueryNode { ctx1, cancel := context.WithCancel(ctx) node := &QueryNode{ queryNodeLoopCtx: ctx1, @@ -96,6 +101,8 @@ func NewQueryNodeWithoutID(ctx context.Context) *QueryNode { metaService: nil, searchService: nil, statsService: nil, + + msFactory: factory, } node.replica = newCollectionReplicaImpl() @@ -143,12 +150,23 @@ func (node *QueryNode) Init() error { } func (node *QueryNode) Start() error { + var err error + m := map[string]interface{}{ + "PulsarAddress": Params.PulsarAddress, + "ReceiveBufSize": 1024, + "PulsarBufSize": 1024} + err = node.msFactory.SetParams(m) + if err != nil { + return err + } + // init services and manager - node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) - node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica) + node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, node.msFactory) + node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, node.msFactory) //node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) + node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream) - node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan) + node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan, node.msFactory) // start services go node.dataSyncService.start() diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 939ed0917b49b7982e21e24ce480d4abe743e788..02cb89f5fdb77013afcb66003155a989545b332c 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -133,7 +134,8 @@ func newQueryNodeMock() *QueryNode { }() } - svr := NewQueryNode(ctx, 0) + msFactory := pulsarms.NewFactory() + svr := NewQueryNode(ctx, 0, msFactory) err := svr.SetQueryService(&queryServiceMock{}) if err != nil { panic(err) diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index f995935f7d90f639413b391651b12312dc330c46..f6e55192ce8853e91ecb3b6e722b999cc2da28cd 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -12,7 +12,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" @@ -38,13 +37,8 @@ type searchService struct { type ResultEntityIds []UniqueID -func newSearchService(ctx context.Context, replica collectionReplica) *searchService { +func newSearchService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *searchService { receiveBufSize := Params.SearchReceiveBufSize - pulsarBufSize := Params.SearchPulsarBufSize - - msgStreamURL := Params.PulsarAddress - - factory := pulsarms.NewFactory(msgStreamURL, receiveBufSize, pulsarBufSize) consumeChannels := Params.SearchChannelNames consumeSubName := Params.MsgChannelSubName diff --git a/internal/querynode/search_service_test.go b/internal/querynode/search_service_test.go index ca33a0ffc21de69904e0d58a4e2c22d5ea092514..1765e346493a9375f00ac39c17e3a5856ff1b6c4 100644 --- a/internal/querynode/search_service_test.go +++ b/internal/querynode/search_service_test.go @@ -93,14 +93,21 @@ func TestSearch_Search(t *testing.T) { msgPackSearch := msgstream.MsgPack{} msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg) - factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024) - searchStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx) + msFactory := pulsarms.NewFactory() + m := map[string]interface{}{ + "receiveBufSize": receiveBufSize, + "pulsarAddress": pulsarURL, + "pulsarBufSize": 1024} + err = msFactory.SetParams(m) + assert.Nil(t, err) + + searchStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) searchStream.AsProducer(searchProducerChannels) searchStream.Start() err = searchStream.Produce(&msgPackSearch) assert.NoError(t, err) - node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica) + node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory) go node.searchService.start() // start insert @@ -179,10 +186,10 @@ func TestSearch_Search(t *testing.T) { insertChannels := Params.InsertChannelNames ddChannels := Params.DDChannelNames - insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx) + insertStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) insertStream.AsProducer(insertChannels) - ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx) + ddStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) ddStream.AsProducer(ddChannels) var insertMsgStream msgstream.MsgStream = insertStream @@ -200,7 +207,7 @@ func TestSearch_Search(t *testing.T) { assert.NoError(t, err) // dataSync - node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) + node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory) go node.dataSyncService.start() time.Sleep(1 * time.Second) @@ -209,14 +216,22 @@ func TestSearch_Search(t *testing.T) { } func TestSearch_SearchMultiSegments(t *testing.T) { - node := NewQueryNode(context.Background(), 0) - initTestMeta(t, node, 0, 0) - pulsarURL := Params.PulsarAddress + const receiveBufSize = 1024 + + msFactory := pulsarms.NewFactory() + m := map[string]interface{}{ + "receiveBufSize": receiveBufSize, + "pulsarAddress": pulsarURL, + "pulsarBufSize": 1024} + err := msFactory.SetParams(m) + assert.Nil(t, err) + + node := NewQueryNode(context.Background(), 0, msFactory) + initTestMeta(t, node, 0, 0) // test data generate const msgLength = 10 - const receiveBufSize = 1024 const DIM = 16 searchProducerChannels := Params.SearchChannelNames var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} @@ -283,14 +298,13 @@ func TestSearch_SearchMultiSegments(t *testing.T) { msgPackSearch := msgstream.MsgPack{} msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg) - factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024) - searchStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx) + searchStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) searchStream.AsProducer(searchProducerChannels) searchStream.Start() err = searchStream.Produce(&msgPackSearch) assert.NoError(t, err) - node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica) + node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory) go node.searchService.start() // start insert @@ -373,10 +387,10 @@ func TestSearch_SearchMultiSegments(t *testing.T) { insertChannels := Params.InsertChannelNames ddChannels := Params.DDChannelNames - insertStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx) + insertStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) insertStream.AsProducer(insertChannels) - ddStream, _ := factory.NewMsgStream(node.queryNodeLoopCtx) + ddStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx) ddStream.AsProducer(ddChannels) var insertMsgStream msgstream.MsgStream = insertStream @@ -394,7 +408,7 @@ func TestSearch_SearchMultiSegments(t *testing.T) { assert.NoError(t, err) // dataSync - node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) + node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, msFactory) go node.dataSyncService.start() time.Sleep(1 * time.Second) diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go index 9608128e80987907d103e66471e47a69ea05c358..cf6db549ff410ec11fccfeaae5d7b1c9a2aea08b 100644 --- a/internal/querynode/stats_service.go +++ b/internal/querynode/stats_service.go @@ -8,7 +8,6 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) @@ -20,9 +19,10 @@ type statsService struct { fieldStatsChan chan []*internalpb2.FieldStats statsStream msgstream.MsgStream + msFactory msgstream.Factory } -func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsChan chan []*internalpb2.FieldStats) *statsService { +func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsChan chan []*internalpb2.FieldStats, factory msgstream.Factory) *statsService { return &statsService{ ctx: ctx, @@ -31,6 +31,8 @@ func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsC fieldStatsChan: fieldStatsChan, statsStream: nil, + + msFactory: factory, } } @@ -40,8 +42,7 @@ func (sService *statsService) start() { // start pulsar producerChannels := []string{Params.StatsChannelName} - factory := pulsarms.NewFactory(Params.PulsarAddress, Params.StatsReceiveBufSize, 1024) - statsStream, _ := factory.NewMsgStream(sService.ctx) + statsStream, _ := sService.msFactory.NewMsgStream(sService.ctx) statsStream.AsProducer(producerChannels) var statsMsgStream msgstream.MsgStream = statsStream diff --git a/internal/querynode/stats_service_test.go b/internal/querynode/stats_service_test.go index d21230fd08c4a2d4ce10eb2ce5f7ab9efb3eba5b..1222982ffbfbc3ea393735acf9f320346f658c08 100644 --- a/internal/querynode/stats_service_test.go +++ b/internal/querynode/stats_service_test.go @@ -12,7 +12,14 @@ import ( func TestStatsService_start(t *testing.T) { node := newQueryNodeMock() initTestMeta(t, node, 0, 0) - node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil) + + msFactory := pulsarms.NewFactory() + m := map[string]interface{}{ + "PulsarAddress": Params.PulsarAddress, + "ReceiveBufSize": 1024, + "PulsarBufSize": 1024} + msFactory.SetParams(m) + node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil, msFactory) node.statsService.start() node.Stop() } @@ -26,15 +33,21 @@ func TestSegmentManagement_sendSegmentStatistic(t *testing.T) { // start pulsar producerChannels := []string{Params.StatsChannelName} - pulsarURL := Params.PulsarAddress - factory := pulsarms.NewFactory(pulsarURL, receiveBufSize, 1024) - statsStream, err := factory.NewMsgStream(node.queryNodeLoopCtx) + msFactory := pulsarms.NewFactory() + m := map[string]interface{}{ + "receiveBufSize": receiveBufSize, + "pulsarAddress": Params.PulsarAddress, + "pulsarBufSize": 1024} + err := msFactory.SetParams(m) + assert.Nil(t, err) + + statsStream, err := msFactory.NewMsgStream(node.queryNodeLoopCtx) assert.Nil(t, err) statsStream.AsProducer(producerChannels) var statsMsgStream msgstream.MsgStream = statsStream - node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil) + node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, nil, msFactory) node.statsService.statsStream = statsMsgStream node.statsService.statsStream.Start() diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index ffae64b47b836f21938c29e552e67c81f2e87f2f..60941772e27d66df07904b470616376a352b24d0 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -9,6 +9,7 @@ import ( nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client" "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -54,6 +55,8 @@ type QueryService struct { stateCode atomic.Value isInit atomic.Value enableGrpc bool + + msFactory msgstream.Factory } func (qs *QueryService) Init() error { @@ -140,7 +143,7 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb } node = newQueryNodeInfo(client) } else { - client := querynode.NewQueryNode(qs.loopCtx, uint64(allocatedID)) + client := querynode.NewQueryNode(qs.loopCtx, uint64(allocatedID), qs.msFactory) node = newQueryNodeInfo(client) } qs.queryNodes[UniqueID(allocatedID)] = node @@ -546,7 +549,7 @@ func (qs *QueryService) GetSegmentInfo(req *querypb.SegmentInfoRequest) (*queryp }, nil } -func NewQueryService(ctx context.Context) (*QueryService, error) { +func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryService, error) { nodes := make(map[UniqueID]*queryNodeInfo) ctx1, cancel := context.WithCancel(ctx) replica := newMetaReplica() @@ -558,6 +561,7 @@ func NewQueryService(ctx context.Context) (*QueryService, error) { numRegisterNode: 0, numQueryChannel: 0, enableGrpc: false, + msFactory: factory, } service.stateCode.Store(internalpb2.StateCode_INITIALIZING) service.isInit.Store(false) diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go index 3c7ca361614f48dd96214b7b21ae513367e2cb04..2316d25cbb3491547f796afcae1ab59c107b953b 100644 --- a/internal/queryservice/queryservice_test.go +++ b/internal/queryservice/queryservice_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -165,7 +166,8 @@ func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) (*inte } func TestQueryService_Init(t *testing.T) { - service, err := NewQueryService(context.Background()) + msFactory := pulsarms.NewFactory() + service, err := NewQueryService(context.Background(), msFactory) assert.Nil(t, err) service.Init() service.Start() @@ -193,7 +195,8 @@ func TestQueryService_Init(t *testing.T) { } func TestQueryService_load(t *testing.T) { - service, err := NewQueryService(context.Background()) + msFactory := pulsarms.NewFactory() + service, err := NewQueryService(context.Background(), msFactory) assert.Nil(t, err) service.Init() service.Start()