Kafka Connect

Categories: BigData

Introduction

I’ve already written about the Apache Kafka Message Broker. It is a fine tool, and very widely used.

Kafka Connect is another component of the Apache Kafka project; it is dedicated to importing data from external systems into Kafka topics, and exporting data from Kafka topics into external systems. Kafka Connect is included as part of the standard Kafka download but enabling Kafka Connect requires explicitly starting a Kafka Connect daemon on one or more servers. The daemons form their own “cluster”, separate from the cluster of Kafka message-broker nodes, in which the configured “connectors” are run to import/export data.

The Kafka Connect documentation available from the Apache Project is reasonable but fails to address a number of important topics. The documentation from the company Confluent is more extensive, but unfortunately does not make clear the lines between open-source and commercial features. So here is a brief overview…

This article is based on Kafka 0.10.1.0 and the Confluent Open Source release 3.2.0 (current version at March 2017).

Kafka, Kafka Connect and Confluent

The Apache Kafka project is the home for development of the Kafka message broker and Kafka Connect, and all code it hosts is open-source. The Kafka project does not itself develop any actual connectors (sources or sinks) for Kafka Connect except for a trivial “file” connector. A list of connectors for various technologies is hosted at the Kafka Connect Hub - which is run by the company Confluent. Note however that this list is a combination of open-source and proprietary connectors, with no clear indication of which is which - you must follow the link for each connector and read the associated information to determine its state.

Confluent was founded by one of the inventors of Kafka, and its staff contribute significantly to the Apache Kafka project. As well as hosting the Kafka Connect Hub, they provide consulting and support services, and additional Kafka-related tools.

The open-source components available from Confluent are:

  • Kafka serializer and deserializer for AVRO format (with optional Schema Registry support)
  • The Confluent Schema Registry
  • Kafka Connect connectors for JDBC, HDFS, S3, and Elasticsearch
  • Kafka client libraries for non-Java languages (the Apache project only provides Java and Scala client libraries).
  • The Kafka REST Proxy (allows publishing to, subscribing to, monitoring and administering Kafka brokers from any language or tool)

The “Confluent Platform Open Source Edition” available from the Confluent website is simply a repackaging of the standard Kafka download to include the open-source components listed above; you could install them all yourself, but Confluent conveniently provides them all together.

Confluent also provides proprietary add-ons and tools for Kafka and Kafka Connect:

  • Confluent Control Center provides monitoring/statistics of Kafka topics in general, and provides a nice way to configure Kafka Connect
  • Data balancing between Kafka nodes (this is a generic Kafka management feature, and not specific to Kafka Connect)
  • Replication between separate Kafka clusters (via a proprietary Kafka Connect connector)
  • Cloud integration

The “Confluent Platform Enterprise Edition” includes all the above components - but requires a license.

The source code for open-source components from Confluent (ie those bundled in the Confluent Platform Open Source Edition) is available via the Confluent Github account. The binary artifacts themselves are not registered in the standard maven repositories; if you wish to reference them from build files then they must be manually uploaded into a local repository or repository-manager.

Why Kafka Connect?

There are a number of existing tools designed for data import and export which are also capable of writing to Kafka or reading from Kafka, such as Flume. The primary advantages of Kafka Connect are:

  • autorecovery after failure; a “source” connector can attach arbitrary “source location” information to each record it passes to Kafka Connect and on failure Kafka Connect will automatically provide this information back to the connector so it can resume where it failed. Autorecovery for “sink” connectors is even easier.
  • autofailover; the Kafka Connect nodes build a cluster and when one node fails the work it is doing is redistributed to other nodes.
  • simple parallelism; a connector can define data import or export tasks which are to be executed in parallel.
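The autorecovery idea from the first point can be illustrated with a small simulation. This is only a sketch in plain Python, not the Kafka Connect API (which is Java); the names OffsetStore, read_batch and deliver, and the shape of the offset dictionary, are all hypothetical.

```python
# Hypothetical simulation of source-offset based recovery; not the real Kafka Connect API.

class OffsetStore:
    """Stands in for the framework-managed storage of source offsets."""

    def __init__(self):
        self._offsets = {}

    def commit(self, source_partition, source_offset):
        self._offsets[source_partition] = source_offset

    def last_offset(self, source_partition):
        return self._offsets.get(source_partition)


def read_batch(lines, filename, store, batch_size):
    """Read up to batch_size lines, resuming after the last committed offset."""
    last = store.last_offset(filename)
    start = 0 if last is None else last["line"] + 1
    records = []
    for line_no in range(start, min(start + batch_size, len(lines))):
        # Each record carries the "source location" it was read from.
        records.append({"value": lines[line_no], "offset": {"line": line_no}})
    return records


def deliver(records, filename, store, topic):
    """Append records to the simulated topic and commit their offsets."""
    for record in records:
        topic.append(record["value"])
        store.commit(filename, record["offset"])


lines = ["a", "b", "c", "d", "e"]
store = OffsetStore()
topic = []

# The first task instance processes one batch and then "crashes".
deliver(read_batch(lines, "input.txt", store, 2), "input.txt", store, topic)

# A replacement task is handed the stored offset and resumes where the first stopped.
while True:
    batch = read_batch(lines, "input.txt", store, 2)
    if not batch:
        break
    deliver(batch, "input.txt", store, topic)

print(topic)
```

The point of the design is that the connector code never has to persist its own position; it only describes the position, and the framework stores it and hands it back.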

Tools such as Flume, Logstash or Heka provide a wider variety of standard connectors, and sophisticated data-transformation pipelines into which custom logic can be hooked. However their error-recovery, failover and scalability features leave a lot to be desired; they all are really designed to be run as a single daemon on a single server and often have very clumsy “reliable buffering” options which rely on local disk storage.

Unlike Flume, Logstash, etc., Kafka Connect assumes that each connector either writes to a Kafka topic (a source) or reads from a Kafka topic (a sink). Other data import tools typically have an internal buffering layer of some sort; Kafka Connect just makes this explicit and assumes this buffer is the Kafka message broker. Given that the Kafka message broker is more reliable than any self-built-and-maintained buffer implementation is likely to be, this provides Kafka Connect with a reliability/stability advantage over alternatives.

More detailed comparisons against alternate tools are presented later in this article.

Why Not Kafka Connect?

The primary disadvantages of Kafka Connect are:

  • the very limited selection of connectors at the current time
  • the poor separation of commercial and open-source features; the tool is useable with pure open-source, but the documentation and available websites (primarily those from Confluent) are very unclear on which features require licensing.
  • the lack of configuration tools
  • poor/primitive approach to deploying custom connectors (plugins) - they are simply jars on the framework classpath
  • very Java/Scala centric

Kafka Connect currently feels more like a “bag of tools” than a packaged solution - at least without purchasing commercial tools.

Kafka Connect Concepts

As described here,

  • A worker is an operating-system process (Java-based) which executes connectors and their associated tasks in child threads.
  • A connector is an object which defines parameters for one or more tasks which should actually do the work of importing or exporting data.
  • A source connector generates tasks which read from some arbitrary input and write to Kafka.
  • A sink connector generates tasks which read from Kafka and write to some arbitrary output.

Kafka Connect is not intended for significant data transformation; nevertheless the most recent versions of Kafka Connect allow the configuration-parameters for a connector to define basic data transformations. For “source” connectors, this functionality assumes that the tasks transform their input into AVRO or JSON format; the transformation is applied just before writing the record to a Kafka topic. For “sink” connectors, this functionality assumes that data on the input Kafka topic is already in AVRO or JSON format, and the transformation is applied before each record is passed to a task object.

While custom connectors can be implemented (and can thus contain logic), the framework recommends performing custom data transformations in a dedicated stream-processing framework such as spark-streaming or Kafka Streams. This allows transformation logic to be implemented in any framework which supports Kafka topics as input, and tested using standard testing tools for that framework.

Kafka Connect is implemented in Java and Scala; any custom connectors must also be implemented in one of these languages.

Dependencies

Whether run in stand-alone or distributed mode (see later), Kafka Connect nodes require a connection to a Kafka message-broker cluster - ie direct network access to every node in the Kafka message-broker cluster.

For distributed mode, there are no other dependencies. Kafka Connect nodes are completely stateless; even the connector configuration settings are stored in a Kafka message topic. This makes Kafka Connect nodes very suitable for running via technologies such as Docker - provided the inputs and outputs for the specific connectors configured in Kafka Connect are accessible (eg when a connector must read from a local filesystem, then that filesystem will need to be available).

For standalone mode a small amount of local disk storage is required to store the “current location” and the connector configuration.

Distributed Mode

A Kafka Connect worker instance (ie a java process) is started with a Kafka Broker address, the names of several Kafka topics for “internal use” and a “group id” parameter. Each worker instance coordinates with other worker instances belonging to the same group-id via the “internal use” Kafka topics. No other external coordination mechanism is needed (no Zookeeper, etc) - everything is done via the Kafka message broker. These internal topics can be automatically defined but it is better for the sysadmin to explicitly define them with appropriate settings.
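As a rough illustration, the settings mentioned above might be gathered into a worker properties file like the one rendered below. The property names are the ones I understand the Kafka 0.10.x worker to use; the host names, group id and topic names are made up for the example, and the exact set of required settings should be checked against the worker configuration reference.

```python
# Render a distributed-mode worker properties file from a dict.
# Host names, group id and topic names are hypothetical values.
worker_settings = {
    "bootstrap.servers": "broker1:9092,broker2:9092",
    "group.id": "connect-cluster-a",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    # The "internal use" topics shared by all workers with the same group.id.
    "config.storage.topic": "connect-configs",
    "offset.storage.topic": "connect-offsets",
    "status.storage.topic": "connect-status",
}


def to_properties(settings):
    """Format settings as key=value lines, one per setting."""
    return "\n".join(f"{key}={value}" for key, value in settings.items()) + "\n"


properties_text = to_properties(worker_settings)
print(properties_text)
```

Every worker started with the same group.id and the same three internal topics would then belong to the same Kafka Connect cluster.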

The workers negotiate between themselves (via the topics) on how best to distribute the set of connectors and tasks across the available set of workers. If a worker process dies, the cluster is rebalanced to distribute the work fairly over the remaining workers. If a new worker is started, a rebalance ensures it takes over some work from the existing workers.

There is no central server in a Kafka Connect installation, just the worker instances. The (commercial) Confluent Control Center is an administration tool with a nice UI for administering Kafka and Kafka Connect, but it is optional.

A connector is generally run just once when it is defined (uploaded via REST) - and again if its configuration is changed. The connector generates one or more tasks which are responsible for actually reading data (source) or writing data (sink); the tasks are then automatically distributed across the cluster (via the internal Kafka topics). The “max tasks” setting for each connector controls how finely work is divided. As an example, a JDBC source connector configured to replicate all tables from a source database may inspect the database on startup and generate one task per table; if tasks.max is less than the number of tables then it must configure some of the tasks to handle multiple tables. Tasks are long-running and single-threaded. The set of tasks for a connector is usually computed only at connector initialisation. It is possible for a connector to periodically recompute its set of tasks, triggering a “rebalance” if the number of tasks or their config has changed, but it is non-trivial to implement and not many connectors actually do so.
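The JDBC example can be sketched as follows: a connector that knows its list of tables splits them into at most tasks.max groups and emits one configuration per task. This is a simplified Python model rather than real connector code (which would be Java); the function names and the “tables” key in the task configuration are assumptions made for the illustration.

```python
def group_tables(tables, max_tasks):
    """Split tables into at most max_tasks contiguous groups of near-equal size."""
    num_groups = min(len(tables), max_tasks)
    if num_groups == 0:
        return []
    base, extra = divmod(len(tables), num_groups)
    groups, start = [], 0
    for i in range(num_groups):
        size = base + (1 if i < extra else 0)  # the first `extra` groups get one more
        groups.append(tables[start:start + size])
        start += size
    return groups


def task_configs(connector_config, tables):
    """Build one config dict per task; each task gets its own subset of tables."""
    max_tasks = int(connector_config["tasks.max"])
    return [
        {**connector_config, "tables": ",".join(group)}
        for group in group_tables(tables, max_tasks)
    ]


configs = task_configs(
    {"connection.url": "jdbc:postgresql://db.example/sales", "tasks.max": "2"},
    ["customers", "orders", "invoices", "payments", "refunds"],
)
for config in configs:
    print(config["tables"])
```

With five tables and tasks.max set to 2, the first task handles three tables and the second handles two. Because the split is computed once, a table added later is not picked up until the connector recomputes its tasks.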

A Kafka Connect worker instance can run multiple tasks concurrently as internal threads. For example, 10 tasks can be executed by 2 worker processes simply by starting 5 threads in each worker process.
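A toy model of this distribution, including what happens when a worker disappears, is shown below. The round-robin strategy is my own assumption chosen for simplicity; I have not checked which assignment algorithm Kafka Connect actually uses, and the worker and task names are invented.

```python
def assign(tasks, workers):
    """Distribute tasks round-robin over the sorted list of live workers."""
    ordered = sorted(workers)
    if not ordered:
        raise ValueError("at least one worker is required")
    assignment = {worker: [] for worker in ordered}
    for index, task in enumerate(tasks):
        assignment[ordered[index % len(ordered)]].append(task)
    return assignment


tasks = [f"jdbc-source-{i}" for i in range(10)]

# Two live workers: each runs five tasks as internal threads.
before = assign(tasks, ["worker-a", "worker-b"])

# worker-b dies; a rebalance hands all ten tasks to the survivor.
after = assign(tasks, ["worker-a"])

print({worker: len(assigned) for worker, assigned in before.items()})
print({worker: len(assigned) for worker, assigned in after.items()})
```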

Kafka Connect is not responsible for launching worker instances, or restarting them on failure. Worker processes can be launched in any desired manner, eg manually, via systemd or sysv-init, Docker, Mesos, Yarn, etc. Any restart-after-failure or process-migration is similarly done externally to Kafka Connect; Kafka Connect simply takes care of new processes joining and existing processes leaving a federation by distributing work appropriately.

Individual worker instances are stateless, ie no state is cached on local disk. State is instead stored in the “internal” Kafka topics. State includes the read-offset for sources and sinks so that migration of workload from one worker to another happens without (excessive) data duplication.

Standalone Mode

Standalone mode is simply distributed-mode where a worker instance uses no internal topics within the Kafka message broker (and thus cannot collaborate with peers). The process runs all specified connectors, and their generated tasks, itself (as threads).

Because standalone mode uses no Kafka Connect “internal topics” for storage, it instead stores current source offsets in a local file (for use on restart). In standalone mode, information about the connectors to execute is provided as a commandline option (in distributed mode such config is registered via REST and stored in a Kafka topic).

Running a connector in “standalone mode” can be valid for production systems; it is the way most ETL-style workloads have traditionally been executed. In this approach, managing failover must be done in the traditional way - eg by scripts starting an alternate instance.

Standalone mode may be appropriate for deploying on systems generating events, allowing events to be pushed directly from such systems into a Kafka message broker cluster. Note however that Kafka client applications (including Kafka Connect daemons) require direct network access to all nodes of their Kafka cluster; data is partitioned at the client and pushed directly to whichever Kafka broker nodes are hosting those partitions.

Launching a Worker

A worker instance is simply a Java process, and is usually launched via a provided shell-script. The worker instance then loads (from its CLASSPATH) whichever custom connectors are specified by the connector configuration. The configuration is provided on the commandline for standalone mode, and read from a Kafka topic for distributed mode.

There is also a standard Docker container image for launching a Kafka Connect worker; any number of instances of this image can be launched and will automatically federate together as long as they are configured with the same Kafka message broker cluster and group-id.

REST API

Each worker instance starts an embedded webserver through which it exposes a REST api for status-queries and configuration. For workers in distributed mode, configuration uploaded via this REST API is saved in internal Kafka message broker topics. For workers in standalone mode, the configuration REST apis are not relevant.

The Confluent Control Center provides much of its kafka-connect-management UI by wrapping the worker REST api.

Monitoring of Kafka Connect daemons could potentially be done by Nagios or similar via REST calls to periodically obtain system status.
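A monitoring check of that kind might look like the sketch below, which turns a connector status document into a Nagios-style exit code. The JSON shape (a “connector” object and a “tasks” list, each with a “state”) is what I understand the status endpoint to return, but treat the field names as an assumption and verify them against the REST documentation for your version; the sample data is invented.

```python
import json

# Conventional Nagios plugin exit codes.
OK, WARNING, CRITICAL = 0, 1, 2


def check_connector(status_json):
    """Map a connector status document to an (exit_code, message) pair."""
    status = json.loads(status_json)
    states = {"connector": status["connector"]["state"]}
    for task in status["tasks"]:
        states[f"task {task['id']}"] = task["state"]

    failed = [name for name, state in states.items() if state == "FAILED"]
    not_running = [name for name, state in states.items() if state != "RUNNING"]
    if failed:
        return CRITICAL, "FAILED: " + ", ".join(failed)
    if not_running:
        return WARNING, "not running: " + ", ".join(not_running)
    return OK, "all running"


# Sample body of the kind returned by GET /connectors/<name>/status (assumed shape).
sample = json.dumps({
    "name": "jdbc-source",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083"},
    ],
})

code, message = check_connector(sample)
print(code, message)
```

In a real check the sample string would be replaced by the body of an HTTP GET against any worker in the cluster.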

Connector Types

A connector can be created by implementing a specific Java interface. There are many existing connectors, and writing custom ones is also possible - of course the code must then be available when the worker is launched.

A Kafka Connect worker simply expects the implementation for any connector and task classes it executes to be present in its classpath. This code is loaded directly into the application without the benefit of child classloaders, an OSGi framework, or similar. Care must therefore be taken when implementing custom connectors not to depend on any libraries which conflict with Kafka Connect’s own dependencies.

The following connectors are included in the “Confluent Open Source Edition” download package:

  • JDBC
  • HDFS
  • S3
  • Elasticsearch

Currently there appears to be no way to download these connectors individually, but as they are open-source I presume they can be extracted from the Confluent Open Source download and copied into a standard Kafka installation if that is preferred.

Documentation for the “replicator” connector is presented on the Confluent documentation site mixed in with the above, but it is not included in the “open source” download bundle. In other words, the documentation fails to mention that it is actually a commercial connector which requires licencing the Enterprise version.

The “file source” and “file sink” connectors described in the documentation are part of the standard Apache Kafka download bundle. However they are simply example connectors meant as starting-points for custom connectors rather than being particularly useful as-is. The Connect Hub lists connector kafka-connect-spooldir as a “community” provided file-source connector; unfortunately at the current time it is not particularly advanced, performant or featureful. Among other things, it reads each input file completely into memory and is thus not useful for large inputs. In general, importing of files is a tricky thing to write a generic source connector for; the directory-structures and ways in which flag-files etc are presented by the writing application make it often necessary to implement a custom connector for file reading.

The selection of connectors, and the way in which Confluent’s “Hub” presents them, is definitely the weakest part of Kafka Connect at the current time. The “installation process” for adding custom connectors to a Kafka Connect installation is also rather primitive; it rather leaves the impression that Confluent are pushing users of Kafka Connect hard to install their prebuilt bundles (at least the open source version). Nevertheless, it is possible to use just the Apache Kafka standard bundle with a little effort. Hopefully this will improve over time.

Configuring Kafka Connect

Each worker instance is started with a commandline option pointing to a config-file containing options for the worker instance (eg Kafka message broker details, group-id).

In standalone mode, a worker is also given a commandline option pointing to a config-file defining the connectors to be executed. In distributed mode, each worker instead retrieves connector/task configuration from a Kafka topic (specified in the worker config file). Configuration for distributed mode is updated by making a call to a REST api on any worker instance; the provided data is persisted by the worker to the Kafka topic.
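For distributed mode, registering a connector amounts to POSTing a small JSON document to any worker. The sketch below only builds the request and does not send it; the worker host name is hypothetical, 8083 is the REST port I believe workers listen on by default, and the connector settings mirror the file-source example shipped with the standard Kafka download.

```python
import json
import urllib.request


def create_connector_request(worker_url, name, config):
    """Build (but do not send) the HTTP request that registers a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{worker_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = create_connector_request(
    "http://connect-worker.example:8083",  # hypothetical worker address
    "local-file-source",
    {
        "connector.class": "FileStreamSource",
        "tasks.max": "1",
        "file": "/tmp/test.txt",
        "topic": "connect-test",
    },
)
print(request.get_method(), request.full_url)

# To actually submit it to a running worker:
#   urllib.request.urlopen(request)
```

In standalone mode the same four connector settings would instead be written as key=value lines in the connector config-file passed on the commandline.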

Even in standalone mode, a worker process provides a REST api for status-checks etc.

The REST api can be used to pause and resume connectors (in both standalone and distributed mode).

NOTE: The configuration options “key.converter” and “value.converter” are worker-specific, not connector-specific, ie they are set in the per-worker configuration file. Therefore every Kafka topic accessed by every connector in a worker cluster must use the same serialization format for its data. This is not usually a problem, but it seems odd; to me this should be a per-connector option.

Connections from Kafka Connect Workers to Kafka Brokers

When in distributed mode, each worker establishes a connection to the Kafka message broker cluster for administrative purposes. These settings are defined in the worker configuration file as “top level” settings.

For each connector, a separate connection (set of sockets) to the Kafka message broker cluster is established. Many of the settings are inherited from the “top level” Kafka settings, but they can be overridden with config prefix “consumer.” (used by sinks) or “producer.” (used by sources) in order to use different Kafka message broker network settings for connections carrying production data vs connections carrying admin messages. However these settings apply to all sinks (consumers) or all sources (producers) - they cannot be configured per-connector.
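The inherit-then-override rule described above can be modelled in a few lines. This follows my description rather than the actual worker implementation, which may inherit only a subset of the top-level settings; the function name and the example values are hypothetical.

```python
def client_settings(worker_props, prefix):
    """Effective Kafka client settings for connections using the given prefix."""
    prefixes = ("producer.", "consumer.")
    # Top-level settings are shared by all connections.
    inherited = {k: v for k, v in worker_props.items() if not k.startswith(prefixes)}
    # Prefixed settings replace or extend them, with the prefix stripped.
    overrides = {
        k[len(prefix):]: v for k, v in worker_props.items() if k.startswith(prefix)
    }
    return {**inherited, **overrides}


worker_props = {
    "bootstrap.servers": "broker1:9092",
    "security.protocol": "PLAINTEXT",
    "producer.compression.type": "snappy",
    "consumer.max.poll.records": "100",
}

source_settings = client_settings(worker_props, "producer.")  # used by sources
sink_settings = client_settings(worker_props, "consumer.")    # used by sinks
print(source_settings)
print(sink_settings)
```

Note that there is one set of “producer.” values and one set of “consumer.” values per worker, which is exactly why two sink connectors on the same cluster cannot be given different consumer settings.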

Note that the Kafka connections are established by the core Kafka Connect code, and passed as parameters to the connector implementations. Thus it is possible to pass Kafka sessions with “limited rights” into not-completely-trusted connector code, preventing it from easily writing to unwanted topics. Of course as Java runs in a single process, and its security model is not very reliable, this is not a 100% safe way to sandbox untrusted custom connectors, but better than nothing.

The Standard JDBC Source Connector

The connector hub site lists a JDBC source connector, and this connector is part of the Confluent Open Source download. It does not seem possible to download this separately; users who have installed the “pure” Kafka bundle from Apache rather than the Confluent bundle presumably need to extract this connector from the Confluent bundle and copy it over.

It has the following configuration options:

  • a database to scan, specified as a JDBC url
  • a poll interval
  • a regular expression specifying which tables to watch; a separate Kafka topic is written to for each table
  • an SQL column which has an “incrementing id”, in which case the connector can detect new records (select where id > last-known-id).
  • an SQL column with an updated-timestamp, in which case the connector can detect new/modified records (select where timestamp > last-known-timestamp)

Strangely, although the connector is apparently designed with the ability to copy multiple tables, the “incrementing id” and “timestamp” column-names are global - ie when multiple tables are being copied then they must all follow the same naming convention for these columns.

Different tables are processed in parallel; each single table is processed in single-threaded style.
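The “incrementing id” polling approach is easy to demonstrate against an in-memory SQLite database. This is a sketch of the general technique only, not the connector's actual code; the function name and the table are invented, and a real connector would store last_id as its source offset rather than in a local variable.

```python
import sqlite3


def poll_new_rows(conn, table, id_column, last_id):
    """Return rows whose id is greater than last_id, plus the new high-water mark."""
    # Table and column names come from trusted configuration here;
    # the last-known id is bound as a query parameter.
    query = f'SELECT * FROM "{table}" WHERE "{id_column}" > ? ORDER BY "{id_column}"'
    cursor = conn.execute(query, (last_id,))
    columns = [description[0] for description in cursor.description]
    rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
    new_last_id = rows[-1][id_column] if rows else last_id
    return rows, new_last_id


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT)")
conn.executemany("INSERT INTO orders (item) VALUES (?)", [("apple",), ("pear",)])

# First poll sees everything; later polls see only rows added since.
first, last_id = poll_new_rows(conn, "orders", "id", 0)
conn.execute("INSERT INTO orders (item) VALUES ('plum')")
second, last_id = poll_new_rows(conn, "orders", "id", last_id)

print(first)
print(second)
```

An incrementing id detects only inserts; detecting updates to existing rows needs the timestamp column, with the same “greater than last seen” comparison.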

Scalability Limitations of Kafka Connect

Note: the following information is based purely on logical deduction; I can find no external documentation or articles which address these issues, and have not read the Kafka source-code (yet). Corrections are welcome!

A connector runs in a single thread. The connector generates configuration for long-running tasks, each with their own source of data to be moved into Kafka (source) or target to move Kafka data to (sink). Tasks are executed in parallel over all connect worker instances. The maximum number of tasks which a connector generates at initialisation is set in configuration (tasks.max).

How effectively data can be processed in parallel depends upon the source-type, and upon the ordering constraints required for the copied data.

  • When the source is Kafka (ie for all sinks), then the maximum parallelism for reads is one task per partition of the topic being read. Given that Kafka guarantees ordering via partitions, and each partition is processed only by a single thread, any ordering present in the input topic is preserved in the outputs.

  • When the standard JDBC-source is used, then unfortunately the Kafka Connect documentation is rather unclear on how/whether parallelism is supported. However I presume the connector generates one task per table (unless tasks.max would be exceeded, in which case some tasks are responsible for multiple tables). This implies that processing of a single table is simply single-threaded. The SQOOP tool does implement parallelism within a single table, and has a number of tricky config-options to support this which Kafka Connect’s “standard” JDBC source connector implementation appears to be missing - and which would be hard to implement anyway, given Kafka Connect’s “static tasks” approach. The JDBC-source config is also weird in that it supports multiple tables but defines “mode”, “incrementing.column.name” etc as connector-wide settings, rather than per-table. There does not appear to be any direct way to control the message key associated with records written to Kafka. This means that the records imported are by default distributed randomly over all partitions of the target Kafka topic and will thus be processed in effectively random order when consumed from the topic (unless the target topic has just one partition). The relatively new “transforms” feature for Kafka Connect does seem to provide ways of setting the Kafka message-key to fields extracted from the current record (I haven’t personally tried this).

  • When a File source of some kind is used then the number of Task objects is still fixed at startup - new ones cannot be dynamically spawned when directory-scanning finds new files to process. I suppose a task can internally implement its own thread-pool, but Kafka Connect does not help with that - and would not distribute such a pool across the cluster. Even if parallel-processing is implemented somehow, the amount of parallelism possible depends upon the desired data-ordering. If it is irrelevant in which order records from the input files are processed, then multiple (filename, offset, length) blocks could potentially be processed in parallel. However the ordering of records seen by a consumer of the Kafka topic would effectively be random. At the other extreme, if the input files are ordered (eg via timestamp) and the records within each file are ordered by timestamp, and such ordering must be preserved for the Kafka topic consumer, then the files must be read one-by-one as a single thread, and written to a Kafka topic with a single partition. In other words, only a single Task may be generated at a time by the connector implementation.

A partition is a subset of an overall event-stream which can be read independently of other subsets. Processing a partition in single-threaded form allows preservation of order in that partition. Kafka provides partitions, and offers only single-threaded sequential reads of partitions.
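The connection between message keys, partitions and ordering can be shown with a small model. The hash used here (zlib.crc32) is just a stand-in for illustration; Kafka's own default partitioner uses a different hash function, so the actual partition numbers would differ. The event data is invented.

```python
import zlib


def partition_for(key, num_partitions):
    """Pick a partition from the key; the same key always maps to the same partition."""
    # crc32 is a stand-in for Kafka's real hash function.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


events = [
    ("customer-1", "created"),
    ("customer-2", "created"),
    ("customer-1", "updated"),
    ("customer-1", "deleted"),
]

partitions = {p: [] for p in range(3)}
for key, value in events:
    partitions[partition_for(key, 3)].append((key, value))

# All events for one key sit in one partition, in the order they were written,
# so a single-threaded reader of that partition sees them in order.
target = partition_for("customer-1", 3)
print([value for key, value in partitions[target] if key == "customer-1"])
```

Records written without a key get no such guarantee, which is why the lack of key control in a source connector matters for consumers that care about order.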

Parallelism in Kafka Connect sinks works well. The input (a Kafka topic) is inherently statically partitioned, allowing parallelism up to the number of partitions. Performance is optimal when the output is also partitioned in the same way, eg when writing an HDFS file per Kafka topic (possibly registering the files as Hive tables later).

Parallelism in Kafka Connect sources in general only works well when the input is “statically partitioned” in some way. This is sadly seldom the case for sources other than Kafka topics. Reading from database tables with JDBC might be “partitionable” if each row in the database happened to contain an “enum-like” field with a small number of values which are reasonably evenly distributed across the dataset; a separate SQL select could then be run for each enum-value, resulting in N approximately even result-sets. However that is a pretty unlikely scenario - so unlikely that the standard JDBC source doesn’t offer support for it. Similarly, if a system logged not to one output-file but to N output files in round-robin sequence, then those output files could be processed in parallel (assuming preservation of order is not important). This is also an unlikely scenario.

The partitioning must be “static”, ie fixed over long time-periods. Making a “select count(*)” then using “select .. from .. max ..” to divide the resultset into N queries is not “static partitioning”, and is not supported by Kafka Connect; that approach would require generating a new set of tasks after each “select count”, but tasks are static - defined only on startup.

Thus the common scenarios of reading from a single table, or reading from a single file, are not scalable in Kafka Connect - the work is done in a single thread. Bulk imports of large numbers of files are also not well supported - the number of tasks is not dynamic and there is no built-in mechanism for the tasks to agree on who does what (no “shared work queue”); tasks are really completely independent once they have been launched with their connector-generated config-params statically specifying the inputs they are to work on.

Note that out-of-order processing of records from the source is actually often acceptable. In particular, if each record contains a timestamp-field and the records are simply going to land in HBase eventually, then the fact that they are out-of-order in the Kafka topic is not very important - HBase can reorder them efficiently on insert.

The Confluent Schema Registry

In the Kafka message broker itself, each message consists simply of a (key, value) pair where both key and value are plain byte-arrays. Some Kafka command-line tools (eg script kafka-console-consumer.sh) are smart enough to recognise when a key or value byte-array is actually a UTF8 string, and to render it appropriately on the console. However that’s as far as the Kafka message broker goes in “interpreting” key or value contents.

When someone needs to know what each topic holds (the purpose of the data, and its actual format) then they can potentially ask the development team for the applications which read and write those topics - or read the code. That’s fairly clumsy, but probably works for small projects. When the Kafka Broker cluster has scaled to hold dozens or hundreds of topics, read and written by dozens or hundreds of applications, then that just doesn’t scale - it is clear to outsiders at that point that a lot of very useful data is flowing through the system, but exactly what it is and how it is formatted will be very hard to determine. It is also possible for buggy producers to write non-compliant data to topics. In effect, it is like having a relational database without a schema - hard to introspect.

I have written a separate article on Kafka serialization and schema management to discuss these issues further.

Performance Statistics

It is useful to be able to gather statistics on system performance for any Kafka producer or consumer application - including Kafka Connect.

The Kafka client library exposes statistics via JMX (provided JMX is enabled for the JVM).

The Burrow tool takes the alternative approach of monitoring the “head” and “tail” offsets of each topic partition.

The standard Kafka client libraries used by Java applications include an “interceptor hook” that can be used to invoke arbitrary classes (configurable at startup) for each message. This can be used for all sorts of purposes; the Confluent Control Center includes a “metric interceptor” library which gathers statistics and periodically writes them to a dedicated Kafka topic from which it then populates dashboards and graphs. Unfortunately the Control Center UI is proprietary - and the metric-interceptor appears to be so too.

See the Monitoring section in the main kafka article for more information.

Kafka Connect Transforms

Kafka Connect version 0.10.2.0 introduced the ability to configure basic transforms of data before a source writes it to a Kafka topic or before a sink receives it from a Kafka topic. The feature is so new that there is very little documentation on it yet; the wiki page linked to above appears to be the best source of information at the moment.

The issue ticket has some more details, including a link to another ticket to add some transforms which did not make the cutoff for version 0.10.2.0.

Particularly interesting is the ValueToKey transform which allows the message-key to be set from fields in the message-value (assuming the message-value is a structured type). This allows the partitioning of messages generated by a source-connector to be (at least partially) controlled via configuration.
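To make the effect of ValueToKey concrete, the sketch below pairs the connector settings I would expect to use with a Python imitation of what the transform does to a record. The setting names (“transforms”, plus “type” and “fields” under the chosen alias) are as I understand them from the documentation; the alias setKey, the field customer_id and the record layout are invented, and the Python function is only a model of the real Java transform.

```python
def value_to_key(record, fields):
    """Return a copy of record whose key is built from the named value fields."""
    return {**record, "key": {field: record["value"][field] for field in fields}}


# Settings that would be added to a connector's configuration.
connector_config = {
    "transforms": "setKey",
    "transforms.setKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.setKey.fields": "customer_id",
}

alias = connector_config["transforms"]
fields = connector_config[f"transforms.{alias}.fields"].split(",")

record = {"key": None, "value": {"customer_id": 42, "amount": 9.5}}
keyed = value_to_key(record, fields)
print(keyed["key"])
```

With the key set this way, all records for the same customer_id hash to the same partition of the target topic.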

Only the base Transformation class is defined within the Kafka Connect core library (and thus the javadocs); the individual transforms are distributed as artifact org.apache.kafka:connect-transforms.

Security

Kafka Connect (v0.10.1.0) works fine with Kerberos-secured Kafka message brokers, and also works fine with SSL-encrypted connections to these brokers.

Unfortunately, the REST API which Kafka Connect nodes expose cannot currently be protected via either Kerberos or SSL; there is a feature-request for this. When configuring a secure cluster, it is therefore necessary to configure an external proxy (eg Apache HTTP) to act as a secure gateway to the REST services.

Comparisons with Similar Tools

The Kafka docs include a list of alternative tools for loghandling and ETL, and describe where they believe Kafka Connect is superior; some research into the referenced tools can be found below.

The main arguments made are:

  • For reliability and scalability, any ETL tool needs to provide a persistent buffer to store data which has been read but not yet transformed, in case of failure somewhere in the processing chain. Most of the referenced tools have some internal message buffering implementation of their own which is inferior to Kafka in terms of scalability and reliability (and I would have to agree). Many of the referenced tools can be configured to use Kafka as a buffer - but then why not use a tool which is built in that way already? Kafka Connect is similar to such ETL tools except that its “internal message buffering mechanism” is always Kafka.
  • Transformation logic should not be done within the log-handling framework; that just limits developers in selection of tools and languages. Instead, data should land in Kafka and from there any streaming technology can be used to transform the data.
  • The existing tools do not handle failure cases well (and I would also agree there). Using Kafka as the storage mechanism, and a tool which properly saves Kafka message-offsets etc, is much more reliable. The use of Kafka topics as cluster-wide state storage (rather than storing state in local filesystems as all other tools do) also improves failover scenarios (again, I would agree).
  • Kafka Connect clustering makes data-processing more scalable. I would agree for some usecases (those where the input can be considered as statically-partitioned).
  • “Big picture” solutions (NiFi in particular) don’t match the way that large corporations work (nobody can truly manage complex pipelines from a central point as various components are managed by different groups). Maybe true, depends on use-case.

Some useful terminology:

  • log forwarder –> installed on multiple nodes to push local events to a central point
  • log aggregator –> a central service which accepts events from multiple log-forwarders, combines events, and distributes to multiple sinks.

Flume

Flume instances are independent - unlike Kafka Connect, Flume instances do not build a “cluster” or communicate with each other in any way. A single Flume instance can have multiple sources and multiple sinks, ie a single Flume process can support multiple “pipelines” of data concurrently.

The Kafka source for Flume relies on Kafka to track its “current offset” for restart purposes. Other Flume sources typically track their “current offset” in a local file; for example the file-source keeps a local file with (filename, offset) pairs in it.

Flume:

  • is not clustered; two flume instances pulling from the same source are independent.
  • supports Kafka source, Hive sink, etc.
  • is not limited to having Kafka as either source or sink (as Connect is)
  • is moderately robust. Events are removed from the source and placed in a buffer; the buffer may be in-memory or persistent. Events are removed from the buffer only after persisting at the sink, allowing reasonable recovery. However as persistent buffering is local it does not protect against node-failure.
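The “remove from the buffer only after persisting at the sink” behaviour can be sketched in a few lines of plain Java. This is not the Flume API, just a minimal illustration with made-up names: events are only dropped from the buffer once the sink reports success, so a failing sink leaves them in place for a later retry.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Plain-Java illustration only, not the Flume API; all names are hypothetical.
class BufferedDeliverySketch {
    // In-memory buffer; a persistent variant would keep this on local disk.
    private final Deque<String> buffer = new ArrayDeque<>();

    // Called by the source: the event now lives in the buffer.
    void accept(String event) {
        buffer.addLast(event);
    }

    int pending() {
        return buffer.size();
    }

    // Try to deliver buffered events in order. The sink returns true once an event
    // is safely persisted; on the first failure we stop and keep the rest for a retry.
    int drainTo(Predicate<String> sink) {
        int delivered = 0;
        while (!buffer.isEmpty()) {
            String event = buffer.peekFirst();
            if (!sink.test(event)) {
                break; // sink failed: the event stays in the buffer
            }
            buffer.removeFirst();
            delivered++;
        }
        return delivered;
    }
}
```

Note that the buffer here lives on the node running the process (whether in memory or on local disk), which is why this pattern gives reasonable recovery from sink failures but no protection when the node itself is lost.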

So in comparison to Connect:

  • Connect is slightly more scalable (can distribute load across all members of the cluster) while Flume is single-thread-per-source. However see comments on Kafka Connect scalability above.
  • Connect is more available (can handle failure of a node). Flume stores state locally, so failure of a node cannot be simply recovered from by starting the same process on a different node.
  • Connect is less flexible (covers fewer use-cases)
  • Connect requires one of (source,sink) to be Kafka - and requires direct network access to all Kafka nodes.
  • Connect has fewer off-the-shelf sources/sinks than Flume

Other notes:

  • Flume supports “fan-out” configuration to distribute data to multiple receivers. Connect offers the same functionality indirectly via Kafka.

One of the primary use-cases for Flume is to combine logfiles from many different servers into a single central location; Flume is installed on each “source” server for this use-case. Connect is not designed to handle this use-case, instead being more appropriate for “pulling” source data from remote systems. In other words, Flume can act as either “log forwarder” or “log aggregator” depending on how it is configured.

The Kafka sources and sinks for Flume were written by Cloudera, who also published an article on the use of Flume with Kafka (which the article calls flafka).

Logstash

Logstash works similarly to Flume; a single logstash process supports multiple “pipelines” for processing data. Each logstash process is independent, ie no “clustering”. The state of each pipeline is typically stored in a local file, making restarts work correctly - but only on the same node. Failover to another node is not built-in, and is complicated due to the locally stored state.

Logstash is part of the Elasticsearch project, but can be used independently. It is implemented in JRuby, ie ruby code running on the JVM, and with access to java libraries. All “plugins” for reading, writing and transforming data are written in Ruby.

The standard file input class supports only one input file, doing a kind of “tail”. Current offset is persisted to a local file, for restarts. It does handle “rollover” of such a file, ensuring it reads the remainder of the old file before resuming reading of the new one.
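The “tail with a persisted offset” pattern (also used by the Flume file-source mentioned earlier) is simple enough to sketch in plain Java. This is only an illustration under simplifying assumptions, with made-up names: it handles a single file, ignores rollover, assumes every line ends with a newline, and saves the offset immediately after reading (a real tool would save it only after the lines were delivered downstream).

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; not how logstash is implemented. Names are hypothetical.
class OffsetTailSketch {
    private final Path logFile;
    private final Path offsetFile;

    OffsetTailSketch(Path logFile, Path offsetFile) {
        this.logFile = logFile;
        this.offsetFile = offsetFile;
    }

    // The offset file holds a single number: the byte position already processed.
    long loadOffset() throws IOException {
        if (!Files.exists(offsetFile)) {
            return 0L;
        }
        String text = new String(Files.readAllBytes(offsetFile), StandardCharsets.UTF_8);
        return Long.parseLong(text.trim());
    }

    // Read everything appended since the saved offset, then persist the new offset.
    List<String> readNewLines() throws IOException {
        long offset = loadOffset();
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(logFile.toFile(), "r")) {
            raf.seek(offset);
            String line;
            while ((line = raf.readLine()) != null) {
                lines.add(line);
            }
            offset = raf.getFilePointer();
        }
        Files.write(offsetFile, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
        return lines;
    }
}
```

Because the offset file is local, a restart on the same node resumes where it left off, while a replacement process on a different node has no idea how far the previous one got.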

Unlike Flume, the collect-logfiles-from-many-hosts use-case is normally solved with logstash by running logstash at the “server”, and running filebeat at each client (host generating the logfiles).

By convention, each “pipeline” is defined in a file under “/etc/logstash/conf.d”. Logstash loads them all on startup. There is also a single config-file for the overall application. Config files are local. Status information is also stored locally.

Within a pipeline, Logstash can process input in parallel: the “input stage” reads batches of data from the source and hands them off to a pool of threads for processing (in particular, filtering of data). The “batch size” is a global config-option which applies to all pipelines, and is measured as number of events to read. The pool of “workers” is shared across all defined pipelines. I’m not sure at the moment how record order is preserved when using pools of workers (possibly it is not; webserver logfiles are not order-sensitive and they are the primary use-case for logstash).
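To make the batch-plus-worker-pool model concrete, here is a small plain-Java sketch. It is not a description of how Logstash is implemented; the names, the upper-casing “filter” and the ordering strategy are all assumptions made for illustration. It shows one way in which output order could be preserved despite parallel processing: batches may finish in any order, but the results are collected in the order in which the batches were submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java illustration only; all names are hypothetical.
class BatchWorkerSketch {

    // Split the input into batches of at most batchSize events.
    static List<List<String>> toBatches(List<String> events, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < events.size(); i += batchSize) {
            int end = Math.min(i + batchSize, events.size());
            batches.add(new ArrayList<>(events.subList(i, end)));
        }
        return batches;
    }

    // Stand-in for the filter stage: upper-case every event in the batch.
    static List<String> filter(List<String> batch) {
        List<String> out = new ArrayList<>();
        for (String event : batch) {
            out.add(event.toUpperCase());
        }
        return out;
    }

    static List<String> process(List<String> events, int batchSize, int workers)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<List<String>>> results = new ArrayList<>();
            for (List<String> batch : toBatches(events, batchSize)) {
                Callable<List<String>> task = () -> filter(batch);
                results.add(pool.submit(task));
            }
            // Reading the futures in submission order restores the input order,
            // even though the batches themselves may complete in any order.
            List<String> out = new ArrayList<>();
            for (Future<List<String>> result : results) {
                out.addAll(result.get());
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }
}
```

If each worker instead wrote its batch directly to the output as soon as it finished, events from different batches could be interleaved, which is the scenario the ordering question above is concerned with.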

The internal event-buffers are by default in-memory-only. Logstash shutdown normally waits for the event-buffers to empty. If logstash is forcibly shut down, or the whole server goes down, then data is lost. There is BETA functionality to persist these internal buffers to local disk.

Logstash can act as log-forwarder or log-aggregator, but in general “filebeat” is used as the log-forwarder and logstash as the log-aggregator only.

Fluentd

Despite its name, fluentd has nothing to do with Confluence (wiki/bugtracker) or Confluent (the Kafka specialist company). It is an ETL tool which:

  • is open-source (Apache2 license)
  • is implemented in a combination of Ruby and C
  • uses very little memory
  • is based in Japan
  • is supposedly “similar to flume but easier to install and maintain, and with better documentation” (I found the documentation pretty but shallow)
  • has, like Kafka Connect, open-source and enterprise editions.

The enterprise version:

  • includes RPM/DEB packages; open-source download must be installed via ruby-gems.
  • includes Chef recipes for installation.
  • has extra QA + support

The plugin for reading files is called “in_tail”. It handles “file-rollover” (by tracking the inode-number), but does not process multiple files.

Because code is Ruby, database access is either db-specific or via the “DBI” wrapper.

There is no information on the overall architecture of fluentd, but as it is described as “like flume”, I presume it is also single-process-with-multiple-pipelines and no clustering.

Failover-handling is described here. Fluentd supports failover at the forwarder-side by having a list of aggregators to send to; the first available one is used. When none is available, events are cached at the forwarder until they can be sent. However any events which were received by an aggregator that then dies, and which it had not yet sent on, remain on its disk until that aggregator is restarted.
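The forwarder-side behaviour can be sketched in plain Java as follows. This is not fluentd code (fluentd is Ruby and C); the Aggregator interface and all other names are invented for illustration. The forwarder tries each aggregator in list order, uses the first one which accepts the event, and keeps events in a local cache while none is reachable.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java illustration only; all names are hypothetical.
class ForwarderFailoverSketch {

    // Hypothetical aggregator endpoint: send() returns false when it is unreachable.
    interface Aggregator {
        boolean send(String event);
    }

    private final List<Aggregator> aggregators;
    private final Deque<String> cache = new ArrayDeque<>();

    ForwarderFailoverSketch(List<Aggregator> aggregators) {
        this.aggregators = new ArrayList<>(aggregators);
    }

    int cached() {
        return cache.size();
    }

    // Queue the event, then try to flush the cache in order.
    void forward(String event) {
        cache.addLast(event);
        flush();
    }

    // Send cached events until the cache is empty or no aggregator is reachable.
    void flush() {
        while (!cache.isEmpty()) {
            if (!sendToFirstAvailable(cache.peekFirst())) {
                return; // nobody reachable: keep the event cached
            }
            cache.removeFirst();
        }
    }

    private boolean sendToFirstAvailable(String event) {
        for (Aggregator aggregator : aggregators) {
            if (aggregator.send(event)) {
                return true;
            }
        }
        return false;
    }
}
```

What this sketch cannot show is the weak point described above: once an aggregator has accepted an event, the forwarder forgets it, so events sitting on a failed aggregator are stuck until that aggregator comes back.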

Facebook Scribe

A few articles mentioned Facebook Scribe as also being a tool in this space. However it appears to be a dead project; I found no indication that it is being maintained elsewhere (I searched reasonably hard and found no active fork).

It would appear that Facebook now use fluentd instead.

Heka

Heka is written in Go (and custom plugins must also be written in Go - or in Lua).

The architecture of Heka appears to be similar to Flume/Logstash/fluentd, ie a single process supporting multiple concurrent pipelines, with no clustering.

The Heka daemon includes an embedded webserver which provides a “dashboard” interface. However configuration is generally done via config-files, as with flume/logstash/fluentd.

Unlike other tools, Heka also provides the ability to do statistical analysis on the events passing through (or even terminating at!) Heka; the results can be viewed in the dashboard.

Apache NiFi

All of the ETL tools looked at so far are command-line-driven, and require significant investment from operations and developers to install and configure. Kafka Connect is possibly the most labor-intensive of all (though more reliable and scalable once set up).

Apache NiFi is an attempt to provide a more user-friendly way of designing data import and export workflows. It provides a web-based interactive user interface to “drag and drop” widgets in order to define data processing pipelines. These definitions then (somehow) drive back-end processing engines.

NiFi also has a strong focus on “data governance”, ie building databases and reports showing which outputs are related to which inputs (no matter how indirectly).

The project is, however, very young at the current time. Many significant features (eg Kerberos support) were only added in 2016.

The documentation seems to match the target audience of the project - good at the high-level pretty-pictures part, but definitely lacking in details regarding scalability and error-handling. From a day’s worth of reading of documentation, I personally have major concerns about throughput and about behaviour when a node in the system fails. It would be advisable to spend a significant amount of time testing scalability and failure scenarios before committing to using NiFi in any production environment.

I have written more about NiFi here.

Other technologies of interest (not yet researched)

Developing a Custom Connector

When developing a custom connector, it is useful to be able to start Kafka Connect from within an IDE. There are probably clever solutions involving starting Kafka Connect with some kind of “hotswap agent” and the debugger-agent enabled, then deploying class-files directly into the Kafka Connect $CLASSPATH and connecting to the external process via the debug-port.

However a simpler solution is to add the following dependency to your Maven project:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-runtime</artifactId>
            <version>${kafka.version}</version>
            <scope>test</scope>
        </dependency>

and then define a class in the unit-test project code which is based on standard Kafka class org.apache.kafka.connect.cli.ConnectStandalone. This class can then be started in debug-mode from the IDE, and the connector is running within 10 seconds. Not perfect, but better than nothing.
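A minimal sketch of such a launcher class is shown below; it is my own illustration, not the linked example. It writes a worker and a connector properties file to temporary files and then passes their paths to ConnectStandalone's main method, which expects a worker properties file followed by one or more connector properties files. The broker address, the connector name and the connector class com.example.MyConnector are placeholders to be replaced. The call is made reflectively purely so that the sketch compiles without Kafka on the compile classpath; with connect-runtime available you can simply call ConnectStandalone.main(connectArgs).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical launcher for the test source tree; start it in debug-mode from the IDE.
class DebugConnectLauncher {

    // Worker settings for standalone mode.
    static Properties workerProps(String offsetFile) {
        String json = "org.apache.kafka.connect.json.JsonConverter";
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092"); // assumes a local broker
        p.setProperty("key.converter", json);
        p.setProperty("value.converter", json);
        p.setProperty("internal.key.converter", json);
        p.setProperty("internal.value.converter", json);
        p.setProperty("offset.storage.file.filename", offsetFile);
        return p;
    }

    // Settings for the connector under development (placeholder values).
    static Properties connectorProps() {
        Properties p = new Properties();
        p.setProperty("name", "my-connector-debug");
        p.setProperty("connector.class", "com.example.MyConnector"); // hypothetical class
        p.setProperty("tasks.max", "1");
        return p;
    }

    // Write the properties to a temporary file and return its path.
    static Path store(Properties props, String prefix) throws IOException {
        Path file = Files.createTempFile(prefix, ".properties");
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, null);
        }
        return file;
    }

    public static void main(String[] args) throws Exception {
        Path offsets = Paths.get(System.getProperty("java.io.tmpdir"), "connect-debug.offsets");
        String[] connectArgs = {
            store(workerProps(offsets.toString()), "worker").toString(),
            store(connectorProps(), "connector").toString()
        };
        // Equivalent to ConnectStandalone.main(connectArgs) when connect-runtime is present.
        Class.forName("org.apache.kafka.connect.cli.ConnectStandalone")
             .getMethod("main", String[].class)
             .invoke(null, (Object) connectArgs);
    }
}
```

Any connector-specific settings (topics, source locations and so on) would be added to connectorProps(); breakpoints set in the connector or task classes are then hit as soon as the standalone worker starts the connector.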

Example code can be found here.

References

Some other links of interest: