3.2. Kafka connector

3.2.1. Introduction

The QuasarDB Kafka connector uses Kafka Connect to stream data from Kafka topics into QuasarDB.

The connector's source code is available on GitHub at https://github.com/bureau14/kafka-connect-qdb.

3.2.2. Installation

3.2.2.1. QuasarDB C library

Since the Kafka connector uses the QuasarDB C library, you must also install the qdb-api library on every worker machine.
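A minimal sketch of such an installation on Linux, assuming you have downloaded a C API archive from https://download.quasardb.net/ (the archive name and layout here are assumptions; adjust them to the package you actually obtained):

tar -xzf qdb-api-<version>-linux-64bit.tar.gz
cp lib/libqdb_api.so /usr/local/lib/
ldconfig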

3.2.2.2. Plugin jar

The QuasarDB Kafka connector is distributed as an uber jar containing all of the plugin's class files. You can download the latest build at https://download.quasardb.net/quasardb/nightly/api/kafka/.

After downloading the uber jar, put the file in the plugin path defined in your Kafka Connect worker configuration (example below).

Please refer to the official Kafka documentation for more information on installing plugins.
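For instance, a sketch that places the jar in the plugin directory used later in this guide (the exact jar name depends on the build you downloaded):

mkdir -p /usr/local/share/kafka/plugins
cp kafka-connect-qdb-<version>.jar /usr/local/share/kafka/plugins/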

3.2.3. Configuration

3.2.3.1. Worker

After downloading the plugin, put it in a location on each Kafka Connect worker's machine and create a worker configuration.

Assuming you want to put your Kafka plugins in /usr/local/share/kafka/plugins, tell the worker where to find the Kafka broker and the plugins like this:

bootstrap.servers=kafka:9092
plugin.path=/usr/local/share/kafka/plugins

The QuasarDB Kafka Connect plugin currently works only with Avro or structured JSON. An example of how to configure structured JSON is as follows:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
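With schemas enabled, the JsonConverter expects every message to carry its schema alongside its payload. An illustrative message for a table with a single double column (the column name is hypothetical):

{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "temperature", "type": "double", "optional": false}
    ],
    "optional": false
  },
  "payload": {
    "temperature": 21.5
  }
}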

We must also provide some additional configuration, such as the flush interval and where the worker keeps track of its offsets (so it can recover from crashes):

offset.storage.file.filename=/var/lib/kafka/connect.offsets
offset.flush.interval.ms=60000

This tells the connector to flush its buffer every minute. You may want to tune this interval, depending on your workload.

For completeness, here is an example of the complete worker.properties:

bootstrap.servers=kafka:9092
plugin.path=/usr/local/share/kafka/plugins
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/var/lib/kafka/connect.offsets
offset.flush.interval.ms=60000

3.2.3.2. Connector

After configuring your workers to detect the plugin, you must still write a configuration for the connector itself, telling it how to write data into QuasarDB. For this, define a connector.properties file as follows:

name=qdb-sink
connector.class=QdbSinkConnector
tasks.max=10
topics=topic1,topic2
qdb.cluster=qdb://my.qdb-server:2836/
qdb.table_from_topic=true

This tells the QuasarDB sink to connect to the QuasarDB cluster at qdb://my.qdb-server:2836/ and to translate topic names automatically into table names.

The QuasarDB sink expects every column present in the table to match a column with the same name in the JSON data structure. We rely on Kafka Connect transformations for you to extract and rename fields so that they match the layout of your QuasarDB tables. For more information, see the official documentation.
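For example, a single message transformation in the connector configuration can rename fields before they reach the sink. This sketch uses the standard ReplaceField transformation; the field names are hypothetical:

transforms=rename
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames=sensor_temp:temperature,sensor_ts:timestamp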

3.2.4. Additional options

The QuasarDB Kafka connector provides the following additional options:

qdb.security.username (String)
    For a secure connection, the user's username.

qdb.security.user_private_key (String)
    For a secure connection, the user's private key.

qdb.security.cluster_public_key (String)
    For a secure connection, the cluster's public key.

qdb.table (String)
    When provided, all incoming data (regardless of topic) is stored in this table.

qdb.table_from_topic (Boolean)
    When enabled, uses a record's topic name as the QuasarDB table name.

qdb.table_from_column (String)
    When provided, looks up the value of this column inside a record and uses it as the table name. This allows dynamic dispatching of records to different tables.

qdb.table_autocreate_skeleton (String)
    When provided, automatically creates new tables when a table is not found, using the provided value as a skeleton table whose schema is copied into the new table.

qdb.table_autocreate_skeleton_column (String)
    When provided, automatically creates new tables when a table is not found. Looks up the value of this column inside the record and uses it as a skeleton table whose schema is copied into the new table.

qdb.table_autocreate_tags (List<String>)
    When provided, automatically assigns all tags in this list to every newly created table.

qdb.table_autocreate_tags_column (String)
    When provided, automatically assigns tags to newly created tables. Looks up the tags from this column of the record; assumes the value is a sequence of strings.
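For example, a sketch of a connector.properties for a secured cluster with automatic table creation; the username, keys, and skeleton table name are placeholders to replace with your own:

name=qdb-sink-secure
connector.class=QdbSinkConnector
tasks.max=10
topics=topic1,topic2
qdb.cluster=qdb://my.qdb-server:2836/
qdb.table_from_topic=true
qdb.security.username=my_user
qdb.security.user_private_key=<user private key>
qdb.security.cluster_public_key=<cluster public key>
qdb.table_autocreate_skeleton=my_skeleton_table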

3.2.4.1. Running the worker

Assuming you saved worker.properties and connector.properties in /opt/kafka/config/, you can launch a standalone Kafka Connect worker like this:

/opt/kafka/bin/connect-standalone.sh /opt/kafka/config/worker.properties /opt/kafka/config/connector.properties

You should see periodic log messages as the Kafka connector flushes its output to QuasarDB.

3.2.5. Performance Considerations

To achieve the best performance, take note of the following considerations when modelling your Kafka topics and deploying the plugin:

  • Partition your topics by destination table; this benefits distributed workers tremendously and avoids the performance penalty incurred when multiple concurrent workers write into the same QuasarDB shard.

  • Buffer as long as possible, but no longer: ideally, each of your Kafka workers buffers exactly one shard of data; e.g. when your shard size is 5 minutes, set offset.flush.interval.ms=300000, as sketched below.
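A minimal sketch of this alignment, assuming your tables use a 5-minute shard size:

# one flush per 5-minute shard: 5 * 60 * 1000 = 300000 ms
offset.flush.interval.ms=300000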