From 7c0c835b4949b0819c96990d12fb4a1c6be4fb5a Mon Sep 17 00:00:00 2001 From: sunby <bingyi.sun@zilliz.com> Date: Mon, 25 Jan 2021 15:16:28 +0800 Subject: [PATCH] Init params and add segment start/end position Signed-off-by: sunby <bingyi.sun@zilliz.com> --- cmd/masterservice/main.go | 93 -------- configs/advanced/channel.yaml | 5 + configs/advanced/data_service.yaml | 13 ++ configs/advanced/master.yaml | 3 +- internal/datanode/allocator.go | 2 +- internal/datanode/collection.go | 15 +- internal/datanode/collection_replica.go | 7 +- internal/datanode/collection_replica_test.go | 6 +- internal/datanode/collection_test.go | 11 +- internal/datanode/data_node.go | 7 +- internal/datanode/data_sync_service_test.go | 3 +- internal/datanode/factory.go | 47 ---- internal/datanode/flow_graph_dd_node.go | 3 +- .../flow_graph_insert_buffer_node_test.go | 5 +- internal/datanode/meta_service.go | 150 ++++++++----- internal/datanode/meta_service_test.go | 122 +++++++--- internal/dataservice/param.go | 81 +++++-- internal/dataservice/server.go | 211 +++++++++++------- internal/dataservice/stats_handler.go | 23 +- internal/distributed/masterservice/client.go | 77 ++----- .../masterservice/masterservice_test.go | 3 +- internal/distributed/masterservice/server.go | 29 +-- internal/masterservice/master_service.go | 118 ---------- internal/masterservice/param_table.go | 8 - internal/masterservice/param_table_test.go | 3 - internal/timesync/timetick_watcher.go | 8 + 26 files changed, 481 insertions(+), 572 deletions(-) delete mode 100644 cmd/masterservice/main.go create mode 100644 configs/advanced/data_service.yaml diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go deleted file mode 100644 index 9b050f956..000000000 --- a/cmd/masterservice/main.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "os" - "os/signal" - "syscall" - - ds "github.com/zilliztech/milvus-distributed/internal/dataservice" - dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" - isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" - msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" - psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice" - is "github.com/zilliztech/milvus-distributed/internal/indexservice" - ms "github.com/zilliztech/milvus-distributed/internal/masterservice" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" -) - -const reTryCnt = 3 - -func main() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port) - - svr, err := msc.NewGrpcServer(ctx) - if err != nil { - panic(err) - } - - log.Printf("proxy service address : %s", psc.Params.NetworkAddress()) - //proxyService := psc.NewClient(ctx, psc.Params.NetworkAddress()) - - //TODO, test proxy service GetComponentStates, before set - - //if err = svr.SetProxyService(proxyService); err != nil { - // panic(err) - //} - - log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port) - dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) - if err = dataService.Init(); err != nil { - panic(err) - } - if err = dataService.Start(); err != nil { - panic(err) - } - cnt := 0 - for cnt = 0; cnt < reTryCnt; cnt++ { - dsStates, err := dataService.GetComponentStates() - if err != nil { - continue - } - if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - continue - } - if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= reTryCnt { - panic("connect to data service failed") - } - - //if err = svr.SetDataService(dataService); err != nil { - // panic(err) - //} - - log.Printf("index service address : %s", is.Params.Address) - indexService := isc.NewClient(is.Params.Address) - - if err = svr.SetIndexService(indexService); err != nil { - panic(err) - } - - if err = svr.Start(); err != nil { - panic(err) - } - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - sig := <-sc - log.Printf("Got %s signal to exit", sig.String()) - _ = svr.Stop() -} diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml index a11c7fccb..bbfe73444 100644 --- a/configs/advanced/channel.yaml +++ b/configs/advanced/channel.yaml @@ -29,6 +29,10 @@ msgChannel: queryNodeStats: "query-node-stats" # cmd for loadIndex, flush, etc... cmd: "cmd" + dataServiceInsertChannel: "insert-channel-" + dataServiceStatistic: "dataservice-statistics-channel" + dataServiceTimeTick: "dataservice-timetick-channel" + dataServiceSegmentInfo: "segment-info-channel" # sub name generation rule: ${subNamePrefix}-${NodeID} subNamePrefix: @@ -37,6 +41,7 @@ msgChannel: queryNodeSubNamePrefix: "queryNode" writeNodeSubNamePrefix: "writeNode" # GOOSE TODO: remove this dataNodeSubNamePrefix: "dataNode" + dataServiceSubNamePrefix: "dataService" # default channel range [0, 1) channelRange: diff --git a/configs/advanced/data_service.yaml b/configs/advanced/data_service.yaml new file mode 100644 index 000000000..49c405e79 --- /dev/null +++ b/configs/advanced/data_service.yaml @@ -0,0 +1,13 @@ +dataservice: + nodeID: 14040 + address: "127.0.0.1" + port: 13333 + segment: + # old name: segmentThreshold: 536870912 + size: 512 # MB + sizeFactor: 0.75 + defaultSizePerRecord: 1024 + # old name: segmentExpireDuration: 2000 + IDAssignExpiration: 2000 # ms + insertChannelNumPerCollection: 4 + dataNodeNum: 2 \ No newline at end of file diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml index 3e1c4e781..69474322a 100644 --- a/configs/advanced/master.yaml +++ b/configs/advanced/master.yaml @@ -23,5 +23,4 @@ master: IDAssignExpiration: 2000 # ms maxPartitionNum: 4096 - nodeID: 100 - timeout: 5 # time out, 5 seconds \ No newline at end of file + nodeID: 100 \ No newline at end of file diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go index 83400a8ea..0cad7a46c 100644 --- a/internal/datanode/allocator.go +++ b/internal/datanode/allocator.go @@ -25,7 +25,7 @@ func (alloc *allocatorImpl) allocID() (UniqueID, error) { resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowCollections, - MsgID: 1, // GOOSE TODO + MsgID: 1, // GOOSE TODO add msg id Timestamp: 0, // GOOSE TODO SourceID: Params.NodeID, }, diff --git a/internal/datanode/collection.go b/internal/datanode/collection.go index d489eef70..3610dc0b8 100644 --- a/internal/datanode/collection.go +++ b/internal/datanode/collection.go @@ -1,6 +1,9 @@ package datanode import ( + "log" + + "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) @@ -21,9 +24,17 @@ func (c *Collection) Schema() *schemapb.CollectionSchema { return c.schema } -func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection { +func newCollection(collectionID UniqueID, schemaStr string) *Collection { + + var schema schemapb.CollectionSchema + err := proto.UnmarshalText(schemaStr, &schema) + if err != nil { + log.Println(err) + return nil + } + var newCollection = &Collection{ - schema: schema, + schema: &schema, id: collectionID, } return newCollection diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index 0be37979a..5412a2ebd 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -6,14 +6,13 @@ import ( "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) type collectionReplica interface { // collection getCollectionNum() int - addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error + addCollection(collectionID UniqueID, schemaBlob string) error removeCollection(collectionID UniqueID) error getCollectionByID(collectionID UniqueID) (*Collection, error) getCollectionByName(collectionName string) (*Collection, error) @@ -163,11 +162,11 @@ func (colReplica *collectionReplicaImpl) getCollectionNum() int { return len(colReplica.collections) } -func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error { +func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schemaBlob string) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() - var newCollection = newCollection(collectionID, schema) + var newCollection = newCollection(collectionID, schemaBlob) colReplica.collections = append(colReplica.collections, newCollection) log.Println("Create collection: ", newCollection.Name()) diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go index c97d38dbd..25869712d 100644 --- a/internal/datanode/collection_replica_test.go +++ b/internal/datanode/collection_replica_test.go @@ -3,6 +3,7 @@ package datanode import ( "testing" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -22,7 +23,10 @@ func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName Factory := &MetaFactory{} collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) - var err = replica.addCollection(collectionMeta.ID, collectionMeta.Schema) + schemaBlob := proto.MarshalTextString(collectionMeta.Schema) + require.NotEqual(t, "", schemaBlob) + + var err = replica.addCollection(collectionMeta.ID, schemaBlob) require.NoError(t, err) collection, err := replica.getCollectionByName(collectionName) diff --git a/internal/datanode/collection_test.go b/internal/datanode/collection_test.go index 6f12bfd0b..cce9bc1dc 100644 --- a/internal/datanode/collection_test.go +++ b/internal/datanode/collection_test.go @@ -3,6 +3,7 @@ package datanode import ( "testing" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" ) @@ -12,7 +13,10 @@ func TestCollection_newCollection(t *testing.T) { Factory := &MetaFactory{} collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) - collection := newCollection(collectionMeta.ID, collectionMeta.Schema) + schemaBlob := proto.MarshalTextString(collectionMeta.Schema) + assert.NotEqual(t, "", schemaBlob) + + collection := newCollection(collectionMeta.ID, schemaBlob) assert.Equal(t, collection.Name(), collectionName) assert.Equal(t, collection.ID(), collectionID) } @@ -23,7 +27,10 @@ func TestCollection_deleteCollection(t *testing.T) { Factory := &MetaFactory{} collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) - collection := newCollection(collectionMeta.ID, collectionMeta.Schema) + schemaBlob := proto.MarshalTextString(collectionMeta.Schema) + assert.NotEqual(t, "", schemaBlob) + + collection := newCollection(collectionMeta.ID, schemaBlob) assert.Equal(t, collection.Name(), collectionName) assert.Equal(t, collection.ID(), collectionID) } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index ced35ae9d..4be2c78d9 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -26,8 +26,8 @@ const ( type ( Inteface interface { typeutil.Service - typeutil.Component + GetComponentStates() (*internalpb2.ComponentStates, error) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) } @@ -43,7 +43,6 @@ type ( } DataNode struct { - // GOOSE TODO: complete interface with component ctx context.Context NodeID UniqueID Role string @@ -125,7 +124,7 @@ func (node *DataNode) Init() error { chanSize := 100 flushChan := make(chan *flushMsg, chanSize) node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc) - node.metaService = newMetaService(node.ctx, replica, node.masterService) + node.metaService = newMetaService(node.ctx, replica) node.replica = replica // Opentracing @@ -155,7 +154,7 @@ func (node *DataNode) Init() error { func (node *DataNode) Start() error { go node.dataSyncService.start() - node.metaService.init() + node.metaService.start() return nil } diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 3b40ea1a2..0813dde84 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -41,7 +42,7 @@ func TestDataSyncService_Start(t *testing.T) { replica := newReplica() allocFactory := AllocatorFactory{} sync := newDataSyncService(ctx, flushChan, replica, allocFactory) - sync.replica.addCollection(collMeta.ID, collMeta.Schema) + sync.replica.addCollection(collMeta.ID, proto.MarshalTextString(collMeta.Schema)) go sync.start() // test data generate diff --git a/internal/datanode/factory.go b/internal/datanode/factory.go index bb87ca008..d8a462a9e 100644 --- a/internal/datanode/factory.go +++ b/internal/datanode/factory.go @@ -3,8 +3,6 @@ package datanode import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" - "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) @@ -17,12 +15,6 @@ type ( AllocatorFactory struct { } - - MasterServiceFactory struct { - ID UniqueID - collectionName string - collectionID UniqueID - } ) func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta { @@ -164,42 +156,3 @@ func (alloc AllocatorFactory) allocID() (UniqueID, error) { // GOOSE TODO: random ID generate return UniqueID(0), nil } - -func (m *MasterServiceFactory) setID(id UniqueID) { - m.ID = id // GOOSE TODO: random ID generator -} - -func (m *MasterServiceFactory) setCollectionID(id UniqueID) { - m.collectionID = id -} - -func (m *MasterServiceFactory) setCollectionName(name string) { - m.collectionName = name -} - -func (m *MasterServiceFactory) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { - resp := &masterpb.IDResponse{ - Status: &commonpb.Status{}, - ID: m.ID, - } - return resp, nil -} - -func (m *MasterServiceFactory) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - resp := &milvuspb.ShowCollectionResponse{ - Status: &commonpb.Status{}, - CollectionNames: []string{m.collectionName}, - } - return resp, nil - -} -func (m *MasterServiceFactory) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - f := MetaFactory{} - meta := f.CollectionMetaFactory(m.collectionID, m.collectionName) - resp := &milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{}, - CollectionID: m.collectionID, - Schema: meta.Schema, - } - return resp, nil -} diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 45676f19b..d7432ae16 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -224,8 +224,9 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { return } + schemaStr := proto.MarshalTextString(&schema) // add collection - err = ddNode.replica.addCollection(collectionID, &schema) + err = ddNode.replica.addCollection(collectionID, schemaStr) if err != nil { log.Println(err) return diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 82f0ef69e..78b08901e 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/require" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -38,9 +39,11 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { Factory := &MetaFactory{} collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1") + schemaBlob := proto.MarshalTextString(collMeta.Schema) + require.NotEqual(t, "", schemaBlob) replica := newReplica() - err = replica.addCollection(collMeta.ID, collMeta.Schema) + err = replica.addCollection(collMeta.ID, schemaBlob) require.NoError(t, err) // Params.FlushInsertBufSize = 2 diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go index c259af622..fa92d59fb 100644 --- a/internal/datanode/meta_service.go +++ b/internal/datanode/meta_service.go @@ -4,91 +4,73 @@ import ( "context" "fmt" "log" + "path" "reflect" + "strings" + "time" - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/golang/protobuf/proto" + "go.etcd.io/etcd/clientv3" + + etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" - "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" ) type metaService struct { - ctx context.Context - replica collectionReplica - masterClient MasterServiceInterface + ctx context.Context + kvBase *etcdkv.EtcdKV + replica collectionReplica } -func newMetaService(ctx context.Context, replica collectionReplica, m MasterServiceInterface) *metaService { +func newMetaService(ctx context.Context, replica collectionReplica) *metaService { + ETCDAddr := Params.EtcdAddress + MetaRootPath := Params.MetaRootPath + + cli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{ETCDAddr}, + DialTimeout: 5 * time.Second, + }) + return &metaService{ - ctx: ctx, - replica: replica, - masterClient: m, + ctx: ctx, + kvBase: etcdkv.NewEtcdKV(cli, MetaRootPath), + replica: replica, } } -func (mService *metaService) init() { +func (mService *metaService) start() { + // init from meta err := mService.loadCollections() if err != nil { - log.Fatal("metaService init failed:", err) + log.Fatal("metaService loadCollections failed") } } -func (mService *metaService) loadCollections() error { - names, err := mService.getCollectionNames() - if err != nil { - return err - } +func GetCollectionObjID(key string) string { + ETCDRootPath := Params.MetaRootPath - for _, name := range names { - err := mService.createCollection(name) - if err != nil { - return err - } - } - return nil + prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" + return strings.TrimPrefix(key, prefix) } -func (mService *metaService) getCollectionNames() ([]string, error) { - req := &milvuspb.ShowCollectionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kShowCollections, - MsgID: 0, //GOOSE TODO - Timestamp: 0, // GOOSE TODO - SourceID: Params.NodeID, - }, - DbName: "default", // GOOSE TODO - } +func isCollectionObj(key string) bool { + ETCDRootPath := Params.MetaRootPath - response, err := mService.masterClient.ShowCollections(req) - if err != nil { - return nil, errors.Errorf("Get collection names from master service wrong: %v", err) - } - return response.GetCollectionNames(), nil + prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" + prefix = strings.TrimSpace(prefix) + index := strings.Index(key, prefix) + + return index == 0 } -func (mService *metaService) createCollection(name string) error { - req := &milvuspb.DescribeCollectionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kDescribeCollection, - MsgID: 0, //GOOSE TODO - Timestamp: 0, // GOOSE TODO - SourceID: Params.NodeID, - }, - DbName: "default", // GOOSE TODO - CollectionName: name, - } +func isSegmentObj(key string) bool { + ETCDRootPath := Params.MetaRootPath - response, err := mService.masterClient.DescribeCollection(req) - if err != nil { - return errors.Errorf("Describe collection %v from master service wrong: %v", name, err) - } + prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/" + prefix = strings.TrimSpace(prefix) + index := strings.Index(key, prefix) - err = mService.replica.addCollection(response.GetCollectionID(), response.GetSchema()) - if err != nil { - return errors.Errorf("Add collection %v into collReplica wrong: %v", name, err) - } - - return nil + return index == 0 } func printCollectionStruct(obj *etcdpb.CollectionMeta) { @@ -103,3 +85,51 @@ func printCollectionStruct(obj *etcdpb.CollectionMeta) { fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface()) } } + +func (mService *metaService) processCollectionCreate(id string, value string) { + //println(fmt.Sprintf("Create Collection:$%s$", id)) + + col := mService.collectionUnmarshal(value) + if col != nil { + schema := col.Schema + schemaBlob := proto.MarshalTextString(schema) + err := mService.replica.addCollection(col.ID, schemaBlob) + if err != nil { + log.Println(err) + } + } +} + +func (mService *metaService) loadCollections() error { + keys, values, err := mService.kvBase.LoadWithPrefix(CollectionPrefix) + if err != nil { + return err + } + + for i := range keys { + objID := GetCollectionObjID(keys[i]) + mService.processCollectionCreate(objID, values[i]) + } + + return nil +} + +//----------------------------------------------------------------------- Unmarshal and Marshal +func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionMeta { + col := etcdpb.CollectionMeta{} + err := proto.UnmarshalText(value, &col) + if err != nil { + log.Println(err) + return nil + } + return &col +} + +func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) string { + value := proto.MarshalTextString(col) + if value == "" { + log.Println("marshal collection failed") + return "" + } + return value +} diff --git a/internal/datanode/meta_service_test.go b/internal/datanode/meta_service_test.go index 23d97085b..3154ecebd 100644 --- a/internal/datanode/meta_service_test.go +++ b/internal/datanode/meta_service_test.go @@ -7,46 +7,94 @@ import ( "github.com/stretchr/testify/assert" ) -func TestMetaService_All(t *testing.T) { +func TestMetaService_start(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() replica := newReplica() - mFactory := &MasterServiceFactory{} - mFactory.setCollectionID(0) - mFactory.setCollectionName("a-collection") - metaService := newMetaService(ctx, replica, mFactory) - - t.Run("Test getCollectionNames", func(t *testing.T) { - names, err := metaService.getCollectionNames() - assert.NoError(t, err) - assert.Equal(t, 1, len(names)) - assert.Equal(t, "a-collection", names[0]) - }) - - t.Run("Test createCollection", func(t *testing.T) { - hasColletion := metaService.replica.hasCollection(0) - assert.False(t, hasColletion) - - err := metaService.createCollection("a-collection") - assert.NoError(t, err) - hasColletion = metaService.replica.hasCollection(0) - assert.True(t, hasColletion) - }) - - t.Run("Test loadCollections", func(t *testing.T) { - hasColletion := metaService.replica.hasCollection(1) - assert.False(t, hasColletion) - - mFactory.setCollectionID(1) - mFactory.setCollectionName("a-collection-1") - err := metaService.loadCollections() - assert.NoError(t, err) - - hasColletion = metaService.replica.hasCollection(1) - assert.True(t, hasColletion) - hasColletion = metaService.replica.hasCollection(0) - assert.True(t, hasColletion) - }) + metaService := newMetaService(ctx, replica) + + metaService.start() +} + +func TestMetaService_getCollectionObjId(t *testing.T) { + var key = "/collection/collection0" + var collectionObjID1 = GetCollectionObjID(key) + + assert.Equal(t, collectionObjID1, "/collection/collection0") + + key = "fakeKey" + var collectionObjID2 = GetCollectionObjID(key) + + assert.Equal(t, collectionObjID2, "fakeKey") +} + +func TestMetaService_isCollectionObj(t *testing.T) { + var key = Params.MetaRootPath + "/collection/collection0" + var b1 = isCollectionObj(key) + + assert.Equal(t, b1, true) + + key = Params.MetaRootPath + "/segment/segment0" + var b2 = isCollectionObj(key) + + assert.Equal(t, b2, false) +} + +func TestMetaService_processCollectionCreate(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + replica := newReplica() + metaService := newMetaService(ctx, replica) + defer cancel() + id := "0" + value := `schema: < + name: "test" + fields: < + fieldID:100 + name: "vec" + data_type: VECTOR_FLOAT + type_params: < + key: "dim" + value: "16" + > + index_params: < + key: "metric_type" + value: "L2" + > + > + fields: < + fieldID:101 + name: "age" + data_type: INT32 + type_params: < + key: "dim" + value: "1" + > + > + > + segmentIDs: 0 + partition_tags: "default" + ` + + metaService.processCollectionCreate(id, value) + + collectionNum := replica.getCollectionNum() + assert.Equal(t, collectionNum, 1) + + collection, err := replica.getCollectionByName("test") + assert.NoError(t, err) + assert.Equal(t, collection.ID(), UniqueID(0)) +} + +func TestMetaService_loadCollections(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + replica := newReplica() + + metaService := newMetaService(ctx, replica) + + err2 := (*metaService).loadCollections() + assert.Nil(t, err2) } diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index 2e3f00e37..0e381ed15 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -1,6 +1,8 @@ package dataservice import ( + "strconv" + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" ) @@ -11,8 +13,6 @@ type ParamTable struct { Port int NodeID int64 - MasterAddress string - EtcdAddress string MetaRootPath string KvRootPath string @@ -31,6 +31,7 @@ type ParamTable struct { DataNodeNum int SegmentInfoChannelName string DataServiceSubscriptionName string + K2SChannelNames []string } var Params ParamTable @@ -39,15 +40,14 @@ func (p *ParamTable) Init() { // load yaml p.BaseTable.Init() - err := p.LoadYaml("advanced/master.yaml") - if err != nil { + if err := p.LoadYaml("advanced/data_service.yaml"); err != nil { panic(err) } // set members p.initAddress() p.initPort() - p.NodeID = 1 // todo + p.initNodeID() p.initEtcdAddress() p.initMetaRootPath() @@ -68,15 +68,19 @@ func (p *ParamTable) Init() { } func (p *ParamTable) initAddress() { - masterAddress, err := p.Load("master.address") + dataserviceAddress, err := p.Load("dataservice.address") if err != nil { panic(err) } - p.Address = masterAddress + p.Address = dataserviceAddress } func (p *ParamTable) initPort() { - p.Port = p.ParseInt("master.port") + p.Port = p.ParseInt("dataservice.port") +} + +func (p *ParamTable) initNodeID() { + p.NodeID = p.ParseInt64("dataservice.nodeID") } func (p *ParamTable) initEtcdAddress() { @@ -119,46 +123,83 @@ func (p *ParamTable) initKvRootPath() { p.KvRootPath = rootPath + "/" + subPath } func (p *ParamTable) initSegmentSize() { - p.SegmentSize = p.ParseFloat("master.segment.size") + p.SegmentSize = p.ParseFloat("dataservice.segment.size") } func (p *ParamTable) initSegmentSizeFactor() { - p.SegmentSizeFactor = p.ParseFloat("master.segment.sizeFactor") + p.SegmentSizeFactor = p.ParseFloat("dataservice.segment.sizeFactor") } func (p *ParamTable) initDefaultRecordSize() { - p.DefaultRecordSize = p.ParseInt64("master.segment.defaultSizePerRecord") + p.DefaultRecordSize = p.ParseInt64("dataservice.segment.defaultSizePerRecord") } -// TODO read from config/env func (p *ParamTable) initSegIDAssignExpiration() { - p.SegIDAssignExpiration = 3000 //ms + p.SegIDAssignExpiration = p.ParseInt64("dataservice.segment.IDAssignExpiration") //ms } func (p *ParamTable) initInsertChannelPrefixName() { - p.InsertChannelPrefixName = "insert-channel-" + var err error + p.InsertChannelPrefixName, err = p.Load("msgChannel.chanNamePrefix.dataServiceInsertChannel") + if err != nil { + panic(err) + } } func (p *ParamTable) initInsertChannelNumPerCollection() { - p.InsertChannelNumPerCollection = 4 + p.InsertChannelNumPerCollection = p.ParseInt64("dataservice.insertChannelNumPerCollection") } func (p *ParamTable) initStatisticsChannelName() { - p.StatisticsChannelName = "dataservice-statistics-channel" + var err error + p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceStatistic") + if err != nil { + panic(err) + } } func (p *ParamTable) initTimeTickChannelName() { - p.TimeTickChannelName = "dataservice-timetick-channel" + var err error + p.TimeTickChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceTimeTick") + if err != nil { + panic(err) + } } func (p *ParamTable) initDataNodeNum() { - p.DataNodeNum = 2 + p.DataNodeNum = p.ParseInt("dataservice.dataNodeNum") } func (p *ParamTable) initSegmentInfoChannelName() { - p.SegmentInfoChannelName = "segment-info-channel" + var err error + p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo") + if err != nil { + panic(err) + } } func (p *ParamTable) initDataServiceSubscriptionName() { - p.DataServiceSubscriptionName = "dataserive-sub" + var err error + p.DataServiceSubscriptionName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSubNamePrefix") + if err != nil { + panic(err) + } +} + +func (p *ParamTable) initK2SChannelNames() { + prefix, err := p.Load("msgChannel.chanNamePrefix.k2s") + if err != nil { + panic(err) + } + prefix += "-" + iRangeStr, err := p.Load("msgChannel.channelRange.k2s") + if err != nil { + panic(err) + } + channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",") + var ret []string + for _, ID := range channelIDs { + ret = append(ret, prefix+strconv.Itoa(ID)) + } + p.K2SChannelNames = ret } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index e3782b22a..f3c07d99f 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -49,28 +49,31 @@ type ( UniqueID = typeutil.UniqueID Timestamp = typeutil.Timestamp Server struct { - ctx context.Context - serverLoopCtx context.Context - serverLoopCancel context.CancelFunc - serverLoopWg sync.WaitGroup - state internalpb2.StateCode - client *etcdkv.EtcdKV - meta *meta - segAllocator segmentAllocator - statsHandler *statsHandler - insertChannelMgr *insertChannelManager - allocator allocator - cluster *dataNodeCluster - msgProducer *timesync.MsgProducer - registerFinishCh chan struct{} - masterClient *masterservice.GrpcClient - ttMsgStream msgstream.MsgStream - ddChannelName string - segmentInfoStream msgstream.MsgStream + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel context.CancelFunc + serverLoopWg sync.WaitGroup + state internalpb2.StateCode + client *etcdkv.EtcdKV + meta *meta + segAllocator segmentAllocator + statsHandler *statsHandler + insertChannelMgr *insertChannelManager + allocator allocator + cluster *dataNodeCluster + msgProducer *timesync.MsgProducer + registerFinishCh chan struct{} + masterClient *masterservice.GrpcClient + ttMsgStream msgstream.MsgStream + k2sMsgStream msgstream.MsgStream + ddChannelName string + segmentInfoStream msgstream.MsgStream + segmentFlushStream msgstream.MsgStream } ) func CreateServer(ctx context.Context, client *masterservice.GrpcClient) (*Server, error) { + Params.Init() ch := make(chan struct{}) return &Server{ ctx: ctx, @@ -83,32 +86,29 @@ func CreateServer(ctx context.Context, client *masterservice.GrpcClient) (*Serve } func (s *Server) Init() error { - Params.Init() return nil } func (s *Server) Start() error { + var err error s.allocator = newAllocatorImpl(s.masterClient) - if err := s.initMeta(); err != nil { + if err = s.initMeta(); err != nil { return err } s.statsHandler = newStatsHandler(s.meta) - segAllocator, err := newSegmentAllocator(s.meta, s.allocator) + s.segAllocator, err = newSegmentAllocator(s.meta, s.allocator) if err != nil { return err } - s.segAllocator = segAllocator - s.waitDataNodeRegister() - - if err = s.loadMetaFromMaster(); err != nil { + s.initSegmentInfoChannel() + if err = s.initMsgProducer(); err != nil { return err } - if err = s.initMsgProducer(); err != nil { + if err = s.loadMetaFromMaster(); err != nil { return err } - - s.initSegmentInfoChannel() s.startServerLoop() + s.waitDataNodeRegister() s.state = internalpb2.StateCode_HEALTHY log.Println("start success") return nil @@ -128,21 +128,28 @@ func (s *Server) initMeta() error { return nil } -func (s *Server) waitDataNodeRegister() { - log.Println("waiting data node to register") - <-s.registerFinishCh - log.Println("all data nodes register") +func (s *Server) initSegmentInfoChannel() { + segmentInfoStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024) + segmentInfoStream.SetPulsarClient(Params.PulsarAddress) + segmentInfoStream.CreatePulsarProducers([]string{Params.SegmentInfoChannelName}) + s.segmentInfoStream = segmentInfoStream + s.segmentInfoStream.Start() } - func (s *Server) initMsgProducer() error { - ttMsgStream := pulsarms.NewPulsarTtMsgStream(s.ctx, 1024) + ttMsgStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024) ttMsgStream.SetPulsarClient(Params.PulsarAddress) ttMsgStream.CreatePulsarConsumers([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) s.ttMsgStream = ttMsgStream s.ttMsgStream.Start() timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs()) dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster) - producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher) + k2sStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024) + k2sStream.SetPulsarClient(Params.PulsarAddress) + k2sStream.CreatePulsarProducers(Params.K2SChannelNames) + s.k2sMsgStream = k2sStream + s.k2sMsgStream.Start() + k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream) + producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher, k2sMsgWatcher) if err != nil { return err } @@ -151,46 +158,6 @@ func (s *Server) initMsgProducer() error { return nil } -func (s *Server) startServerLoop() { - s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - s.serverLoopWg.Add(1) - go s.startStatsChannel(s.serverLoopCtx) -} - -func (s *Server) startStatsChannel(ctx context.Context) { - defer s.serverLoopWg.Done() - statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024) - statsStream.SetPulsarClient(Params.PulsarAddress) - statsStream.CreatePulsarConsumers([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) - statsStream.Start() - defer statsStream.Close() - for { - select { - case <-ctx.Done(): - return - default: - } - msgPack := statsStream.Consume() - for _, msg := range msgPack.Msgs { - statistics := msg.(*msgstream.SegmentStatisticsMsg) - for _, stat := range statistics.SegStats { - if err := s.statsHandler.HandleSegmentStat(stat); err != nil { - log.Println(err.Error()) - continue - } - } - } - } -} - -func (s *Server) initSegmentInfoChannel() { - segmentInfoStream := pulsarms.NewPulsarMsgStream(s.ctx, 1024) - segmentInfoStream.SetPulsarClient(Params.PulsarAddress) - segmentInfoStream.CreatePulsarProducers([]string{Params.SegmentInfoChannelName}) - s.segmentInfoStream = segmentInfoStream - s.segmentInfoStream.Start() -} - func (s *Server) loadMetaFromMaster() error { log.Println("loading collection meta from master") collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{ @@ -248,9 +215,83 @@ func (s *Server) loadMetaFromMaster() error { log.Println("load collection meta from master complete") return nil } +func (s *Server) startServerLoop() { + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopWg.Add(2) + go s.startStatsChannel(s.serverLoopCtx) + go s.startSegmentFlushChannel(s.serverLoopCtx) +} + +func (s *Server) startStatsChannel(ctx context.Context) { + defer s.serverLoopWg.Done() + statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024) + statsStream.SetPulsarClient(Params.PulsarAddress) + statsStream.CreatePulsarConsumers([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) + statsStream.Start() + defer statsStream.Close() + for { + select { + case <-ctx.Done(): + return + default: + } + msgPack := statsStream.Consume() + for _, msg := range msgPack.Msgs { + statistics := msg.(*msgstream.SegmentStatisticsMsg) + for _, stat := range statistics.SegStats { + if err := s.statsHandler.HandleSegmentStat(stat); err != nil { + log.Println(err.Error()) + continue + } + } + } + } +} + +func (s *Server) startSegmentFlushChannel(ctx context.Context) { + defer s.serverLoopWg.Done() + flushStream := pulsarms.NewPulsarMsgStream(ctx, 1024) + flushStream.SetPulsarClient(Params.PulsarAddress) + flushStream.CreatePulsarConsumers([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) + flushStream.Start() + defer flushStream.Close() + for { + select { + case <-ctx.Done(): + log.Println("segment flush channel shut down") + return + default: + } + msgPack := flushStream.Consume() + for _, msg := range msgPack.Msgs { + if msg.Type() != commonpb.MsgType_kSegmentFlushDone { + continue + } + realMsg := msg.(*msgstream.FlushCompletedMsg) + + segmentInfo, err := s.meta.GetSegment(realMsg.SegmentID) + if err != nil { + log.Println(err.Error()) + continue + } + segmentInfo.FlushedTime = realMsg.BeginTimestamp + if err = s.meta.UpdateSegment(segmentInfo); err != nil { + log.Println(err.Error()) + continue + } + } + } +} + +func (s *Server) waitDataNodeRegister() { + log.Println("waiting data node to register") + <-s.registerFinishCh + log.Println("all data nodes register") +} func (s *Server) Stop() error { s.ttMsgStream.Close() + s.k2sMsgStream.Close() s.msgProducer.Close() s.segmentInfoStream.Close() s.stopServerLoop() @@ -398,6 +439,23 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha if err = s.segAllocator.OpenSegment(segmentInfo); err != nil { return err } + infoMsg := &msgstream.SegmentInfoMsg{ + SegmentMsg: datapb.SegmentMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kSegmentInfo, + MsgID: 0, + Timestamp: 0, // todo + SourceID: 0, + }, + Segment: segmentInfo, + }, + } + msgPack := &pulsarms.MsgPack{ + Msgs: []msgstream.TsMsg{infoMsg}, + } + if err = s.segmentInfoStream.Produce(msgPack); err != nil { + return err + } return nil } @@ -422,7 +480,8 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg resp.CreateTime = segmentInfo.OpenTime resp.SealedTime = segmentInfo.SealedTime resp.FlushedTime = segmentInfo.FlushedTime - // TODO start/end positions + resp.StartPositions = segmentInfo.StartPosition + resp.EndPositions = segmentInfo.EndPosition return resp, nil } diff --git a/internal/dataservice/stats_handler.go b/internal/dataservice/stats_handler.go index 2c1edc95f..68c113b2c 100644 --- a/internal/dataservice/stats_handler.go +++ b/internal/dataservice/stats_handler.go @@ -20,10 +20,25 @@ func (handler *statsHandler) HandleSegmentStat(segStats *internalpb2.SegmentStat return err } - //if segStats.IsNewSegment { - // segMeta.OpenTime = segStats.CreateTime - // segMeta.segStats.StartPositions - //} + if segStats.IsNewSegment { + segMeta.OpenTime = segStats.CreateTime + segMeta.StartPosition = append(segMeta.StartPosition, segStats.StartPositions...) + } + segMeta.SealedTime = segStats.EndTime + for _, pos := range segStats.EndPositions { + isNew := true + for _, epos := range segMeta.EndPosition { + if epos.ChannelName == pos.ChannelName { + epos.Timestamp = pos.Timestamp + epos.MsgID = pos.MsgID + isNew = false + break + } + } + if isNew { + segMeta.EndPosition = append(segMeta.EndPosition, pos) + } + } segMeta.NumRows = segStats.NumRows segMeta.MemSize = segStats.MemorySize diff --git a/internal/distributed/masterservice/client.go b/internal/distributed/masterservice/client.go index 18af4df96..ff81bee94 100644 --- a/internal/distributed/masterservice/client.go +++ b/internal/distributed/masterservice/client.go @@ -5,7 +5,6 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/errors" - cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -57,93 +56,63 @@ func (c *GrpcClient) Stop() error { } func (c *GrpcClient) GetComponentStates() (*internalpb2.ComponentStates, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.GetComponentStatesRPC(ctx, &commonpb.Empty{}) + return c.grpcClient.GetComponentStatesRPC(context.Background(), &commonpb.Empty{}) } //DDL request func (c *GrpcClient) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.CreateCollection(ctx, in) + return c.grpcClient.CreateCollection(context.Background(), in) } func (c *GrpcClient) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.DropCollection(ctx, in) + return c.grpcClient.DropCollection(context.Background(), in) } func (c *GrpcClient) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.HasCollection(ctx, in) + return c.grpcClient.HasCollection(context.Background(), in) } func (c *GrpcClient) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.DescribeCollection(ctx, in) + return c.grpcClient.DescribeCollection(context.Background(), in) } func (c *GrpcClient) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.ShowCollections(ctx, in) + return c.grpcClient.ShowCollections(context.Background(), in) } func (c *GrpcClient) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.CreatePartition(ctx, in) + return c.grpcClient.CreatePartition(context.Background(), in) } func (c *GrpcClient) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.DropPartition(ctx, in) + return c.grpcClient.DropPartition(context.Background(), in) } func (c *GrpcClient) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.HasPartition(ctx, in) + return c.grpcClient.HasPartition(context.Background(), in) } func (c *GrpcClient) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.ShowPartitions(ctx, in) + return c.grpcClient.ShowPartitions(context.Background(), in) } //index builder service func (c *GrpcClient) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.CreateIndex(ctx, in) + return c.grpcClient.CreateIndex(context.Background(), in) } func (c *GrpcClient) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.DescribeIndex(ctx, in) + return c.grpcClient.DescribeIndex(context.Background(), in) } //global timestamp allocator func (c *GrpcClient) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.AllocTimestamp(ctx, in) + return c.grpcClient.AllocTimestamp(context.Background(), in) } func (c *GrpcClient) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.AllocID(ctx, in) + return c.grpcClient.AllocID(context.Background(), in) } //receiver time tick from proxy service, and put it into this channel func (c *GrpcClient) GetTimeTickChannel() (string, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - rsp, err := c.grpcClient.GetTimeTickChannelRPC(ctx, &commonpb.Empty{}) + rsp, err := c.grpcClient.GetTimeTickChannelRPC(context.Background(), &commonpb.Empty{}) if err != nil { return "", err } @@ -155,9 +124,7 @@ func (c *GrpcClient) GetTimeTickChannel() (string, error) { //receive ddl from rpc and time tick from proxy service, and put them into this channel func (c *GrpcClient) GetDdChannel() (string, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - rsp, err := c.grpcClient.GetDdChannelRPC(ctx, &commonpb.Empty{}) + rsp, err := c.grpcClient.GetDdChannelRPC(context.Background(), &commonpb.Empty{}) if err != nil { return "", err } @@ -169,9 +136,7 @@ func (c *GrpcClient) GetDdChannel() (string, error) { //just define a channel, not used currently func (c *GrpcClient) GetStatisticsChannel() (string, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - rsp, err := c.grpcClient.GetStatisticsChannelRPC(ctx, &commonpb.Empty{}) + rsp, err := c.grpcClient.GetStatisticsChannelRPC(context.Background(), &commonpb.Empty{}) if err != nil { return "", err } @@ -182,13 +147,9 @@ func (c *GrpcClient) GetStatisticsChannel() (string, error) { } func (c *GrpcClient) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.DescribeSegment(ctx, in) + return c.grpcClient.DescribeSegment(context.Background(), in) } func (c *GrpcClient) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) - defer cancel() - return c.grpcClient.ShowSegments(ctx, in) + return c.grpcClient.ShowSegments(context.Background(), in) } diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index f2e9514c6..17ecc0acd 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -1,7 +1,6 @@ package masterservice import ( - "context" "fmt" "math/rand" "regexp" @@ -27,7 +26,7 @@ func TestGrpcService(t *testing.T) { //cms.Params.Address = "127.0.0.1" cms.Params.Port = (randVal % 100) + 10000 - svr, err := NewGrpcServer(context.Background()) + svr, err := NewGrpcServer() assert.Nil(t, err) // cms.Params.NodeID = 0 diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index e1731e302..a531f860e 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -6,7 +6,6 @@ import ( "net" "sync" - "github.com/zilliztech/milvus-distributed/internal/errors" cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -27,10 +26,10 @@ type GrpcServer struct { cancel context.CancelFunc } -func NewGrpcServer(ctx context.Context) (*GrpcServer, error) { +func NewGrpcServer() (*GrpcServer, error) { s := &GrpcServer{} var err error - s.ctx, s.cancel = context.WithCancel(ctx) + s.ctx, s.cancel = context.WithCancel(context.Background()) if s.core, err = cms.NewCore(s.ctx); err != nil { return nil, err } @@ -74,30 +73,6 @@ func (s *GrpcServer) Stop() error { return err } -func (s *GrpcServer) SetProxyService(p cms.ProxyServiceInterface) error { - c, ok := s.core.(*cms.Core) - if !ok { - return errors.Errorf("set proxy service failed") - } - return c.SetProxyService(p) -} - -func (s *GrpcServer) SetDataService(p cms.DataServiceInterface) error { - c, ok := s.core.(*cms.Core) - if !ok { - return errors.Errorf("set data service failed") - } - return c.SetDataService(p) -} - -func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error { - c, ok := s.core.(*cms.Core) - if !ok { - return errors.Errorf("set index service failed") - } - return c.SetIndexService(p) -} - func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { return s.core.GetComponentStates() } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index cc4338d4a..5cf7eb90d 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -2,7 +2,6 @@ package masterservice import ( "context" - "fmt" "log" "math/rand" "strconv" @@ -736,13 +735,6 @@ func (c *Core) GetStatisticsChannel() (string, error) { } func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, nil - } t := &CreateCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -766,13 +758,6 @@ func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb } func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, nil - } t := &DropCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -795,16 +780,6 @@ func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Sta } func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - Value: false, - }, nil - } t := &HasCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -834,17 +809,6 @@ func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolR } func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - Schema: nil, - CollectionID: 0, - }, nil - } t := &DescribeCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -872,16 +836,6 @@ func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milv } func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.ShowCollectionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - CollectionNames: nil, - }, nil - } t := &ShowCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -911,13 +865,6 @@ func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.Sh } func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, nil - } t := &CreatePartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -940,13 +887,6 @@ func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.S } func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, nil - } t := &DropPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -969,16 +909,6 @@ func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Statu } func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - Value: false, - }, nil - } t := &HasPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1008,17 +938,6 @@ func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolRes } func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.ShowPartitionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - PartitionNames: nil, - PartitionIDs: nil, - }, nil - } t := &ShowPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1049,13 +968,6 @@ func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.Show } func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, nil - } t := &CreateIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1078,16 +990,6 @@ func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, e } func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - IndexDescriptions: nil, - }, nil - } t := &DescribeIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1118,16 +1020,6 @@ func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.Descr } func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.DescribeSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - IndexID: 0, - }, nil - } t := &DescribeSegmentReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1158,16 +1050,6 @@ func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.D } func (c *Core) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { - code := c.stateCode.Load().(internalpb2.StateCode) - if code != internalpb2.StateCode_HEALTHY { - return &milvuspb.ShowSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), - }, - SegmentIDs: nil, - }, nil - } t := &ShowSegmentReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index b4f8c45f4..61dde4ba0 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -27,8 +27,6 @@ type ParamTable struct { MaxPartitionNum int64 DefaultPartitionName string DefaultIndexName string - - Timeout int } func (p *ParamTable) Init() { @@ -56,8 +54,6 @@ func (p *ParamTable) Init() { p.initMaxPartitionNum() p.initDefaultPartitionName() p.initDefaultIndexName() - - p.initTimeout() } func (p *ParamTable) initAddress() { @@ -167,7 +163,3 @@ func (p *ParamTable) initDefaultIndexName() { } p.DefaultIndexName = name } - -func (p *ParamTable) initTimeout() { - p.Timeout = p.ParseInt("master.timeout") -} diff --git a/internal/masterservice/param_table_test.go b/internal/masterservice/param_table_test.go index af09ab497..2c2b071e7 100644 --- a/internal/masterservice/param_table_test.go +++ b/internal/masterservice/param_table_test.go @@ -50,7 +50,4 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.DefaultIndexName, "") t.Logf("default index name = %s", Params.DefaultIndexName) - - assert.NotZero(t, Params.Timeout) - t.Logf("master timeout = %d", Params.Timeout) } diff --git a/internal/timesync/timetick_watcher.go b/internal/timesync/timetick_watcher.go index 906453917..9ba4c87d2 100644 --- a/internal/timesync/timetick_watcher.go +++ b/internal/timesync/timetick_watcher.go @@ -17,6 +17,14 @@ type MsgTimeTickWatcher struct { msgQueue chan *ms.TimeTickMsg } +func NewMsgTimeTickWatcher(streams ...ms.MsgStream) *MsgTimeTickWatcher { + watcher := &MsgTimeTickWatcher{ + streams: streams, + msgQueue: make(chan *ms.TimeTickMsg), + } + return watcher +} + func (watcher *MsgTimeTickWatcher) Watch(msg *ms.TimeTickMsg) { watcher.msgQueue <- msg } -- GitLab