diff --git a/.jenkins/check/config/filter_cpplint.txt b/.jenkins/check/config/filter_cpplint.txt index 4998f1fe12e365e6aad2d274230e6862ed734dd0..7f1a69f00e21f4753df23fdb7bfa69b4b0985996 100644 --- a/.jenkins/check/config/filter_cpplint.txt +++ b/.jenkins/check/config/filter_cpplint.txt @@ -108,6 +108,10 @@ "models/official/cv/psenet/infer/mxbase/src/PsenetDetection.h" "runtime/references" "models/official/cv/psenet/infer/mxbase/src/PsenetDetection.cpp" "runtime/references" +"models/research/recommend/autodis/infer/mxbase/src/main.cpp" "runtime/references" +"models/research/recommend/autodis/infer/mxbase/src/Autodis.h" "runtime/references" +"models/research/recommend/autodis/infer/mxbase/src/Autodis.cpp" "runtime/references" + "models/research/cv/textfusenet/infer/mxbase/src/Textfusenet.h" "runtime/references" "models/research/cv/textfusenet/infer/mxbase/src/Textfusenet.cpp" "runtime/references" "models/research/cv/textfusenet/infer/mxbase/src/PostProcess/TextfusenetMindsporePost.h" "runtime/references" diff --git a/research/recommend/autodis/infer/convert/convert.sh b/research/recommend/autodis/infer/convert/convert.sh new file mode 100644 index 0000000000000000000000000000000000000000..95f8d4f31b839f1437e49b43d55a47067d09e320 --- /dev/null +++ b/research/recommend/autodis/infer/convert/convert.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +if [ $# -ne 2 ] +then + echo "Need two parameters: one for air model input file path, another for om model output dir path!" 
+    exit 1
+fi
+
+model=$1
+output=$2
+
+atc --model="${model}" \
+    --framework=1 \
+    --output="${output}" \
+    --soc_version=Ascend310 \
+    --input_shape="batch_ids:1,39;batch_wts:1,39;labels:1,1"
diff --git a/research/recommend/autodis/infer/data/config/autodis.pipeline b/research/recommend/autodis/infer/data/config/autodis.pipeline
new file mode 100644
index 0000000000000000000000000000000000000000..e5bea67a5ee0af4a24aa70bb06185d50d1556acf
--- /dev/null
+++ b/research/recommend/autodis/infer/data/config/autodis.pipeline
@@ -0,0 +1,49 @@
+{
+    "autodis": {
+        "stream_config": {
+            "deviceId": "0"
+        },
+        "appsrc0": {
+            "props": {
+                "blocksize": "409600"
+            },
+            "factory": "appsrc",
+            "next": "mxpi_tensorinfer0:0"
+        },
+        "appsrc1": {
+            "props": {
+                "blocksize": "409600"
+            },
+            "factory": "appsrc",
+            "next": "mxpi_tensorinfer0:1"
+        },
+        "appsrc2": {
+            "props": {
+                "blocksize": "409600"
+            },
+            "factory": "appsrc",
+            "next": "mxpi_tensorinfer0:2"
+        },
+        "mxpi_tensorinfer0": {
+            "props": {
+                "dataSource":"appsrc0,appsrc1,appsrc2",
+                "modelPath": "../data/model/autodis.om"
+            },
+            "factory": "mxpi_tensorinfer",
+            "next": "mxpi_dataserialize0"
+        },
+        "mxpi_dataserialize0": {
+            "props": {
+                "outputDataKeys": "mxpi_tensorinfer0"
+            },
+            "factory": "mxpi_dataserialize",
+            "next": "appsink0"
+        },
+        "appsink0": {
+            "props": {
+                "blocksize": "4096000"
+            },
+            "factory": "appsink"
+        }
+    }
+}
diff --git a/research/recommend/autodis/infer/docker_start_infer.sh b/research/recommend/autodis/infer/docker_start_infer.sh
new file mode 100644
index 0000000000000000000000000000000000000000..64cf90a2311bdfb21d68a4e90e08602670fdf632
--- /dev/null
+++ b/research/recommend/autodis/infer/docker_start_infer.sh
@@ -0,0 +1,48 @@
+#!/bin/bash
+
+# Copyright 2021 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
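[editor note] Before wiring real data through the pipeline above, the converted .om can be smoke-tested with synthetic inputs. The sketch below is illustrative only and not part of this patch: the id range 1000 is a placeholder (real ids come from the preprocessing utilities later in this patch), but the shapes and dtypes match what the mxbase and SDK runners read back (ids: int32 (N, 39), wts: float32 (N, 39), label: float32 (N, 1)).

# make_dummy_bins.py -- illustrative smoke-test input generator, not part of this patch
import numpy as np

N = 8  # any small sample count will do for a smoke test
rng = np.random.default_rng(0)
rng.integers(0, 1000, size=(N, 39)).astype(np.int32).tofile("ids.bin")  # placeholder id range
rng.random((N, 39), dtype=np.float32).tofile("wts.bin")
rng.integers(0, 2, size=(N, 1)).astype(np.float32).tofile("label.bin")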
+ +docker_image=$1 +data_dir=$2 + +function show_help() { + echo "Usage: docker_start.sh docker_image data_dir" +} + +function param_check() { + if [ -z "${docker_image}" ]; then + echo "please input docker_image" + show_help + exit 1 + fi + + if [ -z "${data_dir}" ]; then + echo "please input data_dir" + show_help + exit 1 + fi +} + +param_check + +docker run -it \ + --device=/dev/davinci0 \ + --device=/dev/davinci_manager \ + --device=/dev/devmm_svm \ + --device=/dev/hisi_hdc \ + -v /usr/local/Ascend/driver:/usr/local/Ascend/driver \ + -v ${data_dir}:${data_dir} \ + ${docker_image} \ + /bin/bash diff --git a/research/recommend/autodis/infer/mxbase/CMakeLists.txt b/research/recommend/autodis/infer/mxbase/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..8ad5abc911ea305a99b1d8b7bcae84f7c5f2351c --- /dev/null +++ b/research/recommend/autodis/infer/mxbase/CMakeLists.txt @@ -0,0 +1,56 @@ +cmake_minimum_required(VERSION 3.10.0) +project(autodis) + +set(TARGET autodis) + +add_definitions(-DENABLE_DVPP_INTERFACE) +add_definitions(-D_GLIBCXX_USE_CXX11_ABI=0) +add_definitions(-Dgoogle=mindxsdk_private) +add_compile_options(-std=c++11 -fPIE -fstack-protector-all -fPIC -Wall) +add_link_options(-Wl,-z,relro,-z,now,-z,noexecstack -s -pie) + +# Check environment variable +if(NOT DEFINED ENV{ASCEND_HOME}) + message(FATAL_ERROR "please define environment variable:ASCEND_HOME") +endif() +if(NOT DEFINED ENV{ASCEND_VERSION}) + message(WARNING "please define environment variable:ASCEND_VERSION") +endif() +if(NOT DEFINED ENV{ARCH_PATTERN}) + message(WARNING "please define environment variable:ARCH_PATTERN") +endif() + +set(ACL_INC_DIR $ENV{ASCEND_HOME}/$ENV{ASCEND_VERSION}/$ENV{ARCH_PATTERN}/acllib/include) +set(ACL_LIB_DIR $ENV{ASCEND_HOME}/$ENV{ASCEND_VERSION}/$ENV{ARCH_PATTERN}/acllib/lib64) + +set(MXBASE_ROOT_DIR $ENV{MX_SDK_HOME}) +set(MXBASE_INC ${MXBASE_ROOT_DIR}/include) +set(MXBASE_LIB_DIR ${MXBASE_ROOT_DIR}/lib) +set(MXBASE_POST_LIB_DIR ${MXBASE_ROOT_DIR}/lib/modelpostprocessors) +set(MXBASE_POST_PROCESS_DIR ${MXBASE_ROOT_DIR}/include/MxBase/postprocess/include) + +if(DEFINED ENV{MXSDK_OPENSOURCE_DIR}) + set(OPENSOURCE_DIR $ENV{MXSDK_OPENSOURCE_DIR}) +else() + set(OPENSOURCE_DIR ${MXBASE_ROOT_DIR}/opensource) +endif() + + +include_directories(${ACL_INC_DIR}) +include_directories(${OPENSOURCE_DIR}/include) +include_directories(${OPENSOURCE_DIR}/include/opencv4) + +include_directories(${MXBASE_INC}) +include_directories(${MXBASE_POST_PROCESS_DIR}) + +link_directories(${ACL_LIB_DIR}) +link_directories(${OPENSOURCE_DIR}/lib) +link_directories(${MXBASE_LIB_DIR}) +link_directories(${MXBASE_POST_LIB_DIR}) + + +add_executable(${TARGET} src/main.cpp src/Autodis.cpp) + +target_link_libraries(${TARGET} glog cpprest mxbase opencv_world) + +install(TARGETS ${TARGET} RUNTIME DESTINATION ${PROJECT_SOURCE_DIR}/) diff --git a/research/recommend/autodis/infer/mxbase/build.sh b/research/recommend/autodis/infer/mxbase/build.sh new file mode 100644 index 0000000000000000000000000000000000000000..3bf7ec893ebdf3ac12c122f2bda363e286fe2e34 --- /dev/null +++ b/research/recommend/autodis/infer/mxbase/build.sh @@ -0,0 +1,62 @@ +#!/bin/bash + +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+path_cur=$(dirname "$0")
+
+function check_env()
+{
+    # set ASCEND_HOME to /usr/local/Ascend when it was not specified by user
+    if [ ! "${ASCEND_HOME}" ]; then
+        export ASCEND_HOME=/usr/local/Ascend/
+        echo "Set ASCEND_HOME to the default value: ${ASCEND_HOME}"
+    else
+        echo "ASCEND_HOME is set to ${ASCEND_HOME} by user"
+    fi
+
+    # set ASCEND_VERSION to nnrt/latest when it was not specified by user
+    if [ ! "${ASCEND_VERSION}" ]; then
+        export ASCEND_VERSION=nnrt/latest
+        echo "Set ASCEND_VERSION to the default value: ${ASCEND_VERSION}"
+    else
+        echo "ASCEND_VERSION is set to ${ASCEND_VERSION} by user"
+    fi
+
+    if [ ! "${ARCH_PATTERN}" ]; then
+        # set ARCH_PATTERN to ./ when it was not specified by user
+        export ARCH_PATTERN=./
+        echo "ARCH_PATTERN is set to the default value: ${ARCH_PATTERN}"
+    else
+        echo "ARCH_PATTERN is set to ${ARCH_PATTERN} by user"
+    fi
+}
+
+function build_autodis()
+{
+    cd "$path_cur" || exit
+    rm -rf build
+    mkdir -p build
+    cd build
+    cmake ..
+    make
+    ret=$?
+    if [ ${ret} -ne 0 ]; then
+        echo "Failed to build autodis."
+        exit ${ret}
+    fi
+    make install
+}
+
+check_env
+build_autodis
diff --git a/research/recommend/autodis/infer/mxbase/src/Autodis.cpp b/research/recommend/autodis/infer/mxbase/src/Autodis.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..b70f05ad47ebf2a493cef9268ee5d613ec1e240d
--- /dev/null
+++ b/research/recommend/autodis/infer/mxbase/src/Autodis.cpp
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) 2021. Huawei Technologies Co., Ltd. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
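[editor note] build.sh only warns when ASCEND_VERSION/ARCH_PATTERN are unset, and CMakeLists.txt additionally requires MX_SDK_HOME, so a missing SDK surfaces late as a CMake include error. A hypothetical pre-flight check, not part of this patch (the names mirror the variables used above):

# preflight.py -- hypothetical helper: verify the env vars consumed by build.sh/CMakeLists.txt
import os

defaults = {"ASCEND_HOME": "/usr/local/Ascend/", "ASCEND_VERSION": "nnrt/latest", "ARCH_PATTERN": "./"}
for var, default in defaults.items():
    source = "env" if var in os.environ else "default"
    print("{}={} ({})".format(var, os.environ.get(var, default), source))
if "MX_SDK_HOME" not in os.environ:
    raise SystemExit("MX_SDK_HOME is not set; CMakeLists.txt needs it to locate MxBase headers and libs")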
+
+#include <unistd.h>
+#include <sys/stat.h>
+#include <math.h>
+#include <memory>
+#include <string>
+#include <fstream>
+#include <algorithm>
+#include <vector>
+#include <chrono>
+#include <typeinfo>
+#include "Autodis.h"
+#include "MxBase/DeviceManager/DeviceManager.h"
+#include "MxBase/Log/Log.h"
+
+APP_ERROR AUTODIS::Init(const InitParam &initParam) {
+    deviceId_ = initParam.deviceId;
+    APP_ERROR ret = MxBase::DeviceManager::GetInstance()->InitDevices();
+    if (ret != APP_ERR_OK) {
+        LogError << "Init devices failed, ret=" << ret << ".";
+        return ret;
+    }
+    ret = MxBase::TensorContext::GetInstance()->SetContext(initParam.deviceId);
+    if (ret != APP_ERR_OK) {
+        LogError << "Set context failed, ret=" << ret << ".";
+        return ret;
+    }
+    model_ = std::make_shared<MxBase::ModelInferenceProcessor>();
+    ret = model_->Init(initParam.modelPath, modelDesc_);
+    if (ret != APP_ERR_OK) {
+        LogError << "ModelInferenceProcessor init failed, ret=" << ret << ".";
+        return ret;
+    }
+    return APP_ERR_OK;
+}
+
+APP_ERROR AUTODIS::DeInit() {
+    model_->DeInit();
+    MxBase::DeviceManager::GetInstance()->DestroyDevices();
+    return APP_ERR_OK;
+}
+
+template<class dtype>
+APP_ERROR AUTODIS::VectorToTensorBase(const std::vector<std::vector<dtype>> &input, uint32_t inputId
+                                , MxBase::TensorBase &tensorBase) {
+    uint32_t dataSize = modelDesc_.inputTensors[inputId].tensorDims[1];
+    // flatten the (1, dataSize) input row into a contiguous host staging buffer
+    dtype *metaFeatureData = new dtype[dataSize];
+    uint32_t idx = 0;
+    for (size_t bs = 0; bs < input.size(); bs++) {
+        for (size_t c = 0; c < input[bs].size(); c++) {
+            metaFeatureData[idx++] = input[bs][c];
+        }
+    }
+    MxBase::MemoryData memoryDataDst(dataSize * sizeof(dtype), MxBase::MemoryData::MEMORY_DEVICE, deviceId_);
+    MxBase::MemoryData memoryDataSrc(reinterpret_cast<void *>(metaFeatureData), dataSize * sizeof(dtype)
+                                , MxBase::MemoryData::MEMORY_HOST_MALLOC);
+
+    APP_ERROR ret = MxBase::MemoryHelper::MxbsMallocAndCopy(memoryDataDst, memoryDataSrc);
+    delete[] metaFeatureData;  // host buffer is no longer needed once copied to device
+    if (ret != APP_ERR_OK) {
+        LogError << GetError(ret) << "Memory malloc failed.";
+        return ret;
+    }
+    std::vector<uint32_t> shape = {1, dataSize};
+    if (typeid(dtype) == typeid(float)) {
+        tensorBase = MxBase::TensorBase(memoryDataDst, false, shape, MxBase::TENSOR_DTYPE_FLOAT32);
+    } else {
+        tensorBase = MxBase::TensorBase(memoryDataDst, false, shape, MxBase::TENSOR_DTYPE_INT32);
+    }
+    return APP_ERR_OK;
+}
+
+APP_ERROR AUTODIS::Inference(const std::vector<MxBase::TensorBase> &inputs,
+                                std::vector<MxBase::TensorBase> &outputs) {
+    auto dtypes = model_->GetOutputDataType();
+    for (size_t i = 0; i < modelDesc_.outputTensors.size(); i++) {
+        std::vector<uint32_t> shape = {};
+        for (size_t j = 0; j < modelDesc_.outputTensors[i].tensorDims.size(); j++) {
+            shape.push_back((uint32_t)modelDesc_.outputTensors[i].tensorDims[j]);
+        }
+        MxBase::TensorBase tensor(shape, dtypes[i], MxBase::MemoryData::MemoryType::MEMORY_DEVICE, deviceId_);
+        APP_ERROR ret = MxBase::TensorBase::TensorBaseMalloc(tensor);
+        if (ret != APP_ERR_OK) {
+            LogError << "TensorBaseMalloc failed, ret=" << ret << ".";
+            return ret;
+        }
+        outputs.push_back(tensor);
+    }
+    MxBase::DynamicInfo dynamicInfo = {};
+    dynamicInfo.dynamicType = MxBase::DynamicType::STATIC_BATCH;
+    auto startTime = std::chrono::high_resolution_clock::now();
+    APP_ERROR ret = model_->ModelInference(inputs, outputs, dynamicInfo);
+    auto endTime = std::chrono::high_resolution_clock::now();
+    double costMs = std::chrono::duration<double, std::milli>(endTime - startTime).count();
+    inferCostTimeMilliSec += costMs;
+    if (ret != APP_ERR_OK) {
+        LogError << "ModelInference failed, ret=" << ret << ".";
+        return ret;
+    }
+ return APP_ERR_OK; +} + +APP_ERROR AUTODIS::PostProcess(std::vector<float> &probs, const std::vector<MxBase::TensorBase> &inputs) { + size_t index = 0; + for (auto retTensor : inputs) { + if (index != 1) { + index++; + continue; + } + std::vector<uint32_t> shape = retTensor.GetShape(); + uint32_t N = shape[0]; + uint32_t C = shape[1]; + // LogInfo << N << '\t' << C << '\t'; + if (!retTensor.IsHost()) { + // LogInfo << "this tensor is not in host. Now deploy it to host"; + retTensor.ToHost(); + } + void* data = retTensor.GetBuffer(); + for (uint32_t i = 0; i < N; i++) { + for (uint32_t j = 0; j < C; j++) { + float value = *(reinterpret_cast<float*>(data) + i * C + j); + probs.emplace_back(value); + } + } + index++; + } + return APP_ERR_OK; +} + +APP_ERROR AUTODIS::PrintInputInfo(std::vector<MxBase::TensorBase> inputs) { + LogInfo << "input size: " << inputs.size(); + for (size_t i = 0; i < inputs.size(); i++) { + // check tensor is available + MxBase::TensorBase &tensor_input = inputs[i]; + auto inputShape = tensor_input.GetShape(); + uint32_t inputDataType = tensor_input.GetDataType(); + LogInfo << "input_" + std::to_string(i) + "_shape is: " << inputShape[0] + << " " << inputShape[1] << " " << inputShape.size(); + LogInfo << "input_" + std::to_string(i) + "_dtype is: " << inputDataType; + } + return APP_ERR_OK; +} + +APP_ERROR AUTODIS::Process(const std::vector<std::vector<int>> &ids, const std::vector<std::vector<float>> &wts, + const std::vector<std::vector<float>> &label, const InitParam &initParam + , std::vector<int> &pred, std::vector<float> &probs) { + std::vector<MxBase::TensorBase> inputs = {}; + std::vector<MxBase::TensorBase> outputs = {}; + std::vector<float> infer_probs; + size_t batch_size = ids.size(); + APP_ERROR ret; + MxBase::TensorBase tensorBase; + ret = VectorToTensorBase(ids, 0, tensorBase); + if (ret != APP_ERR_OK) { + LogError << "ToTensorBase failed, ret=" << ret << "."; + return ret; + } + inputs.push_back(tensorBase); + ret = VectorToTensorBase(wts, 1, tensorBase); + if (ret != APP_ERR_OK) { + LogError << "ToTensorBase failed, ret=" << ret << "."; + return ret; + } + inputs.push_back(tensorBase); + ret = VectorToTensorBase(label, 2, tensorBase); + if (ret != APP_ERR_OK) { + LogError << "ToTensorBase failed, ret=" << ret << "."; + return ret; + } + inputs.push_back(tensorBase); + // print inputs info + // PrintInputInfo(inputs); + + // run inference + ret = Inference(inputs, outputs); + if (ret != APP_ERR_OK) { + LogError << "Inference failed, ret=" << ret << "."; + return ret; + } + + ret = PostProcess(infer_probs, outputs); + if (ret != APP_ERR_OK) { + LogError << "Save model infer results into file failed. ret = " << ret << "."; + return ret; + } + + // save result + for (size_t i = 0; i < batch_size; i++) { + pred.push_back(static_cast<int>(round(infer_probs[i]))); + probs.push_back(infer_probs[i]); + } + return APP_ERR_OK; +} diff --git a/research/recommend/autodis/infer/mxbase/src/Autodis.h b/research/recommend/autodis/infer/mxbase/src/Autodis.h new file mode 100644 index 0000000000000000000000000000000000000000..5aa85de6cb29eceb541abb8f2917fe203798e291 --- /dev/null +++ b/research/recommend/autodis/infer/mxbase/src/Autodis.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2021. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <string> +#include <vector> +#include <memory> +#ifndef MxBase_AUTODIS_H +#define MxBase_AUTODIS_H + +#include "MxBase/DvppWrapper/DvppWrapper.h" +#include "MxBase/ModelInfer/ModelInferenceProcessor.h" +#include "MxBase/Tensor/TensorContext/TensorContext.h" + +struct InitParam { + uint32_t deviceId; + bool checkTensor; + std::string modelPath; +}; + +class AUTODIS { + public: + APP_ERROR Init(const InitParam &initParam); + APP_ERROR DeInit(); + template<class dtype> + APP_ERROR VectorToTensorBase(const std::vector<std::vector<dtype>> &input_x, uint32_t inputId + , MxBase::TensorBase &tensorBase); + APP_ERROR Inference(const std::vector<MxBase::TensorBase> &inputs, std::vector<MxBase::TensorBase> &outputs); + APP_ERROR Process(const std::vector<std::vector<int>> &ids, const std::vector<std::vector<float>> &wts + , const std::vector<std::vector<float>> &label, const InitParam &initParam, + std::vector<int> &pred, std::vector<float> &probs); + APP_ERROR PostProcess(std::vector<float> &probs, const std::vector<MxBase::TensorBase> &inputs); + APP_ERROR PrintInputInfo(std::vector<MxBase::TensorBase> inputs); + double GetInferCostMilliSec() const {return inferCostTimeMilliSec;} + private: + std::shared_ptr<MxBase::ModelInferenceProcessor> model_; + MxBase::ModelDesc modelDesc_; + uint32_t deviceId_ = 0; + double inferCostTimeMilliSec = 0.0; +}; +#endif diff --git a/research/recommend/autodis/infer/mxbase/src/main.cpp b/research/recommend/autodis/infer/mxbase/src/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ce31af66189ea487bca498ad4abaa69a2e7a846b --- /dev/null +++ b/research/recommend/autodis/infer/mxbase/src/main.cpp @@ -0,0 +1,259 @@ +/* + * Copyright (c) 2021. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <dirent.h> +#include <unistd.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <fstream> +#include <string> +#include <sstream> +#include <cstdlib> +#include <vector> +#include <cmath> +#include <cstdio> +#include "Autodis.h" +#include "MxBase/Log/Log.h" + +const char input_format[] = "bin"; +const int shape[3] = {39, 39, 1}; +template<class dtype> +APP_ERROR ReadTxt(const std::string &path, std::vector<std::vector<dtype>> &dataset) { + std::ifstream fp(path); + std::string line; + while (std::getline(fp, line)) { + std::vector<dtype> data_line; + std::string number; + std::istringstream readstr(line); + + while (std::getline(readstr, number, '\t')) { + data_line.push_back(atof(number.c_str())); + } + dataset.push_back(data_line); + } + return APP_ERR_OK; +} + +template<class dtype> +APP_ERROR ReadBin(const std::string &path, std::vector<std::vector<dtype>> &dataset, const int index) { + std::ifstream inFile(path, std::ios::in|std::ios::binary); + dtype data[shape[index]]; + while (inFile.read(reinterpret_cast<char *>(&data), sizeof(data))) { + std::vector<dtype> temp(data, data + sizeof(data) / sizeof(data[0])); + dataset.push_back(temp); + } + return APP_ERR_OK; +} + +APP_ERROR WriteResult(const std::string &output_dir, const std::string &output_file, + const std::vector<std::vector<float>> &label, const std::vector<float> &probs, + const std::vector<int> &pred) { + std::string output_path = output_dir+"/"+output_file; + if (access(output_dir.c_str(), F_OK) == -1) { + mkdir(output_dir.c_str(), S_IRWXO|S_IRWXG|S_IRWXU); + } + std::ofstream outfile(output_path, std::ios::out | std::ios::trunc); + if (outfile.fail()) { + LogError << "Failed to open result file: "; + return APP_ERR_COMM_FAILURE; + } + outfile << "label\tprob\tpred\n"; + + for (size_t i = 0; i < label.size(); i ++) { + std::string temp = ""; + for (size_t j = 0; j < label[i].size(); j ++) { + temp += std::to_string(static_cast<int>(label[i][j]))+"\t"; + } + temp += std::to_string(probs[i])+"\t"; + temp += std::to_string(pred[i])+"\n"; + outfile << temp; + } + outfile.close(); + return APP_ERR_OK; +} + +APP_ERROR WriteMetric(const std::string &output_dir, const std::string &output_file, + const int &data_row, const float &infer_total_time, + const float &acc, const float &auc) { + std::string output_path = output_dir+"/"+output_file; + if (access(output_dir.c_str(), F_OK) == -1) { + mkdir(output_dir.c_str(), S_IRWXO | S_IRWXG | S_IRWXU); + } + std::ofstream outfile(output_path, std::ios::out | std::ios::trunc); + if (outfile.fail()) { + LogError << "Failed to open result file: "; + return APP_ERR_COMM_FAILURE; + } + outfile << "Number of samples:" + std::to_string(data_row) + "\n"; + outfile << "Infer total time:" + std::to_string(infer_total_time) + "\n"; + outfile << "Average infer time:" + std::to_string(infer_total_time/data_row) + "\n"; + outfile << "Infer acc:" + std::to_string(acc) + "\n"; + outfile << "Infer auc:" + std::to_string(auc) + "\n"; + return APP_ERR_OK; +} + +float get_acc(const std::vector<int> &pred, const std::vector<std::vector<float>> &label) { + size_t num = pred.size(); + if (num == 0) { + return 0.0; + } + float s = 0; + for (size_t i = 0; i < num; i++) { + if (pred[i] == static_cast<int>(label[i][0])) { + s += 1; + } + } + return s/num; +} + +float get_auc(const std::vector<float> &probs, const std::vector<std::vector<float>> &label, size_t n_bins = 10000) { + size_t positive_len = 0; + for (size_t i = 0; i < label.size(); i++) { + positive_len += 
static_cast<int>(label[i][0]); + } + size_t negative_len = label.size()-positive_len; + if (positive_len == 0 || negative_len == 0) { + return 0.0; + } + uint64_t total_case = positive_len*negative_len; + std::vector<size_t> pos_histogram(n_bins+1, 0); + std::vector<size_t> neg_histogram(n_bins+1, 0); + float bin_width = 1.0/n_bins; + for (size_t i = 0; i < probs.size(); i ++) { + size_t nth_bin = static_cast<int>(probs[i]/bin_width); + if (static_cast<int>(label[i][0]) == 1) { + pos_histogram[nth_bin] += 1; + } else { + neg_histogram[nth_bin] += 1; + } + } + size_t accumulated_neg = 0; + float satisfied_pair = 0; + for (size_t i = 0; i < n_bins+1; i ++) { + satisfied_pair += (pos_histogram[i]*accumulated_neg + pos_histogram[i]*neg_histogram[i]*0.5); + accumulated_neg += neg_histogram[i]; + } + return satisfied_pair/total_case; +} + +int main(int argc, char* argv[]) { + InitParam initParam = {}; + initParam.deviceId = 0; + initParam.checkTensor = true; + initParam.modelPath = "../data/model/autodis.om"; + auto autodis = std::make_shared<AUTODIS>(); + printf("Start running\n"); + APP_ERROR ret = autodis->Init(initParam); + if (ret != APP_ERR_OK) { + autodis->DeInit(); + LogError << "autodis init failed, ret=" << ret << "."; + return ret; + } + + // read data from txt or bin + std::vector<std::vector<int>> ids_data; + std::vector<std::vector<float>> wts_data; + std::vector<std::vector<float>> label_data; + if (strcmp(input_format, "txt") == 0) { + std::string ids_path = "../data/input/ids.txt"; + std::string wts_path = "../data/input/wts.txt"; + std::string label_path = "../data/input/label.txt"; + + ret = ReadTxt(ids_path, ids_data); + if (ret != APP_ERR_OK) { + LogError << "read ids failed, ret=" << ret << "."; + return ret; + } + ret = ReadTxt(wts_path, wts_data); + if (ret != APP_ERR_OK) { + LogError << "read wts failed, ret=" << ret << "."; + return ret; + } + ret = ReadTxt(label_path, label_data); + if (ret != APP_ERR_OK) { + LogError << "read label failed, ret=" << ret << "."; + return ret; + } + } else { + std::string ids_path = "../data/input/ids.bin"; + std::string wts_path = "../data/input/wts.bin"; + std::string label_path = "../data/input/label.bin"; + + ret = ReadBin(ids_path, ids_data, 0); + if (ret != APP_ERR_OK) { + LogError << "read ids failed, ret=" << ret << "."; + return ret; + } + ret = ReadBin(wts_path, wts_data, 1); + if (ret != APP_ERR_OK) { + LogError << "read wts failed, ret=" << ret << "."; + return ret; + } + ret = ReadBin(label_path, label_data, 2); + if (ret != APP_ERR_OK) { + LogError << "read label failed, ret=" << ret << "."; + return ret; + } + } + + int ids_row = ids_data.size(); + int wts_row = wts_data.size(); + int label_row = label_data.size(); + + if (label_row != ids_row || label_row != wts_row) { + LogError << "size of label, ids and wts are not equal"; + return -1; + } + int data_row = label_row; + + std::vector<int> pred; + std::vector<float> probs; + + for (int i = 0; i < data_row; i++) { + std::vector<std::vector<int>> ids; + std::vector<std::vector<float>> wts; + std::vector<std::vector<float>> label; + ids.emplace_back(ids_data[i]); + wts.emplace_back(wts_data[i]); + label.emplace_back(label_data[i]); + ret = autodis->Process(ids, wts, label, initParam, pred, probs); + if (ret !=APP_ERR_OK) { + LogError << "autodis process failed, ret=" << ret << "."; + autodis->DeInit(); + return ret; + } + } + + std::string output_dir = "./output"; + std::string output_file = "result.txt"; + WriteResult(output_dir, output_file, label_data, probs, pred); + + 
float infer_total_time = autodis->GetInferCostMilliSec()/1000; + float acc = get_acc(pred, label_data); + float auc = get_auc(probs, label_data); + output_file = "metric.txt"; + WriteMetric(output_dir, output_file, data_row, infer_total_time, acc, auc); + + LogInfo << "<<==========Infer Metric==========>>"; + LogInfo << "Number of samples:" + std::to_string(data_row); + LogInfo << "Infer total time:" + std::to_string(infer_total_time); + LogInfo << "Average infer time:" + std::to_string(infer_total_time/data_row); + LogInfo << "Infer acc:"+ std::to_string(acc); + LogInfo << "Infer auc:"+ std::to_string(auc); + LogInfo << "<<================================>>"; + + autodis->DeInit(); + return APP_ERR_OK; +} diff --git a/research/recommend/autodis/infer/sdk/main.py b/research/recommend/autodis/infer/sdk/main.py new file mode 100644 index 0000000000000000000000000000000000000000..1c5f488def2a6278e9f865adc65e8f6602000f72 --- /dev/null +++ b/research/recommend/autodis/infer/sdk/main.py @@ -0,0 +1,254 @@ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +sample script of autodis infer using SDK run in docker +""" + +import argparse +import os +import time + +import MxpiDataType_pb2 as MxpiDataType +import numpy as np +from StreamManagerApi import StreamManagerApi, MxDataInput, InProtobufVector, \ + MxProtobufIn, StringVector + +def parse_args(): + """set and check parameters.""" + parser = argparse.ArgumentParser(description='autodis process') + parser.add_argument('--data_dir', type=str, default='../data/input', help='Data path') + parser.add_argument('--ids_file', type=str, default='ids') + parser.add_argument('--wts_file', type=str, default='wts') + parser.add_argument('--label_file', type=str, default='label') + parser.add_argument('--input_format', type=str, default='bin') + parser.add_argument('--output_dir', type=str, default='./output', help='Data path') + parser.add_argument('--pipeline', type=str, default='../data/config/autodis.pipeline', help='SDK infer pipeline') + parser.add_argument('--dense_dim', type=int, default=13) + parser.add_argument('--slot_dim', type=int, default=26) + args_opt = parser.parse_args() + return args_opt + +args = parse_args() + +def send_source_data(appsrc_id, file_name, file_data, stream_name, stream_manager, shape, tp): + """ + Construct the input of the stream, + send inputs data to a specified stream based on streamName. 
+
+    Returns:
+        bool: send data success or not
+    """
+    tensors = np.array(file_data, dtype=tp).reshape(shape)
+    tensor_package_list = MxpiDataType.MxpiTensorPackageList()
+    tensor_package = tensor_package_list.tensorPackageVec.add()
+    data_input = MxDataInput()
+    tensor_vec = tensor_package.tensorVec.add()
+    tensor_vec.deviceId = 0
+    tensor_vec.memType = 0
+    for i in tensors.shape:
+        tensor_vec.tensorShape.append(i)
+    array_bytes = tensors.tobytes()
+    data_input.data = array_bytes
+    tensor_vec.dataStr = data_input.data
+    tensor_vec.tensorDataSize = len(array_bytes)
+    key = "appsrc{}".format(appsrc_id).encode('utf-8')
+    protobuf_vec = InProtobufVector()
+    protobuf = MxProtobufIn()
+    protobuf.key = key
+    protobuf.type = b'MxTools.MxpiTensorPackageList'
+    protobuf.protobuf = tensor_package_list.SerializeToString()
+    protobuf_vec.push_back(protobuf)
+    ret = stream_manager.SendProtobuf(stream_name, appsrc_id, protobuf_vec)
+    if ret < 0:
+        print("Failed to send data to stream.")
+        return False
+    print("Send successfully!")
+    return True

+def send_appsrc_data(appsrc_id, file_name, file_data, stream_name, stream_manager, shape, tp):
+    """
+    send three streams to the infer model: input ids, weights and label.
+
+    Returns:
+        bool: send data success or not
+    """
+    if not send_source_data(appsrc_id, file_name, file_data, stream_name, stream_manager, shape, tp):
+        return False
+    return True

+def get_acc(labels, preds):
+    """Accuracy"""
+    accuracy = np.sum(labels == preds) / len(labels)
+    return accuracy

+def post_process(infer_result):
+    """
+    process the result of infer tensor to Visualization results.
+    Args:
+        infer_result: get logit from infer result
+    """
+    result = MxpiDataType.MxpiTensorPackageList()
+    result.ParseFromString(infer_result[0].messageBuf)
+    res = np.frombuffer(result.tensorPackageVec[0].tensorVec[1].dataStr, dtype=np.float32)
+    res = res.reshape((-1,))
+    label = np.frombuffer(result.tensorPackageVec[0].tensorVec[2].dataStr, dtype=np.float32)
+    label = label.reshape((-1,))
+    pred_label = np.round(res)
+    return int(label[0]), res[0], int(pred_label[0])

+def get_auc(labels, preds, n_bins=10000):
+    """ROC_AUC"""
+    positive_len = sum(labels)
+    negative_len = len(labels) - positive_len
+    total_case = positive_len * negative_len
+    if total_case == 0:
+        return 0
+    pos_histogram = [0 for _ in range(n_bins+1)]
+    neg_histogram = [0 for _ in range(n_bins+1)]
+    bin_width = 1.0 / n_bins
+    for i in range(len(labels)):
+        nth_bin = int(preds[i]/bin_width)
+        if labels[i] == 1:
+            pos_histogram[nth_bin] += 1
+        else:
+            neg_histogram[nth_bin] += 1
+    accumulated_neg = 0
+    satisfied_pair = 0
+    for i in range(n_bins+1):
+        satisfied_pair += (pos_histogram[i] * accumulated_neg + pos_histogram[i] * neg_histogram[i] * 0.5)
+        accumulated_neg += neg_histogram[i]
+    return satisfied_pair / float(total_case)

+def run():
+    """
+    read pipeline and do infer
+    """
+    # init stream manager
+    stream_manager_api = StreamManagerApi()
+    ret = stream_manager_api.InitManager()
+    if ret != 0:
+        print("Failed to init Stream manager, ret=%s" % str(ret))
+        exit()
+
+    # create streams by pipeline config file
+    with open(os.path.realpath(args.pipeline), 'rb') as f:
+        pipeline_str = f.read()
+    ret = stream_manager_api.CreateMultipleStreams(pipeline_str)
+    if ret != 0:
+        print("Failed to create Stream, ret=%s" % str(ret))
+        exit()
+
+    # preprocess data
+    if args.input_format == 'txt':
+        ids_data = np.loadtxt(os.path.join(args.data_dir, args.ids_file+"."+args.input_format), delimiter="\t")
+        wts_data = 
np.loadtxt(os.path.join(args.data_dir, args.wts_file+"."+args.input_format), delimiter="\t") + label_data = np.loadtxt(os.path.join(args.data_dir, args.label_file+"."+args.input_format), delimiter="\t") + else: + ids_data = np.fromfile(os.path.join(args.data_dir, args.ids_file+"."+args.input_format), dtype=np.int32) + ids_data.shape = -1, 39 + wts_data = np.fromfile(os.path.join(args.data_dir, args.wts_file+"."+args.input_format), dtype=np.float32) + wts_data.shape = -1, 39 + label_data = np.fromfile(os.path.join(args.data_dir, args.label_file+"."+args.input_format), dtype=np.float32) + label_data.shape = -1, 1 + + if(ids_data.shape[0] != wts_data.shape[0] or wts_data.shape[0] != label_data.shape[0]): + print("number of input data not completely equal") + exit() + rows = label_data.shape[0] + + # statistical variable + labels = [] + probs = [] + preds = [] + infer_total_time = 0 + + # write predict label + if not os.path.exists(args.output_dir): + os.mkdir(args.output_dir) + fo = open(os.path.join(args.output_dir, "result.txt"), "w") + fo.write("label\tprob\tpred\n") + for i in range(rows): + # fetch data + ids = ids_data[i] + wts = wts_data[i] + label = label_data[i] + + # data shape + ids_shape = (-1, args.dense_dim+args.slot_dim) + wts_shape = (-1, args.dense_dim+args.slot_dim) + label_shape = (-1, 1) + + # data type + ids_type = np.int32 + wts_type = np.float32 + label_type = np.float32 + + # send data + stream_name = b'autodis' + if not send_appsrc_data(0, "ids", ids, stream_name, stream_manager_api, ids_shape, ids_type): + return + if not send_appsrc_data(1, "wts", wts, stream_name, stream_manager_api, wts_shape, wts_type): + return + if not send_appsrc_data(2, "label", label, stream_name, stream_manager_api, label_shape, label_type): + return + + # Obtain the inference result by specifying streamName and uniqueId. + key_vec = StringVector() + key_vec.push_back(b'mxpi_tensorinfer0') + start_time = time.time() + infer_result = stream_manager_api.GetProtobuf(stream_name, 0, key_vec) + infer_total_time += time.time() - start_time + if infer_result.size() == 0: + print("inferResult is null") + return + if infer_result[0].errorCode != 0: + print("GetProtobuf error. 
errorCode=%d" % (infer_result[0].errorCode)) + return + + # updata variable + label_, prob_, pred_ = post_process(infer_result) + label_ = label + labels.append(label_) + probs.append(prob_) + preds.append(pred_) + + # write predict label + fo.write(str(label_)+"\t"+str(prob_)+"\t"+str(pred_)+"\n") + + labels = np.array(labels) + probs = np.array(probs) + preds = np.array(preds) + infer_acc = get_acc(labels, preds) + infer_auc = get_auc(labels, probs) + fo1 = open(os.path.join(args.output_dir, "metric.txt"), "w") + fo1.write("Number of samples:%d\n"%(rows)) + fo1.write("Infer total time:%f\n"%(infer_total_time)) + fo1.write("Average infer time:%f\n"%(infer_total_time/rows)) + fo1.write("Infer acc:%f\n"%(infer_acc)) + fo1.write("Infer auc:%f\n"%(infer_auc)) + fo.close() + fo1.close() + print('<<======== Infer Metric ========>>') + print("Number of samples:%d"%(rows)) + print("Infer total time:%f"%(infer_total_time)) + print("Average infer time:%f\n"%(infer_total_time/rows)) + print("Infer acc:%f"%(infer_acc)) + print("infer auc:%f"%(infer_auc)) + print('<<===============================>>') + stream_manager_api.DestroyAllStreams() + +if __name__ == '__main__': + run() diff --git a/research/recommend/autodis/infer/sdk/prec/calc_metric.py b/research/recommend/autodis/infer/sdk/prec/calc_metric.py new file mode 100644 index 0000000000000000000000000000000000000000..8eab4d113df9b622ad359ea18c4c54a31defe50f --- /dev/null +++ b/research/recommend/autodis/infer/sdk/prec/calc_metric.py @@ -0,0 +1,75 @@ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================

+"""
+sample script of autodis calculating metric
+"""

+import argparse
+import numpy as np

+def parse_args():
+    """set and check parameters."""
+    parser = argparse.ArgumentParser(description='calc metric')
+    parser.add_argument('--result_file', type=str, default='../output/result.txt')
+    parser.add_argument('--output_file', type=str, default='./metric.txt')
+    args_opt = parser.parse_args()
+    return args_opt

+def get_acc(labels, preds):
+    """Accuracy"""
+    accuracy = np.sum(labels == preds) / len(labels)
+    return accuracy

+def get_auc(labels, preds, n_bins=10000):
+    """ROC_AUC"""
+    positive_len = sum(labels)
+    negative_len = len(labels) - positive_len
+    total_case = positive_len * negative_len
+    if total_case == 0:
+        return 0
+    pos_histogram = [0 for _ in range(n_bins+1)]
+    neg_histogram = [0 for _ in range(n_bins+1)]
+    bin_width = 1.0 / n_bins
+    for i in range(len(labels)):
+        nth_bin = int(preds[i]/bin_width)
+        if labels[i] == 1:
+            pos_histogram[nth_bin] += 1
+        else:
+            neg_histogram[nth_bin] += 1
+    accumulated_neg = 0
+    satisfied_pair = 0
+    for i in range(n_bins+1):
+        satisfied_pair += (pos_histogram[i] * accumulated_neg + pos_histogram[i] * neg_histogram[i] * 0.5)
+        accumulated_neg += neg_histogram[i]
+
+    return satisfied_pair / float(total_case)

+def run():
+    """
+    calc metric
+    """
+    args = parse_args()
+    data = np.loadtxt(args.result_file, delimiter="\t", skiprows=1)
+    data.shape = -1, 3
+    acc = get_acc(data[:, 0], data[:, 2])
+    auc = get_auc(data[:, 0], data[:, 1])
+    fo = open(args.output_file, "w")
+    fo.write("Infer acc:{}\nInfer auc:{}".format(acc, auc))
+    fo.close()
+    print("Infer acc:{}\nInfer auc:{}".format(acc, auc))

+if __name__ == '__main__':
+    run()
diff --git a/research/recommend/autodis/infer/sdk/run.sh b/research/recommend/autodis/infer/sdk/run.sh
new file mode 100644
index 0000000000000000000000000000000000000000..8f9e455115ecf1a4e68e3c6f12c0a69300ef76ea
--- /dev/null
+++ b/research/recommend/autodis/infer/sdk/run.sh
@@ -0,0 +1,32 @@
+#!/bin/bash

+# Copyright 2021 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
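[editor note] The histogram AUC above trades exactness for O(n + n_bins) time: scores are bucketed into n_bins bins, concordant pairs are counted per bin, and ties inside a bin count as half. A sketch cross-checking it against the exact ROC AUC (scikit-learn is an assumed extra dependency, run from the same directory as calc_metric.py):

# auc_selfcheck.py -- illustrative cross-check of the binned AUC approximation
import numpy as np
from sklearn.metrics import roc_auc_score
from calc_metric import get_auc  # the file added above

rng = np.random.default_rng(0)
labels = rng.integers(0, 2, size=5000)
preds = 0.3 * labels + 0.7 * rng.random(5000)  # scores in [0, 1), correlated with labels
print("binned auc:", get_auc(labels, preds))
print("exact auc :", roc_auc_score(labels, preds))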
+ +set -e + +# Simple log helper functions +info() { echo -e "\033[1;34m[INFO ][MxStream] $1\033[1;37m" ; } +warn() { echo >&2 -e "\033[1;31m[WARN ][MxStream] $1\033[1;37m" ; } + +#export MX_SDK_HOME=/home/data/cz/app/mxManufacture +export LD_LIBRARY_PATH=${MX_SDK_HOME}/lib:${MX_SDK_HOME}/opensource/lib:${MX_SDK_HOME}/opensource/lib64:/usr/local/Ascend/ascend-toolkit/latest/acllib/lib64:${LD_LIBRARY_PATH} +export GST_PLUGIN_SCANNER=${MX_SDK_HOME}/opensource/libexec/gstreamer-1.0/gst-plugin-scanner +export GST_PLUGIN_PATH=${MX_SDK_HOME}/opensource/lib/gstreamer-1.0:${MX_SDK_HOME}/lib/plugins + +#to set PYTHONPATH, import the StreamManagerApi.py +export PYTHONPATH=$PYTHONPATH:${MX_SDK_HOME}/python + +python3.7 main.py +exit 0 diff --git a/research/recommend/autodis/infer/utils/prepare_txt.py b/research/recommend/autodis/infer/utils/prepare_txt.py new file mode 100644 index 0000000000000000000000000000000000000000..6a1766ad6b5b4763684d1069e2b36f7c83fc0588 --- /dev/null +++ b/research/recommend/autodis/infer/utils/prepare_txt.py @@ -0,0 +1,58 @@ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +""" +sample script of preparing txt for autodis infer +""" +import os +import argparse +import numpy as np +def parse_args(): + """set and check parameters.""" + parser = argparse.ArgumentParser(description="prepare txt") + parser.add_argument('--train_line_count', type=int, default=45840617) + parser.add_argument('--test_size', type=float, default=0.1) + parser.add_argument('--seed', type=int, default=2020) + parser.add_argument('--data_dir', type=str, default='../data/input/origin_data') + parser.add_argument('--dst_dir', type=str, default='../data/input/origin_data') + parser.add_argument('--data_input', type=str, default="train.txt") + parser.add_argument('--data_output', type=str, default="test.txt") + args, _ = parser.parse_known_args() + return args + +def run(): + """ + prepare txt data + """ + args = parse_args() + test_size = int(args.train_line_count * args.test_size) + all_indices = [i for i in range(args.train_line_count)] + np.random.seed(args.seed) + np.random.shuffle(all_indices) + print("all_indices.size:{}".format(len(all_indices))) + test_indices_set = set(all_indices[:test_size]) + print("test_indices_set.size:{}".format(len(test_indices_set))) + with open(os.path.join(args.data_dir, args.data_input), "r") as f: + fo = open(os.path.join(args.dst_dir, args.data_output), "w") + i = 0 + line = f.readline() + while line: + if i in test_indices_set: + fo.write(line) + i += 1 + line = f.readline() + fo.close() + +if __name__ == '__main__': + run() diff --git a/research/recommend/autodis/infer/utils/preprocess_mindrecord.py b/research/recommend/autodis/infer/utils/preprocess_mindrecord.py new file mode 100644 index 0000000000000000000000000000000000000000..2b0ae852266aeb3e8c1be7310bd7fbb59aaa674e --- /dev/null +++ 
b/research/recommend/autodis/infer/utils/preprocess_mindrecord.py
@@ -0,0 +1,66 @@
+# Copyright 2021 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""
+sample script of preprocessing mindrecord data for autodis infer
+"""
+import os
+import argparse
+import numpy as np
+import mindspore.dataset as ds

+parser = argparse.ArgumentParser(description='Process some integers.')
+parser.add_argument('--file_path',
+                    default='../data/input/mindrecord/test_input_part.mindrecord0',
+                    help='input file path')
+parser.add_argument('--dst_dir',
+                    default='../data/input',
+                    help='output folder')

+args = parser.parse_args()

+batch_size = 1000

+data_set = ds.MindDataset(args.file_path, columns_list=['feat_ids', 'feat_vals', 'label'],
+                          shuffle=False, num_parallel_workers=8)

+data_set = data_set.map(operations=(lambda x, y, z: (np.array(x).flatten().reshape(batch_size, 39),
+                                                     np.array(y).flatten().reshape(batch_size, 39),
+                                                     np.array(z).flatten().reshape(batch_size, 1))),
+                        input_columns=['feat_ids', 'feat_vals', 'label'],
+                        column_order=['feat_ids', 'feat_vals', 'label'],
+                        num_parallel_workers=8)
+d = data_set.create_dict_iterator()

+ids_arr = []
+vals_arr = []
+lab_arr = []
+count = 0

+for i, item in enumerate(d):
+    ids_arr.extend(list(item['feat_ids'].asnumpy()[:]))
+    vals_arr.extend(list(item['feat_vals'].asnumpy()[:]))
+    lab_arr.extend(list(item['label'].asnumpy()[:]))
+    count += batch_size

+print("Handled {} lines".format(count))

+ids_arr = np.array(ids_arr, dtype=np.int32)
+vals_arr = np.array(vals_arr, dtype=np.float32)
+lab_arr = np.array(lab_arr, dtype=np.float32)

+ids_arr.tofile(os.path.join(args.dst_dir, 'ids.bin'))
+vals_arr.tofile(os.path.join(args.dst_dir, 'wts.bin'))
+lab_arr.tofile(os.path.join(args.dst_dir, 'label.bin'))
diff --git a/research/recommend/autodis/infer/utils/preprocess_txt.py b/research/recommend/autodis/infer/utils/preprocess_txt.py
new file mode 100644
index 0000000000000000000000000000000000000000..7fd9e8e192970115641be927828ccb673fc03d87
--- /dev/null
+++ b/research/recommend/autodis/infer/utils/preprocess_txt.py
@@ -0,0 +1,139 @@
+# Copyright 2021 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
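[editor note] The three .bin files written above are raw flat buffers, so all shape information lives in the readers. A small sanity check, illustrative only (paths assume the default --dst_dir), mirroring how infer/sdk/main.py and the mxbase runner reload them:

# verify_bins.py -- illustrative check of the flat binary layout
import numpy as np

ids = np.fromfile("../data/input/ids.bin", dtype=np.int32).reshape(-1, 39)
wts = np.fromfile("../data/input/wts.bin", dtype=np.float32).reshape(-1, 39)
lab = np.fromfile("../data/input/label.bin", dtype=np.float32).reshape(-1, 1)
assert len(ids) == len(wts) == len(lab), "row counts diverge across the three files"
print("samples:", len(ids))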
+# ============================================================================
+"""
+sample script of preprocessing txt data for autodis infer
+"""
+import collections
+import pickle
+import os
+import argparse
+class StatsDict():
+    """preprocessed data"""

+    def __init__(self, field_size, dense_dim, slot_dim, skip_id_convert):
+        self.field_size = field_size
+        self.dense_dim = dense_dim
+        self.slot_dim = slot_dim
+        self.skip_id_convert = bool(skip_id_convert)
+
+        self.val_cols = ["val_{}".format(i + 1) for i in range(self.dense_dim)]
+        self.cat_cols = ["cat_{}".format(i + 1) for i in range(self.slot_dim)]
+
+        self.val_min_dict = {col: 0 for col in self.val_cols}
+        self.val_max_dict = {col: 0 for col in self.val_cols}
+
+        self.cat_count_dict = {col: collections.defaultdict(int) for col in self.cat_cols}
+
+        self.oov_prefix = "OOV"
+
+        self.cat2id_dict = {}
+        self.cat2id_dict.update({col: i for i, col in enumerate(self.val_cols)})
+        self.cat2id_dict.update(
+            {self.oov_prefix + col: i + len(self.val_cols) for i, col in enumerate(self.cat_cols)})

+    def load_dict(self, dict_path, prefix=""):
+        with open(os.path.join(dict_path, "{}val_max_dict.pkl".format(prefix)), "rb") as file_wrt:
+            self.val_max_dict = pickle.load(file_wrt)
+        with open(os.path.join(dict_path, "{}val_min_dict.pkl".format(prefix)), "rb") as file_wrt:
+            self.val_min_dict = pickle.load(file_wrt)
+        with open(os.path.join(dict_path, "{}cat_count_dict.pkl".format(prefix)), "rb") as file_wrt:
+            self.cat_count_dict = pickle.load(file_wrt)
+        print("val_max_dict.items()[:50]:{}".format(list(self.val_max_dict.items())[:50]))
+        print("val_min_dict.items()[:50]:{}".format(list(self.val_min_dict.items())[:50]))

+    def get_cat2id(self, threshold=100):
+        for key, cat_count_d in self.cat_count_dict.items():
+            new_cat_count_d = dict(filter(lambda x: x[1] > threshold, cat_count_d.items()))
+            for cat_str, _ in new_cat_count_d.items():
+                self.cat2id_dict[key + "_" + cat_str] = len(self.cat2id_dict)
+        print("cat2id_dict.size:{}".format(len(self.cat2id_dict)))
+        print("cat2id.dict.items()[:50]:{}".format(list(self.cat2id_dict.items())[:50]))

+    def map_cat2id(self, values, cats):
+        """Cat to id"""

+        def minmax_scale_value(i, val):
+            max_v = float(self.val_max_dict["val_{}".format(i + 1)])
+            return float(val) * 1.0 / max_v

+        id_list = []
+        weight_list = []
+        for i, val in enumerate(values):
+            if val == "":
+                id_list.append(i)
+                weight_list.append(0)
+            else:
+                key = "val_{}".format(i + 1)
+                id_list.append(self.cat2id_dict[key])
+                weight_list.append(minmax_scale_value(i, float(val)))

+        for i, cat_str in enumerate(cats):
+            key = "cat_{}".format(i + 1) + "_" + cat_str
+            if key in self.cat2id_dict:
+                if self.skip_id_convert is True:
+                    # For synthetic data, the generated ids fall in [0, max_vocab], but when the number of
+                    # examples is less than vocab_size / slot_nums the ids would still be remapped to
+                    # [0, real_vocab], where real_vocab is the actual vocab size rather than max_vocab.
+                    # A simple way to alleviate this is to skip the id conversion and keep the synthetic
+                    # id as the final id.
+ id_list.append(cat_str) + else: + id_list.append(self.cat2id_dict[key]) + else: + id_list.append(self.cat2id_dict[self.oov_prefix + "cat_{}".format(i + 1)]) + weight_list.append(1.0) + return id_list, weight_list + +def parse_args(): + """set and check parameters.""" + parser = argparse.ArgumentParser(description="autodis process") + parser.add_argument('--data_dir', type=str, default='../data/input/origin_data') + parser.add_argument('--dst_dir', type=str, default='../data/input') + parser.add_argument('--data_input', type=str, default="test.txt") + parser.add_argument('--dense_dim', type=int, default=13) + parser.add_argument('--slot_dim', type=int, default=26) + parser.add_argument("--skip_id_convert", type=int, default=0) + parser.add_argument("--threshold", type=int, default=100) + args, _ = parser.parse_known_args() + return args + +def run(): + """ + preprocessing txt data + """ + args = parse_args() + stats = StatsDict(field_size=args.dense_dim+args.slot_dim, dense_dim=args.dense_dim, \ + slot_dim=args.slot_dim, skip_id_convert=args.skip_id_convert) + stats.load_dict(dict_path="./stats_dict", prefix="") + stats.get_cat2id(threshold=args.threshold) + fi = open(os.path.join(args.data_dir, args.data_input), "r") + fo1 = open(os.path.join(args.dst_dir, "label.txt"), "w") + fo2 = open(os.path.join(args.dst_dir, "ids.txt"), "w") + fo3 = open(os.path.join(args.dst_dir, "wts.txt"), "w") + for line in fi: + line = line.strip("\n") + items = line.split("\t") + label = float(items[0]) + values = items[1:1 + args.dense_dim] + cats = items[1 + args.dense_dim:] + ids, wts = stats.map_cat2id(values, cats) + fo1.write(str(int(label))+"\n") + fo2.write("\t".join(str(id) for id in ids)+"\n") + fo3.write("\t".join(str(wt) for wt in wts)+"\n") + fo1.close() + fo2.close() + fo3.close() + +if __name__ == '__main__': + run() diff --git a/research/recommend/autodis/modelarts/train_start.py b/research/recommend/autodis/modelarts/train_start.py new file mode 100644 index 0000000000000000000000000000000000000000..303bc17c7429ce83a3948cfb485f9efb74429638 --- /dev/null +++ b/research/recommend/autodis/modelarts/train_start.py @@ -0,0 +1,169 @@ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
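[editor note] To make map_cat2id above concrete: every dense column val_i contributes its fixed slot id with weight val/max, and every categorical column contributes either a learned id or the per-column OOV id, always with weight 1.0. A toy walk-through with fabricated dictionaries (2 dense and 1 categorical column instead of the real 13 and 26):

# toy_encoding.py -- fabricated example of the id/weight encoding above
val_max = {"val_1": 100.0, "val_2": 50.0}                      # fabricated maxima
cat2id = {"val_1": 0, "val_2": 1, "OOVcat_1": 2, "cat_1_ab43": 3}
values, cats = ["20", ""], ["ab43"]

ids, wts = [], []
for i, val in enumerate(values):                               # dense features
    ids.append(cat2id["val_{}".format(i + 1)])
    wts.append(float(val) / val_max["val_{}".format(i + 1)] if val else 0.0)
for i, cat in enumerate(cats):                                 # categorical features
    ids.append(cat2id.get("cat_{}_{}".format(i + 1, cat), cat2id["OOVcat_{}".format(i + 1)]))
    wts.append(1.0)
print(ids, wts)  # [0, 1, 3] [0.2, 0.0, 1.0]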
+# ============================================================================
+"""train_criteo."""
+import os
+import sys
+import datetime
+import argparse
+import numpy as np

+from mindspore import context, Tensor
+from mindspore.context import ParallelMode
+from mindspore.communication.management import init, get_rank
+from mindspore.train.model import Model
+from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, TimeMonitor
+from mindspore.common import set_seed
+from mindspore.train.serialization import export, load_checkpoint

+from src.autodis import ModelBuilder, AUCMetric
+from src.dataset_modelarts import create_dataset, DataType
+from src.callback import EvalCallBack, LossCallBack
+from src.model_utils.moxing_adapter_modelarts import moxing_wrapper
+from src.model_utils.config_modelarts import config, train_config, data_config, model_config
+from src.model_utils.device_adapter_modelarts import get_device_id, get_device_num, get_rank_id

+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

+set_seed(1)

+def get_latest_ckpt():
+    '''get latest ckpt'''
+    ckpt_path = config.ckpt_path
+    ckpt_files = [ckpt_file for ckpt_file in os.listdir(ckpt_path) if ckpt_file.endswith(".ckpt")]
+    if not ckpt_files:
+        return None
+    latest_ckpt_file = sorted(ckpt_files)[-1]
+    return os.path.join(ckpt_path, latest_ckpt_file)

+def export_air_mindir():
+    '''export the trained network as AIR and MINDIR'''
+    ckpt_file = get_latest_ckpt()
+    if not ckpt_file:
+        print("Not found ckpt file")
+        return
+    config.ckpt_file = ckpt_file
+    config.file_name = os.path.join(config.ckpt_path, config.file_name)
+
+    print("starting export of AIR and MINDIR")
+
+    model_config.batch_size = 1
+    model_builder = ModelBuilder(model_config, train_config)
+    _, network = model_builder.get_train_eval_net()
+    network.set_train(False)
+
+    load_checkpoint(config.ckpt_file, net=network)
+
+    data_config.batch_size = 1
+    batch_ids = Tensor(np.zeros([data_config.batch_size, data_config.data_field_size]).astype(np.int32))
+    batch_wts = Tensor(np.zeros([data_config.batch_size, data_config.data_field_size]).astype(np.float32))
+    labels = Tensor(np.zeros([data_config.batch_size, 1]).astype(np.float32))
+    input_data = [batch_ids, batch_wts, labels]
+
+    config.file_format = "AIR"
+    export(network, *input_data, file_name=config.file_name, file_format=config.file_format)
+    config.file_format = "MINDIR"
+    export(network, *input_data, file_name=config.file_name, file_format=config.file_format)
+    # mox.file.copy(config.file_name+".air", config.ckpt_path+"/../autodis.air")


+def modelarts_pre_process():
+    '''modelarts pre process function.'''
+    config.train_data_dir = config.data_path
+    config.ckpt_path = os.path.join(config.output_path, config.ckpt_path)

+@moxing_wrapper(pre_process=modelarts_pre_process)
+def run_train():
+    '''train autodis and export the final checkpoint'''
+    parser = argparse.ArgumentParser(description='Autodis')
+    parser.add_argument('--data_url', type=str, default='')
+    parser.add_argument('--train_url', type=str, default='')
+    parser.add_argument('--train_epochs', type=int, default=15)
+    args_opt, _ = parser.parse_known_args()
+    train_config.train_epochs = args_opt.train_epochs
+    # get data_url and train_url
+    config.train_data_dir = args_opt.data_url
+    config.ckpt_path = config.train_url
+
+    # train function
+    config.do_eval = config.do_eval == 'True'
+    rank_size = get_device_num()
+    if rank_size > 1:
+        if config.device_target == "Ascend":
+            device_id = get_device_id()
+            context.set_context(mode=context.GRAPH_MODE, 
+
+def modelarts_pre_process():
+    '''modelarts pre process function.'''
+    config.train_data_dir = config.data_path
+    config.ckpt_path = os.path.join(config.output_path, config.ckpt_path)
+
+@moxing_wrapper(pre_process=modelarts_pre_process)
+def run_train():
+    '''Parse ModelArts arguments, set up the device context, train AutoDis and export the model.'''
+    parser = argparse.ArgumentParser(description='Autodis')
+    parser.add_argument('--data_url', type=str, default='')
+    parser.add_argument('--train_url', type=str, default='')
+    parser.add_argument('--train_epochs', type=int, default=15)
+    args_opt, _ = parser.parse_known_args()
+    train_config.train_epochs = args_opt.train_epochs
+    # get data_url and train_url
+    config.train_data_dir = args_opt.data_url
+    config.ckpt_path = config.train_url
+
+    # set up device context for single- or multi-device training
+    config.do_eval = config.do_eval == 'True'
+    rank_size = get_device_num()
+    if rank_size > 1:
+        if config.device_target == "Ascend":
+            device_id = get_device_id()
+            context.set_context(mode=context.GRAPH_MODE,
+                                device_target=config.device_target, device_id=device_id)
+            context.reset_auto_parallel_context()
+            context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True)
+            init()
+            rank_id = get_rank_id()
+        else:
+            print("Unsupported device_target ", config.device_target)
+            sys.exit(1)
+    else:
+        if config.device_target == "Ascend":
+            device_id = get_device_id()
+            context.set_context(mode=context.GRAPH_MODE, device_target=config.device_target, device_id=device_id)
+        else:
+            print("Unsupported device_target ", config.device_target)
+            sys.exit(1)
+        rank_size = None
+        rank_id = None
+
+    # create the training dataset (data_format 1 = mindrecord)
+    data_config.data_format = 1
+    ds_train = create_dataset(config.train_data_dir,
+                              train_mode=True,
+                              epochs=1,
+                              batch_size=train_config.batch_size,
+                              data_type=DataType(data_config.data_format),
+                              rank_size=rank_size,
+                              rank_id=rank_id)
+    print("ds_train.size: {}".format(ds_train.get_dataset_size()))
+
+    model_builder = ModelBuilder(model_config, train_config)
+    train_net, eval_net = model_builder.get_train_eval_net()
+    auc_metric = AUCMetric()
+    model = Model(train_net, eval_network=eval_net, metrics={"auc": auc_metric})
+
+    time_callback = TimeMonitor(data_size=ds_train.get_dataset_size())
+    loss_callback = LossCallBack(loss_file_path=config.loss_file_name)
+    callback_list = [time_callback, loss_callback]
+
+    if train_config.save_checkpoint:
+        config.ckpt_path = os.path.join(config.ckpt_path, datetime.datetime.now().strftime('%Y-%m-%d_%H_%M_%S'))
+        if rank_size:
+            train_config.ckpt_file_name_prefix = train_config.ckpt_file_name_prefix + str(get_rank())
+            config.ckpt_path = os.path.join(config.ckpt_path, 'ckpt_' + str(get_rank()) + '/')
+        config_ck = CheckpointConfig(save_checkpoint_steps=train_config.save_checkpoint_steps,
+                                     keep_checkpoint_max=train_config.keep_checkpoint_max)
+        ckpt_cb = ModelCheckpoint(prefix=train_config.ckpt_file_name_prefix,
+                                  directory=config.ckpt_path,
+                                  config=config_ck)
+        callback_list.append(ckpt_cb)
+
+    if config.do_eval:
+        ds_eval = create_dataset(config.train_data_dir, train_mode=False,
+                                 epochs=1,
+                                 batch_size=train_config.batch_size,
+                                 data_type=DataType(data_config.data_format))
+        eval_callback = EvalCallBack(model, ds_eval, auc_metric,
+                                     eval_file_path=config.eval_file_name)
+        callback_list.append(eval_callback)
+    model.train(train_config.train_epochs, ds_train, callbacks=callback_list)
+    export_air_mindir()
+
+if __name__ == '__main__':
+    run_train()
diff --git a/research/recommend/autodis/scripts/docker_start.sh b/research/recommend/autodis/scripts/docker_start.sh
new file mode 100644
index 0000000000000000000000000000000000000000..882c28e4926bafc36e30489091b068e1edb8dd2c
--- /dev/null
+++ b/research/recommend/autodis/scripts/docker_start.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+# Copyright 2021 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
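+
+# Usage (a sketch; the image name and host paths below are placeholders):
+#   bash docker_start.sh <docker_image> <data_dir> <model_dir>
+# e.g.
+#   bash docker_start.sh mindspore-ascend:latest /home/data /home/autodis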
+
+docker_image=$1
+data_dir=$2
+model_dir=$3
+
+docker run -it --ipc=host \
+           --device=/dev/davinci0 \
+           --device=/dev/davinci1 \
+           --device=/dev/davinci2 \
+           --device=/dev/davinci3 \
+           --device=/dev/davinci4 \
+           --device=/dev/davinci5 \
+           --device=/dev/davinci6 \
+           --device=/dev/davinci7 \
+           --device=/dev/davinci_manager \
+           --device=/dev/devmm_svm \
+           --device=/dev/hisi_hdc \
+           --privileged \
+           -v /usr/local/Ascend/driver:/usr/local/Ascend/driver \
+           -v /usr/local/Ascend/add-ons/:/usr/local/Ascend/add-ons \
+           -v "${data_dir}":"${data_dir}" \
+           -v "${model_dir}":"${model_dir}" \
+           -v /root/ascend/log:/root/ascend/log "${docker_image}" /bin/bash
diff --git a/research/recommend/autodis/src/dataset_modelarts.py b/research/recommend/autodis/src/dataset_modelarts.py
new file mode 100644
index 0000000000000000000000000000000000000000..0a223a049f01001ef5572d2962d91bf826299d0c
--- /dev/null
+++ b/research/recommend/autodis/src/dataset_modelarts.py
@@ -0,0 +1,298 @@
+# Copyright 2021 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""
+Create train or eval dataset.
+"""
+import os
+import math
+from enum import Enum
+
+import numpy as np
+import pandas as pd
+import mindspore.dataset as ds
+import mindspore.common.dtype as mstype
+
+from src.model_utils.config_modelarts import data_config as DataConfig
+
+
+class DataType(Enum):
+    """
+    Enumerate supported dataset formats.
+    """
+    MINDRECORD = 1
+    TFRECORD = 2
+    H5 = 3
+
+
+class H5Dataset():
+    """
+    Create dataset with H5 format.
+
+    Args:
+        data_path (str): Dataset directory.
+        train_mode (bool): Whether the dataset is used for training or evaluation (default=True).
+        train_num_of_parts (int): The number of train data files (default=21).
+        test_num_of_parts (int): The number of test data files (default=3).
+    """
+    max_length = 39
+
+    def __init__(self, data_path, train_mode=True,
+                 train_num_of_parts=DataConfig.train_num_of_parts,
+                 test_num_of_parts=DataConfig.test_num_of_parts):
+        self._hdf_data_dir = data_path
+        self._is_training = train_mode
+        if self._is_training:
+            self._file_prefix = 'train'
+            self._num_of_parts = train_num_of_parts
+        else:
+            self._file_prefix = 'test'
+            self._num_of_parts = test_num_of_parts
+        self.data_size = self._bin_count(self._hdf_data_dir, self._file_prefix, self._num_of_parts)
+        print("data_size: {}".format(self.data_size))
+
+    def _bin_count(self, hdf_data_dir, file_prefix, num_of_parts):
+        # count the total number of samples across all h5 parts
+        size = 0
+        for part in range(num_of_parts):
+            _y = pd.read_hdf(os.path.join(hdf_data_dir, f'{file_prefix}_output_part_{str(part)}.h5'))
+            size += _y.shape[0]
+        return size
+    def _iterate_hdf_files_(self, num_of_parts=None,
+                            shuffle_block=False):
+        """
+        Iterate over the hdf files (blocks). When the whole data set has been
+        consumed, the iterator restarts from the beginning, so the data stream
+        never stops.
+        :param num_of_parts: number of files
+        :param shuffle_block: shuffle the block files at every round
+        :return: input_hdf_file_name, output_hdf_file_name, finish_flag
+        """
+        parts = np.arange(num_of_parts)
+        while True:
+            if shuffle_block:
+                np.random.shuffle(parts)
+            for i, p in enumerate(parts):
+                yield os.path.join(self._hdf_data_dir, f'{self._file_prefix}_input_part_{str(p)}.h5'), \
+                      os.path.join(self._hdf_data_dir, f'{self._file_prefix}_output_part_{str(p)}.h5'), \
+                      i + 1 == len(parts)
+
+    def _generator(self, X, y, batch_size, shuffle=True):
+        """
+        Yield (X_batch, y_batch, finished) tuples; for internal use only.
+        :param X: input feature array
+        :param y: label array
+        :param batch_size: number of samples per batch
+        :param shuffle: if True, shuffle the sample order once per pass
+        :return:
+        """
+        number_of_batches = np.ceil(1. * X.shape[0] / batch_size)
+        counter = 0
+        finished = False
+        sample_index = np.arange(X.shape[0])
+        if shuffle:
+            np.random.shuffle(sample_index)
+        assert X.shape[0] > 0
+        while True:
+            batch_index = sample_index[batch_size * counter: batch_size * (counter + 1)]
+            X_batch = X[batch_index]
+            y_batch = y[batch_index]
+            counter += 1
+            yield X_batch, y_batch, finished
+            if counter == number_of_batches:
+                counter = 0
+                finished = True
+
+    def batch_generator(self, batch_size=1000,
+                        random_sample=False, shuffle_block=False):
+        """
+        Yield (ids, weights, label) batches read from the hdf file blocks.
+        :param batch_size: number of samples per batch
+        :param random_sample: if True, shuffle samples within a block
+        :param shuffle_block: shuffle the file blocks at every round
+        :return:
+        """
+
+        for hdf_in, hdf_out, _ in self._iterate_hdf_files_(self._num_of_parts,
+                                                           shuffle_block):
+            start = stop = None
+            X_all = pd.read_hdf(hdf_in, start=start, stop=stop).values
+            y_all = pd.read_hdf(hdf_out, start=start, stop=stop).values
+            data_gen = self._generator(X_all, y_all, batch_size,
+                                       shuffle=random_sample)
+            finished = False
+
+            while not finished:
+                X, y, finished = data_gen.__next__()
+                X_id = X[:, 0:self.max_length]
+                X_va = X[:, self.max_length:]
+                yield np.array(X_id.astype(dtype=np.int32)), \
+                      np.array(X_va.astype(dtype=np.float32)), \
+                      np.array(y.astype(dtype=np.float32))
+
+
+def _get_h5_dataset(directory, train_mode=True, epochs=1, batch_size=1000):
+    """
+    Get dataset with h5 format.
+
+    Args:
+        directory (str): Dataset directory.
+        train_mode (bool): Whether the dataset is used for training or evaluation (default=True).
+        epochs (int): Dataset epoch size (default=1).
+        batch_size (int): Dataset batch size (default=1000).
+
+    Returns:
+        Dataset.
+    """
+    data_para = {'batch_size': batch_size}
+    if train_mode:
+        data_para['random_sample'] = True
+        data_para['shuffle_block'] = True
+
+    h5_dataset = H5Dataset(data_path=directory, train_mode=train_mode)
+    numbers_of_batch = math.ceil(h5_dataset.data_size / batch_size)
+
+    def _iter_h5_data():
+        train_eval_gen = h5_dataset.batch_generator(**data_para)
+        for _ in range(0, numbers_of_batch, 1):
+            yield train_eval_gen.__next__()
+
+    data_set = ds.GeneratorDataset(_iter_h5_data, ["ids", "weights", "labels"])
+    data_set = data_set.repeat(epochs)
+    return data_set
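+
+# Note on the record readers below: each stored record is assumed to pack
+# `line_per_sample` samples, so the readers batch batch_size / line_per_sample
+# records and then reshape the flattened columns back to [batch_size, 39].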
+
+
+def _get_mindrecord_dataset(directory, train_mode=True, epochs=1, batch_size=1000,
+                            line_per_sample=1000, rank_size=None, rank_id=None):
+    """
+    Get dataset with mindrecord format.
+
+    Args:
+        directory (str): Dataset directory.
+        train_mode (bool): Whether the dataset is used for training or evaluation (default=True).
+        epochs (int): Dataset epoch size (default=1).
+        batch_size (int): Dataset batch size (default=1000).
+        line_per_sample (int): The number of samples per record (default=1000).
+        rank_size (int): The number of devices; not needed for a single device (default=None).
+        rank_id (int): Device id; not needed for a single device (default=None).
+
+    Returns:
+        Dataset.
+    """
+    file_prefix_name = 'train_input_part.mindrecord' if train_mode else 'test_input_part.mindrecord'
+    file_suffix_name = '00' if train_mode else '0'
+    shuffle = train_mode
+
+    if rank_size is not None and rank_id is not None:
+        data_set = ds.MindDataset(os.path.join(directory, file_prefix_name + file_suffix_name),
+                                  columns_list=['feat_ids', 'feat_vals', 'label'],
+                                  num_shards=rank_size, shard_id=rank_id, shuffle=shuffle,
+                                  num_parallel_workers=8)
+    else:
+        data_set = ds.MindDataset(os.path.join(directory, file_prefix_name + file_suffix_name),
+                                  columns_list=['feat_ids', 'feat_vals', 'label'],
+                                  shuffle=shuffle, num_parallel_workers=8)
+    data_set = data_set.batch(int(batch_size / line_per_sample), drop_remainder=True)
+    data_set = data_set.map(operations=(lambda x, y, z: (np.array(x).flatten().reshape(batch_size, 39),
+                                                         np.array(y).flatten().reshape(batch_size, 39),
+                                                         np.array(z).flatten().reshape(batch_size, 1))),
+                            input_columns=['feat_ids', 'feat_vals', 'label'],
+                            column_order=['feat_ids', 'feat_vals', 'label'],
+                            num_parallel_workers=8)
+    data_set = data_set.repeat(epochs)
+    return data_set
+ """ + dataset_files = [] + file_prefixt_name = 'train' if train_mode else 'test' + shuffle = train_mode + for (dir_path, _, filenames) in os.walk(directory): + for filename in filenames: + if file_prefixt_name in filename and 'tfrecord' in filename: + dataset_files.append(os.path.join(dir_path, filename)) + schema = ds.Schema() + schema.add_column('feat_ids', de_type=mstype.int32) + schema.add_column('feat_vals', de_type=mstype.float32) + schema.add_column('label', de_type=mstype.float32) + if rank_size is not None and rank_id is not None: + data_set = ds.TFRecordDataset(dataset_files=dataset_files, shuffle=shuffle, + schema=schema, num_parallel_workers=8, + num_shards=rank_size, shard_id=rank_id, + shard_equal_rows=True) + else: + data_set = ds.TFRecordDataset(dataset_files=dataset_files, shuffle=shuffle, + schema=schema, num_parallel_workers=8) + data_set = data_set.batch(int(batch_size / line_per_sample), drop_remainder=True) + data_set = data_set.map(operations=(lambda x, y, z: ( + np.array(x).flatten().reshape(batch_size, 39), + np.array(y).flatten().reshape(batch_size, 39), + np.array(z).flatten().reshape(batch_size, 1))), + input_columns=['feat_ids', 'feat_vals', 'label'], + column_order=['feat_ids', 'feat_vals', 'label'], + num_parallel_workers=8) + data_set = data_set.repeat(epochs) + return data_set + + +def create_dataset(directory, train_mode=True, epochs=1, batch_size=1000, + data_type=DataType.TFRECORD, line_per_sample=1000, + rank_size=None, rank_id=None): + """ + Get dataset. + + Args: + directory (str): Dataset directory. + train_mode (bool): Whether dataset is use for train or eval (default=True). + epochs (int): Dataset epoch size (default=1). + batch_size (int): Dataset batch size (default=1000). + data_type (DataType): The type of dataset which is one of H5, TFRECORE, MINDRECORD (default=TFRECORD). + line_per_sample (int): The number of sample per line (default=1000). + rank_size (int): The number of device, not necessary for single device (default=None). + rank_id (int): Id of device, not necessary for single device (default=None). + + Returns: + Dataset. + """ + if data_type == DataType.MINDRECORD: + return _get_mindrecord_dataset(directory, train_mode, epochs, + batch_size, line_per_sample, + rank_size, rank_id) + if data_type == DataType.TFRECORD: + return _get_tf_dataset(directory, train_mode, epochs, batch_size, + line_per_sample, rank_size=rank_size, rank_id=rank_id) + + if rank_size is not None and rank_size > 1: + raise ValueError('Please use mindrecord dataset.') + return _get_h5_dataset(directory, train_mode, epochs, batch_size) diff --git a/research/recommend/autodis/src/model_utils/config_modelarts.py b/research/recommend/autodis/src/model_utils/config_modelarts.py new file mode 100644 index 0000000000000000000000000000000000000000..00219a456c5cc89b3a92f13d31238a3400bfdef9 --- /dev/null +++ b/research/recommend/autodis/src/model_utils/config_modelarts.py @@ -0,0 +1,153 @@ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 diff --git a/research/recommend/autodis/src/model_utils/config_modelarts.py b/research/recommend/autodis/src/model_utils/config_modelarts.py
new file mode 100644
index 0000000000000000000000000000000000000000..00219a456c5cc89b3a92f13d31238a3400bfdef9
--- /dev/null
+++ b/research/recommend/autodis/src/model_utils/config_modelarts.py
@@ -0,0 +1,153 @@
+# Copyright 2021 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""Parse arguments"""
+
+import os
+import ast
+import argparse
+from pprint import pprint, pformat
+import yaml
+
+_config = "default_config.yaml"
+
+
+class Config:
+    """
+    Configuration namespace. Convert dictionary to members.
+    """
+    def __init__(self, cfg_dict):
+        for k, v in cfg_dict.items():
+            if isinstance(v, (list, tuple)):
+                setattr(self, k, [Config(x) if isinstance(x, dict) else x for x in v])
+            else:
+                setattr(self, k, Config(v) if isinstance(v, dict) else v)
+
+    def __str__(self):
+        return pformat(self.__dict__)
+
+    def __repr__(self):
+        return self.__str__()
+
+
+def parse_cli_to_yaml(parser, cfg, helper=None, choices=None, cfg_path=_config):
+    """
+    Parse command line arguments to the configuration according to the default yaml.
+
+    Args:
+        parser: Parent parser.
+        cfg: Base configuration.
+        helper: Helper description.
+        choices: Choice values for the arguments.
+        cfg_path: Path to the default yaml config.
+    """
+    parser = argparse.ArgumentParser(description="[REPLACE THIS at config.py]",
+                                     parents=[parser])
+    helper = {} if helper is None else helper
+    choices = {} if choices is None else choices
+    for item in cfg:
+        if not isinstance(cfg[item], list) and not isinstance(cfg[item], dict):
+            help_description = helper[item] if item in helper else "Please refer to {}".format(cfg_path)
+            choice = choices[item] if item in choices else None
+            if isinstance(cfg[item], bool):
+                parser.add_argument("--" + item, type=ast.literal_eval, default=cfg[item], choices=choice,
+                                    help=help_description)
+            else:
+                parser.add_argument("--" + item, type=type(cfg[item]), default=cfg[item], choices=choice,
+                                    help=help_description)
+    args, _ = parser.parse_known_args()
+    return args
+
+
+def parse_yaml(yaml_path):
+    """
+    Parse the yaml config file.
+
+    Args:
+        yaml_path: Path to the yaml config.
+    """
+    with open(yaml_path, 'r') as fin:
+        try:
+            cfgs = yaml.load_all(fin.read(), Loader=yaml.FullLoader)
+            cfgs = list(cfgs)
+            if len(cfgs) == 1:
+                cfg_helper = {}
+                cfg = cfgs[0]
+                cfg_choices = {}
+            elif len(cfgs) == 2:
+                cfg, cfg_helper = cfgs
+                cfg_choices = {}
+            elif len(cfgs) == 3:
+                cfg, cfg_helper, cfg_choices = cfgs
+            else:
+                raise ValueError("At most 3 docs (config, description for help, choices) are supported in config yaml")
+            print(cfg_helper)
+        except Exception as err:
+            raise ValueError("Failed to parse yaml") from err
+    return cfg, cfg_helper, cfg_choices
+
+
+def merge(args, cfg):
+    """
+    Merge the base config from yaml file and command line arguments.
+
+    Args:
+        args: Command line arguments.
+        cfg: Base configuration.
+    """
+    args_var = vars(args)
+    for item in args_var:
+        cfg[item] = args_var[item]
+    return cfg
+
+
+def extra_operations(cfg):
+    """
+    Do extra work on the Config object.
+
+    Args:
+        cfg: Object after instantiation of class 'Config'.
+    """
+    cfg.ModelConfig.batch_size = cfg.DataConfig.batch_size
+    cfg.ModelConfig.data_field_size = cfg.DataConfig.data_field_size
+    cfg.ModelConfig.data_vocab_size = cfg.DataConfig.data_vocab_size
+    cfg.TrainConfig.batch_size = cfg.DataConfig.batch_size
+
+
+def get_config():
+    """
+    Get Config according to the yaml file and cli arguments.
+ """ + parser = argparse.ArgumentParser(description="default name", add_help=False) + current_dir = os.path.dirname(os.path.abspath(__file__)) + parser.add_argument("--config_path", type=str, default=os.path.join(current_dir, "../../{}".format(_config)), + help="Config file path") + path_args, _ = parser.parse_known_args() + default, helper, choices = parse_yaml(path_args.config_path) + args = parse_cli_to_yaml(parser=parser, cfg=default, helper=helper, choices=choices, cfg_path=path_args.config_path) + final_config = merge(args, default) + pprint(final_config) + print("Please check the above information for the configurations", flush=True) + config_obj = Config(final_config) + extra_operations(config_obj) + return config_obj + + +config = get_config() +data_config = config.DataConfig +model_config = config.ModelConfig +train_config = config.TrainConfig + +if __name__ == '__main__': + print(config) diff --git a/research/recommend/autodis/src/model_utils/device_adapter_modelarts.py b/research/recommend/autodis/src/model_utils/device_adapter_modelarts.py new file mode 100644 index 0000000000000000000000000000000000000000..bc41f8a2ac239bfbf0b3d1e040b6783caa920d21 --- /dev/null +++ b/research/recommend/autodis/src/model_utils/device_adapter_modelarts.py @@ -0,0 +1,27 @@ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +"""Device adapter for ModelArts""" + +from src.model_utils.config_modelarts import config + +if config.enable_modelarts: + from src.model_utils.moxing_adapter import get_device_id, get_device_num, get_rank_id, get_job_id +else: + from src.model_utils.local_adapter import get_device_id, get_device_num, get_rank_id, get_job_id + +__all__ = [ + "get_device_id", "get_device_num", "get_rank_id", "get_job_id" +] diff --git a/research/recommend/autodis/src/model_utils/moxing_adapter_modelarts.py b/research/recommend/autodis/src/model_utils/moxing_adapter_modelarts.py new file mode 100644 index 0000000000000000000000000000000000000000..e828c1c9f9f2e781ecff273bbe41ce690d583fc4 --- /dev/null +++ b/research/recommend/autodis/src/model_utils/moxing_adapter_modelarts.py @@ -0,0 +1,123 @@ +# Copyright 2021 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================
+
+"""Moxing adapter for ModelArts"""
+
+import os
+import functools
+from mindspore import context
+from mindspore.profiler import Profiler
+from src.model_utils.config_modelarts import config
+
+_global_sync_count = 0
+
+def get_device_id():
+    device_id = os.getenv('DEVICE_ID', '0')
+    return int(device_id)
+
+
+def get_device_num():
+    device_num = os.getenv('RANK_SIZE', '1')
+    return int(device_num)
+
+
+def get_rank_id():
+    global_rank_id = os.getenv('RANK_ID', '0')
+    return int(global_rank_id)
+
+
+def get_job_id():
+    job_id = os.getenv('JOB_ID')
+    # fall back to "default" when JOB_ID is unset or empty
+    job_id = job_id if job_id else "default"
+    return job_id
+
+def sync_data(from_path, to_path):
+    """
+    Download data from a remote obs url to a local directory when from_path is
+    remote and to_path is local; upload from a local directory to remote obs
+    when the arguments are reversed.
+    """
+    # moxing is only available inside the ModelArts environment, so import lazily
+    import moxing as mox
+    import time
+    global _global_sync_count
+    sync_lock = "/tmp/copy_sync.lock" + str(_global_sync_count)
+    _global_sync_count += 1
+
+    # Each server contains at most 8 devices.
+    if get_device_id() % min(get_device_num(), 8) == 0 and not os.path.exists(sync_lock):
+        print("from path: ", from_path)
+        print("to path: ", to_path)
+        mox.file.copy_parallel(from_path, to_path)
+        print("===finish data synchronization===")
+        try:
+            os.mknod(sync_lock)
+        except IOError:
+            pass
+        print("===save flag===")
+
+    while True:
+        if os.path.exists(sync_lock):
+            break
+        time.sleep(1)
+
+    print("Finish sync data from {} to {}.".format(from_path, to_path))
+
+
+def moxing_wrapper(pre_process=None, post_process=None):
+    """
+    Moxing wrapper to download the dataset and upload the outputs.
+    """
+    def wrapper(run_func):
+        @functools.wraps(run_func)
+        def wrapped_func(*args, **kwargs):
+            # Download data from data_url
+            if config.enable_modelarts:
+                if config.data_url:
+                    sync_data(config.data_url, config.data_path)
+                    print("Dataset downloaded: ", os.listdir(config.data_path))
+                if config.checkpoint_url:
+                    sync_data(config.checkpoint_url, config.load_path)
+                    print("Preload downloaded: ", os.listdir(config.load_path))
+                if config.train_url:
+                    sync_data(config.train_url, config.output_path)
+                    print("Workspace downloaded: ", os.listdir(config.output_path))
+
+                context.set_context(save_graphs_path=os.path.join(config.output_path, str(get_rank_id())))
+                config.device_num = get_device_num()
+                config.device_id = get_device_id()
+                if not os.path.exists(config.output_path):
+                    os.makedirs(config.output_path)
+
+                if pre_process:
+                    pre_process()
+
+            if config.enable_profiling:
+                profiler = Profiler()
+
+            run_func(*args, **kwargs)
+
+            if config.enable_profiling:
+                profiler.analyse()
+
+            # Upload data to train_url
+            if config.enable_modelarts:
+                if post_process:
+                    post_process()
+
+                if config.train_url:
+                    print("Start to copy output directory")
+                    sync_data(config.output_path, config.train_url)
+        return wrapped_func
+    return wrapper
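+
+# Typical usage (a sketch; `main` and its body are illustrative, not part of this module):
+#   @moxing_wrapper(pre_process=my_pre_process)
+#   def main():
+#       ...  # training entry point, executed between the OBS download and upload steps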