From 4902141df6bcffecdb5392c3aaf4d0d67cf98671 Mon Sep 17 00:00:00 2001 From: gaasedelen Date: Sun, 12 Sep 2021 22:42:24 -0400 Subject: [PATCH] keep disassembler focused on 'last known address' when stepping through unmapped sequences --- plugins/tenet/context.py | 31 ++++++- plugins/tenet/integration/api/ida_api.py | 3 + plugins/tenet/integration/ida_integration.py | 14 ++- plugins/tenet/trace/analysis.py | 93 +++++++++++++++++--- 4 files changed, 129 insertions(+), 12 deletions(-) diff --git a/plugins/tenet/context.py b/plugins/tenet/context.py index 46d8ef8..03e8b27 100644 --- a/plugins/tenet/context.py +++ b/plugins/tenet/context.py @@ -343,7 +343,36 @@ def _idx_changed(self, idx): This will make the disassembler track with the PC/IP of the trace reader. """ - disassembler[self].navigate(self.reader.rebased_ip) + dctx = disassembler[self] + + # + # get a 'rebased' version of the current instruction pointer, which + # should map to the disassembler / open database if it is a code + # address that is known + # + + bin_address = self.reader.rebased_ip + + # + # if the code address is in a library / other unknown area that + # cannot be renedered by the disassembler, then resolve the last + # known trace 'address' within the database + # + + if not dctx.is_mapped(bin_address): + last_good_idx = self.reader.analysis.get_prev_mapped_idx(idx) + if last_good_idx == -1: + return # navigation is just not gonna happen... + + # fetch the last instruction pointer to fall within the trace + last_good_trace_address = self.reader.get_ip(last_good_idx) + + # convert the trace-based instruction pointer to one that maps to the disassembler + bin_address = self.reader.analysis.rebase_pointer(last_good_trace_address) + + # navigate the disassembler to a 'suitable' address based on the trace idx + dctx.navigate(bin_address) + disassembler.refresh_views() def _select_trace_file(self): """ diff --git a/plugins/tenet/integration/api/ida_api.py b/plugins/tenet/integration/api/ida_api.py index 60890d5..6056590 100644 --- a/plugins/tenet/integration/api/ida_api.py +++ b/plugins/tenet/integration/api/ida_api.py @@ -117,6 +117,9 @@ def execute_ui(function): def get_disassembler_user_directory(self): return ida_diskio.get_user_idadir() + def refresh_views(self): + ida_kernwin.refresh_idaview_anyway() + def get_disassembly_background_color(self): """ Get the background color of the IDA disassembly view. diff --git a/plugins/tenet/integration/ida_integration.py b/plugins/tenet/integration/ida_integration.py index b30185d..cc6bfe4 100644 --- a/plugins/tenet/integration/ida_integration.py +++ b/plugins/tenet/integration/ida_integration.py @@ -6,6 +6,7 @@ # import ida_dbg +import ida_bytes import ida_idaapi import ida_kernwin @@ -443,6 +444,17 @@ def _highlight_disassesmbly(self, lines_out, widget, lines_in): rebased_address = ctx.reader.analysis.rebase_pointer(address) trail[rebased_address] = ida_color + current_address = ctx.reader.rebased_ip + if not ida_bytes.is_mapped(current_address): + last_good_idx = ctx.reader.analysis.get_prev_mapped_idx(ctx.reader.idx) + if last_good_idx != -1: + + # fetch the last instruction pointer to fall within the trace + last_good_trace_address = ctx.reader.get_ip(last_good_idx) + + # convert the trace-based instruction pointer to one that maps to the disassembler + current_address = ctx.reader.analysis.rebase_pointer(last_good_trace_address) + for section in lines_in.sections_lines: for line in section: address = line.at.toea() @@ -451,7 +463,7 @@ def _highlight_disassesmbly(self, lines_out, widget, lines_in): color = backward_trail[address] elif address in forward_trail: color = forward_trail[address] - elif address == ctx.reader.rebased_ip: + elif address == current_address: color = current_color else: continue diff --git a/plugins/tenet/trace/analysis.py b/plugins/tenet/trace/analysis.py index 408586f..4955302 100644 --- a/plugins/tenet/trace/analysis.py +++ b/plugins/tenet/trace/analysis.py @@ -1,3 +1,4 @@ +import bisect import collections #----------------------------------------------------------------------------- @@ -25,12 +26,51 @@ def __init__(self, trace, dctx): self._dctx = dctx self._trace = trace self._remapped_regions = [] + self._unmapped_entry_points = [] self._analyze() + #------------------------------------------------------------------------- + # Public + #------------------------------------------------------------------------- + + def rebase_pointer(self, address): + """ + Return a rebased version of the given address, if one exists. + """ + for m1, m2 in self._remapped_regions: + #print(f"m1 start: {m1[0]:08X} address: {address:08X} m1 end: {m1[1]:08X}") + #print(f"m2 start: {m2[0]:08X} address: {address:08X} m2 end: {m2[1]:08X}") + if m1[0] <= address <= m1[1]: + return address + (m2[0] - m1[0]) + if m2[0] <= address <= m2[1]: + return address - (m2[0] - m1[0]) + return address + + def get_prev_mapped_idx(self, idx): + """ + Return the previous idx to fall within a mapped code region. + """ + index = bisect.bisect_right(self._unmapped_entry_points, idx) - 1 + try: + return self._unmapped_entry_points[index] + except IndexError: + return -1 + + #------------------------------------------------------------------------- + # Analysis + #------------------------------------------------------------------------- + def _analyze(self): """ Analyze the trace against the binary loaded by the disassembler. """ + self._analyze_aslr() + self._analyze_unmapped() + + def _analyze_aslr(self): + """ + Analyze trace execution to resolve ASLR mappings against the disassembler. + """ dctx, trace = self._dctx, self._trace # get instruction addresses from disassembler @@ -87,16 +127,49 @@ def _analyze(self): #print(f"TRC ADDRESSES: {len(trace_addresses)}") #print(f"INT ADDRESSES: {len(interesting_addresses)}") - def rebase_pointer(self, address): + def _analyze_unmapped(self): """ - Return a rebased version of the given address, if one exists. + Analyze trace execution to identify entry/exit to unmapped segments. """ - for m1, m2 in self._remapped_regions: - #print(f"m1 start: {m1[0]:08X} address: {address:08X} m1 end: {m1[1]:08X}") - #print(f"m2 start: {m2[0]:08X} address: {address:08X} m2 end: {m2[1]:08X}") - if m1[0] <= address <= m1[1]: - return address + (m2[0] - m1[0]) - if m2[0] <= address <= m2[1]: - return address - (m2[0] - m1[0]) + trace, ips = self._trace, self._trace.ip_addrs + lower_mapped, upper_mapped = self._remapped_regions[0][1] - return address + # + # for speed, pull out the 'compressed' ip indexes that matched mapped + # (known) addresses within the disassembler context + # + + mapped_ips = set() + for i, address in enumerate(ips): + if lower_mapped <= address <= upper_mapped: + mapped_ips.add(i) + + last_good_idx = 0 + unmapped_entries = [] + + # loop through each segment in the trace + for seg in trace.segments: + seg_ips = seg.ips + seg_base = seg.base_idx + + # loop through each executed instruction in this segment + for relative_idx in range(0, seg.length): + compressed_ip = seg_ips[relative_idx] + + # the current instruction is in an unmapped region + if compressed_ip not in mapped_ips: + + # if we were in a known/mapped region previously, then save it + if last_good_idx: + unmapped_entries.append(last_good_idx) + last_good_idx = 0 + + # if we are in a good / mapped region, update our current idx + else: + last_good_idx = seg_base + relative_idx + + #for idx in unmapped_entries: + # print(f"Analysis: Unmapped @ IDX {idx:,}") + + #print(f" - Unmapped Entry Points: {len(unmapped_entries)}") + self._unmapped_entry_points = unmapped_entries