diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 27b9de9a127d..ca390bab9ac6 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -11643,6 +11643,46 @@ Metadata = #{ pid => pid(), the Info list can be changed at any time without prior notice.

+ {long_message_queue, {Disable, Enable}} + +

+ If the message queue length of a process in the system reach + Enable length, a long_message_queue + monitor message is sent to the process identified by + MonitorPid. The monitor message will be on + the form {monitor, Pid, long_message_queue, Long}, where + Pid is the process identifier of the process that got + a long message queue and Long will equal true + indicating that it is in a long message queue state. No + more long_message_queue monitor messages will be sent + due to the process identified by Pid until its message + queue length falls down to a length of + Disable length. When this happens, a + long_message_queue monitor message with Long equal + to false will be sent to the process identified by + MonitorPid indicating that the process is no + longer in a long message queue state. As of this, if the + message queue length should again reach + Enable length, a new long_message_queue + monitor message with Long set to true will again + be sent. That is, a long_message_queue monitor message + is sent when a process enters or leaves a long message + queue state where these state changes are defined by the + Enable and Disable + parameters. +

+

+ Enable length must be an integer larger than + zero and Disable length must be an integer + larger than or equal to zero. Disable length + must also be smaller than Enable length. If + the above is not satisfied the operation will fail with a + badarg error exception. You are recommended to use a much + smaller value for Disable length than + Enable length in order not to be flooded with + long_message_queue monitor messages. +

+
{long_schedule, Time}

If a process or port in the system runs uninterrupted diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index f0fa834d493a..06164e9f6a1a 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -410,6 +410,7 @@ atom load_failure atom local atom logger atom long_gc +atom long_message_queue atom long_schedule atom low atom Lt='<' diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c index f161378faf8e..03db7d307c41 100644 --- a/erts/emulator/beam/break.c +++ b/erts/emulator/beam/break.c @@ -316,7 +316,7 @@ print_process_info(fmtfn_t to, void *to_arg, Process *p, ErtsProcLocks orig_lock len = erts_proc_sig_fetch(p); erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); } else { - len = p->sig_qs.len; + len = p->sig_qs.mq_len; } erts_print(to, to_arg, "Message queue length: %d\n", len); @@ -324,14 +324,13 @@ print_process_info(fmtfn_t to, void *to_arg, Process *p, ErtsProcLocks orig_lock and we can do it safely */ if (!ERTS_IS_CRASH_DUMPING && p->sig_qs.first != NULL && !garbing && (locks & ERTS_PROC_LOCK_MAIN)) { + ErtsMessage *mp; erts_print(to, to_arg, "Message queue: ["); - ERTS_FOREACH_SIG_PRIVQS( - p, mp, - { - if (ERTS_SIG_IS_NON_MSG((ErtsSignal *) mp)) - erts_print(to, to_arg, mp->next ? "%T," : "%T", - ERL_MESSAGE_TERM(mp)); - }); + for (mp = p->sig_qs.first; mp; mp = mp->next) { + if (ERTS_SIG_IS_NON_MSG((ErtsSignal *) mp)) + erts_print(to, to_arg, mp->next ? "%T," : "%T", + ERL_MESSAGE_TERM(mp)); + } erts_print(to, to_arg, "]\n"); } diff --git a/erts/emulator/beam/emu/msg_instrs.tab b/erts/emulator/beam/emu/msg_instrs.tab index 00c7291257c6..365313ab37bd 100644 --- a/erts/emulator/beam/emu/msg_instrs.tab +++ b/erts/emulator/beam/emu/msg_instrs.tab @@ -208,7 +208,6 @@ remove_message() { Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; - Sint len = erts_proc_sig_privqs_len(c_p); dtrace_proc_str(c_p, receiver_name); token2 = SEQ_TRACE_TOKEN(c_p); @@ -219,7 +218,7 @@ remove_message() { } DTRACE6(message_receive, receiver_name, size_object(ERL_MESSAGE_TERM(msgp)), - len, /* This is NOT message queue len, but its something... */ + c_p->sig_qs.mq_len, tok_label, tok_lastcnt, tok_serial); } #endif diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index bee6fd2084ec..9993f1a4864b 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -780,13 +780,6 @@ collect_one_suspend_monitor(ErtsMonitor *mon, void *vsmicp, Sint reds) #define ERTS_PI_IX_PARENT 36 #define ERTS_PI_IX_ASYNC_DIST 37 -#define ERTS_PI_FLAG_SINGELTON (1 << 0) -#define ERTS_PI_FLAG_ALWAYS_WRAP (1 << 1) -#define ERTS_PI_FLAG_WANT_MSGS (1 << 2) -#define ERTS_PI_FLAG_NEED_MSGQ_LEN (1 << 3) -#define ERTS_PI_FLAG_FORCE_SIG_SEND (1 << 4) -#define ERTS_PI_FLAG_REQUEST_FOR_OTHER (1 << 5) - #define ERTS_PI_UNRESERVE(RS, SZ) \ (ASSERT((RS) >= (SZ)), (RS) -= (SZ)) @@ -803,8 +796,8 @@ static ErtsProcessInfoArgs pi_args[] = { {am_current_function, 4, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, {am_initial_call, 4, 0, ERTS_PROC_LOCK_MAIN}, {am_status, 0, 0, 0}, - {am_messages, 0, ERTS_PI_FLAG_WANT_MSGS|ERTS_PI_FLAG_NEED_MSGQ_LEN|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, - {am_message_queue_len, 0, ERTS_PI_FLAG_NEED_MSGQ_LEN, ERTS_PROC_LOCK_MAIN}, + {am_messages, 0, ERTS_PI_FLAG_WANT_MSGS|ERTS_PI_FLAG_NEED_MSGQ|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, + {am_message_queue_len, 0, 0, ERTS_PROC_LOCK_MAIN}, {am_links, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, {am_monitors, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, {am_monitored_by, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, @@ -813,7 +806,7 @@ static ErtsProcessInfoArgs pi_args[] = { {am_error_handler, 0, 0, ERTS_PROC_LOCK_MAIN}, {am_heap_size, 0, 0, ERTS_PROC_LOCK_MAIN}, {am_stack_size, 0, 0, ERTS_PROC_LOCK_MAIN}, - {am_memory, 0, ERTS_PI_FLAG_NEED_MSGQ_LEN|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, + {am_memory, 0, ERTS_PI_FLAG_NEED_MSGQ|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, {am_garbage_collection, 3+2 + 3+2 + 3+2 + 3+2 + 3+2 + ERTS_MAX_HEAP_SIZE_MAP_SZ, 0, ERTS_PROC_LOCK_MAIN}, {am_group_leader, 0, 0, ERTS_PROC_LOCK_MAIN}, {am_reductions, 0, 0, ERTS_PROC_LOCK_MAIN}, @@ -824,7 +817,7 @@ static ErtsProcessInfoArgs pi_args[] = { {am_catchlevel, 0, 0, ERTS_PROC_LOCK_MAIN}, {am_backtrace, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, {am_last_calls, 0, 0, ERTS_PROC_LOCK_MAIN}, - {am_total_heap_size, 0, ERTS_PI_FLAG_NEED_MSGQ_LEN|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, + {am_total_heap_size, 0, ERTS_PI_FLAG_NEED_MSGQ|ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN}, {am_suspending, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, 0}, {am_min_heap_size, 0, 0, ERTS_PROC_LOCK_MAIN}, {am_min_bin_vheap_size, 0, 0, ERTS_PROC_LOCK_MAIN}, @@ -1106,21 +1099,6 @@ erts_process_info(Process *c_p, static void pi_setup_grow(int **arr, int *def_arr, Uint *sz, int ix); -#ifdef DEBUG -static int -empty_or_adj_msgq_signals_only(ErtsMessage *sig) -{ - ErtsSignal *s = (ErtsSignal *) sig; - for (s = (ErtsSignal *) sig; s; s = (ErtsSignal *) s->common.next) { - if (!ERTS_SIG_IS_NON_MSG(s)) - return 0; - if (ERTS_PROC_SIG_OP(s->common.tag) != ERTS_SIG_Q_OP_ADJ_MSGQ) - return 0; - } - return !0; -} -#endif - static ERTS_INLINE int pi_maybe_flush_signals(Process *c_p, int pi_flags) { @@ -1129,16 +1107,9 @@ pi_maybe_flush_signals(Process *c_p, int pi_flags) /* * pi_maybe_flush_signals() flush signals in callers - * signal queue for two different reasons: - * - * 1. If we need 'message_queue_len', but not 'messages', we need - * to handle all signals in the middle queue in order for - * 'c_p->sig_qs.len' to reflect the amount of messages in the - * message queue. We could count traverse the queues, but it - * is better to handle all signals in the queue instead since - * this is work we anyway need to do at some point. + * signal queue due to the following reason: * - * 2. Ensures that all signals that the caller might have sent to + * Ensures that all signals that the caller might have sent to * itself are handled before we gather information. * * This is, however, not strictly necessary. process_info() is @@ -1159,17 +1130,6 @@ pi_maybe_flush_signals(Process *c_p, int pi_flags) if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) { flushed: - /* - * Even though we've requested a clean sig queue - * the middle queue may contain adjust-message-queue - * signals since those may be reinserted if yielding. - * Such signals does not effect us though. - */ - ASSERT(((pi_flags & (ERTS_PI_FLAG_WANT_MSGS - | ERTS_PI_FLAG_NEED_MSGQ_LEN)) - != ERTS_PI_FLAG_NEED_MSGQ_LEN) - || empty_or_adj_msgq_signals_only(c_p->sig_qs.cont)); - ASSERT(c_p->sig_qs.flags & FS_FLUSHING_SIGS); c_p->sig_qs.flags &= ~(FS_FLUSHED_SIGS|FS_FLUSHING_SIGS); @@ -1180,25 +1140,19 @@ pi_maybe_flush_signals(Process *c_p, int pi_flags) state = erts_atomic32_read_nob(&c_p->state); if (!(c_p->sig_qs.flags & FS_FLUSHING_SIGS)) { - int flush_flags = 0; - if (erts_atomic32_read_nob(&c_p->xstate) - & ERTS_PXSFLG_MAYBE_SELF_SIGS) { - flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_FROM_ID; - } - else if (state & ERTS_PSFLG_MSG_SIG_IN_Q) { - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); - erts_proc_sig_fetch(c_p); - erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); - } - if (((pi_flags & (ERTS_PI_FLAG_WANT_MSGS - | ERTS_PI_FLAG_NEED_MSGQ_LEN)) - == ERTS_PI_FLAG_NEED_MSGQ_LEN) - && (flush_flags || c_p->sig_qs.cont)) { - flush_flags |= ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ; + if (!(erts_atomic32_read_nob(&c_p->xstate) + & ERTS_PXSFLG_MAYBE_SELF_SIGS)) { + if (state & ERTS_PSFLG_MSG_SIG_IN_Q) { + erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(c_p); + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + } + /* done; no need to flush... */ + return 0; } - if (!flush_flags) - return 0; /* done; no need to flush... */ - erts_proc_sig_init_flush_signals(c_p, flush_flags, c_p->common.id); + erts_proc_sig_init_flush_signals(c_p, + ERTS_PROC_SIG_FLUSH_FLG_FROM_ID, + c_p->common.id); if (c_p->sig_qs.flags & FS_FLUSHED_SIGS) goto flushed; } @@ -1327,7 +1281,7 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) /* wait for it to terminate properly... */ goto send_signal; } - if (flags & ERTS_PI_FLAG_NEED_MSGQ_LEN) { + if (flags & ERTS_PI_FLAG_NEED_MSGQ) { ASSERT(locks & ERTS_PROC_LOCK_MAIN); if (rp->sig_qs.flags & FS_FLUSHING_SIGS) { erts_proc_unlock(rp, locks); @@ -1408,9 +1362,8 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) send_signal: { Eterm ref = erts_make_ref(c_p); - int enqueued, need_msgq_len; + int enqueued; flags |= ERTS_PI_FLAG_REQUEST_FOR_OTHER; - need_msgq_len = (flags & ERTS_PI_FLAG_NEED_MSGQ_LEN); /* * Set save pointer to the end of the message queue so we won't * have to scan the whole* message queue for the result. Note @@ -1419,9 +1372,8 @@ send_signal: { */ erts_msgq_set_save_end(c_p); enqueued = erts_proc_sig_send_process_info_request(c_p, pid, item_ix, - len, need_msgq_len, - flags, reserve_size, - ref); + len, flags, + reserve_size, ref); if (!enqueued) { /* Restore save pointer... */ erts_msgq_set_save_first(c_p); @@ -1581,8 +1533,8 @@ process_info_aux(Process *c_p, } case ERTS_PI_IX_MESSAGES: { - ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN); - if (rp->sig_qs.len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE)) { + ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ); + if (rp->sig_qs.mq_len == 0 || (ERTS_TRACE_FLAGS(rp) & F_SENSITIVE)) { *msgq_len_p = 0; res = NIL; } @@ -1593,7 +1545,7 @@ process_info_aux(Process *c_p, Uint heap_need; mip = erts_alloc(ERTS_ALC_T_TMP, - rp->sig_qs.len*sizeof(ErtsMessageInfo)); + rp->sig_qs.mq_len*sizeof(ErtsMessageInfo)); /* * Note that message queue may shrink when calling @@ -1635,11 +1587,8 @@ process_info_aux(Process *c_p, case ERTS_PI_IX_MESSAGE_QUEUE_LEN: { Sint len = *msgq_len_p; if (len < 0) { - ASSERT((flags & ERTS_PI_FLAG_REQUEST_FOR_OTHER) - || !rp->sig_qs.cont); - len = rp->sig_qs.len; + len = rp->sig_qs.mq_len; } - ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN); ASSERT(len >= 0); if (len <= MAX_SMALL) res = make_small(len); @@ -1924,9 +1873,9 @@ process_info_aux(Process *c_p, total_heap_size += rp->mbuf_sz; + ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ); if (rp->sig_qs.flags & FS_ON_HEAP_MSGQ) { ErtsMessage *mp; - ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN); for (mp = rp->sig_qs.first; mp; mp = mp->next) { if (ERTS_SIG_IS_RECV_MARKER(mp)) continue; @@ -1934,7 +1883,7 @@ process_info_aux(Process *c_p, if (mp->data.attached) total_heap_size += erts_msg_attached_data_size(mp); } - *reds += (Uint) rp->sig_qs.len / 4; + *reds += (Uint) rp->sig_qs.mq_len / 4; } (void) erts_bld_uint(NULL, &hsz, total_heap_size); @@ -1959,8 +1908,8 @@ process_info_aux(Process *c_p, hp = erts_produce_heap(hfact, hsz, reserve_size); res = erts_bld_uint(&hp, NULL, size); - ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ_LEN); - *reds += (Uint) rp->sig_qs.len / 4; + ASSERT(flags & ERTS_PI_FLAG_NEED_MSGQ); + *reds += (Uint) rp->sig_qs.mq_len / 4; break; } diff --git a/erts/emulator/beam/erl_bif_trace.c b/erts/emulator/beam/erl_bif_trace.c index 20c96797539a..8a519645f888 100644 --- a/erts/emulator/beam/erl_bif_trace.c +++ b/erts/emulator/beam/erl_bif_trace.c @@ -2038,6 +2038,8 @@ void erts_system_monitor_clear(Process *c_p) { erts_system_monitor_large_heap = 0; erts_system_monitor_flags.busy_port = 0; erts_system_monitor_flags.busy_dist_port = 0; + erts_system_monitor_long_msgq_on = ERTS_SWORD_MAX; + erts_system_monitor_long_msgq_off = -1; if (c_p) { erts_thr_progress_unblock(); erts_proc_lock(c_p, ERTS_PROC_LOCK_MAIN); @@ -2059,7 +2061,18 @@ static Eterm system_monitor_get(Process *p) Eterm long_gc = NIL; Eterm long_schedule = NIL; Eterm large_heap = NIL; - + Eterm long_msgq_off = NIL; + Eterm long_msgq_on = NIL; + + if (erts_system_monitor_long_msgq_off >= 0) { + ASSERT(erts_system_monitor_long_msgq_on + > erts_system_monitor_long_msgq_off); + hsz += 2+3+3; + (void) erts_bld_uint(NULL, &hsz, + (Sint) erts_system_monitor_long_msgq_off); + (void) erts_bld_uint(NULL, &hsz, + (Sint) erts_system_monitor_long_msgq_on); + } if (erts_system_monitor_long_gc != 0) { hsz += 2+3; (void) erts_bld_uint(NULL, &hsz, erts_system_monitor_long_gc); @@ -2074,6 +2087,12 @@ static Eterm system_monitor_get(Process *p) } hp = HAlloc(p, hsz); + if (erts_system_monitor_long_msgq_off >= 0) { + long_msgq_off = erts_bld_uint(&hp, NULL, + (Sint) erts_system_monitor_long_msgq_off); + long_msgq_on = erts_bld_uint(&hp, NULL, + (Sint) erts_system_monitor_long_msgq_on); + } if (erts_system_monitor_long_gc != 0) { long_gc = erts_bld_uint(&hp, NULL, erts_system_monitor_long_gc); } @@ -2085,6 +2104,13 @@ static Eterm system_monitor_get(Process *p) large_heap = erts_bld_uint(&hp, NULL, erts_system_monitor_large_heap); } res = NIL; + if (long_msgq_off != NIL) { + Eterm t; + ASSERT(long_msgq_on != NIL); + t = TUPLE2(hp, long_msgq_off, long_msgq_on); hp += 3; + t = TUPLE2(hp, am_long_message_queue, t); hp += 3; + res = CONS(hp, t, res); hp += 2; + } if (long_gc != NIL) { Eterm t = TUPLE2(hp, am_long_gc, long_gc); hp += 3; res = CONS(hp, t, res); hp += 2; @@ -2148,6 +2174,7 @@ system_monitor(Process *p, Eterm monitor_pid, Eterm list) if (is_not_list(list)) goto error; else { Uint long_gc, long_schedule, large_heap; + Sint long_msgq_on, long_msgq_off; int busy_port, busy_dist_port; system_blocked = 1; @@ -2159,7 +2186,8 @@ system_monitor(Process *p, Eterm monitor_pid, Eterm list) goto error; for (long_gc = 0, long_schedule = 0, large_heap = 0, - busy_port = 0, busy_dist_port = 0; + busy_port = 0, busy_dist_port = 0, + long_msgq_on = ERTS_SWORD_MAX, long_msgq_off = -1; is_list(list); list = CDR(list_val(list))) { Eterm t = CAR(list_val(list)); @@ -2176,6 +2204,14 @@ system_monitor(Process *p, Eterm monitor_pid, Eterm list) if (! term_to_Uint(tp[2], &large_heap)) goto error; if (large_heap < 16384) large_heap = 16384; /* 16 Kword is not an unnatural heap size */ + } else if (tp[1] == am_long_message_queue) { + if (!is_tuple_arity(tp[2], 2)) goto error; + tp = tuple_val(tp[2]); + if (!term_to_Sint(tp[1], &long_msgq_off)) goto error; + if (!term_to_Sint(tp[2], &long_msgq_on)) goto error; + if (long_msgq_off < 0) goto error; + if (long_msgq_on <= 0) goto error; + if (long_msgq_off >= long_msgq_on) goto error; } else goto error; } else if (t == am_busy_port) { busy_port = !0; @@ -2191,6 +2227,8 @@ system_monitor(Process *p, Eterm monitor_pid, Eterm list) erts_system_monitor_large_heap = large_heap; erts_system_monitor_flags.busy_port = !!busy_port; erts_system_monitor_flags.busy_dist_port = !!busy_dist_port; + erts_system_monitor_long_msgq_off = long_msgq_off; + erts_system_monitor_long_msgq_on = long_msgq_on; erts_thr_progress_unblock(); BIF_RET(prev); diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index 6ba0acbda210..cdfb32588dd9 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -48,6 +48,8 @@ #define ERTS_INACT_WR_PB_LEAVE_LIMIT 10 #define ERTS_INACT_WR_PB_LEAVE_PERCENTAGE 10 +#define ERTS_LONG_GC_MAX_NMSIGS 1000 + #if defined(DEBUG) || 0 #define ERTS_GC_DEBUG #else @@ -649,22 +651,29 @@ young_gen_usage(Process *p, Uint *ext_msg_usage) } \ } while (0) - static ERTS_INLINE void check_for_possibly_long_gc(Process *p, Uint ygen_usage) { int major; - Uint sz; + Sint sz; major = (p->flags & F_NEED_FULLSWEEP) || GEN_GCS(p) >= MAX_GEN_GCS(p); sz = ygen_usage; sz += p->hend - p->stop; - if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) - sz += erts_proc_sig_privqs_len(p); if (major) sz += p->old_htop - p->old_heap; - + if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) { + Sint len, max_len = ERTS_POTENTIALLY_LONG_GC_HSIZE - sz; + if (max_len < 0) + len = -1; + else + len = erts_proc_sig_privqs_len(p, max_len, ERTS_LONG_GC_MAX_NMSIGS); + if (len < 0) + sz = ERTS_POTENTIALLY_LONG_GC_HSIZE; + else + sz += (Uint) len; + } if (sz >= ERTS_POTENTIALLY_LONG_GC_HSIZE) { ASSERT(!(p->flags & (F_DISABLE_GC|F_DELAY_GC))); p->flags |= major ? F_DIRTY_MAJOR_GC : F_DIRTY_MINOR_GC; @@ -2690,16 +2699,7 @@ setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) } #endif - size = n + erts_proc_sig_privqs_len(p); - if (p->sig_qs.cont) { - /* - * We do not know the exact size needed since - * alias-message signals are not included in - * length of private queues. We might have to - * resize while inspecting the signal queue... - */ - size += 128; - } + size = n + erts_proc_sig_privqs_len(p, -1, -1); if (size > rootset->size) { ERTS_GC_ASSERT(roots == rootset->def); roots = erts_alloc(ERTS_ALC_T_ROOTSET, @@ -2719,25 +2719,19 @@ setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) } mp = p->sig_qs.cont; while (mp) { - if (size <= n) { - Roots *old_roots = roots; - size += 128; - roots = erts_alloc(ERTS_ALC_T_ROOTSET, - size*sizeof(Roots)); - sys_memcpy(roots, old_roots, n*sizeof(Roots)); - if (old_roots != rootset->def) - erts_free(ERTS_ALC_T_ROOTSET, old_roots); - rootset->size = size; - } + ASSERT(n < size); if (ERTS_SIG_IS_INTERNAL_MSG(mp) && !mp->data.attached) { roots[n].v = mp->m; roots[n].sz = ERL_MESSAGE_REF_ARRAY_SZ; n++; } else if (ERTS_SIG_IS_HEAP_ALIAS_MSG(mp)) { - /* Exclude message slot... */ - roots[n].v = &mp->m[1]; - roots[n].sz = ERL_MESSAGE_REF_ARRAY_SZ - 1; + /* + * Exclude message and token slots since they do + * not yet contain valid Erlang terms... + */ + roots[n].v = &mp->m[2]; + roots[n].sz = ERL_MESSAGE_REF_ARRAY_SZ - 2; n++; } mp = mp->next; @@ -3402,21 +3396,25 @@ offset_message(ErtsMessage *mp, Sint offs, char* area, Uint area_size) } break; } + mesg = ERL_MESSAGE_TOKEN(mp); + if (is_boxed(mesg) && ErtsInArea(ptr_val(mesg), area, area_size)) { + ERL_MESSAGE_TOKEN(mp) = offset_ptr(mesg, offs); + } + ASSERT((is_nil(ERL_MESSAGE_TOKEN(mp)) || + is_tuple(ERL_MESSAGE_TOKEN(mp)) || + is_atom(ERL_MESSAGE_TOKEN(mp)))); } - mesg = ERL_MESSAGE_TOKEN(mp); - if (is_boxed(mesg) && ErtsInArea(ptr_val(mesg), area, area_size)) { - ERL_MESSAGE_TOKEN(mp) = offset_ptr(mesg, offs); - } + /* + * In the alias message case, both reference to actual message and + * reference to a potential token are contained in the 'from' + * entry... + */ mesg = ERL_MESSAGE_FROM(mp); if (is_boxed(mesg) && ErtsInArea(ptr_val(mesg), area, area_size)) { ERL_MESSAGE_FROM(mp) = offset_ptr(mesg, offs); } ERTS_OFFSET_DT_UTAG(mp, area, area_size, offs); - - ASSERT((is_nil(ERL_MESSAGE_TOKEN(mp)) || - is_tuple(ERL_MESSAGE_TOKEN(mp)) || - is_atom(ERL_MESSAGE_TOKEN(mp)))); } } @@ -3747,7 +3745,7 @@ reached_max_heap_size(Process *p, Uint total_heap_size, hp = erts_produce_heap(&hfact, 2*(alive ? 9 : 8), 0); args = CONS(hp, stacktrace, args); hp += 2; args = CONS(hp, msg, args); hp += 2; - args = CONS(hp, make_small((p)->sig_inq.len), args); hp += 2; + args = CONS(hp, make_small((p)->sig_qs.mq_len), args); hp += 2; args = CONS(hp, am_true, args); hp += 2; args = CONS(hp, (max_heap_flags & MAX_HEAP_SIZE_KILL ? am_true : am_false), args); hp += 2; args = CONS(hp, make_small(total_heap_size), args); hp += 2; diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 0eea3c826e5d..ca645257e37b 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -272,6 +272,7 @@ struct erl_mesg { typedef union { ErtsSignalCommon common; + ErtsNonMsgSignal nm_sig; ErtsMessageRef msg; } ErtsSignal; @@ -317,7 +318,7 @@ typedef struct { * * These are: * - an inner queue which only consists of - * message signals + * message signals and possibly receive markers * - a middle queue which contains a mixture * of message and non-message signals * @@ -354,26 +355,27 @@ typedef struct { * as an offset which even might be negative. */ - /* inner queue */ + /* inner queue (message queue) */ ErtsMessage *first; ErtsMessage **last; /* point to the last next pointer */ ErtsMessage **save; + Sint mq_len; /* Message queue length */ /* middle queue */ ErtsMessage *cont; ErtsMessage **cont_last; ErtsMsgQNMSigs nmsigs; - + Sint mlenoffs; /* nr of trailing msg sigs after last non-msg sig */ + /* Common for inner and middle queue */ ErtsRecvMarkerBlock *recv_mrk_blk; - Sint len; /* NOT message queue length (see above) */ Uint32 flags; } ErtsSignalPrivQueues; typedef struct ErtsSignalInQueue_ { ErtsMessage* first; ErtsMessage** last; /* point to the last next pointer */ - Sint len; /* number of messages in queue */ + Sint mlenoffs; /* nr of trailing msg sigs after last non-msg sig */ ErtsMsgQNMSigs nmsigs; #ifdef ERTS_PROC_SIG_HARD_DEBUG int may_contain_heap_terms; @@ -452,12 +454,14 @@ typedef struct erl_trace_message_queue__ { do { \ ASSERT(ERTS_SIG_IS_MSG(msg)); \ ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__((p), &(p)->sig_inq, "before");\ + ERTS_HDBG_INQ_LEN(&(p)->sig_inq); \ *(p)->sig_inq.last = (msg); \ (p)->sig_inq.last = &(msg)->next; \ - (p)->sig_inq.len++; \ + (p)->sig_inq.mlenoffs++; \ if (!((ps) & ERTS_PSFLG_MSG_SIG_IN_Q)) \ (void) erts_atomic32_read_bor_nob(&(p)->state, \ ERTS_PSFLG_MSG_SIG_IN_Q); \ + ERTS_HDBG_INQ_LEN(&(p)->sig_inq); \ ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__((p), &(p)->sig_inq, "after"); \ } while(0) diff --git a/erts/emulator/beam/erl_monitor_link.h b/erts/emulator/beam/erl_monitor_link.h index c61b0dd16f60..ee2c1892abe0 100644 --- a/erts/emulator/beam/erl_monitor_link.h +++ b/erts/emulator/beam/erl_monitor_link.h @@ -517,7 +517,7 @@ typedef struct { struct ErtsMonLnkNode__ { union { - ErtsSignalCommon signal; + ErtsNonMsgSignal signal; ErtsMonLnkTreeNode tree; ErtsMonLnkListNode list; } node; diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 17d2eed6d640..3b5270c1df5d 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -63,6 +63,15 @@ #define ERTS_PROC_SIG_ADJ_MSGQ_MSGS_FACTOR \ 25 +#ifdef USE_VM_PROBES +# define ERTS_CLEAR_SEQ_TOKEN_VALUE(MP) \ + ((ERL_MESSAGE_DT_UTAG((MP)) != NIL) ? am_have_dt_utag : NIL) +#else +# define ERTS_CLEAR_SEQ_TOKEN_VALUE(MP) NIL +#endif +#define ERTS_CLEAR_SEQ_TOKEN(MP) \ + ERL_MESSAGE_TOKEN((MP)) = ERTS_CLEAR_SEQ_TOKEN_VALUE((MP)) + Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler); Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high); Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_max); @@ -71,7 +80,6 @@ void erts_proc_sig_queue_init(void) { ERTS_CT_ASSERT(ERTS_SIG_Q_OP_MASK > ERTS_SIG_Q_OP_MAX); - ERTS_CT_ASSERT(ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK > ERTS_SIG_Q_OP_MAX); ERTS_CT_ASSERT(ERTS_SIG_Q_TYPE_MASK >= ERTS_SIG_Q_TYPE_MAX); } @@ -109,13 +117,18 @@ typedef struct { } u; } ErtsExitSignalData; +typedef struct { + ErtsExitSignalData xsigd; + Eterm token; +} ErtsSeqTokenExitSignalData; + typedef struct { Eterm message; Eterm key; } ErtsPersistMonMsg; typedef struct { - ErtsSignalCommon common; + ErtsNonMsgSignal common; Eterm nodename; Uint32 connection_id; Eterm local; /* internal pid (immediate) */ @@ -133,7 +146,12 @@ typedef struct { } ErtsDistSpawnReplySigData; typedef struct { - ErtsSignalCommon common; + ErtsDistSpawnReplySigData data; + Eterm token; +} ErtsDistSeqTokenSpawnReplySigData; + +typedef struct { + ErtsNonMsgSignal common; Uint flags_on; Uint flags_off; Eterm tracer; @@ -144,7 +162,7 @@ typedef struct { #define ERTS_SIG_GL_FLG_SENDER (((erts_aint_t) 1) << 2) typedef struct { - ErtsSignalCommon common; + ErtsNonMsgSignal common; erts_atomic_t flags; Eterm group_leader; Eterm reply_to; @@ -170,16 +188,7 @@ typedef struct { } ErtsProcSigPendingSuspend; typedef struct { - ErtsSignalCommon common; - Sint refc; - Sint delayed_len; - Sint len_offset; -} ErtsProcSigMsgQLenOffsetMarker; - -typedef struct { - ErtsSignalCommon common; - ErtsProcSigMsgQLenOffsetMarker marker; - Sint msgq_len_offset; + ErtsNonMsgSignal common; Eterm requester; Eterm ref; ErtsORefThing oref_thing; @@ -189,11 +198,8 @@ typedef struct { int item_ix[1]; /* of len size in reality... */ } ErtsProcessInfoSig; -#define ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE ((Sint) -1) -#define ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC ((Sint) -2) - typedef struct { - ErtsSignalCommon common; + ErtsNonMsgSignal common; Eterm requester; Eterm (*func)(Process *, void *, int *, ErlHeapFragment **); void *arg; @@ -306,6 +312,12 @@ static void save_delayed_nm_signal(ErtsSavedNMSignals *saved_sigs, ErtsMessage *sig) { ErtsSignal *nm_sig = (ErtsSignal *) sig; + /* + * All saved signals will be restored at the front of the + * middle queue, so no message signals will precede any of + * them... + */ + nm_sig->nm_sig.mlenoffs = 0; nm_sig->common.next = NULL; nm_sig->common.specific.next = NULL; if (!saved_sigs->first) { @@ -373,7 +385,7 @@ restore_delayed_nm_signals(Process *c_p, ErtsSavedNMSignals *saved_sigs) } typedef struct { - ErtsSignalCommon common; + ErtsNonMsgSignal common; Eterm ref; Eterm heap[1]; } ErtsSigDistProcDemonitor; @@ -445,6 +457,20 @@ get_exit_signal_data(ErtsMessage *xsig) + xsig->hfrag.used_size); } +static ERTS_INLINE void +clear_seq_token_gen_exit(ErtsMessage *sig) +{ + Uint tag = ((ErtsSignal *) sig)->common.tag; + ASSERT(ERTS_SIG_Q_OP_EXIT == ERTS_PROC_SIG_OP(tag) + || ERTS_SIG_Q_OP_EXIT_LINKED == ERTS_PROC_SIG_OP(tag) + || ERTS_SIG_Q_OP_MONITOR_DOWN == ERTS_PROC_SIG_OP(tag)); + if (ERTS_PROC_SIG_XTRA(tag)) { + ErtsSeqTokenExitSignalData *datap + = (ErtsSeqTokenExitSignalData *) get_exit_signal_data(sig); + datap->token = ERTS_CLEAR_SEQ_TOKEN_VALUE(sig); + } +} + static ERTS_INLINE ErtsDistSpawnReplySigData * get_dist_spawn_reply_data(ErtsMessage *sig) { @@ -456,6 +482,18 @@ get_dist_spawn_reply_data(ErtsMessage *sig) + sig->hfrag.used_size); } +static ERTS_INLINE void +clear_seq_token_spawn_reply(ErtsMessage *sig) +{ + Uint tag = ((ErtsSignal *) sig)->common.tag; + ASSERT(ERTS_SIG_Q_OP_DIST_SPAWN_REPLY == ERTS_PROC_SIG_OP(tag)); + if (ERTS_PROC_SIG_XTRA(tag)) { + ErtsDistSeqTokenSpawnReplySigData *datap + = (ErtsDistSeqTokenSpawnReplySigData *) get_dist_spawn_reply_data(sig); + datap->token = ERTS_CLEAR_SEQ_TOKEN_VALUE(sig); + } +} + static ERTS_INLINE ErtsCLAData * get_cla_data(ErtsMessage *sig) { @@ -516,6 +554,9 @@ sig_enqueue_trace(ErtsPTabElementCommon *sender, Eterm from, | F_TRACE_SOL1)))) { ErtsSigTraceInfo *ti; Eterm tag; + ((ErtsSignal *) *sigp)->nm_sig.mlenoffs = 0; /* directly following + trace info signal... + */ /* * Set on link enabled. * @@ -529,6 +570,9 @@ sig_enqueue_trace(ErtsPTabElementCommon *sender, Eterm from, ti->common.next = *sigp; ti->common.specific.next = &ti->common.next; ti->common.tag = tag; + ti->common.mlenoffs = 0; /* Need to zero this since it may be + preceeded by another non-message + signal... */ ti->flags_on = ERTS_TRACE_FLAGS(c_p) & TRACEE_FLAGS; if (!(ti->flags_on & F_TRACE_SOL1)) { @@ -676,6 +720,8 @@ static int dbg_check_non_msg(ErtsSignalInQueue* q) * Appart from next pointers between the signals in the sequence also: * * next pointer pointers between non-message signals must have been * correctly set up. + * * all non leading non-message signals should have 'mlenoffs' set to zero. + * * * @param is_to_buffer Non-zero if not enqueue on processes in signal * queue. @@ -713,10 +759,70 @@ enqueue_signals(int is_to_buffer, Process *rp, ErtsMessage *first, ASSERT(!!is_to_buffer == (dest_queue != &rp->sig_inq)); +#if defined(ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN) && 0 + /* verify that the signal sequence fulfills the requirements... */ + Sint no_nmsg_sigs = 0, no_msg_sigs = 0; + int last_next_found = 0, last_nmsig_found = 0; + ErtsMessage *sig; + ErtsNonMsgSignal *nmsig = NULL; + + + for (sig = first; sig; sig = sig->next) { + if (ERTS_SIG_IS_MSG(sig)) { + no_msg_sigs++; + } + else { + if (no_nmsg_sigs++ == 0) { + ERTS_ASSERT(!nmsig); + nmsig = (ErtsNonMsgSignal *) sig; + } + else { + ERTS_ASSERT(((ErtsNonMsgSignal *) sig)->mlenoffs == 0); + } + ERTS_ASSERT(!last_nmsig_found); + ERTS_ASSERT(no_msg_sigs == 0); + ERTS_ASSERT(nmsig == (ErtsNonMsgSignal *) sig); + if (last_next && last_next == nmsig->specific.next) { + ERTS_ASSERT(!last_next_found); + last_next_found = !0; + } + else if (last_next_found) { + ERTS_ASSERT(last_next); + ERTS_ASSERT(*last_next == sig); + ERTS_ASSERT(!sig->next); + ERTS_ASSERT(!nmsig->specific.next); + } + if (!nmsig->specific.next) { + last_nmsig_found = !0; + nmsig = NULL; + } + else { + ERTS_ASSERT(nmsig->specific.next == &sig->next); + nmsig = (ErtsNonMsgSignal *) *nmsig->specific.next; + } + } + } + + ERTS_ASSERT(!nmsig); + ERTS_ASSERT(no_msg_sigs == 0 + || no_nmsg_sigs == 0 + || no_nmsg_sigs == 1); + ERTS_ASSERT(no_nmsg_sigs <= 1 + || (no_msg_sigs == 0 && last_next && last_next_found)); + ERTS_ASSERT(no_nmsg_sigs > 1 + || (!last_next && !last_next_found)); + ERTS_ASSERT(no_nmsg_sigs == 0 || last_nmsig_found); + ERTS_ASSERT(num_msgs == no_msg_sigs); + +#endif /* DEBUG */ + if (flush_buffers) { erts_proc_sig_queue_flush_buffers(rp); } + + ERTS_HDBG_INQ_LEN(dest_queue); + this = dest_queue->last; ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp, dest_queue); @@ -732,8 +838,9 @@ enqueue_signals(int is_to_buffer, Process *rp, ErtsMessage *first, if (nmsig) { dest_queue->nmsigs.next = this; set_flags |= ERTS_PSFLG_NMSG_SIG_IN_Q; + ((ErtsNonMsgSignal *) first)->mlenoffs = dest_queue->mlenoffs; + dest_queue->mlenoffs = 0; } - } else { ErtsSignal *sig; @@ -744,6 +851,8 @@ enqueue_signals(int is_to_buffer, Process *rp, ErtsMessage *first, ASSERT(sig && !sig->common.specific.next); if (nmsig) { sig->common.specific.next = this; + ((ErtsNonMsgSignal *) first)->mlenoffs = dest_queue->mlenoffs; + dest_queue->mlenoffs = 0; } } @@ -773,7 +882,9 @@ enqueue_signals(int is_to_buffer, Process *rp, ErtsMessage *first, else ASSERT(dbg_count_nmsigs(first) == 0); - dest_queue->len += num_msgs; + dest_queue->mlenoffs += num_msgs; + + ERTS_HDBG_INQ_LEN(dest_queue); ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp, dest_queue); @@ -887,10 +998,6 @@ erts_ensure_dirty_proc_signals_handled(Process *proc, } } -static void -check_push_msgq_len_offs_marker(Process *rp, ErtsSignal *sig); - - static int proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, ErtsSignal *sig, int force_flush, int op) @@ -916,6 +1023,7 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, erts_port_lookup(from, ERTS_PORT_SFLGS_INVALID_LOOKUP) != NULL))); ASSERT(is_value(from) && is_internal_pid(pid)); + ASSERT(ERTS_SIG_IS_NON_MSG(sig)); if (is_normal_sched) { pend_sig = esdp->pending_signal.sig; @@ -975,6 +1083,7 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, } /* Prepend pending signal */ + sig->nm_sig.mlenoffs = 0; /* directly preceeded by pend_sig... */ pend_sig->common.next = (ErtsMessage*) sig; pend_sig->common.specific.next = &pend_sig->common.next; first = (ErtsMessage*) pend_sig; @@ -1015,7 +1124,7 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, last->next = NULL; - if (!force_flush && op != ERTS_SIG_Q_OP_PROCESS_INFO && + if (!force_flush && erts_proc_sig_queue_try_enqueue_to_buffer(from, rp, 0, first, &last->next, last_next, 0)) { if (!is_normal_sched) { @@ -1041,8 +1150,6 @@ proc_queue_signal(ErtsPTabElementCommon *sender, Eterm from, Eterm pid, state = enqueue_signals(0, rp, first, &last->next, last_next, 0, state, &rp->sig_inq); - if (ERTS_UNLIKELY(op == ERTS_SIG_Q_OP_PROCESS_INFO)) - check_push_msgq_len_offs_marker(rp, sig); res = !0; } @@ -1149,6 +1256,13 @@ erts_proc_sig_fetch__(Process *proc, const erts_aint32_t clear_flags = (ERTS_PSFLG_MSG_SIG_IN_Q | ERTS_PSFLG_NMSG_SIG_IN_Q); erts_aint32_t set_flags = 0; +#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN + int not_exiting = !ERTS_PROC_IS_EXITING(proc); + if (not_exiting) { + ERTS_HDBG_PRIVQ_LEN(proc); + ERTS_HDBG_INQ_LEN(&proc->sig_inq); + } +#endif if (buffers) proc_sig_queue_flush_buffers(proc, buffers); @@ -1170,20 +1284,38 @@ erts_proc_sig_fetch__(Process *proc, } else { + ASSERT(proc->sig_qs.mq_len >= 0); + ASSERT(proc->sig_qs.mlenoffs >= 0); + if (!proc->sig_inq.nmsigs.next) { ASSERT(!proc->sig_inq.nmsigs.last); + ASSERT(proc->sig_inq.mlenoffs > 0); if (!proc->sig_qs.cont && !ERTS_MSG_RECV_TRACED(proc)) { *proc->sig_qs.last = proc->sig_inq.first; proc->sig_qs.last = proc->sig_inq.last; + ASSERT(proc->sig_qs.mlenoffs == 0); + proc->sig_qs.mq_len += proc->sig_inq.mlenoffs; + erts_chk_sys_mon_long_msgq_on(proc); } else { *proc->sig_qs.cont_last = proc->sig_inq.first; proc->sig_qs.cont_last = proc->sig_inq.last; + proc->sig_qs.mlenoffs += proc->sig_inq.mlenoffs; set_flags = ERTS_PSFLG_SIG_Q; } } else { + ErtsNonMsgSignal *nmsig = + (ErtsNonMsgSignal *) *proc->sig_inq.nmsigs.next; + ASSERT(nmsig); + + ASSERT(proc->sig_inq.mlenoffs >= 0); + ASSERT(nmsig->mlenoffs >= 0); + + nmsig->mlenoffs += proc->sig_qs.mlenoffs; + proc->sig_qs.mlenoffs = proc->sig_inq.mlenoffs; + ASSERT(proc->sig_inq.nmsigs.last); if (!proc->sig_qs.nmsigs.last) { ASSERT(!proc->sig_qs.nmsigs.next); @@ -1216,11 +1348,9 @@ erts_proc_sig_fetch__(Process *proc, proc->sig_qs.cont_last = proc->sig_inq.last; } - proc->sig_qs.len += proc->sig_inq.len; - proc->sig_inq.first = NULL; proc->sig_inq.last = &proc->sig_inq.first; - proc->sig_inq.len = 0; + proc->sig_inq.mlenoffs = 0; } ASSERT((set_flags & clear_flags) == 0); @@ -1264,101 +1394,13 @@ erts_proc_sig_fetch__(Process *proc, unget_buffers_return: erts_proc_sig_queue_unget_buffers(buffers, need_unget_buffers); } -} - -Sint -erts_proc_sig_fetch_msgq_len_offs__(Process *proc, - ErtsSignalInQueueBufferArray *buffers, - int need_unget_buffers) -{ - ErtsProcSigMsgQLenOffsetMarker *marker; - Sint len; - - marker = (ErtsProcSigMsgQLenOffsetMarker *) proc->sig_inq.first; - ASSERT(marker); - ASSERT(marker->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK); - - proc->sig_qs.flags |= FS_DELAYED_PSIGQS_LEN; - - /* - * Prevent update of sig_qs.len in fetch. These - * updates are done via process-info signal(s) - * instead... - */ - len = proc->sig_inq.len; - marker->delayed_len += len; - marker->len_offset -= len; - proc->sig_inq.len = 0; - - /* - * Temporarily remove marker during fetch... - */ - - proc->sig_inq.first = marker->common.next; - if (proc->sig_inq.last == &marker->common.next) - proc->sig_inq.last = &proc->sig_inq.first; - if (proc->sig_inq.nmsigs.next == &marker->common.next) - proc->sig_inq.nmsigs.next = &proc->sig_inq.first; - if (proc->sig_inq.nmsigs.last == &marker->common.next) - proc->sig_inq.nmsigs.last = &proc->sig_inq.first; - - erts_proc_sig_fetch__(proc, buffers, need_unget_buffers); - - marker->common.next = NULL; - proc->sig_inq.first = (ErtsMessage *) marker; - proc->sig_inq.last = &marker->common.next; - - return marker->delayed_len; -} - -static ERTS_INLINE Sint -proc_sig_privqs_len(Process *c_p, int have_qlock) -{ - Sint res = c_p->sig_qs.len; - - ERTS_LC_ASSERT(!have_qlock - ? (ERTS_PROC_LOCK_MAIN - == erts_proc_lc_my_proc_locks(c_p)) - : ((ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_MAIN) - == ((ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_MAIN) - & erts_proc_lc_my_proc_locks(c_p)))); - - if (c_p->sig_qs.flags & FS_DELAYED_PSIGQS_LEN) { - ErtsProcSigMsgQLenOffsetMarker *marker; - - if (!have_qlock) - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); - - marker = (ErtsProcSigMsgQLenOffsetMarker *) c_p->sig_inq.first; - ASSERT(marker); - ASSERT(marker->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK); - - res += marker->delayed_len; - - if (!have_qlock) - erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); - } #ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN - { - Sint len = 0; - ERTS_FOREACH_SIG_PRIVQS( - c_p, mp, - { - if (ERTS_SIG_IS_MSG(mp)) - len++; - }); - ERTS_ASSERT(res == len); + if (not_exiting) { + ERTS_HDBG_PRIVQ_LEN(proc); } #endif - return res; -} - -Sint -erts_proc_sig_privqs_len(Process *c_p) -{ - return proc_sig_privqs_len(c_p, 0); } void @@ -1389,8 +1431,13 @@ get_external_non_msg_signal(ErtsMessage *sig) ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT); ASSERT(is_non_value(xsigd->reason)); - if (sig->hfrag.next == NULL) - return (ErtsDistExternal*)(xsigd + 1); + if (sig->hfrag.next == NULL) { + char *ptr = (char *) xsigd; + ptr += (ERTS_PROC_SIG_XTRA(((ErtsSignal *) sig)->common.tag) + ? sizeof(ErtsSeqTokenExitSignalData) + : sizeof(ErtsExitSignalData)); + return (ErtsDistExternal *) ptr; + } return erts_get_dist_ext(sig->hfrag.next); } @@ -1423,7 +1470,7 @@ send_gen_exit_signal(ErtsPTabElementCommon *sender, Eterm from_tag, ErlHeapFragment *hfrag; ErlOffHeap *ohp; Uint hsz, from_sz, reason_sz, ref_sz, token_sz, dist_ext_sz = 0; - int seq_trace; + int seq_trace, has_token; Process *c_p; #ifdef USE_VM_PROBES Eterm s_utag, utag; @@ -1441,13 +1488,6 @@ send_gen_exit_signal(ErtsPTabElementCommon *sender, Eterm from_tag, ASSERT(is_immed(from_tag)); - hsz = sizeof(ErtsExitSignalData)/sizeof(Eterm); - - seq_trace = c_p && have_seqtrace(token); - if (seq_trace) { - seq_trace_update_serial(c_p); - } - #ifdef USE_VM_PROBES utag_sz = 0; utag = NIL; @@ -1461,8 +1501,21 @@ send_gen_exit_signal(ErtsPTabElementCommon *sender, Eterm from_tag, hsz += utag_sz; #endif - token_sz = size_object(token); - hsz += token_sz; + has_token = !is_nil(token); + if (has_token) { + hsz = sizeof(ErtsSeqTokenExitSignalData)/sizeof(Eterm); + token_sz = size_object(token); + hsz += token_sz; + } + else { + hsz = sizeof(ErtsExitSignalData)/sizeof(Eterm); + token_sz = 0; + } + + seq_trace = c_p && have_seqtrace(token); + if (seq_trace) { + seq_trace_update_serial(c_p); + } from_sz = size_object(from); hsz += from_sz; @@ -1510,8 +1563,8 @@ send_gen_exit_signal(ErtsPTabElementCommon *sender, Eterm from_tag, ohp = &hfrag->off_heap; start_hp = hp; - s_token = copy_struct(token, token_sz, &hp, ohp); - s_from = copy_struct(from, from_sz, &hp, ohp); + s_token = is_immed(token) ? token : copy_struct(token, token_sz, &hp, ohp); + s_from = is_immed(from) ? from : copy_struct(from, from_sz, &hp, ohp); s_ref = copy_struct(ref, ref_sz, &hp, ohp); if (is_value(reason)) { @@ -1545,11 +1598,9 @@ send_gen_exit_signal(ErtsPTabElementCommon *sender, Eterm from_tag, : copy_struct(utag, utag_sz, &hp, ohp)); ERL_MESSAGE_DT_UTAG(mp) = s_utag; #endif - ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(op, ERTS_SIG_Q_TYPE_GEN_EXIT, - 0); - ERL_MESSAGE_TOKEN(mp) = s_token; + has_token); ERL_MESSAGE_FROM(mp) = from_tag; /* immediate... */ hfrag->used_size = hp - start_hp; @@ -1559,6 +1610,8 @@ send_gen_exit_signal(ErtsPTabElementCommon *sender, Eterm from_tag, xsigd->message = s_message; xsigd->from = s_from; xsigd->reason = s_reason; + if (has_token) + ((ErtsSeqTokenExitSignalData *) xsigd)->token = s_token; hfrag->next = dist_ext_hfrag; if (is_not_nil(s_ref)) { @@ -1574,7 +1627,9 @@ send_gen_exit_signal(ErtsPTabElementCommon *sender, Eterm from_tag, xsigd->u.link.connection_id = conn_id; } - hp += sizeof(ErtsExitSignalData)/sizeof(Eterm); + hp += (has_token + ? sizeof(ErtsSeqTokenExitSignalData)/sizeof(Eterm) + : sizeof(ErtsExitSignalData)/sizeof(Eterm)); if (dist_ext != NULL && dist_ext_hfrag == NULL && is_non_value(reason)) { erts_make_dist_ext_copy(dist_ext, (ErtsDistExternal *) hp); @@ -1633,12 +1688,16 @@ do_seq_trace_output(Eterm to, Eterm token, Eterm msg) static ERTS_INLINE int get_alias_msg_data(ErtsMessage *sig, Eterm *fromp, Eterm *aliasp, - Eterm *msgp, void **attachedp) + Eterm *msgp, void **attachedp, Eterm *tokenp) { - int type = ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag); + Eterm tag = ((ErtsSignal *) sig)->common.tag; + int type = ERTS_PROC_SIG_TYPE(tag); + int seq_token = (int) ERTS_PROC_SIG_XTRA(tag); Eterm *tp; + if (type == ERTS_SIG_Q_TYPE_DIST) { + ASSERT(sig->hfrag.alloc_size >= (seq_token ? 2 : 1)); if (fromp) *fromp = ERL_MESSAGE_FROM(sig); if (aliasp) @@ -1647,13 +1706,19 @@ get_alias_msg_data(ErtsMessage *sig, Eterm *fromp, Eterm *aliasp, *msgp = THE_NON_VALUE; if (attachedp) *attachedp = ERTS_MSG_COMBINED_HFRAG; + if (tokenp) + *tokenp = seq_token ? sig->hfrag.mem[1] : NIL; return type; } - ASSERT(is_tuple_arity(ERL_MESSAGE_FROM(sig), 3) - || is_tuple_arity(ERL_MESSAGE_FROM(sig), 5)); + ASSERT(is_tuple(ERL_MESSAGE_FROM(sig))); tp = tuple_val(ERL_MESSAGE_FROM(sig)); + + ASSERT(seq_token + ? (arityval(tp[0]) == 4 || arityval(tp[0]) == 6) + : (arityval(tp[0]) == 3 || arityval(tp[0]) == 5)); + if (fromp) *fromp = tp[1]; if (aliasp) @@ -1664,17 +1729,21 @@ get_alias_msg_data(ErtsMessage *sig, Eterm *fromp, Eterm *aliasp, if (!attachedp) return type; - if (is_tuple_arity(ERL_MESSAGE_FROM(sig), 3)) { + if (arityval(tp[0]) < 5) { + ASSERT(arityval(tp[0]) == (seq_token ? 4: 3)); if (type == ERTS_SIG_Q_TYPE_HEAP) *attachedp = NULL; else { ASSERT(type == ERTS_SIG_Q_TYPE_OFF_HEAP); *attachedp = ERTS_MSG_COMBINED_HFRAG; } + if (tokenp) + *tokenp = seq_token ? tp[4] : NIL; } else { Uint low, high; ASSERT(type == ERTS_SIG_Q_TYPE_HEAP_FRAG); + ASSERT(arityval(tp[0]) == (seq_token ? 6: 5)); /* * Heap fragment pointer in element 4 and 5. See * erts_proc_sig_send_to_alias(). @@ -1691,11 +1760,43 @@ get_alias_msg_data(ErtsMessage *sig, Eterm *fromp, Eterm *aliasp, *attachedp = (void *) ((((Uint) high) << 16) | ((Uint) low)); #endif ASSERT(*attachedp != NULL); + if (tokenp) + *tokenp = seq_token ? tp[6] : NIL; } return type; } +static ERTS_INLINE void +clear_seq_token_alias_msg(ErtsMessage *sig) +{ + Uint tag = ((ErtsSignal *) sig)->common.tag; + ASSERT(ERTS_SIG_Q_OP_ALIAS_MSG == ERTS_PROC_SIG_OP(tag)); + if (ERTS_PROC_SIG_XTRA(tag)) { + switch (ERTS_PROC_SIG_TYPE(tag)) { + case ERTS_SIG_Q_TYPE_DIST: + sig->hfrag.mem[1] = ERTS_CLEAR_SEQ_TOKEN_VALUE(sig); + break; + case ERTS_SIG_Q_TYPE_HEAP_FRAG: { + Eterm *tp = tuple_val(ERL_MESSAGE_FROM(sig)); + ASSERT(arityval(tp[0]) == 6); + tp[6] = ERTS_CLEAR_SEQ_TOKEN_VALUE(sig); + break; + } + case ERTS_SIG_Q_TYPE_HEAP: + case ERTS_SIG_Q_TYPE_OFF_HEAP: { + Eterm *tp = tuple_val(ERL_MESSAGE_FROM(sig)); + ASSERT(arityval(tp[0]) == 4); + tp[4] = ERTS_CLEAR_SEQ_TOKEN_VALUE(sig); + break; + } + default: + ASSERT(0); + break; + } + } +} + void erts_proc_sig_cleanup_non_msg_signal(ErtsMessage *sig) { @@ -1716,7 +1817,7 @@ erts_proc_sig_cleanup_non_msg_signal(ErtsMessage *sig) if (ERTS_SIG_IS_HEAP_FRAG_ALIAS_MSG_TAG(tag)) { /* Retrieve pointer to heap fragment (may not be NULL). */ void *attached; - (void) get_alias_msg_data(sig, NULL, NULL, NULL, &attached); + (void) get_alias_msg_data(sig, NULL, NULL, NULL, &attached, NULL); sig->data.heap_frag = hfrag = (ErlHeapFragment *) attached; ASSERT(hfrag); } @@ -1783,7 +1884,7 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to, Eterm msg, Eterm ErlOffHeap *ohp; Uint hsz, to_sz, token_sz, msg_sz; Eterm *hp, pid, to_copy, token_copy, msg_copy; - int type; + int type, has_token; #ifdef SHCOPY_SEND erts_shcopy_t info; #else @@ -1832,8 +1933,14 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to, Eterm msg, Eterm to_sz = size_object(to); hsz += to_sz; - token_sz = size_object(token); - hsz += token_sz; + has_token = !is_nil(token); + if (has_token) { + token_sz = size_object(token); + hsz += 1 /* extra element in from-tuple */ + token_sz; + } + else { + token_sz = 0; + } /* * SHCOPY corrupts the heap between copy_shared_calculate(), and @@ -1857,7 +1964,12 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to, Eterm msg, Eterm rp_state = erts_atomic32_read_nob(&rp->state); if (rp_state & ERTS_PSFLG_OFF_HEAP_MSGQ) { type = ERTS_SIG_Q_TYPE_OFF_HEAP; - hsz += 4; /* 3-tuple containing from, alias, and message */ + hsz += 4; /* + * 3-tuple containing from, alias, and message. + * If a non-nil token is passed this tuple will + * be increased to a 4-tuple. That extra element + * has already been accounted for above though... + */ mp = erts_alloc_message(hsz, &hp); ohp = &mp->hfrag.off_heap; hfrag = NULL; @@ -1867,9 +1979,13 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to, Eterm msg, Eterm hsz += 6; /* * 5-tuple containing from, alias, message, high part * of heap frag address, and low part of heap frag - * address. If we manage to allocate on the heap, we + * address. If a non-nil token is passed this tuple will + * be increased to a 6-tuple. That extra element + * has already been accounted for above though... + * + * If we manage to allocate on the heap, we * omit the heap frag address elements and use a - * 3-tuple instead. + * 3-tuple or 4-tuple instead. */ mp = erts_try_alloc_message_on_heap(rp, &rp_state, &rp_locks, hsz, &hp, &ohp, &on_heap); @@ -1911,18 +2027,23 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to, Eterm msg, Eterm #endif ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ALIAS_MSG, - type, 0); - ERL_MESSAGE_TOKEN(mp) = token_copy; + type, has_token); if (type != ERTS_SIG_Q_TYPE_HEAP_FRAG) { - /* 3-tuple containing from, alias, and message */ - ERL_MESSAGE_FROM(mp) = TUPLE3(hp, from, to_copy, msg_copy); + /* + * 3-tuple or 4-tuple containing from, alias, message, + * and perhaps a non-nil token. + */ + ERL_MESSAGE_FROM(mp) = (has_token + ? TUPLE4(hp, from, to_copy, msg_copy, + token_copy) + : TUPLE3(hp, from, to_copy, msg_copy)); } else { /* - * 5-tuple containing from, alias, and message, - * low halfword of heap frag address, and - * high halfword of heap frag address. + * 5-tuple or 6-tuple containing from, alias, and message, + * low halfword of heap frag address, high halfword of heap + * frag address, and perhaps a non-nil token. */ Uint low, high; Eterm hfrag_low, hfrag_high; @@ -1935,8 +2056,11 @@ erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to, Eterm msg, Eterm #endif hfrag_low = make_small(low); hfrag_high = make_small(high); - ERL_MESSAGE_FROM(mp) = TUPLE5(hp, from, to_copy, msg_copy, - hfrag_low, hfrag_high); + ERL_MESSAGE_FROM(mp) = (has_token + ? TUPLE6(hp, from, to_copy, msg_copy, + hfrag_low, hfrag_high, token_copy) + : TUPLE5(hp, from, to_copy, msg_copy, + hfrag_low, hfrag_high)); } if (!proc_queue_signal(&c_p->common, from, pid, (ErtsSignal *) mp, 0, @@ -1963,9 +2087,14 @@ erts_proc_sig_send_dist_to_alias(Eterm from, Eterm alias, ErlHeapFragment *hfrag, Eterm token) { ErtsMessage* mp; - Eterm token_copy; Eterm *hp; Eterm pid; + int has_token = !is_nil(token); + +#ifdef USE_VM_PROBES + if (token == am_have_dt_utag) + token = NIL; +#endif ASSERT(is_ref(alias)); pid = erts_get_pid_of_ref(alias); @@ -1978,49 +2107,47 @@ erts_proc_sig_send_dist_to_alias(Eterm from, Eterm alias, */ if (hfrag) { + Uint hsz = has_token ? 2 : 1; /* * Fragmented message. Data already allocated in heap fragment * including 'token' and 'to' ref. Only need room for the * 'alias' boxed pointer and a pointer to the heap fragment... */ - mp = erts_alloc_message(1, &hp); + mp = erts_alloc_message(hsz, &hp); ASSERT(mp->hfrag.alloc_size == 1); hp[0] = alias; + if (hsz == 2) + hp[1] = token; mp->hfrag.next = hfrag; - token_copy = token; } else { /* Un-fragmented message, allocate space for token and dist_ext in message. */ Uint dist_ext_sz = erts_dist_ext_size(edep) / sizeof(Eterm); Uint token_sz = is_immed(token) ? 0 : size_object(token); Uint alias_sz = size_object(alias); - Uint sz = 1 + alias_sz + token_sz + dist_ext_sz; + Uint usz = (has_token ? 2 : 1) + alias_sz + token_sz; + Uint asz = usz + dist_ext_sz; Eterm *aliasp; - mp = erts_alloc_message(sz, &hp); - ASSERT(mp->hfrag.alloc_size > 1); + mp = erts_alloc_message(asz, &hp); + ASSERT(mp->hfrag.alloc_size > 2); aliasp = hp++; + if (has_token) { + Eterm *tokenp = hp++; + *tokenp = (is_immed(token) + ? token + : copy_struct(token, token_sz, &hp, + &mp->hfrag.off_heap)); + } *aliasp = copy_struct(alias, alias_sz, &hp, &mp->hfrag.off_heap); - token_copy = (is_immed(token) - ? token - : copy_struct(token, token_sz, &hp, - &mp->hfrag.off_heap)); - mp->hfrag.used_size = 1 + alias_sz + token_sz; + mp->hfrag.used_size = usz; erts_make_dist_ext_copy(edep, erts_get_dist_ext(&mp->hfrag)); } ERL_MESSAGE_FROM(mp) = edep->dep->sysname; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; - if (token == am_have_dt_utag) - ERL_MESSAGE_TOKEN(mp) = NIL; - else -#endif - ERL_MESSAGE_TOKEN(mp) = token_copy; - ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ALIAS_MSG, ERTS_SIG_Q_TYPE_DIST, - 0); + has_token); if (!proc_queue_signal(NULL, from, pid, (ErtsSignal *) mp, 0, ERTS_SIG_Q_OP_ALIAS_MSG)) { @@ -2671,7 +2798,6 @@ erts_proc_sig_send_process_info_request(Process *c_p, Eterm to, int *item_ix, int len, - int need_msgq_len, int flags, Uint reserve_size, Eterm ref) @@ -2688,17 +2814,6 @@ erts_proc_sig_send_process_info_request(Process *c_p, pis->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_PROCESS_INFO, 0, 0); - if (!need_msgq_len) - pis->msgq_len_offset = ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE; - else { - pis->msgq_len_offset = ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC; - pis->marker.common.next = NULL; - pis->marker.common.specific.next = NULL; - pis->marker.common.tag = ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK; - pis->marker.refc = 0; - pis->marker.delayed_len = 0; - pis->marker.len_offset = 0; - } pis->requester = c_p->common.id; sys_memcpy((void *) &pis->oref_thing, (void *) internal_ref_val(ref), @@ -2793,7 +2908,7 @@ erts_proc_sig_send_dist_spawn_reply(Eterm node, ErlOffHeap *ohp; ErtsMessage *mp; Eterm ordered_from; - int force_flush; + int force_flush, has_token = !is_nil(token); ASSERT(is_atom(node)); @@ -2821,11 +2936,16 @@ erts_proc_sig_send_dist_spawn_reply(Eterm node, hsz += result_sz; } - token_sz = is_immed(token) ? 0 : size_object(token); - hsz += token_sz; - - hsz += sizeof(ErtsDistSpawnReplySigData)/sizeof(Eterm); - + if (has_token) { + token_sz = is_immed(token) ? 0 : size_object(token); + hsz += token_sz; + hsz += sizeof(ErtsDistSeqTokenSpawnReplySigData)/sizeof(Eterm); + } + else { + token_sz = 0; + hsz += sizeof(ErtsDistSpawnReplySigData)/sizeof(Eterm); + } + mp = erts_alloc_message(hsz, &hp); hp_start = hp; hfrag = &mp->hfrag; @@ -2859,12 +2979,13 @@ erts_proc_sig_send_dist_spawn_reply(Eterm node, datap->result = result_copy; datap->link = lnk; datap->patch_point = patch_point; + if (has_token) + ((ErtsDistSeqTokenSpawnReplySigData *) datap)->token = token_copy; ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_DIST_SPAWN_REPLY, ERTS_SIG_Q_TYPE_UNDEFINED, - 0); + has_token); ERL_MESSAGE_FROM(mp) = node; - ERL_MESSAGE_TOKEN(mp) = token_copy; /* * Sent from spawn-service at node, but we need to order this @@ -2991,7 +3112,6 @@ erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id) ERTS_SIG_Q_TYPE_CLA, 0); ERL_MESSAGE_FROM(sig) = c_p->common.id; - ERL_MESSAGE_TOKEN(sig) = am_undefined; #ifdef USE_VM_PROBES ERL_MESSAGE_DT_UTAG(sig) = NIL; #endif @@ -3038,7 +3158,7 @@ erts_proc_sig_send_move_msgq_off_heap(Eterm to) void erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm id) { - int force_flush_buffers = 0, enqueue_mq, fetch_sigs; + int force_flush_buffers = 0; ErtsSignal *sig; ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); @@ -3049,7 +3169,7 @@ erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm id) ASSERT(!(flags & ERTS_PROC_SIG_FLUSH_FLG_FROM_ID) || is_internal_pid(id) || is_internal_port(id)); - sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSignalCommon)); + sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsNonMsgSignal)); sig->common.next = NULL; sig->common.specific.attachment = NULL; sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_FLUSH, @@ -3064,51 +3184,18 @@ erts_proc_sig_init_flush_signals(Process *c_p, int flags, Eterm id) if (!proc_queue_signal(NULL, id, c_p->common.id, sig, force_flush_buffers, ERTS_SIG_Q_OP_FLUSH)) ERTS_INTERNAL_ERROR("Failed to send flush signal to ourselves"); - enqueue_mq = 0; - fetch_sigs = !0; - break; - case ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ: - enqueue_mq = !0; - fetch_sigs = 0; break; default: - enqueue_mq = !!(flags & ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ); - fetch_sigs = !0; break; } erts_set_gc_state(c_p, 0); - if (fetch_sigs) { - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); - erts_proc_sig_fetch(c_p); - erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); - } - - c_p->sig_qs.flags |= FS_FLUSHING_SIGS; - - if (enqueue_mq) { - if (!c_p->sig_qs.cont) { - c_p->sig_qs.flags |= FS_FLUSHED_SIGS; - erts_free(ERTS_ALC_T_SIG_DATA, sig); - } - else { - if (!c_p->sig_qs.nmsigs.last) { - ASSERT(!c_p->sig_qs.nmsigs.next); - c_p->sig_qs.nmsigs.next = c_p->sig_qs.cont_last; - } - else { - ErtsSignal *lsig = (ErtsSignal *) *c_p->sig_qs.nmsigs.last; - ASSERT(c_p->sig_qs.nmsigs.next); - ASSERT(lsig && !lsig->common.specific.next); - lsig->common.specific.next = c_p->sig_qs.cont_last; - } + erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(c_p); + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); - c_p->sig_qs.nmsigs.last = c_p->sig_qs.cont_last; - *c_p->sig_qs.cont_last = (ErtsMessage *) sig; - c_p->sig_qs.cont_last = &sig->common.next; - } - } + c_p->sig_qs.flags |= FS_FLUSHING_SIGS; } @@ -3278,7 +3365,7 @@ setup_tracing_state(Process *c_p, ErtsSigRecvTracing *tracing) } static ERTS_INLINE void -remove_iq_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig) +remove_innerq_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig) { /* * Remove signal from message queue (inner queue). @@ -3297,8 +3384,8 @@ remove_iq_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig) } static ERTS_INLINE void -remove_mq_sig(Process *c_p, ErtsMessage *sig, - ErtsMessage **next_sig, ErtsMessage ***next_nm_sig) +remove_middleq_sig(Process *c_p, ErtsMessage *sig, + ErtsMessage **next_sig, ErtsMessage ***next_nm_sig) { /* * Remove signal from (middle) signal queue. @@ -3323,37 +3410,66 @@ remove_nm_sig(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig) ASSERT(ERTS_SIG_IS_NON_MSG(sig)); ASSERT(*next_sig == sig); *next_nm_sig = ((ErtsSignal *) sig)->common.specific.next; - remove_mq_sig(c_p, sig, next_sig, next_nm_sig); + remove_middleq_sig(c_p, sig, next_sig, next_nm_sig); +} + +static ERTS_INLINE void +inc_converted_msgs_len(Process *c_p, + ErtsSigRecvTracing *tracing, + ErtsMessage ***next_nm_sig, + Sint len) +{ + ASSERT(len > 0); + if (!tracing->messages.active) { + /* The converted messages were moved into the message queue... */ + c_p->sig_qs.mq_len += len; + erts_chk_sys_mon_long_msgq_on(c_p); + } + else { + /* + * The converted messages will be moved into the message queue + * when traced... + */ + if (!*next_nm_sig) { + c_p->sig_qs.mlenoffs += len; + } + else { + ErtsNonMsgSignal *nmsig = (ErtsNonMsgSignal *) **next_nm_sig; + ASSERT(nmsig); + nmsig->mlenoffs += len; + } + } } static ERTS_INLINE void -convert_to_msg(Process *c_p, ErtsMessage *sig, ErtsMessage *msg, - ErtsMessage ***next_nm_sig) +convert_to_msg(Process *c_p, ErtsSigRecvTracing *tracing, ErtsMessage *sig, + ErtsMessage *msg, ErtsMessage ***next_nm_sig) { ErtsMessage **next_sig = *next_nm_sig; ASSERT(ERTS_SIG_IS_NON_MSG(sig)); *next_nm_sig = ((ErtsSignal *) sig)->common.specific.next; - c_p->sig_qs.len++; *next_sig = msg; - remove_mq_sig(c_p, sig, &msg->next, next_nm_sig); + remove_middleq_sig(c_p, sig, &msg->next, next_nm_sig); + inc_converted_msgs_len(c_p, tracing, next_nm_sig, 1); } static ERTS_INLINE void -convert_to_msgs(Process *c_p, ErtsMessage *sig, Uint no_msgs, - ErtsMessage *first_msg, ErtsMessage *last_msg, +convert_to_msgs(Process *c_p, ErtsSigRecvTracing *tracing, ErtsMessage *sig, + Uint no_msgs, ErtsMessage *first_msg, ErtsMessage *last_msg, ErtsMessage ***next_nm_sig) { ErtsMessage **next_sig = *next_nm_sig; ASSERT(ERTS_SIG_IS_NON_MSG(sig)); *next_nm_sig = ((ErtsSignal *) sig)->common.specific.next; - c_p->sig_qs.len += no_msgs; *next_sig = first_msg; - remove_mq_sig(c_p, sig, &last_msg->next, next_nm_sig); + remove_middleq_sig(c_p, sig, &last_msg->next, next_nm_sig); + inc_converted_msgs_len(c_p, tracing, next_nm_sig, no_msgs); } static ERTS_INLINE void -insert_messages(Process *c_p, ErtsMessage **next, ErtsMessage *first, - ErtsMessage *last, Uint no_msgs, ErtsMessage ***next_nm_sig) +insert_messages(Process *c_p, ErtsSigRecvTracing *tracing, ErtsMessage **next, + ErtsMessage *first, ErtsMessage *last, Sint no_msgs, + ErtsMessage ***next_nm_sig) { last->next = *next; if (c_p->sig_qs.cont_last == next) @@ -3362,31 +3478,25 @@ insert_messages(Process *c_p, ErtsMessage **next, ErtsMessage *first, *next_nm_sig = &last->next; if (c_p->sig_qs.nmsigs.last == next) c_p->sig_qs.nmsigs.last = &last->next; - c_p->sig_qs.len += no_msgs; + inc_converted_msgs_len(c_p, tracing, next_nm_sig, no_msgs); *next = first; } static ERTS_INLINE void -remove_mq_m_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig, ErtsMessage ***next_nm_sig) -{ - /* Removing message... */ - ASSERT(!ERTS_SIG_IS_NON_MSG(sig)); - c_p->sig_qs.len--; - remove_mq_sig(c_p, sig, next_sig, next_nm_sig); -} - -static ERTS_INLINE void -remove_iq_m_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig) +remove_innerq_m_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig) { /* Removing message... */ ASSERT(!ERTS_SIG_IS_NON_MSG(sig)); - c_p->sig_qs.len--; - remove_iq_sig(c_p, sig, next_sig); + c_p->sig_qs.mq_len--; + ASSERT(c_p->sig_qs.mq_len >= 0); + erts_chk_sys_mon_long_msgq_off(c_p); + remove_innerq_sig(c_p, sig, next_sig); } static ERTS_INLINE void -convert_prepared_sig_to_msg_attached(Process *c_p, ErtsMessage *sig, Eterm msg, - void *data_attached, +convert_prepared_sig_to_msg_attached(Process *c_p, ErtsSigRecvTracing *tracing, + ErtsMessage *sig, Eterm msg, + Eterm token, void *data_attached, ErtsMessage ***next_nm_sig) { /* @@ -3397,20 +3507,23 @@ convert_prepared_sig_to_msg_attached(Process *c_p, ErtsMessage *sig, Eterm msg, *next_nm_sig = ((ErtsSignal *) sig)->common.specific.next; sig->data.attached = data_attached; ERL_MESSAGE_TERM(sig) = msg; - c_p->sig_qs.len++; + ERL_MESSAGE_TOKEN(sig) = token; + inc_converted_msgs_len(c_p, tracing, next_nm_sig, 1); } static ERTS_INLINE void -convert_prepared_sig_to_msg(Process *c_p, ErtsMessage *sig, Eterm msg, - ErtsMessage ***next_nm_sig) +convert_prepared_sig_to_msg(Process *c_p, ErtsSigRecvTracing *tracing, + ErtsMessage *sig, Eterm msg, + Eterm token, ErtsMessage ***next_nm_sig) { - convert_prepared_sig_to_msg_attached(c_p, sig, msg, + convert_prepared_sig_to_msg_attached(c_p, tracing, sig, msg, token, ERTS_MSG_COMBINED_HFRAG, next_nm_sig); } static ERTS_INLINE void -convert_prepared_sig_to_external_msg(Process *c_p, ErtsMessage *sig, +convert_prepared_sig_to_external_msg(Process *c_p, ErtsSigRecvTracing *tracing, + ErtsMessage *sig, Eterm token, ErtsMessage ***next_nm_sig) { /* @@ -3421,7 +3534,8 @@ convert_prepared_sig_to_external_msg(Process *c_p, ErtsMessage *sig, *next_nm_sig = ((ErtsSignal *) sig)->common.specific.next; sig->data.attached = &sig->hfrag; ERL_MESSAGE_TERM(sig) = THE_NON_VALUE; - c_p->sig_qs.len++; + ERL_MESSAGE_TOKEN(sig) = token; + inc_converted_msgs_len(c_p, tracing, next_nm_sig, 1); } static ERTS_INLINE Eterm @@ -3592,7 +3706,7 @@ recv_marker_dequeue(Process *c_p, ErtsRecvMarker *markp) markp->set_save = 0; } else { - remove_iq_sig(c_p, sigp, markp->prev_next); + remove_innerq_sig(c_p, sigp, markp->prev_next); markp->in_sigq = markp->in_msgq = 0; ASSERT(!markp->set_save); #ifdef DEBUG @@ -3753,7 +3867,7 @@ recv_marker_reuse(Process *c_p, int *ixp) ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); - remove_iq_sig(c_p, sigp, markp->prev_next); + remove_innerq_sig(c_p, sigp, markp->prev_next); markp->in_sigq = markp->in_msgq = 0; #ifdef DEBUG markp->prev_next = NULL; @@ -3819,6 +3933,7 @@ recv_marker_insert(Process *c_p, ErtsRecvMarker *markp, int setting) markp->sig.common.specific.next = NULL; markp->sig.common.tag = ERTS_RECV_MARKER_TAG; + ERTS_HDBG_PRIVQ_LEN(c_p); markp->pass = 0; markp->set_save = 0; markp->in_sigq = 1; @@ -3863,7 +3978,10 @@ recv_marker_insert(Process *c_p, ErtsRecvMarker *markp, int setting) *c_p->sig_qs.cont_last = (ErtsMessage *) &markp->sig; c_p->sig_qs.cont_last = &markp->sig.common.next; + markp->sig.nm_sig.mlenoffs = c_p->sig_qs.mlenoffs; + c_p->sig_qs.mlenoffs = 0; } + ERTS_HDBG_PRIVQ_LEN(c_p); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); } @@ -4047,8 +4165,11 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing, if ((op != ERTS_SIG_Q_OP_EXIT || reason != am_kill) && (c_p->flags & F_TRAP_EXIT)) { - convert_prepared_sig_to_msg(c_p, sig, - xsigd->message, next_nm_sig); + Eterm token = (!ERTS_PROC_SIG_XTRA(tag) + ? NIL + : ((ErtsSeqTokenExitSignalData *) xsigd)->token); + convert_prepared_sig_to_msg(c_p, tracing, sig, xsigd->message, + token, next_nm_sig); conv_msg = sig; } else if (reason == am_normal @@ -4102,16 +4223,19 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing, } static ERTS_INLINE int -convert_prepared_down_message(Process *c_p, ErtsMessage *sig, - Eterm msg, ErtsMessage ***next_nm_sig) +convert_prepared_down_message(Process *c_p, ErtsSigRecvTracing *tracing, + ErtsMessage *sig, Eterm msg, + ErtsMessage ***next_nm_sig) { - convert_prepared_sig_to_msg(c_p, sig, msg, next_nm_sig); + convert_prepared_sig_to_msg(c_p, tracing, sig, msg, am_undefined, + next_nm_sig); erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN); return 1; } static int convert_to_down_message(Process *c_p, + ErtsSigRecvTracing *tracing, ErtsMessage *sig, ErtsMonitorData *mdp, ErtsMonitor **omon, @@ -4314,7 +4438,7 @@ convert_to_down_message(Process *c_p, ERL_MESSAGE_TOKEN(mp) = am_undefined; /* Replace original signal with the exit message... */ - convert_to_msg(c_p, sig, mp, next_nm_sig); + convert_to_msg(c_p, tracing, sig, mp, next_nm_sig); cnt += 4; @@ -4325,6 +4449,7 @@ convert_to_down_message(Process *c_p, static ERTS_INLINE int convert_to_nodedown_messages(Process *c_p, + ErtsSigRecvTracing *tracing, ErtsMessage *sig, ErtsMonitorData *mdp, ErtsMessage ***next_nm_sig) @@ -4372,15 +4497,16 @@ convert_to_nodedown_messages(Process *c_p, erts_proc_unlock(c_p, locks & ~ERTS_PROC_LOCK_MAIN); /* Replace signal with 'nodedown' messages */ - convert_to_msgs(c_p, sig, n, nd_first, nd_last, next_nm_sig); + convert_to_msgs(c_p, tracing, sig, n, nd_first, nd_last, next_nm_sig); erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN); } return cnt; } -static int +static int handle_nodedown(Process *c_p, + ErtsSigRecvTracing *tracing, ErtsMessage *sig, ErtsMonitorData *mdp, ErtsMessage ***next_nm_sig) @@ -4423,15 +4549,17 @@ handle_nodedown(Process *c_p, cnt += 3; } - return cnt + convert_to_nodedown_messages(c_p, sig, mdp, next_nm_sig); + return cnt + + convert_to_nodedown_messages(c_p, tracing, sig, mdp, next_nm_sig); } static void -handle_persistent_mon_msg(Process *c_p, Uint16 type, - ErtsMonitor *mon, ErtsMessage *sig, +handle_persistent_mon_msg(Process *c_p, ErtsSigRecvTracing *tracing, + Uint16 type, ErtsMonitor *mon, ErtsMessage *sig, Eterm msg, ErtsMessage ***next_nm_sig) { - convert_prepared_sig_to_msg(c_p, sig, msg, next_nm_sig); + convert_prepared_sig_to_msg(c_p, tracing, sig, msg, am_undefined, + next_nm_sig); switch (type) { @@ -4492,7 +4620,8 @@ handle_persistent_mon_msg(Process *c_p, Uint16 type, } if (locks != ERTS_PROC_LOCK_MAIN) erts_proc_unlock(c_p, locks & ~ERTS_PROC_LOCK_MAIN); - insert_messages(c_p, &sig->next, first, last, n, next_nm_sig); + insert_messages(c_p, tracing, &sig->next, first, last, + n, next_nm_sig); } break; } @@ -4556,137 +4685,6 @@ handle_group_leader(Process *c_p, ErtsSigGroupLeader *sgl) destroy_sig_group_leader(sgl); } -static void -check_push_msgq_len_offs_marker(Process *rp, ErtsSignal *sig) -{ - ErtsProcessInfoSig *pisig = (ErtsProcessInfoSig *) sig; - - ASSERT(ERTS_PROC_SIG_OP(sig->common.tag) == ERTS_SIG_Q_OP_PROCESS_INFO); - - if (pisig->msgq_len_offset == ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC) { - ErtsProcSigMsgQLenOffsetMarker *mrkr; - Sint len, msgq_len_offset; - ErtsMessage *first = rp->sig_inq.first; - ASSERT(first); - if (((ErtsSignal *) first)->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK) - mrkr = (ErtsProcSigMsgQLenOffsetMarker *) first; - else { - mrkr = &pisig->marker; - - ASSERT(mrkr->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK); - - mrkr->common.next = first; - ASSERT(rp->sig_inq.last != &rp->sig_inq.first); - if (rp->sig_inq.nmsigs.next == &rp->sig_inq.first) - rp->sig_inq.nmsigs.next = &mrkr->common.next; - if (rp->sig_inq.nmsigs.last == &rp->sig_inq.first) - rp->sig_inq.nmsigs.last = &mrkr->common.next; - rp->sig_inq.first = (ErtsMessage *) mrkr; - } - - len = rp->sig_inq.len; - msgq_len_offset = len - mrkr->len_offset; - - mrkr->len_offset = len; - mrkr->refc++; - - pisig->msgq_len_offset = msgq_len_offset; - -#ifdef DEBUG - /* save pointer to used marker... */ - pisig->marker.common.specific.attachment = (void *) mrkr; -#endif - - } -} - -static void -destroy_process_info_request(Process *c_p, ErtsProcessInfoSig *pisig) -{ - int dealloc_pisig = !0; - - if (pisig->msgq_len_offset != ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE) { - Sint refc; - int dealloc_marker = 0; - ErtsProcSigMsgQLenOffsetMarker *marker; -#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN - Sint delayed_len; -#endif - - ASSERT(pisig->msgq_len_offset >= 0); - - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); - marker = (ErtsProcSigMsgQLenOffsetMarker *) c_p->sig_inq.first; - ASSERT(marker); - ASSERT(marker->refc > 0); - ASSERT(pisig->marker.common.specific.attachment == (void *) marker); - - marker->delayed_len -= pisig->msgq_len_offset; -#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN - delayed_len = marker->delayed_len; -#endif - - refc = --marker->refc; - if (refc) { - if (marker == &pisig->marker) { - /* Another signal using our marker... */ - dealloc_pisig = 0; - } - } - else { - /* Marker unused; remove it... */ - ASSERT(marker->delayed_len + marker->len_offset == 0); -#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN - delayed_len += marker->len_offset; -#endif - if (marker != &pisig->marker) - dealloc_marker = !0; /* used another signals marker... */ - c_p->sig_inq.first = marker->common.next; - if (c_p->sig_inq.last == &marker->common.next) - c_p->sig_inq.last = &c_p->sig_inq.first; - if (c_p->sig_inq.nmsigs.next == &marker->common.next) - c_p->sig_inq.nmsigs.next = &c_p->sig_inq.first; - if (c_p->sig_inq.nmsigs.last == &marker->common.next) - c_p->sig_inq.nmsigs.last = &c_p->sig_inq.first; - } - erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); - - if (!refc) { - c_p->sig_qs.flags &= ~FS_DELAYED_PSIGQS_LEN; - /* Adjust msg len of inner+middle queue */ - ASSERT(marker->len_offset <= 0); - c_p->sig_qs.len -= marker->len_offset; - - ASSERT(c_p->sig_qs.len >= 0); - } - -#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN - { - Sint len = 0; - ERTS_FOREACH_SIG_PRIVQS( - c_p, mp, - { - if (ERTS_SIG_IS_MSG(mp)) - len++; - }); - ERTS_ASSERT(c_p->sig_qs.len + delayed_len == len); - } -#endif - - - if (dealloc_marker) { - ErtsProcessInfoSig *pisig2 - = (ErtsProcessInfoSig *) (((char *) marker) - - offsetof(ErtsProcessInfoSig, - marker)); - erts_free(ERTS_ALC_T_SIG_DATA, pisig2); - } - } - - if (dealloc_pisig) - erts_free(ERTS_ALC_T_SIG_DATA, pisig); -} - static int handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing, ErtsMessage *sig, ErtsMessage ***next_nm_sig, @@ -4698,24 +4696,11 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing, ASSERT(!!is_alive == !(erts_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_EXITING)); - - if (pisig->msgq_len_offset != ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE) { - /* - * Request requires message queue data to be updated - * before inspection... - */ - - ASSERT(pisig->msgq_len_offset >= 0); - /* - * Update sig_qs.len to reflect the length - * of the message queue... - */ - c_p->sig_qs.len += pisig->msgq_len_offset; - - if (is_alive) { + if (is_alive) { + if (pisig->flags & ERTS_PI_FLAG_NEED_MSGQ) { /* - * Move messages part of message queue into inner - * signal queue... + * Request requires message queue to be updated before inspection. + * Move messages part of message queue into inner signal queue... */ ASSERT(tracing); @@ -4738,21 +4723,8 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing, *next_nm_sig = &c_p->sig_qs.cont; *c_p->sig_qs.last = NULL; } - -#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN - { - Sint len; - ErtsMessage *mp; - for (mp = c_p->sig_qs.first, len = 0; mp; mp = mp->next) { - ERTS_ASSERT(ERTS_SIG_IS_MSG(mp)); - len++; - } - ERTS_ASSERT(c_p->sig_qs.len == len); - } -#endif } - } - if (is_alive) { + if (!pisig->common.specific.next) { /* * No more signals in middle queue... @@ -4817,7 +4789,7 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing, erts_proc_unlock(rp, locks); } - destroy_process_info_request(c_p, pisig); + erts_free(ERTS_ALC_T_SIG_DATA, pisig); if (reds > INT_MAX/8) reds = INT_MAX/8; @@ -5277,7 +5249,10 @@ handle_dist_spawn_reply(Process *c_p, ErtsSigRecvTracing *tracing, } if (convert_to_message) { - convert_prepared_sig_to_msg(c_p, sig, msg, next_nm_sig); + Eterm token = (!ERTS_PROC_SIG_XTRA(((ErtsSignal *) sig)->common.tag) + ? NIL + : ((ErtsDistSeqTokenSpawnReplySigData *) datap)->token); + convert_prepared_sig_to_msg(c_p, tracing, sig, msg, token, next_nm_sig); if (tag_hfrag) { /* Save heap fragment of tag in message... */ ASSERT(sig->data.attached == ERTS_MSG_COMBINED_HFRAG); @@ -5383,21 +5358,22 @@ handle_dist_spawn_reply_exiting(Process *c_p, } static int -handle_alias_message(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig) +handle_alias_message(Process *c_p, ErtsSigRecvTracing *tracing, + ErtsMessage *sig, ErtsMessage ***next_nm_sig) { void *data_attached; - Eterm from, alias, msg; + Eterm from, alias, msg, token; ErtsMonitor *mon; Uint16 flags; int type, cnt = 0; - type = get_alias_msg_data(sig, &from, &alias, &msg, &data_attached); + type = get_alias_msg_data(sig, &from, &alias, &msg, &data_attached, &token); ASSERT(is_internal_pid(from) || is_atom(from)); ASSERT(is_internal_pid_ref(alias)); ERL_MESSAGE_FROM(sig) = from; - + mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), alias); flags = mon ? mon->flags : (Uint16) 0; if (!(flags & ERTS_ML_STATE_ALIAS_MASK) @@ -5474,7 +5450,7 @@ handle_alias_message(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig) } if (type != ERTS_SIG_Q_TYPE_DIST) { - convert_prepared_sig_to_msg_attached(c_p, sig, msg, + convert_prepared_sig_to_msg_attached(c_p, tracing, sig, msg, token, data_attached, next_nm_sig); cnt++; } @@ -5485,8 +5461,9 @@ handle_alias_message(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig) * See erts_proc_sig_send_dist_to_alias() for info on * how the signal was constructed... */ - if (sig->hfrag.alloc_size > 1) { - convert_prepared_sig_to_external_msg(c_p, sig, next_nm_sig); + if (sig->hfrag.alloc_size > 2) { + convert_prepared_sig_to_external_msg(c_p, tracing, sig, token, + next_nm_sig); cnt++; } else { @@ -5499,16 +5476,17 @@ handle_alias_message(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig) (void *) &sig->m[0], ERL_MESSAGE_REF_ARRAY_SZ*sizeof(Eterm)); ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; + ERL_MESSAGE_TOKEN(mp) = token; ASSERT(sig->hfrag.next); mp->data.heap_frag = sig->hfrag.next; /* Replace original signal with the external message... */ - convert_to_msg(c_p, sig, mp, next_nm_sig); + convert_to_msg(c_p, tracing, sig, mp, next_nm_sig); ERL_MESSAGE_TERM(sig) = NIL; sig->data.attached = ERTS_MSG_COMBINED_HFRAG; sig->hfrag.next = NULL; - sig->next = NULL;; + sig->next = NULL; erts_cleanup_messages(sig); cnt += 8; } @@ -5567,6 +5545,8 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, return 0; } + ERTS_HDBG_PRIVQ_LEN(c_p); + next_nm_sig = &c_p->sig_qs.nmsigs.next; setup_tracing_state(c_p, &tracing); @@ -5591,6 +5571,14 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, yield = !0; break; /* tracing limit or end... */ } +#ifdef DEBUG + if (*next_nm_sig) { + ErtsNonMsgSignal *nm_sig = (ErtsNonMsgSignal *) **next_nm_sig; + ASSERT(nm_sig); + ASSERT(ERTS_SIG_IS_NON_MSG_TAG(nm_sig->tag)); + ASSERT(nm_sig->mlenoffs == 0); + } +#endif ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); } @@ -5601,8 +5589,11 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ASSERT(sig); ASSERT(ERTS_SIG_IS_NON_MSG(sig)); + ASSERT(((ErtsNonMsgSignal *) sig)->mlenoffs >= 0); - tag = ((ErtsSignal *) sig)->common.tag; + tag = ((ErtsNonMsgSignal *) sig)->tag; + c_p->sig_qs.mq_len += ((ErtsNonMsgSignal *) sig)->mlenoffs; + erts_chk_sys_mon_long_msgq_on(c_p); switch (ERTS_PROC_SIG_OP(tag)) { @@ -5642,7 +5633,8 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, mdp = erts_monitor_to_data(tmon); if (erts_monitor_is_in_table(&mdp->origin)) { omon = &mdp->origin; - cnt += convert_to_down_message(c_p, sig, mdp, &omon, + cnt += convert_to_down_message(c_p, &tracing, + sig, mdp, &omon, type, next_nm_sig); } break; @@ -5679,7 +5671,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, tmon = &mdp->u.target; } ASSERT(!(omon->flags & ERTS_ML_FLGS_SPAWN)); - cnt += convert_prepared_down_message(c_p, sig, + cnt += convert_prepared_down_message(c_p, &tracing, sig, xsigd->message, next_nm_sig); if (omon->flags & ERTS_ML_FLG_TAG) { @@ -5702,7 +5694,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, mdp = erts_monitor_to_data(tmon); if (erts_monitor_is_in_table(&mdp->origin)) { omon = &mdp->origin; - cnt += handle_nodedown(c_p, sig, mdp, next_nm_sig); + cnt += handle_nodedown(c_p, &tracing, sig, mdp, next_nm_sig); } break; case ERTS_MON_TYPE_SUSPEND: @@ -5777,7 +5769,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), key); if (mon) { ASSERT(erts_monitor_is_origin(mon)); - handle_persistent_mon_msg(c_p, type, mon, sig, + handle_persistent_mon_msg(c_p, &tracing, type, mon, sig, msg, next_nm_sig); if ((mon->flags & ERTS_ML_STATE_ALIAS_MASK) @@ -6127,7 +6119,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, case ERTS_SIG_Q_OP_ALIAS_MSG: { ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); - cnt += handle_alias_message(c_p, sig, next_nm_sig); + cnt += handle_alias_message(c_p, &tracing, sig, next_nm_sig); ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); break; } @@ -6251,13 +6243,22 @@ stop: { res = !c_p->sig_qs.cont; } else if (*next_nm_sig) { + ErtsSignal *next; /* * All messages prior to next non-message * signal should be moved to inner queue. * Next non-message signal to handle should * be first in middle queue. */ - ASSERT(**next_nm_sig); + next = (ErtsSignal *) **next_nm_sig; + + ASSERT(next); + ASSERT(ERTS_SIG_IS_NON_MSG(next)); + ASSERT(next->nm_sig.mlenoffs >= 0); + c_p->sig_qs.mq_len += next->nm_sig.mlenoffs; + next->nm_sig.mlenoffs = 0; + erts_chk_sys_mon_long_msgq_on(c_p); + if (*next_nm_sig != &c_p->sig_qs.cont) { if (ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.cont)) { ErtsRecvMarker *markp = (ErtsRecvMarker *) c_p->sig_qs.cont; @@ -6289,6 +6290,11 @@ stop: { ASSERT(!c_p->sig_qs.nmsigs.next); c_p->sig_qs.nmsigs.last = NULL; + ASSERT(c_p->sig_qs.mlenoffs >= 0); + c_p->sig_qs.mq_len += c_p->sig_qs.mlenoffs; + c_p->sig_qs.mlenoffs = 0; + erts_chk_sys_mon_long_msgq_on(c_p); + if (c_p->sig_qs.cont_last != &c_p->sig_qs.cont) { if (ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.cont)) { ErtsRecvMarker *markp = (ErtsRecvMarker *) c_p->sig_qs.cont; @@ -6346,6 +6352,12 @@ stop: { c_p->sig_qs.save); } + if (c_p->sig_qs.mq_len && (c_p->flags & F_HIBERNATED)) { + erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN); + } + + ERTS_HDBG_PRIVQ_LEN(c_p); + ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); *redsp = cnt/ERTS_SIG_REDS_CNT_FACTOR + 1; @@ -6660,36 +6672,22 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp, return !0; } -#ifdef USE_VM_PROBES -# define ERTS_CLEAR_SEQ_TOKEN(MP) \ - ERL_MESSAGE_TOKEN((MP)) = ((ERL_MESSAGE_DT_UTAG((MP)) != NIL) \ - ? am_have_dt_utag : NIL) -#else -# define ERTS_CLEAR_SEQ_TOKEN(MP) \ - ERL_MESSAGE_TOKEN((MP)) = NIL -#endif - static ERTS_INLINE void clear_seq_trace_token(ErtsMessage *sig) { if (ERTS_SIG_IS_MSG((ErtsSignal *) sig)) ERTS_CLEAR_SEQ_TOKEN(sig); else { - Uint tag; - Uint16 op, type; - - tag = ((ErtsSignal *) sig)->common.tag; - type = ERTS_PROC_SIG_TYPE(tag); - op = ERTS_PROC_SIG_OP(tag); + Uint tag = ((ErtsSignal *) sig)->common.tag; - switch (op) { + switch (ERTS_PROC_SIG_OP(tag)) { case ERTS_SIG_Q_OP_EXIT: case ERTS_SIG_Q_OP_EXIT_LINKED: case ERTS_SIG_Q_OP_MONITOR_DOWN: - switch (type) { + switch (ERTS_PROC_SIG_TYPE(tag)) { case ERTS_SIG_Q_TYPE_GEN_EXIT: - ERTS_CLEAR_SEQ_TOKEN(sig); + clear_seq_token_gen_exit(sig); break; case ERTS_LNK_TYPE_PORT: case ERTS_LNK_TYPE_PROC: @@ -6709,12 +6707,14 @@ clear_seq_trace_token(ErtsMessage *sig) } break; - case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: case ERTS_SIG_Q_OP_DIST_SPAWN_REPLY: + clear_seq_token_spawn_reply(sig); + break; case ERTS_SIG_Q_OP_ALIAS_MSG: - ERTS_CLEAR_SEQ_TOKEN(sig); + clear_seq_token_alias_msg(sig); break; + case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: case ERTS_SIG_Q_OP_MONITOR: case ERTS_SIG_Q_OP_DEMONITOR: case ERTS_SIG_Q_OP_LINK: @@ -6890,7 +6890,7 @@ erts_proc_sig_signal_size(ErtsSignal *sig) } case ERTS_SIG_Q_OP_FLUSH: - size = sizeof(ErtsSignalCommon); + size = sizeof(ErtsNonMsgSignal); break; case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: @@ -7088,7 +7088,7 @@ remove_yield_marker(Process *c_p, ErtsRecvMarker *mrkp) ASSERT(mrkp); ASSERT(mrkp->is_yield_mark); ASSERT(mrkp->in_msgq); - remove_iq_sig(c_p, (ErtsMessage *) mrkp, mrkp->prev_next); + remove_innerq_sig(c_p, (ErtsMessage *) mrkp, mrkp->prev_next); mrkp->in_msgq = 0; mrkp->in_sigq = 0; mrkp->prev_next = NULL; @@ -7161,7 +7161,7 @@ insert_adj_msgq_yield_markers(Process *c_p, yp->last.prev_next = next_sig; *next_nm_sig = ((ErtsSignal *) sig)->common.specific.next; *next_sig = (ErtsMessage *) &yp->last; - remove_mq_sig(c_p, sig, &yp->last.sig.common.next, next_nm_sig); + remove_middleq_sig(c_p, sig, &yp->last.sig.common.next, next_nm_sig); ERTS_SIG_DBG_RECV_MARK_SET_HANDLED(&yp->last); } @@ -7295,6 +7295,8 @@ send_cla_reply(Process *c_p, ErtsMessage *sig, Eterm to, else rp_locks = 0; + ERL_MESSAGE_TOKEN(sig) = am_undefined; + erts_queue_proc_message(c_p, rp, rp_locks, sig, reply_msg); } @@ -7776,7 +7778,6 @@ handle_message_enqueued_tracing(Process *c_p, Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; - Sint len = erts_proc_sig_privqs_len(c_p); Eterm seq_trace_token = ERL_MESSAGE_TOKEN(msg); if (seq_trace_token != NIL && is_tuple(seq_trace_token)) { @@ -7788,7 +7789,7 @@ handle_message_enqueued_tracing(Process *c_p, DTRACE6(message_queued, tracing->messages.receiver_name, size_object(ERL_MESSAGE_TERM(msg)), - len, /* This is NOT message queue len, but its something... */ + c_p->sig_qs.mq_len, tok_label, tok_lastcnt, tok_serial); } #endif @@ -7807,6 +7808,7 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, ErtsMessage ***next_nm_sig) { ErtsMessage **next_sig, *sig; + Sint *mlenoffsp; int cnt = 0, limit = ERTS_PROC_SIG_TRACE_COUNT_LIMIT; ASSERT(tracing->messages.next); @@ -7828,7 +7830,21 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, ASSERT(!sig || !ERTS_SIG_IS_RECV_MARKER(sig) || !((ErtsRecvMarker *) sig)->in_msgq); } - + + /* + * We keep 'mlenoffs' up to date for everyting we do, so that the signal queue + * is up to data all the time. By this we might switch over to not do + * receive tracing at any time without having to adjust the signal queue. + */ + if (!*next_nm_sig) { + mlenoffsp = &c_p->sig_qs.mlenoffs; + } + else { + ErtsNonMsgSignal *nmsig = (ErtsNonMsgSignal *) **next_nm_sig; + ASSERT(nmsig); + mlenoffsp = &nmsig->mlenoffs; + } + /* * Receive tracing active. Handle all messages * until next non-message signal... @@ -7839,12 +7855,14 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, tracing->messages.next = next_sig; return -1; /* Yield... */ } + ASSERT(*mlenoffsp > 0); + (*mlenoffsp)--; if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) { cnt += 50; /* Decode is expensive... */ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 0)) { /* Bad dist message; remove it... */ - remove_mq_m_sig(c_p, sig, next_sig, next_nm_sig); + remove_middleq_sig(c_p, sig, next_sig, next_nm_sig); sig->next = NULL; erts_cleanup_messages(sig); sig = *next_sig; @@ -7854,6 +7872,8 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, handle_message_enqueued_tracing(c_p, tracing, sig); cnt++; + c_p->sig_qs.mq_len++; + erts_chk_sys_mon_long_msgq_on(c_p); next_sig = &(*next_sig)->next; sig = *next_sig; } @@ -7966,7 +7986,7 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, ASSERT(*mpp == bad_mp); - remove_iq_m_sig(rp, mp, mpp); + remove_innerq_m_sig(rp, mp, mpp); mp = *mpp; @@ -7994,9 +8014,7 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, mp = mp->next; } - - ASSERT(info_on_self || c_p->sig_qs.len == i); - ASSERT(!info_on_self || c_p->sig_qs.len >= i); + ASSERT(c_p->sig_qs.mq_len == i); *msgq_len_p = i; @@ -8354,9 +8372,10 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, case ERTS_SIG_Q_OP_ALIAS_MSG: { void *attached; ErlHeapFragment *hfp; - (void) get_alias_msg_data(sig, NULL, NULL, NULL, &attached); + (void) get_alias_msg_data(sig, NULL, NULL, NULL, &attached, + NULL); if (!attached) - break; + break; /* on heap */ if (attached == ERTS_MSG_COMBINED_HFRAG) hfp = &sig->hfrag; else @@ -8401,7 +8420,6 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: case ERTS_SIG_Q_OP_PROCESS_INFO: case ERTS_SIG_Q_OP_RECV_MARK: - case ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK: case ERTS_SIG_Q_OP_FLUSH: break; @@ -8769,7 +8787,6 @@ erts_proc_sig_hdbg_check_in_queue(Process *p, struct ErtsSignalInQueue_ *buffer, NULL, nmsig_flag, msig_flag); - ERTS_ASSERT(p->sig_inq.len == len); (void)len; } #endif /* ERTS_PROC_SIG_HARD_DEBUG */ @@ -8962,16 +8979,23 @@ static void sig_inq_concat(ErtsSignalInQueue* q1, ErtsSignalInQueue* q2) { ErtsMessage** first_queue_last = q1->last; /* Second queue should not be empty */ + ERTS_HDBG_INQ_LEN(q1); + ERTS_HDBG_INQ_LEN(q2); ASSERT(q2->last != &q2->first); - if (NULL == q1->nmsigs.next) { - /* There is no non-message signals in q1 but maybe in q2 */ - if (q2->nmsigs.next != NULL) { + if (NULL == q2->nmsigs.next) { + q1->mlenoffs += q2->mlenoffs; + } + else { + ErtsNonMsgSignal *nmsig = (ErtsNonMsgSignal *) *q2->nmsigs.next; + ASSERT(nmsig); + if (NULL == q1->nmsigs.next) { /* There is non-message signals in q2 but not in q1 */ if (q2->nmsigs.next == &q2->first) { /* The first message in q2 is a non-message signal (The next pointer to the first non-message signal comes from the first queue) */ q1->nmsigs.next = first_queue_last; + ASSERT(nmsig->mlenoffs == 0); } else { /* Internal message in q2 is the first non-message signal */ q1->nmsigs.next = q2->nmsigs.next; @@ -8986,32 +9010,37 @@ static void sig_inq_concat(ErtsSignalInQueue* q1, ErtsSignalInQueue* q2) q1->nmsigs.last = q2->nmsigs.last; } } - } else if (NULL != q2->nmsigs.next) { - ErtsMessage** first_nmsig_in_q2; - /* We have non-message signals in both queues */ - if (q2->nmsigs.next == &q2->first) { - /* The first signal in q2 is a non-message signal */ - ErtsSignal *sig; - sig = (ErtsSignal *) *q1->nmsigs.last; - sig->common.specific.next = first_queue_last; - first_nmsig_in_q2 = first_queue_last; - } else { - /* The first signal in q2 is a message signal */ - ErtsSignal *sig; - sig = (ErtsSignal *) *q1->nmsigs.last; - sig->common.specific.next = q2->nmsigs.next; - first_nmsig_in_q2 = q2->nmsigs.next; - } - if (q2->nmsigs.last == &q2->first) { - /* Only one non-message signal in q2 */ - q1->nmsigs.last = first_nmsig_in_q2; - } else { - q1->nmsigs.last = q2->nmsigs.last; + else { + ErtsMessage** first_nmsig_in_q2; + ASSERT(nmsig); + /* We have non-message signals in both queues */ + if (q2->nmsigs.next == &q2->first) { + /* The first signal in q2 is a non-message signal */ + ErtsSignal *sig; + sig = (ErtsSignal *) *q1->nmsigs.last; + sig->common.specific.next = first_queue_last; + first_nmsig_in_q2 = first_queue_last; + ASSERT(nmsig->mlenoffs == 0); + } else { + /* The first signal in q2 is a message signal */ + ErtsSignal *sig; + sig = (ErtsSignal *) *q1->nmsigs.last; + sig->common.specific.next = q2->nmsigs.next; + first_nmsig_in_q2 = q2->nmsigs.next; + } + if (q2->nmsigs.last == &q2->first) { + /* Only one non-message signal in q2 */ + q1->nmsigs.last = first_nmsig_in_q2; + } else { + q1->nmsigs.last = q2->nmsigs.last; + } } + nmsig->mlenoffs += q1->mlenoffs; + q1->mlenoffs = q2->mlenoffs; } *q1->last = q2->first; q1->last = q2->last; - q1->len += q2->len; + ERTS_HDBG_INQ_LEN(q1); ASSERT((!q1->nmsigs.next && !q1->nmsigs.last) || (q1->nmsigs.next && q1->nmsigs.last)); } @@ -9041,7 +9070,7 @@ static Uint proc_sig_queue_flush_buffer(Process* proc, sig_inq_concat(&proc->sig_inq, &buf->b.queue); buf->b.queue.first = NULL; buf->b.queue.last = &buf->b.queue.first; - buf->b.queue.len = 0; + buf->b.queue.mlenoffs = 0; buf->b.queue.nmsigs.next = NULL; buf->b.queue.nmsigs.last = NULL; ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc, &buf->b.queue); @@ -9209,7 +9238,7 @@ void erts_proc_sig_queue_maybe_install_buffers(Process* p, erts_aint32_t state) ERTS_LOCK_FLAGS_CATEGORY_PROCESS); buffers->slots[i].b.queue.first = NULL; buffers->slots[i].b.queue.last = &buffers->slots[i].b.queue.first; - buffers->slots[i].b.queue.len = 0; + buffers->slots[i].b.queue.mlenoffs = 0; buffers->slots[i].b.queue.nmsigs.next = NULL; buffers->slots[i].b.queue.nmsigs.last = NULL; buffers->slots[i].b.nr_of_enqueues = 0; diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index 89155810995e..341a7a7319db 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -95,17 +95,76 @@ # define ERTS_PROC_SIG_HARD_DEBUG_SIGQ_BUFFERS #endif +#define ERTS_HDBG_PRIVQ_LEN__(P) \ + do { \ + Sint len = 0; \ + ErtsMessage *sig; \ + ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks((P)) \ + & ERTS_PROC_LOCK_MAIN); \ + for (sig = (P)->sig_qs.first; sig; sig = sig->next) { \ + if (ERTS_SIG_IS_MSG(sig)) \ + len++; \ + } \ + ERTS_ASSERT((P)->sig_qs.mq_len == len); \ + for (sig = (P)->sig_qs.cont, len = 0; sig; sig = sig->next) { \ + if (ERTS_SIG_IS_MSG(sig)) { \ + len++; \ + } \ + else { \ + ErtsNonMsgSignal *nmsig = (ErtsNonMsgSignal *) sig; \ + ERTS_ASSERT(nmsig->mlenoffs == len); \ + len = 0; \ + } \ + } \ + ERTS_ASSERT((P)->sig_qs.mlenoffs == len); \ + } while (0) +#define ERTS_HDBG_INQ_LEN__(Q) \ + do { \ + Sint len = 0; \ + ErtsMessage *sig; \ + for (sig = (Q)->first; sig; sig = sig->next) { \ + if (ERTS_SIG_IS_MSG(sig)) { \ + len++; \ + } \ + else { \ + ErtsNonMsgSignal *nmsig = (ErtsNonMsgSignal *) sig; \ + ERTS_ASSERT(nmsig->mlenoffs == len); \ + len = 0; \ + } \ + } \ + ERTS_ASSERT((Q)->mlenoffs == len); \ + } while (0) + +#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN +#define ERTS_HDBG_PRIVQ_LEN(P) ERTS_HDBG_PRIVQ_LEN__((P)) +#define ERTS_HDBG_INQ_LEN(Q) ERTS_HDBG_INQ_LEN((Q)) +#else +#define ERTS_HDBG_PRIVQ_LEN(P) +#define ERTS_HDBG_INQ_LEN(Q) +#endif + + struct erl_mesg; struct erl_dist_external; +#define ERTS_SIGNAL_COMMON_FIELDS__ \ + struct erl_mesg *next; \ + union { \ + struct erl_mesg **next; \ + void *attachment; \ + } specific; \ + Eterm tag + + typedef struct { - struct erl_mesg *next; - union { - struct erl_mesg **next; - void *attachment; - } specific; - Eterm tag; + ERTS_SIGNAL_COMMON_FIELDS__; } ErtsSignalCommon; + +typedef struct { + ERTS_SIGNAL_COMMON_FIELDS__; + Sint mlenoffs; /* Number of msg sigs preceeding the non-msg sig */ +} ErtsNonMsgSignal; + /* * Note that not all signal are handled using this functionality! */ @@ -193,7 +252,7 @@ typedef struct { #define ERTS_RECV_MARKER_PASS_MAX 4 typedef struct { - ErtsSignalCommon common; + ErtsNonMsgSignal common; Eterm from; Uint64 id; } ErtsSigUnlinkOp; @@ -299,17 +358,6 @@ int erts_proc_sig_queue_force_buffers(Process*); << ERTS_SIG_Q_XTRA_SHIFT), \ _TAG_HEADER_EXTERNAL_PID)) - -/* - * ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK is not an actual - * operation. We keep it at the top of the OP range, - * larger than ERTS_SIG_Q_OP_MAX. - */ -#define ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK ERTS_SIG_Q_OP_MASK - -#define ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK \ - ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK,0,0) - struct dist_entry_; #define ERTS_PROC_HAS_INCOMING_SIGNALS(P) \ @@ -864,14 +912,6 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, * * @param[in] len Length of info index array * - * @param[in] need_msgq_len Non-zero if message queue - * length is needed; otherwise, - * zero. If non-zero, sig_qs.len - * will be set to correspond - * to the message queue length - * before call to - * erts_process_info() - * * @param[in] flags Flags to pass to * erts_process_info() * @@ -889,7 +929,6 @@ erts_proc_sig_send_process_info_request(Process *c_p, Eterm to, int *item_ix, int len, - int need_msgq_len, int flags, Uint reserve_size, Eterm ref); @@ -1255,12 +1294,6 @@ erts_proc_sig_receive_helper(Process *c_p, int fcalls, int neg_o_reds, ErtsMessage **msgpp, int *get_outp); -/* - * CLEAN_SIGQ - Flush until middle queue is empty, i.e. - * the content of inner+middle queue equals - * the message queue. - */ -#define ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ (1 << 0) /* * FROM_ALL - Flush signals from all local senders (processes * and ports). @@ -1276,8 +1309,7 @@ erts_proc_sig_receive_helper(Process *c_p, int fcalls, * All erts_proc_sig_init_flush_signals() flags. */ #define ERTS_PROC_SIG_FLUSH_FLGS \ - (ERTS_PROC_SIG_FLUSH_FLG_CLEAN_SIGQ \ - | ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL \ + (ERTS_PROC_SIG_FLUSH_FLG_FROM_ALL \ | ERTS_PROC_SIG_FLUSH_FLG_FROM_ID) /** @@ -1330,19 +1362,46 @@ ERTS_GLB_INLINE Sint erts_proc_sig_fetch(Process *p); /** * - * @brief Get amount of messages in private queues + * @brief Get amount of signals in private queues * * @param[in] c_p Pointer to process struct of * currently executing process. * - * @returns Amount of message signals in + * @param[in] max_nmsigs Maximum amount of signals to + * traverse in order to calculate + * the result. If this limit is + * reached the operation is aborted + * and the current calculated + * result is returned as a negative + * amount. If a negative number + * is given, an unlimited amount + * of signals will be traversed in + * order to calculate the result. + * + * + * @param[in] max_nmsigs Maximum amount of non-message + * signals to traverse in order + * to calculate the result. If + * this limit is reached the + * operation is aborted and the + * current calculated result + * is returned as a negative + * amount. If a negative number + * is given, an unlimited amount + * of non-message signals will + * be traversed in order to + * calculate the result. + * + * @returns Amount of signals in * inner plus middle signal * queues after fetch completed - * (NOT the message queue - * length). + * (this is NOT the message queue + * length). Negative amount + * if the operation was aborted + * (see above). */ -Sint -erts_proc_sig_privqs_len(Process *c_p); +ERTS_GLB_INLINE Sint +erts_proc_sig_privqs_len(Process *c_p, Sint max_sigs, Sint max_nmsigs); /** * @brief Enqueue a sequence of signals on an in signal queue of @@ -1754,9 +1813,8 @@ extern Process *erts_dirty_process_signal_handler_max; void erts_proc_sig_fetch__(Process *proc, ErtsSignalInQueueBufferArray* buffers, int need_unget_buffers); -Sint erts_proc_sig_fetch_msgq_len_offs__(Process *proc, - ErtsSignalInQueueBufferArray* buffers, - int need_unget_buffers); +ERTS_GLB_INLINE void erts_chk_sys_mon_long_msgq_on(Process *proc); +ERTS_GLB_INLINE void erts_chk_sys_mon_long_msgq_off(Process *proc); ERTS_GLB_INLINE int erts_msgq_eq_recv_mark_id__(Eterm term1, Eterm term2); ERTS_GLB_INLINE void erts_msgq_recv_marker_set_save__(Process *c_p, ErtsRecvMarkerBlock *blkp, @@ -1843,11 +1901,32 @@ erts_proc_sig_queue_unget_buffers(ErtsSignalInQueueBufferArray* buffers, } } +ERTS_GLB_INLINE void +erts_chk_sys_mon_long_msgq_on(Process *proc) +{ + if (((proc->sig_qs.flags & (FS_MON_MSGQ_LEN|FS_MON_MSGQ_LEN_LONG)) + == FS_MON_MSGQ_LEN) + && proc->sig_qs.mq_len >= erts_system_monitor_long_msgq_on) { + proc->sig_qs.flags |= FS_MON_MSGQ_LEN_LONG; + monitor_generic(proc, am_long_message_queue, am_true); + } +} + +ERTS_GLB_INLINE void +erts_chk_sys_mon_long_msgq_off(Process *proc) +{ + if (((proc->sig_qs.flags & (FS_MON_MSGQ_LEN|FS_MON_MSGQ_LEN_LONG)) + == (FS_MON_MSGQ_LEN|FS_MON_MSGQ_LEN_LONG)) + && proc->sig_qs.mq_len <= erts_system_monitor_long_msgq_off) { + proc->sig_qs.flags &= ~FS_MON_MSGQ_LEN_LONG; + monitor_generic(proc, am_long_message_queue, am_false); + } +} + ERTS_GLB_INLINE Sint erts_proc_sig_fetch(Process *proc) { - Sint res = 0; - ErtsSignal *sig; + Sint res; ErtsSignalInQueueBufferArray* buffers; int need_unget_buffers; ERTS_LC_ASSERT((erts_proc_lc_my_proc_locks(proc) @@ -1869,38 +1948,43 @@ erts_proc_sig_fetch(Process *proc) buffers = erts_proc_sig_queue_get_buffers(proc, &need_unget_buffers); - sig = (ErtsSignal *) proc->sig_inq.first; - - if (!sig) { - if (buffers) - goto fetch; - } - else if (ERTS_UNLIKELY(sig->common.tag - == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK)) { - res = erts_proc_sig_fetch_msgq_len_offs__(proc, buffers, - need_unget_buffers); - } - else { - fetch: + if (buffers || proc->sig_inq.first) erts_proc_sig_fetch__(proc, buffers, need_unget_buffers); - } - - res += proc->sig_qs.len; + res = proc->sig_qs.mq_len; ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0); -#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN - { - Sint len = 0; - ERTS_FOREACH_SIG_PRIVQS( - proc, mp, - { - if (ERTS_SIG_IS_MSG(mp)) - len++; - }); - ERTS_ASSERT(res == len); + return res; +} + +ERTS_GLB_INLINE Sint +erts_proc_sig_privqs_len(Process *c_p, Sint max_sigs, Sint max_nmsigs) +{ + Sint res = c_p->sig_qs.mq_len; + int no_nmsigs = 0; + ErtsMessage **nmsigpp; + + ERTS_HDBG_PRIVQ_LEN(c_p); + + if (max_sigs < 0) + max_sigs = ERTS_SWORD_MAX; /* Check all... */ + if (res > max_sigs) + return -res; + + nmsigpp = c_p->sig_qs.nmsigs.next; + if (max_nmsigs < 0) + max_nmsigs = ERTS_SWORD_MAX; /* Check all... */ + res += c_p->sig_qs.mlenoffs; + while (nmsigpp) { + ErtsNonMsgSignal *nmsigp = (ErtsNonMsgSignal *) *nmsigpp; + ASSERT(nmsigp); + res += nmsigp->mlenoffs + 1; + if (res > max_sigs) + return -res; + if (++no_nmsigs > max_nmsigs) + return -res; /* Abort; to many non-message signals... */ + nmsigpp = nmsigp->specific.next; } -#endif return res; } @@ -2140,7 +2224,9 @@ erts_msgq_unlink_msg(Process *c_p, ErtsMessage *msgp) ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__(c_p, 0, "before"); *c_p->sig_qs.save = sigp; - c_p->sig_qs.len--; + c_p->sig_qs.mq_len--; + ASSERT(c_p->sig_qs.mq_len >= 0); + erts_chk_sys_mon_long_msgq_off(c_p); if (sigp && ERTS_SIG_IS_RECV_MARKER(sigp)) { ErtsMessage **sigpp = c_p->sig_qs.save; ((ErtsRecvMarker *) sigp)->prev_next = sigpp; @@ -2179,7 +2265,9 @@ erts_msgq_unlink_msg_set_save_first(Process *c_p, ErtsMessage *msgp) ASSERT(!(c_p->sig_qs.flags & FS_HANDLING_SIGS)); ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__(c_p, 0, "before"); *c_p->sig_qs.save = sigp; - c_p->sig_qs.len--; + c_p->sig_qs.mq_len--; + ASSERT(c_p->sig_qs.mq_len >= 0); + erts_chk_sys_mon_long_msgq_off(c_p); if (!sigp) c_p->sig_qs.last = c_p->sig_qs.save; else if (ERTS_SIG_IS_RECV_MARKER(sigp)) diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index d6cd7b159641..d85f5e463eed 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -428,6 +428,8 @@ Eterm ERTS_WRITE_UNLIKELY(erts_system_monitor); Eterm ERTS_WRITE_UNLIKELY(erts_system_monitor_long_gc); Uint ERTS_WRITE_UNLIKELY(erts_system_monitor_long_schedule); Eterm ERTS_WRITE_UNLIKELY(erts_system_monitor_large_heap); +Sint ERTS_WRITE_UNLIKELY(erts_system_monitor_long_msgq_on); +Sint ERTS_WRITE_UNLIKELY(erts_system_monitor_long_msgq_off); struct erts_system_monitor_flags_t erts_system_monitor_flags; /* system performance monitor */ @@ -9562,7 +9564,8 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) state = erts_atomic32_read_nob(&p->state); if ((state & ERTS_PSFLG_MSG_SIG_IN_Q) - && ERTS_MSG_RECV_TRACED(p) + && ((p->sig_qs.flags & FS_MON_MSGQ_LEN) + || ERTS_MSG_RECV_TRACED(p)) && !(p->sig_qs.flags & FS_FLUSHING_SIGS)) { if (!(state & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_ACTIVE_SYS))) { goto sched_out_fetch_signals; @@ -10005,7 +10008,8 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) erts_runq_unlock(rq); erts_proc_lock(p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_MSGQ)); - if (ERTS_MSG_RECV_TRACED(p) + if (((p->sig_qs.flags & FS_MON_MSGQ_LEN) + || ERTS_MSG_RECV_TRACED(p)) && !(p->sig_qs.flags & FS_FLUSHING_SIGS)) { erts_proc_sig_fetch(p); } @@ -10043,7 +10047,16 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) erts_proc_lock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); - state = erts_atomic32_read_nob(&p->state); + if (erts_system_monitor_long_msgq_off < 0) { + if (p->sig_qs.flags & FS_MON_MSGQ_LEN) + p->sig_qs.flags &= ~(FS_MON_MSGQ_LEN|FS_MON_MSGQ_LEN_LONG); + } + else { + if (!(p->sig_qs.flags & FS_MON_MSGQ_LEN)) + p->sig_qs.flags |= FS_MON_MSGQ_LEN; + } + + state = erts_atomic32_read_nob(&p->state); if (erts_sched_stat.enabled) { int prio; @@ -12482,13 +12495,14 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->sig_qs.cont_last = &p->sig_qs.cont; p->sig_qs.save = &p->sig_qs.first; p->sig_qs.recv_mrk_blk = NULL; - p->sig_qs.len = 0; + p->sig_qs.mq_len = 0; + p->sig_qs.mlenoffs = 0; p->sig_qs.nmsigs.next = NULL; p->sig_qs.nmsigs.last = NULL; p->sig_inq_contention_counter = 0; p->sig_inq.first = NULL; p->sig_inq.last = &p->sig_inq.first; - p->sig_inq.len = 0; + p->sig_inq.mlenoffs = 0; p->sig_inq.nmsigs.next = NULL; p->sig_inq.nmsigs.last = NULL; ASSERT(erts_atomic_read_nob(&p->sig_inq_buffers) == (erts_aint_t)NULL); @@ -12999,13 +13013,14 @@ void erts_init_empty_process(Process *p) p->sig_qs.cont_last = &p->sig_qs.cont; p->sig_qs.save = &p->sig_qs.first; p->sig_qs.recv_mrk_blk = NULL; - p->sig_qs.len = 0; + p->sig_qs.mq_len = 0; + p->sig_qs.mlenoffs = 0; p->sig_qs.nmsigs.next = NULL; p->sig_qs.nmsigs.last = NULL; p->sig_inq_contention_counter = 0; p->sig_inq.first = NULL; p->sig_inq.last = &p->sig_inq.first; - p->sig_inq.len = 0; + p->sig_inq.mlenoffs = 0; p->sig_inq.nmsigs.next = NULL; p->sig_inq.nmsigs.last = NULL; erts_atomic_init_nob(&p->sig_inq_buffers, (erts_aint_t)NULL); @@ -13085,7 +13100,8 @@ erts_debug_verify_clean_empty_process(Process* p) ASSERT(ERTS_P_LT_MONITORS(p) == NULL); ASSERT(ERTS_P_LINKS(p) == NULL); ASSERT(p->sig_qs.first == NULL); - ASSERT(p->sig_qs.len == 0); + ASSERT(p->sig_qs.mq_len == 0); + ASSERT(p->sig_qs.mlenoffs == 0); ASSERT(p->bif_timers == NULL); ASSERT(p->dictionary == NULL); ASSERT(p->catches == 0); @@ -13095,7 +13111,7 @@ erts_debug_verify_clean_empty_process(Process* p) ASSERT(p->parent == am_undefined); ASSERT(p->sig_inq.first == NULL); - ASSERT(p->sig_inq.len == 0); + ASSERT(p->sig_inq.mlenoffs == 0); /* Thing that erts_cleanup_empty_process() cleans up */ diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index b25129cf1909..8caa60c6c433 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1533,6 +1533,8 @@ extern Eterm ERTS_WRITE_UNLIKELY(erts_system_monitor); extern Uint ERTS_WRITE_UNLIKELY(erts_system_monitor_long_gc); extern Uint ERTS_WRITE_UNLIKELY(erts_system_monitor_long_schedule); extern Uint ERTS_WRITE_UNLIKELY(erts_system_monitor_large_heap); +extern Sint ERTS_WRITE_UNLIKELY(erts_system_monitor_long_msgq_on); +extern Sint ERTS_WRITE_UNLIKELY(erts_system_monitor_long_msgq_off); struct erts_system_monitor_flags_t { unsigned int busy_port : 1; unsigned int busy_dist_port : 1; @@ -1589,12 +1591,14 @@ extern int erts_system_profile_ts_type; #define FS_UNUSED (1 << 3) /* Unused */ #define FS_HANDLING_SIGS (1 << 4) /* Process is handling signals */ #define FS_WAIT_HANDLE_SIGS (1 << 5) /* Process is waiting to handle signals */ -#define FS_DELAYED_PSIGQS_LEN (1 << 6) /* Delayed update of sig_qs.len */ +#define FS_UNUSED2 (1 << 6) /* Unused */ #define FS_FLUSHING_SIGS (1 << 7) /* Currently flushing signals */ #define FS_FLUSHED_SIGS (1 << 8) /* Flushing of signals completed */ #define FS_NON_FETCH_CNT1 (1 << 9) /* First bit of non-fetch signals counter */ #define FS_NON_FETCH_CNT2 (1 << 10)/* Second bit of non-fetch signals counter */ #define FS_NON_FETCH_CNT4 (1 << 11)/* Third bit of non-fetch signals counter */ +#define FS_MON_MSGQ_LEN (1 << 12) /* Monitor of msgq len enabled */ +#define FS_MON_MSGQ_LEN_LONG (1 << 13)/* --"-- and it is currently long */ #define FS_NON_FETCH_CNT_MASK \ (FS_NON_FETCH_CNT1|FS_NON_FETCH_CNT2|FS_NON_FETCH_CNT4) @@ -2033,6 +2037,14 @@ void erts_print_scheduler_info(fmtfn_t to, void *to_arg, ErtsSchedulerData *esdp void erts_print_run_queue_info(fmtfn_t, void *to_arg, ErtsRunQueue*); void erts_dump_extended_process_state(fmtfn_t to, void *to_arg, erts_aint32_t psflg); void erts_dump_process_state(fmtfn_t to, void *to_arg, erts_aint32_t psflg); + +#define ERTS_PI_FLAG_SINGELTON (1 << 0) +#define ERTS_PI_FLAG_ALWAYS_WRAP (1 << 1) +#define ERTS_PI_FLAG_WANT_MSGS (1 << 2) +#define ERTS_PI_FLAG_NEED_MSGQ (1 << 3) +#define ERTS_PI_FLAG_FORCE_SIG_SEND (1 << 4) +#define ERTS_PI_FLAG_REQUEST_FOR_OTHER (1 << 5) + Eterm erts_process_info(Process *c_p, ErtsHeapFactory *hfact, Process *rp, ErtsProcLocks rp_locks, int *item_ix, int item_ix_len, diff --git a/erts/emulator/beam/jit/beam_jit_common.cpp b/erts/emulator/beam/jit/beam_jit_common.cpp index 8e78e2cf1656..b0888318dd22 100644 --- a/erts/emulator/beam/jit/beam_jit_common.cpp +++ b/erts/emulator/beam/jit/beam_jit_common.cpp @@ -1196,7 +1196,6 @@ Sint32 beam_jit_remove_message(Process *c_p, Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; - Sint len = erts_proc_sig_privqs_len(c_p); dtrace_proc_str(c_p, receiver_name); token2 = SEQ_TRACE_TOKEN(c_p); @@ -1208,7 +1207,7 @@ Sint32 beam_jit_remove_message(Process *c_p, DTRACE6(message_receive, receiver_name, size_object(ERL_MESSAGE_TERM(msgp)), - len, /* This is NOT message queue len, but its something... */ + c_p->sig_qs.mq_len, tok_label, tok_lastcnt, tok_serial); diff --git a/erts/emulator/test/trace_SUITE.erl b/erts/emulator/test/trace_SUITE.erl index 37f47cff3068..8e6f18c4d695 100644 --- a/erts/emulator/test/trace_SUITE.erl +++ b/erts/emulator/test/trace_SUITE.erl @@ -37,7 +37,8 @@ system_monitor_args/1, more_system_monitor_args/1, system_monitor_long_gc_1/1, system_monitor_long_gc_2/1, system_monitor_large_heap_1/1, system_monitor_large_heap_2/1, - system_monitor_long_schedule/1, + system_monitor_long_schedule/1, system_monitor_long_message_queue/1, + system_monitor_long_message_queue_ignore/1, bad_flag/1, trace_delivered/1, trap_exit_self_receive/1, trace_info_badarg/1, erl_704/1, ms_excessive_nesting/1]). @@ -62,7 +63,9 @@ all() -> more_system_monitor_args, system_monitor_long_gc_1, system_monitor_long_gc_2, system_monitor_large_heap_1, system_monitor_long_schedule, - system_monitor_large_heap_2, bad_flag, trace_delivered, + system_monitor_large_heap_2, system_monitor_long_message_queue, + system_monitor_long_message_queue_ignore, + bad_flag, trace_delivered, trap_exit_self_receive, trace_info_badarg, erl_704, ms_excessive_nesting]. @@ -849,13 +852,16 @@ system_monitor_args(Config) when is_list(Config) -> end, {Self,[{large_heap,MinN}]} = erlang:system_monitor(), {Self,[{large_heap,MinN}]} = - erlang:system_monitor(Self, [busy_port]), + erlang:system_monitor(Self,[{long_message_queue, {100,101}}]), + {Self,[{long_message_queue,{100,101}}]} = erlang:system_monitor(), + {Self,[{long_message_queue,{100,101}}]} = + erlang:system_monitor(Self, [busy_port]), {Self,[busy_port]} = erlang:system_monitor(), {Self,[busy_port]} = erlang:system_monitor({Self,[busy_dist_port]}), {Self,[busy_dist_port]} = erlang:system_monitor(), All = lists:sort([busy_port,busy_dist_port, - {long_gc,1},{large_heap,65535}]), + {long_gc,1},{large_heap,65535},{long_message_queue,{99,100}}]), {Self,[busy_dist_port]} = erlang:system_monitor(Self, All), {Self,A1} = erlang:system_monitor(), All = lists:sort(A1), @@ -889,6 +895,14 @@ system_monitor_args(Config) when is_list(Config) -> (catch erlang:system_monitor(Self,[{large_heap,-1}])), {'EXIT',{badarg,_}} = (catch erlang:system_monitor({Self,[{large_heap,atom}]})), + {'EXIT',{badarg,_}} = + (catch erlang:system_monitor(Self,[{long_message_queue, {100,100}}])), + {'EXIT',{badarg,_}} = + (catch erlang:system_monitor(Self,[{long_message_queue, {-1,1}}])), + {'EXIT',{badarg,_}} = + (catch erlang:system_monitor(Self,[{long_message_queue, {0,-1}}])), + {'EXIT',{badarg,_}} = + (catch erlang:system_monitor(Self,[{long_message_queue, {-1,0}}])), ok. @@ -1169,6 +1183,93 @@ large_heap_check(Pid, Size, Result) -> Result end. + +system_monitor_long_message_queue(Config) when is_list(Config) -> + Self = self(), + SMonPrxy = spawn_link(fun () -> smon_lmq_proxy(Self) end), + erlang:system_monitor(SMonPrxy,[{long_message_queue, {50,100}}]), + erlang:yield(), + lists:foreach(fun (_) -> self() ! hello end, lists:seq(1, 100)), + receive {monitor,Self,long_message_queue,true} -> ok + after 5000 -> ct:fail(missing_on_message) + end, + + lists:foreach(fun (_) -> self() ! hello end, lists:seq(1, 10)), + receive {monitor,Self,long_message_queue,_} = Msg0 -> ct:fail({unexpected_message, Msg0}) + after 1000 -> ok + end, + + lists:foreach(fun (_) -> receive hello -> ok end end, lists:seq(1, 50)), + receive {monitor,Self,long_message_queue,_} = Msg1 -> ct:fail({unexpected_message, Msg1}) + after 1000 -> ok + end, + + lists:foreach(fun (_) -> self() ! hello end, lists:seq(1, 50)), + receive {monitor,Self,long_message_queue,_} = Msg2 -> ct:fail({unexpected_message, Msg2}) + after 1000 -> ok + end, + + lists:foreach(fun (_) -> receive hello -> ok end end, lists:seq(1, 60)), + receive {monitor,Self,long_message_queue,false} -> ok + after 5000 -> ct:fail(missing_off_message) + end, + + lists:foreach(fun (_) -> self() ! hello end, lists:seq(1, 100)), + receive {monitor,Self,long_message_queue,true} -> ok + after 5000 -> ct:fail(missing_on_message) + end, + + lists:foreach(fun (_) -> receive hello -> ok end end, lists:seq(1, 100)), + receive {monitor,Self,long_message_queue,false} -> ok + after 5000 -> ct:fail(missing_off_message) + end, + + lists:foreach(fun (_) -> receive hello -> ok end end, lists:seq(1, 50)), + {message_queue_len, 0} = process_info(self(), message_queue_len), + + unlink(SMonPrxy), + exit(SMonPrxy, kill), + false = is_process_alive(SMonPrxy), + + erlang:system_monitor(undefined), + ok. + +smon_lmq_proxy(To) -> + receive Msg -> To ! Msg end, + smon_lmq_proxy(To). + +system_monitor_long_message_queue_ignore(Config) when is_list(Config) -> + %% + %% Ensure that messages are delivered and monitored even if a + %% process ignores the message queue while continuesly executing. + %% + erlang:system_monitor(self(),[{long_message_queue, {50,100}}]), + Pid = spawn_opt(fun ignore_messages_working/0, [{priority,low}, link]), + lists:foreach(fun (_) -> Pid ! hello end, lists:seq(1, 50)), + receive {monitor,Pid,long_message_queue,_} = Msg0 -> ct:fail({unexpected_message, Msg0}) + after 1000 -> ok + end, + + lists:foreach(fun (_) -> Pid ! hello end, lists:seq(1, 50)), + receive {monitor,Pid,long_message_queue,true} -> ok + after 5000 -> ct:fail(missing_on_message) + end, + + unlink(Pid), + exit(Pid, kill), + false = is_process_alive(Pid), + + erlang:system_monitor(undefined), + + ok. + +ignore_messages_working() -> + _ = id(lists:seq(1, 10000)), + ignore_messages_working(). + +id(X) -> + X. + seq(N, M) -> seq(N, M, []). diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in index 0e764c24307b..3fbfd1f8905d 100644 --- a/erts/etc/unix/etp-commands.in +++ b/erts/etc/unix/etp-commands.in @@ -1616,10 +1616,22 @@ define etp-sigq-flags-int # Args: int # if ($arg0 & ~((1 << 9)-1)) - printf "GARBAGE<%x> ", ($arg0 & ~((1 << 9)-1)) + printf "GARBAGE<%x> ", ($arg0 & ~((1 << 11)-1)) + end + if ($arg0 & (1 << 10)) + printf "mon-msgq-len-long " + end + if ($arg0 & (1 << 9)) + printf "mon-msgq-len " + end + if ($arg0 & (1 << 8)) + printf "flushed-sigs " + end + if ($arg0 & (1 << 7)) + printf "flushing-sigs " end if ($arg0 & (1 << 6)) - printf "delayed-sigq-len " + printf "UNUSED<2> " end if ($arg0 & (1 << 5)) printf "wait-handle-sig " @@ -1628,7 +1640,7 @@ define etp-sigq-flags-int printf "handling-sig " end if ($arg0 & (1 << 3)) - printf "local-signals-only " + printf "UNUSED<1> " end if ($arg0 & (1 << 2)) printf "offheap-msgq-changing " @@ -2734,7 +2746,7 @@ define etp-process-info-int printf "0\n" end printf " Mbuf size: %ld\n", $etp_proc->mbuf_sz - printf " Msgq len: %ld (inner=%ld, outer=%ld)\n", ($etp_proc->sig_qs.len + $etp_proc->sig_inq.len), $etp_proc->sig_qs.len, $etp_proc->sig_inq.len + printf " Msgq len: %ld\n", $etp_proc->sig_qs.mq_len if (!($arg1)) printf " Msgq Flags: " etp-sigq-flags $etp_proc @@ -2939,7 +2951,7 @@ define etp-process-memory-info printf " | none " end printf "] [Mbuf: %5ld", $etp_pmem_proc->mbuf_sz - printf " | %3ld (%3ld | %3ld)", ($etp_pmem_proc->sig_qs.len + $etp_pmem_proc->sig_inq.len), $etp_pmem_proc->sig_qs.len, $etp_pmem_proc->sig_inq.len + printf " | %3ld", $etp_pmem_proc->sig_qs.mq_len printf "] " if ($etp_pmem_proc->i) printf " I: " diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam index 00ea9e5775fb..2c14a1469df7 100644 Binary files a/erts/preloaded/ebin/erlang.beam and b/erts/preloaded/ebin/erlang.beam differ diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 110e6e62894b..59ce4476cb6a 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -347,6 +347,8 @@ 'busy_port' | 'busy_dist_port' | {'long_gc', non_neg_integer()} | + {'long_message_queue', {Disable :: non_neg_integer(), + Enable :: pos_integer()}} | {'long_schedule', non_neg_integer()} | {'large_heap', non_neg_integer()}.