brpc
brpc is used as a foundational framework in many projects and contains a number of carefully designed data structures; this post summarizes them based on existing material.
brpc itself is already abundantly documented, so this is largely a re-reading that consolidates the concepts.
[toc]
Stackful coroutines
There is a detailed write-up here: https://github.com/lishaojiang/talkcoroutine
A quick look at assembly
Argument passing
Arguments can be passed purely on the stack, entirely in registers when there are few of them, or in registers plus the stack when there are many. Which way they are passed, in what order, and in which registers is purely a matter of convention — fastcall, stdcall, cdecl and so on. On 32-bit Windows, eax is commonly used for the return value, ecx for the this pointer and edx for the second argument; 32-bit Linux uses ebx, ecx, edx in order, while 64-bit Linux uses RDI, RSI, RDX, RCX, R8, R9; arm64 uses x0, x1, x2 … x7. Which registers carry arguments is ultimately just a convention, not an absolute rule: once you have a solid command of assembly you can deviate from it, as long as you keep the program correct.
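For concreteness, a minimal sketch (assuming x86-64 Linux and the System V AMD64 ABI; the function is made up for illustration) of how integer arguments and the return value map to registers:
// System V AMD64 (Linux x86-64): a -> rdi, b -> rsi, c -> rdx, result -> rax
long add3(long a, long b, long c) {
    return a + b + c;
}
// A compiler will typically emit something close to:
//   add3:
//       lea rax, [rdi + rsi]   ; rax = a + b
//       add rax, rdx           ; rax += c
//       ret                    ; the return value travels back in rax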
Return values
The return value usually comes back in eax/rax/x0, but that is not mandatory either; a function may borrow other registers for it, such as edx or xmm0. And when the return value is, for example, a struct too large for a single register, the address of the return slot may be passed as a hidden argument — the boost coroutine context-switch function discussed later does exactly that.
Function call and return
At the C/C++ level there is not much variety in how functions are called and return. At the assembly level, a call is normally written as *call functionA* and a return as ret. Both hide a few implicit, equivalent steps. These equivalences are not only very important for coroutines — we actively rely on them — and they also involve stack push and pop, so it is worth spelling them out.
The equivalent steps of push and pop
push eax;   // equivalent to two pseudo-steps (for understanding only):
            //   1. sub esp, 4       ; esp -= 4, the stack grows downwards
            //   2. mov [esp], eax   ; store eax at the new stack top
pop eax;    // equivalent to two pseudo-steps (for understanding only):
            //   1. mov eax, [esp]   ; load the stack top into eax
            //   2. add esp, 4       ; esp += 4
The equivalent steps of call, jmp and ret
call mem/imm/reg;   // 1. push the address of the next instruction (esp -= 4)
                    // 2. set EIP to the new target
                    // 3. continue execution at the new EIP
jmp mem/imm/reg;    // 1. set EIP to the new target
                    // 2. continue execution at the new EIP
                    // note: unconditional, does not touch the stack
ret;                // 1. set EIP to the value currently at the stack top
                    // 2. esp += 4 (restore the stack)
                    // 3. continue execution at the new EIP
Reading the value of EIP
__asm {
    call NEXT
NEXT:
    pop eax     // call pushed the address of NEXT as the return address; pop it into eax
}
Components
Work Stealing
The core idea: when the current worker thread has no pending task, it steals tasks from other threads' run queues. First, the lock-free queue used for work stealing, src/bthread/work_stealing_queue.h:
template <typename T>
class WorkStealingQueue {
public:
WorkStealingQueue() : _bottom(1), _capacity(0), _buffer(NULL), _top(1) {}
int init(size_t capacity) {
if (_capacity != 0) {
LOG(ERROR) << "Already initialized";
return -1;
}
if (capacity == 0) {
LOG(ERROR) << "Invalid capacity=" << capacity;
return -1;
}
if (capacity & (capacity - 1)) {
LOG(ERROR) << "Invalid capacity=" << capacity
<< " which must be power of 2";
return -1;
}
_buffer = new (std::nothrow) T[capacity];
if (NULL == _buffer) {
return -1;
}
_capacity = capacity;
return 0;
}
// 从底部追加,非线程安全,与 steal 线程安全
bool push(const T& x) {
const size_t b = _bottom.load(butil::memory_order_relaxed);
const size_t t = _top.load(butil::memory_order_acquire);
if (b >= t + _capacity) { // Full queue.
return false;
}
_buffer[b & (_capacity - 1)] = x;
_bottom.store(b + 1, butil::memory_order_release);
return true;
}
// 从底部弹出,非线程安全,与 steal 线程安全
bool pop(T* val) {
const size_t b = _bottom.load(butil::memory_order_relaxed);
size_t t = _top.load(butil::memory_order_relaxed);
if (t >= b) {
// fast check since we call pop() in each sched.
// Stale _top which is smaller should not enter this branch.
return false;
}
const size_t newb = b - 1;
_bottom.store(newb, butil::memory_order_relaxed);
butil::atomic_thread_fence(butil::memory_order_seq_cst);
t = _top.load(butil::memory_order_relaxed);
if (t > newb) {
_bottom.store(b, butil::memory_order_relaxed);
return false;
}
*val = _buffer[newb & (_capacity - 1)];
if (t != newb) {
return true;
}
// Single last element, compete with steal()
// 对于最后一个元素,使用 CAS 保证和 steal 并发时的线程安全
const bool popped = _top.compare_exchange_strong(
t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);
_bottom.store(b, butil::memory_order_relaxed);
return popped;
}
// 从顶部窃取,线程安全
bool steal(T* val) {
size_t t = _top.load(butil::memory_order_acquire);
size_t b = _bottom.load(butil::memory_order_acquire);
if (t >= b) {
// Permit false negative for performance considerations.
return false;
}
do {
butil::atomic_thread_fence(butil::memory_order_seq_cst);
b = _bottom.load(butil::memory_order_acquire);
if (t >= b) {
return false;
}
*val = _buffer[t & (_capacity - 1)];
// CAS 保证线程安全
} while (!_top.compare_exchange_strong(
t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed));
return true;
}
private:
// Copying a concurrent structure makes no sense.
DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue);
butil::atomic<size_t> _bottom;
size_t _capacity;
T* _buffer;
butil::atomic<size_t> BAIDU_CACHELINE_ALIGNMENT _top; // 分开到两个 CacheLine
};
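To make the ownership rules concrete, a minimal usage sketch (assuming the bthread headers are available): only the owner thread calls push/pop at the bottom, while any other thread may call steal from the top concurrently:
#include "bthread/work_stealing_queue.h"

void work_stealing_queue_demo() {
    bthread::WorkStealingQueue<int> q;
    q.init(64);                   // capacity must be a power of 2

    // Owner thread only: push/pop at the bottom (LIFO, cache friendly).
    for (int i = 0; i < 10; ++i) {
        q.push(i);
    }
    int v = 0;
    if (q.pop(&v)) {
        // v == 9, the most recently pushed item
    }

    // Any other thread: steal from the top (oldest item first),
    // safe to run concurrently with the owner's push/pop.
    int stolen = 0;
    if (q.steal(&stolen)) {
        // stolen == 0
    }
}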
Futex & Butex
butex is built on top of the futex API.
In principle, pthread mutex is also implemented with futex. The speedup presumably comes from the different blocking granularity: a butex suspends only the bthread rather than the whole worker thread. How is that achieved?
// sys_futex.h
inline int futex_wait_private(void *addr1, int expected,
const timespec *timeout) {
// 当 *addr1 == expected 时,线程挂起等待
return syscall(SYS_futex, addr1, (FUTEX_WAIT | FUTEX_PRIVATE_FLAG), expected,
timeout, NULL, 0);
}
inline int futex_wake_private(void *addr1, int nwake) {
// 唤醒 nwake 个等待的线程
return syscall(SYS_futex, addr1, (FUTEX_WAKE | FUTEX_PRIVATE_FLAG), nwake,
NULL, NULL, 0);
}
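The key semantics: futex_wait only puts the thread to sleep if the word still equals the expected value, which closes the check-then-sleep race. A minimal one-shot event built on these two wrappers (hypothetical helper code, not part of brpc):
#include <atomic>
#include <climits>

static std::atomic<int> g_event(0);   // 0 = not signaled, 1 = signaled

// Waiter: loop because futex_wait can return spuriously or with EINTR.
void wait_for_event() {
    while (g_event.load(std::memory_order_acquire) == 0) {
        // Sleeps only while the word is still 0; otherwise returns EWOULDBLOCK.
        futex_wait_private(&g_event, 0, NULL);
    }
}

// Signaler: publish the flag first, then wake up everyone who went to sleep.
void signal_event() {
    g_event.store(1, std::memory_order_release);
    futex_wake_private(&g_event, INT_MAX);
}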
class FastPthreadMutex {
public:
FastPthreadMutex() : _futex(0) {}
~FastPthreadMutex() {}
void lock();
void unlock();
bool try_lock();
private:
DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
int lock_contended();
unsigned _futex;
};
// mutex.cpp
// Implement bthread_mutex_t related functions
struct MutexInternal {
butil::static_atomic<unsigned char> locked;
butil::static_atomic<unsigned char> contended;
unsigned short padding;
};
const MutexInternal MUTEX_CONTENDED_RAW = {{1},{1},0};
const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
// Define as macros rather than constants which can't be put in read-only
// section and affected by initialization-order fiasco.
#define BTHREAD_MUTEX_CONTENDED \
(*(const unsigned *)&bthread::MUTEX_CONTENDED_RAW)
#define BTHREAD_MUTEX_LOCKED (*(const unsigned *)&bthread::MUTEX_LOCKED_RAW)
int FastPthreadMutex::lock_contended() {
butil::atomic<unsigned> *whole = (butil::atomic<unsigned> *)&_futex;
// 将状态原子修改为加锁 + 等待
// 如果这期间锁被释放了,那么该函数直接退出
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
// 否则调用 futex 尝试挂起当前线程,并且要求状态为加锁 + 等待(因为中间锁可能会被释放)
if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
errno != EWOULDBLOCK) {
return errno;
}
// futex_wait_private 非 EWOULDBLOCK 失败的情况下,继续循环重试
}
return 0;
}
void FastPthreadMutex::lock() {
bthread::MutexInternal *split = (bthread::MutexInternal *)&_futex;
// 加锁,如果 locked 原先的值为 0,lock 直接返回成功
if (split->locked.exchange(1, butil::memory_order_acquire)) {
// 如果已经是加锁状态,那么进入锁竞争的处理
(void)lock_contended();
}
}
bool FastPthreadMutex::try_lock() {
bthread::MutexInternal *split = (bthread::MutexInternal *)&_futex;
return !split->locked.exchange(1, butil::memory_order_acquire);
}
void FastPthreadMutex::unlock() {
butil::atomic<unsigned> *whole = (butil::atomic<unsigned> *)&_futex;
const unsigned prev = whole->exchange(0, butil::memory_order_release);
// CAUTION: the mutex may be destroyed, check comments before butex_create
if (prev != BTHREAD_MUTEX_LOCKED) {
// 如果有挂起等待的线程,执行唤醒
futex_wake_private(whole, 1);
}
}
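A minimal sketch of the protocol above (FastPthreadMutex is an internal class in bthread::internal; shown only to illustrate the fast and slow paths, not as public API):
bthread::internal::FastPthreadMutex mu;
int g_counter = 0;

void add_one() {
    mu.lock();      // fast path: one atomic exchange of the locked bit, no syscall
    ++g_counter;    // critical section
    mu.unlock();    // issues futex_wake only if the contended bit was observed
}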
Implementation analysis
// butex.h
// Create a butex which is a futex-like 32-bit primitive for synchronizing
// bthreads/pthreads.
// Returns a pointer to 32-bit data, NULL on failure.
// NOTE: all butexes are private(not inter-process).
void *butex_create();
// Wake up at most 1 thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake(void *butex);
// Atomically wait on |butex| if *butex equals |expected_value|, until the
// butex is woken up by butex_wake*, or CLOCK_REALTIME reached |abstime| if
// abstime is not NULL.
// About |abstime|:
// Different from FUTEX_WAIT, butex_wait uses absolute time.
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void *butex, int expected_value, const timespec *abstime);
// butex.cpp
struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
// tids of pthreads are 0
bthread_t tid;
// Erasing node from middle of LinkedList is thread-unsafe, we need
// to hold its container's lock.
butil::atomic<Butex *> container;
};
struct ButexBthreadWaiter : public ButexWaiter {
TaskMeta *task_meta;
TimerThread::TaskId sleep_id;
WaiterState waiter_state;
int expected_value;
Butex *initial_butex;
TaskControl *control;
};
// 双向链表
typedef butil::LinkedList<ButexWaiter> ButexWaiterList;
struct BAIDU_CACHELINE_ALIGNMENT Butex {
Butex() {}
~Butex() {}
butil::atomic<int> value;
ButexWaiterList waiters;
internal::FastPthreadMutex waiter_lock;
};
// 注意 value 的地址与 butex 对象一致,方便后面转换
BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
void *butex_create() {
Butex *b = butil::get_object<Butex>();
if (b) {
return &b->value;
}
return NULL;
}
int butex_wait(void *arg, int expected_value, const timespec *abstime) {
Butex *b = container_of(static_cast<butil::atomic<int> *>(arg), Butex, value);
if (b->value.load(butil::memory_order_relaxed) != expected_value) {
errno = EWOULDBLOCK;
// Sometimes we may take actions immediately after unmatched butex,
// this fence makes sure that we see changes before changing butex.
butil::atomic_thread_fence(butil::memory_order_acquire);
return -1;
}
TaskGroup *g = tls_task_group;
if (NULL == g || g->is_current_pthread_task()) {
return butex_wait_from_pthread(g, b, expected_value, abstime);
}
// 在栈上创建 waiter 对象
ButexBthreadWaiter bbw;
// tid is 0 iff the thread is non-bthread
bbw.tid = g->current_tid();
bbw.container.store(NULL, butil::memory_order_relaxed);
bbw.task_meta = g->current_task();
bbw.sleep_id = 0;
bbw.waiter_state = WAITER_STATE_READY;
bbw.expected_value = expected_value;
bbw.initial_butex = b;
bbw.control = g->control();
if (abstime != NULL) {
... // 先忽略对时间的处理
}
// release fence matches with acquire fence in interrupt_and_consume_waiters
// in task_group.cpp to guarantee visibility of `interrupted'.
bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
g->set_remained(wait_for_butex, &bbw); // 在执行下个 bthread 前执行 wait_for_butex
TaskGroup::sched(&g); // 主动 Yield
// erase_from_butex_and_wakeup (called by TimerThread) is possibly still
// running and using bbw. The chance is small, just spin until it's done.
BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
30 /*nops before sched_yield*/);
// If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
// Spin until current_waiter != NULL.
BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
NULL, butil::memory_order_acquire) == NULL,
30 /*nops before sched_yield*/);
bool is_interrupted = false;
if (bbw.task_meta->interrupted) {
// Race with set and may consume multiple interruptions, which are OK.
bbw.task_meta->interrupted = false;
is_interrupted = true;
}
// If timed out as well as value unmatched, return ETIMEDOUT.
if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
errno = ETIMEDOUT;
return -1;
} else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
errno = EWOULDBLOCK;
return -1;
} else if (is_interrupted) {
errno = EINTR;
return -1;
}
return 0;
}
static void wait_for_butex(void *arg) {
ButexBthreadWaiter *const bw = static_cast<ButexBthreadWaiter *>(arg);
Butex *const b = bw->initial_butex;
// 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
// before they're queued, otherwise the waiter is already timedout
// and removed by TimerThread, in which case we should stop queueing.
//
// Visibility of waiter_state:
// [bthread] [TimerThread]
// waiter_state = TIMED
// tt_lock { add task }
// tt_lock { get task }
// waiter_lock { waiter_state=TIMEDOUT }
// waiter_lock { use waiter_state }
// tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
// sequenced by two locks, both threads are guaranteed to see the correct
// value.
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
// 加锁后,校验 value 和 expected_value 是否相等
if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
} else if (bw->waiter_state == WAITER_STATE_READY /*1*/ &&
!bw->task_meta->interrupted) {
b->waiters.Append(bw); // 加入 waiters 队列
bw->container.store(b, butil::memory_order_relaxed);
return;
}
}
// b->container is NULL which makes erase_from_butex_and_wakeup() and
// TaskGroup::interrupt() no-op, there's no race between following code and
// the two functions. The on-stack ButexBthreadWaiter is safe to use and
// bw->waiter_state will not change again.
// 如果没有成功 wait,则清理定时器,并切回原协程
unsleep_if_necessary(bw, get_global_timer_thread());
tls_task_group->ready_to_run(bw->tid);
// FIXME: jump back to original thread is buggy.
// // Value unmatched or waiter is already woken up by TimerThread, jump
// // back to original bthread.
// TaskGroup* g = tls_task_group;
// ReadyToRunArgs args = { g->current_tid(), false };
// g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
// // 2: Don't run remained because we're already in a remained function
// // otherwise stack may overflow.
// TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}
int butex_wake(void *arg) {
Butex *b = container_of(static_cast<butil::atomic<int> *>(arg), Butex, value);
ButexWaiter *front = NULL;
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
if (b->waiters.empty()) {
return 0;
}
front = b->waiters.head()->value();
front->RemoveFromList(); // 从队列中取出等待的 waiter
front->container.store(NULL, butil::memory_order_relaxed);
}
if (front->tid == 0) {
wakeup_pthread(static_cast<ButexPthreadWaiter *>(front));
return 1;
}
ButexBthreadWaiter *bbw = static_cast<ButexBthreadWaiter *>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
TaskGroup *g = tls_task_group;
if (g) {
TaskGroup::exchange(&g, bbw->tid); // 如果在 bthread 环境中,直接唤醒
} else {
// otherwise (not on a worker thread) pick a TaskGroup and reschedule remotely
bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
}
return 1;
}
DoublyBufferedData
The crux of double-buffer switching is when to flip. If the read pattern is periodic and known, flipping on a timer is enough and there is no locking cost at all; if the readers are unpredictable, locks are needed.
The simple implementation is a read-write lock: take the write lock, update the index, and the write lock blocks every reader. DoublyBufferedData avoids having one lock block all readers and writers: it introduces N+1 locks (one per reader thread plus a modify lock), so writes block only writes, and a writer merely waits for each read lock to be released in turn, shrinking the critical sections.
template <typename T, typename TLS>
template <typename Fn>
size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
// _modify_mutex sequences modifications. Using a separate mutex rather
// than _wrappers_mutex is to avoid blocking threads calling
// AddWrapper() or RemoveWrapper() too long. Most of the time, modifications
// are done by one thread, contention should be negligible.
BAIDU_SCOPED_LOCK(_modify_mutex);
int bg_index = !_index.load(butil::memory_order_relaxed);
// background instance is not accessed by other threads, being safe to
// modify.
const size_t ret = fn(_data[bg_index]);
if (!ret) {
return 0;
}
// Publish, flip background and foreground.
// The release fence matches with the acquire fence in UnsafeRead() to
// make readers which just begin to read the new foreground instance see
// all changes made in fn.
_index.store(bg_index, butil::memory_order_release);
bg_index = !bg_index;
// Wait until all threads finishes current reading. When they begin next
// read, they should see updated _index.
{
BAIDU_SCOPED_LOCK(_wrappers_mutex);
for (size_t i = 0; i < _wrappers.size(); ++i) {
_wrappers[i]->WaitReadDone();
}
}
const size_t ret2 = fn(_data[bg_index]);
CHECK_EQ(ret2, ret) << "index=" << _index.load(butil::memory_order_relaxed);
return ret2;
}
A design that thinks quite far ahead. A good code walkthrough of it can be found in the articles listed in the references.
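A minimal usage sketch, assuming butil/containers/doubly_buffered_data.h: readers call Read() and get a scoped pointer to the foreground copy; writers pass a functor to Modify(), which is applied to the background copy, published by flipping the index, and applied again so both copies converge:
#include <map>
#include <string>
#include "butil/containers/doubly_buffered_data.h"

typedef std::map<std::string, std::string> ConfigMap;
butil::DoublyBufferedData<ConfigMap> g_config;

// Writer: fn runs on the background copy; returning non-zero publishes it.
static size_t AddEntry(ConfigMap& m) {
    m["timeout_ms"] = "100";
    return 1;
}

void update_config() {
    g_config.Modify(AddEntry);
}

bool read_config(std::string* out) {
    // Reader: ScopedPtr pins the foreground copy; its destructor marks the
    // read as done so a writer can tell when old readers have finished.
    butil::DoublyBufferedData<ConfigMap>::ScopedPtr ptr;
    if (g_config.Read(&ptr) != 0) {
        return false;
    }
    ConfigMap::const_iterator it = ptr->find("timeout_ms");
    if (it == ptr->end()) {
        return false;
    }
    *out = it->second;
    return true;
}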
ParkingLot
// parking_lot.h
// Park idle workers.
class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
public:
class State {
public:
State() : val(0) {}
bool stopped() const { return val & 1; }
private:
friend class ParkingLot;
State(int val) : val(val) {}
int val;
};
ParkingLot() : _pending_signal(0) {}
// Wake up at most `num_task' workers.
// Returns #workers woken up.
// 唤醒至多 num_task 个等待的线程
// 注意 _pending_signal 增加了 num_task * 2,因为最后一位用来表示是否 stop
int signal(int num_task) {
_pending_signal.fetch_add((num_task << 1), butil::memory_order_release);
return futex_wake_private(&_pending_signal, num_task);
}
// Get a state for later wait().
State get_state() {
return _pending_signal.load(butil::memory_order_acquire);
}
// Wait for tasks.
// If the `expected_state' does not match, wait() may finish directly.
// 工作线程尝试挂起等待,此时会检查 _pending_signal 的数值,若不一致则挂起,继续执行
void wait(const State &expected_state) {
futex_wait_private(&_pending_signal, expected_state.val, NULL);
}
// Wakeup suspended wait() and make them unwaitable ever.
void stop() {
_pending_signal.fetch_or(1);
// 将 _pending_signal 设为 stop,唤醒所有等待的工作线程,触发结束
futex_wake_private(&_pending_signal, 10000);
}
private:
// higher 31 bits for signalling, LSB for stopping.
butil::atomic<int> _pending_signal;
};
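The protocol is the classic check-state-then-sleep pattern. A minimal sketch of how a worker and a producer might use it (hypothetical wrapper code, not the actual brpc scheduler):
#include "bthread/parking_lot.h"

bool have_work();   // hypothetical: checks the local/remote run queues

bthread::ParkingLot pl;

// Worker: snapshot the state *before* checking for work, so a signal that
// arrives in between bumps _pending_signal and makes wait() return
// immediately instead of losing the wakeup.
void worker_idle_loop() {
    for (;;) {
        bthread::ParkingLot::State st = pl.get_state();
        if (st.stopped()) {
            return;        // stop() was called, let the worker exit
        }
        if (have_work()) {
            return;        // go run tasks
        }
        pl.wait(st);       // parks until signal()/stop() changes _pending_signal
    }
}

// Producer: after queueing new tasks, wake up at most n parked workers.
void notify_workers(int n) {
    pl.signal(n);
}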
TaskControl holds an array of ParkingLot objects; when a TaskGroup is initialized it is hashed onto one of them:
// task_control.h
class TaskControl {
...
static const int PARKING_LOT_NUM = 4;
ParkingLot _pl[PARKING_LOT_NUM];
};
// task_group.cpp
TaskGroup::TaskGroup(TaskControl *c)
: _cur_meta(NULL), _control(c), _num_nosignal(0), _nsignaled(0),
_last_run_ns(butil::cpuwide_time_ns()), _cumulated_cputime_ns(0),
_nswitch(0), _last_context_remained(NULL),
_last_context_remained_arg(NULL), _pl(NULL), _main_stack(NULL),
_main_tid(0), _remote_num_nosignal(0), _remote_nsignaled(0) {
_steal_seed = butil::fast_rand();
_steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
// 根据线程号哈希到某个 ParkingLot 对象上
_pl = &c->_pl[butil::fmix64(pthread_numeric_id()) %
TaskControl::PARKING_LOT_NUM];
CHECK(c);
}
Before stealing tasks, the worker thread first takes a snapshot of the ParkingLot state; when the state has changed, the wait is skipped directly:
bool TaskGroup::wait_task(bthread_t *tid) {
do {
if (_last_pl_state.stopped()) {
// 如果已经 stop
return false;
}
// 根据当前的 _last_pl_state 状态判断是否挂起
_pl->wait(_last_pl_state);
if (steal_task(tid)) {
return true;
}
} while (true);
}
bool TaskGroup::steal_task(bthread_t *tid) {
if (_remote_rq.pop(tid)) {
return true;
}
// _last_pl_state 的状态在这里进行更新
// _last_pl_state 发生变化时 wait_task 中的 wait 会直接跳过
// 这里容忍 false postive,增加的开销是多一次尝试 steal_task
_last_pl_state = _pl->get_state();
return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
When new tasks are queued, TaskControl::signal_task is called to wake up waiting workers via the ParkingLot:
void TaskControl::signal_task(int num_task) {
if (num_task <= 0) {
return;
}
// TODO(gejun): Current algorithm does not guarantee enough threads will
// be created to match caller's requests. But in another side, there's also
// many useless signalings according to current impl. Capping the concurrency
// is a good balance between performance and timeliness of scheduling.
// 上方的官方注释也写明了,限制并发可以更好地平衡性能和调度及时性,这里直接将唤醒的上限设为 2
if (num_task > 2) {
num_task = 2;
}
int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
num_task -= _pl[start_index].signal(1); // wake up via the ParkingLot
if (num_task > 0) {
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
if (++start_index >= PARKING_LOT_NUM) {
start_index = 0;
}
num_task -= _pl[start_index].signal(1);
}
}
if (num_task > 0 &&
FLAGS_bthread_min_concurrency >
0 && // test min_concurrency for performance
_concurrency.load(butil::memory_order_relaxed) <
FLAGS_bthread_concurrency) {
// TODO: Reduce this lock
// 当仍然有需要执行的任务时,尝试增加工作线程
BAIDU_SCOPED_LOCK(g_task_control_mutex);
if (_concurrency.load(butil::memory_order_acquire) <
FLAGS_bthread_concurrency) {
add_workers(1);
}
}
}
void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
push_rq(tid);
if (nosignal) {
++_num_nosignal; // 对于设定了不触发信号的任务,仅增加计数
} else {
const int additional_signal = _num_nosignal;
_num_nosignal = 0;
_nsignaled += 1 + additional_signal;
// 调用 signal_task 唤醒
_control->signal_task(1 + additional_signal);
}
}
// 将还没有触发信号的任务统一触发信号(工作线程内调用)
void TaskGroup::flush_nosignal_tasks() {
const int val = _num_nosignal;
if (val) {
_num_nosignal = 0;
_nsignaled += val;
_control->signal_task(val);
}
}
// 从工作线程外提交任务
void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
_remote_rq._mutex.lock(); // 加锁
while (!_remote_rq.push_locked(tid)) {
flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
<< _remote_rq.capacity();
::usleep(1000);
_remote_rq._mutex.lock();
}
if (nosignal) {
++_remote_num_nosignal;
_remote_rq._mutex.unlock();
} else {
const int additional_signal = _remote_num_nosignal;
_remote_num_nosignal = 0;
_remote_nsignaled += 1 + additional_signal;
_remote_rq._mutex.unlock(); // 锁的生命周期内也保护了计数相关变量
_control->signal_task(1 + additional_signal);
}
}
void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
const int val = _remote_num_nosignal;
if (!val) {
locked_mutex.unlock();
return;
}
_remote_num_nosignal = 0;
_remote_nsignaled += val;
locked_mutex.unlock();
_control->signal_task(val);
}
Timer Thread
struct TimerThreadOptions {
// Scheduling requests are hashed into different bucket to improve
// scalability. However bigger num_buckets may NOT result in more scalable
// schedule() because bigger values also make each buckets more sparse
// and more likely to lock the global mutex. You better not change
// this value, just leave it to us.
// Default: 13
size_t num_buckets;
// If this field is not empty, some bvar for reporting stats of TimerThread
// will be exposed with this prefix.
// Default: ""
std::string bvar_prefix;
// Constructed with default options.
TimerThreadOptions();
};
// TimerThread is a separate thread to run scheduled tasks at specific time.
// At most one task runs at any time, don't put time-consuming code in the
// callback otherwise the task may delay other tasks significantly.
class TimerThread {
public:
struct Task;
class Bucket;
typedef uint64_t TaskId;
const static TaskId INVALID_TASK_ID;
TimerThread();
~TimerThread();
// Start the timer thread.
// This method should only be called once.
// return 0 if success, errno otherwise.
// 启动 TimerThread 线程
int start(const TimerThreadOptions *options);
// Stop the timer thread. Later schedule() will return INVALID_TASK_ID.
// 结束 TimerThread 线程
void stop_and_join();
// Schedule |fn(arg)| to run at realtime |abstime| approximately.
// Returns: identifier of the scheduled task, INVALID_TASK_ID on error.
// 增加一个定时器任务
TaskId schedule(void (*fn)(void *), void *arg, const timespec &abstime);
// Prevent the task denoted by `task_id' from running. `task_id' must be
// returned by schedule() ever.
// Returns:
// 0 - Removed the task which does not run yet
// -1 - The task does not exist.
// 1 - The task is just running.
// 取消一个定时器任务
int unschedule(TaskId task_id);
// Get identifier of internal pthread.
// Returns (pthread_t)0 if start() is not called yet.
pthread_t thread_id() const { return _thread; }
private:
// the timer thread will run this method.
void run(); // TimerThread 线程函数
static void *run_this(void *arg);
bool _started; // whether the timer thread was started successfully.
butil::atomic<bool> _stop;
TimerThreadOptions _options;
Bucket *_buckets; // list of tasks to be run
internal::FastPthreadMutex _mutex; // protect _nearest_run_time
int64_t _nearest_run_time;
// the futex for wake up timer thread. can't use _nearest_run_time because
// it's 64-bit.
int _nsignals;
pthread_t _thread; // all scheduled task will be run on this thread
};
void TimerThread::run() {
int64_t last_sleep_time = butil::gettimeofday_us();
// min heap of tasks (ordered by run_time)
// TimerThread 线程内通过最小堆维护定时任务序列
std::vector<Task *> tasks;
tasks.reserve(4096);
while (!_stop.load(butil::memory_order_relaxed)) {
{
BAIDU_SCOPED_LOCK(_mutex);
_nearest_run_time = std::numeric_limits<int64_t>::max();
}
// 从所有的 Bucket 的获取任务
for (size_t i = 0; i < _options.num_buckets; ++i) {
Bucket &bucket = _buckets[i];
for (Task *p = bucket.consume_tasks(); p != nullptr; ++nscheduled) {
Task *next_task = p->next;
// 对于已经取消的任务,不会加入到堆里,直接删除。这也是高性能的关键
if (!p->try_delete()) {
// 循环加入到堆里
tasks.push_back(p);
std::push_heap(tasks.begin(), tasks.end(), task_greater);
}
p = next_task;
}
}
bool pull_again = false;
while (!tasks.empty()) {
// 从最小堆中取出任务
Task *task1 = tasks[0];
if (task1->try_delete()) { // already unscheduled
std::pop_heap(tasks.begin(), tasks.end(), task_greater);
tasks.pop_back();
continue;
}
// 判断是否到时间了,没有则退出
if (butil::gettimeofday_us() < task1->run_time) { // not ready yet.
break;
}
{
BAIDU_SCOPED_LOCK(_mutex);
if (task1->run_time > _nearest_run_time) {
// 检查当前的 _nearest_run_time,确认是否有更新的任务
// 有的话则再次从 Bucket 中拉取
pull_again = true;
break;
}
}
std::pop_heap(tasks.begin(), tasks.end(), task_greater);
tasks.pop_back();
// 执行定时任务并删除
if (task1->run_and_delete()) {
++ntriggered;
}
}
if (pull_again) {
BT_VLOG << "pull again, tasks=" << tasks.size();
continue;
}
// The realtime to wait for.
int64_t next_run_time = std::numeric_limits<int64_t>::max();
if (tasks.empty()) {
next_run_time = std::numeric_limits<int64_t>::max();
} else {
next_run_time = tasks[0]->run_time;
}
// Similarly with the situation before running tasks, we check
// _nearest_run_time to prevent us from waiting on a non-earliest
// task. We also use the _nsignal to make sure that if new task
// is earlier that the realtime that we wait for, we'll wake up.
int expected_nsignals = 0;
{
BAIDU_SCOPED_LOCK(_mutex);
if (next_run_time > _nearest_run_time) {
// a task is earlier that what we would wait for.
// We need to check buckets.
continue;
} else {
_nearest_run_time = next_run_time;
expected_nsignals = _nsignals;
}
}
timespec *ptimeout = NULL;
timespec next_timeout = {0, 0};
const int64_t now = butil::gettimeofday_us();
if (next_run_time != std::numeric_limits<int64_t>::max()) {
next_timeout = butil::microseconds_to_timespec(next_run_time - now);
ptimeout = &next_timeout;
}
busy_seconds += (now - last_sleep_time) / 1000000.0;
// 计算需要等待的时间,通过 futex 等待
futex_wait_private(&_nsignals, expected_nsignals, ptimeout);
last_sleep_time = butil::gettimeofday_us();
}
BT_VLOG << "Ended TimerThread=" << pthread_self();
}
TimerThread::Task *TimerThread::Bucket::consume_tasks() {
Task *head = NULL;
if (_task_head) { // NOTE: schedule() and consume_tasks() are sequenced
// by TimerThread._nearest_run_time and fenced by TimerThread._mutex.
// We can avoid touching the mutex and related cacheline when the
// bucket is actually empty.
BAIDU_SCOPED_LOCK(_mutex);
if (_task_head) {
// 获取任务时直接将链表指针传回
head = _task_head;
_task_head = NULL;
_nearest_run_time = std::numeric_limits<int64_t>::max();
}
}
return head;
}
bool TimerThread::Task::run_and_delete() {
const uint32_t id_version = version_of_task_id(task_id);
uint32_t expected_version = id_version;
// This CAS is rarely contended, should be fast.
// 通过 CAS 判定当前任务是否还需要做
if (version.compare_exchange_strong(expected_version, id_version + 1,
butil::memory_order_relaxed)) {
fn(arg); // 执行定时任务
// The release fence is paired with acquire fence in
// TimerThread::unschedule to make changes of fn(arg) visible.
version.store(id_version + 2, butil::memory_order_release);
butil::return_resource(slot_of_task_id(task_id));
return true;
} else if (expected_version == id_version + 2) {
// 对于取消的任务直接归还资源
butil::return_resource(slot_of_task_id(task_id));
return false;
} else {
// Impossible.
LOG(ERROR) << "Invalid version=" << expected_version << ", expecting "
<< id_version + 2;
return false;
}
}
TimerThread::TaskId TimerThread::schedule(void (*fn)(void *), void *arg,
const timespec &abstime) {
if (_stop.load(butil::memory_order_relaxed) || !_started) {
// Not add tasks when TimerThread is about to stop.
return INVALID_TASK_ID;
}
// Hashing by pthread id is better for cache locality.
// 新增任务时分片到不同的 Bucket 中
const Bucket::ScheduleResult result =
_buckets[butil::fmix64(pthread_numeric_id()) % _options.num_buckets]
.schedule(fn, arg, abstime);
// 如果有更早的唤醒时间
if (result.earlier) {
bool earlier = false;
const int64_t run_time = butil::timespec_to_microseconds(abstime);
{
BAIDU_SCOPED_LOCK(_mutex);
if (run_time < _nearest_run_time) {
_nearest_run_time = run_time;
++_nsignals;
earlier = true;
}
}
if (earlier) {
// 使用 futex 唤醒
futex_wake_private(&_nsignals, 1);
}
}
return result.task_id;
}
TimerThread::Bucket::schedule(void (*fn)(void *), void *arg,
const timespec &abstime) {
butil::ResourceId<Task> slot_id;
Task *task = butil::get_resource<Task>(&slot_id);
if (task == NULL) {
ScheduleResult result = {INVALID_TASK_ID, false};
return result;
}
task->next = NULL;
task->fn = fn;
task->arg = arg;
task->run_time = butil::timespec_to_microseconds(abstime);
uint32_t version = task->version.load(butil::memory_order_relaxed);
if (version == 0) { // skip 0.
// 分配的版本总是跳过 INVALID_TASK_ID
task->version.fetch_add(2, butil::memory_order_relaxed);
version = 2;
}
const TaskId id = make_task_id(slot_id, version);
task->task_id = id;
bool earlier = false;
{
BAIDU_SCOPED_LOCK(_mutex);
// 加锁的临界区很短
task->next = _task_head;
_task_head = task;
if (task->run_time < _nearest_run_time) {
_nearest_run_time = task->run_time;
earlier = true;
}
}
ScheduleResult result = {id, earlier};
return result;
}
TimerThread is effective for several reasons:
- Appending a task to a Bucket's linked list inside the lock is O(1), so the critical section is short.
- In RPC scenarios the timeout rarely changes, so most inserted run times are increasing; waking the thread because a task is earlier than nearest_run_time happens rarely.
- Unscheduling a task goes through the ResourcePool plus a version number, so it does not take part in any global contention.
- TimerThread maintains its own min-heap, again without global contention.
- TimerThread wakes up roughly at the inverse of the RPC timeout; with a 100 ms timeout it wakes about 10 times per second, which is close to optimal.
bthread relies on TimerThread to implement an efficient usleep:
// Suspend current thread for at least `microseconds'
// Interruptible by bthread_interrupt().
extern int bthread_usleep(uint64_t microseconds);
int bthread_usleep(uint64_t microseconds) {
bthread::TaskGroup *g = bthread::tls_task_group;
if (NULL != g && !g->is_current_pthread_task()) {
return bthread::TaskGroup::usleep(&g, microseconds);
}
return ::usleep(microseconds);
}
// To be consistent with sys_usleep, set errno and return -1 on error.
int TaskGroup::usleep(TaskGroup **pg, uint64_t timeout_us) {
if (0 == timeout_us) {
yield(pg);
return 0;
}
TaskGroup *g = *pg;
// We have to schedule timer after we switched to next bthread otherwise
// the timer may wake up(jump to) current still-running context.
// 将定时任务打包为 remained 函数
SleepArgs e = {timeout_us, g->current_tid(), g->current_task(), g};
g->set_remained(_add_sleep_event, &e);
sched(pg); // 当前协程出让
g = *pg;
e.meta->current_sleep = 0;
if (e.meta->interrupted) {
// Race with set and may consume multiple interruptions, which are OK.
e.meta->interrupted = false;
// NOTE: setting errno to ESTOP is not necessary from bthread's
// pespective, however many RPC code expects bthread_usleep to set
// errno to ESTOP when the thread is stopping, and print FATAL
// otherwise. To make smooth transitions, ESTOP is still set instead
// of EINTR when the thread is stopping.
errno = (e.meta->stop ? ESTOP : EINTR);
return -1;
}
return 0;
}
void TaskGroup::_add_sleep_event(void *void_args) {
// Must copy SleepArgs. After calling TimerThread::schedule(), previous
// thread may be stolen by a worker immediately and the on-stack SleepArgs
// will be gone.
SleepArgs e = *static_cast<SleepArgs *>(void_args);
TaskGroup *g = e.group;
// 增加定时任务
TimerThread::TaskId sleep_id;
sleep_id = get_global_timer_thread()->schedule(
ready_to_run_from_timer_thread, void_args,
butil::microseconds_from_now(e.timeout_us));
...
}
static void ready_to_run_from_timer_thread(void *arg) {
CHECK(tls_task_group == NULL);
const SleepArgs *e = static_cast<const SleepArgs *>(arg);
// 到时间后,根据 tid 将任务重新加入队列
e->group->control()->choose_one_group()->ready_to_run_remote(e->tid);
}
iobuf
As the object that carries payloads through the send/receive path, it has to be small, lightweight and as close to zero-copy as possible — at the very least it should beat std::any or std::shared_ptr<void>.
Core idea: reference counting plus splitting data into blocks and slices of blocks.
// iobuf.h
// IOBuf is a non-continuous buffer that can be cut and combined w/o copying
// payload. It can be read from or flushed into file descriptors as well.
// IOBuf is [thread-compatible]. Namely using different IOBuf in different
// threads simultaneously is safe, and reading a static IOBuf from different
// threads is safe as well.
// IOBuf is [NOT thread-safe]. Modifying a same IOBuf from different threads
// simultaneously is unsafe and likely to crash.
class IOBuf {
public:
static const size_t DEFAULT_BLOCK_SIZE = 8192; // block 默认大小 8KB
// can't directly use `struct iovec' here because we also need to access the
// reference counter(nshared) in Block*
struct BlockRef {
// NOTICE: first bit of `offset' is shared with BigView::start
uint32_t offset; // block 上的偏移,这里保证 offset <= max_int
uint32_t length; // 和长度
Block *block;
};
// IOBuf is essentially a tiny queue of BlockRefs.
struct SmallView {
BlockRef refs[2]; // 引用少于两个 block
};
// SmallView 与 BigView 大小相同
struct BigView {
int32_t magic;
uint32_t start; // 起始的 block 下标
BlockRef *refs; // block 列表
uint32_t nref; // block 列表大小
uint32_t cap_mask;
size_t nbytes; // 当前总长度
const BlockRef &ref_at(uint32_t i) const {
return refs[(start + i) & cap_mask];
}
BlockRef &ref_at(uint32_t i) { return refs[(start + i) & cap_mask]; }
uint32_t capacity() const { return cap_mask + 1; } // block 列表容量
};
bool _small() const { return _bv.magic >= 0; } // 小于 0 时为 BigView
size_t length() const {
return _small() ? (_sv.refs[0].length + _sv.refs[1].length) : _bv.nbytes;
}
private:
union {
BigView _bv; // _bv.magic 与 _sv.refs[0].offset 地址相同,小于 0 时表示 BigView
SmallView _sv;
};
};
// iobuf.cpp,看一下 append 的流程
void IOBuf::append(const IOBuf &other) {
const size_t nref = other._ref_num();
for (size_t i = 0; i < nref; ++i) {
_push_back_ref(other._ref_at(i));
}
}
// 引用的 Block 数量
inline size_t IOBuf::_ref_num() const {
return _small() ? (!!_sv.refs[0].block + !!_sv.refs[1].block) : _bv.nref;
}
inline void IOBuf::_push_back_ref(const BlockRef &r) {
if (_small()) {
return _push_or_move_back_ref_to_smallview<false>(r);
} else {
return _push_or_move_back_ref_to_bigview<false>(r);
}
}
template <bool MOVE>
void IOBuf::_push_or_move_back_ref_to_bigview(const BlockRef &r) {
BlockRef &back = _bv.ref_at(_bv.nref - 1);
if (back.block == r.block && back.offset + back.length == r.offset) {
// Merge ref
back.length += r.length;
_bv.nbytes += r.length;
if (MOVE) {
r.block->dec_ref();
}
return;
}
if (_bv.nref != _bv.capacity()) {
// block 列表有容量的情况下,直接增加对该 block 的引用
_bv.ref_at(_bv.nref++) = r;
_bv.nbytes += r.length; // 长度叠加
if (!MOVE) {
r.block->inc_ref();
}
return;
}
// resize, don't modify bv until new_refs is fully assigned
// 当前 block 列表容量不足,倍增申请
const uint32_t new_cap = _bv.capacity() * 2;
BlockRef *new_refs = iobuf::acquire_blockref_array(new_cap);
for (uint32_t i = 0; i < _bv.nref; ++i) {
// 复制原 block
new_refs[i] = _bv.ref_at(i);
}
new_refs[_bv.nref++] = r;
// Change other variables
_bv.start = 0;
// 删除原 block 列表
iobuf::release_blockref_array(_bv.refs, _bv.capacity());
_bv.refs = new_refs;
_bv.cap_mask = new_cap - 1;
_bv.nbytes += r.length;
if (!MOVE) {
r.block->inc_ref();
}
}
// Block 的实现
void *(*blockmem_allocate)(size_t) = ::malloc;
void (*blockmem_deallocate)(void *) = ::free;
const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 0x1;
typedef void (*UserDataDeleter)(void *);
struct UserDataExtension {
UserDataDeleter deleter;
};
struct IOBuf::Block {
butil::atomic<int> nshared; // 引用计数
uint16_t flags; // 是否使用自定义 deleter,可以看下方的英文注释
uint16_t abi_check; // original cap, never be zero.
uint32_t size;
uint32_t cap;
Block *portal_next;
// When flag is 0, data points to `size` bytes starting at
// `(char*)this+sizeof(Block)' When flag & IOBUF_BLOCK_FLAGS_USER_DATA is
// non-0, data points to the user data and the deleter is put in
// UserDataExtension at `(char*)this+sizeof(Block)'
char *data;
Block(char *data_in, uint32_t data_size)
: nshared(1), flags(0), abi_check(0), size(0), cap(data_size),
portal_next(NULL), data(data_in) {
iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed);
iobuf::g_blockmem.fetch_add(data_size + sizeof(Block),
butil::memory_order_relaxed);
}
Block(char *data_in, uint32_t data_size, UserDataDeleter deleter)
: nshared(1), flags(IOBUF_BLOCK_FLAGS_USER_DATA), abi_check(0),
size(data_size), cap(data_size), portal_next(NULL), data(data_in) {
get_user_data_extension()->deleter = deleter; // 自定义 deleter
}
// Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
UserDataExtension *get_user_data_extension() {
char *p = (char *)this;
return (UserDataExtension *)(p + sizeof(Block)); // 当前内存之后的 4 字节
}
void inc_ref() {
check_abi();
nshared.fetch_add(1, butil::memory_order_relaxed);
}
void dec_ref() {
check_abi();
if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
butil::atomic_thread_fence(butil::memory_order_acquire);
if (!flags) {
iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
iobuf::g_blockmem.fetch_sub(cap + sizeof(Block),
butil::memory_order_relaxed);
this->~Block(); // 调用析构函数,不删除内存
iobuf::blockmem_deallocate(this); // 删除内存
} else if (flags & IOBUF_BLOCK_FLAGS_USER_DATA) {
get_user_data_extension()->deleter(data); // 执行自定义删除
this->~Block();
free(this);
}
}
}
int ref_count() const { return nshared.load(butil::memory_order_relaxed); }
bool full() const { return size >= cap; }
size_t left_space() const { return cap - size; }
};
// 创建 block,同时存储 Block 对象和实际数据
inline IOBuf::Block *create_block(const size_t block_size) {
if (block_size > 0xFFFFFFFFULL) {
LOG(FATAL) << "block_size=" << block_size << " is too large";
return NULL;
}
char *mem = (char *)iobuf::blockmem_allocate(block_size);
if (mem == NULL) {
return NULL;
}
return new (mem) IOBuf::Block(mem + sizeof(IOBuf::Block),
block_size - sizeof(IOBuf::Block));
}
// 追加用户数据,所有权取决于传入的 deleter
int IOBuf::append_user_data(void *data, size_t size, void (*deleter)(void *)) {
if (size > 0xFFFFFFFFULL - 100) {
LOG(FATAL) << "data_size=" << size << " is too large";
return -1;
}
char *mem = (char *)malloc(sizeof(IOBuf::Block) + sizeof(UserDataExtension));
if (mem == NULL) {
return -1;
}
if (deleter == NULL) {
deleter = ::free;
}
IOBuf::Block *b = new (mem) IOBuf::Block((char *)data, size, deleter);
const IOBuf::BlockRef r = {0, b->cap, b};
_move_back_ref(r);
return 0;
}
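A minimal usage sketch, assuming butil/iobuf.h: small appends are copied into shared 8 KB blocks, while cutting and appending between IOBufs only moves BlockRefs instead of copying payload:
#include <string>
#include "butil/iobuf.h"

void iobuf_demo() {
    butil::IOBuf buf;
    buf.append("hello ");             // copied into the current block
    buf.append("world");

    // Cut the first 6 bytes into another IOBuf: only BlockRefs move,
    // the underlying Block is shared and reference-counted.
    butil::IOBuf head;
    buf.cutn(&head, 6);               // head == "hello ", buf == "world"

    // Concatenate without copying payload: append the other buffer's BlockRefs.
    butil::IOBuf all;
    all.append(head);
    all.append(buf);

    std::string s = all.to_string();  // materialize only when really needed
}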
socket
The design here is essentially the shared_ptr/weak_ptr pattern, but customized in a few ways.
Socket's own SetFailed can guarantee that, when required, the Socket can no longer be Address()-ed, so its reference count eventually drops to 0. Plain shared_ptr/weak_ptr cannot guarantee this: when a server wants to quit while requests keep arriving, the reference count of the corresponding Socket may never reach 0 and the server cannot exit. In addition, a weak_ptr cannot be stored directly as epoll data, whereas a SocketId can. These factors led to the design of Socket.
Whether to store a SocketUniquePtr or a SocketId depends on whether a strong reference is needed. A Controller spans the whole RPC process and interacts heavily with the data inside the Socket, so it stores a SocketUniquePtr. epoll mainly notifies that events happened on an fd; if the Socket has been recycled, the event is dispensable, so epoll stores a SocketId.
As long as a SocketUniquePtr is valid, the data inside it does not change. This frees users from worrying about tricky race conditions and the ABA problem, and they can safely operate on the shared fd. The approach also avoids implicit reference counting; memory ownership is explicit, which gives program quality a solid guarantee. brpc uses SocketUniquePtr and SocketId extensively, and they really have simplified development.
class BAIDU_CACHELINE_ALIGNMENT Socket {
struct Forbidden {};
public:
Socket(Forbidden); // 禁止用户直接构造,但 ResourcePool 又需要访问 public 的构造函数,私有的 Forbidden 可以实现这个 trick
// Create a Socket according to `options', put the identifier into `id'.
// Returns 0 on sucess, -1 otherwise.
static int Create(const SocketOptions &options, SocketId *id);
// Place the Socket associated with identifier `id' into unique_ptr `ptr',
// which will be released automatically when out of scope (w/o explicit
// std::move). User can still access `ptr' after calling ptr->SetFailed()
// before release of `ptr'.
// This function is wait-free.
// Returns 0 on success, -1 when the Socket was SetFailed().
static int Address(SocketId id, SocketUniquePtr *ptr);
// Mark this Socket or the Socket associated with `id' as failed.
// Any later Address() of the identifier shall return NULL unless the
// Socket was revivied by HealthCheckThread. The Socket is NOT recycled
// after calling this function, instead it will be recycled when no one
// references it. Internal fields of the Socket are still accessible
// after calling this function. Calling SetFailed() of a Socket more
// than once is OK.
// This function is lock-free.
// Returns -1 when the Socket was already SetFailed(), 0 otherwise.
int SetFailed();
private:
// unsigned 32-bit version + signed 32-bit referenced-count.
// Meaning of version:
// * Created version: no SetFailed() is called on the Socket yet. Must be
// same evenness with initial _versioned_ref because during lifetime of
// a Socket on the slot, the version is added with 1 twice. This is
// also the version encoded in SocketId.
// * Failed version: = created version + 1, SetFailed()-ed but returned.
// * Other versions: the socket is already recycled.
// Socket 自身记录的版本+引用计数,首次构造对象时为 0
butil::atomic<uint64_t> _versioned_ref;
// Flag used to mark whether additional reference has been decreased
// by either `SetFailed' or `SetRecycle'
// 回收标记
butil::atomic<bool> _recycle_flag;
};
// SocketId = 32-bit version + 32-bit slot.
// version: from version part of _versioned_nref, must be an EVEN number.
// slot: designated by ResourcePool.
int Socket::Create(const SocketOptions &options, SocketId *id) {
butil::ResourceId<Socket> slot; // 接收返回的资源 ID
Socket *const m = butil::get_resource(&slot, Forbidden());
if (m == NULL) {
LOG(FATAL) << "Fail to get_resource<Socket>";
return -1;
}
g_vars->nsocket << 1;
CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
...
// nref can be non-zero due to concurrent AddressSocket().
// _this_id will only be used in destructor/Destroy of referenced
// slots, which is safe and properly fenced. Although it's better
// to put the id into SocketUniquePtr.
// 构造 SocketId,_versioned_ref 引用计数加 1,首次构造时版本为 0
m->_this_id = MakeSocketId(VersionOfVRef(m->_versioned_ref.fetch_add(
1, butil::memory_order_release)),
slot);
...
*id = m->_this_id; // 返回 socket id
return 0;
}
// Utility functions to combine and extract SocketId.
BUTIL_FORCE_INLINE SocketId MakeSocketId(uint32_t version,
butil::ResourceId<Socket> slot) {
return SocketId((((uint64_t)version) << 32) | slot.value);
}
// Utility functions to combine and extract Socket::_versioned_ref
BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) {
return (uint32_t)(vref >> 32);
}
BUTIL_FORCE_INLINE butil::ResourceId<Socket> SlotOfSocketId(SocketId sid) {
butil::ResourceId<Socket> id = {(sid & 0xFFFFFFFFul)};
return id;
}
BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) {
return (int32_t)(vref & 0xFFFFFFFFul);
}
inline int Socket::Address(SocketId id, SocketUniquePtr *ptr) {
// 解析 ResourceId
const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
// 获取对象地址
Socket *const m = address_resource(slot);
if (__builtin_expect(m != NULL, 1)) {
// acquire fence makes sure this thread sees latest changes before
// Dereference() or Revive().
// 引用计数 +1,注意这里返回的 vref1 是自增前的 _versioned_ref
const uint64_t vref1 =
m->_versioned_ref.fetch_add(1, butil::memory_order_acquire);
// 获取版本
const uint32_t ver1 = VersionOfVRef(vref1);
if (ver1 == VersionOfSocketId(id)) {
// 版本与 SocketId 的版本一致则返回成功
ptr->reset(m);
return 0;
}
// 版本不一致,恢复引用计数,返回的 vref2 是前面自增后的 _versioned_ref
const uint64_t vref2 =
m->_versioned_ref.fetch_sub(1, butil::memory_order_release);
// 取出引用数量
const int32_t nref = NRefOfVRef(vref2);
if (nref > 1) {
return -1;
} else if (__builtin_expect(nref == 1, 1)) {
// 无引用,再次读取版本
const uint32_t ver2 = VersionOfVRef(vref2);
// 如果版本是奇数,说明刚进行了 SetFailed 操作
// 并且当前无引用,触发 socket 回收
if ((ver2 & 1)) {
if (ver1 == ver2 || ver1 + 1 == ver2) {
uint64_t expected_vref = vref2 - 1; // 自减后 _versioned_ref 的理论值
// 尝试 CAS,原子地将 _versioned_ref 替换为版本 id_ver + 2,计数 0
if (m->_versioned_ref.compare_exchange_strong(
expected_vref, MakeVRef(ver2 + 1, 0),
butil::memory_order_acquire, butil::memory_order_relaxed)) {
// 成功 CAS 后,执行回收
m->OnRecycle(); // 执行实际的回收操作
return_resource(SlotOfSocketId(id));
}
} else {
CHECK(false) << "ref-version=" << ver1 << " unref-version=" << ver2;
}
} else {
CHECK_EQ(ver1, ver2);
// Addressed a free slot.
}
} else {
CHECK(false) << "Over dereferenced SocketId=" << id;
}
}
return -1;
}
int Socket::SetFailed() { return SetFailed(EFAILEDSOCKET, NULL); }
int Socket::SetFailed(int error_code, const char *error_fmt, ...) {
if (error_code == 0) {
CHECK(false) << "error_code is 0";
error_code = EFAILEDSOCKET;
}
const uint32_t id_ver = VersionOfSocketId(_this_id); // 获取 socket id 上的版本
uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
for (;;) { // need iteration to retry compare_exchange_strong
// version_ref 的版本与 socket id 的版本不一致,返回失败
if (VersionOfVRef(vref) != id_ver) {
return -1;
}
// Try to set version=id_ver+1 (to make later Address() return NULL),
// retry on fail.
// 尝试 CAS,原子地将 _version_ref 替换为版本 id_ver + 1,计数不变
if (_versioned_ref.compare_exchange_strong(
vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
butil::memory_order_release, butil::memory_order_relaxed)) {
// CAS 成功后,原先的 socket id Address 操作会因为版本校验而失败
// _version_ref 的版本也会修改为一个**奇数**
...
// Deref additionally which is added at creation so that this
// Socket's reference will hit 0(recycle) when no one addresses it.
ReleaseAdditionalReference();
// NOTE: This Socket may be recycled at this point, don't
// touch anything.
return 0;
}
}
}
int Socket::ReleaseAdditionalReference() {
bool expect = false;
// Use `relaxed' fence here since `Dereference' has `released' fence
if (_recycle_flag.compare_exchange_strong(expect, true,
butil::memory_order_relaxed,
butil::memory_order_relaxed)) {
return Dereference(); // 引用计数 -1
}
return -1;
}
// socket_id.h
typedef uint64_t SocketId;
const SocketId INVALID_SOCKET_ID = (SocketId)-1;
class Socket;
extern void DereferenceSocket(Socket *);
struct SocketDeleter {
void operator()(Socket *m) const { DereferenceSocket(m); }
};
// RAII,析构时执行 SocketDeleter() -> DereferenceSocket
typedef std::unique_ptr<Socket, SocketDeleter> SocketUniquePtr;
// socket.cpp
void DereferenceSocket(Socket *s) {
if (s) {
// 减少引用计数
s->Dereference();
}
}
inline int Socket::Dereference() {
const SocketId id = _this_id;
// 引用计数 -1
const uint64_t vref =
_versioned_ref.fetch_sub(1, butil::memory_order_release);
const int32_t nref = NRefOfVRef(vref); // 获得计数的值
if (nref > 1) {
// 存在其他引用,直接返回
return 0;
}
if (__builtin_expect(nref == 1, 1)) {
// 无引用,需要回收
const uint32_t ver = VersionOfVRef(vref); // 获取 version_ref 上的版本
const uint32_t id_ver = VersionOfSocketId(id); // 获取 socket id 上的版本
// SetFailed 与 Address 并发时对特殊情况的处理,可以自行看英文注释
// Besides first successful SetFailed() adds 1 to version, one of
// those dereferencing nref from 1->0 adds another 1 to version.
// Notice "one of those": The wait-free Address() may make ref of a
// version-unmatched slot change from 1 to 0 for mutiple times, we
// have to use version as a guard variable to prevent returning the
// Socket to pool more than once.
//
// Note: `ver == id_ver' means this socket has been `SetRecycle'
// before rather than `SetFailed'; `ver == ide_ver+1' means we
// had `SetFailed' this socket before. We should destroy the
// socket under both situation
if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) {
// sees nref:1->0, try to set version=id_ver+2,--nref.
// No retry: if version changes, the slot is already returned by
// another one who sees nref:1->0 concurrently; if nref changes,
// which must be non-zero, the slot will be returned when
// nref changes from 1->0 again.
// Example:
// SetFailed(): --nref, sees nref:1->0 (1)
// try to set version=id_ver+2 (2)
// Address(): ++nref, unmatched version (3)
// --nref, sees nref:1->0 (4)
// try to set version=id_ver+2 (5)
// 1,2,3,4,5 or 1,3,4,2,5:
// SetFailed() succeeds, Address() fails at (5).
// 1,3,2,4,5: SetFailed() fails with (2), the slot will be
// returned by (5) of Address()
// 1,3,4,5,2: SetFailed() fails with (2), the slot is already
// returned by (5) of Address().
uint64_t expected_vref = vref - 1;
if (_versioned_ref.compare_exchange_strong(
expected_vref, MakeVRef(id_ver + 2, 0),
butil::memory_order_acquire, butil::memory_order_relaxed)) {
OnRecycle(); // 执行实际的回收操作
return_resource(SlotOfSocketId(id));
return 1;
}
return 0;
}
LOG(FATAL) << "Invalid SocketId=" << id;
return -1;
}
LOG(FATAL) << "Over dereferenced SocketId=" << id;
return -1;
}
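Putting the pieces together, a minimal sketch of the intended usage pattern inside brpc (SocketOptions fields omitted; this is an illustration, not a complete setup):
#include "brpc/socket.h"

void socket_usage_sketch() {
    brpc::SocketId id;
    brpc::SocketOptions options;        // options.fd, options.user, ... omitted
    if (brpc::Socket::Create(options, &id) != 0) {
        return;                         // creation failed
    }

    {
        // Address() turns the weak, freely copyable SocketId into a strong
        // reference; it fails once SetFailed() has bumped the version.
        brpc::SocketUniquePtr ptr;
        if (brpc::Socket::Address(id, &ptr) == 0) {
            // ... use ptr-> like a normal strong reference ...
        }
        // ptr leaves scope -> SocketDeleter -> DereferenceSocket -> refcount - 1
    }

    // Mark the socket failed: later Address() calls return -1 and the Socket
    // is recycled once the last strong reference goes away.
    brpc::SocketUniquePtr ptr;
    if (brpc::Socket::Address(id, &ptr) == 0) {
        ptr->SetFailed();
    }
}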
request
// 创建一个 bthread_id,附带数据为 data,错误处理为 on_error
int bthread_id_create(bthread_id_t* id, void* data,
int (*on_error)(bthread_id_t, void*, int)) {
return bthread::id_create_impl(
id, data, (on_error ? on_error : bthread::default_bthread_id_on_error),
NULL);
}
static int id_create_impl(bthread_id_t* id, void* data,
int (*on_error)(bthread_id_t, void*, int),
int (*on_error2)(bthread_id_t, void*, int,
const std::string&)) {
IdResourceId slot;
Id* const meta = get_resource(&slot); // resource pool 获取对象
if (meta) {
meta->data = data;
meta->on_error = on_error;
meta->on_error2 = on_error2;
CHECK(meta->pending_q.empty());
uint32_t* butex = meta->butex;
if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
// Skip 0 so that bthread_id_t is never 0
// avoid overflow to make comparisons simpler.
// butex 的值规避 0
*butex = 1;
}
*meta->join_butex = *butex;
meta->first_ver = *butex;
meta->locked_ver = *butex + 1;
*id = make_id(*butex, slot);
return 0;
}
return ENOMEM;
}
inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
// 版本位于低 32 位,resource_id 使用高 32 位
const bthread_id_t tmp = {(((uint64_t)slot.value) << 32) | (uint64_t)version};
return tmp;
}
// bthread_id_t 实际对应的数据结构
struct BAIDU_CACHELINE_ALIGNMENT Id {
// first_ver ~ locked_ver - 1: unlocked versions
// locked_ver: locked
// unlockable_ver: locked and about to be destroyed
// contended_ver: locked and contended
uint32_t first_ver; // 起始合法版本,first_ver ~ locked_ver - 1 为合法范围
uint32_t locked_ver; // 锁定状态的版本
internal::FastPthreadMutex mutex; // 该结构元数据的锁
void* data;
int (*on_error)(bthread_id_t, void*, int);
int (*on_error2)(bthread_id_t, void*, int, const std::string&);
const char* lock_location;
uint32_t* butex;
uint32_t* join_butex;
SmallQueue<PendingError, 2> pending_q;
Id() {
// Although value of the butex(as version part of bthread_id_t)
// does not matter, we set it to 0 to make program more deterministic.
butex = bthread::butex_create_checked<uint32_t>();
join_butex = bthread::butex_create_checked<uint32_t>();
*butex = 0;
*join_butex = 0;
}
~Id() {
bthread::butex_destroy(butex);
bthread::butex_destroy(join_butex);
}
inline bool has_version(uint32_t id_ver) const {
// 范围内的版本意味着合法。每个 slot 的版本空间是独立的
return id_ver >= first_ver && id_ver < locked_ver;
}
inline uint32_t contended_ver() const { return locked_ver + 1; } // 锁定冲突时的状态
inline uint32_t unlockable_ver() const { return locked_ver + 2; } // 准备析构的状态
inline uint32_t last_ver() const { return unlockable_ver(); } // 销毁状态的版本
// also the next "first_ver"
inline uint32_t end_ver() const { return last_ver() + 1; }
};
// 等待 bthread_id 销毁
int bthread_id_join(bthread_id_t id) {
const bthread::IdResourceId slot = bthread::get_slot(id);
bthread::Id* const meta = address_resource(slot);
if (!meta) {
// The id is not created yet, this join is definitely wrong.
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id); // 获取 id 低 32 位的版本
uint32_t* join_butex = meta->join_butex;
while (1) {
meta->mutex.lock();
const bool has_ver = meta->has_version(id_ver); // 检查当前 id 是否合法
const uint32_t expected_ver = *join_butex;
meta->mutex.unlock();
if (!has_ver) {
break;
}
// 在 join_butex 上等待唤醒
if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return errno;
}
}
return 0;
}
// 解锁并销毁 bthread_id
int bthread_id_unlock_and_destroy(bthread_id_t id) {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (!meta) {
return EINVAL;
}
uint32_t* butex = meta->butex;
uint32_t* join_butex = meta->join_butex;
const uint32_t id_ver = bthread::get_version(id);
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
LOG(FATAL) << "Invalid bthread_id=" << id.value;
return EINVAL;
}
if (*butex == meta->first_ver) {
meta->mutex.unlock();
LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
return EPERM;
}
const uint32_t next_ver = meta->end_ver();
*butex = next_ver;
*join_butex = next_ver;
meta->first_ver = next_ver; // 赋值后后续的 join 要么版本不合法,要么 join_butex 不一致
meta->locked_ver = next_ver;
meta->pending_q.clear();
meta->mutex.unlock();
// Notice that butex_wake* returns # of woken-up, not successful or not.
bthread::butex_wake_except(butex, 0);
bthread::butex_wake_all(join_butex); // 唤醒等待的 join 函数
return_resource(bthread::get_slot(id)); // 释放资源,原先的 bthread_id 会因为版本原因不再合法
return 0;
}
// 声明 bthread_id 发生错误
int bthread_id_error2_verbose(bthread_id_t id, int error_code,
const std::string& error_text,
const char* location) {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (!meta) {
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id);
uint32_t* butex = meta->butex;
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
return EINVAL;
}
if (*butex == meta->first_ver) {
// unlock 的状态,则上锁
*butex = meta->locked_ver;
meta->lock_location = location;
meta->mutex.unlock();
// 调用错误处理函数
if (meta->on_error) {
return meta->on_error(id, meta->data, error_code);
} else {
return meta->on_error2(id, meta->data, error_code, error_text);
}
} else {
// lock 的状态,将错误信息加入队列
bthread::PendingError e;
e.id = id;
e.error_code = error_code;
e.error_text = error_text;
e.location = location;
meta->pending_q.push(e);
meta->mutex.unlock();
return 0;
}
}
// 锁定 bthread_id
int bthread_id_lock_and_reset_range_verbose(bthread_id_t id, void** pdata,
int range, const char* location) {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (!meta) {
return EINVAL;
}
const uint32_t id_ver = bthread::get_version(id);
uint32_t* butex = meta->butex;
bool ever_contended = false;
meta->mutex.lock();
while (meta->has_version(id_ver)) {
// 元数据上锁,且版本合法的情况下
if (*butex == meta->first_ver) {
// contended locker always wakes up the butex at unlock.
// 如果处于 unlock 的状态
meta->lock_location = location;
if (range == 0) {
// fast path
} else if (range < 0 || range > bthread::ID_MAX_RANGE ||
range + meta->first_ver <= meta->locked_ver) {
LOG_IF(FATAL, range < 0)
<< "range must be positive, actually " << range;
LOG_IF(FATAL, range > bthread::ID_MAX_RANGE)
<< "max range is " << bthread::ID_MAX_RANGE << ", actually "
<< range;
} else {
// 如果附带了版本修改操作,则修改对应的 locked_ver
meta->locked_ver = meta->first_ver + range;
}
// 如果之前冲突了,则修改为 contended_ver(),这样 unlock 的时候会唤醒其他 bthread_id_lock
*butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
meta->mutex.unlock();
if (pdata) {
*pdata = meta->data;
}
return 0;
} else if (*butex != meta->unlockable_ver()) {
// 如果不是 unlock,也不是准备析构的状态,则表示现在处于 locked 或者 contended,赋值为 contended 状态
*butex = meta->contended_ver();
uint32_t expected_ver = *butex;
meta->mutex.unlock();
ever_contended = true;
if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
// 等待
return errno;
}
meta->mutex.lock();
} else { // bthread_id_about_to_destroy was called. 准备销毁
meta->mutex.unlock();
return EPERM;
}
}
meta->mutex.unlock();
return EINVAL;
}
// 解锁 bthread_id
int bthread_id_unlock(bthread_id_t id) {
bthread::Id* const meta = address_resource(bthread::get_slot(id));
if (!meta) {
return EINVAL;
}
uint32_t* butex = meta->butex;
// Release fence makes sure all changes made before signal visible to
// woken-up waiters.
const uint32_t id_ver = bthread::get_version(id);
meta->mutex.lock();
if (!meta->has_version(id_ver)) {
meta->mutex.unlock();
LOG(FATAL) << "Invalid bthread_id=" << id.value;
return EINVAL;
}
if (*butex == meta->first_ver) {
meta->mutex.unlock();
LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
return EPERM;
}
bthread::PendingError front;
if (meta->pending_q.pop(&front)) {
// 如果已经出错了,直接调用错误处理函数
meta->lock_location = front.location;
meta->mutex.unlock();
if (meta->on_error) {
return meta->on_error(front.id, meta->data, front.error_code);
} else {
return meta->on_error2(front.id, meta->data, front.error_code,
front.error_text);
}
} else {
// 否则唤醒等待的锁定操作
const bool contended = (*butex == meta->contended_ver());
*butex = meta->first_ver;
meta->mutex.unlock();
if (contended) {
// We may wake up already-reused id, but that's OK.
bthread::butex_wake(butex);
}
return 0;
}
}
As the official documentation describes, bthread_id solves the following problems (a minimal lifecycle sketch follows the list):
- The response comes back while the RPC is still being sent, so the code handling the response races with the sending code.
- A timer fires right after being set, so the timeout-handling code races with the sending code.
- Multiple responses produced by retries come back at the same time and race with each other.
- Finding the RPC context from a correlation_id in O(1) time, without a global hash table from correlation_id to RPC context.
- Cancelling an RPC.
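A minimal sketch of the bthread_id lifecycle used above (C API from bthread/id.h; the error handler and context here are hypothetical stand-ins for HandleSocketFailed and the Controller pointer):
#include "bthread/id.h"

// Hypothetical error handler: invoked with the id already locked; it must
// finish with bthread_id_unlock() or bthread_id_unlock_and_destroy().
static int on_rpc_error(bthread_id_t id, void* data, int error_code) {
    // ... record error_code into the RPC context pointed to by data ...
    return bthread_id_unlock_and_destroy(id);   // ends the RPC, wakes join()
}

void rpc_caller_sketch(void* rpc_context) {
    bthread_id_t id;
    bthread_id_create(&id, rpc_context, on_rpc_error);

    // Response path (run elsewhere): lock for exclusive access to the context.
    //   void* data = NULL;
    //   if (bthread_id_lock(id, &data) == 0) {
    //       ... fill in the response using data ...
    //       bthread_id_unlock_and_destroy(id);   // normal completion
    //   }
    // Error path (timeout, write failure): just report it; on_rpc_error runs
    // with the id locked, serialized against the response path.
    //   bthread_id_error(id, ETIMEDOUT);

    // Synchronous caller: blocks until unlock_and_destroy() is called.
    bthread_id_join(id);
}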
Concretely, each Controller owns a bthread_id object _correlation_id; when it is created, data is bound to the Controller's own pointer and HandleSocketFailed is used as the error handler:
// controller.cpp
CallId Controller::call_id() {
butil::atomic<uint64_t>* target =
(butil::atomic<uint64_t>*)&_correlation_id.value;
uint64_t loaded = target->load(butil::memory_order_relaxed);
if (loaded) {
const CallId id = { loaded };
return id;
}
// Optimistic locking.
CallId cid = { 0 };
// The range of this id will be reset in Channel::CallMethod
CHECK_EQ(0, bthread_id_create2(&cid, this, HandleSocketFailed));
if (!target->compare_exchange_strong(loaded, cid.value,
butil::memory_order_relaxed)) {
bthread_id_cancel(cid);
cid.value = loaded;
}
return cid;
}
Before the call, the version range of correlation_id is reset to max_retry() + 2; what each version means is explained in the comments below:
// channel.cpp
const CallId correlation_id = cntl->call_id();
const int rc = bthread_id_lock_and_reset_range(
correlation_id, NULL, 2 + cntl->max_retry());
// Make versioned correlation_id.
// call_id : unversioned, mainly for ECANCELED and ERPCTIMEDOUT
// call_id + 1 : first try.
// call_id + 2 : retry 1
// ...
// call_id + N + 1 : retry N
// All ids except call_id are versioned. Say if we've sent retry 1 and
// a failed response of first try comes back, it will be ignored.
After the request has been written via the Socket, the Controller waits on the correlation_id:
// channel.cpp
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller_base,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done) {
...
cntl->IssueRPC(start_send_real_us);
if (done == NULL) {
// MUST wait for response when sending synchronous RPC. It will
// be woken up by callback when RPC finishes (succeeds or still
// fails after retry)
Join(correlation_id); // 等待 correlation_id 销毁
if (cntl->_span) {
cntl->SubmitSpan();
}
cntl->OnRPCEnd(butil::gettimeofday_us());
}
}
When a response is successfully received, the protocol-specific handler is invoked; for baidu_std that is ProcessRpcResponse:
// baidu_rpc_protocol.cpp
void ProcessRpcResponse(InputMessageBase* msg_base) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
RpcMeta meta;
// 解析 RPC 元信息
if (!ParsePbFromIOBuf(&meta, msg->meta)) {
LOG(WARNING) << "Fail to parse from response meta";
return;
}
// 读取 correlation_id
const bthread_id_t cid = { static_cast<uint64_t>(meta.correlation_id()) };
Controller* cntl = NULL;
// 对 correlation_id 加锁
const int rc = bthread_id_lock(cid, (void**)&cntl);
...
const int saved_error = cntl->ErrorCode();
accessor.OnResponse(cid, saved_error);
}
// controller_private_accessor.h
class ControllerPrivateAccessor {
public:
void OnResponse(CallId id, int saved_error) {
const Controller::CompletionInfo info = { id, true };
_cntl->OnVersionedRPCReturned(info, false, saved_error);
}
}
// controller.cpp
void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
bool new_bthread, int saved_error) {
...
bthread_id_about_to_destroy(info.id); // 唤醒 join
...
}
When an error occurs during the RPC, for example the Socket's KeepWrite fails:
// socket.cpp
void Socket::ReturnFailedWriteRequest(Socket::WriteRequest* p, int error_code,
const std::string& error_text) {
if (!p->reset_pipelined_count_and_user_message()) {
CancelUnwrittenBytes(p->data.size());
}
p->data.clear(); // data is probably not written.
const bthread_id_t id_wait = p->id_wait;
butil::return_object(p);
if (id_wait != INVALID_BTHREAD_ID) {
// id_wait 也就是上面的 correlation_id,bthread_id_error2 会进行上锁,并调用对应的错误处理函数,也就是初始化赋值的 HandleSocketFailed
bthread_id_error2(id_wait, error_code, error_text);
}
}
// controller.cpp
int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code,
const std::string& error_text) {
// 从 data 中回复 controller 指针
Controller* cntl = static_cast<Controller*>(data);
if (!cntl->is_used_by_rpc()) {
// Cannot destroy the call_id before RPC otherwise an async RPC
// using the controller cannot be joined and related resources may be
// destroyed before done->Run() running in another bthread.
// The error set will be detected in Channel::CallMethod and fail
// the RPC.
cntl->SetFailed(error_code, "Cancel call_id=%" PRId64
" before CallMethod()", id.value);
return bthread_id_unlock(id);
}
const int saved_error = cntl->ErrorCode();
if (error_code == ERPCTIMEDOUT) {
cntl->SetFailed(error_code, "Reached timeout=%" PRId64 "ms @%s",
cntl->timeout_ms(),
butil::endpoint2str(cntl->remote_side()).c_str());
} else if (error_code == EBACKUPREQUEST) {
cntl->SetFailed(error_code, "Reached backup timeout=%" PRId64 "ms @%s",
cntl->backup_request_ms(),
butil::endpoint2str(cntl->remote_side()).c_str());
} else if (!error_text.empty()) {
cntl->SetFailed(error_code, "%s", error_text.c_str());
} else {
cntl->SetFailed(error_code, "%s @%s", berror(error_code),
butil::endpoint2str(cntl->remote_side()).c_str());
}
CompletionInfo info = { id, false };
cntl->OnVersionedRPCReturned(info, true, saved_error); // 结束
return 0;
}
The correlation_id is also used for timeout handling:
// channel.cpp
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller_base,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done) {
...
// Setup timer for RPC timetout
// _deadline_us is for truncating _connect_timeout_ms
cntl->_deadline_us = cntl->timeout_ms() * 1000L + start_send_real_us;
const int rc = bthread_timer_add(
&cntl->_timeout_id,
butil::microseconds_to_timespec(cntl->_deadline_us),
HandleTimeout, (void*)correlation_id.value); // 参数使用 correlation_id 的值
...
}
// 超时时调用 HandleTimeout
static void HandleTimeout(void* arg) {
bthread_id_t correlation_id = { (uint64_t)arg };
bthread_id_error(correlation_id, ERPCTIMEDOUT); // 继而调用 HandleSocketFailed
}
// RPC 结束时会删除对应的定时器
void Controller::EndRPC(const CompletionInfo& info) {
if (_timeout_id != 0) {
bthread_timer_del(_timeout_id);
_timeout_id = 0;
}
...
}
bvar
Collection of runtime state and statistics.
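bvar counters aggregate per writer thread and only combine values when read, so the write path has negligible contention. A minimal sketch, assuming bvar/bvar.h (the variable names are made up):
#include <bvar/bvar.h>

// Each writer thread accumulates into its own thread-local cell; values are
// combined only when somebody reads or exposes them.
static bvar::Adder<int> g_request_count("my_service_request_count");
static bvar::LatencyRecorder g_request_latency("my_service_request");

void on_request_done(int64_t latency_us) {
    g_request_count << 1;               // count one request
    g_request_latency << latency_us;    // feeds latency/qps/percentile bvars
}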
profiler
https://github.com/apache/incubator-brpc/blob/master/docs/cn/contention_profiler.md
References
https://github.com/apache/incubator-brpc/blob/master/docs/cn/consistent_hashing.md
https://github.com/apache/incubator-brpc/blob/master/docs/cn/io.md
https://github.com/apache/incubator-brpc/blob/master/src/butil/iobuf.h
https://github.com/ronaldo8210/brpc_source_code_analysis/blob/master/docs/client_rpc_exception.md
https://zhuanlan.zhihu.com/p/113427004
https://zhuanlan.zhihu.com/p/169840204
https://sf-zhou.github.io/brpc/brpc_01_bthread.html
https://sf-zhou.github.io/brpc/brpc_02_memory.html
https://sf-zhou.github.io/brpc/brpc_03_socket.html
https://sf-zhou.github.io/brpc/brpc_04_schedule.html
https://sf-zhou.github.io/brpc/brpc_05_request.html