3.2. Kafka connector
3.2.1. Introduction
The QuasarDB Kafka connector uses Kafka Connect to store data from Kafka topics into QuasarDB.
You may read and download the connector’s code from GitHub at https://github.com/bureau14/kafka-connect-qdb.
3.2.2. Installation
3.2.2.1. QuasarDB C library
Since the Kafka connector makes use of the QuasarDB C library, you also need to install the qdb-api library on every worker machine.
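For example, on a Linux worker machine this could look like the following; the archive name and layout are illustrative, so pick the build matching your platform from the QuasarDB download site:
tar -xzf qdb-api-*.tar.gz                    # extract the C API archive
sudo cp lib/libqdb_api.so /usr/local/lib/    # put the library on the linker path
sudo ldconfig                                # refresh the dynamic linker cache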
3.2.2.2. Plugin jar
The QuasarDB Kafka connector is distributed as an uber JAR containing all of the class files for the plugin. You can download the latest build at https://download.quasardb.net/quasardb/nightly/api/kafka/.
After downloading the uber JAR, put the file in your plugin path as defined in your Kafka Connect worker configuration (example below).
Please refer to the official Kafka documentation for more information on installing plugins.
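For instance, assuming the plugin path /usr/local/share/kafka/plugins used in the worker configuration below (the jar file name is illustrative):
mkdir -p /usr/local/share/kafka/plugins
cp kafka-connect-qdb-<version>.jar /usr/local/share/kafka/plugins/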
3.2.3. Configuration
3.2.3.1. Worker
After downloading the plugin, we must put it in a location on each Kafka Connect worker machine and create a worker configuration.
Assuming you want to keep your Kafka plugins in /usr/local/share/kafka/plugins, we tell the worker where to find the Kafka broker and the plugins like this:
bootstrap.servers=kafka:9092
plugin.path=/usr/local/share/kafka/plugins
The QuasarDB Kafka Connect plugin currently only works with Avro or structured JSON. An example of how to configure structured JSON is as follows:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
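With schemas enabled, the JsonConverter expects every message to carry an envelope containing both the schema and the payload. A minimal record with two illustrative fields looks like this:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "price", "type": "double" },
      { "field": "volume", "type": "int64" }
    ]
  },
  "payload": { "price": 123.45, "volume": 10000 }
}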
We must also provide some additional configuration, such as the flush interval and where the worker needs to keep track of its offsets (in case of crashes):
offset.storage.file.filename=/var/lib/kafka/connect.offsets
offset.flush.interval.ms=60000
This tells the worker to flush the connector's buffer every minute. You might want to tune this value, depending upon your workload.
For completeness, here is an example of the complete worker.properties:
bootstrap.servers=kafka:9092
plugin.path=/usr/local/share/kafka/plugins
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/var/lib/kafka/connect.offsets
offset.flush.interval.ms=60000
3.2.3.2. Connector
After configuring your workers to detect the plugin, we must still write a configuration for the connector itself, telling it how to write the data into QuasarDB. For this, we define a connector.properties file as follows:
name=qdb-sink
connector.class=QdbSinkConnector
tasks.max=10
topics=topic1,topic2
qdb.cluster=qdb://my.qdb-server:2836/
qdb.table_from_topic=true
This tells the QuasarDB sink to connect to the QuasarDB cluster at qdb://my.qdb-server:2836/, and to perform an automatic translation of topic names to table names.
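For example, records from topic1 would be written into a QuasarDB table named topic1. Assuming the data carries a price and a volume field (names and types here are illustrative), a matching table could be created up front from the qdbsh shell:
qdbsh > CREATE TABLE topic1 (price DOUBLE, volume INT64)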
The QuasarDB sink expects all columns present in the table to match a field with the same name in the JSON data structure. We rely on Kafka Connect Transformations for you to extract and rename fields to match the format of your QuasarDB tables. For more information, see the official documentation.
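For example, if your messages carry a field named px that should map to a table column named price (field names here are illustrative), Kafka Connect's built-in ReplaceField transformation can rename it in connector.properties:
transforms=rename
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames=px:price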
3.2.4. Additional options
The QuasarDB Kafka Connector provides additional options for you to use:
qdb.security.username (String)
    For a secure connection, the user's username.
qdb.security.user_private_key (String)
    For a secure connection, the user's private key.
qdb.security.cluster_public_key (String)
    For a secure connection, the cluster's public key.
qdb.table (String)
    When provided, all incoming data (regardless of topic) will be stored in this table.
qdb.table_from_topic (Boolean)
    When set to true, uses a record's topic name as the QuasarDB table name.
qdb.table_from_column (String)
    When provided, looks up the value of this column inside a record and uses it as the table name. This allows for dynamic dispatching of records to different tables.
qdb.table_autocreate_skeleton (String)
    When provided, automatically creates new tables when a table is not found, using the provided value as a skeleton table whose schema will be copied into the new table.
qdb.table_autocreate_skeleton_column (String)
    When provided, automatically creates new tables when a table is not found. Looks up the value of this column inside the record, and uses it as a skeleton table whose schema will be copied into the new table.
qdb.table_autocreate_tags (List<String>)
    When provided, automatically assigns all tags in this list to every newly created table.
qdb.table_autocreate_tags_column (String)
    When provided, automatically assigns tags to newly created tables. Looks up the tags from this column of the record; assumes it is a sequence of strings.
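For example, a connector configuration for a secured cluster could combine the security options like this (the username and key values are placeholders; use the credentials generated for your cluster):
name=qdb-sink
connector.class=QdbSinkConnector
topics=topic1
qdb.cluster=qdb://my.qdb-server:2836/
qdb.table_from_topic=true
qdb.security.username=<username>
qdb.security.user_private_key=<user private key>
qdb.security.cluster_public_key=<cluster public key>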
3.2.4.1. Running the worker
Assuming you saved worker.properties and connector.properties in /opt/kafka/config/, you can launch a standalone Kafka Connect worker like this:
/opt/kafka/bin/connect-standalone.sh /opt/kafka/config/worker.properties /opt/kafka/config/connector.properties
You should see periodic log messages as the Kafka connector flushes its output to QuasarDB.
3.2.5. Performance Considerations
To achieve the best performance, please take note of the following considerations when modelling your Kafka topics and installing the plugins:
- Partition your topics by table destination; this benefits distributed workers tremendously, and avoids the performance penalty incurred when multiple concurrent workers write into the same QuasarDB shard. A producer sketch follows this list.
- Try to buffer as long as possible, but not longer: ideally, each of your Kafka workers buffers exactly one shard of data; e.g. when your shard size is 5 minutes, you provide your Kafka worker with offset.flush.interval.ms=300000.
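As a sketch of the first point, one way to partition by table destination is to use the destination table name as the record key: Kafka's default partitioner then routes all records for a given table to the same partition, and hence to a single Connect task. Topic, table, and field names below are illustrative, and this assumes records are dispatched to tables via qdb.table_from_column or similar:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TableKeyedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Structured JSON envelope, as expected by the JsonConverter configured above.
        String value = "{\"schema\": {\"type\": \"struct\", \"fields\": ["
            + "{\"field\": \"price\", \"type\": \"double\"},"
            + "{\"field\": \"volume\", \"type\": \"int64\"}]},"
            + "\"payload\": {\"price\": 123.45, \"volume\": 10000}}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keying by the destination table name keeps each table's records
            // in a single partition, and thus with a single worker.
            producer.send(new ProducerRecord<>("topic1", "stocks", value));
        }
    }
}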