diff --git a/deployments/docker/docker-compose.yml b/deployments/docker/docker-compose.yml
index 0ae708a19ecb9ababafe5fcdb6bd5f9d5eac529e..60bf5d9fff1f2a0aa47274becc742f778aaeb77a 100644
--- a/deployments/docker/docker-compose.yml
+++ b/deployments/docker/docker-compose.yml
@@ -36,14 +36,6 @@ services:
     networks:
       - milvus
 
-  jaeger:
-    image: jaegertracing/all-in-one:latest
-    ports:
-      - "6831:6831/udp"
-      - "16686:16686"
-    networks:
-      - milvus
-
 networks:
   milvus:
diff --git a/docker-compose.yml b/docker-compose.yml
index 9f3599abb9a5b139323717b4e98e4a9d7e91b8f4..cba23befabc4c39314b179b3227a1d3570ee3c31 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -83,10 +83,5 @@ services:
     networks:
       - milvus
 
-  jaeger:
-    image: jaegertracing/all-in-one:latest
-    networks:
-      - milvus
-
 networks:
   milvus:
diff --git a/go.mod b/go.mod
index bb426c8ba01c5a6bcfdd5778e54e5f1aba3f7a79..47afde2bff278667721c394a64396793788e92f2 100644
--- a/go.mod
+++ b/go.mod
@@ -4,17 +4,14 @@ go 1.15
 
 require (
 	code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 // indirect
-	github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
 	github.com/apache/pulsar-client-go v0.1.1
-	github.com/apache/thrift v0.13.0
-	github.com/aws/aws-sdk-go v1.30.8 // indirect
+	github.com/aws/aws-sdk-go v1.30.8
 	github.com/coreos/etcd v3.3.25+incompatible // indirect
-	github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
+	github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
 	github.com/frankban/quicktest v1.10.2 // indirect
 	github.com/fsnotify/fsnotify v1.4.9 // indirect
 	github.com/git-hooks/git-hooks v1.3.1 // indirect
 	github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
-	github.com/golang/mock v1.3.1
 	github.com/golang/protobuf v1.3.2
 	github.com/google/btree v1.0.0
 	github.com/klauspost/compress v1.10.11 // indirect
@@ -23,12 +20,12 @@ require (
 	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
 	github.com/onsi/ginkgo v1.12.1 // indirect
 	github.com/onsi/gomega v1.10.0 // indirect
-	github.com/opentracing/opentracing-go v1.2.0
+	github.com/opentracing/opentracing-go v1.2.0 // indirect
 	github.com/pierrec/lz4 v2.5.2+incompatible // indirect
 	github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 // indirect
 	github.com/pingcap/errors v0.11.4 // indirect
 	github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 // indirect
-	github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48 // indirect
+	github.com/pivotal-golang/bytefmt v0.0.0-20200131002437-cf55d5288a48
 	github.com/prometheus/client_golang v1.5.1 // indirect
 	github.com/prometheus/common v0.10.0 // indirect
 	github.com/prometheus/procfs v0.1.3 // indirect
@@ -38,9 +35,7 @@
 	github.com/spf13/cast v1.3.0
 	github.com/spf13/viper v1.7.1
 	github.com/stretchr/testify v1.6.1
-	github.com/tikv/client-go v0.0.0-20200824032810-95774393107b // indirect
-	github.com/uber/jaeger-client-go v2.25.0+incompatible
-	github.com/uber/jaeger-lib v2.4.0+incompatible // indirect
+	github.com/tikv/client-go v0.0.0-20200824032810-95774393107b
 	github.com/urfave/cli v1.22.5 // indirect
 	github.com/yahoo/athenz v1.9.16 // indirect
 	go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
@@ -55,7 +50,7 @@
 	google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150 // indirect
 	google.golang.org/grpc v1.31.0
 	gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
-	gopkg.in/yaml.v2 v2.3.0 // indirect
+	gopkg.in/yaml.v2 v2.3.0
 	honnef.co/go/tools v0.0.1-2020.1.4 // indirect
 	sigs.k8s.io/yaml v1.2.0 // indirect
 )
diff --git a/go.sum b/go.sum
index 14c1fca608a146e91e855a8c730df2ad5684861a..eb4ef6b6a4059c712a42b92603ceb5d687117066 100644
--- a/go.sum
+++ b/go.sum
@@ -15,8 +15,6 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
-github.com/HdrHistogram/hdrhistogram-go v1.0.1 h1:GX8GAYDuhlFQnI2fRDHQhTlkHMz8bEn0jTI6LJU0mpw=
-github.com/HdrHistogram/hdrhistogram-go v1.0.1/go.mod h1:BWJ+nMSHY3L41Zj7CA3uXnloDp7xxV0YvstAE7nKTaM=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
@@ -26,8 +24,6 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1C
 github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/apache/pulsar-client-go v0.1.1 h1:v/kU+2ZCC6yFIcbZrFtWa9/nvVzVr18L+xYJUvZSxEQ=
 github.com/apache/pulsar-client-go v0.1.1/go.mod h1:mlxC65KL1BLhGO2bnT9zWMttVzR2czVPb27D477YpyU=
-github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
-github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
 github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
 github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
 github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
@@ -121,7 +117,6 @@ github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18h
 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
 github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
-github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
 github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
 github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -348,7 +343,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
 github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
 github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
-github.com/protocolbuffers/protobuf v3.14.0+incompatible h1:8r0H76h/Q/lEnFFY60AuM23NOnaDMi6bd7zuboSYM+o=
 github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446 h1:/NRJ5vAYoqz+7sG51ubIDHXeWO8DlTSrToPu6q11ziA=
 github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
@@ -409,12 +403,6 @@ github.com/tikv/client-go v0.0.0-20200824032810-95774393107b/go.mod h1:K0NcdVNrX
 github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
-github.com/uber/jaeger-client-go v1.6.0 h1:3+zLlq+4npI5fg8IsgAje3YsP7TcEdNzJScyqFIzxEQ=
-github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U=
-github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
-github.com/uber/jaeger-lib v1.5.0 h1:OHbgr8l656Ub3Fw5k9SWnBfIEwvoHQ+W2y+Aa9D1Uyo=
-github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ=
-github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
 github.com/ugorji/go v1.1.2/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
 github.com/ugorji/go/codec v0.0.0-20190204201341-e444a5086c43/go.mod h1:iT03XoTwV7xq/+UGwKO3UbC1nNNlopQiY61beSdrtOA=
 github.com/unrolled/render v1.0.0 h1:XYtvhA3UkpB7PqkvhUFYmpKD55OudoIeygcfus4vcd4=
diff --git a/internal/core/src/indexbuilder/IndexWrapper.cpp b/internal/core/src/indexbuilder/IndexWrapper.cpp
index 5d95eabf3d97ba40fcb9ee646b480a9c5a280254..fcced635e0e508894a70995b68002310c7b8182f 100644
--- a/internal/core/src/indexbuilder/IndexWrapper.cpp
+++ b/internal/core/src/indexbuilder/IndexWrapper.cpp
@@ -55,7 +55,6 @@ IndexWrapper::parse_impl(const std::string& serialized_params_str, knowhere::Con
     }
 
     auto stoi_closure = [](const std::string& s) -> int { return std::stoi(s); };
-    auto stof_closure = [](const std::string& s) -> int { return std::stof(s); };
 
     /***************************** meta *******************************/
     check_parameter<int>(conf, milvus::knowhere::meta::DIM, stoi_closure, std::nullopt);
@@ -89,7 +88,7 @@
     check_parameter<int>(conf, milvus::knowhere::IndexParams::edge_size, stoi_closure, std::nullopt);
 
     /************************** NGT Search Params *****************************/
-    check_parameter<float>(conf, milvus::knowhere::IndexParams::epsilon, stof_closure, std::nullopt);
+    check_parameter<int>(conf, milvus::knowhere::IndexParams::epsilon, stoi_closure, std::nullopt);
     check_parameter<int>(conf, milvus::knowhere::IndexParams::max_search_edges, stoi_closure, std::nullopt);
 
     /************************** NGT_PANNG Params *****************************/
@@ -275,12 +274,6 @@ IndexWrapper::QueryWithParam(const knowhere::DatasetPtr& dataset, const char* se
 
 std::unique_ptr<IndexWrapper::QueryResult>
 IndexWrapper::QueryImpl(const knowhere::DatasetPtr& dataset, const knowhere::Config& conf) {
-    auto load_raw_data_closure = [&]() { LoadRawData(); };  // hide this pointer
-    auto index_type = get_index_type();
-    if (is_in_nm_list(index_type)) {
-        std::call_once(raw_data_loaded_, load_raw_data_closure);
-    }
-
     auto res = index_->Query(dataset, conf, nullptr);
     auto ids = res->Get<int64_t*>(milvus::knowhere::meta::IDS);
     auto distances = res->Get<float*>(milvus::knowhere::meta::DISTANCE);
@@ -298,19 +291,5 @@ IndexWrapper::QueryImpl(const knowhere::DatasetPtr& dataset, const knowhere::Con
     return std::move(query_res);
 }
 
-void
-IndexWrapper::LoadRawData() {
-    auto index_type = get_index_type();
-    if (is_in_nm_list(index_type)) {
-        auto bs = index_->Serialize(config_);
-        auto bptr = std::make_shared<milvus::knowhere::Binary>();
-        auto deleter = [&](uint8_t*) {};  // avoid repeated deconstruction
-        bptr->data = std::shared_ptr<uint8_t[]>(static_cast<uint8_t*>(raw_data_.data()), deleter);
-        bptr->size = raw_data_.size();
-        bs.Append(RAW_DATA, bptr);
-        index_->Load(bs);
-    }
-}
-
 }  // namespace indexbuilder
 }  // namespace milvus
diff --git a/internal/core/src/indexbuilder/IndexWrapper.h b/internal/core/src/indexbuilder/IndexWrapper.h
index 16f2721712c655bff7b2e7d53a235e32ed1d6458..65c6f149febf89bd30521e0478ba4eb2782b8583 100644
--- a/internal/core/src/indexbuilder/IndexWrapper.h
+++ b/internal/core/src/indexbuilder/IndexWrapper.h
@@ -66,9 +66,6 @@ class IndexWrapper {
     void
     StoreRawData(const knowhere::DatasetPtr& dataset);
 
-    void
-    LoadRawData();
-
     template <typename T>
     void
     check_parameter(knowhere::Config& conf,
@@ -95,7 +92,6 @@
     milvus::json index_config_;
     knowhere::Config config_;
     std::vector<uint8_t> raw_data_;
-    std::once_flag raw_data_loaded_;
 };
 
 }  // namespace indexbuilder
diff --git a/internal/core/unittest/test_index_wrapper.cpp b/internal/core/unittest/test_index_wrapper.cpp
index bd335951f8053029f720e368d7907cb0d65d451d..a885c837a096b9558da69ec74c73d2c9c019e510 100644
--- a/internal/core/unittest/test_index_wrapper.cpp
+++ b/internal/core/unittest/test_index_wrapper.cpp
@@ -11,8 +11,6 @@
 
 #include <tuple>
 #include <map>
-#include <limits>
-#include <math.h>
 #include <gtest/gtest.h>
 #include <google/protobuf/text_format.h>
 
@@ -43,16 +41,16 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
     if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_IDMAP) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, K},
             {milvus::knowhere::Metric::TYPE, metric_type},
             {milvus::knowhere::INDEX_FILE_SLICE_SIZE_IN_MEGABYTE, 4},
         };
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_IVFPQ) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, K},
             {milvus::knowhere::IndexParams::nlist, 100},
-            {milvus::knowhere::IndexParams::nprobe, 4},
+            // {milvus::knowhere::IndexParams::nprobe, 4},
             {milvus::knowhere::IndexParams::m, 4},
             {milvus::knowhere::IndexParams::nbits, 8},
             {milvus::knowhere::Metric::TYPE, metric_type},
@@ -61,9 +59,9 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, K},
             {milvus::knowhere::IndexParams::nlist, 100},
-            {milvus::knowhere::IndexParams::nprobe, 4},
+            // {milvus::knowhere::IndexParams::nprobe, 4},
             {milvus::knowhere::Metric::TYPE, metric_type},
             {milvus::knowhere::INDEX_FILE_SLICE_SIZE_IN_MEGABYTE, 4},
 #ifdef MILVUS_GPU_VERSION
@@ -73,9 +71,9 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_IVFSQ8) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, K},
             {milvus::knowhere::IndexParams::nlist, 100},
-            {milvus::knowhere::IndexParams::nprobe, 4},
+            // {milvus::knowhere::IndexParams::nprobe, 4},
             {milvus::knowhere::IndexParams::nbits, 8},
             {milvus::knowhere::Metric::TYPE, metric_type},
             {milvus::knowhere::INDEX_FILE_SLICE_SIZE_IN_MEGABYTE, 4},
@@ -86,9 +84,9 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, K},
             {milvus::knowhere::IndexParams::nlist, 100},
-            {milvus::knowhere::IndexParams::nprobe, 4},
+            // {milvus::knowhere::IndexParams::nprobe, 4},
             {milvus::knowhere::IndexParams::m, 4},
             {milvus::knowhere::IndexParams::nbits, 8},
             {milvus::knowhere::Metric::TYPE, metric_type},
@@ -97,14 +95,13 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, K},
             {milvus::knowhere::Metric::TYPE, metric_type},
         };
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_NSG) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
             {milvus::knowhere::IndexParams::nlist, 163},
-            {milvus::knowhere::meta::TOPK, K},
             {milvus::knowhere::IndexParams::nprobe, 8},
             {milvus::knowhere::IndexParams::knng, 20},
             {milvus::knowhere::IndexParams::search_length, 40},
@@ -130,14 +127,17 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
 #endif
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_HNSW) {
         return milvus::knowhere::Config{
-            {milvus::knowhere::meta::DIM, DIM},       {milvus::knowhere::meta::TOPK, K},
-            {milvus::knowhere::IndexParams::M, 16},   {milvus::knowhere::IndexParams::efConstruction, 200},
-            {milvus::knowhere::IndexParams::ef, 200}, {milvus::knowhere::Metric::TYPE, metric_type},
+            {milvus::knowhere::meta::DIM, DIM},
+            // {milvus::knowhere::meta::TOPK, 10},
+            {milvus::knowhere::IndexParams::M, 16},
+            {milvus::knowhere::IndexParams::efConstruction, 200},
+            {milvus::knowhere::IndexParams::ef, 200},
+            {milvus::knowhere::Metric::TYPE, metric_type},
         };
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_ANNOY) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, 10},
            {milvus::knowhere::IndexParams::n_trees, 4},
             {milvus::knowhere::IndexParams::search_k, 100},
             {milvus::knowhere::Metric::TYPE, metric_type},
@@ -146,7 +146,7 @@ generate_conf(const milvus::knowh
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_RHNSWFlat) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, 10},
             {milvus::knowhere::IndexParams::M, 16},
             {milvus::knowhere::IndexParams::efConstruction, 200},
             {milvus::knowhere::IndexParams::ef, 200},
@@ -156,7 +156,7 @@ generate_conf(const milvus::knowh
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_RHNSWPQ) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, 10},
             {milvus::knowhere::IndexParams::M, 16},
             {milvus::knowhere::IndexParams::efConstruction, 200},
             {milvus::knowhere::IndexParams::ef, 200},
@@ -167,7 +167,7 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_RHNSWSQ) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, 10},
             {milvus::knowhere::IndexParams::M, 16},
             {milvus::knowhere::IndexParams::efConstruction, 200},
             {milvus::knowhere::IndexParams::ef, 200},
@@ -177,7 +177,7 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_NGTPANNG) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, 10},
             {milvus::knowhere::Metric::TYPE, metric_type},
             {milvus::knowhere::IndexParams::edge_size, 10},
             {milvus::knowhere::IndexParams::epsilon, 0.1},
@@ -189,7 +189,7 @@ generate_conf(const milvus::knowhere::IndexType& index_type, const milvus::knowh
     } else if (index_type == milvus::knowhere::IndexEnum::INDEX_NGTONNG) {
         return milvus::knowhere::Config{
             {milvus::knowhere::meta::DIM, DIM},
-            {milvus::knowhere::meta::TOPK, K},
+            // {milvus::knowhere::meta::TOPK, 10},
             {milvus::knowhere::Metric::TYPE, metric_type},
             {milvus::knowhere::IndexParams::edge_size, 20},
             {milvus::knowhere::IndexParams::epsilon, 0.1},
@@ -234,99 +234,6 @@ GenDataset(int64_t N, const milvus::knowhere::MetricType& metric_type, bool is_b
         return milvus::segcore::DataGen(schema, N);
     }
 }
-
-using QueryResultPtr = std::unique_ptr<milvus::indexbuilder::IndexWrapper::QueryResult>;
-void
-PrintQueryResult(const QueryResultPtr& result) {
-    auto nq = result->nq;
-    auto k = result->topk;
-
-    std::stringstream ss_id;
-    std::stringstream ss_dist;
-
-    for (auto i = 0; i < nq; i++) {
-        for (auto j = 0; j < k; ++j) {
-            ss_id << result->ids[i * k + j] << " ";
-            ss_dist << result->distances[i * k + j] << " ";
-        }
-        ss_id << std::endl;
-        ss_dist << std::endl;
-    }
-    std::cout << "id\n" << ss_id.str() << std::endl;
-    std::cout << "dist\n" << ss_dist.str() << std::endl;
-}
-
-float
-L2(const float* point_a, const float* point_b, int dim) {
-    float dis = 0;
-    for (auto i = 0; i < dim; i++) {
-        auto c_a = point_a[i];
-        auto c_b = point_b[i];
-        dis += pow(c_b - c_a, 2);
-    }
-    return dis;
-}
-
-int hamming_weight(uint8_t n) {
-    int count=0;
-    while(n != 0){
-        count += n&1;
-        n >>= 1;
-    }
-    return count;
-}
-float
-Jaccard(const uint8_t* point_a, const uint8_t* point_b, int dim) {
-    float dis;
-    int len = dim / 8;
-    float intersection = 0;
-    float union_num = 0;
-    for (int i = 0; i < len; i++) {
-        intersection += hamming_weight(point_a[i] & point_b[i]);
-        union_num += hamming_weight(point_a[i] | point_b[i]);
-    }
-    dis = 1 - (intersection / union_num);
-    return dis;
-}
-
-float
-CountDistance(const void* point_a,
-              const void* point_b,
-              int dim,
-              const milvus::knowhere::MetricType& metric,
-              bool is_binary = false) {
-    if (point_a == nullptr || point_b == nullptr) {
-        return std::numeric_limits<float>::max();
-    }
-    if (metric == milvus::knowhere::Metric::L2) {
-        return L2(static_cast<const float*>(point_a), static_cast<const float*>(point_b), dim);
-    } else if (metric == milvus::knowhere::Metric::JACCARD) {
-        return Jaccard(static_cast<const uint8_t*>(point_a), static_cast<const uint8_t*>(point_b), dim);
-    } else {
-        return std::numeric_limits<float>::max();
-    }
-}
-
-void
-CheckDistances(const QueryResultPtr& result,
-               const milvus::knowhere::DatasetPtr& base_dataset,
-               const milvus::knowhere::DatasetPtr& query_dataset,
-               const milvus::knowhere::MetricType& metric,
-               const float threshold = 1.0e-5) {
-    auto base_vecs = base_dataset->Get<float*>(milvus::knowhere::meta::TENSOR);
-    auto query_vecs = query_dataset->Get<float*>(milvus::knowhere::meta::TENSOR);
-    auto dim = base_dataset->Get<int64_t>(milvus::knowhere::meta::DIM);
-    auto nq = result->nq;
-    auto k = result->topk;
-    for (auto i = 0; i < nq; i++) {
-        for (auto j = 0; j < k; ++j) {
-            auto dis = result->distances[i * k + j];
-            auto id = result->ids[i * k + j];
-            auto count_dis = CountDistance(query_vecs + i * dim, base_vecs + id * dim, dim, metric);
-            // assert(std::abs(dis - count_dis) < threshold);
-        }
-    }
-}
 }  // namespace
 
 using Param = std::pair<milvus::knowhere::IndexType, milvus::knowhere::MetricType>;
@@ -340,26 +247,8 @@ class IndexWrapperTest : public ::testing::TestWithParam<Param> {
         metric_type = param.second;
 
        std::tie(type_params, index_params) = generate_params(index_type, metric_type);
 
-        std::map<std::string, bool> is_binary_map = {
-            {milvus::knowhere::IndexEnum::INDEX_FAISS_IDMAP, false},
-            {milvus::knowhere::IndexEnum::INDEX_FAISS_IVFPQ, false},
-            {milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, false},
-            {milvus::knowhere::IndexEnum::INDEX_FAISS_IVFSQ8, false},
-            {milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, true},
-            {milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP, true},
-#ifdef MILVUS_SUPPORT_SPTAG
-            {milvus::knowhere::IndexEnum::INDEX_SPTAG_KDT_RNT, false},
-            {milvus::knowhere::IndexEnum::INDEX_SPTAG_BKT_RNT, false},
-#endif
-            {milvus::knowhere::IndexEnum::INDEX_HNSW, false},
-            {milvus::knowhere::IndexEnum::INDEX_ANNOY, false},
-            {milvus::knowhere::IndexEnum::INDEX_RHNSWFlat, false},
-            {milvus::knowhere::IndexEnum::INDEX_RHNSWPQ, false},
-            {milvus::knowhere::IndexEnum::INDEX_RHNSWSQ, false},
-            {milvus::knowhere::IndexEnum::INDEX_NGTPANNG, false},
-            {milvus::knowhere::IndexEnum::INDEX_NGTONNG, false},
-            {milvus::knowhere::IndexEnum::INDEX_NSG, false},
-        };
+        std::map<std::string, bool> is_binary_map = {{milvus::knowhere::IndexEnum::INDEX_FAISS_IVFPQ, false},
+                                                     {milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT, true}};
 
         is_binary = is_binary_map[index_type];
@@ -373,13 +262,9 @@ class IndexWrapperTest : public ::testing::TestWithParam<Param> {
         if (!is_binary) {
             xb_data = dataset.get_col<float>(0);
             xb_dataset = milvus::knowhere::GenDataset(NB, DIM, xb_data.data());
-            xq_data = dataset.get_col<float>(0);
-            xq_dataset = milvus::knowhere::GenDataset(NQ, DIM, xq_data.data());
         } else {
             xb_bin_data = dataset.get_col<uint8_t>(0);
             xb_dataset = milvus::knowhere::GenDataset(NB, DIM, xb_bin_data.data());
-            xq_bin_data = dataset.get_col<uint8_t>(0);
-            xq_dataset = milvus::knowhere::GenDataset(NQ, DIM, xq_bin_data.data());
         }
     }
 
@@ -397,9 +282,6 @@ class IndexWrapperTest : public ::testing::TestWithParam<Param> {
     std::vector<float> xb_data;
     std::vector<uint8_t> xb_bin_data;
     std::vector<milvus::knowhere::IDType> ids;
-    milvus::knowhere::DatasetPtr xq_dataset;
-    std::vector<float> xq_data;
-    std::vector<uint8_t> xq_bin_data;
 };
 
 TEST(PQ, Build) {
@@ -426,47 +308,6 @@ TEST(IVFFLATNM, Build) {
     ASSERT_NO_THROW(index->AddWithoutIds(xb_dataset, conf));
 }
 
-TEST(IVFFLATNM, Query) {
-    auto index_type = milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT;
-    auto metric_type = milvus::knowhere::Metric::L2;
-    auto conf = generate_conf(index_type, metric_type);
-    auto index = milvus::knowhere::VecIndexFactory::GetInstance().CreateVecIndex(index_type);
-    auto dataset = GenDataset(NB, metric_type, false);
-    auto xb_data = dataset.get_col<float>(0);
-    auto xb_dataset = milvus::knowhere::GenDataset(NB, DIM, xb_data.data());
-    ASSERT_NO_THROW(index->Train(xb_dataset, conf));
-    ASSERT_NO_THROW(index->AddWithoutIds(xb_dataset, conf));
-    auto bs = index->Serialize(conf);
-    auto bptr = std::make_shared<milvus::knowhere::Binary>();
-    bptr->data = std::shared_ptr<uint8_t[]>((uint8_t*)xb_data.data(), [&](uint8_t*) {});
-    bptr->size = DIM * NB * sizeof(float);
-    bs.Append(RAW_DATA, bptr);
-    index->Load(bs);
-    auto xq_data = dataset.get_col<float>(0);
-    auto xq_dataset = milvus::knowhere::GenDataset(NQ, DIM, xq_data.data());
-    auto result = index->Query(xq_dataset, conf, nullptr);
-}
-
-TEST(NSG, Query) {
-    auto index_type = milvus::knowhere::IndexEnum::INDEX_NSG;
-    auto metric_type = milvus::knowhere::Metric::L2;
-    auto conf = generate_conf(index_type, metric_type);
-    auto index = milvus::knowhere::VecIndexFactory::GetInstance().CreateVecIndex(index_type);
-    auto dataset = GenDataset(NB, metric_type, false);
-    auto xb_data = dataset.get_col<float>(0);
-    auto xb_dataset = milvus::knowhere::GenDataset(NB, DIM, xb_data.data());
-    index->BuildAll(xb_dataset, conf);
-    auto bs = index->Serialize(conf);
-    auto bptr = std::make_shared<milvus::knowhere::Binary>();
-    bptr->data = std::shared_ptr<uint8_t[]>((uint8_t*)xb_data.data(), [&](uint8_t*) {});
-    bptr->size = DIM * NB * sizeof(float);
-    bs.Append(RAW_DATA, bptr);
-    index->Load(bs);
-    auto xq_data = dataset.get_col<float>(0);
-    auto xq_dataset = milvus::knowhere::GenDataset(NQ, DIM, xq_data.data());
-    auto result = index->Query(xq_dataset, conf, nullptr);
-}
-
 TEST(BINFLAT, Build) {
     auto index_type = milvus::knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT;
     auto metric_type = milvus::knowhere::Metric::JACCARD;
@@ -644,7 +485,12 @@ TEST_P(IndexWrapperTest, Dim) {
 TEST_P(IndexWrapperTest, BuildWithoutIds) {
     auto index =
         std::make_unique<milvus::indexbuilder::IndexWrapper>(type_params_str.c_str(), index_params_str.c_str());
-    ASSERT_NO_THROW(index->BuildWithoutIds(xb_dataset));
+
+    if (milvus::indexbuilder::is_in_need_id_list(index_type)) {
+        ASSERT_ANY_THROW(index->BuildWithoutIds(xb_dataset));
+    } else {
+        ASSERT_NO_THROW(index->BuildWithoutIds(xb_dataset));
+    }
 }
 
 TEST_P(IndexWrapperTest, Codec) {
@@ -665,16 +511,3 @@
         ASSERT_EQ(strcmp(binary.data, copy_binary.data), 0);
     }
 }
-
-TEST_P(IndexWrapperTest, Query) {
-    auto index_wrapper =
-        std::make_unique<milvus::indexbuilder::IndexWrapper>(type_params_str.c_str(), index_params_str.c_str());
-
-    index_wrapper->BuildWithoutIds(xb_dataset);
-
-    std::unique_ptr<milvus::indexbuilder::IndexWrapper::QueryResult> query_result = index_wrapper->Query(xq_dataset);
-    ASSERT_EQ(query_result->topk, K);
-    ASSERT_EQ(query_result->nq, NQ);
-    ASSERT_EQ(query_result->distances.size(), query_result->topk * query_result->nq);
-    ASSERT_EQ(query_result->ids.size(), query_result->topk * query_result->nq);
-}
diff --git a/internal/master/master.go b/internal/master/master.go
index 12f6e53086c74854e747c5969376260de5130f41..6d9734e425590fb80e70050e1ac597fa848d586c 100644
--- a/internal/master/master.go
+++ b/internal/master/master.go
@@ -218,7 +218,6 @@ func CreateServer(ctx context.Context) (*Master, error) {
 
	m.grpcServer = grpc.NewServer()
	masterpb.RegisterMasterServer(m.grpcServer, m)
-
	return m, nil
 }
 
diff --git a/internal/master/master_test.go b/internal/master/master_test.go
index a605e73aa76127c24918cdd3826fae0d0d186ad8..0a44ed90e886b55d6b7bd5bec9e2d1842041fd2c 100644
--- a/internal/master/master_test.go
+++ b/internal/master/master_test.go
@@ -110,7 +110,6 @@ func TestMaster(t *testing.T) {
 
	conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock())
	require.Nil(t, err)
-	cli := masterpb.NewMasterClient(conn)
 
	t.Run("TestConfigTask", func(t *testing.T) {
@@ -887,6 +886,12 @@ func TestMaster(t *testing.T) {
	var k2sMsgstream ms.MsgStream = k2sMs
	assert.True(t, receiveTimeTickMsg(&k2sMsgstream))
 
+	conn, err := grpc.DialContext(ctx, Params.Address, grpc.WithInsecure(), grpc.WithBlock())
+	assert.Nil(t, err)
+	defer conn.Close()
+
+	cli := masterpb.NewMasterClient(conn)
+
	sch := schemapb.CollectionSchema{
		Name:        "name" + strconv.FormatUint(rand.Uint64(), 10),
		Description: "test collection",
diff --git a/internal/msgstream/msg.go b/internal/msgstream/msg.go
index a71d1cabfe89745cfdfaec2d3411c4e3521c8b19..518bcfa7afe56a34cf86ff1464eb309a42560a9c 100644
--- a/internal/msgstream/msg.go
+++ b/internal/msgstream/msg.go
@@ -1,8 +1,6 @@
 package msgstream
 
 import (
-	"context"
-
	"github.com/golang/protobuf/proto"
	internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
 )
@@ -10,8 +8,6 @@
 type MsgType = internalPb.MsgType
 
 type TsMsg interface {
-	GetContext() context.Context
-	SetContext(context.Context)
	BeginTs() Timestamp
	EndTs() Timestamp
	Type() MsgType
@@ -21,7 +17,6 @@
 }
 
 type BaseMsg struct {
-	ctx            context.Context
	BeginTimestamp Timestamp
	EndTimestamp   Timestamp
	HashValues     []uint32
@@ -49,14 +44,6 @@ func (it *InsertMsg) Type() MsgType {
	return it.MsgType
 }
 
-func (it *InsertMsg) GetContext() context.Context {
-	return it.ctx
-}
-
-func (it *InsertMsg) SetContext(ctx context.Context) {
-	it.ctx = ctx
-}
-
 func (it *InsertMsg) Marshal(input TsMsg) ([]byte, error) {
	insertMsg := input.(*InsertMsg)
	insertRequest := &insertMsg.InsertRequest
@@ -101,13 +88,6 @@ func (fl *FlushMsg) Type() MsgType {
	return fl.GetMsgType()
 }
 
-func (fl *FlushMsg) GetContext() context.Context {
-	return fl.ctx
-}
-func (fl *FlushMsg) SetContext(ctx context.Context) {
-	fl.ctx = ctx
-}
-
 func (fl *FlushMsg) Marshal(input TsMsg) ([]byte, error) {
	flushMsgTask := input.(*FlushMsg)
	flushMsg := &flushMsgTask.FlushMsg
@@ -141,14 +121,6 @@ func (dt *DeleteMsg) Type() MsgType {
	return dt.MsgType
 }
 
-func (dt *DeleteMsg) GetContext() context.Context {
-	return dt.ctx
-}
-
-func (dt *DeleteMsg) SetContext(ctx context.Context) {
-	dt.ctx = ctx
-}
-
 func (dt *DeleteMsg) Marshal(input TsMsg) ([]byte, error) {
	deleteTask := input.(*DeleteMsg)
	deleteRequest := &deleteTask.DeleteRequest
@@ -193,14 +165,6 @@ func (st *SearchMsg) Type() MsgType {
	return st.MsgType
 }
 
-func (st *SearchMsg) GetContext() context.Context {
-	return st.ctx
-}
-
-func (st *SearchMsg) SetContext(ctx context.Context) {
-	st.ctx = ctx
-}
-
 func (st *SearchMsg) Marshal(input TsMsg) ([]byte, error) {
	searchTask := input.(*SearchMsg)
	searchRequest := &searchTask.SearchRequest
@@ -234,14 +198,6 @@ func (srt *SearchResultMsg) Type() MsgType {
	return srt.MsgType
 }
 
-func (srt *SearchResultMsg) GetContext() context.Context {
-	return srt.ctx
-}
-
-func (srt *SearchResultMsg) SetContext(ctx context.Context) {
-	srt.ctx = ctx
-}
-
 func (srt *SearchResultMsg) Marshal(input TsMsg) ([]byte, error) {
	searchResultTask := input.(*SearchResultMsg)
	searchResultRequest := &searchResultTask.SearchResult
@@ -275,14 +231,6 @@ func (tst *TimeTickMsg) Type() MsgType {
	return tst.MsgType
 }
 
-func (tst *TimeTickMsg) GetContext() context.Context {
-	return tst.ctx
-}
-
-func (tst *TimeTickMsg) SetContext(ctx context.Context) {
-	tst.ctx = ctx
-}
-
 func (tst *TimeTickMsg) Marshal(input TsMsg) ([]byte, error) {
	timeTickTask := input.(*TimeTickMsg)
	timeTick := &timeTickTask.TimeTickMsg
@@ -316,14 +264,6 @@ func (qs *QueryNodeStatsMsg) Type() MsgType {
	return qs.MsgType
 }
 
-func (qs *QueryNodeStatsMsg) GetContext() context.Context {
-	return qs.ctx
-}
-
-func (qs *QueryNodeStatsMsg) SetContext(ctx context.Context) {
-	qs.ctx = ctx
-}
-
 func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) ([]byte, error) {
	queryNodeSegStatsTask := input.(*QueryNodeStatsMsg)
	queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeStats
@@ -365,14 +305,6 @@ func (cc *CreateCollectionMsg) Type() MsgType {
	return cc.MsgType
 }
 
-func (cc *CreateCollectionMsg) GetContext() context.Context {
-	return cc.ctx
-}
-
-func (cc *CreateCollectionMsg) SetContext(ctx context.Context) {
-	cc.ctx = ctx
-}
-
 func (cc *CreateCollectionMsg) Marshal(input TsMsg) ([]byte, error) {
	createCollectionMsg := input.(*CreateCollectionMsg)
	createCollectionRequest := &createCollectionMsg.CreateCollectionRequest
@@ -405,13 +337,6 @@ type DropCollectionMsg struct {
 func (dc *DropCollectionMsg) Type() MsgType {
	return dc.MsgType
 }
-func (dc *DropCollectionMsg) GetContext() context.Context {
-	return dc.ctx
-}
-
-func (dc *DropCollectionMsg) SetContext(ctx context.Context) {
-	dc.ctx = ctx
-}
 
 func (dc *DropCollectionMsg) Marshal(input TsMsg) ([]byte, error) {
	dropCollectionMsg := input.(*DropCollectionMsg)
@@ -436,18 +361,109 @@ func (dc *DropCollectionMsg) Unmarshal(input []byte) (TsMsg, error) {
	return dropCollectionMsg, nil
 }
 
-/////////////////////////////////////////CreatePartition//////////////////////////////////////////
-type CreatePartitionMsg struct {
+/////////////////////////////////////////HasCollection//////////////////////////////////////////
+type HasCollectionMsg struct {
	BaseMsg
-	internalPb.CreatePartitionRequest
+	internalPb.HasCollectionRequest
 }
 
-func (cc *CreatePartitionMsg) GetContext() context.Context {
-	return cc.ctx
+func (hc *HasCollectionMsg) Type() MsgType {
+	return hc.MsgType
 }
 
-func (cc *CreatePartitionMsg) SetContext(ctx context.Context) {
-	cc.ctx = ctx
+func (hc *HasCollectionMsg) Marshal(input TsMsg) ([]byte, error) {
+	hasCollectionMsg := input.(*HasCollectionMsg)
+	hasCollectionRequest := &hasCollectionMsg.HasCollectionRequest
+	mb, err := proto.Marshal(hasCollectionRequest)
+	if err != nil {
+		return nil, err
+	}
+	return mb, nil
+}
+
+func (hc *HasCollectionMsg) Unmarshal(input []byte) (TsMsg, error) {
+	hasCollectionRequest := internalPb.HasCollectionRequest{}
+	err := proto.Unmarshal(input, &hasCollectionRequest)
+	if err != nil {
+		return nil, err
+	}
+	hasCollectionMsg := &HasCollectionMsg{HasCollectionRequest: hasCollectionRequest}
+	hasCollectionMsg.BeginTimestamp = hasCollectionMsg.Timestamp
+	hasCollectionMsg.EndTimestamp = hasCollectionMsg.Timestamp
+
+	return hasCollectionMsg, nil
+}
+
+/////////////////////////////////////////DescribeCollection//////////////////////////////////////////
+type DescribeCollectionMsg struct {
+	BaseMsg
+	internalPb.DescribeCollectionRequest
+}
+
+func (dc *DescribeCollectionMsg) Type() MsgType {
+	return dc.MsgType
+}
+
+func (dc *DescribeCollectionMsg) Marshal(input TsMsg) ([]byte, error) {
+	describeCollectionMsg := input.(*DescribeCollectionMsg)
+	describeCollectionRequest := &describeCollectionMsg.DescribeCollectionRequest
+	mb, err := proto.Marshal(describeCollectionRequest)
+	if err != nil {
+		return nil, err
+	}
+	return mb, nil
+}
+
+func (dc *DescribeCollectionMsg) Unmarshal(input []byte) (TsMsg, error) {
+	describeCollectionRequest := internalPb.DescribeCollectionRequest{}
+	err := proto.Unmarshal(input, &describeCollectionRequest)
+	if err != nil {
+		return nil, err
+	}
+	describeCollectionMsg := &DescribeCollectionMsg{DescribeCollectionRequest: describeCollectionRequest}
+	describeCollectionMsg.BeginTimestamp = describeCollectionMsg.Timestamp
+	describeCollectionMsg.EndTimestamp = describeCollectionMsg.Timestamp
+
+	return describeCollectionMsg, nil
+}
+
+/////////////////////////////////////////ShowCollection//////////////////////////////////////////
+type ShowCollectionMsg struct {
+	BaseMsg
+	internalPb.ShowCollectionRequest
+}
+
+func (sc *ShowCollectionMsg) Type() MsgType {
+	return sc.MsgType
+}
+
+func (sc *ShowCollectionMsg) Marshal(input TsMsg) ([]byte, error) {
+	showCollectionMsg := input.(*ShowCollectionMsg)
+	showCollectionRequest := &showCollectionMsg.ShowCollectionRequest
+	mb, err := proto.Marshal(showCollectionRequest)
+	if err != nil {
+		return nil, err
+	}
+	return mb, nil
+}
+
+func (sc *ShowCollectionMsg) Unmarshal(input []byte) (TsMsg, error) {
+	showCollectionRequest := internalPb.ShowCollectionRequest{}
+	err := proto.Unmarshal(input, &showCollectionRequest)
+	if err != nil {
+		return nil, err
+	}
+	showCollectionMsg := &ShowCollectionMsg{ShowCollectionRequest: showCollectionRequest}
+	showCollectionMsg.BeginTimestamp = showCollectionMsg.Timestamp
+	showCollectionMsg.EndTimestamp = showCollectionMsg.Timestamp
+
+	return showCollectionMsg, nil
+}
+
+/////////////////////////////////////////CreatePartition//////////////////////////////////////////
+type CreatePartitionMsg struct {
+	BaseMsg
+	internalPb.CreatePartitionRequest
 }
 
 func (cc *CreatePartitionMsg) Type() MsgType {
@@ -483,14 +499,6 @@ type DropPartitionMsg struct {
	internalPb.DropPartitionRequest
 }
 
-func (dc *DropPartitionMsg) GetContext() context.Context {
-	return dc.ctx
-}
-
-func (dc *DropPartitionMsg) SetContext(ctx context.Context) {
-	dc.ctx = ctx
-}
-
 func (dc *DropPartitionMsg) Type() MsgType {
	return dc.MsgType
 }
@@ -518,6 +526,105 @@ func (dc *DropPartitionMsg) Unmarshal(input []byte) (TsMsg, error) {
	return dropPartitionMsg, nil
 }
 
+/////////////////////////////////////////HasPartition//////////////////////////////////////////
+type HasPartitionMsg struct {
+	BaseMsg
+	internalPb.HasPartitionRequest
+}
+
+func (hc *HasPartitionMsg) Type() MsgType {
+	return hc.MsgType
+}
+
+func (hc *HasPartitionMsg) Marshal(input TsMsg) ([]byte, error) {
+	hasPartitionMsg := input.(*HasPartitionMsg)
+	hasPartitionRequest := &hasPartitionMsg.HasPartitionRequest
+	mb, err := proto.Marshal(hasPartitionRequest)
+	if err != nil {
+		return nil, err
+	}
+	return mb, nil
+}
+
+func (hc *HasPartitionMsg) Unmarshal(input []byte) (TsMsg, error) {
+	hasPartitionRequest := internalPb.HasPartitionRequest{}
+	err := proto.Unmarshal(input, &hasPartitionRequest)
+	if err != nil {
+		return nil, err
+	}
+	hasPartitionMsg := &HasPartitionMsg{HasPartitionRequest: hasPartitionRequest}
+	hasPartitionMsg.BeginTimestamp = hasPartitionMsg.Timestamp
+	hasPartitionMsg.EndTimestamp = hasPartitionMsg.Timestamp
+
+	return hasPartitionMsg, nil
+}
+
+/////////////////////////////////////////DescribePartition//////////////////////////////////////////
+type DescribePartitionMsg struct {
+	BaseMsg
+	internalPb.DescribePartitionRequest
+}
+
+func (dc *DescribePartitionMsg) Type() MsgType {
+	return dc.MsgType
+}
+
+func (dc *DescribePartitionMsg) Marshal(input TsMsg) ([]byte, error) {
+	describePartitionMsg := input.(*DescribePartitionMsg)
+	describePartitionRequest := &describePartitionMsg.DescribePartitionRequest
+	mb, err := proto.Marshal(describePartitionRequest)
+	if err != nil {
+		return nil, err
+	}
+	return mb, nil
+}
+
+func (dc *DescribePartitionMsg) Unmarshal(input []byte) (TsMsg, error) {
+	describePartitionRequest := internalPb.DescribePartitionRequest{}
+	err := proto.Unmarshal(input, &describePartitionRequest)
+	if err != nil {
+		return nil, err
+	}
+	describePartitionMsg := &DescribePartitionMsg{DescribePartitionRequest: describePartitionRequest}
+	describePartitionMsg.BeginTimestamp = describePartitionMsg.Timestamp
+	describePartitionMsg.EndTimestamp = describePartitionMsg.Timestamp
+
+	return describePartitionMsg, nil
+}
+
+/////////////////////////////////////////ShowPartition//////////////////////////////////////////
+type ShowPartitionMsg struct {
+	BaseMsg
+	internalPb.ShowPartitionRequest
+}
+
+func (sc *ShowPartitionMsg) Type() MsgType {
+	return sc.MsgType
+}
+
+func (sc *ShowPartitionMsg) Marshal(input TsMsg) ([]byte, error) {
+	showPartitionMsg := input.(*ShowPartitionMsg)
+	showPartitionRequest := &showPartitionMsg.ShowPartitionRequest
+	mb, err := proto.Marshal(showPartitionRequest)
+	if err != nil {
+		return nil, err
+	}
+	return mb, nil
+}
+
+func (sc *ShowPartitionMsg) Unmarshal(input []byte) (TsMsg, error) {
+	showPartitionRequest := internalPb.ShowPartitionRequest{}
+	err := proto.Unmarshal(input, &showPartitionRequest)
+	if err != nil {
+		return nil, err
+	}
+	showPartitionMsg := &ShowPartitionMsg{ShowPartitionRequest: showPartitionRequest}
+	showPartitionMsg.BeginTimestamp = showPartitionMsg.Timestamp
+	showPartitionMsg.EndTimestamp = showPartitionMsg.Timestamp
+
+	return showPartitionMsg, nil
+}
+
 /////////////////////////////////////////LoadIndex//////////////////////////////////////////
 type LoadIndexMsg struct {
	BaseMsg
@@ -528,14 +635,6 @@ func (lim *LoadIndexMsg) Type() MsgType {
	return lim.MsgType
 }
 
-func (lim *LoadIndexMsg) GetContext() context.Context {
-	return lim.ctx
-}
-
-func (lim *LoadIndexMsg) SetContext(ctx context.Context) {
-	lim.ctx = ctx
-}
-
 func (lim *LoadIndexMsg) Marshal(input TsMsg) ([]byte, error) {
	loadIndexMsg := input.(*LoadIndexMsg)
	loadIndexRequest := &loadIndexMsg.LoadIndex
diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go
index 969755feb322a3ac33619500a061dd9a0f984c00..37dd71c053441673334cbda1c60adac9fdc5fc5f 100644
--- a/internal/msgstream/msgstream.go
+++ b/internal/msgstream/msgstream.go
@@ -4,13 +4,9 @@
 import (
	"context"
	"log"
	"reflect"
-	"strings"
	"sync"
	"time"
 
-	"github.com/opentracing/opentracing-go"
-	"github.com/opentracing/opentracing-go/ext"
-
	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/golang/protobuf/proto"
@@ -155,29 +151,6 @@ func (ms *PulsarMsgStream) Close() {
	}
 }
 
-type propertiesReaderWriter struct {
-	ppMap map[string]string
-}
-
-func (ppRW *propertiesReaderWriter) Set(key, val string) {
-	// The GRPC HPACK implementation rejects any uppercase keys here.
-	//
-	// As such, since the HTTP_HEADERS format is case-insensitive anyway, we
-	// blindly lowercase the key (which is guaranteed to work in the
-	// Inject/Extract sense per the OpenTracing spec).
-	key = strings.ToLower(key)
-	ppRW.ppMap[key] = val
-}
-
-func (ppRW *propertiesReaderWriter) ForeachKey(handler func(key, val string) error) error {
-	for k, val := range ppRW.ppMap {
-		if err := handler(k, val); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
 func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
	tsMsgs := msgPack.Msgs
	if len(tsMsgs) <= 0 {
@@ -227,41 +200,12 @@
			if err != nil {
				return err
			}
-
-			msg := &pulsar.ProducerMessage{Payload: mb}
-			var child opentracing.Span
-			if v.Msgs[i].Type() == internalPb.MsgType_kInsert || v.Msgs[i].Type() == internalPb.MsgType_kSearch {
-				tracer := opentracing.GlobalTracer()
-				ctx := v.Msgs[i].GetContext()
-				if ctx == nil {
-					ctx = context.Background()
-				}
-
-				if parent := opentracing.SpanFromContext(ctx); parent != nil {
-					child = tracer.StartSpan("start send pulsar msg",
-						opentracing.FollowsFrom(parent.Context()))
-				} else {
-					child = tracer.StartSpan("start send pulsar msg")
-				}
-				child.SetTag("hash keys", v.Msgs[i].HashKeys())
-				child.SetTag("start time", v.Msgs[i].BeginTs())
-				child.SetTag("end time", v.Msgs[i].EndTs())
-				msg.Properties = make(map[string]string)
-				err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
-				if err != nil {
-					return err
-				}
-			}
-
			if _, err := (*ms.producers[k]).Send(
				context.Background(),
-				msg,
+				&pulsar.ProducerMessage{Payload: mb},
			); err != nil {
				return err
			}
-			if child != nil {
-				child.Finish()
-			}
		}
	}
	return nil
@@ -274,34 +218,10 @@
		if err != nil {
			return err
		}
-		msg := &pulsar.ProducerMessage{Payload: mb}
-		if v.Type() == internalPb.MsgType_kInsert || v.Type() == internalPb.MsgType_kSearch {
-			tracer := opentracing.GlobalTracer()
-			ctx := v.GetContext()
-			if ctx == nil {
-				ctx = context.Background()
-			}
-			var child opentracing.Span
-			if parent := opentracing.SpanFromContext(ctx); parent != nil {
-				child = tracer.StartSpan("start send pulsar msg",
-					opentracing.FollowsFrom(parent.Context()))
-			} else {
-				child = tracer.StartSpan("start send pulsar msg, start time: %d")
-			}
-			child.SetTag("hash keys", v.HashKeys())
-			child.SetTag("start time", v.BeginTs())
-			child.SetTag("end time", v.EndTs())
-			msg.Properties = make(map[string]string)
-			err = tracer.Inject(child.Context(), opentracing.TextMap, &propertiesReaderWriter{msg.Properties})
-			if err != nil {
-				return err
-			}
-			child.Finish()
-		}
		for i := 0; i < producerLen; i++ {
			if _, err := (*ms.producers[i]).Send(
				context.Background(),
-				msg,
+				&pulsar.ProducerMessage{Payload: mb},
			); err != nil {
				return err
			}
@@ -338,7 +258,6 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
	for {
		select {
		case <-ms.ctx.Done():
-			log.Println("done")
			return
		default:
			tsMsgList := make([]TsMsg, 0)
@@ -351,7 +270,6 @@
			}
 
			pulsarMsg, ok := value.Interface().(pulsar.ConsumerMessage)
-
			if !ok {
				log.Printf("type assertion failed, not consumer message type")
				continue
@@ -365,21 +283,6 @@
				continue
			}
			tsMsg, err := ms.unmarshal.Unmarshal(pulsarMsg.Payload(), headerMsg.MsgType)
-			if tsMsg.Type() == internalPb.MsgType_kInsert || tsMsg.Type() == internalPb.MsgType_kSearch {
-				tracer := opentracing.GlobalTracer()
-				spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
-				if err != nil {
-					log.Println("extract message err")
-					log.Println(err.Error())
-				}
-				span := opentracing.StartSpan("pulsar msg received",
-					ext.RPCServerOption(spanContext))
-				span.SetTag("hash keys", tsMsg.HashKeys())
-				span.SetTag("start time", tsMsg.BeginTs())
-				span.SetTag("end time", tsMsg.EndTs())
-				tsMsg.SetContext(opentracing.ContextWithSpan(context.Background(), span))
-				span.Finish()
-			}
			if err != nil {
				log.Printf("Failed to unmarshal tsMsg, error = %v", err)
				continue
@@ -517,23 +420,6 @@ func (ms *PulsarTtMsgStream) findTimeTick(channelIndex int,
			if err != nil {
				log.Printf("Failed to unmarshal, error = %v", err)
			}
-
-			if tsMsg.Type() == internalPb.MsgType_kInsert || tsMsg.Type() == internalPb.MsgType_kSearch {
-				tracer := opentracing.GlobalTracer()
-				spanContext, err := tracer.Extract(opentracing.HTTPHeaders, &propertiesReaderWriter{pulsarMsg.Properties()})
-				if err != nil {
-					log.Println("extract message err")
-					log.Println(err.Error())
-				}
-				span := opentracing.StartSpan("pulsar msg received",
-					ext.RPCServerOption(spanContext))
-				span.SetTag("hash keys", tsMsg.HashKeys())
-				span.SetTag("start time", tsMsg.BeginTs())
-				span.SetTag("end time", tsMsg.EndTs())
-				tsMsg.SetContext(opentracing.ContextWithSpan(context.Background(), span))
-				span.Finish()
-			}
-
			if headerMsg.MsgType == internalPb.MsgType_kTimeTick {
				eofMsgMap[channelIndex] = tsMsg.(*TimeTickMsg).Timestamp
				return
@@ -614,7 +500,7 @@ func insertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, e
	result := make(map[int32]*MsgPack)
	for i, request := range tsMsgs {
		if request.Type() != internalPb.MsgType_kInsert {
-			return nil, errors.New("msg's must be Insert")
+			return nil, errors.New(string("msg's must be Insert"))
		}
		insertRequest := request.(*InsertMsg)
		keys := hashKeys[i]
@@ -625,7 +511,7 @@
		keysLen := len(keys)
 
		if keysLen != timestampLen || keysLen != rowIDLen || keysLen != rowDataLen {
-			return nil, errors.New("the length of hashValue, timestamps, rowIDs, RowData are not equal")
+			return nil, errors.New(string("the length of hashValue, timestamps, rowIDs, RowData are not equal"))
		}
		for index, key := range keys {
			_, ok := result[key]
@@ -648,9 +534,6 @@
			}
 
			insertMsg := &InsertMsg{
-				BaseMsg: BaseMsg{
-					ctx: request.GetContext(),
-				},
				InsertRequest: sliceRequest,
			}
			result[key].Msgs = append(result[key].Msgs, insertMsg)
@@ -663,7 +546,7 @@
	result := make(map[int32]*MsgPack)
	for i, request := range tsMsgs {
		if request.Type() != internalPb.MsgType_kDelete {
-			return nil, errors.New("msg's must be Delete")
+			return nil, errors.New(string("msg's must be Delete"))
		}
		deleteRequest := request.(*DeleteMsg)
		keys := hashKeys[i]
@@ -673,7 +556,7 @@
		keysLen := len(keys)
 
		if keysLen != timestampLen || keysLen != primaryKeysLen {
-			return nil, errors.New("the length of hashValue, timestamps, primaryKeys are not equal")
+			return nil, errors.New(string("the length of hashValue, timestamps, primaryKeys are not equal"))
		}
 
		for index, key := range keys {
@@ -707,7 +590,7 @@
	for i, request := range tsMsgs {
		keys := hashKeys[i]
		if len(keys) != 1 {
-			return nil, errors.New("len(msg.hashValue) must equal 1")
+			return nil, errors.New(string("len(msg.hashValue) must equal 1"))
		}
		key := keys[0]
		_, ok := result[key]
diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go
index 6a312c2bc04169e53915820fd400c177c7f48811..f4232bc82234f07d54d713772e89d70f77b5586f 100644
--- a/internal/proxy/proxy.go
+++ b/internal/proxy/proxy.go
@@ -2,8 +2,6 @@ package proxy
 
 import (
	"context"
-	"fmt"
-	"io"
	"log"
	"math/rand"
	"net"
@@ -11,10 +9,6 @@ import (
	"sync"
	"time"
 
-	"github.com/opentracing/opentracing-go"
-	"github.com/uber/jaeger-client-go"
-	"github.com/uber/jaeger-client-go/config"
-
	"google.golang.org/grpc"
 
	"github.com/zilliztech/milvus-distributed/internal/allocator"
@@ -45,9 +39,6 @@ type Proxy struct {
	manipulationMsgStream *msgstream.PulsarMsgStream
	queryMsgStream        *msgstream.PulsarMsgStream
 
-	tracer opentracing.Tracer
-	closer io.Closer
-
	// Add callback functions at different stages
	startCallbacks []func()
	closeCallbacks []func()
@@ -60,28 +51,11 @@ func Init() {
 
 func CreateProxy(ctx context.Context) (*Proxy, error) {
	rand.Seed(time.Now().UnixNano())
	ctx1, cancel := context.WithCancel(ctx)
-	var err error
	p := &Proxy{
		proxyLoopCtx:    ctx1,
		proxyLoopCancel: cancel,
	}
 
-	cfg := &config.Configuration{
-		ServiceName: "tracing",
-		Sampler: &config.SamplerConfig{
-			Type:  "const",
-			Param: 1,
-		},
-		Reporter: &config.ReporterConfig{
-			LogSpans: true,
-		},
-	}
-	p.tracer, p.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger))
-	if err != nil {
-		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
-	}
-	opentracing.SetGlobalTracer(p.tracer)
-
	pulsarAddress := Params.PulsarAddress()
	p.queryMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, Params.MsgStreamSearchBufSize())
@@ -224,8 +198,6 @@ func (p *Proxy) stopProxyLoop() {
	p.tick.Close()
 
	p.proxyLoopWg.Wait()
-
-	p.closer.Close()
 }
 
 // Close closes the server.
diff --git a/internal/proxy/repack_func.go b/internal/proxy/repack_func.go
index f8873fe12f27bcae72473bd9b5eb252a9eac1aee..44139999e0403719ca9eaf141f110980b808b6e1 100644
--- a/internal/proxy/repack_func.go
+++ b/internal/proxy/repack_func.go
@@ -182,7 +182,6 @@ func insertRepackFunc(tsMsgs []msgstream.TsMsg,
		insertMsg := &msgstream.InsertMsg{
			InsertRequest: sliceRequest,
		}
-		insertMsg.SetContext(request.GetContext())
		if together { // all rows with same hash value are accumulated to only one message
			if len(result[key].Msgs) <= 0 {
				result[key].Msgs = append(result[key].Msgs, insertMsg)
diff --git a/internal/proxy/task.go b/internal/proxy/task.go
index d01c45f0632545feca63a477eec4df35de3cf85c..425cae75cfb3e4de24b55cf4a24cf3cc5aa55dbe 100644
--- a/internal/proxy/task.go
+++ b/internal/proxy/task.go
@@ -7,9 +7,6 @@ import (
	"math"
	"strconv"
 
-	"github.com/opentracing/opentracing-go"
-	oplog "github.com/opentracing/opentracing-go/log"
-
	"github.com/golang/protobuf/proto"
	"github.com/zilliztech/milvus-distributed/internal/allocator"
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
@@ -77,21 +74,12 @@ func (it *InsertTask) Type() internalpb.MsgType {
 }
 
 func (it *InsertTask) PreExecute() error {
-	span := opentracing.StartSpan("InsertTask preExecute")
-	defer span.Finish()
-	it.ctx = opentracing.ContextWithSpan(it.ctx, span)
-	span.SetTag("hash keys", it.ReqID)
-	span.SetTag("start time", it.BeginTs())
	collectionName := it.BaseInsertTask.CollectionName
	if err := ValidateCollectionName(collectionName); err != nil {
-		span.LogFields(oplog.Error(err))
-		span.Finish()
		return err
	}
	partitionTag := it.BaseInsertTask.PartitionTag
	if err := ValidatePartitionTag(partitionTag, true); err != nil {
-		span.LogFields(oplog.Error(err))
-		span.Finish()
		return err
	}
 
@@ -99,36 +87,22 @@ func (it *InsertTask) PreExecute() error {
 }
 
 func (it *InsertTask) Execute() error {
-	span, ctx := opentracing.StartSpanFromContext(it.ctx, "InsertTask Execute")
-	defer span.Finish()
-	it.ctx = ctx
-	span.SetTag("hash keys", it.ReqID)
-	span.SetTag("start time", it.BeginTs())
	collectionName := it.BaseInsertTask.CollectionName
-	span.LogFields(oplog.String("collection_name", collectionName))
	if !globalMetaCache.Hit(collectionName) {
		err := globalMetaCache.Sync(collectionName)
		if err != nil {
-			span.LogFields(oplog.Error(err))
-			span.Finish()
			return err
		}
	}
	description, err := globalMetaCache.Get(collectionName)
	if err != nil || description == nil {
-		span.LogFields(oplog.Error(err))
-		span.Finish()
		return err
	}
	autoID := description.Schema.AutoID
-	span.LogFields(oplog.Bool("auto_id", autoID))
	var rowIDBegin UniqueID
	var rowIDEnd UniqueID
	rowNums := len(it.BaseInsertTask.RowData)
	rowIDBegin, rowIDEnd, _ = it.rowIDAllocator.Alloc(uint32(rowNums))
-	span.LogFields(oplog.Int("rowNums", rowNums),
-		oplog.Int("rowIDBegin", int(rowIDBegin)),
-		oplog.Int("rowIDEnd", int(rowIDEnd)))
	it.BaseInsertTask.RowIDs = make([]UniqueID, rowNums)
	for i := rowIDBegin; i < rowIDEnd; i++ {
		offset := i - rowIDBegin
@@ -151,8 +125,6 @@
		EndTs:   it.EndTs(),
		Msgs:    make([]msgstream.TsMsg, 1),
	}
-	tsMsg.SetContext(it.ctx)
-	span.LogFields(oplog.String("send msg", "send msg"))
	msgPack.Msgs[0] = tsMsg
 
	err = it.manipulationMsgStream.Produce(msgPack)
@@ -166,14 +138,11 @@
	if err != nil {
		it.result.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
		it.result.Status.Reason = err.Error()
-		span.LogFields(oplog.Error(err))
	}
	return nil
 }
 
 func (it *InsertTask) PostExecute() error {
-	span, _ := opentracing.StartSpanFromContext(it.ctx, "InsertTask postExecute")
-	defer span.Finish()
	return nil
 }
 
@@ -383,38 +352,24 @@ func (qt *QueryTask) SetTs(ts Timestamp) {
 }
 
 func (qt *QueryTask) PreExecute() error {
-	span := opentracing.StartSpan("InsertTask preExecute")
-	defer span.Finish()
-	qt.ctx = opentracing.ContextWithSpan(qt.ctx, span)
-	span.SetTag("hash keys", qt.ReqID)
-	span.SetTag("start time", qt.BeginTs())
-
	collectionName := qt.query.CollectionName
	if !globalMetaCache.Hit(collectionName) {
		err := globalMetaCache.Sync(collectionName)
		if err != nil {
-			span.LogFields(oplog.Error(err))
-			span.Finish()
			return err
		}
	}
	_, err := globalMetaCache.Get(collectionName)
	if err != nil { // err is not nil if collection not exists
-		span.LogFields(oplog.Error(err))
-		span.Finish()
		return err
	}
 
	if err := ValidateCollectionName(qt.query.CollectionName); err != nil {
-		span.LogFields(oplog.Error(err))
-		span.Finish()
		return err
	}
 
	for _, tag := range qt.query.PartitionTags {
		if err := ValidatePartitionTag(tag, false); err != nil {
-			span.LogFields(oplog.Error(err))
-			span.Finish()
			return err
		}
	}
@@ -424,8 +379,6 @@
	}
	queryBytes, err := proto.Marshal(qt.query)
	if err != nil {
-		span.LogFields(oplog.Error(err))
-		span.Finish()
		return err
	}
	qt.Query = &commonpb.Blob{
@@ -435,10 +388,6 @@
 }
 
 func (qt *QueryTask) Execute() error {
-	span, ctx := opentracing.StartSpanFromContext(qt.ctx, "InsertTask Execute")
-	defer span.Finish()
-	span.SetTag("hash keys", qt.ReqID)
-	span.SetTag("start time", qt.BeginTs())
	var tsMsg msgstream.TsMsg = &msgstream.SearchMsg{
		SearchRequest: qt.SearchRequest,
		BaseMsg: msgstream.BaseMsg{
@@ -452,28 +401,20 @@
		EndTs: qt.Timestamp,
		Msgs:  make([]msgstream.TsMsg, 1),
	}
-	tsMsg.SetContext(ctx)
	msgPack.Msgs[0] = tsMsg
	err := qt.queryMsgStream.Produce(msgPack)
	log.Printf("[Proxy] length of searchMsg: %v", len(msgPack.Msgs))
	if err != nil {
-		span.LogFields(oplog.Error(err))
-		span.Finish()
		log.Printf("[Proxy] send search request failed: %v", err)
	}
	return err
 }
 
 func (qt *QueryTask) PostExecute() error {
-	span, _ := opentracing.StartSpanFromContext(qt.ctx, "InsertTask postExecute")
-	span.SetTag("hash keys", qt.ReqID)
-	span.SetTag("start time", qt.BeginTs())
	for {
		select {
		case <-qt.ctx.Done():
			log.Print("wait to finish failed, timeout!")
-			span.LogFields(oplog.String("wait to finish failed, timeout", "wait to finish failed, timeout"))
-			span.Finish()
			return errors.New("wait to finish failed, timeout")
		case searchResults := <-qt.resultBuf:
			filterSearchResult := make([]*internalpb.SearchResult, 0)
@@ -494,8 +435,6 @@
						Reason:    filterReason,
					},
				}
-				span.LogFields(oplog.Error(errors.New(filterReason)))
-				span.Finish()
				return errors.New(filterReason)
			}
 
@@ -526,7 +465,6 @@
						Reason:    filterReason,
					},
				}
-				span.Finish()
				return nil
			}
 
@@ -538,7 +476,6 @@
						Reason:    filterReason,
					},
				}
-				span.Finish()
				return nil
			}
 
@@ -589,13 +526,10 @@
				reducedHitsBs, err := proto.Marshal(reducedHits)
				if err != nil {
					log.Println("marshal error")
-					span.LogFields(oplog.Error(err))
-					span.Finish()
					return err
				}
				qt.result.Hits = append(qt.result.Hits, reducedHitsBs)
			}
-			span.Finish()
			return nil
		}
	}
@@ -703,10 +637,7 @@ func (dct *DescribeCollectionTask) PreExecute() error {
 
 func (dct *DescribeCollectionTask) Execute() error {
	var err error
	dct.result, err = dct.masterClient.DescribeCollection(dct.ctx, &dct.DescribeCollectionRequest)
-	if err != nil {
-		return err
-	}
-	err = globalMetaCache.Update(dct.CollectionName.CollectionName, dct.result)
+	globalMetaCache.Update(dct.CollectionName.CollectionName, dct.result)
	return err
 }
 
diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go
index 44a595caf5679969e4455b592e70cdb1f9f34012..5ed2caa0675b53c4aa9a895a6983cd3f98dc0057 100644
--- a/internal/querynode/data_sync_service.go
+++ b/internal/querynode/data_sync_service.go
@@ -48,6 +48,7 @@ func (dsService *dataSyncService) initNodes() {
 
	var insertNode Node = newInsertNode(dsService.replica)
	var serviceTimeNode Node = newServiceTimeNode(dsService.replica)
+	var gcNode Node = newGCNode(dsService.replica)
 
	dsService.fg.AddNode(&dmStreamNode)
	dsService.fg.AddNode(&ddStreamNode)
@@ -57,6 +58,7 @@
 
	dsService.fg.AddNode(&insertNode)
	dsService.fg.AddNode(&serviceTimeNode)
+	dsService.fg.AddNode(&gcNode)
 
	// dmStreamNode
	var err = dsService.fg.SetEdges(dmStreamNode.Name(),
@@ -106,9 +108,17 @@
	// serviceTimeNode
	err = dsService.fg.SetEdges(serviceTimeNode.Name(),
		[]string{insertNode.Name()},
-		[]string{},
+		[]string{gcNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", serviceTimeNode.Name())
	}
+
+	// gcNode
+	err = dsService.fg.SetEdges(gcNode.Name(),
+		[]string{serviceTimeNode.Name()},
+		[]string{})
+	if err != nil {
+		log.Fatal("set edges failed in node:", gcNode.Name())
+	}
 }
diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go
index f4a5e1136946450de3a25f206f64a2b6ef7e0ec1..a7a2ac73201ff48439090ad0ef5f58d9c1b38e1d 100644
--- a/internal/querynode/flow_graph_dd_node.go
+++ b/internal/querynode/flow_graph_dd_node.go
@@ -44,6 +44,11 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
		},
	}
	ddNode.ddMsg = &ddMsg
+	gcRecord := gcRecord{
+		collections: make([]UniqueID, 0),
+		partitions:  make([]partitionWithID, 0),
+	}
+	ddNode.ddMsg.gcRecord = &gcRecord
 
	// sort tsMessages
	tsMessages := msMsg.TsMessages()
@@ -115,11 +120,11 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) {
 func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) {
	collectionID := msg.CollectionID
 
-	err := ddNode.replica.removeCollection(collectionID)
-	if err != nil {
-		log.Println(err)
-		return
-	}
+	//err := ddNode.replica.removeCollection(collectionID)
+	//if err != nil {
+	//	log.Println(err)
+	//	return
+	//}
 
	collectionName := msg.CollectionName.CollectionName
	ddNode.ddMsg.collectionRecords[collectionName] = append(ddNode.ddMsg.collectionRecords[collectionName],
@@ -127,6 +132,8 @@
			createOrDrop: false,
			timestamp:    msg.Timestamp,
		})
+
+	ddNode.ddMsg.gcRecord.collections = append(ddNode.ddMsg.gcRecord.collections, collectionID)
 }
 
 func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) {
@@ -150,17 +157,22 @@
	collectionID := msg.CollectionID
	partitionTag := msg.PartitionName.Tag
 
-	err := ddNode.replica.removePartition(collectionID, partitionTag)
-	if err != nil {
-		log.Println(err)
-		return
-	}
+	//err := ddNode.replica.removePartition(collectionID, partitionTag)
+	//if err != nil {
+	//	log.Println(err)
+	//	return
+	//}
 
	ddNode.ddMsg.partitionRecords[partitionTag] = append(ddNode.ddMsg.partitionRecords[partitionTag],
		metaOperateRecord{
			createOrDrop: false,
			timestamp:    msg.Timestamp,
		})
+
+	ddNode.ddMsg.gcRecord.partitions = append(ddNode.ddMsg.gcRecord.partitions, partitionWithID{
+		partitionTag: partitionTag,
+		collectionID: collectionID,
+	})
 }
 
 func newDDNode(replica collectionReplica) *ddNode {
diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go
index ceddaeab0b95a1a1b1880def7689ee3278a20e3a..fbc8eedb5c82b00e868561b6e5971a9af3f78468 100644
--- a/internal/querynode/flow_graph_filter_dm_node.go
+++ b/internal/querynode/flow_graph_filter_dm_node.go
@@ -2,6 +2,7 @@ package querynode
 
 import (
	"log"
+	"math"
 
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@@ -59,6 +60,7 @@
		}
	}
 
+	iMsg.gcRecord = ddMsg.gcRecord
	var res Msg = &iMsg
	return []*Msg{&res}
 }
@@ -81,17 +83,35 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
		log.Println("Error, misaligned messages detected")
		return nil
	}
+
	tmpTimestamps := make([]Timestamp, 0)
	tmpRowIDs := make([]int64, 0)
	tmpRowData := make([]*commonpb.Blob, 0)
-	targetTimestamp := records[len(records)-1].timestamp
+
+	// calculate valid time range
+	timeBegin := Timestamp(0)
+	timeEnd := Timestamp(math.MaxUint64)
+	for _, record := range records {
+		if record.createOrDrop && timeBegin < record.timestamp {
+			timeBegin = record.timestamp
+		}
+		if !record.createOrDrop && timeEnd > record.timestamp {
+			timeEnd = record.timestamp
+		}
+	}
+
	for i, t := range msg.Timestamps {
-		if t >= 
timeBegin && t <= timeEnd { tmpTimestamps = append(tmpTimestamps, t) tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i]) tmpRowData = append(tmpRowData, msg.RowData[i]) } } + + if len(tmpRowIDs) <= 0 { + return nil + } + msg.Timestamps = tmpTimestamps msg.RowIDs = tmpRowIDs msg.RowData = tmpRowData diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go new file mode 100644 index 0000000000000000000000000000000000000000..cd0a9b984e7cf991436f3a3195935133e45c4c9a --- /dev/null +++ b/internal/querynode/flow_graph_gc_node.go @@ -0,0 +1,61 @@ +package querynode + +import ( + "log" +) + +type gcNode struct { + BaseNode + replica collectionReplica +} + +func (gcNode *gcNode) Name() string { + return "gcNode" +} + +func (gcNode *gcNode) Operate(in []*Msg) []*Msg { + //fmt.Println("Do gcNode operation") + + if len(in) != 1 { + log.Println("Invalid operate message input in gcNode, input length = ", len(in)) + // TODO: add error handling + } + + gcMsg, ok := (*in[0]).(*gcMsg) + if !ok { + log.Println("type assertion failed for gcMsg") + // TODO: add error handling + } + + // drop collections + for _, collectionID := range gcMsg.gcRecord.collections { + err := gcNode.replica.removeCollection(collectionID) + if err != nil { + log.Println(err) + } + } + + // drop partitions + for _, partition := range gcMsg.gcRecord.partitions { + err := gcNode.replica.removePartition(partition.collectionID, partition.partitionTag) + if err != nil { + log.Println(err) + } + } + + return nil +} + +func newGCNode(replica collectionReplica) *gcNode { + maxQueueLength := Params.FlowGraphMaxQueueLength + maxParallelism := Params.FlowGraphMaxParallelism + + baseNode := BaseNode{} + baseNode.SetMaxQueueLength(maxQueueLength) + baseNode.SetMaxParallelism(maxParallelism) + + return &gcNode{ + BaseNode: baseNode, + replica: replica, + } +} diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index f60369521967aea4714a6099959fdcbadecb5883..9a2c8ca1f11e34738dfbfae93eaeb8e715b70ef3 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -90,6 +90,7 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { wg.Wait() var res Msg = &serviceTimeMsg{ + gcRecord: iMsg.gcRecord, timeRange: iMsg.timeRange, } return []*Msg{&res} diff --git a/internal/querynode/flow_graph_message.go b/internal/querynode/flow_graph_message.go index 88a133fab34a84d21da9f635103c1d8f7466f5d0..451f9b6952ad003a3f687ee44550808cce6bc481 100644 --- a/internal/querynode/flow_graph_message.go +++ b/internal/querynode/flow_graph_message.go @@ -16,6 +16,7 @@ type key2SegMsg struct { type ddMsg struct { collectionRecords map[string][]metaOperateRecord partitionRecords map[string][]metaOperateRecord + gcRecord *gcRecord timeRange TimeRange } @@ -26,6 +27,7 @@ type metaOperateRecord struct { type insertMsg struct { insertMessages []*msgstream.InsertMsg + gcRecord *gcRecord timeRange TimeRange } @@ -35,6 +37,12 @@ type deleteMsg struct { } type serviceTimeMsg struct { + gcRecord *gcRecord + timeRange TimeRange +} + +type gcMsg struct { + gcRecord *gcRecord timeRange TimeRange } @@ -55,42 +63,39 @@ type DeletePreprocessData struct { count int32 } -func (ksMsg *key2SegMsg) TimeTick() Timestamp { - return ksMsg.timeRange.timestampMax +// TODO: replace partitionWithID by partition id +type partitionWithID struct { + partitionTag string + collectionID UniqueID } -func (ksMsg *key2SegMsg) DownStreamNodeIdx() int { - return 0 +type gcRecord 
struct { + // collections and partitions to be dropped + collections []UniqueID + // TODO: use partition id + partitions []partitionWithID } -func (suMsg *ddMsg) TimeTick() Timestamp { - return suMsg.timeRange.timestampMax +func (ksMsg *key2SegMsg) TimeTick() Timestamp { + return ksMsg.timeRange.timestampMax } -func (suMsg *ddMsg) DownStreamNodeIdx() int { - return 0 +func (suMsg *ddMsg) TimeTick() Timestamp { + return suMsg.timeRange.timestampMax } func (iMsg *insertMsg) TimeTick() Timestamp { return iMsg.timeRange.timestampMax } -func (iMsg *insertMsg) DownStreamNodeIdx() int { - return 0 -} - func (dMsg *deleteMsg) TimeTick() Timestamp { return dMsg.timeRange.timestampMax } -func (dMsg *deleteMsg) DownStreamNodeIdx() int { - return 0 -} - func (stMsg *serviceTimeMsg) TimeTick() Timestamp { return stMsg.timeRange.timestampMax } -func (stMsg *serviceTimeMsg) DownStreamNodeIdx() int { - return 0 +func (gcMsg *gcMsg) TimeTick() Timestamp { + return gcMsg.timeRange.timestampMax } diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 38feb029c8af67745dbedca59af89ee25b9b9023..275666560d589c95f57b1f13ddefa90b5eb76ee6 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -30,7 +30,12 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { // update service time stNode.replica.getTSafe().set(serviceTimeMsg.timeRange.timestampMax) //fmt.Println("update tSafe to:", getPhysicalTime(serviceTimeMsg.timeRange.timestampMax)) - return nil + + var res Msg = &gcMsg{ + gcRecord: serviceTimeMsg.gcRecord, + timeRange: serviceTimeMsg.timeRange, + } + return []*Msg{&res} } func newServiceTimeNode(replica collectionReplica) *serviceTimeNode { diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index b7891040e8208894b2c66f07582ddc26adc3d59e..7c4271b23be5e31373966c3b64acfc395285916f 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -1,12 +1,8 @@ package flowgraph import ( - "fmt" "log" - "github.com/opentracing/opentracing-go" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/msgstream" ) @@ -29,31 +25,10 @@ func (inNode *InputNode) InStream() *msgstream.MsgStream { } // empty input and return one *Msg -func (inNode *InputNode) Operate([]*Msg) []*Msg { +func (inNode *InputNode) Operate(in []*Msg) []*Msg { //fmt.Println("Do InputNode operation") - msgPack := (*inNode.inStream).Consume() - var childs []opentracing.Span - tracer := opentracing.GlobalTracer() - if tracer != nil && msgPack != nil { - for _, msg := range msgPack.Msgs { - if msg.Type() == internalpb.MsgType_kInsert || msg.Type() == internalpb.MsgType_kSearch { - var child opentracing.Span - ctx := msg.GetContext() - if parent := opentracing.SpanFromContext(ctx); parent != nil { - child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs()), - opentracing.FollowsFrom(parent.Context())) - } else { - child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs())) - } - child.SetTag("hash keys", msg.HashKeys()) - child.SetTag("start time", msg.BeginTs()) - child.SetTag("end time", msg.EndTs()) - msg.SetContext(opentracing.ContextWithSpan(ctx, child)) - childs = append(childs, child) - } - } - } + msgPack := (*inNode.inStream).Consume() // TODO: add status if msgPack == nil { @@ -67,10 +42,6 @@ 
func (inNode *InputNode) Operate([]*Msg) []*Msg { timestampMax: msgPack.EndTs, } - for _, child := range childs { - child.Finish() - } - return []*Msg{&msgStreamMsg} } diff --git a/internal/util/flowgraph/message.go b/internal/util/flowgraph/message.go index e5c01d7d4ef92872f38abc45713cd68f06f5a6ee..f02d2604cbee807f791a9877cf969e3047fdfb61 100644 --- a/internal/util/flowgraph/message.go +++ b/internal/util/flowgraph/message.go @@ -4,7 +4,6 @@ import "github.com/zilliztech/milvus-distributed/internal/msgstream" type Msg interface { TimeTick() Timestamp - DownStreamNodeIdx() int } type MsgStreamMsg struct { diff --git a/internal/writenode/collection_replica.go b/internal/writenode/collection_replica.go index 2310802c8f3419bb1f1a58c8f23040f506f2e852..059ebbf814dd50597c0e6d92c278879e0669ba24 100644 --- a/internal/writenode/collection_replica.go +++ b/internal/writenode/collection_replica.go @@ -1,6 +1,7 @@ package writenode import ( + "fmt" "strconv" "sync" @@ -42,6 +43,7 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc } func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error { + fmt.Println("drop collection:", collectionID) colReplica.mu.Lock() defer colReplica.mu.Unlock() diff --git a/internal/writenode/data_sync_service.go b/internal/writenode/data_sync_service.go index efcd62b0365dce4b6ca4e37588268471340c697b..659c474539beb561d38d353245db102a63f22791 100644 --- a/internal/writenode/data_sync_service.go +++ b/internal/writenode/data_sync_service.go @@ -50,6 +50,7 @@ func (dsService *dataSyncService) initNodes() { var ddNode Node = newDDNode(dsService.ctx, dsService.ddChan, dsService.replica) var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.insertChan, dsService.replica) + var gcNode Node = newGCNode(dsService.replica) dsService.fg.AddNode(&dmStreamNode) dsService.fg.AddNode(&ddStreamNode) @@ -58,6 +59,7 @@ func (dsService *dataSyncService) initNodes() { dsService.fg.AddNode(&ddNode) dsService.fg.AddNode(&insertBufferNode) + dsService.fg.AddNode(&gcNode) // dmStreamNode var err = dsService.fg.SetEdges(dmStreamNode.Name(), @@ -98,9 +100,17 @@ func (dsService *dataSyncService) initNodes() { // insertBufferNode err = dsService.fg.SetEdges(insertBufferNode.Name(), []string{filterDmNode.Name()}, - []string{}, + []string{gcNode.Name()}, ) if err != nil { log.Fatal("set edges failed in node:", insertBufferNode.Name()) } + + // gcNode + err = dsService.fg.SetEdges(gcNode.Name(), + []string{insertBufferNode.Name()}, + []string{}) + if err != nil { + log.Fatal("set edges failed in node:", gcNode.Name()) + } } diff --git a/internal/writenode/flow_graph_dd_node.go b/internal/writenode/flow_graph_dd_node.go index 9d3b4db158ed89f920a865ddd966ebe382cfc067..8bd71886ebabe7213c0ed29313027e9c3b31839a 100644 --- a/internal/writenode/flow_graph_dd_node.go +++ b/internal/writenode/flow_graph_dd_node.go @@ -91,6 +91,11 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { } ddNode.ddMsg = &ddMsg + gcRecord := gcRecord{ + collections: make([]UniqueID, 0), + } + ddNode.ddMsg.gcRecord = &gcRecord + // sort tsMessages tsMessages := msMsg.TsMessages() sort.Slice(tsMessages, @@ -259,10 +264,10 @@ func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) { collectionID := msg.CollectionID - err := ddNode.replica.removeCollection(collectionID) - if err != nil { - log.Println(err) - } + //err := ddNode.replica.removeCollection(collectionID) + 
//if err != nil { + // log.Println(err) + //} // remove collection if _, ok := ddNode.ddRecords.collectionRecords[collectionID]; !ok { @@ -291,6 +296,8 @@ func (ddNode *ddNode) dropCollection(msg *msgstream.DropCollectionMsg) { ddNode.ddBuffer.ddData[collectionID].ddRequestString = append(ddNode.ddBuffer.ddData[collectionID].ddRequestString, msg.DropCollectionRequest.String()) ddNode.ddBuffer.ddData[collectionID].timestamps = append(ddNode.ddBuffer.ddData[collectionID].timestamps, msg.Timestamp) ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropCollectionEventType) + + ddNode.ddMsg.gcRecord.collections = append(ddNode.ddMsg.gcRecord.collections, collectionID) } func (ddNode *ddNode) createPartition(msg *msgstream.CreatePartitionMsg) { diff --git a/internal/writenode/flow_graph_filter_dm_node.go b/internal/writenode/flow_graph_filter_dm_node.go index 0bca67ebcb4ec4723454c083331822f617275990..48ac781ddc4be96502fa79bbe5509e4a41ff9d46 100644 --- a/internal/writenode/flow_graph_filter_dm_node.go +++ b/internal/writenode/flow_graph_filter_dm_node.go @@ -1,10 +1,8 @@ package writenode import ( - "context" "log" - - "github.com/opentracing/opentracing-go" + "math" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -34,34 +32,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } - var childs []opentracing.Span - tracer := opentracing.GlobalTracer() - if tracer != nil { - for _, msg := range msgStreamMsg.TsMessages() { - if msg.Type() == internalPb.MsgType_kInsert { - var child opentracing.Span - ctx := msg.GetContext() - if parent := opentracing.SpanFromContext(ctx); parent != nil { - child = tracer.StartSpan("pass filter node", - opentracing.FollowsFrom(parent.Context())) - } else { - child = tracer.StartSpan("pass filter node") - } - child.SetTag("hash keys", msg.HashKeys()) - child.SetTag("start time", msg.BeginTs()) - child.SetTag("end time", msg.EndTs()) - msg.SetContext(opentracing.ContextWithSpan(ctx, child)) - childs = append(childs, child) - } - } - } - ddMsg, ok := (*in[1]).(*ddMsg) if !ok { log.Println("type assertion failed for ddMsg") // TODO: add error handling } - fdmNode.ddMsg = ddMsg var iMsg = insertMsg{ @@ -82,20 +57,11 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { } } - for key, msg := range msgStreamMsg.TsMessages() { + for _, msg := range msgStreamMsg.TsMessages() { switch msg.Type() { case internalPb.MsgType_kInsert: - var ctx2 context.Context - if childs != nil { - if childs[key] != nil { - ctx2 = opentracing.ContextWithSpan(msg.GetContext(), childs[key]) - } else { - ctx2 = context.Background() - } - } resMsg := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg)) if resMsg != nil { - resMsg.SetContext(ctx2) iMsg.insertMessages = append(iMsg.insertMessages, resMsg) } // case internalPb.MsgType_kDelete: @@ -104,11 +70,9 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg { log.Println("Non supporting message type:", msg.Type()) } } - var res Msg = &iMsg - for _, child := range childs { - child.Finish() - } + iMsg.gcRecord = ddMsg.gcRecord + var res Msg = &iMsg return []*Msg{&res} } @@ -133,14 +97,31 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg tmpTimestamps := make([]Timestamp, 0) tmpRowIDs := make([]int64, 0) tmpRowData := make([]*commonpb.Blob, 0) - targetTimestamp := records[len(records)-1].timestamp + + // calculate 
valid time range + timeBegin := Timestamp(0) + timeEnd := Timestamp(math.MaxUint64) + for _, record := range records { + if record.createOrDrop && timeBegin < record.timestamp { + timeBegin = record.timestamp + } + if !record.createOrDrop && timeEnd > record.timestamp { + timeEnd = record.timestamp + } + } + for i, t := range msg.Timestamps { - if t >= targetTimestamp { + if t >= timeBegin && t <= timeEnd { tmpTimestamps = append(tmpTimestamps, t) tmpRowIDs = append(tmpRowIDs, msg.RowIDs[i]) tmpRowData = append(tmpRowData, msg.RowData[i]) } } + + if len(tmpRowIDs) <= 0 { + return nil + } + msg.Timestamps = tmpTimestamps msg.RowIDs = tmpRowIDs msg.RowData = tmpRowData diff --git a/internal/writenode/flow_graph_gc_node.go b/internal/writenode/flow_graph_gc_node.go new file mode 100644 index 0000000000000000000000000000000000000000..8784745b21bd86c60628843907d60dfeec2d22e2 --- /dev/null +++ b/internal/writenode/flow_graph_gc_node.go @@ -0,0 +1,53 @@ +package writenode + +import ( + "log" +) + +type gcNode struct { + BaseNode + replica collectionReplica +} + +func (gcNode *gcNode) Name() string { + return "gcNode" +} + +func (gcNode *gcNode) Operate(in []*Msg) []*Msg { + //fmt.Println("Do gcNode operation") + + if len(in) != 1 { + log.Println("Invalid operate message input in gcNode, input length = ", len(in)) + // TODO: add error handling + } + + gcMsg, ok := (*in[0]).(*gcMsg) + if !ok { + log.Println("type assertion failed for gcMsg") + // TODO: add error handling + } + + // drop collections + for _, collectionID := range gcMsg.gcRecord.collections { + err := gcNode.replica.removeCollection(collectionID) + if err != nil { + log.Println(err) + } + } + + return nil +} + +func newGCNode(replica collectionReplica) *gcNode { + maxQueueLength := Params.FlowGraphMaxQueueLength + maxParallelism := Params.FlowGraphMaxParallelism + + baseNode := BaseNode{} + baseNode.SetMaxQueueLength(maxQueueLength) + baseNode.SetMaxParallelism(maxParallelism) + + return &gcNode{ + BaseNode: baseNode, + replica: replica, + } +} diff --git a/internal/writenode/flow_graph_insert_buffer_node.go b/internal/writenode/flow_graph_insert_buffer_node.go index 189a25bc60a077fd6acc88adb0f885f96ecbba82..084be8c05ffb72b903265874e2c4b03c8980f1a8 100644 --- a/internal/writenode/flow_graph_insert_buffer_node.go +++ b/internal/writenode/flow_graph_insert_buffer_node.go @@ -4,15 +4,11 @@ import ( "bytes" "context" "encoding/binary" - "fmt" "log" "path" "strconv" "unsafe" - "github.com/opentracing/opentracing-go" - oplog "github.com/opentracing/opentracing-go/log" - "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/kv" miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio" @@ -100,23 +96,12 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { // iMsg is insertMsg // 1. 
iMsg -> buffer for _, msg := range iMsg.insertMessages { - ctx := msg.GetContext() - var span opentracing.Span - if ctx != nil { - span, _ = opentracing.StartSpanFromContext(ctx, fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs())) - } else { - span = opentracing.StartSpan(fmt.Sprintf("insert buffer node, start time = %d", msg.BeginTs())) - } - span.SetTag("hash keys", msg.HashKeys()) - span.SetTag("start time", msg.BeginTs()) - span.SetTag("end time", msg.EndTs()) if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) { log.Println("Error: misaligned messages detected") continue } currentSegID := msg.GetSegmentID() collectionName := msg.GetCollectionName() - span.LogFields(oplog.Int("segment id", int(currentSegID))) idata, ok := ibNode.insertBuffer.insertData[currentSegID] if !ok { @@ -125,21 +110,6 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { } } - // Timestamps - _, ok = idata.Data[1].(*storage.Int64FieldData) - if !ok { - idata.Data[1] = &storage.Int64FieldData{ - Data: []int64{}, - NumRows: 0, - } - } - tsData := idata.Data[1].(*storage.Int64FieldData) - for _, ts := range msg.Timestamps { - tsData.Data = append(tsData.Data, int64(ts)) - } - tsData.NumRows += len(msg.Timestamps) - span.LogFields(oplog.Int("tsData numRows", tsData.NumRows)) - // 1.1 Get CollectionMeta from etcd collection, err := ibNode.replica.getCollectionByName(collectionName) //collSchema, err := ibNode.getCollectionSchemaByName(collectionName) @@ -388,11 +358,9 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { // 1.3 store in buffer ibNode.insertBuffer.insertData[currentSegID] = idata - span.LogFields(oplog.String("store in buffer", "store in buffer")) // 1.4 if full // 1.4.1 generate binlogs - span.LogFields(oplog.String("generate binlogs", "generate binlogs")) if ibNode.insertBuffer.full(currentSegID) { log.Printf(". 
Insert Buffer full, auto flushing (%v) rows of data...", ibNode.insertBuffer.size(currentSegID)) // partitionTag -> partitionID @@ -461,7 +429,6 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { ibNode.outCh <- inBinlogMsg } } - span.Finish() } if len(iMsg.insertMessages) > 0 { @@ -572,7 +539,12 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg { log.Printf("Error: send hard time tick into pulsar channel failed, %s\n", err.Error()) } - return nil + var res Msg = &gcMsg{ + gcRecord: iMsg.gcRecord, + timeRange: iMsg.timeRange, + } + + return []*Msg{&res} } func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (*schemapb.CollectionSchema, error) { diff --git a/internal/writenode/flow_graph_message.go b/internal/writenode/flow_graph_message.go index 147822e7c5035b8574817dc5911c845727a92078..5fc24e37ec10ee647c750ab893d69f793bc21de9 100644 --- a/internal/writenode/flow_graph_message.go +++ b/internal/writenode/flow_graph_message.go @@ -22,6 +22,7 @@ type ( // TODO: use partition id partitionRecords map[string][]metaOperateRecord flushMessages []*msgstream.FlushMsg + gcRecord *gcRecord timeRange TimeRange } @@ -33,6 +34,7 @@ type ( insertMsg struct { insertMessages []*msgstream.InsertMsg flushMessages []*msgstream.FlushMsg + gcRecord *gcRecord timeRange TimeRange } @@ -40,36 +42,33 @@ type ( deleteMessages []*msgstream.DeleteMsg timeRange TimeRange } + + gcMsg struct { + gcRecord *gcRecord + timeRange TimeRange + } + + gcRecord struct { + collections []UniqueID + } ) func (ksMsg *key2SegMsg) TimeTick() Timestamp { return ksMsg.timeRange.timestampMax } -func (ksMsg *key2SegMsg) DownStreamNodeIdx() int { - return 0 -} - func (suMsg *ddMsg) TimeTick() Timestamp { return suMsg.timeRange.timestampMax } -func (suMsg *ddMsg) DownStreamNodeIdx() int { - return 0 -} - func (iMsg *insertMsg) TimeTick() Timestamp { return iMsg.timeRange.timestampMax } -func (iMsg *insertMsg) DownStreamNodeIdx() int { - return 0 -} - func (dMsg *deleteMsg) TimeTick() Timestamp { return dMsg.timeRange.timestampMax } -func (dMsg *deleteMsg) DownStreamNodeIdx() int { - return 0 +func (gcMsg *gcMsg) TimeTick() Timestamp { + return gcMsg.timeRange.timestampMax } diff --git a/internal/writenode/write_node.go b/internal/writenode/write_node.go index 319c13e4593c580af568833f3ae24143af4e8878..d3ce6f84c70ac40d1fd52fc3a9f13df7d295d8ed 100644 --- a/internal/writenode/write_node.go +++ b/internal/writenode/write_node.go @@ -2,12 +2,6 @@ package writenode import ( "context" - "fmt" - "io" - - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go" - "github.com/uber/jaeger-client-go/config" ) type WriteNode struct { @@ -17,8 +11,6 @@ type WriteNode struct { flushSyncService *flushSyncService metaService *metaService replica collectionReplica - tracer opentracing.Tracer - closer io.Closer } func NewWriteNode(ctx context.Context, writeNodeID uint64) *WriteNode { @@ -46,22 +38,6 @@ func Init() { } func (node *WriteNode) Start() error { - cfg := &config.Configuration{ - ServiceName: "tracing", - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - Reporter: &config.ReporterConfig{ - LogSpans: true, - }, - } - var err error - node.tracer, node.closer, err = cfg.NewTracer(config.Logger(jaeger.StdLogger)) - if err != nil { - panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) - } - opentracing.SetGlobalTracer(node.tracer) // TODO GOOSE Init Size?? chanSize := 100
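
// Editor's note (sketch, not part of the patch): the flow-graph topology after
// this change, for reference while reading the SetEdges calls above. Node names
// come from the diff; the dmStreamNode/ddStreamNode/ddNode edges are set in
// hunks not shown here, so their exact wiring is an assumption:
//
//	querynode: dmStreamNode ──────────┐
//	                                  ├─> filterDmNode ─> insertNode ─> serviceTimeNode ─> gcNode
//	           ddStreamNode ─> ddNode ┘
//
//	writenode: dmStreamNode ──────────┐
//	                                  ├─> filterDmNode ─> insertBufferNode ─> gcNode
//	           ddStreamNode ─> ddNode ┘
//
// In both graphs gcNode is the new sink (its out-edge list is empty), so in the
// querynode a collection is only removed after serviceTimeNode has advanced
// tSafe past the time range that carried the drop.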
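
// Editor's note (sketch): how one drop travels through the graph after this
// patch. The identifiers are real names from the diff; the step-by-step trace
// itself is editorial:
//
//	ddNode:           ddMsg.gcRecord.collections = append(..., collectionID)   // record only, no removal
//	filterDmNode:     iMsg.gcRecord = ddMsg.gcRecord                           // forward on the insert path
//	insertNode:       res = &serviceTimeMsg{gcRecord: iMsg.gcRecord, ...}      // querynode
//	insertBufferNode: res = &gcMsg{gcRecord: iMsg.gcRecord, ...}               // writenode
//	gcNode:           replica.removeCollection(id) / replica.removePartition(...)
//
// Every wrapper also carries timeRange, so TimeTick() stays consistent end to
// end and the removal is applied in the same tick that announced the drop.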
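
// Editor's note (sketch, hypothetical names): both filterInvalidInsertMessage
// changes above replace the old single targetTimestamp cutoff with a
// [timeBegin, timeEnd] window — the latest create and the earliest drop found
// in the meta-operate records. A self-contained program showing the same rule:
//
//	package main
//
//	import (
//		"fmt"
//		"math"
//	)
//
//	type record struct {
//		createOrDrop bool // true = create, false = drop
//		timestamp    uint64
//	}
//
//	// validRange mirrors the patch: the window opens at the latest create
//	// and closes at the earliest drop.
//	func validRange(records []record) (begin, end uint64) {
//		begin, end = 0, math.MaxUint64
//		for _, r := range records {
//			if r.createOrDrop && r.timestamp > begin {
//				begin = r.timestamp
//			}
//			if !r.createOrDrop && r.timestamp < end {
//				end = r.timestamp
//			}
//		}
//		return
//	}
//
//	func main() {
//		begin, end := validRange([]record{{true, 10}, {false, 30}})
//		for _, t := range []uint64{5, 15, 35} {
//			fmt.Println(t, t >= begin && t <= end) // prints: 5 false, 15 true, 35 false
//		}
//	}
//
// Rows timestamped before the collection existed or after it was dropped are
// discarded, and a message whose rows are all filtered out is dropped entirely
// (the new len(tmpRowIDs) <= 0 early return).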