diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go
index 95f02d60c16d673ef37012f340ca32a5469b8647..c39a60a678db99e6ce18b1bd87e5210b4cc08ed4 100644
--- a/internal/msgstream/pulsarms/pulsar_msgstream.go
+++ b/internal/msgstream/pulsarms/pulsar_msgstream.go
@@ -192,7 +192,7 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
 		return errors.New("nil producer in msg stream")
 	}
 	reBucketValues := make([][]int32, len(tsMsgs))
-	for channelID, tsMsg := range tsMsgs {
+	for idx, tsMsg := range tsMsgs {
 		hashValues := tsMsg.HashKeys()
 		bucketValues := make([]int32, len(hashValues))
 		for index, hashValue := range hashValues {
@@ -203,12 +203,12 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
 				if channelIDInt >= int64(len(ms.producers)) {
 					return errors.New("Failed to produce pulsar msg to unKnow channel")
 				}
-				bucketValues[index] = int32(channelIDInt)
+				bucketValues[index] = int32(hashValue % uint32(len(ms.producers)))
 				continue
 			}
 			bucketValues[index] = int32(hashValue % uint32(len(ms.producers)))
 		}
-		reBucketValues[channelID] = bucketValues
+		reBucketValues[idx] = bucketValues
 	}
 
 	var result map[int32]*MsgPack
diff --git a/internal/proxynode/insert_channels.go b/internal/proxynode/insert_channels.go
index e9d16be74317cbec225743618683a9aa13eceacc..c480319d1ed4be268abaaab147769db2d826623b 100644
--- a/internal/proxynode/insert_channels.go
+++ b/internal/proxynode/insert_channels.go
@@ -132,6 +132,22 @@ func (m *InsertChannelsMap) closeInsertMsgStream(collID UniqueID) error {
 	return nil
 }
 
+func (m *InsertChannelsMap) getInsertChannels(collID UniqueID) ([]string, error) {
+	m.mtx.RLock()
+	defer m.mtx.RUnlock()
+
+	loc, ok := m.collectionID2InsertChannels[collID]
+	if !ok {
+		return nil, errors.New("cannot find collection with id: " + strconv.Itoa(int(collID)))
+	}
+
+	if m.droppedBitMap[loc] != 0 {
+		return nil, errors.New("insert message stream already closed")
+	}
+	ret := append([]string(nil), m.insertChannels[loc]...)
+	return ret, nil
+}
+
 func (m *InsertChannelsMap) getInsertMsgStream(collID UniqueID) (msgstream.MsgStream, error) {
 	m.mtx.RLock()
 	defer m.mtx.RUnlock()
diff --git a/internal/proxynode/repack_func.go b/internal/proxynode/repack_func.go
index 9785b605d0743b14c26cb5ae54efbe6a723231fc..2593f87380db98ef1c36437534acea9e9f9752b2 100644
--- a/internal/proxynode/repack_func.go
+++ b/internal/proxynode/repack_func.go
@@ -23,7 +23,8 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
 
 	channelCountMap := make(map[UniqueID]map[int32]uint32)    // reqID --> channelID to count
 	channelMaxTSMap := make(map[UniqueID]map[int32]Timestamp) // reqID --> channelID to max Timestamp
-	reqSchemaMap := make(map[UniqueID][]string)
+	reqSchemaMap := make(map[UniqueID][]UniqueID)  // reqID --> channelID [2]UniqueID {CollectionID, PartitionID}
+	channelNamesMap := make(map[UniqueID][]string) // collectionID --> channelNames
 
 	for i, request := range tsMsgs {
 		if request.Type() != commonpb.MsgType_kInsert {
@@ -54,7 +55,7 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
 		}
 
 		if _, ok := reqSchemaMap[reqID]; !ok {
-			reqSchemaMap[reqID] = []string{insertRequest.CollectionName, insertRequest.PartitionName}
+			reqSchemaMap[reqID] = []UniqueID{insertRequest.CollectionID, insertRequest.PartitionID}
 		}
 
 		for idx, channelID := range keys {
@@ -68,6 +69,22 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
 			}
 		}
 
+		collID := insertRequest.CollectionID
+		if _, ok := channelNamesMap[collID]; !ok {
+			channelNames, err := globalInsertChannelsMap.getInsertChannels(collID)
+			if err != nil {
+				return nil, err
+			}
+			channelNamesMap[collID] = channelNames
+		}
+	}
+
+	var getChannelName = func(collID UniqueID, channelID int32) string {
+		if _, ok := channelNamesMap[collID]; !ok {
+			return ""
+		}
+		names := channelNamesMap[collID]
+		return names[channelID]
 	}
 
 	reqSegCountMap := make(map[UniqueID]map[int32]map[UniqueID]uint32)
@@ -77,14 +94,18 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
 			reqSegCountMap[reqID] = make(map[int32]map[UniqueID]uint32)
 		}
 		schema := reqSchemaMap[reqID]
-		collName, partitionTag := schema[0], schema[1]
+		collID, partitionID := schema[0], schema[1]
 		for channelID, count := range countInfo {
 			ts, ok := channelMaxTSMap[reqID][channelID]
 			if !ok {
 				ts = typeutil.ZeroTimestamp
 				log.Println("Warning: did not get max Timstamp!")
 			}
-			mapInfo, err := segIDAssigner.GetSegmentID(collName, partitionTag, channelID, count, ts)
+			channelName := getChannelName(collID, channelID)
+			if channelName == "" {
+				return nil, errors.New("ProxyNode, repack_func, can not found channelName")
+			}
+			mapInfo, err := segIDAssigner.GetSegmentID(collID, partitionID, channelName, count, ts)
 			if err != nil {
 				return nil, err
 			}
@@ -156,7 +177,9 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
 		keys := hashKeys[i]
 		reqID := insertRequest.Base.MsgID
 		collectionName := insertRequest.CollectionName
-		partitionTag := insertRequest.PartitionName
+		collectionID := insertRequest.CollectionID
+		partitionID := insertRequest.PartitionID
+		partitionName := insertRequest.PartitionName
 		proxyID := insertRequest.Base.SourceID
 		for index, key := range keys {
 			ts := insertRequest.Timestamps[index]
@@ -175,13 +198,16 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
 					Timestamp: ts,
 					SourceID:  proxyID,
 				},
+				CollectionID:   collectionID,
+				PartitionID:    partitionID,
 				CollectionName: collectionName,
-				PartitionName:  partitionTag,
+				PartitionName:  partitionName,
 				SegmentID:      segmentID,
-				ChannelID:      strconv.FormatInt(int64(key), 10),
-				Timestamps:     []uint64{ts},
-				RowIDs:         []int64{rowID},
-				RowData:        []*commonpb.Blob{row},
+				// todo rename to ChannelName
+				ChannelID:  strconv.FormatInt(int64(key), 10),
+				Timestamps: []uint64{ts},
+				RowIDs:     []int64{rowID},
+				RowData:    []*commonpb.Blob{row},
 			}
 			insertMsg := &msgstream.InsertMsg{
 				InsertRequest: sliceRequest,
diff --git a/internal/proxynode/segment.go b/internal/proxynode/segment.go
index 7464cff8ab6de84bf814ecbb0569439461963044..1f4035c66942e9e53628ac121f93e44574a642ef 100644
--- a/internal/proxynode/segment.go
+++ b/internal/proxynode/segment.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"fmt"
 	"log"
-	"strconv"
 	"time"
 
 	"github.com/zilliztech/milvus-distributed/internal/allocator"
@@ -25,14 +24,12 @@ type Allocator = allocator.Allocator
 
 type segRequest struct {
 	allocator.BaseRequest
-	count         uint32
-	colName       string
-	partitionName string
-	collID        UniqueID
-	partitionID   UniqueID
-	segInfo       map[UniqueID]uint32
-	channelID     int32
-	timestamp     Timestamp
+	count       uint32
+	collID      UniqueID
+	partitionID UniqueID
+	segInfo     map[UniqueID]uint32
+	channelName string
+	timestamp   Timestamp
 }
 
 type segInfo struct {
@@ -44,9 +41,7 @@ type segInfo struct {
 type assignInfo struct {
 	collID        UniqueID
 	partitionID   UniqueID
-	collName      string
-	partitionName string
-	channelID     int32
+	channelName   string
 	segID         UniqueID
 	segInfos      *list.List
 	segCapacity   uint32
@@ -122,7 +117,7 @@ func (info *assignInfo) IsActive(now time.Time) bool {
 
 type SegIDAssigner struct {
 	Allocator
-	assignInfos map[string]*list.List // collectionName -> *list.List
+	assignInfos map[UniqueID]*list.List // collectionID -> *list.List
 	segReqs     []*datapb.SegIDRequest
 	getTickFunc func() Timestamp
 	PeerID      UniqueID
@@ -140,7 +135,7 @@ func NewSegIDAssigner(ctx context.Context, client DataServiceClient, getTickFunc
 		},
 		countPerRPC:   SegCountPerRPC,
 		serviceClient: client,
-		assignInfos:   make(map[string]*list.List),
+		assignInfos:   make(map[UniqueID]*list.List),
 		getTickFunc:   getTickFunc,
 	}
 	sa.TChan = &allocator.Ticker{
@@ -160,7 +155,6 @@ func (sa *SegIDAssigner) SetServiceClient(client DataServiceClient) {
 
 func (sa *SegIDAssigner) collectExpired() {
 	ts := sa.getTickFunc()
-	//now := time.Now()
 	for _, info := range sa.assignInfos {
 		for e := info.Front(); e != nil; e = e.Next() {
 			assign := e.Value.(*assignInfo)
@@ -176,36 +170,33 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
 	if sa.ToDoReqs == nil {
 		return
 	}
-	records := make(map[string]map[string]map[int32]uint32)
+	records := make(map[UniqueID]map[UniqueID]map[string]uint32)
 	newTodoReqs := sa.ToDoReqs[0:0]
 	for _, req := range sa.ToDoReqs {
 		segRequest := req.(*segRequest)
-		colName := segRequest.colName
-		partitionName := segRequest.partitionName
-		channelID := segRequest.channelID
+		collID := segRequest.collID
+		partitionID := segRequest.partitionID
+		channelName := segRequest.channelName
 
-		if _, ok := records[colName]; !ok {
-			records[colName] = make(map[string]map[int32]uint32)
+		if _, ok := records[collID]; !ok {
+			records[collID] = make(map[UniqueID]map[string]uint32)
 		}
 
-		if _, ok := records[colName][partitionName]; !ok {
-			records[colName][partitionName] = make(map[int32]uint32)
+		if _, ok := records[collID][partitionID]; !ok {
+			records[collID][partitionID] = make(map[string]uint32)
 		}
 
-		if _, ok := records[colName][partitionName][channelID]; !ok {
-			records[colName][partitionName][channelID] = 0
+		if _, ok := records[collID][partitionID][channelName]; !ok {
+			records[collID][partitionID][channelName] = 0
 		}
-		records[colName][partitionName][channelID] += segRequest.count
-		assign := sa.getAssign(segRequest.colName, segRequest.partitionName, segRequest.channelID)
-		if assign == nil || assign.Capacity(segRequest.timestamp) < records[colName][partitionName][channelID] {
-			partitionID, _ := typeutil.Hash32String(segRequest.colName)
+		records[collID][partitionID][channelName] += segRequest.count
+		assign := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
+		if assign == nil || assign.Capacity(segRequest.timestamp) < records[collID][partitionID][channelName] {
			sa.segReqs = append(sa.segReqs, &datapb.SegIDRequest{
-				ChannelName:   strconv.FormatUint(uint64(segRequest.channelID), 10),
-				Count:         segRequest.count,
-				CollName:      segRequest.colName,
-				PartitionName: segRequest.partitionName,
-				CollectionID:  0,
-				PartitionID:   partitionID,
+				ChannelName:  channelName,
+				Count:        segRequest.count,
+				CollectionID: collID,
+				PartitionID:  partitionID,
 			})
 			newTodoReqs = append(newTodoReqs, req)
 		} else {
@@ -215,15 +206,15 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
 	sa.ToDoReqs = newTodoReqs
 }
 
-func (sa *SegIDAssigner) getAssign(colName, partitionName string, channelID int32) *assignInfo {
-	assignInfos, ok := sa.assignInfos[colName]
+func (sa *SegIDAssigner) getAssign(collID UniqueID, partitionID UniqueID, channelName string) *assignInfo {
+	assignInfos, ok := sa.assignInfos[collID]
 	if !ok {
 		return nil
 	}
 
 	for e := assignInfos.Front(); e != nil; e = e.Next() {
 		info := e.Value.(*assignInfo)
-		if info.partitionName != partitionName || info.channelID != channelID {
+		if info.partitionID != partitionID || info.channelName != channelName {
 			continue
 		}
 		return info
@@ -244,7 +235,7 @@ func (sa *SegIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegIDRequest) bool
 	if req1 == req2 {
 		return true
 	}
-	return req1.CollName == req2.CollName && req1.PartitionName == req2.PartitionName && req1.ChannelName == req2.ChannelName
+	return req1.CollectionID == req2.CollectionID && req1.PartitionID == req2.PartitionID && req1.ChannelName == req2.ChannelName
 }
 
 func (sa *SegIDAssigner) reduceSegReqs() {
@@ -305,19 +296,14 @@ func (sa *SegIDAssigner) syncSegments() bool {
 			log.Println("SyncSegment Error:", info.Status.Reason)
 			continue
 		}
-		// FIXME: use channelName
-		channel, err := strconv.Atoi(info.ChannelName)
-		if err != nil {
-			return false
-		}
-		assign := sa.getAssign(info.CollName, info.PartitionName, int32(channel))
+		assign := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName)
 		segInfo := &segInfo{
 			segID:      info.SegID,
 			count:      info.Count,
 			expireTime: info.ExpireTime,
 		}
 		if assign == nil {
-			colInfos, ok := sa.assignInfos[info.CollName]
+			colInfos, ok := sa.assignInfos[info.CollectionID]
 			if !ok {
 				colInfos = list.New()
 			}
@@ -325,15 +311,13 @@ func (sa *SegIDAssigner) syncSegments() bool {
 			segInfos.PushBack(segInfo)
 
 			assign = &assignInfo{
-				collID:        info.CollectionID,
-				partitionID:   info.PartitionID,
-				channelID:     int32(channel),
-				segInfos:      segInfos,
-				partitionName: info.PartitionName,
-				collName:      info.CollName,
+				collID:      info.CollectionID,
+				partitionID: info.PartitionID,
+				channelName: info.ChannelName,
+				segInfos:    segInfos,
 			}
 			colInfos.PushBack(assign)
-			sa.assignInfos[info.CollName] = colInfos
+			sa.assignInfos[info.CollectionID] = colInfos
 		} else {
 			assign.segInfos.PushBack(segInfo)
 		}
@@ -345,7 +329,7 @@ func (sa *SegIDAssigner) syncSegments() bool {
 
 func (sa *SegIDAssigner) processFunc(req allocator.Request) error {
 	segRequest := req.(*segRequest)
-	assign := sa.getAssign(segRequest.colName, segRequest.partitionName, segRequest.channelID)
+	assign := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
 	if assign == nil {
 		return errors.New("Failed to GetSegmentID")
 	}
@@ -354,14 +338,14 @@ func (sa *SegIDAssigner) processFunc(req allocator.Request) error {
 	return err
 }
 
-func (sa *SegIDAssigner) GetSegmentID(colName, partitionName string, channelID int32, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {
+func (sa *SegIDAssigner) GetSegmentID(collID UniqueID, partitionID UniqueID, channelName string, count uint32, ts Timestamp) (map[UniqueID]uint32, error) {
 	req := &segRequest{
-		BaseRequest:   allocator.BaseRequest{Done: make(chan error), Valid: false},
-		colName:       colName,
-		partitionName: partitionName,
-		channelID:     channelID,
-		count:         count,
-		timestamp:     ts,
+		BaseRequest: allocator.BaseRequest{Done: make(chan error), Valid: false},
+		collID:      collID,
+		partitionID: partitionID,
+		channelName: channelName,
+		count:       count,
+		timestamp:   ts,
 	}
 	sa.Reqs <- req
 	req.Wait()