驾驭3200Gbps网络(9): 使用32张网卡
在上一章中,我们已经搞清楚了系统的拓扑。而在第七章中我们已经能够在单张网卡上达到 97.844 Gbps 带宽。那么在这一章里,我们将使用8张显卡对应的32张网卡,看看我们的程序能达到多少带宽。我们把这个程序命名为 9_multinet.cpp
。
网络
我们首先对 Network
类进行一个微小的修改。一是我们记录下来网卡对应的显卡 cuda_device
。二是我们在打开网络的时候,让所有的网卡都共享同一个 fabric
对象。
struct Network {
// ...
int cuda_device;
};
Network Network::Open(struct fi_info *fi, int cuda_device,
struct fid_fabric *fabric) {
if (!fabric) {
FI_CHECK(fi_fabric(fi->fabric_attr, &fabric, nullptr));
}
// ...
return Network{fi, fabric, domain, cq, av, ep, addr, cuda_device};
}
负载均衡
因为我们所有 WRITE
操作的大小都是相同的,所以负载均衡非常容易,只需要轮流使用显卡对应的四张网卡即可。为此,我们可以实现一个简单的 NetworkGroup
类,保存四张网卡的信息,并且使用 Round-Robin 来选择下一张网卡。
struct NetworkGroup {
std::vector<Network *> nets;
uint8_t rr_mask;
uint8_t rr_idx = 0;
NetworkGroup(std::vector<Network *> &&nets) {
CHECK(nets.size() <= kMaxNetworksPerGroup);
CHECK((nets.size() & (nets.size() - 1)) == 0); // power of 2
this->rr_mask = nets.size() - 1;
this->nets = std::move(nets);
}
NetworkGroup(const NetworkGroup &) = delete;
NetworkGroup(NetworkGroup &&) = default;
uint8_t GetNext() {
rr_idx = (rr_idx + 1) & rr_mask;
return rr_idx;
}
};
CONNECT
消息
在之前的程序中,因为只使用了一张网卡,所以客户端只需要向服务器端发送一个地址即可。而现在,因为我们需要使用多张网卡,所以我们在 CONNECT
消息中需要发送显卡的数量、网卡的数量、以及每张网卡的地址和内存区域。和之前一样,我们将定长的数据放在结构体当中,而变长的数据放在结构体之后的内存中。
struct AppConnectMessage {
AppMessageBase base;
size_t num_gpus;
size_t num_nets;
size_t num_mr;
EfaAddress &net_addr(size_t index) {
CHECK(index < num_nets);
return ((EfaAddress *)((uintptr_t)&base + sizeof(*this)))[index];
}
MemoryRegion &mr(size_t index) {
CHECK(index < num_mr);
return ((MemoryRegion *)((uintptr_t)&base + sizeof(*this) +
num_nets * sizeof(EfaAddress)))[index];
}
size_t MessageBytes() const {
return sizeof(*this) + num_nets * sizeof(EfaAddress) +
num_mr * sizeof(MemoryRegion);
}
};
服务器端逻辑
从一张网卡变成8张显卡32张网卡,服务器端的状态机的一些变量需要由单个变量变成数组。
constexpr size_t kMaxNetworksPerGroup = 4;
struct RandomFillRequestState {
std::vector<Network> *nets;
std::vector<NetworkGroup> *net_groups;
std::vector<Buffer> *cuda_bufs;
std::vector<std::array<fi_addr_t, kMaxNetworksPerGroup>> remote_addrs;
std::vector<WriteState> write_states;
// ...
};
当收到 CONNECT
消息时,我们需要让服务器端和客户端的网卡一一对应起来,增加到对应的地址向量中。
struct RandomFillRequestState {
// ...
void HandleConnect(Network &net, RdmaOp &op) {
// ...
// Assuming remote has the same number of GPUs and NICs.
CHECK(msg.num_gpus == cuda_bufs->size());
CHECK(msg.num_nets == nets->size());
// Add peer addresses
nets_per_gpu = msg.num_nets / msg.num_gpus;
buf_per_gpu = connect_msg->num_mr / connect_msg->num_nets;
for (size_t i = 0; i < msg.num_gpus; ++i) {
std::array<fi_addr_t, kMaxNetworksPerGroup> addrs = {};
for (size_t j = 0; j < nets_per_gpu; ++j) {
auto idx = i * nets_per_gpu + j;
addrs[j] = nets->at(idx).AddPeerAddress(msg.net_addr(idx));
}
remote_addrs.push_back(addrs);
}
// Initialize write states
write_states.resize(connect_msg->num_gpus);
}
};
我们也需要对 ContinuePostWrite()
函数进行一些修改。我们传入一个 gpu_idx
参数,表示只提交这个显卡对应的网卡的 WRITE
操作。首先是先用 NetworkGroup
选择下一个网卡。然后我们要找到出来这个网卡对应的客户端地址 dest_addr
、内存区域地址 mr.addr
、以及内存区域的远程访问密钥 mr.rkey
。在设置立即数上面,我们也要注意判断是否是当前网卡的最后一个 WRITE
操作(s.i_page + nets_per_gpu >= num_pages
)。
struct RandomFillRequestState {
// ...
void ContinuePostWrite(size_t gpu_idx) {
auto &s = write_states[gpu_idx];
if (s.i_repeat == total_repeat)
return;
auto page_size = request_msg->page_size;
auto num_pages = request_msg->num_pages;
auto net_idx = (*net_groups)[gpu_idx].GetNext();
uint32_t imm_data = 0;
if (s.i_repeat + 1 == total_repeat && s.i_buf + 1 == buf_per_gpu &&
s.i_page + nets_per_gpu >= num_pages) {
// The last WRITE. Pass remote context back.
imm_data = request_msg->remote_context;
}
const auto &mr = connect_msg->mr(
(gpu_idx * nets_per_gpu + net_idx) * buf_per_gpu + s.i_buf);
(*net_groups)[gpu_idx].nets[net_idx]->PostWrite(
RdmaWriteOp{.buf = &(*cuda_bufs)[gpu_idx],
.offset = s.i_buf * (page_size * num_pages) + s.i_page * page_size,
.len = page_size,
.imm_data = imm_data,
.dest_ptr = mr.addr + request_msg->page_idx(s.i_page) * page_size,
.dest_addr = remote_addrs[gpu_idx][net_idx],
.dest_key = mr.rkey},
[this](Network &net, RdmaOp &op) { HandleWriteCompletion(); });
++posted_write_ops;
if (++s.i_page == num_pages) {
s.i_page = 0;
if (++s.i_buf == buf_per_gpu) {
s.i_buf = 0;
if (++s.i_repeat == total_repeat)
return;
}
}
}
};
在服务器端的主程序,我们首先检测系统拓扑,然后打开所有的网卡,并分组建立网络组。
int ServerMain(int argc, char **argv) {
// Topology detection
struct fi_info *info = GetInfo();
auto topo_groups = DetectTopo(info);
int num_gpus = topo_groups.size();
int num_nets = topo_groups[0].fi_infos.size() * topo_groups.size();
int nets_per_gpu = num_nets / num_gpus;
// Open Netowrk
std::vector<Network> nets;
std::vector<NetworkGroup> net_groups;
for (int cuda_device = 0; cuda_device < num_gpus; ++cuda_device) {
std::vector<Network *> group_nets;
for (auto *fi : topo_groups[cuda_device].fi_infos) {
int cuda_device = nets.size() / nets_per_gpu;
auto *fabric = nets.empty() ? nullptr : nets[0].fabric;
nets.push_back(Network::Open(fi, cuda_device, fabric));
group_nets.push_back(&nets.back());
}
net_groups.push_back(NetworkGroup(std::move(group_nets)));
}
PrintTopologyGroups(topo_groups);
// ...
}
然后我们在每张显卡上都分配一个缓冲区。同时,向这张显卡对应的所有的网卡注册这个内存区域。
// Allocate and register CUDA memory
printf("Registered MR from");
std::vector<Buffer> cuda_bufs;
for (int i = 0; i < num_gpus; ++i) {
CUDA_CHECK(cudaSetDevice(i));
cuda_bufs.push_back(Buffer::AllocCuda(kMemoryRegionSize * 2, kBufAlign));
for (int j = 0; j < nets_per_gpu; ++j) {
nets[i * nets_per_gpu + j].RegisterMemory(cuda_bufs.back());
}
printf(" cuda:%d", i);
fflush(stdout);
}
printf("\n");
我们打算使用第一张网卡来接收客户端连接,所以我们分配两个缓冲区并在第一张网卡上注册内存区域。
// Allocate and register message buffer
auto buf1 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
auto buf2 = Buffer::Alloc(kMessageBufferSize, kBufAlign);
nets[0].RegisterMemory(buf1);
nets[0].RegisterMemory(buf2);
在主循环中,我们先在第一张网卡上提交两个 RECV
操作。在状态机未完成前,我们每次遍历每一个显卡。对于一张显卡,我们先处理其对应的所有的网卡的完成队列。然后如果状态机的状态处于 kWrite
,则继续提交更多的 WRITE
操作。
// Loop forever. Accept one client at a time.
for (;;) {
printf("------\n");
// State machine
RandomFillRequestState s(&nets, &net_groups, &cuda_bufs);
// RECV for CONNECT
nets[0].PostRecv(buf1, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
// RECV for RandomFillRequest
nets[0].PostRecv(buf2, [&s](Network &net, RdmaOp &op) { s.OnRecv(net, op); });
// Wait for completion
while (s.state != RandomFillRequestState::State::kDone) {
for (size_t gpu_idx = 0; gpu_idx < net_groups.size(); ++gpu_idx) {
for (auto *net : net_groups[gpu_idx].nets) {
net->PollCompletion();
}
switch (s.state) {
case RandomFillRequestState::State::kWaitRequest:
break;
case RandomFillRequestState::State::kWrite:
s.ContinuePostWrite(gpu_idx);
break;
case RandomFillRequestState::State::kDone:
break;
}
}
}
}
return 0;
客户端逻辑
客户端逻辑的修改和服务器端类似。首先检测系统拓扑,然后打开所有的网卡,并分组建立网络组。然后在每张显卡上分配缓冲区并在其对应的网卡上注册内存区域。再在第一张网卡上分配缓冲区和注册内存区域。这里不再赘述。
然后我们需要在 CONNECT
消息中包含显卡的数量、网卡的数量、每张网卡的地址、每张网卡上的内存区域。然后使用第一张网卡向服务器端发送这个消息。
int ClientMain(int argc, char **argv) {
// ...
// Send address to server
auto &connect_msg = *(AppConnectMessage *)buf1.data;
connect_msg = {
.base = {.type = AppMessageType::kConnect},
.num_gpus = (size_t)num_gpus,
.num_nets = nets.size(),
.num_mr = nets.size() * 2,
};
for (size_t i = 0; i < nets.size(); i++) {
connect_msg.net_addr(i) = nets[i].addr;
int cuda_device = nets[i].cuda_device;
connect_msg.mr(i * 2) = {
.addr = (uint64_t)cuda_bufs1[cuda_device].data,
.size = cuda_bufs1[cuda_device].size,
.rkey = nets[i].GetMR(cuda_bufs1[cuda_device])->key,
};
connect_msg.mr(i * 2 + 1) = {
.addr = (uint64_t)cuda_bufs2[cuda_device].data,
.size = cuda_bufs2[cuda_device].size,
.rkey = nets[i].GetMR(cuda_bufs2[cuda_device])->key,
};
}
bool connect_sent = false;
nets[0].PostSend(
server_addr, buf1, connect_msg.MessageBytes(),
[&connect_sent](Network &net, RdmaOp &op) { connect_sent = true; });
while (!connect_sent) {
nets[0].PollCompletion();
}
}
在等待 REMOTE WRITE
回调函数的时候,之前的程序只需要等待一次回调,而现在我们需要在每张网卡上都等待一次回调。
// Prepare to receive the last REMOTE WRITE from server
int cnt_last_remote_write_received = 0;
uint32_t remote_write_op_id = 0x123;
for (auto &net : nets) {
net.AddRemoteWrite(remote_write_op_id, [&cnt_last_remote_write_received](
Network &net, RdmaOp &op) {
++cnt_last_remote_write_received;
});
}
// Send message to server
// ...
// Wait for REMOTE WRITE from server
while (cnt_last_remote_write_received != num_nets) {
for (auto &net : nets) {
net.PollCompletion();
}
}
printf("Received RDMA WRITE to local GPU memory.\n");
到此我们就完成了所有的修改。
运行效果
在上面的视频中我们可以看到,我们确实能够使用8张显卡32张网卡,然而传输速度仅为 287.089 Gbps,只达到了总带宽 3200 Gbps 的 9.0%。接下来我们将一步一步地优化我们的程序,直到我们能打满 3200 Gbps 的带宽。
本章代码:https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/9_multinet.cpp