Large scale streaming with Kafka
I had an opportunity to work on an interesting infrastructure challenge. It goes something like this: we need to be able to persist incoming data stream which consists of approximately 200 thousand messages/second, we also need to guarantee data availability and redundancy. This is a typical scale of data I used to deal with at Chartbeat on a daily basis. When working with such high traffic you're most likely going to run into the questions to which you might not know the answers right away.
- How many servers do we need to handle such traffic?
- Do we need to store the data and how can we do that?
- If we must store the data, for how long are we going to need access to it?
- How much the new infrastructure is going cost us?
These are just a few questions that you will have to answer in order to pick the right tools for the job. In this post I will try to provide the answers to some of these questions and also show you a sample infrastructure setup that can be used to handle large amounts of traffic while abiding our requirements.
- Apache Kafka is great for high write data where data redundancy and persistence is a requirement.
- You should always have compression enabled in Kafka. You can save lots of money on network traffic.
- Custom C++ http-to-kafka adapter to accommodate performance needs. It uses libevent and librdkafka.
First of all, let me give you a quick background about the data and where it is coming from. These 200K messages/second that I mentioned before are HTTP requests coming from all over the world. This is typical beacon traffic that all web analytics companies collect. The requests contain information about what people are doing on a web page. The exact details of the message contents are not important. The average request size is somewhere around 0.8KB. That's almost 150MB of data every second.
We can consider this HTTP traffic to be an event stream. Each request contains some metrics which change over time. The order of these events is also important. When it is presented like this, the entire beacon traffic could be pushed onto a queue and processed one by one. Queues provide message ordering, sometimes they also have data persistence and replication. Apache Kafka is one of those systems and it has everything we need.
Queueing systems such as RabbitMQ and ZeroMQ either cannot handle so many writes, or they sacrifice persistence to achieve better write performance. Similarly, databases have the same limitations, they are usually optimized to work well in some workflows but not others. In our case, we want to write lots of data, and be able to read whatever was written in the same order.
Apache Kafka is exactly the solution to our problem. Kafka is a distributed commit log. The design, when you think about it, is pretty simple. It's a bunch of append-only files that get replicated over your Kafka cluster. Append-only files provide very huge write throughput even on the commodity hardware since you are saving disk seek cycles when you need to write data; we also get data persistence for free and replication support.
Data Distribution in Apache Kafka
Understanding how Kafka distributes data is crucial for choosing right servers to run your Kafka cluster.
In Kafka there's a notion of topic. Topics keep messages of similar type and are split into partitions. Replication is set up on a per-partition basis and the larger number of partitions provide better concurrency. If you are familiar with other queue systems you can think of a topic as a queue. However, this queue lives on a disk and it provides at-least-once message delivery guarantees. This means that you can receive the same message once or more, the same way you get data when tailing a file more than once.
Each partition gets assigned a leader node which receives write requests and replica nodes are reading from. For example, we have a topic called tweets with 5 partitions and 3 node cluster. A typical topic configuration might look like this.
partition: 0, leader: 1 partition: 1, leader: 2 partition: 2, leader: 3 partition: 3, leader: 1 partition: 4, leader: 2
In the above example, node 1 and node 2 will have 2/5ths of the data, and node 3 will have 1/5th of the data. Let's say we set replication factor to 2; now topic configuration might look something similar to the following.
partition: 0, leader: 1, replicas: 1,2 partition: 1, leader: 2, replicas: 2,3 partition: 2, leader: 3, replicas: 3,1 partition: 3, leader: 1, replicas: 1,2 partition: 4, leader: 2, replicas: 2,3
Leader node is always part of the replica set, the second replica can be chosen randomly. Ideally, you would want all nodes to have about the same number of partitions to get an even load across all nodes. Here node 1 has 3 partitions (0, 1, and 2), node 2 has 4, and node 3 has 3.
Estimating Storage Needs
We have a rough idea of how data is distributed across the Kafka cluster. Knowing the number of nodes in the cluster, the number of partitions and replication factor we can easily estimate how much data we can store per node. Let's say there are 5 nodes with 80GB hard drives, we have a topic with 10 partitions and the replication factor of 3.
- number of partitions per node:
10 partitions * 3 replicas / 5 servers = 6 partitions per server.
- max amount of data a partition can hold:
80G / 6 = 13G.
13G per partition is a theoretical max for the above example. It's always a good idea to have some extra disk buffer for data spikes or when Kafka cluster configuration changes. When the cluster configuration changes due to node failures, Kafka will automatically reelect new leaders for partitions that went down. Therefore you can see increased network load on some servers because more data is being transferred among producers and consumers.
The streaming infrastructure consists of two main components: front servers and Kafka cluster. Front servers receive beacon traffic and there is a good bunch of them for availability and performance reasons. Front server responsibility, besides a few other ones, is to push data to the Kafka cluster.
There is no off-the-shelf solution to get HTTP traffic into Kafka. Kafka is a pretty basic system and it is user's responsibility to figure out how to partition data or if it needs to be pre-processed before it reaches Kafka. We are doing both of these things. We generate shard key from the HTTP message and we are also converting HTTP messages to Google's Protocol Buffer messages.
By converting HTTP messages to protocol buffers we don't have to pre-process the data in other parts of the system. This means that the overall system design becomes simpler and any new applications that consume data from Kafka will need to do less processing and type checking. Protocol buffers also provide a small reduction in message size, which is important when dealing with large amounts of data.
The http-to-kafka adapter is written in C++ which makes it really easy to integrate protocol buffer code. It uses libevent 2.x for its event loop and HTTP server. For handling Kafka communication we use librdkafka. This is a multi-threaded library and it provides asynchronous reads and writes. Such a feature is very useful because we don't want to block main thread, which runs HTTP server, while sending data to Kafka. HTTP server processes the data and hands it off to librdkafka where the data is pushed to Kafka automatically in the background. librdkafka can also automatically discover new nodes and handle connection failures. A single instance of this adapter can easily handle about 8K requests/s.
To handle the aforementioned traffic we chose 10 m3.xlarge servers from Amazon Web Services. These servers are only for running Kafka cluster. There is one topic for accepting beacon traffic and it has 512 partitions. We run cluster with replication factor set to 3; this means that data lives on 3 servers at any given time, assuming cluster is in a healthy state.
One important Kafka configuration option is data compression. Neha has a nice piece on compression in kafka. All I'm going to say here is that you definitely want to go with Snappy compression. GZip compression for our purposes was too slow on the server side which caused us to hit librdkafka outgoing queue limits and start losing messages. This might not be the case all the time, but it will most likely happen if you are dealing with very high message rates. Turning on Snappy reduced network IO from 40MB/s to 25MB/s; that is almost 50% savings. Such reduction in network traffic can save thousands of dollars on inter-AZ traffic in AWS.