Kafka Serialization and the Schema Registry

Categories: BigData

Introduction

I’ve already written about the Apache Kafka Message Broker. It is a fine tool, and very widely used. The Kafka Connect extension helps in importing messages from external systems, or exporting messages to them, and is also excellent.

This article looks at best practices for representing data-structures passing through a system as messages in a Kafka topic - ie how meaningful data-structures can be serialized to a Kafka message. It is assumed that you have read the above articles on Kafka and Kafka Connect (or already understand them well).

This article is based on Kafka 0.10.0.

Confluent and Kafka

The Apache Kafka project provides the Kafka broker, Kafka Connect, and Kafka Streams.

The company Confluent was founded by one of the primary inventors of the Kafka broker. It provides commercial support for Kafka-related products, provides a few commercial add-ons for Kafka (in particular the Confluent Control Center web UI for monitoring and configuration), and also develops/hosts a number of open source projects.

Relevant for this article is that Confluent hosts (and primarily develops) the open-source Schema Registry, and some open-source “converters” for Kafka Connect which communicate with it. These tools are very useful on their own - though there is no UI provided (that is part of the commercial Confluent Control Center product).

I am not an employee of Confluent, nor do I have any financial interest in promoting their products or anything Kafka-related. I just think Kafka is excellent, and the schema registry is too.

The Message Serialization Problem

In the Kafka message broker itself, each message consists simply of a (key, value) pair where both key and value are plain byte-arrays. Some Kafka command-line tools (eg script kafka-console-consumer.sh) are smart enough to recognise when a key or value byte-array is actually a UTF8 string, and to render it appropriately on the console [1]. However that’s as far as the Kafka message broker goes in “interpreting” key or value contents.

Messages transferred between applications are, however, structured data. There are very few applications which just pass around blocks of bytes whose content they are not interested in; what applications usually pass around are datastructures with fields. The producer of a message must therefore serialize a datastructure into a byte-array before writing to Kafka, and the consumer must deserialize a byte-array into a datastructure before processing each message. Actually, as a Kafka message is a (key, value) pair this process applies to both key and value.

There are two problems consumers need to deal with when deserializing a message:

  • knowing the dataformat
  • handling changes in the dataformat

When someone needs to know what each topic holds (the purpose of the data, and its actual format) then they can potentially ask the development team for the applications which read and write those topics - or read the code. That’s fairly clumsy, but probably works for small projects. When the Kafka Broker cluster has scaled to hold dozens or hundreds of topics, read and written by dozens or hundreds of applications, then that just doesn’t scale - it is clear to outsiders at that point that a lot of very useful data is flowing through the system, but exactly what it is and how it is formatted will be very hard to determine. Requiring documentation for each topic to be added to a central point (eg a wiki page for the Kafka cluster) might work for a while. However we all know that eventually projects will be deployed without updating documentation - or ever writing any at all. And a simple documentation site will not detect errors involving accidental incompatible changes to the data-format for a topic - until the applications consuming that data break.

Applications evolve over time, so the producer of data may need to start writing messages with slightly different format at some time, eg to add a new field to the datastructure written to the Kafka topic. There might also be multiple producer applications writing messages to the same topic with slightly different versions of the same data. This makes life for the consumer(s) even more complicated - they somehow need to know how to deal with messages of different formats on the same topic.

The rest of this article looks at how basic serialization works in the Kafka client library, and then at the KafkaAvroSerializer + Confluent-Schema-Registry approach to solving these problems.

Basic Message Serialization and Deserialization

Producer applications initialize the Kafka client lib with config-options “key.serializer” and “value.serializer” which specify a class implementing the Kafka client Serializer interface. They then invoke method send passing objects of type ProducerRecord<K,V> which have a field “key” of type K and a field “value” of type V; it is a runtime error if the configured “key.serializer” does not handle objects of type K or the “value.serializer” does not handle objects of type V.
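
As a minimal sketch, a producer using the built-in StringSerializer for both key and value might look like the following (broker address and topic name are placeholders):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class ExampleProducer {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      // K and V must match what the configured serializers can handle
      KafkaProducer<String, String> producer = new KafkaProducer<>(props);
      producer.send(new ProducerRecord<>("example-topic", "some-key", "some-value"));
      producer.close();
    }
  }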

Similarly, consumer applications initialize the Kafka client lib with config-options “key.deserializer” and “value.deserializer”. The consumer app then calls poll() which returns objects of type ConsumerRecord<K,V> with a field “key” of type K and a field “value” of type V; it is a runtime error if the configured deserializer encounters data of the incorrect format in the incoming messages, or if the deserializer produces objects which are not of the expected type K or V. There is no standard way for a deserializer to confirm that a message (bytearray) within a topic is of the expected format, ie no fixed header or similar within the message; the deserializer must simply be correctly configured to match the topic serializer.
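
The matching consumer side, again with placeholder settings, might look like this; the configured deserializers must agree with whatever the producer’s serializers wrote:

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class ExampleConsumer {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "example-group");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Collections.singletonList("example-topic"));
      ConsumerRecords<String, String> records = consumer.poll(1000);  // timeout in ms
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.key() + " => " + record.value());
      }
      consumer.close();
    }
  }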

The Kafka client libraries provide only trivial Serializer (and matching Deserializer) implementations:

  • ByteArraySerializer which accepts only byte-arrays as input and returns the data unchanged
  • StringSerializer which accepts only strings and returns the equivalent sequence of UTF8 bytes
  • and some serializer objects for primitive types (eg LongSerializer) which are not of much use for serializing message values, but might occasionally be useful for serializing message keys.

A Serializer implementation is not limited to supporting just one input type, although all the basic serializers do in fact support only their own type.

A producer/consumer pair which want to exchange structured data can do their own serialization, where the producer configures the ByteArraySerializer then maps its data with any desired serialization framework (eg Java native serialization, Protobuf, Kryo, or similar) to a byte-array itself. The consumer would similarly use the ByteArrayDeserializer and then explicitly apply the matching framework to deserialize that byte-array itself. If the intermediate form is Json, then perhaps the Json framework would be used to produce a string, and then the StringSerializer used.
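
A sketch of this do-it-yourself approach, assuming Jackson as the serialization framework and a hypothetical Measurement class as the application datastructure:

  import java.util.Properties;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class DiySerializationProducer {
    // hypothetical application datastructure
    public static class Measurement {
      public String sensorId;
      public double value;
    }

    public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

      Measurement m = new Measurement();
      m.sensorId = "s-001";
      m.value = 21.5;

      // the producer code itself turns the datastructure into bytes ...
      byte[] payload = new ObjectMapper().writeValueAsBytes(m);

      // ... and the ByteArraySerializer passes them through unchanged
      KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
      producer.send(new ProducerRecord<>("measurements", m.sensorId, payload));
      producer.close();
    }
  }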

However rather than embed serialization in the producer/consumer code, it is also possible for the producer to send ProducerRecord objects containing key/value objects with more complex types and then to configure the producer with a corresponding serializer implementation that can handle that more complex type. The effect is the same, but the work is more elegantly split: the producer now only concerns itself with its core functionality and leaves the complexities of serialization to a separate step. This also allows these more complex serializers to be reused across producers - after all, the work of serialization does not depend on any specific business logic. Similarly, the consumer can be written to expect method poll to return whatever the desired deserializer type returns - a datastructure rather than just a byte-array. The core kafka client lib does not (currently) provide such serializers, but additional libraries do.
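
A minimal sketch of such a reusable serializer, again assuming Jackson as the underlying framework (the class name here is illustrative, not part of any standard library):

  import java.util.Map;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.kafka.common.errors.SerializationException;
  import org.apache.kafka.common.serialization.Serializer;

  // Maps any Jackson-serializable object to a byte-array; the producer code
  // itself never deals with bytes.
  public class JacksonSerializer<T> implements Serializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
      // no configuration needed for this sketch
    }

    @Override
    public byte[] serialize(String topic, T data) {
      try {
        return data == null ? null : mapper.writeValueAsBytes(data);
      } catch (Exception e) {
        throw new SerializationException("Failed to serialize record for topic " + topic, e);
      }
    }

    @Override
    public void close() {
    }
  }

A producer configured with this class as its value.serializer can then send ProducerRecord objects holding application datastructures directly.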

A common pattern is for type K or V to somehow embed the associated schema, eg V may be a type holding a (schema, data) pair which the serializer knows how to handle. When the producer is provided with (schema, data) for each message, that component can potentially do interesting things with the schema information (see later). Of course the consumer will need to be aware of such behaviour - but as deserialization is also separated from the consumer business logic, a producer and consumer can successfully exchange datastructures as long as their serializer/deserializer “plugins” are configured to match each other.

The separation of business-logic from serialization/deserialization also provides a point (before serialization, after deserialization) at which Kafka can call out to plugins which then can see the data in its unserialized form and potentially do all sorts of interesting things as messages are being produced/consumed.

Json Serialization/Deserialization

Maven artifact org.apache.kafka:connect-json:${kafka.version} provides class org.apache.kafka.connect.json.JsonSerializer, which implements the Kafka Serializer interface (ie provides a “serialize” method).

The object to serialize must be of type com.fasterxml.jackson.databind.JsonNode. Serialization is always done via a default-configured instance of com.fasterxml.jackson.databind.ObjectMapper. The serializer accepts no schema information, and has no configuration settings.

The same artifact provides a JsonDeserializer class which can be configured for a KafkaConsumer instance. The consumer’s poll method then returns instances of JsonNode. Again, no schema is available and there are no configuration settings.
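
A sketch of a plain producer using the JsonSerializer; the value sent in each ProducerRecord must be a Jackson JsonNode (topic and field names are illustrative):

  import java.util.Properties;
  import com.fasterxml.jackson.databind.JsonNode;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import com.fasterxml.jackson.databind.node.ObjectNode;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class JsonNodeProducer {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer");

      // build the JsonNode to send
      ObjectNode value = new ObjectMapper().createObjectNode();
      value.put("sensorId", "s-001");
      value.put("value", 21.5);

      KafkaProducer<String, JsonNode> producer = new KafkaProducer<>(props);
      producer.send(new ProducerRecord<String, JsonNode>("measurements", "s-001", value));
      producer.close();
    }
  }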

Note that the JsonSerializer and JsonDeserializer are provided in an artifact indicating kafka-connect; this is because the same artifact defines the JsonConverter type for use in kafka-connect. However there is no reason why JsonSerializer/JsonDeserializer cannot be used in a plain producer/consumer application. They don’t provide much functionality (embedding Jackson directly in the app and using ByteArraySerializer is just as easy) but do allow the general concept of separation of serialization to be used.

Avro Serialization/Deserialization and the Schema Registry

Json is not a very efficient way of encoding data. There are several good frameworks for encoding Java objects to binary forms including Protobuf, Kryo and Avro; the only one with an available Kafka serializer/deserializer adapter (as far as I know) is Avro, and that serializer is provided by the company Confluent. Note however that the Avro serializer provided by Confluent is tightly integrated with the Schema Registry (see later) and cannot be used without it.

The KafkaAvroSerializer and KafkaAvroDeserializer classes are provided in maven artifact io.confluent:kafka-avro-serializer:{confluent-platform-version}. Note that the somewhat clumsy names with the Kafka prefix are needed because underneath they depend on a class from the Avro serialization library called AvroSerializer, and reusing the same name would be too confusing. Jackson’s equivalent class is called ObjectMapper so no such name-clash occurs in the JsonSerializer.

Avro (and many similar frameworks) can serialize objects given a (schema-descriptor, data-descriptor) pair, where the data-descriptor is a map-like set of (fieldname, fieldvalue) entries; this mode is called “dynamic serialization”. It is also possible to write a schema ahead-of-time and then generate Java DTO classes from that schema, and then pass instances of those generated classes (which hold a ref to their schema) to Avro for serialization. Avro’s wrapper-type for (schema, data) pairs is called GenericRecord.

When using the static/pre-generated-class approach, the producer and consumer applications do not directly use Avro functionality; they deal in the generated types only. The producer sends ProducerRecord objects whose key and/or value are instances of an Avro-generated type; the KafkaAvroSerializer extracts the schema from the object and then serializes it. On the consumer side, if the generated classes are also available and the Avro deserializer is configured with specific.avro.reader=true, then instances of the corresponding types are returned; otherwise GenericRecord instances are returned. Note that when deserializing into generated types, the generated classes in the classpath carry their own embedded schema - and so the Avro framework has a “reader schema” to use and validate against when deserializing.

When using the dynamic approach, producer applications need to send ProducerRecord objects which contain GenericRecord instances, ie (schema, map-like) pairs. Alternatively, the key or value can simply be an instance of one of the following types, for which the serializer applies a built-in schema:

  • boolean, int, long, float, double, String
  • byte[]

Note that the Kafka-connect Struct type is not supported (see later).

On the consumer side, data that was serialized as a GenericRecord is returned as a GenericRecord, and the other types are returned as-is. Code using the dynamic approach therefore works with the Avro types GenericRecord and IndexedRecord directly.

The only significant config-setting for the KafkaAvroSerializer is schema.registry.url, which is a comma-separated list of schema registry URLs.
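
A sketch of the dynamic approach, with an illustrative schema and placeholder broker/registry addresses:

  import java.util.Properties;
  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class DynamicAvroProducer {
    private static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Measurement\",\"fields\":["
        + "{\"name\":\"sensorId\",\"type\":\"string\"},"
        + "{\"name\":\"value\",\"type\":\"double\"}]}";

    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
      props.put("schema.registry.url", "http://localhost:8081");

      // a GenericRecord is a (schema, field-values) pair built at runtime
      Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
      GenericRecord record = new GenericData.Record(schema);
      record.put("sensorId", "s-001");
      record.put("value", 21.5);

      KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
      producer.send(new ProducerRecord<String, GenericRecord>("measurements", "s-001", record));
      producer.close();
    }
  }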

Note that when using the Avro serializer (or a similar serializer implementation for Protobuf/Kryo/etc), serialization is elegantly separated from the business-logic of producing messages, but the producer still compiles in the schema-definitions (as it embeds a reference in each ProducerRecord object). This static relation is correct - the code really does produce messages of fixed structure, and cannot do otherwise without changing the code. On the consumer side, there is a compiled-in schema only when using Avro-generated classes. However even when using dynamic behaviour on the consumer side, the code still makes assumptions about the incoming data ie there is implicitly a kind of schema.

Just to repeat once more: the ProducerRecord type must embed key/value objects which are either instances of GenericRecord (ie (schema, map-like) pairs), or instances of generated types (which implement IndexedRecord). On the consumer side, the poll method will either return instances of GenericRecord, or instances of generated classes (which implicitly have a link to their schema).

There are two very big differences between the described Json serializer and this Avro serializer: that with Avro the producer specifies a schema (directly or indirectly), and that the producer and consumer integrate with the schema registry. When a record is written by a producer to a specific topic, the KafkaAvroSerializer fetches the “current schema” for the target topic from its local cache - or downloads it from the schema registry if it is not already locally cached. The writing then proceeds with the (writerschema, topicschema) pair - something that Avro serialization supports. When the two schemas are identical, then the behaviour is obvious. When the writerschema is compatible with the topicschema then Avro makes the necessary adaptations (eg discarding fields from the input object which are in the writerschema but not in the topicschema). When the writerschema is incompatible with the topicschema, then serialization fails and the record is not written.

The result is that each message in the kafka topic always contains the ID of the schema which was associated with the topic at the time that the message was written - and only at the cost of a 4-byte schema id rather than a complete copy of the schema. If the schema for a topic needs to be changed, then a new schema (with a new id) becomes the “current schema” for the topic, but the old schema is not modified or removed. Existing messages in the topic are therefore still correctly tagged with the schema that applies to their content.
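
The wire format is simple: a single “magic” marker byte (currently zero), then the 4-byte schema id, then the Avro-encoded data itself. A sketch of peeking at the schema id in a raw message value (the class and method names here are mine, not part of any library):

  import java.nio.ByteBuffer;

  public class SchemaIdPeek {
    public static int schemaIdOf(byte[] rawMessageValue) {
      ByteBuffer buffer = ByteBuffer.wrap(rawMessageValue);
      if (buffer.get() != 0) {      // first byte: "magic" format marker
        throw new IllegalArgumentException("Not in Confluent Avro wire format");
      }
      return buffer.getInt();       // next 4 bytes: the schema id
      // everything after this point is the Avro-encoded record itself
    }
  }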

On the consumer side, as a message is read its schema-id is extracted. If this matches no locally-cached schema then the specified schema is downloaded from the registry and cached. The Avro library then has access to the schema it needs to properly decode the message contents. When deserializing into generated classes in the local classpath, which have their schema associated with them, then schema compatibility can also be verified at that point.
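
A matching consumer sketch using the KafkaAvroDeserializer (again with placeholder addresses); with specific.avro.reader=true it returns generated classes, otherwise GenericRecord instances:

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class DynamicAvroConsumer {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "avro-example");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
      props.put("schema.registry.url", "http://localhost:8081");
      // props.put("specific.avro.reader", "true");  // uncomment to get generated classes back

      KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Collections.singletonList("measurements"));
      for (ConsumerRecord<String, GenericRecord> record : consumer.poll(1000)) {
        System.out.println(record.value().get("sensorId") + " => " + record.value().get("value"));
      }
      consumer.close();
    }
  }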

The KafkaAvroSerializer is open-source, but the binary is not available from any standard Maven repository. The kafka-avro-serializer artifact is part of the “schema registry” project at the confluent github site. The easiest way to obtain the binary artifact is to do the following:

  git clone https://github.com/confluentinc/schema-registry.git
  cd schema-registry
  git checkout v3.2.0
  mvn install -DskipTests

Additional Features of the Schema Registry

As described in the section on the KafkaAvroSerializer, the Schema Registry provides a central point for registering schemas for each topic in order to efficiently bind a schema to each message in kafka without having to embed entire schemas. It also provides an official schema for each topic, preventing producers from writing incompatible data to the topic.

However having the schema for each topic in the registry provides some other useful features. In particular, it is possible to browse the schema registry to see the official schema for each topic - something very useful when there are many topics in a system. Having schemas associated with topics is comparable to having relational schemas associated with tables. I’m sure everyone would agree that being able to look at the schema for an arbitrary table is very useful when writing new programs which access existing data. It’s also good to know that data can only be inserted into a relational table if it is compliant with the schema for that table. Using the schema registry with Kafka provides both of these benefits to the Kafka world - data is introspectable and guaranteed consistent. There is a slight difference, however - different records within a Kafka topic can potentially be associated with different versions of the same schema.

Schemas in the registry are immutable, allowing client applications to cache them efficiently. The current binding from topic to schema can be changed (ie the topic->schemaId mapping) but a registered schema is never modified or deleted.

Note: currently the Confluent schema registry only supports one kind of schema and serialization - Avro.

The Schema Registry itself is open-source, and available via Github. However the web UI for browsing the contents interactively is part of the Confluent enterprise package (license required). Even without the UI, the registry can be very useful - and if your system is so successful that browsing of schemas becomes important, then maybe licensing the necessary software is reasonable.

The Schema Registry manages only Avro schemas; Json serializers/converters cannot take advantage of it. The schema definitions themselves are stored in a compacted Kafka topic, meaning the Schema Registry itself is effectively stateless - and so multiple instances can be started for high availability (workers configured to use the Schema Registry will fail if it is not available).

Kafka Connect Connectors

The Kafka Connect framework for moving data between external storage and Kafka sometimes needs more control over schemas than is provided by the fairly simple Kafka core Serializer and Deserializer interfaces. It therefore defines its own wrapper type, org.apache.kafka.connect.storage.Converter; a Converter has one method for serializing and one for deserializing. Converters are also typically more featureful than the rather single-purpose Serializer and Deserializer implementations presented above.

Although the standard converters are intended for use with Kafka Connect, there is no reason why these cannot be used in a standalone producer or consumer application if desired.

The KafkaAvroSerializer just uses the native schema types from the Avro library to represent schemas. However converters exist for other serialization formats (including Json), and so there is a need for a portable representation of schemas and of map-like data; these types have been added to the Kafka libraries as org.apache.kafka.connect.data.Schema and org.apache.kafka.connect.data.Struct.

The Converter interface is:

void           configure(Map<String, ?> configs, boolean isKey);
byte[]         fromConnectData(String topic, Schema schema, Object value);
SchemaAndValue toConnectData(String topic, byte[] value);

The StringConverter

While there is a trivial Serializer implementation for every primitive type, the only “primitive” converter implementation is StringConverter. However the JsonConverter supports all basic types (including String), so it can be used where needed (with schemas.enable off) to serialize other primitive types.

The StringConverter simply calls toString on every object passed to it - ie it is NOT suitable for many types, including byte-arrays.

The StringConverter ignores schemas.

The JsonConverter

The JsonConverter implementation can be found in the same artifact as JsonSerializer/JsonDeserializer.

While the JsonSerializer supports only JsonNode as an input parameter, JsonConverter can be passed any of the following types, with or without a schema:

  • boolean, byte, short, int, long, float, double, String
  • byte[], ByteBuffer
  • Collection
  • Map
  • Struct (ie the kafka-connect representation of generic map-like data)

The JsonConverter cannot, however, be passed arbitrary Java beans to serialize.

When a schema is provided to the fromConnectData method, then:

  • the value parameter is validated against the specified schema (eg unknown fields in the value cause serialization to be rejected)
  • values (whether direct or embedded in a Map/Struct) may also be of type Date, Timestamp or BigDecimal

A schema is necessary for supporting Date/Timestamp/BigDecimal because Json has only a very limited set of value types. Round-tripping data via Json can therefore lead to type-loss: eg a date must be stored in Json as a string, but then it is not clear when deserializing what the original type was. Similarly, BigDecimals must be represented in Json as strings.

On the consuming end, a schema is also necessary if Date/Timestamp/BigDecimal types are to be deserialized correctly.

The JsonConverter has just one significant configuration option: schemas.enable [2]. When schemas.enable is false then Json serialization happens just as would be expected. When schemas.enable is true then the generated Json is of the form

{
  "schema":{...},
  "payload": ...
}

The schema part allows a consumer to dynamically obtain more information about fields in the Json payload than can be deduced from raw Json - eg whether a Json string should actually be a Date or BigDecimal. It also allows the sender to specify “default values” for fields in the schema, which are applied if the payload does not define them. Method JsonConverter.toConnectData(..) uses exactly this approach to correctly round-trip those tricky date/timestamp/numeric fields.
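
A sketch of using the JsonConverter standalone to round-trip a Struct with the schema envelope enabled (topic and field names are illustrative):

  import java.util.Collections;
  import org.apache.kafka.connect.data.Schema;
  import org.apache.kafka.connect.data.SchemaAndValue;
  import org.apache.kafka.connect.data.SchemaBuilder;
  import org.apache.kafka.connect.data.Struct;
  import org.apache.kafka.connect.json.JsonConverter;

  public class JsonConverterRoundTrip {
    public static void main(String[] args) {
      JsonConverter converter = new JsonConverter();
      converter.configure(Collections.singletonMap("schemas.enable", "true"), false);  // false = value converter

      Schema schema = SchemaBuilder.struct().name("Measurement")
          .field("sensorId", Schema.STRING_SCHEMA)
          .field("value", Schema.FLOAT64_SCHEMA)
          .build();
      Struct struct = new Struct(schema).put("sensorId", "s-001").put("value", 21.5);

      // produces {"schema":{...},"payload":{...}}
      byte[] serialized = converter.fromConnectData("measurements", schema, struct);

      // and back again, recovering both schema and value
      SchemaAndValue roundTripped = converter.toConnectData("measurements", serialized);
      System.out.println(roundTripped.value());
    }
  }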

When schemas.enable is true but null is passed as the schema parameter of method fromConnectData, then the “schema” field of the output Json is just null (no schema is auto-generated).

Of course, when a topic contains many messages with small payloads then enabling schemas can have a significant overhead.

In short, JsonConverter adds a lot of functionality to JsonSerializer. It may therefore be worth using this directly from producer/consumer apps which are not kafka-connect connectors. However given the inefficiency of Json, moving to the Avro serializer or converter is even better.

The AvroConverter

While the Avro serializer class is called KafkaAvroSerializer to avoid a name-clash with a class from the Avro library, the converter has no such problem and so can be named in the obvious way.

Unlike the combined JsonSerializer/JsonConverter artifact, the KafkaAvroSerializer and AvroConverter are in different artifacts; see io.confluent:kafka-connect-avro-converter:{confluent-platform-version}.

Like the JsonConverter, the AvroConverter accepts:

  • boolean, byte, short, int, long, float, double, String
  • byte[], ByteBuffer
  • Collection
  • Map
  • Struct (ie the kafka-connect representation of generic map-like data)

However such data is still encoded in Avro, using the “default schema” which is flexible enough to represent all the above data-types.

Like the KafkaAvroSerializer (because it wraps one), AvroConverter requires a schema registry server. The KafkaAvroSerializer could potentially be implemented like the JsonConverter with “schemas.enable”, embedding the schema in each message - but no such functionality exists, and it is unlikely anyone would bother, given the advantages of having a schema registry.

The KafkaAvroSerializer requires schema information encoded in the Avro types; the AvroConverter takes schema information in the kafka-connect Schema representation and then internally maps that to the Avro equivalent.
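
A sketch of using the AvroConverter directly (outside a Connect worker); like the KafkaAvroSerializer it wraps, it must be given a schema registry URL (all addresses and names here are placeholders):

  import java.util.Collections;
  import org.apache.kafka.connect.data.Schema;
  import org.apache.kafka.connect.data.SchemaBuilder;
  import org.apache.kafka.connect.data.Struct;
  import io.confluent.connect.avro.AvroConverter;

  public class AvroConverterSketch {
    public static void main(String[] args) {
      AvroConverter converter = new AvroConverter();
      converter.configure(
          Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
          false);  // false = this instance converts message values, not keys

      Schema schema = SchemaBuilder.struct().name("Measurement")
          .field("sensorId", Schema.STRING_SCHEMA)
          .field("value", Schema.FLOAT64_SCHEMA)
          .build();
      Struct struct = new Struct(schema).put("sensorId", "s-001").put("value", 21.5);

      // registers/looks up the schema in the registry and emits the compact
      // (schema-id + Avro payload) byte form
      byte[] serialized = converter.fromConnectData("measurements", schema, struct);
    }
  }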

Kafka Connect and the Schema Registry

Kafka Connect takes an opinionated approach to data-formats in topics; its design strongly encourages writing serialized datastructures into the key and value fields of a message. In particular, the documentation strongly recommends using the Avro converter to define schemas for keys and values, and (somewhat reluctantly) also supports the Json converter. If you really insist, you can fall back to raw byte-arrays or strings, but the documentation and examples do not cover this well. Support for structured data makes sense when moving data to and from storage such as relational databases, non-relational databases, or Hive tables in Parquet/ORC format. It is less useful when exchanging data with sensors or legacy IT systems, where converting data into a structured form is significant work that is perhaps better done somewhere other than a connector.

In the Kafka client library, each Producer or Consumer instance is configured with its own properties, including its own serializer/deserializer. However a Kafka Connect worker instance “hosts” multiple connectors, and the global worker configuration properties define default converters (one for message keys, one for message values) to be applied to each connector (ie to all messages written to the Kafka message broker, or read from it).

In Kafka Connect versions prior to 0.10.1.0, this default converter configuration cannot be overridden - it is identical for all connectors in that worker instance (and should be identical for all workers in a cluster). This works acceptably when all reading and writing of data is done by Kafka Connect sources and sinks. Sadly, it means that if some Kafka topics really must contain Json-format messages, then that representation is forced on all topics accessed by that Kafka Connect cluster. Similarly, if some Kafka producer or consumer application requires “raw” messages not in Json or Avro format, then that representation is also forced on all topics managed by the Kafka Connect cluster. Kafka Connect version 0.10.1.0 introduced the ability to override the global default converters on a per-connector basis, by defining the converters to be used in the per-connector configuration; see this ticket and this commit.

Kafka Connect offers custom sources/sinks a slightly different API for writing and reading messages than the broker:

  • instead of calling Producer.send(ProducerRecord), sources implement a poll method which returns List<SourceRecord>
  • instead of calling Consumer.poll() and iterating over the returned ConsumerRecords, sinks receive a call to put(Collection<SinkRecord>)

The SourceRecord/SinkRecord interfaces are more abstract than ProducerRecord/ConsumerRecord, and force more “policy” on connector code than the Kafka broker does on producers/consumers. In particular, schemas are only present in plain Kafka producer applications if the ProducerRecords being generated are wrapping Avro IndexedRecord instances. However in Kafka Connect, the SourceRecord type (equivalent to ProducerRecord) has explicit keySchema and valueSchema parameters; opting out of “strict data validation” is much harder to do.

A SourceRecord includes (topic, keyschema, keyobject, valueschema, valueobject) fields. The (keyschema, keyobject) pair is passed to the configured “key converter” and the (valueschema, valueobject) pair is passed to the “value converter”. Connect uses the standard Schema type to abstract away the details of which serialization method is used (eg Json or Avro); in both cases the schema is defined in code and must match the associated data. Unlike the plain client library, Connect offers the wrapper class Struct for representing “structured” data in dynamic form - thus custom connector code does not need to deal with Avro GenericRecords directly. Structs are also compatible with the JsonConverter.
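
A sketch of what a source task’s poll method might return; in a real connector this would be part of a SourceTask subclass, and all names here are illustrative:

  import java.util.Collections;
  import java.util.List;
  import org.apache.kafka.connect.data.Schema;
  import org.apache.kafka.connect.data.SchemaBuilder;
  import org.apache.kafka.connect.data.Struct;
  import org.apache.kafka.connect.source.SourceRecord;

  public class ExampleSourcePoll {
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().name("Measurement")
        .field("sensorId", Schema.STRING_SCHEMA)
        .field("value", Schema.FLOAT64_SCHEMA)
        .build();

    public List<SourceRecord> poll() {
      Struct value = new Struct(VALUE_SCHEMA).put("sensorId", "s-001").put("value", 21.5);
      return Collections.singletonList(new SourceRecord(
          Collections.singletonMap("source", "sensor-feed"),  // source partition (connector-defined)
          Collections.singletonMap("position", 42L),          // source offset (connector-defined)
          "measurements",                                     // target topic
          Schema.STRING_SCHEMA, "s-001",                      // key schema and key
          VALUE_SCHEMA, value));                              // value schema and value
    }
  }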

As with Kafka producers, the connect framework uses the converter (serializer) to verify that the data is consistent with the declared schema. And like plain Kafka, when the converter is AvroConverter (wrapping KafkaAvroSerializer) and the schema-registry is enabled for it, then the current schema for the target topic is retrieved and used during serialization, and the ID of that schema is embedded in the generated output.

In a sink connector, the process is reversed; Kafka Connect pulls a message (as byte-array) from a topic then uses the configured converter to parse it into an appropriate form (usually a Struct instance) before passing it to the sink’s task object for processing.

A nice thing about the converters feature is that it is transparent to the connector/task code whether Avro and the Schema Registry are being used or not; the code generates a Schema object declaring how it intends to format messages (sources) or how it expects messages to have been formatted (sinks), and the converters take care of the rest. When the schema registry is activated, then additional validation and visibility is available, but it can be enabled/disabled at any time - or encoding can be switched to Json if desired.

Partitioning Data

Note that messages from source-connectors are by default partitioned via hash(message-key) as with any Kafka producer. Care must therefore be taken when building a message-key if a specific partitioning of the output is desired, and the effects of the converters for message-keys should be taken into account. The Kafka Connect “transforms” API (added in v0.10.2.0) allows the code to generate a complex-structured message-value, and connector configuration to map specific fields from that message-value into the message key. The result is that the administrator deploying a connector can configure partitioning of outputs rather than it being fully defined by the connector implementation.

The KafkaProducer API accepts ProducerRecords which have an explicit parameter allowing the target partition for a message to be specified. The Kafka Connect SourceRecord API does not have this option - as far as I know, routing is always done on the message-key.

Propagating Schema Metadata

One interesting feature of using schemas can be demonstrated by combining a source JDBCConnector with a sink HiveConnector. The JDBC connector is capable of retrieving the metadata for the table it is loading from, and registering this metadata automatically as a schema (if it does not already exist). Records from the source relational table are then tagged with this schema-id when they are written to Kafka. The Hive connector is capable of retrieving the schema for incoming records (cached for performance of course), detecting missing columns in the target Hive table and automatically declaring them.

Tools like Sqoop, Flume or LogStash just don’t do that sort of thing.

Using the KafkaAvroSerializer

The steps required to use the Avro “dynamic” approach are well documented in the link above.

The steps required to use the Avro “generated code” approach are simple (a short sketch follows the list):

  • define an Avro schema as a textfile
  • use the Avro tools to generate Java classes corresponding to that schema (these will be subtypes of Avro’s SpecificRecordBase and thus IndexedRecord, though that is not relevant to the code which uses them)
  • include the kafka-avro-serializer jarfile in the classpath
  • in the configuration properties used to initialise the Producer object, specify the KafkaAvroSerializer as the key.serializer and/or value.serializer
  • for each output record
    • call new SomeGeneratedType(..) to create an instance of a desired class which was generated from the schema
    • call setters on the object in the usual Java manner
    • call KafkaProducer.send(...) passing a ProducerRecord which wraps the (strongly typed) object
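
A rough sketch of these steps, assuming the Avro tools generated a class named Measurement with fields sensorId and value (all names here are illustrative):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class GeneratedClassProducer {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
      props.put("schema.registry.url", "http://localhost:8081");

      // Measurement is the (hypothetical) class generated from the Avro schema;
      // it carries its own schema, so no Avro API appears in this code at all.
      Measurement m = new Measurement();
      m.setSensorId("s-001");
      m.setValue(21.5);

      KafkaProducer<String, Measurement> producer = new KafkaProducer<>(props);
      producer.send(new ProducerRecord<String, Measurement>("measurements", m.getSensorId(), m));
      producer.close();
    }
  }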

In the dynamic case, the producer code includes a lot of calls to Avro-specific APIs. With just one or two calls more, the Avro serialization to byte-array could also be done in the producer code, and then that byte-array passed to the Kafka client library, with the simple ByteArraySerializer being configured. In other words, in the dynamic case, pushing the Avro serialization down into the Kafka client framework doesn’t help much.

In the static case, however, the custom producer has no direct calls to Avro at all, and thus delegating the actual serialization to Avro makes the code cleaner.

And when we look at the Schema Registry integration, using a KafkaAvroSerializer makes more sense - integration with the Schema Registry can be added transparently to the custom producer code.

References

For more reading on serialization and the schema registry:

  [1] The console consumer can be explicitly configured with a “formatter” for displaying messages serialized using more than just the StringSerializer.

  [2] Actually, the JsonConverter setting is named “schemas.enable”; in a Kafka Connect worker configuration it appears as “key.converter.schemas.enable” and “value.converter.schemas.enable” because there are separate converters for key and value, and each converter knows whether it is serializing keys or values.