驾驭3200Gbps网络(13): 状态分片
在上一章中,我们为多线程绑定了 CPU 核心,将传输速度提升到了 1237.738 Gbps,可惜依然只达到了总带宽 3200 Gbps 的 38.7%。显然 CPU 仍然是瓶颈,我们需要进一步优化。
回想我们把程序由单线程改为多线程的过程中,我们将一些变量改为了原子类型(Atomic Type),以避免多个线程之间对状态读写的竞争(Race Condition)。这其中包含了 posted_write_ops
和 finished_write_ops
这两个变量。因为在一秒钟内我们会提交和完成上百万个操作,所以对这两个原子变量的操作会非常频繁,有可能会成为瓶颈。
注意到在我们的程序中,每个线程负责一张显卡,他们之间的进度彼此是独立的。我们没有必要对每一个操作的提交和完成都进行原子自增操作。在本章中,我们尝试将不同线程的状态分片,使得大部分时候每个线程都不需要读写其他线程的状态。我们把本章的程序命名为 13_shard.cpp
。
状态分片
我们将 posted_write_ops
和 finished_write_ops
这两个原子变量改为了数组,每个线程只负责自己的状态。另外,我们增加一个原子变量 cnt_finished_gpus
,用于同步已经完成的线程数。
struct RandomFillRequestState {
// ...
std::array<uint64_t, 8> posted_write_ops;
std::array<uint64_t, 8> finished_write_ops;
std::atomic<size_t> cnt_finished_gpus = 0;
// ...
};
在状态机转变到 kWrite
之前,我们需要对这两个数组进行初始化。
struct RandomFillRequestState {
// ...
void HandleWarmupCompletion(Network &net, RdmaOp &op) {
// ...
// Prepare RDMA WRITE the data to remote GPU.
printf("Started RDMA WRITE to the remote GPU memory.\n");
// ...
std::fill(posted_write_ops.begin(), posted_write_ops.end(), 0);
std::fill(finished_write_ops.begin(), finished_write_ops.end(), 0);
state = State::kWrite;
}
};
在提交新的 WRITE
操作时,我们只需要对当前线程的状态进行操作。
struct RandomFillRequestState {
// ...
void ContinuePostWrite(size_t gpu_idx) {
// ...
group.nets[net_idx]->PostWrite(
RdmaWriteOp{ ... },
[this](Network &net, RdmaOp &op) { HandleWriteCompletion(net, op); });
++posted_write_ops[gpu_idx];
// ...
}
};
在 WRITE
操作的完成回调中,我们则需要稍微仔细一点处理。首先我们需要增加当前线程的 finished_write_ops
。如果达到了输出进度的条件,那么我们就需要读取所有线程的进度,并进行求和。因为在输出进度的时候我们不要求完全准确的数值,所以我们没有必要加入额外的同步操作,完全可以直接脏读(Dirty Read)其他线程的状态。进一步地我们可以使用 AVX2 指令集来加速求和操作。
另一方面,如果当前线程完成的操作数达到了总操作数,那么我们就需要增加 cnt_finished_gpus
,并检查是否所有线程都已经完成。如果所有线程都已经完成,那么我们就可以输出最终的进度并结束程序。因为 cnt_finished_gpus
是原子变量,所以我们这里可以保证正确性。而因为只有当当前线程完成了所有操作时才会读写 cnt_finished_gpus
,所以这里也不会因为原子变量的竞争而导致性能下降。
uint64_t SumU64x8AVX2(const std::array<uint64_t, 8> &arr) {
// https://www.perplexity.ai/search/c-sum-over-a-std-array-uint64-p8.MLN_mQeOwoik79acOxg
__m256i sum_vec = _mm256_loadu_si256((const __m256i *)arr.data());
sum_vec = _mm256_add_epi64(
sum_vec, _mm256_loadu_si256((const __m256i *)(arr.data() + 4)));
__m128i sum_128 = _mm_add_epi64(_mm256_extracti128_si256(sum_vec, 0),
_mm256_extracti128_si256(sum_vec, 1));
return _mm_extract_epi64(sum_128, 0) + _mm_extract_epi64(sum_128, 1);
}
struct RandomFillRequestState {
// ...
void HandleWriteCompletion(Network &net, RdmaOp &op) {
auto gpu_finished_ops = ++finished_write_ops[op.write.buf->cuda_device];
if (gpu_finished_ops % 16384 == 0) {
auto now = std::chrono::high_resolution_clock::now();
auto posted = SumU64x8AVX2(posted_write_ops);
auto finished = SumU64x8AVX2(finished_write_ops);
PrintProgress(now, posted, finished);
}
if (gpu_finished_ops == total_write_ops_per_gpu) {
if (++cnt_finished_gpus == connect_msg->num_gpus) {
state = State::kDone;
PrintProgress(std::chrono::high_resolution_clock::now(),
total_write_ops, total_write_ops);
printf("\nFinished all RDMA WRITEs to the remote GPU memory.\n");
}
}
}
};
以上就是全部的改动。我们可以看到,通过状态分片,我们避免了多个线程之间对状态读写的竞争,提升了程序的性能。我们来看一下运行效果。
运行效果
从上面的视频中可以看到,我们对不同线程的状态进行分片之后,传输速度达到了 1522.567 Gbps,达到了总带宽 3200 Gbps 的 47.6%,相比起之前的 1237.738 Gbps,提升了 23%。这是一个不错的提升,不过离我们的目标还有一半的距离,我们还要继续努力。
本章代码:https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/13_shard.cpp