Unverified Commit 00f12305 authored by binbinHan, committed by GitHub

flow.S/B/P (#5306)


* flow.S/B/P

* optimize

* optimize

* fix according to comment

* add attr in experimental namespace

* optimize

* oneflow_export_value

* refine

* refine

* refine

* customized_symbol module

* refine

* fix bug

* Add new placement init func (#5408)

* add_new_placement_init_func

* flow.env.all_device_placement

* refine

* refine

* refine

* refine

* refine

* refine

* refine

* refine

* refine

Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
parent 3e8d3a1d
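For reference, a usage sketch assembled from the docstring examples added in this commit; it assumes a OneFlow build that contains this change and a machine whose rank 0 has four visible GPUs.

import numpy
import oneflow as flow

# The three SBP signatures exported by this commit.
s0 = flow.sbp.split(0)     # split along axis 0
b = flow.sbp.broadcast     # exported as a value, not a callable
ps = flow.sbp.partial_sum  # exported as a value, not a callable

# The example used throughout the new docstrings: build a local tensor and
# convert it to a consistent tensor placed on GPUs 0-3 of machine 0.
array = numpy.array([[1.0, 2.0], [3.0, 4.0]])
t1 = flow.tensor(array)
ct2 = t1.to_consistent(sbp=s0, placement=("cuda", {0: [0, 1, 2, 3]}))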
@@ -17,10 +17,14 @@ limitations under the License.
#include <pybind11/stl.h>
#include <pybind11/operators.h>
#include "oneflow/api/python/of_api_registry.h"
#include "oneflow/core/control/global_process_ctx.h"
#include "oneflow/core/common/symbol.h"
#include "oneflow/core/framework/instructions_builder.h"
#include "oneflow/core/framework/parallel_conf_util.h"
#include "oneflow/core/job/parallel_desc.h"
#include "oneflow/core/job/placement.cfg.h"
#include "oneflow/core/job/global_for.h"
#include "oneflow/core/job/resource_desc.h"
namespace py = pybind11;
@@ -28,6 +32,12 @@ namespace oneflow {
namespace {
Maybe<Shape> MakeShape(const py::tuple& py_shape) {
DimVector shape_dims{};
for (const auto& dim : py_shape) { shape_dims.emplace_back(dim.cast<int64_t>()); }
return std::make_shared<Shape>(shape_dims);
}
struct PlacementSymbolExportUtil {
static std::shared_ptr<ParallelDesc> ApiCreatePlacementSymbol(
int64_t symbol_id, const std::shared_ptr<cfg::ParallelConf>& symbol_conf) {
@@ -55,6 +65,62 @@ struct PlacementSymbolExportUtil {
return CreatePlacementSymbol(device_tag, machine_device_ids, hierarchy).GetPtrOrThrow();
}
static Maybe<Symbol<ParallelDesc>> CreatePlacementSymbol(
const std::string& device_type, const py::dict& machine_device_ids,
const std::shared_ptr<Shape>& hierarchy) {
static const HashMap<std::string, std::string> type2device_tag{{"cpu", "cpu"}, {"cuda", "gpu"}};
CHECK_OR_RETURN(type2device_tag.find(device_type) != type2device_tag.end())
<< "Invalid device_type: " << device_type << ", device_type must be \"cpu\" or \"cuda\".";
std::string device_tag = type2device_tag.at(device_type);
std::vector<std::string> formated_machine_device_ids;
for (const auto& pair : machine_device_ids) {
CHECK_OR_RETURN(py::isinstance<py::int_>(pair.first))
<< "Key of machine_device_ids must be int.";
std::string device_name = "";
std::string machine_id = std::to_string(pair.first.cast<int64_t>());
if (py::isinstance<py::int_>(pair.second)) {
device_name = machine_id + ":" + std::to_string(pair.second.cast<int64_t>());
formated_machine_device_ids.emplace_back(device_name);
} else {
CHECK_OR_RETURN(py::isinstance<py::iterable>(pair.second))
<< "Value of machine_device_ids must be int, list or range";
for (const auto& device_id : pair.second) {
CHECK_OR_RETURN(py::isinstance<py::int_>(device_id))
<< "Value of machine_device_ids must be int, list or range of int.";
device_name = machine_id + ":" + std::to_string(device_id.cast<int64_t>());
formated_machine_device_ids.emplace_back(device_name);
}
}
}
const auto parallel_conf =
MakeParallelConf(device_tag, formated_machine_device_ids, hierarchy).GetPtrOrThrow();
std::shared_ptr<ParallelDesc> parallel_desc;
JUST(LogicalRun([&parallel_desc, &parallel_conf](InstructionsBuilder* builder) -> Maybe<void> {
parallel_desc = JUST(builder->GetParallelDescSymbol(parallel_conf));
return Maybe<void>::Ok();
}));
return SymbolOf(*parallel_desc);
}
static Symbol<ParallelDesc> ApiCreatePlacementSymbol(const std::string& device_type,
const py::dict& machine_device_ids,
const std::shared_ptr<Shape>& hierarchy) {
return CreatePlacementSymbol(device_type, machine_device_ids, hierarchy).GetOrThrow();
}
static Maybe<Symbol<ParallelDesc>> CreatePlacementSymbol(const std::string& device_tag,
const py::dict& machine_device_ids,
const py::tuple& hierarchy) {
std::shared_ptr<Shape> shape = CHECK_JUST(MakeShape(hierarchy));
return CreatePlacementSymbol(device_tag, machine_device_ids, shape);
}
static Symbol<ParallelDesc> ApiCreatePlacementSymbol(const std::string& device_type,
const py::dict& machine_device_ids,
const py::tuple& hierarchy) {
return CreatePlacementSymbol(device_type, machine_device_ids, hierarchy).GetOrThrow();
}
static HashMap<int64_t, std::vector<int64_t>> MachineId2DeviceIdList(const ParallelDesc& x) {
const auto map_with_shared_ptr = x.machine_id2sorted_dev_phy_ids();
// pybind11 fails to compile if we return a
@@ -65,6 +131,67 @@ struct PlacementSymbolExportUtil {
}
return map_without_shared_ptr;
}
static Symbol<ParallelDesc> AllDevicePlacement(const std::string& device_type) {
CHECK_NOTNULL((Global<ResourceDesc, ForEnv>::Get()));
static const HashMap<std::string, std::string> type2device_tag{{"cpu", "cpu"}, {"cuda", "gpu"}};
CHECK(type2device_tag.find(device_type) != type2device_tag.end())
<< "Invalid device_type: " << device_type << ", device_type must be \"cpu\" or \"cuda\".";
std::string device_tag = type2device_tag.at(device_type);
int64_t world_size = GlobalProcessCtx::WorldSize();
int64_t device_num = 0;
{
if (device_tag == "gpu") {
device_num = Global<ResourceDesc, ForEnv>::Get()->GpuDeviceNum();
CHECK(device_num > 0) << "Can't build cuda placement because no gpu is found!";
} else {
device_num = Global<ResourceDesc, ForEnv>::Get()->CpuDeviceNum();
}
}
std::vector<std::string> machine_device_ids;
for (int64_t rank = 0; rank < world_size; ++rank) {
std::string device_name = std::to_string(rank) + ":0-" + std::to_string(device_num - 1);
machine_device_ids.emplace_back(device_name);
}
return SymbolOf(*PlacementSymbolExportUtil::ApiCreatePlacementSymbol(
device_tag, machine_device_ids, std::shared_ptr<Shape>()));
}
static std::string PlacementSymbol2String(Symbol<ParallelDesc> placement) {
std::string device_type = placement->device_tag() == "gpu" ? "\"cuda\"" : "\"cpu\"";
std::string machine_device_ids = "{";
std::string device_name;
int64_t machine_idx = 0;
for (int64_t machine_id : placement->sorted_machine_ids()) {
std::string device_name = std::to_string(machine_id) + " : [";
int64_t device_idx = 0;
for (int64_t device_id : placement->sorted_dev_phy_ids(machine_id)) {
device_name += std::to_string(device_id);
if (++device_idx != placement->sorted_dev_phy_ids(machine_id).size()) {
device_name += ", ";
}
}
device_name += "]";
if (++machine_idx != placement->sorted_machine_ids().size()) { device_name += ", "; }
machine_device_ids += device_name;
}
machine_device_ids += "}";
std::string hierarchy = "(";
int32_t hierarchy_dim_idx = 0;
for (int64_t dim : placement->hierarchy()->dim_vec()) {
hierarchy += std::to_string(dim);
if (++hierarchy_dim_idx != placement->hierarchy()->dim_vec().size()) {
hierarchy += ", ";
} else if (placement->hierarchy()->dim_vec().size() == 1) {
hierarchy += ",";
}
}
hierarchy += ")";
std::string placement_str = "oneflow.placement(device_type=" + device_type
+ ", machine_device_ids=" + machine_device_ids
+ ", hierarchy=" + hierarchy + ")";
return placement_str;
}
};
} // namespace
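Two behaviours of the helper struct above are easiest to see in plain Python: how CreatePlacementSymbol flattens the machine_device_ids dict into "machine_id:device_id" strings, and the exact string PlacementSymbol2String produces. The sketch below is illustrative only; format_machine_device_ids and placement_repr are hypothetical names, not OneFlow APIs.

def format_machine_device_ids(machine_device_ids):
    # Mirrors CreatePlacementSymbol above: an int value is a single device id,
    # any other value is iterated as a list/range of device ids.
    formatted = []
    for machine_id, device_ids in machine_device_ids.items():
        if isinstance(device_ids, int):
            device_ids = [device_ids]
        for device_id in device_ids:
            formatted.append("{}:{}".format(machine_id, device_id))
    return formatted

def placement_repr(device_type, machine_device_ids, hierarchy):
    # Mirrors PlacementSymbol2String above, including the trailing comma that a
    # one-dimensional hierarchy gets.
    mdi = ", ".join(
        "{} : [{}]".format(m, ", ".join(str(d) for d in ds))
        for m, ds in machine_device_ids.items()
    )
    h = ", ".join(str(d) for d in hierarchy) + ("," if len(hierarchy) == 1 else "")
    return 'oneflow.placement(device_type="{}", machine_device_ids={{{}}}, hierarchy=({}))'.format(
        device_type, mdi, h
    )

assert format_machine_device_ids({0: [0, 1], 1: 3}) == ["0:0", "0:1", "1:3"]
assert placement_repr("cuda", {0: [0, 1]}, (2,)) == (
    'oneflow.placement(device_type="cuda", machine_device_ids={0 : [0, 1]}, hierarchy=(2,))'
)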
@@ -90,6 +217,30 @@ ONEFLOW_API_PYBIND11_MODULE("", m) {
.def("Containing", &ParallelDesc::Bigger)
.def(py::self == py::self)
.def(py::hash(py::self));
py::class_<Symbol<ParallelDesc>, std::shared_ptr<Symbol<ParallelDesc>>>(m, "placement")
.def(py::init([](const std::string& device_type, const py::dict& machine_device_ids,
const std::shared_ptr<Shape>& hierarchy) {
return PlacementSymbolExportUtil::ApiCreatePlacementSymbol(
device_type, machine_device_ids, hierarchy);
}),
py::arg("device_type"), py::arg("machine_device_ids"), py::arg("hierarchy"))
.def(py::init([](const std::string& device_type, const py::dict& machine_device_ids,
const py::tuple& hierarchy) {
return PlacementSymbolExportUtil::ApiCreatePlacementSymbol(
device_type, machine_device_ids, hierarchy);
}),
py::arg("device_type"), py::arg("machine_device_ids"),
py::arg("hierarchy") = py::tuple())
.def_property_readonly("device_type",
[](Symbol<ParallelDesc> p) {
std::string device_type = p->device_tag() == "gpu" ? "cuda" : "cpu";
return device_type;
})
.def_property_readonly("hierarchy", [](Symbol<ParallelDesc> p) { return p->hierarchy(); })
.def("__str__", &PlacementSymbolExportUtil::PlacementSymbol2String)
.def("__repr__", &PlacementSymbolExportUtil::PlacementSymbol2String);
m.def("AllDevicePlacement", &PlacementSymbolExportUtil::AllDevicePlacement);
}
} // namespace oneflow
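A quick interactive check of the bindings registered above, assuming a OneFlow build with this commit and two GPUs on machine 0; the printed repr follows the PlacementSymbol2String format shown earlier.

import oneflow as flow

p = flow.placement("cuda", {0: [0, 1]}, (2,))
print(p.device_type)  # "cuda"
print(p.hierarchy)    # the (2,) hierarchy, as a Shape/Size object
print(p)              # oneflow.placement(device_type="cuda", machine_device_ids={0 : [0, 1]}, hierarchy=(2,))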
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <pybind11/pybind11.h>
#include "oneflow/api/python/of_api_registry.h"
#include "oneflow/core/common/util.h"
#include "oneflow/core/common/maybe.h"
#include "oneflow/core/common/symbol.h"
#include "oneflow/core/job/sbp_parallel.cfg.h"
namespace py = pybind11;
namespace oneflow {
static const int64_t kMaxSplitAxis = 6;
namespace {
std::string SbpParallelSymbolToString(const Symbol<cfg::SbpParallel>& sbp_sym) {
std::string sbp_str = "oneflow.sbp.";
if (sbp_sym->has_broadcast_parallel()) {
sbp_str += "broadcast";
} else if (sbp_sym->has_partial_sum_parallel()) {
sbp_str += "partial_sum";
} else if (sbp_sym->has_split_parallel()) {
sbp_str += "split(axis=" + std::to_string(sbp_sym->split_parallel().axis()) + ")";
} else {
UNIMPLEMENTED();
}
return sbp_str;
}
Maybe<std::vector<Symbol<cfg::SbpParallel>>> MakeSplitSbpParallelList(int max_split_axis) {
std::shared_ptr<std::vector<Symbol<cfg::SbpParallel>>> ret =
std::make_shared<std::vector<Symbol<cfg::SbpParallel>>>(max_split_axis);
for (int i = 0; i < max_split_axis; ++i) {
cfg::SbpParallel split_sbp_parallel;
split_sbp_parallel.mutable_split_parallel()->set_axis(i);
ret->at(i) = SymbolOf(split_sbp_parallel);
}
return ret;
}
Maybe<Symbol<cfg::SbpParallel>> MakeBroadcastSbpParallel() {
cfg::SbpParallel broadcast_sbp;
broadcast_sbp.mutable_broadcast_parallel();
return SymbolOf(broadcast_sbp);
}
Maybe<Symbol<cfg::SbpParallel>> MakePartialSumSbpParallel() {
cfg::SbpParallel partial_sum_sbp;
partial_sum_sbp.mutable_partial_sum_parallel();
return SymbolOf(partial_sum_sbp);
}
Maybe<Symbol<cfg::SbpParallel>> GetSplitSbpParallel(int axis) {
CHECK_LT_OR_RETURN(axis, kMaxSplitAxis);
static std::vector<Symbol<cfg::SbpParallel>> split_sbp_sym_list =
*JUST(MakeSplitSbpParallelList(kMaxSplitAxis));
return split_sbp_sym_list.at(axis);
}
Maybe<Symbol<cfg::SbpParallel>> GetBroadcastSbpParallel() {
static Symbol<cfg::SbpParallel> broadcast_sbp = JUST(MakeBroadcastSbpParallel());
return broadcast_sbp;
}
Maybe<Symbol<cfg::SbpParallel>> GetPartialSumSbpParallel() {
static Symbol<cfg::SbpParallel> partial_sum_sbp = JUST(MakePartialSumSbpParallel());
return partial_sum_sbp;
}
} // namespace
ONEFLOW_API_PYBIND11_MODULE("sbp", m) {
m.attr("max_split_axis") = kMaxSplitAxis;
py::class_<Symbol<cfg::SbpParallel>, std::shared_ptr<Symbol<cfg::SbpParallel>>>(m, "sbp")
.def("__str__", &SbpParallelSymbolToString)
.def("__repr__", &SbpParallelSymbolToString);
m.def(
"split", [](int axis) { return GetSplitSbpParallel(axis).GetOrThrow(); }, py::arg("axis"));
m.def("broadcast", []() { return GetBroadcastSbpParallel().GetOrThrow(); });
m.def("partial_sum", []() { return GetPartialSumSbpParallel().GetOrThrow(); });
}
} // namespace oneflow
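The sbp submodule registered above can be exercised directly through oneflow._oneflow_internal (the public oneflow.sbp wrappers are added in the Python files further down in this diff); a sketch, assuming this build:

import oneflow._oneflow_internal as internal

print(internal.sbp.max_split_axis)  # 6
print(internal.sbp.split(0))        # oneflow.sbp.split(axis=0)
print(internal.sbp.broadcast())     # oneflow.sbp.broadcast
print(internal.sbp.partial_sum())   # oneflow.sbp.partial_sum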
@@ -332,8 +332,8 @@ class ObjectMsgDefaultAllocator : public ObjectMsgAllocator {
ObjectMsgDefaultAllocator() = default;
static ObjectMsgDefaultAllocator* GlobalObjectMsgAllocator() {
-static ObjectMsgDefaultAllocator allocator;
-return &allocator;
+static ObjectMsgDefaultAllocator* allocator = new ObjectMsgDefaultAllocator();
+return allocator;
}
char* Allocate(std::size_t size) override { return allocator_.allocate(size); }
......
@@ -22,7 +22,7 @@ oneflow._oneflow_internal.CheckAndClearRegistryFlag()
Size = oneflow._oneflow_internal.Size
device = oneflow._oneflow_internal.device
-placement = oneflow._oneflow_internal.PlacementSymbol
+placement = oneflow._oneflow_internal.placement
no_grad = oneflow._oneflow_internal.autograd.no_grad
# define dtype at the beginning of oneflow init
......
@@ -18,7 +18,11 @@ from __future__ import absolute_import
from contextlib import contextmanager
import oneflow.python.framework.distribute_context as distribute_ctx
-from oneflow.python.oneflow_export import oneflow_export, oneflow_deprecate
+from oneflow.python.oneflow_export import (
+oneflow_export,
+oneflow_deprecate,
+oneflow_export_value,
+)
import oneflow._oneflow_internal
import traceback
@@ -213,3 +217,51 @@ def get_world_size():
@oneflow_export("distributed.is_multi_client")
def is_multi_client():
return oneflow._oneflow_internal.IsMultiClient()
@oneflow_export("sbp.split")
def split_sbp(
axis: int,
) -> oneflow._oneflow_internal.oneflow.core.job.sbp_parallel.SbpParallel:
r"""Generate a split scheme in which op will be splitted at `axis`.
Args:
axis (int): At `axis` the op will be splitted.
Returns:
SbpParallel: Split scheme object, often required by `to_consistent` method of `Tensor`
Example::
import numpy
import oneflow as flow
array = numpy.array([[1.0, 2.0], [3.0, 4.0]])
t1 = flow.tensor(array)
ct2 = t1.to_consistent(sbp=flow.sbp.split(0), placement=("cuda", {0: [0, 1, 2, 3]}))
"""
assert type(axis) is int
return oneflow._oneflow_internal.sbp.split(axis)
@oneflow_export_value("sbp.broadcast")
def broadcast_sbp() -> oneflow._oneflow_internal.oneflow.core.job.sbp_parallel.SbpParallel:
r"""Generate a broadcast scheme.
Returns:
SbpParallel: The broadcast scheme object, often required by the `to_consistent` method of `Tensor`.
Example::
import numpy
import oneflow as flow
array = numpy.array([[1.0, 2.0], [3.0, 4.0]])
t1 = flow.tensor(array)
ct2 = t1.to_consistent(sbp=flow.sbp.broadcast, placement=("cuda", {0: [0, 1, 2, 3]}))
"""
return oneflow._oneflow_internal.sbp.broadcast()
@oneflow_export_value("sbp.partial_sum")
def partial_sum_sbp() -> oneflow._oneflow_internal.oneflow.core.job.sbp_parallel.SbpParallel:
r"""Generate a partial_sum scheme.
Returns:
SbpParallel: The PartialSum scheme object, often required by the `to_consistent` method of `Tensor`.
Example::
import numpy
import oneflow as flow
array = numpy.array([[1.0, 2.0], [3.0, 4.0]])
t1 = flow.tensor(array)
ct2 = t1.to_consistent(sbp=flow.sbp.partial_sum, placement=("cuda", {0: [0, 1, 2, 3]}))
"""
return oneflow._oneflow_internal.sbp.partial_sum()
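Because broadcast_sbp and partial_sum_sbp are registered with oneflow_export_value (defined further down in this diff), the export step calls them once and publishes the result, so user code reads flow.sbp.broadcast and flow.sbp.partial_sum as values while flow.sbp.split stays a callable. A sketch, assuming this build:

import oneflow as flow

signatures = [flow.sbp.split(1), flow.sbp.broadcast, flow.sbp.partial_sum]
print([str(s) for s in signatures])
# ['oneflow.sbp.split(axis=1)', 'oneflow.sbp.broadcast', 'oneflow.sbp.partial_sum']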
@@ -33,6 +33,16 @@ import oneflow._oneflow_internal
import traceback
@oneflow_export("env.all_device_placement")
def api_all_device_placement(device_type: str) -> oneflow._oneflow_internal.placement:
r"""Return a placement that covers all devices of all machines in the current environment.
Args:
device_type (str): "cuda" or "cpu"
"""
return oneflow._oneflow_internal.AllDevicePlacement(device_type)
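Usage sketch for the new helper, assuming a single-machine environment with four visible GPUs; the exact machine_device_ids and hierarchy in the repr depend on the environment.

import oneflow as flow

p = flow.env.all_device_placement("cuda")
print(p.device_type)  # "cuda"
print(p)  # e.g. oneflow.placement(device_type="cuda", machine_device_ids={0 : [0, 1, 2, 3]}, hierarchy=(4,))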
@oneflow_export("enable_eager_execution")
def api_enable_eager_execution(val: bool = True) -> None:
r"""If True, job will execute in eager mode, else use lazy mode(static graph).
......
@@ -32,6 +32,22 @@ def oneflow_export(*api_names, **kwargs):
else:
new_api_names = ["experimental." + n for n in new_api_names] + new_api_names
func_or_class._ONEFLOW_API = new_api_names
func_or_class._IS_VALUE = False
return func_or_class
return Decorator
def oneflow_export_value(*api_names, **kwargs):
def Decorator(func_or_class):
new_api_names = list(api_names)
if hasattr(func_or_class, "_ONEFLOW_API_TAG"):
if func_or_class._ONEFLOW_API_TAG == "experimental_api":
new_api_names = ["experimental." + n for n in new_api_names]
else:
new_api_names = ["experimental." + n for n in new_api_names] + new_api_names
func_or_class._ONEFLOW_API = new_api_names
func_or_class._IS_VALUE = True
return func_or_class
return Decorator
@@ -75,7 +91,7 @@ def export_oneflow_api_internal_symbols(internal_name, api_name):
internal_names_2_api_names = {
"PlacementSymbol": "placement",
"placement": "placement",
"Size": "Size",
"device": "device",
"autograd.no_grad": "no_grad",
......
@@ -36,6 +36,28 @@ def dtype_related_symbols():
]
def customized_symbols():
return [
# Note that an imported name must not clash with an existing module; use `import ... as ...` if a module with the same name already exists
# oneflow.device
"""from oneflow._oneflow_internal import device""",
"""device.__module__ = \"oneflow\"""",
# oneflow.Size
"""from oneflow._oneflow_internal import Size""",
"""Size.__module__ = \"oneflow\"""",
# oneflow.sbp.sbp
"""from oneflow._oneflow_internal.sbp import sbp""",
"""sbp.__module__ = \"oneflow.sbp\"""",
"""del sbp""", # Note that del is used here carefully to avoid deleting the class that was originally exported under the oneflow namespace
# oneflow.Tensor
"""from oneflow.python.framework.tensor import Tensor""",
"""Tensor.__module__ = \"oneflow\"""",
# oneflow.placement
"""from oneflow._oneflow_internal import placement""",
"""placement.__module__ = \"oneflow\"""",
]
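The effect of the customized_symbols lines above is cosmetic but user-visible: the re-exported classes report oneflow (or oneflow.sbp) as their home module. A quick check, assuming this build:

import oneflow as flow

print(flow.Size.__module__)       # "oneflow"
print(flow.device.__module__)     # "oneflow"
print(flow.placement.__module__)  # "oneflow"
print(flow.Tensor.__module__)     # "oneflow"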
class VirtualModule(object):
def __init__(self):
self._func_or_class_dict = {}
@@ -83,6 +105,8 @@ class VirtualModule(object):
if "experimental/__init__.py" in init_file_path:
lines += dtype_related_symbols()
lines = list(mod_set) + lines
if "oneflow/__init__.py" in init_file_path:
lines = customized_symbols() + lines
f.write("\n" + "\n".join(lines) + "\n")
def submodule_names(self):
@@ -94,20 +118,24 @@ def include_submodule(modname):
def include_export(api_name_base, symbol):
+# print(symbol._IS_VALUE)
if symbol.__name__ == api_name_base:
return ["from {} import {}".format(symbol.__module__, api_name_base)]
output = ["from {} import {}".format(symbol.__module__, api_name_base)]
else:
if inspect.isclass(symbol):
-return [
+output = [
"from {} import {}".format(symbol.__module__, symbol.__name__),
"{} = {}".format(api_name_base, symbol.__name__),
]
else:
-return [
+output = [
"from {} import {} as {}".format(
symbol.__module__, symbol.__name__, api_name_base
)
]
+if symbol._IS_VALUE:
+output.append("{} = {}()".format(api_name_base, api_name_base))
+return output
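For a symbol tagged by oneflow_export_value, include_export emits one extra generated line that rebinds the exported name to the result of calling it once. Below is a minimal sketch of the generated code for flow.sbp.broadcast; emit() is a hypothetical stand-in for include_export and the module path is illustrative.

def emit(api_name_base, symbol_name, module, is_value):
    lines = ["from {} import {} as {}".format(module, symbol_name, api_name_base)]
    if is_value:
        # _IS_VALUE: call the exported function once and rebind the name, so
        # users see a value (flow.sbp.broadcast), not a callable.
        lines.append("{} = {}()".format(api_name_base, api_name_base))
    return lines

print(emit("broadcast", "broadcast_sbp", "oneflow.python.framework.distribute", True))
# ['from oneflow.python.framework.distribute import broadcast_sbp as broadcast', 'broadcast = broadcast()']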
def exported_symbols():
......