A brief analysis of dynomite
[toc]
Code: https://github.com/Netflix/dynomite
Overall architecture and functionality
At its core it is a proxy; redis instances are hung behind it.
What it implements is a flexible, highly available proxy that can sense and record cluster state; that is the direction it aims for.
Q: why not twemproxy? This project was also born around 2012-2014, almost at the same time as twemproxy, and US/European internet companies all needed memcache/redis proxies.
todo: find a chance to read through twemproxy and codis
Questions about the proxy
Routing/sharding design?
dnodes communicate with each other; each node is responsible for a range of tokens.
Sharding is hash-based; the hash algorithm is specified at startup, with md5/crc32/murmurhash freely selectable:
uint32_t dnode_peer_idx_for_key_on_rack(struct server_pool *pool,
struct rack *rack, uint8_t *key,
uint32_t keylen) {
struct dyn_token token;
pool->key_hash(key, keylen, &token);
return vnode_dispatch(&rack->continuums, rack->ncontinuum, &token);
}
token definition
struct dyn_token {
uint32_t signum;
uint32_t mag[4];
uint32_t len;
};
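vnode_dispatch is what actually places the token on the ring. Below is a minimal sketch of the idea only, not the real implementation: it assumes a simplified continuum entry and uses a plain uint32_t hash in place of struct dyn_token.
#include <stdint.h>

/* assumed, simplified continuum entry; the real struct differs */
struct continuum_sketch {
  uint32_t index; /* index of the peer/vnode that owns this arc */
  uint32_t value; /* highest token value covered by this entry */
};

/* binary-search the sorted continuum for the first entry whose value is
 * >= hash; wrap around to the first entry when we run off the end */
static uint32_t vnode_dispatch_sketch(const struct continuum_sketch *c,
                                      uint32_t ncontinuum, uint32_t hash) {
  uint32_t lo = 0, hi = ncontinuum;
  while (lo < hi) {
    uint32_t mid = lo + (hi - lo) / 2;
    if (c[mid].value < hash)
      lo = mid + 1;
    else
      hi = mid;
  }
  return c[lo == ncontinuum ? 0 : lo].index;
}

int main(void) {
  struct continuum_sketch ring[] = {{0, 1u << 30}, {1, 3u << 30}, {2, 4294967295u}};
  return (int)vnode_dispatch_sketch(ring, 3, 123456789u); /* lands on vnode 0 */
}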
Tokens are fixed; adding a node means redistributing them.
This is the same concept as in Cassandra; both took it from Amazon's Dynamo.
dnode_peer_add_node -> dnode_peer_pool_run -> vnode_update
What about adding a new machine? The only way is to stop a dnode, add a server entry to the config file, bring it back up, and let vnode_update run again.
Route push/pull strategy?
Route push/pull problems are what the usual node-deployment modes tend to hit: the data layer changes, the proxy does not know, and data lands in the wrong place. Here, though, it is a DHT ring; the backends only store data and have no idea which range they own.
Likewise there is no data migration: after a vnode update the affected data ought to be moved, but that is not implemented here.
Failure handling on access?
There is no retry on a failed access; more precisely, there is no failure-handling logic at all. The code assumes it can always obtain a peer and accesses it directly, without checking that the peer might be NULL (probably a bug).
A quick walkthrough of the code
The code has a very strong libevent/twemproxy flavor.
The main function is in dynomite.c and is fairly clear. One function is quite interesting:
/**
* Set unlimited core dump resource limits.
*/
static void dn_coredump_init(void) {
struct rlimit core_limits;
core_limits.rlim_cur = core_limits.rlim_max = RLIM_INFINITY;
setrlimit(RLIMIT_CORE, &core_limits);
}
So you can do that; first time I have seen it.
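For reference, a standalone sketch of the same trick that also reads the limit back with getrlimit; it needs <sys/resource.h>, and raising the hard limit as a non-root user can fail, which setrlimit will report:
#include <stdio.h>
#include <sys/resource.h>

int main(void) {
  struct rlimit rl;
  rl.rlim_cur = rl.rlim_max = RLIM_INFINITY; /* remove the core-size cap */
  if (setrlimit(RLIMIT_CORE, &rl) != 0)
    perror("setrlimit"); /* e.g. EPERM when the hard limit cannot be raised */
  if (getrlimit(RLIMIT_CORE, &rl) == 0)
    printf("core limit: cur=%llu max=%llu\n",
           (unsigned long long)rl.rlim_cur, (unsigned long long)rl.rlim_max);
  return 0;
}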
Main initialization flow
static rstatus_t dn_run(struct instance *nci) {
rstatus_t status;
THROW_STATUS(core_start(nci));
struct context *ctx = nci->ctx;
struct server_pool *sp = &ctx->pool;
if (!sp->enable_gossip) core_set_local_state(ctx, NORMAL);
/* run rabbit run */
for (;;) {
status = core_loop(ctx);
if (status != DN_OK) {
break;
}
}
core_stop(ctx);
return DN_OK;
}
core_start initializes:
- the connection queues (TAILQ, the BSD sys/queue.h macros that libevent also uses)
- task management, task_mgr (backed by an rbtree)
- the server pool, through which the proxy manages all backend servers
- TAILQ_INIT(&sp->c_conn_q); /* client connection q */ TAILQ_INIT(&sp->ready_conn_q);
- sp->key_hash = get_hash_func(cp->hash); this is the hash algorithm, tied to the routing/sharding design above
- the server pool also holds a datastore pointer representing a group of services, with a conn_pool and name inside
- core_stats_create: system metrics, p99 and the like
- libevent setup
- proxy_init/dnode_proxy_init -> _conn_get(free_connq) -> conn_listen on the listening conn
- each conn carries consistency settings, conn->read_consistency/conn->write_consistency:
typedef enum consistency {
DC_ONE = 0,
DC_QUORUM,
DC_SAFE_QUORUM,
DC_EACH_SAFE_QUORUM,
} consistency_t;
- conn_listen is a fairly classic setup (error handling omitted):
rstatus_t conn_listen(struct context *ctx, struct conn *p) {
rstatus_t status;
struct server_pool *pool = &ctx->pool;
ASSERT((p->type == CONN_PROXY) || (p->type == CONN_DNODE_PEER_PROXY));
p->sd = socket(p->family, SOCK_STREAM, 0);
status = _conn_reuse(p);
status = bind(p->sd, p->addr, p->addrlen);
status = listen(p->sd, pool->backlog);
status = dn_set_nonblocking(p->sd);
status = dn_set_keepalive(p->sd, 1);
status = conn_event_add_conn(p);
status = conn_event_del_out(p);
return DN_OK;
}
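conn_event_add_conn registers the listening socket with the event loop, and the conn_event_del_out right after it drops write-event interest, presumably because a listening socket only ever needs read (accept) readiness.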
- core_dnode_peer_init initializes CONN_DNODE_PEER_SERVER
Accepting a connection
The code paths can be located by connection_type:
typedef enum connection_type {
CONN_UNSPECIFIED,
CONN_PROXY, // a dynomite proxy (listening) connection
CONN_CLIENT, // this is connected to a client connection
CONN_SERVER, // this is connected to underlying datastore ...redis/memcache
CONN_DNODE_PEER_PROXY, // this is a dnode (listening) connection...default
// 8101
CONN_DNODE_PEER_CLIENT, // this is connected to a dnode peer client
CONN_DNODE_PEER_SERVER, // this is connected to a dnode peer server
} connection_type_t;
dnode connections are the internal ones used for inter-node traffic such as replication, on port 8101.
proxy refers to the client-facing connections that serve commands, on port 8102.
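For orientation, these ports come from the YAML config. A config along the following lines would match the localdc/localrack/127.0.0.1:22122 values that show up in the sample log further down; the field names are recalled from the project's sample conf and should be treated as assumptions to verify against conf/dynomite.yml:
dyn_o_mite:
  datacenter: localdc
  rack: localrack
  dyn_listen: 127.0.0.1:8101
  listen: 127.0.0.1:8102
  servers:
  - 127.0.0.1:22122:1
  tokens: '4294967295'
  data_store: 0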
The ops table is essentially how polymorphism is done here: each conn type plugs in its own implementations of the ops methods (a toy sketch of the pattern follows the macros below).
struct conn_ops {
func_recv_t recv; /* recv (read) handler */
func_recv_next_t recv_next; /* recv next message handler */
func_recv_done_t recv_done; /* read done handler */
func_send_t send; /* send (write) handler */
func_send_next_t send_next; /* write next message handler */
func_send_done_t send_done; /* write done handler */
func_close_t close; /* close handler */
func_active_t active; /* active? handler */
func_ref_t ref; /* connection reference handler */
func_unref_t unref; /* connection unreference handler */
func_msgq_t enqueue_inq; /* connection inq msg enqueue handler */
func_msgq_t dequeue_inq; /* connection inq msg dequeue handler */
func_msgq_t enqueue_outq; /* connection outq msg enqueue handler */
func_msgq_t dequeue_outq; /* connection outq msg dequeue handler */
func_response_handler rsp_handler;
};
// these function pointers are wrapped by a series of macros
#define conn_recv(ctx, conn) (conn)->ops->recv(ctx, conn)
#define conn_recv_next(ctx, conn, alloc) \
(conn)->ops->recv_next(ctx, conn, alloc)
#define conn_recv_done(ctx, conn, msg, nmsg) \
(conn)->ops->recv_done(ctx, conn, msg, nmsg)
#define conn_send(ctx, conn) (conn)->ops->send(ctx, conn)
#define conn_send_next(ctx, conn) (conn)->ops->send_next(ctx, conn)
#define conn_send_done(ctx, conn, msg) (conn)->ops->send_done(ctx, conn, msg)
#define conn_close(ctx, conn) (conn)->ops->close(ctx, conn)
#define conn_active(conn) (conn)->ops->active(conn)
#define conn_ref(conn, owner) (conn)->ops->ref(conn, owner)
#define conn_unref(conn) (conn)->ops->unref(conn)
#define conn_enqueue_inq(ctx, conn, msg) \
(conn)->ops->enqueue_inq(ctx, conn, msg)
#define conn_dequeue_inq(ctx, conn, msg) \
(conn)->ops->dequeue_inq(ctx, conn, msg)
#define conn_enqueue_outq(ctx, conn, msg) \
(conn)->ops->enqueue_outq(ctx, conn, msg)
#define conn_dequeue_outq(ctx, conn, msg) \
(conn)->ops->dequeue_outq(ctx, conn, msg)
TAILQ_HEAD(conn_tqh, conn);
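To make the dispatch concrete, here is a self-contained toy sketch of the same function-pointer-table pattern; every name in it is illustrative, none of them are dynomite's:
#include <stdio.h>

struct toy_conn;
struct toy_ops {
  void (*recv)(struct toy_conn *c); /* one slot per behavior, like conn_ops */
};
struct toy_conn {
  const char *name;
  const struct toy_ops *ops; /* swapping this table swaps the behavior */
};

static void client_recv(struct toy_conn *c) { printf("%s: client recv\n", c->name); }
static void peer_recv(struct toy_conn *c) { printf("%s: peer recv\n", c->name); }

static const struct toy_ops client_ops_sketch = {client_recv};
static const struct toy_ops peer_ops_sketch = {peer_recv};

#define toy_conn_recv(c) (c)->ops->recv(c) /* mirrors the conn_recv macro */

int main(void) {
  struct toy_conn a = {"c1", &client_ops_sketch};
  struct toy_conn b = {"p1", &peer_ops_sketch};
  toy_conn_recv(&a); /* dispatches to client_recv */
  toy_conn_recv(&b); /* dispatches to peer_recv */
  return 0;
}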
dnode CONN_DNODE_PEER_PROXY
The accept logic is bound through ops:
struct conn_ops dnode_server_ops = {
dnode_recv, NULL, NULL, NULL, NULL,
NULL, dnode_close, NULL, dnode_ref, dnode_unref,
NULL, NULL, NULL, NULL, conn_cant_handle_response};
void init_dnode_proxy_conn(struct conn *conn) {
conn->dyn_mode = 1;
conn->type = CONN_DNODE_PEER_PROXY;
conn->ops = &dnode_server_ops;
}
dnode_recv in turn calls dnode_accept.
The body is quite conventional: after accept, the new client gets a CONN_DNODE_PEER_CLIENT connection.
static rstatus_t dnode_accept(struct context *ctx, struct conn *p) {
rstatus_t status;
struct conn *c;
struct sockaddr_in client_address;
socklen_t client_len = sizeof(client_address);
int sd = 0;
ASSERT(p->type == CONN_DNODE_PEER_PROXY);
ASSERT(p->sd > 0);
ASSERT(p->recv_active && p->recv_ready);
for (;;) {
sd = accept(p->sd, (struct sockaddr *)&client_address, &client_len);
if (sd < 0) {
if (errno == EINTR) {
log_warn("accept on %s not ready - eintr", print_obj(p));
continue;
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
p->recv_ready = 0;
return DN_OK;
}
/*
* FIXME: On EMFILE or ENFILE mask out IN event on the proxy; mask
* it back in when some existing connection gets closed
*/
log_error("accept on %s failed: %s", print_obj(p), strerror(errno));
return DN_ERROR;
}
break;
}
char clntName[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &client_address.sin_addr.s_addr, clntName, sizeof(clntName));
c = conn_get(p->owner, init_dnode_client_conn); //注意这行
c->sd = sd;
string_copy_c(&c->pname, (unsigned char *)dn_unresolve_peer_desc(c->sd));
stats_pool_incr(ctx, dnode_client_connections);
status = dn_set_nonblocking(c->sd);
if (p->family == AF_INET || p->family == AF_INET6) {
status = dn_set_tcpnodelay(c->sd);
}
status = conn_event_add_conn(c);
return DN_OK;
}
dnode CONN_DNODE_PEER_CLIENT
The structure is similar:
struct conn_ops dnode_client_ops = {msg_recv,
dnode_req_recv_next,
dnode_req_recv_done,
msg_send,
dnode_rsp_send_next,
dnode_rsp_send_done,
dnode_client_close,
dnode_client_active,
dnode_client_ref,
dnode_client_unref,
NULL,
NULL,
dnode_req_client_enqueue_omsgq,
dnode_req_client_dequeue_omsgq,
dnode_client_handle_response};
void init_dnode_client_conn(struct conn *conn) {
conn->dyn_mode = 1;
conn->type = CONN_DNODE_PEER_CLIENT;
conn->ops = &dnode_client_ops;
}
client - proxy - server interaction
The code contains an ASCII flow diagram:
/*
* Client+ Proxy Server+
* (dynomite)
* .
* msg_recv {read event} . msg_recv {read event}
* + . +
* | . |
* \ . /
* req_recv_next . rsp_recv_next
* + . +
* | . | Rsp
* req_recv_done . rsp_recv_done <===
* + . +
* | . |
* Req \ . /
* ===> req_filter* . *rsp_filter
* + . +
* | . |
* \ . /
* req_forward-// (a) . (c) \\-rsp_forward
* .
* .
* msg_send {write event} . msg_send {write event}
* + . +
* | . |
* Rsp' \ . / Req'
* <=== rsp_send_next . req_send_next ===>
* + . +
* | . |
* \ . /
* rsp_send_done-// (d) . (b) //-req_send_done
*/
proxy CONN_PROXY
The logic is the same:
struct conn_ops proxy_ops = {proxy_recv, NULL, NULL, NULL, NULL, NULL,
proxy_close, NULL, proxy_ref, proxy_unref,
// enqueue, dequeues
NULL, NULL, NULL, NULL, conn_cant_handle_response};
void init_proxy_conn(struct conn *conn) {
conn->dyn_mode = 0;
conn->type = CONN_PROXY;
conn->ops = &proxy_ops;
}
proxy_recv calls proxy_accept, with much the same logic as above; after accept, the new client gets a CONN_CLIENT connection.
proxy CONN_CLIENT
Bound the same way:
struct conn_ops client_ops = {msg_recv,
req_recv_next,
req_recv_done,
msg_send,
rsp_send_next,
rsp_send_done,
client_close,
client_active,
client_ref,
client_unref,
NULL,
NULL,
req_client_enqueue_omsgq,
req_client_dequeue_omsgq,
client_handle_response};
void init_client_conn(struct conn *conn) {
conn->dyn_mode = 0;
conn->type = CONN_CLIENT;
conn->ops = &client_ops;
}
I have to say the code is written quite tidily, which makes it easy to maintain.
msg_recv / recv_next fetch a message and store it in mbufs.
Message reception
msg_recv -> msg_recv_chain handles/manages the mbufs -> msg_parse -> conn_recv_next; the loop repeatedly calls msg->parser(msg, ctx), and once parsing succeeds msg_parsed fixes up the message list, mbufs, and so on.
- when is msg->parser bound? In msg_get, it is set as soon as the mbuf handling starts
- redis_parse_req is the concrete parser
After recv_done comes req_forward; once the conditions are met, req_client_enqueue_omsgq is called.
Then the request is actually forwarded to a node: dnode_peer_pool_server -> dnode_peer_for_key_on_rack decides which dnode the key belongs to.
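To get a feel for what redis_parse_req has to handle, here is a minimal, illustrative RESP request parser over a single complete buffer. It is not dynomite's parser: the real one is a resumable state machine that can stop mid-token and continue in the next mbuf, which this sketch deliberately ignores.
#include <stdio.h>
#include <stdlib.h>

/* parse "<digits>\r\n" and return the position right after it, or NULL */
static const char *parse_num(const char *p, long *out) {
  char *end;
  *out = strtol(p, &end, 10);
  return (end[0] == '\r' && end[1] == '\n') ? end + 2 : NULL;
}

/* walk one complete "*N\r\n$len\r\narg\r\n..." request and print its args */
static int parse_resp_req(const char *buf) {
  long nargs, len;
  if (*buf++ != '*') return -1;
  if (!(buf = parse_num(buf, &nargs))) return -1;
  for (long i = 0; i < nargs; i++) {
    if (*buf++ != '$') return -1;
    if (!(buf = parse_num(buf, &len))) return -1;
    printf("arg[%ld] = %.*s\n", i, (int)len, buf);
    buf += len + 2; /* skip the argument bytes and the trailing \r\n */
  }
  return 0;
}

int main(void) {
  /* the same request seen in the log below: hset h a b */
  return parse_resp_req("*4\r\n$4\r\nhset\r\n$1\r\nh\r\n$1\r\na\r\n$1\r\nb\r\n");
}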
A simple log trace:
[2021-07-13 14:56:22.566] conn_recv_data:352 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> recv 35 of 16320
[2021-07-13 14:56:22.566] redis_parse_req:1587 parsed command 'hset'
[2021-07-13 14:56:22.566] redis_parse_req:2364 parsed req 3 res 0 type 71 state 0 rpos 35 of 35
00000000 2a 34 0d 0a 24 34 0d 0a 68 73 65 74 0d 0a 24 31 |*4..$4..hset..$1|
00000010 0d 0a 68 0d 0a 24 31 0d 0a 61 0d 0a 24 31 0d 0a |..h..$1..a..$1..|
00000020 62 0d 0a |b..|
[2021-07-13 14:56:22.566] server_get_dc:570 server_get_dc dc 'localdc'
[2021-07-13 14:56:22.566] server_get_rack:602 server_get_rack 'localrack'
[2021-07-13 14:56:22.566] req_forward:927 >>>>>>>>>>>>>>>>>>>>>>> <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> RECEIVED <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35> key 'h' tagged key 'h'
[2021-07-13 14:56:22.566] req_client_enqueue_omsgq:1314 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> enqueue outq <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35>
[2021-07-13 14:56:22.566] server_get_dc:570 server_get_dc dc 'localdc'
[2021-07-13 14:56:22.566] server_get_rack:602 server_get_rack 'localrack'
[2021-07-13 14:56:22.566] req_forward_all_racks_for_dc:733 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35> same DC racks:1 expect replies 1
[2021-07-13 14:56:22.566] dnode_peer_pool_server:826 Entering dnode_peer_pool_conn ................................
[2021-07-13 14:56:22.566] dnode_peer_for_key_on_rack:812 dyn: key 'h' maps to server '127.0.0.1:8101'
[2021-07-13 14:56:22.566] req_forward_local_datastore:544 c_conn 0x1e0b270 got server conn 0x1dfee30
[2021-07-13 14:56:22.566] req_forward_local_datastore:552 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> FORWARD <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35> to storage conn <CONN_SERVER 0x1dfee30 6 to '127.0.0.1:22122:1'>
[2021-07-13 14:56:22.566] event_add_out:137 adding conn <CONN_SERVER 0x1dfee30 6 to '127.0.0.1:22122:1'> to active
[2021-07-13 14:56:22.567] msg_tmo_insert:276 insert req 3 into tmo rbt with expiry of 5000 msec
[2021-07-13 14:56:22.567] req_server_enqueue_imsgq:917 conn 0x1dfee30 enqueue inq 3:0
[2021-07-13 14:56:22.567] req_forward_local_datastore:600 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> local forward <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35> to <CONN_SERVER 0x1dfee30 6 to '127.0.0.1:22122:1'> len 35 key 'h'
[2021-07-13 14:56:22.567] conn_sendv_data:401 sendv on sd 6 35 of 35 in 1 buffers
[2021-07-13 14:56:22.567] req_server_dequeue_imsgq:931 conn 0x1dfee30 dequeue inq 3:0
[2021-07-13 14:56:22.567] req_server_enqueue_omsgq:947 conn 0x1dfee30 enqueue outq 3:0
[2021-07-13 14:56:22.567] event_del_out:165 removing conn <CONN_SERVER 0x1dfee30 6 to '127.0.0.1:22122:1'> from active
[2021-07-13 14:56:22.567] conn_recv_data:352 <CONN_SERVER 0x1dfee30 6 to '127.0.0.1:22122:1'> recv 4 of 16320
[2021-07-13 14:56:22.567] redis_parse_rsp:2983 parsed rsp 4 res 0 type 159 state 0 rpos 4 of 4
00000000 3a 31 0d 0a |:1..|
[2021-07-13 14:56:22.567] server_ok:310 reset server '127.0.0.1:22122:1' failure count from 0 to 0
[2021-07-13 14:56:22.567] msg_tmo_delete:294 delete req 3 from tmo rbt
[2021-07-13 14:56:22.567] req_server_dequeue_omsgq:963 conn 0x1dfee30 dequeue outq 3:0
[2021-07-13 14:56:22.567] server_rsp_forward:788 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35> RECEIVED <RSP 0x1e0ba40 4:0 RSP_REDIS_INTEGER len:4>
[2021-07-13 14:56:22.567] msg_local_one_rsp_handler:1161 31503792 SELECTED 31504976
[2021-07-13 14:56:22.567] event_add_out:137 adding conn <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> to active
[2021-07-13 14:56:22.567] conn_sendv_data:401 sendv on sd 9 4 of 4 in 1 buffers
[2021-07-13 14:56:22.567] rsp_send_done:165 conn 0x1e0b270 rsp 0x1e0ba40 done
[2021-07-13 14:56:22.567] req_client_dequeue_omsgq:1328 <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> dequeue outq <REQ 0x1e0b5a0 3:0::0 REQ_REDIS_HSET, len:35>
[2021-07-13 14:56:22.567] rsp_send_done:181 conn 0x1e0b270 removing message 3:0
[2021-07-13 14:56:22.567] event_del_out:165 removing conn <CONN_CLIENT 0x1e0b270 9 from '127.0.0.1:37871'> from active
[2021-07-13 14:56:31.179] gossip_loop:794 Gossip is running ...
How the data structures are managed is not covered here; maybe another time.
Consistency
The request is sent to all data nodes:
if (request_send_to_all_local_racks(req)) {
// send request to all local racks
req_forward_all_racks_for_dc(ctx, c_conn, req, orig_mbuf, key, keylen, dc);
}
Then, as responses come back, rspmgr_check_is_done -> rspmgr_is_quorum_achieved checks whether the quorum condition is satisfied:
// Wait for only quorum number of responses before responding
bool rspmgr_check_is_done(struct response_mgr *rspmgr) {
uint8_t pending_responses = (uint8_t)(
rspmgr->max_responses - rspmgr->good_responses - rspmgr->error_responses);
// do the required calculation and tell if we are done here
if (rspmgr->good_responses >= rspmgr->quorum_responses) {
// We received enough good responses but do their checksum match?
if (rspmgr_is_quorum_achieved(rspmgr)) {
log_info("req %lu quorum achieved", rspmgr->msg->id);
rspmgr->done = true;
} else if (pending_responses) {
// There's a mismatch in checksum. Wait for any pending responses
rspmgr->done = false;
} else {
// no pending responses, and the checksum do not match.
rspmgr->done = true;
}
} else if ((pending_responses + rspmgr->good_responses) <
rspmgr->quorum_responses) {
// Even if we receive all the pending responses, still we do not form
// a quorum, So decision is done, no quorum possible
rspmgr->done = true;
}
return rspmgr->done;
}
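Working through this with, say, max_responses = 3 and quorum_responses = 2: two good responses whose checksums match finish the request right away; two good responses with mismatching checksums while one reply is still pending keep it waiting; and one good response plus two errors gives pending_responses + good_responses = 1 < 2, so it is marked done immediately because a quorum can no longer be reached.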