3.1. Tutorial¶
3.1.1. Requirements¶
Before you can get started, please ensure that:
You have the latest version of the QuasarDB client library installed on your computer;
You have access to a running QuasarDB cluster.
The rest of this document assumes you have a cluster up and running under qdb://127.0.0.1:2836.
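If you want to verify connectivity before proceeding, a minimal Python sketch (assuming the client library from the next section is already installed) is to simply open and close a connection; opening raises an error when no cluster is reachable on the URI:
import quasardb

# Opening a connection doubles as a connectivity check: it raises an
# error if no cluster is reachable on the given URI.
with quasardb.Cluster("qdb://127.0.0.1:2836") as c:
    print("cluster is reachable")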
3.1.2. Installing libraries¶
pip install quasardb
# Ensure we can use https-based apt repos
$ apt install apt-transport-https ca-certificates -y
# Add the QuasarDB repository as a trusted repo
$ echo "deb [trusted=yes] https://repo.quasardb.net/apt/ /" > /etc/apt/sources.list.d/quasardb.list
# Refresh your apt cache
$ apt update
# In addition to the QuasarDB API, install cmake and g++ which are required
# for ad-hoc compilation of the QuasarDB Python module.
$ apt install qdb-api cmake g++ python3-dev
# Ensure you have the latest version of wheel
$ pip install --upgrade wheel
# Ensure you have the latest version of setuptools
$ pip install --upgrade setuptools
# Install the Python module
$ pip install quasardb
# Enable SCL for recent gcc
$ yum install centos-release-scl -y
# Enable EPEL for recent cmake
$ yum install epel-release -y
# Enable QuasarDB Repository
$ echo $'[quasardb]\nname=QuasarDB repo\nbaseurl=https://repo.quasardb.net/yum/\nenabled=1\ngpgcheck=0' > /etc/yum.repos.d/quasardb.repo
$ yum install devtoolset-7-gcc-c++ cmake3 make qdb-api python3-devel python3-wheel
# Make cmake3 the default
$ alternatives --install /usr/bin/cmake cmake /usr/bin/cmake3 10
# Start using gcc 7
$ scl enable devtoolset-7 bash
# Default RHEL7 setuptools is not recent enough
$ pip install --upgrade setuptools
# Install the Python module
$ pip install quasardb
<dependency>
<groupId>net.quasardb</groupId>
<artifactId>qdb</artifactId>
<version>3.9.9</version>
</dependency>
dependencies {
compile "net.quasardb.qdb:3.9.9"
}
# Note: the Go api requires that you have the C api installed
# See the C api for more details
go get -d github.com/bureau14/qdb-api-go
# After adding 'import qdb "github.com/bureau14/qdb-api-go"' to one of your source files
# Note: the Go api requires that you have the C api installed
# See the C api for more details
go mod init
go get
npm install --save-dev github:bureau14/qdb-api-nodejs#3.9.9
# For Linux we recommend building the package from source which requires
# that you have the C api installed. See the C api for more details
npm install --build-from-source --save-dev github:bureau14/qdb-api-nodejs#3.9.9
<PackageReference Include="quasardb" Version="3.8.7" />
Install-Package quasardb -Version 3.8.7
Use the Windows installer executable and select the C API option.
Choose one of the following installers depending on your platform:
> qdb-x.y.z-windows-64bit-setup.exe
> qdb-x.y.z-windows-32bit-setup.exe
Extract the tarball to your preferred location:
tar xzf qdb-x.y.z-darwin-64bit-c-api.tar.gz --no-same-owner -C /path/to/lib
# Ensure we can use https-based apt repos
$ apt install apt-transport-https ca-certificates -y
# Add the QuasarDB repository as a trusted repo
$ echo "deb [trusted=yes] https://repo.quasardb.net/apt/ /" > /etc/apt/sources.list.d/quasardb.list
# Refresh your apt cache
$ apt update
# Install
$ apt install qdb-api
# Enable QuasarDB Repository
$ echo $'[quasardb]\nname=QuasarDB repo\nbaseurl=https://repo.quasardb.net/yum/\nenabled=1\ngpgcheck=0' > /etc/yum.repos.d/quasardb.repo
$ yum install qdb-api
3.1.3. Importing libraries¶
Most languages require you to import the relevant QuasarDB modules before you can use them, so we start by importing those.
import json
import quasardb
import numpy as np
import quasardb
import quasardb.pandas as qdbpd
import pandas as pd
import numpy as np
import net.quasardb.qdb.*;
import net.quasardb.qdb.ts.*;
import net.quasardb.qdb.jni.*;
import net.quasardb.qdb.exception.*;
import qdb "github.com/bureau14/qdb-api-go"
var qdb = require('quasardb')
using Quasardb;
using Quasardb.TimeSeries;
#include <qdb/client.h>
#include <qdb/tag.h>
#include <qdb/ts.h>
3.1.4. Connection management¶
Establishing a connection with the QuasarDB cluster is easy. You need the URI of at least one of your nodes, and the client will automatically detect all nodes in the cluster.
A QuasarDB cluster operates in either a secure or an insecure mode. If you do not know whether your cluster is running in secure mode, please ask your system administrator.
3.1.4.1. Insecure connection¶
with quasardb.Cluster("qdb://127.0.0.1:2836") as c:
Session c;
try {
    c = Session.connect("qdb://127.0.0.1:2836");
} catch (ConnectionRefusedException ex) {
    System.err.println("Failed to connect to cluster, make sure server is running!");
    System.exit(1);
}
clusterURI := "qdb://127.0.0.1:2836"
timeoutDuration := time.Duration(120) * time.Second
handle, err := qdb.SetupHandle(clusterURI, timeoutDuration)
var cluster = new qdb.Cluster('qdb://127.0.0.1:2836')
cluster.connect(function() {
// Connected successfully
return callback(null, cluster)
}, function(err) {
// Connection error
return callback(err, null)
})
var c = new QdbCluster("qdb://127.0.0.1:2836");
// We first need to open a handle, which is the memory structure that
// QuasarDB uses to maintain connection state.
qdb_handle_t handle;
qdb_error_t error = qdb_open(&handle, qdb_p_tcp);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Now that we have opened the handle, we can tell it to establish a connection
// with the cluster.
error = qdb_connect(handle, "qdb://localhost:2836");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
curl -X POST \
-H "Content-Type: application/json" \
-d '{"username": "$USERNAME", "secret_key": "$SECRET"}' \
http://127.0.0.1:40080/api/login
3.1.4.2. Secure connection¶
In case of a secure connection, we need to provide a few additional parameters:
A username;
A user private key;
A cluster public key.
More information on QuasarDB’s security mechanisms can be found in our security manual.
If you do not know the values of these parameters, please ask your system administrator.
with quasardb.Cluster(uri='qdb://127.0.0.1:2836',
user_name=user_key['username'],
user_private_key=user_key['secret_key'],
cluster_public_key=cluster_key) as scs:
Session c;
try {
c = Session.connect(new Session.SecurityOptions(username,
user_secret_key,
cluster_public_key),
"qdb://127.0.0.1:2838");
} catch (ConnectionRefusedException ex) {
System.err.println("Failed to connect to cluster, make sure server is running!");
System.exit(1);
}
clusterURI := "qdb://127.0.0.1:2836"
timeoutDuration := time.Duration(120) * time.Second
clusterPublicKeyPath := "/path/to/cluster_public.key"
usersPrivateKeyPath := "/path/to/user_private.key"
handle, err := qdb.SetupSecuredHandle(
clusterURI,
clusterPublicKeyPath,
usersPrivateKeyPath,
timeoutDuration,
qdb.EncryptNone,
)
var secureCluster = new qdb.Cluster(
'qdb://127.0.0.1:2836',
'/path/to/cluster_public.key',
'/path/to/user_private.key'
)
secureCluster.connect(function() {
// Connected successfully
return callback(null, cluster)
}, function(err) {
// Connection error
return callback(err, null)
})
var sc = new QdbCluster(secureClusterURI, clusterPublicKey, userName, userPrivateKey);
// We first need to open a handle, which is the memory structure that
// QuasarDB uses to maintain connection state.
qdb_handle_t handle;
qdb_error_t error = qdb_open(&handle, qdb_p_tcp);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Load the encoded key
error = qdb_option_set_cluster_public_key(handle, "cluster_public_key");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Then the username and its associated encoded key
error = qdb_option_set_user_credentials(handle, "user", "user_private_key");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Another option is to load directly from the security files
error = qdb_option_load_security_files(handle, "cluster_public_key.txt", "user_credentials.txt");
// Now that we have opened the handle, we can tell it to establish a connection
// with the cluster.
error = qdb_connect(handle, "qdb://localhost:2836");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
curl -X POST \
-H "Content-Type: application/json" \
-d '{"username": "$USERNAME", "secret_key": "$SECRET"}' \
http://127.0.0.1:40080/api/login
3.1.5. Creating a table¶
Before we can store timeseries data, we need to create a table. A table is uniquely identified by a name (e.g. “stocks” or “sensors”) and has one or more columns.
In this example we will create a table “stocks” with three columns: “open”, “close” and “volume”. The first two columns hold double-precision floating point values; the third holds 64-bit signed integers.
# First we acquire a reference to a table (which may or may not yet exist)
t = c.table("stocks")
# Initialize our column definitions
cols = [quasardb.ColumnInfo(quasardb.ColumnType.Double, "open"),
quasardb.ColumnInfo(quasardb.ColumnType.Double, "close"),
quasardb.ColumnInfo(quasardb.ColumnType.Int64, "volume")]
# Now create the table with the default shard size
t.create(cols)
Column[] definitions = {
new Column.Double ("open"),
new Column.Double ("close"),
new Column.Int64 ("volume")
};
// This will return a reference to the newly created Table
Table t = Table.create(c, "stocks", definitions);
table := handle.Timeseries("stocks")
columnsInfo := []qdb.TsColumnInfo{
qdb.NewTsColumnInfo("open", qdb.TsColumnDouble),
qdb.NewTsColumnInfo("close", qdb.TsColumnDouble),
qdb.NewTsColumnInfo("volume", qdb.TsColumnInt64),
}
shardSizeDuration := 24 * time.Hour
err := table.Create(shardSizeDuration, columnsInfo...)
var table = cluster.ts('stocks')
table.create([
qdb.DoubleColumnInfo('open'),
qdb.DoubleColumnInfo('close'),
qdb.Int64ColumnInfo('volume')
], function(err) {
if (err) {
// Failed to create table
return callback(err)
}
// Successfully created table
// First we acquire a reference to a table (which may or may not yet exist)
var ts = c.Table("stocks");
// Initialize our column definitions
var columns = new QdbColumnDefinition[]{
new QdbDoubleColumnDefinition("open"),
new QdbDoubleColumnDefinition("close"),
new QdbInt64ColumnDefinition("volume")};
// Now create the table with the default shard size
ts.Create(columns);
// Initialize our column definitions
const qdb_ts_column_info_t columns[3] = {
{.name = "open", .type = qdb_ts_column_double}, //
{.name = "close", .type = qdb_ts_column_double}, //
{.name = "volume", .type = qdb_ts_column_int64} //
};
const int columns_count = sizeof(columns) / sizeof(qdb_ts_column_info_t);
// Now create the table with the default shard size
qdb_error_t error = qdb_ts_create(handle, "stocks", qdb_d_default_shard_size, columns, columns_count);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
3.1.5.1. Attaching tags¶
QuasarDB allows you to manage your tables by attaching tags to them. For more information about tags, see Managing tables with Tags.
In the example below, we will attach the tag nasdaq
to the “stocks” table we created.
t.attach_tag("nasdaq")
// You can also attach a tag by only providing the table string. See the
// javadocs for other ways to call this function.
Table.attachTag(c, t, "nasdaq");
// Once you've created a table you can attach tags to it
err = table.AttachTag("nasdaq")
table.attachTag("nasdaq", function (err) {
if (err) {
callback(err)
}
callback()
})
ts.AttachTag(c.Tag("nasdaq"));
error = qdb_attach_tag(handle, "stocks", "nasdaq");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
3.1.6. A word about API types¶
Now that we have our tables in place, it’s time to start interacting with actual data. On a high-level, QuasarDB provides two different APIs for you to insert data:
A row-based API, where you insert data on a row-by-row basis. This API, referred to as our “batch inserter”, provides stronger consistency guarantees.
A column-based API, where you insert pure timeseries data per column. This data is typically aligned per timestamp, and therefore assumes unique timestamps.
If you’re unsure which API is best for you, start out with the row-based insertion API, the batch inserter.
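To make the distinction concrete, below is a minimal Python sketch contrasting the two styles; it assumes the inserter, table, and numpy arrays that are set up step by step in the sections that follow:
# Row-oriented: one timestamp per row, values addressed by column offset.
inserter.start_row(np.datetime64('2019-02-01', 'ns'))
inserter.set_double(0, 3.40)   # open
inserter.set_double(1, 3.50)   # close
inserter.set_int64(2, 10000)   # volume
inserter.push()

# Column-oriented: one array of timestamps plus one array of values per column.
t.double_insert("open", timestamps, opens)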
You should now continue with either the row oriented or the column oriented tutorials.
3.1.7. Row oriented API¶
3.1.7.1. Batch inserter¶
The QuasarDB batch inserter provides you with a row-oriented interface to send data to the QuasarDB cluster. The data is buffered client-side and sent in batches, ensuring efficiency and consistency.
The batch writer has various modes of operation, each with different tradeoffs:
| Insertion mode | Description | Use case(s) |
|---|---|---|
| Default | Transactional insertion mode that employs Copy-on-Write | General purpose |
| Fast | Transactional insert that does not employ Copy-on-Write. Newly written data may be visible to queries before the transaction is fully completed. | Streaming data, many small incremental writes |
| Asynchronous | Data is buffered in-memory in the QuasarDB daemon nodes before writing to disk. Data from multiple sources is buffered together, and periodically flushed to disk. | Streaming data where multiple processes simultaneously write into the same table(s) |
| Truncate (a.k.a. “upsert”) | Replaces any existing data with the provided data. | Replay of historical data |
When in doubt, we recommend you use the default insertion mode.
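In the Python client, the insertion mode is selected at push time. The sketch below assumes the push variants exposed by the Python batch inserter; the method names are an assumption, so check your client version's API reference to confirm availability:
# Assumption: push variants on the Python batch inserter; names may
# differ per client version.
inserter.push()           # Default: transactional, Copy-on-Write
inserter.push_fast()      # Fast: transactional, no Copy-on-Write
inserter.push_async()     # Asynchronous: buffered in the daemon, flushed periodically
inserter.push_truncate()  # Truncate: replaces existing data ("upsert")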
The steps involved in using the batch writer API are as follows:
Initialize a local batch inserter instance, providing it with the tables and columns you want to insert data for. Note that specifying multiple tables is supported: this will allow you to insert data into multiple tables in one atomic operation.
Prepare/buffer the batch you want to insert. Buffering locally before sending ensures that the data is transmitted at maximum throughput, which keeps server-side processing efficient.
Push the batch to the cluster.
If necessary, go back to step 2 to send additional batches.
We recommend you use batch sizes as large as possible: between 50k and 500k rows is optimal.
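To illustrate steps 2 through 4, here is a hedged Python sketch of a chunked ingest loop; rows and the chunked() helper are hypothetical stand-ins for your own data source:
import itertools

def chunked(it, n):
    # Hypothetical helper: yield successive lists of at most n items.
    it = iter(it)
    while batch := list(itertools.islice(it, n)):
        yield batch

# 'rows' is a hypothetical iterable of (timestamp, open, close, volume) tuples.
for batch in chunked(rows, 500000):
    for (ts, open_, close_, volume) in batch:
        inserter.start_row(ts)
        inserter.set_double(0, open_)
        inserter.set_double(1, close_)
        inserter.set_int64(2, volume)
    inserter.push()   # one network round-trip per buffered batch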
In the example below we will insert two different rows for two separate days into our “stocks” table.
# We need to tell the batch inserter which columns we plan to insert. Note
# how we give it a hint that we expect to insert 2 rows for each of these columns.
batch_columns = [quasardb.BatchColumnInfo("stocks", "open", 2),
quasardb.BatchColumnInfo("stocks", "close", 2),
quasardb.BatchColumnInfo("stocks", "volume", 2)]
# Now that we know which columns we want to insert, we initialize our batch inserter.
inserter = c.inserter(batch_columns)
# Insert the first row: to start a new row, we must provide it with a mandatory
# timestamp that all values for this row will share. QuasarDB will use this timestamp
# as its primary index.
#
# QuasarDB only supports nanosecond timestamps, so we must specifically convert our
# dates to nanosecond precision.
inserter.start_row(np.datetime64('2019-02-01', 'ns'))
# We now set the values for our columns by their relative offsets: column 0 below
# refers to the first column we provide in the batch_columns variable above.
inserter.set_double(0, 3.40)
inserter.set_double(1, 3.50)
inserter.set_int64(2, 10000)
# We tell the batch inserter to start a new row before we can set the values for the
# next row.
inserter.start_row(np.datetime64('2019-02-02', 'ns'))
inserter.set_double(0, 3.50)
inserter.set_double(1, 3.55)
inserter.set_int64(2, 7500)
# Now that we're done, we push the buffer as one single operation.
inserter.push()
# Prepare the entire DataFrame which we wish to store
data = {'open': [3.40, 3.50],
'close': [3.50, 3.55],
'volume': [10000, 7500]}
timestamps = np.array([np.datetime64('2019-02-01'),
np.datetime64('2019-02-02')],
dtype='datetime64[ns]')
df = pd.DataFrame(data=data,
index=timestamps)
# By providing the create=True parameter, we explicitly tell the Pandas connector to
# create a new table based on the schema
# of the DataFrame.
qdbpd.write_dataframe(df, c, "stocks", create=True)
// We initialize a Writer here that automatically flushes rows as we insert
// them, by default every 50,000 rows. If we want to explicitly control these
// flushes, use `Table.writer()` instead.
Writer w = Table.autoFlushWriter(c, "stocks");
// Insert the first row: to start a new row, we must provide it with a mandatory
// timestamp that all values for this row will share. QuasarDB will use this timestamp
// as its primary index.
w.append(new Timespec(Instant.ofEpochSecond(1548979200)),
new Value[] {
Value.createDouble(3.40),
Value.createDouble(3.50),
Value.createInt64(10000)
});
// Inserting the next row is a matter of just calling append.
w.append(new Timespec(Instant.ofEpochSecond(1549065600)),
new Value[] {
Value.createDouble(3.50),
Value.createDouble(3.55),
Value.createInt64(7500)
});
// Now that we're done, we push the buffer as one single operation. Note that,
// because in this specific example we are using the autoFlushWriter, this would
// happen automatically under the hood every 50,000 append() invocations.
w.flush();
batchColumnInfo := []qdb.TsBatchColumnInfo{
qdb.NewTsBatchColumnInfo("stocks", "open", 2),
qdb.NewTsBatchColumnInfo("stocks", "close", 2),
qdb.NewTsBatchColumnInfo("stocks", "volume", 2),
}
batch, err := handle.TsBatch(batchColumnInfo...)
if err != nil {
return err
}
batch.StartRow(time.Unix(1548979200, 0))
batch.RowSetDouble(0, 3.40)
batch.RowSetDouble(1, 3.50)
batch.RowSetInt64(2, 10000)
batch.StartRow(time.Unix(1549065600, 0))
batch.RowSetDouble(0, 3.50)
batch.RowSetDouble(1, 3.55)
batch.RowSetInt64(2, 7500)
err = batch.Push()
// We initialize our batch writer.
var writer = ts.Writer();
// Alternatively we could select specific columns
// var writer = ts.Writer(new QdbColumnDefinition[]{
// new QdbDoubleColumnDefinition("open")
//});
// Insert the first row: to start a new row, we must provide it with a mandatory
// timestamp that all values for this row will share. QuasarDB will use this timestamp
// as its primary index.
writer.StartRow(new DateTime(2019, 02, 01));
// We now set the values for our columns by their relative offsets: column 0 below
// refers to the first column we provide in the columns variable above.
writer.SetDouble(0, 3.40);
writer.SetDouble(1, 3.50);
writer.SetInt64(2, 10000);
// We tell the batch writer to start a new row before we can set the values for the
// next row.
writer.StartRow(new DateTime(2019, 02, 02));
writer.SetDouble(0, 3.50);
writer.SetDouble(1, 3.55);
writer.SetInt64(2, 7500);
// Now that we're done, we push the buffer as one single operation.
writer.Push();
// Initialize our batch column definitions
const qdb_ts_batch_column_info_t batch_columns[3] = {
{.timeseries = "stocks", .column = "open", .elements_count_hint = 2}, //
{.timeseries = "stocks", .column = "close", .elements_count_hint = 2}, //
{.timeseries = "stocks", .column = "volume", .elements_count_hint = 2} //
};
const int batch_columns_count = sizeof(batch_columns) / sizeof(qdb_ts_batch_column_info_t);
// create our batch handle
qdb_batch_table_t table;
error = qdb_ts_batch_table_init(handle, batch_columns, batch_columns_count, &table);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// The batch API is row-oriented: we first set up the start timestamp of the row
// Set timestamp to 2019-02-01
qdb_timespec_t timestamp = {.tv_sec = 1548979200, .tv_nsec = 0};
qdb_ts_batch_start_row(table, ×tamp);
// Then set the values for each column
qdb_ts_batch_row_set_double(table, 0, 3.40);
qdb_ts_batch_row_set_double(table, 1, 3.50);
qdb_ts_batch_row_set_int64(table, 2, 10000);
// Add another row
// Set timestamp to 2019-02-02
timestamp.tv_sec = 1549065600;
qdb_ts_batch_start_row(table, ×tamp);
qdb_ts_batch_row_set_double(table, 0, 3.50);
qdb_ts_batch_row_set_double(table, 1, 3.55);
qdb_ts_batch_row_set_int64(table, 2, 7500);
// Push into the database as a single operation
error = qdb_ts_batch_push(table);
// Don't forget to release the table
qdb_release(handle, table);
3.1.7.2. Bulk reader¶
On the other side of the row-oriented API we have the “bulk reader”. The bulk reader provides streaming access to a single table, optionally limited by certain columns and/or certain time ranges.
If you want to have efficient row-oriented access to the raw data in a table, this is the API you want to use. If you want to execute aggregates, complex where clauses and/or multi-table joins, please see the query API.
The example below will show you how to read our stock data for just a single day.
# We can initialize a bulk reader based directly from our table. By
# providing a dict=True parameter, the QuasarDB API will automatically
# expose our rows as dicts.
reader = t.reader(dict=True, ranges=[(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))])
# The bulk reader is exposed as a regular Python iterator
for row in reader:
# We can access the row locally within our loop:
print(row)
# But because the QuasarDB Python API is zero-copy, our row maintains a
# reference to the underlying data. If we want to keep the row data alive
# longer than the local scope, you can use row.copy() as follows:
do_something_async_with(row.copy())
ranges = [(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))]
t = c.table("stocks")
# By providing the row_index=True parameter, we explicitly tell the Pandas connector
# to retrieve rows using the row-oriented bulk reader and use the default Pandas sequential
# row index. This is useful if your dataset is sparse and may contain null values.
df = qdbpd.read_dataframe(t, row_index=True, ranges=ranges)
// We first initialize the TimeRange we are looking for. Providing a timerange
// to a bulk reader is mandatory.
TimeRange[] ranges = new TimeRange[] { new TimeRange(new Timespec(Instant.ofEpochSecond(1548979200)),
new Timespec(Instant.ofEpochSecond(1549065600))) };
// In this example, we initialize a bulk reader by simply providing a session,
// table name and timerange we're interested in. For alternative ways to initialize
// a bulk reader, please refer to the javadoc of the Table class.
Reader r = Table.reader(c, "stocks", ranges);
// The reader implements an Iterator interface which allows us to traverse the rows:
while (r.hasNext()) {
WritableRow row = r.next();
// Each row has a timestamp which you can access as a Timespec:
System.out.println("row timestamp: " + row.getTimestamp().toString());
// Note that the offsets of the values array align with the offsets we used
// when creating the table, i.e. 0 means "open", 1 means "close" and 2 means
// "volume":
Value[] values = row.getValues();
Value openValue = values[0];
Value closeValue = values[1];
Value volumeValue = values[2];
}
table := handle.Timeseries("stocks")
bulk, err := table.Bulk()
if err != nil {
return err
}
err = bulk.GetRanges(qdb.NewRange(time.Unix(1548979200, 0), time.Unix(1549065601, 0)))
if err != nil {
return err
}
for {
timestamp, err := bulk.NextRow()
if err != nil {
break
}
open, err := bulk.GetDouble()
if err != nil {
return err
}
close, err := bulk.GetDouble()
if err != nil {
return err
}
volume, err := bulk.GetInt64()
if err != nil {
return err
}
fmt.Printf("timestamp: %s, open: %v, close: %v, volume: %v\n", timestamp, open, close, volume)
}
// We can initialize a bulk reader based directly from our table.
var reader = ts.Reader();
// Alternatively, we can limit the reader to a specific time interval:
// var reader = ts.Reader(new QdbTimeInterval(new DateTime(2019, 02, 01), new DateTime(2019, 02, 02)));
// The bulk reader is exposed as a regular .Net Enumerable
foreach (var row in reader)
{
// Each row has a timestamp which you can access as a Timespec:
Console.WriteLine($"row timestamp: {row.Timestamp}");
// Note that the offsets of the values array align with the offsets we used
// when creating the table, i.e. 0 means "open", 1 means "close" and 2 means
// "volume":
var openValue = row[0].DoubleValue;
var closeValue = row[1].DoubleValue;
var volumeValue = row[2].Int64Value;
}
// We can initialize our bulk reader directly from the columns we defined earlier
qdb_local_table_t local_table;
error = qdb_ts_local_table_init(handle, "stocks", columns, columns_count, &local_table);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Setup a range going from 2019-02-01 to 2019-02-02
qdb_ts_range_t range = {.begin = {.tv_sec = 1548979200, .tv_nsec = 0}, .end = {.tv_sec = 1549065600, .tv_nsec = 0}};
error = qdb_ts_table_get_ranges(local_table, &range, 1u);
qdb_timespec_t timestamp;
while (!qdb_ts_table_next_row(local_table, &timestamp))
{
double value_index_zero = 0;
double value_index_one = 0;
qdb_int_t value_index_two = 0;
error = qdb_ts_row_get_double(local_table, 0, &value_index_zero);
// put cleanup logic here in case of error
error = qdb_ts_row_get_double(local_table, 1, &value_index_one);
// put cleanup logic here in case of error
error = qdb_ts_row_get_int64(local_table, 2, &value_index_two);
// put cleanup logic here in case of error
}
// don't forget to release the table once finished
qdb_release(handle, local_table);
The next section will show you how to store and retrieve the same dataset using the column-oriented API. If this is irrelevant to you, it’s safe to skip directly to the query API.
3.1.8. Column oriented API¶
The other high-level API QuasarDB offers is the column-oriented API. It is more lightweight than the row-oriented API and provides a good alternative if your dataset is shaped correctly.
3.1.8.1. Storing timeseries¶
To store a single timeseries, all you have to do is provide a sequence of timestamp/value pairs and the column you want to store them in.
# Our API is built on top of numpy, and provides zero-copy integration with native
# numpy arrays. As such, we first prepare a separate array for each of our three
# columns:
opens = np.array([3.40, 3.50], dtype=np.float64)
closes = np.array([3.50, 3.55], dtype=np.float64)
volumes = np.array([10000, 7500], dtype=np.int64)
# Separately, we generate a numpy array of timestamps. Since our three columns share
# the same timestamps, we can reuse this array for all of them, but this is not required.
timestamps = np.array([np.datetime64('2019-02-01'), np.datetime64('2019-02-02')], dtype='datetime64[ns]')
# When inserting, we provide the value arrays and timestamp arrays separately.
t.double_insert("open", timestamps, opens)
t.double_insert("close", timestamps, closes)
t.int64_insert("volume", timestamps, volumes)
# The Pandas connector maps QuasarDB's column-oriented API to Pandas Series. As such,
# we initialize one Series for each of our three columns.
#
# Separately, we generate a numpy array of timestamps. Since our three columns share
# the same timestamps, we can reuse this as the index for all our Series.
timestamps = np.array([np.datetime64('2019-02-01'), np.datetime64('2019-02-02')], dtype='datetime64[ns]')
opens = pd.Series(data=[3.40, 3.50], index=timestamps, dtype=np.float64)
closes = pd.Series(data=[3.50, 3.55], index=timestamps, dtype=np.float64)
volumes = pd.Series(data=[10000, 7500], index=timestamps, dtype=np.int64)
# We use the write_series function to insert column by column.
qdbpd.write_series(opens, t, "open")
qdbpd.write_series(closes, t, "close")
qdbpd.write_series(volumes, t, "volume")
table := handle.Timeseries("stocks")
openColumn := table.DoubleColumn("open")
closeColumn := table.DoubleColumn("close")
volumeColumn := table.Int64Column("volume")
t1 := time.Unix(1600000000, 0)
t2 := time.Unix(1610000000, 0)
openPoints := []qdb.TsDoublePoint{
qdb.NewTsDoublePoint(t1, 3.40),
qdb.NewTsDoublePoint(t2, 3.50),
}
closePoints := []qdb.TsDoublePoint{
qdb.NewTsDoublePoint(t1, 3.50),
qdb.NewTsDoublePoint(t2, 3.55),
}
volumePoints := []qdb.TsInt64Point{
qdb.NewTsInt64Point(t1, 10000),
qdb.NewTsInt64Point(t2, 7500),
}
err := openColumn.Insert(openPoints...)
if err != nil {
return err
}
err = closeColumn.Insert(closePoints...)
if err != nil {
return err
}
err = volumeColumn.Insert(volumePoints...)
if err != nil {
return err
}
var table = cluster.ts('stocks')
table.columns(function(err, columns) {
if (err) {
return callback(err)
}
var open = columns[0]
var close = columns[1]
var volume = columns[2]
open.insert([
qdb.DoublePoint(qdb.Timestamp.fromDate(new Date(2000, 10, 5, 2)), 3.40),
qdb.DoublePoint(qdb.Timestamp.fromDate(new Date(2020, 10, 5, 4)), 3.50)
], function(err) {
if (err) {
return callback(err)
}
close.insert([
qdb.DoublePoint(qdb.Timestamp.fromDate(new Date(2000, 10, 5, 2)), 3.50),
qdb.DoublePoint(qdb.Timestamp.fromDate(new Date(2020, 10, 5, 4)), 3.55)
], function(err) {
if (err) {
return callback(err)
}
volume.insert([
qdb.Int64Point(qdb.Timestamp.fromDate(new Date(2000, 10, 5, 2)), 10000),
qdb.Int64Point(qdb.Timestamp.fromDate(new Date(2020, 10, 5, 4)), 7500)
], function(err) {
if (err) {
return callback(err)
}
return callback()
})
})
})
})
// Prepare some data to be inserted
var opens = new QdbDoublePointCollection { { new DateTime(2019, 02, 01), 3.40 }, { new DateTime(2019, 02, 02), 3.50 } };
var closes = new QdbDoublePointCollection { { new DateTime(2019, 02, 01), 3.50 }, { new DateTime(2019, 02, 02), 3.55 } };
var volumes = new QdbInt64PointCollection { { new DateTime(2019, 02, 01), 10000 }, { new DateTime(2019, 02, 02), 7500 } };
// Retrieve the different columns from our table
var openCol = ts.DoubleColumns["open"];
var closeCol = ts.DoubleColumns["close"];
var volumeCol = ts.Int64Columns["volume"];
// Insert data for each column
openCol.Insert(opens);
closeCol.Insert(closes);
volumeCol.Insert(volumes);
// Prepare the points for each column
const qdb_ts_double_point opens[2] = {
{.timestamp = {.tv_sec = 1548979200, .tv_nsec = 0}, .value = 3.4}, //
{.timestamp = {.tv_sec = 1549065600, .tv_nsec = 0}, .value = 3.5} //
};
const qdb_ts_double_point closes[2] = {
{.timestamp = {.tv_sec = 1548979200, .tv_nsec = 0}, .value = 3.50}, //
{.timestamp = {.tv_sec = 1549065600, .tv_nsec = 0}, .value = 3.55} //
};
const qdb_ts_int64_point volumes[2] = {
{.timestamp = {.tv_sec = 1548979200, .tv_nsec = 0}, .value = 10000}, //
{.timestamp = {.tv_sec = 1549065600, .tv_nsec = 0}, .value = 7500} //
};
// Insert each column independently
error = qdb_ts_double_insert(handle, "stocks", "open", opens, 2u);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
error = qdb_ts_double_insert(handle, "stocks", "close", closes, 2u);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
error = qdb_ts_int64_insert(handle, "stocks", "volume", volumes, 2u);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
3.1.8.2. Retrieving timeseries¶
To retrieve a single timeseries, you provide a column and one or more timeranges. Our examples below show how to retrieve all three columns for a single day.
# We first prepare the intervals we want to select data from, that is, a list of
# timeranges. An interval is defined as a tuple of start time (inclusive) and end
# time (exclusive).
#
# In this example, we just use a single interval.
intervals = np.array([(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))])
# As with insertion, our API works with native numpy arrays and returns the results as such.
(timestamps1, opens) = t.double_get_ranges("open", intervals)
(timestamps2, closes) = t.double_get_ranges("close", intervals)
(timestamps3, volumes) = t.int64_get_ranges("volume", intervals)
# For this specific dataset, timestamps1 == timestamps2 == timestamps3, but
# this does not necessarily have to be the case.
np.testing.assert_array_equal(timestamps1, timestamps2)
np.testing.assert_array_equal(timestamps1, timestamps3)
# We first prepare the intervals we want to select data from, that is, a list of
# timeranges. An interval is defined as a tuple of start time (inclusive) and end
# time (exclusive).
#
# In this example, we just use a single interval.
intervals = np.array([(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))])
# We can then use the read_series function to read column by column. The objects
# returned are regular pd.Series objects.
opens = qdbpd.read_series(t, "open", intervals)
closes = qdbpd.read_series(t, "close", intervals)
volumes = qdbpd.read_series(t, "volume", intervals)
table := handle.Timeseries("stocks")
openColumn := table.DoubleColumn("open")
closeColumn := table.DoubleColumn("close")
volumeColumn := table.Int64Column("volume")
timeRange := qdb.NewRange(time.Unix(1600000000, 0), time.Unix(1610000001, 0))
openResults, err := openColumn.GetRanges(timeRange)
if err != nil {
return err
}
closeResults, err := closeColumn.GetRanges(timeRange)
if err != nil {
return err
}
volumeResults, err := volumeColumn.GetRanges(timeRange)
if err != nil {
return err
}
for i := 0; i < 2; i++ {
timestamp := openResults[i].Timestamp()
open := openResults[i].Content()
close := closeResults[i].Content()
volume := volumeResults[i].Content()
fmt.Printf("timestamp: %s, open: %v, close: %v, volume: %v\n", timestamp, open, close, volume)
}
var range = qdb.TsRange(
qdb.Timestamp.fromDate(new Date(2000, 10, 5, 2)),
qdb.Timestamp.fromDate(new Date(2020, 10, 5, 5))
)
var table = cluster.ts('stocks')
table.columns(function(err, columns) {
if (err) {
return callback(err)
}
var open = columns[0]
var close = columns[1]
var volume = columns[2]
open.ranges([range], function(err, openPoints) {
if (err) {
return callback(err)
}
close.ranges([range], function(err, closePoints) {
if (err) {
return callback(err)
}
volume.ranges([range], function(err, volumePoints) {
if (err) {
return callback(err)
}
for (var i = 0; i < openPoints.length; i++) {
timestamp = openPoints[i].timestamp
open = openPoints[i].value
close = closePoints[i].value
volume = volumePoints[i].value
console.log(`timestamp: ${timestamp}, open: ${open}, close: ${close}, volume: ${volume}`)
}
return callback()
})
})
})
})
// using the same columns we used for the insertion
// we can retrieve the points from a specific range
var range = new QdbTimeInterval(new DateTime(2019, 02, 01), new DateTime(2019, 02, 02));
// You can now inspect the values in the enumerable Points
var resultPoints = openCol.Points(range);
// Setup the range(s) we want to get
const qdb_ts_range_t ranges[1] = {{.begin = {.tv_sec = 1548979200, .tv_nsec = 0}, .end = {.tv_sec = 1549065600, .tv_nsec = 0}}};
// The data is written into the empty structures you pass as in-out parameters
qdb_ts_double_point * points = NULL;
qdb_size_t point_count = 0;
// Get the provided ranges
error = qdb_ts_double_get_ranges(handle, "stocks", "open", ranges, 1u, &points, &point_count);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
3.1.9. Queries¶
If you are looking for more flexible control over the kind of calculations performed on a dataset, or want to push certain computations to the cluster, QuasarDB offers an SQL-like query language for you to interact with your data. Please see our query language documentation.
In the example below, we will show you how to execute a simple query that calculates the total traded volume for the entire dataset.
result = c.query("SELECT SUM(volume) FROM stocks")
# The result is returned as a list of dicts
for row in result:
print("row: ", row)
# Since we only expect one row, we can also access it directly like this:
aggregate_result = result[0]['sum(volume)']
print("sum(volume): ", aggregate_result)
df = qdbpd.query(c, "SELECT SUM(volume) FROM stocks")
# The API returns a DataFrame
print("result: ", df)
// We can either construct a query from a raw string like this
Query q1 = Query.of("SELECT SUM(volume) FROM stocks");
// Or we can use the QueryBuilder for more flexible query building, especially
// useful for providing e.g. ranges.
String colName = "volume";
String tableName = "stocks";
Query q2 = new QueryBuilder()
.add("SELECT SUM(")
.add(colName)
.add(") FROM")
.add(tableName)
.asQuery();
// Execute the query
Result r = q1.execute(c);
// The names in the columns array align with the value offsets in each
// result row: columns[0] names the value at offset 0 of every row.
String[] columns = r.columns;
Row[] rows = r.rows;
// Last but not least, the Query results API also implements native Java
// streams.
Stream<Row> s = r.stream();
System.out.println("total row count: " + s.count());
query := handle.Query("SELECT SUM(volume) FROM stocks")
table, err := query.Execute()
if err != nil {
return err
}
for _, row := range table.Rows() {
for _, col := range table.Columns(row) {
fmt.Printf("%v ", col.Get().Value())
}
fmt.Println()
}
cluster.query("SELECT SUM(volume) FROM stocks").run(function(err, result) {
if (err) {
return callback(err)
}
for (var i = 0 ; i < result.rows.length; i++) {
console.log(`${result.rows[i][1]}`)
}
return callback()
})
// Execute the query
var r = c.Query("SELECT SUM(volume) FROM stocks");
// The rows are exposed as a regular .Net Enumerable
var columnNames = r.ColumnNames;
var rows = r.Rows;
foreach (var row in rows)
{
Console.WriteLine($"{columnNames[0]}: {row[0].Value}");
}
// Since we only expect one row, we can also access it directly like this:
var aggregateResult = rows[0]["sum(volume)"].Int64Value;
Console.Write($"sum(volume): {aggregateResult}");
curl \
-X POST \
-H "Authorization: Bearer ${TOKEN}" \
-H "Content-Type: application/json" \
-d '{ "query": "SELECT SUM(volume) FROM stocks" }' \
http://127.0.0.1:40080/api/query
3.1.10. Dropping a table¶
Dropping a table with QuasarDB is easy, and the removal is immediately visible to all clients.
# Use the earlier reference of the table we acquired to remove it:
t.remove()
// We can simply remove a table by its name.
Table.remove(c, "stocks");
table := handle.Timeseries("stocks")
err := table.Remove()
var table = cluster.ts('stocks')
table.remove(function(err) {
if (err) {
callback(err)
}
callback()
})
// Use the earlier reference of the table we acquired to remove it:
ts.Remove();
// A timeseries is considered a normal entry for this operation
// You can safely remove it
qdb_remove(handle, "stocks");
3.1.11. Full example¶
For completeness, we provide the full examples of all APIs below.
# import-start
import json
import quasardb
import numpy as np
# import-end
def do_something_async_with(x):
pass
# connect-start
with quasardb.Cluster("qdb://127.0.0.1:2836") as c:
# connect-end
def secure_connect():
user_key = {}
cluster_key = ""
with open('user_private.key', 'r') as user_key_file:
user_key = json.load(user_key_file)
with open('cluster_public.key', 'r') as cluster_key_file:
cluster_key = cluster_key_file.read()
# secure-connect-start
with quasardb.Cluster(uri='qdb://127.0.0.1:2836',
user_name=user_key['username'],
user_private_key=user_key['secret_key'],
cluster_public_key=cluster_key) as scs:
# secure-connect-end
pass
# create-table-start
# First we acquire a reference to a table (which may or may not yet exist)
t = c.table("stocks")
# Initialize our column definitions
cols = [quasardb.ColumnInfo(quasardb.ColumnType.Double, "open"),
quasardb.ColumnInfo(quasardb.ColumnType.Double, "close"),
quasardb.ColumnInfo(quasardb.ColumnType.Int64, "volume")]
# Now create the table with the default shard size
t.create(cols)
# create-table-end
# tags-start
t.attach_tag("nasdaq")
# tags-end
# batch-insert-start
# We need to tell the batch inserter which columns we plan to insert. Note
# how we give it a hint that we expect to insert 2 rows for each of these columns.
batch_columns = [quasardb.BatchColumnInfo("stocks", "open", 2),
quasardb.BatchColumnInfo("stocks", "close", 2),
quasardb.BatchColumnInfo("stocks", "volume", 2)]
# Now that we know which columns we want to insert, we initialize our batch inserter.
inserter = c.inserter(batch_columns)
# Insert the first row: to start a new row, we must provide it with a mandatory
# timestamp that all values for this row will share. QuasarDB will use this timestamp
# as its primary index.
#
# QuasarDB only supports nanosecond timestamps, so we must specifically convert our
# dates to nanosecond precision.
inserter.start_row(np.datetime64('2019-02-01', 'ns'))
# We now set the values for our columns by their relative offsets: column 0 below
# refers to the first column we provide in the batch_columns variable above.
inserter.set_double(0, 3.40)
inserter.set_double(1, 3.50)
inserter.set_int64(2, 10000)
# We tell the batch inserter to start a new row before we can set the values for the
# next row.
inserter.start_row(np.datetime64('2019-02-02', 'ns'))
inserter.set_double(0, 3.50)
inserter.set_double(1, 3.55)
inserter.set_int64(2, 7500)
# Now that we're done, we push the buffer as one single operation.
inserter.push()
# batch-insert-end
# bulk-read-start
# We can initialize a bulk reader based directly from our table. By
# providing a dict=True parameter, the QuasarDB API will automatically
# expose our rows as dicts.
reader = t.reader(dict=True, ranges=[(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))])
# The bulk reader is exposed as a regular Python iterator
for row in reader:
# We can access the row locally within our loop:
print(row)
# But because the QuasarDB Python API is zero-copy, our row maintains a
# reference to the underlying data. If we want to keep the row data alive
# longer than the local scope, you can use row.copy() as follows:
do_something_async_with(row.copy())
# bulk-read-end
# column-insert-start
# Our API is built on top of numpy, and provides zero-copy integration with native
# numpy arrays. As such, we first prepare a separate array for each of our three
# columns:
opens = np.array([3.40, 3.50], dtype=np.float64)
closes = np.array([3.50, 3.55], dtype=np.float64)
volumes = np.array([10000, 7500], dtype=np.int64)
# Separately, we generate a numpy array of timestamps. Since our three columns share
# the same timestamps, we can reuse this array for all of them, but this is not required.
timestamps = np.array([np.datetime64('2019-02-01'), np.datetime64('2019-02-02')], dtype='datetime64[ns]')
# When inserting, we provide the value arrays and timestamp arrays separately.
t.double_insert("open", timestamps, opens)
t.double_insert("close", timestamps, closes)
t.int64_insert("volume", timestamps, volumes)
# column-insert-end
# column-get-start
# We first prepare the intervals we want to select data from, that is, a list of
# timeranges. An interval is defined as a tuple of start time (inclusive) and end
# time (exclusive).
#
# In this example, we just use a single interval.
intervals = np.array([(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))])
# As with insertion, our API works with native numpy arrays and returns the results as such.
(timestamps1, opens) = t.double_get_ranges("open", intervals)
(timestamps2, closes) = t.double_get_ranges("close", intervals)
(timestamps3, volumes) = t.int64_get_ranges("volume", intervals)
# For this specific dataset, timestamps1 == timestamps2 == timestamps3, but
# this does not necessarily have to be the case.
np.testing.assert_array_equal(timestamps1, timestamps2)
np.testing.assert_array_equal(timestamps1, timestamps3)
# column-get-end
# query-start
result = c.query("SELECT SUM(volume) FROM stocks")
# The result is returned as a list of dicts
for row in result:
print("row: ", row)
# Since we only expect one row, we can also access it directly like this:
aggregate_result = result[0]['sum(volume)']
print("sum(volume): ", aggregate_result)
# query-end
# drop-table-start
# Use the earlier reference of the table we acquired to remove it:
t.remove()
# drop-table-end
import java.time.Instant;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.stream.Stream;
import java.nio.file.Paths;
import java.nio.file.Files;
import org.json.JSONObject;
// import-start
import net.quasardb.qdb.*;
import net.quasardb.qdb.ts.*;
import net.quasardb.qdb.jni.*;
import net.quasardb.qdb.exception.*;
// import-end
public class Tutorial {
public static void main(String[] args) throws IOException {
Session c = Tutorial.connect();
Table t = Tutorial.createTable(c);
Tutorial.batchInsert(c);
Tutorial.bulkRead(c);
Tutorial.query(c);
Tutorial.dropTable(c);
}
private void secureConnect() {
String username = "";
String user_secret_key = "";
String cluster_public_key = "";
try {
String user_file_content = new String(Files.readAllBytes(Paths.get("user_private.key")));
JSONObject user = new JSONObject(user_file_content);
username = user.get("username").toString();
user_secret_key = user.get("secret_key").toString();
cluster_public_key = new String(Files.readAllBytes(Paths.get("cluster_public.key")));
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
// secure-connect-start
Session c;
try {
c = Session.connect(new Session.SecurityOptions(username,
user_secret_key,
cluster_public_key),
"qdb://127.0.0.1:2838");
} catch (ConnectionRefusedException ex) {
System.err.println("Failed to connect to cluster, make sure server is running!");
System.exit(1);
}
// secure-connect-end
}
private static Session connect() throws ConnectionRefusedException {
// connect-start
Session c;
try {
c = Session.connect("qdb://127.0.0.1:2836");
} catch (ConnectionRefusedException ex) {
System.err.println("Failed to connect to cluster, make sure server is running!");
throw ex;
}
// connect-end
return c;
}
private static Table createTable(Session c) {
// create-table-start
Column[] definitions = {
new Column.Double ("open"),
new Column.Double ("close"),
new Column.Int64 ("volume")
};
// This will return a reference to the newly created Table
Table t = Table.create(c, "stocks", definitions);
// create-table-end
// tags-start
// You can also attach a tag by only providing the table string. See the
// javadocs for other ways to call this function.
Table.attachTag(c, t, "nasdaq");
// tags-end
return t;
}
private static void batchInsert(Session c) throws IOException {
// batch-insert-start
// We initialize a Writer here that automatically flushes rows as we insert
// them, by default every 50,000 rows. If we want to explicitly control these
// flushes, use `Table.writer()` instead.
Writer w = Table.autoFlushWriter(c, "stocks");
// Insert the first row: to start a new row, we must provide it with a mandatory
// timestamp that all values for this row will share. QuasarDB will use this timestamp
// as its primary index.
w.append(new Timespec(Instant.ofEpochSecond(1548979200)),
new Value[] {
Value.createDouble(3.40),
Value.createDouble(3.50),
Value.createInt64(10000)
});
// Inserting the next row is a matter of just calling append.
w.append(new Timespec(Instant.ofEpochSecond(1549065600)),
new Value[] {
Value.createDouble(3.50),
Value.createDouble(3.55),
Value.createInt64(7500)
});
// Now that we're done, we push the buffer as one single operation. Note that,
// because in this specific example we are using the autoFlushWriter, this would
// happen automatically under the hood every 50,000 append() invocations.
w.flush();
// batch-insert-end
}
private static void bulkRead(Session c) throws IOException {
// bulk-read-start
// We first initialize the TimeRange we are looking for. Providing a timerange
// to a bulk reader is mandatory.
TimeRange[] ranges = new TimeRange[] { new TimeRange(new Timespec(Instant.ofEpochSecond(1548979200)),
new Timespec(Instant.ofEpochSecond(1549065600))) };
// In this example, we initialize a bulk reader by simply providing a session,
// table name and timerange we're interested in. For alternative ways to initialize
// a bulk reader, please refer to the javadoc of the Table class.
Reader r = Table.reader(c, "stocks", ranges);
// The reader implements an Iterator interface which allows us to traverse the rows:
while (r.hasNext()) {
WritableRow row = r.next();
// Each row has a timestamp which you can access as a Timespec:
System.out.println("row timestamp: " + row.getTimestamp().toString());
// Note that the offsets of the values array align with the offsets we used
// when creating the table, i.e. 0 means "open", 1 means "close" and 2 means
// "volume":
Value[] values = row.getValues();
Value openValue = values[0];
Value closeValue = values[1];
Value volumeValue = values[2];
}
// bulk-read-end
}
private static void query(Session c) throws IOException {
// query-start
// We can either construct a query from a raw string like this
Query q1 = Query.of("SELECT SUM(volume) FROM stocks");
// Or we can use the QueryBuilder for more flexible query building, especially
// useful for providing e.g. ranges.
String colName = "volume";
String tableName = "stocks";
Query q2 = new QueryBuilder()
.add("SELECT SUM(")
.add(colName)
.add(") FROM")
.add(tableName)
.asQuery();
// Execute the query
Result r = q1.execute(c);
// The names in the columns array align with the value offsets in each
// result row: columns[0] names the value at offset 0 of every row.
String[] columns = r.columns;
Row[] rows = r.rows;
// Last but not least, the Query results API also implements native Java
// streams.
Stream<Row> s = r.stream();
System.out.println("total row count: " + s.count());
// query-end
}
private static void dropTable(Session c) throws IOException {
// drop-table-start
// We can simply remove a table by its name.
Table.remove(c, "stocks");
// drop-table-end
}
}
package main
// import-start
import (
"fmt"
"time"
qdb "github.com/bureau14/qdb-api-go"
)
// import-end
func main() {
handle, err := connect()
if err != nil {
fmt.Printf("Failed to connect to QuasarDB: %s", err.Error())
panic("Failed to connect to QuasarDB")
}
dropTable(handle)
err = createTable(handle)
if err != nil {
fmt.Printf("Failed to create table: %s", err.Error())
panic("Failed to create table")
}
err = batchInsert(handle)
if err != nil {
fmt.Printf("Failed to insert data: %s", err.Error())
panic("Failed to insert data")
}
err = bulkRead(handle)
if err != nil {
fmt.Printf("Failed to bulk read data: %s", err.Error())
panic("Failed to bulk read data")
}
err = query(handle)
if err != nil {
fmt.Printf("Failed to query data: %s", err.Error())
panic("Failed to query data")
}
err = columnInsert(handle)
if err != nil {
fmt.Printf("Failed to column insert data: %s", err.Error())
panic("Failed to column insert data")
}
err = columnRead(handle)
if err != nil {
fmt.Printf("Failed to column read data: %s", err.Error())
panic("Failed to column read data")
}
err = dropTable(handle)
if err != nil {
fmt.Printf("Failed to drop table: %s", err.Error())
panic("Failed to drop table")
}
}
func connect() (*qdb.HandleType, error) {
// connect-start
clusterURI := "qdb://127.0.0.1:2836"
timeoutDuration := time.Duration(120) * time.Second
handle, err := qdb.SetupHandle(clusterURI, timeoutDuration)
// connect-end
if err != nil {
return nil, err
}
return &handle, nil
}
func secureConnect() (*qdb.HandleType, error) {
// secure-connect-start
clusterURI := "qdb://127.0.0.1:2836"
timeoutDuration := time.Duration(120) * time.Second
clusterPublicKeyPath := "/path/to/cluster_public.key"
usersPrivateKeyPath := "/path/to/user_private.key"
handle, err := qdb.SetupSecuredHandle(
clusterURI,
clusterPublicKeyPath,
usersPrivateKeyPath,
timeoutDuration,
qdb.EncryptNone,
)
// secure-connect-end
if err != nil {
return nil, err
}
return &handle, nil
}
func createTable(handle *qdb.HandleType) error {
// create-table-start
table := handle.Timeseries("stocks")
columnsInfo := []qdb.TsColumnInfo{
qdb.NewTsColumnInfo("open", qdb.TsColumnDouble),
qdb.NewTsColumnInfo("close", qdb.TsColumnDouble),
qdb.NewTsColumnInfo("volume", qdb.TsColumnInt64),
}
shardSizeDuration := 24 * time.Hour
err := table.Create(shardSizeDuration, columnsInfo...)
// create-table-end
if err != nil {
return err
}
// tags-start
// Once you've created a table you can attach tags to it
err = table.AttachTag("nasdaq")
// tags-end
return err
}
func batchInsert(handle *qdb.HandleType) error {
// batch-insert-start
batchColumnInfo := []qdb.TsBatchColumnInfo{
qdb.NewTsBatchColumnInfo("stocks", "open", 2),
qdb.NewTsBatchColumnInfo("stocks", "close", 2),
qdb.NewTsBatchColumnInfo("stocks", "volume", 2),
}
batch, err := handle.TsBatch(batchColumnInfo...)
if err != nil {
return err
}
batch.StartRow(time.Unix(1548979200, 0))
batch.RowSetDouble(0, 3.40)
batch.RowSetDouble(1, 3.50)
batch.RowSetInt64(2, 10000)
batch.StartRow(time.Unix(1549065600, 0))
batch.RowSetDouble(0, 3.50)
batch.RowSetDouble(1, 3.55)
batch.RowSetInt64(2, 7500)
err = batch.Push()
// batch-insert-end
return err
}
func bulkRead(handle *qdb.HandleType) error {
// bulk-read-start
table := handle.Timeseries("stocks")
bulk, err := table.Bulk()
if err != nil {
return err
}
err = bulk.GetRanges(qdb.NewRange(time.Unix(1548979200, 0), time.Unix(1549065601, 0)))
if err != nil {
return err
}
for {
timestamp, err := bulk.NextRow()
if err != nil {
break
}
open, err := bulk.GetDouble()
if err != nil {
return err
}
close, err := bulk.GetDouble()
if err != nil {
return err
}
volume, err := bulk.GetInt64()
if err != nil {
return err
}
fmt.Printf("timestamp: %s, open: %v, close: %v, volume: %v\n", timestamp, open, close, volume)
}
// bulk-read-end
return nil
}
func columnInsert(handle *qdb.HandleType) error {
// column-insert-start
table := handle.Timeseries("stocks")
openColumn := table.DoubleColumn("open")
closeColumn := table.DoubleColumn("close")
volumeColumn := table.Int64Column("volume")
t1 := time.Unix(1600000000, 0)
t2 := time.Unix(1610000000, 0)
openPoints := []qdb.TsDoublePoint{
qdb.NewTsDoublePoint(t1, 3.40),
qdb.NewTsDoublePoint(t2, 3.50),
}
closePoints := []qdb.TsDoublePoint{
qdb.NewTsDoublePoint(t1, 3.50),
qdb.NewTsDoublePoint(t2, 3.55),
}
volumePoints := []qdb.TsInt64Point{
qdb.NewTsInt64Point(t1, 10000),
qdb.NewTsInt64Point(t2, 7500),
}
err := openColumn.Insert(openPoints...)
if err != nil {
return err
}
err = closeColumn.Insert(closePoints...)
if err != nil {
return err
}
err = volumeColumn.Insert(volumePoints...)
if err != nil {
return err
}
// column-insert-end
return nil
}
func columnRead(handle *qdb.HandleType) error {
// column-get-start
table := handle.Timeseries("stocks")
openColumn := table.DoubleColumn("open")
closeColumn := table.DoubleColumn("close")
volumeColumn := table.Int64Column("volume")
timeRange := qdb.NewRange(time.Unix(1600000000, 0), time.Unix(1610000001, 0))
openResults, err := openColumn.GetRanges(timeRange)
if err != nil {
return err
}
closeResults, err := closeColumn.GetRanges(timeRange)
if err != nil {
return err
}
volumeResults, err := volumeColumn.GetRanges(timeRange)
if err != nil {
return err
}
for i := 0; i < 2; i++ {
timestamp := openResults[i].Timestamp()
open := openResults[i].Content()
close := closeResults[i].Content()
volume := volumeResults[i].Content()
fmt.Printf("timestamp: %s, open: %v, close: %v, volume: %v\n", timestamp, open, close, volume)
}
// column-get-end
return nil
}
func query(handle *qdb.HandleType) error {
// query-start
query := handle.Query("SELECT SUM(volume) FROM stocks")
table, err := query.Execute()
if err != nil {
return err
}
for _, row := range table.Rows() {
for _, col := range table.Columns(row) {
fmt.Printf("%v ", col.Get().Value())
}
fmt.Println()
}
// query-end
return nil
}
func dropTable(handle *qdb.HandleType) error {
// drop-table-start
table := handle.Timeseries("stocks")
err := table.Remove()
// drop-table-end
return err
}
// import-start
var qdb = require('quasardb')
// import-end
function connect(callback) {
// connect-start
var cluster = new qdb.Cluster('qdb://127.0.0.1:2836')
cluster.connect(function() {
// Connected successfully
return callback(null, cluster)
}, function(err) {
// Connection error
return callback(err, null)
})
// connect-end
}
function secureConnect(callback) {
// secure-connect-start
var secureCluster = new qdb.Cluster(
'qdb://127.0.0.1:2836',
'/path/to/cluster_public.key',
'/path/to/user_private.key'
)
secureCluster.connect(function() {
// Connected successfully
return callback(null, cluster)
}, function(err) {
// Connection error
return callback(err, null)
})
// secure-connect-end
}
function createTable(cluster, callback) {
// create-table-start
var table = cluster.ts('stocks')
table.create([
qdb.DoubleColumnInfo('open'),
qdb.DoubleColumnInfo('close'),
qdb.Int64ColumnInfo('volume')
], function(err) {
if (err) {
// Failed to create table
return callback(err)
}
// Successfully created table
// create-table-end
// tags-start
table.attachTag("nasdaq", function (err) {
if (err) {
callback(err)
}
callback()
})
// tags-end
})
}
function batchInsert(cluster, callback) {
// not supported yet
}
function bulkRead(cluster, callback) {
// not supported yet
}
function columnInsert(cluster, callback) {
// column-insert-start
var table = cluster.ts('stocks')
table.columns(function(err, columns) {
if (err) {
return callback(err)
}
var open = columns[0]
var close = columns[1]
var volume = columns[2]
open.insert([
qdb.DoublePoint(qdb.Timestamp.fromDate(new Date(2000, 10, 5, 2)), 3.40),
qdb.DoublePoint(qdb.Timestamp.fromDate(new Date(2020, 10, 5, 4)), 3.50)
], function(err) {
if (err) {
return callback(err)
}
close.insert([
qdb.DoublePoint(qdb.Timestamp.fromDate(new Date(2000, 10, 5, 2)), 3.50),
qdb.DoublePoint(qdb.Timestamp.fromDate(new Date(2020, 10, 5, 4)), 3.55)
], function(err) {
if (err) {
return callback(err)
}
volume.insert([
qdb.Int64Point(qdb.Timestamp.fromDate(new Date(2000, 10, 5, 2)), 10000),
qdb.Int64Point(qdb.Timestamp.fromDate(new Date(2020, 10, 5, 4)), 7500)
], function(err) {
if (err) {
return callback(err)
}
return callback()
})
})
})
})
// column-insert-end
}
function columnRead(cluster, callback) {
// column-get-start
var range = qdb.TsRange(
qdb.Timestamp.fromDate(new Date(2000, 10, 5, 2)),
qdb.Timestamp.fromDate(new Date(2020, 10, 5, 5))
)
var table = cluster.ts('stocks')
table.columns(function(err, columns) {
if (err) {
return callback(err)
}
var open = columns[0]
var close = columns[1]
var volume = columns[2]
open.ranges([range], function(err, openPoints) {
if (err) {
return callback(err)
}
close.ranges([range], function(err, closePoints) {
if (err) {
return callback(err)
}
volume.ranges([range], function(err, volumePoints) {
if (err) {
return callback(err)
}
for (var i = 0; i < openPoints.length; i++) {
var timestamp = openPoints[i].timestamp
var openValue = openPoints[i].value
var closeValue = closePoints[i].value
var volumeValue = volumePoints[i].value
console.log(`timestamp: ${timestamp}, open: ${openValue}, close: ${closeValue}, volume: ${volumeValue}`)
}
return callback()
})
})
})
})
// column-get-end
}
function query(cluster, callback) {
// query-start
cluster.query("SELECT SUM(volume) FROM stocks").run(function(err, result) {
if (err) {
return callback(err)
}
for (var i = 0 ; i < result.rows.length; i++) {
console.log(`${result.rows[i][1]}`)
}
return callback()
})
// query-end
}
function dropTable(cluster, callback) {
// drop-table-start
table = cluster.ts("stocks")
table.remove(function(err) {
if (err) {
callback(err)
}
callback()
})
// drop-table-end
}
connect(function(err, cluster) {
if (err) {
throw new Error("Failed to connect to cluster")
}
createTable(cluster, function (err) {
if (err) {
throw new Error("Failed to create table")
}
columnInsert(cluster, function(err) {
if (err) {
throw new Error("Failed to column insert")
}
columnRead(cluster, function(err) {
if (err) {
throw new Error("Failed to column read")
}
query(cluster, function(err) {
if (err) {
throw new Error("Failed to query table")
}
dropTable(cluster, function(err) {
if (err) {
throw new Error("Failed to drop table")
}
})
})
})
})
})
})
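// The nested callbacks in the driver above can also be flattened with
// util.promisify -- a sketch assuming Node.js 8+; the quasardb module
// itself remains callback-based.
var util = require('util')
async function runTutorial() {
  var cluster = await util.promisify(connect)()
  await util.promisify(createTable)(cluster)
  await util.promisify(columnInsert)(cluster)
  await util.promisify(columnRead)(cluster)
  await util.promisify(query)(cluster)
  await util.promisify(dropTable)(cluster)
}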
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
// import-start
using Quasardb;
using Quasardb.TimeSeries;
// import-end
namespace Quasardb.Tests.Tutorial
{
[TestClass]
public class Tutorial
{
[TestMethod]
public void OnInsecure()
{
// connect-start
var c = new QdbCluster("qdb://127.0.0.1:2836");
// connect-end
Assert.IsNotNull(c);
// create-table-start
// First we acquire a reference to a table (which may or may not yet exist)
var ts = c.Table("stocks");
// Initialize our column definitions
var columns = new QdbColumnDefinition[]{
new QdbDoubleColumnDefinition("open"),
new QdbDoubleColumnDefinition("close"),
new QdbInt64ColumnDefinition("volume")};
// Now create the table with the default shard size
ts.Create(columns);
// create-table-end
// tags-start
ts.AttachTag(c.Tag("nasdaq"));
// tags-end
// batch-insert-start
// We initialize our batch writer.
var writer = ts.Writer();
// Alternatively we could select specific columns
// var writer = ts.Writer(new QdbColumnDefinition[]{
// new QdbDoubleColumnDefinition("open")
//});
// Insert the first row: to start a new row, we must provide it with a mandatory
// timestamp that all values for this row will share. QuasarDB will use this timestamp
// as its primary index.
writer.StartRow(new DateTime(2019, 02, 01));
// We now set the values for our columns by their relative offsets: column 0 below
// refers to the first column we provide in the columns variable above.
writer.SetDouble(0, 3.40);
writer.SetDouble(1, 3.50);
writer.SetInt64(2, 10000);
// We tell the batch writer to start a new row before we can set the values for the
// next row.
writer.StartRow(new DateTime(2019, 02, 02));
writer.SetDouble(0, 3.50);
writer.SetDouble(1, 3.55);
writer.SetInt64(2, 7500);
// Now that we're done, we push the buffer as one single operation.
writer.Push();
// batch-insert-end
// bulk-read-start
// We can initialize a bulk reader based directly from our table.
var reader = ts.Reader();
// Alternatively, we could limit the reader to a specific time interval:
// var reader = ts.Reader(new QdbTimeInterval(new DateTime(2019, 02, 01), new DateTime(2019, 02, 02)));
// The bulk reader is exposed as a regular .Net Enumerable
foreach (var row in reader)
{
// Each row has a timestamp which you can access as a Timespec:
Console.WriteLine($"row timestamp: {row.Timestamp}");
// Note that the offsets of the values array align with the offsets we used
// when creating the table, i.e. 0 means "open", 1 means "close" and 2 means
// "volume":
var openValue = row[0].DoubleValue;
var closeValue = row[1].DoubleValue;
var volumeValue = row[2].Int64Value;
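// Illustrative only: print the values we just extracted
Console.WriteLine($"open: {openValue}, close: {closeValue}, volume: {volumeValue}");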
}
// bulk-read-end
// column-insert-start
// Prepare some data to be inserted
var opens = new QdbDoublePointCollection { { new DateTime(2019, 02, 01), 3.40 }, { new DateTime(2019, 02, 02), 3.50 } };
var closes = new QdbDoublePointCollection { { new DateTime(2019, 02, 01), 3.50 }, { new DateTime(2019, 02, 02), 3.55 } };
var volumes = new QdbInt64PointCollection { { new DateTime(2019, 02, 01), 10000 }, { new DateTime(2019, 02, 02), 7500 } };
// Retrieve the different columns from our table
var openCol = ts.DoubleColumns["open"];
var closeCol = ts.DoubleColumns["close"];
var volumeCol = ts.Int64Columns["volume"];
// Insert data for each column
openCol.Insert(opens);
closeCol.Insert(closes);
volumeCol.Insert(volumes);
// column-insert-end
// column-get-start
// Using the same columns we used for the insertion, we can retrieve
// the points from a specific range:
var range = new QdbTimeInterval(new DateTime(2019, 02, 01), new DateTime(2019, 02, 02));
// You can now inspect the values in the returned enumerable:
var resultPoints = openCol.Points(range);
// column-get-end
var ptEnum = resultPoints.GetEnumerator();
ptEnum.MoveNext();
Assert.AreEqual(ptEnum.Current.Time, opens[0].Time);
Assert.AreEqual(ptEnum.Current.Value, opens[0].Value);
// query-start
// Execute the query
var r = c.Query("SELECT SUM(volume) FROM stocks");
// The rows are exposed as a regular .Net Enumerable
var columnNames = r.ColumnNames;
var rows = r.Rows;
foreach (var row in rows)
{
Console.WriteLine($"{columnNames[0]}: {row[0].Value}");
}
// Since we only expect one row, we can also access it directly:
var aggregateResult = rows[0]["sum(volume)"].Int64Value;
Console.Write($"sum(volume): {aggregateResult}");
// query-end
// drop-table-start
// Use the earlier reference of the table we acquired to remove it:
ts.Remove();
// drop-table-end
}
[TestMethod]
public void OnSecure()
{
var secureClusterURI = DaemonRunner.SecureClusterUrl;
var clusterPublicKey = DaemonRunner.ClusterPublicKey;
var userName = DaemonRunner.UserName;
var userPrivateKey = DaemonRunner.UserPrivateKey;
// secure-connect-start
var sc = new QdbCluster(secureClusterURI, clusterPublicKey, userName, userPrivateKey);
// secure-connect-end
Assert.IsNotNull(sc);
}
}
}
// import-start
#include <qdb/client.h>
#include <qdb/tag.h>
#include <qdb/ts.h>
// import-end
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
// NOTE: the name 'connect' may clash with ::connect from the system headers.
int db_connect(qdb_handle_t * h)
{
assert(h);
// connect-start
// We first need to open a handle, which is the memory structure that
// QuasarDB uses to maintain connection state.
qdb_handle_t handle;
qdb_error_t error = qdb_open(&handle, qdb_p_tcp);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Now that we have opened the handle, we can tell it to establish a connection
// with the cluster.
error = qdb_connect(handle, "qdb://localhost:2836");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// connect-end
*h = handle;
return EXIT_SUCCESS;
}
int secure_db_connect(qdb_handle_t * h)
{
assert(h);
// secure-connect-start
// We first need to open a handle, which is the memory structure that
// QuasarDB uses to maintain connection state.
qdb_handle_t handle;
qdb_error_t error = qdb_open(&handle, qdb_p_tcp);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Load the encoded key
error = qdb_option_set_cluster_public_key(handle, "cluster_public_key");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Then the username and its associated encoded key
error = qdb_option_set_user_credentials(handle, "user", "user_private_key");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Another option is to load the credentials directly from the security files:
error = qdb_option_load_security_files(handle, "cluster_public_key.txt", "user_credentials.txt");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Now that we have opened the handle, we can tell it to establish a connection
// with the cluster.
error = qdb_connect(handle, "qdb://localhost:2836");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// secure-connect-end
*h = handle;
return EXIT_SUCCESS;
}
int main()
{
qdb_handle_t handle = NULL;
if (db_connect(&handle)) return EXIT_FAILURE;
// create-table-start
// Initialize our column definitions
const qdb_ts_column_info_t columns[3] = {
{.name = "open", .type = qdb_ts_column_double}, //
{.name = "close", .type = qdb_ts_column_double}, //
{.name = "volume", .type = qdb_ts_column_int64} //
};
const int columns_count = sizeof(columns) / sizeof(qdb_ts_column_info_t);
// Now create the table with the default shard size
qdb_error_t error = qdb_ts_create(handle, "stocks", qdb_d_default_shard_size, columns, columns_count);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// create-table-end
// tags-start
error = qdb_attach_tag(handle, "stocks", "nasdaq");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// tags-end
// batch-insert-start
// Initialize our batch column definitions
const qdb_ts_batch_column_info_t batch_columns[3] = {
{.timeseries = "stocks", .column = "open", .elements_count_hint = 2}, //
{.timeseries = "stocks", .column = "close", .elements_count_hint = 2}, //
{.timeseries = "stocks", .column = "volume", .elements_count_hint = 2} //
};
const int batch_columns_count = sizeof(batch_columns) / sizeof(qdb_ts_batch_column_info_t);
// create our batch handle
qdb_batch_table_t table;
error = qdb_ts_batch_table_init(handle, batch_columns, batch_columns_count, &table);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// The batch API is row oriented, we first setup the start timestamp of the row
// Set timestamp to 2019-02-01
qdb_timespec_t timestamp = {.tv_sec = 1548979200, .tv_nsec = 0};
qdb_ts_batch_start_row(table, &timestamp);
// Then set the values for each column
qdb_ts_batch_row_set_double(table, 0, 3.40);
qdb_ts_batch_row_set_double(table, 1, 3.50);
qdb_ts_batch_row_set_int64(table, 2, 10000);
// Add another row
// Set timestamp to 2019-02-02
timestamp.tv_sec = 1549065600;
qdb_ts_batch_start_row(table, &timestamp);
qdb_ts_batch_row_set_double(table, 0, 3.50);
qdb_ts_batch_row_set_double(table, 1, 3.55);
qdb_ts_batch_row_set_int64(table, 2, 7500);
// Push into the database as a single operation
error = qdb_ts_batch_push(table);
// Don't forget to release the table, whether or not the push succeeded
qdb_release(handle, table);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// batch-insert-end
// bulk-read-start
// We can initialize our bulk reader directly from the columns we defined earlier
qdb_local_table_t local_table;
error = qdb_ts_local_table_init(handle, "stocks", columns, columns_count, &local_table);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// Setup a range going from 2019-02-01 to 2019-02-02
qdb_ts_range_t range = {.begin = {.tv_sec = 1548979200, .tv_nsec = 0}, .end = {.tv_sec = 1549065600, .tv_nsec = 0}};
error = qdb_ts_table_get_ranges(local_table, &range, 1u);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
while (!qdb_ts_table_next_row(local_table, &timestamp))
{
double value_index_zero = 0;
double value_index_one = 0;
qdb_int_t value_index_two = 0;
error = qdb_ts_row_get_double(local_table, 0, &value_index_zero);
// put cleanup logic here in case of error
error = qdb_ts_row_get_double(local_table, 1, &value_index_one);
// put cleanup logic here in case of error
error = qdb_ts_row_get_int64(local_table, 2, &value_index_two);
// put cleanup logic here in case of error
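// Illustrative only: print the row we just decoded, since the tutorial
// otherwise discards these values
printf("timestamp: %lld, open: %f, close: %f, volume: %lld\n",
(long long)timestamp.tv_sec, value_index_zero, value_index_one, (long long)value_index_two);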
}
// don't forget to release the table once finished
qdb_release(handle, local_table);
// bulk-read-end
// column-insert-start
// Prepare the points for each column
const qdb_ts_double_point opens[2] = {
{.timestamp = {.tv_sec = 1548979200, .tv_nsec = 0}, .value = 3.4}, //
{.timestamp = {.tv_sec = 1549065600, .tv_nsec = 0}, .value = 3.5} //
};
const qdb_ts_double_point closes[2] = {
{.timestamp = {.tv_sec = 1548979200, .tv_nsec = 0}, .value = 3.50}, //
{.timestamp = {.tv_sec = 1549065600, .tv_nsec = 0}, .value = 3.55} //
};
const qdb_ts_int64_point volumes[2] = {
{.timestamp = {.tv_sec = 1548979200, .tv_nsec = 0}, .value = 10000}, //
{.timestamp = {.tv_sec = 1549065600, .tv_nsec = 0}, .value = 7500} //
};
// Insert each column independently
error = qdb_ts_double_insert(handle, "stocks", "open", opens, 2u);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
error = qdb_ts_double_insert(handle, "stocks", "close", closes, 2u);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
error = qdb_ts_int64_insert(handle, "stocks", "volume", volumes, 2u);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// column-insert-end
// column-get-start
// Setup the range(s) we want to get
const qdb_ts_range_t ranges[1] = {{.begin = {.tv_sec = 1548979200, .tv_nsec = 0}, .end = {.tv_sec = 1549065600, .tv_nsec = 0}}};
// The API writes the data into the empty structures you pass as out parameters
qdb_ts_double_point * points = NULL;
qdb_size_t point_count = 0;
// Get the provided ranges
error = qdb_ts_double_get_ranges(handle, "stocks", "open", ranges, 1u, &points, &point_count);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
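// Iterate over the fetched points, then hand the buffer back to the API;
// qdb_release frees any output the API allocated for us
for (qdb_size_t i = 0; i < point_count; ++i)
{
printf("open: timestamp=%lld, value=%f\n", (long long)points[i].timestamp.tv_sec, points[i].value);
}
qdb_release(handle, points);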
// column-get-end
// drop-table-start
// For this operation a timeseries behaves like any other entry,
// so you can remove it directly:
error = qdb_remove(handle, "stocks");
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// drop-table-end
// close-start
error = qdb_close(handle);
if (QDB_FAILURE(error)) return EXIT_FAILURE;
// close-end
return EXIT_SUCCESS;
}