如何使用 Python 和 Apache Spark 整理日志数据

使用 NASA 日志的案例研究,展示如何利用 Spark 大规模分析数据。
225 位读者喜欢这篇文章。
metrics and data shown on a computer screen

Opensource.com

如今,利用分析的最流行和有效的企业用例之一是日志分析。几乎每个组织现在都有多个系统和基础设施日夜不停地运行。为了有效地维持业务运营,这些组织需要了解其基础设施是否发挥了最大潜力。找出答案需要分析系统和应用程序日志,甚至可能对日志数据应用预测分析。所涉及的日志数据量通常是巨大的,具体取决于所涉及的组织基础设施类型以及在其上运行的应用程序。

The log data processing pipeline by Doug Henschen.

日志数据处理管道

由于计算限制,我们过去只能在单台机器上分析数据样本的日子已经一去不复返了。得益于大数据、更好和分布式计算以及诸如 Apache Spark 等用于大数据处理和开源分析的框架,我们可以每天对可能数十亿条日志消息执行可扩展的日志分析。本以案例研究为导向的教程旨在采用动手实践的方式,展示我们如何利用 Spark 对半结构化日志数据进行大规模日志分析。如果您对使用 Spark 进行可扩展的 SQL 感兴趣,请随时查看 使用 Spark 进行大规模 SQL

虽然市面上有许多优秀的开源框架和工具用于日志分析,例如 Elasticsearch,但本分为两部分的教程旨在展示如何利用 Spark 进行大规模日志分析。当然,在实际应用中,您可以自由选择自己的工具箱来分析日志数据。

让我们开始吧!

主要目标:NASA 日志分析

正如我们之前提到的,Apache Spark 是一个出色且理想的开源框架,用于大规模地整理、分析和建模结构化和非结构化数据!在本教程中,我们的主要目标是业界最流行的用例之一——日志分析。服务器日志是常见的企业数据源,通常包含大量可操作的见解和信息。在这些情况下,日志数据来自许多来源,例如 Web、客户端和计算服务器、应用程序、用户生成的内容和平面文件。这些日志可用于监控服务器、改进业务和客户情报、构建推荐系统、欺诈检测等等。

Spark 允许您廉价地将日志转储和存储到磁盘上的文件中,同时仍然提供丰富的 API 来进行大规模数据分析。这个动手案例研究将向您展示如何在来自 NASA 的真实生产日志上使用 Apache Spark,同时学习数据整理和基本但功能强大的探索性数据分析技术。在本研究中,我们将分析来自佛罗里达州 NASA 肯尼迪航天中心 Web 服务器的日志数据集。

完整的数据集——包含两个月内对 NASA 肯尼迪航天中心的所有 HTTP 请求——可在此处 免费 下载。或者,如果您喜欢 FTP

接下来,如果您想继续学习,请从 我的 GitHub 下载本教程,并将这两个文件放在与教程的 Jupyter Notebook 相同的目录中。

设置依赖项

第一步是确保您可以访问 Spark 会话和集群。对于此步骤,您可以使用自己的本地 Spark 设置或基于云的设置。通常,大多数云平台现在都提供 Spark 集群,您还可以使用免费选项,包括 Databricks 社区版。本教程假设您已经设置了 Spark,因此我们不会花费额外的时间从头开始配置或设置 Spark。

通常,预配置的 Spark 设置在您启动 Jupyter Notebook 服务器时已经预加载了必要的环境变量或依赖项。在我的情况下,我可以使用笔记本中的以下命令检查它们

spark

Spark session

这些结果表明我的集群目前正在运行 Spark 2.4.0。我们还可以使用以下代码检查 sqlContext 是否存在

sqlContext

<pyspark.sql.context.SQLContext at 0x7fb1577b6400>

现在,如果您没有预配置这些变量并且收到错误,您可以使用以下代码加载和配置它们

# configure spark variables
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
    
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

# load up other dependencies
import re
import pandas as pd

我们还需要加载其他库来处理 DataFrames正则表达式。使用正则表达式是解析日志文件的主要方面之一。此工具提供了一种强大的模式匹配技术,可用于提取和查找半结构化和非结构化数据中的模式。

The Perl Problems strip from xkcd.

xkcd 的 Perl Problems 漫画 xkcd

正则表达式可能非常有效和强大,但它们也可能令人感到不知所措和困惑。不过不用担心,通过练习,您可以真正发挥它们的最大潜力。以下示例展示了在 Python 中使用正则表达式的一种方法。在这里,我们尝试在给定的输入句子中查找单词 'spark' 的所有出现次数。

m = re.finditer(r'.*?(spark).*?', "I'm searching for a spark in PySpark", re.I)
for match in m:
    print(match, match.start(), match.end())

<_sre.SRE_Match object; span=(0, 25), match=“I’m searching for a spark”> 0 25

<_sre.SRE_Match object; span=(25, 36), match=’ in PySpark’> 25 36

让我们继续分析的下一部分。

加载和查看 NASA 日志数据集

鉴于我们的数据存储在以下路径(以平面文件的形式),让我们将其加载到 DataFrame 中。 我们将分步进行。以下代码加载我们磁盘的日志数据文件名

import glob

raw_data_files = glob.glob('*.gz')
raw_data_files

[‘NASA_access_log_Jul95.gz’, ‘NASA_access_log_Aug95.gz’]

现在,我们将使用 sqlContext.read.text()spark.read.text() 读取文本文件。此代码生成一个 DataFrame,其中包含一个名为 value 的字符串列

base_df = spark.read.text(raw_data_files)
base_df.printSchema()

root

|-- value: string (nullable = true)

此输出使我们能够查看日志数据模式的文本,我们稍后将检查该模式。您可以使用以下代码查看保存日志数据的数据结构的类型

type(base_df)

pyspark.sql.dataframe.DataFrame

在本教程中,我们始终使用 Spark DataFrame。但是,如果您愿意,您也可以通过添加以下代码将 DataFrame 转换为 弹性分布式数据集 (RDD)(Spark 的原始数据结构),如果需要的话

base_df_rdd = base_df.rdd
type(base_df_rdd)

pyspark.rdd.RDD

现在让我们看一下 DataFrame 中的实际日志数据:

base_df.show(10, truncate=False)

The log data within the dataframe.

base_df.show dataframe 中的日志数据

此结果看起来确实像是标准的半结构化服务器日志数据。在文件变得有用之前,我们肯定需要进行一些数据处理和整理。请记住,从 RDD 访问数据的方式略有不同,如下所示

base_df_rdd.take(10)

Log data from the resilient distributed datasets.

图 5:通过 base_df_rdd 在 dataframe 中的日志数据

既然我们已经加载并查看了日志数据,让我们对其进行处理和整理。

数据整理

在本节中,我们清理和解析日志数据集,以从每条日志消息中提取具有有意义信息的结构化属性。

了解日志数据

如果您熟悉 Web 服务器日志,您会认识到上面显示的数据采用 通用日志格式。字段为:  

remotehost rfc931 authuser [date] "request" status bytes

字段 描述
remotehost 远程主机名(或 IP 号码,如果 DNS 主机名不可用或 DNSLookup 已关闭)。
rfc931 用户的远程登录名(如果存在)。
authuser HTTP 服务器验证后远程用户的用户名。
[date] 请求的日期和时间。
“request” 请求,与来自浏览器或客户端的请求完全相同。
status 服务器发回客户端的 HTTP 状态代码
bytes 传输到客户端的字节数 (Content-Length)。

我们现在需要技术来解析、匹配和从日志数据中提取这些属性。

使用正则表达式进行数据解析和提取

接下来,我们必须将半结构化日志数据解析为单独的列。我们将使用特殊的内置 regexp_extract()  函数进行解析。此函数将列与具有一个或多个 捕获 的正则表达式匹配,并允许您提取匹配的组之一。我们将为要提取的每个字段使用一个正则表达式。

您现在一定已经听说或使用过相当多的正则表达式。如果您发现正则表达式令人困惑(它们当然可能令人困惑),并且您想了解更多关于它们的信息,我们建议您查看 RegexOne 网站。您可能还会发现 Goyvaerts 和 Levithan 编写的 正则表达式食谱 是一本有用的参考书。

让我们看一下我们在数据集中处理的日志总数

print((base_df.count(), len(base_df.columns)))

(3461613, 1)

看起来我们总共有大约 346 万条日志消息。数量不少!让我们提取并查看一些示例日志消息

sample_logs = [item['value'] for item in base_df.take(15)]
sample_logs

Sample log messages.

示例日志消息。

提取主机名

让我们编写一些正则表达式来从日志中提取主机名

host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
hosts = [re.search(host_pattern, item).group(1)
           if re.search(host_pattern, item)
           else 'no match'
           for item in sample_logs]
hosts

[‘199.72.81.55’,

‘unicomp6.unicomp.net’,

‘199.120.110.21’,

‘burger.letters.com’,

…,

…,

‘unicomp6.unicomp.net’,

‘d104.aa.net’,

‘d104.aa.net’]

提取时间戳

让我们使用正则表达式从日志中提取时间戳字段

ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
timestamps = [re.search(ts_pattern, item).group(1) for item in sample_logs]
timestamps

[‘01/Jul/1995:00:00:01 -0400’,

‘01/Jul/1995:00:00:06 -0400’,

‘01/Jul/1995:00:00:09 -0400’,

…,

…,

‘01/Jul/1995:00:00:14 -0400’,

‘01/Jul/1995:00:00:15 -0400’,

‘01/Jul/1995:00:00:15 -0400’]

提取 HTTP 请求方法、URI 和协议

现在,让我们使用正则表达式从日志中提取 HTTP 请求方法、URI 和协议模式字段

method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
method_uri_protocol = [re.search(method_uri_protocol_pattern, item).groups()
               if re.search(method_uri_protocol_pattern, item)
               else 'no match'
              for item in sample_logs]
method_uri_protocol

[(‘GET’, ‘/history/apollo/’, ‘HTTP/1.0’),

(‘GET’, ‘/shuttle/countdown/’, ‘HTTP/1.0’),

…,

…,

(‘GET’, ‘/shuttle/countdown/count.gif’, ‘HTTP/1.0’),

(‘GET’, ‘/images/NASA-logosmall.gif’, ‘HTTP/1.0’)]

提取 HTTP 状态代码

现在,让我们使用正则表达式从日志中提取 HTTP 状态代码:

status_pattern = r'\s(\d{3})\s'
status = [re.search(status_pattern, item).group(1) for item in sample_logs]
print(status)

[‘200’, ‘200’, ‘200’, ‘304’, …, ‘200’, ‘200’]

提取 HTTP 响应内容大小

现在,让我们使用正则表达式从日志中提取 HTTP 响应内容大小:

content_size_pattern = r'\s(\d+)$'
content_size = [re.search(content_size_pattern, item).group(1) for item in sample_logs]
print(content_size)

[‘6245’, ‘3985’, ‘4085’, ‘0’, …, ‘1204’, ‘40310’, ‘786’]

整合在一起

现在,让我们利用我们之前构建的所有正则表达式模式,并使用 regexp_extract(...) 方法来构建我们的 DataFrame,其中所有日志属性都整齐地提取到它们自己的单独列中。

from pyspark.sql.functions import regexp_extract

logs_df = base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                         regexp_extract('value', ts_pattern, 1).alias('timestamp'),
                         regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                         regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
                         regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
                         regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                         regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
logs_df.show(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))

Log dataframe extracted using regexp_extract(...)

使用 regexp_extract(...) 提取的日志 dataframe

查找缺失值

缺失值和空值是数据分析和机器学习的祸根。让我们看看我们的数据解析和提取逻辑的效果如何。首先,让我们验证原始 DataFrame 中是否没有空行

(base_df
    .filter(base_df['value']
                .isNull())
    .count())

0

一切都好!现在,如果我们的数据解析和提取工作正常,我们应该没有任何具有潜在空值的行。让我们尝试进行测试

bad_rows_df = logs_df.filter(logs_df['host'].isNull()| 
                             logs_df['timestamp'].isNull() | 
                             logs_df['method'].isNull() |
                             logs_df['endpoint'].isNull() |
                             logs_df['status'].isNull() |
                             logs_df['content_size'].isNull()|
                             logs_df['protocol'].isNull())
bad_rows_df.count()

33905

哎呀!看起来我们的数据中有超过 3.3 万个缺失值!我们可以处理这个问题吗?

请记住,这不是您可以直接查询并获取哪些列具有空值的常规 pandas (链接) DataFrame。我们所谓的大数据集驻留在磁盘上,可能存在于 Spark 集群中的多个节点中。那么我们如何找出哪些列可能具有空值呢?

查找空值计数

我们通常可以使用以下技术来找出哪些列具有空值。

注意: 此方法改编自 StackOverflow一个优秀的答案

from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum

def count_null(col_name):
    return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

# Build up a list of column expressions, one per column.
exprs = [count_null(col_name) for col_name in logs_df.columns]

# Run the aggregation. The *exprs converts the list of expressions into
# variable function arguments.
logs_df.agg(*exprs).show()

 

Checking which columns have null values.

使用 count_null() 检查哪些列具有空值

好吧,看起来我们在 status 列中有一个缺失值,而其他所有值都在 content_size 列中。让我们看看我们是否可以找出问题所在!

处理 HTTP 状态中的空值

我们用于 status 列的原始解析正则表达式是

regexp_extract('value', r'\s(\d{3})\s', 1).cast('integer')
                                          .alias( 'status')

可能是数字更多,导致我们的正则表达式错误?还是数据点本身不好?让我们找出答案。

注意:在下面的表达式中,波浪号 (~) 表示“非”。

null_status_df = base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))
null_status_df.count()

1

让我们看看这个坏记录是什么样的

null_status_df.show(truncate=False)

A bad record that's missing information.

一个缺少信息的坏记录,通过 null_status_df

看起来是一条缺少大量信息的记录。让我们通过我们的日志数据解析管道传递它

bad_status_df = null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                                      regexp_extract('value', ts_pattern, 1).alias('timestamp'),
                                      regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                                      regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
                                      regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
                                      regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                                      regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
bad_status_df.show(truncate=False)

The full bad log record containing no information and two null entries.

包含无信息和两个空条目的完整坏日志记录。

看起来记录本身是不完整的记录,没有有用的信息,最好的选择是按如下方式删除此记录

logs_df = logs_df[logs_df['status'].isNotNull()]
exprs = [count_null(col_name) for col_name in logs_df.columns]
logs_df.agg(*exprs).show()

The dropped record.

已删除的记录。

处理 HTTP 内容大小中的空值

根据我们之前的正则表达式,我们用于 content_size 列的原始解析正则表达式是

regexp_extract('value', r'\s(\d+)$', 1).cast('integer')
                                       .alias('content_size')

原始数据集中本身可能缺少数据吗?让我们找出答案。我们首先在我们的基本 DataFrame 中查找具有潜在缺失内容大小的记录

null_content_size_df = base_df.filter(~base_df['value'].rlike(r'\s\d+$'))
null_content_size_df.count()

33905

该数字似乎与我们处理后的 DataFrame 中缺失内容大小值的数量相匹配。让我们看一下数据帧中前十条具有缺失内容大小的记录

null_content_size_df.take(10)

The top 10 dataframe records with missing content sizes.

前 10 条具有缺失内容大小的 dataframe 记录。

很明显,错误的原始数据记录对应于错误响应,其中没有发送回任何内容,并且服务器为 content_size 字段发出了 -。由于我们不想从分析中丢弃这些行,因此让我们推算或用 0 填充它们。

修复 content_size 为空的行

最简单的解决方案是将 logs_df 中的空值替换为 0,就像我们之前讨论的那样。Spark DataFrame API 提供了一组专门用于处理空值的函数和字段,其中包括

  • fillna(),它使用指定的非空值填充空值。
  • na,它返回一个 DataFrameNaFunctions 对象,其中包含许多用于操作空列的函数。 

有几种方法可以调用此函数。最简单的方法只是用已知值替换所有空列。但是,为了安全起见,最好传递一个包含 (column_name, value) 映射的 Python 字典。这就是我们要做的。文档中的一个示例如下所示:

>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
+---+------+-------+
|age|height|   name|
+---+------+-------+
| 10|    80|  Alice|
|  5|  null|    Bob|
| 50|  null|    Tom|
| 50|  null|unknown|
+---+------+-------+

现在我们使用此函数将 content_size 字段中的所有缺失值填充为 0

logs_df = logs_df.na.fill({'content_size': 0})
exprs = [count_null(col_name) for col_name in logs_df.columns]
logs_df.agg(*exprs).show()

The null values now replaced by zero.

空值现在已替换为零。

看,没有缺失值了!

处理时间字段(时间戳)

现在我们有了一个干净、解析的 DataFrame,我们必须将时间戳字段解析为实际的时间戳。通用日志格式时间有点非标准。 用户定义函数 (UDF) 是解析它的最直接方法:

from pyspark.sql.functions import udf

month_map = {
  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
  'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
}

def parse_clf_time(text):
    """ Convert Common Log time format into a Python datetime object
    Args:
        text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
    Returns:
        a string suitable for passing to CAST('timestamp')
    """
    # NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
      int(text[7:11]),
      month_map[text[3:6]],
      int(text[0:2]),
      int(text[12:14]),
      int(text[15:17]),
      int(text[18:20])
    )

现在让我们使用此函数解析 DataFrame 的 time 

udf_parse_time = udf(parse_clf_time)

logs_df = (logs_df.select('*', udf_parse_time(logs_df['timestamp'])
                                  .cast('timestamp')
                                  .alias('time'))
                  .drop('timestamp')
logs_df.show(10, truncate=True)

The timestamp parsed with a User-Defined Function (UDF).

使用用户定义函数 (UDF) 解析的时间戳。

一切似乎都很好!让我们通过检查 DataFrame 的模式来验证这一点

logs_df.printSchema()

root

|-- host: string (nullable = true)

|-- method: string (nullable = true)

|-- endpoint: string (nullable = true)

|-- protocol: string (nullable = true)

|-- status: integer (nullable = true)

|-- content_size: integer (nullable = false)


|-- time: timestamp (nullable = true)

现在让我们缓存 logs_df,因为我们将在本系列的 第二部分 中广泛使用它进行数据分析。

logs_df.cache()

结论

获取、处理和整理数据是任何端到端数据科学或分析用例中最重要的一些步骤。当处理大规模的半结构化或非结构化数据时,事情开始变得更加困难。本案例研究为您提供了一个逐步的动手实践方法,以利用 Python 和 Spark 等开源工具和框架的力量,大规模处理和整理半结构化的 NASA 日志数据。一旦我们准备好干净的数据集,我们终于可以开始使用它来获得有关 NASA 服务器的有用见解。点击进入本系列的第二篇文章,获取关于使用 Python 和 Apache Spark 分析和可视化 NASA 日志数据的 动手教程。   


本文最初发表在 Medium 的 Towards Data Science 频道,并经许可重新发表。 


 

User profile image.
Dipanjan (DJ) Sarkar 是 Red Hat 的数据科学家、已出版的作家、顾问和培训师。他曾为多家初创公司以及英特尔等财富 500 强公司提供咨询和工作。他主要致力于利用数据科学、机器学习和深度学习来构建大规模智能系统。

评论已关闭。

Creative Commons License本作品根据 Creative Commons Attribution-Share Alike 4.0 International License 获得许可。
© . All rights reserved.