-
Notifications
You must be signed in to change notification settings - Fork 5
Plugins
SyzBridge provides a comprehensive interface for reproducing upstream kernel bugs. It can easily integrate with existing bug assessment tools as standalone plugins of SyzBridge.
In fact, all the current functionalities of SyzBridge (reproducing bugs on downstream, extracting kernel trace, automatically tuning the original PoCs, etc.) were implemented as one or more plugins. These plugins can share data and collaborate with each other. SyzBridge also provides rich APIs for common functionalities, such as launching VMs or detecting kernel crashes, among others.
-
RawBugReproduce:
RawBugReproduce
runs upstream PoC on downstream, without any adaptation, and monitors kernel crashes if appear. -
SyzFeatureMinimize:
SyzFeatureMinimize
minimizes the testcase features. Those features control how PoC works, for example, repeat, sandbox. Minimizing those features provides the maximum capability to accommodate different downstream distro environments. -
TraceAnalysis:
TraceAnalysis
extracts PoC execution traces from upstream and downstream distro. Those traces are later fed toModulesAnalysis
for further analysis. -
ModulesAnalysis:
ModulesAnalysis
takes the kernel traces fromTraceAnalysis
, and compares the upstream trace with the downstream traces. The differences in the two traces reflect the potential missing modules. -
BugReproduce:
BugReproduce
takes the results from previous plugins, and makes adaptations to the upstream PoC and then tests it on downstream.
A blank plugin the following entries:
-
prepare()
is a common entry to prepare a plugin. It has no arguments, users are able to pass customized arguments through SyzBridge configuration file.prepare()
function can pass arguments toprepare_on_demand()
if necessary. -
prepare_on_demand()
is used to set up variables and conditions. It's also handy when you are testing a plugin by passing the arguments directly to the plugin without a config file. -
success()
can only be called once for each case. It makes the current case move to thesucceed
category. -
run()
is the main entry of a plugin. It's where you implement the main functionality of your plugin. It returnsTrue
to create a success stamp, otherwise, no stamp will be created. -
generate_report()
executes afterrun()
returnsTrue
. -
cleanup()
executes after plugin is finished.
from plugins import AnalysisModule
class Template(AnalysisModule):
NAME = "Template"
REPORT_START = "======================Template Report======================"
REPORT_END = "==================================================================="
REPORT_NAME = "Report_Template"
DEPENDENCY_PLUGINS = []
def __init__(self):
super().__init__()
def prepare(self):
plugin = self.cfg.get_plugin(self.NAME)
if plugin == None:
self.err_msg("No such plugin {}".format(self.NAME))
try:
self.greeting = int(plugin.greeting)
except AttributeError:
self.err_msg("Failed to get greeting")
return False
return self.prepare_on_demand()
def prepare_on_demand(self):
self._prepared = True
return True
def success(self):
return self._move_to_success
def run(self):
"""
do something
True: plugin return successfully
False: something goes wrong, stamp will not be created
"""
self.logger.info("Hello you, {}".format(self.greeting))
return True
def generate_report(self):
final_report = "\n".join(self.report)
self.info_msg(final_report)
self._write_to(final_report, self.REPORT_NAME)
def _write_to(self, content, name):
file_path = "{}/{}".format(self.path_case_plugin, name)
super()._write_to(content, file_path)
def cleanup(self):
super().cleanup()
This is a tutorial for writing a bug bisection plugin. This plugin tests whether a bug is reproducible on a list of decreasing kernel versions (e.g., v5.7, v5.6, v5.5...). If the bug fails to reproduce on a certain kernel version, it often indicates that the vulnerable code was introduced later (e.g., if the bug reproduced on v5.7 and v5.6 but failed on v5.5, it means the buggy commit appeared between v5.5 and v5.6).
The plugin folder contains at least two files: __init__.py
and name_of_the_plugin.py
__init__.py
contains a DESCRIPTION
, ENABLE
, and AS_SERVICE
.
DESCRIPTION
is the plugin description displayed in the help information of SyzBridge. ENABLE
determines whether the plugin is enabled. Enabling it doesn't automatically apply the plugin to the workflow; users must pass the argument --name-of-the-plugin
to use the plugin. For more details, refer to Run-SyzBridge. AS_SERVICE
determines whether to treat the plugin as a service. A service plugin can't be invoked through the command line and is often used by other plugins as a library. For example, SyzkallerInterface plugin provides SyzkallerInterface
class for use by other plugins.
# __init__.py
from .bug_bisection import BugBisection
DESCRIPTION = "BugBisection is a plugin that determines the first buggy kernel version"
ENABLE = True
AS_SERVICE = False
Now, we start writing the main functions of the plugin.
In prepare()
function, we want to retrieve the repro_timeout
, how many attempt
, and a kernel_version
list.
def prepare(self):
plugin = self.cfg.get_plugin(self.NAME)
if plugin == None:
self.err_msg("No such plugin {}".format(self.NAME))
try:
self.repro_timeout = int(plugin.timeout)
except AttributeError:
self.err_msg("Failed to get timeout")
return False
try:
self.repro_attempt = int(plugin.attempt)
except AttributeError:
self.repro_attempt = 3
try:
self.kernel_version = plugin.kernel_version
except AttributeError:
return False
return self.prepare_on_demand()
In the configuration file, the plugin config is defined as follows:
"BugBisection": {
"kernel_version": {"e5fd3e6": ["v5.7", "v5.6", "v5.5", "v5.4", "v5.3", "v5.2", "v5.1", "v5.0"]},
"timeout": 300,
"attempt": 10
}
Next, we write the main entry function, run()
.
The first step is to determine whether the PoC is 32-bit or 64-bit. Then, we iterate through the kernel version from the kernel_version
list. For each kernel version, we call build_upstream_kernel
to compile the Linux kernel and then reproduce the bug in test_poc
.
def run(self):
i386 = False
if regx_match(r'386', self.case["manager"]):
i386 = True
for version in self.kernel_version[self.case_hash]:
self.logger.info("Now testing {}".format(version))
if self.build_upstream_kernel(kernel_version=version) != 0:
self.err_msg("Failed to build upstream kernel")
return False
if not self.test_poc(i386=i386, version=version):
self.report.append("Bug doesn't reproduce on {}".format(version))
return True
self.report.append("Bug triggered on {}".format(version))
return True
Compiling the Linux kernel is straightforward. SyzBridge provides an API called build_mainline_kernel
for building the Linux kernel. We specify the desired kernel version in the commit
argument while preserving the original kernel configuration by setting keep_ori_config
to True
. For more information about the other arguments of build_mainline_kernel
, please refer to the API Reference
def build_upstream_kernel(self, kernel_version):
if self._check_stamp("BUILD_KERNEL"):
self._remove_stamp("BUILD_KERNEL")
ret = self.build_mainline_kernel(commit=kernel_version, keep_ori_config=True)
if ret == 0:
self._create_stamp("BUILD_SYZ_FEATURE_MINIMIZE_KERNEL")
return ret
We want to run the PoC on an upstream kernel in a virtual machine and monitor for kernel crashes if they occur. Fortunately, SyzBridge takes care of most of these steps.
First, we obtain the kernel from Config().get_kernel_by_name()
. The self.kernel
variable is derived from self.case['kernel']
, which ultimately originates from Syzbot. Config().get_kernel_by_name()
returns a Vendor()
class. This Vendor()
class contains a Reproducer()
class that simplifies bug reproduction.
The reproduce()
function requires several arguments. func
is the callback function that the virtual machine will eventually invoke after it boots up. By using this callback function, you can determine what to do once the VM is ready, how to reproduce the bug, or perform any necessary preparations. func_args
passes the callback function arguments to the callback function. vm_tag
isn't mandatory, but it helps with identification. timeout
specifies the maximum running time for the VM, which will be terminated if the time limit is reached. attempt
indicates the number of rounds of reproduction needed. If root
is set to true, it signifies reproducing the PoC as the root user. work_dir
is the working directory where the reproducing log and VM log are saved. c_hash
should be the bug hash value and is used in multiple places, such as in the log file name.
The return values of reproduce()
include the crash context, triggerability status, and any extra output, if available.
def test_poc(self, i386, version):
upstream = self.cfg.get_kernel_by_name(self.kernel)
if upstream == None:
self.logger.exception("Fail to get {} kernel".format(self.kernel))
return False
upstream.repro.init_logger(self.logger)
_, triggered, _ = upstream.repro.reproduce(func=self._capture_crash, func_args=(i386,), vm_tag='test {}'.format(version),\
timeout=self.repro_timeout + 100, attempt=self.repro_attempt, root=True, work_dir=self.path_case_plugin, c_hash=self.case_hash)
self.info_msg("crash triggered: {}".format(triggered))
return triggered
The final missing piece is the callback function _capture_crash()
. The callback function for reproduce()
has two mandatory arguments: qemu
, which is an object of the VM()
class, granting the user the capability to operate the VM (e.g., uploading/downloading files to/from the VM), and the second argument root
, which indicates whether the PoC is being reproduced by the root user. The following arguments are customized; here, we only pass i386
to the callback function.
To run the PoC, we upload the PoC source code into the VM, compile the source code into a Linux executable binary, and ultimately execute the PoC program. It's important to note that you don't need to worry about how to capture the kernel crash; SyzBridge takes care of this for you. Normally, SyzBridge captures every kernel crash report, but you can also specify which crashes to capture or exclude by setting the include
and exclude
options in the configuration file. See more details here
def _capture_crash(self, qemu: VM, root: bool, i386: bool):
qemu.upload(user='root', src=["{}/poc.c".format(self.path_case_plugin)], dst="~/poc.c", wait=True)
if i386:
qemu.command(cmds="gcc -m32 -pthread -o poc poc.c", user="root", wait=True)
else:
qemu.command(cmds="gcc -pthread -o poc poc.c", user="root", wait=True)
qemu.command(cmds="./poc", user="root", wait=True, timeout=self.repro_timeout)
return
The final plugin can be found at here