diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go new file mode 100644 index 0000000000000000000000000000000000000000..9b050f956098fd4ec7616ae7f509c9307f1017ce --- /dev/null +++ b/cmd/masterservice/main.go @@ -0,0 +1,93 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + ds "github.com/zilliztech/milvus-distributed/internal/dataservice" + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" + isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" + msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice" + is "github.com/zilliztech/milvus-distributed/internal/indexservice" + ms "github.com/zilliztech/milvus-distributed/internal/masterservice" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" +) + +const reTryCnt = 3 + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port) + + svr, err := msc.NewGrpcServer(ctx) + if err != nil { + panic(err) + } + + log.Printf("proxy service address : %s", psc.Params.NetworkAddress()) + //proxyService := psc.NewClient(ctx, psc.Params.NetworkAddress()) + + //TODO, test proxy service GetComponentStates, before set + + //if err = svr.SetProxyService(proxyService); err != nil { + // panic(err) + //} + + log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port) + dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) + if err = dataService.Init(); err != nil { + panic(err) + } + if err = dataService.Start(); err != nil { + panic(err) + } + cnt := 0 + for cnt = 0; cnt < reTryCnt; cnt++ { + dsStates, err := dataService.GetComponentStates() + if err != nil { + continue + } + if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + continue + } + if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break + } + if cnt >= reTryCnt { + panic("connect to data service failed") + } + + //if err = svr.SetDataService(dataService); err != nil { + // panic(err) + //} + + log.Printf("index service address : %s", is.Params.Address) + indexService := isc.NewClient(is.Params.Address) + + if err = svr.SetIndexService(indexService); err != nil { + panic(err) + } + + if err = svr.Start(); err != nil { + panic(err) + } + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + sig := <-sc + log.Printf("Got %s signal to exit", sig.String()) + _ = svr.Stop() +} diff --git a/configs/advanced/master.yaml b/configs/advanced/master.yaml index 69474322ae7cca630a3823294effab00227af124..3e1c4e7816b0ed21682547262e140e86557c910a 100644 --- a/configs/advanced/master.yaml +++ b/configs/advanced/master.yaml @@ -23,4 +23,5 @@ master: IDAssignExpiration: 2000 # ms maxPartitionNum: 4096 - nodeID: 100 \ No newline at end of file + nodeID: 100 + timeout: 5 # time out, 5 seconds \ No newline at end of file diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go index 0cad7a46c37e74de7a00aa21fc1bd46060e43723..83400a8ea206af8f18e99faaab15545048f5d81a 100644 --- a/internal/datanode/allocator.go +++ b/internal/datanode/allocator.go @@ -25,7 +25,7 @@ func (alloc *allocatorImpl) allocID() (UniqueID, error) { resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowCollections, - MsgID: 1, // GOOSE TODO add msg id + MsgID: 1, // GOOSE TODO Timestamp: 0, // GOOSE TODO SourceID: Params.NodeID, }, diff --git a/internal/datanode/collection.go b/internal/datanode/collection.go index 3610dc0b81840aa0bf3a0c04e59baae4fcfdca1e..d489eef70513b0affbf743a49ff64fa0a1ea4f51 100644 --- a/internal/datanode/collection.go +++ b/internal/datanode/collection.go @@ -1,9 +1,6 @@ package datanode import ( - "log" - - "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) @@ -24,17 +21,9 @@ func (c *Collection) Schema() *schemapb.CollectionSchema { return c.schema } -func newCollection(collectionID UniqueID, schemaStr string) *Collection { - - var schema schemapb.CollectionSchema - err := proto.UnmarshalText(schemaStr, &schema) - if err != nil { - log.Println(err) - return nil - } - +func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection { var newCollection = &Collection{ - schema: &schema, + schema: schema, id: collectionID, } return newCollection diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index 5412a2ebd4dccaebb390567d00f46bf3d50becf1..0be37979a459840077bc246e1acf4b9a2f7dd2fe 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -6,13 +6,14 @@ import ( "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) type collectionReplica interface { // collection getCollectionNum() int - addCollection(collectionID UniqueID, schemaBlob string) error + addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error removeCollection(collectionID UniqueID) error getCollectionByID(collectionID UniqueID) (*Collection, error) getCollectionByName(collectionName string) (*Collection, error) @@ -162,11 +163,11 @@ func (colReplica *collectionReplicaImpl) getCollectionNum() int { return len(colReplica.collections) } -func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schemaBlob string) error { +func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() - var newCollection = newCollection(collectionID, schemaBlob) + var newCollection = newCollection(collectionID, schema) colReplica.collections = append(colReplica.collections, newCollection) log.Println("Create collection: ", newCollection.Name()) diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go index 25869712d7b64ec5d1f61db9d87a22ce59b7af30..c97d38dbddfbea284393e2a0f7b07fbafea1c106 100644 --- a/internal/datanode/collection_replica_test.go +++ b/internal/datanode/collection_replica_test.go @@ -3,7 +3,6 @@ package datanode import ( "testing" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -23,10 +22,7 @@ func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName Factory := &MetaFactory{} collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) - schemaBlob := proto.MarshalTextString(collectionMeta.Schema) - require.NotEqual(t, "", schemaBlob) - - var err = replica.addCollection(collectionMeta.ID, schemaBlob) + var err = replica.addCollection(collectionMeta.ID, collectionMeta.Schema) require.NoError(t, err) collection, err := replica.getCollectionByName(collectionName) diff --git a/internal/datanode/collection_test.go b/internal/datanode/collection_test.go index cce9bc1dcdd3e28abd1d0cf60ce2eeedef6afbcc..6f12bfd0bbec3c7ef8b1ecd26b13bc1c0a4ae797 100644 --- a/internal/datanode/collection_test.go +++ b/internal/datanode/collection_test.go @@ -3,7 +3,6 @@ package datanode import ( "testing" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" ) @@ -13,10 +12,7 @@ func TestCollection_newCollection(t *testing.T) { Factory := &MetaFactory{} collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) - schemaBlob := proto.MarshalTextString(collectionMeta.Schema) - assert.NotEqual(t, "", schemaBlob) - - collection := newCollection(collectionMeta.ID, schemaBlob) + collection := newCollection(collectionMeta.ID, collectionMeta.Schema) assert.Equal(t, collection.Name(), collectionName) assert.Equal(t, collection.ID(), collectionID) } @@ -27,10 +23,7 @@ func TestCollection_deleteCollection(t *testing.T) { Factory := &MetaFactory{} collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) - schemaBlob := proto.MarshalTextString(collectionMeta.Schema) - assert.NotEqual(t, "", schemaBlob) - - collection := newCollection(collectionMeta.ID, schemaBlob) + collection := newCollection(collectionMeta.ID, collectionMeta.Schema) assert.Equal(t, collection.Name(), collectionName) assert.Equal(t, collection.ID(), collectionID) } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 4be2c78d99e97c877a9424d9e1d6e325398801bc..ced35ae9dc6dea42b40a9a53c3e8cbfbcf29a6ce 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -26,8 +26,8 @@ const ( type ( Inteface interface { typeutil.Service + typeutil.Component - GetComponentStates() (*internalpb2.ComponentStates, error) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) } @@ -43,6 +43,7 @@ type ( } DataNode struct { + // GOOSE TODO: complete interface with component ctx context.Context NodeID UniqueID Role string @@ -124,7 +125,7 @@ func (node *DataNode) Init() error { chanSize := 100 flushChan := make(chan *flushMsg, chanSize) node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc) - node.metaService = newMetaService(node.ctx, replica) + node.metaService = newMetaService(node.ctx, replica, node.masterService) node.replica = replica // Opentracing @@ -154,7 +155,7 @@ func (node *DataNode) Init() error { func (node *DataNode) Start() error { go node.dataSyncService.start() - node.metaService.start() + node.metaService.init() return nil } diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 0813dde84bb73afdf6116475917b6ce35fa7e55a..3b40ea1a203eb60a2790dfd5b034080ceaeeecc5 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -42,7 +41,7 @@ func TestDataSyncService_Start(t *testing.T) { replica := newReplica() allocFactory := AllocatorFactory{} sync := newDataSyncService(ctx, flushChan, replica, allocFactory) - sync.replica.addCollection(collMeta.ID, proto.MarshalTextString(collMeta.Schema)) + sync.replica.addCollection(collMeta.ID, collMeta.Schema) go sync.start() // test data generate diff --git a/internal/datanode/factory.go b/internal/datanode/factory.go index d8a462a9e83952b546536129a206da0bb4ef9a69..bb87ca008b4b45062b01c0f766778f351645a760 100644 --- a/internal/datanode/factory.go +++ b/internal/datanode/factory.go @@ -3,6 +3,8 @@ package datanode import ( "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) @@ -15,6 +17,12 @@ type ( AllocatorFactory struct { } + + MasterServiceFactory struct { + ID UniqueID + collectionName string + collectionID UniqueID + } ) func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta { @@ -156,3 +164,42 @@ func (alloc AllocatorFactory) allocID() (UniqueID, error) { // GOOSE TODO: random ID generate return UniqueID(0), nil } + +func (m *MasterServiceFactory) setID(id UniqueID) { + m.ID = id // GOOSE TODO: random ID generator +} + +func (m *MasterServiceFactory) setCollectionID(id UniqueID) { + m.collectionID = id +} + +func (m *MasterServiceFactory) setCollectionName(name string) { + m.collectionName = name +} + +func (m *MasterServiceFactory) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { + resp := &masterpb.IDResponse{ + Status: &commonpb.Status{}, + ID: m.ID, + } + return resp, nil +} + +func (m *MasterServiceFactory) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { + resp := &milvuspb.ShowCollectionResponse{ + Status: &commonpb.Status{}, + CollectionNames: []string{m.collectionName}, + } + return resp, nil + +} +func (m *MasterServiceFactory) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + f := MetaFactory{} + meta := f.CollectionMetaFactory(m.collectionID, m.collectionName) + resp := &milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{}, + CollectionID: m.collectionID, + Schema: meta.Schema, + } + return resp, nil +} diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index d7432ae161998fff449a26b08416d77d8d6bae16..45676f19b64e82145e328b4176bf847dbc4d7d16 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -224,9 +224,8 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { return } - schemaStr := proto.MarshalTextString(&schema) // add collection - err = ddNode.replica.addCollection(collectionID, schemaStr) + err = ddNode.replica.addCollection(collectionID, &schema) if err != nil { log.Println(err) return diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 78b08901e2830c8fbc22e88dda1d4aacb6c8f584..82f0ef69e9c6bf0c103141ab12ac6cfa0d79ed9c 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/require" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -39,11 +38,9 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { Factory := &MetaFactory{} collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1") - schemaBlob := proto.MarshalTextString(collMeta.Schema) - require.NotEqual(t, "", schemaBlob) replica := newReplica() - err = replica.addCollection(collMeta.ID, schemaBlob) + err = replica.addCollection(collMeta.ID, collMeta.Schema) require.NoError(t, err) // Params.FlushInsertBufSize = 2 diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go index fa92d59fbcfaecc4e807c8bdb945c2e99adedf53..c259af622f0cad06998241cd3e44ad7c4473b1ad 100644 --- a/internal/datanode/meta_service.go +++ b/internal/datanode/meta_service.go @@ -4,73 +4,91 @@ import ( "context" "fmt" "log" - "path" "reflect" - "strings" - "time" - "github.com/golang/protobuf/proto" - "go.etcd.io/etcd/clientv3" - - etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" ) type metaService struct { - ctx context.Context - kvBase *etcdkv.EtcdKV - replica collectionReplica + ctx context.Context + replica collectionReplica + masterClient MasterServiceInterface } -func newMetaService(ctx context.Context, replica collectionReplica) *metaService { - ETCDAddr := Params.EtcdAddress - MetaRootPath := Params.MetaRootPath - - cli, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{ETCDAddr}, - DialTimeout: 5 * time.Second, - }) - +func newMetaService(ctx context.Context, replica collectionReplica, m MasterServiceInterface) *metaService { return &metaService{ - ctx: ctx, - kvBase: etcdkv.NewEtcdKV(cli, MetaRootPath), - replica: replica, + ctx: ctx, + replica: replica, + masterClient: m, } } -func (mService *metaService) start() { - // init from meta +func (mService *metaService) init() { err := mService.loadCollections() if err != nil { - log.Fatal("metaService loadCollections failed") + log.Fatal("metaService init failed:", err) } } -func GetCollectionObjID(key string) string { - ETCDRootPath := Params.MetaRootPath +func (mService *metaService) loadCollections() error { + names, err := mService.getCollectionNames() + if err != nil { + return err + } - prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" - return strings.TrimPrefix(key, prefix) + for _, name := range names { + err := mService.createCollection(name) + if err != nil { + return err + } + } + return nil } -func isCollectionObj(key string) bool { - ETCDRootPath := Params.MetaRootPath - - prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/" - prefix = strings.TrimSpace(prefix) - index := strings.Index(key, prefix) +func (mService *metaService) getCollectionNames() ([]string, error) { + req := &milvuspb.ShowCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kShowCollections, + MsgID: 0, //GOOSE TODO + Timestamp: 0, // GOOSE TODO + SourceID: Params.NodeID, + }, + DbName: "default", // GOOSE TODO + } - return index == 0 + response, err := mService.masterClient.ShowCollections(req) + if err != nil { + return nil, errors.Errorf("Get collection names from master service wrong: %v", err) + } + return response.GetCollectionNames(), nil } -func isSegmentObj(key string) bool { - ETCDRootPath := Params.MetaRootPath +func (mService *metaService) createCollection(name string) error { + req := &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kDescribeCollection, + MsgID: 0, //GOOSE TODO + Timestamp: 0, // GOOSE TODO + SourceID: Params.NodeID, + }, + DbName: "default", // GOOSE TODO + CollectionName: name, + } - prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/" - prefix = strings.TrimSpace(prefix) - index := strings.Index(key, prefix) + response, err := mService.masterClient.DescribeCollection(req) + if err != nil { + return errors.Errorf("Describe collection %v from master service wrong: %v", name, err) + } - return index == 0 + err = mService.replica.addCollection(response.GetCollectionID(), response.GetSchema()) + if err != nil { + return errors.Errorf("Add collection %v into collReplica wrong: %v", name, err) + } + + return nil } func printCollectionStruct(obj *etcdpb.CollectionMeta) { @@ -85,51 +103,3 @@ func printCollectionStruct(obj *etcdpb.CollectionMeta) { fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface()) } } - -func (mService *metaService) processCollectionCreate(id string, value string) { - //println(fmt.Sprintf("Create Collection:$%s$", id)) - - col := mService.collectionUnmarshal(value) - if col != nil { - schema := col.Schema - schemaBlob := proto.MarshalTextString(schema) - err := mService.replica.addCollection(col.ID, schemaBlob) - if err != nil { - log.Println(err) - } - } -} - -func (mService *metaService) loadCollections() error { - keys, values, err := mService.kvBase.LoadWithPrefix(CollectionPrefix) - if err != nil { - return err - } - - for i := range keys { - objID := GetCollectionObjID(keys[i]) - mService.processCollectionCreate(objID, values[i]) - } - - return nil -} - -//----------------------------------------------------------------------- Unmarshal and Marshal -func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionMeta { - col := etcdpb.CollectionMeta{} - err := proto.UnmarshalText(value, &col) - if err != nil { - log.Println(err) - return nil - } - return &col -} - -func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) string { - value := proto.MarshalTextString(col) - if value == "" { - log.Println("marshal collection failed") - return "" - } - return value -} diff --git a/internal/datanode/meta_service_test.go b/internal/datanode/meta_service_test.go index 3154ecebd0ebfd1f7b62bc46c1d0c08ff6605fd1..23d97085b5e2ced21dfe0cfab0fb29f12680e405 100644 --- a/internal/datanode/meta_service_test.go +++ b/internal/datanode/meta_service_test.go @@ -7,94 +7,46 @@ import ( "github.com/stretchr/testify/assert" ) -func TestMetaService_start(t *testing.T) { +func TestMetaService_All(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() replica := newReplica() + mFactory := &MasterServiceFactory{} + mFactory.setCollectionID(0) + mFactory.setCollectionName("a-collection") + metaService := newMetaService(ctx, replica, mFactory) + + t.Run("Test getCollectionNames", func(t *testing.T) { + names, err := metaService.getCollectionNames() + assert.NoError(t, err) + assert.Equal(t, 1, len(names)) + assert.Equal(t, "a-collection", names[0]) + }) + + t.Run("Test createCollection", func(t *testing.T) { + hasColletion := metaService.replica.hasCollection(0) + assert.False(t, hasColletion) + + err := metaService.createCollection("a-collection") + assert.NoError(t, err) + hasColletion = metaService.replica.hasCollection(0) + assert.True(t, hasColletion) + }) + + t.Run("Test loadCollections", func(t *testing.T) { + hasColletion := metaService.replica.hasCollection(1) + assert.False(t, hasColletion) + + mFactory.setCollectionID(1) + mFactory.setCollectionName("a-collection-1") + err := metaService.loadCollections() + assert.NoError(t, err) + + hasColletion = metaService.replica.hasCollection(1) + assert.True(t, hasColletion) + hasColletion = metaService.replica.hasCollection(0) + assert.True(t, hasColletion) + }) - metaService := newMetaService(ctx, replica) - - metaService.start() -} - -func TestMetaService_getCollectionObjId(t *testing.T) { - var key = "/collection/collection0" - var collectionObjID1 = GetCollectionObjID(key) - - assert.Equal(t, collectionObjID1, "/collection/collection0") - - key = "fakeKey" - var collectionObjID2 = GetCollectionObjID(key) - - assert.Equal(t, collectionObjID2, "fakeKey") -} - -func TestMetaService_isCollectionObj(t *testing.T) { - var key = Params.MetaRootPath + "/collection/collection0" - var b1 = isCollectionObj(key) - - assert.Equal(t, b1, true) - - key = Params.MetaRootPath + "/segment/segment0" - var b2 = isCollectionObj(key) - - assert.Equal(t, b2, false) -} - -func TestMetaService_processCollectionCreate(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - replica := newReplica() - metaService := newMetaService(ctx, replica) - defer cancel() - id := "0" - value := `schema: < - name: "test" - fields: < - fieldID:100 - name: "vec" - data_type: VECTOR_FLOAT - type_params: < - key: "dim" - value: "16" - > - index_params: < - key: "metric_type" - value: "L2" - > - > - fields: < - fieldID:101 - name: "age" - data_type: INT32 - type_params: < - key: "dim" - value: "1" - > - > - > - segmentIDs: 0 - partition_tags: "default" - ` - - metaService.processCollectionCreate(id, value) - - collectionNum := replica.getCollectionNum() - assert.Equal(t, collectionNum, 1) - - collection, err := replica.getCollectionByName("test") - assert.NoError(t, err) - assert.Equal(t, collection.ID(), UniqueID(0)) -} - -func TestMetaService_loadCollections(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - replica := newReplica() - - metaService := newMetaService(ctx, replica) - - err2 := (*metaService).loadCollections() - assert.Nil(t, err2) } diff --git a/internal/distributed/masterservice/client.go b/internal/distributed/masterservice/client.go index ff81bee94ff9bd88435ba40eea11262ded94463c..18af4df96bdc664045287e0ebc526c63a16fdd9e 100644 --- a/internal/distributed/masterservice/client.go +++ b/internal/distributed/masterservice/client.go @@ -5,6 +5,7 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/errors" + cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -56,63 +57,93 @@ func (c *GrpcClient) Stop() error { } func (c *GrpcClient) GetComponentStates() (*internalpb2.ComponentStates, error) { - return c.grpcClient.GetComponentStatesRPC(context.Background(), &commonpb.Empty{}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.GetComponentStatesRPC(ctx, &commonpb.Empty{}) } //DDL request func (c *GrpcClient) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - return c.grpcClient.CreateCollection(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.CreateCollection(ctx, in) } func (c *GrpcClient) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - return c.grpcClient.DropCollection(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.DropCollection(ctx, in) } func (c *GrpcClient) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - return c.grpcClient.HasCollection(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.HasCollection(ctx, in) } func (c *GrpcClient) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - return c.grpcClient.DescribeCollection(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.DescribeCollection(ctx, in) } func (c *GrpcClient) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { - return c.grpcClient.ShowCollections(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.ShowCollections(ctx, in) } func (c *GrpcClient) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - return c.grpcClient.CreatePartition(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.CreatePartition(ctx, in) } func (c *GrpcClient) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - return c.grpcClient.DropPartition(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.DropPartition(ctx, in) } func (c *GrpcClient) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - return c.grpcClient.HasPartition(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.HasPartition(ctx, in) } func (c *GrpcClient) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - return c.grpcClient.ShowPartitions(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.ShowPartitions(ctx, in) } //index builder service func (c *GrpcClient) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - return c.grpcClient.CreateIndex(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.CreateIndex(ctx, in) } func (c *GrpcClient) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - return c.grpcClient.DescribeIndex(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.DescribeIndex(ctx, in) } //global timestamp allocator func (c *GrpcClient) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) { - return c.grpcClient.AllocTimestamp(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.AllocTimestamp(ctx, in) } func (c *GrpcClient) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { - return c.grpcClient.AllocID(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.AllocID(ctx, in) } //receiver time tick from proxy service, and put it into this channel func (c *GrpcClient) GetTimeTickChannel() (string, error) { - rsp, err := c.grpcClient.GetTimeTickChannelRPC(context.Background(), &commonpb.Empty{}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + rsp, err := c.grpcClient.GetTimeTickChannelRPC(ctx, &commonpb.Empty{}) if err != nil { return "", err } @@ -124,7 +155,9 @@ func (c *GrpcClient) GetTimeTickChannel() (string, error) { //receive ddl from rpc and time tick from proxy service, and put them into this channel func (c *GrpcClient) GetDdChannel() (string, error) { - rsp, err := c.grpcClient.GetDdChannelRPC(context.Background(), &commonpb.Empty{}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + rsp, err := c.grpcClient.GetDdChannelRPC(ctx, &commonpb.Empty{}) if err != nil { return "", err } @@ -136,7 +169,9 @@ func (c *GrpcClient) GetDdChannel() (string, error) { //just define a channel, not used currently func (c *GrpcClient) GetStatisticsChannel() (string, error) { - rsp, err := c.grpcClient.GetStatisticsChannelRPC(context.Background(), &commonpb.Empty{}) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + rsp, err := c.grpcClient.GetStatisticsChannelRPC(ctx, &commonpb.Empty{}) if err != nil { return "", err } @@ -147,9 +182,13 @@ func (c *GrpcClient) GetStatisticsChannel() (string, error) { } func (c *GrpcClient) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - return c.grpcClient.DescribeSegment(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.DescribeSegment(ctx, in) } func (c *GrpcClient) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { - return c.grpcClient.ShowSegments(context.Background(), in) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout)) + defer cancel() + return c.grpcClient.ShowSegments(ctx, in) } diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 17ecc0acd6108427b41dbc7abc1d49c68ca06023..f2e9514c698696e0248e459e5db311cfcf9718d7 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -1,6 +1,7 @@ package masterservice import ( + "context" "fmt" "math/rand" "regexp" @@ -26,7 +27,7 @@ func TestGrpcService(t *testing.T) { //cms.Params.Address = "127.0.0.1" cms.Params.Port = (randVal % 100) + 10000 - svr, err := NewGrpcServer() + svr, err := NewGrpcServer(context.Background()) assert.Nil(t, err) // cms.Params.NodeID = 0 diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index a531f860e39464689843a8d5bb2d7165e556d9e5..e1731e30255f62891d4e593bdd18a89384c09ca0 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -6,6 +6,7 @@ import ( "net" "sync" + "github.com/zilliztech/milvus-distributed/internal/errors" cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -26,10 +27,10 @@ type GrpcServer struct { cancel context.CancelFunc } -func NewGrpcServer() (*GrpcServer, error) { +func NewGrpcServer(ctx context.Context) (*GrpcServer, error) { s := &GrpcServer{} var err error - s.ctx, s.cancel = context.WithCancel(context.Background()) + s.ctx, s.cancel = context.WithCancel(ctx) if s.core, err = cms.NewCore(s.ctx); err != nil { return nil, err } @@ -73,6 +74,30 @@ func (s *GrpcServer) Stop() error { return err } +func (s *GrpcServer) SetProxyService(p cms.ProxyServiceInterface) error { + c, ok := s.core.(*cms.Core) + if !ok { + return errors.Errorf("set proxy service failed") + } + return c.SetProxyService(p) +} + +func (s *GrpcServer) SetDataService(p cms.DataServiceInterface) error { + c, ok := s.core.(*cms.Core) + if !ok { + return errors.Errorf("set data service failed") + } + return c.SetDataService(p) +} + +func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error { + c, ok := s.core.(*cms.Core) + if !ok { + return errors.Errorf("set index service failed") + } + return c.SetIndexService(p) +} + func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { return s.core.GetComponentStates() } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 5cf7eb90dffe5f83664924cc2bff04064cdbe170..cc4338d4ab5d0ed3457f15fc96ae881bf39b436a 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -2,6 +2,7 @@ package masterservice import ( "context" + "fmt" "log" "math/rand" "strconv" @@ -735,6 +736,13 @@ func (c *Core) GetStatisticsChannel() (string, error) { } func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, nil + } t := &CreateCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -758,6 +766,13 @@ func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb } func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, nil + } t := &DropCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -780,6 +795,16 @@ func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Sta } func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + Value: false, + }, nil + } t := &HasCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -809,6 +834,17 @@ func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolR } func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + Schema: nil, + CollectionID: 0, + }, nil + } t := &DescribeCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -836,6 +872,16 @@ func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milv } func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.ShowCollectionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + CollectionNames: nil, + }, nil + } t := &ShowCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -865,6 +911,13 @@ func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.Sh } func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, nil + } t := &CreatePartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -887,6 +940,13 @@ func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.S } func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, nil + } t := &DropPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -909,6 +969,16 @@ func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Statu } func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + Value: false, + }, nil + } t := &HasPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -938,6 +1008,17 @@ func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolRes } func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.ShowPartitionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + PartitionNames: nil, + PartitionIDs: nil, + }, nil + } t := &ShowPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -968,6 +1049,13 @@ func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.Show } func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, nil + } t := &CreateIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -990,6 +1078,16 @@ func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, e } func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.DescribeIndexResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + IndexDescriptions: nil, + }, nil + } t := &DescribeIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1020,6 +1118,16 @@ func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.Descr } func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.DescribeSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + IndexID: 0, + }, nil + } t := &DescribeSegmentReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1050,6 +1158,16 @@ func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.D } func (c *Core) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { + code := c.stateCode.Load().(internalpb2.StateCode) + if code != internalpb2.StateCode_HEALTHY { + return &milvuspb.ShowSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), + }, + SegmentIDs: nil, + }, nil + } t := &ShowSegmentReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index 61dde4ba0958e846a48c56d0ed6df0c0d9a193e2..b4f8c45f4b0d69cfef4cf0065eabdf5a8b302e39 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -27,6 +27,8 @@ type ParamTable struct { MaxPartitionNum int64 DefaultPartitionName string DefaultIndexName string + + Timeout int } func (p *ParamTable) Init() { @@ -54,6 +56,8 @@ func (p *ParamTable) Init() { p.initMaxPartitionNum() p.initDefaultPartitionName() p.initDefaultIndexName() + + p.initTimeout() } func (p *ParamTable) initAddress() { @@ -163,3 +167,7 @@ func (p *ParamTable) initDefaultIndexName() { } p.DefaultIndexName = name } + +func (p *ParamTable) initTimeout() { + p.Timeout = p.ParseInt("master.timeout") +} diff --git a/internal/masterservice/param_table_test.go b/internal/masterservice/param_table_test.go index 2c2b071e7c35202240bedb45202a66fdf36b3590..af09ab497770a5fe4fc99adbd01f6661a440dc1f 100644 --- a/internal/masterservice/param_table_test.go +++ b/internal/masterservice/param_table_test.go @@ -50,4 +50,7 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.DefaultIndexName, "") t.Logf("default index name = %s", Params.DefaultIndexName) + + assert.NotZero(t, Params.Timeout) + t.Logf("master timeout = %d", Params.Timeout) }