Kafka Connect JDBC Source Where Clauses
The Kafka Connect JDBCSourceConnector reads from a relational database and outputs each row as a message in a Kafka topic.
The config file lets you specify the data to read either as a table name (table.whitelist) or as a custom query (query). Unfortunately, the documentation clearly states that when the query option is used and incremental loading is also enabled, the query must not include a WHERE clause. The connector adds its own WHERE clause, and two WHERE clauses in one statement are invalid SQL.
Is there anything that can be done about this? For most databases, the answer is yes…
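When incremental loading is enabled via mode=timestamp+incrementing, the query emitted by the JDBCSourceConnector looks like the following. Here $query is the configured query, and $timestamp.col.name and $inc.col.name are the columns named by the timestamp.column.name and incrementing.column.name settings: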
$query
WHERE $timestamp.col.name < :1
AND (($timestamp.col.name = :2 AND $inc.col.name > :3) OR $timestamp.col.name > :4)
ORDER BY $timestamp.col.name, $inc.col.name ASC
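To make the problem concrete, here is a minimal Java sketch that imitates the shape of the appended clause by plain string concatenation. It is not the connector's actual code. The helper name appendIncrementalClause and the columns updated_at and id are hypothetical, and JDBC-style ? placeholders stand in for :1 to :4. Feeding it a query that already has its own WHERE produces a statement with two WHERE keywords, which the database rejects.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IncrementalQuerySketch {

    // Hypothetical helper: imitates the clause appended in
    // mode=timestamp+incrementing (not the connector's own code).
    static String appendIncrementalClause(String query, String tsCol, String incCol) {
        return query
            + " WHERE " + tsCol + " < ?"
            + " AND ((" + tsCol + " = ? AND " + incCol + " > ?) OR " + tsCol + " > ?)"
            + " ORDER BY " + tsCol + ", " + incCol + " ASC";
    }

    // Counts case-insensitive occurrences of the WHERE keyword.
    static int countWhere(String sql) {
        Matcher m = Pattern.compile("\\bwhere\\b", Pattern.CASE_INSENSITIVE).matcher(sql);
        int n = 0;
        while (m.find()) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        // A custom query that (against the documentation's advice) has its own WHERE.
        String userQuery = "select a, b, c, updated_at, id from sometable where c = 'x'";
        String sql = appendIncrementalClause(userQuery, "updated_at", "id");
        System.out.println(sql);
        System.out.println("WHERE keywords: " + countWhere(sql));
    }
}
```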
The binding values are:
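:1 is the current time minus a delay (timestamp.delay.interval.ms), so that database updates are given a short window in which to complete before being read by Kafka Connect
:2 and :3 are the timestamp and incrementing-column values of the last row already read; they pick up rows that share that timestamp but have a higher ID, and are used to continue after an earlier shutdown
:4 is again the last timestamp read; this branch is the normal case for loading new records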
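The effect of these bindings can be checked without a database. The Java sketch below evaluates the same predicate in memory over a few rows. The class and method names, the sample timestamps and the 5-second delay are all hypothetical; only the predicate itself mirrors the clause above.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class IncrementalFilterSketch {

    // A row reduced to the two columns the appended clause inspects.
    static final class Row {
        final Instant ts;
        final long id;

        Row(Instant ts, long id) {
            this.ts = ts;
            this.id = id;
        }

        @Override
        public String toString() {
            return "Row(ts=" + ts + ", id=" + id + ")";
        }
    }

    // In-memory version of:
    //   ts < :1 AND ((ts = :2 AND id > :3) OR ts > :4) ORDER BY ts, id
    // with :1 = now - delay, :2 = :4 = lastTs, :3 = lastId.
    static List<Row> selectNewRows(List<Row> rows, Instant now, long delayMs,
                                   Instant lastTs, long lastId) {
        Instant upperBound = now.minusMillis(delayMs);                      // :1
        List<Row> out = new ArrayList<>();
        for (Row r : rows) {
            boolean settled = r.ts.isBefore(upperBound);
            boolean sameTsHigherId = r.ts.equals(lastTs) && r.id > lastId; // :2, :3
            boolean newerTs = r.ts.isAfter(lastTs);                        // :4
            if (settled && (sameTsHigherId || newerTs)) {
                out.add(r);
            }
        }
        out.sort(Comparator.comparing((Row r) -> r.ts).thenComparingLong(r -> r.id));
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: the last row processed was (10:00:00, id 5).
        Instant lastTs = Instant.parse("2024-01-01T10:00:00Z");
        Instant now = Instant.parse("2024-01-01T10:05:00Z");
        List<Row> rows = List.of(
            new Row(Instant.parse("2024-01-01T09:59:00Z"), 1), // older: skipped
            new Row(lastTs, 5),                                // same ts, id seen: skipped
            new Row(lastTs, 7),                                // same ts, higher id: read
            new Row(Instant.parse("2024-01-01T10:02:00Z"), 3), // newer ts: read
            new Row(Instant.parse("2024-01-01T10:04:59Z"), 9)  // inside delay window: skipped
        );
        selectNewRows(rows, now, 5_000, lastTs, 5).forEach(System.out::println);
    }
}
```

The last row shows why the delay matters: a row stamped four seconds before "now" is left for a later poll, giving any transaction that is still writing rows with that timestamp time to commit.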
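So $query cannot contain a where-clause, right? Wrong! A clever user on Stack Overflow pointed out that the where-clause can be hidden inside a subselect:
query=select * from (select a,b,c from sometable where ....) t
The outer select has no where-clause of its own, so the clause appended by the connector produces valid SQL. The trailing alias (t) is required by some databases, such as MySQL and SQL Server, and accepted by the others. The subselect must also return the timestamp and incrementing columns, since the appended clause refers to them.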
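Putting the pieces together, the following Java sketch wraps a filtered query in a subselect and assembles a matching connector configuration, printed in the key=value form of a .properties file. The config keys are the JDBC source connector's own settings; the connection URL, column names, delay and topic name are hypothetical placeholders, and wrapAsSubselect is an illustrative helper rather than part of any library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SubselectConfigSketch {

    // Hypothetical helper: moves a query that has its own WHERE into a subselect,
    // so the outer query (the one the connector appends to) has no WHERE clause.
    static String wrapAsSubselect(String innerQuery, String alias) {
        return "select * from (" + innerQuery + ") " + alias;
    }

    static Map<String, String> connectorConfig(String query) {
        Map<String, String> cfg = new LinkedHashMap<>(); // keeps insertion order
        cfg.put("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector");
        cfg.put("connection.url", "jdbc:postgresql://dbhost:5432/mydb"); // hypothetical
        cfg.put("mode", "timestamp+incrementing");
        cfg.put("query", query);
        cfg.put("timestamp.column.name", "updated_at");  // hypothetical column
        cfg.put("incrementing.column.name", "id");       // hypothetical column
        cfg.put("timestamp.delay.interval.ms", "5000");
        cfg.put("topic.prefix", "sometable-changes");    // with query=, the full topic name
        return cfg;
    }

    public static void main(String[] args) {
        // The inner select includes updated_at and id, which the appended clause needs.
        String inner = "select a, b, c, updated_at, id from sometable where c = 'x'";
        Map<String, String> cfg = connectorConfig(wrapAsSubselect(inner, "t"));
        cfg.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```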
And by the way, there is currently a bug in the JDBCSourceConnector that prevents table names of the form “schema.table” from being specified. This means the table.whitelist option cannot be used to read from a table in a schema other than the login user's default schema. The workaround is to use the “query” config option. Another possible workaround is to add &currentSchema= (followed by the schema name) to the JDBC connection string, for drivers that support that property (the PostgreSQL driver does, for example).
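Whether the parameter starts with & or ? depends on whether the URL already carries parameters. A minimal Java sketch, assuming a PostgreSQL-style URL; the helper withCurrentSchema and the host, database and schema names are hypothetical:

```java
public class CurrentSchemaUrlSketch {

    // Hypothetical helper: appends currentSchema using '?' when the URL has no
    // parameters yet and '&' otherwise. Only drivers that understand
    // currentSchema (e.g. PostgreSQL's) will act on it.
    static String withCurrentSchema(String jdbcUrl, String schema) {
        String sep = jdbcUrl.contains("?") ? "&" : "?";
        return jdbcUrl + sep + "currentSchema=" + schema;
    }

    public static void main(String[] args) {
        System.out.println(withCurrentSchema("jdbc:postgresql://dbhost:5432/mydb", "sales"));
        System.out.println(withCurrentSchema("jdbc:postgresql://dbhost:5432/mydb?ssl=true", "sales"));
    }
}
```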