diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index 9b5a912f53a665b5c4743b8fb49eea91eecb812a..5d4a49947f9ec64e974e6f9574251d5f5b74b280 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -51,17 +51,7 @@ type ( } ) -func newReplica() collectionReplica { - collections := make([]*Collection, 0) - segments := make([]*Segment, 0) - - var replica collectionReplica = &collectionReplicaImpl{ - collections: collections, - segments: segments, - } - return replica -} - +//----------------------------------------------------------------------------------------------------- collection func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) { colReplica.mu.RLock() defer colReplica.mu.RUnlock() @@ -71,7 +61,7 @@ func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Se return segment, nil } } - return nil, errors.Errorf("Cannot find segment, id = %v", segmentID) + return nil, errors.Errorf("cannot find segment, id = %v", segmentID) } func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, collID UniqueID, @@ -173,7 +163,7 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc var newCollection = newCollection(collectionID, schema) colReplica.collections = append(colReplica.collections, newCollection) - log.Println("Create collection:", newCollection.Name()) + log.Println("Create collection: ", newCollection.Name()) return nil } @@ -187,25 +177,25 @@ func (colReplica *collectionReplicaImpl) getCollectionIDByName(collName string) return collection.ID(), nil } } - return 0, errors.Errorf("Cannot get collection ID by name %s: not exist", collName) + return 0, errors.Errorf("There is no collection name=%v", collName) } func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error { + // GOOSE TODO: optimize colReplica.mu.Lock() defer colReplica.mu.Unlock() - length := len(colReplica.collections) - for index, col := range colReplica.collections { - if col.ID() == collectionID { - log.Println("Drop collection: ", col.Name()) - colReplica.collections[index] = colReplica.collections[length-1] - colReplica.collections = colReplica.collections[:length-1] - return nil + tmpCollections := make([]*Collection, 0) + for _, col := range colReplica.collections { + if col.ID() != collectionID { + tmpCollections = append(tmpCollections, col) + } else { + log.Println("Drop collection : ", col.Name()) } } - - return errors.Errorf("Cannot remove collection %d: not exist", collectionID) + colReplica.collections = tmpCollections + return nil } func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) { @@ -217,7 +207,7 @@ func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID return collection, nil } } - return nil, errors.Errorf("Cannot get collection %d by ID: not exist", collectionID) + return nil, errors.Errorf("cannot find collection, id = %v", collectionID) } func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) { diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go index 3b3e98e46a0088d2b96e25ad7bd3d85ab0df5270..c97d38dbddfbea284393e2a0f7b07fbafea1c106 100644 --- a/internal/datanode/collection_replica_test.go +++ b/internal/datanode/collection_replica_test.go @@ -5,11 +5,20 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) +func newReplica() collectionReplica { + collections := make([]*Collection, 0) + segments := make([]*Segment, 0) + + var replica collectionReplica = &collectionReplicaImpl{ + collections: collections, + segments: segments, + } + return replica +} + func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName string, collectionID UniqueID, segmentID UniqueID) { - // GOOSE TODO remove Factory := &MetaFactory{} collectionMeta := Factory.CollectionMetaFactory(collectionID, collectionName) @@ -24,133 +33,71 @@ func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName } -func TestReplica_Collection(t *testing.T) { - Factory := &MetaFactory{} - collMetaMock := Factory.CollectionMetaFactory(0, "collection0") - - t.Run("Test add collection", func(t *testing.T) { - - replica := newReplica() - assert.False(t, replica.hasCollection(0)) - num := replica.getCollectionNum() - assert.Equal(t, 0, num) - - err := replica.addCollection(0, collMetaMock.GetSchema()) - assert.NoError(t, err) - - assert.True(t, replica.hasCollection(0)) - num = replica.getCollectionNum() - assert.Equal(t, 1, num) - - coll, err := replica.getCollectionByID(0) - assert.NoError(t, err) - assert.NotNil(t, coll) - assert.Equal(t, UniqueID(0), coll.ID()) - assert.Equal(t, "collection0", coll.Name()) - assert.Equal(t, collMetaMock.GetSchema(), coll.Schema()) - - coll, err = replica.getCollectionByName("collection0") - assert.NoError(t, err) - assert.NotNil(t, coll) - assert.Equal(t, UniqueID(0), coll.ID()) - assert.Equal(t, "collection0", coll.Name()) - assert.Equal(t, collMetaMock.GetSchema(), coll.Schema()) - - collID, err := replica.getCollectionIDByName("collection0") - assert.NoError(t, err) - assert.Equal(t, UniqueID(0), collID) - - }) - - t.Run("Test remove collection", func(t *testing.T) { - replica := newReplica() - err := replica.addCollection(0, collMetaMock.GetSchema()) - require.NoError(t, err) - - numsBefore := replica.getCollectionNum() - coll, err := replica.getCollectionByID(0) - require.NotNil(t, coll) - require.NoError(t, err) - - err = replica.removeCollection(0) - assert.NoError(t, err) - numsAfter := replica.getCollectionNum() - assert.Equal(t, 1, numsBefore-numsAfter) - - coll, err = replica.getCollectionByID(0) - assert.Nil(t, coll) - assert.Error(t, err) - err = replica.removeCollection(999999999) - assert.Error(t, err) - }) - - t.Run("Test errors", func(t *testing.T) { - replica := newReplica() - require.False(t, replica.hasCollection(0)) - require.Equal(t, 0, replica.getCollectionNum()) - - coll, err := replica.getCollectionByName("Name-not-exist") - assert.Error(t, err) - assert.Nil(t, coll) - - coll, err = replica.getCollectionByID(0) - assert.Error(t, err) - assert.Nil(t, coll) - - collID, err := replica.getCollectionIDByName("Name-not-exist") - assert.Error(t, err) - assert.Zero(t, collID) - - err = replica.removeCollection(0) - assert.Error(t, err) - }) +//----------------------------------------------------------------------------------------------------- collection +func TestCollectionReplica_getCollectionNum(t *testing.T) { + replica := newReplica() + initTestReplicaMeta(t, replica, "collection0", 0, 0) + assert.Equal(t, replica.getCollectionNum(), 1) +} +func TestCollectionReplica_addCollection(t *testing.T) { + replica := newReplica() + initTestReplicaMeta(t, replica, "collection0", 0, 0) } -func TestReplica_Segment(t *testing.T) { - t.Run("Test segment", func(t *testing.T) { - replica := newReplica() - assert.False(t, replica.hasSegment(0)) +func TestCollectionReplica_removeCollection(t *testing.T) { + replica := newReplica() + initTestReplicaMeta(t, replica, "collection0", 0, 0) + assert.Equal(t, replica.getCollectionNum(), 1) - err := replica.addSegment(0, 1, 2, make([]*internalpb2.MsgPosition, 0)) - assert.NoError(t, err) - assert.True(t, replica.hasSegment(0)) + err := replica.removeCollection(0) + assert.NoError(t, err) + assert.Equal(t, replica.getCollectionNum(), 0) +} - seg, err := replica.getSegmentByID(0) - assert.NoError(t, err) - assert.NotNil(t, seg) - assert.Equal(t, UniqueID(1), seg.collectionID) - assert.Equal(t, UniqueID(2), seg.partitionID) +func TestCollectionReplica_getCollectionByID(t *testing.T) { + replica := newReplica() + collectionName := "collection0" + collectionID := UniqueID(0) + initTestReplicaMeta(t, replica, collectionName, collectionID, 0) + targetCollection, err := replica.getCollectionByID(collectionID) + assert.NoError(t, err) + assert.NotNil(t, targetCollection) + assert.Equal(t, targetCollection.Name(), collectionName) + assert.Equal(t, targetCollection.ID(), collectionID) +} - assert.Equal(t, int64(0), seg.numRows) +func TestCollectionReplica_getCollectionByName(t *testing.T) { + replica := newReplica() + collectionName := "collection0" + collectionID := UniqueID(0) + initTestReplicaMeta(t, replica, collectionName, collectionID, 0) - err = replica.updateStatistics(0, 100) - assert.NoError(t, err) - assert.Equal(t, int64(100), seg.numRows) + targetCollection, err := replica.getCollectionByName(collectionName) + assert.NoError(t, err) + assert.NotNil(t, targetCollection) + assert.Equal(t, targetCollection.Name(), collectionName) + assert.Equal(t, targetCollection.ID(), collectionID) - update, err := replica.getSegmentStatisticsUpdates(0) - assert.NoError(t, err) - assert.Equal(t, UniqueID(0), update.SegmentID) - assert.Equal(t, int64(100), update.NumRows) - assert.True(t, update.IsNewSegment) - }) +} - t.Run("Test errors", func(t *testing.T) { - replica := newReplica() - require.False(t, replica.hasSegment(0)) +func TestCollectionReplica_hasCollection(t *testing.T) { + replica := newReplica() + collectionName := "collection0" + collectionID := UniqueID(0) + initTestReplicaMeta(t, replica, collectionName, collectionID, 0) - seg, err := replica.getSegmentByID(0) - assert.Error(t, err) - assert.Nil(t, seg) + hasCollection := replica.hasCollection(collectionID) + assert.Equal(t, hasCollection, true) + hasCollection = replica.hasCollection(UniqueID(1)) + assert.Equal(t, hasCollection, false) - err = replica.removeSegment(0) - assert.Error(t, err) +} - err = replica.updateStatistics(0, 0) - assert.Error(t, err) +func TestCollectionReplica_freeAll(t *testing.T) { + replica := newReplica() + collectionName := "collection0" + collectionID := UniqueID(0) + initTestReplicaMeta(t, replica, collectionName, collectionID, 0) - update, err := replica.getSegmentStatisticsUpdates(0) - assert.Error(t, err) - assert.Nil(t, update) - }) } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 9d834b79107e18d665579ddf6b81b6679d9017d0..500e0456f541eea4ebe6891c4347fb4403f851c8 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -16,7 +16,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) const ( @@ -39,6 +38,7 @@ type ( FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) SetMasterServiceInterface(ms MasterServiceInterface) error + SetDataServiceInterface(ds DataServiceInterface) error } @@ -55,6 +55,7 @@ type ( } DataNode struct { + // GOOSE TODO: complete interface with component ctx context.Context NodeID UniqueID Role string @@ -79,8 +80,8 @@ func NewDataNode(ctx context.Context) *DataNode { Params.Init() node := &DataNode{ ctx: ctx, - NodeID: Params.NodeID, // GOOSE TODO How to init - Role: typeutil.DataNodeRole, + NodeID: Params.NodeID, // GOOSE TODO + Role: "DataNode", // GOOSE TODO State: internalpb2.StateCode_INITIALIZING, dataSyncService: nil, metaService: nil, @@ -106,7 +107,7 @@ func (node *DataNode) Init() error { req := &datapb.RegisterNodeRequest{ Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_kNone, + MsgType: commonpb.MsgType_kNone, //GOOSE TODO SourceID: node.NodeID, }, Address: &commonpb.Address{ @@ -117,10 +118,11 @@ func (node *DataNode) Init() error { resp, err := node.dataService.RegisterNode(req) if err != nil { - return errors.Errorf("Register node failed: %v", err) + return errors.Errorf("Init failed: %v", err) } for _, kv := range resp.InitParams.StartParams { + log.Println(kv) switch kv.Key { case "DDChannelName": Params.DDChannelNames = []string{kv.Value} @@ -148,7 +150,7 @@ func (node *DataNode) Init() error { node.metaService = newMetaService(node.ctx, replica, node.masterService) node.replica = replica - // --- Opentracing --- + // Opentracing cfg := &config.Configuration{ ServiceName: "data_node", Sampler: &config.SamplerConfig{ @@ -165,6 +167,7 @@ func (node *DataNode) Init() error { } node.tracer = tracer node.closer = closer + opentracing.SetGlobalTracer(node.tracer) return nil @@ -180,14 +183,12 @@ func (node *DataNode) Start() error { } func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { - log.Println("Init insert channel names:", in.GetChannelNames()) Params.InsertChannelNames = append(Params.InsertChannelNames, in.GetChannelNames()...) return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil } func (node *DataNode) GetComponentStates() (*internalpb2.ComponentStates, error) { - log.Println("DataNode current state:", node.State) states := &internalpb2.ComponentStates{ State: &internalpb2.ComponentInfo{ NodeID: Params.NodeID, diff --git a/internal/datanode/factory.go b/internal/datanode/factory.go index c0fbebb3469cbbb0e4d2cc55bf09f52bc263549e..9152f37acfe6f5dfa8f17b86ffd9acb26cb72bd8 100644 --- a/internal/datanode/factory.go +++ b/internal/datanode/factory.go @@ -17,7 +17,6 @@ type ( } AllocatorFactory struct { - ID UniqueID } MasterServiceFactory struct { @@ -162,23 +161,9 @@ func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionNa return &collection } -func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory { - f := &AllocatorFactory{} - if len(id) == 1 { - f.ID = id[0] - } - return f -} - -func (alloc AllocatorFactory) setID(id UniqueID) { - alloc.ID = id -} - func (alloc AllocatorFactory) allocID() (UniqueID, error) { - if alloc.ID == 0 { - return UniqueID(0), nil // GOOSE TODO: random ID generating - } - return alloc.ID, nil + // GOOSE TODO: random ID generate + return UniqueID(0), nil } func (m *MasterServiceFactory) setID(id UniqueID) { diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index fad96d3a00bfa3610bf3d0421fe9e042bbb36b7b..45676f19b64e82145e328b4176bf847dbc4d7d16 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -112,7 +112,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { case commonpb.MsgType_kDropPartition: ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg)) default: - log.Println("Not supporting message type:", msg.Type()) + log.Println("Non supporting message type:", msg.Type()) } } @@ -132,7 +132,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { } default: - log.Println(". default: do nothing ...") + log.Println("..........default do nothing") } // generate binlog @@ -303,8 +303,8 @@ func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { } ddNode.ddRecords.partitionRecords[partitionID] = nil - partitionName := msg.PartitionName - ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName], + partitionTag := msg.PartitionName + ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag], metaOperateRecord{ createOrDrop: true, timestamp: msg.Base.Timestamp, @@ -341,8 +341,8 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { } delete(ddNode.ddRecords.partitionRecords, partitionID) - partitionName := msg.PartitionName - ddNode.ddMsg.partitionRecords[partitionName] = append(ddNode.ddMsg.partitionRecords[partitionName], + partitionTag := msg.PartitionName + ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag], metaOperateRecord{ createOrDrop: false, timestamp: msg.Base.Timestamp, diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index 556c92253ce2f512059153798832813cb7e5bca2..35386b3dad78b2cee5834f83725e69afd655d23f 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -2,7 +2,6 @@ package datanode import ( "context" - "log" "testing" "time" @@ -36,57 +35,51 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { require.NoError(t, err) Params.MetaRootPath = testPath - // Params.FlushDdBufferSize = 4 + Params.FlushDdBufferSize = 4 replica := newReplica() - allocatorMock := NewAllocatorFactory() - ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, allocatorMock) - log.Print() + idFactory := AllocatorFactory{} + ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, idFactory) - collID := UniqueID(0) - collName := "col-test-0" + colID := UniqueID(0) + colName := "col-test-0" // create collection - createCollReq := internalpb2.CreateCollectionRequest{ + createColReq := internalpb2.CreateCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kCreateCollection, MsgID: 1, Timestamp: 1, SourceID: 1, }, - CollectionID: collID, - Schema: make([]byte, 0), - CollectionName: collName, - DbName: "DbName", - DbID: UniqueID(0), + CollectionID: colID, + Schema: make([]byte, 0), } - createCollMsg := msgstream.CreateCollectionMsg{ + createColMsg := msgstream.CreateCollectionMsg{ BaseMsg: msgstream.BaseMsg{ BeginTimestamp: Timestamp(1), EndTimestamp: Timestamp(1), HashValues: []uint32{uint32(0)}, }, - CreateCollectionRequest: createCollReq, + CreateCollectionRequest: createColReq, } // drop collection - dropCollReq := internalpb2.DropCollectionRequest{ + dropColReq := internalpb2.DropCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDropCollection, MsgID: 2, Timestamp: 2, SourceID: 2, }, - CollectionID: collID, - CollectionName: collName, - DbName: "DbName", - DbID: UniqueID(0), + CollectionID: colID, + CollectionName: colName, } - dropCollMsg := msgstream.DropCollectionMsg{ + dropColMsg := msgstream.DropCollectionMsg{ BaseMsg: msgstream.BaseMsg{ BeginTimestamp: Timestamp(2), EndTimestamp: Timestamp(2), HashValues: []uint32{uint32(0)}, }, - DropCollectionRequest: dropCollReq, + DropCollectionRequest: dropColReq, } partitionID := UniqueID(100) @@ -99,12 +92,10 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { Timestamp: 3, SourceID: 3, }, - CollectionID: collID, + CollectionID: colID, PartitionID: partitionID, - CollectionName: collName, + CollectionName: colName, PartitionName: partitionName, - DbName: "DbName", - DbID: UniqueID(0), } createPartitionMsg := msgstream.CreatePartitionMsg{ BaseMsg: msgstream.BaseMsg{ @@ -123,12 +114,10 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { Timestamp: 4, SourceID: 4, }, - CollectionID: collID, + CollectionID: colID, PartitionID: partitionID, - CollectionName: collName, + CollectionName: colName, PartitionName: partitionName, - DbName: "DbName", - DbID: UniqueID(0), } dropPartitionMsg := msgstream.DropPartitionMsg{ BaseMsg: msgstream.BaseMsg{ @@ -139,17 +128,16 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { DropPartitionRequest: dropPartitionReq, } - replica.addSegment(1, collID, partitionID, make([]*internalpb2.MsgPosition, 0)) inFlushCh <- &flushMsg{ - msgID: 5, - timestamp: 5, + msgID: 1, + timestamp: 6, segmentIDs: []UniqueID{1}, - collectionID: collID, + collectionID: UniqueID(1), } tsMessages := make([]msgstream.TsMsg, 0) - tsMessages = append(tsMessages, msgstream.TsMsg(&createCollMsg)) - tsMessages = append(tsMessages, msgstream.TsMsg(&dropCollMsg)) + tsMessages = append(tsMessages, msgstream.TsMsg(&createColMsg)) + tsMessages = append(tsMessages, msgstream.TsMsg(&dropColMsg)) tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg)) tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg)) msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3), make([]*internalpb2.MsgPosition, 0)) diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go index 8a9561fae1469ec1de22196e40292c582804086d..182b7ec7e7d879639946f728249f76384f769ac9 100644 --- a/internal/datanode/meta_service.go +++ b/internal/datanode/meta_service.go @@ -27,7 +27,6 @@ func newMetaService(ctx context.Context, replica collectionReplica, m MasterServ } func (mService *metaService) init() { - log.Println("Initing meta ...") err := mService.loadCollections() if err != nil { log.Fatal("metaService init failed:", err) diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go index 0ffd225149221511b787b083ed8b6c64c6b70eff..a7434d108a90ab19f81565023bc44dfc14656cf5 100644 --- a/internal/dataservice/cluster.go +++ b/internal/dataservice/cluster.go @@ -10,8 +10,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - - "github.com/zilliztech/milvus-distributed/internal/distributed/datanode" ) type ( @@ -21,7 +19,7 @@ type ( ip string port int64 } - client *datanode.Client + client DataNodeClient channelNum int } dataNodeCluster struct { @@ -38,18 +36,11 @@ func newDataNodeCluster(finishCh chan struct{}) *dataNodeCluster { } } -func (c *dataNodeCluster) Register(ip string, port int64, id int64) { +func (c *dataNodeCluster) Register(dataNode *dataNode) { c.mu.Lock() defer c.mu.Unlock() - if c.checkDataNodeNotExist(ip, port) { - c.nodes = append(c.nodes, &dataNode{ - id: id, - address: struct { - ip string - port int64 - }{ip: ip, port: port}, - channelNum: 0, - }) + if c.checkDataNodeNotExist(dataNode.address.ip, dataNode.address.port) { + c.nodes = append(c.nodes, dataNode) if len(c.nodes) == Params.DataNodeNum { close(c.finishCh) } @@ -125,3 +116,12 @@ func (c *dataNodeCluster) FlushSegment(request *datapb.FlushSegRequest) { } } } + +func (c *dataNodeCluster) ShutDownClients() { + for _, node := range c.nodes { + if err := node.client.Stop(); err != nil { + log.Println(err.Error()) + continue + } + } +} diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 056aaf773fd794d3f53c6bddffee0a5d350d4278..80f57f258afc57a4cf39dc403c76ebe05a9bde4c 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -11,6 +11,8 @@ import ( "sync/atomic" "time" + "github.com/zilliztech/milvus-distributed/internal/distributed/datanode" + "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -61,6 +63,13 @@ type MasterClient interface { GetComponentStates() (*internalpb2.ComponentStates, error) } +type DataNodeClient interface { + WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) + GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error) + FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) + Stop() error +} + type ( UniqueID = typeutil.UniqueID Timestamp = typeutil.Timestamp @@ -380,6 +389,7 @@ func (s *Server) waitDataNodeRegister() { } func (s *Server) Stop() error { + s.cluster.ShutDownClients() s.ttMsgStream.Close() s.k2sMsgStream.Close() s.msgProducer.Close() @@ -428,7 +438,11 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, }, } - s.cluster.Register(req.Address.Ip, req.Address.Port, req.Base.SourceID) + node, err := s.newDataNode(req.Address.Ip, req.Address.Port, req.Base.SourceID) + if err != nil { + return nil, err + } + s.cluster.Register(node) if s.ddChannelName == "" { resp, err := s.masterClient.GetDdChannel() if err != nil { @@ -450,6 +464,25 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register return ret, nil } +func (s *Server) newDataNode(ip string, port int64, id UniqueID) (*dataNode, error) { + client := datanode.NewClient(fmt.Sprintf("%s:%d", ip, port)) + if err := client.Init(); err != nil { + return nil, err + } + if err := client.Start(); err != nil { + return nil, err + } + return &dataNode{ + id: id, + address: struct { + ip string + port int64 + }{ip: ip, port: port}, + client: client, + channelNum: 0, + }, nil +} + func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) { if !s.checkStateIsHealthy() { return &commonpb.Status{ diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index b0903bd48567f214040d89a11b45e0c7f8081a59..658cd2bf1df22ab8f2c2cdfba08401c2c69c7853 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -7,6 +7,7 @@ import ( "sync" dn "github.com/zilliztech/milvus-distributed/internal/datanode" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -66,6 +67,11 @@ func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error { } func (s *Server) Init() error { + err := s.core.Init() + if err != nil { + return errors.Errorf("Init failed: %v", err) + } + return s.core.Init() }