Skip to content
Snippets Groups Projects
Unverified commit 810d8db5, authored by Juncheng, committed by GitHub
Browse files

Remove Plan::net_topo (#5502)


Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
parent ec0dd0fa
No related branches found
No related tags found
No related merge requests found
......@@ -25,47 +25,6 @@ limitations under the License.
namespace oneflow {
// Derives a machine-level topology from the tasks in `plan` and writes it to
// plan->net_topo. For every produced register, the producing machine and the machine of
// each consumer task are recorded as peers of one another (both directions). A machine is
// never listed as its own peer.
void Compiler::GenNetTopo(Plan* plan) const {
  // Pass 1: index register-desc id -> machine id and task id -> machine id.
  HashMap<int64_t, int64_t> regst_desc_id2machine_id;
  HashMap<int64_t, int64_t> task_id2machine_id;
  for (const TaskProto& task : plan->task()) {
    for (const auto& produced : task.produced_regst_desc()) {
      regst_desc_id2machine_id.emplace(produced.second.regst_desc_id(), task.machine_id());
    }
    // Task ids must be unique within the plan.
    CHECK(task_id2machine_id.emplace(task.task_id(), task.machine_id()).second);
  }

  // Pass 2: collect the symmetric producer <-> consumer machine adjacency.
  std::map<int64_t, std::set<int64_t>> machine_id2peers;
  for (const TaskProto& task : plan->task()) {
    for (const auto& produced : task.produced_regst_desc()) {
      const auto& regst_desc = produced.second;
      const auto producer_it = regst_desc_id2machine_id.find(regst_desc.regst_desc_id());
      CHECK(producer_it != regst_desc_id2machine_id.end());
      const int64_t producer_machine_id = producer_it->second;
      for (int64_t consumer_task_id : regst_desc.consumer_task_id()) {
        const auto consumer_it = task_id2machine_id.find(consumer_task_id);
        CHECK(consumer_it != task_id2machine_id.end());
        const int64_t consumer_machine_id = consumer_it->second;
        machine_id2peers[producer_machine_id].insert(consumer_machine_id);
        machine_id2peers[consumer_machine_id].insert(producer_machine_id);
      }
    }
  }

  // Pass 3: drop self-links and convert each peer set into its protobuf form.
  HashMap<int64_t, MachineIds> machine_id2pb_peers;
  for (auto& entry : machine_id2peers) {
    const int64_t machine_id = entry.first;
    std::set<int64_t>& peers = entry.second;
    peers.erase(machine_id);  // erasing by key is a no-op when the key is absent
    std::vector<int64_t> peer_list(peers.begin(), peers.end());
    MachineIds pb_peers;
    *(pb_peers.mutable_machine_id()) = StdVec2PbRf<int64_t>(peer_list);
    CHECK(machine_id2pb_peers.emplace(machine_id, pb_peers).second);
  }
  *(plan->mutable_net_topo()->mutable_peer_machine_ids()) = HashMap2PbMap(machine_id2pb_peers);
}
void CreateOpAttributeRef(Plan* plan, int64_t job_id, TaskProto* task_proto) {
auto* job_id2op_attribute_ref_table = plan->mutable_job_id2op_attribute_ref_table();
CHECK(task_proto->exec_sequence().exec_node_size() == 1);
......
......@@ -30,7 +30,6 @@ class Compiler final {
~Compiler() = default;
void Compile(Job*, Plan*, bool need_job_complete) const;
void GenNetTopo(Plan* plan) const;
};
} // namespace oneflow
......
......@@ -103,8 +103,6 @@ std::string cluster_thrd_ids_key(const std::string& plan_name) {
return plan_name + "_cluster_thrd_ids";
}
// Returns `plan_name` with the "_net_topo" suffix appended; used as the key when the
// plan's net topology is pushed to / pulled from the control client's KV store.
std::string net_topo_key(const std::string& plan_name) {
  std::string key(plan_name);
  key += "_net_topo";
  return key;
}
// Returns `plan_name` with the "_ctrl_regst_desc_info_key" suffix appended; used as the
// key when the plan's ctrl_regst_desc_info is pushed to / pulled from the KV store.
std::string ctrl_regst_desc_info_key(const std::string& plan_name) {
  std::string key(plan_name);
  key += "_ctrl_regst_desc_info_key";
  return key;
}
......@@ -191,7 +189,6 @@ void PushPlan(const std::string& plan_name, Plan&& plan) {
Global<CtrlClient>::Get()->PushKV(block7chunk_key(plan_name, pair.first), pair.second);
}
Global<CtrlClient>::Get()->PushKV(net_topo_key(plan_name), plan.net_topo());
Global<CtrlClient>::Get()->PushKV(ctrl_regst_desc_info_key(plan_name),
plan.ctrl_regst_desc_info());
Global<CtrlClient>::Get()->PushKV(job_id2job_conf(plan_name), plan.job_confs());
......@@ -214,9 +211,6 @@ void PullPlan(const std::string& plan_name, Plan* plan) {
Global<CtrlClient>::Get()->PullKV(sub_plan_key(plan_name, machine_id, thrd_id), &sub_plan);
plan->mutable_task()->MergeFrom(sub_plan.task());
}
NetTopo net_topo;
Global<CtrlClient>::Get()->PullKV(net_topo_key(plan_name), &net_topo);
*(plan->mutable_net_topo()) = net_topo;
CtrlRegstDescInfo ctrl_regst_desc_info;
Global<CtrlClient>::Get()->PullKV(ctrl_regst_desc_info_key(plan_name), &ctrl_regst_desc_info);
*(plan->mutable_ctrl_regst_desc_info()) = ctrl_regst_desc_info;
......@@ -399,7 +393,7 @@ Maybe<void> CompileCurJobOnMaster(Job* job, Plan* plan, bool need_job_complete)
return Maybe<void>::Ok();
}
void MergePlanWithoutGenNetTopo(Plan* plan, Plan&& other) {
void MergePlan(Plan* plan, Plan&& other) {
PbRpf<TaskProto>* dst_tasks = plan->mutable_task();
PbRpf<TaskProto>* src_tasks = other.mutable_task();
dst_tasks->Reserve(dst_tasks->size() + src_tasks->size());
......@@ -421,17 +415,10 @@ void MergePlanWithoutGenNetTopo(Plan* plan, Plan&& other) {
}
}
void MergeSubPlanWithoutGenNetTopo(Plan* plan, std::vector<Plan>&& sub_plans) {
void MergeSubPlan(Plan* plan, std::vector<Plan>&& sub_plans) {
CHECK(!sub_plans.empty());
*plan = std::move(sub_plans.at(0));
FOR_RANGE(int32_t, i, 1, sub_plans.size()) {
MergePlanWithoutGenNetTopo(plan, std::move(sub_plans.at(i)));
}
}
void MergePlan(Plan* plan, Plan&& other) {
MergePlanWithoutGenNetTopo(plan, std::move(other));
Compiler().GenNetTopo(plan);
FOR_RANGE(int32_t, i, 1, sub_plans.size()) { MergePlan(plan, std::move(sub_plans.at(i))); }
}
void DumpCtrlRegstInfoToPlan(Plan* plan) {
......@@ -1161,7 +1148,7 @@ Maybe<void> CompileJobsAndMergePlans(const PbRpf<Job>& job_confs, Plan& plan) {
auto scope = std::make_unique<GlobalJobDescScope>(jobs.at(i)->job_conf(), i);
JUST(CompileCurJobOnMaster(jobs.at(i).get(), &sub_plans.at(i), true));
}
MergeSubPlanWithoutGenNetTopo(&plan, std::move(sub_plans));
MergeSubPlan(&plan, std::move(sub_plans));
InterJobMemSharingUtil::MergeMemReusedChunkBetweenUserJobs(function_jobs, &plan);
InterJobMemSharingUtil::MergeMemSharedInterfaceMemBlockBetweenJobs(jobs, &plan);
PlanUtil::SetForceInplaceMemBlock(&plan);
......
......@@ -11,10 +11,6 @@ message MachineIds {
repeated int64 machine_id = 1;
}
// Wraps a single map from an int64 key to a MachineIds list.
// NOTE(review): presumably the key is a machine id and the value is the set of machines
// it exchanges registers with — confirm against the code that fills this message.
message NetTopo {
map<int64, MachineIds> peer_machine_ids = 1;
}
// Wraps a single map from an int64 key to a JobConfigProto.
// NOTE(review): the field name suggests the key is a job id — confirm against callers.
message JobConfs {
map<int64, JobConfigProto> job_id2job_conf = 1;
}
......@@ -38,7 +34,6 @@ message OpAttributeInfo {
message Plan {
repeated TaskProto task = 1;
required MemBlockAndChunkList block_chunk_list = 2;
required NetTopo net_topo = 3;
required JobConfs job_confs = 4;
required CollectiveBoxingPlan collective_boxing_plan= 5;
required CtrlRegstDescInfo ctrl_regst_desc_info = 6;
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment