Apache Beam and Google Dataflow Overview

Categories: Cloud, BigData

Introduction

Apache Beam is a programming API and runtime for writing applications that process large amounts of data in parallel. It can be used to process bounded (fixed-size) input (“batch processing”) or unbounded (continually-arriving) input (“stream processing”). Other technologies that address similar problems include Spark, Flink, and Storm.

The major feature that makes Beam different from existing big-data-processing frameworks is its support for windowing of incoming data. If you need to compute things such as “running totals” over records that fall into specific time-ranges, then Beam is worth a serious look.

The other major feature of Beam is that it is the API for executing logic on Google’s Dataflow cloud service. If you want to process big data in the Google cloud using code (rather than using a database query language) then you will probably need to learn Beam. Interestingly, Beam is divided into separate “front end” and “back end” layers, and Google’s Dataflow Engine is just one possible back-end; Beam applications can also be executed on a cluster of Flink servers or Spark servers (more on this later).

Much of the following information is taken from a good article by one of the authors of Google Dataflow; see the References section for the relevant links. This article does, however, provide some context that (in my opinion) the original lacks.

Note: I have been using Beam for only about a month now (having previously done a moderate amount of Spark work). Corrections to anything below are welcome.

History

Google developed a parallel-programming API called FlumeJava; this Java API allowed the transformations applied to incoming data to be defined in an elegant, functional-programming-like style. Google also developed several parallel-processing engines for executing such transformations, including MillWheel. The Google Dataflow team then took the best ideas from FlumeJava and MillWheel, and built the Dataflow service. Later they donated the “front end” programming API (ie the part descended from FlumeJava) to the Apache foundation, and it is now called Apache Beam.

Google itself is a very heavy user of big-data-processing applications, and Dataflow was initially created to solve some of the problems they had which no other frameworks (internal or external) solved well. In particular, they have many use-cases that depend heavily on windowing input data. Google profiles users in order to drive their advertising placements, their recommendation-engines, and other components that bring them money (remember, Google is primarily about profile-driven advertising). Analysing user interactions with computer systems really needs to take into account the timestamp at which such interaction-events occurred - and thus requires windowing.

Of course, competing software products have not stood still. Beam was designed a few years ago, while Spark was still in version 1.x. Spark 1.4 added support for windowing-functions, but only in Spark-SQL, and it does not appear to really address the issues regarding time-based windowing that Beam deals with. Spark 2.0 introduced the concept of “structured streaming” which unified batch and streaming processing (as Beam did from the start). Spark 2.3.0 (Feb 2018) introduced “continuous processing” as an alternative to “micro-batching”, which might finally make Spark interesting competition to Beam for time-windowed processing (I would need to ask a Spark expert whether the issues in the Beam paper/article can now be dealt with via Spark).

Note that “FlumeJava” is unrelated to the Apache Flume log-aggregation application.

Bounded and Unbounded Data, and Windowing

The Beam documentation avoids the use of the words “batch” and “streaming”. Instead, it refers to bounded input, ie where the application will eventually reach “the last record”, and unbounded input.

When dealing with unbounded input, it is obviously necessary to emit results at intervals - either regular intervals (such as once per minute), or based on “triggers” in the incoming data. Beam has specific support for this concept, and (unsurprisingly) calls such conditions triggers. In addition, the results that are emitted will be the result of computing values over some set of records - often a set of records whose “event time” lies within some time-range. Beam also has specific support for this concept - each record has an implicit timestamp, and Beam automatically assigns records to the corresponding “windows”.

When processing bounded input, triggers and windows can still sometimes be relevant, and can be used if needed. If not (ie if the logic is a simple “process all records” kind of app) then Beam simply assigns every record the same timestamp, and assigns them to a single “global” window. This means that “traditional batch” processing is simply a special case of unbounded, triggered, windowed processing. Or in short, batch is a special case of streaming.
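The idea of assigning records to windows by event time can be illustrated with a little arithmetic. The following is a minimal plain-Java sketch, not Beam's own API: the class and method names are hypothetical, and it assumes non-negative millisecond timestamps and windows aligned to time zero. A "global window" corresponds to the degenerate case where every record lands in the same bucket.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names come from Beam.
class FixedWindowSketch {

    // Start of the fixed-size window that contains the given event time.
    // Assumes non-negative timestamps and windows aligned to time zero.
    static long windowStart(long eventTimeMillis, long windowSizeMillis) {
        return eventTimeMillis - (eventTimeMillis % windowSizeMillis);
    }

    // Groups event timestamps by the start of the window each one falls into.
    static Map<Long, List<Long>> assign(List<Long> eventTimes, long windowSizeMillis) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long t : eventTimes) {
            windows.computeIfAbsent(windowStart(t, windowSizeMillis), k -> new ArrayList<>()).add(t);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Three events and one-minute windows: the first two events share a window.
        System.out.println(assign(Arrays.asList(5_000L, 59_999L, 60_000L), 60_000L));
    }
}
```

A per-window result (such as a count or a total) would then be computed over each bucket separately.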

One particularly interesting window-type is the session - a set of timestamped events that occurred “close to” each other, followed by an interval of inactivity. Google analyses data within sessions for internal purposes, eg user interactions with YouTube. Beam has inbuilt support for session-windows. Building session-windows in Spark 1.x was apparently extremely complex; I am not sure whether this is improved in Spark 2.x.
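To make the session concept concrete, here is a small plain-Java sketch (again, not Beam's API; the names are hypothetical). It assumes a session ends when the gap between two consecutive events exceeds a chosen threshold, and it ignores the per-key grouping and late-arriving data that a real streaming engine has to handle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration only; none of these names come from Beam.
class SessionSketch {

    // Splits event timestamps into sessions. A new session starts whenever the
    // gap since the previous event is larger than maxGapMillis.
    static List<List<Long>> sessions(List<Long> eventTimes, long maxGapMillis) {
        List<Long> sorted = new ArrayList<>(eventTimes);
        Collections.sort(sorted);

        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long previous = 0L;
        for (long t : sorted) {
            if (current == null || t - previous > maxGapMillis) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(t);
            previous = t;
        }
        return result;
    }

    public static void main(String[] args) {
        // With a 5-second gap these five events form three sessions.
        System.out.println(sessions(Arrays.asList(1_000L, 2_000L, 10_000L, 10_500L, 30_000L), 5_000L));
    }
}
```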

The Beam articles in the References section explain all this much better than I can.

The Beam Programming API (aka “front end”)

The primary programming language for writing Beam applications is Java (1.8+). The Apache Beam project is currently working on a reimplementation of Beam in Python (to allow writing Beam apps in Python) - this is mostly complete but still “experimental” at the current time.

The external Scio project provides a Scala-based wrapper over Beam; I haven’t tried it but at first glance it looks elegant and could be interesting if you are used to writing Spark applications in Scala.

The Beam Java API is actually reasonably nice. It isn’t as terse and elegant as the Spark Scala API, but in my opinion it is good enough for general use.

The Execution Graph

A Beam application runs in two very separate phases:

  • defining the execution-graph
  • running the execution-graph

The “main method” of a Beam application is a normal Java main-method, and simply makes calls to the Beam API which result in the construction of a “pipeline” object which wraps a “graph of transform nodes”.

Calling method Pipeline.run causes the execution-graph to be handed over to some “runner module” which arranges for execution of that graph.

When using the DirectRunner, execution is simple: threads are forked and evaluation of the graph starts immediately.

When using the DataflowRunner (ie when executing the Beam application on Google’s Dataflow Engine), then the local runner component:

  • serializes the execution-graph and uploads it to shared storage (Google Cloud Storage)
  • uploads the application binary itself (and anything in its $CLASSPATH) to shared storage
  • generates a JSON object which contains an additional serialized version of the execution-graph plus metadata such as the path to the above files
  • and then either:
    • sends the JSON object to the Dataflow service to start immediate evaluation of the execution-graph
    • or uploads the JSON object to shared storage, from which it can later be executed (potentially many times) without needing to run the “main” method again.

The mode in which the pipeline gets saved into shared storage is called “template mode” and is triggered via commandline arg “--templateLocation=”.

Because of these two very clear separate phases, the code in a Beam application is split into two kinds:

  • code that is executed during “define graph time”, and
  • code (methods and lambdas) which is only executed at “run graph time”, ie as records are being processed.

If you are familiar with Spark, this separation will be no great surprise.
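The separation between the two phases can be shown with a toy example. This is a plain-Java sketch with hypothetical names, not the Beam Pipeline class: calls to apply only record the steps, and the lambdas are invoked later, when run is called with actual records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration only; this toy class merely imitates the two phases.
class TwoPhaseSketch {

    // The "graph": an ordered list of per-record steps.
    private final List<Function<String, String>> steps = new ArrayList<>();

    // Define-graph time: remember the step, but do not execute it.
    TwoPhaseSketch apply(Function<String, String> step) {
        steps.add(step);
        return this;
    }

    // Run-graph time: push every record through all recorded steps.
    List<String> run(List<String> input) {
        List<String> output = new ArrayList<>();
        for (String record : input) {
            String value = record;
            for (Function<String, String> step : steps) {
                value = step.apply(value);
            }
            output.add(value);
        }
        return output;
    }

    public static void main(String[] args) {
        // Nothing is processed while the graph is being defined...
        TwoPhaseSketch pipeline = new TwoPhaseSketch()
                .apply(String::trim)
                .apply(String::toUpperCase);
        // ...the lambdas only run here.
        System.out.println(pipeline.run(Arrays.asList(" a ", "b")));
    }
}
```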

Similarly, there are two kinds of “arguments” that can be passed to a Beam application:

  • arguments which are available during the “define graph” phase (and thus can be used to change the “shape” of the graph), and
  • arguments which are only available during the “run graph” phase (see class ValueProvider).

The first kind of argument can also be accessed from within transformations executed at runtime. However if the execution-graph has been uploaded to shared storage (a template) for later execution, then such arguments cannot be changed - their value from the “define graph” phase is simply captured during serialization. The second kind of argument can be specified when the execution-graph is executed at any later time - but its value is obviously NOT available at define-graph-time (consider the case where execution is triggered via a previously-uploaded template).
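The difference between the two kinds of argument resembles the difference between a value captured by a closure and a value fetched through a supplier each time it is needed. The sketch below is plain Java with hypothetical names; it only imitates the idea behind ValueProvider and does not use that class.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java illustration only; Supplier stands in for the idea of a deferred value.
class ArgumentSketch {

    // 'prefix' is fixed when the step is built (like a define-graph-time argument).
    // 'suffix' is looked up each time a record is processed (like a run-time argument).
    static Function<String, String> buildStep(String prefix, Supplier<String> suffix) {
        return record -> prefix + record + suffix.get();
    }

    public static void main(String[] args) {
        AtomicReference<String> runtimeArg = new AtomicReference<>("-first");
        Function<String, String> step = buildStep("job:", runtimeArg::get);

        System.out.println(step.apply("x"));
        // Changing the deferred value affects later executions of the same step;
        // the captured prefix can no longer be changed.
        runtimeArg.set("-second");
        System.out.println(step.apply("x"));
    }
}
```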

Beam Transforms

Beam class PTransform is the base type for all types which are “builders of execution-graph nodes”.

The Beam standard library provides a number of standard PTransform classes, including ones that build nodes for:

  • reading from data-sources (messaging systems, filesystems, serialized lists of objects that the define-graph phase provided, etc)
  • writing to data-sinks (messaging-systems, filesystems, databases, etc)
  • mapping, flatmapping, and filtering records (ie nodes that wrap a lambda that transforms or discards a record)
  • group-by-key
  • cogrouping multiple record streams (on top of which standard “joins” can be built)
  • combine operations (ie “reduce-like” operations)

Method PTransform.expand(...) is executed at define-graph-time.

The PTransform subclass ParDo is the generic tool for processing records (ie applying logic to each record in the input stream). Method ParDo.of(..) takes a DoFn object as parameter, and this object is executed at execute-graph-time (its processElement method is applied to each record).

The map/flatmap/filter transforms are built on top of ParDo, and the lambda provided to the transform is invoked via the processElement method of a ParDo.
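How map and filter can be layered on a single generic per-record operation is sketched below in plain Java. The ElementFn interface is a hypothetical stand-in for a DoFn (it receives one record and may emit zero or more outputs); it is not Beam's actual type, and lists stand in for record streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java illustration only; none of these names come from Beam.
class ParDoSketch {

    // Hypothetical stand-in for a DoFn: handles one record, may emit 0..n outputs.
    interface ElementFn<I, O> {
        void processElement(I element, Consumer<O> output);
    }

    // The generic building block: apply the function to every record.
    static <I, O> List<O> parDo(List<I> input, ElementFn<I, O> fn) {
        List<O> out = new ArrayList<>();
        for (I element : input) {
            fn.processElement(element, out::add);
        }
        return out;
    }

    // map: emit exactly one output per input.
    static <I, O> List<O> map(List<I> input, Function<I, O> f) {
        return parDo(input, (element, output) -> output.accept(f.apply(element)));
    }

    // filter: emit the input unchanged, or nothing at all.
    static <T> List<T> filter(List<T> input, Predicate<T> keep) {
        return parDo(input, (element, output) -> {
            if (keep.test(element)) {
                output.accept(element);
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> evens = filter(Arrays.asList(1, 2, 3, 4), n -> n % 2 == 0);
        List<Integer> scaled = map(evens, n -> n * 10);
        System.out.println(scaled);
    }
}
```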

Type PCollection represents a stream of records of some type.

A Spark RDD is equivalent to a combination of Beam’s PTransform and PCollection types.

Extension Libraries

The Apache Beam project provides not only the “core sdk”, but also a handful of “extension libraries”. The most significant of these extension libraries is one that provides the standard “join” operators (inner-join, outer-join, etc) - which are mostly implemented on top of the core CoGroup transform.
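The relationship between cogrouping and joins can be sketched in plain Java. The code below is an illustration under simplifying assumptions, not the extension library's implementation: records are {key, value} string pairs, everything fits in memory, and all names are hypothetical. Cogrouping collects the values from both inputs per key; an inner join is then the cross product of the two value lists for each key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names come from Beam.
class CoGroupSketch {

    // All values from both inputs that share one key.
    static class Grouped {
        final List<String> left = new ArrayList<>();
        final List<String> right = new ArrayList<>();
    }

    // Each input record is a {key, value} pair.
    static Map<String, Grouped> coGroup(List<String[]> left, List<String[]> right) {
        Map<String, Grouped> result = new TreeMap<>();
        for (String[] kv : left) {
            result.computeIfAbsent(kv[0], k -> new Grouped()).left.add(kv[1]);
        }
        for (String[] kv : right) {
            result.computeIfAbsent(kv[0], k -> new Grouped()).right.add(kv[1]);
        }
        return result;
    }

    // Inner join: keys with values on only one side produce no rows.
    static List<String> innerJoin(Map<String, Grouped> grouped) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, Grouped> e : grouped.entrySet()) {
            for (String l : e.getValue().left) {
                for (String r : e.getValue().right) {
                    rows.add(e.getKey() + ":" + l + "," + r);
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String[]> users = Arrays.asList(new String[] {"u1", "Ann"}, new String[] {"u2", "Bob"});
        List<String[]> orders = Arrays.asList(
                new String[] {"u1", "book"}, new String[] {"u1", "pen"}, new String[] {"u3", "cup"});
        System.out.println(innerJoin(coGroup(users, orders)));
    }
}
```

An outer join would differ only in also emitting a row for keys where one of the two lists is empty.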

Side Inputs and Outputs

Operators like map/flatmap/filter are passed a stream of input records. Sometimes such functions need additional “reference data”; the side inputs feature allows this.

When writing a simple non-windowed batch processing application, a Spark broadcast variable and a Beam side input are effectively the same thing. However a side input can be windowed, ie when a transform-operation is processing input records from window X, then the side input object can provide “shared” data for window X - something that is difficult to implement in Spark.

Operators like map/flatmap/filter can also potentially generate multiple independent “categories” of output. One solution to this is to generate (key, value) pairs as output where the key is the category - but this isn’t always elegant (particularly when the value is itself a key/value pair). Beam’s side outputs mechanism effectively tags records output by an operator with a category. Records with different categories can then be directed down different paths in the pipeline.
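The tagging idea behind side outputs can be illustrated with a short plain-Java sketch. The names are hypothetical and strings stand in for Beam's typed tags; the point is only that one operator produces several named outputs, each of which can then feed a different downstream path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java illustration only; none of these names come from Beam.
class TaggedOutputSketch {

    // Sends each record to the output list that belongs to its tag.
    static Map<String, List<String>> route(List<String> records, Function<String, String> tagOf) {
        Map<String, List<String>> outputs = new TreeMap<>();
        for (String record : records) {
            outputs.computeIfAbsent(tagOf.apply(record), tag -> new ArrayList<>()).add(record);
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Records that are purely numeric go to "valid", everything else to "invalid".
        Map<String, List<String>> outputs =
                route(Arrays.asList("12", "abc", "7"), r -> r.matches("\\d+") ? "valid" : "invalid");
        System.out.println(outputs.get("valid"));
        System.out.println(outputs.get("invalid"));
    }
}
```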

Beam Execution Engines (aka “back end”)

Although a Beam application can be executed via a “runner”, and there are runners for Spark and Flink, this does not mean that Beam is simply a “portability wrapper”.

A Beam application which processes bounded data and which does not use windowing (technically, puts all records into a global window) is rather similar in structure to a Spark application. When the SparkRunner is used, it will in fact generate logic that is pretty much a 1:1 mapping to Spark functionality, and it would be possible to write the equivalent program by hand using the Spark API. However when a Beam application uses windows, the code generated by the SparkRunner is likely to be very complex - nothing that a developer would like to write themselves.

Tools like Hive and Pig work in a similar way; a developer specifies their logic in hive-sql or in the pig language, and then that is converted to a form that some other “engine” can execute - whether it is traditional MapReduce, Tez, or Spark. This does not mean that hive-sql or pig is simply a “portability wrapper” over those languages, but rather that they are “higher-level” tools which delegate execution of logic to external processing-engines by using a language that those processing-engines can handle. Beam likewise delegates execution of a “processing pipeline” which has been defined via the Beam API to any supported processing-engine.

One particularly useful “back end” is the DirectRunner, which executes the Beam application in the local VM. This is roughly equivalent to Spark’s “local[*]” mode. Running code with the DirectRunner makes it possible to launch it from an IDE, use breakpoints and stepping, and to see logged output in the local console. Debugging an app which is running remotely on the GCP Dataflow Engine is much harder (and I say that from experience!).

The DataflowRunner “back end” works together with the Google Dataflow service. Dataflow provides a number of nice features for Beam applications, including:

  • automatically instantiating Compute Engine instances on which to run the application
  • handling Compute Engine node failures transparently
  • forwarding log-messages to the Google Stackdriver log-monitoring service
  • providing authentication credentials for the Dataflow code, so that calls can be made to other Google services (such as BigQuery)
  • providing a basic Dataflow-job web admin interface, where the structure of an executed job can be seen as a visual graph-of-nodes, together with the state (success/fail) of each node and the execution time.

Beam SQL

A Beam application can express logic in the form of a sql-like string, rather than having to write Java (or Python or Scala) code. The Beam runtime uses the Apache Calcite library to parse and optimise the expressed logic (eg joins, filters) and generate the appropriate logic. This is very similar to using Spark-SQL.

Beam vs Spark

If you are writing purely “batch” programs (ie ones processing “bounded input”) and do not need to deal with the concept of time-ordering of data within the input stream, then Beam and Spark are roughly equivalent.

Beam is (at least in my opinion) more difficult to reason about at runtime than Spark. With Spark in “batch” or “micro-batch” mode, I have a good grasp of how the driver-node and worker-nodes interact, how dynamic scaling works, and related concepts that may have an impact on application performance. Beam is more “abstract”, particularly due to the fact that it has different “runners” that may be implemented very differently, and are not well documented. The fact that the DataflowRunner targets a proprietary execution engine makes this even more tricky. When all works well, this is not so important - but I suspect that if performance issues pop up then tracking down the cause would be harder with Beam than Spark.

If you need to deal with an incoming event-stream where the events have timestamps and where analysis of the data is affected by these timestamps, then according to the Beam paper, Beam can deal correctly and efficiently with this data where the equivalent Spark application would be extremely complex. See the Beam article in the References section below for a full description. As noted above, the Beam article was written a while ago, and Spark has since gained support for “structured streaming” and “continuous processing”. Regardless, Beam does deal elegantly with windowed processing of time-sensitive records.

And if you are using the Google platform to execute code, then Beam + Dataflow is easier to manage/deploy, and cheaper to run, than using Spark + Google Dataproc (Google’s hadoop-as-a-service offering) or than running a hadoop cluster on a set of Google Compute Engine instances.

If you know Spark, you will find Beam quite similar in many ways. The primary differences are:

  • there is no “driver” node
  • there are no “action” operators; everything is a “transform”
  • there are no “broadcast variables”; these are replaced by “side inputs” instead

Maturity and External Tools

Beam is a much younger project than Spark, and this sometimes shows. Bugs and missing features are (in my experience) still quite common (as of early 2018). Tutorials are also easier to find for Spark, documentation for Spark is better, and the Spark community is significantly larger.

I am not sure how performant/efficient Beam is with respect to Spark, but would not be surprised if Spark is currently faster. However when running Beam applications on Google Dataflow, CPU and RAM are effectively unlimited, which makes comparisons difficult. It would be interesting to run the same application written in Beam and Spark on the same Spark cluster (ie use the Beam SparkRunner) and compare execution times. That assumes, of course, that the application does not need to use Beam features like windowing.

Dataflow Scaling and Pricing

This section addresses the issues of scaling and pricing when executing a Beam application on the Google Dataflow Engine.

The Beam framework core provides a way of expressing an algorithm such that it can be executed in parallel. Exactly how that parallel processing occurs is a combination of:

  • the Runner implementation selected and linked into the Beam application that is deployed;
  • the parameters that are passed to that Runner implementation (whether explicitly by code, or via command-line-arg passthrough to an Options class)
  • services provided by the environment, eg the Dataflow “coordinator” service

The DataflowRunner class is the Runner implementation used when executing a Beam app on the Google Dataflow platform. The dataflow-specific Beam library includes several Options classes, eg DataflowPipelineWorkerPoolOptions, which can be configured via code or via commandline arguments. As usual, options which are defined as plain types (rather than ValueProvider wrappers) are “graph-creation-time-only” parameters, which affect the shape of the generated execution graph or constants which are “captured” when that graph is built. Options which are defined using ValueProvider can be applied at runtime instead (eg provided when a pipeline template is executed).

Dataflow supports three primary configuration properties:

  • autoscaling mode
  • numWorkers
  • maxNumWorkers

The autoscaling mode may be:

  • “none”, in which case a fixed number of workers (processes) is started and all tasks included in the execution-graph are distributed across this pool of workers
  • “throughput-based”, in which case an initial number of workers (processes) is started and then the nodes report their “progress percentage” periodically to the dataflow service, which decides whether to start additional workers (or reduce the number).

The throughput-based autoscaling mode applies to both bounded (batch) and unbounded (streaming) input, but in somewhat different ways.

When the datasource is bounded and numWorkers is not specified then Dataflow tries to estimate the size of the input by calling datasource.getEstimatedSizeBytes() at execution graph creation time and applying some heuristic to determine a suitable numWorkers value. This works for cases where the graph is created and then immediately executed; the input details (eg filename) are available. Obviously, this doesn’t work well when deploying a template; it seems that in this case numWorkers is effectively zero. In either case, numWorkers instances are started initially and then, while the app is running, method reader.getFractionConsumed is polled regularly to check how much of the data has been read; when this is progressing too slowly, more worker instances are started.

When the datasource is unbounded then numWorkers workers are initially started (with a minimum of maxNumWorkers/15). The Dataflow engine periodically applies a heuristic to adjust the number of workers (Compute Engine instances) between maxNumWorkers/15 and maxNumWorkers, with a minimum of one.
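The bounds described for unbounded input amount to a simple clamp. The following plain-Java sketch only restates the rule given in the text; it is not Dataflow code, the names are hypothetical, and the use of integer division (rounding down) is an assumption since the rounding behaviour is not documented here.

```java
// Plain-Java illustration only; it restates the bounds described above.
class WorkerBoundsSketch {

    // Lower bound: maxNumWorkers/15, but never less than one worker.
    // Integer division (rounding down) is an assumption.
    static int minWorkers(int maxNumWorkers) {
        return Math.max(1, maxNumWorkers / 15);
    }

    // Whatever the autoscaling heuristic asks for is limited to the allowed range.
    static int clamp(int desiredWorkers, int maxNumWorkers) {
        return Math.min(maxNumWorkers, Math.max(minWorkers(maxNumWorkers), desiredWorkers));
    }

    public static void main(String[] args) {
        int maxNumWorkers = 45;
        System.out.println("range: " + minWorkers(maxNumWorkers) + ".." + maxNumWorkers);
        System.out.println("requested 1 -> " + clamp(1, maxNumWorkers));
        System.out.println("requested 100 -> " + clamp(100, maxNumWorkers));
    }
}
```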

While the Google and Beam documentation is usually pretty good, it is (in my opinion) seriously lacking when it comes to describing the configuration options for scaling a Beam application on Dataflow, ie specifying the number of parallel processes (workers) to use. The current implementation also seems rather lacking and a “work in progress”.

After reading the docs and making some experiments, my tentative conclusions are:

  • the autoscaling mode can only be set when deploying a template; it is not possible to change the mode when executing a template.
  • num-workers (aka numWorkers) can only be set when deploying a template, and cannot be set when executing one.

The template-deploy-time nature of these settings is reinforced by looking at class DataflowPipelineWorkerPoolOptions; the corresponding getter/setter methods use plain types (String, etc) rather than ValueProvider wrappers, which generally implies that these values are bound at graph-creation-time (ie template deploy time).

Oddly, the web UI for executing a template includes option “machine type” - which is not documented via “gcloud dataflow jobs run --help”. And DataflowPipelineWorkerPoolOptions.getWorkerMachineType() returns a plain String, suggesting this is indeed a graph-creation-time-only option.

It is not clear whether “max-workers” (runtime) and “maxNumWorkers” (deploytime) are the same thing.

See Google: Executing Templates for the vague documentation that currently exists.

Pricing for executing a Beam/Dataflow job depends upon compute-engine and disk resources. A disk is always allocated for each worker (maxNumWorkers for unbounded streaming), but the number of compute-engine instances may vary during autoscaling, and instances are only charged for while actually in use.

File Compression

The standard Beam transforms which read files from disk support compressed files (gzip, bzip2, and deflate); when no compression algorithm is specified via the API then they try to deduce the algorithm to use from the filename suffix (Compression.AUTO).

Some compression formats are splittable, ie it is possible to seek to an offset within the file, scan to find some marker, and start decompressing data. This feature is necessary in order to be able to read an input file via multiple parallel processes (workers). Splittable algorithms include bzip2 and lz4 (which is not currently supported).

Unfortunately, some other compression formats are not splittable, ie data can only be decompressed by starting from the beginning of the file. This includes gzip and deflate. Such input files must be read by a single process (worker).

And unfortunately, although bzip2 (suffix “.bz2”) is theoretically splittable, Beam 2.4.0 does not yet implement this; bzip2 files are read in a single worker just like gzip files.

In summary: if you feed a Beam/Dataflow application a large compressed file, then it will be processed, but it will be read by only a single worker.
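The suffix-based detection and its consequence for parallel reading can be summarised in a small plain-Java sketch. This is not Beam's Compression class; the names are hypothetical, and the “.gz” and “.deflate” suffixes are assumptions (only “.bz2” is named in the text). The parallel-read rule simply encodes the behaviour described above for Beam 2.4.0.

```java
import java.util.Locale;

// Plain-Java illustration only; none of these names come from Beam.
class CompressionSketch {

    enum Kind { GZIP, BZIP2, DEFLATE, UNCOMPRESSED }

    // Guess the compression from the filename suffix.
    // The ".gz" and ".deflate" suffixes are assumptions made for this sketch.
    static Kind detect(String filename) {
        String lower = filename.toLowerCase(Locale.ROOT);
        if (lower.endsWith(".gz")) {
            return Kind.GZIP;
        }
        if (lower.endsWith(".bz2")) {
            return Kind.BZIP2;
        }
        if (lower.endsWith(".deflate")) {
            return Kind.DEFLATE;
        }
        return Kind.UNCOMPRESSED;
    }

    // As described in the text for Beam 2.4.0: every compressed file is read by
    // one worker, even bzip2, which is splittable in principle.
    static boolean readableInParallel(Kind kind) {
        return kind == Kind.UNCOMPRESSED;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"events.csv", "events.csv.gz", "events.csv.bz2"}) {
            Kind kind = detect(name);
            System.out.println(name + " -> " + kind + ", parallel read: " + readableInParallel(kind));
        }
    }
}
```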

Google Cloud Storage supports a metadata property “content-type” for each file; setting this to “gzip” is supposed to indicate to reading applications that the content is compressed with the gzip algorithm, regardless of the filename suffix. Unfortunately, the file-reader transforms in Beam 2.4.0 do not yet support this; ensure that compressed files have the correct filename suffix.

References