This article briefly covers several widely-used open-source software projects for processing “big data”, ie tools for analysing data collections ranging in size from tens of terabytes up to multiple petabytes.
It is assumed that the reader is familiar with the concepts of non-relational data storage, clustered filesystems, and similar. Those topics are covered in this companion article.
There are a lot of tools in this area, and many of these summaries are pretty brief - it’s the “10,000 foot view” of big-data processing. Hopefully at some time I will be able to fill out some of these sections, or write dedicated articles on some topics.
WARNING: I am not an expert in this area; read with caution!
The Brute Force Approach to Big Data Processing
The simplest approach to analysing large amounts of data is to:
- buy a great big server with lots of disk and ram
- install a relational database like Oracle
- insert lots of data
- run SQL commands against the database, or write programs that access the data via JDBC or similar
There are two obvious problems:
- limits on the CPU and IO bandwidth of the relational database itself, and
- limits on the CPU and network bandwidth of the computer hosting the client application
There is a limit to the amount of IO bandwidth and CPU power in a single server; the only solution is to use a cluster of servers. However, relational databases don’t cluster well. SQL-based client applications don’t cluster particularly well either. When reading just a single table it may be possible to run multiple client processes, each analysing a portion of the table (ie selecting only specific ranges of keys), but the relational reliance on joins often makes this ineffective. In addition, each client process is still pulling its input data from a central relational database over a network, which can quickly become IO-bound.
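The idea of splitting a single-table scan across several clients by key range can be sketched as follows. This is only an illustration: the table `events`, its columns `id` and `amount`, and all function names are hypothetical, and an in-memory SQLite database stands in for the central relational server. In a real deployment each range would be handled by a separate process talking to the database over the network, which is exactly where the bottleneck described above appears.

```python
import sqlite3

def key_ranges(lo, hi, parts):
    """Split the inclusive key range [lo, hi] into up to `parts` contiguous chunks."""
    size, extra = divmod(hi - lo + 1, parts)
    ranges, start = [], lo
    for i in range(parts):
        end = start + size + (1 if i < extra else 0) - 1
        if end >= start:
            ranges.append((start, end))
        start = end + 1
    return ranges

def build_demo_db(n):
    """Create a hypothetical table events(id, amount) with n rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, amount INTEGER)")
    conn.executemany(
        "INSERT INTO events VALUES (?, ?)",
        [(i, i * 2) for i in range(1, n + 1)],
    )
    return conn

def partial_sum(conn, lo, hi):
    """Work done by one 'client': aggregate only the rows in its key range."""
    row = conn.execute(
        "SELECT COALESCE(SUM(amount), 0) FROM events WHERE id BETWEEN ? AND ?",
        (lo, hi),
    ).fetchone()
    return row[0]

def partitioned_total(conn, lo, hi, parts):
    """Combine the partial results of all clients into the final answer."""
    return sum(partial_sum(conn, a, b) for a, b in key_ranges(lo, hi, parts))
```

For example, `partitioned_total(build_demo_db(100), 1, 100, 4)` runs four range queries and adds up their results. Note that every query still goes through the same database, so adding clients does not add database IO capacity.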
The solution is to provide tools which:
- distribute data across multiple hosts to allow better scaling of persistent storage
- distribute database servers across multiple hosts to allow better scaling of CPU/IO-bandwidth/network-bandwidth
- use a non-relational model which has fewer serial-processing constraints (X must complete before Y)
- allow custom processing code (ie stuff other than SQL or similar languages implemented in the database itself) to be executed on the node on which the relevant data is stored.
Widely Used Open-Source Big Data Processing Tools
Here is a brief list of some of the most widely-used open-source projects that are related to processing of data “at rest”:
- Apache Hadoop MapReduce (actually, mostly obsolete now)
- Various SQL-like Query Languages (including Hive and CQL)
- Apache Spark
- Apache Hive
- Apache Pig
- Apache Hama
- Google Pregel and Apache Giraph
- Apache Tinkerpop
Here is a brief list of projects related to processing data “in motion” (aka real-time processing):
- Apache Spark
- Apache Storm
- Apache Flink
- Apache Kafka Streams
Yes, the current dominance of the Apache Foundation in the big-data world is impressive - anybody with an interesting open-source big data project seems to want to have it hosted by the Apache Foundation. The Java language is also dominant, although there are a few non-Java-based projects (often not open-source).
Many other significant projects are not listed here; there are simply too many to describe…
Apache Hadoop MapReduce
Apache Hadoop is one of the earliest open-source big-data projects, and still very active. This project provides three core components, of which one (MapReduce) is about “data processing” - but it builds upon the other two components.
The Hadoop project officially provides:
- Hadoop Distributed File System (HDFS)
- Hadoop YARN
- Hadoop MapReduce (relies on YARN)
HDFS turns a set of commodity servers running a standard operating system (eg Debian) into a big pool of storage space for files. Unlike many other distributed filesystems, it deliberately exposes the names of node(s) on which a file is stored, so that other programs can make efficient decisions about where to run logic which reads the file. See this article for additional information.
YARN turns a set of commodity servers running a standard operating system (eg Debian) into a big pool of CPU and RAM, in which processes can be executed. Many other projects use a YARN cluster to execute parallel logic. HDFS and YARN are often installed together on the same physical servers to create a combined cluster capable of both storage and processing. See this article for more information.
MapReduce is a design-pattern for performing parallel processing of large sets of records. Hadoop MapReduce is an implementation of the MapReduce design pattern; it is a framework which coordinates HDFS and YARN to analyse a large file (sequence of records) by spawning N processes in a YARN cluster, each processing 1/Nth of the overall input. When the input file is in HDFS, the processes are allocated near the data they read - ie code is brought to the data, rather than data being brought to the code; this can provide very large performance improvements for IO-bound processing. To avoid confusion between the generic MapReduce pattern and the Hadoop MapReduce framework, I will refer to the latter (implementation) as HMapReduce throughout this article. See this article for more information.
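The generic MapReduce pattern (not the Hadoop API) can be sketched in a few lines of plain Python. The function names below are made up for illustration, and everything runs in a single process; in a real cluster each input split would be handled by a separate mapper process, and the grouping step would move data between nodes.

```python
from collections import defaultdict

def map_phase(record):
    """Emit (key, value) pairs for one input record (here: one line of text)."""
    for word in record.split():
        yield word.lower(), 1

def shuffle(pairs):
    """Group all emitted values by key, as a framework does between the phases."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(key, values):
    """Combine all values seen for one key into a single result."""
    return key, sum(values)

def run_job(splits):
    """`splits` is a list of record lists, one per (simulated) mapper process."""
    emitted = []
    for split in splits:          # each split would run on a separate node
        for record in split:
            emitted.extend(map_phase(record))
    return dict(reduce_phase(k, v) for k, v in shuffle(emitted).items())
```

Calling `run_job([["a b a"], ["B c"]])` counts words across two splits. The map and reduce functions never share state, which is what allows a framework to run many copies of them in parallel.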
Actually, directly writing MapReduce code by hand is seldom done now; there are better tools which are faster and more user-friendly (particularly Spark). The use of MapReduce as a “back end” for other tools is also fading rapidly, being replaced mostly by Tez or Spark. However (in my opinion) learning how MapReduce works is worthwhile; many big-data processing tools use similar (but more complicated) techniques under the hood. Consider it like learning how a 1960s petrol car motor works - modern motors use similar principles but are far more complex and not such good “learning tools”.
HMapReduce, Tez, Mesos and Spark as Job Executors
The HMapReduce library is used as a helper by many higher-level tools - ie some “database-like” tools (such as Hive) execute queries by dynamically generating HMapReduce programs and executing them on a YARN cluster.
Tez is (very roughly) a “helper library” for YARN, allowing higher-level applications to more easily run processing logic in parallel in a YARN cluster. Using HMapReduce as a framework for executing logic has some limitations, particularly related to efficient performance; Tez is a more efficient “target platform” for such tools.
Mesos is a cluster workload scheduler somewhat like YARN - but on a more generic level. It can be considered a “meta-scheduler” which tracks the available resources in a cluster of servers and offers them to one or more “subschedulers”. YARN can be used as a Mesos subscheduler - thus allowing multiple YARN logical clusters to share the same physical servers.
Spark offers a parallel-processing framework for programming (ie it competes with HMapReduce), and a query-language that compiles to programs that use the Spark parallel-processing framework (ie it competes with Pig/HiveQL). Spark-based programs can be executed on a YARN cluster. Alternatively, a Spark program can act as a Mesos “subscheduler” to run directly on hardware managed by a Mesos cluster. Other features of Spark are discussed later in this article.
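The “meta-scheduler offers resources to subschedulers” idea can be illustrated with a toy model. This is not the Mesos API: the class and function names are hypothetical, only CPUs are tracked, and offers are simply made in list order, whereas a real meta-scheduler applies a fairness policy when deciding who receives an offer.

```python
class SubScheduler:
    """Hypothetical subscheduler that accepts offers until its demand is met."""

    def __init__(self, name, cpus_needed):
        self.name = name
        self.cpus_needed = cpus_needed
        self.granted = {}          # host -> number of CPUs accepted

    def consider(self, host, cpus_offered):
        """Return how many of the offered CPUs this subscheduler accepts."""
        take = min(cpus_offered, self.cpus_needed)
        if take > 0:
            self.cpus_needed -= take
            self.granted[host] = self.granted.get(host, 0) + take
        return take

def offer_round(free_cpus, schedulers):
    """Meta-scheduler: offer each host's free CPUs to the subschedulers in turn."""
    for host in sorted(free_cpus):
        for sched in schedulers:
            if free_cpus[host] == 0:
                break
            free_cpus[host] -= sched.consider(host, free_cpus[host])
    return free_cpus
```

With two hosts of four CPUs each and two subschedulers (say, two logical YARN clusters) needing three and four CPUs, one call to `offer_round` shares the same physical hosts between both of them.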
Query Languages as a Data Processing Tool
SQL and related declarative query languages can be seen as “data processing tools”.
Apache Hive provides an “SQL compiler” which transforms reasonably-standard SQL queries into programs based on HMapReduce, Tez or Spark, and then runs them within a YARN (or Mesos) cluster. The data that the queries run against (ie the data which the generated programs read) may come from various supported sources, including:
- files stored in a distributed filesystem (eg HDFS), holding data in simple formats such as comma-separated-values
- files stored in a distributed filesystem (eg HDFS), holding data in moderately complex and optimised formats such as ORCFile
- Apache HBase (accessed via GET/SCAN operations sent to HBase servers over a network)
Hive also supports SQL insert/update/delete statements with some data-storage back-ends, eg ORCFile or HBase, but describing this functionality is not really in scope for this article.
Note that because Hive works by generating HMapReduce/Tez/Spark “programs”, anything that Hive can do can also be done directly via these cluster-based programming tools.
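To make the relationship between a query and a generated program more concrete, here is a hand-written map/reduce equivalent of the query `SELECT dept, SUM(salary) FROM emp WHERE salary >= 50 GROUP BY dept`. The table `emp`, its columns, and the function names are hypothetical, and this is only a sketch of the general idea; it does not reproduce what Hive actually generates.

```python
from collections import defaultdict

# Rows of a hypothetical table emp(dept, salary).
ROWS = [("eng", 120), ("eng", 90), ("ops", 70), ("ops", 40), ("hr", 50)]

def mapper(row, min_salary):
    """WHERE clause and projection: emit (dept, salary) for qualifying rows."""
    dept, salary = row
    if salary >= min_salary:
        yield dept, salary

def reducer(dept, salaries):
    """GROUP BY dept combined with SUM(salary)."""
    return dept, sum(salaries)

def run_query(rows, min_salary):
    groups = defaultdict(list)
    for row in rows:
        for key, value in mapper(row, min_salary):
            groups[key].append(value)      # stands in for the shuffle step
    return dict(reducer(k, v) for k, v in groups.items())
```

Here `run_query(ROWS, 50)` returns one total per department. The filter runs in the map step, next to the data, so rows that fail the `WHERE` condition never cross the network.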
Apache HBase does not provide a query-language directly, but several other projects support executing queries against HBase, including Apache Hive, Apache Phoenix and Apache Drill. Apache Cassandra is similar to Apache HBase in many ways, but provides its own query-language (CQL) as part of the standard distribution. There are also adapters to access Cassandra from HMapReduce/Tez/Spark applications.
In order for HMapReduce/Tez/Spark applications to be efficient at accessing data, the underlying datastore must expose data location information to clients, so that the applications can minimise network IO by locating processing logic near to the source of data. HDFS, HBase, and Cassandra all expose the necessary information. HBase and Cassandra also have a data model which allows nesting child records within their parents to minimise joins, and which allows fine-grained control over record order which can also improve batch-mode analysis performance. In addition, they partially support “column store” format, which reduces disk IO. These are not processing technologies themselves, but features which allow MapReduce, Spark, and similar technologies to work more efficiently.
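How a scheduler might use exposed location information can be sketched as below. This is a simplified, assumed policy (prefer a host that stores the block, otherwise fall back to any host with spare capacity); the function name and data structures are hypothetical and do not correspond to any particular framework’s API.

```python
def place_tasks(block_locations, free_slots):
    """Assign one task per block, preferring a host that stores the block.

    block_locations: block id -> list of hosts holding a replica
    free_slots: host -> number of tasks the host can still accept (mutated)
    Returns block id -> (host, is_local).
    """
    placement = {}
    for block, hosts in sorted(block_locations.items()):
        local = [h for h in hosts if free_slots.get(h, 0) > 0]
        if local:
            # Data-local: the task reads its input from the local disk.
            host = max(local, key=lambda h: (free_slots[h], h))
            is_local = True
        else:
            # No replica holder has capacity: the input must cross the network.
            candidates = [h for h, n in free_slots.items() if n > 0]
            if not candidates:
                raise RuntimeError("no free slots left in the cluster")
            host = max(candidates, key=lambda h: (free_slots[h], h))
            is_local = False
        free_slots[host] -= 1
        placement[block] = (host, is_local)
    return placement
```

Without the `block_locations` information, every placement would be a guess, and most tasks would end up reading their input over the network.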
Apache Spark
Spark is a data-processing tool with a number of features:
- It provides sophisticated Scala and Java programming APIs for writing “parallel” programs, ie programs that execute on many threads or machines concurrently. The API is inspired by functional programming, which gives the Spark engine the freedom to restructure code for optimal performance.
- It provides Spark SQL, a declarative SQL-like query language.
- It provides a “streaming” mode, in which a Spark program is repeatedly applied to new records as they arrive, rather than being applied to a fixed set of pre-existing records.
- Some other tools use it as a “scheduling layer”; they “compile” requests to Spark code, and then submit it to the Spark server for execution on some cluster such as YARN.
The first and last features are in direct competition with HMapReduce. Spark is newer than HMapReduce and generally considered to be superior, providing better performance and also being easier to program.
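Why a functional-style API gives the engine room to optimise can be shown with a toy class. This is not the Spark API; `LazyDataset` and its methods are hypothetical. The point is that `map` and `filter` only record what should happen, so the engine sees the whole pipeline before running anything and can, for example, apply all steps to each record in a single pass per partition.

```python
class LazyDataset:
    """Toy stand-in for a distributed collection; not the Spark API."""

    def __init__(self, partitions, ops=()):
        self.partitions = partitions      # list of lists, one per "node"
        self.ops = tuple(ops)             # recorded steps, not yet executed

    def map(self, fn):
        return LazyDataset(self.partitions, self.ops + (("map", fn),))

    def filter(self, pred):
        return LazyDataset(self.partitions, self.ops + (("filter", pred),))

    def _run_partition(self, records):
        """Apply every recorded step to each record in a single pass."""
        out = []
        for rec in records:
            keep = True
            for kind, fn in self.ops:
                if kind == "map":
                    rec = fn(rec)
                elif not fn(rec):
                    keep = False
                    break
            if keep:
                out.append(rec)
        return out

    def collect(self):
        """Trigger execution; each partition could run on a different machine."""
        result = []
        for part in self.partitions:
            result.extend(self._run_partition(part))
        return result
```

A pipeline such as `LazyDataset([[1, 2, 3], [4, 5, 6]]).map(lambda x: x * x).filter(lambda x: x % 2 == 0).collect()` does no work until `collect()` is called. Because the supplied functions have no side effects, the engine is free to reorder, fuse or distribute the steps.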
Spark is very often used with HDFS (data input/output) and YARN (process distribution), but also supports a variety of other filesystems and clustering platforms. In particular, it was originally designed for the Mesos clustering system.
Spark programs can read from and write to various big-data databases, for example Apache Hive. This allows Java or Scala code to be used to implement logic that simply cannot be expressed in the query-language of other tools.
Apache Pig
Pig is similar to Hive: a “query compiler” which generates programs which are then executed on a YARN cluster (via MapReduce, Tez or Spark). Like Hive, the compiled queries are applied to data stored in “files” of various formats (the same formats Hive supports). Pig can also use “schemas” from a Hive metastore.
Pig’s query language is named “Pig Latin”, a pun referring to a wordgame popular with children. It is described in the official documentation as “a dataflow language” and “a parallel programming language for data analysis”, and feels somewhere between declarative SQL and an imperative language like Basic (but without loops). The intent of Pig/PigLatin is to allow data-analysts to perform complex queries (more complex than SQL can express) without needing to program in Java or a similar language.
Apache Hama
Hama is a framework for supporting BSP (Bulk Synchronous Parallel) computing.
BSP is a programming model somewhat like a generalized version of the MapReduce algorithm. Each BSP program does:
- local computation
- process communication
- barrier synchronization
Some framework then distributes multiple copies of a BSP program across a cluster and feeds each instance with appropriate input data.
Hama provides a bunch of APIs (including base classes that a BSP program must subclass). It also provides the “runtime”.
The data that is fed into BSP programs usually comes from files on HDFS. An API allows communication with other “peers” (which may be objects provided by Hama such as a datasource, or may be an external process). Eventually a program will invoke sync() and wait until all other peers have reached the same point - similar to the point in MapReduce where the map phase has ended and the shuffle/reduce phase starts.
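The three BSP steps can be sketched with plain Python threads. This does not use the Hama API; the function names are hypothetical, threads stand in for processes on different nodes, and `threading.Barrier.wait()` plays the role that sync() plays in the description above.

```python
import threading

def run_bsp(partitions):
    """Run one peer per partition; every peer ends up knowing the global sum."""
    n = len(partitions)
    barrier = threading.Barrier(n)
    inboxes = [[] for _ in range(n)]
    locks = [threading.Lock() for _ in range(n)]
    results = [None] * n

    def peer(rank):
        # 1. Local computation on this peer's share of the input.
        local_sum = sum(partitions[rank])
        # 2. Communication: send the partial result to every peer.
        for other in range(n):
            with locks[other]:
                inboxes[other].append(local_sum)
        # 3. Barrier synchronisation: wait until all peers have sent.
        barrier.wait()
        # Next superstep: all messages from the previous one are now visible.
        results[rank] = sum(inboxes[rank])

    threads = [threading.Thread(target=peer, args=(r,)) for r in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

For example, `run_bsp([[1, 2], [3, 4], [5]])` starts three peers. No peer reads its inbox before the barrier, so each one is guaranteed to see the partial sums of all the others.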
Google Pregel and Apache Giraph
The Bulk Synchronous Parallel (BSP) programming model is designed to process large amounts of tabular data by distributing the work across multiple nodes (see Apache Hama above). It would appear that Google’s Pregel is the equivalent model for processing large amounts of data stored in a graph database (or at least in graph form, with data represented as “vertices” (objects) and “edges” (relations/assertions)).
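The vertex-centric style associated with Pregel can be illustrated with a small single-process simulation. The example propagates the largest vertex value through a graph; the function name and data layout are hypothetical, and the “vote to halt” logic is simplified to “stop when no vertex changed”.

```python
def pregel_max(values, edges, max_supersteps=50):
    """Propagate the largest vertex value to every vertex reachable from it.

    values: vertex id -> initial number
    edges:  vertex id -> list of neighbour ids (directed)
    """
    values = dict(values)
    # Superstep 0: every vertex sends its value along its outgoing edges.
    inbox = {v: [] for v in values}
    for v, val in values.items():
        for nbr in edges.get(v, []):
            inbox[nbr].append(val)

    for _ in range(max_supersteps):
        outbox = {v: [] for v in values}
        active = False
        for v, messages in inbox.items():
            # Vertex program: uses only this vertex's state and messages,
            # so all vertices could be processed in parallel.
            if messages and max(messages) > values[v]:
                values[v] = max(messages)
                for nbr in edges.get(v, []):
                    outbox[nbr].append(values[v])
                active = True
        if not active:        # no vertex changed, so the computation halts
            break
        inbox = outbox        # messages are delivered at the superstep barrier
    return values
```

On a chain of four vertices `a`, `b`, `c`, `d` with edges in both directions and initial values 3, 6, 2, 1, every vertex ends up holding 6 after a few supersteps.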
Apache TinkerPop
Apache TinkerPop is a “graph computing framework” (in the Apache Incubator at the time of writing). It appears to provide a way to apply processing logic to vertices and edges in a graph database, in a clustered/scalable manner.
The core parts appear to be:
- Tinkerpop Structure API: a portable graph-storage-access API that runs on multiple graph databases
- Tinkerpop Process API: declaratively specifies which nodes to process (traverse)
- VertexProgram is one kind of task to apply to nodes, and MapReduce is another
- GraphComputer is a system that runs VertexProgram/MapReduce jobs in either OLTP or OLAP mode (see below).
As I understand it, Tinkerpop assumes that an “OLTP” system will be visible as a single running database server that handles requests - ie the traditional “database server” style of access. It therefore executes Tinkerpop “jobs” (whether VertexProgram or MapReduce) directly against this server without using a “cluster” of any sort. No BSP-, Pregel- or HMapReduce-style concurrent processing is done in “OLTP” mode. The database server must support the Tinkerpop APIs; supported databases include Neo4j.
However when Tinkerpop is in “OLAP” mode, it assumes that there is no “running server” - or at least not one that it should access directly. Instead, VertexProgram/MapReduce definitions become “jobs” which it dispatches via either Apache Giraph or Apache Spark. These jobs are expected to then fetch their data directly from files on a distributed filesystem rather than an active database server - eg via the Hive HCatalog system. The Tinkerpop documentation refers to this as “supporting Hadoop”.
Data in Motion : Message Brokers and Event Processors
The tools mentioned so far are about processing “data at rest”, ie data already saved and static. However there are other situations where data needs to be processed as it flows through a system. The following tools are relevant for such cases:
- Apache Kafka is a high-performance message-broker (similar to ActiveMQ) which is popular in “big data” environments. The current version (0.9.1) includes a “preview” of a native streaming API for processing events present in Kafka.
- Apache Storm is a framework for applying logic to streams of events, particularly those coming from an event broker, or rows being inserted into a database. Hadoop and similar tools are for “after-the-fact processing” while Storm is for “real time” processing.
- Apache Flink is another framework for applying logic to streams of events, and is currently very active and well-regarded. The programming API is somewhat similar to Spark.
- Spark Streaming is a component of the larger Spark framework which applies Spark programs to a stream of records rather than to a fixed pre-existing set of records.
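The difference between processing a fixed set of records and processing a stream can be sketched with a micro-batch loop: the same batch logic is applied repeatedly to small groups of newly arrived records, and the results are merged into running totals. The code below is plain Python with hypothetical function names, not the API of any of the tools listed; some stream processors handle records one at a time rather than in batches.

```python
from collections import Counter
from itertools import islice

def micro_batches(stream, batch_size):
    """Cut a (possibly endless) iterator of records into small lists."""
    it = iter(stream)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

def count_words(records):
    """The 'batch' logic: identical to what would run over a static file."""
    counts = Counter()
    for line in records:
        counts.update(line.lower().split())
    return counts

def streaming_word_count(stream, batch_size=2):
    """Apply count_words to each batch and yield the running totals."""
    totals = Counter()
    for batch in micro_batches(stream, batch_size):
        totals.update(count_words(batch))
        yield dict(totals)
```

Iterating over `streaming_word_count(["a b", "a", "c"], batch_size=2)` yields one snapshot of the totals per batch, so results are available while data is still arriving rather than only after the whole input has been stored.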