Skip to content
Snippets Groups Projects
Unverified Commit d9d28b80 authored by Juncheng's avatar Juncheng Committed by GitHub
Browse files

Remove obsolete Profiler (#5747)

parent 29609b44
No related branches found
No related tags found
No related merge requests found
Showing
with 8 additions and 229 deletions
......@@ -24,7 +24,6 @@ class AccCompActor final : public CompActor {
~AccCompActor() override = default;
private:
int64_t ActNumForEachOutput(int64_t regst_desc_id) const override;
void Act() override;
void VirtualAsyncSendNaiveProducedRegstMsgToConsumer() override;
......@@ -63,10 +62,6 @@ void AccCompActor::Init(const TaskProto& task_proto, int32_t max_acc_cnt) {
max_acc_cnt_ = max_acc_cnt;
}
// Returns `max_acc_cnt_` when `regst_desc_id` is the sole regst desc id registered
// under the name "out"; returns 1 for every other regst desc id.
int64_t AccCompActor::ActNumForEachOutput(int64_t regst_desc_id) const {
  if (regst_desc_id != Name2SoleRegstDescId("out")) { return 1; }
  return max_acc_cnt_;
}
void AccCompActor::Act() {
Regst* out_regst = GetNaiveCurWriteable("out");
Regst* in_regst = GetNaiveCurReadable("in");
......
......@@ -31,10 +31,6 @@ void AccTickCompActor::VirtualCompActorInit(const TaskProto& proto) {
OF_SET_MSG_HANDLER(&AccTickCompActor::HandlerNormal);
}
// Yields `max_acc_cnt_` for the regst desc id that "out" resolves to, and 1 otherwise.
int64_t AccTickCompActor::ActNumForEachOutput(int64_t regst_desc_id) const {
  const bool is_out_regst = (regst_desc_id == Name2SoleRegstDescId("out"));
  if (is_out_regst) { return max_acc_cnt_; }
  return 1;
}
// One act only advances the accumulation counter `acc_cnt_` by one.
void AccTickCompActor::Act() {
  ++acc_cnt_;
}
void AccTickCompActor::VirtualAsyncSendNaiveProducedRegstMsgToConsumer() {
......
......@@ -28,7 +28,6 @@ class AccTickCompActor : public CompActor {
protected:
void VirtualCompActorInit(const TaskProto& proto) override;
int64_t ActNumForEachOutput(int64_t regst_desc_id) const override;
private:
void Act() override;
......
syntax = "proto2";
package oneflow;
// Describes one regst that was readable for an act: the id of its regst
// descriptor together with the act id carried by that regst.
message ReadableRegstInfo {
// Id of the regst descriptor the regst belongs to.
required int64 regst_desc_id = 1;
// Act id recorded on the regst (presumably the producer's act counter at the
// time the regst was sent -- confirm against the producer side).
required int64 act_id = 2;
}
// Profiling record for a single act of one actor.
// All scalar fields are `required` (proto2), so a message missing any of them
// is considered uninitialized by the protobuf runtime.
message ActEvent {
// Whether the record was produced during the experiment phase.
required bool is_experiment_phase = 1;
// Id of the actor that performed the act.
required int64 actor_id = 2;
// Id of the work stream associated with the actor.
required int64 work_stream_id = 3;
// Id of the act within the actor.
required int64 act_id = 4;
// Timestamps of the act's life cycle. The unit and clock are not visible in
// this file -- NOTE(review): confirm against the code that fills them in.
required double ready_time = 5;
required double start_time = 6;
required double stop_time = 7;
// Regsts that were readable for this act. Field numbers 8 and 9 are not used
// by this message.
repeated ReadableRegstInfo readable_regst_infos = 10;
}
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/actor/act_event_logger.h"
#include "oneflow/core/persistence/persistent_in_stream.h"
#include "oneflow/core/common/protobuf.h"
#include <google/protobuf/text_format.h>
namespace oneflow {
// Building blocks of the log file names: an optional "experiment_" prefix
// followed by either the binary ("act_event.bin") or the text ("act_event.txt")
// file name.
const std::string ActEventLogger::experiment_prefix_("experiment_");
const std::string ActEventLogger::act_event_bin_filename_("act_event.bin");
const std::string ActEventLogger::act_event_txt_filename_("act_event.txt");
// Appends `act_event` to both log streams: the message itself goes to the
// binary stream, and its protobuf text-format rendering goes to the text stream.
void ActEventLogger::PrintActEventToLogDir(const ActEvent& act_event) {
  // Render the human-readable form up front; it does not touch either stream.
  std::string text_form;
  google::protobuf::TextFormat::PrintToString(act_event, &text_form);

  bin_out_stream_ << act_event;
  txt_out_stream_ << text_form;
}
// Name of the binary act-event file for the experiment phase: the experiment
// prefix followed by the regular binary file name.
std::string ActEventLogger::experiment_act_event_bin_filename() {
  std::string filename = experiment_prefix_;
  filename += act_event_bin_filename_;
  return filename;
}
// Name of the binary act-event file (without the experiment prefix).
std::string ActEventLogger::act_event_bin_filename() {
  return act_event_bin_filename_;
}
// Opens the binary and the text output stream on the local file system under
// FLAGS_log_dir. When `is_experiment` is true both file names are prefixed with
// `experiment_prefix_`; otherwise the plain file names are used.
ActEventLogger::ActEventLogger(bool is_experiment)
    : bin_out_stream_(
          LocalFS(),
          JoinPath(FLAGS_log_dir,
                   (is_experiment ? experiment_prefix_ : std::string()) + act_event_bin_filename_)),
      txt_out_stream_(
          LocalFS(),
          JoinPath(FLAGS_log_dir,
                   (is_experiment ? experiment_prefix_ : std::string()) + act_event_txt_filename_)) {
}
// Reads every serialized ActEvent stored in the local file `act_event_filepath`
// and appends the parsed messages, in file order, to `*act_events`.
//
// The file is consumed as a sequence of records, each made of an int64_t byte
// count followed by that many bytes of serialized ActEvent. Reading stops at the
// first size prefix that cannot be read in full.
//
// Aborts (CHECK) when a record is malformed: a negative size, a size that does
// not fit the `int` length accepted by ParseFromArray, a truncated payload, or a
// payload that does not parse into a fully initialized ActEvent.
void ParseActEvents(const std::string& act_event_filepath,
                    std::list<std::unique_ptr<ActEvent>>* act_events) {
  PersistentInStream in_stream(LocalFS(), act_event_filepath);
  int64_t act_event_size = 0;
  // ReadFully yields 0 (hence `!`) when the requested bytes were read in full.
  while (!in_stream.ReadFully(reinterpret_cast<char*>(&act_event_size), sizeof(act_event_size))) {
    // A corrupt prefix must not be used as a buffer size or silently narrowed.
    CHECK(act_event_size >= 0);
    const int payload_size = static_cast<int>(act_event_size);
    CHECK(static_cast<int64_t>(payload_size) == act_event_size);
    std::vector<char> buffer(act_event_size);
    CHECK(!in_stream.ReadFully(buffer.data(), act_event_size));
    auto act_event = std::make_unique<ActEvent>();
    // ParseFromArray reports failure through its return value; do not keep a
    // half-parsed event.
    CHECK(act_event->ParseFromArray(buffer.data(), payload_size));
    act_events->emplace_back(std::move(act_event));
  }
}
} // namespace oneflow
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_ACTOR_ACT_EVENT_LOGGER_H_
#define ONEFLOW_CORE_ACTOR_ACT_EVENT_LOGGER_H_
#include "oneflow/core/common/util.h"
#include "oneflow/core/common/str_util.h"
#include "oneflow/core/actor/act_event.pb.h"
#include "oneflow/core/persistence/persistent_out_stream.h"
namespace oneflow {
// Writes ActEvent messages to a binary file and a text file in the log
// directory. Instances are created through Global<ActEventLogger> (the
// constructor is private and Global is a friend); the class is neither copyable
// nor movable.
class ActEventLogger final {
 public:
  OF_DISALLOW_COPY_AND_MOVE(ActEventLogger);
  ~ActEventLogger() = default;

  // Appends the given event to both the binary and the text output stream.
  void PrintActEventToLogDir(const ActEvent&);

  // File name (not a full path) of the binary log with the experiment prefix.
  static std::string experiment_act_event_bin_filename();
  // File name (not a full path) of the binary log without the prefix.
  static std::string act_event_bin_filename();

 private:
  static const std::string experiment_prefix_;
  static const std::string act_event_bin_filename_;
  static const std::string act_event_txt_filename_;

  friend class Global<ActEventLogger>;
  // `explicit` so that a bare bool can never be converted into a logger
  // implicitly. `is_experiment_phase` selects the prefixed file names.
  explicit ActEventLogger(bool is_experiment_phase);

  // Declaration order matters: the constructor initializes the binary stream
  // first, then the text stream.
  PersistentOutStream bin_out_stream_;
  PersistentOutStream txt_out_stream_;
};
// Reads the serialized ActEvent records stored in the file at
// `act_event_filepath` and appends the parsed events to `*act_events`.
void ParseActEvents(const std::string& act_event_filepath,
std::list<std::unique_ptr<ActEvent>>* act_events);
} // namespace oneflow
#endif // ONEFLOW_CORE_ACTOR_ACT_EVENT_LOGGER_H_
......@@ -42,7 +42,6 @@ void Actor::Init(const JobDesc* job_desc, const TaskProto& task_proto,
job_desc_ = job_desc;
actor_id_ = task_proto.task_id();
global_work_stream_id_ = Global<IDMgr>::Get()->GlobalWorkStreamId4ActorId(actor_id_);
act_id_ = -1;
job_id_ = task_proto.job_id();
InitDeviceCtx(thread_ctx);
if (task_proto.has_parallel_ctx()) {
......@@ -69,7 +68,6 @@ void Actor::Init(const JobDesc* job_desc, const TaskProto& task_proto,
});
int64_t regst_desc_id = pair.second.regst_desc_id();
CHECK(name2regst_desc_id_.insert({pair.first, {regst_desc_id}}).second);
produced_regst2expected_act_id_[regst_desc_id] = act_id_;
if (pair.second.regst_desc_type().has_ctrl_regst_desc()) {
produced_ctrl_regst_desc_ids_.insert(regst_desc_id);
}
......@@ -246,11 +244,6 @@ KernelCtx Actor::GenDefaultKernelCtx() const {
return ctx;
}
// Copies the identifying fields of `regst` -- its regst desc id and its act id --
// into `*info`.
void Actor::SetReadableRegstInfo(const Regst* regst, ReadableRegstInfo* info) const {
  const auto desc_id = regst->regst_desc_id();
  const auto regst_act_id = regst->act_id();
  info->set_regst_desc_id(desc_id);
  info->set_act_id(regst_act_id);
}
void Actor::ForEachCurNaiveReadableDataRegst(std::function<void(const Regst*)> func) const {
naive_consumed_rs_.ForEachFrontRegst([func](int64_t regst_desc_id, Regst* regst) {
if (Global<RegstMgr>::Get()->HasProducerTaskId4RegstDescId(regst_desc_id)) { return; }
......@@ -355,42 +348,8 @@ int Actor::HandlerZombie(const ActorMsg& msg) {
return 0;
}
// Runs one act (`DoAct`) and, in the logging branch, records an ActEvent for it.
// NOTE: the logging branch is guarded by a literal `if (false)`, so as written it
// never executes; every call ends up in the `else` branch and just runs DoAct().
void Actor::TryLogActEvent(const std::function<void()>& DoAct) const {
if (false) {
// Shared ownership: the event is captured by value in the callbacks below, which
// keeps it alive until the last of them has run.
auto act_event = std::make_shared<ActEvent>();
act_event->set_actor_id(actor_id());
act_event->set_work_stream_id(global_work_stream_id_);
act_event->set_act_id(act_id_);
// Ready time is sampled here, before the act is started.
act_event->set_ready_time(GetCurTime());
// NOTE(review): is_experiment_phase is not set anywhere in this function --
// confirm whether the receiver fills it in.
// Record the front regst of every naive consumed regst desc, skipping those for
// which the RegstMgr reports a producer task id.
naive_consumed_rs_.ForEachFrontRegst([&](int64_t regst_desc_id, const Regst* readable_regst) {
if (Global<RegstMgr>::Get()->HasProducerTaskId4RegstDescId(regst_desc_id)) { return; }
ReadableRegstInfo* info = act_event->add_readable_regst_infos();
Actor::SetReadableRegstInfo(readable_regst, info);
});
// Customized readable regsts go through the virtual SetReadableRegstInfo, so a
// subclass override is used for them.
ForEachCurCustomizedReadableRegst([&](const Regst* readable_regst) {
ReadableRegstInfo* info = act_event->add_readable_regst_infos();
SetReadableRegstInfo(readable_regst, info);
});
// The start-time callback is registered on the device context before DoAct() and
// the stop-time callback after it, so the statement order here is significant.
device_ctx_->AddCallBack([act_event]() { act_event->set_start_time(GetCurTime()); });
DoAct();
device_ctx_->AddCallBack([act_event]() {
act_event->set_stop_time(GetCurTime());
// The stream poller thread is not allowed to perform blocking RPC call. Hence, the
// RPC call is forwarded to the thread pool and will be executed there.
Global<ThreadPool>::Get()->AddWork(
[act_event]() { Global<CtrlClient>::Get()->PushActEvent(*act_event); });
});
} else {
// Only path that actually runs: perform the act without any logging.
DoAct();
}
}
void Actor::ActUntilFail() {
while (IsReadReady() && IsWriteReady()) {
act_id_ += 1;
// TryLogActEvent([&] { Act(); }); NOTE(chengcheng): LogActEvent NOT ready now.
Act();
AsyncSendCustomizedProducedRegstMsgToConsumer();
......@@ -485,7 +444,6 @@ void Actor::AsyncSendProducedCtrlRegstMsgToConsumer() {
int64_t Actor::HandleRegstToConsumer(Regst* regst) {
auto regst_reading_cnt_it = produced_regst2reading_cnt_.find(regst);
CHECK_EQ(regst_reading_cnt_it->second, 0);
regst->set_act_id(act_id_);
int64_t real_consumer_cnt = 0;
for (int64_t consumer : regst->consumers_actor_id()) {
......@@ -622,12 +580,6 @@ int Actor::TryUpdtStateAsProducedRegst(Regst* regst) {
} else if (naive_produced_rs_.TryPushBackRegst(regst) != 0) {
UpdtStateAsCustomizedProducedRegst(regst);
}
int64_t& expected_act_id = produced_regst2expected_act_id_[regst->regst_desc_id()];
if (expected_act_id >= 0 && CheckOutputActId(regst->regst_desc_id())) {
CHECK_EQ(regst->act_id(), expected_act_id);
}
expected_act_id = regst->act_id() + ActNumForEachOutput(regst->regst_desc_id());
return 0;
}
......
......@@ -16,7 +16,6 @@ limitations under the License.
#ifndef ONEFLOW_CORE_ACTOR_ACTOR_H_
#define ONEFLOW_CORE_ACTOR_ACTOR_H_
#include "oneflow/core/actor/act_event.pb.h"
#include "oneflow/core/actor/actor_message_bus.h"
#include "oneflow/core/device/cpu_device_context.h"
#include "oneflow/core/device/cuda_device_context.h"
......@@ -75,10 +74,8 @@ class Actor {
std::unique_ptr<DeviceCtx>& mut_device_ctx() { return device_ctx_; }
KernelCtx GenDefaultKernelCtx() const;
const std::vector<ExecKernel>& exec_kernel_vec() { return exec_kernel_vec_; }
virtual void SetReadableRegstInfo(const Regst*, ReadableRegstInfo*) const;
void ForEachCurNaiveReadableDataRegst(std::function<void(const Regst*)>) const;
int64_t act_id() const { return act_id_; }
int64_t ReadingCnt4ProducedRegst(Regst* regst) const;
void IncreaseReadingCnt4ProducedRegst(Regst* regst, int64_t val);
void IncreaseTotalReadingCnt(int64_t val) { total_reading_cnt_ += val; }
......@@ -151,12 +148,6 @@ class Actor {
// Act
void ActUntilFail();
virtual void Act() { UNIMPLEMENTED(); }
// Default implementation: returns 1 for every regst desc id. Virtual, so
// subclasses can supply their own value.
virtual int64_t ActNumForEachOutput(int64_t regst_desc_id) const {
  return 1;
}
// Default implementation: returns true for every regst desc id. Virtual, so
// subclasses can override it.
// TODO(jiyuan): figure out the ActNumForEachOutput of the model regsts to MdSave
// area
virtual bool CheckOutputActId(int64_t regst_desc_id) const {
  return true;
}
void TryLogActEvent(const std::function<void()>& Callback) const;
// Ready
bool IsReadReady() const;
......@@ -205,7 +196,6 @@ class Actor {
const JobDesc* job_desc_;
int64_t actor_id_;
int64_t global_work_stream_id_;
int64_t act_id_;
int64_t job_id_;
std::unique_ptr<ParallelContext> parallel_ctx_;
std::vector<ExecKernel> exec_kernel_vec_;
......@@ -216,7 +206,6 @@ class Actor {
int64_t remaining_eord_cnt_;
HashMap<int64_t, std::vector<std::unique_ptr<Regst>>> produced_regsts_;
HashMap<int64_t, int64_t> produced_regst2expected_act_id_;
HashMap<Regst*, int64_t> produced_regst2reading_cnt_;
int64_t total_reading_cnt_;
......
......@@ -43,8 +43,7 @@ ActorMsg ActorMsg::BuildRegstMsgToConsumer(int64_t producer, int64_t consumer,
msg.msg_type_ = ActorMsgType::kRegstMsg;
msg.regst_wrapper_.regst = regst_raw_ptr;
msg.regst_wrapper_.comm_net_token = nullptr;
msg.regst_wrapper_.regst_status = regst_raw_ptr->status();
msg.regst_wrapper_.regst_status.regst_desc_id = regst_raw_ptr->regst_desc_id();
msg.regst_wrapper_.regst_desc_id = regst_raw_ptr->regst_desc_id();
msg.regst_wrapper_.has_sole_empty_blob = IsSoleBlobAndDynamicEmpty(regst_raw_ptr);
msg.regst_wrapper_.is_data_regst_to_consumer =
regst_raw_ptr->regst_desc()->regst_desc_type().has_data_regst_desc();
......@@ -58,7 +57,7 @@ ActorMsg ActorMsg::BuildRegstMsgToProducer(int64_t consumer, int64_t producer,
msg.dst_actor_id_ = producer;
msg.msg_type_ = ActorMsgType::kRegstMsg;
msg.regst_wrapper_.regst = regst_raw_ptr;
msg.regst_wrapper_.regst_status.regst_desc_id = -1;
msg.regst_wrapper_.regst_desc_id = -1;
msg.regst_wrapper_.comm_net_token = nullptr;
// you can NOT access the regst ptr when multi nodes, because the address is in another machine
msg.regst_wrapper_.has_sole_empty_blob = false;
......@@ -103,7 +102,7 @@ int64_t ActorMsg::regst_desc_id() const {
if (Global<IDMgr>::Get()->MachineId4ActorId(src_actor_id_) == GlobalProcessCtx::Rank()) {
return regst_wrapper_.regst->regst_desc_id();
} else {
return regst_wrapper_.regst_status.regst_desc_id;
return regst_wrapper_.regst_desc_id;
}
}
......@@ -117,11 +116,6 @@ void ActorMsg::set_comm_net_sequence_number(int64_t sequence_number) {
regst_wrapper_.comm_net_sequence_number = sequence_number;
}
// Act id stored in the regst status carried by this message.
// Only valid for regst messages; any other message type trips the CHECK.
int64_t ActorMsg::act_id() const {
  CHECK_EQ(msg_type_, ActorMsgType::kRegstMsg);
  const auto& status = regst_wrapper_.regst_status;
  return status.act_id;
}
void* ActorMsg::comm_net_token() const {
CHECK_EQ(msg_type_, ActorMsgType::kRegstMsg);
return regst_wrapper_.comm_net_token;
......
......@@ -52,7 +52,6 @@ class ActorMsg final {
ActorCmd actor_cmd() const;
Regst* regst() const;
int64_t regst_desc_id() const;
int64_t act_id() const;
void* comm_net_token() const;
void set_comm_net_token(void* token);
bool has_sole_empty_blob() const;
......@@ -79,7 +78,7 @@ class ActorMsg final {
Regst* regst;
void* comm_net_token;
int64_t comm_net_sequence_number;
RegstStatus regst_status;
int64_t regst_desc_id;
bool has_sole_empty_blob;
bool is_data_regst_to_consumer;
};
......
......@@ -124,8 +124,6 @@ void CaseCompActor::AsyncSendCustomizedProducedRegstMsgToConsumer() {
// Always returns true, whatever regst desc id is passed in.
bool CaseCompActor::ProducedCtrlRegstValid(int64_t regst_desc_id) const {
  return true;
}
// Always returns false, whatever regst desc id is passed in.
bool CaseCompActor::CheckOutputActId(int64_t regst_desc_id) const {
  return false;
}
REGISTER_ACTOR(kCase, CaseCompActor);
} // namespace oneflow
......@@ -39,7 +39,6 @@ class CaseCompActor final : public CompActor {
bool ProducedCtrlRegstValid(int64_t regst_desc_id) const override;
void NormalProcessCustomizedReadableRegstMsg(const ActorMsg&) override;
void NormalProcessCustomizedEordMsg(const ActorMsg&) override {}
bool CheckOutputActId(int64_t regst_desc_id) const override;
std::pair<RegstNameType, HashSet<std::string>> GetNaiveOrCustomizedConsumedRegstDescName()
override {
return std::make_pair(RegstNameType::kNaive, HashSet<std::string>{});
......
......@@ -55,19 +55,11 @@ void CopyCommNetActor::ForEachCurCustomizedReadableRegst(
handler(sequence_number2regst_ctx_.at(next_sequence_number_).regst_raw_ptr);
}
// Fills `*info` from the regst context stored under `next_sequence_number_`
// instead of from the regst itself: the regst desc id is `in_regst_desc_id_`
// and the act id is the one saved in that context. `regst` must be the raw
// pointer recorded in the context, otherwise the CHECK fails.
void CopyCommNetActor::SetReadableRegstInfo(const Regst* regst, ReadableRegstInfo* info) const {
  const RegstCtx& regst_ctx = sequence_number2regst_ctx_.at(next_sequence_number_);
  CHECK(regst == regst_ctx.regst_raw_ptr);
  const auto saved_act_id = regst_ctx.act_id;
  info->set_act_id(saved_act_id);
  info->set_regst_desc_id(in_regst_desc_id_);
}
bool CopyCommNetActor::NormalTryProcessReadableMsgFromOtherMachine(const ActorMsg& msg) {
RegstCtx regst_ctx;
regst_ctx.comm_net_token = msg.comm_net_token();
regst_ctx.regst_raw_ptr = msg.regst();
regst_ctx.producer = msg.src_actor_id();
regst_ctx.act_id = msg.act_id();
regst_ctx.has_sole_empty_blob = msg.has_sole_empty_blob();
CHECK(sequence_number2regst_ctx_.emplace(msg.comm_net_sequence_number(), regst_ctx).second);
return true;
......
......@@ -32,13 +32,11 @@ class CopyCommNetActor final : public Actor {
void* comm_net_token;
Regst* regst_raw_ptr;
int64_t producer;
int64_t act_id;
bool has_sole_empty_blob;
};
void VirtualActorInit(const TaskProto&) override;
void InitDeviceCtx(const ThreadCtx&) override;
void SetReadableRegstInfo(const Regst*, ReadableRegstInfo*) const override;
std::pair<RegstNameType, HashSet<std::string>> GetNaiveOrCustomizedConsumedRegstDescName()
override {
......
......@@ -44,10 +44,6 @@ void InputWiseCompActor::Init(const TaskProto& task_proto) {
OF_SET_MSG_HANDLER(&InputWiseCompActor::HandlerNormal);
}
// Returns the number of entries in `regst_desc_id2in_bn_id_`; the argument is
// not consulted.
int64_t InputWiseCompActor::ActNumForEachOutput(int64_t regst_desc_id) const {
  const auto entry_cnt = regst_desc_id2in_bn_id_.size();
  return static_cast<int64_t>(entry_cnt);
}
// Handles a readable-regst message: pushes the regst carried by `msg` onto the
// back of `consumed_rs_`. The CHECK_EQ aborts unless TryPushBackRegst returns 0.
void InputWiseCompActor::NormalProcessCustomizedReadableRegstMsg(const ActorMsg& msg) {
CHECK_EQ(0, consumed_rs_.TryPushBackRegst(msg.regst()));
}
......
......@@ -32,7 +32,6 @@ class InputWiseCompActor : public CompActor {
int64_t processed_regst_desc_id_cnt() const { return processed_regst_desc_id_cnt_; }
int64_t RegstDescNum() const { return consumed_rs_.total_regst_desc_cnt(); }
int64_t InBnId4RegstDescId(int64_t id) const { return regst_desc_id2in_bn_id_.at(id); }
int64_t ActNumForEachOutput(int64_t regst_desc_id) const override;
bool ProducedCtrlRegstValid(int64_t regst_desc_id) const override;
......
......@@ -31,7 +31,6 @@ class PackCompActor final : public CompActor {
void Act() override;
void VirtualAsyncSendNaiveProducedRegstMsgToConsumer() override;
void VirtualAsyncSendNaiveConsumedRegstMsgToProducer() override;
// Returns `total_pack_num_` (converted to int64_t) for every regst desc id.
int64_t ActNumForEachOutput(int64_t) const override {
  return static_cast<int64_t>(total_pack_num_);
}
size_t total_pack_num_;
size_t act_num_cnt_;
......
......@@ -18,6 +18,7 @@ limitations under the License.
namespace oneflow {
void ReentrantLockCompActor::VirtualCompActorInit(const TaskProto& task_proto) {
act_id_ = 0;
CHECK_EQ(1, exec_kernel_vec().size());
const auto& kernel_conf = task_proto.exec_sequence().exec_node().Get(0).kernel_conf();
const auto& ibns = kernel_conf.op_attribute().input_bns();
......@@ -60,7 +61,8 @@ void ReentrantLockCompActor::Act() {
cur_processed_regst_desc_id_ = GetCurProcessedRegstDescId();
Regst* const cur_regst = consumed_rs_.Front(cur_processed_regst_desc_id_);
reentrant_lock_status_.set_cur_ibn(Ibn4RegstDescId(cur_processed_regst_desc_id_));
reentrant_lock_status_.set_cur_act_id(act_id());
reentrant_lock_status_.set_cur_act_id(act_id_);
act_id_ += 1;
KernelCtx kernel_ctx = GenDefaultKernelCtx();
kernel_ctx.other = &reentrant_lock_status_;
AsyncLaunchKernel(kernel_ctx, [&](int64_t regst_desc_id) -> Regst* {
......
......@@ -29,7 +29,6 @@ class ReentrantLockCompActor final : public CompActor {
protected:
void VirtualCompActorInit(const TaskProto&) override;
bool CheckOutputActId(int64_t regst_desc_id) const override { return false; }
private:
void Act() override;
......@@ -54,6 +53,7 @@ class ReentrantLockCompActor final : public CompActor {
HashMap<int64_t, std::string> regst_desc_id2ibn_;
ReentrantLockStatus reentrant_lock_status_;
int64_t eord_regst_desc_id_;
int64_t act_id_;
};
} // namespace oneflow
......
......@@ -33,7 +33,6 @@ class SspVariableProxyCompActor final : public CompActor {
override {
return std::make_pair(RegstNameType::kNaive, HashSet<std::string>{});
}
bool CheckOutputActId(int64_t regst_desc_id) const override { return false; }
bool IsCustomizedReadReady() const override { return consumed_var_rs_.IsCurSlotReady(); }
bool IsCustomizedWriteReady() const override {
int64_t cur_staleness = (received_var_piece_id_ - ack_msg_returned_ref_piece_id_);
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment