diff --git a/build/docker/deploy/datanode/DockerFile b/build/docker/deploy/datanode/Dockerfile
similarity index 100%
rename from build/docker/deploy/datanode/DockerFile
rename to build/docker/deploy/datanode/Dockerfile
diff --git a/build/docker/deploy/dataservice/DockerFile b/build/docker/deploy/dataservice/Dockerfile
similarity index 100%
rename from build/docker/deploy/dataservice/DockerFile
rename to build/docker/deploy/dataservice/Dockerfile
diff --git a/build/docker/deploy/docker-compose.yml b/build/docker/deploy/docker-compose.yml
index 8fc702d051608bcf26737a7245d03a0a24355b53..dadadf66a658db9a49e16097b7df62544fae7978 100644
--- a/build/docker/deploy/docker-compose.yml
+++ b/build/docker/deploy/docker-compose.yml
@@ -5,7 +5,7 @@ services:
     image: ${TARGET_REPO}/proxyservice:${TARGET_TAG}
     build:
       context: ../../../
-      dockerfile: build/docker/deploy/proxyservice/DockerFile
+      dockerfile: build/docker/deploy/proxyservice/Dockerfile
       cache_from:
         - ${SOURCE_REPO}/proxyservice:${SOURCE_TAG}
     networks:
@@ -15,7 +15,7 @@ services:
     image: ${TARGET_REPO}/proxynode:${TARGET_TAG}
     build:
       context: ../../../
-      dockerfile: build/docker/deploy/proxynode/DockerFile
+      dockerfile: build/docker/deploy/proxynode/Dockerfile
       cache_from:
         - ${SOURCE_REPO}/proxynode:${SOURCE_TAG}
     environment:
@@ -30,7 +30,7 @@ services:
     image: ${TARGET_REPO}/queryservice:${TARGET_TAG}
     build:
       context: ../../../
-      dockerfile: build/docker/deploy/queryservice/DockerFile
+      dockerfile: build/docker/deploy/queryservice/Dockerfile
       cache_from:
         - ${SOURCE_REPO}/queryservice:${SOURCE_TAG}
     environment:
@@ -45,7 +45,7 @@ services:
     image: ${TARGET_REPO}/querynode:${TARGET_TAG}
     build:
       context: ../../../
-      dockerfile: build/docker/deploy/querynode/DockerFile
+      dockerfile: build/docker/deploy/querynode/Dockerfile
       cache_from:
         - ${SOURCE_REPO}/querynode:${SOURCE_TAG}
     environment:
@@ -60,7 +60,7 @@ services:
     image: ${TARGET_REPO}/datanode:${TARGET_TAG}
     build:
       context: ../../../
-      dockerfile: build/docker/deploy/datanode/DockerFile
+      dockerfile: build/docker/deploy/datanode/Dockerfile
       cache_from:
         - ${SOURCE_REPO}/datanode:${SOURCE_TAG}
     environment:
@@ -75,7 +75,7 @@ services:
     image: ${TARGET_REPO}/indexservice:${TARGET_TAG}
     build:
       context: ../../../
-      dockerfile: build/docker/deploy/indexservice/DockerFile
+      dockerfile: build/docker/deploy/indexservice/Dockerfile
       cache_from:
         - ${SOURCE_REPO}/indexservice:${SOURCE_TAG}
     environment:
@@ -89,7 +89,7 @@ services:
     image: ${TARGET_REPO}/indexnode:${TARGET_TAG}
     build:
       context: ../../../
-      dockerfile: build/docker/deploy/indexnode/DockerFile
+      dockerfile: build/docker/deploy/indexnode/Dockerfile
       cache_from:
         - ${SOURCE_REPO}/indexnode:${SOURCE_TAG}
     environment:
diff --git a/build/docker/deploy/indexbuilder/DockerFile b/build/docker/deploy/indexbuilder/Dockerfile
similarity index 100%
rename from build/docker/deploy/indexbuilder/DockerFile
rename to build/docker/deploy/indexbuilder/Dockerfile
diff --git a/build/docker/deploy/indexnode/DockerFile b/build/docker/deploy/indexnode/Dockerfile
similarity index 100%
rename from build/docker/deploy/indexnode/DockerFile
rename to build/docker/deploy/indexnode/Dockerfile
diff --git a/build/docker/deploy/indexservice/DockerFile b/build/docker/deploy/indexservice/Dockerfile
similarity index 100%
rename from build/docker/deploy/indexservice/DockerFile
rename to build/docker/deploy/indexservice/Dockerfile
diff --git a/build/docker/deploy/masterservice/DockerFile b/build/docker/deploy/masterservice/Dockerfile
similarity index 100%
rename from build/docker/deploy/masterservice/DockerFile
rename to build/docker/deploy/masterservice/Dockerfile
diff --git a/build/docker/deploy/proxynode/DockerFile b/build/docker/deploy/proxynode/Dockerfile
similarity index 100%
rename from build/docker/deploy/proxynode/DockerFile
rename to build/docker/deploy/proxynode/Dockerfile
diff --git a/build/docker/deploy/proxyservice/DockerFile b/build/docker/deploy/proxyservice/Dockerfile
similarity index 100%
rename from build/docker/deploy/proxyservice/DockerFile
rename to build/docker/deploy/proxyservice/Dockerfile
diff --git a/build/docker/deploy/querynode/DockerFile b/build/docker/deploy/querynode/Dockerfile
similarity index 100%
rename from build/docker/deploy/querynode/DockerFile
rename to build/docker/deploy/querynode/Dockerfile
diff --git a/build/docker/deploy/queryservice/DockerFile b/build/docker/deploy/queryservice/Dockerfile
similarity index 100%
rename from build/docker/deploy/queryservice/DockerFile
rename to build/docker/deploy/queryservice/Dockerfile
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index fcb2c5cce87aaeca457e041f04769ea6afd50192..5b1c54e09e8accdb6ef8f923fa7103f0ddc751eb 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -94,6 +94,7 @@ type (
 		ddChannelName     string
 		segmentInfoStream msgstream.MsgStream
 		insertChannels    []string
+		ttBarrier         timesync.TimeTickBarrier
 	}
 )
 
@@ -177,23 +178,23 @@ func (s *Server) initSegmentInfoChannel() {
 	s.segmentInfoStream.Start()
 }
 func (s *Server) initMsgProducer() error {
+	var err error
 	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
-	ttMsgStream, _ := factory.NewMsgStream(s.ctx)
-	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
-	s.ttMsgStream = ttMsgStream
+	if s.ttMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
+		return err
+	}
+	s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
 	s.ttMsgStream.Start()
-	timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
-	dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
-	k2sStream, _ := factory.NewMsgStream(s.ctx)
-	k2sStream.AsProducer(Params.K2SChannelNames)
-	s.k2sMsgStream = k2sStream
+	if s.k2sMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
+		return err
+	}
+	s.k2sMsgStream.AsProducer(Params.K2SChannelNames)
 	s.k2sMsgStream.Start()
+	dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
 	k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
-	producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher, k2sMsgWatcher)
-	if err != nil {
+	if s.msgProducer, err = timesync.NewTimeSyncMsgProducer(s.ttBarrier, dataNodeTTWatcher, k2sMsgWatcher); err != nil {
 		return err
 	}
-	s.msgProducer = producer
 	s.msgProducer.Start(s.ctx)
 	return nil
 }
@@ -297,10 +298,11 @@ func (s *Server) checkMasterIsHealthy() error {
 
 func (s *Server) startServerLoop() {
 	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
-	s.serverLoopWg.Add(3)
+	s.serverLoopWg.Add(4)
 	go s.startStatsChannel(s.serverLoopCtx)
 	go s.startSegmentFlushChannel(s.serverLoopCtx)
 	go s.startDDChannel(s.serverLoopCtx)
+	go s.startTTBarrier(s.serverLoopCtx)
 }
 
 func (s *Server) startStatsChannel(ctx context.Context) {
@@ -388,6 +390,12 @@ func (s *Server) startDDChannel(ctx context.Context) {
 	}
 }
 
+func (s *Server) startTTBarrier(ctx context.Context) {
+	defer s.serverLoopWg.Done()
+	s.ttBarrier = timesync.NewHardTimeTickBarrier(ctx, s.ttMsgStream, s.cluster.GetNodeIDs())
+	s.ttBarrier.StartBackgroundLoop()
+}
+
 func (s *Server) waitDataNodeRegister() {
 	log.Println("waiting data node to register")
 	<-s.registerFinishCh
diff --git a/internal/proxynode/meta_cache.go b/internal/proxynode/meta_cache.go
index c22e42bc422a9ed4a9b45b8bd8f572d93f750603..2362f1432ec410c3dd6320a5b4f202a584c60863 100644
--- a/internal/proxynode/meta_cache.go
+++ b/internal/proxynode/meta_cache.go
@@ -20,20 +20,19 @@ type Cache interface {
 	GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error)
 	GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error)
 	RemoveCollection(collectionName string)
-	RemovePartition(partitionName string)
+	RemovePartition(collectionName string, partitionName string)
 }
 
 type collectionInfo struct {
-	collID typeutil.UniqueID
-	schema *schemapb.CollectionSchema
+	collID   typeutil.UniqueID
+	schema   *schemapb.CollectionSchema
+	partInfo map[string]typeutil.UniqueID
 }
 
 type MetaCache struct {
 	client MasterClientInterface
 
 	collInfo map[string]*collectionInfo
-	partInfo map[string]typeutil.UniqueID
-	col2par  map[string][]string
 	mu       sync.RWMutex
 }
 
@@ -52,8 +51,6 @@ func NewMetaCache(client MasterClientInterface) (*MetaCache, error) {
 	return &MetaCache{
 		client:   client,
 		collInfo: map[string]*collectionInfo{},
-		partInfo: map[string]typeutil.UniqueID{},
-		col2par:  map[string][]string{},
 	}, nil
 }
 
@@ -79,11 +76,16 @@ func (m *MetaCache) readCollectionSchema(collectionName string) (*schemapb.Colle
 	return collInfo.schema, nil
 }
 
-func (m *MetaCache) readPartitionID(partitionName string) (typeutil.UniqueID, error) {
+func (m *MetaCache) readPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
 
-	partitionID, ok := m.partInfo[partitionName]
+	collInfo, ok := m.collInfo[collectionName]
+	if !ok {
+		return 0, errors.Errorf("can't find collection name:%s", collectionName)
+	}
+
+	partitionID, ok := collInfo.partInfo[partitionName]
 	if !ok {
 		return 0, errors.Errorf("can't find partition name:%s", partitionName)
 	}
@@ -112,15 +114,14 @@ func (m *MetaCache) GetCollectionID(collectionName string) (typeutil.UniqueID, e
 		return 0, errors.Errorf("%s", coll.Status.Reason)
 	}
-	collInfo := &collectionInfo{
-		collID: coll.CollectionID,
-		schema: coll.Schema,
-	}
 	_, ok := m.collInfo[collectionName]
 	if !ok {
-		m.collInfo[collectionName] = collInfo
+		m.collInfo[collectionName] = &collectionInfo{}
 	}
-	return collInfo.collID, nil
+	m.collInfo[collectionName].schema = coll.Schema
+	m.collInfo[collectionName].collID = coll.CollectionID
+
+	return m.collInfo[collectionName].collID, nil
 }
 
 func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.CollectionSchema, error) {
 	collSchema, err := m.readCollectionSchema(collectionName)
@@ -144,19 +145,18 @@ func (m *MetaCache) GetCollectionSchema(collectionName string) (*schemapb.Collec
 		return nil, errors.Errorf("%s", coll.Status.Reason)
 	}
-	collInfo := &collectionInfo{
-		collID: coll.CollectionID,
-		schema: coll.Schema,
-	}
 	_, ok := m.collInfo[collectionName]
 	if !ok {
-		m.collInfo[collectionName] = collInfo
+		m.collInfo[collectionName] = &collectionInfo{}
 	}
-	return collInfo.schema, nil
+	m.collInfo[collectionName].schema = coll.Schema
+	m.collInfo[collectionName].collID = coll.CollectionID
+
+	return m.collInfo[collectionName].schema, nil
 }
 
 func (m *MetaCache) GetPartitionID(collectionName string, partitionName string) (typeutil.UniqueID, error) {
-	partitionID, err := m.readPartitionID(partitionName)
+	partitionID, err := m.readPartitionID(collectionName, partitionName)
 	if err == nil {
 		return partitionID, nil
 	}
 
@@ -180,34 +180,45 @@ func (m *MetaCache) GetPartitionID(collectionName string, partitionName string)
 		return 0, errors.Errorf("partition ids len: %d doesn't equal Partition name len %d",
 			len(partitions.PartitionIDs), len(partitions.PartitionNames))
 	}
-	m.col2par[collectionName] = partitions.PartitionNames
+
+	_, ok := m.collInfo[collectionName]
+	if !ok {
+		m.collInfo[collectionName] = &collectionInfo{
+			partInfo: map[string]typeutil.UniqueID{},
+		}
+	}
+	partInfo := m.collInfo[collectionName].partInfo
 	for i := 0; i < len(partitions.PartitionIDs); i++ {
-		_, ok := m.partInfo[partitions.PartitionNames[i]]
+		_, ok := partInfo[partitions.PartitionNames[i]]
 		if !ok {
-			m.partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i]
+			partInfo[partitions.PartitionNames[i]] = partitions.PartitionIDs[i]
 		}
 	}
 
-	_, ok := m.partInfo[partitionName]
+	_, ok = partInfo[partitionName]
 	if !ok {
 		return 0, errors.Errorf("partitionID of partitionName:%s can not be find",
 			partitionName)
 	}
-	return m.partInfo[partitionName], nil
+	return partInfo[partitionName], nil
 }
 
 func (m *MetaCache) RemoveCollection(collectionName string) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	delete(m.collInfo, collectionName)
-	for _, partitionName := range m.col2par[collectionName] {
-		delete(m.partInfo, partitionName)
-	}
-	delete(m.col2par, collectionName)
 }
 
-func (m *MetaCache) RemovePartition(partitionName string) {
+func (m *MetaCache) RemovePartition(collectionName, partitionName string) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	delete(m.partInfo, partitionName)
+	_, ok := m.collInfo[collectionName]
+	if !ok {
+		return
+	}
+	partInfo := m.collInfo[collectionName].partInfo
+	if partInfo == nil {
+		return
+	}
+	delete(partInfo, partitionName)
 }
diff --git a/internal/timesync/time_sync_producer.go b/internal/timesync/time_sync_producer.go
index 55f74dfacb7af4b95605161bc238d8a3e8011b75..1fc4288ff2ebf18c73c5f7556927f24544dc628d 100644
--- a/internal/timesync/time_sync_producer.go
+++ b/internal/timesync/time_sync_producer.go
@@ -63,19 +63,13 @@ func (producer *MsgProducer) broadcastMsg() {
 
 func (producer *MsgProducer) Start(ctx context.Context) {
 	producer.ctx, producer.cancel = context.WithCancel(ctx)
-	producer.wg.Add(2 + len(producer.watchers))
-	go producer.startTTBarrier()
+	producer.wg.Add(1 + len(producer.watchers))
 	for _, watcher := range producer.watchers {
 		go producer.startWatcher(watcher)
 	}
 	go producer.broadcastMsg()
 }
 
-func (producer *MsgProducer) startTTBarrier() {
-	defer producer.wg.Done()
-	producer.ttBarrier.StartBackgroundLoop(producer.ctx)
-}
-
 func (producer *MsgProducer) startWatcher(watcher TimeTickWatcher) {
 	defer producer.wg.Done()
 	watcher.StartBackgroundLoop(producer.ctx)
diff --git a/internal/timesync/timesync.go b/internal/timesync/timesync.go
index 4f74aca5a798180c8b69868d2b029fa80cde61dd..9df5c1e93222291ced1b6a0703539ce67b8c0f46 100644
--- a/internal/timesync/timesync.go
+++ b/internal/timesync/timesync.go
@@ -18,7 +18,7 @@ type (
 
 	TimeTickBarrier interface {
 		GetTimeTick() (Timestamp, error)
-		StartBackgroundLoop(ctx context.Context)
+		StartBackgroundLoop()
 	}
 
 	softTimeTickBarrier struct {
@@ -38,7 +38,7 @@ type (
 	}
 )
 
-func NewSoftTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
+func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
 	if len(peerIds) <= 0 {
 		log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!\n")
 		return nil
@@ -49,6 +49,7 @@
 	sttbarrier.ttStream = ttStream
 	sttbarrier.outTt = make(chan Timestamp, 1024)
 	sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp)
+	sttbarrier.ctx = ctx
 	for _, id := range peerIds {
 		sttbarrier.peer2LastTt[id] = Timestamp(0)
 	}
@@ -79,12 +80,11 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) {
 	}
 }
 
-func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
-	ttBarrier.ctx = ctx
+func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop() {
 	for {
 		select {
-		case <-ctx.Done():
-			log.Printf("[TtBarrierStart] %s\n", ctx.Err())
+		case <-ttBarrier.ctx.Done():
+			log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
 			return
 		default:
 		}
@@ -137,14 +137,13 @@ func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) {
 	}
 }
 
-func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
-	ttBarrier.ctx = ctx
+func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop() {
 	// Last timestamp synchronized
 	state := Timestamp(0)
 	for {
 		select {
-		case <-ctx.Done():
-			log.Printf("[TtBarrierStart] %s\n", ctx.Err())
+		case <-ttBarrier.ctx.Done():
+			log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
 			return
 		default:
 		}
@@ -188,7 +187,7 @@ func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {
 	return tempMin
 }
 
-func NewHardTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
+func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
 	if len(peerIds) <= 0 {
 		log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!")
 		return nil
@@ -199,6 +198,7 @@
 	sttbarrier.outTt = make(chan Timestamp, 1024)
 	sttbarrier.peer2Tt = make(map[UniqueID]Timestamp)
+	sttbarrier.ctx = ctx
 	for _, id := range peerIds {
 		sttbarrier.peer2Tt[id] = Timestamp(0)
 	}