Apache Nifi Architecture

Categories: BigData

Introduction

The Apache NiFi project provides software for moving data (in various forms) from place to place - whether from server to server, or database to database.

There is a lot of buzz around at the moment about Apache NiFi at the moment, so I have had a look. In particular, I’m a back-end and details person - scalability and reliability are of interest to me rather than glitzy UIs; how clustering is done and how power-failures are handled are more interesting than whether drag-and-drop is supported in a UI.

I’m definitely no expert on NiFi - in fact, the following is based purely on documentation and presentations available on the internet. And unfortunately, the available documentation on the underlying principles of NiFi is fairly poor at the current time; it requires significant effort to discover what is going on “under the hood”. Nevertheless, this information is (in my opinion) sufficient to draw some conclusions about NiFi. Any time I write “NiFi does this…” or “works like this” below, please understand that this is implicitly prefixed with “as far as I know” or “according to available documentation”…

The official user NiFi documentation is good, but of course biased. I think the feature-list in the documentation overview oversells NiFi somewhat, at least at its current young state of development; concerns and weaknesses are addressed in the remainder of this article.

I assume you’ve already read some introductory articles/documents on NiFi, and looked at a few video that demonstrate its fancy interface; this is an “under the hood” look, not a gui-walkthrough.

NiFi in Context

NiFi provides data transfer, transform, and routing features for a wide (and expandable) set of datasource and datasink technologies. Its features are somewhat similar to aspects of an Enterprise Service Bus, but have more in common with an ETL (extract-transform-load) tool or an ‘event aggregator’. See also core data integration.

There are many well-known tools already addressing the problems of moving data around. They can generally be categorised as:

  • Heavyweight Enterprise Service Bus products (eg Mule or Camel). ESBs also move data from place to place, and can manipulate data specified in a high-level language (eg BPEL or JBI). Some ESB products allow such high-level logic to be defined graphically, as with NiFi. However ESBs often (sometimes exclusively) offer synchronous endpoints, while NiFi focuses on providing async dataflows (with minor exceptions, such as being able to control the HTTP response to an incoming HTTP POST request).

  • Medium-weight data integration tools which provide the ability to move blocks of data (eg entire files) around

  • Lightweight tools which focus on forwarding, aggregating and transforming streams of small records (eg Flume, Logstash, Kafka Connect). Many of these have basic configurable data transformation operations, and the ability to host custom code for more specific transforms.

These distinctions aren’t entirely clear-cut.

NiFi fits somewhere in the middle. It isn’t an ESB, although it does provide an embedded graphical high-level programming language for basic data manipulation and routing. It is moderately scalable, but isn’t suitable for handing very large numbers of records. It isn’t really suitable for handling extremely large data-blocks (multiple Megabytes). It does have nice tools for visualizing the flow of data through the system, and inspecting system state (on a per-node basis).

A number of tools include NiFi as part of their functionality, in particular:

  • Hortonworks Data Flow (HDF) - part of the Hortonworks suite
  • Kylo, an “all in one data lake” tool that is (at least partially) open-source but linked strongly to Think Big aka Teradata.

A partial list of data-integration tools can be found here and here. See also the Wikipedia definition for core data integration. Interestingly, there appears to be little presence of open-source tools in this area; a number of large companies have open-sourced part of their commercial products (eg Talend, Pentaho) but AFAICT these tools are heavily driven by the parent company rather than a community. Perhaps the lack of pure community-driven open-source interest is to be expected - this is a rather dull and enterprise-related topic..

NiFi Features

There is no doubt that NiFi looks cool in demos. Each NiFi instance has an embedded webserver which provides a modern javascript interactive editor for defining “data flows”. There is a large number of videos available on the internet showing how to use this graphical interface.

The list of things that NiFi can do is also impressive, including:

  • Interactive debugging of data flows
  • Interactive inspection of performance, with “messages per step per second” being updated on each node of a dataflow graph in near-real-time
  • Extensive audit trails showing what happened to records (aka provenance)
  • Detailed dashboard of NiFi instance status (per-instance only, not cluster-wide)
  • Support for back-pressure (stopping the inflow of new messages when the system is “full”)
  • Data prioritization - important data packets are processed before less-important ones, and when data must be discarded, then the lowest-priority ones go first.
  • Time-based data expiry (ie data that is only useful for a short time can be discarded if the system cannot process it within that time)

However the following things are potentially weaknesses in the NiFi approach:

  • Scalability
  • Avalability
  • Preservation of data order
  • Version control of data-flows
  • Code reviews of changes to data-flows

I’ll explain my concerns about the above in the following sections of this article, once the actual NiFi architecture has been presented.

NiFi History and Maturity

Current version (2017-04): 1.1.2

  • 1.1.0: 2016-11-29 – Adds support to run custom processors in isolated classloaders (what, no OSGI??!).
  • 1.0.0 2016-08-30 – First full open-source release. No longer a single master (single point of failure).

  • 0.7.2: 2017-02-20 (maintenance release)
  • 0.7.0: 2016-07-12

At the current time, NiFi is still quite new, and still getting major features and changes (eg Kerberos support in mid 2016).

The user forums are moderately busy. Expert feedback is sometimes available for questions, but not always - the development team is pretty small, and apparently there is not yet a significant pool of expert users contributing to the forums.

For users of products which embed NiFi (eg Hortonworks HDF or Kylo), there is of course also support through those product forums. I have no idea if these are busier and more helpful than the NiFi ones.

The NiFi forums show quite a few users having significant problems with NiFi; it is clearly still maturing.

The NiFi wiki includes a section on feature proposals which is interesting for a view of areas where NiFi users and developers see a current lack of available functionality.

Architecture First Glance

The architecture is described briefly here:

although a better (more detailed) architectural reference is:

Although NiFi is promoted as supporting clustering, that is really a thin add-on to standalone NiFi mode. It is easiest to understand NiFi first as as a single-process application, and then to look at the clustering features later. Without clustering enabled, NiFi is similar in deployment structure to tools such as Flume or Logstash which have no clustering support at all.

NiFi is based on Java, and each instance runs as a single big JVM process. To process significant numbers of messages through a NiFi instance, it should be run on a powerful server with many CPUs, lots of RAM, and fast local disks.

The data stored/managed by each NiFi instance is written to plain old locally-mounted disks. Of course these can be remote SAN devices if really desired - but for very high data volumes the network traffic that this causes has its own disadvantages. Data managed by a NiFi instance belongs to that instance and is not replicated or distributed. Moving data between NiFi instances requires explicit steps by the “programmer” (flow developer) and involves copying data from disk to disk over the network. The total amount of data that can be managed by a single NiFi node is thus limited to the amount of storage that can be mounted on a single server (currently, around 8TB is a cost-effective limit, though more can be crammed in if price is no concern).

As a data-item enters a NiFi instance, it is appended to an existing file in the local content repository, and a separate metadata record is created which references this content via (content-file, offset, length). When the current output file grows too large, a new one is started. The content repository can thus hold many independent data-items without generating large numbers of small files in the local native filesystem. This approach to data storage is often called a commit-log, write-ahead-log or log-structured storage, and is used in many scalable systems; for example a Kafka message broker node stores data similarly. These files are never modifed (except for appending data to the current output file, and eventually deleting whole files when they “expire”). Combining multiple data items in a single file complicates discarding such data when it is no longer needed - but NiFi doesn’t bother; instead it simply keeps the files in the content repository for a fixed period of time, and calls this a rolling buffer of history. Old content is therefore accessible via the associated provenance events until it ages out of the system (also very Kafka-like). Provenance events may exist which point to data which has been deleted; this is considered normal.

Actually, the content repository implementation is pluggable; as alternative to the FileContentRepository described above, there is a variant that simply stores its content in memory. This is of course fast, but has limited capacity and loses all data when the process terminates. There do not appear to be any other content repository implementations, and none planned.

The metadata record generated for each incoming data item is called a flow file. This is a misleading name, as it is not itself a file, and does not reference a file but a byte-range within some file in the content-repository. These records are stored in a separate repository using a similar mechanism to content - by appending them to a file on disk. However unlike the content data, NiFi aggressively caches flowfile metadata content in memory (as much as possible); the metadata size is reasonably small regardless of the size of the content that it references. Flowfile objects are immutable; when modified by a processing step then a new copy is created. This provides an audit trail of the metadata associated with a data-item as it passes through NiFi, and this information can be later inspected. The (potentially larger) content is treated in the same way (copy-on-write), though it is less common to modify the content.

Each flowfile record is present on exactly one NiFi internal work queue at a time, indicating which “processor” (logic step) should be applied to it next. These queues are (in some undefined way) maintained in a transactional manner so that they are not corrupted in the case of a process crash or immediate termination - on restart, any in-progress processing which was not committed appears to never have taken place, and the flowfile is present on the original work queue. Transactionality does not apply to any side effects a processor might have; in particular data ingress or data egress steps which have side-effects outside of NiFi.

NiFi defines a Java interface for “processors”, which are provided with a flowfile as input, and generate a set of zero or more (output-connector, output-flow-file) pairs. NiFi provides a wide selection of standard processors, and custom ones can be implemented when needed. The most recent NiFi release (1.1) provides a way to load custom processors into the NiFi process via a dedicated classloader to minimise class version conflicts (though sadly, they invented their own solution here rather than using OSGi).

Processors may implement “data ingress”, ie obtain data from the outside world either by actively polling for it, or by opening a network port and waiting for data to be pushed to them. By convention, these processors have names of form Get* or Listen* respectively. These processors write content to the content repository and generate flowfile objects. Note that this implies that NiFi cannot perform any management of a unit of data without copying it to the local filesystem; the ability to manage data without having to copy it is on the proposed features list.

Processors may implement “data egress”, ie move managed data to the outside world. Such processors by convention have names of form Put*. The associated flowfile is usually marked as done at that point. However the associated flowfile record (and any previous versions existing due to flowfile copy-on-write behaviour) is retained for audit purposes.

Processors may also “branch” a flow, sending content to two downstream processors. When the content is identical, then both generated flowfiles reference the same underlying content, ie it does not need to be copied on disk.

Processors are invoked by the system using threadpools, ie multiple records on an input queue can be in-flight concurrently. This seems to imply that NiFi does not / cannot preserve the order of messages flowing through the system (unlike Kafka-based systems which use data partitioning to process in parallel while preserving order). Processors are also currently invoked by the system on a “timer” basis (eg every 30 msecs) regardless of whether data is available in the input queue or not (“event-triggered processing” is considered experimental at the current time). Such polling could potentially consume significant resources on an idle system.

There are also controller services which are effectively singleton objects per NiFi instance which can be used to share resources between processor instances, eg SSL configuration settings.

Processors are provided with an API to save their internal state to the local disk if desired.

The general idea of flowfiles is that most decisions regarding routing of a unit of data (to another processor definition) can be performed by inspection of meta-data, without needing the content. When necessary, processors may inspect the content and set new metadata attributes which other processors may then access.

Comments on the general architecture:

  • Consolidating multiple units of data into log-structured files within the content-repository is elegant (though not original).
  • Separating content from metadata, so that metadata can be cached agressively in memory for performance is elegant.
  • In general, the way content and metadata are stored in write-once logfiles (with periodic rewriting for garbage collection) seems good - a technique used quite widely in big-data tools. This approach does make it impossible to free disk space when the related data is no longer needed - which NiFi solves by using a simple fixed data retention time for all data (kafka-like). A fixed retention time has advantages (simple to understand and implement) and disadvantages.
  • Relying simply on local storage, without replication, is a concern for system availability. Presumably traditional techniques such as RAID and SANs will need to be applied if a highly reliable solution is needed. That still won’t provide a highly-available solution, however.
  • Having a single massive Java process does not seem so elegant (see later for comments on clustering) - although competing tools such as Flume and Logstash do the same. Kafka Connect has a more elegant approach.
  • The way that tutorials (official and otherwise) show quite deep processing pipelines is a concern, (a) for implementation and maintenance and (b) for performance when message volumes are high.

Satellite NiFi Instances

One of the use-cases of NiFi is to install a NiFi instance on each “satellite system” which is a source of data, and one core instance (or a cluster of instances) in a central data-center. The satellite instances may be configured to push data to the central NiFi instance(s) periodically. Alternatively, satellite systems may be configured to hold data until the central system polls for it - though in that case the central system needs to know the address of each satellite system, and to have network access to initiate outbound connections to that system, which seems far more complex to manage.

When transferring data from one NiFi instance to another (via the “remote process group” mechanism), the flowfile state (ie metadata about the content) is also transferred.

Each NiFi instance includes a full webserver with support for REST endpoints, graphical dashboards, and much more. There is currently a project named MiNiFi to produce a slimmed-down NiFi version for deployment to satellite locations.

Graphical Data Flow Definition in NiFi

Each NiFi instance includes an embedded webserver that can serve up static javascript resources which form a rich client application. This rich client application can be used to view existing data flow definitions, and (if the user has appropriate rights) modify those flows. The flow definitions are stored as a single zipfile in the local filesystem named “flow.xml.gz”.

When operating a cluster of NiFi instances, it is not usual to place a load-balancer in front of it. It is certainly inadvisable to access the NiFi UI for any instance of a cluster through a load-balancer; they must be addressed directly as they are very stateful (distributing requests for the UI via load-balancing will not be successful). Of course such “direct access” patterns have negative implications for high availability (requests to a failed instance cannot be automatically redirected to an active one).

About Graphical Programming

One of the things heavily promoted about NiFi is its graphical drag-and-click environment for specifying how data flows through the system. In effect, users can define 1970s-style flow charts of logic using (predominantly) a mouse.

Many “event aggregation” alternatives to NiFi provide graphical interfaces to view system state (including throughput), configuration properties, and list processing steps; however in most cases the actual definition of flows is not done graphically (see the Flume UI, the Logstash UI, Confluent Control Center for Kafka Connect, etc). With “data integration” products (particularly commercial ones), graphical UIs for defining data flows are more common.

Graphical programming does have its place; it allows business experts to solve their own problems rather than wait for software experts to do it (aka “self service”). However there are a number of drawbacks to such systems, including:

  • Drag-and-drop can be slower than text-based programming in the hands of an experienced developer
  • Search-and-replace and similar text-processing tools are not available
  • Defining common code just once is difficult/impossible
  • Code comments are often not supported at all - ie no explanations about what is being attempted and why
  • Standard exception-handling is often not available
  • Refactoring support not available
  • Logging messages for auditing/debugging is often not possible
  • Version-control is often not present
  • Code reviews are often not possible
  • Roll-out from development to testing to production is often not considered
  • Debugging features are often lacking (NiFi users can inspect inputs and outputs of each node in a flow, but AFAICT other options are limited - in particular, when a processing node is a script..)
  • Putting power into the hands of non-specialists can lead to errors which specialists would not make (particularly regarding security, performance and error-handling)

The NiFi graphical development environment appears to suffer almost all of the above problems. There is limited potential to put common code into templates. It is also possible to define “process groups” (vaguely similar to functions) and to define a comment on the process-group. Templates can also be used to implement a primitive kind of rollout of code from a test environment to a production one. However none of these “solutions” are particularly elegant, and other issues remain completely unaddressed. The NiFi wiki proposed features section includes a number of entries addressing the development procedure issues listed above.

Apparently some users design flows in a development NiFi instance then check in the resulting flow.xml.gz. However diffs don’t really work in this approach, and there are issues - in particular, some settings will be different between dev and prod environments. NiFi does provide some limited “variable substitution” in workflows, ie workflows can reference variables which are different in different environments, though rolling these out (and ensuring they are reloaded when needed) must be complex. Apparently NiFi somehow tracks changes to dataflows for reporting purposes, but there does not appear to be any UI for this at the current time and it is not clear how this works in a clustered environment.

One video example available online which demonstrates how to define flows graphically shows the issues with non-specialists and complex flows; the self-confessed non-developer configures an “encrypt data” processing node in his flow in a way that should make any person knowledgable in the area of encryption wince. They then view the output data and comment “it looks encrypted to me”. Software specialists really do have certain skills and knowledge that business users do not. Related issues may arise when defining XPath expressions for extracting data from XML content, etc - different expressions which extract the same data may have orders-of-magnitude difference in performance.

Self-service systems typically are loved by their end-users for a few months (the power!). And as long as there are only a few, simple, graphical “programs” all is well. If a system only needs a small number of “data flows” then perhaps the advantages outweigh the disadvantages even over the long term. However more typically, such systems grow and grow - and descend into a mass of tangled and unmaintainable spaghetti.

It has traditionally been considered useful to have graphical tools for configuration of software. Is a NiFi flow a program or simply configuration? That’s something of a grey area; a flow that simply reads from one location and writes to another could well be considered simply configuration. A flow with a single routing step could possibly still be considered configuration. A flow which does significant transformation of its content, and non-trivial routing should probably be considered a program rather than configuration. Unfortunately, it is likely that a NiFi installation which is originally intended to hold just configuration-like flows will, over time, grow to instead contain program-like flows. Note also that there has been in recent times a trend away from even graphical configuration towards textual configuration; system management tools like puppet and chef require a text representation of configuration, and version-controlling and reviewing configuration changes has also proved beneficial. The devops movement, where deployment and even new environment provisioning, is automated are not compatible with graphical configuration tools - except those which are simply “helpers” for generating a textual representation of that configuration. Systems that should be scaled-up/scaled-down “on demand” also require automated configuration which is not compatible with graphical configuration.

From the NiFi email list, it is clear that many NiFi users have questions and problems with development practices (eg here). Equally clearly, the NiFi development team have no answers at the current time.

In short: if you are considering NiFi, I would recommend defining a review process associated with flow creation and modification, including documentation steps - and trying hard not to let the complexity of flows cross the line from “configuration” to “programming”.

Provenance Events

Provenance events track changes to metadata records (flowfiles) as they pass through a NiFi dataflow. Each time a flowfile is received/routed/etc, a ProvenanceEvent is created and inserted into Lucene for later analysis.

Flowfiles must be coarse-grained objects, otherwise the volume of provenance records generated by a dataflow (particularly a deep pipeline) will be overwhelming. Presumably NiFi flowfiles should only represent infrequent messages, and large numbers of small messages must be (somehow) batched. Possibly flowfile=email would be acceptable. Flowfile=sensor-reading would probably not be a valid use-case for high volumes.

According to an email thread with NiFi developers provenance events can be used for the following purposes:

  • debugging a flow
  • monitoring a flow
  • understanding a flow (particularly for NiFi users other than the flow author)
  • replaying a flow
  • as input to a data governance process

The uses of provenance for debugging and monitoring features are obvious; see any video demonstration/tutorial on NiFi. For those who work regularly with the Kafka message broker, the idea of replaying events is very familiar but many other “data integration” tools (whether Flume/Logstash/etc, ETL tools, or hand-written scripts) lack this; being able to say “redo processing of all events since some point in time” can be very useful.

Data governance allows system auditors and users to determine two things:

  • does the data delivered by some integration process comply with legal regulations, and
  • does this data come from a trustworthy source?

Examples of legal compliance is verifying that “all credit card numbers are masked” or “no GPS lat/long information is present”. A trustworthiness example is being able to determine whether a feed of stockmarket-share-prices comes from nasdaq.com or from dodgybrokers.example.

Unfortunately, while the email thread referenced above does contain a statement that provenance events can be used for data governance, no further information is provided on how exactly data-governance can be derived from provenance events and I have been unable to find any other source which describes how to implement data-governance with NiFi. Simply having a massive database tracking the metadata-history for every single unit of data which has flowed through a system does not itself provide data governance - in volume, data is not the same thing as information.

There is one particular use-case in which a full history of provenance events is necessary: combing through a large-volume stream of data looking for rare “matches”. In this case, once a match is found then it can be important to look at the provenance history for that specific event to determine whether this is a “reliable match”, ie whether the sources of this data are trustworthy or not. When the matched data is important enough then the original sources can be contacted again to verify the original data which lead eventually (possibly via multiple data transformations) to the match. Given the origins of NiFi in the US government, the ability to detect rare “hits” and then to be able to determine the original source of that information for validation is obviously important: crime and terrorism detection done by monitoring large dataflows for unusual patterns should also include strong verification abilities. Unsubstantiated conclusions are not useful, and false ones can be very damaging.

TODO: in some parts of the NiFi documentation, it is stated that a provenance event is emitted for each flowfile for each processor. However elsewhere it is stated that no provenance-event is generated for a flowfile sent to the success output of a processor - which is true? Reducing the number of provenance events would certainly increase system performance.

Clustering NiFi

A group of NiFi instances can be clustered together via a shared Zookeeper instance. This provides the following benefits:

  • When a dataflow is modified on one instance (via a browser), the changes are pushed to all other members of the cluster. There are some problems with the current implementation, but in general it seems to work ok since v1.0.

  • Exactly one instance is the primary node of a cluster, providing a place on which to execute processor logic that should only occur once-per-cluster. In particular, a processor which periodically pulls data from an external system should usually not run concurrently on multiple nodes. If the primary node should terminate, a new primary node will be “elected” from the remaining nodes.

  • When a NiFi satellite instance is pushing data to a NiFi cluster, then it can query any member of the cluster to find out the full list of cluster members and their current system-load, and then choose which instance to push data to. This provides a kind of “auto-configured load balancer” - though the pushing instance must have direct network connectivity to each cluster member. Members of a cluster in a data-center can use the same mechanism to push data to other members of the cluster; in particular a “data ingress” processor component running in primary node only mode can be explicitly configured to distribute load via this mechanism (though this results in a copy of the data).

In a cluster, the dataflow definitions are identical on each node (they can be modified on any node, and are synced). The processors defined in the dataflows execute on all nodes - except for those marked as primary node only. This allows parallel processing of data - but each node processes only the data it holds locally; data is not shared between nodes. This means that one node may be idle while another has a queue of work to do; NiFi clustering does not address this.

NiFi clustering does not address the following:

  • Workload sharing (except assisting with “distributed push” so that load is distributed on entry to the system)
  • Replication of data for reliability
  • Failover handling
  • Dynamic scalability
  • Cross-node monitoring and statistics (all stats shown on a NiFi node dashboard are for that node only)
  • Cross-node provenance info (all provenance info shown for a NiFi node is for that node only)

NiFi instances are very stateful, due to their use of local filesystem storage. They are definitely not the sort of thing that can simply be dynamically scaled up by starting new instances when desired - capacity planning is very important. It is possible to run NiFi instances in cloud environments, but each instance must be carefully configured with access to appropriate persistent storage volumes - unless you are willing to discard all data managed by a node when that virtual instance is terminated.

The general trend in big-data systems is to self-management and horizontal scalability. Tools such as Kafka, HBase, or Cassandra internally replicate data for resilience against storage problems; on node failure the load automatically is taken over by an existing node, and when a new server instance is added to a cluster the cluster rebalances itself (both storage and processing load) automatically. In contrast, NiFi’s approach to availability and scalability is very old-school:

  • Data is not replicated internally; resilience against storage problems must be done by using raid disks etc
  • Failover is not supported internally; resilience against server problems (eg powersupply problems, CPU problems, network card problems) can be done by having cold/warm standby servers, HA Proxy or similar, and using disk-level replication or using a SAN and remounting it on a new server.
  • Scalability is primarily vertical - use bigger servers. It is possible to have multiple NiFi instances, but distributing load across them requires careful configuration, they are stateful meaning each instance requires dedicated storage, and ensuring the new instances are resilient also requires careful consideration (see the first two points). So scaling horizontally is not a trivial process.

NiFi can be deployed on a cloud platform (such as AWS). This does mostly solve the node-resiliance problem. However storage must be persistent, which means that all disk IO performed by NiFi is actually network traffic; a significant limitation for performance. It also makes the storage-resiliance problem an issue for the cloud platform. However NiFi does not currently support S3 or similar; the storage must be mounted as a native filesystem. Using a cloud does not really address the horizontal-scalability problem; provisioning new NiFi instances is non-trivial due to the need to configure dedicated persistent storage for each instance, and the complications in actually distributing load across NiFi instances.

Outstanding questions regarding NiFi clustering

  • When designing a flow on one server, when is it copied to other servers (there is no save button as far as I can see..)
  • The stats shown on the “canvas” for each processor are per-host only. There is a “summary page” accessible from the “global menu” - is this cross-node?
  • When a workflow-node is “started” interactively, is it started on all worker-nodes in the cluster?
  • When defining an “ingress processor” which actively fetches data from an external system, is it standard practice to mark this processor as just running on the primary node, and then have the following step in the flow be a “remote process group” processor which distributes the data to all other nodes in the cluster?
  • When using Listen* or Get* processors, how is message ordering preserved? When all messages in the input, or subsets (partitions) must be delivered in the same order as they are received, how is this done?

Note from the FAQ:

Typically, if you want to pull from HDFS and partition that data across the cluster, you would run ListHDFS on the Primary Node only, and then use site-to-site (remote process group) to distribute that listing to all nodes in the cluster. We do realize this is not ideal…

The feature proposals wiki page has an entry for data replication. See also this email.

Flow Development Process

To continue the discussion regarding Graphical programming, here are some questions:

  • Can a flow be exported as text, and imported into another NiFi instance?
  • How are flows version-controlled?
  • How are flows code-reviewed (eg diffs seen)?
  • How are flow changes audited (who, when)?
  • How can flows be tested outside of production?
  • How can flows be debugged interactively?
  • How can flows be migrated from dev to test to prod environments?

Partial answer: the NiFi Repository project is intended to provide a central store for flows. According to Git history, initial commit was 2017-03-16(!) There are currently no tags and branches, ie no releases - this is a brand new project.

Partial answer for rollout: in the FAQ, flow templates are recommended for this purpose. In summary, after a flow has been designed in a development environment, components can be selected and “create template” invoked. This generates an XML file. A template can be imported into another NiFi instance with the “upload template” option. The template must then be dragged to the system canvas to activate it. With some strict processes, it might be posssible to define a “development flow” based on these features - though rather clumsy.

Question: The user guide “Output Port” section says that data can be sent to a remote NiFi instance if the output port is “on the root canvas”. Is that a requirement? Does that mean that a flow defined from a flow-template cannot send to a remote NiFi instance?

Another user has also enquired on the NiFi forums about “development best practices” recently; no answer so far.

The NiFi wiki has configuration management in the “feature proposals” section.

General Questions on NiFi

General questions:

  • Is there really a provenance-event for every data-item x every processor through which it flows? If so, then doesn’t this a limiting factor to the volume of records flowing through NiFi?
  • As documented in developer-guide, the “global processor state” is a single map object stored in Zookeeper, and any change to the state rewrites the entire map. Is this still the current implementation?
  • What does it mean for a processor to be “batched”?

Note: “typically no Provenance event is created when a processor routes a flowfile to success”. So in a linear flow with Get, 5 transforms, then Put, does this mean that there will only be 2 provenance events (the get and the put)?

Development:

  • Flowfiles are immutable entries in a “write ahead logfile” - rather like Kafka records. However at http://nifi.apache.org/developer-guide.html they are described as having attributes “path” and “filename” (and the user guide says something similar at https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#terminology). What do path/filename attributes mean for a flowfile record?

Error Handling and Resumption

On a single server, NiFi appears to have a reasonably good crash-resistance story. Content is immutable, and in logfile format. Flowfiles are immutable and in logfile format. Provenance events are immutable and in logfile format. The “input queues” stuff is apparaently crash-resistent. And “transactions” are supported. So on unclean app termination, resumption should work ok, restarting from the most recent complete flowfile for any processor.

Because data is stored “locally” rather than distributed, there is no replication factor for data as with HDFS, Kafka, Cassandra, etc. If a disk is lost, all data on that disk is lost. Of course technologies such as RAID can be used to mitigate some dangers. And storage can be mapped to a SAN volume, which gives not only better replication management, but also the ability to mount that same volume from a different server for disaster recovery. However such things are significantly more complex to configure reliability for than modern “big data” systems - and recovery times on failover are significantly longer.

Other

NiFi supports “back pressure” which is something that Kafka/Kafka-Connect does not do so well; Kafka applications generally assume that the broker cluster can absorb all data that can be thrown at it. On the other hand, that is usually true…

NiFi allows a priority to be associated with each data-packet, and an expiry time after which the data is no longer of use. That could be useful in some cases.

Normally, processors defined in a flow run concurrently on every node. However processors which communicate with external systems (eg making an SFTP request) should not run concurrently; these should be marked as isolated in which case they only run on the primary node. The data that such processors pull must then (somehow) be “load balanced” across other nodes in the cluster to get parallel processing.

A processor can “store state” so that on resume it can “continue where it left off” - a bit like kafka offsets. However it is up to the individual processor to make calls. The caller can specify that state is “local” (in which case it is stored in the local filesystem) or “global” in which case it is stored in Zookeeper. Zookeeper is NOT designed for frequent updates, ie frequent global status saves will be a scalability problem.

“Active” flowfiles are kept in memory, but every change is first appended to a write-ahead-transaction-log before being applied. This allows the current state to be recreated on restart. Periodically, “snapshots” are created so the transaction log does not grow infinitely (the tx log is applied to the latest snapshot). Flowfiles do not hold a reference to the “processor whose queue they are on”; instead each processor has a queue pointing to the flowfiles waiting to be processed by that processor.

TODO: how is the state of these queues managed/snapshotted/restored?

A processor can update a data-item in the content-repository. This actually causes a copy-on-write in the filecontentrepository - the updated flowfile references the location of the new version of the content, while older versions of the same flowfile continue to reference the original content at its original location.

The split between metadata (flowfile) and content is interesting. If the processing logic can mostly be applied to metadata (properties in the flowfile header) without needing to actually access the content, then this is a performance boost. Flow authors are encouraged to use Processors to grab specific data from the content and store it in the flowfile headers once, then use this data from other processors rather than access the content again. Can be a win for large content-blocks.

Splitting of a single data-item into multiple can be efficient in some cases. In particular, a single flowfile pointing at a single content-item which is a sequence of LF-terminated strings can be transformed into a flowfile-per-line without needing to move the content-item; the new flowfile entries just point at the same file in the filecontentrepository with different offset/length values. This does make refcounting of the referenced data difficult - but NiFi doesn’t bother, instead relying on a simple fixed retention time (aka “aging out”) to reclaim disk space.

When data is being “pushed” to NiFi, a flow is simply defined with a “listening” processor as the initial point. The flow is distributed across all nodes as usual, and so a “listening port” is opened on every node. An external load-balancer can then be configured to distribute connections across the entire set of NiFi nodes, and thus incoming connections will be distributed across all nodes.

When data is being “pulled” from NiFi, then the “pulling” processor should be configured to run only on the primary node (it can be configured to run only on another node, but then will not run if that node is down). The data however will be stored only on that single node. The flow must then use a “remote process group” processor to push the data to a named “input port” on a specific other NiFi node, or to a named input-port on “all nodes”. The flowfiles in the “pulling flow” will then be transferred to other NiFi nodes, together with a copy of the content-item that they reference. Load is not simply distributed round-robin, but instead is “load-aware”; more flowfiles are sent to lightly-loaded target NiFi nodes than heavily-loaded ones. Note however that this is a “push load distribution model” rather than the Kafka “pull load distribution” approach; pulling is better for overall scalability.

Data managed by NiFi must reside in the content repository - which is a concern when managing large units of content. The only viable approach appears to be to never import the content at all, but instead process a message whose content is simply the address of the relevant data. If handling of this message requires data from the (large) content then that should be extracted once, and embedded as properties in the initial flowfile record. While somewhat clumsy, alternatives to NiFi (eg Flume, Logstash, Kafka Connect) must handle this situation in a similar way.

Conclusions

Jobs like moving files around (collect from remote FTP server every X hours, deliver to share filesystem) have often been done with hand-written scripts in languages like Python. NiFi is a strong candidate to replace such scripts - it provides a suite of prebuilt processors, decent handling of errors, performance monitoring, and similar features. Being able to configure the source and sink parameters (hosts, paths, time-intervals,etc) via a graphical UI will be considered a plus by some. And the graphs it generates are pretty.

Handling tasks like collecting Twitter feeds (a common example in tutorials), providing a receiving endpoint for HTTP POST operations performed by other systems, etc can also probably be better handled through NiFi than other technologies - as long as the data volume is not too high.

For very high-volume feeds (hundreds of messages per minute or more), I have significant doubts that NiFi will scale. For “log aggregation” type workloads, tools like Flume and Logstash would appear to scale better on a single system; Kafka Connect scales excellently both vertically and horizontally (clustered). NiFi’s clustering has significant limitations that need to be understood before selecting it.

Providing a GUI for viewing existing configuration and system statistics is not controversial, and NiFi does this well (though all such reports are per-node, not per cluster). For “log aggregation” workloads, the ELK stack (elasticsearch/logstash/kibana) reporting is superior though. Providing a GUI for defining/editing dataflows is more problematic - the issues have been described above.

In summary, NiFi is definitely useful for some purposes, but not all. However its glitzy UI performs best at the most important task: looking good in demos. I therefore predict a boom in NiFi installations over the next few years. Whether disappointment sets in later depends upon how fast the NiFi team can get to work on the issues already listed under feature proposals on their wiki.

References and Resources

Data Integation Software and Principles:

  • Third Nature/Madsen: Role of Open Source in Data Integration - A good overview of Data Integration topics (though from 2009). Note that the report is “sponsored by Talend”, but there is no Talend-specific content within it and no obvious bias.
  • CloverETL – a partially-open-source project somewhat comparable to NiFi
  • Pentaho - open-core big data analytics product (mostly driven by Hitachi)

General NiFi Info:

Clustering Topics: