diff --git a/go.mod b/go.mod index 5d436f649d2b0bb570ce62c384f29c67f35555a2..172b01907ac22924ea139276860f9077c71c3a03 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,8 @@ module github.com/zilliztech/milvus-distributed go 1.15 require ( - github.com/apache/pulsar-client-go v0.3.0 + github.com/apache/pulsar-client-go v0.1.1 + github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4 github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect @@ -11,6 +12,7 @@ require ( github.com/go-basic/ipv4 v1.0.0 github.com/gogo/protobuf v1.3.1 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.4.2 github.com/google/btree v1.0.0 github.com/klauspost/compress v1.10.11 // indirect diff --git a/go.sum b/go.sum index 3b797242d51c0ecd5b60bb250eabf05942e67a82..7c577ff8eb486c58a851d712b74d493b85b1d7fc 100644 --- a/go.sum +++ b/go.sum @@ -22,10 +22,15 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/apache/pulsar-client-go v0.1.1 h1:v/kU+2ZCC6yFIcbZrFtWa9/nvVzVr18L+xYJUvZSxEQ= +github.com/apache/pulsar-client-go v0.1.1/go.mod h1:mlxC65KL1BLhGO2bnT9zWMttVzR2czVPb27D477YpyU= github.com/apache/pulsar-client-go v0.3.0 h1:rNhJ/ENwoEfZPHHwUHNxPBTNqNQE2LQEm7DXu043giM= github.com/apache/pulsar-client-go v0.3.0/go.mod h1:9eSgOadVhCfb2DfWtS1SCYaYIMk9VDOZztr4u3FO8cQ= github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb h1:E1P0FudxDdj2RhbveZC9i3PwukLCA/4XQSkBS/dw6/I= github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY= +github.com/apache/thrift v0.14.0 h1:vqZ2DP42i8th2OsgCcYZkirtbzvpZEFx53LiWDJXIAs= +github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4 h1:orNYqmQGnSjgOauLWjHEp9/qIDT98xv/0Aa4Zet3/Y8= +github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4/go.mod h1:V/LzksIyqd3KZuQ2SunvReTG/UkArhII1dAWY5U1sCE= github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4= github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI= github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk= @@ -92,6 +97,7 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+ne github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/frankban/quicktest v1.10.2 h1:19ARM85nVi4xH7xPXuc5eM/udya5ieh7b/Sv+d844Tk= github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -329,6 +335,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/protocolbuffers/protobuf v3.15.1+incompatible h1:HfHKbdokPc6vkBuYYXW/PTpOIo8BICHl1vfE8dT5jGo= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= @@ -336,6 +343,7 @@ github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -379,6 +387,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= +github.com/valyala/gozstd v1.7.0 h1:Ljh5c9zboqLhwTI33al32R72iCZfn0mCbVGcFWbGwRQ= +github.com/valyala/gozstd v1.7.0/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0= diff --git a/internal/log/logutil.go b/internal/log/logutil.go index 82c9a7e6507559c2b2c89b61d088f3474b4f537c..c5cdf888706b3b3b894df2aa6e3baa848a05719b 100644 --- a/internal/log/logutil.go +++ b/internal/log/logutil.go @@ -4,8 +4,6 @@ import ( "sync" "sync/atomic" - "github.com/apache/pulsar-client-go/pulsar/log" - "go.uber.org/zap/zapcore" etcd "go.etcd.io/etcd/clientv3" @@ -79,51 +77,6 @@ func (w *zapWrapper) V(l int) bool { return w.logger.Core().Enabled(zapcore.Level(zapLevel)) } -func (w *zapWrapper) SubLogger(fields log.Fields) log.Logger { - return w.WithFields(fields).(log.Logger) -} - -func (w *zapWrapper) WithFields(fields log.Fields) log.Entry { - if len(fields) == 0 { - return w - } - kv := make([]interface{}, 0, 2*len(fields)) - for k, v := range fields { - kv = append(kv, k, v) - } - return &zapWrapper{ - logger: w.logger.Sugar().With(kv...).Desugar(), - } -} - -func (w *zapWrapper) WithField(name string, value interface{}) log.Entry { - return &zapWrapper{ - logger: w.logger.Sugar().With(name, value).Desugar(), - } -} - -func (w *zapWrapper) WithError(err error) log.Entry { - return &zapWrapper{ - logger: w.logger.Sugar().With("error", err).Desugar(), - } -} - -func (w *zapWrapper) Debug(args ...interface{}) { - w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Debug(args...) -} - -func (w *zapWrapper) Warn(args ...interface{}) { - w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Warn(args...) -} - -func (w *zapWrapper) Debugf(format string, args ...interface{}) { - w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Debugf(format, args...) -} - -func (w *zapWrapper) Warnf(format string, args ...interface{}) { - w.logger.WithOptions(zap.AddCallerSkip(1)).Sugar().Warnf(format, args...) -} - var once sync.Once var _globalZapWrapper atomic.Value diff --git a/internal/msgstream/pulsarms/pulsar_msgstream.go b/internal/msgstream/pulsarms/pulsar_msgstream.go index ef8f951a96d09cb13d8dd2bdded0364ded73152c..9ebaa690d3a7db00a32e7fb6059ca2911a62c066 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream.go @@ -90,6 +90,7 @@ func newPulsarMsgStream(ctx context.Context, streamCancel: streamCancel, consumerReflects: consumerReflects, consumerLock: &sync.Mutex{}, + wait: &sync.WaitGroup{}, } return stream, nil @@ -136,14 +137,14 @@ func (ms *PulsarMsgStream) AsConsumer(channels []string, return errors.New("pulsar is not ready, consumer is nil") } - ms.consumerLock.Lock() ms.consumers = append(ms.consumers, pc) ms.consumerChannels = append(ms.consumerChannels, channels[i]) ms.consumerReflects = append(ms.consumerReflects, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(pc.Chan()), }) - ms.consumerLock.Unlock() + ms.wait.Add(1) + go ms.receiveMsg(pc) return nil } err := util.Retry(10, time.Millisecond*200, fn) @@ -159,15 +160,11 @@ func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc) { } func (ms *PulsarMsgStream) Start() { - ms.wait = &sync.WaitGroup{} - if ms.consumers != nil { - ms.wait.Add(1) - go ms.bufMsgPackToChannel() - } } func (ms *PulsarMsgStream) Close() { ms.streamCancel() + ms.wait.Wait() for _, producer := range ms.producers { if producer != nil { @@ -321,9 +318,42 @@ func (ms *PulsarMsgStream) Consume() *MsgPack { } } -func (ms *PulsarMsgStream) bufMsgPackToChannel() { +func (ms *PulsarMsgStream) receiveMsg(consumer Consumer) { defer ms.wait.Done() + for { + select { + case <-ms.ctx.Done(): + return + case pulsarMsg, ok := <-consumer.Chan(): + if !ok { + return + } + headerMsg := commonpb.MsgHeader{} + err := proto.Unmarshal(pulsarMsg.Payload(), &headerMsg) + if err != nil { + log.Error("Failed to unmarshal message header", zap.Error(err)) + continue + } + tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.Base.MsgType) + if err != nil { + log.Error("Failed to unmarshal tsMsg", zap.Error(err)) + continue + } + + tsMsg.SetPosition(&msgstream.MsgPosition{ + ChannelName: filepath.Base(pulsarMsg.Topic()), + MsgID: typeutil.PulsarMsgIDToString(pulsarMsg.ID()), + }) + + msgPack := MsgPack{Msgs: []TsMsg{tsMsg}} + ms.receiveBuf <- &msgPack + } + } +} + +func (ms *PulsarMsgStream) bufMsgPackToChannel() { + for { select { case <-ms.ctx.Done(): @@ -500,7 +530,6 @@ func (ms *PulsarTtMsgStream) AsConsumer(channels []string, } func (ms *PulsarTtMsgStream) Start() { - ms.wait = &sync.WaitGroup{} if ms.consumers != nil { ms.wait.Add(1) go ms.bufMsgPackToChannel() diff --git a/internal/msgstream/pulsarms/pulsar_msgstream_test.go b/internal/msgstream/pulsarms/pulsar_msgstream_test.go index 255aaf749033ded47dab09f0c2253acc59cc4b07..431415a41475a59a1ff9da8afd22a4a40f0ab531 100644 --- a/internal/msgstream/pulsarms/pulsar_msgstream_test.go +++ b/internal/msgstream/pulsarms/pulsar_msgstream_test.go @@ -7,6 +7,8 @@ import ( "os" "testing" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/msgstream" @@ -237,9 +239,10 @@ func receiveMsg(outputStream msgstream.MsgStream, msgCount int) { func TestStream_PulsarMsgStream_Insert(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerSubName := "subInsert" + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1)) @@ -259,10 +262,10 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) { func TestStream_PulsarMsgStream_Delete(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"delete"} - consumerChannels := []string{"delete"} - consumerSubName := "subDelete" - + c := funcutil.RandomString(8) + producerChannels := []string{c} + consumerChannels := []string{c} + consumerSubName := funcutil.RandomString(8) msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 1, 1)) //msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 3, 3)) @@ -279,9 +282,10 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) { func TestStream_PulsarMsgStream_Search(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"search"} - consumerChannels := []string{"search"} - consumerSubName := "subSearch" + c := funcutil.RandomString(8) + producerChannels := []string{c} + consumerChannels := []string{c} + consumerSubName := funcutil.RandomString(8) msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 1, 1)) @@ -299,10 +303,10 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) { func TestStream_PulsarMsgStream_SearchResult(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"searchResult"} - consumerChannels := []string{"searchResult"} - consumerSubName := "subSearchResult" - + c := funcutil.RandomString(8) + producerChannels := []string{c} + consumerChannels := []string{c} + consumerSubName := funcutil.RandomString(8) msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 1, 1)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 3, 3)) @@ -319,10 +323,10 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) { func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"timeTick"} - consumerChannels := []string{"timeTick"} - consumerSubName := "subTimeTick" - + c := funcutil.RandomString(8) + producerChannels := []string{c} + consumerChannels := []string{c} + consumerSubName := funcutil.RandomString(8) msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3)) @@ -339,9 +343,10 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) { func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerSubName := "subInsert" + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1)) @@ -359,9 +364,10 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) { func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerSubName := "subInsert" + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 1, 1)) @@ -379,10 +385,10 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) { func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerSubName := "subInsert" - + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) baseMsg := msgstream.BaseMsg{ BeginTimestamp: 0, EndTimestamp: 0, @@ -433,9 +439,10 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) { func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerSubName := "subInsert" + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) baseMsg := msgstream.BaseMsg{ BeginTimestamp: 0, @@ -484,9 +491,10 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) { func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerSubName := "subInsert" + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) msgPack := msgstream.MsgPack{} msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 1, 1)) @@ -515,10 +523,10 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) { func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerSubName := "subInsert" - + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) msgPack0 := msgstream.MsgPack{} msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0)) @@ -549,9 +557,10 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) { func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"seek_insert1", "seek_insert2"} - consumerChannels := []string{"seek_insert1", "seek_insert2"} - consumerSubName := "subInsert" + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) msgPack0 := MsgPack{} msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0)) @@ -602,9 +611,10 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) { func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) { pulsarAddress, _ := Params.Load("_PulsarAddress") - producerChannels := []string{"insert1", "insert2"} - consumerChannels := []string{"insert1", "insert2"} - consumerSubName := "subInsert" + c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8) + producerChannels := []string{c1, c2} + consumerChannels := []string{c1, c2} + consumerSubName := funcutil.RandomString(8) msgPack0 := msgstream.MsgPack{} msgPack0.Msgs = append(msgPack0.Msgs, getTimeTickMsg(0, 0, 0)) diff --git a/internal/proxynode/paramtable.go b/internal/proxynode/paramtable.go index 307c4771374c854afd4685f413b580120e9f613e..a3ca31df19609f26c3e8bd695c98816176451fcf 100644 --- a/internal/proxynode/paramtable.go +++ b/internal/proxynode/paramtable.go @@ -177,7 +177,6 @@ func (pt *ParamTable) initQueryNodeIDList() []UniqueID { } ret = append(ret, UniqueID(v)) } - pt.QueryNodeIDList = ret return ret } diff --git a/internal/querynode/load_service.go b/internal/querynode/load_service.go index 73043849ff7d0b1c4dc745cb6ab3ca9969c61da2..0039042260968944b9f302d75ebda9e10fe259f4 100644 --- a/internal/querynode/load_service.go +++ b/internal/querynode/load_service.go @@ -114,12 +114,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni return err } - //fmt.Println("srcFieldIDs in internal:", srcFieldIDs) - //fmt.Println("dstFieldIDs in internal:", fieldIDs) - targetFields, err := s.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs) - if err != nil { - return err - } + targetFields := s.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs) err = s.segLoader.loadSegmentFieldsData(segment, targetFields) if err != nil { return err diff --git a/internal/querynode/load_service_test.go b/internal/querynode/load_service_test.go index 8ec4ee440ee841c21233407f8c42d9181fc095c8..a8f2145daa43f293eb924b7dfb93ac9900de0bad 100644 --- a/internal/querynode/load_service_test.go +++ b/internal/querynode/load_service_test.go @@ -1142,7 +1142,7 @@ func TestSegmentLoad_Search_Vector(t *testing.T) { paths, srcFieldIDs, err := generateInsertBinLog(collectionID, partitionID, segmentID, keyPrefix) assert.NoError(t, err) - fieldsMap, _ := node.loadService.segLoader.checkTargetFields(paths, srcFieldIDs, fieldIDs) + fieldsMap := node.loadService.segLoader.getTargetFields(paths, srcFieldIDs, fieldIDs) assert.Equal(t, len(fieldsMap), 2) segment, err := node.replica.getSegmentByID(segmentID) diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index b2a18961c332d8709dccd25ba7e7943192047cce..883c96e1758d1e6cf57bcda804adce5a70589c7e 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -8,7 +8,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/kv" minioKV "github.com/zilliztech/milvus-distributed/internal/kv/minio" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/storage" @@ -54,11 +53,11 @@ func (loader *segmentLoader) getInsertBinlogPaths(segmentID UniqueID) ([]*intern } pathResponse, err := loader.dataClient.GetInsertBinlogPaths(insertBinlogPathRequest) - if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + if err != nil { return nil, nil, err } - if len(pathResponse.FieldIDs) != len(pathResponse.Paths) || len(pathResponse.FieldIDs) <= 0 { + if len(pathResponse.FieldIDs) != len(pathResponse.Paths) { return nil, nil, errors.New("illegal InsertBinlogPathsResponse") } @@ -83,7 +82,7 @@ func (loader *segmentLoader) filterOutVectorFields(fieldIDs []int64, vectorField return targetFields } -func (loader *segmentLoader) checkTargetFields(paths []*internalpb2.StringList, srcFieldIDs []int64, dstFieldIDs []int64) (map[int64]*internalpb2.StringList, error) { +func (loader *segmentLoader) getTargetFields(paths []*internalpb2.StringList, srcFieldIDS []int64, dstFields []int64) map[int64]*internalpb2.StringList { targetFields := make(map[int64]*internalpb2.StringList) containsFunc := func(s []int64, e int64) bool { @@ -95,14 +94,13 @@ func (loader *segmentLoader) checkTargetFields(paths []*internalpb2.StringList, return false } - for i, fieldID := range dstFieldIDs { - if !containsFunc(srcFieldIDs, fieldID) { - return nil, errors.New("uncompleted fields") + for i, fieldID := range srcFieldIDS { + if containsFunc(dstFields, fieldID) { + targetFields[fieldID] = paths[i] } - targetFields[fieldID] = paths[i] } - return targetFields, nil + return targetFields } func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetFields map[int64]*internalpb2.StringList) error { diff --git a/internal/util/funcutil/random.go b/internal/util/funcutil/random.go new file mode 100644 index 0000000000000000000000000000000000000000..36d7cc6979b38e5149256055c0ce07e490f143e9 --- /dev/null +++ b/internal/util/funcutil/random.go @@ -0,0 +1,20 @@ +package funcutil + +import ( + "math/rand" + "time" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func RandomString(n int) string { + b := make([]rune, n) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return string(b) +}