C++ 中文周刊 2026-05-23 第202期

周刊项目地址

公众号

点击「查看原文」跳转到 GitHub 上对应文件,链接就可以点击了

qq群 753792291 答疑在这里

RSS

欢迎投稿,推荐或自荐文章/软件/资源等,评论区留言

本期文章 没有 赞助 求赞助各位老板


资讯

标准委员会动态/ide/编译器信息放在这里

编译器信息最新动态推荐关注hellogcc公众号 本周更新 2025-01-08 第288期

性能周刊

文章

从零写一个快一点的 lock-free queue

mutex + std::queue 讲到 Michael-Scott queue、ABA、batching、hazard pointer。主题很老,但代码贴得多,适合复习 lock-free 基础。

基准版:

template <typename T>
class SimpleQueue {
 public:
  void Push(T* item, uint32_t threadId) {
    (void)threadId;
    std::unique_lock lock(m_Mutex);
    m_Queue.push(*item);
  }

  bool Pop(T* item, uint32_t threadId) {
    (void)threadId;
    std::unique_lock lock(m_Mutex);
    if (m_Queue.empty()) return false;
    *item = std::move(m_Queue.front());
    m_Queue.pop();
    return true;
  }

 private:
  std::queue<T> m_Queue;
  mutable std::mutex m_Mutex;
};

低竞争下没问题,高竞争下锁竞争导致 context switch,队列操作几纳秒,切线程几微秒,直接高几个数量级。我谢谢你。

CAS 基础:

std::atomic<int> value{42};

int expected = 42;
int desired = 100;
if (value.compare_exchange_strong(expected, desired)) {
  // success! value was 42, now it is 100
} else {
  // failure, `expected` now holds the actual current value
}

朴素 lock-free queue:

template <typename T>
struct NaiveNode {
  T data;
  std::atomic<NaiveNode*> next;
};

template <typename T>
class NaiveLockFreeQueue {
 public:
  NaiveLockFreeQueue() {
    auto dummy = new NaiveNode<T>();
    dummy->next.store(nullptr);
    head.store(dummy);
    tail.store(dummy);
  }

  void Push(T* value, uint32_t /*threadId*/) {
    auto newNode = new NaiveNode<T>();
    newNode->data = *value;
    newNode->next.store(nullptr, std::memory_order::relaxed);

    while (true) {
      auto currentHead = head.load(std::memory_order::acquire);
      auto currentNext = currentHead->next.load(std::memory_order::acquire);

      if (currentHead != head.load(std::memory_order::acquire)) continue;

      if (currentNext == nullptr) {
        NaiveNode<T>* expected = nullptr;
        if (currentHead->next.compare_exchange_strong(expected, newNode)) {
          head.compare_exchange_strong(currentHead, newNode);
          return;
        }
      } else {
        head.compare_exchange_strong(currentHead, currentNext);
      }
    }
  }

  bool Pop(T* value, uint32_t /*threadId*/) {
    while (true) {
      auto currentTail = tail.load(std::memory_order::acquire);
      auto currentHead = head.load(std::memory_order::acquire);
      auto next = currentTail->next.load(std::memory_order::acquire);

      if (currentTail != tail.load(std::memory_order::acquire)) continue;

      if (currentTail == currentHead) {
        if (next == nullptr) return false;
        head.compare_exchange_strong(currentHead, next);
      } else {
        if (!next) continue;
        *value = next->data;
        if (tail.compare_exchange_strong(currentTail, next)) {
          delete currentTail;
          return true;
        }
      }
    }
  }

 private:
  std::atomic<NaiveNode<T>*> head;
  std::atomic<NaiveNode<T>*> tail;
};

坑也很经典:

Part II:Batching。 朴素版本三个问题:分配太多、cache locality 差、内存回收不安全。先解决前两个:一个节点不存一个元素,存一整个固定大小数组。比如 block size=1024,那就是 1024 次 push 才需要一次节点分配,pointer chasing 也从每个元素一次变成每 1024 个元素一次。

每个 slot 里有数据和一个 commit flag。注意“抢到 slot index”和“数据已经写完”不是一回事,中间有窗口,所以 consumer 不能抢到 index 就直接读。

template <typename T, bool BlockingRead>
  requires std::is_trivially_copyable_v<T>
class FastQueueNodeSlot {
 public:
  FORCE_INLINE FastQueueNodeSlot() {
    commited.clear(std::memory_order::relaxed);
  }

  FORCE_INLINE void Commit(T* value) {
    memcpy(&data, value, sizeof(T));
    commited.test_and_set(std::memory_order::release);
    if constexpr (BlockingRead) {
      commited.notify_one();
    }
  }

  FORCE_INLINE bool IsCommited() const {
    return commited.test(std::memory_order::acquire);
  }

  FORCE_INLINE void WaitTillCommited() const {
    while (!IsCommited()) {
      commited.wait(false, std::memory_order::relaxed);
    }
  }

  FORCE_INLINE bool TryRead(T* value) const {
    if (IsCommited()) {
      memcpy(value, &data, sizeof(T));
      return true;
    }
    return false;
  }

  FORCE_INLINE void Read(T* value) const {
    WaitTillCommited();
    memcpy(value, &data, sizeof(T));
  }

  FORCE_INLINE void Reset() { commited.clear(std::memory_order::relaxed); }

 private:
  T data;
  std::atomic_flag commited;
};

这里限制 T 必须 trivially copyable,所以能直接 memcpy。你不能直接塞 std::string/unique_ptr,要么塞 POD,要么塞指针。换来的是没有构造/析构/异常安全那一堆麻烦,hot path 更干净。

C++20 的 atomic_flag::wait/notify_one 也挺有意思,blocking 版本不用 mutex + condition_variable,底下可以走 futex/WaitOnAddress 这类 atomic wait。

节点结构:

template <typename T, uint32_t Size, bool BlockingRead>
class FastQueueNode {
 public:
  using Self = FastQueueNode<T, Size, BlockingRead>;

  FORCE_INLINE FastQueueNode() { Reset(); }

  FORCE_INLINE bool IsUsedUp(std::memory_order memoryOrder) {
    return (tail.load(memoryOrder) >= Size);
  }

  FORCE_INLINE bool Push(T* value) {
    auto oldHead = head.load(std::memory_order::acquire);
    while (true) {
      if (oldHead == Size) {
        return false;
      }
      auto newHead = oldHead + 1;
      if (head.compare_exchange_strong(oldHead, newHead,
        std::memory_order::seq_cst,
        std::memory_order::acquire)) {
        slots[oldHead].Commit(value);
        return true;
      }
    }
  }

  FORCE_INLINE bool Pop(T* value) {
    auto currentTail = tail.load(std::memory_order::seq_cst);
    while (true) {
      auto currentHead = head.load(std::memory_order::acquire);
      if (currentTail >= currentHead) {
        return false;
      }
      if constexpr (!BlockingRead) {
        if (!slots[currentTail].IsCommited()) {
          return false;
        }
      }
      auto newTail = currentTail + 1;
      if (tail.compare_exchange_strong(currentTail, newTail,
        std::memory_order::seq_cst,
        std::memory_order::acquire)) {
        slots[currentTail].Read(value);
        return true;
      }
    }
    return false;
  }

  FORCE_INLINE void Reset() {
    head.store(0, std::memory_order::release);
    tail.store(0, std::memory_order::release);
    next.store(nullptr, std::memory_order::release);
    for (auto& slot : slots) {
      slot.Reset();
    }
  }

  FORCE_INLINE Self* Next(std::memory_order memoryOrder) {
    return next.load(memoryOrder);
  }

  FORCE_INLINE bool TrySetNext(Self*& expected, Self* value) {
    return next.compare_exchange_strong(expected, value,
      std::memory_order::seq_cst,
      std::memory_order::acquire);
  }

 private:
  alignas(64) std::array<FastQueueNodeSlot<T, BlockingRead>, Size> slots;
  alignas(64) std::atomic<uint32_t> head;
  alignas(64) std::atomic<uint32_t> tail;
  alignas(64) std::atomic<Self*> next;
};

alignas(64) 是为了防 false sharing。headtailnext 都是热 atomic,放同一个 cache line 上就是互相扇嘴巴,cache coherence 会疯狂 invalidate。浪费一点 padding,比 cache line ping-pong 强。

Part III:Hazard Pointer。 batching 降低了分配频率,但 delete oldTail 仍然不安全。hazard pointer 的规则是:我要 dereference 某个共享指针之前,先把这个指针发布到一个全局可见的位置,告诉所有线程“这个地址我正在看,别删”。

每线程状态:

template <typename T, uint32_t PointerCount>
struct alignas(64) HazardThreadState {
  std::array<std::atomic<T*>, PointerCount> hazardPointers;
  std::vector<T*> retiredPointers;
  std::vector<T*> scratchBuffer;
};

核心 Protect

T* Protect(const std::atomic<T*>& ptr, uint32_t threadId, uint32_t slotId) {
  assert(threadId < ThreadCount);
  assert(slotId < PointerCount);

  T* p = nullptr;
  do {
    p = ptr.load(std::memory_order::acquire);
    state[threadId].hazardPointers[slotId].store(p, std::memory_order::seq_cst);
  } while (p && p != ptr.load(std::memory_order::seq_cst));
  return p;
}

void Release(uint32_t threadId, uint32_t slotId) {
  state[threadId].hazardPointers[slotId].store(nullptr,
    std::memory_order::release);
}

重点是“load → publish → reload 检查是否还一样”。如果第一次 load 之后别的线程已经把指针换掉并 retire 了,第二次检查会发现不一致,重来。这样 Protect 返回时,至少能保证:我发布 hazard 之后,这个 atomic 仍然指向它。别人 Scan 时会看到我的 hazard,不会 delete。

删除逻辑不是直接 delete,而是 Retire 进 retired list,攒到阈值再 Scan

template <typename Deleter = std::nullptr_t>
  requires(std::invocable<Deleter, T*> ||
           std::same_as<Deleter, std::nullptr_t>)
void Scan(uint32_t threadId, Deleter deleter) {
  auto& activePointersTracker = state[threadId].scratchBuffer;
  activePointersTracker.clear();

  for (const auto& list : state) {
    for (const auto& pointer : list.hazardPointers) {
      T* ptr = pointer.load(std::memory_order::seq_cst);
      if (ptr) activePointersTracker.push_back(ptr);
    }
  }

  std::sort(activePointersTracker.begin(), activePointersTracker.end());

  auto& retiredList = state[threadId].retiredPointers;
  for (uint32_t i = 0; i < retiredList.size();) {
    if (!std::binary_search(activePointersTracker.begin(),
                            activePointersTracker.end(), retiredList[i])) {
      if constexpr (!std::same_as<Deleter, std::nullptr_t>) {
        deleter(retiredList[i]);
      } else {
        delete retiredList[i];
      }
      retiredList[i] = retiredList.back();
      retiredList.pop_back();
    } else {
      i++;
    }
  }
}

这里两个性能点:scratch buffer 预分配复用;sort + binary_search 把朴素 O(N*R) 变成 O((N+R)logN)。另外 Retire 只能在 CAS 成功、当前线程真的“拿到这个节点所有权”之后调用,不然多个线程 retire 同一个指针就等着 double free。

为了避免 Protect 之后忘记 Release,搞个RAII回收guard, DEFER

template <typename F>
  requires std::is_invocable_v<F>
class Defer {
 public:
  explicit Defer(F payload) : deferedPayload(payload) {}
  ~Defer() { std::invoke(deferedPayload); }
 private:
  F deferedPayload;
};

#define CONCAT_IMPL(x, y) x##y
#define CONCAT(x, y) CONCAT_IMPL(x, y)
#define DEFER(...) \
  const auto CONCAT(__deferedPayload, __LINE__) = \
    Defer([&]() { __VA_ARGS__ });

用起来:

Node* current = hp.Protect(head, threadId, 0);
DEFER(hp.Release(threadId, 0););

Part IV:thread-local cache。 hazard pointer 解决了“什么时候能删”,但如果最终还是 delete,多线程 allocator 还是瓶颈。所以改成每个线程一个 node cache:retire 后不 delete,丢回当前线程 cache;下次需要新 node,先从 cache 拿。

Node* AcquireNewNode(uint32_t threadId) {
  if constexpr (ThreadLocalCacheSize > 0) {
    auto& chBuff = tcBuff[threadId];
    if (chBuff->size() > 0) {
      auto item = chBuff->back();
      item->Reset();
      chBuff->pop_back();
      return item;
    }
  }
  return new Node();
}

void ReleaseNewNode(Node* ptr, uint32_t threadId) {
  if constexpr (ThreadLocalCacheSize > 0) {
    auto& chBuff = tcBuff[threadId];
    if (chBuff->size() < ThreadLocalCacheSize) {
      chBuff->push_back(ptr);
      return;
    }
  }
  delete ptr;
}

if constexpr (ThreadLocalCacheSize > 0) 的意思是:你把 cache size 配成 0,相关代码在二进制里直接消失,零运行时开销。

最终 FastQueue 的 Push/Pop:

template <typename T, uint32_t NodeBufferSize, uint32_t ThreadCount,
  bool BlockingRead = false, uint32_t ThreadLocalCacheSize = 256>
class FastQueue {
 public:
  using Node = FastQueueNode<T, NodeBufferSize, BlockingRead>;

  FORCE_INLINE void Push(T* value, uint32_t threadId) {
    Node *current = nullptr, *next = nullptr;
    while (true) {
      current = hp.Protect(head, threadId, 0);
      DEFER(hp.Release(threadId, 0););

      if (current->Push(value)) break;

      next = current->Next(std::memory_order::acquire);

      if (next == nullptr) {
        auto newNode = AcquireNewNode(threadId);
        if (!current->TrySetNext(next, newNode)) {
          ReleaseNewNode(newNode, threadId);
        }
        continue;
      }

      head.compare_exchange_strong(current, next, std::memory_order::seq_cst,
        std::memory_order::acquire);
    }
  }

  FORCE_INLINE bool Pop(T* value, uint32_t threadId) {
    Node *currentTail = nullptr, *next = nullptr;

    while (true) {
      currentTail = hp.Protect(tail, threadId, 1);
      DEFER(hp.Release(threadId, 1););

      if (currentTail->Pop(value)) return true;
      if (!currentTail->IsUsedUp(std::memory_order::seq_cst)) return false;

      next = currentTail->Next(std::memory_order::acquire);
      if (!next) return false;

      if (tail.compare_exchange_strong(currentTail, next,
        std::memory_order::seq_cst,
        std::memory_order::acquire)) {
        hp.Retire(currentTail, threadId, [this, threadId](Node* node){
          this->ReleaseNewNode(node, threadId);
        });
      }
    }
  }

 private:
  using ThreadLocalCacheBuffer = std::vector<Node*>;

  alignas(64) std::atomic<Node*> head;
  alignas(64) std::atomic<Node*> tail;
  alignas(64) HazardPointer<Node, ThreadCount, 4, 16> hp;
  alignas(64) std::array<Aligned<ThreadLocalCacheBuffer, 64>,
    (ThreadLocalCacheSize > 0) ? ThreadCount : 1> tcBuff;
};

模板参数基本都是策略开关:

benchmark: 5,000,000 个 item,clang++ -O3 -std=c++20,16 核 Intel + HT:

场景 SimpleQueue(mutex) NaiveLockFree FastQueue 加速
MPMC Low (2P/2C) 250.7ms 797.1ms 150.8ms 1.6x
MPMC Med (8P/8C) 2152.2ms 2426.5ms 309.7ms 6.9x
MPMC High (16P/16C) 2600.7ms 2413.5ms 299.6ms 8.6x
SPMC Med (1P/8C) 3138.0ms 1595.1ms 253.9ms 12.3x
MPSC Med (8P/1C) 563.2ms 1865.2ms 252.2ms 2.2x

几个观察:

这篇文章的核心价值不是“大家复制这份代码上生产”,而是把几个关键点串起来了:batching 降低分配和 pointer chasing、hazard pointer 解决 ABA/安全回收、thread-local cache 避免 allocator 锁、cache line alignment 避免 false sharing

最后还是那句话:lock-free 能快,但复杂度是真贵。你要是没 benchmark 证明 mutex 是瓶颈,就别上来整这套,用户没受益,维护者先暴毙。


从 NIC 到 P99:低延迟 C++ 交易系统怎么做

HFT/低延迟系统入门综述,目标是 p95 wire-to-wire latency < 50us。wire-to-wire 指从 NIC 收到行情包,到同一张 NIC 发出订单,中间所有处理都算进去,精确测量要靠 NIC hardware timestamp。

典型 pipeline:Market Data Ingestion → Filtering/Checks → Pricing Engine → Strategy Evaluation → OMS。

关键点:

部署 checklist 大概是:关 C-states/Turbo Boost、CPU governor 设 performance、关 PCIe ASPM、开 hugepages、isolcpus/nohz_full/rcu_nocbsmlockall、cache-align、lock-free SPSC queue、TSC 计时。

这类文章有点低延迟八股文。不过重点在于:P99 很多时候死在 OS/硬件/观测缺失上,不是死在你少写了两条 AVX 指令上


C++26: 又多了两个 function wrapper

C++26 补齐 type-erased callable wrapper:std::copyable_functionstd::function_ref

std::function 的老毛病是 const-correctness 有坑:

#include <functional>
#include <iostream>

struct Counter {
  int counter = 0;
  void operator()() {
    ++counter;
    std::cout << "modifying state: " << counter << '\n';
  }
};

int main() {
  const std::function<void()> f = Counter{};
  f(); // OK (!)
}

std::function::operator()const,但能调用内部对象非 const operator(),ABI 固化,没法修。

std::copyable_function 基本是 copyable 版 std::move_only_function,签名里的 qualifier 真正控制 operator()

#include <functional>

struct Counter {
  int count = 0;
  int operator()() { return ++count; } // non-const
};

int main() {
  std::copyable_function<int()> f = Counter{};
  f(); // OK

  // std::copyable_function<int() const> g = Counter{}; // ERROR
  std::copyable_function<int() const> h = []{ return 42; }; // OK
}

std::function_ref 是 non-owning callable reference,callable 版 string_view

// function pointer -- too restrictive, no lambdas with captures:
payload retry(size_t times, payload(*action)());

// template -- works but bloats binary, must be in header:
template<class F>
payload retry(size_t times, F&& action);

// function_ref -- lightweight, non-owning, no allocation:
payload retry(size_t times, std::function_ref<payload()> action);

选择:callback 参数用 function_ref;存 callable 不复制用 move_only_function;要复制用 copyable_functionstd::function 新代码能不用就不用。省流:std::function 终于可以慢慢退役了。


C++ ref-qualifier 有什么用

ref-qualifier 给成员函数加 & / &&,让函数根据 *this 是 lvalue 还是 rvalue 参与重载。

struct refqualified {
  mytype var;

  const mytype& getVar() const& { return var; }
  void setVar(const mytype& v) & { var = v; }
  mytype getVar() && { return var; }
};

常见组合:

f(...) const& // const lvalue 上可调
f(...) &      // 只能 lvalue 调,T{}.f() 不行
f(...) &&     // 只能 rvalue/temporary 调

一个 fluent interface 例子,只允许临时对象链式配置:

enum loglevel { NONE, SOME, FULL };

struct Logger {
  loglevel level = loglevel::NONE;
  std::string sink = "none";
  bool debug = false;

  Logger&& setLogLevel(loglevel ll) && { level = ll; return std::move(*this); }
  Logger&& setSink(std::string s) && { sink = s; return std::move(*this); }
  Logger&& setDebug(bool d = true) && { debug = d; return std::move(*this); }
};

Logger l = Logger().setSink("mysink").setDebug();
// l.setLogLevel(loglevel::SOME); // error

这个例子有点刻意,但味道能看出来:把 API 的生命周期/值类别约束写进类型系统。尤其成员函数返回内部引用时,const& 返回引用,&& 返回值/移动值,可以避免从临时对象里拿悬垂引用。


Converting an Integer to a Decimal String in Under Two Nanoseconds

Lemire 和 Jaël Champagne Gareau 的论文:整数转十进制字符串做到 2ns 以内。传统做法是反复除以 10/100,再查表拼字符。论文走 SIMD + AVX-512 IFMA:用 integer multiply-add 同时算多个 quotient/remainder,去掉 lookup table。

摘要里的核心数据:

适合 JSON/CSV/logging 这种整数转字符串真在 profiler 顶上的场景。普通业务别急着手抄 AVX-512 intrinsic。


std::is_heap 可以更快,也不该要求 random access

Arthur O’Dwyer 发现 std::ranges::is_heap 要求 random_access_range,但检查 heap 本质上只要按顺序看 parent/child 对,forward_range 就够了。

更简单的实现可以两个 forward iterator 锁步走,避免重复算 first + c2*p+1

template <class _Compare, class _ForwardIterator, class _Sentinel>
_LIBCPP_HIDE_FROM_ABI _LIBCPP_CONSTEXPR_SINCE_CXX20 _ForwardIterator
__is_heap_until(_ForwardIterator __first, _Sentinel __last, _Compare&& __comp) {
  _ForwardIterator __child = __first;
  if (__child == __last) {
    return __child;
  }
  while (true) {
    ++__child;
    if (__child == __last || __comp(*__first, *__child)) break;
    ++__child;
    if (__child == __last || __comp(*__first, *__child)) break;
    ++__first;
  }
  return __child;
}

benchmark 结果也支持,libc++ 改完快不少:

场景 n before after 改善
vector_half 1K 4155ns 2658ns -36%
vector_full 1K 8813ns 5294ns -39%
deque_half 100K 375844ns 264856ns -29%
deque_full 10M 65697333ns 52488343ns -20%

结论:实现可以马上优化;标准也可以考虑把 is_heap/is_heap_until 从 random access 放宽到 forward。STL 角落里的小坑,但这种抠细节的文章就是好看。


C++26 Reflection enum-to-string 编译开销对比

GCC 16.1.1 下测三种 enum-to-string:C++26 reflection、enchantum、X-macro。

Reflection 版本:

#include <meta>
#include <string_view>
#include <type_traits>

template <typename T>
  requires std::is_enum_v<T>
constexpr std::string_view to_enum_string(T val)
{
  template for (constexpr auto e :
    std::define_static_array(std::meta::enumerators_of(^^T)))
  {
    if (val == [:e:])
      return std::meta::identifier_of(e);
  }
  return "<unknown>";
}

关键数据(单 TU 编译耗时):

N X-macro const char* X-macro string_view enchantum Reflection
include only 25.7ms 136.0ms 147.1ms 180.8ms
4 26.6ms 137.6ms 170.6ms 186.7ms
256 32.5ms 153.0ms 184.1ms 215.0ms
1024 54.7ms 204.5ms 272.0ms 255.0ms

算法本身不慢,慢的是 <meta>。Reflection 每个枚举项约 0.07ms/enumerator,和 string_view X-macro 的 0.06ms/enumerator 差不多;但 <meta> 头文件税太贵。

PCH 和 modules 更有意思:PCH 把 N=4 从 186.7ms 降到 80.6ms;GCC 16 modules 反而涨到 403.4ms。modules 又被打脸一次。

结论:库作者别随便在 public header 里 include <meta>。要极致编译速度,X-macro 还是土但是快。


运行时选择策略的四种 dispatch 写法

用 OpenJDK GC barrier 做例子,比较 virtual、function pointer、std::variant、decoupled CRTP + lazy resolution。

virtual dispatch:

class BarrierSet {
public:
  virtual ~BarrierSet() = default;
  virtual void store(int* addr, int value) = 0;
};

class G1BarrierSet : public BarrierSet {
public:
  void store(int* addr, int value) override {
    printf(" [pre-barrier] saving old value: %d\n", *addr);
    *addr = value;
    printf(" [post-barrier] recording modified region @ %p\n", addr);
  }
};

函数指针:

using StoreFn = void(*)(int*, int);
StoreFn store = resolve(argv[1]);
store(heap, 42);

std::variant + std::visit

using AnyBarrierSet = std::variant<EpsilonBarrierSet, SerialBarrierSet, G1BarrierSet>;

auto bs = create(argv[1]);
std::visit([&](auto& b) { b.store(addr, value); }, bs);

libstdc++ 上 std::visit 生成得并不好:会 spill lambda capture,再按 variant index 查函数表。作者吐槽:vtable with extra steps

decoupled CRTP + lazy resolution 是 OpenJDK 风格:第一次调用根据 runtime kind 选函数指针,之后直接调用缓存指针。

struct RuntimeDispatch {
  using StoreFn = void(*)(int*, int);
  static StoreFn _store_func;

  static void store_init(int* addr, int value) {
    StoreFn func;
    switch (BarrierSet::barrier_set()->kind()) {
      case G1: func = &G1BarrierSet::AccessBarrier<>::store; break;
      case Serial: func = &SerialBarrierSet::AccessBarrier<>::store; break;
      case Epsilon: func = &EpsilonBarrierSet::AccessBarrier<>::store; break;
    }
    _store_func = func;
    func(addr, value);
  }

  static void store(int* addr, int value) {
    _store_func(addr, value);
  }
};

对比数据(Xeon Gold 6130,100M iterations,GCC 11):

方案 dispatch overhead text size compile time
Virtual +1.4ns 6.8KB 0.38s
Function pointer +0.9ns 4.7KB 0.25s
variant +2.2ns 5.2KB 0.32s
Decoupled CRTP +0.9ns 7.9KB 0.29s

结论别误读:大多数业务 virtual 就够了。只有“runtime 选一次、hot path 调爆、还要分层组合”的场景,decoupled CRTP 才值。std::visit 不一定比 virtual 快,这个点值得记住。


C++26 的 std::simd 是不是没人要

这篇是对 C++26 std::simd 的长篇开喷。观点:std::simd 来晚了,库级抽象遮住 optimizer,默认宽度不对,真实 SIMD 里最关键的 cross-lane 操作又表达不了。

主要槽点:

吐槽很猛,看一乐,不过保守起见那就用highway


C++ include 顺序:反向依赖排序

核心规则:依赖放在被依赖者后面,让缺失 include 更早暴露。推荐顺序:当前 TU header、subsystem headers、project internal、external、standard library。

// foo.cpp

// TU header
#include <proj/foo_module/foo.hpp>

// Subsystem headers
#include <proj/foo_module/bar.hpp>
#include <proj/foo_module/baz.hpp>

// Internal headers
#include <proj/my_other_module/foo.hpp>
#include <proj/my_other_module/bar.hpp>

// External headers
#include <boost/foo.hpp>

// Standard library headers
#include <string>

抓 transitive include 问题:

// A.hpp
auto identity(std::uint16_t a) -> std::uint16_t;

// B.hpp
#include <cstdint>

// A.cpp
#include "B.hpp"
#include "A.hpp"

auto identity(std::uint16_t a) -> std::uint16_t {
  return a;
}

这段能编译只是因为 B.hpp 顺手 include 了 <cstdint>。哪天 B.hpp 删掉 <cstdint>A.cpp 明明没动也炸了。

作者还建议更严格 IWYU:cpp 自己用到的东西,即使 header 已经 include 了,cpp 也显式 include 一遍。短期多几行 include,长期少很多“为什么我改 B,A 炸了”的玄学构建问题。


只有 17% 的 64-bit 整数能写成两个 32-bit 整数的乘积

Lemire 的数学小帖。问题:所有 64-bit unsigned integer 里,有多少能表示成两个 32-bit unsigned integer 的乘积?

和 hash 有关,比如弱 hash:

func simpleHighLowHash(x uint64) uint64 {
  high := uint32(x >> 16)
  low := uint32(x & 0xFFFFFFFF)
  return uint64(high) * uint64(low)
}

Webster 等人的数学和代码可以精确算出结果:

3,215,709,724,700,470,902 个 64-bit unsigned integer 可以写成两个 32-bit 整数的乘积,占所有 64-bit 值大约 17%

给一个 64-bit 数 n,想判断能不能拆成两个 32-bit 因子,可以做 prime factorization,然后枚举小于 2^32 的 divisor:

for p in factor_multiplicities:
  new_candidates = []
  for c in candidates:
    for i in range(factor_multiplicities[p] + 1):
      if c * (p ** i) < 2**32:
        new_candidates.append(c * (p ** i))
  for new_c in new_candidates:
    candidates.append(new_c)
m = max(candidates)
leftover = n // m
if leftover >= 2**32:
  print("Leftover is too large, cannot find a suitable candidate.")

随机选一个 64-bit 数,大概率拆不出来。hash 设计警示:乘法看起来把 bit 搅动了,但输出集合可能比你想的小很多


SIMD 加速整数转字符串

Lemire 博客版,解释上面那篇论文。核心:用 AVX-512 IFMA 一次对 8 个 64-bit lane 做 fused multiply-add,不同 lane 用不同 reciprocal 常量,一次算出多个 decimal digit。

核心代码:

__m512i to_string_avx512ifma_8digits(uint64_t n) {
  __m512i bcstq_l = _mm512_set1_epi64(n);
  constexpr uint64_t twoto52 = 0x10000000000000ULL; // 2^52
  __m512i ifma_const = _mm512_setr_epi64(
    twoto52 / 100000000, twoto52 / 10000000,
    twoto52 / 1000000, twoto52 / 100000,
    twoto52 / 10000, twoto52 / 1000,
    twoto52 / 100, twoto52 / 10
  );
  __m512i zmmTen = _mm512_set1_epi64(10);
  __m512i asciiZero = _mm512_set1_epi64('0');
  __m512i lowbits_l = _mm512_madd52lo_epu64(ifma_const,
    bcstq_l, ifma_const); // ifma_const * bcstq_l + ifma_const
  __m512i highbits_l = _mm512_madd52hi_epu64(asciiZero,
    zmmTen, lowbits_l);
  return highbits_l;
}

这段最后能编成两条 vpmadd52huq 级别的 multiply-add 指令,生成 8 个 digit。长度不满 8 位可以用 AVX-512 masked store。

数据:比最佳竞争者快 1.4~2x,比 std::to_chars2~4x。看着很爽,但 AVX-512 IFMA 门槛也高,适合库作者和极致序列化场景,业务代码还是先 profile。


ERROR_ARENA_TRASHED 这个错误码是哪来的

Raymond Chen 的 Windows/MS-DOS 考古。错误码 7 是 ERROR_ARENA_TRASHED,名字很朋克:arena 被 trash 了,像重金属乐队把演出场地砸了。

这个错误来自 MS-DOS。当年 DOS 管内存时,会把内存组织成一串 variable-sized memory block,每块前面有一个 16 字节的 header,叫 arena:

arena STRUC
arena_signature DB ? ; 4D for valid item, 5A for last item
arena_owner     DW ? ; owner of arena item
arena_size      DW ? ; size in paragraphs of item
arena ENDS

几个点:

DOS 在遍历内存块时,如果发现 signature 既不是 0x4D 也不是 0x5A,说明这串 arena 元数据被写坏了,于是返回 ERROR_ARENA_TRASHED

重点:Win32 kernel 基本不用这个错误码。它更像历史遗留化石。Raymond 说这反而让它适合测试里 mock 错误条件:如果你看到 error 7,大概率是测试 harness 故意塞的,不是真系统错误。

顺便吐槽那些“教你修 ERROR_ARENA_TRASHED”的网站:错误码实际都不用了,但网页还一本正经让你跑系统扫描、更新驱动、修复系统文件。经典 SEO 垃圾内容,什么都不懂但非常自信。我看了都替它尴尬。

又是 Raymond 的上古 Windows 考古,没啥工程实用性,但适合饭后看。


给游戏写一个可序列化的宏 coroutine

Vittorio Romeo 这篇讲他在 SFML fork 里写的 sfex::Coroutine,一个 200 行左右、stackless、宏实现、可序列化、无分配 的游戏脚本 coroutine。

核心观点:C++20 coroutine 语法很好看,但不适合游戏逻辑:

游戏逻辑不一样。boss AI、cutscene、dialogue 不是“一次 async 请求结束就扔”的临时任务,它们是游戏对象状态的一部分。玩家随时 F5 quick-save,你要能把“boss 正停在 phase 3,还剩 0.3 秒 wait”一起存下来。

目标:

一个 dialogue 例子:

struct DialogueScene : sfex::Coroutine
{
  Yield operator()(World& world)
  {
    SFEX_CO_BEGIN;

    world.showText("Hero", "I finally got you.");
    SFEX_CO_YIELD(Wait{1.0f});

    world.showText("Villain", "You're too late!");
    SFEX_CO_YIELD(Wait{0.5f});

    world.showText("Hero", "We'll see about that!");
    SFEX_CO_YIELD(Wait{1.0f});

    world.hideText();
    SFEX_CO_RETURN(Done{});
    SFEX_CO_END;
  }
};

Yieldstd::variant<NextFrame, Wait, Done>。driver 每帧调用一次,根据返回值决定下一帧继续、等几秒、还是结束:

DialogueScene scene;
float waitTimer = 0.f;

while (true)
{
  const float dt = clock.restart().asSeconds();

  if (waitTimer > 0.f)
  {
    waitTimer -= dt;
  }
  else
  {
    scene(world).match(
      [&](NextFrame) { /* ...run again next frame... */ },
      [&](Wait w) { waitTimer = w.seconds; },
      [&](Done) { /* ...proceed to next scene... */ });
  }

  // ... draw, display ...
}

平铺的对话用 FSM 也不丑:

struct DialogueSceneFSM
{
  int step = 0;
  float waitTimer = 0.f;

  bool tick(World& world, float dt)
  {
    if (waitTimer > 0.f)
    {
      waitTimer -= dt;
      return true;
    }

    switch (step++)
    {
      case 0: world.showText("Hero", "I finally got you."); waitTimer = 1.0f; return true;
      case 1: world.showText("Villain", "You're too late!"); waitTimer = 0.5f; return true;
      case 2: world.showText("Hero", "We'll see about that!"); waitTimer = 1.0f; return true;
      case 3: world.hideText(); return false;
    }

    return false;
  }
};

但一旦有嵌套时间逻辑,比如角色走三步,每步之间停一下,FSM 就开始炸。coroutine 还是自然的 for + while + yield

struct CutsceneScene : sfex::Coroutine
{
  int paceIdx = 0;
  float t = 0.f;
  sf::Vec2f paceStartPos;

  Yield operator()(World& world)
  {
    SFEX_CO_BEGIN;

    // ...

    for (paceIdx = 0; paceIdx < 3; ++paceIdx)
    {
      paceStartPos = world.villainPos;
      t = 0.f;

      while (t < 1.f)
      {
        t += world.dt;
        world.villainPos.x = paceStartPos.x - 100.f * t;

        SFEX_CO_YIELD(NextFrame{});
      }

      SFEX_CO_YIELD(Wait{0.30f});
    }

    world.showText("Hero", "Don't come any closer!");
    SFEX_CO_YIELD(Wait{1.5f});

    SFEX_CO_RETURN(Done{});
    SFEX_CO_END;
  }
};

注意 paceIdx/t/paceStartPos 都是成员,不能是局部变量。任何要跨 yield 存活的东西都得放 struct 上,后面会说这是这套宏方案最大的 footgun。

原理:switch + case label。 宏展开后其实就是手写状态机:

Yield DialogueScene::operator()(World& world)
{
  switch (state)
  {
    case 0:

    world.showText("Hero", "I finally got you.");

    state = 1;
    return Wait{1.0f};

    case 1:

    world.showText("Villain", "You're too late!");

    state = 2;
    return Wait{0.5f};

    case 2:

    world.showText("Hero", "We'll see about that!");

    state = 3;
    return Wait{1.0f};

    case 3:

    world.hideText();

    state = 0;
    return Done{};
  }

  std::unreachable();
}

C/C++ 的 case label 可以嵌在 switch body 任意深度(只要中间没有另一个 switch),所以 yield 可以写成“设置 state + return + 下一个 case label”。经典 protothreads 技巧。

宏定义:

#define SFEX_CO_BEGIN \
  static constexpr int _sfex_base = __COUNTER__; \
  switch (state) { case 0:;

#define SFEX_CO_YIELD(value) \
  do { \
    state = (__COUNTER__ + 1) - _sfex_base; \
    return value; \
    case (__COUNTER__ - _sfex_base):; \
  } while (0)

#define SFEX_CO_RETURN(value) \
  do \
  { \
    state = 0; \
    return value; \
  } while (0)

#define SFEX_CO_END \
  } \
  \
  std::unreachable();

这里用 __COUNTER__ 而不是 __LINE__,原因是序列化。__LINE__ 会因为加空行、格式化、插注释而变,旧存档里的 state=47 可能突然跳到另一个 yield。__COUNTER__ 是按宏使用次数递增,普通改格式不影响。代价是:你新增/删除 yield,后面的 state 还是会变,所以正式游戏还得给 coroutine 加 revision,旧存档不匹配就拒绝加载或迁移。

组合:SFEX_CO_AWAIT 敌人 AI 可以把两个子 coroutine 作为成员,循环 await:

struct RingFire : sfex::Coroutine
{
  int rings = 4;
  int i = 0;

  Yield operator()(World& world, Enemy& self)
  {
    SFEX_CO_BEGIN;

    for (i = 0; i < rings; ++i)
    {
      world.spawnBulletRing(self.pos, /* count */ 8, /* speed */ 120.f);
      SFEX_CO_YIELD(Wait{0.4f});
    }

    SFEX_CO_RETURN(Done{});
    SFEX_CO_END;
  }
};

struct AimedBurst : sfex::Coroutine
{
  int shots = 5;
  int i = 0;

  Yield operator()(World& world, Enemy& self)
  {
    SFEX_CO_BEGIN;

    for (i = 0; i < shots; ++i)
    {
      world.spawnBulletAimed(self.pos, world.player.pos, /* speed */ 220.f);
      SFEX_CO_YIELD(Wait{0.2f});
    }

    SFEX_CO_RETURN(Done{});
    SFEX_CO_END;
  }
};

父 coroutine:

struct EnemyAI : sfex::Coroutine
{
  RingFire ring;
  AimedBurst aimed;

  Yield operator()(World& world, Enemy& self)
  {
    SFEX_CO_BEGIN;

    while (true)
    {
      ring = {};
      SFEX_CO_AWAIT(ring(world, self));
      SFEX_CO_YIELD(Wait{1.0f});

      aimed = {};
      SFEX_CO_AWAIT(aimed(world, self));
      SFEX_CO_YIELD(Wait{1.0f});
    }

    SFEX_CO_END;
  }
};

AWAIT 本质是 tick child,如果 child 没结束就把 child 的 yield 往上传。没有分配,没有 opaque handle,父对象销毁时子 coroutine 也跟着销毁。

坑:

这个实现其实就是“可序列化 protothreads for game scripting”。花里胡哨但挺有意思。C++20 coroutine 是通用异步机制,适合网络/IO/generator;游戏脚本这种“状态就是数据、要存档、要热重载/回放”的场景,手写状态机或者这种宏 coroutine 反而更可控。宏当然丑,但能存档就是硬道理。


上一期

本期

下一期

看到这里或许你有建议或者疑问或者指出错误,请留言评论! 多谢! 你的评论非常重要!也可以帮忙点赞收藏转发!多谢支持! 觉得写的不错那就给点吧, 在线乞讨 微信转账