From 94ffe857620b9c938442701f4b426e80f452ae5b Mon Sep 17 00:00:00 2001 From: liufengwei0103 <2472937968@qq.com> Date: Mon, 5 Jul 2021 16:40:08 +0800 Subject: [PATCH] replace ForeignJobInstance using JobInstance (#5374) Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com> --- .../python/framework/foreign_job_instance.cpp | 73 ------------------- oneflow/api/python/framework/framework.h | 6 +- oneflow/api/python/framework/framework_api.h | 2 +- oneflow/api/python/framework/job_instance.cpp | 72 ++++++++++++++++++ ...{foreign_job_instance.h => job_instance.h} | 6 +- .../job/runtime_buffer_managers_scope.cpp | 6 +- oneflow/core/job/runtime_buffers_scope.cpp | 6 +- .../core/job/session_global_objects_scope.cpp | 2 +- .../core/kernel/callback_notify_kernel.cpp | 6 +- oneflow/core/kernel/foreign_input_kernel.cpp | 6 +- oneflow/core/kernel/foreign_output_kernel.cpp | 6 +- oneflow/python/framework/job_instance.py | 4 +- 12 files changed, 97 insertions(+), 98 deletions(-) delete mode 100644 oneflow/api/python/framework/foreign_job_instance.cpp create mode 100644 oneflow/api/python/framework/job_instance.cpp rename oneflow/core/job/{foreign_job_instance.h => job_instance.h} (92%) diff --git a/oneflow/api/python/framework/foreign_job_instance.cpp b/oneflow/api/python/framework/foreign_job_instance.cpp deleted file mode 100644 index 1afe904c8..000000000 --- a/oneflow/api/python/framework/foreign_job_instance.cpp +++ /dev/null @@ -1,73 +0,0 @@ -/* -Copyright 2020 The OneFlow Authors. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -#include <pybind11/pybind11.h> -#include <string> -#include <memory> -#include "oneflow/api/python/of_api_registry.h" -#include "oneflow/core/common/util.h" -#include "oneflow/core/job/foreign_job_instance.h" - -namespace py = pybind11; - -namespace oneflow { - -class PyForeignJobInstance : public ForeignJobInstance { - public: - // Inherit the constructors - using ForeignJobInstance::ForeignJobInstance; - - // Trampoline (need one for each virtual function) - std::string job_name() const override { - PYBIND11_OVERRIDE(std::string, /* Return type */ - ForeignJobInstance, /* Parent class */ - job_name, /* Name of function in C++ (must match Python name) */ - ); - } - - std::string sole_input_op_name_in_user_job() const override { - PYBIND11_OVERRIDE(std::string, ForeignJobInstance, sole_input_op_name_in_user_job, ); - } - - std::string sole_output_op_name_in_user_job() const override { - PYBIND11_OVERRIDE(std::string, ForeignJobInstance, sole_output_op_name_in_user_job, ); - } - - void PushBlob(uint64_t ofblob_ptr) const override { - PYBIND11_OVERRIDE(void, ForeignJobInstance, PushBlob, ofblob_ptr); - } - - void PullBlob(uint64_t ofblob_ptr) const override { - PYBIND11_OVERRIDE(void, ForeignJobInstance, PullBlob, ofblob_ptr); - } - - void Finish() const override { PYBIND11_OVERRIDE(void, ForeignJobInstance, Finish, ); } -}; - -} // namespace oneflow - -ONEFLOW_API_PYBIND11_MODULE("", m) { - using namespace oneflow; - - py::class_<ForeignJobInstance, PyForeignJobInstance, std::shared_ptr<ForeignJobInstance>>( - m, "ForeignJobInstance") - .def(py::init<>()) - .def("job_name", &ForeignJobInstance::job_name) - .def("sole_input_op_name_in_user_job", &ForeignJobInstance::sole_input_op_name_in_user_job) - .def("sole_output_op_name_in_user_job", &ForeignJobInstance::sole_output_op_name_in_user_job) - .def("PushBlob", &ForeignJobInstance::PushBlob) - .def("PullBlob", &ForeignJobInstance::PullBlob) - .def("Finish", &ForeignJobInstance::Finish); -} diff --git a/oneflow/api/python/framework/framework.h b/oneflow/api/python/framework/framework.h index d92de3bef..d346242a8 100644 --- a/oneflow/api/python/framework/framework.h +++ b/oneflow/api/python/framework/framework.h @@ -27,7 +27,7 @@ limitations under the License. #include "oneflow/core/job/inter_user_job_info.pb.h" #include "oneflow/core/job/foreign_callback.h" #include "oneflow/core/job/foreign_watcher.h" -#include "oneflow/core/job/foreign_job_instance.h" +#include "oneflow/core/job/job_instance.h" #include "oneflow/core/job/oneflow.h" #include "oneflow/core/job/placement.pb.h" #include "oneflow/core/framework/config_def.h" @@ -58,11 +58,11 @@ inline Maybe<void> RegisterWatcherOnlyOnce(const std::shared_ptr<ForeignWatcher> return Maybe<void>::Ok(); } -inline Maybe<void> LaunchJob(const std::shared_ptr<oneflow::ForeignJobInstance>& cb) { +inline Maybe<void> LaunchJob(const std::shared_ptr<oneflow::JobInstance>& cb) { CHECK_OR_RETURN(GlobalProcessCtx::IsThisProcessMaster()); CHECK_NOTNULL_OR_RETURN(Global<Oneflow>::Get()); const auto& job_name = cb->job_name(); - auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<ForeignJobInstance>>>::Get(); + auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get(); int64_t job_id = Global<JobName2JobId>::Get()->at(job_name); if (IsPullJob(job_name, *Global<InterUserJobInfo>::Get())) { buffer_mgr->Get(GetForeignOutputBufferName(job_name))->Send(cb); diff --git a/oneflow/api/python/framework/framework_api.h b/oneflow/api/python/framework/framework_api.h index e3bbc6498..3d1b328a4 100644 --- a/oneflow/api/python/framework/framework_api.h +++ b/oneflow/api/python/framework/framework_api.h @@ -28,7 +28,7 @@ inline void RegisterWatcherOnlyOnce(const std::shared_ptr<oneflow::ForeignWatche return oneflow::RegisterWatcherOnlyOnce(watcher).GetOrThrow(); } -inline void LaunchJob(const std::shared_ptr<oneflow::ForeignJobInstance>& cb) { +inline void LaunchJob(const std::shared_ptr<oneflow::JobInstance>& cb) { return oneflow::LaunchJob(cb).GetOrThrow(); } diff --git a/oneflow/api/python/framework/job_instance.cpp b/oneflow/api/python/framework/job_instance.cpp new file mode 100644 index 000000000..367c39326 --- /dev/null +++ b/oneflow/api/python/framework/job_instance.cpp @@ -0,0 +1,72 @@ +/* +Copyright 2020 The OneFlow Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include <pybind11/pybind11.h> +#include <string> +#include <memory> +#include "oneflow/api/python/of_api_registry.h" +#include "oneflow/core/common/util.h" +#include "oneflow/core/job/job_instance.h" + +namespace py = pybind11; + +namespace oneflow { + +class PyJobInstance : public JobInstance { + public: + // Inherit the constructors + using JobInstance::JobInstance; + + // Trampoline (need one for each virtual function) + std::string job_name() const override { + PYBIND11_OVERRIDE(std::string, /* Return type */ + JobInstance, /* Parent class */ + job_name, /* Name of function in C++ (must match Python name) */ + ); + } + + std::string sole_input_op_name_in_user_job() const override { + PYBIND11_OVERRIDE(std::string, JobInstance, sole_input_op_name_in_user_job, ); + } + + std::string sole_output_op_name_in_user_job() const override { + PYBIND11_OVERRIDE(std::string, JobInstance, sole_output_op_name_in_user_job, ); + } + + void PushBlob(uint64_t ofblob_ptr) const override { + PYBIND11_OVERRIDE(void, JobInstance, PushBlob, ofblob_ptr); + } + + void PullBlob(uint64_t ofblob_ptr) const override { + PYBIND11_OVERRIDE(void, JobInstance, PullBlob, ofblob_ptr); + } + + void Finish() const override { PYBIND11_OVERRIDE(void, JobInstance, Finish, ); } +}; + +} // namespace oneflow + +ONEFLOW_API_PYBIND11_MODULE("", m) { + using namespace oneflow; + + py::class_<JobInstance, PyJobInstance, std::shared_ptr<JobInstance>>(m, "JobInstance") + .def(py::init<>()) + .def("job_name", &JobInstance::job_name) + .def("sole_input_op_name_in_user_job", &JobInstance::sole_input_op_name_in_user_job) + .def("sole_output_op_name_in_user_job", &JobInstance::sole_output_op_name_in_user_job) + .def("PushBlob", &JobInstance::PushBlob) + .def("PullBlob", &JobInstance::PullBlob) + .def("Finish", &JobInstance::Finish); +} diff --git a/oneflow/core/job/foreign_job_instance.h b/oneflow/core/job/job_instance.h similarity index 92% rename from oneflow/core/job/foreign_job_instance.h rename to oneflow/core/job/job_instance.h index ea1327054..5be11bd8a 100644 --- a/oneflow/core/job/foreign_job_instance.h +++ b/oneflow/core/job/job_instance.h @@ -20,11 +20,11 @@ limitations under the License. namespace oneflow { -class ForeignJobInstance { +class JobInstance { public: - ForeignJobInstance() = default; + JobInstance() = default; - virtual ~ForeignJobInstance() = default; + virtual ~JobInstance() = default; virtual std::string job_name() const { UNIMPLEMENTED(); } virtual std::string sole_input_op_name_in_user_job() const { UNIMPLEMENTED(); } diff --git a/oneflow/core/job/runtime_buffer_managers_scope.cpp b/oneflow/core/job/runtime_buffer_managers_scope.cpp index c74db1eaa..3c4ea7f95 100644 --- a/oneflow/core/job/runtime_buffer_managers_scope.cpp +++ b/oneflow/core/job/runtime_buffer_managers_scope.cpp @@ -15,17 +15,17 @@ limitations under the License. */ #include "oneflow/core/common/buffer_manager.h" #include "oneflow/core/job/runtime_buffer_managers_scope.h" -#include "oneflow/core/job/foreign_job_instance.h" +#include "oneflow/core/job/job_instance.h" namespace oneflow { RuntimeBufferManagersScope::RuntimeBufferManagersScope() { Global<BufferMgr<int64_t>>::New(); - Global<BufferMgr<std::shared_ptr<ForeignJobInstance>>>::New(); + Global<BufferMgr<std::shared_ptr<JobInstance>>>::New(); } RuntimeBufferManagersScope::~RuntimeBufferManagersScope() { - Global<BufferMgr<std::shared_ptr<ForeignJobInstance>>>::Delete(); + Global<BufferMgr<std::shared_ptr<JobInstance>>>::Delete(); Global<BufferMgr<int64_t>>::Delete(); } diff --git a/oneflow/core/job/runtime_buffers_scope.cpp b/oneflow/core/job/runtime_buffers_scope.cpp index 078e445b3..66123a42b 100644 --- a/oneflow/core/job/runtime_buffers_scope.cpp +++ b/oneflow/core/job/runtime_buffers_scope.cpp @@ -16,14 +16,14 @@ limitations under the License. #include "oneflow/core/common/buffer_manager.h" #include "oneflow/core/job/runtime_buffers_scope.h" #include "oneflow/core/job/job_desc.h" -#include "oneflow/core/job/foreign_job_instance.h" +#include "oneflow/core/job/job_instance.h" namespace oneflow { RuntimeBuffersScope::RuntimeBuffersScope(const JobConfs& job_confs) { size_t job_size = Global<JobName2JobId>::Get()->size(); Global<BufferMgr<int64_t>>::Get()->NewBuffer(kBufferNameGlobalWaitJobId, job_size); - auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<ForeignJobInstance>>>::Get(); + auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get(); for (const auto& pair : job_confs.job_id2job_conf()) { const auto& job_name = pair.second.job_name(); CHECK_EQ(pair.first, Global<JobName2JobId>::Get()->at(job_name)); @@ -35,7 +35,7 @@ RuntimeBuffersScope::RuntimeBuffersScope(const JobConfs& job_confs) { } RuntimeBuffersScope::~RuntimeBuffersScope() { - auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<ForeignJobInstance>>>::Get(); + auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get(); for (const auto& pair : *Global<JobName2JobId>::Get()) { const auto& job_name = pair.first; buffer_mgr->Get(GetCallbackNotifierBufferName(job_name))->Close(); diff --git a/oneflow/core/job/session_global_objects_scope.cpp b/oneflow/core/job/session_global_objects_scope.cpp index 21614defd..4b7d277b2 100644 --- a/oneflow/core/job/session_global_objects_scope.cpp +++ b/oneflow/core/job/session_global_objects_scope.cpp @@ -21,7 +21,7 @@ limitations under the License. #include "oneflow/core/job/available_memory_desc.pb.h" #include "oneflow/core/job/id_manager.h" #include "oneflow/core/job/profiler.h" -#include "oneflow/core/job/foreign_job_instance.h" +#include "oneflow/core/job/job_instance.h" #include "oneflow/core/job/inter_user_job_info.pb.h" #include "oneflow/core/job/job_desc.h" #include "oneflow/core/job/critical_section_desc.h" diff --git a/oneflow/core/kernel/callback_notify_kernel.cpp b/oneflow/core/kernel/callback_notify_kernel.cpp index 7e9bd525e..5268f351a 100644 --- a/oneflow/core/kernel/callback_notify_kernel.cpp +++ b/oneflow/core/kernel/callback_notify_kernel.cpp @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "oneflow/core/kernel/callback_notify_kernel.h" -#include "oneflow/core/job/foreign_job_instance.h" +#include "oneflow/core/job/job_instance.h" namespace oneflow { @@ -23,8 +23,8 @@ void CallbackNotifyKernel<T>::ForwardDataContent( const KernelCtx& ctx, std::function<Blob*(const std::string&)> BnInOp2Blob) const { T job_id = *BnInOp2Blob("in")->dptr<T>(); const auto& buffer_name = this->op_conf().callback_notify_conf().callback_buffer_name(job_id); - std::shared_ptr<ForeignJobInstance> foreign_job_instance; - BufferStatus buffer_status = Global<BufferMgr<std::shared_ptr<ForeignJobInstance>>>::Get() + std::shared_ptr<JobInstance> foreign_job_instance; + BufferStatus buffer_status = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get() ->Get(buffer_name) ->TryReceive(&foreign_job_instance); CHECK_NE(buffer_status, kBufferStatusEmpty); diff --git a/oneflow/core/kernel/foreign_input_kernel.cpp b/oneflow/core/kernel/foreign_input_kernel.cpp index e4e5c57d4..b85b5adb5 100644 --- a/oneflow/core/kernel/foreign_input_kernel.cpp +++ b/oneflow/core/kernel/foreign_input_kernel.cpp @@ -16,15 +16,15 @@ limitations under the License. #include "oneflow/core/kernel/foreign_input_kernel.h" #include "oneflow/core/common/buffer_manager.h" #include "oneflow/core/register/ofblob.h" -#include "oneflow/core/job/foreign_job_instance.h" +#include "oneflow/core/job/job_instance.h" namespace oneflow { void ForeignInputKernel::ForwardDataContent( const KernelCtx& ctx, std::function<Blob*(const std::string&)> BnInOp2Blob) const { const auto& buffer_name = op_conf().foreign_input_conf().ofblob_buffer_name(); - std::shared_ptr<ForeignJobInstance> foreign_job_instance; - BufferStatus buffer_status = Global<BufferMgr<std::shared_ptr<ForeignJobInstance>>>::Get() + std::shared_ptr<JobInstance> foreign_job_instance; + BufferStatus buffer_status = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get() ->Get(buffer_name) ->TryReceive(&foreign_job_instance); CHECK_NE(buffer_status, kBufferStatusEmpty); diff --git a/oneflow/core/kernel/foreign_output_kernel.cpp b/oneflow/core/kernel/foreign_output_kernel.cpp index 5d854566c..d619d2a36 100644 --- a/oneflow/core/kernel/foreign_output_kernel.cpp +++ b/oneflow/core/kernel/foreign_output_kernel.cpp @@ -16,15 +16,15 @@ limitations under the License. #include "oneflow/core/kernel/foreign_output_kernel.h" #include "oneflow/core/common/buffer_manager.h" #include "oneflow/core/register/ofblob.h" -#include "oneflow/core/job/foreign_job_instance.h" +#include "oneflow/core/job/job_instance.h" namespace oneflow { void ForeignOutputKernel::ForwardDataContent( const KernelCtx& ctx, std::function<Blob*(const std::string&)> BnInOp2Blob) const { const auto& buffer_name = op_conf().foreign_output_conf().ofblob_buffer_name(); - std::shared_ptr<ForeignJobInstance> foreign_job_instance; - BufferStatus buffer_status = Global<BufferMgr<std::shared_ptr<ForeignJobInstance>>>::Get() + std::shared_ptr<JobInstance> foreign_job_instance; + BufferStatus buffer_status = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get() ->Get(buffer_name) ->TryReceive(&foreign_job_instance); CHECK_NE(buffer_status, kBufferStatusEmpty); diff --git a/oneflow/python/framework/job_instance.py b/oneflow/python/framework/job_instance.py index 875064858..08e6ba120 100644 --- a/oneflow/python/framework/job_instance.py +++ b/oneflow/python/framework/job_instance.py @@ -73,7 +73,7 @@ def MakeJobInstance(*arg, **kw): return job_instance -class JobInstance(oneflow._oneflow_internal.ForeignJobInstance): +class JobInstance(oneflow._oneflow_internal.JobInstance): def __init__( self, job_name, @@ -83,7 +83,7 @@ class JobInstance(oneflow._oneflow_internal.ForeignJobInstance): pull_cb=None, finish_cb=None, ): - oneflow._oneflow_internal.ForeignJobInstance.__init__(self) + oneflow._oneflow_internal.JobInstance.__init__(self) self.thisown = 0 self.job_name_ = str(job_name) self.sole_input_op_name_in_user_job_ = str(sole_input_op_name_in_user_job) -- GitLab