• 基于状态:(英文:state-based),即将各个节点之间的CRDT数据直接进行合并,所有节点都能最终合并到同一个状态,数据合并的顺序不会影响到最终的结果。
  • 基于操作:(英文:operation-based,可以简写为op-based)。将每一次对数据的操作通知给其他节点。只要节点知道了对数据的所有操作(收到操作的顺序可以是任意的),就能合并到同一个状态。

实际上,基于状态基于操作的CRDT已经在数学上被证明可以相互转换。

具体概念看参考链接2 ,图非常清晰

下面介绍几个比较有意思的CRDT类型。

G-Counter (Grow-only Counter)

这是一个只增不减的计数器。对于N个节点,每个节点上维护一个长度为N的向量$V={P_0, P_1, P2, …, P{n-1}}。。P_m表示节点m上的计数。当需要增加这个计数器时,只需要任意选择一个节点操作,操作会将对应节点的计数器表示节点m上的计数。当需要增加这个计数器时,只需要任意选择一个节点操作,操作会将对应节点的计数器P_m := P_m + 1。当要统计整个集群的计数器总数时,只需要对向量。当要统计整个集群的计数器总数时,只需要对向量V$中的所有元素求和即可。

举个例子,我们有A, B, C三个节点上分别部署了同一个G-Counter。初始状态如下:

A: {A: 0, B: 0, C: 0} = 0
B: {A: 0, B: 0, C: 0} = 0
C: {A: 0, B: 0, C: 0} = 0

每个节点都维护着其他节点的计数器状态,初始状态下,所有计数器都为0。现在我们假设有三个客户端a,b,c,a和b先后在节点A上增加了一次计数器,c在节点B上增加了一次计数器:

A: {A: 2, B: 0, C: 0} = 2
B: {A: 0, B: 1, C: 0} = 1
C: {A: 0, B: 0, C: 0} = 0

此时如果分别向A, B, C查询当前计数器的值,得到的结果分别是{2, 1, 0}。而实际上一共有3次增加计数器的操作,因此全局计数器的正确值应该为3,此时系统内状态是不一致的。不过没关系,我们追求的是最终一致性。假设经过一段时间,B向A发起了合并的请求:

A: {A: 2, B: 1, C: 0} = 3
B: {A: 2, B: 1, C: 0} = 3
C: {A: 2, B: 1, C: 0} = 3

经过合并后,A和B的计数器状态合并,现在从A和B读取到的计数器的值变为3。接下来C和A进行合并:

A: {A: 2, B: 1, C: 0} = 3
B: {A: 2, B: 1, C: 0} = 3
C: {A: 2, B: 1, C: 0} = 3

现在节点ABC都达到了相同的状态,从任意一个节点获取到的计数器值都为3。也就是说3个节点达成了最终一致性。

我们可以用以下伪代码描述G-Counter的逻辑:

payload integer[n] P
	initial [0, 0, 0, ..., 0]
update increment()
	let g = myId()
	P[g] := P[g] + 1 // 只更新当前节点对应的计数器
query value() : integer v
	let v = P[0] + P[1] + ... + P[n-1] // 将每个节点的计数器进行累加
merge (X, Y) : payload Z
	for i = 0, 1, ... , n-1:
		Z.P[i] = max(X.P[i], Y.P[i]) // 通过max操作来合并各个节点的计数器

G-Counter使用max()操作来进行各个状态的合并,我们知道函数max满足可交换性,结合性,幂等性,即:

  • 可交换性: max(X,Y)=max(Y,X)
  • 结合性: max(max(X,Y),Z)=max(X,max(Y,Z))
  • 幂等性: max(X,X)=X

所以G-Counter可以在分布式系统中使用,并且可以无冲突合并。

PN-Counter (Positive-Negative-Counter)

G-Counter有一个限制,就是计数器只能增加,不能减少。不过我们可以通过使用两个G-Counter来实现一个既能增加也能减少计数器(PN-Counter)。简单来说,就是用一个G-Counter来记录所有累加结果,另一个G-Counter来记录累减结果,需要查询当前计数器时,只需要计算两个G-Counter的差即可。

payload integer[n] P, integer[n] N
	initial [0, 0, ..., 0], [0, 0, ..., 0]
update increment()
	let g = myId()
	P[g] := P[g] + 1 // 只更新当前节点对应的计数器
update decrement()
	let g = myId()
	N[g] := N[g] + 1
query value() : integer v
	let v = sum(P) - sum(N) // 用P向量的和减去N向量的和
merge (X, Y) : payload Z
	for i = 0, 1, ... , n-1:
		Z.P[i] = max(X.P[i], Y.P[i]) // 通过max操作来合并各个节点的计数器
		Z.N[i] = max(N.P[i], Y.N[i])

Registers

register有assign()及value()两种操作

  • Last Write Wins -register(LWW-Register)

给每个assign操作添加unique ids,比如timestamps或者vector clock,使用max函数进行merge

  • Multi-valued -register(MV-Register)

类似G-Counter,每次assign都会新增一个版本,使用max函数进行merge

Sets

  • Grow-only set(G-Set)

使用union操作进行merge

  • Two-phase set(2P-Set)

使用两个G-Set来实现,一个addSet用于添加,一个removeSet用于移除

  • Last write wins set(LWW-element Set)

类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了timestamp信息,且timestamp较高的add及remove优先

  • Observed-remove set(OR-Set)

类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了tag信息,对于同一个tag的操作add优先于remove

其他数据类型

Array

关于Array有Replicated Growable Array(RGA),支持addRight(v, a)操作

Graph

Graph可以基于Sets结构实现,不过需要处理并发的addEdge(u, v)、removeVertex(u)操作

Map

Map需要处理并发的put、rmv操作


ref

  1. https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type

  2. http://galudisu.info/2018/06/06/akka/ddata/Akka-Distributed-Data-Deep-Dive/

  3. http://liyu1981.github.io/what-is-CRDT/

  4. https://lfwen.site/2018/06/09/crdt-counter/

  5. 详细介绍了各种CRDT实现https://juejin.im/post/5cd25e886fb9a0322758d23f

  6. 论文 https://hal.inria.fr/file/index/docid/555588/filename/techreport.pdf

  7. 一个系统的阅读导航http://christophermeiklejohn.com/crdt/2014/07/22/readings-in-crdts.html

    1. 博主的另外两篇导航也很不错
      1. http://christophermeiklejohn.com/distributed/systems/2013/07/12/readings-in-distributed-systems.html
      2. http://christophermeiklejohn.com/linear/logic/2014/01/04/readings-in-linear-logic.html

contact