Commit 854accf9 authored by neza2017, committed by yefu.chen

Remove master and writenode


Signed-off-by: neza2017 <yefu.chen@zilliz.com>
parent d5346e13
Showing 0 additions and 5877 deletions
package master
import (
"github.com/zilliztech/milvus-distributed/internal/kv"
)
type IDAllocator interface {
Alloc(count uint32) (UniqueID, UniqueID, error)
AllocOne() (UniqueID, error)
UpdateID() error
}
// GlobalIDAllocator is the global single-point ID allocator, backed by the global TSO allocator.
type GlobalIDAllocator struct {
allocator Allocator
}
func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator {
return &GlobalIDAllocator{
allocator: NewGlobalTSOAllocator(key, base),
}
}
// Initialize initializes the underlying global TSO allocator.
func (gia *GlobalIDAllocator) Initialize() error {
return gia.allocator.Initialize()
}
// Alloc allocates a contiguous block of `count` IDs and returns the half-open range [idStart, idEnd).
// Make sure the allocator has been initialized before calling.
func (gia *GlobalIDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) {
timestamp, err := gia.allocator.GenerateTSO(count)
if err != nil {
return 0, 0, err
}
idStart := UniqueID(timestamp)
idEnd := idStart + int64(count)
return idStart, idEnd, nil
}
func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) {
timestamp, err := gia.allocator.GenerateTSO(1)
if err != nil {
return 0, err
}
idStart := UniqueID(timestamp)
return idStart, nil
}
func (gia *GlobalIDAllocator) UpdateID() error {
return gia.allocator.UpdateTSO()
}
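The allocator above is the ID source the master wires up in CreateServer further below. A minimal usage sketch (not part of this commit; it assumes an etcd endpoint at localhost:2379, the by-dev/kv root used elsewhere in this package, and the internal/util/tsoutil import from the server file):
func exampleAllocIDs() (UniqueID, UniqueID, error) {
	// Back the ID allocator with the same etcd-based TSO KV base the master uses.
	base := tsoutil.NewTSOKVBase([]string{"localhost:2379"}, "by-dev/kv", "gid")
	idAlloc := NewGlobalIDAllocator("idTimestamp", base)
	if err := idAlloc.Initialize(); err != nil {
		return 0, 0, err
	}
	// Reserve 16 consecutive IDs; the returned range is the half-open [idStart, idEnd).
	return idAlloc.Alloc(16)
}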
package master
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
)
type IndexBuildInfo struct {
segmentID UniqueID
fieldID UniqueID
binlogFilePath []string
}
type IndexBuildChannelInfo struct {
id UniqueID
info *IndexBuildInfo
indexParams []*commonpb.KeyValuePair
}
type IndexBuildScheduler struct {
client BuildIndexClient
metaTable *metaTable
indexBuildChan chan *IndexBuildInfo
indexLoadSch persistenceScheduler
indexDescribeID chan UniqueID
indexDescribe chan *IndexBuildChannelInfo
ctx context.Context
cancel context.CancelFunc
}
func NewIndexBuildScheduler(ctx context.Context, client BuildIndexClient, metaTable *metaTable, indexLoadScheduler *IndexLoadScheduler) *IndexBuildScheduler {
ctx2, cancel := context.WithCancel(ctx)
return &IndexBuildScheduler{
client: client,
metaTable: metaTable,
indexLoadSch: indexLoadScheduler,
indexBuildChan: make(chan *IndexBuildInfo, 100),
indexDescribe: make(chan *IndexBuildChannelInfo, 100),
ctx: ctx2,
cancel: cancel,
}
}
func (scheduler *IndexBuildScheduler) schedule(info interface{}) error {
indexBuildInfo := info.(*IndexBuildInfo)
segMeta, err := scheduler.metaTable.GetSegmentByID(indexBuildInfo.segmentID)
if err != nil {
return err
}
// parse index params
typeParams, err := scheduler.metaTable.GetFieldTypeParams(segMeta.CollectionID, indexBuildInfo.fieldID)
if err != nil {
return err
}
indexParams, err := scheduler.metaTable.GetFieldIndexParams(segMeta.CollectionID, indexBuildInfo.fieldID)
if err != nil {
return err
}
typeParamsMap := make(map[string]string)
indexParamsMap := make(map[string]string)
for _, kv := range typeParams {
typeParamsMap[kv.Key] = kv.Value
}
for _, kv := range indexParams {
indexParamsMap[kv.Key] = kv.Value
}
parseMap := func(mStr string) (map[string]string, error) {
buffer := make(map[string]interface{})
err := json.Unmarshal([]byte(mStr), &buffer)
if err != nil {
return nil, errors.New("Unmarshal params failed")
}
ret := make(map[string]string)
for key, value := range buffer {
valueStr := fmt.Sprintf("%v", value)
ret[key] = valueStr
}
return ret, nil
}
var typeParamsKV []*commonpb.KeyValuePair
for key := range typeParamsMap {
if key == "params" {
mapParams, err := parseMap(typeParamsMap[key])
if err != nil {
log.Println("parse params error: ", err)
}
for pk, pv := range mapParams {
typeParamsKV = append(typeParamsKV, &commonpb.KeyValuePair{
Key: pk,
Value: pv,
})
}
} else {
typeParamsKV = append(typeParamsKV, &commonpb.KeyValuePair{
Key: key,
Value: typeParamsMap[key],
})
}
}
var indexParamsKV []*commonpb.KeyValuePair
for key := range indexParamsMap {
if key == "params" {
mapParams, err := parseMap(indexParamsMap[key])
if err != nil {
log.Println("parse params error: ", err)
}
for pk, pv := range mapParams {
indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
Key: pk,
Value: pv,
})
}
} else {
indexParamsKV = append(indexParamsKV, &commonpb.KeyValuePair{
Key: key,
Value: indexParamsMap[key],
})
}
}
request := &indexpb.BuildIndexRequest{
DataPaths: indexBuildInfo.binlogFilePath,
TypeParams: typeParamsKV,
IndexParams: indexParamsKV,
}
indexResp, err := scheduler.client.BuildIndex(request)
if err != nil {
log.Printf("build index for segment %d field %d, failed:%s", indexBuildInfo.segmentID, indexBuildInfo.fieldID, err.Error())
return err
}
indexID := indexResp.IndexID
err = scheduler.metaTable.AddFieldIndexMeta(&etcdpb.FieldIndexMeta{
SegmentID: indexBuildInfo.segmentID,
FieldID: indexBuildInfo.fieldID,
IndexID: indexID,
IndexParams: indexParams,
State: commonpb.IndexState_NONE,
})
if err != nil {
log.Printf("WARNING: " + err.Error())
//return err
}
scheduler.indexDescribe <- &IndexBuildChannelInfo{
id: indexID,
info: indexBuildInfo,
indexParams: indexParams,
}
return nil
}
func (scheduler *IndexBuildScheduler) describe() error {
for {
select {
case <-scheduler.ctx.Done():
{
log.Printf("broadcast context done, exit")
return errors.New("broadcast done exit")
}
case channelInfo := <-scheduler.indexDescribe:
indexID := channelInfo.id
indexBuildInfo := channelInfo.info
for {
indexIDs := []UniqueID{channelInfo.id}
request := &indexpb.IndexStatesRequest{
IndexIDs: indexIDs,
}
description, err := scheduler.client.GetIndexStates(request)
if err != nil {
return err
}
if description.States[0].State == commonpb.IndexState_FINISHED {
log.Printf("build index for segment %d field %d is finished", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
request := &indexpb.IndexFilePathsRequest{
IndexIDs: indexIDs,
}
response, err := scheduler.client.GetIndexFilePaths(request)
if err != nil {
return err
}
var filePathsInfos [][]string
for _, indexID := range indexIDs {
for _, filePathInfo := range response.FilePaths {
if indexID == filePathInfo.IndexID {
filePathsInfos = append(filePathsInfos, filePathInfo.IndexFilePaths)
break
}
}
}
filePaths := filePathsInfos[0]
//TODO: remove fileName
var fieldName string
segMeta := scheduler.metaTable.segID2Meta[indexBuildInfo.segmentID]
collMeta := scheduler.metaTable.collID2Meta[segMeta.CollectionID]
if collMeta.Schema != nil {
for _, field := range collMeta.Schema.Fields {
if field.FieldID == indexBuildInfo.fieldID {
fieldName = field.Name
}
}
}
info := &IndexLoadInfo{
segmentID: indexBuildInfo.segmentID,
fieldID: indexBuildInfo.fieldID,
fieldName: fieldName,
indexFilePaths: filePaths,
indexParams: channelInfo.indexParams,
}
// Save data to meta table
err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{
SegmentID: indexBuildInfo.segmentID,
FieldID: indexBuildInfo.fieldID,
IndexID: indexID,
IndexParams: channelInfo.indexParams,
State: commonpb.IndexState_FINISHED,
IndexFilePaths: filePaths,
})
if err != nil {
fmt.Println("indexbuilder scheduler updateFiledIndexMetaFailed", indexBuildInfo.segmentID)
return err
}
err = scheduler.indexLoadSch.Enqueue(info)
log.Printf("build index for segment %d field %d enqueue load index", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
if err != nil {
return err
}
log.Printf("build index for segment %d field %d finished", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
break
} else {
// save status to meta table
err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{
SegmentID: indexBuildInfo.segmentID,
FieldID: indexBuildInfo.fieldID,
IndexID: indexID,
IndexParams: channelInfo.indexParams,
State: description.States[0].State,
})
if err != nil {
return err
}
}
time.Sleep(1 * time.Second)
}
}
}
}
func (scheduler *IndexBuildScheduler) scheduleLoop() {
for {
select {
case info := <-scheduler.indexBuildChan:
err := scheduler.schedule(info)
if err != nil {
log.Println(err)
}
case <-scheduler.ctx.Done():
log.Print("server is closed, exit index build loop")
return
}
}
}
func (scheduler *IndexBuildScheduler) Enqueue(info interface{}) error {
scheduler.indexBuildChan <- info.(*IndexBuildInfo)
return nil
}
func (scheduler *IndexBuildScheduler) Start() error {
go scheduler.scheduleLoop()
go scheduler.describe()
return nil
}
func (scheduler *IndexBuildScheduler) Close() {
scheduler.cancel()
}
package master
import (
"context"
"log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
type IndexLoadInfo struct {
segmentID UniqueID
fieldID UniqueID
fieldName string
indexParams []*commonpb.KeyValuePair
indexFilePaths []string
}
type IndexLoadScheduler struct {
indexLoadChan chan *IndexLoadInfo
client LoadIndexClient
metaTable *metaTable
ctx context.Context
cancel context.CancelFunc
}
func NewIndexLoadScheduler(ctx context.Context, client LoadIndexClient, metaTable *metaTable) *IndexLoadScheduler {
ctx2, cancel := context.WithCancel(ctx)
indexLoadChan := make(chan *IndexLoadInfo, 100)
return &IndexLoadScheduler{
client: client,
metaTable: metaTable,
indexLoadChan: indexLoadChan,
ctx: ctx2,
cancel: cancel,
}
}
func (scheduler *IndexLoadScheduler) schedule(info interface{}) error {
indexLoadInfo := info.(*IndexLoadInfo)
indexParams := make(map[string]string)
for _, kv := range indexLoadInfo.indexParams {
indexParams[kv.Key] = kv.Value
}
err := scheduler.client.LoadIndex(indexLoadInfo.indexFilePaths, indexLoadInfo.segmentID, indexLoadInfo.fieldID, indexLoadInfo.fieldName, indexParams)
//TODO: Save data to meta table
if err != nil {
return err
}
return nil
}
func (scheduler *IndexLoadScheduler) scheduleLoop() {
for {
select {
case info := <-scheduler.indexLoadChan:
err := scheduler.schedule(info)
if err != nil {
log.Println(err)
}
case <-scheduler.ctx.Done():
log.Print("server is closed, exit flush scheduler loop")
return
}
}
}
func (scheduler *IndexLoadScheduler) Enqueue(info interface{}) error {
scheduler.indexLoadChan <- info.(*IndexLoadInfo)
return nil
}
func (scheduler *IndexLoadScheduler) Start() error {
go scheduler.scheduleLoop()
return nil
}
func (scheduler *IndexLoadScheduler) Close() {
scheduler.cancel()
}
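Wiring the two schedulers together mirrors what CreateServer does further below. A hedged sketch (not part of this commit; buildClient, loadClient and table stand in for real BuildIndexClient, LoadIndexClient and metaTable values constructed elsewhere):
func exampleStartIndexSchedulers(ctx context.Context, buildClient BuildIndexClient, loadClient LoadIndexClient, table *metaTable) error {
	// The build scheduler hands finished indexes to the load scheduler, so the
	// load scheduler is constructed first and passed in.
	loadSch := NewIndexLoadScheduler(ctx, loadClient, table)
	buildSch := NewIndexBuildScheduler(ctx, buildClient, table, loadSch)
	if err := loadSch.Start(); err != nil {
		return err
	}
	if err := buildSch.Start(); err != nil {
		return err
	}
	// Enqueue one build request; the binlog path here is a placeholder.
	return buildSch.Enqueue(&IndexBuildInfo{
		segmentID:      1,
		fieldID:        100,
		binlogFilePath: []string{"placeholder/binlog/path"},
	})
}
Both schedulers keep running until Close is called, which cancels their contexts and stops the schedule and describe loops.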
package master
import (
"fmt"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
)
type createIndexTask struct {
baseTask
req *milvuspb.CreateIndexRequest
indexBuildScheduler *IndexBuildScheduler
indexLoadScheduler *IndexLoadScheduler
segManager SegmentManager
}
func (task *createIndexTask) Type() commonpb.MsgType {
return commonpb.MsgType_kCreateIndex
}
func (task *createIndexTask) Ts() (Timestamp, error) {
return task.req.Base.Timestamp, nil
}
func (task *createIndexTask) Execute() error {
collMeta, err := task.mt.GetCollectionByName(task.req.CollectionName)
if err != nil {
return err
}
var fieldID int64 = -1
for _, fieldSchema := range collMeta.Schema.Fields {
if fieldSchema.Name == task.req.FieldName {
fieldID = fieldSchema.FieldID
break
}
}
if fieldID == -1 {
return fmt.Errorf("can not find field name %s", task.req.FieldName)
}
// pre checks
isIndexable, err := task.mt.IsIndexable(collMeta.ID, fieldID)
if err != nil {
return err
}
if !isIndexable {
return fmt.Errorf("field %s is not vector", task.req.FieldName)
}
// modify schema
if err := task.mt.UpdateFieldIndexParams(task.req.CollectionName, task.req.FieldName, task.req.ExtraParams); err != nil {
return err
}
// check if closed segment has the same index build history
for _, segID := range collMeta.SegmentIDs {
segMeta, err := task.mt.GetSegmentByID(segID)
if err != nil {
return err
}
if segMeta.CloseTime == 0 {
continue
}
hasIndexMeta, err := task.mt.HasFieldIndexMeta(segID, fieldID, task.req.ExtraParams)
if err != nil {
return err
}
if hasIndexMeta {
// load index
indexMeta, err := task.mt.GetFieldIndexMeta(segID, fieldID, task.req.ExtraParams)
if err != nil {
return err
}
err = task.indexLoadScheduler.Enqueue(&IndexLoadInfo{
segmentID: segID,
fieldID: fieldID,
fieldName: task.req.FieldName,
indexFilePaths: indexMeta.IndexFilePaths,
indexParams: indexMeta.IndexParams,
})
if err != nil {
return err
}
} else {
// create index
for _, kv := range segMeta.BinlogFilePaths {
if kv.FieldID != fieldID {
continue
}
err := task.indexBuildScheduler.Enqueue(&IndexBuildInfo{
segmentID: segID,
fieldID: fieldID,
binlogFilePath: kv.BinlogFiles,
})
if err != nil {
return err
}
break
}
}
}
// close unfilled segment
return task.segManager.ForceClose(collMeta.ID)
}
type describeIndexTask struct {
baseTask
req *milvuspb.DescribeIndexRequest
resp *milvuspb.DescribeIndexResponse
}
func (task *describeIndexTask) Type() commonpb.MsgType {
return commonpb.MsgType_kDescribeIndex
}
func (task *describeIndexTask) Ts() (Timestamp, error) {
return task.req.Base.Timestamp, nil
}
func (task *describeIndexTask) Execute() error {
collMeta, err := task.mt.GetCollectionByName(task.req.CollectionName)
if err != nil {
return err
}
var fieldID int64 = -1
for _, fieldSchema := range collMeta.Schema.Fields {
if fieldSchema.Name == task.req.FieldName {
fieldID = fieldSchema.FieldID
break
}
}
if fieldID == -1 {
return fmt.Errorf("can not find field %s", task.req.FieldName)
}
indexParams, err := task.mt.GetFieldIndexParams(collMeta.ID, fieldID)
if err != nil {
return err
}
description := &milvuspb.IndexDescription{
IndexName: "", // todo add IndexName to master meta_table
Params: indexParams,
}
task.resp.IndexDescriptions = []*milvuspb.IndexDescription{description}
return nil
}
type getIndexStateTask struct {
baseTask
req *milvuspb.IndexStateRequest
runtimeStats *RuntimeStats
resp *milvuspb.IndexStateResponse
}
func (task *getIndexStateTask) Type() commonpb.MsgType {
return commonpb.MsgType_kGetIndexState
}
func (task *getIndexStateTask) Ts() (Timestamp, error) {
return task.req.Base.Timestamp, nil
}
func (task *getIndexStateTask) Execute() error {
// get field id, collection id
collMeta, err := task.mt.GetCollectionByName(task.req.CollectionName)
if err != nil {
return err
}
var fieldID int64 = -1
for _, fieldSchema := range collMeta.Schema.Fields {
if fieldSchema.Name == task.req.FieldName {
fieldID = fieldSchema.FieldID
break
}
}
if fieldID == -1 {
return fmt.Errorf("can not find field %s", task.req.FieldName)
}
// total segment nums
totalSegmentNums := len(collMeta.SegmentIDs)
indexParams, err := task.mt.GetFieldIndexParams(collMeta.ID, fieldID)
if err != nil {
return err
}
// get completed segment nums from querynode's runtime stats
relatedSegments := task.runtimeStats.GetTotalNumOfRelatedSegments(collMeta.ID, fieldID, indexParams)
task.resp = &milvuspb.IndexStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
if int64(totalSegmentNums) == relatedSegments {
task.resp.State = commonpb.IndexState_FINISHED
} else {
task.resp.State = commonpb.IndexState_INPROGRESS
}
return nil
}
package master
import (
"context"
"log"
"math/rand"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/querynode/client"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)
// UniqueID and Timestamp alias the shared typeutil identifier and timestamp types.
type (
UniqueID = typeutil.UniqueID
Timestamp = typeutil.Timestamp
)
type Master struct {
// Server state.
isServing int64
// Server start timestamp
startTimestamp int64
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel func()
serverLoopWg sync.WaitGroup
//grpc server
grpcServer *grpc.Server
grpcErr chan error
kvBase *etcdkv.EtcdKV
scheduler *ddRequestScheduler
flushSch *FlushScheduler
indexBuildSch *IndexBuildScheduler
indexLoadSch *IndexLoadScheduler
metaTable *metaTable
timesSyncMsgProducer *timeSyncMsgProducer
// tso ticker
tsoTicker *time.Ticker
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
segmentManager SegmentManager
segmentAssigner *SegmentAssigner
statProcessor *StatsProcessor
segmentStatusMsg ms.MsgStream
//id allocator
idAllocator *GlobalIDAllocator
//tso allocator
tsoAllocator *GlobalTSOAllocator
runtimeStats *RuntimeStats
}
func newKVBase(kvRoot string, etcdAddr []string) *etcdkv.EtcdKV {
cli, _ := clientv3.New(clientv3.Config{
Endpoints: etcdAddr,
DialTimeout: 5 * time.Second,
})
kvBase := etcdkv.NewEtcdKV(cli, kvRoot)
return kvBase
}
func Init() {
rand.Seed(time.Now().UnixNano())
Params.Init()
}
// CreateServer creates the UNINITIALIZED master server with the given configuration.
func CreateServer(ctx context.Context) (*Master, error) {
//Init(etcdAddr, kvRootPath)
etcdAddress := Params.EtcdAddress
metaRootPath := Params.MetaRootPath
kvRootPath := Params.KvRootPath
pulsarAddr := Params.PulsarAddress
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {
return nil, err
}
etcdKV := etcdkv.NewEtcdKV(etcdClient, metaRootPath)
metakv, err := NewMetaTable(etcdKV)
if err != nil {
return nil, err
}
//timeSyncMsgProducer
tsMsgProducer, err := NewTimeSyncMsgProducer(ctx)
if err != nil {
return nil, err
}
pulsarProxyServiceStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarProxyServiceStream.SetPulsarClient(pulsarAddr)
pulsarProxyServiceStream.CreatePulsarConsumers(Params.ProxyServiceTimeTickChannelNames, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
pulsarProxyServiceStream.Start()
proxyTimeTickBarrier := newProxyServiceTimeTickBarrier(ctx, pulsarProxyServiceStream)
tsMsgProducer.SetProxyTtBarrier(proxyTimeTickBarrier)
pulsarWriteStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarWriteStream.SetPulsarClient(pulsarAddr)
pulsarWriteStream.CreatePulsarConsumers(Params.WriteNodeTimeTickChannelNames, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
pulsarWriteStream.Start()
var writeStream ms.MsgStream = pulsarWriteStream
writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, Params.WriteNodeIDList)
tsMsgProducer.SetWriteNodeTtBarrier(writeTimeTickBarrier)
pulsarDDStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDDStream.SetPulsarClient(pulsarAddr)
pulsarDDStream.CreatePulsarProducers(Params.DDChannelNames)
tsMsgProducer.SetDDSyncStream(pulsarDDStream)
pulsarDMStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDMStream.SetPulsarClient(pulsarAddr)
pulsarDMStream.CreatePulsarProducers(Params.InsertChannelNames)
tsMsgProducer.SetDMSyncStream(pulsarDMStream)
pulsarK2SStream := pulsarms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarK2SStream.SetPulsarClient(pulsarAddr)
pulsarK2SStream.CreatePulsarProducers(Params.K2SChannelNames)
tsMsgProducer.SetK2sSyncStream(pulsarK2SStream)
proxyTtBarrierWatcher := make(chan *ms.TimeTickMsg, 1024)
writeNodeTtBarrierWatcher := make(chan *ms.TimeTickMsg, 1024)
tsMsgProducer.WatchProxyTtBarrier(proxyTtBarrierWatcher)
tsMsgProducer.WatchWriteNodeTtBarrier(writeNodeTtBarrierWatcher)
// stats msg stream
statsMs := pulsarms.NewPulsarMsgStream(ctx, 1024)
statsMs.SetPulsarClient(pulsarAddr)
statsMs.CreatePulsarConsumers([]string{Params.QueryNodeStatsChannelName}, Params.MsgChannelSubName, util.NewUnmarshalDispatcher(), 1024)
statsMs.Start()
m := &Master{
ctx: ctx,
startTimestamp: time.Now().Unix(),
kvBase: newKVBase(kvRootPath, []string{etcdAddress}),
metaTable: metakv,
timesSyncMsgProducer: tsMsgProducer,
grpcErr: make(chan error),
segmentStatusMsg: statsMs,
}
//init idAllocator
m.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "gid"))
if err := m.idAllocator.Initialize(); err != nil {
return nil, err
}
//init tsoAllocator
m.tsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "tso"))
if err := m.tsoAllocator.Initialize(); err != nil {
return nil, err
}
m.scheduler = NewDDRequestScheduler(ctx)
m.scheduler.SetDDMsgStream(pulsarDDStream)
m.scheduler.SetIDAllocator(func() (UniqueID, error) { return m.idAllocator.AllocOne() })
flushClient, err := writerclient.NewWriterClient(Params.EtcdAddress, Params.MetaRootPath, Params.WriteNodeSegKvSubPath, pulsarDDStream)
if err != nil {
return nil, err
}
buildIndexClient := grpcindexserviceclient.NewClient(Params.IndexBuilderAddress)
queryNodeClient := client.NewQueryNodeClient(ctx, Params.PulsarAddress, Params.LoadIndexChannelNames)
m.indexLoadSch = NewIndexLoadScheduler(ctx, queryNodeClient, m.metaTable)
m.indexBuildSch = NewIndexBuildScheduler(ctx, buildIndexClient, m.metaTable, m.indexLoadSch)
m.flushSch = NewFlushScheduler(ctx, flushClient, m.metaTable, m.indexBuildSch, func() (Timestamp, error) { return m.tsoAllocator.AllocOne() })
m.segmentAssigner = NewSegmentAssigner(ctx, metakv,
func() (Timestamp, error) { return m.tsoAllocator.AllocOne() },
proxyTtBarrierWatcher,
)
m.segmentManager, err = NewSegmentManager(ctx, metakv,
func() (UniqueID, error) { return m.idAllocator.AllocOne() },
func() (Timestamp, error) { return m.tsoAllocator.AllocOne() },
writeNodeTtBarrierWatcher,
m.flushSch,
m.segmentAssigner)
if err != nil {
return nil, err
}
m.runtimeStats = NewRuntimeStats()
m.statProcessor = NewStatsProcessor(metakv, m.runtimeStats,
func() (Timestamp, error) { return m.tsoAllocator.AllocOne() },
)
m.grpcServer = grpc.NewServer()
masterpb.RegisterMasterServiceServer(m.grpcServer, m)
return m, nil
}
// AddStartCallback adds a callback in the startServer phase.
func (s *Master) AddStartCallback(callbacks ...func()) {
s.startCallbacks = append(s.startCallbacks, callbacks...)
}
// AddCloseCallback adds a callback in the Close phase.
func (s *Master) AddCloseCallback(callbacks ...func()) {
s.closeCallbacks = append(s.closeCallbacks, callbacks...)
}
// Close closes the server.
func (s *Master) Close() {
if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) {
// server is already closed
return
}
log.Print("closing server")
s.stopServerLoop()
s.segmentAssigner.Close()
s.segmentManager.Close()
if s.kvBase != nil {
s.kvBase.Close()
}
// Run callbacks
for _, cb := range s.closeCallbacks {
cb()
}
log.Print("close server")
}
// IsClosed checks whether server is closed or not.
func (s *Master) IsClosed() bool {
return atomic.LoadInt64(&s.isServing) == 0
}
func (s *Master) IsServing() bool {
return !s.IsClosed()
}
// Run runs the master server.
func (s *Master) Run(grpcPort int64) error {
if err := s.startServerLoop(s.ctx, grpcPort); err != nil {
return err
}
s.segmentAssigner.Start()
s.segmentManager.Start()
atomic.StoreInt64(&s.isServing, 1)
// Run callbacks
for _, cb := range s.startCallbacks {
cb()
}
return nil
}
// Context returns the context of server.
func (s *Master) Context() context.Context {
return s.ctx
}
// LoopContext returns the loop context of server.
func (s *Master) LoopContext() context.Context {
return s.serverLoopCtx
}
func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx)
s.serverLoopWg.Add(1)
if err := s.timesSyncMsgProducer.Start(); err != nil {
return err
}
s.serverLoopWg.Add(1)
if err := s.scheduler.Start(); err != nil {
return err
}
s.serverLoopWg.Add(1)
if err := s.indexLoadSch.Start(); err != nil {
return err
}
s.serverLoopWg.Add(1)
if err := s.indexBuildSch.Start(); err != nil {
return err
}
s.serverLoopWg.Add(1)
if err := s.flushSch.Start(); err != nil {
return err
}
s.serverLoopWg.Add(1)
go s.grpcLoop(grpcPort)
if err := <-s.grpcErr; err != nil {
return err
}
s.serverLoopWg.Add(1)
go s.statisticsLoop()
s.serverLoopWg.Add(1)
go s.tsLoop()
return nil
}
func (s *Master) stopServerLoop() {
s.timesSyncMsgProducer.Close()
s.serverLoopWg.Done()
s.scheduler.Close()
s.serverLoopWg.Done()
s.flushSch.Close()
s.serverLoopWg.Done()
s.indexBuildSch.Close()
s.serverLoopWg.Done()
s.indexLoadSch.Close()
s.serverLoopWg.Done()
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
log.Printf("server is closed, exit grpc server")
}
s.serverLoopCancel()
s.serverLoopWg.Wait()
}
// StartTimestamp returns the start timestamp of this server
func (s *Master) StartTimestamp() int64 {
return s.startTimestamp
}
func (s *Master) checkGrpcReady(ctx context.Context, targetCh chan error) {
select {
case <-time.After(100 * time.Millisecond):
targetCh <- nil
case <-ctx.Done():
return
}
}
func (s *Master) grpcLoop(grpcPort int64) {
defer s.serverLoopWg.Done()
defaultGRPCPort := ":"
defaultGRPCPort += strconv.FormatInt(grpcPort, 10)
lis, err := net.Listen("tcp", defaultGRPCPort)
if err != nil {
log.Printf("failed to listen: %v", err)
s.grpcErr <- err
return
}
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
go s.checkGrpcReady(ctx, s.grpcErr)
if err := s.grpcServer.Serve(lis); err != nil {
s.grpcErr <- err
}
}
func (s *Master) tsLoop() {
defer s.serverLoopWg.Done()
s.tsoTicker = time.NewTicker(UpdateTimestampStep)
defer s.tsoTicker.Stop()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
for {
select {
case <-s.tsoTicker.C:
if err := s.tsoAllocator.UpdateTSO(); err != nil {
log.Println("failed to update timestamp", err)
return
}
if err := s.idAllocator.UpdateID(); err != nil {
log.Println("failed to update id", err)
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Println("tsLoop is closed")
return
}
}
}
func (s *Master) statisticsLoop() {
defer s.serverLoopWg.Done()
defer s.segmentStatusMsg.Close()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
for {
select {
case msg := <-s.segmentStatusMsg.Chan():
err := s.statProcessor.ProcessQueryNodeStats(msg)
if err != nil {
log.Println(err)
}
case <-ctx.Done():
log.Print("server is closed, exit segment statistics loop")
return
}
}
}
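Taken together, a typical lifecycle for this server (a sketch, not part of this commit; it assumes etcd and Pulsar are reachable at the addresses loaded into Params):
func exampleRunMaster(ctx context.Context) error {
	Init() // seed the RNG and load Params
	svr, err := CreateServer(ctx)
	if err != nil {
		return err
	}
	// Serve gRPC on the configured port until the caller cancels the context.
	if err := svr.Run(int64(Params.Port)); err != nil {
		return err
	}
	<-ctx.Done()
	svr.Close()
	return nil
}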
package master
import (
"context"
"reflect"
"strconv"
"testing"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/stretchr/testify/assert"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"go.etcd.io/etcd/clientv3"
)
func TestMetaTable_Collection(t *testing.T) {
Init()
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
meta, err := NewMetaTable(etcdKV)
assert.Nil(t, err)
defer meta.client.Close()
colMeta := pb.CollectionMeta{
ID: 100,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
}
colMeta2 := pb.CollectionMeta{
ID: 50,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
}
colMeta3 := pb.CollectionMeta{
ID: 30,
Schema: &schemapb.CollectionSchema{
Name: "coll2",
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
}
colMeta4 := pb.CollectionMeta{
ID: 30,
Schema: &schemapb.CollectionSchema{
Name: "coll2",
},
CreateTime: 0,
SegmentIDs: []UniqueID{1},
PartitionTags: []string{},
}
colMeta5 := pb.CollectionMeta{
ID: 30,
Schema: &schemapb.CollectionSchema{
Name: "coll2",
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{"1"},
}
segID1 := pb.SegmentMeta{
SegmentID: 200,
CollectionID: 100,
PartitionTag: "p1",
}
segID2 := pb.SegmentMeta{
SegmentID: 300,
CollectionID: 100,
PartitionTag: "p1",
}
segID3 := pb.SegmentMeta{
SegmentID: 400,
CollectionID: 100,
PartitionTag: "p2",
}
err = meta.AddCollection(&colMeta)
assert.Nil(t, err)
err = meta.AddCollection(&colMeta2)
assert.NotNil(t, err)
err = meta.AddCollection(&colMeta3)
assert.Nil(t, err)
err = meta.AddCollection(&colMeta4)
assert.NotNil(t, err)
err = meta.AddCollection(&colMeta5)
assert.NotNil(t, err)
collsName, err := meta.ListCollections()
assert.Nil(t, err)
assert.Equal(t, len(collsName), 2)
e1 := reflect.DeepEqual(collsName, []string{"coll1", "coll2"})
e2 := reflect.DeepEqual(collsName, []string{"coll2", "coll1"})
assert.True(t, e1 || e2)
hasCollection := meta.HasCollection(colMeta.ID)
assert.True(t, hasCollection)
err = meta.AddPartition(colMeta.ID, "p1")
assert.Nil(t, err)
err = meta.AddPartition(colMeta.ID, "p2")
assert.Nil(t, err)
err = meta.AddSegment(&segID1)
assert.Nil(t, err)
err = meta.AddSegment(&segID2)
assert.Nil(t, err)
err = meta.AddSegment(&segID3)
assert.Nil(t, err)
getColMeta, err := meta.GetCollectionByName("coll5")
assert.NotNil(t, err)
assert.Nil(t, getColMeta)
getColMeta, err = meta.GetCollectionByName(colMeta.Schema.Name)
assert.Nil(t, err)
assert.Equal(t, 3, len(getColMeta.SegmentIDs))
err = meta.DeleteCollection(colMeta.ID)
assert.Nil(t, err)
err = meta.DeleteCollection(500)
assert.NotNil(t, err)
hasCollection = meta.HasCollection(colMeta.ID)
assert.False(t, hasCollection)
_, err = meta.GetSegmentByID(segID1.SegmentID)
assert.NotNil(t, err)
_, err = meta.GetSegmentByID(segID2.SegmentID)
assert.NotNil(t, err)
_, err = meta.GetSegmentByID(segID3.SegmentID)
assert.NotNil(t, err)
err = meta.reloadFromKV()
assert.Nil(t, err)
assert.Equal(t, 0, len(meta.proxyID2Meta))
assert.Equal(t, 0, len(meta.tenantID2Meta))
assert.Equal(t, 1, len(meta.collName2ID))
assert.Equal(t, 1, len(meta.collID2Meta))
assert.Equal(t, 0, len(meta.segID2Meta))
err = meta.DeleteCollection(colMeta3.ID)
assert.Nil(t, err)
}
func TestMetaTable_DeletePartition(t *testing.T) {
Init()
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
meta, err := NewMetaTable(etcdKV)
assert.Nil(t, err)
defer meta.client.Close()
colMeta := pb.CollectionMeta{
ID: 100,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
}
segID1 := pb.SegmentMeta{
SegmentID: 200,
CollectionID: 100,
PartitionTag: "p1",
}
segID2 := pb.SegmentMeta{
SegmentID: 300,
CollectionID: 100,
PartitionTag: "p1",
}
segID3 := pb.SegmentMeta{
SegmentID: 400,
CollectionID: 100,
PartitionTag: "p2",
}
err = meta.AddCollection(&colMeta)
assert.Nil(t, err)
err = meta.AddPartition(500, "p1")
assert.NotNil(t, err)
err = meta.AddPartition(colMeta.ID, "p1")
assert.Nil(t, err)
err = meta.AddPartition(colMeta.ID, "p2")
assert.Nil(t, err)
err = meta.AddPartition(colMeta.ID, "p2")
assert.NotNil(t, err)
err = meta.AddSegment(&segID1)
assert.Nil(t, err)
err = meta.AddSegment(&segID2)
assert.Nil(t, err)
err = meta.AddSegment(&segID3)
assert.Nil(t, err)
afterCollMeta, err := meta.GetCollectionByName("coll1")
assert.Nil(t, err)
assert.Equal(t, 3, len(afterCollMeta.PartitionTags))
assert.Equal(t, 3, len(afterCollMeta.SegmentIDs))
err = meta.DeletePartition(100, "p1")
assert.Nil(t, err)
err = meta.DeletePartition(500, "p1")
assert.NotNil(t, err)
afterCollMeta, err = meta.GetCollectionByName("coll1")
assert.Nil(t, err)
assert.Equal(t, 2, len(afterCollMeta.PartitionTags))
assert.Equal(t, 1, len(afterCollMeta.SegmentIDs))
hasPartition := meta.HasPartition(colMeta.ID, "p1")
assert.False(t, hasPartition)
hasPartition = meta.HasPartition(colMeta.ID, "p2")
assert.True(t, hasPartition)
_, err = meta.GetSegmentByID(segID1.SegmentID)
assert.NotNil(t, err)
_, err = meta.GetSegmentByID(segID2.SegmentID)
assert.NotNil(t, err)
_, err = meta.GetSegmentByID(segID3.SegmentID)
assert.Nil(t, err)
afterCollMeta, err = meta.GetCollectionByName("coll1")
assert.Nil(t, err)
err = meta.reloadFromKV()
assert.Nil(t, err)
assert.Equal(t, 0, len(meta.proxyID2Meta))
assert.Equal(t, 0, len(meta.tenantID2Meta))
assert.Equal(t, 1, len(meta.collName2ID))
assert.Equal(t, 1, len(meta.collID2Meta))
assert.Equal(t, 1, len(meta.segID2Meta))
// delete not exist
err = meta.DeletePartition(100, "not_exist")
assert.NotNil(t, err)
}
func TestMetaTable_Segment(t *testing.T) {
Init()
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
meta, err := NewMetaTable(etcdKV)
assert.Nil(t, err)
defer meta.client.Close()
keys, _, err := meta.client.LoadWithPrefix("")
assert.Nil(t, err)
err = meta.client.MultiRemove(keys)
assert.Nil(t, err)
colMeta := pb.CollectionMeta{
ID: 100,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
}
segMeta := pb.SegmentMeta{
SegmentID: 200,
CollectionID: 100,
PartitionTag: "p1",
}
err = meta.AddCollection(&colMeta)
assert.Nil(t, err)
err = meta.AddPartition(colMeta.ID, "p1")
assert.Nil(t, err)
err = meta.AddSegment(&segMeta)
assert.Nil(t, err)
getSegMeta, err := meta.GetSegmentByID(segMeta.SegmentID)
assert.Nil(t, err)
assert.Equal(t, &segMeta, getSegMeta)
segMeta.NumRows = 111
segMeta.MemSize = 100000
err = meta.UpdateSegment(&segMeta)
assert.Nil(t, err)
err = meta.CloseSegment(segMeta.SegmentID, Timestamp(11))
assert.Nil(t, err)
err = meta.CloseSegment(1000, Timestamp(11))
assert.NotNil(t, err)
getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID)
assert.Nil(t, err)
assert.Equal(t, getSegMeta.NumRows, int64(111))
assert.Equal(t, getSegMeta.CloseTime, uint64(11))
assert.Equal(t, int64(100000), getSegMeta.MemSize)
err = meta.DeleteSegment(segMeta.SegmentID)
assert.Nil(t, err)
err = meta.DeleteSegment(1000)
assert.NotNil(t, err)
getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID)
assert.Nil(t, getSegMeta)
assert.NotNil(t, err)
getColMeta, err := meta.GetCollectionByName(colMeta.Schema.Name)
assert.Nil(t, err)
assert.Equal(t, 0, len(getColMeta.SegmentIDs))
meta.tenantID2Meta = make(map[UniqueID]pb.TenantMeta)
meta.proxyID2Meta = make(map[UniqueID]pb.ProxyMeta)
meta.collID2Meta = make(map[UniqueID]pb.CollectionMeta)
meta.collName2ID = make(map[string]UniqueID)
meta.segID2Meta = make(map[UniqueID]pb.SegmentMeta)
err = meta.reloadFromKV()
assert.Nil(t, err)
assert.Equal(t, 0, len(meta.proxyID2Meta))
assert.Equal(t, 0, len(meta.tenantID2Meta))
assert.Equal(t, 1, len(meta.collName2ID))
assert.Equal(t, 1, len(meta.collID2Meta))
assert.Equal(t, 0, len(meta.segID2Meta))
}
func TestMetaTable_UpdateSegment(t *testing.T) {
Init()
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
meta, err := NewMetaTable(etcdKV)
assert.Nil(t, err)
defer meta.client.Close()
colMeta := pb.CollectionMeta{
ID: 100,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
}
segMeta := pb.SegmentMeta{
SegmentID: 200,
CollectionID: 100,
PartitionTag: "p1",
NumRows: 110,
}
err = meta.AddCollection(&colMeta)
assert.Nil(t, err)
err = meta.UpdateSegment(&segMeta)
assert.Nil(t, err)
seg, err := meta.GetSegmentByID(200)
assert.Nil(t, err)
assert.Equal(t, seg.NumRows, int64(110))
segMeta.NumRows = 210
err = meta.UpdateSegment(&segMeta)
assert.Nil(t, err)
seg, err = meta.GetSegmentByID(200)
assert.Nil(t, err)
assert.Equal(t, seg.NumRows, int64(210))
}
func TestMetaTable_AddPartition_Limit(t *testing.T) {
Init()
Params.MaxPartitionNum = 256 // adding 4096 partitions is too slow
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
meta, err := NewMetaTable(etcdKV)
assert.Nil(t, err)
defer meta.client.Close()
colMeta := pb.CollectionMeta{
ID: 100,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
}
err = meta.AddCollection(&colMeta)
assert.Nil(t, err)
for i := 0; i < int(Params.MaxPartitionNum); i++ {
err := meta.AddPartition(100, "partition_"+strconv.Itoa(i))
assert.Nil(t, err)
}
err = meta.AddPartition(100, "partition_limit")
assert.NotNil(t, err)
}
func TestMetaTable_LoadIndexMetaFromKv(t *testing.T) {
Init()
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
kv := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
meta := pb.FieldIndexMeta{
SegmentID: 1,
FieldID: 100,
IndexID: 1000,
IndexParams: []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}},
State: commonpb.IndexState_FINISHED,
IndexFilePaths: []string{"path1"},
}
marshalRes := proto.MarshalTextString(&meta)
err = kv.Save("/indexmeta/"+strconv.FormatInt(meta.SegmentID, 10)+strconv.FormatInt(meta.FieldID, 10)+strconv.FormatInt(meta.IndexID, 10), marshalRes)
assert.Nil(t, err)
metaTable, err := NewMetaTable(kv)
assert.Nil(t, err)
res, err := metaTable.HasFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}})
assert.Nil(t, err)
assert.True(t, res)
}
func TestMetaTable_IndexMeta(t *testing.T) {
Init()
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
meta, err := NewMetaTable(etcdKV)
assert.Nil(t, err)
err = meta.AddFieldIndexMeta(&pb.FieldIndexMeta{
SegmentID: 1,
FieldID: 100,
IndexID: 1000,
IndexParams: []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}},
State: commonpb.IndexState_INPROGRESS,
IndexFilePaths: []string{},
})
assert.Nil(t, err)
err = meta.AddFieldIndexMeta(&pb.FieldIndexMeta{
SegmentID: 1,
FieldID: 100,
IndexID: 1000,
IndexParams: []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}},
State: commonpb.IndexState_INPROGRESS,
IndexFilePaths: []string{},
})
assert.NotNil(t, err)
res, err := meta.HasFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}})
assert.Nil(t, err)
assert.True(t, res)
res, err = meta.HasFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v2"}})
assert.Nil(t, err)
assert.False(t, res)
err = meta.UpdateFieldIndexMeta(&pb.FieldIndexMeta{
SegmentID: 1,
FieldID: 100,
IndexID: 1000,
IndexParams: []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}},
State: commonpb.IndexState_FINISHED,
IndexFilePaths: []string{},
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.IndexState_FINISHED, meta.segID2IndexMetas[1][0].State)
err = meta.DeleteFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}})
assert.Nil(t, err)
res, err = meta.HasFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}})
assert.Nil(t, err)
assert.False(t, res)
}
package master
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable_Init(t *testing.T) {
Params.Init()
}
func TestParamTable_Address(t *testing.T) {
address := Params.Address
assert.Equal(t, address, "localhost")
}
func TestParamTable_Port(t *testing.T) {
port := Params.Port
assert.Equal(t, port, 53100)
}
func TestParamTable_MetaRootPath(t *testing.T) {
path := Params.MetaRootPath
assert.Equal(t, path, "by-dev/meta")
}
func TestParamTable_KVRootPath(t *testing.T) {
path := Params.KvRootPath
assert.Equal(t, path, "by-dev/kv")
}
func TestParamTableIndexServiceAddress(t *testing.T) {
path := Params.IndexBuilderAddress
assert.Equal(t, path, "localhost:31000")
}
func TestParamTable_TopicNum(t *testing.T) {
num := Params.TopicNum
fmt.Println("TopicNum:", num)
}
func TestParamTable_SegmentSize(t *testing.T) {
size := Params.SegmentSize
assert.Equal(t, size, float64(512))
}
func TestParamTable_SegmentSizeFactor(t *testing.T) {
factor := Params.SegmentSizeFactor
assert.Equal(t, factor, 0.75)
}
func TestParamTable_DefaultRecordSize(t *testing.T) {
size := Params.DefaultRecordSize
assert.Equal(t, size, int64(1024))
}
func TestParamTable_MinSegIDAssignCnt(t *testing.T) {
cnt := Params.MinSegIDAssignCnt
assert.Equal(t, cnt, int64(1024))
}
func TestParamTable_MaxSegIDAssignCnt(t *testing.T) {
cnt := Params.MaxSegIDAssignCnt
assert.Equal(t, cnt, int64(16384))
}
func TestParamTable_SegIDAssignExpiration(t *testing.T) {
expiration := Params.SegIDAssignExpiration
assert.Equal(t, expiration, int64(2000))
}
func TestParamTable_QueryNodeNum(t *testing.T) {
num := Params.QueryNodeNum
fmt.Println("QueryNodeNum", num)
}
func TestParamTable_QueryNodeStatsChannelName(t *testing.T) {
name := Params.QueryNodeStatsChannelName
assert.Equal(t, name, "query-node-stats")
}
func TestParamTable_ProxyIDList(t *testing.T) {
ids := Params.ProxyIDList
assert.Equal(t, len(ids), 1)
assert.Equal(t, ids[0], int64(0))
}
func TestParamTable_ProxyTimeTickChannelNames(t *testing.T) {
names := Params.ProxyServiceTimeTickChannelNames
assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "proxyTimeTick-0")
}
func TestParamTable_MsgChannelSubName(t *testing.T) {
name := Params.MsgChannelSubName
assert.Equal(t, name, "master")
}
func TestParamTable_SoftTimeTickBarrierInterval(t *testing.T) {
interval := Params.SoftTimeTickBarrierInterval
assert.Equal(t, interval, Timestamp(0x7d00000))
}
func TestParamTable_WriteNodeIDList(t *testing.T) {
ids := Params.WriteNodeIDList
assert.Equal(t, len(ids), 1)
assert.Equal(t, ids[0], int64(3))
}
func TestParamTable_WriteNodeTimeTickChannelNames(t *testing.T) {
names := Params.WriteNodeTimeTickChannelNames
assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "writeNodeTimeTick-3")
}
func TestParamTable_InsertChannelNames(t *testing.T) {
names := Params.InsertChannelNames
assert.Equal(t, Params.TopicNum, len(names))
}
func TestParamTable_K2SChannelNames(t *testing.T) {
names := Params.K2SChannelNames
assert.Equal(t, len(names), 1)
assert.Equal(t, names[0], "k2s-0")
}
package master
type persistenceScheduler interface {
Enqueue(interface{}) error
schedule(interface{}) error
scheduleLoop()
Start() error
Close()
}
type MockFlushScheduler struct {
}
func (m *MockFlushScheduler) Enqueue(i interface{}) error {
return nil
}
func (m *MockFlushScheduler) schedule(i interface{}) error {
return nil
}
func (m *MockFlushScheduler) scheduleLoop() {
}
func (m *MockFlushScheduler) Start() error {
return nil
}
func (m *MockFlushScheduler) Close() {
}