diff --git a/internal/allocator/allocator.go b/internal/allocator/allocator.go index 6c827a04461c679966b27656da3d9f44559f69eb..ff0f23afdaa7b3beaedd9ac0e26308a1d2733367 100644 --- a/internal/allocator/allocator.go +++ b/internal/allocator/allocator.go @@ -13,9 +13,8 @@ const ( ) type Request interface { - Wait() + Wait() error Notify(error) - IsValid() bool } type BaseRequest struct { @@ -23,13 +22,9 @@ type BaseRequest struct { Valid bool } -func (req *BaseRequest) Wait() { +func (req *BaseRequest) Wait() error { err := <-req.Done - req.Valid = err == nil -} - -func (req *BaseRequest) IsValid() bool { - return req.Valid + return err } func (req *BaseRequest) Notify(err error) { @@ -252,5 +247,5 @@ func (ta *Allocator) Close() { func (ta *Allocator) CleanCache() { req := &SyncRequest{BaseRequest: BaseRequest{Done: make(chan error), Valid: false}} ta.ForceSyncChan <- req - req.Wait() + _ = req.Wait() } diff --git a/internal/allocator/id.go b/internal/allocator/id.go index 6588a5f5685bd3f425719265a0a80ec8e2e0d382..5b681be0ba813a8706143d56c5ba7b4d01d1216c 100644 --- a/internal/allocator/id.go +++ b/internal/allocator/id.go @@ -144,11 +144,10 @@ func (ia *IDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) { req.count = count ia.Reqs <- req - req.Wait() - - if !req.IsValid() { - return 0, 0, nil + if err := req.Wait(); err != nil { + return 0, 0, err } + start, count := req.id, req.count return start, start + int64(count), nil } diff --git a/internal/allocator/timestamp.go b/internal/allocator/timestamp.go index 6fcd58f2d2261119ff5cf7812f912b34603c1d73..c62acd428117ed8b1505c68d4ffdb2d171f985bb 100644 --- a/internal/allocator/timestamp.go +++ b/internal/allocator/timestamp.go @@ -2,7 +2,7 @@ package allocator import ( "context" - "errors" + "fmt" "log" "time" @@ -144,10 +144,8 @@ func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) { } req.count = count ta.Reqs <- req - req.Wait() - - if !req.IsValid() { - return nil, errors.New("alloc time stamp request failed") + if err := req.Wait(); err != nil { + return nil, fmt.Errorf("alloc time stamp request failed: %s", err) } start, count := req.timestamp, req.count diff --git a/internal/proxynode/segment.go b/internal/proxynode/segment.go index 65e58d52ce13c85dd9c7cf5396fde10f5cce5d98..052bd09d544a85f6336f6d3cf6ed79c1c9f5c316 100644 --- a/internal/proxynode/segment.go +++ b/internal/proxynode/segment.go @@ -355,10 +355,9 @@ func (sa *SegIDAssigner) GetSegmentID(collID UniqueID, partitionID UniqueID, cha timestamp: ts, } sa.Reqs <- req - req.Wait() - - if !req.IsValid() { - return nil, errors.New("GetSegmentID Failed") + if err := req.Wait(); err != nil { + return nil, fmt.Errorf("GetSegmentID failed: %s", err) } + return req.segInfo, nil } diff --git a/internal/util/typeutil/schema.go b/internal/util/typeutil/schema.go index a5eb05bb03a40d6e702c726e767e9a1c5a3adfaf..5056a4867ea334f6fbfeaf7d53ef264c050e9e36 100644 --- a/internal/util/typeutil/schema.go +++ b/internal/util/typeutil/schema.go @@ -20,14 +20,26 @@ func EstimateSizePerRecord(schema *schemapb.CollectionSchema) (int, error) { res += 8 case schemapb.DataType_String: res += 125 // todo find a better way to estimate string type - case schemapb.DataType_BinaryVector, schemapb.DataType_FloatVector: + case schemapb.DataType_BinaryVector: for _, kv := range fs.TypeParams { if kv.Key == "dim" { v, err := strconv.Atoi(kv.Value) if err != nil { return -1, err } - res += v + res += v / 8 + break + } + } + case schemapb.DataType_FloatVector: + for _, kv := range fs.TypeParams { + if kv.Key == "dim" { + v, err := strconv.Atoi(kv.Value) + if err != nil { + return -1, err + } + res += v * 4 + break } } }