如何使用 Python 和 Apache Spark 分析日志数据

使用 NASA 日志的案例研究,展示如何利用 Spark 大规模分析数据。
226 位读者喜欢这个。
Person standing in front of a giant computer screen with numbers, data

Opensource.com

在本系列的第一部分中,我们首先使用 PythonApache Spark 来处理和整理我们的示例 Web 日志,使其成为适合分析的格式,考虑到当今大多数组织生成的大量日志数据,这是一项至关重要的技术。我们设置了环境变量、依赖项,加载了使用 DataFrame 和正则表达式所需的库,当然还加载了示例日志数据。然后我们将日志数据整理成干净、结构化且有意义的格式。在第二部分中,我们重点关注分析这些数据。

Web 日志数据分析

现在我们有了一个 DataFrame,其中包含解析和清理后的日志文件作为数据帧,我们可以执行一些有趣的探索性数据分析 (EDA),尝试获得一些有趣的见解!

内容大小统计

让我们计算一些关于我们的 Web 服务器返回的内容大小的统计信息。特别是,我们想知道平均值、最小值和最大值内容大小。

我们通过在 logs_dfcontent_size 列上调用 .describe() 来计算这些统计信息。.describe() 函数以这种格式返回给定列的 countmeanstddevminmax

content_size_summary_df = logs_df.describe(['content_size'])
content_size_summary_df.toPandas()

Stastical analysis regarding the size of content your web server returns.

使用 .describe() 分析的内容大小。

或者,我们可以使用 SQL 直接计算这些统计信息。pyspark.sql.functions 模块有许多有用的函数,请参阅 文档 了解更多信息。

在我们应用 .agg() 函数后,我们调用 toPandas() 来提取结果并将其转换为 pandas DataFrame,这在 Jupyter Notebook 上提供了更好的格式

from pyspark.sql import functions as F

(logs_df.agg(F.min(logs_df['content_size']).alias('min_content_size'),
             F.max(logs_df['content_size']).alias('max_content_size'),
             F.mean(logs_df['content_size']).alias('mean_content_size'),
             F.stddev(logs_df['content_size']).alias('std_content_size'),
             F.count(logs_df['content_size']).alias('count_content_size'))
        .toPandas())

The same data reformatted into a pandas dataframe.

重新格式化为 pandas 数据帧的相同数据。

当我们验证结果时,我们看到它们与预期相同。

HTTP 状态代码分析

接下来,让我们查看日志的状态代码值,看看出现了哪些状态代码值以及出现的次数。我们再次从 logs_df 开始,按 status 列分组,应用 .count() 聚合函数,然后按 status 列排序

status_freq_df = (logs_df
                     .groupBy('status')
                     .count()
                     .sort('status')
                     .cache())
print('Total distinct HTTP Status Codes:', status_freq_df.count())                     

不同的 HTTP 状态代码总数:8

让我们以频率表的形式查看每个状态代码的出现次数

status_freq_pd_df = (status_freq_df
                         .toPandas()
                         .sort_values(by=['count'],
                                      ascending=False))
status_freq_pd_df

How many times each status code appears in your log.

每个状态代码在您的日志中出现的次数。

看起来最常见的状态代码是 200—OK—这是一个很好的迹象,表明大部分时间事情都在正常运行。让我们将其可视化

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
%matplotlib inline

sns.catplot(x='status', y='count', data=status_freq_pd_df, 
            kind='bar', order=status_freq_pd_df['status'])

HTTP status code occurrences in a bar chart.

条形图中的 HTTP 状态代码出现次数。

还不错。但由于数据的巨大偏斜,几个状态代码几乎不可见。让我们进行对数变换,看看情况是否有所改善。通常,对数变换帮助我们将高度偏斜的数据转换为近似正态分布,以便我们可以以更易于理解的方式可视化数据分布:

log_freq_df = status_freq_df.withColumn('log(count)', 
                                        F.log(status_freq_df['count']))
log_freq_df.show()

Error code frequency as a log transform.

作为对数变换的错误代码频率。

结果看起来肯定不错,并且似乎已经处理了偏斜,让我们通过可视化数据来验证这一点

log_freq_pd_df = (log_freq_df
                    .toPandas()
                    .sort_values(by=['log(count)'],
                                 ascending=False))
sns.catplot(x='status', y='log(count)', data=log_freq_pd_df, 
            kind='bar', order=status_freq_pd_df['status'])

HTTP status code frequency bar chart, after a log transform.

对数变换后的 HTTP 状态代码频率条形图。

这个图表看起来肯定好多了,偏斜也更小,让我们更好地了解状态代码的分布!

分析频繁主机

让我们查看频繁访问服务器的主机,方法是获取每个 host 的总访问次数,按访问次数排序,并且仅显示访问次数最多的前 10 个主机

host_sum_df =(logs_df
               .groupBy('host')
               .count()
               .sort('count', ascending=False).limit(10))

host_sum_df.show(truncate=False)

Hosts that frequently access the server sorted by number of accesses.

通过 host_sum_df 按访问次数排序的频繁访问服务器的主机。

此表看起来不错,但让我们更仔细地检查第 9 行中的空白记录

host_sum_pd_df = host_sum_df.toPandas()
host_sum_pd_df.iloc[8][‘host’]

''

看起来最顶层的主机名之一是空字符串。这个例子教会了我们一个宝贵的教训:在数据整理时,不仅要检查空值,还要检查空字符串。

显示前 20 个最频繁的端点

现在,让我们可视化日志中端点 URI 命中的次数。要执行此任务,请从 logs_df 开始,然后按 endpoint 列分组,按计数聚合,并像上一个示例一样按降序排序

paths_df = (logs_df
            .groupBy('endpoint')
            .count()
            .sort('count', ascending=False).limit(20))

paths_pd_df = paths_df.toPandas()
paths_pd_df            

A table showing the number of hits to each endpoint URI in descending order.

通过 paths_df 按降序排列的每个端点 URI 的命中次数

毫不奇怪,访问最多的资产是 GIF、主页和一些 CGI 脚本。

显示前 10 个错误端点

哪些是前 10 个请求的端点,但没有返回代码 200(HTTP 状态 OK)?为了找出答案,我们创建了一个排序列表,其中包含端点以及它们被访问时返回非 200 代码的次数,然后显示前 10 个

not200_df = (logs_df
               .filter(logs_df['status'] != 200))

error_endpoints_freq_df = (not200_df
                               .groupBy('endpoint')
                               .count()
                               .sort('count', ascending=False)
                               .limit(10)
                          )
                          
error_endpoints_freq_df.show(truncate=False)                          

A table displaying the top ten error endpoints and their frequency.

通过 error_endpoints_freq_df 的前 10 个错误端点及其频率

看起来 GIF(动画/静态图像)最容易加载失败。为什么会这样?鉴于这些日志来自 1995 年,并且考虑到我们当时的互联网速度,我并不感到惊讶!

唯一主机的总数

在这两个月内,有多少个唯一主机访问了 NASA 网站?我们可以通过几个转换来找到答案

unique_host_count = (logs_df
                     .select('host')
                     .distinct()
                     .count())
unique_host_count

137933

每日唯一主机的数量

对于一个高级示例,让我们看看如何确定每天唯一主机的数量。在这里,我们想要一个 DataFrame,其中包含月份中的日期以及该日期关联的唯一主机数量,并按月份中的日期升序排序。

考虑一下您为此任务需要执行的步骤。由于每个日志仅涵盖一个月,因此您可以忽略月份问题,至少现在是这样。对于跨越多个月的数据,我们在进行必要的聚合时需要同时考虑月份和日期。您可能需要使用 pyspark.sql.functions 模块的 dayofmonth() 函数(我们已在本教程的开头将其导入为 F)。

host_day_df 开始,这是一个具有两列的 DataFrame

The columns in the host_day_df dataframe.

host_day_df 数据帧中的列。

logs_df 中的每一行在此 DataFrame 中都有一行。本质上,我们只是在转换每一行。例如,对于这一行

unicomp6.unicomp.net - - [01/Aug/1995:00:35:41 -0400] "GET /shuttle/missions/sts-73/news HTTP/1.0" 302 -

您的 host_day_df 应该有 unicomp6.unicomp.net 1

host_day_df = logs_df.select(logs_df.host, 
                             F.dayofmonth('time').alias('day'))
host_day_df.show(5, truncate=False)

The top five hosts making requests on the first day.

通过 host_day_df 的第一天提出请求的前五个主机

接下来是 host_day_distinct_df,这是一个 DataFrame,其列与 host_day_df 相同,但删除了重复的 (day, host)

host_day_df = logs_df.select(logs_df.host, 
                             F.dayofmonth('time').alias('day'))
host_day_df.show(5, truncate=False)

host_day_distinct_df gives the same output as host_day_df, but with duplicate rows removed.

host_day_distinct_df 给出的输出与 host_day_df 相同,但删除了重复行。

另一个选项是 daily_unique_hosts_df,这是一个具有两列的 DataFrame,其中一列与以前的 DataFrame 不同

Columns shown by daily_unique_hosts_df.

通过 daily_unique_hosts_df 的月份中的日期和每天提出请求的唯一主机数

def_mr = pd.get_option('max_rows')
pd.set_option('max_rows', 10)

daily_hosts_df = (host_day_distinct_df
                     .groupBy('day')
                     .count()
                     .sort("day"))

daily_hosts_df = daily_hosts_df.toPandas()
daily_hosts_df

daily_unique_hosts_df shows the day of the month and the number of unique hosts making requests that day.

daily_unique_hosts_df 显示月份中的日期和当天提出请求的唯一主机数。

此结果为我们提供了一个不错的 DataFrame,显示了每天唯一主机的总数。让我们将其可视化

c = sns.catplot(x='day', y='count', 
                data=daily_hosts_df, 
                kind='point', height=5, 
                aspect=1.5)

Unique hosts per day charted using daily_unique_hosts_df.

使用 daily_unique_hosts_df 绘制的每天唯一主机数

每个主机的每日平均请求数

在上一个示例中,我们研究了一种确定每日日志中唯一主机数量的方法。现在让我们找到每天每个主机对 NASA 网站发出的平均请求数。在这里,我们想要一个按月份中的日期升序排序的 DataFrame,其中包括月份中的日期以及该日期每个主机的关联平均请求数

daily_hosts_df = (host_day_distinct_df
                     .groupBy('day')
                     .count()
                     .select(col("day"), 
                                      col("count").alias("total_hosts")))

total_daily_reqests_df = (logs_df
                              .select(F.dayofmonth("time")
                                          .alias("day"))
                              .groupBy("day")
                              .count()
                              .select(col("day"), 
                                      col("count").alias("total_reqs")))

avg_daily_reqests_per_host_df = total_daily_reqests_df.join(daily_hosts_df, 'day')
avg_daily_reqests_per_host_df = (avg_daily_reqests_per_host_df
                                    .withColumn('avg_reqs', col('total_reqs') / col('total_hosts'))
                                    .sort("day"))
avg_daily_reqests_per_host_df = avg_daily_reqests_per_host_df.toPandas()
avg_daily_reqests_per_host_df

The average number of daily requests per host via avg_daily_reqests_per_host_df.

通过 avg_daily_reqests_per_host_df 的每个主机的每日平均请求数。

现在我们可以可视化每个主机的平均每日请求数

c = sns.catplot(x='day', y='avg_reqs', 
                data=avg_daily_reqests_per_host_df, 
                kind='point', height=5, aspect=1.5)

The average number of daily requests per host charted.

绘制的每个主机的平均每日请求数。

看起来第 13 天每个主机的请求数最多。

计数 404 响应代码

创建一个 DataFrame,其中仅包含状态代码为 404(未找到)的日志记录。我们确保 cache() not_found_df DataFrame,因为我们将在接下来的示例中使用它。您认为日志中有多少 404 记录?

not_found_df = logs_df.filter(logs_df["status"] == 404).cache()
print(('Total 404 responses: {}').format(not_found_df.count()))

404 响应总数:20899

列出前 20 个 404 响应代码端点

使用我们之前缓存的 DataFrame—仅包含状态代码为 404 响应代码的日志记录—我们现在将打印出生成最多 404 错误的前 20 个端点的列表。 请记住,无论何时生成顶级端点,它们都应按排序顺序排列

endpoints_404_count_df = (not_found_df
                          .groupBy("endpoint")
                          .count()
                          .sort("count", ascending=False)
                          .limit(20))

endpoints_404_count_df.show(truncate=False)

The top 20 response code endpoints, sorted, thanks to endpoints_404_count_df.

通过 endpoints_404_count_df 排序的前 20 个响应代码端点。

列出前 20 个 404 响应代码主机

使用我们之前缓存的 DataFrame,其中仅包含状态代码为 404 响应代码的日志记录,我们现在可以打印出生成最多 404 错误的前 20 个主机的列表。 同样,请记住,顶级主机应按排序顺序排列

hosts_404_count_df = (not_found_df
                          .groupBy("host")
                          .count()
                          .sort("count", ascending=False)
                          .limit(20))

hosts_404_count_df.show(truncate=False)

The top twenty 404 response code hosts via hosts_404_count_df.

通过 hosts_404_count_df 的前 20 个 404 响应代码主机。

此输出让我们很好地了解了哪些主机最终为 NASA 网页生成了最多的 404 错误。

可视化每日 404 错误

现在让我们按时间(按时间)探索我们的 404 记录。与显示每日唯一主机数量的示例类似,我们将按天细分 404 请求,并在 errors_by_date_sorted_df 中按天对每日计数进行排序

errors_by_date_sorted_df = (not_found_df
                                .groupBy(F.dayofmonth('time').alias('day'))
                                .count()
                                .sort("day"))

errors_by_date_sorted_pd_df = errors_by_date_sorted_df.toPandas()
errors_by_date_sorted_pd_df

404 errors per day via errors_by_date_sorted_df.

通过 status_freq_df 的每个状态代码出现的次数。

现在让我们可视化每天的 404 错误总数

c = sns.catplot(x='day', y='count', 
                data=errors_by_date_sorted_pd_df, 
                kind='point', height=5, aspect=1.5)

Total 404 errors per day.

每天的 404 错误总数。

404 错误的前三天

根据之前的图表,404 错误最多的月份中的前三天是哪几天?了解这一点可以帮助我们诊断并深入研究这些特定日期,以找出可能出现的问题(服务器问题、DNS 问题、拒绝服务、延迟问题、维护等等)。我们可以利用我们之前创建的 errors_by_date_sorted_df DataFrame 来回答这个问题

(errors_by_date_sorted_df
    .sort("count", ascending=False)
    .show(3))

The top 3 days of 404 errors via errors_by_date_sorted_df.

通过 errors_by_date_sorted_df 的 404 错误的前 3 天。

可视化每小时 404 错误

使用我们之前缓存的 DataFrame not_found_df,我们现在可以按一天中的小时进行分组和排序(按升序排列)。我们将使用此过程创建一个 DataFrame,其中包含一天中每小时的 HTTP 请求的 404 响应总数(午夜从 0 点开始)。然后,我们将从 DataFrame 构建可视化。

hourly_avg_errors_sorted_df = (not_found_df
                                   .groupBy(F.hour('time')
                                             .alias('hour'))
                                   .count()
                                   .sort('hour'))
hourly_avg_errors_sorted_pd_df = hourly_avg_errors_sorted_df.toPandas()

c = sns.catplot(x='hour', y='count', 
                data=hourly_avg_errors_sorted_pd_df, 
                kind='bar', height=5, aspect=1.5)

Total 404 errors per hour in a bar chart.

条形图中的每小时 404 错误总数。

看起来 404 错误总数在下午发生最多,而在清晨发生最少。我们现在可以将 pandas 显示的最大行数重置为默认值,因为我们之前已将其更改为显示有限的行数。

pd.set_option(‘max_rows’, def_mr)

结论

我们对大规模数据整理、解析、分析和可视化采取了动手方法,针对一个非常常见但至关重要的日志分析案例研究。虽然我们在此处处理的数据从大小或数量的角度来看可能不是“大数据”,但这些技术和方法具有足够的通用性,可以扩展到更大的数据量。我希望这个练习能为您提供关于如何利用 Apache Spark 等开源框架来处理您自己的结构化和半结构化数据的想法!

您可以在 我的 GitHub 存储库 中找到本文随附的所有代码和分析。此外,您可以在此 Jupyter Notebook 中找到逐步方法。

对了解 Spark SQL 和 DataFrame 感兴趣?查看我在 opensource.com 上的 动手教程

如果您有任何反馈或疑问,可以在此处留言或在 LinkedIn 上联系我。


本文最初发表在 Medium 的 Towards Data Science 频道上,并经许可转载。


 

User profile image.
Dipanjan (DJ) Sarkar 是 Red Hat 的数据科学家、已出版的作者、顾问和培训师。他曾为多家初创公司以及英特尔等财富 500 强公司提供咨询和工作。他主要致力于利用数据科学、机器学习和深度学习来构建大规模智能系统。

1 条评论

非常有趣!好奇为什么您计算每个状态代码频率的对数,而不是仅仅将 y 轴设置为对数刻度?当您计算对数时,您会丢失数据中的所有可读性。

Creative Commons License本作品根据知识共享署名-相同方式共享 4.0 国际许可协议获得许可。
© . All rights reserved.