Apache Kafka 0.9 Overview

Categories: BigData

Introduction

Apache Kafka is a scalable and high-performance message broker. It can be deployed as a single node; more importantly, however, it can be clustered to handle arbitrary loads - millions of records per hour, and many thousands of concurrent publishing and subscribing processes even on the same topic, are no problem given enough hardware. It also supports data replication for robustness, allowing it to continue running despite failed nodes or disks - a very important feature once a cluster grows large enough.

This article gives a brief introduction to Apache Kafka, most useful for those who are not yet familiar with it. It does assume some familiarity with the concept of message-brokers/message-queuing.

The official Kafka documentation is very good; this article just gives a general overview and then addresses a few items that I personally found lacking in the standard docs.

Message Brokers, Publishers/Producers, Subscribers/Consumers

A message broker sits between different processes, and passes messages between them. A message submitted by a publisher process to a topic may be passed on to one subscriber process or multiple subscriber processes.

There are three basic reasons to use a message broker:

  • so that a publisher (aka producer) can send off a message without caring whether there are zero, one, or many different kinds of program interested in receiving that message;
  • so that a publisher (aka producer) can send off a message without caring whether the expected subscriber (aka consumer) is ready for it yet - or in fact, whether the subscriber/consumer is even running at the moment; or
  • so that data generated by a publisher can be consumed by multiple identical subscribers running in parallel to improve throughput.

The first provides decoupling between the application that sends a message and the applications that receive it; they can be developed quite separately, they don’t need to know each other’s network addresses, and more receivers of messages (for the existing topics) can be added at any time without code changes to any existing components. Actually, additional publishers can also be added (for existing topics), although this is less usual.

The second provides buffering between the application that sends a message and the application that receives it. The publisher/producer can generate short bursts of messages faster than the downstream app can handle them, without having to slow down - the message broker does the buffering. The publisher or subscriber can also be stopped and restarted (eg for version upgrades) without any complex synchronization; the publisher and subscribers are connected only to the message broker, not directly to each other, and the message-broker (optionally) provides buffering while a subscriber is down.

The third provides scalability.

In general, the words Producer/Consumer are used to indicate a 1:1 message transfer pattern, where a single message from a Producer is processed by a single Consumer application - ie exactly one piece of logic will be applied to the message (though there may be a pool of identical instances of this Consumer application for throughput reasons, ie load-balancing). In contrast, the terms Publisher/Subscriber are used when a single message from a Publisher is sent to multiple different applications, ie it may be processed in multiple different ways (though each of these “different applications” may itself be a pool of identical instances for throughput reasons). From the originating application’s point of view, Producer and Publisher are identical - a message is sent to a logical topic within a message-broker. From the point of view of applications processing messages, the terms are also identical - they connect to the message-broker and want to be given all messages for a specific logical topic. The only difference lies within the message-broker itself, in deciding when a message is “consumed”: in a 1:1 pattern a message is consumed when one external application has received it, while in a pub/sub pattern it is consumed only when every subscriber has received it.

Kafka supports both 1:1 and 1:N models, in an elegant unified manner. Kafka documentation tends to apply the terms Producer/Consumer for both kinds of usage, as will this article. The verb “to publish” is used to indicate a Publisher writing to a topic, and “to subscribe” is used to indicate the process of registering a Consumer process with the message broker.

Publishing data to a topic where there are one or more different types of consumer, each with a large number of instances for increased throughput, is where Kafka shines.

The Issues with Traditional Message Brokers

There have been Message Broker implementations for decades, and they sit at the core of many very important software systems. However, the pre-Kafka message brokers share some limitations which Kafka was invented to solve. The primary problems are:

  • Message Ordering and Grouping
  • Transactional Behaviour
  • Message Deletion (ie effective use of disk and memory)
  • Scalability
  • Performance

In the basic message-broker pattern, messages are simply written to a “topic”, and when multiple apps are reading from the topic for the purpose of load-balancing then messages are just handed out, in order, to the next free consumer. However it is common for some messages to be “related to each other”, such that they should all be processed by the same consumer instance (eg so it can use some kind of in-memory cache). Possibly more importantly, ensuring that all messages in a “group” are handled by the same process instance ensures they are processed in the same order they were submitted. Advanced message-broker implementations do support this, by allowing the publisher to set a “message group” ID on each message. However in many message-brokers the internal code needed to implement this behaviour is very complex due to the general broker design; performance and scalability suffer, and strange behaviour can occur when the responsible subscriber fails or is restarted.

For reliable message processing, the broker needs to correctly handle the case where a consumer fetches a message, then crashes before it completely processes that message; the “not completely processed” message should be made available to other consumers. Traditional message brokers implement something similar to database transaction-handling, where a message is “tentatively” handed out to a consumer as part of a “transaction” that the consumer then “commits” to indicate the message truly has been processed. The implementation in many message-brokers is complex and slow, due to their overall design.

Traditional message brokers keep unconsumed messages for an unlimited amount of time. They determine, at the time the message is received, who the valid subscribers are, and then track which of these subscribers have actually read the message. A message is finally “deletable” only when all of the subscribers valid at the time of message arrival have processed it. Tracking this is complex - particularly when the number of subscribers reaches into the thousands.

There are many message broker implementations which are clusterable, in that a set of broker instances can be run and cooperate. However, usually each topic has an “owning node” which manages that topic, leading to potential scalability problems for topics with large numbers of messages.

Kafka takes an unusual approach to two core concepts (topic partitioning and message retention) which results in much simpler solutions to all the above issues.

What Makes Kafka Different

Topic Partitions

One single feature of Kafka is responsible for most of its scalability: messages are published not to a topic but to a (topic, partition) pair.

In Kafka, when a topic (queue) is defined, the number of partitions for the topic is also defined. Each partition results in an open file somewhere in the Kafka cluster, so a topic should not have an excessive number of partitions defined. However the number of partitions also sets the maximum parallelism for consumers for the topic (a topic with only one partition cannot support load-balancing). A single Kafka node can handle hundreds of open files concurrently, ie hundreds of partitions. A cluster of a few thousand Kafka nodes scales linearly to hundreds of thousands of partitions if really needed.

When a message is published to a topic (via a Kafka client library), the publisher specifies (explicitly or implicitly) the partition to be used for the message. The client library then determines which Kafka node is currently responsible for that partition, and the message is sent directly to that Kafka node. The message is written to the current file for that (topic, partition) - and also replicated to any “backup” nodes for that partition. Choosing a partition for a message is discussed further later. These files are simply on the local filesystem - no distributed filesystem is required.
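
As an illustration, here is a minimal sketch of publishing via the Java client library (package org.apache.kafka); the broker address “kafka1:9092”, the topic “example-topic” and the key “account-42” are placeholders, not anything defined by Kafka:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimplePublisher {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092");  // placeholder broker address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            // the client library derives the partition from the key and sends the record
            // directly to the Kafka node currently responsible for that partition
            producer.send(new ProducerRecord<>("example-topic", "account-42", "some payload"));
            producer.close();
        }
    }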

When a process subscribes to a Kafka topic (via a Kafka client library), it does not specify which partitions to read from; instead Kafka tells the consumer which partitions it should (currently) read from. When a topic has N partitions, and the first client subscribes to it, then that client is allocated all partitions. When a second identical client process subscribes to the same topic for purposes of “load-balancing”, then half of the partitions are “taken away from” the first client, and assigned to the second one. As more “load-balancing” clients subscribe to the same topic, responsibility for the partitions is progressively shuffled around. Maximum parallelism is reached when there is one subscriber for each partition in the topic; further clients are not assigned any partitions at all and will sit idle - unless one of the existing subscribers closes its connection or fails to respond, in which case an “idle” subscriber will take over the failed subscriber’s responsibilities.
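
The subscribing side, sketched with the same Java client, might look like the following; the group name “invoice-processors” is a placeholder. Every process started with the same group.id joins the same load-balancing group, and Kafka divides the topic’s partitions among the group members:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SimpleSubscriber {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092");  // placeholder broker address
            props.put("group.id", "invoice-processors");    // members of this group share the partitions
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("example-topic"));  // Kafka decides which partitions this process reads
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }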

At no time will two subscribing processes within the same “load-balancing” group be responsible for the same partition, and thus messages within a (topic, partition) pair are always processed in the same order they were submitted. Message ordering is not guaranteed between partitions, as different subscribers may be executing at different speeds. This does imply that if order must be preserved for all messages in a topic, then that topic must have only one partition, and processing of messages from that topic via load-balancing is not possible.

Having only a single reader per (load-balance-group, partition) also makes it very simple for Kafka to track the “next message to return” for a subscriber; it is always just a single offset for that load-balancing group. Kafka stores that information internally, and persistently. However a subscriber is also permitted to modify the current offset within a partition if it wishes, allowing sophisticated subscribers to perform their own offset tracking - in particular, allowing clients to implement “transactional reads/rollbacks” via any mechanism they wish.

Multiple processes can independently subscribe to a topic; processes in different load-balance-groups will be able to read the same messages independently - effectively implementing “broadcast” rather than “load-balance” behaviour.

Note that the publisher of a message needs to carefully consider which partition a message should be assigned to. Subscribers to a topic are less aware of partitions; the Kafka library generally just handles them automatically. The only relevance partitions have for subscribers is that message order is guaranteed within a partition, and the maximum parallelism is limited to the number of partitions.

The number of partitions in a topic can be modified later (Kafka v0.8.1 and later), although there are some limitations and complications. See later for information about how the client chooses the partition for a message, and see the official Kafka documentation for information about how to change the number of partitions.

Time-Based Message Retention

As noted above, as each message is published to a (topic, partition) it is written to the current file for that (topic, partition) pair. Periodically, based on either time or file-size, the current output file is closed and a new one is opened. Each topic has a configured “retention time”; partition-files are kept until their retention-time has been reached and are then deleted. Retention times are typically set to a few days, though it depends heavily upon use-case.

This time-based scheme is used instead of tracking the existence of subscribers; it allows applications to be down for a period of time and still be able to retrieve their messages, while not imposing any complex admin overhead on Kafka. The maximum supported down-time is limited to the retention-time of the topic - but this can be temporarily extended by operations staff if necessary, as long as disk space is available.

Retaining by time also allows subscribers to “rewind” and “re-read” messages if desired. This is particularly useful in the presence of downstream bugs or failures; the problem can be fixed and messages reprocessed, as long as the retention time has not been exceeded. It is also very useful during development and testing; a single set of test-data within a Kafka topic can be repeatedly processed.

Most importantly, time-based retention makes it trivial to support multiple subscribers to the same topic; tracking the “next message” for each subscriber is simply a matter of retaining a single integer - the most-recently-read message-id. There is no need to store a read/not-read flag per (subscriber, message) as some other brokers do. The minimal amount of data needed per-subscriber allows Kafka to scale to vast numbers of subscribers. And when a load-balancing-group’s “most recently read” message-id becomes so old that it points to a message in a file whose retention-time has passed, then the subscriber information can itself be deleted, providing a simple “auto-cleanup”. As noted above, subscribers also have the option of tracking their offset themselves, relieving Kafka of even this minimal overhead.

This approach does increase the amount of disk storage space required; conventional message-brokers can delete messages as soon as they are consumed by all “known” subscribers. As disk is cheap, this is considered a reasonable tradeoff for the features and reduced implementation complexity.

Because each partition is “owned” by a Kafka node, and stored as a set of files (one active) on a single server, the total amount of data in a single partition is limited to the amount of storage available on a single server in the cluster. However as a topic is divided into multiple partitions, a topic overall can have many times that amount of data.

Unlike traditional message-brokers which calculate the “valid subscribers” for a message when the message is first received, Kafka allows subscribers to be registered at any time; if one uses a new (unknown) subscriber-group then it will see all messages that have not yet reached their expiry time. This is particularly useful for development, testing, and recovery after various types of system failure.

Message IDs

Each message can be uniquely identified by (topic, partition, offset). The offset is not a “byte offset within a file”, but instead a logical counter of messages within a partition. When a subscriber “seeks” within a partition (to rewind or skip messages), this is done via a message-id.
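
As a sketch, seeking with the Java consumer API looks roughly like this (the topic name, partition number and offset are placeholders; the helper method is purely illustrative):

    import java.util.Arrays;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SeekExample {
        // rewind an already-configured consumer to a specific (topic, partition, offset)
        static void rewind(KafkaConsumer<String, String> consumer, String topic, int partition, long offset) {
            TopicPartition tp = new TopicPartition(topic, partition);
            consumer.assign(Arrays.asList(tp));  // manual partition assignment (no group balancing)
            consumer.seek(tp, offset);           // the next poll() returns records starting at this logical offset
        }
    }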

TODO: when a topic is repartitioned, what happens to the message IDs?

Message Keys

Each message written to Kafka is a (key, value) pair (the key may be null). The value is simply a block of bytes, and is not inspected by Kafka in any way. The key - commonly a string, though as far as Kafka is concerned it is also just bytes produced by the configured key-serializer - is used in several ways.

The key can be used to choose a partition - see the next section.

The key can also be used by “compacted topics” - see later.

Choosing a Partition

For messages with a non-null key, the default algorithm used by Kafka to allocate a message to a partition is simply “hash(message-key) % npartitions”. This guarantees that all messages with an identical message-key are assigned to the same partition - and thus will be processed in the same order they were published. A financial system might, for example, publish messages using the account-id as the key, to ensure message order is preserved for all messages associated with the same account.

Alternatively, the client (publisher) process can specify a partition-id directly (after querying the topic meta-data to see how many partitions exist).
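
A brief sketch of both options, using the Java client’s ProducerRecord constructors (topic, key and partition number are placeholders):

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PartitionChoice {
        public static void main(String[] args) {
            // default: the client library hashes the key to choose the partition
            ProducerRecord<String, String> byKey =
                    new ProducerRecord<>("example-topic", "account-42", "payload");

            // explicit: the publisher names partition 0 itself; the key is still stored with the message
            ProducerRecord<String, String> byPartition =
                    new ProducerRecord<>("example-topic", 0, "account-42", "payload");
        }
    }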

Messages should of course be distributed evenly across all partitions for the topic; if partitions become unbalanced then throughput will be affected (some load-balancing subscribers will be sitting idle while others are working).

Transactional Writes and Reads

Some traditional message-brokers have a complex acknowledgement system, and some even support full XA transactions to allow read-from-broker and write-to-database to be atomic, with rollback support.

In Kafka, there is no transactional support for publishing messages; they are simply sent or not. Normally messages are gathered into batches (limited by size or time-interval) for each destination Kafka node. Kafka then sends back acknowledgements when the messages have been successfully persisted (and optionally replicated). However there is no support for “unpublish” or “prepare-to-publish, I’ll confirm later”.
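
A rough sketch of how this appears to a Java publisher is shown below; the “batch.size” and “linger.ms” settings control how the producer batches messages, “acks” controls when the broker acknowledges, and the callback simply reports success or failure - there is nothing to roll back. Broker address, topic and key are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class AckedPublisher {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092");  // placeholder broker address
            props.put("acks", "all");                       // acknowledge only once persisted (and replicated)
            props.put("batch.size", "16384");               // batch by size ...
            props.put("linger.ms", "50");                   // ... or by time interval
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<>("example-topic", "account-42", "payload"),
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace();  // the send failed; there is no "unpublish"
                            }
                        }
                    });
            producer.close();
        }
    }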

Reading messages is a little more sophisticated. For any “load-balancing group”, a single Kafka node is responsible for tracking “offset information” for the group.

A subscribing application actually “pulls” messages: the Kafka client library’s “get next” call connects to a Kafka node responsible for one of the partitions allocated to this subscriber, and requests the next N available records (batching is done for efficiency). The “current position” tracked by Kafka is not updated at this point. As the client consumes messages from this buffer, it periodically sends messages to the “group manager” for its load-balance group to update the current offset. Exactly when these updates are sent can be controlled by the subscriber. When reprocessing a message (“at least once delivery”) has no bad side-effects, the offset updates can be batched for efficiency. When reprocessing a message is undesirable but nevertheless tolerable, the offset can be updated more often. When “exactly once delivery” is required, the subscriber should take over offset management itself; one option for a subscriber which is already writing data to a database is to store the current Kafka message-id within the database, and include this update within the same database transaction. On restart it can then read this same database to determine which message-id it most recently successfully processed.
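
A sketch of the “at least once” style with the Java consumer: auto-commit is disabled, and the group offset is only advanced after a batch has been fully processed, so a crash causes the batch to be re-delivered rather than lost. Broker address, group name, topic and the process() helper are placeholders:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AtLeastOnceSubscriber {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092");  // placeholder broker address
            props.put("group.id", "invoice-processors");    // placeholder group name
            props.put("enable.auto.commit", "false");       // the application decides when offsets advance
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("example-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    process(record);       // application logic (placeholder)
                }
                consumer.commitSync();     // only now does the group offset advance; a crash before
                                           // this line means the whole batch will be re-delivered
            }
        }

        static void process(ConsumerRecord<String, String> record) {
            System.out.println(record.value());
        }
    }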

Kafka as a Persistent DataStore

It is not unusual for a process (or set of processes) to keep an in-memory collection tracking data which has flowed through the system within a time-period (window). However, if such a process needs to be restarted, how can its data be retained? One solution is to use a Kafka topic - as data is cached, also write it to a topic. On startup, simply read all messages in the topic to rebuild the in-memory cache. The topic retention-time can be used to discard older, unwanted data.

It is also not unusual for a process to keep an in-memory (key->value) structure tracking the most recent value for any key. Again, if such a process needs to be restarted, how can its state be retained? A Kafka topic can be used, writing (key,value) pairs to it before updating the in-memory cache. On restart, all messages in the topic can be re-read to rebuild the cache. Kafka has special support for this kind of usage - compacted topics. As with normal partitions, the “current file” is periodically closed and a new one started. A background task in Kafka scans the closed files, looking for messages with the same key and writing the most recent message for each key to a new file; the old files are then deleted. The result is that when a series of (key, value) entries are written to the topic, and the topic later re-read, then the most-recent (key,value) for any key will definitely be found - older ones will eventually be purged, though not immediately.
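
A sketch of rebuilding such a cache with the Java consumer (using the 0.9 API; broker address and topic are placeholders) reads every partition of the topic from the beginning, and treats a few consecutive empty polls as a crude sign that it has caught up:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class CacheRebuilder {
        // rebuild an in-memory key->value map by re-reading a (compacted) topic from the start
        static Map<String, String> rebuild(String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092");  // placeholder broker address
            props.put("enable.auto.commit", "false");       // position is managed locally, nothing is committed
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(topic, p.partition()));
            }
            consumer.assign(partitions);                                          // no group balancing
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));  // 0.9 varargs form

            Map<String, String> cache = new HashMap<>();
            int emptyPolls = 0;
            while (emptyPolls < 3) {  // crude "caught up" check, adequate for a sketch
                ConsumerRecords<String, String> records = consumer.poll(500);
                if (records.count() == 0) { emptyPolls++; continue; }
                emptyPolls = 0;
                for (ConsumerRecord<String, String> r : records) {
                    cache.put(r.key(), r.value());  // later values overwrite earlier ones
                }
            }
            consumer.close();
            return cache;
        }
    }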

Reliable Delivery

Some message-brokers (particularly, IBM MQ-series) run an “agent” on each host. A publisher application submits messages to the local agent which then is responsible for passing the message over the network. This approach has the advantage that handling network errors is then the responsibility of the agent process rather than the publisher application.

Kafka doesn’t itself provide this feature; each client application is expected to open a network connection to relevant Kafka nodes (partition managers), and will simply fail (report errors) if Kafka cannot be contacted. However the bruce project provides a daemon service that can fill the role of a “reliable local delivery agent” for Kafka.

Installing Kafka

There is nothing particularly complex about installing Kafka - in fact, it is easier to install than many other message brokers.

The only quirk is that the Kafka server nodes require access to a Zookeeper cluster - something that you almost certainly already have if you are regularly working with clusters. Note that older versions of Kafka (<0.9.0) required client applications to contact the Zookeeper cluster directly, and therefore Zookeeper node addresses were part of the configuration required by Kafka clients. Since 0.9.0, Zookeeper is relevant only for the Kafka server nodes; client applications are not exposed to Zookeeper and require no Zookeeper libraries or network addresses.

The Kafka API

The client-to-Kafka network protocol is carefully defined by the Kafka developers, allowing client libraries to be implemented in various languages.

Kafka was originally implemented in the Scala language, and for Scala developers there is a good client library available.

For Java developers, there is a good client library which provides an API in package “org.apache.kafka”. Note that there is an older API in package “kafka.javaapi” which is a rather clumsy wrapper around the Scala API; avoid this.

For other languages, see the Kafka wiki.

There is no JMS (Java Message Service) provider library for Kafka; while it has similar features to other message-brokers that do have JMS interfaces, it doesn’t quite fit into the datamodel assumed by JMS.
