brpc作为一个基础框架,在很多项目中采用,也设计了很多数据结构,这里根据资料总结一下

首先brpc本身的资料就够多了。这里复读一下,总结概念

[toc]

有栈协程

这里有个详细总结 https://github.com/lishaojiang/talkcoroutine

简单了解汇编

函数的传参

函数的传参可以用纯用栈来传,也可以在参数少的情况下直接用寄存器传参,在多的情况,寄存器加栈一起,至于何种方式传参,传参顺序,用那些那种寄存器,只是一种约定,也就fastcall,stdcall,cdecl等。如windows的32平台下,eax常用来作返回什,ecx用作this指针,edx作为第二个参数,linux的32位下依次用用ebx,ecx,edx,但64位下就是RDI、RSI、RDX、RCX、R8、R9;arm64平台依次用x0,x1,x2…x7。实际上至于用什么寄存器传,也是看约定的,不是绝对的,当我们对汇编有很好的撑控能力后,的确可以不遵守,但前提要保证正确。

函数的返回值

函数的返回值,一般来说用eax/rax/x0作返回,但也不是必然的。一个函数也有借用其它寄存器作返回值,如果edx,xmm0。再有就是函数返回值比如是一个结构体,一个寄存器大小无法返回,可能会把返回值地址作隐藏的参数传递,后面说到boost的协程切换函数,就是这样的。

函数的调用与返回

从c/c++层面说函数调用与返回,实际没有太多花样。我们从汇编层面说一下,一般调用都用*call functionA*的形式,返回用ret的形式。首先这里面有一些省略,有一些等价过程,这些等价过程,对我们协程来说不但非常的重要,还要使用这些知识点,这里还涉及栈的push与pop,我们有必要细说一些。

理解Push ,Pop 指令的等效过程

push eax; 
// 等同两条伪指令
1.sub esp,4;   // 将esp减4,因为栈向下生长的,伪指令,仅供理解
2.mov esp,eax; // 将eax 放到esp中,伪指令,仅供理解


pop eax;  
// 等同两条伪指令
1.add esp,4;    // 将esp加4,因为栈向下生长的,伪指令,仅供理解
2.mov eax,esp; // 将esp 放到eax中,伪指令,仅供理解

深入理解call,jmp,ret指令的等效过程

call 内存/立即数/寄储器;
//1.原下个EIP对应指令地址入栈;(esp-4) 
//2.修改EIP为新的; 
//3.跳转EIP执行;

jmp 内存/立即数/寄储器; 
//1.修改EIP为新的; 
//2.跳转EIP执行; 注:无条件执行,不改变栈

ret; 
//1.修改EIP为新的栈顶数据; 
//2.esp +4;(恢复栈) 
//3.跳转EIP执行;

拿到EIP的值

__asm
{
    call NEXT
    NEXT:
    pop eax
}

组件

Work Stealing

核心思路是当本线程内没有待执行的任务时,从其他线程的任务队列中窃取任务执行。首先来看 work stealing 时使用的无锁队列 src/bthread/work_stealing_queue.h

template <typename T>
class WorkStealingQueue {
 public:
  WorkStealingQueue() : _bottom(1), _capacity(0), _buffer(NULL), _top(1) {}

  int init(size_t capacity) {
    if (_capacity != 0) {
      LOG(ERROR) << "Already initialized";
      return -1;
    }
    if (capacity == 0) {
      LOG(ERROR) << "Invalid capacity=" << capacity;
      return -1;
    }
    if (capacity & (capacity - 1)) {
      LOG(ERROR) << "Invalid capacity=" << capacity
                 << " which must be power of 2";
      return -1;
    }
    _buffer = new (std::nothrow) T[capacity];
    if (NULL == _buffer) {
      return -1;
    }
    _capacity = capacity;
    return 0;
  }

  // 从底部追加,非线程安全,与 steal 线程安全
  bool push(const T& x) {
    const size_t b = _bottom.load(butil::memory_order_relaxed);
    const size_t t = _top.load(butil::memory_order_acquire);
    if (b >= t + _capacity) {  // Full queue.
      return false;
    }
    _buffer[b & (_capacity - 1)] = x;
    _bottom.store(b + 1, butil::memory_order_release);
    return true;
  }

  // 从底部弹出,非线程安全,与 steal 线程安全
  bool pop(T* val) {
    const size_t b = _bottom.load(butil::memory_order_relaxed);
    size_t t = _top.load(butil::memory_order_relaxed);
    if (t >= b) {
      // fast check since we call pop() in each sched.
      // Stale _top which is smaller should not enter this branch.
      return false;
    }
    const size_t newb = b - 1;
    _bottom.store(newb, butil::memory_order_relaxed);
    butil::atomic_thread_fence(butil::memory_order_seq_cst);
    t = _top.load(butil::memory_order_relaxed);
    if (t > newb) {
      _bottom.store(b, butil::memory_order_relaxed);
      return false;
    }
    *val = _buffer[newb & (_capacity - 1)];
    if (t != newb) {
      return true;
    }
    // Single last element, compete with steal()
    // 对于最后一个元素,使用 CAS 保证和 steal 并发时的线程安全
    const bool popped = _top.compare_exchange_strong(
        t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);
    _bottom.store(b, butil::memory_order_relaxed);
    return popped;
  }

  // 从顶部窃取,线程安全
  bool steal(T* val) {
    size_t t = _top.load(butil::memory_order_acquire);
    size_t b = _bottom.load(butil::memory_order_acquire);
    if (t >= b) {
      // Permit false negative for performance considerations.
      return false;
    }
    do {
      butil::atomic_thread_fence(butil::memory_order_seq_cst);
      b = _bottom.load(butil::memory_order_acquire);
      if (t >= b) {
        return false;
      }
      *val = _buffer[t & (_capacity - 1)];
      // CAS 保证线程安全
    } while (!_top.compare_exchange_strong(
        t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed));
    return true;
  }

 private:
  // Copying a concurrent structure makes no sense.
  DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue);

  butil::atomic<size_t> _bottom;
  size_t _capacity;
  T* _buffer;
  butil::atomic<size_t> BAIDU_CACHELINE_ALIGNMENT _top;  // 分开到两个 CacheLine
};

Futex & Butex

用futex api来实现butex

按理说,mutex也是futex实现的。可能是粒度不用,所以能比mutex快,butex只阻塞bthread而不是整个thread,怎么做到?

// sys_futex.h
inline int futex_wait_private(void *addr1, int expected,
                              const timespec *timeout) {
  // 当 *addr1 == expected 时,线程挂起等待
  return syscall(SYS_futex, addr1, (FUTEX_WAIT | FUTEX_PRIVATE_FLAG), expected,
                 timeout, NULL, 0);
}

inline int futex_wake_private(void *addr1, int nwake) {
  // 唤醒 nwake 个等待的线程
  return syscall(SYS_futex, addr1, (FUTEX_WAKE | FUTEX_PRIVATE_FLAG), nwake,
                 NULL, NULL, 0);
}

class FastPthreadMutex {
public:
  FastPthreadMutex() : _futex(0) {}
  ~FastPthreadMutex() {}
  void lock();
  void unlock();
  bool try_lock();

private:
  DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
  int lock_contended();
  unsigned _futex;
};


// mutex.cpp
// Implement bthread_mutex_t related functions
struct MutexInternal {
  butil::static_atomic<unsigned char> locked;
  butil::static_atomic<unsigned char> contended;
  unsigned short padding;
};

const MutexInternal MUTEX_CONTENDED_RAW = 1, 1, 0;
const MutexInternal MUTEX_LOCKED_RAW = 1, 0, 0;
// Define as macros rather than constants which can't be put in read-only
// section and affected by initialization-order fiasco.
#define BTHREAD_MUTEX_CONTENDED                                                \
  (*(const unsigned *)&bthread::MUTEX_CONTENDED_RAW)
#define BTHREAD_MUTEX_LOCKED (*(const unsigned *)&bthread::MUTEX_LOCKED_RAW)

int FastPthreadMutex::lock_contended() {
  butil::atomic<unsigned> *whole = (butil::atomic<unsigned> *)&_futex;
  // 将状态原子修改为加锁 + 等待
  // 如果这期间锁被释放了,那么该函数直接退出
  while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
    // 否则调用 futex 尝试挂起当前线程,并且要求状态为加锁 + 等待(因为中间锁可能会被释放)
    if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
        errno != EWOULDBLOCK) {
      return errno;
    }
    // futex_wait_private 非 EWOULDBLOCK 失败的情况下,继续循环重试
  }
  return 0;
}

void FastPthreadMutex::lock() {
  bthread::MutexInternal *split = (bthread::MutexInternal *)&_futex;
  // 加锁,如果 locked 原先的值为 0,lock 直接返回成功
  if (split->locked.exchange(1, butil::memory_order_acquire)) {
    // 如果已经是加锁状态,那么进入锁竞争的处理
    (void)lock_contended();
  }
}

bool FastPthreadMutex::try_lock() {
  bthread::MutexInternal *split = (bthread::MutexInternal *)&_futex;
  return !split->locked.exchange(1, butil::memory_order_acquire);
}

void FastPthreadMutex::unlock() {
  butil::atomic<unsigned> *whole = (butil::atomic<unsigned> *)&_futex;
  const unsigned prev = whole->exchange(0, butil::memory_order_release);
  // CAUTION: the mutex may be destroyed, check comments before butex_create
  if (prev != BTHREAD_MUTEX_LOCKED) {
    // 如果有挂起等待的线程,执行唤醒
    futex_wake_private(whole, 1);
  }
}

实现分析

// butex.h
// Create a butex which is a futex-like 32-bit primitive for synchronizing
// bthreads/pthreads.
// Returns a pointer to 32-bit data, NULL on failure.
// NOTE: all butexes are private(not inter-process).
void *butex_create();

// Wake up at most 1 thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake(void *butex);

// Atomically wait on |butex| if *butex equals |expected_value|, until the
// butex is woken up by butex_wake*, or CLOCK_REALTIME reached |abstime| if
// abstime is not NULL.
// About |abstime|:
//   Different from FUTEX_WAIT, butex_wait uses absolute time.
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void *butex, int expected_value, const timespec *abstime);


// butex.cpp
struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
  // tids of pthreads are 0
  bthread_t tid;

  // Erasing node from middle of LinkedList is thread-unsafe, we need
  // to hold its container's lock.
  butil::atomic<Butex *> container;
};

struct ButexBthreadWaiter : public ButexWaiter {
  TaskMeta *task_meta;
  TimerThread::TaskId sleep_id;
  WaiterState waiter_state;
  int expected_value;
  Butex *initial_butex;
  TaskControl *control;
};

// 双向链表
typedef butil::LinkedList<ButexWaiter> ButexWaiterList;

struct BAIDU_CACHELINE_ALIGNMENT Butex {
  Butex() {}
  ~Butex() {}

  butil::atomic<int> value;
  ButexWaiterList waiters;
  internal::FastPthreadMutex waiter_lock;
};

// 注意 value 的地址与 butex 对象一致,方便后面转换
BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);

void *butex_create() {
  Butex *b = butil::get_object<Butex>();
  if (b) {
    return &b->value;
  }
  return NULL;
}

int butex_wait(void *arg, int expected_value, const timespec *abstime) {
  Butex *b = container_of(static_cast<butil::atomic<int> *>(arg), Butex, value);
  if (b->value.load(butil::memory_order_relaxed) != expected_value) {
    errno = EWOULDBLOCK;
    // Sometimes we may take actions immediately after unmatched butex,
    // this fence makes sure that we see changes before changing butex.
    butil::atomic_thread_fence(butil::memory_order_acquire);
    return -1;
  }
  TaskGroup *g = tls_task_group;
  if (NULL == g || g->is_current_pthread_task()) {
    return butex_wait_from_pthread(g, b, expected_value, abstime);
  }
  // 在栈上创建 waiter 对象
  ButexBthreadWaiter bbw;
  // tid is 0 iff the thread is non-bthread
  bbw.tid = g->current_tid();
  bbw.container.store(NULL, butil::memory_order_relaxed);
  bbw.task_meta = g->current_task();
  bbw.sleep_id = 0;
  bbw.waiter_state = WAITER_STATE_READY;
  bbw.expected_value = expected_value;
  bbw.initial_butex = b;
  bbw.control = g->control();

  if (abstime != NULL) {
    ...  // 先忽略对时间的处理
  }

  // release fence matches with acquire fence in interrupt_and_consume_waiters
  // in task_group.cpp to guarantee visibility of `interrupted'.
  bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
  g->set_remained(wait_for_butex, &bbw);  // 在执行下个 bthread 前执行 wait_for_butex
  TaskGroup::sched(&g);  // 主动 Yield

  // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
  // running and using bbw. The chance is small, just spin until it's done.
  BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
               30 /*nops before sched_yield*/);

  // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
  // Spin until current_waiter != NULL.
  BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
                   NULL, butil::memory_order_acquire) == NULL,
               30 /*nops before sched_yield*/);

  bool is_interrupted = false;
  if (bbw.task_meta->interrupted) {
    // Race with set and may consume multiple interruptions, which are OK.
    bbw.task_meta->interrupted = false;
    is_interrupted = true;
  }
  // If timed out as well as value unmatched, return ETIMEDOUT.
  if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
    errno = ETIMEDOUT;
    return -1;
  } else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
    errno = EWOULDBLOCK;
    return -1;
  } else if (is_interrupted) {
    errno = EINTR;
    return -1;
  }
  return 0;
}

static void wait_for_butex(void *arg) {
  ButexBthreadWaiter *const bw = static_cast<ButexBthreadWaiter *>(arg);
  Butex *const b = bw->initial_butex;
  // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
  //    before they're queued, otherwise the waiter is already timedout
  //    and removed by TimerThread, in which case we should stop queueing.
  //
  // Visibility of waiter_state:
  //    [bthread]                         [TimerThread]
  //    waiter_state = TIMED
  //    tt_lock { add task }
  //                                      tt_lock { get task }
  //                                      waiter_lock { waiter_state=TIMEDOUT }
  //    waiter_lock { use waiter_state }
  // tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
  // sequenced by two locks, both threads are guaranteed to see the correct
  // value.
  {
    BAIDU_SCOPED_LOCK(b->waiter_lock);
    // 加锁后,校验 value 和 expected_value 是否相等
    if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
      bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
    } else if (bw->waiter_state == WAITER_STATE_READY /*1*/ &&
               !bw->task_meta->interrupted) {
      b->waiters.Append(bw);  // 加入 waiters 队列
      bw->container.store(b, butil::memory_order_relaxed);
      return;
    }
  }

  // b->container is NULL which makes erase_from_butex_and_wakeup() and
  // TaskGroup::interrupt() no-op, there's no race between following code and
  // the two functions. The on-stack ButexBthreadWaiter is safe to use and
  // bw->waiter_state will not change again.
  // 如果没有成功 wait,则清理定时器,并切回原协程
  unsleep_if_necessary(bw, get_global_timer_thread());
  tls_task_group->ready_to_run(bw->tid);
  // FIXME: jump back to original thread is buggy.

  // // Value unmatched or waiter is already woken up by TimerThread, jump
  // // back to original bthread.
  // TaskGroup* g = tls_task_group;
  // ReadyToRunArgs args = { g->current_tid(), false };
  // g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
  // // 2: Don't run remained because we're already in a remained function
  // //    otherwise stack may overflow.
  // TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}

int butex_wake(void *arg) {
  Butex *b = container_of(static_cast<butil::atomic<int> *>(arg), Butex, value);
  ButexWaiter *front = NULL;
  {
    BAIDU_SCOPED_LOCK(b->waiter_lock);
    if (b->waiters.empty()) {
      return 0;
    }
    front = b->waiters.head()->value();
    front->RemoveFromList();  // 从队列中取出等待的 waiter
    front->container.store(NULL, butil::memory_order_relaxed);
  }
  if (front->tid == 0) {
    wakeup_pthread(static_cast<ButexPthreadWaiter *>(front));
    return 1;
  }
  ButexBthreadWaiter *bbw = static_cast<ButexBthreadWaiter *>(front);
  unsleep_if_necessary(bbw, get_global_timer_thread());
  TaskGroup *g = tls_task_group;
  if (g) {
    TaskGroup::exchange(&g, bbw->tid);  // 如果在 bthread 环境中,直接唤醒
  } else {
    // 否认触发重新调度
    bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
  }
  return 1;
}

DoublyBufferedData

其实这是一个双buffer切换实现方案

双buffer切换要点就是切换时机,如果能确定业务的访问周期,那自然定时调用就行了,根本没有什么锁的开销。如果业务侧不确定,那就需要锁

简单实现就是读写锁。锁住,更新index,写锁会阻塞所有的读。DoublyBufferedData的设计考虑因为一把锁阻塞所有读写,考虑引入N+1把锁,写写阻塞,写等待所有读锁释放,降低临界区

template <typename T, typename TLS>
template <typename Fn>
size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
    // _modify_mutex sequences modifications. Using a separate mutex rather
    // than _wrappers_mutex is to avoid blocking threads calling
    // AddWrapper() or RemoveWrapper() too long. Most of the time, modifications
    // are done by one thread, contention should be negligible.
    BAIDU_SCOPED_LOCK(_modify_mutex);
    int bg_index = !_index.load(butil::memory_order_relaxed);
    // background instance is not accessed by other threads, being safe to
    // modify.
    const size_t ret = fn(_data[bg_index]);
    if (!ret) {
        return 0;
    }

    // Publish, flip background and foreground.
    // The release fence matches with the acquire fence in UnsafeRead() to
    // make readers which just begin to read the new foreground instance see
    // all changes made in fn.
    _index.store(bg_index, butil::memory_order_release);
    bg_index = !bg_index;
    
    // Wait until all threads finishes current reading. When they begin next
    // read, they should see updated _index.
    {
        BAIDU_SCOPED_LOCK(_wrappers_mutex);
        for (size_t i = 0; i < _wrappers.size(); ++i) {
            _wrappers[i]->WaitReadDone();
        }
    }

    const size_t ret2 = fn(_data[bg_index]);
    CHECK_EQ(ret2, ret) << "index=" << _index.load(butil::memory_order_relaxed);
    return ret2;
}

设计的很远。代码走读可以看这篇文章,写得不错

ParkingLot

// parking_lot.h
// Park idle workers.
class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
public:
  class State {
  public:
    State() : val(0) {}
    bool stopped() const { return val & 1; }

  private:
    friend class ParkingLot;
    State(int val) : val(val) {}
    int val;
  };

  ParkingLot() : _pending_signal(0) {}

  // Wake up at most `num_task' workers.
  // Returns #workers woken up.
  // 唤醒至多 num_task 个等待的线程
  // 注意 _pending_signal 增加了 num_task * 2,因为最后一位用来表示是否 stop
  int signal(int num_task) {
    _pending_signal.fetch_add((num_task << 1), butil::memory_order_release);
    return futex_wake_private(&_pending_signal, num_task);
  }

  // Get a state for later wait().
  State get_state() {
    return _pending_signal.load(butil::memory_order_acquire);
  }

  // Wait for tasks.
  // If the `expected_state' does not match, wait() may finish directly.
  // 工作线程尝试挂起等待,此时会检查 _pending_signal 的数值,若不一致则挂起,继续执行
  void wait(const State &expected_state) {
    futex_wait_private(&_pending_signal, expected_state.val, NULL);
  }

  // Wakeup suspended wait() and make them unwaitable ever.
  void stop() {
    _pending_signal.fetch_or(1);
    // 将 _pending_signal 设为 stop,唤醒所有等待的工作线程,触发结束
    futex_wake_private(&_pending_signal, 10000);
  }

private:
  // higher 31 bits for signalling, LSB for stopping.
  butil::atomic<int> _pending_signal;
};

TaskControl 中包含一组 ParkingLot 对象,TaskGroup 初始化时会哈希打散使用这些对象:

// task_control.h
class TaskControl {
  ...
  static const int PARKING_LOT_NUM = 4;
  ParkingLot _pl[PARKING_LOT_NUM];
};

// task_group.cpp
TaskGroup::TaskGroup(TaskControl *c)
    : _cur_meta(NULL), _control(c), _num_nosignal(0), _nsignaled(0),
      _last_run_ns(butil::cpuwide_time_ns()), _cumulated_cputime_ns(0),
      _nswitch(0), _last_context_remained(NULL),
      _last_context_remained_arg(NULL), _pl(NULL), _main_stack(NULL),
      _main_tid(0), _remote_num_nosignal(0), _remote_nsignaled(0) {
  _steal_seed = butil::fast_rand();
  _steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
  // 根据线程号哈希到某个 ParkingLot 对象上
  _pl = &c->_pl[butil::fmix64(pthread_numeric_id()) %
                TaskControl::PARKING_LOT_NUM];
  CHECK(c);
}

当前工作线程在窃取任务前会先取一次 ParkingLot 的状态,当状态发生改变时会直接跳过 wait

bool TaskGroup::wait_task(bthread_t *tid) {
  do {
    if (_last_pl_state.stopped()) {
      // 如果已经 stop
      return false;
    }
    // 根据当前的 _last_pl_state 状态判断是否挂起
    _pl->wait(_last_pl_state);
    if (steal_task(tid)) {
      return true;
    }
  } while (true);
}

bool TaskGroup::steal_task(bthread_t *tid) {
  if (_remote_rq.pop(tid)) {
    return true;
  }
  // _last_pl_state 的状态在这里进行更新
  // _last_pl_state 发生变化时 wait_task 中的 wait 会直接跳过
  // 这里容忍 false postive,增加的开销是多一次尝试 steal_task
  _last_pl_state = _pl->get_state();
  return _control->steal_task(tid, &_steal_seed, _steal_offset);
}

当有新任务加入时,则调用 TaskControl::signal_task 通过 ParkingLog 唤醒等待的工作线程:

void TaskControl::signal_task(int num_task) {
  if (num_task <= 0) {
    return;
  }
  // TODO(gejun): Current algorithm does not guarantee enough threads will
  // be created to match caller's requests. But in another side, there's also
  // many useless signalings according to current impl. Capping the concurrency
  // is a good balance between performance and timeliness of scheduling.
  // 上方的官方注释也写明了,限制并发可以更好地平衡性能和调度及时性,这里直接将唤醒的上限设为 2
  if (num_task > 2) {
    num_task = 2;
  }
  int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
  num_task -= _pl[start_index].signal(1);  // 通过 ParkingLog 唤醒
  if (num_task > 0) {
    for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
      if (++start_index >= PARKING_LOT_NUM) {
        start_index = 0;
      }
      
      num_task -= _pl[start_index].signal(1);
    }
  }
  if (num_task > 0 &&
      FLAGS_bthread_min_concurrency >
          0 && // test min_concurrency for performance
      _concurrency.load(butil::memory_order_relaxed) <
          FLAGS_bthread_concurrency) {
    // TODO: Reduce this lock
    // 当仍然有需要执行的任务时,尝试增加工作线程
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
    if (_concurrency.load(butil::memory_order_acquire) <
        FLAGS_bthread_concurrency) {
      add_workers(1);
    }
  }
}

void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
  push_rq(tid);
  if (nosignal) {
    ++_num_nosignal;  // 对于设定了不触发信号的任务,仅增加计数
  } else {
    const int additional_signal = _num_nosignal;
    _num_nosignal = 0;
    _nsignaled += 1 + additional_signal;
    // 调用 signal_task 唤醒
    _control->signal_task(1 + additional_signal);
  }
}

// 将还没有触发信号的任务统一触发信号(工作线程内调用)
void TaskGroup::flush_nosignal_tasks() {
  const int val = _num_nosignal;
  if (val) {
    _num_nosignal = 0;
    _nsignaled += val;
    _control->signal_task(val);
  }
}

// 从工作线程外提交任务
void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
  _remote_rq._mutex.lock();  // 加锁
  while (!_remote_rq.push_locked(tid)) {
    flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
    LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
                            << _remote_rq.capacity();
    ::usleep(1000);
    _remote_rq._mutex.lock();
  }
  if (nosignal) {
    ++_remote_num_nosignal;
    _remote_rq._mutex.unlock();
  } else {
    const int additional_signal = _remote_num_nosignal;
    _remote_num_nosignal = 0;
    _remote_nsignaled += 1 + additional_signal;
    _remote_rq._mutex.unlock();  // 锁的生命周期内也保护了计数相关变量
    _control->signal_task(1 + additional_signal);
  }
}

void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
  const int val = _remote_num_nosignal;
  if (!val) {
    locked_mutex.unlock();
    return;
  }
  _remote_num_nosignal = 0;
  _remote_nsignaled += val;
  locked_mutex.unlock();
  _control->signal_task(val);
}

Timer Thread

struct TimerThreadOptions {
  // Scheduling requests are hashed into different bucket to improve
  // scalability. However bigger num_buckets may NOT result in more scalable
  // schedule() because bigger values also make each buckets more sparse
  // and more likely to lock the global mutex. You better not change
  // this value, just leave it to us.
  // Default: 13
  size_t num_buckets;

  // If this field is not empty, some bvar for reporting stats of TimerThread
  // will be exposed with this prefix.
  // Default: ""
  std::string bvar_prefix;

  // Constructed with default options.
  TimerThreadOptions();
};

// TimerThread is a separate thread to run scheduled tasks at specific time.
// At most one task runs at any time, don't put time-consuming code in the
// callback otherwise the task may delay other tasks significantly.
class TimerThread {
public:
  struct Task;
  class Bucket;

  typedef uint64_t TaskId;
  const static TaskId INVALID_TASK_ID;

  TimerThread();
  ~TimerThread();

  // Start the timer thread.
  // This method should only be called once.
  // return 0 if success, errno otherwise.
  // 启动 TimerThread 线程
  int start(const TimerThreadOptions *options);

  // Stop the timer thread. Later schedule() will return INVALID_TASK_ID.
  // 结束 TimerThread 线程
  void stop_and_join();

  // Schedule |fn(arg)| to run at realtime |abstime| approximately.
  // Returns: identifier of the scheduled task, INVALID_TASK_ID on error.
  // 增加一个定时器任务
  TaskId schedule(void (*fn)(void *), void *arg, const timespec &abstime);

  // Prevent the task denoted by `task_id' from running. `task_id' must be
  // returned by schedule() ever.
  // Returns:
  //   0   -  Removed the task which does not run yet
  //  -1   -  The task does not exist.
  //   1   -  The task is just running.
  // 取消一个定时器任务
  int unschedule(TaskId task_id);

  // Get identifier of internal pthread.
  // Returns (pthread_t)0 if start() is not called yet.
  pthread_t thread_id() const { return _thread; }

private:
  // the timer thread will run this method.
  void run();  // TimerThread 线程函数
  static void *run_this(void *arg);

  bool _started; // whether the timer thread was started successfully.
  butil::atomic<bool> _stop;

  TimerThreadOptions _options;
  Bucket *_buckets;                  // list of tasks to be run
  internal::FastPthreadMutex _mutex; // protect _nearest_run_time
  int64_t _nearest_run_time;
  // the futex for wake up timer thread. can't use _nearest_run_time because
  // it's 64-bit.
  int _nsignals;
  pthread_t _thread; // all scheduled task will be run on this thread
};

void TimerThread::run() {
  int64_t last_sleep_time = butil::gettimeofday_us();

  // min heap of tasks (ordered by run_time)
  // TimerThread 线程内通过最小堆维护定时任务序列
  std::vector<Task *> tasks;
  tasks.reserve(4096);

  while (!_stop.load(butil::memory_order_relaxed)) {
    {
      BAIDU_SCOPED_LOCK(_mutex);
      _nearest_run_time = std::numeric_limits<int64_t>::max();
    }

    // 从所有的 Bucket 的获取任务
    for (size_t i = 0; i < _options.num_buckets; ++i) {
      Bucket &bucket = _buckets[i];
      for (Task *p = bucket.consume_tasks(); p != nullptr; ++nscheduled) {
        Task *next_task = p->next;

        // 对于已经取消的任务,不会加入到堆里,直接删除。这也是高性能的关键
        if (!p->try_delete()) {
          // 循环加入到堆里
          tasks.push_back(p);
          std::push_heap(tasks.begin(), tasks.end(), task_greater);
        }
        p = next_task;
      }
    }

    bool pull_again = false;
    while (!tasks.empty()) {
      // 从最小堆中取出任务
      Task *task1 = tasks[0];
      if (task1->try_delete()) { // already unscheduled
        std::pop_heap(tasks.begin(), tasks.end(), task_greater);
        tasks.pop_back();
        continue;
      }
      // 判断是否到时间了,没有则退出
      if (butil::gettimeofday_us() < task1->run_time) { // not ready yet.
        break;
      }

      {
        BAIDU_SCOPED_LOCK(_mutex);
        if (task1->run_time > _nearest_run_time) {
          // 检查当前的 _nearest_run_time,确认是否有更新的任务
          // 有的话则再次从 Bucket 中拉取
          pull_again = true;
          break;
        }
      }
      std::pop_heap(tasks.begin(), tasks.end(), task_greater);
      tasks.pop_back();
      // 执行定时任务并删除
      if (task1->run_and_delete()) {
        ++ntriggered;
      }
    }
    if (pull_again) {
      BT_VLOG << "pull again, tasks=" << tasks.size();
      continue;
    }

    // The realtime to wait for.
    int64_t next_run_time = std::numeric_limits<int64_t>::max();
    if (tasks.empty()) {
      next_run_time = std::numeric_limits<int64_t>::max();
    } else {
      next_run_time = tasks[0]->run_time;
    }
    // Similarly with the situation before running tasks, we check
    // _nearest_run_time to prevent us from waiting on a non-earliest
    // task. We also use the _nsignal to make sure that if new task
    // is earlier that the realtime that we wait for, we'll wake up.
    int expected_nsignals = 0;
    {
      BAIDU_SCOPED_LOCK(_mutex);
      if (next_run_time > _nearest_run_time) {
        // a task is earlier that what we would wait for.
        // We need to check buckets.
        continue;
      } else {
        _nearest_run_time = next_run_time;
        expected_nsignals = _nsignals;
      }
    }
    timespec *ptimeout = NULL;
    timespec next_timeout = {0, 0};
    const int64_t now = butil::gettimeofday_us();
    if (next_run_time != std::numeric_limits<int64_t>::max()) {
      next_timeout = butil::microseconds_to_timespec(next_run_time - now);
      ptimeout = &next_timeout;
    }
    busy_seconds += (now - last_sleep_time) / 1000000.0;
    // 计算需要等待的时间,通过 futex 等待
    futex_wait_private(&_nsignals, expected_nsignals, ptimeout);
    last_sleep_time = butil::gettimeofday_us();
  }
  BT_VLOG << "Ended TimerThread=" << pthread_self();
}

TimerThread::Task *TimerThread::Bucket::consume_tasks() {
  Task *head = NULL;
  if (_task_head) { // NOTE: schedule() and consume_tasks() are sequenced
    // by TimerThread._nearest_run_time and fenced by TimerThread._mutex.
    // We can avoid touching the mutex and related cacheline when the
    // bucket is actually empty.
    BAIDU_SCOPED_LOCK(_mutex);
    if (_task_head) {
      // 获取任务时直接将链表指针传回
      head = _task_head;
      _task_head = NULL;
      _nearest_run_time = std::numeric_limits<int64_t>::max();
    }
  }
  return head;
}

bool TimerThread::Task::run_and_delete() {
  const uint32_t id_version = version_of_task_id(task_id);
  uint32_t expected_version = id_version;
  // This CAS is rarely contended, should be fast.
  // 通过 CAS 判定当前任务是否还需要做
  if (version.compare_exchange_strong(expected_version, id_version + 1,
                                      butil::memory_order_relaxed)) {
    fn(arg);  // 执行定时任务
    // The release fence is paired with acquire fence in
    // TimerThread::unschedule to make changes of fn(arg) visible.
    version.store(id_version + 2, butil::memory_order_release);
    butil::return_resource(slot_of_task_id(task_id));
    return true;
  } else if (expected_version == id_version + 2) {
    // 对于取消的任务直接归还资源
    butil::return_resource(slot_of_task_id(task_id));
    return false;
  } else {
    // Impossible.
    LOG(ERROR) << "Invalid version=" << expected_version << ", expecting "
               << id_version + 2;
    return false;
  }
}

TimerThread::TaskId TimerThread::schedule(void (*fn)(void *), void *arg,
                                          const timespec &abstime) {
  if (_stop.load(butil::memory_order_relaxed) || !_started) {
    // Not add tasks when TimerThread is about to stop.
    return INVALID_TASK_ID;
  }
  // Hashing by pthread id is better for cache locality.
  // 新增任务时分片到不同的 Bucket 中
  const Bucket::ScheduleResult result =
      _buckets[butil::fmix64(pthread_numeric_id()) % _options.num_buckets]
          .schedule(fn, arg, abstime);
  // 如果有更早的唤醒时间
  if (result.earlier) {
    bool earlier = false;
    const int64_t run_time = butil::timespec_to_microseconds(abstime);
    {
      BAIDU_SCOPED_LOCK(_mutex);
      if (run_time < _nearest_run_time) {
        _nearest_run_time = run_time;
        ++_nsignals;
        earlier = true;
      }
    }
    if (earlier) {
      // 使用 futex 唤醒
      futex_wake_private(&_nsignals, 1);
    }
  }
  return result.task_id;
}

TimerThread::Bucket::schedule(void (*fn)(void *), void *arg,
                              const timespec &abstime) {
  butil::ResourceId<Task> slot_id;
  Task *task = butil::get_resource<Task>(&slot_id);
  if (task == NULL) {
    ScheduleResult result = {INVALID_TASK_ID, false};
    return result;
  }
  task->next = NULL;
  task->fn = fn;
  task->arg = arg;
  task->run_time = butil::timespec_to_microseconds(abstime);
  uint32_t version = task->version.load(butil::memory_order_relaxed);
  if (version == 0) { // skip 0.
    // 分配的版本总是跳过 INVALID_TASK_ID
    task->version.fetch_add(2, butil::memory_order_relaxed);
    version = 2;
  }
  const TaskId id = make_task_id(slot_id, version);
  task->task_id = id;
  bool earlier = false;
  {
    BAIDU_SCOPED_LOCK(_mutex);
    // 加锁的临界区很短
    task->next = _task_head;
    _task_head = task;
    if (task->run_time < _nearest_run_time) {
      _nearest_run_time = task->run_time;
      earlier = true;
    }
  }
  ScheduleResult result = {id, earlier};
  return result;
}

TimerThread 有效的原因有以下几点:

  1. Bucket 锁内链表增加任务的操作是 O(1)\mathcal{O}(1)O(1) 的,临界区短
  2. RPC 场景下超时时间一般不变,大部分插入的时间是递增的,早于 nearest_run_time 而唤醒线程的次数很少
  3. 通过 ResourcePool 和版本进行删除操作,不参与全局竞争
  4. TimerThread 自行维护小顶堆,不参与全局竞争
  5. TimerThread 醒来的频率大约是 RPC 超时的倒数,比如超时时间 100ms,TimerThread 一秒内大约醒 10 次,已经最优

bthread 依赖 TimerThread 实现高效的 usleep

// Suspend current thread for at least `microseconds'
// Interruptible by bthread_interrupt().
extern int bthread_usleep(uint64_t microseconds);

int bthread_usleep(uint64_t microseconds) {
  bthread::TaskGroup *g = bthread::tls_task_group;
  if (NULL != g && !g->is_current_pthread_task()) {
    return bthread::TaskGroup::usleep(&g, microseconds);
  }
  return ::usleep(microseconds);
}

// To be consistent with sys_usleep, set errno and return -1 on error.
int TaskGroup::usleep(TaskGroup **pg, uint64_t timeout_us) {
  if (0 == timeout_us) {
    yield(pg);
    return 0;
  }
  TaskGroup *g = *pg;
  // We have to schedule timer after we switched to next bthread otherwise
  // the timer may wake up(jump to) current still-running context.
  // 将定时任务打包为 remained 函数
  SleepArgs e = {timeout_us, g->current_tid(), g->current_task(), g};
  g->set_remained(_add_sleep_event, &e);
  sched(pg);  // 当前协程出让
  g = *pg;
  e.meta->current_sleep = 0;
  if (e.meta->interrupted) {
    // Race with set and may consume multiple interruptions, which are OK.
    e.meta->interrupted = false;
    // NOTE: setting errno to ESTOP is not necessary from bthread's
    // pespective, however many RPC code expects bthread_usleep to set
    // errno to ESTOP when the thread is stopping, and print FATAL
    // otherwise. To make smooth transitions, ESTOP is still set instead
    // of EINTR when the thread is stopping.
    errno = (e.meta->stop ? ESTOP : EINTR);
    return -1;
  }
  return 0;
}

void TaskGroup::_add_sleep_event(void *void_args) {
  // Must copy SleepArgs. After calling TimerThread::schedule(), previous
  // thread may be stolen by a worker immediately and the on-stack SleepArgs
  // will be gone.
  SleepArgs e = *static_cast<SleepArgs *>(void_args);
  TaskGroup *g = e.group;

  // 增加定时任务
  TimerThread::TaskId sleep_id;
  sleep_id = get_global_timer_thread()->schedule(
      ready_to_run_from_timer_thread, void_args,
      butil::microseconds_from_now(e.timeout_us));
  ...
}

static void ready_to_run_from_timer_thread(void *arg) {
  CHECK(tls_task_group == NULL);
  const SleepArgs *e = static_cast<const SleepArgs *>(arg);
  // 到时间后,根据 tid 将任务重新加入队列
  e->group->control()->choose_one_group()->ready_to_run_remote(e->tid);
}

iobuf

作为收发处理的对象,要小,轻量,尽可能零拷贝,总得比std::any std::shared_ptr<void>要强

核心思想,引用计数+分块分片

class IOBuf {
// iobuf.h
// IOBuf is a non-continuous buffer that can be cut and combined w/o copying
// payload. It can be read from or flushed into file descriptors as well.
// IOBuf is [thread-compatible]. Namely using different IOBuf in different
// threads simultaneously is safe, and reading a static IOBuf from different
// threads is safe as well.
// IOBuf is [NOT thread-safe]. Modifying a same IOBuf from different threads
// simultaneously is unsafe and likely to crash.
class IOBuf {
 public:
  static const size_t DEFAULT_BLOCK_SIZE = 8192;  // block 默认大小 8KB

  // can't directly use `struct iovec' here because we also need to access the
  // reference counter(nshared) in Block*
  struct BlockRef {
    // NOTICE: first bit of `offset' is shared with BigView::start
    uint32_t offset;  // block 上的偏移,这里保证 offset <= max_int
    uint32_t length;  // 和长度
    Block *block;
  };

  // IOBuf is essentially a tiny queue of BlockRefs.
  struct SmallView {
    BlockRef refs[2];  // 引用少于两个 block
  };

  // SmallView 与 BigView 大小相同
  struct BigView {
    int32_t magic;
    uint32_t start;  // 起始的 block 下标
    BlockRef *refs;  // block 列表
    uint32_t nref;   // block 列表大小
    uint32_t cap_mask;
    size_t nbytes;  // 当前总长度

    const BlockRef &ref_at(uint32_t i) const {
      return refs[(start + i) & cap_mask];
    }

    BlockRef &ref_at(uint32_t i) { return refs[(start + i) & cap_mask]; }

    uint32_t capacity() const { return cap_mask + 1; }  // block 列表容量
  };

  bool _small() const { return _bv.magic >= 0; }  // 小于 0 时为 BigView

  size_t length() const {
    return _small() ? (_sv.refs[0].length + _sv.refs[1].length) : _bv.nbytes;
  }

 private:
  union {
    BigView _bv;  // _bv.magic 与 _sv.refs[0].offset 地址相同,小于 0 时表示 BigView
    SmallView _sv;
  };
};


// iobuf.cpp,看一下 append 的流程
void IOBuf::append(const IOBuf &other) {
  const size_t nref = other._ref_num();
  for (size_t i = 0; i < nref; ++i) {
    _push_back_ref(other._ref_at(i));
  }
}

// 引用的 Block 数量
inline size_t IOBuf::_ref_num() const {
  return _small() ? (!!_sv.refs[0].block + !!_sv.refs[1].block) : _bv.nref;
}

inline void IOBuf::_push_back_ref(const BlockRef &r) {
  if (_small()) {
    return _push_or_move_back_ref_to_smallview<false>(r);
  } else {
    return _push_or_move_back_ref_to_bigview<false>(r);
  }
}

template <bool MOVE>
void IOBuf::_push_or_move_back_ref_to_bigview(const BlockRef &r) {
  BlockRef &back = _bv.ref_at(_bv.nref - 1);
  if (back.block == r.block && back.offset + back.length == r.offset) {
    // Merge ref
    back.length += r.length;
    _bv.nbytes += r.length;
    if (MOVE) {
      r.block->dec_ref();
    }
    return;
  }

  if (_bv.nref != _bv.capacity()) {
    // block 列表有容量的情况下,直接增加对该 block 的引用
    _bv.ref_at(_bv.nref++) = r;
    _bv.nbytes += r.length;  // 长度叠加
    if (!MOVE) {
      r.block->inc_ref();
    }
    return;
  }
  // resize, don't modify bv until new_refs is fully assigned
  // 当前 block 列表容量不足,倍增申请
  const uint32_t new_cap = _bv.capacity() * 2;
  BlockRef *new_refs = iobuf::acquire_blockref_array(new_cap);
  for (uint32_t i = 0; i < _bv.nref; ++i) {
    // 复制原 block
    new_refs[i] = _bv.ref_at(i);
  }
  new_refs[_bv.nref++] = r;

  // Change other variables
  _bv.start = 0;
  // 删除原 block 列表
  iobuf::release_blockref_array(_bv.refs, _bv.capacity());
  _bv.refs = new_refs;
  _bv.cap_mask = new_cap - 1;
  _bv.nbytes += r.length;
  if (!MOVE) {
    r.block->inc_ref();
  }
}

// Block 的实现
void *(*blockmem_allocate)(size_t) = ::malloc;
void (*blockmem_deallocate)(void *) = ::free;

const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 0x1;
typedef void (*UserDataDeleter)(void *);

struct UserDataExtension {
  UserDataDeleter deleter;
};

struct IOBuf::Block {
  butil::atomic<int> nshared;  // 引用计数
  uint16_t flags;  // 是否使用自定义 deleter,可以看下方的英文注释
  uint16_t abi_check; // original cap, never be zero.
  uint32_t size;
  uint32_t cap;
  Block *portal_next;
  // When flag is 0, data points to `size` bytes starting at
  // `(char*)this+sizeof(Block)' When flag & IOBUF_BLOCK_FLAGS_USER_DATA is
  // non-0, data points to the user data and the deleter is put in
  // UserDataExtension at `(char*)this+sizeof(Block)'
  char *data;

  Block(char *data_in, uint32_t data_size)
      : nshared(1), flags(0), abi_check(0), size(0), cap(data_size),
        portal_next(NULL), data(data_in) {
    iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed);
    iobuf::g_blockmem.fetch_add(data_size + sizeof(Block),
                                butil::memory_order_relaxed);
  }

  Block(char *data_in, uint32_t data_size, UserDataDeleter deleter)
      : nshared(1), flags(IOBUF_BLOCK_FLAGS_USER_DATA), abi_check(0),
        size(data_size), cap(data_size), portal_next(NULL), data(data_in) {
    get_user_data_extension()->deleter = deleter;  // 自定义 deleter
  }

  // Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
  UserDataExtension *get_user_data_extension() {
    char *p = (char *)this;
    return (UserDataExtension *)(p + sizeof(Block));  // 当前内存之后的 4 字节
  }

  void inc_ref() {
    check_abi();
    nshared.fetch_add(1, butil::memory_order_relaxed);
  }

  void dec_ref() {
    check_abi();
    if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
      butil::atomic_thread_fence(butil::memory_order_acquire);
      if (!flags) {
        iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
        iobuf::g_blockmem.fetch_sub(cap + sizeof(Block),
                                    butil::memory_order_relaxed);
        this->~Block();  // 调用析构函数,不删除内存
        iobuf::blockmem_deallocate(this);  // 删除内存
      } else if (flags & IOBUF_BLOCK_FLAGS_USER_DATA) {
        get_user_data_extension()->deleter(data);  // 执行自定义删除
        this->~Block();
        free(this);
      }
    }
  }

  int ref_count() const { return nshared.load(butil::memory_order_relaxed); }

  bool full() const { return size >= cap; }
  size_t left_space() const { return cap - size; }
};

// 创建 block,同时存储 Block 对象和实际数据
inline IOBuf::Block *create_block(const size_t block_size) {
  if (block_size > 0xFFFFFFFFULL) {
    LOG(FATAL) << "block_size=" << block_size << " is too large";
    return NULL;
  }
  char *mem = (char *)iobuf::blockmem_allocate(block_size);
  if (mem == NULL) {
    return NULL;
  }
  return new (mem) IOBuf::Block(mem + sizeof(IOBuf::Block),
                                block_size - sizeof(IOBuf::Block));
}

// 追加用户数据,所有权取决于传入的 deleter
int IOBuf::append_user_data(void *data, size_t size, void (*deleter)(void *)) {
  if (size > 0xFFFFFFFFULL - 100) {
    LOG(FATAL) << "data_size=" << size << " is too large";
    return -1;
  }
  char *mem = (char *)malloc(sizeof(IOBuf::Block) + sizeof(UserDataExtension));
  if (mem == NULL) {
    return -1;
  }
  if (deleter == NULL) {
    deleter = ::free;
  }
  IOBuf::Block *b = new (mem) IOBuf::Block((char *)data, size, deleter);
  const IOBuf::BlockRef r = {0, b->cap, b};
  _move_back_ref(r);
  return 0;
}

这里有个简单的代码走读

socket

这里设计成shared_ptr和weak_ptr的形式了,不过客制化了一些

Socket 独有的 SetFailed 可以在需要时确保 Socket 不能被继续 Address 而最终引用计数归 0,单纯使用 shared_ptr/weak_ptr 则无法保证这点,当一个 server 需要退出时,如果请求仍频繁地到来,对应 Socket 的引用计数可能迟迟无法清 0 而导致 server 无法退出。另外 weak_ptr 无法直接作为 epoll 的 data,而 SocketId 可以。这些因素使我们设计了 Socket。

存储 SocketUniquePtr 还是 SocketId 取决于是否需要强引用。像 Controller 贯穿了 RPC 的整个流程,和 Socket 中的数据有大量交互,它存放的是 SocketUniquePtr。epoll 主要是提醒对应 fd 上发生了事件,如果 Socket 回收了,那这个事件是可有可无的,所以它存放了 SocketId。

由于 SocketUniquePtr 只要有效,其中的数据就不会变,这个机制使用户不用关心麻烦的 race conditon 和 ABA problem,可以放心地对共享的 fd 进行操作。这种方法也规避了隐式的引用计数,内存的 ownership 明确,程序的质量有很好的保证。bRPC 中有大量的 SocketUniquePtr 和 SocketId,它们确实简化了我们的开发。

class BAIDU_CACHELINE_ALIGNMENT Socket {
  struct Forbidden {};

 public:
  Socket(Forbidden);  // 禁止用户直接构造,但 ResourcePool 又需要访问 public 的构造函数,私有的 Forbidden 可以实现这个 trick

  // Create a Socket according to `options', put the identifier into `id'.
  // Returns 0 on sucess, -1 otherwise.
  static int Create(const SocketOptions &options, SocketId *id);

  // Place the Socket associated with identifier `id' into unique_ptr `ptr',
  // which will be released automatically when out of scope (w/o explicit
  // std::move). User can still access `ptr' after calling ptr->SetFailed()
  // before release of `ptr'.
  // This function is wait-free.
  // Returns 0 on success, -1 when the Socket was SetFailed().
  static int Address(SocketId id, SocketUniquePtr *ptr);

  // Mark this Socket or the Socket associated with `id' as failed.
  // Any later Address() of the identifier shall return NULL unless the
  // Socket was revivied by HealthCheckThread. The Socket is NOT recycled
  // after calling this function, instead it will be recycled when no one
  // references it. Internal fields of the Socket are still accessible
  // after calling this function. Calling SetFailed() of a Socket more
  // than once is OK.
  // This function is lock-free.
  // Returns -1 when the Socket was already SetFailed(), 0 otherwise.
  int SetFailed();

 private:
  // unsigned 32-bit version + signed 32-bit referenced-count.
  // Meaning of version:
  // * Created version: no SetFailed() is called on the Socket yet. Must be
  //   same evenness with initial _versioned_ref because during lifetime of
  //   a Socket on the slot, the version is added with 1 twice. This is
  //   also the version encoded in SocketId.
  // * Failed version: = created version + 1, SetFailed()-ed but returned.
  // * Other versions: the socket is already recycled.
  // Socket 自身记录的版本+引用计数,首次构造对象时为 0
  butil::atomic<uint64_t> _versioned_ref;

  // Flag used to mark whether additional reference has been decreased
  // by either `SetFailed' or `SetRecycle'
  // 回收标记
  butil::atomic<bool> _recycle_flag;
};

// SocketId = 32-bit version + 32-bit slot.
//   version: from version part of _versioned_nref, must be an EVEN number.
//   slot: designated by ResourcePool.
int Socket::Create(const SocketOptions &options, SocketId *id) {
  butil::ResourceId<Socket> slot;  // 接收返回的资源 ID
  Socket *const m = butil::get_resource(&slot, Forbidden());
  if (m == NULL) {
    LOG(FATAL) << "Fail to get_resource<Socket>";
    return -1;
  }
  g_vars->nsocket << 1;
  CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
  ...
  // nref can be non-zero due to concurrent AddressSocket().
  // _this_id will only be used in destructor/Destroy of referenced
  // slots, which is safe and properly fenced. Although it's better
  // to put the id into SocketUniquePtr.
  // 构造 SocketId,_versioned_ref 引用计数加 1,首次构造时版本为 0
  m->_this_id = MakeSocketId(VersionOfVRef(m->_versioned_ref.fetch_add(
                                 1, butil::memory_order_release)),
                             slot);
  ...
  *id = m->_this_id;  // 返回 socket id
  return 0;
}

// Utility functions to combine and extract SocketId.
BUTIL_FORCE_INLINE SocketId MakeSocketId(uint32_t version,
                                         butil::ResourceId<Socket> slot) {
  return SocketId((((uint64_t)version) << 32) | slot.value);
}

// Utility functions to combine and extract Socket::_versioned_ref
BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) {
  return (uint32_t)(vref >> 32);
}

BUTIL_FORCE_INLINE butil::ResourceId<Socket> SlotOfSocketId(SocketId sid) {
  butil::ResourceId<Socket> id = {(sid & 0xFFFFFFFFul)};
  return id;
}

BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) {
  return (int32_t)(vref & 0xFFFFFFFFul);
}

inline int Socket::Address(SocketId id, SocketUniquePtr *ptr) {
  // 解析 ResourceId
  const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
  // 获取对象地址
  Socket *const m = address_resource(slot);
  if (__builtin_expect(m != NULL, 1)) {
    // acquire fence makes sure this thread sees latest changes before
    // Dereference() or Revive().
    // 引用计数 +1,注意这里返回的 vref1 是自增前的 _versioned_ref
    const uint64_t vref1 =
        m->_versioned_ref.fetch_add(1, butil::memory_order_acquire);
    // 获取版本
    const uint32_t ver1 = VersionOfVRef(vref1);
    if (ver1 == VersionOfSocketId(id)) {
      // 版本与 SocketId 的版本一致则返回成功
      ptr->reset(m);
      return 0;
    }

    // 版本不一致,恢复引用计数,返回的 vref2 是前面自增后的 _versioned_ref
    const uint64_t vref2 =
        m->_versioned_ref.fetch_sub(1, butil::memory_order_release);
    // 取出引用数量
    const int32_t nref = NRefOfVRef(vref2);
    if (nref > 1) {
      return -1;
    } else if (__builtin_expect(nref == 1, 1)) {
      // 无引用,再次读取版本
      const uint32_t ver2 = VersionOfVRef(vref2);
      // 如果版本是奇数,说明刚进行了 SetFailed 操作
      // 并且当前无引用,触发 socket 回收
      if ((ver2 & 1)) {
        if (ver1 == ver2 || ver1 + 1 == ver2) {
          uint64_t expected_vref = vref2 - 1;  // 自减后 _versioned_ref 的理论值
          // 尝试 CAS,原子地将 _versioned_ref 替换为版本 id_ver + 2,计数 0
          if (m->_versioned_ref.compare_exchange_strong(
                  expected_vref, MakeVRef(ver2 + 1, 0),
                  butil::memory_order_acquire, butil::memory_order_relaxed)) {
            // 成功 CAS 后,执行回收
            m->OnRecycle();  // 执行实际的回收操作
            return_resource(SlotOfSocketId(id));
          }
        } else {
          CHECK(false) << "ref-version=" << ver1 << " unref-version=" << ver2;
        }
      } else {
        CHECK_EQ(ver1, ver2);
        // Addressed a free slot.
      }
    } else {
      CHECK(false) << "Over dereferenced SocketId=" << id;
    }
  }
  return -1;
}

int Socket::SetFailed() { return SetFailed(EFAILEDSOCKET, NULL); }

int Socket::SetFailed(int error_code, const char *error_fmt, ...) {
  if (error_code == 0) {
    CHECK(false) << "error_code is 0";
    error_code = EFAILEDSOCKET;
  }
  const uint32_t id_ver = VersionOfSocketId(_this_id);  // 获取 socket id 上的版本
  uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
  for (;;) { // need iteration to retry compare_exchange_strong
    // version_ref 的版本与 socket id 的版本不一致,返回失败
    if (VersionOfVRef(vref) != id_ver) {
      return -1;
    }
    // Try to set version=id_ver+1 (to make later Address() return NULL),
    // retry on fail.
    // 尝试 CAS,原子地将 _version_ref 替换为版本 id_ver + 1,计数不变
    if (_versioned_ref.compare_exchange_strong(
            vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
            butil::memory_order_release, butil::memory_order_relaxed)) {
      // CAS 成功后,原先的 socket id Address 操作会因为版本校验而失败
      // _version_ref 的版本也会修改为一个**奇数**
      ...

      // Deref additionally which is added at creation so that this
      // Socket's reference will hit 0(recycle) when no one addresses it.
      ReleaseAdditionalReference();
      // NOTE: This Socket may be recycled at this point, don't
      // touch anything.
      return 0;
    }
  }
}

int Socket::ReleaseAdditionalReference() {
  bool expect = false;
  // Use `relaxed' fence here since `Dereference' has `released' fence
  if (_recycle_flag.compare_exchange_strong(expect, true,
                                            butil::memory_order_relaxed,
                                            butil::memory_order_relaxed)) {
    return Dereference();  // 引用计数 -1
  }
  return -1;
}

// socket_id.h
typedef uint64_t SocketId;

const SocketId INVALID_SOCKET_ID = (SocketId)-1;

class Socket;

extern void DereferenceSocket(Socket *);

struct SocketDeleter {
  void operator()(Socket *m) const { DereferenceSocket(m); }
};

// RAII,析构时执行 SocketDeleter() -> DereferenceSocket
typedef std::unique_ptr<Socket, SocketDeleter> SocketUniquePtr;

// socket.cpp
void DereferenceSocket(Socket *s) {
  if (s) {
    // 减少引用计数
    s->Dereference();
  }
}

inline int Socket::Dereference() {
  const SocketId id = _this_id;
  // 引用计数 -1
  const uint64_t vref =
      _versioned_ref.fetch_sub(1, butil::memory_order_release);
  const int32_t nref = NRefOfVRef(vref);  // 获得计数的值
  if (nref > 1) {
    // 存在其他引用,直接返回
    return 0;
  }
  if (__builtin_expect(nref == 1, 1)) {
    // 无引用,需要回收
    const uint32_t ver = VersionOfVRef(vref);  // 获取 version_ref 上的版本
    const uint32_t id_ver = VersionOfSocketId(id);  // 获取 socket id 上的版本
    // SetFailed 与 Address 并发时对特殊情况的处理,可以自行看英文注释
    // Besides first successful SetFailed() adds 1 to version, one of
    // those dereferencing nref from 1->0 adds another 1 to version.
    // Notice "one of those": The wait-free Address() may make ref of a
    // version-unmatched slot change from 1 to 0 for mutiple times, we
    // have to use version as a guard variable to prevent returning the
    // Socket to pool more than once.
    //
    // Note: `ver == id_ver' means this socket has been `SetRecycle'
    // before rather than `SetFailed'; `ver == ide_ver+1' means we
    // had `SetFailed' this socket before. We should destroy the
    // socket under both situation
    if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) {
      // sees nref:1->0, try to set version=id_ver+2,--nref.
      // No retry: if version changes, the slot is already returned by
      // another one who sees nref:1->0 concurrently; if nref changes,
      // which must be non-zero, the slot will be returned when
      // nref changes from 1->0 again.
      // Example:
      //   SetFailed(): --nref, sees nref:1->0           (1)
      //                try to set version=id_ver+2      (2)
      //    Address():  ++nref, unmatched version        (3)
      //                --nref, sees nref:1->0           (4)
      //                try to set version=id_ver+2      (5)
      // 1,2,3,4,5 or 1,3,4,2,5:
      //            SetFailed() succeeds, Address() fails at (5).
      // 1,3,2,4,5: SetFailed() fails with (2), the slot will be
      //            returned by (5) of Address()
      // 1,3,4,5,2: SetFailed() fails with (2), the slot is already
      //            returned by (5) of Address().
      uint64_t expected_vref = vref - 1;
      if (_versioned_ref.compare_exchange_strong(
              expected_vref, MakeVRef(id_ver + 2, 0),
              butil::memory_order_acquire, butil::memory_order_relaxed)) {
        OnRecycle();  // 执行实际的回收操作
        return_resource(SlotOfSocketId(id));
        return 1;
      }
      return 0;
    }
    LOG(FATAL) << "Invalid SocketId=" << id;
    return -1;
  }
  LOG(FATAL) << "Over dereferenced SocketId=" << id;
  return -1;
}

request

// 创建一个 bthread_id,附带数据为 data,错误处理为 on_error
int bthread_id_create(bthread_id_t* id, void* data,
                      int (*on_error)(bthread_id_t, void*, int)) {
  return bthread::id_create_impl(
      id, data, (on_error ? on_error : bthread::default_bthread_id_on_error),
      NULL);
}

static int id_create_impl(bthread_id_t* id, void* data,
                          int (*on_error)(bthread_id_t, void*, int),
                          int (*on_error2)(bthread_id_t, void*, int,
                                           const std::string&)) {
  IdResourceId slot;
  Id* const meta = get_resource(&slot);  // resource pool 获取对象
  if (meta) {
    meta->data = data;
    meta->on_error = on_error;
    meta->on_error2 = on_error2;
    CHECK(meta->pending_q.empty());
    uint32_t* butex = meta->butex;
    if (0 == *butex || *butex + ID_MAX_RANGE + 2 < *butex) {
      // Skip 0 so that bthread_id_t is never 0
      // avoid overflow to make comparisons simpler.
      // butex 的值规避 0
      *butex = 1;
    }
    *meta->join_butex = *butex;
    meta->first_ver = *butex;
    meta->locked_ver = *butex + 1;
    *id = make_id(*butex, slot);
    return 0;
  }
  return ENOMEM;
}

inline bthread_id_t make_id(uint32_t version, IdResourceId slot) {
  // 版本位于低 32 位,resource_id 使用高 32 位
  const bthread_id_t tmp = {(((uint64_t)slot.value) << 32) | (uint64_t)version};
  return tmp;
}

// bthread_id_t 实际对应的数据结构
struct BAIDU_CACHELINE_ALIGNMENT Id {
  // first_ver ~ locked_ver - 1: unlocked versions
  // locked_ver: locked
  // unlockable_ver: locked and about to be destroyed
  // contended_ver: locked and contended
  uint32_t first_ver;  // 起始合法版本,first_ver ~ locked_ver - 1 为合法范围
  uint32_t locked_ver;  // 锁定状态的版本
  internal::FastPthreadMutex mutex;  // 该结构元数据的锁
  void* data;
  int (*on_error)(bthread_id_t, void*, int);
  int (*on_error2)(bthread_id_t, void*, int, const std::string&);
  const char* lock_location;
  uint32_t* butex;
  uint32_t* join_butex;
  SmallQueue<PendingError, 2> pending_q;

  Id() {
    // Although value of the butex(as version part of bthread_id_t)
    // does not matter, we set it to 0 to make program more deterministic.
    butex = bthread::butex_create_checked<uint32_t>();
    join_butex = bthread::butex_create_checked<uint32_t>();
    *butex = 0;
    *join_butex = 0;
  }

  ~Id() {
    bthread::butex_destroy(butex);
    bthread::butex_destroy(join_butex);
  }

  inline bool has_version(uint32_t id_ver) const {
    // 范围内的版本意味着合法。每个 slot 的版本空间是独立的
    return id_ver >= first_ver && id_ver < locked_ver;
  }
  inline uint32_t contended_ver() const { return locked_ver + 1; }  // 锁定冲突时的状态
  inline uint32_t unlockable_ver() const { return locked_ver + 2; }  // 准备析构的状态
  inline uint32_t last_ver() const { return unlockable_ver(); }  // 销毁状态的版本

  // also the next "first_ver"
  inline uint32_t end_ver() const { return last_ver() + 1; }
};

// 等待 bthread_id 销毁
int bthread_id_join(bthread_id_t id) {
  const bthread::IdResourceId slot = bthread::get_slot(id);
  bthread::Id* const meta = address_resource(slot);
  if (!meta) {
    // The id is not created yet, this join is definitely wrong.
    return EINVAL;
  }
  const uint32_t id_ver = bthread::get_version(id);  // 获取 id 低 32 位的版本
  uint32_t* join_butex = meta->join_butex;
  while (1) {
    meta->mutex.lock();
    const bool has_ver = meta->has_version(id_ver);  // 检查当前 id 是否合法
    const uint32_t expected_ver = *join_butex;
    meta->mutex.unlock();
    if (!has_ver) {
      break;
    }
    // 在 join_butex 上等待唤醒
    if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 &&
        errno != EWOULDBLOCK && errno != EINTR) {
      return errno;
    }
  }
  return 0;
}

// 解锁并销毁 bthread_id
int bthread_id_unlock_and_destroy(bthread_id_t id) {
  bthread::Id* const meta = address_resource(bthread::get_slot(id));
  if (!meta) {
    return EINVAL;
  }
  uint32_t* butex = meta->butex;
  uint32_t* join_butex = meta->join_butex;
  const uint32_t id_ver = bthread::get_version(id);
  meta->mutex.lock();
  if (!meta->has_version(id_ver)) {
    meta->mutex.unlock();
    LOG(FATAL) << "Invalid bthread_id=" << id.value;
    return EINVAL;
  }
  if (*butex == meta->first_ver) {
    meta->mutex.unlock();
    LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
    return EPERM;
  }
  const uint32_t next_ver = meta->end_ver();
  *butex = next_ver;
  *join_butex = next_ver;
  meta->first_ver = next_ver;  // 赋值后后续的 join 要么版本不合法,要么 join_butex 不一致
  meta->locked_ver = next_ver;
  meta->pending_q.clear();
  meta->mutex.unlock();
  // Notice that butex_wake* returns # of woken-up, not successful or not.
  bthread::butex_wake_except(butex, 0);
  bthread::butex_wake_all(join_butex);  // 唤醒等待的 join 函数
  return_resource(bthread::get_slot(id));  // 释放资源,原先的 bthread_id 会因为版本原因不再合法
  return 0;
}

// 声明 bthread_id 发生错误
int bthread_id_error2_verbose(bthread_id_t id, int error_code,
                              const std::string& error_text,
                              const char* location) {
  bthread::Id* const meta = address_resource(bthread::get_slot(id));
  if (!meta) {
    return EINVAL;
  }
  const uint32_t id_ver = bthread::get_version(id);
  uint32_t* butex = meta->butex;
  meta->mutex.lock();
  if (!meta->has_version(id_ver)) {
    meta->mutex.unlock();
    return EINVAL;
  }
  if (*butex == meta->first_ver) {
    // unlock 的状态,则上锁
    *butex = meta->locked_ver;
    meta->lock_location = location;
    meta->mutex.unlock();
    // 调用错误处理函数
    if (meta->on_error) {
      return meta->on_error(id, meta->data, error_code);
    } else {
      return meta->on_error2(id, meta->data, error_code, error_text);
    }
  } else {
    // lock 的状态,将错误信息加入队列
    bthread::PendingError e;
    e.id = id;
    e.error_code = error_code;
    e.error_text = error_text;
    e.location = location;
    meta->pending_q.push(e);
    meta->mutex.unlock();
    return 0;
  }
}

// 锁定 bthread_id
int bthread_id_lock_and_reset_range_verbose(bthread_id_t id, void** pdata,
                                            int range, const char* location) {
  bthread::Id* const meta = address_resource(bthread::get_slot(id));
  if (!meta) {
    return EINVAL;
  }
  const uint32_t id_ver = bthread::get_version(id);
  uint32_t* butex = meta->butex;
  bool ever_contended = false;
  meta->mutex.lock();
  while (meta->has_version(id_ver)) {
    // 元数据上锁,且版本合法的情况下
    if (*butex == meta->first_ver) {
      // contended locker always wakes up the butex at unlock.
      // 如果处于 unlock 的状态
      meta->lock_location = location;
      if (range == 0) {
        // fast path
      } else if (range < 0 || range > bthread::ID_MAX_RANGE ||
                 range + meta->first_ver <= meta->locked_ver) {
        LOG_IF(FATAL, range < 0)
            << "range must be positive, actually " << range;
        LOG_IF(FATAL, range > bthread::ID_MAX_RANGE)
            << "max range is " << bthread::ID_MAX_RANGE << ", actually "
            << range;
      } else {
        // 如果附带了版本修改操作,则修改对应的 locked_ver
        meta->locked_ver = meta->first_ver + range;
      }
      // 如果之前冲突了,则修改为 contended_ver(),这样 unlock 的时候会唤醒其他 bthread_id_lock
      *butex = (ever_contended ? meta->contended_ver() : meta->locked_ver);
      meta->mutex.unlock();
      if (pdata) {
        *pdata = meta->data;
      }
      return 0;
    } else if (*butex != meta->unlockable_ver()) {
      // 如果不是 unlock,也不是准备析构的状态,则表示现在处于 locked 或者 contended,赋值为 contended 状态
      *butex = meta->contended_ver();
      uint32_t expected_ver = *butex;
      meta->mutex.unlock();
      ever_contended = true;
      if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
          errno != EWOULDBLOCK && errno != EINTR) {
        // 等待
        return errno;
      }
      meta->mutex.lock();
    } else {  // bthread_id_about_to_destroy was called. 准备销毁
      meta->mutex.unlock();
      return EPERM;
    }
  }
  meta->mutex.unlock();
  return EINVAL;
}

// 解锁 bthread_id
int bthread_id_unlock(bthread_id_t id) {
  bthread::Id* const meta = address_resource(bthread::get_slot(id));
  if (!meta) {
    return EINVAL;
  }
  uint32_t* butex = meta->butex;
  // Release fence makes sure all changes made before signal visible to
  // woken-up waiters.
  const uint32_t id_ver = bthread::get_version(id);
  meta->mutex.lock();
  if (!meta->has_version(id_ver)) {
    meta->mutex.unlock();
    LOG(FATAL) << "Invalid bthread_id=" << id.value;
    return EINVAL;
  }
  if (*butex == meta->first_ver) {
    meta->mutex.unlock();
    LOG(FATAL) << "bthread_id=" << id.value << " is not locked!";
    return EPERM;
  }
  bthread::PendingError front;
  if (meta->pending_q.pop(&front)) {
    // 如果已经出错了,直接调用错误处理函数
    meta->lock_location = front.location;
    meta->mutex.unlock();
    if (meta->on_error) {
      return meta->on_error(front.id, meta->data, front.error_code);
    } else {
      return meta->on_error2(front.id, meta->data, front.error_code,
                             front.error_text);
    }
  } else {
    // 否则唤醒等待的锁定操作
    const bool contended = (*butex == meta->contended_ver());
    *butex = meta->first_ver;
    meta->mutex.unlock();
    if (contended) {
      // We may wake up already-reused id, but that's OK.
      bthread::butex_wake(butex);
    }
    return 0;
  }
}

官方文档所述,使用 bthread_id 可以解决以下问题:

  1. 在发送 RPC 过程中 response 回来了,处理 response 的代码和发送代码产生竞争。
  2. 设置 timer 后很快触发了,超时处理代码和发送代码产生竞争。
  3. 重试产生的多个 response 同时回来产生的竞争。
  4. 通过 correlation_id 在 O(1) 时间内找到对应的 RPC 上下文,而无需建立从 correlation_id 到 RPC 上下文的全局哈希表。
  5. 取消 RPC。

具体实现上,每个 Controller 拥有一个 bthread_id 对象 _correlation_id,创建时将 data 绑定为 Controller 自身的指针,错误处理使用 HandleSocketFailed 函数:

// controller.cpp
CallId Controller::call_id() {
    butil::atomic<uint64_t>* target =
        (butil::atomic<uint64_t>*)&_correlation_id.value;
    uint64_t loaded = target->load(butil::memory_order_relaxed);
    if (loaded) {
        const CallId id = { loaded };
        return id;
    }
    // Optimistic locking.
    CallId cid = { 0 };
    // The range of this id will be reset in Channel::CallMethod
    CHECK_EQ(0, bthread_id_create2(&cid, this, HandleSocketFailed));
    if (!target->compare_exchange_strong(loaded, cid.value,
                                         butil::memory_order_relaxed)) {
        bthread_id_cancel(cid);
        cid.value = loaded;
    }
    return cid;
}

调用前会将 correlation_id 对象的范围改为 max_retry() + 2,每个版本对应的解释如代码中的注释所示:

// channel.cpp
const CallId correlation_id = cntl->call_id();
const int rc = bthread_id_lock_and_reset_range(
                    correlation_id, NULL, 2 + cntl->max_retry());

// Make versioned correlation_id.
// call_id         : unversioned, mainly for ECANCELED and ERPCTIMEDOUT
// call_id + 1     : first try.
// call_id + 2     : retry 1
// ...
// call_id + N + 1 : retry N
// All ids except call_id are versioned. Say if we've sent retry 1 and
// a failed response of first try comes back, it will be ignored.

当调用 Socket 将请求写入后,Controller 会在 correlation_id 上执行等待:

// channel.cpp
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
                         google::protobuf::RpcController* controller_base,
                         const google::protobuf::Message* request,
                         google::protobuf::Message* response,
                         google::protobuf::Closure* done) {
  ...
  cntl->IssueRPC(start_send_real_us);
  if (done == NULL) {
    // MUST wait for response when sending synchronous RPC. It will
    // be woken up by callback when RPC finishes (succeeds or still
    // fails after retry)
    Join(correlation_id);  // 等待 correlation_id 销毁
    if (cntl->_span) {
      cntl->SubmitSpan();
    }
    cntl->OnRPCEnd(butil::gettimeofday_us());
  }
}

当请求成功收到回复时,会按照协议调用对应的处理函数,例如 baidu_std 会调用 ProcessRpcResponse

// baidu_rpc_protocol.cpp
void ProcessRpcResponse(InputMessageBase* msg_base) {
  const int64_t start_parse_us = butil::cpuwide_time_us();
  DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
  RpcMeta meta;
  // 解析 RPC 元信息
  if (!ParsePbFromIOBuf(&meta, msg->meta)) {
    LOG(WARNING) << "Fail to parse from response meta";
    return;
  }

  // 读取 correlation_id
  const bthread_id_t cid = { static_cast<uint64_t>(meta.correlation_id()) };
  Controller* cntl = NULL;
  // 对 correlation_id 加锁
  const int rc = bthread_id_lock(cid, (void**)&cntl);
  ...
  const int saved_error = cntl->ErrorCode();
  accessor.OnResponse(cid, saved_error);
}

// controller_private_accessor.h
class ControllerPrivateAccessor {
public:
    void OnResponse(CallId id, int saved_error) {
        const Controller::CompletionInfo info = { id, true };
        _cntl->OnVersionedRPCReturned(info, false, saved_error);
    }
}

// controller.cpp
void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
                                        bool new_bthread, int saved_error) {
  ...
  bthread_id_about_to_destroy(info.id);  // 唤醒 join
  ...
}

当 RPC 过程中发生错误时,比如 SocketKeepWrite 写入失败:

// socket.cpp
void Socket::ReturnFailedWriteRequest(Socket::WriteRequest* p, int error_code,
                                      const std::string& error_text) {
  if (!p->reset_pipelined_count_and_user_message()) {
    CancelUnwrittenBytes(p->data.size());
  }
  p->data.clear();  // data is probably not written.
  const bthread_id_t id_wait = p->id_wait;
  butil::return_object(p);
  if (id_wait != INVALID_BTHREAD_ID) {
    // id_wait 也就是上面的 correlation_id,bthread_id_error2 会进行上锁,并调用对应的错误处理函数,也就是初始化赋值的 HandleSocketFailed
    bthread_id_error2(id_wait, error_code, error_text);
  }
}

// controller.cpp
int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code,
                                   const std::string& error_text) {
  // 从 data 中回复 controller 指针
  Controller* cntl = static_cast<Controller*>(data);
  if (!cntl->is_used_by_rpc()) {
    // Cannot destroy the call_id before RPC otherwise an async RPC
    // using the controller cannot be joined and related resources may be
    // destroyed before done->Run() running in another bthread.
    // The error set will be detected in Channel::CallMethod and fail
    // the RPC.
    cntl->SetFailed(error_code, "Cancel call_id=%" PRId64
                    " before CallMethod()", id.value);
    return bthread_id_unlock(id);
  }
  const int saved_error = cntl->ErrorCode();
  if (error_code == ERPCTIMEDOUT) {
    cntl->SetFailed(error_code, "Reached timeout=%" PRId64 "ms @%s",
                    cntl->timeout_ms(),
                    butil::endpoint2str(cntl->remote_side()).c_str());
  } else if (error_code == EBACKUPREQUEST) {
    cntl->SetFailed(error_code, "Reached backup timeout=%" PRId64 "ms @%s",
                    cntl->backup_request_ms(),
                    butil::endpoint2str(cntl->remote_side()).c_str());
  } else if (!error_text.empty()) {
    cntl->SetFailed(error_code, "%s", error_text.c_str());
  } else {
    cntl->SetFailed(error_code, "%s @%s", berror(error_code),
                    butil::endpoint2str(cntl->remote_side()).c_str());
  }
  CompletionInfo info = { id, false };
  cntl->OnVersionedRPCReturned(info, true, saved_error);  // 结束
  return 0;
}

correlation_id 也会用作超时的处理:

// channel.cpp
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
                         google::protobuf::RpcController* controller_base,
                         const google::protobuf::Message* request,
                         google::protobuf::Message* response,
                         google::protobuf::Closure* done) {
  ...
  // Setup timer for RPC timetout

  // _deadline_us is for truncating _connect_timeout_ms
  cntl->_deadline_us = cntl->timeout_ms() * 1000L + start_send_real_us;
  const int rc = bthread_timer_add(
    &cntl->_timeout_id,
    butil::microseconds_to_timespec(cntl->_deadline_us),
    HandleTimeout, (void*)correlation_id.value);  // 参数使用 correlation_id 的值
  ...
}

// 超时时调用 HandleTimeout
static void HandleTimeout(void* arg) {
    bthread_id_t correlation_id = { (uint64_t)arg };
    bthread_id_error(correlation_id, ERPCTIMEDOUT);  // 继而调用 HandleSocketFailed
}

// RPC 结束时会删除对应的定时器
void Controller::EndRPC(const CompletionInfo& info) {
  if (_timeout_id != 0) {
    bthread_timer_del(_timeout_id);
    _timeout_id = 0;
  }
  ...
}

bvar

状态信息收集

profiler

https://github.com/apache/incubator-brpc/blob/master/docs/cn/contention_profiler.md

参考链接

https://github.com/apache/incubator-brpc/blob/master/docs/cn/consistent_hashing.md

https://github.com/apache/incubator-brpc/blob/master/docs/cn/io.md

https://github.com/apache/incubator-brpc/blob/master/src/butil/iobuf.h

https://github.com/ronaldo8210/brpc_source_code_analysis/blob/master/docs/client_rpc_exception.md

https://zhuanlan.zhihu.com/p/113427004

https://zhuanlan.zhihu.com/p/169840204

https://sf-zhou.github.io/brpc/brpc_01_bthread.html

https://sf-zhou.github.io/brpc/brpc_02_memory.html

https://sf-zhou.github.io/brpc/brpc_03_socket.html

https://sf-zhou.github.io/brpc/brpc_04_schedule.html

https://sf-zhou.github.io/brpc/brpc_05_request.html