From 810d8db524abfb12f39f579c135c94aaacbd4dd3 Mon Sep 17 00:00:00 2001
From: Juncheng <liujuncheng1022@gmail.com>
Date: Fri, 16 Jul 2021 18:40:23 +0800
Subject: [PATCH] Remove Plan::net_topo (#5502)

Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
---
 oneflow/core/job/compiler.cpp | 41 -----------------------------------
 oneflow/core/job/compiler.h   |  1 -
 oneflow/core/job/oneflow.cpp  | 21 ++++--------------
 oneflow/core/job/plan.proto   |  5 -----
 4 files changed, 4 insertions(+), 64 deletions(-)

diff --git a/oneflow/core/job/compiler.cpp b/oneflow/core/job/compiler.cpp
index 1749181ee..d918bd41c 100644
--- a/oneflow/core/job/compiler.cpp
+++ b/oneflow/core/job/compiler.cpp
@@ -25,47 +25,6 @@ limitations under the License.
 
 namespace oneflow {
 
-void Compiler::GenNetTopo(Plan* plan) const {
-  HashMap<int64_t, int64_t> rid2mid;
-  HashMap<int64_t, int64_t> tid2mid;
-  std::map<int64_t, std::set<int64_t>> net_topo;
-
-  for (const TaskProto& task_proto : plan->task()) {
-    for (const auto& regst_desc_it : task_proto.produced_regst_desc()) {
-      rid2mid.emplace(regst_desc_it.second.regst_desc_id(), task_proto.machine_id());
-    }
-    CHECK(tid2mid.emplace(task_proto.task_id(), task_proto.machine_id()).second);
-  }
-
-  for (const TaskProto& task_proto : plan->task()) {
-    for (const auto& regst_desc_it : task_proto.produced_regst_desc()) {
-      int64_t rid = regst_desc_it.second.regst_desc_id();
-      auto rid2mid_it = rid2mid.find(rid);
-      CHECK(rid2mid_it != rid2mid.end());
-      int64_t producer_mid = rid2mid_it->second;
-      for (int64_t consumer_task_id : regst_desc_it.second.consumer_task_id()) {
-        auto tid2mid_it = tid2mid.find(consumer_task_id);
-        CHECK(tid2mid_it != tid2mid.end());
-        int64_t consumer_mid = tid2mid_it->second;
-        net_topo[producer_mid].insert(consumer_mid);
-        net_topo[consumer_mid].insert(producer_mid);
-      }
-    }
-  }
-
-  HashMap<int64_t, MachineIds> std_net_topo;
-  NetTopo& pb_net_topo = *(plan->mutable_net_topo());
-  for (auto& pair : net_topo) {
-    int64_t src_mid = pair.first;
-    if (pair.second.count(src_mid)) { pair.second.erase(src_mid); }
-    std::vector<int64_t> peer_mids(pair.second.begin(), pair.second.end());
-    MachineIds pb_mids;
-    *(pb_mids.mutable_machine_id()) = StdVec2PbRf<int64_t>(peer_mids);
-    CHECK(std_net_topo.emplace(src_mid, pb_mids).second);
-  }
-  *(pb_net_topo.mutable_peer_machine_ids()) = HashMap2PbMap(std_net_topo);
-}
-
 void CreateOpAttributeRef(Plan* plan, int64_t job_id, TaskProto* task_proto) {
   auto* job_id2op_attribute_ref_table = plan->mutable_job_id2op_attribute_ref_table();
   CHECK(task_proto->exec_sequence().exec_node_size() == 1);
diff --git a/oneflow/core/job/compiler.h b/oneflow/core/job/compiler.h
index 4c51fa543..8b119e684 100644
--- a/oneflow/core/job/compiler.h
+++ b/oneflow/core/job/compiler.h
@@ -30,7 +30,6 @@ class Compiler final {
   ~Compiler() = default;
 
   void Compile(Job*, Plan*, bool need_job_complete) const;
-  void GenNetTopo(Plan* plan) const;
 };
 
 }  // namespace oneflow
diff --git a/oneflow/core/job/oneflow.cpp b/oneflow/core/job/oneflow.cpp
index 6174ecdca..e561fd679 100644
--- a/oneflow/core/job/oneflow.cpp
+++ b/oneflow/core/job/oneflow.cpp
@@ -103,8 +103,6 @@ std::string cluster_thrd_ids_key(const std::string& plan_name) {
   return plan_name + "_cluster_thrd_ids";
 }
 
-std::string net_topo_key(const std::string& plan_name) { return plan_name + "_net_topo"; }
-
 std::string ctrl_regst_desc_info_key(const std::string& plan_name) {
   return plan_name + "_ctrl_regst_desc_info_key";
 }
@@ -191,7 +189,6 @@ void PushPlan(const std::string& plan_name, Plan&& plan) {
     Global<CtrlClient>::Get()->PushKV(block7chunk_key(plan_name, pair.first), pair.second);
   }
 
-  Global<CtrlClient>::Get()->PushKV(net_topo_key(plan_name), plan.net_topo());
   Global<CtrlClient>::Get()->PushKV(ctrl_regst_desc_info_key(plan_name),
                                     plan.ctrl_regst_desc_info());
   Global<CtrlClient>::Get()->PushKV(job_id2job_conf(plan_name), plan.job_confs());
@@ -214,9 +211,6 @@ void PullPlan(const std::string& plan_name, Plan* plan) {
     Global<CtrlClient>::Get()->PullKV(sub_plan_key(plan_name, machine_id, thrd_id), &sub_plan);
     plan->mutable_task()->MergeFrom(sub_plan.task());
   }
-  NetTopo net_topo;
-  Global<CtrlClient>::Get()->PullKV(net_topo_key(plan_name), &net_topo);
-  *(plan->mutable_net_topo()) = net_topo;
   CtrlRegstDescInfo ctrl_regst_desc_info;
   Global<CtrlClient>::Get()->PullKV(ctrl_regst_desc_info_key(plan_name), &ctrl_regst_desc_info);
   *(plan->mutable_ctrl_regst_desc_info()) = ctrl_regst_desc_info;
@@ -399,7 +393,7 @@ Maybe<void> CompileCurJobOnMaster(Job* job, Plan* plan, bool need_job_complete)
   return Maybe<void>::Ok();
 }
 
-void MergePlanWithoutGenNetTopo(Plan* plan, Plan&& other) {
+void MergePlan(Plan* plan, Plan&& other) {
   PbRpf<TaskProto>* dst_tasks = plan->mutable_task();
   PbRpf<TaskProto>* src_tasks = other.mutable_task();
   dst_tasks->Reserve(dst_tasks->size() + src_tasks->size());
@@ -421,17 +415,10 @@ void MergePlanWithoutGenNetTopo(Plan* plan, Plan&& other) {
   }
 }
 
-void MergeSubPlanWithoutGenNetTopo(Plan* plan, std::vector<Plan>&& sub_plans) {
+void MergeSubPlan(Plan* plan, std::vector<Plan>&& sub_plans) {
   CHECK(!sub_plans.empty());
   *plan = std::move(sub_plans.at(0));
-  FOR_RANGE(int32_t, i, 1, sub_plans.size()) {
-    MergePlanWithoutGenNetTopo(plan, std::move(sub_plans.at(i)));
-  }
-}
-
-void MergePlan(Plan* plan, Plan&& other) {
-  MergePlanWithoutGenNetTopo(plan, std::move(other));
-  Compiler().GenNetTopo(plan);
+  FOR_RANGE(int32_t, i, 1, sub_plans.size()) { MergePlan(plan, std::move(sub_plans.at(i))); }
 }
 
 void DumpCtrlRegstInfoToPlan(Plan* plan) {
@@ -1161,7 +1148,7 @@ Maybe<void> CompileJobsAndMergePlans(const PbRpf<Job>& job_confs, Plan& plan) {
     auto scope = std::make_unique<GlobalJobDescScope>(jobs.at(i)->job_conf(), i);
     JUST(CompileCurJobOnMaster(jobs.at(i).get(), &sub_plans.at(i), true));
   }
-  MergeSubPlanWithoutGenNetTopo(&plan, std::move(sub_plans));
+  MergeSubPlan(&plan, std::move(sub_plans));
   InterJobMemSharingUtil::MergeMemReusedChunkBetweenUserJobs(function_jobs, &plan);
   InterJobMemSharingUtil::MergeMemSharedInterfaceMemBlockBetweenJobs(jobs, &plan);
   PlanUtil::SetForceInplaceMemBlock(&plan);
diff --git a/oneflow/core/job/plan.proto b/oneflow/core/job/plan.proto
index fe08a4b5e..b0ffbbad5 100644
--- a/oneflow/core/job/plan.proto
+++ b/oneflow/core/job/plan.proto
@@ -11,10 +11,6 @@ message MachineIds {
   repeated int64 machine_id = 1;
 }
 
-message NetTopo {
-  map<int64, MachineIds> peer_machine_ids = 1;
-}
-
 message JobConfs {
   map<int64, JobConfigProto> job_id2job_conf = 1;
 }
@@ -38,7 +34,7 @@ message OpAttributeInfo {
 message Plan {
   repeated TaskProto task = 1;
   required MemBlockAndChunkList block_chunk_list = 2;
-  required NetTopo net_topo = 3;
+  reserved 3;  // Formerly `net_topo` (removed); keep the tag reserved so it is never reused.
   required JobConfs job_confs = 4;
   required CollectiveBoxingPlan collective_boxing_plan= 5;
   required CtrlRegstDescInfo ctrl_regst_desc_info = 6;
-- 
GitLab