From 4c7dae8f4a5b9a35e3692a417312cbd1da77871d Mon Sep 17 00:00:00 2001
From: daquexian <daquexian566@gmail.com>
Date: Tue, 3 Aug 2021 01:23:45 +0800
Subject: [PATCH] multi client multi machine test (#5685)

* add multi client multi machine test

Signed-off-by: daquexian <daquexian566@gmail.com>

* remove copy cores

Signed-off-by: daquexian <daquexian566@gmail.com>

* use discover in bash

Signed-off-by: daquexian <daquexian566@gmail.com>

* add tests in test.yml and refine

Signed-off-by: daquexian <daquexian566@gmail.com>

* move multi_client test files into test dir to reuse code

Signed-off-by: daquexian <daquexian566@gmail.com>

* delete distributed_run_multi_client.py and move impl in distributed_run.py

Signed-off-by: daquexian <daquexian566@gmail.com>

* if -> elif

Signed-off-by: daquexian <daquexian566@gmail.com>

* try three times and upload log

Signed-off-by: daquexian <daquexian566@gmail.com>

* add 'mode' arg in py

Signed-off-by: daquexian <daquexian566@gmail.com>

* auto format by CI

* remove --multi_client in yml

Signed-off-by: daquexian <daquexian566@gmail.com>

* skip distributed test in cpu

Signed-off-by: daquexian <daquexian566@gmail.com>

* use new test container

Signed-off-by: daquexian <daquexian566@gmail.com>

* add host key to all machines

Signed-off-by: daquexian <daquexian566@gmail.com>

* auto format by CI

* fix python version

Signed-off-by: daquexian <daquexian566@gmail.com>

* fix python version

Signed-off-by: daquexian <daquexian566@gmail.com>

Co-authored-by: oneflow-ci-bot <ci-bot@oneflow.org>
Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
---
 .github/workflows/test.yml                    |  66 ++++++---
 ci/test/2node_op_test_multi_client.sh         |  23 +++
 ci/test/distributed_run.py                    | 138 +++++++++++++-----
 .../generic_test_multi_client.sh}             |   0
 .../test_speed_multi_client.sh}               |   0
 python/oneflow/framework/unittest.py          |   5 +-
 python/oneflow/test/modules/test_allreduce.py |  21 ++-
 7 files changed, 190 insertions(+), 63 deletions(-)
 create mode 100755 ci/test/2node_op_test_multi_client.sh
 rename ci/{test_multi_client/generic_test.sh => test/generic_test_multi_client.sh} (100%)
 rename ci/{test_multi_client/test_speed.sh => test/test_speed_multi_client.sh} (100%)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 33e2e6167..cecd0bba2 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -422,37 +422,21 @@ jobs:
         continue-on-error: true
         id: distributed_try_1
         run: |
-          python3 ci/test/distributed_run.py --bash_script=ci/test/2node_op_test.sh --custom_img_tag=${{ env.image_name }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.6
+          python3 ci/test/distributed_run.py --mode=single_client --bash_script=ci/test/2node_op_test.sh --custom_img_tag=${{ env.image_name }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.6
       - name: Single client op test (distributed, 2nd try)
         timeout-minutes: 45
         if: matrix.test_suite == 'cuda' && steps.distributed_try_1.outcome=='failure'
         continue-on-error: true
         id: distributed_try_2
         run: |
-          python3 ci/test/distributed_run.py --bash_script=ci/test/2node_op_test.sh --custom_img_tag=${{ env.image_name }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.6
+          python3 ci/test/distributed_run.py --mode=single_client --bash_script=ci/test/2node_op_test.sh --custom_img_tag=${{ env.image_name }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.6
       - name: Single client op test (distributed, 3rd try)
         timeout-minutes: 45
         if: matrix.test_suite == 'cuda' && steps.distributed_try_2.outcome=='failure'
         continue-on-error: false
         id: distributed_try_3
         run: |
-          python3 ci/test/distributed_run.py --bash_script=ci/test/2node_op_test.sh --custom_img_tag=${{ env.image_name }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.6
-      - name: Upload log (distributed test)
-        if: always() && steps.distributed_try_3.outcome=='failure' && matrix.test_suite == 'cuda'
-        uses: ./.github/actions/upload_oss
-        with:
-          src_path: distributed-tmp
-          oss_dst_path: oss://oneflow-log/${{ github.repository }}/pr/${{ github.event.pull_request.number }}/${{github.run_id}}/distributed-tmp
-          oss_access_key_id: ${{ secrets.OSS_ACCESS_KEY_ID }}
-          oss_access_key_secret: ${{ secrets.OSS_ACCESS_KEY_SECRET }}
-          upload_core: ${{ contains(github.event.pull_request.labels.*.name, 'upload-core') }}
-      - name: Print backtrace (distributed test)
-        if: always() && steps.distributed_try_3.outcome=='failure' && matrix.test_suite == 'cuda'
-        run: |
-          set -x
-          docker run \
-            ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
-            ${image_name} bash ci/test/print_stack_from_core.sh python3 distributed-tmp
+          python3 ci/test/distributed_run.py --mode=single_client --bash_script=ci/test/2node_op_test.sh --custom_img_tag=${{ env.image_name }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.6
       - name: Doctest
         timeout-minutes: 45
         if: matrix.test_suite == 'cuda'
@@ -499,7 +483,41 @@ jobs:
             ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
             -e ONEFLOW_TEST_DIR=$PWD/python/oneflow/test/modules \
             ${{ env.image_tag }} \
-            bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test_multi_client/generic_test.sh"
+            bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/generic_test_multi_client.sh"
+      - name: Module API test (distributed, 1st try)
+        if: matrix.test_suite == 'cuda_new_interface'
+        continue-on-error: true
+        id: new_interface_distributed_try_1
+        run: |
+          python3 ci/test/distributed_run.py --bash_script ci/test/2node_op_test_multi_client.sh --copy_files python/oneflow/test/ --copy_files python/oneflow/test_utils/ --copy_files ci/test/ --custom_img_tag=${{ env.image_tag }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.7
+      - name: Module API test (distributed, 2nd try)
+        if: matrix.test_suite == 'cuda_new_interface' && steps.new_interface_distributed_try_1.outcome=='failure'
+        continue-on-error: true
+        id: new_interface_distributed_try_2
+        run: |
+          python3 ci/test/distributed_run.py --bash_script ci/test/2node_op_test_multi_client.sh --copy_files python/oneflow/test/ --copy_files python/oneflow/test_utils/ --copy_files ci/test/ --custom_img_tag=${{ env.image_tag }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.7
+      - name: Module API test (distributed, 3rd try)
+        if: matrix.test_suite == 'cuda_new_interface' && steps.new_interface_distributed_try_2.outcome=='failure'
+        continue-on-error: false
+        id: new_interface_distributed_try_3
+        run: |
+          python3 ci/test/distributed_run.py --bash_script ci/test/2node_op_test_multi_client.sh --copy_files python/oneflow/test/ --copy_files python/oneflow/test_utils/ --copy_files ci/test/ --custom_img_tag=${{ env.image_tag }} --oneflow_wheel_path=${{ env.wheelhouse_dir }} --oneflow_wheel_python_version=3.7
+      - name: Upload log (distributed test)
+        if: always() && (steps.distributed_try_3.outcome=='failure' && matrix.test_suite == 'cuda') || (steps.new_interface_distributed_try_3.outcome=='failure' && matrix.test_suite == 'cuda_new_interface')
+        uses: ./.github/actions/upload_oss
+        with:
+          src_path: distributed-tmp
+          oss_dst_path: oss://oneflow-log/${{ github.repository }}/pr/${{ github.event.pull_request.number }}/${{github.run_id}}/distributed-tmp
+          oss_access_key_id: ${{ secrets.OSS_ACCESS_KEY_ID }}
+          oss_access_key_secret: ${{ secrets.OSS_ACCESS_KEY_SECRET }}
+          upload_core: ${{ contains(github.event.pull_request.labels.*.name, 'upload-core') }}
+      - name: Print backtrace (distributed test)
+        if: always() && (steps.distributed_try_3.outcome=='failure' && matrix.test_suite == 'cuda') || (steps.new_interface_distributed_try_3.outcome=='failure' && matrix.test_suite == 'cuda_new_interface')
+        run: |
+          set -x
+          docker run \
+            ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
+            ${image_name} bash ci/test/print_stack_from_core.sh python3 distributed-tmp
       - name: Dataloader API test
         timeout-minutes: 45
         if: contains(fromJson('["cuda_new_interface", "cpu_new_interface"]'), matrix.test_suite)
@@ -508,7 +526,7 @@ jobs:
             ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
             -e ONEFLOW_TEST_DIR=$PWD/python/oneflow/test/dataloader \
             ${{ env.image_tag }} \
-            bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && python3 -m pip install -r docker/ci/test-v2/requirements.txt --user && bash ci/test/try_install.sh && bash ci/test_multi_client/generic_test.sh"
+            bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && python3 -m pip install -r docker/ci/test-v2/requirements.txt --user && bash ci/test/try_install.sh && bash ci/test/generic_test_multi_client.sh"
       - name: Tensor API test
         timeout-minutes: 45
         if: contains(fromJson('["cuda_new_interface", "cpu_new_interface"]'), matrix.test_suite)
@@ -517,7 +535,7 @@ jobs:
             ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
             -e ONEFLOW_TEST_DIR=$PWD/python/oneflow/test/tensor \
             ${{ env.image_tag }} \
-            bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test_multi_client/generic_test.sh"
+            bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/generic_test_multi_client.sh"
       - name: Graph API test
         if: contains(fromJson('["cuda_new_interface", "cpu_new_interface"]'), matrix.test_suite)
         run: |
@@ -525,7 +543,7 @@ jobs:
             ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
             -e ONEFLOW_TEST_DIR=$PWD/python/oneflow/test/graph \
             ${{ env.image_tag }} \
-            bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test_multi_client/generic_test.sh"
+            bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/generic_test_multi_client.sh"
       - name: Checkout Oneflow-Inc/models
         if: matrix.test_suite == 'cuda_new_interface'
         uses: actions/checkout@v2
@@ -541,7 +559,7 @@ jobs:
             ${{ env.extra_docker_args }} ${{ env.pip_cache_docker_args }} \
            -e ONEFLOW_MODELS_DIR=$PWD/oneflow-models \
             ${{ env.image_tag }} \
-            bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test_multi_client/test_speed.sh"
+            bash -c "python3 -m pip config set global.index-url ${{ env.pip_index_mirror }} && bash ci/test/try_install.sh && bash ci/test/test_speed_multi_client.sh"
       - name: Post speed stats
         if: matrix.test_suite == 'cuda_new_interface'
         continue-on-error: true
diff --git a/ci/test/2node_op_test_multi_client.sh b/ci/test/2node_op_test_multi_client.sh
new file mode 100755
index 000000000..33efe6c8c
--- /dev/null
+++ b/ci/test/2node_op_test_multi_client.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+set -xeu
+
+export PYTHONUNBUFFERED=1
+
+src_dir=${ONEFLOW_SRC_DIR:-"$PWD"}
+test_dir=${ONEFLOW_TEST_DIR:-"$PWD/python/oneflow/test/modules"}
+test_tmp_dir=${ONEFLOW_TEST_TMP_DIR:-"./test_tmp_dir"}
+export ONEFLOW_TEST_UTILS_DIR=$src_dir/python/oneflow/test_utils
+
+
+rm -rf $test_tmp_dir
+mkdir -p $test_tmp_dir
+cp -r $test_dir $test_tmp_dir
+cd ${test_tmp_dir}/$(basename $test_dir)
+
+for device_num in 1 2 4
+do
+    ONEFLOW_TEST_NODE_NUM=2 ONEFLOW_TEST_DEVICE_NUM=$device_num python3 -m oneflow.distributed.launch --nproc_per_node $device_num --nnodes=2 --node_rank=$NODE_RANK --master_addr 192.168.1.12 -m unittest discover ${PWD} --failfast --verbose
+    # use an invalid ibverbs lib to test whether falling back to epoll works
+    ONEFLOW_TEST_NODE_NUM=2 ONEFLOW_TEST_DEVICE_NUM=$device_num ONEFLOW_LIBIBVERBS_PATH=invalid_lib python3 -m oneflow.distributed.launch --nproc_per_node $device_num --nnodes=2 --node_rank=$NODE_RANK --master_addr 192.168.1.12 -m unittest discover ${PWD} --failfast --verbose
+done
diff --git a/ci/test/distributed_run.py b/ci/test/distributed_run.py
index 1f3e5f792..32e516b76 100644
--- a/ci/test/distributed_run.py
+++ b/ci/test/distributed_run.py
@@ -88,8 +88,21 @@ async def build_docker_img(remote_host=None, workspace_dir=None):
         await spawn_shell_and_check(f"bash docker/ci/test/build.sh")


-async def create_remote_workspace_dir(remote_host=None, workspace_dir=None):
+async def create_remote_workspace_dir(
+    remote_host=None, workspace_dir=None, copy_files=None
+):
     await spawn_shell_and_check(f"ssh {remote_host} mkdir -p {workspace_dir}")
+    if copy_files is not None:
+        for path in copy_files:
+            # Reference: https://stackoverflow.com/a/31278462
+            if os.path.isdir(path) and path[-1] != "/":
+                path += "/"
+            await spawn_shell_and_check(
+                f"ssh {remote_host} mkdir -p {workspace_dir}/{path}"
+            )
+            await spawn_shell_and_check(
+                f"rsync -azP --omit-dir-times --no-perms --no-group --copy-links --exclude='__pycache__' {path} {remote_host}:{workspace_dir}/{path}"
+            )
     print("create_remote_workspace_dir done")


@@ -101,6 +114,8 @@ async def launch_remote_container(
     img_tag=None,
     oneflow_wheel_path=None,
     oneflow_python_path=None,
+    cmd=None,
+    node_rank=None,
 ):
     print("launching remote container at", remote_host)
     assert img_tag
@@ -122,6 +137,14 @@ async def launch_remote_container(
     await spawn_shell(
         f"ssh {remote_host} docker exec {container_name} python3 -m oneflow --doctor"
     )
+    if cmd:
+        if node_rank is not None:
+            node_rank_args = f"--env NODE_RANK={node_rank}"
+        else:
+            node_rank_args = ""
+        await spawn_shell(
+            f"ssh {remote_host} docker exec {node_rank_args} {container_name} {cmd}"
+        )


 def handle_cast(conn=None, cmd=None):
@@ -397,8 +420,11 @@ if __name__ == "__main__":
         "--oneflow_test_tmp_dir", type=str, required=False, default="distributed-tmp"
     )
     parser.add_argument("--timeout", type=int, required=False, default=1 * 60 * 60)
+    parser.add_argument("--mode", type=str, required=False, default="multi_client")
+    parser.add_argument("--copy_files", action="append", default=[])
     args = parser.parse_args()
+    assert args.mode in ["multi_client", "single_client"]
     assert bool(args.oneflow_wheel_path) != bool(args.oneflow_python_path)
     assert bool(args.bash_script) != bool(args.cmd)
     if args.skip_libs:
@@ -406,6 +432,7 @@ if __name__ == "__main__":
         assert (
             args.oneflow_python_path
         ), "--skip_libs only works with --oneflow_python_path"
+
     oneflow_wheel_path = args.oneflow_wheel_path
     if oneflow_wheel_path and os.path.isdir(oneflow_wheel_path):
         whl_paths = [
@@ -449,12 +476,27 @@ if __name__ == "__main__":
         + "-distributed-run-main-node-at-"
         + this_host.replace(".", "-")
     )
+    if args.mode == "multi_client":
+        remote_hosts = [this_host] + remote_hosts
     loop = asyncio.get_event_loop()
+    # add host key to all machines (needed by ssh/scp/rsync)
+    loop.run_until_complete(
+        asyncio.gather(
+            *[
+                spawn_shell_and_check(
+                    f"ssh -o StrictHostKeyChecking=no {remote_host} true"
+                )
+                for remote_host in remote_hosts
+            ],
+        ),
+    )
     loop.run_until_complete(
         asyncio.gather(
             *[
                 create_remote_workspace_dir(
-                    remote_host=remote_host, workspace_dir=workspace_dir
+                    remote_host=remote_host,
+                    workspace_dir=workspace_dir,
+                    copy_files=args.copy_files,
                 )
                 for remote_host in remote_hosts
             ],
@@ -556,11 +598,14 @@ if __name__ == "__main__":
         )
     )
     print("copying artifacts")
+    extra_exclude_args = ""
+    for path in args.copy_files:
+        extra_exclude_args += f"--exclude='{path}' "
     loop.run_until_complete(
         asyncio.gather(
             *[
                 spawn_shell(
-                    f"rsync -azP --omit-dir-times --no-perms --no-group --exclude='*.whl' --exclude='oneflow_python' {remote_host}:{workspace_dir}/ {args.oneflow_test_tmp_dir}/{remote_host}"
+                    f"rsync -azP --omit-dir-times --no-perms --no-group --exclude='*.whl' --exclude='oneflow_python' {extra_exclude_args} {remote_host}:{workspace_dir}/ {args.oneflow_test_tmp_dir}/{remote_host}"
                )
                for remote_host in remote_hosts
            ]
@@ -585,37 +630,60 @@ if __name__ == "__main__":
         )
     )
     atexit.register(exit_handler)
-    loop.run_until_complete(
-        asyncio.gather(
-            *[
-                launch_remote_container(
-                    remote_host=remote_host,
-                    survival_time=args.timeout,
-                    workspace_dir=workspace_dir,
-                    container_name=container_name,
-                    oneflow_wheel_path=oneflow_wheel_path,
-                    oneflow_python_path=args.oneflow_python_path,
-                    img_tag=img_tag,
-                )
-                for remote_host in remote_hosts
-            ],
+    if args.mode == "multi_client":
+        if args.bash_script:
+            args.cmd = f"bash {args.bash_script}"
+        loop.run_until_complete(
+            asyncio.gather(
+                *[
+                    launch_remote_container(
+                        remote_host=remote_host,
+                        survival_time=args.timeout,
+                        workspace_dir=workspace_dir,
+                        container_name=container_name,
+                        oneflow_wheel_path=oneflow_wheel_path,
+                        oneflow_python_path=args.oneflow_python_path,
+                        img_tag=img_tag,
+                        cmd=args.cmd,
+                        node_rank=node_rank,
+                    )
+                    for node_rank, remote_host in enumerate(remote_hosts)
+                ],
+            )
+        )
+    else:
+        loop.run_until_complete(
+            asyncio.gather(
+                *[
+                    launch_remote_container(
+                        remote_host=remote_host,
+                        survival_time=args.timeout,
+                        workspace_dir=workspace_dir,
+                        container_name=container_name,
+                        oneflow_wheel_path=oneflow_wheel_path,
+                        oneflow_python_path=args.oneflow_python_path,
+                        img_tag=img_tag,
+                    )
+                    for remote_host in remote_hosts
+                ],
+            )
         )
-    )
-    with DockerAgent(
-        port=agent_port,
-        authkey=agent_authkey.encode(),
-        this_host=this_host,
-        remote_hosts=remote_hosts,
-        container_name=container_name,
-        workspace_dir=workspace_dir,
-        oneflow_wheel_path=oneflow_wheel_path,
-        oneflow_python_path=args.oneflow_python_path,
-        img_tag=img_tag,
-        oneflow_test_tmp_dir=args.oneflow_test_tmp_dir,
-    ) as agent:
-        if args.bash_script:
-            agent.run_bash_script_async(bash_script=args.bash_script,)
-        elif args.cmd:
-            agent.run_bash_script_async(cmd=args.cmd,)
-        agent.block()
+    if args.mode == "single_client":
+        with DockerAgent(
+            port=agent_port,
+            authkey=agent_authkey.encode(),
+            this_host=this_host,
+            remote_hosts=remote_hosts,
+            container_name=container_name,
+            workspace_dir=workspace_dir,
+            oneflow_wheel_path=oneflow_wheel_path,
+            oneflow_python_path=args.oneflow_python_path,
+            img_tag=img_tag,
+            oneflow_test_tmp_dir=args.oneflow_test_tmp_dir,
+        ) as agent:
+            if args.bash_script:
+                agent.run_bash_script_async(bash_script=args.bash_script,)
+            elif args.cmd:
+                agent.run_bash_script_async(cmd=args.cmd,)
+            agent.block()
diff --git a/ci/test_multi_client/generic_test.sh b/ci/test/generic_test_multi_client.sh
similarity index 100%
rename from ci/test_multi_client/generic_test.sh
rename to ci/test/generic_test_multi_client.sh
diff --git a/ci/test_multi_client/test_speed.sh b/ci/test/test_speed_multi_client.sh
similarity index 100%
rename from ci/test_multi_client/test_speed.sh
rename to ci/test/test_speed_multi_client.sh
diff --git a/python/oneflow/framework/unittest.py b/python/oneflow/framework/unittest.py
index fba066a64..e00053d2f 100644
--- a/python/oneflow/framework/unittest.py
+++ b/python/oneflow/framework/unittest.py
@@ -109,7 +109,10 @@ def has_node_list():


 def node_size():
-    if has_node_list():
+    node_num_from_env = os.getenv("ONEFLOW_TEST_NODE_NUM", None)
+    if node_num_from_env:
+        return int(node_num_from_env)
+    elif has_node_list():
         node_list_from_env = node_list()
         return len(node_list_from_env)
     else:
diff --git a/python/oneflow/test/modules/test_allreduce.py b/python/oneflow/test/modules/test_allreduce.py
index d09685243..b1baa752a 100644
--- a/python/oneflow/test/modules/test_allreduce.py
+++ b/python/oneflow/test/modules/test_allreduce.py
@@ -22,16 +22,16 @@ import oneflow as flow
 import oneflow.unittest


-@flow.unittest.skip_unless_1n2d()
 @unittest.skipIf(os.getenv("ONEFLOW_TEST_CPU_ONLY"), "only test cpu cases")
 class TestAllReduce(flow.unittest.TestCase):
+    @flow.unittest.skip_unless_1n2d()
     def test_all_reduce(test_case):
         arr_rank1 = np.array([1, 2])
         arr_rank2 = np.array([3, 4])
         if flow.distributed.get_rank() == 0:
-            x = flow.Tensor([1, 2])
+            x = flow.Tensor(arr_rank1)
         elif flow.distributed.get_rank() == 1:
-            x = flow.Tensor([3, 4])
+            x = flow.Tensor(arr_rank2)
         else:
             raise ValueError
         x = x.to(f"cuda:{flow.distributed.get_local_rank()}")
@@ -45,6 +45,21 @@ class TestAllReduce(flow.unittest.TestCase):
         y = nccl_allreduce_op(x)[0]
         test_case.assertTrue(np.allclose(y.numpy(), arr_rank1 + arr_rank2))

+    @flow.unittest.skip_unless_2n2d()
+    def test_all_reduce_2nodes(test_case):
+        np_arr = np.array([1, 2])
+        x = flow.Tensor(np_arr * (flow.distributed.get_rank() + 1))
+        x = x.to(f"cuda:{flow.distributed.get_local_rank()}")
+        nccl_allreduce_op = (
+            flow.builtin_op("eager_nccl_all_reduce")
+            .Input("in")
+            .Output("out")
+            .Attr("parallel_conf", f'device_tag: "gpu", device_name: "0-1:0-1"')
+            .Build()
+        )
+        y = nccl_allreduce_op(x)[0]
+        test_case.assertTrue(np.allclose(y.numpy(), np_arr * 10))
+

 if __name__ == "__main__":
     unittest.main()
-- 
GitLab
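
The multi-client path is now the default (--mode defaults to "multi_client"), so a two-machine run of the new module tests needs only the bash script plus the directories to sync. A minimal invocation might look like the sketch below; the image tag and wheelhouse directory are placeholders, not values taken from this patch or from CI:

    # image tag and wheel dir are placeholders for locally available ones
    python3 ci/test/distributed_run.py \
        --bash_script ci/test/2node_op_test_multi_client.sh \
        --copy_files python/oneflow/test/ \
        --copy_files python/oneflow/test_utils/ \
        --copy_files ci/test/ \
        --custom_img_tag oneflow-test:latest \
        --oneflow_wheel_path wheelhouse/ \
        --oneflow_wheel_python_version=3.7

In multi-client mode the launcher prepends this_host to remote_hosts, starts a container on every node with NODE_RANK exported into it, and runs the same command on all nodes; the single-client DockerAgent path is skipped entirely.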
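
The trailing-slash normalization in create_remote_workspace_dir is load-bearing: rsync treats "dir" and "dir/" differently. Without the slash the source directory itself is created inside the destination; with it only the directory's contents are copied, which keeps the remote workspace layout identical to the local one. A sketch with a placeholder host and workspace path:

    # "python/oneflow/test" (no slash) would create .../test/test remotely;
    # "python/oneflow/test/" syncs the contents into the directory that the
    # preceding "ssh ... mkdir -p" just created (remote-host and the
    # workspace path below are placeholders):
    rsync -azP --omit-dir-times --no-perms --no-group --copy-links \
        --exclude='__pycache__' \
        python/oneflow/test/ remote-host:workspace/python/oneflow/test/

The same --copy_files paths are later appended as --exclude patterns when artifacts are rsynced back, so the synced test sources are not re-collected as test output.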
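
The pieces are tied together by environment variables: distributed_run.py injects NODE_RANK via docker exec --env, 2node_op_test_multi_client.sh forwards it to oneflow.distributed.launch, and ONEFLOW_TEST_NODE_NUM makes the unittest helpers treat the run as a 2-node job even though ONEFLOW_TEST_NODE_LIST is unset. A rough way to check the override from a shell, assuming a OneFlow wheel with this patch is installed:

    ONEFLOW_TEST_NODE_NUM=2 python3 -c \
        "import oneflow.framework.unittest as u; print(u.node_size())"
    # expected to print 2; the int() cast in node_size() matters, since a
    # bare os.getenv() result is a string and the node-count comparison in
    # helpers like skip_unless_2n2d() would then skip every 2-node test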