Naiad A Timely Dataflow System
这里的epoch技术是不是也是bw-tree的那个技术?一招鲜吃遍天??
流式/增量
- 支持反馈的有结构循环
- 有状态的数据流节点,能够在没有全局协调(global coordination)的情况下消费和生产记录(record)
- 当达到某个循环次数或者遍历完所有数据之后通知所有节点
Timely Dataflow
Timely dataflow基于有向图:
- 顶点是有状态的,可沿着边发送/接收有逻辑时间的消息
- 图可包含环,且可嵌套,时间戳反映不同的输入时期(epoch)和循环迭代
生成的模型:
- 支持并发执行不同时期和循环迭代的数据
- 当所有的消息都被传输后(根据某个时间戳),顶点支持显式的通知
2.1. 图结构
2.1.1. 图结构的顶点
-
输入顶点:接收外部生产者的消息
- 输入的消息会标记epoch,外部生产者可通知顶点该epoch不会有消息进来,且当所有epoch消息都不会进来时可以“关闭”顶点
-
输出顶点:将消息输出给外部的消费者
- 输出的消息也会标记epoch,顶点可通知外部消费者该epoch的消息不会出来,且当所有epoch消息都不会出来时可以“关闭”顶点
2.1.2. 嵌套循环上下文
包含3个系统相关的顶点
- 入口顶点:进入该上下文必须经过该顶点
- 出口顶点:离开该上下文必须经过该顶点
- 反馈顶点:每个循环至少有1个,它不会嵌套在任何内部循环上下文中
2.1.3. 时间戳
消息时间戳t
可定义为下面2个元素的元组:
- e* ∈ℕ:Epoch号
c*⃗ =⟨*c*1,*c*2,…,*c*k*⟩∈ℕ*k* :Loop counter向量,每个维度对应对应循环上下文的循环次数,可用于区分循环上下文,且可跟踪循环进度
上述3个节点处理时间戳的结果如下(对于第k个循环上下文):
顶点 | 输入时间戳 | 输出时间戳 |
---|---|---|
入口顶点 | (e,⟨c1,c2,…,c**k⟩) | (e,⟨c1,c2,…,c**k,0⟩) |
出口顶点 | (e,⟨c1,c2,…,ck*,*ck+1⟩) | (e,⟨c1,c2,…,c**k⟩) |
反馈顶点 | (e,⟨c1,c2,…,c**k⟩) | (e,⟨c1,c2,…,c**k+1⟩) |
时间戳的比较:当且仅当e1≤e2
且c1→≤c2→(根据字典序比较)时,t1≤t2
2.2. 顶点计算
每个顶点v
实现下面2个回调:
v.OnRecv(edge, msg, timestamp)
:收到来自edge
的消息v.OnNotify(timestamp)
:没有更多<= timestamp
的消息到来
回调可被自定义(重写)
回调上下文中,可能会调用下面2个系统提供的方法:
this.SendBy(edge, msg, timestamp)
:向一条边发送消息this.NotifyAt(timestamp)
:在timestamp
时候,进行调用通知
调用关系是:
u.SendBy(e, m, t) -> v.OnRecv(e, m, t)
,e
为u->v
的边v.NotifyAt(t) -> v.OnNotify(t)
- 每个顶点的回调都会被排队,但必须保证:
- 当
t' <= t
时,v.OnNotify(t)
发生在v.OnRecv(e, m, t')
后,因为前者是作为t
之前所有消息都已收到,不会再来的信号。重写的回调也得满足这个要求。
- 当
2.3. 达成Timely Dataflow
本节主要讲系统如何推断具有给定时间戳的未来消息的不可能性(即推断“通知”的安全性),并提供单线程的实现。
2.3.1. Pointstamp
任何时刻,未来消息的时间戳受到当前未处理事件(消息和通知请求)以及图结构的限制:
- 消息只会沿边传输,且时间戳仅会被2.1.3.小节的三种顶点修改。
- 由于事件的发送不能产生时间回溯,因此可以计算事件产生的消息时间戳的下界。将这种算法应用到未处理事件上,则可判断顶点通知是否正确
这里定义Pointstamp,对应每个事件:(t∈Timestamp,l∈Edge*∪*Vertex)
- 对于
v.SendBy(e, m, t)
,对应的是(t, e)
- 对于
v.NotifyAt(t)
,对应的是(t, v)
一些结论和定义:
- 当且仅当满足下面条件时,(t1,l1)
could-result-in(t2,l2):
- 存在一条路径ψ=⟨l1,…,l2⟩
,根据这条路径,时间戳ψ(t1)≤t2,左边表示t1
- 仅被路径上的3类节点修改的时间戳
Path Summary:l1
到l2的时间戳变化的函数
- 可以保证若两位置存在多条路径,它们的summary必然不一样,其中一条总会比另一台更早产生调整后的时间戳
- 可以找到最小的path summary,将路径记为ψ[l1,l2] 因此,检测could-result-in,只需检测$\psil_1, l_2 \le t_2$即可
2.3.2. 单线程实现
调度器维护一组活跃的pointstamp,每个元素包含2个计数器:
- Occurrence count(
OC
):未完成的事件发生个数 - Precursor count(
PC
):could-result-in顺序下,前面的pointstamp个数
当顶点产生和撤销事件时,OC
根据下面更新:
v.SendBy(e,m,t)
前,v.NotifyAt(t)
前:OC[(t,e/v)] += 1
v.OnRecv(e,m,t)
后,v.OnNotify(t)
后:OC[(t,e/v)] -= 1
当pointstamp活跃时,PC
根据下面初始化:
- 置为已有could-result-in的活跃pointstamp个数
- 同时,增加当前pointstamp之后的pointstamp
PC
值
当pointstamp不活跃时:
-
OC
值为0,移除活跃pointstamp集合 -
递减之后的pointstamp
PC
值当
PC
值为0,则该pointstamp为frontier,调度器可将任何通知发给它
系统初始化时,在下面位置初始化一个pointstamp:
- 位置为每个输入顶点
- 时间戳为:第一个epoch,以及全为0的loop count
OC
为1,PC
为0
当输入节点的输入完毕时:
- 若epoch
e
完毕,则创建e+1
的pointstamp,删除原有的pointstamp - 通知下游epoch
e
的消息已经输入完毕 - 若输入节点关闭时,删除当前位置的所有pointstamp,允许输入到下游的事件最终可从计算中消失
有个rust实现 https://github.com/TimelyDataflow/timely-dataflow
这里有个faster的rust封装,目的是让timely-dataflow做存储后端,延迟提升很好 论文 代码
RDD 擅长什么
-
擅长
-
- 对数据集中所有元素做同样操作的批处理应用
- 高效地记住每个转换(lineage图中的每一步)
- 不需要备份大量的数据就可以恢复丢失的分区
-
不擅长
-
- 需要对共享的状态做异步细粒度更新(fine-grained)的应用
Timely Dataflow
- 一个有向图,有状态节点通过有向边发送和接受带有逻辑时间戳(logically timestamped)的消息
- 由于每个loop都会有自己的timestamp,利用这个可以区分数据在哪个epoch和loop iterations,也支持嵌套循环
- 支持在不同epochs和iterations间的并发执行
- 当一个特定的timestamp的所有messages交付之后,向节点发出通知
如何实现低延迟
- 采用消息机制作为编程模型
v.SENDBY(edge, message, time)
v.ONRECV(edge, message, time)
v.NOTIFYAT(time)
v.ONNOTIFY(time)
\2. 异步细粒度执行
\3. 分布式进度追踪协议
参考
- https://keys961.github.io/2019/04/19/%E8%AE%BA%E6%96%87%E9%98%85%E8%AF%BB-Naiad/