diff --git a/libcanard/canard.c b/libcanard/canard.c index dcf9bfd..098d362 100644 --- a/libcanard/canard.c +++ b/libcanard/canard.c @@ -72,9 +72,9 @@ #define INITIAL_TOGGLE_STATE true #define CONTAINER_OF(type, ptr, member) \ - ((type*) (((ptr) == NULL) ? NULL : (void*) (((char*) (ptr)) - offsetof(type, member)))) -#define CONST_CONTAINER_OF(type, ptr, member) \ ((const type*) (((ptr) == NULL) ? NULL : (const void*) (((const char*) (ptr)) - offsetof(type, member)))) +#define MUTABLE_CONTAINER_OF(type, ptr, member) \ + ((type*) (((ptr) == NULL) ? NULL : (void*) (((char*) (ptr)) - offsetof(type, member)))) /// Used for inserting new items into AVL trees. CANARD_PRIVATE struct CanardTreeNode* avlTrivialFactory(void* const user_reference) @@ -346,8 +346,8 @@ CANARD_PRIVATE int8_t txAVLPriorityPredicate( // { typedef struct CanardTxQueueItem TxItem; - const TxItem* const target = CONST_CONTAINER_OF(TxItem, user_reference, priority_base); - const TxItem* const other = CONST_CONTAINER_OF(TxItem, node, priority_base); + const TxItem* const target = CONTAINER_OF(TxItem, user_reference, priority_base); + const TxItem* const other = CONTAINER_OF(TxItem, node, priority_base); CANARD_ASSERT((target != NULL) && (other != NULL)); return (target->frame.extended_can_id >= other->frame.extended_can_id) ? +1 : -1; } @@ -362,8 +362,8 @@ CANARD_PRIVATE int8_t txAVLDeadlinePredicate( // { typedef struct CanardTxQueueItem TxItem; - const TxItem* const target = CONST_CONTAINER_OF(TxItem, user_reference, deadline_base); - const TxItem* const other = CONST_CONTAINER_OF(TxItem, node, deadline_base); + const TxItem* const target = CONTAINER_OF(TxItem, user_reference, deadline_base); + const TxItem* const other = CONTAINER_OF(TxItem, node, deadline_base); CANARD_ASSERT((target != NULL) && (other != NULL)); return (target->tx_deadline_usec >= other->tx_deadline_usec) ? 
+1 : -1; } @@ -592,6 +592,35 @@ CANARD_PRIVATE int32_t txPushMultiFrame(struct CanardTxQueue* const que, return out; } +/// Flushes expired transfers by comparing deadline timestamps of the pending transfers with the current time. +CANARD_PRIVATE void txFlushExpiredTransfers(struct CanardTxQueue* const que, + const struct CanardInstance* const ins, + const CanardMicrosecond now_usec) +{ + struct CanardTxQueueItem* tx_item = NULL; + while (NULL != (tx_item = MUTABLE_CONTAINER_OF( // + struct CanardTxQueueItem, + cavlFindExtremum(que->deadline_root, false), + deadline_base))) + { + if (now_usec <= tx_item->tx_deadline_usec) + { + // The queue is sorted by deadline, so we can stop here. + break; + } + + // All frames of the transfer are released at once b/c they all have the same deadline. + struct CanardTxQueueItem* tx_item_to_free = NULL; + while (NULL != (tx_item_to_free = canardTxPop(que, tx_item))) + { + tx_item = tx_item_to_free->next_in_transfer; + canardTxFree(que, ins, tx_item_to_free); + + que->stats.dropped_frames++; + } + } +} + // --------------------------------------------- RECEPTION --------------------------------------------- #define RX_SESSIONS_PER_SUBSCRIPTION (CANARD_NODE_ID_MAX + 1U) @@ -1052,7 +1081,7 @@ rxSubscriptionPredicateOnPortID(void* const user_reference, // NOSONAR Cavl API { CANARD_ASSERT((user_reference != NULL) && (node != NULL)); const CanardPortID sought = *((const CanardPortID*) user_reference); - const CanardPortID other = CONST_CONTAINER_OF(struct CanardRxSubscription, node, base)->port_id; + const CanardPortID other = CONTAINER_OF(struct CanardRxSubscription, node, base)->port_id; static const int8_t NegPos[2] = {-1, +1}; // Clang-Tidy mistakenly identifies a narrowing cast to int8_t here, which is incorrect. return (sought == other) ? 
0 : NegPos[sought > other]; // NOLINT no narrowing conversion is taking place here @@ -1063,7 +1092,7 @@ rxSubscriptionPredicateOnStruct(void* const user_reference, // NOSONAR Cavl API const struct CanardTreeNode* const node) { return rxSubscriptionPredicateOnPortID( // - &CONTAINER_OF(struct CanardRxSubscription, user_reference, base)->port_id, + &MUTABLE_CONTAINER_OF(struct CanardRxSubscription, user_reference, base)->port_id, node); } @@ -1119,6 +1148,15 @@ int32_t canardTxPush(struct CanardTxQueue* const que, const struct CanardPayload payload, const CanardMicrosecond now_usec) { + // Before pushing payload (potentially in multiple frames), we need to try to flush any expired transfers. + // This is necessary to ensure that we don't exhaust the capacity of the queue by holding outdated frames. + // The flushing is done by comparing deadline timestamps of the pending transfers with the current time, + // which makes sense only if the current time is known (bigger than zero). + if (now_usec > 0) + { + txFlushExpiredTransfers(que, ins, now_usec); + } + (void) now_usec; int32_t out = -CANARD_ERROR_INVALID_ARGUMENT; @@ -1167,7 +1205,7 @@ struct CanardTxQueueItem* canardTxPeek(const struct CanardTxQueue* const que) // Paragraph 6.7.2.1.15 of the C standard says: // A pointer to a structure object, suitably converted, points to its initial member, and vice versa. struct CanardTreeNode* const priority_node = cavlFindExtremum(que->priority_root, false); - out = CONTAINER_OF(struct CanardTxQueueItem, priority_node, priority_base); + out = MUTABLE_CONTAINER_OF(struct CanardTxQueueItem, priority_node, priority_base); } return out; } @@ -1223,7 +1261,7 @@ int8_t canardRxAccept(struct CanardInstance* const ins, // This is the reason the function has a logarithmic time complexity of the number of subscriptions. // Note also that this one of the two variable-complexity operations in the RX pipeline; the other one // is memcpy(). 
Excepting these two cases, the entire RX pipeline contains neither loops nor recursion. - struct CanardRxSubscription* const sub = CONTAINER_OF( // + struct CanardRxSubscription* const sub = MUTABLE_CONTAINER_OF( // struct CanardRxSubscription, cavlSearch(&ins->rx_subscriptions[(size_t) model.transfer_kind], &model.port_id, @@ -1307,7 +1345,7 @@ int8_t canardRxUnsubscribe(struct CanardInstance* const ins, { CanardPortID port_id_mutable = port_id; - struct CanardRxSubscription* const sub = CONTAINER_OF( // + struct CanardRxSubscription* const sub = MUTABLE_CONTAINER_OF( // struct CanardRxSubscription, cavlSearch(&ins->rx_subscriptions[tk], &port_id_mutable, &rxSubscriptionPredicateOnPortID, NULL), base); @@ -1348,7 +1386,7 @@ int8_t canardRxGetSubscription(struct CanardInstance* const ins, { CanardPortID port_id_mutable = port_id; - struct CanardRxSubscription* const sub = CONTAINER_OF( // + struct CanardRxSubscription* const sub = MUTABLE_CONTAINER_OF( // struct CanardRxSubscription, cavlSearch(&ins->rx_subscriptions[tk], &port_id_mutable, &rxSubscriptionPredicateOnPortID, NULL), base); diff --git a/libcanard/canard.h b/libcanard/canard.h index d76bb7c..8ef472e 100644 --- a/libcanard/canard.h +++ b/libcanard/canard.h @@ -309,7 +309,7 @@ struct CanardMemoryResource /// Holds the statistics of a transmission queue. struct CanardTxQueueStats { - /// Holds number of dropped TX frames (due to timeout when `now > deadline`). + /// Holds number of dropped TX frames due to timeout (when `now > deadline`) or b/c of transmission failures. size_t dropped_frames; }; @@ -458,8 +458,9 @@ struct CanardInstance /// The time complexity models given in the API documentation are made on the assumption that the memory management /// functions have constant complexity O(1). /// - /// The following API functions may allocate memory: canardRxAccept(), canardTxPush(). - /// The following API functions may deallocate memory: canardRxAccept(), canardRxSubscribe(), canardRxUnsubscribe(). 
+ /// The following API functions may allocate memory: canardTxPush(), canardRxAccept(). + /// The following API functions may deallocate memory: canardTxPush(), canardTxFree(), canardRxAccept(), + /// canardRxSubscribe(), canardRxUnsubscribe(). /// The exact memory requirement and usage model is specified for each function in its documentation. struct CanardMemoryResource memory; diff --git a/tests/helpers.hpp b/tests/helpers.hpp index 322d90d..94d2c26 100644 --- a/tests/helpers.hpp +++ b/tests/helpers.hpp @@ -300,11 +300,17 @@ class TxQueue const CanardMicrosecond now_usec = 0ULL) { checkInvariants(); - const auto size_before = que_.size; - const auto ret = canardTxPush(&que_, ins, transmission_deadline_usec, &metadata, payload, now_usec); - const auto num_added = static_cast<std::size_t>(ret); - enforce((ret < 0) || ((size_before + num_added) == que_.size), "Unexpected size change after push"); + + const auto size_before = que_.size; + const auto dropped_before = que_.stats.dropped_frames; + + const auto ret = canardTxPush(&que_, ins, transmission_deadline_usec, &metadata, payload, now_usec); + const auto num_added = static_cast<std::size_t>(ret); + + enforce((ret < 0) || ((size_before + num_added + dropped_before - que_.stats.dropped_frames) == que_.size), + "Unexpected size change after push"); checkInvariants(); + return ret; } diff --git a/tests/test_public_tx.cpp b/tests/test_public_tx.cpp index c852a2d..c45ab03 100644 --- a/tests/test_public_tx.cpp +++ b/tests/test_public_tx.cpp @@ -814,3 +814,119 @@ TEST_CASE("TxPayloadOwnership") } } } + +TEST_CASE("TxFlushExpired") +{ + helpers::Instance ins; + helpers::TxQueue que{2, CANARD_MTU_CAN_FD}; // Limit capacity at 2 frames.
+ + auto& tx_alloc = que.getAllocator(); + auto& ins_alloc = ins.getAllocator(); + + std::array<std::uint8_t, 1024> payload{}; + std::iota(payload.begin(), payload.end(), 0U); + + REQUIRE(CANARD_NODE_ID_UNSET == ins.getNodeID()); + REQUIRE(CANARD_MTU_CAN_FD == que.getMTU()); + REQUIRE(0 == que.getSize()); + REQUIRE(0 == tx_alloc.getNumAllocatedFragments()); + REQUIRE(0 == ins_alloc.getNumAllocatedFragments()); + + CanardMicrosecond now = 10'000'000ULL; // 10s + const CanardMicrosecond deadline = 1'000'000ULL; // 1s + + CanardTransferMetadata meta{}; + + // 1. Push single-frame with padding, peek. @ 10s + { + meta.priority = CanardPriorityNominal; + meta.transfer_kind = CanardTransferKindMessage; + meta.port_id = 321; + meta.remote_node_id = CANARD_NODE_ID_UNSET; + meta.transfer_id = 21; + REQUIRE(1 == que.push(&ins.getInstance(), now + deadline, meta, {8, payload.data()}, now)); + REQUIRE(1 == que.getSize()); + REQUIRE(1 == tx_alloc.getNumAllocatedFragments()); + REQUIRE((8 + 4) == tx_alloc.getTotalAllocatedAmount()); + REQUIRE(1 == ins_alloc.getNumAllocatedFragments()); + REQUIRE(sizeof(CanardTxQueueItem) * 1 == ins_alloc.getTotalAllocatedAmount()); + + // Peek and check the payload. + CanardTxQueueItem* ti = que.peek(); + REQUIRE(nullptr != ti); // Make sure we get the same frame again. + REQUIRE(ti->frame.payload.size == 12); + REQUIRE(ti->frame.payload.allocated_size == 12); + REQUIRE(0 == std::memcmp(ti->frame.payload.data, payload.data(), 8)); + REQUIRE(ti->tx_deadline_usec == now + deadline); + REQUIRE(1 == tx_alloc.getNumAllocatedFragments()); + REQUIRE((8 + 4) == tx_alloc.getTotalAllocatedAmount()); + REQUIRE(1 == ins_alloc.getNumAllocatedFragments()); + REQUIRE(sizeof(CanardTxQueueItem) * 1 == ins_alloc.getTotalAllocatedAmount()); + + // Don't pop and free the item - we are going to flush it by the next push at 12s. + } + + now += 2 * deadline; // 10s -> 12s + + // 2. Push two-frames, peek.
@ 12s (after 2x deadline) + // These 2 frames should still fit into the queue (with capacity 2) despite one expired frame still there. + { + que.setMTU(8); + ins.setNodeID(42); + meta.transfer_id = 22; + REQUIRE(2 == que.push(&ins.getInstance(), now + deadline, meta, {8, payload.data()}, now)); + REQUIRE(2 == que.getSize()); + REQUIRE(2 == tx_alloc.getNumAllocatedFragments()); + REQUIRE((8 + 4) == tx_alloc.getTotalAllocatedAmount()); + REQUIRE(2 == ins_alloc.getNumAllocatedFragments()); + REQUIRE(sizeof(CanardTxQueueItem) * 2 == ins_alloc.getTotalAllocatedAmount()); + REQUIRE(1 == que.getInstance().stats.dropped_frames); + + // a) Peek and check the payload of the 1st frame + CanardTxQueueItem* ti = NULL; + { + ti = que.peek(); + REQUIRE(nullptr != ti); + REQUIRE(ti->frame.payload.size == 8); + REQUIRE(ti->frame.payload.allocated_size == 8); + REQUIRE(0 == std::memcmp(ti->frame.payload.data, payload.data(), 7)); + REQUIRE(ti->tx_deadline_usec == now + deadline); + REQUIRE(2 == tx_alloc.getNumAllocatedFragments()); + REQUIRE((8 + 4) == tx_alloc.getTotalAllocatedAmount()); + REQUIRE(2 == ins_alloc.getNumAllocatedFragments()); + REQUIRE(sizeof(CanardTxQueueItem) * 2 == ins_alloc.getTotalAllocatedAmount()); + + // Don't pop and free the item - we are going to flush it by the next push @ 14s. + } + // b) Check the payload of the 2nd frame + { + ti = ti->next_in_transfer; + REQUIRE(nullptr != ti); + REQUIRE(ti->frame.payload.size == 4); + REQUIRE(ti->frame.payload.allocated_size == 4); + REQUIRE(0 == std::memcmp(ti->frame.payload.data, payload.data() + 7, 1)); + REQUIRE(ti->tx_deadline_usec == now + deadline); + + // Don't pop and free the item - we are going to flush it by the next push @ 14s. + } + } + + now += 2 * deadline; // 12s -> 14s + + // 3. Push three-frames, peek. @ 14s (after another 2x deadline) + // These 3 frames should not fit into the queue (with capacity 2), + // but as a side effect, the expired frames (from push @ 12s) should be flushed as well.
+ { + meta.transfer_id = 23; + REQUIRE(-CANARD_ERROR_OUT_OF_MEMORY == + que.push(&ins.getInstance(), now + deadline, meta, {8 * 2, payload.data()}, now)); + REQUIRE(0 == que.getSize()); + REQUIRE(0 == tx_alloc.getNumAllocatedFragments()); + REQUIRE(0 == tx_alloc.getTotalAllocatedAmount()); + REQUIRE(0 == ins_alloc.getNumAllocatedFragments()); + REQUIRE(0 == ins_alloc.getTotalAllocatedAmount()); + REQUIRE(1 + 2 == que.getInstance().stats.dropped_frames); + + REQUIRE(nullptr == que.peek()); + } +}