From 3f2fa270989a4b8412c41bc9f7186cfff399c03c Mon Sep 17 00:00:00 2001 From: Alan Jowett Date: Tue, 8 Oct 2024 09:56:19 -0700 Subject: [PATCH 1/2] Add and remove CPU from epoch computation on active and idle Signed-off-by: Alan Jowett --- libs/runtime/ebpf_epoch.c | 272 ++++++++++++++++++--- libs/runtime/ebpf_epoch.h | 2 +- libs/runtime/ebpf_work_queue.c | 23 +- libs/runtime/ebpf_work_queue.h | 11 + libs/runtime/unit/platform_unit_test.cpp | 296 ++++++++++++++++++++++- 5 files changed, 561 insertions(+), 43 deletions(-) diff --git a/libs/runtime/ebpf_epoch.c b/libs/runtime/ebpf_epoch.c index ef0e69c369..7f69140d22 100644 --- a/libs/runtime/ebpf_epoch.c +++ b/libs/runtime/ebpf_epoch.c @@ -17,7 +17,23 @@ * 1) Each CPU determines the minimum epoch of all threads on the CPU. * 2) The minimum epoch is committed as the release epoch and any memory that is older than the release epoch is * released. - * 3) The epoch_computation_in_progress flag is cleared which allows the epoch computation to be initiated again. + * 3) The epoch_computation_in_progress flag is cleared which allows the epoch computation to be initiated again. + * + * Note: + * CPUs can be in one of three states: + * 1) Inactive: The CPU is not actively participating in epoch computation. + * Active flag is false. + * 2) Activating: The CPU is in the process of activating and is not yet active. + * Active flag is true and current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH. + * 3) Active: The CPU is actively participating in epoch computation. + * Active flag is true and current_epoch != EBPF_EPOCH_UNKNOWN_EPOCH. + * + * All CPUs except CPU 0 are in the inactive state at initialization. CPU 0 is always active. + * + * CPUs transition between states as follows: + * 1) Inactive -> Activating: The CPU is activated when a thread enters an epoch and the CPU is not active. + * 2) Activating -> Active: The CPU is active when it is notified of the current epoch value. + * 3) Active -> Inactive: The CPU is deactivated when there are no threads in the epoch and the free list is empty. */ /** @@ -30,9 +46,21 @@ */ #define EBPF_NANO_SECONDS_PER_FILETIME_TICK 100 +/** + * @brief A sentinel value used to indicate that the epoch is unknown. This is used to indicate that the CPU is in the + * process of activating. + */ +#define EBPF_EPOCH_UNKNOWN_EPOCH 0 + +/** + * @brief The first valid epoch value. + * Epoch values start at 1 and increments when _ebpf_epoch_messenger_propose_release_epoch is called. + */ +#define EBPF_EPOCH_FIRST_EPOCH 1 + #define EBPF_EPOCH_FAIL_FAST(REASON, ASSERTION) \ if (!(ASSERTION)) { \ - ebpf_assert(!#ASSERTION); \ + ebpf_assert(#ASSERTION); \ __fastfail(REASON); \ } @@ -44,16 +72,27 @@ */ typedef __declspec(align(EBPF_CACHE_LINE_SIZE)) struct _ebpf_epoch_cpu_entry { - LIST_ENTRY epoch_state_list; ///< Per-CPU list of thread entries. - ebpf_list_entry_t free_list; ///< Per-CPU free list. - int64_t current_epoch; ///< The current epoch for this CPU. - int64_t released_epoch; ///< The newest epoch that can be released. - int timer_armed : 1; ///< Set if the flush timer is armed. - int rundown_in_progress : 1; ///< Set if rundown is in progress. - int epoch_computation_in_progress : 1; ///< Set if epoch computation is in progress. - ebpf_timed_work_queue_t* work_queue; ///< Work queue used to schedule work items. + LIST_ENTRY epoch_state_list; ///< Per-CPU list of thread entries. + ebpf_list_entry_t free_list; ///< Per-CPU free list. + int64_t current_epoch; ///< The current epoch for this CPU. 
+ int64_t released_epoch; ///< The newest epoch that can be released. + uint64_t timer_armed : 1; ///< Set if the flush timer is armed. + uint64_t rundown_in_progress : 1; ///< Set if rundown is in progress. + uint64_t epoch_computation_in_progress : 1; ///< Set if epoch computation is in progress. + uint64_t active : 1; ///< CPU is active in epoch computation. Only accessed under _ebpf_epoch_active_cpu_list_lock. + uint64_t work_queue_assigned : 1; ///< Work queue is assigned to this CPU. + uint64_t reserved : 59; ///< Reserved for future use. + ebpf_timed_work_queue_t* work_queue; ///< Work queue used to schedule work items. } ebpf_epoch_cpu_entry_t; +static_assert( + sizeof(ebpf_epoch_cpu_entry_t) == EBPF_CACHE_LINE_SIZE, "ebpf_epoch_cpu_entry_t must fit within one cache line"); + +/** + * @brief Lock to ensure a consistent view of the active CPUs. + */ +static ebpf_lock_t _ebpf_epoch_active_cpu_list_lock; ///< Lock to protect the active CPU list. + /** * @brief Table of per-CPU state. */ @@ -116,12 +155,12 @@ typedef struct _ebpf_epoch_cpu_message { struct { - uint64_t current_epoch; ///< The new current epoch. - uint64_t proposed_release_epoch; ///< Minimum epoch of all threads on the CPU. + int64_t current_epoch; ///< The new current epoch. + int64_t proposed_release_epoch; ///< Minimum epoch of all threads on the CPU. } propose_epoch; struct { - uint64_t released_epoch; ///< The newest epoch that can be released. + int64_t released_epoch; ///< The newest epoch that can be released. } commit_epoch; struct { @@ -228,6 +267,13 @@ static _IRQL_requires_(DISPATCH_LEVEL) void _ebpf_epoch_arm_timer_if_needed(ebpf static void _ebpf_epoch_work_item_callback(_In_ cxplat_preemptible_work_item_t* preemptible_work_item, void* context); +_IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_activate_cpu(uint32_t cpu_id); + +_IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_deactivate_cpu(uint32_t cpu_id); + +uint32_t +_ebpf_epoch_next_active_cpu(uint32_t cpu_id); + /** * @brief Raise the CPU's IRQL to DISPATCH_LEVEL if it is below DISPATCH_LEVEL. * First check if the IRQL is below DISPATCH_LEVEL to avoid the overhead of @@ -282,12 +328,13 @@ ebpf_epoch_initiate() goto Error; } + ebpf_lock_create(&_ebpf_epoch_active_cpu_list_lock); + ebpf_assert(EBPF_CACHE_ALIGN_POINTER(_ebpf_epoch_cpu_table) == _ebpf_epoch_cpu_table); // Initialize the per-CPU state. for (uint32_t cpu_id = 0; cpu_id < _ebpf_epoch_cpu_count; cpu_id++) { ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id]; - cpu_entry->current_epoch = 1; ebpf_list_initialize(&cpu_entry->epoch_state_list); ebpf_list_initialize(&cpu_entry->free_list); } @@ -306,11 +353,32 @@ ebpf_epoch_initiate() } } + // Set the current thread affinity to CPU 0. + // This could fail if the system is preventing the thread from moving to CPU 0. + GROUP_AFFINITY old_thread_affinity; + return_value = ebpf_set_current_thread_cpu_affinity(0, &old_thread_affinity); + if (return_value != EBPF_SUCCESS) { + goto Error; + } + + KIRQL old_irql = KeRaiseIrqlToDpcLevel(); + + // Activate CPU 0. + _ebpf_epoch_activate_cpu(0); + + KeLowerIrql(old_irql); + + // Restore the thread affinity. Can't fail at this point. + ebpf_restore_current_thread_cpu_affinity(&old_thread_affinity); + KeInitializeDpc(&_ebpf_epoch_timer_dpc, _ebpf_epoch_timer_worker, NULL); KeSetTargetProcessorDpc(&_ebpf_epoch_timer_dpc, 0); KeInitializeTimer(&_ebpf_epoch_compute_release_epoch_timer); + // Compute the first release epoch. 
+ ebpf_epoch_synchronize(); + Error: if (return_value != EBPF_SUCCESS && _ebpf_epoch_cpu_table) { for (uint32_t cpu_id = 0; cpu_id < _ebpf_epoch_cpu_count; cpu_id++) { @@ -362,6 +430,9 @@ ebpf_epoch_terminate() cxplat_free( _ebpf_epoch_cpu_table, CXPLAT_POOL_FLAG_NON_PAGED | CXPLAT_POOL_FLAG_CACHE_ALIGNED, EBPF_POOL_TAG_EPOCH); _ebpf_epoch_cpu_table = NULL; + + ebpf_lock_destroy(&_ebpf_epoch_active_cpu_list_lock); + EBPF_RETURN_VOID(); } @@ -379,6 +450,10 @@ ebpf_epoch_enter(_Out_ ebpf_epoch_state_t* epoch_state) epoch_state->epoch = cpu_entry->current_epoch; ebpf_list_insert_tail(&cpu_entry->epoch_state_list, &epoch_state->epoch_list_entry); + if (!cpu_entry->active) { + _ebpf_epoch_activate_cpu(epoch_state->cpu_id); + } + _ebpf_epoch_lower_to_previous_irql(epoch_state->irql_at_enter); } #pragma warning(pop) @@ -691,6 +766,10 @@ _ebpf_epoch_insert_in_free_list(_In_ ebpf_epoch_allocation_header_t* header) uint32_t cpu_id = ebpf_get_current_cpu(); ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id]; + if (!cpu_entry->active) { + _ebpf_epoch_activate_cpu(cpu_id); + } + if (cpu_entry->rundown_in_progress) { KeLowerIrql(old_irql); switch (header->entry_type) { @@ -788,8 +867,6 @@ void _ebpf_epoch_messenger_propose_release_epoch( _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu) { - // Walk over each thread_entry in the epoch_state_list and compute the minimum epoch. - ebpf_list_entry_t* entry = cpu_entry->epoch_state_list.Flink; ebpf_epoch_state_t* epoch_state; uint32_t next_cpu; @@ -801,6 +878,20 @@ _ebpf_epoch_messenger_propose_release_epoch( } // Other CPUs update the current epoch. else { + // When a CPU transitions from inactive to activating, it sets the current_epoch to EBPF_EPOCH_UNKNOWN_EPOCH. + // If the epoch was EBPF_EPOCH_UNKNOWN_EPOCH (i.e. the CPU was activating when this memory was queued), then + // update the freed_epoch for all items in the free list now that we know the current epoch. This occurs when + // the CPU is activated and continues until the first epoch is proposed. + if (cpu_entry->current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH) { + for (ebpf_list_entry_t* entry = cpu_entry->free_list.Flink; entry != &cpu_entry->free_list; + entry = entry->Flink) { + ebpf_epoch_allocation_header_t* header = + CONTAINING_RECORD(entry, ebpf_epoch_allocation_header_t, list_entry); + ebpf_assert(header->freed_epoch == EBPF_EPOCH_UNKNOWN_EPOCH); + header->freed_epoch = message->message.propose_epoch.current_epoch; + } + } + cpu_entry->current_epoch = message->message.propose_epoch.current_epoch; } @@ -808,25 +899,24 @@ _ebpf_epoch_messenger_propose_release_epoch( MemoryBarrier(); // Previous CPU's minimum epoch. - uint64_t minimum_epoch = message->message.propose_epoch.proposed_release_epoch; + int64_t minimum_epoch = message->message.propose_epoch.proposed_release_epoch; - while (entry != &cpu_entry->epoch_state_list) { + // Walk over each thread_entry in the epoch_state_list and compute the minimum epoch. + for (ebpf_list_entry_t* entry = cpu_entry->epoch_state_list.Flink; entry != &cpu_entry->epoch_state_list; + entry = entry->Flink) { epoch_state = CONTAINING_RECORD(entry, ebpf_epoch_state_t, epoch_list_entry); minimum_epoch = min(minimum_epoch, epoch_state->epoch); - entry = entry->Flink; } // Set the proposed release epoch to the minimum epoch seen so far. 
message->message.propose_epoch.proposed_release_epoch = minimum_epoch; + next_cpu = _ebpf_epoch_next_active_cpu(current_cpu); + // If this is the last CPU, then send a message to the first CPU to commit the release epoch. - if (current_cpu == _ebpf_epoch_cpu_count - 1) { + if (next_cpu == 0) { message->message.commit_epoch.released_epoch = minimum_epoch; message->message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_COMMIT_RELEASE_EPOCH; - next_cpu = 0; - } else { - // Send the message to the next CPU. - next_cpu = current_cpu + 1; } _ebpf_epoch_send_message_async(message, next_cpu); @@ -854,22 +944,43 @@ _ebpf_epoch_messenger_commit_release_epoch( { uint32_t next_cpu; + // If any epoch_states are in EBPF_EPOCH_UNKNOWN_EPOCH, then activation of a CPU is in progress. + bool other_cpus_are_activating = (message->message.commit_epoch.released_epoch == EBPF_EPOCH_UNKNOWN_EPOCH); + + // If this CPU is in EBPF_EPOCH_UNKNOWN_EPOCH, then activation of this CPU is in progress. + bool this_cpu_is_activating = (cpu_entry->current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH); + cpu_entry->timer_armed = false; // Set the released_epoch to the value computed by the EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_RELEASE_EPOCH message. cpu_entry->released_epoch = message->message.commit_epoch.released_epoch - 1; + next_cpu = _ebpf_epoch_next_active_cpu(current_cpu); + // If this is the last CPU, send the message to the first CPU to complete the cycle. - if (current_cpu != _ebpf_epoch_cpu_count - 1) { - // Send the message to the next CPU. - next_cpu = current_cpu + 1; - } else { + if (next_cpu == 0) { message->message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_EPOCH_COMPLETE; - next_cpu = 0; } _ebpf_epoch_send_message_async(message, next_cpu); + // Wait for all the CPUs to transition to an active state. + // If CPUs are activating, then it is not possible to determine what epoch they are in + // and memory cannot be released. + if (other_cpus_are_activating || this_cpu_is_activating) { + // One or more CPUs are still activating. Rearm the timer and wait for the next message. + _ebpf_epoch_arm_timer_if_needed(cpu_entry); + return; + } + + // All CPUs have transitioned to an active state and the epoch computation was successfully completed. + // Release any memory that is associated with expired epochs. _ebpf_epoch_release_free_list(cpu_entry, cpu_entry->released_epoch); + + // Check if this CPU is idle and deactivate it if it is (CPU 0 is always active). + if ((current_cpu != 0) && ebpf_list_is_empty(&cpu_entry->free_list) && + ebpf_list_is_empty(&cpu_entry->epoch_state_list)) { + _ebpf_epoch_deactivate_cpu(current_cpu); + } } /** @@ -935,15 +1046,13 @@ _ebpf_epoch_messenger_rundown_in_progress( { uint32_t next_cpu; cpu_entry->rundown_in_progress = true; + + next_cpu = _ebpf_epoch_next_active_cpu(current_cpu); + // If this is the last CPU, then stop. - if (current_cpu != _ebpf_epoch_cpu_count - 1) { - // Send the message to the next CPU. - next_cpu = current_cpu + 1; - } else { + if (next_cpu == 0) { // Signal the caller that rundown is complete. KeSetEvent(&message->completion_event, 0, FALSE); - // Set next_cpu to UINT32_MAX to make code analysis happy. - next_cpu = UINT32_MAX; return; } @@ -1069,3 +1178,96 @@ _ebpf_epoch_work_item_callback(_In_ cxplat_preemptible_work_item_t* preemptible_ cxplat_release_rundown_protection(&_ebpf_epoch_work_item_rundown_ref); } + +/** + * @brief Add the CPU to the next active CPU table. Must be called when running on cpu_id. + * + * @param[in] cpu_id CPU to add. 
Passed to avoid calling ebpf_get_current_cpu() again. + */ +_IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_activate_cpu(uint32_t cpu_id) +{ + EBPF_LOG_ENTRY(); + + EBPF_LOG_MESSAGE_UINT64(EBPF_TRACELOG_LEVEL_INFO, EBPF_TRACELOG_KEYWORD_EPOCH, "Activating CPU", cpu_id); + + ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id]; + ebpf_lock_state_t state = ebpf_lock_lock(&_ebpf_epoch_active_cpu_list_lock); + + ebpf_assert(!cpu_entry->active); + ebpf_assert(ebpf_list_is_empty(&cpu_entry->free_list)); + ebpf_assert(cpu_id == ebpf_get_current_cpu()); + + cpu_entry->active = true; + // When the CPU is activated, the current epoch is not known. + // Memory freed before the current epoch is set will have its epoch set to EBPF_EPOCH_UNKNOWN_EPOCH and have its + // epoch set when the current epoch is known (i.e., when the next epoch is proposed). + cpu_entry->current_epoch = EBPF_EPOCH_UNKNOWN_EPOCH; + + if (!cpu_entry->work_queue_assigned) { + ebpf_result_t result = ebpf_timed_work_queue_set_cpu_id(cpu_entry->work_queue, cpu_id); + if (result != EBPF_SUCCESS) { + // This is a fatal error. The epoch system is in an inconsistent state. + // Bugcheck the system as this is non-recoverable state. + __fastfail(FAST_FAIL_INVALID_ARG); + } + cpu_entry->work_queue_assigned = 1; + } + + ebpf_lock_unlock(&_ebpf_epoch_active_cpu_list_lock, state); + EBPF_LOG_EXIT(); +} + +/** + * @brief Remove the CPU from the next active CPU table. + * + * @param[in] cpu_id CPU to remove. Passed to avoid calling ebpf_get_current_cpu() again. + */ +_IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_deactivate_cpu(uint32_t cpu_id) +{ + EBPF_LOG_ENTRY(); + ebpf_assert(cpu_id == ebpf_get_current_cpu()); + + // Deactivating CPU 0 is not allowed. + ebpf_assert(cpu_id != 0); + ebpf_assert(ebpf_list_is_empty(&_ebpf_epoch_cpu_table[cpu_id].epoch_state_list)); + ebpf_assert(ebpf_list_is_empty(&_ebpf_epoch_cpu_table[cpu_id].free_list)); + + EBPF_LOG_MESSAGE_UINT64(EBPF_TRACELOG_LEVEL_INFO, EBPF_TRACELOG_KEYWORD_EPOCH, "Deactivating CPU", cpu_id); + + ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id]; + ebpf_lock_state_t state = ebpf_lock_lock(&_ebpf_epoch_active_cpu_list_lock); + cpu_entry->active = false; + ebpf_lock_unlock(&_ebpf_epoch_active_cpu_list_lock, state); + + EBPF_LOG_EXIT(); +} + +/** + * @brief Given the current CPU, return the next active CPU. + * + * @param[in] cpu_id The current CPU. Passed to avoid calling ebpf_get_current_cpu() again. + * @return The next active CPU. + */ +uint32_t +_ebpf_epoch_next_active_cpu(uint32_t cpu_id) +{ + uint32_t next_active_cpu; + ebpf_lock_state_t state = ebpf_lock_lock(&_ebpf_epoch_active_cpu_list_lock); + + // Prevent an infinite loop by making sure that CPU 0 is always active. + ebpf_assert(_ebpf_epoch_cpu_table[0].active); + + for (next_active_cpu = cpu_id + 1; next_active_cpu < _ebpf_epoch_cpu_count; next_active_cpu++) { + if (_ebpf_epoch_cpu_table[next_active_cpu].active) { + break; + } + } + + if (next_active_cpu == _ebpf_epoch_cpu_count) { + next_active_cpu = 0; + } + + ebpf_lock_unlock(&_ebpf_epoch_active_cpu_list_lock, state); + + return next_active_cpu; +} diff --git a/libs/runtime/ebpf_epoch.h b/libs/runtime/ebpf_epoch.h index b93a3c2114..3b1ddc1c72 100644 --- a/libs/runtime/ebpf_epoch.h +++ b/libs/runtime/ebpf_epoch.h @@ -14,7 +14,7 @@ extern "C" typedef struct _ebpf_epoch_state { LIST_ENTRY epoch_list_entry; /// List entry for the epoch list. - uint64_t epoch; /// The epoch when this entry was added to the list. 
+ int64_t epoch; /// The epoch when this entry was added to the list. uint32_t cpu_id; /// The CPU on which this entry was added to the list. KIRQL irql_at_enter; /// The IRQL when this entry was added to the list. } ebpf_epoch_state_t; diff --git a/libs/runtime/ebpf_work_queue.c b/libs/runtime/ebpf_work_queue.c index 564e6039fd..7d836190b5 100644 --- a/libs/runtime/ebpf_work_queue.c +++ b/libs/runtime/ebpf_work_queue.c @@ -51,6 +51,21 @@ ebpf_timed_work_queue_create( KeInitializeTimer(&local_work_queue->timer); KeInitializeDpc(&local_work_queue->dpc, _ebpf_timed_work_queue_timer_callback, local_work_queue); + *work_queue = local_work_queue; + local_work_queue = NULL; + return_value = EBPF_SUCCESS; + +Done: + ebpf_timed_work_queue_destroy(local_work_queue); + + return return_value; +} + +_Must_inspect_result_ ebpf_result_t +ebpf_timed_work_queue_set_cpu_id(_Inout_ ebpf_timed_work_queue_t* work_queue, uint32_t cpu_id) +{ + ebpf_lock_state_t lock_state = ebpf_lock_lock(&work_queue->lock); + ebpf_result_t return_value; PROCESSOR_NUMBER processor_number; NTSTATUS status = KeGetProcessorNumberFromIndex(cpu_id, &processor_number); if (!NT_SUCCESS(status)) { @@ -58,18 +73,18 @@ ebpf_timed_work_queue_create( goto Done; } - status = KeSetTargetProcessorDpcEx(&local_work_queue->dpc, &processor_number); + status = KeSetTargetProcessorDpcEx(&work_queue->dpc, &processor_number); if (!NT_SUCCESS(status)) { return_value = EBPF_INVALID_ARGUMENT; goto Done; } - *work_queue = local_work_queue; - local_work_queue = NULL; + work_queue->cpu_id = cpu_id; + return_value = EBPF_SUCCESS; Done: - ebpf_timed_work_queue_destroy(local_work_queue); + ebpf_lock_unlock(&work_queue->lock, lock_state); return return_value; } diff --git a/libs/runtime/ebpf_work_queue.h b/libs/runtime/ebpf_work_queue.h index d87ef6aef3..a3e6612cc3 100644 --- a/libs/runtime/ebpf_work_queue.h +++ b/libs/runtime/ebpf_work_queue.h @@ -48,6 +48,17 @@ extern "C" _In_ ebpf_timed_work_queue_callback_t callback, _In_ void* context); + /** + * @brief Set the CPU ID for the timed work queue. + * + * @param[in,out] work_queue Work queue to set the CPU ID for. + * @param[in] cpu_id Which CPU to run the work queue on. + * @retval EBPF_SUCCESS The operation was successful. + * @retval EBPF_INVALID_ARGUMENT The CPU ID is invalid. + */ + _Must_inspect_result_ ebpf_result_t + ebpf_timed_work_queue_set_cpu_id(_Inout_ ebpf_timed_work_queue_t* work_queue, uint32_t cpu_id); + /** * @brief Destroy a timed work queue. * diff --git a/libs/runtime/unit/platform_unit_test.cpp b/libs/runtime/unit/platform_unit_test.cpp index 5ec5849237..6779687470 100644 --- a/libs/runtime/unit/platform_unit_test.cpp +++ b/libs/runtime/unit/platform_unit_test.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include extern ebpf_helper_function_prototype_t* ebpf_core_helper_function_prototype; @@ -74,6 +75,14 @@ typedef class _signal std::unique_lock l(lock); condition_variable.wait(l, [&]() { return signaled; }); } + + bool + wait_for(std::chrono::milliseconds timeout) + { + std::unique_lock l(lock); + return condition_variable.wait_for(l, timeout, [&]() { return signaled; }); + } + void signal() { @@ -82,6 +91,14 @@ typedef class _signal condition_variable.notify_all(); } + void + reset() + { + std::unique_lock l(lock); + signaled = false; + condition_variable.notify_all(); + } + private: std::mutex lock; std::condition_variable condition_variable; @@ -156,7 +173,12 @@ typedef class _ebpf_epoch_scope /** * @brief Construct a new ebpf epoch scope object and enter epoch. 
*/ - _ebpf_epoch_scope() : in_epoch(false) { enter(); } + _ebpf_epoch_scope(bool enter_on_construct = true) : in_epoch(false), epoch_state{} + { + if (enter_on_construct) { + enter(); + } + } /** * @brief Leave epoch if entered. @@ -196,8 +218,8 @@ typedef class _ebpf_epoch_scope } private: - ebpf_epoch_state_t epoch_state; - bool in_epoch; + ebpf_epoch_state_t epoch_state{}; + bool in_epoch = false; } ebpf_epoch_scope_t; TEST_CASE("hash_table_test", "[platform]") @@ -627,6 +649,274 @@ TEST_CASE("epoch_test_stale_items", "[platform]") REQUIRE(ebpf_epoch_is_free_list_empty(1)); } } +typedef struct _work_item_context +{ + _work_item_context() : signal() {} + signal_t signal; + const static void + invoke(void* context) + { + _work_item_context* work_item_context = reinterpret_cast<_work_item_context*>(context); + work_item_context->signal.signal(); + } +} work_item_context_t; + +typedef std::unique_ptr work_item_ptr; + +struct scoped_cpu_affinity +{ + scoped_cpu_affinity(uint32_t i) : old_affinity_mask{} + { + affinity_set = ebpf_set_current_thread_cpu_affinity(i, &old_affinity_mask) == EBPF_SUCCESS; + REQUIRE(affinity_set); + } + ~scoped_cpu_affinity() + { + if (affinity_set) { + ebpf_restore_current_thread_cpu_affinity(&old_affinity_mask); + } + } + GROUP_AFFINITY old_affinity_mask; + bool affinity_set = false; +}; + +struct scoped_usersim_override +{ + scoped_usersim_override() { usersim_set_affinity_and_priority_override(0); } + ~scoped_usersim_override() { usersim_clear_affinity_and_priority_override(); } +}; + +/** + * @brief This function executes as a test script to verify epoch behavior. Various scripts are used to test different + * scenarios. + * + * @param[in] script The script to execute. + * + * @details The script is a vector of strings. Each string is a comma separated list of tokens. The first token is the + * command to execute. The remaining tokens are arguments to the command. + * If a step fails it signals this via a failing REQUIRE. + */ +static void +_run_epoch_test_script(const std::vector& script) +{ + using namespace std::chrono_literals; + typedef std::vector script_t; + + _test_helper test_helper; + test_helper.initialize(); + // Add scope to ensure that epoch state is cleaned up before test_helper. + { + std::vector work_item_contexts(2); + std::vector epoch_states = {false, false}; + std::vector work_items; + // Start on CPU 0 as default. + scoped_cpu_affinity affinity_scope(0); + + // Prevent usersim from interfering with the test. + scoped_usersim_override usersim_override; + + typedef std::variant, std::function, std::function> + step_t; + + std::map steps; + + // Define possible steps the script can take. + + // Initialize the state. 
+ steps["setup"] = [&] { + work_items.clear(); + for (auto i = 0; i < 2; i++) { + work_item_contexts[i].signal.reset(); + auto work_item = ebpf_epoch_allocate_work_item(&work_item_contexts[i], work_item_context_t::invoke); + if (!work_item) { + FAIL("Failed to allocate work item."); + } + work_items.push_back(work_item_ptr(work_item, ebpf_epoch_cancel_work_item)); + } + }; + + // Switch to running on CPU N + steps["switch_cpu"] = [&](size_t i) { + GROUP_AFFINITY old_affinity{}; + (void)ebpf_set_current_thread_cpu_affinity(static_cast(i), &old_affinity); + }; + + // Enter epoch for location N + steps["enter_epoch"] = [&](size_t i) { epoch_states[i].enter(); }; + + // Exit epoch for location N + steps["exit_epoch"] = [&](size_t i) { epoch_states[i].exit(); }; + + // Submit work item N + steps["schedule"] = [&](size_t i) { ebpf_epoch_schedule_work_item(work_items[i].release()); }; + + // Wait for work item N to complete with success or timeout. + steps["wait"] = [&](size_t i, bool expect_success) { + if (expect_success) { + work_item_contexts[i].signal.wait(); + } else { + REQUIRE(!work_item_contexts[i].signal.wait_for(1s)); + } + }; + + // Explicitly clean up all state. + steps["clean up"] = [&] { + for (auto i = 0; i < work_items.size(); i++) { + work_items[i].reset(); + } + }; + + // Invoke epoch synchronize to ensure that all memory that is no longer in use is released. + steps["synchronize"] = [&] { ebpf_epoch_synchronize(); }; + + // Switch to CPU 0 to start. + std::get>(steps["switch_cpu"])(0); + + // Setup the state for the test. + std::get>(steps["setup"])(); + + // Execute each step in the script. + for (const auto& step : script) { + std::vector tokens; + std::istringstream iss(step); + + // Tokenize the step extracting the command and arguments. + for (std::string token; std::getline(iss, token, ',');) { + tokens.push_back(token); + } + + // Verify that the command is valid. + REQUIRE(steps.find(tokens[0]) != steps.end()); + + try { + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v>) { + arg(); + } else if constexpr (std::is_same_v>) { + arg(std::stoi(tokens[1])); + } else if constexpr (std::is_same_v>) { + bool success = tokens[2] == "signaled"; + arg(std::stoi(tokens[1]), success); + } else { + REQUIRE(!("Invalid step:" + step).size()); + } + }, + steps[tokens[0]]); + } catch (const std::exception& e) { + FAIL("Failed to execute step: " + step + " with error: " + e.what()); + } + } + + // clean up the state. + std::get>(steps["clean up"])(); + } +} + +/** + * @brief This test verifies that a single epoch can be entered and exited and that work item that has been scheduled + * will run only after the epoch has been exited. + */ +TEST_CASE("epoch_single_epoch", "[platform]") +{ + _run_epoch_test_script({ + "enter_epoch,0", + "schedule,0", + "wait,0,not_signaled", + "exit_epoch,0", + "synchronize", + "wait,0,signaled", + }); +} + +/** + * @brief This test verifies that an epoch can be entered and exited on two different CPUs and that work items scheduled + * on one CPU will not run until the epoch has been exited on that CPU. + */ +TEST_CASE("epoch_cross_cpu_exit", "[platform]") +{ + _run_epoch_test_script({ + "enter_epoch,0", + "schedule,0", + "wait,0,not_signaled", + "switch_cpu,1", + "exit_epoch,0", + "synchronize", + "wait,0,signaled", + }); +} + +/** + * @brief This test verifies that epochs can be entered recursively and that work items scheduled on the inner epoch or + * outer epoch will not run until the outer epoch has been exited. 
+ */ +TEST_CASE("epoch_nested_epoch", "[platform]") +{ + _run_epoch_test_script({ + "enter_epoch,0", + "schedule,0", + "wait,0,not_signaled", + "enter_epoch,1", + "schedule,1", + "wait,0,not_signaled", + "wait,1,not_signaled", + "exit_epoch,1", + "wait,0,not_signaled", + "wait,1,not_signaled", + "exit_epoch,0", + "synchronize", + "wait,0,signaled", + "wait,1,signaled", + }); +} + +/** + * @brief This test verifies that epochs can be entered and exited in a non-overlapping manner and that work items + * scheduled on the first epoch will not run until the first epoch has been exited and work items scheduled on the + * second epoch will not run until the second epoch has been exited. + */ +TEST_CASE("epoch_sequential_non_overlapping", "[platform]") +{ + _run_epoch_test_script({ + "enter_epoch,0", + "schedule,0", + "wait,0,not_signaled", + "exit_epoch,0", + "synchronize", + "wait,0,signaled", + "enter_epoch,1", + "schedule,1", + "wait,1,not_signaled", + "exit_epoch,1", + "synchronize", + "wait,1,signaled", + }); +} + +/** + * @brief This test verifies that epochs can be entered and exited in an overlapping manner and that work items + * scheduled on the first epoch will not run until the first epoch has been exited and work items scheduled on the + * second epoch will not run until the second epoch has been exited. In addition this checks that work items + * scheduled in the first epoch will run after the first epoch has been exited despite the second epoch being + * entered. This is to verify that the work items are not blocked by the second epoch. + */ +TEST_CASE("epoch_sequential_overlapping_epochs", "[platform]") +{ + _run_epoch_test_script({ + "enter_epoch,0", + "schedule,0", + "wait,0,not_signaled", + "enter_epoch,1", + "exit_epoch,0", + "schedule,1", + "wait,0,signaled", + "wait,1,not_signaled", + "exit_epoch,1", + "synchronize", + "wait,1,signaled", + }); +} static auto provider_function = []() { return EBPF_SUCCESS; }; From 771aa2026d7acedeac6883846c2a23b8f738a42c Mon Sep 17 00:00:00 2001 From: Alan Jowett Date: Mon, 2 Dec 2024 13:19:36 -0800 Subject: [PATCH 2/2] Set epoch for threads that join during CPU activation Signed-off-by: Alan Jowett --- libs/runtime/ebpf_epoch.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/libs/runtime/ebpf_epoch.c b/libs/runtime/ebpf_epoch.c index 7f69140d22..dd23f43157 100644 --- a/libs/runtime/ebpf_epoch.c +++ b/libs/runtime/ebpf_epoch.c @@ -905,6 +905,13 @@ _ebpf_epoch_messenger_propose_release_epoch( for (ebpf_list_entry_t* entry = cpu_entry->epoch_state_list.Flink; entry != &cpu_entry->epoch_state_list; entry = entry->Flink) { epoch_state = CONTAINING_RECORD(entry, ebpf_epoch_state_t, epoch_list_entry); + if (epoch_state->epoch == EBPF_EPOCH_UNKNOWN_EPOCH) { + ebpf_assert(cpu_entry->current_epoch != EBPF_EPOCH_UNKNOWN_EPOCH); + // Assume the thread is in the previous epoch. This is safe because the CPU was not active when the thread + // entered the epoch and the epoch doesn't advance while CPUs are activating. + epoch_state->epoch = cpu_entry->current_epoch - 1; + } + minimum_epoch = min(minimum_epoch, epoch_state->epoch); }