Plugins

ETenal edited this page Oct 5, 2023 · 20 revisions

ExpBridge provides a comprehensive interface for reproducing upstream kernel bugs. Existing bug assessment tools can be integrated with it as standalone ExpBridge plugins.

In fact, all current functionality of ExpBridge (reproducing bugs on downstream distros, extracting kernel traces, automatically tuning the original PoCs, etc.) is implemented as one or more plugins. Plugins can share data and collaborate with each other. ExpBridge also provides an API for common tasks, such as launching a VM and detecting kernel crashes. The current plugins include:

  • RawBugReproduce: RawBugReproduce takes upstream PoC to downstream. Without any adaptation, RawBugReproduce runs the PoC on the downstream distro and monitors any kernel crashes.

  • SyzFeatureMinimize: SyzFeatureMinimize minimizes the features that the testcase requires. Those features normally control how the PoC works, for example, repeat and sandbox. Minimizing the features gives the PoC the best chance of running in different downstream distro environments.

  • TraceAnalysis: TraceAnalysis extracts PoC execution traces from the upstream kernel and the downstream distros. Those traces are later fed to ModulesAnalysis for further analysis.

  • ModulesAnalysis: ModulesAnalysis takes the kernel traces from TraceAnalysis and compares the upstream trace with the downstream traces. The differences between the traces point to potentially missing modules.

  • BugReproduce: BugReproduce takes the results from the previous plugins, adapts the upstream PoC accordingly, and tests it on the downstream distros.

Workflow

A blank plugin has the following entries:

  • prepare() is the common entry for preparing a plugin. It takes no arguments; users pass customized arguments through the ExpBridge configuration file. prepare() can pass arguments on to prepare_on_demand() if necessary.

  • prepare_on_demand() is used to set up variables and conditions. It is also handy when you are testing a plugin, because you can pass the arguments directly to the plugin without a config file.

  • success() can only be called once for each case. It moves the current case to the succeed category.

  • run() is the main entry of a plugin. It is where you implement the main functionality of your plugin. If it returns True, a success stamp is created; otherwise, no stamp is created.

  • generate_report() executes after run() returns True.

  • cleanup() executes after the plugin has finished.

from plugins import AnalysisModule

class Template(AnalysisModule):
    NAME = "Template"
    REPORT_START = "======================Template Report======================"
    REPORT_END =   "==================================================================="
    REPORT_NAME = "Report_Template"
    DEPENDENCY_PLUGINS = []

    def __init__(self):
        super().__init__()
        
    def prepare(self):
        plugin = self.cfg.get_plugin(self.NAME)
        if plugin is None:
            self.err_msg("No such plugin {}".format(self.NAME))
            return False
        try:
            self.greeting = int(plugin.greeting)
        except AttributeError:
            self.err_msg("Failed to get greeting")
            return False
        return self.prepare_on_demand()
    
    def prepare_on_demand(self):
        self._prepared = True
        return True
    
    def success(self):
        return self._move_to_success

    def run(self):
        """
        do something
        True: plugin return successfully
        False: something goes wrong, stamp will not be created
        """
        self.logger.info("Hello you, {}".format(self.greeting))
        return True
    
    def generate_report(self):
        final_report = "\n".join(self.report)
        self.info_msg(final_report)
        self._write_to(final_report, self.REPORT_NAME)
    
    def _write_to(self, content, name):
        file_path = "{}/{}".format(self.path_case_plugin, name)
        super()._write_to(content, file_path)

    def cleanup(self):
        super().cleanup()
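
The order in which the entries are called can be illustrated with a small stand-in driver. This is only a sketch of the lifecycle described above, not ExpBridge's actual scheduler: StubPlugin and drive() are hypothetical names, the stamped flag merely stands in for the success stamp, and running cleanup() even when prepare() fails is an assumption.

```python
class StubPlugin:
    """Hypothetical stand-in for a plugin; it only records which entries ran."""

    def __init__(self, run_result):
        self.run_result = run_result
        self.calls = []

    def prepare(self):
        self.calls.append("prepare")
        return True

    def run(self):
        self.calls.append("run")
        return self.run_result

    def generate_report(self):
        self.calls.append("generate_report")

    def cleanup(self):
        self.calls.append("cleanup")


def drive(plugin):
    """Hypothetical driver that calls the entries in the documented order."""
    stamped = False
    try:
        if plugin.prepare() and plugin.run():
            stamped = True  # stands in for creating the success stamp
            plugin.generate_report()
    finally:
        # Assumption: cleanup always runs once the plugin is done
        plugin.cleanup()
    return stamped


ok = StubPlugin(run_result=True)
failed = StubPlugin(run_result=False)
print(drive(ok), ok.calls)
print(drive(failed), failed.calls)
```

When run() returns False, the sketch skips generate_report() and leaves the stamp unset, which mirrors the rule that no stamp is created for a failed run.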

Write your own plugin

This is a tutorial for writing a bug bisection plugin. The plugin tests whether a bug is reproducible on a list of kernel versions in descending order (e.g., v5.7, v5.6, v5.5 ...). If the bug fails to reproduce on a certain kernel version, it often indicates that the bug was introduced in a later version (e.g., if the bug reproduces on v5.7 and v5.6 but fails on v5.5, the buggy commit landed between v5.5 and v5.6).
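
The idea can be sketched independently of ExpBridge. The function below is a hypothetical helper (locate_introduction is not part of ExpBridge) that walks the versions from newest to oldest and stops at the first version on which the bug no longer reproduces. The reproduces callback stands in for building and testing a kernel.

```python
from typing import Callable, Optional, Sequence, Tuple


def locate_introduction(
    versions: Sequence[str],
    reproduces: Callable[[str], bool],
) -> Tuple[Optional[str], Optional[str]]:
    """Walk versions from newest to oldest.

    Returns (oldest_buggy, first_clean): the last version on which the bug
    still reproduced, and the first version on which it did not.
    Either element is None if no such version was seen.
    """
    oldest_buggy = None
    for version in versions:
        if not reproduces(version):
            return oldest_buggy, version
        oldest_buggy = version
    return oldest_buggy, None


# Illustrative data only: pretend the bug reproduces on v5.7 and v5.6
buggy = {"v5.7", "v5.6"}
result = locate_introduction(["v5.7", "v5.6", "v5.5", "v5.4"], lambda v: v in buggy)
print(result)  # → ('v5.6', 'v5.5')
```

With this illustrative input, the buggy commit would have landed between v5.5 and v5.6, matching the example above.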

Create a plugin folder

The plugin folder contains at least two files: __init__.py and name_of_the_plugin.py

__init__.py defines DESCRIPTION, ENABLE, and AS_SERVICE. DESCRIPTION is the description of the plugin shown in the help information of ExpBridge. ENABLE decides whether the plugin is enabled. Enabling a plugin does not automatically add it to the workflow; users still have to pass the argument --name-of-the-plugin to actually use it. See more details in Run-ExpBridge. AS_SERVICE determines whether the plugin is treated as a service. A service plugin cannot be invoked from the command line; it is typically used by other plugins as a library. For example, the SyzkallerInterface plugin provides the SyzkallerInterface class to other plugins.

# __init__.py
from .bug_bisection import BugBisection

DESCRIPTION = "BugBisection is a plugin that determines the first buggy kernel version"
ENABLE = True
AS_SERVICE = False
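
How the two flags interact can be sketched with a toy loader. This is not ExpBridge's loader: cli_flag() and selectable_flags() are hypothetical helpers, the rule that turns a package name such as bug_bisection into --bug-bisection is an assumption, and the SimpleNamespace objects stand in for imported plugin packages.

```python
from types import SimpleNamespace


def cli_flag(package_name):
    """Assumed naming rule: 'bug_bisection' becomes '--bug-bisection'."""
    return "--" + package_name.replace("_", "-")


def selectable_flags(packages):
    """Return command-line flags for plugins that are enabled and not services."""
    flags = []
    for name, module in packages.items():
        if not getattr(module, "ENABLE", False):
            continue  # disabled plugins are skipped entirely
        if getattr(module, "AS_SERVICE", False):
            continue  # service plugins are libraries, not command-line options
        flags.append(cli_flag(name))
    return flags


# Stand-ins for imported plugin packages (illustrative values only)
packages = {
    "bug_bisection": SimpleNamespace(ENABLE=True, AS_SERVICE=False),
    "syzkaller_interface": SimpleNamespace(ENABLE=True, AS_SERVICE=True),
    "disabled_plugin": SimpleNamespace(ENABLE=False, AS_SERVICE=False),
}
print(selectable_flags(packages))  # → ['--bug-bisection']
```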

Now we start writing the main functions of the plugin. In the prepare() function, we retrieve the reproduction timeout (repro_timeout), the number of attempts (repro_attempt), and the kernel_version list.

    def prepare(self):
        plugin = self.cfg.get_plugin(self.NAME)
        if plugin is None:
            self.err_msg("No such plugin {}".format(self.NAME))
            return False
        try:
            self.repro_timeout = int(plugin.timeout)
        except AttributeError:
            self.err_msg("Failed to get timeout")
            return False
        try:
            self.repro_attempt = int(plugin.attempt)
        except AttributeError:
            self.repro_attempt = 3
        try:
            self.kernel_version = plugin.kernel_version
        except AttributeError:
            self.err_msg("Failed to get kernel_version")
            return False
        return self.prepare_on_demand()

In the configuration file, the plugin config is defined as follows:

"BugBisection": {
     "kernel_version": {"e5fd3e6": ["v5.7", "v5.6", "v5.5", "v5.4", "v5.3", "v5.2", "v5.1", "v5.0"]},
     "timeout": 300,
     "attempt": 10
}
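
To see how these values map onto the fields read in prepare(), the fragment can be parsed with plain json. This is a sketch, not ExpBridge's config loader: wrapping the fragment in bare braces is an assumption made so that it parses on its own, the version list is shortened, and read_settings() is a hypothetical helper. Note that kernel_version is keyed by the case hash, and attempt falls back to 3 when it is absent.

```python
import json

# The fragment above, wrapped in braces so it parses standalone (assumption)
CONFIG_TEXT = """
{
    "BugBisection": {
        "kernel_version": {"e5fd3e6": ["v5.7", "v5.6", "v5.5"]},
        "timeout": 300,
        "attempt": 10
    }
}
"""


def read_settings(config_text, plugin_name, case_hash, default_attempt=3):
    """Hypothetical helper mirroring the checks and defaults in prepare()."""
    section = json.loads(config_text).get(plugin_name)
    if section is None or "timeout" not in section:
        return None  # timeout is mandatory
    versions = section.get("kernel_version", {}).get(case_hash)
    if versions is None:
        return None  # no version list for this case
    return {
        "timeout": int(section["timeout"]),
        "attempt": int(section.get("attempt", default_attempt)),
        "versions": versions,
    }


settings = read_settings(CONFIG_TEXT, "BugBisection", "e5fd3e6")
print(settings["timeout"], settings["attempt"], settings["versions"])
```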

Next, we write the main entry, the run() function. First, we decide whether the PoC is 32-bit or 64-bit. Then we iterate over the kernel versions in the kernel_version list. For each kernel version, we call build_upstream_kernel to compile the Linux kernel, then call test_poc to test the PoC on it. The loop stops at the first version on which the bug does not reproduce.

    def run(self):
        i386 = False
        if regx_match(r'386', self.case["manager"]):
            i386 = True
        for version in self.kernel_version[self.case_hash]:
            self.logger.info("Now testing {}".format(version))
            if self.build_upstream_kernel(kernel_version=version) != 0:
                self.err_msg("Failed to build upstream kernel")
                return False
            if not self.test_poc(i386=i386, version=version):
                self.report.append("Bug doesn't reproduce on {}".format(version))
                return True
            self.report.append("Bug triggered on {}".format(version))
        return True

Compiling the Linux kernel is easy. ExpBridge provides the build_mainline_kernel API for building the Linux kernel. We pass the specific kernel version as the commit argument, and keep the original kernel configuration by setting keep_ori_config to True. For the other arguments of build_mainline_kernel, please refer to the API Reference.

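    def build_upstream_kernel(self, kernel_version):
        # Clear an existing BUILD_KERNEL stamp, if any, before building this version
        if self._check_stamp("BUILD_KERNEL"):
            self._remove_stamp("BUILD_KERNEL")
        # Build the requested version while keeping the original kernel configuration
        ret = self.build_mainline_kernel(commit=kernel_version, keep_ori_config=True)
        if ret == 0:
            self._create_stamp("BUILD_SYZ_FEATURE_MINIMIZE_KERNEL")
        return ret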

We want to run the PoC on an upstream kernel in a virtual machine and monitor for a kernel crash. Luckily, ExpBridge takes care of most of the steps. We first get the kernel from Config().get_kernel_by_name(). self.kernel is obtained from self.case['kernel'], which ultimately comes from Syzbot. Config().get_kernel_by_name() returns a Vendor() object. The Vendor() class contains a Reproducer() object that makes bug reproduction easy.

The reproduce() function takes several arguments:

  • func is the callback function that is invoked once the VM has booted. In the callback you decide what to do when the VM is ready: how to reproduce the bug, or which preparation steps to run first.

  • func_args holds the extra arguments passed to the callback function.

  • vm_tag is optional, but it helps identify the VM.

  • timeout specifies the maximum running time of the VM. The VM is killed when the timeout expires.

  • attempt is the number of reproduction rounds.

  • root, when true, reproduces the PoC as the root user.

  • work_dir is the working directory that stores the reproduction log and the VM log.

  • c_hash is the bug hash value. It is used in several places, such as the log file name.

reproduce() returns the crash context, the triggerability status, and any extra output.

    def test_poc(self, i386, version):
        upstream = self.cfg.get_kernel_by_name(self.kernel)
        if upstream is None:
            self.logger.error("Fail to get {} kernel".format(self.kernel))
            return False
        upstream.repro.init_logger(self.logger)
        # reproduce() returns (crash context, triggered, extra output); only triggered is used here
        _, triggered, _ = upstream.repro.reproduce(
            func=self._capture_crash, func_args=(i386,), vm_tag='test {}'.format(version),
            timeout=self.repro_timeout + 100, attempt=self.repro_attempt, root=True,
            work_dir=self.path_case_plugin, c_hash=self.case_hash)
        self.info_msg("crash triggered: {}".format(triggered))
        return triggered

The last missing piece of the puzzle is the callback function _capture_crash(). The callback function of reproduce() has two mandatory arguments. The first, qemu, is a VM() object that lets you operate the VM, for example to upload files to it or download files from it. The second, root, indicates whether the PoC is reproduced as the root user. Any arguments after these are customized arguments; here we only pass i386 to the callback function.

To run the PoC, we upload the PoC source code into the VM, compile it into a Linux executable, and finally run it. Note that you don't need to worry about capturing the kernel crash; ExpBridge does it for you. By default, ExpBridge captures every kernel crash report, but you can specify which crashes to capture or exclude by setting include and exclude in the configuration file. See more details here.

    def _capture_crash(self, qemu: VM, root: bool, i386: bool):
        qemu.upload(user='root', src=["{}/poc.c".format(self.path_case_plugin)], dst="~/poc.c", wait=True)
        if i386:
            qemu.command(cmds="gcc -m32 -pthread -o poc poc.c", user="root", wait=True)
        else:
            qemu.command(cmds="gcc -pthread -o poc poc.c", user="root", wait=True)
        
        qemu.command(cmds="./poc", user="root", wait=True, timeout=self.repro_timeout)
        return
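
The calling convention for the callback can be tried out without booting a VM. The sketch below uses stand-ins only: FakeVM and fake_reproduce() are hypothetical and do not reflect how ExpBridge's VM() or reproduce() are implemented. They just show that the callback receives qemu and root first, followed by the unpacked func_args, and which commands the callback issues for a 32-bit versus a 64-bit PoC.

```python
class FakeVM:
    """Hypothetical stand-in for VM(); it records calls instead of running them."""

    def __init__(self):
        self.log = []

    def upload(self, user, src, dst, wait=True):
        self.log.append(("upload", dst))

    def command(self, cmds, user, wait=True, timeout=None):
        self.log.append(("command", cmds))


def fake_reproduce(func, func_args=(), root=True):
    """Stand-in for reproduce(): 'boot' a fake VM, then invoke the callback."""
    qemu = FakeVM()
    func(qemu, root, *func_args)  # mandatory arguments first, then custom ones
    return qemu.log


def capture_crash(qemu, root, i386):
    """Same steps as _capture_crash above: upload, compile, run."""
    qemu.upload(user="root", src=["poc.c"], dst="~/poc.c", wait=True)
    arch_flag = "-m32 " if i386 else ""
    qemu.command(cmds="gcc {}-pthread -o poc poc.c".format(arch_flag), user="root", wait=True)
    qemu.command(cmds="./poc", user="root", wait=True, timeout=300)


log32 = fake_reproduce(capture_crash, func_args=(True,))
log64 = fake_reproduce(capture_crash, func_args=(False,))
print(log32[1][1])  # → gcc -m32 -pthread -o poc poc.c
print(log64[1][1])  # → gcc -pthread -o poc poc.c
```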