diff --git a/oneflow/core/common/buffer_manager.h b/oneflow/core/common/buffer_manager.h
index ec6e1903bd1e32084e0740a26d44ac07c7fe13c7..d386b714513b3012c3097abca037ccb7a1e06d60 100644
--- a/oneflow/core/common/buffer_manager.h
+++ b/oneflow/core/common/buffer_manager.h
@@ -31,7 +31,9 @@ class BufferMgr final {
     CHECK(name2buffer_.emplace(buffer_name, std::make_unique<Buffer<T>>(buffer_size)).second);
   }
   Buffer<T>* Get(const std::string& buffer_name) const {
-    return name2buffer_.at(buffer_name).get();
+    const auto& iter = name2buffer_.find(buffer_name);
+    CHECK(iter != name2buffer_.end()) << "buffer_name: " << buffer_name;
+    return iter->second.get();
   }
 
  private:
@@ -58,6 +60,21 @@ inline std::string GetForeignOutputBufferName(const std::string& job_name) {
   return prefix + job_name;
 }
 
+inline std::string GetInputBufferName(const std::string& job_name, const std::string& op_name) {
+  static const std::string prefix = "ForeignInput-";
+  return prefix + job_name + "-" + op_name;
+}
+
+inline std::string GetOutputBufferName(const std::string& job_name, const std::string& op_name) {
+  static const std::string prefix = "ForeignOutput-";
+  return prefix + job_name + "-" + op_name;
+}
+
+inline std::string GetSourceTickBufferName(const std::string& job_name) {
+  static const std::string prefix = "SourceTick-";
+  return prefix + job_name;
+}
+
 }  // namespace oneflow
 
 #endif  // ONEFLOW_CORE_COMMON_BUFFER_MANAGER_H_
diff --git a/oneflow/core/eager/lazy_job_device_context.h b/oneflow/core/eager/lazy_job_device_context.h
new file mode 100644
index 0000000000000000000000000000000000000000..4879de509674ad4a6f4563a922c35103047d6c1c
--- /dev/null
+++ b/oneflow/core/eager/lazy_job_device_context.h
@@ -0,0 +1,98 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef ONEFLOW_CORE_EAGER_LAZY_JOB_DEVICE_CONTEXT_H_
+#define ONEFLOW_CORE_EAGER_LAZY_JOB_DEVICE_CONTEXT_H_
+
+#include "oneflow/core/framework/nn_graph_if.h"
+#include "oneflow/core/common/util.h"
+#include "oneflow/core/device/device_context.h"
+
+namespace oneflow {
+
+namespace vm {
+
+class LazyJobDeviceCtx final : public DeviceCtx {
+ public:
+  OF_DISALLOW_COPY_AND_MOVE(LazyJobDeviceCtx);
+  LazyJobDeviceCtx() = default;
+  ~LazyJobDeviceCtx() override = default;
+
+#ifdef WITH_CUDA
+  const cudaStream_t& cuda_stream() const override {
+    UNIMPLEMENTED();
+    return *(const cudaStream_t*)nullptr;
+  }
+  const cublasHandle_t& cublas_pmh_handle() const override {
+    UNIMPLEMENTED();
+    return *(const cublasHandle_t*)nullptr;
+  }
+  const cublasHandle_t& cublas_tensor_op_math_handle() const override {
+    UNIMPLEMENTED();
+    return *(const cublasHandle_t*)nullptr;
+  }
+  const cublasHandle_t& cublas_pmd_handle() const override {
+    UNIMPLEMENTED();
+    return *(const cublasHandle_t*)nullptr;
+  }
+  const cudnnHandle_t& cudnn_handle() const override {
+    UNIMPLEMENTED();
+    return *(const cudnnHandle_t*)nullptr;
+  }
+#endif
+
+  void SyncDevice() override { UNIMPLEMENTED(); }
+
+  void AddCallBack(std::function<void()> callback) const override { UNIMPLEMENTED(); }
+
+  vm::Allocator* mut_allocator() override {
+    UNIMPLEMENTED();
+    return (vm::Allocator*)nullptr;
+  }
+
+  std::queue<std::weak_ptr<NNGraphIf>>* mut_queue() { return &queue_; }
+  std::mutex* mut_mutex() { return &mutex_; }
+  std::condition_variable* mut_cond() { return &cond_; }
+
+  void WaitUntilQueueEmptyIfFrontNNGraphNotEquals(const std::shared_ptr<NNGraphIf>& nn_graph) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    if (queue_.empty()) { return; }
+    const auto& last_nn_graph = queue_.front().lock();
+    if (!last_nn_graph) { return; }
+    if (last_nn_graph == nn_graph) { return; }
+    cond_.wait(lock, [this]() { return queue_.empty(); });
+  }
+
+  void EnqueueNNGraph(const std::shared_ptr<NNGraphIf>& nn_graph) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    queue_.emplace(nn_graph);
+  }
+
+  void DequeueNNGraph() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    queue_.pop();
+    cond_.notify_all();
+  }
+
+ private:
+  std::queue<std::weak_ptr<NNGraphIf>> queue_;
+  std::mutex mutex_;
+  std::condition_variable cond_;
+};
+
+}  // namespace vm
+}  // namespace oneflow
+
+#endif  // ONEFLOW_CORE_EAGER_LAZY_JOB_DEVICE_CONTEXT_H_
diff --git a/oneflow/core/eager/lazy_job_instruction_type.cpp b/oneflow/core/eager/lazy_job_instruction_type.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..73b40f30e89dcaf0cfe36851f822dabd917d75b5
--- /dev/null
+++ b/oneflow/core/eager/lazy_job_instruction_type.cpp
@@ -0,0 +1,166 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "oneflow/core/eager/lazy_job_stream_type.h"
+#include "oneflow/core/eager/lazy_job_device_context.h"
+#include "oneflow/core/eager/run_lazy_job_phy_instr_operand.h"
+#include "oneflow/core/framework/nn_graph_if.h"
+#include "oneflow/core/common/container_util.h"
+#include "oneflow/core/vm/instruction.msg.h"
+#include "oneflow/core/vm/instruction_type.h"
+#include "oneflow/core/job/job_instance.h"
+#include "oneflow/core/common/buffer_manager.h"
+#include "oneflow/core/common/global.h"
+#include "oneflow/core/vm/stream.msg.h"
+#include "oneflow/core/vm/thread_ctx.msg.h"
+#include "oneflow/core/register/ofblob.h"
+#include "oneflow/core/vm/naive_instruction_status_querier.h"
+
+namespace oneflow {
+
+namespace {
+
+class LazyJobInstance final : public JobInstance {
+ public:
+  LazyJobInstance(const LazyJobInstance&) = delete;
+  LazyJobInstance(LazyJobInstance&&) = delete;
+  ~LazyJobInstance() override = default;
+  LazyJobInstance(const std::string& job_name,
+                  const HashMap<std::string, std::function<void(int64_t)>>& push_cbs,
+                  const HashMap<std::string, std::function<void(int64_t)>>& pull_cbs,
+                  const std::function<void()> finish_cb)
+      : job_name_(job_name), push_cbs_(push_cbs), pull_cbs_(pull_cbs), finish_cb_(finish_cb) {}
+
+  std::string job_name() const override { return job_name_; }
+  void PushBlobByOpName(uint64_t ofblob_ptr, const std::string& op_name) const override {
+    const auto& push_cb = CHECK_JUST(MapAt(push_cbs_, op_name));
+    return push_cb(ofblob_ptr);
+  }
+  void PullBlobByOpName(uint64_t ofblob_ptr, const std::string& op_name) const override {
+    const auto& pull_cb = CHECK_JUST(MapAt(pull_cbs_, op_name));
+    return pull_cb(ofblob_ptr);
+  }
+  void Finish() const override { finish_cb_(); }
+
+  std::string sole_input_op_name_in_user_job() const override {
+    UNIMPLEMENTED();
+    return std::string();
+  }
+  std::string sole_output_op_name_in_user_job() const override {
+    UNIMPLEMENTED();
+    return std::string();
+  }
+  void PushBlob(uint64_t ofblob_ptr) const override { UNIMPLEMENTED(); }
+  void PullBlob(uint64_t ofblob_ptr) const override { UNIMPLEMENTED(); }
+
+ private:
+  const std::string job_name_;
+  const HashMap<std::string, std::function<void(int64_t)>> push_cbs_;
+  const HashMap<std::string, std::function<void(int64_t)>> pull_cbs_;
+  const std::function<void()> finish_cb_;
+};
+
+}  // namespace
+
+namespace vm {
+
+class RunLazyJobInstructionType final : public InstructionType {
+ public:
+  RunLazyJobInstructionType(const RunLazyJobInstructionType&) = delete;
+  RunLazyJobInstructionType(RunLazyJobInstructionType&&) = delete;
+  RunLazyJobInstructionType() = default;
+  ~RunLazyJobInstructionType() = default;
+  using stream_type = LazyJobStreamType;
+  void Infer(vm::Instruction* instruction) const override { UNIMPLEMENTED(); }
+  void Compute(vm::Instruction* instruction) const override {
+    const auto& cur_nn_graph = GetCurNNGraph(instruction);
+    auto* device_ctx = GetLazyJobDeviceCtx(instruction);
+
+    device_ctx->WaitUntilQueueEmptyIfFrontNNGraphNotEquals(cur_nn_graph);
+    {
+      const auto& job_instance = MakeJobInstance(instruction);
+      const auto& job_name = job_instance->job_name();
+      auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get();
+      for (const auto& op_name : cur_nn_graph->inputs_op_names()) {
+        buffer_mgr->Get(GetInputBufferName(job_name, op_name))->Send(job_instance);
+      }
+      for (const auto& op_name : cur_nn_graph->outputs_op_names()) {
+        buffer_mgr->Get(GetOutputBufferName(job_name, op_name))->Send(job_instance);
+      }
+      buffer_mgr->Get(GetCallbackNotifierBufferName(job_name))->Send(job_instance);
+      buffer_mgr->Get(GetSourceTickBufferName(job_name))->Send(job_instance);
+    }
+    device_ctx->EnqueueNNGraph(cur_nn_graph);
+  }
+
+ private:
+  LazyJobDeviceCtx* GetLazyJobDeviceCtx(Instruction* instruction) const {
+    auto* stream = instruction->mut_stream();
+    auto* device_ctx = dynamic_cast<LazyJobDeviceCtx*>(stream->device_ctx().get());
+    CHECK_NOTNULL(device_ctx);
+    return device_ctx;
+  }
+
+  std::shared_ptr<NNGraphIf> GetCurNNGraph(Instruction* instruction) const {
+    const auto* ptr = instruction->instr_msg().phy_instr_operand().get();
+    const auto* phy_instr_operand = dynamic_cast<const RunLazyJobPhyInstrOperand*>(ptr);
+    CHECK_NOTNULL(phy_instr_operand);
+    return phy_instr_operand->nn_graph();
+  }
+
+  std::shared_ptr<LazyJobInstance> MakeJobInstance(Instruction* instruction) const {
+    const auto* ptr = instruction->instr_msg().phy_instr_operand().get();
+    const auto* phy_instr_operand = dynamic_cast<const RunLazyJobPhyInstrOperand*>(ptr);
+    CHECK_NOTNULL(phy_instr_operand);
+    const auto& nn_graph = phy_instr_operand->nn_graph();
+    HashMap<std::string, std::function<void(int64_t)>> push_cbs;
+    CHECK_EQ(nn_graph->inputs_op_names().size(), phy_instr_operand->inputs()->size());
+    for (int i = 0; i < nn_graph->inputs_op_names().size(); ++i) {
+      const auto& op_name = nn_graph->inputs_op_names().at(i);
+      const auto* blob = &phy_instr_operand->inputs()->at(i)->blob();
+      const auto& PushCb = [blob](int64_t of_blob_ptr) {
+        OfBlob* of_blob = reinterpret_cast<OfBlob*>(of_blob_ptr);
+        of_blob->mut_blob()->CopyHeaderFrom(of_blob->mut_device_ctx(), blob);
+        of_blob->mut_blob()->CopyDataContentFrom(of_blob->mut_device_ctx(), blob);
+      };
+      CHECK(push_cbs.emplace(op_name, PushCb).second);
+    }
+    HashMap<std::string, std::function<void(int64_t)>> pull_cbs;
+    CHECK_EQ(nn_graph->outputs_op_names().size(), phy_instr_operand->outputs()->size());
+    for (int i = 0; i < nn_graph->outputs_op_names().size(); ++i) {
+      const auto& op_name = nn_graph->outputs_op_names().at(i);
+      auto* mut_blob = phy_instr_operand->outputs()->at(i)->mut_blob();
+      const auto& PullCb = [mut_blob](int64_t of_blob_ptr) {
+        OfBlob* of_blob = reinterpret_cast<OfBlob*>(of_blob_ptr);
+        mut_blob->CopyHeaderFrom(of_blob->mut_device_ctx(), &of_blob->blob());
+        mut_blob->CopyDataContentFrom(of_blob->mut_device_ctx(), &of_blob->blob());
+      };
+      CHECK(pull_cbs.emplace(op_name, PullCb).second);
+    }
+    const auto& FinishCb = [this, instruction]() {
+      auto* device_ctx = GetLazyJobDeviceCtx(instruction);
+      device_ctx->DequeueNNGraph();
+      auto* status_buffer = instruction->mut_status_buffer();
+      NaiveInstrStatusQuerier::MutCast(status_buffer->mut_buffer()->mut_data())->set_done();
+    };
+    return std::make_shared<LazyJobInstance>(nn_graph->job_name(), push_cbs, pull_cbs, FinishCb);
+  }
+};
+
+COMMAND(RegisterInstructionType<RunLazyJobInstructionType>("RunLazyJob"));
+
+}  // namespace vm
+}  // namespace oneflow
diff --git a/oneflow/core/eager/lazy_job_instruction_type_test.cpp b/oneflow/core/eager/lazy_job_instruction_type_test.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..f8fa7dcf319d8d3db4cdb58e361eb685ec218282
--- /dev/null
+++ b/oneflow/core/eager/lazy_job_instruction_type_test.cpp
@@ -0,0 +1,255 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// include sstream first to avoid some compiling error
+// caused by the following trick
+// reference: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=65899
+#include <sstream>
+#define private public
+#include <thread>
+#include <chrono>
+#include <atomic>
+#include "oneflow/core/control/ctrl_bootstrap.pb.h"
+#include "oneflow/core/vm/id_util.h"
+#include "oneflow/core/vm/virtual_machine.msg.h"
+#include "oneflow/core/vm/vm_desc.msg.h"
+#include "oneflow/core/vm/vm_util.h"
+#include "oneflow/core/vm/test_util.h"
+#include "oneflow/core/vm/stream_type.h"
+#include "oneflow/core/vm/instruction_type.h"
+#include "oneflow/core/vm/string_object.h"
+#include "oneflow/core/vm/test_util.h"
+#include "oneflow/core/framework/instructions_builder.h"
+#include "oneflow/core/common/buffer_manager.h"
+#include "oneflow/core/common/buffer.h"
+#include "oneflow/core/job/job_instance.h"
+
+namespace oneflow {
+namespace vm {
+namespace test {
+
+namespace {
+
+void InitNumProcessPerNode() {
+  Global<NumProcessPerNode>::New();
+  Global<NumProcessPerNode>::Get()->set_value(1);
+}
+
+void DestroyNumProcessPerNode() { Global<NumProcessPerNode>::Delete(); }
+
+}  // namespace
+
+using InstructionMsgList = OBJECT_MSG_LIST(vm::InstructionMsg, instr_msg_link);
+
+class NoArgNoRetMockNNGraph : public NNGraphIf {
+ public:
+  NoArgNoRetMockNNGraph(const std::string& job_name) : job_name_(job_name) {}
+  ~NoArgNoRetMockNNGraph() override = default;
+
+  const std::string& job_name() const override { return job_name_; }
+  const std::vector<std::string>& inputs_op_names() const override {
+    static std::vector<std::string> empty;
+    return empty;
+  }
+  const std::vector<std::string>& outputs_op_names() const override {
+    static std::vector<std::string> empty;
+    return empty;
+  }
+
+ private:
+  const std::string job_name_;
+};
+
+TEST(RunLazyJobInstructionType, simple) {
+  InitNumProcessPerNode();
+  vm::TestResourceDescScope resource_scope(0, 1);
+  auto vm_desc = ObjectMsgPtr<vm::VmDesc>::New(vm::TestUtil::NewVmResourceDesc().Get());
+  vm::TestUtil::AddStreamDescByInstrNames(vm_desc.Mutable(), {"RunLazyJob"});
+  auto vm = ObjectMsgPtr<vm::VirtualMachine>::New(vm_desc.Get());
+  Global<BufferMgr<std::shared_ptr<JobInstance>>>::New();
+  const std::string job_name("test_job");
+  auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get();
+  buffer_mgr->NewBuffer(GetSourceTickBufferName(job_name), 128);
+  std::thread enter_thread([&]() {
+    std::shared_ptr<JobInstance> test_job_instance;
+    auto* buffer = buffer_mgr->Get(GetSourceTickBufferName(job_name));
+    while (buffer->Receive(&test_job_instance) == kBufferStatusSuccess) {
+      // Do nothing
+    }
+  });
+  buffer_mgr->NewBuffer(GetCallbackNotifierBufferName(job_name), 128);
+  std::thread leave_thread([&]() {
+    std::shared_ptr<JobInstance> test_job_instance;
+    auto* buffer = buffer_mgr->Get(GetCallbackNotifierBufferName(job_name));
+    while (buffer->Receive(&test_job_instance) == kBufferStatusSuccess) {
+      test_job_instance->Finish();
+    }
+  });
+  InstructionMsgList list;
+  vm::cfg::EagerSymbolList eager_symbol_list;
+  InstructionsBuilder instructions_builder(nullptr, &list, &eager_symbol_list);
+  {
+    static const auto& empty_list =
+        std::make_shared<const std::vector<std::shared_ptr<vm::EagerBlobObject>>>();
+    const auto& nn_graph = std::make_shared<NoArgNoRetMockNNGraph>(job_name);
+    CHECK_JUST(instructions_builder.RunLazyJob(empty_list, empty_list, empty_list, nn_graph));
+    CHECK_JUST(instructions_builder.RunLazyJob(empty_list, empty_list, empty_list, nn_graph));
+  }
+  ASSERT_EQ(list.size(), 2);
+  vm->Receive(&list);
+  auto* vm_ptr = vm.Mutable();
+  std::thread scheduler_thread([vm_ptr]() {
+    while (!vm_ptr->Empty()) {
+      vm_ptr->Schedule();
+      OBJECT_MSG_LIST_FOR_EACH_PTR(vm_ptr->mut_thread_ctx_list(), t) { t->TryReceiveAndRun(); }
+    }
+  });
+  scheduler_thread.join();
+  buffer_mgr->Get(GetSourceTickBufferName(job_name))->Close();
+  buffer_mgr->Get(GetCallbackNotifierBufferName(job_name))->Close();
+  leave_thread.join();
+  enter_thread.join();
+  Global<BufferMgr<std::shared_ptr<JobInstance>>>::Delete();
+  DestroyNumProcessPerNode();
+}
+
+TEST(RunLazyJobInstructionType, wait_for_another_job_finished) {
+  InitNumProcessPerNode();
+  vm::TestResourceDescScope resource_scope(0, 1);
+  auto vm_desc = ObjectMsgPtr<vm::VmDesc>::New(vm::TestUtil::NewVmResourceDesc().Get());
+  vm::TestUtil::AddStreamDescByInstrNames(vm_desc.Mutable(), {"RunLazyJob"});
+  auto vm = ObjectMsgPtr<vm::VirtualMachine>::New(vm_desc.Get());
+  Global<BufferMgr<std::shared_ptr<JobInstance>>>::New();
+  const std::string job_name0("test_job0");
+  const std::string job_name1("test_job1");
+  std::atomic<bool> flag_enter_thread0(false);
+  std::atomic<int> count_enter_thread0(0);
+  auto* buffer_mgr = Global<BufferMgr<std::shared_ptr<JobInstance>>>::Get();
+  buffer_mgr->NewBuffer(GetSourceTickBufferName(job_name0), 128);
+  buffer_mgr->NewBuffer(GetSourceTickBufferName(job_name1), 128);
+  std::thread enter_thread0([&]() {
+    while (!flag_enter_thread0) {}
+    std::shared_ptr<JobInstance> test_job_instance;
+    auto* buffer = buffer_mgr->Get(GetSourceTickBufferName(job_name0));
+    while (buffer->Receive(&test_job_instance) == kBufferStatusSuccess) { ++count_enter_thread0; }
+  });
+  std::atomic<bool> flag_enter_thread1(false);
+  std::atomic<int> count_enter_thread1(0);
+  std::thread enter_thread1([&]() {
+    while (!flag_enter_thread1) {}
+    std::shared_ptr<JobInstance> test_job_instance;
+    auto* buffer = buffer_mgr->Get(GetSourceTickBufferName(job_name1));
+    while (buffer->Receive(&test_job_instance) == kBufferStatusSuccess) { ++count_enter_thread1; }
+  });
+  buffer_mgr->NewBuffer(GetCallbackNotifierBufferName(job_name0), 128);
+  buffer_mgr->NewBuffer(GetCallbackNotifierBufferName(job_name1), 128);
+  std::atomic<bool> flag_leave_thread0(false);
+  std::atomic<int> count_leave_thread0(0);
+  std::thread leave_thread0([&]() {
+    while (!flag_leave_thread0) {}
+    std::shared_ptr<JobInstance> test_job_instance;
+    auto* buffer = buffer_mgr->Get(GetCallbackNotifierBufferName(job_name0));
+    while (buffer->Receive(&test_job_instance) == kBufferStatusSuccess) {
+      ++count_leave_thread0;
+      test_job_instance->Finish();
+    }
+  });
+  std::atomic<bool> flag_leave_thread1(false);
+  std::atomic<int> count_leave_thread1(0);
+  std::thread leave_thread1([&]() {
+    while (!flag_leave_thread1) {}
+    std::shared_ptr<JobInstance> test_job_instance;
+    auto* buffer = buffer_mgr->Get(GetCallbackNotifierBufferName(job_name1));
+    while (buffer->Receive(&test_job_instance) == kBufferStatusSuccess) {
+      ++count_leave_thread1;
+      test_job_instance->Finish();
+    }
+  });
+  buffer_mgr->NewBuffer(GetForeignInputBufferName(job_name0), 128);
+  buffer_mgr->NewBuffer(GetForeignInputBufferName(job_name1), 128);
+  buffer_mgr->NewBuffer(GetForeignOutputBufferName(job_name0), 128);
+  buffer_mgr->NewBuffer(GetForeignOutputBufferName(job_name1), 128);
+  InstructionMsgList list;
+  vm::cfg::EagerSymbolList eager_symbol_list;
+  InstructionsBuilder instructions_builder(nullptr, &list, &eager_symbol_list);
+  int num_job0_instance = 2;
+  int num_job1_instance = 3;
+  {
+    static const auto& empty_list =
+        std::make_shared<const std::vector<std::shared_ptr<vm::EagerBlobObject>>>();
+    const auto& nn_graph0 = std::make_shared<NoArgNoRetMockNNGraph>(job_name0);
+    const auto& nn_graph1 = std::make_shared<NoArgNoRetMockNNGraph>(job_name1);
+    for (int i = 0; i < num_job0_instance; ++i) {
+      CHECK_JUST(instructions_builder.RunLazyJob(empty_list, empty_list, empty_list, nn_graph0));
+    }
+    for (int i = 0; i < num_job1_instance; ++i) {
+      CHECK_JUST(instructions_builder.RunLazyJob(empty_list, empty_list, empty_list, nn_graph1));
+    }
+  }
+  ASSERT_EQ(list.size(), num_job0_instance + num_job1_instance);
+  vm->Receive(&list);
+  ASSERT_EQ(vm->pending_msg_list().size(), num_job0_instance + num_job1_instance);
+  auto* vm_ptr = vm.Mutable();
+  std::thread scheduler_thread([vm_ptr]() {
+    while (!vm_ptr->Empty()) {
+      vm_ptr->Schedule();
+      OBJECT_MSG_LIST_FOR_EACH_PTR(vm_ptr->mut_thread_ctx_list(), t) { t->TryReceiveAndRun(); }
+    }
+  });
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+  ASSERT_EQ(count_enter_thread0, 0);
+  ASSERT_EQ(count_leave_thread0, 0);
+  ASSERT_EQ(count_enter_thread1, 0);
+  ASSERT_EQ(count_leave_thread1, 0);
+  flag_enter_thread0 = true;
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+  ASSERT_EQ(count_enter_thread0, num_job0_instance);
+  ASSERT_EQ(count_leave_thread0, 0);
+  ASSERT_EQ(count_enter_thread1, 0);
+  ASSERT_EQ(count_leave_thread1, 0);
+  flag_enter_thread1 = true;
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+  ASSERT_EQ(count_enter_thread0, num_job0_instance);
+  ASSERT_EQ(count_leave_thread0, 0);
+  ASSERT_EQ(count_enter_thread1, 0);
+  ASSERT_EQ(count_leave_thread1, 0);
+  flag_leave_thread0 = true;
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+  ASSERT_EQ(count_enter_thread0, num_job0_instance);
+  ASSERT_EQ(count_leave_thread0, num_job0_instance);
+  ASSERT_EQ(count_enter_thread1, num_job1_instance);
+  ASSERT_EQ(count_leave_thread1, 0);
+  flag_leave_thread1 = true;
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+  ASSERT_EQ(count_enter_thread0, num_job0_instance);
+  ASSERT_EQ(count_leave_thread0, num_job0_instance);
+  ASSERT_EQ(count_enter_thread1, num_job1_instance);
+  ASSERT_EQ(count_leave_thread1, num_job1_instance);
+  scheduler_thread.join();
+  buffer_mgr->Get(GetSourceTickBufferName(job_name0))->Close();
+  buffer_mgr->Get(GetSourceTickBufferName(job_name1))->Close();
+  buffer_mgr->Get(GetCallbackNotifierBufferName(job_name0))->Close();
+  buffer_mgr->Get(GetCallbackNotifierBufferName(job_name1))->Close();
+  leave_thread0.join();
+  leave_thread1.join();
+  enter_thread0.join();
+  enter_thread1.join();
+  Global<BufferMgr<std::shared_ptr<JobInstance>>>::Delete();
+  DestroyNumProcessPerNode();
+}
+
+}  // namespace test
+}  // namespace vm
+}  // namespace oneflow
diff --git a/oneflow/core/eager/lazy_job_stream_type.cpp b/oneflow/core/eager/lazy_job_stream_type.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..2a37001ae4bd4c04e4328f0306b5357803e25628
--- /dev/null
+++ b/oneflow/core/eager/lazy_job_stream_type.cpp
@@ -0,0 +1,68 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "oneflow/core/eager/lazy_job_stream_type.h"
+#include "oneflow/core/vm/instruction_type.h"
+#include "oneflow/core/vm/instruction.msg.h"
+#include "oneflow/core/vm/thread_ctx.msg.h"
+#include "oneflow/core/eager/lazy_job_device_context.h"
+#include "oneflow/core/vm/naive_instruction_status_querier.h"
+#include "oneflow/core/common/util.h"
+
+namespace oneflow {
+namespace vm {
+
+void LazyJobStreamType::InitDeviceCtx(std::unique_ptr<DeviceCtx>* device_ctx,
+                                      Stream* stream) const {
+  device_ctx->reset(new LazyJobDeviceCtx());
+}
+
+void LazyJobStreamType::InitInstructionStatus(const Stream& stream,
+                                              InstructionStatusBuffer* status_buffer) const {
+  static_assert(sizeof(NaiveInstrStatusQuerier) < kInstructionStatusBufferBytes, "");
+  NaiveInstrStatusQuerier::PlacementNew(status_buffer->mut_buffer()->mut_data());
+}
+
+void LazyJobStreamType::DeleteInstructionStatus(const Stream& stream,
+                                                InstructionStatusBuffer* status_buffer) const {
+  // do nothing
+}
+
+bool LazyJobStreamType::QueryInstructionStatusDone(
+    const Stream& stream, const InstructionStatusBuffer& status_buffer) const {
+  return NaiveInstrStatusQuerier::Cast(status_buffer.buffer().data())->done();
+}
+
+void LazyJobStreamType::Compute(Instruction* instruction) const {
+  {
+    const auto& instr_type_id = instruction->mut_instr_msg()->instr_type_id();
+    CHECK_EQ(instr_type_id.stream_type_id().interpret_type(), InterpretType::kCompute);
+    instr_type_id.instruction_type().Compute(instruction);
+  }
+}
+
+ObjectMsgPtr<StreamDesc> LazyJobStreamType::MakeStreamDesc(const Resource& resource,
+                                                           int64_t this_machine_id) const {
+  auto ret = ObjectMsgPtr<StreamDesc>::New();
+  ret->mutable_stream_type_id()->__Init__(LookupStreamType4TypeIndex<LazyJobStreamType>());
+  ret->set_num_machines(1);
+  ret->set_num_streams_per_machine(1);
+  ret->set_num_streams_per_thread(1);
+  return ret;
+}
+
+}  // namespace vm
+}  // namespace oneflow
diff --git a/oneflow/core/eager/lazy_job_stream_type.h b/oneflow/core/eager/lazy_job_stream_type.h
new file mode 100644
index 0000000000000000000000000000000000000000..8bdf4d3a4257e0862b410301cd31b5903d84078f
--- /dev/null
+++ b/oneflow/core/eager/lazy_job_stream_type.h
@@ -0,0 +1,53 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#ifndef ONEFLOW_CORE_EAGER_LAZY_JOB_STREAM_TYPE_H_
+#define ONEFLOW_CORE_EAGER_LAZY_JOB_STREAM_TYPE_H_
+
+#include "oneflow/core/object_msg/flat_msg_view.h"
+#include "oneflow/core/vm/stream_type.h"
+#include "oneflow/core/vm/instruction.msg.h"
+#include "oneflow/core/device/device_context.h"
+#include "oneflow/core/job/resource.pb.h"
+
+namespace oneflow {
+namespace vm {
+
+class LazyJobStreamType final : public StreamType {
+ public:
+  LazyJobStreamType() = default;
+  ~LazyJobStreamType() override = default;
+
+  const char* device_tag() const override { return "lazy_job"; }
+
+  void InitDeviceCtx(std::unique_ptr<DeviceCtx>* device_ctx, Stream* stream) const override;
+
+  void InitInstructionStatus(const Stream& stream,
+                             InstructionStatusBuffer* status_buffer) const override;
+  void DeleteInstructionStatus(const Stream& stream,
+                               InstructionStatusBuffer* status_buffer) const override;
+  bool QueryInstructionStatusDone(const Stream& stream,
+                                  const InstructionStatusBuffer& status_buffer) const override;
+  void Compute(Instruction* instruction) const override;
+  ObjectMsgPtr<StreamDesc> MakeStreamDesc(const Resource& resource,
+                                          int64_t this_machine_id) const override;
+  bool SharingVirtualMachineThread() const override { return false; }
+};
+
+}  // namespace vm
+}  // namespace oneflow
+
+#endif  // ONEFLOW_CORE_EAGER_LAZY_JOB_STREAM_TYPE_H_
diff --git a/oneflow/core/eager/local_call_opkernel_phy_instr_operand.h b/oneflow/core/eager/local_call_opkernel_phy_instr_operand.h
index a4aa14f734efd484523a5b4d0188c40921206c81..9867befb29c6688e4b4dbc15923a90c6837da6d7 100644
--- a/oneflow/core/eager/local_call_opkernel_phy_instr_operand.h
+++ b/oneflow/core/eager/local_call_opkernel_phy_instr_operand.h
@@ -22,7 +22,6 @@ limitations under the License.
 #include "oneflow/core/vm/instruction_operand.msg.h"
 
 namespace oneflow {
-
 namespace one {
 
 class StatefulLocalOpKernel;
diff --git a/oneflow/core/eager/opkernel_instruction_type.cpp b/oneflow/core/eager/opkernel_instruction_type.cpp
index 58b3b676bce9e875710b683a43b3ea5775bbe233..50012a6be195a499b102838ffb013ad23ddff4e7 100644
--- a/oneflow/core/eager/opkernel_instruction_type.cpp
+++ b/oneflow/core/eager/opkernel_instruction_type.cpp
@@ -34,11 +34,11 @@ limitations under the License.
 #include "oneflow/core/vm/object.h"
 #include "oneflow/core/framework/user_op_registry_manager.h"
 #include "oneflow/core/job/foreign_callback.h"
+#include "oneflow/core/job/parallel_signature.cfg.h"
 #include "oneflow/core/register/ofblob.h"
 #include "oneflow/core/vm/symbol_storage.h"
 #include "oneflow/core/operator/op_node_signature_desc.h"
 #include "oneflow/core/operator/op_conf_symbol.h"
-#include "oneflow/core/framework/tensor_impl.h"
 #include "oneflow/user/kernels/stateful_local_opkernel.h"
 
 namespace oneflow {
diff --git a/oneflow/core/eager/run_lazy_job_phy_instr_operand.cpp b/oneflow/core/eager/run_lazy_job_phy_instr_operand.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..5589bdbfa5fbc123d7bb6f68ea54cdd1c71d0bcd
--- /dev/null
+++ b/oneflow/core/eager/run_lazy_job_phy_instr_operand.cpp
@@ -0,0 +1,54 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#include "oneflow/core/eager/run_lazy_job_phy_instr_operand.h"
+
+namespace oneflow {
+namespace vm {
+
+void RunLazyJobPhyInstrOperand::ForEachConstMirroredObject(
+    const std::function<void(vm::MirroredObject* infer, vm::MirroredObject* compute)>& DoEach)
+    const {
+  for (const auto& input : *inputs()) {
+    DoEach(nullptr, CHECK_JUST(input->compute_local_dep_object())
+                        ->mut_local_dep_object()
+                        ->mut_mirrored_object());
+  }
+}
+
+void RunLazyJobPhyInstrOperand::ForEachMutMirroredObject(
+    const std::function<void(vm::MirroredObject* infer, vm::MirroredObject* compute)>& DoEach)
+    const {
+  for (const auto& parameter : *parameters()) {
+    DoEach(nullptr, CHECK_JUST(parameter->compute_local_dep_object())
+                        ->mut_local_dep_object()
+                        ->mut_mirrored_object());
+  }
+}
+
+void RunLazyJobPhyInstrOperand::ForEachMut2MirroredObject(
+    const std::function<void(vm::MirroredObject* infer, vm::MirroredObject* compute)>& DoEach)
+    const {
+  // TODO(lixinqi): move partial of outputs into ForEachMutMirroredObject if shape infered before
+  // compute.
+  for (const auto& output : *outputs()) {
+    DoEach(nullptr, CHECK_JUST(output->compute_local_dep_object())
+                        ->mut_local_dep_object()
+                        ->mut_mirrored_object());
+  }
+}
+
+}  // namespace vm
+}  // namespace oneflow
diff --git a/oneflow/core/eager/run_lazy_job_phy_instr_operand.h b/oneflow/core/eager/run_lazy_job_phy_instr_operand.h
new file mode 100644
index 0000000000000000000000000000000000000000..b4f8fc960bd04306a0b12cf0e6ec8f8065a420e1
--- /dev/null
+++ b/oneflow/core/eager/run_lazy_job_phy_instr_operand.h
@@ -0,0 +1,72 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef ONEFLOW_CORE_EAGER_RUN_JOB_PHY_INSTR_OPERAND_H_
+#define ONEFLOW_CORE_EAGER_RUN_JOB_PHY_INSTR_OPERAND_H_
+
+#include "oneflow/core/vm/instruction_operand.msg.h"
+#include "oneflow/core/eager/eager_blob_object.h"
+#include "oneflow/core/framework/nn_graph_if.h"
+
+namespace oneflow {
+
+namespace one {
+
+using EagerBlobObjectListPtr =
+    std::shared_ptr<const std::vector<std::shared_ptr<vm::EagerBlobObject>>>;
+
+}
+
+namespace vm {
+
+class RunLazyJobPhyInstrOperand final : public PhyInstrOperand {
+ public:
+  RunLazyJobPhyInstrOperand(const RunLazyJobPhyInstrOperand&) = delete;
+  RunLazyJobPhyInstrOperand(RunLazyJobPhyInstrOperand&&) = delete;
+  ~RunLazyJobPhyInstrOperand() override = default;
+
+  RunLazyJobPhyInstrOperand(const one::EagerBlobObjectListPtr& inputs,
+                            const one::EagerBlobObjectListPtr& outputs,
+                            const one::EagerBlobObjectListPtr& parameters,
+                            const std::shared_ptr<NNGraphIf>& nn_graph)
+      : inputs_(inputs), outputs_(outputs), parameters_(parameters), nn_graph_(nn_graph) {}
+
+  const one::EagerBlobObjectListPtr& inputs() const { return inputs_; }
+  const one::EagerBlobObjectListPtr& outputs() const { return outputs_; }
+  const one::EagerBlobObjectListPtr& parameters() const { return parameters_; }
+  const std::shared_ptr<NNGraphIf>& nn_graph() const { return nn_graph_; }
+
+  void ForEachConstMirroredObject(
+      const std::function<void(vm::MirroredObject* infer, vm::MirroredObject* compute)>&)
+      const override;
+
+  void ForEachMutMirroredObject(
+      const std::function<void(vm::MirroredObject* infer, vm::MirroredObject* compute)>&)
+      const override;
+
+  void ForEachMut2MirroredObject(
+      const std::function<void(vm::MirroredObject* infer, vm::MirroredObject* compute)>&)
+      const override;
+
+ private:
+  one::EagerBlobObjectListPtr inputs_;
+  one::EagerBlobObjectListPtr outputs_;
+  one::EagerBlobObjectListPtr parameters_;
+  std::shared_ptr<NNGraphIf> nn_graph_;
+};
+}  // namespace vm
+}  // namespace oneflow
+
+#endif  // ONEFLOW_CORE_EAGER_RUN_JOB_PHY_INSTR_OPERAND_H_
diff --git a/oneflow/core/framework/instructions_builder.cpp b/oneflow/core/framework/instructions_builder.cpp
index d24989defeae76628fecd41831db259941641362..037d0e40c99ecf6ff59859394147fee651930014 100644
--- a/oneflow/core/framework/instructions_builder.cpp
+++ b/oneflow/core/framework/instructions_builder.cpp
@@ -236,6 +236,18 @@ Maybe<int64_t> InstructionsBuilder::NewObjectId(
   return object_id;
 }
 
+Maybe<void> InstructionsBuilder::RunLazyJob(const one::EagerBlobObjectListPtr& inputs,
+                                            const one::EagerBlobObjectListPtr& outputs,
+                                            const one::EagerBlobObjectListPtr& parameters,
+                                            const std::shared_ptr<NNGraphIf>& nn_graph) const {
+  static std::string instr_name("RunLazyJob");
+  ObjectMsgPtr<vm::InstructionMsg> instruction = ObjectMsgPtr<vm::InstructionMsg>::New(instr_name);
+  *instruction->mutable_phy_instr_operand() =
+      std::make_shared<vm::RunLazyJobPhyInstrOperand>(inputs, outputs, parameters, nn_graph);
+  instruction_list_->EmplaceBack(std::move(instruction));
+  return Maybe<void>::Ok();
+}
+
 Maybe<compatible_py::BlobObject> InstructionsBuilder::PackPhysicalBlobsToLogicalBlob(
     const std::vector<std::shared_ptr<compatible_py::BlobObject>>& physical_blob_objects,
     const std::shared_ptr<compatible_py::OpArgParallelAttribute>& op_arg_parallel_attr,
@@ -886,7 +898,7 @@ Maybe<void> InstructionsBuilder::ReleaseTensor(
   *instruction->mutable_phy_instr_operand() = std::make_shared<vm::ReleaseTensorArgPhyInstrOperand>(
       eager_blob_object, compute_local_dep_object);
   *instruction->mut_parallel_desc() = parallel_desc;
-  instruction_list_->EmplaceBack(std::move(instruction.Mutable()));
+  instruction_list_->EmplaceBack(std::move(instruction));
   return Maybe<void>::Ok();
 }
 
@@ -898,7 +910,7 @@ Maybe<void> InstructionsBuilder::SoftSyncStream(
   *instruction->mutable_phy_instr_operand() =
       std::make_shared<vm::SoftSyncStreamPhyInstrOperand>(compute_local_dep_object, modifier);
   *instruction->mut_parallel_desc() = parallel_desc;
-  instruction_list_->EmplaceBack(std::move(instruction.Mutable()));
+  instruction_list_->EmplaceBack(std::move(instruction));
   return Maybe<void>::Ok();
 }
 
@@ -929,7 +941,7 @@ Maybe<void> InstructionsBuilder::AccessBlobByCallback(const T tensor,
   *instruction->mutable_phy_instr_operand() = std::make_shared<vm::AccessBlobArgCbPhyInstrOperand>(
       eager_blob_object, compute_local_dep_object, callback, modifier);
   *instruction->mut_parallel_desc() = parallel_desc;
-  instruction_list_->EmplaceBack(std::move(instruction.Mutable()));
+  instruction_list_->EmplaceBack(std::move(instruction));
   return Maybe<void>::Ok();
 }
 
diff --git a/oneflow/core/framework/instructions_builder.h b/oneflow/core/framework/instructions_builder.h
index 6f1cf94b66ce01b969b5ef4ec8196a08878fb79b..1a7997fc665521bc0651c5f81df0dd55ed4a8f3d 100644
--- a/oneflow/core/framework/instructions_builder.h
+++ b/oneflow/core/framework/instructions_builder.h
@@ -17,6 +17,7 @@ limitations under the License.
 #define ONEFLOW_CORE_FRAMEWORK_INSTRUCTIONS_BUILDER_H_
 
 #include "oneflow/core/eager/local_call_opkernel_phy_instr_operand.h"
+#include "oneflow/core/eager/run_lazy_job_phy_instr_operand.h"
 #include "oneflow/core/vm/instruction.cfg.h"
 #include "oneflow/core/vm/instruction.msg.h"
 #include "oneflow/core/vm/id_generator.h"
@@ -47,6 +48,8 @@ class TensorTuple;
 class MirroredTensor;
 }  // namespace one
 
+class NNGraphIf;
+
 namespace detail {
 
 template<typename T>
@@ -92,6 +95,11 @@ class InstructionsBuilder : public std::enable_shared_from_this<InstructionsBuil
 
   vm::InstructionMsgList* mut_instruction_list() { return instruction_list_; }
 
+  Maybe<void> RunLazyJob(const one::EagerBlobObjectListPtr& inputs,
+                         const one::EagerBlobObjectListPtr& outputs,
+                         const one::EagerBlobObjectListPtr& parameters,
+                         const std::shared_ptr<NNGraphIf>& nn_graph) const;
+
   Maybe<compatible_py::BlobObject> PackPhysicalBlobsToLogicalBlob(
       const std::vector<std::shared_ptr<compatible_py::BlobObject>>& physical_blob_objects,
       const std::shared_ptr<compatible_py::OpArgParallelAttribute>& op_arg_parallel_attr,
diff --git a/oneflow/core/job/job_instance.h b/oneflow/core/job/job_instance.h
index 5be11bd8a2667f3b2ac2ffa962a32db2ab39eee0..dc9ed7c03baa44eed31c42ca317223da33dea503 100644
--- a/oneflow/core/job/job_instance.h
+++ b/oneflow/core/job/job_instance.h
@@ -29,6 +29,12 @@ class JobInstance {
   virtual std::string job_name() const { UNIMPLEMENTED(); }
   virtual std::string sole_input_op_name_in_user_job() const { UNIMPLEMENTED(); }
   virtual std::string sole_output_op_name_in_user_job() const { UNIMPLEMENTED(); }
+  virtual void PushBlobByOpName(uint64_t ofblob_ptr, const std::string& op_name) const {
+    UNIMPLEMENTED();
+  }
+  virtual void PullBlobByOpName(uint64_t ofblob_ptr, const std::string& op_name) const {
+    UNIMPLEMENTED();
+  }
   virtual void PushBlob(uint64_t ofblob_ptr) const { UNIMPLEMENTED(); }
   virtual void PullBlob(uint64_t ofblob_ptr) const { UNIMPLEMENTED(); }
   virtual void Finish() const { UNIMPLEMENTED(); }
diff --git a/oneflow/core/register/ofblob.h b/oneflow/core/register/ofblob.h
index 1290652215952d5a29556da3e7e6603c6c5467fd..d9e107bdbb22a56fc50b2b0923d7a8ce0f2288d1 100644
--- a/oneflow/core/register/ofblob.h
+++ b/oneflow/core/register/ofblob.h
@@ -32,6 +32,7 @@ class OfBlob final {
   }
   ~OfBlob() = default;
 
+  const Blob& blob() const { return *blob_; }
   int data_type() const { return blob_->data_type(); }
   size_t NumAxes() const { return blob_->shape().NumAxes(); }
   bool is_dynamic() const { return blob_->blob_desc().is_dynamic(); }
@@ -45,6 +46,9 @@ class OfBlob final {
   void AutoMemCopyFrom(const T* ptr, int64_t len) const;
   void AsyncAutoMemset(const char value) const;
 
+  Blob* mut_blob() { return blob_; }
+  DeviceCtx* mut_device_ctx() { return device_ctx_; }
+
  private:
   DeviceCtx* device_ctx_;
   Blob* blob_;
diff --git a/oneflow/core/vm/cpu_stream_type.cpp b/oneflow/core/vm/cpu_stream_type.cpp
index 1fad1a730fd6ff5cb01f80f671556a68445f743b..8947486905baa61e922d9d1ba4a85a561c732f69 100644
--- a/oneflow/core/vm/cpu_stream_type.cpp
+++ b/oneflow/core/vm/cpu_stream_type.cpp
@@ -33,8 +33,7 @@ void CpuStreamType::InitInstructionStatus(const Stream& stream,
                                           InstructionStatusBuffer* status_buffer) const {
   static_assert(sizeof(NaiveInstrStatusQuerier) < kInstructionStatusBufferBytes, "");
   NaiveInstrStatusQuerier::PlacementNew(status_buffer->mut_buffer()->mut_data());
-}  // namespace
-   // voidCpuStreamType::InitInstructionStatus(constStream&stream,InstructionStatusBuffer*status_buffer)const
+}
 
 void CpuStreamType::DeleteInstructionStatus(const Stream& stream,
                                             InstructionStatusBuffer* status_buffer) const {