Skip to content

Commit

Permalink
Merge pull request #25 from munakoiso/better_lwlock
Browse files Browse the repository at this point in the history
Remove LWLocks
  • Loading branch information
munakoiso authored Mar 4, 2024
2 parents 3c15333 + cd0e9df commit 76ef74b
Showing 1 changed file with 20 additions and 36 deletions.
56 changes: 20 additions & 36 deletions logerrors.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ typedef struct slow_log_info {
} SlowLogInfo;

typedef struct messages_buffer {
LWLock lock;
int current_interval_index;
pg_atomic_uint32 current_message_index;
pg_atomic_uint64 current_interval_index;
pg_atomic_uint64 current_message_index;
/* depends on messages per interval and max intervals count */
MessageInfo buffer[messages_per_interval * max_actual_intervals_count];
} MessagesBuffer;
Expand Down Expand Up @@ -134,7 +133,6 @@ global_variables_init()
char* excluded_errcode_str;
char excluded_errcodes_copy[error_codes_count * (len_sqlstate_str + 1)];
global_variables->intervals_count = intervals_count;
/* +5 because we don't want take lock on MessagesBuffer while pg_log_errors_stats is running */
global_variables->actual_intervals_count = intervals_count + 5;
global_variables->interval = interval;

Expand Down Expand Up @@ -182,25 +180,16 @@ static void
add_message(int err_code, Oid db_oid, Oid user_oid, int message_type_index) {
int index_to_write;
int current_message;
int current_interval;
if (global_variables == NULL)
return;
LWLockAcquire(&global_variables->messagesBuffer.lock, LW_EXCLUSIVE);
current_message = pg_atomic_read_u32(&global_variables->messagesBuffer.current_message_index);
index_to_write = global_variables->messagesBuffer.current_interval_index * messages_per_interval
+ current_message;
if (current_message >= messages_per_interval) {
/* too many messages per one interval, save current instead of random message in interval */
srand(time(0));
index_to_write = global_variables->messagesBuffer.current_interval_index * messages_per_interval +
rand() % messages_per_interval;
}

current_interval = pg_atomic_read_u64(&global_variables->messagesBuffer.current_interval_index) % (uint64)global_variables->actual_intervals_count;
current_message = pg_atomic_fetch_add_u64(&global_variables->messagesBuffer.current_message_index, 1) % (uint64)messages_per_interval;
index_to_write = current_interval * messages_per_interval + current_message;
global_variables->messagesBuffer.buffer[index_to_write].db_oid = db_oid;
global_variables->messagesBuffer.buffer[index_to_write].user_oid = user_oid;
global_variables->messagesBuffer.buffer[index_to_write].error_code = err_code;
global_variables->messagesBuffer.buffer[index_to_write].message_type_index = message_type_index;
pg_atomic_write_u32(&global_variables->messagesBuffer.current_message_index, current_message + 1);
LWLockRelease(&global_variables->messagesBuffer.lock);
global_variables->messagesBuffer.buffer[index_to_write].error_code = err_code;
}

static char*
Expand Down Expand Up @@ -232,9 +221,9 @@ logerrors_init()
err_name = hash_search(error_names_hashtable, (void *) &key, HASH_ENTER, &found);
err_name->name = (char*)error_names[i];
}
pg_atomic_init_u32(&global_variables->messagesBuffer.current_message_index, 0);
pg_atomic_init_u64(&global_variables->messagesBuffer.current_message_index, 0);
pg_atomic_init_u64(&global_variables->messagesBuffer.current_interval_index, 0);
MemSet(&global_variables->total_count, 0, message_types_count);
LWLockInitialize(&global_variables->messagesBuffer.lock, LWLockNewTrancheId());
for (i = 0; i < message_types_count; ++i) {
pg_atomic_init_u32(&global_variables->total_count[i], 0);
}
Expand All @@ -251,24 +240,21 @@ static void
logerrors_update_info()
{
int i;
int current_index;
int prev_index;
int current_interval;
if (global_variables == NULL) {
return;
}
LWLockAcquire(&global_variables->messagesBuffer.lock, LW_EXCLUSIVE);
prev_index = global_variables->messagesBuffer.current_interval_index;
global_variables->messagesBuffer.current_interval_index = (prev_index + 1)
% global_variables->actual_intervals_count;
current_index = global_variables->messagesBuffer.current_interval_index;
current_interval = (pg_atomic_read_u64(&global_variables->messagesBuffer.current_interval_index) + (uint64)1) %
(uint64)global_variables->actual_intervals_count;
for (i = 0; i < messages_per_interval; ++i) {
global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].error_code = -1;
global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].db_oid = -1;
global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].user_oid = -1;
global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].message_type_index = -1;
global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].error_code = -1;
global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].db_oid = -1;
global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].user_oid = -1;
global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].message_type_index = -1;
}
pg_atomic_write_u32(&global_variables->messagesBuffer.current_message_index, 0);
LWLockRelease(&global_variables->messagesBuffer.lock);
pg_atomic_write_u64(&global_variables->messagesBuffer.current_message_index, 0);
// no locking is required as this is the only place where the current_interval_index changes
pg_atomic_write_u64(&global_variables->messagesBuffer.current_interval_index, current_interval);
}

void
Expand Down Expand Up @@ -657,9 +643,7 @@ pg_log_errors_stats(PG_FUNCTION_ARGS)
rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);

LWLockAcquire(&global_variables->messagesBuffer.lock, LW_EXCLUSIVE);
current_interval_index = global_variables->messagesBuffer.current_interval_index;
LWLockRelease(&global_variables->messagesBuffer.lock);
current_interval_index = pg_atomic_read_u64(&global_variables->messagesBuffer.current_interval_index) % (uint64)global_variables->actual_intervals_count;
/* 'TOTAL' counters */
for (lvl_i = 0; lvl_i < message_types_count; ++lvl_i) {

Expand Down

0 comments on commit 76ef74b

Please sign in to comment.