diff --git a/.jenkins/modules/Publish/Publish.groovy b/.jenkins/modules/Publish/Publish.groovy index 0eb2d30d401f85a1a5e74e6b87c59fbdc7dc9dc7..72cfd82da8f0cc5f326407672fb10a08a7271e86 100644 --- a/.jenkins/modules/Publish/Publish.groovy +++ b/.jenkins/modules/Publish/Publish.groovy @@ -43,8 +43,8 @@ dir ('build/docker/deploy') { sh 'docker pull registry.zilliz.com/milvus-distributed/milvus-distributed-dev:latest || true' sh 'docker pull ${SOURCE_REPO}/querynode:${SOURCE_TAG} || true' - sh 'docker-compose build --force-rm querynode' - sh 'docker-compose push querynode' + sh 'docker-compose build --force-rm querynode1' + sh 'docker-compose push querynode1' sh 'docker pull registry.zilliz.com/milvus-distributed/milvus-distributed-dev:latest || true' sh 'docker pull ${SOURCE_REPO}/datanode:${SOURCE_TAG} || true' diff --git a/.jenkins/modules/Regression/PythonRegression.groovy b/.jenkins/modules/Regression/PythonRegression.groovy index a9ed80a4e9fc0bd12af09fa1f6af1058ef11a526..31e017feb739f4548f1ebb1a559e5d6826ffd334 100644 --- a/.jenkins/modules/Regression/PythonRegression.groovy +++ b/.jenkins/modules/Regression/PythonRegression.groovy @@ -6,15 +6,15 @@ try { dir ('build/docker/deploy') { sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} pull' sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d master' - sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d proxyservice' - sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d proxynode' sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d indexservice' sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d indexnode' - sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d queryservice' + sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d proxyservice' sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d dataservice' - sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=1 -d querynode' - sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=2 -d querynode' + sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d queryservice' sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e DATA_NODE_ID=3 -d datanode' + sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} up -d proxynode' + sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=1 -d querynode1' + sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME} run -e QUERY_NODE_ID=2 -d querynode2' } dir ('build/docker/test') { diff --git a/build/docker/deploy/.env b/build/docker/deploy/.env index f20ec3c88b3a3027722821eabbdf800e657e040e..ccb8f4923e418b5efe0402cf6ea22204f58291a3 100644 --- a/build/docker/deploy/.env +++ b/build/docker/deploy/.env @@ -7,6 +7,10 @@ ETCD_ADDRESS=etcd:2379 MASTER_ADDRESS=master:53100 MINIO_ADDRESS=minio:9000 PROXY_NODE_HOST=proxynode +INDEX_NODE_HOST=indexnode +QUERY_NODE_HOST1=querynode1 +QUERY_NODE_HOST2=querynode2 +DATA_NODE_HOST=datanode PROXY_SERVICE_ADDRESS=proxyservice:19530 INDEX_SERVICE_ADDRESS=indexservice:31000 DATA_SERVICE_ADDRESS=dataservice:13333 diff --git a/build/docker/deploy/docker-compose.yml b/build/docker/deploy/docker-compose.yml index f7898f5384b8405758c1ddb855b976536a0ca374..f5b568143fac034d3858e0ccbb2f4d97c4a8eb65 100644 --- a/build/docker/deploy/docker-compose.yml +++ b/build/docker/deploy/docker-compose.yml @@ -63,7 +63,7 @@ services: networks: - milvus - querynode: + querynode1: image: ${TARGET_REPO}/querynode:${TARGET_TAG} build: context: ../../../ @@ -76,6 +76,26 @@ services: MINIO_ADDRESS: ${MINIO_ADDRESS} DATA_SERVICE_ADDRESS: ${DATA_SERVICE_ADDRESS} INDEX_SERVICE_ADDRESS: ${INDEX_SERVICE_ADDRESS} + QUERY_SERVICE_ADDRESS: ${QUERY_SERVICE_ADDRESS} +# QUERY_NODE_HOST: ${QUERY_NODE_HOST1} + networks: + - milvus + + querynode2: + image: ${TARGET_REPO}/querynode:${TARGET_TAG} + build: + context: ../../../ + dockerfile: build/docker/deploy/querynode/Dockerfile + cache_from: + - ${SOURCE_REPO}/querynode:${SOURCE_TAG} + environment: + PULSAR_ADDRESS: ${PULSAR_ADDRESS} + MASTER_ADDRESS: ${MASTER_ADDRESS} + MINIO_ADDRESS: ${MINIO_ADDRESS} + DATA_SERVICE_ADDRESS: ${DATA_SERVICE_ADDRESS} + INDEX_SERVICE_ADDRESS: ${INDEX_SERVICE_ADDRESS} + QUERY_SERVICE_ADDRESS: ${QUERY_SERVICE_ADDRESS} +# QUERY_NODE_HOST: ${QUERY_NODE_HOST2} networks: - milvus @@ -92,6 +112,9 @@ services: MASTER_ADDRESS: ${MASTER_ADDRESS} MINIO_ADDRESS: ${MINIO_ADDRESS} DATA_SERVICE_ADDRESS: ${DATA_SERVICE_ADDRESS} +# DATA_NODE_HOST: ${DATA_NODE_HOST} + depends_on: + - "dataservice" networks: - milvus @@ -119,6 +142,7 @@ services: environment: INDEX_SERVICE_ADDRESS: ${INDEX_SERVICE_ADDRESS} MINIO_ADDRESS: ${MINIO_ADDRESS} + INDEX_NODE_HOST: ${INDEX_NODE_HOST} depends_on: - "indexservice" networks: diff --git a/cmd/distributed/components/data_node.go b/cmd/distributed/components/data_node.go index 1b372cdc6c9a3f9689000b0103964713084ad30a..eba3c0761be2cf67068be6f04a8ae1a3d123a90e 100644 --- a/cmd/distributed/components/data_node.go +++ b/cmd/distributed/components/data_node.go @@ -3,133 +3,31 @@ package components import ( "context" "log" - "time" - - dn "github.com/zilliztech/milvus-distributed/internal/datanode" - ms "github.com/zilliztech/milvus-distributed/internal/masterservice" - - dnc "github.com/zilliztech/milvus-distributed/internal/distributed/datanode" - dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" - msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + grpcdatanode "github.com/zilliztech/milvus-distributed/internal/distributed/datanode" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) type DataNode struct { ctx context.Context - svr *dnc.Server - - masterService *msc.GrpcClient - dataService *dsc.Client + svr *grpcdatanode.Server } func NewDataNode(ctx context.Context, factory msgstream.Factory) (*DataNode, error) { - const retry = 10 - const interval = 200 - - svr, err := dnc.New(ctx, factory) + svr, err := grpcdatanode.New(ctx, factory) if err != nil { - panic(err) - } - - log.Println("Datanode is", dn.Params.NodeID) - - // --- Master Service Client --- - ms.Params.Init() - log.Println("Master service address:", dn.Params.MasterAddress) - log.Println("Init master service client ...") - masterClient, err := msc.NewGrpcClient(dn.Params.MasterAddress, 20*time.Second) - if err != nil { - panic(err) - } - - if err = masterClient.Init(); err != nil { - panic(err) - } - - if err = masterClient.Start(); err != nil { - panic(err) - } - - var cnt int - for cnt = 0; cnt < retry; cnt++ { - time.Sleep(time.Duration(cnt*interval) * time.Millisecond) - if cnt != 0 { - log.Println("Master service isn't ready ...") - log.Printf("Retrying getting master service's states in ... %v ms", interval) - } - - msStates, err := masterClient.GetComponentStates() - - if err != nil { - continue - } - if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - continue - } - if msStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= retry { - panic("Master service isn't ready") - } - - if err := svr.SetMasterServiceInterface(masterClient); err != nil { - panic(err) - } - - // --- Data Service Client --- - log.Println("Data service address: ", dn.Params.ServiceAddress) - log.Println("Init data service client ...") - dataService := dsc.NewClient(dn.Params.ServiceAddress) - if err = dataService.Init(); err != nil { - panic(err) - } - if err = dataService.Start(); err != nil { - panic(err) - } - - for cnt = 0; cnt < retry; cnt++ { - dsStates, err := dataService.GetComponentStates() - if err != nil { - continue - } - if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - continue - } - if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= retry { - panic("Data service isn't ready") - } - - if err := svr.SetDataServiceInterface(dataService); err != nil { - panic(err) + return nil, err } return &DataNode{ ctx: ctx, svr: svr, - - dataService: dataService, - masterService: masterClient, }, nil } func (d *DataNode) Run() error { - if err := d.svr.Init(); err != nil { - panic(err) - } - - if err := d.svr.Start(); err != nil { + if err := d.svr.Run(); err != nil { panic(err) } log.Println("Data node successfully started ...") @@ -137,7 +35,8 @@ func (d *DataNode) Run() error { } func (d *DataNode) Stop() error { - _ = d.dataService.Stop() - _ = d.masterService.Stop() - return d.svr.Stop() + if err := d.svr.Stop(); err != nil { + return err + } + return nil } diff --git a/cmd/distributed/components/data_service.go b/cmd/distributed/components/data_service.go index 6ca63d786b11941fc25f84b14e303c431311f1b1..5a62ed4cb5f1035fc001f78649834a67d3adaa70 100644 --- a/cmd/distributed/components/data_service.go +++ b/cmd/distributed/components/data_service.go @@ -2,81 +2,38 @@ package components import ( "context" - "errors" - "log" - "time" - ms "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + grpcdataserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - - "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" ) type DataService struct { - ctx context.Context - server *dataservice.Service - masterClient *ms.GrpcClient + ctx context.Context + svr *grpcdataserviceclient.Server } func NewDataService(ctx context.Context, factory msgstream.Factory) (*DataService, error) { - service := dataservice.NewGrpcService(ctx, factory) - - dataservice.Params.Init() - client, err := ms.NewGrpcClient(dataservice.Params.MasterAddress, 30*time.Second) + s, err := grpcdataserviceclient.NewServer(ctx, factory) if err != nil { return nil, err } - log.Println("master client create complete") - if err = client.Init(); err != nil { - return nil, err - } - if err = client.Start(); err != nil { - return nil, err - } - ticker := time.NewTicker(500 * time.Millisecond) - tctx, tcancel := context.WithTimeout(ctx, 30*time.Second) - defer func() { - ticker.Stop() - tcancel() - }() - - for { - var states *internalpb2.ComponentStates - select { - case <-ticker.C: - states, err = client.GetComponentStates() - if err != nil { - continue - } - case <-tctx.Done(): - return nil, errors.New("master client connect timeout") - } - if states.State.StateCode == internalpb2.StateCode_INITIALIZING || states.State.StateCode == internalpb2.StateCode_HEALTHY { - break - } - } - service.SetMasterClient(client) return &DataService{ - ctx: ctx, - server: service, - masterClient: client, + ctx: ctx, + svr: s, }, nil } func (s *DataService) Run() error { - if err := s.server.Init(); err != nil { - return err - } - if err := s.server.Start(); err != nil { + if err := s.svr.Run(); err != nil { return err } return nil } func (s *DataService) Stop() error { - _ = s.masterClient.Stop() - _ = s.server.Stop() + if err := s.svr.Stop(); err != nil { + return err + } return nil } diff --git a/cmd/distributed/components/master_service.go b/cmd/distributed/components/master_service.go index 562171643857133a46afa89c4c7714eb8e270d34..20ec85d1a76d86848e7af928b66ba3e8f4d6e36c 100644 --- a/cmd/distributed/components/master_service.go +++ b/cmd/distributed/components/master_service.go @@ -2,170 +2,39 @@ package components import ( "context" - "fmt" - "log" - "time" - ds "github.com/zilliztech/milvus-distributed/internal/dataservice" - dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" - isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" - ps "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice" - psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client" - qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" - is "github.com/zilliztech/milvus-distributed/internal/indexservice" - ms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - qs "github.com/zilliztech/milvus-distributed/internal/queryservice" ) type MasterService struct { ctx context.Context - svr *msc.GrpcServer - - proxyService *psc.Client - dataService *dsc.Client - indexService *isc.Client - queryService *qsc.Client + svr *msc.Server } func NewMasterService(ctx context.Context, factory msgstream.Factory) (*MasterService, error) { - const reTryCnt = 3 - - svr, err := msc.NewGrpcServer(ctx, factory) - if err != nil { - return nil, err - } - log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port) - - cnt := 0 - - ps.Params.Init() - log.Printf("proxy service address : %s", ps.Params.ServiceAddress) - proxyService := psc.NewClient(ps.Params.ServiceAddress) - if err = proxyService.Init(); err != nil { - panic(err) - } - - for cnt = 0; cnt < reTryCnt; cnt++ { - pxStates, err := proxyService.GetComponentStates() - if err != nil { - log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, err.Error()) - continue - } - if pxStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Printf("get state from proxy service, retry count = %d, error = %s", cnt, pxStates.Status.Reason) - continue - } - if pxStates.State.StateCode != internalpb2.StateCode_INITIALIZING && pxStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - - if err = svr.SetProxyService(proxyService); err != nil { - panic(err) - } - - ds.Params.Init() - log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port) - dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) - if err = dataService.Init(); err != nil { - panic(err) - } - if err = dataService.Start(); err != nil { - panic(err) - } - for cnt = 0; cnt < reTryCnt; cnt++ { - dsStates, err := dataService.GetComponentStates() - if err != nil { - log.Printf("retry cout = %d, error = %s", cnt, err.Error()) - continue - } - if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Printf("retry cout = %d, error = %s", cnt, dsStates.Status.Reason) - continue - } - if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= reTryCnt { - panic("connect to data service failed") - } - - if err = svr.SetDataService(dataService); err != nil { - panic(err) - } - is.Params.Init() - log.Printf("index service address : %s", is.Params.Address) - indexService := isc.NewClient(is.Params.Address) - if err = indexService.Init(); err != nil { - return nil, err - } - - if err = svr.SetIndexService(indexService); err != nil { - return nil, err - } - - qs.Params.Init() - queryService, err := qsc.NewClient(qs.Params.Address, time.Duration(ms.Params.Timeout)*time.Second) + svr, err := msc.NewServer(ctx, factory) if err != nil { return nil, err } - if err = queryService.Init(); err != nil { - return nil, err - } - if err = queryService.Start(); err != nil { - return nil, err - } - if err = svr.SetQueryService(queryService); err != nil { - return nil, err - } return &MasterService{ ctx: ctx, svr: svr, - - proxyService: proxyService, - dataService: dataService, - indexService: indexService, - queryService: queryService, }, nil } func (m *MasterService) Run() error { - if err := m.svr.Init(); err != nil { - return err - } - - if err := m.svr.Start(); err != nil { + if err := m.svr.Run(); err != nil { return err } return nil } func (m *MasterService) Stop() error { - if m != nil { - if m.proxyService != nil { - _ = m.proxyService.Stop() - } - if m.indexService != nil { - _ = m.indexService.Stop() - } - if m.dataService != nil { - _ = m.dataService.Stop() - } - if m.queryService != nil { - _ = m.queryService.Stop() - } - if m.svr != nil { - return m.svr.Stop() - } + if err := m.svr.Stop(); err != nil { + return err } return nil } diff --git a/cmd/distributed/components/query_node.go b/cmd/distributed/components/query_node.go index 06c7ebdc9a9d5423d6a5e66ca6045b62a14e944e..380631db819837500b498d74b315e211e80802c8 100644 --- a/cmd/distributed/components/query_node.go +++ b/cmd/distributed/components/query_node.go @@ -2,243 +2,33 @@ package components import ( "context" - "errors" - "fmt" "log" - "time" - dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" - isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" - msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" - qns "github.com/zilliztech/milvus-distributed/internal/distributed/querynode" - qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" + grpcquerynode "github.com/zilliztech/milvus-distributed/internal/distributed/querynode" "github.com/zilliztech/milvus-distributed/internal/msgstream" - - ds "github.com/zilliztech/milvus-distributed/internal/dataservice" - is "github.com/zilliztech/milvus-distributed/internal/indexservice" - ms "github.com/zilliztech/milvus-distributed/internal/masterservice" - qs "github.com/zilliztech/milvus-distributed/internal/queryservice" - - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) type QueryNode struct { ctx context.Context - svr *qns.Server - - dataService *dsc.Client - masterService *msc.GrpcClient - indexService *isc.Client - queryService *qsc.Client + svr *grpcquerynode.Server } func NewQueryNode(ctx context.Context, factory msgstream.Factory) (*QueryNode, error) { - const retry = 10 - const interval = 500 - - svr, err := qns.NewServer(ctx, factory) - if err != nil { - panic(err) - } - // --- QueryService --- - qs.Params.Init() - log.Println("QueryService address:", qs.Params.Address) - log.Println("Init Query service client ...") - queryService, err := qsc.NewClient(qs.Params.Address, 20*time.Second) + svr, err := grpcquerynode.NewServer(ctx, factory) if err != nil { - panic(err) - } - - if err = queryService.Init(); err != nil { - panic(err) - } - - if err = queryService.Start(); err != nil { - panic(err) - } - - var cnt int - for cnt = 0; cnt < retry; cnt++ { - if cnt != 0 { - log.Println("Query service isn't ready ...") - log.Printf("Retrying getting query service's states in ... %v ms", interval) - } - - qsStates, err := queryService.GetComponentStates() - - if err != nil { - continue - } - if qsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - continue - } - if qsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && qsStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= retry { - panic("Query service isn't ready") - } - if err := svr.SetQueryService(queryService); err != nil { - panic(err) - } - - // --- Master Service Client --- - ms.Params.Init() - addr := fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port) - log.Println("Master service address:", addr) - log.Println("Init master service client ...") - var masterService *msc.GrpcClient = nil - if QueryMock { - svr.SetMasterService(&qns.MasterServiceMock{Count: 0}) - } else { - masterService, err = msc.NewGrpcClient(addr, 20*time.Second) - if err != nil { - panic(err) - } - - if err = masterService.Init(); err != nil { - panic(err) - } - - if err = masterService.Start(); err != nil { - panic(err) - } - - ticker := time.NewTicker(interval * time.Millisecond) - tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond) - defer func() { - ticker.Stop() - tcancel() - }() - - for { - var states *internalpb2.ComponentStates - select { - case <-ticker.C: - states, err = masterService.GetComponentStates() - if err != nil { - continue - } - case <-tctx.Done(): - return nil, errors.New("master client connect timeout") - } - if states.State.StateCode == internalpb2.StateCode_HEALTHY { - break - } - } - - if err := svr.SetMasterService(masterService); err != nil { - panic(err) - } - } - - // --- IndexService --- - is.Params.Init() - log.Println("Index service address:", is.Params.Address) - var indexService *isc.Client = nil - if QueryMock { - svr.SetIndexService(&qns.IndexServiceMock{Count: 0}) - } else { - indexService = isc.NewClient(is.Params.Address) - - if err := indexService.Init(); err != nil { - panic(err) - } - - if err := indexService.Start(); err != nil { - panic(err) - } - - ticker := time.NewTicker(interval * time.Millisecond) - tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond) - defer func() { - ticker.Stop() - tcancel() - }() - - for { - var states *internalpb2.ComponentStates - select { - case <-ticker.C: - states, err = indexService.GetComponentStates() - if err != nil { - continue - } - case <-tctx.Done(): - return nil, errors.New("Index service client connect timeout") - } - if states.State.StateCode == internalpb2.StateCode_HEALTHY { - break - } - } - - if err := svr.SetIndexService(indexService); err != nil { - panic(err) - } - } - - // --- DataService --- - ds.Params.Init() - log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port) - log.Println("Init data service client ...") - var dataService *dsc.Client = nil - if QueryMock { - svr.SetDataService(&qns.DataServiceMock{Count: 0}) - } else { - dataService = dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) - if err = dataService.Init(); err != nil { - panic(err) - } - if err = dataService.Start(); err != nil { - panic(err) - } - - for cnt = 0; cnt < retry; cnt++ { - dsStates, err := dataService.GetComponentStates() - if err != nil { - log.Printf("retry cout = %d, error = %s", cnt, err.Error()) - continue - } - if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Printf("retry cout = %d, error = %s", cnt, err.Error()) - continue - } - if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= retry { - panic("Data service isn't ready") - } - - if err := svr.SetDataService(dataService); err != nil { - panic(err) - } + return nil, err } return &QueryNode{ - ctx: ctx, svr: svr, - - dataService: dataService, - masterService: masterService, - indexService: indexService, - queryService: queryService, }, nil + } func (q *QueryNode) Run() error { - if err := q.svr.Init(); err != nil { - panic(err) - } - - if err := q.svr.Start(); err != nil { + if err := q.svr.Run(); err != nil { panic(err) } log.Println("Query node successfully started ...") @@ -246,11 +36,8 @@ func (q *QueryNode) Run() error { } func (q *QueryNode) Stop() error { - if !QueryMock { - _ = q.dataService.Stop() - _ = q.masterService.Stop() - _ = q.indexService.Stop() + if err := q.svr.Stop(); err != nil { + return err } - _ = q.queryService.Stop() - return q.svr.Stop() + return nil } diff --git a/cmd/distributed/components/query_service.go b/cmd/distributed/components/query_service.go index 4d6c937dec1f054b34555f691ddba6d77b277327..67f2399f56e3b757fb887c83ae7496afa367d474 100644 --- a/cmd/distributed/components/query_service.go +++ b/cmd/distributed/components/query_service.go @@ -2,161 +2,40 @@ package components import ( "context" - "fmt" "log" - "time" - - ds "github.com/zilliztech/milvus-distributed/internal/dataservice" - dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" - msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" - qs "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice" - ms "github.com/zilliztech/milvus-distributed/internal/masterservice" + grpcqueryservice "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - "github.com/zilliztech/milvus-distributed/internal/queryservice" ) type QueryService struct { ctx context.Context - svr *qs.Server - - dataService *dsc.Client - masterService *msc.GrpcClient + svr *grpcqueryservice.Server } -const ( - QueryMock = false -) - func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryService, error) { - const retry = 10 - const interval = 200 - - queryservice.Params.Init() - svr, err := qs.NewServer(ctx, factory) + svr, err := grpcqueryservice.NewServer(ctx, factory) if err != nil { panic(err) } - log.Println("Queryservice id is", queryservice.Params.QueryServiceID) - - // --- Master Service Client --- - ms.Params.Init() - log.Printf("Master service address: %s:%d", ms.Params.Address, ms.Params.Port) - log.Println("Init master service client ...") - var masterService *msc.GrpcClient = nil - if QueryMock { - masterMock := queryservice.NewMasterMock() - svr.SetMasterService(masterMock) - } else { - masterService, err = msc.NewGrpcClient(fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port), 20*time.Second) - if err != nil { - panic(err) - } - - if err = masterService.Init(); err != nil { - panic(err) - } - - if err = masterService.Start(); err != nil { - panic(err) - } - - var cnt int - for cnt = 0; cnt < retry; cnt++ { - time.Sleep(time.Duration(cnt*interval) * time.Millisecond) - if cnt != 0 { - log.Println("Master service isn't ready ...") - log.Printf("Retrying getting master service's states in ... %v ms", interval) - } - - msStates, err := masterService.GetComponentStates() - - if err != nil { - continue - } - if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - continue - } - if msStates.State.StateCode != internalpb2.StateCode_HEALTHY && msStates.State.StateCode != internalpb2.StateCode_INITIALIZING { - continue - } - break - } - if cnt >= retry { - panic("Master service isn't ready") - } - - if err := svr.SetMasterService(masterService); err != nil { - panic(err) - } - } - - // --- Data service client --- - ds.Params.Init() - log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port) - log.Println("Init data service client ...") - var dataService *dsc.Client = nil - if QueryMock { - dataMock := queryservice.NewDataMock() - svr.SetDataService(dataMock) - } else { - dataService = dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) - if err = dataService.Init(); err != nil { - panic(err) - } - if err = dataService.Start(); err != nil { - panic(err) - } - - var cnt int - for cnt = 0; cnt < retry; cnt++ { - dsStates, err := dataService.GetComponentStates() - if err != nil { - continue - } - if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - continue - } - if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= retry { - panic("Data service isn't ready") - } - - if err := svr.SetDataService(dataService); err != nil { - panic(err) - } - } return &QueryService{ - ctx: ctx, - svr: svr, - dataService: dataService, - masterService: masterService, + ctx: ctx, + svr: svr, }, nil } func (qs *QueryService) Run() error { - if err := qs.svr.Init(); err != nil { + if err := qs.svr.Run(); err != nil { panic(err) } - - if err := qs.svr.Start(); err != nil { - panic(err) - } - log.Println("Data node successfully started ...") + log.Println("QueryService successfully started ...") return nil } func (qs *QueryService) Stop() error { - if !QueryMock { - _ = qs.dataService.Stop() - _ = qs.masterService.Stop() + if err := qs.svr.Stop(); err != nil { + return err } - return qs.svr.Stop() + return nil } diff --git a/go.sum b/go.sum index d36ea03ef8ca0cdfb993cdf4e269e6eef4235d46..f2731592357715a9699ab2d3262680efe9ff1b2c 100644 --- a/go.sum +++ b/go.sum @@ -297,6 +297,7 @@ github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= +github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible h1:CAG0PUvo1fen+ZEfxKJjFIc8GuuN5RuaBuCAuaP2Hno= github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible/go.mod h1:iIubILNIN6Jq9h8uiSLrN9L1tuj3iSSFwz3R61skm/A= diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index c7eb4696ededbc04b8d9a818133a52103046d52e..8909e17f521e6500ec7877663cc7a3f7047e9228 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -78,12 +78,10 @@ type ( func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { - Params.Init() ctx2, cancel2 := context.WithCancel(ctx) node := &DataNode{ ctx: ctx2, cancel: cancel2, - NodeID: Params.NodeID, // GOOSE TODO: How to init Role: typeutil.DataNodeRole, watchDm: make(chan struct{}), @@ -94,9 +92,7 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { replica: nil, msFactory: factory, } - - node.State.Store(internalpb2.StateCode_INITIALIZING) - + node.UpdateStateCode(internalpb2.StateCode_ABNORMAL) return node } @@ -130,7 +126,7 @@ func (node *DataNode) Init() error { }, Address: &commonpb.Address{ Ip: Params.IP, - Port: Params.Port, + Port: int64(Params.Port), }, } @@ -181,10 +177,14 @@ func (node *DataNode) Init() error { func (node *DataNode) Start() error { node.metaService.init() go node.dataSyncService.start() - node.State.Store(internalpb2.StateCode_HEALTHY) + node.UpdateStateCode(internalpb2.StateCode_HEALTHY) return nil } +func (node *DataNode) UpdateStateCode(code internalpb2.StateCode) { + node.State.Store(code) +} + func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index 86f595a803d409bca588575b7cfdc792664c2e98..20f99a16721e69d07aac0d2d2f5262b1dc021da1 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -18,8 +18,8 @@ type ParamTable struct { // === DataNode Internal Components Configs === NodeID UniqueID - IP string // GOOSE TODO load from config file - Port int64 + IP string + Port int FlowGraphMaxQueueLength int32 FlowGraphMaxParallelism int32 FlushInsertBufferSize int32 @@ -29,8 +29,8 @@ type ParamTable struct { // === DataNode External Components Configs === // --- External Client Address --- - MasterAddress string - ServiceAddress string // GOOSE TODO: init from config file + //MasterAddress string + //ServiceAddress string // GOOSE TODO: init from config file // --- Pulsar --- PulsarAddress string @@ -80,8 +80,6 @@ func (p *ParamTable) Init() { // === DataNode Internal Components Configs === p.initNodeID() - p.initIP() - p.initPort() p.initFlowGraphMaxQueueLength() p.initFlowGraphMaxParallelism() p.initFlushInsertBufferSize() @@ -90,10 +88,6 @@ func (p *ParamTable) Init() { p.initDdBinlogRootPath() // === DataNode External Components Configs === - // --- Master --- - p.initMasterAddress() - p.initServiceAddress() - // --- Pulsar --- p.initPulsarAddress() @@ -140,19 +134,6 @@ func (p *ParamTable) initNodeID() { p.NodeID = p.ParseInt64("_dataNodeID") } -func (p *ParamTable) initIP() { - addr, err := p.Load("dataNode.address") - if err != nil { - panic(err) - } - p.IP = addr -} - -func (p *ParamTable) initPort() { - port := p.ParseInt64("dataNode.port") - p.Port = port -} - // ---- flowgraph configs ---- func (p *ParamTable) initFlowGraphMaxQueueLength() { p.FlowGraphMaxQueueLength = p.ParseInt32("dataNode.dataSync.flowGraph.maxQueueLength") @@ -189,29 +170,6 @@ func (p *ParamTable) initDdBinlogRootPath() { p.DdBinlogRootPath = path.Join(rootPath, "data_definition_log") } -// ===== DataNode External components configs ==== -// ---- Master ---- -func (p *ParamTable) initMasterAddress() { - addr, err := p.Load("_MasterAddress") - if err != nil { - panic(err) - } - p.MasterAddress = addr -} - -func (p *ParamTable) initServiceAddress() { - addr, err := p.Load("dataService.address") - if err != nil { - panic(err) - } - - port, err := p.Load("dataService.port") - if err != nil { - panic(err) - } - p.ServiceAddress = addr + ":" + port -} - // ---- Pulsar ---- func (p *ParamTable) initPulsarAddress() { url, err := p.Load("_PulsarAddress") diff --git a/internal/datanode/param_table_test.go b/internal/datanode/param_table_test.go index 5f34635e09296934ddc5dc05cfab8a48ad83d124..ce6146ca9e5724d9e9995634342865720fb80d00 100644 --- a/internal/datanode/param_table_test.go +++ b/internal/datanode/param_table_test.go @@ -44,11 +44,6 @@ func TestParamTable_DataNode(t *testing.T) { log.Println("DdBinlogRootPath:", path) }) - t.Run("Test MasterAddress", func(t *testing.T) { - address := Params.MasterAddress - log.Println("MasterAddress:", address) - }) - t.Run("Test PulsarAddress", func(t *testing.T) { address := Params.PulsarAddress log.Println("PulsarAddress:", address) diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index a1a7c105515036cd7b678795216e52753961fe34..2d451130322f574a6712bfb34c02e30bd359b89c 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -13,9 +13,7 @@ import ( type ParamTable struct { paramtable.BaseTable - Address string - Port int - NodeID int64 + NodeID int64 EtcdAddress string MetaRootPath string @@ -54,8 +52,6 @@ func (p *ParamTable) Init() { } // set members - p.initAddress() - p.initPort() p.initNodeID() p.initEtcdAddress() @@ -80,18 +76,6 @@ func (p *ParamTable) Init() { }) } -func (p *ParamTable) initAddress() { - dataserviceAddress, err := p.Load("dataservice.address") - if err != nil { - panic(err) - } - p.Address = dataserviceAddress -} - -func (p *ParamTable) initPort() { - p.Port = p.ParseInt("dataservice.port") -} - func (p *ParamTable) initNodeID() { p.NodeID = p.ParseInt64("dataservice.nodeID") } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 7e42b4ffc492e42c5dd7c34d8ebd98fcc84d9f6d..bc7b9e53c923bec2517dfe750f927251b9b47135 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -9,12 +9,12 @@ import ( "sync/atomic" "time" + grpcdatanodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/datanode/client" + "go.uber.org/zap" "github.com/zilliztech/milvus-distributed/internal/log" - "github.com/zilliztech/milvus-distributed/internal/distributed/datanode" - "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -109,7 +109,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro msFactory: factory, } s.insertChannels = s.getInsertChannels() - s.state.Store(internalpb2.StateCode_INITIALIZING) + s.UpdateStateCode(internalpb2.StateCode_ABNORMAL) return s, nil } @@ -158,11 +158,15 @@ func (s *Server) Start() error { return err } s.startServerLoop() - s.state.Store(internalpb2.StateCode_HEALTHY) + s.UpdateStateCode(internalpb2.StateCode_HEALTHY) log.Debug("start success") return nil } +func (s *Server) UpdateStateCode(code internalpb2.StateCode) { + s.state.Store(code) +} + func (s *Server) checkStateIsHealthy() bool { return s.state.Load().(internalpb2.StateCode) == internalpb2.StateCode_HEALTHY } @@ -466,11 +470,14 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, }, } + log.Info("DataService: RegisterNode:", zap.String("IP", req.Address.Ip), zap.Int64("Port", req.Address.Port)) node, err := s.newDataNode(req.Address.Ip, req.Address.Port, req.Base.SourceID) if err != nil { return nil, err } + s.cluster.Register(node) + if s.ddChannelName == "" { resp, err := s.masterClient.GetDdChannel() if err = VerifyResponse(resp, err); err != nil { @@ -493,10 +500,11 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register } func (s *Server) newDataNode(ip string, port int64, id UniqueID) (*dataNode, error) { - client := datanode.NewClient(fmt.Sprintf("%s:%d", ip, port)) + client := grpcdatanodeclient.NewClient(fmt.Sprintf("%s:%d", ip, port)) if err := client.Init(); err != nil { return nil, err } + if err := client.Start(); err != nil { return nil, err } diff --git a/internal/distributed/datanode/client.go b/internal/distributed/datanode/client/client.go similarity index 73% rename from internal/distributed/datanode/client.go rename to internal/distributed/datanode/client/client.go index ebbd730f0b6fa3809af65977bebda7ecd603a041..3c2390c0cfee0099d431001464fe606232d46829 100644 --- a/internal/distributed/datanode/client.go +++ b/internal/distributed/datanode/client/client.go @@ -1,21 +1,18 @@ -package datanode +package grpcdatanodeclient import ( "context" + "log" "time" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/util/retry" "google.golang.org/grpc" ) -const ( - RPCConnectionTimeout = 30 * time.Second - Retry = 3 -) - type Client struct { ctx context.Context grpc datapb.DataNodeClient @@ -26,18 +23,23 @@ type Client struct { func NewClient(address string) *Client { return &Client{ address: address, + ctx: context.Background(), } } func (c *Client) Init() error { - ctx, cancel := context.WithTimeout(context.Background(), RPCConnectionTimeout) - defer cancel() - var err error - for i := 0; i < Retry; i++ { - if c.conn, err = grpc.DialContext(ctx, c.address, grpc.WithInsecure(), grpc.WithBlock()); err == nil { - break + + connectGrpcFunc := func() error { + log.Println("DataNode connect czs::", c.address) + conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + return err } + c.conn = conn + return nil } + + err := retry.Retry(100, time.Millisecond*200, connectGrpcFunc) if err != nil { return err } diff --git a/internal/distributed/datanode/param_table.go b/internal/distributed/datanode/param_table.go new file mode 100644 index 0000000000000000000000000000000000000000..ef5364765482a4cda0ac31c21da70bf13f324db7 --- /dev/null +++ b/internal/distributed/datanode/param_table.go @@ -0,0 +1,64 @@ +package grpcdatanode + +import ( + "os" + "sync" + + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" +) + +var Params ParamTable +var once sync.Once + +type ParamTable struct { + paramtable.BaseTable + + IP string + Port int + + MasterAddress string + DataServiceAddress string +} + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + pt.initMasterAddress() + pt.initDataServiceAddress() + pt.initPort() // todo random generate + }) +} + +func (pt *ParamTable) LoadFromArgs() { + +} + +func (pt *ParamTable) LoadFromEnv() { + Params.IP = funcutil.GetLocalIP() + host := os.Getenv("DATA_NODE_HOST") + if len(host) > 0 { + Params.IP = host + } +} + +func (pt *ParamTable) initPort() { + port := pt.ParseInt("dataNode.port") + pt.Port = port +} + +func (pt *ParamTable) initMasterAddress() { + ret, err := pt.Load("_MasterAddress") + if err != nil { + panic(err) + } + pt.MasterAddress = ret +} + +func (pt *ParamTable) initDataServiceAddress() { + ret, err := pt.Load("_DataServiceAddress") + if err != nil { + panic(err) + } + pt.DataServiceAddress = ret +} diff --git a/internal/distributed/datanode/param_table_test.go b/internal/distributed/datanode/param_table_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1b32758f170023e88f3791691b0ca43c94e5d7f2 --- /dev/null +++ b/internal/distributed/datanode/param_table_test.go @@ -0,0 +1,21 @@ +package grpcdatanode + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + assert.NotEqual(t, Params.Port, 0) + t.Logf("DataNode Port:%d", Params.Port) + + assert.NotEqual(t, Params.DataServiceAddress, "") + t.Logf("DataServiceAddress:%s", Params.DataServiceAddress) + + assert.NotEqual(t, Params.MasterAddress, "") + t.Logf("MasterAddress:%s", Params.MasterAddress) + +} diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 080fc27eb19122898f74657740f4907d0b5b643a..96448d4a65dd780d6e249f6e93ad501c3515573b 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -1,12 +1,19 @@ -package datanode +package grpcdatanode import ( "context" + "sync" + "time" + + "log" "net" "strconv" - "sync" + + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" dn "github.com/zilliztech/milvus-distributed/internal/datanode" + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client" + msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -17,86 +24,183 @@ import ( ) type Server struct { - core *dn.DataNode - - grpcServer *grpc.Server - grpcError error - grpcErrMux sync.Mutex - - ctx context.Context - cancel context.CancelFunc + impl *dn.DataNode + wg sync.WaitGroup + grpcErrChan chan error + grpcServer *grpc.Server + ctx context.Context + cancel context.CancelFunc msFactory msgstream.Factory + + masterService *msc.GrpcClient + dataService *dsc.Client } func New(ctx context.Context, factory msgstream.Factory) (*Server, error) { ctx1, cancel := context.WithCancel(ctx) var s = &Server{ - ctx: ctx1, - cancel: cancel, - msFactory: factory, + ctx: ctx1, + cancel: cancel, + msFactory: factory, + grpcErrChan: make(chan error), } - s.core = dn.NewDataNode(s.ctx, s.msFactory) - s.grpcServer = grpc.NewServer() - datapb.RegisterDataNodeServer(s.grpcServer, s) - addr := dn.Params.IP + ":" + strconv.FormatInt(dn.Params.Port, 10) + s.impl = dn.NewDataNode(s.ctx, s.msFactory) + + return s, nil +} + +func (s *Server) startGrpcLoop(grpcPort int) { + defer s.wg.Done() + + addr := ":" + strconv.Itoa(grpcPort) + lis, err := net.Listen("tcp", addr) if err != nil { - return nil, err + log.Printf("DataNode GrpcServer:failed to listen: %v", err) + s.grpcErrChan <- err + return } + log.Println("DataNode:: addr:", addr) - go func() { - if err = s.grpcServer.Serve(lis); err != nil { - s.grpcErrMux.Lock() - defer s.grpcErrMux.Unlock() - s.grpcError = err - } - }() + s.grpcServer = grpc.NewServer() + datapb.RegisterDataNodeServer(s.grpcServer, s) - s.grpcErrMux.Lock() - err = s.grpcError - s.grpcErrMux.Unlock() + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() - if err != nil { - return nil, err + go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) + if err := s.grpcServer.Serve(lis); err != nil { + log.Println("DataNode Start Grpc Failed!!!!") + s.grpcErrChan <- err } - return s, nil + } func (s *Server) SetMasterServiceInterface(ms dn.MasterServiceInterface) error { - return s.core.SetMasterServiceInterface(ms) + return s.impl.SetMasterServiceInterface(ms) } func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error { - return s.core.SetDataServiceInterface(ds) + return s.impl.SetDataServiceInterface(ds) } -func (s *Server) Init() error { - return s.core.Init() -} +func (s *Server) Run() error { -func (s *Server) Start() error { - return s.core.Start() + if err := s.init(); err != nil { + return err + } + log.Println("data node init done ...") + + if err := s.start(); err != nil { + return err + } + log.Println("data node start done ...") + return nil } func (s *Server) Stop() error { - err := s.core.Stop() s.cancel() - s.grpcServer.GracefulStop() - return err + var err error + if s.grpcServer != nil { + s.grpcServer.GracefulStop() + } + + err = s.impl.Stop() + if err != nil { + return err + } + s.wg.Wait() + return nil +} + +func (s *Server) init() error { + Params.Init() + Params.Port = funcutil.GetAvailablePort() + Params.LoadFromEnv() + Params.LoadFromArgs() + + log.Println("DataNode, port:", Params.Port) + s.wg.Add(1) + go s.startGrpcLoop(Params.Port) + // wait for grpc server loop start + err := <-s.grpcErrChan + if err != nil { + return err + } + + // --- Master Server Client --- + log.Println("Master service address:", Params.MasterAddress) + log.Println("Init master service client ...") + masterClient, err := msc.NewClient(Params.MasterAddress, 20*time.Second) + if err != nil { + panic(err) + } + + if err = masterClient.Init(); err != nil { + panic(err) + } + + if err = masterClient.Start(); err != nil { + panic(err) + } + err = funcutil.WaitForComponentHealthy(masterClient, "MasterService", 100, time.Millisecond*200) + + if err != nil { + panic(err) + } + + if err := s.SetMasterServiceInterface(masterClient); err != nil { + panic(err) + } + + // --- Data Server Client --- + log.Println("Data service address: ", Params.DataServiceAddress) + log.Println("DataNode Init data service client ...") + dataService := dsc.NewClient(Params.DataServiceAddress) + if err = dataService.Init(); err != nil { + panic(err) + } + if err = dataService.Start(); err != nil { + panic(err) + } + err = funcutil.WaitForComponentInitOrHealthy(dataService, "DataService", 100, time.Millisecond*200) + if err != nil { + panic(err) + } + if err := s.SetDataServiceInterface(dataService); err != nil { + panic(err) + } + + dn.Params.Init() + dn.Params.Port = Params.Port + dn.Params.IP = Params.IP + + s.impl.NodeID = dn.Params.NodeID + s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING) + + if err := s.impl.Init(); err != nil { + log.Println("impl init error: ", err) + return err + } + return nil +} + +func (s *Server) start() error { + return s.impl.Start() } func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { - return s.core.GetComponentStates() + return s.impl.GetComponentStates() } func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { - return s.core.WatchDmChannels(in) + return s.impl.WatchDmChannels(in) } func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) (*commonpb.Status, error) { - if s.core.State.Load().(internalpb2.StateCode) != internalpb2.StateCode_HEALTHY { + if s.impl.State.Load().(internalpb2.StateCode) != internalpb2.StateCode_HEALTHY { return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: "DataNode isn't healthy.", @@ -104,5 +208,5 @@ func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) } return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, - }, s.core.FlushSegments(in) + }, s.impl.FlushSegments(in) } diff --git a/internal/distributed/dataservice/client.go b/internal/distributed/dataservice/client/client.go similarity index 89% rename from internal/distributed/dataservice/client.go rename to internal/distributed/dataservice/client/client.go index cc70a3ed36081b85349ac8cbba3ad25b113c54d7..9e3ff9552772e018a005733745a8bd7ac9366e14 100644 --- a/internal/distributed/dataservice/client.go +++ b/internal/distributed/dataservice/client/client.go @@ -1,10 +1,11 @@ -package dataservice +package grpcdataserviceclient import ( "context" "time" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/util/retry" "google.golang.org/grpc" @@ -14,36 +15,36 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) -const ( - timeout = 30 * time.Second - retry = 3 -) - type Client struct { grpcClient datapb.DataServiceClient conn *grpc.ClientConn + ctx context.Context addr string } func NewClient(addr string) *Client { return &Client{ addr: addr, + ctx: context.Background(), } } func (c *Client) Init() error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - var err error - for i := 0; i < retry; i++ { - if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock()); err == nil { - break + connectGrpcFunc := func() error { + conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + return err } + c.conn = conn + return nil } + + err := retry.Retry(100, time.Millisecond*200, connectGrpcFunc) if err != nil { return err } c.grpcClient = datapb.NewDataServiceClient(c.conn) + return nil } diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go deleted file mode 100644 index 7f3990bf1f66fb205581f4fe386422125ca7b3b6..0000000000000000000000000000000000000000 --- a/internal/distributed/dataservice/grpc_service.go +++ /dev/null @@ -1,138 +0,0 @@ -package dataservice - -import ( - "context" - "fmt" - "log" - "net" - "time" - - "google.golang.org/grpc" - - "github.com/zilliztech/milvus-distributed/internal/dataservice" - - "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" - - "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/datapb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" -) - -type Service struct { - server *dataservice.Server - ctx context.Context - grpcServer *grpc.Server -} - -func (s *Service) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) { - return s.server.GetSegmentInfo(request) -} - -func NewGrpcService(ctx context.Context, factory msgstream.Factory) *Service { - s := &Service{} - var err error - s.ctx = ctx - s.server, err = dataservice.CreateServer(s.ctx, factory) - if err != nil { - log.Fatalf("create server error: %s", err.Error()) - return nil - } - return s -} - -func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) { - s.server.SetMasterClient(masterClient) -} - -func (s *Service) Init() error { - var err error - s.grpcServer = grpc.NewServer() - datapb.RegisterDataServiceServer(s.grpcServer, s) - lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port)) - if err != nil { - return nil - } - c := make(chan struct{}) - go func() { - if err2 := s.grpcServer.Serve(lis); err2 != nil { - close(c) - err = err2 - } - }() - timer := time.NewTimer(1 * time.Second) - defer timer.Stop() - select { - case <-timer.C: - break - case <-c: - return err - } - return s.server.Init() -} - -func (s *Service) Start() error { - return s.server.Start() -} - -func (s *Service) Stop() error { - err := s.server.Stop() - s.grpcServer.GracefulStop() - return err -} - -func (s *Service) RegisterNode(ctx context.Context, request *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { - return s.server.RegisterNode(request) -} - -func (s *Service) Flush(ctx context.Context, request *datapb.FlushRequest) (*commonpb.Status, error) { - return s.server.Flush(request) -} - -func (s *Service) AssignSegmentID(ctx context.Context, request *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) { - return s.server.AssignSegmentID(request) -} - -func (s *Service) ShowSegments(ctx context.Context, request *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) { - return s.server.ShowSegments(request) -} - -func (s *Service) GetSegmentStates(ctx context.Context, request *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { - return s.server.GetSegmentStates(request) -} - -func (s *Service) GetInsertBinlogPaths(ctx context.Context, request *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) { - return s.server.GetInsertBinlogPaths(request) -} - -func (s *Service) GetInsertChannels(ctx context.Context, request *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { - return s.server.GetInsertChannels(request) -} - -func (s *Service) GetCollectionStatistics(ctx context.Context, request *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) { - return s.server.GetCollectionStatistics(request) -} - -func (s *Service) GetPartitionStatistics(ctx context.Context, request *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) { - return s.server.GetPartitionStatistics(request) -} - -func (s *Service) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { - return s.server.GetComponentStates() -} - -func (s *Service) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - return s.server.GetTimeTickChannel() -} - -func (s *Service) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - return s.server.GetStatisticsChannel() -} - -func (s *Service) GetSegmentInfoChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { - return s.server.GetSegmentInfoChannel() -} - -func (s *Service) GetCount(ctx context.Context, request *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) { - return s.server.GetCount(request) -} diff --git a/internal/distributed/dataservice/paramtable.go b/internal/distributed/dataservice/paramtable.go index 2af61acbaab9bb17aaba7071ca6d94e0fc007715..ddce5dc0f58a016df09394914e62753660cd9a56 100644 --- a/internal/distributed/dataservice/paramtable.go +++ b/internal/distributed/dataservice/paramtable.go @@ -1,7 +1,6 @@ -package dataservice +package grpcdataserviceclient import ( - "os" "sync" "github.com/zilliztech/milvus-distributed/internal/util/paramtable" @@ -10,6 +9,7 @@ import ( type ParamTable struct { paramtable.BaseTable + Port int MasterAddress string } @@ -19,6 +19,7 @@ var once sync.Once func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() + pt.initPort() pt.initParams() pt.LoadFromEnv() }) @@ -29,20 +30,17 @@ func (pt *ParamTable) initParams() { } func (pt *ParamTable) LoadFromEnv() { - masterAddress := os.Getenv("MASTER_ADDRESS") - if masterAddress != "" { - pt.MasterAddress = masterAddress - } + +} + +func (pt *ParamTable) initPort() { + pt.Port = pt.ParseInt("dataservice.port") } func (pt *ParamTable) initMasterAddress() { - masterHost, err := pt.Load("master.address") - if err != nil { - panic(err) - } - port, err := pt.Load("master.port") + ret, err := pt.Load("_MasterAddress") if err != nil { panic(err) } - pt.MasterAddress = masterHost + ":" + port + pt.MasterAddress = ret } diff --git a/internal/distributed/dataservice/service.go b/internal/distributed/dataservice/service.go new file mode 100644 index 0000000000000000000000000000000000000000..6382f7f3da0001aaf829982499328729fa7e9df6 --- /dev/null +++ b/internal/distributed/dataservice/service.go @@ -0,0 +1,212 @@ +package grpcdataserviceclient + +import ( + "context" + "log" + "net" + "strconv" + "sync" + "time" + + "google.golang.org/grpc" + + msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" + + "github.com/zilliztech/milvus-distributed/internal/dataservice" + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" +) + +type Server struct { + ctx context.Context + cancel context.CancelFunc + + grpcErrChan chan error + wg sync.WaitGroup + + impl *dataservice.Server + grpcServer *grpc.Server + masterClient *msc.GrpcClient +} + +func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { + + ctx1, cancel := context.WithCancel(ctx) + + s := &Server{ + ctx: ctx1, + cancel: cancel, + grpcErrChan: make(chan error), + } + + var err error + s.impl, err = dataservice.CreateServer(s.ctx, factory) + if err != nil { + return nil, err + } + return s, nil +} + +func (s *Server) init() error { + Params.Init() + Params.LoadFromEnv() + + s.wg.Add(1) + go s.startGrpcLoop(Params.Port) + // wait for grpc server loop start + if err := <-s.grpcErrChan; err != nil { + return err + } + + log.Println("DataService:: MasterServicAddr:", Params.MasterAddress) + client, err := msc.NewClient(Params.MasterAddress, 10*time.Second) + if err != nil { + panic(err) + } + log.Println("master client create complete") + if err = client.Init(); err != nil { + panic(err) + } + if err = client.Start(); err != nil { + panic(err) + } + s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING) + + err = funcutil.WaitForComponentInitOrHealthy(client, "MasterService", 100, time.Millisecond*200) + + if err != nil { + panic(err) + } + s.impl.SetMasterClient(client) + + dataservice.Params.Init() + if err := s.impl.Init(); err != nil { + log.Println("impl init error: ", err) + return err + } + return nil +} + +func (s *Server) startGrpcLoop(grpcPort int) { + + defer s.wg.Done() + + log.Println("network port: ", grpcPort) + lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) + if err != nil { + log.Printf("GrpcServer:failed to listen: %v", err) + s.grpcErrChan <- err + return + } + + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + + s.grpcServer = grpc.NewServer() + datapb.RegisterDataServiceServer(s.grpcServer, s) + + go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) + if err := s.grpcServer.Serve(lis); err != nil { + s.grpcErrChan <- err + } +} + +func (s *Server) start() error { + return s.impl.Start() +} + +func (s *Server) Stop() error { + + s.cancel() + var err error + + if s.grpcServer != nil { + s.grpcServer.GracefulStop() + } + + err = s.impl.Stop() + if err != nil { + return err + } + + s.wg.Wait() + + return nil +} + +func (s *Server) Run() error { + + if err := s.init(); err != nil { + return err + } + log.Println("dataservice init done ...") + + if err := s.start(); err != nil { + return err + } + return nil +} + +func (s *Server) GetSegmentInfo(ctx context.Context, request *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) { + return s.impl.GetSegmentInfo(request) +} + +func (s *Server) RegisterNode(ctx context.Context, request *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { + return s.impl.RegisterNode(request) +} + +func (s *Server) Flush(ctx context.Context, request *datapb.FlushRequest) (*commonpb.Status, error) { + return s.impl.Flush(request) +} + +func (s *Server) AssignSegmentID(ctx context.Context, request *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) { + return s.impl.AssignSegmentID(request) +} + +func (s *Server) ShowSegments(ctx context.Context, request *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) { + return s.impl.ShowSegments(request) +} + +func (s *Server) GetSegmentStates(ctx context.Context, request *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { + return s.impl.GetSegmentStates(request) +} + +func (s *Server) GetInsertBinlogPaths(ctx context.Context, request *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) { + return s.impl.GetInsertBinlogPaths(request) +} + +func (s *Server) GetInsertChannels(ctx context.Context, request *datapb.InsertChannelRequest) (*internalpb2.StringList, error) { + return s.impl.GetInsertChannels(request) +} + +func (s *Server) GetCollectionStatistics(ctx context.Context, request *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) { + return s.impl.GetCollectionStatistics(request) +} + +func (s *Server) GetPartitionStatistics(ctx context.Context, request *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) { + return s.impl.GetPartitionStatistics(request) +} + +func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { + return s.impl.GetComponentStates() +} + +func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { + return s.impl.GetTimeTickChannel() +} + +func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { + return s.impl.GetStatisticsChannel() +} + +func (s *Server) GetSegmentInfoChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { + return s.impl.GetSegmentInfoChannel() +} + +func (s *Server) GetCount(ctx context.Context, request *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) { + return s.impl.GetCount(request) +} diff --git a/internal/distributed/indexnode/paramtable.go b/internal/distributed/indexnode/paramtable.go index 739a5b1ef9e864b4ab8b0ef5d7da8e3b0bb896d4..e26a5fccf59f30c1e67c31285944c512c6150033 100644 --- a/internal/distributed/indexnode/paramtable.go +++ b/internal/distributed/indexnode/paramtable.go @@ -30,12 +30,10 @@ func (pt *ParamTable) Init() { }) } -// todo func (pt *ParamTable) LoadFromArgs() { } -//todo func (pt *ParamTable) LoadFromEnv() { indexServiceAddress := os.Getenv("INDEX_SERVICE_ADDRESS") if indexServiceAddress != "" { @@ -43,7 +41,7 @@ func (pt *ParamTable) LoadFromEnv() { } Params.IP = funcutil.GetLocalIP() - host := os.Getenv("PROXY_NODE_HOST") + host := os.Getenv("INDEX_NODE_HOST") if len(host) > 0 { Params.IP = host } diff --git a/internal/distributed/masterservice/client.go b/internal/distributed/masterservice/client/client.go similarity index 67% rename from internal/distributed/masterservice/client.go rename to internal/distributed/masterservice/client/client.go index 2bc46f445718d522d9dafcd89d2aa9ca6b41e888..c36a16a23accd0825511809422accdf5a63ae70f 100644 --- a/internal/distributed/masterservice/client.go +++ b/internal/distributed/masterservice/client/client.go @@ -1,11 +1,10 @@ -package masterservice +package grpcmasterserviceclient import ( "context" "time" "github.com/zilliztech/milvus-distributed/internal/errors" - cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -19,18 +18,20 @@ type GrpcClient struct { conn *grpc.ClientConn //inner member - addr string - timeout time.Duration - retry int + addr string + timeout time.Duration + grpcTimeout time.Duration + retry int } -func NewGrpcClient(addr string, timeout time.Duration) (*GrpcClient, error) { +func NewClient(addr string, timeout time.Duration) (*GrpcClient, error) { return &GrpcClient{ - grpcClient: nil, - conn: nil, - addr: addr, - timeout: timeout, - retry: 3, + grpcClient: nil, + conn: nil, + addr: addr, + timeout: timeout, + grpcTimeout: time.Second * 5, + retry: 3, }, nil } @@ -47,7 +48,6 @@ func (c *GrpcClient) Init() error { return err } c.grpcClient = masterpb.NewMasterServiceClient(c.conn) - cms.Params.Init() return nil } @@ -59,98 +59,98 @@ func (c *GrpcClient) Stop() error { } func (c *GrpcClient) GetComponentStates() (*internalpb2.ComponentStates, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.GetComponentStatesRPC(ctx, &commonpb.Empty{}) } //DDL request func (c *GrpcClient) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.CreateCollection(ctx, in) } func (c *GrpcClient) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.DropCollection(ctx, in) } func (c *GrpcClient) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.HasCollection(ctx, in) } func (c *GrpcClient) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.DescribeCollection(ctx, in) } func (c *GrpcClient) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.ShowCollections(ctx, in) } func (c *GrpcClient) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.CreatePartition(ctx, in) } func (c *GrpcClient) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.DropPartition(ctx, in) } func (c *GrpcClient) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.HasPartition(ctx, in) } func (c *GrpcClient) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.ShowPartitions(ctx, in) } //index builder service func (c *GrpcClient) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.CreateIndex(ctx, in) } func (c *GrpcClient) DropIndex(in *milvuspb.DropIndexRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.DropIndex(ctx, in) } func (c *GrpcClient) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.DescribeIndex(ctx, in) } //global timestamp allocator func (c *GrpcClient) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.AllocTimestamp(ctx, in) } func (c *GrpcClient) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.AllocID(ctx, in) } //receiver time tick from proxy service, and put it into this channel func (c *GrpcClient) GetTimeTickChannel() (string, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() rsp, err := c.grpcClient.GetTimeTickChannelRPC(ctx, &commonpb.Empty{}) if err != nil { @@ -164,7 +164,7 @@ func (c *GrpcClient) GetTimeTickChannel() (string, error) { //receive ddl from rpc and time tick from proxy service, and put them into this channel func (c *GrpcClient) GetDdChannel() (string, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() rsp, err := c.grpcClient.GetDdChannelRPC(ctx, &commonpb.Empty{}) if err != nil { @@ -178,7 +178,7 @@ func (c *GrpcClient) GetDdChannel() (string, error) { //just define a channel, not used currently func (c *GrpcClient) GetStatisticsChannel() (string, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() rsp, err := c.grpcClient.GetStatisticsChannelRPC(ctx, &commonpb.Empty{}) if err != nil { @@ -191,13 +191,13 @@ func (c *GrpcClient) GetStatisticsChannel() (string, error) { } func (c *GrpcClient) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.DescribeSegment(ctx, in) } func (c *GrpcClient) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), c.grpcTimeout) defer cancel() return c.grpcClient.ShowSegments(ctx, in) } diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 400dffda26ba5645a7f52c1fb74e47cb3fea6053..6d09a5da979d6db7fef7baa6348e8007fc86d3a0 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -1,14 +1,18 @@ -package masterservice +package grpcmasterservice import ( "context" "fmt" "math/rand" "regexp" + "strconv" + "strings" "sync" "testing" "time" + grpcmasterserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" cms "github.com/zilliztech/milvus-distributed/internal/masterservice" @@ -25,16 +29,23 @@ func TestGrpcService(t *testing.T) { rand.Seed(time.Now().UnixNano()) randVal := rand.Int() - //cms.Params.Address = "127.0.0.1" - cms.Params.Port = (randVal % 100) + 10000 + Params.Init() + Params.Port = (randVal % 100) + 10000 + parts := strings.Split(Params.Address, ":") + if len(parts) == 2 { + Params.Address = parts[0] + ":" + strconv.Itoa(Params.Port) + t.Log("newParams.Address:", Params.Address) + } msFactory := pulsarms.NewFactory() - svr, err := NewGrpcServer(context.Background(), msFactory) + svr, err := NewServer(context.Background(), msFactory) assert.Nil(t, err) + svr.connectQueryService = false + svr.connectProxyService = false + svr.connectIndexService = false + svr.connectDataService = false - // cms.Params.NodeID = 0 - //cms.Params.PulsarAddress = "pulsar://127.0.0.1:6650" - //cms.Params.EtcdAddress = "127.0.0.1:2379" + cms.Params.Init() cms.Params.MetaRootPath = fmt.Sprintf("/%d/test/meta", randVal) cms.Params.KvRootPath = fmt.Sprintf("/%d/test/kv", randVal) cms.Params.ProxyTimeTickChannel = fmt.Sprintf("proxyTimeTick%d", randVal) @@ -48,11 +59,14 @@ func TestGrpcService(t *testing.T) { cms.Params.DefaultPartitionName = "_default" cms.Params.DefaultIndexName = "_default" - t.Logf("master service port = %d", cms.Params.Port) + t.Logf("master service port = %d", Params.Port) - core := svr.core.(*cms.Core) + err = svr.startGrpc() + assert.Nil(t, err) + svr.core.UpdateStateCode(internalpb2.StateCode_INITIALIZING) - err = svr.Init() + core := svr.core + err = core.Init() assert.Nil(t, err) core.ProxyTimeTickChan = make(chan typeutil.Timestamp, 8) @@ -126,10 +140,12 @@ func TestGrpcService(t *testing.T) { return nil } - err = svr.Start() + err = svr.start() assert.Nil(t, err) - cli, err := NewGrpcClient(fmt.Sprintf("%s:%d", cms.Params.Address, cms.Params.Port), 3*time.Second) + svr.core.UpdateStateCode(internalpb2.StateCode_HEALTHY) + + cli, err := grpcmasterserviceclient.NewClient(Params.Address, 3*time.Second) assert.Nil(t, err) err = cli.Init() @@ -178,6 +194,7 @@ func TestGrpcService(t *testing.T) { status, err := cli.CreateCollection(req) assert.Nil(t, err) + assert.Equal(t, len(createCollectionArray), 1) assert.Equal(t, status.ErrorCode, commonpb.ErrorCode_SUCCESS) assert.Equal(t, createCollectionArray[0].Base.MsgType, commonpb.MsgType_kCreateCollection) diff --git a/internal/distributed/masterservice/param_table.go b/internal/distributed/masterservice/param_table.go new file mode 100644 index 0000000000000000000000000000000000000000..ad9df6ee94c4c5b2afed05d75f91cc505be65100 --- /dev/null +++ b/internal/distributed/masterservice/param_table.go @@ -0,0 +1,83 @@ +package grpcmasterservice + +import ( + "sync" + + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" +) + +var Params ParamTable +var once sync.Once + +type ParamTable struct { + paramtable.BaseTable + + Address string // ip:port + Port int + + ProxyServiceAddress string + IndexServiceAddress string + QueryServiceAddress string + DataServiceAddress string +} + +func (p *ParamTable) Init() { + once.Do(func() { + p.BaseTable.Init() + err := p.LoadYaml("advanced/master.yaml") + if err != nil { + panic(err) + } + p.initAddress() + p.initPort() + p.initProxyServiceAddress() + p.initIndexServiceAddress() + p.initQueryServiceAddress() + p.initDataServiceAddress() + + }) +} + +func (p *ParamTable) initAddress() { + ret, err := p.Load("_MasterAddress") + if err != nil { + panic(err) + } + p.Address = ret +} + +func (p *ParamTable) initPort() { + p.Port = p.ParseInt("master.port") +} + +func (p *ParamTable) initProxyServiceAddress() { + ret, err := p.Load("_PROXY_SERVICE_ADDRESS") + if err != nil { + panic(err) + } + p.ProxyServiceAddress = ret +} + +func (p *ParamTable) initIndexServiceAddress() { + ret, err := p.Load("IndexServiceAddress") + if err != nil { + panic(err) + } + p.IndexServiceAddress = ret +} + +func (p *ParamTable) initQueryServiceAddress() { + ret, err := p.Load("_QueryServiceAddress") + if err != nil { + panic(err) + } + p.QueryServiceAddress = ret +} + +func (p *ParamTable) initDataServiceAddress() { + ret, err := p.Load("_DataServiceAddress") + if err != nil { + panic(err) + } + p.DataServiceAddress = ret +} diff --git a/internal/distributed/masterservice/param_table_test.go b/internal/distributed/masterservice/param_table_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4bd671206886abb1970044f11f720cbfc860d376 --- /dev/null +++ b/internal/distributed/masterservice/param_table_test.go @@ -0,0 +1,29 @@ +package grpcmasterservice + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + assert.NotEqual(t, Params.Address, "") + t.Logf("master address = %s", Params.Address) + + assert.NotEqual(t, Params.Port, 0) + t.Logf("master port = %d", Params.Port) + + assert.NotEqual(t, Params.IndexServiceAddress, "") + t.Logf("IndexServiceAddress:%s", Params.IndexServiceAddress) + + assert.NotEqual(t, Params.DataServiceAddress, "") + t.Logf("DataServiceAddress:%s", Params.DataServiceAddress) + + assert.NotEqual(t, Params.QueryServiceAddress, "") + t.Logf("QueryServiceAddress:%s", Params.QueryServiceAddress) + + assert.NotEqual(t, Params.ProxyServiceAddress, "") + t.Logf("ProxyServiceAddress:%s", Params.ProxyServiceAddress) +} diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index fceb859b01fd43bf9837529b811acc14a8ed23cc..7f8bb1be176d9eeee5f2af107f9b7557ed49a5b0 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -1,12 +1,20 @@ -package masterservice +package grpcmasterservice import ( "context" - "fmt" + "log" + "strconv" + "time" + "net" "sync" - "github.com/zilliztech/milvus-distributed/internal/errors" + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client" + isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" + psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client" + qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -17,161 +25,271 @@ import ( ) // grpc wrapper -type GrpcServer struct { - core cms.Interface - grpcServer *grpc.Server - grpcError error - grpcErrMux sync.Mutex +type Server struct { + core *cms.Core + grpcServer *grpc.Server + grpcErrChan chan error + + wg sync.WaitGroup ctx context.Context cancel context.CancelFunc + + proxyService *psc.Client + dataService *dsc.Client + indexService *isc.Client + queryService *qsc.Client + + connectProxyService bool + connectDataService bool + connectIndexService bool + connectQueryService bool } -func NewGrpcServer(ctx context.Context, factory msgstream.Factory) (*GrpcServer, error) { - s := &GrpcServer{} +func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { + + ctx1, cancel := context.WithCancel(ctx) + + s := &Server{ + ctx: ctx1, + cancel: cancel, + grpcErrChan: make(chan error), + connectDataService: true, + connectProxyService: true, + connectIndexService: true, + connectQueryService: true, + } + var err error - s.ctx, s.cancel = context.WithCancel(ctx) - if s.core, err = cms.NewCore(s.ctx, factory); err != nil { + s.core, err = cms.NewCore(s.ctx, factory) + if err != nil { return nil, err } + return s, err +} - s.grpcServer = grpc.NewServer() - s.grpcError = nil - masterpb.RegisterMasterServiceServer(s.grpcServer, s) - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", cms.Params.Port)) +func (s *Server) Run() error { + if err := s.init(); err != nil { + return err + } + + if err := s.start(); err != nil { + return err + } + return nil +} + +func (s *Server) init() error { + Params.Init() + log.Println("init params done") + + err := s.startGrpc() if err != nil { - return nil, err + return err } - go func() { - if err := s.grpcServer.Serve(lis); err != nil { - s.grpcErrMux.Lock() - defer s.grpcErrMux.Unlock() - s.grpcError = err + + s.core.UpdateStateCode(internalpb2.StateCode_INITIALIZING) + + if s.connectProxyService { + log.Printf("proxy service address : %s", Params.ProxyServiceAddress) + proxyService := psc.NewClient(Params.ProxyServiceAddress) + if err := proxyService.Init(); err != nil { + panic(err) } - }() - s.grpcErrMux.Lock() - err = s.grpcError - s.grpcErrMux.Unlock() + err := funcutil.WaitForComponentInitOrHealthy(proxyService, "ProxyService", 100, 200*time.Millisecond) + if err != nil { + panic(err) + } - if err != nil { - return nil, err + if err = s.core.SetProxyService(proxyService); err != nil { + panic(err) + } } - return s, nil -} + if s.connectDataService { + log.Printf("data service address : %s", Params.DataServiceAddress) + dataService := dsc.NewClient(Params.DataServiceAddress) + if err := dataService.Init(); err != nil { + panic(err) + } + if err := dataService.Start(); err != nil { + panic(err) + } + err := funcutil.WaitForComponentInitOrHealthy(dataService, "DataService", 100, 200*time.Millisecond) + if err != nil { + panic(err) + } -func (s *GrpcServer) Init() error { - return s.core.Init() -} + if err = s.core.SetDataService(dataService); err != nil { + panic(err) + } + } + if s.connectIndexService { + log.Printf("index service address : %s", Params.IndexServiceAddress) + indexService := isc.NewClient(Params.IndexServiceAddress) + if err := indexService.Init(); err != nil { + panic(err) + } -func (s *GrpcServer) Start() error { - return s.core.Start() + if err := s.core.SetIndexService(indexService); err != nil { + panic(err) + + } + } + if s.connectQueryService { + queryService, err := qsc.NewClient(Params.QueryServiceAddress, 5*time.Second) + if err != nil { + panic(err) + } + if err = queryService.Init(); err != nil { + panic(err) + } + if err = queryService.Start(); err != nil { + panic(err) + } + if err = s.core.SetQueryService(queryService); err != nil { + panic(err) + } + } + cms.Params.Init() + log.Println("grpc init done ...") + + if err := s.core.Init(); err != nil { + return err + } + return nil } -func (s *GrpcServer) Stop() error { - err := s.core.Stop() - s.cancel() - s.grpcServer.GracefulStop() +func (s *Server) startGrpc() error { + s.wg.Add(1) + go s.startGrpcLoop(Params.Port) + // wait for grpc server loop start + err := <-s.grpcErrChan return err } -func (s *GrpcServer) SetProxyService(p cms.ProxyServiceInterface) error { - c, ok := s.core.(*cms.Core) - if !ok { - return errors.Errorf("set proxy service failed") +func (s *Server) startGrpcLoop(grpcPort int) { + + defer s.wg.Done() + + log.Println("network port: ", grpcPort) + lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) + if err != nil { + log.Printf("GrpcServer:failed to listen: %v", err) + s.grpcErrChan <- err + return } - return c.SetProxyService(p) -} -func (s *GrpcServer) SetDataService(p cms.DataServiceInterface) error { - c, ok := s.core.(*cms.Core) - if !ok { - return errors.Errorf("set data service failed") + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + + s.grpcServer = grpc.NewServer() + masterpb.RegisterMasterServiceServer(s.grpcServer, s) + + go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) + if err := s.grpcServer.Serve(lis); err != nil { + s.grpcErrChan <- err } - return c.SetDataService(p) + } -func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error { - c, ok := s.core.(*cms.Core) - if !ok { - return errors.Errorf("set index service failed") +func (s *Server) start() error { + log.Println("Master Core start ...") + if err := s.core.Start(); err != nil { + return err } - return c.SetIndexService(p) + return nil } -func (s *GrpcServer) SetQueryService(q cms.QueryServiceInterface) error { - c, ok := s.core.(*cms.Core) - if !ok { - return errors.Errorf("set query service failed") +func (s *Server) Stop() error { + if s.proxyService != nil { + _ = s.proxyService.Stop() + } + if s.indexService != nil { + _ = s.indexService.Stop() + } + if s.dataService != nil { + _ = s.dataService.Stop() + } + if s.queryService != nil { + _ = s.queryService.Stop() + } + if s.core != nil { + return s.core.Stop() + } + s.cancel() + if s.grpcServer != nil { + s.grpcServer.GracefulStop() } - return c.SetQueryService(q) + s.wg.Wait() + return nil } -func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { +func (s *Server) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { return s.core.GetComponentStates() } //DDL request -func (s *GrpcServer) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { +func (s *Server) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { return s.core.CreateCollection(in) } -func (s *GrpcServer) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { +func (s *Server) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { return s.core.DropCollection(in) } -func (s *GrpcServer) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { +func (s *Server) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { return s.core.HasCollection(in) } -func (s *GrpcServer) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { +func (s *Server) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { return s.core.DescribeCollection(in) } -func (s *GrpcServer) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { +func (s *Server) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { return s.core.ShowCollections(in) } -func (s *GrpcServer) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { +func (s *Server) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { return s.core.CreatePartition(in) } -func (s *GrpcServer) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { +func (s *Server) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { return s.core.DropPartition(in) } -func (s *GrpcServer) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { +func (s *Server) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { return s.core.HasPartition(in) } -func (s *GrpcServer) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { +func (s *Server) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { return s.core.ShowPartitions(in) } //index builder service -func (s *GrpcServer) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { +func (s *Server) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { return s.core.CreateIndex(in) } -func (s *GrpcServer) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) { +func (s *Server) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) { return s.core.DropIndex(in) } -func (s *GrpcServer) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { +func (s *Server) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { return s.core.DescribeIndex(in) } //global timestamp allocator -func (s *GrpcServer) AllocTimestamp(ctx context.Context, in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) { +func (s *Server) AllocTimestamp(ctx context.Context, in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) { return s.core.AllocTimestamp(in) } -func (s *GrpcServer) AllocID(ctx context.Context, in *masterpb.IDRequest) (*masterpb.IDResponse, error) { +func (s *Server) AllocID(ctx context.Context, in *masterpb.IDRequest) (*masterpb.IDResponse, error) { return s.core.AllocID(in) } //receiver time tick from proxy service, and put it into this channel -func (s *GrpcServer) GetTimeTickChannelRPC(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { +func (s *Server) GetTimeTickChannelRPC(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { rsp, err := s.core.GetTimeTickChannel() if err != nil { return &milvuspb.StringResponse{ @@ -192,7 +310,7 @@ func (s *GrpcServer) GetTimeTickChannelRPC(ctx context.Context, empty *commonpb. } //receive ddl from rpc and time tick from proxy service, and put them into this channel -func (s *GrpcServer) GetDdChannelRPC(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) { +func (s *Server) GetDdChannelRPC(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) { rsp, err := s.core.GetDdChannel() if err != nil { return &milvuspb.StringResponse{ @@ -213,7 +331,7 @@ func (s *GrpcServer) GetDdChannelRPC(ctx context.Context, in *commonpb.Empty) (* } //just define a channel, not used currently -func (s *GrpcServer) GetStatisticsChannelRPC(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { +func (s *Server) GetStatisticsChannelRPC(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) { rsp, err := s.core.GetStatisticsChannel() if err != nil { return &milvuspb.StringResponse{ @@ -233,10 +351,10 @@ func (s *GrpcServer) GetStatisticsChannelRPC(ctx context.Context, empty *commonp }, nil } -func (s *GrpcServer) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { +func (s *Server) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { return s.core.DescribeSegment(in) } -func (s *GrpcServer) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { +func (s *Server) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { return s.core.ShowSegments(in) } diff --git a/internal/distributed/proxynode/paramtable.go b/internal/distributed/proxynode/paramtable.go index 8e72ac2c5d5379069959b47e25934430e7962023..16d4cf8b1c3283f3c166a382548b90bef0969a24 100644 --- a/internal/distributed/proxynode/paramtable.go +++ b/internal/distributed/proxynode/paramtable.go @@ -6,6 +6,7 @@ import ( "strconv" "sync" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" "github.com/zilliztech/milvus-distributed/internal/util/paramtable" ) @@ -36,12 +37,10 @@ func (pt *ParamTable) Init() { }) } -// todo func (pt *ParamTable) LoadFromArgs() { } -//todo func (pt *ParamTable) LoadFromEnv() { masterAddress := os.Getenv("MASTER_ADDRESS") @@ -69,6 +68,12 @@ func (pt *ParamTable) LoadFromEnv() { pt.DataServiceAddress = dataServiceAddress } + Params.IP = funcutil.GetLocalIP() + host := os.Getenv("PROXY_NODE_HOST") + if len(host) > 0 { + Params.IP = host + } + } func (pt *ParamTable) initParams() { diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index a1eca52e5b1f4b78d3fb4cb1ba1a43a64a356d0c..6609e97735fda30756471b86343328b0483ba282 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -6,18 +6,20 @@ import ( "io" "log" "net" - "os" "strconv" "sync" "time" - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" - grpcdataservice "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" + "google.golang.org/grpc" + + grpcdataserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client" grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" - grcpmasterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + grpcmasterserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client" grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" + + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -25,7 +27,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/proxypb" "github.com/zilliztech/milvus-distributed/internal/proxynode" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" - "google.golang.org/grpc" ) type Server struct { @@ -36,15 +37,9 @@ type Server struct { grpcErrChan chan error - ip string - port int - - //todo - proxyServiceClient *grpcproxyserviceclient.Client - - // todo InitParams Service addrs - masterServiceClient *grcpmasterservice.GrpcClient - dataServiceClient *grpcdataservice.Client + proxyServiceClient *grpcproxyserviceclient.Client + masterServiceClient *grpcmasterserviceclient.GrpcClient + dataServiceClient *grpcdataserviceclient.Client queryServiceClient *grpcqueryserviceclient.Client indexServiceClient *grpcindexserviceclient.Client @@ -87,7 +82,7 @@ func (s *Server) startGrpcLoop(grpcPort int) { log.Println("network port: ", grpcPort) lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) if err != nil { - log.Printf("GrpcServer:failed to listen: %v", err) + log.Printf("Server:failed to listen: %v", err) s.grpcErrChan <- err return } @@ -124,12 +119,6 @@ func (s *Server) init() error { var err error Params.Init() - Params.IP = funcutil.GetLocalIP() - host := os.Getenv("PROXY_NODE_HOST") - if len(host) > 0 { - Params.IP = host - } - Params.LoadFromEnv() Params.LoadFromArgs() @@ -169,7 +158,7 @@ func (s *Server) init() error { masterServiceAddr := Params.MasterAddress log.Println("master address: ", masterServiceAddr) timeout := 3 * time.Second - s.masterServiceClient, err = grcpmasterservice.NewGrpcClient(masterServiceAddr, timeout) + s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, timeout) if err != nil { return err } @@ -182,7 +171,7 @@ func (s *Server) init() error { dataServiceAddr := Params.DataServiceAddress log.Println("data service address ...", dataServiceAddr) - s.dataServiceClient = grpcdataservice.NewClient(dataServiceAddr) + s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr) err = s.dataServiceClient.Init() if err != nil { return err diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index 3765813cbd15db4c73b279ff3dec4a47492a83eb..67d7925e84682a30706699faf5dffe2e7d90ea0e 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -62,7 +62,7 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error) if err != nil { return nil, err } - return server, err + return server, nil } func (s *Server) Run() error { @@ -133,6 +133,7 @@ func (s *Server) start() error { } func (s *Server) Stop() error { + s.cancel() s.closer.Close() err := s.impl.Stop() if err != nil { @@ -141,7 +142,6 @@ func (s *Server) Stop() error { if s.grpcServer != nil { s.grpcServer.GracefulStop() } - s.cancel() s.wg.Wait() return nil } diff --git a/internal/distributed/querynode/param_table.go b/internal/distributed/querynode/param_table.go new file mode 100644 index 0000000000000000000000000000000000000000..80bfc8c779b84aeeba93a1565a66e38425cc50cd --- /dev/null +++ b/internal/distributed/querynode/param_table.go @@ -0,0 +1,95 @@ +package grpcquerynode + +import ( + "os" + "strconv" + "sync" + + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" +) + +var Params ParamTable +var once sync.Once + +type ParamTable struct { + paramtable.BaseTable + + QueryNodeIP string + QueryNodePort int + QueryNodeID UniqueID + + IndexServiceAddress string + MasterAddress string + DataServiceAddress string + QueryServiceAddress string +} + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + pt.initMasterAddress() + pt.initIndexServiceAddress() + pt.initDataServiceAddress() + pt.initQueryServiceAddress() + + }) +} + +func (pt *ParamTable) LoadFromArgs() { + +} + +func (pt *ParamTable) LoadFromEnv() { + + // todo assign by queryservice and set by read initparms + queryNodeIDStr := os.Getenv("QUERY_NODE_ID") + if queryNodeIDStr == "" { + panic("Can't Get QUERY_NODE_ID") + } + + queryID, err := strconv.Atoi(queryNodeIDStr) + if err != nil { + panic(err) + } + pt.QueryNodeID = UniqueID(queryID) + + Params.QueryNodeIP = funcutil.GetLocalIP() + host := os.Getenv("QUERY_NODE_HOST") + if len(host) > 0 { + Params.QueryNodeIP = host + } + +} + +func (pt *ParamTable) initMasterAddress() { + ret, err := pt.Load("_MasterAddress") + if err != nil { + panic(err) + } + pt.MasterAddress = ret +} + +func (pt *ParamTable) initIndexServiceAddress() { + ret, err := pt.Load("IndexServiceAddress") + if err != nil { + panic(err) + } + pt.IndexServiceAddress = ret +} + +func (pt *ParamTable) initDataServiceAddress() { + ret, err := pt.Load("_DataServiceAddress") + if err != nil { + panic(err) + } + pt.DataServiceAddress = ret +} + +func (pt *ParamTable) initQueryServiceAddress() { + ret, err := pt.Load("_QueryServiceAddress") + if err != nil { + panic(err) + } + pt.QueryServiceAddress = ret +} diff --git a/internal/distributed/querynode/param_table_test.go b/internal/distributed/querynode/param_table_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ff36ffae65a418a57c242b20f13643c2c00ea4c4 --- /dev/null +++ b/internal/distributed/querynode/param_table_test.go @@ -0,0 +1,23 @@ +package grpcquerynode + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + assert.NotEqual(t, Params.IndexServiceAddress, "") + t.Logf("IndexServiceAddress:%s", Params.IndexServiceAddress) + + assert.NotEqual(t, Params.DataServiceAddress, "") + t.Logf("DataServiceAddress:%s", Params.DataServiceAddress) + + assert.NotEqual(t, Params.MasterAddress, "") + t.Logf("MasterAddress:%s", Params.MasterAddress) + + assert.NotEqual(t, Params.QueryServiceAddress, "") + t.Logf("QueryServiceAddress:%s", Params.QueryServiceAddress) +} diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 2d0d1249c6d6eceb01ae53547aef8ea9e31b643c..1f39a4b1f40c5751acbe6e8460a9c4a0ff583e0c 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -2,10 +2,18 @@ package grpcquerynode import ( "context" - "fmt" "log" "net" + "strconv" "sync" + "time" + + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client" + isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" + msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" + qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" "google.golang.org/grpc" @@ -14,86 +22,240 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" qn "github.com/zilliztech/milvus-distributed/internal/querynode" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) +type UniqueID = typeutil.UniqueID + type Server struct { - node *qn.QueryNode + impl *qn.QueryNode + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + grpcErrChan chan error grpcServer *grpc.Server - grpcError error - grpcErrMux sync.Mutex - ctx context.Context - cancel context.CancelFunc + dataService *dsc.Client + masterService *msc.GrpcClient + indexService *isc.Client + queryService *qsc.Client } func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { + ctx1, cancel := context.WithCancel(ctx) + s := &Server{ - ctx: ctx, - node: qn.NewQueryNodeWithoutID(ctx, factory), + ctx: ctx1, + cancel: cancel, + impl: qn.NewQueryNodeWithoutID(ctx, factory), + grpcErrChan: make(chan error), } + return s, nil +} - qn.Params.Init() - s.grpcServer = grpc.NewServer() - querypb.RegisterQueryNodeServer(s.grpcServer, s) - lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", qn.Params.QueryNodeIP, qn.Params.QueryNodePort)) +func (s *Server) init() error { + + Params.Init() + Params.QueryNodePort = funcutil.GetAvailablePort() + Params.LoadFromEnv() + Params.LoadFromArgs() + + log.Println("QueryNode, port:", Params.QueryNodePort) + s.wg.Add(1) + go s.startGrpcLoop(Params.QueryNodePort) + // wait for grpc server loop start + err := <-s.grpcErrChan if err != nil { - return nil, err + return err + } + // --- QueryService --- + log.Println("QueryService address:", Params.QueryServiceAddress) + log.Println("Init Query service client ...") + queryService, err := qsc.NewClient(Params.QueryServiceAddress, 20*time.Second) + if err != nil { + panic(err) } - go func() { - log.Println("start query node grpc server...") - if err = s.grpcServer.Serve(lis); err != nil { - s.grpcErrMux.Lock() - defer s.grpcErrMux.Unlock() - s.grpcError = err - } - }() + if err = queryService.Init(); err != nil { + panic(err) + } - s.grpcErrMux.Lock() - err = s.grpcError - s.grpcErrMux.Unlock() + if err = queryService.Start(); err != nil { + panic(err) + } + err = funcutil.WaitForComponentInitOrHealthy(queryService, "QueryService", 100, time.Millisecond*200) if err != nil { - return nil, err + panic(err) } - return s, nil + + if err := s.SetQueryService(queryService); err != nil { + panic(err) + } + + // --- Master Server Client --- + //ms.Params.Init() + addr := Params.MasterAddress + log.Println("Master service address:", addr) + log.Println("Init master service client ...") + + masterService, err := msc.NewClient(addr, 20*time.Second) + if err != nil { + panic(err) + } + + if err = masterService.Init(); err != nil { + panic(err) + } + + if err = masterService.Start(); err != nil { + panic(err) + } + + err = funcutil.WaitForComponentHealthy(masterService, "MasterService", 100, time.Millisecond*200) + if err != nil { + panic(err) + } + + if err := s.SetMasterService(masterService); err != nil { + panic(err) + } + + // --- IndexService --- + log.Println("Index service address:", Params.IndexServiceAddress) + indexService := isc.NewClient(Params.IndexServiceAddress) + + if err := indexService.Init(); err != nil { + panic(err) + } + + if err := indexService.Start(); err != nil { + panic(err) + } + // wait indexservice healthy + err = funcutil.WaitForComponentHealthy(indexService, "IndexService", 100, time.Millisecond*200) + if err != nil { + panic(err) + } + + if err := s.SetIndexService(indexService); err != nil { + panic(err) + } + + // --- DataService --- + log.Printf("Data service address: %s", Params.DataServiceAddress) + log.Println("Querynode Init data service client ...") + + dataService := dsc.NewClient(Params.DataServiceAddress) + if err = dataService.Init(); err != nil { + panic(err) + } + if err = dataService.Start(); err != nil { + panic(err) + } + err = funcutil.WaitForComponentInitOrHealthy(dataService, "DataService", 100, time.Millisecond*200) + if err != nil { + panic(err) + } + + if err := s.SetDataService(dataService); err != nil { + panic(err) + } + + qn.Params.Init() + qn.Params.QueryNodeIP = Params.QueryNodeIP + qn.Params.QueryNodePort = int64(Params.QueryNodePort) + qn.Params.QueryNodeID = Params.QueryNodeID + + s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING) + + if err := s.impl.Init(); err != nil { + log.Println("impl init error: ", err) + return err + } + return nil } -func (s *Server) Init() error { - return s.node.Init() +func (s *Server) start() error { + return s.impl.Start() } -func (s *Server) Start() error { - return s.node.Start() +func (s *Server) startGrpcLoop(grpcPort int) { + defer s.wg.Done() + + addr := ":" + strconv.Itoa(grpcPort) + + lis, err := net.Listen("tcp", addr) + if err != nil { + log.Printf("QueryNode GrpcServer:failed to listen: %v", err) + s.grpcErrChan <- err + return + } + log.Println("QueryNode:: addr:", addr) + + s.grpcServer = grpc.NewServer() + querypb.RegisterQueryNodeServer(s.grpcServer, s) + + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + + go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) + if err := s.grpcServer.Serve(lis); err != nil { + log.Println("QueryNode Start Grpc Failed!!!!") + s.grpcErrChan <- err + } + +} + +func (s *Server) Run() error { + + if err := s.init(); err != nil { + return err + } + log.Println("querynode init done ...") + + if err := s.start(); err != nil { + return err + } + log.Println("querynode start done ...") + return nil } func (s *Server) Stop() error { - err := s.node.Stop() s.cancel() - s.grpcServer.GracefulStop() - return err + var err error + if s.grpcServer != nil { + s.grpcServer.GracefulStop() + } + + err = s.impl.Stop() + if err != nil { + return err + } + s.wg.Wait() + return nil } func (s *Server) SetMasterService(master qn.MasterServiceInterface) error { - return s.node.SetMasterService(master) + return s.impl.SetMasterService(master) } func (s *Server) SetQueryService(query qn.QueryServiceInterface) error { - return s.node.SetQueryService(query) + return s.impl.SetQueryService(query) } func (s *Server) SetIndexService(index qn.IndexServiceInterface) error { - return s.node.SetIndexService(index) + return s.impl.SetIndexService(index) } func (s *Server) SetDataService(data qn.DataServiceInterface) error { - return s.node.SetDataService(data) + return s.impl.SetDataService(data) } func (s *Server) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) { // ignore ctx and in - channel, err := s.node.GetTimeTickChannel() + channel, err := s.impl.GetTimeTickChannel() if err != nil { return nil, err } @@ -107,7 +269,7 @@ func (s *Server) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty) (*m func (s *Server) GetStatsChannel(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) { // ignore ctx and in - channel, err := s.node.GetStatisticsChannel() + channel, err := s.impl.GetStatisticsChannel() if err != nil { return nil, err } @@ -121,7 +283,7 @@ func (s *Server) GetStatsChannel(ctx context.Context, in *commonpb.Empty) (*milv func (s *Server) GetComponentStates(ctx context.Context, in *commonpb.Empty) (*querypb.ComponentStatesResponse, error) { // ignore ctx and in - componentStates, err := s.node.GetComponentStates() + componentStates, err := s.impl.GetComponentStates() if err != nil { return nil, err } @@ -135,29 +297,29 @@ func (s *Server) GetComponentStates(ctx context.Context, in *commonpb.Empty) (*q func (s *Server) AddQueryChannel(ctx context.Context, in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) { // ignore ctx - return s.node.AddQueryChannel(in) + return s.impl.AddQueryChannel(in) } func (s *Server) RemoveQueryChannel(ctx context.Context, in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error) { // ignore ctx - return s.node.RemoveQueryChannel(in) + return s.impl.RemoveQueryChannel(in) } func (s *Server) WatchDmChannels(ctx context.Context, in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { // ignore ctx - return s.node.WatchDmChannels(in) + return s.impl.WatchDmChannels(in) } func (s *Server) LoadSegments(ctx context.Context, in *querypb.LoadSegmentRequest) (*commonpb.Status, error) { // ignore ctx - return s.node.LoadSegments(in) + return s.impl.LoadSegments(in) } func (s *Server) ReleaseSegments(ctx context.Context, in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) { // ignore ctx - return s.node.ReleaseSegments(in) + return s.impl.ReleaseSegments(in) } func (s *Server) GetSegmentInfo(ctx context.Context, in *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) { - return s.node.GetSegmentInfo(in) + return s.impl.GetSegmentInfo(in) } diff --git a/internal/distributed/querynode/service_test.go b/internal/distributed/querynode/service_test.go deleted file mode 100644 index b7daa509bbd7c5b3f4126d786dfdd22d2d69f435..0000000000000000000000000000000000000000 --- a/internal/distributed/querynode/service_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package grpcquerynode - -import ( - "context" - "log" - "os" - "os/signal" - "syscall" - "testing" - "time" - - "go.uber.org/zap" - - "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" - "github.com/zilliztech/milvus-distributed/internal/querynode" -) - -const ( - debug = true - ctxTimeInMillisecond = 2000 -) - -func TestQueryNodeDistributed_Service(t *testing.T) { - // Creates server. - var ctx context.Context - var cancel context.CancelFunc - if debug { - ctx, cancel = context.WithCancel(context.Background()) - } else { - d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, cancel = context.WithDeadline(context.Background(), d) - } - go mockMain(ctx) - <-ctx.Done() - cancel() -} - -func mockMain(ctx context.Context) { - svr := newServerMock(ctx) - if err := svr.Init(); err != nil { - panic(err) - } - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - - var sig os.Signal - if err := svr.Start(); err != nil { - panic(err) - } - defer svr.Stop() - - <-ctx.Done() - log.Print("Got signal to exit", zap.String("signal", sig.String())) - - switch sig { - case syscall.SIGTERM: - os.Exit(0) - default: - os.Exit(1) - } -} - -func newServerMock(ctx context.Context) *Server { - factory := pulsarms.NewFactory() - server := &Server{ - node: querynode.NewQueryNodeWithoutID(ctx, factory), - } - - if err := server.node.SetQueryService(&queryServiceMock{}); err != nil { - panic(err) - } - if err := server.node.SetMasterService(&MasterServiceMock{}); err != nil { - panic(err) - } - if err := server.node.SetIndexService(&IndexServiceMock{}); err != nil { - panic(err) - } - if err := server.node.SetDataService(&DataServiceMock{}); err != nil { - panic(err) - } - - return server -} diff --git a/internal/distributed/queryservice/client/client.go b/internal/distributed/queryservice/client/client.go index 97d2955683edf16eaf034da0116ff0e11d2dfa26..166411ac30c6f55a98ac79a1b576754bac11c633 100644 --- a/internal/distributed/queryservice/client/client.go +++ b/internal/distributed/queryservice/client/client.go @@ -36,7 +36,6 @@ func NewClient(address string, timeout time.Duration) (*Client, error) { func (c *Client) Init() error { ctx, cancel := context.WithTimeout(context.Background(), c.timeout) defer cancel() - var err error for i := 0; i < c.retry; i++ { if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock()); err == nil { diff --git a/internal/distributed/queryservice/param_table.go b/internal/distributed/queryservice/param_table.go new file mode 100644 index 0000000000000000000000000000000000000000..2520f079a5dc4ffe5a0cbb4a0442454bff52c47a --- /dev/null +++ b/internal/distributed/queryservice/param_table.go @@ -0,0 +1,58 @@ +package grpcqueryservice + +import ( + "sync" + + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" +) + +var Params ParamTable +var once sync.Once + +type ParamTable struct { + paramtable.BaseTable + Port int + + IndexServiceAddress string + MasterAddress string + DataServiceAddress string +} + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + pt.initPort() + pt.initMasterAddress() + pt.initIndexServiceAddress() + pt.initDataServiceAddress() + + }) +} + +func (pt *ParamTable) initMasterAddress() { + ret, err := pt.Load("_MasterAddress") + if err != nil { + panic(err) + } + pt.MasterAddress = ret +} + +func (pt *ParamTable) initIndexServiceAddress() { + ret, err := pt.Load("IndexServiceAddress") + if err != nil { + panic(err) + } + pt.IndexServiceAddress = ret +} + +func (pt *ParamTable) initDataServiceAddress() { + ret, err := pt.Load("_DataServiceAddress") + if err != nil { + panic(err) + } + pt.DataServiceAddress = ret +} + +func (pt *ParamTable) initPort() { + pt.Port = pt.ParseInt("queryService.port") +} diff --git a/internal/distributed/queryservice/param_table_test.go b/internal/distributed/queryservice/param_table_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4b5d014595b14b648880e3598d9d91cd4f0e497b --- /dev/null +++ b/internal/distributed/queryservice/param_table_test.go @@ -0,0 +1,21 @@ +package grpcqueryservice + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + assert.NotEqual(t, Params.IndexServiceAddress, "") + t.Logf("IndexServiceAddress:%s", Params.IndexServiceAddress) + + assert.NotEqual(t, Params.DataServiceAddress, "") + t.Logf("DataServiceAddress:%s", Params.DataServiceAddress) + + assert.NotEqual(t, Params.MasterAddress, "") + t.Logf("MasterAddress:%s", Params.MasterAddress) + +} diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 303023edb2da46451b605e16f1ecdab54e1bf4f9..094326120e7719a5bd09074dd1694660213bf402 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -6,6 +6,11 @@ import ( "net" "strconv" "sync" + "time" + + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client" + msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" "google.golang.org/grpc" @@ -18,75 +23,146 @@ import ( ) type Server struct { - grpcServer *grpc.Server - grpcError error - grpcErrMux sync.Mutex - + wg sync.WaitGroup loopCtx context.Context loopCancel context.CancelFunc + grpcServer *grpc.Server + + grpcErrChan chan error - queryService *qs.QueryService + impl *qs.QueryService msFactory msgstream.Factory + + dataService *dsc.Client + masterService *msc.GrpcClient } func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) { ctx1, cancel := context.WithCancel(ctx) - service, err := qs.NewQueryService(ctx1, factory) + svr, err := qs.NewQueryService(ctx1, factory) if err != nil { cancel() return nil, err } return &Server{ - queryService: service, - loopCtx: ctx1, - loopCancel: cancel, - msFactory: factory, + impl: svr, + loopCtx: ctx1, + loopCancel: cancel, + msFactory: factory, + grpcErrChan: make(chan error), }, nil } -func (s *Server) Init() error { - log.Println("query service init") - if err := s.queryService.Init(); err != nil { +func (s *Server) Run() error { + + if err := s.init(); err != nil { + return err + } + log.Println("queryservice init done ...") + + if err := s.start(); err != nil { return err } return nil } -func (s *Server) Start() error { - log.Println("start query service ...") +func (s *Server) init() error { + Params.Init() - s.grpcServer = grpc.NewServer() - querypb.RegisterQueryServiceServer(s.grpcServer, s) - log.Println("Starting start query service Server") - lis, err := net.Listen("tcp", ":"+strconv.Itoa(qs.Params.Port)) - if err != nil { + s.wg.Add(1) + go s.startGrpcLoop(Params.Port) + // wait for grpc server loop start + if err := <-s.grpcErrChan; err != nil { return err } - go func() { - if err := s.grpcServer.Serve(lis); err != nil { - s.grpcErrMux.Lock() - defer s.grpcErrMux.Unlock() - s.grpcError = err - } - }() + // --- Master Server Client --- + log.Println("Master service address:", Params.MasterAddress) + log.Println("Init master service client ...") - s.grpcErrMux.Lock() - err = s.grpcError - s.grpcErrMux.Unlock() + masterService, err := msc.NewClient(Params.MasterAddress, 20*time.Second) if err != nil { - return err + panic(err) + } + + if err = masterService.Init(); err != nil { + panic(err) + } + + if err = masterService.Start(); err != nil { + panic(err) + } + // wait for master init or healthy + err = funcutil.WaitForComponentInitOrHealthy(masterService, "MasterService", 100, time.Millisecond*200) + if err != nil { + panic(err) + } + + if err := s.SetMasterService(masterService); err != nil { + panic(err) + } + + // --- Data service client --- + log.Println("DataService Address:", Params.DataServiceAddress) + log.Println("QueryService Init data service client ...") + + dataService := dsc.NewClient(Params.DataServiceAddress) + if err = dataService.Init(); err != nil { + panic(err) + } + if err = dataService.Start(); err != nil { + panic(err) + } + err = funcutil.WaitForComponentInitOrHealthy(dataService, "DataService", 100, time.Millisecond*200) + if err != nil { + panic(err) + } + if err := s.SetDataService(dataService); err != nil { + panic(err) } - s.queryService.Start() + qs.Params.Init() + s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING) + + if err := s.impl.Init(); err != nil { + return err + } return nil } +func (s *Server) startGrpcLoop(grpcPort int) { + + defer s.wg.Done() + + log.Println("network port: ", grpcPort) + lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) + if err != nil { + log.Printf("GrpcServer:failed to listen: %v", err) + s.grpcErrChan <- err + return + } + + ctx, cancel := context.WithCancel(s.loopCtx) + defer cancel() + + s.grpcServer = grpc.NewServer() + querypb.RegisterQueryServiceServer(s.grpcServer, s) + + go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) + if err := s.grpcServer.Serve(lis); err != nil { + s.grpcErrChan <- err + } +} + +func (s *Server) start() error { + return s.impl.Start() +} + func (s *Server) Stop() error { - err := s.queryService.Stop() + err := s.impl.Stop() s.loopCancel() if s.grpcServer != nil { s.grpcServer.GracefulStop() @@ -95,7 +171,7 @@ func (s *Server) Stop() error { } func (s *Server) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*internalpb2.ComponentStates, error) { - componentStates, err := s.queryService.GetComponentStates() + componentStates, err := s.impl.GetComponentStates() if err != nil { return &internalpb2.ComponentStates{ Status: &commonpb.Status{ @@ -109,7 +185,7 @@ func (s *Server) GetComponentStates(ctx context.Context, req *commonpb.Empty) (* } func (s *Server) GetTimeTickChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) { - channel, err := s.queryService.GetTimeTickChannel() + channel, err := s.impl.GetTimeTickChannel() if err != nil { return &milvuspb.StringResponse{ Status: &commonpb.Status{ @@ -129,7 +205,7 @@ func (s *Server) GetTimeTickChannel(ctx context.Context, req *commonpb.Empty) (* } func (s *Server) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) { - statisticsChannel, err := s.queryService.GetStatisticsChannel() + statisticsChannel, err := s.impl.GetStatisticsChannel() if err != nil { return &milvuspb.StringResponse{ Status: &commonpb.Status{ @@ -149,51 +225,51 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty) } func (s *Server) SetMasterService(m qs.MasterServiceInterface) error { - s.queryService.SetMasterService(m) + s.impl.SetMasterService(m) return nil } func (s *Server) SetDataService(d qs.DataServiceInterface) error { - s.queryService.SetDataService(d) + s.impl.SetDataService(d) return nil } func (s *Server) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { - return s.queryService.RegisterNode(req) + return s.impl.RegisterNode(req) } func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) { - return s.queryService.ShowCollections(req) + return s.impl.ShowCollections(req) } func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) { - return s.queryService.LoadCollection(req) + return s.impl.LoadCollection(req) } func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { - return s.queryService.ReleaseCollection(req) + return s.impl.ReleaseCollection(req) } func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error) { - return s.queryService.ShowPartitions(req) + return s.impl.ShowPartitions(req) } func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) { - return s.queryService.GetPartitionStates(req) + return s.impl.GetPartitionStates(req) } func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionRequest) (*commonpb.Status, error) { - return s.queryService.LoadPartitions(req) + return s.impl.LoadPartitions(req) } func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionRequest) (*commonpb.Status, error) { - return s.queryService.ReleasePartitions(req) + return s.impl.ReleasePartitions(req) } func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (*querypb.CreateQueryChannelResponse, error) { - return s.queryService.CreateQueryChannel() + return s.impl.CreateQueryChannel() } func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) { - return s.queryService.GetSegmentInfo(req) + return s.impl.GetSegmentInfo(req) } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index b8a63fc309e3ca38210210da33c4994ee0c09f67..23c97ccf46a822c825d4e511cf5782072f72fc52 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -58,7 +58,7 @@ func NewNodeImpl(ctx context.Context) (*NodeImpl, error) { func (i *NodeImpl) Init() error { log.Println("AAAAAAAAAAAAAAAAA", i.serviceClient) - err := funcutil.WaitForComponentReady(i.serviceClient, "IndexService", 10, time.Second) + err := funcutil.WaitForComponentHealthy(i.serviceClient, "IndexService", 10, time.Second) log.Println("BBBBBBBBB", i.serviceClient) if err != nil { diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 8ca43e6860ee5e59287711ba000e5c2d616529f5..ff96add4679ed698a7e5f7df1392c5e57ba37489 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -93,11 +93,6 @@ type Interface interface { //segment DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) - - //get system config from master, not used currently - //GetSysConfigs(in *milvuspb.SysConfigRequest) - - //GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) } // ------------------ struct ----------------------- @@ -182,7 +177,7 @@ type Core struct { //call once initOnce sync.Once startOnce sync.Once - isInit atomic.Value + //isInit atomic.Value msFactory ms.Factory } @@ -192,17 +187,19 @@ type Core struct { func NewCore(c context.Context, factory ms.Factory) (*Core, error) { ctx, cancel := context.WithCancel(c) rand.Seed(time.Now().UnixNano()) - Params.Init() core := &Core{ ctx: ctx, cancel: cancel, msFactory: factory, } - core.stateCode.Store(internalpb2.StateCode_INITIALIZING) - core.isInit.Store(false) + core.UpdateStateCode(internalpb2.StateCode_ABNORMAL) return core, nil } +func (c *Core) UpdateStateCode(code internalpb2.StateCode) { + c.stateCode.Store(code) +} + func (c *Core) checkInit() error { if c.MetaTable == nil { return errors.Errorf("MetaTable is nil") @@ -788,7 +785,6 @@ func (c *Core) Init() error { c.ddReqQueue = make(chan reqTask, 1024) c.indexTaskQueue = make(chan *CreateIndexTask, 1024) initError = c.setMsgStreams() - c.isInit.Store(true) }) if initError == nil { log.Printf("Master service State Code = %s", internalpb2.StateCode_name[int32(internalpb2.StateCode_INITIALIZING)]) @@ -797,10 +793,6 @@ func (c *Core) Init() error { } func (c *Core) Start() error { - isInit := c.isInit.Load().(bool) - if !isInit { - return errors.Errorf("call init before start") - } if err := c.checkInit(); err != nil { return err } diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/master_service_test.go index 1e487205a69d79902c5da68986cdd9eb8b29ab06..75e0a44b578630e09ece432c7272358c9d46267a 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/master_service_test.go @@ -162,6 +162,7 @@ func TestMasterService(t *testing.T) { defer cancel() msFactory := pulsarms.NewFactory() + Params.Init() core, err := NewCore(ctx, msFactory) assert.Nil(t, err) randVal := rand.Int() diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index 0088b8f46e66c67f6d3ad87f5c8010f11803f0aa..2ac7b57f86f1c5e72f5fe3594b60b19c038e823d 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -12,9 +12,7 @@ var once sync.Once type ParamTable struct { paramtable.BaseTable - Address string - Port int - NodeID uint64 + NodeID uint64 PulsarAddress string EtcdAddress string @@ -43,8 +41,6 @@ func (p *ParamTable) Init() { panic(err) } - p.initAddress() - p.initPort() p.initNodeID() p.initPulsarAddress() @@ -65,18 +61,6 @@ func (p *ParamTable) Init() { }) } -func (p *ParamTable) initAddress() { - masterAddress, err := p.Load("master.address") - if err != nil { - panic(err) - } - p.Address = masterAddress -} - -func (p *ParamTable) initPort() { - p.Port = p.ParseInt("master.port") -} - func (p *ParamTable) initNodeID() { p.NodeID = uint64(p.ParseInt64("master.nodeID")) } diff --git a/internal/masterservice/param_table_test.go b/internal/masterservice/param_table_test.go index af09ab497770a5fe4fc99adbd01f6661a440dc1f..fb27352eae2b67b36916732dbe81366e9db6276c 100644 --- a/internal/masterservice/param_table_test.go +++ b/internal/masterservice/param_table_test.go @@ -9,12 +9,6 @@ import ( func TestParamTable(t *testing.T) { Params.Init() - assert.NotEqual(t, Params.Address, "") - t.Logf("master address = %s", Params.Address) - - assert.NotEqual(t, Params.Port, 0) - t.Logf("master port = %d", Params.Port) - assert.NotEqual(t, Params.NodeID, 0) t.Logf("master node ID = %d", Params.NodeID) diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go index 21a5996ceef9616e6ed5dbfa883d637f6914f7ef..fd05a5671dd5414300b811d38ccc61e4dae4bb43 100644 --- a/internal/proxyservice/impl.go +++ b/internal/proxyservice/impl.go @@ -131,12 +131,11 @@ func (s *ServiceImpl) Init() error { s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream) log.Println("create time tick ...") - s.stateCode = internalpb2.StateCode_HEALTHY - return nil } func (s *ServiceImpl) Start() error { + s.stateCode = internalpb2.StateCode_HEALTHY s.sched.Start() log.Println("start scheduler ...") return s.tick.Start() diff --git a/internal/proxyservice/proxyservice.go b/internal/proxyservice/proxyservice.go index 66cc49f7136d7761efe833850f2fcd01af4dbb78..4d9b03cc70774da1aaf9c0f2e734b03dcd23e387 100644 --- a/internal/proxyservice/proxyservice.go +++ b/internal/proxyservice/proxyservice.go @@ -5,8 +5,6 @@ import ( "math/rand" "time" - "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" - "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -21,8 +19,7 @@ type ServiceImpl struct { //subStates *internalpb2.ComponentStates - dataServiceClient *dataservice.Client - nodeStartParams []*commonpb.KeyValuePair + nodeStartParams []*commonpb.KeyValuePair ctx context.Context cancel context.CancelFunc diff --git a/internal/querynode/param_table.go b/internal/querynode/param_table.go index c8c28faf8b36ffcc2347c1ce22bdc3ec33205578..fa56ec2306ef3833577387659230c1f689b9064f 100644 --- a/internal/querynode/param_table.go +++ b/internal/querynode/param_table.go @@ -6,7 +6,6 @@ import ( "strconv" "sync" - "github.com/zilliztech/milvus-distributed/internal/util/funcutil" "github.com/zilliztech/milvus-distributed/internal/util/paramtable" ) @@ -17,7 +16,6 @@ type ParamTable struct { ETCDAddress string MetaRootPath string WriteNodeSegKvSubPath string - IndexBuilderAddress string QueryNodeIP string QueryNodePort int64 @@ -75,11 +73,6 @@ func (p *ParamTable) Init() { panic(err) } - err = p.LoadYaml("milvus.yaml") - if err != nil { - panic(err) - } - queryNodeIDStr := os.Getenv("QUERY_NODE_ID") if queryNodeIDStr == "" { queryNodeIDList := p.QueryNodeIDList() @@ -90,18 +83,6 @@ func (p *ParamTable) Init() { } } - queryNodeIP := os.Getenv("QUERY_NODE_IP") - if queryNodeIP == "" { - p.QueryNodeIP = "localhost" - } else { - p.QueryNodeIP = queryNodeIP - } - p.QueryNodePort = int64(funcutil.GetAvailablePort()) - - err = p.LoadYaml("advanced/common.yaml") - if err != nil { - panic(err) - } err = p.Save("_queryNodeID", queryNodeIDStr) if err != nil { panic(err) @@ -122,7 +103,6 @@ func (p *ParamTable) Init() { p.initETCDAddress() p.initMetaRootPath() p.initWriteNodeSegKvSubPath() - p.initIndexBuilderAddress() p.initGracefulTime() p.initMsgChannelSubName() @@ -234,14 +214,6 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = url } -func (p *ParamTable) initIndexBuilderAddress() { - ret, err := p.Load("_IndexBuilderAddress") - if err != nil { - panic(err) - } - p.IndexBuilderAddress = ret -} - func (p *ParamTable) initInsertChannelRange() { insertChannelRange, err := p.Load("msgChannel.channelRange.insert") if err != nil { diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 68ef6c2e128e9620df4353bc9c1d1c2173d9ba08..14f35bff27d5c0c40621a2588d7a7b63d0d9958e 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -48,7 +48,7 @@ type QueryNode struct { queryNodeLoopCtx context.Context queryNodeLoopCancel context.CancelFunc - QueryNodeID uint64 + QueryNodeID UniqueID stateCode atomic.Value replica collectionReplica @@ -72,7 +72,7 @@ type QueryNode struct { msFactory msgstream.Factory } -func NewQueryNode(ctx context.Context, queryNodeID uint64, factory msgstream.Factory) *QueryNode { +func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.Factory) *QueryNode { ctx1, cancel := context.WithCancel(ctx) node := &QueryNode{ queryNodeLoopCtx: ctx1, @@ -88,7 +88,7 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, factory msgstream.Fac } node.replica = newCollectionReplicaImpl() - node.stateCode.Store(internalpb2.StateCode_INITIALIZING) + node.UpdateStateCode(internalpb2.StateCode_ABNORMAL) return node } @@ -107,13 +107,9 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer } node.replica = newCollectionReplicaImpl() - node.stateCode.Store(internalpb2.StateCode_INITIALIZING) - return node -} + node.UpdateStateCode(internalpb2.StateCode_ABNORMAL) -// TODO: delete this and call node.Init() -func Init() { - Params.Init() + return node } func (node *QueryNode) Init() error { @@ -193,14 +189,12 @@ func (node *QueryNode) Start() error { //go node.metaService.start() go node.loadService.start() go node.statsService.start() - - node.stateCode.Store(internalpb2.StateCode_HEALTHY) - <-node.queryNodeLoopCtx.Done() + node.UpdateStateCode(internalpb2.StateCode_HEALTHY) return nil } func (node *QueryNode) Stop() error { - node.stateCode.Store(internalpb2.StateCode_ABNORMAL) + node.UpdateStateCode(internalpb2.StateCode_ABNORMAL) node.queryNodeLoopCancel() // free collectionReplica @@ -225,6 +219,10 @@ func (node *QueryNode) Stop() error { return nil } +func (node *QueryNode) UpdateStateCode(code internalpb2.StateCode) { + node.stateCode.Store(code) +} + func (node *QueryNode) SetMasterService(master MasterServiceInterface) error { if master == nil { return errors.New("null master service interface") diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index bb9655f76185249a7b908eb2def6df7fcf8abde2..6f830a6d599776448d4f059b0bffd40249cd92ec 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -26,12 +26,15 @@ const defaultPartitionID = UniqueID(2021) type queryServiceMock struct{} func setup() { + os.Setenv("QUERY_NODE_ID", "1") Params.Init() + //Params.QueryNodeID = 1 Params.initQueryTimeTickChannelName() Params.initSearchResultChannelNames() Params.initStatsChannelName() Params.initSearchChannelNames() Params.MetaRootPath = "/etcd/test/root/querynode" + } func genTestCollectionMeta(collectionID UniqueID, isBinary bool) *etcdpb.CollectionInfo { @@ -160,7 +163,7 @@ func newQueryNodeMock() *QueryNode { } msFactory := pulsarms.NewFactory() - svr := NewQueryNode(ctx, 0, msFactory) + svr := NewQueryNode(ctx, Params.QueryNodeID, msFactory) err := svr.SetQueryService(&queryServiceMock{}) if err != nil { panic(err) @@ -208,5 +211,6 @@ func TestMain(m *testing.M) { func TestQueryNode_Start(t *testing.T) { localNode := newQueryNodeMock() localNode.Start() + <-localNode.queryNodeLoopCtx.Done() localNode.Stop() } diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go index e2144a421e6a0a891178123eea590f4b3a6722ba..392da166e23acd89d0809d1a181e87fe7dcb9c3d 100644 --- a/internal/queryservice/param_table.go +++ b/internal/queryservice/param_table.go @@ -13,7 +13,6 @@ type ParamTable struct { paramtable.BaseTable Address string - Port int QueryServiceID UniqueID // stats @@ -42,7 +41,6 @@ func (p *ParamTable) Init() { p.initStatsChannelName() p.initTimeTickChannelName() p.initQueryServiceAddress() - p.initPort() }) } @@ -70,7 +68,3 @@ func (p *ParamTable) initQueryServiceAddress() { } p.Address = url } - -func (p *ParamTable) initPort() { - p.Port = p.ParseInt("queryService.port") -} diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 9396ba8ecc104bcbe535c7451b224c9e76d9b7cd..76dfa6ca7bbc60d9a7cd56f361f3c08731bd426c 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -67,33 +67,24 @@ type QueryService struct { } func (qs *QueryService) Init() error { - Params.Init() - qs.isInit.Store(true) return nil } func (qs *QueryService) Start() error { - isInit := qs.isInit.Load().(bool) - - switch { - case !isInit: - return errors.New("call start before init") - case qs.dataServiceClient == nil: - return errors.New("dataService Client not set") - case qs.masterServiceClient == nil: - return errors.New("masterService Client not set") - } - - qs.stateCode.Store(internalpb2.StateCode_HEALTHY) + qs.UpdateStateCode(internalpb2.StateCode_HEALTHY) return nil } func (qs *QueryService) Stop() error { qs.loopCancel() - qs.stateCode.Store(internalpb2.StateCode_ABNORMAL) + qs.UpdateStateCode(internalpb2.StateCode_ABNORMAL) return nil } +func (qs *QueryService) UpdateStateCode(code internalpb2.StateCode) { + qs.stateCode.Store(code) +} + func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) { serviceComponentInfo := &internalpb2.ComponentInfo{ NodeID: Params.QueryServiceID, @@ -624,8 +615,7 @@ func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryServ qcMutex: &sync.Mutex{}, msFactory: factory, } - service.stateCode.Store(internalpb2.StateCode_INITIALIZING) - service.isInit.Store(false) + service.UpdateStateCode(internalpb2.StateCode_ABNORMAL) return service, nil } diff --git a/internal/util/funcutil/func.go b/internal/util/funcutil/func.go index 9db5c7e74f42cb259678eacbf7bb975e1e95bd1a..20b2f73c233adf19a1c62c8df47a50579714a8b3 100644 --- a/internal/util/funcutil/func.go +++ b/internal/util/funcutil/func.go @@ -37,7 +37,7 @@ func GetLocalIP() string { return ipv4.LocalIP() } -func WaitForComponentReady(service StateComponent, serviceName string, attempts int, sleep time.Duration) error { +func WaitForComponentStates(service StateComponent, serviceName string, states []internalpb2.StateCode, attempts int, sleep time.Duration) error { checkFunc := func() error { resp, err := service.GetComponentStates() if err != nil { @@ -48,18 +48,32 @@ func WaitForComponentReady(service StateComponent, serviceName string, attempts return errors.New(resp.Status.Reason) } - if resp.State.StateCode != internalpb2.StateCode_HEALTHY { - return errors.New("") + meet := false + for _, state := range states { + if resp.State.StateCode == state { + meet = true + break + } + } + if !meet { + msg := fmt.Sprintf("WaitForComponentStates, not meet, %s current state:%d", serviceName, resp.State.StateCode) + return errors.New(msg) } - return nil } - err := retry.Retry(attempts, sleep, checkFunc) - if err != nil { - errMsg := fmt.Sprintf("ProxyNode wait for %s ready failed", serviceName) - return errors.New(errMsg) - } - return nil + return retry.Retry(attempts, sleep, checkFunc) +} + +func WaitForComponentInitOrHealthy(service StateComponent, serviceName string, attempts int, sleep time.Duration) error { + return WaitForComponentStates(service, serviceName, []internalpb2.StateCode{internalpb2.StateCode_INITIALIZING, internalpb2.StateCode_HEALTHY}, attempts, sleep) +} + +func WaitForComponentInit(service StateComponent, serviceName string, attempts int, sleep time.Duration) error { + return WaitForComponentStates(service, serviceName, []internalpb2.StateCode{internalpb2.StateCode_INITIALIZING}, attempts, sleep) +} + +func WaitForComponentHealthy(service StateComponent, serviceName string, attempts int, sleep time.Duration) error { + return WaitForComponentStates(service, serviceName, []internalpb2.StateCode{internalpb2.StateCode_HEALTHY}, attempts, sleep) } func ParseIndexParamsMap(mStr string) (map[string]string, error) { diff --git a/internal/util/paramtable/paramtable.go b/internal/util/paramtable/paramtable.go index 64a201ca254376825ce99e5d1f6dc680bb519eba..6fa1689a95af62b12fb3d5132c1827e134229e8c 100644 --- a/internal/util/paramtable/paramtable.go +++ b/internal/util/paramtable/paramtable.go @@ -145,6 +145,23 @@ func (gp *BaseTable) tryloadFromEnv() { panic(err) } + proxyServiceAddress := os.Getenv("PROXY_SERVICE_ADDRESS") + if proxyServiceAddress == "" { + addr, err := gp.Load("proxyService.address") + if err != nil { + panic(err) + } + proxyServicePort, err := gp.Load("proxyService.port") + if err != nil { + panic(err) + } + proxyServiceAddress = addr + ":" + proxyServicePort + } + err = gp.Save("_PROXY_SERVICE_ADDRESS", proxyServiceAddress) + if err != nil { + panic(err) + } + indexBuilderAddress := os.Getenv("INDEX_SERVICE_ADDRESS") if indexBuilderAddress == "" { indexBuilderHost, err := gp.Load("indexBuilder.address") diff --git a/scripts/build_docker.sh b/scripts/build_docker.sh deleted file mode 100755 index c3abc2619f2a160bc866281b54c507881be34e56..0000000000000000000000000000000000000000 --- a/scripts/build_docker.sh +++ /dev/null @@ -1,11 +0,0 @@ -cd ../build/docker/deploy/ - -docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 master -docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 proxyservice -docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 proxynode -docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 indexservice -docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 indexnode -docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 queryservice -docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 dataservice -docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 querynode -docker-compose build --build-arg https_proxy=http://wakanda:Fantast1c@192.168.2.28:3339 datanode diff --git a/scripts/start.sh b/scripts/start.sh index b3d2b27086400830cd4ebbc8ea51675d13b4fc86..1bc2c1d432226bbc6d773faca018f8d590eb0feb 100755 --- a/scripts/start.sh +++ b/scripts/start.sh @@ -3,6 +3,12 @@ cd .. echo "starting master" nohup ./bin/masterservice > ~/masterservice.out 2>&1 & +echo "starting dataservice" +nohup ./bin/dataservice > ~/dataservice.out 2>&1 & + +echo "starting datanode" +nohup ./bin/datanode > ~/datanode.out 2>&1 & + echo "starting proxyservice" nohup ./bin/proxyservice > ~/proxyservice.out 2>&1 & @@ -20,12 +26,6 @@ echo "starting querynode2" export QUERY_NODE_ID=2 nohup ./bin/querynode > ~/querynode2.out 2>&1 & -echo "starting dataservice" -nohup ./bin/dataservice > ~/dataservice.out 2>&1 & - -echo "starting datanode" -nohup ./bin/datanode > ~/datanode.out 2>&1 & - echo "starting indexservice" nohup ./bin/indexservice > ~/indexservice.out 2>&1 &