Apache Uniffle(incubating) 0.7 版本 AQE 优化

2022-12-14

引言

在应用 uniffle 到我们内部集群时,使用的还是 0.5 版本(此时 uniffle 还未加入 Apache 基金会, 原名 Tencent/Firestorm)。Uniffle 是在我们内部 Spark3.x 上应用,但是腾讯内部仍是大量 Spark2.x 的任务,因为未对 Spark3.x 做优化。在此过程中,我们作为吃螃蟹的人,踩了很多坑,也做了一些针对性的优化。本文讲述的主要优化均已在 uniffle 0.7 版本发布,详见:https://uniffle.apache.org/download/release-notes-0.7.0

AQE

Spark 中的 shuffle 一直是重点的性能优化点,又因为 join 的具体实现与 shuffle 机制密切相关,因此如果要详细优化 uniffle 的 shuffle, 必须对 join 有一定的了解。

那么我们先从 spark 的 sort merge join 聊起,再引申到 RSS 如何更好匹配 Spark 的shuffle 提升性能

Join 与 Shuffle

大数据的快建立在能对数据进行分区,且并行计算的基础上。但是 join 却因为需要聚合数据,因此有 join 的任务一般都具有 shuffle 的阶段,同时也是任务的最大耗时点。

Spark 默认使用的就是 SortMergeJoin, 详见于早期版本的 hashJoin, 不仅提升了 executor 的稳定性,还适用于大规模的数据量。因此沿用了很多年。但是在面对各种各样的业务场景,单一的 join 机制已经是不能满足用户需求。因此也会有 boardcastJoin 之类的其他 join 策略。

同时对于 join 策略的自动调整,以及在 reducer 侧对倾斜数据量的优化,也都涵盖在了 Spark 3.x 的 AQE 机制中了。

SortMergeJoin

先给出 sortMergeJoin 的原理图,如下:
image

因此在 executor 侧进行局部的 sort 之后,在 reducer 需要读取某个 partition 的时候,就需要去连接上游所有的 mapper 输出的数据(partition 与 mapper 对应的元数据存储在 mapOutputTracker 中),此时就会产生一次 shuffle reader,同时也会带来一些随机 IO

SortMergeJoin -> Broadcast Join

但是当我们两张表 join 其中一张表较小时,可以通过 AQE(此过程发生在 shuffle write 完成时,因为 driver 会拥有所有的分区数据) 将其从 SortMergeJoin 优化为 Broadcast join. 也就是说,对于上面的模式,省去了一部分 shuffle 的网络开销,但是需要把小表 boardcast 到所有的 reducer task(也可以是 executor) 中。如下所示

image

-- 如果单纯非 AQE 下且手动指定了 Broadcast join, 则此过程无 shuffle.

Local shuffle read

但是这足够么?对于经过上述优化过后的读取, 如下所示,仍然存在一张大表数据的 shuffle read,也就是下图的中的右下部分的图片,其中的一个 reducer 中读取的数据都是同一个 partition 的(常规 shuffle 做法)

image

但是我们没有必要在 reducer 中将一张大表的 partition 都聚会于同一个 reducer 中,join 的关键是将两张表的 partition 汇聚到一个 reducer 中。现在小表的数据 broadcast 出来了,因此没必要做 partition get 操作了。因此完全可以将 reader 取消跨网络的 shuffle, 直接将 reader 调度到 mapper 侧的 executor 上(此时的 reader 读取的数据指定为全部为此 executor map 上产出的所有数据),实现 local shuffle read. 也就是如上图的右上侧的展示。理论上来说,配合调度,此过程无任何 remote shuffle 产生。

通过上述的优化,理论上就不会存在 shuffle 的开销了。此时的 reducer partition 的描述如下

  case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
        SparkEnv.get.shuffleManager.getReaderForRange(
          dependency.shuffleHandle,
          mapIndex,
          mapIndex + 1,
          startReducerIndex,
          endReducerIndex,
          context,
          sqlMetricsReporter)

假设 mapIndex = 101, startReducerIndex = 0, endReducerIndex = 1000
也就是说 reducer 会从上游 mapIndex = 101 的 executor 上读取 partition 从 (0, 1000) 的数据。此时如果 spark 调度器将此 reducer 调度到 mapIndex=101 的 executor 上,则 shuffle 数据直接从本地的 blockManager 中读取,完全没有网络传输的开销。

RemoteShuffleService 与 localShuffleRead 的矛盾

依照上述的 PartialMapperPartitionSpec 设定,对于如果使用了 Uniffle 来讲,每一个 partition 均被分发到了具体的 shuffle server 上。那么如果从 mapIndex 的角度来去获取数据,则不仅会带来大量的随机 IO,而且会带来 reducer number * shuffle-server number 的网络连接,性能将下降十分明显。如下所示

image

因此与其被 AQE 优化为 local shuffle read, 不如禁用掉这个优化,直接采用 reducer 侧拉取单一 partition 的数据,使得网络连接降低,随机 IO 也会大幅度的减少. 具体优化详见:apache/incubator-uniffle@85b7b3b

Attention

可是当在 uniffle 中禁用 localShuffleRead 后,发现在 Spark AQE SortMergeJoin -> BroadcastHashJoin 优化下,如果非 broadcast 的一侧数据的部分 partition 非常巨大的情况下,却是不会应用 Spark skew join 的优化的。因此这部分需要支持下,这部分代码我们会尽快提交到 Spark 社区,uniffle 侧详见:apache/incubator-uniffle#838

至于如何对 BroadcastHashJoin 对 skew join 进行切分数据的优化呢?可以参照此 PR:apache/spark#32328

Data skew

得益于 AQE 的机制,在数据处理的过程中,数据倾斜不再是一个大的问题。但是对 AQE + RSS(Uniffle) 来说,却产生了一些设计上的取舍。

AQE 优化机制

先论 AQE 如何处理 data skew 的情况,在 shuffle write 时,每个 shuffleWriter 都会将此 writer 上的 record 归属于的 partitionId 和出现数目汇总于 driver, 因此在完成 stage 上游的 shuffle write 后,driver 就根据 data skew 设定的阈值判定数据是否倾斜和数据是否过小。

针对于数据的倾斜,则会将此数据切分为多份,供多个 reducer 处理,以期达到一致的处理时间。如对于 partitionId: 1 的数据过大,在被AQE 切分为两份后,则会设定为不同的 spec: (partitionStart-1, partitionEnd-1, mapStartIndex:0, mapEndIndex: 100), (partitionId-1, mapStartIndex:100, mapEndIndex: Integer.MAX).

很自然,如对于数据过小的数据 partition 3 和 4,则AQE 会合并到一个 reducer 来处理,spec: (partitionStart-3, partitionEnd-4, mapStartindex-0, mapEndIndex: Interger.MAX)

RSS 设计优化

数据过小导致的合并,在 RSS 中天然支持,因为 RSS 能否针对特定的 partition 来拉取数据,因此此时相当于一个 reducer 去向 shuffle-server 请求两个 partition。

但是对于数据倾斜的情况,在之前版本的 Uniffle 则会出现一些问题,如下

  1. 过大的 partition 导致 shuffle server 上的本地磁盘被写满,进而导致常规的 shuffle 数据被 evict,任务大面积失败
  2. 过大的 partition 在被切分为 n 个 reducer 后,任务运行并发过大,导致 shuffle server 网卡被打满
  3. 过大的 partition 读取时,不支持根据 mapId 来过滤,导致读取的数据量被放大,且任务运行速度变慢
  4. 过大的 partition 写入时流量过大,写入磁盘速率跟不上 receive 速度,导致常规 app 受影响

从上述原理上分析来看,解决这个问题可以从两个方面入手

  1. shuffle 数据支持 mapId 来过滤,降低端上过滤造成的性能下降
  2. 大 partition 数据写入到分布式存储 HDFS,降低写入时对本地磁盘的压力,读取则将压力卸载到 HDFS 不同的 datanode

读取优化: partition shuffle data 支持 mapId 过滤

在某个 partition 数据写入 shuffle server 时,除了数据,还携带一个 blockId (由 mapId, partitionId, 自增字符三者组成),因此写入的数据是携带有 mapId 的。依据 uniffle 的设计原理,写入的数据会先在内存中暂存,攒一批后批量 append 写入到对应 partition 的分区文件中(分别写入到 index file 和 data file). 其中 index file 会存放 blockId 和其对应的数据在 data file 中的 offset.

基于上述,我们需要根据 mapId 来排序此分区文件。因此有两种方案 LOCAL_ORDER 和 GLOBAL_ODER

LOCAL_ORDER

LOCAL_ORDER 意为对单个 partition 来说,从 MAPID 而言是局部有序

基于 uniffle 会在内存中攒一批 partition 的数据,我们可以将写入磁盘的数据进行排序后再 append 到磁盘。因此基于这个假设,每次刷写的数据量为 56M,则分区文件中的数据顺序根据 mapId 来看,则是部分有序的。

因此 client 在读取时,则可以根据 index file 找到每一段部分有序的数据,来去 data file 进行跳读。此方案假设是内存足够且各个应用间抢占不明显。

Uniffle 当前默认采用此种机制。

GLOBAL_ORDER

GLOBAL_ORDER 意为对单个 partition 来说,从 MAPID 而言是全局有序

GLOBAL_ORDER 则是在所有数据写入到分区文件后,在第一次 reducer 发起读取请求时,发现是被 AQE 优化的,则进行一次全局加锁的分区文件全局排序。

因此,排序完成后对 client 来说,则只需要进行一次顺序读取。但是因为需要等待排序完成,此过程耗时未知,且耗费 shuffle server 资源,因此未做尝试。

在进行上述优化,AQE 数据倾斜场景相较于原先 Uniffle 性能提升近 3.5 倍!

  1. Uniffle PR:apache/incubator-uniffle@74949f5.
  2. Proposal : https://docs.google.com/document/d/1G0cOFVJbYLf2oX1fiadh7zi2M6DlEcjTQTh4kSkb0LA/edit

写入优化:Huge partition 发现与限制

上述在 AQE 中对大 partition 的 skew 优化,也已经提及到对 huge partition 的处理,本章叙述下可采用的思路和实现。

首先因为受限于 RSS 本身的机制,是将多个 map 产出的相同 partition 的数据聚合在一起。如果一个 partition 的巨大(线上可达 20T),则对本地磁盘造成巨大压力,包括高写入速率和触发容量限制

处理这种情况,无法是两种方案

  1. 压力分散到不同的后端 shuffle server,暂且称为 dynamic partition spilt
  2. 压力分散到分布式存储,比如 S3 或者是 HDFS,暂且称为 partition upload to HDFS

Dynamic partition split

动态分区 split 策略,是一旦单个 partition 达到了大分区的阈值(比如 20g), 则触发一个 split 信号,接下来写入侧则切换到下一个 server 来写入。优势是无需借助到分布式存储,且本地磁盘写入速率快,可承载压力大。缺点是整体大分区容量受限于 RSS 集群容量,并且因为一个 partition 写入到多个 shuffle server, 但写入时并未与读取的顺序对齐,因此每一个经过 AQE 的 reducer 都需要去连接每一台拥有此分区的 shuffle server 来获取,连接数过高,且效率不高。(此方案仍可以优化,但因为 uniffle 对单个 partition 采用静态分配,因此未采用)

TIPS : 此方案被 celeborn 所采用,在 uniffle 支持 dynamic assignment 后也可实现

Partition upload to HDFS

将大 partition 写入到 HDFS,也是受益于 uniffle multiple tier storages 的机制 ( Flink 在最近的版本中 shuffle 也采用 multiple storages). 简单介绍下 multiple storages. 主要分为三级:memory -> local disk -> HDFS/S3 etc...

  1. memory 作为高速率写入的缓冲
  2. local disk 提高持久化高速存储
  3. HDFS 支持无限容量和负载均衡式并行读取

得益于本身的机制,因此未做很多改造。但是原先 uniffle 中对 HDFS 的用法,是将在 memory 中聚合超过 64m 就写入 HDFS,但实测效果不佳,并且也没有充分利用闲置的本地磁盘。因为做了一些新的 huge partition 的特性来满足,包括如下

  1. 对 huge partition 定量限制,一旦达到某个阈值,就直接写入 HDFS。但因为 HDFS 写入速率不高,且大分区接收速率非常快,通过实现了将一个 partition 并行写入多个(并行度可配置) HDFS 文件,来实现可伸缩扩展的高速写入。也弥补了 HDFS 的速率缺陷
  2. uniffle 中的所有数据都会在内存中暂留,且未多每个 app 做 memory quota 限制。但是 huge partition 写入速率高导致占用内存多,进一步影响了正常分区的任务。因此就引入了 huge partition memory limitation.

通过上述优化,现在线上的大分区任务都正常运行。但是前提是通过牺牲 huge partition 的速度来满足常规任务的稳定性, 后续会进一步优化大分区任务的速度

Reference