Unverified Commit 7df9302f authored by cheng cheng, committed by GitHub

New/CloseRuntimeBuffers and RunLazyJob impl (#5571)

* NNGraph interface and implementation for CompileAndInitRuntime

* Fix compilation errors

* RunLazyNNGraph

* New/CloseRuntimeBuffers and RunLazyJob impl

* Use job_conf.concurrency_width for the Source/Sink buffer sizes

* Add a note on buffer sizes
parent 47e9e661
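For context, the buffers created by NewRuntimeBuffers() below are fed on each job launch; the following is a minimal sketch of that producer side, assuming the buffer type exposes a Send() counterpart to the Close() used in this diff and that a JobInstance for the launch has already been built. It illustrates the intended usage and is not code from this commit.

// Hedged sketch (not part of this commit): pushing one job launch into the
// per-job buffers allocated by NewRuntimeBuffers(). Assumes Buffer<T>::Send
// exists alongside the Close() shown below.
void LaunchOneLazyJobSketch(const std::string& job_name,
                            const std::shared_ptr<JobInstance>& job_instance) {
  auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get();
  // One item per launch wakes the source tick; the callback-notifier consumer
  // runs the instance's completion callback once the job finishes.
  buffer_mgr->Get(GetSourceTickBufferName(job_name))->Send(job_instance);
  buffer_mgr->Get(GetCallbackNotifierBufferName(job_name))->Send(job_instance);
  // The input/output buffers are fed similarly, one Send per registered
  // input/output op name.
}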
@@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/framework/nn_graph.h"
#include "oneflow/core/common/buffer_manager.h"
#include "oneflow/core/control/ctrl_client.h"
#include "oneflow/core/control/global_process_ctx.h"
#include "oneflow/core/eager/eager_blob_object.h"
@@ -21,18 +22,24 @@ limitations under the License.
#include "oneflow/core/job/compiler.h"
#include "oneflow/core/job/job_build_and_infer_ctx_mgr.h"
#include "oneflow/core/job/job_desc.h"
#include "oneflow/core/job/job_instance.h"
#include "oneflow/core/job/plan_util.h"
#include "oneflow/core/job/runtime.h"
#include "oneflow/core/persistence/tee_persistent_log_stream.h"
namespace oneflow {
-NNGraph::~NNGraph() { runtime_.reset(); }
+NNGraph::~NNGraph() {
+  CloseRuntimeBuffers();
+  runtime_.reset();
+}
const std::vector<std::string>& NNGraph::inputs_op_names() const { return input_op_names_; }
const std::vector<std::string>& NNGraph::outputs_op_names() const { return output_op_names_; }
int64_t NNGraph::variable_op_size() const { return variable_op_name2eager_blob_.size(); }
Maybe<void> NNGraph::RegisterInputOpNames(const std::vector<std::string>& input_op_names) {
input_op_names_.assign(input_op_names.begin(), input_op_names.end());
return Maybe<void>::Ok();
@@ -65,6 +72,7 @@ Maybe<void> NNGraph::RegisterVariableOpNamesAndTensors(
}
Maybe<void> NNGraph::CompileAndInitRuntime() {
CHECK_OR_RETURN(!runtime_inited_);
JobBuildAndInferCtx* job_ctx = JUST(GetJobBuildAndInferCtx(name_));
job_ = job_ctx->job();
// TODO(chengcheng): CHECK job valid for each rank.
@@ -95,16 +103,87 @@ Maybe<void> NNGraph::CompileAndInitRuntime() {
}
OF_SESSION_BARRIER();
}
-  // TODO(chengcheng): BufferMgr->NewBuffer for each inputs, outputs, wait ids, callback.
+  NewRuntimeBuffers();
runtime_.reset(new Runtime(plan_, GetMaxVal<size_t>(), false));
runtime_inited_ = true;
return Maybe<void>::Ok();
}
void NNGraph::NewRuntimeBuffers() {
auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get();
// NOTE(chengcheng):
//   The buffer size for each buffer:
//   1. SourceTick and CallbackNotifier use job_conf.concurrency_width, set by the user (default = 128).
//      With pipeline parallelism, this value needs to be greater than the number of pipeline stages
//      for pipelining to take effect.
//   2. Input/Output buffers use 2, the minimum size needed to launch jobs asynchronously in a pipeline.
size_t concurrency_width = job_.job_conf().concurrency_width();
buffer_mgr->NewBuffer(GetSourceTickBufferName(name_), concurrency_width);
buffer_mgr->NewBuffer(GetCallbackNotifierBufferName(name_), concurrency_width);
for (const std::string& input_op_name : input_op_names_) {
buffer_mgr->NewBuffer(GetInputBufferName(name_, input_op_name), 2);
}
for (const std::string& output_op_name : output_op_names_) {
buffer_mgr->NewBuffer(GetOutputBufferName(name_, output_op_name), 2);
}
}
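As a rough illustration of the sizing rule in the note above, a hypothetical guard (not part of this change; pipeline_stage_num is an assumed, externally known value) could assert the relationship up front:

// Illustrative only, not in this commit: the user-configured concurrency width
// must leave enough buffer slots for every pipeline stage to keep a job in flight.
void CheckConcurrencyWidthForPipelineSketch(size_t concurrency_width, size_t pipeline_stage_num) {
  CHECK_GT(concurrency_width, pipeline_stage_num)
      << "job_conf.concurrency_width must be greater than the pipeline stage num for pipelining";
}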
void NNGraph::CloseRuntimeBuffers() {
if (runtime_inited_) {
auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get();
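// Close in the reverse order of creation: outputs, inputs, callback notifier, source tick.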
for (const std::string& output_op_name : output_op_names_) {
buffer_mgr->Get(GetOutputBufferName(name_, output_op_name))->Close();
}
for (const std::string& input_op_name : input_op_names_) {
buffer_mgr->Get(GetInputBufferName(name_, input_op_name))->Close();
}
buffer_mgr->Get(GetCallbackNotifierBufferName(name_))->Close();
buffer_mgr->Get(GetSourceTickBufferName(name_))->Close();
}
}
namespace {
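// Gather the eager blob object behind each tensor; for a consistent tensor, the
// blob object of its per-rank physical tensor is used.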
Maybe<void> MakeEagerBlobObjectList(std::vector<std::shared_ptr<vm::EagerBlobObject>>* blob_list,
const std::vector<std::shared_ptr<one::Tensor>>& tensor_list) {
for (const auto& tensor : tensor_list) {
CHECK_OR_RETURN(tensor->is_eager());
if (tensor->is_consistent()) {
blob_list->push_back(JUST(JUST(tensor->cur_rank_phy_tensor())->eager_blob_object()));
} else {
blob_list->push_back(JUST(tensor->eager_blob_object()));
}
}
return Maybe<void>::Ok();
}
} // namespace
Maybe<void> RunLazyNNGraph(const std::vector<std::shared_ptr<one::Tensor>>& inputs,
const std::vector<std::shared_ptr<one::Tensor>>& outputs,
const std::vector<std::shared_ptr<one::Tensor>>& parameters,
const std::shared_ptr<NNGraph>& nn_graph) {
-  TODO();  // call InstructionsBuilder::RunLazyJob
CHECK_EQ_OR_RETURN(inputs.size(), nn_graph->inputs_op_names().size());
CHECK_EQ_OR_RETURN(outputs.size(), nn_graph->outputs_op_names().size());
CHECK_EQ_OR_RETURN(parameters.size(), nn_graph->variable_op_size());
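// Convert the tensors into the eager blob objects that the lazy job instruction reads and writes.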
std::vector<std::shared_ptr<vm::EagerBlobObject>> input_blobs;
std::vector<std::shared_ptr<vm::EagerBlobObject>> output_blobs;
std::vector<std::shared_ptr<vm::EagerBlobObject>> var_blobs;
JUST(MakeEagerBlobObjectList(&input_blobs, inputs));
JUST(MakeEagerBlobObjectList(&output_blobs, outputs));
JUST(MakeEagerBlobObjectList(&var_blobs, parameters));
const auto& input_blob_list_ptr =
std::make_shared<const std::vector<std::shared_ptr<vm::EagerBlobObject>>>(
std::move(input_blobs));
const auto& output_blob_list_ptr =
std::make_shared<const std::vector<std::shared_ptr<vm::EagerBlobObject>>>(
std::move(output_blobs));
const auto& var_blob_list_ptr =
std::make_shared<const std::vector<std::shared_ptr<vm::EagerBlobObject>>>(
std::move(var_blobs));
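// Dispatch a RunLazyJob instruction for this graph through the instructions builder.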
JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> {
return builder->RunLazyJob(input_blob_list_ptr, output_blob_list_ptr, var_blob_list_ptr,
nn_graph);
}));
return Maybe<void>::Ok();
}
...
@@ -28,12 +28,13 @@ class Runtime;
class NNGraph final : public NNGraphIf {
public:
-  explicit NNGraph(const std::string& name) : name_(name) {}
+  explicit NNGraph(const std::string& name) : name_(name), runtime_inited_(false) {}
~NNGraph();
const std::string& job_name() const { return name_; }
const std::vector<std::string>& inputs_op_names() const;
const std::vector<std::string>& outputs_op_names() const;
int64_t variable_op_size() const;
Maybe<void> RegisterInputOpNames(const std::vector<std::string>& input_op_names);
Maybe<void> RegisterOutputOpNames(const std::vector<std::string>& output_op_names);
@@ -43,6 +44,9 @@ class NNGraph final : public NNGraphIf {
Maybe<void> CompileAndInitRuntime();
private:
void NewRuntimeBuffers();
void CloseRuntimeBuffers();
std::string name_;
std::vector<std::string> input_op_names_;
std::vector<std::string> output_op_names_;
@@ -51,6 +55,7 @@ class NNGraph final : public NNGraphIf {
Plan plan_;
// TODO(chengcheng): temp impl using runtime now, need reimplement for dynamic multi nn.Graph.
std::unique_ptr<Runtime> runtime_;
bool runtime_inited_;
};
Maybe<void> RunLazyNNGraph(const std::vector<std::shared_ptr<one::Tensor>>& inputs,
...