如何使用 Python Flask 编写 Web 服务

如果您可以编写自己的 Web 服务会怎样? 从本教程开始。
501 位读者喜欢这篇文章。
How to write a web service using Python Flask

Yuko Honda 在 Flickr 上发布。 CC BY-SA 2.0

我们的许多客户正在使用我们的Webhook 功能构建有用的服务,但不幸的是,其他客户却没有。 我们经常听到他们团队中没有人足够精通编写可以提取 Webhook 有效负载并使用该数据做一些事情的服务。 这使得他们要么希望从其开发团队获得周期(不太可能),要么继续不用。

但是,如果您可以编写自己的 Web 服务呢? 您可以将多少涉及从系统 A 获取数据并将其输入到系统 B 的例行任务自动化?

学习足够的编码可以成为您工具箱中的一项重要技能,也是优化组织中安全流程的重要资产。 在这篇文章中,我将引导您完成一个教程,该教程将帮助您开始使用 Python Flask 编写自己的 Web 服务。

我们要构建什么

具体来说,我将引导您完成创建一个简单的 Python Flask 应用程序的过程,该应用程序提供 RESTful Web 服务。 该服务将提供一个端点来

  • 从 Threat Stack 接收 JSON 格式的有效负载(Webhook)
  • 解析有效负载以获取 Threat Stack 警报 ID
  • 从 Threat Stack 检索详细的警报数据
  • 将 Webhook 和警报数据存档到 AWS S3

但在我开始之前,请记住一些事项。 首先,我不会理会任何前端显示功能,因此您无需担心 HTML 或 CSS。 其次,我的组织遵循 Flask 自己的建议的组织方式。 我将跳过单模块模式,直接使用Packages and Blueprints 模型

Flask 教程种类繁多。 一方面,有一些教程解释了如何构建小型、简单的应用程序(整个应用程序都放在一个文件中)。 另一方面,有一些教程解释了如何构建更大、更复杂的应用程序。 本教程填补了中间的空白,并演示了一个简单但可以立即适应日益复杂的需求的结构。

项目结构

我将构建的项目的结构来自 Explore Flask,如下所示

Threatstack-to-s3

├── app

│   ├── __init__.py

│   ├── models

│   │   ├── __init__.py

│   │   ├── s3.py

│   │   └── threatstack.py

│   └── views

│       ├── __init__.py

│       └── s3.py

├── gunicorn.conf.py

├── requirements.osx.txt

├── requirements.txt

└── threatstack-to-s3.py

顶层文件

我将从顶层文件开始讨论,这些文件在构建服务时对我很有用

Gunicorn.conf.py: 这是 Gunicorn WSGI HTTP 服务器的配置文件,该服务器将提供此应用程序。 虽然应用程序可以自行运行并接受连接,但 Gunicorn 在处理多个连接并允许应用程序随负载扩展方面效率更高。

Requirements.txt/requirements.osx.txt: 应用程序的依赖项在此文件中列出。 pip 实用程序使用它来安装所需的 Python 包。 有关安装依赖项的信息,请参阅此README.md的“设置”部分。

Threatstack-to-s3.py: 这是应用程序启动器。 如果您正在进行本地调试,可以使用“python”直接运行它,或者可以将其作为参数传递给“gunicorn”作为应用程序入口点。 有关如何启动服务的信息,请参阅 README.md

应用程序包(app/ 目录)

应用程序包是我的应用程序包。 应用程序的逻辑位于此目录下。 正如我之前提到的,我已选择将应用程序分解为更小的模块集合,而不是使用单个单体模块文件。

此包中定义的以下四个可用模块是

注意: app.viewsapp.models 不提供任何内容,并且它们的 __init__.py 文件为空。

App 模块

app 模块的工作是创建 Flask 应用程序。 它导出一个函数 create_app(),该函数将创建一个 Flask 应用程序对象并对其进行配置。 目前,它初始化与我的应用程序视图相对应的应用程序蓝图。 最终,create_app() 将执行其他操作,例如初始化日志记录,但为了清楚和简单起见,我现在跳过它。

App/__init__.py

from flask import Flask

def _initialize_blueprints(application):
    '''
    Register Flask blueprints
    '''
    from app.views.s3 import s3
    application.register_blueprint(s3, url_prefix='/api/v1/s3')

def create_app():
    '''
    Create an app by initializing components.
    '''
    application = Flask(__name__)

    _initialize_blueprints(application)

    # Do it!
    return application
Copy

threatstack-to-s3.py 使用此模块来启动应用程序。 它导入 create_app(),然后使用它来创建 Flask 应用程序实例。

Threatstack-to-s3.py

#!/usr/bin/env python
from app import create_app

# Gunicorn entry point.
application = create_app()

if __name__ == '__main__':
    # Entry point when run via Python interpreter.
    print("== Running in debug mode ==")
    application.run(host='localhost', port=8080, debug=True)
Copy

视图和 Flask 蓝图

在讨论其余三个模块之前,我将讨论什么是视图和 Flask 蓝图,然后深入研究 app.views.s3 模块。

视图: 视图是应用程序使用者看到的内容。 此应用程序没有前端,但有一个公共 API 端点。 将视图视为可以并且应该暴露给正在使用此应用程序的人或事物(例如,使用者)的内容。 最佳实践是使视图尽可能简单。 如果端点的工作是接收数据并将其复制到 S3,请使其执行该功能,但将如何在应用程序模型中完成的细节隐藏起来。 视图应主要表示使用者希望看到的操作发生,而细节(使用者不应关心)存在于应用程序模型中(稍后描述)。

Flask 蓝图: 早些时候我说过我将使用 Packages and Blueprints 布局而不是单个模块应用程序。 蓝图包含我的 API 端点结构的一部分。 这让我可以从逻辑上对 API 的相关部分进行分组。 在我的例子中,每个视图模块都是它自己的蓝图。

了解更多

使用蓝图的模块化应用程序 Flask 网站上的文档。

Explore Flask 是一本关于使用 Flask 开发 Web 应用程序的最佳实践和模式的书。

App.views.s3 模块

threatstack-to-s3 服务接收 Threat Stack Webhook HTTP 请求,并将警报数据的副本存储在 S3 中。 这是我存储允许某人执行此操作的 API 端点集的地方。 如果您回头查看 app/__init__.py,您会看到我已将端点集植根于 /api/v1/s3

来自 app/__init__.py:

    from views.s3 import s3
    app.register_blueprint(s3, url_prefix='/api/v1/s3')
Copy

我使用此路径有几个原因

  • API: 要注意这是一个 API,我不应该期望有前端。 也许有一天我会添加一个前端。 可能不会,但我发现这在心理上很有用,并且可以作为对他人的标志
  • V1: 这是 API 的版本 1。 如果我需要进行重大更改以适应新的要求,我可以添加 v2,以便在将所有使用者迁移到新版本时存在两个 API
  • S3: 这是我正在连接和操作的服务。 我可以自由地将路径的这一部分命名为我想要的任何名称,但我喜欢保持其描述性。 例如,如果该服务将数据中继到 HipChat,我可以将路径的这一部分命名为 hipchat

app.views.s3 中,我暂时提供一个端点 /alert,它表示我正在操作的对象,并且仅响应 HTTP POST 请求方法。

记住: 构建 API 时,URL 路径应表示名词,HTTP 请求方法应表示动词。

App/views/s3.py

'''
API to archive alerts from Threat Stack to S3
'''

from flask import Blueprint, jsonify, request
import app.models.s3 as s3_model
import app.models.threatstack as threatstack_model

s3 = Blueprint('s3', __name__)


@s3.route('/alert', methods=['POST'])
def put_alert():
    '''
    Archive Threat Stack alerts to S3.
    '''
    webhook_data = request.get_json()
    for alert in webhook_data.get('alerts'):
        alert_full = threatstack_model.get_alert_by_id(alert.get('id'))
        s3_model.put_webhook_data(alert)
        s3_model.put_alert_data(alert_full)

    status_code = 200
    success = True
    response = {'success': success}

    return jsonify(response), status_code  
Copy

现在我将介绍该模块的一些关键部分。 如果您对 Python 足够熟悉,您可以跳过接下来的几行关于导入的内容,但如果您想知道我为什么要重命名我导入的内容,请继续阅读。

from flask import Blueprint, jsonify, request
import app.models.s3 as s3_model
import app.models.threatstack as threatstack_model  
Copy

我喜欢输入简洁性和一致性。 我本可以这样做来导入模型模块

import app.models.s3
import app.models.threatstack
Copy

但这意味着我将使用如下函数

app.models.s3.put_webhook_alert(alert)  
Copy

我也本可以这样做

from app.models import s3, threatstack
Copy

但是,这会在我稍后创建 s3 蓝图对象时中断,因为我会覆盖 s3 模型模块。

s3 = Blueprint('s3', __name__) # We've just overwritten the s3 module we imported.  
Copy

由于这些原因,导入模型模块并稍微重命名它们更容易。

现在我将介绍与应用程序端点及其相关联的函数。

@s3.route('/alert', methods=['POST'])
def put_alert():
    '''
    Archive Threat Stack alerts to S3.
    '''
Copy

第一行称为装饰器。 我正在向名为 /alert 的 s3 蓝图添加路由(扩展为 /api/v1/s3/alert),当向其发出 HTTP POST 请求时,将导致调用 put_alert()

函数的主体非常简单

  • 获取请求的 JSON 数据
  • 遍历 alerts 键中的数组
  • 对于每个警报
    • 从 Threat Stack 检索警报详细信息
    • 将警报信息存储在 S3 的请求中
    • 将警报详细信息存储在 S3 中
    webhook_data = request.get_json()
    for alert in webhook_data.get('alerts'):
        alert_full = threatstack_model.get_alert_by_id(alert.get('id'))
        s3_model.put_webhook_data(alert)
        s3_model.put_alert_data(alert_full)
Copy

完成上述步骤后,我会返回一个简单的JSON文档,指示事务成功或失败。(注意:目前没有错误处理机制,所以我硬编码了成功响应和HTTP状态代码。稍后添加错误处理时,我会修改它。)

    status_code = 200
    success = True
    response = {'success': success}

    return jsonify(response), status_code
Copy

至此,我已经满足了我的请求,并完成了消费者所请求的内容。请注意,我没有包含任何演示如何满足请求的代码。我必须做什么才能获取警报的详细信息?我执行了哪些操作来存储警报?警报在S3中是如何存储和命名的?消费者并不真正关心这些细节。这是在您自己的服务中组织代码的一个好方法:消费者需要了解的内容应该放在您的视图中。消费者不需要了解的细节应该放在您的模型中,我即将介绍模型。

在讨论其余模块之前,我将讨论模型,模型是如何与我正在使用的服务(例如Threat Stack和S3)进行通信的。

模型

模型描述的是“事物”,而这些“事物”是我想要对其执行操作的对象。通常,当您搜索关于Flask模型的帮助时,博客和文档都喜欢使用数据库作为示例。虽然我现在所做的事情与数据库没有太大区别,但我只是将数据存储在对象存储中,而不是数据库中。这并不是我将来可能对从Threat Stack接收的数据所做的唯一事情。

此外,我选择跳过面向对象的方法,而采用过程式风格。在更高级的Python中,我会对警报对象进行建模,并提供操作它的方法。但是,对于在S3中存储数据这一任务而言,这引入了不必要的复杂性,并且也使代码对于演示简单任务而言更加复杂。因此,为了简洁和清晰,我选择了牺牲技术上的正确性。

App.models.threatstack 模块

顾名思义,app.models.threatstack 模块处理与 Threat Stack 的通信。

'''
Communicate with Threat Stack
'''
import os
import requests

THREATSTACK_BASE_URL = os.environ.get('THREATSTACK_BASE_URL', 'https://app.threatstack.com/api/v1')
THREATSTACK_API_KEY = os.environ.get('THREATSTACK_API_KEY')

def get_alert_by_id(alert_id):
    '''
    Retrieve an alert from Threat Stack by alert ID.
    '''
    alerts_url = '{}/alerts/{}'.format(THREATSTACK_BASE_URL, alert_id)

    resp = requests.get(
        alerts_url,
        headers={'Authorization': THREATSTACK_API_KEY}
    )

    return resp.json()
Copy

快速浏览一下几个需要注意的地方

THREATSTACK_BASE_URL = os.environ.get('THREATSTACK_BASE_URL', 'https://app.threatstack.com/api/v1')
THREATSTACK_API_KEY = os.environ.get('THREATSTACK_API_KEY')
Copy

我不想在我的代码中保留 Threat Stack API 密钥。这只是良好的代码/安全实践。我现在将从我的环境中获取 API 密钥,因为这是一个快速而简单的解决方案。在某个时候,我应该将所有配置集中在一个文件中,而不是隐藏在这里,这样代码和设置会更简洁。这是以后的工作,现在,设置已记录在 README.md 中。

def get_alert_by_id(alert_id):
    '''
    Retrieve an alert from Threat Stack by alert ID.
    '''
    alerts_url = '{}/alerts/{}'.format(THREATSTACK_BASE_URL, alert_id)

    resp = requests.get(
        alerts_url,
        headers={'Authorization': THREATSTACK_API_KEY}
    )

    return resp.json()
Copy

get_alert_by_id() 函数接受一个警报 ID,查询 Threat Stack 平台以获取警报数据,并返回该数据。我使用 Python 的 requests 模块 向 Threat Stack API 端点发出 HTTP GET 请求,该端点返回给定警报的警报信息。

阅读 Threat Stack API 文档

App.models.s3 模块

app.models.s3 模块处理与 AWS S3 的连接。

'''
Manipulate objects in AWS S3.
'''
import boto3
import json
import os
import time

TS_AWS_S3_BUCKET = os.environ.get('TS_AWS_S3_BUCKET')
TS_AWS_S3_PREFIX = os.environ.get('TS_AWS_S3_PREFIX', None)

def put_webhook_data(alert):
    '''
    Put alert webhook data in S3 bucket.
    '''
    alert_time = time.gmtime(alert.get('created_at')/1000)
    alert_time_path = time.strftime('%Y/%m/%d/%H/%M', alert_time)
    alert_key = '/'.join([alert_time_path, alert.get('id')])
    if TS_AWS_S3_PREFIX:
        alert_key = '/'.join([TS_AWS_S3_PREFIX, alert_key])

    s3_client = boto3.client('s3')
    s3_client.put_object(
        Body=json.dumps(alert),
        Bucket=TS_AWS_S3_BUCKET,
        Key=alert_key
    )

    return None

def put_alert_data(alert):
    '''
    Put alert data in S3.
    '''
    alert_id = alert.get('id')
    alert_key = '/'.join(['alerts',
                          alert_id[0:2],
                          alert_id[2:4],
                          alert_id
                          ])

    if TS_AWS_S3_PREFIX:
        alert_key = '/'.join([TS_AWS_S3_PREFIX, alert_key])

    s3_client = boto3.client('s3')
    s3_client.put_object(
        Body=json.dumps(alert),
        Bucket=TS_AWS_S3_BUCKET,
        Key=alert_key
    )

    return None
Copy

我将讲解一些有趣的部分

TS_AWS_S3_BUCKET = os.environ.get('TS_AWS_S3_BUCKET')
TS_AWS_S3_PREFIX = os.environ.get('TS_AWS_S3_PREFIX', None)
Copy

同样,此应用程序没有配置文件,但我需要设置一个 S3 存储桶名称和一个可选的前缀。我最终应该修复这个问题——设置已记录在 README.md 中,目前这样就足够了。

函数 put_webhook_data()put_alert_data() 有很多重复的代码。我没有重构它们,因为在重构之前更容易看到逻辑。如果您仔细观察,您会发现它们之间唯一的区别是 alert_key 的定义方式。我将重点关注 put_webhook_data()

def put_webhook_data(alert):
    '''
    Put alert webhook data in S3 bucket.
    '''
    alert_time = time.gmtime(alert.get('created_at')/1000)
    alert_time_path = time.strftime('%Y/%m/%d/%H/%M', alert_time)
    alert_key = '/'.join(['webhooks', alert_time_path, alert.get('id')])
    if TS_AWS_S3_PREFIX:
        alert_key = '/'.join([TS_AWS_S3_PREFIX, alert_key])

    s3_client = boto3.client('s3')
    s3_client.put_object(
        Body=json.dumps(alert),
        Bucket=TS_AWS_S3_BUCKET,
        Key=alert_key
    )

    return None
Copy

此函数接受一个名为 alert 的参数。回顾 app/views/s3.pyalert 只是发送到端点的 JSON 数据。Webhook 数据按日期和时间存储在 S3 中。发生在 2017-01-17 13:51 的警报 587c0159a907346eccb84004 在 S3 中存储为 webhooks/2017/01/17/13/51/587c0159a907346eccb84004。

首先,我获取警报时间。Threat Stack 以自 Unix 纪元以来的毫秒数发送警报时间,需要将其转换为秒,这是 Python 处理时间的方式。我获取该时间并将其解析为将成为目录路径的字符串。然后,我将存储 webhook 数据的顶级目录、基于时间的路径以及最终的警报 ID 连接起来,以形成 S3 中 webhook 数据的路径。

Boto 3 是 Python 中用于处理 AWS 资源的主要模块。我初始化一个 boto3 客户端对象,以便我可以与 S3 通信并将对象放在那里。s3_client.put_object() 使用其 BucketKey 参数非常简单,这些参数分别是 S3 存储桶的名称和我想要存储的 S3 对象的路径。Body 参数是我转换回字符串的警报。

总结

我现在拥有一个功能性的 Python Flask Web 服务,它可以接受 Threat Stack Webhook 请求,获取警报的详细信息,并将其存档在 S3 中。这是一个良好的开端,但要使其投入生产,还有很多工作要做。您可能会立即问:“如果出现问题怎么办?” 没有异常处理来处理诸如与 Threat Stack 或 S3 的通信失败之类的问题。我故意省略了它以保持代码清晰。也没有授权密钥检查。这意味着任何人都可以向其发送数据。(并且由于我不进行任何错误检查或异常处理,他们可以使服务崩溃。)也没有 TLS 加密处理。这是我留给 Nginx 或 Apache 的事情,它们将是位于此应用程序前面的 Web 服务器。在将此 Web 服务投入生产之前,您需要解决所有这些问题以及更多问题。但就目前而言,这是一个可以帮助您在开始构建自己的服务时变得更加舒适的起点。

资源

查看 GitHub 上的 Threat Stack 到 S3 服务的存储库

由于应用程序会经历修订,请查看 本文中使用的版本

查看 Tom 的关于 Python Flask 中异常处理 的新教程。

本文最初发表在 Threat Stack 博客上。经许可转载。

标签
User profile image.
作为 Threat Stack 的工程倡导者,Tom 利用他在云基础设施/安全方面的经验来解决问题并提供对解决方案的见解。他喜欢寻找安全可靠地自动化基础设施的新的和有趣的方法。工作之余,他是一位自豪的猫爸爸,养了两只三花猫,喜欢赛车和航海。

评论已关闭。

Creative Commons License本作品采用知识共享署名-相同方式共享 4.0 国际许可协议授权。
© . All rights reserved.