Kafka Connect JDBC Source Where Clauses

Categories: BigData

The Kafka Connect JDBCSourceConnector reads rows from a relational database and publishes each row as a message to a Kafka topic.

The connector configuration supports specifying the data to read either as a table name (table.whitelist) or as a custom query (query). Unfortunately, the documentation clearly states that when the query option is used and “incremental load” is also enabled, the query must not include any “where” component: the connector appends its own where-clause, and a query that already has one would result in invalid SQL syntax.
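
For reference, the table-based variant looks roughly like the following sketch (the connection URL, table, column and topic names are placeholders, and an Oracle-style URL is assumed here):

  connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
  connection.url=jdbc:oracle:thin:@dbhost:1521/SOMEDB
  # incremental load: detect new/updated rows via these two columns
  mode=timestamp+incrementing
  timestamp.column.name=last_updated
  incrementing.column.name=id
  table.whitelist=sometable
  topic.prefix=jdbc-

The query-based variant replaces the table.whitelist line with a query line, and it is exactly that combination with incremental loading that the documentation warns about.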

Is there anything that can be done about this? For most databases, the answer is yes…

When using incremental loading via mode=timestamp+incrementing, the query emitted by the JDBCSourceConnector looks like:

  $query
  WHERE $timestamp.col.name < :1
    AND (($timestamp.col.name = :2 AND $inc.col.name > :3) OR $timestamp.col.name > :4)
  ORDER BY $timestamp.col.name, $inc.col.name ASC

The binding values are:

  • :1 is the current time minus a delay (timestamp.delay.interval.ms), giving in-flight database updates a small window in which to complete before the rows are read by Kafka Connect
  • :2 and :3 allow processing to resume where it left off after an earlier shutdown
  • :4 is the normal case for loading newly added or updated records
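
To make that concrete: assuming (purely for illustration) query=select a,b,c from sometable, timestamp.column.name=last_updated and incrementing.column.name=id, the statement the connector prepares would look roughly like:

  select a,b,c from sometable
  WHERE last_updated < :1
    AND ((last_updated = :2 AND id > :3) OR last_updated > :4)
  ORDER BY last_updated, id ASC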

Obviously, that means $query cannot contain a where-clause, right? Wrong! A clever user on Stack Overflow pointed out that a subselect can be used:

query=select * from (select a,b,c from sometable where ....)
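
In connector-config terms, the workaround might look something like this sketch (the table, columns and filter condition are purely illustrative):

  query=select * from (select a, b, c, last_updated, id from sometable where c > 100)
  mode=timestamp+incrementing
  timestamp.column.name=last_updated
  incrementing.column.name=id

Note that the inner select must expose the timestamp and incrementing columns, since the where-clause appended by the connector references them. On some databases (PostgreSQL and MySQL, for example) the derived table also needs an alias, e.g. “... ) t”.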

And just by the way, there is currently a bug in the JDBCSourceConnector which prevents table names of the form “schema.table” from being specified, which means the table.whitelist option cannot be used to read from a table in a schema other than the default schema of the login user account. The workaround is to use the “query” config option. Another possible workaround is to add &currentSchema= to the JDBC connection string.
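
As an illustration of that second workaround, assuming a driver that supports the currentSchema connection property (the PostgreSQL and DB2 JDBC drivers do; other drivers use different mechanisms) and placeholder host, database and schema names:

  connection.url=jdbc:postgresql://dbhost:5432/somedb?user=someuser&currentSchema=someschema
  table.whitelist=sometable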