Beam Code for sending a Pubsub Message after Write

Categories: Cloud

Introduction

Sometimes it is useful to run a Beam application to process data, and then send a Google Pubsub message after that data has been written to Google Cloud Storage or to BigQuery. This allows “further processing” of the data to be triggered. The following code fragments show one way to achieve this.

Although this is technically a Beam-related article, it is probably only relevant when developing in the Google environment (in which case the Beam code is probably going to be run on Google Dataflow).

Sending a Pubsub Message After File Write

Example code:


import java.util.Collections;
import java.util.Map;

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class .... {
    /** Map a filename to a PubsubMessage object holding that filename as an attribute. */
    private static PubsubMessage buildMessageForFile(String filename) {
        // When processing Pubsub messages in a cloud-function, param "event" provides the
        // message. Property "event.data" returns the data field given here, while
        // "event.attributes.somekey" returns the values from the attributes map used here.
        //
        // Here, we pass the filename in an event attribute, "event.attributes.filename".
        Map<String, String> attrs = Collections.singletonMap("filename", filename);
        // Use an empty payload rather than null (some Beam versions reject a null payload);
        // the filename travels in the attribute.
        return new PubsubMessage(new byte[0], attrs);
    }

    public static void main(String[] args) {
        ....
        someCollection
                .apply(TextIO.write().to(options.getOutput()).withoutSharding().withOutputFilenames())
                .getPerDestinationOutputFilenames()
                .apply(MapElements.into(TypeDescriptor.of(PubsubMessage.class))
                        .via(f -> buildMessageForFile(f.getValue())))
                .apply(PubsubIO.writeMessages().to(options.getTopicName()));
        ....
    }
}

The target topic must be created before writing to it; this can be done with the gcloud commandline tool:

  • gcloud pubsub topics create {topicname}

The contents of the topic can be viewed (ie to test whether the above code actually worked) by creating a subscription on the topic and pulling from it:

  • gcloud pubsub subscriptions create {subname} --topic {topicname}
  • gcloud pubsub subscriptions pull {subname}

Sending a Pubsub Message After BigQuery Write

This is somewhat trickier than the file case. The TextIO.write() transform produces a collection of the names of the files that were created, and the presence of a filename indicates that the corresponding file has been written. BigQueryIO.write() instead produces a WriteResult object; this is not a PCollection, ie it does not (directly) represent a single stream of output values, and so apply cannot be invoked directly on it. The WriteResult object currently exposes only a single property:

  • failedInserts (via method getFailedInserts()): a collection of records which “failed to be inserted”.
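
A minimal sketch of obtaining the WriteResult (the rows collection, the table spec and the choice of write-method here are illustrative assumptions):

// Sketch only: "rows" is an assumed PCollection<TableRow>; the table spec is illustrative.
WriteResult result = rows.apply(
        BigQueryIO.writeTableRows()
                .to("myproject:mydataset.mytable")
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS));

// WriteResult is not a PCollection; the only collection it exposes is the failed inserts.
PCollection<TableRow> failedInserts = result.getFailedInserts();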

We really want to trigger sending of the message when all data in the window (the global window, for batch processing) has been written, regardless of whether failures were present. And we only want it sent once, whether zero or multiple failed records are present.

There are two possible approaches to this:

  • use Wait.on() (which I have tested)
  • use the failure-list as a “side input” (seems theoretically possible but not tested)

The Wait.on transform (available since Beam 2.3.0) is designed for exactly this scenario: block processing of one collection until the window of another collection has completed. The message to send is presumably generated from some summary of the collection of records sent to BigQuery, eg min/max record ids. Wait.on can be used to “block” the transform that sends this summary to pubsub until the write-transform has completed.
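
A minimal sketch of the Wait.on approach, reusing result and rows from the snippet above; using a plain row-count as the “summary”, and the option getTopicName(), are assumptions:

// Sketch only: "rows", "result" and "options" are assumed, as in the earlier snippets.
rows
        // Reduce the written data to one summary element per window (here, a simple count).
        .apply(Count.globally())
        // Hold that element back until the BigQuery write's output collection is complete.
        .apply(Wait.on(result.getFailedInserts()))
        // Wrap the summary in a Pubsub message.
        .apply(MapElements.into(TypeDescriptor.of(PubsubMessage.class))
                .via(n -> new PubsubMessage(
                        ("rowcount=" + n).getBytes(StandardCharsets.UTF_8),
                        Collections.emptyMap())))
        .apply(PubsubIO.writeMessages().to(options.getTopicName()));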

Alternatively, a transform which uses some collection as a “side input” will wait until that side-input is complete. The failure-list could therefore be transformed into a count of failed records (or some other guaranteed-small collection), and then this (small) collection converted to a PCollectionView and used as a side-input to the transform that sends the pubsub message.
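
A sketch of the side-input variant (untested, as noted above); the one-element trigger collection and all names are assumptions:

// Sketch only (untested): reduce the failure list to a single count, exposed as a view.
PCollectionView<Long> failedCount = result.getFailedInserts()
        .apply(Count.globally())
        .apply(View.asSingleton());

// A one-element trigger collection; the side input forces this ParDo to wait until
// the failure count (and hence the BigQuery write) is complete.
pipeline.apply(Create.of("trigger"))
        .apply(ParDo.of(new DoFn<String, PubsubMessage>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Map<String, String> attrs = Collections.singletonMap(
                        "failedInserts", c.sideInput(failedCount).toString());
                c.output(new PubsubMessage(new byte[0], attrs));
            }
        }).withSideInputs(failedCount))
        .apply(PubsubIO.writeMessages().to(options.getTopicName()));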

There is one further problem: when using BigQueryIO.write().withMethod(Method.STREAMING_INSERTS), the above works fine. However, in Beam 2.3.0, when using Method.FILE_LOADS, the failedInserts collection is actually a “dummy” collection which is immediately “complete”, even before the file-loading is executed. An enhancement-request has been filed to fix this, but until that happens STREAMING_INSERTS must be used if it is necessary to delay sending of the pubsub message (or whatever other processing you require) until after loading into BigQuery has completed.

Bugreports

The following bugreports may be relevant to the topic of send-after-write: