Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
Improve device management thread
Browse files Browse the repository at this point in the history
Remove useless mutex.
Move eventfd-related functions to separate file.
Rename config properties: discovery_interval -> manager_interval, default value: 15.
  • Loading branch information
RoEdAl committed Jun 9, 2024
1 parent 6a0c122 commit fa9ca8e
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 131 deletions.
4 changes: 2 additions & 2 deletions etc/quectel.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[general]
;interval=60 ; Number of seconds between trying to connect to devices
;interval=15 ; Number of seconds between trying to connect to devices
;smsdb=:memory: ; /var/lib/asterisk/smsdb
;smsdb_backup=/var/lib/asterisk/smsdb-backup
;csmsttl=600
;smsttl=600

[defaults]
;multiparty=no
Expand Down
4 changes: 2 additions & 2 deletions quectel.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[general]
;interval=60 ; Number of seconds between trying to connect to devices
;interval=15 ; Number of seconds between trying to connect to devices
;smsdb=:memory: ; /var/lib/asterisk/smsdb
;smsdb_backup=/var/lib/asterisk/smsdb-backup
;csmsttl=600
;smsttl=600

[defaults]
;multiparty=no
Expand Down
153 changes: 46 additions & 107 deletions src/chan_quectel.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
*/

#include <signal.h>
#include <sys/eventfd.h>

#include "ast_config.h"

Expand All @@ -59,6 +58,7 @@
#include "dc_config.h" /* dc_uconfig_fill() dc_gconfig_fill() dc_sconfig_fill() */
#include "errno.h"
#include "error.h"
#include "eventfd.h"
#include "helpers.h"
#include "monitor_thread.h"
#include "msg_tech.h"
Expand Down Expand Up @@ -302,14 +302,6 @@ static void pvt_destroy(struct pvt* const pvt)
pvt_free(pvt);
}

static void get_public_state_event(struct public_state* const state, int* discovery_interval, int* fd)
{
SCOPED_MUTEX(dev_manager_lock, &state->dev_manager_lock);
*discovery_interval = SCONF_GLOBAL(state, discovery_interval);
fd[0] = state->dev_manager_stop_event;
fd[1] = state->dev_manager_restart_event;
}

static int waitfor_n_fd(int* fd, int* ms)
{
int exception;
Expand All @@ -323,43 +315,26 @@ static int waitfor_n_fd(int* fd, int* ms)
return outfd;
}

static int reset_event(int fd)
{
// eventfd uses 64bit integers for signalling
uint64_t dummy = 0;
ssize_t l = read(fd, &dummy, sizeof(dummy));
if (l < 0 && errno != EAGAIN) {
return -1;
}
// EAGAIN is non-fatal here since that just means that
// the event was already reset when we were called,
// which is ok
return 0;
}

static void dev_manager_threadproc_state(struct public_state* const state)
{
int discovery_interval;
int fd[2];
int manager_interval = SCONF_GLOBAL(state, manager_interval);
int fd[2] = {state->dev_manager_event_stop, state->dev_manager_event_signal};

auto int ev_wait()
{
int t = discovery_interval;
int t = manager_interval;
return waitfor_n_fd(fd, &t);
}

get_public_state_event(state, &discovery_interval, fd);

while (1) {
const int waitfd = ev_wait();
if (waitfd == fd[0]) { // exit
ast_log(LOG_NOTICE, "Got device manager exit event\n");
ast_debug(3, "[dev-manager] Got exit event\n");
break;
} else if (waitfd == fd[1]) { // reload
ast_log(LOG_NOTICE, "Got device manager reload event\n");
get_public_state_event(state, &discovery_interval, fd);
reset_event(fd[1]);
continue;
ast_debug(3, "[dev-manager] Got signal\n");
manager_interval = SCONF_GLOBAL(state, manager_interval);
eventfd_reset(fd[1]);
}
// timeout
struct pvt* pvt;
Expand All @@ -368,7 +343,9 @@ static void dev_manager_threadproc_state(struct public_state* const state)
AST_RWLIST_TRAVERSE(&state->devices, pvt, entry) {
SCOPED_MUTEX(pvt_lock, &pvt->lock);

pvt->must_remove = 0;
if (pvt->must_remove) {
continue;
}

if (pvt->restart_time != RESTATE_TIME_NOW) {
continue;
Expand All @@ -379,32 +356,44 @@ static void dev_manager_threadproc_state(struct public_state* const state)

switch (pvt->desired_state) {
case DEV_STATE_RESTARTED:
ast_debug(4, "[dev-manager][%s] Restarting device\n", PVT_ID(pvt));
pvt_monitor_stop(pvt);
pvt->desired_state = DEV_STATE_STARTED;
/* fall through */

case DEV_STATE_STARTED:
ast_debug(4, "[dev-manager][%s] Starting device\n", PVT_ID(pvt));
pvt_start(pvt);
break;

case DEV_STATE_REMOVED:
ast_debug(4, "[dev-manager][%s] Removing device\n", PVT_ID(pvt));
pvt_monitor_stop(pvt);
pvt->must_remove = 1;
break;

case DEV_STATE_STOPPED:
ast_debug(4, "[dev-manager][%s] Stopping device\n", PVT_ID(pvt));
pvt_monitor_stop(pvt);
break;
}
}
AST_RWLIST_UNLOCK(&state->devices);

/* actual device removal here for avoid long (discovery) time write lock on device list in loop above */
AST_RWLIST_WRLOCK(&state->devices);

if (AST_RWLIST_TRYWRLOCK(&state->devices)) {
continue;
}

AST_RWLIST_TRAVERSE_SAFE_BEGIN(&state->devices, pvt, entry)
{
ast_mutex_lock(&pvt->lock);
if (ast_mutex_trylock(&pvt->lock)) {
continue;
}

if (pvt->must_remove) {
ast_debug(4, "[dev-manager][%s] Freeing device\n", PVT_ID(pvt));
AST_RWLIST_REMOVE_CURRENT(entry);
pvt_free(pvt);
} else {
Expand All @@ -425,71 +414,32 @@ static void* dev_manager_threadproc(void* arg)

static int dev_manager_start(struct public_state* const state)
{
SCOPED_MUTEX(device_manager_lock, &state->dev_manager_lock);

if (ast_pthread_create_background(&state->dev_manager_thread, NULL, dev_manager_threadproc, state) < 0) {
ast_log(LOG_ERROR, "Unable to start discovery thread\n");
state->dev_manager_thread = AST_PTHREADT_NULL;
return -1;
}

return 0;
}

static int set_event(int fd)
static void dev_manager_signal(const struct public_state* const state)
{
// eventfd uses 64bit integers for signalling
uint64_t dummy = 1;
ssize_t l = write(fd, &dummy, sizeof(dummy));
if (l < 0) {
return -1;
}
return 0;
}

static void get_dev_manager_restart_event(struct public_state* const state, int* dev_manager_restart_event)
{
SCOPED_MUTEX(dev_manager_lock, &state->dev_manager_lock);
*dev_manager_restart_event = state->dev_manager_restart_event;
}

static void dev_manager_restart(struct public_state* const state)
{
int fd;
get_dev_manager_restart_event(state, &fd);
if (set_event(fd)) {
if (eventfd_set(state->dev_manager_event_signal)) {
ast_log(LOG_ERROR, "Unable to signal device manager thread\n");
}
}

static void get_dev_manager_thread_handle(struct public_state* const state, pthread_t* dev_manager_thread, int* dev_manager_stop_event)
{
SCOPED_MUTEX(dev_manager_lock, &state->dev_manager_lock);
*dev_manager_thread = state->dev_manager_thread;
*dev_manager_stop_event = state->dev_manager_stop_event;
}

static void clear_dev_manager_thread_handle(struct public_state* const state)
{
SCOPED_MUTEX(dev_manager_lock, &state->dev_manager_lock);
state->dev_manager_thread = AST_PTHREADT_NULL;
}

static void dev_manager_stop(struct public_state* const state)
{
pthread_t dev_manager_thread;
int dev_manager_stop_event;

get_dev_manager_thread_handle(state, &dev_manager_thread, &dev_manager_stop_event);
if (set_event(dev_manager_stop_event)) {
if (eventfd_set(state->dev_manager_event_stop)) {
ast_log(LOG_ERROR, "Unable to signal device manager thread\n");
}

if (dev_manager_thread && (dev_manager_thread != AST_PTHREADT_STOP) && (dev_manager_thread != AST_PTHREADT_NULL)) {
pthread_join(dev_manager_thread, NULL);
if (state->dev_manager_thread && (state->dev_manager_thread != AST_PTHREADT_STOP) && (state->dev_manager_thread != AST_PTHREADT_NULL)) {
pthread_join(state->dev_manager_thread, NULL);
}

clear_dev_manager_thread_handle(state);
state->dev_manager_thread = AST_PTHREADT_NULL;
}

#/* */
Expand Down Expand Up @@ -1184,7 +1134,7 @@ static struct pvt* pvt_create(const pvt_config_t* settings)
ast_string_field_set(pvt, subscriber_number, NULL);

/* and copy settings */
memcpy(&pvt->settings, settings, sizeof(pvt->settings));
pvt->settings = *settings;

pvt->empty_str.__AST_STR_LEN = 1;
pvt->empty_str.__AST_STR_TS = DS_STATIC;
Expand All @@ -1209,7 +1159,7 @@ void pvt_try_restate(struct pvt* pvt)
{
if (pvt_time4restate(pvt)) {
pvt->restart_time = RESTATE_TIME_NOW;
dev_manager_restart(gpublic);
dev_manager_signal(gpublic);
}
}

Expand All @@ -1232,8 +1182,7 @@ static int pvt_reconfigure(struct pvt* pvt, const pvt_config_t* settings, restat
}

/* check what config changes require restaring */
else if (strcmp(UCONFIG(settings, audio_tty), CONF_UNIQ(pvt, audio_tty)) || strcmp(UCONFIG(settings, data_tty), CONF_UNIQ(pvt, data_tty)) ||
SCONFIG(settings, resetquectel) != CONF_SHARED(pvt, resetquectel) || SCONFIG(settings, callwaiting) != CONF_SHARED(pvt, callwaiting)) {
else if (pvt_config_compare(settings, &pvt->settings)) {
/* TODO: schedule restart */
pvt->desired_state = DEV_STATE_RESTARTED;

Expand All @@ -1242,7 +1191,7 @@ static int pvt_reconfigure(struct pvt* pvt, const pvt_config_t* settings, restat
}

/* and copy settings */
memcpy(&pvt->settings, settings, sizeof(pvt->settings));
pvt->settings = *settings;
}
return rv;
}
Expand Down Expand Up @@ -1464,13 +1413,13 @@ static int load_module()
return AST_MODULE_LOAD_DECLINE;
}

const int rv = public_state_init(gpublic);
if (rv != AST_MODULE_LOAD_SUCCESS) {
const int res = public_state_init(gpublic);
if (res != AST_MODULE_LOAD_SUCCESS) {
ast_free(gpublic);
gpublic = NULL;
}

return rv;
return res;
}

#/* */
Expand All @@ -1493,8 +1442,6 @@ static unsigned int get_default_framing() { return PTIME_CAPTURE }

#endif

static int create_event() { return eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); }

static int public_state_init(struct public_state* state)
{
int rv = AST_MODULE_LOAD_DECLINE;
Expand All @@ -1504,10 +1451,9 @@ static int public_state_init(struct public_state* state)
return rv;
}

ast_mutex_init(&state->dev_manager_lock);
state->dev_manager_restart_event = create_event();
state->dev_manager_stop_event = create_event();
state->dev_manager_thread = AST_PTHREADT_NULL;
state->dev_manager_event_signal = eventfd_create();
state->dev_manager_event_stop = eventfd_create();
state->dev_manager_thread = AST_PTHREADT_NULL;

AST_RWLIST_HEAD_INIT(&state->devices);

Expand All @@ -1519,8 +1465,8 @@ static int public_state_init(struct public_state* state)

rv = AST_MODULE_LOAD_FAILURE;

if (!dev_manager_start(state)) {
ast_log(LOG_ERROR, "Unable to create discovery thread\n");
if (dev_manager_start(state)) {
ast_log(LOG_ERROR, "Unable to create device manager thread\n");
devices_destroy(state);
AST_RWLIST_HEAD_DESTROY(&state->devices);
return rv;
Expand Down Expand Up @@ -1566,12 +1512,6 @@ static int public_state_init(struct public_state* state)

#/* */

void close_event(int* fd)
{
close(*fd);
*fd = -1;
}

static void public_state_fini(struct public_state* const state)
{
/* First, take us out of the channel loop */
Expand All @@ -1595,9 +1535,8 @@ static void public_state_fini(struct public_state* const state)
dev_manager_stop(state);
devices_destroy(state);

ast_mutex_destroy(&state->dev_manager_lock);
close_event(&state->dev_manager_restart_event);
close_event(&state->dev_manager_stop_event);
eventfd_close(&state->dev_manager_event_signal);
eventfd_close(&state->dev_manager_event_stop);
AST_RWLIST_HEAD_DESTROY(&state->devices);

ast_threadpool_shutdown(gpublic->threadpool);
Expand All @@ -1619,7 +1558,7 @@ void pvt_reload(restate_time_t when)

reload_config(gpublic, 1, when, &dev_reload);
if (dev_reload > 0) {
dev_manager_restart(gpublic);
dev_manager_signal(gpublic);
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/chan_quectel.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,9 @@ typedef struct pvt {
typedef struct public_state {
AST_RWLIST_HEAD(devices, pvt) devices;
struct ast_threadpool* threadpool;
ast_mutex_t dev_manager_lock;
pthread_t dev_manager_thread;
int dev_manager_restart_event;
int dev_manager_stop_event;
int dev_manager_event_signal;
int dev_manager_event_stop;
struct dc_gconfig global_settings;
} public_state_t;

Expand Down
Loading

0 comments on commit fa9ca8e

Please sign in to comment.