diff --git a/internal/master/client.go b/internal/master/client.go
new file mode 100644
index 0000000000000000000000000000000000000000..6fc9138dec5c228ad64a6cd21f1c2560aa0de3c5
--- /dev/null
+++ b/internal/master/client.go
@@ -0,0 +1,101 @@
+package master
+
+import (
+	"time"
+
+	buildindexclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client"
+	"github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb"
+	writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client"
+)
+
+type WriteNodeClient interface {
+	FlushSegment(segmentID UniqueID) error
+	DescribeSegment(segmentID UniqueID) (*writerclient.SegmentDescription, error)
+	GetInsertBinlogPaths(segmentID UniqueID) (map[UniqueID][]string, error)
+}
+
+type MockWriteNodeClient struct {
+	segmentID UniqueID
+	flushTime time.Time
+}
+
+func (m *MockWriteNodeClient) FlushSegment(segmentID UniqueID) error {
+	m.flushTime = time.Now()
+	m.segmentID = segmentID
+	return nil
+}
+
+func (m *MockWriteNodeClient) DescribeSegment(segmentID UniqueID) (*writerclient.SegmentDescription, error) {
+	now := time.Now()
+	if now.Sub(m.flushTime).Seconds() > 3 {
+		return &writerclient.SegmentDescription{
+			SegmentID: segmentID,
+			IsClosed:  true,
+			OpenTime:  0,
+			CloseTime: 1,
+		}, nil
+	}
+	return &writerclient.SegmentDescription{
+		SegmentID: segmentID,
+		IsClosed:  false,
+		OpenTime:  0,
+		CloseTime: 1,
+	}, nil
+}
+
+func (m *MockWriteNodeClient) GetInsertBinlogPaths(segmentID UniqueID) (map[UniqueID][]string, error) {
+	return map[UniqueID][]string{
+		1:   {"/binlog/insert/file_1"},
+		100: {"/binlog/insert/file_100"},
+	}, nil
+}
+
+type BuildIndexClient interface {
+	BuildIndexWithoutID(columnDataPaths []string, typeParams map[string]string, indexParams map[string]string) (UniqueID, error)
+	DescribeIndex(indexID UniqueID) (*buildindexclient.IndexDescription, error)
+	GetIndexFilePaths(indexID UniqueID) ([]string, error)
+}
+
+type MockBuildIndexClient struct {
+	buildTime time.Time
+}
+
+func (m *MockBuildIndexClient) BuildIndexWithoutID(columnDataPaths []string, typeParams map[string]string, indexParams map[string]string) (UniqueID, error) {
+	m.buildTime = time.Now()
+	return 1, nil
+}
+
+func (m *MockBuildIndexClient) DescribeIndex(indexID UniqueID) (*buildindexclient.IndexDescription, error) {
+	now := time.Now()
+	if now.Sub(m.buildTime).Seconds() > 3 {
+		return &buildindexclient.IndexDescription{
+			ID:                1,
+			Status:            indexbuilderpb.IndexStatus_FINISHED,
+			EnqueueTime:       time.Now(),
+			ScheduleTime:      time.Now(),
+			BuildCompleteTime: time.Now(),
+		}, nil
+	}
+	return &buildindexclient.IndexDescription{
+		ID:                1,
+		Status:            indexbuilderpb.IndexStatus_INPROGRESS,
+		EnqueueTime:       time.Now(),
+		ScheduleTime:      time.Now(),
+		BuildCompleteTime: time.Now(),
+	}, nil
+}
+
+func (m *MockBuildIndexClient) GetIndexFilePaths(indexID UniqueID) ([]string, error) {
+	return []string{"/binlog/index/file_1", "/binlog/index/file_2", "/binlog/index/file_3"}, nil
+}
+
+type LoadIndexClient interface {
+	LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string) error
+}
+
+type MockLoadIndexClient struct {
+}
+
+func (m *MockLoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string) error {
+	return nil
+}
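The three client interfaces above are the seams that let the schedulers run against mocks now and against the real write-node and index-builder clients later. A small optional sketch, not part of this change, of compile-time checks that would catch drift between the mocks and their interfaces:

var _ WriteNodeClient = (*MockWriteNodeClient)(nil)
var _ BuildIndexClient = (*MockBuildIndexClient)(nil)
var _ LoadIndexClient = (*MockLoadIndexClient)(nil)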
diff --git a/internal/master/flush_scheduler.go b/internal/master/flush_scheduler.go
new file mode 100644
index 0000000000000000000000000000000000000000..f4aba60d85ba978500bc91bb1a30f62031627c6d
--- /dev/null
+++ b/internal/master/flush_scheduler.go
@@ -0,0 +1,119 @@
+package master
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"github.com/zilliztech/milvus-distributed/internal/errors"
+)
+
+type FlushScheduler struct {
+	client              WriteNodeClient
+	metaTable           *metaTable
+	segmentFlushChan    chan UniqueID
+	segmentDescribeChan chan UniqueID
+	indexBuilderSch     persistenceScheduler
+
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func NewFlushScheduler(ctx context.Context, client WriteNodeClient, metaTable *metaTable, buildScheduler *IndexBuildScheduler) *FlushScheduler {
+	ctx2, cancel := context.WithCancel(ctx)
+
+	return &FlushScheduler{
+		client:              client,
+		metaTable:           metaTable,
+		indexBuilderSch:     buildScheduler,
+		segmentFlushChan:    make(chan UniqueID, 100),
+		segmentDescribeChan: make(chan UniqueID, 100),
+		ctx:                 ctx2,
+		cancel:              cancel,
+	}
+}
+
+func (scheduler *FlushScheduler) schedule(id interface{}) error {
+	segmentID := id.(UniqueID)
+	err := scheduler.client.FlushSegment(segmentID)
+	log.Printf("flush segment %d", segmentID)
+	if err != nil {
+		return err
+	}
+
+	scheduler.segmentDescribeChan <- segmentID
+
+	return nil
+}
+func (scheduler *FlushScheduler) describe() error {
+	for {
+		select {
+		case <-scheduler.ctx.Done():
+			{
+				log.Printf("flush scheduler context done, exit")
+				return errors.New("flush scheduler context done")
+			}
+		case singleSegmentID := <-scheduler.segmentDescribeChan:
+			for {
+				description, err := scheduler.client.DescribeSegment(singleSegmentID)
+				if err != nil {
+					return err
+				}
+				if description.IsClosed {
+					log.Printf("flush segment %d is closed", singleSegmentID)
+					mapData, err := scheduler.client.GetInsertBinlogPaths(singleSegmentID)
+					if err != nil {
+						return err
+					}
+					for fieldID, data := range mapData {
+						info := &IndexBuildInfo{
+							segmentID:      singleSegmentID,
+							fieldID:        fieldID,
+							binlogFilePath: data,
+						}
+						err = scheduler.indexBuilderSch.Enqueue(info)
+						log.Printf("segment %d field %d enqueued to index build scheduler", singleSegmentID, fieldID)
+						if err != nil {
+							return err
+						}
+					}
+					//TODO: Save data to meta table
+					log.Printf("flush segment %d finished", singleSegmentID)
+					break
+				}
+				time.Sleep(1 * time.Second)
+			}
+		}
+	}
+
+}
+
+func (scheduler *FlushScheduler) scheduleLoop() {
+	for {
+		select {
+		case id := <-scheduler.segmentFlushChan:
+			err := scheduler.schedule(id)
+			if err != nil {
+				log.Println(err)
+			}
+		case <-scheduler.ctx.Done():
+			log.Print("server is closed, exit flush scheduler loop")
+			return
+		}
+	}
+}
+
+func (scheduler *FlushScheduler) Enqueue(id interface{}) error {
+	scheduler.segmentFlushChan <- id.(UniqueID)
+	return nil
+}
+
+func (scheduler *FlushScheduler) Start() error {
+	go scheduler.scheduleLoop()
+	go scheduler.describe()
+	return nil
+}
+
+func (scheduler *FlushScheduler) Close() {
+	scheduler.cancel()
+}
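FlushScheduler (and IndexBuildScheduler below) hold their downstream stage as a persistenceScheduler, an interface this diff does not define; presumably it is declared elsewhere in package master. For these files to compile it needs at least Enqueue; a minimal sketch, with Start/Close added as assumptions mirroring how master.go drives the concrete schedulers:

// Hypothetical sketch of the interface assumed by this diff.
type persistenceScheduler interface {
	Enqueue(interface{}) error
	Start() error
	Close()
}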
diff --git a/internal/master/index_builder_scheduler.go b/internal/master/index_builder_scheduler.go
new file mode 100644
index 0000000000000000000000000000000000000000..1600d739927d43543931729603646040c44cf467
--- /dev/null
+++ b/internal/master/index_builder_scheduler.go
@@ -0,0 +1,147 @@
+package master
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"github.com/zilliztech/milvus-distributed/internal/errors"
+	"github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb"
+)
+
+type IndexBuildInfo struct {
+	segmentID      UniqueID
+	fieldID        UniqueID
+	binlogFilePath []string
+}
+type IndexBuildChannelInfo struct {
+	id   UniqueID
+	info *IndexBuildInfo
+}
+
+type IndexBuildScheduler struct {
+	client          BuildIndexClient
+	metaTable       *metaTable
+	indexBuildChan  chan *IndexBuildInfo
+	indexLoadSch    persistenceScheduler
+	indexDescribeID chan UniqueID
+	indexDescribe   chan *IndexBuildChannelInfo
+
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func NewIndexBuildScheduler(ctx context.Context, client BuildIndexClient, metaTable *metaTable, indexLoadScheduler *IndexLoadScheduler) *IndexBuildScheduler {
+	ctx2, cancel := context.WithCancel(ctx)
+
+	return &IndexBuildScheduler{
+		client:         client,
+		metaTable:      metaTable,
+		indexLoadSch:   indexLoadScheduler,
+		indexBuildChan: make(chan *IndexBuildInfo, 100),
+		indexDescribe:  make(chan *IndexBuildChannelInfo, 100),
+		ctx:            ctx2,
+		cancel:         cancel,
+	}
+}
+
+func (scheduler *IndexBuildScheduler) schedule(info interface{}) error {
+	indexBuildInfo := info.(*IndexBuildInfo)
+	indexID, err := scheduler.client.BuildIndexWithoutID(indexBuildInfo.binlogFilePath, nil, nil)
+	log.Printf("build index for segment %d field %d", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
+	if err != nil {
+		return err
+	}
+	scheduler.indexDescribe <- &IndexBuildChannelInfo{
+		id:   indexID,
+		info: indexBuildInfo,
+	}
+	return nil
+}
+
+func (scheduler *IndexBuildScheduler) describe() error {
+	for {
+		select {
+		case <-scheduler.ctx.Done():
+			{
+				log.Printf("index build scheduler context done, exit")
+				return errors.New("index build scheduler context done")
+			}
+		case channelInfo := <-scheduler.indexDescribe:
+			indexID := channelInfo.id
+			indexBuildInfo := channelInfo.info
+			for {
+				description, err := scheduler.client.DescribeIndex(channelInfo.id)
+				if err != nil {
+					return err
+				}
+				if description.Status == indexbuilderpb.IndexStatus_FINISHED {
+					log.Printf("build index for segment %d field %d is finished", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
+					filePaths, err := scheduler.client.GetIndexFilePaths(indexID)
+					if err != nil {
+						return err
+					}
+
+					//TODO: remove fieldName
+					var fieldName string
+					segMeta := scheduler.metaTable.segID2Meta[indexBuildInfo.segmentID]
+					collMeta := scheduler.metaTable.collID2Meta[segMeta.CollectionID]
+					if collMeta.Schema != nil {
+						for _, field := range collMeta.Schema.Fields {
+							if field.FieldID == indexBuildInfo.fieldID {
+								fieldName = field.Name
+							}
+						}
+					}
+
+					info := &IndexLoadInfo{
+						segmentID:      indexBuildInfo.segmentID,
+						fieldID:        indexBuildInfo.fieldID,
+						fieldName:      fieldName,
+						indexFilePaths: filePaths,
+					}
+					//TODO: Save data to meta table
+
+					err = scheduler.indexLoadSch.Enqueue(info)
+					log.Printf("build index for segment %d field %d enqueued to index load scheduler", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
+					if err != nil {
+						return err
+					}
+					log.Printf("build index for segment %d field %d finished", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
+					// Stop polling this index once it has finished; without this
+					// break the loop would re-enqueue the load request every second.
+					break
+				}
+				time.Sleep(1 * time.Second)
+			}
+		}
+	}
+}
+
+func (scheduler *IndexBuildScheduler) scheduleLoop() {
+	for {
+		select {
+		case info := <-scheduler.indexBuildChan:
+			err := scheduler.schedule(info)
+			if err != nil {
+				log.Println(err)
+			}
+		case <-scheduler.ctx.Done():
+			log.Print("server is closed, exit index build loop")
+			return
+		}
+	}
+}
+
+func (scheduler *IndexBuildScheduler) Enqueue(info interface{}) error {
+	scheduler.indexBuildChan <- info.(*IndexBuildInfo)
+	return nil
+}
+
+func (scheduler *IndexBuildScheduler) Start() error {
+	go scheduler.scheduleLoop()
+	go scheduler.describe()
+	return nil
+}
+
+func (scheduler *IndexBuildScheduler) Close() {
+	scheduler.cancel()
+}
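Both describe() loops above follow the same pattern: poll the client once per second until a terminal state, bailing out on error or context cancellation. A hypothetical helper, not in this diff and with illustrative names, that could factor the pattern out (it assumes the surrounding file's context and time imports):

// pollUntil calls check every interval until check reports done, check
// returns an error, or ctx is cancelled.
func pollUntil(ctx context.Context, interval time.Duration, check func() (bool, error)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		done, err := check()
		if err != nil || done {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}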
diff --git a/internal/master/index_load_scheduler.go b/internal/master/index_load_scheduler.go
new file mode 100644
index 0000000000000000000000000000000000000000..3d23cfcc8e41d3ff0113f66ddedc4feafbb0d21f
--- /dev/null
+++ b/internal/master/index_load_scheduler.go
@@ -0,0 +1,74 @@
+package master
+
+import (
+	"context"
+	"log"
+)
+
+type IndexLoadInfo struct {
+	segmentID      UniqueID
+	fieldID        UniqueID
+	fieldName      string
+	indexFilePaths []string
+}
+
+type IndexLoadScheduler struct {
+	indexLoadChan chan *IndexLoadInfo
+	client        LoadIndexClient
+	metaTable     *metaTable
+
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+func NewIndexLoadScheduler(ctx context.Context, client LoadIndexClient, metaTable *metaTable) *IndexLoadScheduler {
+	ctx2, cancel := context.WithCancel(ctx)
+	indexLoadChan := make(chan *IndexLoadInfo, 100)
+
+	return &IndexLoadScheduler{
+		client:        client,
+		metaTable:     metaTable,
+		indexLoadChan: indexLoadChan,
+		ctx:           ctx2,
+		cancel:        cancel,
+	}
+}
+
+func (scheduler *IndexLoadScheduler) schedule(info interface{}) error {
+	indexLoadInfo := info.(*IndexLoadInfo)
+	err := scheduler.client.LoadIndex(indexLoadInfo.indexFilePaths, indexLoadInfo.segmentID, indexLoadInfo.fieldID, indexLoadInfo.fieldName)
+	//TODO: Save data to meta table
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+func (scheduler *IndexLoadScheduler) scheduleLoop() {
+	for {
+		select {
+		case info := <-scheduler.indexLoadChan:
+			err := scheduler.schedule(info)
+			if err != nil {
+				log.Println(err)
+			}
+		case <-scheduler.ctx.Done():
+			log.Print("server is closed, exit index load scheduler loop")
+			return
+		}
+	}
+}
+
+func (scheduler *IndexLoadScheduler) Enqueue(info interface{}) error {
+	scheduler.indexLoadChan <- info.(*IndexLoadInfo)
+	return nil
+}
+
+func (scheduler *IndexLoadScheduler) Start() error {
+	go scheduler.scheduleLoop()
+	return nil
+}
+
+func (scheduler *IndexLoadScheduler) Close() {
+	scheduler.cancel()
+}
diff --git a/internal/master/master.go b/internal/master/master.go
index 36b2b98c86b4c3d48fca7644822531a4636e119f..bc1111944d2d3abbade74647a617da7abdee9c06 100644
--- a/internal/master/master.go
+++ b/internal/master/master.go
@@ -44,6 +44,9 @@ type Master struct {
 	kvBase *etcdkv.EtcdKV
 
 	scheduler *ddRequestScheduler
+	flushSch      *FlushScheduler
+	indexBuildSch *IndexBuildScheduler
+	indexLoadSch  *IndexLoadScheduler
 	metaTable *metaTable
 
 	timesSyncMsgProducer *timeSyncMsgProducer
@@ -176,15 +179,24 @@ func CreateServer(ctx context.Context) (*Master, error) {
 	m.scheduler.SetDDMsgStream(pulsarDDStream)
 	m.scheduler.SetIDAllocator(func() (UniqueID, error) { return m.idAllocator.AllocOne() })
 
+	flushClient := &MockWriteNodeClient{}
+	buildIndexClient := &MockBuildIndexClient{}
+	loadIndexClient := &MockLoadIndexClient{}
+
+	m.indexLoadSch = NewIndexLoadScheduler(ctx, loadIndexClient, m.metaTable)
+	m.indexBuildSch = NewIndexBuildScheduler(ctx, buildIndexClient, m.metaTable, m.indexLoadSch)
+	m.flushSch = NewFlushScheduler(ctx, flushClient, m.metaTable, m.indexBuildSch)
+
 	m.segmentAssigner = NewSegmentAssigner(ctx, metakv,
 		func() (Timestamp, error) { return m.tsoAllocator.AllocOne() },
 		proxyTtBarrierWatcher,
 	)
+
 	m.segmentManager, err = NewSegmentManager(ctx, metakv,
 		func() (UniqueID, error) { return m.idAllocator.AllocOne() },
 		func() (Timestamp, error) { return m.tsoAllocator.AllocOne() },
 		writeNodeTtBarrierWatcher,
-		&MockFlushScheduler{}, // todo replace mock with real flush scheduler
+		m.flushSch,
 		m.segmentAssigner)
 
 	if err != nil {
@@ -283,6 +295,18 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
 	if err := s.scheduler.Start(); err != nil {
 		return err
 	}
+	s.serverLoopWg.Add(1)
+	if err := s.indexLoadSch.Start(); err != nil {
+		return err
+	}
+	s.serverLoopWg.Add(1)
+	if err := s.indexBuildSch.Start(); err != nil {
+		return err
+	}
+	s.serverLoopWg.Add(1)
+	if err := s.flushSch.Start(); err != nil {
+		return err
+	}
 
 	s.serverLoopWg.Add(1)
 	go s.grpcLoop(grpcPort)
@@ -305,6 +329,12 @@ func (s *Master) stopServerLoop() {
 	s.serverLoopWg.Done()
 	s.scheduler.Close()
 	s.serverLoopWg.Done()
+	s.flushSch.Close()
+	s.serverLoopWg.Done()
+	s.indexBuildSch.Close()
+	s.serverLoopWg.Done()
+	s.indexLoadSch.Close()
+	s.serverLoopWg.Done()
 
 	if s.grpcServer != nil {
 		s.grpcServer.GracefulStop()
diff --git a/internal/master/persistence_scheduler_test.go b/internal/master/persistence_scheduler_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..3c13bd8d65a72b2ec43d099e80b12bdc11acd9a5
--- /dev/null
+++ b/internal/master/persistence_scheduler_test.go
@@ -0,0 +1,85 @@
+package master
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
+	"github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb"
+	"go.etcd.io/etcd/clientv3"
+)
+
+func TestPersistenceScheduler(t *testing.T) {
+	//Init environment Params
+	Init()
+
+	ctx := context.Background()
+
+	//Init clients, using mocks instead of real nodes
+	flushClient := &MockWriteNodeClient{}
+	buildIndexClient := &MockBuildIndexClient{}
+	loadIndexClient := &MockLoadIndexClient{}
+
+	etcdAddr := Params.EtcdAddress
+	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
+	assert.Nil(t, err)
+	etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
+
+	_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
+	assert.Nil(t, err)
+
+	meta, err := NewMetaTable(etcdKV)
+	assert.Nil(t, err)
+	defer meta.client.Close()
+
+	//Init schedulers
+	indexLoadSch := NewIndexLoadScheduler(ctx, loadIndexClient, meta)
+	indexBuildSch := NewIndexBuildScheduler(ctx, buildIndexClient, meta, indexLoadSch)
+	flushSch := NewFlushScheduler(ctx, flushClient, meta, indexBuildSch)
+
+	//Start the schedulers
+	err = indexLoadSch.Start()
+	assert.Nil(t, err)
+	defer indexLoadSch.Close()
+
+	err = indexBuildSch.Start()
+	assert.Nil(t, err)
+	defer indexBuildSch.Close()
+
+	err = flushSch.Start()
+	assert.Nil(t, err)
+	defer flushSch.Close()
+
+	//Start from the flush scheduler
+	err = flushSch.Enqueue(UniqueID(1))
+	assert.Nil(t, err)
+	//Wait for the flush request to reach the write node
+	time.Sleep(100 * time.Millisecond)
+	segDes, err := flushClient.DescribeSegment(UniqueID(1))
+	assert.Nil(t, err)
+	assert.Equal(t, false, segDes.IsClosed)
+
+	//Wait for the flush to finish
+	time.Sleep(3 * time.Second)
+
+	segDes, err = flushClient.DescribeSegment(UniqueID(1))
+	assert.Nil(t, err)
+	assert.Equal(t, UniqueID(1), segDes.SegmentID)
+	assert.Equal(t, true, segDes.IsClosed)
+
+	//Wait for the build-index request to reach the index builder
+	time.Sleep(100 * time.Millisecond)
+	idxDes, err := buildIndexClient.DescribeIndex(UniqueID(1))
+	assert.Nil(t, err)
+	assert.Equal(t, indexbuilderpb.IndexStatus_INPROGRESS, idxDes.Status)
+
+	//Wait for the index build to finish
+	time.Sleep(3 * time.Second)
+
+	idxDes, err = buildIndexClient.DescribeIndex(UniqueID(1))
+	assert.Nil(t, err)
+	assert.Equal(t, indexbuilderpb.IndexStatus_FINISHED, idxDes.Status)
+
+}
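End to end, the three schedulers form a flush → build-index → load-index pipeline: a segment ID enqueued on the FlushScheduler is flushed, its per-field binlog paths are fed to the IndexBuildScheduler, and the finished index files are handed to the IndexLoadScheduler. A minimal driving sketch with the mocks, assuming a *metaTable named meta is already constructed (e.g. via NewMetaTable as in the test) and with error handling elided:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

loadSch := NewIndexLoadScheduler(ctx, &MockLoadIndexClient{}, meta)
buildSch := NewIndexBuildScheduler(ctx, &MockBuildIndexClient{}, meta, loadSch)
flushSch := NewFlushScheduler(ctx, &MockWriteNodeClient{}, meta, buildSch)

_ = loadSch.Start()  // start downstream stages first, mirroring CreateServer
_ = buildSch.Start()
_ = flushSch.Start()

_ = flushSch.Enqueue(UniqueID(1)) // kicks off flush -> build index -> load index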