Skip to content
Snippets Groups Projects
Commit e786ad6a authored by Kun Yu 【余昆】's avatar Kun Yu 【余昆】 Committed by yefu.chen
Browse files

Fix inconsistency of produce msg and consume msg with rmq


Signed-off-by: default avataryukun <kun.yu@zilliz.com>
parent b07a371f
No related branches found
No related tags found
No related merge requests found
......@@ -144,7 +144,7 @@ func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error {
channelID := searchResult.ResultChannelID
channelIDInt, _ := strconv.ParseInt(channelID, 10, 64)
if channelIDInt >= int64(len(ms.producers)) {
return errors.New("Failed to produce pulsar msg to unKnow channel")
return errors.New("Failed to produce rmq msg to unKnow channel")
}
bucketValues[index] = int32(channelIDInt)
continue
......@@ -178,38 +178,6 @@ func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error {
if err != nil {
return err
}
//
//msg := &pulsar.ProducerMessage{Payload: mb}
//var child opentracing.Span
if v.Msgs[i].Type() == commonpb.MsgType_kInsert ||
v.Msgs[i].Type() == commonpb.MsgType_kSearch ||
v.Msgs[i].Type() == commonpb.MsgType_kSearchResult {
//tracer := opentracing.GlobalTracer()
//ctx := v.Msgs[i].GetMsgContext()
//if ctx == nil {
// ctx = context.Background()
//}
//
//if parent := opentracing.SpanFromContext(ctx); parent != nil {
// child = tracer.StartSpan("start send pulsar msg",
// opentracing.FollowsFrom(parent.Context()))
//} else {
// child = tracer.StartSpan("start send pulsar msg")
//}
//child.SetTag("hash keys", v.Msgs[i].HashKeys())
//child.SetTag("start time", v.Msgs[i].BeginTs())
//child.SetTag("end time", v.Msgs[i].EndTs())
//child.SetTag("msg type", v.Msgs[i].Type())
//msg.Properties = make(map[string]string)
//err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
//if err != nil {
// child.LogFields(oplog.Error(err))
// child.Finish()
// return err
//}
//child.LogFields(oplog.String("inject success", "inject success"))
}
m, err := msgstream.ConvertToByteArray(mb)
if err != nil {
......
......@@ -177,6 +177,7 @@ func (pt *ParamTable) initQueryNodeIDList() []UniqueID {
}
ret = append(ret, UniqueID(v))
}
pt.QueryNodeIDList = ret
return ret
}
......
......@@ -114,7 +114,12 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
return err
}
targetFields := s.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
//fmt.Println("srcFieldIDs in internal:", srcFieldIDs)
//fmt.Println("dstFieldIDs in internal:", fieldIDs)
targetFields, err := s.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs)
if err != nil {
return err
}
err = s.segLoader.loadSegmentFieldsData(segment, targetFields)
if err != nil {
return err
......
......@@ -1142,7 +1142,7 @@ func TestSegmentLoad_Search_Vector(t *testing.T) {
paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix)
assert.NoError(t, err)
fieldsMap := node.loadService.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs)
fieldsMap, _ := node.loadService.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs)
assert.Equal(t, len(fieldsMap), 2)
segment, err := node.replica.getSegmentByID(segmentID)
......
......@@ -8,6 +8,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/kv"
minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/storage"
......@@ -53,11 +54,11 @@ func (loader *segmentLoader) getInsertBinlogPaths(segmentID UniqueID) ([]*intern
}
pathResponse, err := loader.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest)
if err != nil {
if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, nil, err
}
if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
if len(pathResponse.FieldIDs) != len(pathResponse.Paths) || len(pathResponse.FieldIDs) <= 0 {
return nil, nil, errors.New("illegal InsertBinlogPathsResponse")
}
......@@ -82,7 +83,7 @@ func (loader *segmentLoader) filterOutVectorFields(fieldIDs []int64, vectorField
return targetFields
}
func (loader *segmentLoader) getTargetFields(paths []*internalpb2.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalpb2.StringList {
func (loader *segmentLoader) checkTargetFields(paths []*internalpb2.StringList, srcFieldIDs []int64, dstFieldIDs []int64) (map[int64]*internalpb2.StringList, error) {
targetFields := make(map[int64]*internalpb2.StringList)
containsFunc := func(s []int64, e int64) bool {
......@@ -94,13 +95,14 @@ func (loader *segmentLoader) getTargetFields(paths []*internalpb2.StringList, sr
return false
}
for i, fieldID := range srcFieldIDS {
if containsFunc(dstFields, fieldID) {
targetFields[fieldID] = paths[i]
for i, fieldID := range dstFieldIDs {
if !containsFunc(srcFieldIDs, fieldID) {
return nil, errors.New("uncompleted fields")
}
targetFields[fieldID] = paths[i]
}
return targetFields
return targetFields, nil
}
func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetFields map[int64]*internalpb2.StringList) error {
......
......@@ -286,7 +286,8 @@ func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]Cons
if iter.Seek([]byte(dataKey)); currentID != DefaultMessageID && iter.Valid() {
iter.Next()
} else {
iter.SeekToFirst()
newKey := fixChanName + "/"
iter.Seek([]byte(newKey))
}
offset := 0
......
......@@ -269,3 +269,52 @@ func TestRocksMQ_Throughout(t *testing.T) {
cDuration := ct1 - ct0
log.Printf("Total consume %d item, cost %v ms, throughout %v / s", entityNum, cDuration, int64(entityNum)*1000/cDuration)
}
func TestRocksMQ_MultiChan(t *testing.T) {
etcdAddr := os.Getenv("ETCD_ADDRESS")
if etcdAddr == "" {
etcdAddr = "localhost:2379"
}
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
defer etcdKV.Close()
idAllocator := NewGlobalIDAllocator("dummy", etcdKV)
_ = idAllocator.Initialize()
name := "/tmp/rocksmq_multichan"
defer os.RemoveAll(name)
rmq, err := NewRocksMQ(name, idAllocator)
assert.Nil(t, err)
channelName0 := "chan01"
channelName1 := "chan11"
err = rmq.CreateChannel(channelName0)
assert.Nil(t, err)
defer rmq.DestroyChannel(channelName0)
err = rmq.CreateChannel(channelName1)
assert.Nil(t, err)
defer rmq.DestroyChannel(channelName1)
assert.Nil(t, err)
loopNum := 10
for i := 0; i < loopNum; i++ {
msg0 := "for_chann0_" + strconv.Itoa(i)
msg1 := "for_chann1_" + strconv.Itoa(i)
pMsg0 := ProducerMessage{payload: []byte(msg0)}
pMsg1 := ProducerMessage{payload: []byte(msg1)}
err = rmq.Produce(channelName0, []ProducerMessage{pMsg0})
assert.Nil(t, err)
err = rmq.Produce(channelName1, []ProducerMessage{pMsg1})
assert.Nil(t, err)
}
groupName := "test_group"
_ = rmq.DestroyConsumerGroup(groupName, channelName1)
_, err = rmq.CreateConsumerGroup(groupName, channelName1)
assert.Nil(t, err)
cMsgs, err := rmq.Consume(groupName, channelName1, 1)
assert.Nil(t, err)
assert.Equal(t, len(cMsgs), 1)
assert.Equal(t, string(cMsgs[0].Payload), "for_chann1_"+strconv.Itoa(0))
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment