开源和公共云团队创下 100 TB 排序世界纪录

还没有读者喜欢这个。
Two different paths to different outcomes

Opensource.com

2014 年 10 月,Databricks 参加了 Sort Benchmark,并创下了排序 100 太字节 (TB) 数据或 1 万亿 100 字节记录的新的世界纪录。该团队在 207 台 EC2 虚拟机上使用了 Apache Spark,并在 23 分钟内排序了 100 TB 的数据。

相比之下,之前的世界纪录由 Hadoop MapReduce 在私有数据中心的 2100 台机器上创造,耗时 72 分钟。 这次成绩与加州大学圣地亚哥分校的研究团队构建高性能系统并驾齐驱,我们共同创造了一项新的世界纪录。

此外,虽然没有官方的拍字节 (PB) 排序竞赛,但我们将 Apache Spark (Spark) 进一步推进,还在 190 台机器上在 4 小时内排序了 1 PB 的数据(10 万亿条记录)。 这个 PB 级时间超越了之前基于 Hadoop MapReduce 报告的结果(在 3800 台机器上耗时 16 小时)。 据我们所知,这是首次使用开源软件 (Spark) 和公共云基础设施 (EC2) 的组合来创下 100 TB 排序的新纪录,也是首次在公共云中完成拍字节级排序。

以 Jim Gray 命名的基准工作负载在任何衡量标准下都是资源密集型的:按照严格的规则排序 100 TB 的数据会产生 500 TB 的磁盘 I/O 和 200 TB 的网络 I/O。 来自世界各地的组织经常构建专用的排序机器(专门的软件,有时甚至是专门的硬件)来参加此基准测试。

 
Hadoop MR
记录
Spark
记录
Spark
1 PB
数据大小 102.5 TB 100 TB 1000 TB
运行时间 72 分钟 23 分钟 234 分钟
节点数 2100 206 190
核心数 50400 物理 6592 虚拟化 6080 虚拟化
集群磁盘吞吐量 3150 GB/s
(估计)
618 GB/s 570 GB/s
Sort Benchmark Daytona 规则
网络 专用数据中心,10Gbps 虚拟化 (EC2) 10Gbps 网络 虚拟化 (EC2) 10Gbps 网络
排序速率 1.42 TB/分钟 4.27 TB/分钟 4.27 TB/分钟
每节点排序速率 0.67 GB/分钟 20.7 GB/分钟 22.5 GB/分钟

什么是 Spark?

Apache Spark 被广泛认为是 Hadoop MapReduce 的继任者,是一个快速且通用的用于大规模数据处理的引擎。 它提供 Java、Python、Scala 和 SQL 的编程 API,可用于高效执行各种工作负载,包括常见的 ETL、数据流、机器学习、图计算和 SQL。

Spark 是最活跃的开源项目之一。 2014 年,它拥有超过 465 位贡献者,使其成为 Apache 软件基金会中最活跃的项目以及大数据开源项目中最活跃的项目之一。

排序

Sort Benchmark(基准测试)最初由 Jim Gray 提出和赞助,旨在衡量计算机系统最先进的发展水平。 在 Jim Gray 于 2007 年去世后,该基准测试现在由过去获奖者的联盟运行。 该基准测试由多个类别组成,每个类别都有不同的重点。 Daytona Gray(以 Gray 博士的名字命名)是最具挑战性的类别,因为它要求参与系统在尽可能短的时间内排序 100 太字节 (TB) 的数据,而无需考虑使用的计算资源。

排序的核心是 shuffle 操作,它跨所有机器移动数据。 Shuffle 几乎是所有分布式数据处理工作负载的基础。 例如,SQL 查询连接两个不同的数据源使用 shuffle 将应连接在一起的元组移动到同一台机器上,而协作过滤算法(如 ALS)则依赖 shuffle 在网络上发送用户/产品评分和权重。

大多数数据管道都以大量原始数据开始,但随着管道的进行,由于过滤掉不相关的数据或更紧凑的中间数据表示,数据量会减少。 对 100 TB 原始输入数据执行 SQL 查询很可能只会在网络上 shuffle 100 TB 数据的一小部分。 这种模式也反映在流行的数据处理框架 MapReduce 的命名中。

然而,排序是最具挑战性的任务之一,因为沿管道没有数据减少。 排序 100 TB 的输入数据需要在网络上 shuffle 100 TB 的数据。 事实上,Daytona Gray 竞赛要求我们复制输入和输出数据以实现容错,因此排序 100 TB 的数据实际上会产生 500 TB 的磁盘 I/O 和 200 TB 的网络 I/O。

由于上述原因,当我们寻找衡量和改进 Spark 的指标时,排序(最苛刻的工作负载之一)自然而然地成为我们关注的重点。

是什么使之成为可能?

为了改进 Spark 以适应超大规模工作负载,我们进行了大量开发工作。 特别是,有三项主要工作与此基准测试高度相关。

首先也是最重要的,在 Spark 1.1 中,我们引入了一种新的 shuffle 实现,称为基于排序的 shuffle (SPARK-2045)。 之前的 Spark shuffle 实现是基于哈希的,需要在内存中维护 P(reduce 分区的数量)个并发缓冲区。 在基于排序的 shuffle 中,在任何给定时间只需要单个缓冲区。 这大大减少了 shuffle 期间的内存开销,并且可以支持单个阶段中具有数十万个任务的工作负载(我们的 PB 排序使用了 250,000 个任务)。

其次,我们基于 Netty 的 Epoll 本地套接字传输(通过 JNI)改进了 Spark 中的网络模块 (SPARK-2468)。 新模块还维护了自己的内存池,从而绕过了 JVM 的内存分配器,减少了垃圾回收的影响。

最后但并非最不重要的一点,我们创建了一个新的外部 shuffle 服务 (SPARK-3796),该服务与 Spark 执行器本身解耦。 这个新服务建立在上述网络模块之上,并确保即使执行器处于 GC 暂停状态,Spark 仍然可以提供 shuffle 文件。

Network activity during sort

通过这三项更改,我们的 Spark 集群能够在 map 阶段维持 3GB/s/节点 I/O 活动,并在 reduce 阶段维持 1.1 GB/s/节点网络活动,从而饱和这些机器上可用的 10Gbps 链路。

细节

TimSort: 在 Spark 1.1 中,我们将默认排序算法从快速排序切换到 TimSort,它是归并排序和插入排序的派生算法。 它在大多数实际数据集中的性能优于快速排序,尤其是在部分有序的数据集中。 我们在 map 和 reduce 阶段都使用 TimSort。

利用缓存局部性: 在排序基准测试中,每条记录为 100 字节,其中排序键是前 10 个字节。 当我们分析我们的排序程序时,我们注意到缓存未命中率很高,因为每次比较都需要一个随机的对象指针查找。 我们重新设计了我们的记录内存布局,将每条记录表示为一个 16 字节的记录(JVM 中的两个 long 类型),其中前 10 个字节表示排序键,后 4 个字节表示记录的位置(实际上,由于字节序和有符号性,它比这稍微复杂一些)。 这样,每次比较只需要一个主要是顺序的缓存查找,而不是随机的内存查找。 最初由 Chris Nyberg 等人在 AlphaSort 中提出,这是高性能系统中常用的一种技术。

Spark 友好的编程抽象和架构使我们能够在用户空间中(无需修改 Spark)用几行代码实现这些改进。 将 TimSort 与我们利用缓存局部性的新布局相结合,排序的 CPU 时间减少了 5 倍。

大规模容错: 在大规模情况下,很多事情都可能出错。 在此实验过程中,我们看到节点因网络连接问题而消失、Linux 内核在循环中旋转或节点因内存碎片整理而暂停。 幸运的是,Spark 具有容错能力,并从这些故障中恢复过来。

云的力量: 如前所述,我们利用 206 个 i2.8xlarge 实例来运行此 I/O 密集型实验。 这些实例通过 SSD 提供高 I/O 吞吐量。 我们将这些实例放在 VPC 中的放置组中,以通过单根 I/O 虚拟化 (SR-IOV) 实现增强的网络。 启用增强的网络可带来更高的性能 (10Gbps)、更低的延迟和更低的抖动。 我们要感谢 AWS 的所有参与者,感谢他们帮助实现这一切,包括:AWS EC2 服务团队、AWS EC2 业务开发团队、AWS 产品营销和 AWS 解决方案架构团队。 没有他们,这个实验就不可能实现。

Apache
Quill

本文是 Jason Hibbets 协调的 Apache Quill 专栏的一部分。 请通过 open@opensource.com 联系我们,分享您在 Apache 软件基金会项目中取得的成功案例和开源更新.

标签
User profile image.
Reynold Xin 是 Apache Spark 的项目管理委员会 (PMC) 成员,也是 Databricks 的联合创始人,Databricks 是一家由 Spark 的创建者创立的公司。 他最近在 Databricks 领导了一项扩展 Spark 的工作,并在 100 TB 排序 (Daytona Gray) 中创下了新的世界纪录。 在加入 Databricks 之前,他正在加州大学伯克利分校 AMPLab 攻读数据库博士学位。

5 条评论

令人印象深刻的改进。

是否可以在某个地方找到允许重复此基准测试(在 100TB 或更小规模下)的源代码?

190 个节点 x 8 个 SSD x 800GB = 1.2PB 的总存储空间。 如何在不压缩的情况下在 1.2PB 的存储空间上读取 1PB、对其进行排序并写入输出?

我很好奇。 每次排序的日期是什么时候? 即 Hadoop 排序运行的时间和 Spark 运行的时间? 我猜想摩尔定律产生了一些影响?

您能否分享源代码和集群详细信息以供审查? 还可以分享您为了获得此基准测试而调整的所有参数

© . All rights reserved.