Distributed Read Models (Part 2: Implementation)

Categories: Architecture

Introduction

This article presents strategies and guidelines for successfully sharing entity state via Kafka topics.

At willhaben our customer services are provided by a distributed system consisting of dozens of back-end components responsible for different DDD domains, and the associated data. These components often need access to data associated with other domains. Part 1 of this series talks about why we chose to make this data available via replication; this part discusses how we do it.

Sharing entity state via message-queues is not new. However, we haven’t seen any published description of how to implement it that provides the level of detail included in this article.

This article is in fact a mild rewrite of our internal guide on implementing Distributed Read Models (aka Event-carried State Transfer). There are quite a few plain assertions in this article, such as the ones recommending values for Kafka topic partition counts. These are values/choices that suit us; you might need to adjust for your environment. And although we use Java and Spring, the vast majority of the recommendations below are language-independent.

In case you haven’t read part 1, the TL;DR is: any of our back-end components which owns data that other back-end components need access to must publish this data to a compacted topic managed by a Kafka message broker. Any component needing that data should read from that topic and store it in whichever form it finds useful (typically a database). This enables components to provide their services without needing to communicate synchronously with other components, providing performance, scalability, security, and various other benefits.

This article assumes you are familiar with Kafka in general. It’s also important to understand Kafka’s compacted topic feature. There is good documentation available online, but in short a compacted topic acts rather like a key-value store: instead of deleting records after an expiry-time, Kafka permanently keeps the latest record for each key.

A note about the complexity of building Distributed Read Models: the length of this article looks a little intimidating, but in practice building a distributed read model for some model type isn’t all that hard. For the simplest approach it requires about 200 lines of code in the producer, and only about 50 lines of code in each consumer (assuming you have a persistence framework that makes storing data easy, a framework that easily allows connecting to Kafka, etc). It’s the sort of thing that might take a week for the first one, and only a day or two for similar models later. This article provides a lot of info about why specific approaches are recommended, but that’s not relevant to actually building the thing.

And finally, a disclaimer: this approach is relatively new to us. Our first such “read model data stream” is now about 1 year old, and we currently have 3 such streams with another 2 coming soon. So we can’t claim the guidelines here are a battle-proven solution — but it’s also not just theory.

Why the Name Distributed Read Model

The Model-View-Controller pattern is very well known. It is moderately well known that there doesn’t need to be one model; it’s often useful to maintain additional read-only models in parallel to the “core” model, in order to efficiently render specific views. The CQRS pattern, for example, is based on this idea. Here we are extending that concept so that the “read model” is not local to the application that “owns” the core data, but instead stored by a different component in a different database than the original, in whatever form that component requires — i.e. is a distributed read model. Some people may consider what is described here a variant of CQRS, and may call these distributed read models “projections” or “query models”.

The mechanism with which we keep these read-models updated is messages (aka events) passed via a message-broker; this pattern can also be found online under the name Event-carried State Transfer (somewhat of a pun on the name Representational State Transfer aka ReST).

Core Concepts of a Kafka-based Distributed Read Model

The owner of data writes messages to a Kafka compacted topic. When a topic is first set up, all relevant entities are written to the topic; this ensures that any new consumer has the full set of data available to it. New entities and changes are written to the topic when relevant, allowing existing consumers to keep their local copy of the data “up to date”.

These messages are current state snapshots of entities of a specific type, providing all relevant data for the entity¹. The message is a logical representation of the entity, and avoids exposing details that are not truly part of the “domain model”. Exactly what data should be in this logical representation is a difficult subject. A data owner which publishes “all available data” for an entity is more difficult to refactor later — these fields are now part of the public API of the component and can only be changed if it can be proven that no consumer relies on the data. However a data owner which publishes only minimal data for an entity may later receive change-requests to extend the published data; such changes are possible but non-trivial (see later). The best guide seems to be to concentrate on the domain model, ie the things that are truly part of the domain’s ubiquitous language.

Data in the topic is not considered the source of truth; that is the internal storage of the component which produces the message-stream.

These messages are NOT change events such as “address updated”; a domain may produce such events if it desires but these are not a “read model” stream. The event sourcing design pattern is an excellent choice for many problems; in that pattern each event represents a change to an entity and the current entity state can/must be computed by applying all (immutable) events one after the other. However this approach has disadvantages when a data-owner uses it to share entity state with other components:

  • change-events need a lot more data to represent the full entity state
  • the data is more complex (different event types)
  • the consumer needs knowledge of how to apply each event-type to the previous state
  • event ordering is critical, and missed events or incorrectly ordered events cause incorrect object state to be computed
  • consumers generally don’t care about the historical states of an object

Transferring only the current state (as recommended here) avoids these issues when supporting consumers in the task of building a Read Model of the entities.

The pattern suggested in this article is more closely related to traditional synchronous calls than to things like event-sourcing or “domain events”. Imagine designing a synchronous API for a system which allows external applications to query the state of a particular entity, eg a handler for ReST requests of form GET /{sometype}/{someid}. That API would have some specific format of data that would be returned. This distributed read model pattern simply pushes equivalent data out pre-emptively so that external applications can store it in their local databases in a suitable form for them - and thus avoid the need for any synchronous queries for that data. The natural form for those messages is therefore “the full entity state”, just like a synchronous endpoint would return.

You may see comments on the internet stating that “Kafka is not an event store”. This is irrelevant for this pattern, as it is not being used as an event-store here; no component is trying to use it to directly query an entity by id, or query “the set of change events” for a particular id.

Before building any solution for sharing data (whether the solution suggested here or otherwise), it is worth looking into whether such sharing can be avoided. It is sometimes sufficient for data in multiple systems to share a common id without needing additional data. It may also (or in addition) be possible to push the “integration” of data from different components up into a higher layer.

Guidelines for Producers

Reliable Event Streams

Events representing “the new state of an entity” need to be reliably produced whenever an entity changes.

Assuming entity changes are being persisted to a relational database, that means ensuring a message is reliably sent for each transactional commit that affects the entity. Sending a Kafka message cannot be included as part of a database transaction because Kafka does not support cross-resource two-phase commit; it is therefore necessary to:

  • create some kind of “audit record” in the database as part of the transaction that updates the entity
  • and then at a later time send a message for each audit record

These audit-records can be created by code where the code-paths that update the entity can be reliably identified (using transactions to make sure the audit record is always created). Alternatively, database triggers can be used to create the audit-records whenever specific tables associated with the entity are updated. One advantage of the code-based approach is that the “audit record” can (optionally) contain the complete entity to be sent (serialized as AVRO) — aka the Outbox Pattern. The trigger-based approach is limited to storing just the entity-id in the audit record thus requiring re-reading the entity into memory when the message is actually created and sent.
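As an illustration of the code-based approach, here is a minimal sketch using Spring’s declarative transactions; the entity, repository and audit-record class names are invented for the example.

import java.time.Instant;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Sketch only: Advert, AdvertRepository, AdvertAuditRepository and AdvertAuditRecord
// are hypothetical names used for illustration.
@Service
public class AdvertService {

    private final AdvertRepository advertRepository;
    private final AdvertAuditRepository auditRepository;

    public AdvertService(AdvertRepository advertRepository, AdvertAuditRepository auditRepository) {
        this.advertRepository = advertRepository;
        this.auditRepository = auditRepository;
    }

    // The entity update and the audit record are committed atomically: if the transaction
    // rolls back, no audit record exists and therefore no Kafka message will be sent.
    @Transactional
    public void updateAdvert(Advert advert) {
        advertRepository.save(advert);
        auditRepository.save(new AdvertAuditRecord(advert.getId(), Instant.now()));
    }
}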

Iterating over the set of unprocessed audit records and generating Kafka messages can be done with custom code in the producing application (a background thread polling for audit records), or via an external service such as Kafka Connect. However external services work best with the (full) Outbox Pattern, ie where the message to send has been fully computed and stored in the database; when only an ID is present in the “outbox” then the external service will need some way of calling back into the owning application to map that ID to the current entity state which is complex and inefficient - or the external service will need to be deeply coupled to the database schema (usually undesirable).

If the code scanning the “audit table” and writing to Kafka is part of a clustered application, ie runs as a pool of processes, then a mechanism is needed to ensure that only one instance does this scanning — or that such code is written in a way that parallel execution is safe. Implementing a simple “cluster mutex” solution using a relational database table is pretty simple (1 SQL update statement and about 5 lines of code). However there are also solutions pre-built as libraries (e.g. Shedlock) — and components such as Zookeeper if you really want to apply overkill.
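For illustration, a minimal sketch of such a database-backed mutex; the table and column names are invented, and it assumes one pre-inserted row per lock name.

import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import org.springframework.jdbc.core.JdbcTemplate;

// Sketch only: assumes a table such as
//   CREATE TABLE cluster_mutex (name VARCHAR PRIMARY KEY, locked_until TIMESTAMP)
public class ClusterMutex {

    private final JdbcTemplate jdbcTemplate;

    public ClusterMutex(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /** Returns true if this instance acquired the lock for the given duration. */
    public boolean tryAcquire(String name, Duration holdFor) {
        Instant now = Instant.now();
        // the row update is atomic, so at most one instance can succeed while the lock is held
        int updated = jdbcTemplate.update(
                "UPDATE cluster_mutex SET locked_until = ? WHERE name = ? AND locked_until < ?",
                Timestamp.from(now.plus(holdFor)), name, Timestamp.from(now));
        return updated == 1;
    }
}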

Don’t forget to periodically delete old audit-records. When using code in the producing application, this is relatively easy: read a batch of audit records, send messages to Kafka, then delete exactly those records. It is true that a failure to delete records (e.g. an awkwardly timed crash or network failure) can result in the same message being written multiple times to Kafka, but Kafka in general is an “at least once” system, and so consumers need to deal with this anyway. When using Kafka Connect (an external tool for generating messages from database records), you are limited to deleting records “with field X less than Y”, which has a nasty trap: even when using auto-incrementing audit message ids, it is possible for records to be inserted into the database out-of-order which leads to missing messages.
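A sketch of that “read a batch, send, delete exactly those records” loop follows; the repositories, the Advert entity and the Avro-generated AdvertSnapshot class are invented names, and toLogicalRepresentation() is the kind of mapping described under “Logical Representations” below.

import java.util.List;
import org.springframework.kafka.core.KafkaTemplate;

// Sketch only: called periodically (e.g. from a scheduled task), and only on the
// instance currently holding the cluster mutex.
public class AuditRecordPublisher {

    private final AdvertAuditRepository auditRepository;
    private final AdvertRepository advertRepository;
    private final KafkaTemplate<String, AdvertSnapshot> kafkaTemplate;

    public AuditRecordPublisher(AdvertAuditRepository auditRepository,
                                AdvertRepository advertRepository,
                                KafkaTemplate<String, AdvertSnapshot> kafkaTemplate) {
        this.auditRepository = auditRepository;
        this.advertRepository = advertRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishPendingAuditRecords() throws Exception {
        List<AdvertAuditRecord> batch = auditRepository.findOldest(500);
        for (AdvertAuditRecord auditRecord : batch) {
            // trigger-created audit records contain only the id, so re-read the entity
            Advert advert = advertRepository.findById(auditRecord.getAdvertId()).orElse(null);
            AdvertSnapshot snapshot = (advert == null) ? null : toLogicalRepresentation(advert);
            // wait for broker acknowledgement before the audit records are deleted;
            // a null value becomes a tombstone if the entity no longer exists
            kafkaTemplate.send("advert-read-model", auditRecord.getAdvertId().toString(), snapshot).get();
        }
        // delete exactly the records that were just processed
        auditRepository.deleteAll(batch);
    }
}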

If you do use an external scanning tool, and use a timestamp column to select audit-messages, use UTC and not the local timezone in order to avoid problems related to daylight-savings-time-change.

There are also “change-data-capture” tools that integrate deeply with specific databases to produce messages whenever specific tables change (“log scanning”). It is difficult to use such tools to directly produce a read-model message stream as such a stream should contain messages that represent “logical domain entities” and not the current internal representation used by the producing application (see notes below re logical representations and message schema compatibility). However the resulting stream could be used similarly to the “audit records” to inform the producing application which entities have changed, i.e. trigger creation of real logical messages.

Patterns that should be avoided (because they are not atomic) are:

  • on entity change, write entity to database and then write to Kafka
  • on entity change, write entity to database in one transaction, then write an audit-record in a different transaction

Entity Deletion

When an entity is deleted a Kafka message with appropriate key and null value shall be produced. There is further information about “tombstone compaction time” configuration later.

Writing a new entity with some “deleted marker” results in Kafka keeping this message in its compacted topic forever — something that is usually not desired. If a consumer wishes to mark entities as deleted rather than actually delete them, it can still do so when it receives a (key, null) message.
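For example, a small sketch using spring-kafka’s KafkaTemplate (topic name and id type are illustrative):

// A deletion is published as the entity's key with a null value (a tombstone);
// kafkaTemplate is the same KafkaTemplate used for normal entity snapshots.
public void publishDeletion(UUID deletedAdvertId) {
    kafkaTemplate.send("advert-read-model", deletedAdvertId.toString(), null);
}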

Event Minimisation

It is common for a database entity to be updated multiple times rapidly. When using database triggers, it is also common for an update to cause the trigger to fire multiple times.

Both of these situations cause multiple “audit records” for the same entity. However as our read-model data-stream always provides “the latest state” there is no benefit in sending this data multiple times.

Therefore when iterating over the “set of messages to send”, combine multiple messages for the same entity into one.

This combining of messages doesn’t have to be 100% accurate (i.e. it doesn’t need to find every duplicate); it isn’t wrong to write the same entity multiple times with the same state, just inefficient. It is therefore acceptable to read a block of audit-records and combine messages within that block only.

When fetching audit-records from the database, it may be helpful to select only audit-messages with a timestamp older than (now minus X seconds); this ensures that rapid updates to the same entity get efficiently combined (minimises sent messages). It does of course introduce some extra latency in the read-model message stream but in many cases that is not important.
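A sketch of the combining step, operating on the batch loaded from the audit table (the audit-record type and its fields are invented names):

// Keep only the newest audit record per entity id; the older records add no information
// because every message carries the full current state anyway. The "timestamp older than
// now minus X seconds" filter belongs in the query that loads the batch.
private Collection<AdvertAuditRecord> combine(List<AdvertAuditRecord> batch) {
    Map<Long, AdvertAuditRecord> latestPerEntity = batch.stream()
            .collect(Collectors.toMap(
                    AdvertAuditRecord::getAdvertId,
                    auditRecord -> auditRecord,
                    (older, newer) -> newer.getCreatedAt().isAfter(older.getCreatedAt()) ? newer : older));
    return latestPerEntity.values();
}

The full batch is still deleted after sending, regardless of how many records were combined.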

If you are legally required to send some event for each individual change, then do this via some other topic or mechanism; the distributed read model approach is not intended for such use-cases. Not only does it combine messages where possible, but it also sends only “the latest state”, not change-events.

Re-Synchronizing Data

Each producer needs the ability to trigger a full export of all existing entities to the output Kafka topic. This full-export process will need to be run (once per environment) by each producer to produce the initial message-stream. It can also be used in cases where:

  • the topic’s content has been lost or corrupted
  • the source data has been “fixed” via a bulk-sql-update or similar
  • the export logic has changed, e.g. to add a new field to every exported (logical) entity

There doesn’t necessarily need to be a UI associated with this feature; it will be rarely used so it is considered acceptable if this logic is triggered via a non-trivial process. Options include:

  • triggering the export via a URL which is invoked using CURL or similar
  • providing an alternate “main method” for the application so that the export can be run directly via command-line invocation (assuming it is run in an environment where relevant resources are accessible) or via a Kubernetes one-shot task or similar.

Note that a “full export” will not delete messages in the topic for which there are no records in the producer’s dataset, i.e. it doesn’t “clear” the topic. This is probably not necessary for many domains, but if it is then the only way to achieve this is:

  • to write to a new topic (which will require all consumers to convert over), or
  • for the producer to know which records need to be deleted (eg if its database simply marks records as deleted rather than actually deleting them) in which case it can write null records (tombstones), or
  • for the producer to read the topic itself, checking whether each entity it reads can actually be found in its database (OK) or not (in which case write a null/tombstone record)
  • to set the topic “cleanup policy” to “delete” and set a short retention-time; wait until the retention-time has expired, then reset cleanup policy to compact again. This does mean that “new” applications cannot load historical data until the “full export” has completed, but that should be OK.

When a topic is already in use in production, then such a “batch update” can cause problematic latency; consumers really should process changes due to real customer interactions within a reasonable time, and not have those queued up behind less-time-critical batch updates. The full-export code should therefore have a configurable rate-limit, i.e. a maximum number of records to write per minute. When (re)exporting all records to a topic for which there are existing consumers, the write-rate should be set to a suitable value such that:

  • consumers can consume data faster than it is written
  • but the export also completes within a reasonable time-frame.

As an example, a large export may be scheduled to start at 10 pm, and to write at a rate such that the complete set of entities is output before 6 am the next day, i.e. rate = n-entities / (8 × 60) entities per minute. As long as consumers can keep up with this rate, they will also be able to consume messages related to “normal business activity”, i.e. the applications will continue to function without large latencies for current messages. When doing a large export, consumers should be warned so that they can monitor the Kafka topic lag for their Kafka group, and temporarily increase the number of consuming processes if needed.

The code to export all entities should also be restartable, i.e. if the process doing the export is terminated, it should be able to resume without starting again from the beginning.
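A sketch of such a restartable, rate-limited export loop; the repository method, the progress store and the page size are invented for the example, and progress is persisted so that a restarted export resumes where the previous run stopped.

// Sketch only: pages through entities in id order, pacing writes to stay below the
// configured records-per-minute limit.
public void exportAll(long resumeFromId, int maxRecordsPerMinute) throws Exception {
    long lastId = resumeFromId;
    long pauseMillis = 60_000L / maxRecordsPerMinute;   // crude rate limit
    while (true) {
        List<Advert> page = advertRepository.findByIdGreaterThanOrderByIdAsc(lastId, PageRequest.of(0, 500));
        if (page.isEmpty()) {
            return;   // export complete
        }
        for (Advert advert : page) {
            kafkaTemplate.send("advert-read-model", advert.getId().toString(),
                    toLogicalRepresentation(advert)).get();
            lastId = advert.getId();
            exportProgressRepository.save(lastId);   // remember how far we got
            Thread.sleep(pauseMillis);               // stay below maxRecordsPerMinute
        }
    }
}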

When audit-records are being created via database triggers, then an alternative to implementing code that re-exports all data is to simply execute SQL that “touches” each record in the relevant table(s). Or alternatively, simply insert the IDs of every record directly into the audit-table. However with this approach, any “rate limiting” and “retry” logic needs to be implemented by whatever tool is executing these SQL statements, or be part of the normal message-publishing process.

In addition to rate-limiting output, it might be helpful to have a “priority” field in the audit-table, with “normal” records (representing real-time changes) having a higher priority for writing to Kafka than the “batch update” records. However the primary issue that causes latency in consumer updates is having a large number of Kafka records in the topic waiting to be read; only limiting the write-rate for batch-updated records can ensure that.

And finally, note that when using the solution “insert ids directly into audit table”, preserving ordering for changes isn’t a concern. Events are always “the full current state” — even for older audit records. When 3 audit-records are created for an entity before the message-writing-to-Kafka actually occurs, then three copies of the latest version of the entity will be written — which is fine. And hopefully the “message minimisation” process suggested earlier will reduce that to one.

Logical Representations

Before writing to Kafka, flatten internal data-structures into a clean “logical” representation of the entities (an “aggregate” in the terminology of DDD); consumers are not interested in your internal data representations. Using a logical representation also allows your internal representation to evolve without breaking compatibility with existing consumers.
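As a sketch, this mapping is typically just a small, explicit translation layer; here Advert stands for the internal persistence entity and AdvertSnapshot for the Avro-generated class of the logical representation (both names invented for the example).

// An explicit mapping keeps internal details (technical columns, join tables,
// lazy-loaded relations) out of the published message.
public AdvertSnapshot toLogicalRepresentation(Advert advert) {
    return AdvertSnapshot.newBuilder()
            .setId(advert.getId().toString())
            .setTitle(advert.getTitle())
            .setStatus(mapStatus(advert.getInternalStatusCode()))   // internal int code -> Avro enum
            .setSellerId(advert.getSeller().getUuid().toString())   // flatten the relation to an id
            .build();
}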

Represent enum types by an AVRO enum type, not by integers, e.g. declare a field as “UserStatus userStatus” not “int userStatus”. Always define a default-value for any enum type; this allows new enum values to be added to the type later without breaking existing consumers. In the following example, when Shapes is updated to include ELLIPSE then any consumer using an older version of the schema will get UNKNOWN instead.

enum Shapes {
  UNKNOWN, SQUARE, TRIANGLE, CIRCLE
} = UNKNOWN;

If you make an incompatible change to the format of objects being written, you will need to provide two streams (topics) for a phase-out period, allowing consumers to update and switch over. This is a lot of work — so it is best to get the AVRO schema correct the first time — or at least make sure it is extensible.

The format of data written to the topic must be made available as an AVRO schema definition and the schema must be stored in the company Kafka schema registry. Consumers may choose to:

  • get a copy of the AVRO schema (.avdl file) and generate code themselves from it (recommended), or
  • just copy the Java classes from the producer application and check them in locally (hacky but acceptable)

The producer should not create a library with the AVRO classes in it; that encourages undesirable coupling between projects.

Note that consumers will pick a schema version (whatever was current when the consumer was written) and stay with that version unless they have a need to update. Producers have no right to demand that consumers update to a new schema, i.e. must preserve backwards compatibility. The schema registry assists producers in maintaining backwards compatibility (see later).

Message Keys and Partitioning

Choose the key used for messages carefully; fixing this later is hard. The key should:

  • be stable over time, i.e. consistently refer to a single entity as its properties evolve over time
  • ensure proper grouping of objects to avoid race-conditions (Kafka ensures all messages with an identical key are processed in order) — though the point above already ensures a suitable key is chosen.
  • ensure approximately even distribution of all messages over the chosen number of topic partitions (i.e. the key should hash well)

If the entity being written to the stream has a UUID identifier then this is often a good choice for a key; it isn’t a mutable business property and is random so distributes evenly over partitions.

Use a suitable topic partition count. There are cases where a consumer needs to reprocess all existing records in the topic; for topics with large numbers of records it is important that the consumer at least has the option of doing this with multiple processes in parallel — and that parallelism is limited by the number of topic partitions. For topics with more than 1 million records use at least 16 partitions. For topics with more than 100k records use at least 8 partitions. For smaller topics, 3 partitions is recommended.

Other Producer-related Issues

Provide a “version-number” on each message where possible. This might be assigned by the “trigger” which creates an audit-message, or a timestamp might be appropriate for rarely-changed data.

If you have multiple non-production “development and test” environments, use a different Kafka topic for each environment.

Set “tombstone compaction” time on the topics to at least 2 days. When an entity is deleted, a message should be sent with an empty body; consumers will see this message and can delete the corresponding entity from the read-model — but only as long as the tombstones haven’t been compacted. After compaction, the entity (key) is simply not present in the topic any more — and so consumers will not be aware of its deletion unless they re-scan the entire topic then check which entities in their read-model have not been found. Choosing a reasonable tombstone-compaction time allows systems that are down for less than that period to still notice the deletion.

If messages contain strings then enable compression in Kafka for the topic.
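For illustration, the settings mentioned in the last two paragraphs map onto standard Kafka topic configuration entries; a sketch using the Kafka AdminClient follows, where topic name, partition count and replication factor are example values.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReadModelTopic {
    public static void main(String[] args) throws Exception {
        NewTopic topic = new NewTopic("advert-read-model", 16, (short) 3)
                .configs(Map.of(
                        "cleanup.policy", "compact",        // keep the latest record per key
                        "delete.retention.ms", "172800000", // retain tombstones for 2 days
                        "compression.type", "zstd"));       // compress string-heavy messages
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}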

Consider including a “source” field in the messages which represents the component that generated the messages; this

  • can be useful for topics with more than one producer writing to them — though this should be rare as we expect each dataset to have a single owner
  • can be useful to indicate different “causes” for a message. In particular, when the producer does a “complete re-export” of its dataset, the source can be used to indicate this, allowing consumers to skip some processing that should only be done on “real” object changes.
  • can make the data-stream more “self-describing” — though a well-chosen Kafka topic name can also do this, and without making each message larger

Guidelines for Consumers

General

A consumer listens on the relevant Kafka topic, updating its internal data-storage on each message.

Ensure that per-developer environments (i.e. when a developer starts a component on their local laptop) do not consume messages from a common/shared Kafka topic, e.g. the topic for “the common development environment”.

Storage of Consumed Events

Only fields relevant for the consuming app should be extracted from the message; don’t try to store everything that is available. Selecting only relevant data:

  • saves space
  • minimises data leakage in case the consuming app has a data-leak
  • minimises GDPR/data-privacy issues
  • makes the consumer more stable against changes to the data-format from the producer

When writing messages to a relational database table, always use dedicated tables to hold read-model data, i.e. never mix read-model data and non-read-model data in the same table. It is recommended that table-names have the suffix “_rm” to indicate that they are read-model, i.e. read-only, data.

Choose a storage format appropriate for the purposes that the consumer uses the data for; it doesn’t have to mirror the format from the stream. In particular, the data can be enriched with additional fields (as long as those fields are not mutable because read-model tables should be read-only). See the CQRS pattern.

When the stream provides a unique “version number” for each message, then store that value. If this is not available, then consider storing the Kafka (partition, offset) pair instead. If a version-number is available and could reasonably be considered part of the “business domain” then it may be stored on the same table as the entity. In other cases (artificial version-number, or Kafka offsets) the data should be stored in a separate table that shares a key with the entity. Retaining this information (i.e. the “currently imported version” for each entity) supports various use-cases (see the sketch after the following list), including:

  • supporting idempotent processing in the presence of duplicated messages (Kafka provides at-least-once semantics, i.e. the same message could be received multiple times)
  • supporting idempotent processing while re-processing the entire Kafka compacted topic
  • detecting lost messages
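A sketch of how the stored version can be used when applying messages; the repository methods and field names are invented, and the same check works with stored Kafka offsets instead of a business version number.

// Only apply the update if the incoming message is not older than what is already stored;
// duplicates and re-deliveries then become harmless no-ops.
@Transactional
public void onAdvertSnapshot(String key, AdvertSnapshot snapshot) {
    Long storedVersion = readModelRepository.findVersionById(key);   // null if not yet imported
    if (storedVersion != null && storedVersion >= snapshot.getVersion()) {
        return;   // duplicate or older message: nothing to do
    }
    readModelRepository.upsert(key, extractRelevantFields(snapshot), snapshot.getVersion());
}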

Ensure Idempotence

When writing the code that consumes messages from Kafka, consider the effects of receiving the same message multiple times — i.e. idempotence. Kubernetes services can be terminated at any time, but Kafka messages are acknowledged in blocks (consumer offset updated); this means that any message can potentially be delivered multiple times. In particular, consider the implications of interacting with external systems as new versions of a read-model entity are received (e.g. sending notification to fraud-validation systems).

Consider also that the data producer may “re-export” all data to the topic; there may be operations that a consumer does not wish to re-execute and so needs to detect and handle such duplication.

Re-synchronizing Data

The consumer may choose to provide functionality to re-read the entire input topic, updating (overwriting) existing database state either partially or fully. This allows issues related to incorrect synchronization of the local read-model with the contents of the Kafka topic to be repaired, regardless of how that synchronization problem has occurred. For topics with small numbers of values, this may be achieved simply by (manually) modifying the offsets associated with the consumer’s Kafka group id in the Kafka system. For topics with large numbers of records, it is recommended that:

  • the consumer makes the Kafka group-id configurable
  • the consumer provides an additional “main method” which starts just the code that reads from Kafka and updates the read-model

Re-reading the topic is then relatively simple: start a process which uses the above alternate main-method (eg as a Kubernetes one-shot job), configured with a new group-id. The new process will update the domain’s database without any performance impact on the existing service.

Constraints:

  • where an entity in the topic has recently been modified there may be multiple records due to compaction not having completed. The entity may therefore temporarily get “reverted” to an earlier state during processing. Possibly the “entity version” field could be used to avoid this issue…
  • where an entity has been deleted and the “tombstone compaction time” has been exceeded, then Kafka provides no way of detecting the absence of a specific entity. One possible solution is to add a column to the consumer’s entity table with a “last updated” timestamp; after completion of re-synchronization, any record whose “last-updated” is older than the start of the re-synchronization process can be deleted. Other similar variants of this approach are possible.

Something similar to this “re-sync” process will need to be executed at least once by any new consumer of an existing topic, ie when a new component is developed and put into production it will need to fully process the existing topic. This can be achieved just by starting the application without making it accessible to users until the synchronization process is complete; however the “one-shot job” approach can possibly be helpful here too — particularly by applying a level of parallelism higher than the application would use in normal operation — and by not activating irrelevant logic during this “data import” phase.

Handling Changes in the Read Model Contents

As noted earlier, a consumer should only extract and store the fields it is interested in. However it may happen that later development of the consumer requires data that it previously did not obtain from the messages. In this case, the schema of the table in which the read-model is stored should first be extended, and the code extended to extract and save the additional field. Then the “re-synchronize data” process can be applied as described above.

Handling Unreadable Data

When using spring-kafka integration, always wrap the key and value deserializer types in ErrorHandlingDeserializer so that a deserialization failure doesn’t just lead to Spring doing infinite retries — with a logged exception each time. This ensures that your handler-method will be called even when the message cannot be deserialized using the provided settings.

Then ensure that your registered error-handler deals with the problem appropriately. Don’t consider a failure to process a message as “an invalid message”, but instead as “incorrect message handling code”. This means: do not ever just discard messages; this will lead to the read-model and the upstream “source of truth” gradually diverging from each other in a way that is hard to detect and hard to fix. In general, if a message cannot be read then processing should stop until the code that processes messages has been updated to handle that message appropriately. For Java applications, configuring spring-kafka’s standard DefaultErrorHandler with an appropriate backoff specification (with no maximum retry limit) is usually the right solution.
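For illustration, a minimal sketch of that configuration; topic, group id and the delegate Avro deserializer class are placeholders, and spring-kafka 2.8+ class names are assumed.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class ReadModelConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "advert-read-model-consumer");
        // Wrap both deserializers so that broken messages reach the error handler
        // instead of causing an endless deserialize-and-log loop.
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        // the delegate value deserializer would be whatever Avro deserializer your project uses
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // retry indefinitely with a fixed backoff; no messages are silently skipped
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                new FixedBackOff(5000L, FixedBackOff.UNLIMITED_ATTEMPTS)));
        return factory;
    }
}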

In situations where simply stopping further message processing has unacceptable consequences, then the following is recommended:

  • store the partition and message offset of the un-processable message
  • raise an alert
  • continue processing, skipping further unreadable messages
  • when fixed code is deployed, reset processing of messages to the offset stored above, i.e. reprocess all messages in the same partition since the failed one, in order, so that message ordering is appropriately applied

Moving un-processable messages to a “dead letter queue”, aka DLQ, is not recommended. Using a DLQ does allow processing of messages to continue, ie doesn’t block data replication until a developer addresses the issue — under the assumption that only a small percentage of messages are affected. However:

  • when one message fails, it is likely that many other messages will also fail
  • the contents of a DLQ are often just ignored
  • moving data from the original (topic, partition) to the DLQ breaks guarantees regarding ordering of messages, e.g. can result in an older version of an entity overwriting a more recent version.

Handling Event Latency Issues

Occasionally, a component is expected to provide data that it holds in a read-model in “near real time”; a user interacts with some system which first updates data in domain X, then fetches data from this consumer component — and expects to see the updated data. Obviously, read-models are asynchronous so the desired data will only be available at some later time (a few seconds to a few minutes).

The best solution is to avoid the problem; return data with a “valid since” timestamp so that the client application can see that the data they are seeing isn’t “real time” and present that to the user.

Other solutions include:

  • have relevant “update” APIs return the updated entity, and have the client merge this data into whatever is displayed to the client, until the updated data is available from the relevant back-end component API.
  • have relevant “update” APIs return the updated entity, and have relevant “search-like” back-end component APIs take an optional entity as parameter. When present, that data is merged into the results that the API returns. The client is responsible for taking the return value from the update and passing it to the next search/fetch call.
  • have update and search APIs pass through an integration-layer that does one of the above (to avoid this logic in the client)
  • have relevant “update” APIs write both to Kafka and to some shared datastore (eg memcached) — with such data marked with an expiry time that is slightly longer than the max expected latency of data propagating via Kafka. And have relevant search APIs check the datastore and merge that data into the results. Because interaction with the shared datastore is synchronous, this solution provides immediate consistency. The (short) timeout ensures that the shared datastore is only a temporary solution and the “source of truth” remains with the owning component. This approach simplifies clients, but does have the disadvantage of being somewhat complex, and of creating increased coupling between data owner and consumer.

Having the consumer bypass the read-model and fetch data directly from the producer using synchronous calls should only be used as a very last resort; this has many disadvantages for the stability and maintainability of both the consumer and producer.

Consider Managing Kafka Offsets Manually

Reading a Kafka topic correctly means knowing which messages have already been processed, ie keeping a set of (topic, partition, offset) values. Kafka can track that for an application if desired, requiring the consumer just to provide a “group id”. However it is worth considering tracking this data directly in the consuming application; in particular when this data is stored in a table of a relational database, then a single transaction can be used to update the read-model and update offsets as an atomic unit. Storing the offsets in a database also makes it easier to support the cases described in “Re-synchronizing Data” and “Handling Unreadable Data” above.
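A sketch of manual offset management using a plain KafkaConsumer follows; OffsetStore and ReadModelWriter are invented collaborators that share one database, and updateEntityAndOffset() performs both updates in a single transaction. With spring-kafka, a similar effect can be achieved with a ConsumerSeekAware listener.

import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualOffsetConsumer {

    private final OffsetStore offsetStore;
    private final ReadModelWriter readModelWriter;

    public ManualOffsetConsumer(OffsetStore offsetStore, ReadModelWriter readModelWriter) {
        this.offsetStore = offsetStore;
        this.readModelWriter = readModelWriter;
    }

    public void run(KafkaConsumer<String, AdvertSnapshot> consumer, List<TopicPartition> partitions) {
        consumer.assign(partitions);
        // start from the offsets recorded in our own database, not from Kafka's group offsets
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offsetStore.nextOffsetFor(partition));
        }
        while (true) {
            ConsumerRecords<String, AdvertSnapshot> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, AdvertSnapshot> record : records) {
                // read-model row and offset are committed atomically in one database
                // transaction, so a crash cannot leave them inconsistent
                readModelWriter.updateEntityAndOffset(record.key(), record.value(),
                        new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            }
        }
    }
}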

Kafka brokers require consumers (connected clients) to regularly poll for new messages. If a consumer does not poll within the required interval (default: 5 minutes) then the brokers assume that the consumer has crashed or hung and so redistribute partitions among the remaining consumers. The brokers also assume that the most-recently-sent block of messages has not been processed, and so do not advance the consumer group’s offset — i.e. that block of messages will be reprocessed. This can lead to the consuming application repeatedly re-processing the same block of messages over and over again. Therefore, ensure consumers do not take too long to process each message-block; in particular:

  • ensure that any calls to external processes (e.g. rest calls) made during message processing have reasonable timeouts
  • ensure that the “block size” (number of messages retrieved from Kafka) is set low enough that all messages can be processed within the poll-interval

Ensure that per-developer environments (i.e. when a developer starts a component on their local laptop) do not consume messages from a common/shared Kafka topic, e.g. the topic for the “common development environment”. When using spring-kafka integration this means setting annotation attribute “autoStartup” appropriately. This is less important when using a local database and tracking Kafka topic offsets manually (see earlier), but it’s still unnecessary in most cases for a development instance to be consuming messages in the topic.

Producers and the Kafka Schema Registry

When a component writes entities to Kafka for a “read model”, the format of those messages becomes part of the API of a component, and must remain backwards-compatible (to avoid breaking consumers).

The Kafka Schema Registry provides an elegant way of enforcing that compatibility. Producers should register new schemas for their read-models with a registry as part of their deployment process; a non-backwards-compatible schema will then cause deployment to fail.

The schema must be registered using compatibility-level “full-transitive” to ensure that any data that a producer writes can be successfully read by all existing consumers, i.e. no existing applications will break. Note that “forward-transitive” is only sufficient when a producer always does a full-export of data after upgrading its schema; without this, a compacted topic can contain messages with old versions, making it impossible for a consumer to use the producer’s latest schema version.
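As a sketch of what “register with compatibility full-transitive” can look like against the schema registry’s REST API; the subject name and URL are examples, and many teams do this via a build plugin or CI step rather than hand-written code.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterReadModelSchema {
    public static void main(String[] args) throws Exception {
        String registry = "http://schema-registry:8081";
        String subject = "advert-read-model-value";   // example subject name
        HttpClient client = HttpClient.newHttpClient();

        // 1. enforce FULL_TRANSITIVE compatibility for the subject
        client.send(HttpRequest.newBuilder(URI.create(registry + "/config/" + subject))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .PUT(HttpRequest.BodyPublishers.ofString("{\"compatibility\":\"FULL_TRANSITIVE\"}"))
                .build(), HttpResponse.BodyHandlers.ofString());

        // 2. register the new schema version; the registry rejects it if it is not
        //    compatible with every previously registered version
        String escapedSchema = "...";   // the Avro schema JSON, escaped into a JSON string
        HttpResponse<String> response = client.send(
                HttpRequest.newBuilder(URI.create(registry + "/subjects/" + subject + "/versions"))
                        .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                        .POST(HttpRequest.BodyPublishers.ofString("{\"schema\":\"" + escapedSchema + "\"}"))
                        .build(), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Schema registration failed: " + response.body());
        }
    }
}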

Summary

As this article shows, setting up distributed read models involves a moderate amount of complexity. However as described in part 1 of this series, it brings simplicity in other areas, as well as performance, scalability, security, and other benefits. We’ve found this approach to work well for us so far.

Change History

This article was written while I was an employee of willhaben, and originally published on Medium (with a link from the company website) in February 2023. Minor updates have been made here in June 2023.

Footnotes

  1. Actually, what such endpoints typically return is an aggregate, and this is also what should be embedded in any emitted asynchronous messages.