Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

semaphore #16

Merged
merged 6 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- [broadcast_event](#feature_event)
- [mutex](#feature_mutex)
- [shared_mutex](#feature_mutex)
- [semaphores](#feature_semaphore)

**Utilities**:
- [join](#feature_join)
Expand Down Expand Up @@ -350,6 +351,31 @@ task<void> lock_shared_mutex(shared_mutex& mtx) {
}
```

### <a name="feature_semaphore"></a> Semaphores

Semaphores are also very similar to their standard library counterparts. There is a `counting_semaphore` variant, where you can specify the maximum value of the semaphore's counter, and there is a `binary_semaphore` variant that specifies the maximum value to one.

Usage of semaphores is similar to the standard library equivalent, but you need to use ˙co_await` to acquire the semaphore:

```c++
counting_semaphore sema(0, 16); // Counter may go up to 16, current value at 0.

// A coroutine
launch([](counting_semaphore& sema){
co_await sema;
std::cout << "This runs second." << std::endl;
}(sema));

// Another coroutine
launch([](counting_semaphore& sema){
std::cout << "This runs first." << std::endl;
sema.release();
}(sema));
```

Unlike the standard library semaphore, semaphores in `asyncpp` don't have an implementation defined upper limit for the counter so you can go up to `std::numeric_limits<ptrdiff_t>::max()`. In `asyncpp`, semaphores will also complain (via `std::terminate`) if you exceed the maximum value of the counter by releasing too many times.


### <a name="feature_join"></a> Join

To retrieve the result of a coroutine, we must `co_await` it, however, only a coroutine can `co_await` another one. Then how is it possible to wait for a coroutine's completion from a plain old function? For this purpose, `asnyncpp` provides `join`:
Expand Down
62 changes: 62 additions & 0 deletions include/asyncpp/semaphore.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#pragma once

#include "container/atomic_deque.hpp"
#include "promise.hpp"
#include "threading/spinlock.hpp"

#include <cassert>
#include <cstdint>


namespace asyncpp {


class counting_semaphore {
struct awaitable {
counting_semaphore* m_owner = nullptr;
resumable_promise* m_enclosing = nullptr;
awaitable* m_prev = nullptr;
awaitable* m_next = nullptr;

bool await_ready() const noexcept;

template <std::convertible_to<const resumable_promise&> Promise>
bool await_suspend(std::coroutine_handle<Promise> promise) noexcept {
assert(m_owner);
m_enclosing = &promise.promise();
return !m_owner->acquire(this);
}

constexpr void await_resume() const noexcept {}
};

public:
explicit counting_semaphore(ptrdiff_t current_counter = 0, ptrdiff_t max_counter = std::numeric_limits<ptrdiff_t>::max()) noexcept;

bool try_acquire() noexcept;
awaitable operator co_await() noexcept;
void release() noexcept;
ptrdiff_t max() const noexcept;

ptrdiff_t _debug_get_counter() const noexcept;
deque<awaitable, &awaitable::m_prev, &awaitable::m_next>& _debug_get_awaiters();
void _debug_clear();

private:
bool acquire(awaitable* waiting) noexcept;

private:
spinlock m_spinlock;
deque<awaitable, &awaitable::m_prev, &awaitable::m_next> m_awaiters;
ptrdiff_t m_counter = 0;
const ptrdiff_t m_max;
};


class binary_semaphore : public counting_semaphore {
public:
explicit binary_semaphore(bool is_free = false) : counting_semaphore(static_cast<ptrdiff_t>(is_free), 1) {}
};


} // namespace asyncpp
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ target_sources(asyncpp
shared_mutex.cpp
sleep.cpp
testing/interleaver.cpp
semaphore.cpp
)

target_link_libraries(asyncpp asyncpp-headers)
88 changes: 88 additions & 0 deletions src/semaphore.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include <asyncpp/semaphore.hpp>

#include <mutex>


namespace asyncpp {


bool counting_semaphore::awaitable::await_ready() const noexcept {
assert(m_owner);
return m_owner->try_acquire();
}


counting_semaphore::counting_semaphore(ptrdiff_t current_counter, ptrdiff_t max_counter) noexcept : m_counter(current_counter), m_max(max_counter) {
assert(0 <= current_counter && current_counter <= max_counter);
}


bool counting_semaphore::try_acquire() noexcept {
std::lock_guard lk(m_spinlock);
if (m_counter > 0) {
--m_counter;
return true;
}
return false;
}


counting_semaphore::awaitable counting_semaphore::operator co_await() noexcept {
return { this };
}


void counting_semaphore::release() noexcept {
std::unique_lock lk(m_spinlock);
++m_counter;
const auto resumed = m_awaiters.pop_front();
if (resumed) {
--m_counter;
lk.unlock();
assert(resumed->m_enclosing);
resumed->m_enclosing->resume();
}
else {
if (m_counter > m_max) {
std::terminate(); // You released the semaphore too many times.
}
}
}


ptrdiff_t counting_semaphore::max() const noexcept {
return m_max;
}


ptrdiff_t counting_semaphore::_debug_get_counter() const noexcept {
return m_counter;
}


deque<counting_semaphore::awaitable, &counting_semaphore::awaitable::m_prev, &counting_semaphore::awaitable::m_next>& counting_semaphore::_debug_get_awaiters() {
return m_awaiters;
}


void counting_semaphore::_debug_clear() {
m_awaiters.~deque();
new (&m_awaiters) decltype(m_awaiters);
m_counter = m_max;
}


bool counting_semaphore::acquire(awaitable* waiting) noexcept {
std::lock_guard lk(m_spinlock);
if (m_counter > 0) {
--m_counter;
return true;
}
else {
m_awaiters.push_back(waiting);
return false;
}
}


} // namespace asyncpp
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ target_sources(test
test_thread_pool.cpp
test_event.cpp
test_sleep.cpp
test_semaphore.cpp
testing/test_interleaver.cpp
helper_schedulers.hpp
monitor_task.hpp
Expand Down
24 changes: 12 additions & 12 deletions test/test_mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
using namespace asyncpp;


struct [[nodiscard]] scope_clear {
~scope_clear() {
struct [[nodiscard]] mtx_scope_clear {
~mtx_scope_clear() {
mtx._debug_clear();
}
mutex& mtx;
Expand All @@ -27,7 +27,7 @@ static monitor_task lock(unique_lock<mutex>& lk) {

TEST_CASE("Mutex: try lock", "[Mutex]") {
mutex mtx;
scope_clear guard(mtx);
mtx_scope_clear guard(mtx);

REQUIRE(mtx.try_lock());
REQUIRE(mtx._debug_is_locked());
Expand All @@ -36,7 +36,7 @@ TEST_CASE("Mutex: try lock", "[Mutex]") {

TEST_CASE("Mutex: lock direct immediate", "[Mutex]") {
mutex mtx;
scope_clear guard(mtx);
mtx_scope_clear guard(mtx);

auto monitor = lock_exclusively(mtx);
REQUIRE(monitor.get_counters().done);
Expand All @@ -46,7 +46,7 @@ TEST_CASE("Mutex: lock direct immediate", "[Mutex]") {

TEST_CASE("Mutex: lock spurious immediate", "[Mutex]") {
mutex mtx;
scope_clear guard(mtx);
mtx_scope_clear guard(mtx);

auto monitor = []() -> monitor_task { co_return; }();
auto awaiter = mtx.exclusive();
Expand All @@ -57,7 +57,7 @@ TEST_CASE("Mutex: lock spurious immediate", "[Mutex]") {

TEST_CASE("Mutex: sequencial locking attempts", "[Mutex]") {
mutex mtx;
scope_clear guard(mtx);
mtx_scope_clear guard(mtx);

REQUIRE(mtx.try_lock());
REQUIRE(!mtx.try_lock());
Expand All @@ -66,7 +66,7 @@ TEST_CASE("Mutex: sequencial locking attempts", "[Mutex]") {

TEST_CASE("Mutex: unlock", "[Mutex]") {
mutex mtx;
scope_clear guard(mtx);
mtx_scope_clear guard(mtx);

SECTION("exclusive -> free") {
mtx.try_lock();
Expand All @@ -92,7 +92,7 @@ TEST_CASE("Mutex: unlock", "[Mutex]") {

TEST_CASE("Mutex: unique lock try_lock", "[Mutex]") {
mutex mtx;
scope_clear guard(mtx);
mtx_scope_clear guard(mtx);

unique_lock lk(mtx, std::defer_lock);
REQUIRE(!lk.owns_lock());
Expand All @@ -105,7 +105,7 @@ TEST_CASE("Mutex: unique lock try_lock", "[Mutex]") {

TEST_CASE("Mutex: unique lock await", "[Mutex]") {
mutex mtx;
scope_clear guard(mtx);
mtx_scope_clear guard(mtx);

unique_lock lk(mtx, std::defer_lock);
auto monitor = lock(lk);
Expand All @@ -117,7 +117,7 @@ TEST_CASE("Mutex: unique lock await", "[Mutex]") {

TEST_CASE("Mutex: unique lock start locked", "[Mutex]") {
mutex mtx;
scope_clear guard(mtx);
mtx_scope_clear guard(mtx);

auto monitor = [](mutex& mtx) -> monitor_task {
unique_lock lk(co_await mtx.exclusive());
Expand All @@ -131,7 +131,7 @@ TEST_CASE("Mutex: unique lock start locked", "[Mutex]") {

TEST_CASE("Mutex: unique lock unlock", "[Mutex]") {
mutex mtx;
scope_clear guard(mtx);
mtx_scope_clear guard(mtx);

unique_lock lk(mtx, std::defer_lock);
lk.try_lock();
Expand All @@ -143,7 +143,7 @@ TEST_CASE("Mutex: unique lock unlock", "[Mutex]") {

TEST_CASE("Mutex: unique lock destructor", "[Mutex]") {
mutex mtx;
scope_clear guard(mtx);
mtx_scope_clear guard(mtx);

{
unique_lock lk(mtx, std::defer_lock);
Expand Down
Loading
Loading