Skip to content
Snippets Groups Projects
Commit eb9907e6 authored by Niu Chong, committed by Will Zhang
Browse files

Fix: set thrd_id for CompTaskNode as GetCpuDeviceThrdId(dev_phy_id) (#919)

parent cda02dc7
No related branches found
No related tags found
No related merge requests found
......@@ -183,16 +183,17 @@ bool LogicalNode::HasOpWithForwardModelBlob() const {
[](const Operator* op) { return op->forward_model_bns().empty() == false; });
}
void LogicalNode::GenSortedCompTaskNodes(std::function<int64_t(const TaskNode*)> AllocateCpuThrdId,
std::function<void(CompTaskNode*)> Handler) const {
void LogicalNode::GenSortedCompTaskNodes(
std::function<int64_t(const TaskNode*)> AllocateCpuThrdIdEvenly,
std::function<void(CompTaskNode*)> Handler) const {
int64_t parallel_idx = 0;
int64_t parallel_num = parallel_desc_->parallel_num();
for (int64_t machine_id : parallel_desc_->sorted_machine_ids()) {
for (int64_t dev_phy_id : parallel_desc_->sorted_dev_phy_ids(machine_id)) {
CompTaskNode* comp_task_node = NewCompTaskNode();
comp_task_node->set_machine_id(machine_id);
const IDMgr* id_mgr = Global<IDMgr>::Get();
if (parallel_desc_->device_type() == DeviceType::kGPU) {
const IDMgr* id_mgr = Global<IDMgr>::Get();
switch (comp_task_node->GetCudaWorkType()) {
case CudaWorkType::kCompute: {
comp_task_node->set_thrd_id(id_mgr->GetGpuComputeThrdId(dev_phy_id));
......@@ -213,7 +214,12 @@ void LogicalNode::GenSortedCompTaskNodes(std::function<int64_t(const TaskNode*)>
default: UNIMPLEMENTED();
}
} else if (parallel_desc_->device_type() == DeviceType::kCPU) {
comp_task_node->set_thrd_id(AllocateCpuThrdId(comp_task_node));
if (comp_task_node->IsPersistence()) {
comp_task_node->set_thrd_id(AllocateCpuThrdIdEvenly(comp_task_node));
} else {
comp_task_node->set_thrd_id(
id_mgr->GetCpuDeviceThrdId(dev_phy_id % Global<JobDesc>::Get()->CpuDeviceNum()));
}
} else {
UNIMPLEMENTED();
}
......
......@@ -43,7 +43,7 @@ class LogicalNode : public Node<LogicalNode, LogicalEdge> {
bool HasOpWithModelOrConstModelBlob() const;
bool HasOpWithModelBlob() const;
bool HasOpWithForwardModelBlob() const;
void GenSortedCompTaskNodes(std::function<int64_t(const TaskNode*)> AllocateCpuThrdId,
void GenSortedCompTaskNodes(std::function<int64_t(const TaskNode*)> AllocateCpuThrdIdEvenly,
std::function<void(CompTaskNode*)>) const;
// model split
......@@ -78,7 +78,7 @@ class LogicalNode : public Node<LogicalNode, LogicalEdge> {
HashMap<const LogicalNode*, std::vector<TaskNode*>>* logical2sorted_out_box, \
std::function<TaskNode**(CompTaskNode * src, int64_t machine_id, int32_t mem_zone_id)> \
Mut121BufTask, \
std::function<int64_t(const TaskNode*)> AllocateCpuThrdId)
std::function<int64_t(const TaskNode*)> AllocateCpuThrdIdEvenly)
class TaskGraph;
using BldSubTskGphMthd = void(TaskGraph::*) BLD_SUB_TSK_GPH_MTHD_ARGS();
......
......@@ -19,7 +19,7 @@ TaskGraph::TaskGraph(std::unique_ptr<const LogicalGraph>&& logical_gph) {
std::vector<int64_t> cpu_device_offset(job_desc->TotalMachineNum(), 0);
std::vector<int64_t> persistence_offset(job_desc->TotalMachineNum(), 0);
auto AllocateCpuThrdId = [&](const TaskNode* task_node) {
auto AllocateCpuThrdIdEvenly = [&](const TaskNode* task_node) {
int64_t ret = -1;
if (task_node->IsPersistence() == false) {
int64_t& offset = cpu_device_offset.at(task_node->machine_id());
......@@ -33,10 +33,11 @@ TaskGraph::TaskGraph(std::unique_ptr<const LogicalGraph>&& logical_gph) {
return ret;
};
logical_gph_->ForEachNode([&](const LogicalNode* logical_node) {
logical_node->GenSortedCompTaskNodes(AllocateCpuThrdId, [&](CompTaskNode* comp_task_node) {
AddAllocatedNode(comp_task_node);
logical2sorted_comp_tasks[logical_node].push_back(comp_task_node);
});
logical_node->GenSortedCompTaskNodes(
AllocateCpuThrdIdEvenly, [&](CompTaskNode* comp_task_node) {
AddAllocatedNode(comp_task_node);
logical2sorted_comp_tasks[logical_node].push_back(comp_task_node);
});
});
logical_gph_->ForEachEdge([&](const LogicalEdge* logical_edge) {
BldSubTskGphMthd method =
......@@ -44,7 +45,7 @@ TaskGraph::TaskGraph(std::unique_ptr<const LogicalGraph>&& logical_gph) {
(this->*method)(logical_edge->src_node(), logical_edge->dst_node(),
logical2sorted_comp_tasks.at(logical_edge->src_node()),
logical2sorted_comp_tasks.at(logical_edge->dst_node()), &logical2sorted_in_box,
&logical2sorted_out_box, Mut121BufTask, AllocateCpuThrdId);
&logical2sorted_out_box, Mut121BufTask, AllocateCpuThrdIdEvenly);
});
ToDotWithAutoFilePath();
}
......@@ -57,7 +58,7 @@ DEFINE_BLD_SUB_TASK_GRAPH_METHOD(BldSubTskGphByBoxing) {
std::vector<TaskNode*>* sorted_out_box = nullptr;
if (logical2sorted_out_box->find(src_logical) == logical2sorted_out_box->end()) {
BuildOutBoxing(src_logical, sorted_src_comp_tasks, &((*logical2sorted_out_box)[src_logical]),
AllocateCpuThrdId);
AllocateCpuThrdIdEvenly);
}
sorted_out_box = &(logical2sorted_out_box->at(src_logical));
......@@ -65,7 +66,7 @@ DEFINE_BLD_SUB_TASK_GRAPH_METHOD(BldSubTskGphByBoxing) {
std::vector<TaskNode*>* sorted_in_box = nullptr;
if (logical2sorted_in_box->find(dst_logical) == logical2sorted_in_box->end()) {
BuildInBoxing(dst_logical, sorted_dst_comp_tasks, &((*logical2sorted_in_box)[dst_logical]),
AllocateCpuThrdId);
AllocateCpuThrdIdEvenly);
}
sorted_in_box = &(logical2sorted_in_box->at(dst_logical));
......@@ -130,7 +131,7 @@ DEFINE_BLD_SUB_TASK_GRAPH_METHOD(BldSubTskGphBySelectOneSourceToSoleSink) {
}
CHECK_NOTNULL(selected_src_comp_task);
BldSubTskGphByOneToOne(nullptr, nullptr, {selected_src_comp_task}, sorted_dst_comp_tasks, nullptr,
nullptr, Mut121BufTask, AllocateCpuThrdId);
nullptr, Mut121BufTask, AllocateCpuThrdIdEvenly);
}
DEFINE_BLD_SUB_TASK_GRAPH_METHOD(BldSubTskGphByReduceScatter2ReduceAdd) {
......@@ -224,7 +225,7 @@ void TaskGraph::AddCopyCommNetTask(TaskNode* src, TaskNode* dst) {
void TaskGraph::BuildOutBoxing(const LogicalNode* logical,
const std::vector<CompTaskNode*>& sorted_comp_tasks,
std::vector<TaskNode*>* sorted_out_box,
std::function<int64_t(const TaskNode*)> AllocateCpuThrdId) {
std::function<int64_t(const TaskNode*)> AllocateCpuThrdIdEvenly) {
std::map<int64_t, std::vector<TaskNode*>> machine_id2bound_task;
for (CompTaskNode* comp_task : sorted_comp_tasks) {
TaskNode* task = AddCopyD2HTaskIfNotCpu(comp_task);
......@@ -233,7 +234,7 @@ void TaskGraph::BuildOutBoxing(const LogicalNode* logical,
for (const auto& pair : machine_id2bound_task) {
OutBoxingTaskNode* boxing_task = NewNode<OutBoxingTaskNode>();
boxing_task->set_machine_id(pair.second.front()->machine_id());
boxing_task->set_thrd_id(AllocateCpuThrdId(boxing_task));
boxing_task->set_thrd_id(AllocateCpuThrdIdEvenly(boxing_task));
for (TaskNode* task : pair.second) { Connect<TaskNode>(task, NewEdge(), boxing_task); }
sorted_out_box->push_back(boxing_task);
}
......@@ -242,7 +243,7 @@ void TaskGraph::BuildOutBoxing(const LogicalNode* logical,
void TaskGraph::BuildInBoxing(const LogicalNode* logical,
const std::vector<CompTaskNode*>& sorted_comp_tasks,
std::vector<TaskNode*>* sorted_in_box,
std::function<int64_t(const TaskNode*)> AllocateCpuThrdId) {
std::function<int64_t(const TaskNode*)> AllocateCpuThrdIdEvenly) {
std::map<int64_t, std::vector<TaskNode*>> machine_id2bound_task;
for (CompTaskNode* comp_task : sorted_comp_tasks) {
TaskNode* task = AddCopyH2DTaskIfNotCpu(comp_task);
......@@ -251,7 +252,7 @@ void TaskGraph::BuildInBoxing(const LogicalNode* logical,
for (const auto& pair : machine_id2bound_task) {
InBoxingTaskNode* boxing_task = NewNode<InBoxingTaskNode>();
boxing_task->set_machine_id(pair.second.front()->machine_id());
boxing_task->set_thrd_id(AllocateCpuThrdId(boxing_task));
boxing_task->set_thrd_id(AllocateCpuThrdIdEvenly(boxing_task));
for (TaskNode* task : pair.second) { Connect<TaskNode>(boxing_task, NewEdge(), task); }
sorted_in_box->push_back(boxing_task);
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment