diff --git a/official/recommend/naml/docker_start.sh b/official/recommend/naml/docker_start.sh
new file mode 100644
index 0000000000000000000000000000000000000000..ff4ec545577096e72691d0cdbaa4403e2aacadca
--- /dev/null
+++ b/official/recommend/naml/docker_start.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+
+# Copyright(C) 2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
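+# Usage: bash docker_start.sh <docker_image> <data_dir> <model_dir>
+# Maps the eight davinci NPU devices, the Ascend driver and the host NPU log
+# directories into the container.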
+docker_image=$1
+data_dir=$2
+model_dir=$3
+
+docker run -it --ipc=host \
+ --device=/dev/davinci0 \
+ --device=/dev/davinci1 \
+ --device=/dev/davinci2 \
+ --device=/dev/davinci3 \
+ --device=/dev/davinci4 \
+ --device=/dev/davinci5 \
+ --device=/dev/davinci6 \
+ --device=/dev/davinci7 \
+ --device=/dev/davinci_manager \
+ --device=/dev/devmm_svm --device=/dev/hisi_hdc \
+ -v /usr/local/Ascend/driver:/usr/local/Ascend/driver \
+ -v /usr/local/Ascend/add-ons/:/usr/local/Ascend/add-ons/ \
+    -v "${model_dir}":"${model_dir}" \
+    -v "${data_dir}":"${data_dir}" \
+ -v ~/ascend/log/npu/conf/slog/slog.conf:/var/log/npu/conf/slog/slog.conf \
+ -v ~/ascend/log/npu/slog/:/var/log/npu/slog -v ~/ascend/log/npu/profiling/:/var/log/npu/profiling \
+ -v ~/ascend/log/npu/dump/:/var/log/npu/dump -v ~/ascend/log/npu/:/usr/slog ${docker_image} \
+ /bin/bash
diff --git a/official/recommend/naml/infer/convert/convert.sh b/official/recommend/naml/infer/convert/convert.sh
new file mode 100644
index 0000000000000000000000000000000000000000..a81a59963df534736902b4f9a8a8c368303d83ca
--- /dev/null
+++ b/official/recommend/naml/infer/convert/convert.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+# Copyright 2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+
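+# Usage: bash convert.sh <air_path> <om_path>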
+air_path=$1
+om_path=$2
+
+echo "Input AIR file path: ${air_path}"
+echo "Output OM file path: ${om_path}"
+
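+# --framework=1 selects a MindSpore AIR model as the ATC input; the generated
+# OM file targets the Ascend 310 SoC.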
+atc --framework=1 --model="${air_path}" \
+ --output="${om_path}" \
+ --input_shape="x:1,1,16,48" \
+ --soc_version=Ascend310
+
\ No newline at end of file
diff --git a/official/recommend/naml/infer/data/config/news.pipeline b/official/recommend/naml/infer/data/config/news.pipeline
new file mode 100644
index 0000000000000000000000000000000000000000..a22de77a90b5a66be035d40f5c2b66f0edfff1f7
--- /dev/null
+++ b/official/recommend/naml/infer/data/config/news.pipeline
@@ -0,0 +1,53 @@
+{
+ "news_pipeline": {
+ "stream_config": {
+ "deviceId": "0"
+ },
+ "appsrc0": {
+ "props": {
+ "blocksize": "409600"
+ },
+ "factory": "appsrc",
+ "next": "mxpi_tensorinfer0:0"
+ },
+ "appsrc1": {
+ "props": {
+ "blocksize": "409600"
+ },
+ "factory": "appsrc",
+ "next": "mxpi_tensorinfer0:1"
+ },
+ "appsrc2": {
+ "props": {
+ "blocksize": "409600"
+ },
+ "factory": "appsrc",
+ "next": "mxpi_tensorinfer0:2"
+ },
+ "appsrc3": {
+ "props": {
+ "blocksize": "409600"
+ },
+ "factory": "appsrc",
+ "next": "mxpi_tensorinfer0:3"
+ },
+ "appsink0": {
+ "factory": "appsink"
+ },
+ "mxpi_tensorinfer0": {
+ "props": {
+ "dataSource": "appsrc0,appsrc1,appsrc2,appsrc3",
+ "modelPath": "../data/model/naml_news_encoder_bs_1.om"
+ },
+ "factory": "mxpi_tensorinfer",
+ "next": "mxpi_dataserialize"
+ },
+ "mxpi_dataserialize": {
+ "props": {
+ "outputDataKeys": "mxpi_tensorinfer0"
+ },
+ "factory": "mxpi_dataserialize",
+ "next": "appsink0"
+ }
+ }
+}
\ No newline at end of file
diff --git a/official/recommend/naml/infer/data/config/user.pipeline b/official/recommend/naml/infer/data/config/user.pipeline
new file mode 100644
index 0000000000000000000000000000000000000000..167e6cb9454881d29a06856ef746547cd77ab3e6
--- /dev/null
+++ b/official/recommend/naml/infer/data/config/user.pipeline
@@ -0,0 +1,32 @@
+{
+ "user_pipeline": {
+ "stream_config": {
+ "deviceId": "0"
+ },
+ "appsrc0": {
+ "props": {
+ "blocksize": "409600"
+ },
+ "factory": "appsrc",
+ "next": "mxpi_tensorinfer1"
+ },
+ "mxpi_tensorinfer1": {
+ "props": {
+ "dataSource": "appsrc0",
+ "modelPath": "../data/model/naml_user_encoder_bs_1.om"
+ },
+ "factory": "mxpi_tensorinfer",
+ "next": "mxpi_dataserialize0"
+ },
+ "mxpi_dataserialize0": {
+ "props": {
+ "outputDataKeys": "mxpi_tensorinfer1"
+ },
+ "factory": "mxpi_dataserialize",
+ "next": "appsink0"
+ },
+ "appsink0": {
+ "factory": "appsink"
+ }
+ }
+}
\ No newline at end of file
diff --git a/official/recommend/naml/infer/docker_start_infer.sh b/official/recommend/naml/infer/docker_start_infer.sh
new file mode 100644
index 0000000000000000000000000000000000000000..c69eae648c5cb61117d39f350af09363699436c7
--- /dev/null
+++ b/official/recommend/naml/infer/docker_start_infer.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# Copyright(C) 2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
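+# Usage: bash docker_start_infer.sh <docker_image> <model_dir>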
+docker_image=$1
+model_dir=$2
+
+if [ -z "${docker_image}" ]; then
+ echo "please input docker_image"
+ exit 1
+fi
+
+if [ ! -d "${model_dir}" ]; then
+    echo "model_dir does not exist or is not a directory: ${model_dir}"
+    exit 1
+fi
+
+docker run -it \
+ --device=/dev/davinci0 \
+ --device=/dev/davinci_manager \
+ --device=/dev/devmm_svm \
+ --device=/dev/hisi_hdc \
+ -v /usr/local/Ascend/driver:/usr/local/Ascend/driver \
+    -v "${model_dir}":"${model_dir}" \
+ ${docker_image} \
+ /bin/bash
\ No newline at end of file
diff --git a/official/recommend/naml/infer/mxbase/CMakeLists.txt b/official/recommend/naml/infer/mxbase/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..4b9cefde5ee084f71d8a2a3436fdbd94f03f9f37
--- /dev/null
+++ b/official/recommend/naml/infer/mxbase/CMakeLists.txt
@@ -0,0 +1,51 @@
+cmake_minimum_required(VERSION 3.10.0)
+project(naml)
+
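+# ASCEND_HOME and MX_SDK_HOME must be set in the environment; ASCEND_VERSION
+# and ARCH_PATTERN are optional (build.sh provides defaults).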
+set(TARGET naml)
+
+add_definitions(-DENABLE_DVPP_INTERFACE)
+add_definitions(-D_GLIBCXX_USE_CXX11_ABI=0)
+add_definitions(-Dgoogle=mindxsdk_private)
+add_compile_options(-std=c++11 -fPIE -fstack-protector-all -fPIC -Wall)
+add_link_options(-Wl,-z,relro,-z,now,-z,noexecstack -s -pie)
+
+# Check environment variable
+if(NOT DEFINED ENV{ASCEND_HOME})
+ message(FATAL_ERROR "please define environment variable:ASCEND_HOME")
+endif()
+if(NOT DEFINED ENV{ASCEND_VERSION})
+ message(WARNING "please define environment variable:ASCEND_VERSION")
+endif()
+if(NOT DEFINED ENV{ARCH_PATTERN})
+ message(WARNING "please define environment variable:ARCH_PATTERN")
+endif()
+set(ACL_INC_DIR $ENV{ASCEND_HOME}/$ENV{ASCEND_VERSION}/$ENV{ARCH_PATTERN}/acllib/include)
+set(ACL_LIB_DIR $ENV{ASCEND_HOME}/$ENV{ASCEND_VERSION}/$ENV{ARCH_PATTERN}/acllib/lib64)
+
+set(MXBASE_ROOT_DIR $ENV{MX_SDK_HOME})
+set(MXBASE_INC ${MXBASE_ROOT_DIR}/include)
+set(MXBASE_LIB_DIR ${MXBASE_ROOT_DIR}/lib)
+set(MXBASE_POST_LIB_DIR ${MXBASE_ROOT_DIR}/lib/modelpostprocessors)
+set(MXBASE_POST_PROCESS_DIR ${MXBASE_ROOT_DIR}/include/MxBase/postprocess/include)
+if(DEFINED ENV{MXSDK_OPENSOURCE_DIR})
+ set(OPENSOURCE_DIR $ENV{MXSDK_OPENSOURCE_DIR})
+else()
+ set(OPENSOURCE_DIR ${MXBASE_ROOT_DIR}/opensource)
+endif()
+
+include_directories(${ACL_INC_DIR})
+include_directories(${OPENSOURCE_DIR}/include)
+include_directories(${OPENSOURCE_DIR}/include/opencv4)
+
+include_directories(${MXBASE_INC})
+include_directories(${MXBASE_POST_PROCESS_DIR})
+
+link_directories(${ACL_LIB_DIR})
+link_directories(${OPENSOURCE_DIR}/lib)
+link_directories(${MXBASE_LIB_DIR})
+link_directories(${MXBASE_POST_LIB_DIR})
+
+add_executable(${TARGET} src/main.cpp src/Naml.cpp)
+target_link_libraries(${TARGET} glog cpprest mxbase opencv_world stdc++fs)
+
+install(TARGETS ${TARGET} RUNTIME DESTINATION ${PROJECT_SOURCE_DIR}/)
diff --git a/official/recommend/naml/infer/mxbase/build.sh b/official/recommend/naml/infer/mxbase/build.sh
new file mode 100644
index 0000000000000000000000000000000000000000..f32ef247e5079593757334d5c6ef4a796850aced
--- /dev/null
+++ b/official/recommend/naml/infer/mxbase/build.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+
+# Copyright 2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
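+# Usage: bash build.sh
+# Builds the naml executable and installs it into this directory.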
+
+path_cur=$(dirname "$0")
+
+function check_env()
+{
+ # set ASCEND_VERSION to ascend-toolkit/latest when it was not specified by user
+ if [ ! "${ASCEND_VERSION}" ]; then
+ export ASCEND_VERSION=ascend-toolkit/latest
+ echo "Set ASCEND_VERSION to the default value: ${ASCEND_VERSION}"
+ else
+ echo "ASCEND_VERSION is set to ${ASCEND_VERSION} by user"
+ fi
+
+ if [ ! "${ARCH_PATTERN}" ]; then
+ # set ARCH_PATTERN to ./ when it was not specified by user
+ export ARCH_PATTERN=./
+ echo "ARCH_PATTERN is set to the default value: ${ARCH_PATTERN}"
+ else
+ echo "ARCH_PATTERN is set to ${ARCH_PATTERN} by user"
+ fi
+}
+
+function build_naml()
+{
+    cd "$path_cur" || exit
+ rm -rf build
+ mkdir -p build
+ cd build
+ cmake ..
+ make
+ ret=$?
+ if [ ${ret} -ne 0 ]; then
+ echo "Failed to build naml."
+ exit ${ret}
+ fi
+ make install
+}
+
+check_env
+build_naml
diff --git a/official/recommend/naml/infer/mxbase/src/Naml.cpp b/official/recommend/naml/infer/mxbase/src/Naml.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..0ad240863b8e4a6ed73570e3b2c5748cd85811d5
--- /dev/null
+++ b/official/recommend/naml/infer/mxbase/src/Naml.cpp
@@ -0,0 +1,448 @@
+/**
+ * Copyright 2022 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Naml.h"
+
+#include <sys/stat.h>
+
+#include <fstream>
+#include <iostream>
+#include <sstream>
+#include <utility>
+#include <vector>
+
+#include "MxBase/DeviceManager/DeviceManager.h"
+#include "MxBase/Log/Log.h"
+
+APP_ERROR Naml::init(const InitParam & initParam) {
+ deviceId_ = initParam.deviceId;
+ APP_ERROR ret = MxBase::DeviceManager::GetInstance()->InitDevices();
+ if (ret != APP_ERR_OK) {
+ LogError << "Init devices failed, ret=" << ret << ".";
+ return ret;
+ }
+ ret = MxBase::TensorContext::GetInstance()->SetContext(initParam.deviceId);
+ if (ret != APP_ERR_OK) {
+ LogError << "Set context failed, ret=" << ret << ".";
+ return ret;
+ }
+
+ news_model_ = std::make_shared < MxBase::ModelInferenceProcessor >();
+ ret = news_model_->Init(initParam.newsmodelPath, news_modelDesc_);
+ if (ret != APP_ERR_OK) {
+ LogError << "news_model_ init failed, ret=" << ret << ".";
+ return ret;
+ }
+
+ user_model_ = std::make_shared < MxBase::ModelInferenceProcessor >();
+ ret = user_model_->Init(initParam.usermodelPath, user_modelDesc_);
+ if (ret != APP_ERR_OK) {
+ LogError << "user_model_ init failed, ret=" << ret << ".";
+ return ret;
+ }
+ return APP_ERR_OK;
+}
+
+APP_ERROR Naml::de_init() {
+ news_model_->DeInit();
+ user_model_->DeInit();
+ MxBase::DeviceManager::GetInstance()->DestroyDevices();
+ return APP_ERR_OK;
+}
+
+APP_ERROR Naml::process(const std::vector < std::string > & datapaths,
+const InitParam & initParam) {
+ APP_ERROR ret = init(initParam);
+ if (ret != APP_ERR_OK) {
+ LogError << "Naml init newsmodel failed, ret=" << ret << ".";
+ return ret;
+ }
+ std::vector < std::vector < std::string >> news_input;
+ std::string input_news_path = datapaths[0];
+ std::map < uint32_t, std::vector < float_t >> news_ret_map;
+ ret = readfile(input_news_path, news_input);
+ if (ret != APP_ERR_OK) {
+ LogError << "read datafile failed, ret=" << ret << ".";
+ return ret;
+ }
+ ret = news_process(news_input, news_ret_map);
+ if (ret != APP_ERR_OK) {
+ LogError << "news_process failed, ret=" << ret << ".";
+ return ret;
+ }
+
+ std::vector < std::vector < std::string >> user_input;
+ std::string input_user_path = datapaths[1];
+ std::map < uint32_t, std::vector < float_t >> user_ret_map;
+ ret = readfile(input_user_path, user_input);
+ if (ret != APP_ERR_OK) {
+ LogError << "read datafile failed, ret=" << ret << ".";
+ return ret;
+ }
+
+ ret = user_process(user_input, user_ret_map, news_ret_map);
+ if (ret != APP_ERR_OK) {
+ LogError << "user_process failed, ret=" << ret << ".";
+ return ret;
+ }
+
+ std::vector < std::vector < std::string >> eval_input;
+ std::string input_eval_path = datapaths[2];
+
+ ret = readfile(input_eval_path, eval_input);
+ if (ret != APP_ERR_OK) {
+ LogError << "read datafile failed, ret=" << ret << ".";
+ return ret;
+ }
+ ret = pred_process(eval_input, news_ret_map, user_ret_map);
+ if (ret != APP_ERR_OK) {
+ LogError << "pred_process failed, ret=" << ret << ".";
+ return ret;
+ }
+ return APP_ERR_OK;
+}
+
+void string2vector(std::vector < uint32_t > & vec, const std::string & str) {
+    std::istringstream sstr(str);
+    uint32_t num = 0;
+    // Checking the extraction itself avoids pushing a spurious element for
+    // trailing whitespace.
+    while (sstr >> num) {
+        vec.push_back(num);
+    }
+}
+
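+// Score each candidate news by the dot product of its news vector with the
+// user vector, then accumulate a per-impression AUC and report the mean.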
+APP_ERROR Naml::pred_process(std::vector < std::vector < std::string >> & eval_input,
+std::map < uint32_t, std::vector < float_t >> & news_ret_map,
+std::map < uint32_t, std::vector < float_t >> & user_ret_map) {
+ std::vector < float > vec_AUC;
+ for (size_t i = 0; i < eval_input.size() - 1; i++) {
+ std::vector < std::vector < float >> newsCandidate;
+ std::vector < uint32_t > candidateNewsIds;
+ string2vector(candidateNewsIds, eval_input[i][1]);
+ std::uint32_t candidateUid = 0;
+ std::istringstream uid(eval_input[i][0]);
+ uid >> candidateUid;
+ std::vector < float > predResult;
+ for (size_t j = 0; j < candidateNewsIds.size(); j++) {
+ auto it = news_ret_map.find(candidateNewsIds[j]);
+ if (it != news_ret_map.end()) {
+ newsCandidate.push_back(news_ret_map[candidateNewsIds[j]]);
+ }
+ }
+        auto user_it = user_ret_map.find(candidateUid);
+        for (size_t j = 0; j < newsCandidate.size(); j++) {
+            float dotMulResult = 0;
+            if (user_it != user_ret_map.end()) {
+                for (int k = 0; k < 400; ++k) {
+                    dotMulResult += newsCandidate[j][k] * user_it->second[k];
+                }
+            }
+            predResult.push_back(dotMulResult);
+        }
+ if (predResult.size() > 0) {
+ std::vector < uint32_t > labels;
+ string2vector(labels, eval_input[i][2]);
+ calcAUC(vec_AUC, predResult, labels);
+ LogInfo << "The pred processing :" << i << " / " << eval_input.size() - 1
+ << std::endl;
+ }
+ }
+ LogInfo << "The pred processing :" << eval_input.size() - 1 << " / "
+ << eval_input.size() - 1 << std::endl;
+    float ans = 0;
+    for (size_t i = 0; i < vec_AUC.size(); i++) {
+        ans += vec_AUC[i];
+    }
+    if (!vec_AUC.empty()) {
+        ans = ans / vec_AUC.size();
+    }
+ LogInfo << "The AUC is :" << ans << std::endl;
+ return APP_ERR_OK;
+}
+
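+// Pairwise AUC for one impression: each (positive, negative) pair contributes
+// 1 when the positive scores higher and 0.5 on ties, normalized by N * P.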
+void Naml::calcAUC(std::vector < float > & vec_auc, std::vector < float > & predResult,
+std::vector < uint32_t > & labels) {
+ int N = 0, P = 0;
+ std::vector < float > neg_prob;
+ std::vector < float > pos_prob;
+ for (size_t i = 0; i < labels.size(); i++) {
+ if (labels[i] == 1) {
+ P += 1;
+ pos_prob.push_back(predResult[i]);
+ } else {
+ N += 1;
+ neg_prob.push_back(predResult[i]);
+ }
+ }
+ float count = 0;
+ for (size_t i = 0; i < pos_prob.size(); i++) {
+ for (size_t j = 0; j < neg_prob.size(); j++) {
+ if (pos_prob[i] > neg_prob[j]) {
+ count += 1;
+ } else if (pos_prob[i] == neg_prob[j]) {
+ count += 0.5;
+ }
+ }
+ }
+    if (N == 0 || P == 0) {
+        // AUC is undefined for impressions without both positive and negative samples.
+        return;
+    }
+    vec_auc.push_back(count / (N * P));
+}
+
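+// Run the news encoder over every record of newsdata.csv and key each output
+// vector by news id.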
+APP_ERROR Naml::news_process(std::vector < std::vector < std::string >> & news_input,
+std::map < uint32_t, std::vector < float_t >> & news_ret_map) {
+ for (size_t i = 0; i < news_input.size() - 1; i++) {
+        std::vector < MxBase::TensorBase > inputs = {};
+        std::vector < MxBase::TensorBase > outputs = {};
+ APP_ERROR ret = read_news_inputs(news_input[i], &inputs);
+ if (ret != APP_ERR_OK) {
+ LogError << "get inputs failed, ret=" << ret << ".";
+ return ret;
+ }
+ ret = inference(inputs, &outputs, news_model_, news_modelDesc_);
+ if (ret != APP_ERR_OK) {
+ LogError << "Inference failed, ret=" << ret << ".";
+ return ret;
+ }
+ std::uint32_t nid = 0;
+ std::istringstream sstream(news_input[i][0]);
+ sstream >> nid;
+ ret = post_process(&outputs, &news_ret_map, nid);
+ if (ret != APP_ERR_OK) {
+ LogError << "post_process failed, ret=" << ret << ".";
+ return ret;
+ }
+ LogInfo << "The news model is processing :" << i << " / "
+ << news_input.size() - 1 << std::endl;
+ }
+ LogInfo << "The news model completes the task:" << news_input.size() - 1
+ << " / " << news_input.size() - 1 << std::endl;
+ return APP_ERR_OK;
+}
+
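+// Run the user encoder: each user's browsed-news history is assembled from
+// the news vectors computed above, and the output is keyed by uid.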
+APP_ERROR Naml::user_process(std::vector < std::vector < std::string >> & user_input,
+std::map < uint32_t, std::vector < float_t >> & user_ret_map,
+std::map < uint32_t, std::vector < float_t >> & news_ret_map) {
+ for (size_t i = 0; i < user_input.size() - 1; i++) {
+        std::vector < MxBase::TensorBase > inputs = {};
+        std::vector < MxBase::TensorBase > outputs = {};
+ APP_ERROR ret = read_user_inputs(user_input[i], news_ret_map, &inputs);
+ if (ret != APP_ERR_OK) {
+ LogError << "get inputs failed, ret=" << ret << ".";
+ return ret;
+ }
+ ret = inference(inputs, &outputs, user_model_, user_modelDesc_);
+ if (ret != APP_ERR_OK) {
+ LogError << "Inference failed, ret=" << ret << ".";
+ return ret;
+ }
+ std::uint32_t uid = 0;
+ std::istringstream sstream(user_input[i][0]);
+ sstream >> uid;
+ ret = post_process(&outputs, &user_ret_map, uid);
+ if (ret != APP_ERR_OK) {
+ LogError << "post_process failed, ret=" << ret << ".";
+ return ret;
+ }
+ LogInfo << "The user model is processing :" << i << " / "
+ << user_input.size() - 1 << std::endl;
+ }
+ LogInfo << "The user model completes the task:" << user_input.size() - 1
+ << " / " << user_input.size() - 1 << std::endl;
+ return APP_ERR_OK;
+}
+
+APP_ERROR Naml::readfile(const std::string & filepath,
+std::vector < std::vector < std::string >> & datastr) {
+ std::ifstream infile;
+ infile.open(filepath, std::ios_base::in);
+ if (infile.fail()) {
+ LogError << "Failed to open file: " << filepath << ".";
+ return APP_ERR_COMM_OPEN_FAIL;
+ }
+ std::string linestr;
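+    // Note: this loop appends one empty record after the last line, which is
+    // why the callers iterate up to size() - 1.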
+ while (infile.good()) {
+ std::getline(infile, linestr);
+ std::istringstream sstream(linestr);
+ std::vector < std::string > vecstr;
+ std::string str;
+ while (std::getline(sstream, str, ',')) {
+ vecstr.push_back(str);
+ }
+ datastr.push_back(vecstr);
+ }
+ infile.close();
+ return APP_ERR_OK;
+}
+
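+// Copy the first output tensor to the host and store its float values in the
+// result map under the given id.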
+APP_ERROR Naml::post_process(std::vector < MxBase::TensorBase > * outputs,
+std::map < uint32_t, std::vector < float_t >> * ret_map,
+uint32_t index) {
+ MxBase::TensorBase & tensor = outputs->at(0);
+ APP_ERROR ret = tensor.ToHost();
+ if (ret != APP_ERR_OK) {
+ LogError << GetError(ret) << "Tensor deploy to host failed.";
+ return ret;
+ }
+ // check tensor is available
+ auto outputShape = tensor.GetShape();
+ uint32_t length = outputShape[0];
+ uint32_t classNum = outputShape[1];
+ // LogInfo << "output shape is: " << outputShape[0] << " " << outputShape[1]
+ // << std::endl;
+ void * data = tensor.GetBuffer();
+ for (uint32_t i = 0; i < length; i++) {
+        std::vector < float_t > result = {};
+ for (uint32_t j = 0; j < classNum; j++) {
+ float_t value = *(reinterpret_cast < float_t * > (data) + i * classNum + j);
+ result.push_back(value);
+ }
+ ret_map->insert(std::pair < uint32_t, std::vector < float_t >>(index, result));
+ }
+ return APP_ERR_OK;
+}
+
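+// Allocate device tensors for the model outputs according to the model
+// description, then run static-batch inference.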
+APP_ERROR Naml::inference(const std::vector < MxBase::TensorBase > & inputs,
+std::vector < MxBase::TensorBase > * outputs,
+std::shared_ptr < MxBase::ModelInferenceProcessor > & model_,
+MxBase::ModelDesc desc) {
+ auto dtypes = model_->GetOutputDataType();
+ for (size_t i = 0; i < desc.outputTensors.size(); ++i) {
+        std::vector < uint32_t > shape = {};
+ for (size_t j = 0; j < desc.outputTensors[i].tensorDims.size(); ++j) {
+ shape.push_back((uint32_t)desc.outputTensors[i].tensorDims[j]);
+ }
+ MxBase::TensorBase tensor(shape, dtypes[i],
+ MxBase::MemoryData::MemoryType::MEMORY_DEVICE,
+ deviceId_);
+ APP_ERROR ret = MxBase::TensorBase::TensorBaseMalloc(tensor);
+ if (ret != APP_ERR_OK) {
+ LogError << "TensorBaseMalloc failed, ret=" << ret << ".";
+ return ret;
+ }
+ outputs->push_back(tensor);
+ }
+    MxBase::DynamicInfo dynamicInfo = {};
+ dynamicInfo.dynamicType = MxBase::DynamicType::STATIC_BATCH;
+ APP_ERROR ret = model_->ModelInference(inputs, *outputs, dynamicInfo);
+ if (ret != APP_ERR_OK) {
+ LogError << "ModelInference failed, ret=" << ret << ".";
+ return ret;
+ }
+ return APP_ERR_OK;
+}
+
+APP_ERROR Naml::read_user_inputs(std::vector < std::string > & datavec,
+std::map < uint32_t, std::vector < float_t >> & news_ret_map,
+std::vector < MxBase::TensorBase > * inputs) {
+    uint32_t history[50] = {0};
+ read2arr(datavec[1], history);
+ std::vector < std::vector < float_t >> userdata;
+ for (size_t i = 0; i < 50; i++) {
+ auto it = news_ret_map.find(history[i]);
+ if (it != news_ret_map.end()) {
+ userdata.push_back(news_ret_map[history[i]]);
+ }
+ }
+ input_user_tensor(inputs, 0, userdata);
+ return APP_ERR_OK;
+}
+
+APP_ERROR Naml::read_news_inputs(std::vector < std::string > & datavec,
+std::vector < MxBase::TensorBase > * inputs) {
+    int length[] = {1, 1, 16, 48};
+ for (int i = 0; i < 4; i++) {
+        // A std::vector avoids the variable-length array, which is a
+        // non-standard compiler extension.
+        std::vector < uint32_t > data(length[i], 0);
+        read2arr(datavec[i + 1], data.data());
+        APP_ERROR ret = input_news_tensor(inputs, i, data.data(), length[i]);
+ if (ret != APP_ERR_OK) {
+ LogError << "input array failed.";
+ return ret;
+ }
+ }
+ return APP_ERR_OK;
+}
+
+APP_ERROR Naml::read2arr(std::string & datastr, std::uint32_t * arr) {
+    std::istringstream str(datastr);
+    // The caller must supply a buffer large enough for every number in the
+    // string; stop as soon as extraction fails.
+    while (str >> *arr) {
+        arr++;
+    }
+    return APP_ERR_OK;
+}
+
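+// Pack up to 50 browsed-news vectors into a zero-padded 1 x 50 x 400 float32
+// device tensor.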
+APP_ERROR Naml::input_user_tensor(std::vector < MxBase::TensorBase > * inputs,
+uint8_t index,
+std::vector < std::vector < float_t >> & userdata) {
+    float_t data[50][400] = {};
+ for (size_t i = 0; i < userdata.size(); i++) {
+ for (size_t j = 0; j < 400; j++) {
+ data[i][j] = userdata[i][j];
+ }
+ }
+ const uint32_t dataSize = user_modelDesc_.inputTensors[index].tensorSize;
+ MxBase::MemoryData memoryDataDst(dataSize, MxBase::MemoryData::MEMORY_DEVICE,
+ deviceId_);
+ MxBase::MemoryData memoryDataSrc(reinterpret_cast < void * > (data), dataSize,
+ MxBase::MemoryData::MEMORY_HOST_MALLOC);
+ APP_ERROR ret =
+ MxBase::MemoryHelper::MxbsMallocAndCopy(memoryDataDst, memoryDataSrc);
+ if (ret != APP_ERR_OK) {
+ LogError << GetError(ret) << "Memory malloc and copy failed.";
+ return ret;
+ }
+    std::vector < uint32_t > shape = {1, 50, 400};
+ inputs->push_back(MxBase::TensorBase(memoryDataDst, false, shape,
+ MxBase::TENSOR_DTYPE_FLOAT32));
+ return APP_ERR_OK;
+}
+
+APP_ERROR Naml::input_news_tensor(std::vector < MxBase::TensorBase > * inputs,
+uint8_t index, uint32_t * data,
+uint32_t tensor_size) {
+ const uint32_t dataSize = news_modelDesc_.inputTensors[index].tensorSize;
+
+ MxBase::MemoryData memoryDataDst(dataSize, MxBase::MemoryData::MEMORY_DEVICE,
+ deviceId_);
+ MxBase::MemoryData memoryDataSrc(reinterpret_cast < void * > (data), dataSize,
+ MxBase::MemoryData::MEMORY_HOST_MALLOC);
+ APP_ERROR ret =
+ MxBase::MemoryHelper::MxbsMallocAndCopy(memoryDataDst, memoryDataSrc);
+ if (ret != APP_ERR_OK) {
+ LogError << GetError(ret) << "Memory malloc and copy failed.";
+ return ret;
+ }
+    std::vector < uint32_t > shape = {1, tensor_size};
+ inputs->push_back(MxBase::TensorBase(memoryDataDst, false, shape,
+ MxBase::TENSOR_DTYPE_UINT32));
+ return APP_ERR_OK;
+}
diff --git a/official/recommend/naml/infer/mxbase/src/Naml.h b/official/recommend/naml/infer/mxbase/src/Naml.h
new file mode 100644
index 0000000000000000000000000000000000000000..cc83520b43f62b8cc33f66d0f16955ed3b86cec7
--- /dev/null
+++ b/official/recommend/naml/infer/mxbase/src/Naml.h
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2022 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MXBASE_NAML_H
+#define MXBASE_NAML_H
+
+#include <string>
+#include <vector>
+#include <map>
+#include <memory>
+#include <opencv2/opencv.hpp>
+
+#include "MxBase/ModelInfer/ModelInferenceProcessor.h"
+#include "MxBase/Tensor/TensorContext/TensorContext.h"
+
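+// Device id, OM model paths and csv data paths consumed by Naml::process.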
+struct InitParam {
+ uint32_t deviceId;
+ std::string newsmodelPath;
+ std::string usermodelPath;
+ std::string newsDataPath;
+ std::string userDataPath;
+ std::string evalDataPath;
+};
+
+class Naml {
+ public:
+ APP_ERROR process(const std::vector < std::string > & datapaths,
+ const InitParam & initparam);
+ APP_ERROR inference(const std::vector < MxBase::TensorBase > & inputs,
+ std::vector < MxBase::TensorBase > * outputs,
+ std::shared_ptr < MxBase::ModelInferenceProcessor > & model_,
+ MxBase::ModelDesc desc);
+ APP_ERROR input_news_tensor(std::vector < MxBase::TensorBase > * inputs,
+ uint8_t index, uint32_t * data,
+ uint32_t tensor_size);
+ APP_ERROR input_user_tensor(std::vector < MxBase::TensorBase > * inputs,
+ uint8_t index,
+ std::vector < std::vector < float_t >> & userdata);
+ APP_ERROR de_init();
+ void calcAUC(std::vector < float > & vec_auc, std::vector < float > & predResult,
+ std::vector < uint32_t > & labels);
+ APP_ERROR readfile(const std::string & filepath,
+ std::vector < std::vector < std::string >> & datastr);
+ APP_ERROR read_news_inputs(std::vector < std::string > & datavec,
+ std::vector < MxBase::TensorBase > * inputs);
+ APP_ERROR read2arr(std::string & datastr, std::uint32_t * arr);
+ APP_ERROR pred_process(std::vector < std::vector < std::string >> & eval_input,
+ std::map < uint32_t, std::vector < float_t >> & news_ret_map,
+ std::map < uint32_t, std::vector < float_t >> & user_ret_map);
+ APP_ERROR news_process(std::vector < std::vector < std::string >> & news_input,
+ std::map < uint32_t, std::vector < float_t >> & news_ret_map);
+ APP_ERROR user_process(std::vector < std::vector < std::string >> & user_input,
+ std::map < uint32_t, std::vector < float_t >> & user_ret_map,
+ std::map < uint32_t, std::vector < float_t >> & news_ret_map);
+ APP_ERROR read_user_inputs(std::vector < std::string > & datavec,
+ std::map < uint32_t, std::vector < float_t >> & news_ret_map,
+ std::vector < MxBase::TensorBase > * inputs);
+ APP_ERROR post_process(std::vector < MxBase::TensorBase > * outputs,
+ std::map < uint32_t, std::vector < float_t >> * ret_map,
+ uint32_t index);
+ APP_ERROR init(const InitParam & initParam);
+
+ private:
+ std::shared_ptr < MxBase::ModelInferenceProcessor > news_model_;
+ std::shared_ptr < MxBase::ModelInferenceProcessor > user_model_;
+    MxBase::ModelDesc news_modelDesc_ = {};
+    MxBase::ModelDesc user_modelDesc_ = {};
+ uint32_t deviceId_ = 0;
+};
+#endif  // MXBASE_NAML_H
diff --git a/official/recommend/naml/infer/mxbase/src/main.cpp b/official/recommend/naml/infer/mxbase/src/main.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..7c69c4beeb444828e75cccf6e8791c0d6241da87
--- /dev/null
+++ b/official/recommend/naml/infer/mxbase/src/main.cpp
@@ -0,0 +1,52 @@
+/**
+ * Copyright 2022 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include "MxBase/Log/Log.h"
+#include "Naml.h"
+
+void InitSourceParam(InitParam* initParam, const std::string &model_dir) {
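+    // model_dir is concatenated directly with the OM file names below, so it
+    // must end with '/'.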
+ initParam->deviceId = 0;
+ initParam->newsmodelPath = model_dir + "naml_news_encoder_bs_1.om";
+ initParam->usermodelPath = model_dir + "naml_user_encoder_bs_1.om";
+ initParam->newsDataPath = "../data/mxbase_data/newsdata.csv";
+ initParam->userDataPath = "../data/mxbase_data/userdata.csv";
+ initParam->evalDataPath = "../data/mxbase_data/evaldata.csv";
+}
+
+int main(int argc, char const* argv[]) {
+    if (argc <= 1) {
+        LogError << "Please input the model dir, such as './data/model/'.";
+        return APP_ERR_COMM_INVALID_PARAM;
+    }
+ InitParam initParam;
+ std::string m_path = argv[1];
+ InitSourceParam(&initParam, m_path);
+ auto namlbase = std::make_shared < Naml >();
+ std::vector < std::string > datapaths;
+ datapaths.push_back(initParam.newsDataPath);
+ datapaths.push_back(initParam.userDataPath);
+ datapaths.push_back(initParam.evalDataPath);
+
+ APP_ERROR ret = namlbase->process(datapaths, initParam);
+ if (ret != APP_ERR_OK) {
+ LogError << "Naml process failed, ret=" << ret << ".";
+ return ret;
+ }
+ namlbase->de_init();
+ return 0;
+}
diff --git a/official/recommend/naml/infer/requirements.txt b/official/recommend/naml/infer/requirements.txt
new file mode 100644
index 0000000000000000000000000000000000000000..c1b032064f4e3bbf3b272d47854eee41bdd63aa9
--- /dev/null
+++ b/official/recommend/naml/infer/requirements.txt
@@ -0,0 +1,4 @@
+mindspore
+numpy
+scikit-learn
+pyyaml
diff --git a/official/recommend/naml/infer/sdk/main.py b/official/recommend/naml/infer/sdk/main.py
new file mode 100644
index 0000000000000000000000000000000000000000..c1ca3f4e61eb47ecc8aa3ab9c62e7eb07ac9b790
--- /dev/null
+++ b/official/recommend/naml/infer/sdk/main.py
@@ -0,0 +1,196 @@
+# Copyright 2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+import os
+
+import MxpiDataType_pb2 as MxpiDataType
+import numpy as np
+from StreamManagerApi import StreamManagerApi, MxDataInput, \
+ InProtobufVector, MxProtobufIn, StringVector
+
+from util.dataset import create_eval_dataset, EvalNews, EvalUsers, EvalCandidateNews, MINDPreprocess
+from util.config import config
+from util.utils import NAMLMetric
+
+
+def news_process(streamapi, news_pipeline_path, news_data):
+    """Run news-encoder inference over all news.
+
+    Returns:
+        dict: news id -> news vector
+    """
+ streamName = b'news_pipeline'
+ with open(news_pipeline_path, 'rb') as f:
+ pipelineStr = f.read()
+ streamapi.CreateMultipleStreams(pipelineStr)
+ iterator = news_data.create_dict_iterator(output_numpy=True)
+ tensors = []
+ news_dict = {}
+ news_dataset_size = news_data.get_dataset_size()
+ key_vec = StringVector()
+ key_vec.push_back(b'mxpi_tensorinfer0')
+
+ for count, data in enumerate(iterator):
+ tensors.clear()
+ tensors.append(data["category"])
+ tensors.append(data["subcategory"])
+ tensors.append(data["title"])
+ tensors.append(data["abstract"])
+ if not send_data(tensors, streamName, streamapi):
+ exit()
+ ret = streamapi.GetProtobuf(streamName, 0, key_vec)
+ if ret.size() == 0:
+ print("inferResult is null")
+ exit()
+ if ret[0].errorCode != 0:
+ print("GetProtobuf error. errorCode=%d" % ret[0].errorCode)
+ exit()
+ result = MxpiDataType.MxpiTensorPackageList()
+ result.ParseFromString(ret[0].messageBuf)
+ news_vec = np.frombuffer(result.tensorPackageVec[0].tensorVec[0].dataStr, dtype='<f4')
+        for nid in data["news_id"]:
+            news_dict[str(nid[0])] = news_vec
+ print(f"===Generate News vector==== [ {count} / {news_dataset_size} ]", end='\r')
+ print(f"===Generate News vector==== [ {news_dataset_size} / {news_dataset_size} ]")
+ return news_dict
+
+
+def user_process(streamapi, user_pipeline_path, user_data, news_process_ret):
+    """Run user-encoder inference over all users.
+
+    Returns:
+        dict: uid -> user vector
+    """
+ streamName = b'user_pipeline'
+ with open(user_pipeline_path, 'rb') as f:
+ pipelineStr = f.read()
+ streamapi.CreateMultipleStreams(pipelineStr)
+ user_data_size = user_data.get_dataset_size()
+ tensors = []
+ key_vec = StringVector()
+ key_vec.push_back(b'mxpi_tensorinfer1')
+ iterator = user_data.create_dict_iterator(output_numpy=True)
+ user_dict = {}
+ for count, data in enumerate(iterator):
+ tensors.clear()
+ browsed_news = []
+ for news in data["history"]:
+ news_list = []
+ for nid in news:
+ news_list.append(news_process_ret[str(nid[0])])
+ browsed_news.append(np.array(news_list))
+ browsed_news = np.array(browsed_news)
+ tensors.append(browsed_news)
+ if not send_data(tensors, streamName, streamapi):
+ exit()
+ ret = streamapi.GetProtobuf(streamName, 0, key_vec)
+ if ret.size() == 0:
+ print("inferResult is null")
+ exit()
+ if ret[0].errorCode != 0:
+ print("GetProtobuf error. errorCode=%d" % ret[0].errorCode)
+ exit()
+ result = MxpiDataType.MxpiTensorPackageList()
+ result.ParseFromString(ret[0].messageBuf)
+ user_vec = np.frombuffer(result.tensorPackageVec[0].tensorVec[0].dataStr, dtype='<f4')
+        for uid in data["uid"]:
+            user_dict[str(uid)] = user_vec
+ print(f"===Generate Users vector==== [ {count} / {user_data_size} ]", end='\r')
+ print(f"===Generate Users vector==== [ {user_data_size} / {user_data_size} ]")
+ streamapi.DestroyAllStreams()
+ return user_dict
+
+
+def create_dataset(mindpreprocess, datatype, batch_size):
+ """create_dataset"""
+ dataset = create_eval_dataset(mindpreprocess, datatype, batch_size)
+ return dataset
+
+
+def send_data(tensors, stream_name, stream_manager):
+ """
+ Construct the input of the stream,
+ send inputs data to a specified stream based on streamName.
+
+ Returns:
+ bool: send data success or not
+ """
+
+ inPluginId = 0
+ for tensor in tensors:
+ tensorPackageList = MxpiDataType.MxpiTensorPackageList()
+ tensorPackage = tensorPackageList.tensorPackageVec.add()
+ array_bytes = tensor.tobytes()
+ dataInput = MxDataInput()
+ dataInput.data = array_bytes
+ tensorVec = tensorPackage.tensorVec.add()
+ tensorVec.deviceId = 0
+ tensorVec.memType = 0
+ for i in tensor.shape:
+ tensorVec.tensorShape.append(i)
+ tensorVec.dataStr = dataInput.data
+ tensorVec.tensorDataSize = len(array_bytes)
+ key = "appsrc{}".format(inPluginId).encode('utf-8')
+ protobufVec = InProtobufVector()
+ protobuf = MxProtobufIn()
+ protobuf.key = key
+ protobuf.type = b'MxTools.MxpiTensorPackageList'
+ protobuf.protobuf = tensorPackageList.SerializeToString()
+ protobufVec.push_back(protobuf)
+ ret = stream_manager.SendProtobuf(stream_name, inPluginId, protobufVec)
+ inPluginId += 1
+ if ret != 0:
+ print("Failed to send data to stream.")
+ return False
+ return True
+
+
+def run_process():
+ """run naml model SDK process"""
+ if config.neg_sample == 4:
+ config.neg_sample = -1
+ if config.batch_size != 1:
+ config.batch_size = 1
+ config.embedding_file = os.path.join(config.dataset_path, config.embedding_file)
+ config.word_dict_path = os.path.join(config.dataset_path, config.word_dict_path)
+ config.category_dict_path = os.path.join(config.dataset_path, config.category_dict_path)
+ config.subcategory_dict_path = os.path.join(config.dataset_path, config.subcategory_dict_path)
+ config.uid2index_path = os.path.join(config.dataset_path, config.uid2index_path)
+ streamManagerApi = StreamManagerApi()
+ ret = streamManagerApi.InitManager()
+ if ret != 0:
+ print("Failed to init Stream manager, ret=%s" % str(ret))
+ exit()
+ news_pi_path = "../data/config/news.pipeline"
+ user_pi_path = "../data/config/user.pipeline"
+ mindpreprocess = MINDPreprocess(vars(config), dataset_path=os.path.join(config.dataset_path,
+ "MIND{}_dev".format(config.dataset)))
+ news_data = create_dataset(mindpreprocess, EvalNews, 1)
+ user_data = create_dataset(mindpreprocess, EvalUsers, 1)
+ news_dict = news_process(streamManagerApi, news_pi_path, news_data)
+ print("start to user_process")
+ user_dict = user_process(streamManagerApi, user_pi_path, user_data, news_dict)
+ print("start to metric")
+ eval_data = create_dataset(mindpreprocess, EvalCandidateNews, 1)
+ dataset_size = eval_data.get_dataset_size()
+ iterator = eval_data.create_dict_iterator(output_numpy=True)
+ metric = NAMLMetric()
+ for count, data in enumerate(iterator):
+ pred = np.dot(np.stack([news_dict[str(nid)] for nid in data["candidate_nid"]], axis=0),
+ user_dict[str(data["uid"])])
+ metric.update(pred, data["labels"])
+ print(f"===Click Prediction==== [ {count} / {dataset_size} ]", end='\r')
+ print(f"===Click Prediction==== [ {dataset_size} / {dataset_size} ]")
+ metric.eval()
+
+if __name__ == '__main__':
+ run_process()
diff --git a/official/recommend/naml/infer/sdk/util/__init__.py b/official/recommend/naml/infer/sdk/util/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/official/recommend/naml/infer/sdk/util/config.py b/official/recommend/naml/infer/sdk/util/config.py
new file mode 100644
index 0000000000000000000000000000000000000000..72ae8f02a6c5cd9b83b69c5f30e091718d7d6e80
--- /dev/null
+++ b/official/recommend/naml/infer/sdk/util/config.py
@@ -0,0 +1,131 @@
+# Copyright 2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+"""Parse arguments"""
+
+import os
+import ast
+import argparse
+from pprint import pprint, pformat
+import yaml
+
+
+class Config:
+ """
+ Configuration namespace. Convert dictionary to members.
+ """
+
+ def __init__(self, cfg_dict):
+ for k, v in cfg_dict.items():
+ if isinstance(v, (list, tuple)):
+ setattr(self, k, [Config(x) if isinstance(x, dict) else x for x in v])
+ else:
+ setattr(self, k, Config(v) if isinstance(v, dict) else v)
+
+ def __str__(self):
+ return pformat(self.__dict__)
+
+ def __repr__(self):
+ return self.__str__()
+
+
+def parse_cli_to_yaml(parser, cfg, helper=None, choices=None, cfg_path="default_config.yaml"):
+ """
+ Parse command line arguments to the configuration according to the default yaml.
+
+    Args:
+        parser: Parent parser.
+        cfg: Base configuration.
+        helper: Helper description.
+        choices: Choices for each argument.
+        cfg_path: Path to the default yaml config.
+    """
+ parser = argparse.ArgumentParser(description="[REPLACE THIS at config.py]",
+ parents=[parser])
+ helper = {} if helper is None else helper
+ choices = {} if choices is None else choices
+ for item in cfg:
+ if not isinstance(cfg[item], list) and not isinstance(cfg[item], dict):
+ help_description = helper[item] if item in helper else "Please reference to {}".format(cfg_path)
+ choice = choices[item] if item in choices else None
+ if isinstance(cfg[item], bool):
+ parser.add_argument("--" + item, type=ast.literal_eval, default=cfg[item], choices=choice,
+ help=help_description)
+ else:
+ parser.add_argument("--" + item, type=type(cfg[item]), default=cfg[item], choices=choice,
+ help=help_description)
+ args = parser.parse_args()
+ return args
+
+
+def parse_yaml(yaml_path):
+ """
+ Parse the yaml config file.
+
+ Args:
+ yaml_path: Path to the yaml config.
+ """
+ with open(yaml_path, 'r') as fin:
+ try:
+ cfgs = yaml.load_all(fin.read(), Loader=yaml.FullLoader)
+ cfgs = [x for x in cfgs]
+ if len(cfgs) == 1:
+ cfg_helper = {}
+ cfg = cfgs[0]
+ cfg_choices = {}
+ elif len(cfgs) == 2:
+ cfg, cfg_helper = cfgs
+ cfg_choices = {}
+ elif len(cfgs) == 3:
+ cfg, cfg_helper, cfg_choices = cfgs
+ else:
+ raise ValueError("At most 3 docs (config, description for help, choices) are supported in config yaml")
+ print(cfg_helper)
+        except yaml.YAMLError as err:
+            raise ValueError("Failed to parse yaml file: {}".format(yaml_path)) from err
+ return cfg, cfg_helper, cfg_choices
+
+
+def merge(args, cfg):
+ """
+ Merge the base config from yaml file and command line arguments.
+
+ Args:
+ args: Command line arguments.
+ cfg: Base configuration.
+ """
+ args_var = vars(args)
+ for item in args_var:
+ cfg[item] = args_var[item]
+ return cfg
+
+
+def get_config():
+ """
+ Get Config according to the yaml file and cli arguments.
+ """
+ parser = argparse.ArgumentParser(description="default name", add_help=False)
+ current_dir = os.path.dirname(os.path.abspath(__file__))
+ parser.add_argument("--config_path", type=str, default=os.path.join(current_dir, "../MINDlarge_config.yaml"),
+ help="Config file path")
+ path_args, _ = parser.parse_known_args()
+ default, helper, choices = parse_yaml(path_args.config_path)
+ args = parse_cli_to_yaml(parser=parser, cfg=default, helper=helper, choices=choices, cfg_path=path_args.config_path)
+ final_config = merge(args, default)
+ pprint(final_config)
+ print("Please check the above information for the configurations", flush=True)
+ return Config(final_config)
+
+
+config = get_config()
diff --git a/official/recommend/naml/infer/sdk/util/data2csv.py b/official/recommend/naml/infer/sdk/util/data2csv.py
new file mode 100644
index 0000000000000000000000000000000000000000..fe0a79987d2b83862c134e2f3f4be9ff86b6687a
--- /dev/null
+++ b/official/recommend/naml/infer/sdk/util/data2csv.py
@@ -0,0 +1,125 @@
+# Copyright 2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+
+import csv
+import os
+import stat
+
+from config import config
+from dataset import create_eval_dataset, EvalNews, EvalUsers, EvalCandidateNews, MINDPreprocess
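+# Converts the MIND dev set into the csv files consumed by the mxbase runner
+# (newsdata.csv, userdata.csv, evaldata.csv under infer/data/mxbase_data/).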
+
+
+def do_Convert(infer_path):
+ """ converter for mxbase dataset """
+ if config.neg_sample == 4:
+ config.neg_sample = -1
+ if config.batch_size != 1:
+ config.batch_size = 1
+ config.embedding_file = os.path.join(config.dataset_path, config.embedding_file)
+ config.word_dict_path = os.path.join(config.dataset_path, config.word_dict_path)
+ config.category_dict_path = os.path.join(config.dataset_path, config.category_dict_path)
+ config.subcategory_dict_path = os.path.join(config.dataset_path, config.subcategory_dict_path)
+ config.uid2index_path = os.path.join(config.dataset_path, config.uid2index_path)
+
+ mindpreprocess = MINDPreprocess(vars(config),
+ dataset_path=os.path.join(config.dataset_path,
+ "MIND{}_dev".format(config.dataset)))
+ news_data = create_dataset(mindpreprocess, EvalNews, 1)
+ user_data = create_dataset(mindpreprocess, EvalUsers, 1)
+ eval_data = create_dataset(mindpreprocess, EvalCandidateNews, 1)
+ create_newsdata2file(news_data, infer_path)
+ create_userdata2file(user_data, infer_path)
+ create_evaldata2file(eval_data, infer_path)
+ print("===create mxbase_data success====", end='\r')
+
+
+def create_newsdata2file(news_data, infer_path):
+ """ news_data convert to csv file """
+ iterator = news_data.create_dict_iterator(output_numpy=True)
+ news_dataset_size = news_data.get_dataset_size()
+ rows = []
+ for count, data in enumerate(iterator):
+ row = [data["news_id"][0][0], data["category"][0][0], data["subcategory"][0][0]]
+ title_num = " ".join([str(num) for num in data["title"][0]])
+ row.append(title_num)
+ abstract_num = " ".join([str(num) for num in data["abstract"][0]])
+ row.append(abstract_num)
+ rows.append(row)
+ print(f"===create News data==== [ {count} / {news_dataset_size} ]", end='\r')
+
+ filepath = os.path.join(infer_path, "mxbase_data/newsdata.csv")
+    # Create the output directory if needed; testing the csv path itself would
+    # raise when the directory already exists but the file does not.
+    os.makedirs(os.path.join(infer_path, "mxbase_data/"), exist_ok=True)
+    with open(filepath, 'w', newline='') as inf:
+        writer = csv.writer(inf)
+        writer.writerows(rows)
+ os.chmod(filepath, stat.S_IRWXO)
+ print(f"===create news data==== [ {news_dataset_size} / {news_dataset_size} ]")
+
+
+def create_userdata2file(user_data, infer_path):
+ """ user_data convert to csv file """
+ iterator = user_data.create_dict_iterator(output_numpy=True)
+ user_dataset_size = user_data.get_dataset_size()
+ rows = []
+ for count, data in enumerate(iterator):
+ row = [data["uid"][0]]
+ for newses in data["history"]:
+ hisstr = " ".join([str(num[0]) for num in newses])
+ row.append(hisstr)
+ rows.append(row)
+ print(f"===create user data==== [ {count} / {user_dataset_size} ]", end='\r')
+
+ filepath = os.path.join(infer_path, "mxbase_data/userdata.csv")
+    with open(filepath, 'w', newline='') as t:
+        writer = csv.writer(t)
+        writer.writerows(rows)
+    print(f"===create user data==== [ {user_dataset_size} / {user_dataset_size} ]")
+
+
+def create_evaldata2file(eval_data, infer_path):
+ """Prediction data convert to csv file """
+ iterator = eval_data.create_dict_iterator(output_numpy=True)
+ eval_data_size = eval_data.get_dataset_size()
+ rows = []
+ for count, data in enumerate(iterator):
+ row = []
+ row.append(data["uid"])
+ candidate_nid = " ".join([str(nid) for nid in data["candidate_nid"]])
+ row.append(candidate_nid)
+ labels = " ".join([str(label) for label in data["labels"]])
+ row.append(labels)
+ rows.append(row)
+ print(f"===create eval data==== [ {count} / {eval_data_size} ]", end='\r')
+ filepath = os.path.join(infer_path, "mxbase_data/evaldata.csv")
+    with open(filepath, 'w', newline='') as t:
+        writer = csv.writer(t)
+        writer.writerows(rows)
+    print(f"===create eval data==== [ {eval_data_size} / {eval_data_size} ]")
+
+
+def create_dataset(mindpreprocess, datatype, batch_size):
+ """create_dataset"""
+ dataset = create_eval_dataset(mindpreprocess, datatype, batch_size)
+ return dataset
+
+
+if __name__ == '__main__':
+ filePath = os.path.abspath(os.path.dirname(__file__))
+ input_path = os.path.abspath(os.path.join(filePath, "../../data/"))
+ do_Convert(input_path)
diff --git a/official/recommend/naml/infer/sdk/util/dataset.py b/official/recommend/naml/infer/sdk/util/dataset.py
new file mode 100644
index 0000000000000000000000000000000000000000..9416eb25fb65253605541edcce0195e7de652c20
--- /dev/null
+++ b/official/recommend/naml/infer/sdk/util/dataset.py
@@ -0,0 +1,335 @@
+# Copyright 2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""Dataset loading, creation and processing"""
+import re
+import os
+import random
+import math
+import pickle
+
+import numpy as np
+import mindspore.dataset as ds
+
+ds.config.set_prefetch_size(8)
+
+
+class MINDPreprocess:
+ """
+ MIND dataset Preprocess class.
+ when training, neg_sample=4, when test, neg_sample=-1
+ """
+
+ def __init__(self, config, dataset_path=""):
+ self.config = config
+ self.dataset_dir = dataset_path
+ self.word_dict_path = config['word_dict_path']
+ self.category_dict_path = config['category_dict_path']
+ self.subcategory_dict_path = config['subcategory_dict_path']
+ self.uid2index_path = config['uid2index_path']
+
+        # behaviors config
+ self.neg_sample = config['neg_sample']
+ self.n_words_title = config['n_words_title']
+ self.n_words_abstract = config['n_words_abstract']
+ self.n_browsed_news = config['n_browsed_news']
+
+        # news config (n_words_title and n_words_abstract are set above)
+ self._tokenize = 're'
+
+ self._is_init_data = False
+ self._init_data()
+
+ self._index = 0
+ self._sample_store = []
+
+ self._diff = 0
+
+ def _load_pickle(self, file_path):
+ with open(file_path, 'rb') as fp:
+ return pickle.load(fp)
+
+ def _init_news(self, news_path):
+ """News info initialization."""
+ print(f"Start to init news, news path: {news_path}")
+
+ category_dict = self._load_pickle(file_path=self.category_dict_path)
+ word_dict = self._load_pickle(file_path=self.word_dict_path)
+ subcategory_dict = self._load_pickle(
+ file_path=self.subcategory_dict_path)
+
+ self.nid_map_index = {}
+ title_list = []
+ category_list = []
+ subcategory_list = []
+ abstract_list = []
+
+ with open(news_path) as file_handler:
+ for line in file_handler:
+ nid, category, subcategory, title, abstract, _ = line.strip("\n").split('\t')[:6]
+
+ if nid in self.nid_map_index:
+ continue
+
+ self.nid_map_index[nid] = len(self.nid_map_index)
+ title_list.append(self._word_tokenize(title))
+ category_list.append(category)
+ subcategory_list.append(subcategory)
+ abstract_list.append(self._word_tokenize(abstract))
+
+ news_len = len(title_list)
+ self.news_title_index = np.zeros((news_len, self.n_words_title), dtype=np.int32)
+ self.news_abstract_index = np.zeros((news_len, self.n_words_abstract), dtype=np.int32)
+ self.news_category_index = np.zeros((news_len, 1), dtype=np.int32)
+ self.news_subcategory_index = np.zeros((news_len, 1), dtype=np.int32)
+ self.news_ids = np.zeros((news_len, 1), dtype=np.int32)
+
+ for news_index in range(news_len):
+ title = title_list[news_index]
+ title_index_list = [word_dict.get(word.lower(), 0) for word in title[:self.n_words_title]]
+ self.news_title_index[news_index, list(range(len(title_index_list)))] = title_index_list
+
+ abstract = abstract_list[news_index]
+ abstract_index_list = [word_dict.get(word.lower(), 0) for word in abstract[:self.n_words_abstract]]
+ self.news_abstract_index[news_index, list(range(len(abstract_index_list)))] = abstract_index_list
+
+ category = category_list[news_index]
+ self.news_category_index[news_index, 0] = category_dict.get(category, 0)
+
+ subcategory = subcategory_list[news_index]
+ self.news_subcategory_index[news_index, 0] = subcategory_dict.get(subcategory, 0)
+
+ self.news_ids[news_index, 0] = news_index
+
+ def _init_behaviors(self, behaviors_path):
+ """Behaviors info initialization."""
+ print(f"Start to init behaviors, path: {behaviors_path}")
+
+ self.history_list = []
+ self.impression_list = []
+ self.label_list = []
+ self.impression_index_list = []
+ self.uid_list = []
+ self.poss = []
+ self.negs = []
+ self.index_map = {}
+
+ self.total_count = 0
+ uid2index = self._load_pickle(self.uid2index_path)
+
+ with open(behaviors_path) as file_handler:
+ for index, line in enumerate(file_handler):
+ uid, _, history, impressions = line.strip("\n").split('\t')[-4:]
+ negs = []
+ history = [self.nid_map_index[i] for i in history.split()]
+ random.shuffle(history)
+ history = [0] * (self.n_browsed_news - len(history)) + history[:self.n_browsed_news]
+ user_id = uid2index.get(uid, 0)
+
+ if self.neg_sample > 0:
+ for item in impressions.split():
+ nid, label = item.split('-')
+ nid = self.nid_map_index[nid]
+ if label == '1':
+ self.poss.append(nid)
+ self.index_map[self.total_count] = index
+ self.total_count += 1
+ else:
+ negs.append(nid)
+ else:
+ nids = []
+ labels = []
+ for item in impressions.split():
+ nid, label = item.split('-')
+ nids.append(self.nid_map_index[nid])
+ labels.append(int(label))
+ self.impression_list.append((np.array(nids, dtype=np.int32), np.array(labels, dtype=np.int32)))
+ self.total_count += 1
+
+ self.history_list.append(history)
+ self.negs.append(negs)
+ self.uid_list.append(user_id)
+
+ def _init_data(self):
+ news_path = os.path.join(self.dataset_dir, 'news.tsv')
+ behavior_path = os.path.join(self.dataset_dir, 'behaviors.tsv')
+ if not self._is_init_data:
+ self._init_news(news_path)
+ self._init_behaviors(behavior_path)
+ self._is_init_data = True
+ print(f'init data end, count: {self.total_count}')
+
+ def _word_tokenize(self, sent):
+ """
+ Split sentence into word list using regex.
+ Args:
+ sent (str): Input sentence
+
+ Return:
+ list: word list
+ """
+ pat = re.compile(r"[\w]+|[.,!?;|]")
+ if isinstance(sent, str):
+ return pat.findall(sent.lower())
+ return []
+
+ def __getitem__(self, index):
+ uid_index = self.index_map[index]
+ if self.neg_sample >= 0:
+ negs = self.negs[uid_index]
+ nid = self.poss[index]
+ random.shuffle(negs)
+ neg_samples = (negs + [0] * (self.neg_sample - len(negs))) if self.neg_sample > len(negs) \
+ else random.sample(negs, self.neg_sample)
+ candidate_samples = [nid] + neg_samples
+ labels = [1] + [0] * self.neg_sample
+
+ else:
+            candidate_samples, labels = self.impression_list[index]
+ browsed_samples = self.history_list[uid_index]
+ browsed_category = np.array(self.news_category_index[browsed_samples], dtype=np.int32)
+ browsed_subcategory = np.array(self.news_subcategory_index[browsed_samples], dtype=np.int32)
+ browsed_title = np.array(self.news_title_index[browsed_samples], dtype=np.int32)
+ browsed_abstract = np.array(self.news_abstract_index[browsed_samples], dtype=np.int32)
+ candidate_category = np.array(self.news_category_index[candidate_samples], dtype=np.int32)
+ candidate_subcategory = np.array(self.news_subcategory_index[candidate_samples], dtype=np.int32)
+ candidate_title = np.array(self.news_title_index[candidate_samples], dtype=np.int32)
+ candidate_abstract = np.array(self.news_abstract_index[candidate_samples], dtype=np.int32)
+ labels = np.array(labels, dtype=np.int32)
+ return browsed_category, browsed_subcategory, browsed_title, browsed_abstract, \
+ candidate_category, candidate_subcategory, candidate_title, candidate_abstract, labels
+
+ @property
+ def column_names(self):
+ news_column_names = ['category', 'subcategory', 'title', 'abstract']
+ column_names = ['browsed_' + item for item in news_column_names]
+ column_names += ['candidate_' + item for item in news_column_names]
+ column_names += ['labels']
+ return column_names
+
+ def __len__(self):
+ return self.total_count
+
+
+class EvalDatasetBase:
+ """Base class for evaluation datasets."""
+
+ def __init__(self, preprocess: MINDPreprocess):
+ self.preprocess = preprocess
+
+
+class EvalNews(EvalDatasetBase):
+ """Generator dataset for all news."""
+
+ def __len__(self):
+ return len(self.preprocess.news_title_index)
+
+ def __getitem__(self, index):
+ news_id = self.preprocess.news_ids[index]
+ title = self.preprocess.news_title_index[index]
+ category = self.preprocess.news_category_index[index]
+ subcategory = self.preprocess.news_subcategory_index[index]
+ abstract = self.preprocess.news_abstract_index[index]
+ return news_id.reshape(-1), category.reshape(-1), subcategory.reshape(-1), title.reshape(-1), \
+ abstract.reshape(-1)
+
+ @property
+ def column_names(self):
+ return ['news_id', 'category', 'subcategory', 'title', 'abstract']
+
+
+class EvalUsers(EvalDatasetBase):
+ """Generator dataset for all user."""
+
+ def __len__(self):
+ return len(self.preprocess.uid_list)
+
+ def __getitem__(self, index):
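+ # The history was padded to a fixed length in _init_behaviors; the reshape
+ # below assumes n_browsed_news == 50.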
+ uid = np.array(self.preprocess.uid_list[index], dtype=np.int32)
+ history = np.array(self.preprocess.history_list[index], dtype=np.int32)
+ return uid, history.reshape(50, 1)
+
+ @property
+ def column_names(self):
+ return ['uid', 'history']
+
+
+class EvalCandidateNews(EvalDatasetBase):
+ """Generator dataset for all candidate news."""
+
+ @property
+ def column_names(self):
+ return ['uid', 'candidate_nid', 'labels']
+
+ def __len__(self):
+ return self.preprocess.total_count
+
+ def __getitem__(self, index):
+ uid = np.array(self.preprocess.uid_list[index], dtype=np.int32)
+ nid, label = self.preprocess.impression_list[index]
+ return uid, nid, label
+
+
+class DistributedSampler():
+ """
+ Distributed sampler that partitions the dataset across devices.
+
+ Args:
+ preprocess (MINDPreprocess): Preprocessed dataset; provides total_count.
+ rank (int): Rank of the current device.
+ group_size (int): Total number of devices.
+ shuffle (bool): Whether to reshuffle indices every epoch. Default: True.
+ seed (int): Base random seed. Default: 0.
+ """
+
+ def __init__(self, preprocess: MINDPreprocess, rank, group_size, shuffle=True, seed=0):
+ self.preprocess = preprocess
+ self.rank = rank
+ self.group_size = group_size
+ self.dataset_length = preprocess.total_count
+ self.num_samples = int(math.ceil(self.dataset_length * 1.0 / self.group_size))
+ self.total_size = self.num_samples * self.group_size
+ self.shuffle = shuffle
+ self.seed = seed
+
+ def __iter__(self):
+ if self.shuffle:
+ self.seed = (self.seed + 1) & 0xffffffff
+ np.random.seed(self.seed)
+ indices = np.random.permutation(self.dataset_length).tolist()
+ else:
+ indices = list(range(self.dataset_length))
+
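+ # Pad the index list so it divides evenly among devices, then take every
+ # group_size-th index starting at this rank.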
+ indices += indices[:(self.total_size - len(indices))]
+ indices = indices[self.rank::self.group_size]
+ return iter(indices)
+
+ def __len__(self):
+ return self.num_samples
+
+
+def create_dataset(mindpreprocess, batch_size=64, rank=0, group_size=1):
+ """Get generator dataset when training."""
+ sampler = DistributedSampler(mindpreprocess, rank, group_size, shuffle=True)
+ dataset = ds.GeneratorDataset(mindpreprocess, mindpreprocess.column_names, sampler=sampler)
+ dataset = dataset.batch(batch_size, drop_remainder=True)
+ return dataset
+
+
+def create_eval_dataset(mindpreprocess, eval_cls, batch_size=64):
+ """Get generator dataset when evaluation."""
+ eval_instance = eval_cls(mindpreprocess)
+ dataset = ds.GeneratorDataset(eval_instance, eval_instance.column_names, shuffle=False)
+ if not isinstance(eval_instance, EvalCandidateNews):
+ dataset = dataset.batch(batch_size)
+ return dataset
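+
+# A minimal usage sketch (hypothetical config values, mirroring the constructor
+# call used in modelarts/train_start.py):
+# preprocess = MINDPreprocess(vars(config), dataset_path=config.train_dataset_path)
+# train_ds = create_dataset(preprocess, batch_size=64, rank=0, group_size=1)
+# for batch in train_ds.create_dict_iterator(output_numpy=True):
+# ... # keys follow MINDPreprocess.column_names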
diff --git a/official/recommend/naml/infer/sdk/util/utils.py b/official/recommend/naml/infer/sdk/util/utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..799d105d007cfce8510c32a24233af6805dcb43d
--- /dev/null
+++ b/official/recommend/naml/infer/sdk/util/utils.py
@@ -0,0 +1,131 @@
+# Copyright 2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""Utils for NAML."""
+import time
+import numpy as np
+from sklearn.metrics import roc_auc_score
+from mindspore import Tensor
+
+from .dataset import create_eval_dataset, EvalNews, EvalUsers, EvalCandidateNews
+
+def get_metric(args, mindpreprocess, news_encoder, user_encoder, metric):
+ """Calculate metrics."""
+ start = time.time()
+ news_dict = {}
+ user_dict = {}
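+ # Stage 1: run the news encoder over every news item and cache the vectors by id.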
+ dataset = create_eval_dataset(mindpreprocess, EvalNews, batch_size=args.batch_size)
+ dataset_size = dataset.get_dataset_size()
+ iterator = dataset.create_dict_iterator(output_numpy=True)
+ for count, data in enumerate(iterator):
+ news_vector = news_encoder(Tensor(data["category"]), Tensor(data["subcategory"]),
+ Tensor(data["title"]), Tensor(data["abstract"])).asnumpy()
+ for i, nid in enumerate(data["news_id"]):
+ news_dict[str(nid[0])] = news_vector[i]
+ print(f"===Generate News vector==== [ {count} / {dataset_size} ]", end='\r')
+ print(f"===Generate News vector==== [ {dataset_size} / {dataset_size} ]")
+ dataset = create_eval_dataset(mindpreprocess, EvalUsers, batch_size=args.batch_size)
+ dataset_size = dataset.get_dataset_size()
+ iterator = dataset.create_dict_iterator(output_numpy=True)
+ for count, data in enumerate(iterator):
+ browsed_news = []
+ for newses in data["history"]:
+ news_list = []
+ for nid in newses:
+ news_list.append(news_dict[str(nid[0])])
+ browsed_news.append(np.array(news_list))
+ browsed_news = np.array(browsed_news)
+ user_vector = user_encoder(Tensor(browsed_news)).asnumpy()
+ for i, uid in enumerate(data["uid"]):
+ user_dict[str(uid)] = user_vector[i]
+ print(f"===Generate Users vector==== [ {count} / {dataset_size} ]", end='\r')
+ print(f"===Generate Users vector==== [ {dataset_size} / {dataset_size} ]")
+ dataset = create_eval_dataset(mindpreprocess, EvalCandidateNews, batch_size=args.batch_size)
+ dataset_size = dataset.get_dataset_size()
+ iterator = dataset.create_dict_iterator(output_numpy=True)
+ for count, data in enumerate(iterator):
+ pred = np.dot(
+ np.stack([news_dict[str(nid)] for nid in data["candidate_nid"]], axis=0),
+ user_dict[str(data["uid"])]
+ )
+ metric.update(pred, data["labels"])
+ print(f"===Click Prediction==== [ {count} / {dataset_size} ]", end='\r')
+ print(f"===Click Prediction==== [ {dataset_size} / {dataset_size} ]")
+ auc = metric.eval()
+ total_cost = time.time() - start
+ print(f"Eval total cost: {total_cost} s")
+ return auc
+
+def process_data(args):
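+ """Load the pretrained word embedding and pad/truncate it to word_embedding_dim."""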
+ word_embedding = np.load(args.embedding_file)
+ _, h = word_embedding.shape
+ if h < args.word_embedding_dim:
+ word_embedding = np.pad(word_embedding, ((0, 0), (0, args.word_embedding_dim - h)), 'constant',
+ constant_values=0)
+ elif h > args.word_embedding_dim:
+ word_embedding = word_embedding[:, :args.word_embedding_dim]
+ print("Load word_embedding", word_embedding.shape)
+ return Tensor(word_embedding.astype(np.float32))
+
+def AUC(y_true, y_pred):
+ """Area under the ROC curve."""
+ return roc_auc_score(y_true, y_pred)
+
+def MRR(y_true, y_pred):
+ """Mean reciprocal rank of the positive items under the predicted ranking."""
+ index = np.argsort(y_pred)[::-1]
+ y_true = np.take(y_true, index)
+ score = y_true / (np.arange(len(y_true)) + 1)
+ return np.sum(score) / np.sum(y_true)
+
+def DCG(y_true, y_pred, n):
+ """Discounted cumulative gain over the top-n ranked items."""
+ index = np.argsort(y_pred)[::-1]
+ y_true = np.take(y_true, index[:n])
+ score = (2 ** y_true - 1) / np.log2(np.arange(len(y_true)) + 2)
+ return np.sum(score)
+
+def nDCG(y_true, y_pred, n):
+ """DCG normalized by the ideal (label-sorted) DCG."""
+ return DCG(y_true, y_pred, n) / DCG(y_true, y_true, n)
+
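+# Worked example: for labels [1, 0, 0] and scores [0.9, 0.8, 0.1] the positive
+# item ranks first, so MRR = 1/1 = 1.0 and nDCG@5 = DCG/IDCG = 1.0.
+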
+class NAMLMetric:
+ """
+ Accumulates AUC, MRR, nDCG@5 and nDCG@10 over evaluation impressions.
+ """
+ def __init__(self):
+ self.AUC_list = []
+ self.MRR_list = []
+ self.nDCG5_list = []
+ self.nDCG10_list = []
+
+ def clear(self):
+ """Clear the internal evaluation result."""
+ self.AUC_list = []
+ self.MRR_list = []
+ self.nDCG5_list = []
+ self.nDCG10_list = []
+
+ def update(self, predict, y_true):
+ predict = predict.flatten()
+ y_true = y_true.flatten()
+ self.AUC_list.append(AUC(y_true, predict))
+ self.MRR_list.append(MRR(y_true, predict))
+ self.nDCG5_list.append(nDCG(y_true, predict, 5))
+ self.nDCG10_list.append(nDCG(y_true, predict, 10))
+
+ def eval(self):
+ auc = np.mean(self.AUC_list)
+ print('AUC:', auc)
+ print('MRR:', np.mean(self.MRR_list))
+ print('nDCG@5:', np.mean(self.nDCG5_list))
+ print('nDCG@10:', np.mean(self.nDCG10_list))
+ return auc
diff --git a/official/recommend/naml/modelarts/train_start.py b/official/recommend/naml/modelarts/train_start.py
new file mode 100644
index 0000000000000000000000000000000000000000..a099a9108253235ac571a9873f4473e906d04eec
--- /dev/null
+++ b/official/recommend/naml/modelarts/train_start.py
@@ -0,0 +1,208 @@
+# Copyright 2022 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""Train NAML."""
+import time
+import os
+import glob
+import math
+import numpy as np
+from mindspore import nn, load_checkpoint, context, save_checkpoint, Tensor
+from mindspore.train.serialization import export
+import mindspore.common.dtype as mstype
+from mindspore.common import set_seed
+from mindspore.train.model import Model
+from mindspore.train.loss_scale_manager import DynamicLossScaleManager
+from mindspore.context import ParallelMode
+from mindspore.communication.management import init
+from mindspore.train.callback import CheckpointConfig, ModelCheckpoint, LossMonitor, TimeMonitor
+from src.naml import NAML, NAMLWithLossCell
+from src.dataset import create_dataset, MINDPreprocess
+from src.utils import process_data
+
+from model_utils.config import config
+from model_utils.moxing_adapter import moxing_wrapper
+from model_utils.device_adapter import get_rank_id, get_device_id, get_device_num
+
+
+def modelarts_pre_process():
+ '''modelarts pre process function.'''
+
+ def unzip(zip_file, save_dir):
+ import zipfile
+ s_time = time.time()
+ if not os.path.exists(os.path.join(save_dir, config.modelarts_dataset_unzip_name)):
+ zip_isexist = zipfile.is_zipfile(zip_file)
+ if zip_isexist:
+ fz = zipfile.ZipFile(zip_file, 'r')
+ data_num = len(fz.namelist())
+ print("Extract Start...")
+ print("unzip file num: {}".format(data_num))
+ data_print = int(data_num / 100) if data_num > 100 else 1
+ i = 0
+ for file in fz.namelist():
+ if i % data_print == 0:
+ print("unzip percent: {}%".format(int(i * 100 / data_num)), flush=True)
+ i += 1
+ fz.extract(file, save_dir)
+ print("cost time: {}min:{}s.".format(int((time.time() - s_time) / 60),
+ int(int(time.time() - s_time) % 60)))
+ print("Extract Done.")
+ else:
+ print("This is not zip.")
+ else:
+ print("Zip has been extracted.")
+
+ if config.need_modelarts_dataset_unzip:
+ zip_file_1 = os.path.join(config.data_path, config.modelarts_dataset_unzip_name + ".zip")
+ save_dir_1 = os.path.join(config.data_path)
+
+ sync_lock = "/tmp/unzip_sync.lock"
+
+ # Each server contains at most 8 devices.
+ if get_device_id() % min(get_device_num(), 8) == 0 and not os.path.exists(sync_lock):
+ print("Zip file path: ", zip_file_1)
+ print("Unzip file save dir: ", save_dir_1)
+ unzip(zip_file_1, save_dir_1)
+ print("===Finish extract data synchronization===")
+ try:
+ os.mknod(sync_lock)
+ except IOError:
+ pass
+
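+ # Every device blocks here until the lock file exists, i.e. extraction finished.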
+ while True:
+ if os.path.exists(sync_lock):
+ break
+ time.sleep(1)
+
+ print("Device: {}, Finish sync unzip data from {} to {}.".format(get_device_id(), zip_file_1, save_dir_1))
+
+ config.save_checkpoint_path = os.path.join(config.output_path, config.save_checkpoint_path)
+
+
+def run_export(ckpt_model):
+ """run export."""
+ config.checkpoint_path = ckpt_model
+ config.phase = "export"
+ config.device_id = get_device_id()
+ config.neg_sample = config.export_neg_sample
+ context.set_context(mode=context.GRAPH_MODE, device_target=config.platform, device_id=config.device_id,
+ save_graphs=config.save_graphs, save_graphs_path="naml_ir")
+
+ net = NAML(config)
+ net.set_train(False)
+ net_with_loss = NAMLWithLossCell(net)
+ load_checkpoint(config.checkpoint_path, net_with_loss)
+ news_encoder = net.news_encoder
+ user_encoder = net.user_encoder
+ bs = config.batch_size
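+ # Zero-filled dummy inputs only fix the exported graphs' input shapes;
+ # the values themselves are irrelevant.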
+ category = Tensor(np.zeros([bs, 1], np.int32))
+ subcategory = Tensor(np.zeros([bs, 1], np.int32))
+ title = Tensor(np.zeros([bs, config.n_words_title], np.int32))
+ abstract = Tensor(np.zeros([bs, config.n_words_abstract], np.int32))
+
+ news_input_data = [category, subcategory, title, abstract]
+ export(news_encoder, *news_input_data,
+ file_name=os.path.join(config.save_checkpoint_path, f"naml_news_encoder_bs_{bs}"),
+ file_format="AIR")
+
+ browsed_news = Tensor(np.zeros([bs, config.n_browsed_news, config.n_filters], np.float32))
+ export(user_encoder, browsed_news,
+ file_name=os.path.join(config.save_checkpoint_path, f"naml_user_encoder_bs_{bs}"),
+ file_format="AIR")
+
+
+@moxing_wrapper(pre_process=modelarts_pre_process)
+def run_train():
+ """run train."""
+ config.phase = "train"
+ config.rank = get_rank_id()
+ config.device_id = get_device_id()
+ config.device_num = get_device_num()
+ if config.device_num > 1:
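+ # Multi-device run: data-parallel training with gradients averaged across devices.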
+ context.set_context(mode=context.GRAPH_MODE, device_target=config.platform, save_graphs=config.save_graphs)
+ context.reset_auto_parallel_context()
+ context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True,
+ device_num=config.device_num)
+ init()
+ config.save_checkpoint_path = os.path.join(config.save_checkpoint_path, "ckpt_" + str(config.rank))
+ else:
+ context.set_context(mode=context.GRAPH_MODE, device_target=config.platform, device_id=config.device_id,
+ save_graphs=config.save_graphs, save_graphs_path="naml_ir")
+
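+ # Heuristic: when epochs is not set, scale default_epochs by ceil(sqrt(device_num)).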
+ config.epochs = config.default_epochs * math.ceil(config.device_num ** 0.5) if config.epochs <= 0 else config.epochs
+
+ config.embedding_file = os.path.join(config.dataset_path, config.embedding_file)
+ config.word_dict_path = os.path.join(config.dataset_path, config.word_dict_path)
+ config.category_dict_path = os.path.join(config.dataset_path, config.category_dict_path)
+ config.subcategory_dict_path = os.path.join(config.dataset_path, config.subcategory_dict_path)
+ config.uid2index_path = os.path.join(config.dataset_path, config.uid2index_path)
+ config.train_dataset_path = os.path.join(config.dataset_path, config.train_dataset_path)
+ config.eval_dataset_path = os.path.join(config.dataset_path, config.eval_dataset_path)
+
+ set_seed(config.seed)
+ word_embedding = process_data(config)
+ net = NAML(config, word_embedding)
+ net_with_loss = NAMLWithLossCell(net)
+ if config.pretrain_checkpoint:
+ load_checkpoint(config.pretrain_checkpoint, net_with_loss)
+ mindpreprocess_train = MINDPreprocess(vars(config), dataset_path=config.train_dataset_path)
+ dataset = create_dataset(mindpreprocess_train, batch_size=config.batch_size, rank=config.rank,
+ group_size=config.device_num)
+ config.dataset_size = dataset.get_dataset_size()
+ config.print_times = min(config.dataset_size, config.print_times)
+ if config.weight_decay:
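+ # Apply weight decay only to parameters named 'weight'; biases and other
+ # parameters are excluded.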
+ weight_params = list(filter(lambda x: 'weight' in x.name, net.trainable_params()))
+ other_params = list(filter(lambda x: 'weight' not in x.name, net.trainable_params()))
+ group_params = [{'params': weight_params, 'weight_decay': 1e-3},
+ {'params': other_params, 'weight_decay': 0.0},
+ {'order_params': net.trainable_params()}]
+ opt = nn.AdamWeightDecay(group_params, config.lr, beta1=config.beta1, beta2=config.beta2, eps=config.epsilon)
+ else:
+ opt = nn.Adam(net.trainable_params(), config.lr, beta1=config.beta1, beta2=config.beta2, eps=config.epsilon)
+ if config.mixed:
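+ # Mixed precision: cast the network to float16 with dynamic loss scaling, but
+ # keep precision-sensitive cells (Embedding, Softmax, loss) in float32.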
+ loss_scale_manager = DynamicLossScaleManager(init_loss_scale=128.0, scale_factor=2, scale_window=10000)
+ net_with_loss.to_float(mstype.float16)
+ for _, cell in net_with_loss.cells_and_names():
+ if isinstance(cell, (nn.Embedding, nn.Softmax, nn.SoftmaxCrossEntropyWithLogits)):
+ cell.to_float(mstype.float32)
+ model = Model(net_with_loss, optimizer=opt, loss_scale_manager=loss_scale_manager)
+ else:
+ model = Model(net_with_loss, optimizer=opt)
+
+ # checkpoint
+ cb = [TimeMonitor(data_size=config.dataset_size), LossMonitor()]
+ if config.rank == 0:
+ ckpt_config = CheckpointConfig(save_checkpoint_steps=config.dataset_size)
+ cb.append(ModelCheckpoint(prefix="naml", directory=config.save_checkpoint_path, config=ckpt_config))
+
+ start_time = time.time()
+ print("======================= Start Train ==========================", flush=True)
+ model.train(config.epochs, dataset, callbacks=cb, dataset_sink_mode=config.sink_mode)
+ save_checkpoint(net_with_loss, os.path.join(config.save_checkpoint_path, "naml_last.ckpt"))
+ end_time = time.time()
+ ckpt_list = glob.glob(os.path.join(config.save_checkpoint_path, '*.ckpt'))
+ # Export the most recently written checkpoint.
+ ckpt_list.sort(key=os.path.getmtime)
+ ckpt_model = ckpt_list[-1]
+
+ run_export(ckpt_model)
+ print("processor_name: {}".format(config.platform))
+ print("test_name: NAML")
+ print(f"model_name: NAML MIND{config.dataset}")
+ print("batch_size: {}".format(config.batch_size))
+ print("latency: {} s".format(end_time - start_time))
+
+
+if __name__ == '__main__':
+ run_train()