Skip to content

Commit

Permalink
Merge pull request #7651 from rickard-green/rickard/mon-msgq-len/OTP-…
Browse files Browse the repository at this point in the history
…18709

Monitoring of message queue lengths
  • Loading branch information
rickard-green authored Oct 6, 2023
2 parents b89cd8f + 42bebea commit ef9967a
Show file tree
Hide file tree
Showing 18 changed files with 1,052 additions and 765 deletions.
40 changes: 40 additions & 0 deletions erts/doc/src/erlang.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11643,6 +11643,46 @@ Metadata = #{ pid => pid(),
the <c>Info</c> list can be changed at any time without
prior notice.</p>
</item>
<tag><c>{long_message_queue, {Disable, Enable}}</c></tag>
<item>
<p>
If the message queue length of a process in the system reach
<c><anno>Enable</anno></c> length, a <c>long_message_queue</c>
monitor message is sent to the process identified by
<c><anno>MonitorPid</anno></c>. The monitor message will be on
the form <c>{monitor, Pid, long_message_queue, Long}</c>, where
<c>Pid</c> is the process identifier of the process that got
a long message queue and <c>Long</c> will equal <c>true</c>
indicating that it is in a <i>long message queue</i> state. No
more <c>long_message_queue</c> monitor messages will be sent
due to the process identified by <c>Pid</c> until its message
queue length falls down to a length of
<c><anno>Disable</anno></c> length. When this happens, a
<c>long_message_queue</c> monitor message with <c>Long</c> equal
to <c>false</c> will be sent to the process identified by
<c><anno>MonitorPid</anno></c> indicating that the process is no
longer in a <i>long message queue</i> state. As of this, if the
message queue length should again reach
<c><anno>Enable</anno></c> length, a new <c>long_message_queue</c>
monitor message with <c>Long</c> set to <c>true</c> will again
be sent. That is, a <c>long_message_queue</c> monitor message
is sent when a process enters or leaves a <i>long message
queue</i> state where these state changes are defined by the
<c><anno>Enable</anno></c> and <c><anno>Disable</anno></c>
parameters.
</p>
<p>
<c><anno>Enable</anno></c> length must be an integer larger than
zero and <c><anno>Disable</anno></c> length must be an integer
larger than or equal to zero. <c><anno>Disable</anno></c> length
must also be smaller than <c><anno>Enable</anno></c> length. If
the above is not satisfied the operation will fail with a
<c>badarg</c> error exception. You are recommended to use a much
smaller value for <c><anno>Disable</anno></c> length than
<c><anno>Enable</anno></c> length in order not to be flooded with
<c>long_message_queue</c> monitor messages.
</p>
</item>
<tag><c>{long_schedule, Time}</c></tag>
<item>
<p>If a process or port in the system runs uninterrupted
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/atom.names
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ atom load_failure
atom local
atom logger
atom long_gc
atom long_message_queue
atom long_schedule
atom low
atom Lt='<'
Expand Down
15 changes: 7 additions & 8 deletions erts/emulator/beam/break.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,22 +316,21 @@ print_process_info(fmtfn_t to, void *to_arg, Process *p, ErtsProcLocks orig_lock
len = erts_proc_sig_fetch(p);
erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
} else {
len = p->sig_qs.len;
len = p->sig_qs.mq_len;
}
erts_print(to, to_arg, "Message queue length: %d\n", len);

/* display the message queue only if there is anything in it
and we can do it safely */
if (!ERTS_IS_CRASH_DUMPING && p->sig_qs.first != NULL && !garbing
&& (locks & ERTS_PROC_LOCK_MAIN)) {
ErtsMessage *mp;
erts_print(to, to_arg, "Message queue: [");
ERTS_FOREACH_SIG_PRIVQS(
p, mp,
{
if (ERTS_SIG_IS_NON_MSG((ErtsSignal *) mp))
erts_print(to, to_arg, mp->next ? "%T," : "%T",
ERL_MESSAGE_TERM(mp));
});
for (mp = p->sig_qs.first; mp; mp = mp->next) {
if (ERTS_SIG_IS_NON_MSG((ErtsSignal *) mp))
erts_print(to, to_arg, mp->next ? "%T," : "%T",
ERL_MESSAGE_TERM(mp));
}
erts_print(to, to_arg, "]\n");
}

Expand Down
3 changes: 1 addition & 2 deletions erts/emulator/beam/emu/msg_instrs.tab
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ remove_message() {
Sint tok_label = 0;
Sint tok_lastcnt = 0;
Sint tok_serial = 0;
Sint len = erts_proc_sig_privqs_len(c_p);

dtrace_proc_str(c_p, receiver_name);
token2 = SEQ_TRACE_TOKEN(c_p);
Expand All @@ -219,7 +218,7 @@ remove_message() {
}
DTRACE6(message_receive,
receiver_name, size_object(ERL_MESSAGE_TERM(msgp)),
len, /* This is NOT message queue len, but its something... */
c_p->sig_qs.mq_len,
tok_label, tok_lastcnt, tok_serial);
}
#endif
Expand Down
111 changes: 30 additions & 81 deletions erts/emulator/beam/erl_bif_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -780,13 +780,6 @@ collect_one_suspend_monitor(ErtsMonitor *mon, void *vsmicp, Sint reds)
#define ERTS_PI_IX_PARENT 36
#define ERTS_PI_IX_ASYNC_DIST 37

#define ERTS_PI_FLAG_SINGELTON (1 << 0)
#define ERTS_PI_FLAG_ALWAYS_WRAP (1 << 1)
#define ERTS_PI_FLAG_WANT_MSGS (1 << 2)
#define ERTS_PI_FLAG_NEED_MSGQ_LEN (1 << 3)
#define ERTS_PI_FLAG_FORCE_SIG_SEND (1 << 4)
#define ERTS_PI_FLAG_REQUEST_FOR_OTHER (1 << 5)

#define ERTS_PI_UNRESERVE(RS, SZ) \
(ASSERT((RS) >= (SZ)), (RS) -= (SZ))

Expand All @@ -803,8 +796,8 @@ static ErtsProcessInfoArgs pi_args[] = {
{am_current_function, 4, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_initial_call, 4, 0, ERTS_PROC_LOCK_MAIN},
{am_status, 0, 0, 0},
{am_messages, 0, ERTS_PI_FLAG_WANT_MSGS|ERTS_PI_FLAG_NEED_MSGQ_LEN|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_message_queue_len, 0, ERTS_PI_FLAG_NEED_MSGQ_LEN, ERTS_PROC_LOCK_MAIN},
{am_messages, 0, ERTS_PI_FLAG_WANT_MSGS|ERTS_PI_FLAG_NEED_MSGQ|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_message_queue_len, 0, 0, ERTS_PROC_LOCK_MAIN},
{am_links, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_monitors, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_monitored_by, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
Expand All @@ -813,7 +806,7 @@ static ErtsProcessInfoArgs pi_args[] = {
{am_error_handler, 0, 0, ERTS_PROC_LOCK_MAIN},
{am_heap_size, 0, 0, ERTS_PROC_LOCK_MAIN},
{am_stack_size, 0, 0, ERTS_PROC_LOCK_MAIN},
{am_memory, 0, ERTS_PI_FLAG_NEED_MSGQ_LEN|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_memory, 0, ERTS_PI_FLAG_NEED_MSGQ|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_garbage_collection, 3+2 + 3+2 + 3+2 + 3+2 + 3+2 + ERTS_MAX_HEAP_SIZE_MAP_SZ, 0, ERTS_PROC_LOCK_MAIN},
{am_group_leader, 0, 0, ERTS_PROC_LOCK_MAIN},
{am_reductions, 0, 0, ERTS_PROC_LOCK_MAIN},
Expand All @@ -824,7 +817,7 @@ static ErtsProcessInfoArgs pi_args[] = {
{am_catchlevel, 0, 0, ERTS_PROC_LOCK_MAIN},
{am_backtrace, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_last_calls, 0, 0, ERTS_PROC_LOCK_MAIN},
{am_total_heap_size, 0, ERTS_PI_FLAG_NEED_MSGQ_LEN|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_total_heap_size, 0, ERTS_PI_FLAG_NEED_MSGQ|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
{am_suspending, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, 0},
{am_min_heap_size, 0, 0, ERTS_PROC_LOCK_MAIN},
{am_min_bin_vheap_size, 0, 0, ERTS_PROC_LOCK_MAIN},
Expand Down Expand Up @@ -1106,21 +1099,6 @@ erts_process_info(Process *c_p,
static void
pi_setup_grow(int **arr, int *def_arr, Uint *sz, int ix);

#ifdef DEBUG
static int
empty_or_adj_msgq_signals_only(ErtsMessage *sig)
{
ErtsSignal *s = (ErtsSignal *) sig;
for (s = (ErtsSignal *) sig; s; s = (ErtsSignal *) s->common.next) {
if (!ERTS_SIG_IS_NON_MSG(s))
return 0;
if (ERTS_PROC_SIG_OP(s->common.tag) != ERTS_SIG_Q_OP_ADJ_MSGQ)
return 0;
}
return !0;
}
#endif

static ERTS_INLINE int
pi_maybe_flush_signals(Process *c_p, int pi_flags)
{
Expand All @@ -1129,16 +1107,9 @@ pi_maybe_flush_signals(Process *c_p, int pi_flags)

/*
* pi_maybe_flush_signals() flush signals in callers
* signal queue for two different reasons:
*
* 1. If we need 'message_queue_len', but not 'messages', we need
* to handle all signals in the middle queue in order for
* 'c_p->sig_qs.len' to reflect the amount of messages in the
* message queue. We could count traverse the queues, but it
* is better to handle all signals in the queue instead since
* this is work we anyway need to do at some point.
* signal queue due to the following reason:
*
* 2. Ensures that all signals that the caller might have sent to
* Ensures that all signals that the caller might have sent to
* itself are handled before we gather information.
*
* This is, however, not strictly necessary. process_info() is
Expand All @@ -1159,17 +1130,6 @@ pi_maybe_flush_signals(Process *c_p, int pi_flags)
if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) {
flushed:

/*
* Even though we've requested a clean sig queue
* the middle queue may contain adjust-message-queue
* signals since those may be reinserted if yielding.
* Such signals does not effect us though.
*/
ASSERT(((pi_flags & (ERTS_PI_FLAG_WANT_MSGS
| ERTS_PI_FLAG_NEED_MSGQ_LEN))
!= ERTS_PI_FLAG_NEED_MSGQ_LEN)
|| empty_or_adj_msgq_signals_only(c_p->sig_qs.cont));

ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS);

c_p->sig_qs.flags &= ~(FS_FLUSHED_SIGS|FS_FLUSHING_SIGS);
Expand All @@ -1180,25 +1140,19 @@ pi_maybe_flush_signals(Process *c_p, int pi_flags)
state = erts_atomic32_read_nob(&c_p->state);

if (!(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) {
int flush_flags = 0;
if (erts_atomic32_read_nob(&c_p->xstate)
& ERTS_PXSFLG_MAYBE_SELF_SIGS) {
flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_FROM_ID;
}
else if (state & ERTS_PSFLG_MSG_SIG_IN_Q) {
erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_fetch(c_p);
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
}
if (((pi_flags & (ERTS_PI_FLAG_WANT_MSGS
| ERTS_PI_FLAG_NEED_MSGQ_LEN))
== ERTS_PI_FLAG_NEED_MSGQ_LEN)
&& (flush_flags || c_p->sig_qs.cont)) {
flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ;
if (!(erts_atomic32_read_nob(&c_p->xstate)
& ERTS_PXSFLG_MAYBE_SELF_SIGS)) {
if (state & ERTS_PSFLG_MSG_SIG_IN_Q) {
erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_fetch(c_p);
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
}
/* done; no need to flush... */
return 0;
}
if (!flush_flags)
return 0; /* done; no need to flush... */
erts_proc_sig_init_flush_signals(c_p, flush_flags, c_p->common.id);
erts_proc_sig_init_flush_signals(c_p,
ERTS_PROC_SIG_FLUSH_FLG_FROM_ID,
c_p->common.id);
if (c_p->sig_qs.flags & FS_FLUSHED_SIGS)
goto flushed;
}
Expand Down Expand Up @@ -1327,7 +1281,7 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
/* wait for it to terminate properly... */
goto send_signal;
}
if (flags & ERTS_PI_FLAG_NEED_MSGQ_LEN) {
if (flags & ERTS_PI_FLAG_NEED_MSGQ) {
ASSERT(locks & ERTS_PROC_LOCK_MAIN);
if (rp->sig_qs.flags & FS_FLUSHING_SIGS) {
erts_proc_unlock(rp, locks);
Expand Down Expand Up @@ -1408,9 +1362,8 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)

send_signal: {
Eterm ref = erts_make_ref(c_p);
int enqueued, need_msgq_len;
int enqueued;
flags |= ERTS_PI_FLAG_REQUEST_FOR_OTHER;
need_msgq_len = (flags & ERTS_PI_FLAG_NEED_MSGQ_LEN);
/*
* Set save pointer to the end of the message queue so we won't
* have to scan the whole* message queue for the result. Note
Expand All @@ -1419,9 +1372,8 @@ send_signal: {
*/
erts_msgq_set_save_end(c_p);
enqueued = erts_proc_sig_send_process_info_request(c_p, pid, item_ix,
len, need_msgq_len,
flags, reserve_size,
ref);
len, flags,
reserve_size, ref);
if (!enqueued) {
/* Restore save pointer... */
erts_msgq_set_save_first(c_p);
Expand Down Expand Up @@ -1581,8 +1533,8 @@ process_info_aux(Process *c_p,
}

case ERTS_PI_IX_MESSAGES: {
ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN);
if (rp->sig_qs.len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE)) {
ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ);
if (rp->sig_qs.mq_len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE)) {
*msgq_len_p = 0;
res = NIL;
}
Expand All @@ -1593,7 +1545,7 @@ process_info_aux(Process *c_p,
Uint heap_need;

mip = erts_alloc(ERTS_ALC_T_TMP,
rp->sig_qs.len*sizeof(ErtsMessageInfo));
rp->sig_qs.mq_len*sizeof(ErtsMessageInfo));

/*
* Note that message queue may shrink when calling
Expand Down Expand Up @@ -1635,11 +1587,8 @@ process_info_aux(Process *c_p,
case ERTS_PI_IX_MESSAGE_QUEUE_LEN: {
Sint len = *msgq_len_p;
if (len < 0) {
ASSERT((flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER)
|| !rp->sig_qs.cont);
len = rp->sig_qs.len;
len = rp->sig_qs.mq_len;
}
ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN);
ASSERT(len >= 0);
if (len <= MAX_SMALL)
res = make_small(len);
Expand Down Expand Up @@ -1924,17 +1873,17 @@ process_info_aux(Process *c_p,

total_heap_size += rp->mbuf_sz;

ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ);
if (rp->sig_qs.flags & FS_ON_HEAP_MSGQ) {
ErtsMessage *mp;
ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN);
for (mp = rp->sig_qs.first; mp; mp = mp->next) {
if (ERTS_SIG_IS_RECV_MARKER(mp))
continue;
ASSERT(ERTS_SIG_IS_MSG(mp));
if (mp->data.attached)
total_heap_size += erts_msg_attached_data_size(mp);
}
*reds += (Uint) rp->sig_qs.len / 4;
*reds += (Uint) rp->sig_qs.mq_len / 4;
}

(void) erts_bld_uint(NULL, &hsz, total_heap_size);
Expand All @@ -1959,8 +1908,8 @@ process_info_aux(Process *c_p,
hp = erts_produce_heap(hfact, hsz, reserve_size);
res = erts_bld_uint(&hp, NULL, size);

ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN);
*reds += (Uint) rp->sig_qs.len / 4;
ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ);
*reds += (Uint) rp->sig_qs.mq_len / 4;

break;
}
Expand Down
Loading

0 comments on commit ef9967a

Please sign in to comment.