diff --git a/internal/master/client.go b/internal/master/client.go index 6fc9138dec5c228ad64a6cd21f1c2560aa0de3c5..34d7e73731be39dd80f9dd0b913144f84d927425 100644 --- a/internal/master/client.go +++ b/internal/master/client.go @@ -27,7 +27,7 @@ func (m *MockWriteNodeClient) FlushSegment(segmentID UniqueID) error { func (m *MockWriteNodeClient) DescribeSegment(segmentID UniqueID) (*writerclient.SegmentDescription, error) { now := time.Now() - if now.Sub(m.flushTime).Seconds() > 3 { + if now.Sub(m.flushTime).Seconds() > 2 { return &writerclient.SegmentDescription{ SegmentID: segmentID, IsClosed: true, @@ -67,7 +67,7 @@ func (m *MockBuildIndexClient) BuildIndexWithoutID(columnDataPaths []string, typ func (m *MockBuildIndexClient) DescribeIndex(indexID UniqueID) (*buildindexclient.IndexDescription, error) { now := time.Now() - if now.Sub(m.buildTime).Seconds() > 3 { + if now.Sub(m.buildTime).Seconds() > 2 { return &buildindexclient.IndexDescription{ ID: 1, Status: indexbuilderpb.IndexStatus_FINISHED, diff --git a/internal/master/collection_task_test.go b/internal/master/collection_task_test.go deleted file mode 100644 index df986afe0f25021fcc452d200185317803ce5a61..0000000000000000000000000000000000000000 --- a/internal/master/collection_task_test.go +++ /dev/null @@ -1,373 +0,0 @@ -package master - -import ( - "context" - "log" - "testing" - - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.etcd.io/etcd/clientv3" - "google.golang.org/grpc" -) - -func TestMaster_CollectionTask(t *testing.T) { - Init() - refreshMasterAddress() - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) - assert.Nil(t, err) - _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) - assert.Nil(t, err) - - Params = ParamTable{ - Address: Params.Address, - Port: Params.Port, - - EtcdAddress: Params.EtcdAddress, - MetaRootPath: "/test/root/meta", - KvRootPath: "/test/root/kv", - PulsarAddress: Params.PulsarAddress, - - ProxyIDList: []typeutil.UniqueID{1, 2}, - WriteNodeIDList: []typeutil.UniqueID{3, 4}, - - TopicNum: 5, - QueryNodeNum: 3, - SoftTimeTickBarrierInterval: 300, - - // segment - SegmentSize: 536870912 / 1024 / 1024, - SegmentSizeFactor: 0.75, - DefaultRecordSize: 1024, - MinSegIDAssignCnt: 1048576 / 1024, - MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt, - SegIDAssignExpiration: 2000, - - // msgChannel - ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"}, - WriteNodeTimeTickChannelNames: []string{"write3", "write4"}, - DDChannelNames: []string{"dd1", "dd2"}, - InsertChannelNames: []string{"dm0", "dm1"}, - K2SChannelNames: []string{"k2s0", "k2s1"}, - QueryNodeStatsChannelName: "statistic", - MsgChannelSubName: Params.MsgChannelSubName, - } - - svr, err := CreateServer(ctx) - assert.Nil(t, err) - err = svr.Run(int64(Params.Port)) - assert.Nil(t, err) - - conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock()) - assert.Nil(t, err) - defer conn.Close() - - cli := masterpb.NewMasterClient(conn) - sch := schemapb.CollectionSchema{ - Name: "col1", - Description: "test collection", - AutoID: false, - Fields: []*schemapb.FieldSchema{ - { - Name: "col1_f1", - Description: "test collection filed 1", - DataType: schemapb.DataType_VECTOR_FLOAT, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f1_tk1", - Value: "col1_f1_tv1", - }, - { - Key: "col1_f1_tk2", - Value: "col1_f1_tv2", - }, - }, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f1_ik1", - Value: "col1_f1_iv1", - }, - { - Key: "col1_f1_ik2", - Value: "col1_f1_iv2", - }, - }, - }, - { - Name: "col1_f2", - Description: "test collection filed 2", - DataType: schemapb.DataType_VECTOR_BINARY, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f2_tk1", - Value: "col1_f2_tv1", - }, - { - Key: "col1_f2_tk2", - Value: "col1_f2_tv2", - }, - }, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f2_ik1", - Value: "col1_f2_iv1", - }, - { - Key: "col1_f2_ik2", - Value: "col1_f2_iv2", - }, - }, - }, - }, - } - schemaBytes, err := proto.Marshal(&sch) - assert.Nil(t, err) - - req := internalpb.CreateCollectionRequest{ - MsgType: internalpb.MsgType_kCreateCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - Schema: &commonpb.Blob{Value: schemaBytes}, - } - log.Printf("... [Create] collection col1\n") - st, err := cli.CreateCollection(ctx, &req) - assert.Nil(t, err) - assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - - // HasCollection - reqHasCollection := internalpb.HasCollectionRequest{ - MsgType: internalpb.MsgType_kHasCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - CollectionName: &servicepb.CollectionName{ - CollectionName: "col1", - }, - } - - // "col1" is true - log.Printf("... [Has] collection col1\n") - boolResp, err := cli.HasCollection(ctx, &reqHasCollection) - assert.Nil(t, err) - assert.Equal(t, true, boolResp.Value) - assert.Equal(t, boolResp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) - - // "colNotExist" is false - reqHasCollection.CollectionName.CollectionName = "colNotExist" - boolResp, err = cli.HasCollection(ctx, &reqHasCollection) - assert.Nil(t, err) - assert.Equal(t, boolResp.Value, false) - assert.Equal(t, boolResp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) - - // error - reqHasCollection.Timestamp = Timestamp(10) - reqHasCollection.CollectionName.CollectionName = "col1" - boolResp, err = cli.HasCollection(ctx, &reqHasCollection) - assert.Nil(t, err) - assert.NotEqual(t, boolResp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) - - // ShowCollection - reqShowCollection := internalpb.ShowCollectionRequest{ - MsgType: internalpb.MsgType_kShowCollections, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - } - - listResp, err := cli.ShowCollections(ctx, &reqShowCollection) - assert.Nil(t, err) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode) - assert.Equal(t, 1, len(listResp.Values)) - assert.Equal(t, "col1", listResp.Values[0]) - - reqShowCollection.Timestamp = Timestamp(10) - listResp, err = cli.ShowCollections(ctx, &reqShowCollection) - assert.Nil(t, err) - assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode) - - // CreateCollection Test - collMeta, err := svr.metaTable.GetCollectionByName(sch.Name) - assert.Nil(t, err) - t.Logf("collection id = %d", collMeta.ID) - assert.Equal(t, collMeta.CreateTime, uint64(11)) - assert.Equal(t, collMeta.Schema.Name, "col1") - assert.Equal(t, collMeta.Schema.AutoID, false) - assert.Equal(t, len(collMeta.Schema.Fields), 2) - assert.Equal(t, collMeta.Schema.Fields[0].Name, "col1_f1") - assert.Equal(t, collMeta.Schema.Fields[1].Name, "col1_f2") - assert.Equal(t, collMeta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT) - assert.Equal(t, collMeta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY) - assert.Equal(t, len(collMeta.Schema.Fields[0].TypeParams), 2) - assert.Equal(t, len(collMeta.Schema.Fields[0].IndexParams), 2) - assert.Equal(t, len(collMeta.Schema.Fields[1].TypeParams), 2) - assert.Equal(t, len(collMeta.Schema.Fields[1].IndexParams), 2) - assert.Equal(t, int64(100), collMeta.Schema.Fields[0].FieldID) - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Key, "col1_f1_tk1") - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Key, "col1_f1_tk2") - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Value, "col1_f1_tv1") - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Value, "col1_f1_tv2") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Key, "col1_f1_ik1") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Key, "col1_f1_ik2") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Value, "col1_f1_iv1") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Value, "col1_f1_iv2") - - assert.Equal(t, int64(101), collMeta.Schema.Fields[1].FieldID) - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Key, "col1_f2_tk1") - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Key, "col1_f2_tk2") - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Value, "col1_f2_tv1") - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Value, "col1_f2_tv2") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Key, "col1_f2_ik1") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Key, "col1_f2_ik2") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Value, "col1_f2_iv1") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Value, "col1_f2_iv2") - - req.Timestamp = Timestamp(10) - st, err = cli.CreateCollection(ctx, &req) - assert.Nil(t, err) - assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - - // DescribeCollection Test - reqDescribe := &internalpb.DescribeCollectionRequest{ - MsgType: internalpb.MsgType_kDescribeCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - CollectionName: &servicepb.CollectionName{ - CollectionName: "col1", - }, - } - des, err := cli.DescribeCollection(ctx, reqDescribe) - assert.Nil(t, err) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, des.Status.ErrorCode) - - assert.Equal(t, "col1", des.Schema.Name) - assert.Equal(t, false, des.Schema.AutoID) - assert.Equal(t, 2, len(des.Schema.Fields)) - assert.Equal(t, "col1_f1", des.Schema.Fields[0].Name) - assert.Equal(t, "col1_f2", des.Schema.Fields[1].Name) - assert.Equal(t, schemapb.DataType_VECTOR_FLOAT, des.Schema.Fields[0].DataType) - assert.Equal(t, schemapb.DataType_VECTOR_BINARY, des.Schema.Fields[1].DataType) - assert.Equal(t, 2, len(des.Schema.Fields[0].TypeParams)) - assert.Equal(t, 2, len(des.Schema.Fields[0].IndexParams)) - assert.Equal(t, 2, len(des.Schema.Fields[1].TypeParams)) - assert.Equal(t, 2, len(des.Schema.Fields[1].IndexParams)) - assert.Equal(t, int64(100), des.Schema.Fields[0].FieldID) - assert.Equal(t, "col1_f1_tk1", des.Schema.Fields[0].TypeParams[0].Key) - assert.Equal(t, "col1_f1_tv1", des.Schema.Fields[0].TypeParams[0].Value) - assert.Equal(t, "col1_f1_ik1", des.Schema.Fields[0].IndexParams[0].Key) - assert.Equal(t, "col1_f1_iv1", des.Schema.Fields[0].IndexParams[0].Value) - assert.Equal(t, "col1_f1_tk2", des.Schema.Fields[0].TypeParams[1].Key) - assert.Equal(t, "col1_f1_tv2", des.Schema.Fields[0].TypeParams[1].Value) - assert.Equal(t, "col1_f1_ik2", des.Schema.Fields[0].IndexParams[1].Key) - assert.Equal(t, "col1_f1_iv2", des.Schema.Fields[0].IndexParams[1].Value) - - assert.Equal(t, int64(101), des.Schema.Fields[1].FieldID) - assert.Equal(t, "col1_f2_tk1", des.Schema.Fields[1].TypeParams[0].Key) - assert.Equal(t, "col1_f2_tv1", des.Schema.Fields[1].TypeParams[0].Value) - assert.Equal(t, "col1_f2_ik1", des.Schema.Fields[1].IndexParams[0].Key) - assert.Equal(t, "col1_f2_iv1", des.Schema.Fields[1].IndexParams[0].Value) - assert.Equal(t, "col1_f2_tk2", des.Schema.Fields[1].TypeParams[1].Key) - assert.Equal(t, "col1_f2_tv2", des.Schema.Fields[1].TypeParams[1].Value) - assert.Equal(t, "col1_f2_ik2", des.Schema.Fields[1].IndexParams[1].Key) - assert.Equal(t, "col1_f2_iv2", des.Schema.Fields[1].IndexParams[1].Value) - - reqDescribe.CollectionName.CollectionName = "colNotExist" - des, err = cli.DescribeCollection(ctx, reqDescribe) - assert.Nil(t, err) - assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, des.Status.ErrorCode) - log.Printf(des.Status.Reason) - - reqDescribe.CollectionName.CollectionName = "col1" - reqDescribe.Timestamp = Timestamp(10) - des, err = cli.DescribeCollection(ctx, reqDescribe) - assert.Nil(t, err) - assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, des.Status.ErrorCode) - log.Printf(des.Status.Reason) - - // ------------------------------DropCollectionTask--------------------------- - log.Printf("... [Drop] collection col1\n") - ser := servicepb.CollectionName{CollectionName: "col1"} - - reqDrop := internalpb.DropCollectionRequest{ - MsgType: internalpb.MsgType_kDropCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - CollectionName: &ser, - } - - // DropCollection - st, err = cli.DropCollection(ctx, &reqDrop) - assert.Nil(t, err) - assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - - collMeta, err = svr.metaTable.GetCollectionByName(sch.Name) - assert.NotNil(t, err) - - // HasCollection "col1" is false - reqHasCollection.Timestamp = Timestamp(11) - reqHasCollection.CollectionName.CollectionName = "col1" - boolResp, err = cli.HasCollection(ctx, &reqHasCollection) - assert.Nil(t, err) - assert.Equal(t, false, boolResp.Value) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, boolResp.Status.ErrorCode) - - // ShowCollections - reqShowCollection.Timestamp = Timestamp(11) - listResp, err = cli.ShowCollections(ctx, &reqShowCollection) - assert.Nil(t, err) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode) - assert.Equal(t, 0, len(listResp.Values)) - - // Drop again - st, err = cli.DropCollection(ctx, &reqDrop) - assert.Nil(t, err) - assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - - // Create "col1" - req.Timestamp = Timestamp(11) - st, err = cli.CreateCollection(ctx, &req) - assert.Nil(t, err) - assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - - boolResp, err = cli.HasCollection(ctx, &reqHasCollection) - assert.Nil(t, err) - assert.Equal(t, true, boolResp.Value) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, boolResp.Status.ErrorCode) - - // Create "col2" - sch.Name = "col2" - schemaBytes, err = proto.Marshal(&sch) - assert.Nil(t, err) - - req = internalpb.CreateCollectionRequest{ - MsgType: internalpb.MsgType_kCreateCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - Schema: &commonpb.Blob{Value: schemaBytes}, - } - st, err = cli.CreateCollection(ctx, &req) - assert.Nil(t, err) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) - - // Show Collections - listResp, err = cli.ShowCollections(ctx, &reqShowCollection) - assert.Nil(t, err) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode) - assert.Equal(t, 2, len(listResp.Values)) - assert.ElementsMatch(t, []string{"col1", "col2"}, listResp.Values) - - svr.Close() -} diff --git a/internal/master/config_task_test.go b/internal/master/config_task_test.go deleted file mode 100644 index 5de544c156108d64868e398dc85f784d80fd1cb6..0000000000000000000000000000000000000000 --- a/internal/master/config_task_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package master - -import ( - "context" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.etcd.io/etcd/clientv3" - "google.golang.org/grpc" -) - -func TestMaster_ConfigTask(t *testing.T) { - Init() - refreshMasterAddress() - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) - require.Nil(t, err) - _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) - require.Nil(t, err) - - Params = ParamTable{ - Address: Params.Address, - Port: Params.Port, - - EtcdAddress: Params.EtcdAddress, - MetaRootPath: "/test/root", - PulsarAddress: Params.PulsarAddress, - - ProxyIDList: []typeutil.UniqueID{1, 2}, - WriteNodeIDList: []typeutil.UniqueID{3, 4}, - - TopicNum: 5, - QueryNodeNum: 3, - SoftTimeTickBarrierInterval: 300, - - // segment - SegmentSize: 536870912 / 1024 / 1024, - SegmentSizeFactor: 0.75, - DefaultRecordSize: 1024, - MinSegIDAssignCnt: 1048576 / 1024, - MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt, - SegIDAssignExpiration: 2000, - - // msgChannel - ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"}, - WriteNodeTimeTickChannelNames: []string{"write3", "write4"}, - InsertChannelNames: []string{"dm0", "dm1"}, - K2SChannelNames: []string{"k2s0", "k2s1"}, - QueryNodeStatsChannelName: "statistic", - MsgChannelSubName: Params.MsgChannelSubName, - } - - svr, err := CreateServer(ctx) - require.Nil(t, err) - err = svr.Run(int64(Params.Port)) - defer svr.Close() - require.Nil(t, err) - - conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock()) - require.Nil(t, err) - defer conn.Close() - - cli := masterpb.NewMasterClient(conn) - testKeys := []string{ - "/etcd/address", - "/master/port", - "/master/proxyidlist", - "/master/segmentthresholdfactor", - "/pulsar/token", - "/reader/stopflag", - "/proxy/timezone", - "/proxy/network/address", - "/proxy/storage/path", - "/storage/accesskey", - } - - testVals := []string{ - "localhost", - "53100", - "[1 2]", - "0.75", - "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY", - "-1", - "UTC+8", - "0.0.0.0", - "/var/lib/milvus", - "", - } - - sc := SysConfig{kv: svr.kvBase} - sc.InitFromFile(".") - - configRequest := &internalpb.SysConfigRequest{ - MsgType: internalpb.MsgType_kGetSysConfigs, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - Keys: testKeys, - KeyPrefixes: []string{}, - } - - response, err := cli.GetSysConfigs(ctx, configRequest) - assert.Nil(t, err) - assert.ElementsMatch(t, testKeys, response.Keys) - assert.ElementsMatch(t, testVals, response.Values) - assert.Equal(t, len(response.GetKeys()), len(response.GetValues())) - - configRequest = &internalpb.SysConfigRequest{ - MsgType: internalpb.MsgType_kGetSysConfigs, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - Keys: []string{}, - KeyPrefixes: []string{"/master"}, - } - - response, err = cli.GetSysConfigs(ctx, configRequest) - assert.Nil(t, err) - for i := range response.GetKeys() { - assert.True(t, strings.HasPrefix(response.GetKeys()[i], "/master")) - } - assert.Equal(t, len(response.GetKeys()), len(response.GetValues())) - - t.Run("Test duplicate keys and key prefix", func(t *testing.T) { - configRequest.Keys = []string{} - configRequest.KeyPrefixes = []string{"/master"} - - resp, err := cli.GetSysConfigs(ctx, configRequest) - require.Nil(t, err) - assert.Equal(t, len(resp.GetKeys()), len(resp.GetValues())) - assert.NotEqual(t, 0, len(resp.GetKeys())) - - configRequest.Keys = []string{"/master/port"} - configRequest.KeyPrefixes = []string{"/master"} - - respDup, err := cli.GetSysConfigs(ctx, configRequest) - require.Nil(t, err) - assert.Equal(t, len(respDup.GetKeys()), len(respDup.GetValues())) - assert.NotEqual(t, 0, len(respDup.GetKeys())) - assert.Equal(t, len(respDup.GetKeys()), len(resp.GetKeys())) - }) - -} diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go index e1a61285af0075155447967452f13286215c5b72..8216ec99958a40a823a356cfe16f59164f91eaf4 100644 --- a/internal/master/grpc_service.go +++ b/internal/master/grpc_service.go @@ -443,8 +443,39 @@ func (s *Master) AssignSegmentID(ctx context.Context, request *internalpb.Assign }, nil } -func (s *Master) CreateIndex(context.Context, *internalpb.CreateIndexRequest) (*commonpb.Status, error) { - return nil, nil +func (s *Master) CreateIndex(ctx context.Context, req *internalpb.CreateIndexRequest) (*commonpb.Status, error) { + task := &createIndexTask{ + baseTask: baseTask{ + sch: s.scheduler, + mt: s.metaTable, + cv: make(chan error), + }, + req: req, + indexBuildScheduler: s.indexBuildSch, + indexLoadScheduler: s.indexLoadSch, + segManager: s.segmentManager, + } + + err := s.scheduler.Enqueue(task) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "Enqueue failed: " + err.Error(), + }, nil + } + + err = task.WaitToFinish(ctx) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "Create Index error: " + err.Error(), + }, nil + } + + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + Reason: "", + }, nil } func (s *Master) DescribeIndex(context.Context, *internalpb.DescribeIndexRequest) (*servicepb.DescribeIndexResponse, error) { diff --git a/internal/master/grpc_service_test.go b/internal/master/grpc_service_test.go deleted file mode 100644 index ec7f2ce9fe0ff6e00391245d85736b828e2b7057..0000000000000000000000000000000000000000 --- a/internal/master/grpc_service_test.go +++ /dev/null @@ -1,184 +0,0 @@ -package master - -import ( - "context" - "testing" - - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.etcd.io/etcd/clientv3" - "google.golang.org/grpc" -) - -func TestMaster_CreateCollection(t *testing.T) { - Init() - refreshMasterAddress() - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - etcdAddr := Params.EtcdAddress - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) - assert.Nil(t, err) - _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) - assert.Nil(t, err) - - Params = ParamTable{ - Address: Params.Address, - Port: Params.Port, - - EtcdAddress: Params.EtcdAddress, - MetaRootPath: "/test/root/meta", - KvRootPath: "/test/root/kv", - PulsarAddress: Params.PulsarAddress, - - ProxyIDList: []typeutil.UniqueID{1, 2}, - WriteNodeIDList: []typeutil.UniqueID{3, 4}, - - TopicNum: 5, - QueryNodeNum: 3, - SoftTimeTickBarrierInterval: 300, - - // segment - SegmentSize: 536870912 / 1024 / 1024, - SegmentSizeFactor: 0.75, - DefaultRecordSize: 1024, - MinSegIDAssignCnt: 1048576 / 1024, - MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt, - SegIDAssignExpiration: 2000, - - // msgChannel - ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"}, - WriteNodeTimeTickChannelNames: []string{"write3", "write4"}, - InsertChannelNames: []string{"dm0", "dm1"}, - K2SChannelNames: []string{"k2s0", "k2s1"}, - QueryNodeStatsChannelName: "statistic", - MsgChannelSubName: Params.MsgChannelSubName, - } - - svr, err := CreateServer(ctx) - assert.Nil(t, err) - err = svr.Run(int64(Params.Port)) - assert.Nil(t, err) - - conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock()) - assert.Nil(t, err) - defer conn.Close() - - cli := masterpb.NewMasterClient(conn) - sch := schemapb.CollectionSchema{ - Name: "col1", - Description: "test collection", - AutoID: false, - Fields: []*schemapb.FieldSchema{ - { - Name: "col1_f1", - Description: "test collection filed 1", - DataType: schemapb.DataType_VECTOR_FLOAT, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f1_tk1", - Value: "col1_f1_tv1", - }, - { - Key: "col1_f1_tk2", - Value: "col1_f1_tv2", - }, - }, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f1_ik1", - Value: "col1_f1_iv1", - }, - { - Key: "col1_f1_ik2", - Value: "col1_f1_iv2", - }, - }, - }, - { - Name: "col1_f2", - Description: "test collection filed 2", - DataType: schemapb.DataType_VECTOR_BINARY, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f2_tk1", - Value: "col1_f2_tv1", - }, - { - Key: "col1_f2_tk2", - Value: "col1_f2_tv2", - }, - }, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f2_ik1", - Value: "col1_f2_iv1", - }, - { - Key: "col1_f2_ik2", - Value: "col1_f2_iv2", - }, - }, - }, - }, - } - schemaBytes, err := proto.Marshal(&sch) - assert.Nil(t, err) - - req := internalpb.CreateCollectionRequest{ - MsgType: internalpb.MsgType_kCreateCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - Schema: &commonpb.Blob{Value: schemaBytes}, - } - st, err := cli.CreateCollection(ctx, &req) - assert.Nil(t, err) - assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - - collMeta, err := svr.metaTable.GetCollectionByName(sch.Name) - assert.Nil(t, err) - t.Logf("collection id = %d", collMeta.ID) - assert.Equal(t, collMeta.CreateTime, uint64(11)) - assert.Equal(t, collMeta.Schema.Name, "col1") - assert.Equal(t, collMeta.Schema.AutoID, false) - assert.Equal(t, len(collMeta.Schema.Fields), 2) - assert.Equal(t, collMeta.Schema.Fields[0].Name, "col1_f1") - assert.Equal(t, collMeta.Schema.Fields[1].Name, "col1_f2") - assert.Equal(t, collMeta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT) - assert.Equal(t, collMeta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY) - assert.Equal(t, len(collMeta.Schema.Fields[0].TypeParams), 2) - assert.Equal(t, len(collMeta.Schema.Fields[0].IndexParams), 2) - assert.Equal(t, len(collMeta.Schema.Fields[1].TypeParams), 2) - assert.Equal(t, len(collMeta.Schema.Fields[1].IndexParams), 2) - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Key, "col1_f1_tk1") - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Key, "col1_f1_tk2") - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Value, "col1_f1_tv1") - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Value, "col1_f1_tv2") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Key, "col1_f1_ik1") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Key, "col1_f1_ik2") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Value, "col1_f1_iv1") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Value, "col1_f1_iv2") - - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Key, "col1_f2_tk1") - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Key, "col1_f2_tk2") - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Value, "col1_f2_tv1") - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Value, "col1_f2_tv2") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Key, "col1_f2_ik1") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Key, "col1_f2_ik2") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Value, "col1_f2_iv1") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Value, "col1_f2_iv2") - - req.Timestamp = Timestamp(10) - st, err = cli.CreateCollection(ctx, &req) - assert.Nil(t, err) - assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - - svr.Close() -} diff --git a/internal/master/index_task.go b/internal/master/index_task.go new file mode 100644 index 0000000000000000000000000000000000000000..bb39fb986d8145f1ab06b2ab92e304b540dd8587 --- /dev/null +++ b/internal/master/index_task.go @@ -0,0 +1,95 @@ +package master + +import ( + "fmt" + + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +type createIndexTask struct { + baseTask + req *internalpb.CreateIndexRequest + indexBuildScheduler *IndexBuildScheduler + indexLoadScheduler *IndexLoadScheduler + segManager *SegmentManager +} + +func (task *createIndexTask) Type() internalpb.MsgType { + return internalpb.MsgType_kCreateIndex +} + +func (task *createIndexTask) Ts() (Timestamp, error) { + return task.req.Timestamp, nil +} + +func (task *createIndexTask) Execute() error { + // modify schema + if err := task.mt.UpdateFieldIndexParams(task.req.CollectionName, task.req.FieldName, task.req.ExtraParams); err != nil { + return err + } + // check if closed segment has the same index build history + collMeta, err := task.mt.GetCollectionByName(task.req.CollectionName) + if err != nil { + return err + } + var fieldID int64 = -1 + for _, fieldSchema := range collMeta.Schema.Fields { + if fieldSchema.Name == task.req.FieldName { + fieldID = fieldSchema.FieldID + break + } + } + if fieldID == -1 { + return fmt.Errorf("can not find field name %s", task.req.FieldName) + } + + for _, segID := range collMeta.SegmentIDs { + segMeta, err := task.mt.GetSegmentByID(segID) + if err != nil { + return err + } + if segMeta.CloseTime == 0 { + continue + } + hasIndexMeta, err := task.mt.HasFieldIndexMeta(segID, fieldID, task.req.ExtraParams) + if err != nil { + return err + } + + if hasIndexMeta { + // load index + indexMeta, err := task.mt.GetFieldIndexMeta(segID, fieldID, task.req.ExtraParams) + if err != nil { + return err + } + err = task.indexLoadScheduler.Enqueue(&IndexLoadInfo{ + segmentID: segID, + fieldID: fieldID, + fieldName: task.req.FieldName, + indexFilePaths: indexMeta.IndexFilePaths, + }) + if err != nil { + return err + } + } else { + // create index + for _, kv := range segMeta.BinlogFilePaths { + if kv.FieldID != fieldID { + continue + } + err := task.indexBuildScheduler.Enqueue(&IndexBuildInfo{ + segmentID: segID, + fieldID: fieldID, + binlogFilePath: kv.BinlogFiles, + }) + if err != nil { + return err + } + break + } + } + } + + // close unfilled segment + return task.segManager.ForceClose(collMeta.ID) +} diff --git a/internal/master/master_test.go b/internal/master/master_test.go index 632e45deaf88e2de6b9bcf43b747227cb0e3cda1..c163a6849fd9fd5cd2df53d6d6a957a40dc476e2 100644 --- a/internal/master/master_test.go +++ b/internal/master/master_test.go @@ -2,11 +2,16 @@ package master import ( "context" + "fmt" "log" "math/rand" "os" "strconv" + "strings" "testing" + "time" + + "github.com/stretchr/testify/require" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" @@ -14,7 +19,7 @@ import ( "github.com/stretchr/testify/assert" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" @@ -70,7 +75,7 @@ func receiveTimeTickMsg(stream *ms.MsgStream) bool { func getTimeTickMsgPack(ttmsgs [][2]uint64) *ms.MsgPack { msgPack := ms.MsgPack{} for _, vi := range ttmsgs { - msgPack.Msgs = append(msgPack.Msgs, getTtMsg(internalPb.MsgType_kTimeTick, UniqueID(vi[0]), Timestamp(vi[1]))) + msgPack.Msgs = append(msgPack.Msgs, getTtMsg(internalpb.MsgType_kTimeTick, UniqueID(vi[0]), Timestamp(vi[1]))) } return &msgPack } @@ -103,194 +108,1021 @@ func TestMaster(t *testing.T) { log.Fatal("run server failed", zap.Error(err)) } - proxyTimeTickStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream - proxyTimeTickStream.SetPulsarClient(pulsarAddr) - proxyTimeTickStream.CreatePulsarProducers(Params.ProxyTimeTickChannelNames) - proxyTimeTickStream.Start() - - writeNodeStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream - writeNodeStream.SetPulsarClient(pulsarAddr) - writeNodeStream.CreatePulsarProducers(Params.WriteNodeTimeTickChannelNames) - writeNodeStream.Start() - - ddMs := ms.NewPulsarMsgStream(ctx, 1024) - ddMs.SetPulsarClient(pulsarAddr) - ddMs.CreatePulsarConsumers(Params.DDChannelNames, "DDStream", ms.NewUnmarshalDispatcher(), 1024) - ddMs.Start() - - dMMs := ms.NewPulsarMsgStream(ctx, 1024) - dMMs.SetPulsarClient(pulsarAddr) - dMMs.CreatePulsarConsumers(Params.InsertChannelNames, "DMStream", ms.NewUnmarshalDispatcher(), 1024) - dMMs.Start() - - k2sMs := ms.NewPulsarMsgStream(ctx, 1024) - k2sMs.SetPulsarClient(pulsarAddr) - k2sMs.CreatePulsarConsumers(Params.K2SChannelNames, "K2SStream", ms.NewUnmarshalDispatcher(), 1024) - k2sMs.Start() - - ttsoftmsgs := [][2]uint64{ - {0, 10}, - } - msgSoftPackAddr := getTimeTickMsgPack(ttsoftmsgs) + conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock()) + require.Nil(t, err) + cli := masterpb.NewMasterClient(conn) - proxyTimeTickStream.Produce(msgSoftPackAddr) - var dMMsgstream ms.MsgStream = dMMs - assert.True(t, receiveTimeTickMsg(&dMMsgstream)) - var ddMsgstream ms.MsgStream = ddMs - assert.True(t, receiveTimeTickMsg(&ddMsgstream)) + t.Run("TestConfigTask", func(t *testing.T) { + testKeys := []string{ + "/etcd/address", + "/master/port", + "/master/proxyidlist", + "/master/segmentthresholdfactor", + "/pulsar/token", + "/reader/stopflag", + "/proxy/timezone", + "/proxy/network/address", + "/proxy/storage/path", + "/storage/accesskey", + } - tthardmsgs := [][2]int{ - {3, 10}, - } + testVals := []string{ + "localhost", + "53100", + "[1 2]", + "0.75", + "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY", + "-1", + "UTC+8", + "0.0.0.0", + "/var/lib/milvus", + "", + } - msghardPackAddr := getMsgPack(tthardmsgs) - writeNodeStream.Produce(msghardPackAddr) - var k2sMsgstream ms.MsgStream = k2sMs - assert.True(t, receiveTimeTickMsg(&k2sMsgstream)) + sc := SysConfig{kv: svr.kvBase} + sc.InitFromFile(".") - conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock()) - assert.Nil(t, err) - defer conn.Close() + configRequest := &internalpb.SysConfigRequest{ + MsgType: internalpb.MsgType_kGetSysConfigs, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + Keys: testKeys, + KeyPrefixes: []string{}, + } - cli := masterpb.NewMasterClient(conn) + response, err := cli.GetSysConfigs(ctx, configRequest) + assert.Nil(t, err) + assert.ElementsMatch(t, testKeys, response.Keys) + assert.ElementsMatch(t, testVals, response.Values) + assert.Equal(t, len(response.GetKeys()), len(response.GetValues())) - sch := schemapb.CollectionSchema{ - Name: "name" + strconv.FormatUint(rand.Uint64(), 10), - Description: "test collection", - AutoID: false, - Fields: []*schemapb.FieldSchema{}, - } + configRequest = &internalpb.SysConfigRequest{ + MsgType: internalpb.MsgType_kGetSysConfigs, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + Keys: []string{}, + KeyPrefixes: []string{"/master"}, + } - schemaBytes, err := proto.Marshal(&sch) - assert.Nil(t, err) + response, err = cli.GetSysConfigs(ctx, configRequest) + assert.Nil(t, err) + for i := range response.GetKeys() { + assert.True(t, strings.HasPrefix(response.GetKeys()[i], "/master")) + } + assert.Equal(t, len(response.GetKeys()), len(response.GetValues())) - createCollectionReq := internalPb.CreateCollectionRequest{ - MsgType: internalPb.MsgType_kCreateCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - Schema: &commonpb.Blob{Value: schemaBytes}, - } - st, err := cli.CreateCollection(ctx, &createCollectionReq) - assert.Nil(t, err) - assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + }) - var consumeMsg ms.MsgStream = ddMs - var createCollectionMsg *ms.CreateCollectionMsg - for { - result := consumeMsg.Consume() - if len(result.Msgs) > 0 { - msgs := result.Msgs - for _, v := range msgs { - createCollectionMsg = v.(*ms.CreateCollectionMsg) + t.Run("TestConfigDuplicateKeysAndKeyPrefix", func(t *testing.T) { + configRequest := &internalpb.SysConfigRequest{} + configRequest.Keys = []string{} + configRequest.KeyPrefixes = []string{"/master"} + + configRequest.Timestamp = uint64(time.Now().Unix()) + resp, err := cli.GetSysConfigs(ctx, configRequest) + require.Nil(t, err) + assert.Equal(t, len(resp.GetKeys()), len(resp.GetValues())) + assert.NotEqual(t, 0, len(resp.GetKeys())) + + configRequest.Keys = []string{"/master/port"} + configRequest.KeyPrefixes = []string{"/master"} + + configRequest.Timestamp = uint64(time.Now().Unix()) + respDup, err := cli.GetSysConfigs(ctx, configRequest) + require.Nil(t, err) + assert.Equal(t, len(respDup.GetKeys()), len(respDup.GetValues())) + assert.NotEqual(t, 0, len(respDup.GetKeys())) + assert.Equal(t, len(respDup.GetKeys()), len(resp.GetKeys())) + }) + + t.Run("TestCollectionTask", func(t *testing.T) { + fmt.Println("point 3") + sch := schemapb.CollectionSchema{ + Name: "col1", + Description: "test collection", + AutoID: false, + Fields: []*schemapb.FieldSchema{ + { + Name: "col1_f1", + Description: "test collection filed 1", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "col1_f1_tk1", + Value: "col1_f1_tv1", + }, + { + Key: "col1_f1_tk2", + Value: "col1_f1_tv2", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "col1_f1_ik1", + Value: "col1_f1_iv1", + }, + { + Key: "col1_f1_ik2", + Value: "col1_f1_iv2", + }, + }, + }, + { + Name: "col1_f2", + Description: "test collection filed 2", + DataType: schemapb.DataType_VECTOR_BINARY, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "col1_f2_tk1", + Value: "col1_f2_tv1", + }, + { + Key: "col1_f2_tk2", + Value: "col1_f2_tv2", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "col1_f2_ik1", + Value: "col1_f2_iv1", + }, + { + Key: "col1_f2_ik2", + Value: "col1_f2_iv2", + }, + }, + }, + }, + } + schemaBytes, err := proto.Marshal(&sch) + assert.Nil(t, err) + + createCollectionReq := internalpb.CreateCollectionRequest{ + MsgType: internalpb.MsgType_kCreateCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + Schema: &commonpb.Blob{Value: schemaBytes}, + } + log.Printf("... [Create] collection col1\n") + st, err := cli.CreateCollection(ctx, &createCollectionReq) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + // HasCollection + reqHasCollection := internalpb.HasCollectionRequest{ + MsgType: internalpb.MsgType_kHasCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + CollectionName: &servicepb.CollectionName{ + CollectionName: "col1", + }, + } + + // "col1" is true + log.Printf("... [Has] collection col1\n") + boolResp, err := cli.HasCollection(ctx, &reqHasCollection) + assert.Nil(t, err) + assert.Equal(t, true, boolResp.Value) + assert.Equal(t, boolResp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) + + // "colNotExist" is false + reqHasCollection.CollectionName.CollectionName = "colNotExist" + boolResp, err = cli.HasCollection(ctx, &reqHasCollection) + assert.Nil(t, err) + assert.Equal(t, boolResp.Value, false) + assert.Equal(t, boolResp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) + + // error + reqHasCollection.Timestamp = Timestamp(0) + reqHasCollection.CollectionName.CollectionName = "col1" + boolResp, err = cli.HasCollection(ctx, &reqHasCollection) + assert.Nil(t, err) + assert.NotEqual(t, boolResp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS) + + // ShowCollection + reqShowCollection := internalpb.ShowCollectionRequest{ + MsgType: internalpb.MsgType_kShowCollections, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + } + + listResp, err := cli.ShowCollections(ctx, &reqShowCollection) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode) + assert.Equal(t, 1, len(listResp.Values)) + assert.Equal(t, "col1", listResp.Values[0]) + + reqShowCollection.Timestamp = Timestamp(0) + listResp, err = cli.ShowCollections(ctx, &reqShowCollection) + assert.Nil(t, err) + assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode) + + // CreateCollection Test + collMeta, err := svr.metaTable.GetCollectionByName(sch.Name) + assert.Nil(t, err) + t.Logf("collection id = %d", collMeta.ID) + assert.Equal(t, collMeta.Schema.Name, "col1") + assert.Equal(t, collMeta.Schema.AutoID, false) + assert.Equal(t, len(collMeta.Schema.Fields), 2) + assert.Equal(t, collMeta.Schema.Fields[0].Name, "col1_f1") + assert.Equal(t, collMeta.Schema.Fields[1].Name, "col1_f2") + assert.Equal(t, collMeta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT) + assert.Equal(t, collMeta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY) + assert.Equal(t, len(collMeta.Schema.Fields[0].TypeParams), 2) + assert.Equal(t, len(collMeta.Schema.Fields[0].IndexParams), 2) + assert.Equal(t, len(collMeta.Schema.Fields[1].TypeParams), 2) + assert.Equal(t, len(collMeta.Schema.Fields[1].IndexParams), 2) + assert.Equal(t, int64(100), collMeta.Schema.Fields[0].FieldID) + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Key, "col1_f1_tk1") + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Key, "col1_f1_tk2") + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Value, "col1_f1_tv1") + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Value, "col1_f1_tv2") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Key, "col1_f1_ik1") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Key, "col1_f1_ik2") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Value, "col1_f1_iv1") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Value, "col1_f1_iv2") + + assert.Equal(t, int64(101), collMeta.Schema.Fields[1].FieldID) + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Key, "col1_f2_tk1") + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Key, "col1_f2_tk2") + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Value, "col1_f2_tv1") + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Value, "col1_f2_tv2") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Key, "col1_f2_ik1") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Key, "col1_f2_ik2") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Value, "col1_f2_iv1") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Value, "col1_f2_iv2") + + createCollectionReq.Timestamp = Timestamp(0) + st, err = cli.CreateCollection(ctx, &createCollectionReq) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + // DescribeCollection Test + reqDescribe := &internalpb.DescribeCollectionRequest{ + MsgType: internalpb.MsgType_kDescribeCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + CollectionName: &servicepb.CollectionName{ + CollectionName: "col1", + }, + } + des, err := cli.DescribeCollection(ctx, reqDescribe) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, des.Status.ErrorCode) + + assert.Equal(t, "col1", des.Schema.Name) + assert.Equal(t, false, des.Schema.AutoID) + assert.Equal(t, 2, len(des.Schema.Fields)) + assert.Equal(t, "col1_f1", des.Schema.Fields[0].Name) + assert.Equal(t, "col1_f2", des.Schema.Fields[1].Name) + assert.Equal(t, schemapb.DataType_VECTOR_FLOAT, des.Schema.Fields[0].DataType) + assert.Equal(t, schemapb.DataType_VECTOR_BINARY, des.Schema.Fields[1].DataType) + assert.Equal(t, 2, len(des.Schema.Fields[0].TypeParams)) + assert.Equal(t, 2, len(des.Schema.Fields[0].IndexParams)) + assert.Equal(t, 2, len(des.Schema.Fields[1].TypeParams)) + assert.Equal(t, 2, len(des.Schema.Fields[1].IndexParams)) + assert.Equal(t, int64(100), des.Schema.Fields[0].FieldID) + assert.Equal(t, "col1_f1_tk1", des.Schema.Fields[0].TypeParams[0].Key) + assert.Equal(t, "col1_f1_tv1", des.Schema.Fields[0].TypeParams[0].Value) + assert.Equal(t, "col1_f1_ik1", des.Schema.Fields[0].IndexParams[0].Key) + assert.Equal(t, "col1_f1_iv1", des.Schema.Fields[0].IndexParams[0].Value) + assert.Equal(t, "col1_f1_tk2", des.Schema.Fields[0].TypeParams[1].Key) + assert.Equal(t, "col1_f1_tv2", des.Schema.Fields[0].TypeParams[1].Value) + assert.Equal(t, "col1_f1_ik2", des.Schema.Fields[0].IndexParams[1].Key) + assert.Equal(t, "col1_f1_iv2", des.Schema.Fields[0].IndexParams[1].Value) + + assert.Equal(t, int64(101), des.Schema.Fields[1].FieldID) + assert.Equal(t, "col1_f2_tk1", des.Schema.Fields[1].TypeParams[0].Key) + assert.Equal(t, "col1_f2_tv1", des.Schema.Fields[1].TypeParams[0].Value) + assert.Equal(t, "col1_f2_ik1", des.Schema.Fields[1].IndexParams[0].Key) + assert.Equal(t, "col1_f2_iv1", des.Schema.Fields[1].IndexParams[0].Value) + assert.Equal(t, "col1_f2_tk2", des.Schema.Fields[1].TypeParams[1].Key) + assert.Equal(t, "col1_f2_tv2", des.Schema.Fields[1].TypeParams[1].Value) + assert.Equal(t, "col1_f2_ik2", des.Schema.Fields[1].IndexParams[1].Key) + assert.Equal(t, "col1_f2_iv2", des.Schema.Fields[1].IndexParams[1].Value) + + reqDescribe.CollectionName.CollectionName = "colNotExist" + des, err = cli.DescribeCollection(ctx, reqDescribe) + assert.Nil(t, err) + assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, des.Status.ErrorCode) + log.Printf(des.Status.Reason) + + reqDescribe.CollectionName.CollectionName = "col1" + reqDescribe.Timestamp = Timestamp(0) + des, err = cli.DescribeCollection(ctx, reqDescribe) + assert.Nil(t, err) + assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, des.Status.ErrorCode) + log.Printf(des.Status.Reason) + + // ------------------------------DropCollectionTask--------------------------- + log.Printf("... [Drop] collection col1\n") + ser := servicepb.CollectionName{CollectionName: "col1"} + + reqDrop := internalpb.DropCollectionRequest{ + MsgType: internalpb.MsgType_kDropCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + CollectionName: &ser, + } + + // DropCollection + st, err = cli.DropCollection(ctx, &reqDrop) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + collMeta, err = svr.metaTable.GetCollectionByName(sch.Name) + assert.NotNil(t, err) + + // HasCollection "col1" is false + reqHasCollection.Timestamp = uint64(time.Now().Unix()) + reqHasCollection.CollectionName.CollectionName = "col1" + boolResp, err = cli.HasCollection(ctx, &reqHasCollection) + assert.Nil(t, err) + assert.Equal(t, false, boolResp.Value) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, boolResp.Status.ErrorCode) + + // ShowCollections + reqShowCollection.Timestamp = uint64(time.Now().Unix()) + listResp, err = cli.ShowCollections(ctx, &reqShowCollection) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode) + assert.Equal(t, 0, len(listResp.Values)) + + // Drop again + st, err = cli.DropCollection(ctx, &reqDrop) + assert.Nil(t, err) + assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + // Create "col1" + createCollectionReq.Timestamp = uint64(time.Now().Unix()) + + st, err = cli.CreateCollection(ctx, &createCollectionReq) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + reqHasCollection.Timestamp = uint64(time.Now().Unix()) + boolResp, err = cli.HasCollection(ctx, &reqHasCollection) + assert.Nil(t, err) + assert.Equal(t, true, boolResp.Value) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, boolResp.Status.ErrorCode) + + // Create "col2" + sch.Name = "col2" + schemaBytes, err = proto.Marshal(&sch) + assert.Nil(t, err) + + createCollectionReq = internalpb.CreateCollectionRequest{ + MsgType: internalpb.MsgType_kCreateCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + Schema: &commonpb.Blob{Value: schemaBytes}, + } + st, err = cli.CreateCollection(ctx, &createCollectionReq) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) + + // Show Collections + reqShowCollection.Timestamp = uint64(time.Now().Unix()) + listResp, err = cli.ShowCollections(ctx, &reqShowCollection) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode) + assert.Equal(t, 2, len(listResp.Values)) + assert.ElementsMatch(t, []string{"col1", "col2"}, listResp.Values) + + // Drop Collection + ser = servicepb.CollectionName{CollectionName: "col1"} + + reqDrop = internalpb.DropCollectionRequest{ + MsgType: internalpb.MsgType_kDropCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + CollectionName: &ser, + } + + // DropCollection + st, err = cli.DropCollection(ctx, &reqDrop) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + ser = servicepb.CollectionName{CollectionName: "col2"} + reqDrop = internalpb.DropCollectionRequest{ + MsgType: internalpb.MsgType_kDropCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + CollectionName: &ser, + } + + // DropCollection + st, err = cli.DropCollection(ctx, &reqDrop) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + //consume msg + ddMs := ms.NewPulsarMsgStream(ctx, 1024) + ddMs.SetPulsarClient(pulsarAddr) + ddMs.CreatePulsarConsumers(Params.DDChannelNames, "DDStream", ms.NewUnmarshalDispatcher(), 1024) + ddMs.Start() + + var consumeMsg ms.MsgStream = ddMs + for { + result := consumeMsg.Consume() + if len(result.Msgs) > 0 { + break } - break } - } - assert.Equal(t, createCollectionReq.MsgType, createCollectionMsg.CreateCollectionRequest.MsgType) - assert.Equal(t, createCollectionReq.ReqID, createCollectionMsg.CreateCollectionRequest.ReqID) - assert.Equal(t, createCollectionReq.Timestamp, createCollectionMsg.CreateCollectionRequest.Timestamp) - assert.Equal(t, createCollectionReq.ProxyID, createCollectionMsg.CreateCollectionRequest.ProxyID) - assert.Equal(t, createCollectionReq.Schema.Value, createCollectionMsg.CreateCollectionRequest.Schema.Value) - - ////////////////////////////CreatePartition//////////////////////// - partitionName := "partitionName" + strconv.FormatUint(rand.Uint64(), 10) - createPartitionReq := internalPb.CreatePartitionRequest{ - MsgType: internalPb.MsgType_kCreatePartition, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{ - CollectionName: sch.Name, - Tag: partitionName, - }, - } + ddMs.Close() + }) - st, err = cli.CreatePartition(ctx, &createPartitionReq) - assert.Nil(t, err) - assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + t.Run("TestPartitionTask", func(t *testing.T) { + sch := schemapb.CollectionSchema{ + Name: "col1", + Description: "test collection", + AutoID: false, + Fields: []*schemapb.FieldSchema{ + { + Name: "col1_f1", + Description: "test collection filed 1", + DataType: schemapb.DataType_VECTOR_FLOAT, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "col1_f1_tk1", + Value: "col1_f1_tv1", + }, + { + Key: "col1_f1_tk2", + Value: "col1_f1_tv2", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "col1_f1_ik1", + Value: "col1_f1_iv1", + }, + { + Key: "col1_f1_ik2", + Value: "col1_f1_iv2", + }, + }, + }, + { + Name: "col1_f2", + Description: "test collection filed 2", + DataType: schemapb.DataType_VECTOR_BINARY, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "col1_f2_tk1", + Value: "col1_f2_tv1", + }, + { + Key: "col1_f2_tk2", + Value: "col1_f2_tv2", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "col1_f2_ik1", + Value: "col1_f2_iv1", + }, + { + Key: "col1_f2_ik2", + Value: "col1_f2_iv2", + }, + }, + }, + }, + } + schemaBytes, err := proto.Marshal(&sch) + assert.Nil(t, err) - var createPartitionMsg *ms.CreatePartitionMsg - for { - result := consumeMsg.Consume() - if len(result.Msgs) > 0 { - msgs := result.Msgs - for _, v := range msgs { - createPartitionMsg = v.(*ms.CreatePartitionMsg) + createCollectionReq := internalpb.CreateCollectionRequest{ + MsgType: internalpb.MsgType_kCreatePartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + Schema: &commonpb.Blob{Value: schemaBytes}, + } + st, _ := cli.CreateCollection(ctx, &createCollectionReq) + assert.NotNil(t, st) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) + + createPartitionReq := internalpb.CreatePartitionRequest{ + MsgType: internalpb.MsgType_kCreatePartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, + } + st, _ = cli.CreatePartition(ctx, &createPartitionReq) + assert.NotNil(t, st) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) + + createPartitionReq = internalpb.CreatePartitionRequest{ + MsgType: internalpb.MsgType_kCreatePartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, + } + st, _ = cli.CreatePartition(ctx, &createPartitionReq) + assert.NotNil(t, st) + assert.Equal(t, commonpb.ErrorCode_UNEXPECTED_ERROR, st.ErrorCode) + + createPartitionReq = internalpb.CreatePartitionRequest{ + MsgType: internalpb.MsgType_kCreatePartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition2"}, + } + st, _ = cli.CreatePartition(ctx, &createPartitionReq) + assert.NotNil(t, st) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) + + collMeta, err := svr.metaTable.GetCollectionByName(sch.Name) + assert.Nil(t, err) + t.Logf("collection id = %d", collMeta.ID) + assert.Equal(t, collMeta.Schema.Name, "col1") + assert.Equal(t, collMeta.Schema.AutoID, false) + assert.Equal(t, len(collMeta.Schema.Fields), 2) + assert.Equal(t, collMeta.Schema.Fields[0].Name, "col1_f1") + assert.Equal(t, collMeta.Schema.Fields[1].Name, "col1_f2") + assert.Equal(t, collMeta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT) + assert.Equal(t, collMeta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY) + assert.Equal(t, len(collMeta.Schema.Fields[0].TypeParams), 2) + assert.Equal(t, len(collMeta.Schema.Fields[0].IndexParams), 2) + assert.Equal(t, len(collMeta.Schema.Fields[1].TypeParams), 2) + assert.Equal(t, len(collMeta.Schema.Fields[1].IndexParams), 2) + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Key, "col1_f1_tk1") + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Key, "col1_f1_tk2") + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Value, "col1_f1_tv1") + assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Value, "col1_f1_tv2") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Key, "col1_f1_ik1") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Key, "col1_f1_ik2") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Value, "col1_f1_iv1") + assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Value, "col1_f1_iv2") + + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Key, "col1_f2_tk1") + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Key, "col1_f2_tk2") + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Value, "col1_f2_tv1") + assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Value, "col1_f2_tv2") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Key, "col1_f2_ik1") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Key, "col1_f2_ik2") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Value, "col1_f2_iv1") + assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Value, "col1_f2_iv2") + assert.ElementsMatch(t, []string{"_default", "partition1", "partition2"}, collMeta.PartitionTags) + + showPartitionReq := internalpb.ShowPartitionRequest{ + MsgType: internalpb.MsgType_kShowPartitions, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + CollectionName: &servicepb.CollectionName{CollectionName: "col1"}, + } + + stringList, err := cli.ShowPartitions(ctx, &showPartitionReq) + assert.Nil(t, err) + assert.ElementsMatch(t, []string{"_default", "partition1", "partition2"}, stringList.Values) + + showPartitionReq = internalpb.ShowPartitionRequest{ + MsgType: internalpb.MsgType_kShowPartitions, + ReqID: 1, + Timestamp: 0, + ProxyID: 1, + CollectionName: &servicepb.CollectionName{CollectionName: "col1"}, + } + + stringList, _ = cli.ShowPartitions(ctx, &showPartitionReq) + assert.NotNil(t, stringList) + assert.Equal(t, commonpb.ErrorCode_UNEXPECTED_ERROR, stringList.Status.ErrorCode) + + hasPartitionReq := internalpb.HasPartitionRequest{ + MsgType: internalpb.MsgType_kHasPartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, + } + + hasPartition, err := cli.HasPartition(ctx, &hasPartitionReq) + assert.Nil(t, err) + assert.True(t, hasPartition.Value) + + hasPartitionReq = internalpb.HasPartitionRequest{ + MsgType: internalpb.MsgType_kHasPartition, + ReqID: 1, + Timestamp: 0, + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, + } + + hasPartition, _ = cli.HasPartition(ctx, &hasPartitionReq) + assert.NotNil(t, hasPartition) + assert.Equal(t, commonpb.ErrorCode_UNEXPECTED_ERROR, stringList.Status.ErrorCode) + + hasPartitionReq = internalpb.HasPartitionRequest{ + MsgType: internalpb.MsgType_kHasPartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition3"}, + } + + hasPartition, err = cli.HasPartition(ctx, &hasPartitionReq) + assert.Nil(t, err) + assert.False(t, hasPartition.Value) + + deletePartitionReq := internalpb.DropPartitionRequest{ + MsgType: internalpb.MsgType_kDropPartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition2"}, + } + + st, err = cli.DropPartition(ctx, &deletePartitionReq) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) + + deletePartitionReq = internalpb.DropPartitionRequest{ + MsgType: internalpb.MsgType_kDropPartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition2"}, + } + + st, _ = cli.DropPartition(ctx, &deletePartitionReq) + assert.NotNil(t, st) + assert.Equal(t, commonpb.ErrorCode_UNEXPECTED_ERROR, st.ErrorCode) + + hasPartitionReq = internalpb.HasPartitionRequest{ + MsgType: internalpb.MsgType_kHasPartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition2"}, + } + + hasPartition, err = cli.HasPartition(ctx, &hasPartitionReq) + assert.Nil(t, err) + assert.False(t, hasPartition.Value) + + describePartitionReq := internalpb.DescribePartitionRequest{ + MsgType: internalpb.MsgType_kDescribePartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, + } + + describePartition, err := cli.DescribePartition(ctx, &describePartitionReq) + assert.Nil(t, err) + assert.Equal(t, &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, describePartition.Name) + + describePartitionReq = internalpb.DescribePartitionRequest{ + MsgType: internalpb.MsgType_kDescribePartition, + ReqID: 1, + Timestamp: 0, + ProxyID: 1, + PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, + } + + describePartition, _ = cli.DescribePartition(ctx, &describePartitionReq) + assert.Equal(t, commonpb.ErrorCode_UNEXPECTED_ERROR, describePartition.Status.ErrorCode) + + // DropCollection + ser := servicepb.CollectionName{CollectionName: "col1"} + reqDrop := internalpb.DropCollectionRequest{ + MsgType: internalpb.MsgType_kDropCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + CollectionName: &ser, + } + st, err = cli.DropCollection(ctx, &reqDrop) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + //consume msg + ddMs := ms.NewPulsarMsgStream(ctx, 1024) + ddMs.SetPulsarClient(pulsarAddr) + ddMs.CreatePulsarConsumers(Params.DDChannelNames, "DDStream", ms.NewUnmarshalDispatcher(), 1024) + ddMs.Start() + + var consumeMsg ms.MsgStream = ddMs + for { + result := consumeMsg.Consume() + if len(result.Msgs) > 0 { + break } - break } - } - assert.Equal(t, createPartitionReq.MsgType, createPartitionMsg.CreatePartitionRequest.MsgType) - assert.Equal(t, createPartitionReq.ReqID, createPartitionMsg.CreatePartitionRequest.ReqID) - assert.Equal(t, createPartitionReq.Timestamp, createPartitionMsg.CreatePartitionRequest.Timestamp) - assert.Equal(t, createPartitionReq.ProxyID, createPartitionMsg.CreatePartitionRequest.ProxyID) - assert.Equal(t, createPartitionReq.PartitionName.CollectionName, createPartitionMsg.CreatePartitionRequest.PartitionName.CollectionName) - assert.Equal(t, createPartitionReq.PartitionName.Tag, createPartitionMsg.CreatePartitionRequest.PartitionName.Tag) - - ////////////////////////////DropPartition//////////////////////// - dropPartitionReq := internalPb.DropPartitionRequest{ - MsgType: internalPb.MsgType_kDropPartition, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{ - CollectionName: sch.Name, - Tag: partitionName, - }, - } + ddMs.Close() + }) - st, err = cli.DropPartition(ctx, &dropPartitionReq) - assert.Nil(t, err) - assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + t.Run("TestBroadCastRequest", func(t *testing.T) { - var dropPartitionMsg *ms.DropPartitionMsg - for { - result := consumeMsg.Consume() - if len(result.Msgs) > 0 { - msgs := result.Msgs - for _, v := range msgs { - dropPartitionMsg = v.(*ms.DropPartitionMsg) + proxyTimeTickStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream + proxyTimeTickStream.SetPulsarClient(pulsarAddr) + proxyTimeTickStream.CreatePulsarProducers(Params.ProxyTimeTickChannelNames) + proxyTimeTickStream.Start() + + writeNodeStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream + writeNodeStream.SetPulsarClient(pulsarAddr) + writeNodeStream.CreatePulsarProducers(Params.WriteNodeTimeTickChannelNames) + writeNodeStream.Start() + + ddMs := ms.NewPulsarMsgStream(ctx, 1024) + ddMs.SetPulsarClient(pulsarAddr) + ddMs.CreatePulsarConsumers(Params.DDChannelNames, "DDStream", ms.NewUnmarshalDispatcher(), 1024) + ddMs.Start() + + dMMs := ms.NewPulsarMsgStream(ctx, 1024) + dMMs.SetPulsarClient(pulsarAddr) + dMMs.CreatePulsarConsumers(Params.InsertChannelNames, "DMStream", ms.NewUnmarshalDispatcher(), 1024) + dMMs.Start() + + k2sMs := ms.NewPulsarMsgStream(ctx, 1024) + k2sMs.SetPulsarClient(pulsarAddr) + k2sMs.CreatePulsarConsumers(Params.K2SChannelNames, "K2SStream", ms.NewUnmarshalDispatcher(), 1024) + k2sMs.Start() + + ttsoftmsgs := [][2]uint64{ + {0, 10}, + } + msgSoftPackAddr := getTimeTickMsgPack(ttsoftmsgs) + + err := proxyTimeTickStream.Produce(msgSoftPackAddr) + assert.Nil(t, err) + var dMMsgstream ms.MsgStream = dMMs + assert.True(t, receiveTimeTickMsg(&dMMsgstream)) + var ddMsgstream ms.MsgStream = ddMs + assert.True(t, receiveTimeTickMsg(&ddMsgstream)) + + tthardmsgs := [][2]int{ + {3, 10}, + } + + msghardPackAddr := getMsgPack(tthardmsgs) + err = writeNodeStream.Produce(msghardPackAddr) + assert.Nil(t, err) + var k2sMsgstream ms.MsgStream = k2sMs + assert.True(t, receiveTimeTickMsg(&k2sMsgstream)) + + conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock()) + assert.Nil(t, err) + defer conn.Close() + + cli := masterpb.NewMasterClient(conn) + + sch := schemapb.CollectionSchema{ + Name: "name" + strconv.FormatUint(rand.Uint64(), 10), + Description: "test collection", + AutoID: false, + Fields: []*schemapb.FieldSchema{}, + } + + schemaBytes, err := proto.Marshal(&sch) + assert.Nil(t, err) + + createCollectionReq := internalpb.CreateCollectionRequest{ + MsgType: internalpb.MsgType_kCreateCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + Schema: &commonpb.Blob{Value: schemaBytes}, + } + st, err := cli.CreateCollection(ctx, &createCollectionReq) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + var consumeMsg ms.MsgStream = ddMs + var createCollectionMsg *ms.CreateCollectionMsg + for { + result := consumeMsg.Consume() + if len(result.Msgs) > 0 { + msgs := result.Msgs + for _, v := range msgs { + createCollectionMsg = v.(*ms.CreateCollectionMsg) + } + break } - break } - } - assert.Equal(t, dropPartitionReq.MsgType, dropPartitionMsg.DropPartitionRequest.MsgType) - assert.Equal(t, dropPartitionReq.ReqID, dropPartitionMsg.DropPartitionRequest.ReqID) - assert.Equal(t, dropPartitionReq.Timestamp, dropPartitionMsg.DropPartitionRequest.Timestamp) - assert.Equal(t, dropPartitionReq.ProxyID, dropPartitionMsg.DropPartitionRequest.ProxyID) - assert.Equal(t, dropPartitionReq.PartitionName.CollectionName, dropPartitionMsg.DropPartitionRequest.PartitionName.CollectionName) - - ////////////////////////////DropCollection//////////////////////// - dropCollectionReq := internalPb.DropCollectionRequest{ - MsgType: internalPb.MsgType_kDropCollection, - ReqID: 1, - Timestamp: 11, - ProxyID: 1, - CollectionName: &servicepb.CollectionName{CollectionName: sch.Name}, - } + assert.Equal(t, createCollectionReq.MsgType, createCollectionMsg.CreateCollectionRequest.MsgType) + assert.Equal(t, createCollectionReq.ReqID, createCollectionMsg.CreateCollectionRequest.ReqID) + assert.Equal(t, createCollectionReq.Timestamp, createCollectionMsg.CreateCollectionRequest.Timestamp) + assert.Equal(t, createCollectionReq.ProxyID, createCollectionMsg.CreateCollectionRequest.ProxyID) + assert.Equal(t, createCollectionReq.Schema.Value, createCollectionMsg.CreateCollectionRequest.Schema.Value) + + ////////////////////////////CreatePartition//////////////////////// + partitionName := "partitionName" + strconv.FormatUint(rand.Uint64(), 10) + createPartitionReq := internalpb.CreatePartitionRequest{ + MsgType: internalpb.MsgType_kCreatePartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{ + CollectionName: sch.Name, + Tag: partitionName, + }, + } - st, err = cli.DropCollection(ctx, &dropCollectionReq) - assert.Nil(t, err) - assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + st, err = cli.CreatePartition(ctx, &createPartitionReq) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - var dropCollectionMsg *ms.DropCollectionMsg - for { - result := consumeMsg.Consume() - if len(result.Msgs) > 0 { - msgs := result.Msgs - for _, v := range msgs { - dropCollectionMsg = v.(*ms.DropCollectionMsg) + var createPartitionMsg *ms.CreatePartitionMsg + for { + result := consumeMsg.Consume() + if len(result.Msgs) > 0 { + msgs := result.Msgs + for _, v := range msgs { + createPartitionMsg = v.(*ms.CreatePartitionMsg) + } + break } - break } - } - assert.Equal(t, dropCollectionReq.MsgType, dropCollectionMsg.DropCollectionRequest.MsgType) - assert.Equal(t, dropCollectionReq.ReqID, dropCollectionMsg.DropCollectionRequest.ReqID) - assert.Equal(t, dropCollectionReq.Timestamp, dropCollectionMsg.DropCollectionRequest.Timestamp) - assert.Equal(t, dropCollectionReq.ProxyID, dropCollectionMsg.DropCollectionRequest.ProxyID) - assert.Equal(t, dropCollectionReq.CollectionName.CollectionName, dropCollectionMsg.DropCollectionRequest.CollectionName.CollectionName) + assert.Equal(t, createPartitionReq.MsgType, createPartitionMsg.CreatePartitionRequest.MsgType) + assert.Equal(t, createPartitionReq.ReqID, createPartitionMsg.CreatePartitionRequest.ReqID) + assert.Equal(t, createPartitionReq.Timestamp, createPartitionMsg.CreatePartitionRequest.Timestamp) + assert.Equal(t, createPartitionReq.ProxyID, createPartitionMsg.CreatePartitionRequest.ProxyID) + assert.Equal(t, createPartitionReq.PartitionName.CollectionName, createPartitionMsg.CreatePartitionRequest.PartitionName.CollectionName) + assert.Equal(t, createPartitionReq.PartitionName.Tag, createPartitionMsg.CreatePartitionRequest.PartitionName.Tag) + + ////////////////////////////DropPartition//////////////////////// + dropPartitionReq := internalpb.DropPartitionRequest{ + MsgType: internalpb.MsgType_kDropPartition, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{ + CollectionName: sch.Name, + Tag: partitionName, + }, + } + + st, err = cli.DropPartition(ctx, &dropPartitionReq) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + var dropPartitionMsg *ms.DropPartitionMsg + for { + result := consumeMsg.Consume() + if len(result.Msgs) > 0 { + msgs := result.Msgs + for _, v := range msgs { + dropPartitionMsg = v.(*ms.DropPartitionMsg) + } + break + } + } + assert.Equal(t, dropPartitionReq.MsgType, dropPartitionMsg.DropPartitionRequest.MsgType) + assert.Equal(t, dropPartitionReq.ReqID, dropPartitionMsg.DropPartitionRequest.ReqID) + assert.Equal(t, dropPartitionReq.Timestamp, dropPartitionMsg.DropPartitionRequest.Timestamp) + assert.Equal(t, dropPartitionReq.ProxyID, dropPartitionMsg.DropPartitionRequest.ProxyID) + assert.Equal(t, dropPartitionReq.PartitionName.CollectionName, dropPartitionMsg.DropPartitionRequest.PartitionName.CollectionName) + + ////////////////////////////DropCollection//////////////////////// + dropCollectionReq := internalpb.DropCollectionRequest{ + MsgType: internalpb.MsgType_kDropCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + CollectionName: &servicepb.CollectionName{CollectionName: sch.Name}, + } + + st, err = cli.DropCollection(ctx, &dropCollectionReq) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + + var dropCollectionMsg *ms.DropCollectionMsg + for { + result := consumeMsg.Consume() + if len(result.Msgs) > 0 { + msgs := result.Msgs + for _, v := range msgs { + dropCollectionMsg = v.(*ms.DropCollectionMsg) + } + break + } + } + assert.Equal(t, dropCollectionReq.MsgType, dropCollectionMsg.DropCollectionRequest.MsgType) + assert.Equal(t, dropCollectionReq.ReqID, dropCollectionMsg.DropCollectionRequest.ReqID) + assert.Equal(t, dropCollectionReq.Timestamp, dropCollectionMsg.DropCollectionRequest.Timestamp) + assert.Equal(t, dropCollectionReq.ProxyID, dropCollectionMsg.DropCollectionRequest.ProxyID) + assert.Equal(t, dropCollectionReq.CollectionName.CollectionName, dropCollectionMsg.DropCollectionRequest.CollectionName.CollectionName) + }) + + t.Run("TestSegmentManager_RPC", func(t *testing.T) { + collName := "test_coll" + partitionTag := "test_part" + schema := &schemapb.CollectionSchema{ + Name: collName, + Description: "test coll", + AutoID: false, + Fields: []*schemapb.FieldSchema{ + {FieldID: 1, Name: "f1", IsPrimaryKey: false, DataType: schemapb.DataType_INT32}, + {FieldID: 1, Name: "f1", IsPrimaryKey: false, DataType: schemapb.DataType_VECTOR_FLOAT, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}}}, + }, + } + schemaBytes, err := proto.Marshal(schema) + assert.Nil(t, err) + _, err = cli.CreateCollection(ctx, &internalpb.CreateCollectionRequest{ + MsgType: internalpb.MsgType_kCreateCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + Schema: &commonpb.Blob{Value: schemaBytes}, + }) + assert.Nil(t, err) + _, err = cli.CreatePartition(ctx, &internalpb.CreatePartitionRequest{ + MsgType: internalpb.MsgType_kCreatePartition, + ReqID: 2, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + PartitionName: &servicepb.PartitionName{ + CollectionName: collName, + Tag: partitionTag, + }, + }) + assert.Nil(t, err) + + resp, err := cli.AssignSegmentID(ctx, &internalpb.AssignSegIDRequest{ + PeerID: 1, + Role: internalpb.PeerRole_Proxy, + PerChannelReq: []*internalpb.SegIDRequest{ + {Count: 10000, ChannelID: 0, CollName: collName, PartitionTag: partitionTag}, + }, + }) + assert.Nil(t, err) + assignments := resp.GetPerChannelAssignment() + assert.EqualValues(t, 1, len(assignments)) + assert.EqualValues(t, commonpb.ErrorCode_SUCCESS, assignments[0].Status.ErrorCode) + assert.EqualValues(t, collName, assignments[0].CollName) + assert.EqualValues(t, partitionTag, assignments[0].PartitionTag) + assert.EqualValues(t, int32(0), assignments[0].ChannelID) + assert.EqualValues(t, uint32(10000), assignments[0].Count) + + // test stats + segID := assignments[0].SegID + pulsarAddress := Params.PulsarAddress + msgStream := ms.NewPulsarMsgStream(ctx, 1024) + msgStream.SetPulsarClient(pulsarAddress) + msgStream.CreatePulsarProducers([]string{Params.QueryNodeStatsChannelName}) + msgStream.Start() + defer msgStream.Close() + + err = msgStream.Produce(&ms.MsgPack{ + BeginTs: 102, + EndTs: 104, + Msgs: []ms.TsMsg{ + &ms.QueryNodeStatsMsg{ + QueryNodeStats: internalpb.QueryNodeStats{ + MsgType: internalpb.MsgType_kQueryNodeStats, + PeerID: 1, + SegStats: []*internalpb.SegmentStats{ + {SegmentID: segID, MemorySize: 600000000, NumRows: 1000000, RecentlyModified: true}, + }, + }, + BaseMsg: ms.BaseMsg{ + HashValues: []uint32{0}, + }, + }, + }, + }) + assert.Nil(t, err) + + time.Sleep(500 * time.Millisecond) + segMeta, err := svr.metaTable.GetSegmentByID(segID) + assert.Nil(t, err) + assert.EqualValues(t, 1000000, segMeta.GetNumRows()) + assert.EqualValues(t, int64(600000000), segMeta.GetMemSize()) + + ser := servicepb.CollectionName{CollectionName: collName} + reqDrop := internalpb.DropCollectionRequest{ + MsgType: internalpb.MsgType_kDropCollection, + ReqID: 1, + Timestamp: uint64(time.Now().Unix()), + ProxyID: 1, + CollectionName: &ser, + } + + // DropCollection + st, err := cli.DropCollection(ctx, &reqDrop) + assert.Nil(t, err) + assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) + }) cancel() + conn.Close() svr.Close() } diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 601f412a4c78ed58468d5d09f19d74ce3a9cc282..216583a005ce76b02507be9a7e058d8e6a256759 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -524,7 +524,7 @@ func (mt *metaTable) saveFieldIndexMetaToEtcd(meta *pb.FieldIndexMeta) error { return mt.client.Save(key, marshaledMeta) } -func (mt *metaTable) DeleteFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexType string, indexParams []*commonpb.KeyValuePair) error { +func (mt *metaTable) DeleteFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexParams []*commonpb.KeyValuePair) error { mt.indexLock.Lock() defer mt.indexLock.Unlock() @@ -568,6 +568,22 @@ func (mt *metaTable) HasFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexPa return false, nil } +func (mt *metaTable) GetFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexParams []*commonpb.KeyValuePair) (*pb.FieldIndexMeta, error) { + mt.indexLock.RLock() + defer mt.indexLock.RUnlock() + + if _, ok := mt.segID2IndexMetas[segID]; !ok { + return nil, fmt.Errorf("can not find segment %d", segID) + } + + for _, v := range mt.segID2IndexMetas[segID] { + if v.FieldID == fieldID && typeutil.CompareIndexParams(v.IndexParams, indexParams) { + return &v, nil + } + } + return nil, fmt.Errorf("can not find field %d", fieldID) +} + func (mt *metaTable) UpdateFieldIndexMeta(meta *pb.FieldIndexMeta) error { mt.indexLock.Lock() defer mt.indexLock.Unlock() @@ -635,3 +651,30 @@ func (mt *metaTable) GetFieldIndexParams(collID UniqueID, fieldID UniqueID) ([]* } return nil, fmt.Errorf("can not find field %d in collection %d", fieldID, collID) } + +func (mt *metaTable) UpdateFieldIndexParams(collName string, fieldName string, indexParams []*commonpb.KeyValuePair) error { + mt.ddLock.Lock() + defer mt.ddLock.Unlock() + + vid, ok := mt.collName2ID[collName] + if !ok { + return errors.Errorf("can't find collection: " + collName) + } + meta, ok := mt.collID2Meta[vid] + if !ok { + return errors.Errorf("can't find collection: " + collName) + } + + for _, fieldSchema := range meta.Schema.Fields { + if fieldSchema.Name == fieldName { + fieldSchema.IndexParams = indexParams + if err := mt.saveCollectionMeta(&meta); err != nil { + _ = mt.reloadFromKV() + return err + } + return nil + } + } + + return fmt.Errorf("can not find field with id %s", fieldName) +} diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index 01a953c3f56d688dfeee441599914438292a2ea5..00940dd75edc8ed69c478ecbf53fea60f46e5edf 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -497,7 +497,7 @@ func TestMetaTable_IndexMeta(t *testing.T) { }) assert.Nil(t, err) assert.EqualValues(t, indexbuilderpb.IndexStatus_FINISHED, meta.segID2IndexMetas[1][0].Status) - err = meta.DeleteFieldIndexMeta(1, 100, "type1", []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}}) + err = meta.DeleteFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}}) assert.Nil(t, err) res, err = meta.HasFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}}) assert.Nil(t, err) diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go deleted file mode 100644 index e0eef54dff61f1c61ce1c7679c61046371159c37..0000000000000000000000000000000000000000 --- a/internal/master/partition_task_test.go +++ /dev/null @@ -1,339 +0,0 @@ -package master - -import ( - "context" - "testing" - - "github.com/golang/protobuf/proto" - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.etcd.io/etcd/clientv3" - "google.golang.org/grpc" -) - -func TestMaster_Partition(t *testing.T) { - Init() - refreshMasterAddress() - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - etcdAddr := Params.EtcdAddress - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) - assert.Nil(t, err) - _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) - assert.Nil(t, err) - - Params = ParamTable{ - Address: Params.Address, - Port: Params.Port, - - EtcdAddress: Params.EtcdAddress, - MetaRootPath: "/test/root/meta", - KvRootPath: "/test/root/kv", - PulsarAddress: Params.PulsarAddress, - - ProxyIDList: []typeutil.UniqueID{1, 2}, - WriteNodeIDList: []typeutil.UniqueID{3, 4}, - - TopicNum: 5, - QueryNodeNum: 3, - SoftTimeTickBarrierInterval: 300, - - // segment - SegmentSize: 536870912 / 1024 / 1024, - SegmentSizeFactor: 0.75, - DefaultRecordSize: 1024, - MinSegIDAssignCnt: 1048576 / 1024, - MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt, - SegIDAssignExpiration: 2000, - - // msgChannel - ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"}, - WriteNodeTimeTickChannelNames: []string{"write3", "write4"}, - InsertChannelNames: []string{"dm0", "dm1"}, - K2SChannelNames: []string{"k2s0", "k2s1"}, - QueryNodeStatsChannelName: "statistic", - MsgChannelSubName: Params.MsgChannelSubName, - - MaxPartitionNum: int64(4096), - DefaultPartitionTag: "_default", - } - - svr, err := CreateServer(ctx) - assert.Nil(t, err) - err = svr.Run(int64(Params.Port)) - assert.Nil(t, err) - - conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock()) - assert.Nil(t, err) - defer conn.Close() - - cli := masterpb.NewMasterClient(conn) - sch := schemapb.CollectionSchema{ - Name: "col1", - Description: "test collection", - AutoID: false, - Fields: []*schemapb.FieldSchema{ - { - Name: "col1_f1", - Description: "test collection filed 1", - DataType: schemapb.DataType_VECTOR_FLOAT, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f1_tk1", - Value: "col1_f1_tv1", - }, - { - Key: "col1_f1_tk2", - Value: "col1_f1_tv2", - }, - }, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f1_ik1", - Value: "col1_f1_iv1", - }, - { - Key: "col1_f1_ik2", - Value: "col1_f1_iv2", - }, - }, - }, - { - Name: "col1_f2", - Description: "test collection filed 2", - DataType: schemapb.DataType_VECTOR_BINARY, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f2_tk1", - Value: "col1_f2_tv1", - }, - { - Key: "col1_f2_tk2", - Value: "col1_f2_tv2", - }, - }, - IndexParams: []*commonpb.KeyValuePair{ - { - Key: "col1_f2_ik1", - Value: "col1_f2_iv1", - }, - { - Key: "col1_f2_ik2", - Value: "col1_f2_iv2", - }, - }, - }, - }, - } - schemaBytes, err := proto.Marshal(&sch) - assert.Nil(t, err) - - createCollectionReq := internalpb.CreateCollectionRequest{ - MsgType: internalpb.MsgType_kCreatePartition, - ReqID: 1, - Timestamp: 1, - ProxyID: 1, - Schema: &commonpb.Blob{Value: schemaBytes}, - } - st, _ := cli.CreateCollection(ctx, &createCollectionReq) - assert.NotNil(t, st) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) - - createPartitionReq := internalpb.CreatePartitionRequest{ - MsgType: internalpb.MsgType_kCreatePartition, - ReqID: 1, - Timestamp: 2, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, - } - st, _ = cli.CreatePartition(ctx, &createPartitionReq) - assert.NotNil(t, st) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) - - createPartitionReq = internalpb.CreatePartitionRequest{ - MsgType: internalpb.MsgType_kCreatePartition, - ReqID: 1, - Timestamp: 1, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, - } - st, _ = cli.CreatePartition(ctx, &createPartitionReq) - assert.NotNil(t, st) - assert.Equal(t, commonpb.ErrorCode_UNEXPECTED_ERROR, st.ErrorCode) - - createPartitionReq = internalpb.CreatePartitionRequest{ - MsgType: internalpb.MsgType_kCreatePartition, - ReqID: 1, - Timestamp: 3, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition2"}, - } - st, _ = cli.CreatePartition(ctx, &createPartitionReq) - assert.NotNil(t, st) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) - - collMeta, err := svr.metaTable.GetCollectionByName(sch.Name) - assert.Nil(t, err) - t.Logf("collection id = %d", collMeta.ID) - assert.Equal(t, collMeta.CreateTime, uint64(1)) - assert.Equal(t, collMeta.Schema.Name, "col1") - assert.Equal(t, collMeta.Schema.AutoID, false) - assert.Equal(t, len(collMeta.Schema.Fields), 2) - assert.Equal(t, collMeta.Schema.Fields[0].Name, "col1_f1") - assert.Equal(t, collMeta.Schema.Fields[1].Name, "col1_f2") - assert.Equal(t, collMeta.Schema.Fields[0].DataType, schemapb.DataType_VECTOR_FLOAT) - assert.Equal(t, collMeta.Schema.Fields[1].DataType, schemapb.DataType_VECTOR_BINARY) - assert.Equal(t, len(collMeta.Schema.Fields[0].TypeParams), 2) - assert.Equal(t, len(collMeta.Schema.Fields[0].IndexParams), 2) - assert.Equal(t, len(collMeta.Schema.Fields[1].TypeParams), 2) - assert.Equal(t, len(collMeta.Schema.Fields[1].IndexParams), 2) - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Key, "col1_f1_tk1") - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Key, "col1_f1_tk2") - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[0].Value, "col1_f1_tv1") - assert.Equal(t, collMeta.Schema.Fields[0].TypeParams[1].Value, "col1_f1_tv2") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Key, "col1_f1_ik1") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Key, "col1_f1_ik2") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[0].Value, "col1_f1_iv1") - assert.Equal(t, collMeta.Schema.Fields[0].IndexParams[1].Value, "col1_f1_iv2") - - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Key, "col1_f2_tk1") - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Key, "col1_f2_tk2") - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[0].Value, "col1_f2_tv1") - assert.Equal(t, collMeta.Schema.Fields[1].TypeParams[1].Value, "col1_f2_tv2") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Key, "col1_f2_ik1") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Key, "col1_f2_ik2") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Value, "col1_f2_iv1") - assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Value, "col1_f2_iv2") - - //assert.Equal(t, collMeta.PartitionTags[0], "partition1") - //assert.Equal(t, collMeta.PartitionTags[1], "partition2") - assert.ElementsMatch(t, []string{"_default", "partition1", "partition2"}, collMeta.PartitionTags) - - showPartitionReq := internalpb.ShowPartitionRequest{ - MsgType: internalpb.MsgType_kShowPartitions, - ReqID: 1, - Timestamp: 4, - ProxyID: 1, - CollectionName: &servicepb.CollectionName{CollectionName: "col1"}, - } - - stringList, err := cli.ShowPartitions(ctx, &showPartitionReq) - assert.Nil(t, err) - assert.ElementsMatch(t, []string{"_default", "partition1", "partition2"}, stringList.Values) - - showPartitionReq = internalpb.ShowPartitionRequest{ - MsgType: internalpb.MsgType_kShowPartitions, - ReqID: 1, - Timestamp: 3, - ProxyID: 1, - CollectionName: &servicepb.CollectionName{CollectionName: "col1"}, - } - - stringList, _ = cli.ShowPartitions(ctx, &showPartitionReq) - assert.NotNil(t, stringList) - assert.Equal(t, commonpb.ErrorCode_UNEXPECTED_ERROR, stringList.Status.ErrorCode) - - hasPartitionReq := internalpb.HasPartitionRequest{ - MsgType: internalpb.MsgType_kHasPartition, - ReqID: 1, - Timestamp: 5, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, - } - - hasPartition, err := cli.HasPartition(ctx, &hasPartitionReq) - assert.Nil(t, err) - assert.True(t, hasPartition.Value) - - hasPartitionReq = internalpb.HasPartitionRequest{ - MsgType: internalpb.MsgType_kHasPartition, - ReqID: 1, - Timestamp: 4, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, - } - - hasPartition, _ = cli.HasPartition(ctx, &hasPartitionReq) - assert.NotNil(t, hasPartition) - assert.Equal(t, commonpb.ErrorCode_UNEXPECTED_ERROR, stringList.Status.ErrorCode) - - hasPartitionReq = internalpb.HasPartitionRequest{ - MsgType: internalpb.MsgType_kHasPartition, - ReqID: 1, - Timestamp: 6, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition3"}, - } - - hasPartition, err = cli.HasPartition(ctx, &hasPartitionReq) - assert.Nil(t, err) - assert.False(t, hasPartition.Value) - - deletePartitionReq := internalpb.DropPartitionRequest{ - MsgType: internalpb.MsgType_kDropPartition, - ReqID: 1, - Timestamp: 7, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition2"}, - } - - st, err = cli.DropPartition(ctx, &deletePartitionReq) - assert.Nil(t, err) - assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) - - deletePartitionReq = internalpb.DropPartitionRequest{ - MsgType: internalpb.MsgType_kDropPartition, - ReqID: 1, - Timestamp: 6, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition2"}, - } - - st, _ = cli.DropPartition(ctx, &deletePartitionReq) - assert.NotNil(t, st) - assert.Equal(t, commonpb.ErrorCode_UNEXPECTED_ERROR, st.ErrorCode) - - hasPartitionReq = internalpb.HasPartitionRequest{ - MsgType: internalpb.MsgType_kHasPartition, - ReqID: 1, - Timestamp: 8, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition2"}, - } - - hasPartition, err = cli.HasPartition(ctx, &hasPartitionReq) - assert.Nil(t, err) - assert.False(t, hasPartition.Value) - - describePartitionReq := internalpb.DescribePartitionRequest{ - MsgType: internalpb.MsgType_kDescribePartition, - ReqID: 1, - Timestamp: 9, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, - } - - describePartition, err := cli.DescribePartition(ctx, &describePartitionReq) - assert.Nil(t, err) - assert.Equal(t, &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, describePartition.Name) - - describePartitionReq = internalpb.DescribePartitionRequest{ - MsgType: internalpb.MsgType_kDescribePartition, - ReqID: 1, - Timestamp: 8, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{CollectionName: "col1", Tag: "partition1"}, - } - - describePartition, _ = cli.DescribePartition(ctx, &describePartitionReq) - assert.Equal(t, commonpb.ErrorCode_UNEXPECTED_ERROR, describePartition.Status.ErrorCode) - - svr.Close() -} diff --git a/internal/master/segment_manager.go b/internal/master/segment_manager.go index 4e80cf6f4de09706ba034ee81adad543c7ce5fc4..2e7ae0f24f58c2d79d1ae07649a38d58c2c4e532 100644 --- a/internal/master/segment_manager.go +++ b/internal/master/segment_manager.go @@ -358,6 +358,20 @@ func (manager *SegmentManager) initChannelRanges() error { } return nil } + +// ForceClose set segments of collection with collID closable, segment will be closed after the assignments of it has expired +func (manager *SegmentManager) ForceClose(collID UniqueID) error { + status, ok := manager.collStatus[collID] + if !ok { + return nil + } + + for _, segStatus := range status.segments { + segStatus.closable = true + } + return nil +} + func NewSegmentManager(ctx context.Context, meta *metaTable, globalIDAllocator func() (UniqueID, error), diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go index 3e33527871543f883b710c2cc61101d0eca16202..28e16ec7eca16dacc512e598b769dfbf0dad4a34 100644 --- a/internal/master/segment_manager_test.go +++ b/internal/master/segment_manager_test.go @@ -6,20 +6,16 @@ import ( "testing" "time" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.etcd.io/etcd/clientv3" - "google.golang.org/grpc" ) func TestSegmentManager_AssignSegment(t *testing.T) { @@ -160,143 +156,3 @@ func TestSegmentManager_AssignSegment(t *testing.T) { assert.Nil(t, err) assert.NotEqualValues(t, 0, segMeta.CloseTime) } -func TestSegmentManager_RPC(t *testing.T) { - Init() - refreshMasterAddress() - etcdAddress := Params.EtcdAddress - rootPath := "/test/root" - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}}) - assert.Nil(t, err) - _, err = cli.Delete(ctx, rootPath, clientv3.WithPrefix()) - assert.Nil(t, err) - Params = ParamTable{ - Address: Params.Address, - Port: Params.Port, - - EtcdAddress: Params.EtcdAddress, - MetaRootPath: "/test/root/meta", - KvRootPath: "/test/root/kv", - PulsarAddress: Params.PulsarAddress, - - ProxyIDList: []typeutil.UniqueID{1, 2}, - WriteNodeIDList: []typeutil.UniqueID{3, 4}, - - TopicNum: 5, - QueryNodeNum: 3, - SoftTimeTickBarrierInterval: 300, - - // segment - SegmentSize: 536870912 / 1024 / 1024, - SegmentSizeFactor: 0.75, - DefaultRecordSize: 1024, - MinSegIDAssignCnt: 1048576 / 1024, - MaxSegIDAssignCnt: Params.MaxSegIDAssignCnt, - SegIDAssignExpiration: 2000, - - // msgChannel - ProxyTimeTickChannelNames: []string{"proxy1", "proxy2"}, - WriteNodeTimeTickChannelNames: []string{"write3", "write4"}, - InsertChannelNames: []string{"dm0", "dm1"}, - K2SChannelNames: []string{"k2s0", "k2s1"}, - QueryNodeStatsChannelName: "statistic", - MsgChannelSubName: Params.MsgChannelSubName, - - MaxPartitionNum: int64(4096), - DefaultPartitionTag: "_default", - } - - collName := "test_coll" - partitionTag := "test_part" - master, err := CreateServer(ctx) - assert.Nil(t, err) - defer master.Close() - err = master.Run(int64(Params.Port)) - assert.Nil(t, err) - dialContext, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock()) - assert.Nil(t, err) - defer dialContext.Close() - client := masterpb.NewMasterClient(dialContext) - schema := &schemapb.CollectionSchema{ - Name: collName, - Description: "test coll", - AutoID: false, - Fields: []*schemapb.FieldSchema{ - {FieldID: 1, Name: "f1", IsPrimaryKey: false, DataType: schemapb.DataType_INT32}, - {FieldID: 1, Name: "f1", IsPrimaryKey: false, DataType: schemapb.DataType_VECTOR_FLOAT, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}}}, - }, - } - schemaBytes, err := proto.Marshal(schema) - assert.Nil(t, err) - _, err = client.CreateCollection(ctx, &internalpb.CreateCollectionRequest{ - MsgType: internalpb.MsgType_kCreateCollection, - ReqID: 1, - Timestamp: 100, - ProxyID: 1, - Schema: &commonpb.Blob{Value: schemaBytes}, - }) - assert.Nil(t, err) - _, err = client.CreatePartition(ctx, &internalpb.CreatePartitionRequest{ - MsgType: internalpb.MsgType_kCreatePartition, - ReqID: 2, - Timestamp: 101, - ProxyID: 1, - PartitionName: &servicepb.PartitionName{ - CollectionName: collName, - Tag: partitionTag, - }, - }) - assert.Nil(t, err) - - resp, err := client.AssignSegmentID(ctx, &internalpb.AssignSegIDRequest{ - PeerID: 1, - Role: internalpb.PeerRole_Proxy, - PerChannelReq: []*internalpb.SegIDRequest{ - {Count: 10000, ChannelID: 0, CollName: collName, PartitionTag: partitionTag}, - }, - }) - assert.Nil(t, err) - assignments := resp.GetPerChannelAssignment() - assert.EqualValues(t, 1, len(assignments)) - assert.EqualValues(t, commonpb.ErrorCode_SUCCESS, assignments[0].Status.ErrorCode) - assert.EqualValues(t, collName, assignments[0].CollName) - assert.EqualValues(t, partitionTag, assignments[0].PartitionTag) - assert.EqualValues(t, int32(0), assignments[0].ChannelID) - assert.EqualValues(t, uint32(10000), assignments[0].Count) - - // test stats - segID := assignments[0].SegID - pulsarAddress := Params.PulsarAddress - ms := msgstream.NewPulsarMsgStream(ctx, 1024) - ms.SetPulsarClient(pulsarAddress) - ms.CreatePulsarProducers([]string{"statistic"}) - ms.Start() - defer ms.Close() - - err = ms.Produce(&msgstream.MsgPack{ - BeginTs: 102, - EndTs: 104, - Msgs: []msgstream.TsMsg{ - &msgstream.QueryNodeStatsMsg{ - QueryNodeStats: internalpb.QueryNodeStats{ - MsgType: internalpb.MsgType_kQueryNodeStats, - PeerID: 1, - SegStats: []*internalpb.SegmentStats{ - {SegmentID: segID, MemorySize: 600000000, NumRows: 1000000, RecentlyModified: true}, - }, - }, - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{0}, - }, - }, - }, - }) - assert.Nil(t, err) - - time.Sleep(500 * time.Millisecond) - segMeta, err := master.metaTable.GetSegmentByID(segID) - assert.Nil(t, err) - assert.EqualValues(t, 1000000, segMeta.GetNumRows()) - assert.EqualValues(t, int64(600000000), segMeta.GetMemSize()) -} diff --git a/internal/util/flowgraph/message.go b/internal/util/flowgraph/message.go index 579f51398b0ffe78d27d9662b984643ea4c80f8b..e5c01d7d4ef92872f38abc45713cd68f06f5a6ee 100644 --- a/internal/util/flowgraph/message.go +++ b/internal/util/flowgraph/message.go @@ -13,6 +13,14 @@ type MsgStreamMsg struct { timestampMax Timestamp } +func GenerateMsgStreamMsg(tsMessages []msgstream.TsMsg, timestampMin, timestampMax Timestamp) *MsgStreamMsg { + return &MsgStreamMsg{ + tsMessages: tsMessages, + timestampMin: timestampMin, + timestampMax: timestampMax, + } +} + func (msMsg *MsgStreamMsg) TimeTick() Timestamp { return msMsg.timestampMax } diff --git a/internal/writenode/data_sync_service.go b/internal/writenode/data_sync_service.go index 6e3a07265c7fe601b424a43e65f9dcc290368f11..3c18b4eaa3e319fa786001d60d7406a98670f58d 100644 --- a/internal/writenode/data_sync_service.go +++ b/internal/writenode/data_sync_service.go @@ -39,7 +39,7 @@ func (dsService *dataSyncService) initNodes() { var dmStreamNode Node = newDmInputNode(dsService.ctx) var ddStreamNode Node = newDDInputNode(dsService.ctx) - var ddNode Node = newDDNode() + var ddNode Node = newDDNode(dsService.ctx) var filterDmNode Node = newFilteredDmNode() var insertBufferNode Node = newInsertBufferNode(dsService.ctx) diff --git a/internal/writenode/dd_buffer.go b/internal/writenode/dd_buffer.go deleted file mode 100644 index b312f856efb076cb08e4835265febcb8a0ef2d38..0000000000000000000000000000000000000000 --- a/internal/writenode/dd_buffer.go +++ /dev/null @@ -1,47 +0,0 @@ -package writenode - -import ( - "errors" - "strconv" -) - -type ddBuffer struct { - collectionBuffer map[UniqueID]interface{} - partitionBuffer map[UniqueID]interface{} -} - -func (d *ddBuffer) addCollection(collectionID UniqueID) error { - if _, ok := d.collectionBuffer[collectionID]; !ok { - return errors.New("collection " + strconv.FormatInt(collectionID, 10) + " is already exists") - } - - d.collectionBuffer[collectionID] = nil - return nil -} - -func (d *ddBuffer) removeCollection(collectionID UniqueID) error { - if _, ok := d.collectionBuffer[collectionID]; !ok { - return errors.New("cannot found collection " + strconv.FormatInt(collectionID, 10)) - } - - delete(d.collectionBuffer, collectionID) - return nil -} - -func (d *ddBuffer) addPartition(partitionID UniqueID) error { - if _, ok := d.partitionBuffer[partitionID]; !ok { - return errors.New("partition " + strconv.FormatInt(partitionID, 10) + " is already exists") - } - - d.partitionBuffer[partitionID] = nil - return nil -} - -func (d *ddBuffer) removePartition(partitionID UniqueID) error { - if _, ok := d.partitionBuffer[partitionID]; !ok { - return errors.New("cannot found partition " + strconv.FormatInt(partitionID, 10)) - } - - delete(d.partitionBuffer, partitionID) - return nil -} diff --git a/internal/writenode/flow_graph_dd_node.go b/internal/writenode/flow_graph_dd_node.go index d113ed120bd2c348dc7b3faa195494949c42fa3c..f9b8a3946355df921ea1b4ac884d49c4877f6e73 100644 --- a/internal/writenode/flow_graph_dd_node.go +++ b/internal/writenode/flow_graph_dd_node.go @@ -1,20 +1,65 @@ package writenode import ( + "context" + "errors" "log" "sort" + "strconv" "github.com/golang/protobuf/proto" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/zilliztech/milvus-distributed/internal/allocator" + "github.com/zilliztech/milvus-distributed/internal/kv" + miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" "github.com/zilliztech/milvus-distributed/internal/msgstream" internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "github.com/zilliztech/milvus-distributed/internal/storage" ) type ddNode struct { BaseNode - ddMsg *ddMsg - ddBuffer *ddBuffer + ddMsg *ddMsg + ddRecords *ddRecords + ddBuffer *ddBuffer + + idAllocator *allocator.IDAllocator + kv kv.Base +} + +type ddData struct { + ddRequestString []string + timestamps []Timestamp + eventTypes []storage.EventTypeCode +} + +type ddBuffer struct { + ddData map[UniqueID]*ddData + maxSize int +} + +type ddRecords struct { + collectionRecords map[UniqueID]interface{} + partitionRecords map[UniqueID]interface{} +} + +func (d *ddBuffer) size() int { + if d.ddData == nil || len(d.ddData) <= 0 { + return 0 + } + + size := 0 + for _, data := range d.ddData { + size += len(data.ddRequestString) + } + return size +} + +func (d *ddBuffer) full() bool { + return d.size() >= d.maxSize } func (ddNode *ddNode) Name() string { @@ -68,6 +113,62 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { } } + // generate binlog + if ddNode.ddBuffer.full() { + ddCodec := &storage.DataDefinitionCodec{} + for collectionID, data := range ddNode.ddBuffer.ddData { + // buffer data to binlog + binLogs, err := ddCodec.Serialize(data.timestamps, data.ddRequestString, data.eventTypes) + if err != nil { + log.Println(err) + continue + } + if len(binLogs) != 2 { + log.Println("illegal binLogs") + continue + } + + // binLogs -> minIO/S3 + if len(data.ddRequestString) != len(data.timestamps) || + len(data.timestamps) != len(data.eventTypes) { + log.Println("illegal ddBuffer, failed to save binlog") + continue + } else { + // Blob key example: + // ${tenant}/data_definition_log/${collection_id}/ts/${log_idx} + // ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx} + keyCommon := Params.DdLogRootPath + strconv.FormatInt(collectionID, 10) + "/" + + // save ts binlog + timestampLogIdx, err := ddNode.idAllocator.AllocOne() + if err != nil { + log.Println(err) + } + timestampKey := keyCommon + binLogs[0].GetKey() + "/" + strconv.FormatInt(timestampLogIdx, 10) + err = ddNode.kv.Save(timestampKey, string(binLogs[0].GetValue())) + if err != nil { + log.Println(err) + } + log.Println("save ts binlog, key = ", timestampKey) + + // save dd binlog + ddLogIdx, err := ddNode.idAllocator.AllocOne() + if err != nil { + log.Println(err) + } + ddKey := keyCommon + binLogs[1].GetKey() + "/" + strconv.FormatInt(ddLogIdx, 10) + err = ddNode.kv.Save(ddKey, string(binLogs[1].GetValue())) + if err != nil { + log.Println(err) + } + log.Println("save dd binlog, key = ", ddKey) + } + } + // clear buffer + ddNode.ddBuffer.ddData = make(map[UniqueID]*ddData) + log.Println("dd buffer flushed") + } + var res Msg = ddNode.ddMsg return []*Msg{&res} } @@ -75,16 +176,18 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { collectionID := msg.CollectionID - err := ddNode.ddBuffer.addCollection(collectionID) - if err != nil { + // add collection + if _, ok := ddNode.ddRecords.collectionRecords[collectionID]; ok { + err := errors.New("collection " + strconv.FormatInt(collectionID, 10) + " is already exists") log.Println(err) return } + ddNode.ddRecords.collectionRecords[collectionID] = nil // TODO: add default partition? var schema schemapb.CollectionSchema - err = proto.Unmarshal((*msg.Schema).Value, &schema) + err := proto.Unmarshal((*msg.Schema).Value, &schema) if err != nil { log.Println(err) return @@ -96,17 +199,30 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { timestamp: msg.Timestamp, }) - // TODO: write dd binlog + _, ok := ddNode.ddBuffer.ddData[collectionID] + if !ok { + ddNode.ddBuffer.ddData[collectionID] = &ddData{ + ddRequestString: make([]string, 0), + timestamps: make([]Timestamp, 0), + eventTypes: make([]storage.EventTypeCode, 0), + } + } + + ddNode.ddBuffer.ddData[collectionID].ddRequestString = append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.CreateCollectionRequest.String()) + ddNode.ddBuffer.ddData[collectionID].timestamps = append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Timestamp) + ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.CreateCollectionEventType) } func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) { collectionID := msg.CollectionID - err := ddNode.ddBuffer.removeCollection(collectionID) - if err != nil { + // remove collection + if _, ok := ddNode.ddRecords.collectionRecords[collectionID]; !ok { + err := errors.New("cannot found collection " + strconv.FormatInt(collectionID, 10)) log.Println(err) return } + delete(ddNode.ddRecords.collectionRecords, collectionID) collectionName := msg.CollectionName.CollectionName ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName], @@ -115,17 +231,31 @@ func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) { timestamp: msg.Timestamp, }) - // TODO: write dd binlog + _, ok := ddNode.ddBuffer.ddData[collectionID] + if !ok { + ddNode.ddBuffer.ddData[collectionID] = &ddData{ + ddRequestString: make([]string, 0), + timestamps: make([]Timestamp, 0), + eventTypes: make([]storage.EventTypeCode, 0), + } + } + + ddNode.ddBuffer.ddData[collectionID].ddRequestString = append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.DropCollectionRequest.String()) + ddNode.ddBuffer.ddData[collectionID].timestamps = append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Timestamp) + ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropCollectionEventType) } func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { partitionID := msg.PartitionID + collectionID := msg.CollectionID - err := ddNode.ddBuffer.addPartition(partitionID) - if err != nil { + // add partition + if _, ok := ddNode.ddRecords.partitionRecords[partitionID]; ok { + err := errors.New("partition " + strconv.FormatInt(partitionID, 10) + " is already exists") log.Println(err) return } + ddNode.ddRecords.partitionRecords[partitionID] = nil partitionTag := msg.PartitionName.Tag ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag], @@ -134,17 +264,31 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { timestamp: msg.Timestamp, }) - // TODO: write dd binlog + _, ok := ddNode.ddBuffer.ddData[collectionID] + if !ok { + ddNode.ddBuffer.ddData[collectionID] = &ddData{ + ddRequestString: make([]string, 0), + timestamps: make([]Timestamp, 0), + eventTypes: make([]storage.EventTypeCode, 0), + } + } + + ddNode.ddBuffer.ddData[collectionID].ddRequestString = append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.CreatePartitionRequest.String()) + ddNode.ddBuffer.ddData[collectionID].timestamps = append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Timestamp) + ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.CreatePartitionEventType) } func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { partitionID := msg.PartitionID + collectionID := msg.CollectionID - err := ddNode.ddBuffer.removePartition(partitionID) - if err != nil { + // remove partition + if _, ok := ddNode.ddRecords.partitionRecords[partitionID]; !ok { + err := errors.New("cannot found partition " + strconv.FormatInt(partitionID, 10)) log.Println(err) return } + delete(ddNode.ddRecords.partitionRecords, partitionID) partitionTag := msg.PartitionName.Tag ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag], @@ -153,10 +297,21 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { timestamp: msg.Timestamp, }) - // TODO: write dd binlog + _, ok := ddNode.ddBuffer.ddData[collectionID] + if !ok { + ddNode.ddBuffer.ddData[collectionID] = &ddData{ + ddRequestString: make([]string, 0), + timestamps: make([]Timestamp, 0), + eventTypes: make([]storage.EventTypeCode, 0), + } + } + + ddNode.ddBuffer.ddData[collectionID].ddRequestString = append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.DropPartitionRequest.String()) + ddNode.ddBuffer.ddData[collectionID].timestamps = append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Timestamp) + ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropPartitionEventType) } -func newDDNode() *ddNode { +func newDDNode(ctx context.Context) *ddNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -164,13 +319,46 @@ func newDDNode() *ddNode { baseNode.SetMaxQueueLength(maxQueueLength) baseNode.SetMaxParallelism(maxParallelism) - ddBuffer := &ddBuffer{ - collectionBuffer: make(map[UniqueID]interface{}), - partitionBuffer: make(map[UniqueID]interface{}), + ddRecords := &ddRecords{ + collectionRecords: make(map[UniqueID]interface{}), + partitionRecords: make(map[UniqueID]interface{}), + } + + minIOEndPoint := Params.MinioAddress + minIOAccessKeyID := Params.MinioAccessKeyID + minIOSecretAccessKey := Params.MinioSecretAccessKey + minIOUseSSL := Params.MinioUseSSL + minIOClient, err := minio.New(minIOEndPoint, &minio.Options{ + Creds: credentials.NewStaticV4(minIOAccessKeyID, minIOSecretAccessKey, ""), + Secure: minIOUseSSL, + }) + if err != nil { + panic(err) + } + // TODO: load bucket name from yaml? + minioKV, err := miniokv.NewMinIOKV(ctx, minIOClient, "write-node-dd-node") + if err != nil { + panic(err) + } + + idAllocator, err := allocator.NewIDAllocator(ctx, Params.MasterAddress) + if err != nil { + panic(err) + } + err = idAllocator.Start() + if err != nil { + panic(err) } return &ddNode{ - BaseNode: baseNode, - ddBuffer: ddBuffer, + BaseNode: baseNode, + ddRecords: ddRecords, + ddBuffer: &ddBuffer{ + ddData: make(map[UniqueID]*ddData), + maxSize: Params.FlushDdBufSize, + }, + + idAllocator: idAllocator, + kv: minioKV, } } diff --git a/internal/writenode/flow_graph_dd_node_test.go b/internal/writenode/flow_graph_dd_node_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5903a47b9fdeee37e41b9210ae8d09247b155b3d --- /dev/null +++ b/internal/writenode/flow_graph_dd_node_test.go @@ -0,0 +1,126 @@ +package writenode + +import ( + "context" + "testing" + "time" + + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" + "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" +) + +func TestFlowGraphDDNode_Operate(t *testing.T) { + const ctxTimeInMillisecond = 2000 + const closeWithDeadline = false + var ctx context.Context + + if closeWithDeadline { + var cancel context.CancelFunc + d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel = context.WithDeadline(context.Background(), d) + defer cancel() + } else { + ctx = context.Background() + } + + startMaster(ctx) + Params.FlushDdBufSize = 4 + + ddNode := newDDNode(ctx) + + colID := UniqueID(0) + colName := "col-test-0" + // create collection + createColReq := internalpb.CreateCollectionRequest{ + MsgType: internalpb.MsgType_kCreateCollection, + CollectionID: colID, + ReqID: 1, + Timestamp: 1, + ProxyID: 1, + Schema: &commonpb.Blob{}, + } + createColMsg := msgstream.CreateCollectionMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: Timestamp(1), + EndTimestamp: Timestamp(1), + HashValues: []uint32{uint32(0)}, + }, + CreateCollectionRequest: createColReq, + } + + // drop collection + dropColReq := internalpb.DropCollectionRequest{ + MsgType: internalpb.MsgType_kDropCollection, + CollectionID: colID, + ReqID: 2, + Timestamp: 2, + ProxyID: 2, + CollectionName: &servicepb.CollectionName{CollectionName: colName}, + } + dropColMsg := msgstream.DropCollectionMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: Timestamp(2), + EndTimestamp: Timestamp(2), + HashValues: []uint32{uint32(0)}, + }, + DropCollectionRequest: dropColReq, + } + + partitionID := UniqueID(100) + partitionTag := "partition-test-0" + // create partition + createPartitionReq := internalpb.CreatePartitionRequest{ + MsgType: internalpb.MsgType_kCreatePartition, + CollectionID: colID, + PartitionID: partitionID, + ReqID: 3, + Timestamp: 3, + ProxyID: 3, + PartitionName: &servicepb.PartitionName{ + CollectionName: colName, + Tag: partitionTag, + }, + } + createPartitionMsg := msgstream.CreatePartitionMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: Timestamp(3), + EndTimestamp: Timestamp(3), + HashValues: []uint32{uint32(0)}, + }, + CreatePartitionRequest: createPartitionReq, + } + + // drop partition + dropPartitionReq := internalpb.DropPartitionRequest{ + MsgType: internalpb.MsgType_kDropPartition, + CollectionID: colID, + PartitionID: partitionID, + ReqID: 4, + Timestamp: 4, + ProxyID: 4, + PartitionName: &servicepb.PartitionName{ + CollectionName: colName, + Tag: partitionTag, + }, + } + dropPartitionMsg := msgstream.DropPartitionMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: Timestamp(4), + EndTimestamp: Timestamp(4), + HashValues: []uint32{uint32(0)}, + }, + DropPartitionRequest: dropPartitionReq, + } + + tsMessages := make([]msgstream.TsMsg, 0) + tsMessages = append(tsMessages, msgstream.TsMsg(&createColMsg)) + tsMessages = append(tsMessages, msgstream.TsMsg(&dropColMsg)) + tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg)) + tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg)) + msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3)) + var inMsg Msg = msgStream + ddNode.Operate([]*Msg{&inMsg}) +} diff --git a/internal/writenode/write_node_test.go b/internal/writenode/write_node_test.go index 3905b3ab3066874eb6e7a517a1d077ff928cfd2c..b99288b3cd5a40cdfff58a0bca0676886bad90e4 100644 --- a/internal/writenode/write_node_test.go +++ b/internal/writenode/write_node_test.go @@ -1,11 +1,18 @@ package writenode import ( + "context" "fmt" + "log" "math/rand" "os" "strconv" "testing" + + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" + + "github.com/zilliztech/milvus-distributed/internal/master" ) func makeNewChannelNames(names []string, suffix string) []string { @@ -22,6 +29,34 @@ func refreshChannelNames() { Params.InsertChannelNames = makeNewChannelNames(Params.InsertChannelNames, suffix) } +func startMaster(ctx context.Context) { + master.Init() + etcdAddr := master.Params.EtcdAddress + metaRootPath := master.Params.MetaRootPath + + etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + if err != nil { + panic(err) + } + _, err = etcdCli.Delete(context.TODO(), metaRootPath, clientv3.WithPrefix()) + if err != nil { + panic(err) + } + + masterPort := 53101 + master.Params.Port = masterPort + svr, err := master.CreateServer(ctx) + if err != nil { + log.Print("create server failed", zap.Error(err)) + } + if err := svr.Run(int64(master.Params.Port)); err != nil { + log.Fatal("run server failed", zap.Error(err)) + } + + fmt.Println("Waiting for server!", svr.IsServing()) + Params.MasterAddress = master.Params.Address + ":" + strconv.Itoa(masterPort) +} + func TestMain(m *testing.M) { Params.Init() refreshChannelNames()