diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index e577c525c89969c9ac6d7d4d23e81418c5c79a86..5599216bede48787d937a7edb66382dbf817e670 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -56,11 +56,12 @@ type ( } DataNode struct { - ctx context.Context - cancel context.CancelFunc - NodeID UniqueID - Role string - State internalpb2.StateCode + ctx context.Context + cancel context.CancelFunc + NodeID UniqueID + Role string + State internalpb2.StateCode + watchDm chan struct{} dataSyncService *dataSyncService metaService *metaService @@ -81,11 +82,13 @@ func NewDataNode(ctx context.Context) *DataNode { Params.Init() ctx2, cancel2 := context.WithCancel(ctx) node := &DataNode{ - ctx: ctx2, - cancel: cancel2, - NodeID: Params.NodeID, // GOOSE TODO: How to init - Role: typeutil.DataNodeRole, - State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic + ctx: ctx2, + cancel: cancel2, + NodeID: Params.NodeID, // GOOSE TODO: How to init + Role: typeutil.DataNodeRole, + State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic + watchDm: make(chan struct{}), + dataSyncService: nil, metaService: nil, masterService: nil, @@ -135,6 +138,13 @@ func (node *DataNode) Init() error { return errors.Errorf("Register node failed: %v", err) } + select { + case <-time.After(RPCConnectionTimeout): + return errors.New("Get DmChannels failed in 30 seconds") + case <-node.watchDm: + log.Println("insert channel names set") + } + for _, kv := range resp.InitParams.StartParams { switch kv.Key { case "DDChannelName": @@ -162,10 +172,10 @@ func (node *DataNode) Init() error { node.flushChan = make(chan *flushMsg, chanSize) node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc) + node.dataSyncService.init() node.metaService = newMetaService(node.ctx, replica, node.masterService) node.replica = replica - node.dataSyncService.initNodes() // --- Opentracing --- cfg := &config.Configuration{ @@ -191,14 +201,9 @@ func (node *DataNode) Init() error { func (node *DataNode) Start() error { node.metaService.init() - return nil -} - -// DataNode is HEALTHY until StartSync() is called -func (node *DataNode) StartSync() { - node.dataSyncService.init() go node.dataSyncService.start() node.State = internalpb2.StateCode_HEALTHY + return nil } func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { @@ -219,7 +224,7 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*common default: Params.InsertChannelNames = in.GetChannelNames() status.ErrorCode = commonpb.ErrorCode_SUCCESS - node.StartSync() + node.watchDm <- struct{}{} return status, nil } } diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 2c605b93f4fae2c5dd6ac6e141c61ab4efde1f0f..9821405c9f594b0004bf8fa400ffe8271c8e1ab7 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -76,7 +76,6 @@ func (s *Server) Start() error { func (s *Server) Stop() error { err := s.core.Stop() - s.cancel() s.grpcServer.GracefulStop() return err } diff --git a/internal/proxynode/meta_cache.go b/internal/proxynode/meta_cache.go index 2362f1432ec410c3dd6320a5b4f202a584c60863..c22e42bc422a9ed4a9b45b8bd8f572d93f750603 100644 --- a/internal/proxynode/meta_cache.go +++ b/internal/proxynode/meta_cache.go @@ -20,19 +20,20 @@ type Cache interface { GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error) RemoveCollection(collectionName string) - RemovePartition(collectionName string, partitionName string) + RemovePartition(partitionName string) } type collectionInfo struct { - collID typeutil.UniqueID - schema *schemapb.CollectionSchema - partInfo map[string]typeutil.UniqueID + collID typeutil.UniqueID + schema *schemapb.CollectionSchema } type MetaCache struct { client MasterClientInterface collInfo map[string]*collectionInfo + partInfo map[string]typeutil.UniqueID + col2par map[string][]string mu sync.RWMutex } @@ -51,6 +52,8 @@ func NewMetaCache(client MasterClientInterface) (*MetaCache, error) { return &MetaCache{ client: client, collInfo: map[string]*collectionInfo{}, + partInfo: map[string]typeutil.UniqueID{}, + col2par: map[string][]string{}, }, nil } @@ -76,16 +79,11 @@ func (m *MetaCache) readCollectionSchema(collectionName string) (*schemapb.Colle return collInfo.schema, nil } -func (m *MetaCache) readPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) { +func (m *MetaCache) readPartitionID(partitionName string) (typeutil.UniqueID, error) { m.mu.RLock() defer m.mu.RUnlock() - collInfo, ok := m.collInfo[collectionName] - if !ok { - return 0, errors.Errorf("can't find collection name:%s", collectionName) - } - - partitionID, ok := collInfo.partInfo[partitionName] + partitionID, ok := m.partInfo[partitionName] if !ok { return 0, errors.Errorf("can't find partition name:%s", partitionName) } @@ -114,14 +112,15 @@ func (m *MetaCache) GetCollectionID(collectionName string) (typeutil.UniqueID, e return 0, errors.Errorf("%s", coll.Status.Reason) } + collInfo := &collectionInfo{ + collID: coll.CollectionID, + schema: coll.Schema, + } _, ok := m.collInfo[collectionName] if !ok { - m.collInfo[collectionName] = &collectionInfo{} + m.collInfo[collectionName] = collInfo } - m.collInfo[collectionName].schema = coll.Schema - m.collInfo[collectionName].collID = coll.CollectionID - - return m.collInfo[collectionName].collID, nil + return collInfo.collID, nil } func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error) { collSchema, err := m.readCollectionSchema(collectionName) @@ -145,18 +144,19 @@ func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.Collec return nil, errors.Errorf("%s", coll.Status.Reason) } + collInfo := &collectionInfo{ + collID: coll.CollectionID, + schema: coll.Schema, + } _, ok := m.collInfo[collectionName] if !ok { - m.collInfo[collectionName] = &collectionInfo{} + m.collInfo[collectionName] = collInfo } - m.collInfo[collectionName].schema = coll.Schema - m.collInfo[collectionName].collID = coll.CollectionID - - return m.collInfo[collectionName].schema, nil + return collInfo.schema, nil } func (m *MetaCache) GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) { - partitionID, err := m.readPartitionID(collectionName, partitionName) + partitionID, err := m.readPartitionID(partitionName) if err == nil { return partitionID, nil } @@ -180,45 +180,34 @@ func (m *MetaCache) GetPartitionID(collectionName string, partitionName string) return 0, errors.Errorf("partition ids len: %d doesn't equal Partition name len %d", len(partitions.PartitionIDs), len(partitions.PartitionNames)) } - - _, ok := m.collInfo[collectionName] - if !ok { - m.collInfo[collectionName] = &collectionInfo{ - partInfo: map[string]typeutil.UniqueID{}, - } - } - partInfo := m.collInfo[collectionName].partInfo + m.col2par[collectionName] = partitions.PartitionNames for i := 0; i < len(partitions.PartitionIDs); i++ { - _, ok := partInfo[partitions.PartitionNames[i]] + _, ok := m.partInfo[partitions.PartitionNames[i]] if !ok { - partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i] + m.partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i] } } - _, ok = partInfo[partitionName] + _, ok := m.partInfo[partitionName] if !ok { return 0, errors.Errorf("partitionID of partitionName:%s can not be find", partitionName) } + return m.partInfo[partitionName], nil - return partInfo[partitionName], nil } func (m *MetaCache) RemoveCollection(collectionName string) { m.mu.Lock() defer m.mu.Unlock() delete(m.collInfo, collectionName) + for _, partitionName := range m.col2par[collectionName] { + delete(m.partInfo, partitionName) + } + delete(m.col2par, collectionName) } -func (m *MetaCache) RemovePartition(collectionName, partitionName string) { +func (m *MetaCache) RemovePartition(partitionName string) { m.mu.Lock() defer m.mu.Unlock() - _, ok := m.collInfo[collectionName] - if !ok { - return - } - partInfo := m.collInfo[collectionName].partInfo - if partInfo == nil { - return - } - delete(partInfo, partitionName) + delete(m.partInfo, partitionName) }