Categories: BigData
Introduction
Spark is a framework for writing distributed programs which process large amounts of data. Programs written with Spark can be executed on a cluster management system such as Yarn (part of the Hadoop project) or Mesos.
This article is a set of notes I prepared before making a presentation on Spark-on-Yarn to work colleagues. The notes are fairly rough; the content below has not been “polished” into the kind of article that could be printed in a magazine. However I hope the content is useful to you.
The audience for this presentation was a mixture of data engineers (software developers), data scientists, and operations staff. The presentation therefore goes into quite a lot of detail about what happens when a Spark application gets started in a Yarn cluster, in particular what roles different servers or services play, how data flows between different servers, how an application scales horizontally, and what kinds of things set an upper bound on the scalability. This presentation also discusses the basics of the Spark programming model (RDDs, SparkQL, etc) but does not go into detail about how to actually write a Spark program (ie what the source-code looks like).
As a presentation, the information below was of course mixed with examples, discussions, and diagrams that are not included here.
Presentation Overview
This presentation consists roughly of:
- a 5-minute introduction
- a 20-minute overview of HDFS
- a 20-minute overview of Yarn
- a 20-minute overview of MapReduce
- a 20-minute comparison of Spark with MapReduce at process/task level
- a 10-minute overview of spark-streaming
- a 20-minute look at Spark application code (SparkContext, RDD, etc)
In total, that’s about 2.5 hours - not including time for breaks, questions, etc.
This is a quick overview of the most important parts and runtime-behaviour of Spark. There are many resources on the internet for further research on specific topics - just Google.
The Goals/Features of Spark
Using the API provided by the Spark framework, a Spark program defines a sequence of operations to be performed on input data. The Spark framework analyses the defined sequence and generates an equivalent sequence of tasks. When a task can by its nature be applied to different subsets (partitions) of the data concurrently, then Spark arranges for the task logic to be sent to different servers within the cluster and executed in parallel. When a task requires as input the output of a previous task but grouped (partitioned) in a different way then Spark ensures that the data is moved between servers in the cluster in the most efficient way possible (though that doesn’t mean it is always trivial). And because transient problems may occur on some servers in the cluster, Spark monitors the progress of tasks and if a task fails then it restarts the same task (ie its logic) at a different place in the cluster.
On top of this core functionality, additional components of the overall Spark family add more complex features, such as the ability to express logic in an SQL-like language rather than via the programming API, and higher-level APIs for statistics, machine learning, graph processing, and other kinds of large-scale analysis of data.
The Spark core needs some way of managing a set of servers (whether virtual or physical), each running a suitable operating system (usually Linux). Spark includes a fairly simple implementation of the necessary software (“Spark Standalone”) but can also integrate with the Yarn and Mesos cluster management systems. This presentation concentrates mostly on Spark-on-Yarn. Similarly, Spark needs some source of data - and large amounts of it, to make using Spark worthwhile. Spark supports a number of possible sources, but this presentation concentrates mostly on HDFS as the source of data. Spark with Yarn and HDFS is the most common kind of deployment - and if you understand these, then understanding the other options is not a big step.
HDFS
HDFS is a distributed filesystem, ie somewhat like the filesystem on a local laptop but bigger.
In the non-big-data world, databases are usually the source of data for analysis - so why are we talking about filesystems?
The Why of HDFS
(a)
When an analysis consists of a “full table scan”, ie “for each record…”, then databases are mostly irrelevant. DBs are very good at organising data so that specific records can be found quickly, but that ability is not needed here. They are also good at managing updates to data - but again, that is often not relevant for “big data analysis” tasks.
In many cases, a file or set of files consisting of a sequence of records is all that is needed, without the complexity and overhead of a database.
(b)
A database such as Oracle internally has two parts: a data-storage format and a query-execution engine.
Big data tools often split these two parts into separate products. The ORC and Parquet libraries provide APIs for writing streams of records into files with the kind of associated metadata that a database would add. Hive and SparkQL are query-execution engines.
Actually, Hive is a query-compilation tool which takes an SQL-like statement as input and generates an application which can be executed on a cluster of servers. The application reads from some datasource (eg ORC/Parquet files in HDFS), processes the data, and returns a result matching the original query. SparkQL is similar: it also compiles queries into applications that are run in a cluster. Some other tools work similarly (though things like HBase or Cassandra do NOT work this way).
There is one additional component that goes along with Hive/SparkQL/etc - a way to know which files in HDFS belong together as a “table”, and a description of the data-format within those files (ie the “table schema”). ORC and Parquet files actually embed the schema of the data they store, but a query-compiler tool needs to know that schema before reading the files, so it needs to be stored externally. An HCatalog server provides a registry of (table, schema, file*) for this purpose.
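To make this concrete, here is a small sketch (my own illustration; the paths and table names are invented) showing the two ways a Spark application typically gets at such data - directly from the files, where the schema comes from the Parquet footers, or via the metastore, which maps a table name to the files plus their schema:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("schema-demo")
      .enableHiveSupport()       // allow table lookups in the Hive/HCatalog metastore
      .getOrCreate()

    // Route 1: read the files directly; the schema is taken from the Parquet file footers.
    val direct = spark.read.parquet("hdfs:///data/events/2017/")

    // Route 2: read via the metastore, which maps "db.events" to the same files + schema.
    val viaCatalog = spark.table("db.events")

    direct.printSchema()
    viaCatalog.printSchema()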
The How of HDFS
Ok, so it is hopefully clear why file-storage is relevant for big data, whether MapReduce, Hive or Spark. How HDFS works is reasonably simple:
- multiple servers each with multiple disks
- a “data node” application running on each server
- a single “name node” server for coordination
On startup, each data node makes a network call to the name node to register its existence and which data it is currently holding.
When a file is saved, the writer first requests the name node to create an entry representing that file (a specific path); permission checks are done here. Then the writer divides the file up into blocks (eg 128MB per block), and writes each block to a data node. The data node saves the block as a single file in its local filesystem, then calls back to the name-node to say that a new block has been added to a specific file.
When replication is enabled for the file (and it usually is), the data node forwards each block on to another data node, which also writes it to disk and forwards it on again, until the file’s replication factor has been reached.
Theoretically, a writer could write each block in parallel, getting extreme write throughput - but as far as I know this has not been implemented, because the use-cases are not common.
There is no limit on file-size except the sum of storage over the entire cluster.
Existing files can be deleted or moved, and can be appended-to. But inserting data into the middle, or overwriting data in the middle is not allowed.
The “native files” used by a data-node to store individual blocks just have “block ids” as names. There is no way to look at a data-node’s native files and know which blocks belong to the same HDFS file, or what that file-name is.
A data-node uses a background thread to regularly recompute the checksum for each block; if it doesn’t match then the block is deleted (or marked as bad) and the name-node is notified; the name-node then tells another server with a good copy of the block to replicate it, so the replication count is maintained. If an entire disk or server goes bad, the same happens - but for every affected block. Note that when a disk has an expected lifetime of 365 days, then in a cluster of 365 disks one is likely to fail every day. In a cluster of 3650 disks… clearly, “restore from backup” is not a feasible strategy.
An app wanting to read a file contacts the “name node”, which does permission checks, then returns a list of (block->servers) entries. The reader then decides which blocks it wants and makes a request to any of the servers holding a copy of that block. Note that the replication factor means that the reader can choose from multiple sources of each block - whichever is “best” (nearest).
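As a rough illustration of that (block->servers) metadata, the following sketch uses the standard Hadoop FileSystem API to ask the name node where the blocks of a file are stored (the path is just an example):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Assumes fs.defaultFS in the local Hadoop config points at the HDFS name node.
    val fs = FileSystem.get(new Configuration())
    val status = fs.getFileStatus(new Path("/data/big-input.csv"))

    // The name node returns, for each block, the offset/length and the data nodes holding a copy.
    val blocks = fs.getFileBlockLocations(status, 0, status.getLen)
    blocks.foreach { b =>
      println(s"offset=${b.getOffset} length=${b.getLength} hosts=${b.getHosts.mkString(",")}")
    }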
The ability to read in parallel also means that for a file with 20 blocks, it is possible to start 10 processes on 10 different servers within a cluster, where each process reads two blocks from whichever servers are closest to them. This allows scalability in two ways:
- parallel reads from multiple servers address IO-bound behaviour on individual data nodes
- moving logic to the data can also address IO-bound behaviour at the network level, by reducing or entirely eliminating the network hops between the storage and the processing logic.
LIVE: draw some diagrams showing how HDFS works here
YARN
Yarn is completely separate from HDFS, and addresses a different problem:
- HDFS makes use of the sum of all storage-capacity and io-bandwidth within a cluster
- YARN makes use of the sum of all RAM-capacity and CPU-capacity within a cluster
How Yarn works is reasonably simple in theory:
- multiple servers each with plenty of RAM and multiple CPU cores
- a “node manager” application running on each server (“yarn worker node”)
- a single “resource manager” server for coordination
On startup, each yarn worker node makes a network call to the resource manager to register its existence and how much RAM/CPU it has. Each node also includes information such as its operating-system, CPU-type, and any arbitrary tags that the sysadmin has defined for that specific server (eg “has-internet-access” or “has-GPU”).
(Does this sound familiar? HDFS uses a similar design pattern..)
Submitting a Yarn Job
A client application which wants to run some logic sends a “job request” to the resource-manager via a custom network protocol; there are libraries for various languages. The job-request looks something like:
- name of job
- name of submitter
- name of “work queue”
- how much RAM is needed
- other constraints on the “container” the job should be run in, eg
- operating-system-type (eg “debian”)
- CPU type (eg “x86-64”)
- a list of tags which the host server must have
- a list of servers that the host should be “near”
- list of resource-files needed, in one of the following representations:
- http-url
- hdfs-url
- embedded binary blob
- string specifying the command to be executed (usually references one of the above resources)
The Yarn resource-manager performs permission-checks then puts this request on the specified work-queue.
The resource-manager periodically checks its work-queues for waiting jobs, and then looks through its list of available processing-nodes to see if there is one which matches the job constraints and which has free resources to execute the job. When one is found, the resource-manager “reserves” the resources (a “container”) and then the job is sent to that processing-node for execution in that “container”.
The constraint related to being “near” specific servers is very useful when scheduling a job which reads from specific blocks of an HDFS file; by hinting to Yarn which hosts are best, the application’s IO performance when reading that data later will be improved. Note however that there is no direct integration between Yarn and HDFS; the “location hint” is a generic feature that any application can use when submitting a job to Yarn.
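To give a feel for what such a job request looks like in code, here is a heavily simplified sketch using the YarnClient API (resource files, tags and locality constraints are omitted, and all names/values are placeholders):

    import java.util.Collections
    import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.util.Records

    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // Ask the resource-manager for a new application id, then fill in the job request.
    val app = yarnClient.createApplication()
    val appContext = app.getApplicationSubmissionContext
    appContext.setApplicationName("demo-job")              // name of job
    appContext.setQueue("default")                         // name of work queue
    appContext.setResource(Resource.newInstance(1024, 1))  // RAM (MB) and vcores needed

    // The container launch context holds the command to execute (and, in a real job,
    // the list of resource files to download into the working directory).
    val container = Records.newRecord(classOf[ContainerLaunchContext])
    container.setCommands(Collections.singletonList("java -jar ./the_app_to_run.jar"))
    appContext.setAMContainerSpec(container)

    yarnClient.submitApplication(appContext)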
How a Yarn Worker Executes a Job
When a Yarn worker node receives a job-request from the resource-manager, it:
- reduces its advertised set of “free resources”
- creates a working-directory for the job
- fetches all resources listed in the job-request and saves them in the working-directory
- changes the “current directory” to the working-directory and executes the command from the job-request (with some special env-vars set).
Normally, the command looks something like java -jar ./the_app_to_run.jar ... where the executed jarfile was listed in the job resources and therefore has been written into the local working directory. The example command does assume that a suitable JVM is pre-installed on each processing-node.
Alternatively, the command could be something like python ./the_app_to_run.py ...; again, this would assume that a suitable Python environment is pre-installed.
When the executed command terminates, the yarn worker node deletes the working-directory, removing all job-specific resources and any local files the job may have created (note: there are special cases for handling “mapreduce shuffle” and logfiles).
A set of “reserved resources” in which a process can be started via the above steps is called a “container”. As you can see, it does not necessarily mean anything like a linux “namespace” or a docker container. However there are obvious security implications to the above design, including the fact that the command can be anything (including “rm -rf /”) and that it can peek into “../*” to see the temporary working directories associated with other jobs. Yarn does have some alternative approaches for launching the specified command, including using a user-account other than the one running the node-manager process, and (in prototype) using linux file and pid namespaces. There is also currently support for launching processes via Docker, but this is intended mainly to provide a standardised os-environment for each job rather than for security.
The started application is required to connect back to the yarn resource-manager and send heartbeat/status messages regularly. These status values can be seen in the resource-manager web interface. The address for the resource-manager can be found in special environment-variables. The job can run for an unlimited amount of time, as long as it regularly sends heartbeats. If the original job-request specifies that this is a “supervised” application, and it crashes (stops sending heartbeats) then the resource-manager will restart it on another processing-node.
This connection to the resource-manager can also be used by a job to request additional containers, ie to submit more “job requests”. An application which does this is referred to as an “Application Master” or a “Driver”; MapReduce, Spark, etc use this approach - launch a single process which runs “supervising/coordinating” logic and which allocates additional containers to do the actual work of reading and processing data. These “subjobs” are usually called “tasks”.
MapReduce
MapReduce is the algorithm that really started the “big data boom”. However MapReduce is now considered obsolete - so why am I talking about it? Mostly because Spark, Tez, and similar systems are based on the same principles, just in a more complicated form. Understanding MapReduce first makes the more advanced tools easier to understand - like understanding a 1970s car engine before trying to understand a modern one with fuel injection, computer-controlled valves, etc. Or learning to fly a Cessna before moving on to fighter jets.
MapReduce is a design-pattern first documented in an academic paper by some Google researchers. They never released their implementation, but the Hadoop project implements a version based on the published research.
The Hadoop MapReduce implementation is effectively a library which helps applications to schedule lots of tasks in Yarn. When the data-source used as input to the processing is a file in HDFS, then the MapReduce library also provides helper methods to ensure the Yarn tasks scheduled in the cluster are placed “near” the HDFS data-nodes holding a copy of the input data, for improved IO performance.
Mappers
A “mapper” function processes each record in a dataset individually. It can:
- filter out unwanted records based on the content of the record itself
- select just a subset of the available fields in the record
- transform fields (eg change names to uppercase)
It can also assign a category (aka group) to each record. For MapReduce, this step is mandatory, ie a mapper function must assign a category to each output record; for Spark it is optional.
A mapper function can only look at each record individually; it cannot compare records or compute aggregations over records. Because of this constraint, mappers are very parallelisable; given a dataset of 1,000,000 records it is irrelevant whether one mapper is applied to all records, 10 mappers to each block of 100,000 records, or 100 mappers to each block of 10,000 records.
The result of applying a mapper function to a dataset is a sequence of records which are derived from the original records. A mapper function doing “transformation” type logic will generally produce as many output records as there are input records, while a mapper function doing “filter” type logic will generally produce fewer output records (possibly even zero).
Because a mapper only sees one record at a time, the surrounding framework feeding data to the mapper function can be very efficient with memory; there is no need to hold large numbers of records in memory. Of course for performance reasons, buffering is used but it is not required by the algorithm.
Reducers
A “reducer” function processes all records with the same category (group). At runtime, a reducer is given an “iterator” object whose next-method returns a series of objects all belonging to the same category/group. A reducer may compute sums, minimum, etc over all records in the group.
A reducer method often produces only one output record, containing the “summary” of its input group; because a reducer is applied to each group, the result of applying a reducer is therefore usually one record per group. However a reducer method can generate any number of records (eg 1 output for each input); for example a reducer method used to implement a join between datasets produces many output records.
Reducers have less natural parallelism than mappers; the maximum parallelism is the number of categories (groups) in the input data.
Because a reducer uses an iterator to fetch each record, there is no direct need to load all records in the category into memory. The entire dataset for a group must be held on a single server, and the reduce function run there, but the entire group does not need to be held in memory. However care needs to be taken when writing a reducer function; one which simply caches all objects in memory as it reads them will end up with all records in memory at once.
Drivers
A Hadoop MapReduce program consists of some Java code which:
- contains an implementation of a mapper-function
- contains an implementation of a reducer-function
- contains a main-method which:
- calls the MapReduce library API to create a “context”
- calls a method on the context to register a “data source” (eg an HDFS file)
- calls a method on the context to register the mapper-function
- calls a method on the context to register the reducer-function
- calls a method on the context to begin the processing
The resulting application is compiled into a jarfile. A script can then be used to create a “job request” which references this jarfile as a “resource”, and send this job-request to the Yarn resource-manager.
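For concreteness, here is a minimal word-count example with that structure, written in Scala against the Hadoop MapReduce API (a sketch only; names are arbitrary and error handling is omitted):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
    import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
    import scala.collection.JavaConverters._

    // Mapper: sees one record (line) at a time and assigns a category (the word) to each output.
    class TokenMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
      private val one = new IntWritable(1)
      private val word = new Text()
      override def map(key: LongWritable, value: Text,
                       context: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = {
        value.toString.split("\\s+").filter(_.nonEmpty).foreach { w =>
          word.set(w.toLowerCase)
          context.write(word, one)                    // output: (category, value)
        }
      }
    }

    // Reducer: receives an iterator over all values belonging to one category.
    class SumReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
      override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
                          context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
        val total = values.asScala.map(_.get).sum
        context.write(key, new IntWritable(total))    // one summary record per category
      }
    }

    // Driver: registers the data source, mapper and reducer, then starts the processing.
    object WordCountDriver {
      def main(args: Array[String]): Unit = {
        val job = Job.getInstance(new Configuration(), "word-count")
        job.setJarByClass(classOf[TokenMapper])
        job.setMapperClass(classOf[TokenMapper])
        job.setReducerClass(classOf[SumReducer])
        job.setOutputKeyClass(classOf[Text])
        job.setOutputValueClass(classOf[IntWritable])
        FileInputFormat.addInputPath(job, new Path(args(0)))
        FileOutputFormat.setOutputPath(job, new Path(args(1)))
        System.exit(if (job.waitForCompletion(true)) 0 else 1)
      }
    }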
Yarn eventually executes something like “java -jar mymapreduceapp.jar driver” on some Yarn worker node in the cluster, thus starting up the MapReduce program in “driver mode”. This application connects back to the yarn resource-manager and starts sending heartbeats and status messages. It then queries the specified data-source for meta-data; here we will assume the input is an HDFS file, in which case the returned metadata is a list of (blockid->server*) entries. The driver then decides, based on the number of blocks, how many parallel processes it will use to perform the mapping stage - normally one per HDFS block. It then sends a request to the yarn resource-manager requesting “containers”, and sends job-requests to Yarn to start its own jarfile in each container in “mapper mode” with the relevant file (startoffset, endoffset) values as parameters.
For each of these “mapper tasks”, a yarn worker node executes the specified command (something like “java -jar mymapreduceapp.jar mapper filename startoffset endoffset”). The MapReduce framework then opens the file, seeks to the start-offset, and starts feeding records one by one into the registered mapper-function. The records output by the mapper-function are streamed to local disk (the details are somewhat complex). When all records have been processed, the Java process notifies its driver that processing was successful and then terminates, releasing its resources back to the node-manager.
Note that there may not be sufficient resources in the cluster to run all tasks concurrently; in this case some tasks will have to wait until earlier tasks have completed. If a task crashes, the driver will retry it several times on the same or different host.
Once the driver has seen that all mapper tasks have finished (ie the complete input dataset has been passed through a mapper function on some server), it starts the reduce phase.
As noted earlier, all records with the same category (group) must be on the same server in order to run the reduce function. But applying the mapper function to subsets of the dataset on different servers means that each server will have records with various categories - or in other words, the records for a specific category are scattered across all servers which ran the mapper function. This is solved by what Hadoop MapReduce calls “the shuffle”. As each reducer task starts, the MapReduce framework code makes an HTTP request to every server which ran the preceding mapping function, and fetches all data with the category(or categories) assigned to that reducer task instance. The MapReduce framework streams this data to local disk on the worker node running the reducer task, and after all data has arrived it invokes the reducer function with an iterator object capable of returning that cached data.
Remember that Yarn supports “locality hints” for allocating processes? The MapReduce driver can take advantage of this to ensure that mapper processes are run near or on a server which hosts a copy of the data being read (eg an HDFS block). When the code can be run exactly on the server hosting the data, then reading is extremely efficient as absolutely no network traffic is required.
Bucketing
Above, I said that a reduce function is applied to exactly one category/group. However what happens if the mapping algorithm generates millions of distinct groups for the input dataset?
The answer is “bucketing”. A bucket is a group-of-groups; the simplest way of mapping a group to a bucket is hash(groupid)%nbuckets. Note that:
- all records in the same group are assigned the same bucket
- but a bucket can have records from multiple groups.
The MapReduce driver application decides, before it starts, how many processes it will start to run reducers in (eg N=5). The mapper phase then ensures that records output by the mapper function are written into files belonging to one of the N buckets (eg by applying the hash-modulo function above). Then when a process is started to run the reducer function, it uses HTTP calls to fetch all data belonging to its bucket. This ensures that all the data for a specific category/group still lands on the same physical server, but multiple categories/groups are now handled by a single process so the number of processes is sensibly limited.
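The bucket assignment itself is trivial; the sketch below shows the usual non-negative hash-modulo calculation (Spark’s HashPartitioner and Hadoop’s default partitioner do essentially the same thing; the group names are invented):

    // Assign each group to one of N buckets; all records of a group land in the same bucket.
    val nBuckets = 5
    def bucketFor(groupId: String): Int = {
      val h = groupId.hashCode % nBuckets
      if (h < 0) h + nBuckets else h        // keep the result in the range 0..nBuckets-1
    }

    Seq("berlin", "paris", "rome", "madrid").foreach { g =>
      println(s"$g -> bucket ${bucketFor(g)}")
    }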
It would be nice to dynamically determine an appropriate number of reducers based upon the number of distinct categories generated by the mappers, and the volume of data in each category. However that information is only known after the mapper phase has completed, while bucketing is done during the mapping phase, so this is rather tricky. I’m not sure whether this is possible in Hadoop MapReduce, or in tools like Spark.
Joins
A “join” can be made with the MapReduce framework in several ways. One is for the mapper function to have two input streams - the two “tables” to join. The mapper function then generates output records of form (joinkey=>(inputtable, column1,column2,...)), ie to group the records by the column being joined on. This ensures a reducer function gets the relevant records from both tables for a specific joinkey, and can merge the multiple records into a single output one.
In effect, this approach partitions both of the tables on the joinkey value; each reducer runs on one partition and is therefore guaranteed to see both “sides” of the join for any particular joinkey, even when multiple reducers are being run in parallel.
There are other ways to implement joins, depending upon the relative sizes of the datasets being joined, whether they are sorted by the same key, etc. The exact details aren’t so relevant as this is primarily a presentation on Spark - but internally Spark uses some of the same techniques that were previously implemented manually on top of MapReduce.
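As an aside, the same reduce-side join idea is a one-liner in Spark: key both datasets by the join column and let the framework co-locate matching keys (the datasets below are made up, and sc is assumed to be an existing SparkContext):

    // Key both "tables" by the join column; join() shuffles so matching keys meet on one server.
    val orders = sc.parallelize(Seq((101, "order-a"), (102, "order-b"), (101, "order-c")))
    val users  = sc.parallelize(Seq((101, "alice"), (102, "bob")))

    val joined = orders.join(users)        // RDD[(Int, (String, String))]
    joined.collect().foreach(println)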
MapReduce as a back-end
MapReduce is a library that can be used when hand-writing Java code. There is also a way to write mapper and reducer functions in Python - Hadoop MapReduce provides a standard main-method which starts an external Python interpreter process, loads the mapper or reducer script into it, and then sends records to it over STDIN and reads output back via STDOUT.
However other tools exist which generate Java bytecode which calls the Hadoop MapReduce APIs, and then submit the resulting program to Yarn for execution. Hive can compile SQL-like statements in this way, generating a MapReduce program which then produces the desired results. Pig is a “high-level dataflow language” which is compiled down to Java code that then uses the Hadoop MapReduce libraries.
LIVE: draw a diagram here
The Problems with MapReduce
While MapReduce was a useful tool for many years, there are a number of problems with the design-pattern in general, and also with the Hadoop implementation, which mean that it is seldom used these days.
The primary disadvantages of MapReduce are:
- A new Java process is launched via Yarn for each mapper instance and each reducer instance (ie each task), and the process terminates when mapping or reducing over the allocated subset of input data (partition) has completed. This does have a small advantage for mapping: Yarn can pick exactly the optimal host for each process in order to be as close as possible to the input data. However Java is not known for its startup speed; starting these JVM processes repeatedly is a significant performance hit.
- Because each process runs just one task then terminates it is not possible to cache data in memory between tasks - everything needs to be written to either local disk or HDFS in order not to lose the results. In-memory caching is useful for algorithms which “fork” - computing an intermediate value which is then processed in two different ways.
- Mapreduce tasks do not send much information back to their driver application, limiting the runtime optimisations that the driver can perform.
- There is no efficient way to make multiple “reduce” passes over the same data. Multiple “map” passes are not useful; because the logic is only per-record, there is nothing that cannot be solved just by applying the logic in one pass. However multiple reduces can be useful - but because a reducer function receives only an “iterator” object which provides its input data, there is no way to reprocess the output of a reducer except by launching another (map, reduce) pair with an empty map phase.
In addition:
- The MapReduce programming API has been called verbose - although now that Java 8 lambdas are available, it could probably be improved significantly with little effort.
- TODO: List other disadvantages.
In my opinion, Spark fans (and the Spark site itself) make excessive claims about its advantages over MapReduce. There definitely are advantages, but not everything that is claimed is true. Internally, there are many similarities between Spark and MapReduce.
Alternatives to MapReduce
Two alternatives to MapReduce have been developed: Tez and Spark. Spark is actually a family of products; in this section I am talking about the “spark core execution engine” - the lowest-level part of Spark on which everything else is built.
Both Tez and Spark solve the “slow process startup time” issue by instead allocating a set of N operating-system processes at job startup time, and then reusing these processes for multiple tasks. In other words, instead of sending a command like “java -jar ...” to a new yarn container for each task, a serialized Java object is sent to an already-existing process, which deserializes the request and then executes it on a thread. There are some disadvantages, including:
- the number of processes to spawn needs to be decided on up front (though Spark now has a “dynamic” option which may help)
- data locality is not optimal, ie none of the available processes might be on the server hosting the relevant HDFS block.
However, in addition to avoiding process startup overhead, reusing a process to execute multiple tasks allows the process to cache results in memory; if a subsequent pass over the results (ie a subsequent task) is executed on the same server and consumes the output of the previous task, then having that output cached in memory can save a lot of time.
And having a pool of “reserved processes” allows interactive sessions with lower latency. MapReduce-based systems have never been nice for running quick test queries for “data exploration”, due to the need to request containers via yarn and then launch Java processes in them. A long-running Tez-based or Spark-based “session” can hold onto a pool of such processes for hours, waiting to execute the tasks associated with an SQL query as soon as it is submitted by the session owner. Obviously this wastes some cluster resources, but it is a good tradeoff as long as the number of interactive users is reasonably low.
Most tools which originally generated hadoop-map-reduce-based code as a “back end” have now been ported to Tez, Spark or both. In particular, Hive now supports all 3 back-ends and defaults to Tez.
Tez does provide a programming API, and can be used for hand-written code. However it is very seldom used in this way; for simple programming tasks the traditional MapReduce framework is better known and documented and for complex tasks the Spark framework provides an elegant API and has established market share. Tez is widely used as a back-end for other big-data applications.
In addition to Spark’s nice programming API, Spark’s core is also popular as a back-end for many applications. And there is a family of tools which only support Spark core as their “back end”, including spark-streaming and spark-ql.
A note on CPU-bound vs IO-bound processing
Scientific research can require processing of data using very CPU-intensive algorithms, ie the work is CPU-bound regardless of the amount of data to be processed. Research projects therefore usually use what is called HPC (High Performance Computing), an architecture which separates data storage from data processing. A pool of servers provides storage (possibly HDFS, or something HDFS-like). A separate pool of servers provides computation (CPUs and RAM); these fetch data from the storage system as needed, compute, then write results back. This works acceptably because IO and network bandwidth issues are dwarfed by CPU resource bottlenecks - or in other words, this kind of work is CPU-bound and IO problems take second place.
In more general big-data-processing, it is very common to process vast amounts of data in fairly simple ways. Here the bottleneck is definitely IO, and the solution is to integrate processing with storage rather than keep them separate. This means:
- data is replicated to give a choice of servers on which to run the processing (and therefore read operations have low/zero network bandwidth requirements)
- the storage system exposes data-location information to client applications (eg block->servers mappings)
- data-processing resources (fast CPUs and lots of RAM) are placed near, or on, the same servers hosting storage
- processing tasks are fine-grained and dynamically allocated near the data they process
Spark Deployment Mechanisms
Spark can be deployed in several ways:
- on Yarn
- on Mesos
- standalone cluster
- localcluster
- local
Now that we have discussed Yarn, Spark-on-yarn is fairly obvious. When a Spark job starts, it requests N “containers” from Yarn. It then dispatches a “job request” to each container which starts a Java VM running the “spark executor” application. Unlike MapReduce, the “spark executor” application is not specific to the job being executed; instead it is a generic application which can then be sent “serialized tasks” to execute. A Spark executor generally runs for as long as its parent driver application runs, and processes many different tasks in that time. This does hold the danger that a badly-behaved task can affect other tasks (including crashing the entire executor process). However an executor instance is never shared between jobs, so the worst a bad task can do is affect other tasks in the same job. There are two options for starting the job:
- cluster-mode submits a jobrequest to the Yarn resource-manager for the spark “driver” component; the entire workload runs in the cluster and the original system from which the job was submitted is not involved in processing.
- client-mode starts the driver on the server from which the job was started, ie the “coordinator” is running locally (particularly useful for debugging etc). However a trivial “yarn application master” process is still submitted to the Yarn resource manager anyway, as only a Yarn application can send heartbeats back to the resource-manager and request additional “child” containers. The client-mode driver app communicates directly with this trivial application-master in order to allocate container resources (ie uses it as a kind of proxy).
Mesos is, generally speaking, very similar to Yarn. Given a bunch of servers, install a mesos node-manager service on each one. Unlike yarn, there is no central “resource manager”; the corresponding logic is partly in the node-manager services and partly in the client/driver applications. When an application running “locally” wants to start some other process within the Mesos cluster, it broadcasts a “job request” describing the resources it wants. Each mesos node-manager which has free resources responds with an “offer”; the client application then either “accepts” one of the offers or decides to wait a while and try again later. A yarn-resource-manager-compatible service exists for Mesos so that yarn-enabled applications see the cluster as a standard yarn cluster. In fact, a single Mesos cluster can host several of these gateways, thus supporting multiple independent “yarn clusters” (in different versions if desired) on top of a single underlying Mesos cluster. However the conceptual differences between Yarn and Mesos are small enough that they are not really relevant for this discussion of Spark.
If you have a set of servers, and intend to run nothing but Spark code on them, then a spark-standalone-cluster may be used. On each server, a Spark worker service is started - just like a Yarn node-manager or a Mesos node-manager. And similarly, on request it will start a new process. However unlike yarn or mesos, the new process is always a “spark executor”. As with spark-on-yarn, a job starts N executors and holds on to them until the job terminates. An executor instance (JVM process) is never shared between jobs.
The localcluster mode starts its N executors just by running “exec process” from the driver application. That means that the executors are real processes but run on the same physical server; good for testing but not so good for scalability.
And local mode simply starts an executor as a thread within the same JVM as the driver application. Used for trivial testing only.
A Spark job needs to specify “num executors” and “num vcores” on startup. The “num executors” value has been described above; it is the number of OS processes (JVM processes) to be started. There is no guarantee that these processes will be on different physical hosts, but they usually will be. The “vcores” option specifies how many concurrent tasks an executor should support; each concurrent task is run in a separate thread within the parent executor.
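In code, those sizing options look roughly like this (the numbers are arbitrary examples; in practice they are usually passed as spark-submit flags such as --num-executors and --executor-cores rather than hard-coded):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("sizing-demo")
      .setMaster("yarn")
      .set("spark.executor.instances", "4")   // "num executors": 4 JVM processes in the cluster
      .set("spark.executor.cores", "2")       // "num vcores": 2 concurrent task threads per executor
      .set("spark.executor.memory", "4g")     // heap available to each executor

    val sc = new SparkContext(conf)           // at most 4 * 2 = 8 tasks run concurrently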
As already noted, a Spark “executor” process usually lives as long as its parent driver process - ie a Spark “job” (or a Spark session). This is different from Hadoop MapReduce, where a process only lives as long as a single task.
Spark Map and Reduce
The Spark API supports “filter” operations for filtering data, “map” operations for transforming data, and “keyBy” operations for associating a category/group/key with each record. These all correspond to the abilities of a MapReduce “mapper” function. The Spark programming API allows these to be defined as a sequence of separate function calls, rather than as a single function call but the result is the same. A well-known principle of functional programming is:
    input
      .map(record => f1(record))   // first pass
      .map(record => f2(record))   // second pass
is equivalent to:
    input.map(record => f2(f1(record)))   // one pass
and Spark applies this principle automatically, ie what appears to be multiple passes over data in the source-code is actually a single pass, applying both functions to each record (as long as no repartitioning aka reduceByKey aka shuffle occurs between operations).
Spark also supports “reduceByKey” operations which are equivalent to MapReduce reducer functions. And like MapReduce, this requires a preceding “categorisation” pass to allocate a category/group/key to each record and requires a “shuffle” operation to ensure that all records with the same category/group/key are moved to the same physical server (though records with a different category can be processed in parallel on a different server).
Unlike MapReduce, Spark supports a reduce operation without a key, allowing the associated code to see all records in the local dataset (effectively, a random subset of the overall dataset). MapReduce does have something similar called a “combiner function”.
Spark also supports a number of API methods which internally generate multiple tasks of map-like and reduce-like behaviour; distinct, countByKey and aggregateByKey are examples. Having this pre-built into the language is certainly nicer than having to implement something similar with the old MapReduce API.
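Putting those operations together, a word count looks something like the sketch below (sc is assumed to be an existing SparkContext and the input path is invented); the map-like steps are fused into a single pass per record, and reduceByKey is what triggers the shuffle:

    val counts = sc.textFile("hdfs:///data/books/*.txt")
      .flatMap(line => line.split("\\s+"))    // map-like: transform each record
      .filter(word => word.nonEmpty)          // map-like: drop unwanted records
      .map(word => (word.toLowerCase, 1L))    // keyBy-like: assign a category to each record
      .reduceByKey(_ + _)                     // reduce-like: requires the shuffle

    counts.take(10).foreach(println)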
The Spark Driver
One thing that is different between MapReduce and Spark is that in Spark the “driver” program is part of the processing, while in MapReduce it is purely a coordinator.
When a Spark task completes, it can return data to the driver. In particular, operations like “collect” cause the entire output of a set of N instances of the same task to be directed back to the driver. Of course this should be used with care - but when the tasks are only generating a small amount of data it is more sensible for the driver to receive and process that data than to spawn another task which somehow obtains that data and processes it.
Broadcast Variables
The driver can push data to every executor via the “broadcast variables” mechanism. This is normally done to provide small “lookup tables” or config-info to each task. Broadcasting is done via the SparkContext object.
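A typical use looks like the following sketch (the lookup table and data are invented; sc is an existing SparkContext):

    val countryNames = Map("de" -> "Germany", "fr" -> "France")   // small lookup table
    val bcCountries = sc.broadcast(countryNames)                  // shipped once to each executor

    val records = sc.parallelize(Seq(("de", 10.0), ("fr", 20.0), ("xx", 5.0)))
    val enriched = records.map { case (code, value) =>
      (bcCountries.value.getOrElse(code, "unknown"), value)       // lookup happens executor-side
    }
    enriched.collect().foreach(println)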
Tasks are sent to executors as serialized objects, so associated data can also be embedded in the task objects (by simply allowing the task closure to capture the data). However this is less efficient than broadcast variables as the data is sent once per task rather than once per job.
Arbitrary files can also be pushed to each Executor (and then referenced from tasks) as part of the original job request. This is used to push the jarfiles containing the application’s code, but can also be used to push resource-files of any type.
Spark Memory Caches
Much is made of Spark’s “in-memory caching” of data vs MapReduce. However this helps only for some algorithms.
As noted earlier, multiple “map” passes over data are not necessary; because each record is processed individually the multiple passes can always be merged into one.
However multiple “reduce” passes over data can be useful. MapReduce cannot efficiently do this, but Spark can just cache the output of a reduce operation in memory (overflowing to disk only if absolutely necessary) and then if the next reduce task is assigned to the same executor then the data can be iterated over again without ever needing to be flushed to disk.
Some algorithms are not a single pipeline of transforms, but instead a “directed acyclic graph” where the same intermediate results may be processed in different ways. Spark’s ability to cache data in-memory can be useful here.
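A sketch of such a “fork” (data and field names invented, sc an existing SparkContext): the grouped totals are computed once, cached, and then consumed by two different downstream computations:

    case class Event(key: String, amount: Double)
    val events = sc.parallelize(Seq(Event("a", 1.0), Event("b", 2.5), Event("a", 4.0)))

    val totalsByKey = events.map(e => (e.key, e.amount))
      .reduceByKey(_ + _)
      .cache()                                        // keep in executor memory if it fits

    val top     = totalsByKey.sortBy(-_._2).take(10)  // first use: computes and caches the data
    val overall = totalsByKey.values.sum()            // second use: served from the cache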
In-memory caching can potentially help with the “shuffle”, as data requested by remote nodes can be served from memory. However as the Kafka message broker has proven, streaming file writes and streaming file reads are extremely efficient in modern operating systems - often better than in-memory caching, and less complicated. As long as seeks are not involved, avoiding disk access (even for traditional rotating media) is not really that important.
Spark’s API provides a number of methods to simply “load the entire local dataset into memory”. This can allow interesting applications, such as matrix maths over multiple records. Obviously it does bring with it the danger of OutOfMemory errors - the application developer needs to be sure of their estimates of the number of data elements in each partition of the dataset.
Spark Streaming
A standard Spark program does batch processing - it reads and transforms the complete (and finite) set of input data, then terminates. A Spark streaming application instead runs every N milliseconds, processing whatever data has been received since the last run - an approach called “microbatching”. The interval may be anywhere from 50 milliseconds to multiple minutes. As with batching, the driver application organises each microbatch, dividing the work into a set of tasks which are then distributed over a set of executors, and monitoring/coordinating the processing done by those tasks. When a microbatch is complete, the driver then sleeps until it is time for the next microbatch to start.
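A minimal microbatching sketch (the socket source and port are just for illustration): every five seconds the driver turns whatever lines have arrived into a small batch of tasks:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("streaming-sketch")
    val ssc = new StreamingContext(conf, Seconds(5))    // the microbatch interval

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split("\\s+"))
         .map(word => (word, 1L))
         .reduceByKey(_ + _)
         .print()                                       // executed once per microbatch

    ssc.start()
    ssc.awaitTermination()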
The input data stream may be naturally “partitioned” - in particular, the input may be a Kafka topic which naturally has partitions. Spark streaming works with these partitions, ensuring that no two tasks read from the same partition at the same time (to preserve ordering of records within a partition). However tasks processing the same input partition in different micro-batches may be executed on different spark executor instances over time.
Alternative streaming solutions (eg Storm, Flink or Kafka Streams) dedicate a single process to each input data “partition” long-term. For example, if the input data stream has 20 partitions, then 4 processes may be started, each handling 5 partitions. Each process simply runs in a continuous loop: reading blocks of input data, processing them, and then writing the results.
The problem with the dedicated-process-per-partition approach is that if one process is running slow then all processes must either slow down to match the slowest process, or the timestamp on the data that each server is processing will drift apart. With the microbatching approach, each “slice” of the data generates (following the previous example) 20 tasks which are distributed over 4 executors. If one executor is running slowly, then it might only complete 2 tasks in the time that the others complete their 5 tasks - and the remaining tasks will be distributed over the now-idle executors. Similarly, if an executor crashes then the tasks it was allocated can simply be redistributed to surviving nodes; with dedicated processors for input partitions recovery from a crash is not so trivial.
There are however also problems with the microbatching approach. Issues include:
- when restarting a microbatch streaming app after an outage, there can be a significant backlog of data to process. If all this data is included in the current batch, then the time to process the data can be far longer than the usual microbatch window. If, instead, each batch is limited to a maximum number of input records, then that limit needs to be carefully tuned to avoid having the system sit idle while work is available, ie it can result in lower-than-optimal throughput.
- the unit of “recovery” when a task fails is the whole task; that means that to avoid duplicated (at-least-once) delivery of results, data should be “committed” to the output only when a task fully completes. Not all output technologies support this (eg Hive does, but Kafka does not).
- understanding/monitoring what is going on in a microbatching streaming application (and Spark in particular) is more complicated than when there is a dedicated process per input data partition.
SparkQL
Hive is a library which (simplified) provides an API that takes a query string as a parameter; it generates as output an application which uses the MapReduce, Tez or Spark library. The application reads ORC or Parquet files (other datasources are also supported) and performs the processing requested in the original string. The compilation process requires information about the “table structure” being queried - normally this is obtained by making a request to an HCatalog server where (table, schema, files) information is registered.
Hive can also be deployed as a standalone service (ie the Hive library is combined with a main-method) which accepts SQL statements over a network connection, and passes them to the Hive library. The protocol is not REST, but there is a corresponding JDBC-compliant library that can be used from Java applications to communicate over the network with a standalone Hive server.
Spark provides similar functionality, also compiling SQL statements to an application which can be executed in a cluster - and using info from HCatalog if desired.
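In code this looks roughly as follows (table and column names are invented); enableHiveSupport() is what lets Spark resolve table names via the HCatalog/Hive metastore:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("sparkql-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // The SQL is compiled by Spark into jobs/tasks that run on the cluster.
    val result = spark.sql(
      "SELECT country, COUNT(*) AS n FROM db.events WHERE year = 2017 GROUP BY country")
    result.show()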
Spark DataSets
The original API offered by the Spark library is a row-based one; user code is passed each row (record) of the input data as a Java object containing a child object for each field in the row. The user code then transforms the row, returning another row - or returns a boolean in the case of filtering, etc. This is similar to how data is processed in MapReduce, where a mapper or reducer function is passed an object for each record to be processed.
There is however a significant limitation to this row-based approach: before calling any user-provided code, the current record needs to be represented as a Java object in memory. Unfortunately, Java objects are extremely space-inefficient; even a simple object carries roughly 16 bytes of header (a pointer to its class-object plus other administrative data) before any fields are counted. Strings are stored as UTF-16, and boxed numerical types also have overhead. Every object must be allocated on the heap and garbage-collected when no longer needed. And when such objects need to be written to disk or over a network, they need to be serialized, which also takes significant CPU and memory resources.
The Spark DataSet API is an alternative way for users of the Spark framework to apply logic to the input data without forcing the framework to convert every record to a Java object representation; logic is written in the form of UDFs (User Defined Functions) which take only specific property values as input. Spark also provides a large set of predefined functions for operating on property values. The user then specifies the operations to be applied to data in a SQL-like manner, ie expresses filter, transform, sort, join, and other operations as combinations of various UDFs applied to specific properties of the input rows. This approach allows Spark to represent records in the input, intermediate, and output datasets using an optimised internal representation without needing to build traditional Java object representations of them.
In order to function, the DataSet API does need a schema for the input records, ie needs to know which fields exist in each input row and what datatypes they have.
Records in DataSet format can be converted to and from the traditional API as needed.
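A small sketch of the DataSet/DataFrame style (path and column names are invented): logic is expressed as operations on named columns plus a UDF, rather than as functions over whole row objects:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{avg, col, udf}

    val spark = SparkSession.builder().appName("dataset-sketch").getOrCreate()

    // The schema (column names and types) comes from the Parquet files themselves.
    val users = spark.read.parquet("hdfs:///data/users")

    // A UDF receives individual column values, not a Java object per row.
    val normalise = udf((s: String) => s.trim.toLowerCase)

    users.filter(col("age") > 21)
         .withColumn("country", normalise(col("country")))
         .groupBy("country")
         .agg(avg("age").as("avg_age"))
         .show()

    // Converting between the two APIs when needed:
    val asRdd = users.rdd                                    // DataFrame -> RDD[Row]
    val asDf  = spark.createDataFrame(asRdd, users.schema)   // RDD[Row]  -> DataFrame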
Spark RDDs
One key word that is found over and over in the Spark documentation is RDD - Resilient Distributed Dataset. An RDD is often described as something magical or very clever, but it is actually quite simple: it is just metadata about a dataset.
As an example, it is common for a Spark application to start by creating an RDD representing a HDFS file whose contents are to be processed. The resulting RDD is simply a datastructure containing the following fields:
- type: HDFS RDD
- name of input file
- number of partitions in the file
- locations (servers) holding each partition of the file
Another RDD can be derived from the above RDD by applying a filter or transform operation to it; the resulting RDD is simply a datastructure containing:
- type: Map RDD
- parent RDD: see above
- transformation: bytearray containing a serialized closure which is the logic to apply to each record
When Spark eventually needs to evaluate an RDD, it then looks at the RDD and its ancestors, merging RDDs which can be executed in a single pass, then generates a corresponding set of tasks that will carry out the processing described by the RDD and its ancestors.
An RDD is “resilient” during execution because a task that fails to generate its partition of an RDD from the upstream data can be reexecuted. If the upstream data no longer exists, then the steps that produced it can also be reexecuted, etc. This allows Spark to aggressively cache intermediate data in-memory-only, knowing that at the worst case it can back up and recreate it. Nevertheless, Spark may choose to persist some intermediate results to disk. Unlike MapReduce, where a JVM hosting a task always terminates after the task is complete, Spark runs tasks within application-scoped JVMs, allowing it to retain references to memory generated by tasks. Spark’s internal scheduler then dispatches subsequent tasks to the JVM hosting its input data (the JVM having been started via YARN).
The term “distributed” is justified because an RDD does not hold the data - it just holds metadata about where the input data really is. Multiple tasks can then be started on multiple executors which independently retrieve different subsets of that input dataset.
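The following sketch shows that an RDD is just metadata (the path is invented, sc an existing SparkContext): each val below merely records its parent and the closure to apply, and nothing is read or computed until the final action runs:

    val raw      = sc.textFile("hdfs:///data/big-input.csv")       // "HDFS RDD": path, partitions, locations
    val filtered = raw.filter(line => !line.startsWith("#"))       // "Map RDD": parent + serialized closure
    val keyed    = filtered.map(line => (line.split(",")(0), 1L))  // still just metadata

    println(keyed.toDebugString)                       // prints the lineage (chain of parent RDDs)
    val counts = keyed.reduceByKey(_ + _).collect()    // the action: only now are tasks generated and run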
Other Notes
An interactive Spark session is a single Spark “application”, spawning jobs at irregular intervals.
The Spark architecture does require the YARN-spawned processes to be JVMs, something that YARN itself does not require. The current Hadoop MapReduce implementation also requires the YARN-spawned process to be a JVM, even when the map/reduce logic is implemented in an external language (such as Python) invoked via MR “streaming”, although it would be possible to implement a non-Java MapReduce.