Logstash 是 Elastic 发布的开源工具,旨在摄取和转换数据。它最初是构建为将日志数据摄取到 ElasticSearch 中的日志处理管道。几个版本之后,它可以做更多的事情。
Logstash 的核心是一种提取-转换-加载 (ETL) 管道。非结构化日志数据被提取,过滤器对其进行转换,结果被加载到某种形式的数据存储中。
Logstash 可以获取像这样的 syslog 示例文本行
Sep 11 14:13:38 vorthys sshd[16998]: Received disconnect from 192.0.2.11 port 53730:11: disconnected by user
并将其转换为更丰富的数据结构
{
"timestamp": "1505157218000",
"host": "vorthys",
"program": "sshd",
"pid": "16998",
"message": "Received disconnect from 192.0.2.11 port 53730:11: disconnected by user",
"sshd_action": "disconnect",
"sshd_tuple": "192.0.2.11:513730"
}
根据您使用的后端存储,您可以使用索引字段而不是 grep terabytes 的文本来查找这样的事件。如果您每天生成数十到数百 GB 的日志,这很重要。
内部架构
Logstash 具有用 JRuby 实现的三阶段管道
输入阶段插件提取数据。这可以来自日志文件、TCP 或 UDP 侦听器、多个协议特定的插件(例如 syslog 或 IRC),甚至排队系统(例如 Redis、AQMP 或 Kafka)。此阶段使用有关事件来源的元数据标记传入事件。
过滤器阶段插件转换和丰富数据。这是生成上面示例中 sshd_action
和 sshd_tuple
字段的阶段。这是您找到 Logstash 大部分价值的地方。
输出阶段插件将处理后的事件加载到其他地方,例如 ElasticSearch 或其他文档数据库,或者排队系统,例如 Redis、AQMP 或 Kafka。它也可以配置为与 API 通信。也可以将类似 PagerDuty 的东西连接到您的 Logstash 输出。
有一个 cron 作业检查您的备份是否成功完成?它可以在日志流中发出警报。这被输入拾取,并且设置为捕获这些事件的过滤器配置对其进行标记,允许条件输出知道此事件是为其而设的。这就是您可以将警报添加到原本需要创建自己的通知层或在不允许与外部世界通信的系统上运行的脚本的方式。
线程
一般来说,每个输入都在自己的线程中运行。过滤器和输出阶段更复杂。在 Logstash 1.5 到 2.1 中,过滤器阶段具有可配置的线程数,而输出阶段占用单个线程。这在 Logstash 2.2 中发生了变化,当时构建了过滤器阶段线程来处理输出阶段。由于内部队列减少了一个要跟踪的队列,因此 Logstash 2.2 的吞吐量得到了提高。
如果您运行的是旧版本,则值得升级到至少 2.2。当我们从 1.5 迁移到 2.2 时,我们看到整体吞吐量增加了 20-25%。Logstash 在等待状态下花费的时间也更少,因此我们使用了更多的 CPU(47% 对 75%)。
配置管道
Logstash 可以接受单个文件或目录作为其配置。如果给定一个目录,它会按词汇顺序读取文件。这很重要,因为排序对于过滤器插件很重要(我们稍后将更详细地讨论)。
这是一个裸 Logstash 配置文件
input { }
filter { }
output { }
这些中的每一个都将包含零个或多个插件配置,并且可以有多个块。
输入配置
输入部分可能如下所示
input {
syslog {
port => 514
type => “syslog_server”
}
}
这告诉 Logstash 在端口 514 上打开 syslog { }
插件,并将通过该插件传入的每个事件的文档类型设置为 syslog_server
。此插件仅遵循 RFC 3164,而不遵循较新的 RFC 5424。
这是一个稍微更复杂的输入块
# Pull in syslog data
input {
file {
path => [
"/var/log/syslog",
"/var/log/auth.log"
]
type => "syslog"
}
}
# Pull in application-log data. They emit data in JSON form.
input {
file {
path => [
"/var/log/app/worker_info.log",
"/var/log/app/broker_info.log",
"/var/log/app/supervisor.log"
]
exclude => "*.gz"
type => "applog"
codec => "json"
}
}
这个使用两个不同的 input { }
块来调用 file { }
插件的不同调用:一个跟踪系统级日志,另一个跟踪应用程序级日志。通过使用两个不同的 input { }
块,为每个块生成一个 Java 线程。对于多核系统,不同的内核跟踪配置的文件;如果一个线程阻塞,另一个线程将继续运行。
这两个 file { }
块都可以放在同一个 input { }
块中;它们只是在同一个线程中运行——Logstash 真的不在乎。
过滤器配置
过滤器部分是您将数据转换为更新、更易于使用的东西的地方。过滤器可能非常复杂。以下是一些实现不同目标的过滤器示例
filter {
if [program] == "metrics_fetcher" {
mutate {
add_tag => [ 'metrics' ]
}
}
}
在此示例中,如果 program
字段(由顶部示例输入中的 syslog
插件填充)读取 metrics_fetcher
,则它会标记事件 metrics
。此标记可以在以后的过滤器插件中使用,以进一步丰富数据。
filter {
if "metrics" in [tags] {
kv {
source => "message"
target => "metrics"
}
}
}
此过滤器仅在 metrics
在标记列表中时运行。然后它使用 kv { }
插件 根据 message
字段中的 key=value
对填充一组新字段。这些新键作为 metrics
字段的子字段放置,允许文本 pages_per_second=42 faults=0
变为事件上的 metrics.pages_per_second = 42
和 metrics.faults = 0
。
为什么不将此放入与设置 tag
值的条件相同的位置?因为事件可以通过多种方式获得 metrics
标记——这样,kv
过滤器将处理所有这些事件。
由于过滤器是排序的,因此确保在检查过滤器之前运行定义 metrics
标记的过滤器插件非常重要。以下是确保您的过滤器部分以最佳方式排序的指南
- 您的早期过滤器应尽可能多地应用元数据。
- 使用元数据,执行事件的详细解析。
- 在您的后期过滤器中,规范化您的数据以减少下游问题。
- 确保字段数据类型被强制转换为统一值。
priority
可以是布尔值、整数或字符串。- 包括 ElasticSearch 在内的一些系统会悄悄地为您转换类型。将字符串发送到布尔字段不会给您想要的结果。
- 如果值的数据类型不正确,其他系统将直接拒绝该值。
mutate { }
插件 在这里很有用,因为它具有将字段强制转换为特定数据类型的方法。
- 确保字段数据类型被强制转换为统一值。
以下是有助于从长字符串中提取字段的有用插件
- date:许多日志记录系统发出时间戳。此插件解析该时间戳并将事件的时间戳设置为该嵌入时间。默认情况下,事件的时间戳是它被摄取的时间,这可能是几秒钟、几小时甚至几天后。
- kv:如前所述,它可以将像
backup_state=failed progress=0.24
这样的字符串转换为您可以对其执行操作的字段。 - csv:当给定要期望的列列表时,它可以基于逗号分隔值在事件上创建字段。
- json:如果字段以 JSON 格式格式化,这将将其转换为字段。非常强大!
- xml:与 JSON 插件类似,这将把包含 XML 数据的字段转换为新字段。
- grok:这是您的正则表达式引擎。如果您需要将像
The accounting backup failed
这样的字符串转换为将通过if [backup_status] == 'failed'
的内容,这将做到这一点。- Grok 可以填写自己的文章,所以我将 将您转发到我 LISA 课程的这个示例 和这个 用于扩展 grok 的规则列表。
输出配置
Elastic 希望您将其全部发送到 ElasticSearch,但是任何可以接受 JSON 文档或其表示的数据结构的任何东西都可以作为输出。请记住,事件可以发送到多个输出。考虑这个指标示例
output {
# Send to the local ElasticSearch port, and rotate the index daily.
elasticsearch {
hosts => [
"localhost",
"logelastic.prod.internal"
]
template_name => "logstash"
index => "logstash-{+YYYY.MM.dd}"
}
if "metrics" in [tags] {
influxdb {
host => "influx.prod.internal"
db => "logstash"
measurement => "appstats"
# This next bit only works because it is already a hash.
data_points => "%{metrics}"
send_as_tags => [ 'environment', 'application' ]
}
}
}
还记得上面的 metrics
示例吗?这就是我们如何输出它。标记为 metrics
的事件将以其完整事件形式发送到 ElasticSearch。此外,该事件上 metrics
字段下的子字段将发送到 influxdb,在 logstash
数据库中,在 appstats
测量下。除了测量值外,environment
和 application
字段的值将作为索引标签提交。
有很多输出。以下是一些按类型分组的输出
- API 端点:Jira、PagerDuty、Rackspace、Redmine、Zabbix
- 队列:Redis、Rabbit、Kafka、SQS
- 消息平台:IRC、XMPP、HipChat、电子邮件、IMAP
- 文档数据库:ElasticSearch、MongoDB、Solr
- 指标:OpenTSDB、InfluxDB、Nagios、Graphite、StatsD
- 文件和其他静态工件:文件、CSV、S3
还有更多 输出插件。
特殊之处:编解码器
编解码器是 Logstash 配置的一个特殊部分。我们在上面的 file {}
示例中看到了一个使用过的编解码器。
# Pull in application-log data. They emit data in JSON form.
input {
file {
path => [
"/var/log/app/worker_info.log",
"/var/log/app/broker_info.log",
"/var/log/app/supervisor.log"
]
exclude => "*.gz"
type => "applog"
codec => "json"
}
}
在这种情况下,文件插件配置为使用 json
编解码器。这告诉文件插件期望文件中的每一行都有一个完整的 JSON 数据结构。如果您的日志可以以这样的结构发出,那么您的过滤器阶段将比您必须 grok、kv 和 csv 以进行丰富处理要短得多。
json_lines
编解码器 的不同之处在于它将根据 feed 中的换行符分隔事件。当使用像 tcp { }
输入这样的东西时,这最有用,当连接程序流式传输 JSON 文档而无需每次都重新建立连接时。
multiline
编解码器 值得特别提及。顾名思义,这是一个您可以放在输入端以将多行事件(例如 Java 堆栈转储)重新组合成单个事件的编解码器。
input {
file {
path => '/var/log/stackdumps.log'
type => 'stackdumps'
codec => multiline {
pattern => "^\s"
what => previous
}
}
}
此编解码器告诉文件插件将任何以空格开头的日志行视为属于上一行。它将附加到带有新行和日志行内容的 message
字段。一旦它遇到不以空格开头的日志行,它将关闭事件并将其提交到过滤器阶段。
警告:由于 Logstash 的高度分布式特性,多行编解码器需要尽可能靠近日志源运行。如果它直接读取文件,那就太完美了。如果事件来自另一个系统(例如集中式 syslog 系统),则重新组合成单个事件将更具挑战性。
架构
Logstash 可以从一体化盒子扩展到需要复杂事件路由的巨型基础设施,然后才能处理事件以满足不同的业务所有者。
在此示例中,Logstash 在四个应用程序框中的每一个上运行。每个独立的配置将处理后的事件发送到集中的 ElasticSearch 集群。这可以扩展到很远,但这意味着您的日志处理资源正在与您的应用程序资源竞争。
此示例显示了一个基于 Syslog 的现有集中式日志记录基础设施,我们正在添加到其中。在这里,Logstash 安装在集中式日志记录框上,并配置为使用 rsyslog 的文件输出。然后将处理后的结果发送到 ElasticSearch。
进一步阅读
在 Jamie Riedesel 的演讲 S、M 和 L Logstash 架构:基础 中了解更多信息,该演讲将在 LISA17 上举行,LISA17 将于 10 月 29 日至 11 月 3 日在加利福尼亚州旧金山举行。
评论已关闭。