Seastar's semaphore utility
When first learning to work with the Seastar programming model it can be
surprising to discover that, despite its single-threaded execution model, there
are plenty of cases where controlling concurrency is necessary. In this post
we'll examine a few of these cases and look at the core primitive
seastar::semaphore and how it is used.
Consider the following code snippet, which starts two background fibers, each
referencing a shared index object. The (void)... syntax means: start this fiber
but ignore the returned future. When a fiber is started this way we say it is
running in the background.
static index_t index;

void start_background() {
    (void)do_some_work()
      .then([](auto curr) {
          index[curr] += 1;
          return other_work();
      })
      .then([](auto last) {
          if (last) {
              index[last] += 2;
          }
          // ...
      });
}

// start two instances
start_background();
start_background();
Notice that the fiber (of which there are two concurrent instances) has a reference to a shared index object which each instance mutates. Are there any race conditions here? Since all continuations are executed to completion by a single thread the answer is: well, it depends on what sort of race you care about. There is no data race here that could corrupt the shared index object, since each series of mutating statements will never be preempted.
However, that does not mean that we do not sometimes need to control concurrency. For example, notice above that the last two continuations both mutate the state. This represents a common scenario in which a high-level operation (the fiber) requires multiple fine-grained changes spread out over more than one continuation. But because the two fiber instances are independent, Seastar is free to interleave execution of the continuations, possibly leading to inconsistent behavior.
Here is a modified version of the same code where the entire fiber is run within a semaphore configured as a mutual exclusion device. This prevents any interleaving of instances of this fiber. Of course, control can be exerted at a finer granularity, but how to do that is entirely application specific, so we'll keep it simple here.
seastar::semaphore sem{1};

void start_background() {
    (void)seastar::with_semaphore(sem, 1, [] {
        return do_some_work()
          .then([](auto curr) {
              index[curr] += 1;
              return other_work();
          })
          .then([](auto last) {
              if (last) {
                  index[last] += 2;
              }
              // ...
          });
    });
}
Using a seastar::semaphore as a mutex like we did above is a very common case.
However, semaphores can also be used to control resource usage and apply
backpressure. Take a look at the following snippet that accepts connections in
a loop and handles each connection in a background fiber.
(void)seastar::repeat([this, socket] {
    return socket.accept().then([this](auto conn) {
        (void)handle_connection(conn);
        return seastar::stop_iteration::no;
    });
});
This pattern is common in servers. And often the number of connections will be low enough that resource usage will not be a concern. But what if a large number of clients (or a faulty client) all connected? Nothing is preventing this loop from accepting so many connections that all of the system resources are exhausted. Semaphores are useful in scenarios like this for applying backpressure when resource usage exceeds a limit.
Here is an updated version of the example which uses a semaphore to restrict
the number of concurrent connections being handled. The helper
seastar::get_units attempts to acquire a single unit from the semaphore. If it
cannot, the loop stops until a unit of the semaphore becomes available.
Otherwise, the acquired units are passed into a finally block attached to the
connection handler. As we will see later, the units behave like an RAII object:
when the connection has been handled and the finally block destroys the units
object, the acquired units are automatically returned to the semaphore,
allowing a new connection to be accepted.
seastar::semaphore conn_limit{100};

(void)seastar::repeat([this, socket] {
    return seastar::get_units(conn_limit, 1).then([this, socket](auto units) {
        return socket.accept().then([this, u = std::move(units)](auto conn) mutable {
            (void)handle_connection(conn).finally([u = std::move(u)] {});
            return seastar::stop_iteration::no;
        });
    });
});
That's it for the use cases and motivation. If you're writing software with Seastar, the semaphore is a utility that will be hard to avoid using. Personally, I was surprised that concurrency control would be such a big part of designing software with Seastar. But after repeatedly encountering real-world situations like those shown above, I've come to appreciate how elegant semaphore-based solutions can be.
Semaphore internals #
The base type seastar::basic_semaphore contains most of the semaphore
implementation and is, in my opinion, small and quite elegant. It contains only
three data members: a counter, the current exception if the semaphore is broken
(more on that later), and a wait list.
ssize_t _count;
std::exception_ptr _ex;
struct entry {
promise<> pr;
size_t nr;
};
expiring_fifo<entry, expiry_handler, clock> _wait_list;
Generally when using a Seastar semaphore the seastar::semaphore type alias is
used, which sets a default exception factory.
using semaphore = basic_semaphore<semaphore_default_exception_factory>;
When constructing a new semaphore only the initial count needs to be specified. This is how many units the semaphore is managing. With a value of one the semaphore will provide mutual exclusion, but you could count megabytes of memory, or initialize it to zero for other special use cases.
seastar::semaphore sem{1};
basic_semaphore::basic_semaphore(size_t count) : _count(count) {}
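To make the "count megabytes of memory" idea concrete, here is a minimal plain-C++ sketch of a byte budget. It is a bare counter analogue of the semaphore, not Seastar code, and the names are made up for illustration:

```cpp
#include <cassert>

// Byte budget modeled as a semaphore-style counter: initialize to the budget,
// subtract on reservation, add back on release.
struct byte_budget {
    long free_bytes;

    bool try_reserve(long bytes) {
        if (free_bytes >= bytes) {
            free_bytes -= bytes;
            return true;
        }
        return false;  // over budget: a real caller would wait here
    }

    void release(long bytes) { free_bytes += bytes; }
};
```

A semaphore initialized to the budget size behaves exactly like this counter on its fast path; the wait list (covered next) is what turns "over budget" into waiting instead of failing.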
Callers consume semaphore resources using the wait(...) interface. First let's
take a look at the fast path. If there are enough available resources to satisfy
the request when wait is invoked, the internal counter is decremented and a
ready future is returned immediately. We'll take a look at may_proceed shortly,
but it is roughly equivalent to checking that _count >= nr holds.
future<> wait(time_point timeout, size_t nr = 1) {
if (may_proceed(nr)) {
_count -= nr;
return make_ready_future<>();
}
...
When there are not enough resources available, there is more work to be done. The behavior of waiting on a semaphore is that the caller is returned a future that is set at a later time, when the resources have successfully been acquired, so the future and associated promise must be created and tracked.
The first thing that is done is to check whether the semaphore is broken. A
broken semaphore is one in which a problem has occurred and it is no longer
safe to acquire the semaphore. This is often used in shutdown scenarios where
waiters are signaled to react to the exceptional case and not proceed with
whatever required the semaphore. In the case of wait() it acts as a barrier,
preventing new callers from becoming waiters.
But when the semaphore is not broken, the caller becomes a waiter. A promise is created and stored on a wait list, and a corresponding future is returned to the caller.
if (_ex) {
return make_exception_future(_ex);
}
promise<> pr;
auto fut = pr.get_future();
_wait_list.push_back(entry(std::move(pr), nr), timeout);
return fut;
}
On the other side of the semaphore we find the signal(...) interface for
releasing resources back to the semaphore. If the semaphore is broken the call
returns immediately. Since the broken state is a signal to waiters, there is no
need to raise an exception here: the caller is simply releasing its units.
void signal(size_t nr = 1) {
if (_ex) {
return;
}
Otherwise the internal counter is incremented, and the wait list is processed. Waiters are woken up in FIFO order as long as there are available resources remaining.
_count += nr;
while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
auto& x = _wait_list.front();
_count -= x.nr;
x.pr.set_value();
_wait_list.pop_front();
}
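To see the two paths work together, here is a toy, single-threaded version in plain C++, with callbacks standing in for Seastar futures. The names mirror the Seastar code above, but this is purely illustrative and none of it is Seastar's API:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <vector>

// Toy single-threaded model of the wait()/signal() logic shown above.
class toy_semaphore {
    long _count;
    struct entry {
        std::function<void()> cb;  // stands in for the waiter's promise
        size_t nr;
    };
    std::deque<entry> _wait_list;

    bool has_available_units(size_t nr) const {
        return _count >= static_cast<long>(nr);
    }
    bool may_proceed(size_t nr) const {
        return has_available_units(nr) && _wait_list.empty();
    }

public:
    explicit toy_semaphore(long count) : _count(count) {}

    // Fast path: take the units and run immediately; otherwise queue in FIFO order.
    void wait(size_t nr, std::function<void()> cb) {
        if (may_proceed(nr)) {
            _count -= static_cast<long>(nr);
            cb();
        } else {
            _wait_list.push_back({std::move(cb), nr});
        }
    }

    // Return units, then wake waiters front-to-back while their requests fit.
    void signal(size_t nr = 1) {
        _count += static_cast<long>(nr);
        while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
            auto e = std::move(_wait_list.front());
            _wait_list.pop_front();
            _count -= static_cast<long>(e.nr);
            e.cb();
        }
    }
};
```

With an initial count of one, three waiters run strictly one at a time, in arrival order, as each signal wakes exactly the next queued entry.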
Since waiters are woken up in FIFO order, a semaphore can also be used as a
queue. And that's pretty much all there is to the semaphore in Seastar. Now
let's take a look back at the details of the may_proceed function which we
glossed over earlier. Here it is again, used in wait() to guard the fast path,
effectively equivalent to checking that _count >= nr holds.
future<> wait(time_point timeout, size_t nr = 1) {
if (may_proceed(nr)) {
_count -= nr;
return make_ready_future<>();
}
...
Below is the implementation of may_proceed, which returns true only if both of
the following conditions are met: enough resource units are available and
there are no waiters. For a mutex these two conditions are effectively
equivalent, but if the semaphore is protecting a resource like memory usage,
then the set of scenarios is more complicated. For example, a caller may
request 1 unit while 2 units are available, yet prior fibers had requested more
units than were available and are currently waiting. In this case the caller
goes onto the wait list in FIFO order rather than jumping ahead of the existing
waiters, which would otherwise risk starving them.
bool may_proceed(size_t nr) const noexcept {
return has_available_units(nr) && _wait_list.empty();
}
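The scenario above (2 units free, a prior waiter wants 3, a new caller wants only 1) can be traced with a tiny plain-C++ model of just the admission rule. This is illustrative, not Seastar code:

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Model of the admission rule only: a request proceeds immediately when
// enough units are free AND nobody is already waiting; otherwise it queues.
struct fairness_model {
    long count;
    std::deque<size_t> wait_list;  // requested unit counts, FIFO

    bool may_proceed(size_t nr) const {
        return count >= static_cast<long>(nr) && wait_list.empty();
    }

    // Returns true if the request was admitted immediately, false if queued.
    bool wait(size_t nr) {
        if (may_proceed(nr)) {
            count -= static_cast<long>(nr);
            return true;
        }
        wait_list.push_back(nr);
        return false;
    }
};
```

The second condition is what enforces fairness: without the `wait_list.empty()` check, small requests would keep slipping past a large queued request and could starve it indefinitely.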
Less common, but very useful, is try_wait, which doesn't return a future but
rather returns true if the units were acquired and false otherwise.
bool try_wait(size_t nr = 1) {
if (may_proceed(nr)) {
_count -= nr;
return true;
} else {
return false;
}
}
try_wait is useful when you have other productive work to do if you cannot
obtain the semaphore immediately. It can also be useful when acquiring multiple
semaphores: if two fibers acquire two semaphores in opposite orders the result
can be a deadlock. To avoid the deadlock, use try_wait to acquire the second
semaphore (of course there are other techniques for dealing with this
situation as well).
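Here is a plain-C++ sketch of that back-off pattern. The counters are toys, not seastar::semaphore, and acquire_both is a hypothetical helper invented for this example:

```cpp
#include <cassert>

// Toy counter with the try_wait/signal shape described above.
struct toy_sem {
    long count;
    bool try_wait(long nr = 1) {
        if (count >= nr) {
            count -= nr;
            return true;
        }
        return false;
    }
    void signal(long nr = 1) { count += nr; }
};

// Take the first semaphore, then *try* the second. If the second is busy,
// release the first instead of holding it, so a fiber that acquired them in
// the opposite order can still make progress (no deadlock).
bool acquire_both(toy_sem& first, toy_sem& second) {
    if (!first.try_wait()) {
        return false;
    }
    if (!second.try_wait()) {
        first.signal();  // back off: never hold one while blocking on the other
        return false;
    }
    return true;
}
```

The caller can retry later (or do other work first); the key property is that no fiber ever blocks while holding only one of the two semaphores.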
Semaphore timeout #
You may have noticed that semaphore::wait accepts a timeout parameter. This
parameter specifies the maximum amount of time a fiber should wait when
acquiring the semaphore. When a semaphore waiter exceeds its requested timeout
it is notified by receiving an exceptional future from wait.
In the following example the first fiber holds the semaphore for 100 seconds, while the second fiber tries to acquire the semaphore with a 5 second timeout.
semaphore sem{1};

// fiber one: hold the semaphore while sleeping for 100 seconds
(void)seastar::with_semaphore(sem, 1, [] { return seastar::sleep(100s); });

// fiber two: wait up to 5 seconds for a unit
(void)sem.wait(semaphore::clock::now() + 5s, 1);
The second fiber will receive an exceptional future containing a
seastar::semaphore_timed_out exception and may then proceed to handle the
timeout accordingly.
Broken semaphore #
Semaphores may be explicitly transitioned into a broken state by invoking the
broken method and passing in an exception.
template <typename Exception>
void broken(const Exception& ex) {
    _ex = std::make_exception_ptr(ex);
    _count = 0;
    while (!_wait_list.empty()) {
        auto& x = _wait_list.front();
        x.pr.set_exception(_ex);
        _wait_list.pop_front();
    }
}
When a semaphore is marked broken, all existing (and future) waiter futures resolve immediately as an exceptional future containing the exception provided when the semaphore was broken. This is useful in shutdown scenarios, as well as when a mutex can no longer guarantee consistency of its protected state for some reason. The exception can also communicate basic status information to waiters, such as a network error or the start of a shutdown sequence, depending on why the semaphore was broken.
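The broken-semaphore behavior can be sketched with a toy, single-threaded plain-C++ model where callbacks stand in for futures and an error string stands in for the exception. None of these names are Seastar's:

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Toy model of broken(): waiters receive an empty string on success or the
// break reason on failure; later wait() calls fail immediately.
class toy_semaphore {
    long _count;
    std::string _error;  // non-empty means broken
    std::deque<std::function<void(const std::string&)>> _wait_list;

public:
    explicit toy_semaphore(long count) : _count(count) {}

    void wait(std::function<void(const std::string&)> cb) {
        if (!_error.empty()) {
            cb(_error);  // barrier: new callers fail immediately
            return;
        }
        if (_count > 0) {
            --_count;
            cb("");
            return;
        }
        _wait_list.push_back(std::move(cb));
    }

    void broken(std::string err) {
        _error = std::move(err);
        _count = 0;
        while (!_wait_list.empty()) {  // fail every existing waiter
            auto cb = std::move(_wait_list.front());
            _wait_list.pop_front();
            cb(_error);
        }
    }
};
```

Both halves of the behavior show up here: breaking the semaphore drains the wait list with the error, and the stored error then short-circuits every subsequent wait.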
Semaphore RAII #
The basic semaphore interface that we've seen so far (e.g. wait and signal)
can be challenging to use safely once we start considering complex return
paths and exception handling. For example, if one return path forgets to signal
a binary semaphore then other fibers can no longer make progress. If that path
happens to be rarely executed, it can lead to difficult-to-debug problems.
To help with this there are several higher-level semaphore interfaces that
automatically signal a semaphore on all return paths. One such example is the
with_semaphore function.
return with_semaphore(_sem, 1, [] { throw std::runtime_error("oops"); });
The with_semaphore function automatically waits on the requested resources and
then releases them when the continuation is complete, handling all cases
correctly (e.g. an exception thrown in a return path). Here is the
implementation:
template<typename Func>
futurize_t<std::result_of_t<Func()>>
with_semaphore(semaphore& sem, size_t units, Func&& func) {
return get_units(sem, units).then([func = std::forward<Func>(func)] (auto units) mutable {
return futurize_apply(std::forward<Func>(func)).finally([units = std::move(units)] {});
});
}
The first thing that happens is that get_units waits on the semaphore and
returns an object that signals the semaphore in its destructor. We'll talk
about get_units more below, but it is effectively an RAII device for the
semaphore.
Next, the continuation that is passed in is wrapped using futurize_apply,
which safely converts any thrown exception into an exceptional future. To
complete the implementation, the finally continuation holds ownership of the
RAII units object until the entire fiber completes, at which point the
destructor of the units object signals the underlying semaphore.
For the most part I've found that with_semaphore is sufficient for a large
number of use cases and I rarely need to interact with the semaphore directly.
But sometimes more control is needed, which leads us to the flexible get_units
interface for creating an RAII units object.
Here is the gist of semaphore_units. First, it holds a pointer to a semaphore
and an integer value (the number of units).
class semaphore_units {
semaphore *_sem;
size_t _n;
In the destructor the underlying semaphore is signaled with the stored value.
~semaphore_units() noexcept {
if (_n) {
_sem->signal(_n);
}
}
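The destructor-driven release can be sketched in plain C++ against a bare counter. This is an illustrative analogue, not Seastar's semaphore_units:

```cpp
#include <cassert>
#include <cstddef>
#include <utility>

// Bare counter standing in for the semaphore.
struct toy_sem {
    long count;
    void signal(long nr) { count += nr; }
};

// RAII holder: returns whatever units it still owns when destroyed.
// The move constructor zeroes the source so units are released exactly once.
class toy_units {
    toy_sem* _sem;
    size_t _n;

public:
    toy_units(toy_sem& sem, size_t n) : _sem(&sem), _n(n) {}
    toy_units(toy_units&& other) noexcept
        : _sem(other._sem), _n(std::exchange(other._n, 0)) {}
    toy_units(const toy_units&) = delete;
    ~toy_units() {
        if (_n) {
            _sem->signal(static_cast<long>(_n));
        }
    }
    size_t held() const { return _n; }
};
```

The `if (_n)` guard in the destructor is what makes move semantics safe: a moved-from holder owns zero units, so only the surviving holder signals the semaphore.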
The get_units convenience function acts as a factory, hiding details like
waiting on the semaphore and constructing the units instance once the
semaphore has been acquired.
template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
future<semaphore_units<ExceptionFactory, Clock>>
get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
return sem.wait(units).then([&sem, units] {
return semaphore_units<ExceptionFactory, Clock>{ sem, units };
});
}
Once you acquire a units instance then semaphore resources can be moved around arbitrarily in your application. There is even an interesting interface for splitting units, though I have yet to find a need for this. I can see it being useful when managing resources like memory reservations.
semaphore_units split(size_t units) {
if (units > _n) {
throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
}
_n -= units;
return semaphore_units(_sem, units);
}
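The bookkeeping behind split() can be illustrated with a bare unit holder in plain C++; the signaling machinery is omitted and the names are illustrative, not Seastar's:

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

// Minimal holder showing only the split() arithmetic from above.
struct toy_units {
    size_t n;  // units currently held

    toy_units split(size_t units) {
        if (units > n) {
            throw std::invalid_argument(
                "Cannot take more units than those protected by the semaphore");
        }
        n -= units;               // this holder keeps the remainder
        return toy_units{units};  // the new holder owns the split-off units
    }
};
```

The total held across the two holders never changes, so when both are eventually destroyed the semaphore gets back exactly what was originally acquired.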
That covers most of the semaphore and many of its common uses. There are some
other interfaces worth exploring, like consume, which forcefully takes
resources away from a semaphore, as well as other variants of the interfaces
we've introduced.