Vagrant, Kafka and Kerberos

Categories: Programming, Cryptography, BigData

Overview

This article is a kind of combined demo; it shows some basic Vagrant usage and also how to set up a virtual machine running Ubuntu Linux, with the following software installed:

  • a Kerberos KDC
  • Zookeeper (just one node)
  • Kafka Message Broker (just one node)
  • Kafka Connect (two nodes)

I use the resulting VM for testing applications which use Kafka and Kafka Connect (whether kerberos-enabled or not), and hope it may be useful to you for similar purposes. It also gives a useful overview of Vagrant, and of the other components listed above.

This environment is obviously not production ready; it is suitable for development uses only.

Vagrant

First, a quick intro to Vagrant for those who have not used it before…

Overview

Vagrant is a fairly simple but useful open-source “virtual machine management” tool. It provides a standard configuration-file format for defining a virtual machine instance, including network ports. This file can then be executed against a range of different hypervisors - local ones such as VirtualBox or KVM, or cloud providers such as AWS.

A vagrantfile defines the basic attributes needed for a VM. Vagrant itself then provides a portable cross-platform and cross-hypervisor commandline tool to create and boot the VM specified in the configuration file, to pause or stop the VM, and to open an SSH console to it (without needing to explicitly configure keys).
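
For illustration, here are the handful of Vagrant commands that cover normal day-to-day use (all run in the directory containing the Vagrantfile):

vagrant up        # create (if necessary), boot and provision the VM
vagrant ssh       # open an ssh session into the running VM
vagrant halt      # shut the VM down; a later "vagrant up" boots it again without re-provisioning
vagrant destroy   # delete the VM entirely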

And very importantly, there is a central repository of open-source operating-system images called Atlas which can be used as the base image for a new VM created by Vagrant.

Vagrant is primarily used to define temporary VMs for test and development rather than to set up production systems. While it is possible to manage small numbers of related VMs via Vagrant, it is not intended for “large scale” VM orchestration; production setups should instead use tools like Hashicorp Terraform, AWS CloudFormation or Red Hat CloudForms.

Vagrant has only very basic support for provisioning the created VM, ie modifying the VM after it has been booted. The vagrantfile specifies one or more “provisioner plugins” to be executed once the target VM is up and running. See later for more information on provisioning.

Note that with Vagrant, each new VM starts from a clean VM image. If complex and time-consuming provisioning is needed, then it will be repeated each time a new instance is created. It is of course possible to generate a custom “base VM image”, but generating such an image is not part of Vagrant’s functionality.

Vagrant Supported Hypervisors

Vagrant supports several VM technologies, ie can launch an image via VirtualBox (multi-platform), Hyper-V (Microsoft), VMware (multi-platform), AWS (Amazon), KVM (Linux) and others.

Some VM technologies (eg VMware, AWS) provide servers through which network commands can be sent to download and start specific VM images. When used with one of these technologies, Vagrant can set up a virtual machine on a remote system. With VM technologies which do not provide such remote management (eg KVM, Virtualbox), Vagrant can only start the VM on the local host on which the vagrant command is executed.

Each hypervisor is supported via a corresponding provider plugin; anyone can write provider plugins for additional hypervisor tools if necessary.

Vagrant Supported Provisioning Tools

Vagrant includes several default provisioner plugins including shellscript, puppet, ansible, and docker.

The shellscript provisioner is the simplest; the specified shellscript is simply executed within the new VM. Typically the script executes “apt” to install software, “useradd” to create users, etc.

The puppet/ansible/etc provisioners simply execute the relevant provisioning software within the VM, using a configuration definition script specified in the Vagrantfile. As with shellscript-provisioning, these tools can download and install software, modify configuration files, add users, and such tasks. See documentation on the various tools for more details.

The docker provisioner plugin is particularly interesting; it installs docker itself within the VM and can then download and deploy docker images specified in the Vagrantfile.

The Vagrant Implementation

Vagrant is implemented in Ruby. It is an open project, but was originally implemented, and is still primarily maintained, by Hashicorp.

Hashicorp provides a number of other tools related to virtual-machine and infrastructure management (most of its newer tools are implemented in Go rather than Ruby). Terraform is effectively Vagrant’s bigger brother: it is intended for managing large numbers of virtual machines and related resources in production environments.

The Target VM: Software to Install

Here are the things that are going to be installed via Vagrant and the “shellscript provisioner” into a VM, for the purpose of testing Kafka.

Kerberos KDC

Kerberos is an extremely well-known and widely used authentication and encryption system.

A KDC is a “key distribution center”; a registry of symmetric keys for users and services which wish to securely communicate with each other.

Often, datacenters that use Kafka do not bother with encryption or authentication, and simply assume the datacenter is “inaccessible” and all users are trustworthy (users have IDs, but these are never verified). If that is not acceptable, ie the cluster needs to be secured, then Zookeeper, Kafka, Kafka Connect and various other BigData components support Kerberos: they can be configured to use a KDC to verify the identity of the remote party as each network connection is opened, and to encrypt data with the associated session key.

In order to have a development/test environment with Kerberos-enabled Kafka, a KDC is needed. The vagrant configuration and associated provisioning scripts presented in this article show how to set up a KDC for development purposes.

When choosing which KDC to install in the VM, I considered four possibilities:

  • Heimdal Kerberos
  • MIT Kerberos
  • Apache Directory Server
  • Kerby

Heimdal Kerberos is well-known, widely used, and available as native packages on most Linux distributions; a simple “apt install” is all that is necessary. Configuring it as a KDC with a local database of users was actually quite simple. Heimdal is also the standard KDC implementation used on MacOS.

MIT Kerberos is probably the most widely-used implementation of Kerberos (apart from the one built into Microsoft Active Directory). It is available as native packages on most Linux distributions. I tried it out, but was unable to set up multiple realms within a single KDC. I also found its configuration format more confusing than Heimdal’s, and its error-logging was very unhelpful. Heimdal was nicer to work with.

Apache Directory Server is a combined Kerberos and LDAP implementation in Java. I looked at it briefly, but installation does not seem to be trivial and the feature-set is overkill for a simple kerberos-enabled Kafka environment for test purposes.

Kerby is a subproject of the Apache Directory Server project, providing a very simple KDC implementation intended for embedding in other software or for use in unit and integration tests that interact with Kerberos. I can imagine such a thing being very useful. However running it as a standalone KDC was not such a success - the project is very young (1.0-RC1 at the current time), documentation is not good, and many features are missing. Setting up Heimdal was significantly easier than setting up Kerby.

By the way, a stack-overflow comment I found claimed that using a local KDC for testing was pointless as DNS configuration was also necessary. This is completely wrong; while a KDC can interact with DNS in some situations, setting up a kerberos-enabled Kafka environment (or using a KDC for other similar purposes) does not need any custom DNS settings.

The Kerberos Client Configuration File

Kerberos client applications (in particular kinit) need a Kerberos configuration file to tell them how to reach the relevant KDC servers. By default, Kerberos client applications use the config file /etc/krb5.conf.

One part of this file is a set of “domain->realm” mappings. These are important for tools like “kerberised ftp”: the command “ftp some.host” needs to use the current TGT to obtain a service ticket for the target service, and must therefore know the principal of that service - but it has only a DNS domain-name. The domain/realm mappings in the Kerberos client config file are what let it map that domain-name to the right realm (and thus the right KDC). For our test VM, we do need to create an /etc/krb5.conf file; however, as we do not need kerberised ftp or any similar tools, the config-file does not need to set up any domain/realm mappings.
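
For concreteness, here is a minimal sketch (not necessarily the exact file from the project) of what kerberos/krb5.conf might contain: it names the two realms created later by the bootstrap script, points both at the local KDC, and deliberately defines no default realm and no domain/realm mappings.

[realms]
    UMDEMO.COM = {
        kdc = localhost
        admin_server = localhost
    }
    UMTEST.COM = {
        kdc = localhost
        admin_server = localhost
    }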

Zookeeper

Zookeeper is a well-known project of the Apache Foundation. It is a kind of distributed database, but a very specialized one; its primary purpose is to provide “synchronization primitives” and basic shared configuration data for a set of external processes.

In production, a Zookeeper installation should consist of at least three nodes. The configuration described here starts only a single Zookeeper node, which works but is a single point of failure; of course that is not a concern in this development environment.

Kafka Message Broker

Obviously, the point of this VM is to install the Kafka message broker. We will install just a single node; that is sufficient for testing producer and consumer applications.

The storage location for topics is configured to be “within” the VM itself, meaning that destroying the VM discards all stored data.

Kafka Connect

The point of this VM is also to install the Kafka Connect “data integration” tool. We will install two nodes running in “distributed mode” so that we can test failover by killing an instance and verifying that the surviving instance takes over the tasks previously allocated to the now-stopped node.
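
As a sketch of how such a failover test can be driven from the host (using the Kafka Connect REST ports 9091/9092 which the Vagrantfile below forwards, and a hypothetical connector named “my-file-source” which is assumed to have been registered already):

curl http://localhost:9091/connectors                          # list connectors via node 1
curl http://localhost:9091/connectors/my-file-source/status    # note which worker owns the tasks

vagrant ssh -c "sudo systemctl stop kafka-connect-1"           # kill node 1

# after the cluster rebalances, the surviving node should report the tasks as its own
curl http://localhost:9092/connectors/my-file-source/status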

Prerequisites

The vagrant tool requires a Vagrantfile as specification for the VM to be created/managed. While most of this file is hypervisor-type-independent, it is possible to provide hypervisor-specific settings too. The following Vagrantfile assumes Virtualbox is being used as the hypervisor, ie

  • the local computer has Vagrant installed
  • the local computer has Virtualbox installed

Actually, the Vagrantfile is not Virtualbox-specific; it does have one Virtualbox-related setting (the amount of RAM to allocate to the new VM), but that is simply ignored if the Vagrantfile is used with a different hypervisor.

Project Directory Structure

When Vagrant is executed, it looks for a configuration file named Vagrantfile in the current directory. The directory in which that Vagrantfile is stored is “exported” automatically to the target VM; from within the VM it is accessible at path “/vagrant”. This is the primary mechanism for providing resources to the new VM.

When the VM is a “local” VM (eg started via Virtualbox or KVM) then the directory is usually “shared” using some hypervisor-specific file-sharing mechanism. When the VM is remote (eg on AWS) or local sharing cannot be set up, then Vagrant sets up a kind of rsync-like communication with the VM so that these resources are still available to the VM at path /vagrant.

In the description below, this approach is used to provide a number of files to the VM. The following structure is actually checked into Git:

+ setup.sh
+ Vagrantfile
+ bootstrap.sh
+ kerberos/
  + krb5.conf
+ kafka/
  + config/
    + zookeeper.properties
    + kafka.properties
    + connect-log4j.properties
    + connect-distributed-1.properties
    + connect-distributed-2.properties
  + systemd/
    + zookeeper.service
    + kafka.service
    + kafka-connect-1.service
    + kafka-connect-2.service

The contents of these files are described later.

Initial Setup

There are some resources we would like to provide to the booted VM, but do not wish to check into version-control. I have therefore defined a file setup.sh which should be executed once to download the necessary resources into the project base directory.

Here is the setup.sh script:

wget http://www-eu.apache.org/dist/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
tar zxf kafka_2.12-0.10.2.0.tgz

As you can see, all it does is download a binary release of Kafka v0.10.2.0 and unpack it. The resulting shellscripts and jarfiles will be copied into the VM later.

The “2.12” number embedded in the filename is the version of Scala with which the Kafka code was compiled; this matters if Scala plugins are used with the installed application, as the Scala ecosystem is not very good at binary compatibility between releases. However the client/broker network protocol does not depend on the Scala version, so the “0.10.2” version is the important part.

The Vagrantfile

Here is the primary configuration file for this vagrant project:

# -*- mode: ruby -*-
# vi: set ft=ruby :

Vagrant.configure("2") do |config|
  config.vm.box = "ubuntu/xenial64"

  # Network
  config.vm.network "private_network", ip: "192.168.33.10"

  # Zookeeper port (binary)
  config.vm.network "forwarded_port", guest: 9099, host: 9099

  # Kafka port (binary)
  # It is important that kafka nodes are forwarded 1:1 without remapping, as
  # kafka clients outside the VM will ask a "bootstrap kafka server" for a
  # list of the (host,port) addresses of all other kafka nodes in the cluster,
  # and these (host,port) addresses must then be valid for the client.
  config.vm.network "forwarded_port", guest: 9095, host: 9095

  # Kafka-connect port (REST)
  config.vm.network "forwarded_port", guest: 9091, host: 9091
  config.vm.network "forwarded_port", guest: 9092, host: 9092

  # Kerberos KDC
  #
  # Allow the host (with a suitable /etc/krb5.conf file) to access the KDC in the VM
  config.vm.network "forwarded_port", guest: 88, host: 9088, protocol: "tcp"
  config.vm.network "forwarded_port", guest: 88, host: 9088, protocol: "udp"

  config.vm.provider "virtualbox" do |vb|
    #   vb.gui = true
    vb.memory = "2048"
  end

  config.vm.synced_folder ".", "/vagrant", type: "rsync"

  config.vm.provision "shell", path: "bootstrap.sh"
end

This file is actually Ruby code; the configure block is a closure which Vagrant executes at runtime. The contents have the following effects:

  • Vagrant.configure("2") do sets up a version-2 configuration specification (which ends at the matching end keyword).
  • config.vm.box specifies which base image from the Atlas image repository the VM will be booted from. This will be downloaded on first run, and cached on the local filesystem.
  • config.vm.network "private_network" specifies the IP address which the VM should be allocated. See section on networking later.
  • config.vm.network "forwarded_port" specifies which ports within the VM should be accessible as “localhost:port” from the host, or “hostaddr:port” from any system with access to the host.
  • config.vm.provider "virtualbox" do specifies options which are to be applied when the hypervisor is Virtualbox. They are ignored when another hypervisor is being used.
  • config.vm.synced_folder forces the use of rsync for host-to-guest sharing, rather than Virtualbox shared folders. See later.
  • config.vm.provision "shell" specifies that after the VM has been booted, the specified shell-script should be executed within the VM environment, as root. Vagrant nicely arranges for any output from this script to be displayed on the console on which the vagrant up command was executed.

Bootstrap Script

The work of actually configuring the system after boot from the base config.vm.box image is done (in this setup) in file bootstrap.sh. This file of course started as very simple and has grown as extra features were added. At some point it makes sense to switch from a simple shell-script to a proper provisioning tool such as Ansible or Puppet, but IMO the script is still simple enough in its current form.

#!/usr/bin/env bash
#
# Ports:
#  zookeeper: 2181
#  kafka: 9092
#  kafka-connect

#-----------------------------------------
echo "provisioning" > /tmp/provisioned.txt
apt-get update

#-----------------------------------------
# Enable Debian "unattended installation" mode (see http://www.microhowto.info/howto/perform_an_unattended_installation_of_a_debian_package.html)
export DEBIAN_FRONTEND=noninteractive

# To set specific answers to install questions:
#   echo "pkgname questionid datatype answer" | debconf-set-selections
#
# To find out the necessary pgkname/questionid/answer values, make an interactive install then
#   debconf-get-selections | grep ...

#-----------------------------------------
# Ensure that cryptography operations do not stall due to lack of entropy
apt-get --yes install haveged

#-----------------------------------------
# Set up a local Kerberos instance, with users.
#
# This setup deliberately does not use the "default realm" to avoid having misconfiguration accidentally
# work by "falling back" to the default realm when that was not really intended. Instead, two realms
# with names "UMDEMO.COM" and "UMTEST.COM" are set up with users.
#
# Heimdal Kerberos is used rather than MIT Kerberos, due to problems with MIT kerberos when configuring
# multiple realms for a single KDC instance.
#
# The following steps which install kafka-broker and kafka-connect do not yet use this KDC, ie it is
# set up but not referenced by default. 

# Install heimdal kerberos servers, with automated answers via debconf-set-selections
echo "krb5-config   krb5-config/kerberos_servers    string  localhost" | debconf-set-selections
echo "krb5-config   krb5-config/default_realm       string  DEFAULT_REALM" | debconf-set-selections
apt-get --yes install heimdal-kdc heimdal-servers

cp /vagrant/kerberos/krb5.conf /etc/krb5.conf
echo "*/admin@UMDEMO.COM all" >> /etc/heimdal-kdc/kadmind.acl
echo "*/admin@UMTEST.COM all" >> /etc/heimdal-kdc/kadmind.acl

kadmin -l init --realm-max-ticket-life=unlimited --realm-max-renewable-life=unlimited UMDEMO.COM
kadmin -l -r UMDEMO.COM add --use-defaults --password=password umdemo/admin
kadmin -l -r UMDEMO.COM add --use-defaults --password=password umdemo1
kadmin -l -r UMDEMO.COM add --use-defaults --password=password broker1
kadmin -l -r UMDEMO.COM add --use-defaults --password=password client1

kadmin -l init --realm-max-ticket-life=unlimited --realm-max-renewable-life=unlimited UMTEST.COM
kadmin -l -r UMTEST.COM add --use-defaults --password=password umtest/admin
kadmin -l -r UMTEST.COM add --use-defaults --password=password umtest1

kadmin -l list '*'

service heimdal-kdc restart
journalctl -u heimdal-kdc

#-----------------------------------------
echo "installing Java" >> /tmp/provisioned.txt
apt-get --yes install openjdk-8-jdk-headless

#-----------------------------------------
echo "install Kafka" >> /tmp/provisioned.txt
adduser --quiet --system --no-create-home kafka

# Make directory for logging output (might not be used)
mkdir /var/log/kafka-connect
chown -R kafka /var/log/kafka-connect

# Make directory for application code
mkdir /opt/kafka

# Copy kafka binaries and scripts
cp -r /vagrant/kafka_2.12-* /opt/kafka
rm /opt/kafka/*.tgz*
ln -s /opt/kafka/kafka_2.12-* /opt/kafka/current

# Copy kafka configuration files
cp -r /vagrant/kafka/config /opt/kafka/config

# Copy custom kafka-connect plugins
cp -r /vagrant/kafka/plugins /opt/kafka/plugins

# Set ownership
chown -R kafka /opt/kafka

# Set up systemd services for zookeeper and kafka
echo "install kafka systemd" >> /tmp/provisioned.txt
cp /vagrant/kafka/systemd/* /etc/systemd/system
systemctl daemon-reload
systemctl enable zookeeper --now
systemctl enable kafka --now

#-----------------------------------------
echo "install kafka-connect" >> /tmp/provisioned.txt

# wait for systemd to start up kafka above
sleep 10

KAFKA_BIN="/opt/kafka/current/bin"
KAFKA_TOPIC="$KAFKA_BIN/kafka-topics.sh --zookeeper localhost:9099"

# Create the internal topics needed for kafka-connect (with replication=3 and partitions=10 for production)
$KAFKA_TOPIC --create --replication-factor 1 --partitions 2 --topic connect-offsets --config cleanup.policy=compact
$KAFKA_TOPIC --create --replication-factor 1 --partitions 2 --topic connect-configs --config cleanup.policy=compact
$KAFKA_TOPIC --create --replication-factor 1 --partitions 2 --topic connect-status  --config cleanup.policy=compact

systemctl enable kafka-connect-1 --now
systemctl enable kafka-connect-2 --now

#-----------------------------------------
echo "provisioned" >> /tmp/provisioned.txt

The base image is an Ubuntu Linux release, ie it uses a Debian-style package manager. Debian packages can prompt the admin for configuration details as software is installed; that is obviously not desirable in a Vagrant provisioning run, so “unattended installation” mode is enabled, and debconf-set-selections is used to predefine answers to mandatory questions for the packages being installed, as described in the comments.

The Kerberos server creates encryption keys using /dev/random. This is not a problem in an OS running on “real hardware”, as the hardware generates enough random behaviour to properly populate the Linux “entropy pool”. However it is a problem in a VM, which has no direct access to such hardware. The result is that operations reading /dev/random within the VM can hang, waiting for sufficient entropy. The haveged daemon solves this problem by gathering entropy from CPU timing jitter.
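
Whether this is effective can be checked from the host; with haveged running, the entropy reported by the kernel should stay comfortably high:

vagrant ssh -c "cat /proc/sys/kernel/random/entropy_avail"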

As described in the script comments, Heimdal is installed and configured as a Kerberos Key Distribution Center (KDC). Some test user/service accounts are also created. However the Zookeeper, Kafka and Kafka Connect services (daemons) are not set up to actually use the KDC - that is an exercise left to the reader :-).
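
Although the Kafka services do not use the KDC, it can still be exercised directly. Here is a sketch of how to verify it from the host, assuming Kerberos client tools (Heimdal or MIT) are installed there; it uses the forwarded KDC port 9088 and a throwaway client config rather than touching the host’s /etc/krb5.conf:

cat > /tmp/krb5-vm.conf <<'EOF'
[realms]
    UMDEMO.COM = {
        kdc = localhost:9088
    }
EOF

# obtain a TGT as one of the test principals created by bootstrap.sh (the password is "password")
KRB5_CONFIG=/tmp/krb5-vm.conf kinit umdemo1@UMDEMO.COM
KRB5_CONFIG=/tmp/krb5-vm.conf klist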

The Zookeeper/Kafka/Kafka-Connect files contained in the release downloaded by shellscript setup.sh are copied into relevant places. To actually start them, systemd service files are then installed - see later. As noted earlier, the base directory for the vagrant project is “mounted” within the VM image at path /vagrant by default, making these files accessible.

Systemd Scripts

After application binaries are copied to their desired locations within the VM filesystem, they need to be started - and monitored. The resulting VM may also be stopped and started without being recreated, so the installed software must be restarted on boot, just as on a normally installed system.

Packaged software such as Heimdal already comes with the relevant scripts. Unfortunately Zookeeper, Kafka and Kafka Connect are not “packaged” for Linux; we can copy the binaries but then need some way to start them. This is achieved by installing appropriate systemd service files, and these are defined as follows:

Zookeeper Service

There is a systemd service definition for starting a Java-based Zookeeper daemon. Zookeeper is a completely different project from Kafka, but the Kafka download includes a copy of Zookeeper for convenience.

[Unit]
Description=Apache Zookeeper server (broker) - used by Kafka
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
Group=nogroup
ExecStart=/opt/kafka/current/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
Restart=on-failure
SyslogIdentifier=kafka-zookeeper

[Install]
WantedBy=multi-user.target

There isn’t much to be discussed here - the file just starts zookeeper on OS startup.

Just one Zookeeper instance is started - enough for test purposes.

Here is the properties file for Zookeeper referenced from the systemd service file. This is simply copied from the example file distributed with the downloaded Kafka release.

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper

# the port at which the clients will connect
clientPort=9099

# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
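
Once the VM is running, a quick health-check of Zookeeper can be made from the host via the forwarded port, using Zookeeper’s “four letter word” commands (this assumes netcat is available on the host):

echo ruok | nc localhost 9099     # a healthy Zookeeper answers "imok"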

Kafka Service

There is a systemd service definition for starting a Java-based Kafka message broker daemon.

[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target zookeeper.service
After=network.target remote-fs.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=nogroup
# Ugly hack: sleep before starting so that zookeeper has a chance to open its socket before kafka runs.
ExecStartPre=/bin/sleep 10
ExecStart=/opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/kafka.properties
Restart=on-failure
SyslogIdentifier=kafka

[Install]
WantedBy=multi-user.target

Also fairly simple. As can be seen, the Kafka service has a dependency on Zookeeper.

There is a minor problem: there can be an interval of time between an application being started and it being ready to handle clients. Systemd can track dependencies, but it cannot know when this interval has passed without help from the application itself - and Zookeeper does not provide any such help. My solution is a very ugly hack, as commented in the file - just a short sleep. It works in practice, at least well enough for a test environment.

Just one broker instance is started - enough for test purposes.

Here is the properties file for Kafka referenced from the systemd service file. This is mostly copied from the example file distributed with the downloaded Kafka release.

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on.
listeners=PLAINTEXT://:9095

# Hostname and port the broker will advertise to producers and consumers.
#
# For Vagrant use, it is important that the "canonical host name" is not used here.
# Kafka clients outside the VM will ask a "bootstrap kafka server node" for the
# addresses of all nodes in the cluster, then try to connect to those addresses.
# But processes outside the cluster will not be able to resolve the canonical
# host name allocated to this VM instance. Using "127.0.0.1", and requiring the
# Vagrantfile to export this port without renumbering will allow kafka clients
# inside and outside the VM to work correctly.
advertised.listeners=PLAINTEXT://127.0.0.1:9095

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Retention Policy #############################

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

zookeeper.connect=localhost:9099

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
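
Because setup.sh leaves an unpacked Kafka distribution in the project directory on the host, the broker can be exercised from outside the VM through the forwarded ports (this assumes Java is installed on the host; the topic name “demo” is just an example):

cd kafka_2.12-0.10.2.0

# create a topic (the zookeeper port 9099 is forwarded to the host)
bin/kafka-topics.sh --zookeeper localhost:9099 --create --topic demo \
    --partitions 1 --replication-factor 1

# type a few lines, then press ctrl-d
bin/kafka-console-producer.sh --broker-list localhost:9095 --topic demo

# read the messages back
bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic demo --from-beginning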

Kafka Connect Service

And there are two systemd service definitions for starting a Java-based Kafka Connect daemon twice with different config-files:

[Unit]
Description=Apache Kafka Connect server (node 1)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target kafka.service
After=network.target remote-fs.target kafka.service

[Service]
Type=simple
User=kafka
Group=nogroup

Environment=CLASSPATH=/opt/kafka/plugins/*
Environment=LOG_DIR=/var/log/kafka-connect
Environment=KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/kafka/config/connect-log4j.properties

# Hack: sleep for a while to ensure kafka broker has started (not needed in production)
ExecStartPre=/bin/sleep 10
ExecStart=/opt/kafka/current/bin/connect-distributed.sh /opt/kafka/config/connect-distributed-1.properties
ExecStop=

SuccessExitStatus=143
Restart=on-failure
SyslogIdentifier=kafka-connect-1

[Install]
WantedBy=multi-user.target

The second unit-file is identical to the above one, except for its name and config-file.

Entry Environment=CLASSPATH=... sets an environment-variable which is then processed by the executed script connect-distributed.sh. This variable ensures that every jarfile in the specified directory /opt/kafka/plugins is on the JVM classpath; this provides a clean place to put jarfiles for additional “connector plugins” to be used by kafka-connect.
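
As an example of how this can be used: extra connector jars can simply be dropped into the project’s kafka/plugins/ directory before provisioning, or copied into place on a running VM and the workers restarted (the jar name here is a hypothetical placeholder):

# before provisioning: put the jar where bootstrap.sh will copy it into the VM
mkdir -p kafka/plugins
cp my-connector-plugin.jar kafka/plugins/

# on an already-running VM: push the file in and restart the workers
vagrant rsync
vagrant ssh -c "sudo cp /vagrant/kafka/plugins/*.jar /opt/kafka/plugins/"
vagrant ssh -c "sudo systemctl restart kafka-connect-1 kafka-connect-2"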

The “SuccessExitStatus” setting is needed because on deliberate shutdown, Kafka Connect exits with status 143 (the JVM’s exit code when terminated by SIGTERM). Without this setting, systemd would consider the service to have “failed” and would try to restart it.

Here is the properties file for Kafka Connect referenced from the first systemd service file. This is mostly copied from the example file connect-distributed.properties distributed with the downloaded Kafka release. There is a separate file for each kafka-connect service, but they are almost identical; the main difference is the rest.port setting (9091 for node 1, 9092 for node 2).

bootstrap.servers=localhost:9095

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Topic to use for storing offsets. This topic should have many partitions and be replicated.
offset.storage.topic=connect-offsets

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
config.storage.topic=connect-configs

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=connect-status

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set the port on which the kafka-connect instance listens
rest.port=9091

Starting the VM

Simply run vagrant up in the base directory (where the Vagrantfile is), and wait a few minutes - a brand new VM is created and configured.

When using Virtualbox as the hypervisor, a subdirectory “.vagrant” is created to hold Vagrant’s metadata about the VM; the VM disk image itself is managed by Virtualbox in its usual VM directory, and the downloaded base box is cached under ~/.vagrant.d.
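
Once the VM is up, it is easy to check that provisioning completed and that all the services were started:

vagrant ssh -c "cat /tmp/provisioned.txt"
vagrant ssh -c "systemctl status zookeeper kafka kafka-connect-1 kafka-connect-2"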

Network Configuration

The Vagrantfile above specifies its network address:

config.vm.network "private_network", ip: "192.168.33.10"

The behaviour is slightly hypervisor-dependent; the following information applies when Virtualbox is being used.

As a result of this private_network declaration, there are three network interfaces within the VM:

  • A NAT interface for accessing the internet.
  • A “loopback” interface with address 127.0.0.1 (as usual) which is only accessible from within the VM
  • A “host only” interface with the IP address specified in the Vagrant file

The NAT interface has an arbitrary IP address. The exact address is not relevant as there will never be any incoming connections to the VM via this interface; it is there only for the VM to make outgoing connections. The interface has associated “routing rules” which configure this interface as the “gateway” for any target IP address except 192.168.33.10 and localhost. Any outgoing connections are remapped within the host operating system to use the host’s own IP address (that is what NAT does). Applications within the VM can listen on ports on this network address (it doesn’t result in an error) but it is pointless as no inbound connections ever occur on this interface.

The loopback interface works as would be expected. An application within the VM can listen on the loopback interface, but such ports are only accessible from applications within the VM.

The “host only” (private) network is accessible from the host operating system. Virtualbox automatically creates a new interface in the host, with a name like vboxnet3. That interface has an associated IP address, though it is not particularly relevant (the address is actually the top 24 bits of the internal address followed by 1 - ie in this case 192.168.33.1). This host-side interface has associated routing rules which make it the gateway for all target IP addresses of form 192.168.33.* - thus from the host, all applications within the VM listening on ports on interface 192.168.33.10 are accessible.

The network routing rules mentioned above can be viewed in the host with Unix command route.

The “host only” network is not accessible from systems other than the host. If an application within a VM should be exposed to the local network, then this can be done via forwarded_port declarations. These cause ports on the host’s public interface address to be forwarded to ports on the VM’s interface (192.168.33.10 in this case). As a side-effect, the host can also then access listening applications within the VM via address localhost:nnnn.

Specifically, for services configured in the example presented in this article:

  • Heimdal config does not override the defaults of (addr=*, port=88) so it listens on all three VM interfaces. It is therefore accessible from the host via “192.168.33.10:88”. Due to the port-mapping it is also available to the host as “localhost:9088”
  • Kafka and Kafka Connect instances listen on interface “*” and are thus visible from the host as “192.168.33.10:nnnn” and as “localhost:pppp”.

Vagrant and Synced Folders

Vagrant provides a way of sharing files between the “host” used to launch the VM and the “guest” VM image.

When the hypervisor being used is Virtualbox, then Vagrant will by default try to use virtualbox shared-folders. While this is an efficient and bi-directional file sharing technique, it unfortunately requires kernel modules to be installed within the VM which is sometimes problematic.

Vagrant provides a number of other options for sharing folders, and I find the “rsync” option not only applicable to all hypervisors but actually the best choice even when Virtualbox is being used. Communication is only one-way (host->vm), but sharing files in the other direction is IMO seldom needed. Changes on the host also require vagrant rsync to be run before they become visible within the VM, but again I find such updates to be rare and the extra step not a problem.
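
The typical workflow after changing a file in the project directory is then simply:

vagrant rsync                                  # push host-side changes into the VM
vagrant ssh -c "ls -l /vagrant/kafka/config"   # confirm the changes arrived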

Other Notes

I did at first consider using the Apache Directory Server (directory.apache.org) or its subproject Kerby for the KDC. However ADS has far too much functionality (complete LDAP for example), and Kerby is interesting for embedded/testing use-cases but very raw in its current 1.0-rc state. Setting up the mainstream Heimdal KDC was actually pretty easy.
