feat: multithreading
The basic idea is actually pretty simple: there is a single Python
interpreter, so hooks in different threads can share Python state,
but each thread must acquire the GIL before entering a hook. Since
hooks typically do not block, this should not introduce any new
deadlock conditions in the target program.
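
The dispatch pattern is the standard CPython GIL dance. A minimal
sketch (run_hook is a hypothetical helper, not the actual Pyda entry
point; the real version lives in pyda_hook_cleancall below):

    #include <Python.h>

    /* Sketch: a native thread must hold the GIL before touching any
     * Python state. PyGILState_Ensure() attaches the calling thread
     * to the single interpreter if it isn't attached already. */
    static void run_hook(PyObject *hook_fn, PyObject *arg) {
        PyGILState_STATE gstate = PyGILState_Ensure();  /* acquire the GIL */

        PyObject *result = PyObject_CallFunctionObjArgs(hook_fn, arg, NULL);
        if (result == NULL)
            PyErr_Print();          /* the hook raised: report it */
        else
            Py_DECREF(result);

        PyGILState_Release(gstate); /* drop the GIL so other threads' hooks can run */
    }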

Properly supporting multithreading required a ton of insane hacks.
The notable one is patching CPython's calls to pthread_cond_init and
pthread_mutex_init to set the pshared (PTHREAD_PROCESS_SHARED)
attribute, since our Python threads actually run under a different
pgid than the target.
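
For reference, the process-shared initialization has this shape (a
minimal sketch of the standard pthreads attribute API; the patched
calls in pyda_core.c below do the same thing, with error checking):

    #include <pthread.h>

    pthread_cond_t  cond;
    pthread_mutex_t mutex;

    /* Sketch: mark the condvar and mutex PTHREAD_PROCESS_SHARED so
     * they still work when the waiter and the signaler are not in
     * the same process (group). */
    static void init_shared_sync(void) {
        pthread_condattr_t condattr;
        pthread_condattr_init(&condattr);
        pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);
        pthread_cond_init(&cond, &condattr);

        pthread_mutexattr_t mutexattr;
        pthread_mutexattr_init(&mutexattr);
        pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
        pthread_mutex_init(&mutex, &mutexattr);
    }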

However, I found out we were getting ancient versions of the pthreads
symbols. I had to patch the DynamoRIO private loader to read the ELF
symbol version table and ignore versions that are marked 'hidden'.
I'm not quite sure why we get the wrong version by default; parsing
symbol versions is not required (and some loaders simply don't do it),
so my guess is that an unversioned lookup just binds to whichever
matching symbol comes first, which can be an old compatibility version.
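
For context, each entry in a shared object's .gnu.version section is
a 16-bit value parallel to .dynsym; the high bit marks the version as
hidden, meaning the symbol should only be bound through an explicit
version reference. A loader-side check might look roughly like this
(a sketch: the constant names follow glibc's elf.h, and
symbol_visible_to_default_lookup is a hypothetical helper):

    #include <elf.h>
    #include <stddef.h>

    #ifndef VERSYM_HIDDEN
    #define VERSYM_HIDDEN  0x8000  /* high bit of an Elf64_Versym entry */
    #define VERSYM_VERSION 0x7fff  /* low bits: the version index itself */
    #endif

    /* Sketch: given the .gnu.version table (one Elf64_Versym per
     * dynamic symbol), decide whether symbol i may satisfy a plain,
     * unversioned name lookup. */
    static int symbol_visible_to_default_lookup(const Elf64_Versym *versym, size_t i) {
        if (versym == NULL)   /* object has no versioning info at all */
            return 1;
        return (versym[i] & VERSYM_HIDDEN) == 0;
    }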

There were a bunch of hangs that took a long time to debug: cases
where the GIL was not properly dropped around exits, and cases where
our own thread structures were not properly freed around exits. We
also attempt to call Py_Finalize on exit now, but it still sometimes
reports an error.
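
(The error-checked variant is Py_FinalizeEx, which returns a negative
value if interpreter teardown fails, e.g. while flushing buffered
data. A minimal sketch of an exit path that tolerates this, with
shutdown_python as a hypothetical name:)

    #include <Python.h>
    #include <stdio.h>

    /* Sketch: tear down the interpreter at exit, reporting but not
     * dying on finalization errors. */
    static void shutdown_python(void) {
        if (Py_FinalizeEx() < 0)
            fprintf(stderr, "warning: Py_FinalizeEx reported an error\n");
    }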

CTRL-C still may not work correctly.
ndrewh committed Jul 8, 2024
1 parent 5e44122 commit 51adc16
Showing 12 changed files with 652 additions and 74 deletions.
48 changes: 48 additions & 0 deletions examples/ltrace_multithreaded.py
@@ -0,0 +1,48 @@
# This is basically just ltrace.py but we also print the thread number
# and print a message when threads are created.

from pyda import *
from pwnlib.elf.elf import ELF
from pwnlib.util.packing import u64
import string
import sys, time

p = process()

e = ELF(p.exe_path)
e.address = p.maps[p.exe_path].base

plt_map = { e.plt[x]: x for x in e.plt }

def guess_arg(x):
    printable_chars = bytes(string.printable, 'ascii')

    # Is pointer?
    if x > 0x100000000:
        try:
            data = p.read(x, 0x20)
            if all([c in printable_chars for c in data[:4]]):
                return str(data[:data.index(0)])
        except:
            pass

    return hex(x)

def lib_hook(p):
    name = plt_map[p.regs.rip]
    print(f"[thread {p.tid}] {name}(" + ", ".join([
        f"rdi={guess_arg(p.regs.rdi)}",
        f"rsi={guess_arg(p.regs.rsi)}",
        f"rdx={guess_arg(p.regs.rdx)}",
        f"rcx={guess_arg(p.regs.rcx)}",
    ]) + ")")

def thread_entry(p):
    print(f"thread_entry for {p.tid}")

p.set_thread_entry(thread_entry)

for x in e.plt:
    p.hook(e.plt[x], lib_hook)

p.run()
2 changes: 1 addition & 1 deletion lib/pyda/process.py
@@ -39,7 +39,7 @@ def after_call_hook(p):
        self.hook(addr, call_hook)

    def set_thread_entry(self, callback):
-       self._p.set_thread_init_hook(callback)
+       self._p.set_thread_init_hook(lambda p: callback(self))

    def read(self, addr, size):
        return self._p.read(addr, size)
11 changes: 10 additions & 1 deletion patches/cpython-3.10.12.patch
@@ -12,7 +12,7 @@ index 8d2221cfd8..1372cbc291 100644

 #if defined(FAULTHANDLER_USE_ALT_STACK) && defined(HAVE_LINUX_AUXVEC_H) && defined(HAVE_SYS_AUXV_H)
 diff --git a/Python/thread_pthread.h b/Python/thread_pthread.h
-index 35b9810aa3..4085158070 100644
+index 35b9810aa3..130528aec3 100644
 --- a/Python/thread_pthread.h
 +++ b/Python/thread_pthread.h
 @@ -100,7 +100,7 @@
@@ -24,3 +24,12 @@ index 35b9810aa3..4085158070 100644
 #endif


+@@ -392,7 +392,7 @@ PyThread_allocate_lock(void)
+     lock = (sem_t *)PyMem_RawMalloc(sizeof(sem_t));
+
+     if (lock) {
+-        status = sem_init(lock,0,1);
++        status = sem_init(lock,1,1);
+         CHECK_STATUS("sem_init");
+
+         if (error) {
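
(For reference, sem_init's second argument is the pshared flag: 0
makes the semaphore private to one process, nonzero makes it usable
across processes, matching the condvar/mutex attribute patches above.
A tiny sketch, with make_shared_lock as a hypothetical name:)

    #include <semaphore.h>

    /* Sketch: a binary semaphore that works across processes.
     * Signature: int sem_init(sem_t *sem, int pshared, unsigned int value); */
    static int make_shared_lock(sem_t *lock) {
        return sem_init(lock, /* pshared */ 1, /* initial value */ 1);
    }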
333 changes: 330 additions & 3 deletions patches/dynamorio-10.0.patch

Large diffs are not rendered by default.

143 changes: 125 additions & 18 deletions pyda_core/pyda_core.c
@@ -18,24 +18,60 @@ pyda_process* pyda_mk_process() {
    ABORT_IF_NODYNAMORIO;

    pyda_process *proc = dr_global_alloc(sizeof(pyda_process));
-   proc->refcount = 2;
+   proc->refcount = 0; // xxx: will be incremented to 1 by first pyda_mk_thread
    proc->dirty_hooks = 0;
    proc->main_thread = pyda_mk_thread(proc);
    proc->callbacks = NULL;
    proc->thread_init_hook = NULL;
    proc->py_obj = NULL;

+   pthread_condattr_t condattr;
+   int ret;
+   if (ret = pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED)) {
+       dr_fprintf(STDERR, "pthread_condattr_setpshared failed: %d\n", ret);
+       dr_abort();
+   }
+   if (ret = pthread_cond_init(&proc->thread_exit_cond, &condattr)) {
+       dr_fprintf(STDERR, "pthread_cond_init failed %d\n", ret);
+       dr_abort();
+   }
+
+   pthread_mutexattr_t attr;
+   pthread_mutexattr_init(&attr);
+   pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
+   pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+   if (ret = pthread_mutex_init(&proc->refcount_mutex, &attr)) {
+       dr_fprintf(STDERR, "pthread_mutex_init failed %d\n", ret);
+       dr_abort();
+   }
    return proc;
}

pyda_thread* pyda_mk_thread(pyda_process *proc) {
    ABORT_IF_NODYNAMORIO;

    pyda_thread *thread = dr_global_alloc(sizeof(pyda_thread));
-   pthread_cond_init(&thread->resume_cond, 0);
-   pthread_cond_init(&thread->break_cond, 0);
+   pthread_condattr_t condattr;
+   pthread_condattr_init(&condattr);
+   int ret;
+   if (ret = pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED)) {
+       dr_fprintf(STDERR, "pthread_condattr_setpshared failed: %d\n", ret);
+       dr_abort();
+   }
+   if (ret = pthread_cond_init(&thread->resume_cond, &condattr)) {
+       dr_fprintf(STDERR, "pthread_cond_init failed %d\n", ret);
+       dr_abort();
+   }
+
+   if (pthread_cond_init(&thread->break_cond, &condattr)) {
+       dr_fprintf(STDERR, "pthread_cond_init failed\n");
+       dr_abort();
+   }

    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
+   pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(&thread->mutex, &attr);

    // Start with it locked...
@@ -45,64 +81,119 @@ pyda_thread* pyda_mk_thread(pyda_process *proc) {
    thread->app_yielded = 0;
    thread->proc = proc;

+   dr_atomic_add32_return_sum(&thread->proc->refcount, 1);
+
    thread->yield_count = 0;

    static volatile unsigned int tid = 0;
    thread->tid = dr_atomic_add32_return_sum(&tid, 1);
    thread->rip_updated_in_cleancall = 0;
    thread->skip_next_hook = 0;
+   thread->python_exited = 0;
+   thread->errored = 0;

    // PyErr_SetString(PyExc_RuntimeError, "OK");
    return thread;
}

void pyda_process_destroy(pyda_process *p) {
+   // We must be holding the GIL lock so we can drop the refs
+   PyGILState_STATE gstate = PyGILState_Ensure();
+
    DEBUG_PRINTF("pyda_process_destroy\n");
    if (p->thread_init_hook)
        Py_DECREF(p->thread_init_hook);

    p->thread_init_hook = NULL;

    pyda_hook *cb = p->callbacks;
    while (cb) {
        Py_DECREF(cb->py_func);
        cb->py_func = NULL;

        void *del = cb;
        cb = cb->next;

        dr_global_free(del, sizeof(pyda_hook));
    }
    dr_global_free(p, sizeof(pyda_process));

+   PyGILState_Release(gstate);
}

void pyda_thread_destroy(pyda_thread *t) {
-   if (--t->proc->refcount == 0) {
+   pthread_mutex_lock(&t->proc->refcount_mutex);
+
+   int new_refcount = dr_atomic_add32_return_sum(&t->proc->refcount, -1);
+   if (new_refcount == 0) {
        pyda_process_destroy(t->proc);
    } else {
+       pthread_cond_signal(&t->proc->thread_exit_cond);
+       pthread_mutex_unlock(&t->proc->refcount_mutex);
    }

    dr_global_free(t, sizeof(pyda_thread));
}

+void pyda_thread_destroy_last(pyda_thread *t) {
+   // wait for this thread to be the final thread
+   pthread_mutex_lock(&t->proc->refcount_mutex);
+   while (t->proc->refcount > 1)
+       pthread_cond_wait(&t->proc->thread_exit_cond, &t->proc->refcount_mutex);
+
+   DEBUG_PRINTF("pyda_thread_destroy_last unblock\n")
+   pthread_mutex_unlock(&t->proc->refcount_mutex);
+   pyda_thread_destroy(t);
+}

+void pyda_yield_noblock(pyda_thread *t) {
+   t->python_yielded = 1;
+   pthread_mutex_lock(&t->mutex);
+   pthread_cond_signal(&t->resume_cond);
+   pthread_mutex_unlock(&t->mutex);
+}
+
// yield from python to the executable
void pyda_yield(pyda_thread *t) {
    t->python_yielded = 1;
-   pthread_cond_signal(&t->resume_cond);
    DEBUG_PRINTF("pyda_yield\n");
+   t->yield_count++;

    // here we wait for the executable to signal
    // dr_set_safe_for_sync(false);

    pthread_mutex_lock(&t->mutex);
+   pthread_cond_signal(&t->resume_cond);

    while (!t->app_yielded)
        pthread_cond_wait(&t->break_cond, &t->mutex);

+   t->python_yielded = 0;
    t->app_yielded = 0;
    pthread_mutex_unlock(&t->mutex);

    // dr_set_safe_for_sync(true);
}

+void pyda_break_noblock(pyda_thread *t) {
+   t->app_yielded = 1;
+   pthread_mutex_lock(&t->mutex);
+   pthread_cond_signal(&t->break_cond);
+   pthread_mutex_unlock(&t->mutex);
+}

// yield from the executable back to python
void pyda_break(pyda_thread *t) {
    t->app_yielded = 1;
-   pthread_cond_signal(&t->break_cond);

    // here we wait for the python to signal
    // dr_set_safe_for_sync(false);

    pthread_mutex_lock(&t->mutex);
+   pthread_cond_signal(&t->break_cond);

    while (!t->python_yielded)
        pthread_cond_wait(&t->resume_cond, &t->mutex);

    t->app_yielded = 0;
    t->python_yielded = 0;
    pthread_mutex_unlock(&t->mutex);
    // dr_set_safe_for_sync(true);
}
@@ -113,6 +204,7 @@ void pyda_initial_break(pyda_thread *t) {
    while (!t->python_yielded)
        pthread_cond_wait(&t->resume_cond, &t->mutex);

+   t->python_yielded = 0;
    pthread_mutex_unlock(&t->mutex);
    // dr_set_safe_for_sync(true);
}
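
Note how the fixed yield/break handshake above signals while holding
the mutex and re-checks a predicate flag in a loop; that is the
standard way to avoid both lost wakeups (a signal firing between the
peer's predicate check and its pthread_cond_wait) and spurious
wakeups. Distilled into a generic sketch (handoff_t and friends are
hypothetical names, not Pyda API):

    #include <pthread.h>

    /* Sketch of the handshake pattern used by pyda_yield/pyda_break:
     * set the predicate and signal under the mutex; wait in a loop. */
    typedef struct {
        pthread_mutex_t mu;
        pthread_cond_t  cond;
        int             ready;  /* the predicate */
    } handoff_t;

    static void handoff_signal(handoff_t *h) {
        pthread_mutex_lock(&h->mu);
        h->ready = 1;
        pthread_cond_signal(&h->cond);  /* under the mutex: no lost wakeup */
        pthread_mutex_unlock(&h->mu);
    }

    static void handoff_wait(handoff_t *h) {
        pthread_mutex_lock(&h->mu);
        while (!h->ready)               /* loop guards against spurious wakeups */
            pthread_cond_wait(&h->cond, &h->mu);
        h->ready = 0;                   /* consume the handoff */
        pthread_mutex_unlock(&h->mu);
    }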
@@ -125,6 +217,10 @@ PyObject *pyda_run_until(pyda_thread *proc, uint64_t addr) {
void pyda_add_hook(pyda_process *t, uint64_t addr, PyObject *callback) {
    pyda_hook *cb = dr_global_alloc(sizeof(pyda_hook));
    cb->py_func = callback;
+
+   Py_INCREF(callback);
+   DEBUG_PRINTF("pyda_add_hook %p %p\n", cb, cb->py_func);
+
    cb->callback_type = 0;
    cb->next = t->callbacks;
    cb->addr = (void*)addr;
@@ -152,8 +248,7 @@ void pyda_remove_hook(pyda_process *p, uint64_t addr) {
}

void pyda_set_thread_init_hook(pyda_process *p, PyObject *callback) {
-   // TODO: hold some global lock here, just in case this gets called
-   // other than at startup
+   // NOTE: GIL is held

    if (p->thread_init_hook)
        Py_DECREF(p->thread_init_hook);
@@ -172,7 +267,7 @@ int pyda_flush_hooks() {
        if (cb->callback_type == 0) {
            DEBUG_PRINTF("dr_flush_region: %llx\n", (void*)cb->addr);
            dr_flush_region((void*)cb->addr, 1);
-           DEBUG_PRINTF("dr_flush_region end");
+           DEBUG_PRINTF("dr_flush_region end\n");
        }
        cb = cb->next;
    }
@@ -196,6 +291,8 @@ void pyda_hook_cleancall(pyda_hook *cb) {
        return;
    }

+   if (t->errored) return;
+
    PyGILState_STATE gstate;
    gstate = PyGILState_Ensure();
@@ -206,18 +303,28 @@ void pyda_hook_cleancall(pyda_hook *cb) {
    t->cur_context.pc = (app_pc)cb->addr;
    t->rip_updated_in_cleancall = 0;

-   PyObject *result = PyObject_CallFunctionObjArgs(cb->py_func, t->py_obj, NULL);
+   DEBUG_PRINTF("cleancall %p %p %p\n", cb, cb->py_func, t);
+
+   PyObject *result = PyObject_CallFunctionObjArgs(cb->py_func, t->proc->py_obj, NULL);
    if (result == NULL) {
+       dr_fprintf(STDERR, "\n[Pyda] ERROR: Hook call failed. Skipping future hooks on thread %d\n", t->tid);
+       dr_flush_file(STDERR);
+       t->errored = 1;
        PyErr_Print();
-       dr_fprintf(STDERR, "[Pyda] ERROR: Hook call failed. Aborting.\n");
-       dr_abort();
+       dr_fprintf(STDERR, "\n");
+       // dr_abort();
+   } else {
+       Py_DECREF(result);
    }
-   Py_DECREF(result);
+
+   DEBUG_PRINTF("cleancall ret %p %p %p\n", cb, cb->py_func, t);

-   if (t->cur_context.pc == (app_pc)cb->addr && t->rip_updated_in_cleancall) {
-       fprintf(stderr, "Hook updated RIP to the same address. This is UB. Aborting.\n");
-       dr_abort();
+   if (t->rip_updated_in_cleancall) {
+       dr_fprintf(STDERR, "\n[Pyda] Hook updated RIP to the same address. This is UB. Skipping future hooks.\n");
+       dr_flush_file(STDERR);
+       t->errored = 1;
+       // dr_abort();
    }
}
