我们的许多客户正在使用我们的Webhook 功能构建有用的服务,但不幸的是,其他客户却没有。 我们经常听到他们团队中没有人足够精通编写可以提取 Webhook 有效负载并使用该数据做一些事情的服务。 这使得他们要么希望从其开发团队获得周期(不太可能),要么继续不用。
但是,如果您可以编写自己的 Web 服务呢? 您可以将多少涉及从系统 A 获取数据并将其输入到系统 B 的例行任务自动化?
学习足够的编码可以成为您工具箱中的一项重要技能,也是优化组织中安全流程的重要资产。 在这篇文章中,我将引导您完成一个教程,该教程将帮助您开始使用 Python Flask 编写自己的 Web 服务。
我们要构建什么
具体来说,我将引导您完成创建一个简单的 Python Flask 应用程序的过程,该应用程序提供 RESTful Web 服务。 该服务将提供一个端点来
- 从 Threat Stack 接收 JSON 格式的有效负载(Webhook)
- 解析有效负载以获取 Threat Stack 警报 ID
- 从 Threat Stack 检索详细的警报数据
- 将 Webhook 和警报数据存档到 AWS S3
但在我开始之前,请记住一些事项。 首先,我不会理会任何前端显示功能,因此您无需担心 HTML 或 CSS。 其次,我的组织遵循 Flask 自己的建议的组织方式。 我将跳过单模块模式,直接使用Packages and Blueprints 模型。
Flask 教程种类繁多。 一方面,有一些教程解释了如何构建小型、简单的应用程序(整个应用程序都放在一个文件中)。 另一方面,有一些教程解释了如何构建更大、更复杂的应用程序。 本教程填补了中间的空白,并演示了一个简单但可以立即适应日益复杂的需求的结构。
项目结构
我将构建的项目的结构来自 Explore Flask,如下所示
Threatstack-to-s3
├── app
│ ├── __init__.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── s3.py
│ │ └── threatstack.py
│ └── views
│ ├── __init__.py
│ └── s3.py
├── gunicorn.conf.py
├── requirements.osx.txt
├── requirements.txt
└── threatstack-to-s3.py
顶层文件
我将从顶层文件开始讨论,这些文件在构建服务时对我很有用
Gunicorn.conf.py: 这是 Gunicorn WSGI HTTP 服务器的配置文件,该服务器将提供此应用程序。 虽然应用程序可以自行运行并接受连接,但 Gunicorn 在处理多个连接并允许应用程序随负载扩展方面效率更高。
Requirements.txt/requirements.osx.txt: 应用程序的依赖项在此文件中列出。 pip 实用程序使用它来安装所需的 Python 包。 有关安装依赖项的信息,请参阅此README.md的“设置”部分。
Threatstack-to-s3.py: 这是应用程序启动器。 如果您正在进行本地调试,可以使用“python”直接运行它,或者可以将其作为参数传递给“gunicorn”作为应用程序入口点。 有关如何启动服务的信息,请参阅 README.md。
应用程序包(app/ 目录)
应用程序包是我的应用程序包。 应用程序的逻辑位于此目录下。 正如我之前提到的,我已选择将应用程序分解为更小的模块集合,而不是使用单个单体模块文件。
此包中定义的以下四个可用模块是
注意: app.views 和 app.models 不提供任何内容,并且它们的 __init__.py 文件为空。
App 模块
app 模块的工作是创建 Flask 应用程序。 它导出一个函数 create_app(),该函数将创建一个 Flask 应用程序对象并对其进行配置。 目前,它初始化与我的应用程序视图相对应的应用程序蓝图。 最终,create_app() 将执行其他操作,例如初始化日志记录,但为了清楚和简单起见,我现在跳过它。
App/__init__.py
from flask import Flask
def _initialize_blueprints(application):
'''
Register Flask blueprints
'''
from app.views.s3 import s3
application.register_blueprint(s3, url_prefix='/api/v1/s3')
def create_app():
'''
Create an app by initializing components.
'''
application = Flask(__name__)
_initialize_blueprints(application)
# Do it!
return application
Copy
threatstack-to-s3.py 使用此模块来启动应用程序。 它导入 create_app(),然后使用它来创建 Flask 应用程序实例。
Threatstack-to-s3.py
#!/usr/bin/env python
from app import create_app
# Gunicorn entry point.
application = create_app()
if __name__ == '__main__':
# Entry point when run via Python interpreter.
print("== Running in debug mode ==")
application.run(host='localhost', port=8080, debug=True)
Copy
视图和 Flask 蓝图
在讨论其余三个模块之前,我将讨论什么是视图和 Flask 蓝图,然后深入研究 app.views.s3 模块。
视图: 视图是应用程序使用者看到的内容。 此应用程序没有前端,但有一个公共 API 端点。 将视图视为可以并且应该暴露给正在使用此应用程序的人或事物(例如,使用者)的内容。 最佳实践是使视图尽可能简单。 如果端点的工作是接收数据并将其复制到 S3,请使其执行该功能,但将如何在应用程序模型中完成的细节隐藏起来。 视图应主要表示使用者希望看到的操作发生,而细节(使用者不应关心)存在于应用程序模型中(稍后描述)。
Flask 蓝图: 早些时候我说过我将使用 Packages and Blueprints 布局而不是单个模块应用程序。 蓝图包含我的 API 端点结构的一部分。 这让我可以从逻辑上对 API 的相关部分进行分组。 在我的例子中,每个视图模块都是它自己的蓝图。
了解更多
使用蓝图的模块化应用程序 Flask 网站上的文档。
Explore Flask 是一本关于使用 Flask 开发 Web 应用程序的最佳实践和模式的书。
App.views.s3 模块
threatstack-to-s3 服务接收 Threat Stack Webhook HTTP 请求,并将警报数据的副本存储在 S3 中。 这是我存储允许某人执行此操作的 API 端点集的地方。 如果您回头查看 app/__init__.py,您会看到我已将端点集植根于 /api/v1/s3。
来自 app/__init__.py:
from views.s3 import s3
app.register_blueprint(s3, url_prefix='/api/v1/s3')
Copy
我使用此路径有几个原因
- API: 要注意这是一个 API,我不应该期望有前端。 也许有一天我会添加一个前端。 可能不会,但我发现这在心理上很有用,并且可以作为对他人的标志
- V1: 这是 API 的版本 1。 如果我需要进行重大更改以适应新的要求,我可以添加 v2,以便在将所有使用者迁移到新版本时存在两个 API
- S3: 这是我正在连接和操作的服务。 我可以自由地将路径的这一部分命名为我想要的任何名称,但我喜欢保持其描述性。 例如,如果该服务将数据中继到 HipChat,我可以将路径的这一部分命名为 hipchat
在 app.views.s3 中,我暂时提供一个端点 /alert,它表示我正在操作的对象,并且仅响应 HTTP POST 请求方法。
记住: 构建 API 时,URL 路径应表示名词,HTTP 请求方法应表示动词。
App/views/s3.py
'''
API to archive alerts from Threat Stack to S3
'''
from flask import Blueprint, jsonify, request
import app.models.s3 as s3_model
import app.models.threatstack as threatstack_model
s3 = Blueprint('s3', __name__)
@s3.route('/alert', methods=['POST'])
def put_alert():
'''
Archive Threat Stack alerts to S3.
'''
webhook_data = request.get_json()
for alert in webhook_data.get('alerts'):
alert_full = threatstack_model.get_alert_by_id(alert.get('id'))
s3_model.put_webhook_data(alert)
s3_model.put_alert_data(alert_full)
status_code = 200
success = True
response = {'success': success}
return jsonify(response), status_code
Copy
现在我将介绍该模块的一些关键部分。 如果您对 Python 足够熟悉,您可以跳过接下来的几行关于导入的内容,但如果您想知道我为什么要重命名我导入的内容,请继续阅读。
from flask import Blueprint, jsonify, request
import app.models.s3 as s3_model
import app.models.threatstack as threatstack_model
Copy
我喜欢输入简洁性和一致性。 我本可以这样做来导入模型模块
import app.models.s3
import app.models.threatstack
Copy
但这意味着我将使用如下函数
app.models.s3.put_webhook_alert(alert)
Copy
我也本可以这样做
from app.models import s3, threatstack
Copy
但是,这会在我稍后创建 s3 蓝图对象时中断,因为我会覆盖 s3 模型模块。
s3 = Blueprint('s3', __name__) # We've just overwritten the s3 module we imported.
Copy
由于这些原因,导入模型模块并稍微重命名它们更容易。
现在我将介绍与应用程序端点及其相关联的函数。
@s3.route('/alert', methods=['POST'])
def put_alert():
'''
Archive Threat Stack alerts to S3.
'''
Copy
第一行称为装饰器。 我正在向名为 /alert 的 s3 蓝图添加路由(扩展为 /api/v1/s3/alert),当向其发出 HTTP POST 请求时,将导致调用 put_alert()。
函数的主体非常简单
- 获取请求的 JSON 数据
- 遍历 alerts 键中的数组
- 对于每个警报
- 从 Threat Stack 检索警报详细信息
- 将警报信息存储在 S3 的请求中
- 将警报详细信息存储在 S3 中
webhook_data = request.get_json()
for alert in webhook_data.get('alerts'):
alert_full = threatstack_model.get_alert_by_id(alert.get('id'))
s3_model.put_webhook_data(alert)
s3_model.put_alert_data(alert_full)
Copy
完成上述步骤后,我会返回一个简单的JSON文档,指示事务成功或失败。(注意:目前没有错误处理机制,所以我硬编码了成功响应和HTTP状态代码。稍后添加错误处理时,我会修改它。)
status_code = 200
success = True
response = {'success': success}
return jsonify(response), status_code
Copy
至此,我已经满足了我的请求,并完成了消费者所请求的内容。请注意,我没有包含任何演示如何满足请求的代码。我必须做什么才能获取警报的详细信息?我执行了哪些操作来存储警报?警报在S3中是如何存储和命名的?消费者并不真正关心这些细节。这是在您自己的服务中组织代码的一个好方法:消费者需要了解的内容应该放在您的视图中。消费者不需要了解的细节应该放在您的模型中,我即将介绍模型。
在讨论其余模块之前,我将讨论模型,模型是如何与我正在使用的服务(例如Threat Stack和S3)进行通信的。
模型
模型描述的是“事物”,而这些“事物”是我想要对其执行操作的对象。通常,当您搜索关于Flask模型的帮助时,博客和文档都喜欢使用数据库作为示例。虽然我现在所做的事情与数据库没有太大区别,但我只是将数据存储在对象存储中,而不是数据库中。这并不是我将来可能对从Threat Stack接收的数据所做的唯一事情。
此外,我选择跳过面向对象的方法,而采用过程式风格。在更高级的Python中,我会对警报对象进行建模,并提供操作它的方法。但是,对于在S3中存储数据这一任务而言,这引入了不必要的复杂性,并且也使代码对于演示简单任务而言更加复杂。因此,为了简洁和清晰,我选择了牺牲技术上的正确性。
App.models.threatstack 模块
顾名思义,app.models.threatstack 模块处理与 Threat Stack 的通信。
'''
Communicate with Threat Stack
'''
import os
import requests
THREATSTACK_BASE_URL = os.environ.get('THREATSTACK_BASE_URL', 'https://app.threatstack.com/api/v1')
THREATSTACK_API_KEY = os.environ.get('THREATSTACK_API_KEY')
def get_alert_by_id(alert_id):
'''
Retrieve an alert from Threat Stack by alert ID.
'''
alerts_url = '{}/alerts/{}'.format(THREATSTACK_BASE_URL, alert_id)
resp = requests.get(
alerts_url,
headers={'Authorization': THREATSTACK_API_KEY}
)
return resp.json()
Copy
快速浏览一下几个需要注意的地方
THREATSTACK_BASE_URL = os.environ.get('THREATSTACK_BASE_URL', 'https://app.threatstack.com/api/v1')
THREATSTACK_API_KEY = os.environ.get('THREATSTACK_API_KEY')
Copy
我不想在我的代码中保留 Threat Stack API 密钥。这只是良好的代码/安全实践。我现在将从我的环境中获取 API 密钥,因为这是一个快速而简单的解决方案。在某个时候,我应该将所有配置集中在一个文件中,而不是隐藏在这里,这样代码和设置会更简洁。这是以后的工作,现在,设置已记录在 README.md 中。
def get_alert_by_id(alert_id):
'''
Retrieve an alert from Threat Stack by alert ID.
'''
alerts_url = '{}/alerts/{}'.format(THREATSTACK_BASE_URL, alert_id)
resp = requests.get(
alerts_url,
headers={'Authorization': THREATSTACK_API_KEY}
)
return resp.json()
Copy
get_alert_by_id() 函数接受一个警报 ID,查询 Threat Stack 平台以获取警报数据,并返回该数据。我使用 Python 的 requests 模块 向 Threat Stack API 端点发出 HTTP GET 请求,该端点返回给定警报的警报信息。
App.models.s3 模块
app.models.s3 模块处理与 AWS S3 的连接。
'''
Manipulate objects in AWS S3.
'''
import boto3
import json
import os
import time
TS_AWS_S3_BUCKET = os.environ.get('TS_AWS_S3_BUCKET')
TS_AWS_S3_PREFIX = os.environ.get('TS_AWS_S3_PREFIX', None)
def put_webhook_data(alert):
'''
Put alert webhook data in S3 bucket.
'''
alert_time = time.gmtime(alert.get('created_at')/1000)
alert_time_path = time.strftime('%Y/%m/%d/%H/%M', alert_time)
alert_key = '/'.join([alert_time_path, alert.get('id')])
if TS_AWS_S3_PREFIX:
alert_key = '/'.join([TS_AWS_S3_PREFIX, alert_key])
s3_client = boto3.client('s3')
s3_client.put_object(
Body=json.dumps(alert),
Bucket=TS_AWS_S3_BUCKET,
Key=alert_key
)
return None
def put_alert_data(alert):
'''
Put alert data in S3.
'''
alert_id = alert.get('id')
alert_key = '/'.join(['alerts',
alert_id[0:2],
alert_id[2:4],
alert_id
])
if TS_AWS_S3_PREFIX:
alert_key = '/'.join([TS_AWS_S3_PREFIX, alert_key])
s3_client = boto3.client('s3')
s3_client.put_object(
Body=json.dumps(alert),
Bucket=TS_AWS_S3_BUCKET,
Key=alert_key
)
return None
Copy
我将讲解一些有趣的部分
TS_AWS_S3_BUCKET = os.environ.get('TS_AWS_S3_BUCKET')
TS_AWS_S3_PREFIX = os.environ.get('TS_AWS_S3_PREFIX', None)
Copy
同样,此应用程序没有配置文件,但我需要设置一个 S3 存储桶名称和一个可选的前缀。我最终应该修复这个问题——设置已记录在 README.md 中,目前这样就足够了。
函数 put_webhook_data() 和 put_alert_data() 有很多重复的代码。我没有重构它们,因为在重构之前更容易看到逻辑。如果您仔细观察,您会发现它们之间唯一的区别是 alert_key 的定义方式。我将重点关注 put_webhook_data()
def put_webhook_data(alert):
'''
Put alert webhook data in S3 bucket.
'''
alert_time = time.gmtime(alert.get('created_at')/1000)
alert_time_path = time.strftime('%Y/%m/%d/%H/%M', alert_time)
alert_key = '/'.join(['webhooks', alert_time_path, alert.get('id')])
if TS_AWS_S3_PREFIX:
alert_key = '/'.join([TS_AWS_S3_PREFIX, alert_key])
s3_client = boto3.client('s3')
s3_client.put_object(
Body=json.dumps(alert),
Bucket=TS_AWS_S3_BUCKET,
Key=alert_key
)
return None
Copy
此函数接受一个名为 alert 的参数。回顾 app/views/s3.py,alert 只是发送到端点的 JSON 数据。Webhook 数据按日期和时间存储在 S3 中。发生在 2017-01-17 13:51 的警报 587c0159a907346eccb84004 在 S3 中存储为 webhooks/2017/01/17/13/51/587c0159a907346eccb84004。
首先,我获取警报时间。Threat Stack 以自 Unix 纪元以来的毫秒数发送警报时间,需要将其转换为秒,这是 Python 处理时间的方式。我获取该时间并将其解析为将成为目录路径的字符串。然后,我将存储 webhook 数据的顶级目录、基于时间的路径以及最终的警报 ID 连接起来,以形成 S3 中 webhook 数据的路径。
Boto 3 是 Python 中用于处理 AWS 资源的主要模块。我初始化一个 boto3 客户端对象,以便我可以与 S3 通信并将对象放在那里。s3_client.put_object() 使用其 Bucket 和 Key 参数非常简单,这些参数分别是 S3 存储桶的名称和我想要存储的 S3 对象的路径。Body 参数是我转换回字符串的警报。
总结
我现在拥有一个功能性的 Python Flask Web 服务,它可以接受 Threat Stack Webhook 请求,获取警报的详细信息,并将其存档在 S3 中。这是一个良好的开端,但要使其投入生产,还有很多工作要做。您可能会立即问:“如果出现问题怎么办?” 没有异常处理来处理诸如与 Threat Stack 或 S3 的通信失败之类的问题。我故意省略了它以保持代码清晰。也没有授权密钥检查。这意味着任何人都可以向其发送数据。(并且由于我不进行任何错误检查或异常处理,他们可以使服务崩溃。)也没有 TLS 加密处理。这是我留给 Nginx 或 Apache 的事情,它们将是位于此应用程序前面的 Web 服务器。在将此 Web 服务投入生产之前,您需要解决所有这些问题以及更多问题。但就目前而言,这是一个可以帮助您在开始构建自己的服务时变得更加舒适的起点。
资源
查看 GitHub 上的 Threat Stack 到 S3 服务的存储库。
由于应用程序会经历修订,请查看 本文中使用的版本。
查看 Tom 的关于 Python Flask 中异常处理 的新教程。
本文最初发表在 Threat Stack 博客上。经许可转载。
评论已关闭。