Skip to content

Commit

Permalink
GUI constraint editor turning over. Rough edges to work on, but funct…
Browse files Browse the repository at this point in the history
…ional.
  • Loading branch information
jim-carciofini committed Aug 12, 2024
1 parent 89dea40 commit 48926d7
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 62 deletions.
169 changes: 111 additions & 58 deletions pate_binja/pate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import shlex
import signal
import sys
import threading
from json import JSONDecodeError
from subprocess import Popen, PIPE, STDOUT, TimeoutExpired
from typing import IO, Any, Optional
Expand Down Expand Up @@ -56,6 +57,8 @@ def __init__(self, filename: os.PathLike,
self.trace_file = None
self.last_cfar_graph = None

self.traceConstraintModeDone = threading.Event()

def run(self) -> None:
if self.filename.endswith(".run-config.json"):
self._run_live()
Expand Down Expand Up @@ -441,23 +444,25 @@ def _ask_user(self, prompt: str, choices: list[str]) -> Optional[str]:
return choice

def command_loop(self):
if self.config_callback:
self.config_callback(self.config)
rec = self.next_json()
self._command('goto_prompt')
while self.command_step():
pass
self.user.show_message("Pate finished")

def command_step(self):
# Process one json record from pate
try:
rec = self.next_json(gotoPromptAfterNonJson=True)
return self.process_json(rec)
if self.config_callback:
self.config_callback(self.config)
rec = self.next_json()
self._command('goto_prompt')
while self.command_step():
pass
# Enter trace constraint processing mode
self.traceConstraintModeDone.wait()
self.user.show_message("Pate finished")
except EOFError:
self.user.show_message("Pate terminated unexpectedly")
return False

def command_step(self):
# Process one json record from pate
rec = self.next_json(gotoPromptAfterNonJson=True)
return self.process_json(rec)

def process_json(self, rec):

if self.debug_json:
Expand All @@ -466,50 +471,7 @@ def process_json(self, rec):

if isinstance(rec, dict) and rec.get('this') == 'Regenerate result with new trace constraints?':
# Finish detected
self._command('up')
rec = self.next_json()
#isinstance(rec, dict) and rec.get('trace_node_kind') == 'final_result':
self.user.show_message('\nProcessing verification results.\n')
with io.StringIO() as out:
for tnc in rec['trace_node_contents']:
eqconds = tnc.get('content', {}).get('eq_conditions', {}).get('map')
if eqconds:
# Found eq conditions
for item in eqconds:
node = item['key']
eqcond = item['val']

node_id = get_graph_node_id(node)
predicate = eqcond['predicate']
trace_true = eqcond['trace_true']
trace_false = eqcond['trace_false']
trace_footprint = eqcond['trace_footprint']

#print('CFAR id:', node_id)

out.write(f'Equivalence condition for {node_id}\n')
pprint_symbolic(out, predicate)
out.write('\n')

#out.write('\nTrace True\n')
#pprint_node_event_trace(trace_true, 'True Trace', out=out)

#out.write('\nTrace False\n')
#pprint_node_event_trace(trace_false, 'False Trace', out=out)

if self.last_cfar_graph:
cfar_node = self.last_cfar_graph.get(node_id)
cfar_node.predicate = predicate
cfar_node.trace_true = trace_true
cfar_node.trace_false = trace_false
cfar_node.trace_footprint = trace_footprint

self.user.show_message(out.getvalue())
if self.last_cfar_graph:
self.user.show_cfar_graph(self.last_cfar_graph)

self._command('goto_prompt')
# Exit for now, but this is where we want to enter loop to handle trace constraints.
self.processFinalResult()
return False

elif isinstance(rec, dict) and rec.get('this') and rec.get('trace_node_contents'):
Expand Down Expand Up @@ -573,6 +535,89 @@ def process_json(self, rec):

return True

def processFinalResult(self):
self._command('up')
rec = self.next_json()
# isinstance(rec, dict) and rec.get('trace_node_kind') == 'final_result':
self.user.show_message('\nProcessing verification results.\n')
with io.StringIO() as out:
for tnc in rec['trace_node_contents']:
eqconds = tnc.get('content', {}).get('eq_conditions', {}).get('map')
if eqconds:
# Found eq conditions
for item in eqconds:
node = item['key']
eqcond = item['val']

node_id = get_graph_node_id(node)
predicate = eqcond['predicate']
trace_true = eqcond['trace_true']
trace_false = eqcond['trace_false']
trace_footprint = eqcond['trace_footprint']

# print('CFAR id:', node_id)

out.write(f'Equivalence condition for {node_id}\n')
pprint_symbolic(out, predicate)
out.write('\n')

# out.write('\nTrace True\n')
# pprint_node_event_trace(trace_true, 'True Trace', out=out)

# out.write('\nTrace False\n')
# pprint_node_event_trace(trace_false, 'False Trace', out=out)

if self.last_cfar_graph:
cfar_node = self.last_cfar_graph.get(node_id)
cfar_node.predicate = predicate
cfar_node.trace_true = trace_true
cfar_node.trace_false = trace_false
cfar_node.trace_footprint = trace_footprint

self.user.show_message(out.getvalue())
if self.last_cfar_graph:
self.user.show_cfar_graph(self.last_cfar_graph)
self._command('goto_prompt')
rec = self.next_json()

def processTraceConstraints(self, traceConstraints: list[tuple[TraceVar, str, str]]):
# TODO: infrastructure to do this in the background on same thread as command loop
with io.StringIO() as out:
# input "[ [ { \"var\" : { \"symbolic_ident\" : 0 }, \"op\" : \"EQ\", \"const\" : \"128\"} ] ]"
# TODO: Handle multiple nodes in final result
out.write(r'input "[')
# TODO: Handle multiple eq conds
out.write(r'[')
for tc in traceConstraints:
out.write(r'{\"var\":{\"symbolic_ident\":')
# symbolic_ident
out.write(str(tc[0].symbolic_ident))
out.write(r'},\"op\":\"')
# op
out.write(tc[1])
out.write(r'\",\"const\":\"')
# int const
out.write(str(tc[2]))
out.write(r'\"}')
out.write(r']')
out.write(r']"')
verifierTraceConstraintInput = out.getvalue()

print('verifierTraceConstraintInput:', verifierTraceConstraintInput)
self.debug_io = True
self._command('0')
# TODO: Consider generalizing command_loop rather than this processing?
while True:
rec = self.next_json()
if isinstance(rec, dict) and rec['this'] == 'Waiting for constraints..':
break
self._command(verifierTraceConstraintInput)
while True:
rec = self.next_json()
if isinstance(rec, dict) and rec['this'] == 'Regenerate result with new trace constraints?':
break
self.processFinalResult()

def show_message(self, rec: Any):
if isinstance(rec, list):
for m in rec:
Expand Down Expand Up @@ -732,6 +777,8 @@ def __init__(self, kind, raw):
self.raw = raw
self.pretty = 'unknown'
self.numBits = 0
self.type = None
self.symbolic_ident = None

match self.kind:
case 'reg_op':
Expand All @@ -746,13 +793,19 @@ def __init__(self, kind, raw):
with io.StringIO() as out:
out.write(f'{get_addr_id(raw["fst"])}: {mem_op["direction"]} {get_value_id(mem_op["addr"])} ')
self.pretty = out.getvalue()
offset = mem_op['value']['offset']
if isinstance(offset, dict):
self.symbolic_ident = offset['symbolic_ident']
self.type = offset['type']
self.numBits = mem_op['size'] * 8

def extractTraceVars(raw) -> list[TraceVar]:
def extractTraceVars(rawFootprint) -> list[TraceVar]:
traceVars = []
#for r in raw['fp_initial_regs']['reg_op']['map']:
# traceVars.append(TraceVar('reg_op', r))
for r in raw['fp_mem']:
for r in rawFootprint['original']['fp_mem']:
traceVars.append(TraceVar('mem_op', r))
for r in rawFootprint['patched']['fp_mem']:
traceVars.append(TraceVar('mem_op', r))
# TODO: sort by instruction addr, but reverse seems to work for now
traceVars.reverse()
Expand Down
24 changes: 20 additions & 4 deletions pate_binja/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ def __init__(self, cfarNode, parent=None):
main_layout.addWidget(mainSplitter)
self.setLayout(main_layout)

self.updateFromCfarNode()

def updateFromCfarNode(self):
with io.StringIO() as out:
pate.pprint_symbolic(out, self.cfarNode.predicate)
self.eqCondField.appendPlainText(out.getvalue())
Expand All @@ -525,19 +528,32 @@ def showTrueTraceConstraintDialog(self):
#d.setWindowTitle(f'{d.windowTitle()} - {cfarNode.id}')
if d.exec():
print(d.getConstraints())
# TODO:
# - Get constraint form dialog
# - Send constraint to pate
# - Wait for pate to compute
# - Get new result node form pate
# TODO: Spin dialog?
pw: Optional[PateWidget] = getAncestorInstanceOf(self, PateWidget)
# TODO: Better way to do this?
pw.pate_thread.pate_wrapper.processTraceConstraints(d.getConstraints())
# - update traces widgit
self.updateFromCfarNode()
# TODO: report failed constraint?
QMessageBox.warning(self, "Warning", "TODO: process trace constraint")

traceConstraintRelations = ["LTs", "LTu", "GTs", "GTu", "LEs", "LEu", "GEs", "GEu", "NEQ", "EQ"]
traceConstraintRelations = ["EQ", "NEQ", "LTs", "LTu", "GTs", "GTu", "LEs", "LEu", "GEs", "GEu"]

class PateTraceConstraintDialog(QDialog):
def __init__(self, cfarNode: pate.CFARNode, parent=None):
super().__init__(parent)

self.cfarNode = cfarNode

# TODO: Handle original AND patched
rawTraceVars = self.cfarNode.trace_footprint['original']
self.traceVars = pate.extractTraceVars(rawTraceVars)
self.traceVars = pate.extractTraceVars(self.cfarNode.trace_footprint)

# Prune TraceVars with no symbolic_ident
self.traceVars = [tv for tv in self.traceVars if tv.symbolic_ident is not None]

#self.resize(1500, 800)
self.setWindowTitle("Trace Constraint")
Expand Down

0 comments on commit 48926d7

Please sign in to comment.