raft资料收集

[toc]


三个角色

  • Leader:接受客户端请求,并向Follower同步请求日志,当日志同步到大多数节点上后告诉Follower提交日志。
  • Follower:接受并持久化Leader同步的日志,在Leader告之日志可以提交之后,提交日志。
  • Candidate:Leader选举过程中的临时角色。

Follower只响应其他服务器的请求。如果Follower超时没有收到Leader的消息,它会成为一个Candidate并且开始一次Leader选举。收到大多数服务器投票的Candidate会成为新的Leader。Leader在宕机之前会一直保持Leader的状态。

优化的角色会加上learner

learner设计 https://www.jianshu.com/p/1037fb5a63ac

解决的问题: 分区,leader负载高

Leader选举(Leader Election)

Raft 使用心跳(heartbeat)触发Leader选举。当服务器启动时,初始化为Follower。Leader向所有Followers周期性发送heartbeat。如果Follower在选举超时时间内没有收到Leader的heartbeat,就会等待一段随机的时间后发起一次Leader选举。

Follower将其当前term加一然后转换为Candidate。它首先给自己投票并且给集群中的其他服务器发送 RequestVote RPC

  • 赢得了多数的选票,成功选举为Leader;
  • 收到了Leader的消息,表示有其它服务器已经抢先当选了Leader;
  • 没有服务器赢得多数的选票,Leader选举失败,等待选举时间超时后发起下一次选举。

Q: 时间过长?

两个超时设定

  • 心跳超时:Leader周期性的向Follower发送心跳(0.5ms – 20ms),如果Follower在选举超时时间内没有收到心跳,则触发选举。
  • 选举超时:如果存在两个或者多个节点选主,都没有拿到大多数节点的应答,需要重新选举,Raft引入随机的选举超时时间(150ms – 300ms),避免选主活锁。

心跳超时要小于选举超时一个量级,Leader才能够发送稳定的心跳消息来阻止Follower开始进入选举状态。可以设置:心跳超时=peers max RTT(round-trip time),选举超时=10 * 心跳超时

日志同步(Log Replication)

multi-paxos允许日志乱序提交,也就是说允许日志中存在空洞。

raft协议增加了日志顺序性的保证,每个节点只能顺序的commit日志。顺序性日志简化了一致性协议复杂程度,当然在性能上也有了更多的限制,为此,工程上又有了很多对应的优化,如:batch、pipline、leader stickness等等。

约定

  • Raft要求所有的日志不允许出现空洞。
  • Raft的日志都是顺序提交的,不允许乱序提交。
  • Leader不会覆盖和删除自己的日志,只会Append。
  • Follower可能会截断自己的日志。存在脏数据的情况。
  • Committed的日志最终肯定会被Apply。
  • Snapshot中的数据一定是Applied,那么肯定是Committed的。
  • commitIndex、lastApplied不会被所有节点持久化。
  • Leader通过提交一条Noop日志来确定commitIndex。
  • 每个节点重启之后,先加载上一个Snapshot,再加入RAFT复制组

Leader选出后,就开始接收客户端的请求。Leader把请求作为日志条目(Log entries)加入到它的日志中,然后并行的向其他服务器发起 AppendEntries RPC 复制日志条目。当这条日志被复制到大多数服务器上,Leader将这条日志应用到它的状态机并向客户端返回

Followers可能没有成功的复制日志,Leader会无限的重试 AppendEntries RPC直到所有的Followers最终存储了所有的日志条目。

Q:掉线永远复制不成功

Raft日志同步保证如下两点:

  • 如果不同日志中的两个条目有着相同的索引和任期号,则它们所存储的命令是相同的。
  • 如果不同日志中的两个条目有着相同的索引和任期号,则它们之前的所有条目都是完全一样的。

第一条特性源于Leader在一个term内在给定的一个log index最多创建一条日志条目,同时该条目在日志中的位置也从来不会改变。

第二条特性源于 AppendEntries 的一个简单的一致性检查。当发送一个 AppendEntries RPC 时,Leader会把新日志条目紧接着之前的条目的log index和term都包含在里面。如果Follower没有在它的日志中找到log index和term都相同的日志,它就会拒绝新的日志条目。

Q:pipline的优化

multi-raft复用心跳/通信socket

Q:leader切换导致多个不同

Leader通过强制Followers复制它的日志来处理日志的不一致,Followers上的不一致的日志会被Leader的日志覆盖。

Leader为了使Followers的日志同自己的一致,Leader需要找到Followers同它的日志一致的地方,然后覆盖Followers在该位置之后的条目。

Leader会从后往前试,每次AppendEntries失败后尝试前一个日志条目,直到成功找到每个Follower的日志一致位点,然后向后逐条覆盖Followers在该位置之后的条目。

流程

  • Leader 先将该命令追加到自己的日志中;先持久化再传播

  • Leader 并行地向其它节点发送 AppendEntries RPC,等待响应;

  • 收到超过半数节点的响应,则认为新的日志记录是被提交的:
    • Leader 将命令传给自己的状态机,然后向客户端返回响应
    • 此外,一旦 Leader 知道一条记录被提交了,将在后续的 AppendEntries RPC 中通知已经提交记录的 Followers
    • Follower 将已提交的命令传给自己的状态机
  • 如果 Follower 宕机/超时:Leader 将反复尝试发送 RPC;

  • 性能优化:Leader 不必等待每个 Follower 做出响应,只需要超过半数的成功响应(确保日志记录已经存储在超过半数的节点上)——一个很慢的节点不会使系统变慢,因为 Leader 不必等他

AppendEntries 一致性检查

Raft 通过 AppendEntries RPC 来检测这两个属性。

  • 对于每个 AppendEntries RPC 包含新日志记录之前那条记录的索引(prevLogIndex)和任期(prevLogTerm);
  • Follower 检查自己的 index 和 term 是否与 prevLogIndexprevLogTerm 匹配,匹配则接收该记录;否则拒绝;
  • Leader 通过 nextIndex 来修复日志。当 AppendEntries RPC 一致性检查失败,递减 nextIndex 并重试。

Log Compaction

崩溃恢复(Crash Recovery)

成员变更(Membership Change)

Raft成员变更的工程实践

新成员先加入再同步数据还是先同步数据再加入

  优点 缺点
新成员先加入再同步数据 简单并且快速,能加入还不存在的成员 可能降低服务的可用性
新成员先同步数据再加入 不会影响服务可用性 复杂并且较慢,不能加入还不存在的成员

新成员先加入再同步数据,成员变更可以立即完成,并且因为只要大多数成员同意即可加入,甚至可以加入还不存在的成员,加入后再慢慢同步数据。但在数据同步完成之前新成员无法服务,但新成员的加入可能让多数派集合增大,而新成员暂时又无法服务,此时如果有成员发生Failover,很可能导致无法满足多数成员存活的条件,让服务不可用。因此新成员先加入再同步数据,简化了成员变更,但可能降低服务的可用性。

新成员先同步数据再加入,成员变更需要后台异步进行,先将新成员作为Learner角色加入,只能同步数据,不具有投票权,不会增加多数派集合,等数据同步完成后再让新成员正式加入,正式加入后可立即开始工作,不影响服务可用性。因此新成员先同步数据再加入,不影响服务的可用性,但成员变更流程复杂,并且因为要先给新成员同步数据,不能加入还不存在的成员。

https://zhuanlan.zhihu.com/p/359206808

优化点

SetPeer

Raft只能在多数节点存活的情况下才可以正常工作,在实际环境中可能会存在多数节点故障只存活一个节点的情况,这个时候需要提供服务并修复数据。因为已经不能达到多数,不能写入数据,也不能做正常的节点变更。Raft库需要提供一个SetPeer的接口,设置每个节点的复制组节点列表,便于故障恢复。

假设只有一个节点S1存活的情况下,SetPeer设置节点列表为{S1},这样形成一个只有S1的节点列表,让S1继续提供读写服务,后续再调度其他节点进行AddPeer。通过强制修改节点列表,可以实现最大可用模式。

Noop

在分布式系统中,对于一个请求都有三种返回结果:成功、失败、超时。

在failover时,新的Leader由于没有持久化commitIndex,所以并不清楚当前日志的commitIndex在哪,也即不清楚log entry是committed还是uncommitted状态。通常在成为新Leader时提交一条空的log entry来提交之前所有的entry。

RAFT中增加了一个约束:对于之前Term的未Committed数据,修复到多数节点,且在新的Term下至少有一条新的Log Entry被复制或修复到多数节点之后,才能认为之前未Committed的Log Entry转为Committed。即最大化commit原则:Leader在当选后立即追加一条Noop并同步到多数节点,实现之前Term uncommitted的entry隐式commit。

  • 保证commit的数据不会丢。
  • 保证不会读到uncommitted的数据。

MultiRaft

元数据相比数据来说整体数据量要小的多,通常单台机器就可以存储。我们也通常借助于Etcd等使用单个Raft Group来进行元数据的复制和管理。但是单个Raft Group,存在以下两点弊端:

  • 集群的存储容量受限于单机存储容量(排除使用分布式存储)。
  • 集群的性能受限于单机性能(读写都由Leader处理)。

对于集群元数据来说使用单个Raft Group是够了,但是如果想让Raft用于数据的复制,那么必须得使用MultiRaft,也即有多个复制组,类似于Ceph的PG,每个PG、Raft Group是一个复制组。

多个复制组共用心跳

还需解决的问题

  • 负载均衡:可以通过Transfer Leadership的功能保持每个物理节点上Leader个数大致相当。
  • 链接复用:一个物理节点上的所有Raft Group复用链接。会有心跳合并、Lease共用等。
  • 中心节点:用来管理集群包括MultiRaft,使用单个Raft Group做高可靠,类似Ceph Mon。

性能优化

Batch

  • Batch写入落盘:对每一条Log Entry都进行fsync刷盘效率会比较低,可以在内存中缓存多个Log Entry Batch写入磁盘,提高吞吐量,类似于Ceph FileStore批量的写Journal。
  • Batch网络发送:Leader也可以一次性收集多个Log Entry,批量的发送给Follower。
  • Batch Apply:批量的Apply已经commit的Log到业务状态机。

Batch并不会对请求做延迟来达到批量处理的目的,对单个请求的延迟没有影响。

PipeLine

Raft依赖Leader来保持集群的数据一致性,数据的复制都是从Leader到Follower。一个简单的写入流程如下,性能是完全不行的:

  1. Leader收到Client请求。
  2. Leader将数据Append到自己的Log。
  3. Leader将数据发送给其他的Follower。
  4. Leader等待Follower ACK,大多数节点提交了Log,则Apply。
  5. Leader返回Client结果。
  6. 重复步骤1。

Leader跟其他节点之间的Log同步是串行Batch的方式,如果单纯使用Batch,每个Batch发送之后Leader依旧需要等待该Batch同步完成之后才能继续发送下一个Batch,这样会导致较长的延迟。可以通过Leader跟其他节点之间的PipeLine复制来改进,会有效降低延迟。

Parallel

顺序提交

将Leader Append持久化日志和向Followers发送日志并行处理。Leader只需要在内存中保存未Committed的Log Entry,在多数节点已经应答的情况下,无需等待Leader本地IO完成,直接将内存中的Log Entry直接Apply给状态机即可。

乱序提交

Out-of-Order参考:PolarFS: ParallelRaftBlueStore源码分析之事物状态机:IO保序

乱序提交要满足以下两点

  1. Log Entry之间不存在覆盖写,则可以乱序Commit、Apply。
  2. Log Entry之间存在覆盖写,不可以乱序,只能顺序Commit、Apply。

上层不同的应用场景限制了提交的方式

  • 对IO保序要求比较严格,那么只能使用顺序提交
  • 对IO保序没有要求,可以IO乱序完成,那么可顺序提交、乱序提交都可以使用

不同的分布式存储需要的提交方式

  • 分布式数据库(乱序提交):其上层可串行化的事物就可以保证数据一致性,可以容忍底层IO乱序完成的情况。
  • 分布式KV存储(乱序提交):多个KV之间(排除上层应用语义)本身并无相关性,也不需要IO保序,可以容忍IO乱序。
  • 分布式对象存储(乱序提交):本来就不保证同一对象的并发写入一致性,那么底层也就没必要顺序接收顺序完成IO,天然容忍IO乱序。
  • 分布式块存储(顺序提交):由于在块存储上可以构建不同的应用,而不同的应用对IO保序要求也不一样,所以为了通用性只能顺序提交。
  • 分布式文件存储(顺序提交):由于可以基于文件存储(Posix等接口)构建不同的应用,而不同的应用对IO保序要求也不一样,所以为了通用性只能顺序提交,当然特定场景下可以乱序提交,比如PolarFS适用于数据库。
  • 分布式存储具体能否乱序提交最终依赖于应用语义能否容忍存储IO乱序完成

简单分析

单个Raft Group只能顺序提交日志,多个Raft Group之间虽然可以做到并行提交日志,但是受限于上层应用(数据库等)的跨Group分布式事物,可能导致其他不相关的分布式事物不能并行提交,只能顺序提交。

上层应用比如数据库的分布式事物是跨Group(A、B、C)的,Group A被阻塞了,分布式事物不能提交, 那么所有的参数者Group(B、C)就不能解锁,进而不能提交其他不相关的分布式事物,从而引发多个Group的链式反应。

Raft不适用于多连接的高并发环境中,Leader和Follower维持多条连接的情况在生产环境也很常见,单条连接是有序的,多条连接并不能保证有序,有可能发送次序靠后的Log Entry先于发送次序靠前的Log Entry达到Follower,但是Raft规定Follower必须按次序接受Log Entry,就意味着即使发送次序靠后的Log Entry已经写入磁盘了(实际上不能落盘得等之前的Log Entry达到)也必须等到前面所有缺失的Log Entry达到后才能返回。如果这些Log Entry是业务逻辑顺序无关的,那么等待之前未到达的Log Entry将会增加整体的延迟。

其实Raft的日志复制和Ceph基于PG Log的复制一样,都是顺序提交的,虽然可以通过Batch、PipeLine优化,但是在并发量大的情况下延迟和吞吐量仍然上不去。

具体Raft乱序提交的实现可参考:PolarFS: ParallelRaft

Asynchronous

我们知道被committed的日志肯定是可以被Apply的,在什么时候Apply都不会影响数据的一致性。所以在Log Entry被committed之后,可以异步的去Apply到业务状态机,这样就可以并行的Append Log和Apply Log了,提升系统的吞吐量。

其实就和Ceph BlueStore的kv_sync_threadkv_finalize_thread一样,每个线程都有其队列。kv_sync_thread去写入元数据到RocksDB(请求到此已经成功),kv_finalize_thread去异步的回调上层应用通知请求成功。

ReadIndex

Raft的写入流程时走一遍Raft,保证了数据的一致性。为了实现线性一致性读,读流程也可以走一遍Raft,但是会产生磁盘IO,性能不好。Leader具有最新的数据,理论上Leader可以读取到最新的数据。但是在网络分区的情况下,无法确定当前的Leader是不是真的Leader,有可能当前Leader与其他节点发生了网络分区,其他节点形成了一个Group选举了新的Leader并更新了一些数据,此时如果Client还从老的Leader读取数据,便会产生Stale Read

读流程走一遍Raft、ReadIndex、Lease Read都是用来实现线性一致性读,避免Stale Read。

  1. 当收到读请求时,Leader先检查自己是否在当前Term commit过entry,没有否则直接返回。
  2. 然后Leader将自己当前的commitIndex记录到变量ReadIndex里面。
  3. 向Follower发起Heartbeat,收到大多数ACK说明自己还是Leader。
  4. Leader等待 applyIndex >= ReadIndex,就可以提供线性一致性读。
  5. 返回给状态机,执行读操作返回结果给Client。

线性一致性读:在T1时刻写入的值,在T1时刻之后读肯定可以读到。也即读的数据必须是读开始之后的某个值,不能是读开始之前的某个值。不要求返回最新的值,返回时间大于读开始的值就可以。

注意:在新Leader刚刚选举出来Noop的Entry还没有提交成功之前,是不能够处理读请求的,可以处理写请求。也即需要步骤1来防止Stale Read。

原因在新Leader刚刚选举出来Noop的Entry还没有提交成功之前,这时候的commitIndex并不能够保证是当前整个系统最新的commitIndex。考虑这个情况:w1->w2->w3->noop| commitIndex在w1;w2、w3对w1有更新;应该读的值是w3因为commitIndex之后可能还有Log Entry对该值更新,只要w1Apply到业务状态机就可以满足applyIndex >= ReadIndex,此时就可以返回w1的值,但是此时w2、w3还未Apply到业务状态机,就没法返回w3,就会产生Stale Read。必须等到Noop执行完才可以执行读,才可以避免Stale Read。

Follower Read

如果是热点数据么可以通过提供Follower Read来减轻Leader的读压力,可用非常方便的通过ReadIndex实现。

  1. Follower向Leader请求ReadIndex。
  2. Leader执行完ReadIndex章节的前4步(用来确定Leader是真正的Leader)。
  3. Leader返回commitIndex给Follower作为ReadIndex。
  4. Follower等待 applyIndex >= ReadIndex,就可以提供线性一致性读。
  5. 返回给状态机,执行读操作返回结果给Client。

Lease Read

Lease Read相比ReadIndex更近了一步,不仅省去了Log的磁盘开销,还省去了Heartbeat的网络开销,提升读的性能。

基本思路:Leader获取一个比election timeout小的租期(Lease),因为Follower至少在election timeout时间之后才会发送选举,那么在Lease内是不会进行Leader选举,就可以跳过ReadIndex心跳的环节,直接从Leader上读取。但是Lease Read的正确性是和时间挂钩的,如果时钟漂移比较严重,那么Lease Read就会产生问题。

  1. Leader定时发送(心跳超时时间)Heartbeat给Follower, 并记录时间点start。
  2. 如果大多数回应,那么新的Lease到期时间为start + Lease(<election timeout)
  3. Leader确认自己是Leader后,等待applyIndex >= ReadIndex,就可以提供线性一致性读。
  4. 返回给状态机,执行读操作返回结果给Client。

Quorum read

Raft 的读虽然可以发送给 follower,但还是要从 leader 获取 readIndexleader 的压力会很大。使用 quorum read 可以利用 follower 读,减小 leader 的压力, 提高读的吞吐量和性能: Improving Read Scalability in Raft-like consensus protocols

Double Write-Store

我们知道Raft把数据Append到自己的Log的同时发送请求给Follower,多数回复ACK就认为commit,就可以Apply到业务状态机了。如果业务状态机(分布式KV、分布式对象存储等)也把数据持久化存储,那么数据便Double Write-Store,集群中存在两份相同的数据,如果是三副本,那么就会有6份。

接下来主要思考元数据、数据做的一点点优化。

通常的一个优化方式就是先把数据写入Journal(环形队列、大小固定、空间连续、使用3D XPoint、NVME),然后再把数据写入内存即可返回,最后异步的把数据刷入HDD(最好带有NVME缓存)。

元数据

元数据通常使用分布式KV存储,数据量比较小,Double Write-Store影响不是很大,即使存储两份也不会浪费太多空间,而且以下改进也相比数据方面的改进更容易实现。

可以撸一个简单的Append-Only的单机存储引擎WAL来替代RocksDB作为Raft Log的存储引擎,Apply业务状态机层的存储引擎可以使用RocksDB,但是可以关闭RocksDB的WAL,因为数据已经存储在Append-Only的Raft Log了,细节仍需考虑。

数据

这里的数据通常指非结构化数据:图片、文档、音视频等。非结构化数据通常使用分布式对象存储、块存储、文件存储等来存储,由于数据量比较大,Double Store是不可接受的,大致有两种思路去优化:

  1. Raft Log、User Data分开存:Raft Log只存op-cmd,不存data,类似于Ceph的PG Log。
  2. Raft Log、User Data一起存:作为同一份数据来存储。Bitcask模型天然Append更容易实现。

ref

  1. https://shimingyah.github.io/2020/03/浅谈分布式存储之raft/#
  2. raft论文中文翻译 https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md
  3. https://github.com/hashicorp/raft
  4. http://www.philipotoole.com/
  5. https://github.com/otoolep/hraftd
  6. https://github.com/RaftLib/RaftLib/wiki/Getting-Started
  7. https://github.com/feixiao/Distributed-Systems
  8. https://github.com/happyfish100/fastdfs
  9. http://www.cs.utexas.edu/~vijay/papers/pebblesdb-sosp17-slides.pdf
  10. http://www.philipotoole.com/building-a-distributed-key-value-store-using-raft/
  11. https://www.nosuchfield.com/2019/01/26/Distributed-systems-for-fun-and-profit-study-notes/
  12. braft https://github.com/baidu/braft/blob/master/docs/cn/raft_protocol.md
  13. etcd 源码解析 https://jiajunhuang.com/articles/2018_11_20-etcd_source_code_analysis_raftexample.md.html
  14. https://zhuanlan.zhihu.com/p/348680213
  15. https://youjiali1995.github.io/raft/raft-todo/
  16. leader lease https://www.jianshu.com/p/072380e12657

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

(转)beansdb


是beansdb的一些资料整理和总结

转自这里 http://sunisdown.me/gobeansdb-jia-gou-she-ji.html

BeansDB 是豆瓣内部实现的分布式存储系统,最开始的时候是 Davies 用 C 来实现的。2015 年的时候,内部决定开始用 Go 来重新写 BeansDB,然后 xiufeng 与 zzl 就实现了现在的 GoBeansDB。

为什么要自己实现一套 k/v 存储

我在刚刚接手 GoBeansDB 的时候,想过这个问题。既然有那么多优秀的数据库系统,为什么豆瓣还需要自己重新实现一套 k/v 存储? 这个问题可以拆分成两个方面,一个是为什么要用 K/V 数据库。一个是为什么要自己造轮子。

  1. 首先是因为数据大,而且数据是非结构化数据,像豆瓣的日记,就是一批很长的字符串。
  2. 其次是非常多的随机读。
  3. 有的时候会有大量的写操作
  4. 不需要外键什么的

上面四点可以排除掉类似 MySQL 这种传统的关系型数据库。

排除掉传统的关系行数据库之后,就需要对比现存的 K/V 数据库。

现在比较流行的 K/V 数据库有 LevelDBMongoDB ,还有 Cassandra ,现在看来这些项目都足够成熟。但是如果追溯到 BeansDB 项目最开始的时候,也就是 2012 年的时候,那个时间点并没有太好的选择。即使现在看来,除了 Cassandra 之外,像 LevelDBMongoDB 也不能满足我们的目标:

  1. 读写都需要比较低的 latency
  2. 数据量非常大,所以数据要写在磁盘上,数据库需要能够容纳比内存大的多的数据
  3. 高可用,单点故障不影响系统正常运行
  4. 高吞吐,尤其是针对写操作
  5. 能够快速恢复有问题的节点

这 5 点也可以排除调 MongoDBLevelDB

当然上面这些都是我做的推断,但是这些应该都不是最主要的原因。最主要的原因应该是豆瓣的工程师文化比较好,鼓励工程师去寻找一个最贴合业务的解决方案,并且这个工程师的团队还足够强,两者缺一不可。如果没有很强的工程师文化,可能会直接引入开源的解决方案,虽然不一定合适,但是应该足够解决痛点。如果工程师实力不够,也就没有能力去自己实现一套类似的系统。而且与其去引入一个复杂的,自己无法完全掌控的开源项目,不如自己实现一套贴合业务的,简单的系统。这样内部可以根据业务的需要来作调整,同时自己实现一个系统也比完全掌握一个庞大的开源项目要简单。一旦出现问题也比较容易找到问题所在。

为什么要用 Go 重新实现 BeansDB

BeansDB 是用 C 来实现的,为什么现在改用 Go 来实现?

  • 一个很重要的原因是 Go 的代码相比与 C 更容易维护。对一个工程师而言,Go 的学习成本比 C 要低很多。
    • 用 Go 可以比较快速的写出健壮的系统,而用 C 来写的话,则需要一定的经验积累。
    • Go 提供了可用的测试框架,写测试相对于 C/C++ 要方便
    • Go 标准库里面提供了方便的性能分析工具,用 C 也有类似的工具,但是做不到开箱即用
  • 还有 Go 的标准库也足够完善,不需要用 C 来重复造轮子。
  • Go 的执行效率虽然比 C 差,但是 BeansDB 的瓶颈是 IOPS,所以 Go 的执行效率并不会成为瓶颈。

GoBeansDB 的架构设计

GoBeansDB 是基于 DynamoBitcask 两篇论文来做的实现,这里优先讨论基于 Bitcask 实现的存储引擎。Bitcask 有一个缺点在于所有的 key 值都必须放在内存里面,GoBeansDB 这这个基础之上做了一些优化,绕过了 Bitcask 这个痛点。

GobeansDB 的存储有有两个比较重要的组成部分,一个是索引(htree),一个是数据文件(data)。索引与数据文件组成 Bucket。Bucket 的概念类似与关系行数据库里面的 table,在 GoBeansDB 的实现中就是给一个 Bucket 分配一个文件夹,这个文件夹下面放着相关的数据。每个 Bucket 下面一次只允许打开一个文件。打开的这个文件会一直保持打开的状态,一直等到追加到活跃文件超出阈值。文件超出阈值之后就关闭,然后新建一个新的继续添加。data 文件一旦关闭之后,文件就转换成为不活跃的数据文件。无法再往这个 data 文件上面追加数据。

img

状态为 active 的数据文件只做追加操作,这样连续的写入操作也不会明显增加磁盘的 IO 使用量。这种设计也极大的简化了数据的写入操作。

上面的图简单描述了 Bucket 内部文件的架构,每条数据里面包含TS(TimeStamp),Flag,Ver(Version),ValueHash,RecSize(单条记录的主要内容的大小),Value,crc(key,value,header 的 crc),ksz(Key Size),vsz(Value Size),pos(Position,这条记录在文件中的位置),Header。

当插入新数据的时候,直接在文件尾部添加这种结构的数据。删除操作是对原有的数据做更新操作,并将 Ver 绝对值+1,转变为负数。

在文件写入完成之后,需要更新内存里面的数据结构,也就是前面提到的 HTree,HTree 是一个 Hash Tree,结构如下

img

levels 表示真实的树状结构, leafs 是树的叶子,保存着真实的数据。

img

这种数据结构下读取一个值也非常简单,大多数情况下最多只需要一次 seek 。我们首先在 Htree 中通过 levels 找到 key 对应的 leafs , 然后通过 leafs 里面的报错的 Pos 来拿到文件编号(chunkID)以及 offset,这样就可以通过文件编号(chunkID)和 offset 来拿到保存的数据。在很多情况下文件系统的缓存会让这个读操作比预期的要快。

到这里关于 GoBeansDB wirte/delete/read 相关的操作都已经基本完成。但是仅仅这样还不能完备。

GC 操作

GoBeansDB 的模型非常简单,write/delete 操作都是在文件尾部追加新的数据,这样存在一个问题就是占用的磁盘空间比真实的数据要多。所以我们引入了 GC 机制来回收垃圾,释放内存与磁盘空间。

在 GoBeansDB 中,通过增量 GC 来减小 GC 的开销。xiufeng 通过分析 BeansDB 的日志,统计出一条新写入的数据,修改操作基本在写入之后的 7 天之内,所以我们保留 7 天之内的新数据不做 GC。然后在每天晚上,访问量较低的时候,分批做增量 GC。

GC 的过程是将 datafile 里面已经过时的数据清除掉,比如旧版本的value,已经标记为删除掉的key。

img

如 上图所示,GC 会读取状态为不活跃的数据文件,用其中存活的数据或者最新版本的数据生成一份新的数据文件,同时为这个新的数据文件创建一个 hint file。


ref

  1. https://blog.csdn.net/lemonk3664/article/details/9970293
  2. 作者的记录 比较bitcask https://www.douban.com/note/122507891/
  3. https://github.com/douban/beansdb 代码
  4. https://github.com/douban/gobeansdb/ 后面他们用go实现了
  5. https://zhuanlan.zhihu.com/p/53682577 提到了c版本的优化,位域之类的

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

pebblesdb


是pebblesdb的一些资料整理和总结

参考链接1 是论文

参考链接2是提到的一点介绍

而在 2017 年 SOSP 上发表的论文 PebblesDB 提出了另外一种思路。在传统 LSM-Tree 中,每一层由多个 SSTable 组成,每一个 SSTable 中保存了一组排好序 Key-Value,相同层的 SSTable 之间的 Key 没有重叠。当进行 Compaction 时,上层的 SSTable 需要与下层的 SSTable 进行合并,也就是将上层的 SSTable 和下层的 SSTable 读取到内存中,进行合并排序后,组成新的 SSTable,并写回到磁盘中。由于 Compaction 的过程中需要读取和写入下层的 SSTable,所以造成了读写放大,影响应能。

PebblesDB 将 LSM-Tree 和 Skip-List 数据结构进行结合。在 LSM-Tree 中每一层引入 Guard 概念。 每一层中包含多个 Guard,Guard 和 Guard 之间的 Key 的范围是有序的,且没有重叠,但 Guard 内部包含多个 SSTable,这些 SSTable 的 Key 的范围允许重叠。

当需要进行 Compaction 时,只需要将上层的 SSTable 读入内存,并按照下层的 Guard 将 SSTable 切分成多个新的 SSTable,并存放到下层对应的 Guard 中。在这个过程中不需要读取下层的 SSTable,也就在一定程度上避免了读写放大。作者将这种数据结构命名为 Fragemented Log-Structured Tree(FLSM)。PebblesDB 最多可以减低 6.7 倍的写放大,写入性能最多提升 105%。

和 WiscKey 类似,PebblesDB 也会多 Range Query 的性能造成影响。这是由于 Guard 内部的 SSTable 的 Key 存在重叠,所以在读取连续的 Key 时,需要同时读取 Guard 中所有的 SSTable,才能够获得正确的结果。

参考链接3pebblesdb的总结,博主写的不错

skiplist的访问选择是一种guard,将这种guard推广到lsm上,然后通过guard组织数据,允许小范围的重复,但是这种guard是很难判定的。所以借鉴意义不大,而且读数据性能肯定也会下降,insert等操作也会加剧。

参考链接3也是pebblesdb的总结,

提到一个观点

弱化全局有序,切分片段,这样实际上加重了有序对系统的影响

比如scan,读性能肯定会下降。

参考链接5 6 是代码

参考链接7是个ppt介绍。值得看一看。方便理解论文


ref

  1. http://www.cs.utexas.edu/~vijay/papers/sosp17-pebblesdb.pdf
  2. https://zhuanlan.zhihu.com/p/34455548
  3. https://zhuanlan.zhihu.com/p/46069535
  4. https://zhuanlan.zhihu.com/p/32225460
  5. https://github.com/utsaslab/pebblesdb
  6. https://github.com/xxks-kkk/HyperPebblesDB
  7. http://www.cs.utexas.edu/~vijay/papers/pebblesdb-sosp17-slides.pdf

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

Abseil Tip of the Week

本文是Abseil库 tip of the week的总结。不是翻译,有些点子还是不错的。


totw #1 string_view

厌烦了const char*到string之间的处理转换?你只是想用一下而已不需要构造一个拷贝?string_view就是为此而生的,它是一个视图,就是一个借用,也是类似go rust胖指针 slice之类的东西。内部有一个指针和一个长度

注意,string_view是没有\0的

totw #3 String Concatenation and operator+ vs. StrCat()

简单说,不要用string::operator +() 会有临时变量。absl::StrCat用来解决这个问题

totw #10 Splitting Strings, not Hairs

absl提供了string split相关函数

// Splits on commas. Stores in vector of string_view (no copies).
std::vector<absl::string_view> v = absl::StrSplit("a,b,c", ',');

// Splits on commas. Stores in vector of string (data copied once).
std::vector<std::string> v = absl::StrSplit("a,b,c", ',');

// Splits on literal string "=>" (not either of "=" or ">")
std::vector<absl::string_view> v = absl::StrSplit("a=>b=>c", "=>");

// Splits on any of the given characters (',' or ';')
using absl::ByAnyChar;
std::vector<std::string> v = absl::StrSplit("a,b;c", ByAnyChar(",;"));

// Stores in various containers (also works w/ absl::string_view)
std::set<std::string> s = absl::StrSplit("a,b,c", ',');
std::multiset<std::string> s = absl::StrSplit("a,b,c", ',');
std::list<std::string> li = absl::StrSplit("a,b,c", ',');

// Equiv. to the mythical SplitStringViewToDequeOfStringAllowEmpty()
std::deque<std::string> d = absl::StrSplit("a,b,c", ',');

// Yields "a"->"1", "b"->"2", "c"->"3"
std::map<std::string, std::string> m = absl::StrSplit("a,1,b,2,c,3", ',');

要是c++有python那种split就好了。(那种效率比较低)

totw #11 RVO 返回值优化。返回局部变量不必考虑多余的拷贝构造等。现代编译器默认功能
totw #42: Prefer Factory Functions to Initializer Methods

google是禁止使用异常的。如果初始化会失败,那就用工厂函数来搞,大概这样

class Foo {
 public:
  // Factory method: creates and returns a Foo.
  // May return null on failure.
  static std::unique_ptr<Foo> Create();

  // Foo is not copyable.
  Foo(const Foo&) = delete;
  Foo& operator=(const Foo&) = delete;

 private:
  // Clients can't invoke the constructor directly.
  Foo();
};

std::unique_ptr<Foo> Foo::Create() {
  // Note that since Foo's constructor is private, we have to use new.
  return absl::WrapUnique(new Foo());
}

这里注意,std::unique_ptr不能访问private构造函数,所以absl提供了一个wrapunique的一个wrapper

totw #45 不要用全局变量,尤其是库代码

(只能说尽量啦)

totw #88: Initialization: =, (), and {}

总结好了

  • 简单的构造逻辑,直接用=初始化基本类型(结合{}),结构体,以及拷贝构造

    int x = 2;
    std::string foo = "Hello World";
    std::vector<int> v = {1, 2, 3};
    std::unique_ptr<Matrix> matrix = NewMatrix(rows, cols);
    MyStruct x = {true, 5.0};
    MyProto copied_proto = original_proto;
    
    // Bad code
    int x{2};
    std::string foo{"Hello World"};
    std::vector<int> v{1, 2, 3};
    std::unique_ptr<Matrix> matrix{NewMatrix(rows, cols)};
    MyStruct x{true, 5.0};
    MyProto copied_proto{original_proto};
    
  • 用传统的构造函数语义()来调用复杂的构造语义

    Frobber frobber(size, &bazzer_to_duplicate);
    std::vector<double> fifty_pies(50, 3.14);
    // Bad code 
    // Could invoke an intializer list constructor, or a two-argument   constructor.
    Frobber frobber{size, &bazzer_to_duplicate};
    
    // Makes a vector of two doubles.
    std::vector<double> fifty_pies{50, 3.14};
    
  • {}不用=这种场景, 只用在上面这两种场景不能编译的情况下

    class Foo {
     public:
      Foo(int a, int b, int c) : array_{a, b, c} {}
      
     private:
      int array_[5];
      // Requires {}s because the constructor is marked explicit
      // and the type is non-copyable.
      EventManager em{EventManager::Options()};
    };
    
  • {}auto不要混用

    // Bad code
    auto x{1};
    auto y = {2}; // This is a std::initializer_list<int>!
    
totw #93: using absl::Span 也就是std::span ,container_view,更好用。也可以理解成flat pointer
totw #117: Copy Elision and Pass-by-value

就是一种吃掉拷贝的优化

// First constructor version
explicit Widget(const std::string& name) : name_(name) {}

// Second constructor version
explicit Widget(std::string name) : name_(std::move(name)) {}
totw #120: Return Values are Untouchable
MyStatus DoSomething() {
  MyStatus status;
  auto log_on_error = RunWhenOutOfScope([&status] {
    if (!status.ok()) LOG(ERROR) << status;
  });
  status = DoA();
  if (!status.ok()) return status;
  status = DoB();
  if (!status.ok()) return status;
  status = DoC();
  if (!status.ok()) return status;
  return status;
}

这段代码是有问题的,lambda中访问的status是在return执行完之后才会析构访问的,但是鉴于返回值优化,return没有执行,勉强正确,如果最后一行换成return MyStatus(); 这个lambda将永远错误的status,行为是未定义的 ,可能是错误的逻辑

解决办法,不用这种RAII访问返回值

totw #123: absl::optional and std::unique_ptr

这个表概括的很好了,absl::optional就是std::optional的替代版

  Bar absl::optional<Bar> std::unique_ptr<Bar>
Supports delayed construction  
Always safe to access    
Can transfer ownership of Bar    
Can store subclasses of Bar    
Movable If Bar is movable If Bar is movable
Copyable If Bar is copyable If Bar is copyable  
Friendly to CPU caches  
No heap allocation overhead  
Memory usage sizeof(Bar) sizeof(Bar) + sizeof(bool)2 sizeof(Bar*) when null, sizeof(Bar*) + sizeof(Bar) otherwise
Object lifetime Same as enclosing scope Restricted to enclosing scope Unrestricted
Call f(Bar*) f(&val_) f(&opt_.value()) or f(&*opt_) f(ptr_.get()) or f(&*ptr_)
Remove value N/A opt_.reset(); or opt_ = absl::nullopt; ptr_.reset(); or ptr_ = nullptr;
totw#126: make_unique is the new new
How Should We Choose Which to Use?
  1. By default, use absl::make_unique() (or std::make_shared() for the rare cases where shared ownership is appropriate) for dynamic allocation. For example, instead of: std::unique_ptr<T> bar(new T()); write auto bar = absl::make_unique<T>(); and instead of bar.reset(new T()); write bar = absl::make_unique<T>();
  2. In a factory function that uses a non-public constructor, return a std::unique_ptr<T> and use absl::WrapUnique(new T(...)) in the implementation.
  3. When dynamically allocating an object that requires brace initialization (typically a struct, an array, or a container), use absl::WrapUnique(new T{...}).
  4. When calling a legacy API that accepts ownership via a T*, either allocate the object in advance with absl::make_unique and call ptr.release() in the call, or use new directly in the function argument.
  5. When calling a legacy API that returns ownership via a T*, immediately construct a smart pointer with WrapUnique (unless you’re immediately passing the pointer to another legacy API that accepts ownership via a T*).
Summary

Prefer absl::make_unique() over absl::WrapUnique(), and prefer absl::WrapUnique() over raw new.

totw #131: Special Member Functions and = default

Prefer =default over writing an equivalent implementation by hand, even if that implementation is just {}

totw#134: make_unique and private constructors

和42条一样场景,make_unique不能直接创建private 构造函数,几个办法

声明成友元函数,或者用个wrapunique,或者替换成shared_ptr

totw #141: Beware Implicit Conversions to bool

bool转换的问题。属于老生常谈了。特定类型就需要实现safe bool,一般类型用option<T>包装一层 absl::optional<T>::has_value()判断

Tote #144 : Heterogeneous Lookup in Associative Containers

看代码

struct StringCmp {
  using is_transparent = void;
  bool operator()(absl::string_view a, absl::string_view b) const {
    return a < b;
  }
};

std::map<std::string, int, StringCmp> m = ...;
absl::string_view some_key = ...;
// The comparator `StringCmp` will accept any type that is implicitly
// convertible to `absl::string_view` and says so by declaring the
// `is_transparent` tag.
// We can pass `some_key` to `find()` without converting it first to
// `std::string`. In this case, that avoids the unnecessary memory allocation
// required to construct the `std::string` instance.
auto it = m.find(some_key);

省一个拷贝

再比如

struct ThreadCmp {
  using is_transparent = void;
  // Regular overload.
  bool operator()(const std::thread& a, const std::thread& b) const {
    return a.get_id() < b.get_id();
  }
  // Transparent overloads
  bool operator()(const std::thread& a, std::thread::id b) const {
    return a.get_id() < b;
  }
  bool operator()(std::thread::id a, const std::thread& b) const {
    return a < b.get_id();
  }
  bool operator()(std::thread::id a, std::thread::id b) const {
    return a < b;
  }
};

std::set<std::thread, ThreadCmp> threads = ...;
// Can't construct an instance of `std::thread` with the same id, just to do the lookup.
// But we can look up by id instead.
std::thread::id id = ...;
auto it = threads.find(id);
totw #149 Object Lifetimes vs. =delete

不只是构造函数析构函数可以标记为delete,普通成员函数也可以标记delete。这就引入了复杂性问题,是让类更完备还是更复杂

=delete for Lifetimes

假如你标记为delete就是为了限制参数的生存周期

class Request {
  ...

  // The provided Context must live as long as the current Request.
  void SetContext(const Context& context);
  void SetContext(Context&& context) = delete;

但是报错并不能告诉你正确的做法,只会告诉你因为什么错了

error: call to deleted function 'SetContext'

  SetContext(Context{});
  ^~~~~~~~~~

<source>:4:6: note: candidate function has been explicitly deleted

void SetContext(Context&& context) = delete;

你看了看报错,决定改一下,但是为什么这样改,不知道

这种设计会避免bug,但是也需要完善注释,不然不知所以

要是编译时报错能够定制错误信息,就牛逼了。

=delete for “Optimization”

这个场景是上面的反面,假如只接受右值,不接受拷贝,这样所有调用的时候都得显式加上std::move,显然更麻烦

这样做实际上是复杂化了,totw不建议使用,keep it simple

Tip of the Week #152 : AbslHashValue and You

把内部用的hash算法暴露出来了,可以这么用

struct Song {
  std::string name;
  std::string artist;
  absl::Duration duration;

  template <typename H>
  friend H AbslHashValue(H h, const Song& s) {
    return H::combine(std::move(h), s.name, s.artist, s.duration);
  }

  // operator == and != omitted for brevity.
};

Tip of the Week #161: Good Locals and Bad Locals

Good

auto& subsubmessage = *myproto.mutable_submessage()->mutable_subsubmessage();
subsubmessage.set_foo(21);
subsubmessage.set_bar(42);
subsubmessage.set_baz(63);

Bad

MyType value = SomeExpression(args);
return value;

Tip of the Week #171: Avoid Sentinel Values

返回值不清晰,建议用optional或者status

Tip of the Week #173: Wrapping Arguments in Option Structs

多参数构造抽象成option,不然难维护,构造指定不合理

Tip of the Week #175: Changes to Literal Constants in C++14 and C++17.

尽可能用1’000’000’000

以及chrono的ms min

Tip of the Week #176: Prefer Return Values to Output Parameters

尽可能用返回值。如果出参需要非常灵活,返回值控制不了,就用出参

Tip of the Week #181: Accessing the value of a StatusOr<T>

就是rocksdb status那种东西

// The same pattern used when handling a unique_ptr...
std::unique_ptr<Foo> foo = TryAllocateFoo();
if (foo != nullptr) {
  foo->DoBar();  // use the value object
}

// ...or an optional value...
absl::optional<Foo> foo = MaybeFindFoo();
if (foo.has_value()) {
  foo->DoBar();
}

// ...is also ideal for handling a StatusOr.
absl::StatusOr<Foo> foo = TryCreateFoo();
if (foo.ok()) {
  foo->DoBar();
}

表达能力比option和expect要好一些

此外,abseil还支持解析参数,支持gflags,可以说是把gflags迁移到这个库了,header only更友好一些


ref

  1. https://abseil.io/tips/

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

检测函数重载

本文是参考链接1 的翻译,感谢原作者。


Why

考虑一个简单的需求,Compute函数接口,有这么两个接口

// lib V1:
void Compute(int in, int& out) { }
// lib V2:
void Compute(double in, double& out) { }

假如我想用v2的接口但是用不到,可能就需要自己适配,如何灵活的检查这个接口呢?

首先想到的就是宏,判断接口使用的版本,区分出,然后自己适配接口。但是问题在于接口宏混乱以后无法灵活的适配

这时候就需要第三种方法,就是模板探测惯用法了

template <typename T, typename = void>
struct is_compute_available : std::false_type {};

template <typename T>
struct is_compute_available<T, 
           std::void_t<decltype(Compute(std::declval<T>(), 
                       std::declval<T&>())) >> : std::true_type {};

其中std::void_t吃掉decltye的类型。如果Compute对于指定的T没有匹配,整体就会退化到第一个匹配

关于std::void_t 参考链接2 3也可以看一下。有很多类似的用法。新时代的std::enable_if

上面的整体封装一下

// helper variable template
template< class T> inline constexpr bool is_compute_available_v = 
          is_compute_available<T>::value;

template <typename T>
void ComputeTest(T val)
{
    if constexpr (is_compute_available_v<T>)
    {
        T out { };
        Compute(val, out);
    }
    else
    {
        std::cout << "fallback...\n";
    }
}

回到这篇文章想要讨论的函数,std::from_chars.鉴于这些功能实现的还不完整,msvc全实现了,gcc和clang只实现了T=int

所以套用上面的解决方案,如果用宏,能区分gcc clang和msvc,但是具体到gcc和clang呢?

考虑feature test macros,但是这个只能确定有没有这个功能,不能确定这个功能支持啥类型

最终,使用模板探测惯用法

template <typename T, typename = void>
struct is_from_chars_convertible : false_type {};
template <typename T>
struct is_from_chars_convertible<T, 
                 void_t<decltype(from_chars(declval<const char*>(), declval<const char*>(), declval<T&>()))>> 
                 : true_type {};


template <typename T>
[[nodiscard]] std::optional<T> TryConvert(std::string_view sv) noexcept {
    T value{ };
    if constexpr (is_from_chars_convertible<T>::value) {
        const auto last = sv.data() + sv.size();
    const auto res = std::from_chars(sv.data(), last, value);
    if (res.ec == std::errc{} && res.ptr == last)
            return value;
    }
    else  {
        try {
            std::string str{ sv };
            size_t read = 0;
            if constexpr (std::is_same_v<T, double>)
                value = std::stod(str, &read);
            else if constexpr (std::is_same_v<T, float>)
                value = std::stof(str, &read);

            if (str.size() == read)
                return value;
        }
        catch (...) {  }
    }
}

不支持的类型放到try里用以前的api来搞。全文完。参考6中提到了作者提到的几个参考链接。非常棒。


ref

  1. https://www.bfilipek.com/2019/07/detect-overload-from-chars.html
  2. std::void_t https://zh.cppreference.com/w/cpp/types/void_t
  3. 这里有个介绍detection idiom的 写的不错。https://zhuanlan.zhihu.com/p/26155469
  4. std::from_charshttps://zh.cppreference.com/w/cpp/utility/from_chars
  5. std::is_detected 上面这套东西的封装 https://en.cppreference.com/w/cpp/experimental/is_detected
  6. 几个作者提到的网址
    1. https://stackoverflow.com/questions/51404763/c-compile-time-check-that-an-overloaded-function-can-be-called-with-a-certain
    2. https://stackoverflow.com/questions/257288/is-it-possible-to-write-a-template-to-check-for-a-functions-existence
    3. 又讲了遍函数探测方法,https://akrzemi1.wordpress.com/2014/06/26/clever-overloading/
    4. std::from_chars详细介绍,有了这套工具,可以放弃sprintf https://www.bfilipek.com/2018/12/fromchars.html
    5. 这里提到了expression SFINAE ,其实手法差不多,也是decltype和void_t std::declval https://blog.tartanllama.xyz/detection-idiom/ 里面的链接内容不错。有时间可以看看。

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

Writing a reverse proxy loadbalancer from the ground up in C


why

这篇文章是参考链接的翻译,简而言之,如何写一个反向代理


第一版,代码在这里 https://github.com/gpjt/rsp/blob/f214f5a75e112311b90c81ad823bf86c3900b03d/src/rsp.c

下面的介绍错误处理都去掉了

首先要明白反向代理干啥的,就是接收一个连接,转发给设定好的后端节点。代理/转发

那大概就有了框架

主程序负责接收,处理客户端连接 handle_client_connection

int main(int argc, char *argv[]) {
...
    if (argc != 4) {
        fprintf(stderr, 
                "Usage: %s <server_port> <backend_addr> <backend_port>\n", 
                argv[0]);
        exit(1);
    }
    server_port_str = argv[1];
    backend_addr = argv[2];
    backend_port_str = argv[3];
    server_socket_fd = socket(addr_iter->ai_family,
                                  addr_iter->ai_socktype,
                                  addr_iter->ai_protocol);
    bind(server_socket_fd, addr_iter->ai_addr, addr_iter->ai_addrlen)   
    listen(server_socket_fd, MAX_LISTEN_BACKLOG);

    while (1) {
        client_socket_fd = accept(server_socket_fd, NULL, NULL);


        handle_client_connection(client_socket_fd, backend_addr, backend_port_str);
    }

}

处理客户端连接呢,就是客户端来啥,我转发啥,后端节点来啥,原样转发给客户端fd

void handle_client_connection(int client_socket_fd, 
                              char *backend_host, 
                              char *backend_port_str) 
{
    backend_socket_fd = socket(addrs_iter->ai_family, 
                                   addrs_iter->ai_socktype,
                                   addrs_iter->ai_protocol);
    connect(backend_socket_fd, 
                    addrs_iter->ai_addr, 
                    addrs_iter->ai_addrlen) != -1) 
    bytes_read = read(client_socket_fd, buffer, BUFFER_SIZE);
    write(backend_socket_fd, buffer, bytes_read);

    while (bytes_read = read(backend_socket_fd, buffer, BUFFER_SIZE)) {
        write(client_socket_fd, buffer, bytes_read);
    }
}

这是简单的同步写法。当然nginx肯定要用epoll来尽量异步


ref

  1. http://www.gilesthomas.com/2013/08/writing-a-reverse-proxyloadbalancer-from-the-ground-up-in-c-part-1/
  2. http://www.gilesthomas.com/2013/09/writing-a-reverse-proxyloadbalancer-from-the-ground-up-in-c-part-2-handling-multiple-connections-with-epoll/
  3. 代码仓库 https://github.com/gpjt/rsp
  4. 这有个haproxy的代码解析。https://illx10000.github.io/2018/09/03/16.html

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

(转)Lockfree Hashtable

转自 http://spiritsaway.info/lockfree-hashtable.html

对于让hashtable变成无锁,主要要解决如下几个问题:

  1. hash表的核心就是探查方法,开链法在插入节点的时候会引入动态内存分配的问题,这个是在无锁里面很棘手的问题,所以没有采取开链法,同时二次探查在无锁和缓存上面很不友好,所以使用的就是线性探查。因此我们首先要使得线性探查无锁化。
  2. hash表插入的时候可能会导致过载。在STL的实现中是发现map内部装载率大于一定值时将map扩容。由于扩容的时候会出现迭代器失效的问题,所以这种方法在无锁的时候压根不可行。所以很多实现是直接开一个新的当前表大小的干净副本,通过指针将所有副本链接起来。查询和插入的时候需要遍历所有的副本

在preshing的一篇文章里面谈到了无锁线性扫描的实现,这里定义了一个基本的Entry:

struct Entry
{
    mint_atomic32_t key;
    mint_atomic32_t value;
};
Entry *m_entries;

在这个Entry里,我们规定如果key的值为0,则代表这个entry还没有被使用,所以插入的时候禁止传入为0的key

在此结构之下,定义的setItem操作如下:

void ArrayOfItems::SetItem(uint32_t key, uint32_t value)
{
    for (uint32_t idx = 0;; idx++)
    {
        uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
        if ((prevKey == 0) || (prevKey == key))
        {
            mint_store_32_relaxed(&m_entries[idx].value, value);
            return;
        }
    }
}

类似的getItem的操作如下:

uint32_t ArrayOfItems::GetItem(uint32_t key)
{
    for (uint32_t idx = 0;; idx++)
    {
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey == key)
            return mint_load_32_relaxed(&m_entries[idx].value);
        if (probedKey == 0)
            return 0;          
    }
}

现在的疑问在于,这里的原子操作使用的都是relaxed语义,这个语义在x86上基本等于没有任何作用,如何在使得SetItem里的第8行能够被GetItem的第7行可见。事实上这压根做不到,因为一个线程在执行到SetItem的第8行之前被换出, 然后另外一个线程执行到了GetItem的第7行,这里读取的还是老的值。除了这种情况之外,还可能出现SetItem里的CAS操作并没有将数据更新的通知发放到其他的core上去,然而第8行的store操作已经被另外一个执行GetItem的线程可见的情况,此时GetItem会返回0。这两种情况都是合法的,因为在多线程中读取数据的时机是不确定的,因此读取老数据也是正常的。甚至可以说在没有通知机制的情况下,是不是最新根本没有意义。如果要实现publish-listen的机制,则需要在SetItem的时候将一个原子的bool变量设置为True,同时这个Store操作要使用Release语义,同时另外一个线程在CAS这个值的时候,要使用Acquire语义。

// Shared variables
char message[256];
ArrayOfItems collection;

void PublishMessage()
{
    // Write to shared memory non-atomically.
    strcpy(message, "I pity the fool!");

    // Release fence: The only way to safely pass non-atomic data between threads using Mintomic.
    mint_thread_fence_release();

    // Set a flag to indicate to other threads that the message is ready.
    collection.SetItem(SHARED_FLAG_KEY, 1)
}

这样才能使得数据更改完并通知完之后,另外一方能够得到最新数据。因此,当前设计的无锁hashtable在多线程上唯一做的事情就是防止了多个线程对同一个entry同时做SetItem操作。

preshing对SetItem有一个优化:减少不必要的CAS操作。在原来的实现中会遍历所有的元素去执行CAS操作,其实只有key == 0 or key == my_key的时候我们才需要去做CAS。所以这里的优化就是预先作一次load,发现可以去set的时候才去CAS

void ArrayOfItems::SetItem(uint32_t key, uint32_t value)
{
    for (uint32_t idx = 0;; idx++)
    {
        // Load the key that was there.
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey != key)
        {
            // The entry was either free, or contains another key.
            if (probedKey != 0)
                continue;           // Usually, it contains another key. Keep probing.

            // The entry was free. Now let's try to take it using a CAS.
            uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
            if ((prevKey != 0) && (prevKey != key))
                continue;       // Another thread just stole it from underneath us.

            // Either we just added the key, or another thread did.
        }

        // Store the value in this array entry.
        mint_store_32_relaxed(&m_entries[idx].value, value);
        return;
    }
}

naive lockfree hashtable

在上面的lockfree linear scan的基础上,做一个lockfree hashtable还是比较简单的。这里定义了三个函数intergerHash, SetItem, GetItem

inline static uint32_t integerHash(uint32_t h)
{
    h ^= h >> 16;
    h *= 0x85ebca6b;
    h ^= h >> 13;
    h *= 0xc2b2ae35;
    h ^= h >> 16;
    return h;
}

这个hash函数的来源是MurmurHash3’s integer finalizer , 据说这样可以让每一位都起到差不多的作用。

void HashTable1::SetItem(uint32_t key, uint32_t value)
{
    for (uint32_t idx = integerHash(key);; idx++)
    {
        idx &= m_arraySize - 1;

        // Load the key that was there.
        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey != key)
        {
            // The entry was either free, or contains another key.
            if (probedKey != 0)
                continue;           // Usually, it contains another key. Keep probing.

            // The entry was free. Now let's try to take it using a CAS.
            uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
            if ((prevKey != 0) && (prevKey != key))
                continue;       // Another thread just stole it from underneath us.

            // Either we just added the key, or another thread did.
        }

        // Store the value in this array entry.
        mint_store_32_relaxed(&m_entries[idx].value, value);
        return;
    }
}

这里的SetItem正确工作有一个前提:整个hashtable不是满的,是满的一定会出错。

GetItem还是老样子:

uint32_t HashTable1::GetItem(uint32_t key)
{
    for (uint32_t idx = integerHash(key);; idx++)
    {
        idx &= m_arraySize - 1;

        uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
        if (probedKey == key)
            return mint_load_32_relaxed(&m_entries[idx].value);
        if (probedKey == 0)
            return 0;          
    }
}

所谓的publish函数也是一样:

// Shared variables
char message[256];
HashTable1 collection;

void PublishMessage()
{
    // Write to shared memory non-atomically.
    strcpy(message, "I pity the fool!");

    // Release fence: The only way to safely pass non-atomic data between threads using Mintomic.
    mint_thread_fence_release();

    // Set a flag to indicate to other threads that the message is ready.
    collection.SetItem(SHARED_FLAG_KEY, 1)
}

至于delete操作,我们可以规定value是某个值的时候代表当前entry是被删除的,这样就可以用SetItem(key, 0)来模拟delete操作了。

what about full

上面的无锁hashtable有一个致命缺陷,他没有处理整个hashtable满了的情况。为了处理满的情况,我们需要设置最大探查数量为当前hashtable的容量, 同时维护多个独立的hashtable,用一个无锁的链表将所有的hashtable的指针串联起来。如果最大探查数量达到上限,且当前hashtable没有下一个hashtable的指针,且则先建立一个新的hashtable,并挂载到无锁链表上,回到了有下一个hashtable的情况,然后对下一个hashtable做递归遍历。

这样做的确解决了扩容的问题,但是会出现性能下降的问题。后面过来的key在查询的时候会变得越来越慢,因为经常需要查询多层的hashtable。为了避免这个问题,出现了一种新的设计:每次添加一层hashtable的时候,都将容量扩大一倍,然后将上一个hashtable的内容拷贝到新的hashtable里。这个新的hashtable也叫做main_hashtable,由于我们无法在无锁的情况下把整个hashtable拷贝过去,所以采用lazy的方式,这个方式的步骤如下:

  1. 维持一个hashtable的无锁链表,链表的头节点就叫做main_hashtable,所有的hashtable通过一个next指针相连;
  2. 插入的时候如果发现当前的main_hashtable的装载因子(这个装载因子考虑了所有的key)已经大于0.5,则新建一个hashtable,然后插入到新的hashtable里;
  3. 扩容的时候设置一个标志位,表明当前正在扩容,避免多个线程同时扩容,浪费资源,扩容期间所有等待扩容的线程都忙等待,扩容完成之后清除正在扩容的标记;
  4. 新建立的hashtable是空的,大小为当前main_hashtable的两倍,每次新加入一个hashtable的时候都插入到头部,使之成为新的main_hashtable
  5. 查询的时候,根据这些next指针一直查询,直到最后一个hashtable
  6. 如果查询返回结果的时候发现返回结果的那个hashtable并不是main_hashtable,则把当前的key value对插入到main_hashtable里,这就是核心的lazy copy的过程

这个lazy的过程代码如下:

auto id = details::thread_id();
        auto hashedId = details::hash_thread_id(id);

        auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
        for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
            // Look for the id in this hash
            auto index = hashedId;
            while (true) {      // Not an infinite loop because at least one slot is free in the hash table
                index &= hash->capacity - 1;

                auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
                if (probedKey == id) {
                    // Found it! If we had to search several hashes deep, though, we should lazily add it
                    // to the current main hash table to avoid the extended search next time.
                    // Note there's guaranteed to be room in the current hash table since every subsequent
                    // table implicitly reserves space for all previous tables (there's only one
                    // implicitProducerHashCount).
                    auto value = hash->entries[index].value;
                    if (hash != mainHash) {
                        index = hashedId;
                        while (true) {
                            index &= mainHash->capacity - 1;
                            probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
                            auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                            auto reusable = details::invalid_thread_id2;
                            if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed)) ||
                                (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire))) {
#else
                            if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed))) {
#endif
                                mainHash->entries[index].value = value;
                                break;
                            }
                            ++index;
                        }
                    }

                    return value;
                }
                if (probedKey == details::invalid_thread_id) {
                    break;      // Not in this hash table
                }
                ++index;
            }
        }

现在的核心则转移到了扩容的过程,扩容涉及到了动态内存分配和初始内容的填充,是比较耗的操作,所以要避免多个线程在争抢扩容的控制权。在moodycamel的设计里,是这样处理扩容的。

// Insert!
            auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
            while (true)
            {
                if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire))
                {
                    // We've acquired the resize lock, try to allocate a bigger hash table.
                    // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
                    // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
                    // locked block).
                    mainHash = implicitProducerHash.load(std::memory_order_acquire);
                    if (newCount >= (mainHash->capacity >> 1))
                    {
                        auto newCapacity = mainHash->capacity << 1;
                        while (newCount >= (newCapacity >> 1))
                        {
                            newCapacity <<= 1;
                        }
                        auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
                        if (raw == nullptr)
                        {
                            // Allocation failed
                            implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
                            implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
                            return nullptr;
                        }

                        auto newHash = new (raw) ImplicitProducerHash;
                        newHash->capacity = newCapacity;
                        newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
                        for (size_t i = 0; i != newCapacity; ++i)
                        {
                            new (newHash->entries + i) ImplicitProducerKVP;
                            newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
                        }
                        newHash->prev = mainHash;
                        implicitProducerHash.store(newHash, std::memory_order_release);
                        implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                        mainHash = newHash;
                    }
                    else
                    {
                        implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                    }
                }

                // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
                // to finish being allocated by another thread (and if we just finished allocating above, the condition will
                // always be true)
                if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2))
                {
                    bool recycled;
                    auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
                    if (producer == nullptr)
                    {
                        implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
                        return nullptr;
                    }
                    if (recycled)
                    {
                        implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
                    }

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                    producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
                    producer->threadExitListener.userData = producer;
                    details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
#endif

                    auto index = hashedId;
                    while (true)
                    {
                        index &= mainHash->capacity - 1;
                        auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);

                        auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                        auto reusable = details::invalid_thread_id2;
                        if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed)) ||
                            (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire)))
                        {
#else
                        if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed)))
                        {
#endif
                            mainHash->entries[index].value = producer;
                            break;
                        }
                        ++index;
                    }
                    return producer;
                }

                // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
                // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
                // we try to allocate ourselves).
                mainHash = implicitProducerHash.load(std::memory_order_acquire);
            }

上面的第五行就是抢夺控制权的过程,进入扩容的条件就是当前装载因子已经大于0.5,且扩容标志位没有设置。

newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)

扩容的时候设置一个标志位,相当于一个锁,扩容完成之后清空标志位。但是由于线程换出的存在,这个标志位可能导致其他线程永远抢不到控制权,进入无限死循环。所以这里又对没有抢夺到扩容控制权的线程,还有另外的一个判断,如果装载因子小于0.75,则直接尝试插入,不用管。

newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)

这个分支里还做了一些事情,就是当真正的获得了一个implicit producer之后,注册一个线程退出的callback,这个callback会把当前producer销毁,并在hashtable里删除对应的key

最后剩下的一种情况就是:拿不到扩容所有权,且当前装载因子已经上了0.75,此时除了死循环没有办法,约等于死锁。这种情况很罕见,但是仍然可以构造出来:正在扩容的线程被换出。不知原作者如何处理这个情况。

todo

这有几个hashtable实现,上亿qps,无锁

  • https://github.com/PeterRK/fastCHD
  • https://github.com/PeterRK/SSHT
  • https://github.com/PeterRK/estuary

这里挖个坑,后面分析一下 http://cmph.sourceforge.net/chd.html


看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢! 你的评论非常重要!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

关于lock-free wait-free的一些整理

抄自这里https://zhuanlan.zhihu.com/p/55583561


Herb Sutter在DDJ pillars of concurrency一文中抛出并行编程的三个简单论点,一是分离任务,使用更细粒度的锁或者无锁编程;二是尽量通过并行任务使用CPU资源,以提高系统吞吐量及扩展性;三是保证对共享资源访问的一致性。第三点已经被atomicmutexlockcondition_variable解决了,第一点和第二点则可以归结为如何对任务进行粒度划分并投递到任务的执行单元中去调度执行。任务划分依赖于各种不同业务的理解,例如网络和渲染,很难抽取出其共性。而任务的调度执行则是一种通用的结构,可以分为四个部分:

  1. 任务的封装 在c++11里提供了三种最基本的任务封装形式future, promise,packaged_task
  2. 任务的结构 在c++17里补全了任务结构控制,主要是提供了then, when_all, when_any这三个用来关联多个future的函数
  3. 任务的执行 任务执行者基本都是使用线程池,每个线程不断的尝试获取一个任务并执行,类似于一个while循环
  4. 任务的调度 这部分负责了任务的投递和分发,他在多线程之间维持了一个任务容器集合,提供的接口主要包括接受新任务、取出一个任务和判断容器是否为空

在整个并发任务系统中,在任务容器集合之上的任务调度结构是核心。现在使用的最广泛的任务容器是concurrent queue,下面我们来对concurrent queue的多种实现来做一下分析。

naive concurrent queue

queue是一个维持先进先出(FIFO)队列的结构,在很多STL的实现之中采取的是多块连续内存的双向链表来维持其先进先出结构。为了在多线程中使用std::queue,最简单的方法就是使用锁来解决data race,同时修改原始提供的接口,使得这个数据结构不会被用错。

#include <queue>
#include <atomic>
#include <mutex>
template<typename T>
class concurrent_queue
{
private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    concurrent_queue()
    {
        // pass
    }
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(data));
        data_cond.notify_one();
    }
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{
            return !data_queue.empty();
        })
        value = std::move(data_queue.front());
        data_queue.pop();
    }
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{
            return !data_queue.empty();
        })
        auto res = std::make_shared<T>(std::move(data_queue.front()));
        data_queue.pop();
        return res;
    }
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
        {
            return false;
        }
        value = std::move(data_queue.front());
        data_queue.pop()
        return True;
    }
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
        {
            return std::shared_ptr<T>();
        }
        auto res =  std::make_shared<T>(std::move(data_queue.front()));
        data_queue.pop()
        return res;
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
}

上述代码的主要考量如下:

  1. 由于多线程的干扰,常规的查询empty之后再pop的处理流程是错误的,这两个操作必须封装在一起,所以这里提供了try_popwait_and_pop这两个接口来获取数据。
  2. 为了避免在数据拷贝的时候出现异常导致的数据不一致,返回数据的时候采取两套方案,一个是调用者提供引用,一个是返回一个shared_ptr。这样就保证了如果在拷贝构造front的时候出了trace也能维持整个queue的结构完整。

这个concurrent_queue并不是很高效,主要的drawback包括如下三个方面:

  1. 每次访问接口的时候都需要调用锁,而且是同一个锁
  2. 在尝试获得数据的时候失败会触发yield,从而导致线程切换
  3. 维持了一个全局的先进先出序列,在多消费者的情况下这个强制唯一序是没有意义的,在单消费者的情况下也很少会有这种要求。

对应的常见解决方案:

  1. 使用无锁的方式去代替mutex,同时由于无锁最大的问题是内存分配,有些并发队列通过预先设置最大大小的方式来预分配内存,从而绕过了著名的ABA问题
  2. 使用双链表结构去维持队列,而不是使用queue,这样我们就可以分离头节点和尾节点的访问;如果是固定大小的队列则可以采取ring buffer的形式来维持队列结构。
  3. 当尝试获得数据失败的时候,先轮询一段时间,如果这段时间内还是没有数据,则调用yield,也就是对condition_variable封装了一层。
  4. 每个生产者维护其投递队列,每个消费者根据对各个生产者任务队列的优先级去遍历获取任务。

事实上,在这是一个并发queue的时候,首先要明确如下几个问题:

  1. 这个queue的生产者和消费者各有多少个,常见的有单生产者单消费者(SPSC)、单生产者多消费者(SPMC)、多生产者单消费者(MPSC)和多生产者多消费者(MPMC)

  2. 这个queue的最大元素大小是否确定,如果可以确定最大大小,则动态内存分配就可以避免,直接采取环形队列当作容器即可;如果无法确定最大大小,则只能通过动态内存分配的形式去处理,这里的难度加大了很多,因为要处理多线程的内存分配。

下面我们来看一下现在主流的几种concurrent_queue的实现,来分析一下他们对concurrent_queue的实现优化。

intel spsc concurrent queue

intel官方网站上提供了一个SPSC queue,但是这个queue没有限制最大元素大小,如果临时内存不够的话会调用new,可能会触发锁。

// load with 'consume' (data-dependent) memory ordering
template<typename T>
T load_consume(T const* addr)
{
  // hardware fence is implicit on x86
  T v = *const_cast<T const volatile*>(addr);
  __memory_barrier(); // compiler fence
  return v;
}

// store with 'release' memory ordering
template<typename T>
void store_release(T* addr, T v)
{
  // hardware fence is implicit on x86
  __memory_barrier(); // compiler fence
  *const_cast<T volatile*>(addr) = v;
}

// cache line size on modern x86 processors (in bytes)
size_t const cache_line_size = 64;

// single-producer/single-consumer queue
template<typename T>
class spsc_queue
{
public:
  spsc_queue()
  {
      node* n = new node;
      n->next_ = 0;
      tail_ = head_ = first_= tail_copy_ = n;
  }

  ~spsc_queue()
  {
      node* n = first_;
      do
      {
          node* next = n->next_;
          delete n;
          n = next;
      }
      while (n);
  }

  void enqueue(T v)
  {
      node* n = alloc_node();
      n->next_ = 0;
      n->value_ = v;
      store_release(&head_->next_, n);
      head_ = n;
  }

  // returns 'false' if queue is empty
  bool dequeue(T& v)
  {
      if (load_consume(&tail_->next_))
      {
          v = tail_->next_->value_;
          store_release(&tail_, tail_->next_);
          return true;
      }
      else
      {
          return false;
      }
  }

private:
  // internal node structure
  struct node
  {
      node* next_;
      T value_;
  };

  // consumer part
  // accessed mainly by consumer, infrequently be producer
  node* tail_; // tail of the queue

  // delimiter between consumer part and producer part,
  // so that they situated on different cache lines
  char cache_line_pad_ [cache_line_size];

  // producer part
  // accessed only by producer
  node* head_; // head of the queue
  node* first_; // last unused node (tail of node cache)
  node* tail_copy_; // helper (points somewhere between first_ and tail_)

  node* alloc_node()
  {
      // first tries to allocate node from internal node cache,
      // if attempt fails, allocates node via ::operator new()

      if (first_ != tail_copy_)
      {
          node* n = first_;
          first_ = first_->next_;
          return n;
      }
      tail_copy_ = load_consume(&tail_);
      if (first_ != tail_copy_)
      {
          node* n = first_;
          first_ = first_->next_;
          return n;
      }
      node* n = new node;
      return n;
  }

  spsc_queue(spsc_queue const&);
  spsc_queue& operator = (spsc_queue const&);
};

// usage example
int main()
{
  spsc_queue<int> q;
  q.enqueue(1);
  q.enqueue(2);
  int v;
  bool b = q.dequeue(v);
  b = q.dequeue(v);
  q.enqueue(3);
  q.enqueue(4);
  b = q.dequeue(v);
  b = q.dequeue(v);
  b = q.dequeue(v);
}

这个代码的实现很简单粗暴,核心是一个单链表,对于单链表的任何操作都是wait_free的,这个链表有四个指针:

  1. tail指针,指向下一个应该dequeue的位置
  2. head指针,指向最新的一个enqueue的位置
  3. first_指针,指向第一个可以回收node的位置
  4. tail_copy指针,指向一个安全的可以回收的nodenext位置,他不一定指向tail

在这个链表里,指针之间有如下关系:$first \le tail_copy \le tail \le head$ 。这里做的核心优化就是按需去更新tail_copy,没必要每次更新tail的时候都把tail_copy更新一遍,只有发现first == tail_copy的时候才去更新一下。每个操作都没有使用到CAS,因此都是wait_free的,当然那一行调用了new的除外。

这里为了避免False Sharing使用了padding。由于读线程只需要更改tail,所以只需要在tail之后加个padding即可。

facebook spsc concurrent queue

facebook提供了固定大小的SPSC queue,代码在follyProducerConsumerQueue里。

/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)

#pragma once

#include <atomic>
#include <cassert>
#include <cstdlib>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>

#include <folly/concurrency/CacheLocality.h>

namespace folly {

/*
 * ProducerConsumerQueue is a one producer and one consumer queue
 * without locks.
 */
template<class T>
struct ProducerConsumerQueue {
  typedef T value_type;

  ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
  ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete;

  // size must be >= 2.
  //
  // Also, note that the number of usable slots in the queue at any
  // given time is actually (size-1), so if you start with an empty queue,
  // isFull() will return true after size-1 insertions.
  explicit ProducerConsumerQueue(uint32_t size)
    : size_(size)
    , records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
    , readIndex_(0)
    , writeIndex_(0)
  {
    assert(size >= 2);
    if (!records_) {
      throw std::bad_alloc();
    }
  }

  ~ProducerConsumerQueue() {
    // We need to destruct anything that may still exist in our queue.
    // (No real synchronization needed at destructor time: only one
    // thread can be doing this.)
    if (!std::is_trivially_destructible<T>::value) {
      size_t readIndex = readIndex_;
      size_t endIndex = writeIndex_;
      while (readIndex != endIndex) {
        records_[readIndex].~T();
        if (++readIndex == size_) {
          readIndex = 0;
        }
      }
    }

    std::free(records_);
  }

  template<class ...Args>
  bool write(Args&&... recordArgs) {
    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
    auto nextRecord = currentWrite + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
      writeIndex_.store(nextRecord, std::memory_order_release);
      return true;
    }

    // queue is full
    return false;
  }

  // move (or copy) the value at the front of the queue to given variable
  bool read(T& record) {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return false;
    }

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    record = std::move(records_[currentRead]);
    records_[currentRead].~T();
    readIndex_.store(nextRecord, std::memory_order_release);
    return true;
  }

  // pointer to the value at the front of the queue (for use in-place) or
  // nullptr if empty.
  T* frontPtr() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return nullptr;
    }
    return &records_[currentRead];
  }

  // queue must not be empty
  void popFront() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    assert(currentRead != writeIndex_.load(std::memory_order_acquire));

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    records_[currentRead].~T();
    readIndex_.store(nextRecord, std::memory_order_release);
  }

  bool isEmpty() const {
    return readIndex_.load(std::memory_order_acquire) ==
        writeIndex_.load(std::memory_order_acquire);
  }

  bool isFull() const {
    auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      return false;
    }
    // queue is full
    return true;
  }

  // * If called by consumer, then true size may be more (because producer may
  //   be adding items concurrently).
  // * If called by producer, then true size may be less (because consumer may
  //   be removing items concurrently).
  // * It is undefined to call this from any other thread.
  size_t sizeGuess() const {
    int ret = writeIndex_.load(std::memory_order_acquire) -
        readIndex_.load(std::memory_order_acquire);
    if (ret < 0) {
      ret += size_;
    }
    return ret;
  }

private:
 char pad0_[CacheLocality::kFalseSharingRange];
 const uint32_t size_;
 T* const records_;

 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> readIndex_;
 FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> writeIndex_;

 char pad1_[CacheLocality::kFalseSharingRange - sizeof(writeIndex_)];
};

}

这里就是使用环形队列来作为容器,双指针来作为头尾,读线程读取readIndex直接采用relaxed,写线程读取writeIndex的时候也是采取relaxed,因为这两个变量只会在对应的线程内修改,可以认为是对应线程的私有变量,如果要读取另外一个线程的变量则需要采取acquire,当然前提是修改的时候使用了release。为了避免False Sharing这里也使用了padding,只不过是用宏做的。

其实这里也可以做一点优化,就像前面intel的延迟处理tail_copy一样,首次读取另外一个线程变量的时候先用relaxed,如果发现不能操作了,则再使用acquire

总的来说,这个无锁spsc queue也是wait_free的。

moodycamel spsc concurrent queue

moody camel 在上面的基础上做了一些改进:在支持无大小限制的情况下,将动态内存分配的需求降得很低,同时支持了容量的动态增长。其容器结构是两层的queue of queue,第一层是循环链表,第二层是循环队列。第一层循环链表的控制基本等价于intelspsc里的代码,而第二层的循环队列的控制基本等价于folly的代码。当enqueue的时候,发现没有空闲内存的时候会调用malloc,不过这种动态内存分配比起intel的每个新node都分配来说简单多了,总的来说还是比较符合wait_free的。这个的代码我就不分析了,直接贴作者的解释吧。

# Enqueue
If room in tail block, add to tail
Else check next block
    If next block is not the head block, enqueue on next block
    Else create a new block and enqueue there
    Advance tail to the block we just enqueued to

# Dequeue
Remember where the tail block is
If the front block has an element in it, dequeue it
Else
    If front block was the tail block when we entered the function, return false
    Else advance to next block and dequeue the item there

naive spmc concurrent queue

在这前面介绍的spsc并发队列的基础上,我们可以比较容易的构建出一个spmc的并发队列,而构造一个mpsc的并发队列则难很多。其原因主要是在enqueue的时候,可能会涉及到动态内存分配,如果有好几个线程都抢着进行动态内存分配的话,就会出现malloc的锁征用。而多个线程抢占dequeue的时候,只需要采取CAS来保持tail的更新即可,虽说这个不是waitfree的,但是lockfree还是可以基本保证的。

boost mpmc concurrent queue

boost concurrent queue通过模板参数的方式来支持固定大小的队列和不定大小的队列。

如果是固定大小队列,则会使用一个带dummy headring buffer来存储内容,同时使用一个头节点索引和一个尾节点索引来标记队列的头尾位置。为了一次性的修改头尾节点索引,这里将队列大小的上限设置为了$2^{16} - 2$ ,这样两个索引就可以合并为一个int32 来处理,修改的时候可以使用compare_exchange_来同时修改。如果在支持int64类型的compare_exchange_操作的平台,队列大小的上限可以放到$2^{32} -2$ ,同时两个索引会被压缩为一个int64来做同时修改。

如果是不定大小的队列,则会使用链表的形式来维持队列结构, 代码见下。

struct BOOST_LOCKFREE_CACHELINE_ALIGNMENT node
 {
   typedef typename detail::select_tagged_handle<node, node_based>::tagged_handle_type tagged_node_handle;
   typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

   node(T const & v, handle_type null_handle):
   data(v)//, next(tagged_node_handle(0, 0))
   {
     /* increment tag to avoid ABA problem */
     tagged_node_handle old_next = next.load(memory_order_relaxed);
     tagged_node_handle new_next (null_handle, old_next.get_next_tag());
     next.store(new_next, memory_order_release);
   }

   node (handle_type null_handle):
   next(tagged_node_handle(null_handle, 0))
   {}

   node(void)
   {}

   atomic<tagged_node_handle> next;
   T data;
 };

这里比较有意思的就是第九行的注释:对指针的tag位置进行自增来避免ABA问题。这里的next指针是一个tagged_pointer,其分配位置是内存对齐的,对齐的大小由BOOST_LOCKFREE_CACHELINE_BYTES定义,在WIN平台下,这个宏定义如下:

#define BOOST_LOCKFREE_CACHELINE_BYTES 64
#ifdef _MSC_VER
#define BOOST_LOCKFREE_CACHELINE_ALIGNMENT __declspec(align(BOOST_LOCKFREE_CACHELINE_BYTES))

当这个指针是64字节对齐时,最底的6位是没有意义的,所以这6位我们可以用来存储额外的数据,这种指针就叫做tagged_pointer,在llvm里这个指针结构也很常见。

boost lockfree queue里,数据成员定义如下:

typedef typename detail::queue_signature::bind<A0, A1, A2>::type bound_args;
static const bool has_capacity = detail::extract_capacity<bound_args>::has_capacity;
static const size_t capacity = detail::extract_capacity<bound_args>::capacity + 1; // the queue uses one dummy node
static const bool fixed_sized = detail::extract_fixed_sized<bound_args>::value;
static const bool node_based = !(has_capacity || fixed_sized);
static const bool compile_time_sized = has_capacity;
typedef typename detail::extract_allocator<bound_args, node>::type node_allocator;
typedef typename detail::select_freelist<node, node_allocator, compile_time_sized, fixed_sized, capacity>::type pool_t;
typedef typename pool_t::tagged_node_handle tagged_node_handle;
typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

atomic<tagged_node_handle> head_;
static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
char padding1[padding_size];
atomic<tagged_node_handle> tail_;
char padding2[padding_size];
pool_t pool; //代表node的pool 可以当作内存分配器

因为atomic<T*>内部只包含一个T*作为成员变量,所以atomic<T*>T*的内存布局是一样的,所以这里的padding_size才会这样计算出来。这里的padding的意义在于让poll的开始地址是BOOST_LOCKFREE_CACHELINE_BYTES对齐的,同时这里分为了两个padding而不是一个padding主要是考虑到将tail head分离在两个cache_line上,避免不同线程之间的缓存竞争。

现在我们来看这个lockfree queue提供的接口。

首先查看empty

/** Check if the queue is empty
*
* \return true, if the queue is empty, false otherwise
* \note The result is only accurate, if no other thread modifies the queue. Therefore it is rarely practical to use this
*       value in program logic.
* */
bool empty(void)
{
return pool.get_handle(head_.load()) == pool.get_handle(tail_.load());
}

注释里写的很清楚了,这个函数的返回值是不准确的,因为在没有锁的情况下无法同时获得head tail的准确值。

现在来看push,这里分为了两个接口push bounded_push,区分在于如果内存池已经用完,第一个push在当前队列是大小固定的情况下会返回false,不固定的情况下会向操作系统尝试申请更多的内存并返回;而第二个bounded_push则直接返回false

/** Pushes object t to the queue.
*
* \post object will be pushed to the queue, if internal node can be allocated
* \returns true, if the push operation is successful.
*
* \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
*                    from the OS. This may not be lock-free.
* */
bool push(T const & t)
{
    return do_push<false>(t);
}

/** Pushes object t to the queue.
*
* \post object will be pushed to the queue, if internal node can be allocated
* \returns true, if the push operation is successful.
*
* \note Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail
* \throws if memory allocator throws
* */
bool bounded_push(T const & t)
{
    return do_push<true>(t);
}

这两个函数都调用了do_push,这个函数的定义如下:

template <bool Bounded>
    bool do_push(T const & t)
    {
        using detail::likely;

        node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
        handle_type node_handle = pool.get_handle(n);

        if (n == NULL)
            return false;

        for (;;) {
            tagged_node_handle tail = tail_.load(memory_order_acquire);
            node * tail_node = pool.get_pointer(tail);
            tagged_node_handle next = tail_node->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            tagged_node_handle tail2 = tail_.load(memory_order_acquire);
            if (likely(tail == tail2)) {
                if (next_ptr == 0) {
                    tagged_node_handle new_tail_next(node_handle, next.get_next_tag());
                    if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) {
                        tagged_node_handle new_tail(node_handle, tail.get_next_tag());
                        tail_.compare_exchange_strong(tail, new_tail);
                        return true;
                    }
                }
                else {
                    tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);
                }
            }
        }
    }

这里比较难理解的一点就是tail tail2,以及最后30行的compare_exchange_strong。这里在19行使用判断的意义是避免在内部做无用功,虽然不使用19行的判断, tail改变之后,20行所在分支的检测都会fail掉,对正确性没影响,对性能上来说提升很大。在一个完整的成功push流程中有两个cas操作,我们需要担心的是在两个cas操作之间线程被换出之后会出现何种结果,也就是在24行之前被换出。此时老的tailnext已经被修正为了新数据,而新tail却没有更新。在下一个线程进来的时候会发现tail->next != 0, 因此会进28号的分支,在此分支之内会尝试将tail->next更新为tail,这样就避免了数据更新到一半的尴尬局面。

对于pop则只有一个函数:

/** Pops object from queue.
     *
     * \pre type U must be constructible by T and copyable, or T must be convertible to U
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    template <typename U>
    bool pop (U & ret)
    {
        using detail::likely;
        for (;;) {
            tagged_node_handle head = head_.load(memory_order_acquire);
            node * head_ptr = pool.get_pointer(head);

            tagged_node_handle tail = tail_.load(memory_order_acquire);
            tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            tagged_node_handle head2 = head_.load(memory_order_acquire);
            if (likely(head == head2)) {
                if (pool.get_handle(head) == pool.get_handle(tail)) {
                    if (next_ptr == 0)
                        return false;

                    tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);

                } else {
                    if (next_ptr == 0)
                        /* this check is not part of the original algorithm as published by michael and scott
                         *
                         * however we reuse the tagged_ptr part for the freelist and clear the next part during node
                         * allocation. we can observe a null-pointer here.
                         * */
                        continue;
                    detail::copy_payload(next_ptr->data, ret);

                    tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                    if (head_.compare_exchange_weak(head, new_head)) {
                        pool.template destruct<true>(head);
                        return true;
                    }
                }
            }
        }
    }

这里就简单多了,成功的pop只需要一个compare_exchange_weak即可,所以就不需要担心数据更改到一半的问题,这里的28行处理的还是tail数据更新到一半的问题。

这里比较有意思的一点就是42行的.template,这个叫做template disambiguator, 其作用就是通知编译器destruct<true>是一个模板,而不是destruct < true

总的来说, boost lockfree queue的注意点完全在lock free上,并没有采取每个生产者单独一个queue的方式来解决争用,虽然我们可以在lockfree queue的基础上做一个这样的东西。

intel tbb concurrent queue

其实这个的总体实现与boost类似。占坑,以后填。粗看起来,这个东西的实现很具有STL的风格。

moodycamel concurrent queue

这个concurrent queue的实现被很多项目使用过, 值得重点分析。这个实现的突出之处在于,每个生产者都维持自己的专属queue,而不同的消费者会以不同的顺序去访问各个生产者的queue,直到遇到一个不为空的queue。简而言之,他所实现的MPMC(multiple producer multiple consumer)的队列建立在了SPMC的多线程队列的基础上。这个SPMC的实现是lockfree的,同时还增加了bulk操作。下面来慢慢介绍这个的设计。

首先就是在构建消费者的时候,尽可能的让消费者与生产者均衡绑定,内部实现是通过使用一个token来维持消费者与生产者之间的亲和性。其实最简单的亲和性分配的方法就是每个消费者分配一个生产者的编号,dequeue的时候采取轮询的方式,每次开始轮询的时候都以上次dequeue成功的生产者queue开始。

处理完了多生产者多消费者之间的映射,现在剩下的内容就是如何更高效的处理单生产者多消费者。moodycamel这里的主要改进就是单个queue的存储结构,这里采取的是两层的循环队列,第一层循环队列存储的是第二层循环队列的指针。一个队列只需要维护四个索引,考虑到原子性修改可以把消费者的两个索引合并为一个uint64或者uint32t,因为只有消费者会发生数据竞争,为了方便比较,也顺便把生产者的两个索引合并为一个uint64t or uint32t,这样就可以直接使用整数比较了。在enqueue的时候,数据复制完成之后,直接对生产者的索引自增即可。而dequeue的时候则没这么容易,此时首先自增消费者索引,然后判断当前消费者索引是否已经越过生产者索引,如果越过了,则对则对另外一个overcommit计数器进行自增,三个计数器合用才能获得真正的容量。

这里使用环形缓冲来扩容而不是采取列表来扩容,主要是因为连续的空间操作可以支持批量的enqueuedequeue操作,直接预先占据一些索引就行了。


ref

  1. http://spiritsaway.info/concurrent-queue.html 正文转载自这里
  2. https://github.com/chaoran/fast-wait-free-queue 这是个benchmark,需要测一下提到的队列实现
  3. 上面的框架作者的论文,一个wait-free实现 http://chaoran.me/assets/pdf/wfq-ppopp16.pdf
  4. 偶尔看到的一个SPSC 队列实现,十分清晰 https://github.com/rigtorp/SPSCQueue
  5. https://github.com/cameron314/concurrentqueue 这个实现就比较早了,实现的比较重
    1. 介绍博客 http://moodycamel.com/blog/2013/a-fast-lock-free-queue-for-c++
    2. http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
    3. http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
    4. 拓展阅读http://moodycamel.com/blog/2014/solving-the-aba-problem-for-lock-free-free-lists
    5. 作者的bench小工具https://github.com/cameron314/microbench
  6. https://github.com/cameron314/readerwriterqueue上面大神的SPSC队列实现
  7. https://rethinkdb.com/blog/lock-free-vs-wait-free-concurrency/ 7.看到一个更厉害的实现 https://max0x7ba.github.io/atomic_queue/html/benchmarks.html 更快 https://github.com/max0x7ba/atomic_queue

contact

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

Conflict Free Replicated Data Types 无冲突复制数据类型 CRDTs


  • 基于状态:(英文:state-based),即将各个节点之间的CRDT数据直接进行合并,所有节点都能最终合并到同一个状态,数据合并的顺序不会影响到最终的结果。
  • 基于操作:(英文:operation-based,可以简写为op-based)。将每一次对数据的操作通知给其他节点。只要节点知道了对数据的所有操作(收到操作的顺序可以是任意的),就能合并到同一个状态。

实际上,基于状态基于操作的CRDT已经在数学上被证明可以相互转换。

具体概念看参考链接2 ,图非常清晰

下面介绍几个比较有意思的CRDT类型。

G-Counter (Grow-only Counter)

这是一个只增不减的计数器。对于N个节点,每个节点上维护一个长度为N的向量$V={P_0, P_1, P2, …, P{n-1}}。。P_m表示节点m上的计数。当需要增加这个计数器时,只需要任意选择一个节点操作,操作会将对应节点的计数器表示节点m上的计数。当需要增加这个计数器时,只需要任意选择一个节点操作,操作会将对应节点的计数器P_m := P_m + 1。当要统计整个集群的计数器总数时,只需要对向量。当要统计整个集群的计数器总数时,只需要对向量V$中的所有元素求和即可。

举个例子,我们有A, B, C三个节点上分别部署了同一个G-Counter。初始状态如下:

A: {A: 0, B: 0, C: 0} = 0
B: {A: 0, B: 0, C: 0} = 0
C: {A: 0, B: 0, C: 0} = 0

每个节点都维护着其他节点的计数器状态,初始状态下,所有计数器都为0。现在我们假设有三个客户端a,b,c,a和b先后在节点A上增加了一次计数器,c在节点B上增加了一次计数器:

A: {A: 2, B: 0, C: 0} = 2
B: {A: 0, B: 1, C: 0} = 1
C: {A: 0, B: 0, C: 0} = 0

此时如果分别向A, B, C查询当前计数器的值,得到的结果分别是{2, 1, 0}。而实际上一共有3次增加计数器的操作,因此全局计数器的正确值应该为3,此时系统内状态是不一致的。不过没关系,我们追求的是最终一致性。假设经过一段时间,B向A发起了合并的请求:

A: {A: 2, B: 1, C: 0} = 3
B: {A: 2, B: 1, C: 0} = 3
C: {A: 2, B: 1, C: 0} = 3

经过合并后,A和B的计数器状态合并,现在从A和B读取到的计数器的值变为3。接下来C和A进行合并:

A: {A: 2, B: 1, C: 0} = 3
B: {A: 2, B: 1, C: 0} = 3
C: {A: 2, B: 1, C: 0} = 3

现在节点ABC都达到了相同的状态,从任意一个节点获取到的计数器值都为3。也就是说3个节点达成了最终一致性。

我们可以用以下伪代码描述G-Counter的逻辑:

payload integer[n] P
	initial [0, 0, 0, ..., 0]
update increment()
	let g = myId()
	P[g] := P[g] + 1 // 只更新当前节点对应的计数器
query value() : integer v
	let v = P[0] + P[1] + ... + P[n-1] // 将每个节点的计数器进行累加
merge (X, Y) : payload Z
	for i = 0, 1, ... , n-1:
		Z.P[i] = max(X.P[i], Y.P[i]) // 通过max操作来合并各个节点的计数器

G-Counter使用max()操作来进行各个状态的合并,我们知道函数max满足可交换性,结合性,幂等性,即:

  • 可交换性: max(X,Y)=max(Y,X)
  • 结合性: max(max(X,Y),Z)=max(X,max(Y,Z))
  • 幂等性: max(X,X)=X

所以G-Counter可以在分布式系统中使用,并且可以无冲突合并。

PN-Counter (Positive-Negative-Counter)

G-Counter有一个限制,就是计数器只能增加,不能减少。不过我们可以通过使用两个G-Counter来实现一个既能增加也能减少计数器(PN-Counter)。简单来说,就是用一个G-Counter来记录所有累加结果,另一个G-Counter来记录累减结果,需要查询当前计数器时,只需要计算两个G-Counter的差即可。

payload integer[n] P, integer[n] N
	initial [0, 0, ..., 0], [0, 0, ..., 0]
update increment()
	let g = myId()
	P[g] := P[g] + 1 // 只更新当前节点对应的计数器
update decrement()
	let g = myId()
	N[g] := N[g] + 1
query value() : integer v
	let v = sum(P) - sum(N) // 用P向量的和减去N向量的和
merge (X, Y) : payload Z
	for i = 0, 1, ... , n-1:
		Z.P[i] = max(X.P[i], Y.P[i]) // 通过max操作来合并各个节点的计数器
		Z.N[i] = max(N.P[i], Y.N[i])

Registers

register有assign()及value()两种操作

  • Last Write Wins -register(LWW-Register)

给每个assign操作添加unique ids,比如timestamps或者vector clock,使用max函数进行merge

  • Multi-valued -register(MV-Register)

类似G-Counter,每次assign都会新增一个版本,使用max函数进行merge

Sets

  • Grow-only set(G-Set)

使用union操作进行merge

  • Two-phase set(2P-Set)

使用两个G-Set来实现,一个addSet用于添加,一个removeSet用于移除

  • Last write wins set(LWW-element Set)

类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了timestamp信息,且timestamp较高的add及remove优先

  • Observed-remove set(OR-Set)

类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了tag信息,对于同一个tag的操作add优先于remove

其他数据类型

Array

关于Array有Replicated Growable Array(RGA),支持addRight(v, a)操作

Graph

Graph可以基于Sets结构实现,不过需要处理并发的addEdge(u, v)、removeVertex(u)操作

Map

Map需要处理并发的put、rmv操作


ref

  1. https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type

  2. http://galudisu.info/2018/06/06/akka/ddata/Akka-Distributed-Data-Deep-Dive/

  3. http://liyu1981.github.io/what-is-CRDT/

  4. https://lfwen.site/2018/06/09/crdt-counter/

  5. 详细介绍了各种CRDT实现https://juejin.im/post/5cd25e886fb9a0322758d23f

  6. 论文 https://hal.inria.fr/file/index/docid/555588/filename/techreport.pdf

  7. 一个系统的阅读导航http://christophermeiklejohn.com/crdt/2014/07/22/readings-in-crdts.html

    1. 博主的另外两篇导航也很不错
      1. http://christophermeiklejohn.com/distributed/systems/2013/07/12/readings-in-distributed-systems.html
      2. http://christophermeiklejohn.com/linear/logic/2014/01/04/readings-in-linear-logic.html

contact

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

最近遇到的几个打包问题


这活真恶心。瞎忙还没意义。

编译libhdfs提示找不到xml2

错误提示

CMake Error at cmake-3.10.3/share/cmake-3.10/Modules/FindPackageHandleStandardArgs.cmake:137 (message):  
 Could NOT find LibXml2 (missing: LIBXML2_LIBRARY LIBXML2_INCLUDE_DIR)

用的cmake版本是3.10.

编译机是docker容器。本地测试好使

试着安装库依赖
yum install -y libxml2-devel

依旧不好使。我在其他环境的docker机器上测试cmake 3 cmake2都没问题

试着看cmakeerr log

看到了错误日志 发现 dladdr 连接错误,加上链接ldl修了半天才发现方向不对

手动指定 LIBXML2_LIBRARY LIBXML2_INCLUDE_DIR目录到bootstrap,cmake命令家伙是哪个

-DLIBXML2_INCLUDE_DIR=/usr/include/libxml2/ -DLIBXML2_LIBRARY=/usr/lib64/

,但是编译不过

回到开头

安装的日志是这样的

--> Running transaction check  
---> Package libxml2-devel.x86_64 0:2.9.1-6.3 will be installed  
--> Processing Dependency: libxml2 = 2.9.1-6.3 for package: libxml2-devel-2.9.1-6.3.x86_64  
--> Processing Dependency: xz-devel for package: libxml2-devel-2.9.1-6.3.x86_64  
--> Running transaction check  
---> Package libxml2.x86_64 0:2.9.1-6.el7_2.3 will be updated  
--> Processing Dependency: libxml2 = 2.9.1-6.el7_2.3 for package: libxml2-python-2.9.1-6.el7_2.3.x86_64  
---> Package libxml2.x86_64 0:2.9.1-6.3 will be an update  
---> Package xz-devel.x86_64 0:5.2.2-1.el7 will be installed  
--> Running transaction check  
---> Package libxml2-python.x86_64 0:2.9.1-6.el7_2.3 will be updated  
---> Package libxml2-python.x86_64 0:2.9.1-6.3.h3 will be an update  
--> Processing Dependency: libxml2 = 2.9.1-6.3.h3 for package: libxml2-python-2.9.1-6.3.h3.x86_64  
--> Running transaction check  
---> Package libxml2.x86_64 0:2.9.1-6.el7_2.3 will be updated  
---> Package libxml2.x86_64 0:2.9.1-6.el7_2.3 will be updated  
---> Package libxml2.x86_64 0:2.9.1-6.3 will be an update  
--> Processing Dependency: libxml2 = 2.9.1-6.3 for package: libxml2-devel-2.9.1-6.3.x86_64  
---> Package libxml2.x86_64 0:2.9.1-6.3.h3 will be an update  
--> Finished Dependency Resolution  
 You could try using --skip-broken to work around the problem  
 You could try running: rpm -Va --nofiles --nodigest  

卧槽,没装成功。妈的

加上–skip-broken还是不行

错误冲突实际上是这样的

Error: Package: libxml2-devel-2.9.1-6.3.x86_64 (current)  
          Requires: libxml2 = 2.9.1-6.3  
          Installed: libxml2-2.9.1-6.3.h3.x86_64 (@stable)  
               libxml2 = 2.9.1-6.3.h3  
          Available: libxml2-2.9.1-6.el7_2.3.i686 (base)  
               libxml2 = 2.9.1-6.el7_2.3  
          Available: libxml2-2.9.1-6.3.x86_64 (current)  
               libxml2 = 2.9.1-6.3 

试着删掉libxml2提示 yum is protecte,换另外的删除方法

rpm -e --nodeps libxml2

但是会报错

 There was a problem importing one of the Python modules  
 required to run yum. The error leading to this problem was:  
   
    libxml2.so.2: cannot open shared object file: No such file or directory  
   
 Please install a package which provides this module, or  
 verify that the module is installed correctly.  
   
 It's possible that the above module doesn't match the  
 current version of Python, which is:  
 2.7.5 (default, Nov  6 2016, 00:28:07)   
 [GCC 4.8.5 20150623 (Red Hat 4.8.5-11)]  
   
 If you cannot solve this problem yourself, please go to   
 the yum faq at:

yum完全就不工作了。

唉。我最后自带了一个xml源码包自己编。

RPM打包

我只是想把编好的二进制和相关lib库打包后放到rpm里。网上搜了一天。各种从头开始。各种源码开始编。一点意义都没有。

rpmbuild打包就有三种模式,ba bb bs。还有各种奇怪的宏。还有各种make install流程。很容易就陷入这些没用的细节中。我根本都不需要的。最终我使用了bs。就当我打包源码包了。

另外rpmbuild默认目录是root,还需要指定

rpmbuild --define '_topdir ./rpmbuild' -bs rpmbuild/SPECS/rpm.spec

另外,解压rpm

rpm2cpio xxx.rpm | cpio -div

又遇到了个问题,要求目录。只能从二进制打包,又浪费一上午

照着这个文档看了半天 http://kuanghy.github.io/2015/11/13/rpmbuild

我的步骤里总会删掉buildroot,原来还需要install流程里加二进制。

最后发一个我的模板

Name:  xx
Version:	1.0
Release:	1%{?dist}
Summary: xx binary
#Group:		
License: WTF	
 
%install
mkdir -p  %{buildroot}/dir/dir/somedir/
cp -a %(pwd)/your.tar.gz %{buildroot}/dir/dir/somedir/
echo "install done"

%files
%defattr (-,root,root,-)
/dir/

然后打包,bb是二进制打包,之前用的bs 源码打包

rpmbuild --define '_topdir ./rpmbuild' --buildroot=$(pwd)/rpmbuild/BUILDROOT -bb rpm.spec

ref

  1. 找不到iddr https://github.com/ContinuumIO/libhdfs3-downstream/issues/9 严重误导我。没有没啥影响。

  2. https://www.mail-archive.com/dev@hawq.incubator.apache.org/msg04104.html 严重误导

  3. https://stackoverflow.com/questions/12993460/why-am-i-getting-undefined-reference-to-dladdr-even-with-ldl-for-this-simpl 严重误导。不是同一个场景。

  4. https://github.com/libical/libical/issues/248 这个链接让我误以为装了glibc2-devel就能找到xml2了。没仔细读就用,误导

  5. https://gitlab.kitware.com/cmake/cmake/issues/18078 这个给了我一点思路,我可以定义这两个变量

  6. https://trac.macports.org/ticket/55386 误导

  7. https://stackoverflow.com/questions/15799047/trying-to-remove-yum-which-is-protected-in-centos 删除包

  8. https://serverfault.com/questions/742160/conflicts-in-files-when-installing-libxml2

  9. https://www.centos.org/forums/viewtopic.php?t=68202 这个和我遇到的问题差不多,但是是arm机器

  10. https://forums.cpanel.net/threads/requires-libxml2-update-error.636661/ 真正的原因,但没说解决办法

  11. https://stackoverflow.com/questions/416983/why-is-topdir-set-to-its-default-value-when-rpmbuild-called-from-tcl rpmbuild更改目录的办法

  12. 网上一大堆rpm详解封装。都是翻译的外国的文章。说不到重点。比如

    1. https://blog.csdn.net/Nedved_L/article/details/78548101
    2. https://blog.csdn.net/u012373815/article/details/73257754
    3. https://www.iarno.cn/2019/04/22/RPM%E6%89%93%E5%8C%85/ 这个是写得好的。不过我也没用上
    4. https://stackoverflow.com/questions/42773687/creating-rpm-spec-file-from-compiled-binary-files 没啥用
  • https://stackoverflow.com/questions/39743808/cannot-make-rpm-from-spec-file-file-not-found-errors 这个有点用

contact

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

^