The QuasarDB Kafka connector uses Kafka Connect to stream data from Kafka topics into QuasarDB.
You may read and download the connector’s code from GitHub at https://github.com/bureau14/kafka-connect-qdb.
Because the connector uses the QuasarDB C library, you must also install the qdb-api library on every worker machine.
The QuasarDB Kafka Connector is distributed as an uber jar containing all of the class files for the plugin. You can download the latest build at https://download.quasardb.net/quasardb/nightly/api/kafka/.
After downloading the uber jar, place the file in the plugin path defined in your Kafka Connect worker configuration (example below).
Please refer to the Official Kafka documentation for more information on installing plugins.
After downloading the plugin, we must place it in a location on each Kafka Connect worker machine and create a worker configuration.
Assuming you want to put your Kafka plugins in /usr/local/share/kafka/plugins, we tell the worker where to find the Kafka broker and plugins like this:
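The snippet below shows the relevant worker.properties entries; both values mirror the complete worker.properties example later on this page:

```properties
bootstrap.servers=kafka:9092
plugin.path=/usr/local/share/kafka/plugins
```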
The QuasarDB Kafka Connect plugin currently only works with Avro or structured JSON. An example of how to define structured JSON is as follows:
```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
```
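With schemas enabled, the JsonConverter expects every message to carry an explicit schema alongside its payload. A minimal sketch of such an envelope is shown below; the field names ("timestamp", "value") are hypothetical and should match your table's columns:

```python
import json

# With schemas.enable=true, each message is an envelope of {"schema", "payload"}.
# The schema describes a struct whose fields mirror the payload's keys.
envelope = {
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "timestamp", "type": "int64", "optional": False},
            {"field": "value", "type": "double", "optional": False},
        ],
    },
    "payload": {"timestamp": 1500000000000, "value": 42.0},
}

# Serialize the envelope as it would appear on the wire.
message = json.dumps(envelope)
```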
In addition, we must provide some extra configuration, such as the flush interval and where the worker keeps track of its offsets (in case of crashes):
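These two entries, taken from the complete worker.properties example below, cover both concerns:

```properties
offset.storage.file.filename=/var/lib/kafka/connect.offsets
offset.flush.interval.ms=60000
```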
This tells the plugin to flush its buffer every minute. You may want to tune this value depending on your workload.
For completeness, here is an example of the complete worker.properties:
```properties
bootstrap.servers=kafka:9092
plugin.path=/usr/local/share/kafka/plugins
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/var/lib/kafka/connect.offsets
offset.flush.interval.ms=60000
```
After configuring your workers to detect the plugin, we still need to write a configuration for the connector itself, telling it how to write data into QuasarDB. For this, we define a connector.properties file as follows:
```properties
name=qdb-sink
connector.class=QdbSinkConnector
tasks.max=10
topics=topic1,topic2
qdb.cluster=qdb://my.qdb-server:2836/
qdb.table_from_topic=true
```
This tells the QuasarDB Sink to connect to the QuasarDB cluster at qdb://my.qdb-server:2836 and to translate topic names to table names automatically.
The QuasarDB Sink requires that every column present in the table matches a field with the same name in the JSON data structure. We rely on Kafka Connect transformations for extracting and renaming fields to match the format of your QuasarDB tables. For more information, see the official Kafka documentation.
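As a sketch, the standard ReplaceField transformation can rename incoming fields in connector.properties so they match the table's column names; the field names used here are hypothetical:

```properties
transforms=rename
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
# Rename the hypothetical source field "temp" to the table column "temperature".
transforms.rename.renames=temp:temperature
```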
The QuasarDB Kafka Connector provides the following additional options:
- For a secure connection, the user's username.
- For a secure connection, the user's private key.
- For a secure connection, the cluster's public key.
- When provided, all incoming data (regardless of topic) will be stored in this table.
- When provided, uses a record's topic name as the QuasarDB table name.
- When provided, looks up the value of this column inside a record and uses it as the table name. This allows dynamic dispatching of records to different tables.
- When provided, automatically creates new tables when a table is not found, using the provided value as a skeleton table whose schema is copied into the new table.
- When provided, automatically creates new tables when a table is not found. Looks up the value of this column inside the record and uses it as a skeleton table whose schema is copied into the new table.
- When provided, automatically assigns all tags in this column to every newly created table.
- When provided, automatically assigns tags to newly created tables. Looks up the tags from this column of the record; assumes it is a sequence of strings.
Assuming you saved worker.properties and connector.properties in /opt/kafka/config/, we can launch a Kafka Connect worker like this:
```shell
/opt/kafka/bin/connect-standalone.sh /opt/kafka/config/worker.properties /opt/kafka/config/connector.properties
```
You should see periodic log messages of the Kafka connector flushing its output to QuasarDB.
To achieve the best performance, please take note of the following considerations when modelling your Kafka topics and deploying the plugin:

- Partition your topics by table destination; this benefits distributed workers tremendously and avoids the performance penalty incurred when multiple concurrent workers write into the same QuasarDB shard.
- Buffer as long as possible, but not longer: ideally, each of your Kafka workers buffers exactly one shard of data. For example, when your shard size is 5 minutes, you provide your Kafka worker with an offset.flush.interval.ms of 300000.
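The arithmetic behind that recommendation can be sketched as follows; the 5-minute shard size is the one from the example above:

```python
# Derive the Kafka Connect offset flush interval from the QuasarDB shard size.
# Ideally, each worker buffers exactly one shard of data before flushing.

def flush_interval_ms(shard_size_minutes: int) -> int:
    """Convert a shard size in minutes to an offset.flush.interval.ms value."""
    return shard_size_minutes * 60 * 1000

# A 5-minute shard maps to the 300000 ms used in the example above.
print(flush_interval_ms(5))  # → 300000
```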