在本系列文章的第一部分中,我们研究了使用 Apache Spark SQL 和 DataFrames“大规模”利用关系数据库的能力的进步。现在,我们将根据一个真实世界的数据集做一个简单的教程,看看如何使用 Spark SQL。 我们将使用 Spark DataFrames,但重点将更多地放在使用 SQL 上。 在另一篇文章中,我将详细讨论 Spark DataFrames 和常见操作。
我喜欢使用云服务来进行我的机器学习、深度学习,甚至大数据分析需求,而不是费力地设置我自己的 Spark 集群。 我将使用 Databricks 平台来满足我的 Spark 需求。 Databricks 是一家由 Apache Spark 的创建者创立的公司,旨在帮助客户使用 Spark 进行基于云的大数据处理。

最简单(且免费)的方法是访问 Try Databricks 页面并注册一个社区版帐户。 你会得到一个基于云的集群,这是一个具有 6GB 和无限笔记本的单节点集群 - 对于免费版本来说还不错! 如果你对分析大数据有严重的需求,我建议使用 Databricks 平台。
现在让我们开始我们的案例研究。 随时在 Databricks 或你自己的 Spark 集群中的主屏幕上创建一个新笔记本。

你也可以导入我的包含整个教程的笔记本,但请务必运行每个单元格并进行尝试和探索,而不仅仅是阅读它。 不确定如何在 Databricks 上使用 Spark? 按照这个简短但有用的教程。
本教程将使你熟悉处理通常从数据库或平面文件获取的结构化数据的基本 Spark 功能。 我们将探索使用 Spark 利用 DataFrames 和 SQL 的概念来查询和聚合关系数据的典型方法。 我们将处理来自 KDD Cup 1999 的一个有趣的数据集,并尝试使用像 dataframe 这样的高级抽象来查询数据,该数据已经在像 R 和 Python 这样的流行数据分析工具中流行起来。 我们还将了解使用 SQL 语言构建数据查询并从我们的数据中检索有洞察力的信息是多么容易。 这也会大规模发生,而我们不必做更多的事情,因为 Spark 在后端有效地分配这些数据结构,这使我们的查询具有可扩展性并尽可能高效。 我们将从加载一些基本依赖项开始。
import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')
数据检索
KDD Cup 1999数据集用于第三届国际知识发现和数据挖掘工具竞赛,该竞赛与 KDD-99(第五届国际知识发现和数据挖掘会议)一起举行。 竞赛任务是构建一个网络入侵检测器,一种能够区分坏连接(称为入侵或攻击)和好的、正常的连接的预测模型。 此数据库包含一组要审核的标准数据,其中包括在军事网络环境中模拟的各种入侵。
我们将使用减少的数据集 kddcup.data_10_percent.gz,其中包含近 50 万个网络交互。 我们将从 Web 本地下载此 Gzip 文件,然后使用它。 如果你有良好的稳定互联网连接,请随意下载并使用完整的数据集 kddcup.data.gz。
使用来自 Web 的数据
在 Databricks 中处理从 Web 检索的数据集可能有点棘手。 幸运的是,我们有一些优秀的实用程序包,例如 dbutils,可以帮助我们更轻松地完成工作。 让我们快速浏览一下此模块的一些基本功能。
dbutils.help()
This module provides various utilities for users to interact with the rest of Databricks.
fs: DbfsUtils -> Manipulates the Databricks filesystem (DBFS) from the console
meta: MetaUtils -> Methods to hook into the compiler (EXPERIMENTAL)
notebook: NotebookUtils -> Utilities for the control flow of a notebook (EXPERIMENTAL)
preview: Preview -> Utilities under preview category
secrets: SecretUtils -> Provides utilities for leveraging secrets within notebooks
widgets: WidgetsUtils -> Methods to create and get bound value of input widgets inside notebooks
在 Databricks 中检索和存储数据
我们现在将利用 Python urllib 库从其 Web 存储库中提取 KDD Cup 99 数据,将其存储在一个临时位置,然后将其移动到 Databricks 文件系统,这可以方便地访问此数据进行分析
注意: 如果你跳过此步骤并直接下载数据,你可能会收到 InvalidInputException:输入路径不存在 错误。
import urllib
urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "/tmp/kddcup_data.gz")
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")
display(dbutils.fs.ls("dbfs:/kdd"))

构建 KDD 数据集
现在我们的数据已存储在 Databricks 文件系统中,让我们将数据从磁盘加载到 Spark 的传统抽象数据结构 弹性分布式数据集 (RDD) 中。
data_file = "dbfs:/kdd/kddcup_data.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

你还可以使用以下代码验证数据的的数据结构类型 (RDD)。
type(raw_rdd)

在我们的数据上构建 Spark DataFrame
Spark DataFrame 是一种有趣的数据结构,代表了分布式的数据集合。 通常,进入 Spark 中所有 SQL 功能的入口点是 SQLContext 类。 要创建此调用的基本实例,我们只需要一个 SparkContext 引用。 在 Databricks 中,此全局上下文对象可用作 sc 用于此目的。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext

拆分 CSV 数据
我们 RDD 中的每个条目都是以逗号分隔的数据行,我们首先需要拆分它,然后才能解析和构建我们的 dataframe。
csv_rdd = raw_rdd.map(lambda row: row.split(","))
print(csv_rdd.take(2))
print(type(csv_rdd))

检查特征总数(列)
我们可以使用以下代码来检查数据集中潜在的列总数。
len(csv_rdd.take(1)[0])
Out[57]: 42
理解和解析数据
KDD 99 Cup 数据由从连接数据中捕获的不同属性组成。 你可以获得数据中的属性完整列表以及与每个属性/列的描述相关的更多详细信息。 我们将只使用数据集中的一些特定列,其详细信息如下所述。
特征编号 | 特征名称 | 描述 | 类型 |
---|---|---|---|
1 | 持续时间 | 连接的长度(秒数) | 连续 |
2 | protocol_type | 协议的类型,例如,tcp、udp 等 | 离散 |
3 | 服务 | 目标网络上的网络服务,例如 http、telnet 等 | 离散 |
4 | src_bytes | 从源到目标的数据字节数 | 连续 |
5 | dst_bytes | 从目标到源的数据字节数 | 连续 |
6 | 标志 | 连接的正常或错误状态 | 离散 |
7 | wrong_fragment | “错误”片段的数量 | 连续 |
8 | 紧急 | 紧急数据包的数量 | 连续 |
9 | 热 | “热”指标的数量 | 连续 |
10 | num_failed_logins | 失败的登录尝试次数 | 连续 |
11 | num_compromised | “受损”条件的数量 | 连续 |
12 | su_attempted | 如果尝试了“su root”命令,则为 1; 否则为 0 | 离散 |
13 | num_root | “root”访问的数量 | 连续 |
14 | num_file_creations | 文件创建操作的数量 | 连续 |
我们将根据每个数据点(行)中的位置提取以下列,并按如下方式构建一个新的 RDD。
from pyspark.sql import Row
parsed_rdd = csv_rdd.map(lambda r: Row(
duration=int(r[0]),
protocol_type=r[1],
service=r[2],
flag=r[3],
src_bytes=int(r[4]),
dst_bytes=int(r[5]),
wrong_fragment=int(r[7]),
urgent=int(r[8]),
hot=int(r[9]),
num_failed_logins=int(r[10]),
num_compromised=int(r[12]),
su_attempted=r[14],
num_root=int(r[15]),
num_file_creations=int(r[16]),
label=r[-1]
)
)
parsed_rdd.take(5)

构建 DataFrame
现在我们的数据已经整齐地解析和格式化,让我们构建我们的 DataFrame!
df = sqlContext.createDataFrame(parsed_rdd)
display(df.head(10))

你现在还可以使用以下代码查看我们的 DataFrame 的模式。
df.printSchema()

构建临时表
我们可以利用 registerTempTable() 函数来构建一个临时表,以便大规模地在我们的 DataFrame 上运行 SQL 命令! 要记住的一点是,此临时表的生命周期与会话相关联。 它创建一个内存表,其范围限定为创建它的集群。 数据使用 Hive 的高度优化的内存列格式存储。
你还可以查看 saveAsTable(),它创建一个永久的物理表,使用 Parquet 格式存储在 S3 中。 所有集群都可以访问此表。 表元数据(包括文件的位置)存储在 Hive 元存储中。
help(df.registerTempTable)

df.registerTempTable("connections")
大规模执行 SQL
让我们看几个例子,说明我们如何基于我们的 dataframe 在我们的表上运行 SQL 查询。 我们将从一些简单的查询开始,然后在本教程中查看聚合、过滤器、排序、子查询和透视。
基于协议类型的连接
让我们看看如何根据连接协议的类型获取连接总数。 首先,我们将使用普通的 DataFrame DSL 语法来执行聚合来获取此信息。
display(df.groupBy('protocol_type')
.count()
.orderBy('count', ascending=False))

我们也可以使用 SQL 来执行相同的聚合吗? 是的,我们可以利用我们之前为此构建的表!
protocols = sqlContext.sql("""
SELECT protocol_type, count(*) as freq
FROM connections
GROUP BY protocol_type
ORDER BY 2 DESC
""")
display(protocols)

你可以清楚地看到你获得了相同的结果,并且无需担心你的后台基础设施或代码的执行方式。 只需编写简单的 SQL!
基于好或坏(攻击类型)签名的连接
我们现在将运行一个简单的聚合来检查基于好(正常)或坏(入侵攻击)类型的连接总数。
labels = sqlContext.sql("""
SELECT label, count(*) as freq
FROM connections
GROUP BY label
ORDER BY 2 DESC
""")
display(labels)

我们有很多不同的攻击类型。 我们可以以条形图的形式将其可视化。 最简单的方法是使用 Databricks 笔记本中出色的界面选项。

这为我们提供了一个漂亮的条形图,你可以通过单击 绘图选项 进一步自定义它。

另一种方法是编写代码来完成它。 你可以将聚合数据提取为 Pandas DataFrame 并将其绘制为常规条形图。
labels_df = pd.DataFrame(labels.toPandas())
labels_df.set_index("label", drop=True,inplace=True)
labels_fig = labels_df.plot(kind='barh')
plt.rcParams["figure.figsize"] = (7, 5)
plt.rcParams.update({'font.size': 10})
plt.tight_layout()
display(labels_fig.figure)

基于协议和攻击的连接
让我们看看哪些协议最容易受到攻击,方法是使用以下 SQL 查询。
attack_protocol = sqlContext.sql("""
SELECT
protocol_type,
CASE label
WHEN 'normal.' THEN 'no attack'
ELSE 'attack'
END AS state,
COUNT(*) as freq
FROM connections
GROUP BY protocol_type, state
ORDER BY 3 DESC
""")
display(attack_protocol)

嗯,看起来 ICMP 连接,其次是 TCP 连接遭受的攻击最多。
基于协议和攻击的连接统计
让我们看一下与我们的连接请求相关的这些协议和攻击的一些统计措施。
attack_stats = sqlContext.sql("""
SELECT
protocol_type,
CASE label
WHEN 'normal.' THEN 'no attack'
ELSE 'attack'
END AS state,
COUNT(*) as total_freq,
ROUND(AVG(src_bytes), 2) as mean_src_bytes,
ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
ROUND(AVG(duration), 2) as mean_duration,
SUM(num_failed_logins) as total_failed_logins,
SUM(num_compromised) as total_compromised,
SUM(num_file_creations) as total_file_creations,
SUM(su_attempted) as total_root_attempts,
SUM(num_root) as total_root_acceses
FROM connections
GROUP BY protocol_type, state
ORDER BY 3 DESC
""")
display(attack_stats)

看起来在 TCP 请求中传输的平均数据量要高得多,这并不奇怪。 有趣的是,攻击从源到目标的传输的数据的平均有效载荷要高得多。
按服务和攻击类型过滤基于 TCP 协议的连接统计
鉴于我们有更多相关数据和统计信息,让我们仔细看看 TCP 攻击。 我们现在将基于服务和攻击类型聚合不同类型的 TCP 攻击,并观察不同的指标。
tcp_attack_stats = sqlContext.sql("""
SELECT
service,
label as attack_type,
COUNT(*) as total_freq,
ROUND(AVG(duration), 2) as mean_duration,
SUM(num_failed_logins) as total_failed_logins,
SUM(num_file_creations) as total_file_creations,
SUM(su_attempted) as total_root_attempts,
SUM(num_root) as total_root_acceses
FROM connections
WHERE protocol_type = 'tcp'
AND label != 'normal.'
GROUP BY service, attack_type
ORDER BY total_freq DESC
""")
display(tcp_attack_stats)

有很多攻击类型,并且前面的输出显示了其中的一个特定部分。
按服务和攻击类型过滤基于 TCP 协议的连接统计
我们现在将通过在我们的查询中基于持续时间、文件创建和 root 访问施加一些约束来过滤其中的一些攻击类型。
tcp_attack_stats = sqlContext.sql("""
SELECT
service,
label as attack_type,
COUNT(*) as total_freq,
ROUND(AVG(duration), 2) as mean_duration,
SUM(num_failed_logins) as total_failed_logins,
SUM(num_file_creations) as total_file_creations,
SUM(su_attempted) as total_root_attempts,
SUM(num_root) as total_root_acceses
FROM connections
WHERE (protocol_type = 'tcp'
AND label != 'normal.')
GROUP BY service, attack_type
HAVING (mean_duration >= 50
AND total_file_creations >= 5
AND total_root_acceses >= 1)
ORDER BY total_freq DESC
""")
display(tcp_attack_stats)

有趣的是,多跳攻击 可以获得目标主机的 root 权限!
用于基于服务过滤 TCP 攻击类型的子查询
让我们尝试获取基于服务和攻击类型的所有 TCP 攻击,以便这些攻击的总体平均持续时间大于零 (> 0)。为此,我们可以使用包含所有聚合统计信息的内部查询,提取相关查询,并在外部查询中应用平均持续时间过滤器,如下所示。
tcp_attack_stats = sqlContext.sql("""
SELECT
t.service,
t.attack_type,
t.total_freq
FROM
(SELECT
service,
label as attack_type,
COUNT(*) as total_freq,
ROUND(AVG(duration), 2) as mean_duration,
SUM(num_failed_logins) as total_failed_logins,
SUM(num_file_creations) as total_file_creations,
SUM(su_attempted) as total_root_attempts,
SUM(num_root) as total_root_acceses
FROM connections
WHERE protocol_type = 'tcp'
AND label != 'normal.'
GROUP BY service, attack_type
ORDER BY total_freq DESC) as t
WHERE t.mean_duration > 0
""")
display(tcp_attack_stats)

这太棒了!现在,另一种有趣的查看数据的方式是使用数据透视表,其中一个属性表示行,另一个属性表示列。 让我们看看是否可以利用 Spark DataFrames 来做到这一点!
从聚合数据构建数据透视表
我们将基于先前基于类型和服务聚合攻击的 DataFrame 对象。 为此,我们可以利用 Spark DataFrames 和 DataFrame DSL 的强大功能。
display((tcp_attack_stats.groupby('service')
.pivot('attack_type')
.agg({'total_freq':'max'})
.na.fill(0))
)

我们得到了一个漂亮、整洁的数据透视表,显示了基于服务和攻击类型的所有事件!
下一步
我鼓励您尝试使用 Spark SQL 和 DataFrames。您甚至可以导入我的 Notebook 并在您自己的帐户中使用它。
您也可以参考我的 GitHub 存储库,其中包含本文中使用的所有代码和 Notebook。它涵盖了我们未在此处涵盖的内容,包括
- 联接(Joins)
- 窗口函数(Window functions)
- Spark DataFrames 的详细操作和转换
如果您想离线使用,您还可以将我的教程作为Jupyter Notebook访问。
网上有很多文章和教程,我建议您查看一下。 一个有用的资源是 Databricks 完整的 Spark SQL 指南。
考虑使用 JSON 数据但不知道如何使用 Spark SQL?Databricks 支持它!查看此出色的 Spark SQL 中的 JSON 支持指南。
对 SQL 中诸如窗口函数和排名之类的高级概念感兴趣?请查看“在 Spark SQL 中引入窗口函数。”
我将撰写另一篇文章,以直观的方式涵盖其中的一些概念,这应该很容易理解。敬请关注!
如果您有任何反馈或疑问,可以通过LinkedIn与我联系。
本文最初出现在 Medium 的 Towards Data Science 频道,并经许可转载。
1 条评论