Dealing with Mutable Records in a BigQuery Data Warehouse

Categories: Cloud, BigData

Introduction

The Google BigQuery database is intended for business intelligence / data warehouse workloads, ie analysis of static data. New records can be efficiently appended to a BigQuery table, but modifying existing records is complicated and inefficient. Unfortunately, “reference tables” that are needed in BigQuery for performing joins are often simply a replica of data stored in some “upstream” system such as SAP or a relational database. The upstream system often allows mutating records - but replicating these changes through to the BigQuery business intelligence system can be tricky to implement.

The BigQuery documentation does have a page that addresses this issue. However it can be hard to grasp the purpose of this documentation without further context; this article aims to provide that context.

Some other non-relational databases have the same issue; Apache Hive is one of them (although Hive streaming tables do allow more efficient updates than are available in BigQuery).

If you are new to data warehousing, my article Introduction to Data Warehousing might be useful.

Facts and Dimensions

In a conventional data warehouse, stored data is divided into two categories: facts and dimensions.

Fact tables hold data which represents events happening in an external system; for example:

  • in a retail system, the sale of a product is a fact (generated at the checkout for example)
  • in an inventory management system, each product ship or product received is a fact
  • in a factory-management system, sensor readings are facts (this sensor had this value at this time)

Facts do not change once recorded. A product sold might later be returned - but that simply creates another fact of type “product returned” rather than modifying the original sale event record.

Dimensions hold semi-static “reference” data against which the fact tables are joined; for example:

  • a catalog of products (mapping from product-id to product description and attributes)
  • a catalog of sensors in a factory (mapping from sensor id to sensor type, last-calibrated, etc)

BigQuery handles fact tables fine - appending to a table is efficient, and facts never change.

Static dimension tables, ie reference data that never changes, are also no problem.

When a dimension table receives updates, however, this “semi-static” nature can be difficult to deal with. This is the issue that this article addresses.

Linking Facts to Dimensions

The problem to be solved is:

  • how to store the dimension data (with or without “change history”)
  • while making it efficient to find the matching dimension record when inserting new records into the fact table

Sometimes new fact records simply need to be linked to “the most current version” of the corresponding dimension record, and sometimes they need to be linked to the dimension record that was active at a timestamp which is part of the fact record.

Finding the appropriate dimension record for a fact record might be done at insert-time (ie the new fact record is stored with a new column holding the primary-key of the matching dimension record), or might be done later during queries. In general, data warehouses prefer the first approach - do the lookup once and early, in order to make later queries more efficient.
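
As an illustration, an insert-time lookup might be expressed like the following sketch (the staging, fact and dimension tables and their columns are hypothetical):

#standardSQL
-- resolve the dimension's surrogate key while inserting new fact records
-- (hypothetical tables: staging_sales, dim_product, sales)
insert into somedataset.sales (product_key, sold_at, quantity)
select d.product_key, s.sold_at, s.quantity
from somedataset.staging_sales s
join somedataset.dim_product d
  on s.product_code = d.product_code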

Updates in BigQuery

Before we discuss the solutions, it is useful to understand how updates and deletes work in BigQuery.

A BigQuery table consists of a set of files on disk. Each file contains (roughly) a linear sequence of records. Unlike relational databases, records are not stored in BTree format, and existing files are never modified.

To modify a record within a file, a new version of that file must be created - ie every record in that file must be read and rewritten. These files are generally quite large (typically holding thousands or millions of records and taking tens or hundreds of megabytes of disk-space).

Here is a sensible usage of update, fixing a spelling-mistake in potentially millions of records:

#standardSQL
UPDATE somedataset.sometable
SET product_name='milk'
WHERE product_name='mikl'

This will cause a rewrite of every file in the table in which the where-clause matched (at least; depending upon how BigQuery is implemented, it might rewrite every file associated with the table). That is not only slow, but also (relatively) expensive.

When such updates are a rare occurrence, this is not a problem. However it is not something that should be done frequently. And in fact, BigQuery has a quota for updates:

Maximum number of combined UPDATE, DELETE, and MERGE statements per day per table — 96

Maximum number of combined UPDATE, DELETE, and MERGE statements per day per project — 10,000

Slowly Changing Dimensions in other Data Warehouses

How traditional data warehouses deal with this issue is addressed nicely in this Wikipedia article on Slowly Changing Dimensions. Unfortunately all of the solutions described there rely on mutating existing records in the database when updates are applied; they either

  • overwrite the old record with the new one (ie equivalent to delete + insert)
  • keep older records, but use an “isCurrent” flag to mark the active one (requires setting isCurrent=false on the older existing record when a newer version is inserted)
  • keep older records, and include an “end date” field on each record (requires setting the end-date, previously null, on the older existing record when a newer version is inserted)

In fact, the schemas suggested in the Wikipedia article can be useful - they just need to be applied a little differently (to work around the inefficiencies of small updates in BigQuery).
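
For concreteness, a type-2 style dimension schema along those lines might look like this in BigQuery (a sketch; the table and column names are hypothetical):

#standardSQL
-- a hypothetical type-2 dimension: one row per version of each product
create table somedataset.dim_product (
  product_key int64,      -- surrogate key, unique per version
  product_code string,    -- natural/business key from the upstream system
  product_name string,
  start_date timestamp,   -- when this version became active
  end_date timestamp,     -- null for the current version (type 2b/2c)
  current_flag string     -- 'Y' or 'N' (type 2c)
)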

Solution A: Replacing Dimension Tables Without History

There is a simple solution when:

  • your analysis jobs never care about historical values of dimension records, ie all reports and analysis should always use the most recent value;
  • the dimension table isn’t updated very often;
  • the dimension table isn’t excessively large; and
  • the dimension table is simply a mirror (replica) of a table in a traditional relational system.

In this case, the simplest solution is to just re-export the data from the upstream database whenever it changes (at most once per day, say), and reimport it into BigQuery. A BigQuery load job is atomic, ie the new data replaces the old data in a single atomic step.

This is effectively the “type 1” solution from the Slowly Changing Dimensions article.

One potential problem is that it is traditional for a dimension table to use a surrogate key for its records. Because existing fact records will refer to dimension records by their keys, a “reload” of a dimension table would need to somehow ensure that the keys remain identical.

Solution B: Replacing Dimension Tables With History

Assume you do care about change-history, eg want to be able to generate reports showing the dimension-value a fact record had at the time the fact occurred, but all the other factors from the above solution apply, ie:

  • the dimension table isn’t updated too often;
  • the dimension table isn’t too large; and
  • the upstream system is a relational database

In this case, you may be able to apply one of the approaches from the Slowly Changing Dimensions article in the relational DB, and then export/reimport the entire table into BigQuery. Or in other words, this approach pushes the problem of maintaining the mutable table in data-warehouse form upstream to the provider of that data.

In particular, the “type 2b” (type 2 with effective-date) and “type 2c” (type 2 with effective-date and current-flag) solutions are good data models; they make it reasonably easy to write joins against the dimension table that pick out the right version of data either by date-range or simply by “current_flag = Y”.
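
For example, a join that picks out the dimension version active at the fact's timestamp might look something like this (a sketch against the hypothetical type-2 schema above, assuming the fact table carries the natural key plus a timestamp):

#standardSQL
-- join each fact to the dimension version whose validity period covers the
-- fact's timestamp; a "current only" report could instead use current_flag = 'Y'
select f.sold_at, f.quantity, d.product_name
from somedataset.sales f
join somedataset.dim_product d
  on f.product_code = d.product_code
 and f.sold_at >= d.start_date
 and (d.end_date is null or f.sold_at < d.end_date)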

As with solution A above, record keys need to be stable to avoid updating fact tables that refer to the dimension records.

Solution C: Batching Updates

This approach may be useful when the dimension table is too large to reimport on change, or when an upstream relational DB cannot be used to structure the data in the desired form before import.

As records are modified in the upstream system, append the new data to a temporary table in BigQuery. Then occasionally (eg once per day) apply all queued changes to the target dimension table, producing a new table as output (which then replaces the original). This allows existing records to be modified or deleted, and the new records to be inserted with appropriate values.

If changes are already being delivered from the upstream system as batches, then that makes things even easier.

This approach allows the “type 2b” or “type 2c” approach from the Slowly Changing Dimensions article to be applied within BigQuery; in particular, existing records for which a newer version now exists can be rewritten with their end_date set, and their current_flag (if present) changed from Y to N.
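
One way that rewrite might be expressed (a sketch only; the table and column names are hypothetical, and surrogate-key handling is omitted for brevity):

#standardSQL
-- rebuild the dimension as a new table: union the current versions with the
-- queued updates, then recompute end_date and current_flag per natural key
create or replace table somedataset.dim_product_new as
with merged as (
  select product_code, product_name, start_date
  from somedataset.dim_product
  union all
  select product_code, product_name, change_time as start_date
  from somedataset.dim_product_updates
)
select
  product_code,
  product_name,
  start_date,
  lead(start_date) over (partition by product_code order by start_date) as end_date,
  if(row_number() over (partition by product_code order by start_date desc) = 1, 'Y', 'N')
    as current_flag
from merged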

Of course rewriting a table is an expensive operation, which is why it is necessary to apply changes in batches.

The disadvantage of this approach is that when small updates are being delivered frequently, eg as they are being made in the upstream mutable (and probably relational) system, there will be a delay until this new information is available for joining fact-tables against. That is unfortunately the price to pay for using BigQuery - or any other database where updates are expensive and inefficient. The benefit of BigQuery is of course its cheap and fast (because scalable) queries.

The BigQuery UPDATE and DELETE statements are not useful for applying batches of changes to individual records. BigQuery does support a MERGE statement which can be used to do such rewriting; the statement is extremely powerful - but also quite complex to use. If MERGE is not sufficient, then a Dataflow application can be used to apply the batched updates to the target table.

Running a MERGE or a Dataflow application takes time; the results should be exposed atomically to other applications that are currently querying the dimension table being updated. The Google documentation suggests the following approaches for this:

  • write the records to a whole new table, and use a view to switch atomically between old and new table versions (see the sketch below); or
  • write the records to a file on disk, and then rely on the atomic behaviour of a load-job to replace the old table with the contents of the file
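
A minimal sketch of the view-based switch (assuming the hypothetical table names used above):

#standardSQL
-- all queries reference the view; repointing it at the newly-built table
-- is a single atomic operation
create or replace view somedataset.dim_product_current as
select * from somedataset.dim_product_new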

Solution D: Append Updates With Timestamp (Non-mutating)

In this approach, updates to a dimension are simply appended to the existing table without modifying the original records at all. Each dimension record has a timestamp indicating when it was created, and joins against the table use SQL to pick out the version of the record with logical-key X which has the highest creation-timestamp.

This is effectively option (2a) from the Slowly Changing Dimensions article.

Unfortunately, such queries are rather inefficient. This approach is therefore not particularly scalable. In addition, the SQL that is needed to implement this is complicated. Some queries which implement this are shown below.

Finding the Latest Record with a Max-based Query

Here is one possible solution for finding the “most recent” record for a table:

-- assume dimension table with form (key1, key2, rdata, lastModified)
-- assume fact table with form (key1, key2, ldata)
-- where (key1, key2) are a multipart primary key
-- and we want to simply join each fact record against the "most recent version" from the dimension table

#standardSQL
select l.key1, l.key2, l.ldata, r.rdata
from dataset.left l
inner join (
  select key1, key2, max(lastModified) as maxModified
  from dataset.right
  group by key1, key2) as rmax
on l.key1=rmax.key1 and l.key2=rmax.key2
inner join dataset.right r
on l.key1=r.key1 and l.key2=r.key2 and r.lastModified=rmax.maxModified

The first inner join effectively derives a “lookup table” from the dimension-table which maps (key1, key2) to the most recent timestamp for that key. The fact table can then join to the dimension-table record with (key1, key2, maxModified) to get “the most recent record”.

There is probably also a solution that uses the row_number analytic function rather than max + second join (see later).

There are two problems with this whole approach, however:

  • the query (or any of its variants) is complex even on its own, ie it will not run fast; and
  • to join another table against this table, the query needs to be modified - making it even more complex.

Finding the dimension record active at a date from the fact record is presumably also possible with the appropriate query, but even more complex. It would seem that an analytic function would be necessary in this case (rather than the max-based solution above).

Finding the Latest Record with an Analytics-based Query

The following query solves the “find most recent record” problem using the row_number analytic function rather than max + second join.

#standardSQL
with
  rtemp as (
    select
      r.key1, r.key2, r.rdata, r.lastModified,
      row_number() over (partition by r.key1, r.key2 order by lastModified desc) as rn
    from dataset.right r),
  rlatest as (select * from rtemp where rtemp.rn = 1)
select l.key1, l.key2, l.ldata, r.rdata, r.lastModified from
  dataset.left l inner join rlatest r
  on l.key1 = r.key1 and l.key2 = r.key2
  order by l.key1, l.key2

This query works in a similar way to the “max” solution above; the inner select generates a table rlatest with just the most recent record from dataset.right for each (key1, key2) pair. Unfortunately, a BigQuery where-clause cannot reference column aliases; it is therefore necessary to compute column “rn” in a nested select (rtemp).

It is not clear (to me) how to write a query that uses timestamps from records in the “left” table to find the matching record in the right table.

The BigQuery docs also provide a query for deduplicating tables which might form a good starting-point for a “select most recent” query.

At first sight, the start-date/end-date option from the Slowly Changing Dimensions article requires record modification (to set the end-date on existing records when a new version is inserted). However, perhaps an analytic function with the correct window could implicitly deduce an end-date which is equal to the start-date of the following record?
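
For example, something along these lines might work, using the lead analytic function against the (key1, key2, rdata, lastModified) schema used above (an untested sketch):

#standardSQL
-- deduce each version's end-date as the lastModified of the following version
-- for the same key; null means "still current"
select
  key1, key2, rdata,
  lastModified as start_date,
  lead(lastModified) over (partition by key1, key2 order by lastModified) as end_date
from dataset.right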

Finding the Latest Record with an Outer Join

The page referenced in the comment below presents an interesting query that might be a good base for an alternative “find the most recent record” query.

-- based on https://www.brentozar.com/archive/2009/04/getting-the-most-recent-record/
-- find records from right where there is no record with a larger lastModified timestamp
-- (and thus the outer-join populates the fields from rn with nulls)
SELECT r.*
  FROM dataset.right r
    LEFT OUTER JOIN dataset.right rn
    ON (r.key1 = rn.key1) and (r.key2 = rn.key2) and r.lastModified < rn.lastModified
  WHERE rn.key1 IS NULL

Test Data for Latest Record Queries

In order to test the above queries, I wrote a small Python program to generate test data. For reference, here it is:

TODO: add test code here
!!

The test-data file can be loaded with

bq load --source_format=NEWLINE_DELIMITED_JSON dataset.left ./left.json

Solution E: Update Data Masking

The Google docs describe an “update data masking” approach (“Technique 3”) with a main table for the dimension and a separate table for “updates”, with a view that joins the base table against the updates table.

This is effectively a variant of the above solution, where clever SQL is used to pick out the “latest” record without mutating older records when they are made obsolete.

When the number of updates is small relative to the size of the dimension table, this might be more efficient than inserting updates into the dimension table directly. It is, however, less elegant, and the SQL is complex.

The example view Google provides relies on the non-standard IF SQL function.
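
A rough sketch of the idea - not Google's exact view, and with hypothetical table and column names - might look like this:

#standardSQL
-- prefer the most recent update (if any) over the base record, using IF to
-- mask the updated fields
create or replace view somedataset.dim_product_v as
select
  b.product_code,
  if(u.product_code is null, b.product_name, u.product_name) as product_name
from somedataset.dim_product_base b
left join (
  select * except(rn) from (
    select *, row_number() over (partition by product_code order by updated_at desc) as rn
    from somedataset.dim_product_updates)
  where rn = 1) u
on b.product_code = u.product_code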

Other Issues

Interactions between Streaming and Updates

UPDATE, DELETE and MERGE statements cannot be executed while streaming is “active” (a non-empty streaming buffer exists). Presumably solution D or E (join dimension table with update data) would work while streaming is active, but storing the results back to the original table would fail. However dimension tables are unlikely to be the target of streaming writes (unlike fact tables, where streaming inserts are common).

NOTE: when using streaming as a workaround for being unable to block on a completed upload, this could be a problem.

The Merge Statement

The BigQuery MERGE statement is well documented - but is complex. As a brief taste of what it looks like:

merge target_table t
using updates_table u
on t.id = u.id
when matched and u.op="update" then update set t.foo = u.foo
when matched and u.op="delete" then delete
when not matched by target then ..
when not matched by source then ..
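
For illustration, a complete statement applying a hypothetical batch of keyed changes (each row carrying an op column) might look something like this - a sketch, not taken from the documentation:

#standardSQL
-- apply a batch of inserts/updates/deletes from an updates table to the
-- dimension table in a single statement (hypothetical tables and columns)
merge somedataset.dim_product t
using somedataset.dim_product_updates u
on t.product_code = u.product_code
when matched and u.op = 'delete' then
  delete
when matched and u.op = 'update' then
  update set product_name = u.product_name
when not matched and u.op = 'insert' then
  insert (product_code, product_name) values (u.product_code, u.product_name)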

Summary

At this point, this article has hopefully provided enough context to allow the Google documentation on handling changing data (which was mentioned in the introduction to this article) to be more easily understood.

If you have any good solution to the problem that is not mentioned above, please let me know!

References