Categories: Programming, BigData
Introduction
This article summarizes two related design patterns for managing persistent state in large enterprise systems.
In 2011, Nathan Marz wrote a blog article called “How to beat the CAP theorem” which describes a design pattern that he later named “the lambda architecture”. His book “Big Data: Principles and Best Practices of Scalable Realtime Data Systems” (Manning) explores this approach in more detail.
In 2014, Jay Kreps wrote an article describing a simplification of this approach; he refers to it half-jokingly as “the kappa architecture”, noting that the concept is so simple that a name for it is possibly unnecessary.
Simple or not, I found both ideas well worth getting to grips with. These ideas are also related to many other recent movements in IT, including functional systems of all sorts, “event sourcing”, and event-processing in general.
There are many good resources on these topics: the links above are good starting points. The rest of this posting is just my own non-expert opinion and musings on the matter; in part it is simply a “rephrasing” of the core concepts of both approaches in a way that helped me grasp what is central to the ideas.
Motivation
The Kappa architecture is a variant of the Lambda architecture (and I see it as a special simplified case); you should read Jay Kreps’ article (quite brief), and Nathan Marz’s original. The Manning book is large, and only worth the time for those who are seriously considering building such a system.
Enterprise systems generally have one or more large relational databases at their center. These databases hold “the current state” of the system, and other applications query and update this state. However, this approach suffers from race-conditions and performance bottlenecks. Caches are sometimes used to help scale such systems, but these introduce their own issues. Sometimes such systems keep “history” information (also known as audit trails) which theoretically shows how the current state was reached. Both the lambda and kappa approaches invert this: the “history” is viewed as the real data in the system, and the “current state” is just a cache derived from that information. This can result in a more reliable and scalable system; it is also more resistant to programmer failures (bugs in the software responsible for updating state).
As an example, a few years ago I worked on a large project to develop an account-management system for investment banks. Persistent data included users, accounts, and inter-bank reconciliation data. An Oracle database held the “current state” of many persistent entities; code would use a custom persistence layer to read “objects” from the database, modify them, and write them back. The persistence layer wrote “history records” tracking the changes that were made. This is all fairly standard practice in traditionally-architected systems, going back many decades. And development of this system was very painful. Data conflicts, transactional problems, and broken history information were always popping up during development and testing. Sadly, this is not unusual - many projects have such issues. The complexity of modern data-models and the volumes of data make persistence based on traditional read/modify/write much harder than it used to be.
The lambda/kappa approach would be to instead store “change events” as the primary persistent data. Of course, system users do not want to view raw history information (except in specific cases); they want the current state. The answer is to derive the current state from the history and cache it in a database. The advantages are:
- storing the original events is conceptually simpler
- the original events are immutable
- losing or corrupting the current state is not catastrophic - it can be derived again if needed
- if there is a bug in the way the current state is derived, the code can be fixed and the state recomputed.
The last two points in particular indicate the greatest difference between current-state-based and history-based approaches:
- given the current state, the history cannot be derived, but
- given the history, the current state can be derived.
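To make this concrete, here is a minimal sketch in Python (the event types and fields are invented for illustration) of deriving the current state by folding over the stored history:

    # Minimal sketch: the event history is the primary data; the "current
    # state" is derived by folding over it. Event types are illustrative.
    from dataclasses import dataclass

    @dataclass(frozen=True)
    class Deposited:
        account: str
        amount: int  # cents

    @dataclass(frozen=True)
    class Withdrew:
        account: str
        amount: int  # cents

    def current_balances(history):
        """Derive the current state (balance per account) from the event history."""
        balances = {}
        for event in history:
            delta = event.amount if isinstance(event, Deposited) else -event.amount
            balances[event.account] = balances.get(event.account, 0) + delta
        return balances

    history = [Deposited("acc-1", 10_000), Withdrew("acc-1", 2_500), Deposited("acc-2", 500)]
    print(current_balances(history))  # {'acc-1': 7500, 'acc-2': 500}

Losing or corrupting the derived dictionary is harmless here: re-running the fold over the stored events recreates it, and fixing a bug in the fold just means running it again.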
The Lambda/Kappa approaches are relevant any time an information system can be described as a large flow of events which cause modification to a current state.
The Lambda Design
Jay Kreps’ article on Kappa summarizes Lambda very well, and links to Nathan Marz’s article for further details. I’ll therefore give only a very brief summary here.
In Lambda, historical information is periodically reprocessed by a batch-mode program to generate “batch layer” outputs in a database. Such data can then be used for reporting or similar. At some later time, the historical information (now updated with additional events) is reprocessed, producing an updated version of the “batch layer” derived outputs which then replaces the older version. Between batch runs, a realtime-mode program runs against incoming events to incrementally update a “speed layer” database which is an incremental overlay of the batch-layer. Applications which need the very latest results must somehow merge the batch-layer and speed-layer results.
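What “merging the batch-layer and speed-layer results” means in practice is something like the following sketch (the view names and structures are invented): the batch view holds totals as of the last batch run, and the speed view holds increments for events that arrived since then.

    # Hypothetical merge of batch-layer and speed-layer views.
    def merged_balance(account, batch_view, speed_view):
        # batch_view: totals computed by the last full batch run
        # speed_view: incremental totals for events since that run
        return batch_view.get(account, 0) + speed_view.get(account, 0)

    batch_view = {"acc-1": 7_500}   # as of the last batch run
    speed_view = {"acc-1": 1_200}   # events since the last batch run
    print(merged_balance("acc-1", batch_view, speed_view))  # 8700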
Historical data does not need to reach back to “the beginning of time”. For some use-cases, ancient history can simply be discarded. For others, a set of events can be replaced by a single event representing the state at some point in time - in other words, a “snapshot” of the state can be used as the starting point, and earlier events discarded. The snapshot point should be chosen with care, though: once the earlier events are gone, the computations that produced the snapshot can no longer be modified and rerun.
The Problems with Lambda
There are a couple of issues with the above design:
- The batch-mode and realtime-mode code often implement the same logic (eg “for each transaction: cust_balance = cust_balance + transaction_amount”), but in two different styles: batch vs streaming.
- The “current state” is spread between two databases: “batch” and “speed”. Any app which wants the current state needs to somehow merge these.
Implementing the same logic in two different forms (eg batch-mode Spark and streaming-mode Storm) is painful. Even if the implementation language is the same, the code differs: the streaming-layer code must be aware of the batch-layer outputs.
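Concretely, the duplication looks something like this schematic sketch, with plain Python standing in for what would really be Spark and Storm code:

    # Schematic sketch of the Lambda duplication: one rule, two implementations.

    # Batch style: recompute everything from the full history.
    def batch_balances(all_transactions):
        balances = {}
        for tx in all_transactions:
            balances[tx["cust"]] = balances.get(tx["cust"], 0) + tx["amount"]
        return balances

    # Streaming style: update state incrementally, one event at a time,
    # layered on top of the batch outputs.
    def on_transaction(tx, speed_balances):
        speed_balances[tx["cust"]] = speed_balances.get(tx["cust"], 0) + tx["amount"]

Any change to the rule now has to be made twice, and kept consistent across two codebases.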
When the “current state” is only exposed via REST services, the REST service implementations are responsible for this read/merge logic. However, this makes it difficult or impossible to run reports against “the current state” - reporting tools do not understand how to read and merge the two databases.
The kappa solution is to have the current state in just one database, and use just one way of processing event data (whether historical or current): a streaming (realtime) program.
Kappa Design
Kappa is a simplification of Lambda which can be applied if:
- processing of events can be done correctly in a streaming algorithm (not all logic can be so expressed), and
- processing of incoming events can be done at sufficient speed without taking “shortcuts” or making approximations
When derived data needs to be regenerated (eg when an algorithm has been changed) then you:
- create a new output database (set of tables)
- feed all the historical raw data into a new instance of the streaming algorithm
and when the reprocessing has “caught up to” the current time:
- start feeding the real-time data into the new streaming instance
- make the new output database the “current” database for downstream apps
- stop the old streaming instance and discard the old database
If you don’t mind some outage time where the target database is not available, you can simply stop the original stream, clear the output tables, then restart processing at the relevant historical point.
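The no-outage variant can be modelled as a toy, purely in-memory Python script (the event log, the output “databases” and the “current” alias are stand-ins for real infrastructure):

    # Toy model of the Kappa switchover; all names are illustrative.
    event_log = [("acc-1", 100), ("acc-1", -30), ("acc-2", 50)]  # full retained history

    def run_stream(events, output):
        # the single streaming algorithm, used for both replay and live data
        for account, amount in events:
            output[account] = output.get(account, 0) + amount

    databases = {"v1": {}}
    run_stream(event_log, databases["v1"])  # the old instance, already live
    current = "v1"                          # downstream apps read databases[current]

    # Algorithm changed: build v2 by replaying the whole history...
    databases["v2"] = {}
    run_stream(event_log, databases["v2"])
    # ...then, once caught up, repoint downstream readers and discard v1.
    current = "v2"
    del databases["v1"]
    print(databases[current])  # {'acc-1': 70, 'acc-2': 50}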
The Kappa approach is rather similar to a database “materialized view”, where a table is simply an SQL statement applied to another table with the results cached (unlike a normal view, which evaluates the view SQL on the fly).
When reprocessing historical data with the Kappa approach, the number of parallel processes can be increased as needed. As an example, if two parallel streaming processes are sufficient to keep up with the incoming realtime data, then reprocessing might be started with ten parallel processes and reduced to two once it has “caught up” to the realtime data.
As mentioned above in the section on Lambda, history may start with some “base current state”, with events older than that date having been discarded.
The benefits of Kappa over Lambda are significant (as long as the system requirements allow Kappa to be applied):
- all the complexity of separate batch-layer and speed-layer databases disappears; the current state is held in just one database.
- the complexity of two different programs for processing the same input data disappears; the same application is used in both cases.
- reporting tools have access to the very latest data (no separate batch/speed databases)
- system operations are simplified (fewer processes and datastores to manage and monitor)
It is not always the case, however, that the “batch mode” logic can be implemented in realtime (streaming) form. The realtime component must optimise for performance of processing; the batch component may optimise for performance of querying the outputs. For some queries, “rollups” (denormalized precomputations) may be necessary when querying large datasets; it may be optimal to compute these during batch processing but skip them during realtime processing on the basis that the amount of data in the “speed layer” is limited. Sorting and indexing may be another case - in-memory sorts and linear searches may be adequate in the speed layer, given the limited amount of data.
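As an illustrative sketch of that trade-off (data and function names invented): the batch layer might precompute a per-day rollup so queries over a large history are cheap, while the speed layer just scans its small window of raw events linearly.

    # Illustrative only: a batch rollup vs a linear scan in the speed layer.
    from collections import defaultdict

    def batch_daily_rollup(events):       # events: (day, amount) pairs
        rollup = defaultdict(int)
        for day, amount in events:
            rollup[day] += amount         # precomputed so queries are cheap
        return dict(rollup)

    def speed_total(recent_events, day):  # linear scan: fine for little data
        return sum(amount for d, amount in recent_events if d == day)

    history = [("2024-01-01", 5), ("2024-01-01", 7), ("2024-01-02", 3)]
    recent = [("2024-01-02", 4)]          # events since the last batch run
    rollup = batch_daily_rollup(history)
    print(rollup.get("2024-01-02", 0) + speed_total(recent, "2024-01-02"))  # 7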
Why Reprocess At All?
In the Lambda approach, the realtime (“speed”) processing layer may be allowed to make approximations and simplifications, such as ignoring the possibility of rare race-conditions. When the full batch-mode pass is made, any errors introduced by the realtime processing are discarded and replaced by the correct results. Obviously, when the realtime layer takes advantage of this freedom, batch-mode processing should be done regularly (eg weekly, daily or perhaps hourly).
When the realtime processing layer is considered to generate results just as correct as the batch layer, then the “batch” computation has only one primary goal: to ensure that if it is ever necessary to regenerate the current state from historical data, the infrastructure to do so still works. Running the reprocessing once a month would probably be sufficient in that case.
In the Kappa approach, it can be assumed that the realtime processing code generates 100% correct outputs - after all, in Kappa it is the same code used to reprocess historical data. So reprocessing of historical data serves mostly to verify that history can be reprocessed if that should be necessary.
Reasons why reprocessing history might be necessary include:
- detection of a bug in the current algorithm,
- enhancement of the current algorithm to produce additional outputs from the same input data, or improve the quality of outputs,
- migration to a different database type or schema for current state storage,
- or to regenerate the current state databases when they have been lost.
The last item provides the ability to skip backups for the current-state databases, as they can be regenerated if needed (though the historical data of course must be reliably stored!).
In both designs, it is possible to keep periodic snapshots, and just recompute data since that snapshot.
Before discarding the Lambda “batch mode” component and going purely with the Kappa approach, it may also be worth considering the implications of “eventual consistency”, and whether the realtime processing layer really does always generate the same outputs that would be produced later, after “convergence” of the distributed system has completed. To quote from Nathan Marz’s original article:
(Batch processing is) easy to reason about because you only have to think about data and functions on that data. There’s no read-repair, concurrency, or other complex issues to consider.
The Kafka Message Broker
The Kappa approach is particularly easy if “historical” data only means a few weeks or months of data, and the message-broker is Kafka: just set the Kafka retention time so all required historical data is retained, and then have the new streaming instance read from the start of the Kafka queue. The new instance will automatically transition from “historical” to “realtime” data - it is all just data in the same queue.
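As a sketch using the kafka-python client (the broker address, topic name and processing function are placeholders), the “new instance” simply starts from the earliest retained offset and keeps going:

    # Replay retained history, then keep consuming live events - it is all
    # just data in the same topic. Broker and topic names are placeholders.
    from kafka import KafkaConsumer

    def process(raw_event):
        pass  # apply the single streaming algorithm here (omitted)

    consumer = KafkaConsumer(
        "account-events",                 # placeholder topic
        bootstrap_servers="broker:9092",  # placeholder broker
        auto_offset_reset="earliest",     # start from the oldest retained record
        enable_auto_commit=False,         # the new instance manages its own offsets
    )
    for record in consumer:               # seamlessly crosses from history to realtime
        process(record.value)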
Note that data in a Kafka queue takes up no more (and probably less) disk-space than the same data held in a database of some sort; if you are prepared to dedicate disk-space to holding that history in a database then storing it in Kafka files is no more expensive. The disadvantage of course is that the data cannot be directly queried.
Functional Systems
In recent years the “functional programming” movement has received far more attention than previously. These somewhat neglected ideas are proving useful for a number of reasons, the most significant of which is scalability.
In any system holding centralized state information, parallel processing (and therefore scalability) is a problem. Accessing and updating that central state is vulnerable to race-conditions, and thus requires locking when accessed in parallel. This applies equally to multi-threaded programming within a single process and to systems of interacting processes.
Functional programming consists (in part) of using immutable data structures. Such structures do not require locking, and can also be safely replicated.
The focus on “immutable history” in the Lambda/Kappa design patterns is similar.
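A trivial Python illustration: “updating” an immutable value produces a new value, so existing references (eg held by other threads) stay valid without any locking:

    # Updating an immutable record yields a new record; existing references
    # (eg in other threads) remain valid without locking.
    from dataclasses import dataclass, replace

    @dataclass(frozen=True)
    class Account:
        name: str
        balance: int

    before = Account("acc-1", 100)
    after = replace(before, balance=130)  # a new value; 'before' is untouched
    print(before.balance, after.balance)  # 100 130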
Domain Driven Design, Event Sourcing and CQRS
The concepts behind the term “Event Sourcing” are clearly very similar to those in what Marz/Kreps name Lambda and Kappa.
The concepts of “CQRS” and “Domain Driven Design” are often applied together with Event Sourcing.
- Event Sourcing is the concept of storing the history of a system (the set of events which led to its current state) rather than just the current state.
- Command-query responsibility segregation (CQRS) is a design pattern in which code that queries data is separated from the code which modifies data (see the sketch after this list). The well-known model-view-controller pattern is somewhat similar: there, the controller is responsible for performing updates on the model while the view is responsible for “querying” it. CQRS is perhaps the “enterprise data” version of MVC, and is pretty much the opposite of CRUD-based (Create Read Update Delete) DAO (Data Access Object) code, which tightly couples queries and updates by implementing them in one object.
- Domain Driven Design (DDD) is, at its core, simply the approach of modelling the system to be implemented as entities and operations, and using this model to drive the implementation. The book “Domain-Driven Design: Tackling Complexity in the Heart of Software” by Eric Evans (2004) coined the term and formalized many parts of the “object oriented design” approach.
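Here is the toy sketch of the CQRS split referred to above (all class and function names are invented): the command side only appends events, while queries read a separately-maintained view.

    # Toy CQRS sketch: commands append events; queries read a separate view.
    class CommandHandler:
        def __init__(self, event_log):
            self.event_log = event_log

        def deposit(self, account, amount):  # write side: validate, record an event
            if amount <= 0:
                raise ValueError("amount must be positive")
            self.event_log.append(("deposited", account, amount))

    class BalanceQuery:
        def __init__(self, view):
            self.view = view                 # read side: its own query-optimised store

        def balance(self, account):
            return self.view.get(account, 0)

    def update_view(event_log, view):        # projection keeping the read model current
        view.clear()
        for _, account, amount in event_log:
            view[account] = view.get(account, 0) + amount

    log, view = [], {}
    CommandHandler(log).deposit("acc-1", 40)
    update_view(log, view)
    print(BalanceQuery(view).balance("acc-1"))  # 40

Note that the projection (update_view) is just the “derive current state from history” step again.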
One of the nice properties of representing system changes as streams of events rather than updates to state is that such changes can be easily replicated and processed in parallel by multiple systems. When there are multiple places in which the same data is stored (eg caches), this can simplify the task of keeping everything up-to-date. The Apache Samza stream-processing-system is an interesting application of this approach.
Summary
There is far more to the Lambda and Kappa design patterns than described here. In addition, I’m no expert in this area - all advice and opinion here should be read with caution. I found the ideas interesting, and there are many connections to other big-data topics and active IT-related movements. I hope you find it interesting too.
References
- Kappa Site
- Kappa Article
- The Original Lambda Article
- Heimeshoff & Jander: CQRS
- Domain Driven Design, Event Sourcing and CQRS
- Martin Fowler: Event Sourcing
- CALM - Consistency as Logical Monotonicity
Other useful links:
- Datomic - a database designed around a Kappa architecture which processes a stream of “persistent entity update” events to keep one or more (key,value) caches up-to-date, and then supports queries against these caches.