[toc]

Code: https://github.com/Netflix/dynomite

Overall architecture and functionality

Essentially it is a proxy; redis instances hang off the back of it.

A flexible, state-aware, state-recording, highly available proxy: that is the direction it implements.

Q: why not twemproxy? This project was also born around 2012-2014, almost at the same time as twemproxy, and US/European internet companies all had memcache/redis proxy needs.

todo: find a chance to read through twemproxy and codis as well

Questions about the proxy

Routing/sharding design?

Nodes talk to each other over dnode connections; each node is responsible for one token range.

Sharding is hash-based; the hash algorithm is chosen up front in the config, with md5/crc32/murmur all selectable.

uint32_t dnode_peer_idx_for_key_on_rack(struct server_pool *pool,
                                        struct rack *rack, uint8_t *key,
                                        uint32_t keylen) {
  struct dyn_token token;
  pool->key_hash(key, keylen, &token);
  return vnode_dispatch(&rack->continuums, rack->ncontinuum, &token);
}
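
To make vnode_dispatch concrete, here is an illustrative sketch of a lookup over a sorted continuum. It is not the real code: the names continuum_sketch/dispatch_sketch are invented, and the real version compares multi-word dyn_tokens rather than plain uint32_t values.

#include <stdint.h>

/* Illustrative sketch only (not the real vnode_dispatch): each continuum
 * entry maps the start of a token range to the index of the owning peer. */
struct continuum_sketch {
  uint32_t token;      /* start of the range this entry owns */
  uint32_t peer_index; /* index of the owning peer */
};

/* Find the owner of key_token: the entry with the greatest token <= key_token,
 * wrapping around to the last entry when the key precedes every token. */
static uint32_t dispatch_sketch(const struct continuum_sketch *c, uint32_t n,
                                uint32_t key_token) {
  uint32_t left = 0, right = n;
  while (left < right) {
    uint32_t mid = left + (right - left) / 2;
    if (c[mid].token <= key_token)
      left = mid + 1;
    else
      right = mid;
  }
  return (left == 0) ? c[n - 1].peer_index : c[left - 1].peer_index;
}

In this scheme each entry owns the keyspace from its token up to (but not including) the next entry's token, which matches the "each node owns a token range" description above.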

Token definition

struct dyn_token {
  uint32_t signum;
  uint32_t mag[4];
  uint32_t len;
};
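
The struct is basically a small big-integer (sign, magnitude words, word count), so ordering tokens on the ring reduces to a word-wise comparison. A hedged sketch of such a comparison, assuming mag[] is stored most-significant word first and ignoring signum; the real cmp_dyn_token may differ:

/* Hedged sketch of comparing two dyn_tokens; assumes mag[] is
 * most-significant word first, len is the number of words in use,
 * and signum is ignored for brevity. */
static int cmp_token_sketch(const struct dyn_token *a,
                            const struct dyn_token *b) {
  if (a->len != b->len) return a->len < b->len ? -1 : 1; /* more words == larger */
  for (uint32_t i = 0; i < a->len; i++) {
    if (a->mag[i] != b->mag[i]) return a->mag[i] < b->mag[i] ? -1 : 1;
  }
  return 0;
}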

Tokens are fixed; when a node is added they have to be redistributed.

This is the same concept as Cassandra's tokens; both copied it from Amazon's Dynamo.

dnode_peer_add_node -> dnode_peer_pool_run -> vnode_update

What if a new machine is added? The only option is to stop one dnode, add a server to its config file, bring it back up, and re-trigger vnode_update.
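
What vnode_update ultimately has to produce is just a freshly sorted continuum for the rack. A hedged sketch of that rebuild, reusing the invented continuum_sketch type from the dispatch sketch above and assuming one token per peer (the real code walks struct rack / struct node and compares dyn_tokens):

#include <stdlib.h>

/* Hedged sketch of a vnode_update-style rebuild: gather each peer's token
 * into the rack's continuum and sort it by token. Names are invented. */
static int continuum_cmp(const void *a, const void *b) {
  const struct continuum_sketch *x = a, *y = b;
  return (x->token > y->token) - (x->token < y->token);
}

static void rebuild_continuum_sketch(struct continuum_sketch *continuum,
                                     uint32_t *ncontinuum,
                                     const uint32_t *peer_tokens,
                                     uint32_t npeers) {
  for (uint32_t i = 0; i < npeers; i++) {
    continuum[i].token = peer_tokens[i]; /* one token per peer, simplified */
    continuum[i].peer_index = i;
  }
  *ncontinuum = npeers;
  qsort(continuum, npeers, sizeof(continuum[0]), continuum_cmp);
}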

Route push/pull strategy?

Push/pull of routing info is the usual problem in any of the three node-deployment models: the data layer changes, the proxy does not know, and data ends up in the wrong place. Here, though, it is a DHT ring; the backends only do storage and are not aware of which data range they are responsible for.

Likewise, data migration is not implemented: after a vnode update the data ought to be moved, but nothing does that here.

Handling of failed requests?

There is no retry on a failed access; more precisely, there is no failure-handling logic at all. The code assumes it can always obtain a peer and accesses it directly, without checking that the peer might be NULL (arguably a bug).
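
For reference, the kind of guard that seems to be missing would look roughly like this (the lookup call, peer type, and error path below are hypothetical stand-ins, not the real signatures):

/* Hypothetical guard that the walkthrough argues is missing: refuse the
 * request instead of dereferencing a possibly-NULL peer.
 * lookup_peer_sketch and struct peer_sketch are stand-ins. */
struct peer_sketch *peer = lookup_peer_sketch(pool, rack, key, keylen);
if (peer == NULL) {
  log_error("no peer owns key '%.*s', rejecting request", (int)keylen, key);
  return DN_ERROR;
}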

Quick code walkthrough

The code has a very strong libevent/twemproxy flavor.

The main function is in dynomite.c and is written fairly clearly; one function is quite interesting:

/**
 * Set unlimited core dump resource limits.
 */
static void dn_coredump_init(void) {
  struct rlimit core_limits;
  core_limits.rlim_cur = core_limits.rlim_max = RLIM_INFINITY;
  setrlimit(RLIMIT_CORE, &core_limits);
}

So you can do it this way; first time I have seen it (it is the in-process equivalent of `ulimit -c unlimited`).

Main initialization flow

static rstatus_t dn_run(struct instance *nci) {
  rstatus_t status;
  THROW_STATUS(core_start(nci));
  struct context *ctx = nci->ctx;
  struct server_pool *sp = &ctx->pool;
  if (!sp->enable_gossip) core_set_local_state(ctx, NORMAL);
  /* run rabbit run */
  for (;;) {
    status = core_loop(ctx);
    if (status != DN_OK) {
      break;
    }
  }
  core_stop(ctx);
  return DN_OK;
}

core_start initializes:

  • connection queues (TAILQ, the BSD queue.h macros that libevent also uses everywhere)
  • task management task_mgr (managed with an rbtree)
  • the server pool, through which the proxy manages all backend servers
    • TAILQ_INIT(&sp->c_conn_q); /* client connection queue */ TAILQ_INIT(&sp->ready_conn_q);
    • sp->key_hash = get_hash_func(cp->hash); this is the hash algorithm, which ties back to the routing/sharding design
    • the server pool holds a datastore pointer representing a group of backends, containing conn_pool/name internally
  • core_stats_create: system metrics, p99 and the like
  • libevent initialization
  • proxy_init/dnode_proxy_init -> _conn_get(free_connq) -> conn_listen
    • the conn carries consistency info: conn->read_consistency/conn->write_consistency
typedef enum consistency {
  DC_ONE = 0,
  DC_QUORUM,
  DC_SAFE_QUORUM,
  DC_EACH_SAFE_QUORUM,
} consistency_t;
  • conn_listen is the classic listen setup (error handling omitted):
rstatus_t conn_listen(struct context *ctx, struct conn *p) {
  rstatus_t status;
  struct server_pool *pool = &ctx->pool;
  ASSERT((p->type == CONN_PROXY) || (p->type == CONN_DNODE_PEER_PROXY));
  p->sd = socket(p->family, SOCK_STREAM, 0);
  status = _conn_reuse(p);
  status = bind(p->sd, p->addr, p->addrlen);
  status = listen(p->sd, pool->backlog);
  status = dn_set_nonblocking(p->sd);
  status = dn_set_keepalive(p->sd, 1);
  status = conn_event_add_conn(p);
  status = conn_event_del_out(p);
  return status;
}
  • core_dnode_peer_init initializes CONN_DNODE_PEER_SERVER

Accepting a connection

You can navigate the code via connection_type:

typedef enum connection_type {
  CONN_UNSPECIFIED,
  CONN_PROXY,   // a dynomite proxy (listening) connection
  CONN_CLIENT,  // this is connected to a client connection
  CONN_SERVER,  // this is connected to underlying datastore ...redis/memcache
  CONN_DNODE_PEER_PROXY,   // this is a dnode (listening) connection...default
                           // 8101
  CONN_DNODE_PEER_CLIENT,  // this is connected to a dnode peer client
  CONN_DNODE_PEER_SERVER,  // this is connected to a dnode peer server
} connection_type_t;

dnode connections carry internal traffic such as peer exchange/replication, port 8101.

proxy refers to the client-facing connection that serves commands, port 8102.

Effectively ops provides the polymorphism: different conn types plug in different implementations of the ops methods.

struct conn_ops {
  func_recv_t recv;           /* recv (read) handler */
  func_recv_next_t recv_next; /* recv next message handler */
  func_recv_done_t recv_done; /* read done handler */
  func_send_t send;           /* send (write) handler */
  func_send_next_t send_next; /* write next message handler */
  func_send_done_t send_done; /* write done handler */
  func_close_t close;         /* close handler */
  func_active_t active;       /* active? handler */

  func_ref_t ref;     /* connection reference handler */
  func_unref_t unref; /* connection unreference handler */

  func_msgq_t enqueue_inq;  /* connection inq msg enqueue handler */
  func_msgq_t dequeue_inq;  /* connection inq msg dequeue handler */
  func_msgq_t enqueue_outq; /* connection outq msg enqueue handler */
  func_msgq_t dequeue_outq; /* connection outq msg dequeue handler */
  func_response_handler rsp_handler;
};

// a set of macros wraps these function pointers
#define conn_recv(ctx, conn) (conn)->ops->recv(ctx, conn)
#define conn_recv_next(ctx, conn, alloc) \
  (conn)->ops->recv_next(ctx, conn, alloc)
#define conn_recv_done(ctx, conn, msg, nmsg) \
  (conn)->ops->recv_done(ctx, conn, msg, nmsg)

#define conn_send(ctx, conn) (conn)->ops->send(ctx, conn)
#define conn_send_next(ctx, conn) (conn)->ops->send_next(ctx, conn)
#define conn_send_done(ctx, conn, msg) (conn)->ops->send_done(ctx, conn, msg)

#define conn_close(ctx, conn) (conn)->ops->close(ctx, conn)
#define conn_active(conn) (conn)->ops->active(conn)
#define conn_ref(conn, owner) (conn)->ops->ref(conn, owner)
#define conn_unref(conn) (conn)->ops->unref(conn)

#define conn_enqueue_inq(ctx, conn, msg) \
  (conn)->ops->enqueue_inq(ctx, conn, msg)
#define conn_dequeue_inq(ctx, conn, msg) \
  (conn)->ops->dequeue_inq(ctx, conn, msg)
#define conn_enqueue_outq(ctx, conn, msg) \
  (conn)->ops->enqueue_outq(ctx, conn, msg)
#define conn_dequeue_outq(ctx, conn, msg) \
  (conn)->ops->dequeue_outq(ctx, conn, msg)
TAILQ_HEAD(conn_tqh, conn);
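
The payoff of this vtable style is in the event loop: the core code only ever calls the macros, and the conn's type decides which implementation runs. A condensed, hedged sketch of that dispatch (the real handler in dyn_core.c is more involved, and EVENT_READ/EVENT_WRITE here just stand for whatever readable/writable flags the event layer reports):

/* Condensed sketch of how an event gets dispatched through conn->ops;
 * handle_event_sketch is an invented name, not the real handler. */
static void handle_event_sketch(struct context *ctx, struct conn *conn,
                                uint32_t events) {
  if (events & EVENT_READ) {
    if (conn_recv(ctx, conn) != DN_OK) { /* expands to conn->ops->recv(ctx, conn) */
      conn_close(ctx, conn);             /* expands to conn->ops->close(ctx, conn) */
      return;
    }
  }
  if (events & EVENT_WRITE) {
    if (conn_send(ctx, conn) != DN_OK) { /* expands to conn->ops->send(ctx, conn) */
      conn_close(ctx, conn);
    }
  }
}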

dnode CONN_DNODE_PEER_PROXY

The accept logic is hooked up through ops:

struct conn_ops dnode_server_ops = {
    dnode_recv, NULL,        NULL, NULL,      NULL,
    NULL,       dnode_close, NULL, dnode_ref, dnode_unref,
    NULL,       NULL,        NULL, NULL,      conn_cant_handle_response};
void init_dnode_proxy_conn(struct conn *conn) {
  conn->dyn_mode = 1;
  conn->type = CONN_DNODE_PEER_PROXY;
  conn->ops = &dnode_server_ops;
}

dnode_recv in turn calls dnode_accept.

The content is quite conventional: after accept, the new client connection is typed CONN_DNODE_PEER_CLIENT.

static rstatus_t dnode_accept(struct context *ctx, struct conn *p) {
  rstatus_t status;
  struct conn *c;
  struct sockaddr_in client_address;
  socklen_t client_len = sizeof(client_address);
  int sd = 0;
  ASSERT(p->type == CONN_DNODE_PEER_PROXY);
  ASSERT(p->sd > 0);
  ASSERT(p->recv_active && p->recv_ready);
  for (;;) {
    sd = accept(p->sd, (struct sockaddr *)&client_address, &client_len);
    if (sd < 0) {
      if (errno == EINTR) {
        log_warn("accept on %s not ready - eintr", print_obj(p));
        continue;
      }

      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        p->recv_ready = 0;
        return DN_OK;
      }

      /*
       * FIXME: On EMFILE or ENFILE mask out IN event on the proxy; mask
       * it back in when some existing connection gets closed
       */

      log_error("accept on %s failed: %s", print_obj(p), strerror(errno));
      return DN_ERROR;
    }

    break;
  }
  char clntName[INET_ADDRSTRLEN];
  inet_ntop(AF_INET, &client_address.sin_addr.s_addr, clntName,
            sizeof(clntName)); /* peer-address logging elided here */
  c = conn_get(p->owner, init_dnode_client_conn); // note this line
  c->sd = sd;
  string_copy_c(&c->pname, (unsigned char *)dn_unresolve_peer_desc(c->sd));
  stats_pool_incr(ctx, dnode_client_connections);
  status = dn_set_nonblocking(c->sd);
  if (p->family == AF_INET || p->family == AF_INET6) {
    status = dn_set_tcpnodelay(c->sd);
  }
  status = conn_event_add_conn(c);
  return status;
}

dnode CONN_DNODE_PEER_CLIENT

The structure is similar:

struct conn_ops dnode_client_ops = {msg_recv,
                                    dnode_req_recv_next,
                                    dnode_req_recv_done,
                                    msg_send,
                                    dnode_rsp_send_next,
                                    dnode_rsp_send_done,
                                    dnode_client_close,
                                    dnode_client_active,
                                    dnode_client_ref,
                                    dnode_client_unref,
                                    NULL,
                                    NULL,
                                    dnode_req_client_enqueue_omsgq,
                                    dnode_req_client_dequeue_omsgq,
                                    dnode_client_handle_response};

void init_dnode_client_conn(struct conn *conn) {
  conn->dyn_mode = 1;
  conn->type = CONN_DNODE_PEER_CLIENT;
  conn->ops = &dnode_client_ops;
}

client - proxy - server interaction

There is a flow diagram drawn right in the code:

/*

 *             Client+             Proxy           Server+
 *                              (dynomite)
 *                                   .
 *       msg_recv {read event}       .       msg_recv {read event}
 *         +                         .                         +
 *         |                         .                         |
 *         \                         .                         /
 *         req_recv_next             .             rsp_recv_next
 *           +                       .                       +
 *           |                       .                       |       Rsp
 *           req_recv_done           .           rsp_recv_done      <===
 *             +                     .                     +
 *             |                     .                     |
 *    Req      \                     .                     /
 *    ===>     req_filter*           .           *rsp_filter
 *               +                   .                   +
 *               |                   .                   |
 *               \                   .                   /
 *               req_forward-//  (a) . (c)  \\-rsp_forward
 *                                   .
 *                                   .
 *       msg_send {write event}      .      msg_send {write event}
 *         +                         .                         +
 *         |                         .                         |
 *    Rsp' \                         .                         /     Req'
 *   <===  rsp_send_next             .             req_send_next     ===>
 *           +                       .                       +
 *           |                       .                       |
 *           \                       .                       /
 *           rsp_send_done-//    (d) . (b)    //-req_send_done

*/

proxy CONN_PROXY

The logic is the same:

struct conn_ops proxy_ops = {proxy_recv, NULL, NULL, NULL, NULL, NULL,
                             proxy_close, NULL, proxy_ref, proxy_unref,
                             // enqueue, dequeues
                             NULL, NULL, NULL, NULL, conn_cant_handle_response};

void init_proxy_conn(struct conn *conn) {
  conn->dyn_mode = 0;
  conn->type = CONN_PROXY;
  conn->ops = &proxy_ops;
}

proxy_recv does proxy_accept, with much the same logic as above; after accept, the new client connection is typed CONN_CLIENT.

proxy CONN_CLIENT

Same binding approach:

struct conn_ops client_ops = {msg_recv,
                              req_recv_next,
                              req_recv_done,
                              msg_send,
                              rsp_send_next,
                              rsp_send_done,
                              client_close,
                              client_active,
                              client_ref,
                              client_unref,
                              NULL,
                              NULL,
                              req_client_enqueue_omsgq,
                              req_client_dequeue_omsgq,
                              client_handle_response};

void init_client_conn(struct conn *conn) {
  conn->dyn_mode = 0;
  conn->type = CONN_CLIENT;
  conn->ops = &client_ops;
}

I have to say, the code is written quite tidily, which makes it easy to maintain.

msg_recv/recv_next simply obtain the message and store it in mbufs.

Message receiving

msg_recv -> msg_recv_chain handles/manages the mbufs -> msg_parse -> conn_recv_next; this loops, repeatedly calling msg->parser(msg, ctx), and once parsing succeeds, msg_parsed tidies up the message list, the mbufs, and so on (a condensed sketch follows the list below).

  • When is msg->parser bound? In msg_get; it is set as soon as the mbuf handling starts.
  • redis_parse_req is the concrete parser
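
A condensed sketch of that receive-and-parse loop. It is a fragment, not a standalone program; msg_parsed_sketch is a hypothetical stand-in for checking the parser's result, and the real msg_parse also reads from the socket into the mbuf chain and repairs/splits mbufs at message boundaries:

/* Condensed sketch of the receive/parse loop over a connection. */
for (;;) {
  struct msg *msg = conn_recv_next(ctx, conn, true); /* current or fresh msg */
  if (msg == NULL) {
    break;                     /* nothing more to parse on this connection */
  }

  msg->parser(msg, ctx);       /* e.g. redis_parse_req, consumes mbuf bytes */

  if (msg_parsed_sketch(msg)) {            /* hypothetical result check */
    conn_recv_done(ctx, conn, msg, NULL);  /* hand the complete request onward */
  } else {
    break;                     /* partial command: wait for more data */
  }
}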

After recv_done comes req_forward; once the conditions are met, the request is queued via req_client_enqueue_omsgq.

Then comes the actual forwarding to some node: dnode_peer_pool_server -> dnode_peer_for_key_on_rack decides which dnode the key belongs to.

A simple log trace:

[2021-07-13 14:56:22.566] conn_recv_data:352 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> recv 35 of 16320
[2021-07-13 14:56:22.566] redis_parse_req:1587 parsed command 'hset'
[2021-07-13 14:56:22.566] redis_parse_req:2364 parsed req 3 res 0 type 71 state 0 rpos 35 of 35
00000000  2a 34 0d 0a 24 34 0d 0a  68 73 65 74 0d 0a 24 31   |*4..$4..hset..$1|
00000010  0d 0a 68 0d 0a 24 31 0d  0a 61 0d 0a 24 31 0d 0a   |..h..$1..a..$1..|
00000020  62 0d 0a                                           |b..|
[2021-07-13 14:56:22.566] server_get_dc:570 server_get_dc dc 'localdc'
[2021-07-13 14:56:22.566] server_get_rack:602 server_get_rack   'localrack'
[2021-07-13 14:56:22.566] req_forward:927 >>>>>>>>>>>>>>>>>>>>>>> <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> RECEIVED <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35> key 'h' tagged key 'h'
[2021-07-13 14:56:22.566] req_client_enqueue_omsgq:1314 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> enqueue outq <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35>
[2021-07-13 14:56:22.566] server_get_dc:570 server_get_dc dc 'localdc'
[2021-07-13 14:56:22.566] server_get_rack:602 server_get_rack   'localrack'
[2021-07-13 14:56:22.566] req_forward_all_racks_for_dc:733 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35> same DC racks:1 expect replies 1
[2021-07-13 14:56:22.566] dnode_peer_pool_server:826 Entering dnode_peer_pool_conn ................................
[2021-07-13 14:56:22.566] dnode_peer_for_key_on_rack:812 dyn: key 'h' maps to server '127.0.0.1:8101'
[2021-07-13 14:56:22.566] req_forward_local_datastore:544 c_conn 0x1e0b270 got server conn 0x1dfee30
[2021-07-13 14:56:22.566] req_forward_local_datastore:552 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> FORWARD <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35> to storage conn <CONN_SERVER 0x1dfee30 6 to '127.0.0.1:22122:1'>
[2021-07-13 14:56:22.566] event_add_out:137 adding conn <CONN_SERVER 0x1dfee30 6 to '127.0.0.1:22122:1'> to active
[2021-07-13 14:56:22.567] msg_tmo_insert:276 insert req 3 into tmo rbt with expiry of 5000 msec
[2021-07-13 14:56:22.567] req_server_enqueue_imsgq:917 conn 0x1dfee30 enqueue inq 3:0
[2021-07-13 14:56:22.567] req_forward_local_datastore:600 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> local forward <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35> to <CONN_SERVER 0x1dfee30 6 to '127.0.0.1:22122:1'> len 35 key 'h'
[2021-07-13 14:56:22.567] conn_sendv_data:401 sendv on sd 6 35 of 35 in 1 buffers
[2021-07-13 14:56:22.567] req_server_dequeue_imsgq:931 conn 0x1dfee30 dequeue inq 3:0
[2021-07-13 14:56:22.567] req_server_enqueue_omsgq:947 conn 0x1dfee30 enqueue outq 3:0
[2021-07-13 14:56:22.567] event_del_out:165 removing conn <CONN_SERVER 0x1dfee30 6 to '127.0.0.1:22122:1'> from active
[2021-07-13 14:56:22.567] conn_recv_data:352 <CONN_SERVER 0x1dfee30 6 to '127.0.0.1:22122:1'> recv 4 of 16320
[2021-07-13 14:56:22.567] redis_parse_rsp:2983 parsed rsp 4 res 0 type 159 state 0 rpos 4 of 4
00000000  3a 31 0d 0a                                        |:1..|
[2021-07-13 14:56:22.567] server_ok:310 reset server '127.0.0.1:22122:1' failure count from 0 to 0
[2021-07-13 14:56:22.567] msg_tmo_delete:294 delete req 3 from tmo rbt
[2021-07-13 14:56:22.567] req_server_dequeue_omsgq:963 conn 0x1dfee30 dequeue outq 3:0
[2021-07-13 14:56:22.567] server_rsp_forward:788 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35> RECEIVED <RSP 0x1e0ba40 4:0 RSP_REDIS_INTEGER len:4>
[2021-07-13 14:56:22.567] msg_local_one_rsp_handler:1161 31503792 SELECTED 31504976
[2021-07-13 14:56:22.567] event_add_out:137 adding conn <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> to active
[2021-07-13 14:56:22.567] conn_sendv_data:401 sendv on sd 9 4 of 4 in 1 buffers
[2021-07-13 14:56:22.567] rsp_send_done:165 conn 0x1e0b270 rsp 0x1e0ba40 done
[2021-07-13 14:56:22.567] req_client_dequeue_omsgq:1328 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> dequeue outq <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35>
[2021-07-13 14:56:22.567] rsp_send_done:181 conn 0x1e0b270 removing message 3:0
[2021-07-13 14:56:22.567] event_del_out:165 removing conn <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> from active
[2021-07-13 14:56:31.179] gossip_loop:794 Gossip is running ...

I have not gone through how the data structures are managed here; maybe some other time.

Consistency

The request gets sent to all data nodes (all racks in the local DC):

  if (request_send_to_all_local_racks(req)) {
    // send request to all local racks
    req_forward_all_racks_for_dc(ctx, c_conn, req, orig_mbuf, key, keylen, dc);
  } 

Then, as responses arrive, it checks whether the condition is met: rspmgr_check_is_done -> rspmgr_is_quorum_achieved.

// Wait for only quorum number of responses before responding
bool rspmgr_check_is_done(struct response_mgr *rspmgr) {
  uint8_t pending_responses = (uint8_t)(
      rspmgr->max_responses - rspmgr->good_responses - rspmgr->error_responses);
  // do the required calculation and tell if we are done here
  if (rspmgr->good_responses >= rspmgr->quorum_responses) {
    // We received enough good responses but do their checksum match?
    if (rspmgr_is_quorum_achieved(rspmgr)) {
      log_info("req %lu quorum achieved", rspmgr->msg->id);
      rspmgr->done = true;
    } else if (pending_responses) {
      // There's a mismatch in checksum. Wait for any pending responses
      rspmgr->done = false;
    } else {
      // no pending responses, and the checksum do not match.
      rspmgr->done = true;
    }
  } else if ((pending_responses + rspmgr->good_responses) <
             rspmgr->quorum_responses) {
    // Even if we receive all the pending responses, still we do not form
    // a quorum, So decision is done, no quorum possible
    rspmgr->done = true;
  }
  return rspmgr->done;
}
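
To make the arithmetic concrete, here is a standalone restatement of the same decision, assuming quorum_responses is derived as max_responses / 2 + 1 (the usual majority rule; the actual initialization is not shown in this walkthrough) and ignoring the checksum-mismatch branch:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Standalone restatement of rspmgr_check_is_done's decision for a
 * 3-rack example; quorum = max/2 + 1 is an assumption here. */
static bool is_done(uint8_t max, uint8_t good, uint8_t errors) {
  uint8_t quorum = (uint8_t)(max / 2 + 1);
  uint8_t pending = (uint8_t)(max - good - errors);
  if (good >= quorum) return true;           /* enough matching good responses */
  return (uint8_t)(pending + good) < quorum; /* quorum no longer reachable */
}

int main(void) {
  printf("%d\n", is_done(3, 2, 0)); /* 1: two good responses out of three */
  printf("%d\n", is_done(3, 1, 0)); /* 0: keep waiting, quorum still reachable */
  printf("%d\n", is_done(3, 1, 2)); /* 1: at most one good response, give up */
  return 0;
}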