Skip to content
Snippets Groups Projects
Commit 5015a103 authored by XuanYang-cn's avatar XuanYang-cn Committed by yefu.chen
Browse files

Add logic of get meta from MasterService for datanode


Signed-off-by: default avatarXuanYang-cn <xuan.yang@zilliz.com>
parent 1fe09977
No related branches found
No related tags found
No related merge requests found
Showing
with 472 additions and 240 deletions
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
is "github.com/zilliztech/milvus-distributed/internal/indexservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
const reTryCnt = 3
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Printf("master service address : %s:%d", ms.Params.Address, ms.Params.Port)
svr, err := msc.NewGrpcServer(ctx)
if err != nil {
panic(err)
}
log.Printf("proxy service address : %s", psc.Params.NetworkAddress())
//proxyService := psc.NewClient(ctx, psc.Params.NetworkAddress())
//TODO, test proxy service GetComponentStates, before set
//if err = svr.SetProxyService(proxyService); err != nil {
// panic(err)
//}
log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port)
dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
cnt := 0
for cnt = 0; cnt < reTryCnt; cnt++ {
dsStates, err := dataService.GetComponentStates()
if err != nil {
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= reTryCnt {
panic("connect to data service failed")
}
//if err = svr.SetDataService(dataService); err != nil {
// panic(err)
//}
log.Printf("index service address : %s", is.Params.Address)
indexService := isc.NewClient(is.Params.Address)
if err = svr.SetIndexService(indexService); err != nil {
panic(err)
}
if err = svr.Start(); err != nil {
panic(err)
}
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
sig := <-sc
log.Printf("Got %s signal to exit", sig.String())
_ = svr.Stop()
}
......@@ -23,4 +23,5 @@ master:
IDAssignExpiration: 2000 # ms
maxPartitionNum: 4096
nodeID: 100
\ No newline at end of file
nodeID: 100
timeout: 5 # time out, 5 seconds
\ No newline at end of file
......@@ -25,7 +25,7 @@ func (alloc *allocatorImpl) allocID() (UniqueID, error) {
resp, err := alloc.masterService.AllocID(&masterpb.IDRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowCollections,
MsgID: 1, // GOOSE TODO add msg id
MsgID: 1, // GOOSE TODO
Timestamp: 0, // GOOSE TODO
SourceID: Params.NodeID,
},
......
package datanode
import (
"log"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
......@@ -24,17 +21,9 @@ func (c *Collection) Schema() *schemapb.CollectionSchema {
return c.schema
}
func newCollection(collectionID UniqueID, schemaStr string) *Collection {
var schema schemapb.CollectionSchema
err := proto.UnmarshalText(schemaStr, &schema)
if err != nil {
log.Println(err)
return nil
}
func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection {
var newCollection = &Collection{
schema: &schema,
schema: schema,
id: collectionID,
}
return newCollection
......
......@@ -6,13 +6,14 @@ import (
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
type collectionReplica interface {
// collection
getCollectionNum() int
addCollection(collectionID UniqueID, schemaBlob string) error
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
removeCollection(collectionID UniqueID) error
getCollectionByID(collectionID UniqueID) (*Collection, error)
getCollectionByName(collectionName string) (*Collection, error)
......@@ -162,11 +163,11 @@ func (colReplica *collectionReplicaImpl) getCollectionNum() int {
return len(colReplica.collections)
}
func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schemaBlob string) error {
func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
var newCollection = newCollection(collectionID, schemaBlob)
var newCollection = newCollection(collectionID, schema)
colReplica.collections = append(colReplica.collections, newCollection)
log.Println("Create collection: ", newCollection.Name())
......
......@@ -3,7 +3,6 @@ package datanode
import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
......@@ -23,10 +22,7 @@ func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName
Factory := &MetaFactory{}
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
require.NotEqual(t, "", schemaBlob)
var err = replica.addCollection(collectionMeta.ID, schemaBlob)
var err = replica.addCollection(collectionMeta.ID, collectionMeta.Schema)
require.NoError(t, err)
collection, err := replica.getCollectionByName(collectionName)
......
......@@ -3,7 +3,6 @@ package datanode
import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
)
......@@ -13,10 +12,7 @@ func TestCollection_newCollection(t *testing.T) {
Factory := &MetaFactory{}
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
assert.NotEqual(t, "", schemaBlob)
collection := newCollection(collectionMeta.ID, schemaBlob)
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
assert.Equal(t, collection.Name(), collectionName)
assert.Equal(t, collection.ID(), collectionID)
}
......@@ -27,10 +23,7 @@ func TestCollection_deleteCollection(t *testing.T) {
Factory := &MetaFactory{}
collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName)
schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
assert.NotEqual(t, "", schemaBlob)
collection := newCollection(collectionMeta.ID, schemaBlob)
collection := newCollection(collectionMeta.ID, collectionMeta.Schema)
assert.Equal(t, collection.Name(), collectionName)
assert.Equal(t, collection.ID(), collectionID)
}
......@@ -26,8 +26,8 @@ const (
type (
Inteface interface {
typeutil.Service
typeutil.Component
GetComponentStates() (*internalpb2.ComponentStates, error)
WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
}
......@@ -43,6 +43,7 @@ type (
}
DataNode struct {
// GOOSE TODO: complete interface with component
ctx context.Context
NodeID UniqueID
Role string
......@@ -124,7 +125,7 @@ func (node *DataNode) Init() error {
chanSize := 100
flushChan := make(chan *flushMsg, chanSize)
node.dataSyncService = newDataSyncService(node.ctx, flushChan, replica, alloc)
node.metaService = newMetaService(node.ctx, replica)
node.metaService = newMetaService(node.ctx, replica, node.masterService)
node.replica = replica
// Opentracing
......@@ -154,7 +155,7 @@ func (node *DataNode) Init() error {
func (node *DataNode) Start() error {
go node.dataSyncService.start()
node.metaService.start()
node.metaService.init()
return nil
}
......
......@@ -7,7 +7,6 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
......@@ -42,7 +41,7 @@ func TestDataSyncService_Start(t *testing.T) {
replica := newReplica()
allocFactory := AllocatorFactory{}
sync := newDataSyncService(ctx, flushChan, replica, allocFactory)
sync.replica.addCollection(collMeta.ID, proto.MarshalTextString(collMeta.Schema))
sync.replica.addCollection(collMeta.ID, collMeta.Schema)
go sync.start()
// test data generate
......
......@@ -3,6 +3,8 @@ package datanode
import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
......@@ -15,6 +17,12 @@ type (
AllocatorFactory struct {
}
MasterServiceFactory struct {
ID UniqueID
collectionName string
collectionID UniqueID
}
)
func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
......@@ -156,3 +164,42 @@ func (alloc AllocatorFactory) allocID() (UniqueID, error) {
// GOOSE TODO: random ID generate
return UniqueID(0), nil
}
func (m *MasterServiceFactory) setID(id UniqueID) {
m.ID = id // GOOSE TODO: random ID generator
}
func (m *MasterServiceFactory) setCollectionID(id UniqueID) {
m.collectionID = id
}
func (m *MasterServiceFactory) setCollectionName(name string) {
m.collectionName = name
}
func (m *MasterServiceFactory) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
resp := &masterpb.IDResponse{
Status: &commonpb.Status{},
ID: m.ID,
}
return resp, nil
}
func (m *MasterServiceFactory) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
resp := &milvuspb.ShowCollectionResponse{
Status: &commonpb.Status{},
CollectionNames: []string{m.collectionName},
}
return resp, nil
}
func (m *MasterServiceFactory) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
f := MetaFactory{}
meta := f.CollectionMetaFactory(m.collectionID, m.collectionName)
resp := &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{},
CollectionID: m.collectionID,
Schema: meta.Schema,
}
return resp, nil
}
......@@ -224,9 +224,8 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
return
}
schemaStr := proto.MarshalTextString(&schema)
// add collection
err = ddNode.replica.addCollection(collectionID, schemaStr)
err = ddNode.replica.addCollection(collectionID, &schema)
if err != nil {
log.Println(err)
return
......
......@@ -9,7 +9,6 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
......@@ -39,11 +38,9 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
Factory := &MetaFactory{}
collMeta := Factory.CollectionMetaFactory(UniqueID(0), "coll1")
schemaBlob := proto.MarshalTextString(collMeta.Schema)
require.NotEqual(t, "", schemaBlob)
replica := newReplica()
err = replica.addCollection(collMeta.ID, schemaBlob)
err = replica.addCollection(collMeta.ID, collMeta.Schema)
require.NoError(t, err)
// Params.FlushInsertBufSize = 2
......
......@@ -4,73 +4,91 @@ import (
"context"
"fmt"
"log"
"path"
"reflect"
"strings"
"time"
"github.com/golang/protobuf/proto"
"go.etcd.io/etcd/clientv3"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
)
type metaService struct {
ctx context.Context
kvBase *etcdkv.EtcdKV
replica collectionReplica
ctx context.Context
replica collectionReplica
masterClient MasterServiceInterface
}
func newMetaService(ctx context.Context, replica collectionReplica) *metaService {
ETCDAddr := Params.EtcdAddress
MetaRootPath := Params.MetaRootPath
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{ETCDAddr},
DialTimeout: 5 * time.Second,
})
func newMetaService(ctx context.Context, replica collectionReplica, m MasterServiceInterface) *metaService {
return &metaService{
ctx: ctx,
kvBase: etcdkv.NewEtcdKV(cli, MetaRootPath),
replica: replica,
ctx: ctx,
replica: replica,
masterClient: m,
}
}
func (mService *metaService) start() {
// init from meta
func (mService *metaService) init() {
err := mService.loadCollections()
if err != nil {
log.Fatal("metaService loadCollections failed")
log.Fatal("metaService init failed:", err)
}
}
func GetCollectionObjID(key string) string {
ETCDRootPath := Params.MetaRootPath
func (mService *metaService) loadCollections() error {
names, err := mService.getCollectionNames()
if err != nil {
return err
}
prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
return strings.TrimPrefix(key, prefix)
for _, name := range names {
err := mService.createCollection(name)
if err != nil {
return err
}
}
return nil
}
func isCollectionObj(key string) bool {
ETCDRootPath := Params.MetaRootPath
prefix := path.Join(ETCDRootPath, CollectionPrefix) + "/"
prefix = strings.TrimSpace(prefix)
index := strings.Index(key, prefix)
func (mService *metaService) getCollectionNames() ([]string, error) {
req := &milvuspb.ShowCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowCollections,
MsgID: 0, //GOOSE TODO
Timestamp: 0, // GOOSE TODO
SourceID: Params.NodeID,
},
DbName: "default", // GOOSE TODO
}
return index == 0
response, err := mService.masterClient.ShowCollections(req)
if err != nil {
return nil, errors.Errorf("Get collection names from master service wrong: %v", err)
}
return response.GetCollectionNames(), nil
}
func isSegmentObj(key string) bool {
ETCDRootPath := Params.MetaRootPath
func (mService *metaService) createCollection(name string) error {
req := &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeCollection,
MsgID: 0, //GOOSE TODO
Timestamp: 0, // GOOSE TODO
SourceID: Params.NodeID,
},
DbName: "default", // GOOSE TODO
CollectionName: name,
}
prefix := path.Join(ETCDRootPath, SegmentPrefix) + "/"
prefix = strings.TrimSpace(prefix)
index := strings.Index(key, prefix)
response, err := mService.masterClient.DescribeCollection(req)
if err != nil {
return errors.Errorf("Describe collection %v from master service wrong: %v", name, err)
}
return index == 0
err = mService.replica.addCollection(response.GetCollectionID(), response.GetSchema())
if err != nil {
return errors.Errorf("Add collection %v into collReplica wrong: %v", name, err)
}
return nil
}
func printCollectionStruct(obj *etcdpb.CollectionMeta) {
......@@ -85,51 +103,3 @@ func printCollectionStruct(obj *etcdpb.CollectionMeta) {
fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface())
}
}
func (mService *metaService) processCollectionCreate(id string, value string) {
//println(fmt.Sprintf("Create Collection:$%s$", id))
col := mService.collectionUnmarshal(value)
if col != nil {
schema := col.Schema
schemaBlob := proto.MarshalTextString(schema)
err := mService.replica.addCollection(col.ID, schemaBlob)
if err != nil {
log.Println(err)
}
}
}
func (mService *metaService) loadCollections() error {
keys, values, err := mService.kvBase.LoadWithPrefix(CollectionPrefix)
if err != nil {
return err
}
for i := range keys {
objID := GetCollectionObjID(keys[i])
mService.processCollectionCreate(objID, values[i])
}
return nil
}
//----------------------------------------------------------------------- Unmarshal and Marshal
func (mService *metaService) collectionUnmarshal(value string) *etcdpb.CollectionMeta {
col := etcdpb.CollectionMeta{}
err := proto.UnmarshalText(value, &col)
if err != nil {
log.Println(err)
return nil
}
return &col
}
func (mService *metaService) collectionMarshal(col *etcdpb.CollectionMeta) string {
value := proto.MarshalTextString(col)
if value == "" {
log.Println("marshal collection failed")
return ""
}
return value
}
......@@ -7,94 +7,46 @@ import (
"github.com/stretchr/testify/assert"
)
func TestMetaService_start(t *testing.T) {
func TestMetaService_All(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
replica := newReplica()
mFactory := &MasterServiceFactory{}
mFactory.setCollectionID(0)
mFactory.setCollectionName("a-collection")
metaService := newMetaService(ctx, replica, mFactory)
t.Run("Test getCollectionNames", func(t *testing.T) {
names, err := metaService.getCollectionNames()
assert.NoError(t, err)
assert.Equal(t, 1, len(names))
assert.Equal(t, "a-collection", names[0])
})
t.Run("Test createCollection", func(t *testing.T) {
hasColletion := metaService.replica.hasCollection(0)
assert.False(t, hasColletion)
err := metaService.createCollection("a-collection")
assert.NoError(t, err)
hasColletion = metaService.replica.hasCollection(0)
assert.True(t, hasColletion)
})
t.Run("Test loadCollections", func(t *testing.T) {
hasColletion := metaService.replica.hasCollection(1)
assert.False(t, hasColletion)
mFactory.setCollectionID(1)
mFactory.setCollectionName("a-collection-1")
err := metaService.loadCollections()
assert.NoError(t, err)
hasColletion = metaService.replica.hasCollection(1)
assert.True(t, hasColletion)
hasColletion = metaService.replica.hasCollection(0)
assert.True(t, hasColletion)
})
metaService := newMetaService(ctx, replica)
metaService.start()
}
func TestMetaService_getCollectionObjId(t *testing.T) {
var key = "/collection/collection0"
var collectionObjID1 = GetCollectionObjID(key)
assert.Equal(t, collectionObjID1, "/collection/collection0")
key = "fakeKey"
var collectionObjID2 = GetCollectionObjID(key)
assert.Equal(t, collectionObjID2, "fakeKey")
}
func TestMetaService_isCollectionObj(t *testing.T) {
var key = Params.MetaRootPath + "/collection/collection0"
var b1 = isCollectionObj(key)
assert.Equal(t, b1, true)
key = Params.MetaRootPath + "/segment/segment0"
var b2 = isCollectionObj(key)
assert.Equal(t, b2, false)
}
func TestMetaService_processCollectionCreate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
replica := newReplica()
metaService := newMetaService(ctx, replica)
defer cancel()
id := "0"
value := `schema: <
name: "test"
fields: <
fieldID:100
name: "vec"
data_type: VECTOR_FLOAT
type_params: <
key: "dim"
value: "16"
>
index_params: <
key: "metric_type"
value: "L2"
>
>
fields: <
fieldID:101
name: "age"
data_type: INT32
type_params: <
key: "dim"
value: "1"
>
>
>
segmentIDs: 0
partition_tags: "default"
`
metaService.processCollectionCreate(id, value)
collectionNum := replica.getCollectionNum()
assert.Equal(t, collectionNum, 1)
collection, err := replica.getCollectionByName("test")
assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0))
}
func TestMetaService_loadCollections(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
replica := newReplica()
metaService := newMetaService(ctx, replica)
err2 := (*metaService).loadCollections()
assert.Nil(t, err2)
}
......@@ -5,6 +5,7 @@ import (
"time"
"github.com/zilliztech/milvus-distributed/internal/errors"
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
......@@ -56,63 +57,93 @@ func (c *GrpcClient) Stop() error {
}
func (c *GrpcClient) GetComponentStates() (*internalpb2.ComponentStates, error) {
return c.grpcClient.GetComponentStatesRPC(context.Background(), &commonpb.Empty{})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.GetComponentStatesRPC(ctx, &commonpb.Empty{})
}
//DDL request
func (c *GrpcClient) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
return c.grpcClient.CreateCollection(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.CreateCollection(ctx, in)
}
func (c *GrpcClient) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
return c.grpcClient.DropCollection(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.DropCollection(ctx, in)
}
func (c *GrpcClient) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
return c.grpcClient.HasCollection(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.HasCollection(ctx, in)
}
func (c *GrpcClient) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
return c.grpcClient.DescribeCollection(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.DescribeCollection(ctx, in)
}
func (c *GrpcClient) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
return c.grpcClient.ShowCollections(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.ShowCollections(ctx, in)
}
func (c *GrpcClient) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
return c.grpcClient.CreatePartition(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.CreatePartition(ctx, in)
}
func (c *GrpcClient) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
return c.grpcClient.DropPartition(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.DropPartition(ctx, in)
}
func (c *GrpcClient) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
return c.grpcClient.HasPartition(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.HasPartition(ctx, in)
}
func (c *GrpcClient) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
return c.grpcClient.ShowPartitions(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.ShowPartitions(ctx, in)
}
//index builder service
func (c *GrpcClient) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
return c.grpcClient.CreateIndex(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.CreateIndex(ctx, in)
}
func (c *GrpcClient) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
return c.grpcClient.DescribeIndex(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.DescribeIndex(ctx, in)
}
//global timestamp allocator
func (c *GrpcClient) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) {
return c.grpcClient.AllocTimestamp(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.AllocTimestamp(ctx, in)
}
func (c *GrpcClient) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
return c.grpcClient.AllocID(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.AllocID(ctx, in)
}
//receiver time tick from proxy service, and put it into this channel
func (c *GrpcClient) GetTimeTickChannel() (string, error) {
rsp, err := c.grpcClient.GetTimeTickChannelRPC(context.Background(), &commonpb.Empty{})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
rsp, err := c.grpcClient.GetTimeTickChannelRPC(ctx, &commonpb.Empty{})
if err != nil {
return "", err
}
......@@ -124,7 +155,9 @@ func (c *GrpcClient) GetTimeTickChannel() (string, error) {
//receive ddl from rpc and time tick from proxy service, and put them into this channel
func (c *GrpcClient) GetDdChannel() (string, error) {
rsp, err := c.grpcClient.GetDdChannelRPC(context.Background(), &commonpb.Empty{})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
rsp, err := c.grpcClient.GetDdChannelRPC(ctx, &commonpb.Empty{})
if err != nil {
return "", err
}
......@@ -136,7 +169,9 @@ func (c *GrpcClient) GetDdChannel() (string, error) {
//just define a channel, not used currently
func (c *GrpcClient) GetStatisticsChannel() (string, error) {
rsp, err := c.grpcClient.GetStatisticsChannelRPC(context.Background(), &commonpb.Empty{})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
rsp, err := c.grpcClient.GetStatisticsChannelRPC(ctx, &commonpb.Empty{})
if err != nil {
return "", err
}
......@@ -147,9 +182,13 @@ func (c *GrpcClient) GetStatisticsChannel() (string, error) {
}
func (c *GrpcClient) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
return c.grpcClient.DescribeSegment(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.DescribeSegment(ctx, in)
}
func (c *GrpcClient) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
return c.grpcClient.ShowSegments(context.Background(), in)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(cms.Params.Timeout))
defer cancel()
return c.grpcClient.ShowSegments(ctx, in)
}
package masterservice
import (
"context"
"fmt"
"math/rand"
"regexp"
......@@ -26,7 +27,7 @@ func TestGrpcService(t *testing.T) {
//cms.Params.Address = "127.0.0.1"
cms.Params.Port = (randVal % 100) + 10000
svr, err := NewGrpcServer()
svr, err := NewGrpcServer(context.Background())
assert.Nil(t, err)
// cms.Params.NodeID = 0
......
......@@ -6,6 +6,7 @@ import (
"net"
"sync"
"github.com/zilliztech/milvus-distributed/internal/errors"
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
......@@ -26,10 +27,10 @@ type GrpcServer struct {
cancel context.CancelFunc
}
func NewGrpcServer() (*GrpcServer, error) {
func NewGrpcServer(ctx context.Context) (*GrpcServer, error) {
s := &GrpcServer{}
var err error
s.ctx, s.cancel = context.WithCancel(context.Background())
s.ctx, s.cancel = context.WithCancel(ctx)
if s.core, err = cms.NewCore(s.ctx); err != nil {
return nil, err
}
......@@ -73,6 +74,30 @@ func (s *GrpcServer) Stop() error {
return err
}
func (s *GrpcServer) SetProxyService(p cms.ProxyServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set proxy service failed")
}
return c.SetProxyService(p)
}
func (s *GrpcServer) SetDataService(p cms.DataServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set data service failed")
}
return c.SetDataService(p)
}
func (s *GrpcServer) SetIndexService(p cms.IndexServiceInterface) error {
c, ok := s.core.(*cms.Core)
if !ok {
return errors.Errorf("set index service failed")
}
return c.SetIndexService(p)
}
func (s *GrpcServer) GetComponentStatesRPC(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.core.GetComponentStates()
}
......
......@@ -2,6 +2,7 @@ package masterservice
import (
"context"
"fmt"
"log"
"math/rand"
"strconv"
......@@ -735,6 +736,13 @@ func (c *Core) GetStatisticsChannel() (string, error) {
}
func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
}, nil
}
t := &CreateCollectionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -758,6 +766,13 @@ func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb
}
func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
}, nil
}
t := &DropCollectionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -780,6 +795,16 @@ func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Sta
}
func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
Value: false,
}, nil
}
t := &HasCollectionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -809,6 +834,17 @@ func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolR
}
func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
Schema: nil,
CollectionID: 0,
}, nil
}
t := &DescribeCollectionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -836,6 +872,16 @@ func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milv
}
func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.ShowCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
CollectionNames: nil,
}, nil
}
t := &ShowCollectionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -865,6 +911,13 @@ func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.Sh
}
func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
}, nil
}
t := &CreatePartitionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -887,6 +940,13 @@ func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.S
}
func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
}, nil
}
t := &DropPartitionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -909,6 +969,16 @@ func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Statu
}
func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
Value: false,
}, nil
}
t := &HasPartitionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -938,6 +1008,17 @@ func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolRes
}
func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.ShowPartitionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
PartitionNames: nil,
PartitionIDs: nil,
}, nil
}
t := &ShowPartitionReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -968,6 +1049,13 @@ func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.Show
}
func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
}, nil
}
t := &CreateIndexReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -990,6 +1078,16 @@ func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, e
}
func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
IndexDescriptions: nil,
}, nil
}
t := &DescribeIndexReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -1020,6 +1118,16 @@ func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.Descr
}
func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.DescribeSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
IndexID: 0,
}, nil
}
t := &DescribeSegmentReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......@@ -1050,6 +1158,16 @@ func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.D
}
func (c *Core) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) {
code := c.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.ShowSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]),
},
SegmentIDs: nil,
}, nil
}
t := &ShowSegmentReqTask{
baseReqTask: baseReqTask{
cv: make(chan error),
......
......@@ -27,6 +27,8 @@ type ParamTable struct {
MaxPartitionNum int64
DefaultPartitionName string
DefaultIndexName string
Timeout int
}
func (p *ParamTable) Init() {
......@@ -54,6 +56,8 @@ func (p *ParamTable) Init() {
p.initMaxPartitionNum()
p.initDefaultPartitionName()
p.initDefaultIndexName()
p.initTimeout()
}
func (p *ParamTable) initAddress() {
......@@ -163,3 +167,7 @@ func (p *ParamTable) initDefaultIndexName() {
}
p.DefaultIndexName = name
}
func (p *ParamTable) initTimeout() {
p.Timeout = p.ParseInt("master.timeout")
}
......@@ -50,4 +50,7 @@ func TestParamTable(t *testing.T) {
assert.NotEqual(t, Params.DefaultIndexName, "")
t.Logf("default index name = %s", Params.DefaultIndexName)
assert.NotZero(t, Params.Timeout)
t.Logf("master timeout = %d", Params.Timeout)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment