Apache Kafka 0.10.0 Overview

Categories: BigData

Updated: 2017-05-05

Introduction

Apache Kafka is a scalable and high-performance message broker. It can be deployed as a single node but, more importantly, it can be clustered to handle arbitrary loads: millions of records per hour and many thousands of concurrent publishing and subscribing processes, even for the same topic, are no problem given enough hardware. It also supports data replication for robustness, allowing it to continue running despite failing nodes or disks - a very important feature once a cluster grows large enough.

This article gives a brief introduction to Apache Kafka, most useful for those who are not yet familiar with it. It does assume some familiarity with the concept of message-brokers/message-queuing.

The official Kafka documentation is very good; this article just gives a general overview and then addresses a few items that I personally found lacking in the standard docs.

The Kafka Project

The Apache Kafka project actually produces three distinct pieces of software:

  • Kafka Message Broker (the original product of this project)
  • Kafka Connect
  • Kafka Streams

This article discusses the Kafka Message Broker only; I have written a separate article on Kafka Connect.

Message Brokers, Producers/Publishers, Consumers/Subscribers

A message broker sits between different processes, and passes messages between them. A message submitted by a producer process to a topic may be passed on to one consumer process or multiple consumer processes.

There are three basic reasons to use a message broker:

  • so that a producer (aka publisher) can send off a message without caring if there are zero, one, or many other different kinds of program interested in receiving that message;
  • so that a producer (aka publisher) can send off a message without caring whether the expected consumer (aka subscriber) is ready for it yet - or in fact, whether the consumer/subscriber is even running at the moment; or
  • so that data generated by a producer can be consumed by multiple identical consumers running in parallel to improve throughput.

The first provides decoupling between the application that sends a message and the applications that receive it; they can be developed quite separately, they don’t need to know each other’s network addresses, and more receivers of messages (for the existing topics) can be added at any time without code changes to any existing components. Actually, additional producers can also be added (for existing topics), although this is less usual.

The second provides buffering between the application that sends a message and the application that receives it. The producer/publisher can generate short bursts of messages faster than the downstream app can handle them, without having to slow down - the message broker does the buffering. The producer or consumer can also be stopped and restarted (eg for version upgrades) without any complex synchronization; the producer and consumer are connected only to the message broker, not directly to each other, and the message-broker (optionally) provides buffering while a consumer is down.

The third provides scalability.

In usual message-broker terminology the words Producer/Consumer are used to indicate a 1:1 message transfer pattern, where a single message from a Producer is processed by a single Consumer application - ie exactly one piece of logic will be applied to the message (though there may be a pool of identical instances of this Consumer application for throughput reasons, ie load-balancing). In contrast, the terms Publisher/Subscriber are used when a single message from a Publisher is sent to multiple different applications, ie it may be processed in multiple different ways (though each of these “different applications” may itself be a pool of identical instances for throughput reasons).

From the originating application’s point of view, Producer and Publisher are identical - a message is sent to a logical topic within a message-broker. From the point of view of applications processing messages, the terms are also identical - they connect to the message-broker and ask to be given all messages for a specific logical topic. The only difference lies within the message-broker itself, in deciding when a message counts as “consumed”: in a 1:1 pattern it is “consumed” when one external application has received it, while in a pub/sub pattern it is “consumed” only when each subscriber has received it.

Kafka supports both 1:1 and 1:N models, in an elegant unified manner. Kafka documentation tends to apply the terms Producer/Consumer for both kinds of usage, as will this article. The verb “to publish” is used to indicate a Producer writing to a topic, and “to subscribe” is used to indicate the process of registering a Consumer process with the message broker.

Publishing data to a topic where there are one or more different types of consumer, each with a large number of instances for increased throughput, is where Kafka shines.

Use Cases for Kafka

Rather than simply think of Kafka as a “better message broker”, it is helpful to look at the problems it solves. It can:

  • decouple the producer of some data from the consumer(s) of that data, ie allow the producer to be unaware of what consumers exist, and vice-versa;
  • provide a buffer between a bursty real-time use-it-or-lose-it source of messages (events) and the consumer(s) of the messages/events;
  • provide a buffer between a system generating bursts of events and a synchronous consumer of that data (similar to above, from a different perspective);
  • allow components of a processing chain to be temporarily shut down (eg for maintenance) without losing data;
  • split a dataflow, allowing multiple independent consumers to consume the same stream of events;
  • allow an incoming datastream to be processed in parallel, while still preserving message ordering for subsets (partitions) of the data;
  • allow limited historical data to be replayed on demand (eg after improving or fixing an algorithm);
  • persistently cache the most recent values of events for purposes such as rebuilding distributed in-memory caches.

All of this can be provided for extremely high-volume datastreams (as long as the input data constraints support partitioning; see later).

The first five patterns are also supported by other message brokers (eg RabbitMQ, ActiveMQ), although Kafka scales better than any other broker. The last three are specific to Kafka.

The Issues with Traditional Message Brokers

There have been message-broker implementations for decades, and they sit at the core of many very important software systems. However the pre-Kafka message brokers share some limitations which Kafka was invented to solve. The primary problems are:

  • Message ordering and grouping
  • Transactional behaviour
  • Message deletion (ie effective use of disk and memory)
  • Scalability
  • Performance

In the basic message-broker pattern, messages are simply written to a “topic” (aka queue), and when multiple apps are reading from the topic for the purpose of load-balancing then messages are just handed out in order to the next free consumer. However it is common for some messages to be “related to each other”, such that they all should be processed by the same consumer instance (eg so it can use some kind of in-memory cache). Possibly more importantly, ensuring all messages in a “group” are handled by the same process instance ensures they are processed in the same order they were submitted. Advanced message-broker implementations do support this, by allowing the producer to set “message group” IDs on messages. However in many message-brokers the internal code to implement this behaviour is very complex due to the general broker design, performance is impacted, scalability is impacted, and strange behaviour can occur when that particular consumer fails/is restarted, etc.

For reliable message processing, the broker needs to correctly handle the case where a consumer fetches a message, then crashes before it completely processes that message; the “not completely processed” message should be made available to other consumers. Traditional message brokers implement something similar to database transaction-handling, where a message is “tentatively” handed out to a consumer as part of a “transaction” that the consumer then “commits” to indicate the message truly has been processed. If, instead, a message is “rolled back” then it should be made available to other readers of that topic, even though they may have already read messages later in the sequence. The implementation in many message-brokers is complex and slow, due to their overall design.

Traditional message brokers keep unconsumed messages for an unlimited amount of time. They determine at the time the message is received who the valid consumers are, and then track which of these consumers have actually read the message. A message is finally “deletable” only when all of the consumers valid at the time of message arrival have processed it. Tracking this is complex - particularly when the numbers of consumers reaches into the thousands.

There are many message broker implementations which are clusterable in that a set of broker instances can be run, and cooperate together. However usually each topic has an “owning node” which manages that topic, leading to potential scalability problems for topics with large numbers of messages.

Kafka takes an unusual approach to two core concepts (topic partitioning and message retention) which results in much simpler solutions to all the above issues.

What Makes Kafka Different

Topic Partitions

One single feature of Kafka is responsible for most of its scalability: messages are published not to a topic but to a (topic, partition) pair.

In Kafka, when a topic (queue) is defined, the number of partitions for the topic is also defined. Each partition results in an open file somewhere in the Kafka cluster, so a topic should not have an excessive number of partitions defined. However the number of partitions also sets the maximum parallelism for consumers for the topic (a topic with only one partition cannot support load-balancing). A single Kafka node can handle hundreds of open files concurrently, ie hundreds of partitions. A cluster of a few thousand Kafka nodes scales linearly to hundreds of thousands of partitions if really needed.

When a message is published to a topic (via a Kafka client library), the producer specifies (explicitly or implicitly) the partition to be used for the message. The client library then determines which Kafka node is currently responsible for that partition, and the message is sent directly to that Kafka node. The message is appended to the current file for that (topic, partition) - and also replicated to any “backup” nodes for that partition. Choosing a partition for a message is discussed further later. These files are simply on the local filesystem - no distributed filesystem is required.

When an application wants to read messages from Kafka, three values are important: the topic, the partition-id(s), and the offset in each partition. Any process connecting to Kafka can specify these manually to read messages, and this allows a lot of flexibility; clients can invent their own ways of deciding which applications see which messages. However usually the client leaves these values to Kafka and instead specifies only the topic and a “consumer group”. Client processes using the same group “cooperatively” process messages, ie messages are distributed across the group to achieve load-balancing, while client processes using different consumer groups see the same messages ie are independent consumers (“broadcast”).
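
As a concrete illustration, here is a minimal “load-balancing” consumer sketch using the standard Java client library; the broker address, topic name and group name are placeholders, and error handling is omitted.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost:9092");  // placeholder broker address
        props.put("group.id", "example-group");             // processes sharing this group id share the partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("example-topic"));  // partitions are allocated by Kafka
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);  // timeout in milliseconds
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}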

When a topic has N partitions, and only one client (consumer) for a particular consumer-group is connected, then that client is allocated all partitions. When a second client process subscribes to the same topic using the same group, then half of the partitions are “taken away from” the first client, and assigned to the second one. As more “load-balancing” clients (ones with the same consumer-group) subscribe to the same topic, responsibility for the partitions is progressively redistributed. Maximum parallelism is reached when there is one consumer for each partition in the topic; further clients are not assigned any partitions at all and will sit idle - unless one of the existing consumers closes its connection or fails to respond, in which case an “idle” consumer will take over the failed consumer’s responsibilities.

At no time will two clients within the same consumer-group be responsible for the same partition, and thus messages within a (topic, partition) pair are always processed in the same order they were submitted. Message ordering is not guaranteed between partitions, as different consumers may be executing at different speeds. This does imply that if order must be preserved for all messages in a topic, then that topic must have only one partition, and processing of messages from that topic via load-balancing is not possible.

Having only a single reader per (consumer-group, partition) also makes it very simple for Kafka to track the “next message to return” for a consumer; it is always just a single offset for each (consumer-group, partition). Kafka stores that information internally, and persistently. However a consumer is also permitted to modify the current offset within a partition if it wishes, allowing sophisticated consumers to perform their own offset tracking - in particular, allowing clients to implement “transactional reads/rollbacks” (also known as “exactly once delivery”) via any mechanism they wish.
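
For consumers which do want that control, the Java client allows partitions to be assigned and offsets to be set explicitly. A minimal sketch follows; the partition number and offset are placeholders, and the consumer is assumed to have been created as in the sketch above but without calling subscribe (assign and subscribe are mutually exclusive).

import java.util.Collections;
import org.apache.kafka.common.TopicPartition;

// "consumer" is a KafkaConsumer<String, String> created as before, but with no subscribe() call.
TopicPartition partition = new TopicPartition("example-topic", 0);  // placeholder partition id
consumer.assign(Collections.singletonList(partition));              // take explicit responsibility for it
consumer.seek(partition, 42L);                                      // placeholder offset: resume from here
// subsequent poll() calls return messages starting at offset 42 of partition 0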

Note that the publisher of a message needs to carefully consider which partition a message should be assigned to. Subscribers to a topic are less aware of partitions; the Kafka library generally just handles them automatically. The only relevance partitions have for consumers is that message order is guaranteed within a partition, and the maximum parallelism is limited to the number of partitions.

The number of partitions in a topic can be modified later (Kafka v0.8.1 and later), although there are some limitations and complications. See later for information about how the client chooses the partition for a message, and see the official Kafka documentation for information about how to change the number of partitions.

Time-Based Message Retention

As each message is published to a (topic, partition) it is appended to the current file for that (topic, partition) pair. Periodically, based on either time or file-size, the current output file is closed and a new one is opened. Each topic has a configured “retention time”; partition-files are kept until the retention-time has been reached and are then deleted. Retention times are typically set to a few days, though this depends heavily upon the use-case.

This time-based deletion scheme is used instead of tracking the existence of consumers; it allows applications to be down for a period of time and still be able to retrieve their messages, while not imposing any complex admin overhead on Kafka. The maximum supported down-time is limited to the retention-time of the topic - but this can be temporarily extended by operations staff if necessary, as long as disk space is available.

Retaining messages for a fixed time-interval (rather than deleting messages when they are read) also allows consumers to “rewind” and “re-read” messages if desired. This is particularly useful in the presence of downstream bugs or failures; the problem can be fixed and messages reprocessed, as long as the retention time has not been exceeded. It is also very useful during development and testing; a single set of test-data within a Kafka topic can be repeatedly processed.

Most importantly, time-based retention makes it trivial to support multiple independent consumers to the same topic; tracking the “next message” for each consumer is simply a matter of retaining a single integer per partition per consumer-group - the most-recently-read message-id from that partition. There is no need to store a read/not-read flag per (consumer, message) as some other brokers do. The minimal amount of data needed per-consumer allows Kafka to scale to vast numbers of consumers. And when a consumer-group’s “most recently read” message-id becomes so old that it points to a message in a file whose retention-time has passed, then the consumer information can itself be deleted, providing a simple “auto-cleanup”. As noted above, consumers also have the option of tracking their offsets themselves, relieving Kafka of even this minimal overhead.

This approach to message retention does increase the amount of disk storage space required; conventional message-brokers can delete messages as soon as they are consumed by all “known” consumers. As disk is cheap, this is considered a reasonable tradeoff for the features and reduced implementation complexity.

Because each partition is “owned” by a Kafka node, and stored as a set of files (one active) on a single server, the total amount of data in a single partition is limited to the amount of storage available on a single server in the cluster. However as a topic is divided into multiple partitions, a topic overall can have many times that amount of data.

Unlike traditional message-brokers which calculate the “valid consumers” for a message when the message is first received, Kafka allows consumers to be registered at any time; if one uses a new (unknown) consumer-group then it will see all messages that have not yet reached their expiry time. This is particularly useful for development, testing, and recovery after various types of system failure.

Message Keys

Each message written to Kafka must be a (key, value) pair. The value is simply a block of bytes, and is not inspected by Kafka in any way. The key is also a block of bytes (though often the UTF8 encoding of a string), and is used by Kafka in several ways:

  • the key can be used to choose a partition - see the next section.
  • the key can also be used by “compacted topics” - see later.

In addition, the applications which consume messages can potentially use the key themselves, eg to hold “metadata” associated with the record. However encoding extra data in the key will affect the default partition allocation behaviour.

Choosing a Partition

The default algorithm used by Kafka client libraries to allocate a message to a partition is simply hash(message-key) % npartitions. This guarantees that all messages with an identical message-key are assigned to the same partition - and thus will be processed in the same order they were published. A financial system might publish messages using the account-id as a key for example, to ensure message order is preserved for all messages associated with the same account.

Alternatively, the client (publisher) process can specify a partition-id directly (after querying the topic meta-data to see how many partitions exist).
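
As a concrete illustration, here is a minimal producer sketch using the Java client, showing both the default key-based form and the explicit-partition form of ProducerRecord; the broker address, topic, keys, values and partition number are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost:9092");  // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Default partitioning: the partition is chosen from a hash of the key,
        // so every message with key "12345" lands in the same partition.
        producer.send(new ProducerRecord<>("transactions", "12345", "deposit 200"));

        // Explicit partitioning: the producer names the partition itself (here partition 3).
        producer.send(new ProducerRecord<>("transactions", 3, "12345", "withdraw 100"));

        producer.close();  // flushes any batched but unsent messages
    }
}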

Messages should of course be distributed evenly across all partitions for the topic; if partitions become unbalanced then throughput will be affected (some load-balancing consumers will be sitting idle while others are working).

Processing Data in Parallel

One of the use-cases for a message-broker in general, and Kafka in particular, is the ability to process very large volumes of data via parallel processing.

The amount of parallelism that can be applied depends very much on the nature of the input data. There are three cases to consider:

  • input records have no ordering constraints
  • input data consists of a large number of independent low-volume streams of ordered records
  • input data consists of a small number of independent high-volume streams of ordered records

Often it is the case that the order of incoming records is irrelevant for processing. Entries from a webserver logfile recording the URLs visited, for example, can be used to calculate statistics or advertising commissions without any need to preserve the order in which the visits happened. In such a case, a producer can write messages to Kafka with a null message key and the Kafka client library will ensure they are approximately evenly distributed across all partitions for the target topic - however many there are. The number of partitions for the topic can be arbitrary - and thus the amount of consumer parallelism can be selected to ensure the work gets done on time. Records may not be processed in quite the same order they arrived; for example, in a trivial system in which parallel consumers just append all records to a common file, the ordering in the file will not be quite the same as the order in which records were written by the producer - a mild reordering can be expected due to some consumers working faster than others.

Some datastreams can be considered as a set of independent streams, where records in each stream are ordered but there are no constraints between streams. An example could be records from ATMs belonging to a bank. Records associated with a specific account are ordered - depositing $200 then withdrawing $100 is not the same as withdrawing $100 then depositing $200, as there are penalties associated with overdrawn accounts. In effect there are a large number of low-volume ordered streams, which is a very parallelisable system; the maximum useful number of partitions is the number of independent streams - in this example, the number of accounts. Each record may be published to Kafka using the account-id as the key; this ensures all records for the same account land in the same partition, while accounts are spread approximately evenly across all partitions (and thus across the nodes hosting them).

A more difficult example is a factory with two large machines, each of which has a thousand sensors which take a measurement every millisecond. That’s a million records per second per machine - definitely a big-data problem. However if order must be preserved for sensor readings from a specific machine, then the best that can be done is one partition per machine. That in turn limits the parallelism available to consumers - the code performing processing of this data will need to run on some serious hardware. In such cases, it might be necessary to give up on streaming processing; if each event includes a timestamp then they can simply be processed within Kafka as unordered data (see first case above), and written into a database. Batch processing can then be applied to the database, sorting by timestamp to recreate the correct ordering before performing analysis. The streaming processing in Kafka can still do some of the work, eg filtering out unwanted data, but the order-sensitive processing cannot be done in parallel with Kafka - or any other message-queue-like system.

Message IDs

Each message can be uniquely identified by (topic, partition, offset). The offset is not a “byte offset within a file”, but instead a logical counter of messages within a partition. When a consumer “seeks” within a partition (to rewind or skip messages), this is done via a message-id.

TODO: when a topic is repartitioned, what happens to the message IDs?

Data Serialization

Although message keys and values are just byte-arrays to Kafka, applications generally don’t want to pass byte-arrays around - they wish to transfer data structures. Any producer can of course itself serialize its data-structures into byte-arrays before passing them to the Kafka client library, and any consumer can accept byte-arrays from Kafka and deserialize them back into the original datastructure form. However given that this is such a common task, the standard Kafka client libraries have built-in support for message serialization. A producer can pass objects to the Kafka client library, and it will apply a configurable serializer to them automatically; similarly consumers can let the Kafka client library apply a deserializer to each message retrieved from a topic before it is passed to the consumer code for processing.

Allowing the Kafka client library to apply serialization/deserialization allows some interesting configurable behaviour - in particular, integration with the Confluent Schema Repository can be transparently enabled. See my article on Kafka serialization for more information.
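
With the Java client, for example, the (de)serializers are simply configuration properties naming a class. The sketch below uses the built-in string (de)serializers; an Avro or custom serializer class name could be substituted. The broker address and group name are placeholders.

import java.util.Properties;

// Producer side: String keys/values are serialized to bytes automatically.
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "kafkahost:9092");  // placeholder
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Consumer side: the matching deserializers turn the raw bytes back into Strings.
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "kafkahost:9092");  // placeholder
consumerProps.put("group.id", "example-group");            // placeholder
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");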

Transactional Writes and Reads

Some traditional message-brokers have a complex acknowledgement system, and some even support full XA transactions to allow read-from-broker and write-to-database to be atomic, with rollback support.

In Kafka, there is no transactional support for publishing messages; they are simply sent or not. Normally messages are gathered into batches (limited by size or time-interval) for each destination Kafka node. Kafka then sends back acknowledgements when the messages have been successfully persisted (and optionally replicated). However there is no support for “unpublish” or “prepare-to-publish, I’ll confirm later”.

Reading messages is a little more sophisticated; Kafka provides an optional mechanism for tracking the “current offset” within each partition of a topic for any number of consumer-groups (cooperative load-balancing sets of processes). This allows a consumer application to “continue where it left off”, as long as it shut down cleanly. However this mechanism is not atomic or transactional; on crash of a consumer application messages may have been processed without the associated offset having been updated. In order to truly provide “exactly once” processing for messages, a consumer application needs to manage offsets itself; see the section entitled “Reliable Delivery”.

There has been considerable discussion within the Kafka developer group about transaction support for Kafka, and it may come in some future release. See the Kafka wiki for further information on this topic.

Reliable Delivery

Any queue-based and stream-based software will deliver messages received from its configured sources to its configured targets under normal conditions. However when things go wrong (network problems, crashing servers) systems usually provide one or more of the following delivery guarantees:

  • at most once
  • at least once
  • exactly once

Option “at most once” means that the target system will never receive duplicate messages, but some input messages may be lost. Option “at least once” means that no message will ever be lost, but that some messages may be delivered twice. Option “exactly once” is obviously the most desirable, but is not always possible to provide or has performance limitations.

The following sections address the options for tuning behaviour in failure conditions.

Reliable Consumers

Kafka consumers generally work as follows:

# startup
find out which kafka topic partitions this consumer process has been allocated
fetch the last-known-offset for each partition

# main processing (acks/offset-management not shown here)
forever:
  for each allocated partition:
    fetch a batch of messages from the partition at the current offset
    for each message in batch:
      send to target system
    flush buffers to target

Usually, consumers use the Kafka-provided offset tracking system; the Kafka client library provides an API to “update offset for partition X” (ie “acknowledge received messages”) which internally sends a message to the Kafka node which is acting as the “group manager” for that topic. On startup, the “group manager” can be queried to find the current offset for the topic partitions.

The consumer may send the acknowledgement (ie update the current offset) as soon as it has fetched a batch of messages; in the case of a crash this leads to “at most once” delivery, as the offset has been updated before the messages have been processed.

The consumer may instead send the acknowledgement after processing a batch of messages and flushing all outputs to the target systems. In the case of a crash, this leads to “at least once” delivery, with maximum duplication of 1 batch of messages on restart, as up to 1 batch of messages may have been processed without having updated the offset.

The consumer may choose to be more cautious and make an acknowledgement (ie update offset) after each message is processed:

forever:
  for each allocated partition:
    fetch a batch of messages from the partition at the current offset
    for each message in batch:
      send to target system
      flush buffers to target system and wait for acknowledgement from target
      send offset-update message to kafka group manager

On crash this still leads to “at least once” delivery, but reduces the duplication to at most one message. The disadvantage is a major performance impact.
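
With the Java client, this trade-off is controlled by disabling auto-commit and choosing when to call commitSync. Below is a sketch of the “acknowledge after each processed batch” variant described above; the target-system calls are hypothetical placeholders, and the consumer is assumed to have been created with "enable.auto.commit" set to "false" and subscribed as in the earlier consumer sketch.

// "consumer" has "enable.auto.commit" = "false" and is subscribed as in the earlier sketch.
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        sendToTargetSystem(record);  // hypothetical application-specific processing
    }
    flushTargetSystem();             // hypothetical: ensure the whole batch is really persisted
    consumer.commitSync();           // only now update the offsets held by the group manager
}
// A crash before commitSync() means the batch is re-delivered on restart: "at least once" delivery.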

When the target system is transactional, the consumer may choose not to use the Kafka-standard offset tracking mechanism, and instead manage offsets itself:

# startup
find out which kafka topic partitions this process has been allocated
fetch the last-known-offset for each partition from target system

forever:
  for each allocated partition:
    fetch a batch of messages from the partition at the current offset
    start transaction on target system
    for each message in batch:
      add to transaction
    add offset-update to transaction
    commit transaction

In this approach, the offsets are stored in the same system as the data, so that a transactional commit (or rollback) affects the data and the offsets simultaneously. When data has been committed, then so has the related offset information; on crash the transaction will be rolled back - together with the offset data. This solution is efficient and reliable - but requires a transactional target system and the ability to store offset data within it (eg the right to define a special table for that purpose).
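
A heavily condensed sketch of this pattern using JDBC is shown below. The offset table and the read/insert/update helper methods are hypothetical; the consumer is assumed to use explicit partition assignment with auto-commit disabled, and the JDBC connection is assumed to have auto-commit disabled too.

// "consumer" uses assign() rather than subscribe(), with "enable.auto.commit" = "false";
// "connection" is a java.sql.Connection with auto-commit disabled.
for (TopicPartition tp : assignedPartitions) {
    long storedOffset = readOffsetFromDatabase(connection, tp);  // hypothetical: e.g. SELECT from an offsets table
    consumer.seek(tp, storedOffset);                             // resume from where the database says we were
}

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        insertRecordIntoDatabase(connection, record);            // hypothetical data write, part of the open transaction
        updateOffsetInDatabase(connection,                       // hypothetical offset write, same transaction
                new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
    }
    connection.commit();  // data and offsets become visible atomically; on crash both roll back together
}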

An alternative to implementing “exactly once delivery” in the Kafka layer is to either simply live with duplicates (they are likely to be extremely rare as long as your software environment is stable) or to rely on the target system to filter out duplicates later. If every message has some kind of unique ID allocated in the source system, then filtering within the target system is reasonably easy (or trivial if the ID is the unique key of data being inserted into a database). Given the difficulties of guaranteeing exactly-once behaviour in Kafka producers, this is an option well worth considering.

Reliable Producers

While consumers have a number of options for minimising or even avoiding duplicates, this is more difficult on the producer side.

Whether a producer is reading from a Kafka queue, some other message-system, or files on disk, similar problems need to be dealt with: on crash or network problems, which messages should be resent to the Kafka topic and which not?

Storing offset information locally within the producer cannot be reliable - after a batch of messages has been sent to a broker for appending to a topic partition, there is a time-window in which the producer does not know whether those messages have been written or not. If the local offset is updated before acknowledgement then “at most once” delivery may occur on crash; if the local offset is updated after acknowledgement then “at least once” delivery may occur on crash. In addition, storing offsets “locally” on a specific server is not ideal for error recovery.

Kafka does not provide the ability to atomically write messages to two topics; it is always possible that one write succeeds and the other does not. Therefore it is not possible to track offsets reliably via Kafka either.

The best current option when “exactly once” behaviour is absolutely required for a producer is to embed the offset information into each message. On producer startup, it should read the most recent message from each partition to determine where it should resume from. This is somewhat complicated to implement, and does require modifying the message body (or the key).
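
One way to locate that resume point with the Java client is to briefly attach a consumer to the destination topic, step back one message from the end of each partition, read it, and extract the embedded source offset. A rough sketch (the topic name and the extractSourceOffset helper are hypothetical):

// "consumer" is a KafkaConsumer<String, String> used only at startup to find resume points.
for (PartitionInfo info : consumer.partitionsFor("destination-topic")) {  // hypothetical topic name
    TopicPartition tp = new TopicPartition(info.topic(), info.partition());
    consumer.assign(Collections.singletonList(tp));
    consumer.seekToEnd(Collections.singletonList(tp));
    long end = consumer.position(tp);          // offset one past the last message in the partition
    if (end == 0) continue;                    // partition is empty: nothing has ever been published
    consumer.seek(tp, end - 1);                // step back to the last message
    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {  // a single poll is assumed sufficient here
        long sourceOffset = extractSourceOffset(record.value());  // hypothetical: offset embedded in the message body
        // resume publishing from sourceOffset + 1 for the source stream feeding this partition
    }
}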

Alternatively, if the source data already includes a unique ID then the producer may be able to deduce the “current offset” from that - or simply to accept that “at least once” delivery may occur via Kafka, and filter out duplicates afterwards using the unique ID.

Kafka as a Persistent DataStore

It is not unusual for a process (or set of processes) to keep an in-memory collection tracking data which has flowed through the system within a time-period (window). However, if such a process needs to be restarted, how can its data be retained? One solution is to use a Kafka topic - as data is cached, also write it to a topic. On startup, simply read all messages in the topic to rebuild the in-memory cache. The topic retention-time can be used to discard older, unwanted data.

It is also not unusual for a process to keep an in-memory (key->value) structure tracking the most recent value for any key. Again, if such a process needs to be restarted, how can its state be retained? A Kafka topic can be used, writing (key,value) pairs to it before updating the in-memory cache. On restart, all messages in the topic can be re-read to rebuild the cache. Kafka has special support for this kind of usage - compacted topics. As with normal partitions, the “current file” is periodically closed and a new one started. A background task in Kafka scans the closed files, looking for messages with the same key and writing the most recent message for each key to a new file; the old files are then deleted. The result is that when a series of (key, value) entries are written to the topic, and the topic later re-read, then the most-recent (key,value) for any key will definitely be found - older ones will eventually be purged, though not immediately.
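
A sketch of the cache-rebuilding half of this pattern is shown below. The topic name is a placeholder, the topic is assumed to have been created with cleanup.policy=compact, and the consumer is assumed to have been created as in earlier sketches, without a subscription.

// Rebuild an in-memory key->value cache from a compacted topic on startup.
Map<String, String> cache = new HashMap<>();
List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo info : consumer.partitionsFor("cache-topic")) {  // placeholder topic name
    partitions.add(new TopicPartition(info.topic(), info.partition()));
}
consumer.assign(partitions);
consumer.seekToBeginning(partitions);                  // re-read everything still retained in the topic
ConsumerRecords<String, String> records;
while (!(records = consumer.poll(1000)).isEmpty()) {   // crude "caught up" test, adequate for a sketch
    for (ConsumerRecord<String, String> record : records) {
        cache.put(record.key(), record.value());       // later duplicates simply overwrite older values
    }
}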

Reliable Message Submission

Some message-brokers (particularly, IBM MQ-series) run an “agent” on each host. A publisher application submits messages to the local agent which then is responsible for passing the message over the network. This approach has the advantage that handling network errors is then the responsibility of the agent process rather than the publisher application.

Kafka doesn’t itself provide this feature; each client application is expected to open a network connection to relevant Kafka nodes (partition managers), and will simply fail (report errors) if Kafka cannot be contacted. However the bruce project provides a daemon service that can fill the role of a “reliable local delivery agent” for Kafka.

Why Kafka is so Fast

There are three reasons why Kafka can handle such large volumes of messages:

  • file-io is append-only (streaming) writes, no seeks
  • producers and consumers connect directly to end nodes (no intermediates)
  • network traffic is batched

File IO

When a Kafka broker instance receives a message, it simply appends it to the current open file (ie writes to an open file-descriptor) and flushes the data from application memory to kernel memory. When the current file grows too large, the Kafka broker closes it and opens a new one. That’s all it does - no management of b-trees, no seek+write operations ever, no need to compute the set of consumers for the message or write copies of a message to multiple queues.

Streaming writes are very disk-friendly and kernel-friendly. Rotating storage does not like seeks at all. Solid state disks handle random reads much better, but do not like small random writes (they write only in whole blocks, and any write smaller than a block results in a read/merge/erase/write cycle). Operating system kernels also love sequential disk IO; they do a lot of management and tracking of IO operations internally, and sequential operations make this internal admin much easier.

In general, reads of messages from a Kafka broker are also sequential; consumers read blocks of messages at a time. Reading a block of messages is potentially a file-open operation, then a seek and a sequential read of a reasonable amount of data. This is at least moderately efficient. Unlike some other message-brokers, read operations do not require the stored data to be updated - no need to mark messages as “read” or to delete them.

To discard unwanted messages, Kafka just deletes whole files of messages where the newest message is older than the topic “retention time”. No need to delete individual messages, manage b-trees, etc.

Handling Kafka “compacted topics” is a little trickier; a background process (per node) reads all files in a topic except the “current” one, determines the newest value for each unique key, and writes the (key, latest-value) entries to a new file. The processed files are then deleted. Again, this involves only streaming writes and whole-file deletions, so it is also reasonably scalable. The “current output file” is never touched by this process. Compaction is required to always preserve the latest value for each key, but is not required to remove all other entries for that key - ie duplicates may still remain. This allows some performance optimisations - “mostly compacted” is still good enough.

Just as a side-note: this approach of streaming writes, periodic file-rollover, and a background process which periodically replaces older files with compacted versions, is found in a number of big-data projects. In particular, Apache HBase uses a similar technique to provide a database-like system with very high write performance. This is often called “log structured storage”, and Kafka is sometimes described as being a “distributed commit log”.

When replication is configured for a topic (and it usually should be) then the master Kafka node for a partition will first write to disk, and then forward the data to the node(s) hosting the replicas. The sent data is just a copy of the input; no complex transformations need to be made. Whether the master node waits for the replicas to confirm receipt before acknowledgement is sent to the original producer is specified in the producer’s original write-request.
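
In the Java producer this is controlled by the "acks" configuration property, set before the producer is created (as in the earlier producer sketch):

// How many acknowledgements the partition leader waits for before replying to the producer:
// "0" = don't wait at all, "1" = leader has written the message, "all" = all in-sync replicas have it.
props.put("acks", "all");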

Direct Writes

Producers determine which partition each message belongs in, query cluster state to determine which node is the current “master” for that partition, and then send the message directly to that node. This has the disadvantage that a producer must have direct network access to each node in the Kafka cluster. It has the advantage that a message never needs to pass through a “gateway” to get to the node that will actually store it.

The process is similar for consumers; they are told by Kafka which partitions they should read from, and cluster state tells them which node is currently the “master” for each of those partitions. The consumers then make read operations specifying (topic, partition, offset, count) directly to that node.

Batching

The standard Kafka client libraries try to batch network traffic for producers so that a single write-request transfers multiple messages.
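
On the producer side, batching behaviour is influenced mainly by two configuration properties, again set before the producer is created; the values below are illustrative rather than recommendations.

props.put("batch.size", 16384);  // maximum bytes per per-partition batch before it is sent
props.put("linger.ms", 5);       // wait up to 5ms for further messages to fill a batch before sending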

Similarly, the Kafka client libraries batch read-requests for consumers. As noted in the section on file-io, within a single “consumer group” there is only one reader for each partition; parallelism is achieved by partitioning data before writing to Kafka rather than trying to have multiple consumers read from the same partition. This has the benefit that reads of a partition can be batched without “stealing” work from other readers in the same consumer group.

Installing Kafka

There is nothing particularly complex about installing Kafka - in fact, it is easier to install than many other message brokers.

The only quirk is that the Kafka server nodes require access to a Zookeeper cluster - something that you almost certainly already have if you are regularly working with clusters. Note that older versions of Kafka (<0.9.0) required client applications to contact the Zookeeper cluster directly, and therefore Zookeeper node addresses were part of the configuration required by Kafka clients. Since 0.9.0, Zookeeper is relevant only for the Kafka server nodes; client applications are not exposed to Zookeeper and require no Zookeeper libraries or network addresses.

Update: as of version 3.3, Zookeeper is optional. It is expected that as of version 4.x it will no longer be supported at all.

Monitoring Kafka

Each Kafka broker node exposes a number of JMX endpoints which can be used to monitor that node. Each application which uses the Kafka client library also exposes JMX endpoints, provided the JVM is set up to expose JMX.

There is also a tool named Burrow which does not monitor consumers directly, instead monitoring the “group managers” to measure the changes in HEAD and TAIL offsets of topics, and to record rebalances that happen as consumers are added to a topic (ok) or removed from a topic (possibly indicating a crash). Burrow requires the Go runtime and the Go Package Manager (GPM), so it is a little awkward to install.

The Confluent Control Center (commercial product) includes dashboards for monitoring Kafka installations. Among other techniques, they take advantage of the generic “configurable interceptor” feature of the Kafka client library (v0.10 or later) to plug in a library (Confluent Metric Interceptor) which gathers statistics in-memory for each producer/consumer and periodically writes them to a dedicated Kafka topic; this is obviously easier to manage than requiring knowledge of the JMX address of each producer or consumer application. It is not currently clear to me whether the support-metrics-client project on Github is the code for the “metrics interceptor” or whether the interceptor is closed-source.

Securing Kafka

Kafka supports both Kerberos and SSL encryption and authentication. I have personal experience with both, and they work fine.

This Github project provides a Vagrant configuration file which creates a Kafka cluster with authentication enabled.

The Kafka API

The client-to-Kafka network protocol is carefully specified by the Kafka developers, allowing client libraries to be implemented in various languages.

Kafka was originally implemented in the Scala language, and for Scala developers there is a good client library available.

For Java developers, there is a good client library which provides an API in package “org.apache.kafka”. Note that there is an older API in package “kafka.javaapi” which is a rather clumsy wrapper around the Scala API; avoid this.

For other languages, see the Kafka wiki.

There is no JMS (Java Message Service) provider library for Kafka; while it has similar features to other message-brokers that do have JMS interfaces, it doesn’t quite fit into the datamodel assumed by JMS.

The Kafka REST Proxy

Kafka brokers provide only a binary API; client applications need to use a suitable library to communicate with a Kafka broker.

The company Confluent provides an open-source Kafka REST Proxy server, which offers a REST API for reading and writing messages and forwards such requests to the Kafka cluster in binary form. Of course this REST approach is far (far) slower than using binary Kafka clients, but it can be useful in some circumstances.

Similarities between Kafka and MapReduce

Interestingly, the MapReduce algorithm has significant similarities with the way the Kafka message broker works. A “map” phase and the subsequent “shuffle” is effectively partitioning data in the same way that the Kafka client library does, allocating records that belong together to the same partition and thus a single node in the cluster. And a “reduce” phase is effectively the same as a Kafka message consumer - it processes records in an input partition in a single-threaded manner, while allowing data in other partitions to be processed in parallel.
