Processing CSV Files with Beam
Apache Beam can be used to process data in batch (“bounded”) or streaming (“unbounded”) form. When processing bounded data from a file, the files are usually large - if a file fits comfortably on a single machine, a simpler tool is probably a better choice. It is therefore not too surprising that writing Beam code to read just a single line from a file is tricky. Sadly, that ability is sometimes needed - in particular, I needed to process large CSV-format files whose first line is a “header” that affects how all the other lines are processed.
Here is the code I eventually came up with to solve this (using Beam 2.4.0); thanks go to Luka Obradovic, whose code on GitHub formed the basis of this solution.
/**
* The MIT License (MIT)
*
* Copyright (C) 2016 Luka Obradovic.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package net.vonos.beam.transforms;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.CompressedSource;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.CoderUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.NoSuchElementException;
/**
* Returns the first text line from any file (without reading the whole file).
* <p>
* The input file is marked as "not splittable", thus all reading from the file occurs on a single node rather than in parallel.
* </p>
* <pre>
* Usage:
 * PCollection<String> header = pipeline.apply(Read.from(HeaderSource.from(options.getInput(), EmptyMatchTreatment.DISALLOW)));
*
 * Usually combined with .apply(View.asSingleton()) to produce a PCollectionView which can then be used to pass the first line
* of the file as a side-input to other processing steps.
* </pre>
*
* @author Luka Obradovic (obradovic.luka.83@gmail.com)
* @author Simon Kitching (update from Dataflow 1.x to Beam and simplify to only return the header line)
*/
public class HeaderSource extends FileBasedSource<String> {
private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
/** Factory method for use from "user code" */
public static HeaderSource from(ValueProvider<String> fileOrPatternSpec, EmptyMatchTreatment emptyMatchTreatment) {
return new HeaderSource(
fileOrPatternSpec,
emptyMatchTreatment);
}
/** Factory method for use from "user code" */
public static FileBasedSource<String> from(
ValueProvider<String> fileOrPatternSpec,
EmptyMatchTreatment emptyMatchTreatment,
Compression compression) {
HeaderSource hs = new HeaderSource(
fileOrPatternSpec,
emptyMatchTreatment);
return CompressedSource.from(hs).withCompression(compression);
}
/** Constructor used by factory method. */
private HeaderSource(
final ValueProvider<String> fileOrPatternSpec,
EmptyMatchTreatment emptyMatchTreatment) {
super(fileOrPatternSpec, emptyMatchTreatment, DEFAULT_MIN_BUNDLE_SIZE);
}
/** Constructor used by createForSubrangeOfFile. */
private HeaderSource(
final MatchResult.Metadata metadata,
long minBundleSize,
long start,
long end) {
super(metadata, minBundleSize, start, end);
}
/** No need for this source to be splittable; it only reads one line. */
@Override
protected boolean isSplittable() throws Exception {
return false;
}
@Override
protected FileBasedSource<String> createForSubrangeOfFile(
final MatchResult.Metadata metadata,
final long start,
final long end) {
return new HeaderSource(
metadata,
getMinBundleSize(),
start,
end);
}
@Override
protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
return new HeaderReader(this);
}
@Override
public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}
// ============================================================================================
/**
* Object responsible for reading a specific range of the input file.
* <p>
 * As the parent class sets isSplittable=false, there will actually be only one of these per input file.
* </p>
*/
private static class HeaderReader extends FileBasedReader<String> {
private final ByteBuffer buf;
private ReadableByteChannel channel;
private long currOffset;
private String currentRecord;
HeaderReader(final HeaderSource source) {
super(source);
buf = ByteBuffer.allocate(4096);
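// Mark the (empty) buffer as fully consumed, so the first call to readLine immediately refills it from the channel.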
buf.flip();
}
@Override
public void close() throws IOException {
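// Nothing extra to release here; the superclass close() takes care of closing the channel.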
super.close();
}
@Override
protected void startReading(final ReadableByteChannel channel) {
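// Beam opens the file (decompressing it if this source is wrapped in a CompressedSource) and hands this reader the resulting channel.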
this.channel = channel;
}
@Override
protected boolean readNextRecord() throws IOException {
if (currentRecord != null) {
// Have already read everything we need to read. Returning false here should cause the
// close method on this class to be invoked in the near future, which will then close
// the channel.
return false;
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
int nBytes = readLine(channel, buf, out);
if (nBytes == 0) {
// A completely empty file has no header line to return.
return false;
}
currOffset += nBytes;
currentRecord = bufToString(out);
return true;
}
private static String bufToString(ByteArrayOutputStream buf) throws CoderException {
return CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), buf.toByteArray());
}
private static int readLine(final ReadableByteChannel channel, ByteBuffer buf, ByteArrayOutputStream out) throws IOException {
int bytesRead = 0;
while (true) {
if (!buf.hasRemaining()) {
// The buffer is exhausted; refill it from the channel.
buf.clear();
int read = channel.read(buf);
if (read < 0) {
break; // end of file
}
buf.flip();
}
byte b = buf.get();
++bytesRead;
if (b == '\r') {
// Skip carriage returns so that both "\n" and "\r\n" line endings are handled.
continue;
}
if (b == '\n') {
break; // end of the line
}
out.write(b);
}
return bytesRead;
}
@Override
protected boolean isAtSplitPoint() {
// Every record is at a split point.
return true;
}
@Override
protected long getCurrentOffset() {
return currOffset;
}
@Override
public String getCurrent() throws NoSuchElementException {
if (currentRecord == null) {
throw new NoSuchElementException();
}
return currentRecord;
}
}
}
As noted in the javadoc for the class, it takes only a few lines of Beam code to use this class: read the first line from a file, create a PCollectionView from that line, and then read the file in the usual Beam way with that header information as a “side input” to the code that processes each line. (Luka’s original code prepended the header to each line rather than using a side-input, and was written for Beam’s predecessor, Dataflow 1.x.)
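For concreteness, here is a minimal sketch of such a pipeline. The Options interface, the class name CsvWithHeader and the body of the DoFn are illustrative inventions of mine, not part of the original code; only the getInput() accessor is assumed, matching the javadoc above.
import net.vonos.beam.transforms.HeaderSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

public class CsvWithHeader {
  /** Hypothetical options interface; only getInput() is referenced by the javadoc above. */
  public interface Options extends PipelineOptions {
    ValueProvider<String> getInput();
    void setInput(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Read just the first line of the file (on a single worker) and wrap it as a singleton view.
    final PCollectionView<String> headerView = pipeline
        .apply("ReadHeader", Read.from(
            HeaderSource.from(options.getInput(), EmptyMatchTreatment.DISALLOW)))
        .apply("HeaderAsSingleton", View.<String>asSingleton());

    // Read the whole file in parallel, passing the header to each worker as a side-input.
    pipeline
        .apply("ReadLines", TextIO.read().from(options.getInput()))
        .apply("ProcessWithHeader", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String header = c.sideInput(headerView);
            if (c.element().equals(header)) {
              return; // skip the header line itself
            }
            // ... interpret the fields of c.element() using the column names in header ...
            c.output(c.element());
          }
        }).withSideInputs(headerView));

    pipeline.run();
  }
}
Note that the input is opened twice: once by HeaderSource, which reads only the first line, and once by TextIO, which reads every line in parallel.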
The functionality in HeaderSource could possibly be implemented instead using the relatively new splittable DoFn API, which amounts to a couple of extra annotated methods on a standard DoFn class; a rough sketch of the shape follows below. However, the Beam capability matrix currently shows that only a couple of Beam “back ends” (runners) support splittable DoFns.
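To illustrate what those extra methods look like, here is a sketch of a splittable DoFn in the style of the examples Beam published around this time. Treat it as illustrative only: the exact signatures have changed in later Beam releases, and readLineAt is a hypothetical helper of mine, not a real Beam API.
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

// Illustrative only: a splittable DoFn is a normal DoFn plus a "restriction",
// declared via extra annotated methods such as @GetInitialRestriction.
class ReadFirstLineFn extends DoFn<String, String> {
  // Extra annotated method: the initial restriction for each input element
  // (here, a single "slot" representing the first line of the named file).
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(String fileName) {
    return new OffsetRange(0, 1);
  }

  // The tracker parameter is what makes this @ProcessElement "splittable":
  // the runner may divide the restriction between workers behind the scenes.
  @ProcessElement
  public void process(ProcessContext c, OffsetRangeTracker tracker) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      c.output(readLineAt(c.element(), i));
    }
  }

  private static String readLineAt(String fileName, long index) {
    // Hypothetical helper: open fileName and return line number `index`.
    throw new UnsupportedOperationException("illustrative sketch only");
  }
}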