diff --git a/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp b/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
index 2fb164d60340223392e2a76b97ba3f67b9539443..0d25c4536fd44360b0a9f12c3b75a5427347f34d 100644
--- a/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
+++ b/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
@@ -92,11 +92,8 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedInputOpExpr& op_expr, const Ten
   auto infer_ctx = JUST(GetCurInferCtx());
   OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
 
-  const std::string& op_name = op_conf.name();
-  // temp debug log
-  std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
-            << " and the origin op_conf is :" << op_conf.DebugString();
+  std::cout << "cclog: Lazy nn.Graph AddOp: " << op_conf.DebugString() << std::endl;
 
   int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
   const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -113,7 +110,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedInputOpExpr& op_expr, const Ten
   CHECK_OR_RETURN(!outputs->at(0).get());
   (*outputs)[0] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/true));
 
-  TensorNameScope::Global()->Record(outputs->at(0), op_name + "/" + obn);
+  TensorNameScope::Global()->Record(outputs->at(0), GenLogicalBlobName(op_conf.name(), obn));
   return Maybe<void>::Ok();
 }
@@ -149,11 +146,8 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
   auto infer_ctx = JUST(GetCurInferCtx());
   OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
 
-  const std::string& op_name = op_conf.name();
-  // temp debug log
-  std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
-            << " and the origin op_conf is :" << op_conf.DebugString();
+  std::cout << "cclog: Lazy nn.Graph AddOp: " << op_conf.DebugString() << std::endl;
 
   int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
   const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -171,9 +165,9 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
   CHECK_OR_RETURN(!outputs->at(0).get());
   (*outputs)[0] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/true));
   // NOTE(chengcheng): Record variable op output LazyTensor
-  TensorNameScope::Global()->Record(outputs->at(0), op_name + "/" + obn);
+  TensorNameScope::Global()->Record(outputs->at(0), GenLogicalBlobName(op_conf.name(), obn));
   // NOTE(chengcheng): Record EagerTensor as variable tensor name
-  TensorNameScope::Global()->Record(input_tensor, op_name + "/" + obn);
+  TensorNameScope::Global()->Record(input_tensor, GenLogicalBlobName(op_conf.name(), obn));
 
   return Maybe<void>::Ok();
 }
@@ -211,11 +205,8 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FetchOutputOpExpr& op_expr, const T
   auto infer_ctx = JUST(GetCurInferCtx());
   OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
 
-  const std::string& op_name = op_conf.name();
-  // temp debug log
-  std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
-            << " and the origin op_conf is :" << op_conf.DebugString();
+  std::cout << "cclog: Lazy nn.Graph AddOp: " << op_conf.DebugString() << std::endl;
 
   int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
   const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
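Context for the hunks above: they replace the ad-hoc `op_name + "/" + obn` string concatenation with GenLogicalBlobName(), so every tensor is recorded under a canonical logical blob name (lbn). Below is a minimal standalone sketch of the "op_name/blob_name" convention those helpers encode; the helper names here (GenLbnSketch, ParseLbnSketch) are hypothetical illustrations, not OneFlow's actual declarations.

    // Sketch of the "op_name/blob_name" (lbn) convention behind
    // GenLogicalBlobName/GenLogicalBlobId. Illustrative re-implementation;
    // the real helpers operate on LogicalBlobId protos in the OneFlow core.
    #include <cassert>
    #include <iostream>
    #include <string>
    #include <utility>

    // Joins an op name and an output blob name with '/', e.g. "cc_Input_0/out_0".
    std::string GenLbnSketch(const std::string& op_name, const std::string& blob_name) {
      return op_name + "/" + blob_name;
    }

    // Splits an lbn back into (op_name, blob_name) at the first '/'.
    std::pair<std::string, std::string> ParseLbnSketch(const std::string& lbn) {
      const size_t pos = lbn.find('/');
      assert(pos != std::string::npos);
      return {lbn.substr(0, pos), lbn.substr(pos + 1)};
    }

    int main() {
      const std::string lbn = GenLbnSketch("cc_Input_0", "out_0");
      std::cout << lbn << std::endl;  // prints: cc_Input_0/out_0
      const auto parsed = ParseLbnSketch(lbn);
      assert(parsed.first == "cc_Input_0" && parsed.second == "out_0");
      return 0;
    }

Using one helper for both Record() and Lookup() keeps the TensorNameScope key format consistent across Feed/Fetch/User ops.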
@@ -239,26 +230,63 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FetchOutputOpExpr& op_expr, const T
 Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTuple& inputs,
                                        TensorTuple* outputs, const OpExprInterpContext& ctx) const {
   CHECK_EQ_OR_RETURN(inputs.size(), op_expr.input_size());
-  const auto& scope = JUST(GetCurrentScope());
   auto op_conf = JUST(OpInterpUtil::GenBuiltinOpConf(op_expr, ctx.attrs));
-  int64_t symbol_id = JUST(scope->symbol_id());
-  op_conf->set_scope_symbol_id(symbol_id);
-  if (!op_conf->has_device_tag()) {
-    op_conf->set_device_tag(scope->device_parallel_desc_symbol()->device_tag());
-  }
+  // TODO(chengcheng): Handle special UserOps such as:
+  //     1. [Source UserOp] : OFRecordReader, CoinFlip
+  //     2. [Change Placement/ParallelDesc UserOp] : to/to_consistent/parallel_cast
+  //     3. [Multi-Inputs & Different ParallelDesc for each input UserOp] : e.g. there are 2 inputs,
+  //        one from CPU and the other from GPU.
+  //     ..., etc.
+
+  const auto& scope = JUST(GetCurrentScope());
+  int64_t old_scope_symbol_id = JUST(scope->symbol_id());
+  // TODO(chengcheng): New parallel desc scope from all input tensors.
+  op_conf->set_scope_symbol_id(old_scope_symbol_id);
+
+  // NOTE(chengcheng):
+  //   A normal UserOp has inputs size >= 1, which is used to infer the parallel_desc;
+  //   if inputs size == 0, it needs to be handled in the SourceUserOp impl.
+  CHECK_GE_OR_RETURN(inputs.size(), 1);
+  const std::string device_tag = GetDeviceTagOfTensor(inputs.at(0));
+  op_conf->set_device_tag(device_tag);
   for (int i = 0; i < inputs.size(); ++i) {
+    const auto& input_tensor = inputs.at(i);
+    CHECK_OR_RETURN(device_tag == GetDeviceTagOfTensor(input_tensor));
     const std::string& ibn = op_expr.indexed_ibns().at(i);
-    const std::string& tensor_name = TensorNameScope::Global()->Lookup(inputs[i]);
-    ReplaceInputLbnInOpCustomizedConf(op_conf.get(), ibn, tensor_name);
-    // TODO(chengcheng): check inputs tensor placement equal, and create parallel scope? or set in
-    // python.
+    const std::string& lbn = TensorNameScope::Global()->Lookup(inputs[i]);
+    if (lbn.empty()) {
+      CHECK_OR_RETURN(input_tensor->is_eager());  // NOTE(chengcheng): a lazy tensor MUST have an lbn.
+
+      // TODO(chengcheng):
+      //   This is a free EagerTensor that is NOT captured by nn.Graph (inputs/params).
+      //   Need to create a VariableOpConf for this input tensor, and record a name for it.
+      UNIMPLEMENTED();
+    }
+    CHECK_OR_RETURN(!lbn.empty());  // NOTE(chengcheng): the lbn must not be empty now.
+    ReplaceInputLbnInOpCustomizedConf(op_conf.get(), ibn, lbn);
   }
-  const auto& session = JUST(GetDefaultSession());
-  bool is_mirrored_strategy_enabled = JUST(session->IsMirroredStrategyEnabled());
-  const auto& op_attribute =
-      JUST(OpInterpUtil::AddOpAndInferOpAttribute(*op_conf, is_mirrored_strategy_enabled));
-  OpAttribute proto_op_attribute;
-  op_attribute->ToProto(&proto_op_attribute);
+
+  auto infer_ctx = JUST(GetCurInferCtx());
+  // NOTE(chengcheng): MUST reset a unique op name before InferCtx::AddOp.
+  const std::string new_op_name = *JUST(infer_ctx->NewUniqueOpNameByFunctionalOpConf(*op_conf));
+
+  // NOTE(chengcheng): for UserOp, reset NOT only the op_name, but also the output lbn values.
+  op_conf->set_name(new_op_name);
+  for (auto& pair : *(op_conf->mutable_user_conf()->mutable_output())) {
+    auto& list_s = pair.second;
+    for (int i = 0; i < list_s.s_size(); ++i) {
+      std::string old_lbn = list_s.s(i);
+      LogicalBlobId old_lbi = GenLogicalBlobId(old_lbn);
+      // NOTE(chengcheng): MUST change the old_lbn to use the new op name.
+      std::string new_lbn = GenLogicalBlobName(new_op_name, old_lbi.blob_name());
+      list_s.set_s(i, new_lbn);
+    }
+  }
+
+  // temp debug log
+  std::cout << "cclog: Lazy nn.Graph add UserOp: " << op_conf->DebugString() << std::endl;
+
+  OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(*op_conf));
 
   int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(*op_conf));
   const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -268,16 +296,16 @@ Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTu
   CHECK_EQ_OR_RETURN(outputs->size(), op_expr.output_size());
   for (int i = 0; i < op_expr.output_size(); ++i) {
     const std::string& obn = op_expr.indexed_obns().at(i);
-    const auto& parallel_attr = JUST(
-        compatible_py::GetOpArgParallelAttribute(blob_parallel_desc_sym, proto_op_attribute, obn));
-    const auto& blob_attr = JUST(compatible_py::GetOpArgBlobAttribute(proto_op_attribute, obn));
+    const auto& parallel_attr =
+        JUST(compatible_py::GetOpArgParallelAttribute(blob_parallel_desc_sym, op_attr, obn));
+    const auto& blob_attr = JUST(compatible_py::GetOpArgBlobAttribute(op_attr, obn));
     if (!(outputs->at(i).get())) {
       (*outputs)[i] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/true));
     } else {
-      // TODO(hjchen2) Reset shape, dtype and so on.
+      // TODO(chengcheng, hjchen2): Reset shape, dtype and so on for InplaceUserOp.
      UNIMPLEMENTED();
     }
-    TensorNameScope::Global()->Record(outputs->at(i), op_expr.op_name() + "/" + obn);
+    TensorNameScope::Global()->Record(outputs->at(i), GenLogicalBlobName(new_op_name, obn));
   }
   return Maybe<void>::Ok();
 }
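The core of the UserOpExpr hunk is the rename step: after NewUniqueOpNameByFunctionalOpConf() assigns a fresh op name, every output lbn stored in user_conf().output() still carries the functional op's original name and must be rewritten, otherwise later TensorNameScope lookups would resolve to a nonexistent op. A hedged sketch of that rewrite in isolation, with a plain std::map standing in for the proto map and hypothetical names throughout:

    // Sketch of the output-lbn rewrite: swap the op-name half of each
    // "old_op/blob" string for the newly assigned unique op name, keeping
    // the blob-name half intact. Illustrative only; the real loop mutates
    // the UserOpConf proto's output map in place.
    #include <cassert>
    #include <map>
    #include <string>
    #include <vector>

    std::string RewriteLbn(const std::string& old_lbn, const std::string& new_op_name) {
      const size_t pos = old_lbn.find('/');
      assert(pos != std::string::npos);          // every lbn is "op_name/blob_name"
      return new_op_name + old_lbn.substr(pos);  // keep "/blob_name", swap op name
    }

    int main() {
      // Hypothetical functional-op output map before renaming.
      std::map<std::string, std::vector<std::string>> output = {{"out", {"matmul/out_0"}}};
      const std::string new_op_name = "model.MatmulOp_3";  // hypothetical unique name
      for (auto& pair : output) {
        for (auto& lbn : pair.second) { lbn = RewriteLbn(lbn, new_op_name); }
      }
      assert(output["out"][0] == "model.MatmulOp_3/out_0");
      return 0;
    }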
diff --git a/oneflow/core/job/job_build_and_infer_ctx.cpp b/oneflow/core/job/job_build_and_infer_ctx.cpp
index 624f0362de93934a8e2ae61d493c30c0b62541e5..542e1b42d14b84b071389890831bd7ae514e33dc 100644
--- a/oneflow/core/job/job_build_and_infer_ctx.cpp
+++ b/oneflow/core/job/job_build_and_infer_ctx.cpp
@@ -1319,12 +1319,9 @@ Maybe<std::string> JobBuildAndInferCtx::NewUniqueOpNameByFunctionalOpConf(
   } else {
     op_type_name = "SystemOp";
   }
-  std::string op_name = op_name_prefix + op_type_name + "-" + std::to_string(unique_op_name_index_);
+  std::string op_name = op_name_prefix + op_type_name + "_" + std::to_string(unique_op_name_index_);
   ++unique_op_name_index_;
-  // temp debug log
-  std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
-            << " and the origin op_conf is :" << op_conf.DebugString();
 
   return op_name;
 }
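The job_build_and_infer_ctx.cpp change swaps the separator in generated op names from '-' to '_' and drops the temporary AddOpName log. A hedged sketch of the resulting "<prefix><op_type_name>_<index>" scheme; the counter and prefix handling are simplified here, and in the real method the counter lives on the JobBuildAndInferCtx and op_type_name is derived from the OperatorConf (falling back to "SystemOp"):

    // Illustrative generator mirroring the unique-name scheme after this change.
    #include <cstdint>
    #include <iostream>
    #include <string>

    std::string NewUniqueOpNameSketch(const std::string& prefix, const std::string& op_type_name) {
      static int64_t unique_op_name_index = 0;  // per-job counter in the real ctx
      return prefix + op_type_name + "_" + std::to_string(unique_op_name_index++);
    }

    int main() {
      std::cout << NewUniqueOpNameSketch("", "matmul") << std::endl;  // matmul_0
      std::cout << NewUniqueOpNameSketch("", "relu") << std::endl;    // relu_1
      return 0;
    }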
+""" +import unittest + +import numpy as np +import os + +os.environ["MASTER_ADDR"] = "127.0.0.1" +os.environ["MASTER_PORT"] = "12139" +os.environ["WORLD_SIZE"] = "1" +os.environ["RANK"] = "0" +os.environ["LOCAL_RANK"] = "0" + +import oneflow +import oneflow.experimental as flow +import oneflow.python.framework.session_context as session_ctx +import oneflow._oneflow_internal +from oneflow.python.framework.multi_client_session import MultiClientSession +import oneflow.python.framework.c_api_util as c_api_util + + +@flow.unittest.skip_unless_1n1d() +class TestUserOpGraph(unittest.TestCase): + def test_user_op_graph(test_case): + test_case.assertTrue(oneflow.distributed.is_multi_client()) + test_case.assertTrue( + oneflow.python.framework.env_util.HasAllMultiClientEnvVars() + ) + + x0 = flow.Tensor(20, 30) + weight0 = flow.Tensor(30, 50) + x1 = flow.Tensor(50, 70) + + # NOTE(chengcheng): this tiny net is: + # x0 * weight0 -> out0 + # relu(out0) -> y0 + # y0 * x1 -> out1 + # relu(out1) -> y1 + + flow.nn.init.uniform_(x0, a=-1.0, b=1.0) + flow.nn.init.uniform_(x1, a=-1.0, b=1.0) + flow.nn.init.uniform_(weight0, a=-1.0, b=1.0) + + session = session_ctx.GetDefaultSession() + test_case.assertTrue(isinstance(session, MultiClientSession)) + session.TryInit() + + with oneflow._oneflow_internal.lazy_mode.gard(True): + + oneflow._oneflow_internal.JobBuildAndInferCtx_Open( + "cc_test_user_op_expr_job" + ) + job_conf = ( + oneflow._oneflow_internal.oneflow.core.job.job_conf.JobConfigProto() + ) + job_conf.set_job_name("cc_test_user_op_expr_job") + job_conf.mutable_predict_conf() + c_api_util.CurJobBuildAndInferCtx_SetJobConf(job_conf) + + # input_conf.set_in_0("EagerTensorInput") + # input_conf.set_out_0("out_0") + + x0_conf = ( + oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedInputOpConf() + ) + x0_op = oneflow._oneflow_internal.one.FeedInputOpExpr( + "cc_Input_0", x0_conf, ["in_0"], ["out_0"] + ) + x1_conf = ( + oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedInputOpConf() + ) + x1_op = oneflow._oneflow_internal.one.FeedInputOpExpr( + "cc_Input_1", x1_conf, ["in_0"], ["out_0"] + ) + weight0_conf = ( + oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedVariableOpConf() + ) + weight0_op = oneflow._oneflow_internal.one.FeedVariableOpExpr( + "cc_Variable_0", weight0_conf, ["in_0"], ["out_0"] + ) + output_conf = ( + oneflow._oneflow_internal.oneflow.core.operator.op_conf.FetchOutputOpConf() + ) + output_op = oneflow._oneflow_internal.one.FetchOutputOpExpr( + "cc_Output_0", output_conf, ["in_0"], ["out_0"] + ) + + attrs = oneflow._oneflow_internal.MutableCfgAttrMap() + + if not x0.is_determined: + x0.determine() + x0_tensor_in_c = x0._local_or_consistent_tensor + if not x1.is_determined: + x1.determine() + x1_tensor_in_c = x1._local_or_consistent_tensor + if not weight0.is_determined: + weight0.determine() + weight0_tensor_in_c = weight0._local_or_consistent_tensor + + x0_lazy_tensor = x0_op.apply([x0_tensor_in_c], attrs)[0] + x1_lazy_tensor = x1_op.apply([x1_tensor_in_c], attrs)[0] + weight0_lazy_tensor = weight0_op.apply([weight0_tensor_in_c], attrs)[0] + + test_case.assertEqual(x0_lazy_tensor.shape, (20, 30)) + test_case.assertTrue(x0_lazy_tensor.is_lazy) + test_case.assertEqual(weight0_lazy_tensor.shape, (30, 50)) + test_case.assertTrue(weight0_lazy_tensor.is_lazy) + test_case.assertEqual(x1_lazy_tensor.shape, (50, 70)) + test_case.assertTrue(x1_lazy_tensor.is_lazy) + + out0 = flow.F.matmul(x0_lazy_tensor, weight0_lazy_tensor) + test_case.assertEqual(out0.shape, (20, 
+            test_case.assertTrue(out0.is_lazy)
+
+            y0 = flow.F.relu(out0)
+            test_case.assertEqual(y0.shape, (20, 50))
+            test_case.assertTrue(y0.is_lazy)
+
+            out1 = flow.F.matmul(y0, x1_lazy_tensor)
+            test_case.assertEqual(out1.shape, (20, 70))
+            test_case.assertTrue(out1.is_lazy)
+
+            y1 = flow.F.relu(out1)
+            test_case.assertEqual(y1.shape, (20, 70))
+            test_case.assertTrue(y1.is_lazy)
+
+            eager_output = output_op.apply([y1], attrs)[0]
+            test_case.assertEqual(eager_output.shape, (20, 70))
+            test_case.assertTrue(not eager_output.is_lazy)
+
+
+if __name__ == "__main__":
+    unittest.main()