Simplifying Kafka for Ruby apps

Túlio Ornelas
Klarna Engineering
Jan 24, 2017


As you can imagine, Klarna is not implemented with a single app and a database. We have a rich environment full of microservices, which means we can't find all the data we need in a single place. Kafka has become the data backbone for our services.

Kafka is not a traditional message broker: it lacks many of the features (and responsibilities) of queues like RabbitMQ. It is a distributed log that delegates any higher-order functionality to its consumers — tracking who consumed what, deduplicating messages, assigning IDs, etc.

One of our projects required us to save events from different sources, sometimes calling a super slow API to enhance the data. We needed an exponential backoff to avoid overloading the API. Another requirement was to be able to restore the state of a single entity, which meant selecting all events related to this entity and processing them in order. We also had to consume messages exactly once, as consuming more than once would revert the state of the entity. My team decided to pack those requirements and others into a pluggable framework. We named it Phobos.

What is Phobos?

Phobos is a micro framework and library for applications dealing with Apache Kafka.

  • It wraps common behaviors needed by consumers and producers into an easy and convenient API (multithreading, exponential backoff, error handling)
  • It provides a CLI for starting and stopping a standalone application, ready to be used for production purposes

Why not just use a Kafka client? Because you still need to write a lot of code to manage consuming and producing messages properly. You need message routing, error handling, retries, backoff, and perhaps logging and instrumentation of the message-handling process. Finally, you also need to consider how to deploy your app and how to start it.

How does it work?

We wanted something with structure; something that was easy enough to become brainless after a while but flexible enough to cope with different projects. Phobos can be used in two ways: as a standalone application or to support Kafka features in your existing project — including Rails apps. It provides a CLI tool to run it.

Standalone apps have benefits such as individual deploys and smaller code bases — in only three steps we can have our consumers up and running.

Step 1: Run phobos init inside your project folder

$ phobos init
    create  config/phobos.yml
    create  phobos_boot.rb

phobos.yml is the configuration file and phobos_boot.rb is the starting point for the Phobos boot process, so this is where you might put something like require 'my_app'. An example with all available configurations can be found in the project repo (phobos.yml.example).

Step 2: Create and configure your handler

In Phobos apps listeners act as our consumers. A listener requires a handler (a Ruby class where you process incoming messages), a Kafka topic, and a Kafka group id. You write the handlers and Phobos makes sure to run them for you.
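A minimal handler might look like the sketch below. The `Phobos::Handler` mixin and the `consume(payload, metadata)` signature follow the project README; the class name and JSON payload are illustrative, and the `defined?` guard only lets the sketch load without the gem installed.

```ruby
require 'json'

# Sketch of a Phobos handler. In a real app you'd `require 'phobos'`;
# the guard below just lets this sketch load without the gem.
class MyHandler
  include Phobos::Handler if defined?(Phobos::Handler)

  # Phobos calls #consume for every message: payload is the raw message
  # value, metadata carries topic, partition, offset, etc.
  def consume(payload, metadata)
    event = JSON.parse(payload)
    # process the event...
    event
  end
end
```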

After creating the handler go to config/phobos.yml and configure it.

listeners:
  - handler: MyHandler
    topic: test
    group_id: test-1

Step 3: Start your app with phobos start

Writing a handler is all you need to allow Phobos to work — it will take care of execution, retries and concurrency. After executing the start command, your listeners will be up and consuming from their topics.

By default, the start command will look for the configuration file at config/phobos.yml and it will load the file phobos_boot.rb if it exists. It is possible to change both files, use -c for the configuration file and -b for the boot file.

Hooks

One way to keep the framework flexible is to expose hooks that you can use to build new features and behaviors. The Phobos handler has 3 hooks you can use.

All hooks are defined as class methods: start, stop and around_consume. Start and stop are called once per thread (defined by the option max_concurrency) and around_consume is called for every message.
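As a sketch, the three hooks can be laid out like this. The hook names and the class-method style follow the description above; the `kafka_client` argument to `start`, the counter, and the `defined?` guard are illustrative additions so the sketch runs standalone.

```ruby
# Sketch of the three handler hooks. start/stop run once per consumer
# thread (max_concurrency threads); around_consume wraps every message.
class InstrumentedHandler
  include Phobos::Handler if defined?(Phobos::Handler)

  def self.start(kafka_client = nil)
    @consumed = 0 # per-thread setup, e.g. opening connections
  end

  def self.stop
    # per-thread teardown, e.g. closing connections
  end

  def self.around_consume(payload, metadata)
    @consumed += 1 # e.g. timing or instrumentation around each message
    yield          # runs the instance-level consume
  end

  def self.consumed
    @consumed
  end

  def consume(payload, metadata)
    # process the message...
  end
end
```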

For one of our projects it was super important to consume messages only once and to be able to reprocess a group of messages at any time. To solve this, we ended up saving the messages with a checksum in a Postgres database, and ignoring the messages with the same checksum when we consumed. This database acted as a cache, deleting the messages after a while. We used Postgres because it was super convenient to setup with Amazon RDS. We call this checkpoint.
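The checkpoint idea can be sketched like this. The real implementation stores checksums in Postgres and expires old rows; an in-memory Set stands in for the table here, and the class name is illustrative.

```ruby
require 'digest'
require 'set'

# Checksum-based deduplication: process each unique payload only once.
class Checkpoint
  def initialize
    @seen = Set.new # stand-in for the Postgres table of checksums
  end

  # Returns true when the event is processed, false when it is a
  # duplicate we have already consumed.
  def consume(payload)
    checksum = Digest::SHA256.hexdigest(payload)
    return false unless @seen.add?(checksum)
    # process the payload...
    true
  end
end
```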

We extracted the code as a new gem called phobos_db_checkpoint.

Phobos DB Checkpoint

The readme describes PhobosDBCheckpoint as a drop-in replacement for Phobos::Handler, extending it with the following features:

  • Persists your Kafka events to an ActiveRecord compatible database
  • Ensures that your handler will consume messages only once
  • Allows your system to quickly reprocess events in case of failures

The gem provides a utility to set up your project for ActiveRecord migrations and database configurations. To use it, simply call: phobos_db_checkpoint init.

Whenever you need to "checkpoint" your events, you return an ack with at least your event id. An ack accepts an entity id, event time, event type, and event version.
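A checkpointing handler might look like the sketch below. The event field names are illustrative, the ack argument order follows the description above, and the stub `ack` definition only exists so the sketch runs without the gem — with phobos_db_checkpoint installed, the included handler module provides it.

```ruby
require 'json'
require 'time'

# Sketch of a checkpointing handler. With the gem installed you'd
# include PhobosDBCheckpoint::Handler, which provides ack.
class OrderHandler
  include PhobosDBCheckpoint::Handler if defined?(PhobosDBCheckpoint::Handler)

  # Stand-in for the gem's ack helper so this sketch runs standalone.
  unless method_defined?(:ack)
    def ack(entity_id, event_time, event_type = nil, event_version = nil)
      { entity_id: entity_id, event_time: event_time,
        event_type: event_type, event_version: event_version }
    end
  end

  def consume(payload, metadata)
    event = JSON.parse(payload)
    # returning an ack checkpoints this event
    ack(event['id'], Time.parse(event['time']), event['type'], event['version'])
  end
end
```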

We have been running Phobos and its friends in production for more than a year and we are really happy with the results. The open source versions were extracted from our code base in July 2016. Since then, our projects have been migrated to the open source versions.

My team often discusses extracting libraries from our code base — we truly believe this process helps us achieve better code quality. In the process of open sourcing Phobos, we've invested a lot in documenting and testing it. We hope you find it useful! If you have any ideas or different needs, open an issue or send a PR so that we can make it even better.
