上一章中,我们为多线程绑定了 CPU 核心,将传输速度提升到了 1237.738 Gbps,可惜依然只达到了总带宽 3200 Gbps 的 38.7%。显然 CPU 仍然是瓶颈,我们需要进一步优化。

回想我们把程序由单线程改为多线程的过程中,我们将一些变量改为了原子类型(Atomic Type),以避免多个线程之间对状态读写的竞争(Race Condition)。这其中包含了 posted_write_opsfinished_write_ops 这两个变量。因为在一秒钟内我们会提交和完成上百万个操作,所以对这两个原子变量的操作会非常频繁,有可能会成为瓶颈。

注意到在我们的程序中,每个线程负责一张显卡,他们之间的进度彼此是独立的。我们没有必要对每一个操作的提交和完成都进行原子自增操作。在本章中,我们尝试将不同线程的状态分片,使得大部分时候每个线程都不需要读写其他线程的状态。我们把本章的程序命名为 13_shard.cpp

状态分片

我们将 posted_write_opsfinished_write_ops 这两个原子变量改为了数组,每个线程只负责自己的状态。另外,我们增加一个原子变量 cnt_finished_gpus,用于同步已经完成的线程数。

struct RandomFillRequestState {
  // ...
  std::array<uint64_t, 8> posted_write_ops;
  std::array<uint64_t, 8> finished_write_ops;
  std::atomic<size_t> cnt_finished_gpus = 0;
  // ...
};

在状态机转变到 kWrite 之前,我们需要对这两个数组进行初始化。

struct RandomFillRequestState {
  // ...

  void HandleWarmupCompletion(Network &net, RdmaOp &op) {
    // ...

    // Prepare RDMA WRITE the data to remote GPU.
    printf("Started RDMA WRITE to the remote GPU memory.\n");
    // ...
    std::fill(posted_write_ops.begin(), posted_write_ops.end(), 0);
    std::fill(finished_write_ops.begin(), finished_write_ops.end(), 0);
    state = State::kWrite;
  }
};

在提交新的 WRITE 操作时,我们只需要对当前线程的状态进行操作。

struct RandomFillRequestState {
  // ...

  void ContinuePostWrite(size_t gpu_idx) {
    // ...
    group.nets[net_idx]->PostWrite(
        RdmaWriteOp{ ... },
        [this](Network &net, RdmaOp &op) { HandleWriteCompletion(net, op); });
    ++posted_write_ops[gpu_idx];
    // ...
  }
};

WRITE 操作的完成回调中,我们则需要稍微仔细一点处理。首先我们需要增加当前线程的 finished_write_ops。如果达到了输出进度的条件,那么我们就需要读取所有线程的进度,并进行求和。因为在输出进度的时候我们不要求完全准确的数值,所以我们没有必要加入额外的同步操作,完全可以直接脏读(Dirty Read)其他线程的状态。进一步地我们可以使用 AVX2 指令集来加速求和操作。

另一方面,如果当前线程完成的操作数达到了总操作数,那么我们就需要增加 cnt_finished_gpus,并检查是否所有线程都已经完成。如果所有线程都已经完成,那么我们就可以输出最终的进度并结束程序。因为 cnt_finished_gpus 是原子变量,所以我们这里可以保证正确性。而因为只有当当前线程完成了所有操作时才会读写 cnt_finished_gpus,所以这里也不会因为原子变量的竞争而导致性能下降。

uint64_t SumU64x8AVX2(const std::array<uint64_t, 8> &arr) {
  // https://www.perplexity.ai/search/c-sum-over-a-std-array-uint64-p8.MLN_mQeOwoik79acOxg
  __m256i sum_vec = _mm256_loadu_si256((const __m256i *)arr.data());
  sum_vec = _mm256_add_epi64(
      sum_vec, _mm256_loadu_si256((const __m256i *)(arr.data() + 4)));
  __m128i sum_128 = _mm_add_epi64(_mm256_extracti128_si256(sum_vec, 0),
                                  _mm256_extracti128_si256(sum_vec, 1));
  return _mm_extract_epi64(sum_128, 0) + _mm_extract_epi64(sum_128, 1);
}

struct RandomFillRequestState {
  // ...

  void HandleWriteCompletion(Network &net, RdmaOp &op) {
    auto gpu_finished_ops = ++finished_write_ops[op.write.buf->cuda_device];
    if (gpu_finished_ops % 16384 == 0) {
      auto now = std::chrono::high_resolution_clock::now();
      auto posted = SumU64x8AVX2(posted_write_ops);
      auto finished = SumU64x8AVX2(finished_write_ops);
      PrintProgress(now, posted, finished);
    }
    if (gpu_finished_ops == total_write_ops_per_gpu) {
      if (++cnt_finished_gpus == connect_msg->num_gpus) {
        state = State::kDone;
        PrintProgress(std::chrono::high_resolution_clock::now(),
                      total_write_ops, total_write_ops);
        printf("\nFinished all RDMA WRITEs to the remote GPU memory.\n");
      }
    }
  }
};

以上就是全部的改动。我们可以看到,通过状态分片,我们避免了多个线程之间对状态读写的竞争,提升了程序的性能。我们来看一下运行效果。

运行效果

从上面的视频中可以看到,我们对不同线程的状态进行分片之后,传输速度达到了 1522.567 Gbps,达到了总带宽 3200 Gbps 的 47.6%,相比起之前的 1237.738 Gbps,提升了 23%。这是一个不错的提升,不过离我们的目标还有一半的距离,我们还要继续努力。

本章代码:https://github.com/abcdabcd987/libfabric-efa-demo/blob/master/src/13_shard.cpp