Generating Change Events with the Outbox Pattern

Categories: Architecture, Programming

Introduction

Sometimes an application needs to write to a database and send a message to a message-broker at the same time - ie “atomically” or “in the same transaction”. This article describes the options for implementing this, including the outbox pattern. In particular, this article focuses on sending data update notifications between components in a microservice architecture.

The outbox pattern is fairly well-known, but most descriptions of it are rather brief; in this article I want to go into the details of the different variants, options, and their tradeoffs. The information here is the result of considerable experience in this area over the last few years, during which I have been deeply involved in the design and implementation of a microservices-based system that uses events to replicate data.

The Problem Description

A major application at my employer had initially implemented notification events by simply writing to the database and then sending the message to a broker (ActiveMQ or Kafka) without any explicit transactions; in practice, this just wasn’t reliable. The exact reason why some messages went missing was never 100% identified, but it was probably due to things like new system deployments (requiring application restarts), intermittent network failures, or software bugs. Having 99.5% of records processed correctly turned out not to be good enough, so downstream applications started implementing “nightly resync jobs” to check for and patch up missing data, plus various other ugly workarounds. A number of these code-points have now been moved to the outbox pattern and everything is much more reliable.

As we have continued our migration to microservices, we have faced a similar issue: each microservice has its own database, and each data item belongs to just one microservice, but other services may need read-only access to that data. We handle this by replicating the needed data; when the owning service modifies data, it sends a logical change event which other services may subscribe to in order to update their copy of that data. While the details are somewhat complicated, the most important requirement is that these messages are sent reliably. We use the same pattern as was applied to the legacy system: the outbox pattern.

Using an outbox isn’t the only option though; this article looks at everything we considered.

Options for Triggering Events

As far as I am aware, the following is a complete list of the available options. Each option is explored further in a following section.

  1. Use distributed transactions.
  2. Use the built-in change event stream of a database (available only in a small set of databases).
  3. Use an external transaction-log-tailing tool (eg Debezium).
  4. Use an external tool which polls the database for changes.
  5. Write outbox records from application code.
  6. Write (partial) outbox records via database triggers.

Only the last two use the “outbox pattern”.

Each of these options has sub-variants which are described further in the detailed explorations below.

There is yet another option: to avoid doing an update and message-send atomically, by instead just sending the message and then asynchronously driving the update from that (sometimes called “listen to yourself”). It’s a very valid pattern but isn’t discussed further here.

Using Distributed Transactions

There are protocols that provide true ACID transactions across multiple resources (cross-resource distributed transactions), ie not limited to just a single database. The majority of mainstream relational databases support such a protocol, as do some message brokers. Depending on which software you are using, it therefore might be possible to do a single ACID transaction which updates a database and sends a message, with the transaction-manager ensuring that either both are successful or both are rolled back.
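
As an illustration only, here is a rough sketch of what this could look like with JTA on the JVM. It assumes an environment in which a JTA transaction manager (eg Atomikos or a Java EE container) is already configured and both the DataSource and the JMS ConnectionFactory are XA-capable and enlisted automatically; all table, queue, and variable names are made up.

```java
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.sql.DataSource;
import javax.transaction.UserTransaction;
import java.sql.Connection;
import java.sql.PreparedStatement;

public class DistributedTxSketch {
    private final UserTransaction tx;            // provided by the JTA transaction manager
    private final DataSource dataSource;         // XA-capable, enlisted by the transaction manager
    private final ConnectionFactory jmsFactory;  // XA-capable, enlisted by the transaction manager

    public DistributedTxSketch(UserTransaction tx, DataSource ds, ConnectionFactory cf) {
        this.tx = tx;
        this.dataSource = ds;
        this.jmsFactory = cf;
    }

    public void updateAndNotify(long orderId, String newStatus) throws Exception {
        tx.begin();
        try {
            try (Connection db = dataSource.getConnection();
                 javax.jms.Connection jms = jmsFactory.createConnection();
                 Session session = jms.createSession(true, Session.SESSION_TRANSACTED)) {

                try (PreparedStatement ps =
                         db.prepareStatement("UPDATE orders SET status = ? WHERE id = ?")) {
                    ps.setString(1, newStatus);
                    ps.setLong(2, orderId);
                    ps.executeUpdate();
                }

                MessageProducer producer = session.createProducer(session.createQueue("order-events"));
                producer.send(session.createTextMessage(
                        "{\"orderId\":" + orderId + ",\"status\":\"" + newStatus + "\"}"));
            }
            tx.commit();   // two-phase commit: both resources commit, or neither does
        } catch (Exception e) {
            tx.rollback(); // a failure anywhere rolls back both the DB update and the message
            throw e;
        }
    }
}
```

Even in this best case, note how much infrastructure (XA drivers, a transaction manager) is needed just to make the two writes behave atomically.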

However there are many databases (particularly NoSQL databases) which don’t support this, and many message-brokers also do not support it (for example, Kafka).

Even if you happen to be using components that do make this possible, there are quite a few disadvantages. The database tables and message queues involved in the transaction often get locked for the duration of the transaction, leading to performance issues and potentially deadlocks elsewhere. It also typically makes it impossible to batch the transfer of messages from a client application to a message-broker - ie it disables a major performance feature. When the message-broker is temporarily unavailable, delays (timeouts) can occur in the request-path, followed by a rollback - even though the database is working fine. And simply setting up the transaction-manager is quite complicated.

Personally, I would recommend against using this approach; distributed transactions are sometimes not possible and are always difficult.

Use a Database’s Built-in Change Event Stream

There are a few databases which make it very easy for applications to receive notifications when records change. These include PostgreSQL (a relational database) and MongoDB (a NoSQL document database).

It therefore seems possible for the application which owns the source data to itself subscribe to these events and, for each one, emit the corresponding logical event to a message broker.
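
As a sketch of what this could look like (untested by me), here is the MongoDB variant using change streams via the official Java driver; the database/collection names and the event mapping are made up, and change streams require MongoDB to be running as a replica set.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;

public class OrderChangeStreamListener {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> orders =
                    client.getDatabase("shop").getCollection("orders");

            // Blocks and invokes the callback once per committed change to the collection.
            orders.watch().forEach((ChangeStreamDocument<Document> change) -> {
                // Map the raw change into a logical domain event before publishing it.
                String logicalEvent =
                        "{\"type\":\"OrderChanged\",\"key\":" + change.getDocumentKey() + "}";
                publish(logicalEvent);
            });
        }
    }

    private static void publish(String event) {
        // In a real application this would send to Kafka/ActiveMQ.
        System.out.println("would publish: " + event);
    }
}
```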

I haven’t tried this approach (the database I have been working on does not support such events) but it seems promising. There are a few possible complications:

  • Is the change event stream reliable?
  • Does the change event stream require special privileges to subscribe to?
  • How hard is it to “de-duplicate” events, ie to emit a single logical change event for a database update that modified multiple tables/entities?

A reliable stream of change events is needed. The following questions should be considered:

  • What happens when the listening application is deployed?
  • Does this work in a cluster of database servers (if applicable)?
  • Does this work when failing over from a “primary” database to a hot-standby instance?

It is of course possible to use a dedicated instance of the primary application to handle such events, ie to deploy N instances of the application to handle regular work, and one (or more) instances dedicated to handling the event-stream. This is particularly tempting if subscribing to the event-stream requires special DB account permissions; the instance handling normal requests can continue to use a normal-privilege database account.

It would be possible to write a different application specifically for handling the raw database change events and mapping them to logical events, but this application would presumably need to copy a lot of the logic from the “primary” application and would be deeply coupled to its data model. A dedicated code-base for this therefore seems like a poor idea; just building the event-mapping logic into the primary application seems like a far more sensible solution.

It would also be possible to write an application which listens to these raw database events and makes calls to the primary application to either (a) return the logical event to be emitted or (b) trigger the event to be emitted directly. While this does decouple the primary application from the raw database event stream, there is a significant price to pay in complexity and performance; this seems like a bad idea (though sadly one I have seen implemented - on top of Kafka Connect in that case).

De-duplicating events is probably not an issue; such change-events are typically emitted “per commit” ie represent a full transaction and therefore all the necessary context is available. However the details would need to be checked per-database as there is no standard for this kind of feature.

This approach might have a minor performance issue; when an event is received, the associated entities may need to be read from the database in order to build the logical event (something which option 5 avoids, for example). However the entities to be read have just been changed, so they are likely to be in memory in the database, ie a network round-trip is required but little or no disk IO. That seems reasonable.

The task of mapping “raw” database events to logical events has been mentioned several times above and is considered important. Simply exposing the raw database change events to other applications (ie just piping these changes into a shared message-broker, possibly with some minor restructuring) will produce a very tightly coupled system. Any application consuming these events will need to know (be exposed to) very low-level details of the owning application’s database schema. This in turn is likely to lead to the need to do “synchronised deploys” when that schema must be upgraded to implement new features. Only applications which are already expected to be deployed together as a single unit should be coupled in this way (ie NOT code belonging to different domains, and ideally not even microservices belonging to the same domain). Even for a single application which consumes “its own” change events, this can cause problems when upgrading from one version to the next. Therefore, I strongly recommend mapping any such “raw” events into proper logical events that represent what changed at the domain level not at the persistence level.
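
To make the distinction concrete, here is a hypothetical sketch of such a mapping; the RawChange/CustomerChangedEvent types, table names, and repository interface are all invented for illustration.

```java
public class ChangeEventMapper {

    // Raw change as seen at the persistence level: table name, row id, operation.
    public record RawChange(String table, long rowId, String operation) {}

    // Logical event expressed in domain terms; consumers never see table or column names.
    public record CustomerChangedEvent(long customerId, String changeType) {}

    private final CustomerRepository customers; // hypothetical lookup of the owning aggregate

    public ChangeEventMapper(CustomerRepository customers) {
        this.customers = customers;
    }

    public CustomerChangedEvent toLogicalEvent(RawChange raw) {
        // A change to a child table (eg customer_address) is reported as a change to its
        // parent aggregate, so downstream services only ever see "customer changed".
        long customerId = switch (raw.table()) {
            case "customer"         -> raw.rowId();
            case "customer_address" -> customers.findCustomerIdForAddress(raw.rowId());
            default -> throw new IllegalArgumentException("not a replicated table: " + raw.table());
        };
        return new CustomerChangedEvent(customerId, raw.operation());
    }

    public interface CustomerRepository {
        long findCustomerIdForAddress(long addressId);
    }
}
```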

Using a Standalone Transaction Log Tracker

There are various standalone applications such as Debezium which scan the “transaction logs” of different database types and generate messages - typically written to an event-broker such as Kafka.

This is a very efficient, low-latency, and reliable way to see what changes are occurring in a database.

The primary concerns with this are:

  • they are often commercial applications, ie require a license;
  • they are often difficult to install (require tight integration into the database deployment);
  • as standalone code, they require regular patching/upgrading; and
  • it can be very hard to generate a proper “logical representation” of the change which has occurred.

In addition, it is important to configure the tool to report only changes to the tables of interest; it is likely that only a few percent of the tables in the database are actually relevant for emitting logical change events. Incorrect filtering will swamp the consumer with irrelevant data - but correct filtering means keeping the primary application and the transaction-log scanner configuration “in sync”.

Many of the pros and cons for this approach are described in the section above.

One issue to consider in particular here is the generation of logical events. If the tool is capable of writing messages directly into an event-broker, then it is tempting to just do this and mark the task as done. However, consider carefully whether the event format being generated is truly a logical representation, or is reflecting the source database schema too directly. Incorrect abstraction here can either limit the ability of the primary application to change its database schema, or force consuming applications to be updated and deployed synchronously with changes in the primary application. When the consumers are applications in the same domain and maintained by the same software team then this can possibly be tolerated (though it is still not good); when the consumers are applications in different domains and maintained by different developers then incorrect abstraction has a very high price, ie proper logical event representations are far more important.

An additional issue is the complication of installing and managing the external transaction-log-tracking software. These complications are significant, and for me make this solution less appealing than option 6 (and maybe option 5, depending on context).

Note that Debezium can be used as a library and embedded directly into an application; this mode can be considered a subcase of the previous option. Otherwise, Debezium is actually deployed as a plugin for Kafka Connect - but that is really just an implementation detail, and the behaviour is quite different from the “poll database” option described later, which can also be implemented with Kafka Connect.
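
As a sketch of the embedded mode (the property names follow the Debezium documentation but should be checked against the version you actually use; connection details and table names are made up):

```java
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

import java.util.Properties;
import java.util.concurrent.Executors;

public class EmbeddedDebeziumExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "orders-connector");
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "5432");
        props.setProperty("database.user", "replicator");
        props.setProperty("database.password", "secret");
        props.setProperty("database.dbname", "shop");
        props.setProperty("topic.prefix", "shop");
        // Only watch the tables that are relevant for logical change events.
        props.setProperty("table.include.list", "public.orders,public.order_lines");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/debezium-offsets.dat");

        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    // Map the raw record to a logical event here before publishing it.
                    System.out.println(record.value());
                })
                .build();

        // The engine is a Runnable; run it on its own thread and close() it on shutdown.
        Executors.newSingleThreadExecutor().submit(engine);
    }
}
```

Note the table.include.list filter, which addresses the “only tables of interest” concern described above.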

Polling the Database for Changes

There are standalone CDC tools (such as Kafka Connect with a JDBC source connector) which periodically execute SQL queries against a database to detect changes, and then emit these changes somewhere more accessible (eg to a Kafka message broker). This does require that the table be structured appropriately; an event-sourcing approach definitely works, but so do tables which have an incrementing version-number or a last-updated timestamp in each row.
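
A minimal sketch of this kind of polling, assuming a hypothetical orders table with an incrementing version column (this is roughly what a JDBC source connector does via configuration):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class OrderTablePoller {
    private long lastSeenVersion = 0; // should be persisted, not kept only in memory

    /** Called on a fixed schedule; emits an event for every row changed since the last poll. */
    public void pollOnce(Connection db) throws SQLException {
        String sql = "SELECT id, version, status FROM orders WHERE version > ? ORDER BY version";
        try (PreparedStatement ps = db.prepareStatement(sql)) {
            ps.setLong(1, lastSeenVersion);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    emitLogicalEvent(rs.getLong("id"), rs.getString("status"));
                    // Danger: versions are allocated at insert/update time, not at commit time,
                    // so a still-uncommitted row with a lower version can appear later and be
                    // skipped (the race condition described below).
                    lastSeenVersion = Math.max(lastSeenVersion, rs.getLong("version"));
                }
            }
        }
    }

    private void emitLogicalEvent(long orderId, String status) {
        System.out.println("would emit OrderChanged for " + orderId + " (" + status + ")");
    }
}
```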

If the table being scanned is a true outbox table (see option 5) then this can be an effective solution; this is discussed further later.

However if the tables being scanned are the real entities for which events should be produced, then it can be very hard to create a proper “logical representation” of the changed data; instead, some other code must read the events produced by this system and map them to proper events - at which point a solution like database triggers may as well be used, as it is more efficient and less complex.

There are also a few cases where race-conditions can lead to events being missed. The scanning application needs either a “version counter” or a “last-changed timestamp” in order to filter out entities which have changed since its last scan. However, databases can’t reliably provide these. The standard way of allocating version-counters is a database SEQUENCE, but a number is allocated from the sequence at the point where the SQL insert/update statement is executed, NOT at the point where the commit is done. When two transactions run approximately in parallel, the transaction with the higher sequence number can be committed first. If the external tool then happens to execute its query, it will record that higher number as its “current state”, and the other transaction (with a lower sequence number) will never be detected. The same issue can happen with timestamps: a timestamp allocated as part of a SQL statement is generated at the point of insert/update, not at the point of commit. In addition, timestamps have a maximum resolution, and are sometimes generated client-side rather than in the database, leading to yet more possibilities for race-conditions. I’ve seen this out-of-order-commit problem actually occur in production when using Kafka Connect - a fraction of a percent of messages were simply being skipped.

In addition, this solution requires the external tool to be configured with SQL query statements that are very tightly coupled to the database schema, ie any changes to the schema require changes in multiple applications and synchronised deployments.

This approach also has problems detecting deleted records.

This approach also has relatively high latency; queries are being run against the entity tables in the database which are presumably large, and therefore the query cannot be run too often - but this then increases the delay between a change being made and the corresponding event being emitted.

While this approach might be reasonable in cases where the primary application cannot be changed, it is slow, complex, and unreliable.

Outbox Records From Application Code

The standard description of the outbox pattern is that application code starts a transaction, updates any relevant entities, computes the message to be sent and writes it as a BLOB to some “outbox table”, then commits the database transaction. A background thread polls that outbox table and simply emits the BLOB value 1:1 as a message (or this step can be delegated to something like Kafka Connect).
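
A minimal sketch of the write side, using plain JDBC; the table layout, column names, and JSON payload are assumptions for illustration (in a real application this would typically go through the ORM or framework transaction support).

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.UUID;

public class OrderService {

    public void changeStatus(Connection db, long orderId, String newStatus) throws SQLException {
        db.setAutoCommit(false);
        try {
            // 1. Update the entity itself.
            try (PreparedStatement update =
                         db.prepareStatement("UPDATE orders SET status = ? WHERE id = ?")) {
                update.setString(1, newStatus);
                update.setLong(2, orderId);
                update.executeUpdate();
            }

            // 2. Write the ready-to-send message into the outbox, in the same transaction.
            String payload = "{\"type\":\"OrderStatusChanged\",\"orderId\":" + orderId
                           + ",\"status\":\"" + newStatus + "\"}";
            try (PreparedStatement outbox = db.prepareStatement(
                    "INSERT INTO outbox (id, topic, payload, created_at) "
                  + "VALUES (?, ?, ?, CURRENT_TIMESTAMP)")) {
                outbox.setString(1, UUID.randomUUID().toString());
                outbox.setString(2, "order-events");
                outbox.setString(3, payload);
                outbox.executeUpdate();
            }

            db.commit(); // entity change and outbox record become visible together, or not at all
        } catch (SQLException e) {
            db.rollback();
            throw e;
        }
    }
}
```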

This is indeed an elegant solution:

  • easy to understand;
  • no external components required;
  • no database triggers;
  • no special database features required; and
  • high-performance (in particular, the data needed to create the logical event representation is probably already in memory).

However, this approach does have a few pre-requisites:

  • it must be possible to identify all code-paths which update relevant objects;
  • it must be possible to update the application code, wrapping all relevant paths in transactions, etc.; and
  • creation of the logical event must be done as part of the user’s request.

Those first two items can be particularly tricky when adding change-events to a legacy system. Sometimes there are multiple code-paths which update a specific entity, and all of them must be found and wrapped in transactions that also create the logical event.

The last item is not normally a problem, but could be relevant for very performance-critical systems.
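
For completeness, here is a sketch of the background relay mentioned at the start of this section: it forwards each stored payload unchanged to Kafka and deletes the row once the broker has acknowledged the send. The outbox layout matches the earlier sketch and is an assumption.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class OutboxRelay {
    private record OutboxRow(String id, String topic, String payload) {}

    private final KafkaProducer<String, String> producer;

    public OutboxRelay(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    /** Called periodically by a scheduler (not shown); forwards pending outbox rows to Kafka. */
    public void relayOnce(Connection db) throws Exception {
        List<OutboxRow> pending = new ArrayList<>();
        try (Statement st = db.createStatement();
             ResultSet rs = st.executeQuery(
                     "SELECT id, topic, payload FROM outbox ORDER BY created_at")) {
            while (rs.next()) {
                pending.add(new OutboxRow(
                        rs.getString("id"), rs.getString("topic"), rs.getString("payload")));
            }
        }

        for (OutboxRow row : pending) {
            // Wait for the broker acknowledgement before deleting the outbox row; a crash in
            // between causes a resend, so consumers must tolerate duplicates (at-least-once).
            producer.send(new ProducerRecord<>(row.topic(), row.id(), row.payload())).get();
            try (PreparedStatement del = db.prepareStatement("DELETE FROM outbox WHERE id = ?")) {
                del.setString(1, row.id());
                del.executeUpdate();
            }
        }
    }
}
```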

Partial Outbox Records From Database Triggers

Database triggers can be used to insert records into an outbox table whenever specific tables are modified. In this case, “outbox table” is possibly not the correct description, as the table contains not messages to send but simply the ids of the modified entities (and possibly the operation, ie insert/update/delete).
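
A sketch of what such a trigger could look like, assuming PostgreSQL. In practice this DDL would live in a migration script (eg Flyway); it is shown here as Java text blocks only to keep all examples in one language, and the table and trigger names are made up.

```java
public final class OutboxTriggerDdl {

    public static final String OUTBOX_TABLE = """
        CREATE TABLE order_outbox (
            id         BIGSERIAL PRIMARY KEY,
            order_id   BIGINT      NOT NULL,
            operation  TEXT        NOT NULL,
            created_at TIMESTAMPTZ NOT NULL DEFAULT now()
        )""";

    public static final String TRIGGER_FUNCTION = """
        CREATE FUNCTION record_order_change() RETURNS trigger AS $$
        BEGIN
            -- Store only the id of the owning aggregate plus the operation, not a payload.
            IF (TG_OP = 'DELETE') THEN
                INSERT INTO order_outbox (order_id, operation) VALUES (OLD.id, TG_OP);
            ELSE
                INSERT INTO order_outbox (order_id, operation) VALUES (NEW.id, TG_OP);
            END IF;
            RETURN NULL;
        END;
        $$ LANGUAGE plpgsql""";

    public static final String TRIGGER = """
        CREATE TRIGGER orders_outbox_trigger
        AFTER INSERT OR UPDATE OR DELETE ON orders
        FOR EACH ROW EXECUTE FUNCTION record_order_change()""";

    private OutboxTriggerDdl() {}
}
```

A similar trigger on each child table of the aggregate would insert the parent’s id rather than its own, as discussed further below.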

Some application then needs to periodically scan these records and emit logical representations of the changed entities. This is relatively easy to implement in a single-threaded[1] manner (ie where there is no need to handle parallel scanning of this outbox table). As mentioned earlier, while this could theoretically be a dedicated application, simply having the “primary application” be the one that performs this work seems by far the best solution.

It is assumed that multiple instances of the primary application are deployed (scalable microservice). In order to make scanning of the outbox-table “single-threaded”, a “dedicated instance” of this application can be deployed (with appropriate configuration), but that does complicate deployments. Alternatively, the instances of the cluster can dynamically choose at runtime which of them will perform that work; see this article on cluster-locks/leader-election for some suggestions on how to achieve this.
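
One simple way to do this dynamic selection, assuming PostgreSQL, is a session-level advisory lock used as a cluster-wide lock: whichever instance holds the lock runs the scan loop, and the lock is released automatically if that instance dies and its connection is closed. The lock key is an arbitrary constant chosen by the application.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class OutboxScanLeader {
    private static final long LOCK_KEY = 424242L; // arbitrary application-chosen constant

    /**
     * Returns true if this instance obtained the cluster-wide lock and should run the scan.
     * The connection must be kept open (and dedicated) for as long as leadership is needed,
     * because the advisory lock is tied to the database session.
     */
    public boolean tryBecomeLeader(Connection db) throws SQLException {
        try (Statement st = db.createStatement();
             ResultSet rs = st.executeQuery("SELECT pg_try_advisory_lock(" + LOCK_KEY + ")")) {
            rs.next();
            return rs.getBoolean(1);
        }
    }
}
```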

The primary advantage of using triggers is that it is relatively easy to catch all updates; the number of tables in a database is far smaller than the number of code-paths in an application and therefore identifying the relevant ones is easier. It also avoids needing to deal with transactions in the code; triggers are by definition correctly integrated into the associated transaction. A given aggregate may consist of multiple entities, but:

  • child entities in a relational database always carry the ID of their parent (as part of a composite key or as a regular column), and
  • when a child changes, the logical event to be emitted is the parent, not the child.

It is therefore possible for a trigger to store the ID of the primary entity in the outbox table, and the code which processes that table then has the data needed to generate the appropriate message.
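
Putting the pieces together, a sketch of the scan-and-emit step might look like the following; the repository, publisher, and table layout (matching the trigger sketch above) are assumptions.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TriggerOutboxScanner {
    private final OrderRepository orders;   // hypothetical: loads the current state of an order
    private final EventPublisher publisher; // hypothetical: sends logical events to the broker

    public TriggerOutboxScanner(OrderRepository orders, EventPublisher publisher) {
        this.orders = orders;
        this.publisher = publisher;
    }

    /** Called periodically by the single "scanning" instance of the application. */
    public void scanOnce(Connection db) throws SQLException {
        List<Long> processedRowIds = new ArrayList<>();
        Set<Long> changedOrderIds = new LinkedHashSet<>(); // de-duplicates, preserves order
        try (Statement st = db.createStatement();
             ResultSet rs = st.executeQuery("SELECT id, order_id FROM order_outbox ORDER BY id")) {
            while (rs.next()) {
                processedRowIds.add(rs.getLong("id"));
                changedOrderIds.add(rs.getLong("order_id"));
            }
        }

        for (long orderId : changedOrderIds) {
            // Build the logical event from the current state of the aggregate, not from raw rows.
            publisher.publish("order-events", orders.toLogicalEvent(orderId));
        }

        // Delete exactly the rows that were read (not "id <= max"), so that rows written by
        // still-uncommitted transactions are never removed without being processed.
        try (PreparedStatement del = db.prepareStatement("DELETE FROM order_outbox WHERE id = ?")) {
            for (long rowId : processedRowIds) {
                del.setLong(1, rowId);
                del.addBatch();
            }
            del.executeBatch();
        }
    }

    public interface OrderRepository { String toLogicalEvent(long orderId); }
    public interface EventPublisher { void publish(String topic, String event); }
}
```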

The advantages of this approach are:

  • reliably catches all changes;
  • not intrusive in the code;
  • offloads the work of building a logical representation to a background thread, not the original request;
  • no external processes or tools needed;
  • no extra libraries needed; and
  • no extra DB privileges needed.

The disadvantages of this approach are:

  • a transaction may cause multiple triggers to fire, ie code generating messages should do “de-duplication”;
  • managing triggers is a little ugly; and
  • the primary entity (which was just in memory) needs to be re-read from the database, ie slightly less performant than the previous option.

Overall, the pros significantly outweigh the cons, ie this seems a very useful pattern.

Summary

I personally have experience of option 6 (database triggers) and it works very well. The extra code needed to scan the “outbox table”, build/send logical events, then delete the processed outbox records, is reasonably small. The (several) data streams we have built using this approach are running reliably in production under reasonably heavy load.

Option 5 (core outbox pattern, code generates messages directly) certainly has advantages in a modern codebase, but can be tricky to apply even there - and can be really tricky to integrate into a legacy codebase. It also still requires an extra background thread to process the outbox messages (as above) - unless that is delegated to something like Kafka Connect.

Option 2 (using a change-event-stream generated by the database itself) seems interesting, though I haven’t tried it.

Distributed transactions (option 1) have many complications and weak points. So do external CDC tools (options 3 and 4).

In all cases, I would recommend using the primary application itself to generate the logical change messages, rather than creating a dedicated application. The messages are tightly coupled to the logic and data schema of the primary application and separating this out only creates additional problems.

References

Footnotes

  1. One of the articles in References suggests using SQL statement “select .. for update skip locked” to efficiently scan an outbox table from multiple processes in parallel. This statement isn’t standard, but is supported in at least MySQL and PostgreSQL.
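
For illustration, a sketch of that parallel variant, assuming the outbox layout used earlier and a database that supports SKIP LOCKED:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class ParallelOutboxWorker {

    /** One polling round; safe to run concurrently in several processes. */
    public void processBatch(Connection db) throws SQLException {
        db.setAutoCommit(false);
        try {
            // Claim a batch of rows; rows already locked by another worker are silently skipped.
            List<String[]> batch = new ArrayList<>();
            try (Statement st = db.createStatement();
                 ResultSet rs = st.executeQuery(
                         "SELECT id, topic, payload FROM outbox ORDER BY created_at "
                       + "LIMIT 50 FOR UPDATE SKIP LOCKED")) {
                while (rs.next()) {
                    batch.add(new String[] {
                            rs.getString("id"), rs.getString("topic"), rs.getString("payload") });
                }
            }

            for (String[] row : batch) {
                publish(row[1], row[2]); // eg send to Kafka and wait for the acknowledgement
                try (PreparedStatement del = db.prepareStatement("DELETE FROM outbox WHERE id = ?")) {
                    del.setString(1, row[0]);
                    del.executeUpdate();
                }
            }

            db.commit(); // releases the row locks; the rows are gone only if every send succeeded
        } catch (SQLException e) {
            db.rollback();
            throw e;
        }
    }

    private void publish(String topic, String payload) {
        System.out.println("would publish to " + topic + ": " + payload);
    }
}
```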