Hello everyone! In this white paper summary we're going to tackle a paper written by an engineer at LinkedIn who walks us through how Kafka was designed and some of the design choices they made. As usual we'll walk through the paper and highlight the important parts. This is really me just reading it and writing out notes, but I do recommend you guys read it! I'll leave the link in the reference section below.
These articles are meant to capture the important bits of the paper, or the parts I was most interested in. I've added some parts, removed others, and kept some exactly the same. The end goal is just to understand how it works, but all credit goes to the writer 100%.
This paper was written in 2011 so it doesn't mention replication because Kafka didn't have it back then.
Introduction
The paper starts by addressing how critical log processing has become in this day and age, then moves on to say that Kafka was made to tackle some of the log processing problems they faced at LinkedIn, borrowing ideas from other messaging systems. The writer then starts vouching for Kafka and how its performance and scalability are superior to other systems.
Then it moves on to what "log" data really is in companies (user clicks, metrics, etc.) and how, while back in the day it was mainly used for analytics, nowadays it is fed directly into production systems in real time (search relevance, recommendations, ad targeting, security protections).
These types of log data are very challenging due to their sheer volume, and processing them quickly and efficiently is an even harder challenge.
The writer then mentions that the old way of processing these logs was to scrape them all for offline analysis, which is very inefficient if you think about it, and that several log aggregators were built in recent years (Scribe, Flume, etc.) which normally offload the data into HDFS (Hadoop).
LinkedIn wanted to achieve more with log aggregation: they needed to support all the real-time applications mentioned above (search relevance, etc.) with a delay of no more than a few seconds.
Then Kafka comes in: a combination of traditional log aggregators and messaging systems. The most important thing was that it allowed consuming these logs in real time. In the next section we'll discuss the messaging systems that were available at the time and why LinkedIn couldn't adopt them and had to invent Kafka instead.
Related Work
The messaging systems available at that time weren't a good fit for log processing because of a mismatch in features. Those systems focused more on delivery guarantees than on throughput, which is overkill for collecting log data: if a single click isn't registered, it isn't the end of the world. The unneeded features increased the complexity of those systems, understandably so, since not every system has throughput as its primary constraint. On top of that, those systems were very weak in distributed support; there was no easy way to partition and store messages across multiple machines.
The writer then talks about how these systems usually aggregate logs and dump them periodically, but notes that most of them do this processing offline (not in real time) and that they use a "push" model, pushing data to consumers, which could potentially overload a consumer that is still busy processing. LinkedIn found the "pull" model more convenient to work with, since each consumer can consume at its own rate and avoid being flooded with messages.
To sum up:
LinkedIn wanted throughput, and none of the existing systems provided it
Existing systems weren't scalable or real-time enough
The push model was not going to work for LinkedIn
In the next section we talk about Kafka's architecture and design principles
Kafka Architecture and Design Principles
The basic outlines of Kafka are as follows:
A stream of messages of a particular type is called a topic
A producer publishes messages to the topic
The messages are stored on a set of servers called brokers
A consumer subscribes to one or more topics from the broker and pulls data from the broker.
On the consumer side, each message stream provides an iterator interface over the continual stream of messages being produced. The consumer iterates over the messages and blocks if none exist.
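To make that concrete, here's a minimal sketch of that blocking-iterator behavior. This is my own Python illustration, not the actual Kafka consumer API (which is Java/Scala); the queue stands in for the consumer's internal fetch buffer:

```python
import queue

class MessageStream:
    """Toy sketch: the consumer iterator blocks until a message exists."""
    def __init__(self):
        self._q = queue.Queue()

    def put(self, msg):
        # In real Kafka this buffer would be filled by background pull requests.
        self._q.put(msg)

    def __iter__(self):
        while True:
            yield self._q.get()  # blocks if no message is available yet

stream = MessageStream()
stream.put("click-event")
stream.put("page-view")

it = iter(stream)
print(next(it), next(it))  # click-event page-view
```

The application just loops over the iterator; it never has to poll or handle "no data" itself, because `get()` simply parks the thread until something arrives.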
Kafka supports two delivery models:
Point-to-point delivery (a basic queue where exactly one consumer takes each message and processes it)
A pub/sub model where multiple consumers each get a copy of the same message.
Below is a simple diagram visualizing what we wrote above (stole it from the paper 😅)
A topic is divided into multiple partitions and each broker stores one or more of those partitions. Multiple producers and consumers can publish and retrieve messages at the same time. In the next section we talk about partitions and some of the design choices that were made.
Partition Layout and Design Choices
Each partition of a topic has a logical log. Physically the log is a set of segment files where each file is around 1GB.
Every time a producer publishes a message to a partition, the broker simply appends the message to the last segment file. For better performance, we flush the segment files to disk only after a configurable number of messages have been published or a certain amount of time has elapsed. A message is only exposed to the consumers after it is flushed.
This is an example of trading a little (configurable) latency for throughput and consistency. Messages arriving at the broker sit in an in-memory buffer, and after a configurable threshold they get flushed to disk in one batch. Only once they are flushed can consumers see them, which means a consumer never reads a message that could still be lost in a crash. Those few milliseconds of extra latency are very much worth it.
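Here's a tiny sketch of that flush policy (my own illustration; names like `flush_after` are made up, and I've left out the time-based flush trigger the paper also mentions):

```python
class Segment:
    """Sketch: messages sit in an in-memory buffer and become visible to
    consumers only after a batched flush to 'disk'."""
    def __init__(self, flush_after=3):
        self.flush_after = flush_after
        self.buffer = []      # in-memory, invisible to consumers
        self.on_disk = []     # stands in for the flushed segment file

    def append(self, msg):
        self.buffer.append(msg)
        if len(self.buffer) >= self.flush_after:
            self.on_disk.extend(self.buffer)  # one batched flush
            self.buffer.clear()

    def readable(self):
        return list(self.on_disk)  # consumers only ever see flushed messages

seg = Segment(flush_after=3)
seg.append("m1"); seg.append("m2")
print(seg.readable())  # [] — nothing flushed yet
seg.append("m3")
print(seg.readable())  # ['m1', 'm2', 'm3']
```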
Unlike typical messaging systems, a message stored in Kafka doesn’t have an explicit message id. Instead, each message is addressed by its logical offset in the log. This avoids the overhead of maintaining auxiliary, seek-intensive random-access index structures that map the message ids to the actual message locations. Note that our message ids are increasing but not consecutive. To compute the id of the next message, we have to add the length of the current message to its id.
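A tiny worked example of that addressing scheme (the message contents are made up):

```python
# Three hypothetical messages with different byte lengths.
messages = [b"click", b"page-view", b"ad"]

offset = 0
ids = []
for m in messages:
    ids.append(offset)   # a message's "id" is simply its byte offset in the log
    offset += len(m)     # next id = current id + current message's length

print(ids)  # [0, 5, 14] — increasing, but not consecutive
```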
A consumer always consumes messages from a particular partition sequentially. If the consumer acknowledges a particular message offset, it implies that the consumer has received all messages prior to that offset in the partition. Under the hood, the consumer is issuing asynchronous pull requests to the broker to have a buffer of data ready for the application to consume. Each pull request contains the offset of the message from which the consumption begins and an acceptable number of bytes to fetch. Each broker keeps in memory a sorted list of offsets that include the offset of the first message in every segment file. The broker locates the segment file where the requested message resides by searching the offset list, and sends the data back to the consumer. After a consumer receives a message, it computes the offset of the next message to consume and uses it in the next pull request.
The image above visualizes the in-memory index present on the broker, where each entry is the first offset of a segment file. With a simple binary search, given an offset, we can find the segment file that contains it.
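That lookup can be sketched in a few lines with Python's `bisect` (the offsets below are hypothetical):

```python
import bisect

# Hypothetical in-memory index: the first offset of each segment file.
segment_starts = [0, 1000, 2000, 3000]

def segment_for(offset):
    """Binary-search the index for the segment file holding `offset`."""
    i = bisect.bisect_right(segment_starts, offset) - 1
    return segment_starts[i]

print(segment_for(1500))  # 1000 — i.e. the segment starting at offset 1000
print(segment_for(2000))  # 2000
```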
The writer also mentions that although the end consumer API iterates one message at a time, under the covers each pull request from a consumer retrieves multiple messages, up to a certain size (typically hundreds of kilobytes), just so it can have them ready for processing, since they're already flushed to disk on the broker.
One of the very smart decisions the writer talks about is depending on the file system page cache instead of in memory caching for accessing recent messages. This has the following advantages:
Avoids double buffering: there is no separate in-process buffer, the data lives only in the filesystem page cache
Completely offloads caching to the OS rather than the Kafka process, which is magnificent because there is no garbage collection overhead
When the Kafka process restarts, the cache still exists since it's the OS's responsibility; it only goes away if the machine is rebooted.
OS caching plays a huge role here because producers and consumers access segment files sequentially, which the page cache handles very well. It was found that both production and consumption have consistent performance, linear in the data size, up to many terabytes of data.
Kafka also optimized network access for consumers. Let's look at the normal OS process for reading a local file from disk and sending it over the network:
Read the file from disk into memory (the OS page cache)
Copy it from the page cache into the application buffer (still in memory, but in the region reserved for the application so it can actually access the data)
Copy it from the application buffer into a kernel buffer as transmission is about to begin
The kernel buffer sends the data out over the socket.
That's quite a lot of copies and system calls. Kafka optimized this by leveraging an API that exists in Linux systems called sendfile, which transfers bytes directly from the file to the socket, skipping all those intermediate copies and boosting performance.
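Here's a hedged Python sketch of the same idea using `os.sendfile` (the file contents and the socketpair stand-in for a broker-to-consumer connection are mine; on Linux this maps down to the `sendfile(2)` system call the paper refers to):

```python
import os
import socket
import tempfile

# A small file stands in for an on-disk segment file.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"log messages here")
    path = f.name

# A connected socket pair stands in for the broker-to-consumer connection.
server_sock, consumer_sock = socket.socketpair()

with open(path, "rb") as segment:
    # The kernel moves bytes file -> socket directly: no user-space copies.
    sent = os.sendfile(server_sock.fileno(), segment.fileno(), 0, 17)

print(consumer_sock.recv(64))  # b'log messages here'
os.unlink(path)
```

No application-level read/copy loop appears anywhere: the only user-space work is issuing the single system call.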
Stateless broker: In Kafka, the information about how much each consumer has consumed is not maintained by the broker, but by the consumer itself. Such a design reduces a lot of the complexity and the overhead on the broker. However, this makes it tricky to delete a message, since a broker doesn’t know whether all subscribers have consumed the message.
Kafka solves this problem by using a simple time-based SLA for the retention policy. A message is automatically deleted if it has been retained in the broker longer than a certain period, typically 7 days. This solution works well in practice. Most consumers, including the offline ones, finish consuming either daily, hourly, or in real-time. The fact that the performance of Kafka doesn’t degrade with a larger data size makes this long retention feasible.
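A sketch of what such a time-based retention check looks like (file names, ages, and the `expired` helper are all made up for illustration):

```python
import time

RETENTION = 7 * 24 * 3600  # the typical 7-day SLA mentioned in the paper

now = time.time()
# Hypothetical segment files paired with their last-modified times.
segments = [
    ("00000000.log", now - 8 * 24 * 3600),  # 8 days old -> past retention
    ("00102400.log", now - 3600),           # 1 hour old -> kept
]

def expired(mtime):
    """A segment is eligible for deletion once older than the retention period."""
    return now - mtime > RETENTION

keep = [name for name, mtime in segments if not expired(mtime)]
print(keep)  # ['00102400.log']
```

Note the broker never asks "has everyone read this?"; the clock alone decides, which is what keeps the broker stateless.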
There is an important side benefit of this design. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but proves to be an essential feature for many consumers. For example, when there is an error in application logic in the consumer, the application can re-play certain messages after the error is fixed. This is particularly important to ETL data loads into our data warehouse or Hadoop system.
As another example, the consumed data may be flushed to a persistent store only periodically (e.g., a full-text indexer). If the consumer crashes, the unflushed data is lost. In this case, the consumer can checkpoint the smallest offset of the un-flushed messages and re-consume from that offset when it's restarted. We note that rewinding a consumer is much easier to support in the pull model than the push model. Next up are some design considerations governing Kafka's distributed nature.
Distributed Coordination
Producers and consumers operate in a distributed setting. Each producer can publish a message either to a randomly selected partition or to a partition determined semantically by a partitioning key and function.
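A toy version of such a semantic partitioner (my own sketch; the hash choice and `NUM_PARTITIONS` are arbitrary, not Kafka's actual defaults):

```python
import zlib

NUM_PARTITIONS = 4  # hypothetical partition count for a topic

def partition_for(key: bytes) -> int:
    """Hash the key, mod the partition count, so every message with the
    same key (e.g. the same user id) lands in the same partition."""
    return zlib.crc32(key) % NUM_PARTITIONS

p1 = partition_for(b"user-42")
p2 = partition_for(b"user-42")
print(p1 == p2)  # True — the same key always maps to the same partition
```

Because a single partition is consumed in order, keying by user id gives you per-user ordering for free.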
Kafka has the concept of consumer groups. Each consumer group consists of one or more consumers that jointly consume a set of subscribed topics, i.e., each message is delivered to only one of the consumers within the group.
Different consumer groups each independently consume the full set of subscribed messages and no coordination is needed across consumer groups.
The consumers within the same group can be in different processes or on different machines. Our goal is to divide the messages stored in the brokers evenly among the consumers, without introducing too much coordination overhead.
The writer mentions that the first decision was to make a partition within a topic the smallest unit of parallelism.
This means that at any given time, all messages from one partition are consumed only by a single consumer within each consumer group.
Had we allowed multiple consumers to simultaneously consume a single partition, they would have to coordinate who consumes which messages, which necessitates locking and state-maintenance overhead.
In contrast, in this design, consuming processes only need to coordinate when the consumers rebalance the load, an infrequent event.
The second decision that we made is to not have a central “master” node, but instead let consumers coordinate among themselves in a decentralized fashion.
Adding a master can complicate the system since we have to further worry about master failures.
To facilitate the coordination, Kafka employs Zookeeper, a highly available consensus service. If you're unsure what Zookeeper is, I recommend checking out an article I wrote about it here
Kafka uses Zookeeper for the following:
Detecting the addition and the removal of brokers and consumers.
Triggering a rebalance process in each consumer when the above happens.
Maintaining the consumption relationship and keeping track of the consumed offset of each partition.
When each broker or consumer starts up, it stores its information in a broker or consumer registry in Zookeeper. The broker registry contains the broker’s host name and port, and the set of topics and partitions stored on it. The consumer registry includes the consumer group to which a consumer belongs and the set of topics that it subscribes to. Each consumer group is associated with an ownership registry and an offset registry in Zookeeper. The ownership registry has one path for every subscribed partition and the path value is the id of the consumer currently consuming from this partition.
The offset registry stores for each subscribed partition, the offset of the last consumed message in the partition.
The paths created in Zookeeper are ephemeral for the broker registry, the consumer registry and the ownership registry, and persistent for the offset registry.
Once a new consumer is added, or broker/consumer changes are broadcast to the consumers, each affected consumer runs a rebalance process to determine the subset of partitions it should consume from. I recommend reading the rebalance algorithm directly from the paper, as it's explained best there.
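For flavor, here's a simplified sketch of the kind of deterministic range assignment the paper describes: every consumer sorts the same inputs and computes its own share without talking to the others. This is my simplification, not the paper's exact algorithm (which also factors in how partitions are laid out across brokers):

```python
def my_partitions(partitions, group_members, me):
    """Each consumer independently computes the chunk of partitions it owns."""
    parts = sorted(partitions)
    members = sorted(group_members)
    i = members.index(me)
    n, r = divmod(len(parts), len(members))
    start = i * n + min(i, r)          # the first r members get one extra partition
    return parts[start:start + n + (1 if i < r else 0)]

members = ["consumer-a", "consumer-b", "consumer-c"]
parts = ["p0", "p1", "p2", "p3", "p4", "p5", "p6"]
for m in members:
    print(m, my_partitions(parts, members, m))
```

Because every member sorts identically and indexes itself the same way, the resulting assignment is disjoint and covers every partition with no master node involved.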
Delivery Guarantees
Kafka guarantees at-least-once delivery; exactly-once delivery is very complex to achieve (it would require something like two-phase commit). Most of the time, though, a message is delivered exactly once to each consumer group.
In the case where a consumer process crashes without a clean shutdown, the consumer process that takes over the partitions owned by the failed consumer may get some duplicate messages, namely those after the last offset successfully committed to Zookeeper.
If an application cares about duplicates, it must add its own de-duplication logic. This is usually more cost-effective than using two-phase commits.
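Such de-duplication logic can be as simple as tracking the highest offset already applied and skipping anything at or below it. A sketch (class and field names are mine):

```python
class DedupingConsumer:
    """Illustrative app-side de-duplication: remember the highest offset
    applied and skip redeliveries at or below it."""
    def __init__(self):
        self.applied_up_to = -1
        self.log = []

    def handle(self, offset, msg):
        if offset <= self.applied_up_to:
            return  # duplicate redelivered after a failure: skip it
        self.log.append(msg)
        self.applied_up_to = offset

c = DedupingConsumer()
for off, msg in [(0, "a"), (1, "b"), (1, "b"), (2, "c")]:  # (1, "b") redelivered
    c.handle(off, msg)
print(c.log)  # ['a', 'b', 'c']
```

This works because consumption within a partition is sequential, so offsets only ever move forward under normal operation.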
Kafka guarantees that messages from a single partition are delivered to a consumer in order. However, there is no guarantee on the ordering of messages coming from different partitions.
To avoid log corruption, Kafka stores a CRC for each message in the log. If there is any I/O error on the broker, Kafka runs a recovery process to remove those messages with inconsistent CRCs. Having the CRC at the message level also allows us to check network errors after a message is produced or consumed.
A CRC (Cyclic Redundancy Check) is simply a checksum used to detect corruption: when a message is produced, its checksum is computed and stored alongside it, and Kafka uses these checksums between producer and broker as well as between broker and consumer. Whenever the data is later read or transmitted, the checksum is recalculated and compared with the stored one; a mismatch means the data has been corrupted.
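A two-line demonstration of that check using Python's `zlib.crc32` (the payload is made up; Kafka's actual CRC variant and framing differ):

```python
import zlib

payload = b"user clicked ad 7"
stored_crc = zlib.crc32(payload)   # computed and stored when the message is produced

# Later, on read: recompute and compare.
print(zlib.crc32(payload) == stored_crc)        # True — message intact

corrupted = payload[:-1] + b"!"                 # flip the last byte
print(zlib.crc32(corrupted) == stored_crc)      # False — corruption detected
```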
In the final section of the paper the writer talks about the usage of Kafka at LinkedIn. I won't be covering it here, but it's a good read and I'd recommend it 100%.