Spark RDD Random Notes

Categories: BigData

As a quick extension to the discussion on RDDs from the Spark Overview article, here are some notes on a few of the operations that can be performed on an RDD.

These notes are pretty rough - don’t rely on anything you read here :-)

What is an RDD

As described in the Spark Overview article, an RDD is a data structure holding metadata about a dataset.

An RDD may be an “initial RDD” in which case it holds metadata about some external source of data; these are created via methods on a SparkContext object.

Otherwise an RDD is a “child RDD”, holding a reference to a parent RDD; the parent defines the input to the child RDD, and an operation to be applied to that input dataset to generate an output dataset. An RDD can also have multiple parent RDDs, eg an RDD performing a “join” of two input datasets.

A Spark driver application uses methods on types SparkContext and RDD to build a graph of RDDs; invoking a “transformation” method on an RDD generates a child RDD. Invoking an “action” method on an RDD instead causes the RDD on which it is invoked to submit itself to the spark framework for execution. The spark framework transforms the RDD and its ancestors into a graph of tasks, serializes these tasks, and sends them over network connections to spark executor processes for execution.
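As a rough sketch of that pattern (assuming sc references a SparkContext, and using a placeholder HDFS path), the first three lines below merely build the graph; nothing is executed until the count action is invoked:

  // transformations: each call just adds another node to the RDD graph
  val lines = sc.textFile("hdfs://path/to/file.csv")
  val fields = lines.flatMap(line => line.split(","))
  val nonEmpty = fields.filter(field => field.nonEmpty)

  // action: triggers planning, task generation, and execution on the executors
  println(nonEmpty.count())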

Creating an Initial RDD

An RDD which does not have a parent RDD must instead reference some external source of data, such as an in-memory list, a local file, a file in a distributed filesystem such as HDFS, a Kafka topic, or similar.

An Initial RDD for an HDFS File

The following example defines an initial RDD whose source is a file in HDFS:

  // assume variable sc references a SparkContext object..
  val lines = sc.textFile("hdfs://path/to/file.csv")

This method (and the other base methods on SparkContext which create an RDD) starts construction of a new “work graph” within the SparkContext; in effect these methods are “builder methods” for constructing a graph, and the returned RDD object is itself a sub-builder which offers methods for attaching yet more nodes to the graph. This initial node in the graph is a “create rdd” node wrapping the provided URL. When the graph is eventually executed, this node will make a metadata request to the HDFS namenode for the specified path, obtaining information including:

  • the file size, in blocks
  • for each file block, the list of nodes on which the block is stored (replication)

and then execute all child nodes of that RDD node.

Every RDD in the graph of RDDs has explicit or implicit partitioning information, which is used to determine the number of tasks that are spawned, and whether the RDD node can be merged with its parent RDD (only possible if they use the same partitioning of the data).
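As a small illustration (a sketch, assuming sc is a SparkContext and the path is a placeholder), the partitioning of an RDD can be inspected from the driver:

  val lines = sc.textFile("hdfs://path/to/file.csv")
  println(lines.getNumPartitions)   // for an HDFS file, roughly one partition per block
  println(lines.partitioner)        // None - a plain textFile RDD has no key-based partitioner

  val counts = lines.map(line => (line, 1)).reduceByKey(_ + _)
  println(counts.getNumPartitions)  // a shuffle boundary, so the partitioning may differ
  println(counts.partitioner)       // Some(HashPartitioner) after the reduceByKey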

While it would be theoretically possible to decide from the initial RDD’s partitioning how many Spark Executors and VCores to allocate, this is not currently done: the executor/core values are instead specified as job configuration parameters. Spark dynamic mode (see later) can increase or reduce the number of executors/vcores but that is done based upon the number of tasks waiting to be executed, rather than the partitioning of the input data source. Allocating resources based on the initial RDD partitioning would only be of use in shallow processing pipelines, which is probably why this is not done.
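For reference, here is a sketch of how those executor/vcore values and dynamic allocation are supplied as job configuration; the property names are standard Spark settings, but the numbers are arbitrary examples rather than recommendations:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("rdd-notes-example")
    .set("spark.executor.instances", "4")               // fixed number of executors...
    .set("spark.executor.cores", "2")
    .set("spark.dynamicAllocation.enabled", "true")     // ...or let spark scale executors
    .set("spark.dynamicAllocation.maxExecutors", "16")  // based on the number of pending tasks
  val sc = new SparkContext(conf)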

An Initial RDD for an in-memory List

SparkContext.parallelize(s: Seq) takes an arbitrary sequence object and returns an RDD for it. In this case, the RDD is not “metadata referencing the input data” but truly contains the data to be processed. The partitioning factor for such an RDD is arbitrary, and can be specified in the parallelize method. The RDD presumably holds an array of partition-descriptors (like it does with HDFS where each element describes a block), but with the parallelize method each element references a subrange of the in-memory sequence.

When an action method is eventually invoked on some descendant of such an initial RDD, the generated tasks sent to the spark executor instances include a copy of the relevant subrange of the original sequence.
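A minimal sketch (assuming sc is a SparkContext):

  val data = Seq(1, 2, 3, 4, 5, 6, 7, 8)

  // the partitioning factor is arbitrary; here the sequence is split into 4 subranges
  val rdd = sc.parallelize(data, numSlices = 4)
  println(rdd.getNumPartitions) // 4

  // each task sent to an executor carries a copy of its subrange of the sequence
  println(rdd.map(n => n * 2).reduce(_ + _))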

RDD Partitioning

Each RDD has an explicit or implicit partition-descriptor. For an “initial RDD” representing a file in HDFS, the partitioning is simply the set of HDFS blocks. For an initial RDD representing a Kafka topic, it is the partitions of the input topic, etc. When a child RDD is generated from a parent RDD using a “map” transformation, the partitioning of the child RDD is identical to that of the parent RDD - the data is simply processed “in place” without any need to perform a shuffle. The same is true of filtering. However a reduce operation needs to gather data from multiple upstream tasks and rearranges the data (a shuffle); an RDD created from a parent RDD by performing a reduce therefore has a different partitioning descriptor.

When the Spark framework executes an “action” RDD, it looks up the chain of parent RDDs and merges adjacent nodes which have the same partitioning, ie can generate one single task for multiple adjacent RDDs.

This analysis/merging results in a “logical plan”. For each logical task, a separate instance is then queued for execution for each data partition. As an example:

  • an RDD references a file in HDFS which has 12 blocks
  • a child transformation RDD references the parent
  • another child transformation RDD references the above RDD
  • and then an action-method is invoked on the above RDD, ie spark must evaluate the graph

None of the ancestors of the RDD on which the action was invoked have forced the data to be repartitioned (shuffled), so they can all be merged into one logical task. Twelve instances of that task are then scheduled, one for each HDFS block of the ancestor RDD. Each task instance reads/processes one block of the input HDFS file and then terminates. When all 12 task instances have completed, the spark job is complete.
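A sketch of that scenario (the path is a placeholder; the actual partition count depends on the file’s block count):

  val lines = sc.textFile("hdfs://path/to/big-file.csv") // say, 12 HDFS blocks -> 12 partitions
  val fields = lines.map(line => line.split(","))        // same partitioning as its parent
  val firstCols = fields.map(cols => cols(0))            // still the same partitioning
  println(firstCols.toDebugString) // the lineage shows a single stage: the RDDs merge into one task
  println(firstCols.count())       // action: 12 task instances are scheduled, one per block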

RDDs and Closures

When a “map” node which has an “hdfs file rdd” as a parent is executed, spark creates a “task” object per file-block in the parent (ie per input data partition) and then:

  • serializes the closure-param passed to the map method
  • for each task: chooses a suitable worker node and sends the serialized closure over the network to that node, where it is executed.

The location where each task ran is tracked, and used for scheduling child nodes of the map node (eg reduce nodes).

The “resilient” part of the RDD is that when a map task fails, the driver program which is executing the graph is notified, and it simply resubmits the serialized closure to another worker process. In the case of a map task which is reading from HDFS, the source data is still present. In the case where the task is a “downstream” one consuming the output of an earlier task then:

  • if the worker node is still available (ie the task has failed but not its host) then the task can be resubmitted to the same worker - presumably the input data is still there in memory or local disk
  • if the worker node has crashed (or fails repeatedly) then processing of the graph needs to be restarted from an earlier stage. As long as the transformations (closures) being applied do not have side effects this will be completely transparent.

RDD Methods

Transformations and Actions

RDD methods are either “transformations” or “actions”. Transformations do not trigger execution of the current RDD graph; actions do. In general, anything that returns an RDD object is a transformation; anything else is an action. As an example, RDD.foreach returns Unit (aka void), so it must be an action!

Transformations

Map

Invoking RDD.map on a parent RDD creates a child RDD object which references its parent and references a closure which is the mapping-logic to apply.

Note that the mapping logic is not yet executed - it is just serialized to a byte-array and the new RDD holds a reference to it. When eventually some task that includes that map step is executed, the closure is sent along with the serialized task object to some Spark Executor instance where Spark deserializes the closure and then passes each record in the task’s allocated partition of the input dataset to that deserialized closure. The records returned by the closure are cached in a memory buffer, and spilled to disk if there is insufficient memory available.
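For example (a sketch, assuming lines is an existing RDD[String]):

  // builds a child RDD; the closure is only serialized here, not executed
  val lengths = lines.map(line => line.length)

  // only when an action runs is the closure shipped to executors and applied to each record
  println(lengths.reduce(_ + _))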

Filter

Like “map”, invoking RDD.filter creates a child RDD object which references its parent and references a closure which is the filtering-logic to apply.

keyBy

RDD.keyBy assigns a key to each record, producing an RDD of (key,value) pairs. Operations which then shuffle by key can also be configured with the number of “buckets” to allocate their output to. Each (key,value) output is associated with bucket (hash(key)%nbuckets).
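A sketch of that pattern (the data here is hypothetical): keyBy assigns the key, and the downstream shuffle operation chooses the number of buckets:

  val words = sc.parallelize(Seq("spark", "scala", "hadoop", "hdfs", "kafka"))
  val keyed = words.keyBy(word => word.head)   // RDD[(Char, String)]

  // 4 output buckets; each (key,value) pair lands in bucket hash(key)%4 (a HashPartitioner)
  val grouped = keyed.groupByKey(numPartitions = 4)
  println(grouped.getNumPartitions) // 4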

Cache

RDD.cache simply associates a label with the cached output of the parent RDD. This does not alter the partitioning of data - when data processed by the parent RDD is split across N servers, then a following cache operation simply preserves the data already on those N servers without moving it anywhere. A “cache” RDD can therefore be merged with its parent node in exactly the same way as map RDDs can be.

An RDD whose parent is a cache RDD runs with the same partitioning as its parent, and on the same servers.

Normally, when RDDs can be merged, then data is simply streamed from one to another in a “pipeline” fashion, and the logic for both RDDs is performed in a single pass. Inserting a cache node instead forces the intermediate result to be written to in-memory or disk storage; RDDs which have the cache RDD as their parent then iterate over those cached values in a separate pass.
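For example (a sketch, with a placeholder path and a hypothetical parse step), the cached output is computed once and then reused by later actions:

  val parsed = sc.textFile("hdfs://path/to/file.csv")
    .map(line => line.split(","))
    .cache() // mark the output of the map for reuse, in place, without repartitioning

  println(parsed.count())               // first action: computes the pipeline, populates the cache
  println(parsed.first().mkString("|")) // second action: reads the already-cached partitions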

Reduce

The reduce family of methods can be divided into those which reduce by a key (most of them) and the few which do not. Those which do not reduce by key do not need to move data between servers, and do not change the current partitioning of data - the reduce is just run on whichever servers currently hold sections of the input dataset. However in general a reduce is applied to “all records with a specific key regardless of which server they are currently on” - ie data must be moved around.

Such “shuffling” operations can be run on an arbitrary number of servers in parallel; there is no need to use the same number of tasks as the parent RDD used. Such RDD methods therefore take an optional “npartitions” parameter; when not specified then a default value is provided by the spark framework. The parent RDD is then modified to ensure it buckets its output to match the npartitions required by the downstream RDD.
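For example (a minimal sketch, assuming sc is a SparkContext):

  val nums = sc.parallelize(1 to 100, numSlices = 8)
  println(nums.reduce(_ + _)) // no key: per-partition partial results, merged in the driver

  val pairs = nums.map(n => (n % 3, n))
  val sums = pairs.reduceByKey(_ + _, 4) // reduce by key: a shuffle into 4 output partitions
  println(sums.getNumPartitions)         // 4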

Distinct

Method RDD.distinct has two parts. First it executes as a mapper-like function on each partition of the input dataset in its current location, finding the distinct keys within that local partition, and writing the results into N buckets. The result is a set of values on each server which are locally distinct but not globally distinct. Spark then starts N tasks on arbitrary spark executors which contact every spark executor used in the previous phase, and fetch the keys for one specific bucket; each instance can then filter out duplicates resulting in a globally distinct set of keys - though that result dataset of unique keys is now distributed over N servers.

The first pass could be skipped, instead simply grouping all records by key, bucketing, doing the shuffle, then computing the distinct keys. However making a first “local” pass before performing a shuffle reduces the amount of network traffic immensely.
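The same two-phase shape can be written out by hand as a keyed reduce, which gets the map-side “local” pass for free before the shuffle (a sketch, close in spirit to what distinct does internally, though not necessarily identical to Spark’s implementation):

  val values = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), numSlices = 3)

  // phase 1 (map side): duplicates within each local partition are combined before the shuffle
  // phase 2 (reduce side): N tasks each fetch one bucket and combine the remaining duplicates
  val distinctValues = values
    .map(v => (v, null))
    .reduceByKey((a, _) => a, 2) // 2 output buckets; only one record per key survives
    .map(_._1)

  println(distinctValues.collect().toList)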

Many of spark’s transformation methods internally generate such multi-phase processing.

Actions

foreach

RDD.foreach(..) is probably the simplest of all actions. The RDD simply holds a serialized version of the closure to be applied. At runtime, a task is sent to each executor holding a partition of the input (parent) dataset, and the executor applies the closure to each record in that dataset in turn (in place, on that server).
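For example (a sketch; note that any println output appears in the executor logs, not in the driver’s console):

  val records = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)

  // the closure runs on the executors, against each record of their local partitions
  records.foreach(record => println(record))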

Count Action

A simple count action can be performed on each existing data partition in parallel. These partial results are then returned to the driver as each task completes, and the driver sums them itself to produce an overall count.

A count-by-key instead requires that the data be bucketed on the servers it is currently on, then N tasks be started which each fetch the records for one bucket from every server on which the preceding task ran. Each of these count tasks then has all the records it needs locally, and can return a map of (key, count) to the driver (a map is needed because a bucket can contain many distinct keys).
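For example (a sketch, with a placeholder path and an arbitrary choice of key):

  val lines = sc.textFile("hdfs://path/to/file.csv")
  println(lines.count())            // per-partition counts are summed in the driver

  val byFirstChar = lines.keyBy(line => line.take(1))
  println(byFirstChar.countByKey()) // a Map[String, Long], one entry per distinct key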

When using the interactive shell, invoking an action such as reduce, count or collect immediately triggers execution of the graph. Defining the original rdd, adding map-like operations, or marking an RDD as cached does not trigger execution; those nodes are evaluated only when a later action needs them.

Collect

Method RDD.collect returns an array containing all records of the RDD; each task returns the output for its partition, and the driver concatenates these into a single array. The return-type is not an RDD, so it must be an action.

As the “collect” operation does not need to repartition (shuffle) its input, the necessary logic can be merged into the same task as its parent RDD. As each task instance executes, the collect component simply captures all data flowing through it and pipes this back to the driver application. When data is partitioned N ways, the driver will receive N responses (one for each task), each containing the results generated by that task instance running against its specific partition of the input dataset.

Tasks as Pipelines

It has already been mentioned above that Spark code which looks like it is making multiple passes over the data is actually merged into a single task - and a single pass. Here's an example:
  val initialRdd = sparkContext.textFile("hdfs://path/to/file.csv") // an RDD of lines
  val splitRdd = initialRdd.flatMap(line => line.split(","))        // one record per field in each line
  val filteredRdd = splitRdd.filter(field => field.length > 5)      // fields longer than 5 chars
  val uppercased = filteredRdd.map(field => field.toUpperCase)
  val results = uppercased.collect()                                // action

Here’s a more verbose description of what actually happens at runtime, within a Spark Executor. The code is pretty ugly - but the point is to show that the processing above is done in just one pass:

  import java.io.InputStream

  trait SimpleIterator {
    def next(): String
  }

  class MyInitialRdd(source: InputStream, offsetFrom: Long, offsetTo: Long) extends SimpleIterator {
    def next(): String = {
      // return the next line from the input-stream over the subset of the file allocated to this
      // particular task instance (the "partition"), or null when that range is exhausted
      ???
    }
  }

  class MySplitter(src: SimpleIterator) extends SimpleIterator {
    var buffer = Array[String]()
    var index = 0

    // returns N values for each single value from upstream
    def next(): String = {
      if (index < buffer.length) {
        val field = buffer(index)
        index += 1
        return field
      }

      val line = src.next()
      if (line == null) {
        null
      } else {
        buffer = line.split(",")
        index = 1
        buffer(0)
      }
    }
  }

  class MyFilter(src: SimpleIterator) extends SimpleIterator {
    // pulls values from upstream src until one matches the predicate, or end-of-data is reached
    def next(): String = {
      while (true) {
        val field = src.next()
        if (field == null) {
          return null
        } else if (field.length > 5) {
          return field
        }
        // else ignore the value and fetch the next one
      }
      null // unreachable, but keeps the compiler happy
    }
  }

  class MyUpperCaser(src: SimpleIterator) extends SimpleIterator {
    // fairly obvious: uppercase each value from upstream, passing the end-of-data null through
    def next(): String = {
      val field = src.next()
      if (field == null) null else field.toUpperCase
    }
  }

  // src, offsetFrom and offsetTo describe the partition allocated to this task instance,
  // and sendToDriver stands in for the framework code which returns results to the driver
  val pipeline = new MyUpperCaser(new MyFilter(new MySplitter(new MyInitialRdd(src, offsetFrom, offsetTo))))
  var result = pipeline.next()
  while (result != null) {
    sendToDriver(result) // collect
    result = pipeline.next()
  }

Each call to pipeline.next() pulls data from the MyUpperCaser instance, which pulls data from its upstream source, and so on. The result is that data flows through all five processing stages in only one pass over the data, with no need to store intermediate results anywhere (except in memory in the case of the flatMap operation).

Any developer familiar with Haskell’s lazy evaluation - or with Scala’s lazy views, or even Python’s generators - will find this very obvious.

To come back to the original Spark code above, Spark’s execution framework effectively transforms a sequence of operations on an RDD into a “pipeline” like the above. Such a pipeline obviously cannot extend across a “shuffle” boundary, but a sequence of RDDs, each a transform of its parent that does not alter the data partitioning, can be elegantly expressed in source-code as separate operations while forming a single efficient processing pipeline at runtime.

Notes on Efficiency

For map phases where the input partitions are highly correlated with the output partitions, in-memory caching is very effective - the server which processed an input block X generates most/all of the members of output key Y, so running the reduce phase on key Y can be efficiently performed on the same server. When input partitions are not well correlated with the output partitions, then the reduce phase for key Y must fetch data over the network from just about every node on which the map phase ran, and so the in-memory caching saves nothing.