应用程序日志通常包含有价值的数据。我们如何以及时且经济高效的方式提取这些数据?作为一个示例应用程序,我们将讨论一个多租户系统,我们在其中通过子域名托管多个站点。日志文件中的 URL 包含路径 (/api, /search, 等)
和参数 (?foo=bar)
。
如果我们不想使用 ELK,我们可以构建一个不同的数据处理管道,使用 API 接收消息,将它们放入队列,然后让工作进程处理数据。我在这篇博文中概述了这种方法,并将其与 ELK 进行了比较。
在我们的 ELK 解决方案中,我们将按客户和日期将数据拆分到单独的 Elasticsearch 索引中,并构建报告以显示哪些 URL 路径被访问。这是处理时间序列数据时的常见模式。
为了保持简单,我们将使用负载均衡器日志,它包含与 Web 服务器日志相同的信息,但更加集中化。我们将配置我们的 AWS 负载均衡器,使其每五分钟将日志发布到 S3 存储桶。日志将从那里被 Logstash 拾取并处理到 Elasticsearch 中。
这是一个 ELB 日志文件的示例行
2018-05-10T18:26:13.276Z ELB_NAME 73.157.179.139:60708 10.0.1.42:80 0.000021
0.000303 0.000014 200 200 0 68 "GET https://site1.mysystem.com/api?foo=bar...
HTTP/1.1" "Mozilla/5.0 (Linux; Android 7.0; SM-T580 Build/NRD90M; wv)
AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/65.0.3325.109 Safari/537.36 [Pinterest/Android]"
ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2
Logstash 配置
我们将从 Logstash S3 输入插件开始
# /etc/logstash/conf.d/s3_elastic.conf
input {
s3 {
aws_credentials_file => "./aws_credentials_file.yml"
bucket => "my-elb-logs"
prefix => "subfolder/path/here/"
sincedb_path => "./data/plugins/inputs/s3/sincedb_s3_elastic"
}
}
Logstash 使用一个 sincedb
文件来跟踪其在日志文件处理中的位置。如果我们停止 Logstash 并在稍后启动它,它将处理在停机期间累积的日志。
然后我们配置 Elasticsearch 输出插件。stdout
可以用于调试。我们将在本文后面讨论 [@metadata][index]
。
# /etc/logstash/conf.d/s3_elastic.conf
output {
stdout { codec => rubydebug { metadata => true } }
elasticsearch {
hosts => [127.0.0.1]
user => "elastic"
password => "password-here"
index => "%{[@metadata][index]}"
}
}
对于过滤,我们将从 Grok 开始,然后删除不必要的字段
# /etc/logstash/conf.d/s3_elastic.conf
filter {
grok {
match => { "message" => "%{ELB_ACCESS_LOG}"}
remove_field => [ "elb", "backendip", "backendport", ...]
}
}
Logstash 为我们提供了可靠的 Grok 模式,可以将每个日志文件行解析为 Event 对象。现在我们的数据看起来像这样
{
"request" => "http://site1.mysystem.com/api?foo=bar",
"path" => "/api",
"@timestamp" => 2018-05-10T18:26:13.276Z,
"response" => 200,
"clientip" => "73.157.179.139",
"params" => "?foo=bar",
"message" => "...",
...
}
Ruby 代码
我们需要实现业务逻辑来验证和转换我们的数据。考虑到此用例的简单要求,我们可以不用 Ruby 就能完成,但它可以为我们提供更大的灵活性和控制。我们需要提取 URL 主机,它将用作索引名称的一部分。我们还想从 URL 中获取 foo
参数。我们可以从内联 Ruby 代码开始
# /etc/logstash/conf.d/s3_elastic.conf
filter {
ruby {
code => "
require 'uri'
uri = URI(event.get('request'))
event.set('host', uri.host)
foo_value = CGI::parse(event.get('params'))['foo'].first
event.set('foo', foo_value)
"
}
现在我们的 Event 对象包含单独的 host
和 foo
字段
{
"request" => "http://site1.mysystem.com/api?foo=bar",
"path" => "/api",
...
"host" => "site1.mysystem.com",
"foo" => "bar",
}
将代码放在配置文件中不是一种可扩展的方法,并且难以测试。幸运的是,最新版本的 Ruby 过滤器插件 支持从 .conf 文件引用单独的 Ruby 脚本,我们可以使用自动化测试来测试我们的代码。我们通过指定 Ruby 脚本的路径来修改 .conf 文件
# /etc/logstash/conf.d/s3_elastic.conf file
filter {
ruby {
path => "/etc/logstash/ruby/s3_elastic.rb"
# script_params => { }
}
}
一个区别是,现在 Ruby 必须从外部脚本文件返回 Event 对象数组。
# /etc/logstash/ruby/s3_elastic.rb
require 'uri'
# the value of `params` is the value of the hash passed to `script_params`
# in the logstash configuration
def register(params)
end
# the filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array
def filter(event)
uri = URI(event.get('request'))
event.set('host', uri.host)
foo_value = CGI::parse(event.get('params'))['foo'].first
event.set('foo', foo_value)
return [event]
end
test 'valid test' do
parameters { { } }
in_event do { 'request' => 'http://site1.mysystem.com/api?foo=bar' } end
expect('params') do |events|
events.first.get('host') == 'site1.mysystem.com'
events.first.get('foo') == 'bar'
end
end
我们可以通过指定 -t 标志来运行自动化测试,例如:logstash -f /etc/logstash/conf/s3_elastic.conf -t
。
[logstash.filters.ruby.script] Test run complete
{:script_path=>"/etc/logstash/ruby/s3_elastic.rb",
:results=>{:passed=>1, :failed=>0, :errored=>0}}
Configuration OK
[logstash.runner] Using config.test_and_exit mode. Config Validation Result: OK.
Exiting Logstash
我们需要确定在索引名称中使用哪个日期,因为我们不能假定是当前日期。为此,我们将使用 timestamp
字段 (2018-05-10T18:26:13.276Z
)。我们还可以提取用于确定索引的业务逻辑到一个单独的方法中。如果发生错误,我们将默认使用今天的日期
# /etc/logstash/ruby/s3_elastic.rb
def filter(event)
...
event.set("[@metadata][index]", get_index(event))
return [event]
end
def get_index event
host = event.get('host')
date = event.get('timestamp').split('T').first
"#{host}-#{date}"
rescue
"#{host}-#{Time.now.strftime("%Y.%m.%d")}"
end
...
我们使用 event.set
来创建 [@metadata][index]
字段。它不会与文档一起保存,但可以在我们的 .conf 文件中用于指定索引。这种方法允许我们将主机与日期组合的逻辑保留在同一个 Ruby 方法中。
聚合
我们现在可以使用 Kibana(甚至 curl)来运行聚合。我们可以跨所有索引进行查询,以了解哪些 URL 路径被访问以及访问频率。
POST /*/_search?size=0
{
"aggs" : {
"path_count" : {
"terms" : {
"field" : "path.keyword"
}
}
}
}
数据将像这样返回
{
"took": 709,
"timed_out": false,
"_shards": {
...
},
"hits": {
...
},
"aggregations": {
"path_count": {
...
"buckets": [
{
"key": "/api",
"doc_count": 913281
},
{
"key": "/search",
"doc_count": 742813
},
...
]
}
}
}
如果我们想查询特定客户或日期的数据,我们需要在 POST /*2018.05.10/_search?size=0
中将其指定为索引模式。Kibana 还允许我们基于这些聚合构建可视化和仪表板。
链接
- https://elastic.ac.cn/blog/do-you-grok-grok
- https://elastic.ac.cn/guide/en/logstash/current/event-api.html
- https://elastic.ac.cn/guide/en/logstash/current/plugins-filters-ruby.h…
- https://elastic.ac.cn/blog/moving-ruby-code-out-of-logstash-pipeline
- https://github.com/logstash-plugins/logstash-patterns-core/blob/master/…
评论已关闭。