diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 66332809424ac10beed2d9d8e30f2c777779ae83..6f6ea1a91933fb63bf9f01ff8d1cbe41f27bbeff 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -246,7 +246,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { var offset int for _, blob := range msg.RowData { - bv := blob.GetValue()[pos+offset : pos+(dim/8)] + bv := blob.GetValue()[pos : pos+(dim/8)] fieldData.Data = append(fieldData.Data, bv...) offset = len(bv) } diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index a2a6f71314264c47b8555e0e09322bb03a27ff37..ba6c8f6d143636cc03c113bdb2fe00fa6029b435 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -436,6 +436,11 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int, data interfa return emptyErr } dataPointer = unsafe.Pointer(&d[0]) + case []byte: + if len(d) <= 0 { + return emptyErr + } + dataPointer = unsafe.Pointer(&d[0]) case []int8: if len(d) <= 0 { return emptyErr diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 32378d887d44db862c85c954d62518029dc449e3..9396ba8ecc104bcbe535c7451b224c9e76d9b7cd 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -7,6 +7,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client" "github.com/zilliztech/milvus-distributed/internal/errors" @@ -16,6 +17,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" + "github.com/zilliztech/milvus-distributed/internal/util/retry" ) type MasterServiceInterface interface { @@ -541,7 +543,11 @@ func (qs *QueryService) CreateQueryChannel() (*querypb.CreateQueryChannelRespons fmt.Println("query service create query channel, queryChannelName = ", allocatedQueryChannel) for nodeID, node := range qs.queryNodes { fmt.Println("node ", nodeID, " watch query channel") - _, err := node.AddQueryChannel(addQueryChannelsRequest) + fn := func() error { + _, err := node.AddQueryChannel(addQueryChannelsRequest) + return err + } + err := retry.Retry(10, time.Millisecond*200, fn) if err != nil { qs.qcMutex.Unlock() return &querypb.CreateQueryChannelResponse{