The overall summary is at https://wanghenshui.github.io/my-slides/pelikan/ . This post is a quick pass over the code.

Basic framework


core_run(worker_processor)
    core_worker_evloop(worker_processor)
    core_server_evloop()
    core_admin_evloop()

worker_processor can be composed differently; that is how pelikan supports multiple server flavors.
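worker_processor is just a bundle of three callbacks. Judging from the call site quoted later (processor->error(&s->rbuf, &s->wbuf, &s->data) in worker_ret_stream), the interface looks roughly like the following; treat the exact typedef as an approximation rather than a verbatim copy of pelikan's header:

typedef int (*data_fn)(struct buf **rbuf, struct buf **wbuf, void **data);

struct data_processor {
    data_fn read;   /* called when request data arrives on a connection */
    data_fn write;  /* called around write events */
    data_fn error;  /* called to clean up before the conn goes back to the server thread */
};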

The worker thread is where the real work happens: it binds itself to a core and then sits waiting for events.

    for(;;) {
        if (_worker_evwait() != CC_OK) {
            log_crit("worker core event loop exited due to failure");
            break;
        }
    }

_worker_evwait wraps event_wait, which is essentially hand-rolled epoll_wait, followed by time_update, an atomic counter update.
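Roughly, paraphrased from the source (exact field and metric names may differ):

static rstatus_i
_worker_evwait(void)
{
    int n = event_wait(ctx->evb, ctx->timeout);    /* thin wrapper over epoll_wait */
    if (n < 0) {
        return n;
    }

    INCR_N(worker_metrics, worker_event_total, n); /* atomic counter bump */
    time_update();                                 /* refresh the cached timestamp */

    return CC_OK;
}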

The worker's context is bound up front, during setup:

core_worker_setup(worker_options_st *options, worker_metrics_st *metrics)
    ctx->evb = event_base_create(nevent, _worker_event); /* register the callback */

Then the epoll event handling:

_worker_event(void *arg, uint32_t events) {
    struct buf_sock *s = arg;
    if (s == NULL) { /* event on pipe */
        if (events & EVENT_READ) { /* new connection from server */
            _worker_read_notification();
        }
        if (events & EVENT_WRITE) { /* retry return notification */
            _worker_write_notification();
        }
        if (events & EVENT_ERR) {
            log_error("error event received on pipe");
        }
    } else {
        /* event on one of the connections */
        if (events & EVENT_READ) {
            _worker_event_read(s);
        }
        if (events & EVENT_WRITE) {
            /* got here only when a previous write was incompleted/retried */
            log_verb("processing worker write event on buf_sock %p", s);
            INCR(worker_metrics, worker_event_write);
            if (_worker_event_write(s) == CC_OK) {
                /* write backlog cleared up, re-add read event (only) */
                event_del(ctx->evb, hdl->wid(s->ch));
                event_add_read(ctx->evb, hdl->rid(s->ch), s);
            }
        }
        if (events & EVENT_ERR) {
            s->ch->state = CHANNEL_TERM;

        }

        /* TODO(yao): come up with a robust policy about channel connection
         * and pending data. Since an error can either be server (usually
         * memory) issues or client issues (bad syntax etc), or requested (quit)
         * it is hard to determine whether the channel should be immediately
         * closed or not. A simplistic approach might be to always close asap,
         * and clients should not initiate closing unless they have received all
         * their responses. This is not as nice as the TCP half-close behavior,
         * but simpler to implement and probably fine initially.
         */
        if (s->ch->state == CHANNEL_TERM || s->ch->state == CHANNEL_ERROR) {
            worker_ret_stream(s);
        }
    }
}

This is mostly standard epoll code. The extra scenario worth noting is a burst of failing connections: how is that handled? That is the job of worker_ret_stream.

worker_ret_stream(struct buf_sock *s) {
    /* first clean up states that only worker thread understands,
     * and stop receiving event updates. then it's safe to return to server
     */
    processor->error(&s->rbuf, &s->wbuf, &s->data);
    event_del(ctx->evb, hdl->rid(s->ch));

    /* push buf_sock to queue */
    INCR(worker_metrics, worker_ret_stream);
    if (ring_array_push(&s, conn_term) != CC_OK) {
        /* here we have no choice but to clean up the stream to avoid leak */
        log_error("term connection queue is full");
        hdl->term(s->ch);
        buf_sock_reset(s);
        buf_sock_return(&s);
        return;
    }
    /* conn_term */
    _worker_write_notification();
}

Only a fixed number of buf_socks are cached for clients; once the termination queue is full, additional erroring clients are simply kicked and cleaned up on the spot. _worker_write_notification writes to an eventfd (or pipe) to tell the server thread to drain the queue.
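The notification itself is the usual eventfd/pipe pattern. A minimal sketch of the write side: efd_worker_to_server and pipe_term come from the server-side snippet further down, pipe_send is assumed to be the counterpart of the pipe_recv shown there, and the real code retries a failed write via the "retry return notification" write event rather than just logging:

static void
_worker_write_notification_sketch(void)
{
#ifdef USE_EVENT_FD
    uint64_t one = 1;
    /* bump the eventfd counter; the server thread reads the accumulated total */
    if (write(efd_worker_to_server, &one, sizeof(one)) < 0) {
        log_warn("failed to notify server thread");
    }
#else
    char c = 0;
    /* one byte per returned connection, matching the read side */
    if (pipe_send(pipe_term, &c, 1) != 1) {
        log_warn("failed to notify server thread");
    }
#endif
}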

Doing the cleanup in bulk on an asynchronous thread has less impact; doing it on the worker thread would inevitably add some latency. That is the trade-off the authors made.

On receiving the asynchronous message, the server thread cleans up the connections. Moving the cleanup to an external thread keeps the latency impact on the worker thread low:

_server_event(void *arg, uint32_t events) {
    struct buf_sock *s = arg;
    if (s == NULL) { /* event on pipe */
        if (events & EVENT_READ) { /* terminating connection from worker */
            _server_read_notification();
        }
        if (events & EVENT_WRITE) { /* retrying worker notification */
            _server_write_notification();
        }
        if (events & EVENT_ERR) {

        }
    } else { /* event on listening socket */
        if (events & EVENT_READ) {

            _server_event_read(s);
        }
        if (events & EVENT_ERR) { /* effectively refusing new conn */
            /* TODO: shall we retry bind and listen ? */
            _server_close(s);
        }
    }
}

_server_read_notification receives exactly what was written into the pipe and performs the asynchronous cleanup:

#ifdef USE_EVENT_FD
    int rc = read(efd_worker_to_server, &i, sizeof(uint64_t));
    if (rc < 0) {
        log_warn("not adding new connections due to eventfd error");
        return;
    }
#else
    i = pipe_recv(pipe_term, buf, RING_ARRAY_DEFAULT_CAP);
    if (i < 0) { /* errors, do not read from ring array */
        log_warn("not reclaiming connections due to pipe error");
        return;
    }
#endif

    /* each byte in the pipe corresponds to a connection in the array */
    for (; i > 0; --i) {
        status = ring_array_pop(&s, conn_term);
        if (status != CC_OK) {
            log_warn("event number does not match conn queue: missing %d conns",
                    i);
            return;
        }
        log_verb("Recycling buf_sock %p from worker thread", s);
        hdl->term(s->ch);
        buf_sock_reset(s);
        buf_sock_return(&s);
    }

_server_write_notification goes the other way, notifying the worker; the notification channel is bidirectional.
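The new-connection direction mirrors the termination path: the server thread accepts, parks the buf_sock in a ring array, and pokes the worker over the eventfd/pipe; the worker's _worker_read_notification then pops it and registers a read event. A rough sketch of that handoff, where conn_new is an illustrative name rather than pelikan's actual identifier:

/* server thread: after accept() has been wrapped into a buf_sock s */
if (ring_array_push(&s, conn_new) != CC_OK) {
    log_error("new connection queue is full");
    hdl->term(s->ch);               /* no room: close and recycle immediately */
    buf_sock_return(&s);
} else {
    _server_write_notification();   /* poke the worker over eventfd/pipe */
}

/* worker thread, inside _worker_read_notification */
while (ring_array_pop(&s, conn_new) == CC_OK) {
    event_add_read(ctx->evb, hdl->rid(s->ch), s);  /* start watching the client fd */
}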

_worker_event_read and _worker_event_write internally call the read/write/error functions registered earlier through worker_processor.

The actual request processing lives in each backend; for cdb, for example:

struct data_processor worker_processor = {
    cdb_process_read,
    cdb_process_write,
    cdb_process_error,
};

So it is enough to read these handlers; they live in the process files under each backend's data directory. Honestly there is not much to see there, although segcache's design is worth a look.

hotkey_sample

The function does what the name says: it samples requests to spot hot keys. Not much else to it, but it counts as a highlight.
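The idea is plain request sampling: look at only one request out of every N, bump a per-key counter, and flag keys whose sampled count crosses a threshold. A self-contained toy version of that idea follows; it is not pelikan's implementation (which keeps the actual keys and ages counts out of a window), just the shape of it:

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SAMPLE_RATE   100   /* inspect one request out of every 100 */
#define HOT_THRESHOLD 10    /* sampled hits before a key counts as hot */
#define TABLE_SIZE    1024

static uint32_t sample_counter;
static uint32_t key_count[TABLE_SIZE];  /* toy counter table indexed by key hash */

static bool
hotkey_sample_toy(const char *key, size_t klen)
{
    uint32_t h = 5381;

    if (++sample_counter < SAMPLE_RATE) {
        return false;               /* the common case: skip, nearly free */
    }
    sample_counter = 0;

    while (klen-- > 0) {            /* djb2 hash of the key bytes */
        h = h * 33 + (uint8_t)*key++;
    }

    return ++key_count[h % TABLE_SIZE] >= HOT_THRESHOLD;
}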

I did not feel like going into the finer details this time (easier to critique than to do); I will pick it up again when I have time.


Appendix

A snippet of the core-binding code:

/* bind worker to the core */
cpu_set_t cpuset;
pthread_t thread = pthread_self();

CPU_ZERO(&cpuset);
CPU_SET(binding_core, &cpuset);

int ret = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);

/* pthread_* calls return the error code rather than setting errno */
if (ret != 0) {
    log_warn("fail to bind worker thread to core %d: %s",
            binding_core, strerror(ret));
} else {
    log_info("binding worker thread to core %d", binding_core);
}