使用 Apache Spark SQL 和 DataFrames 扩展关系数据库

使用你友好的 SQL (带有一点变化) 来大规模地整理、聚合和过滤数据。
226 位读者喜欢这篇文章。
computer servers processing data

Opensource.com

无论炒作和更新的数据库(通常称为 NoSQL 数据库)的出现如何,关系数据库仍然存在。 简单原因是关系数据库强制执行必要的结构、约束,并提供一个漂亮的、声明式的语言来查询数据(我们喜欢):SQL!

但是,规模一直是关系数据库的问题。21 世纪的大多数企业都装载着丰富的数据存储和存储库,并希望充分利用他们的大数据来实现可操作的见解。关系数据库可能很受欢迎,但除非我们投资于适当的大数据管理策略,否则它们不能很好地扩展。这涉及到考虑潜在的数据源、数据量、约束、模式、ETL(提取-转换-加载)、访问和查询模式等等!

 

本文将介绍一些卓越的进展,这些进展利用了关系数据库的力量,但使用 Apache Spark 的一些较新组件——Spark SQL 和 DataFrames——“大规模地” 使用。最值得注意的是,我们将介绍以下主题。

  1. 扩展关系数据库的动机和挑战
  2. 了解 Spark SQL 和 DataFrames
    • 目标
    • 架构和功能
    • 性能
  3. 本系列的第二篇文章介绍了一个关于 Spark SQL 的真实世界案例研究/教程,其中包含动手示例

我们将研究人们如此努力并投入时间在 Apache Spark 中构建新组件的主要挑战和动机,以便我们可以大规模地执行 SQL。 我们还将检查 Spark SQL 和 DataFrames 的主要架构、接口、功能和性能基准。 最后,但最重要的是,我们将介绍一个真实的案例研究,该案例研究基于使用 Spark SQL 和 DataFrames 利用 Databricks Cloud Platform for Spark 分析入侵攻击的 KDD 99 Cup Data

扩展大数据关系数据库的动机和挑战

关系数据存储易于构建和查询。 此外,用户和开发人员通常更喜欢使用易于解释的声明性查询,用类似人类的可读语言(例如 SQL)编写。 然而,随着数据量的增加和种类的增加,关系方法无法很好地扩展以构建大数据应用程序和分析系统。 以下是一些主要挑战。

  • 处理不同类型和来源的数据,这些数据可以是结构化、半结构化和非结构化的
  • 构建进出各种数据源的 ETL 管道,这可能导致开发大量特定的自定义代码,从而随着时间的推移增加技术债务
  • 具有执行传统商业智能 (BI) 的分析和高级分析(机器学习、统计建模等)的能力,后者绝对难以在关系系统中执行

大数据分析并不是昨天才发明的! 我们已经通过 Hadoop 和 MapReduce 范例在这个领域取得了成功。 这很强大,但通常很慢,并且为用户提供了一个低级的过程式编程接口,这要求人们编写大量代码,即使是非常简单的数据转换也是如此。 然而,一旦 Spark 发布,它真正彻底改变了大数据分析的方式,重点是内存计算、容错、高级抽象和易用性。

 

Hadoop Map Reduce vs. Spark

从那时起,诸如 Hive、Pig 和 Shark(演变为 Spark SQL)之类的几个框架和系统为大数据存储提供了丰富的关系接口和声明式查询机制。 挑战仍然是这些工具要么是基于关系的,要么是基于过程的,我们无法两全其美。

然而,在现实世界中,大多数数据和分析管道可能涉及关系代码和过程代码的组合。 强迫用户选择其中一个最终会使事情复杂化,并增加用户在开发、构建和维护不同应用程序和系统方面的工作量。 Apache Spark SQL 构建在之前提到的名为 Shark 的 SQL-on-Spark 工作之上。 Spark SQL 并没有强迫用户在关系 API 或过程 API 之间进行选择,而是试图使用户能够无缝地混合两者,并大规模地对大数据执行数据查询、检索和分析。

了解 Spark SQL 和 DataFrames

Spark SQL 本质上试图弥合我们之前提到的两个模型(关系模型和过程模型)之间的差距,它有两个主要组件。

  • Spark SQL 提供了一个 DataFrame API,可以对外部数据源和 Spark 的内置分布式集合执行关系操作——大规模地!
  • 为了支持大数据中各种不同的数据源和算法,Spark SQL 引入了一种新颖的可扩展优化器,称为 Catalyst,它可以轻松地为机器学习等高级分析添加数据源、优化规则和数据类型。

本质上,Spark SQL 利用 Spark 的强大功能,对大数据执行大规模的分布式、鲁棒的、内存计算。 Spark SQL 提供最先进的 SQL 性能,并且还保持与 Apache Hive(一种流行的大数据仓库框架)支持的所有现有结构和组件的兼容性,包括数据格式、用户定义函数 (UDF) 和元存储。 除此之外,它还有助于从大数据源和企业数据仓库(如 JSON、Hive、Parquet 等)中提取各种数据格式,并执行关系操作和过程操作的组合,以进行更复杂、更高级的分析。

目标

让我们看看 Spark SQL 一些有趣的事实,包括它的使用、采用和目标,其中一些我将再次无耻地从关于“Spark 中的关系数据处理”的优秀原始论文中复制。 Spark SQL 于 2014 年 5 月首次发布,现在可能是 Spark 中最积极开发的组件之一。 Apache Spark 绝对是大数据处理最活跃的开源项目,拥有数百名贡献者。

除了是一个开源项目之外,Spark SQL 已经开始被主流行业采用。 它已经被部署在非常大规模的环境中。 Facebook 有一个关于“Apache Spark @Scale:一个 60 TB+ 的生产用例”的优秀案例研究。 该公司正在为实体排名做数据准备,他们的 Hive 作业过去需要几天时间并且面临许多挑战,但 Facebook 成功地使用 Spark 扩展并提高了性能。 看看他们在这个旅程中面临的有趣挑战!

另一个有趣的事实是,Databricks Cloud(运行 Spark 的托管服务)三分之二的客户在其他编程语言中使用 Spark SQL。 我们还将在本系列的 第二部分 中展示一个使用 Databricks 上的 Spark SQL 的动手案例研究。

Spark SQL 的主要目标(由其创建者定义)是

  1. 使用程序员友好的 API 支持关系处理,无论是在 Spark 程序(在原生 RDD 上)中还是在外部数据源上
  2. 使用已建立的 DBMS 技术提供高性能
  3. 轻松支持新的数据源,包括半结构化数据和适合查询联合的外部数据库
  4. 支持使用高级分析算法(如图处理和机器学习)进行扩展

架构和功能

我们现在将看看围绕 Spark SQL 和 DataFrames 的关键特性和架构。 这里需要记住的一些关键概念是关于 Spark 生态系统,它一直在随着时间的推移而不断发展。

 

Spark ecosystem

RDD(弹性分布式数据集)可能是 Spark 所有成功案例背后最大的贡献者。 它基本上是一种数据结构,或者更准确地说是一种分布式内存抽象,它允许程序员在大规模分布式集群上执行内存计算,同时保留诸如容错之类的方面。 您还可以并行化许多计算和转换,并跟踪整个转换过程,这有助于有效地重新计算丢失的数据。 Spark 爱好者可能希望阅读一篇关于 RDD 的优秀论文,“弹性分布式数据集:用于内存集群计算的容错抽象”。 此外,Spark 与驱动程序和 worker 的概念一起工作,如下图所示。

 

Spark works with drivers and workers

您通常可以通过从文件、数据库中读取数据、并行化现有集合甚至转换来创建 RDD。 通常,转换是可以用来将数据转换为不同方面和维度的操作,具体取决于我们想要整理和处理数据的方式。 它们也是延迟评估的,这意味着,即使您定义了一个转换,结果也不会被计算,直到您应用一个动作,该动作通常需要将结果返回给驱动程序程序(然后它计算所有应用的转换!)。

 

RDD transformations

向数据科学家朋友 Favio Vázquez 和他的优秀文章“使用 Apache Spark 的深度学习”致敬,我从中获得了一些绝妙的想法和内容,包括前面的图。 去看看吧!

既然我们已经了解了 Spark 的总体架构,让我们更深入地了解 Spark SQL。通常,Spark SQL 作为库在 Spark 之上运行,正如我们在涵盖 Spark 生态系统的图中看到的那样。下图更详细地展示了 Spark SQL 的典型架构和接口。

 

Spark SQL architecture and interfaces

该图清楚地显示了各种 SQL 接口,这些接口可以通过 JDBC/ODBC 或通过命令行控制台访问,以及集成到 Spark 支持的编程语言中的 DataFrame API(我们将使用 Python)。DataFrame API 非常强大,允许用户最终将过程代码和关系代码混合在一起!像 UDF(用户自定义函数)这样的高级函数也可以在 SQL 中公开,BI 工具可以使用这些函数。

Spark DataFrame 非常有趣,可以帮助我们利用 Spark SQL 的强大功能,并在需要时结合其过程范式。Spark DataFrame 基本上是具有相同模式的行(行类型)的分布式集合。它基本上是一个被组织成命名列的 Spark Dataset。这里需要注意的一点是,Datasets 是 DataFrame API 的扩展,它提供了一个类型安全、面向对象的编程接口。 因此,它们仅在 Java 和 Scala 中可用,因此我们将专注于 DataFrames。

 

DataFrames architecture

DataFrame 相当于关系数据库中的表(但底层进行了更多优化),并且可以以类似于 Spark 中“原生”分布式集合 (RDD) 的方式进行操作。Spark DataFrame 具有一些有趣的属性,其中一些属性如下所述。

  1. 与 RDD 不同,DataFrame 通常会跟踪它们的模式,并支持各种关系操作,从而实现更优化的执行。
  2. DataFrame 可以从表构建,就像 Big Data 基础设施中现有的 Hive 表一样,甚至可以从现有的 RDD 构建。
  3. 可以使用直接 SQL 查询以及 DataFrame DSL(领域特定语言)来操作 DataFrame,在 DataFrame DSL 中,我们可以使用各种关系运算符和转换器,例如 where 和 groupBy。
  4. 此外,每个 DataFrame 也可以被视为行对象的 RDD,允许用户调用过程式的 Spark API,例如 map。
  5. 最后,一个给定的,但要始终记住的一点是,与传统的 dataframe API (Pandas) 不同,Spark DataFrame 是惰性的,即每个 DataFrame 对象表示一个计算数据集的逻辑计划,但在用户调用特殊的“输出操作”(例如 save)之前,不会发生执行。

这应该让您对 Spark SQL、DataFrame、基本特征、概念、架构和接口有足够的了解。 让我们通过查看性能基准来结束本节。

性能

在没有正确优化的情况下发布新功能可能是致命的,而构建 Spark 的人员进行了大量的性能测试和基准测试! 让我们看一些有趣的结果。 下面展示了第一个展示一些结果的图表。

 

Performance comparisons

在这些实验中,他们使用 AMPLab 大数据基准测试(该基准测试使用 Pavlo 等人开发的 Web 分析工作负载)比较了 Spark SQL 与 Shark 和 Impala 的性能。 该基准测试包含四种类型的查询,这些查询具有执行扫描、聚合、连接和基于 UDF 的 MapReduce 作业的不同参数。 使用列式 Parquet 格式压缩后,数据集为 110GB 的数据。 我们看到,在所有查询中,Spark SQL 都比 Shark 快得多,并且通常与 Impala 具有竞争力。 Catalyst 优化器对此负责,它可以减少 CPU 开销(我们将对此进行简要介绍)。 此功能使 Spark SQL 在许多查询中与基于 C++ 和 LLVM 的 Impala 引擎具有竞争力。 与 Impala 的最大差距在于查询 3a,其中 Impala 选择了一个更好的连接计划,因为查询的选择性使其中一个表非常小。

以下图表显示了 DataFrame、常规 Spark API 以及 Spark + SQL 的更多性能基准。

 

Spark DataFrames vs. RDDs and SQL

Spark DataFrames 与 RDD 和 SQL 的比较

最后,下图显示了 DataFrame 与不同语言的 RDD 之间的一个不错的基准测试结果,这为 DataFrame 的优化程度提供了一个有趣的视角。

 

Comparing Spark DataFrames and RDDs

比较 Spark DataFrames 和 RDD

性能秘诀:Catalyst 优化器

为什么 Spark SQL 如此快速和优化? 原因是基于 Scala 中的函数式编程结构的新型可扩展优化器 Catalyst。 虽然我们不会在此处详细介绍 Catalyst,但值得一提的是,因为它有助于优化 DataFrame 操作和查询。

 

Catalyst

Catalyst 的可扩展设计有两个目的。

  • 使其易于向 Spark SQL 添加新的优化技术和功能,尤其是在处理大数据、半结构化数据和高级分析方面的各种问题时
  • 易于扩展优化器 - 例如,通过添加特定于数据源的规则,可以将过滤或聚合推送到外部存储系统或支持新的数据类型

Catalyst 支持基于规则和基于成本的优化。 虽然过去已经提出了可扩展的优化器,但它们通常需要复杂的特定于领域的语言来指定规则。 通常,这会导致学习曲线和维护负担增加。 相比之下,Catalyst 使用 Scala 编程语言的标准功能(例如模式匹配),使开发人员可以使用完整的编程语言,同时仍然可以轻松指定规则。

 

Catalyst architecture

Catalyst 的核心包含一个用于表示树和应用规则来操作它们的通用库。 在此框架之上,它具有特定于关系查询处理的库(例如,表达式、逻辑查询计划)和处理查询执行的不同阶段的几组规则:分析、逻辑优化、物理规划和代码生成以将查询的部分编译为 Java 字节码。 有兴趣了解有关 Catalyst 的更多详细信息并进行深入研究吗? 您可以查看 Databricks 的这篇精彩文章“Spark SQL Catalyst 优化器深度剖析”。

点击进入本系列的第二篇文章,查看基于真实世界数据集的 实践教程,以了解如何使用 Spark SQL。


本文最初出现在 Medium 的 Towards Data Science 频道,经许可转载。


接下来要阅读什么
标签
User profile image.
Dipanjan (DJ) Sarkar 是 Red Hat 的数据科学家、已发表的作者、顾问和培训师。他曾为多家初创公司以及英特尔等财富 500 强公司提供咨询和工作。 他主要致力于利用数据科学、机器学习和深度学习来构建大规模智能系统。

2 条评论

你能解释得更清楚吗?

“这里需要注意的一点是,Datasets 是 DataFrame API 的扩展,它提供了一个类型安全、面向对象的编程接口。 因此,它们仅在 Java 和 Scala 中可用,因此我们将专注于 DataFrames”

我很难理解这一点。 如果 Dataframe *是* Dataset[Row],那么 Dataset 怎么能是 Dataframe API 的扩展? 在我看来,情况正好相反。

此外,忽略 Datasets 的逻辑是什么,因为它们仅在 Spark 的一流语言中可用?

当然,总体而言,数据结构有点相似但又有所不同,这让人有点困惑。 但是,如果您查看 Spark 的演变历史(https://stackoverflow.com/questions/31508083/difference-between-datafra…),我们首先拥有 RDD,然后在 2013 年出现了 DataFrames,最后 Dataset 在 2015 年从 DataFrames 中分离出来,作为 DF 的类型安全版本。

Datasets 非常好,并且在原生 Spark(利用 Scala)中运行良好,但由于我们在示例中利用 Python,因此我们必须使用 Spark DataFrames。 传统上,Datasets 一直比 DataFrames 稍慢,但它们的性能正在赶上(https://databricks.com/session/demystifying-dataframe-and-dataset)。 希望这有帮助!

Creative Commons License本作品根据 Creative Commons Attribution-Share Alike 4.0 International License 授权。
© . All rights reserved.