diff --git a/libs/api/ebpf_api.cpp b/libs/api/ebpf_api.cpp index 366e7b553b..5e01f45a02 100644 --- a/libs/api/ebpf_api.cpp +++ b/libs/api/ebpf_api.cpp @@ -4266,15 +4266,22 @@ _ebpf_ring_buffer_map_async_query_completion(_Inout_ void* completion_context) N break; } - int callback_result = subscription->sample_callback( - subscription->sample_callback_context, - const_cast(reinterpret_cast(record->data)), - record->header.length - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); - if (callback_result != 0) { - break; + while (ebpf_ring_buffer_record_is_locked(record)) { + // The record is being written to by the producer. + // Wait for the producer to finish writing. + YieldProcessor(); } - consumer += record->header.length; + if (!ebpf_ring_buffer_record_is_discarded(record)) { + int callback_result = subscription->sample_callback( + subscription->sample_callback_context, + const_cast(reinterpret_cast(record->data)), + ebpf_ring_buffer_record_size(record) - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); + if (callback_result != 0) { + break; + } + } + consumer += ebpf_ring_buffer_record_size(record); } } diff --git a/libs/runtime/ebpf_ring_buffer.c b/libs/runtime/ebpf_ring_buffer.c index 65ffbdb6bc..cc85d1a5d2 100644 --- a/libs/runtime/ebpf_ring_buffer.c +++ b/libs/runtime/ebpf_ring_buffer.c @@ -8,80 +8,16 @@ typedef struct _ebpf_ring_buffer { - ebpf_lock_t lock; - size_t length; - size_t consumer_offset; - size_t producer_offset; + int64_t length; + int64_t mask; + cxplat_spin_lock_t producer_lock; + cxplat_spin_lock_t consumer_lock; + volatile int64_t consumer_offset; + volatile int64_t producer_offset; uint8_t* shared_buffer; ebpf_ring_descriptor_t* ring_descriptor; } ebpf_ring_buffer_t; -inline static size_t -_ring_get_length(_In_ const ebpf_ring_buffer_t* ring) -{ - return ring->length; -} - -inline static size_t -_ring_get_producer_offset(_In_ const ebpf_ring_buffer_t* ring) -{ - return ring->producer_offset % ring->length; -} - -inline static size_t -_ring_get_consumer_offset(_In_ const ebpf_ring_buffer_t* ring) -{ - return ring->consumer_offset % ring->length; -} - -inline static size_t -_ring_get_used_capacity(_In_ const ebpf_ring_buffer_t* ring) -{ - ebpf_assert(ring->producer_offset >= ring->consumer_offset); - return ring->producer_offset - ring->consumer_offset; -} - -inline static void -_ring_advance_producer_offset(_Inout_ ebpf_ring_buffer_t* ring, size_t length) -{ - ring->producer_offset += length; -} - -inline static void -_ring_advance_consumer_offset(_Inout_ ebpf_ring_buffer_t* ring, size_t length) -{ - ring->consumer_offset += length; -} - -inline static _Ret_notnull_ ebpf_ring_buffer_record_t* -_ring_record_at_offset(_In_ const ebpf_ring_buffer_t* ring, size_t offset) -{ - return (ebpf_ring_buffer_record_t*)&ring->shared_buffer[offset % ring->length]; -} - -inline static _Ret_notnull_ ebpf_ring_buffer_record_t* -_ring_next_consumer_record(_In_ const ebpf_ring_buffer_t* ring) -{ - return _ring_record_at_offset(ring, _ring_get_consumer_offset(ring)); -} - -inline static _Ret_maybenull_ ebpf_ring_buffer_record_t* -_ring_buffer_acquire_record(_Inout_ ebpf_ring_buffer_t* ring, size_t requested_length) -{ - ebpf_ring_buffer_record_t* record = NULL; - requested_length += EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data); - size_t remaining_space = ring->length - (ring->producer_offset - ring->consumer_offset); - - if (remaining_space > requested_length) { - record = _ring_record_at_offset(ring, _ring_get_producer_offset(ring)); - _ring_advance_producer_offset(ring, requested_length); - record->header.length = (uint32_t)requested_length; - record->header.locked = 1; - record->header.discarded = 0; - } - return record; -} - _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_create(_Outptr_ ebpf_ring_buffer_t** ring, size_t capacity) { @@ -100,6 +36,7 @@ ebpf_ring_buffer_create(_Outptr_ ebpf_ring_buffer_t** ring, size_t capacity) } local_ring_buffer->length = capacity; + local_ring_buffer->mask = capacity - 1; local_ring_buffer->ring_descriptor = ebpf_allocate_ring_buffer_memory(capacity); if (!local_ring_buffer->ring_descriptor) { @@ -135,30 +72,23 @@ _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_output(_Inout_ ebpf_ring_buffer_t* ring, _In_reads_bytes_(length) uint8_t* data, size_t length) { ebpf_result_t result; - ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock); - ebpf_ring_buffer_record_t* record = _ring_buffer_acquire_record(ring, length); + uint8_t* buffer; - if (record == NULL) { - result = EBPF_OUT_OF_SPACE; - goto Done; + result = ebpf_ring_buffer_reserve(ring, &buffer, length); + if (result != EBPF_SUCCESS) { + return result; } - record->header.discarded = 0; - record->header.locked = 0; - memcpy(record->data, data, length); - result = EBPF_SUCCESS; -Done: - ebpf_lock_unlock(&ring->lock, state); - return result; + memcpy(buffer, data, length); + + return ebpf_ring_buffer_submit(buffer); } void ebpf_ring_buffer_query(_In_ ebpf_ring_buffer_t* ring, _Out_ size_t* consumer, _Out_ size_t* producer) { - ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock); - *consumer = ring->consumer_offset; - *producer = ring->producer_offset; - ebpf_lock_unlock(&ring->lock, state); + *consumer = (size_t)ReadAcquire64(&ring->consumer_offset); + *producer = (size_t)ReadAcquire64(&ring->producer_offset); } _Must_inspect_result_ ebpf_result_t @@ -166,47 +96,57 @@ ebpf_ring_buffer_return(_Inout_ ebpf_ring_buffer_t* ring, size_t length) { EBPF_LOG_ENTRY(); ebpf_result_t result; - ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock); - size_t local_length = length; - size_t offset = _ring_get_consumer_offset(ring); - - if ((length > _ring_get_length(ring)) || length > _ring_get_used_capacity(ring)) { - EBPF_LOG_MESSAGE_UINT64_UINT64( - EBPF_TRACELOG_LEVEL_ERROR, - EBPF_TRACELOG_KEYWORD_MAP, - "ebpf_ring_buffer_return: Buffer too large", - ring->producer_offset, - ring->consumer_offset); - result = EBPF_INVALID_ARGUMENT; - goto Done; + KIRQL old_irql = KeGetCurrentIrql(); + if (old_irql < DISPATCH_LEVEL) { + KeRaiseIrqlToDpcLevel(); } - // Verify count. - while (local_length != 0) { - ebpf_ring_buffer_record_t* record = _ring_record_at_offset(ring, offset); - if (local_length < record->header.length) { - break; - } - offset += record->header.length; - local_length -= record->header.length; + cxplat_acquire_spin_lock_at_dpc_level(&ring->consumer_lock); + + int64_t consumer_offset = ReadNoFence64(&ring->consumer_offset); + int64_t producer_offset = ReadNoFence64(&ring->producer_offset); + int64_t effective_length = EBPF_PAD_8(length + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); + + if (consumer_offset == producer_offset) { + result = EBPF_INVALID_ARGUMENT; + goto Exit; } - // Did it end on a record boundary? - if (local_length != 0) { - EBPF_LOG_MESSAGE_UINT64( - EBPF_TRACELOG_LEVEL_ERROR, - EBPF_TRACELOG_KEYWORD_MAP, - "ebpf_ring_buffer_return: Invalid buffer length", - local_length); + + int64_t remaining_space = producer_offset - consumer_offset; + + if (remaining_space > effective_length) { result = EBPF_INVALID_ARGUMENT; - goto Done; + goto Exit; } - _ring_advance_consumer_offset(ring, length); + remaining_space = effective_length; + + while (remaining_space > 0) { + ebpf_ring_buffer_record_t* record = + (ebpf_ring_buffer_record_t*)(ring->shared_buffer + (consumer_offset & ring->mask)); + + long size = ReadNoFence(&record->size); + size += EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data); + size = EBPF_PAD_8(size); + + consumer_offset += size; + remaining_space -= size; + + record->size = 0; + } + + WriteNoFence64(&ring->consumer_offset, consumer_offset); + result = EBPF_SUCCESS; -Done: - ebpf_lock_unlock(&ring->lock, state); - EBPF_RETURN_RESULT(result); +Exit: + cxplat_release_spin_lock_from_dpc_level(&ring->consumer_lock); + + if (old_irql < DISPATCH_LEVEL) { + KeLowerIrql(old_irql); + } + + EBPF_RETURN_RESULT(EBPF_SUCCESS); } _Must_inspect_result_ ebpf_result_t @@ -225,55 +165,82 @@ ebpf_ring_buffer_reserve( _Inout_ ebpf_ring_buffer_t* ring, _Outptr_result_bytebuffer_(length) uint8_t** data, size_t length) { ebpf_result_t result; - ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock); - ebpf_ring_buffer_record_t* record = _ring_buffer_acquire_record(ring, length); - if (record == NULL) { - result = EBPF_INVALID_ARGUMENT; - goto Done; + KIRQL old_irql; + int64_t producer_offset = ReadNoFence64(&ring->producer_offset); + int64_t consumer_offset = ReadNoFence64(&ring->consumer_offset); + int64_t remaining_space = ring->length - (producer_offset - consumer_offset); + size_t effective_length = EBPF_PAD_8(length + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); + + if (remaining_space < (int64_t)effective_length) { + return EBPF_NO_MEMORY; + } + + old_irql = KeGetCurrentIrql(); + if (old_irql < DISPATCH_LEVEL) { + KeRaiseIrqlToDpcLevel(); + } + + cxplat_acquire_spin_lock_at_dpc_level(&ring->producer_lock); + + producer_offset = ReadNoFence64(&ring->producer_offset); + consumer_offset = ReadNoFence64(&ring->consumer_offset); + + remaining_space = ring->length - (producer_offset - consumer_offset); + + if (remaining_space < (int64_t)effective_length) { + result = EBPF_NO_MEMORY; + goto Exit; } - record->header.locked = 1; - MemoryBarrier(); + ebpf_ring_buffer_record_t* record = + (ebpf_ring_buffer_record_t*)(ring->shared_buffer + (producer_offset & ring->mask)); + WriteNoFence(&record->size, (long)length | EBPF_RING_BUFFER_RECORD_FLAG_LOCKED); *data = record->data; + + WriteNoFence64(&ring->producer_offset, producer_offset + effective_length); + result = EBPF_SUCCESS; -Done: - ebpf_lock_unlock(&ring->lock, state); + +Exit: + cxplat_release_spin_lock_from_dpc_level(&ring->producer_lock); + + if (old_irql < DISPATCH_LEVEL) { + KeLowerIrql(old_irql); + } + return result; } _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_submit(_Frees_ptr_opt_ uint8_t* data) { - if (!data) { + ebpf_ring_buffer_record_t* record = CONTAINING_RECORD(data, ebpf_ring_buffer_record_t, data); + long size = ReadAcquire(&record->size); + + if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)) { return EBPF_INVALID_ARGUMENT; } - ebpf_ring_buffer_record_t* record = - (ebpf_ring_buffer_record_t*)(data - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); - - record->header.discarded = 0; - // Place a memory barrier here so that all prior writes to the record are completed before the record - // is unlocked. Caller needs to ensure a MemoryBarrier between reading the record->header.locked and - // the data in the record. - MemoryBarrier(); - record->header.locked = 0; + + size &= ~EBPF_RING_BUFFER_RECORD_FLAG_LOCKED; + + WriteRelease(&record->size, size); return EBPF_SUCCESS; } _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_discard(_Frees_ptr_opt_ uint8_t* data) { - if (!data) { + ebpf_ring_buffer_record_t* record = CONTAINING_RECORD(data, ebpf_ring_buffer_record_t, data); + long size = ReadAcquire(&record->size); + + if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)) { return EBPF_INVALID_ARGUMENT; } - ebpf_ring_buffer_record_t* record = - (ebpf_ring_buffer_record_t*)(data - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); - - record->header.discarded = 1; - // Place a memory barrier here so that all prior writes to the record are completed before the record - // is unlocked. Caller needs to ensure a MemoryBarrier between reading the record->header.locked and - // the data in the record. - MemoryBarrier(); - record->header.locked = 0; + + size &= ~EBPF_RING_BUFFER_RECORD_FLAG_LOCKED; + size |= EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED; + + WriteRelease(&record->size, size); return EBPF_SUCCESS; } diff --git a/libs/runtime/ebpf_ring_buffer.h b/libs/runtime/ebpf_ring_buffer.h index 7d19fd3b82..dab3564157 100644 --- a/libs/runtime/ebpf_ring_buffer.h +++ b/libs/runtime/ebpf_ring_buffer.h @@ -38,6 +38,7 @@ ebpf_ring_buffer_destroy(_Frees_ptr_opt_ ebpf_ring_buffer_t* ring_buffer); * @retval EBPF_SUCCESS Successfully wrote record ring buffer. * @retval EBPF_OUT_OF_SPACE Unable to output to ring buffer due to inadequate space. */ +EBPF_INLINE_HINT _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_output(_Inout_ ebpf_ring_buffer_t* ring_buffer, _In_reads_bytes_(length) uint8_t* data, size_t length); diff --git a/libs/runtime/unit/platform_unit_test.cpp b/libs/runtime/unit/platform_unit_test.cpp index 5ec5849237..bbdf0a0561 100644 --- a/libs/runtime/unit/platform_unit_test.cpp +++ b/libs/runtime/unit/platform_unit_test.cpp @@ -1057,21 +1057,21 @@ TEST_CASE("ring_buffer_output", "[platform]") // Ring is not empty REQUIRE(producer != consumer); - REQUIRE(producer == data.size() + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); + REQUIRE(producer == EBPF_PAD_8(data.size() + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data))); REQUIRE(consumer == 0); auto record = ebpf_ring_buffer_next_record(buffer, size, consumer, producer); REQUIRE(record != nullptr); - REQUIRE(record->header.length == data.size() + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); + REQUIRE(record->size == data.size()); - REQUIRE(ebpf_ring_buffer_return(ring_buffer, record->header.length) == EBPF_SUCCESS); + REQUIRE(ebpf_ring_buffer_return(ring_buffer, record->size) == EBPF_SUCCESS); ebpf_ring_buffer_query(ring_buffer, &consumer, &producer); record = ebpf_ring_buffer_next_record(buffer, size, consumer, producer); REQUIRE(record == nullptr); REQUIRE(consumer == producer); - REQUIRE(producer == data.size() + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); - REQUIRE(consumer == data.size() + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); + REQUIRE(producer == EBPF_PAD_8(data.size() + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data))); + REQUIRE(consumer == EBPF_PAD_8(data.size() + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data))); data.resize(1023); while (ebpf_ring_buffer_output(ring_buffer, data.data(), data.size()) == EBPF_SUCCESS) { @@ -1134,7 +1134,7 @@ TEST_CASE("ring_buffer_reserve_submit_discard", "[platform]") } uint8_t* mem3 = nullptr; - REQUIRE(ebpf_ring_buffer_reserve(ring_buffer, &mem3, size + 1) == EBPF_INVALID_ARGUMENT); + REQUIRE(ebpf_ring_buffer_reserve(ring_buffer, &mem3, size + 1) == EBPF_NO_MEMORY); ebpf_ring_buffer_query(ring_buffer, &consumer, &producer); @@ -1146,6 +1146,106 @@ TEST_CASE("ring_buffer_reserve_submit_discard", "[platform]") ring_buffer = nullptr; } +TEST_CASE("ring_buffer_stress", "[platform]") +{ + _test_helper test_helper; + test_helper.initialize(); + ebpf_ring_buffer_t* ring_buffer; + + uint8_t* buffer; + std::vector data(10); + size_t size = 64 * 1024; + bool bad_record = false; + std::atomic a_records = 0; + std::atomic b_records = 0; + std::atomic stop{false}; + + REQUIRE(ebpf_ring_buffer_create(&ring_buffer, size) == EBPF_SUCCESS); + REQUIRE(ebpf_ring_buffer_map_buffer(ring_buffer, &buffer) == EBPF_SUCCESS); + + auto producer = [&](std::vector& data) { + while (!stop) { + if (ebpf_ring_buffer_output(ring_buffer, data.data(), data.size()) != EBPF_SUCCESS) { + YieldProcessor(); + } + } + }; + + std::vector data1(13, 'a'); + std::vector data2(23, 'b'); + auto consumer = [&]() { + size_t consumer; + size_t producer; + while (!stop) { + ebpf_ring_buffer_query(ring_buffer, &consumer, &producer); + if (consumer != producer) { + auto record = ebpf_ring_buffer_next_record(buffer, size, consumer, producer); + if (record != nullptr) { + volatile long actual_size = ReadAcquire(&record->size); + if (actual_size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED) { + YieldProcessor(); + continue; + } + switch (actual_size) { + case 13: + if (memcmp(record->data, data1.data(), data1.size()) != 0) { + bad_record = true; + } + a_records++; + break; + case 23: + if (memcmp(record->data, data2.data(), data2.size()) != 0) { + bad_record = true; + } + b_records++; + break; + default: + bad_record = true; + return; + break; + } + if (ebpf_ring_buffer_return(ring_buffer, actual_size) != EBPF_SUCCESS) { + bad_record = true; + break; + } + } + } else { + YieldProcessor(); + } + } + if (bad_record) { + return; + } + }; + + std::vector threads; + + auto producer_a = [&]() { producer(data1); }; + auto producer_b = [&]() { producer(data2); }; + + // Start consumer thread. + threads.emplace_back(std::thread(consumer)); + + // Start producer threads. + for (size_t i = 0; i < 10; i++) { + threads.emplace_back(std::thread(producer_a)); + threads.emplace_back(std::thread(producer_b)); + } + + std::this_thread::sleep_for(std::chrono::seconds(10)); + + stop = true; + + for (auto& thread : threads) { + thread.join(); + } + + REQUIRE(!bad_record); + REQUIRE((a_records + b_records) > 0); + + ebpf_ring_buffer_destroy(ring_buffer); +} + TEST_CASE("error codes", "[platform]") { for (ebpf_result_t result = EBPF_SUCCESS; result < EBPF_RESULT_COUNT; result = (ebpf_result_t)(result + 1)) { diff --git a/libs/shared/ebpf_ring_buffer_record.h b/libs/shared/ebpf_ring_buffer_record.h index 70c28a650a..13420f8ae6 100644 --- a/libs/shared/ebpf_ring_buffer_record.h +++ b/libs/shared/ebpf_ring_buffer_record.h @@ -5,15 +5,18 @@ CXPLAT_EXTERN_C_BEGIN +#define EBPF_RING_BUFFER_RECORD_FLAG_LOCKED_OFFSET 31 +#define EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED_OFFSET 30 +#define EBPF_RING_BUFFER_RECORD_FLAG_LOCKED (long)(0x1ul << EBPF_RING_BUFFER_RECORD_FLAG_LOCKED_OFFSET) +#define EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED (long)(0x1ul << EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED_OFFSET) + typedef struct _ebpf_ring_buffer_record { - struct - { - uint8_t locked : 1; - uint8_t discarded : 1; - uint32_t length : 30; - } header; - uint8_t data[1]; + long size; ///< Size of the record in bytes. The lower 30 bits are the size, the 31st bit is the locked flag, and + ///< the 32nd bit is the discarded flag. Next record starts at this + size + sizeof(size) + padding (to + ///< 8). + + uint8_t data[1]; ///< Data of the record. } ebpf_ring_buffer_record_t; /** @@ -36,4 +39,23 @@ ebpf_ring_buffer_next_record(_In_ const uint8_t* buffer, size_t buffer_length, s return (ebpf_ring_buffer_record_t*)(buffer + consumer % buffer_length); } +inline const bool +ebpf_ring_buffer_record_is_discarded(_In_ const ebpf_ring_buffer_record_t* record) +{ + return (ReadNoFence(&record->size) & EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED) != 0; +} + +inline const bool +ebpf_ring_buffer_record_is_locked(_In_ const ebpf_ring_buffer_record_t* record) +{ + return (ReadNoFence(&record->size) & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED) != 0; +} + +inline const size_t +ebpf_ring_buffer_record_size(_In_ const ebpf_ring_buffer_record_t* record) +{ + return (size_t)(ReadNoFence(&record->size) & + ~(EBPF_RING_BUFFER_RECORD_FLAG_LOCKED | EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED)); +} + CXPLAT_EXTERN_C_END