Unverified commit c6b47a3e authored by cheng cheng, committed by GitHub

LazyInterpret for UserOpExpr (#5544)


* LazyInterpret::ApplyImpl for UserOpExpr

* Fix bug in LazyInterpret for UserOpExpr when changing output lbns

* Add user op expr test

* Fix mistake in note

Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
parent ad578fb0
@@ -92,11 +92,8 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedInputOpExpr& op_expr, const Ten
auto infer_ctx = JUST(GetCurInferCtx());
OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
const std::string& op_name = op_conf.name();
// temp debug log
std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
<< " and the origin op_conf is :" << op_conf.DebugString();
std::cout << "cclog: Lazy nn.Graph AddOp: " << op_conf.DebugString() << std::endl;
int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -113,7 +110,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedInputOpExpr& op_expr, const Ten
CHECK_OR_RETURN(!outputs->at(0).get());
(*outputs)[0] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/true));
TensorNameScope::Global()->Record(outputs->at(0), op_name + "/" + obn);
TensorNameScope::Global()->Record(outputs->at(0), GenLogicalBlobName(op_conf.name(), obn));
return Maybe<void>::Ok();
}
@@ -149,11 +146,8 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
auto infer_ctx = JUST(GetCurInferCtx());
OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
const std::string& op_name = op_conf.name();
// temp debug log
std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
<< " and the origin op_conf is :" << op_conf.DebugString();
std::cout << "cclog: Lazy nn.Graph AddOp: " << op_conf.DebugString() << std::endl;
int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -171,9 +165,9 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
CHECK_OR_RETURN(!outputs->at(0).get());
(*outputs)[0] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/true));
// NOTE(chengcheng): Record variable op output LazyTensor
TensorNameScope::Global()->Record(outputs->at(0), op_name + "/" + obn);
TensorNameScope::Global()->Record(outputs->at(0), GenLogicalBlobName(op_conf.name(), obn));
// NOTE(chengcheng): Record EagerTensor as variable tensor name
TensorNameScope::Global()->Record(input_tensor, op_name + "/" + obn);
TensorNameScope::Global()->Record(input_tensor, GenLogicalBlobName(op_conf.name(), obn));
return Maybe<void>::Ok();
}
@@ -211,11 +205,8 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FetchOutputOpExpr& op_expr, const T
auto infer_ctx = JUST(GetCurInferCtx());
OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
const std::string& op_name = op_conf.name();
// temp debug log
std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
<< " and the origin op_conf is :" << op_conf.DebugString();
std::cout << "cclog: Lazy nn.Graph AddOp: " << op_conf.DebugString() << std::endl;
int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -239,26 +230,63 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FetchOutputOpExpr& op_expr, const T
Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTuple& inputs,
TensorTuple* outputs, const OpExprInterpContext& ctx) const {
CHECK_EQ_OR_RETURN(inputs.size(), op_expr.input_size());
const auto& scope = JUST(GetCurrentScope());
auto op_conf = JUST(OpInterpUtil::GenBuiltinOpConf(op_expr, ctx.attrs));
int64_t symbol_id = JUST(scope->symbol_id());
op_conf->set_scope_symbol_id(symbol_id);
if (!op_conf->has_device_tag()) {
op_conf->set_device_tag(scope->device_parallel_desc_symbol()->device_tag());
}
// TODO(chengcheng): Handle special UserOp such as:
// 1. [Source UserOp] : OFRecordReader, CoinFlip
// 2. [Change Placement/ParallelDesc UserOp] : to/to_consistent/parallel_cast
// 3. [Multi-Inputs & Different ParallelDesc for each input UserOp] : like there are 2 inputs,
// one from CPU and the other from GPU.
// ..., etc.
const auto& scope = JUST(GetCurrentScope());
int64_t old_scope_symbol_id = JUST(scope->symbol_id());
// TODO(chengcheng): New parallel desc scope from all inputs tensors.
op_conf->set_scope_symbol_id(old_scope_symbol_id);
// NOTE(chengcheng):
// Normal UserOp inputs size >= 1 for infer parallel_desc.
// if inputs size == 0, need handle in SourceUserOp impl.
CHECK_GE_OR_RETURN(inputs.size(), 1);
const std::string device_tag = GetDeviceTagOfTensor(inputs.at(0));
op_conf->set_device_tag(device_tag);
for (int i = 0; i < inputs.size(); ++i) {
const auto& input_tensor = inputs.at(i);
CHECK_OR_RETURN(device_tag == GetDeviceTagOfTensor(input_tensor));
const std::string& ibn = op_expr.indexed_ibns().at(i);
const std::string& tensor_name = TensorNameScope::Global()->Lookup(inputs[i]);
ReplaceInputLbnInOpCustomizedConf(op_conf.get(), ibn, tensor_name);
// TODO(chengcheng): check inputs tensor placement equal, and create parallel scope? or set in
// python.
const std::string& lbn = TensorNameScope::Global()->Lookup(inputs[i]);
if (lbn.empty()) {
CHECK_OR_RETURN(input_tensor->is_eager()); // NOTE(chengcheng): lazy_tensor MUST has lbn.
// TODO(chengcheng):
// this is free EagerTensor which NOT captured by nn.Graph (inputs/params).
// Need Create a VariableOpConf for this inputs tensor, and Record name for itself.
UNIMPLEMENTED();
}
CHECK_OR_RETURN(!lbn.empty()); // NOTE(chengcheng): lbn must not empty now.
ReplaceInputLbnInOpCustomizedConf(op_conf.get(), ibn, lbn);
}
const auto& session = JUST(GetDefaultSession());
bool is_mirrored_strategy_enabled = JUST(session->IsMirroredStrategyEnabled());
const auto& op_attribute =
JUST(OpInterpUtil::AddOpAndInferOpAttribute(*op_conf, is_mirrored_strategy_enabled));
OpAttribute proto_op_attribute;
op_attribute->ToProto(&proto_op_attribute);
auto infer_ctx = JUST(GetCurInferCtx());
// NOTE(chengcheng): MUST reset unique op name before InferCtx::AddOp
const std::string new_op_name = *JUST(infer_ctx->NewUniqueOpNameByFunctionalOpConf(*op_conf));
// NOTE(chengcheng): for UserOp, NOT only reset op_name, but also the output values.
op_conf->set_name(new_op_name);
for (auto& pair : *(op_conf->mutable_user_conf()->mutable_output())) {
auto& list_s = pair.second;
for (int i = 0; i < list_s.s_size(); ++i) {
std::string old_lbn = list_s.s(i);
LogicalBlobId old_lbi = GenLogicalBlobId(old_lbn);
// NOTE(chengcheng): MUST change the old_lbn to new op name.
std::string new_lbn = GenLogicalBlobName(new_op_name, old_lbi.blob_name());
list_s.set_s(i, new_lbn);
}
}
// temp debug log
std::cout << "cclog: Lazy nn.Graph add UserOp: " << op_conf->DebugString() << std::endl;
OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(*op_conf));
int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(*op_conf));
const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
@@ -268,16 +296,16 @@ Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTu
CHECK_EQ_OR_RETURN(outputs->size(), op_expr.output_size());
for (int i = 0; i < op_expr.output_size(); ++i) {
const std::string& obn = op_expr.indexed_obns().at(i);
const auto& parallel_attr = JUST(
compatible_py::GetOpArgParallelAttribute(blob_parallel_desc_sym, proto_op_attribute, obn));
const auto& blob_attr = JUST(compatible_py::GetOpArgBlobAttribute(proto_op_attribute, obn));
const auto& parallel_attr =
JUST(compatible_py::GetOpArgParallelAttribute(blob_parallel_desc_sym, op_attr, obn));
const auto& blob_attr = JUST(compatible_py::GetOpArgBlobAttribute(op_attr, obn));
if (!(outputs->at(i).get())) {
(*outputs)[i] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/true));
} else {
// TODO(hjchen2) Reset shape, dtype and so on.
// TODO(chengcheng, hjchen2) Reset shape, dtype and so on for InplaceUserOp.
UNIMPLEMENTED();
}
TensorNameScope::Global()->Record(outputs->at(i), op_expr.op_name() + "/" + obn);
TensorNameScope::Global()->Record(outputs->at(i), GenLogicalBlobName(new_op_name, obn));
}
return Maybe<void>::Ok();
}
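
The key step in the hunk above is rewriting every output lbn once InferCtx assigns the op its unique name. Below is a minimal, self-contained sketch of that rewrite, assuming only the "op_name/blob_name" lbn convention visible in the diff; OutputMap and RewriteOutputLbns are illustrative stand-ins for the user_conf output proto and for GenLogicalBlobId/GenLogicalBlobName, and the op names in main are hypothetical.

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Illustrative stand-in for op_conf->mutable_user_conf()->mutable_output():
// each output bn ("out_0", ...) maps to a list of lbns of the form "op_name/blob_name".
using OutputMap = std::map<std::string, std::vector<std::string>>;

// Rewrite every output lbn so it carries the new unique op name while keeping
// the blob_name part (everything after the '/') unchanged.
void RewriteOutputLbns(OutputMap* outputs, const std::string& new_op_name) {
  for (auto& pair : *outputs) {
    for (std::string& lbn : pair.second) {
      const std::string blob_name = lbn.substr(lbn.find('/') + 1);  // like GenLogicalBlobId(lbn).blob_name()
      lbn = new_op_name + "/" + blob_name;                          // like GenLogicalBlobName(new_op_name, blob_name)
    }
  }
}

int main() {
  // Hypothetical names: the functional op_conf carries a placeholder op name
  // until InferCtx hands back the unique one.
  OutputMap outputs{{"out_0", {"matmul-placeholder/out_0"}}};
  RewriteOutputLbns(&outputs, "model-MatmulOp_3");
  std::cout << outputs["out_0"][0] << std::endl;  // prints "model-MatmulOp_3/out_0"
  return 0;
}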
@@ -1319,12 +1319,9 @@ Maybe<std::string> JobBuildAndInferCtx::NewUniqueOpNameByFunctionalOpConf(
} else {
op_type_name = "SystemOp";
}
std::string op_name = op_name_prefix + op_type_name + "-" + std::to_string(unique_op_name_index_);
std::string op_name = op_name_prefix + op_type_name + "_" + std::to_string(unique_op_name_index_);
++unique_op_name_index_;
// temp debug log
std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
<< " and the origin op_conf is :" << op_conf.DebugString();
return op_name;
}
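
For reference, the unique op name comes from a prefix + op type name + counter scheme, with the separator changed from "-" to "_" in this hunk. Here is a rough standalone sketch of that scheme; the prefix value in main is hypothetical, since op_name_prefix comes from context not shown in the diff.

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>

// Rough stand-in for JobBuildAndInferCtx::NewUniqueOpNameByFunctionalOpConf:
// combine a prefix, the op type name (falling back to "SystemOp" as shown above)
// and a monotonically increasing index.
class UniqueOpNamer {
 public:
  explicit UniqueOpNamer(std::string op_name_prefix)
      : op_name_prefix_(std::move(op_name_prefix)) {}

  std::string NewName(const std::string& op_type_name) {
    return op_name_prefix_ + op_type_name + "_" + std::to_string(unique_op_name_index_++);
  }

 private:
  std::string op_name_prefix_;
  int64_t unique_op_name_index_ = 0;
};

int main() {
  UniqueOpNamer namer("graph_0-");  // hypothetical prefix
  std::cout << namer.NewName("matmul") << std::endl;    // graph_0-matmul_0
  std::cout << namer.NewName("relu") << std::endl;      // graph_0-relu_1
  std::cout << namer.NewName("SystemOp") << std::endl;  // graph_0-SystemOp_2
  return 0;
}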
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest
import numpy as np
import os
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "12139"
os.environ["WORLD_SIZE"] = "1"
os.environ["RANK"] = "0"
os.environ["LOCAL_RANK"] = "0"
import oneflow
import oneflow.experimental as flow
import oneflow.python.framework.session_context as session_ctx
import oneflow._oneflow_internal
from oneflow.python.framework.multi_client_session import MultiClientSession
import oneflow.python.framework.c_api_util as c_api_util
@flow.unittest.skip_unless_1n1d()
class TestUserOpGraph(unittest.TestCase):
def test_user_op_graph(test_case):
test_case.assertTrue(oneflow.distributed.is_multi_client())
test_case.assertTrue(
oneflow.python.framework.env_util.HasAllMultiClientEnvVars()
)
x0 = flow.Tensor(20, 30)
weight0 = flow.Tensor(30, 50)
x1 = flow.Tensor(50, 70)
# NOTE(chengcheng): this tiny net is:
# x0 * weight0 -> out0
# relu(out0) -> y0
# y0 * x1 -> out1
# relu(out1) -> y1
flow.nn.init.uniform_(x0, a=-1.0, b=1.0)
flow.nn.init.uniform_(x1, a=-1.0, b=1.0)
flow.nn.init.uniform_(weight0, a=-1.0, b=1.0)
session = session_ctx.GetDefaultSession()
test_case.assertTrue(isinstance(session, MultiClientSession))
session.TryInit()
with oneflow._oneflow_internal.lazy_mode.gard(True):
oneflow._oneflow_internal.JobBuildAndInferCtx_Open(
"cc_test_user_op_expr_job"
)
job_conf = (
oneflow._oneflow_internal.oneflow.core.job.job_conf.JobConfigProto()
)
job_conf.set_job_name("cc_test_user_op_expr_job")
job_conf.mutable_predict_conf()
c_api_util.CurJobBuildAndInferCtx_SetJobConf(job_conf)
# input_conf.set_in_0("EagerTensorInput")
# input_conf.set_out_0("out_0")
x0_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedInputOpConf()
)
x0_op = oneflow._oneflow_internal.one.FeedInputOpExpr(
"cc_Input_0", x0_conf, ["in_0"], ["out_0"]
)
x1_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedInputOpConf()
)
x1_op = oneflow._oneflow_internal.one.FeedInputOpExpr(
"cc_Input_1", x1_conf, ["in_0"], ["out_0"]
)
weight0_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedVariableOpConf()
)
weight0_op = oneflow._oneflow_internal.one.FeedVariableOpExpr(
"cc_Variable_0", weight0_conf, ["in_0"], ["out_0"]
)
output_conf = (
oneflow._oneflow_internal.oneflow.core.operator.op_conf.FetchOutputOpConf()
)
output_op = oneflow._oneflow_internal.one.FetchOutputOpExpr(
"cc_Output_0", output_conf, ["in_0"], ["out_0"]
)
attrs = oneflow._oneflow_internal.MutableCfgAttrMap()
if not x0.is_determined:
x0.determine()
x0_tensor_in_c = x0._local_or_consistent_tensor
if not x1.is_determined:
x1.determine()
x1_tensor_in_c = x1._local_or_consistent_tensor
if not weight0.is_determined:
weight0.determine()
weight0_tensor_in_c = weight0._local_or_consistent_tensor
x0_lazy_tensor = x0_op.apply([x0_tensor_in_c], attrs)[0]
x1_lazy_tensor = x1_op.apply([x1_tensor_in_c], attrs)[0]
weight0_lazy_tensor = weight0_op.apply([weight0_tensor_in_c], attrs)[0]
test_case.assertEqual(x0_lazy_tensor.shape, (20, 30))
test_case.assertTrue(x0_lazy_tensor.is_lazy)
test_case.assertEqual(weight0_lazy_tensor.shape, (30, 50))
test_case.assertTrue(weight0_lazy_tensor.is_lazy)
test_case.assertEqual(x1_lazy_tensor.shape, (50, 70))
test_case.assertTrue(x1_lazy_tensor.is_lazy)
out0 = flow.F.matmul(x0_lazy_tensor, weight0_lazy_tensor)
test_case.assertEqual(out0.shape, (20, 50))
test_case.assertTrue(out0.is_lazy)
y0 = flow.F.relu(out0)
test_case.assertEqual(y0.shape, (20, 50))
test_case.assertTrue(y0.is_lazy)
out1 = flow.F.matmul(y0, x1_lazy_tensor)
test_case.assertEqual(out1.shape, (20, 70))
test_case.assertTrue(out1.is_lazy)
y1 = flow.F.relu(out1)
test_case.assertEqual(y1.shape, (20, 70))
test_case.assertTrue(y1.is_lazy)
eager_output = output_op.apply([y1], attrs)[0]
test_case.assertEqual(eager_output.shape, (20, 70))
test_case.assertTrue(not eager_output.is_lazy)
if __name__ == "__main__":
unittest.main()