diff --git a/cmd/dataservice/main.go b/cmd/dataservice/main.go
index 095bf92ac88c5db2d318c533231bededc7424188..d7d3178361a208dc8b9fab973c2cb28aa203fe96 100644
--- a/cmd/dataservice/main.go
+++ b/cmd/dataservice/main.go
@@ -2,72 +2,25 @@ package main
 
 import (
 	"context"
-	"fmt"
 	"log"
 	"os"
 	"os/signal"
 	"syscall"
-	"time"
 
-	ms "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
-
-	"github.com/zilliztech/milvus-distributed/internal/masterservice"
-
-	"github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
-
-	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+	"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
 )
 
 func main() {
 	ctx, cancel := context.WithCancel(context.Background())
 
-	service := dataservice.NewGrpcService(ctx)
-
-	masterservice.Params.Init()
-	client, err := ms.NewGrpcClient(fmt.Sprintf("%s:%d", masterservice.Params.Address, masterservice.Params.Port), 30*time.Second)
+	svr, err := components.NewDataService(ctx)
 	if err != nil {
 		panic(err)
 	}
-	log.Println("master client create complete")
-	if err = client.Init(); err != nil {
-		panic(err)
-	}
-	if err = client.Start(); err != nil {
+	if err = svr.Run(); err != nil {
 		panic(err)
 	}
-	service.SetMasterClient(client)
-	ticker := time.NewTicker(500 * time.Millisecond)
-	tctx, tcancel := context.WithTimeout(ctx, 30*time.Second)
-	defer func() {
-		if err = client.Stop(); err != nil {
-			panic(err)
-		}
-		ticker.Stop()
-		tcancel()
-	}()
-	for {
-		var states *internalpb2.ComponentStates
-		select {
-		case <-ticker.C:
-			states, err = client.GetComponentStates()
-			if err != nil {
-				continue
-			}
-		case <-tctx.Done():
-			panic("master timeout")
-		}
-		if states.State.StateCode == internalpb2.StateCode_INITIALIZING || states.State.StateCode == internalpb2.StateCode_HEALTHY {
-			break
-		}
-	}
-
-	if err = service.Init(); err != nil {
-		panic(err)
-	}
-	if err = service.Start(); err != nil {
-		panic(err)
-	}
 
 	sc := make(chan os.Signal)
 	signal.Notify(sc,
 		syscall.SIGHUP,
@@ -76,7 +29,7 @@ func main() {
 		syscall.SIGQUIT)
 	<-sc
 	cancel()
-	if err = service.Stop(); err != nil {
+	if err := svr.Stop(); err != nil {
 		panic(err)
 	}
 	log.Println("shut down data service")
diff --git a/cmd/distributed/components/data_service.go b/cmd/distributed/components/data_service.go
new file mode 100644
index 0000000000000000000000000000000000000000..c1b2ee4f205c0640938563517a55e19bf17d6f16
--- /dev/null
+++ b/cmd/distributed/components/data_service.go
@@ -0,0 +1,83 @@
+package components
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log"
+	"time"
+
+	ms "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
+	"github.com/zilliztech/milvus-distributed/internal/masterservice"
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+
+	"github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
+)
+
+type DataService struct {
+	ctx          context.Context
+	server       *dataservice.Service
+	masterClient *ms.GrpcClient
+}
+
+func NewDataService(ctx context.Context) (*DataService, error) {
+	service := dataservice.NewGrpcService(ctx)
+
+	masterservice.Params.Init()
+	client, err := ms.NewGrpcClient(fmt.Sprintf("%s:%d", masterservice.Params.Address, masterservice.Params.Port), 30*time.Second)
+	if err != nil {
+		return nil, err
+	}
+	log.Println("master client create complete")
+	if err = client.Init(); err != nil {
+		return nil, err
+	}
+	if err = client.Start(); err != nil {
+		return nil, err
+	}
+	ticker := time.NewTicker(500 * time.Millisecond)
+	tctx, tcancel := context.WithTimeout(ctx, 30*time.Second)
+	defer func() {
+		ticker.Stop()
+		tcancel()
+	}()
+
+	for {
+		var states *internalpb2.ComponentStates
+		select {
+		case <-ticker.C:
+			states, err = client.GetComponentStates()
+			if err != nil {
+				continue
+			}
+		case <-tctx.Done():
+			return nil, errors.New("master client connect timeout")
+		}
+		if states.State.StateCode == internalpb2.StateCode_INITIALIZING || states.State.StateCode == internalpb2.StateCode_HEALTHY {
+			break
+		}
+	}
+	service.SetMasterClient(client)
+
+	return &DataService{
+		ctx:          ctx,
+		server:       service,
+		masterClient: client,
+	}, nil
+}
+
+func (s *DataService) Run() error {
+	if err := s.server.Init(); err != nil {
+		return err
+	}
+	if err := s.server.Start(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (s *DataService) Stop() error {
+	_ = s.masterClient.Stop()
+	_ = s.server.Stop()
+	return nil
+}
diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go
index 330291a4fd11485f234a3653b473fc746d1d842f..d913abf9136a70e85403677dfc678ff77a9bb74a 100644
--- a/internal/distributed/dataservice/grpc_service.go
+++ b/internal/distributed/dataservice/grpc_service.go
@@ -7,8 +7,6 @@ import (
 	"net"
 	"time"
 
-	"github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
-
 	"google.golang.org/grpc"
 
 	"github.com/zilliztech/milvus-distributed/internal/dataservice"
@@ -21,10 +19,9 @@ import (
 )
 
 type Service struct {
-	server       *dataservice.Server
-	ctx          context.Context
-	grpcServer   *grpc.Server
-	masterClient *masterservice.GrpcClient
+	server     *dataservice.Server
+	ctx        context.Context
+	grpcServer *grpc.Server
 }
 
 func NewGrpcService(ctx context.Context) *Service {
@@ -49,18 +46,17 @@ func (s *Service) Init() error {
 	datapb.RegisterDataServiceServer(s.grpcServer, s)
 	lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port))
 	if err != nil {
-		log.Fatal(err.Error())
 		return nil
 	}
 	c := make(chan struct{})
 	go func() {
 		if err2 := s.grpcServer.Serve(lis); err2 != nil {
-			log.Println(err.Error())
 			close(c)
 			err = err2
 		}
 	}()
 	timer := time.NewTimer(1 * time.Second)
+	defer timer.Stop()
 	select {
 	case <-timer.C:
 		break
diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go
index 5919b894635ed14f26f848b667411c55c639eae6..373f00c0b4dfde335a6464060ea3f4cb78bb8f46 100644
--- a/internal/masterservice/task.go
+++ b/internal/masterservice/task.go
@@ -414,10 +414,13 @@ func (t *ShowPartitionReqTask) IgnoreTimeStamp() bool {
 }
 
 func (t *ShowPartitionReqTask) Execute() error {
-	coll, err := t.core.MetaTable.GetCollectionByName(t.Req.CollectionName)
+	coll, err := t.core.MetaTable.GetCollectionByID(t.Req.CollectionID)
 	if err != nil {
 		return err
 	}
+	if coll.Schema.Name != t.Req.CollectionName {
+		return errors.Errorf("collection %s not exist", t.Req.CollectionName)
+	}
 	for _, partID := range coll.PartitionIDs {
 		partMeta, err := t.core.MetaTable.GetPartitionByID(partID)
 		if err != nil {