From 80acea4b4952d4f34f2d45d3d2e2e4fa7f41c882 Mon Sep 17 00:00:00 2001
From: cheng cheng <472491134@qq.com>
Date: Mon, 19 Jul 2021 19:12:01 +0800
Subject: [PATCH] LazyInterpreter for FetchOutputOpExpr and set op
 parallel_distribution (#5527)

* LazyInterpreter for FetchOutputOpExpr and set op parallel_distribution

* Add note

* refine test scripts

Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
---
 .../op_interpreter/lazy_op_interpreter.cpp    |  75 +++++++++++--
 .../python/test/graph/test_output_op_expr.py  | 100 ++++++++++++++++++
 2 files changed, 165 insertions(+), 10 deletions(-)
 create mode 100644 oneflow/python/test/graph/test_output_op_expr.py

diff --git a/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp b/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
index 785090c9c..2fb164d60 100644
--- a/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
+++ b/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
@@ -50,7 +50,15 @@ bool GetIsDynamicOfTensor(const std::shared_ptr<Tensor>& tensor) {
 
 Maybe<void> GenParallelDistributionByTensor(ParallelDistribution* parallel_distribution,
                                             const std::shared_ptr<Tensor>& tensor) {
-  // TODO(chengcheng)
+  parallel_distribution->clear_sbp_parallel();
+  if (tensor->is_local()) {
+    // NOTE(chengcheng):
+    //   OneFlow Lazy is always consistent. A LocalTensor is the special case of a
+    //   ConsistentTensor whose placement is only this rank and whose SbpParallel is Broadcast.
+    parallel_distribution->add_sbp_parallel()->mutable_broadcast_parallel();
+  } else {
+    JUST(tensor->parallel_distribution())->ToProto(parallel_distribution);
+  }
   return Maybe<void>::Ok();
 }
 
@@ -79,9 +87,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedInputOpExpr& op_expr, const Ten
   input_tensor->shape()->ToProto(blob_conf->mutable_shape());
   blob_conf->set_data_type(input_tensor->dtype());
   blob_conf->set_is_dynamic(GetIsDynamicOfTensor(input_tensor));
-  if (input_tensor->is_consistent()) {
-    JUST(GenParallelDistributionByTensor(blob_conf->mutable_parallel_distribution(), input_tensor));
-  }
+  JUST(GenParallelDistributionByTensor(blob_conf->mutable_parallel_distribution(), input_tensor));
 
   auto infer_ctx = JUST(GetCurInferCtx());
   OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
@@ -136,9 +142,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
   // NOTE(chengcheng): VariableOpConf initializer_conf is useless because variable is inited
   // by EagerTensor.
   var_conf->mutable_initializer()->mutable_empty_conf();
-  if (input_tensor->is_consistent()) {
-    // TODO(chengcheng): GenerateParallelDistributionString by tensor.
-  }
+  // TODO(chengcheng): GenerateParallelDistributionString by tensor.
   if (!input_tensor->requires_grad()) { var_conf->set_trainable(false); }
 
   // TODO(chengcheng, xuxiaoyu): Set L1/L2 RegularizerConf by nn.Graph Optimizer
@@ -175,9 +179,60 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
 
 Maybe<void> LazyInterpreter::ApplyImpl(const FetchOutputOpExpr& op_expr, const TensorTuple& inputs,
                                        TensorTuple* outputs, const OpExprInterpContext& ctx) const {
-  // TODO(chengcheng)
-  OF_UNIMPLEMENTED() << "The type " << op_expr.op_type_name()
-                     << " has not been supported in LazyInterpreter::Apply.";
+  // NOTE(chengcheng): inputs[0] is the LazyTensor to be fetched.
+  CHECK_EQ_OR_RETURN(inputs.size(), 1);
+  CHECK_EQ_OR_RETURN(op_expr.input_size(), 1);
+  const std::shared_ptr<Tensor>& input_tensor = inputs.at(0);
+  CHECK_OR_RETURN(input_tensor->is_lazy());
+  // NOTE(chengcheng): Lazy is always consistent.
+  CHECK_OR_RETURN(input_tensor->is_consistent());
+  const std::string& input_lbn = TensorNameScope::Global()->Lookup(input_tensor);
+  CHECK_OR_RETURN(!input_lbn.empty());  // The lbn must already exist.
+
+  const auto& scope = JUST(GetCurrentScope());
+  int64_t scope_symbol_id = JUST(scope->symbol_id());
+
+  OperatorConf op_conf;
+  op_conf.set_name(op_expr.op_name());  // constructed by Python nn.Graph
+  op_conf.set_scope_symbol_id(scope_symbol_id);  // TODO(chengcheng): NewScope by cur scope.
+  op_conf.set_device_tag(GetDeviceTagOfTensor(input_tensor));
+  // NOTE(chengcheng):
+  //   We construct an OutputOpConf instead of a FetchOutputOpConf because FetchOutputOpExpr
+  //   is only used to fetch the nn.Graph output LazyTensor.
+  OutputOpConf* output_conf = op_conf.mutable_output_conf();
+  output_conf->set_in(input_lbn);
+  output_conf->set_out("out");
+  InterfaceBlobConf* blob_conf = output_conf->mutable_blob_conf();
+  input_tensor->shape()->ToProto(blob_conf->mutable_shape());
+  blob_conf->set_data_type(input_tensor->dtype());
+  blob_conf->set_is_dynamic(GetIsDynamicOfTensor(input_tensor));
+  JUST(GenParallelDistributionByTensor(blob_conf->mutable_parallel_distribution(), input_tensor));
+
+  auto infer_ctx = JUST(GetCurInferCtx());
+  OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
+
+  const std::string& op_name = op_conf.name();
+
+  // temp debug log
+  std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
+            << " and the origin op_conf is :" << op_conf.DebugString();
+
+  int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
+  const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
+      JUST(GetSymbol<cfg::ParallelConf, ParallelDesc>(parallel_desc_sym_id));
+
+  // Check the number of outputs and set up the output tensor's properties.
+  CHECK_EQ_OR_RETURN(outputs->size(), 1);
+  CHECK_EQ_OR_RETURN(op_expr.output_size(), 1);
+
+  const std::string obn = "out";  // NOTE(chengcheng): obn is NOT op_expr.indexed_obns
+  const auto& parallel_attr =
+      JUST(compatible_py::GetOpArgParallelAttribute(blob_parallel_desc_sym, op_attr, obn));
+  const auto& blob_attr = JUST(compatible_py::GetOpArgBlobAttribute(op_attr, obn));
+
+  CHECK_OR_RETURN(!outputs->at(0).get());
+  // TODO(chengcheng): Build EagerLocalTensor if the parallel attr is this rank only.
+  (*outputs)[0] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/false));
   return Maybe<void>::Ok();
 }
 
diff --git a/oneflow/python/test/graph/test_output_op_expr.py b/oneflow/python/test/graph/test_output_op_expr.py
new file mode 100644
index 000000000..5a20e57ec
--- /dev/null
+++ b/oneflow/python/test/graph/test_output_op_expr.py
@@ -0,0 +1,100 @@
+"""
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import unittest
+
+import numpy as np
+import os
+
+os.environ["MASTER_ADDR"] = "127.0.0.1"
+os.environ["MASTER_PORT"] = "12139"
+os.environ["WORLD_SIZE"] = "1"
+os.environ["RANK"] = "0"
+os.environ["LOCAL_RANK"] = "0"
+
+import oneflow
+import oneflow.experimental as flow
+import oneflow.python.framework.session_context as session_ctx
+import oneflow._oneflow_internal
+from oneflow.python.framework.multi_client_session import MultiClientSession
+import oneflow.python.framework.c_api_util as c_api_util
+
+
+@flow.unittest.skip_unless_1n1d()
+class TestFetchOutputTensor(unittest.TestCase):
+    def test_fetch_output_tensor(test_case):
+        test_case.assertTrue(oneflow.distributed.is_multi_client())
+        test_case.assertTrue(
+            oneflow.python.framework.env_util.HasAllMultiClientEnvVars()
+        )
+
+        x = flow.Tensor(1, 1, 10, 10)
+        flow.nn.init.uniform_(x, a=-1.0, b=1.0)
+
+        session = session_ctx.GetDefaultSession()
+        test_case.assertTrue(isinstance(session, MultiClientSession))
+        session.TryInit()
+
+        with oneflow._oneflow_internal.lazy_mode.gard(True):  # "gard" is the binding's spelling
+
+            oneflow._oneflow_internal.JobBuildAndInferCtx_Open(
+                "cc_test_output_op_expr_job"
+            )
+            job_conf = (
+                oneflow._oneflow_internal.oneflow.core.job.job_conf.JobConfigProto()
+            )
+            job_conf.set_job_name("cc_test_output_op_expr_job")
+            job_conf.mutable_predict_conf()
+            c_api_util.CurJobBuildAndInferCtx_SetJobConf(job_conf)
+
+            attrs = oneflow._oneflow_internal.MutableCfgAttrMap()
+
+            input_conf = (
+                oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedInputOpConf()
+            )
+            input_conf.set_in_0("EagerTensorInput")
+            input_conf.set_out_0("out_0")
+            input_op = oneflow._oneflow_internal.one.FeedInputOpExpr(
+                "cc_Input_0", input_conf, ["in_0"], ["out_0"]
+            )
+
+            output_conf = (
+                oneflow._oneflow_internal.oneflow.core.operator.op_conf.FetchOutputOpConf()
+            )
+            output_conf.set_in_0(
+                "LazyTensorInput"
+            )  # The lbn of a feed/fetch op conf is a placeholder; its value doesn't matter.
+            output_conf.set_out_0("out_0")
+            output_op = oneflow._oneflow_internal.one.FetchOutputOpExpr(
+                "cc_Output_0", output_conf, ["in_0"], ["out_0"]
+            )
+
+            if not x.is_determined:
+                x.determine()
+            x_tensor_in_c = x._local_or_consistent_tensor
+
+            lazy_tensor = input_op.apply([x_tensor_in_c], attrs)[0]
+            test_case.assertEqual(lazy_tensor.shape, (1, 1, 10, 10))
+            test_case.assertTrue(lazy_tensor.is_lazy)
+            test_case.assertTrue(lazy_tensor.is_consistent)
+
+            eager_tensor = output_op.apply([lazy_tensor], attrs)[0]
+            test_case.assertEqual(eager_tensor.shape, (1, 1, 10, 10))
+            test_case.assertTrue(not eager_tensor.is_lazy)
+            test_case.assertTrue(eager_tensor.is_consistent)
+
+
+if __name__ == "__main__":
+    unittest.main()
-- 
GitLab
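
Note on GenParallelDistributionByTensor above: for a local tensor the interface blob conf now
carries a single broadcast SBP entry, while a consistent tensor's own parallel_distribution is
serialized verbatim via ToProto. As a rough illustration (a sketch, not part of the patch,
assuming the standard textproto rendering of a ParallelDistribution message holding repeated
SbpParallel entries, which is what add_sbp_parallel()->mutable_broadcast_parallel() builds),
the blob_conf fragment produced for a local tensor looks like:

    parallel_distribution {
      # one SBP entry per hierarchy level; a local tensor gets exactly one
      sbp_parallel {
        broadcast_parallel {
        }
      }
    }

The placement itself is not stored here; it comes from the op's scope via
GetParallelDescSymbolId, so a local tensor behaves as a consistent tensor placed on this rank only.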