Conflict Free Replicated Data Types 无冲突复制数据类型 CRDTs
- 基于状态:(英文:state-based),即将各个节点之间的CRDT数据直接进行合并,所有节点都能最终合并到同一个状态,数据合并的顺序不会影响到最终的结果。
- 基于操作:(英文:operation-based,可以简写为op-based)。将每一次对数据的操作通知给其他节点。只要节点知道了对数据的所有操作(收到操作的顺序可以是任意的),就能合并到同一个状态。
实际上,基于状态和基于操作的CRDT已经在数学上被证明可以相互转换。
具体概念看参考链接2 ,图非常清晰
下面介绍几个比较有意思的CRDT类型。
G-Counter (Grow-only Counter)
这是一个只增不减的计数器。对于N个节点,每个节点上维护一个长度为N的向量$V={P_0, P_1, P2, …, P{n-1}}。。P_m表示节点m上的计数。当需要增加这个计数器时,只需要任意选择一个节点操作,操作会将对应节点的计数器表示节点m上的计数。当需要增加这个计数器时,只需要任意选择一个节点操作,操作会将对应节点的计数器P_m := P_m + 1。当要统计整个集群的计数器总数时,只需要对向量。当要统计整个集群的计数器总数时,只需要对向量V$中的所有元素求和即可。
举个例子,我们有A, B, C三个节点上分别部署了同一个G-Counter。初始状态如下:
A: {A: 0, B: 0, C: 0} = 0
B: {A: 0, B: 0, C: 0} = 0
C: {A: 0, B: 0, C: 0} = 0
每个节点都维护着其他节点的计数器状态,初始状态下,所有计数器都为0。现在我们假设有三个客户端a,b,c,a和b先后在节点A上增加了一次计数器,c在节点B上增加了一次计数器:
A: {A: 2, B: 0, C: 0} = 2
B: {A: 0, B: 1, C: 0} = 1
C: {A: 0, B: 0, C: 0} = 0
此时如果分别向A, B, C查询当前计数器的值,得到的结果分别是{2, 1, 0}。而实际上一共有3次增加计数器的操作,因此全局计数器的正确值应该为3,此时系统内状态是不一致的。不过没关系,我们追求的是最终一致性。假设经过一段时间,B向A发起了合并的请求:
A: {A: 2, B: 1, C: 0} = 3
B: {A: 2, B: 1, C: 0} = 3
C: {A: 2, B: 1, C: 0} = 3
经过合并后,A和B的计数器状态合并,现在从A和B读取到的计数器的值变为3。接下来C和A进行合并:
A: {A: 2, B: 1, C: 0} = 3
B: {A: 2, B: 1, C: 0} = 3
C: {A: 2, B: 1, C: 0} = 3
现在节点ABC都达到了相同的状态,从任意一个节点获取到的计数器值都为3。也就是说3个节点达成了最终一致性。
我们可以用以下伪代码描述G-Counter的逻辑:
payload integer[n] P
initial [0, 0, 0, ..., 0]
update increment()
let g = myId()
P[g] := P[g] + 1 // 只更新当前节点对应的计数器
query value() : integer v
let v = P[0] + P[1] + ... + P[n-1] // 将每个节点的计数器进行累加
merge (X, Y) : payload Z
for i = 0, 1, ... , n-1:
Z.P[i] = max(X.P[i], Y.P[i]) // 通过max操作来合并各个节点的计数器
G-Counter使用max()操作来进行各个状态的合并,我们知道函数max满足可交换性,结合性,幂等性,即:
- 可交换性: max(X,Y)=max(Y,X)
- 结合性: max(max(X,Y),Z)=max(X,max(Y,Z))
- 幂等性: max(X,X)=X
所以G-Counter可以在分布式系统中使用,并且可以无冲突合并。
PN-Counter (Positive-Negative-Counter)
G-Counter有一个限制,就是计数器只能增加,不能减少。不过我们可以通过使用两个G-Counter来实现一个既能增加也能减少计数器(PN-Counter)。简单来说,就是用一个G-Counter来记录所有累加结果,另一个G-Counter来记录累减结果,需要查询当前计数器时,只需要计算两个G-Counter的差即可。
payload integer[n] P, integer[n] N
initial [0, 0, ..., 0], [0, 0, ..., 0]
update increment()
let g = myId()
P[g] := P[g] + 1 // 只更新当前节点对应的计数器
update decrement()
let g = myId()
N[g] := N[g] + 1
query value() : integer v
let v = sum(P) - sum(N) // 用P向量的和减去N向量的和
merge (X, Y) : payload Z
for i = 0, 1, ... , n-1:
Z.P[i] = max(X.P[i], Y.P[i]) // 通过max操作来合并各个节点的计数器
Z.N[i] = max(N.P[i], Y.N[i])
Registers
register有assign()及value()两种操作
- Last Write Wins -register(
LWW-Register
)
给每个assign操作添加unique ids,比如timestamps或者vector clock,使用max函数进行merge
- Multi-valued -register(
MV-Register
)
类似G-Counter,每次assign都会新增一个版本,使用max函数进行merge
Sets
- Grow-only set(
G-Set
)
使用union操作进行merge
- Two-phase set(
2P-Set
)
使用两个G-Set来实现,一个addSet用于添加,一个removeSet用于移除
- Last write wins set(
LWW-element Set
)
类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了timestamp信息,且timestamp较高的add及remove优先
- Observed-remove set(
OR-Set
)
类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了tag信息,对于同一个tag的操作add优先于remove
其他数据类型
Array
关于Array有Replicated Growable Array(
RGA
),支持addRight(v, a)操作
Graph
Graph可以基于Sets结构实现,不过需要处理并发的addEdge(u, v)、removeVertex(u)操作
Map
Map需要处理并发的put、rmv操作
ref
-
https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type
-
http://galudisu.info/2018/06/06/akka/ddata/Akka-Distributed-Data-Deep-Dive/
-
http://liyu1981.github.io/what-is-CRDT/
-
https://lfwen.site/2018/06/09/crdt-counter/
-
详细介绍了各种CRDT实现https://juejin.im/post/5cd25e886fb9a0322758d23f
-
论文 https://hal.inria.fr/file/index/docid/555588/filename/techreport.pdf
-
一个系统的阅读导航http://christophermeiklejohn.com/crdt/2014/07/22/readings-in-crdts.html
- 博主的另外两篇导航也很不错
- http://christophermeiklejohn.com/distributed/systems/2013/07/12/readings-in-distributed-systems.html
- http://christophermeiklejohn.com/linear/logic/2014/01/04/readings-in-linear-logic.html
- 博主的另外两篇导航也很不错
- https://interjectedfuture.com/crdts-turned-inside-out/
- https://interjectedfuture.com/trade-offs-between-different-crdts/