Unverified commit 80acea4b authored by cheng cheng, committed by GitHub

LazyInterpreter for FetchOutputOpExpr and set op parallel_distribution (#5527)


* LazyInterpreter for FetchOutputOpExpr and set op parallel_distribution

* Add note

* refine test scripts

Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
parent d63ccc09
@@ -50,7 +50,15 @@ bool GetIsDynamicOfTensor(const std::shared_ptr<Tensor>& tensor) {
Maybe<void> GenParallelDistributionByTensor(ParallelDistribution* parallel_distribution,
                                            const std::shared_ptr<Tensor>& tensor) {
  // TODO(chengcheng)
  parallel_distribution->clear_sbp_parallel();
  if (tensor->is_local()) {
    // NOTE(chengcheng):
    //   OneFlow Lazy is always consistent. A LocalTensor is a special case of ConsistentTensor
    //   whose placement is only this rank and whose SbpParallel is Broadcast.
    parallel_distribution->add_sbp_parallel()->mutable_broadcast_parallel();
  } else {
    JUST(tensor->parallel_distribution())->ToProto(parallel_distribution);
  }
  return Maybe<void>::Ok();
}
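
For reference (not part of the diff): the two branches above fill the parallel_distribution message of the interface blob differently. A rough sketch in protobuf text format, where the split(0) entry is only an example of what a consistent tensor might carry:

  # local tensor -> a single broadcast SBP entry
  parallel_distribution { sbp_parallel { broadcast_parallel {} } }
  # consistent tensor -> its own distribution is copied, e.g. split along axis 0
  parallel_distribution { sbp_parallel { split_parallel { axis: 0 } } }
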
@@ -79,9 +87,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedInputOpExpr& op_expr, const Ten
  input_tensor->shape()->ToProto(blob_conf->mutable_shape());
  blob_conf->set_data_type(input_tensor->dtype());
  blob_conf->set_is_dynamic(GetIsDynamicOfTensor(input_tensor));
  if (input_tensor->is_consistent()) {
    JUST(GenParallelDistributionByTensor(blob_conf->mutable_parallel_distribution(), input_tensor));
  }
  JUST(GenParallelDistributionByTensor(blob_conf->mutable_parallel_distribution(), input_tensor));
  auto infer_ctx = JUST(GetCurInferCtx());
  OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
@@ -136,9 +142,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
  // NOTE(chengcheng): VariableOpConf initializer_conf is useless because the variable is inited
  //   by EagerTensor.
  var_conf->mutable_initializer()->mutable_empty_conf();
  if (input_tensor->is_consistent()) {
    // TODO(chengcheng): GenerateParallelDistributionString by tensor.
  }
  // TODO(chengcheng): GenerateParallelDistributionString by tensor.
  if (!input_tensor->requires_grad()) { var_conf->set_trainable(false); }
  // TODO(chengcheng, xuxiaoyu): Set L1/L2 RegularizerConf by nn.Graph Optimizer
@@ -175,9 +179,60 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
Maybe<void> LazyInterpreter::ApplyImpl(const FetchOutputOpExpr& op_expr, const TensorTuple& inputs,
                                       TensorTuple* outputs, const OpExprInterpContext& ctx) const {
  // TODO(chengcheng)
  OF_UNIMPLEMENTED() << "The type " << op_expr.op_type_name()
                     << " has not been supported in LazyInterpreter::Apply.";
  // NOTE(chengcheng): inputs[0] is the LazyTensor
  CHECK_EQ_OR_RETURN(inputs.size(), 1);
  CHECK_EQ_OR_RETURN(op_expr.input_size(), 1);
  const std::shared_ptr<Tensor>& input_tensor = inputs.at(0);
  CHECK_OR_RETURN(input_tensor->is_lazy());
  // NOTE(chengcheng): Lazy is always consistent.
  CHECK_OR_RETURN(input_tensor->is_consistent());
  const std::string& input_lbn = TensorNameScope::Global()->Lookup(input_tensor);
  CHECK_OR_RETURN(!input_lbn.empty());  // lbn must exist.
  const auto& scope = JUST(GetCurrentScope());
  int64_t scope_symbol_id = JUST(scope->symbol_id());
  OperatorConf op_conf;
  op_conf.set_name(op_expr.op_name());           // constructed by python nn.Graph
  op_conf.set_scope_symbol_id(scope_symbol_id);  // TODO(chengcheng): NewScope by cur scope.
  op_conf.set_device_tag(GetDeviceTagOfTensor(input_tensor));
  // NOTE(chengcheng):
  //   We construct an OutputOpConf instead of a FetchOutputOpConf because FetchOutputOpExpr is
  //   just for getting the nn.Graph output LazyTensor.
  OutputOpConf* output_conf = op_conf.mutable_output_conf();
  output_conf->set_in(input_lbn);
  output_conf->set_out("out");
  InterfaceBlobConf* blob_conf = output_conf->mutable_blob_conf();
  input_tensor->shape()->ToProto(blob_conf->mutable_shape());
  blob_conf->set_data_type(input_tensor->dtype());
  blob_conf->set_is_dynamic(GetIsDynamicOfTensor(input_tensor));
  JUST(GenParallelDistributionByTensor(blob_conf->mutable_parallel_distribution(), input_tensor));
  auto infer_ctx = JUST(GetCurInferCtx());
  OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
  const std::string& op_name = op_conf.name();
  // temp debug log
  std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
            << " and the origin op_conf is :" << op_conf.DebugString();
  int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
  const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
      JUST(GetSymbol<cfg::ParallelConf, ParallelDesc>(parallel_desc_sym_id));
  // Check outputs num and setup output tensor properties.
  CHECK_EQ_OR_RETURN(outputs->size(), 1);
  CHECK_EQ_OR_RETURN(op_expr.output_size(), 1);
  const std::string obn = "out";  // NOTE(chengcheng): obn is NOT op_expr.indexed_obns
  const auto& parallel_attr =
      JUST(compatible_py::GetOpArgParallelAttribute(blob_parallel_desc_sym, op_attr, obn));
  const auto& blob_attr = JUST(compatible_py::GetOpArgBlobAttribute(op_attr, obn));
  CHECK_OR_RETURN(!outputs->at(0).get());
  // TODO(chengcheng): Build EagerLocalTensor if parallel attr is this rank.
  (*outputs)[0] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/false));
  return Maybe<void>::Ok();
}
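
For a concrete picture (illustrative only, not part of the diff): with the test below feeding a local 1x1x10x10 float CPU tensor through "cc_Input_0", the OperatorConf built by this ApplyImpl would look roughly like the following protobuf text format; the scope_symbol_id and the exact lbn are assumptions here, since both come from the runtime scope and TensorNameScope:

  name: "cc_Output_0"
  device_tag: "cpu"
  scope_symbol_id: 0  # actual value is taken from the current scope
  output_conf {
    in: "cc_Input_0/out_0"
    out: "out"
    blob_conf {
      shape { dim: 1 dim: 1 dim: 10 dim: 10 }
      data_type: kFloat
      is_dynamic: false
      parallel_distribution { sbp_parallel { broadcast_parallel {} } }
    }
  }
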
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest
import numpy as np
import os
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "12139"
os.environ["WORLD_SIZE"] = "1"
os.environ["RANK"] = "0"
os.environ["LOCAL_RANK"] = "0"
import oneflow
import oneflow.experimental as flow
import oneflow.python.framework.session_context as session_ctx
import oneflow._oneflow_internal
from oneflow.python.framework.multi_client_session import MultiClientSession
import oneflow.python.framework.c_api_util as c_api_util
@flow.unittest.skip_unless_1n1d()
class TestFetchOutputTensor(unittest.TestCase):
    def test_fetch_output_tensor(test_case):
        test_case.assertTrue(oneflow.distributed.is_multi_client())
        test_case.assertTrue(
            oneflow.python.framework.env_util.HasAllMultiClientEnvVars()
        )

        x = flow.Tensor(1, 1, 10, 10)
        flow.nn.init.uniform_(x, a=-1.0, b=1.0)

        session = session_ctx.GetDefaultSession()
        test_case.assertTrue(isinstance(session, MultiClientSession))
        session.TryInit()

        with oneflow._oneflow_internal.lazy_mode.gard(True):
            oneflow._oneflow_internal.JobBuildAndInferCtx_Open(
                "cc_test_output_op_expr_job"
            )
            job_conf = (
                oneflow._oneflow_internal.oneflow.core.job.job_conf.JobConfigProto()
            )
            job_conf.set_job_name("cc_test_output_op_expr_job")
            job_conf.mutable_predict_conf()
            c_api_util.CurJobBuildAndInferCtx_SetJobConf(job_conf)

            attrs = oneflow._oneflow_internal.MutableCfgAttrMap()

            input_conf = (
                oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedInputOpConf()
            )
            input_conf.set_in_0("EagerTensorInput")
            input_conf.set_out_0("out_0")
            input_op = oneflow._oneflow_internal.one.FeedInputOpExpr(
                "cc_Input_0", input_conf, ["in_0"], ["out_0"]
            )

            output_conf = (
                oneflow._oneflow_internal.oneflow.core.operator.op_conf.FetchOutputOpConf()
            )
            output_conf.set_in_0(
                "LazyTensorInput"
            )  # the lbn of a feed/fetch op conf does not matter here
            output_conf.set_out_0("out_0")
            output_op = oneflow._oneflow_internal.one.FetchOutputOpExpr(
                "cc_Output_0", output_conf, ["in_0"], ["out_0"]
            )

            if not x.is_determined:
                x.determine()
            x_tensor_in_c = x._local_or_consistent_tensor

            lazy_tensor = input_op.apply([x_tensor_in_c], attrs)[0]
            test_case.assertEqual(lazy_tensor.shape, (1, 1, 10, 10))
            test_case.assertTrue(lazy_tensor.is_lazy)
            test_case.assertTrue(lazy_tensor.is_consistent)

            eager_tensor = output_op.apply([lazy_tensor], attrs)[0]
            test_case.assertEqual(eager_tensor.shape, (1, 1, 10, 10))
            test_case.assertTrue(not eager_tensor.is_lazy)
            test_case.assertTrue(eager_tensor.is_consistent)


if __name__ == "__main__":
    unittest.main()