Unverified commit bffde6d1, authored by binbinHan, committed by GitHub

Compatible single client (#5417)


* mkdir compatible/single_client

* mv to compatible/single_client/python

* mv to ./python/compatible/

* use oneflow.compatible.single_client

* back up

* backup

* refactor oneflow.xxx to oneflow.compatible.single_client.xxx

* mv to single_client

* refactor import oneflow to oneflow.compatible.single_client

* backup

* refine

* refine

* make single_client ci test copy from correct path

* refine

* refine

* delete ioconf and split single-client ci test

* auto format by CI

* delete default session

* fix method of erasing sess id

* delete string multi client in test.yml

* fix spelling error

* delete space

* refine

* mv single client python files to compatible_single_client_python

* refine

* fix ci error

* fix bug

* fix bug

* refine

* refine

* delete skip-if in test/(modules/tensor)/* because of the default eager env

* fix bug

* synchronize oneflow/compatible_single_client and oneflow/python

* fix distributed_run bug

* fix test_multi_process deadlock

* fix test_multi_process deadlock

* synchronize modifications between oneflow/python and oneflow/compatible_single_client

* refine

* minor fix

* fix bug

* fix doctest.sh

* rm files

* remove files and add checks

* delete modules

* Revert "delete moduels"

This reverts commit f95ab369f556b71fd0663b67c6dd51c9b6d5ee81.

Co-authored-by: Xinqi Li <lixinqi0703106@163.com>
Co-authored-by: liufengwei <2472937968@qq.com>
Co-authored-by: oneflow-ci-bot <ci-bot@oneflow.org>
Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
Co-authored-by: tsai <jackalcooper@gmail.com>
parent 5119226d
Showing changed files with 865 additions and 28 deletions
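The substance of this change is a namespace move: legacy single-client (lazy mode) user code now lives under oneflow.compatible.single_client instead of owning the top-level oneflow package. A minimal migration sketch (illustrative, not taken from this diff; the config call shown is the pre-existing single-client API, assumed unchanged by the move):

# before the move, single-client code imported the top-level package:
#   import oneflow as flow
# after the move, it opts into the compatibility namespace:
import oneflow.compatible.single_client as flow

flow.config.gpu_device_num(1)  # legacy lazy-mode config API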
@@ -263,7 +263,7 @@ jobs:
cuda_version: ${{ matrix.cuda_version }}
extra_docker_args: $extra_docker_args
python_version: ${{ matrix.python_version }}
- - name: Custom Op test (run by oneflow build docker)
+ - name: Single client custom Op test (run by oneflow build docker)
timeout-minutes: 45
if: matrix.test_suite == 'cpu' && env.is_built != '1'
run: |
@@ -406,9 +406,6 @@ jobs:
extra_docker_args+=" --env ONEFLOW_TEST_CPU_ONLY=1"
extra_docker_args+=" --env CUDA_VISIBLE_DEVICES=-1"
fi
if [ "$test_suite" == "cuda_new_interface" ]; then
extra_docker_args+=" --env ONEFLOW_TEST_ENABLE_EAGER=1"
fi
# set container_name
container_name=pr-${{ github.event.pull_request.number }}-run-id-${{ github.run_id }}-${test_suite}-test
extra_docker_args+=" --name ${container_name}"
@@ -443,21 +440,21 @@ jobs:
docker run ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
${image_name} \
bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/build_docs.sh"
- - name: Op test (distributed, 1st try)
+ - name: Single client op test (distributed, 1st try)
timeout-minutes: 45
if: matrix.test_suite == 'cuda'
continue-on-error: true
id: distributed_try_1
run: |
python3 ci/test/distributed_run.py --bash_script=ci/test/2node_op_test.sh --custom_img_tag=${{ env.image_name }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.6
- - name: Op test (distributed, 2nd try)
+ - name: Single client op test (distributed, 2nd try)
timeout-minutes: 45
if: matrix.test_suite == 'cuda' && steps.distributed_try_1.outcome=='failure'
continue-on-error: true
id: distributed_try_2
run: |
python3 ci/test/distributed_run.py --bash_script=ci/test/2node_op_test.sh --custom_img_tag=${{ env.image_name }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.6
- - name: Op test (distributed, 3rd try)
+ - name: Single client op test (distributed, 3rd try)
timeout-minutes: 45
if: matrix.test_suite == 'cuda' && steps.distributed_try_2.outcome=='failure'
continue-on-error: false
@@ -489,7 +486,7 @@ jobs:
${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
${image_name} \
bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/doctest.sh"
- - name: Dry run test (run without runtime)
+ - name: Single client dry run test (run without runtime)
timeout-minutes: 45
if: matrix.test_suite == 'cuda'
run: |
@@ -553,7 +550,7 @@ jobs:
-e ONEFLOW_TEST_DIR=$PWD/oneflow/python/test/graph \
${{ env.image_tag }} \
bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/generic_test.sh"
- - name: Op test
+ - name: Single client op test
timeout-minutes: 45
if: matrix.test_suite == 'cpu' || matrix.test_suite == 'cuda_op'
run: |
@@ -562,7 +559,7 @@ jobs:
${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
${image_name} \
bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/1node_op_test.sh"
- - name: Model test
+ - name: Single client model test
timeout-minutes: 45
if: matrix.test_suite == 'cpu' || matrix.test_suite == 'cuda'
run: |
@@ -571,7 +568,7 @@ jobs:
${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
${image_name} \
bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/1node_model_test.sh"
- - name: Model serve test
+ - name: Single client model serve test
timeout-minutes: 45
id: model_serve_test
if: matrix.test_suite == 'cuda'
@@ -587,7 +584,7 @@ jobs:
set -x
docker run ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
${image_name} bash ci/test/print_stack_from_core.sh python3 serving-tmp
- - name: Benchmark (mainly for backward compatibility)
+ - name: Single client benchmark (mainly for backward compatibility)
timeout-minutes: 45
if: matrix.test_suite == 'cuda'
run: |
@@ -595,7 +592,7 @@ jobs:
docker run ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
${image_name} \
bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/1node_benchmark_test.sh"
- - name: Benchmark FP16 (mainly for backward compatibility)
+ - name: Single client benchmark FP16 (mainly for backward compatibility)
timeout-minutes: 45
if: matrix.test_suite == 'cuda'
run: |
@@ -603,7 +600,7 @@ jobs:
docker run ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
${image_name} \
bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/1node_benchmark_test_fp16.sh"
- - name: XLA Test
+ - name: Single client XLA Test
timeout-minutes: 45
if: contains(fromJson('["xla", "xla_cpu"]'), matrix.test_suite) && env.is_built != '1'
run: |
......
set -xe
rm -rf /benchmarks
- cp -r oneflow/python/benchmarks /benchmarks
+ cp -r oneflow/compatible_single_client_python/benchmarks /benchmarks
cd /benchmarks
python3 cnn_benchmark/of_cnn_benchmarks.py \
......
set -ex
rm -rf /benchmarks
- cp -r oneflow/python/benchmarks /benchmarks
+ cp -r oneflow/compatible_single_client_python/benchmarks /benchmarks
cd /benchmarks
python3 cnn_benchmark/of_cnn_benchmarks.py \
......
@@ -7,7 +7,7 @@ test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"/test_tmp_dir"}
rm -rf $test_tmp_dir
mkdir -p $test_tmp_dir
- cp -r $src_dir/oneflow/python/test/custom_ops $test_tmp_dir
+ cp -r $src_dir/oneflow/compatible_single_client_python/test/custom_ops $test_tmp_dir
cd $test_tmp_dir
export ONEFLOW_TEST_DEVICE_NUM=1
......
@@ -6,7 +6,7 @@ test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"/test_tmp_dir"}
rm -rf $test_tmp_dir
mkdir -p $test_tmp_dir
- cp -r $src_dir/oneflow/python/test $test_tmp_dir
+ cp -r $src_dir/oneflow/compatible_single_client_python/test $test_tmp_dir
cd $test_tmp_dir
export ONEFLOW_TEST_DEVICE_NUM=1
......
#!/bin/bash
set -xe
- cp -r oneflow/python/test /test_dir
+ cp -r oneflow/compatible_single_client_python/test /test_dir
cd /test_dir
python3 models/1node_test.py
@@ -10,7 +10,7 @@ test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"./test_tmp_dir"}
rm -rf $test_tmp_dir
mkdir -p $test_tmp_dir
- cp -r $src_dir/oneflow/python/test $test_tmp_dir
+ cp -r $src_dir/oneflow/compatible_single_client_python/test $test_tmp_dir
cd $test_tmp_dir
python3 -m oneflow --doctor
......
@@ -10,7 +10,7 @@ test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"/test_tmp_dir"}
rm -rf $test_tmp_dir
mkdir -p $test_tmp_dir
chmod -R o+w $test_tmp_dir
- cp -r $src_dir/oneflow/python/test $test_tmp_dir
+ cp -r $src_dir/oneflow/compatible_single_client_python/test $test_tmp_dir
cd $test_tmp_dir
ONEFLOW_TEST_DEVICE_NUM=1 python3 test/ops/test_assign.py --failfast --verbose
......
@@ -161,7 +161,7 @@ def wait_for_env_proto_and_launch_workers(
shell=True,
)
run_docker_cmd = f"ssh {remote_host} docker exec {container_name}"
run_docker_cmd += f" python3 -m oneflow --start_worker --env_proto={workspace_dir}/env.prototxt"
run_docker_cmd += f" python3 -m oneflow.compatible.single_client --start_worker --env_proto={workspace_dir}/env.prototxt"
print("[docker agent]", run_docker_cmd)
remote_docker_proc[remote_host] = subprocess.Popen(
run_docker_cmd, shell=True
......
@@ -11,7 +11,7 @@ python3 -c 'import oneflow; f=open("oneflow_path.txt", "w"); f.write(oneflow.__p
gpu_num=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l)
python3 $src_dir/ci/test/parallel_run.py \
--gpu_num=${gpu_num} \
- --dir=$(cat oneflow_path.txt) \
+ --dir=$(cat oneflow_path.txt)/python \
--timeout=1 \
--verbose \
--chunk=1 \
......
@@ -9,7 +9,7 @@ test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"./test_tmp_dir"}
rm -rf $test_tmp_dir
mkdir -p $test_tmp_dir
- cp -r $src_dir/oneflow/python/benchmarks $test_tmp_dir
+ cp -r $src_dir/oneflow/compatible_single_client_python/benchmarks $test_tmp_dir
cd $test_tmp_dir/benchmarks
export ONEFLOW_DRY_RUN=1
......
@@ -5,7 +5,7 @@ test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"/test_tmp_dir"}
rm -rf $test_tmp_dir
mkdir -p $test_tmp_dir
- cp -r $src_dir/oneflow/python/test/xrt $test_tmp_dir
+ cp -r $src_dir/oneflow/compatible_single_client_python/test/xrt $test_tmp_dir
cd $test_tmp_dir
python3 -c "import oneflow; assert oneflow.sysconfig.with_xla()"
for f in $src_dir/oneflow/python/test/xrt/*.py; do python3 "$f"; done
python3 -c "import oneflow.compatible.single_client as flow; assert flow.sysconfig.with_xla()"
for f in $src_dir/oneflow/compatible_single_client_python/test/xrt/*.py; do python3 "$f"; done
@@ -296,20 +296,36 @@ add_custom_target(of_pyscript_copy ALL
COMMAND "${CMAKE_COMMAND}" -E copy
"${PROJECT_SOURCE_DIR}/oneflow/__main__.py" "${of_pyscript_dir}/oneflow/__main__.py"
COMMAND rm -rf ${of_pyscript_dir}/oneflow/python
COMMAND rm -rf ${of_pyscript_dir}/oneflow/compatible
COMMAND ${CMAKE_COMMAND} -E create_symlink "${PROJECT_SOURCE_DIR}/oneflow/python" "${of_pyscript_dir}/oneflow/python"
COMMAND ${CMAKE_COMMAND} -E copy_directory "${PROJECT_SOURCE_DIR}/oneflow/compatible_single_client_python" "${of_pyscript_dir}/oneflow/compatible/single_client/python"
COMMAND "${CMAKE_COMMAND}" -E copy
"${PROJECT_SOURCE_DIR}/oneflow/single_client_init.py" "${of_pyscript_dir}/oneflow/compatible/single_client/__init__.py"
COMMAND ${Python_EXECUTABLE} "${PROJECT_SOURCE_DIR}/tools/conver_single_client_name_space.py" "${of_pyscript_dir}/oneflow/compatible"
COMMAND ${CMAKE_COMMAND} -E touch "${of_pyscript_dir}/oneflow/compatible/__init__.py"
COMMAND "${CMAKE_COMMAND}" -E copy
"${PROJECT_SOURCE_DIR}/oneflow/single_client_main.py" "${of_pyscript_dir}/oneflow/compatible/single_client/__main__.py"
COMMAND ${CMAKE_COMMAND} -E make_directory "${of_pyscript_dir}/oneflow/distributed"
COMMAND ${CMAKE_COMMAND} -E create_symlink "${PROJECT_SOURCE_DIR}/oneflow/python/distributed/launch.py" "${of_pyscript_dir}/oneflow/distributed/launch.py"
COMMAND ${CMAKE_COMMAND} -E copy_directory "${of_proto_python_dir}/oneflow/core" "${of_pyscript_dir}/oneflow/core"
COMMAND ${CMAKE_COMMAND} -E touch "${of_pyscript_dir}/oneflow/core/__init__.py"
COMMAND ${CMAKE_COMMAND} -E make_directory "${of_pyscript_dir}/oneflow/F"
COMMAND ${CMAKE_COMMAND} -E touch "${of_pyscript_dir}/oneflow/F/__init__.py"
COMMAND ${CMAKE_COMMAND} -E make_directory "${of_pyscript_dir}/oneflow/compatible/single_client/F"
COMMAND ${CMAKE_COMMAND} -E touch "${of_pyscript_dir}/oneflow/compatible/single_client/F/__init__.py"
COMMAND ${CMAKE_COMMAND} -E make_directory "${of_pyscript_dir}/oneflow/experimental/F"
COMMAND ${CMAKE_COMMAND} -E touch "${of_pyscript_dir}/oneflow/experimental/F/__init__.py"
COMMAND ${CMAKE_COMMAND} -E make_directory "${of_pyscript_dir}/oneflow/compatible/single_client/experimental/F"
COMMAND ${CMAKE_COMMAND} -E touch "${of_pyscript_dir}/oneflow/compatible/single_client/experimental/F/__init__.py"
COMMAND ${CMAKE_COMMAND} -E make_directory "${of_pyscript_dir}/oneflow/python_gen"
COMMAND ${CMAKE_COMMAND} -E touch "${of_pyscript_dir}/oneflow/python_gen/__init__.py"
COMMAND ${CMAKE_COMMAND} -E make_directory "${of_pyscript_dir}/oneflow/compatible/single_client/python_gen"
COMMAND ${CMAKE_COMMAND} -E touch "${of_pyscript_dir}/oneflow/compatible/single_client/python_gen/__init__.py"
COMMAND ${Python_EXECUTABLE} ${PROJECT_SOURCE_DIR}/tools/generate_pip_version.py ${gen_pip_args} --src=${PROJECT_SOURCE_DIR}
COMMAND ${Python_EXECUTABLE} "${PROJECT_SOURCE_DIR}/tools/generate_oneflow_symbols_export_file.py"
"${PROJECT_SOURCE_DIR}" "${of_pyscript_dir}/oneflow/python_gen/__export_symbols__.py")
"${PROJECT_SOURCE_DIR}/oneflow/python" "${of_pyscript_dir}/oneflow/python_gen/__export_symbols__.py" "python"
COMMAND ${Python_EXECUTABLE} "${PROJECT_SOURCE_DIR}/tools/generate_oneflow_symbols_export_file.py"
"${of_pyscript_dir}/oneflow/compatible" "${of_pyscript_dir}/oneflow/compatible/single_client/python_gen/__export_symbols__.py" "compatible")
# source this file to add oneflow in PYTHONPATH
file(WRITE "${PROJECT_BINARY_DIR}/source.sh" "export PYTHONPATH=${of_pyscript_dir}:$PYTHONPATH")
......
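After of_pyscript_copy runs, both namespaces should be importable from the generated package directory. A smoke check under that assumption (Python, with PYTHONPATH set, e.g. via the generated source.sh):

import oneflow                                    # multi-client package
import oneflow.compatible.single_client as flow   # converted single-client copy

# the converted namespace should expose the familiar single-client surface
assert hasattr(flow, "config") and hasattr(flow, "sysconfig")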
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import absolute_import
from oneflow.compatible import single_client as flow
from oneflow.core.operator import op_conf_pb2 as op_conf_util
from oneflow.core.register import logical_blob_id_pb2 as logical_blob_id_util
from oneflow.compatible_single_client_python.framework import (
interpret_util as interpret_util,
)
from oneflow.compatible_single_client_python.framework import id_util as id_util
from oneflow.compatible_single_client_python.framework import (
remote_blob as remote_blob_util,
)
from oneflow.compatible_single_client_python.framework import hob as hob
from oneflow.compatible_single_client_python.lib.core import enable_if as enable_if
from oneflow.compatible_single_client_python.oneflow_export import oneflow_export
from typing import Union, Tuple, List, Optional, Sequence, Callable
import oneflow._oneflow_internal
@oneflow_export("advanced.distribute_clone")
def api_distribute_clone(
x: oneflow._oneflow_internal.BlobDesc, name: Optional[str] = None
) -> Tuple[oneflow._oneflow_internal.BlobDesc]:
func = enable_if.unique([distribute_clone])
return func(x, name=name)
@enable_if.condition(hob.in_global_mode)
def distribute_clone(x, name=None):
if name is None:
name = id_util.UniqueStr("DistributeClone_")
op_conf = op_conf_util.OperatorConf()
op_conf.name = name
setattr(op_conf.distribute_clone_conf, "in", x.unique_name)
parallel_size = flow.current_scope().device_parallel_desc_symbol.parallel_num
op_conf.distribute_clone_conf.out.extend(
["out_%d" % i for i in range(parallel_size)]
)
interpret_util.ConsistentForward(op_conf)
ret = []
for i in range(parallel_size):
out = "out_%d" % i
lbi = logical_blob_id_util.LogicalBlobId()
lbi.op_name = op_conf.name
lbi.blob_name = out
ret.append(remote_blob_util.RemoteBlob(lbi))
return tuple(ret)
@oneflow_export("advanced.distribute_add")
def api_distribute_add(
xs: Sequence[oneflow._oneflow_internal.BlobDesc], name: Optional[str] = None
) -> oneflow._oneflow_internal.BlobDesc:
func = enable_if.unique([distribute_add])
return func(xs, name=name)
@enable_if.condition(hob.in_global_mode)
def distribute_add(xs, name=None):
assert flow.current_scope().device_parallel_desc_symbol.parallel_num == len(xs)
if name is None:
name = id_util.UniqueStr("DistributeAdd_")
op_conf = op_conf_util.OperatorConf()
op_conf.name = name
getattr(op_conf.distribute_add_conf, "in").extend(
[_SoleConsistentLbn(x) for x in xs]
)
op_conf.distribute_add_conf.out = "out"
interpret_util.ConsistentForward(op_conf)
lbi = logical_blob_id_util.LogicalBlobId()
lbi.op_name = op_conf.name
lbi.blob_name = "out"
return remote_blob_util.RemoteBlob(lbi)
@oneflow_export("advanced.distribute_split")
def api_distribute_split(
x: oneflow._oneflow_internal.BlobDesc, axis: int = 0, name: Optional[str] = None
) -> Tuple[oneflow._oneflow_internal.BlobDesc]:
func = enable_if.unique([distribute_split])
return func(x, axis=axis, name=name)
@enable_if.condition(hob.in_global_mode)
def distribute_split(x, axis=0, name=None):
if name is None:
name = id_util.UniqueStr("DistributeSplit_")
op_conf = op_conf_util.OperatorConf()
op_conf.name = name
setattr(op_conf.distribute_split_conf, "in", x.unique_name)
op_conf.distribute_split_conf.axis = axis
parallel_size = flow.current_scope().device_parallel_desc_symbol.parallel_num
op_conf.distribute_split_conf.out.extend(
["out_%d" % i for i in range(parallel_size)]
)
interpret_util.ConsistentForward(op_conf)
ret = []
for i in range(parallel_size):
out = "out_%d" % i
lbi = logical_blob_id_util.LogicalBlobId()
lbi.op_name = op_conf.name
lbi.blob_name = out
ret.append(remote_blob_util.RemoteBlob(lbi))
return tuple(ret)
@oneflow_export("advanced.distribute_concat")
def api_distribute_concat(
xs: Sequence[oneflow._oneflow_internal.BlobDesc],
axis: int = 0,
name: Optional[str] = None,
) -> oneflow._oneflow_internal.BlobDesc:
func = enable_if.unique([distribute_concat])
return func(xs, axis=axis, name=name)
@enable_if.condition(hob.in_global_mode)
def distribute_concat(xs, axis=0, name=None):
assert flow.current_scope().device_parallel_desc_symbol.parallel_num == len(xs)
if name is None:
name = id_util.UniqueStr("DistributeConcat_")
op_conf = op_conf_util.OperatorConf()
op_conf.name = name
getattr(op_conf.distribute_concat_conf, "in").extend(
[_SoleConsistentLbn(x) for x in xs]
)
op_conf.distribute_concat_conf.axis = axis
op_conf.distribute_concat_conf.out = "out"
interpret_util.ConsistentForward(op_conf)
lbi = logical_blob_id_util.LogicalBlobId()
lbi.op_name = op_conf.name
lbi.blob_name = "out"
return remote_blob_util.RemoteBlob(lbi)
@oneflow_export("advanced.distribute_map")
def api_distribute_map(
xs: Union[
Sequence[oneflow._oneflow_internal.BlobDesc], oneflow._oneflow_internal.BlobDesc
],
f: Callable[
[oneflow._oneflow_internal.BlobDesc, oneflow._oneflow_internal.BlobDesc],
oneflow._oneflow_internal.BlobDesc,
],
axis: int = 0,
) -> Tuple[oneflow._oneflow_internal.BlobDesc]:
func = enable_if.unique([distribute_map])
return func(xs, f, axis=axis)
@enable_if.condition(hob.in_global_mode)
def distribute_map(xs, f, axis=0):
_AssertInputOrOutput(xs)
if not isinstance(xs, (list, tuple)):
xs = [xs]
splitted_xs = [flow.advanced.distribute_split(x, axis=axis) for x in xs]
results = [_UnderSingleDevicePlacementScope(f, *x) for x in zip(*splitted_xs)]
output_is_not_container = all(
[isinstance(x, oneflow._oneflow_internal.ConsistentBlob) for x in results]
)
results = [_TryWrapTuple(x) for x in results]
result = [flow.advanced.distribute_concat(x, axis=axis) for x in zip(*results)]
if output_is_not_container:
return result[0]
return tuple(result)
@oneflow_export("cast_to_current_logical_view")
def cast_to_current_logical_view(
x: oneflow._oneflow_internal.BlobDesc,
) -> oneflow._oneflow_internal.BlobDesc:
if (
isinstance(x, oneflow._oneflow_internal.ConsistentBlob)
and flow.scope.mirrored_view_enabled()
) or (
isinstance(x, oneflow._oneflow_internal.MirroredBlob)
and flow.scope.consistent_view_enabled()
):
x = flow.identity(x)
return x
def _SoleConsistentLbn(blob):
assert blob.parallel_size == 1
if isinstance(blob, oneflow._oneflow_internal.ConsistentBlob):
return blob.unique_name
if isinstance(blob, oneflow._oneflow_internal.MirroredBlob):
return blob.sub_consistent_blob_list[0].unique_name
raise NotImplementedError
def _AssertInputOrOutput(xs):
assert isinstance(xs, (list, tuple, oneflow._oneflow_internal.ConsistentBlob))
if isinstance(xs, (list, tuple)):
assert len(xs) > 0
assert all(
[isinstance(x, oneflow._oneflow_internal.ConsistentBlob) for x in xs]
)
def _TryWrapTuple(ys):
_AssertInputOrOutput(ys)
if not isinstance(ys, (list, tuple)):
ys = (ys,)
return ys
def _UnderSingleDevicePlacementScope(f, *args):
parallel_desc_symbol = flow.current_scope().device_parallel_desc_symbol
for machine_id, device_id in _EachMachineIdAndDeviceId(parallel_desc_symbol):
mch_dev_str = "@%d:%d" % (machine_id, device_id)
with flow.scope.placement(parallel_desc_symbol.device_tag, mch_dev_str):
return f(*args)
def _EachMachineIdAndDeviceId(parallel_desc_symbol):
for (
machine_id,
device_id_list,
) in parallel_desc_symbol.machine_id2device_id_list.items():
for device_id in device_id_list:
yield machine_id, device_id
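For orientation, a rough usage sketch of the helpers exported above (hypothetical: assumes a global-mode job in which x is a consistent blob placed across several devices):

from oneflow.compatible import single_client as flow

pieces = flow.advanced.distribute_split(x, axis=0)      # one blob per device
processed = tuple(p for p in pieces)                    # stand-in for per-device work f
y = flow.advanced.distribute_concat(processed, axis=0)  # gather results back on axis 0
# distribute_map automates this split / per-device f / concat round trip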
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from typing import Union, Sequence, Tuple
from oneflow.compatible_single_client_python.framework.tensor import Tensor
from oneflow.compatible_single_client_python.oneflow_export import oneflow_export
from oneflow.compatible_single_client_python.framework.tensor_tuple_util import (
convert_to_tensor_tuple,
)
from oneflow._oneflow_internal import TensorTuple
from oneflow._oneflow_internal.autograd import grad as grad_api
from oneflow._oneflow_internal.autograd import backward as backward_api
@oneflow_export("autograd.grad")
def grad(
outputs: Union[Tensor, Sequence[Tensor]],
inputs: Union[Tensor, Sequence[Tensor]],
out_grads: Union[Tensor, Sequence[Tensor], None] = None,
retain_graph: bool = False,
create_graph: bool = False,
) -> Tuple[Tensor]:
in_grads = grad_api(
convert_to_tensor_tuple(outputs),
convert_to_tensor_tuple(inputs),
convert_to_tensor_tuple(out_grads),
retain_graph,
create_graph,
)
return tuple([Tensor(x) for x in in_grads])
@oneflow_export("autograd.backward")
def backward(
outputs: Union[Tensor, Sequence[Tensor]],
out_grads: Union[Tensor, Sequence[Tensor], None],
retain_graph: bool = False,
create_graph: bool = False,
) -> None:
backward_api(
convert_to_tensor_tuple(outputs),
convert_to_tensor_tuple(out_grads),
retain_graph,
create_graph,
)
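A usage sketch for the two exports above (illustrative; loss and w are hypothetical single-client Tensors from a forward pass, with requires_grad enabled on w):

from oneflow.compatible import single_client as flow

# functional form: returns the gradients as a tuple, one per input
(w_grad,) = flow.autograd.grad(outputs=loss, inputs=(w,), out_grads=None)

# imperative form: runs backward from loss (out_grads passed explicitly)
flow.autograd.backward(loss, None)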
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
import numpy as np
class StopWatch:
def __init__(self):
pass
def start(self):
self.start_time = time.time()
self.last_split = self.start_time
def set_start(self, val):
self.start_time = val
self.last_split = self.start_time
def split(self):
now = time.time()
duration = now - self.last_split
self.last_split = now
return duration
def stop(self):
self.stop_time = time.time()
def duration(self):
return self.stop_time - self.start_time
class BERTSpeedometer:
def __init__(self):
self.watch = StopWatch()
self.throughput_list = []
def speedometer_cb(
self,
step,
start_time,
total_batch_size,
skip_iter_num,
iter_num,
loss_print_every_n_iter,
):
def callback(train_loss):
assert skip_iter_num >= 0
if skip_iter_num == 0 and step == 0:
self.watch.set_start(start_time)
print("Start trainning without any skipping iteration.")
if step < skip_iter_num:
if step == 0:
print(
"Skipping {} iterations for benchmark purpose.".format(
skip_iter_num
)
)
if (step + 1) == skip_iter_num:
self.watch.start()
print("Start trainning.")
else:
train_step = step - skip_iter_num
if (train_step + 1) % loss_print_every_n_iter == 0:
total_loss = train_loss[0].mean()
mlm_loss = train_loss[1].mean()
nsp_loss = train_loss[2].mean()
avg_elapse_time_per_iter = (
self.watch.split() / loss_print_every_n_iter
)
sentences_per_sec = total_batch_size / avg_elapse_time_per_iter
print(
"iter {}, total_loss: {:.3f}, mlm_loss: {:.3f}, nsp_loss: {:.3f}, speed: {:.3f}(sec/batch), {:.3f}(sentences/sec)".format(
train_step,
total_loss,
mlm_loss,
nsp_loss,
avg_elapse_time_per_iter,
sentences_per_sec,
)
)
self.throughput_list.append(sentences_per_sec)
if (train_step + 1) == iter_num:
self.watch.stop()
total_duration = self.watch.duration()
avg_sentences_per_sec = (
total_batch_size * iter_num / total_duration
)
print("-".ljust(66, "-"))
print(
"average speed: {:.3f}(sentences/sec), new_cal_method: {:.3f}(sentences/sec)".format(
avg_sentences_per_sec, np.mean(self.throughput_list)
)
)
print("-".ljust(66, "-"))
return callback
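StopWatch above is self-contained; a quick usage sketch:

sw = StopWatch()
sw.start()
# ... run a few training iterations ...
print("lap: {:.3f}s".format(sw.split()))   # time since start (or last split)
sw.stop()
print("total: {:.3f}s".format(sw.duration()))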
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import math
from oneflow.compatible import single_client as flow
from oneflow.core.common import data_type_pb2 as data_type_util
from oneflow.core.operator import op_conf_pb2 as op_conf_util
class BertBackbone(object):
def __init__(
self,
input_ids_blob,
input_mask_blob,
token_type_ids_blob,
vocab_size,
seq_length=512,
hidden_size=768,
num_hidden_layers=12,
num_attention_heads=12,
intermediate_size=3072,
hidden_act="gelu",
hidden_dropout_prob=0.1,
attention_probs_dropout_prob=0.1,
max_position_embeddings=512,
type_vocab_size=16,
initializer_range=0.02,
):
with flow.scope.namespace("bert"):
with flow.scope.namespace("embeddings"):
(self.embedding_output_, self.embedding_table_) = _EmbeddingLookup(
input_ids_blob=input_ids_blob,
vocab_size=vocab_size,
embedding_size=hidden_size,
initializer_range=initializer_range,
word_embedding_name="word_embeddings",
)
self.embedding_output_ = _EmbeddingPostprocessor(
input_blob=self.embedding_output_,
seq_length=seq_length,
embedding_size=hidden_size,
use_token_type=True,
token_type_ids_blob=token_type_ids_blob,
token_type_vocab_size=type_vocab_size,
token_type_embedding_name="token_type_embeddings",
use_position_embeddings=True,
position_embedding_name="position_embeddings",
initializer_range=initializer_range,
max_position_embeddings=max_position_embeddings,
dropout_prob=hidden_dropout_prob,
)
with flow.scope.namespace("encoder"):
attention_mask_blob = _CreateAttentionMaskFromInputMask(
input_mask_blob,
from_seq_length=seq_length,
to_seq_length=seq_length,
)
self.all_encoder_layers_ = _TransformerModel(
input_blob=self.embedding_output_,
attention_mask_blob=attention_mask_blob,
seq_length=seq_length,
hidden_size=hidden_size,
num_hidden_layers=num_hidden_layers,
num_attention_heads=num_attention_heads,
intermediate_size=intermediate_size,
intermediate_act_fn=GetActivation(hidden_act),
hidden_dropout_prob=hidden_dropout_prob,
attention_probs_dropout_prob=attention_probs_dropout_prob,
initializer_range=initializer_range,
do_return_all_layers=False,
)
self.sequence_output_ = self.all_encoder_layers_[-1]
def embedding_output(self):
return self.embedding_output_
def all_encoder_layers(self):
return self.all_encoder_layers_
def sequence_output(self):
return self.sequence_output_
def embedding_table(self):
return self.embedding_table_
def CreateInitializer(std):
return flow.truncated_normal(std)
def _Gelu(in_blob):
return flow.math.gelu(in_blob)
def _TransformerModel(
input_blob,
attention_mask_blob,
seq_length,
hidden_size=768,
num_hidden_layers=12,
num_attention_heads=12,
intermediate_size=3072,
intermediate_act_fn=_Gelu,
hidden_dropout_prob=0.1,
attention_probs_dropout_prob=0.1,
initializer_range=0.02,
do_return_all_layers=False,
):
assert hidden_size % num_attention_heads == 0
attention_head_size = int(hidden_size / num_attention_heads)
input_width = hidden_size
prev_output_blob = flow.reshape(input_blob, (-1, input_width))
all_layer_output_blobs = []
for layer_idx in range(num_hidden_layers):
with flow.scope.namespace("layer_%d" % layer_idx):
layer_input_blob = prev_output_blob
with flow.scope.namespace("attention"):
with flow.scope.namespace("self"):
attention_output_blob = _AttentionLayer(
from_blob=layer_input_blob,
to_blob=layer_input_blob,
attention_mask_blob=attention_mask_blob,
num_attention_heads=num_attention_heads,
size_per_head=attention_head_size,
attention_probs_dropout_prob=attention_probs_dropout_prob,
initializer_range=initializer_range,
do_return_2d_tensor=True,
from_seq_length=seq_length,
to_seq_length=seq_length,
)
with flow.scope.namespace("output"):
attention_output_blob = _FullyConnected(
attention_output_blob,
input_size=num_attention_heads * attention_head_size,
units=hidden_size,
weight_initializer=CreateInitializer(initializer_range),
name="dense",
)
attention_output_blob = _Dropout(
attention_output_blob, hidden_dropout_prob
)
attention_output_blob = attention_output_blob + layer_input_blob
attention_output_blob = _LayerNorm(
attention_output_blob, hidden_size
)
with flow.scope.namespace("intermediate"):
if callable(intermediate_act_fn):
act_fn = op_conf_util.kNone
else:
act_fn = intermediate_act_fn
intermediate_output_blob = _FullyConnected(
attention_output_blob,
input_size=num_attention_heads * attention_head_size,
units=intermediate_size,
activation=act_fn,
weight_initializer=CreateInitializer(initializer_range),
name="dense",
)
if callable(intermediate_act_fn):
intermediate_output_blob = intermediate_act_fn(
intermediate_output_blob
)
with flow.scope.namespace("output"):
layer_output_blob = _FullyConnected(
intermediate_output_blob,
input_size=intermediate_size,
units=hidden_size,
weight_initializer=CreateInitializer(initializer_range),
name="dense",
)
layer_output_blob = _Dropout(layer_output_blob, hidden_dropout_prob)
layer_output_blob = layer_output_blob + attention_output_blob
layer_output_blob = _LayerNorm(layer_output_blob, hidden_size)
prev_output_blob = layer_output_blob
all_layer_output_blobs.append(layer_output_blob)
input_shape = (-1, seq_length, hidden_size)
if do_return_all_layers:
final_output_blobs = []
for layer_output_blob in all_layer_output_blobs:
final_output_blob = flow.reshape(layer_output_blob, input_shape)
final_output_blobs.append(final_output_blob)
return final_output_blobs
else:
final_output_blob = flow.reshape(prev_output_blob, input_shape)
return [final_output_blob]
def _AttentionLayer(
from_blob,
to_blob,
attention_mask_blob,
num_attention_heads=1,
size_per_head=512,
query_act=op_conf_util.kNone,
key_act=op_conf_util.kNone,
value_act=op_conf_util.kNone,
attention_probs_dropout_prob=0.0,
initializer_range=0.02,
do_return_2d_tensor=False,
batch_size=None,
from_seq_length=None,
to_seq_length=None,
):
def TransposeForScores(input_blob, num_attention_heads, seq_length, width):
output_blob = flow.reshape(
input_blob, [-1, seq_length, num_attention_heads, width]
)
output_blob = flow.transpose(output_blob, perm=[0, 2, 1, 3])
return output_blob
from_blob_2d = flow.reshape(from_blob, [-1, num_attention_heads * size_per_head])
to_blob_2d = flow.reshape(to_blob, [-1, num_attention_heads * size_per_head])
query_blob = _FullyConnected(
from_blob_2d,
input_size=num_attention_heads * size_per_head,
units=num_attention_heads * size_per_head,
activation=query_act,
name="query",
weight_initializer=CreateInitializer(initializer_range),
)
key_blob = _FullyConnected(
to_blob_2d,
input_size=num_attention_heads * size_per_head,
units=num_attention_heads * size_per_head,
activation=key_act,
name="key",
weight_initializer=CreateInitializer(initializer_range),
)
value_blob = _FullyConnected(
to_blob_2d,
input_size=num_attention_heads * size_per_head,
units=num_attention_heads * size_per_head,
activation=value_act,
name="value",
weight_initializer=CreateInitializer(initializer_range),
)
query_blob = TransposeForScores(
query_blob, num_attention_heads, from_seq_length, size_per_head
)
key_blob = TransposeForScores(
key_blob, num_attention_heads, to_seq_length, size_per_head
)
attention_scores_blob = flow.matmul(query_blob, key_blob, transpose_b=True)
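# scaled dot-product attention: multiplying by 1/sqrt(size_per_head) keeps the
# softmax logits well-conditioned as the head size grows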
attention_scores_blob = attention_scores_blob * (
1.0 / math.sqrt(float(size_per_head))
)
attention_mask_blob = flow.reshape(
attention_mask_blob, [-1, 1, from_seq_length, to_seq_length]
)
attention_mask_blob = flow.cast(attention_mask_blob, dtype=flow.float)
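# additive mask: mask==1 contributes 0, mask==0 contributes -10000, which drives
# the softmax probability of padded positions toward zero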
addr_blob = (attention_mask_blob - 1.0) * 10000.0
attention_scores_blob = attention_scores_blob + addr_blob
attention_probs_blob = flow.nn.softmax(attention_scores_blob)
attention_probs_blob = _Dropout(attention_probs_blob, attention_probs_dropout_prob)
value_blob = flow.reshape(
value_blob, [-1, to_seq_length, num_attention_heads, size_per_head]
)
value_blob = flow.transpose(value_blob, perm=[0, 2, 1, 3])
context_blob = flow.matmul(attention_probs_blob, value_blob)
context_blob = flow.transpose(context_blob, perm=[0, 2, 1, 3])
if do_return_2d_tensor:
context_blob = flow.reshape(
context_blob, [-1, num_attention_heads * size_per_head]
)
else:
context_blob = flow.reshape(
context_blob, [-1, from_seq_length, num_attention_heads * size_per_head]
)
return context_blob
def _FullyConnected(
input_blob, input_size, units, activation=None, name=None, weight_initializer=None
):
weight_blob = flow.get_variable(
name=name + "-weight",
shape=[input_size, units],
dtype=input_blob.dtype,
initializer=weight_initializer,
)
bias_blob = flow.get_variable(
name=name + "-bias",
shape=[units],
dtype=input_blob.dtype,
initializer=flow.constant_initializer(0.0),
)
output_blob = flow.matmul(input_blob, weight_blob)
output_blob = flow.nn.bias_add(output_blob, bias_blob)
return output_blob
def _Dropout(input_blob, dropout_prob):
if dropout_prob == 0.0:
return input_blob
return flow.nn.dropout(input_blob, rate=dropout_prob)
def _LayerNorm(input_blob, hidden_size):
return flow.layers.layer_norm(
input_blob, name="LayerNorm", begin_norm_axis=-1, begin_params_axis=-1
)
def _CreateAttentionMaskFromInputMask(to_mask_blob, from_seq_length, to_seq_length):
output = flow.cast(to_mask_blob, dtype=flow.float)
output = flow.reshape(output, [-1, 1, to_seq_length])
zeros = flow.constant(0.0, dtype=flow.float, shape=[from_seq_length, to_seq_length])
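# broadcast-add against a [from_seq, to_seq] zero matrix expands the
# [-1, 1, to_seq] mask to a full [-1, from_seq, to_seq] attention mask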
output = zeros + output
return output
def _EmbeddingPostprocessor(
input_blob,
seq_length,
embedding_size,
use_token_type=False,
token_type_ids_blob=None,
token_type_vocab_size=16,
token_type_embedding_name="token_type_embeddings",
use_position_embeddings=True,
position_embedding_name="position_embeddings",
initializer_range=0.02,
max_position_embeddings=512,
dropout_prob=0.1,
):
output = input_blob
if use_token_type:
assert token_type_ids_blob is not None
token_type_table = flow.get_variable(
name=token_type_embedding_name,
shape=[token_type_vocab_size, embedding_size],
dtype=input_blob.dtype,
initializer=CreateInitializer(initializer_range),
)
token_type_embeddings = flow.gather(
params=token_type_table, indices=token_type_ids_blob, axis=0
)
output = output + token_type_embeddings
if use_position_embeddings:
position_table = flow.get_variable(
name=position_embedding_name,
shape=[1, max_position_embeddings, embedding_size],
dtype=input_blob.dtype,
initializer=CreateInitializer(initializer_range),
)
assert seq_length <= max_position_embeddings
if seq_length != max_position_embeddings:
position_table = flow.slice(
position_table, begin=[None, 0, 0], size=[None, seq_length, -1]
)
output = output + position_table
output = _LayerNorm(output, embedding_size)
output = _Dropout(output, dropout_prob)
return output
def _EmbeddingLookup(
input_ids_blob,
vocab_size,
embedding_size=128,
initializer_range=0.02,
word_embedding_name="word_embeddings",
):
embedding_table = flow.get_variable(
name=word_embedding_name,
shape=[vocab_size, embedding_size],
dtype=flow.float,
initializer=CreateInitializer(initializer_range),
)
output = flow.gather(params=embedding_table, indices=input_ids_blob, axis=0)
return output, embedding_table
def GetActivation(name):
if name == "linear":
return None
elif name == "relu":
return flow.math.relu
elif name == "tanh":
return flow.math.tanh
elif name == "gelu":
return flow.math.gelu
else:
raise Exception("unsupported activation")
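A construction sketch for the backbone (hypothetical values; the three input blobs would come from a data pipeline inside a single-client job function):

backbone = BertBackbone(
    input_ids_blob=input_ids,         # int ids, shape [batch, seq_length]
    input_mask_blob=input_mask,       # 1 = real token, 0 = padding
    token_type_ids_blob=segment_ids,  # sentence A/B segment ids
    vocab_size=30522,
    seq_length=128,
    hidden_size=768,
    num_hidden_layers=12,
    num_attention_heads=12,
)
seq_out = backbone.sequence_output()  # shape (-1, seq_length, hidden_size)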