From 40e17b05b3010370894026af8b85814b2c7bfc53 Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Wed, 6 Mar 2024 00:37:05 +0200 Subject: [PATCH] memsink client: bump last_as_blank_ts on every flock() --- python/src/ustreamer.c | 76 +++++++++++++++++++++++------------------- src/libs/memsink.c | 5 ++- 2 files changed, 46 insertions(+), 35 deletions(-) diff --git a/python/src/ustreamer.c b/python/src/ustreamer.c index bbd551ac3..37b4f61f5 100644 --- a/python/src/ustreamer.c +++ b/python/src/ustreamer.c @@ -109,57 +109,66 @@ static PyObject *_MemsinkObject_exit(_MemsinkObject *self, PyObject *Py_UNUSED(i static int _wait_frame(_MemsinkObject *self) { const ldf deadline_ts = us_get_now_monotonic() + self->wait_timeout; -# define RETURN_OS_ERROR { \ - Py_BLOCK_THREADS \ - PyErr_SetFromErrno(PyExc_OSError); \ - return -1; \ - } - + int locked = -1; ldf now_ts; do { Py_BEGIN_ALLOW_THREADS - const int retval = us_flock_timedwait_monotonic(self->fd, self->lock_timeout); + locked = us_flock_timedwait_monotonic(self->fd, self->lock_timeout); now_ts = us_get_now_monotonic(); - - if (retval < 0 && errno != EWOULDBLOCK) { - RETURN_OS_ERROR; - - } else if (retval == 0) { - us_memsink_shared_s *mem = self->mem; - if (mem->magic == US_MEMSINK_MAGIC && mem->version == US_MEMSINK_VERSION && mem->id != self->frame_id) { - if (self->drop_same_frames > 0) { - if ( - US_FRAME_COMPARE_GEOMETRY(self->mem, self->frame) - && (self->frame_ts + self->drop_same_frames > now_ts) - && !memcmp(self->frame->data, mem->data, mem->used) - ) { - self->frame_id = mem->id; - goto drop; - } - } - - Py_BLOCK_THREADS - return 0; + if (locked < 0) { + if (errno == EWOULDBLOCK) { + goto retry; } + goto os_error; + } + + us_memsink_shared_s *mem = self->mem; + if (mem->magic != US_MEMSINK_MAGIC || mem->version != US_MEMSINK_VERSION) { + goto retry; + } + + // Let the sink know that the client is alive + mem->last_client_ts = now_ts; + + if (mem->id == self->frame_id) { + goto retry; + } - if (flock(self->fd, LOCK_UN) < 0) { - RETURN_OS_ERROR; + if (self->drop_same_frames > 0) { + if ( + US_FRAME_COMPARE_GEOMETRY(self->mem, self->frame) + && (self->frame_ts + self->drop_same_frames > now_ts) + && !memcmp(self->frame->data, mem->data, mem->used) + ) { + self->frame_id = mem->id; + goto retry; } } - drop: + // New frame found + Py_BLOCK_THREADS + return 0; + + os_error: + Py_BLOCK_THREADS + PyErr_SetFromErrno(PyExc_OSError); + return -1; + + retry: + if (locked >= 0 && flock(self->fd, LOCK_UN) < 0) { + goto os_error; + } if (usleep(1000) < 0) { - RETURN_OS_ERROR; + goto os_error; } Py_END_ALLOW_THREADS if (PyErr_CheckSignals() < 0) { return -1; } - } while (now_ts < deadline_ts); + } while (now_ts < deadline_ts); return -2; -# undef RETURN_OS_ERROR } static PyObject *_MemsinkObject_wait_frame(_MemsinkObject *self, PyObject *args, PyObject *kwargs) { @@ -185,7 +194,6 @@ static PyObject *_MemsinkObject_wait_frame(_MemsinkObject *self, PyObject *args, US_FRAME_COPY_META(self->mem, self->frame); self->frame_id = mem->id; self->frame_ts = us_get_now_monotonic(); - mem->last_client_ts = self->frame_ts; if (key_required) { mem->key_requested = true; } diff --git a/src/libs/memsink.c b/src/libs/memsink.c index 01df505d4..467ee2dce 100644 --- a/src/libs/memsink.c +++ b/src/libs/memsink.c @@ -226,6 +226,10 @@ int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame, bool *key_reque retval = -1; goto done; } + + // Let the sink know that the client is alive + sink->mem->last_client_ts = us_get_now_monotonic(); + if (sink->mem->id == sink->last_readed_id) { retval = -2; // Not updated goto done; @@ -240,7 +244,6 @@ int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame, bool *key_reque if (key_required) { sink->mem->key_requested = true; } - sink->mem->last_client_ts = us_get_now_monotonic(); done: if (flock(sink->fd, LOCK_UN) < 0) {