Seastar's semaphore utility

17 May 2021

When first learning to work with the Seastar programming model it can be surprising to discover that, despite the single-threaded execution model, there are plenty of cases where controlling concurrency is necessary. In this post we’ll examine a few of these cases and look at the core primitive seastar::semaphore and how it is used.

Consider the following code snippet which starts two background fibers, each of which references a shared index object. The (void)... syntax means: start this fiber but ignore the returned future. A fiber started this way is said to be running in the background.

static index_t index;

void start_background() {
    (void)do_some_work()
      .then([](auto curr) {
          index[curr] += 1;
          return other_work();
      })
      .then([](auto last) {
          if (last) {
              index[last] += 2;
          }
          // ...
      });
}

// start two instances
start_background();
start_background();

Notice that the fiber (of which there are two concurrent instances) has a reference to a shared index object which each fiber instance mutates. Are there any race conditions here? Since all continuations are executed to completion by a single thread, the answer is: well, it depends on what sort of race you care about. There is no data race here that could corrupt the shared index object, since each series of mutating statements will never be preempted.

However, that does not mean that we do not sometimes need to control concurrency. For example, notice above that the last two continuations both mutate the state. This represents a common scenario in which a high-level operation (the fiber) requires multiple fine-grained changes spread out over more than one continuation. But because the two fiber instances are independent, Seastar is free to interleave execution of the continuations, possibly leading to inconsistent behavior.

Here is a modified version of the same code where the entire fiber is run within a semaphore configured as a mutual exclusion device. This prevents any interleaving of instances of this fiber. Of course, control can be exerted at a finer granularity, but how that would be done would be entirely application specific, so we’ll keep it simple here.

seastar::semaphore sem{1};

void start_background() {
    (void)seastar::with_semaphore(sem, 1, [] {
        return do_some_work()
          .then([](auto curr) {
              index[curr] += 1;
              return other_work();
          })
          .then([](auto last) {
              if (last) {
                  index[last] += 2;
              }
              // ...
          });
    });
}

Using a seastar::semaphore as a mutex like we did above is a very common case. However, semaphores can also be used to control resource usage and apply backpressure. Take a look at the following snippet that accepts connections in a loop, and handles each connection in a background fiber.

    seastar::repeat([this, socket] {
        return socket.accept().then([this](auto conn) {
            (void)handle_connection(conn);
            return seastar::stop_iteration::no;
        });
    });

This pattern is common in servers, and often the number of connections will be low enough that resource usage is not a concern. But what if a large number of clients (or a single faulty client) opened many connections at once? Nothing is preventing this loop from accepting so many connections that all of the system resources are exhausted. Semaphores are useful in scenarios like this for applying backpressure when resource usage exceeds a limit.

Here is an updated version of the example which uses a semaphore to restrict the number of concurrent connections being handled. The helper seastar::get_units attempts to acquire a single unit from the semaphore. If it cannot, the loop waits until a unit of the semaphore becomes available. Otherwise, the acquired units are passed into the finally block on the connection handler. As we will see later, the units act like an RAII object: when the connection has been handled and the finally block destroys the units object, the acquired units are automatically returned to the semaphore, allowing a new connection to be accepted.

    seastar::semaphore conn_limit{100};

    seastar::repeat([this, socket] {
        return seastar::get_units(conn_limit, 1).then([this, socket](auto units) {
            return socket.accept().then([this, u = std::move(units)](auto conn) {
                (void)handle_connection(conn).finally([u = std::move(u)] {});
                return seastar::stop_iteration::no;
            });
        });
    });

That’s it for the use cases and motivation. If you’re writing software with Seastar the semaphore is a utility that will be hard to avoid using. Personally I found it surprising that concurrency control would be such a big part of designing software with Seastar. But after observing real-world situations like those above over and over, I’ve realized how elegant the solutions can be when using a semaphore.

Semaphore internals

The base type seastar::basic_semaphore contains most of the semaphore implementation, and is, in my opinion, small and quite elegant. It contains only three data members: a counter, the current exception if the semaphore is broken (more on that later), and a wait list.

ssize_t _count;
std::exception_ptr _ex;

struct entry {
    promise<> pr;
    size_t nr;
};

expiring_fifo<entry, expiry_handler, clock> _wait_list;

Generally when using a Seastar semaphore the seastar::semaphore type alias is used which sets a default exception factory.

using semaphore = basic_semaphore<semaphore_default_exception_factory>;

When constructing a new semaphore only the initial count needs to be specified. This is how many units the semaphore is managing. With a value of one the semaphore will provide mutual exclusion, but you could count megabytes of memory, or initialize it to zero for other special use cases.

seastar::semaphore sem{1};

basic_semaphore::basic_semaphore(size_t count) : _count(count) {}

Callers consume semaphore resources using the wait(...) interface. First let’s take a look at the fast path. If there are enough available resources to satisfy the request when wait is invoked, the internal counter is decremented, and a ready future is returned immediately. We’ll take a look at may_proceed shortly, but it is roughly equivalent to checking if _count >= nr is true.

future<> wait(time_point timeout, size_t nr = 1) {
    if (may_proceed(nr)) {
        _count -= nr;
        return make_ready_future<>();
    }
    ...

When there are not enough resources available, there is more work to be done. The behavior of waiting on a semaphore is that the caller is returned a future that is set at a later time, when the resources have successfully been acquired, so the future and associated promise must be created and tracked.

The first thing that is done is to check if the semaphore is broken. A broken semaphore is one in which a problem has occurred and it is no longer safe to acquire the semaphore. This is often used in shutdown scenarios where waiters are signaled to react to the exceptional case and not proceed with whatever required the semaphore. In the case of wait() it acts as a barrier, preventing new callers from becoming waiters.

But when the semaphore is not broken, the caller becomes a waiter. A promise is created and stored on a wait list, and a corresponding future is returned to the caller.

    if (_ex) {
        return make_exception_future(_ex);
    }
    promise<> pr;
    auto fut = pr.get_future();
    _wait_list.push_back(entry(std::move(pr), nr), timeout);
    return fut;
}

On the other side of the semaphore we find the signal(...) interface for releasing resources back to the semaphore. If the semaphore is broken the call returns immediately. The broken state is a signal to waiters, so there is no need to raise an exception here: the caller is simply releasing its units.

void signal(size_t nr = 1) {
    if (_ex) {
        return;
    }

Otherwise the internal counter is incremented, and the wait list is processed. Waiters are woken up in FIFO order as long as there are available resources remaining.

    _count += nr;
    while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
        auto& x = _wait_list.front();
        _count -= x.nr;
        x.pr.set_value();
        _wait_list.pop_front();
    }

Since waiters are woken up in FIFO order a semaphore can also be used as a queue. And that’s pretty much all there is to the semaphore in Seastar. Now let’s take a look back at the details of the may_proceed function which we glossed over earlier. Here it is again, used in wait() to guard the fast path, effectively equivalent to checking if _count >= nr is true.

future<> wait(time_point timeout, size_t nr = 1) {
    if (may_proceed(nr)) {
        _count -= nr;
        return make_ready_future<>();
    }
    ...

Below is the implementation of may_proceed() which returns true only if both of the following conditions are met: enough resource units are available and there are no waiters. For a mutex these two conditions are effectively equivalent, but if the semaphore is protecting a resource like memory usage, then the set of scenarios is more complicated. For example a caller may request 1 unit, and even though 2 units are available, prior fibers had requested more units than were available and are currently waiting. In this case the caller goes onto the wait list in FIFO order rather than jumping ahead of the existing waiters; letting small requests barge ahead could otherwise starve waiters with larger requests indefinitely.

bool may_proceed(size_t nr) const noexcept {
    return has_available_units(nr) && _wait_list.empty();
}

Less common, but very useful, is try_wait which doesn’t return a future, but rather returns true if the units were acquired, and false otherwise.

bool try_wait(size_t nr = 1) {
    if (may_proceed(nr)) {
        _count -= nr;
        return true;
    } else {
        return false;
    }
}

Try wait is useful when you have other productive work to do if you cannot obtain the semaphore immediately. It can also be useful when acquiring multiple semaphores: if two fibers acquire two semaphores in opposite orders the result is a deadlock. To avoid the deadlock, use try_wait to acquire the second semaphore and back off (releasing the first) on failure. Of course there are other techniques for dealing with this situation as well.

Semaphore timeout

You may have noticed that semaphore::wait accepts a timeout parameter. This parameter can be used to specify a maximum amount of time a fiber should wait when acquiring a semaphore. When a semaphore waiter exceeds its requested timeout it is notified by receiving an exceptional future from wait.

In the following example the first fiber holds the semaphore for 100 seconds, while the second fiber tries to acquire the semaphore with a 5 second timeout.

seastar::semaphore sem{1};

// fiber one: holds the semaphore for 100 seconds
(void)seastar::with_semaphore(sem, 1, [] { return seastar::sleep(100s); });

// fiber two: waits with a 5 second timeout
(void)sem.wait(5s, 1);

The second fiber will receive an exceptional future containing a seastar::semaphore_timed_out exception and may then proceed to handle the timeout accordingly.

Broken semaphore

Semaphores may be explicitly transitioned into a broken state by invoking the broken method and passing in an exception.

template <typename Exception>
void broken(const Exception& ex) {
    _ex = std::make_exception_ptr(ex);
    _count = 0;
    while (!_wait_list.empty()) {
        auto& x = _wait_list.front();
        x.pr.set_exception(_ex);
        _wait_list.pop_front();
    }
}

When a semaphore is marked broken, all existing (and future) waiter futures resolve immediately into an exceptional future containing the exception provided when the semaphore was broken. This is useful in shutdown scenarios, as well as when a mutex can no longer guarantee consistency of its protected state for some reason. The exception can also communicate to waiters why the semaphore was broken, such as a network error or the start of a shutdown sequence.

Semaphore RAII

The basic semaphore interface that we’ve seen so far (e.g. wait and signal) can be challenging to use safely once we start considering complex return paths and exception handling. For example, if one return path forgets to signal a binary semaphore then other fibers can no longer make progress. If that path happens to be rarely executed, it can lead to difficult-to-debug problems.

To help with this there are several higher level semaphore interfaces that automatically signal a semaphore in all return paths. One such example is the with_semaphore function.

return with_semaphore(_sem, 1, [] { throw std::runtime_error("oops"); });

The with_semaphore function automatically waits on the requested resources, and then releases them when the continuation is complete, handling all cases correctly (e.g. throwing an exception in a return path). Here is the implementation:

template<typename Func>
futurize_t<std::result_of_t<Func()>>
with_semaphore(semaphore& sem, size_t units, Func&& func) {
    return get_units(sem, units).then([func = std::forward<Func>(func)] (auto units) mutable {
        return futurize_apply(std::forward<Func>(func)).finally([units = std::move(units)] {});
    });
}

The first thing that happens is that get_units waits on the semaphore and returns an object that signals the semaphore in its destructor. We’ll talk about get_units more below, but it is effectively an RAII device for the semaphore.

Next, the continuation that is passed in is wrapped using futurize_apply which safely converts any thrown exception into an exceptional future. And to complete the implementation, the finally continuation holds ownership of the RAII units object until the entire fiber completes, at which point the destructor of the RAII units object signals the underlying semaphore.

For the most part I’ve found that with_semaphore is sufficient for a large number of use cases and I rarely need to interact with the semaphore directly. But sometimes more control is needed, which leads us to the flexible get_units interface for creating an RAII units object.

Here is the gist of semaphore_units. First, it holds a pointer to a semaphore and an integer value (the number of units).

class semaphore_units {
    semaphore *_sem;
    size_t _n;

In the destructor the underlying semaphore is signaled with the stored value.

    ~semaphore_units() noexcept {
        if (_n) {
            _sem->signal(_n);
        }
    }

The get_units convenience function acts as a factory, hiding details like waiting on the semaphore, and constructing the units instance once the semaphore is acquired.

template<typename ExceptionFactory, typename Clock = typename timer<>::clock>
future<semaphore_units<ExceptionFactory, Clock>>
get_units(basic_semaphore<ExceptionFactory, Clock>& sem, size_t units) noexcept {
    return sem.wait(units).then([&sem, units] {
        return semaphore_units<ExceptionFactory, Clock>{ sem, units };
    });
}

Once you acquire a units instance, semaphore resources can be moved around arbitrarily in your application. There is even an interesting interface for splitting units, though I have yet to find a need for it. I can see it being useful when managing resources like memory reservations.

    semaphore_units split(size_t units) {
        if (units > _n) {
            throw std::invalid_argument("Cannot take more units than those protected by the semaphore");
        }
        _n -= units;
        return semaphore_units(_sem, units);
    }

That covers most of the semaphore, and many of its common uses. There are some other interfaces worth exploring, like consume which forcefully takes resources away from a semaphore, as well as other variants of the interfaces we’ve introduced.