From 37c63928dab947b61f5844c68cdf44da9248889c Mon Sep 17 00:00:00 2001
From: Shenghang Tsai <jackalcooper@gmail.com>
Date: Wed, 19 May 2021 18:06:30 +0800
Subject: [PATCH] Replace oneflow_worker with worker agent (#4900)

* naive impl

* refine

* add log

* add todo

* sync dynamic libs

* fix docker cmd

* fix rank

* add callbacks simple rpc

* fix

* fix conn

* support traditional mode

* rm

* refine todo

* rm unused

* rm todo

* revert

* fix order

* rename

* add comment

* add info

* add back some legacy code

* rm oneflow_worker exe

* rm log

* fix bug

* support --cmd

* add check

* fmt
---
 .github/workflows/test.yml                    |  12 +-
 ci/test/2node_op_test.sh                      |   3 +-
 ci/test/distributed_run.py                    | 686 +++++++++++++-----
 ci/test/excludelist                           | 222 ++++++
 cmake/oneflow.cmake                           |   4 -
 oneflow/core/job/oneflow_worker.cpp           |  55 --
 oneflow/python/deprecated/init_cluster_env.py | 161 ----
 oneflow/python/framework/unittest.py          | 100 ++-
 8 files changed, 795 insertions(+), 448 deletions(-)
 create mode 100644 ci/test/excludelist
 delete mode 100644 oneflow/core/job/oneflow_worker.cpp
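
Note on the new launch path: instead of scp-ing a prebuilt oneflow_worker binary to
each node and starting it over ssh, the test process now connects to a local
"docker agent" over multiprocessing.connection and streams it the serialized
EnvProto; the agent rsyncs the proto to each remote host and runs
"python3 -m oneflow --start_worker" inside an already-running container there.
A minimal sketch of the client-side message convention (mirroring the cast/call
helpers this patch adds to oneflow/python/framework/unittest.py; the port, authkey,
host, and proto text are placeholders):

    from multiprocessing.connection import Client

    def cast(conn, cmd, msg):
        # fire-and-forget: the "cast/" prefix lets the agent tell casts from calls
        conn.send(("cast/" + cmd).encode())
        conn.send(msg.encode())

    def call(conn, cmd, msg=""):
        # request/response: blocks until the agent answers, e.g. with "ok"
        conn.send(("call/" + cmd).encode())
        conn.send(msg.encode())
        return conn.recv().decode()

    env_proto_txt = "machine { }"  # textual EnvProto, placeholder
    conn = Client(("localhost", 12345), authkey=b"secret")
    cast(conn, "host", "192.168.1.12")
    cast(conn, "env_proto", env_proto_txt)
    assert call(conn, "start_worker") == "ok"
    conn.close()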

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 08d8424ed..e49eabe9a 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -347,11 +347,9 @@ jobs:
       - name: Op test (distributed)
         if: matrix.test_suite == 'cuda'
         run: |
-          python3 ci/test/distributed_run.py --make_dotssh
-          python3 ci/test/distributed_run.py --run --bash_script=ci/test/2node_op_test.sh \
-            --build_docker_img \
-            --oneflow_wheel_path=${wheelhouse_dir} \
-            --oneflow_worker_bin=${bin_dir}/oneflow_worker
+          python3 ci/test/distributed_run.py \
+            --bash_script=ci/test/2node_op_test.sh \
+            --oneflow_wheel_path=${wheelhouse_dir}
       - name: Print backtrace (distributed test)
         if: always() && matrix.test_suite == 'cuda'
         run: |
@@ -362,8 +360,8 @@ jobs:
         if: always() && matrix.test_suite == 'cuda'
         uses: ./.github/actions/upload_oss
         with:
-          src_path: oneflow_temp
-          oss_dst_path: oss://oneflow-log/${{ github.repository }}/pr/${{ github.event.pull_request.number }}/${{github.run_id}}/oneflow_temp
+          src_path: distributed-tmp
+          oss_dst_path: oss://oneflow-log/${{ github.repository }}/pr/${{ github.event.pull_request.number }}/${{github.run_id}}/distributed-tmp
           oss_access_key_id: ${{ secrets.OSS_ACCESS_KEY_ID }}
           oss_access_key_secret: ${{ secrets.OSS_ACCESS_KEY_SECRET }}
       - name: Dry run test (run without runtime)
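
The rewritten entry point needs only a bash script plus either a wheel or a build
directory. For local debugging it can be invoked directly; the flags below match
the argparse definitions in ci/test/distributed_run.py, while the wheel path and
remote host are illustrative:

    python3 ci/test/distributed_run.py \
        --bash_script=ci/test/2node_op_test.sh \
        --oneflow_wheel_path=wheelhouse/oneflow-0.4.0-cp36-manylinux2014_x86_64.whl \
        --remote_host=192.168.1.12 \
        --debug
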
diff --git a/ci/test/2node_op_test.sh b/ci/test/2node_op_test.sh
index 593713414..a914e9021 100644
--- a/ci/test/2node_op_test.sh
+++ b/ci/test/2node_op_test.sh
@@ -3,14 +3,13 @@ set -xe
 
 export PYTHONUNBUFFERED=1
 
-bash ci/test/try_install.sh
-
 src_dir=${ONEFLOW_SRC_DIR:-"$PWD"}
 test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"/test_tmp_dir"}
 
 
 rm -rf $test_tmp_dir
 mkdir -p $test_tmp_dir
+chmod -R o+w $test_tmp_dir
 cp -r $src_dir/oneflow/python/test $test_tmp_dir
 cd $test_tmp_dir
 
diff --git a/ci/test/distributed_run.py b/ci/test/distributed_run.py
index 796e9ac24..b806f9707 100644
--- a/ci/test/distributed_run.py
+++ b/ci/test/distributed_run.py
@@ -1,22 +1,17 @@
+from multiprocessing.connection import Listener
 import os
 import subprocess
 import socket
 import tempfile
 from contextlib import closing
-from subprocess import TimeoutExpired
 import argparse
 import uuid
-
-FIX_SSH_PERMISSION = """
-mkdir -p /run/sshd
-chown root ~/.ssh
-chmod 700 ~/.ssh
-chown root ~/.ssh/*
-chmod 600 ~/.ssh/*
-chmod 400 ~/.ssh/id_rsa
-chmod 400 ~/.ssh/id_rsa.pub
-chmod 600 ~/.ssh/config
-"""
+import getpass
+import atexit
+import pathlib
+import asyncio
+import glob
+import time
+from datetime import date
 
 HARD_CODED_AFFILIATIONS = {
     "192.168.1.11": ["192.168.1.12",],
@@ -27,6 +22,20 @@ HARD_CODED_AFFILIATIONS = {
 }
 
 
+def is_img_existing(tag):
+    returncode = subprocess.run(
+        "docker image inspect {}".format(tag),
+        shell=True,
+        stdout=subprocess.DEVNULL,
+        stderr=subprocess.DEVNULL,
+    ).returncode
+    if returncode == 0:
+        print("[OK]", tag)
+        return True
+    else:
+        return False
+
+
 def get_affiliations(host):
     # TODO(tsai): Implement a HTTP endpoint to retrieve affiliations
     if host in HARD_CODED_AFFILIATIONS:
@@ -50,239 +59,534 @@ def find_free_port():
         return s.getsockname()[1]
 
 
-def make_dotssh(dotssh_dir):
-    bash_cmd = f"""set -ex
-rm -rf /root/.ssh
-ssh-keygen -t rsa -N "" -f /root/.ssh/id_rsa
-cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys && \
-    chmod 600 /root/.ssh/authorized_keys
-/etc/init.d/ssh start && \
-    ssh-keyscan -H localhost >> /root/.ssh/known_hosts
+async def spawn_shell_and_check(cmd: str = None):
+    p = await asyncio.create_subprocess_shell(cmd,)
+    await p.wait()
+    assert p.returncode == 0, cmd
 
-cp -r /root/.ssh/* {dotssh_dir}
-chmod 777 {dotssh_dir}
-chmod 777 {dotssh_dir}/*
-"""
-    with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as f:
-        f_name = f.name
-        f.write(bash_cmd)
-        f.flush()
-        subprocess.check_call(
-            f"docker run --rm -v /tmp:/host/tmp -v {dotssh_dir}:{dotssh_dir} -w $PWD oneflow-test:$USER bash /host/{f_name}",
-            shell=True,
-        )
-    config_content = """Host *
-	StrictHostKeyChecking no
-"""
-    with open(os.path.join(dotssh_dir, "config"), "w") as f:
-        f.write(config_content)
+
+async def spawn_shell(cmd: str = None):
+    p = await asyncio.create_subprocess_shell(cmd,)
+    await p.wait()
 
 
-def build_docker_img(hostname=None, workspace_dir=None):
-    if hostname:
+async def build_docker_img(remote_host=None, workspace_dir=None):
+    if remote_host:
         assert workspace_dir
-        subprocess.check_call("rm -f > oneflow-src.zip", shell=True)
-        subprocess.check_call(
-            "git archive --format zip HEAD > oneflow-src.zip", shell=True
+        await spawn_shell_and_check("rm -f > oneflow-src.zip")
+        await spawn_shell_and_check("git archive --format zip HEAD > oneflow-src.zip")
+        await spawn_shell_and_check(
+            f"scp oneflow-src.zip {remote_host}:{workspace_dir}/oneflow-src.zip",
         )
-        subprocess.check_call(
-            f"scp oneflow-src.zip {hostname}:{workspace_dir}/oneflow-src.zip",
-            shell=True,
+        await spawn_shell_and_check(
+            f"ssh  {remote_host} unzip {workspace_dir}/oneflow-src.zip -d {workspace_dir}/oneflow-src",
         )
-        subprocess.check_call(
-            f"ssh  {hostname} unzip {workspace_dir}/oneflow-src.zip -d {workspace_dir}/oneflow-src",
-            shell=True,
-        )
-        subprocess.check_call(
-            f"ssh  {hostname} bash {workspace_dir}/oneflow-src/docker/ci/test/build.sh",
-            shell=True,
+        await spawn_shell_and_check(
+            f"ssh  {remote_host} bash {workspace_dir}/oneflow-src/docker/ci/test/build.sh",
         )
     else:
-        subprocess.check_call(f"bash docker/ci/test/build.sh", shell=True)
+        await spawn_shell_and_check(f"bash docker/ci/test/build.sh")
 
 
-def create_remote_workspace_dir(hostname, workspace_dir):
-    subprocess.check_call(f"ssh {hostname} mkdir -p {workspace_dir}", shell=True)
+async def create_remote_workspace_dir(remote_host=None, workspace_dir=None):
+    await spawn_shell_and_check(f"ssh {remote_host} mkdir -p {workspace_dir}")
+    print("create_remote_workspace_dir done")
 
 
-def launch_remote_container(
-    hostname, docker_ssh_port, survival_time, dotssh_dir, workspace_dir
+async def launch_remote_container(
+    remote_host=None,
+    survival_time=None,
+    workspace_dir=None,
+    container_name=None,
+    img_tag=None,
+    oneflow_wheel_path=None,
+    oneflow_build_path=None,
 ):
-    subprocess.check_call(
-        f"scp -r {dotssh_dir} {hostname}:{workspace_dir}/dotssh", shell=True
-    )
-    bash_cmd = f"""set -ex
-{FIX_SSH_PERMISSION}
-/usr/sbin/sshd -p {docker_ssh_port}
-sleep {survival_time}
-"""
-    with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as f:
-        f_name = f.name
-        f.write(bash_cmd)
-        f.flush()
-        subprocess.check_call(
-            f"scp {f_name} {hostname}:{workspace_dir}/launch_ssh_server.sh", shell=True,
-        )
-    docker_cmd = f"""docker run --privileged --cidfile {workspace_dir}/worker.cid --network host --shm-size=8g --rm -v {workspace_dir}/dotssh:/root/.ssh -v {workspace_dir}:{workspace_dir} -w {workspace_dir} -v /dataset:/dataset -v /model_zoo:/model_zoo oneflow-test:$USER bash launch_ssh_server.sh
+    print("launching remote container at", remote_host)
+    assert img_tag
+    pythonpath_args = None
+    if oneflow_wheel_path:
+        pythonpath_args = ""
+    elif oneflow_build_path:
+        pythonpath_args = f"--env PYTHONPATH={workspace_dir}/python_scripts"
+    else:
+        raise ValueError("must have oneflow_wheel_path or oneflow_build_path")
+    docker_cmd = f"""docker run --privileged -d --network host --shm-size=8g --rm -v {workspace_dir}:{workspace_dir} -w {workspace_dir} -v /dataset:/dataset -v /model_zoo:/model_zoo --name {container_name} {pythonpath_args} {img_tag} sleep {survival_time}
 """
-    ssh_cmd = f"ssh {hostname} {docker_cmd}"
-    print(ssh_cmd, flush=True)
-    proc = subprocess.Popen(ssh_cmd, shell=True,)
-    try:
-        proc.wait(timeout=10)
-        raise ValueError("sshd quit early, returncode:", proc.returncode)
-    except TimeoutExpired:
-        survival_time_min = survival_time / 60
-        survival_time_min = int(survival_time_min)
-        print(
-            f"remote container launched, host: {hostname}, ssh port: {docker_ssh_port}, .ssh dir: {dotssh_dir}, survival: {survival_time_min} mins",
-            flush=True,
+    await spawn_shell_and_check(f"ssh {remote_host} {docker_cmd}")
+    if oneflow_wheel_path:
+        whl_basename = os.path.basename(oneflow_wheel_path)
+        await spawn_shell_and_check(
+            f"ssh {remote_host} docker exec {container_name} python3 -m pip install {workspace_dir}/{whl_basename}"
         )
+    await spawn_shell(
+        f"ssh {remote_host} docker exec {container_name} python3 -m oneflow --doctor"
+    )
+
+
+def handle_cast(conn=None, cmd=None):
+    received_cmd: str = conn.recv().decode()
+    assert received_cmd.startswith("cast/")
+    received_cmd = received_cmd.replace("cast/", "")
+    assert received_cmd == cmd, (received_cmd, cmd)
+    return conn.recv().decode()
+
+
+def handle_call(conn=None, cmd=None, response=None):
+    received_cmd: str = conn.recv().decode()
+    assert received_cmd.startswith("call/")
+    received_cmd = received_cmd.replace("call/", "")
+    assert received_cmd == cmd, (received_cmd, cmd)
+    msg = conn.recv().decode()
+    conn.send(response.encode())
+    return msg
 
 
-def run_bash_script(
-    bash_script,
-    timeout,
-    ssh_port,
-    dotssh_dir,
-    this_host,
-    remote_host,
-    oneflow_worker_bin,
-    oneflow_wheel_path,
+def wait_for_env_proto_and_launch_workers(
+    agent_port=None,
+    agent_authkey=None,
+    remote_hosts=None,
+    workspace_dir=None,
+    container_name=None,
+):
-    assert os.path.exists(bash_script)
-    log_dir = "./unittest-log-" + str(uuid.uuid4())
-    ctrl_port = find_free_port()
-    data_port = find_free_port()
-    exports = f"""
+    listener = Listener(("localhost", agent_port), authkey=agent_authkey)
+    while True:
+        conn = listener.accept()
+        remote_docker_proc = {}
+        for remote_host in remote_hosts:
+            assert handle_cast(conn=conn, cmd="host"), remote_host
+            env_proto_txt = handle_cast(conn=conn, cmd="env_proto")
+            print("[docker agent]", f"[{remote_host}]", env_proto_txt)
+            f = tempfile.NamedTemporaryFile(mode="wb+", delete=True)
+            f.write(env_proto_txt.encode())
+            f.flush()
+            subprocess.check_call(
+                f"rsync -azP --omit-dir-times --no-perms --no-group {f.name} {remote_host}:{workspace_dir}/env.prototxt",
+                shell=True,
+            )
+            run_docker_cmd = f"ssh {remote_host} docker exec {container_name}"
+            run_docker_cmd += f" python3 -m oneflow --start_worker --env_proto={workspace_dir}/env.prototxt"
+            print("[docker agent]", run_docker_cmd)
+            remote_docker_proc[remote_host] = subprocess.Popen(
+                run_docker_cmd, shell=True
+            )
+            handle_call(conn=conn, cmd="start_worker", response="ok")
+        for k, v in remote_docker_proc.items():
+            assert v.wait() == 0
+
+
+class DockerAgent:
+    def __init__(
+        self,
+        port=None,
+        authkey=None,
+        this_host=None,
+        remote_hosts=None,
+        container_name=None,
+        timeout=None,
+        workspace_dir=None,
+        img_tag=None,
+        oneflow_wheel_path=None,
+        oneflow_build_path=None,
+        oneflow_test_tmp_dir=None,
+    ) -> None:
+        # info
+        self.this_host = this_host
+        self.remote_hosts = remote_hosts
+        self.container_name = container_name
+        self.timeout = timeout
+        self.common_docker_args = "--privileged --rm --network host --shm-size=8g -v $HOME:$HOME -v /dataset:/dataset -v /model_zoo:/model_zoo"
+        self.workspace_dir = workspace_dir
+        self.img_tag = img_tag
+        self.oneflow_wheel_path = oneflow_wheel_path
+        self.oneflow_build_path = oneflow_build_path
+        self.oneflow_test_tmp_dir = oneflow_test_tmp_dir
+        # impl
+        self.env_proto_txt = None
+        self.bash_tmp_file = None
+        self.bash_proc = None
+        self.remote_docker_proc = {}
+        self.agent_port = port
+        self.agent_authkey = authkey
+
+    def __enter__(self):
+        return self
+
+    def run_bash_script_async(self, bash_script=None, cmd=None):
+        remote_hosts_str = ",".join(self.remote_hosts)
+        ctrl_port = find_free_port()
+        data_port = find_free_port()
+        exports = f"""
 export ONEFLOW_TEST_MASTER_PORT={ctrl_port}
 export ONEFLOW_TEST_DATA_PORT={data_port}
-export ONEFLOW_TEST_SSH_PORT={ssh_port}
-export ONEFLOW_TEST_LOG_DIR={log_dir}
-export ONEFLOW_TEST_NODE_LIST="{this_host},{remote_host}"
+export ONEFLOW_TEST_NODE_LIST="{self.this_host},{remote_hosts_str}"
 export ONEFLOW_WORKER_KEEP_LOG=1
-export ONEFLOW_TEST_TMP_DIR="./distributed-tmp"
+export ONEFLOW_TEST_TMP_DIR="{self.oneflow_test_tmp_dir}"
 export NCCL_DEBUG=INFO
+export ONEFLOW_TEST_WORKER_AGENT_PORT={self.agent_port}
+export ONEFLOW_TEST_WORKER_AGENT_AUTHKEY={self.agent_authkey}
 """
-    if oneflow_worker_bin:
-        exports += f"export ONEFLOW_WORKER_BIN={oneflow_worker_bin}\n"
-    if oneflow_wheel_path:
-        exports += f"export ONEFLOW_WHEEL_PATH={oneflow_wheel_path}\n"
-    bash_cmd = f"""set -ex
+        if self.oneflow_wheel_path:
+            exports += f"python3 -m pip install {self.oneflow_wheel_path}"
+        if self.oneflow_build_path:
+            exports += f"export PYTHONPATH={self.oneflow_build_path}/python_scripts:$PYTHONPATH\n"
+        bash_cmd = None
+        if bash_script:
+            assert os.path.exists(bash_script)
+            bash_cmd = f"""set -ex
 {exports}
-rm -rf ~/.ssh
-cp -r /dotssh ~/.ssh
-{FIX_SSH_PERMISSION}
 bash {bash_script}
 """
-    artifact_cmd = f"""set -ex
+        elif cmd:
+            bash_cmd = f"""set -ex
 {exports}
-rm -rf ~/.ssh
-cp -r /dotssh ~/.ssh
-{FIX_SSH_PERMISSION}
-mkdir -p oneflow_temp
-rm -rf oneflow_temp/{remote_host}
-scp -P {ssh_port} -r {remote_host}:~/oneflow_temp oneflow_temp/{remote_host}
-rm -f oneflow_temp/{remote_host}/*/oneflow_worker
-chmod -R o+w oneflow_temp
-chmod -R o+r oneflow_temp
+{cmd}
 """
-    returncode = None
+        else:
+            raise ValueError("not impl")
+        assert bash_cmd
 
-    def get_docker_cmd(f, cmd):
-        f_name = f.name
-        print(cmd, flush=True)
-        f.write(cmd)
-        f.flush()
-        return f"docker run --privileged --network host --shm-size=8g --rm -v /tmp:/host/tmp -v $PWD:$PWD -v $HOME:$HOME -w $PWD -v {dotssh_dir}:/dotssh -v /dataset:/dataset -v /model_zoo:/model_zoo oneflow-test:$USER bash /host{f_name}"
+        def get_docker_cmd(f, cmd):
+            f_name = f.name
+            f.write(cmd)
+            f.flush()
+            return f"docker run {self.common_docker_args} -v /tmp:/host/tmp:ro -v $PWD:$PWD -w $PWD --name {self.container_name} {self.img_tag} bash /host{f_name}"
 
-    with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as f:
+        f = tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8", delete=True)
         run_docker_cmd = get_docker_cmd(f, bash_cmd)
-        returncode = subprocess.call(run_docker_cmd, shell=True, timeout=timeout)
+        self.bash_tmp_file = f
+        self.bash_proc = subprocess.Popen(run_docker_cmd, shell=True)
 
-    with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as f:
-        artifact_docker_cmd = get_docker_cmd(f, artifact_cmd)
-        subprocess.check_call(artifact_docker_cmd, shell=True, timeout=timeout)
+    def block(self):
+        from multiprocessing import Process
 
-    if returncode != 0:
-        raise ValueError(run_docker_cmd)
+        p = None
+        kwargs = {
+            "agent_port": self.agent_port,
+            "agent_authkey": self.agent_authkey,
+            "remote_hosts": self.remote_hosts,
+            "workspace_dir": self.workspace_dir,
+            "container_name": self.container_name,
+        }
+        p = Process(target=wait_for_env_proto_and_launch_workers, kwargs=kwargs,)
+        p.start()
+        print("[docker agent]", "blocking")
+        while self.bash_proc.poll() is None and p.is_alive():
+            time.sleep(1)  # avoid a busy-wait while polling both processes
+        p.terminate()
+        assert self.bash_proc.returncode == 0
+        print("[docker agent]", "bash execution done")
 
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        pass
 
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--launch_remote_container", action="store_true", required=False, default=False
+
+async def fix_and_sync_libs(oneflow_internal_path=None, remote_hosts=None):
+    tmp_dir = tempfile.TemporaryDirectory()
+    tmp_lib_dir = os.path.join(tmp_dir.name, "libs")
+    os.mkdir(tmp_lib_dir)
+    await spawn_shell_and_check(
+        """ldd file | grep "=> /" | awk '{print $3}' | xargs -I '{}' cp -v '{}' destination""".replace(
+            "file", oneflow_internal_path
+        ).replace(
+            "destination", tmp_lib_dir
+        ),
     )
-    parser.add_argument(
-        "--make_dotssh", action="store_true", required=False, default=False
+    libs = os.listdir(tmp_lib_dir)
+    assert len(libs) > 0
+    excludelist_path = os.path.join(
+        pathlib.Path(__file__).parent.absolute(), "excludelist"
     )
-    parser.add_argument("--run", action="store_true", required=False, default=False)
+    excludelist = [
+        # strip inline comments so entries like "libxcb-dri3.so.0 # url" still match
+        line.split("#")[0].strip()
+        for line in open(excludelist_path).read().split("\n")
+    ]
+    await spawn_shell_and_check(f"cp {oneflow_internal_path} {tmp_dir.name}")
+
+    def handle_lib(lib):
+        if lib in excludelist or "libpython" in lib:
+            print("excluding", lib)
+            return spawn_shell_and_check(f"rm {tmp_lib_dir}/{lib}")
+        else:
+            print("keeping", lib)
+            return spawn_shell_and_check(
+                f"patchelf --set-rpath '$ORIGIN' {tmp_lib_dir}/{lib}"
+            )
+
+    await asyncio.gather(*(handle_lib(lib) for lib in libs))
+
+    tmp_oneflow_internal_path = os.path.join(
+        tmp_dir.name, pathlib.Path(oneflow_internal_path).name
+    )
+    print("before fixing .so")
+    await spawn_shell_and_check(f"ldd {tmp_oneflow_internal_path}")
+    print("fixing .so")
+    await spawn_shell_and_check(
+        f"patchelf --set-rpath '$ORIGIN/libs' {tmp_oneflow_internal_path}"
+    )
+
+    await asyncio.gather(
+        *[
+            spawn_shell_and_check(
+                f"ssh {remote_host} 'mkdir -p {workspace_dir}/python_scripts/oneflow/libs'",
+            )
+            for remote_host in remote_hosts
+        ]
+    )
+
+    async def copy_file(path=None, remote_host=None):
+        relpath = os.path.relpath(path, tmp_dir.name)
+        await spawn_shell_and_check(
+            f"scp {path} {remote_host}:{workspace_dir}/python_scripts/oneflow/{relpath}",
+        )
+
+    files = [
+        os.path.join(root, name)
+        for root, dirs, files in os.walk(tmp_dir.name, topdown=True)
+        for name in files
+    ]
+
+    await asyncio.gather(
+        *[
+            copy_file(path=f, remote_host=remote_host)
+            for remote_host in remote_hosts
+            for f in files
+        ],
+        spawn_shell_and_check(f"ldd {tmp_oneflow_internal_path}"),
+    )
+
+
+def get_remote_hosts(args):
+    remote_hosts = None
+    if len(args.remote_host) == 1:
+        remote_hosts = args.remote_host[0].split(",")
+    elif len(args.remote_host) == 0:
+        affiliations = get_affiliations(this_host)
+        assert (
+            affiliations
+        ), f"no affiliated node found for {this_host}, you should specify one"
+        remote_host = affiliations[0]
+        remote_host = socket.gethostbyname(remote_host)
+        remote_hosts = [remote_host]
+    else:
+        remote_hosts = args.remote_host
+    return remote_hosts
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--debug", action="store_true", required=False, default=False)
     parser.add_argument(
-        "--build_docker_img", action="store_true", required=False, default=False
+        "--skip_libs", action="store_true", required=False, default=False
     )
     parser.add_argument("--bash_script", type=str, required=False)
     default_this_host = socket.gethostname()
     parser.add_argument(
         "--this_host", type=str, required=False, default=default_this_host
     )
-    parser.add_argument("--remote_host", type=str, required=False)
-    default_dotssh_dir = os.path.expanduser("~/distributed_run_dotssh")
+    parser.add_argument("--remote_host", action="append", default=[])
+    parser.add_argument("--oneflow_wheel_path", type=str, required=False, default=None)
+    parser.add_argument("--oneflow_build_path", type=str, required=False, default=None)
+    parser.add_argument("--custom_img_tag", type=str, required=False, default=None)
+    parser.add_argument("--cmd", type=str, required=False, default=None)
     parser.add_argument(
-        "--dotssh_dir", type=str, required=False, default=default_dotssh_dir
+        "--oneflow_test_tmp_dir", type=str, required=False, default="distributed-tmp"
     )
-    parser.add_argument("--oneflow_worker_bin", type=str, required=False, default=None)
-    parser.add_argument("--oneflow_wheel_path", type=str, required=False, default=None)
-    parser.add_argument("--ssh_port", type=int, required=False, default=None)
     parser.add_argument("--timeout", type=int, required=False, default=6 * 60 * 60)
     args = parser.parse_args()
 
-    ssh_port = None
-    if args.ssh_port:
-        ssh_port = args.ssh_port
-    else:
-        ssh_port = find_free_port()
-    assert ssh_port
-    if args.make_dotssh:
-        make_dotssh(args.dotssh_dir)
-
+    assert bool(args.oneflow_wheel_path) != bool(args.oneflow_build_path)
+    assert bool(args.bash_script) != bool(args.cmd)
+    if args.skip_libs:
+        assert args.debug, "--skip_libs only works with --debug"
+        assert (
+            args.oneflow_build_path
+        ), "--skip_libs only works with --oneflow_build_path"
+    oneflow_wheel_path = args.oneflow_wheel_path
+    if oneflow_wheel_path and os.path.isdir(oneflow_wheel_path):
+        whl_paths = [
+            name for name in glob.glob(os.path.join(oneflow_wheel_path, f"*.whl",))
+        ]
+        assert len(whl_paths) == 1
+        oneflow_wheel_path = whl_paths[0]
     this_host = args.this_host
     this_host = resolve_hostname_hardcoded(this_host)
 
-    remote_host = None
-    if args.remote_host:
-        assert len(args.remote_host.split(",")) == 1, "only support 2-nodes run for now"
-        remote_host = args.remote_host
-    else:
-        affiliations = get_affiliations(this_host)
-        assert (
-            affiliations
-        ), f"no affiliated node found for {this_host}, you should specify one"
-        remote_host = affiliations[0]
-        remote_host = socket.gethostbyname(remote_host)
+    remote_hosts = get_remote_hosts(args)
 
-    print(f"this_host: {this_host}, remote_host: {remote_host}", flush=True)
+    print(f"this_host: {this_host}, remote_hosts: {remote_hosts}", flush=True)
+    sub_dir = str(uuid.uuid4())
+    if args.debug:
+        sub_dir = "debug"
     workspace_dir = os.path.join(
-        os.path.expanduser("~"), "distributed_run_workspace", str(uuid.uuid4())
+        os.path.expanduser("~"), "distributed_run_workspace", sub_dir
     )
-    create_remote_workspace_dir(remote_host, workspace_dir)
-    if args.launch_remote_container:
-        launch_remote_container(remote_host, ssh_port, args.timeout, args.dotssh_dir)
-    if args.build_docker_img:
-        build_docker_img()
-        build_docker_img(remote_host, workspace_dir)
-    if args.run:
-        launch_remote_container(
-            remote_host, ssh_port, args.timeout, args.dotssh_dir, workspace_dir,
+    print("workspace_dir", workspace_dir)
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(
+        asyncio.gather(
+            *[
+                create_remote_workspace_dir(
+                    remote_host=remote_host, workspace_dir=workspace_dir
+                )
+                for remote_host in remote_hosts
+            ]
         )
-        assert args.bash_script
-        run_bash_script(
-            args.bash_script,
-            args.timeout,
-            ssh_port,
-            args.dotssh_dir,
-            this_host,
-            remote_host,
-            args.oneflow_worker_bin,
-            args.oneflow_wheel_path,
+    )
+    if args.oneflow_build_path:
+        so_paths = [
+            name
+            for name in glob.glob(
+                os.path.join(
+                    args.oneflow_build_path,
+                    f"python_scripts/oneflow/_oneflow_internal.*.so",
+                )
+            )
+        ]
+        assert len(so_paths) == 1, so_paths
+        oneflow_internal_path = so_paths[0]
-        exit(0)
+        tmp_dir = None
+        print("copying python_scripts dir")
+        loop.run_until_complete(
+            asyncio.gather(
+                *[
+                    spawn_shell_and_check(
+                        f"rsync -azP --omit-dir-times --no-perms --no-group --copy-links --include='*.py' --exclude='*.so' --exclude='__pycache__' --exclude='python_scripts/oneflow/include' --include='*/' --exclude='*' {args.oneflow_build_path}/python_scripts {remote_host}:{workspace_dir}"
+                    )
+                    for remote_host in remote_hosts
+                ]
+            )
+        )
+        if not args.skip_libs:
+            print("copying .so")
+            loop.run_until_complete(
+                fix_and_sync_libs(
+                    oneflow_internal_path=oneflow_internal_path,
+                    remote_hosts=remote_hosts,
+                )
+            )
+    elif oneflow_wheel_path:
+        loop.run_until_complete(
+            asyncio.gather(
+                *[
+                    spawn_shell_and_check(
+                        f"rsync -azP --omit-dir-times --no-perms --no-group {oneflow_wheel_path} {remote_host}:{workspace_dir}"
+                    )
+                    for remote_host in remote_hosts
+                ]
+            )
+        )
+    default_docker_image = "oneflow-test:$USER"
+    ci_user_docker_image = "oneflow-test:ci-user"
+    img_tag = None
+    if args.custom_img_tag is None:
+        if is_img_existing(default_docker_image):
+            img_tag = default_docker_image
+        elif is_img_existing(ci_user_docker_image):
+            img_tag = ci_user_docker_image
+        else:
+            loop.run_until_complete(
+                asyncio.gather(
+                    *[
+                        build_docker_img(
+                            remote_host=remote_host, workspace_dir=workspace_dir
+                        )
+                        for remote_host in remote_hosts
+                    ],
+                    build_docker_img(workspace_dir=workspace_dir),
+                )
+            )
+            img_tag = default_docker_image
+    else:
+        img_tag = args.custom_img_tag
+    assert img_tag
+    agent_port = find_free_port()
+    agent_authkey = str(uuid.uuid4())
+    container_name = (
+        getpass.getuser()
+        + "-distributed-run-main-node-at-"
+        + this_host.replace(".", "-")
+    )
+
+    def exit_handler():
+        print(
+            "--------- starting cleanup: ignore errors below and check the errors above ---------"
+        )
+        if args.oneflow_build_path:
+            print("fixing permission of", args.oneflow_build_path)
+            subprocess.call(
+                f"docker run --rm -v {args.oneflow_build_path}:/p -w /p busybox chmod -R o+w .",
+                shell=True,
+            )
+        loop.run_until_complete(
+            asyncio.gather(
+                *[
+                    spawn_shell(
+                        f"ssh {remote_host} docker run --rm -v {workspace_dir}:/p -w /p busybox chmod -R 777 .",
+                    )
+                    for remote_host in remote_hosts
+                ],
+            )
+        )
+        print("copying artifacts")
+        loop.run_until_complete(
+            asyncio.gather(
+                *[
+                    spawn_shell(
+                        f"rsync -azP --omit-dir-times --no-perms --no-group --exclude='*.whl' --exclude='python_scripts' {remote_host}:{workspace_dir}/ {args.oneflow_test_tmp_dir}/{remote_host}"
+                    )
+                    for remote_host in remote_hosts
+                ]
+            )
+        )
+        assert workspace_dir
+        if not args.debug:
+            loop.run_until_complete(
+                asyncio.gather(
+                    *[
+                        spawn_shell(f"ssh {remote_host} rm -rf {workspace_dir}",)
+                        for remote_host in remote_hosts
+                    ],
+                )
+            )
+        print("removing docker container:", container_name)
+        rm_cmd = f"docker rm -f {container_name}"
+        loop.run_until_complete(
+            asyncio.gather(
+                *[
+                    spawn_shell(f"ssh {remote_host} {rm_cmd}")
+                    for remote_host in remote_hosts
+                ],
+                spawn_shell(rm_cmd),
+            )
+        )
+
+    atexit.register(exit_handler)
+    loop.run_until_complete(
+        asyncio.gather(
+            *[
+                launch_remote_container(
+                    remote_host=remote_host,
+                    survival_time=args.timeout,
+                    workspace_dir=workspace_dir,
+                    container_name=container_name,
+                    oneflow_wheel_path=oneflow_wheel_path,
+                    oneflow_build_path=args.oneflow_build_path,
+                    img_tag=img_tag,
+                )
+                for remote_host in remote_hosts
+            ],
+        )
+    )
+
+    with DockerAgent(
+        port=agent_port,
+        authkey=agent_authkey.encode(),
+        this_host=this_host,
+        remote_hosts=remote_hosts,
+        container_name=container_name,
+        workspace_dir=workspace_dir,
+        oneflow_wheel_path=oneflow_wheel_path,
+        oneflow_build_path=args.oneflow_build_path,
+        img_tag=img_tag,
+        oneflow_test_tmp_dir=args.oneflow_test_tmp_dir,
+    ) as agent:
+        if args.bash_script:
+            agent.run_bash_script_async(bash_script=args.bash_script,)
+        elif args.cmd:
+            agent.run_bash_script_async(cmd=args.cmd,)
+        agent.block()
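
The handshake above in one place: unittest.py casts the host and the env proto,
then issues a blocking start_worker call, and wait_for_env_proto_and_launch_workers
answers it once the remote "docker exec" is underway. A condensed single-host
sketch of the agent side (port and authkey are placeholders; the real loop also
rsyncs the proto and launches the worker between the last two steps):

    from multiprocessing.connection import Listener

    with Listener(("localhost", 12345), authkey=b"secret") as listener:
        conn = listener.accept()
        assert conn.recv().decode() == "cast/host"
        host = conn.recv().decode()
        assert conn.recv().decode() == "cast/env_proto"
        env_proto_txt = conn.recv().decode()  # written out as env.prototxt
        assert conn.recv().decode() == "call/start_worker"
        conn.recv()                           # empty payload sent by call()
        conn.send(b"ok")                      # unblocks the test process
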
diff --git a/ci/test/excludelist b/ci/test/excludelist
new file mode 100644
index 000000000..662b06b0f
--- /dev/null
+++ b/ci/test/excludelist
@@ -0,0 +1,222 @@
+# This file lists libraries that we will assume to be present on the host system and hence
+# should NOT be bundled inside AppImages. This is a working document; expect it to change
+# over time. File format: one filename per line. Each entry should have a justification comment.
+
+# See the useful tool at https://abi-laboratory.pro/index.php?view=navigator&symbol=hb_buffer_set_cluster_level#result
+# to investigate issues with missing symbols.
+
+ld-linux.so.2
+ld-linux-x86-64.so.2
+libanl.so.1
+libBrokenLocale.so.1
+libcidn.so.1
+# libcrypt.so.1 # Not part of glibc anymore as of Fedora 30. See https://github.com/slic3r/Slic3r/issues/4798 and https://pagure.io/fedora-docs/release-notes/c/01d74b33564faa42959c035e1eee286940e9170e?branch=f28
+libc.so.6
+libdl.so.2
+libm.so.6
+libmvec.so.1
+# libnsl.so.1 # Not part of glibc anymore as of Fedora 28. See https://github.com/RPCS3/rpcs3/issues/5224#issuecomment-434930594
+libnss_compat.so.2
+# libnss_db.so.2 # Not part of neon-useredition-20190321-0530-amd64.iso
+libnss_dns.so.2
+libnss_files.so.2
+libnss_hesiod.so.2
+libnss_nisplus.so.2
+libnss_nis.so.2
+libpthread.so.0
+libresolv.so.2
+librt.so.1
+libthread_db.so.1
+libutil.so.1
+# These files are all part of the GNU C Library which should never be bundled.
+# List was generated from a fresh build of glibc 2.25.
+
+libstdc++.so.6
+# Workaround for:
+# usr/lib/libstdc++.so.6: version `GLIBCXX_3.4.21' not found
+
+libGL.so.1
+# The above may be missing on Chrome OS, https://www.reddit.com/r/Crostini/comments/d1lp67/ultimaker_cura_no_longer_running_as_an_appimage/
+libEGL.so.1
+# Part of the video driver (OpenGL); present on any regular
+# desktop system, may also be provided by proprietary drivers.
+# Known to cause issues if it's bundled.
+
+libGLdispatch.so.0
+libGLX.so.0
+# reported to be superfluous and conflicting system libraries (graphics driver)
+# see https://github.com/linuxdeploy/linuxdeploy/issues/89
+
+libOpenGL.so.0
+# Qt installed via install-qt.sh apparently links to this library
+# part of OpenGL like libGL/libEGL, so excluding it should not cause any problems
+# https://github.com/linuxdeploy/linuxdeploy/issues/152
+
+libdrm.so.2
+# Workaround for:
+# Antergos Linux release 2015.11 (ISO-Rolling)
+# /usr/lib/libdrm_amdgpu.so.1: error: symbol lookup error: undefined symbol: drmGetNodeTypeFromFd (fatal)
+# libGL error: unable to load driver: swrast_dri.so
+# libGL error: failed to load driver: swrast
+# Unrecognized OpenGL version
+
+libglapi.so.0
+# Part of mesa
+# known to cause problems with graphics, see https://github.com/RPCS3/rpcs3/issues/4427#issuecomment-381674910
+
+libgbm.so.1
+# Part of mesa
+# https://github.com/probonopd/linuxdeployqt/issues/390#issuecomment-529036305
+
+libxcb.so.1
+# Workaround for:
+# Fedora 23
+# symbol lookup error: /lib64/libxcb-dri3.so.0: undefined symbol: xcb_send_fd
+# Uncertain if this is required to be bundled for some distributions - if so we need to write a version check script and use LD_PRELOAD to load the system version if it is newer
+# Fedora 25:
+# undefined symbol: xcb_send_request_with_fds
+# https://github.com/AppImage/AppImages/issues/128
+
+libX11.so.6
+# Workaround for:
+# Fedora 23
+# symbol lookup error: ./lib/libX11.so.6: undefined symbol: xcb_wait_for_reply64
+# Uncertain if this is required to be bundled for some distributions - if so we need to write a version check script and use LD_PRELOAD to load the system version if it is newer
+
+libgio-2.0.so.0
+# Workaround for:
+# On Ubuntu, "symbol lookup error: /usr/lib/x86_64-linux-gnu/gtk-2.0/modules/liboverlay-scrollbar.so: undefined symbol: g_settings_new"
+
+# libgdk-x11-2.0.so.0 # Missing on openSUSE-Tumbleweed-KDE-Live-x86_64-Snapshot20170601-Media.iso
+# libgtk-x11-2.0.so.0 # Missing on openSUSE-Tumbleweed-KDE-Live-x86_64-Snapshot20170601-Media.iso
+
+libasound.so.2
+# Workaround for:
+# No sound, e.g., in VLC.AppImage (does not find sound cards)
+
+# https://github.com/AppImage/pkg2appimage/issues/475
+# libgdk_pixbuf-2.0.so.0
+# Was: Workaround for:
+# On Ubuntu, get (inkscape:25621): GdkPixbuf-WARNING **: Error loading XPM image loader: Image type 'xpm' is not supported
+
+libfontconfig.so.1
+# Workaround for:
+# Application stalls when loading fonts during application launch; e.g., KiCad on ubuntu-mate
+
+libthai.so.0
+# Workaround for:
+# audacity: /tmp/.mount_AudaciUsFbON/usr/lib/libthai.so.0: version `LIBTHAI_0.1.25' not found (required by /usr/lib64/libpango-1.0.so.0)
+# on openSUSE Tumbleweed
+
+# other "low-level" font rendering libraries
+# should fix https://github.com/probonopd/linuxdeployqt/issues/261#issuecomment-377522251
+# and https://github.com/probonopd/linuxdeployqt/issues/157#issuecomment-320755694
+libfreetype.so.6
+libharfbuzz.so.0
+
+# Note, after discussion we do not exclude this, but we can use a dummy library that just does nothing
+# libselinux.so.1
+# Workaround for:
+# sed: error while loading shared libraries: libpcre.so.3: cannot open shared object file: No such file or directory
+# Some distributions, such as Arch Linux, do not come with libselinux.so.1 by default.
+# The solution is to bundle a dummy mock library:
+# echo "extern int is_selinux_enabled(void){return 0;}" >> selinux-mock.c
+# gcc -s -shared -o libselinux.so.1 -Wl,-soname,libselinux.so.1 selinux-mock.c
+# strip libselinux.so.1
+# More information: https://github.com/AppImage/AppImages/issues/83
+# and https://github.com/AppImage/AppImageKit/issues/775#issuecomment-614954821
+# https://gitlab.com/sulinos/devel/libselinux-dummy
+
+# The following are assumed to be part of the base system
+# Removing these has worked e.g., for Krita. Feel free to report if
+# you think that some of these should go into AppImages and why.
+libcom_err.so.2
+libexpat.so.1
+libgcc_s.so.1
+libglib-2.0.so.0
+libgpg-error.so.0
+# libgssapi_krb5.so.2 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there
+# libgssapi.so.3 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
+# libhcrypto.so.4 # Missing on openSUSE LEAP 42.0
+# libheimbase.so.1 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
+# libheimntlm.so.0 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
+# libhx509.so.5 # Missing on openSUSE LEAP 42.0
+libICE.so.6
+# libidn.so.11 # Does not come with Solus by default
+# libk5crypto.so.3 # Running an AppImage built on Debian 9 or Ubuntu 16.04 on Arch Linux fails otherwise; https://github.com/AppImage/AppImages/issues/301
+# libkeyutils.so.1 # Does not come with Void Linux by default; https://github.com/Subsurface-divelog/subsurface/issues/1971#issuecomment-466606834
+# libkrb5.so.26 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there. Missing on openSUSE LEAP 42.0
+# libkrb5.so.3 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there
+# libkrb5support.so.0 # Disputed, seemingly needed by Arch Linux since Kerberos is named differently there
+libp11-kit.so.0
+# libpcre.so.3 # Missing on Fedora 24, SLED 12 SP1, and openSUSE Leap 42.2
+# libroken.so.18 # Missing on openSUSE LEAP 42.0
+# libsasl2.so.2 # Seemingly needed when running Ubuntu 14.04 binaries on Fedora 23
+libSM.so.6
+libusb-1.0.so.0
+libuuid.so.1
+# libwind.so.0 # Missing on openSUSE LEAP 42.0
+
+# Potentially dangerous libraries
+libgobject-2.0.so.0
+
+# Workaround for:
+# Rectangles instead of fonts
+# https://github.com/AppImage/AppImages/issues/240
+libpangoft2-1.0.so.0
+libpangocairo-1.0.so.0
+libpango-1.0.so.0
+
+# FIXME:
+# Can get symbol lookup error: /lib64/libpango-1.0.so.0: undefined symbol: g_log_structured_standard
+# if libcairo is bundled but libpango is not
+
+# Workaround for:
+# e.g., Spotify
+# relocation error: /lib/x86_64-linux-gnu/libgcrypt.so.20:
+# symbol gpgrt_lock_lock, version GPG_ERROR_1.0 not defined
+# in file libgpg-error.so.0 with link time reference
+libgpg-error.so.0
+
+libjack.so.0
+# it must match the ABI of the JACK server which is installed in the base system
+# rncbc confirmed this
+# However, this library is missing on Fedora-WS-Live-31-1-9
+# which means that we should avoid using JACK altogether if possible
+
+# Unsolved issue:
+# https://github.com/probonopd/linuxdeployqt/issues/35
+# Error initializing NSS with a persistent database (sql:/home/me/.pki/nssdb): libsoftokn3.so: cannot open shared object file: No such file or directory
+# Error initializing NSS without a persistent database: NSS error code: -5925
+# nss_error=-5925, os_error=0
+# libnss3.so should not be removed from the bundles, as this causes other issues, e.g.,
+# https://github.com/probonopd/linuxdeployqt/issues/35#issuecomment-256213517
+# and https://github.com/AppImage/AppImages/pull/114
+# libnss3.so
+
+# The following cannot be excluded, see
+# https://github.com/AppImage/AppImages/commit/6c7473d8cdaaa2572248dcc53d7f617a577ade6b
+# http://stackoverflow.com/questions/32644157/forcing-a-binary-to-use-a-specific-newer-version-of-a-shared-library-so
+# libssl.so.1
+# libssl.so.1.0.0
+# libcrypto.so.1
+# libcrypto.so.1.0.0
+
+# According to https://github.com/RicardoEPRodrigues/3Engine/issues/4#issuecomment-511598362
+# libGLEW is not tied to a specific GPU. It's linked against libGL.so.1
+# and that one is different depending on the installed driver.
+# In fact libGLEW is changing its soversion very often, so you should always bundle libGLEW.so.2.0
+
+# libglut.so.3 # to be confirmed
+
+libxcb-dri3.so.0 # https://github.com/AppImage/AppImages/issues/348
+libxcb-dri2.so.0 # https://github.com/probonopd/linuxdeployqt/issues/331#issuecomment-442276277
+
+# If the next line turns out to cause issues, we will have to remove it again and find another solution
+libfribidi.so.0 # https://github.com/olive-editor/olive/issues/221 and https://github.com/knapsu/plex-media-player-appimage/issues/14
+
+# Workaround for:
+# symbol lookup error: /lib/x86_64-linux-gnu/libgnutls.so.30: undefined symbol: __gmpz_limbs_write
+# https://github.com/ONLYOFFICE/appimage-desktopeditors/issues/3
+# Apparently coreutils depends on it, so it should be safe to assume that it comes with every target system
+libgmp.so.10
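
This list is consumed by fix_and_sync_libs in ci/test/distributed_run.py: every
ldd-resolved dependency of _oneflow_internal.*.so is copied next to the module and
rpath-patched with patchelf unless it appears here (or is a libpython). A
standalone sketch of that filter, handy for checking what would be bundled; the
.so path in the usage line is hypothetical:

    import pathlib
    import subprocess

    def bundled_deps(so_path, excludelist_path="ci/test/excludelist"):
        # same filter as fix_and_sync_libs: drop excluded libs and any libpython
        exclude = {
            line.split("#")[0].strip()
            for line in pathlib.Path(excludelist_path).read_text().splitlines()
        }
        out = subprocess.check_output(["ldd", so_path], text=True)
        deps = [line.split()[0] for line in out.splitlines() if "=> /" in line]
        return [d for d in deps if d not in exclude and "libpython" not in d]

    print(bundled_deps("python_scripts/oneflow/_oneflow_internal.so"))
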
diff --git a/cmake/oneflow.cmake b/cmake/oneflow.cmake
index f5404ec6f..041856c98 100644
--- a/cmake/oneflow.cmake
+++ b/cmake/oneflow.cmake
@@ -145,10 +145,6 @@ foreach(oneflow_single_file ${oneflow_all_src})
       if(RPC_BACKEND MATCHES "GRPC")
         list(APPEND of_transport_test_cc ${oneflow_single_file})
       endif()
-    elseif("${oneflow_single_file}" MATCHES "^${PROJECT_SOURCE_DIR}/oneflow/core/job/oneflow_worker.cpp$")
-      if (RPC_BACKEND MATCHES "GRPC")
-        list(APPEND of_main_cc ${oneflow_single_file})
-      endif()
     elseif("${oneflow_single_file}" MATCHES "^${PROJECT_SOURCE_DIR}/oneflow/(core|user|xrt)/.*_test\\.cpp$")
       # test file
       list(APPEND of_all_test_cc ${oneflow_single_file})
diff --git a/oneflow/core/job/oneflow_worker.cpp b/oneflow/core/job/oneflow_worker.cpp
deleted file mode 100644
index 44cc4d656..000000000
--- a/oneflow/core/job/oneflow_worker.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-Copyright 2020 The OneFlow Authors. All rights reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-#include "oneflow/core/common/maybe.h"
-#include "oneflow/core/job/oneflow.h"
-#include "oneflow/core/job/env_global_objects_scope.h"
-#include "oneflow/core/job/session_global_objects_scope.h"
-#include "oneflow/core/job/env.pb.h"
-#include "oneflow/core/job/cluster.h"
-#include "oneflow/core/job/cluster_instruction.h"
-#include "oneflow/core/control/ctrl_client.h"
-#include "oneflow/core/control/ctrl_server.h"
-#include "oneflow/core/persistence/tee_persistent_log_stream.h"
-
-namespace oneflow {
-
-namespace {
-
-Maybe<void> Run(const std::string& env_proto_filepath) {
-  EnvProto env_proto;
-  ParseProtoFromTextFile(env_proto_filepath, &env_proto);
-  CHECK_ISNULL_OR_RETURN(Global<EnvGlobalObjectsScope>::Get());
-  // Global<T>::New is not allowed to be called here
-  // because glog is not constructed yet and LOG(INFO) has bad bahavior
-  Global<EnvGlobalObjectsScope>::SetAllocated(new EnvGlobalObjectsScope());
-  JUST(Global<EnvGlobalObjectsScope>::Get()->Init(env_proto));
-  JUST(Cluster::WorkerLoop());
-  Global<EnvGlobalObjectsScope>::Delete();
-  return Maybe<void>::Ok();
-}
-
-}  // namespace
-
-}  // namespace oneflow
-
-DEFINE_string(env_proto, "", "EnvProto file path");
-
-int main(int argc, char* argv[]) {
-  using namespace oneflow;
-  gflags::ParseCommandLineFlags(&argc, &argv, true);
-  CHECK_JUST(Run(FLAGS_env_proto));
-  return 0;
-}
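
The deleted binary's job is now done by the Python module entry point: as seen in
wait_for_env_proto_and_launch_workers above, the agent starts each worker inside
the remote container with (path shown as a placeholder):

    python3 -m oneflow --start_worker --env_proto=/path/to/env.prototxt
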
diff --git a/oneflow/python/deprecated/init_cluster_env.py b/oneflow/python/deprecated/init_cluster_env.py
index fac9d6bcd..8d6c4b473 100644
--- a/oneflow/python/deprecated/init_cluster_env.py
+++ b/oneflow/python/deprecated/init_cluster_env.py
@@ -29,108 +29,6 @@ from oneflow.python.oneflow_export import oneflow_export
 import subprocess
 
 
-@oneflow_export("deprecated.init_worker")
-def init_worker(
-    scp_binary: bool = True,
-    use_uuid: bool = True,
-    ssh_port=22,
-    bootstrap_conf_list=None,
-) -> None:
-    assert type(env_util.default_env_proto) is EnvProto
-    env_util.defautl_env_proto_mutable = False
-    env_proto = env_util.default_env_proto
-    assert len(env_proto.machine) > 0 or len(bootstrap_conf_list) > 0
-    assert type(scp_binary) is bool
-    assert type(use_uuid) is bool
-    oneflow_worker_path = os.getenv("ONEFLOW_WORKER_BIN")
-    assert oneflow_worker_path is not None, "please set env ONEFLOW_WORKER_BIN"
-    assert os.path.isfile(
-        oneflow_worker_path
-    ), "binary oneflow_worker not found, please check your environment variable ONEFLOW_WORKER_BIN, path: {}".format(
-        oneflow_worker_path
-    )
-    global _temp_run_dir
-    if use_uuid:
-        assert scp_binary is True
-        _temp_run_dir = os.getenv("HOME") + "/oneflow_temp/" + str(uuid.uuid1())
-    else:
-        _temp_run_dir = os.getenv("HOME") + "/oneflow_temp/no_uuid"
-    run_dir = _temp_run_dir
-    run_dir = os.path.abspath(os.path.expanduser(run_dir))
-    if bootstrap_conf_list is None:
-        env_file = NamedTemporaryFile(delete=False)
-        if sys.version_info >= (3, 0):
-            env_file.write(pbtxt.MessageToString(env_proto).encode())
-        else:
-            env_file.write(pbtxt.MessageToString(env_proto))
-        env_file.close()
-
-        for machine in env_proto.machine:
-            if machine.id == 0:
-                pass
-            else:
-                _SendBinaryAndConfig2Worker(
-                    machine.addr,
-                    oneflow_worker_path,
-                    env_file.name,
-                    run_dir,
-                    scp_binary,
-                    ssh_port,
-                )
-
-        os.remove(env_file.name)
-    else:
-        worker_env_proto = EnvProto()
-        worker_env_proto.CopyFrom(env_proto)
-        worker_env_proto.ClearField("ctrl_bootstrap_conf")
-        for bootstrap_conf in bootstrap_conf_list:
-            if bootstrap_conf.rank == 0:
-                continue
-            # set ctrl_bootstrap_conf of worker
-            assert bootstrap_conf.HasField("host")
-            worker_env_proto.ctrl_bootstrap_conf.CopyFrom(bootstrap_conf)
-            env_file = NamedTemporaryFile(delete=False)
-            if sys.version_info >= (3, 0):
-                env_file.write(pbtxt.MessageToString(worker_env_proto).encode())
-            else:
-                env_file.write(pbtxt.MessageToString(worker_env_proto))
-            env_file.close()
-            _SendBinaryAndConfig2Worker(
-                bootstrap_conf.host,
-                oneflow_worker_path,
-                env_file.name,
-                run_dir,
-                scp_binary,
-                ssh_port,
-            )
-            os.remove(env_file.name)
-
-
-@oneflow_export("deprecated.delete_worker")
-def delete_worker(ssh_port=22) -> None:
-    ssh_port_arg = " -p {} ".format(ssh_port)
-    # assert env_util.env_proto_mutable == False
-    env_proto = env_util.default_env_proto
-    assert isinstance(env_proto, EnvProto)
-    global _temp_run_dir
-    assert _temp_run_dir != ""
-    for machine in env_proto.machine:
-        if machine.id == 0:
-            continue
-        ssh_prefix = (
-            "ssh {} ".format(ssh_port_arg)
-            + getpass.getuser()
-            + "@"
-            + machine.addr
-            + " "
-        )
-        if os.getenv("ONEFLOW_WORKER_KEEP_LOG"):
-            print("worker log kept at: {}".format(machine.addr), flush=True)
-        else:
-            _SystemCall(ssh_prefix + '"rm -r ' + _temp_run_dir + '"')
-            print("temp run dir removed at: {}".format(machine.addr), flush=True)
-
-
 @oneflow_export("deprecated.delete_worker_by_bootstrap")
 def delete_worker_by_bootstrap(ssh_port=22) -> None:
     ssh_port_arg = " -p {} ".format(ssh_port)
@@ -166,63 +64,4 @@ def delete_worker_of_multi_process(run_dir) -> None:
         print("temp run dir removed at localhost:" + run_dir, flush=True)
 
 
-def _SendBinaryAndConfig2Worker(
-    addr, oneflow_worker_path, env_proto_path, run_dir, scp_binary, ssh_port
-):
-    ssh_port_arg = " -p {} ".format(ssh_port)
-    scp_port_arg = " -P {} ".format(ssh_port)
-    _SystemCall(
-        "ssh-copy-id {} -f ".format(ssh_port_arg) + getpass.getuser() + "@" + addr
-    )
-    ssh_prefix = "ssh {}".format(ssh_port_arg) + getpass.getuser() + "@" + addr + " "
-    remote_file_prefix = " " + getpass.getuser() + "@" + addr + ":"
-    assert run_dir != ""
-    _SystemCall(ssh_prefix + '"mkdir -p ' + run_dir + '"')
-    if scp_binary:
-        _SystemCall(
-            "scp {}".format(scp_port_arg)
-            + oneflow_worker_path
-            + remote_file_prefix
-            + run_dir
-            + "/oneflow_worker"
-        )
-    _SystemCall(
-        "scp {}".format(scp_port_arg)
-        + env_proto_path
-        + remote_file_prefix
-        + run_dir
-        + "/env.proto"
-    )
-    oneflow_libibverbs_path = os.getenv("ONEFLOW_LIBIBVERBS_PATH")
-    libibverbs_env_str = ""
-    if oneflow_libibverbs_path:
-        libibverbs_env_str = "ONEFLOW_LIBIBVERBS_PATH=" + oneflow_libibverbs_path + " "
-    oneflow_cmd = (
-        '"cd '
-        + run_dir
-        + "; "
-        + libibverbs_env_str
-        + "nohup ./oneflow_worker -logtostderr=0 -log_dir=./log -v=0 -logbuflevel=-1 "
-        + "-env_proto=./env.proto "
-        + ' 1>/dev/null 2>&1 </dev/null & "'
-    )
-    _SystemCall(ssh_prefix + oneflow_cmd)
-    proc = subprocess.Popen(
-        ssh_prefix + "ps aux",
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        encoding="utf-8",
-        shell=True,
-    )
-    outs, errs = proc.communicate(timeout=5)
-    print(outs)
-    assert "oneflow_worker" in str(outs), "fail to start oneflow_worker"
-    print("oneflow worker initialized:", addr, flush=True)
-
-
-def _SystemCall(cmd):
-    print(cmd, flush=True)
-    subprocess.check_call(cmd, shell=True)
-
-
 _temp_run_dir = ""
diff --git a/oneflow/python/framework/unittest.py b/oneflow/python/framework/unittest.py
index 784905412..0459f056c 100644
--- a/oneflow/python/framework/unittest.py
+++ b/oneflow/python/framework/unittest.py
@@ -17,9 +17,7 @@ from __future__ import absolute_import
 
 import os
 import sys
-import getpass
 import imp
-import inspect
 import socket
 from contextlib import closing
 import uuid
@@ -33,7 +31,6 @@ from oneflow.core.job.env_pb2 import EnvProto
 from oneflow.python.oneflow_export import oneflow_export
 from typing import Any, Dict, Callable
 import subprocess
-import platform
 
 
 class _ClearDefaultSession(object):
@@ -169,6 +166,55 @@ _unittest_env_initilized = False
 _unittest_worker_initilized = False
 
 
+def worker_agent_port():
+    port_txt = os.getenv("ONEFLOW_TEST_WORKER_AGENT_PORT")
+    if port_txt:
+        return int(port_txt)
+    else:
+        return None
+
+
+def worker_agent_authkey():
+    key = os.getenv("ONEFLOW_TEST_WORKER_AGENT_AUTHKEY")
+    assert key
+    return key
+
+
+def use_worker_agent():
+    return worker_agent_port() is not None
+
+
+def cast(conn=None, cmd=None, msg=None):
+    cmd = "cast/" + cmd
+    print("[unittest]", f"[{cmd}]", msg)
+    conn.send(cmd.encode())
+    conn.send(msg.encode())
+
+
+def call(conn=None, cmd=None, msg=None):
+    cmd = "call/" + cmd
+    print("[unittest]", f"[{cmd}]", msg)
+    conn.send(cmd.encode())
+    msg_ = ""
+    if msg is not None:
+        msg_ = msg
+    conn.send(msg_.encode())
+    return conn.recv().decode()
+
+
+def launch_worker_via_agent(host=None, env_proto=None):
+    print("[unittest]", "launching worker via agent at", host)
+    from multiprocessing.connection import Client
+
+    address = ("localhost", worker_agent_port())
+    conn = Client(address, authkey=worker_agent_authkey().encode())
+    cast(conn=conn, cmd="host", msg=host)
+    cast(conn=conn, cmd="env_proto", msg=pbtxt.MessageToString(env_proto))
+    assert call(conn=conn, cmd="start_worker") == "ok"
+    print("[unittest]", "worker launched via agent at", host)
+    conn.close()
+
+
 @oneflow_export("unittest.TestCase")
 class TestCase(unittest.TestCase):
     def setUp(self):
@@ -180,18 +226,20 @@ class TestCase(unittest.TestCase):
                 master_port = os.getenv("ONEFLOW_TEST_MASTER_PORT")
                 assert master_port, "env var ONEFLOW_TEST_MASTER_PORT not set"
                 oneflow.env.ctrl_port(int(master_port))
+                data_port = os.getenv("ONEFLOW_TEST_DATA_PORT")
+                if data_port:
+                    oneflow.env.data_port(int(data_port))
                 if enable_init_by_host_list():
                     oneflow.env.machine(node_list())
                     data_port = os.getenv("ONEFLOW_TEST_DATA_PORT")
-                    if data_port:
-                        oneflow.env.data_port(int(data_port))
-                    ssh_port = os.getenv("ONEFLOW_TEST_SSH_PORT")
                     print("initializing worker...")
-                    oneflow.deprecated.init_worker(
-                        scp_binary=True, use_uuid=True, ssh_port=int(ssh_port)
-                    )
-                    atexit.register(oneflow.deprecated.delete_worker, ssh_port=ssh_port)
-                    _unittest_worker_initilized = True
+                    for machine in env_util.default_env_proto.machine:
+                        if machine.id == 0:
+                            continue
+                        launch_worker_via_agent(
+                            host=machine.addr, env_proto=env_util.default_env_proto
+                        )
                 else:
                     ctrl_port = os.getenv("ONEFLOW_TEST_CTRL_PORT")
                     config_rank_ctrl_port = -1
@@ -215,23 +263,19 @@ class TestCase(unittest.TestCase):
                         config_rank_ctrl_port,
                         config_node_size,
                     )
-
-                    data_port = os.getenv("ONEFLOW_TEST_DATA_PORT")
-                    if data_port:
-                        oneflow.env.data_port(int(data_port))
-
-                    ssh_port = os.getenv("ONEFLOW_TEST_SSH_PORT")
-                    print("initializing worker...")
-                    oneflow.deprecated.init_worker(
-                        scp_binary=True,
-                        use_uuid=True,
-                        ssh_port=int(ssh_port),
-                        bootstrap_conf_list=bootstrap_conf_list,
-                    )
-                    atexit.register(
-                        oneflow.deprecated.delete_worker_by_bootstrap, ssh_port=ssh_port
-                    )
-                    _unittest_worker_initilized = True
+                    worker_env_proto = EnvProto()
+                    worker_env_proto.CopyFrom(env_util.default_env_proto)
+                    worker_env_proto.ClearField("ctrl_bootstrap_conf")
+                    for bootstrap_conf in bootstrap_conf_list:
+                        if bootstrap_conf.rank == 0:
+                            continue
+                        # set ctrl_bootstrap_conf of worker
+                        assert bootstrap_conf.HasField("host")
+                        worker_env_proto.ctrl_bootstrap_conf.CopyFrom(bootstrap_conf)
+                        launch_worker_via_agent(
+                            host=bootstrap_conf.host, env_proto=worker_env_proto
+                        )
+                _unittest_worker_initilized = True
         elif device_num() > 1 and enable_multi_process():
             master_port = find_free_port()
             oneflow.env.ctrl_port(master_port)
-- 
GitLab