驾驭3200Gbps网络(11): 多线程
在上一章中,解决了测速前的预热问题,然而传输速度依然仅为 293.461 Gbps,只达到了总带宽 3200 Gbps 的 9.2%。我们可以猜测,可能是因为 CPU 无法及时处理32张网卡的完成队列以及提交新的操作,导致了网卡的空闲。在这一章中,我们尝试使用多线程来解决这个问题。我们把本章的程序命名为 11_multithread.cpp
。
多线程
我们打算使用8个线程,每个线程负责1张显卡及其对应的4张网卡。
为了避免多个线程之间对状态读写的竞争(Race Condition),我们需要将服务器端状态机中的一些变量改变成原子类型(Atomic Type)。
struct RandomFillRequestState {
// ...
std::atomic<State> state = State::kWaitRequest;
std::atomic<size_t> posted_warmups = 0;
std::atomic<size_t> cnt_warmups = 0;
std::atomic<size_t> posted_write_ops = 0;
std::atomic<size_t> finished_write_ops = 0;
// ...
};
在服务器端的主循环中,我们为每一张显卡创建一个线程。
int ServerMain(int argc, char **argv) {
// ...
// Loop forever. Accept one client at a time.
for (;;) {
printf("------\n");
// State machine
RandomFillRequestState s(&nets, &net_groups, &cuda_bufs);
// RECV for CONNECT
nets[0].PostRecv(buf1, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
// RECV for RandomFillRequest
nets[0].PostRecv(buf2, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
// Multi-thread Poll completions
std::vector<std::thread> threads;
threads.reserve(num_gpus);
for (size_t gpu_idx = 0; gpu_idx < net_groups.size(); ++gpu_idx) {
// Start a thread for each GPU
threads.emplace_back([&s, gpu_idx] {
auto nets = (*s.net_groups)[gpu_idx].nets;
while (s.state != RandomFillRequestState::State::kDone) {
for (auto *net : nets) {
net->PollCompletion();
}
switch (s.state) {
case RandomFillRequestState::State::kWaitRequest:
break;
case RandomFillRequestState::State::kPostWarmup:
s.PostWarmup(gpu_idx);
break;
case RandomFillRequestState::State::kWaitWarmup:
break;
case RandomFillRequestState::State::kWrite:
s.ContinuePostWrite(gpu_idx);
break;
case RandomFillRequestState::State::kDone:
break;
}
}
});
}
for (auto &t : threads) {
t.join();
}
}
return 0;
}
以上便是全部的修改。
运行效果
从上面的视频中可以看到,我们的程序在多线程的情况下,传输速度达到了 355.301 Gbps,占用了总带宽的 11.1%。比起单线程的 293.461 Gbps,提升了 21%。不过这个速度还是远远低于我们的预期,我们还要继续努力。
本章代码:https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/11_multithread.cpp