diff --git a/logerrors.c b/logerrors.c index 6ca3f39..419e1b0 100644 --- a/logerrors.c +++ b/logerrors.c @@ -82,9 +82,8 @@ typedef struct slow_log_info { } SlowLogInfo; typedef struct messages_buffer { - LWLock lock; - int current_interval_index; - pg_atomic_uint32 current_message_index; + pg_atomic_uint64 current_interval_index; + pg_atomic_uint64 current_message_index; /* depends on messages per interval and max intervals count */ MessageInfo buffer[messages_per_interval * max_actual_intervals_count]; } MessagesBuffer; @@ -134,7 +133,6 @@ global_variables_init() char* excluded_errcode_str; char excluded_errcodes_copy[error_codes_count * (len_sqlstate_str + 1)]; global_variables->intervals_count = intervals_count; - /* +5 because we don't want take lock on MessagesBuffer while pg_log_errors_stats is running */ global_variables->actual_intervals_count = intervals_count + 5; global_variables->interval = interval; @@ -182,25 +180,16 @@ static void add_message(int err_code, Oid db_oid, Oid user_oid, int message_type_index) { int index_to_write; int current_message; + int current_interval; if (global_variables == NULL) return; - LWLockAcquire(&global_variables->messagesBuffer.lock, LW_EXCLUSIVE); - current_message = pg_atomic_read_u32(&global_variables->messagesBuffer.current_message_index); - index_to_write = global_variables->messagesBuffer.current_interval_index * messages_per_interval - + current_message; - if (current_message >= messages_per_interval) { - /* too many messages per one interval, save current instead of random message in interval */ - srand(time(0)); - index_to_write = global_variables->messagesBuffer.current_interval_index * messages_per_interval + - rand() % messages_per_interval; - } - + current_interval = pg_atomic_read_u64(&global_variables->messagesBuffer.current_interval_index) % (uint64)global_variables->actual_intervals_count; + current_message = pg_atomic_fetch_add_u64(&global_variables->messagesBuffer.current_message_index, 1) % (uint64)messages_per_interval; + index_to_write = current_interval * messages_per_interval + current_message; global_variables->messagesBuffer.buffer[index_to_write].db_oid = db_oid; global_variables->messagesBuffer.buffer[index_to_write].user_oid = user_oid; - global_variables->messagesBuffer.buffer[index_to_write].error_code = err_code; global_variables->messagesBuffer.buffer[index_to_write].message_type_index = message_type_index; - pg_atomic_write_u32(&global_variables->messagesBuffer.current_message_index, current_message + 1); - LWLockRelease(&global_variables->messagesBuffer.lock); + global_variables->messagesBuffer.buffer[index_to_write].error_code = err_code; } static char* @@ -232,9 +221,9 @@ logerrors_init() err_name = hash_search(error_names_hashtable, (void *) &key, HASH_ENTER, &found); err_name->name = (char*)error_names[i]; } - pg_atomic_init_u32(&global_variables->messagesBuffer.current_message_index, 0); + pg_atomic_init_u64(&global_variables->messagesBuffer.current_message_index, 0); + pg_atomic_init_u64(&global_variables->messagesBuffer.current_interval_index, 0); MemSet(&global_variables->total_count, 0, message_types_count); - LWLockInitialize(&global_variables->messagesBuffer.lock, LWLockNewTrancheId()); for (i = 0; i < message_types_count; ++i) { pg_atomic_init_u32(&global_variables->total_count[i], 0); } @@ -251,24 +240,21 @@ static void logerrors_update_info() { int i; - int current_index; - int prev_index; + int current_interval; if (global_variables == NULL) { return; } - LWLockAcquire(&global_variables->messagesBuffer.lock, LW_EXCLUSIVE); - prev_index = global_variables->messagesBuffer.current_interval_index; - global_variables->messagesBuffer.current_interval_index = (prev_index + 1) - % global_variables->actual_intervals_count; - current_index = global_variables->messagesBuffer.current_interval_index; + current_interval = (pg_atomic_read_u64(&global_variables->messagesBuffer.current_interval_index) + (uint64)1) % + (uint64)global_variables->actual_intervals_count; for (i = 0; i < messages_per_interval; ++i) { - global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].error_code = -1; - global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].db_oid = -1; - global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].user_oid = -1; - global_variables->messagesBuffer.buffer[i + current_index * messages_per_interval].message_type_index = -1; + global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].error_code = -1; + global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].db_oid = -1; + global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].user_oid = -1; + global_variables->messagesBuffer.buffer[i + current_interval * messages_per_interval].message_type_index = -1; } - pg_atomic_write_u32(&global_variables->messagesBuffer.current_message_index, 0); - LWLockRelease(&global_variables->messagesBuffer.lock); + pg_atomic_write_u64(&global_variables->messagesBuffer.current_message_index, 0); + // no locking is required as this is the only place where the current_interval_index changes + pg_atomic_write_u64(&global_variables->messagesBuffer.current_interval_index, current_interval); } void @@ -657,9 +643,7 @@ pg_log_errors_stats(PG_FUNCTION_ARGS) rsinfo->setDesc = tupdesc; MemoryContextSwitchTo(oldcontext); - LWLockAcquire(&global_variables->messagesBuffer.lock, LW_EXCLUSIVE); - current_interval_index = global_variables->messagesBuffer.current_interval_index; - LWLockRelease(&global_variables->messagesBuffer.lock); + current_interval_index = pg_atomic_read_u64(&global_variables->messagesBuffer.current_interval_index) % (uint64)global_variables->actual_intervals_count; /* 'TOTAL' counters */ for (lvl_i = 0; lvl_i < message_types_count; ++lvl_i) {