Under the hood of Spark Shuffle Service

2022-10-30

Motivation

在使用了建立在 Yarn NodeManager 上的 Spark external shuffle service 后,虽然解决了 Spark executor 的容错性问题,但是却因为大量的小文件随机读,使得单台 nodemanager 的 disk io 利用率达到 100%,从而可能导致 executor 端fetch shuffle data 时候,出现 fetch failed exception。在某些波峰时,大量 Spark 任务失败

如下图
image
image

另外在我们的场景下,因为在引入了 elastic NodeManager on K8s, 使得 external shuffle service 在弹性 NM decommission 下线时,可能会对任务造成较大的波动,特别是在弹性节点占据集群中较大容量时。

基于上述背景,计划深入研究 spark external shuffle service 和后续的改进方案,用以增强 Spark 在大规模分布式计算下的稳定性。

问题点

Shuffle 作为影响分布式计算下多节点数据交互的关键因素,是一个热门的优化点,主要从以下几个问题点来做

  1. 磁盘的碎片读写,Spill多次写磁盘和Reduce只拉取部分Partition数据,影响效率

  2. Reduce 读取Map端本地数据,需要 MxR 次远程网络读,影响稳定性

  3. 因为 ESS 的数据存储机制,导致随机读放大,Disk IO 容易被打满

下文将此这三个点作为问题,贯穿文章。

Spark external shuffle service 原理 (version < 3.2)

Spark 为了解决 executor 在完成 task 后,能够顺利退出。因此引入了 external shuffle service 的机制,使得 shuffle data 能够托管于 ESS,executor 的生命周期 < shuffle data。主要有如下几个步骤(magnet 论文中已经概括)
image

  1. 每个 Spark executor 一旦启动,就会注册到 Yarn nodemanager 上的 external shuffle service 上。这种注册方式使得 ess 能够知道 executor map 物化的 shuffle 数据的位置。请注意,Spark ESS 不与 Spark executor 绑定,并在多个 Spark app 间共享

  2. shuffle map stage 阶段中的每个任务都处理其数据部分。在 map 任务结束时,它会生成一对文件,一个是 shuffle data, 另一个是 shuffle data 的索引文件。Map task 将会通过使用 hash 过 partition key 来排序所有转换过的 record。在这个过程中,map task 将写入临时数据到磁盘上,如果它还没有排序完成。一旦排序完成,shuffle data file 将会被生成,属于同一个下游 reduce partition 的数据 records 将会被聚合到一个 shuffle block 上。对应的记录着 block offset 的 shuffle 索引文件也将会一同被生成。

  3. 当下一个 reduce 的 stage 开始运行时,他们将会查询 spark driver 来获取到 input shuffle block. 一旦信息可用,每一个 reduce task 将会和对应的 spark ess 实例建立连接,来 fetch data. 对于 ESS, 一旦接收到请求,将会通过 shuffle 索引文件来定位到 offset, 直接 skip 到对应的 shuffle data block 位置,从磁盘上读取,并发回 reduce task.

按照 Linkedin 的时间来看,存在两个问题

  1. 随机读造成Disk IO 压力过大,其中因素包括 small shuffle block 太多、随机读
  2. M*R 的数据访问连接,造成不稳定

从我们的实践来看,也存在上述问题。同时在弹性混部场景下,ESS 也不适用。

Push-based shuffle service (Managet) 原理

Magnet 为了解决上述问题,引入一个 push-based shuffle service. 通过在 Map 侧将同一个 partition 的数据 push -> reduce partition 上,解决了 data locality + disk io 效率不高的问题。架构图如下
image

从一个下游 reduce partition 上的 magnet shuffle service 的视角来看,通过将不同 map 侧 push 过来的数据 merge 到一起,可以直接让 reduce task 顺序读取。从而实现 RSS 号称的随机读转换为顺序读,顺便还实现了数据本地性。

这种 push based 的方式和下述的 RSS 实现是异曲同工的,简单点讲,其实在 Spark 上实现 shuffle service, 大部分都是在 merge/compaction 的操作,方便 reduce task 一次性读取。这一点优化,有点类似于 Spark 早期将 task 产出的数据分为 partition-N 份,优化为使用 sort-based merge的操作,降低本地的小文件存储。

几个注意点

  1. Magnet 的部署和当前 ESS 兼容
  2. Magnet 的数据将会存储两份(准确来说是<2),一份在 shuffle write 端(map),一份在 shuffle read 端(reduce)。因此理论上,任意一端丢失或者不完整,都不会重算。但加大了存储量,linkedin 通过 zstd 压缩 shuffle data 来优化, 实现只增加了百分之30的存储量
  3. Magnet 为了解决部分 merge task 缓慢和数据倾斜,都有超时取消部分 merge block 的操作
  4. Magnet 兼容 Dynamic Allocation,优化为 executor 感知适应 magnet location 分配的策略

Magnet 目前已经合并到 Spark 3.1 版本中,优点是运维简单,无需维护额外的服务;缺点是无法适用于弹性混部的场景

Remote shuffle service

Tencent firestorm

Firestorm 由腾讯开源,通过 coordinator cluster + shuffle server cluster 主要这两个组件实现。
coordinator 负责采集 shuffle server 的状态数据,并且给 spark task 分配数据写入的 shuffle server。
shuffle server 负责从 spark executor 上接收写入数据,并 merge 后落盘。

目前 firestorm 支持 内存+本地磁盘、内存+HDFS、本地磁盘、HDFS 这4种模式。local disk only 从介绍上看,是腾讯内部广泛使用的。

Firestorm 需要独立部署服务,一般可以直接部署在物理机的 NM 上。架构图如下
image

Firestorm 写入数据的步骤如下

image

  1. task将 record 数据发送到 buffer manager 中

  2. 当 buffer 满了的时候,刷写 buffer 数据到 executor 上的 data queue 中

  3. 维护一个 thread pool, 先向远端 shuffle server 获取到可用的内存块。获取成功后,取用 queue 中的数据后远程写入到 shuffle server 中。

  4. shuffle server 接收到数据后,先放置在 memory 中,等到 buffer 满了后,再进行落盘。落盘分别写 index file 和 data file.

  5. 落盘后,task 经汇报所有的需写入的 blockid 给 shuffle server。shuffle server 同步等待落盘成功后,再返回。这一步也会存储数据元信息在 shuffle server 侧,方便后续数据校验

Firestorm 读取数据步骤

如果是 shuffle server local disk,则 client 向 shuffle server 读取数据;
如果是存储到 HDFS, 则 client 不经过 shuffle server, 直接与 HDFS 交互读取数据。

未测试性能,下述为腾讯分享的性能估算

  1. shuffle data 只有几十兆的情况下,性能降低 15%

  2. shuffle data 在有几十G的情况下,性能有 3-4 倍提升

开源地址: https://github.com/tencent/firestorm

Aliyun RSS

同样采用的是 master-slave 架构,从架构介绍上来看,对于 worker 节点的磁盘管控、滚动升级做了很多优化。也是支持本地盘和 HDFS 存储。同时 shuffle 数据存储在本地盘的时候,支持在 worker 端 replication。

架构图如下:
image

shuffle 主要过程如下

  1. Mapper在首次PushData时请求Master分配Worker资源,Worker记录自己所需要服务的Partition列表。
  2. Mapper把Shuffle数据缓存到内存,超过阈值时触发Push。
  3. 隶属同个Partition的数据被Push到同一个Worker做合并,主Worker内存接收到数据后立即向从Worker发起Replication,数据达成内存两副本后即向Client发送ACK,Flusher后台线程负责刷盘。
  4. Mapper Stage运行结束,MetaService向Worker发起CommitFiles命令,把残留在内存的数据全部刷盘并返回文件列表。
  5. Reducer从对应的文件列表中读取Shuffle数据。

整体架构思路和 firestorm 基本都是一致的,只在于说性能如何?

开源地址https://github.com/alibaba/RemoteShuffleService

OPPO shuttle

oppo shuttle 只支持分布式文件系统来作为 shuffle 存储。因此暂不做考虑

开源地址https://github.com/oppo-bigdata/shuttle

Reference

  1. http://www.vldb.org/pvldb/vol13/p3382-shen.pdf
  2. https://engineering.linkedin.com/blog/2020/introducing-magnet
  3. https://zhuanlan.zhihu.com/p/443752106
  4. https://issues.apache.org/jira/browse/SPARK-30602
  5. https://www.bilibili.com/s/video/BV1Ah411x7ay (腾讯firestorm介绍)
  6. Shuttle:高可用 高性能 Spark Remote Shuffle Service