公众号

点击「查看原文」跳转到 GitHub 上对应文件,链接就可以点击了
qq群 753792291 答疑在这里
欢迎投稿,推荐或自荐文章/软件/资源等,评论区留言
本期文章 没有 赞助 求赞助各位老板
标准委员会动态/ide/编译器信息放在这里
编译器信息最新动态推荐关注hellogcc公众号 本周更新 2025-01-08 第288期
从 mutex + std::queue 讲到 Michael-Scott queue、ABA、batching、hazard pointer。主题很老,但代码贴得多,适合复习 lock-free 基础。
基准版:
template <typename T>
class SimpleQueue {
public:
void Push(T* item, uint32_t threadId) {
(void)threadId;
std::unique_lock lock(m_Mutex);
m_Queue.push(*item);
}
bool Pop(T* item, uint32_t threadId) {
(void)threadId;
std::unique_lock lock(m_Mutex);
if (m_Queue.empty()) return false;
*item = std::move(m_Queue.front());
m_Queue.pop();
return true;
}
private:
std::queue<T> m_Queue;
mutable std::mutex m_Mutex;
};
低竞争下没问题,高竞争下锁竞争导致 context switch,队列操作几纳秒,切线程几微秒,直接高几个数量级。我谢谢你。
CAS 基础:
std::atomic<int> value{42};
int expected = 42;
int desired = 100;
if (value.compare_exchange_strong(expected, desired)) {
// success! value was 42, now it is 100
} else {
// failure, `expected` now holds the actual current value
}
朴素 lock-free queue:
template <typename T>
struct NaiveNode {
T data;
std::atomic<NaiveNode*> next;
};
template <typename T>
class NaiveLockFreeQueue {
public:
NaiveLockFreeQueue() {
auto dummy = new NaiveNode<T>();
dummy->next.store(nullptr);
head.store(dummy);
tail.store(dummy);
}
void Push(T* value, uint32_t /*threadId*/) {
auto newNode = new NaiveNode<T>();
newNode->data = *value;
newNode->next.store(nullptr, std::memory_order::relaxed);
while (true) {
auto currentHead = head.load(std::memory_order::acquire);
auto currentNext = currentHead->next.load(std::memory_order::acquire);
if (currentHead != head.load(std::memory_order::acquire)) continue;
if (currentNext == nullptr) {
NaiveNode<T>* expected = nullptr;
if (currentHead->next.compare_exchange_strong(expected, newNode)) {
head.compare_exchange_strong(currentHead, newNode);
return;
}
} else {
head.compare_exchange_strong(currentHead, currentNext);
}
}
}
bool Pop(T* value, uint32_t /*threadId*/) {
while (true) {
auto currentTail = tail.load(std::memory_order::acquire);
auto currentHead = head.load(std::memory_order::acquire);
auto next = currentTail->next.load(std::memory_order::acquire);
if (currentTail != tail.load(std::memory_order::acquire)) continue;
if (currentTail == currentHead) {
if (next == nullptr) return false;
head.compare_exchange_strong(currentHead, next);
} else {
if (!next) continue;
*value = next->data;
if (tail.compare_exchange_strong(currentTail, next)) {
delete currentTail;
return true;
}
}
}
}
private:
std::atomic<NaiveNode<T>*> head;
std::atomic<NaiveNode<T>*> tail;
};
坑也很经典:
new/delete,锁从队列搬到 malloc 里了Part II:Batching。 朴素版本三个问题:分配太多、cache locality 差、内存回收不安全。先解决前两个:一个节点不存一个元素,存一整个固定大小数组。比如 block size=1024,那就是 1024 次 push 才需要一次节点分配,pointer chasing 也从每个元素一次变成每 1024 个元素一次。
每个 slot 里有数据和一个 commit flag。注意“抢到 slot index”和“数据已经写完”不是一回事,中间有窗口,所以 consumer 不能抢到 index 就直接读。
template <typename T, bool BlockingRead>
requires std::is_trivially_copyable_v<T>
class FastQueueNodeSlot {
public:
FORCE_INLINE FastQueueNodeSlot() {
commited.clear(std::memory_order::relaxed);
}
FORCE_INLINE void Commit(T* value) {
memcpy(&data, value, sizeof(T));
commited.test_and_set(std::memory_order::release);
if constexpr (BlockingRead) {
commited.notify_one();
}
}
FORCE_INLINE bool IsCommited() const {
return commited.test(std::memory_order::acquire);
}
FORCE_INLINE void WaitTillCommited() const {
while (!IsCommited()) {
commited.wait(false, std::memory_order::relaxed);
}
}
FORCE_INLINE bool TryRead(T* value) const {
if (IsCommited()) {
memcpy(value, &data, sizeof(T));
return true;
}
return false;
}
FORCE_INLINE void Read(T* value) const {
WaitTillCommited();
memcpy(value, &data, sizeof(T));
}
FORCE_INLINE void Reset() { commited.clear(std::memory_order::relaxed); }
private:
T data;
std::atomic_flag commited;
};
这里限制 T 必须 trivially copyable,所以能直接 memcpy。你不能直接塞 std::string/unique_ptr,要么塞 POD,要么塞指针。换来的是没有构造/析构/异常安全那一堆麻烦,hot path 更干净。
C++20 的 atomic_flag::wait/notify_one 也挺有意思,blocking 版本不用 mutex + condition_variable,底下可以走 futex/WaitOnAddress 这类 atomic wait。
节点结构:
template <typename T, uint32_t Size, bool BlockingRead>
class FastQueueNode {
public:
using Self = FastQueueNode<T, Size, BlockingRead>;
FORCE_INLINE FastQueueNode() { Reset(); }
FORCE_INLINE bool IsUsedUp(std::memory_order memoryOrder) {
return (tail.load(memoryOrder) >= Size);
}
FORCE_INLINE bool Push(T* value) {
auto oldHead = head.load(std::memory_order::acquire);
while (true) {
if (oldHead == Size) {
return false;
}
auto newHead = oldHead + 1;
if (head.compare_exchange_strong(oldHead, newHead,
std::memory_order::seq_cst,
std::memory_order::acquire)) {
slots[oldHead].Commit(value);
return true;
}
}
}
FORCE_INLINE bool Pop(T* value) {
auto currentTail = tail.load(std::memory_order::seq_cst);
while (true) {
auto currentHead = head.load(std::memory_order::acquire);
if (currentTail >= currentHead) {
return false;
}
if constexpr (!BlockingRead) {
if (!slots[currentTail].IsCommited()) {
return false;
}
}
auto newTail = currentTail + 1;
if (tail.compare_exchange_strong(currentTail, newTail,
std::memory_order::seq_cst,
std::memory_order::acquire)) {
slots[currentTail].Read(value);
return true;
}
}
return false;
}
FORCE_INLINE void Reset() {
head.store(0, std::memory_order::release);
tail.store(0, std::memory_order::release);
next.store(nullptr, std::memory_order::release);
for (auto& slot : slots) {
slot.Reset();
}
}
FORCE_INLINE Self* Next(std::memory_order memoryOrder) {
return next.load(memoryOrder);
}
FORCE_INLINE bool TrySetNext(Self*& expected, Self* value) {
return next.compare_exchange_strong(expected, value,
std::memory_order::seq_cst,
std::memory_order::acquire);
}
private:
alignas(64) std::array<FastQueueNodeSlot<T, BlockingRead>, Size> slots;
alignas(64) std::atomic<uint32_t> head;
alignas(64) std::atomic<uint32_t> tail;
alignas(64) std::atomic<Self*> next;
};
alignas(64) 是为了防 false sharing。head、tail、next 都是热 atomic,放同一个 cache line 上就是互相扇嘴巴,cache coherence 会疯狂 invalidate。浪费一点 padding,比 cache line ping-pong 强。
Part III:Hazard Pointer。 batching 降低了分配频率,但 delete oldTail 仍然不安全。hazard pointer 的规则是:我要 dereference 某个共享指针之前,先把这个指针发布到一个全局可见的位置,告诉所有线程“这个地址我正在看,别删”。
每线程状态:
template <typename T, uint32_t PointerCount>
struct alignas(64) HazardThreadState {
std::array<std::atomic<T*>, PointerCount> hazardPointers;
std::vector<T*> retiredPointers;
std::vector<T*> scratchBuffer;
};
核心 Protect:
T* Protect(const std::atomic<T*>& ptr, uint32_t threadId, uint32_t slotId) {
assert(threadId < ThreadCount);
assert(slotId < PointerCount);
T* p = nullptr;
do {
p = ptr.load(std::memory_order::acquire);
state[threadId].hazardPointers[slotId].store(p, std::memory_order::seq_cst);
} while (p && p != ptr.load(std::memory_order::seq_cst));
return p;
}
void Release(uint32_t threadId, uint32_t slotId) {
state[threadId].hazardPointers[slotId].store(nullptr,
std::memory_order::release);
}
重点是“load → publish → reload 检查是否还一样”。如果第一次 load 之后别的线程已经把指针换掉并 retire 了,第二次检查会发现不一致,重来。这样 Protect 返回时,至少能保证:我发布 hazard 之后,这个 atomic 仍然指向它。别人 Scan 时会看到我的 hazard,不会 delete。
删除逻辑不是直接 delete,而是 Retire 进 retired list,攒到阈值再 Scan:
template <typename Deleter = std::nullptr_t>
requires(std::invocable<Deleter, T*> ||
std::same_as<Deleter, std::nullptr_t>)
void Scan(uint32_t threadId, Deleter deleter) {
auto& activePointersTracker = state[threadId].scratchBuffer;
activePointersTracker.clear();
for (const auto& list : state) {
for (const auto& pointer : list.hazardPointers) {
T* ptr = pointer.load(std::memory_order::seq_cst);
if (ptr) activePointersTracker.push_back(ptr);
}
}
std::sort(activePointersTracker.begin(), activePointersTracker.end());
auto& retiredList = state[threadId].retiredPointers;
for (uint32_t i = 0; i < retiredList.size();) {
if (!std::binary_search(activePointersTracker.begin(),
activePointersTracker.end(), retiredList[i])) {
if constexpr (!std::same_as<Deleter, std::nullptr_t>) {
deleter(retiredList[i]);
} else {
delete retiredList[i];
}
retiredList[i] = retiredList.back();
retiredList.pop_back();
} else {
i++;
}
}
}
这里两个性能点:scratch buffer 预分配复用;sort + binary_search 把朴素 O(N*R) 变成 O((N+R)logN)。另外 Retire 只能在 CAS 成功、当前线程真的“拿到这个节点所有权”之后调用,不然多个线程 retire 同一个指针就等着 double free。
为了避免 Protect 之后忘记 Release,搞个RAII回收guard, DEFER:
template <typename F>
requires std::is_invocable_v<F>
class Defer {
public:
explicit Defer(F payload) : deferedPayload(payload) {}
~Defer() { std::invoke(deferedPayload); }
private:
F deferedPayload;
};
#define CONCAT_IMPL(x, y) x##y
#define CONCAT(x, y) CONCAT_IMPL(x, y)
#define DEFER(...) \
const auto CONCAT(__deferedPayload, __LINE__) = \
Defer([&]() { __VA_ARGS__ });
用起来:
Node* current = hp.Protect(head, threadId, 0);
DEFER(hp.Release(threadId, 0););
Part IV:thread-local cache。 hazard pointer 解决了“什么时候能删”,但如果最终还是 delete,多线程 allocator 还是瓶颈。所以改成每个线程一个 node cache:retire 后不 delete,丢回当前线程 cache;下次需要新 node,先从 cache 拿。
Node* AcquireNewNode(uint32_t threadId) {
if constexpr (ThreadLocalCacheSize > 0) {
auto& chBuff = tcBuff[threadId];
if (chBuff->size() > 0) {
auto item = chBuff->back();
item->Reset();
chBuff->pop_back();
return item;
}
}
return new Node();
}
void ReleaseNewNode(Node* ptr, uint32_t threadId) {
if constexpr (ThreadLocalCacheSize > 0) {
auto& chBuff = tcBuff[threadId];
if (chBuff->size() < ThreadLocalCacheSize) {
chBuff->push_back(ptr);
return;
}
}
delete ptr;
}
if constexpr (ThreadLocalCacheSize > 0) 的意思是:你把 cache size 配成 0,相关代码在二进制里直接消失,零运行时开销。
最终 FastQueue 的 Push/Pop:
template <typename T, uint32_t NodeBufferSize, uint32_t ThreadCount,
bool BlockingRead = false, uint32_t ThreadLocalCacheSize = 256>
class FastQueue {
public:
using Node = FastQueueNode<T, NodeBufferSize, BlockingRead>;
FORCE_INLINE void Push(T* value, uint32_t threadId) {
Node *current = nullptr, *next = nullptr;
while (true) {
current = hp.Protect(head, threadId, 0);
DEFER(hp.Release(threadId, 0););
if (current->Push(value)) break;
next = current->Next(std::memory_order::acquire);
if (next == nullptr) {
auto newNode = AcquireNewNode(threadId);
if (!current->TrySetNext(next, newNode)) {
ReleaseNewNode(newNode, threadId);
}
continue;
}
head.compare_exchange_strong(current, next, std::memory_order::seq_cst,
std::memory_order::acquire);
}
}
FORCE_INLINE bool Pop(T* value, uint32_t threadId) {
Node *currentTail = nullptr, *next = nullptr;
while (true) {
currentTail = hp.Protect(tail, threadId, 1);
DEFER(hp.Release(threadId, 1););
if (currentTail->Pop(value)) return true;
if (!currentTail->IsUsedUp(std::memory_order::seq_cst)) return false;
next = currentTail->Next(std::memory_order::acquire);
if (!next) return false;
if (tail.compare_exchange_strong(currentTail, next,
std::memory_order::seq_cst,
std::memory_order::acquire)) {
hp.Retire(currentTail, threadId, [this, threadId](Node* node){
this->ReleaseNewNode(node, threadId);
});
}
}
}
private:
using ThreadLocalCacheBuffer = std::vector<Node*>;
alignas(64) std::atomic<Node*> head;
alignas(64) std::atomic<Node*> tail;
alignas(64) HazardPointer<Node, ThreadCount, 4, 16> hp;
alignas(64) std::array<Aligned<ThreadLocalCacheBuffer, 64>,
(ThreadLocalCacheSize > 0) ? ThreadCount : 1> tcBuff;
};
模板参数基本都是策略开关:
NodeBufferSize:每个 node 多少 slot,越大 pointer chasing 越少,但单 node 内存越大ThreadCount:hazard pointer 数组大小BlockingRead:consumer 碰到未 commit slot 时是等还是返回 falseThreadLocalCacheSize:每线程缓存多少回收 nodebenchmark: 5,000,000 个 item,clang++ -O3 -std=c++20,16 核 Intel + HT:
| 场景 | SimpleQueue(mutex) | NaiveLockFree | FastQueue | 加速 |
|---|---|---|---|---|
| MPMC Low (2P/2C) | 250.7ms | 797.1ms | 150.8ms | 1.6x |
| MPMC Med (8P/8C) | 2152.2ms | 2426.5ms | 309.7ms | 6.9x |
| MPMC High (16P/16C) | 2600.7ms | 2413.5ms | 299.6ms | 8.6x |
| SPMC Med (1P/8C) | 3138.0ms | 1595.1ms | 253.9ms | 12.3x |
| MPSC Med (8P/1C) | 563.2ms | 1865.2ms | 252.2ms | 2.2x |
几个观察:
new/delete,allocator 先炸这篇文章的核心价值不是“大家复制这份代码上生产”,而是把几个关键点串起来了:batching 降低分配和 pointer chasing、hazard pointer 解决 ABA/安全回收、thread-local cache 避免 allocator 锁、cache line alignment 避免 false sharing。
最后还是那句话:lock-free 能快,但复杂度是真贵。你要是没 benchmark 证明 mutex 是瓶颈,就别上来整这套,用户没受益,维护者先暴毙。
HFT/低延迟系统入门综述,目标是 p95 wire-to-wire latency < 50us。wire-to-wire 指从 NIC 收到行情包,到同一张 NIC 发出订单,中间所有处理都算进去,精确测量要靠 NIC hardware timestamp。
典型 pipeline:Market Data Ingestion → Filtering/Checks → Pricing Engine → Strategy Evaluation → OMS。
关键点:
malloc/new,自定义 allocator / pool部署 checklist 大概是:关 C-states/Turbo Boost、CPU governor 设 performance、关 PCIe ASPM、开 hugepages、isolcpus/nohz_full/rcu_nocbs、mlockall、cache-align、lock-free SPSC queue、TSC 计时。
这类文章有点低延迟八股文。不过重点在于:P99 很多时候死在 OS/硬件/观测缺失上,不是死在你少写了两条 AVX 指令上。
C++26 补齐 type-erased callable wrapper:std::copyable_function 和 std::function_ref。
std::function 的老毛病是 const-correctness 有坑:
#include <functional>
#include <iostream>
struct Counter {
int counter = 0;
void operator()() {
++counter;
std::cout << "modifying state: " << counter << '\n';
}
};
int main() {
const std::function<void()> f = Counter{};
f(); // OK (!)
}
std::function::operator() 是 const,但能调用内部对象非 const operator(),ABI 固化,没法修。
std::copyable_function 基本是 copyable 版 std::move_only_function,签名里的 qualifier 真正控制 operator():
#include <functional>
struct Counter {
int count = 0;
int operator()() { return ++count; } // non-const
};
int main() {
std::copyable_function<int()> f = Counter{};
f(); // OK
// std::copyable_function<int() const> g = Counter{}; // ERROR
std::copyable_function<int() const> h = []{ return 42; }; // OK
}
std::function_ref 是 non-owning callable reference,callable 版 string_view:
// function pointer -- too restrictive, no lambdas with captures:
payload retry(size_t times, payload(*action)());
// template -- works but bloats binary, must be in header:
template<class F>
payload retry(size_t times, F&& action);
// function_ref -- lightweight, non-owning, no allocation:
payload retry(size_t times, std::function_ref<payload()> action);
选择:callback 参数用 function_ref;存 callable 不复制用 move_only_function;要复制用 copyable_function;std::function 新代码能不用就不用。省流:std::function 终于可以慢慢退役了。
ref-qualifier 给成员函数加 & / &&,让函数根据 *this 是 lvalue 还是 rvalue 参与重载。
struct refqualified {
mytype var;
const mytype& getVar() const& { return var; }
void setVar(const mytype& v) & { var = v; }
mytype getVar() && { return var; }
};
常见组合:
f(...) const& // const lvalue 上可调
f(...) & // 只能 lvalue 调,T{}.f() 不行
f(...) && // 只能 rvalue/temporary 调
一个 fluent interface 例子,只允许临时对象链式配置:
enum loglevel { NONE, SOME, FULL };
struct Logger {
loglevel level = loglevel::NONE;
std::string sink = "none";
bool debug = false;
Logger&& setLogLevel(loglevel ll) && { level = ll; return std::move(*this); }
Logger&& setSink(std::string s) && { sink = s; return std::move(*this); }
Logger&& setDebug(bool d = true) && { debug = d; return std::move(*this); }
};
Logger l = Logger().setSink("mysink").setDebug();
// l.setLogLevel(loglevel::SOME); // error
这个例子有点刻意,但味道能看出来:把 API 的生命周期/值类别约束写进类型系统。尤其成员函数返回内部引用时,const& 返回引用,&& 返回值/移动值,可以避免从临时对象里拿悬垂引用。
Lemire 和 Jaël Champagne Gareau 的论文:整数转十进制字符串做到 2ns 以内。传统做法是反复除以 10/100,再查表拼字符。论文走 SIMD + AVX-512 IFMA:用 integer multiply-add 同时算多个 quotient/remainder,去掉 lookup table。
摘要里的核心数据:
std::to_chars 快 2~4x适合 JSON/CSV/logging 这种整数转字符串真在 profiler 顶上的场景。普通业务别急着手抄 AVX-512 intrinsic。
std::is_heap 可以更快,也不该要求 random accessArthur O’Dwyer 发现 std::ranges::is_heap 要求 random_access_range,但检查 heap 本质上只要按顺序看 parent/child 对,forward_range 就够了。
更简单的实现可以两个 forward iterator 锁步走,避免重复算 first + c、2*p+1:
template <class _Compare, class _ForwardIterator, class _Sentinel>
_LIBCPP_HIDE_FROM_ABI _LIBCPP_CONSTEXPR_SINCE_CXX20 _ForwardIterator
__is_heap_until(_ForwardIterator __first, _Sentinel __last, _Compare&& __comp) {
_ForwardIterator __child = __first;
if (__child == __last) {
return __child;
}
while (true) {
++__child;
if (__child == __last || __comp(*__first, *__child)) break;
++__child;
if (__child == __last || __comp(*__first, *__child)) break;
++__first;
}
return __child;
}
benchmark 结果也支持,libc++ 改完快不少:
| 场景 | n | before | after | 改善 |
|---|---|---|---|---|
| vector_half | 1K | 4155ns | 2658ns | -36% |
| vector_full | 1K | 8813ns | 5294ns | -39% |
| deque_half | 100K | 375844ns | 264856ns | -29% |
| deque_full | 10M | 65697333ns | 52488343ns | -20% |
结论:实现可以马上优化;标准也可以考虑把 is_heap/is_heap_until 从 random access 放宽到 forward。STL 角落里的小坑,但这种抠细节的文章就是好看。
GCC 16.1.1 下测三种 enum-to-string:C++26 reflection、enchantum、X-macro。
Reflection 版本:
#include <meta>
#include <string_view>
#include <type_traits>
template <typename T>
requires std::is_enum_v<T>
constexpr std::string_view to_enum_string(T val)
{
template for (constexpr auto e :
std::define_static_array(std::meta::enumerators_of(^^T)))
{
if (val == [:e:])
return std::meta::identifier_of(e);
}
return "<unknown>";
}
关键数据(单 TU 编译耗时):
| N | X-macro const char* | X-macro string_view | enchantum | Reflection |
|---|---|---|---|---|
| include only | 25.7ms | 136.0ms | 147.1ms | 180.8ms |
| 4 | 26.6ms | 137.6ms | 170.6ms | 186.7ms |
| 256 | 32.5ms | 153.0ms | 184.1ms | 215.0ms |
| 1024 | 54.7ms | 204.5ms | 272.0ms | 255.0ms |
算法本身不慢,慢的是 <meta>。Reflection 每个枚举项约 0.07ms/enumerator,和 string_view X-macro 的 0.06ms/enumerator 差不多;但 <meta> 头文件税太贵。
PCH 和 modules 更有意思:PCH 把 N=4 从 186.7ms 降到 80.6ms;GCC 16 modules 反而涨到 403.4ms。modules 又被打脸一次。
结论:库作者别随便在 public header 里 include <meta>。要极致编译速度,X-macro 还是土但是快。
用 OpenJDK GC barrier 做例子,比较 virtual、function pointer、std::variant、decoupled CRTP + lazy resolution。
virtual dispatch:
class BarrierSet {
public:
virtual ~BarrierSet() = default;
virtual void store(int* addr, int value) = 0;
};
class G1BarrierSet : public BarrierSet {
public:
void store(int* addr, int value) override {
printf(" [pre-barrier] saving old value: %d\n", *addr);
*addr = value;
printf(" [post-barrier] recording modified region @ %p\n", addr);
}
};
函数指针:
using StoreFn = void(*)(int*, int);
StoreFn store = resolve(argv[1]);
store(heap, 42);
std::variant + std::visit:
using AnyBarrierSet = std::variant<EpsilonBarrierSet, SerialBarrierSet, G1BarrierSet>;
auto bs = create(argv[1]);
std::visit([&](auto& b) { b.store(addr, value); }, bs);
libstdc++ 上 std::visit 生成得并不好:会 spill lambda capture,再按 variant index 查函数表。作者吐槽:vtable with extra steps。
decoupled CRTP + lazy resolution 是 OpenJDK 风格:第一次调用根据 runtime kind 选函数指针,之后直接调用缓存指针。
struct RuntimeDispatch {
using StoreFn = void(*)(int*, int);
static StoreFn _store_func;
static void store_init(int* addr, int value) {
StoreFn func;
switch (BarrierSet::barrier_set()->kind()) {
case G1: func = &G1BarrierSet::AccessBarrier<>::store; break;
case Serial: func = &SerialBarrierSet::AccessBarrier<>::store; break;
case Epsilon: func = &EpsilonBarrierSet::AccessBarrier<>::store; break;
}
_store_func = func;
func(addr, value);
}
static void store(int* addr, int value) {
_store_func(addr, value);
}
};
对比数据(Xeon Gold 6130,100M iterations,GCC 11):
| 方案 | dispatch overhead | text size | compile time |
|---|---|---|---|
| Virtual | +1.4ns | 6.8KB | 0.38s |
| Function pointer | +0.9ns | 4.7KB | 0.25s |
| variant | +2.2ns | 5.2KB | 0.32s |
| Decoupled CRTP | +0.9ns | 7.9KB | 0.29s |
结论别误读:大多数业务 virtual 就够了。只有“runtime 选一次、hot path 调爆、还要分层组合”的场景,decoupled CRTP 才值。std::visit 不一定比 virtual 快,这个点值得记住。
这篇是对 C++26 std::simd 的长篇开喷。观点:std::simd 来晚了,库级抽象遮住 optimizer,默认宽度不对,真实 SIMD 里最关键的 cross-lane 操作又表达不了。
主要槽点:
<experimental/simd>,简单 SIMD sin 函数编译 2.2s,标量 for-loop 0.2s,每个 TU 10x 税-O3 -ffast-math -march=native 下,普通 loop 能 auto-vectorize;std::simd 版因为模板抽象挡住优化,反而慢std::simd<int>::size() 在 AVX2/AVX-512 机器上可能还是 ABI-safe 的 128-bit;文章 benchmark 里 std::simd 326ns,标量 loop 137ns,慢 2.4xstd::simd 是 fixed-width abstraction,天然别扭吐槽很猛,看一乐,不过保守起见那就用highway
核心规则:依赖放在被依赖者后面,让缺失 include 更早暴露。推荐顺序:当前 TU header、subsystem headers、project internal、external、standard library。
// foo.cpp
// TU header
#include <proj/foo_module/foo.hpp>
// Subsystem headers
#include <proj/foo_module/bar.hpp>
#include <proj/foo_module/baz.hpp>
// Internal headers
#include <proj/my_other_module/foo.hpp>
#include <proj/my_other_module/bar.hpp>
// External headers
#include <boost/foo.hpp>
// Standard library headers
#include <string>
抓 transitive include 问题:
// A.hpp
auto identity(std::uint16_t a) -> std::uint16_t;
// B.hpp
#include <cstdint>
// A.cpp
#include "B.hpp"
#include "A.hpp"
auto identity(std::uint16_t a) -> std::uint16_t {
return a;
}
这段能编译只是因为 B.hpp 顺手 include 了 <cstdint>。哪天 B.hpp 删掉 <cstdint>,A.cpp 明明没动也炸了。
作者还建议更严格 IWYU:cpp 自己用到的东西,即使 header 已经 include 了,cpp 也显式 include 一遍。短期多几行 include,长期少很多“为什么我改 B,A 炸了”的玄学构建问题。
Lemire 的数学小帖。问题:所有 64-bit unsigned integer 里,有多少能表示成两个 32-bit unsigned integer 的乘积?
和 hash 有关,比如弱 hash:
func simpleHighLowHash(x uint64) uint64 {
high := uint32(x >> 16)
low := uint32(x & 0xFFFFFFFF)
return uint64(high) * uint64(low)
}
Webster 等人的数学和代码可以精确算出结果:
3,215,709,724,700,470,902 个 64-bit unsigned integer 可以写成两个 32-bit 整数的乘积,占所有 64-bit 值大约 17%。
给一个 64-bit 数 n,想判断能不能拆成两个 32-bit 因子,可以做 prime factorization,然后枚举小于 2^32 的 divisor:
for p in factor_multiplicities:
new_candidates = []
for c in candidates:
for i in range(factor_multiplicities[p] + 1):
if c * (p ** i) < 2**32:
new_candidates.append(c * (p ** i))
for new_c in new_candidates:
candidates.append(new_c)
m = max(candidates)
leftover = n // m
if leftover >= 2**32:
print("Leftover is too large, cannot find a suitable candidate.")
随机选一个 64-bit 数,大概率拆不出来。hash 设计警示:乘法看起来把 bit 搅动了,但输出集合可能比你想的小很多。
Lemire 博客版,解释上面那篇论文。核心:用 AVX-512 IFMA 一次对 8 个 64-bit lane 做 fused multiply-add,不同 lane 用不同 reciprocal 常量,一次算出多个 decimal digit。
核心代码:
__m512i to_string_avx512ifma_8digits(uint64_t n) {
__m512i bcstq_l = _mm512_set1_epi64(n);
constexpr uint64_t twoto52 = 0x10000000000000ULL; // 2^52
__m512i ifma_const = _mm512_setr_epi64(
twoto52 / 100000000, twoto52 / 10000000,
twoto52 / 1000000, twoto52 / 100000,
twoto52 / 10000, twoto52 / 1000,
twoto52 / 100, twoto52 / 10
);
__m512i zmmTen = _mm512_set1_epi64(10);
__m512i asciiZero = _mm512_set1_epi64('0');
__m512i lowbits_l = _mm512_madd52lo_epu64(ifma_const,
bcstq_l, ifma_const); // ifma_const * bcstq_l + ifma_const
__m512i highbits_l = _mm512_madd52hi_epu64(asciiZero,
zmmTen, lowbits_l);
return highbits_l;
}
这段最后能编成两条 vpmadd52huq 级别的 multiply-add 指令,生成 8 个 digit。长度不满 8 位可以用 AVX-512 masked store。
数据:比最佳竞争者快 1.4~2x,比 std::to_chars 快 2~4x。看着很爽,但 AVX-512 IFMA 门槛也高,适合库作者和极致序列化场景,业务代码还是先 profile。
Raymond Chen 的 Windows/MS-DOS 考古。错误码 7 是 ERROR_ARENA_TRASHED,名字很朋克:arena 被 trash 了,像重金属乐队把演出场地砸了。
这个错误来自 MS-DOS。当年 DOS 管内存时,会把内存组织成一串 variable-sized memory block,每块前面有一个 16 字节的 header,叫 arena:
arena STRUC
arena_signature DB ? ; 4D for valid item, 5A for last item
arena_owner DW ? ; owner of arena item
arena_size DW ? ; size in paragraphs of item
arena ENDS
几个点:
arena_owner 是拥有这块内存的 process PDB,0 表示 free0x4D,也就是 ASCII M0x5A,也就是 ASCII ZMZ 是 Mark Zbikowski 的首字母,也就是 EXE 文件头那个 MZ,老梗了DOS 在遍历内存块时,如果发现 signature 既不是 0x4D 也不是 0x5A,说明这串 arena 元数据被写坏了,于是返回 ERROR_ARENA_TRASHED。
重点:Win32 kernel 基本不用这个错误码。它更像历史遗留化石。Raymond 说这反而让它适合测试里 mock 错误条件:如果你看到 error 7,大概率是测试 harness 故意塞的,不是真系统错误。
顺便吐槽那些“教你修 ERROR_ARENA_TRASHED”的网站:错误码实际都不用了,但网页还一本正经让你跑系统扫描、更新驱动、修复系统文件。经典 SEO 垃圾内容,什么都不懂但非常自信。我看了都替它尴尬。
又是 Raymond 的上古 Windows 考古,没啥工程实用性,但适合饭后看。
Vittorio Romeo 这篇讲他在 SFML fork 里写的 sfex::Coroutine,一个 200 行左右、stackless、宏实现、可序列化、无分配 的游戏脚本 coroutine。
核心观点:C++20 coroutine 语法很好看,但不适合游戏逻辑:
std::coroutine_handle 是 opaque handle,看不到内部状态promise_type / awaitable / final_suspend / initial_suspend 一堆样板代码,烦游戏逻辑不一样。boss AI、cutscene、dialogue 不是“一次 async 请求结束就扔”的临时任务,它们是游戏对象状态的一部分。玩家随时 F5 quick-save,你要能把“boss 正停在 phase 3,还剩 0.3 秒 wait”一起存下来。
目标:
int state 加 struct 成员state 和成员就行一个 dialogue 例子:
struct DialogueScene : sfex::Coroutine
{
Yield operator()(World& world)
{
SFEX_CO_BEGIN;
world.showText("Hero", "I finally got you.");
SFEX_CO_YIELD(Wait{1.0f});
world.showText("Villain", "You're too late!");
SFEX_CO_YIELD(Wait{0.5f});
world.showText("Hero", "We'll see about that!");
SFEX_CO_YIELD(Wait{1.0f});
world.hideText();
SFEX_CO_RETURN(Done{});
SFEX_CO_END;
}
};
Yield 是 std::variant<NextFrame, Wait, Done>。driver 每帧调用一次,根据返回值决定下一帧继续、等几秒、还是结束:
DialogueScene scene;
float waitTimer = 0.f;
while (true)
{
const float dt = clock.restart().asSeconds();
if (waitTimer > 0.f)
{
waitTimer -= dt;
}
else
{
scene(world).match(
[&](NextFrame) { /* ...run again next frame... */ },
[&](Wait w) { waitTimer = w.seconds; },
[&](Done) { /* ...proceed to next scene... */ });
}
// ... draw, display ...
}
平铺的对话用 FSM 也不丑:
struct DialogueSceneFSM
{
int step = 0;
float waitTimer = 0.f;
bool tick(World& world, float dt)
{
if (waitTimer > 0.f)
{
waitTimer -= dt;
return true;
}
switch (step++)
{
case 0: world.showText("Hero", "I finally got you."); waitTimer = 1.0f; return true;
case 1: world.showText("Villain", "You're too late!"); waitTimer = 0.5f; return true;
case 2: world.showText("Hero", "We'll see about that!"); waitTimer = 1.0f; return true;
case 3: world.hideText(); return false;
}
return false;
}
};
但一旦有嵌套时间逻辑,比如角色走三步,每步之间停一下,FSM 就开始炸。coroutine 还是自然的 for + while + yield:
struct CutsceneScene : sfex::Coroutine
{
int paceIdx = 0;
float t = 0.f;
sf::Vec2f paceStartPos;
Yield operator()(World& world)
{
SFEX_CO_BEGIN;
// ...
for (paceIdx = 0; paceIdx < 3; ++paceIdx)
{
paceStartPos = world.villainPos;
t = 0.f;
while (t < 1.f)
{
t += world.dt;
world.villainPos.x = paceStartPos.x - 100.f * t;
SFEX_CO_YIELD(NextFrame{});
}
SFEX_CO_YIELD(Wait{0.30f});
}
world.showText("Hero", "Don't come any closer!");
SFEX_CO_YIELD(Wait{1.5f});
SFEX_CO_RETURN(Done{});
SFEX_CO_END;
}
};
注意 paceIdx/t/paceStartPos 都是成员,不能是局部变量。任何要跨 yield 存活的东西都得放 struct 上,后面会说这是这套宏方案最大的 footgun。
原理:switch + case label。 宏展开后其实就是手写状态机:
Yield DialogueScene::operator()(World& world)
{
switch (state)
{
case 0:
world.showText("Hero", "I finally got you.");
state = 1;
return Wait{1.0f};
case 1:
world.showText("Villain", "You're too late!");
state = 2;
return Wait{0.5f};
case 2:
world.showText("Hero", "We'll see about that!");
state = 3;
return Wait{1.0f};
case 3:
world.hideText();
state = 0;
return Done{};
}
std::unreachable();
}
C/C++ 的 case label 可以嵌在 switch body 任意深度(只要中间没有另一个 switch),所以 yield 可以写成“设置 state + return + 下一个 case label”。经典 protothreads 技巧。
宏定义:
#define SFEX_CO_BEGIN \
static constexpr int _sfex_base = __COUNTER__; \
switch (state) { case 0:;
#define SFEX_CO_YIELD(value) \
do { \
state = (__COUNTER__ + 1) - _sfex_base; \
return value; \
case (__COUNTER__ - _sfex_base):; \
} while (0)
#define SFEX_CO_RETURN(value) \
do \
{ \
state = 0; \
return value; \
} while (0)
#define SFEX_CO_END \
} \
\
std::unreachable();
这里用 __COUNTER__ 而不是 __LINE__,原因是序列化。__LINE__ 会因为加空行、格式化、插注释而变,旧存档里的 state=47 可能突然跳到另一个 yield。__COUNTER__ 是按宏使用次数递增,普通改格式不影响。代价是:你新增/删除 yield,后面的 state 还是会变,所以正式游戏还得给 coroutine 加 revision,旧存档不匹配就拒绝加载或迁移。
组合:SFEX_CO_AWAIT。 敌人 AI 可以把两个子 coroutine 作为成员,循环 await:
struct RingFire : sfex::Coroutine
{
int rings = 4;
int i = 0;
Yield operator()(World& world, Enemy& self)
{
SFEX_CO_BEGIN;
for (i = 0; i < rings; ++i)
{
world.spawnBulletRing(self.pos, /* count */ 8, /* speed */ 120.f);
SFEX_CO_YIELD(Wait{0.4f});
}
SFEX_CO_RETURN(Done{});
SFEX_CO_END;
}
};
struct AimedBurst : sfex::Coroutine
{
int shots = 5;
int i = 0;
Yield operator()(World& world, Enemy& self)
{
SFEX_CO_BEGIN;
for (i = 0; i < shots; ++i)
{
world.spawnBulletAimed(self.pos, world.player.pos, /* speed */ 220.f);
SFEX_CO_YIELD(Wait{0.2f});
}
SFEX_CO_RETURN(Done{});
SFEX_CO_END;
}
};
父 coroutine:
struct EnemyAI : sfex::Coroutine
{
RingFire ring;
AimedBurst aimed;
Yield operator()(World& world, Enemy& self)
{
SFEX_CO_BEGIN;
while (true)
{
ring = {};
SFEX_CO_AWAIT(ring(world, self));
SFEX_CO_YIELD(Wait{1.0f});
aimed = {};
SFEX_CO_AWAIT(aimed(world, self));
SFEX_CO_YIELD(Wait{1.0f});
}
SFEX_CO_END;
}
};
AWAIT 本质是 tick child,如果 child 没结束就把 child 的 yield 往上传。没有分配,没有 opaque handle,父对象销毁时子 coroutine 也跟着销毁。
坑:
lock_guard/unique_ptr/file handle 会在 yield 时析构。要跨 yield 持有资源,也得放成员上__COUNTER__ 如果在同一个 coroutine 函数体里被别的宏用,会扰乱 state 编号这个实现其实就是“可序列化 protothreads for game scripting”。花里胡哨但挺有意思。C++20 coroutine 是通用异步机制,适合网络/IO/generator;游戏脚本这种“状态就是数据、要存档、要热重载/回放”的场景,手写状态机或者这种宏 coroutine 反而更可控。宏当然丑,但能存档就是硬道理。