Skip to content

Commit

Permalink
memsink client: bump last_as_blank_ts on every flock()
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevaev committed Mar 5, 2024
1 parent 0b8940d commit 40e17b0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 35 deletions.
76 changes: 42 additions & 34 deletions python/src/ustreamer.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,57 +109,66 @@ static PyObject *_MemsinkObject_exit(_MemsinkObject *self, PyObject *Py_UNUSED(i
static int _wait_frame(_MemsinkObject *self) {
const ldf deadline_ts = us_get_now_monotonic() + self->wait_timeout;

# define RETURN_OS_ERROR { \
Py_BLOCK_THREADS \
PyErr_SetFromErrno(PyExc_OSError); \
return -1; \
}

int locked = -1;
ldf now_ts;
do {
Py_BEGIN_ALLOW_THREADS

const int retval = us_flock_timedwait_monotonic(self->fd, self->lock_timeout);
locked = us_flock_timedwait_monotonic(self->fd, self->lock_timeout);
now_ts = us_get_now_monotonic();

if (retval < 0 && errno != EWOULDBLOCK) {
RETURN_OS_ERROR;

} else if (retval == 0) {
us_memsink_shared_s *mem = self->mem;
if (mem->magic == US_MEMSINK_MAGIC && mem->version == US_MEMSINK_VERSION && mem->id != self->frame_id) {
if (self->drop_same_frames > 0) {
if (
US_FRAME_COMPARE_GEOMETRY(self->mem, self->frame)
&& (self->frame_ts + self->drop_same_frames > now_ts)
&& !memcmp(self->frame->data, mem->data, mem->used)
) {
self->frame_id = mem->id;
goto drop;
}
}

Py_BLOCK_THREADS
return 0;
if (locked < 0) {
if (errno == EWOULDBLOCK) {
goto retry;
}
goto os_error;
}

us_memsink_shared_s *mem = self->mem;
if (mem->magic != US_MEMSINK_MAGIC || mem->version != US_MEMSINK_VERSION) {
goto retry;
}

// Let the sink know that the client is alive
mem->last_client_ts = now_ts;

if (mem->id == self->frame_id) {
goto retry;
}

if (flock(self->fd, LOCK_UN) < 0) {
RETURN_OS_ERROR;
if (self->drop_same_frames > 0) {
if (
US_FRAME_COMPARE_GEOMETRY(self->mem, self->frame)
&& (self->frame_ts + self->drop_same_frames > now_ts)
&& !memcmp(self->frame->data, mem->data, mem->used)
) {
self->frame_id = mem->id;
goto retry;
}
}

drop:
// New frame found
Py_BLOCK_THREADS
return 0;

os_error:
Py_BLOCK_THREADS
PyErr_SetFromErrno(PyExc_OSError);
return -1;

retry:
if (locked >= 0 && flock(self->fd, LOCK_UN) < 0) {
goto os_error;
}
if (usleep(1000) < 0) {
RETURN_OS_ERROR;
goto os_error;
}
Py_END_ALLOW_THREADS
if (PyErr_CheckSignals() < 0) {
return -1;
}
} while (now_ts < deadline_ts);

} while (now_ts < deadline_ts);
return -2;
# undef RETURN_OS_ERROR
}

static PyObject *_MemsinkObject_wait_frame(_MemsinkObject *self, PyObject *args, PyObject *kwargs) {
Expand All @@ -185,7 +194,6 @@ static PyObject *_MemsinkObject_wait_frame(_MemsinkObject *self, PyObject *args,
US_FRAME_COPY_META(self->mem, self->frame);
self->frame_id = mem->id;
self->frame_ts = us_get_now_monotonic();
mem->last_client_ts = self->frame_ts;
if (key_required) {
mem->key_requested = true;
}
Expand Down
5 changes: 4 additions & 1 deletion src/libs/memsink.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame, bool *key_reque
retval = -1;
goto done;
}

// Let the sink know that the client is alive
sink->mem->last_client_ts = us_get_now_monotonic();

if (sink->mem->id == sink->last_readed_id) {
retval = -2; // Not updated
goto done;
Expand All @@ -240,7 +244,6 @@ int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame, bool *key_reque
if (key_required) {
sink->mem->key_requested = true;
}
sink->mem->last_client_ts = us_get_now_monotonic();

done:
if (flock(sink->fd, LOCK_UN) < 0) {
Expand Down

0 comments on commit 40e17b0

Please sign in to comment.