本章介绍了以下秘籍:
- 用 IEF 快速启动
- 与 IEF 接触
- 这是一道很好的汤
- 寻找病毒
- 收集情报
- 完全被动
技术已经走过了漫长的道路,随着技术的进步,工具被广泛使用的程度也发生了变化。事实上,由于互联网上的工具数量巨大,认识到这些工具的存在是成功的一半。其中一些工具是公开的,可以用于取证目的。在本章中,我们将学习如何通过 Python 与网站交互并识别恶意软件,包括自动审查潜在的恶意域、IP 地址或文件。
我们首先来看看如何操作互联网证据查找器(IEF)结果,并在应用程序上下文之外执行额外的处理。我们还探讨了如何使用 VirusShare、PassiveTotal 和 VirusTotal 等服务分别创建已知恶意软件的哈希集、查询可疑域解析以及识别已知的坏域或文件。在这些脚本之间,您将熟悉如何使用 Python 与 API 交互。
本章中的脚本侧重于解决特定问题,并按复杂性排序:
- 学习从 IEF 结果中提取数据
- 从 Google Chrome 处理缓存的 Yahoo 联系人数据
- 用靓汤保存网页
- 从 VirusShare 创建 X-Ways 兼容的哈希集
- 使用 PassiveTotal 自动审查粗略的域或 IP 地址
- 使用 VirusTotal 自动识别已知的坏文件、域或 IP
Visit www.packtpub.com/books/content/support to download the code bundle for this chapter.
食谱难度:简单
Python 版本:3.5
操作系统:任何
此秘籍将作为将所有报告从 IEF 转储到 CSV 文件的快速方法,并介绍如何与 IEF 结果交互。IEF 将数据存储在 SQLite 数据库中,我们在第 3 章中对其进行了深入探讨,深入探讨了移动取证秘籍。由于 IEF 可以配置为扫描特定类别的信息,所以它并不像为每个 IEF 数据库转储集合表那样简单。相反,我们必须动态地确定此信息,然后与所述表交互。此秘籍将动态标识 IEF 数据库中的结果表,并将其转储到相应的 CSV 文件中。此过程可以在任何 SQLite 数据库上执行,以快速将其内容转储到 CSV 文件以供查看。
此脚本中使用的所有库都存在于 Python 的标准库中。对于此脚本,请确保在执行程序后生成 IEF 结果数据库。我们使用 IEF 版本 6.8.9.5774 生成用于开发此秘籍的数据库。例如,在 IEF 处理完取证图像后,您应该会看到一个名为IEFv6.db
的文件。这是我们将在此秘籍中与之交互的数据库。
我们将采用以下步骤从 IEF 结果数据库中提取数据:
- 连接到数据库。
- 查询数据库以标识所有表。
- 将结果表写入单个 CSV 文件。
首先,我们导入所需的库来处理参数解析、编写电子表格以及与 SQLite 数据库交互。
from __future__ import print_function
import argparse
import csv
import os
import sqlite3
import sys
这个秘籍的命令行处理程序相对简单。它接受两个位置参数IEF_DATABASE
和OUTPUT_DIR
,分别表示IEFv6.db
文件的文件路径和所需的输出位置。
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__description__,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument("IEF_DATABASE", help="Input IEF database")
parser.add_argument("OUTPUT_DIR", help="Output DIR")
args = parser.parse_args()
在调用脚本的main()
函数之前,我们像往常一样执行输入验证步骤。首先,我们检查输出目录,如果它不存在,就创建它。然后,我们确认 IEF 数据库按预期存在。如果一切正常,我们执行main()
功能,并向其提供两个用户提供的输入:
if not os.path.exists(args.OUTPUT_DIR):
os.makedirs(args.OUTPUT_DIR)
if os.path.exists(args.IEF_DATABASE) and \
os.path.isfile(args.IEF_DATABASE):
main(args.IEF_DATABASE, args.OUTPUT_DIR)
else:
print("[-] Supplied input file {} does not exist or is not a "
"file".format(args.IEF_DATABASE))
sys.exit(1)
main()
函数的启动非常简单。我们将状态消息打印到控制台,并创建到数据库的sqlite3
连接,以执行必要的 SQLite 查询:
def main(database, out_directory):
print("[+] Connecting to SQLite database")
conn = sqlite3.connect(database)
c = conn.cursor()
接下来,我们需要查询数据库,以确定存在的所有表。请注意,我们执行了一个相当复杂的查询来执行此操作。如果您熟悉 SQLite,您可能会摇头问为什么我们没有执行.table
命令。不幸的是,在 Python 中,这并不是那么容易做到的。相反,必须执行以下命令才能达到预期目标。
如前所述,Cursor
以元组列表的形式返回结果。我们执行的命令返回有关数据库中每个表的许多详细信息。在本例中,我们只对提取表的名称感兴趣。我们使用列表理解来实现这一点,首先从游标对象获取所有结果,然后如果名称符合某些条件,则将每个结果的第二个元素附加到表列表中。我们选择忽略以_
开头或以_DATA
结尾的表名。通过查看这些表,它们包含实际的缓存文件内容,而不是每个记录的元数据。
print("[+] Querying IEF database for list of all tables to extract")
c.execute("select * from sqlite_master where type='table'")
# Remove tables that start with "_" or end with "_DATA"
tables = [x[2] for x in c.fetchall() if not x[2].startswith('_') and
not x[2].endswith('_DATA')]
有了表名列表,我们现在可以遍历每一个表名,并将其内容提取到一个变量中。在此之前,我们将向控制台打印更新状态消息,以通知用户脚本的当前执行状态。为了编写 CSV,我们需要首先确定给定表的列名。正如我们在第 3 章中看到的,这是使用pragma table_info
命令执行的。通过一些简单的列表理解,我们只提取列的名称,并将它们存储在一个变量中供以后使用。
完成后,我们执行最喜欢的、最简单的 SQL 查询,并从每个表中选择所有(*
数据。使用游标对象上的fetchall()
方法,我们将包含整个表数据的元组列表存储在table_data
变量中:
print("[+] Dumping {} tables to CSV files in {}".format(
len(tables), out_directory))
for table in tables:
c.execute("pragma table_info('{}')".format(table))
table_columns = [x[1] for x in c.fetchall()]
c.execute("select * from '{}'".format(table))
table_data = c.fetchall()
现在,我们可以开始将每个表的数据写入相应的 CSV 文件。为了简单起见,每个 CSV 文件的名称只是表名和附加的.csv
扩展名。我们使用os.path.join()
将输出目录与所需的 CSV 名称组合。
接下来,我们将状态更新打印到控制台,并开始写入每个 CSV 文件的过程。这是通过首先将表格列名作为电子表格的标题,然后是表格的内容来完成的。我们使用**writerows()
**方法将元组列表写入一行,而不是创建一个不必要的循环,并对每个元组重复执行writerow()
。
csv_name = table + '.csv'
csv_path = os.path.join(out_directory, csv_name)
print('[+] Writing {} table to {} CSV file'.format(table,
csv_name))
with open(csv_path, "w", newline="") as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow(table_columns)
csv_writer.writerows(table_data)
运行此脚本时,我们可以看到发现的工件并提取文本信息的 CSV 报告:
完成脚本后,我们可以看到有关工件的信息,如以下报告片段所示:
难度:中等
Python 版本:3.5
操作系统:任何
我们可以通过操纵和收集来自 IEF 不一定支持的工件的信息,进一步利用 SQLite 数据库中的 IEF 结果。当新工件被发现并且不受支持时,这一点尤为重要。随着互联网和许多使用互联网的企业不断变化,软件跟上每一个新的工件是不现实的。在本例中,我们将查看缓存的 Yahoo Mail 联系人,这些联系人作为使用 Yahoo Mail 的副产品存储在本地系统中。
此脚本中使用的所有库都存在于 Python 的标准库中。同样,正如在前面的秘籍中一样,如果您想继续,您将需要一个 IEF 结果数据库。我们使用 IEF 版本 6.8.9.5774 生成用于开发此秘籍的数据库。除此之外,您可能还需要生成 Yahoo 邮件流量,以创建缓存 Yahoo 邮件联系人的必要情况。在本例中,我们使用 Google Chrome 浏览器使用 Yahoo Mail,因此将查看 Google Chrome 缓存数据。这个秘籍虽然特定于 Yahoo,但说明了如何使用 IEF 结果数据库进一步处理工件并识别其他相关信息。
秘籍遵循以下基本原则:
- 连接到输入数据库。
- 在 Google Chrome 缓存表中查询雅虎邮件联系人记录。
- 处理联系人缓存 JSON 数据和元数据。
- 将所有相关数据写入 CSV。
首先,我们导入所需的库来处理参数解析、编写电子表格、处理 JSON 数据以及与 SQLite 数据库交互。
from __future__ import print_function
import argparse
import csv
import json
import os
import sqlite3
import sys
此秘籍的命令行处理程序与第一个秘籍没有区别。它接受两个位置参数,IEF_DATABASE
和OUTPUT_DIR,
分别表示指向IEFv6.db
文件和所需输出位置的文件路径。
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__description__,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument("IEF_DATABASE", help="Input IEF database")
parser.add_argument("OUTPUT_CSV", help="Output CSV")
args = parser.parse_args()
同样,我们执行与本章第一个秘籍相同的数据验证步骤。如果它没有坏,为什么要修理它?验证后,我们执行main()
函数,并向其提供两个验证输入。
directory = os.path.dirname(args.OUTPUT_CSV)
if not os.path.exists(directory):
os.makedirs(directory)
if os.path.exists(args.IEF_DATABASE) and \
os.path.isfile(args.IEF_DATABASE):
main(args.IEF_DATABASE, args.OUTPUT_CSV)
else:
print(
"[-] Supplied input file {} does not exist or is not a "
"file".format(args.IEF_DATABASE))
sys.exit(1)
main()
函数通过创建到输入 SQLite 数据库的连接再次启动(我们保证此方法与第一个方法不同:继续阅读)。
def main(database, out_csv):
print("[+] Connecting to SQLite database")
conn = sqlite3.connect(database)
c = conn.cursor()
我们现在可以开始搜索数据库中的所有 Yahoo Mail 联系人缓存记录实例。请注意,我们正在寻找的 URL 片段相当特定于我们的目的。这将确保我们不会得到任何误报。URL 末尾的百分号(%
是 SQLite 通配符等效字符。如果输入目录没有 Chrome cache records 表、已损坏或已加密,我们将在try
和except
语句中执行查询。
print("[+] Querying IEF database for Yahoo Contact Fragments from "
"the Chrome Cache Records Table")
try:
c.execute(
"select * from 'Chrome Cache Records' where URL like "
"'https://data.mail.yahoo.com"
"/classicab/v2/contacts/?format=json%'")
except sqlite3.OperationalError:
print("Received an error querying the database -- database may be"
"corrupt or not have a Chrome Cache Records table")
sys.exit(2)
如果我们能够成功执行查询,我们将返回的元组列表存储到contact_cache
变量中。此变量是process_contacts()
函数的唯一输入,该函数返回一个嵌套列表结构,便于 CSV 编写器使用。
contact_cache = c.fetchall()
contact_data = process_contacts(contact_cache)
write_csv(contact_data, out_csv)
process_contacts()
功能首先向控制台打印状态消息,设置results
列表,然后遍历每个联系人缓存记录。除了原始数据之外,每个记录都有许多与其关联的元数据元素。这包括 URL、文件系统上缓存的位置以及第一次访问、最后一次访问和最后一次同步时间的时间戳。
我们使用json.loads()
方法将从表中提取的 JSON 数据存储到contact_json
变量中,以便进一步操作。JSON 数据中的total
和count
键存储了 Yahoo Mail 联系人的总数以及 JSON 缓存数据中的联系人数。
def process_contacts(contact_cache):
print("[+] Processing {} cache files matching Yahoo contact cache "
" data".format(len(contact_cache)))
results = []
for contact in contact_cache:
url = contact[0]
first_visit = contact[1]
last_visit = contact[2]
last_sync = contact[3]
loc = contact[8]
contact_json = json.loads(contact[7].decode())
total_contacts = contact_json["total"]
total_count = contact_json["count"]
在从 contact JSON 提取联系人数据之前,首先需要确保它具有联系人。如果没有,我们继续下一个缓存记录,希望在那里找到联系人。另一方面,如果我们确实有联系人,我们将一些变量初始化为空字符串。通过将变量批量分配给空字符串的元组,可以在一行中实现这一点:
if "contacts" not in contact_json:
continue
for c in contact_json["contacts"]:
name, anni, bday, emails, phones, links = (
"", "", "", "", "", "")
初始化这些变量后,我们开始在每个联系人中查找它们。有时,特定的缓存记录不会保留完整的联系方式,如"anniversary"
键。因此,我们初始化了这些变量,以避免在给定缓存记录中不存在特定键时引用不存在的变量。
对于name
、"anniversary"
和"birthday"
键,我们需要执行一些字符串连接,以便它们采用方便的格式。emails
、phones
和links
变量可能有多个结果,因此,我们使用列表理解和join()
方法创建这些各自元素的逗号分隔列表。这一行代码的优点在于,如果只有一封电子邮件、电话号码或链接,它不会不必要地在该元素后面加逗号。
if "name" in c:
name = c["name"]["givenName"] + " " + \
c["name"]["middleName"] + " " + c["name"]["familyName"]
if "anniversary" in c:
anni = c["anniversary"]["month"] + \
"/" + c["anniversary"]["day"] + "/" + \
c["anniversary"]["year"]
if "birthday" in c:
bday = c["birthday"]["month"] + "/" + \
c["birthday"]["day"] + "/" + c["birthday"]["year"]
if "emails" in c:
emails = ', '.join([x["ep"] for x in c["emails"]])
if "phones" in c:
phones = ', '.join([x["ep"] for x in c["phones"]])
if "links" in c:
links = ', '.join([x["ep"] for x in c["links"]])
我们使用get()
方法来处理company
、jobTitle
和notes
部分。因为它们是简单的键和值对,所以我们不需要对它们进行任何额外的字符串处理。相反,使用get()
方法,我们可以提取键的值,或者,如果它不存在,将默认值设置为空字符串。
company = c.get("company", "")
title = c.get("jobTitle", "")
notes = c.get("notes", "")
处理完联系人数据后,我们将元数据列表和提取的数据元素添加到results
列表中。处理完每个联系人和每个缓存记录后,我们会将results
列表返回给main()
函数,该函数会传递给 CSV writer 函数。
results.append([
url, first_visit, last_visit, last_sync, loc, name, bday,
anni, emails, phones, links, company, title, notes,
total_contacts, total_count])
return results
write_csv()
方法将嵌套的results
列表结构和输出文件路径作为其输入。在将状态消息打印到控制台之后,我们使用通常的策略将结果写入输出文件。也就是说,我们首先写入 CSV 的标题,然后是实际的联系人数据。由于嵌套列表结构,我们可以使用writerows()
方法将所有结果写入一行文件中。
def write_csv(data, output):
print("[+] Writing {} contacts to {}".format(len(data), output))
with open(output, "w", newline="") as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow([
"URL", "First Visit (UTC)", "Last Visit (UTC)",
"Last Sync (UTC)", "Location", "Contact Name", "Bday",
"Anniversary", "Emails", "Phones", "Links", "Company", "Title",
"Notes", "Total Contacts", "Count of Contacts in Cache"])
csv_writer.writerows(data)
此屏幕截图演示了此脚本可以提取的数据类型示例:
难度:中等
Python 版本:3.5
操作系统:任何
在这个食谱中,我们利用Beautiful Soup库创建了一个网站保存工具。这是一个用于处理标记语言(如 HTML 或 XML)的库,可用于轻松处理这些类型的数据结构。我们将使用它在几行代码中识别和提取网页中的所有链接。该脚本旨在展示一个非常简单的网站保存脚本示例;它决不是要取代市场上现有的软件。
此秘籍需要安装第三方库bs4
。可通过以下命令安装此模块。此脚本中使用的所有其他库都存在于 Python 的标准库中。
pip install bs4==0.0.1
Learn more about the bs4
library; visit https://www.crummy.com/software/BeautifulSoup/bs4/doc/.
我们将在此秘籍中执行以下步骤:
- 访问索引网页并识别所有初始链接。
- 通过所有已知链接递归到:
- 查找其他链接并将其添加到队列中。
- 生成每个网页的
SHA-256
哈希。 - 写入并验证网页输出到目标目录。
- 记录相关活动和散列结果。
首先,我们导入所需的库来处理参数解析、解析 HTML 数据、解析日期、散列文件、记录数据以及与网页交互。我们还设置了一个变量,用于以后构造秘籍的日志组件。
from __future__ import print_function
import argparse
from bs4 import BeautifulSoup, SoupStrainer
from datetime import datetime
import hashlib
import logging
import os
import ssl
import sys
from urllib.request import urlopen
import urllib.error
logger = logging.getLogger(__name__)
此秘籍的命令行处理程序接受两个位置输入,DOMAIN
和OUTPUT_DIR
,分别表示要保留的网站 URL 和所需的输出目录。可选的-l
参数可用于指定日志文件路径的位置。
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__description__,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument("DOMAIN", help="Website Domain")
parser.add_argument("OUTPUT_DIR", help="Preservation Output Directory")
parser.add_argument("-l", help="Log file path",
default=__file__[:-3] + ".log")
args = parser.parse_args()
现在,我们将使用默认路径或用户指定的路径为脚本设置日志记录。使用第 1 章中的日志格式,我们指定了一个文件和流处理程序,以保持用户在循环中,并记录采集过程。
logger.setLevel(logging.DEBUG)
msg_fmt = logging.Formatter("%(asctime)-15s %(funcName)-10s"
"%(levelname)-8s %(message)s")
strhndl = logging.StreamHandler(sys.stderr)
strhndl.setFormatter(fmt=msg_fmt)
fhndl = logging.FileHandler(args.l, mode='a')
fhndl.setFormatter(fmt=msg_fmt)
logger.addHandler(strhndl)
logger.addHandler(fhndl)
设置日志后,我们将记录有关脚本执行上下文的一些详细信息,包括提供的参数和操作系统详细信息。
logger.info("Starting BS Preservation")
logger.debug("Supplied arguments: {}".format(sys.argv[1:]))
logger.debug("System " + sys.platform)
logger.debug("Version " + sys.version)
我们在所需的输出目录上执行一些额外的输入验证。完成这些步骤后,我们调用main()
函数并将网站 URL 和输出目录传递给它。
if not os.path.exists(args.OUTPUT_DIR):
os.makedirs(args.OUTPUT_DIR)
main(args.DOMAIN, args.OUTPUT_DIR)
main()
功能用于执行一些任务。首先,它通过删除实际名称之前任何不必要的元素来提取网站的基本名称。例如,https://google.com 成为谷歌网站。我们还创建了集合link_queue
,它将保存在网页上找到的所有唯一链接。
我们对输入 URL 执行一些额外的验证。在开发过程中,当 URL 前面没有https://
或http://
时,我们遇到了一些错误,因此我们检查此处是否存在这种情况,并退出脚本,如果它们不存在,则通知用户需求。如果一切正常,我们就可以访问基本网页了。为此,我们创建未经验证的 SSL 上下文,以避免访问网页时出错。
def main(website, output_dir):
base_name = website.replace(
"https://", "").replace("http://", "").replace("www.", "")
link_queue = set()
if "http://" not in website and "https://" not in website:
logger.error(
"Exiting preservation - invalid user input: {}".format(
website))
sys.exit(1)
logger.info("Accessing {} webpage".format(website))
context = ssl._create_unverified_context()
接下来,在一个try-except
块中,我们使用urlopen()
方法打开到具有未验证 SSL 上下文的网站的连接,并读取网页数据。如果在尝试访问网页时收到错误,我们将在退出脚本之前打印并记录状态消息。如果成功,我们将记录一条成功消息并继续执行脚本。
try:
index = urlopen(website, context=context).read().decode("utf-8")
except urllib.error.HTTPError as e:
logger.error(
"Exiting preservation - unable to access page: {}".format(
website))
sys.exit(2)
logger.debug("Successfully accessed {}".format(website))
对于第一个网页,我们调用write_output()
函数将其写入输出目录,调用find_links()
函数识别网页上的所有链接。具体而言,此功能尝试识别网站上的所有内部链接。我们将立即探讨这两种功能。
在识别第一个页面上的链接后,我们将两条状态消息打印到控制台,然后调用recurse_pages()
方法迭代并发现发现的网页上的所有链接,并将它们添加到队列集中。完成了main()
功能;现在让我们看一下函数的支持类型转换,从write_output()
方法开始。
write_output(website, index, output_dir)
link_queue = find_links(base_name, index, link_queue)
logger.info("Found {} initial links on webpage".format(
len(link_queue)))
recurse_pages(website, link_queue, context, output_dir)
logger.info("Completed preservation of {}".format(website))
write_output()
方法有几个参数:网页的 URL、网页数据、输出目录和可选的反参数。默认情况下,如果函数调用中未提供此参数,则将其设置为零。计数器参数用于将循环迭代编号附加到输出文件,以避免写入同名文件。我们首先删除输出文件名称中一些不必要的字符,这些字符可能会导致输出文件创建不必要的目录。我们还将输出目录与 URL 目录连接起来,并使用os.makedirs()
创建它们。
def write_output(name, data, output_dir, counter=0):
name = name.replace("http://", "").replace("https://", "").rstrip("//")
directory = os.path.join(output_dir, os.path.dirname(name))
if not os.path.exists(directory) and os.path.dirname(name) != "":
os.makedirs(directory)
现在,我们记录一些关于我们正在编写的网页的详细信息。首先,我们记录文件的名称和输出目标。然后,我们使用hash_data()
方法记录从网页读取的数据的散列。我们为输出文件创建 path 变量,并附加计数器字符串以避免覆盖资源。然后,我们打开输出文件并将网页内容写入其中。最后,我们通过调用hash_file()
方法记录输出文件哈希。
logger.debug("Writing {} to {}".format(name, output_dir))
logger.debug("Data Hash: {}".format(hash_data(data)))
path = os.path.join(output_dir, name)
path = path + "_" + str(counter)
with open(path, "w") as outfile:
outfile.write(data)
logger.debug("Output File Hash: {}".format(hash_file(path)))
hash_data()
方法真的很简单。我们读入 UTF-8 编码的数据,然后使用与前面秘籍中相同的方法生成它的SHA-256
散列。
def hash_data(data):
sha256 = hashlib.sha256()
sha256.update(data.encode("utf-8"))
return sha256.hexdigest()
hash_file()
方法只是稍微复杂一点。在散列数据之前,我们必须首先打开文件并将其内容读入SHA-256
算法。完成后,我们调用hexdigest()
方法并返回生成的SHA-256
散列。现在让我们转向find_links()
方法,以及如何利用BeautifulSoup
快速找到所有相关链接。
def hash_file(file):
sha256 = hashlib.sha256()
with open(file, "rb") as in_file:
sha256.update(in_file.read())
return sha256.hexdigest()
find_links()
方法在其初始for
循环中完成了一些事情。首先,我们从网页数据中创建一个BeautifulSoup
对象。其次,在创建该对象时,我们指定只处理文档的一部分,特别是,<a href>
标记。这有助于限制 CPU 周期和内存使用,并允许我们只关注相关内容。SoupStrainer
对象是过滤器的别致名称,在本例中,只过滤<a href>
标记。
设置链接列表后,我们创建一些逻辑来测试它们是否属于这个域。在本例中,我们通过检查网站的 URL 是否是链接的一部分来实现这一点。通过该测试的任何链接不得以“#
”符号开头。在测试期间,在其中一个网站上,我们发现这会导致内部页面引用或命名锚作为单独的页面添加,这是不可取的。链接通过这些测试后,将其添加到集合队列中(除非它已存在于集合对象中)。处理完所有此类链接后,队列将返回给调用函数。recurse_pages()
函数对该函数进行多次调用,以查找我们索引的每个页面中的所有链接。
def find_links(website, page, queue):
for link in BeautifulSoup(page, "html.parser",
parse_only=SoupStrainer("a", href=True)):
if website in link.get("href"):
if not os.path.basename(link.get("href")).startswith("#"):
queue.add(link.get("href"))
return queue
recurse_pages()
函数将网站 URL、当前链接队列、未验证的 SSL 上下文和输出目录作为其输入。我们首先创建一个经过处理的列表来跟踪我们已经探索过的链接。我们还设置了循环计数器,稍后将其传递到write_output()
函数中,以对输出文件进行唯一命名。
接下来,我们开始可怕的while True
循环,这通常是一种有点危险的迭代方式,但在本例中,它用于继续在队列上迭代,随着我们发现更多页面,队列会逐渐变大。在这个循环中,我们将计数器增加1
,但更重要的是,检查处理后的列表长度是否与找到的所有链接的长度匹配。如果是这样,这个循环将被打破。但是,在满足该场景之前,脚本将继续迭代所有链接,寻找更多内部链接并将它们写入输出目录。
def recurse_pages(website, queue, context, output_dir):
processed = []
counter = 0
while True:
counter += 1
if len(processed) == len(queue):
break
我们开始遍历队列的副本来处理每个链接。我们使用 setcopy()
命令,以便在迭代循环期间更新队列而不产生错误。如果链接已经被处理,我们将继续下一个链接,以避免执行冗余任务。如果这是第一次处理链接,则不会执行continue
命令,而是将此链接附加到已处理列表中,以便将来不再处理它。
for link in queue.copy():
if link in processed:
continue
processed.append(link)
我们尝试打开并读取每个链接的数据。如果无法访问该网页,则打印并记录该网页,然后继续执行脚本。通过这种方式,我们保留了所有可以访问的页面,并有一个日志,其中包含无法访问和保留的链接的详细信息。
try:
page = urlopen(link, context=context).read().decode(
"utf-8")
except urllib.error.HTTPError as e:
msg = "Error accessing webpage: {}".format(link)
logger.error(msg)
continue
最后,对于我们能够访问的每个链接,我们通过传递链接名、页面数据、输出目录和计数器将其输出写入一个文件。我们还将queue
对象设置为新的集合,该集合将包含旧queue
中的所有元素以及find_links()
方法中的任何附加新链接。最终,根据网站的大小可能需要一些时间,我们将处理链接队列中的所有项目,并在将状态消息打印到控制台后退出脚本。
write_output(link, page, output_dir, counter)
queue = find_links(website, page, queue)
logger.info("Identified {} links throughout website".format(
len(queue)))
执行此脚本时,我们提供网站的 URL、输出文件夹和日志文件的路径,如下所示:
然后,我们可以在浏览器中打开输出文件并查看保留的内容:
我们可以通过多种方式扩展此脚本,包括:
- 收集 CSS、图像和其他资源
- 使用 selenium 在浏览器中截屏呈现的页面
- 将用户代理设置为伪装集合
难度:中等
Python 版本:3.5
操作系统:任何
VirusShare 是最大的私人恶意软件样本收集中心,拥有超过 2930 万个样本和计数。VirusShare 的一大好处是,除了每个恶意软件研究人员梦寐以求的恶意软件的字面意义上的聚宝盆之外,它还提供免费提供的恶意软件哈希列表。我们可以使用这些散列来创建一个非常全面的散列集,并在案例工作中利用它来识别潜在的恶意文件。
To learn more about and use VirusShare
, visit the website https://virusshare.com/.
在这个秘籍中,我们演示了如何从 VirusShare 自动下载哈希列表,以创建一个换行分隔的哈希列表。此列表可由取证工具(如 X-Ways)用于创建哈希集。例如,其他取证工具 EnCase 也可以使用此列表,但需要使用 EnScript 才能成功导入和创建哈希集。
此秘籍使用tqdm
第三方库创建信息进度条。可通过以下命令安装tqdm
模块。此秘籍中使用的所有其他库都是 Python 的本机库。
pip install tqdm==4.11.2
Learn more about the tqdm
library; visit https://github.com/noamraph/tqdm.
我们将在此秘籍中执行以下步骤:
- 阅读 VirusShare 哈希页面并动态标识最新的哈希列表。
- 初始化进度条并下载所需范围内的哈希列表。
首先,我们导入所需的库来处理参数解析、创建进度条以及与网页交互。
from __future__ import print_function
import argparse
import os
import ssl
import sys
import tqdm
from urllib.request import urlopen
import urllib.error
此秘籍的命令行处理程序接受一个位置参数OUTPUT_HASH
,即我们将创建的哈希集所需的文件路径。可选参数--start
捕获为整数,是哈希列表的可选起始位置。VirusShare 维护一个指向恶意软件散列的链接页面,其中每个链接包含一个介于65,536
和131,072``MD5
散列之间的列表。用户可以指定所需的起始位置,而不是下载所有哈希列表(这可能需要一些时间)。例如,如果一个人以前从 VirusShare 下载过哈希,现在希望下载最近发布的几个哈希列表,那么这可能很有用。
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__description__,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument("OUTPUT_HASH", help="Output Hashset")
parser.add_argument("--start", type=int,
help="Optional starting location")
args = parser.parse_args()
我们执行标准输入验证步骤,以确保提供的输入不会导致任何意外错误。我们使用os.path.dirname()
方法将目录路径与文件路径分离,并检查它是否存在。如果没有,我们现在就创建目录,而不是在尝试写入不存在的目录时遇到问题。最后,我们使用一个if
语句并将start
参数作为关键字提供给main()
函数(如果已提供)。
directory = os.path.dirname(args.OUTPUT_HASH)
if not os.path.exists(directory):
os.makedirs(directory)
if args.start:
main(args.OUTPUT_HASH, start=args.start)
else:
main(args.OUTPUT_HASH)
main()
功能是此秘籍中唯一的功能。虽然它很长,但任务相对简单,使得附加功能有些不必要。注意函数定义中的**kwargs
参数。这将创建一个字典,我们可以引用它来支持提供的关键字参数。在访问 VirusShare 网站之前,我们先设置一些变量并将状态消息打印到控制台。我们使用ssl._create_unverified_context()
来绕过 Python 3.X 中接收到的 SSL 验证错误。
def main(hashset, **kwargs):
url = "https://virusshare.com/hashes.4n6"
print("[+] Identifying hash set range from {}".format(url))
context = ssl._create_unverified_context()
我们使用一个try
和except
块,在未经验证的 SSL 上下文中使用urllib.request.urlopen()
方法打开 VirusShare 哈希页。我们使用read()
方法读取页面数据并将其解码为 UTF-8。如果我们在尝试访问此页面时收到错误,我们将向控制台打印一条状态消息,并相应地退出脚本。
try:
index = urlopen(url, context=context).read().decode("utf-8")
except urllib.error.HTTPError as e:
print("[-] Error accessing webpage - exiting..")
sys.exit(1)
下载页面数据的第一个任务是识别最新的哈希列表。我们通过在 VirusShare 哈希列表中查找 HTMLhref
标记的最后一个实例来实现这一点。例如,示例链接可能看起来像“hashes/VirusShare_00288.md5
”。我们使用字符串切片和方法从链接中分离散列号(【上一个示例中的 T2】。我们现在检查kwargs
字典,看看是否提供了start
参数。如果不是,我们将start
变量设置为零,以下载第一个哈希列表和所有中间哈希列表(包括最后一个),从而创建哈希集。
tag = index.rfind(r'<a href="hashes/VirusShare_')
stop = int(index[tag + 27: tag + 27 + 5].lstrip("0"))
if "start" not in kwargs:
start = 0
else:
start = kwargs["start"]
在开始下载散列列表之前,我们执行健全性检查并验证start
变量。具体地说,我们检查它是小于零还是大于最新的哈希列表。我们正在使用start
和stop
变量初始化for
循环和进度条,因此必须验证start
变量以避免意外结果。如果用户提供了错误的start
参数,我们将向控制台打印一条状态消息并退出脚本。
在最后一次健全性检查之后,我们向控制台打印一条状态消息,并将hashes_downloaded
计数器设置为零。我们在稍后的状态消息中使用此计数器来记录下载并写入哈希列表的哈希数。
if start < 0 or start > stop:
print("[-] Supplied start argument must be greater than or equal "
"to zero but less than the latest hash list, "
"currently: {}".format(stop))
sys.exit(2)
print("[+] Creating a hashset from hash lists {} to {}".format(
start, stop))
hashes_downloaded = 0
如第 1 章、基本脚本和文件信息秘籍中所述,我们可以使用tqdm.trange()
方法替代内置range()
方法来创建循环和进度条。我们为它提供所需的start
和stop
整数,并设置进度条的刻度和说明。由于range()
的工作方式,我们必须将1
添加到stop
整数中,以实际下载最后一个哈希列表。
在for
循环中,我们创建一个基本 URL 并插入一个五位数的数字来指定适当的哈希列表。我们通过将整数转换为字符串,并使用zfill()
确保数字有五个字符,方法是在字符串前面加上零,直到它有五个数字长。接下来,和前面一样,我们使用try
和except
打开、读取和解码哈希列表。我们在任何新行字符上拆分以快速创建哈希列表。如果访问网页时遇到错误,我们会将状态消息打印到控制台并继续执行,而不是退出脚本。
for x in tqdm.trange(start, stop + 1, unit_scale=True,
desc="Progress"):
url_hash = "https://virusshare.com/hashes/VirusShare_"\
"{}.md5".format(str(x).zfill(5))
try:
hashes = urlopen(
url_hash, context=context).read().decode("utf-8")
hashes_list = hashes.split("\n")
except urllib.error.HTTPError as e:
print("[-] Error accessing webpage for hash list {}"
" - continuing..".format(x))
continue
一旦我们有了散列列表,我们在“a+
”模式下打开散列集文本文件,将其附加到文本文件的底部,如果该文件不存在,则创建该文件。之后,我们只需要迭代下载的哈希列表并将每个哈希写入文件。请注意,每个哈希列表都以几个注释行(由#
符号表示)开始,因此我们实现了除了空行之外忽略这些行的逻辑。下载所有哈希并写入文本文件后,我们将向控制台打印一条状态消息,并指示下载的哈希数。
with open(hashset, "a+") as hashfile:
for line in hashes_list:
if not line.startswith("#") and line != "":
hashes_downloaded += 1
hashfile.write(line + '\n')
print("[+] Finished downloading {} hashes into {}".format(
hashes_downloaded, hashset))
当我们运行此脚本时,哈希开始在本地下载,并存储在指定的文件中,如下所示:
预览输出文件时,我们可以看到保存为纯文本的MD5
散列值。如前所述,我们可以直接将其导入到取证工具中,如 X-Ways,或通过脚本导入到 EnCase(中)http://www.forensickb.com/2014/02/enscript-to-create-encase-v7-hash-set.html 。
难度:中等
Python 版本:3.5
操作系统:任何
在此秘籍中,我们使用免费的在线病毒、恶意软件和 URL 扫描器VirusTotal,自动审查潜在的恶意网站或文件。VirusTotal 在其网站上维护其 API 的详细文档。我们将演示如何使用记录在案的 API 对系统执行基本查询,并将返回的结果存储到 CSV 文件中。
要遵循这个方法,您需要首先使用 VirusTotal 创建一个帐户,并在免费的公共 API 和私有 API 之间做出选择。公共 API 有请求限制,而私有 API 没有。例如,对于公共 API,我们限制为每分钟 4 个请求,每月 178560 个请求。有关不同 API 类型的更多详细信息,请访问 VirusTotal 的网站。我们将使用requests
库进行这些 API 调用。此库可以通过以下方式安装:
pip install requests==2.18.4
To learn more about and use VirusTotal
, visit the website at https://www.virustotal.com/. Learn more about the VirusTotal
Public API; visit https://www.virustotal.com/en/documentation/public-api/. Learn more about the VirusTotal
Private API; visit https://www.virustotal.com/en/documentation/private-api/.
要查看脚本所需的 API 密钥,请单击右上角的帐户名并导航到“我的 API 密钥”。在这里,您可以查看 API 密钥的详细信息并请求私钥。查看以下屏幕截图以了解更多详细信息。此脚本中使用的所有库都存在于 Python 的标准库中。
我们采用以下方法来实现我们的目标:
- 读入签名列表,作为域和 IP 或文件路径和哈希进行研究。
- 使用域和 IP 或文件的 API 查询 VirusTotal。
- 将结果展平为方便的格式。
- 将结果写入 CSV 文件。
首先,我们导入所需的库来处理参数解析、创建电子表格、哈希文件、解析 JSON 数据以及与网页交互。
from __future__ import print_function
import argparse
import csv
import hashlib
import json
import os
import requests
import sys
import time
这个秘籍的命令行处理程序比普通的要复杂一些。它采用三个位置参数INPUT_FILE
、OUTPUT_CSV
和API_KEY
,分别表示域和 IP 或文件路径的输入文本文件、所需的输出 CSV 位置以及包含要使用的 API 密钥的文本文件。除此之外,还有几个可选参数,-t
(或--type
和--limit
,用于指定输入文件和文件路径或域中的数据类型,并限制符合公共 API 限制的请求。默认情况下,type
参数配置为域值。如果增加limit
开关,则其布尔值为True
;否则将为False
。
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__description__,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument("INPUT_FILE",
help="Text File containing list of file paths/"
"hashes or domains/IPs")
parser.add_argument("OUTPUT_CSV",
help="Output CSV with lookup results")
parser.add_argument("API_KEY", help="Text File containing API key")
parser.add_argument("-t", "--type",
help="Type of data: file or domain",
choices=("file", "domain"), default="domain")
parser.add_argument(
"--limit", action="store_true",
help="Limit requests to comply with public API key restrictions")
args = parser.parse_args()
接下来,我们对输入文件和输出 CSV 执行标准数据验证过程。如果输入通过了数据验证步骤,我们将所有参数传递给main()
函数,否则退出脚本。
directory = os.path.dirname(args.OUTPUT_CSV)
if not os.path.exists(directory):
os.makedirs(directory)
if os.path.exists(args.INPUT_FILE) and os.path.isfile(args.INPUT_FILE):
main(args.INPUT_FILE, args.OUTPUT_CSV,
args.API_KEY, args.limit, args.type)
else:
print("[-] Supplied input file {} does not exist or is not a "
"file".format(args.INPUT_FILE))
sys.exit(1)
main()
函数首先将输入文件读入一个名为objects
的集合。这里使用了一个集合来减少对 API 的重复行和重复调用。通过这种方式,我们可以尝试延长不必要地触及公共 API 限制的时间。
def main(input_file, output, api, limit, type):
objects = set()
with open(input_file) as infile:
for line in infile:
if line.strip() != "":
objects.add(line.strip())
读入数据后,检查读入的数据类型是否在域和 IP 类别或文件路径中。根据类型,我们将数据集发送到适当的函数,该函数将向main()
函数返回 VirusTotal 查询结果。然后我们将这些结果发送到write_csv()
方法以写入输出。我们先来看看query_domain()
函数。
if type == "domain":
data = query_domain(objects, api, limit)
else:
data = query_file(objects, api, limit)
write_csv(data, output)
此函数首先对 API 密钥文件执行额外的输入验证,以确保在尝试使用所述密钥进行调用之前该文件存在。如果文件确实存在,我们将其读入api
变量。json_data
列表将存储从 VirusTotal API 调用返回的 JSON 数据。
def query_domain(domains, api, limit):
if not os.path.exists(api) and os.path.isfile(api):
print("[-] API key file {} does not exist or is not a file".format(
api))
sys.exit(2)
with open(api) as infile:
api = infile.read().strip()
json_data = []
在将状态消息打印到控制台后,我们开始循环遍历集合中的每个域或 IP 地址。对于每一项,我们将count
增加一,以跟踪我们进行了多少 API 调用。我们创建一个参数字典,存储要搜索的域或 IP 以及 API 密钥,并将scan
设置为1
。通过将scan
设置为1
,我们将自动提交域或 IP 以供审查,如果该域或 IP 不在 VirusTotal 数据库中。
我们使用requests.post()
方法进行 API 调用,使用参数字典查询适当的 URL 以获得结果。我们对返回的请求对象使用json()
方法将其转换为易于操作的 JSON 数据。
print("[+] Querying {} Domains / IPs using VirusTotal API".format(
len(domains)))
count = 0
for domain in domains:
count += 1
params = {"resource": domain, "apikey": api, "scan": 1}
response = requests.post(
'https://www.virustotal.com/vtapi/v2/url/report',
params=params)
json_response = response.json()
如果 API 调用成功,并且在 VirusTotal 数据库中找到了数据,我们将 JSON 数据附加到列表中。如果 VirusTotal 数据库中不存在数据,我们可以在创建报表后使用 API 检索报表。这里,为了简单起见,我们假设数据已经存在于他们的数据库中,并且只在发现结果时添加结果,而不是在项目不存在时等待生成报告。
if "Scan finished" in json_response["verbose_msg"]:
json_data.append(json_response)
接下来,我们检查limit
是否为True
,并且count
变量是否等于 3。如果是这样,我们需要等待一分钟,然后继续查询,以符合公共 API 限制。我们将状态消息打印到控制台,以便用户知道脚本正在做什么,并使用time.sleep()
方法暂停脚本执行一分钟。等待一分钟后,我们将计数重置为零,并开始查询列表中剩余的域或 IP。完成此过程后,我们将 JSON 结果列表返回给main()
函数。
if limit and count == 3:
print("[+] Halting execution for a minute to comply with "
"public API key restrictions")
time.sleep(60)
print("[+] Continuing execution of remaining Domains / IPs")
count = 0
return json_data
query_file()
方法与我们刚刚探索的query_domain()
方法类似。首先,我们验证 API 密钥文件是否存在,否则退出脚本。验证后,我们读入 API 键并将其存储在api
变量中,然后实例化json_data
列表以存储 API JSON 数据。
def query_file(files, api, limit):
if not os.path.exists(api) and os.path.isfile(api):
print("[-] API key file {} does not exist or is not a file".format(
api))
sys.exit(3)
with open(api) as infile:
api = infile.read().strip()
json_data = []
与query_domain()
函数不同,我们需要对每个文件路径执行一些额外的验证和处理,然后才能使用它。也就是说,我们需要验证每个文件路径是否有效,然后必须对每个文件进行散列,或者使用签名文件中提供的散列。我们散列这些文件,因为这是我们在 VirusTotal 数据库中查找它们的方式。回想一下,我们假设该文件已经存在于数据库中。扫描文件后,我们可以使用 API 提交样本和检索报告。
print("[+] Hashing and Querying {} Files using VirusTotal API".format(
len(files)))
count = 0
for file_entry in files:
if os.path.exists(file_entry):
file_hash = hash_file(file_entry)
elif len(file_entry) == 32:
file_hash = file_entry
else:
continue
count += 1
让我们快速查看一下这个函数。hash_file()
方法相对简单。此函数将文件路径作为其唯一输入,并返回所述文件的SHA-256
哈希值。我们通过创建一个hashlib
算法对象,一次将文件数据读入1,024
字节,然后调用hexdigest()
方法返回计算出的哈希值,实现了这一点,就像我们在第 1 章基本脚本和文件信息秘籍中所做的那样。说到这里,让我们看看query_file()
方法的其余部分。
def hash_file(file_path):
sha256 = hashlib.sha256()
with open(file_path, 'rb') as open_file:
buff_size = 1024
buff = open_file.read(buff_size)
while buff:
sha256.update(buff)
buff = open_file.read(buff_size)
return sha256.hexdigest()
query_file()
方法继续创建一个参数字典,其中包含要查找的 API 键和文件哈希。同样,我们使用requests.post()
和json()
方法分别进行 API 调用并将其转换为 JSON 数据。
params = {"resource": file_hash, "apikey": api}
response = requests.post(
'https://www.virustotal.com/vtapi/v2/file/report',
params=params)
json_response = response.json()
如果 API 调用成功,并且该文件已经存在于 VirusTotal 数据库中,我们将 JSON 数据附加到列表中。我们再次对计数和限制进行检查,以确保遵守公共 API 限制。完成所有 API 调用后,我们将 JSON 数据列表返回给main()
函数进行输出。
if "Scan finished" in json_response["verbose_msg"]:
json_data.append(json_response)
if limit and count == 3:
print("[+] Halting execution for a minute to comply with "
"public API key restrictions")
time.sleep(60)
print("[+] Continuing execution of remaining files")
count = 0
return json_data
write_csv()
方法首先检查输出数据是否确实包含 API 结果。如果没有,脚本将退出,而不是写入空的 CSV 文件。
def write_csv(data, output):
if data == []:
print("[-] No output results to write")
sys.exit(4)
如果我们有结果,我们将向控制台打印一条状态消息,并首先将 JSON 数据平铺成方便的输出格式。我们创建一个flatten_data
列表,该列表将存储每个扁平化的 JSON 字典。字段列表维护扁平 JSON 字典中的键列表和所需的列标题。
我们使用几个for
循环来获取 JSON 数据,并将包含这些数据的字典附加到列表中。完成此过程后,我们将有一个非常简单的字典结构列表。我们可以像以前一样使用csv.DictWriter
类来轻松处理这种类型的数据结构。
print("[+] Writing output for {} domains with results to {}".format(
len(data), output))
flatten_data = []
field_list = ["URL", "Scan Date", "Service",
"Detected", "Result", "VirusTotal Link"]
for result in data:
for service in result["scans"]:
flatten_data.append(
{"URL": result.get("url", ""),
"Scan Date": result.get("scan_date", ""),
"VirusTotal Link": result.get("permalink", ""),
"Service": service,
"Detected": result["scans"][service]["detected"],
"Result": result["scans"][service]["result"]})
数据集准备好输出后,我们打开 CSV 文件并创建DictWriter
类实例。我们在字典中为它提供文件对象和标题列表。我们先将标题写入电子表格,然后再将每个字典写入一行。
with open(output, "w", newline="") as csvfile:
csv_writer = csv.DictWriter(csvfile, fieldnames=field_list)
csv_writer.writeheader()
for result in flatten_data:
csv_writer.writerow(result)
下面的屏幕截图反映了我们何时针对文件和哈希运行脚本,以及第二次针对域和 IP 运行脚本:
查看输出,我们可以了解文件和哈希的恶意软件分类以及 CSV 格式的域或 IP 排名:
难度:中等
Python 版本:3.5
操作系统:任何
本食谱探索了被动式 API,以及如何使用它来自动审查域和 IP 地址。此服务在查看给定域的历史解析详细信息时特别有用。例如,您可能有一个可疑的网络钓鱼网站,并且根据历史解析模式,可以确定该网站的活动时间以及用于共享该 IP 的其他域。然后,当您确定攻击者在环境中危害多个用户时如何保持持久性的不同方式和方法时,这将为您提供额外的域以供查看和搜索。
要使用 PassiveTotal API,首先需要在他们的网站上创建一个免费帐户。登录后,您可以通过导航到帐户设置并单击 API 访问部分下的用户显示按钮来查看 API 密钥。请参阅下面的屏幕截图以获取此页面的视觉表示。
此脚本中使用的所有库都存在于 Python 的标准库中。但是,我们确实安装了 PassiveTotal Python API 客户端,并按照自述文件中的安装和设置说明进行操作 https://github.com/passivetotal/python_api 或与pip install passivetotal==1.0.30
一起。我们这样做是为了使用被动式命令行pt-client
应用程序。在这个脚本中,我们通过这个客户机进行 API 调用,而不是像在前面的方法中那样在更手动的级别上执行。在他们的网站上可以找到更多关于 PassiveTotal API 的详细信息,特别是如果你对开发更高级的东西感兴趣的话。
To learn more about and use PassiveTotal
, visit the website https://www.passivetotal.org. Learn more about the PassiveTotal
API; visit https://api.passivetotal.org/api/docs. Learn more about the PassiveTotal
Python API; visit https://github.com/passivetotal/python_api.
我们采用以下方法来实现我们的目标:
- 阅读要查看的域列表。
- 使用
subprocess
调用命令行pt-client
,并将结果返回到每个域的脚本中。 - 将结果写入 CSV 文件。
首先,我们导入所需的库来处理参数解析、创建电子表格、解析 JSON 数据和生成子流程。
from __future__ import print_function
import argparse
import csv
import json
import os
import subprocess
import sys
此秘籍的命令行处理程序分别为包含域和/或 IP 的输入文本文件和所需的输出 CSV 获取两个位置参数INPUT_DOMAINS
和OUTPUT_CSV
。
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__description__,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument("INPUT_DOMAINS",
help="Text File containing Domains and/or IPs")
parser.add_argument("OUTPUT_CSV",
help="Output CSV with lookup results")
args = parser.parse_args()
我们对每个输入执行标准输入验证步骤,以避免脚本中出现意外错误。验证输入后,我们调用main()
函数并将两个输入传递给它。
directory = os.path.dirname(args.OUTPUT_CSV)
if not os.path.exists(directory):
os.makedirs(directory)
if os.path.exists(args.INPUT_DOMAINS) and \
os.path.isfile(args.INPUT_DOMAINS):
main(args.INPUT_DOMAINS, args.OUTPUT_CSV)
else:
print(
"[-] Supplied input file {} does not exist or is not a "
"file".format(args.INPUT_DOMAINS))
sys.exit(1)
main()
功能相当简单,与上一个秘籍类似。我们再次使用集合来读取输入文件中的对象。再一次,这是为了避免对被动式 API 的冗余 API 调用,因为免费 API 每天都有限制。读入这些对象后,我们调用query_domains()
函数,该函数使用pt-client
应用程序进行 API 调用。一旦我们获得了 API 调用返回的所有 JSON 数据,我们就调用write_csv()
方法将数据写入 CSV 文件。
def main(domain_file, output):
domains = set()
with open(domain_file) as infile:
for line in infile:
domains.add(line.strip())
json_data = query_domains(domains)
write_csv(json_data, output)
query_domains()
函数首先创建一个json_data
列表来存储返回的 JSON 数据,并向控制台打印一条状态消息。然后,我们开始遍历输入文件中的每个对象,并删除任何“https://
”或“http://
”子字符串。在测试pt-client
时,观察到如果存在子字符串,则会生成内部服务器错误。例如,代替https://www.google.com ,查询应该是www.google.com。
def query_domains(domains):
json_data = []
print("[+] Querying {} domains/IPs using PassiveTotal API".format(
len(domains)))
for domain in domains:
if "https://" in domain:
domain = domain.replace("https://", "")
elif "http://" in domain:
domain = domain.replace("http://", "")
当域或 IP 地址准备好查询时,我们使用subprocess.Popen()
方法打开一个新进程并执行pt-client
应用程序。此过程中要执行的参数位于列表中。如果域为www.google.com,则将执行的命令类似于pt-client pdns -q www.gooogle.com
。将stdout
关键字参数提供为subprocess.PIPE
将为流程创建一个新管道,以便我们可以从查询中检索结果。我们在下面的代码行中通过调用communicate()
方法,然后将返回的数据转换成 JSON 结构,然后存储。
proc = subprocess.Popen(
["pt-client", "pdns", "-q", domain], stdout=subprocess.PIPE)
results, err = proc.communicate()
result_json = json.loads(results.decode())
如果 JSON 结果中有quota_exceeded
消息,那么我们已经超过了每日 API 限制,并将其打印到控制台并继续执行。我们继续执行,而不是退出,这样我们就可以写入在超出每日 API 配额之前检索到的任何结果。
if "message" in result_json:
if "quota_exceeded" in result_json["message"]:
print("[-] API Search Quota Exceeded")
continue
接下来,我们设置result_count
并检查它是否等于零。如果找到查询的结果,我们将结果附加到 JSON 列表中。在输入文件中的所有域和/或 IP 上执行此操作后,我们返回 JSON 列表。
result_count = result_json["totalRecords"]
print("[+] {} results for {}".format(result_count, domain))
if result_count == 0:
pass
else:
json_data.append(result_json["results"])
return json_data
write_csv()
方法非常简单。在这里,我们首先检查是否有数据要写入输出文件。然后,我们将状态消息打印到控制台,并创建标题列表和它们的写入顺序。
def write_csv(data, output):
if data == []:
print("[-] No output results to write")
sys.exit(2)
print("[+] Writing output for {} domains/IPs with "
"results to {}".format(len(data), output))
field_list = ["value", "firstSeen", "lastSeen", "collected",
"resolve", "resolveType", "source", "recordType",
"recordHash"]
创建了头列表后,我们使用csv.DictWriter
类设置输出 CSV 文件,写入头行,并迭代 JSON 结果中的每个字典,并将它们写入各自的行。
with open(output, "w", newline="") as csvfile:
csv_writer = csv.DictWriter(csvfile, fieldnames=field_list)
csv_writer.writeheader()
for result in data:
for dictionary in result:
csv_writer.writerow(dictionary)
运行脚本可以深入了解被动总计查找中每个项目的响应数:
CSV 报告显示收集的信息,如下所示: