Skip to content
Snippets Groups Projects
Commit 6a133863 authored by XuanYang-cn's avatar XuanYang-cn Committed by yefu.chen
Browse files

Add DataNode package


Signed-off-by: default avatarXuanYang-cn <xuan.yang@zilliz.com>
parent 1ce32d87
No related branches found
No related tags found
No related merge requests found
Showing
with 829 additions and 2985 deletions
......@@ -19,7 +19,9 @@ msgChannel:
searchResult: "searchResult"
k2s: "k2s"
proxyTimeTick: "proxyTimeTick"
writeNodeTimeTick: "writeNodeTimeTick"
writeNodeTimeTick: "writeNodeTimeTick" # GOOSE TODO: remove this
dataNodeTimeTick: "dataNodeTimeTick"
dataNodeSegStatistics: "dataNodeSegStatistics"
# old name: statsChannels: "statistic"
queryNodeStats: "query-node-stats"
# cmd for loadIndex, flush, etc...
......@@ -30,7 +32,8 @@ msgChannel:
masterSubNamePrefix: "master"
proxySubNamePrefix: "proxy"
queryNodeSubNamePrefix: "queryNode"
writeNodeSubNamePrefix: "writeNode"
writeNodeSubNamePrefix: "writeNode" # GOOSE TODO: remove this
dataNodeSubNamePrefix: "dataNode"
# default channel range [0, 1)
channelRange:
......
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
dataNode:
dataSync:
flowGraph:
maxQueueLength: 1024
maxParallelism: 1024
msgStream:
dataDefinition:
recvBufSize: 64 # msgPack chan buffer size
pulsarBufSize: 64 # pulsar chan buffer size
insert:
#streamBufSize: 1024 # msgPack chan buffer size
recvBufSize: 1024 # msgPack chan buffer size
pulsarBufSize: 1024 # pulsar chan buffer size
delete:
#streamBufSize: 1024 # msgPack chan buffer size
recvBufSize: 1024 # msgPack chan buffer size
pulsarBufSize: 1024 # pulsar chan buffer size
segStatistics:
recvBufSize: 64
publishInterval: 1000 # milliseconds
flush:
# max buffer size to flush
insertBufSize: 500
ddBufSize: 20
......@@ -13,7 +13,8 @@
nodeID: # will be deprecated after v0.2
proxyIDList: [0]
queryNodeIDList: [1, 2]
writeNodeIDList: [3]
writeNodeIDList: [3] # GOOSE TODO: remove this
dataNodeIDList: [3]
etcd:
address: localhost
......@@ -21,8 +22,10 @@ etcd:
rootPath: by-dev
metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath
kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
writeNodeSegKvSubPath: writer/segment
writeNodeDDLKvSubPath: writer/ddl
segFlushMetaSubPath: writer/segment
ddlFlushMetaSubPath: writer/ddl
writeNodeSegKvSubPath: writer/segment # GOOSE TODO: remove this
writeNodeDDLKvSubPath: writer/ddl # GOOSE TODO: remove this
segThreshold: 10000
minio:
......
......@@ -15,6 +15,12 @@
extern "C" {
#endif
enum SegmentType {
Invalid = 0,
Growing = 1,
Sealed = 2,
};
enum ErrorCode {
Success = 0,
UnexpectedException = 1,
......
......@@ -28,7 +28,7 @@ extern "C" {
#include <stdint.h>
#include "segcore/collection_c.h"
#include "common/status_c.h"
#include "common/type_c.h"
typedef void* CIndex;
typedef void* CIndexQueryResult;
......
This diff is collapsed.
This diff is collapsed.
......@@ -77,9 +77,9 @@ class SegmentGrowing : public SegmentInternalInterface {
get_deleted_count() const = 0;
};
using SegmentBasePtr = std::unique_ptr<SegmentGrowing>;
using SegmentGrowingPtr = std::unique_ptr<SegmentGrowing>;
SegmentBasePtr
SegmentGrowingPtr
CreateGrowingSegment(SchemaPtr schema, int64_t chunk_size = 32 * 1024);
} // namespace segcore
......
......@@ -9,12 +9,14 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <memory>
#include "SegmentInterface.h"
#include "common/LoadInfo.h"
namespace milvus::segcore {
class SegmentSealed {
class SegmentSealed : public SegmentInterface {
public:
virtual const Schema&
get_schema() = 0;
......@@ -26,4 +28,11 @@ class SegmentSealed {
LoadFieldData(const LoadFieldDataInfo& info) = 0;
};
using SegmentSealedPtr = std::unique_ptr<SegmentSealed>;
SegmentSealedPtr
CreateSealedSegment(SchemaPtr schema, int64_t chunk_size = 32 * 1024) {
return nullptr;
}
} // namespace milvus::segcore
......@@ -18,7 +18,7 @@ extern "C" {
#include <stdint.h>
#include "segcore/collection_c.h"
#include "common/status_c.h"
#include "common/type_c.h"
typedef void* CLoadIndexInfo;
typedef void* CBinarySet;
......
......@@ -16,7 +16,7 @@ extern "C" {
#include <stdbool.h>
#include <stdint.h>
#include "segcore/collection_c.h"
#include "common/status_c.h"
#include "common/type_c.h"
typedef void* CPlan;
typedef void* CPlaceholderGroup;
......
......@@ -16,7 +16,7 @@ extern "C" {
#include <stdbool.h>
#include <stdint.h>
#include "segcore/segment_c.h"
#include "common/status_c.h"
#include "common/type_c.h"
typedef void* CMarshaledHits;
......
......@@ -10,29 +10,42 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <cstring>
#include <cstdint>
#include "segcore/SegmentGrowing.h"
#include "segcore/SegmentSealed.h"
#include "segcore/Collection.h"
#include "segcore/segment_c.h"
#include "common/LoadInfo.h"
#include "common/type_c.h"
#include <knowhere/index/vector_index/VecIndex.h>
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include <cstdint>
#include <boost/concept_check.hpp>
#include "common/LoadInfo.h"
CSegmentBase
NewSegment(CCollection collection, uint64_t segment_id) {
CSegmentInterface
NewSegment(CCollection collection, uint64_t segment_id, int seg_type) {
auto col = (milvus::segcore::Collection*)collection;
auto segment = milvus::segcore::CreateGrowingSegment(col->get_schema());
std::unique_ptr<milvus::segcore::SegmentInterface> segment;
switch (seg_type) {
case Invalid:
std::cout << "invalid segment type" << std::endl;
break;
case Growing:
segment = milvus::segcore::CreateGrowingSegment(col->get_schema());
break;
case Sealed:
segment = milvus::segcore::CreateSealedSegment(col->get_schema());
break;
default:
std::cout << "invalid segment type" << std::endl;
}
std::cout << "create segment " << segment_id << std::endl;
return (void*)segment.release();
}
void
DeleteSegment(CSegmentBase segment) {
DeleteSegment(CSegmentInterface segment) {
auto s = (milvus::segcore::SegmentGrowing*)segment;
std::cout << "delete segment " << std::endl;
......@@ -48,7 +61,7 @@ DeleteQueryResult(CQueryResult query_result) {
//////////////////////////////////////////////////////////////////
CStatus
Insert(CSegmentBase c_segment,
Insert(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const int64_t* row_ids,
......@@ -79,15 +92,18 @@ Insert(CSegmentBase c_segment,
}
int64_t
PreInsert(CSegmentBase c_segment, int64_t size) {
PreInsert(CSegmentInterface c_segment, int64_t size) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
return segment->PreInsert(size);
}
CStatus
Delete(
CSegmentBase c_segment, int64_t reserved_offset, int64_t size, const int64_t* row_ids, const uint64_t* timestamps) {
Delete(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const int64_t* row_ids,
const uint64_t* timestamps) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
try {
......@@ -106,14 +122,15 @@ Delete(
}
int64_t
PreDelete(CSegmentBase c_segment, int64_t size) {
PreDelete(CSegmentInterface c_segment, int64_t size) {
// TODO: use dynamic cast, and return c status
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
return segment->PreDelete(size);
}
CStatus
Search(CSegmentBase c_segment,
Search(CSegmentInterface c_segment,
CPlan c_plan,
CPlaceholderGroup* c_placeholder_groups,
uint64_t* timestamps,
......@@ -153,7 +170,7 @@ Search(CSegmentBase c_segment,
}
CStatus
FillTargetEntry(CSegmentBase c_segment, CPlan c_plan, CQueryResult c_result) {
FillTargetEntry(CSegmentInterface c_segment, CPlan c_plan, CQueryResult c_result) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto plan = (milvus::query::Plan*)c_plan;
auto result = (milvus::QueryResult*)c_result;
......@@ -171,7 +188,7 @@ FillTargetEntry(CSegmentBase c_segment, CPlan c_plan, CQueryResult c_result) {
}
CStatus
UpdateSegmentIndex(CSegmentBase c_segment, CLoadIndexInfo c_load_index_info) {
UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info) {
auto status = CStatus();
try {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
......@@ -189,26 +206,26 @@ UpdateSegmentIndex(CSegmentBase c_segment, CLoadIndexInfo c_load_index_info) {
//////////////////////////////////////////////////////////////////
int
Close(CSegmentBase c_segment) {
Close(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto status = segment->Close();
return status.code();
}
int
BuildIndex(CCollection c_collection, CSegmentBase c_segment) {
BuildIndex(CCollection c_collection, CSegmentInterface c_segment) {
PanicInfo("unimplemented");
}
bool
IsOpened(CSegmentBase c_segment) {
IsOpened(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto status = segment->get_state();
return status == milvus::segcore::SegmentGrowing::SegmentState::Open;
}
int64_t
GetMemoryUsageInBytes(CSegmentBase c_segment) {
GetMemoryUsageInBytes(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto mem_size = segment->GetMemoryUsageInBytes();
return mem_size;
......@@ -217,14 +234,14 @@ GetMemoryUsageInBytes(CSegmentBase c_segment) {
//////////////////////////////////////////////////////////////////
int64_t
GetRowCount(CSegmentBase c_segment) {
GetRowCount(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto row_count = segment->get_row_count();
return row_count;
}
int64_t
GetDeletedCount(CSegmentBase c_segment) {
GetDeletedCount(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
auto deleted_count = segment->get_deleted_count();
return deleted_count;
......
......@@ -9,6 +9,8 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
......@@ -17,26 +19,27 @@ extern "C" {
#include <stdlib.h>
#include <stdint.h>
#include "common/type_c.h"
#include "segcore/plan_c.h"
#include "segcore/load_index_c.h"
#include "common/status_c.h"
typedef void* CSegmentBase;
typedef void* CSegmentInterface;
typedef void* CQueryResult;
CSegmentBase
NewSegment(CCollection collection, uint64_t segment_id);
CSegmentInterface
NewSegment(CCollection collection, uint64_t segment_id, int seg_type);
void
DeleteSegment(CSegmentBase segment);
DeleteSegment(CSegmentInterface segment);
void
DeleteQueryResult(CQueryResult query_result);
//////////////////////////////////////////////////////////////////
// interface for growing segment
CStatus
Insert(CSegmentBase c_segment,
Insert(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const int64_t* row_ids,
......@@ -45,50 +48,65 @@ Insert(CSegmentBase c_segment,
int sizeof_per_row,
int64_t count);
// interface for growing segment
int64_t
PreInsert(CSegmentBase c_segment, int64_t size);
PreInsert(CSegmentInterface c_segment, int64_t size);
// interface for growing segment
CStatus
Delete(
CSegmentBase c_segment, int64_t reserved_offset, int64_t size, const int64_t* row_ids, const uint64_t* timestamps);
Delete(CSegmentInterface c_segment,
int64_t reserved_offset,
int64_t size,
const int64_t* row_ids,
const uint64_t* timestamps);
// interface for growing segment
int64_t
PreDelete(CSegmentBase c_segment, int64_t size);
PreDelete(CSegmentInterface c_segment, int64_t size);
// common interface
CStatus
Search(CSegmentBase c_segment,
Search(CSegmentInterface c_segment,
CPlan plan,
CPlaceholderGroup* placeholder_groups,
uint64_t* timestamps,
int num_groups,
CQueryResult* result);
// common interface
CStatus
FillTargetEntry(CSegmentBase c_segment, CPlan c_plan, CQueryResult result);
FillTargetEntry(CSegmentInterface c_segment, CPlan c_plan, CQueryResult result);
// deprecated
CStatus
UpdateSegmentIndex(CSegmentBase c_segment, CLoadIndexInfo c_load_index_info);
UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info);
//////////////////////////////////////////////////////////////////
// deprecated
int
Close(CSegmentBase c_segment);
Close(CSegmentInterface c_segment);
// deprecated
int
BuildIndex(CCollection c_collection, CSegmentBase c_segment);
BuildIndex(CCollection c_collection, CSegmentInterface c_segment);
// deprecated
bool
IsOpened(CSegmentBase c_segment);
IsOpened(CSegmentInterface c_segment);
// common interface
int64_t
GetMemoryUsageInBytes(CSegmentBase c_segment);
GetMemoryUsageInBytes(CSegmentInterface c_segment);
//////////////////////////////////////////////////////////////////
// common interface
int64_t
GetRowCount(CSegmentBase c_segment);
GetRowCount(CSegmentInterface c_segment);
// ???
int64_t
GetDeletedCount(CSegmentBase c_segment);
GetDeletedCount(CSegmentInterface c_segment);
#ifdef __cplusplus
}
......
......@@ -52,7 +52,7 @@ TEST(CApiTest, GetCollectionNameTest) {
TEST(CApiTest, SegmentTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
DeleteCollection(collection);
DeleteSegment(segment);
}
......@@ -60,7 +60,7 @@ TEST(CApiTest, SegmentTest) {
TEST(CApiTest, InsertTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
......@@ -95,7 +95,7 @@ TEST(CApiTest, InsertTest) {
TEST(CApiTest, DeleteTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
long delete_row_ids[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
......@@ -112,7 +112,7 @@ TEST(CApiTest, DeleteTest) {
TEST(CApiTest, SearchTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
......@@ -201,7 +201,7 @@ TEST(CApiTest, SearchTest) {
// TEST(CApiTest, BuildIndexTest) {
// auto schema_tmp_conf = "";
// auto collection = NewCollection(schema_tmp_conf);
// auto segment = NewSegment(collection, 0);
// auto segment = NewSegment(collection, 0, 1);
//
// std::vector<char> raw_data;
// std::vector<uint64_t> timestamps;
......@@ -285,7 +285,7 @@ TEST(CApiTest, SearchTest) {
TEST(CApiTest, IsOpenedTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto is_opened = IsOpened(segment);
assert(is_opened);
......@@ -297,7 +297,7 @@ TEST(CApiTest, IsOpenedTest) {
TEST(CApiTest, CloseTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto status = Close(segment);
assert(status == 0);
......@@ -309,7 +309,7 @@ TEST(CApiTest, CloseTest) {
TEST(CApiTest, GetMemoryUsageInBytesTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto old_memory_usage_size = GetMemoryUsageInBytes(segment);
std::cout << "old_memory_usage_size = " << old_memory_usage_size << std::endl;
......@@ -428,7 +428,7 @@ generate_index(
// TEST(CApiTest, TestSearchPreference) {
// auto schema_tmp_conf = "";
// auto collection = NewCollection(schema_tmp_conf);
// auto segment = NewSegment(collection, 0);
// auto segment = NewSegment(collection, 0, 1);
//
// auto beg = chrono::high_resolution_clock::now();
// auto next = beg;
......@@ -547,7 +547,7 @@ generate_index(
TEST(CApiTest, GetDeletedCountTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
long delete_row_ids[] = {100000, 100001, 100002};
unsigned long delete_timestamps[] = {0, 0, 0};
......@@ -568,7 +568,7 @@ TEST(CApiTest, GetDeletedCountTest) {
TEST(CApiTest, GetRowCountTest) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
int N = 10000;
auto [raw_data, timestamps, uids] = generate_data(N);
......@@ -592,7 +592,7 @@ TEST(CApiTest, GetRowCountTest) {
// "\u003e\ncreate_time: 1600416765\nsegment_ids: 6873737669791618215\npartition_tags: \"default\"\n";
//
// auto collection = NewCollection(schema_string.data());
// auto segment = NewSegment(collection, 0);
// auto segment = NewSegment(collection, 0, 1);
// DeleteCollection(collection);
// DeleteSegment(segment);
//}
......@@ -629,7 +629,7 @@ TEST(CApiTest, MergeInto) {
TEST(CApiTest, Reduce) {
auto schema_tmp_conf = "";
auto collection = NewCollection(schema_tmp_conf);
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
......@@ -845,7 +845,7 @@ TEST(CApiTest, UpdateSegmentIndex_Without_Predicate) {
std::string schema_string = generate_collection_shema("L2", "16", false);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto N = 1000 * 1000;
auto dataset = DataGen(schema, N);
......@@ -970,7 +970,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Range) {
std::string schema_string = generate_collection_shema("L2", "16", false);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto N = 1000 * 1000;
auto dataset = DataGen(schema, N);
......@@ -1108,7 +1108,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Term) {
std::string schema_string = generate_collection_shema("L2", "16", false);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto N = 1000 * 1000;
auto dataset = DataGen(schema, N);
......@@ -1245,7 +1245,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Range) {
std::string schema_string = generate_collection_shema("JACCARD", "16", true);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto N = 1000 * 1000;
auto dataset = DataGen(schema, N);
......@@ -1384,7 +1384,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Term) {
std::string schema_string = generate_collection_shema("JACCARD", "16", true);
auto collection = NewCollection(schema_string.c_str());
auto schema = ((segcore::Collection*)collection)->get_schema();
auto segment = NewSegment(collection, 0);
auto segment = NewSegment(collection, 0, 1);
auto N = 1000 * 1000;
auto dataset = DataGen(schema, N);
......
package writerclient
import (
"strconv"
"github.com/golang/protobuf/proto"
"go.etcd.io/etcd/clientv3"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
pb "github.com/zilliztech/milvus-distributed/internal/proto/writerpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type Client struct {
kvClient kv.TxnBase // client of a reliable kv service, i.e. etcd client
kvPrefix string
flushStream msgstream.MsgStream
}
func NewWriterClient(etcdAddress string, kvRootPath string, writeNodeSegKvSubPath string, flushStream msgstream.MsgStream) (*Client, error) {
// init kv client
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {
return nil, err
}
kvClient := etcdkv.NewEtcdKV(etcdClient, kvRootPath)
return &Client{
kvClient: kvClient,
kvPrefix: writeNodeSegKvSubPath,
flushStream: flushStream,
}, nil
}
type SegmentDescription struct {
SegmentID UniqueID
IsClosed bool
OpenTime Timestamp
CloseTime Timestamp
}
func (c *Client) FlushSegment(segmentID UniqueID, collectionID UniqueID, partitionTag string, timestamp Timestamp) error {
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
}
flushMsg := internalpb2.FlushMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kFlush,
Timestamp: timestamp,
},
SegmentID: segmentID,
CollectionID: collectionID,
PartitionTag: partitionTag,
}
fMsg := &msgstream.FlushMsg{
BaseMsg: baseMsg,
FlushMsg: flushMsg,
}
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, fMsg)
err := c.flushStream.Produce(&msgPack)
return err
}
func (c *Client) DescribeSegment(segmentID UniqueID) (*SegmentDescription, error) {
// query etcd
ret := &SegmentDescription{
SegmentID: segmentID,
IsClosed: false,
}
key := c.kvPrefix + strconv.FormatInt(segmentID, 10)
etcdKV, ok := c.kvClient.(*etcdkv.EtcdKV)
if !ok {
return nil, errors.New("type assertion failed for etcd kv")
}
count, err := etcdKV.GetCount(key)
if err != nil {
return nil, err
}
if count <= 0 {
ret.IsClosed = false
return ret, nil
}
value, err := c.kvClient.Load(key)
if err != nil {
return ret, err
}
flushMeta := pb.SegmentFlushMeta{}
err = proto.UnmarshalText(value, &flushMeta)
if err != nil {
return ret, err
}
ret.IsClosed = flushMeta.IsClosed
ret.OpenTime = flushMeta.OpenTime
ret.CloseTime = flushMeta.CloseTime
return ret, nil
}
func (c *Client) GetInsertBinlogPaths(segmentID UniqueID) (map[int64][]string, error) {
key := c.kvPrefix + strconv.FormatInt(segmentID, 10)
value, err := c.kvClient.Load(key)
if err != nil {
return nil, err
}
flushMeta := pb.SegmentFlushMeta{}
err = proto.UnmarshalText(value, &flushMeta)
if err != nil {
return nil, err
}
ret := make(map[int64][]string)
for _, field := range flushMeta.Fields {
ret[field.FieldID] = field.BinlogPaths
}
return ret, nil
}
package datanode
import (
"log"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
type Collection struct {
schema *schemapb.CollectionSchema
id UniqueID
}
func (c *Collection) Name() string {
return c.schema.Name
}
func (c *Collection) ID() UniqueID {
return c.id
}
func newCollection(collectionID UniqueID, schemaStr string) *Collection {
var schema schemapb.CollectionSchema
err := proto.UnmarshalText(schemaStr, &schema)
if err != nil {
log.Println(err)
return nil
}
var newCollection = &Collection{
schema: &schema,
id: collectionID,
}
return newCollection
}
package datanode
import (
"fmt"
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/errors"
)
type collectionReplica interface {
// collection
getCollectionNum() int
addCollection(collectionID UniqueID, schemaBlob string) error
removeCollection(collectionID UniqueID) error
getCollectionByID(collectionID UniqueID) (*Collection, error)
getCollectionByName(collectionName string) (*Collection, error)
hasCollection(collectionID UniqueID) bool
getSegmentStatistics() // GOOSE TODO
}
type collectionReplicaImpl struct {
mu sync.RWMutex
collections []*Collection
}
//----------------------------------------------------------------------------------------------------- collection
func (colReplica *collectionReplicaImpl) getSegmentStatistics() {
// GOOSE TODO
}
func (colReplica *collectionReplicaImpl) getCollectionNum() int {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return len(colReplica.collections)
}
func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schemaBlob string) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
var newCollection = newCollection(collectionID, schemaBlob)
colReplica.collections = append(colReplica.collections, newCollection)
fmt.Println("yyy, create collection: ", newCollection.Name())
return nil
}
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
fmt.Println("drop collection:", collectionID)
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
tmpCollections := make([]*Collection, 0)
for _, col := range colReplica.collections {
if col.ID() != collectionID {
tmpCollections = append(tmpCollections, col)
} else {
fmt.Println("yyy, drop collection name: ", col.Name())
}
}
colReplica.collections = tmpCollections
return nil
}
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
for _, collection := range colReplica.collections {
if collection.ID() == collectionID {
return collection, nil
}
}
return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10))
}
func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName string) (*Collection, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
for _, collection := range colReplica.collections {
if collection.Name() == collectionName {
return collection, nil
}
}
return nil, errors.New("Cannot found collection: " + collectionName)
}
func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
for _, col := range colReplica.collections {
if col.ID() == collectionID {
return true
}
}
return false
}
package datanode
import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func newReplica() collectionReplica {
collections := make([]*Collection, 0)
var replica collectionReplica = &collectionReplicaImpl{
collections: collections,
}
return replica
}
func genTestCollectionMeta(collectionName string, collectionID UniqueID) *etcdpb.CollectionMeta {
fieldVec := schemapb.FieldSchema{
FieldID: UniqueID(100),
Name: "vec",
IsPrimaryKey: false,
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "metric_type",
Value: "L2",
},
},
}
fieldInt := schemapb.FieldSchema{
FieldID: UniqueID(101),
Name: "age",
IsPrimaryKey: false,
DataType: schemapb.DataType_INT32,
}
schema := schemapb.CollectionSchema{
Name: collectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
return &collectionMeta
}
func initTestMeta(t *testing.T, replica collectionReplica, collectionName string, collectionID UniqueID, segmentID UniqueID) {
collectionMeta := genTestCollectionMeta(collectionName, collectionID)
schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
require.NotEqual(t, "", schemaBlob)
var err = replica.addCollection(collectionMeta.ID, schemaBlob)
require.NoError(t, err)
collection, err := replica.getCollectionByName(collectionName)
require.NoError(t, err)
assert.Equal(t, collection.Name(), collectionName)
assert.Equal(t, collection.ID(), collectionID)
assert.Equal(t, replica.getCollectionNum(), 1)
}
//----------------------------------------------------------------------------------------------------- collection
func TestCollectionReplica_getCollectionNum(t *testing.T) {
replica := newReplica()
initTestMeta(t, replica, "collection0", 0, 0)
assert.Equal(t, replica.getCollectionNum(), 1)
}
func TestCollectionReplica_addCollection(t *testing.T) {
replica := newReplica()
initTestMeta(t, replica, "collection0", 0, 0)
}
func TestCollectionReplica_removeCollection(t *testing.T) {
replica := newReplica()
initTestMeta(t, replica, "collection0", 0, 0)
assert.Equal(t, replica.getCollectionNum(), 1)
err := replica.removeCollection(0)
assert.NoError(t, err)
assert.Equal(t, replica.getCollectionNum(), 0)
}
func TestCollectionReplica_getCollectionByID(t *testing.T) {
replica := newReplica()
collectionName := "collection0"
collectionID := UniqueID(0)
initTestMeta(t, replica, collectionName, collectionID, 0)
targetCollection, err := replica.getCollectionByID(collectionID)
assert.NoError(t, err)
assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.Name(), collectionName)
assert.Equal(t, targetCollection.ID(), collectionID)
}
func TestCollectionReplica_getCollectionByName(t *testing.T) {
replica := newReplica()
collectionName := "collection0"
collectionID := UniqueID(0)
initTestMeta(t, replica, collectionName, collectionID, 0)
targetCollection, err := replica.getCollectionByName(collectionName)
assert.NoError(t, err)
assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.Name(), collectionName)
assert.Equal(t, targetCollection.ID(), collectionID)
}
func TestCollectionReplica_hasCollection(t *testing.T) {
replica := newReplica()
collectionName := "collection0"
collectionID := UniqueID(0)
initTestMeta(t, replica, collectionName, collectionID, 0)
hasCollection := replica.hasCollection(collectionID)
assert.Equal(t, hasCollection, true)
hasCollection = replica.hasCollection(UniqueID(1))
assert.Equal(t, hasCollection, false)
}
func TestCollectionReplica_freeAll(t *testing.T) {
replica := newReplica()
collectionName := "collection0"
collectionID := UniqueID(0)
initTestMeta(t, replica, collectionName, collectionID, 0)
}
package datanode
import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
)
func TestCollection_newCollection(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
collectionMeta := genTestCollectionMeta(collectionName, collectionID)
schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
assert.NotEqual(t, "", schemaBlob)
collection := newCollection(collectionMeta.ID, schemaBlob)
assert.Equal(t, collection.Name(), collectionName)
assert.Equal(t, collection.ID(), collectionID)
}
func TestCollection_deleteCollection(t *testing.T) {
collectionName := "collection0"
collectionID := UniqueID(0)
collectionMeta := genTestCollectionMeta(collectionName, collectionID)
schemaBlob := proto.MarshalTextString(collectionMeta.Schema)
assert.NotEqual(t, "", schemaBlob)
collection := newCollection(collectionMeta.ID, schemaBlob)
assert.Equal(t, collection.Name(), collectionName)
assert.Equal(t, collection.ID(), collectionID)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment