Skip to content
Snippets Groups Projects
Unverified Commit 37c63928 authored by Shenghang Tsai's avatar Shenghang Tsai Committed by GitHub
Browse files

Replace oneflow_worker with worker agent (#4900)

* naive impl

* refine

* refine

* add log

* refine

* refine

* refine

* refine

* add todo

* refine

* refine

* sync dynamic libs

* refine

* fix docker cmd

* fix rank

* refine

* refine

* add callbacks simple rpc

* refine

* refine

* fix

* refine

* refine

* refine

* fix conn

* support traditional mode

* refine

* refine

* refine

* rm

* refine

* refine

* refine

* refine todo

* refine

* refine

* rm unused

* rm todo

* revert

* refine

* add log

* refine

* refine

* fix order

* refine

* refine

* refine

* refine

* refine

* refine

* refine

* rm

* rename

* add comment

* refine

* rm

* refine

* refine

* refine

* refine

* add todo

* add info

* refine

* refine

* refine

* add back some legacy code

* refine

* refine

* refine

* refine

* refine

* rm oneflow_worker exe

* rm log

* fix bug

* support --cmd

* add check

* refine

* fix

* fmt
parent 98f937f1
No related branches found
No related tags found
No related merge requests found
...@@ -347,11 +347,9 @@ jobs: ...@@ -347,11 +347,9 @@ jobs:
- name: Op test (distributed) - name: Op test (distributed)
if: matrix.test_suite == 'cuda' if: matrix.test_suite == 'cuda'
run: | run: |
python3 ci/test/distributed_run.py --make_dotssh python3 ci/test/distributed_run.py \
python3 ci/test/distributed_run.py --run --bash_script=ci/test/2node_op_test.sh \ --bash_script=ci/test/2node_op_test.sh \
--build_docker_img \ --oneflow_wheel_path=${wheelhouse_dir}
--oneflow_wheel_path=${wheelhouse_dir} \
--oneflow_worker_bin=${bin_dir}/oneflow_worker
- name: Print backtrace (distributed test) - name: Print backtrace (distributed test)
if: always() && matrix.test_suite == 'cuda' if: always() && matrix.test_suite == 'cuda'
run: | run: |
...@@ -362,8 +360,8 @@ jobs: ...@@ -362,8 +360,8 @@ jobs:
if: always() && matrix.test_suite == 'cuda' if: always() && matrix.test_suite == 'cuda'
uses: ./.github/actions/upload_oss uses: ./.github/actions/upload_oss
with: with:
src_path: oneflow_temp src_path: distributed-tmp
oss_dst_path: oss://oneflow-log/${{ github.repository }}/pr/${{ github.event.pull_request.number }}/${{github.run_id}}/oneflow_temp oss_dst_path: oss://oneflow-log/${{ github.repository }}/pr/${{ github.event.pull_request.number }}/${{github.run_id}}/distributed-tmp
oss_access_key_id: ${{ secrets.OSS_ACCESS_KEY_ID }} oss_access_key_id: ${{ secrets.OSS_ACCESS_KEY_ID }}
oss_access_key_secret: ${{ secrets.OSS_ACCESS_KEY_SECRET }} oss_access_key_secret: ${{ secrets.OSS_ACCESS_KEY_SECRET }}
- name: Dry run test (run without runtime) - name: Dry run test (run without runtime)
......
...@@ -3,14 +3,13 @@ set -xe ...@@ -3,14 +3,13 @@ set -xe
export PYTHONUNBUFFERED=1 export PYTHONUNBUFFERED=1
bash ci/test/try_install.sh
src_dir=${ONEFLOW_SRC_DIR:-"$PWD"} src_dir=${ONEFLOW_SRC_DIR:-"$PWD"}
test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"/test_tmp_dir"} test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"/test_tmp_dir"}
rm -rf $test_tmp_dir rm -rf $test_tmp_dir
mkdir -p $test_tmp_dir mkdir -p $test_tmp_dir
chmod -R o+w $test_tmp_dir
cp -r $src_dir/oneflow/python/test $test_tmp_dir cp -r $src_dir/oneflow/python/test $test_tmp_dir
cd $test_tmp_dir cd $test_tmp_dir
......
This diff is collapsed.
# This file lists libraries that we will assume to be present on the host system and hence
# should NOT be bundled inside AppImages. This is a working document; expect it to change
# over time. File format: one filename per line. Each entry should have a justification comment.
# See the useful tool at https://abi-laboratory.pro/index.php?view=navigator&symbol=hb_buffer_set_cluster_level#result
# to investigate issues with missing symbols.
ld-linux.so.2
ld-linux-x86-64.so.2
libanl.so.1
libBrokenLocale.so.1
libcidn.so.1
# libcrypt.so.1 # Not part of glibc anymore as of Fedora 30. See https://github.com/slic3r/Slic3r/issues/4798 and https://pagure.io/fedora-docs/release-notes/c/01d74b33564faa42959c035e1eee286940e9170e?branch=f28
libc.so.6
libdl.so.2
libm.so.6
libmvec.so.1
# libnsl.so.1 # Not part of glibc anymore as of Fedora 28. See https://github.com/RPCS3/rpcs3/issues/5224#issuecomment-434930594
libnss_compat.so.2
# libnss_db.so.2 # Not part of neon-useredition-20190321-0530-amd64.iso
libnss_dns.so.2
libnss_files.so.2
libnss_hesiod.so.2
libnss_nisplus.so.2
libnss_nis.so.2
libpthread.so.0
libresolv.so.2
librt.so.1
libthread_db.so.1
libutil.so.1
# These files are all part of the GNU C Library which should never be bundled.
# List was generated from a fresh build of glibc 2.25.
libstdc++.so.6
# Workaround for:
# usr/lib/libstdc++.so.6: version `GLIBCXX_3.4.21' not found
libGL.so.1
# The above may be missing on Chrome OS, https://www.reddit.com/r/Crostini/comments/d1lp67/ultimaker_cura_no_longer_running_as_an_appimage/
libEGL.so.1
# Part of the video driver (OpenGL); present on any regular
# desktop system, may also be provided by proprietary drivers.
# Known to cause issues if it's bundled.
libGLdispatch.so.0
libGLX.so.0
# reported to be superfluous and conflicting system libraries (graphics driver)
# see https://github.com/linuxdeploy/linuxdeploy/issues/89
libOpenGL.so.0
# Qt installed via install-qt.sh apparently links to this library
# part of OpenGL like libGL/libEGL, so excluding it should not cause any problems
# https://github.com/linuxdeploy/linuxdeploy/issues/152
libdrm.so.2
# Workaround for:
# Antergos Linux release 2015.11 (ISO-Rolling)
# /usr/lib/libdrm_amdgpu.so.1: error: symbol lookup error: undefined symbol: drmGetNodeTypeFromFd (fatal)
# libGL error: unable to load driver: swrast_dri.so
# libGL error: failed to load driver: swrast
# Unrecognized OpenGL version
libglapi.so.0
# Part of mesa
# known to cause problems with graphics, see https://github.com/RPCS3/rpcs3/issues/4427#issuecomment-381674910
libgbm.so.1
# Part of mesa
# https://github.com/probonopd/linuxdeployqt/issues/390#issuecomment-529036305
libxcb.so.1
# Workaround for:
# Fedora 23
# symbol lookup error: /lib64/libxcb-dri3.so.0: undefined symbol: xcb_send_fd
# Uncertain if this is required to be bundled for some distributions - if so we need to write a version check script and use LD_PRELOAD to load the system version if it is newer
# Fedora 25:
# undefined symbol: xcb_send_request_with_fds
# https://github.com/AppImage/AppImages/issues/128
libX11.so.6
# Workaround for:
# Fedora 23
# symbol lookup error: ./lib/libX11.so.6: undefined symbol: xcb_wait_for_reply64
# Uncertain if this is required to be bundled for some distributions - if so we need to write a version check script and use LD_PRELOAD to load the system version if it is newer
libgio-2.0.so.0
# Workaround for:
# On Ubuntu, "symbol lookup error: /usr/lib/x86_64-linux-gnu/gtk-2.0/modules/liboverlay-scrollbar.so: undefined symbol: g_settings_new"
# libgdk-x11-2.0.so.0 # Missing on openSUSE-Tumbleweed-KDE-Live-x86_64-Snapshot20170601-Media.iso
# libgtk-x11-2.0.so.0 # Missing on openSUSE-Tumbleweed-KDE-Live-x86_64-Snapshot20170601-Media.iso
libasound.so.2
# Workaround for:
# No sound, e.g., in VLC.AppImage (does not find sound cards)
# https://github.com/AppImage/pkg2appimage/issues/475
# libgdk_pixbuf-2.0.so.0
# Was: Workaround for:
# On Ubuntu, get (inkscape:25621): GdkPixbuf-WARNING **: Error loading XPM image loader: Image type 'xpm' is not supported
libfontconfig.so.1
# Workaround for:
# Application stalls when loading fonts during application launch; e.g., KiCad on ubuntu-mate
libthai.so.0
# Workaround for:
# audacity: /tmp/.mount_AudaciUsFbON/usr/lib/libthai.so.0: version `LIBTHAI_0.1.25' not found (required by /usr/lib64/libpango-1.0.so.0)
# on openSUSE Tumbleweed
# other "low-level" font rendering libraries
# should fix https://github.com/probonopd/linuxdeployqt/issues/261#issuecomment-377522251
# and https://github.com/probonopd/linuxdeployqt/issues/157#issuecomment-320755694
libfreetype.so.6
libharfbuzz.so.0
# Note, after discussion we do not exclude this, but we can use a dummy library that just does nothing
# libselinux.so.1
# Workaround for:
# sed: error while loading shared libraries: libpcre.so.3: cannot open shared object file: No such file or directory
# Some distributions, such as Arch Linux, do not come with libselinux.so.1 by default.
# The solution is to bundle a dummy mock library:
# echo "extern int is_selinux_enabled(void){return 0;}" >> selinux-mock.c
# gcc -s -shared -o libselinux.so.1 -Wl,-soname,libselinux.so.1 selinux-mock.c
# strip libselinux.so.1
# More information: https://github.com/AppImage/AppImages/issues/83
# and https://github.com/AppImage/AppImageKit/issues/775#issuecomment-614954821
# https://gitlab.com/sulinos/devel/libselinux-dummy
# The following are assumed to be part of the base system
# Removing these has worked e.g., for Krita. Feel free to report if
# you think that some of these should go into AppImages and why.
libcom_err.so.2
libexpat.so.1
libgcc_s.so.1
libglib-2.0.so.0
libgpg-error.so.0
# libgssapi_krb5.so.2 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there
# libgssapi.so.3 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
# libhcrypto.so.4 # Missing on openSUSE LEAP 42.0
# libheimbase.so.1 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
# libheimntlm.so.0 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
# libhx509.so.5 # Missing on openSUSE LEAP 42.0
libICE.so.6
# libidn.so.11 # Does not come with Solus by default
# libk5crypto.so.3 # Running AppImage built on Debian 9 or Ubuntu 16.04 on an Archlinux fails otherwise; https://github.com/AppImage/AppImages/issues/301
# libkeyutils.so.1 # Does not come with Void Linux by default; https://github.com/Subsurface-divelog/subsurface/issues/1971#issuecomment-466606834
# libkrb5.so.26 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there. Missing on openSUSE LEAP 42.0
# libkrb5.so.3 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there
# libkrb5support.so.0 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there
libp11-kit.so.0
# libpcre.so.3 # Missing on Fedora 24, SLED 12 SP1, and openSUSE Leap 42.2
# libroken.so.18 # Missing on openSUSE LEAP 42.0
# libsasl2.so.2 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
libSM.so.6
libusb-1.0.so.0
libuuid.so.1
# libwind.so.0 # Missing on openSUSE LEAP 42.0
# Potentially dangerous libraries
libgobject-2.0.so.0
# Workaround for:
# Rectangles instead of fonts
# https://github.com/AppImage/AppImages/issues/240
libpangoft2-1.0.so.0
libpangocairo-1.0.so.0
libpango-1.0.so.0
# FIXME:
# Can get symbol lookup error: /lib64/libpango-1.0.so.0: undefined symbol: g_log_structured_standard
# if libcairo is bundled but libpango is not
# Workaround for:
# e.g., Spotify
# relocation error: /lib/x86_64-linux-gnu/libgcrypt.so.20:
# symbol gpgrt_lock_lock, version GPG_ERROR_1.0 not defined
# in file libgpg-error.so.0 with link time reference
libgpg-error.so.0
libjack.so.0
# it must match the ABI of the JACK server which is installed in the base system
# rncbc confirmed this
# However, this library is missing on Fedora-WS-Live-31-1-9
# which means that we should avoid using JACK altogether if possible
# Unsolved issue:
# https://github.com/probonopd/linuxdeployqt/issues/35
# Error initializing NSS with a persistent database (sql:/home/me/.pki/nssdb): libsoftokn3.so: cannot open shared object file: No such file or directory
# Error initializing NSS without a persistent database: NSS error code: -5925
# nss_error=-5925, os_error=0
# libnss3.so should not be removed from the bundles, as this causes other issues, e.g.,
# https://github.com/probonopd/linuxdeployqt/issues/35#issuecomment-256213517
# and https://github.com/AppImage/AppImages/pull/114
# libnss3.so
# The following cannot be excluded, see
# https://github.com/AppImage/AppImages/commit/6c7473d8cdaaa2572248dcc53d7f617a577ade6b
# http://stackoverflow.com/questions/32644157/forcing-a-binary-to-use-a-specific-newer-version-of-a-shared-library-so
# libssl.so.1
# libssl.so.1.0.0
# libcrypto.so.1
# libcrypto.so.1.0.0
# According to https://github.com/RicardoEPRodrigues/3Engine/issues/4#issuecomment-511598362
# libGLEW is not tied to a specific GPU. It's linked against libGL.so.1
# and that one is different depending on the installed driver.
# In fact libGLEW is changing its soversion very often, so you should always bundle libGLEW.so.2.0
# libglut.so.3 # to be confirmed
libxcb-dri3.so.0 # https://github.com/AppImage/AppImages/issues/348
libxcb-dri2.so.0 # https://github.com/probonopd/linuxdeployqt/issues/331#issuecomment-442276277
# If the next line turns out to cause issues, we will have to remove it again and find another solution
libfribidi.so.0 # https://github.com/olive-editor/olive/issues/221 and https://github.com/knapsu/plex-media-player-appimage/issues/14
# Workaround for:
# symbol lookup error: /lib/x86_64-linux-gnu/libgnutls.so.30: undefined symbol: __gmpz_limbs_write
# https://github.com/ONLYOFFICE/appimage-desktopeditors/issues/3
# Apparently coreutils depends on it, so it should be safe to assume that it comes with every target system
libgmp.so.10
...@@ -145,10 +145,6 @@ foreach(oneflow_single_file ${oneflow_all_src}) ...@@ -145,10 +145,6 @@ foreach(oneflow_single_file ${oneflow_all_src})
if(RPC_BACKEND MATCHES "GRPC") if(RPC_BACKEND MATCHES "GRPC")
list(APPEND of_transport_test_cc ${oneflow_single_file}) list(APPEND of_transport_test_cc ${oneflow_single_file})
endif() endif()
elseif("${oneflow_single_file}" MATCHES "^${PROJECT_SOURCE_DIR}/oneflow/core/job/oneflow_worker.cpp$")
if (RPC_BACKEND MATCHES "GRPC")
list(APPEND of_main_cc ${oneflow_single_file})
endif()
elseif("${oneflow_single_file}" MATCHES "^${PROJECT_SOURCE_DIR}/oneflow/(core|user|xrt)/.*_test\\.cpp$") elseif("${oneflow_single_file}" MATCHES "^${PROJECT_SOURCE_DIR}/oneflow/(core|user|xrt)/.*_test\\.cpp$")
# test file # test file
list(APPEND of_all_test_cc ${oneflow_single_file}) list(APPEND of_all_test_cc ${oneflow_single_file})
......
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/common/maybe.h"
#include "oneflow/core/job/oneflow.h"
#include "oneflow/core/job/env_global_objects_scope.h"
#include "oneflow/core/job/session_global_objects_scope.h"
#include "oneflow/core/job/env.pb.h"
#include "oneflow/core/job/cluster.h"
#include "oneflow/core/job/cluster_instruction.h"
#include "oneflow/core/control/ctrl_client.h"
#include "oneflow/core/control/ctrl_server.h"
#include "oneflow/core/persistence/tee_persistent_log_stream.h"
namespace oneflow {

namespace {

// Bootstrap a worker process from a serialized EnvProto text file:
// parse the env config, install the global environment scope, run the
// cluster worker loop until the master tells this process to exit,
// then tear the scope back down.
Maybe<void> Run(const std::string& env_proto_filepath) {
  EnvProto env_proto;
  ParseProtoFromTextFile(env_proto_filepath, &env_proto);
  // The env scope must not already exist; this process owns its creation.
  CHECK_ISNULL_OR_RETURN(Global<EnvGlobalObjectsScope>::Get());
  // Global<T>::New is not allowed to be called here
  // because glog is not constructed yet and LOG(INFO) has bad behavior
  Global<EnvGlobalObjectsScope>::SetAllocated(new EnvGlobalObjectsScope());
  JUST(Global<EnvGlobalObjectsScope>::Get()->Init(env_proto));
  // Blocks for the lifetime of the worker, serving cluster instructions.
  JUST(Cluster::WorkerLoop());
  Global<EnvGlobalObjectsScope>::Delete();
  return Maybe<void>::Ok();
}

}  // namespace

}  // namespace oneflow
// Path of the textual EnvProto describing this worker's environment.
DEFINE_string(env_proto, "", "EnvProto file path");

// Worker entry point: parse gflags, then run the worker loop.
// CHECK_JUST aborts the process if Run returns an error Maybe.
int main(int argc, char* argv[]) {
  using namespace oneflow;
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  CHECK_JUST(Run(FLAGS_env_proto));
  return 0;
}
...@@ -29,108 +29,6 @@ from oneflow.python.oneflow_export import oneflow_export ...@@ -29,108 +29,6 @@ from oneflow.python.oneflow_export import oneflow_export
import subprocess import subprocess
@oneflow_export("deprecated.init_worker")
def init_worker(
    scp_binary: bool = True,
    use_uuid: bool = True,
    ssh_port=22,
    bootstrap_conf_list=None,
) -> None:
    """Deprecated: start remote oneflow_worker processes over ssh.

    For every non-master machine (or every non-rank-0 bootstrap conf) this
    writes the EnvProto to a temp file and delegates to
    _SendBinaryAndConfig2Worker, which scp's the worker binary plus the
    env file and launches the worker remotely.

    Args:
        scp_binary: copy the ONEFLOW_WORKER_BIN binary to each host.
        use_uuid: put per-run files under a uuid-named temp dir (requires
            scp_binary=True).
        ssh_port: ssh/scp port used for all remote calls.
        bootstrap_conf_list: optional per-rank bootstrap configs; when given,
            machines are addressed by bootstrap_conf.host instead of
            env_proto.machine.
    """
    assert type(env_util.default_env_proto) is EnvProto
    # NOTE(review): attribute name 'defautl_...' looks misspelled — verify it
    # matches what env_util actually reads, otherwise this line is a no-op.
    env_util.defautl_env_proto_mutable = False
    env_proto = env_util.default_env_proto
    # NOTE(review): if env_proto.machine is empty and bootstrap_conf_list is
    # None, len(None) raises TypeError instead of AssertionError — confirm
    # callers always pass one of the two.
    assert len(env_proto.machine) > 0 or len(bootstrap_conf_list) > 0
    assert type(scp_binary) is bool
    assert type(use_uuid) is bool
    oneflow_worker_path = os.getenv("ONEFLOW_WORKER_BIN")
    assert oneflow_worker_path is not None, "please set env ONEFLOW_WORKER_BIN"
    assert os.path.isfile(
        oneflow_worker_path
    ), "binary oneflow_worker not found, please check your environment variable ONEFLOW_WORKER_BIN, path: {}".format(
        oneflow_worker_path
    )
    # _temp_run_dir is module-level so delete_worker can remove it later.
    global _temp_run_dir
    if use_uuid:
        assert scp_binary is True
        _temp_run_dir = os.getenv("HOME") + "/oneflow_temp/" + str(uuid.uuid1())
    else:
        _temp_run_dir = os.getenv("HOME") + "/oneflow_temp/no_uuid"
    run_dir = _temp_run_dir
    run_dir = os.path.abspath(os.path.expanduser(run_dir))
    if bootstrap_conf_list is None:
        # Legacy path: one shared env file for every worker machine.
        env_file = NamedTemporaryFile(delete=False)
        if sys.version_info >= (3, 0):
            env_file.write(pbtxt.MessageToString(env_proto).encode())
        else:
            env_file.write(pbtxt.MessageToString(env_proto))
        env_file.close()
        for machine in env_proto.machine:
            if machine.id == 0:
                # machine 0 is the master; no worker is launched there.
                pass
            else:
                _SendBinaryAndConfig2Worker(
                    machine.addr,
                    oneflow_worker_path,
                    env_file.name,
                    run_dir,
                    scp_binary,
                    ssh_port,
                )
        os.remove(env_file.name)
    else:
        # Bootstrap path: each worker gets its own env file carrying its
        # rank-specific ctrl_bootstrap_conf.
        worker_env_proto = EnvProto()
        worker_env_proto.CopyFrom(env_proto)
        worker_env_proto.ClearField("ctrl_bootstrap_conf")
        for bootstrap_conf in bootstrap_conf_list:
            if bootstrap_conf.rank == 0:
                continue
            # set ctrl_bootstrap_conf of worker
            assert bootstrap_conf.HasField("host")
            worker_env_proto.ctrl_bootstrap_conf.CopyFrom(bootstrap_conf)
            env_file = NamedTemporaryFile(delete=False)
            if sys.version_info >= (3, 0):
                env_file.write(pbtxt.MessageToString(worker_env_proto).encode())
            else:
                env_file.write(pbtxt.MessageToString(worker_env_proto))
            env_file.close()
            _SendBinaryAndConfig2Worker(
                bootstrap_conf.host,
                oneflow_worker_path,
                env_file.name,
                run_dir,
                scp_binary,
                ssh_port,
            )
            os.remove(env_file.name)
@oneflow_export("deprecated.delete_worker")
def delete_worker(ssh_port=22) -> None:
    """Deprecated: remove the per-run temp dir on every worker machine.

    Skips the master (machine id 0). When ONEFLOW_WORKER_KEEP_LOG is set,
    the remote temp dir is left in place so logs can be inspected.
    """
    global _temp_run_dir
    assert _temp_run_dir != ""
    env_proto = env_util.default_env_proto
    assert isinstance(env_proto, EnvProto)
    ssh_port_arg = " -p {} ".format(ssh_port)
    user = getpass.getuser()
    for machine in env_proto.machine:
        if machine.id == 0:
            # Master host: nothing was launched here.
            continue
        remote_prefix = (
            "ssh {} ".format(ssh_port_arg) + user + "@" + machine.addr + " "
        )
        if os.getenv("ONEFLOW_WORKER_KEEP_LOG"):
            print("worker log kept at: {}".format(machine.addr), flush=True)
        else:
            _SystemCall(remote_prefix + '"rm -r ' + _temp_run_dir + '"')
            print("temp run dir removed at: {}".format(machine.addr), flush=True)
@oneflow_export("deprecated.delete_worker_by_bootstrap") @oneflow_export("deprecated.delete_worker_by_bootstrap")
def delete_worker_by_bootstrap(ssh_port=22) -> None: def delete_worker_by_bootstrap(ssh_port=22) -> None:
ssh_port_arg = " -p {} ".format(ssh_port) ssh_port_arg = " -p {} ".format(ssh_port)
...@@ -166,63 +64,4 @@ def delete_worker_of_multi_process(run_dir) -> None: ...@@ -166,63 +64,4 @@ def delete_worker_of_multi_process(run_dir) -> None:
print("temp run dir removed at localhost:" + run_dir, flush=True) print("temp run dir removed at localhost:" + run_dir, flush=True)
def _SendBinaryAndConfig2Worker(
    addr, oneflow_worker_path, env_proto_path, run_dir, scp_binary, ssh_port
):
    """Provision one remote worker over ssh/scp and launch it.

    Copies the oneflow_worker binary (when scp_binary) and the env proto
    file into run_dir on `addr`, starts the worker under nohup, then
    verifies via `ps aux` that an oneflow_worker process exists.

    Raises:
        subprocess.CalledProcessError: if any ssh/scp command fails.
        AssertionError: if no oneflow_worker shows up in the remote ps output.
    """
    # ssh uses lowercase -p for the port, scp uses uppercase -P.
    ssh_port_arg = " -p {} ".format(ssh_port)
    scp_port_arg = " -P {} ".format(ssh_port)
    # Ensure passwordless ssh before anything else.
    _SystemCall(
        "ssh-copy-id {} -f ".format(ssh_port_arg) + getpass.getuser() + "@" + addr
    )
    ssh_prefix = "ssh {}".format(ssh_port_arg) + getpass.getuser() + "@" + addr + " "
    remote_file_prefix = " " + getpass.getuser() + "@" + addr + ":"
    assert run_dir != ""
    _SystemCall(ssh_prefix + '"mkdir -p ' + run_dir + '"')
    if scp_binary:
        _SystemCall(
            "scp {}".format(scp_port_arg)
            + oneflow_worker_path
            + remote_file_prefix
            + run_dir
            + "/oneflow_worker"
        )
    _SystemCall(
        "scp {}".format(scp_port_arg)
        + env_proto_path
        + remote_file_prefix
        + run_dir
        + "/env.proto"
    )
    # Optionally forward the libibverbs override into the worker's env.
    oneflow_libibverbs_path = os.getenv("ONEFLOW_LIBIBVERBS_PATH")
    libibverbs_env_str = ""
    if oneflow_libibverbs_path:
        libibverbs_env_str = "ONEFLOW_LIBIBVERBS_PATH=" + oneflow_libibverbs_path + " "
    # Detach the worker with nohup and drop all stdio so ssh returns.
    oneflow_cmd = (
        '"cd '
        + run_dir
        + "; "
        + libibverbs_env_str
        + "nohup ./oneflow_worker -logtostderr=0 -log_dir=./log -v=0 -logbuflevel=-1 "
        + "-env_proto=./env.proto "
        + ' 1>/dev/null 2>&1 </dev/null & "'
    )
    _SystemCall(ssh_prefix + oneflow_cmd)
    # Sanity check: the worker should appear in the remote process list.
    proc = subprocess.Popen(
        ssh_prefix + "ps aux",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        encoding="utf-8",
        shell=True,
    )
    # NOTE(review): a TimeoutExpired here propagates without killing proc —
    # confirm this best-effort check is acceptable.
    outs, errs = proc.communicate(timeout=5)
    print(outs)
    assert "oneflow_worker" in str(outs), "fail to start oneflow_worker"
    print("oneflow worker initialized:", addr, flush=True)
def _SystemCall(cmd):
print(cmd, flush=True)
subprocess.check_call(cmd, shell=True)
_temp_run_dir = "" _temp_run_dir = ""
...@@ -17,9 +17,7 @@ from __future__ import absolute_import ...@@ -17,9 +17,7 @@ from __future__ import absolute_import
import os import os
import sys import sys
import getpass
import imp import imp
import inspect
import socket import socket
from contextlib import closing from contextlib import closing
import uuid import uuid
...@@ -33,7 +31,6 @@ from oneflow.core.job.env_pb2 import EnvProto ...@@ -33,7 +31,6 @@ from oneflow.core.job.env_pb2 import EnvProto
from oneflow.python.oneflow_export import oneflow_export from oneflow.python.oneflow_export import oneflow_export
from typing import Any, Dict, Callable from typing import Any, Dict, Callable
import subprocess import subprocess
import platform
class _ClearDefaultSession(object): class _ClearDefaultSession(object):
...@@ -169,6 +166,55 @@ _unittest_env_initilized = False ...@@ -169,6 +166,55 @@ _unittest_env_initilized = False
_unittest_worker_initilized = False _unittest_worker_initilized = False
def worker_agent_port():
    """Return the worker-agent port from ONEFLOW_TEST_WORKER_AGENT_PORT.

    Returns None when the variable is unset (or empty).
    """
    raw = os.getenv("ONEFLOW_TEST_WORKER_AGENT_PORT")
    return int(raw) if raw else None
def worker_agent_authkey():
    """Return the mandatory agent auth key from ONEFLOW_TEST_WORKER_AGENT_AUTHKEY."""
    value = os.getenv("ONEFLOW_TEST_WORKER_AGENT_AUTHKEY")
    # The agent connection cannot be authenticated without a key.
    assert value
    return value
def use_worker_agent():
    """True when a worker-agent port is configured in the environment."""
    port = worker_agent_port()
    return port is not None
def cast(conn=None, cmd=None, msg=None):
    """Fire-and-forget message to the agent: tagged command, then payload."""
    full_cmd = "cast/" + cmd
    print("[unittest]", f"[{full_cmd}]", msg)
    for part in (full_cmd, msg):
        conn.send(part.encode())
def call(conn=None, cmd=None, msg=None):
    """Round-trip to the agent: send tagged command + payload, await reply.

    A None msg is sent as an empty payload. Returns the decoded reply.
    """
    tagged = "call/" + cmd
    print("[unittest]", f"[{tagged}]", msg)
    conn.send(tagged.encode())
    payload = "" if msg is None else msg
    conn.send(payload.encode())
    return conn.recv().decode()
def launch_worker_via_agent(host=None, env_proto=None):
    """Ask the local worker agent to start a remote worker on *host*.

    Connects to the agent on localhost (port/authkey from the environment),
    casts the target host and serialized env proto, then blocks until the
    agent confirms the worker started.
    """
    print("[unittest]", "launching worker via agent at", host)
    from multiprocessing.connection import Client

    agent_address = ("localhost", worker_agent_port())
    agent_conn = Client(agent_address, authkey=worker_agent_authkey().encode())
    cast(conn=agent_conn, cmd="host", msg=host)
    cast(conn=agent_conn, cmd="env_proto", msg=pbtxt.MessageToString(env_proto))
    # "ok" is the agent's acknowledgement that the worker process is up.
    assert call(conn=agent_conn, cmd="start_worker") == "ok"
    print("[unittest]", "worker launched via agent at", host)
    agent_conn.close()
@oneflow_export("unittest.TestCase") @oneflow_export("unittest.TestCase")
class TestCase(unittest.TestCase): class TestCase(unittest.TestCase):
def setUp(self): def setUp(self):
...@@ -180,18 +226,20 @@ class TestCase(unittest.TestCase): ...@@ -180,18 +226,20 @@ class TestCase(unittest.TestCase):
master_port = os.getenv("ONEFLOW_TEST_MASTER_PORT") master_port = os.getenv("ONEFLOW_TEST_MASTER_PORT")
assert master_port, "env var ONEFLOW_TEST_MASTER_PORT not set" assert master_port, "env var ONEFLOW_TEST_MASTER_PORT not set"
oneflow.env.ctrl_port(int(master_port)) oneflow.env.ctrl_port(int(master_port))
if enable_init_by_host_list():
oneflow.env.machine(node_list())
data_port = os.getenv("ONEFLOW_TEST_DATA_PORT") data_port = os.getenv("ONEFLOW_TEST_DATA_PORT")
if data_port: if data_port:
oneflow.env.data_port(int(data_port)) oneflow.env.data_port(int(data_port))
ssh_port = os.getenv("ONEFLOW_TEST_SSH_PORT") if enable_init_by_host_list():
oneflow.env.machine(node_list())
data_port = os.getenv("ONEFLOW_TEST_DATA_PORT")
print("initializing worker...") print("initializing worker...")
oneflow.deprecated.init_worker( for machine in env_util.default_env_proto.machine:
scp_binary=True, use_uuid=True, ssh_port=int(ssh_port) if machine.id == 0:
pass
else:
launch_worker_via_agent(
host=machine.addr, env_proto=env_util.default_env_proto
) )
atexit.register(oneflow.deprecated.delete_worker, ssh_port=ssh_port)
_unittest_worker_initilized = True
else: else:
ctrl_port = os.getenv("ONEFLOW_TEST_CTRL_PORT") ctrl_port = os.getenv("ONEFLOW_TEST_CTRL_PORT")
config_rank_ctrl_port = -1 config_rank_ctrl_port = -1
...@@ -215,21 +263,17 @@ class TestCase(unittest.TestCase): ...@@ -215,21 +263,17 @@ class TestCase(unittest.TestCase):
config_rank_ctrl_port, config_rank_ctrl_port,
config_node_size, config_node_size,
) )
worker_env_proto = EnvProto()
data_port = os.getenv("ONEFLOW_TEST_DATA_PORT") worker_env_proto.CopyFrom(env_util.default_env_proto)
if data_port: worker_env_proto.ClearField("ctrl_bootstrap_conf")
oneflow.env.data_port(int(data_port)) for bootstrap_conf in bootstrap_conf_list:
if bootstrap_conf.rank == 0:
ssh_port = os.getenv("ONEFLOW_TEST_SSH_PORT") continue
print("initializing worker...") # set ctrl_bootstrap_conf of worker
oneflow.deprecated.init_worker( assert bootstrap_conf.HasField("host")
scp_binary=True, worker_env_proto.ctrl_bootstrap_conf.CopyFrom(bootstrap_conf)
use_uuid=True, launch_worker_via_agent(
ssh_port=int(ssh_port), host=bootstrap_conf.host, env_proto=worker_env_proto
bootstrap_conf_list=bootstrap_conf_list,
)
atexit.register(
oneflow.deprecated.delete_worker_by_bootstrap, ssh_port=ssh_port
) )
_unittest_worker_initilized = True _unittest_worker_initilized = True
elif device_num() > 1 and enable_multi_process(): elif device_num() > 1 and enable_multi_process():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment