在本章中,我们将介绍以下配方:
- 基本的日志记录允许您跟踪软件正在做什么,并且它通常与其输出无关
- 日志记录到文件当日志记录频繁时,有必要将日志存储在磁盘上
- 登录到 Syslog 如果您的系统有一个 Syslog 守护进程,您可能希望登录到 Syslog,而不是使用独立文件
- 解析参数使用命令行工具编写时,几乎任何工具都需要解析选项
- 交互式 Shell 有时选项不够,您需要一种形式的 Read-Eval 打印循环来驱动您的工具
- 调整终端文本的大小要正确对齐显示的输出,我们需要知道终端窗口的大小
- 运行系统命令如何在软件中集成其他第三方命令
- 进度条如何在文本工具中显示进度条
- 消息框如何在文本工具中显示确定/取消消息框
- 输入框如何在文本工具中请求输入
编写新工具时,首先需要的是使其能够与周围环境交互,以显示结果、跟踪错误和接收输入。
用户习惯于命令行工具与他们和系统交互的某些标准方式,如果从头开始,遵循此标准可能会非常耗时和困难。
这就是为什么 Python 中的标准库提供了一些工具来满足实现能够通过 shell 和文本进行交互的软件的最常见需求。
在本章中,我们将了解如何实现一些形式的日志记录,以便我们的程序能够保存日志文件;我们将看到如何实现基于选项和交互式软件,然后我们将看到如何实现基于文本的更高级图形输出。
控制台软件的首要要求之一是记录它所做的事情,即发生的事情,以及任何警告或错误。特别是当我们谈论在后台运行的长期软件或守护进程时。
不幸的是,如果您曾经尝试使用 Pythonlogging
模块,您可能已经注意到除了错误之外,您无法获得任何输出。
这是因为默认启用级别为WARNING
,因此只跟踪警告和更糟糕的情况。要使日志记录普遍可用,需要做一些小的调整。
对于该配方,步骤如下:
logging
模块允许我们通过basicConfig
方式轻松设置日志配置:
>>> import logging, sys
>>>
>>> logging.basicConfig(level=logging.INFO, stream=sys.stderr,
... format='%(asctime)s %(name)s %(levelname)s: %(message)s')
>>> log = logging.getLogger(__name__)
- 既然我们的
logger
配置正确,我们可以尝试使用它:
>>> def dosum(a, b, count=1):
... log.info('Starting sum')
... if a == b == 0:
... log.warning('Will be just 0 for any count')
... res = (a + b) * count
... log.info('(%s + %s) * %s = %s' % (a, b, count, res))
... print(res)
...
>>> dosum(5, 3)
2018-02-11 22:07:59,870 __main__ INFO: Starting sum
2018-02-11 22:07:59,870 __main__ INFO: (5 + 3) * 1 = 8
8
>>> dosum(5, 3, count=2)
2018-02-11 22:07:59,870 __main__ INFO: Starting sum
2018-02-11 22:07:59,870 __main__ INFO: (5 + 3) * 2 = 16
16
>>> dosum(0, 1, count=5)
2018-02-11 22:07:59,870 __main__ INFO: Starting sum
2018-02-11 22:07:59,870 __main__ INFO: (0 + 1) * 5 = 5
5
>>> dosum(0, 0)
2018-02-11 22:08:00,621 __main__ INFO: Starting sum
2018-02-11 22:08:00,621 __main__ WARNING: Will be just 0 for any count
2018-02-11 22:08:00,621 __main__ INFO: (0 + 0) * 1 = 0
0
logging.basicConfig
将root
记录器(如果未找到所用记录器的特定配置,Python 将使用主记录器)配置为写入INFO
级别或更高级别的内容。这将允许我们显示除调试消息之外的所有内容。format
参数指定日志消息的格式;在本例中,我们添加了日期和时间、记录器的名称、记录的级别以及消息本身。最后,stream
参数告诉记录器将其输出写入标准错误。
一旦我们配置了root
记录器,我们选择的任何没有特定配置的日志都将使用root
记录器。
因此,下一行,logging.getLogger(__name__)
将获得一个名为类似于它正在执行的 Python 模块的记录器。如果您将代码保存到一个文件中,记录器将被命名为dosum
(假设您的文件名为dosum.py
;如果没有,则记录器将命名为__main__
,如前一示例所示。
Python 记录器是在第一次使用logging.getLogger
检索它们时创建的,随后对getLogger
的任何调用都只会返回已经存在的一个。然而,对于一个非常简单的程序,名称并不重要,在大型软件中,通常最好抓取多个记录器,以便您可以区分消息来自软件的哪个子系统。
您可能想知道为什么我们将logging
配置为将其输出发送到stderr
,而不是标准输出。这允许我们将软件的输出(通过打印语句写入stdout
中)与日志信息分开。这通常是一个很好的实践,因为工具的用户可能需要调用工具的输出,而不需要记录消息所产生的所有噪音,这样做允许我们使用以下内容调用脚本:
$ python dosum.py 2>/dev/null
8
16
5
0
我们只会返回结果,不会产生任何噪音,因为我们将stderr
重定向到了/dev/null
,这在 Unix 系统上会导致丢弃写入stderr
的所有内容。
对于长时间运行的程序,登录到屏幕不是一个非常可行的选择。在代码运行数小时后,最早记录的消息将丢失,即使它们仍然可用,读取所有日志或搜索它们也不是很容易。
将日志保存到文件允许无限长(只要我们的磁盘允许),并允许使用工具(如grep
)搜索日志。
默认情况下,Python 日志记录配置为写入屏幕,但在配置日志记录时,很容易提供写入任何文件的方法。
为了测试一个文件的logging
,我们将创建一个短工具,根据当前时间计算出*nth*Fibonacci 数。如果是下午 3:01,我们只想计算 1 个数字,而如果是下午 3:59,我们想计算 59 个数字。
软件将提供计算出的数字作为输出,但我们还希望记录计算出的数字以及运行时间:
import logging, sys
if __name__ == '__main__':
if len(sys.argv) < 2:
print('Please provide logging file name as argument')
sys.exit(1)
logging_file = sys.argv[1]
logging.basicConfig(level=logging.INFO, filename=logging_file,
format='%(asctime)s %(name)s %(levelname)s: %(message)s')
log = logging.getLogger(__name__)
def fibo(num):
log.info('Computing up to %sth fibonacci number', num)
a, b = 0, 1
for n in range(num):
a, b = b, a+b
print(b, '', end='')
print(b)
if __name__ == '__main__':
import datetime
fibo(datetime.datetime.now().second)
代码分为三部分:初始化日志、工具的fibo
功能和main
功能。我们以这种方式显式地划分代码,因为fibo
函数可能在其他模块中使用,在这种情况下,我们不希望logging
被重新配置;我们只想使用程序将提供的日志配置。因此,logging.basicConfig
调用被包装在__name__ == '__main__'
中,因此logging
仅在模块作为工具直接调用时配置,而不是在其他模块导入时配置。
当调用多个logging.basicConfig
实例时,只考虑第一个实例。如果我们在其他模块导入时没有将日志配置包装在if
中,它可能最终会驱动整个软件日志配置,这取决于模块导入的顺序,这显然是我们不想要的。
与我们之前的配方不同,basicConfig
配置了filename
参数,而不是stream
参数。这意味着将创建logging.FileHandler
来处理日志消息,并将消息附加到该文件中。
代码的中心部分是fibo
函数本身,最后一部分是检查代码是作为 Python 脚本调用还是作为模块导入。当作为模块导入时,我们只想提供fibo
函数并避免运行它,但当作为脚本执行时,我们想计算斐波那契数。
你可能想知道为什么我使用了两个if __name__ == '__main__'
部分;如果将两者合并为一,脚本将继续工作。但通常最好在尝试使用之前确保logging
已配置,否则结果将是我们最终使用logging.lastResort
处理程序,该处理程序将只写入stderr
,直到配置日志记录。
类 Unix 系统通常提供一种通过syslog
协议收集日志消息的方法,该协议允许我们将存储日志的系统与生成日志的系统分开。
特别是在应用程序分布在多个服务器上的情况下,这非常方便;您当然不想登录到 20 个不同的服务器来收集 Python 应用程序的所有日志,因为它在多个节点上运行。特别是对于 web 应用程序,这在云提供商中非常常见,因此能够在一个地方收集所有 Python 日志非常方便。
这正是使用syslog
允许我们做的;我们将看到如何将日志消息发送到系统上运行的守护进程,但也可以将它们发送到任何系统。
虽然这个配方不需要syslog
守护进程来工作,但您需要一个守护进程来检查它是否正常工作,或者消息是否可读。在 Linux 或 macOS 系统中,这通常是开箱即用的配置,但在 Windows 系统中,需要安装 Syslog 服务器或使用云解决方案。很多都存在,在谷歌上快速搜索应该可以为你提供一些便宜甚至免费的选择。
当使用高度定制的日志记录解决方案时,不可能再依赖logging.basicConfig
,因此我们必须手动设置日志记录环境:
import logging
import logging.config
# OSX logs through /var/run/syslog this should be /dev/log
# on Linux system or a tuple ('ADDRESS', PORT) to log to a remote server
SYSLOG_ADDRESS = '/var/run/syslog'
logging.config.dictConfig({
'version': 1,
'formatters': {
'default': {
'format': '%(asctime)s %(name)s: %(levelname)s %(message)s'
},
},
'handlers': {
'syslog': {
'class': 'logging.handlers.SysLogHandler',
'formatter': 'default',
'address': SYSLOG_ADDRESS
}
},
'root': {
'handlers': ['syslog'],
'level': 'INFO'
}
})
log = logging.getLogger()
log.info('Hello Syslog!')
如果工作正常,您的消息应该由 Syslog 记录,并在 macOS 上运行syslog
命令或在 Linux 上以tail
作为/var/log/syslog
时可见:
$ syslog | tail -n 2
Feb 18 17:52:43 Pulsar Google Chrome[294] <Error>: ... SOME CHROME ERROR MESSAGE ...
Feb 18 17:53:48 Pulsar 2018-02-18 17[4294967295] <Info>: 53:48,610 INFO root Hello Syslog!
The syslog
file path might change from distribution to distribution; if /var/log/syslog
doesn't work, try /var/log/messages
or refer to your distribution documentation.
由于我们依赖于dictConfig
,您注意到我们的配置比以前的配方要复杂一些。这是因为我们自己配置了日志基础结构的一部分。
无论何时配置日志记录,都可以使用记录器编写消息。默认情况下,系统只有一个记录器:root
记录器(如果您调用logging.getLogger
而不提供任何特定名称,则会得到该记录器)。
记录器本身不处理消息,因为写入或打印日志消息是处理程序负责的事情。因此,如果要读取发送的日志消息,则需要配置处理程序。在我们的例子中,我们使用SysLogHandler
,它写入 Syslog。
然后,Handler 负责编写消息,但并不真正参与如何构建/格式化消息。您注意到,除了您自己的消息之外,当您记录某些内容时,您还可以获得日志级别、记录器名称、时间戳以及日志系统为您添加的一些详细信息。将这些细节添加到消息中通常是格式化程序的工作。格式化程序获取记录器提供的所有信息,并将它们打包到应由处理程序编写的消息中。
最后但并非最不重要的一点是,您的日志记录配置可能非常复杂。您可以设置一些要转到本地文件的消息和一些要转到 Syslog 的消息,以及应在屏幕上打印的更多消息。这将涉及多个处理程序,它们应该知道应该威胁哪些消息,应该忽略哪些消息。允许这些知识是过滤器的工作。一旦将筛选器附加到处理程序,就可以控制该处理程序应保存哪些消息以及应忽略哪些消息。
Python 日志系统现在看起来可能非常直观,这是因为它是一个非常强大的解决方案,可以通过多种方式进行配置,但是一旦您了解了可用的构建块,就可以以非常灵活的方式组合它们。
在编写命令行工具时,通常会让它根据提供给可执行文件的选项更改行为。这些选项通常与可执行文件名一起出现在sys.argv
中,但解析它们并不像看起来那么容易,尤其是在必须支持多个参数的情况下。此外,当一个选项的格式不正确时,通常最好提供一条用法消息,告知用户使用该工具的正确方法。
为此配方执行以下步骤:
argparse.ArgumentParser
对象是负责解析命令行选项的主要对象:
import argparse
import operator
import logging
import functools
parser = argparse.ArgumentParser(
description='Applies an operation to one or more numbers'
)
parser.add_argument("number",
help="One or more numbers to perform an operation on.",
nargs='+', type=int)
parser.add_argument('-o', '--operation',
help="The operation to perform on numbers.",
choices=['add', 'sub', 'mul', 'div'], default='add')
parser.add_argument("-v", "--verbose", action="store_true",
help="increase output verbosity")
opts = parser.parse_args()
logging.basicConfig(level=logging.INFO if opts.verbose else logging.WARNING)
log = logging.getLogger()
operation = getattr(operator, opts.operation)
log.info('Applying %s to %s', opts.operation, opts.number)
print(functools.reduce(operation, opts.number))
- 一旦在没有任何参数的情况下调用了我们的命令,它将提供一个简短的用法文本:
$ python /tmp/doop.py
usage: doop.py [-h] [-o {add,sub,mul,div}] [-v] number [number ...]
doop.py: error: the following arguments are required: number
- 如果我们提供
-h
选项,argparse
将为我们生成完整的使用指南:
$ python /tmp/doop.py -h
usage: doop.py [-h] [-o {add,sub,mul,div}] [-v] number [number ...]
Applies an operation to one or more numbers
positional arguments:
number One or more numbers to perform an operation on.
optional arguments:
-h, --help show this help message and exit
-o {add,sub,mul,div}, --operation {add,sub,mul,div}
The operation to perform on numbers.
-v, --verbose increase output verbosity
- 使用该命令将导致预期结果:
$ python /tmp/dosum.py 1 2 3 4 -o mul
24
我们使用ArgumentParser.add_argument
方法填充可用选项列表。对于每个参数,还可以提供一个help
选项,该选项将为该参数声明help
字符串。
位置参数仅提供参数的名称:
parser.add_argument("number",
help="One or more numbers to perform an operation on.",
nargs='+', type=int)
nargs
选项告诉ArgumentParser
我们希望指定该参数的次数,+
值表示至少一次或多次。然后type=int
告诉我们参数应该转换成整数。
一旦我们有了要应用操作的数字,我们需要了解操作本身:
parser.add_argument('-o', '--operation',
help="The operation to perform on numbers.",
choices=['add', 'sub', 'mul', 'div'], default='add')
在本例中,我们指定了一个选项(以破折号开始,-
),该选项可以作为-o
或--operation
提供。我们声明,唯一可能的值是'add'
、'sub'
、'mul'
或'div'
(提供不同的值会导致argparse
投诉),如果用户没有指定,默认值是add
。
作为最佳实践,我们的命令只打印结果;能够询问一些关于它将要做什么的日志记录是很方便的。因此,我们提供了verbose
选项,它驱动我们为命令启用的日志记录级别:
parser.add_argument("-v", "--verbose", action="store_true",
help="increase output verbosity")
如果提供了该选项,我们将只存储verbose
模式已启用(action="store_true"
将其设置为True
存储在opts.verbose
中),并且我们将相应地配置logging
模块,以便我们的log.info
仅在verbose
启用时可见。
最后,我们可以实际解析命令行选项,并将结果返回到opts
对象中:
opts = parser.parse_args()
一旦我们有了可用的选项,我们就可以配置日志记录,这样我们就可以读取verbose
选项并相应地进行配置:
logging.basicConfig(level=logging.INFO if opts.verbose else logging.WARNING)
一旦解析了选项并配置了logging
,其余的就只是在提供的数字集上实际执行预期的操作并打印结果:
operation = getattr(operator, opts.operation)
log.info('Applying %s to %s', opts.operation, opts.number)
print(functools.reduce(operation, opts.number))
如果您将命令行选项与第 1 章、容器和数据结构中的字典和回退配方混合使用,您可以扩展工具的行为,不仅从命令行读取选项,还可以从环境变量读取选项,当您不能完全控制命令的调用方式,但可以设置环境变量时,这通常非常方便。
有时,编写命令行工具是不够的,您需要能够提供某种交互。假设您想编写一个邮件客户端。在这种情况下,打电话mymail list
查看您的邮件,或者mymail read
从 shell 中读取特定邮件,等等,都不是很方便。此外,如果您想要实现有状态行为,例如mymail reply
实例应该回复您正在查看的当前邮件,这甚至可能是不可能的。
在这些情况下,交互式程序更好,Python 标准库提供了通过cmd
模块编写程序所需的所有工具。
我们可以尝试为我们的mymail
程序编写一个交互式 shell;它不会读真正的电子邮件,但我们会伪造足够的行为来展示一个功能齐全的 shell。
此配方的步骤如下所示:
cmd.Cmd
类允许我们启动交互式 shell 并基于它们实现命令:
EMAILS = [
{'sender': '[email protected]', 'subject': 'First email',
'body': 'This is my first email'},
{'sender': '[email protected]', 'subject': 'Second email',
'body': 'This is my second email'},
]
import cmd
import shlex
class MyMail(cmd.Cmd):
intro = 'Simple interactive email client.'
prompt = 'mymail> '
def __init__(self, *args, **kwargs):
super(MyMail, self).__init__(*args, **kwargs)
self.selected_email = None
def do_list(self, line):
"""list
List emails currently in the Inbox"""
for idx, email in enumerate(EMAILS):
print('[{idx}] From: {e[sender]} -
{e[subject]}'.format(
idx=idx, e=email
))
def do_read(self, emailnum):
"""read [emailnum]
Reads emailnum nth email from those listed in the Inbox"""
try:
idx = int(emailnum.strip())
except:
print('Invalid email index {}'.format(emailnum))
return
try:
email = EMAILS[idx]
except IndexError:
print('Email {} not found'.format(idx))
return
print('From: {e[sender]}\n'
'Subject: {e[subject]}\n'
'\n{e[body]}'.format(e=email))
# Track the last read email as the selected one for reply.
self.selected_email = idx
def do_reply(self, message):
"""reply [message]
Sends back an email to the author of the received email"""
if self.selected_email is None:
print('No email selected for reply.')
return
email = EMAILS[self.selected_email]
print('Replied to {e[sender]} with: {message}'.format(
e=email, message=message
))
def do_send(self, arguments):
"""send [recipient] [subject] [message]
Send a new email with [subject] to [recipient]"""
# Split the arguments with shlex
# so that we allow subject or message with spaces.
args = shlex.split(arguments)
if len(args) < 3:
print('A recipient, a subject and a message are
required.')
return
recipient, subject, message = args[:3]
if len(args) >= 4:
message += ' '.join(args[3:])
print('Sending email {} to {}: "{}"'.format(
subject, recipient, message
))
def complete_send(self, text, line, begidx, endidx):
# Provide autocompletion of recipients for send command.
return [e['sender'] for e in EMAILS if e['sender'].startswith(text)]
def do_EOF(self, line):
return True
if __name__ == '__main__':
MyMail().cmdloop()
- 启动脚本时应提供一个良好的交互式提示:
$ python /tmp/mymail.py
Simple interactive email client.
mymail> help
Documented commands (type help <topic>):
========================================
help list read reply send
Undocumented commands:
======================
EOF
- 如文件所述,我们应该能够阅读电子邮件列表,阅读特定电子邮件,并回复当前打开的电子邮件:
mymail> list
[0] From: author1@domain.com - First email
[1] From: author2@domain.com - Second email
mymail> read 0
From: author1@domain.com
Subject: First email
This is my first email
mymail> reply Thanks for your message!
Replied to author1@domain.com with: Thanks for your message!
- 然后,我们可以使用更高级的发送命令,该命令还可以自动完成新电子邮件的收件人:
mymail> help send
send [recipient] [subject] [message]
Send a new email with [subject] to [recipient]
mymail> send author
author1@domain.com author2@domain.com
mymail> send author2@domain.com "Saw your email" "I saw your message, thanks for sending it!"
Sending email Saw your email to author2@domain.com: "I saw your message, thanks for sending it!"
mymail>
cmd.Cmd
循环打印我们通过prompt
类属性提供的prompt
并等待命令。我们在prompt
之后编写的任何内容都将被拆分,第一部分将根据我们自己的子类提供的方法列表进行查找。
每当提供命令时,cmd.Cmd.cmdloop
调用关联的方法,然后再次启动。
任何以do_*
开头的方法都是命令,do_
之后的部分是命令名。如果在交互提示中使用了help
命令,则实现该命令的方法的任何 docstring 都将在我们的工具文档中报告。
Cmd
类不提供解析命令参数的工具,因此如果您的命令有多个参数,您必须自己拆分它们。在我们的例子中,我们依赖于shlex
,因此用户可以控制如何分割参数。这允许我们解析主题和消息,同时提供一种在其中包含空格的方法。否则,我们将无法知道主题的结束和消息的开始。
send
命令还支持通过complete_send
方法自动完成收件人。如果提供了一个complete_*
方法,当按下选项卡自动完成命令参数时,Cmd
会调用该方法。该方法接收需要完成的文本以及有关整行文本和光标当前位置的一些详细信息。由于没有对参数进行任何解析,因此光标的位置和整行文本有助于为每个参数提供不同的自动完成行为。在我们的例子中,我们只能自动完成收件人,因此不需要区分各种参数。
最后但并非最不重要的一点是,do_EOF
命令允许在按下Ctrl+D时退出命令行。否则,我们将不得不退出交互式 shell。这是Cmd
提供的约定,如果do_EOF
命令返回True
,则表示 shell 可以退出。
我们在第 2 章、文本管理中看到了对齐文本的方法,展示了在固定空间内对齐文本的可能解决方案。可用空间量在一个COLSIZE
常数中定义,该常数被选择为适合大多数具有三列的端子(大多数端子适合 80 列)。
但是,如果用户的终端窗口小于 60 列,会发生什么?我们的路线会被严重破坏。此外,在非常大的窗口上,虽然文本不会被破坏,但与窗口相比,它看起来太小了。
出于这个原因,当显示应该保留正确对齐属性的文本时,通常最好也考虑用户终端窗口的大小。
步骤如下:
shutil.get_terminal_size
功能可以对终端窗口大小提供指导,并在不可用的情况下提供回退。我们将根据第二章文本管理的对齐文本配方调整maketable
功能,以考虑终端大小:
import shutil
import textwrap, itertools
def maketable(cols):
term_size = shutil.get_terminal_size(fallback=(80, 24))
colsize = (term_size.columns // len(cols)) - 3
if colsize < 1:
raise ValueError('Column too small')
return '\n'.join(map(' | '.join, itertools.zip_longest(*[
[s.ljust(colsize) for s in textwrap.wrap(col, colsize)] for col in cols
], fillvalue=' '*colsize)))
- 现在可以在多列中打印任何文本,并查看它是否适合终端窗口的大小:
COLUMNS = 5
TEXT = ['Lorem ipsum dolor sit amet, consectetuer adipiscing elit. '
'Aenean commodo ligula eget dolor. Aenean massa. '
'Cum sociis natoque penatibus et magnis dis parturient montes, '
'nascetur ridiculus mus'] * COLUMNS
print(maketable(TEXT))
如果尝试调整终端窗口的大小并重新运行脚本,您将注意到文本现在总是以不同的方式对齐,以确保它适合可用空间。
我们的maketable
函数不再依赖于列大小的常数,而是通过获取终端宽度(term_size.columns
并将其除以要显示的列数来计算它。
三个字符总是被减去,因为我们想计算|
分隔符所消耗的空间。
通过shutil.get_terminal_size
获取端子的尺寸(term_size
,查看stdout
检查连接端子的尺寸。
如果无法检索大小,或者连接的不是终端的东西作为输出,则使用回退值。只需将脚本的输出重定向到文件,即可检查回退值是否按预期工作:
$ python myscript.py > output.txt
如果您打开output.txt
,您应该看到 80 个字符的回退被用作没有任何指定宽度的文件。
在某些情况下,特别是在编写系统工具时,可能需要将某些工作转移到另一个命令。例如,如果您必须解压缩文件,在许多情况下,将工作转移到gunzip
/zip
命令或尝试在 Python 中重现相同的行为可能是有意义的。
虽然 Python 中有很多方法来处理这项工作,但它们都有细微的差异,这可能会使任何开发人员的生活变得困难,因此最好有一个解决最常见问题的通用解决方案。
执行以下步骤:
- 结合
subprocess
和shlex
模块,我们可以构建在大多数情况下可靠的解决方案:
import shlex
import subprocess
def run(command):
try:
result = subprocess.check_output(shlex.split(command),
stderr=subprocess.STDOUT)
return 0, result
except subprocess.CalledProcessError as e:
return e.returncode, e.output
- 对于成功和失败的命令,很容易检查它是否按预期工作:
for path in ('/', '/should_not_exist'):
status, out = run('ls "{}"'.format(path))
if status == 0:
print('<Success>')
else:
print('<Error: {}>'.format(status))
print(out)
- 在我的系统上,这正确地列出了文件系统的根目录,并对不存在的路径进行了投诉:
<Success>
Applications
Developer
Library
LibraryPreferences
Network
...
<Error: 2>
ls: cannot access /should_not_exist: No such file or directory
调用命令本身是由subprocess.check_output
函数执行的,但是在调用它之前,我们需要在包含命令本身及其参数的列表中正确地拆分命令。依赖shlex
可以让我们驱动并区分参数应该如何分割。要查看其效果,您可以尝试在任何类似 Unix 的系统上比较run('ls / var')
和run('ls "/ var"')
。第一个会打印很多文件,而第二个会抱怨路径不存在。这是因为,在第一种情况下,我们实际上向ls
(/
和var
发送了两个不同的参数),而在第二种情况下,我们发送了一个单独的参数("/ var"
。如果我们不使用shlex
,就无法区分这两种情况。
然后,通过stderr=subprocess.STDOUT
选项处理命令失败的情况(我们可以检测到,因为run
函数将返回一个非零的状态),从而允许我们接收失败描述。
调用我们命令的繁重工作由subprocess.check_output
执行,实际上,它是subprocess.Popen
的一个包装器,将完成两件事:
- 使用
subprocess.Popen
生成所需的命令,配置为将输出写入管道,以便父进程(我们自己的程序)可以从该管道读取并获取输出。 - 生成线程以持续使用打开的与子进程通信的管道的内容。这确保了它们永远不会填满,因为如果它们填满了,我们调用的命令就会阻塞,因为它将无法写入更多的输出。
需要注意的一点是,我们的run
函数将查找能够满足请求的命令的可执行文件,但不会运行任何 shell 表达式。因此,不可能向它发送 shell 脚本。如果需要的话,shell=True
选项可以传递给subprocess.check_output
,但这是非常不鼓励的,因为它允许将 shell 代码注入到我们的程序中。
假设您想编写一个命令,打印用户选择的目录的内容;一个非常简单的解决方案可能是:
import sys
if len(sys.argv) < 2:
print('Please provide a directory')
sys.exit(1)
_, out = run('ls {}'.format(sys.argv[1]))
print(out)
现在,如果我们在run
中允许shell=True
并且用户提供了一个类似/var; rm -rf /
的路径,会发生什么?用户可能最终会删除整个系统磁盘,尽管这仍然受到我们依赖shlex
来分割参数的限制,但通过 shell 来运行命令仍然不安全。
在执行需要大量时间的工作时(通常是需要 I/O 到较慢的端点(如磁盘或网络),最好让用户知道您正在前进,还有多少工作要做。进度条虽然不精确,但它是一种非常好的方式,可以让用户了解到目前为止我们已经完成了多少工作,还有多少工作要做。
配方步骤如下所示:
- 进度条本身将由装饰器显示,以便我们可以将其应用于任何我们希望以最小努力报告进度的功能:
import shutil, sys
def withprogressbar(func):
"""Decorates ``func`` to display a progress bar while running.
The decorated function can yield values from 0 to 100 to
display the progress.
"""
def _func_with_progress(*args, **kwargs):
max_width, _ = shutil.get_terminal_size()
gen = func(*args, **kwargs)
while True:
try:
progress = next(gen)
except StopIteration as exc:
sys.stdout.write('\n')
return exc.value
else:
# Build the displayed message so we can compute
# how much space is left for the progress bar
itself.
message = '[%s] {}%%'.format(progress)
# Add 3 characters to cope for the %s and %%
bar_width = max_width - len(message) + 3
filled = int(round(bar_width / 100.0 * progress))
spaceleft = bar_width - filled
bar = '=' * filled + ' ' * spaceleft
sys.stdout.write((message+'\r') % bar)
sys.stdout.flush()
return _func_with_progress
- 然后我们需要一个函数,它实际执行一些我们可能想要报告进度的操作。在本例中,它只是一个等待指定时间的简单函数:
import time
@withprogressbar
def wait(seconds):
"""Waits ``seconds`` seconds and returns how long it waited."""
start = time.time()
step = seconds / 100.0
for i in range(1, 101):
time.sleep(step)
yield i # Send % of progress to withprogressbar
# Return how much time passed since we started,
# which is in fact how long we waited for real.
return time.time() - start
- 现在调用修饰函数应该告诉我们它等待了多长时间,并在等待时显示一个进度条:
print('WAITED', wait(5))
- 脚本运行时,您应该看到进度条和最终结果,如下所示:
$ python /tmp/progress.py
[=====================================] 100%
WAITED 5.308781862258911
所有的工作都是通过withprogressbar
功能完成的。它充当一个装饰器,所以我们可以将它应用于任何具有@withprogressbar
语法的函数。
这非常方便,因为报告进度的代码与实际执行工作的代码是隔离的,这允许我们在许多不同的情况下重用它。
为了使装饰器在函数本身运行时与装饰函数交互,我们依赖 Python 生成器:
gen = func(*args, **kwargs)
while True:
try:
progress = next(gen)
except StopIteration as exc:
sys.stdout.write('\n')
return exc.value
else:
# display the progressbar
当我们调用修饰函数(在我们的示例中为wait
函数)时,我们实际上是从我们的修饰器调用_func_with_progress
。函数要做的第一件事是调用修饰函数:
gen = func(*args, **kwargs)
由于修饰后的函数包含一个yield progress
语句,所以当它想要显示某个进度时(wait
中for
循环中的yield i
,函数将返回generator
。
每当生成器遇到一个yield progress
语句时,我们都会将其作为应用于生成器的下一个函数的返回值接收回来:
progress = next(gen)
然后,我们可以显示我们的进度并再次调用next(gen)
,这样修饰后的函数可以向前移动并返回一个新的进度(修饰后的函数当前在yield
处暂停,并且在调用next
之前不会处理,这就是为什么我们的整段代码都被包装在while True:
中的原因),让函数永远继续,直到完成它必须完成的操作)。
一旦修饰函数完成了它必须完成的所有工作,它将引发一个StopIteration
异常,该异常将包含修饰函数在.value
属性中返回的值。
当我们想要将任何返回的值传播给调用方时,我们只需要自己返回该值。如果被修饰的函数应该返回它所做工作的一些结果,例如一个download(url)
函数应该返回对下载文件的引用,那么这一点尤其重要。
返回之前,我们打印一行新行:
sys.stdout.write('\n')
这样可以确保进度条后面的任何内容都不会与进度条本身重叠,而是打印在新行上。
然后我们只剩下显示进度条本身。配方的进度条部分的核心仅基于两行代码:
sys.stdout.write((message+'\r') % bar)
sys.stdout.flush()
这两行将确保我们的信息在屏幕上打印,而不会像print
通常那样移动到新的行。相反,这将移回同一行的开头。尝试用'\n'
替换'\r'
,您将立即看到差异。使用'\r'
,您将看到单个进度条从 0-100%移动,而使用'\n'
,您将看到许多进度条正在打印。
然后需要调用sys.stdout.flush()
以确保进度条实际显示,因为通常只在新行上刷新输出,并且由于我们只是一遍又一遍地打印同一行,除非我们显式地这样做,否则不会刷新。
现在我们知道了如何绘制进度条并对其进行更新,剩下的函数将用于计算进度条以显示:
message = '[%s] {}%%'.format(progress)
bar_width = max_width - len(message) + 3 # Add 3 characters to cope for the %s and %%
filled = int(round(bar_width / 100.0 * progress))
spaceleft = bar_width - filled
bar = '=' * filled + ' ' * spaceleft
首先,我们计算message
,这是我们想要在屏幕上显示的内容。消息是在没有进度条的情况下计算的,对于进度条,我们将留下一个%s
占位符,以便稍后填充它。
我们这样做是为了在显示了周围的括号和百分比后,我们知道还有多少空间留给条本身。该值为bar_width
,通过从消息大小中减去最大屏幕宽度(在函数开头用shutil.get_terminal_size()
检索)计算得出。我们必须添加的三个额外字符将解决我们的消息中%s
和%%
所占用的空间,一旦消息显示到屏幕上,这些空间实际上就不存在了,因为%s
将被条本身替换,%%
将解析为单个%
。
一旦我们知道酒吧本身有多少可用空间,我们就会计算出该空间中有多少应该用'='
(工作已经完成的部分)填充,有多少应该用空空间' '
(工作尚未完成的部分)填充。这是通过计算屏幕大小来实现的,以填充并匹配我们的进度百分比:
filled = int(round(bar_width / 100.0 * progress))
一旦我们知道用'='
填充多少,剩下的就只是空的空间:
spaceleft = bar_width - filled
因此,我们可以用填充的等号和spaceleft
空白来建造我们的酒吧:
bar = '=' * filled + ' ' * spaceleft
一旦该条准备就绪,它将通过使用%
字符串格式化操作符注入屏幕上显示的消息中:
sys.stdout.write((message+'\r') % bar)
如果您注意到,我混合了两种类型的字符串格式(str.format
和%
。我这样做是因为我认为这样可以使格式化过程更加清晰,而不必在每个格式化步骤中正确地解释转义。
虽然现在不太常见,但能够创建基于角色的交互式用户界面仍然有很大的价值,特别是当只需要一个带有 OK 按钮的简单消息对话框或 OK/cancel 对话框时;您可以通过一个漂亮的文本对话框将用户的注意力引导到他们身上,从而获得更好的效果。
curses
库仅包含在 Python for Unix 系统中,因此 Windows 用户可能需要一个解决方案,例如 CygWin 或 Linux 子系统 for Windows,以便能够拥有包含curses
支持的 Python 设置。
对于此配方,请执行以下步骤:
- 我们将制作一个
MessageBox.show
方法,我们可以在需要时使用它来显示消息框。MessageBox
类将能够显示带有 OK 或 OK/cancel 按钮的消息框:
import curses
import textwrap
import itertools
class MessageBox(object):
@classmethod
def show(cls, message, cancel=False, width=40):
"""Show a message with an Ok/Cancel dialog.
Provide ``cancel=True`` argument to show a cancel button
too.
Returns the user selected choice:
- 0 = Ok
- 1 = Cancel
"""
dialog = MessageBox(message, width, cancel)
return curses.wrapper(dialog._show)
def __init__(self, message, width, cancel):
self._message = self._build_message(width, message)
self._width = width
self._height = max(self._message.count('\n')+1, 3) + 6
self._selected = 0
self._buttons = ['Ok']
if cancel:
self._buttons.append('Cancel')
def _build_message(self, width, message):
lines = []
for line in message.split('\n'):
if line.strip():
lines.extend(textwrap.wrap(line, width-4,
replace_whitespace=False))
else:
lines.append('')
return '\n'.join(lines)
def _show(self, stdscr):
win = curses.newwin(self._height, self._width,
(curses.LINES - self._height) // 2,
(curses.COLS - self._width) // 2)
win.keypad(1)
win.border()
textbox = win.derwin(self._height - 1, self._width - 3,
1, 2)
textbox.addstr(0, 0, self._message)
return self._loop(win)
def _loop(self, win):
while True:
for idx, btntext in enumerate(self._buttons):
allowedspace = self._width // len(self._buttons)
btn = win.derwin(
3, 10,
self._height - 4,
(((allowedspace-10)//2*idx) + allowedspace*idx
+ 2)
)
btn.border()
flag = 0
if idx == self._selected:
flag = curses.A_BOLD
btn.addstr(1, (10-len(btntext))//2, btntext, flag)
win.refresh()
key = win.getch()
if key == curses.KEY_RIGHT:
self._selected = 1
elif key == curses.KEY_LEFT:
self._selected = 0
elif key == ord('\n'):
return self._selected
- 然后我们可以通过
MessageBox.show
方法使用:
MessageBox.show('Hello World,\n\npress enter to continue')
- 我们甚至可以使用它来检查用户的选择:
if MessageBox.show('Are you sure?\n\npress enter to confirm',
cancel=True) == 0:
print("Yeah! Let's continue")
else:
print("That's sad, hope to see you soon")
消息框基于curses
库,允许我们在屏幕上绘制基于文本的图形。当我们使用该对话框时,我们将进入全屏文本图形模式,一旦退出,我们将恢复以前的终端状态。
这允许我们在更复杂的程序中交错MessageBox
类,而不必使用curses
编写整个程序。这是由MessageBox.show
类方法中使用的curses.wrapper
函数允许的,用于包装实际显示该框的MessageBox._show
方法。
要显示的消息是通过MessageBox._build_message
方法在MessageBox
初始值设定项中准备的,以确保在消息过长时进行包装,并正确处理多行文本。消息框的高度取决于消息的长度和产生的行数,再加上我们总是包括的六行,以添加边框(使用两行)和按钮(使用四行)。
然后,MessageBox._show
方法创建实际的框窗口,为其添加边框,并在其中显示消息。消息显示后,我们输入MessageBox._loop
,等待用户在确定和取消之间进行选择。
MessageBox._loop
方法通过win.derwin
功能,将所有需要的按钮用自己的边框绘制出来。每个按钮宽 10 个字符,高 3 个字符,将根据allowedspace,
的值显示,该值为每个按钮保留相等的框空间。然后,一旦按钮框被绘制出来,它将检查当前显示的按钮是否是选中的按钮;如果是,则按钮的标签显示为粗体文本。这允许用户知道当前选择的选项。
一旦两个按钮都画好了,我们调用win.refresh()
在屏幕上实际显示我们刚才画的内容。
然后我们等待用户按任意键来相应地更新屏幕;左/右箭头键将在确定/取消选择之间切换,输入将确认当前选择。
如果用户更改了所选按钮(按左键或右键),我们将再次循环并重新绘制按钮。我们只需要重新绘制按钮,因为屏幕的其余部分没有改变;窗口边框和消息仍然是相同的,因此不需要在其上绘制。除非调用win.erase()
方法,否则屏幕的内容始终保持不变,因此我们不需要重新绘制屏幕上不需要更新的部分。
通过对此保持明智,我们还可以避免重新绘制按钮本身。这是因为当“取消/确定”文本从粗体变为普通文本时,只需重新绘制该文本,反之亦然。
一旦用户按下回车键,我们退出循环并返回当前选择的 OK 和 cancel 选项。这允许调用方根据用户的选择进行操作。
在编写基于控制台的软件时,有时需要要求用户提供长文本输入,而这些输入无法通过命令选项轻松提供。
在 Unix 世界中很少有这样的例子,比如一次编辑crontab
或调整多个配置选项。他们中的大多数依赖于启动一个成熟的第三方编辑器,如nano或vim,但可以轻松推出一个解决方案,在许多情况下,仅使用 Python 标准库就足够了,这样我们的工具就可以请求长时间或复杂的用户输入。
curses
库仅包含在用于 Unix 系统的 Python 中,因此 Windows 用户可能需要一个解决方案,例如 CygWin 或用于 Windows 的 Linux 子系统,以便能够拥有包含curses
支持的 Python 设置。
对于此配方,请执行以下步骤:
- Python 标准库提供了一个带有 Type T1 的多行文本编辑器的基础,例如键绑定。我们只需要稍微扩展一下,添加一些必需的行为和修复:
import curses
from curses.textpad import Textbox, rectangle
class TextInput(object):
@classmethod
def show(cls, message, content=None):
return curses.wrapper(cls(message, content)._show)
def __init__(self, message, content):
self._message = message
self._content = content
def _show(self, stdscr):
# Set a reasonable size for our input box.
lines, cols = curses.LINES - 10, curses.COLS - 40
y_begin, x_begin = (curses.LINES - lines) // 2,
(curses.COLS - cols) // 2
editwin = curses.newwin(lines, cols, y_begin, x_begin)
editwin.addstr(0, 1, "{}: (hit Ctrl-G to submit)"
.format(self._message))
rectangle(editwin, 1, 0, lines-2, cols-1)
editwin.refresh()
inputwin = curses.newwin(lines-4, cols-2, y_begin+2,
x_begin+1)
box = Textbox(inputwin)
self._load(box, self._content)
return self._edit(box)
def _load(self, box, text):
if not text:
return
for c in text:
box._insert_printable_char(c)
def _edit(self, box):
while True:
ch = box.win.getch()
if not ch:
continue
if ch == 127:
ch = curses.KEY_BACKSPACE
if not box.do_command(ch):
break
box.win.refresh()
return box.gather()
- 然后我们可以从用户处读取输入:
result = TextInput.show('Insert your name:')
print('Your name:', result)
- 我们甚至可以要求它编辑现有文本:
result = TextInput.show('Insert your name:',
content='Some Text\nTo be edited')
print('Your name:', result)
一切都从TextInput._show
方法开始,它准备了两个窗口;第一个绘制帮助文本(在我们的示例中为'Insert your name:'
)和文本区域的边框框。
一旦这些内容被绘制出来,它就会创建一个新的专用于Textbox
的窗口,因为文本框可以自由地插入、删除和编辑该窗口的内容。
如果我们有现有内容(content= argument
,则TextInput._load
功能会在继续编辑之前将其插入文本框。所提供内容中的每个字符都通过Textbox._insert_printable_char
功能注入文本框窗口。
然后,我们最终可以进入编辑循环(即TextInput._edit
方法),在这里,我们可以监听按键并做出相应的反应。事实上,Textbox.do_command
已经为我们完成了大部分工作,所以我们只需将按下的键转发给它,将字符插入到文本中或对特殊命令作出反应。该方法唯一的特殊之处是我们检查字符 127,即退格,并将其替换为curses.KEY_BACKSPACE
,因为当按下退格键时,并非所有终端发送相同的代码。一旦do_command
处理完字符,我们可以刷新窗口,以便出现任何新文本,然后再次循环。
当用户按 Enter T1 表示 Ctrl AUT2 THE+SUTT3EG G TY4 T4 时,编辑器将考虑文本完成并退出编辑循环。在此之前,我们调用Textbox.gather
获取文本编辑器的全部内容并将其发送回调用方。
需要注意的一点是,内容实际上是从curses
窗口的内容中获取的。所以,它实际上包含了你在屏幕上看到的所有空白。出于这个原因,Textbox.gather
方法将删除空白,以避免返回的响应大部分是围绕文本的空白。如果您试图编写包含多个空行的内容,这一点非常清楚;它们都将与剩余的空间一起被剥离。