Performance of the zlog sequencer service
In a previous post we discussed the design of zlog, our implementation of the CORFU distributed shared-log protocol on top of Ceph. A key component of the system is the sequencer server that reports the current log tail to clients. In this post we’ll discuss the implementation and performance of the sequencer in zlog.
The fast path of the sequencer server is simple. It contains an in-memory counter that is incremented when a client requests the next position in the log. Our current implementation receives requests from clients that are about 60 bytes in size (log identifier, epoch value), and responds with the current counter value and epoch, totaling about 16 bytes. The current design achieves approximately 180K requests per second with 4 server threads on the loopback interface, and approximately 100K requests per second over the 10 Gb Ethernet network on Google Compute Engine.
In contrast, the authors of the Tango paper report sequencer performance of approximately 500K requests per second over a 1 Gb Ethernet link. While 100K requests per second for our implementation is more than enough to get started with some interesting applications, it would be nice to reach parity with the results in the Tango paper.
In the remainder of this post we’ll walk through the current implementation.
Protocol #
Our implementation uses Google Protocol Buffers to encode messages between clients and the sequencer. Below is the specification for the messages. The MSeqRequest is sent by clients, and the MSeqReply is the message type of each server reply.
package zlog_proto;

option optimize_for = SPEED;

message MSeqRequest {
  required uint64 epoch = 1;
  required string pool = 2;
  required string name = 3;
  required bool next = 4;
}

message MSeqReply {
  enum Status {
    OK = 0;
    INIT_LOG = 1;
    STALE_EPOCH = 2;
  }
  required uint64 position = 1;
  optional Status status = 2 [default = OK];
}
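On the wire each message is framed with a 32-bit big-endian length header followed by the serialized protobuf body, as we’ll see in the session and client code below. A minimal sketch of the sender-side framing, with the generated header name assumed:

#include <arpa/inet.h>
#include <cassert>
#include <cstring>
#include "zlog.pb.h"  // generated from the spec above; header name assumed

// frame a request: 4-byte big-endian length header, then the protobuf body
static size_t frame_request(const zlog_proto::MSeqRequest& req,
    char *buf, size_t buf_len)
{
  uint32_t msg_size = req.ByteSize();
  assert(msg_size + sizeof(uint32_t) <= buf_len);
  uint32_t be_msg_size = htonl(msg_size);
  memcpy(buf, &be_msg_size, sizeof(be_msg_size));
  req.SerializeToArray(buf + sizeof(be_msg_size), msg_size);
  return sizeof(be_msg_size) + msg_size;
}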
Next we’ll discuss the server and client designs.
Server #
When the server starts up it binds to a specific port (--port) and starts a set of worker threads (--nthreads). The server can also be daemonized but this code has been removed for brevity. The LogManager object tracks log metadata and is used by the server.
// note: report_sec and log_mgr are globals defined elsewhere in the source
namespace po = boost::program_options;

int main(int argc, char* argv[])
{
  int port;
  int nthreads;

  po::options_description desc("Allowed options");
  desc.add_options()
    ("port", po::value<int>(&port)->required(), "Server port")
    ("nthreads", po::value<int>(&nthreads)->default_value(1), "Num threads")
    ("report-sec", po::value<int>(&report_sec)->default_value(0), "Time between rate reports")
    ("daemon,d", "Run in background")
  ;

  po::variables_map vm;
  po::store(po::parse_command_line(argc, argv, desc), vm);
  po::notify(vm);

  // clamp the worker thread count to a sane range
  if (nthreads <= 0 || nthreads > 64)
    nthreads = 1;

  Server s(port, nthreads);
  log_mgr = new LogManager();

  s.run();

  return 0;
}
The server design uses Boost ASIO and is based on the multi-threaded HTTP server example available on the Boost ASIO website. The server contains a single I/O service object, and we set the TCP_NODELAY option for all new connections.
class Server {
 public:
  Server(short port, std::size_t nthreads)
    : acceptor_(io_service_,
        boost::asio::ip::tcp::endpoint(
          boost::asio::ip::tcp::v4(), port)),
      nthreads_(nthreads)
  {
    acceptor_.set_option(boost::asio::ip::tcp::no_delay(true));
    start_accept();
  }
When the server is run, a set of worker threads is created and each one calls the run method on the I/O service object.
  void run() {
    std::vector<boost::shared_ptr<boost::thread> > threads;
    for (std::size_t i = 0; i < nthreads_; i++) {
      boost::shared_ptr<boost::thread> thread(new boost::thread(
            boost::bind(&boost::asio::io_service::run, &io_service_)));
      threads.push_back(thread);
    }

    for (std::size_t i = 0; i < threads.size(); ++i)
      threads[i]->join();
  }
New connections are handled by the state machine implemented in the Session object that we’ll see next.
 private:
  void start_accept() {
    Session* new_session = new Session(io_service_);
    acceptor_.async_accept(new_session->socket(),
        boost::bind(&Server::handle_accept, this, new_session,
          boost::asio::placeholders::error));
  }

  void handle_accept(Session* new_session,
      const boost::system::error_code& error) {
    if (!error)
      new_session->start();
    else
      delete new_session;
    start_accept();
  }

  boost::asio::io_service io_service_;
  boost::asio::ip::tcp::acceptor acceptor_;
  std::size_t nthreads_;
};
Session #
The Session object implements the sequencer protocol state machine. Each connection starts with the client sending us a 32-bit message header.
void start() {
  boost::asio::async_read(socket_,
      boost::asio::buffer(buffer_, sizeof(uint32_t)),
      boost::bind(&Session::handle_hdr, this,
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred));
}
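The member declarations for the Session object are not shown in these excerpts. For reference, a rough sketch of the per-connection state the handlers rely on might look like the following; the buffer size here is an arbitrary assumption:

// sketch of the Session state used by the handlers (buffer size assumed)
boost::asio::ip::tcp::socket socket_;  // connection to the client
char buffer_[4096];                    // shared buffer for reads and writes
uint32_t be_msg_size_;                 // big-endian length header for replies
zlog_proto::MSeqRequest req_;          // reused request message
zlog_proto::MSeqReply reply_;          // reused reply message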
Next we sanity check the header and read the rest of the message.
void handle_hdr(const boost::system::error_code& err, size_t size) {
  if (err) {
    delete this;
    return;
  }

  uint32_t msg_size = ntohl(*((uint32_t*)buffer_));
  if (msg_size > sizeof(buffer_)) {
    std::cerr << "message is too large" << std::endl;
    delete this;
    return;
  }

  boost::asio::async_read(socket_,
      boost::asio::buffer(buffer_, msg_size),
      boost::bind(&Session::handle_msg, this,
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred));
}
Next we handle the body of the message, which contains the interesting parts. We start by resetting an MSeqRequest and initializing it with the received data.
void handle_msg(const boost::system::error_code& err, size_t size) {
  if (err) {
    delete this;
    return;
  }

  req_.Clear();

  if (!req_.ParseFromArray(buffer_, size)) {
    std::cerr << "failed to parse message" << std::endl;
    delete this;
    return;
  }

  if (!req_.IsInitialized()) {
    std::cerr << "received incomplete message" << std::endl;
    delete this;
    return;
  }
Next we prepare an MSeqReply structure and contact the LogManager with the information in the request. We’ll discuss the LogManager in the next section.
  reply_.Clear();

  uint64_t seq;
  int ret = log_mgr->ReadSequence(req_.pool(), req_.name(),
      req_.epoch(), req_.next(), &seq);
  if (ret == -EAGAIN)
    reply_.set_status(zlog_proto::MSeqReply::INIT_LOG);
  else if (ret == -ERANGE)
    reply_.set_status(zlog_proto::MSeqReply::STALE_EPOCH);
  else
    assert(!ret);
Once the LogManager is finished, we record any errors that occurred, set the position it reported, and send the reply back to the client.
  reply_.set_position(seq);
  assert(reply_.IsInitialized());

  uint32_t msg_size = reply_.ByteSize();
  assert(msg_size < sizeof(buffer_));
  // serialize outside of assert() so the call isn't elided when NDEBUG is set
  bool ok = reply_.SerializeToArray(buffer_, msg_size);
  assert(ok);

  // scatter/gather buffers: length header followed by the message body
  std::vector<boost::asio::const_buffer> out;
  be_msg_size_ = htonl(msg_size);
  out.push_back(boost::asio::buffer(&be_msg_size_, sizeof(be_msg_size_)));
  out.push_back(boost::asio::buffer(buffer_, msg_size));

  boost::asio::async_write(socket_, out,
      boost::bind(&Session::handle_reply, this,
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred));
}
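The handle_reply completion handler is not shown here. Assuming each connection simply loops back to wait for the next request header, it might look something like this:

// sketch: after a reply is written, wait for the next request header
void handle_reply(const boost::system::error_code& err, size_t size) {
  if (err) {
    delete this;
    return;
  }

  boost::asio::async_read(socket_,
      boost::asio::buffer(buffer_, sizeof(uint32_t)),
      boost::bind(&Session::handle_hdr, this,
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred));
}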
Next we’ll present the LogManager component of the sequencer server.
Log Manager #
The LogManager manages the mapping between log identifiers and the actual tail counter. A tail is represented by a Sequence object:
class Sequence {
 public:
  Sequence() : seq_(0) {}
  Sequence(uint64_t seq) : seq_(seq) {}

  uint64_t read() {
    return seq_;
  }

  void inc() {
    seq_++;
  }

 private:
  uint64_t seq_;
};
A log is identified by the tuple (name, pool) and is stored in the LogManager in the Log struct, which includes the current epoch value for the log instance:
struct Log {
  Log() {}
  Log(uint64_t pos, uint64_t epoch) : seq(pos), epoch(epoch) {}

  Sequence seq;
  uint64_t epoch;
};
The important part is how the log sequences are read and incremented. This is handled by the ReadSequence method, called from the server’s Session object in response to a client request. The routine looks up the log in a std::map; if it exists, the sequence value is optionally incremented and then returned. If the log isn’t found in the index, the client receives an -EAGAIN reply and a request to register the log is placed on a queue. A separate thread handles the registration of a new log in the LogManager because it is an expensive process that would otherwise block other client requests.
// returns int so the -EAGAIN/-ERANGE error codes can be propagated
int ReadSequence(const std::string& pool, const std::string& name,
    uint64_t epoch, bool increment, uint64_t *seq) {
  boost::unique_lock<boost::mutex> g(lock_);

  std::map<std::pair<std::string, std::string>, Log>::iterator it =
    logs_.find(std::make_pair(pool, name));
  if (it == logs_.end()) {
    QueueLogInit(pool, name);
    return -EAGAIN;
  }

  if (epoch < it->second.epoch)
    return -ERANGE;

  if (increment)
    it->second.seq.inc();

  *seq = it->second.seq.read();

  return 0;
}
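For context, ReadSequence uses a few LogManager members that aren’t shown in the excerpt. A rough sketch of that state, with the registration queue and condition variable named here only as assumptions based on the description above, might be:

// sketch of the LogManager state used above (queue/condvar names assumed)
boost::mutex lock_;                                         // guards the index
std::map<std::pair<std::string, std::string>, Log> logs_;   // (pool, name) -> tail state
std::deque<std::pair<std::string, std::string> > pending_;  // logs waiting to be registered
boost::condition_variable cond_;                            // wakes the registration thread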
The process of registering a new log includes the slow tail-finding mechanism, but it is omitted here because it isn’t part of the fast path. Next we’ll show the client side, which is very straightforward.
Client #
A client starts by connecting to the sequencer server.
void SeqrClient::Connect() {
  boost::asio::ip::tcp::resolver resolver(io_service_);
  boost::asio::ip::tcp::resolver::query query(
      boost::asio::ip::tcp::v4(), host_.c_str(), port_);
  boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
  boost::asio::connect(socket_, iterator);
}
Client requests are in the form of an MSeqRequest object and include the metadata for identifying the log.
int SeqrClient::CheckTail(uint64_t epoch, const std::string& pool,
    const std::string& name, uint64_t *position, bool next) {
  // fill in the request message
  zlog_proto::MSeqRequest req;
  req.set_epoch(epoch);
  req.set_name(name);
  req.set_next(next);
  req.set_pool(pool);

  // serialize the length header and protobuf message
  uint32_t msg_size = req.ByteSize();
  uint32_t be_msg_size = htonl(msg_size);
  uint32_t total_msg_size = msg_size + sizeof(be_msg_size);
  assert(total_msg_size <= sizeof(buffer));

  // add the header
  memcpy(buffer, &be_msg_size, sizeof(be_msg_size));

  // add the protobuf msg (serialize outside assert() so it isn't elided by NDEBUG)
  assert(req.IsInitialized());
  bool ok = req.SerializeToArray(buffer + sizeof(be_msg_size), msg_size);
  assert(ok);

  // send the request
  boost::asio::write(socket_, boost::asio::buffer(buffer, total_msg_size));
The write call is blocking, so after it returns we read the response from the same socket.
  // get the reply: length header followed by the message body
  boost::asio::read(socket_, boost::asio::buffer(&be_msg_size, sizeof(be_msg_size)));
  msg_size = ntohl(be_msg_size);
  assert(msg_size < sizeof(buffer));
  boost::asio::read(socket_, boost::asio::buffer(buffer, msg_size));

  // deserialize (again outside assert() so the call isn't elided by NDEBUG)
  zlog_proto::MSeqReply reply;
  bool parse_ok = reply.ParseFromArray(buffer, msg_size);
  assert(parse_ok);
  assert(reply.IsInitialized());

  if (reply.status() == zlog_proto::MSeqReply::INIT_LOG)
    return -EAGAIN;
  else if (reply.status() == zlog_proto::MSeqReply::STALE_EPOCH)
    return -ERANGE;
  else {
    assert(reply.status() == zlog_proto::MSeqReply::OK);
    *position = reply.position();
  }

  return 0;
}
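As a usage sketch, a client call might look roughly like the following; the SeqrClient constructor arguments and the pool and log names are assumptions for illustration:

// hypothetical usage of the client API (constructor signature assumed)
SeqrClient seqr("localhost", "5678");
seqr.Connect();

uint64_t epoch = 0;  // epoch learned when the log instance was opened
uint64_t position;
int ret = seqr.CheckTail(epoch, "mypool", "mylog", &position, true);
if (ret == -EAGAIN) {
  // the sequencer is still registering this log; retry shortly
} else if (ret == -ERANGE) {
  // our epoch is stale; refresh it and retry
} else {
  // position now holds the next available position in the log
}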
And that’s it. Performance is OK at 100K requests per second, but it would be nice to reach the reported 500K requests per second from the Tango paper. Using UDP rather than TCP may be a good thing to try next.
What’s Next #
The performance of the sequencer is sufficient for getting started with this project. The next things I’ll likely focus on are sequencer recovery, followed by looking into performance improvements.