Plugins
ExpBridge provides a comprehensive interface for reproducing upstream kernel bugs. Existing bug assessment tools can be integrated easily, each as a standalone ExpBridge plugin.
In fact, all current functionality of ExpBridge (reproducing bugs on downstream, extracting kernel traces, automatically tuning the original PoCs, etc.) is implemented as one or more plugins. Plugins can share data and collaborate with each other. ExpBridge also provides a rich API for common functionality, such as launching VMs and detecting kernel crashes.
- RawBugReproduce: takes the upstream PoC to downstream. Without any adaptation, RawBugReproduce runs the PoC on the downstream distro and monitors for kernel crashes.
- SyzFeatureMinimize: minimizes the features required by the testcase. Those features normally control how the PoC works, for example, repeat and sandbox. Minimizing the features maximizes the PoC's ability to accommodate different downstream distro environments.
- TraceAnalysis: extracts PoC execution traces from the upstream and downstream distros. Those traces are later fed to ModulesAnalysis for further analysis.
- ModulesAnalysis: takes the kernel traces from TraceAnalysis and compares the upstream trace with the downstream traces. The differences between the traces reflect potentially missing modules.
- BugReproduce: takes the results from the previous plugins, adapts the upstream PoC accordingly, and tests it on downstream distros.
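Because some plugins consume the output of others, they have to run in an order that respects those dependencies. The sketch below is only an illustration of that idea: the dependency edges are assumptions read off the descriptions above (the page confirms only that ModulesAnalysis consumes TraceAnalysis output), and ExpBridge's actual scheduling is not shown here.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Assumed dependency edges: plugin name -> plugins it needs results from.
# Only the ModulesAnalysis -> TraceAnalysis edge is stated in the text;
# the BugReproduce edges are a guess at "results from the previous plugins".
DEPENDENCIES = {
    "RawBugReproduce": set(),
    "SyzFeatureMinimize": set(),
    "TraceAnalysis": set(),
    "ModulesAnalysis": {"TraceAnalysis"},
    "BugReproduce": {"SyzFeatureMinimize", "ModulesAnalysis"},
}

# static_order() yields every plugin after all of its dependencies.
order = list(TopologicalSorter(DEPENDENCIES).static_order())
print(order)
```

Any order produced this way places TraceAnalysis before ModulesAnalysis, and both before BugReproduce.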
A blank plugin has the following entries:
- prepare() is the common entry point for preparing a plugin. It takes no arguments; users pass customized arguments through the ExpBridge configuration file. prepare() can pass arguments on to prepare_on_demand() if necessary.
- prepare_on_demand() sets up variables and conditions. It is also handy when testing a plugin, because you can pass arguments to it directly without a config file.
- success() can be called only once per case. It moves the current case to the succeed category.
- run() is the main entry point of a plugin, where you implement its main functionality. Returning True creates a success stamp; otherwise no stamp is created.
- generate_report() executes after run() returns True.
- cleanup() executes after the plugin has finished.
from plugins import AnalysisModule

class Template(AnalysisModule):
    NAME = "Template"
    REPORT_START = "======================Template Report======================"
    REPORT_END = "==================================================================="
    REPORT_NAME = "Report_Template"
    DEPENDENCY_PLUGINS = []

    def __init__(self):
        super().__init__()

    def prepare(self):
        plugin = self.cfg.get_plugin(self.NAME)
        if plugin is None:
            self.err_msg("No such plugin {}".format(self.NAME))
            return False
        try:
            self.greeting = int(plugin.greeting)
        except AttributeError:
            self.err_msg("Failed to get greeting")
            return False
        return self.prepare_on_demand()

    def prepare_on_demand(self):
        self._prepared = True
        return True

    def success(self):
        return self._move_to_success

    def run(self):
        """
        do something
        True: plugin return successfully
        False: something goes wrong, stamp will not be created
        """
        self.logger.info("Hello you, {}".format(self.greeting))
        return True

    def generate_report(self):
        final_report = "\n".join(self.report)
        self.info_msg(final_report)
        self._write_to(final_report, self.REPORT_NAME)

    def _write_to(self, content, name):
        file_path = "{}/{}".format(self.path_case_plugin, name)
        super()._write_to(content, file_path)

    def cleanup(self):
        super().cleanup()
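The order in which these entry points run can be pictured with a small stand-alone driver. This is a sketch inferred from the descriptions of the entries, not ExpBridge code: `run_plugin` and `DemoPlugin` are hypothetical names, and the real framework also handles stamps, logging, and configuration.

```python
class DemoPlugin:
    """Hypothetical stand-in exposing the same entry points as a plugin."""

    def __init__(self, run_result=True):
        self.run_result = run_result
        self.calls = []

    def prepare(self):
        self.calls.append("prepare")
        return True

    def run(self):
        self.calls.append("run")
        return self.run_result

    def generate_report(self):
        self.calls.append("generate_report")

    def cleanup(self):
        self.calls.append("cleanup")


def run_plugin(plugin):
    """Hypothetical driver: call the entry points in the documented order."""
    ok = False
    try:
        if plugin.prepare():
            ok = bool(plugin.run())
            if ok:
                # The report is generated only after run() returns True.
                plugin.generate_report()
    finally:
        # Assumption: cleanup() runs once the plugin is finished, pass or fail.
        plugin.cleanup()
    return ok


good = DemoPlugin(run_result=True)
bad = DemoPlugin(run_result=False)
good_ok = run_plugin(good)
bad_ok = run_plugin(bad)
print(good_ok, good.calls)
print(bad_ok, bad.calls)
```

When `run()` returns False, the sketch skips `generate_report()` but still calls `cleanup()`.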
This is a tutorial for writing a bug bisection plugin. The plugin tests whether a bug is reproducible on a list of kernel versions in descending order (e.g., v5.7, v5.6, v5.5, ...). If the bug fails to reproduce on a certain kernel version, it often indicates that the bug was introduced after that version. For example, if the bug reproduces on v5.7 and v5.6 but fails on v5.5, the buggy commit was likely introduced between v5.5 and v5.6.
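The idea behind this scan can be sketched as a pure function, independent of ExpBridge. `find_introducing_range` and the `reproduces` callback are hypothetical names used only for illustration; in the real plugin, the callback corresponds to building a kernel and running the PoC in a VM.

```python
from typing import Callable, Optional, Sequence, Tuple


def find_introducing_range(
    versions: Sequence[str],
    reproduces: Callable[[str], bool],
) -> Optional[Tuple[str, str]]:
    """Scan versions from newest to oldest.

    Returns (last_good, first_bad): the bug does not reproduce on last_good
    but does on first_bad, so it was likely introduced in between.
    Returns None if no such boundary is found.
    """
    previous = None
    for version in versions:
        if not reproduces(version):
            if previous is None:
                # Even the newest version does not reproduce: no boundary.
                return None
            return (version, previous)
        previous = version
    # The bug reproduced on every version in the list.
    return None


buggy = {"v5.7", "v5.6"}
boundary = find_introducing_range(
    ["v5.7", "v5.6", "v5.5", "v5.4"], lambda v: v in buggy
)
print(boundary)  # ('v5.5', 'v5.6')
```

A linear scan is used here because it mirrors the plugin; a binary search would need fewer kernel builds but assumes the bug, once introduced, reproduces reliably on every later version.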
The plugin folder contains at least two files: __init__.py and name_of_the_plugin.py.
__init__.py defines DESCRIPTION, ENABLE, and AS_SERVICE:
- DESCRIPTION is the description of the plugin shown in the help information of ExpBridge.
- ENABLE decides whether the plugin is enabled. Enabling a plugin does not automatically apply it to the workflow; users have to pass the argument --name-of-the-plugin to actually use it. See more details in Run-ExpBridge.
- AS_SERVICE determines whether to treat the plugin as a service. A service plugin can't be invoked from the command line; it is typically used by other plugins as a library. For example, the SyzkallerInterface plugin provides the SyzkallerInterface class to other plugins.
# __init__.py
from .bug_bisection import BugBisection
DESCRIPTION = "BugBisection is a plugin that determines the first buggy kernel version"
ENABLE = True
AS_SERVICE = False
Now we start writing the main functions of the plugin.
In prepare(), we retrieve the reproduction timeout (repro_timeout), the number of reproduction attempts (repro_attempt), and the kernel_version list.
def prepare(self):
    plugin = self.cfg.get_plugin(self.NAME)
    if plugin is None:
        self.err_msg("No such plugin {}".format(self.NAME))
        return False
    try:
        self.repro_timeout = int(plugin.timeout)
    except AttributeError:
        self.err_msg("Failed to get timeout")
        return False
    try:
        self.repro_attempt = int(plugin.attempt)
    except AttributeError:
        # attempt is optional; fall back to 3 rounds
        self.repro_attempt = 3
    try:
        self.kernel_version = plugin.kernel_version
    except AttributeError:
        return False
    return self.prepare_on_demand()
In the configuration file, the plugin config is defined as follows:
"BugBisection": {
    "kernel_version": {"e5fd3e6": ["v5.7", "v5.6", "v5.5", "v5.4", "v5.3", "v5.2", "v5.1", "v5.0"]},
    "timeout": 300,
    "attempt": 10
}
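To make the relationship between this config entry and prepare() concrete, here is a stand-alone sketch that reads the same fields from JSON text. It makes two assumptions: the fragment is wrapped in a top-level JSON object (the full ExpBridge config layout is not shown on this page), and `load_bisection_settings` is a hypothetical helper, not an ExpBridge function. The key under kernel_version is taken to be the case hash, since run() indexes the setting with self.case_hash.

```python
import json

# Assumed wrapper: the fragment above placed inside a top-level object.
CONFIG_TEXT = """
{
    "BugBisection": {
        "kernel_version": {"e5fd3e6": ["v5.7", "v5.6", "v5.5"]},
        "timeout": 300,
        "attempt": 10
    }
}
"""


def load_bisection_settings(text, case_hash):
    """Hypothetical helper mirroring the checks made in prepare()."""
    section = json.loads(text).get("BugBisection")
    if section is None:
        raise KeyError("No such plugin BugBisection")
    if "timeout" not in section:
        raise KeyError("Failed to get timeout")
    return {
        "timeout": int(section["timeout"]),
        # attempt is optional and defaults to 3, as in prepare().
        "attempt": int(section.get("attempt", 3)),
        # Versions are looked up per case hash; empty if the case is absent.
        "versions": section["kernel_version"].get(case_hash, []),
    }


settings = load_bisection_settings(CONFIG_TEXT, "e5fd3e6")
print(settings)
```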
Next, we write the main entry point, the run() function.
First, we decide whether the PoC is 32-bit or 64-bit. Then we iterate over the kernel versions in the kernel_version list. For each kernel version, we call build_upstream_kernel to compile the Linux kernel and test_poc to run the PoC on it.
def run(self):
    i386 = False
    if regx_match(r'386', self.case["manager"]):
        i386 = True
    for version in self.kernel_version[self.case_hash]:
        self.logger.info("Now testing {}".format(version))
        if self.build_upstream_kernel(kernel_version=version) != 0:
            self.err_msg("Failed to build upstream kernel")
            return False
        if not self.test_poc(i386=i386, version=version):
            self.report.append("Bug doesn't reproduce on {}".format(version))
            return True
        self.report.append("Bug triggered on {}".format(version))
    return True
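The regx_match helper used in run() is not defined on this page. A minimal sketch of the 32-bit check, assuming the helper behaves like `re.search`, could look as follows. `is_i386_manager` is a hypothetical name, and the manager strings are illustrative examples rather than values taken from ExpBridge.

```python
import re


def is_i386_manager(manager: str) -> bool:
    """Return True if the manager name suggests a 32-bit (i386) target.

    Assumption: a plain substring search for '386', as in run() above.
    """
    return re.search(r"386", manager) is not None


# Illustrative manager names only.
print(is_i386_manager("ci-upstream-kmsan-gce-386"))  # True
print(is_i386_manager("ci-upstream-kasan-gce"))      # False
```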
Compiling the Linux kernel is easy. ExpBridge provides the build_mainline_kernel API for building the Linux kernel. We pass the specific kernel version as the commit argument and keep the original kernel configuration by setting keep_ori_config to True. For the other arguments of build_mainline_kernel, please refer to the API Reference.
def build_upstream_kernel(self, kernel_version):
    if self._check_stamp("BUILD_KERNEL"):
        self._remove_stamp("BUILD_KERNEL")
    ret = self.build_mainline_kernel(commit=kernel_version, keep_ori_config=True)
    if ret == 0:
        self._create_stamp("BUILD_SYZ_FEATURE_MINIMIZE_KERNEL")
    return ret
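The stamp helpers used here are not documented on this page. One plausible reading, shown purely as an assumption, is that a stamp is a marker file recording that a step already succeeded, so removing the stamp forces the step to run again. `StampDir` and `build_once` below are hypothetical and do not reflect ExpBridge's actual implementation.

```python
import tempfile
from pathlib import Path


class StampDir:
    """Hypothetical file-based stamps stored in one directory."""

    def __init__(self, root):
        self.root = Path(root)

    def _path(self, name):
        return self.root / (".stamp_" + name)

    def check(self, name):
        return self._path(name).exists()

    def create(self, name):
        self._path(name).touch()

    def remove(self, name):
        self._path(name).unlink(missing_ok=True)  # Python 3.8+


def build_once(stamps, name, build):
    """Run build() unless a stamp says it already succeeded."""
    if stamps.check(name):
        return 0
    ret = build()
    if ret == 0:
        stamps.create(name)
    return ret


with tempfile.TemporaryDirectory() as tmp:
    stamps = StampDir(tmp)
    calls = []

    def fake_build():
        calls.append("build")
        return 0

    build_once(stamps, "BUILD_KERNEL", fake_build)  # builds
    build_once(stamps, "BUILD_KERNEL", fake_build)  # skipped, stamp exists
    stamps.remove("BUILD_KERNEL")                   # force a rebuild
    build_once(stamps, "BUILD_KERNEL", fake_build)  # builds again
    build_count = len(calls)

print(build_count)  # 2
```

Under this reading, removing the stamp before each build is what allows the bisection plugin to rebuild the kernel for every version in the list.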
We want to run the PoC on an upstream kernel in a virtual machine and monitor for a kernel crash. Luckily, ExpBridge takes care of most of the steps.
We first get the kernel from Config().get_kernel_by_name(). self.kernel is obtained from self.case['kernel'], which ultimately comes from Syzbot. Config().get_kernel_by_name() returns a Vendor() object. The Vendor() class contains a Reproducer() object that makes reproducing bugs straightforward.
The reproduce() function takes several arguments:
- func is the callback function that is invoked once the VM has booted. In this callback you decide what to do after the VM is ready: how to reproduce the bug, or any preparation that is needed.
- func_args passes custom arguments to the callback function.
- vm_tag isn't mandatory, but it helps identify the VM.
- timeout specifies the maximum running time of the VM. The VM is killed when the time is up.
- attempt is the number of reproduction rounds.
- root, if true, reproduces the PoC as the root user.
- work_dir is the working directory where the reproduction log and the VM log are saved.
- c_hash should be the bug hash value. It is used in multiple places, such as log file names.
reproduce() returns the crash context, the triggerability status, and extra output if there is any.
def test_poc(self, i386, version):
    upstream = self.cfg.get_kernel_by_name(self.kernel)
    if upstream is None:
        self.logger.exception("Fail to get {} kernel".format(self.kernel))
        return False
    upstream.repro.init_logger(self.logger)
    _, triggered, _ = upstream.repro.reproduce(
        func=self._capture_crash, func_args=(i386,), vm_tag='test {}'.format(version),
        timeout=self.repro_timeout + 100, attempt=self.repro_attempt, root=True,
        work_dir=self.path_case_plugin, c_hash=self.case_hash)
    self.info_msg("crash triggered: {}".format(triggered))
    return triggered
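The callback pattern behind reproduce() can be illustrated with a toy model that runs without ExpBridge or a real VM. Everything here is a hypothetical stand-in: `FakeVM`, this `reproduce`, and the `crash_round` parameter exist only for the sketch, and stopping at the first crashing round is an assumption about the real behavior.

```python
class FakeVM:
    """Hypothetical stand-in for the VM object handed to the callback."""

    def __init__(self, crash_on):
        self.crash_on = crash_on
        self.commands = []
        self.crashed = False

    def command(self, cmds, user="root", wait=True, timeout=None):
        self.commands.append(cmds)
        if cmds == self.crash_on:
            self.crashed = True


def reproduce(func, func_args=(), attempt=3, root=True, crash_round=2):
    """Toy model: one fresh VM per round, stop at the first crash.

    Returns (crash context, triggered, extra), loosely mirroring the
    three return values described above.
    """
    for round_no in range(1, attempt + 1):
        # In this toy model the crash only happens in round crash_round.
        vm = FakeVM(crash_on="./poc" if round_no == crash_round else None)
        # Mandatory arguments first, then the custom func_args.
        func(vm, root, *func_args)
        if vm.crashed:
            return vm.commands, True, round_no
    return [], False, None


def capture_crash(qemu, root, i386):
    flags = "-m32 " if i386 else ""
    qemu.command(cmds="gcc {}-pthread -o poc poc.c".format(flags))
    qemu.command(cmds="./poc")


context, triggered, extra = reproduce(capture_crash, func_args=(False,), attempt=3)
print(triggered, extra, context)
```

With `attempt=1` the toy model never reaches the crashing round and reports the bug as not triggered, which shows why more than one attempt helps when a PoC is flaky.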
The last missing piece of the puzzle is the callback function _capture_crash(). The callback function of reproduce() has two mandatory arguments. The first, qemu, is a VM() object that lets you operate the VM, for example to upload files to it or download files from it. The second, root, indicates whether the PoC is reproduced as the root user. Any arguments after these are custom; here we pass only i386 to the callback function.
To run the PoC, we upload its source code to the VM, compile it into a Linux executable, and finally run the resulting program. Note that you don't need to worry about how to capture the kernel crash; ExpBridge does it for you. Normally, ExpBridge captures every kernel crash report, but you can also specify which crashes to capture or exclude by setting include and exclude in the configuration file. See more details here.
def _capture_crash(self, qemu: VM, root: bool, i386: bool):
    qemu.upload(user='root', src=["{}/poc.c".format(self.path_case_plugin)], dst="~/poc.c", wait=True)
    if i386:
        qemu.command(cmds="gcc -m32 -pthread -o poc poc.c", user="root", wait=True)
    else:
        qemu.command(cmds="gcc -pthread -o poc poc.c", user="root", wait=True)
    qemu.command(cmds="./poc", user="root", wait=True, timeout=self.repro_timeout)
    return
The final plugin can be found here.