Skip to content
Snippets Groups Projects
Unverified Commit 6de35463 authored by xiaoxiang shi's avatar xiaoxiang shi Committed by GitHub
Browse files

add msg_penddin_list in ibverbs_qp to optimize qp_init_attr.cap.max_send_wr (#5485)


* add ibverbs_qp.h

* modify the qp 2021-07-14

* optimize the qp_init_attr.cap.max_send_wr
when we send  too many message which are greater then qp_init_attr.cap.max_send_wr  it may have some problem
we use the msg_pendding_list_ to store the extra messages when the message is greater than qp_init_attr.cap.max_send_wr

* optimize the qp_init_attr.cap.max_send_wr
    when we send  too many message which are greater then qp_init_attr.cap.max_send_wr  it may have some proble
    we use the msg_pendding_list_ to store the extra messages when the message is greater than qp_init_attr.cap and clear todo

* optimize the qp_init_attr.cap.max_send_wr
    when we send  too many message which are greater then qp_init_attr.cap.max_send_wr  it may have some proble
    we use the msg_pendding_list_ to store the extra messages when the message is greater than qp_init_attr.cap and clear todo  and clear extra line

* optimize the qp_init_attr.cap.max_send_wr
    when we send  too many message which are greater then qp_init_attr.cap.max_send_wr  it may have some proble
    we use the msg_pendding_list_ to store the extra messages when the message is greater than qp_init_attr.cap and clear todo  and clear extra line and license

* optimize the qp_init_attr.cap.max_send_wr
    when we send  too many message which are greater then qp_init_attr.cap.max_send_wr  it may have some proble
    we use the msg_pendding_list_ to store the extra messages when the message is greater than qp_init_attr.cap and clear todo  and clear extra line and license and todo

* optimize the qp_init_attr.cap.max_send_wr
    when we send  too many message which are greater then qp_init_attr.cap.max_send_wr  it may have some proble
    we use the msg_pendding_list_ to store the extra messages when the message is greater than qp_init_attr.cap -2021-07-15-20:00

* fix the review and pass the test

* increase the kMaxSendWr 07-19

* fix the deconstruct function 2021-07-19

* test the first pr and make it precise

* fix the format and check the logic and remove the using_pendding_list_

* 2021-07-21

* fix the code  according to the  review

* fix the

* rewrite the pr and now we test it

* optimize the IB
when send too many message whose number is bigger then qp_init_attr.cap.max_send_wr,
I use the msg_pendding_list to store the extra message and wait for some time to send these message of the msg_pendding_list

* clear the duplicate license

* use pendding_list to optimize qp_init_attr.cap.max_send_wr of IB module

* change the format of the IB

* commit the format

* add new line 2021-07-27

* add new line 2021-07-27-19:30

* add new line 2021-07-27-19:33

* what is the problem of the format

* change what

* optimize the format

* win10 changed

* runtime

* runtime change again

* optimize the format of runtime

* optimize the format of acor_message.h

* optimize the format ofcpp

* optimize the format ofcpp  again

* optimize the format ofcpp  version0.56

* remove some head file

* fix the name of some paremeter

* merge master to the lambda7 and push it

* cleat the newline of 2021-0728

* why crlf

* optimze the format

* use the make format and optimization

* make the  EnqueuePostSendReadWR/PostPenddingSendWR be private

* use the ending_send_wr_lock_ as lock

* use the pending instead of the ending

* use the pending instead of the ending and

Co-authored-by: default avataroneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
parent 32ba8001
No related branches found
No related tags found
No related merge requests found
......@@ -62,6 +62,8 @@ IBVerbsQP::IBVerbsQP(ibv_context* ctx, ibv_pd* pd, uint8_t port_num, ibv_cq* sen
FOR_RANGE(size_t, i, 0, recv_msg_buf_.size()) { recv_msg_buf_.at(i) = new ActorMsgMR(pd_); }
// send_msg_buf_
CHECK(send_msg_buf_.empty());
num_outstanding_send_wr_ = 0;
max_outstanding_send_wr_ = queue_depth;
}
IBVerbsQP::~IBVerbsQP() {
......@@ -162,8 +164,7 @@ void IBVerbsQP::PostReadRequest(const IBVerbsCommNetRMADesc& remote_mem,
wr.imm_data = 0;
wr.wr.rdma.remote_addr = remote_mem.mem_ptr + i * block_size;
wr.wr.rdma.rkey = remote_mem.mr_rkey;
ibv_send_wr* bad_wr = nullptr;
CHECK_EQ(ibv_post_send(qp_, &wr, &bad_wr), 0);
EnqueuePostSendReadWR(wr, sge);
}
}
......@@ -185,8 +186,19 @@ void IBVerbsQP::PostSendRequest(const ActorMsg& msg) {
wr.send_flags = 0;
wr.imm_data = 0;
memset(&(wr.wr), 0, sizeof(wr.wr));
ibv_send_wr* bad_wr = nullptr;
CHECK_EQ(ibv_post_send(qp_, &wr, &bad_wr), 0);
EnqueuePostSendReadWR(wr, sge);
}
void IBVerbsQP::EnqueuePostSendReadWR(ibv_send_wr wr, ibv_sge sge) {
std::unique_lock<std::mutex> pending_send_wr_lock_(pending_send_wr_mutex_);
if (num_outstanding_send_wr_ < max_outstanding_send_wr_) {
num_outstanding_send_wr_++;
ibv_send_wr* bad_wr = nullptr;
CHECK_EQ(ibv_post_send(qp_, &wr, &bad_wr), 0);
} else {
std::pair<ibv_send_wr, ibv_sge> ibv_send_wr_sge = std::make_pair(wr, sge);
pending_send_wr_queue_.push(ibv_send_wr_sge);
}
}
void IBVerbsQP::ReadDone(WorkRequestId* wr_id) {
......@@ -196,6 +208,7 @@ void IBVerbsQP::ReadDone(WorkRequestId* wr_id) {
Global<CommNet>::Get()->ReadDone(wr_id->read_id);
DeleteWorkRequestId(wr_id);
}
PostPendingSendWR();
}
void IBVerbsQP::SendDone(WorkRequestId* wr_id) {
......@@ -204,6 +217,7 @@ void IBVerbsQP::SendDone(WorkRequestId* wr_id) {
send_msg_buf_.push(wr_id->msg_mr);
}
DeleteWorkRequestId(wr_id);
PostPendingSendWR();
}
void IBVerbsQP::RecvDone(WorkRequestId* wr_id) {
......@@ -214,6 +228,20 @@ void IBVerbsQP::RecvDone(WorkRequestId* wr_id) {
DeleteWorkRequestId(wr_id);
}
void IBVerbsQP::PostPendingSendWR() {
std::unique_lock<std::mutex> pending_send_wr_lock_(pending_send_wr_mutex_);
if (pending_send_wr_queue_.empty() == false) {
std::pair<ibv_send_wr, ibv_sge> ibv_send_wr_sge = std::move(pending_send_wr_queue_.front());
ibv_send_wr wr = ibv_send_wr_sge.first;
wr.sg_list = &ibv_send_wr_sge.second;
pending_send_wr_queue_.pop();
ibv_send_wr* bad_wr = nullptr;
CHECK_EQ(ibv_post_send(qp_, &wr, &bad_wr), 0);
} else {
if (num_outstanding_send_wr_ > 0) { num_outstanding_send_wr_--; }
}
}
void IBVerbsQP::PostRecvRequest(ActorMsgMR* msg_mr) {
WorkRequestId* wr_id = NewWorkRequestId();
wr_id->msg_mr = msg_mr;
......
......@@ -70,6 +70,8 @@ class IBVerbsQP final {
void RecvDone(WorkRequestId*);
private:
void EnqueuePostSendReadWR(ibv_send_wr wr, ibv_sge sge);
void PostPendingSendWR();
WorkRequestId* NewWorkRequestId();
void DeleteWorkRequestId(WorkRequestId* wr_id);
ActorMsgMR* GetOneSendMsgMRFromBuf();
......@@ -83,6 +85,10 @@ class IBVerbsQP final {
std::mutex send_msg_buf_mtx_;
std::queue<ActorMsgMR*> send_msg_buf_;
std::mutex pending_send_wr_mutex_;
uint32_t num_outstanding_send_wr_;
uint32_t max_outstanding_send_wr_;
std::queue<std::pair<ibv_send_wr, ibv_sge>> pending_send_wr_queue_;
};
} // namespace oneflow
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment