Skip to content

Latest commit

 

History

History
1673 lines (1219 loc) · 63 KB

File metadata and controls

1673 lines (1219 loc) · 63 KB

十一、网络开发

在本章中,我们将介绍以下配方:

  • 处理 JSON 如何解析和编写 JSON 对象
  • 解析 URL 如何解析 URL 的路径、查询和其他部分
  • 使用 HTTP 如何从 HTTP 端点读取数据
  • 向 HTTP 提交表单如何将 HTML 表单发布到 HTTP 端点
  • 构建 HTML 如何使用正确的转义生成 HTML
  • 服务 HTTP 通过 HTTP 服务动态内容
  • 服务静态文件如何通过 HTTP 服务静态文件
  • web 应用程序中的错误如何报告 web 应用程序中的错误
  • 处理表单和文件解析从 HTML 表单接收的数据和上载的文件
  • 提供基本 REST/JSON API 的 REST API
  • 处理 Cookie 如何处理 Cookie 以识别返回的用户

介绍

HTTP 协议和更一般的 web 技术集被认为是创建分布式系统的一种有效而稳健的方法,可以利用广泛而可靠的方法,利用现成的技术和缓存、错误传播、可重复请求的范例实现进程间通信,以及针对服务可能失败而不影响整体系统状态的上下文的最佳实践。

Python 有许多非常好和可靠的 web 框架,从 Django 和 TurboGears 等全堆栈解决方案,到 Pyramid 和 Flask 等更精细的可调整框架。但是,在许多情况下,标准库可能已经提供了实现基于 HTTP 的软件所需的工具,而无需依赖外部库和框架。

在本章中,我们将介绍标准库提供的一些常见方法和工具,这些方法和工具在 HTTP 和基于 web 的应用程序环境中非常方便。

处理 JSON

使用基于 web 的解决方案时,最常见的需求之一是解析和说出 JSON。Python 内置了对 XML 和 HTML 的支持,但也支持 JSON 编码和解码。

JSON 编码器还可以专门处理非标准类型,例如日期。

怎么做。。。

对于该配方,需执行以下步骤:

  1. JSONEncoderJSONDecoder类可以专门用于实现自定义编码和解码行为:
import json
import datetime
import decimal
import types

class CustomJSONEncoder(json.JSONEncoder):
    """JSON Encoder with support for additional types.

    Supports dates, times, decimals, generators and
    any custom class that implements __json__ method.
    """
    def default(self, obj):
        if hasattr(obj, '__json__') and callable(obj.__json__):
            return obj.__json__()
        elif isinstance(obj, (datetime.datetime, datetime.time)):
            return obj.replace(microsecond=0).isoformat()
        elif isinstance(obj, datetime.date):
            return obj.isoformat()
        elif isinstance(obj, decimal.Decimal):
            return float(obj)
        elif isinstance(obj, types.GeneratorType):
            return list(obj)
        else:
            return super().default(obj)
  1. 然后可以将我们的自定义编码器传递给json.dumps,根据我们的规则对 JSON 输出进行编码:
jsonstr = json.dumps({'s': 'Hello World',
                    'dt': datetime.datetime.utcnow(),
                    't': datetime.datetime.utcnow().time(),
                    'g': (i for i in range(5)),
                    'd': datetime.date.today(),
                    'dct': {
                        's': 'SubDict',
                        'dt': datetime.datetime.utcnow()
                    }}, 
                    cls=CustomJSONEncoder)

>>> print(jsonstr)
{"t": "10:53:53", 
 "s": "Hello World", 
 "d": "2018-06-29", 
 "dt": "2018-06-29T10:53:53", 
 "dct": {"dt": "2018-06-29T10:53:53", "s": "SubDict"}, 
 "g": [0, 1, 2, 3, 4]}
  1. 我们还可以对任何自定义类进行编码,只要它提供了一个__json__方法:
class Person:
    def __init__(self, name, surname):
        self.name = name
        self.surname = surname

    def __json__(self):
        return {
            'name': self.name,
            'surname': self.surname
        }
  1. 结果将是一个包含所提供数据的 JSON 对象:
>>> print(json.dumps({'person': Person('Simone', 'Marzola')}, 
                     cls=CustomJSONEncoder))
{"person": {"name": "Simone", "surname": "Marzola"}}
  1. 顺便说一句,加载编码值将导致对普通字符串进行解码,因为它们不是 JSON 类型:
>>> print(json.loads(jsonstr))
{'g': [0, 1, 2, 3, 4], 
 'd': '2018-06-29', 
 's': 'Hello World', 
 'dct': {'s': 'SubDict', 'dt': '2018-06-29T10:56:30'}, 
 't': '10:56:30', 
 'dt': '2018-06-29T10:56:30'}
  1. 如果我们还想解析回溯日期,我们可以尝试专门化一个JSONDecoder来猜测字符串是否包含 ISO 8601 格式的日期,并尝试将其解析回:
class CustomJSONDecoder(json.JSONDecoder):
    """Custom JSON Decoder that tries to decode additional types.

    Decoder tries to guess dates, times and datetimes in ISO format.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(
            *args, **kwargs, object_hook=self.parse_object
        )

    def parse_object(self, values):
        for k, v in values.items():
            if not isinstance(v, str):
                continue

            if len(v) == 10 and v.count('-') == 2:
                # Probably contains a date
                try:
                    values[k] = datetime.datetime.strptime(v, '%Y-
                    %m-%d').date()
                except:
                    pass
            elif len(v) == 8 and v.count(':') == 2:
                # Probably contains a time
                try:
                    values[k] = datetime.datetime.strptime(v, 
                    '%H:%M:%S').time()
                except:
                    pass
            elif (len(v) == 19 and v.count('-') == 2 and 
                v.count('T') == 1 and v.count(':') == 2):
                # Probably contains a datetime
                try:
                    values[k] = datetime.datetime.strptime(v, '%Y-
                    %m-%dT%H:%M:%S')
                except:
                    pass
        return values
  1. 在以前的数据处加载应导致预期的类型:
>>> jsondoc = json.loads(jsonstr, cls=CustomJSONDecoder)
>>> print(jsondoc)
{'g': [0, 1, 2, 3, 4], 
 'd': datetime.date(2018, 6, 29), 
 's': 'Hello World', 
 'dct': {'s': 'SubDict', 'dt': datetime.datetime(2018, 6, 29, 10, 56, 30)},
 't': datetime.time(10, 56, 30), 
 'dt': datetime.datetime(2018, 6, 29, 10, 56, 30)}

它是如何工作的。。。

为了生成 Python 对象的 JSON 表示,使用了json.dumps方法。此方法接受一个附加参数cls,其中可以提供自定义编码器类:

json.dumps({'key': 'value', cls=CustomJSONEncoder)

当需要对编码器不知道如何编码的对象进行编码时,将调用所提供类的default方法。

我们的CustomJSONEncoder类提供了一个default方法,用于处理编码日期、时间、生成器、小数以及任何提供__json__方法的自定义类:

class CustomJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if hasattr(obj, '__json__') and callable(obj.__json__):
            return obj.__json__()
        elif isinstance(obj, (datetime.datetime, datetime.time)):
            return obj.replace(microsecond=0).isoformat()
        elif isinstance(obj, datetime.date):
            return obj.isoformat()
        elif isinstance(obj, decimal.Decimal):
            return float(obj)
        elif isinstance(obj, types.GeneratorType):
            return list(obj)
        else:
            return super().default(obj)

这是通过逐个检查编码对象的属性来完成的。记住,编码器知道如何编码的对象不会提供给default方法;只有编码器不知道如何处理的对象才会传递给default方法。

因此,我们只需要检查我们想要支持的对象,而不是标准对象。

我们的第一个检查是验证提供的对象是否有__json__方法:

if hasattr(obj, '__json__') and callable(obj.__json__):
    return obj.__json__()

对于任何具有可调用的__json__属性的对象,我们将依靠调用它来检索该对象的 JSON 表示。__json__方法所要做的就是返回 JSON 编码器知道如何编码的任何对象,通常是存储对象属性的dict

对于日期,我们将使用 ISO 8601 格式的简化形式对其进行编码:

elif isinstance(obj, (datetime.datetime, datetime.time)):
    return obj.replace(microsecond=0).isoformat()
elif isinstance(obj, datetime.date):
    return obj.isoformat()

这通常允许从客户端轻松解析,例如 JavaScript 解释器,这些解释器可能必须从提供的数据中构建date对象。

Decimal只是为了方便起见转换成浮点数。这在大多数情况下就足够了,并且与任何 JSON 解码器完全兼容,无需任何额外的机器。当然,没有什么可以阻止我们返回更复杂的对象,如字典,以保持固定的精度:

elif isinstance(obj, decimal.Decimal):
    return float(obj)

最后,使用生成器并从中返回包含的值的列表。这通常是您所期望的,并且表示生成器逻辑本身需要做出不合理的努力来保证跨语言兼容性:

elif isinstance(obj, types.GeneratorType):
    return list(obj)

对于我们不知道如何处理的任何对象,我们只是让父对象实现default方法并继续:

else:
    return super().default(obj)

这只会抱怨对象不可 JSON 序列化,并会通知开发人员我们不知道如何处理它。

相反,自定义解码器支持的工作方式略有不同。

虽然编码器将接收它知道的对象和它不知道的对象(因为 Python 对象比 JSON 对象更丰富),但很容易看出它如何只能为它不知道的对象请求额外的指导,并以标准方式为它知道如何处理的对象执行操作。

解码器只接收有效的 JSON 对象;否则,提供的字符串将根本不是有效的 JSON。

它如何知道所提供的字符串必须被解码为普通字符串,或者它是否应该请求额外的指导?

它不能,因此它要求对任何一个解码对象提供指导。

这就是解码器基于object_hook可调用函数的原因,该函数将接收每一个解码的 JSON 对象,并可以检查它以执行其他转换,或者如果正常解码是正确的,它可以释放它。

在我们的实现中,我们对解码器进行了子类化,并提供了一个基于本地类方法parse_object的默认object_hook参数:

class CustomJSONDecoder(json.JSONDecoder):
    def __init__(self, *args, **kwargs):
        super().__init__(
            *args, **kwargs, object_hook=self.parse_object
        )

然后,parse_object方法将接收解码 JSON 时发现的任何 JSON 对象(顶部或嵌套的);因此,它将收到一组字典,可以根据需要以任何方式进行检查,并编辑其内容,以便在 JSON 解码器本身执行的转换之外执行其他转换:

def parse_object(self, values):
    for k, v in values.items():
        if not isinstance(v, str):
            continue

        if len(v) == 10 and v.count('-') == 2:
            # Probably contains a date
            try:
                values[k] = datetime.datetime.strptime(v, '%Y-%m-
                %d').date()
            except:
                pass
        elif len(v) == 8 and v.count(':') == 2:
            # Probably contains a time
            try:
                values[k] = datetime.datetime.strptime(v, 
                '%H:%M:%S').time()
            except:
                pass
        elif (len(v) == 19 and v.count('-') == 2 and 
            v.count('T') == 1 and v.count(':') == 2):
            # Probably contains a datetime
            try:
                values[k] = datetime.datetime.strptime(v, '%Y-%m-
                %dT%H:%M:%S')
            except:
                pass
    return values

接收到的参数实际上是一个完整的 JSON 对象,因此它永远不会是一个单独的字段;它将始终是一个对象(因此,一个包含多个键值的完整 Python 字典)。

请看以下对象:

{'g': [0, 1, 2, 3, 4], 
 'd': '2018-06-29', 
 's': 'Hello World', 

您不会收到g键,但会收到整个 Python 字典。这意味着,如果您的 JSON 文档没有嵌套的 JSON 对象,那么整个文档只会调用一次您的object_hook而不会调用其他对象。

因此,parse_object方法提供的自定义object_hook迭代解码 JSON 对象的所有属性:

for k, v in values.items():
    if not isinstance(v, str):
        continue

由于 JSON 中的日期和时间通常以 ISO8601 格式的字符串表示,因此它忽略了所有非字符串的内容。

我们对数字、列表和字典的转换方式非常满意(如果希望将日期放在列表中,则可能需要跳转到列表中),因此如果值不是字符串,我们就跳过它。

当值是一个字符串时,我们检查它的属性,如果我们猜测它可能是一个日期,我们会尝试将它解析为一个日期。

我们可以考虑一个日期的正确定义:三个值由两个破折号隔开,三个值由两个冒号隔开,中间有两个:

elif (len(v) == 19 and v.count('-') == 2 and 
      v.count('T') == 1 and v.count(':') == 2):
    # Probably contains a datetime

如果该定义匹配,我们实际上尝试将其解码为 Pythondatetime对象,并替换解码的 JSON 对象中的值:

# Probably contains a datetime
try:
    values[k] = datetime.datetime.strptime(v, '%Y-%m-%dT%H:%M:%S')
except:
    pass

还有更多。。。

您可能注意到,虽然将 Python 编码为 JSON 是相当合理和健壮的,但返回过程中充满了问题。

JSON 不是一种非常有表现力的语言;它不为自定义类型提供任何机制,因此您有一种标准的方法向解码器返回有关您希望解码的类型的提示。

虽然我们可以猜测类似2017-01-01T13:21:17的东西是一个日期,但我们根本无法保证。也许,最初,它实际上是一些文本,碰巧包含了一些可以作为日期解码的内容,但在 Python 中从来没有打算成为datetime对象。

因此,通常只在受限环境中实现自定义解码是安全的。如果您知道并控制将从中接收数据的源,那么提供自定义解码通常是安全的。您可能希望使用自定义属性扩展 JSON,以指导解码器(比如有一个__type__键,告诉您它是日期还是字符串),但在开放的 web 世界中,尝试猜测人们发送给您的内容通常不是一个好主意,因为 web 非常多样化。

JSON 有一些扩展的标准版本,它们试图精确地解决解码数据时的这种模糊性,例如 JSON-LD 和 JSON 模式,它们允许您用 JSON 表示更复杂的实体。

如果您觉得有必要,您应该依靠这些标准来避免重新发明轮子的风险,并避免面临现有标准已经解决的解决方案的限制。

解析 URL

使用基于 web 的软件时,经常需要了解链接、协议和路径。

您可能会倾向于依赖正则表达式或字符串拆分来解析 URL,但如果您考虑了 URL 可能包含的所有奇怪之处(如凭据或特定协议),那么这可能不像您预期的那么容易。

Python 在urllibcgi模块中提供了实用程序,当您想要考虑 URL 可能具有的所有不同格式时,这些实用程序会使您的工作变得更轻松。

依靠它们可以让生活更轻松,让你的软件更健壮。

怎么做。。。

urllib.parse模块有多个解析 URL 的工具。最常用的解决方案是依靠urllib.parse.urlparse,它可以处理最广泛的 URL 类型:

import urllib.parse

def parse_url(url):
    """Parses an URL of the most widespread format.

    This takes for granted there is a single set of parameters
    for the whole path.
    """
    parts = urllib.parse.urlparse(url)
    parsed = vars(parts)
    parsed['query'] = urllib.parse.parse_qs(parts.query)
    return parsed

可以在命令行上调用前面的代码段,如下所示:

>>> url = 'http://user:[email protected]:80/path/subpath?arg1=val1&arg2=val2#fragment'
>>> result = parse_url(url)
>>> print(result)
OrderedDict([('scheme', 'http'),
             ('netloc', 'user:[email protected]:80'),
             ('path', '/path/subpath'),
             ('params', ''),
             ('query', {'arg1': ['val1'], 'arg2': ['val2']}),
             ('fragment', 'fragment')])

返回的OrderedDict包含组成我们 URL 的所有部分,并且,对于查询参数,它提供了已经解析过的部分。

还有更多。。。

如今,URI 还支持在每个路径段提供参数。这些在实践中很少使用,但是如果您的代码预期会收到此类 URI,那么您不应该依赖于urllib.parse.urlparse,因为它试图从 URL 解析参数,而这些 URI 不支持这些参数:

>>> url = 'http://user:[email protected]:80/root;para1/subpath;para2?arg1=val1#fragment'
>>> result = urllib.parse.urlparse(url)
>>> print(result)
ParseResult(scheme='http', netloc='user:[email protected]:80', 
            path='/root;para1/subpath', 
            params='para2', 
            query='arg1=val1', 
            fragment='fragment')

您可能已经注意到,路径最后一部分的参数在params中得到了正确解析,但第一部分的参数保留在path中。

在这种情况下,您可能希望依赖于urllib.parse.urlsplit,它不会解析参数,而是让您自己解析。因此,您可以自己从参数中拆分 URL 段:

>>> parsed = urllib.parse.urlsplit(url)
>>> print(parsed)
SplitResult(scheme='http', netloc='user:[email protected]:80', 
            path='/root;para1/subpath;para2', 
            query='arg1=val1', 
            fragment='fragment')

请注意,在本例中,所有参数都保留在path中,然后您可以自己拆分它们。

使用 HTTP

您可能正在与基于 HTTP REST API 的第三方服务进行交互,也可能正在从第三方获取内容,或者只是下载软件需要作为输入的文件。这并不重要。如今,编写应用程序而忽略 HTTP 几乎是不可能的;你迟早要面对它。人们期望所有类型的应用程序都支持 HTTP。如果您正在编写一个图像查看器,他们可能希望能够将一个指向图像的 URL 抛出到它并看到它出现。

虽然 Python 标准库从未真正做到用户友好和显而易见,但它始终有与 HTTP 交互的方法,而且它们是现成的。

怎么做。。。

此配方的步骤如下所示:

  1. urllib.request模块提供提交 HTTP 请求所需的机制。围绕它的轻型包装器可以解决大多数使用 HTTP 的需求:
import urllib.request
import urllib.parse
import json

def http_request(url, query=None, method=None, headers={}, data=None):
    """Perform an HTTP request and return the associated response."""
    parts = vars(urllib.parse.urlparse(url))
    if query:
        parts['query'] = urllib.parse.urlencode(query)

    url = urllib.parse.ParseResult(**parts).geturl()
    r = urllib.request.Request(url=url, method=method, 
                            headers=headers,
                            data=data)
    with urllib.request.urlopen(r) as resp:
        msg, resp = resp.info(), resp.read()

    if msg.get_content_type() == 'application/json':
        resp = json.loads(resp.decode('utf-8'))

    return msg, resp
  1. 我们可以使用http_request函数执行获取文件的请求:
>>> msg, resp = http_request('https://httpbin.org/bytes/16')
>>> print(msg.get_content_type(), resp)
application/octet-stream b'k\xe3\x05\x06=\x17\x1a9%#\xd0\xae\xd8\xdc\xf9>'
  1. 我们还可以使用它与基于 JSON 的 API 交互:
>>> msg, resp = http_request('https://httpbin.org/get', query={
...     'a': 'Hello',
...     'b': 'World'
... })
>>> print(msg.get_content_type(), resp)
application/json
{'url': 'https://httpbin.org/get?a=Hello&b=World', 
 'headers': {'Accept-Encoding': 'identity', 
             'User-Agent': 'Python-urllib/3.5', 
             'Connection': 'close', 
             'Host': 'httpbin.org'}, 
 'args': {'a': 'Hello', 'b': 'World'}, 
 'origin': '127.19.102.123'}
  1. 此外,它还可用于向端点提交或上载数据:
>>> msg, resp = http_request('https://httpbin.org/post', method='POST',
...                          data='This is my posted data!'.encode('ascii'),
...                          headers={'Content-Type': 'text/plain'})
>>> print(msg.get_content_type(), resp)
application/json 
{'data': 'This is my posted data!', 
 'json': None, 
 'form': {}, 
 'args': {}, 
 'files': {}, 
 'headers': {'User-Agent': 'Python-urllib/3.5', 
             'Connection': 'close', 
             'Content-Type': 'text/plain', 
             'Host': 'httpbin.org', 
             'Accept-Encoding': 'identity', 
             'Content-Length': '23'}, 
 'url': 'https://httpbin.org/post', 
 'origin': '127.19.102.123'}

它是如何工作的。。。

http_request方法负责创建urllib.request.Request实例,通过网络发送并获取响应。

将请求发送到附加查询参数的指定 URL。

该函数要做的第一件事是解析 URL,以便它可以替换其中的一部分。这样做是为了能够用提供的参数替换/追加查询参数:

parts = vars(urllib.parse.urlparse(url))
if query:
    parts['query'] = urllib.parse.urlencode(query)

urllib.parse.urlencode将接受参数字典,如{'a': 5, 'b': 7},并将返回包含urlencode参数的字符串:'b=7&a=5'

然后将生成的查询字符串放入url的已解析部分,以替换当前存在的查询参数。

然后从现在包含正确查询参数的所有部分重新构建url

url = urllib.parse.ParseResult(**parts).geturl()

一旦编码查询的url准备就绪,它将从中构建一个请求,代理指定的方法、头和请求体:

r = urllib.request.Request(url=url, method=method, headers=headers,
                           data=data)

当执行普通的GET请求时,这些将是默认的请求,但是能够指定它们允许我们执行更高级的请求,例如POST,或者在请求中提供特殊的头。

然后打开请求并读回响应:

with urllib.request.urlopen(r) as resp:
    msg, resp = resp.info(), resp.read()

响应作为一个urllib.response.addinfourl对象返回,包含两个相关部分:响应主体和http.client.HTTPMessage,从中我们可以获得所有响应信息,如标题、URL 等。

通过像文件一样读取响应来检索主体,而通过info()方法检索HTTPMessage

通过检索到的信息,我们可以检查响应是否为 JSON 响应,在本例中,我们将其解码回字典,以便我们可以导航响应,而不仅仅是接收普通字节:

if msg.get_content_type() == 'application/json':
    resp = json.loads(resp.decode('utf-8'))

对于所有响应,我们返回消息和正文。如果不需要,调用方可以忽略该消息:

return msg, resp

还有更多。。。

对于简单的情况,生成 HTTP 请求可能非常简单,对于更复杂的情况,生成 HTTP 请求可能非常复杂。完美地处理 HTTP 协议可能是一项漫长而复杂的工作,特别是因为协议规范本身并不总是清楚地规定事情应该如何工作,而且很多都来自于对真实的现有 web 服务器和客户端如何工作的经验。

出于这个原因,如果您需要的不仅仅是获取简单端点,那么您可能希望依靠第三方库来执行 HTTP 请求,例如几乎所有 Python 环境都可以使用的请求库。

向 HTTP 提交表单

有时,您必须与 HTML 表单交互或上载文件。这通常需要处理multipart/form-data编码。

表单可以混合文件和文本数据,并且表单中可以有多个不同的字段。因此,它需要一种在同一请求中表示多个字段的方法,其中一些字段可以是二进制文件。

这就是为什么在多部分中编码数据会变得棘手的原因,但在大多数情况下,仅使用标准库工具就可以推出基本配方。

怎么做。。。

以下是此配方的步骤:

  1. multipart本身需要跟踪我们想要编码的所有字段和文件,然后自己执行编码。
  2. 我们将依赖io.BytesIO来存储所有生成的字节:
import io
import mimetypes
import uuid

class MultiPartForm:
    def __init__(self):
        self.fields = {}
        self.files = []

    def __setitem__(self, name, value):
        self.fields[name] = value

    def add_file(self, field, filename, data, mimetype=None):
        if mimetype is None:
            mimetype = (mimetypes.guess_type(filename)[0] or
                        'application/octet-stream')
        self.files.append((field, filename, mimetype, data))

    def _generate_bytes(self, boundary):
        buffer = io.BytesIO()
        for field, value in self.fields.items():
            buffer.write(b'--' + boundary + b'\r\n')
            buffer.write('Content-Disposition: form-data; '
                        'name="{}"\r\n'.format(field).encode('utf-8'))
            buffer.write(b'\r\n')
            buffer.write(value.encode('utf-8'))
            buffer.write(b'\r\n')
        for field, filename, f_content_type, body in self.files:
            buffer.write(b'--' + boundary + b'\r\n')
            buffer.write('Content-Disposition: file; '
                        'name="{}"; filename="{}"\r\n'.format(
                            field, filename
                        ).encode('utf-8'))
            buffer.write('Content-Type: {}\r\n'.format(
                f_content_type
            ).encode('utf-8'))
            buffer.write(b'\r\n')
            buffer.write(body)
            buffer.write(b'\r\n')
        buffer.write(b'--' + boundary + b'--\r\n')
        return buffer.getvalue()

    def encode(self):
        boundary = uuid.uuid4().hex.encode('ascii')
        while boundary in self._generate_bytes(boundary=b'NOBOUNDARY'):
            boundary = uuid.uuid4().hex.encode('ascii')

        content_type = 'multipart/form-data; boundary={}'.format(
            boundary.decode('ascii')
        )
        return content_type, self._generate_bytes(boundary)
  1. 然后我们可以提供并编码我们的form数据:
>>> form = MultiPartForm()
>>> form['name'] = 'value'
>>> form.add_file('file1', 'somefile.txt', b'Some Content', 'text/plain')
>>> content_type, form_body = form.encode()
>>> print(content_type, '\n\n', form_body.decode('ascii'))
multipart/form-data; boundary=6c5109dfa19a450695013d4eecac2b0b 

--6c5109dfa19a450695013d4eecac2b0b
Content-Disposition: form-data; name="name"

value
--6c5109dfa19a450695013d4eecac2b0b
Content-Disposition: file; name="file1"; filename="somefile.txt"
Content-Type: text/plain

Some Content
--6c5109dfa19a450695013d4eecac2b0b--
  1. 使用上一个配方中的http_request方法,我们可以通过 HTTP 提交任何form
>>> _, resp = http_request('https://httpbin.org/post', method='POST',
                           data=form_body, 
                           headers={'Content-Type': content_type})
>>> print(resp)
{'headers': {
    'Accept-Encoding': 'identity', 
    'Content-Type': 'multipart/form-data; boundary=6c5109dfa19a450695013d4eecac2b0b', 
    'User-Agent': 'Python-urllib/3.5', 
    'Content-Length': '272', 
    'Connection': 'close', 
    'Host': 'httpbin.org'
 }, 
 'json': None,
 'url': 'https://httpbin.org/post', 
 'data': '', 
 'args': {}, 
 'form': {'name': 'value'}, 
 'origin': '127.69.102.121', 
 'files': {'file1': 'Some Content'}}

如您所见,httpbin正确地接收了我们的file1name字段,并处理了这两个字段。

它是如何工作的。。。

multipart实际上是基于在单个主体中编码多个请求。每个部分由一个边界分隔,该边界内包含该部分的数据。

每个部分都可以提供数据和元数据,例如所提供数据的内容类型。

这样,接收器就可以知道所包含的数据是二进制的、文本的还是其他的。例如,为formsurname字段指定值的部分如下所示:

Content-Disposition: form-data; name="surname"

MySurname

为上传文件提供数据的部分如下所示:

Content-Disposition: file; name="file1"; filename="somefile.txt"
Content-Type: text/plain

Some Content

我们的MultiPartForm允许我们通过使用字典语法设置两个普通form字段来存储它们:

def __setitem__(self, name, value):
    self.fields[name] = value

我们可以在命令行上调用它,如下所示:

>>> form['name'] = 'value'

并通过add_file方式添加来提供文件:

def add_file(self, field, filename, data, mimetype=None):
    if mimetype is None:
        mimetype = (mimetypes.guess_type(filename)[0] or
                    'application/octet-stream')
    self.files.append((field, filename, mimetype, data))

我们可以在命令行上调用此方法,如下所示:

>>> form.add_file('file1', 'somefile.txt', b'Some Content', 'text/plain')

它们只是将想要的字段和文件记录在一个字典和一个列表中,只有在稍后调用_generate_bytes以实际生成完整的多部分内容时才会使用这些字段和文件。

所有的艰苦工作都是由_generate_bytes完成的,它遍历所有这些字段和文件,并为每个字段和文件创建一个零件:

for field, value in self.fields.items():
    buffer.write(b'--' + boundary + b'\r\n')
    buffer.write('Content-Disposition: form-data; '
                'name="{}"\r\n'.format(field).encode('utf-8'))
    buffer.write(b'\r\n')
    buffer.write(value.encode('utf-8'))
    buffer.write(b'\r\n')

由于边界必须分离每个部分,所以验证边界本身不包含在数据本身中,或者接收方可能错误地考虑在遇到它时结束的部分是非常重要的。

这就是为什么我们的MultiPartForm类生成一个boundary,检查它是否包含在多部分响应中,如果包含在多部分响应中,它将生成一个新的响应,直到它能够找到一个不包含在数据中的boundary

boundary = uuid.uuid4().hex.encode('ascii')
while boundary in self._generate_bytes(boundary=b'NOBOUNDARY'):
    boundary = uuid.uuid4().hex.encode('ascii')

一旦我们找到一个有效的boundary,我们就可以使用它来生成多部分内容,并将其返回给调用者,其中包含必须使用的内容类型(因为内容类型向接收者提供了一个提示,提示其boundary要检查哪一个内容):

content_type = 'multipart/form-data; boundary={}'.format(
    boundary.decode('ascii')
)
return content_type, self._generate_bytes(boundary)

还有更多。。。

多部分编码不是一个容易的主题;例如,在多部分正文中对名称进行编码不是一个容易的主题。

多年来,关于多部分内容中字段名和文件名的正确编码是什么,它被多次更改和讨论。

从历史上看,在这些字段中只依赖普通 ASCII 名称是安全的,因此,如果要确保提交数据的服务器能够正确接收数据,可能需要使用不涉及 Unicode 字符的简单文件名和字段。

多年来,人们提出了多种其他方式对这些字段和文件名进行编码。UTF-8 是官方支持的 HTML5 后备方案之一。建议的方法依赖于 UTF-8 对文件名和字段进行编码,因此它可以向后兼容使用普通 ASCII 名称的情况,但当服务器支持 Unicode 字符时,仍然可以依赖 Unicode 字符。

构建 HTML

无论何时构建网页、电子邮件或报告,您都可能需要使用需要向用户显示的实际值替换 HTML 模板中的占位符。

我们已经在第 2 章T**ext Management中看到了如何实现一个最小的、简单的模板引擎,但它在任何方面都不特定于 HTML。

在使用 HTML 时,特别重要的是要注意转义用户提供的值,因为这可能会导致断页甚至 XSS 攻击。

很明显,你不希望你的用户因为你在网站上以"<script>alert('You are hacked!')</script>"的姓氏注册而生你的气。

因此,Python 标准库提供了转义工具,可用于正确准备插入 HTML 的内容。

怎么做。。。

结合string.Formattercgi模块,可以创建一个为我们处理转义的格式化程序:

import string
import cgi

class HTMLFormatter(string.Formatter):
    def get_field(self, field_name, args, kwargs):
        val, key = super().get_field(field_name, args, kwargs)
        if hasattr(val, '__html__'):
            val = val.__html__()
        elif isinstance(val, str):
            val = cgi.escape(val)
        return val, key

class Markup:
    def __init__(self, v):
        self.v = v
    def __str__(self):
        return self.v
    def __html__(self):
        return str(self)

然后我们可以使用HTMLFormatterMarkup类,同时在需要时保留注入原始html的能力:

>>> html = HTMLFormatter().format('Hello {name}, you are {title}', 
                                  name='<strong>Name</strong>',
                                  title=Markup('<em>a developer</em>'))
>>> print(html)
Hello &lt;strong&gt;Name&lt;/strong&gt;, you are <em>a developer</em>

我们还可以很容易地将这个方法和关于文本模板引擎的方法结合起来,实现一个带有转义的简约 HTML 模板引擎。

它是如何工作的。。。

HTMLFormatter必须替换格式字符串中的值时,它将检查检索到的值是否有__html__方法:

if hasattr(val, '__html__'):
    val = val.__html__()

如果该方法存在,则应返回值的 HTML 表示形式。这应该是一个完全有效的转义 HTML。

否则,该值应为需要转义的字符串:

elif isinstance(val, str):
    val = cgi.escape(val)

这使得我们提供给HTMLFormatter的任何值在默认情况下都会被转义:

>>> html = HTMLFormatter().format('Hello {name}', 
                                  name='<strong>Name</strong>')
>>> print(html)
Hello &lt;strong&gt;Name&lt;/strong&gt;

如果我们想避免转义,我们可以依赖于Markup对象,它可以包装一个字符串,使其按原样通过,而不进行任何转义:

>>> html = HTMLFormatter().format('Hello {name}', 
                                  name=Markup('<strong>Name</strong>'))
>>> print(html)
Hello <strong>Name</strong>

这是因为我们的Markup对象实现了一个按原样返回字符串的__html__方法。由于我们的HTMLFormatter忽略了具有__html__方法的任何值,因此字符串将在没有任何形式转义的情况下通过。

虽然Markup允许我们禁用按需转义,但当我们知道实际上需要 HTML 时,我们可以将 HTML 方法应用于任何其他对象。任何需要在网页中表示的对象都可以提供一个__html__方法,并根据该方法自动转换为 HTML。

例如,您可以将__html__添加到您的User类中,任何时候您想要将您的用户放入网页,您只需要提供User实例本身。

服务 HTTP

通过 HTTP 进行交互是分布式应用程序甚至是完全分离的软件之间最频繁的通信方式之一,它也是所有现有 Web 应用程序和基于 Web 的工具的基础。

虽然 Python 有几十个优秀的 web 框架可以满足大多数不同的需求,但标准库本身具备实现基本 web 应用程序所需的所有基础。

怎么做。。。

Python 有一个名为 WSGI 的方便协议来实现基于 HTTP 的应用程序。而对于更高级的需求,可能需要一个 web 框架;对于非常简单的需求,Python 本身内置的wsgiref实现可以满足我们的需求:

import re
import inspect
from wsgiref.headers import Headers
from wsgiref.simple_server import make_server
from wsgiref.util import request_uri
from urllib.parse import parse_qs

class WSGIApplication:
    def __init__(self):
        self.routes = []

    def route(self, path):
        def _route_decorator(f):
            self.routes.append((re.compile(path), f))
            return f
        return _route_decorator

    def serve(self):
        httpd = make_server('', 8000, self)
        print("Serving on port 8000...")
        httpd.serve_forever()

    def _not_found(self, environ, resp):
        resp.status = '404 Not Found'
        return b"""<h1>Not Found</h1>"""

    def __call__(self, environ, start_response):
        request = Request(environ)

        routed_action = self._not_found
        for regex, action in self.routes:
            match = regex.fullmatch(request.path)
            if match:
                routed_action = action
                request.urlargs = match.groupdict()
                break

        resp = Response()

        if inspect.isclass(routed_action):
            routed_action = routed_action()
        body = routed_action(request, resp)

        resp.send(start_response)
        return [body]

class Response:
    def __init__(self):
        self.status = '200 OK'
        self.headers = Headers([
            ('Content-Type', 'text/html; charset=utf-8')
        ])

    def send(self, start_response):
        start_response(self.status, self.headers.items())

class Request:
    def __init__(self, environ):
        self.environ = environ
        self.urlargs = {}

    @property
    def path(self):
        return self.environ['PATH_INFO']

    @property
    def query(self):
        return parse_qs(self.environ['QUERY_STRING'])

然后我们可以创建一个WSGIApplication并向其注册任意数量的路由:

app = WSGIApplication()

@app.route('/')
def index(request, resp):
    return b'Hello World, <a href="/link">Click here</a>'

@app.route('/link')
def link(request, resp):
    return (b'You clicked the link! '
            b'Try <a href="/args?a=1&b=2">Some arguments</a>')

@app.route('/args')
def args(request, resp):
    return (b'You provided %b<br/>'
            b'Try <a href="/name/HelloWorld">URL Arguments</a>' % 
            repr(request.query).encode('utf-8'))

@app.route('/name/(?P<first_name>\\w+)')
def name(request, resp):
    return (b'Your name: %b' % request.urlargs['first_name'].encode('utf-8'))

一旦准备就绪,我们只需为应用程序提供服务:

app.serve()

如果一切正常,通过将浏览器指向http://localhost:8000,您应该会看到一个 Hello World 文本和一个链接,指向提供查询参数、URL 参数和各种 URL 服务的其他页面。

它是如何工作的。。。

WSGIApplication创建一个 WSGI 服务器,负责为 web 应用程序本身提供服务(self

def serve(self):
    httpd = make_server('', 8000, self)
    print("Serving on port 8000...")
    httpd.serve_forever()

对于每个请求,服务器都会调用WSGIApplication.__call__来检索该请求的响应。

WSGIApplication.__call__扫描所有注册的路由(每个路由可以注册到app.route(path),其中path为正则表达式)。当正则表达式与当前 URL 路径匹配时,将调用注册函数以生成该路由的响应:

def __call__(self, environ, start_response):
    request = Request(environ)

    routed_action = self._not_found
    for regex, action in self.routes:
        match = regex.fullmatch(request.path)
        if match:
            routed_action = action
            request.urlargs = match.groupdict()
            break

找到与路径匹配的函数后,将调用该函数以获取响应正文,然后将生成的正文返回给服务器:

resp = Response()
body = routed_action(request, resp)

resp.send(start_response)
return [body]

在返回主体之前,调用Response.send通过start_response可调用函数发送响应 HTTP 头和状态。

相反,ResponseRequest对象用于保存当前请求的环境(以及从 URL 解析的任何附加参数)、标题和响应状态。这样,为处理请求而调用的操作可以接收请求并在发送请求之前检查请求或从响应中添加/删除头。

还有更多。。。

虽然可以使用所提供的WSGIApplication实现为基于 HTTP 的基本应用程序提供服务,但对于功能齐全的应用程序来说,有许多功能缺失或不完整。

当涉及更复杂的 web 应用程序时,通常需要缓存、会话、身份验证、授权、管理数据库连接、事务和管理等部分,大多数 Python web 框架都可以轻松地为您提供这些部分。

实现一个完整的 web 框架超出了本书的范围,当 Python 环境中有许多优秀的 web 框架可用时,您可能应该避免重新发明轮子。

Python 拥有广泛的 web 框架,涵盖了从用于快速开发的全栈框架到 Django;面向 API 的微框架,如 Flask;到灵活的解决方案,如金字塔和涡轮齿轮,其中所需的部件可以根据需要启用、禁用或更换,范围从全堆栈解决方案到微框架。

提供静态文件

有时在基于 JavaScript 的应用程序或静态网站上工作时,需要能够直接从磁盘提供目录内容。

Python 标准库有一个现成的 HTTP 服务器,可以处理请求,将它们映射到目录中的文件,因此我们可以快速滚动自己的 HTTP 服务器来编写网站,而无需安装任何其他工具。

怎么做。。。

http.server模块提供了实现负责服务目录内容的 HTTP 服务器所需的大部分内容:

import os.path
import socketserver
from http.server import SimpleHTTPRequestHandler, HTTPServer

def serve_directory(path, port=8000):
    class ConfiguredHandler(HTTPDirectoryRequestHandler):
        SERVED_DIRECTORY = path
    httpd = ThreadingHTTPServer(("", port), ConfiguredHandler)
    print("serving on port", port)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        httpd.server_close()

class ThreadingHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
    pass

class HTTPDirectoryRequestHandler(SimpleHTTPRequestHandler):
    SERVED_DIRECTORY = '.'

    def translate_path(self, path):
        path = super().translate_path(path)
        relpath = os.path.relpath(path)
        return os.path.join(self.SERVED_DIRECTORY, relpath)

然后可以针对任何路径启动serve_directory,在http://localhost:8000上提供该路径的内容:

serve_directory('/tmp')

将浏览器指向http://localhost:8000应列出/tmp目录的内容,并允许您导航该目录并查看任何文件的内容。

它是如何工作的。。。

ThreadingHTTPServerHTTPServerThreadingMixin合并,允许您一次处理多个请求。

在为静态网站提供服务时,这一点尤其重要,因为浏览器经常会使连接打开的时间超过需要的时间,并且在一次为单个请求提供服务时,在浏览器关闭上一个连接之前,您可能无法获取 CSS 或 JavaScript 文件。

对于每个请求,HTTPServer将其转发给指定的处理程序进行处理。SimpleHTTPRequestHandler能够为请求提供服务,将它们映射到磁盘上的本地文件,但在大多数 Python 版本中,它只能从当前目录提供服务。

为了能够为来自任何目录的请求提供服务,我们提供了一个定制的translate_path方法,它替换了与SERVED_DIRECTORY类变量相关的标准实现产生的路径。

serve_directory然后将所有内容放在一起,并将HTTPServer与定制的请求处理程序连接,以创建一个能够处理所提供路径的请求的服务器。

还有更多。。。

在最近的 Python 版本中,http.server模块发生了很多变化。最新版本 Python 3.7 已经提供了开箱即用的ThreadingHTTPServer类,现在可以配置SimpleHTTPRequestHandler服务的特定目录,从而无需定制translate_path方法来服务特定目录。

web 应用程序中的错误

通常,当 Python WSGI web 应用程序崩溃时,终端中会出现回溯,浏览器中会出现空路径。

这并不能使调试正在发生的事情变得非常容易,除非你明确地检查你的终端,否则很容易忽略你的页面没有显示,因为它实际上崩溃了。

幸运的是,Python 标准库为 web 应用程序提供了一些基本的调试工具,可以在浏览器中报告崩溃,这样您就可以看到并修复它们,而无需跳出浏览器。

怎么做。。。

cgitb模块提供了将异常及其回溯格式化为 HTML 的工具,因此我们可以利用它来实现 WSGI 中间件,该中间件可以包装任何 web 应用程序,以便在浏览器中提供更好的错误报告:

import cgitb
import sys

class ErrorMiddleware:
    """Wrap a WSGI application to display errors in the browser"""
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        app_iter = None
        try:
            app_iter = self.app(environ, start_response)
            for item in app_iter:
                yield item
        except:
            try:
                start_response('500 INTERNAL SERVER ERROR', [
                    ('Content-Type', 'text/html; charset=utf-8'),
                    ('X-XSS-Protection', '0'),
                ])
            except Exception:
                # There has been output but an error occurred later on. 
                # In that situation we can do nothing fancy anymore, 
                # better log something into the error log and fallback.
                environ['wsgi.errors'].write(
                    'Debugging middleware caught exception in streamed '
                    'response after response headers were already sent.\n'
                )
            else:
                yield cgitb.html(sys.exc_info()).encode('utf-8')
        finally:
            if hasattr(app_iter, 'close'):
                app_iter.close()

ErrorMiddleware可用于包装任何 WSGI 应用程序,以便在出现错误时将错误显示在 web 浏览器中。

例如,我们可以从上一个配方中提取我们的WSGIApplication,添加一条将导致崩溃的路由,并为包装好的应用程序提供服务,以查看如何将错误报告到 web 浏览器中:

from web_06 import WSGIApplication
from wsgiref.simple_server import make_server

app = WSGIApplication()

@app.route('/crash')
def crash(req, resp):
    raise RuntimeError('This is a crash!')

app = ErrorMiddleware(app)

httpd = make_server('', 8000, app)
print("Serving on port 8000...")
httpd.serve_forever()

一旦您将浏览器指向http://localhost:8000/crash,您应该会看到一个格式良好的触发异常回溯。

它是如何工作的。。。

ErrorMiddleware接收原始申请并在请求处理中替换。

所有 HTTP 请求都将由ErrorMiddleware接收,然后由ErrorMiddleware将它们代理给应用程序,并返回应用程序提供的结果响应。

如果在使用应用程序响应时出现异常,它将停止标准流,而不是进一步使用应用程序的响应,它将格式化异常并将其作为响应发送回浏览器。

之所以这样做,是因为ErrorMiddleware.__call__实际上调用了包装好的应用程序并迭代任何提供的结果:

def __call__(self, environ, start_response):
    app_iter = None
    try:
        app_iter = self.app(environ, start_response)
        for item in app_iter:
            yield item
    ...

这种方法既适用于返回正常响应的应用程序,也适用于返回生成器作为响应的应用程序。

如果调用应用程序或使用响应时出现错误,则会捕获错误,并尝试向浏览器通知新的start_response服务器错误:

except:
    try:
        start_response('500 INTERNAL SERVER ERROR', [
            ('Content-Type', 'text/html; charset=utf-8'),
            ('X-XSS-Protection', '0'),
        ])

如果start_response失败,则意味着包装的应用程序已经调用了start_response,因此无法再更改响应状态代码或标头。

在这种情况下,由于我们无法再提供格式良好的响应,我们只能退回到在终端上提供错误:

except Exception:
    # There has been output but an error occurred later on. 
    # In that situation we can do nothing fancy anymore, 
    # better log something into the error log and fallback.
    environ['wsgi.errors'].write(
        'Debugging middleware caught exception in streamed '
        'response after response headers were already sent.\n'
    )

如果start_response成功,我们将停止返回应用程序响应的内容,而是返回错误和回溯,格式由cgitb正确设置:

else:
    yield cgitb.html(sys.exc_info()).encode('utf-8')

在这两种情况下,如果它提供了close方法,我们将关闭应用程序响应。这样,如果是需要关闭的文件或任何源,我们可以避免泄漏:

finally:
    if hasattr(app_iter, 'close'):
        app_iter.close()

还有更多。。。

在标准库之外,可以使用 Python 提供更完整的 web 应用程序错误报告解决方案。如果您有进一步的需求或希望通过电子邮件或云错误报告解决方案(如 Sentry)获得错误通知,您可能需要提供错误报告 WSGI 库。

Flask 中的Werkzeug调试器、Pylons 项目中的WebError库和 TurboGears 项目中的Backlash库可能是最常见的解决方案。

您可能还想检查您的 web 框架是否提供了一些高级错误报告配置,因为其中许多框架都依赖于这些库或其他工具提供了现成的配置。

处理表格和文件

提交表单和上传文件时,通常使用multipart/form-data编码发送。

我们已经看到了如何创建编码在multipart/form-data中的数据,并将其提交给端点,但是我们如何处理这种格式的传入数据呢?

怎么做。。。

标准库中的cgi.FieldStorage类已经提供了解析多部分数据并以易于处理的方式将其发送回您所需的所有机制。

我们将创建一个简单的 web 应用程序(基于WSGIApplication),展示如何使用cgi.FieldStorage解析上传的文件并将其展示给用户:

import cgi

from web_06 import WSGIApplication
import base64

app = WSGIApplication()

@app.route('/')
def index(req, resp):
    return (
        b'<form action="/upload" method="post" enctype="multipart/form-
           data">'
        b'  <input type="file" name="uploadedfile"/>'
        b'  <input type="submit" value="Upload">'
        b'</form>'
    )

@app.route('/upload')
def upload(req, resp):
    form = cgi.FieldStorage(fp=req.environ['wsgi.input'], 
                            environ=req.environ)
    if 'uploadedfile' not in form:
        return b'Nothing uploaded'

    uploadedfile = form['uploadedfile']
    if uploadedfile.type.startswith('image'):
        # User uploaded an image, show it
        return b'<img src="data:%b;base64,%b"/>' % (
            uploadedfile.type.encode('ascii'),
            base64.b64encode(uploadedfile.file.read())
        )
    elif uploadedfile.type.startswith('text'):
        return uploadedfile.file.read()
    else:
        return b'You uploaded %b' % uploadedfile.filename.encode('utf-8')

app.serve()

它是如何工作的。。。

该应用程序公开两个网页。一个位于网站的根目录上(通过index功能),它只显示一个带有上传字段的简单表单。

另一个是upload函数,它接收上传的文件,如果是图像或文本文件,则将其显示出来。在所有其他情况下,它只会显示上载文件的名称。

以多部分格式处理上传所需的全部工作就是从中创建一个cgi.FieldStorage

form = cgi.FieldStorage(fp=req.environ['wsgi.input'], 
                        environ=req.environ)

POST请求的整个主体在environ请求中始终可用wsgi.input键。

这提供了一个类似文件的对象,可以读取该对象以使用发布的数据。如果您需要多次使用FieldStorage,请确保在创建FieldStorage后将其保存在一旁,因为一旦wsgi.input中的数据被使用,它将变得不可访问。

cgi.FieldStorage提供了一个类似字典的界面,所以我们可以通过检查uploadedfile条目是否存在来检查文件是否上传:

if 'uploadedfile' not in form:
    return b'Nothing uploaded'

这是因为在我们的表单中,我们提供了uploadedfile作为字段名称:

b'  <input type="file" name="uploadedfile"/>'

可通过form['uploadedfile']访问该特定字段。

由于它是一个文件,它将返回一个提供typefilenamefile属性的对象,通过这些属性我们可以检查上传文件的 MIME 类型,以查看它是否是图像:

if uploadedfile.type.startswith('image'):

如果它是一个图像,我们可以读取它的内容,将其编码到base64中,这样就可以通过img标签显示:

base64.b64encode(uploadedfile.file.read())

只有上传文件的格式无法识别时,才使用filename属性,这样我们至少可以打印回上传文件的名称:

return b'You uploaded %b' % uploadedfile.filename.encode('utf-8')

RESTAPI

REST with JSON 已经成为基于 web 的应用程序跨应用程序通信技术的事实标准。

这是一个非常有效的协议,而且每个人都能理解它的定义这一事实使得它很快流行起来。

此外,与其他更复杂的通信协议相比,快速 REST 实现可以非常快地推出。

由于 Python 标准库提供了构建基于 WSGI 的应用程序所需的基础,因此不难扩展我们现有的方法来支持基于 REST 的请求分派。

怎么做。。。

我们将使用上一个配方中的WSGIApplication,但不是为根注册函数,而是注册一个能够基于请求方法分派的特定类。

  1. 我们要实现的所有 REST 类都必须从单个RestController实现继承:
class RestController:
    def __call__(self, req, resp):
        method = req.environ['REQUEST_METHOD']
        action = getattr(self, method, self._not_found)
        return action(req, resp)

    def _not_found(self, environ, resp):
        resp.status = '404 Not Found'
        return b'{}'  # Provide an empty JSON document
  1. 然后我们可以将RestController子类化,实现所有具体的GETPOSTDELETEPUT方法,并注册特定路由上的资源:
import json
from web_06 import WSGIApplication

app = WSGIApplication()

@app.route('/resources/?(?P<id>\\w*)')
class ResourcesRestController(RestController):
    RESOURCES = {}

    def GET(self, req, resp):
        resource_id = req.urlargs['id']
        if not resource_id:
            # Whole catalog requested
            return json.dumps(self.RESOURCES).encode('utf-8')

        if resource_id not in self.RESOURCES:
            return self._not_found(req, resp)

        return json.dumps(self.RESOURCES[resource_id]).encode('utf-8')

    def POST(self, req, resp):
        content_length = int(req.environ['CONTENT_LENGTH'])
        data = req.environ['wsgi.input'].read(content_length).decode('utf-8')

        resource = json.loads(data)
        resource['id'] = str(len(self.RESOURCES)+1)
        self.RESOURCES[resource['id']] = resource
        return json.dumps(resource).encode('utf-8')

    def DELETE(self, req, resp):
        resource_id = req.urlargs['id']
        if not resource_id:
            return self._not_found(req, resp)
        self.RESOURCES.pop(resource_id, None)

        req.status = '204 No Content'
        return b''

这已经提供了一些基本功能,允许我们从内存目录中添加、删除和列出资源。

  1. 为了测试这一点,我们可以在后台线程中启动一台服务器,并使用我们之前配方中的http_request函数:
import threading
threading.Thread(target=app.serve, daemon=True).start()

from web_03 import http_request
  1. 然后,我们可以创建一个新资源:
>>> _, resp = http_request('http://localhost:8000/resources', method='POST', 
                           data=json.dumps({'name': 'Mario',
                                            'surname': 'Mario'}).encode('utf-8'))
>>> print('NEW RESOURCE: ', resp)
NEW RESOURCE:  b'{"surname": "Mario", "id": "1", "name": "Mario"}'
  1. 我们在这里列出了所有这些:
>>> _, resp = http_request('http://localhost:8000/resources')
>>> print('ALL RESOURCES: ', resp)
ALL RESOURCES:  b'{"1": {"surname": "Mario", "id": "1", "name": "Mario"}}'
  1. 添加第二个:
>>> http_request('http://localhost:8000/resources', method='POST', 
                 data=json.dumps({'name': 'Luigi',
                                  'surname': 'Mario'}).encode('utf-8'))
  1. 接下来,我们看到现在两个资源都列出了:
>>> _, resp = http_request('http://localhost:8000/resources')
>>> print('ALL RESOURCES: ', resp)
ALL RESOURCES:  b'{"1": {"surname": "Mario", "id": "1", "name": "Mario"}, 
                   "2": {"surname": "Mario", "id": "2", "name": "Luigi"}}'
  1. 然后,我们可以从目录中要求特定的资源:
>>> _, resp = http_request('http://localhost:8000/resources/1')
>>> print('RESOURCES #1: ', resp)
RESOURCES #1:  b'{"surname": "Mario", "id": "1", "name": "Mario"}'
  1. 我们还可以删除特定资源:
>>> http_request('http://localhost:8000/resources/2', method='DELETE')
  1. 然后查看它是否已被实际删除:
>>> _, resp = http_request('http://localhost:8000/resources')
>>> print('ALL RESOURCES', resp)
ALL RESOURCES b'{"1": {"surname": "Mario", "id": "1", "name": "Mario"}}'

这应该允许我们为大多数简单的情况提供一个 REST 接口,依赖于 Python 标准库本身已经提供的内容。

它是如何工作的。。。

大部分魔法都是由RestController.__call__完成的:

class RestController:
    def __call__(self, req, resp):
        method = req.environ['REQUEST_METHOD']
        action = getattr(self, method, self._not_found)
        return action(req, resp)

无论何时调用RestController的子类,它都会查看 HTTP 请求方法并查找名为类似 HTTP 方法的实例方法。

如果有,则调用该方法并返回该方法本身提供的响应。如果没有,则调用self._not_found,只响应 404 错误。

这依赖于WSGIApplication.__call__对类而不是函数的支持。

WSGIApplication.__call__通过app.route发现一个路由关联的对象,该对象是一个类时,它总是会创建一个实例,然后调用该实例:

if inspect.isclass(routed_action):
    routed_action = routed_action()
body = routed_action(request, resp)

如果routed_actionRestController子类,那么routed_action = routed_action()将用该类的实例替换该类,然后routed_action(request, resp)将调用RestController.__call__方法来实际服务该请求。

然后,RestController.__call__方法可以将请求转发到基于 HTTP 方法的正确实例方法。

注意,由于 REST 资源是通过在 URL 中提供资源标识符来标识的,分配给RestController的路由必须有一个id参数和一个可选的/

@app.route('/resources/?(?P<id>\\w*)')

否则,您将无法区分对整个GET资源目录/resources的请求和对特定GET资源/resources/3的请求。

缺少id参数正是我们的GET方法决定何时返回整个目录的内容的方式:

def GET(self, req, resp):
    resource_id = req.urlargs['id']
    if not resource_id:
        # Whole catalog requested
        return json.dumps(self.RESOURCES).encode('utf-8')

对于接收请求正文中的数据的方法,例如POSTPUTPATCH,您必须从req.environ['wsgi.input']读取请求正文。

在这种情况下,提供要读取的确切字节数很重要,因为连接可能永远不会关闭,否则读取可能永远阻塞。

Content-Length标题可用于了解输入的长度:

def POST(self, req, resp):
    content_length = int(req.environ['CONTENT_LENGTH'])
    data = req.environ['wsgi.input'].read(content_length).decode('utf-8')

处理饼干

网络应用程序中经常使用 cookie 在浏览器中存储数据。最常见的用例是用户标识。

我们将实现一个基于 cookies 的非常简单且不安全的识别系统,以展示如何使用 cookies。

怎么做。。。

http.cookies.SimpleCookie类提供解析和生成 cookie 所需的所有工具。

  1. 我们可以依靠它来创建一个 web 应用程序端点,该端点将设置 cookie:
from web_06 import WSGIApplication

app = WSGIApplication()

import time
from http.cookies import SimpleCookie

@app.route('/identity')
def identity(req, resp):
    identity = int(time.time())

    cookie = SimpleCookie()
    cookie['identity'] = 'USER: {}'.format(identity)

    for set_cookie in cookie.values():
        resp.headers.add_header('Set-Cookie', set_cookie.OutputString())
    return b'Go back to <a href="/">index</a> to check your identity'
  1. 我们可以使用它创建一个将解析 cookie 并告诉我们当前用户是谁的 cookie:
@app.route('/')
def index(req, resp):
    if 'HTTP_COOKIE' in req.environ:
        cookies = SimpleCookie(req.environ['HTTP_COOKIE'])
        if 'identity' in cookies:
            return b'Welcome back, %b' % cookies['identity'].value.encode('utf-8')
    return b'Visit <a href="/identity">/identity</a> to get an identity'
  1. 启动应用程序后,您可以将浏览器指向http://localhost:8000,您应该会看到 web 应用程序抱怨您缺少身份:
app.serve()

一旦你点击了建议的链接,你应该会得到一个,然后返回到索引页面,它应该会通过 cookie 识别你。

它是如何工作的。。。

SimpleCookie类将 cookie 表示为一组或多个值。

每个值都可以像字典一样设置到 cookie 中:

cookie = SimpleCookie()
cookie['identity'] = 'USER: {}'.format(identity)

如果 cookiemorsel必须接受更多选项,也可以使用字典语法进行设置:

cookie['identity']['Path'] = '/'

每个 cookie 可以包含多个值,每个值都应该设置一个Set-CookieHTTP 头。

在 cookie 上迭代将检索构成 cookie 的所有键/值对,然后对它们调用OutputString()将返回由Set-Cookie头按预期编码的 cookie 值,以及所有附加属性:

for set_cookie in cookie.values():
    resp.headers.add_header('Set-Cookie', set_cookie.OutputString())

实际上,一旦设置了 cookie,调用OutputString()将返回需要发送到浏览器的字符串:

>>> cookie = SimpleCookie()
>>> cookie['somevalue'] = 42
>>> cookie['somevalue']['Path'] = '/'
>>> cookie['somevalue'].OutputString()
'somevalue=42; Path=/'

读回 cookie 非常简单,只要从environ['HTTP_COOKIE']值(如果可用)构建 cookie 即可:

cookies = SimpleCookie(req.environ['HTTP_COOKIE'])

一旦对 cookie 进行了解析,就可以使用字典语法访问其中存储的值:

cookies['identity']

还有更多。。。

使用 cookie 时,您应该注意的一个特殊情况是 cookie 的生命周期。

cookie 可以有一个Expires属性,该属性将说明它们应该在哪一天死亡(浏览器将丢弃它们),实际上,这就是删除 cookie 的方式。如果再次设置 cookie 的日期为过去的Expires日期,则会将其删除。

但是 Cookie 也可以有一个Max-Age属性,该属性说明它们应该停留多长时间,或者可以创建为会话 Cookie,当浏览器窗口关闭时,会话 Cookie 将消失。

因此,如果您面临 cookie 随机消失或无法正确加载的问题,请始终检查这些属性,因为 cookie 可能刚刚被浏览器删除。