Kafka Serialization and the Schema Registry

Categories: BigData

Introduction

I’ve already written about the Apache Kafka Message Broker. It is a fine tool, and very widely used. The Kafka Connect extension helps in importing messages from external systems, or exporting messages to them, and is also excellent.

This article looks at best practices for representing data-structures passing through a system as messages in a Kafka topic - ie how meaningful data-structures can be serialized to a Kafka message. It is assumed that you have read the above articles on Kafka and Kafka Connect (or already understand them well).

This article is based on Kafka 0.10.0.

Confluent and Kafka

The Apache Kafka project provides the Kafka broker, Kafka Connect, and Kafka Streams.

The company Confluent was founded by one of the primary inventors of the Kafka broker. It provides commercial support for Kafka-related products, provides a few commercial add-ons for Kafka (in particular the Confluent Control Center web UI for monitoring and configuration), and also develops/hosts a number of open source projects.

Relevant for this article is that Confluent hosts (and primarily develops) the open-source Schema Registry, and some open-source “converters” for Kafka Connect which communicate with it. These tools are very useful on their own - though there is no UI provided (that is part of the commercial Confluent Control Center product).

I am not an employee of Confluent, nor do I have any financial interest in promoting their products or anything Kafka-related. I just think Kafka is excellent, and the schema registry is too.

Purpose of the Schema Registry

In the Kafka message broker itself, each message consists simply of a (key, value) pair where both key and value are plain byte-arrays. Some Kafka command-line tools (eg the kafka-console-consumer.sh script) will simply treat a key or value byte-array as a UTF8 string and render it as such on the console. However that is as far as the standard Kafka tooling goes in “interpreting” key or value contents.

When someone needs to know what each topic holds (the purpose of the data, and its actual format) then they can potentially ask the development team for the applications which read and write those topics - or read the code. That’s fairly clumsy, but probably works for small projects. When the Kafka Broker cluster has scaled to hold dozens or hundreds of topics, read and written by dozens or hundreds of applications, then that just doesn’t scale - it is clear to outsiders at that point that a lot of very useful data is flowing through the system, but exactly what it is and how it is formatted will be very hard to determine.

Requiring documentation for each topic to be added to a central point (eg a wiki page for the Kafka cluster) might work for a while. However we all know that eventually projects will be deployed without the documentation being updated - or without any ever being written at all. And a simple documentation site cannot detect accidental incompatible changes to the data-format for a topic - the problem only becomes visible when the applications consuming that data break.

The Schema Registry is intended to solve these problems by providing a central point for registering documentation and schemas for each topic - and by having compliant Kafka producers and consumers (of which connectors are a special kind) validate the data they write or read against these schemas, so that incompatibilities are detected immediately. The schema registry only supports AVRO schemas, ie it requires the data in a Kafka topic to be in AVRO format.

Having schemas associated with topics is comparable to having relational schemas associated with tables. I’m sure everyone would agree that being able to look at the schema for an arbitrary table is very useful when writing new programs which access existing data. It’s also good to know that data can only be inserted into a relational table if it is compliant with the schema for that table. Using the schema registry with Kafka provides both of these benefits to the Kafka world - data is introspectable and guaranteed consistent. There is a slight difference, however - different records within a Kafka topic can potentially be associated with different versions of the same schema. The schema registry handles this by allocating a unique ID to each version of each schema, and by tagging each record with the relevant (small) ID; the reader can then retrieve the full schema when needed.

The Schema Registry itself is open-source, and available via Github. However the Web UI for browsing its contents in an interactive manner is part of the Confluent enterprise package (license required). Even without the UI, the registry can be very useful - and if your system is so successful that browsing of schemas becomes important, then maybe licensing the necessary software is reasonable.

To understand how the Schema Registry can be used together with Kafka Connect, it is necessary to cover three topics:

  • serialization and deserialization in Kafka producers and consumers
  • using the Schema Registry with standard Kafka producers and consumers
  • using the Schema Registry with Kafka Connect source and sink connectors

Kafka Client Library Serialization and Deserialization

Normal Kafka “producer” applications use the KafkaProducer API to pass (Object key, Object value) pairs to the Kafka client library. The Kafka client library then applies configurable serializers to these key and value objects to convert them to byte-arrays. Not all serializers support all kinds of objects, ie the objects that the application produces must be compatible with the serializers configured. The standard serializers included in the core Kafka client jarfiles include:

  • ByteArraySerializer – a trivial pass-through serializer which requires the object to already be a byte-array
  • StringSerializer – an almost equally trivial serializer which requires the object to be a String, and UTF8-encodes it
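
For orientation, configuring one of these serializers and sending a message looks roughly like the following sketch (the broker address and topic name are placeholders, not taken from the article):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  // Minimal sketch: a producer using the trivial StringSerializer for both key and value.
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("demo-topic", "some-key", "some-value"));
  }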

Additional serializers are provided in separate (but still standard) jars:

  • JSONSerializer (requires maven artifact org.apache.kafka:connect-json) - note that this is actually a Kafka Connect artifact, providing both a Kafka serializer and a Kafka Connect Converter (see later)

The JSONSerializer expects the object to be a Map<String, ?> whose values may themselves be maps, ie a tree-structured object may be passed. It also accepts plain strings, in which case it behaves like the StringSerializer.

And the company Confluent provides an AVRO serializer:

  • KafkaAvroSerializer (requires maven artifact io.confluent:kafka-avro-serializer)

The reason why this class is called KafkaAvroSerializer rather than simply AvroSerializer is probably to avoid a naming conflict with a class from the Avro library.

The source-code for the KafkaAvroSerializer is open-source, but the binary is not available from the standard Maven central repository (Confluent publishes its artifacts via its own Maven repository). The kafka-avro-serializer artifact is part of the “schema registry” project at the confluent github site. The easiest way to obtain the binary artifact is to build it locally:

  git clone https://github.com/confluentinc/schema-registry.git
  cd schema-registry
  git checkout v3.2.0
  mvn install -DskipTests

The KafkaAvroSerializer expects the objects passed to it to be a subtype of org.apache.avro.generic.IndexedRecord which contains a reference to a schema as well as the data. The schema part describes which datafields may/must be present in the data, and serialization will fail if the data is not compliant with the schema - thus preventing a buggy producer from writing non-compliant data to a topic. The objects passed may be “dynamic” (defined in a map-like manner by adding name/value pairs to an org.apache.avro.generic.GenericRecord) or AVRO tools may be used to generate data-holder classes from an AVRO schema and these classes then used in the producer code.

The steps required to use the AVRO dynamic approach in a producer are (a code sketch follows the list):

  • include the kafka-avro-serializer jarfile in the classpath
  • in the configuration properties used to initialise the Producer object, specify KafkaAvroSerializer as the key and/or value serializer (the key.serializer/value.serializer properties)
  • (once) define an instance of org.apache.avro.Schema, giving a schema-name and defining which fields may/must be present in a valid record (eg by loading a schema-definition from a textfile in the classpath)
  • for each output record
    • call new org.apache.avro.generic.GenericData.Record(schema) to create a GenericRecord instance referencing the schema (this is a subtype of IndexedRecord)
    • add (name, value) pairs to the GenericRecord
    • call KafkaProducer.send(...) passing a ProducerRecord which wraps the GenericRecord object
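
A minimal sketch of the dynamic approach; the topic name, schema and addresses are placeholders chosen for illustration, and note that the KafkaAvroSerializer also needs a schema.registry.url setting (the registry is discussed further below):

  import java.util.Properties;
  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
  props.put("schema.registry.url", "http://localhost:8081");

  // The schema would normally be loaded from a textfile in the classpath; inlined here for brevity.
  Schema schema = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"SensorReading\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"},"
      + "{\"name\":\"value\",\"type\":\"double\"}]}");

  try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
      GenericRecord record = new GenericData.Record(schema);   // a subtype of IndexedRecord
      record.put("id", "sensor-42");
      record.put("value", 23.5);
      producer.send(new ProducerRecord<>("sensor-readings", "sensor-42", record));
  }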

The steps required to use the AVRO “generated code” approach are simpler (again, see the sketch after the list):

  • define an AVRO schema as a textfile
  • use AVRO tools to generate java classes corresponding to that schema (which will be subtypes of AVRO’s SpecificRecordBase and thus IndexedRecord, though that is not relevant to the code which uses them)
  • include the kafka-avro-serializer jarfile in the classpath
  • in the configuration properties used to initialise the Producer object, specify KafkaAvroSerializer as the key and/or value serializer
  • for each output record
    • call new SomeGeneratedType(..) to create an instance of a desired class which was generated from the schema
    • call setters on the object in the usual Java manner
    • call KafkaProducer.send(...) passing a ProducerRecord which wraps the (strongly typed) object
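
Assuming the AVRO tools generated a class SensorReading from the schema shown earlier (the class name is purely illustrative), the per-record code shrinks to something like this, with the producer configured exactly as in the previous sketch but typed as KafkaProducer<String, SensorReading>:

  // SensorReading is a hypothetical class generated by the AVRO tools from the schema.
  SensorReading reading = new SensorReading();
  reading.setId("sensor-42");
  reading.setValue(23.5);
  producer.send(new ProducerRecord<>("sensor-readings", "sensor-42", reading));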

In the dynamic case, the producer code includes a lot of calls to AVRO-specific APIs. With just one or two calls more, the AVRO serialization to byte-array could also be done in the producer code, and then that byte-array passed to the Kafka client library, with the simple ByteArraySerializer being configured. In other words, in the dynamic case, pushing the AVRO serialization down into the Kafka client framework doesn’t help much.

In the static case, however, the custom producer has no direct calls to AVRO at all, and thus delegating the actual serialization to AVRO makes the code cleaner.

And when we look at the Schema Registry integration later, using a KafkaAvroSerializer makes more sense - integration with the Schema Registry can be added transparently to the custom producer code.

So far, we have only looked at the producer side. However using a deserializer with a consumer is pretty much just the reverse. A Consumer object is initialised with a set of properties, including key.deserializer/value.deserializer settings whose classes must match the format of data in the input topic. The Kafka client library then passes objects to the consumer code whose type depends on the deserializer configured - eg a map-of-maps for JSON, or GenericRecord instances for AVRO. When using AVRO with generated code, the deserializer can be told to return instances of the concrete generated types instead, hiding most AVRO details from the custom consumer code.
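
A matching consumer sketch, using the dynamic (GenericRecord) form; the group id, topic and addresses are again placeholders:

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "sensor-reader");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
  props.put("schema.registry.url", "http://localhost:8081");
  // props.put("specific.avro.reader", "true");  // uncomment when using AVRO-generated classes

  try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("sensor-readings"));
      while (true) {
          for (ConsumerRecord<String, GenericRecord> r : consumer.poll(1000)) {
              System.out.println(r.key() + " -> " + r.value().get("value"));
          }
      }
  }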

(In the dynamic case, the consumer does not need to register a schema at all: as described in the next section, the deserializer reads a schema id embedded in each message and fetches the corresponding writer schema from the registry, returning a GenericRecord built against it.)

Note that AVRO-serialized data may optionally be written with schema information embedded alongside it (this is configurable). When AVRO is used to write many objects to a single OutputStream, the schema is often embedded in that stream. However for the Kafka use-case (very many independent messages all using the same schema) that is just too inefficient. The disadvantage of not including the schema with each message is that each consumer must simply parse the data using its own schema definition, in the hope that the data really is compatible. The Schema Registry solves this issue by storing the schema in a central place (once) and including just a small “schema id” in each message written to Kafka.

The trivial serializers (ByteArraySerializer, StringSerializer) write the raw bytes with no framing at all, and the JSON serializer simply writes UTF8-encoded JSON text; nothing in the message itself records which serializer produced it. A consumer (or a tool like kafka-console-consumer.sh) must therefore be configured with a deserializer or formatter matching the format of the topic it reads. The KafkaAvroSerializer is the exception: as described in the next section, it prepends a small header to each key or value it writes, identifying the schema that was used.

Standard Producers/Consumers and the Schema Registry

The KafkaAvroSerializer has support for the Confluent Schema Registry built in; it just needs to be pointed at a registry instance via the schema.registry.url configuration property.

Once configured, the serializer takes the schema referenced by each record and ensures it is registered for the target topic - using a cached schema id if it has seen that schema before, and otherwise contacting the remote registry, which either returns the id of an already-registered identical schema or registers the schema as a new version. The registry thus maintains a list of schema versions per topic - the “current schema” and previous ones - and each version has a unique id.

When a new schema version is registered, the registry checks it for compatibility with the existing versions. Compatible evolution (eg adding a new field with a default value) is accepted; incompatible changes are rejected, which surfaces as a serialization error in the producer - thus preventing a producer from writing data that is not compliant with the schema history for that topic.

For each message, the serializer writes a small header containing (magic-byte, schema-id), followed by the AVRO-encoded data.

Note: the producer never specifies a schema id explicitly; the serializer obtains the id from the registry (or its cache) based on the record’s schema and the target topic.

Caching works well because a registered schema version is immutable - new versions may be added for a topic, but a given id always refers to exactly the same schema.
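
For illustration, the (magic-byte, schema-id) header just described can be decoded by hand; a sketch, where messageValueBytes is a placeholder for the raw bytes of one message value (in practice the KafkaAvroDeserializer does all of this for you):

  import java.nio.ByteBuffer;

  // Sketch only: decode the Confluent wire format header manually.
  ByteBuffer buffer = ByteBuffer.wrap(messageValueBytes);
  byte magic = buffer.get();       // magic byte, currently 0
  int schemaId = buffer.getInt();  // 4-byte schema id, used to fetch the writer schema from the registry
  // the remaining bytes are the AVRO binary encoding of the record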

There are many advantages to using serialized datastructures for keys and values, and to registering their schemas in a central repository, rather than using a custom/ad-hoc encoding for messages. In particular, this makes it easy to develop new Kafka producers which write to the same topics, or consumers which read from existing topics - exactly what data is in the topic is clearly defined by the schema. Without the registry, a developer would need to read the source-code of an existing producer or consumer for that topic to understand its content.

The Schema Registry provides a central place to administer the AVRO schema associated with each topic. It manages only AVRO schemas; JSON serializers/converters cannot take advantage of it. The Schema Registry server provides a REST API for browsing schemas. The API can also be used to register new schema versions - but will reject updates where the new schema is not compatible with the current one. The actual schemas are stored in a Kafka topic, meaning the Schema Registry itself is stateless - and so multiple instances can be started for high availability (clients configured to use the Schema Registry will fail if no instance is reachable).
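
As a quick illustration, the registry organises schemas into “subjects” (by default named <topic>-key and <topic>-value), and the list of registered subjects can be fetched over the REST API. A minimal sketch, assuming the registry runs at localhost:8081:

  import java.io.BufferedReader;
  import java.io.InputStreamReader;
  import java.net.URL;

  // Sketch: list all registered subjects via the Schema Registry REST API.
  try (BufferedReader in = new BufferedReader(
          new InputStreamReader(new URL("http://localhost:8081/subjects").openStream(), "UTF-8"))) {
      System.out.println(in.readLine());  // eg ["sensor-readings-key","sensor-readings-value"]
  }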

Kafka Connect and the Schema Registry

Kafka Connect takes an opinionated approach to data-formats in topics; its design strongly encourages writing serialized datastructures into the key and value fields of a message. In particular, the documentation strongly recommends using AVRO schemas for keys and values, and (somewhat reluctantly) also supports JSON serialization. If you really insist, you can use the ByteArraySerializer or StringSerializer, but the documentation and examples do not cover this well. Support for structured data makes sense when moving data to and from storage such as relational databases, non-relational databases, or Hive tables backed by Parquet/ORC files. It is less useful when exchanging data with sensors or legacy IT systems, where converting the data into a structured form is significant work that is perhaps better done somewhere other than a connector.

Kafka Connect uses a similar mechanism to the serializers of the Kafka client library, but calls these components “converters”. A Kafka Connect converter class is usually just a thin wrapper around the corresponding Serializer and Deserializer classes. In fact:

  • artifact org.apache.kafka:connect-json provides both JSONSerializer and JSONConverter (which is a wrapper around JSONSerializer)
  • artifact io.confluent:kafka-avro-converter provides class AvroConverter (a wrapper around KafkaAvroSerializer) - and has a dependency on artifact kafka-avro-serializer

In the Kafka client library, each Producer or Consumer instance is configured with its own properties, including its own serializer/deserializer. However a Kafka Connect worker instance “hosts” multiple connectors, and the global worker configuration properties define default converters (one for message keys, one for message values) to be applied to each connector (ie to all messages written to the Kafka message broker, or read from it).

In Kafka Connect versions prior to 0.10.1.0, this default converter configuration cannot be overridden - it is identical for all connectors in that worker instance (and should be identical for all workers in a cluster). This works acceptably when all reading and writing of data is done by Kafka Connect sources and sinks. Sadly, it means that if some Kafka topics really must contain JSON format messages, then that representation is forced on all topics accessed by that Kafka Connect cluster. Similarly, if some Kafka producer or consumer application requires “raw” messages not in JSON or AVRO format, then that representation is also forced on all topics managed by the Kafka Connect cluster. Kafka Connect 0.10.1.0 introduces the ability to override the global default converters on a per-connector basis, by defining the converters to be used in the per-connector configuration; see this ticket and this commit.

Kafka Connect offers custom sources and sinks a slightly different API for writing and reading messages than the plain Kafka client library:

  • instead of calling Producer.send(ProducerRecord), sources must implement a poll method which returns List<SourceRecord>
  • instead of calling Consumer.poll(..) and receiving ConsumerRecords, sinks receive a call to put(Collection<SinkRecord>)

The SourceRecord/SinkRecord interfaces are more abstract than ProducerRecord/ConsumerRecord, and force more “policy” on connector code than the Kafka broker does on producers/consumers. In particular, schemas are only present in plain Kafka producer applications if the ProducerRecords being generated are wrapping AVRO IndexedRecord instances. However in Kafka Connect, the SourceRecord type (equivalent to ProducerRecord) has explicit keySchema and valueSchema parameters; opting out of “strict data validation” is much harder to do.

A SourceRecord includes (topic, keyschema, keyobject, valueschema, valueobject) fields. The (keyschema, keyobject) pair is passed to the configured “key converter” and the (valueschema, valueobject) pair to the “value converter”. Connect uses its own Schema type (org.apache.kafka.connect.data.Schema) to abstract away which serialization method is eventually used (eg JSON or AVRO); in both cases the schema is defined in code and must match the associated data. Unlike the plain client library, Connect offers a wrapper class Struct for representing structured data in dynamic form - so custom connector code does not need to deal with AVRO GenericRecords directly. Structs are equally usable with the JSON converter.
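
As an illustration, a source task’s poll() might build a record like this; the schema, topic and field names are invented for the example, and the configured converter then takes care of the actual serialization:

  import java.util.Collections;
  import java.util.List;
  import java.util.Map;
  import org.apache.kafka.connect.data.Schema;
  import org.apache.kafka.connect.data.SchemaBuilder;
  import org.apache.kafka.connect.data.Struct;
  import org.apache.kafka.connect.source.SourceRecord;

  private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().name("SensorReading")
      .field("id", Schema.STRING_SCHEMA)
      .field("value", Schema.FLOAT64_SCHEMA)
      .build();

  @Override
  public List<SourceRecord> poll() {
      Struct value = new Struct(VALUE_SCHEMA)
          .put("id", "sensor-42")
          .put("value", 23.5);
      Map<String, ?> sourcePartition = Collections.singletonMap("source", "demo");
      Map<String, ?> sourceOffset = Collections.singletonMap("position", 0L);
      return Collections.singletonList(new SourceRecord(
          sourcePartition, sourceOffset, "sensor-readings",
          Schema.STRING_SCHEMA, "sensor-42",     // key schema and key
          VALUE_SCHEMA, value));                 // value schema and value
  }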

As with plain Kafka producers, the Connect framework uses the converter (serializer) to verify that the data is consistent with the declared schema. And as with plain Kafka, when the converter is the AvroConverter (wrapping KafkaAvroSerializer) and it is pointed at the schema registry, the record’s schema is registered with (or looked up in) the registry during serialization, and the resulting schema id is embedded in the generated output.

In a sink connector, the process is reversed; Kafka Connect pulls a message (as byte-array) from a topic then uses the configured converter to parse it into an appropriate form (usually a Struct instance) before passing it to the sink’s task object for processing.
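
The corresponding sink-side sketch: the task receives already-converted records and only has to deal with Connect’s data types (the field names match the earlier examples and are illustrative only):

  import java.util.Collection;
  import org.apache.kafka.connect.data.Struct;
  import org.apache.kafka.connect.sink.SinkRecord;

  @Override
  public void put(Collection<SinkRecord> records) {
      for (SinkRecord record : records) {
          Struct value = (Struct) record.value();   // the converter has already parsed the raw bytes
          String id = value.getString("id");
          Double reading = value.getFloat64("value");
          // ... write (id, reading) to the target system ...
      }
  }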

A nice thing about the converters feature is that it is transparent to the connector/task code whether AVRO and the Schema Registry are being used or not; the code generates a Schema object declaring how it intends to format messages (sources) or how it expects messages to have been formatted (sinks), and the converters take care of the rest. When the schema registry is activated, then additional validation and visibility is available, but it can be enabled/disabled at any time - or encoding can be switched to JSON if desired.

Partitioning Data

Note that messages from source-connectors are by default partitioned via hash(message-key) as with any Kafka producer. Care must therefore be taken when building a message-key if a specific partitioning of the output is desired, and the effects of the converters for message-keys should be taken into account. The Kafka Connect “transforms” API (added in v0.10.2.0) allows the code to generate a complex-structured message-value, and connector configuration to map specific fields from that message-value into the message key. The result is that the administrator deploying a connector can configure partitioning of outputs rather than it being fully defined by the connector implementation.

The KafkaProducer API accepts ProducerRecords which have an explicit parameter allowing the target partition for a message to be specified. The Kafka Connect SourceRecord API does not have this option - as far as I know, routing is always done on the message-key.

Propagating Schema Metadata

One interesting feature of using schemas can be demonstrated by combining a source JDBCConnector with a sink HiveConnector. The JDBC connector is capable of retrieving the metadata for the table it is loading from, and registering this metadata automatically as a schema (if it does not already exist). Records from the source relational table are then tagged with this schema-id when they are written to Kafka. The Hive connector is capable of retrieving the schema for incoming records (cached for performance, of course), detecting columns missing from the target Hive table, and automatically declaring them.

Tools like Sqoop, Flume or LogStash just don’t do that sort of thing.

References

For more reading on serialization and the schema registry: