Pelikan code walkthrough
https://wanghenshui.github.io/my-slides/pelikan/ has the overall overview; here I just take a quick look at the code.
Basic framework
core_run(worker_processor)
core_worker_evloop(worker_processor)
core_server_evloop()
core_admin_evloop()
worker_processor can be composed differently; Pelikan builds several server flavors (cdb, segcache, etc.) on top of the same core.
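Roughly, core_run presumably spawns the worker and server loops on their own threads and runs the admin loop on the calling thread. A sketch of the wiring only (setup/teardown and error handling omitted; not the verbatim source):

void
core_run(void *arg_worker)
{
    pthread_t worker, server;

    /* data path: per-connection request processing */
    pthread_create(&worker, NULL, core_worker_evloop, arg_worker);
    /* accept loop: listens and hands new connections to the worker */
    pthread_create(&server, NULL, core_server_evloop, NULL);
    /* admin/stats loop runs on the calling thread and blocks here */
    core_admin_evloop();
}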
The worker thread is the one doing the real work: it is pinned to a core and then just waits for events:
for (;;) {
    if (_worker_evwait() != CC_OK) {
        log_crit("worker core event loop exited due to failure");
        break;
    }
}
_worker_evwait wraps event_wait, which is essentially a bare epoll_wait; after each wait it calls time_update and bumps the loop metrics, which are atomic counters.
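From memory it is roughly of this shape (a sketch, not a verbatim copy; the exact metric names may differ):

static rstatus_i
_worker_evwait(void)
{
    int n;

    /* event_wait boils down to a single epoll_wait on the worker's event base */
    n = event_wait(ctx->evb, ctx->timeout);
    if (n < 0) {
        return n;
    }

    /* per-loop bookkeeping: atomic metric counters and the cached time */
    INCR_N(worker_metrics, worker_event_total, n);
    time_update();

    return CC_OK;
}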
The worker's context is already set up at startup:
core_worker_setup(worker_options_st *options, worker_metrics_st *metrics)
ctx->evb = event_base_create(nevent, _worker_event); /* register the event callback */
Then the epoll event handler:
_worker_event(void *arg, uint32_t events) {
    struct buf_sock *s = arg;

    if (s == NULL) { /* event on pipe */
        if (events & EVENT_READ) { /* new connection from server */
            _worker_read_notification();
        }
        if (events & EVENT_WRITE) { /* retry return notification */
            _worker_write_notification();
        }
        if (events & EVENT_ERR) {
            log_error("error event received on pipe");
        }
    } else {
        /* event on one of the connections */
        if (events & EVENT_READ) {
            _worker_event_read(s);
        }
        if (events & EVENT_WRITE) {
            /* got here only when a previous write was incompleted/retried */
            log_verb("processing worker write event on buf_sock %p", s);
            INCR(worker_metrics, worker_event_write);
            if (_worker_event_write(s) == CC_OK) {
                /* write backlog cleared up, re-add read event (only) */
                event_del(ctx->evb, hdl->wid(s->ch));
                event_add_read(ctx->evb, hdl->rid(s->ch), s);
            }
        }
        if (events & EVENT_ERR) {
            s->ch->state = CHANNEL_TERM;
        }

        /* TODO(yao): come up with a robust policy about channel connection
         * and pending data. Since an error can either be server (usually
         * memory) issues or client issues (bad syntax etc), or requested (quit)
         * it is hard to determine whether the channel should be immediately
         * closed or not. A simplistic approach might be to always close asap,
         * and clients should not initiate closing unless they have received all
         * their responses. This is not as nice as the TCP half-close behavior,
         * but simpler to implement and probably fine initially.
         */
        if (s->ch->state == CHANNEL_TERM || s->ch->state == CHANNEL_ERROR) {
            worker_ret_stream(s);
        }
    }
}
Most of this is ordinary epoll boilerplate. The extra wrinkle is the failure path: what happens under a burst of broken connections? That is what worker_ret_stream is for.
worker_ret_stream(struct buf_sock *s) {
    /* first clean up states that only worker thread understands,
     * and stop receiving event updates. then it's safe to return to server
     */
    processor->error(&s->rbuf, &s->wbuf, &s->data);
    event_del(ctx->evb, hdl->rid(s->ch));

    /* push buf_sock to queue */
    INCR(worker_metrics, worker_ret_stream);
    if (ring_array_push(&s, conn_term) != CC_OK) {
        /* here we have no choice but to clean up the stream to avoid leak */
        log_error("term connection queue is full");
        hdl->term(s->ch);
        buf_sock_reset(s);
        buf_sock_return(&s);
        return;
    }

    /* conn_term */
    _worker_write_notification();
}
Only a fixed number of client buf_socks are pooled, so once the term queue is full any further errored clients are simply torn down on the spot; otherwise _worker_write_notification writes to an eventfd/pipe to tell the server thread that there are connections to reclaim. On receiving that async message, the server thread cleans up the connections in batches, off the worker thread; doing it inline in the worker would inevitably add latency, and keeping the worker's latency low is exactly the authors' trade-off here.
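The notification itself is tiny; judging from the read side shown further down, it is roughly one counter increment / one byte per queued connection (a sketch, retry handling trimmed):

static inline void
_worker_write_notification(void)
{
#ifdef USE_EVENT_FD
    uint64_t u = 1;

    if (write(efd_worker_to_server, &u, sizeof(u)) < 0) {
        log_warn("failed to notify server of terminated connection");
    }
#else
    if (pipe_send(pipe_term, "", 1) <= 0) {
        log_warn("failed to notify server of terminated connection");
    }
#endif
}

On the receiving side, the server thread's event handler: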
_server_event(void *arg, uint32_t events) {
    struct buf_sock *s = arg;

    if (s == NULL) { /* event on pipe */
        if (events & EVENT_READ) { /* terminating connection from worker */
            _server_read_notification();
        }
        if (events & EVENT_WRITE) { /* retrying worker notification */
            _server_write_notification();
        }
        if (events & EVENT_ERR) {
        }
    } else { /* event on listening socket */
        if (events & EVENT_READ) {
            _server_event_read(s);
        }
        if (events & EVENT_ERR) { /* effectively refusing new conn */
            /* TODO: shall we retry bind and listen ? */
            _server_close(s);
        }
    }
}
_server_read_notification consumes what the worker wrote to that eventfd/pipe and performs the asynchronous cleanup:
#ifdef USE_EVENT_FD
    int rc = read(efd_worker_to_server, &i, sizeof(uint64_t));
    if (rc < 0) {
        log_warn("not adding new connections due to eventfd error");
        return;
    }
#else
    i = pipe_recv(pipe_term, buf, RING_ARRAY_DEFAULT_CAP);
    if (i < 0) { /* errors, do not read from ring array */
        log_warn("not reclaiming connections due to pipe error");
        return;
    }
#endif

    /* each byte in the pipe corresponds to a connection in the array */
    for (; i > 0; --i) {
        status = ring_array_pop(&s, conn_term);
        if (status != CC_OK) {
            log_warn("event number does not match conn queue: missing %d conns",
                    i);
            return;
        }

        log_verb("Recycling buf_sock %p from worker thread", s);
        hdl->term(s->ch);
        buf_sock_reset(s);
        buf_sock_return(&s);
    }
_server_write_notification goes the other way and notifies the worker; the notification channel is bidirectional.
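The other half of that pair, _worker_read_notification (the "new connection from server" branch in _worker_event above), presumably drains the eventfd/pipe and then pops the handed-over buf_socks from a second ring array so the worker starts watching them. A sketch; the conn_new name is my guess, by analogy with conn_term above:

static inline void
_worker_read_notification(void)
{
    struct buf_sock *s;

    /* the real code first drains the eventfd/pipe, much like
     * _server_read_notification above; omitted here */

    /* pick up each connection the server thread queued for us and
     * register a read event so requests start flowing */
    while (ring_array_pop(&s, conn_new) == CC_OK) {
        log_verb("worker picked up buf_sock %p from server thread", s);
        event_add_read(ctx->evb, hdl->rid(s->ch), s);
    }
}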
_worker_event_read and _worker_event_write internally dispatch to the read/write/error functions registered earlier through worker_processor. The actual request handling lives in those callbacks; for cdb, for example:
struct data_processor worker_processor = {
    cdb_process_read,
    cdb_process_write,
    cdb_process_error,
};
So to understand a given server you can jump straight to these handlers; they live in each backend's data directory, in its process file. Honestly there is not much worth lingering on there, although the segcache design is worth a separate look.
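The contract implied by data_processor (and by the processor->error(&s->rbuf, &s->wbuf, &s->data) call in worker_ret_stream above) is narrow: each callback gets the connection's read buffer, write buffer, and an opaque per-connection state pointer. The general shape of a read handler looks roughly like this (an illustrative sketch, not the actual cdb code):

static int
example_process_read(struct buf **rbuf, struct buf **wbuf, void **data)
{
    /* 1. parse as many complete requests as *rbuf currently holds;
     *    an incomplete request simply waits for the next read event
     * 2. execute each request against the storage backend
     * 3. compose responses into *wbuf; the worker flushes it, falling
     *    back to the EVENT_WRITE path if the socket would block
     * *data carries parser/response state across invocations */
    return CC_OK; /* an error return presumably makes the worker drop the stream */
}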
hotkey_sample
This function does what its name says; there is not much to add, but it is arguably one of the highlights.
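I did not trace the real implementation line by line, so below is only a self-contained toy of the underlying idea: sample one request in every N and flag keys whose sampled count crosses a threshold. The real module is config-driven and, as far as I can tell, also ages counts out using a window of recent samples:

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SAMPLE_RATE 100   /* inspect one request out of every 100 */
#define THRESHOLD   10    /* sampled hits before a key counts as hot */
#define NBUCKET     1024  /* toy fixed-size counter table */

static uint32_t counters[NBUCKET];
static uint32_t nrequest;

/* toy FNV-1a hash over the key bytes */
static uint32_t
toy_hash(const char *key, size_t klen)
{
    uint32_t h = 2166136261u;

    for (size_t i = 0; i < klen; i++) {
        h = (h ^ (uint8_t)key[i]) * 16777619u;
    }

    return h % NBUCKET;
}

/* returns true when a sampled key looks hot */
static bool
toy_hotkey_sample(const char *key, size_t klen)
{
    if (++nrequest % SAMPLE_RATE != 0) {
        return false; /* not sampled this time around */
    }

    return ++counters[toy_hash(key, klen)] >= THRESHOLD;
}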
I did not feel like chasing the finer details this time; easier said than done. I will pick it up again when I have time.
Appendix
A snippet of the core-binding code:
/* bind worker to the core */
cpu_set_t cpuset;
pthread_t thread = pthread_self();

CPU_ZERO(&cpuset);
CPU_SET(binding_core, &cpuset);

if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) != 0) {
    log_warn("fail to bind worker thread to core %d: %s",
            binding_core, strerror(errno));
} else {
    log_info("binding worker thread to core %d", binding_core);
}
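Two small notes on this snippet: the affinity API is a GNU extension (it needs _GNU_SOURCE plus <pthread.h>/<sched.h>), and pthread_setaffinity_np reports failure through its return value rather than errno, so the strerror(errno) above may print an unrelated error. A slightly more precise variant:

int ret = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);

if (ret != 0) {
    log_warn("fail to bind worker thread to core %d: %s",
            binding_core, strerror(ret));
} else {
    log_info("binding worker thread to core %d", binding_core);
}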