diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 08d8424edd68dcaefc46ddc41beb7660fd692a12..e49eabe9ad332a4994d491779ea0f015ea4ae203 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -347,11 +347,9 @@ jobs: - name: Op test (distributed) if: matrix.test_suite == 'cuda' run: | - python3 ci/test/distributed_run.py --make_dotssh - python3 ci/test/distributed_run.py --run --bash_script=ci/test/2node_op_test.sh \ - --build_docker_img \ - --oneflow_wheel_path=${wheelhouse_dir} \ - --oneflow_worker_bin=${bin_dir}/oneflow_worker + python3 ci/test/distributed_run.py \ + --bash_script=ci/test/2node_op_test.sh \ + --oneflow_wheel_path=${wheelhouse_dir} - name: Print backtrace (distributed test) if: always() && matrix.test_suite == 'cuda' run: | @@ -362,8 +360,8 @@ jobs: if: always() && matrix.test_suite == 'cuda' uses: ./.github/actions/upload_oss with: - src_path: oneflow_temp - oss_dst_path: oss://oneflow-log/${{ github.repository }}/pr/${{ github.event.pull_request.number }}/${{github.run_id}}/oneflow_temp + src_path: distributed-tmp + oss_dst_path: oss://oneflow-log/${{ github.repository }}/pr/${{ github.event.pull_request.number }}/${{github.run_id}}/distributed-tmp oss_access_key_id: ${{ secrets.OSS_ACCESS_KEY_ID }} oss_access_key_secret: ${{ secrets.OSS_ACCESS_KEY_SECRET }} - name: Dry run test (run without runtime) diff --git a/ci/test/2node_op_test.sh b/ci/test/2node_op_test.sh index 593713414506948a7de032230a658f5fe3c98033..a914e90218ec8654ca8f6a24525cf9dd3e767928 100644 --- a/ci/test/2node_op_test.sh +++ b/ci/test/2node_op_test.sh @@ -3,14 +3,13 @@ set -xe export PYTHONUNBUFFERED=1 -bash ci/test/try_install.sh - src_dir=${ONEFLOW_SRC_DIR:-"$PWD"} test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"/test_tmp_dir"} rm -rf $test_tmp_dir mkdir -p $test_tmp_dir +chmod -R o+w $test_tmp_dir cp -r $src_dir/oneflow/python/test $test_tmp_dir cd $test_tmp_dir diff --git a/ci/test/distributed_run.py b/ci/test/distributed_run.py index 796e9ac248cc2ef6c9a18675665bbc2d921959b9..b806f9707e1156ae73de269cf0395202cc8dd04e 100644 --- a/ci/test/distributed_run.py +++ b/ci/test/distributed_run.py @@ -1,22 +1,17 @@ +from multiprocessing.connection import Listener import os import subprocess import socket import tempfile from contextlib import closing -from subprocess import TimeoutExpired import argparse import uuid - -FIX_SSH_PERMISSION = """ -mkdir -p /run/sshd -chown root ~/.ssh -chmod 700 ~/.ssh -chown root ~/.ssh/* -chmod 600 ~/.ssh/* -chmod 400 ~/.ssh/id_rsa -chmod 400 ~/.ssh/id_rsa.pub -chmod 600 ~/.ssh/config -""" +import getpass +import atexit +import pathlib +import asyncio +import glob +from datetime import date HARD_CODED_AFFILIATIONS = { "192.168.1.11": ["192.168.1.12",], @@ -27,6 +22,20 @@ HARD_CODED_AFFILIATIONS = { } +def is_img_existing(tag): + returncode = subprocess.run( + "docker image inspect {}".format(tag), + shell=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ).returncode + if returncode == 0: + print("[OK]", tag) + return True + else: + return False + + def get_affiliations(host): # TODO(tsai): Implement a HTTP endpoint to retrieve affiliations if host in HARD_CODED_AFFILIATIONS: @@ -50,239 +59,534 @@ def find_free_port(): return s.getsockname()[1] -def make_dotssh(dotssh_dir): - bash_cmd = f"""set -ex -rm -rf /root/.ssh -ssh-keygen -t rsa -N "" -f /root/.ssh/id_rsa -cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys && \ - chmod 600 /root/.ssh/authorized_keys -/etc/init.d/ssh start && \ - ssh-keyscan -H localhost >> 
/root/.ssh/known_hosts +async def spawn_shell_and_check(cmd: str = None): + p = await asyncio.create_subprocess_shell(cmd,) + await p.wait() + assert p.returncode == 0, cmd -cp -r /root/.ssh/* {dotssh_dir} -chmod 777 {dotssh_dir} -chmod 777 {dotssh_dir}/* -""" - with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as f: - f_name = f.name - f.write(bash_cmd) - f.flush() - subprocess.check_call( - f"docker run --rm -v /tmp:/host/tmp -v {dotssh_dir}:{dotssh_dir} -w $PWD oneflow-test:$USER bash /host/{f_name}", - shell=True, - ) - config_content = """Host * - StrictHostKeyChecking no -""" - with open(os.path.join(dotssh_dir, "config"), "w") as f: - f.write(config_content) + +async def spawn_shell(cmd: str = None): + p = await asyncio.create_subprocess_shell(cmd,) + await p.wait() -def build_docker_img(hostname=None, workspace_dir=None): - if hostname: +async def build_docker_img(remote_host=None, workspace_dir=None): + if remote_host: assert workspace_dir - subprocess.check_call("rm -f > oneflow-src.zip", shell=True) - subprocess.check_call( - "git archive --format zip HEAD > oneflow-src.zip", shell=True + await spawn_shell_and_check("rm -f > oneflow-src.zip") + await spawn_shell_and_check("git archive --format zip HEAD > oneflow-src.zip") + await spawn_shell_and_check( + f"scp oneflow-src.zip {remote_host}:{workspace_dir}/oneflow-src.zip", ) - subprocess.check_call( - f"scp oneflow-src.zip {hostname}:{workspace_dir}/oneflow-src.zip", - shell=True, + await spawn_shell_and_check( + f"ssh {remote_host} unzip {workspace_dir}/oneflow-src.zip -d {workspace_dir}/oneflow-src", ) - subprocess.check_call( - f"ssh {hostname} unzip {workspace_dir}/oneflow-src.zip -d {workspace_dir}/oneflow-src", - shell=True, - ) - subprocess.check_call( - f"ssh {hostname} bash {workspace_dir}/oneflow-src/docker/ci/test/build.sh", - shell=True, + await spawn_shell_and_check( + f"ssh {remote_host} bash {workspace_dir}/oneflow-src/docker/ci/test/build.sh", ) else: - subprocess.check_call(f"bash docker/ci/test/build.sh", shell=True) + await spawn_shell_and_check(f"bash docker/ci/test/build.sh") -def create_remote_workspace_dir(hostname, workspace_dir): - subprocess.check_call(f"ssh {hostname} mkdir -p {workspace_dir}", shell=True) +async def create_remote_workspace_dir(remote_host=None, workspace_dir=None): + await spawn_shell_and_check(f"ssh {remote_host} mkdir -p {workspace_dir}") + print("create_remote_workspace_dir done") -def launch_remote_container( - hostname, docker_ssh_port, survival_time, dotssh_dir, workspace_dir +async def launch_remote_container( + remote_host=None, + survival_time=None, + workspace_dir=None, + container_name=None, + img_tag=None, + oneflow_wheel_path=None, + oneflow_build_path=None, ): - subprocess.check_call( - f"scp -r {dotssh_dir} {hostname}:{workspace_dir}/dotssh", shell=True - ) - bash_cmd = f"""set -ex -{FIX_SSH_PERMISSION} -/usr/sbin/sshd -p {docker_ssh_port} -sleep {survival_time} -""" - with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as f: - f_name = f.name - f.write(bash_cmd) - f.flush() - subprocess.check_call( - f"scp {f_name} {hostname}:{workspace_dir}/launch_ssh_server.sh", shell=True, - ) - docker_cmd = f"""docker run --privileged --cidfile {workspace_dir}/worker.cid --network host --shm-size=8g --rm -v {workspace_dir}/dotssh:/root/.ssh -v {workspace_dir}:{workspace_dir} -w {workspace_dir} -v /dataset:/dataset -v /model_zoo:/model_zoo oneflow-test:$USER bash launch_ssh_server.sh + print("launching remote container at", remote_host) + assert img_tag + 
pythonpath_args = None + if oneflow_wheel_path: + pythonpath_args = "" + elif oneflow_build_path: + pythonpath_args = f"--env PYTHONPATH={workspace_dir}/python_scripts" + else: + raise ValueError("must have oneflow_wheel_path or oneflow_build_path") + docker_cmd = f"""docker run --privileged -d --network host --shm-size=8g --rm -v {workspace_dir}:{workspace_dir} -w {workspace_dir} -v /dataset:/dataset -v /model_zoo:/model_zoo --name {container_name} {pythonpath_args} {img_tag} sleep {survival_time} """ - ssh_cmd = f"ssh {hostname} {docker_cmd}" - print(ssh_cmd, flush=True) - proc = subprocess.Popen(ssh_cmd, shell=True,) - try: - proc.wait(timeout=10) - raise ValueError("sshd quit early, returncode:", proc.returncode) - except TimeoutExpired: - survival_time_min = survival_time / 60 - survival_time_min = int(survival_time_min) - print( - f"remote container launched, host: {hostname}, ssh port: {docker_ssh_port}, .ssh dir: {dotssh_dir}, survival: {survival_time_min} mins", - flush=True, + await spawn_shell_and_check(f"ssh {remote_host} {docker_cmd}") + if oneflow_wheel_path: + whl_basename = os.path.basename(oneflow_wheel_path) + await spawn_shell_and_check( + f"ssh {remote_host} docker exec {container_name} python3 -m pip install {workspace_dir}/{whl_basename}" ) + await spawn_shell( + f"ssh {remote_host} docker exec {container_name} python3 -m oneflow --doctor" + ) + + +def handle_cast(conn=None, cmd=None): + received_cmd: str = conn.recv().decode() + assert received_cmd.startswith("cast/") + received_cmd = received_cmd.replace("cast/", "") + assert received_cmd == cmd, (received_cmd, cmd) + return conn.recv().decode() + + +def handle_call(conn=None, cmd=None, response=None): + received_cmd: str = conn.recv().decode() + assert received_cmd.startswith("call/") + received_cmd = received_cmd.replace("call/", "") + assert received_cmd == cmd, (received_cmd, cmd) + msg = conn.recv().decode() + conn.send(response.encode()) + return msg -def run_bash_script( - bash_script, - timeout, - ssh_port, - dotssh_dir, - this_host, - remote_host, - oneflow_worker_bin, - oneflow_wheel_path, +def wait_for_env_proto_and_launch_workers( + agent_port=None, agent_authkey=None, remote_hosts=None ): - assert os.path.exists(bash_script) - log_dir = "./unittest-log-" + str(uuid.uuid4()) - ctrl_port = find_free_port() - data_port = find_free_port() - exports = f""" + listener = Listener(("localhost", agent_port), authkey=agent_authkey) + while True: + conn = listener.accept() + remote_docker_proc = {} + for remote_host in remote_hosts: + assert handle_cast(conn=conn, cmd="host"), remote_host + env_proto_txt = handle_cast(conn=conn, cmd="env_proto") + print("[docker agent]", f"[{remote_host}]", env_proto_txt) + f = tempfile.NamedTemporaryFile(mode="wb+", delete=True) + f.write(env_proto_txt.encode()) + f.flush() + subprocess.check_call( + f"rsync -azP --omit-dir-times --no-perms --no-group {f.name} {remote_host}:{workspace_dir}/env.prototxt", + shell=True, + ) + run_docker_cmd = f"ssh {remote_host} docker exec {container_name}" + run_docker_cmd += f" python3 -m oneflow --start_worker --env_proto={workspace_dir}/env.prototxt" + print("[docker agent]", run_docker_cmd) + remote_docker_proc[remote_host] = subprocess.Popen( + run_docker_cmd, shell=True + ) + handle_call(conn=conn, cmd="start_worker", response="ok") + for k, v in remote_docker_proc.items(): + assert v.wait() == 0 + + +class DockerAgent: + def __init__( + self, + port=None, + authkey=None, + this_host=None, + remote_hosts=None, + container_name=None, + 
timeout=None, + workspace_dir=None, + img_tag=None, + oneflow_wheel_path=None, + oneflow_build_path=None, + oneflow_test_tmp_dir=None, + ) -> None: + # info + self.this_host = this_host + self.remote_hosts = remote_hosts + self.container_name = container_name + self.timeout = timeout + self.common_docker_args = "--privileged --rm --network host --shm-size=8g -v $HOME:$HOME -v /dataset:/dataset -v /model_zoo:/model_zoo" + self.workspace_dir = workspace_dir + self.img_tag = img_tag + self.oneflow_wheel_path = oneflow_wheel_path + self.oneflow_build_path = oneflow_build_path + self.oneflow_test_tmp_dir = oneflow_test_tmp_dir + # impl + self.env_proto_txt = None + self.bash_tmp_file = None + self.bash_proc = None + self.remote_docker_proc = {} + self.agent_port = port + self.agent_authkey = authkey + + def __enter__(self): + return self + + def run_bash_script_async(self, bash_script=None, cmd=None): + remote_hosts_str = ",".join(self.remote_hosts) + ctrl_port = find_free_port() + data_port = find_free_port() + exports = f""" export ONEFLOW_TEST_MASTER_PORT={ctrl_port} export ONEFLOW_TEST_DATA_PORT={data_port} -export ONEFLOW_TEST_SSH_PORT={ssh_port} -export ONEFLOW_TEST_LOG_DIR={log_dir} -export ONEFLOW_TEST_NODE_LIST="{this_host},{remote_host}" +export ONEFLOW_TEST_NODE_LIST="{self.this_host},{remote_hosts_str}" export ONEFLOW_WORKER_KEEP_LOG=1 -export ONEFLOW_TEST_TMP_DIR="./distributed-tmp" +export ONEFLOW_TEST_TMP_DIR="{self.oneflow_test_tmp_dir}" export NCCL_DEBUG=INFO +export ONEFLOW_TEST_WORKER_AGENT_PORT={agent_port} +export ONEFLOW_TEST_WORKER_AGENT_AUTHKEY={agent_authkey} """ - if oneflow_worker_bin: - exports += f"export ONEFLOW_WORKER_BIN={oneflow_worker_bin}\n" - if oneflow_wheel_path: - exports += f"export ONEFLOW_WHEEL_PATH={oneflow_wheel_path}\n" - bash_cmd = f"""set -ex + if self.oneflow_wheel_path: + exports += f"python3 -m pip install {self.oneflow_wheel_path}" + if self.oneflow_build_path: + exports += f"export PYTHONPATH={self.oneflow_build_path}/python_scripts:$PYTHONPATH\n" + bash_cmd = None + if bash_script: + assert os.path.exists(bash_script) + bash_cmd = f"""set -ex {exports} -rm -rf ~/.ssh -cp -r /dotssh ~/.ssh -{FIX_SSH_PERMISSION} bash {bash_script} """ - artifact_cmd = f"""set -ex + elif cmd: + bash_cmd = f"""set -ex {exports} -rm -rf ~/.ssh -cp -r /dotssh ~/.ssh -{FIX_SSH_PERMISSION} -mkdir -p oneflow_temp -rm -rf oneflow_temp/{remote_host} -scp -P {ssh_port} -r {remote_host}:~/oneflow_temp oneflow_temp/{remote_host} -rm -f oneflow_temp/{remote_host}/*/oneflow_worker -chmod -R o+w oneflow_temp -chmod -R o+r oneflow_temp +{cmd} """ - returncode = None + else: + raise ValueError("not impl") + assert bash_cmd - def get_docker_cmd(f, cmd): - f_name = f.name - print(cmd, flush=True) - f.write(cmd) - f.flush() - return f"docker run --privileged --network host --shm-size=8g --rm -v /tmp:/host/tmp -v $PWD:$PWD -v $HOME:$HOME -w $PWD -v {dotssh_dir}:/dotssh -v /dataset:/dataset -v /model_zoo:/model_zoo oneflow-test:$USER bash /host{f_name}" + def get_docker_cmd(f, cmd): + f_name = f.name + f.write(cmd) + f.flush() + return f"docker run {self.common_docker_args} -v /tmp:/host/tmp:ro -v $PWD:$PWD -w $PWD --name {self.container_name} {self.img_tag} bash /host{f_name}" - with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as f: + f = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8", delete=True) run_docker_cmd = get_docker_cmd(f, bash_cmd) - returncode = subprocess.call(run_docker_cmd, shell=True, timeout=timeout) + self.bash_tmp_file = f + self.bash_proc = 
subprocess.Popen(run_docker_cmd, shell=True) - with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as f: - artifact_docker_cmd = get_docker_cmd(f, artifact_cmd) - subprocess.check_call(artifact_docker_cmd, shell=True, timeout=timeout) + def block(self): + from multiprocessing import Process - if returncode != 0: - raise ValueError(run_docker_cmd) + p = None + kwargs = { + "agent_port": self.agent_port, + "agent_authkey": self.agent_authkey, + "remote_hosts": self.remote_hosts, + } + p = Process(target=wait_for_env_proto_and_launch_workers, kwargs=kwargs,) + p.start() + print("[docker agent]", "blocking") + while self.bash_proc.poll() is None and p.is_alive() == True: + pass + p.terminate() + assert self.bash_proc.returncode == 0 + print("[docker agent]", "bash execution done") + def __exit__(self, exc_type, exc_val, exc_tb): + pass -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument( - "--launch_remote_container", action="store_true", required=False, default=False + +async def fix_and_sync_libs(oneflow_internal_path=None, remote_hosts=None): + tmp_dir = tempfile.TemporaryDirectory() + tmp_lib_dir = os.path.join(tmp_dir.name, "libs") + os.mkdir(tmp_lib_dir) + await spawn_shell_and_check( + """ldd file | grep "=> /" | awk '{print $3}' | xargs -I '{}' cp -v '{}' destination""".replace( + "file", oneflow_internal_path + ).replace( + "destination", tmp_lib_dir + ), ) - parser.add_argument( - "--make_dotssh", action="store_true", required=False, default=False + libs = os.listdir(tmp_lib_dir) + assert len(libs) > 0 + excludelist_path = os.path.join( + pathlib.Path(__file__).parent.absolute(), "excludelist" ) - parser.add_argument("--run", action="store_true", required=False, default=False) + excludelist = open(excludelist_path).read().split("\n") + await spawn_shell_and_check(f"cp {oneflow_internal_path} {tmp_dir.name}") + + def handle_lib(lib): + if lib in excludelist or "libpython" in lib: + print("excluding", lib) + return spawn_shell_and_check(f"rm {tmp_lib_dir}/{lib}") + else: + print("keeping", lib) + return spawn_shell_and_check( + f"patchelf --set-rpath '$ORIGIN' {tmp_lib_dir}/{lib}" + ) + + await asyncio.gather(*(handle_lib(lib) for lib in libs)) + + tmp_oneflow_internal_path = os.path.join( + tmp_dir.name, pathlib.Path(oneflow_internal_path).name + ) + print("before fixing .so") + await spawn_shell_and_check(f"ldd {tmp_oneflow_internal_path}") + print("fixing .so") + await spawn_shell_and_check( + f"patchelf --set-rpath '$ORIGIN/libs' {tmp_oneflow_internal_path}" + ) + + await asyncio.gather( + *[ + spawn_shell_and_check( + f"ssh {remote_host} 'mkdir -p {workspace_dir}/python_scripts/oneflow/libs'", + ) + for remote_host in remote_hosts + ] + ) + + async def copy_file(path=None, remote_host=None): + relpath = os.path.relpath(path, tmp_dir.name) + await spawn_shell_and_check( + f"scp {path} {remote_host}:{workspace_dir}/python_scripts/oneflow/{relpath}", + ) + + files = [ + os.path.join(root, name) + for root, dirs, files in os.walk(tmp_dir.name, topdown=True) + for name in files + ] + + await asyncio.gather( + *[ + copy_file(path=f, remote_host=remote_host) + for remote_host in remote_hosts + for f in files + ], + spawn_shell_and_check(f"ldd {tmp_oneflow_internal_path}"), + ) + + +def get_remote_hosts(args): + remote_hosts = None + if len(args.remote_host) == 1: + remote_hosts = args.remote_host.split(",") + elif len(args.remote_host) == 0: + affiliations = get_affiliations(this_host) + assert ( + affiliations + ), f"no affiliated node found 
for {this_host}, you should specify one" + remote_host = affiliations[0] + remote_host = socket.gethostbyname(remote_host) + remote_hosts = [remote_host] + else: + remote_hosts = args.remote_host + return remote_hosts + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--debug", action="store_true", required=False, default=False) parser.add_argument( - "--build_docker_img", action="store_true", required=False, default=False + "--skip_libs", action="store_true", required=False, default=False ) parser.add_argument("--bash_script", type=str, required=False) default_this_host = socket.gethostname() parser.add_argument( "--this_host", type=str, required=False, default=default_this_host ) - parser.add_argument("--remote_host", type=str, required=False) - default_dotssh_dir = os.path.expanduser("~/distributed_run_dotssh") + parser.add_argument("--remote_host", action="append", default=[]) + parser.add_argument("--oneflow_wheel_path", type=str, required=False, default=None) + parser.add_argument("--oneflow_build_path", type=str, required=False, default=None) + parser.add_argument("--custom_img_tag", type=str, required=False, default=None) + parser.add_argument("--cmd", type=str, required=False, default=None) parser.add_argument( - "--dotssh_dir", type=str, required=False, default=default_dotssh_dir + "--oneflow_test_tmp_dir", type=str, required=False, default="distributed-tmp" ) - parser.add_argument("--oneflow_worker_bin", type=str, required=False, default=None) - parser.add_argument("--oneflow_wheel_path", type=str, required=False, default=None) - parser.add_argument("--ssh_port", type=int, required=False, default=None) parser.add_argument("--timeout", type=int, required=False, default=6 * 60 * 60) args = parser.parse_args() - ssh_port = None - if args.ssh_port: - ssh_port = args.ssh_port - else: - ssh_port = find_free_port() - assert ssh_port - if args.make_dotssh: - make_dotssh(args.dotssh_dir) - + assert bool(args.oneflow_wheel_path) != bool(args.oneflow_build_path) + assert bool(args.bash_script) != bool(args.cmd) + if args.skip_libs: + assert args.debug, "--skip_libs only works with --debug" + assert ( + args.oneflow_build_path + ), "--skip_libs only works with --oneflow_build_path" + oneflow_wheel_path = args.oneflow_wheel_path + if oneflow_wheel_path and os.path.isdir(oneflow_wheel_path): + whl_paths = [ + name for name in glob.glob(os.path.join(oneflow_wheel_path, f"*.whl",)) + ] + assert len(whl_paths) == 1 + oneflow_wheel_path = whl_paths[0] this_host = args.this_host this_host = resolve_hostname_hardcoded(this_host) - remote_host = None - if args.remote_host: - assert len(args.remote_host.split(",")) == 1, "only support 2-nodes run for now" - remote_host = args.remote_host - else: - affiliations = get_affiliations(this_host) - assert ( - affiliations - ), f"no affiliated node found for {this_host}, you should specify one" - remote_host = affiliations[0] - remote_host = socket.gethostbyname(remote_host) + remote_hosts = get_remote_hosts(args) - print(f"this_host: {this_host}, remote_host: {remote_host}", flush=True) + print(f"this_host: {this_host}, remote_hosts: {remote_hosts}", flush=True) + sub_dir = str(uuid.uuid4()) + if args.debug: + sub_dir = "debug" workspace_dir = os.path.join( - os.path.expanduser("~"), "distributed_run_workspace", str(uuid.uuid4()) + os.path.expanduser("~"), "distributed_run_workspace", sub_dir ) - create_remote_workspace_dir(remote_host, workspace_dir) - if args.launch_remote_container: - 
launch_remote_container(remote_host, ssh_port, args.timeout, args.dotssh_dir) - if args.build_docker_img: - build_docker_img() - build_docker_img(remote_host, workspace_dir) - if args.run: - launch_remote_container( - remote_host, ssh_port, args.timeout, args.dotssh_dir, workspace_dir, + print("workspace_dir", workspace_dir) + loop = asyncio.get_event_loop() + loop.run_until_complete( + asyncio.gather( + *[ + create_remote_workspace_dir( + remote_host=remote_host, workspace_dir=workspace_dir + ) + for remote_host in remote_hosts + ] ) - assert args.bash_script - run_bash_script( - args.bash_script, - args.timeout, - ssh_port, - args.dotssh_dir, - this_host, - remote_host, - args.oneflow_worker_bin, - args.oneflow_wheel_path, + ) + if args.oneflow_build_path: + so_paths = [ + name + for name in glob.glob( + os.path.join( + args.oneflow_build_path, + f"python_scripts/oneflow/_oneflow_internal.*.so", + ) + ) + ] + assert len(so_paths) == 1, so_paths + oneflow_internal_path = so_paths[0] + oneflow_internal_path = os.path.join( + args.oneflow_build_path, oneflow_internal_path ) - exit(0) + tmp_dir = None + print("copying python_scripts dir") + loop.run_until_complete( + asyncio.gather( + *[ + spawn_shell_and_check( + f"rsync -azP --omit-dir-times --no-perms --no-group --copy-links --include='*.py' --exclude='*.so' --exclude='__pycache__' --exclude='python_scripts/oneflow/include' --include='*/' --exclude='*' {args.oneflow_build_path}/python_scripts {remote_host}:{workspace_dir}" + ) + for remote_host in remote_hosts + ] + ) + ) + if args.skip_libs == False: + print("copying .so") + loop.run_until_complete( + fix_and_sync_libs( + oneflow_internal_path=oneflow_internal_path, + remote_hosts=remote_hosts, + ) + ) + elif oneflow_wheel_path: + loop.run_until_complete( + asyncio.gather( + *[ + spawn_shell_and_check( + f"rsync -azP --omit-dir-times --no-perms --no-group {oneflow_wheel_path} {remote_host}:{workspace_dir}" + ) + for remote_host in remote_hosts + ] + ) + ) + default_docker_image = "oneflow-test:$USER" + ci_user_docker_image = "oneflow-test:ci-user" + img_tag = None + if args.custom_img_tag == None: + if is_img_existing(default_docker_image): + img_tag = default_docker_image + elif is_img_existing(ci_user_docker_image): + img_tag = ci_user_docker_image + else: + loop.run_until_complete( + asyncio.gather( + *[ + build_docker_img( + remote_host=remote_host, workspace_dir=workspace_dir + ) + for remote_host in remote_hosts + ], + build_docker_img(workspace_dir=workspace_dir), + ) + ) + img_tag = default_docker_image + else: + img_tag = args.custom_img_tag + assert img_tag + agent_port = find_free_port() + agent_authkey = str(uuid.uuid4()) + container_name = ( + getpass.getuser() + + "-distributed-run-main-node-at-" + + this_host.replace(".", "-") + ) + + def exit_handler(): + print( + "---------start cleanup, you should ignore errors below and check the errors above---------" + ) + if args.oneflow_build_path: + print("fixing permission of", args.oneflow_build_path) + subprocess.call( + f"docker run --rm -v {args.oneflow_build_path}:/p -w /p busybox chmod -R o+w .", + shell=True, + ) + loop.run_until_complete( + asyncio.gather( + *[ + spawn_shell( + f"ssh {remote_host} docker run --rm -v {workspace_dir}:/p -w /p busybox chmod -R 777 .", + ) + for remote_host in remote_hosts + ], + ) + ) + print("copying artifacts") + loop.run_until_complete( + asyncio.gather( + *[ + spawn_shell( + f"rsync -azP --omit-dir-times --no-perms --no-group --exclude='*.whl' --exclude='python_scripts' 
{remote_host}:{workspace_dir}/ {args.oneflow_test_tmp_dir}/{remote_host}" + ) + for remote_host in remote_hosts + ] + ) + ) + assert workspace_dir + if args.debug == False: + loop.run_until_complete( + asyncio.gather( + *[ + spawn_shell(f"ssh {remote_host} rm -rf {workspace_dir}",) + for remote_host in remote_hosts + ], + ) + ) + print("removing docker container:", container_name) + rm_cmd = f"docker rm -f {container_name}" + loop.run_until_complete( + asyncio.gather( + *[ + spawn_shell(f"ssh {remote_host} {rm_cmd}") + for remote_host in remote_hosts + ], + spawn_shell(rm_cmd), + ) + ) + + atexit.register(exit_handler) + loop.run_until_complete( + asyncio.gather( + *[ + launch_remote_container( + remote_host=remote_host, + survival_time=args.timeout, + workspace_dir=workspace_dir, + container_name=container_name, + oneflow_wheel_path=oneflow_wheel_path, + oneflow_build_path=args.oneflow_build_path, + img_tag=img_tag, + ) + for remote_host in remote_hosts + ], + ) + ) + + with DockerAgent( + port=agent_port, + authkey=agent_authkey.encode(), + this_host=this_host, + remote_hosts=remote_hosts, + container_name=container_name, + workspace_dir=workspace_dir, + oneflow_wheel_path=oneflow_wheel_path, + oneflow_build_path=args.oneflow_build_path, + img_tag=img_tag, + oneflow_test_tmp_dir=args.oneflow_test_tmp_dir, + ) as agent: + if args.bash_script: + agent.run_bash_script_async(bash_script=args.bash_script,) + elif args.cmd: + agent.run_bash_script_async(cmd=args.cmd,) + agent.block() diff --git a/ci/test/excludelist b/ci/test/excludelist new file mode 100644 index 0000000000000000000000000000000000000000..662b06b0fbf5b535636089e54258dfcad5f890ad --- /dev/null +++ b/ci/test/excludelist @@ -0,0 +1,222 @@ +# This file lists libraries that we will assume to be present on the host system and hence +# should NOT be bundled inside AppImages. This is a working document; expect it to change +# over time. File format: one filename per line. Each entry should have a justification comment. + +# See the useful tool at https://abi-laboratory.pro/index.php?view=navigator&symbol=hb_buffer_set_cluster_level#result +# to investigate issues with missing symbols. + +ld-linux.so.2 +ld-linux-x86-64.so.2 +libanl.so.1 +libBrokenLocale.so.1 +libcidn.so.1 +# libcrypt.so.1 # Not part of glibc anymore as of Fedora 30. See https://github.com/slic3r/Slic3r/issues/4798 and https://pagure.io/fedora-docs/release-notes/c/01d74b33564faa42959c035e1eee286940e9170e?branch=f28 +libc.so.6 +libdl.so.2 +libm.so.6 +libmvec.so.1 +# libnsl.so.1 # Not part of glibc anymore as of Fedora 28. See https://github.com/RPCS3/rpcs3/issues/5224#issuecomment-434930594 +libnss_compat.so.2 +# libnss_db.so.2 # Not part of neon-useredition-20190321-0530-amd64.iso +libnss_dns.so.2 +libnss_files.so.2 +libnss_hesiod.so.2 +libnss_nisplus.so.2 +libnss_nis.so.2 +libpthread.so.0 +libresolv.so.2 +librt.so.1 +libthread_db.so.1 +libutil.so.1 +# These files are all part of the GNU C Library which should never be bundled. +# List was generated from a fresh build of glibc 2.25. + +libstdc++.so.6 +# Workaround for: +# usr/lib/libstdc++.so.6: version `GLIBCXX_3.4.21' not found + +libGL.so.1 +# The above may be missing on Chrome OS, https://www.reddit.com/r/Crostini/comments/d1lp67/ultimaker_cura_no_longer_running_as_an_appimage/ +libEGL.so.1 +# Part of the video driver (OpenGL); present on any regular +# desktop system, may also be provided by proprietary drivers. +# Known to cause issues if it's bundled. 
+
+libGLdispatch.so.0
+libGLX.so.0
+# reported to be superfluous and to conflict with system libraries (graphics driver)
+# see https://github.com/linuxdeploy/linuxdeploy/issues/89
+
+libOpenGL.so.0
+# Qt installed via install-qt.sh apparently links to this library
+# part of OpenGL like libGL/libEGL, so excluding it should not cause any problems
+# https://github.com/linuxdeploy/linuxdeploy/issues/152
+
+libdrm.so.2
+# Workaround for:
+# Antergos Linux release 2015.11 (ISO-Rolling)
+# /usr/lib/libdrm_amdgpu.so.1: error: symbol lookup error: undefined symbol: drmGetNodeTypeFromFd (fatal)
+# libGL error: unable to load driver: swrast_dri.so
+# libGL error: failed to load driver: swrast
+# Unrecognized OpenGL version
+
+libglapi.so.0
+# Part of mesa
+# known to cause problems with graphics, see https://github.com/RPCS3/rpcs3/issues/4427#issuecomment-381674910
+
+libgbm.so.1
+# Part of mesa
+# https://github.com/probonopd/linuxdeployqt/issues/390#issuecomment-529036305
+
+libxcb.so.1
+# Workaround for:
+# Fedora 23
+# symbol lookup error: /lib64/libxcb-dri3.so.0: undefined symbol: xcb_send_fd
+# Uncertain if this is required to be bundled for some distributions - if so we need to write a version check script and use LD_PRELOAD to load the system version if it is newer
+# Fedora 25:
+# undefined symbol: xcb_send_request_with_fds
+# https://github.com/AppImage/AppImages/issues/128
+
+libX11.so.6
+# Workaround for:
+# Fedora 23
+# symbol lookup error: ./lib/libX11.so.6: undefined symbol: xcb_wait_for_reply64
+# Uncertain if this is required to be bundled for some distributions - if so we need to write a version check script and use LD_PRELOAD to load the system version if it is newer
+
+libgio-2.0.so.0
+# Workaround for:
+# On Ubuntu, "symbol lookup error: /usr/lib/x86_64-linux-gnu/gtk-2.0/modules/liboverlay-scrollbar.so: undefined symbol: g_settings_new"
+
+# libgdk-x11-2.0.so.0 # Missing on openSUSE-Tumbleweed-KDE-Live-x86_64-Snapshot20170601-Media.iso
+# libgtk-x11-2.0.so.0 # Missing on openSUSE-Tumbleweed-KDE-Live-x86_64-Snapshot20170601-Media.iso
+
+libasound.so.2
+# Workaround for:
+# No sound, e.g., in VLC.AppImage (does not find sound cards)
+
+# https://github.com/AppImage/pkg2appimage/issues/475
+# libgdk_pixbuf-2.0.so.0
+# Was: Workaround for:
+# On Ubuntu, get (inkscape:25621): GdkPixbuf-WARNING **: Error loading XPM image loader: Image type 'xpm' is not supported
+
+libfontconfig.so.1
+# Workaround for:
+# Application stalls when loading fonts during application launch; e.g., KiCad on ubuntu-mate
+
+libthai.so.0
+# Workaround for:
+# audacity: /tmp/.mount_AudaciUsFbON/usr/lib/libthai.so.0: version `LIBTHAI_0.1.25' not found (required by /usr/lib64/libpango-1.0.so.0)
+# on openSUSE Tumbleweed
+
+# other "low-level" font rendering libraries
+# should fix https://github.com/probonopd/linuxdeployqt/issues/261#issuecomment-377522251
+# and https://github.com/probonopd/linuxdeployqt/issues/157#issuecomment-320755694
+libfreetype.so.6
+libharfbuzz.so.0
+
+# Note, after discussion we do not exclude this, but we can use a dummy library that just does nothing
+# libselinux.so.1
+# Workaround for:
+# sed: error while loading shared libraries: libpcre.so.3: cannot open shared object file: No such file or directory
+# Some distributions, such as Arch Linux, do not come with libselinux.so.1 by default. 
+# The solution is to bundle a dummy mock library:
+# echo "extern int is_selinux_enabled(void){return 0;}" >> selinux-mock.c
+# gcc -s -shared -o libselinux.so.1 -Wl,-soname,libselinux.so.1 selinux-mock.c
+# strip libselinux.so.1
+# More information: https://github.com/AppImage/AppImages/issues/83
+# and https://github.com/AppImage/AppImageKit/issues/775#issuecomment-614954821
+# https://gitlab.com/sulinos/devel/libselinux-dummy
+
+# The following are assumed to be part of the base system
+# Removing these has worked e.g., for Krita. Feel free to report if
+# you think that some of these should go into AppImages and why.
+libcom_err.so.2
+libexpat.so.1
+libgcc_s.so.1
+libglib-2.0.so.0
+libgpg-error.so.0
+# libgssapi_krb5.so.2 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there
+# libgssapi.so.3 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
+# libhcrypto.so.4 # Missing on openSUSE LEAP 42.0
+# libheimbase.so.1 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
+# libheimntlm.so.0 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
+# libhx509.so.5 # Missing on openSUSE LEAP 42.0
+libICE.so.6
+# libidn.so.11 # Does not come with Solus by default
+# libk5crypto.so.3 # Running AppImage built on Debian 9 or Ubuntu 16.04 on an Archlinux fails otherwise; https://github.com/AppImage/AppImages/issues/301
+# libkeyutils.so.1 # Does not come with Void Linux by default; https://github.com/Subsurface-divelog/subsurface/issues/1971#issuecomment-466606834
+# libkrb5.so.26 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there. Missing on openSUSE LEAP 42.0
+# libkrb5.so.3 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there
+# libkrb5support.so.0 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there
+libp11-kit.so.0
+# libpcre.so.3 # Missing on Fedora 24, SLED 12 SP1, and openSUSE Leap 42.2
+# libroken.so.18 # Missing on openSUSE LEAP 42.0
+# libsasl2.so.2 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
+libSM.so.6
+libusb-1.0.so.0
+libuuid.so.1
+# libwind.so.0 # Missing on openSUSE LEAP 42.0
+
+# Potentially dangerous libraries
+libgobject-2.0.so.0
+
+# Workaround for:
+# Rectangles instead of fonts
+# https://github.com/AppImage/AppImages/issues/240
+libpangoft2-1.0.so.0
+libpangocairo-1.0.so.0
+libpango-1.0.so.0
+
+# FIXME:
+# Can get symbol lookup error: /lib64/libpango-1.0.so.0: undefined symbol: g_log_structured_standard
+# if libcairo is bundled but libpango is not
+
+# Workaround for:
+# e.g., Spotify
+# relocation error: /lib/x86_64-linux-gnu/libgcrypt.so.20:
+# symbol gpgrt_lock_lock, version GPG_ERROR_1.0 not defined
+# in file libgpg-error.so.0 with link time reference
+libgpg-error.so.0
+
+libjack.so.0
+# it must match the ABI of the JACK server which is installed in the base system
+# rncbc confirmed this
+# However, this library is missing on Fedora-WS-Live-31-1-9
+# which means that we should avoid using JACK altogether if possible
+
+# Unsolved issue:
+# https://github.com/probonopd/linuxdeployqt/issues/35
+# Error initializing NSS with a persistent database (sql:/home/me/.pki/nssdb): libsoftokn3.so: cannot open shared object file: No such file or directory
+# Error initializing NSS without a persistent database: NSS error code: -5925
+# nss_error=-5925, os_error=0
+# libnss3.so should not be removed from the bundles, as this causes other issues, e.g.,
+# 
https://github.com/probonopd/linuxdeployqt/issues/35#issuecomment-256213517 +# and https://github.com/AppImage/AppImages/pull/114 +# libnss3.so + +# The following cannot be excluded, see +# https://github.com/AppImage/AppImages/commit/6c7473d8cdaaa2572248dcc53d7f617a577ade6b +# http://stackoverflow.com/questions/32644157/forcing-a-binary-to-use-a-specific-newer-version-of-a-shared-library-so +# libssl.so.1 +# libssl.so.1.0.0 +# libcrypto.so.1 +# libcrypto.so.1.0.0 + +# According to https://github.com/RicardoEPRodrigues/3Engine/issues/4#issuecomment-511598362 +# libGLEW is not tied to a specific GPU. It's linked against libGL.so.1 +# and that one is different depending on the installed driver. +# In fact libGLEW is changing its soversion very often, so you should always bundle libGLEW.so.2.0 + +# libglut.so.3 # to be confirmed + +libxcb-dri3.so.0 # https://github.com/AppImage/AppImages/issues/348 +libxcb-dri2.so.0 # https://github.com/probonopd/linuxdeployqt/issues/331#issuecomment-442276277 + +# If the next line turns out to cause issues, we will have to remove it again and find another solution +libfribidi.so.0 # https://github.com/olive-editor/olive/issues/221 and https://github.com/knapsu/plex-media-player-appimage/issues/14 + +# Workaround for: +# symbol lookup error: /lib/x86_64-linux-gnu/libgnutls.so.30: undefined symbol: __gmpz_limbs_write +# https://github.com/ONLYOFFICE/appimage-desktopeditors/issues/3 +# Apparently coreutils depends on it, so it should be safe to assume that it comes with every target system +libgmp.so.10 diff --git a/cmake/oneflow.cmake b/cmake/oneflow.cmake index f5404ec6f51ff3f4a0ca958435c1baefd0172a41..041856c9896ce9069cccfcb841660509a6beefc7 100644 --- a/cmake/oneflow.cmake +++ b/cmake/oneflow.cmake @@ -145,10 +145,6 @@ foreach(oneflow_single_file ${oneflow_all_src}) if(RPC_BACKEND MATCHES "GRPC") list(APPEND of_transport_test_cc ${oneflow_single_file}) endif() - elseif("${oneflow_single_file}" MATCHES "^${PROJECT_SOURCE_DIR}/oneflow/core/job/oneflow_worker.cpp$") - if (RPC_BACKEND MATCHES "GRPC") - list(APPEND of_main_cc ${oneflow_single_file}) - endif() elseif("${oneflow_single_file}" MATCHES "^${PROJECT_SOURCE_DIR}/oneflow/(core|user|xrt)/.*_test\\.cpp$") # test file list(APPEND of_all_test_cc ${oneflow_single_file}) diff --git a/oneflow/core/job/oneflow_worker.cpp b/oneflow/core/job/oneflow_worker.cpp deleted file mode 100644 index 44cc4d65634c27da1a31ecd7abbee1c2bfa802e8..0000000000000000000000000000000000000000 --- a/oneflow/core/job/oneflow_worker.cpp +++ /dev/null @@ -1,55 +0,0 @@ -/* -Copyright 2020 The OneFlow Authors. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ -#include "oneflow/core/common/maybe.h" -#include "oneflow/core/job/oneflow.h" -#include "oneflow/core/job/env_global_objects_scope.h" -#include "oneflow/core/job/session_global_objects_scope.h" -#include "oneflow/core/job/env.pb.h" -#include "oneflow/core/job/cluster.h" -#include "oneflow/core/job/cluster_instruction.h" -#include "oneflow/core/control/ctrl_client.h" -#include "oneflow/core/control/ctrl_server.h" -#include "oneflow/core/persistence/tee_persistent_log_stream.h" - -namespace oneflow { - -namespace { - -Maybe<void> Run(const std::string& env_proto_filepath) { - EnvProto env_proto; - ParseProtoFromTextFile(env_proto_filepath, &env_proto); - CHECK_ISNULL_OR_RETURN(Global<EnvGlobalObjectsScope>::Get()); - // Global<T>::New is not allowed to be called here - // because glog is not constructed yet and LOG(INFO) has bad bahavior - Global<EnvGlobalObjectsScope>::SetAllocated(new EnvGlobalObjectsScope()); - JUST(Global<EnvGlobalObjectsScope>::Get()->Init(env_proto)); - JUST(Cluster::WorkerLoop()); - Global<EnvGlobalObjectsScope>::Delete(); - return Maybe<void>::Ok(); -} - -} // namespace - -} // namespace oneflow - -DEFINE_string(env_proto, "", "EnvProto file path"); - -int main(int argc, char* argv[]) { - using namespace oneflow; - gflags::ParseCommandLineFlags(&argc, &argv, true); - CHECK_JUST(Run(FLAGS_env_proto)); - return 0; -} diff --git a/oneflow/python/deprecated/init_cluster_env.py b/oneflow/python/deprecated/init_cluster_env.py index fac9d6bcd8dd3bf6757dc7968860365e5a5f2df3..8d6c4b473cba468640f8048e3289a910f5f4c35d 100644 --- a/oneflow/python/deprecated/init_cluster_env.py +++ b/oneflow/python/deprecated/init_cluster_env.py @@ -29,108 +29,6 @@ from oneflow.python.oneflow_export import oneflow_export import subprocess -@oneflow_export("deprecated.init_worker") -def init_worker( - scp_binary: bool = True, - use_uuid: bool = True, - ssh_port=22, - bootstrap_conf_list=None, -) -> None: - assert type(env_util.default_env_proto) is EnvProto - env_util.defautl_env_proto_mutable = False - env_proto = env_util.default_env_proto - assert len(env_proto.machine) > 0 or len(bootstrap_conf_list) > 0 - assert type(scp_binary) is bool - assert type(use_uuid) is bool - oneflow_worker_path = os.getenv("ONEFLOW_WORKER_BIN") - assert oneflow_worker_path is not None, "please set env ONEFLOW_WORKER_BIN" - assert os.path.isfile( - oneflow_worker_path - ), "binary oneflow_worker not found, please check your environment variable ONEFLOW_WORKER_BIN, path: {}".format( - oneflow_worker_path - ) - global _temp_run_dir - if use_uuid: - assert scp_binary is True - _temp_run_dir = os.getenv("HOME") + "/oneflow_temp/" + str(uuid.uuid1()) - else: - _temp_run_dir = os.getenv("HOME") + "/oneflow_temp/no_uuid" - run_dir = _temp_run_dir - run_dir = os.path.abspath(os.path.expanduser(run_dir)) - if bootstrap_conf_list is None: - env_file = NamedTemporaryFile(delete=False) - if sys.version_info >= (3, 0): - env_file.write(pbtxt.MessageToString(env_proto).encode()) - else: - env_file.write(pbtxt.MessageToString(env_proto)) - env_file.close() - - for machine in env_proto.machine: - if machine.id == 0: - pass - else: - _SendBinaryAndConfig2Worker( - machine.addr, - oneflow_worker_path, - env_file.name, - run_dir, - scp_binary, - ssh_port, - ) - - os.remove(env_file.name) - else: - worker_env_proto = EnvProto() - worker_env_proto.CopyFrom(env_proto) - worker_env_proto.ClearField("ctrl_bootstrap_conf") - for bootstrap_conf in bootstrap_conf_list: - if bootstrap_conf.rank == 0: - continue - # set 
ctrl_bootstrap_conf of worker - assert bootstrap_conf.HasField("host") - worker_env_proto.ctrl_bootstrap_conf.CopyFrom(bootstrap_conf) - env_file = NamedTemporaryFile(delete=False) - if sys.version_info >= (3, 0): - env_file.write(pbtxt.MessageToString(worker_env_proto).encode()) - else: - env_file.write(pbtxt.MessageToString(worker_env_proto)) - env_file.close() - _SendBinaryAndConfig2Worker( - bootstrap_conf.host, - oneflow_worker_path, - env_file.name, - run_dir, - scp_binary, - ssh_port, - ) - os.remove(env_file.name) - - -@oneflow_export("deprecated.delete_worker") -def delete_worker(ssh_port=22) -> None: - ssh_port_arg = " -p {} ".format(ssh_port) - # assert env_util.env_proto_mutable == False - env_proto = env_util.default_env_proto - assert isinstance(env_proto, EnvProto) - global _temp_run_dir - assert _temp_run_dir != "" - for machine in env_proto.machine: - if machine.id == 0: - continue - ssh_prefix = ( - "ssh {} ".format(ssh_port_arg) - + getpass.getuser() - + "@" - + machine.addr - + " " - ) - if os.getenv("ONEFLOW_WORKER_KEEP_LOG"): - print("worker log kept at: {}".format(machine.addr), flush=True) - else: - _SystemCall(ssh_prefix + '"rm -r ' + _temp_run_dir + '"') - print("temp run dir removed at: {}".format(machine.addr), flush=True) - - @oneflow_export("deprecated.delete_worker_by_bootstrap") def delete_worker_by_bootstrap(ssh_port=22) -> None: ssh_port_arg = " -p {} ".format(ssh_port) @@ -166,63 +64,4 @@ def delete_worker_of_multi_process(run_dir) -> None: print("temp run dir removed at localhost:" + run_dir, flush=True) -def _SendBinaryAndConfig2Worker( - addr, oneflow_worker_path, env_proto_path, run_dir, scp_binary, ssh_port -): - ssh_port_arg = " -p {} ".format(ssh_port) - scp_port_arg = " -P {} ".format(ssh_port) - _SystemCall( - "ssh-copy-id {} -f ".format(ssh_port_arg) + getpass.getuser() + "@" + addr - ) - ssh_prefix = "ssh {}".format(ssh_port_arg) + getpass.getuser() + "@" + addr + " " - remote_file_prefix = " " + getpass.getuser() + "@" + addr + ":" - assert run_dir != "" - _SystemCall(ssh_prefix + '"mkdir -p ' + run_dir + '"') - if scp_binary: - _SystemCall( - "scp {}".format(scp_port_arg) - + oneflow_worker_path - + remote_file_prefix - + run_dir - + "/oneflow_worker" - ) - _SystemCall( - "scp {}".format(scp_port_arg) - + env_proto_path - + remote_file_prefix - + run_dir - + "/env.proto" - ) - oneflow_libibverbs_path = os.getenv("ONEFLOW_LIBIBVERBS_PATH") - libibverbs_env_str = "" - if oneflow_libibverbs_path: - libibverbs_env_str = "ONEFLOW_LIBIBVERBS_PATH=" + oneflow_libibverbs_path + " " - oneflow_cmd = ( - '"cd ' - + run_dir - + "; " - + libibverbs_env_str - + "nohup ./oneflow_worker -logtostderr=0 -log_dir=./log -v=0 -logbuflevel=-1 " - + "-env_proto=./env.proto " - + ' 1>/dev/null 2>&1 </dev/null & "' - ) - _SystemCall(ssh_prefix + oneflow_cmd) - proc = subprocess.Popen( - ssh_prefix + "ps aux", - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - shell=True, - ) - outs, errs = proc.communicate(timeout=5) - print(outs) - assert "oneflow_worker" in str(outs), "fail to start oneflow_worker" - print("oneflow worker initialized:", addr, flush=True) - - -def _SystemCall(cmd): - print(cmd, flush=True) - subprocess.check_call(cmd, shell=True) - - _temp_run_dir = "" diff --git a/oneflow/python/framework/unittest.py b/oneflow/python/framework/unittest.py index 784905412679800fd709fb8901714d37a65bf54f..0459f056c13aba62b92e029b37e465184b230871 100644 --- a/oneflow/python/framework/unittest.py +++ b/oneflow/python/framework/unittest.py @@ -17,9 
+17,7 @@ from __future__ import absolute_import import os import sys -import getpass import imp -import inspect import socket from contextlib import closing import uuid @@ -33,7 +31,6 @@ from oneflow.core.job.env_pb2 import EnvProto from oneflow.python.oneflow_export import oneflow_export from typing import Any, Dict, Callable import subprocess -import platform class _ClearDefaultSession(object): @@ -169,6 +166,55 @@ _unittest_env_initilized = False _unittest_worker_initilized = False +def worker_agent_port(): + port_txt = os.getenv("ONEFLOW_TEST_WORKER_AGENT_PORT") + if port_txt: + return int(port_txt) + else: + return None + + +def worker_agent_authkey(): + key = os.getenv("ONEFLOW_TEST_WORKER_AGENT_AUTHKEY") + assert key + return key + + +def use_worker_agent(): + return worker_agent_port() is not None + + +def cast(conn=None, cmd=None, msg=None): + cmd = "cast/" + cmd + print("[unittest]", f"[{cmd}]", msg) + conn.send(cmd.encode()) + conn.send(msg.encode()) + + +def call(conn=None, cmd=None, msg=None): + cmd = "call/" + cmd + print("[unittest]", f"[{cmd}]", msg) + conn.send(cmd.encode()) + msg_ = "" + if msg is not None: + msg_ = msg + conn.send(msg_.encode()) + return conn.recv().decode() + + +def launch_worker_via_agent(host=None, env_proto=None): + print("[unittest]", "launching worker via agent at", host) + from multiprocessing.connection import Client + + address = ("localhost", worker_agent_port()) + conn = Client(address, authkey=worker_agent_authkey().encode()) + cast(conn=conn, cmd="host", msg=host) + cast(conn=conn, cmd="env_proto", msg=pbtxt.MessageToString(env_proto)) + assert call(conn=conn, cmd="start_worker") == "ok" + print("[unittest]", "worker launched via agent at", host) + conn.close() + + @oneflow_export("unittest.TestCase") class TestCase(unittest.TestCase): def setUp(self): @@ -180,18 +226,20 @@ class TestCase(unittest.TestCase): master_port = os.getenv("ONEFLOW_TEST_MASTER_PORT") assert master_port, "env var ONEFLOW_TEST_MASTER_PORT not set" oneflow.env.ctrl_port(int(master_port)) + data_port = os.getenv("ONEFLOW_TEST_DATA_PORT") + if data_port: + oneflow.env.data_port(int(data_port)) if enable_init_by_host_list(): oneflow.env.machine(node_list()) data_port = os.getenv("ONEFLOW_TEST_DATA_PORT") - if data_port: - oneflow.env.data_port(int(data_port)) - ssh_port = os.getenv("ONEFLOW_TEST_SSH_PORT") print("initializing worker...") - oneflow.deprecated.init_worker( - scp_binary=True, use_uuid=True, ssh_port=int(ssh_port) - ) - atexit.register(oneflow.deprecated.delete_worker, ssh_port=ssh_port) - _unittest_worker_initilized = True + for machine in env_util.default_env_proto.machine: + if machine.id == 0: + pass + else: + launch_worker_via_agent( + host=machine.addr, env_proto=env_util.default_env_proto + ) else: ctrl_port = os.getenv("ONEFLOW_TEST_CTRL_PORT") config_rank_ctrl_port = -1 @@ -215,23 +263,19 @@ class TestCase(unittest.TestCase): config_rank_ctrl_port, config_node_size, ) - - data_port = os.getenv("ONEFLOW_TEST_DATA_PORT") - if data_port: - oneflow.env.data_port(int(data_port)) - - ssh_port = os.getenv("ONEFLOW_TEST_SSH_PORT") - print("initializing worker...") - oneflow.deprecated.init_worker( - scp_binary=True, - use_uuid=True, - ssh_port=int(ssh_port), - bootstrap_conf_list=bootstrap_conf_list, - ) - atexit.register( - oneflow.deprecated.delete_worker_by_bootstrap, ssh_port=ssh_port - ) - _unittest_worker_initilized = True + worker_env_proto = EnvProto() + worker_env_proto.CopyFrom(env_util.default_env_proto) + 
worker_env_proto.ClearField("ctrl_bootstrap_conf") + for bootstrap_conf in bootstrap_conf_list: + if bootstrap_conf.rank == 0: + continue + # set ctrl_bootstrap_conf of worker + assert bootstrap_conf.HasField("host") + worker_env_proto.ctrl_bootstrap_conf.CopyFrom(bootstrap_conf) + launch_worker_via_agent( + host=bootstrap_conf.host, env_proto=worker_env_proto + ) + _unittest_worker_initilized = True elif device_num() > 1 and enable_multi_process(): master_port = find_free_port() oneflow.env.ctrl_port(master_port)
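
Note on the new worker-launch mechanism: the patch drops the SSH-based bootstrap (sshd in the test image, copied .ssh directories, the oneflow_worker binary and its main() in oneflow/core/job/oneflow_worker.cpp) and replaces it with a multiprocessing.connection handshake. The test process (cast()/call() in oneflow/python/framework/unittest.py) hands each worker's EnvProto to a local DockerAgent (handle_cast()/handle_call() in ci/test/distributed_run.py), which rsyncs the proto to the remote host and runs "python3 -m oneflow --start_worker" inside the long-lived container. The sketch below reduces that handshake to one self-contained, runnable script; the authkey, host address, and env proto text are illustrative stand-ins, not values from the patch.

import threading
from multiprocessing.connection import Client, Listener

AUTHKEY = b"example-authkey"  # stand-in; the patch derives this from str(uuid.uuid4())

# Bind before starting the server thread so the client cannot race accept().
listener = Listener(("localhost", 0), authkey=AUTHKEY)

def agent_side():
    conn = listener.accept()
    # handle_cast(): expect "cast/<cmd>", then receive the pushed value.
    assert conn.recv().decode() == "cast/host"
    host = conn.recv().decode()
    assert conn.recv().decode() == "cast/env_proto"
    env_proto_txt = conn.recv().decode()
    # The real agent rsyncs env_proto_txt to the remote workspace here and
    # docker-execs "python3 -m oneflow --start_worker --env_proto=...".
    print("[docker agent] got env proto (%d bytes) for %s" % (len(env_proto_txt), host))
    # handle_call(): expect "call/<cmd>" plus a payload, then send the reply.
    assert conn.recv().decode() == "call/start_worker"
    conn.recv()  # call() always sends a payload, here an empty string
    conn.send("ok".encode())  # the reply that unblocks the test process
    conn.close()

t = threading.Thread(target=agent_side)
t.start()

# Client side, as in launch_worker_via_agent().
conn = Client(listener.address, authkey=AUTHKEY)
for cmd, msg in [
    ("cast/host", "192.168.1.12"),
    ("cast/env_proto", "machine { addr: '192.168.1.12' }"),
]:
    conn.send(cmd.encode())
    conn.send(msg.encode())
conn.send("call/start_worker".encode())
conn.send("".encode())
assert conn.recv().decode() == "ok"
conn.close()
t.join()
listener.close()

Because the agent socket is authkey-protected and local to the main node, worker launch stays on the host side: the containers need neither sshd nor copied private keys, which is what allows the patch to delete FIX_SSH_PERMISSION, the --make_dotssh flow, and the deprecated.init_worker path.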