diff --git a/ci/test/2node_op_test.sh b/ci/test/2node_op_test.sh
index 992ccec57eef77714de7b9cee94ac25665521bc9..593713414506948a7de032230a658f5fe3c98033 100644
--- a/ci/test/2node_op_test.sh
+++ b/ci/test/2node_op_test.sh
@@ -17,6 +17,9 @@ cd $test_tmp_dir
ONEFLOW_TEST_DEVICE_NUM=1 python3 test/ops/test_assign.py --failfast --verbose
ONEFLOW_TEST_DEVICE_NUM=1 python3 test/ops/test_two_node_boxing.py --failfast --verbose
-ONEFLOW_TEST_ENABLE_INIT_BY_HOST_LIST=1 ONEFLOW_TEST_DEVICE_NUM=1 python3 -m unittest discover test/ops --failfast --verbose
-ONEFLOW_TEST_ENABLE_INIT_BY_HOST_LIST=1 ONEFLOW_TEST_DEVICE_NUM=2 python3 -m unittest discover test/ops --failfast --verbose
-ONEFLOW_TEST_ENABLE_INIT_BY_HOST_LIST=1 ONEFLOW_TEST_DEVICE_NUM=4 python3 -m unittest discover test/ops --failfast --verbose
+for device_num in 1 2 4
+do
+ ONEFLOW_TEST_ENABLE_INIT_BY_HOST_LIST=1 ONEFLOW_TEST_DEVICE_NUM=$device_num python3 -m unittest discover test/ops --failfast --verbose
+ # use an invalid ibverbs lib to test that falling back to epoll works
+ ONEFLOW_TEST_ENABLE_INIT_BY_HOST_LIST=1 ONEFLOW_TEST_DEVICE_NUM=$device_num ONEFLOW_LIBIBVERBS_PATH=invalid_lib python3 -m unittest discover test/ops --failfast --verbose
+done
diff --git a/cmake/third_party.cmake b/cmake/third_party.cmake
index 816375ea06a6d2541723e8c06687956df085b1c6..61120e76a299b0267e5b9eeb33205fdfc6fec2a6 100644
--- a/cmake/third_party.cmake
+++ b/cmake/third_party.cmake
@@ -219,16 +219,10 @@ if(BUILD_RDMA)
include(CheckIncludeFiles)
include(CheckLibraryExists)
CHECK_INCLUDE_FILES(infiniband/verbs.h HAVE_VERBS_H)
- CHECK_LIBRARY_EXISTS(ibverbs ibv_create_qp "" HAVE_IBVERBS)
- if(HAVE_VERBS_H AND HAVE_IBVERBS)
- list(APPEND oneflow_third_party_libs -libverbs)
+ if(HAVE_VERBS_H)
add_definitions(-DWITH_RDMA)
- elseif(HAVE_VERBS_H)
- message(FATAL_ERROR "RDMA library not found")
- elseif(HAVE_IBVERBS)
- message(FATAL_ERROR "RDMA head file not found")
else()
- message(FATAL_ERROR "RDMA library and head file not found")
+ message(FATAL_ERROR "RDMA head file not found")
endif()
else()
message(FATAL_ERROR "UNIMPLEMENTED")
diff --git a/oneflow/core/comm_network/ibverbs/ibverbs_comm_network.cpp b/oneflow/core/comm_network/ibverbs/ibverbs_comm_network.cpp
index 4a833cafd11889f529f3c9629c232e9006c62423..066dde1039fb8ee1854f5bcabd9887f241a89b38 100644
--- a/oneflow/core/comm_network/ibverbs/ibverbs_comm_network.cpp
+++ b/oneflow/core/comm_network/ibverbs/ibverbs_comm_network.cpp
@@ -18,6 +18,7 @@ limitations under the License.
#include "oneflow/core/control/global_process_ctx.h"
#include "oneflow/core/job/resource_desc.h"
#include "oneflow/core/job/global_for.h"
+#include "oneflow/core/dl/include/ibv.h"
#if defined(WITH_RDMA) && defined(OF_PLATFORM_POSIX)
@@ -34,7 +35,11 @@ std::string GenConnInfoKey(int64_t src_machine_id, int64_t dst_machine_id) {
}
void IBVForkInit() {
- if (ibv_fork_init() != 0) { LOG(ERROR) << "ibv_fork_init failed"; }
+ if (ibv::IsAvailable()) {
+ if (ibv::wrapper.ibv_fork_init() != 0) { std::cerr << "ibv_fork_init failed\n"; }
+ } else {
+ std::cerr << "libibverbs not available, ibv_fork_init skipped\n";
+ }
}
} // namespace
@@ -45,9 +50,9 @@ IBVerbsCommNet::~IBVerbsCommNet() {
for (IBVerbsQP* qp : qp_vec_) {
if (qp) { delete qp; }
}
- CHECK_EQ(ibv_destroy_cq(cq_), 0);
- CHECK_EQ(ibv_dealloc_pd(pd_), 0);
- CHECK_EQ(ibv_close_device(context_), 0);
+ CHECK_EQ(ibv::wrapper.ibv_destroy_cq(cq_), 0);
+ CHECK_EQ(ibv::wrapper.ibv_dealloc_pd(pd_), 0);
+ CHECK_EQ(ibv::wrapper.ibv_close_device(context_), 0);
}
void IBVerbsCommNet::RegisterMemoryDone() {
@@ -81,22 +86,22 @@ IBVerbsCommNet::IBVerbsCommNet()
: CommNetIf(),
token2mem_desc_(Global<ResourceDesc, ForEnv>::Get()->process_ranks().size()),
poll_exit_flag_(ATOMIC_FLAG_INIT) {
- ibv_device** device_list = ibv_get_device_list(nullptr);
+ ibv_device** device_list = ibv::wrapper.ibv_get_device_list(nullptr);
PCHECK(device_list);
ibv_device* device = device_list[0];
- context_ = ibv_open_device(device);
+ context_ = ibv::wrapper.ibv_open_device(device);
CHECK(context_);
- ibv_free_device_list(device_list);
- pd_ = ibv_alloc_pd(context_);
+ ibv::wrapper.ibv_free_device_list(device_list);
+ pd_ = ibv::wrapper.ibv_alloc_pd(context_);
CHECK(pd_);
ibv_device_attr device_attr{};
- CHECK_EQ(ibv_query_device(context_, &device_attr), 0);
- cq_ = ibv_create_cq(context_, device_attr.max_cqe, nullptr, nullptr, 0);
+ CHECK_EQ(ibv::wrapper.ibv_query_device(context_, &device_attr), 0);
+ cq_ = ibv::wrapper.ibv_create_cq(context_, device_attr.max_cqe, nullptr, nullptr, 0);
CHECK(cq_);
ibv_port_attr port_attr{};
- CHECK_EQ(ibv_query_port(context_, 1, &port_attr), 0);
+ CHECK_EQ(ibv::wrapper.ibv_query_port_wrap(context_, 1, &port_attr), 0);
ibv_gid gid{};
- CHECK_EQ(ibv_query_gid(context_, 1, 0, &gid), 0);
+ CHECK_EQ(ibv::wrapper.ibv_query_gid(context_, 1, 0, &gid), 0);
int64_t this_machine_id = GlobalProcessCtx::Rank();
qp_vec_.assign(Global<ResourceDesc, ForEnv>::Get()->process_ranks().size(), nullptr);
for (int64_t peer_id : peer_machine_id()) {
diff --git a/oneflow/core/comm_network/ibverbs/ibverbs_memory_desc.cpp b/oneflow/core/comm_network/ibverbs/ibverbs_memory_desc.cpp
index 54f0fbb8614414a41013cf85674818dd434cf39e..445b7dfa1fd9b1fea1592909b3d0b1ecc42a3c8a 100644
--- a/oneflow/core/comm_network/ibverbs/ibverbs_memory_desc.cpp
+++ b/oneflow/core/comm_network/ibverbs/ibverbs_memory_desc.cpp
@@ -16,6 +16,7 @@ limitations under the License.
#include "oneflow/core/comm_network/ibverbs/ibverbs_memory_desc.h"
#include "oneflow/core/job/resource_desc.h"
#include "oneflow/core/job/global_for.h"
+#include "oneflow/core/dl/include/ibv.h"
#if defined(WITH_RDMA) && defined(OF_PLATFORM_POSIX)
@@ -31,9 +32,9 @@ IBVerbsMemDesc::IBVerbsMemDesc(ibv_pd* pd, void* mem_ptr, size_t byte_size) {
while (byte_size > 0) {
size_t cur_size =
std::min<size_t>(byte_size, Global<ResourceDesc, ForSession>::Get()->rdma_mem_block_byte());
- ibv_mr* cur_mr =
- ibv_reg_mr(pd, ch_mem_ptr, cur_size,
- IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ);
+ ibv_mr* cur_mr = ibv::wrapper.ibv_reg_mr_wrap(
+ pd, ch_mem_ptr, cur_size,
+ IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ);
CHECK(cur_mr);
mr_vec_.push_back(cur_mr);
ibv_sge cur_sge{};
@@ -50,7 +51,7 @@ IBVerbsMemDesc::IBVerbsMemDesc(ibv_pd* pd, void* mem_ptr, size_t byte_size) {
}
IBVerbsMemDesc::~IBVerbsMemDesc() {
- for (ibv_mr* mr : mr_vec_) { CHECK_EQ(ibv_dereg_mr(mr), 0); }
+ for (ibv_mr* mr : mr_vec_) { CHECK_EQ(ibv::wrapper.ibv_dereg_mr(mr), 0); }
}
IBVerbsMemDescProto IBVerbsMemDesc::ToProto() {
diff --git a/oneflow/core/comm_network/ibverbs/ibverbs_qp.cpp b/oneflow/core/comm_network/ibverbs/ibverbs_qp.cpp
index 0e97889befe61c03977434ed7b2467d3336e4a6a..b0f175004da5398597ee0111191682d95f48be87 100644
--- a/oneflow/core/comm_network/ibverbs/ibverbs_qp.cpp
+++ b/oneflow/core/comm_network/ibverbs/ibverbs_qp.cpp
@@ -18,6 +18,7 @@ limitations under the License.
#include "oneflow/core/actor/actor_message_bus.h"
#include "oneflow/core/job/resource_desc.h"
#include "oneflow/core/job/global_for.h"
+#include "oneflow/core/dl/include/ibv.h"
#if defined(WITH_RDMA) && defined(OF_PLATFORM_POSIX)
@@ -35,7 +36,7 @@ IBVerbsQP::IBVerbsQP(ibv_context* ctx, ibv_pd* pd, ibv_cq* send_cq, ibv_cq* recv
pd_ = pd;
// qp_
ibv_device_attr device_attr{};
- CHECK_EQ(ibv_query_device(ctx, &device_attr), 0);
+ CHECK_EQ(ibv::wrapper.ibv_query_device(ctx, &device_attr), 0);
uint32_t max_recv_wr =
Global<ResourceDesc, ForSession>::Get()->rdma_recv_msg_buf_byte() / sizeof(ActorMsg);
max_recv_wr = std::min<uint32_t>(max_recv_wr, device_attr.max_qp_wr);
@@ -51,7 +52,7 @@ IBVerbsQP::IBVerbsQP(ibv_context* ctx, ibv_pd* pd, ibv_cq* send_cq, ibv_cq* recv
qp_init_attr.cap.max_inline_data = 0;
qp_init_attr.qp_type = IBV_QPT_RC;
qp_init_attr.sq_sig_all = 1;
- qp_ = ibv_create_qp(pd, &qp_init_attr);
+ qp_ = ibv::wrapper.ibv_create_qp(pd, &qp_init_attr);
CHECK(qp_);
// recv_msg_buf_
recv_msg_buf_.assign(max_recv_wr, nullptr);
@@ -61,7 +62,7 @@ IBVerbsQP::IBVerbsQP(ibv_context* ctx, ibv_pd* pd, ibv_cq* send_cq, ibv_cq* recv
}
IBVerbsQP::~IBVerbsQP() {
- CHECK_EQ(ibv_destroy_qp(qp_), 0);
+ CHECK_EQ(ibv::wrapper.ibv_destroy_qp(qp_), 0);
while (send_msg_buf_.empty() == false) {
delete send_msg_buf_.front();
send_msg_buf_.pop();
@@ -71,7 +72,7 @@ IBVerbsQP::~IBVerbsQP() {
void IBVerbsQP::Connect(const IBVerbsConnectionInfo& peer_info) {
ibv_port_attr port_attr{};
- CHECK_EQ(ibv_query_port(ctx_, 1, &port_attr), 0);
+ CHECK_EQ(ibv::wrapper.ibv_query_port_wrap(ctx_, 1, &port_attr), 0);
ibv_qp_attr qp_attr{};
// IBV_QPS_INIT
memset(&qp_attr, 0, sizeof(ibv_qp_attr));
@@ -80,8 +81,8 @@ void IBVerbsQP::Connect(const IBVerbsConnectionInfo& peer_info) {
qp_attr.port_num = 1;
qp_attr.qp_access_flags =
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
- CHECK_EQ(ibv_modify_qp(qp_, &qp_attr,
- IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS),
+ CHECK_EQ(ibv::wrapper.ibv_modify_qp(
+ qp_, &qp_attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS),
0);
// IBV_QPS_RTR
memset(&qp_attr, 0, sizeof(ibv_qp_attr));
@@ -102,9 +103,10 @@ void IBVerbsQP::Connect(const IBVerbsConnectionInfo& peer_info) {
qp_attr.rq_psn = 0;
qp_attr.max_dest_rd_atomic = 1;
qp_attr.min_rnr_timer = 12;
- CHECK_EQ(ibv_modify_qp(qp_, &qp_attr,
- IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN
- | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER),
+ CHECK_EQ(ibv::wrapper.ibv_modify_qp(qp_, &qp_attr,
+ IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN
+ | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC
+ | IBV_QP_MIN_RNR_TIMER),
0);
// IBV_QPS_RTS
memset(&qp_attr, 0, sizeof(ibv_qp_attr));
@@ -114,9 +116,9 @@ void IBVerbsQP::Connect(const IBVerbsConnectionInfo& peer_info) {
qp_attr.retry_cnt = 7;
qp_attr.rnr_retry = 7;
qp_attr.timeout = 14;
- CHECK_EQ(ibv_modify_qp(qp_, &qp_attr,
- IBV_QP_STATE | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC | IBV_QP_RETRY_CNT
- | IBV_QP_RNR_RETRY | IBV_QP_TIMEOUT),
+ CHECK_EQ(ibv::wrapper.ibv_modify_qp(qp_, &qp_attr,
+ IBV_QP_STATE | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC
+ | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_TIMEOUT),
0);
}
diff --git a/oneflow/core/dl/include/ibv.h b/oneflow/core/dl/include/ibv.h
new file mode 100644
index 0000000000000000000000000000000000000000..f705f7577b8d41c7936a65f04160e0ed45ee4169
--- /dev/null
+++ b/oneflow/core/dl/include/ibv.h
@@ -0,0 +1,62 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#if defined(WITH_RDMA)
+#ifndef ONEFLOW_CORE_DL_INCLUDE_IBV_H_
+#define ONEFLOW_CORE_DL_INCLUDE_IBV_H_
+#include "oneflow/core/dl/include/wrapper.h"
+#include <infiniband/verbs.h>
+
+namespace oneflow {
+
+namespace ibv {
+// extern "C" is required: without it compilation fails with a "changes meaning of" error.
+extern "C" typedef struct IBV {  // table of pointers to the libibverbs functions in use
+#define IBV_APIS(_) \
+ _(ibv_free_device_list) \
+ _(ibv_destroy_qp) \
+ _(ibv_query_gid) \
+ _(ibv_fork_init) \
+ _(ibv_open_device) \
+ _(ibv_destroy_cq) \
+ _(ibv_alloc_pd) \
+ _(ibv_modify_qp) \
+ _(ibv_dealloc_pd) \
+ _(ibv_get_device_list) \
+ _(ibv_close_device) \
+ _(ibv_create_qp) \
+ _(ibv_dereg_mr) \
+ _(ibv_create_cq) \
+ _(ibv_query_device)
+// one member per IBV_APIS entry, typed as a pointer to the function declared in verbs.h
+#define DECLARE_ONE(name) decltype(&name) name;
+ IBV_APIS(DECLARE_ONE)
+#undef DECLARE_ONE
+ // ibv_reg_mr and ibv_query_port are macros as well as functions (per the original note),
+ // so their members need alternative "_wrap" names
+ struct ibv_mr* (*ibv_reg_mr_wrap)(struct ibv_pd* pd, void* addr, size_t length, int access);
+ int (*ibv_query_port_wrap)(struct ibv_context* context, uint8_t port_num,
+ struct ibv_port_attr* port_attr);
+} IBV;
+
+bool IsAvailable();  // true when a libibverbs shared library could be loaded
+// global function table, defined in ibv_wrapper.cpp; check IsAvailable() before calling into it
+extern IBV wrapper;
+
+} // namespace ibv
+} // namespace oneflow
+
+#endif // ONEFLOW_CORE_DL_INCLUDE_IBV_H_
+#endif // WITH_RDMA
diff --git a/oneflow/core/dl/include/wrapper.h b/oneflow/core/dl/include/wrapper.h
new file mode 100644
index 0000000000000000000000000000000000000000..00a7d66959aec2b367bb1f847efee8359a0e4a58
--- /dev/null
+++ b/oneflow/core/dl/include/wrapper.h
@@ -0,0 +1,43 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef ONEFLOW_CORE_DL_INCLUDE_WRAPPER_H_
+#define ONEFLOW_CORE_DL_INCLUDE_WRAPPER_H_
+
+#include "oneflow/core/common/util.h"
+
+namespace oneflow {
+
+namespace dl {
+
+// Owning handle to a shared library opened with dlopen(); the handle is closed on destruction.
+class DynamicLibrary {
+ public:
+  OF_DISALLOW_COPY_AND_MOVE(DynamicLibrary);
+  ~DynamicLibrary();
+
+  // Tries each name in order and returns a handle for the first one that can be opened,
+  // or a null pointer when none of them can.
+  static std::unique_ptr<DynamicLibrary> Load(const std::vector<std::string>& names);
+  // Looks up `name` in this library; the process is aborted when the symbol cannot be found.
+  void* LoadSym(const char* name);
+  // Path of the loaded library as recorded by the dynamic linker.
+  std::string AbsolutePath();
+
+ private:
+  // Instances are only created through Load(). `explicit` prevents a raw void* from being
+  // converted into a DynamicLibrary implicitly.
+  explicit DynamicLibrary(void* handle) : handle_(handle) {}
+  void* handle_ = nullptr;
+};
+
+} // namespace dl
+
+} // namespace oneflow
+
+#endif // ONEFLOW_CORE_DL_INCLUDE_WRAPPER_H_
diff --git a/oneflow/core/dl/lib/ibv_wrapper.cpp b/oneflow/core/dl/lib/ibv_wrapper.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..40ac1fa5303cd3880d762300ae7e93aa12eb644d
--- /dev/null
+++ b/oneflow/core/dl/lib/ibv_wrapper.cpp
@@ -0,0 +1,130 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#if defined(WITH_RDMA)
+#include "oneflow/core/dl/include/ibv.h"
+
+namespace oneflow {
+
+namespace ibv {
+
+// Returns the library names to try, in order. When the ONEFLOW_LIBIBVERBS_PATH environment
+// variable holds a non-empty value it is the only candidate; otherwise the default libibverbs
+// names are used.
+std::vector<std::string> GetLibPaths() {
+  const char* custom_path = std::getenv("ONEFLOW_LIBIBVERBS_PATH");
+  // An empty value is treated like an unset variable: "" does not name a library, and handing it
+  // to the loader may yield a handle for something that is not libibverbs.
+  if (custom_path == nullptr || custom_path[0] == '\0') {
+    return {"libibverbs.so.1", "libibverbs.so"};
+  }
+  return {custom_path};
+}
+
+// Returns the process-wide libibverbs handle, or nullptr when no candidate could be loaded.
+// The load is attempted once, the first time this function runs.
+dl::DynamicLibrary* GetIBVLibraryPtr() {
+  static std::unique_ptr<dl::DynamicLibrary> ibv_lib =
+      dl::DynamicLibrary::Load(GetLibPaths());
+  dl::DynamicLibrary* const raw = ibv_lib.get();
+  return raw;
+}
+
+// Same handle as GetIBVLibraryPtr(), but a missing library is a fatal CHECK failure.
+dl::DynamicLibrary& GetIBVLibrary() {
+  dl::DynamicLibrary* const loaded = GetIBVLibraryPtr();
+  CHECK(loaded != nullptr) << "fail to find libibverbs";
+  return *loaded;
+}
+
+// Resolves `name` in libibverbs, stores the resulting function pointer in `*save` and returns it.
+// The process is aborted when the symbol cannot be resolved.
+template<typename FUNC>
+FUNC LoadSymbol(const char* name, FUNC* save) {
+  void* const raw_symbol = GetIBVLibrary().LoadSym(name);
+  const FUNC resolved = reinterpret_cast<FUNC>(raw_symbol);
+  if (resolved == nullptr) {
+    std::cerr << "Can't load libibverbs symbol " << name << "\n";
+    abort();
+  }
+  *save = resolved;
+  return resolved;
+}
+
+// Reports whether a libibverbs shared library could be loaded.
+bool IsAvailable() {
+  const dl::DynamicLibrary* const lib = GetIBVLibraryPtr();
+  return lib != nullptr;
+}
+
+namespace _stubs {  // initial targets of the `wrapper` function pointers
+// Each stub resolves the real symbol, stores it into its wrapper member, then forwards the call.
+void ibv_free_device_list(struct ibv_device** list) {
+ return LoadSymbol(__func__, &wrapper.ibv_free_device_list)(list);
+}
+// Named *_wrap, so the real symbol name is passed explicitly instead of __func__.
+struct ibv_mr* ibv_reg_mr_wrap(struct ibv_pd* pd, void* addr, size_t length, int access) {
+ return LoadSymbol("ibv_reg_mr", &wrapper.ibv_reg_mr_wrap)(pd, addr, length, access);
+}
+
+int ibv_destroy_qp(struct ibv_qp* qp) { return LoadSymbol(__func__, &wrapper.ibv_destroy_qp)(qp); }
+
+int ibv_query_gid(struct ibv_context* context, uint8_t port_num, int index, union ibv_gid* gid) {
+ return LoadSymbol(__func__, &wrapper.ibv_query_gid)(context, port_num, index, gid);
+}
+
+int ibv_fork_init(void) { return LoadSymbol(__func__, &wrapper.ibv_fork_init)(); }
+// Named *_wrap, so the real symbol name is passed explicitly instead of __func__.
+int ibv_query_port_wrap(struct ibv_context* context, uint8_t port_num,
+ struct ibv_port_attr* port_attr) {
+ return LoadSymbol("ibv_query_port", &wrapper.ibv_query_port_wrap)(context, port_num, port_attr);
+}
+
+struct ibv_context* ibv_open_device(struct ibv_device* device) {
+ return LoadSymbol(__func__, &wrapper.ibv_open_device)(device);
+}
+
+int ibv_destroy_cq(struct ibv_cq* cq) { return LoadSymbol(__func__, &wrapper.ibv_destroy_cq)(cq); }
+
+struct ibv_pd* ibv_alloc_pd(struct ibv_context* context) {
+ return LoadSymbol(__func__, &wrapper.ibv_alloc_pd)(context);
+}
+
+int ibv_modify_qp(struct ibv_qp* qp, struct ibv_qp_attr* attr, int attr_mask) {
+ return LoadSymbol(__func__, &wrapper.ibv_modify_qp)(qp, attr, attr_mask);
+}
+
+int ibv_dealloc_pd(struct ibv_pd* pd) { return LoadSymbol(__func__, &wrapper.ibv_dealloc_pd)(pd); }
+
+struct ibv_device** ibv_get_device_list(int* num_devices) {
+ return LoadSymbol(__func__, &wrapper.ibv_get_device_list)(num_devices);
+}
+
+int ibv_close_device(struct ibv_context* context) {
+ return LoadSymbol(__func__, &wrapper.ibv_close_device)(context);
+}
+
+struct ibv_qp* ibv_create_qp(struct ibv_pd* pd, struct ibv_qp_init_attr* qp_init_attr) {
+ return LoadSymbol(__func__, &wrapper.ibv_create_qp)(pd, qp_init_attr);
+}
+
+int ibv_dereg_mr(struct ibv_mr* mr) { return LoadSymbol(__func__, &wrapper.ibv_dereg_mr)(mr); }
+
+struct ibv_cq* ibv_create_cq(struct ibv_context* context, int cqe, void* cq_context,
+ struct ibv_comp_channel* channel, int comp_vector) {
+ return LoadSymbol(__func__, &wrapper.ibv_create_cq)(context, cqe, cq_context, channel,
+ comp_vector);
+}
+
+int ibv_query_device(struct ibv_context* context, struct ibv_device_attr* device_attr) {
+ return LoadSymbol(__func__, &wrapper.ibv_query_device)(context, device_attr);
+}
+
+} // namespace _stubs
+
+IBV wrapper = {  // every member starts at its stub; order must match the members of struct IBV
+#define _REFERENCE_MEMBER(name) _stubs::name,
+ IBV_APIS(_REFERENCE_MEMBER)
+#undef _REFERENCE_MEMBER
+ _stubs::ibv_reg_mr_wrap,  // the two *_wrap members are declared after the IBV_APIS members
+ _stubs::ibv_query_port_wrap};
+
+} // namespace ibv
+} // namespace oneflow
+#endif // WITH_RDMA
diff --git a/oneflow/core/dl/lib/wrapper.cpp b/oneflow/core/dl/lib/wrapper.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..3f0ad001c64cca41e4475392006b6de9aaa0a234
--- /dev/null
+++ b/oneflow/core/dl/lib/wrapper.cpp
@@ -0,0 +1,62 @@
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#include "oneflow/core/dl/include/wrapper.h"
+#include <dlfcn.h>
+#include <link.h>
+
+namespace oneflow {
+namespace dl {
+
+namespace {
+
+// Looks up `name` in the library referred to by `handle` and returns its address.
+// Prints the loader's error message and aborts the process when the lookup yields no address.
+void* OpenSymbol(void* handle, const char* name) {
+  dlerror();  // discard any stale error so the message below belongs to this lookup
+  void* ret = dlsym(handle, name);
+  if (!ret) {
+    // dlerror() returns NULL when no error is pending; streaming a null char* is undefined.
+    const char* err = dlerror();
+    std::cerr << "Error in dlopen or dlsym: " << (err != nullptr ? err : "unknown error") << "\n";
+    abort();
+  }
+  return ret;
+}
+
+} // namespace
+
+// original implementation is from pytorch:
+// https://github.com/pytorch/pytorch/blob/259d19a7335b32c4a27a018034551ca6ae997f6b/aten/src/ATen/DynamicLibrary.cpp
+
+// Opens the first library in `names` that dlopen() accepts and wraps it in a DynamicLibrary.
+// Returns a null pointer when none of the names can be opened.
+std::unique_ptr<DynamicLibrary> DynamicLibrary::Load(const std::vector<std::string>& names) {
+  for (const std::string& name : names) {
+    void* handle = dlopen(name.c_str(), RTLD_LOCAL | RTLD_NOW);
+    if (handle == nullptr) { continue; }
+    // Take ownership right away so the handle is closed if anything below throws.
+    // The constructor is private, hence the direct `new` instead of a make_unique-style helper.
+    std::unique_ptr<DynamicLibrary> lib(new DynamicLibrary(handle));
+    std::cout << "loaded library: " << lib->AbsolutePath() << "\n";
+    return lib;
+  }
+  return nullptr;
+}
+
+// Resolves `name` inside this library through OpenSymbol(), which aborts on a failed lookup.
+void* DynamicLibrary::LoadSym(const char* name) {
+  void* const symbol = OpenSymbol(handle_, name);
+  return symbol;
+}
+
+// Returns the path the dynamic linker recorded for this library, or an empty string when that
+// information cannot be queried.
+std::string DynamicLibrary::AbsolutePath() {
+  // Initialized so that a failed dlinfo() cannot leave an indeterminate pointer to dereference.
+  struct link_map* map = nullptr;
+  if (dlinfo(handle_, RTLD_DI_LINKMAP, &map) != 0 || map == nullptr || map->l_name == nullptr) {
+    return std::string();
+  }
+  return map->l_name;
+}
+
+// Closes the handle obtained from dlopen(); the result of dlclose() is not inspected.
+DynamicLibrary::~DynamicLibrary() {
+  dlclose(handle_);
+}
+
+} // namespace dl
+} // namespace oneflow
diff --git a/oneflow/core/job/runtime.cpp b/oneflow/core/job/runtime.cpp
index 053c24785a23ce040b6d780640c7fe8b6804f06b..c1eb6bc3ea5320c300d50cbf7345de7b951fbb6f 100644
--- a/oneflow/core/job/runtime.cpp
+++ b/oneflow/core/job/runtime.cpp
@@ -31,6 +31,9 @@ limitations under the License.
#include "oneflow/user/summary/events_writer.h"
#include "oneflow/core/job/collective_boxing_executor.h"
#include "oneflow/core/job/collective_boxing_device_ctx_poller.h"
+#ifdef WITH_RDMA
+#include "oneflow/core/dl/include/ibv.h"
+#endif // WITH_RDMA
namespace oneflow {
@@ -108,8 +111,13 @@ void Runtime::NewAllGlobal(const Plan& plan, size_t total_piece_num, bool is_exp
// The Global<CommNet> is set allocated by Global<EpollCommNet>
if (Global<ResourceDesc, ForSession>::Get()->use_rdma()) {
#ifdef WITH_RDMA
- Global<IBVerbsCommNet>::New();
- Global<CommNet>::SetAllocated(Global<IBVerbsCommNet>::Get());
+ if (ibv::IsAvailable()) {
+ Global<IBVerbsCommNet>::New();
+ Global<CommNet>::SetAllocated(Global<IBVerbsCommNet>::Get());
+ } else {
+ LOG(ERROR) << "libibverbs not available, falling back to epoll";
+ Global<CommNet>::SetAllocated(Global<EpollCommNet>::Get());
+ }
#else
LOG(FATAL) << "RDMA components not found";
#endif
diff --git a/oneflow/python/deprecated/init_cluster_env.py b/oneflow/python/deprecated/init_cluster_env.py
index decb366cf1b20b048c9364e76210e088d93c03b3..fac9d6bcd8dd3bf6757dc7968860365e5a5f2df3 100644
--- a/oneflow/python/deprecated/init_cluster_env.py
+++ b/oneflow/python/deprecated/init_cluster_env.py
@@ -193,10 +193,15 @@ def _SendBinaryAndConfig2Worker(
+ run_dir
+ "/env.proto"
)
+ oneflow_libibverbs_path = os.getenv("ONEFLOW_LIBIBVERBS_PATH")
+ libibverbs_env_str = ""
+ if oneflow_libibverbs_path:
+ libibverbs_env_str = "ONEFLOW_LIBIBVERBS_PATH=" + oneflow_libibverbs_path + " "
oneflow_cmd = (
'"cd '
+ run_dir
+ "; "
+ + libibverbs_env_str
+ "nohup ./oneflow_worker -logtostderr=0 -log_dir=./log -v=0 -logbuflevel=-1 "
+ "-env_proto=./env.proto "
+ ' 1>/dev/null 2>&1 </dev/null & "'