使用 ELK Stack 和 Ruby 构建数据处理管道

我们如何以及时且经济高效的方式从应用程序日志中提取有价值的数据?
228 位读者喜欢这篇文章。
Rocks stacked

Tony Smith via Flickr (CC BY 2.0)

应用程序日志通常包含有价值的数据。我们如何以及时且经济高效的方式提取这些数据?作为一个示例应用程序,我们将讨论一个多租户系统,我们在其中通过子域名托管多个站点。日志文件中的 URL 包含路径 (/api, /search, 等) 和参数 (?foo=bar)

如果我们不想使用 ELK,我们可以构建一个不同的数据处理管道,使用 API 接收消息,将它们放入队列,然后让工作进程处理数据。我在这篇博文中概述了这种方法,并将其与 ELK 进行了比较。

在我们的 ELK 解决方案中,我们将按客户和日期将数据拆分到单独的 Elasticsearch 索引中,并构建报告以显示哪些 URL 路径被访问。这是处理时间序列数据时的常见模式。

为了保持简单,我们将使用负载均衡器日志,它包含与 Web 服务器日志相同的信息,但更加集中化。我们将配置我们的 AWS 负载均衡器,使其每五分钟将日志发布到 S3 存储桶。日志将从那里被 Logstash 拾取并处理到 Elasticsearch 中。

这是一个 ELB 日志文件的示例行

2018-05-10T18:26:13.276Z ELB_NAME 73.157.179.139:60708 10.0.1.42:80 0.000021
0.000303 0.000014 200 200 0 68 "GET https://site1.mysystem.com/api?foo=bar...
HTTP/1.1" "Mozilla/5.0 (Linux; Android 7.0; SM-T580 Build/NRD90M; wv)
AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/65.0.3325.109 Safari/537.36 [Pinterest/Android]"
ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2

Logstash 配置

我们将从 Logstash S3 输入插件开始

# /etc/logstash/conf.d/s3_elastic.conf
input {
  s3 {
    aws_credentials_file => "./aws_credentials_file.yml"
    bucket               => "my-elb-logs"
    prefix               => "subfolder/path/here/"
    sincedb_path         => "./data/plugins/inputs/s3/sincedb_s3_elastic"    
  }
}

Logstash 使用一个 sincedb 文件来跟踪其在日志文件处理中的位置。如果我们停止 Logstash 并在稍后启动它,它将处理在停机期间累积的日志。

然后我们配置 Elasticsearch 输出插件。stdout 可以用于调试。我们将在本文后面讨论 [@metadata][index]

# /etc/logstash/conf.d/s3_elastic.conf
output {
  stdout { codec => rubydebug { metadata => true } }
  elasticsearch {
    hosts     => [127.0.0.1]
    user      => "elastic"
    password  => "password-here"
    index     => "%{[@metadata][index]}"
  }
}

对于过滤,我们将从 Grok 开始,然后删除不必要的字段

# /etc/logstash/conf.d/s3_elastic.conf
filter {
  grok  {    
     match        => { "message" => "%{ELB_ACCESS_LOG}"}
     remove_field => [ "elb", "backendip", "backendport", ...]
  }
}

Logstash 为我们提供了可靠的 Grok 模式,可以将每个日志文件行解析为 Event 对象。现在我们的数据看起来像这样

{
       "request" => "http://site1.mysystem.com/api?foo=bar",
          "path" => "/api",
    "@timestamp" => 2018-05-10T18:26:13.276Z,
      "response" => 200,
      "clientip" => "73.157.179.139",
        "params" => "?foo=bar",
        "message" => "...",
        ...
}

Ruby 代码

我们需要实现业务逻辑来验证和转换我们的数据。考虑到此用例的简单要求,我们可以不用 Ruby 就能完成,但它可以为我们提供更大的灵活性和控制。我们需要提取 URL 主机,它将用作索引名称的一部分。我们还想从 URL 中获取 foo 参数。我们可以从内联 Ruby 代码开始

# /etc/logstash/conf.d/s3_elastic.conf
filter {
  ruby {
    code =>   "
              require 'uri'
              uri = URI(event.get('request'))
              event.set('host', uri.host)
              foo_value = CGI::parse(event.get('params'))['foo'].first
              event.set('foo', foo_value)              
              "
  }

现在我们的 Event 对象包含单独的 hostfoo 字段

{
       "request" => "http://site1.mysystem.com/api?foo=bar",
          "path" => "/api",
          ...
          "host" => "site1.mysystem.com",
           "foo" => "bar",
}

将代码放在配置文件中不是一种可扩展的方法,并且难以测试。幸运的是,最新版本的 Ruby 过滤器插件 支持从 .conf 文件引用单独的 Ruby 脚本,我们可以使用自动化测试来测试我们的代码。我们通过指定 Ruby 脚本的路径来修改 .conf 文件

# /etc/logstash/conf.d/s3_elastic.conf file
filter {
  ruby {
    path => "/etc/logstash/ruby/s3_elastic.rb"
    # script_params => {  }
  }
}

一个区别是,现在 Ruby 必须从外部脚本文件返回 Event 对象数组。

# /etc/logstash/ruby/s3_elastic.rb
require 'uri'
# the value of `params` is the value of the hash passed to `script_params`
# in the logstash configuration
def register(params)
end
# the filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array
def filter(event)
  uri = URI(event.get('request'))
  event.set('host', uri.host)
  foo_value = CGI::parse(event.get('params'))['foo'].first
  event.set('foo', foo_value)
  return [event]
end
test 'valid test' do
  parameters { {  } }
  in_event do { 'request' => 'http://site1.mysystem.com/api?foo=bar' } end
  expect('params') do |events|
    events.first.get('host') == 'site1.mysystem.com'
    events.first.get('foo') == 'bar'
  end
end

我们可以通过指定 -t 标志来运行自动化测试,例如:logstash -f /etc/logstash/conf/s3_elastic.conf -t

[logstash.filters.ruby.script] Test run complete
{:script_path=>"/etc/logstash/ruby/s3_elastic.rb",
  :results=>{:passed=>1, :failed=>0, :errored=>0}}
Configuration OK
[logstash.runner] Using config.test_and_exit mode. Config Validation Result: OK.
Exiting Logstash

我们需要确定在索引名称中使用哪个日期,因为我们不能假定是当前日期。为此,我们将使用 timestamp 字段 (2018-05-10T18:26:13.276Z)。我们还可以提取用于确定索引的业务逻辑到一个单独的方法中。如果发生错误,我们将默认使用今天的日期

# /etc/logstash/ruby/s3_elastic.rb
def filter(event)
  ...
  event.set("[@metadata][index]", get_index(event))
  return [event]
end
def get_index event
  host = event.get('host')
  date = event.get('timestamp').split('T').first
  "#{host}-#{date}"
rescue
  "#{host}-#{Time.now.strftime("%Y.%m.%d")}"
end
...

我们使用 event.set 来创建 [@metadata][index] 字段。它不会与文档一起保存,但可以在我们的 .conf 文件中用于指定索引。这种方法允许我们将主机与日期组合的逻辑保留在同一个 Ruby 方法中。

聚合

我们现在可以使用 Kibana(甚至 curl)来运行聚合。我们可以跨所有索引进行查询,以了解哪些 URL 路径被访问以及访问频率。

POST /*/_search?size=0
{
  "aggs" : {
    "path_count" : {
      "terms" : {
        "field" : "path.keyword"
      }
    }
  }
}

数据将像这样返回

{
  "took": 709,
  "timed_out": false,
  "_shards": {
    ...
  },
  "hits": {
    ...
  },
  "aggregations": {
    "path_count": {
      ...
      "buckets": [
        {
          "key": "/api",
          "doc_count": 913281
        },
        {
          "key": "/search",
          "doc_count": 742813
        },
        ...
      ]
    }
  }
}

如果我们想查询特定客户或日期的数据,我们需要在 POST /*2018.05.10/_search?size=0 中将其指定为索引模式。Kibana 还允许我们基于这些聚合构建可视化和仪表板。

链接

标签
User profile image.
Oracle Cloud 高级软件开发工程师。此处表达的观点仅代表我个人,不一定代表我的雇主。构建出色软件最重要的事情不是如何做,而是客户需要什么以及为什么存在这种需求。我与客户、高管和其他工程师紧密合作,回答这些问题,然后构建出色的软件。

评论已关闭。

Creative Commons License本作品根据 Creative Commons Attribution-Share Alike 4.0 International License 许可。
© . All rights reserved.