本章介绍了以下秘籍:
- 是时候了
- 使用正则表达式解析 IIS 日志
- 探险
- 解读每日输出日志
- 将
daily.out
解析添加到 Axiom - 用 YARA 扫描指示器
如今,遇到配备了某种形式的事件或活动监控软件的现代系统并不罕见。此软件的实施可能有助于满足安全、调试或法规遵从性要求。无论在何种情况下,这一名副其实的信息宝库都可以而且通常被用于各种类型的网络调查。日志分析的一个常见问题是,需要筛选大量数据才能找到感兴趣的子集。通过本章中的食谱,我们将探索具有重大证据价值的各种日志,并演示如何快速处理和审查这些日志。具体而言,我们将涵盖:
- 将不同的时间戳格式(UNIX、FILETIME 等)转换为人类可读的格式
- 从 IIS 平台解析 web 服务器访问日志
- 使用 Splunk 的 Python API 接收、查询和导出日志
- 从 macOS
daily.out
日志中提取驱动器使用信息 - 从 Axiom 执行我们的
daily.out
日志解析器 - 使用 YARA 规则识别感兴趣文件的奖励秘籍
Visit www.packtpub.com/books/content/support to download the code bundle for this chapter.
食谱难度:简单
Python 版本:2.7 或 3.5
操作系统:任何
任何好的日志文件的一个重要元素是时间戳。此值表示日志中记录的活动或事件的日期和时间。这些日期值可以有多种格式,可以表示为数字或十六进制值。在日志之外,不同的文件和工件以不同的方式存储日期,即使数据类型保持不变。一个常见的区别因素是历元值,它是格式计算时间的日期。一个共同的纪元是 1970 年 1 月 1 日,尽管其他格式从 1601 年 1 月 1 日开始计算。不同格式之间的另一个不同因素是用于计数的间隔。虽然常见的格式以秒或毫秒为单位,但有些格式以时间块为单位,例如自纪元以来的 100 纳秒数。因此,这里开发的秘籍可以接受原始日期时间输入,并提供格式化的时间戳作为其输出。
此脚本中使用的所有库都存在于 Python 的标准库中。
为了在 Python 中解释常见的日期格式,我们执行以下操作:
- 设置参数以获取原始日期值、日期源和数据类型。
- 开发一个类,为不同日期格式的数据提供通用接口。
- 支持 Unix 历元值和 Microsoft
FILETIME
日期的处理。
我们首先导入用于参数处理和解析日期的库。具体来说,我们需要datetime
库中的datetime
类读取原始日期值,需要timedelta
类指定时间戳偏移量。
from __future__ import print_function
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from datetime import datetime as dt
from datetime import timedelta
此秘籍的命令行处理程序采用三个位置参数,date_value
、source
和type
,分别表示要处理的日期值、日期值的来源(UNIX、FILETIME 等)和类型(整数或十六进制值)。我们对源和类型参数使用 choices 关键字来限制用户可以提供的选项。请注意,源参数使用自定义的get_supported_formats()
函数,而不是支持的日期格式的预定义列表。然后,我们获取这些参数并启动ParseDate
类的一个实例,在将其timestamp
属性打印到控制台之前,调用run()
方法来处理转换过程。
if __name__ == '__main__':
parser = ArgumentParser(
description=__description__,
formatter_class=ArgumentDefaultsHelpFormatter,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument("date_value", help="Raw date value to parse")
parser.add_argument("source", help="Source format of date",
choices=ParseDate.get_supported_formats())
parser.add_argument("type", help="Data type of input value",
choices=('number', 'hex'), default='int')
args = parser.parse_args()
date_parser = ParseDate(args.date_value, args.source, args.type)
date_parser.run()
print(date_parser.timestamp)
让我们看看ParseDate
类是如何工作的。通过使用类,我们可以轻松地在其他脚本中扩展和实现此代码。从命令行参数中,我们接受日期值、日期源和值类型的参数。这些值和输出变量timestamp
在__init__
方法中定义:
class ParseDate(object):
def __init__(self, date_value, source, data_type):
self.date_value = date_value
self.source = source
self.data_type = data_type
self.timestamp = None
run()
方法是控制器,很像我们许多食谱的main()
函数,并根据日期源选择正确的方法进行调用。这使我们能够轻松地扩展类并添加新的支持。在这个版本中,我们只支持三种日期类型:Unix 纪元秒、Unix 纪元毫秒和 Microsoft 的 FILETIME。为了减少我们需要编写的方法的数量,我们将设计 Unix epoch 方法来处理秒和毫秒格式的时间戳。
def run(self):
if self.source == 'unix-epoch':
self.parse_unix_epoch()
elif self.source == 'unix-epoch-ms':
self.parse_unix_epoch(True)
elif self.source == 'windows-filetime':
self.parse_windows_filetime()
为了帮助那些希望在将来使用这个库的人,我们添加了一个查看支持哪些格式的方法。通过使用@classmethod
修饰符,我们公开了这个函数,而不需要首先初始化类。这就是我们可以在命令行处理程序中使用get_supported_formats()
方法的原因。只需记住在添加新功能时更新!
@classmethod
def get_supported_formats(cls):
return ['unix-epoch', 'unix-epoch-ms', 'windows-filetime']
parse_unix_epoch()
方法处理 Unix 历元时间的处理。我们指定一个可选参数milliseconds
,在处理秒和毫秒值之间切换此方法。首先,我们必须确定数据类型是"hex"
还是"number"
。如果它是"hex"
,我们将其转换为整数,如果它是"number"
,我们将其转换为浮点。如果我们不识别或不支持此方法的数据类型,例如string
,我们将向用户抛出错误并退出脚本。
转换值后,我们评估是否应将其视为毫秒值,如果是,则在进一步处理之前将其除以1,000
。接下来,我们使用datetime
类的fromtimestamp()
方法将数字转换为datetime
对象。最后,我们将此日期格式化为人类可读的格式,并将此字符串存储在timestamp
属性中。
def parse_unix_epoch(self, milliseconds=False):
if self.data_type == 'hex':
conv_value = int(self.date_value)
if milliseconds:
conv_value = conv_value / 1000.0
elif self.data_type == 'number':
conv_value = float(self.date_value)
if milliseconds:
conv_value = conv_value / 1000.0
else:
print("Unsupported data type '{}' provided".format(
self.data_type))
sys.exit('1')
ts = dt.fromtimestamp(conv_value)
self.timestamp = ts.strftime('%Y-%m-%d %H:%M:%S.%f')
parse_windows_filetime()
类方法处理FILETIME
格式,通常存储为十六进制值。使用与前面类似的代码块,我们将"hex"
或"number"
值转换为 Python 对象,并针对任何其他提供的格式引发错误。一个区别是,在进一步处理之前,我们将日期值除以10
而不是1,000
。
而在前面的方法中,datetime
库处理历元偏移,这次我们需要单独处理这个偏移。使用timedelta
类,我们指定毫秒值并将其添加到表示 FILETIME 格式的历元的datetime
对象中。生成的datetime
对象现在可以为用户格式化和输出:
def parse_windows_filetime(self):
if self.data_type == 'hex':
microseconds = int(self.date_value, 16) / 10.0
elif self.data_type == 'number':
microseconds = float(self.date_value) / 10
else:
print("Unsupported data type '{}' provided".format(
self.data_type))
sys.exit('1')
ts = dt(1601, 1, 1) + timedelta(microseconds=microseconds)
self.timestamp = ts.strftime('%Y-%m-%d %H:%M:%S.%f')
运行此脚本时,我们可以提供时间戳,并以易于读取的格式查看转换后的值,如下所示:
这个脚本可以进一步改进。我们提供了以下一项或多项建议:
- 添加对其他类型时间戳(OLE、WebKit 等)的支持
- 通过
pytz
添加时区支持 - 使用
dateutil
处理难读日期的格式设置
难度:中等
Python 版本:3.5
操作系统:任何
来自 web 服务器的日志对于生成用户统计数据非常有用,它为我们提供了有关所用设备和访问者地理位置的深入信息。它们还为检查人员提供了澄清,以寻找试图利用 web 服务器或其他未经授权使用的用户。虽然这些日志存储重要的细节,但它们以一种不便于有效分析的方式进行存储。如果您试图手动执行此操作,则字段名将在文件顶部指定,并要求您在阅读文本文件时记住字段的顺序。幸运的是,有更好的办法。使用下面的脚本,我们将演示如何迭代每一行,将值映射到字段,并创建一个正确显示结果的电子表格,从而更容易快速分析数据集。
此脚本中使用的所有库都存在于 Python 的标准库中。
要正确形成此秘籍,我们需要采取以下步骤:
- 接受输入日志文件和输出 CSV 文件的参数。
- 为每个日志列定义正则表达式模式。
- 遍历日志中的每一行,并以我们可以解析单个元素和处理带引号的空格字符的方式准备每一行。
- 验证每个值并将其映射到相应的列。
- 将映射列和值写入电子表格报表。
我们首先导入用于参数处理和日志记录的库,然后是解析和验证日志信息所需的内置库。其中包括re
正则表达式库和shlex
词法分析器库。我们还包括用于处理日志消息和报告输出的sys
和csv
。我们通过调用getLogger()
方法初始化秘籍的日志对象。
from __future__ import print_function
from argparse import ArgumentParser, FileType
import re
import shlex
import logging
import sys
import csv
logger = logging.getLogger(__file__)
在导入之后,我们为将从日志中解析的字段定义模式。这些信息在不同的日志之间可能会有所不同,尽管这里表达的模式应该涵盖日志中的大多数元素。
You may need to add, remove, or reorder some of the patterns defined as follows to properly parse the IIS log you are working with. These patterns should cover the common elements found in IIS logs.
我们将这些模式构建为一个名为iis_log_format
的元组列表,其中第一个元组元素是列名,第二个是正则表达式模式,用于验证预期内容。通过使用正则表达式模式,我们可以定义一组数据必须遵循才能有效的规则。这些列必须按照它们在日志中出现的顺序表示;否则,代码将无法正确地将值映射到列。
iis_log_format = [
("date", re.compile(r"\d{4}-\d{2}-\d{2}")),
("time", re.compile(r"\d\d:\d\d:\d\d")),
("s-ip", re.compile(
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}")),
("cs-method", re.compile(
r"(GET)|(POST)|(PUT)|(DELETE)|(OPTIONS)|(HEAD)|(CONNECT)")),
("cs-uri-stem", re.compile(r"([A-Za-z0-1/\.-]*)")),
("cs-uri-query", re.compile(r"([A-Za-z0-1/\.-]*)")),
("s-port", re.compile(r"\d*")),
("cs-username", re.compile(r"([A-Za-z0-1/\.-]*)")),
("c-ip", re.compile(
r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4}")),
("cs(User-Agent)", re.compile(r".*")),
("sc-status", re.compile(r"\d*")),
("sc-substatus", re.compile(r"\d*")),
("sc-win32-status", re.compile(r"\d*")),
("time-taken", re.compile(r"\d*"))
]
此秘籍的命令行处理程序采用两个位置参数iis_log
和csv_report
,分别表示要处理的 IIS 日志和所需的 CSV 路径。此外,此秘籍还接受可选参数l
,指定秘籍日志文件的输出路径。
接下来,我们初始化秘籍的日志记录实用程序,并将其配置为控制台和基于文件的日志记录。这一点很重要,因为当我们无法为用户解析行时,我们应该以正式的方式注意这一点。以这种方式,如果出现故障,它们不应该错误地认为所有行都已成功解析并显示在生成的 CSV 电子表格中。我们还希望记录运行时消息,包括脚本的版本和提供的参数。现在,我们已经准备好调用main()
函数并启动脚本。有关设置日志对象的更详细说明,请参阅第 1 章、基本脚本和文件信息秘籍中的日志秘籍。
if __name__ == '__main__':
parser = ArgumentParser(
description=__description__,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument('iis_log', help="Path to IIS Log",
type=FileType('r'))
parser.add_argument('csv_report', help="Path to CSV report")
parser.add_argument('-l', help="Path to processing log",
default=__name__ + '.log')
args = parser.parse_args()
logger.setLevel(logging.DEBUG)
msg_fmt = logging.Formatter("%(asctime)-15s %(funcName)-10s "
"%(levelname)-8s %(message)s")
strhndl = logging.StreamHandler(sys.stdout)
strhndl.setFormatter(fmt=msg_fmt)
fhndl = logging.FileHandler(args.log, mode='a')
fhndl.setFormatter(fmt=msg_fmt)
logger.addHandler(strhndl)
logger.addHandler(fhndl)
logger.info("Starting IIS Parsing ")
logger.debug("Supplied arguments: {}".format(", ".join(sys.argv[1:])))
logger.debug("System " + sys.platform)
logger.debug("Version " + sys.version)
main(args.iis_log, args.csv_report, logger)
logger.info("IIS Parsing Complete")
main()
函数处理此脚本中的大部分逻辑。我们创建一个列表parsed_logs
,在遍历日志文件中的行之前存储解析后的行。在for
循环中,我们剥离行并创建一个存储字典log_entry
,用于记录。通过跳过以注释(或磅)字符开头的行,或者如果行为空,我们可以加快处理速度,并防止列匹配中出现错误。
虽然 IIS 日志以空格分隔的值存储,但它们使用双引号转义包含空格的字符串。例如,useragent
字符串是单个值,但通常包含一个或多个空格。使用shlex
模块,我们可以使用shlex()
方法解析该行,并通过在空间值上正确划分数据来自动处理引号转义空间。这个库会减慢处理速度,所以我们只在包含双引号字符的行上使用它。
def main(iis_log, report_file, logger):
parsed_logs = []
for raw_line in iis_log:
line = raw_line.strip()
log_entry = {}
if line.startswith("#") or len(line) == 0:
continue
if '\"' in line:
line_iter = shlex.shlex(line_iter)
else:
line_iter = line.split(" ")
在正确分隔行的情况下,我们使用enumerate
函数逐步遍历记录中的每个元素,并提取相应的列名和模式。使用该模式,我们对值调用match()
方法,如果匹配,则在log_entry
字典中创建一个条目。如果该值与模式不匹配,我们将记录一个错误并在日志文件中提供整行。在遍历每一列之后,我们将记录字典附加到已解析日志记录的初始列表中,并对其余行重复此过程。
for count, split_entry in enumerate(line_iter):
col_name, col_pattern = iis_log_format[count]
if col_pattern.match(split_entry):
log_entry[col_name] = split_entry
else:
logger.error("Unknown column pattern discovered. "
"Line preserved in full below")
logger.error("Unparsed Line: {}".format(line))
parsed_logs.append(log_entry)
处理完所有行后,我们会在准备write_csv()
方法之前向控制台打印一条状态消息。我们使用一个简单的列表理解表达式来提取iis_log_format
列表中每个元组的第一个元素,它表示一个列名。在提取了这些列之后,让我们看看报表编写器。
logger.info("Parsed {} lines".format(len(parsed_logs)))
cols = [x[0] for x in iis_log_format]
logger.info("Creating report file: {}".format(report_file))
write_csv(report_file, cols, parsed_logs)
logger.info("Report created")
报表编写器使用我们之前探讨过的方法创建 CSV 文件。因为我们将这些行存储为字典列表,所以我们可以使用csv.DictWriter
类轻松地创建包含四行代码的报告。
def write_csv(outfile, fieldnames, data):
with open(outfile, 'w', newline="") as open_outfile:
csvfile = csv.DictWriter(open_outfile, fieldnames)
csvfile.writeheader()
csvfile.writerows(data)
当我们查看脚本生成的 CSV 报告时,我们会在示例输出中看到以下字段:
这个脚本可以进一步改进。以下是一项建议:
- 虽然我们可以像脚本开始时那样定义正则表达式模式,但我们可以使用正则表达式管理库来简化我们的工作。一个例子是
grok
库,它用于为模式创建变量名。这使我们能够轻松地组织和扩展模式,因为我们可以用名称而不是字符串值来表示它们。该库被其他平台(如 ELK 堆栈)用于正则表达式的管理和实现。
难度:中等
Python 版本:2.7
操作系统:任何
由于保留了详细程度和时间范围,日志文件可能很快变得相当大。正如您可能已经注意到的,先前秘籍中的 CSV 报告很容易变得太大,以至于我们的电子表格应用程序无法高效地打开或浏览。一种替代方法是将数据加载到数据库中,而不是在电子表格中分析这些数据。
Splunk是一个将 NoSQL 数据库与摄取和查询引擎结合在一起的平台,使其成为一个强大的分析工具。其数据库以类似 Elasticsearch 或 MongoDB 的方式运行,允许存储文档或结构化记录。因此,我们不需要提供具有一致键值映射的记录来将它们存储在数据库中。这就是为什么 NoSQL 数据库对日志分析如此有用的原因,因为日志格式可以根据事件类型而变化。
在本秘籍中,我们学习将上一秘籍中的 CSV 报告索引到 Splunk 中,从而允许我们与平台内的数据进行交互。我们还设计了脚本来对数据集运行查询,并将响应查询的结果数据子集导出到 CSV 文件。这些过程在不同的阶段处理,因此我们可以根据需要独立查询和导出数据。
此秘籍需要安装第三方库splunk-sdk
。此脚本中使用的所有其他库都存在于 Python 的标准库中。此外,我们必须在主机操作系统上安装 Splunk,并且由于splunk-sdk
库的限制,必须使用 Python 2 运行脚本。
要安装 Splunk,我们需要导航到Splunk.com,填写表格,然后选择 Splunk Enterprise 免费试用下载。此企业试用版允许我们使用 API 进行练习,并使我们能够每天上传 500 MB。下载应用程序后,我们需要启动它来配置应用程序。虽然有很多配置我们可以更改,但现在还是使用默认设置来启动它,以保持简单并专注于 API。这样,服务器的默认地址将为localhost:8000
。通过在浏览器中导航到此地址,我们可以首次登录,设置帐户并(请执行此操作)更改管理员密码。
The default username and password for a new Splunk install is admin and changeme.
当 Splunk 实例处于活动状态时,我们现在可以安装 API 库。这个库处理从 RESTAPI 到 Python 对象的转换。在撰写本书时,Splunk API 仅在 Python2 中可用。splunk-sdk
库可安装pip
:
pip install splunk-sdk==1.6.2
To learn more about the splunk-sdk
library, visit http://dev.splunk.com/python.
现在环境已正确配置,我们可以开始开发代码了。此脚本将索引新数据以进行 Splunk,对该数据运行查询,并将响应查询的数据子集导出到 CSV 文件。要做到这一点,我们需要:
- 开发一个健壮的参数处理界面,允许用户指定这些选项。
- 构建一个类来处理具有各种属性方法的操作。
- 创建方法来处理索引新数据和创建数据存储索引的过程。
- 设置以允许提供信息报告的方式运行 Splunk 查询的方法。
- 提供将报告导出为 CSV 格式的机制。
我们首先导入此脚本所需的库,包括新安装的splunklib
。为了防止由于用户的无知而产生不必要的错误,我们使用sys
库来确定执行脚本的 Python 版本,如果不是 Python 2,则会引发错误。
from __future__ import print_function
from argparse import ArgumentParser, ArgumentError
from argparse import ArgumentDefaultsHelpFormatter
import splunklib.client as client
import splunklib.results as results
import os
import sys
import csv
if sys.version_info.major != 2:
print("Invalid python version. Must use Python 2 due to splunk api "
"library")
下一个要开发的逻辑块是秘籍的命令行参数处理程序。由于在这段代码中有许多选项和操作要执行,因此我们需要在这一部分花费一些额外的时间。因为这段代码是基于类的,所以我们必须在这一节中设置一些额外的逻辑。
此秘籍的命令行处理程序接受一个位置输入action
,它表示要运行的操作(索引、查询或导出)。此秘籍还支持七个可选参数:index
、config
、file
、query
、cols
、host
和port
。让我们开始看看所有这些选项的作用。
index
参数实际上是必需参数,用于指定要从中摄取、查询或导出数据的 Splunk 索引的名称。这可以是现有的或新的index
名称。config
参数是指包含 Splunk 实例的用户名和密码的配置文件。如参数帮助中所述,此文件应在执行代码的位置之外进行保护和存储。在企业环境中,您可能需要进一步保护这些凭据。
if __name__ == '__main__':
parser = ArgumentParser(
description=__description__,
formatter_class=ArgumentDefaultsHelpFormatter,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument('action', help="Action to run",
choices=['index', 'query', 'export'])
parser.add_argument('--index-name', help="Name of splunk index",
required=True)
parser.add_argument('--config',
help="Place where login details are stored."
" Should have the username on the first line and"
" the password on the second."
" Please Protect this file!",
default=os.path.expanduser("~/.splunk_py.ini"))
file
参数将用于提供file
至index
进入平台的路径,或用于指定将导出的query
数据写入的文件名。例如,我们将使用file
参数指向我们希望从上一个秘籍中摄取的 CSV 电子表格。query
参数还具有双重用途,可用于从 Splunk 运行查询或指定要导出为 CSV 的查询 ID。这意味着index
和query
动作只需要这些参数中的一个,而export
动作需要这两个参数。
parser.add_argument('--file', help="Path to file")
parser.add_argument('--query', help="Splunk query to run or sid of "
"existing query to export")
最后一块参数允许用户修改秘籍的默认属性。例如,cols
参数可用于指定源数据中要导出的列以及导出顺序。因为我们将查询和导出 IIS 日志,所以我们已经知道哪些列是可用的,哪些是我们感兴趣的。您可能希望根据正在研究的数据类型指定可选的默认列。最后两个参数包括host
和port
参数,每个参数默认为本地服务器,但可以配置为允许您与备用实例交互。
parser.add_argument(
'--cols',
help="Speficy columns to export. comma seperated list",
default='_time,date,time,sc_status,c_ip,s_ip,cs_User_Agent')
parser.add_argument('--host', help="hostname of server",
default="localhost")
parser.add_argument('--port', help="help", default="8089")
args = parser.parse_args()
通过指定参数,我们可以解析它们,并在执行秘籍之前验证是否满足所有要求。首先,我们必须打开并读取包含认证凭证的config
文件,其中username
在第一行,password
在第二行。使用这些信息,我们创建了一个字典conn_dict
,其中包含登录详细信息和服务器位置。此字典被传递到splunklib``client.connect()
方法。注意我们如何使用del()
方法删除包含这些敏感信息的变量。虽然用户名和密码仍然可以通过service
对象访问,但我们希望限制存储这些详细信息的区域数量。创建service
变量后,我们测试是否有任何应用程序安装在 Splunk 中,因为默认情况下至少有一个应用程序,并将其用作验证成功的测试。
with open(args.config, 'r') as open_conf:
username, password = [x.strip() for x in open_conf.readlines()]
conn_dict = {'host': args.host, 'port': int(args.port),
'username': username, 'password': password}
del(username)
del(password)
service = client.connect(**conn_dict)
del(conn_dict)
if len(service.apps) == 0:
print("Login likely unsuccessful, cannot find any applications")
sys.exit()
通过将列转换为列表并创建Spelunking
类实例,我们继续处理提供的参数。要初始化该类,我们必须为其提供service
变量、要执行的操作、索引名和列。使用这个,我们的类实例现在可以使用了。
cols = args.cols.split(",")
spelunking = Spelunking(service, args.action, args.index_name, cols)
接下来,我们使用一系列的if-elif-else
语句来处理我们预期会遇到的三种不同的操作。如果用户提供了index
动作,我们首先确认可选的file
参数存在,如果不存在则会引发错误。如果我们找到了它,我们将该值赋给Spelunking
类实例的相应属性。对于query
和export
操作重复这种逻辑,确认它们也与正确的可选参数一起使用。注意我们如何使用os.path.abspath()
函数为类指定文件的绝对路径。这允许splunklib
在系统上找到正确的文件。而且,在书中可能是最长的参数处理部分中,我们已经完成了必要的逻辑,现在可以调用类run()
方法来启动对请求的特定操作的处理。
if spelunking.action == 'index':
if 'file' not in vars(args):
ArgumentError('--file parameter required')
sys.exit()
else:
spelunking.file = os.path.abspath(args.file)
elif spelunking.action == 'export':
if 'file' not in vars(args):
ArgumentError('--file parameter required')
sys.exit()
if 'query' not in vars(args):
ArgumentError('--query parameter required')
sys.exit()
spelunking.file = os.path.abspath(args.file)
spelunking.sid = args.query
elif spelunking.action == 'query':
if 'query' not in vars(args):
ArgumentError('--query parameter required')
sys.exit()
else:
spelunking.query = "search index={} {}".format(args.index_name,
args.query)
else:
ArgumentError('Unknown action required')
sys.exit()
spelunking.run()
现在有了这些参数,让我们深入研究负责处理用户请求的操作的类。这个类有四个参数,包括service
变量、用户指定的action
、Splunk 索引名和要使用的列。所有其他属性都设置为None
,如前一个代码块所示,如果提供了这些属性,将在执行时进行适当初始化。这样做是为了限制类所需的参数数量,并处理某些属性未使用的情况。所有这些属性都是在类的开头初始化的,以确保我们已经指定了默认值。
class Spelunking(object):
def __init__(self, service, action, index_name, cols):
self.service = service
self.action = action
self.index = index_name
self.file = None
self.query = None
self.sid = None
self.job = None
self.cols = cols
run()
方法负责使用get_or_create_index()
方法从 Splunk 实例获取index
对象。它还检查在命令行中指定的操作,并调用相应的类实例方法。
def run(self):
index_obj = self.get_or_create_index()
if self.action == 'index':
self.index_data(index_obj)
elif self.action == 'query':
self.query_index()
elif self.action == 'export':
self.export_report()
return
顾名思义,get_or_create_index()
方法首先测试指定的索引是否存在并与之建立连接,或者如果该名称未找到任何索引,则创建一个新索引。由于该信息作为类似字典的对象存储在service
变量的indexes
属性中,因此我们可以通过名称轻松测试索引的存在性。
def get_or_create_index(self):
# Create a new index
if self.index not in self.service.indexes:
return service.indexes.create(self.index)
else:
return self.service.indexes[self.index]
为了从一个文件(比如 CSV 文件)中摄取数据,我们可以使用一行语句以index_data()
方法向实例发送信息。此方法使用splunk_index
对象的upload()
方法将文件发送到 Splunk 以供摄取。虽然 CSV 文件是如何导入数据的一个简单示例,但我们也可以使用上一个秘籍中的一些逻辑将原始日志读取到 Splunk 实例中,而无需中间 CSV 步骤。为此,我们希望使用一种不同的index
对象方法,允许我们单独发送每个解析的事件。
def index_data(self, splunk_index):
splunk_index.upload(self.file)
query_index()
方法稍微复杂一些,因为我们首先需要修改用户提供的查询。如下面的代码片段所示,我们需要将用户指定的列添加到初始查询中。这将使查询中未使用的字段在导出阶段可用。在这个修改之后,我们使用service.jobs.create()
方法在 Splunk 系统中创建一个新作业,并记录查询 SID。此 SID 将在导出阶段用于导出特定查询作业的结果。我们打印这些信息,以及 Splunk 实例中作业过期之前的时间。默认情况下,此生存时间值为300
秒或五分钟。
def query_index(self):
self.query = self.query + "| fields + " + ", ".join(self.cols)
self.job = self.service.jobs.create(self.query, rf=self.cols)
self.sid = self.job.sid
print("Query job {} created. will expire in {} seconds".format(
self.sid, self.job['ttl']))
如前所述,export_report()
方法使用前面方法中提到的 SID 来检查作业是否完成,并检索数据进行导出。为了做到这一点,我们迭代了可用的作业,如果我们的作业不存在,则发出警告。如果找到作业,但is_ready()
方法返回False
,则该作业仍在处理中,尚未准备好导出结果。
def export_report(self):
job_obj = None
for j in self.service.jobs:
if j.sid == self.sid:
job_obj = j
if job_obj is None:
print("Job SID {} not found. Did it expire?".format(self.sid))
sys.exit()
if not job_obj.is_ready():
print("Job SID {} is still processing. "
"Please wait to re-run".format(self.sir))
如果作业通过了这两个测试,我们将从 Splunk 中提取数据,并使用write_csv()
方法将其写入 CSV 文件。在此之前,我们需要初始化一个列表来存储作业结果。接下来,我们检索结果,指定感兴趣的列,并将原始数据读入job_results
变量。幸运的是,splunklib
提供了一个ResultsReader
将job_results
变量转换为字典列表。我们反复浏览这个列表,并将每个词典添加到export_data
列表中。最后,我们提供要导出到 CSV 编写器的文件路径、列名和数据集。
export_data = []
job_results = job_obj.results(rf=self.cols)
for result in results.ResultsReader(job_results):
export_data.append(result)
self.write_csv(self.file, self.cols, export_data)
这个类中的write_csv()
方法是@staticmethod
。此装饰器允许我们在类中使用通用方法,而无需指定实例。毫无疑问,本书其他地方使用的方法看起来很熟悉,我们打开输出文件,创建一个DictWriter
对象,然后将列标题和数据写入文件。
@staticmethod
def write_csv(outfile, fieldnames, data):
with open(outfile, 'wb') as open_outfile:
csvfile = csv.DictWriter(open_outfile, fieldnames,
extrasaction="ignore")
csvfile.writeheader()
csvfile.writerows(data)
在我们假设的用例中,第一阶段将是索引 CSV 电子表格中包含的前一个秘籍的数据。如下面的代码片段所示,我们提供了来自上一个秘籍的 CSV 文件,并将其添加到 Splunk 索引中。接下来,我们查找用户代理为 iPhone 的所有条目。最后,最后一个阶段涉及获取查询的输出并创建 CSV 报告。
成功执行这三个命令后,我们可以打开并查看过滤后的输出:
这个脚本可以进一步改进。我们提供了一个或多个建议,如下所示:
- Python 的 splunkapi(通常)还有许多其他特性。此外,可以使用更高级的查询技术生成数据,我们可以将这些数据处理成图形,供技术和非技术最终用户使用。进一步了解 Splunk API 为您提供的许多功能。
难度:中等
Python 版本:3.5
操作系统:任何
操作系统日志通常反映系统上软件、硬件和服务的事件。这些细节可以帮助我们在调查事件时进行调查,例如使用可移动设备。在 macOS 系统上发现的daily.out
日志就是一个可以证明对识别此活动有用的日志示例。此日志记录了大量信息,包括连接到机器的驱动器以及每天可用和使用的存储量。虽然我们还可以从该日志中了解关机时间、网络状态和其他信息,但我们将重点关注驱动器随时间的使用情况。
此脚本中使用的所有库都存在于 Python 的标准库中。
此脚本将利用以下步骤:
- 设置参数以接受日志文件和写入报告的路径。
- 构建一个类来处理日志各个部分的解析。
- 创建一个方法来提取相关部分并将其传递给进一步处理。
- 从这些部分提取磁盘信息。
- 创建 CSV 编写器以导出提取的详细信息。
我们首先导入参数处理、日期解释和编写电子表格所需的库。在 Python 中处理文本文件的一个好处是,您很少需要第三方库。
from __future__ import print_function
from argparse import ArgumentParser, FileType
from datetime import datetime
import csv
此秘籍的命令行处理程序接受两个位置参数daily_out
和output_report
,分别表示 daily.out 日志文件的路径和 CSV 电子表格所需的输出路径。注意我们是如何通过argparse.FileType
类传递一个打开的文件对象进行处理的。接下来,我们用日志文件初始化ProcessDailyOut
类,调用run()
方法,并将返回的结果存储在parsed_events
变量中。然后我们调用write_csv()
方法,使用processor
类对象中定义的列将结果写入所需输出目录中的电子表格。
if __name__ == '__main__':
parser = ArgumentParser(
description=__description__,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument("daily_out", help="Path to daily.out file",
type=FileType('r'))
parser.add_argument("output_report", help="Path to csv report")
args = parser.parse_args()
processor = ProcessDailyOut(args.daily_out)
parsed_events = processor.run()
write_csv(args.output_report, processor.report_columns, parsed_events)
在ProcessDailyOut
类中,我们设置了用户提供的属性,并定义了用于报表的列。注意我们如何添加两组不同的列:disk_status_columns
和report_columns
。report_columns
只是disk_status_columns
,带有两个附加字段,用于标识输入日期和时区。
class ProcessDailyOut(object):
def __init__(self, daily_out):
self.daily_out = daily_out
self.disk_status_columns = [
'Filesystem', 'Size', 'Used', 'Avail', 'Capacity', 'iused',
'ifree', '%iused', 'Mounted on']
self.report_columns = ['event_date', 'event_tz'] + \
self.disk_status_columns
run()
方法首先迭代提供的日志文件。在从每行的开头和结尾去掉空白字符后,我们验证内容以识别分段中的中断。"-- End of daily output --"
字符串会打断日志文件中的每个条目。每个条目包含由新行分隔的多个数据段。出于这个原因,我们必须使用几个代码块来分别分割和处理每个部分。
在这个循环中,我们收集单个事件的所有行,并将其传递给process_event()
方法,并将处理后的结果附加到最终返回的parsed_events
列表中。
def run(self):
event_lines = []
parsed_events = []
for raw_line in self.daily_out:
line = raw_line.strip()
if line == '-- End of daily output --':
parsed_events += self.process_event(event_lines)
event_lines = []
else:
event_lines.append(line)
return parsed_events
在process_event()
方法中,我们将定义变量,这些变量将允许我们分割事件的各个部分以进行进一步处理。为了更好地理解下一段代码,请花点时间回顾以下事件示例:
在这个事件中,我们可以看到第一个元素是日期值和时区,后面是一系列小节。每个小节标题都是以冒号结尾的一行;我们使用它来分割这个文件中的各种数据元素,如下面的代码所示。我们创建了一个字典,event_data
,在进一步处理每个小节之前,使用小节标题作为键,并将其内容(如果存在)作为值。
def process_event(self, event_lines):
section_header = ""
section_data = []
event_data = {}
for line in event_lines:
if line.endswith(":"):
if len(section_data) > 0:
event_data[section_header] = section_data
section_data = []
section_header = ""
section_header = line.strip(":")
如果节标题行没有以冒号结尾,我们将检查该行中是否正好有两个冒号。如果是这样,我们将尝试验证这一行作为日期值。要使用内置库处理此日期格式,我们需要将时区与日期的其余部分分开提取,因为在 Python3 版本中存在一个已知的错误,该错误使用%Z
格式化程序解析时区。出于好奇,更多关于这个 bug 的信息可以在上找到 https://bugs.python.org/issue22377 。
为了将时区与日期值分开,我们在空间值上划界字符串,将时区值(本例中的元素4
)放在它自己的变量中,然后将剩余的时间值连接到一个新字符串中,我们可以用datetime
库解析该字符串。如果字符串没有最少的5
元素,则可能引发IndexError
,如果datetime
格式字符串无效,则可能引发ValueError
。如果未提出这些错误类型中的任何一种,我们将日期分配给event_data
字典。如果我们收到这些错误中的任何一个,该行将被追加到section_data
列表中,下一个循环迭代将继续。这一点很重要,因为一行可能包含两个冒号,而不是日期值,因此我们不希望在脚本中删除该行而忽略该行。
elif line.count(":") == 2:
try:
split_line = line.split()
timezone = split_line[4]
date_str = " ".join(split_line[:4] + [split_line[-1]])
try:
date_val = datetime.strptime(
date_str, "%a %b %d %H:%M:%S %Y")
except ValueError:
date_val = datetime.strptime(
date_str, "%a %b %d %H:%M:%S %Y")
event_data["event_date"] = [date_val, timezone]
section_data = []
section_header = ""
except ValueError:
section_data.append(line)
except IndexError:
section_data.append(line)
此条件的最后一部分将任何包含内容的行追加到section_data
变量,以便根据需要进行进一步处理。这可以防止空行进入,并允许我们捕获两个节标题之间的所有信息。
else:
if len(line):
section_data.append(line)
我们通过调用任何子处理器来关闭此函数。此时,我们仅使用process_disk()
方法处理磁盘信息部分,尽管可以开发代码来提取其他感兴趣的值。此方法接受事件信息和事件日期作为其输入。磁盘信息作为已处理磁盘信息元素的列表返回,我们返回到run()
方法,并将值添加到已处理事件列表中。
return self.process_disk(event_data.get("Disk status", []),
event_data.get("event_date", []))
为了处理磁盘子部分,我们遍历每一行(如果有),并提取相关的事件信息。for
循环首先检查迭代次数并跳过第 0 行,因为它包含数据的列标题。对于任何其他行,我们使用列表理解,在单个空格上拆分行,去掉空白,并过滤掉任何空白字段。
def process_disk(self, disk_lines, event_dates):
if len(disk_lines) == 0:
return {}
processed_data = []
for line_count, line in enumerate(disk_lines):
if line_count == 0:
continue
prepped_lines = [x for x in line.split(" ")
if len(x.strip()) != 0]
接下来,我们初始化一个字典disk_info
,它保存事件信息以及该快照的日期和时区详细信息。for
循环使用enumerate()
函数将值映射到它们的列名。如果列名包含"/Volumes/"
(驱动器卷的标准装载点),我们将加入其余拆分项。这可确保名称中带有空格的卷得到适当保留。
disk_info = {
"event_date": event_dates[0],
"event_tz": event_dates[1]
}
for col_count, entry in enumerate(prepped_lines):
curr_col = self.disk_status_columns[col_count]
if "/Volumes/" in entry:
disk_info[curr_col] = " ".join(
prepped_lines[col_count:])
break
disk_info[curr_col] = entry.strip()
最里面的for
循环通过将磁盘信息附加到processed_data
列表中而结束。处理完 disk 部分中的所有行后,我们将processed_data
列表返回给父函数。
processed_data.append(disk_info)
return processed_data
最后,我们简要介绍了write_csv()
方法,该方法使用DictWriter
类打开文件,并将标题行和内容写入 CSV 文件。
def write_csv(outfile, fieldnames, data):
with open(outfile, 'w', newline="") as open_outfile:
csvfile = csv.DictWriter(open_outfile, fieldnames)
csvfile.writeheader()
csvfile.writerows(data)
运行此脚本时,我们可以在 CSV 报告中看到提取的详细信息。此输出的示例如下所示:
食谱难度:简单
Python 版本:2.7
操作系统:任何
使用我们刚刚开发的解析 macOSdaily.out
日志的代码,我们将此功能添加到 Axiom 中,Axiom 由磁铁取证开发,用于自动提取这些事件。由于 Axiom 支持取证图像和松散文件的处理,因此我们可以为其提供完整的采集或仅导出本例中的daily.out
日志。通过该工具提供的 API,我们可以访问和处理其引擎找到的文件,并直接在 Axiom 中返回结果供审查。
Magnet 取证团队为 Python 和 XML 开发了一个 API,以添加对在 Axiom 中创建自定义工件的支持。在撰写本书之际,Python API 只能通过IronPython
运行 Python 2.7 版才能获得。虽然我们已经在这个平台之外开发了我们的代码,但我们可以按照本秘籍中列出的步骤轻松地将其集成到 Axiom 中。我们使用 Axiom 版本 1.1.3.5726 测试和开发该秘籍。
我们首先需要在 Windows 实例中安装 Axiom,并确保我们的代码稳定且可移植。此外,我们的代码需要沙箱友好。Axiom 沙箱限制了第三方库的使用和对某些 Python 模块和函数的访问,这些模块和函数可能会导致代码在应用程序外部与系统交互。出于这个原因,我们设计了daily.out
解析器,只使用沙箱中安全的内置库来演示使用这些自定义构件进行开发的容易性。
为了开发和实现定制工件,我们需要:
- 在 Windows 计算机上安装 Axiom in。
- 导入我们开发的脚本。
- 创建
Artifact
类并定义解析器元数据和列。 - 开发
Hunter
类来处理工件处理和结果报告。
对于这个脚本,我们导入axiom
库和 datetime 库。请注意,我们已经删除了之前的argparse
和csv
导入,它们在这里是不必要的。
from __future__ import print_function
from axiom import *
from datetime import datetime
接下来,我们必须粘贴上一个秘籍中的ProcessDailyOut
类,不包括write_csv
或参数处理代码,以便在这个脚本中使用。由于当前版本的 API 不允许导入,我们必须将所有需要的代码捆绑到一个脚本中。为了节省页面并避免冗余,我们将省略本节中的代码块(尽管它存在于本章附带的代码文件中)。
下一个类是DailyOutArtifact
,它是 Axiom API 提供的Artifact
类的子类。在GetName()
方法中定义插件名称之前,我们调用AddHunter()
方法,提供我们(尚未显示)的hHunter
类。
class DailyOutArtifact(Artifact):
def __init__(self):
self.AddHunter(DailyOutHunter())
def GetName(self):
return 'daily.out parser'
此类的最后一个方法CreateFragments()
指定如何处理已处理的 daily.out 日志结果的单个条目。就 Axiom API 而言,片段是用于描述工件的单个条目的术语。此代码块允许我们添加自定义列名,并为这些列指定适当的类别和数据类型。类别包括日期、位置和工具定义的其他特殊值。我们的工件的大多数列将在None
类别中,因为它们不显示特定类型的信息。
一个重要的分类差异是DateTimeLocal
与DateTime
:DateTime
将日期作为 UTC 值呈现给用户,因此我们需要有意识地选择合适的日期类别。因为我们从 daily.out 日志条目中提取了时区,所以我们在这个秘籍中使用了DateTimeLocal
类别。FragmentType
属性是所有值的字符串,因为该类不会将字符串中的值转换为其他数据类型。
def CreateFragments(self):
self.AddFragment('Snapshot Date - LocalTime (yyyy-mm-dd)',
Category.DateTimeLocal, FragmentType.DateTime)
self.AddFragment('Snapshot Timezone', Category.None,
FragmentType.String)
self.AddFragment('Volume Name',
Category.None, FragmentType.String)
self.AddFragment('Filesystem Mount',
Category.None, FragmentType.String)
self.AddFragment('Volume Size',
Category.None, FragmentType.String)
self.AddFragment('Volume Used',
Category.None, FragmentType.String)
self.AddFragment('Percentage Used',
Category.None, FragmentType.String)
下一节课是我们的Hunter
。这个父类用于运行处理代码,正如您将看到的,它指定了 Axiom 引擎将提供给插件的平台和内容。在这个例子中,我们只想在计算机平台和一个只有一个名字的文件上运行它。RegisterFileName()
方法是指定插件将请求哪些文件的几个选项之一。我们还可以使用正则表达式或文件扩展名来选择要处理的文件。
class DailyOutHunter(Hunter):
def __init__(self):
self.Platform = Platform.Computer
def Register(self, registrar):
registrar.RegisterFileName('daily.out')
Hunt()
方法是魔法发生的地方。首先,我们获得一个临时路径,在该路径中可以在沙箱中读取文件,并将其分配给temp_daily_out
变量。有了这个打开的文件,我们将 file 对象交给ProcessDailyOut
类,并使用run()
方法解析该文件,就像上一个秘籍一样。
def Hunt(self, context):
temp_daily_out = open(context.Searchable.FileCopy, 'r')
processor = ProcessDailyOut(temp_daily_out)
parsed_events = processor.run()
在收集解析的事件信息之后,我们准备将数据“发布”到软件并将其显示给用户。在for
循环中,我们首先启动Hit()
对象,使用AddValue()
方法向新片段添加数据。一旦我们将事件值分配给命中,我们将使用PublishHit()
方法将命中发布到平台,并继续循环,直到所有解析的事件都已发布:
for entry in parsed_events:
hit = Hit()
hit.AddValue(
"Snapshot Date - LocalTime (yyyy-mm-dd)",
entry['event_date'].strftime("%Y-%m-%d %H:%M:%S"))
hit.AddValue("Snapshot Timezone", entry['event_tz'])
hit.AddValue("Volume Name", entry['Mounted on'])
hit.AddValue("Filesystem Mount", entry["Filesystem"])
hit.AddValue("Volume Size", entry['Size'])
hit.AddValue("Volume Used", entry['Used'])
hit.AddValue("Percentage Used", entry['Capacity'])
self.PublishHit(hit)
最后一位代码检查文件是否为None
,如果不是,将关闭它。这是处理代码的结尾,如果在系统上发现另一个daily.out
文件,可能会再次调用!
if temp_daily_out is not None:
temp_daily_out.close()
最后一行记录了我们在 Axiom 引擎中的辛勤工作,以确保框架包含并调用它。
RegisterArtifact(DailyOutArtifact())
要在 Axiom 中使用新开发的工件,我们需要采取更多步骤来导入代码并针对图像运行代码。首先,我们需要启动 Axiom 过程。在这里,我们将根据提供的证据加载、选择和运行工件。在工具菜单下,我们选择管理自定义工件选项:
在“管理自定义工件”窗口中,我们将看到任何现有的自定义工件,并可以导入新工件,如下所示:
我们将添加自定义工件,更新的“管理自定义工件”窗口应显示工件的名称:
现在我们可以按 OK 继续 Axiom,添加证据并配置处理选项。当我们到达 ComputerArtifacts 选择时,我们希望确认选择了 CustomArtifacts 来运行。不用说,我们应该只在机器运行 macOS 或有 macOS 分区的情况下运行这个工件:
完成其余配置选项后,我们可以开始处理证据。处理完成后,我们运行 Axiom EXPENCE 来查看处理结果。如下面的屏幕截图所示,我们可以导航到 ArtifactReview 的自定义窗格,并查看插件中解析的列!这些列可以使用 Axiom 中的标准选项进行排序和导出,无需我们提供任何额外代码:
难度:中等
Python 版本:3.5
操作系统:任何
作为奖励部分,我们将利用强大的另一个递归算法(YARA)正则表达式引擎来扫描感兴趣的文件和泄露指标。YARA 是一个模式匹配实用程序,设计用于恶意软件识别和事件响应。许多工具将此引擎用作识别可能的恶意文件的主干。通过这个食谱,我们学习如何获取 YARA 规则,编译它们,并在一个或多个文件夹或文件中匹配它们。虽然我们将不介绍形成 YARA 规则所需的步骤,但您可以从的文档中了解更多有关该过程的信息 http://yara.readthedocs.io/en/latest/writingrules.html 。
此秘籍需要安装第三方库yara
。此脚本中使用的所有其他库都存在于 Python 的标准库中。此库可与pip
一起安装:
pip install yara-python==3.6.3
To learn more about the yara-python
library, visit https://yara.readthedocs.io/en/latest/.
我们也可以使用 YaraRules(等项目 http://yararules.com )并使用行业和 VirusShare(中预先构建的规则 http://virusshare.com 使用真实的恶意软件样本进行分析。
该脚本有四个主要的开发步骤:
- 建立并编译 YARA 规则。
- 扫描单个文件。
- 遍历目录以处理单个文件。
- 将结果导出到 CSV。
该脚本导入所需的库来处理参数解析、文件和文件夹迭代、编写 CSV 电子表格,以及yara
库来编译和扫描 YARA 规则。
from __future__ import print_function
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import os
import csv
import yara
此秘籍的命令行处理程序接受两个位置参数yara_rules
和path_to_scan
,这两个参数分别表示 YARA 规则的路径和要扫描的文件或文件夹。此秘籍还接受一个可选参数output
,如果提供,它会将扫描结果写入电子表格,而不是控制台。最后,我们将这些值传递给main()
方法。
if __name__ == '__main__':
parser = ArgumentParser(
description=__description__,
formatter_class=ArgumentDefaultsHelpFormatter,
epilog="Developed by {} on {}".format(
", ".join(__authors__), __date__)
)
parser.add_argument(
'yara_rules',
help="Path to Yara rule to scan with. May be file or folder path.")
parser.add_argument(
'path_to_scan',
help="Path to file or folder to scan")
parser.add_argument(
'--output',
help="Path to output a CSV report of scan results")
args = parser.parse_args()
main(args.yara_rules, args.path_to_scan, args.output)
在main()
函数中,我们接受yara
规则的路径、要扫描的文件或文件夹以及输出文件(如果有)。由于yara
规则可以是文件或目录,因此我们使用ios.isdir()
方法来确定是在整个目录上使用compile()
方法,还是在输入是文件的情况下,使用filepath
关键字将其传递给方法。compile()
方法读取一个或多个规则文件,并创建一个可以与扫描对象匹配的对象。
def main(yara_rules, path_to_scan, output):
if os.path.isdir(yara_rules):
yrules = yara.compile(yara_rules)
else:
yrules = yara.compile(filepath=yara_rules)
一旦规则被编译,我们将执行类似的if-else
语句来处理要扫描的路径。如果要扫描的输入是目录,我们将其传递给process_directory()
函数,否则,我们使用process_file()
方法。两者都使用已编译的 YARA 规则和路径来扫描并返回包含任何匹配项的词典列表。
if os.path.isdir(path_to_scan):
match_info = process_directory(yrules, path_to_scan)
else:
match_info = process_file(yrules, path_to_scan)
正如您可能猜到的,如果指定了输出路径,我们将使用我们在columns
列表中定义的列,最终将此字典列表转换为 CSV 报告。但是,如果输出参数为None
,我们将以不同的格式将此数据写入控制台。
columns = ['rule_name', 'hit_value', 'hit_offset', 'file_name',
'rule_string', 'rule_tag']
if output is None:
write_stdout(columns, match_info)
else:
write_csv(output, columns, match_info)
process_directory()
函数基本上迭代一个目录,并将每个文件传递给process_file()
函数。这会减少脚本中的冗余代码量。由于返回的对象是一个列表,因此返回的每个已处理条目都会添加到match_info
列表中。处理完每个文件后,将结果的完整列表返回给父函数。
def process_directory(yrules, folder_path):
match_info = []
for root, _, files in os.walk(folder_path):
for entry in files:
file_entry = os.path.join(root, entry)
match_info += process_file(yrules, file_entry)
return match_info
process_file()
方法与yrules
对象的match()
方法一起使用。返回的 match 对象是一个 iterable,包含一个或多个违反规则的命中。从命中中,我们可以提取规则名称、任何标记、文件中的偏移量、规则的字符串值和命中的字符串值。此信息加上文件路径将在报告中形成一个条目。总的来说,这些信息有助于确定命中是假阳性还是有意义。当微调 YARA 规则以确保仅提供相关结果供审查时,这也会有所帮助。
def process_file(yrules, file_path):
match = yrules.match(file_path)
match_info = []
for rule_set in match:
for hit in rule_set.strings:
match_info.append({
'file_name': file_path,
'rule_name': rule_set.rule,
'rule_tag': ",".join(rule_set.tags),
'hit_offset': hit[0],
'rule_string': hit[1],
'hit_value': hit[2]
})
return match_info
若用户未指定输出文件,则 Towrite_stdout()
功能报告匹配信息至控制台。我们遍历match_info
列表中的每个条目,并以冒号分隔、换行分隔的格式打印match_info
字典中的每个列名及其值。在每个条目之后,我们打印30
等号,以直观地将条目彼此分开。
def write_stdout(columns, match_info):
for entry in match_info:
for col in columns:
print("{}: {}".format(col, entry[col]))
print("=" * 30)
write_csv()
方法遵循标准约定,使用DictWriter
类将标题和所有数据写入工作表。请注意,该函数是如何调整的,以使用'w'
模式和newline
参数处理 Python 3 中的 CSV 写入。
def write_csv(outfile, fieldnames, data):
with open(outfile, 'w', newline="") as open_outfile:
csvfile = csv.DictWriter(open_outfile, fieldnames)
csvfile.writeheader()
csvfile.writerows(data)
使用此代码,我们可以在命令行中提供适当的参数,并生成任何匹配项的报告。以下屏幕截图显示了用于检测 Python 文件和键盘记录器的自定义规则:
这些规则显示在输出 CSV 报告中,如果未指定报告,则显示在控制台中,如下所示: