Commit f3aad3a7 authored by Bingyi Sun 【孙秉义】, committed by yefu.chen

Change SegmentInfo


Signed-off-by: sunby <bingyi.sun@zilliz.com>
parent 5e781b93
Showing 185 additions and 125 deletions
@@ -16,6 +16,7 @@
 #include "knowhere/index/vector_index/VecIndex.h"
 struct LoadIndexInfo {
+    std::string field_name;
     int64_t field_id;
     std::map<std::string, std::string> index_params;
     milvus::knowhere::VecIndexPtr index;
......
@@ -59,9 +59,11 @@ AppendIndexParam(CLoadIndexInfo c_load_index_info, const char* c_index_key, const char* c_index_value) {
 }
 CStatus
-AppendFieldInfo(CLoadIndexInfo c_load_index_info, int64_t field_id) {
+AppendFieldInfo(CLoadIndexInfo c_load_index_info, const char* c_field_name, int64_t field_id) {
     try {
         auto load_index_info = (LoadIndexInfo*)c_load_index_info;
+        std::string field_name(c_field_name);
+        load_index_info->field_name = field_name;
         load_index_info->field_id = field_id;
         auto status = CStatus();
@@ -95,6 +97,7 @@ AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) {
         load_index_info->index =
             milvus::knowhere::VecIndexFactory::GetInstance().CreateVecIndex(index_params["index_type"], mode);
+        load_index_info->index->Load(*binary_set);
         auto status = CStatus();
         status.error_code = Success;
         status.error_msg = "";
......
@@ -33,7 +33,7 @@ CStatus
 AppendIndexParam(CLoadIndexInfo c_load_index_info, const char* index_key, const char* index_value);
 CStatus
-AppendFieldInfo(CLoadIndexInfo c_load_index_info, int64_t field_id);
+AppendFieldInfo(CLoadIndexInfo c_load_index_info, const char* field_name, int64_t field_id);
 CStatus
 AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set);
......
@@ -781,7 +781,7 @@ TEST(CApiTest, LoadIndexInfo) {
     status = AppendIndexParam(c_load_index_info, index_param_key2.data(), index_param_value2.data());
     assert(status.error_code == Success);
     std::string field_name = "field0";
-    status = AppendFieldInfo(c_load_index_info, 0);
+    status = AppendFieldInfo(c_load_index_info, field_name.data(), 0);
     assert(status.error_code == Success);
     status = AppendIndex(c_load_index_info, c_binary_set);
     assert(status.error_code == Success);
@@ -937,7 +937,7 @@ TEST(CApiTest, UpdateSegmentIndex_Without_Predicate) {
     AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
     AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str());
     AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
-    AppendFieldInfo(c_load_index_info, 100);
+    AppendFieldInfo(c_load_index_info, "fakevec", 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
     status = UpdateSegmentIndex(segment, c_load_index_info);
@@ -1074,7 +1074,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Range) {
     AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
     AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str());
     AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
-    AppendFieldInfo(c_load_index_info, 100);
+    AppendFieldInfo(c_load_index_info, "fakevec", 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
     status = UpdateSegmentIndex(segment, c_load_index_info);
@@ -1211,7 +1211,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Term) {
     AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
     AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str());
     AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
-    AppendFieldInfo(c_load_index_info, 100);
+    AppendFieldInfo(c_load_index_info, "fakevec", 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
     status = UpdateSegmentIndex(segment, c_load_index_info);
@@ -1350,7 +1350,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Range) {
     AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
     AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str());
     AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
-    AppendFieldInfo(c_load_index_info, 100);
+    AppendFieldInfo(c_load_index_info, "fakevec", 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
     status = UpdateSegmentIndex(segment, c_load_index_info);
@@ -1488,7 +1488,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Term) {
     AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
     AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str());
     AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
-    AppendFieldInfo(c_load_index_info, 100);
+    AppendFieldInfo(c_load_index_info, "fakevec", 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
     status = UpdateSegmentIndex(segment, c_load_index_info);
@@ -1665,7 +1665,7 @@ TEST(CApiTest, SealedSegment_search_float_Predicate_Range) {
     AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str());
     AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str());
     AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str());
-    AppendFieldInfo(c_load_index_info, 100);
+    AppendFieldInfo(c_load_index_info, "fakevec", 100);
     AppendIndex(c_load_index_info, (CBinarySet)&binary_set);
     auto load_index_info = (LoadIndexInfo*)c_load_index_info;
......
@@ -105,6 +105,7 @@ TEST(Sealed, without_predicate) {
     auto ref_result = QueryResultToJson(qr);
     LoadIndexInfo load_info;
+    load_info.field_name = "fakevec";
     load_info.field_id = fake_id.get();
     load_info.index = indexing;
     load_info.index_params["metric_type"] = "L2";
@@ -197,6 +198,7 @@ TEST(Sealed, with_predicate) {
     auto result = indexing->Query(query_dataset, conf, nullptr);
     LoadIndexInfo load_info;
+    load_info.field_name = "fakevec";
     load_info.field_id = fake_id.get();
     load_info.index = indexing;
     load_info.index_params["metric_type"] = "L2";
@@ -310,6 +312,7 @@ TEST(Sealed, LoadFieldData) {
     LoadIndexInfo vec_info;
     vec_info.field_id = fakevec_id.get();
+    vec_info.field_name = "fakevec";
     vec_info.index = indexing;
     vec_info.index_params["metric_type"] = milvus::knowhere::Metric::L2;
     segment->LoadIndex(vec_info);
......
 package dataservice
 import (
-	"fmt"
 	"strconv"
 	"sync"
 )
 type (
-	channelGroup []string
 	insertChannelManager struct {
 		mu            sync.RWMutex
 		count         int
-		channelGroups map[UniqueID][]channelGroup // collection id to channel ranges
+		channelGroups map[UniqueID][]string // collection id to channel ranges
 	}
 )
-func (cr channelGroup) Contains(channelName string) bool {
-	for _, name := range cr {
-		if name == channelName {
-			return true
-		}
-	}
-	return false
-}
 func newInsertChannelManager() *insertChannelManager {
 	return &insertChannelManager{
 		count:         0,
-		channelGroups: make(map[UniqueID][]channelGroup),
+		channelGroups: make(map[UniqueID][]string),
 	}
 }
-func (cm *insertChannelManager) GetChannels(collectionID UniqueID, groupNum int) ([]channelGroup, error) {
+func (cm *insertChannelManager) GetChannels(collectionID UniqueID) ([]string, error) {
 	cm.mu.Lock()
 	defer cm.mu.Unlock()
 	if _, ok := cm.channelGroups[collectionID]; ok {
 		return cm.channelGroups[collectionID], nil
 	}
 	channels := Params.InsertChannelNumPerCollection
-	m, n := channels/int64(groupNum), channels%int64(groupNum)
-	cg := make([]channelGroup, 0)
-	var i, j int64 = 0, 0
-	for i < channels {
-		var group []string
-		if j < n {
-			group = make([]string, m+1)
-		} else {
-			group = make([]string, m)
-		}
-		for k := 0; k < len(group); k++ {
-			group[k] = Params.InsertChannelPrefixName + strconv.Itoa(cm.count)
-			cm.count++
-		}
-		i += int64(len(group))
-		j++
-		cg = append(cg, group)
-	}
+	cg := make([]string, channels)
+	var i int64 = 0
+	for ; i < channels; i++ {
+		cg[i] = Params.InsertChannelPrefixName + strconv.Itoa(cm.count)
+		cm.count++
+	}
 	cm.channelGroups[collectionID] = cg
 	return cg, nil
 }
-func (cm *insertChannelManager) GetChannelGroup(collectionID UniqueID, channelName string) (channelGroup, error) {
-	cm.mu.RLock()
-	defer cm.mu.RUnlock()
-	_, ok := cm.channelGroups[collectionID]
-	if !ok {
-		return nil, fmt.Errorf("can not find collection %d", collectionID)
-	}
-	for _, cr := range cm.channelGroups[collectionID] {
-		if cr.Contains(channelName) {
-			return cr, nil
-		}
-	}
-	return nil, fmt.Errorf("channel name %s not found", channelName)
-}
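Reviewer note: with channelGroup gone, the manager hands every collection one flat, cached list of names drawn from a global counter, and grouping is deferred to watch time in dataNodeCluster. A standalone sketch of just that behavior; the prefix, sizes, and type names below are illustrative stand-ins, not code from this commit:

package main

import (
	"fmt"
	"strconv"
)

// channelManager mimics insertChannelManager with Params inlined as fields.
type channelManager struct {
	count   int                // global counter shared by all collections
	prefix  string             // stand-in for Params.InsertChannelPrefixName
	perColl int                // stand-in for Params.InsertChannelNumPerCollection
	byColl  map[int64][]string // stand-in for channelGroups
}

func (m *channelManager) getChannels(collectionID int64) []string {
	if cached, ok := m.byColl[collectionID]; ok {
		return cached // a collection always sees the same flat list
	}
	cg := make([]string, m.perColl)
	for i := range cg {
		cg[i] = m.prefix + strconv.Itoa(m.count) // counter keeps names unique across collections
		m.count++
	}
	m.byColl[collectionID] = cg
	return cg
}

func main() {
	m := &channelManager{prefix: "insert-", perColl: 2, byColl: map[int64][]string{}}
	fmt.Println(m.getChannels(1)) // [insert-0 insert-1]
	fmt.Println(m.getChannels(2)) // [insert-2 insert-3]
	fmt.Println(m.getChannels(1)) // cached: [insert-0 insert-1]
}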
 package dataservice
 import (
+	"strconv"
 	"testing"
 	"github.com/stretchr/testify/assert"
 )
-func TestChannelAllocation(t *testing.T) {
+func TestGetChannel(t *testing.T) {
 	Params.Init()
+	Params.InsertChannelNumPerCollection = 4
+	Params.InsertChannelPrefixName = "channel"
 	manager := newInsertChannelManager()
-	cases := []struct {
-		collectionID   UniqueID
-		groupNum       int
-		expectGroupNum int
-	}{
-		{1, 4, 4},
-		{1, 4, 4},
-		{2, 1, 1},
-		{3, 5, 4},
-	}
-	for _, c := range cases {
-		channels, err := manager.GetChannels(c.collectionID, c.expectGroupNum)
-		assert.Nil(t, err)
-		assert.EqualValues(t, c.expectGroupNum, len(channels))
-		total := 0
-		for _, channel := range channels {
-			total += len(channel)
-		}
-		assert.EqualValues(t, Params.InsertChannelNumPerCollection, total)
-	}
+	channels, err := manager.GetChannels(1)
+	assert.Nil(t, err)
+	assert.EqualValues(t, Params.InsertChannelNumPerCollection, len(channels))
+	for i := 0; i < len(channels); i++ {
+		assert.EqualValues(t, Params.InsertChannelPrefixName+strconv.Itoa(i), channels[i])
+	}
 }
@@ -23,16 +23,18 @@ type (
 		channelNum int
 	}
 	dataNodeCluster struct {
-		mu       sync.RWMutex
-		finishCh chan struct{}
-		nodes    []*dataNode
+		mu                sync.RWMutex
+		finishCh          chan struct{}
+		nodes             []*dataNode
+		watchedCollection map[UniqueID]bool
 	}
 )
 func newDataNodeCluster(finishCh chan struct{}) *dataNodeCluster {
 	return &dataNodeCluster{
-		finishCh: finishCh,
-		nodes:    make([]*dataNode, 0),
+		finishCh:          finishCh,
+		nodes:             make([]*dataNode, 0),
+		watchedCollection: make(map[UniqueID]bool),
 	}
 }
@@ -49,7 +51,7 @@ func (c *dataNodeCluster) Register(dataNode *dataNode) {
 func (c *dataNodeCluster) checkDataNodeNotExist(ip string, port int64) bool {
 	for _, node := range c.nodes {
-		if node.address.ip == ip || node.address.port == port {
+		if node.address.ip == ip && node.address.port == port {
 			return false
 		}
 	}
@@ -70,12 +72,25 @@ func (c *dataNodeCluster) GetNodeIDs() []int64 {
 	return ret
 }
-func (c *dataNodeCluster) WatchInsertChannels(groups []channelGroup) {
+func (c *dataNodeCluster) WatchInsertChannels(collectionID UniqueID, channels []string) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
+	if c.watchedCollection[collectionID] {
+		return
+	}
+	sort.Slice(c.nodes, func(i, j int) bool { return c.nodes[i].channelNum < c.nodes[j].channelNum })
+	var groups [][]string
+	if len(channels) < len(c.nodes) {
+		groups = make([][]string, len(channels))
+	} else {
+		groups = make([][]string, len(c.nodes))
+	}
+	length := len(groups)
+	for i, channel := range channels {
+		groups[i%length] = append(groups[i%length], channel)
+	}
 	for i, group := range groups {
-		_, err := c.nodes[i%len(c.nodes)].client.WatchDmChannels(&datapb.WatchDmChannelRequest{
+		resp, err := c.nodes[i].client.WatchDmChannels(&datapb.WatchDmChannelRequest{
 			Base: &commonpb.MsgBase{
 				MsgType: commonpb.MsgType_kDescribeCollection,
 				MsgID:   -1, // todo
@@ -88,7 +103,13 @@
 			log.Println(err.Error())
 			continue
 		}
+		if resp.ErrorCode != commonpb.ErrorCode_SUCCESS {
+			log.Println(resp.Reason)
+			continue
+		}
 		c.nodes[i].channelNum += len(group)
 	}
+	c.watchedCollection[collectionID] = true
 }
 func (c *dataNodeCluster) GetDataNodeStates() ([]*internalpb2.ComponentInfo, error) {
@@ -125,3 +146,12 @@ func (c *dataNodeCluster) ShutDownClients() {
 		}
 	}
 }
+// Clear only for test
+func (c *dataNodeCluster) Clear() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.finishCh = make(chan struct{})
+	c.nodes = make([]*dataNode, 0)
+	c.watchedCollection = make(map[UniqueID]bool)
+}
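The distribution logic above replaces the pre-computed channel groups: nodes are sorted by current load, at most min(len(channels), len(nodes)) buckets are created, and channels are dealt round-robin into them. A self-contained sketch of just that strategy; the node loads and channel names here are made up:

package main

import (
	"fmt"
	"sort"
)

// distribute deals channels into at most len(loads) buckets, least-loaded
// nodes first, mirroring the bucketing in WatchInsertChannels above.
func distribute(channels []string, loads []int) [][]string {
	order := make([]int, len(loads))
	for i := range order {
		order[i] = i
	}
	// Least-loaded node indices come first, like sort.Slice on c.nodes.
	sort.Slice(order, func(a, b int) bool { return loads[order[a]] < loads[order[b]] })

	n := len(loads)
	if len(channels) < n {
		n = len(channels) // never create empty buckets
	}
	buckets := make([][]string, n)
	for i, ch := range channels {
		// Round-robin assignment; buckets[i] would be watched by node order[i].
		buckets[i%n] = append(buckets[i%n], ch)
	}
	return buckets
}

func main() {
	// Seven channels over three idle nodes -> sizes 3, 2, 2, matching the
	// expectations in TestWatchChannels below.
	fmt.Println(distribute([]string{"c1", "c2", "c3", "c4", "c5", "c6", "c7"}, []int{0, 0, 0}))
}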
+package dataservice
+import (
+	"testing"
+	"github.com/stretchr/testify/assert"
+)
+func TestWatchChannels(t *testing.T) {
+	Params.Init()
+	Params.DataNodeNum = 3
+	cases := []struct {
+		collectionID UniqueID
+		channels     []string
+		channelNums  []int
+	}{
+		{1, []string{"c1"}, []int{1, 0, 0}},
+		{1, []string{"c1", "c2", "c3"}, []int{1, 1, 1}},
+		{1, []string{"c1", "c2", "c3", "c4"}, []int{2, 1, 1}},
+		{1, []string{"c1", "c2", "c3", "c4", "c5", "c6", "c7"}, []int{3, 2, 2}},
+	}
+	cluster := newDataNodeCluster(make(chan struct{}))
+	for _, c := range cases {
+		for i := 0; i < Params.DataNodeNum; i++ {
+			cluster.Register(&dataNode{
+				id: int64(i),
+				address: struct {
+					ip   string
+					port int64
+				}{"localhost", int64(9999 + i)},
+				client:     newMockDataNodeClient(),
+				channelNum: 0,
+			})
+		}
+		cluster.WatchInsertChannels(c.collectionID, c.channels)
+		for i := 0; i < len(cluster.nodes); i++ {
+			assert.EqualValues(t, c.channelNums[i], cluster.nodes[i].channelNum)
+		}
+		cluster.Clear()
+	}
+}
@@ -386,16 +386,16 @@ func (meta *meta) removeSegments(segIDs []UniqueID) error {
 	return meta.client.MultiRemove(segmentPaths)
 }
-func BuildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelRange []string) (*datapb.SegmentInfo, error) {
+func BuildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) (*datapb.SegmentInfo, error) {
 	return &datapb.SegmentInfo{
-		SegmentID:      segmentID,
-		CollectionID:   collectionID,
-		PartitionID:    partitionID,
-		InsertChannels: channelRange,
-		OpenTime:       0,
-		SealedTime:     0,
-		NumRows:        0,
-		MemSize:        0,
-		State:          datapb.SegmentState_SegmentGrowing,
+		SegmentID:     segmentID,
+		CollectionID:  collectionID,
+		PartitionID:   partitionID,
+		InsertChannel: channelName,
+		OpenTime:      0,
+		SealedTime:    0,
+		NumRows:       0,
+		MemSize:       0,
+		State:         datapb.SegmentState_SegmentGrowing,
 	}, nil
 }
@@ -48,7 +48,7 @@ func TestSegment(t *testing.T) {
 	assert.Nil(t, err)
 	segID, err := mockAllocator.allocID()
 	assert.Nil(t, err)
-	segmentInfo, err := BuildSegment(id, 100, segID, []string{"c1", "c2"})
+	segmentInfo, err := BuildSegment(id, 100, segID, "c1")
 	assert.Nil(t, err)
 	err = meta.AddSegment(segmentInfo)
 	assert.Nil(t, err)
@@ -114,14 +114,14 @@ func TestGetCount(t *testing.T) {
 	nums, err := meta.GetNumRowsOfCollection(id)
 	assert.Nil(t, err)
 	assert.EqualValues(t, 0, nums)
-	segment, err := BuildSegment(id, 100, segID, []string{"c1"})
+	segment, err := BuildSegment(id, 100, segID, "c1")
 	assert.Nil(t, err)
 	segment.NumRows = 100
 	err = meta.AddSegment(segment)
 	assert.Nil(t, err)
 	segID, err = mockAllocator.allocID()
 	assert.Nil(t, err)
-	segment, err = BuildSegment(id, 100, segID, []string{"c1"})
+	segment, err = BuildSegment(id, 100, segID, "c1")
 	assert.Nil(t, err)
 	segment.NumRows = 300
 	err = meta.AddSegment(segment)
@@ -4,6 +4,10 @@ import (
 	"sync/atomic"
 	"time"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
 	memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
@@ -46,3 +50,27 @@ func newTestSchema() *schemapb.CollectionSchema {
 		},
 	}
 }
+type mockDataNodeClient struct {
+}
+func newMockDataNodeClient() *mockDataNodeClient {
+	return &mockDataNodeClient{}
+}
+func (c *mockDataNodeClient) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
+	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil
+}
+func (c *mockDataNodeClient) GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
+	// todo
+	return nil, nil
+}
+func (c *mockDataNodeClient) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) {
+	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil
+}
+func (c *mockDataNodeClient) Stop() error {
+	return nil
+}
@@ -54,7 +54,7 @@ type (
 		sealed         bool
 		lastExpireTime Timestamp
 		allocations    []*allocation
-		channelGroup   channelGroup
+		insertChannel  string
 	}
 	allocation struct {
 		rowNums int
@@ -100,7 +100,7 @@ func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
 		total:          totalRows,
 		sealed:         false,
 		lastExpireTime: 0,
-		channelGroup:   segmentInfo.InsertChannels,
+		insertChannel:  segmentInfo.InsertChannel,
 	}
 	return nil
 }
@@ -112,7 +112,7 @@ func (allocator *segmentAllocatorImpl) AllocSegment(collectionID UniqueID,
 	for _, segStatus := range allocator.segments {
 		if segStatus.sealed || segStatus.collectionID != collectionID || segStatus.partitionID != partitionID ||
-			!segStatus.channelGroup.Contains(channelName) {
+			segStatus.insertChannel != channelName {
 			continue
 		}
 		var success bool
......
 package dataservice
 import (
+	"log"
 	"math"
 	"strconv"
 	"testing"
 	"time"
+	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
 	"github.com/stretchr/testify/assert"
 )
@@ -27,7 +30,7 @@ func TestAllocSegment(t *testing.T) {
 	assert.Nil(t, err)
 	id, err := mockAllocator.allocID()
 	assert.Nil(t, err)
-	segmentInfo, err := BuildSegment(collID, 100, id, []string{"c1", "c2"})
+	segmentInfo, err := BuildSegment(collID, 100, id, "c1")
 	assert.Nil(t, err)
 	err = meta.AddSegment(segmentInfo)
 	assert.Nil(t, err)
@@ -80,7 +83,7 @@ func TestSealSegment(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		id, err := mockAllocator.allocID()
 		assert.Nil(t, err)
-		segmentInfo, err := BuildSegment(collID, 100, id, []string{"c" + strconv.Itoa(i)})
+		segmentInfo, err := BuildSegment(collID, 100, id, "c"+strconv.Itoa(i))
 		assert.Nil(t, err)
 		err = meta.AddSegment(segmentInfo)
 		assert.Nil(t, err)
@@ -115,21 +118,32 @@ func TestExpireSegment(t *testing.T) {
 	assert.Nil(t, err)
 	id, err := mockAllocator.allocID()
 	assert.Nil(t, err)
-	segmentInfo, err := BuildSegment(collID, 100, id, []string{"c1", "c2"})
+	segmentInfo, err := BuildSegment(collID, 100, id, "c1")
 	assert.Nil(t, err)
 	err = meta.AddSegment(segmentInfo)
 	assert.Nil(t, err)
 	err = segAllocator.OpenSegment(segmentInfo)
 	assert.Nil(t, err)
-	id1, _, _, err := segAllocator.AllocSegment(collID, 100, "c1", 10)
+	id1, _, et, err := segAllocator.AllocSegment(collID, 100, "c1", 10)
+	ts2, _ := tsoutil.ParseTS(et)
+	log.Printf("physical ts: %s", ts2.String())
 	assert.Nil(t, err)
-	time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond)
 	ts, err := mockAllocator.allocTimestamp()
 	assert.Nil(t, err)
+	t1, _ := tsoutil.ParseTS(ts)
+	log.Printf("before ts: %s", t1.String())
+	time.Sleep(time.Duration(Params.SegIDAssignExpiration+1000) * time.Millisecond)
+	ts, err = mockAllocator.allocTimestamp()
+	assert.Nil(t, err)
 	err = segAllocator.ExpireAllocations(ts)
 	assert.Nil(t, err)
 	expired, err := segAllocator.IsAllocationsExpired(id1, ts)
+	if et > ts {
+		tsPhy, _ := tsoutil.ParseTS(ts)
+		log.Printf("ts %s", tsPhy.String())
+	}
 	assert.Nil(t, err)
 	assert.True(t, expired)
 	assert.EqualValues(t, 0, len(segAllocator.segments[id1].allocations))
......
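The reworked test logs both sides of the expiry comparison by converting hybrid timestamps to wall-clock time. For reference, a minimal sketch of the TSO-style encoding that tsoutil.ParseTS undoes, assuming the usual layout of physical milliseconds above an 18-bit logical counter (the constant is an assumption here, not taken from this diff):

package main

import (
	"fmt"
	"time"
)

const logicalBits = 18 // assumed low-bit width of the logical counter

// composeTS packs physical milliseconds and a logical counter into one uint64.
func composeTS(physical time.Time, logical uint64) uint64 {
	ms := uint64(physical.UnixNano() / int64(time.Millisecond))
	return ms<<logicalBits | logical
}

// parseTS reverses composeTS, which is what the test uses tsoutil.ParseTS for.
func parseTS(ts uint64) (time.Time, uint64) {
	logical := ts & ((1 << logicalBits) - 1)
	physical := time.UnixMilli(int64(ts >> logicalBits))
	return physical, logical
}

func main() {
	ts := composeTS(time.Now(), 7)
	phys, logical := parseTS(ts)
	fmt.Println(phys.Format(time.RFC3339Nano), logical) // physical part is millisecond-truncated
}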
@@ -555,16 +555,11 @@ func (s *Server) AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) {
 }
 func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, channelName string) error {
-	group, err := s.insertChannelMgr.GetChannelGroup(collectionID, channelName)
-	if err != nil {
-		return err
-	}
 	id, err := s.allocator.allocID()
 	if err != nil {
 		return err
 	}
-	segmentInfo, err := BuildSegment(collectionID, partitionID, id, group)
+	segmentInfo, err := BuildSegment(collectionID, partitionID, id, channelName)
 	if err != nil {
 		return err
 	}
@@ -683,16 +678,12 @@ func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
 	if !s.checkStateIsHealthy() {
 		return nil, errors.New("server is initializing")
 	}
-	channelGroups, err := s.insertChannelMgr.GetChannels(req.CollectionID, s.cluster.GetNumOfNodes())
+	channels, err := s.insertChannelMgr.GetChannels(req.CollectionID)
 	if err != nil {
 		return nil, err
 	}
-	channels := make([]string, 0)
-	for _, group := range channelGroups {
-		channels = append(channels, group...)
-	}
-	s.cluster.WatchInsertChannels(channelGroups)
+	s.cluster.WatchInsertChannels(req.CollectionID, channels)
 	return channels, nil
 }
......
@@ -52,7 +52,7 @@ func TestDataNodeTTWatcher(t *testing.T) {
 		segID, err := allocator.allocID()
 		segmentIDs[i] = segID
 		assert.Nil(t, err)
-		segmentInfo, err := BuildSegment(id, 100, segID, []string{"channel" + strconv.Itoa(i)})
+		segmentInfo, err := BuildSegment(id, 100, segID, "channel"+strconv.Itoa(i))
 		assert.Nil(t, err)
 		err = meta.AddSegment(segmentInfo)
 		assert.Nil(t, err)
@@ -64,7 +64,7 @@ func TestDataNodeTTWatcher(t *testing.T) {
 		}
 	}
-	time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond)
+	time.Sleep(time.Duration(Params.SegIDAssignExpiration+1000) * time.Millisecond)
 	for i, c := range cases {
 		if c.allocation && !c.expired {
 			_, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100)
......
@@ -98,7 +98,7 @@ func TestGrpcService(t *testing.T) {
 	var binlogLock sync.Mutex
 	binlogPathArray := make([]string, 0, 16)
-	core.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) {
+	core.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) {
 		binlogLock.Lock()
 		defer binlogLock.Unlock()
 		binlogPathArray = append(binlogPathArray, binlog...)
......
@@ -152,7 +152,7 @@ type Core struct {
 	GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
 	//TODO, call index builder's client to build index, return build id
-	BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error)
+	BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error)
 	//TODO, proxy service interface, notify proxy service to drop collection
 	InvalidateCollectionMetaCache func(ts typeutil.Timestamp, dbName string, collectionName string) error
@@ -671,13 +671,11 @@ func (c *Core) SetDataService(s DataServiceInterface) error {
 }
 func (c *Core) SetIndexService(s IndexServiceInterface) error {
-	c.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) {
+	c.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) {
 		rsp, err := s.BuildIndex(&indexpb.BuildIndexRequest{
 			DataPaths:   binlog,
 			TypeParams:  typeParams,
 			IndexParams: indexParams,
-			IndexID:     indexID,
-			IndexName:   indexName,
 		})
 		if err != nil {
 			return 0, err
@@ -628,7 +628,7 @@ func (t *CreateIndexTask) BuildIndex() error {
 			})
 		}
 	}
-	bldID, err = t.core.BuildIndexReq(binlogs, t.fieldSchema.TypeParams, t.indexParams, idxID, t.indexName)
+	bldID, err = t.core.BuildIndexReq(binlogs, t.fieldSchema.TypeParams, t.indexParams)
 	if err != nil {
 		return err
 	}
......
@@ -137,7 +137,7 @@ message SegmentInfo {
   int64 segmentID = 1;
   int64 collectionID = 2;
   int64 partitionID = 3;
-  repeated string insert_channels = 4;
+  string insert_channel = 4;
   uint64 open_time = 5;
   uint64 sealed_time = 6;
   uint64 flushed_time = 7;
......
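This proto change is the core of the commit: a segment now records a single insert channel instead of a list. After regeneration, the Go struct gains an InsertChannel string field, which is what the updated BuildSegment above populates. A hand-written stand-in to illustrate the shape; the real generated datapb.SegmentInfo has more fields and protobuf plumbing:

package main

import "fmt"

// SegmentInfo is an illustrative stand-in for the regenerated datapb.SegmentInfo.
type SegmentInfo struct {
	SegmentID     int64
	CollectionID  int64
	PartitionID   int64
	InsertChannel string // previously: InsertChannels []string
	OpenTime      uint64
	SealedTime    uint64
	NumRows       int64
	MemSize       int64
}

func main() {
	info := SegmentInfo{SegmentID: 1, CollectionID: 100, PartitionID: 0, InsertChannel: "insert-0"}
	fmt.Printf("%+v\n", info)
}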