TODO: Article in Progress…
I recently wrote a Spark Streaming application that reads from Kafka and writes to Hive, and ran into a number of tricky issues along the way.
This was on Hortonworks Data Platform 2.5.3, with Kafka 0.10.0, Spark 2.0, and Hive 1.3 on YARN.
Note that the integration between Spark and Kafka is provided by a library that is developed as part of the Spark project but is not bundled with the standard Spark libraries, so it has to be added to the application as a separate dependency.
Expired Kafka Offsets
The Kafka topic the application was reading from had a fairly short retention time and a fairly high volume of data. It therefore quickly reached a steady state in which data was flowing in at one end and being expired at the other.
Running the application with a unique group.id and auto.offset.reset="latest" appeared to work fine. I then wanted to do a performance test, i.e. see how fast it could process data when necessary. The obvious approach was to use a new group.id and set auto.offset.reset="earliest" in order to get a large block of data to test with.
Sadly, the results were just weird. Performance was high for a few seconds, then dropped significantly, and the logs showed intermittent exceptions mentioning “IndexOutOfRange” and “Kafka Client is not concurrent” (or something similar; I no longer have the exact error messages).
It took a while before the cause became clear. For each partition, the Spark driver looked at the topic, computed the block of messages to process as (partition, previous-offset -> previous-offset + N), and sent these ranges as tasks to the Spark workers. But by the time a worker tried to read its assigned messages, the offsets were no longer valid, because a background thread in the Kafka broker had already expired those messages under the retention policy. The Kafka library then throws an IndexOutOfRange exception.
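A toy Python model of that race follows. It is not Spark or Kafka code, and `PartitionLog`, `plan_batch`, `run_tasks` and `OffsetOutOfRange` are hypothetical names. The driver side fixes an offset range per partition, retention then deletes old messages on one partition, and the worker-side read of that range fails because its start offset is no longer retained.

```python
class OffsetOutOfRange(Exception):
    """Stand-in for the error raised when a requested offset has been expired."""


class PartitionLog:
    """Toy model of one Kafka partition: offsets log_start .. log_end-1 are readable."""

    def __init__(self, n_messages: int):
        self.log_start = 0          # earliest offset still retained
        self.log_end = n_messages   # offset the next message would get

    def expire(self, n: int) -> None:
        """Retention deletes the n oldest messages."""
        self.log_start = min(self.log_start + n, self.log_end)

    def read(self, start: int, until: int) -> list:
        """Return the offsets in [start, until), or fail if start was already expired."""
        if start < self.log_start:
            raise OffsetOutOfRange(f"offset {start} < earliest retained offset {self.log_start}")
        return list(range(start, min(until, self.log_end)))


def plan_batch(logs: dict, committed: dict, max_per_partition: int) -> list:
    """Driver side: compute one (partition, from, until) range per partition.

    Partitions with no committed offset start at the earliest retained offset,
    mimicking a new consumer group with the 'earliest' reset policy.
    """
    ranges = []
    for p, log in sorted(logs.items()):
        start = committed.get(p, log.log_start)
        ranges.append((p, start, min(log.log_end, start + max_per_partition)))
    return ranges


def run_tasks(logs: dict, ranges: list):
    """Worker side: read each assigned range later on; collect results and failures."""
    results, failed = {}, []
    for p, start, until in ranges:
        try:
            results[p] = logs[p].read(start, until)
        except OffsetOutOfRange:
            failed.append(p)
    return results, failed


logs = {0: PartitionLog(1000), 1: PartitionLog(1000)}
ranges = plan_batch(logs, committed={}, max_per_partition=100)
logs[0].expire(50)  # retention runs between planning and execution
results, failed = run_tasks(logs, ranges)
print(ranges)  # [(0, 0, 100), (1, 0, 100)]
print(failed)  # [0]
```

Only partition 0, whose oldest messages were expired between planning and reading, fails; partition 1 is read normally.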
I can’t blame Kafka here: that exception is entirely reasonable from Kafka’s point of view. Unfortunately, Spark Streaming does not handle it well at all. It behaves erratically for a while, trying to reschedule the task. If the task happens to be rescheduled on the same server that failed, the “not concurrent” error occurs, presumably because the failed attempt did not release its “lock” on the Kafka connection when the exception occurred. Of course this was intermittent, depending on which Spark worker the retry happened to be assigned to.
Eventually, Spark retries again with the “latest” reset policy applied, i.e. it skips over all messages already on the topic and processes only newly arriving ones. This created the misleading impression that the application performance was poor, when it was in fact simply limited by the incoming data rate.
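The “not concurrent” symptom fits the guess that the failed attempt never released its hold on the Kafka connection. Below is a toy Python model of that hypothesis only, not Spark's or Kafka's actual code; `CachedConsumer`, `leaky_task`, `safe_task`, `retry_on_same_worker` and the two exception classes are hypothetical. A task that releases the cached consumer only on success leaves it marked as in use after an exception, so a retry on the same worker is rejected; releasing in a `finally` block avoids that.

```python
class OffsetOutOfRange(Exception):
    """Stand-in for the fetch error on an expired offset."""


class ConcurrentAccess(Exception):
    """Stand-in for a client that refuses to be used by two tasks at once."""


class CachedConsumer:
    """Toy per-worker cached connection guarded by an 'in use' flag (the 'lock')."""

    def __init__(self):
        self.in_use = False

    def acquire(self):
        if self.in_use:
            raise ConcurrentAccess("consumer is already in use")
        self.in_use = True

    def release(self):
        self.in_use = False


def fetch(offset, earliest):
    """Fail if the requested offset has already been expired."""
    if offset < earliest:
        raise OffsetOutOfRange(f"offset {offset} < earliest {earliest}")
    return offset


def leaky_task(consumer, offset, earliest):
    consumer.acquire()
    value = fetch(offset, earliest)  # if this raises, release() below never runs
    consumer.release()
    return value


def safe_task(consumer, offset, earliest):
    consumer.acquire()
    try:
        return fetch(offset, earliest)
    finally:
        consumer.release()  # runs whether or not fetch() raised


def retry_on_same_worker(task, offset, earliest):
    """Run a task twice on the same cached consumer; return the last error's name, if any."""
    consumer = CachedConsumer()
    last_error = None
    for _ in range(2):
        try:
            task(consumer, offset, earliest)
        except Exception as exc:
            last_error = type(exc).__name__
    return last_error


print(retry_on_same_worker(leaky_task, offset=0, earliest=50))  # ConcurrentAccess
print(retry_on_same_worker(safe_task, offset=0, earliest=50))   # OffsetOutOfRange
```

With the leak, the retry reports a misleading concurrency error; with `try`/`finally` it reports the real cause (the expired offset) again.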
This situation should of course never happen in production: if your Spark Streaming application is reading data older than the Kafka retention time, things are already very bad. Nevertheless, the behaviour of Spark Streaming with Kafka in this situation is not helpful.
Oversized First Batch
When a Spark Streaming application starts reading from Kafka and there is already a lot of data on the topic, Spark Streaming simply allocates all of the existing data to the first batch. The first batch can therefore run for an extremely long time, potentially hours, regardless of the streaming batch window size.
It is therefore necessary to set a sane limit on the amount of data per batch via configuration option ??.
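To choose such a limit, it helps to work out how many records a single batch may contain. The sketch below assumes the limit takes the form of a maximum number of records per second per Kafka partition (an assumption about its form), so a batch holds at most rate × batch window × partitions records. The function names and all figures are hypothetical, not measurements from this application.

```python
import math


def max_records_per_batch(rate_per_partition: int, batch_seconds: int, partitions: int) -> int:
    """Upper bound on records in one batch under an assumed per-partition, per-second cap."""
    return rate_per_partition * batch_seconds * partitions


def batches_to_drain(backlog_records: int, rate_per_partition: int,
                     batch_seconds: int, partitions: int) -> int:
    """Number of capped batches needed to work through an existing backlog,
    ignoring the new data that keeps arriving meanwhile."""
    per_batch = max_records_per_batch(rate_per_partition, batch_seconds, partitions)
    return math.ceil(backlog_records / per_batch)


# Hypothetical figures: 10 partitions, 300 s window, 1,000 records/s per partition.
print(max_records_per_batch(1_000, 300, 10))         # 3000000
print(batches_to_drain(50_000_000, 1_000, 300, 10))  # 17
```

Without a cap, all 50 million backlog records in this example would land in the first batch; with the cap they are spread over 17 batches, each of which fits its 300-second window as long as the application can sustain 10,000 records per second overall.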
Lots of Small Hive Files
Streaming from Spark into Hive, or into HDFS in general, is suboptimal anyway: Spark Streaming is simply batch mode, run at regular time intervals.
When writing into Hive, each task generates one HDFS file for each logical Hive partition its data belongs to. After all tasks for the current streaming batch have completed, the Spark driver asks Hive to load those files into the Hive table. Assuming the table is “managed”, this simply moves the HDFS files into a new directory (a trivial NameNode update) and then adds entries to the metastore pointing to the new files.
Unfortunately, this does lead to Hive tables containing lots of files - and the smaller the streaming batch window is, the more HDFS files are in the Hive table.
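The number of files per batch is therefore roughly the number of distinct (task, logical partition) pairs. The toy sketch below counts those pairs for one batch; the record layout, the `day` partition column and the function name `files_written` are hypothetical, and actual file naming is left to Spark and Hive.

```python
def files_written(batch: dict) -> int:
    """Count the HDFS files one streaming batch would create.

    `batch` maps a task id to the records that task wrote; each record is a dict
    with a hypothetical `day` partition column. Each task writes one file per
    distinct logical partition among its own records.
    """
    return len({(task_id, record["day"])
                for task_id, records in batch.items()
                for record in records})


batch = {
    0: [{"day": "2017-03-01"}, {"day": "2017-03-01"}, {"day": "2017-03-02"}],
    1: [{"day": "2017-03-02"}],
    2: [{"day": "2017-03-01"}, {"day": "2017-03-02"}],
}
print(files_written(batch))  # 5
```

Every batch adds that many files, so halving the batch window roughly doubles the number of files per hour. In the direct stream each Spark task reads one Kafka partition, so unless the data is repartitioned before writing, the number of tasks equals the number of Kafka partitions.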
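When the target Hive table is not partitioned, or has fairly low partitioning cardinality (e.g. partitioned by month), the files can be merged with Hive's concatenate command, which works for tables stored as RCFile or ORC:
alter table Foo concatenate (for a non-partitioned table), or
alter table Foo partition (colname=value) concatenate (for each logical partition of a partitioned table).
For tables with lots of logical partitions, this is clearly much trickier, since a separate command is needed for every partition.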
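For a partitioned table, one option is to generate the per-partition statement for every logical partition and run the result through a Hive client. The sketch below only builds the HiveQL text from the output of `SHOW PARTITIONS` (one `key=value[/key=value...]` line per partition). The function name, the table `Foo` and the column `colname` are hypothetical; values are quoted as strings, which suits string-typed partition columns, and any escaping of special characters is ignored.

```python
def concatenate_statements(table: str, show_partitions_output: str) -> list:
    """Build one ALTER TABLE ... PARTITION (...) CONCATENATE statement per logical partition.

    `show_partitions_output` is text as printed by SHOW PARTITIONS, one line per
    partition, e.g. "dt=2017-03-01" or "dt=2017-03-01/hr=05".
    """
    statements = []
    for line in show_partitions_output.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines
        specs = []
        for pair in line.split("/"):  # multi-level partitions are separated by '/'
            key, value = pair.split("=", 1)
            specs.append(f"{key}='{value}'")
        statements.append(f"ALTER TABLE {table} PARTITION ({', '.join(specs)}) CONCATENATE;")
    return statements


for stmt in concatenate_statements("Foo", "colname=2017-03\ncolname=2017-04\n"):
    print(stmt)
# ALTER TABLE Foo PARTITION (colname='2017-03') CONCATENATE;
# ALTER TABLE Foo PARTITION (colname='2017-04') CONCATENATE;
```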
The obvious solution is to use larger batch windows. I eventually settled on a batch window of 300 seconds, which was small enough to monitor progress (i.e. see data being added regularly to the table) while producing only 12 files per hour per Kafka partition. Running “alter table .. concatenate” once per day then keeps everything nicely tidy. However, see the section on KafkaUtils for issues related to using such a large batch window.
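The arithmetic behind that choice: a batch window of w seconds gives 3600 / w batches per hour, and each Kafka partition (one task, assuming no repartitioning) writes at least one file per batch for each logical partition it touches. The sketch below compares a few window sizes for a hypothetical topic with 6 partitions whose data lands in a single logical partition; `files_per_day` is an illustrative name.

```python
def files_per_day(batch_seconds: int, kafka_partitions: int, logical_partitions_touched: int = 1) -> int:
    """Files added per day, assuming each task writes one file per logical partition per batch."""
    batches_per_day = 86_400 // batch_seconds
    return batches_per_day * kafka_partitions * logical_partitions_touched


for window in (10, 60, 300):
    # window in seconds, batches per hour, files per day
    print(window, 3600 // window, files_per_day(window, kafka_partitions=6))
# 10 360 51840
# 60 60 8640
# 300 12 1728
```

In this hypothetical setup, the daily concatenate then has 1,728 small files per day to merge rather than 51,840.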
Note: “logical partition” means the partitioning declared in the Hive DDL for the table, separating data into directories based upon the value of one or more columns. The word “partition” is also used to refer to an HDFS file holding data which “belongs to” the table.
TODO: see if declaring the target Hive table as transactional helps at all. Hive does have a “streaming mode” which produces delta files in HDFS, together with a background merging thread that cleans those up automatically. Probably not supported by the Spark/Kafka integration lib, but worth a try…
KafkaUtils
The standard way of reading from Kafka with Spark is to create a “direct stream” using the ?? lib, and the documentation for this library recommends the following code:
TODO: show Kafka utils with subscribe
However, that “subscribe” approach has significant drawbacks.
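One drawback, listed in the TODO notes below, is group/session timeouts, which interact badly with large batch windows. My understanding, not verified against this setup, is this: with Subscribe the driver's consumer joins a Kafka consumer group; Kafka 0.10.0 clients send group heartbeats only from within poll(); and the driver polls roughly once per batch. If so, a batch window longer than the consumer's session.timeout.ms gets the driver evicted from the group, whereas Assign uses manual partition assignment and has no group session to lose. The toy check below encodes only that reasoning; the function name and the 30-second timeout are hypothetical.

```python
def driver_stays_in_group(batch_seconds: float, session_timeout_ms: int,
                          uses_group_management: bool) -> bool:
    """Toy check: does the driver's consumer keep its group membership between batches?

    Assumes heartbeats are sent only when the driver polls (about once per batch)
    and that manual assignment (Assign) involves no group session at all.
    """
    if not uses_group_management:
        return True  # Assign: no group membership to lose
    return batch_seconds * 1000 < session_timeout_ms


SESSION_TIMEOUT_MS = 30_000  # hypothetical value for illustration

for window in (10, 300):
    print(window,
          "Subscribe:", driver_stays_in_group(window, SESSION_TIMEOUT_MS, True),
          "Assign:", driver_stays_in_group(window, SESSION_TIMEOUT_MS, False))
```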
Note: very old examples of Spark and Kafka refer to a “Kafka receiver”. Just ignore those: that was always an awful way to communicate with Kafka, and the “direct stream” approach is far better. I won’t even bother to describe how the receiver approach worked; believe me, you don’t want to know.
TODO: complain about how streaming creates ever-growing log files
- use ConsumerStrategies.Assign instead of ConsumerStrategies.Subscribe with KafkaUtils.createDirectStream to avoid group/session timeouts
- Kafka offsets are updated before the commit to Hive occurs
- a log4j rolling appender must be configured for the Kafka streaming application
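The second note concerns ordering. If the Kafka offsets are committed before the data is safely in Hive, a failure between the two steps loses that batch, because a restarted job resumes after the committed offsets. Below is a toy sketch of the safer order (write first, commit afterwards), which gives at-least-once rather than at-most-once delivery; `process_batch`, `simulate`, and the write/commit callbacks are hypothetical stand-ins, not the Spark or Kafka API.

```python
def process_batch(records, offsets, write_to_hive, commit_offsets):
    """Safer order: write the batch to Hive first, commit its Kafka offsets only afterwards.

    If the write raises, commit_offsets() is never reached, so a restarted job
    re-reads the same records (at-least-once: duplicates possible, no loss).
    """
    write_to_hive(records)
    commit_offsets(offsets)


def simulate(commit_first: bool, fail_write: bool):
    """Run one batch with a sink that may fail; return (committed offsets, rows in Hive)."""
    committed, hive_rows = {}, []

    def write(records):
        if fail_write:
            raise IOError("Hive write failed")
        hive_rows.extend(records)

    def commit(offsets):
        committed.update(offsets)

    records, offsets = ["a", "b", "c"], {0: 3}  # partition 0, next offset to read is 3
    try:
        if commit_first:
            commit(offsets)  # risky order: offsets advance before the data is stored
            write(records)
        else:
            process_batch(records, offsets, write, commit)
    except IOError:
        pass  # the batch failed; a restart resumes from the committed offsets
    return committed, hive_rows


print(simulate(commit_first=True, fail_write=True))   # offsets advanced, rows lost
print(simulate(commit_first=False, fail_write=True))  # nothing committed, batch re-read
```

In the Kafka 0.10 direct stream, the Spark integration guide shows offsets being committed via `commitAsync` on the stream's `CanCommitOffsets` interface after the output has completed, which follows the same order.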