3.1. Tutorial

3.1.1. Requirements

Before you can get started, please ensure that:

  • You have the latest version of the QuasarDB client library installed on your computer

  • You have access to a running QuasarDB cluster.

The rest of this document assumes you have a cluster up and running under qdb://127.0.0.1:2836.

3.1.2. Installing libraries

pip install quasardb

3.1.3. Importing libraries

Most languages require you to import the relevant QuasarDB modules before you can use them, so we start by importing those.

import json
import quasardb
import numpy as np

3.1.4. Connection management

Establishing a connection with the QuasarDB cluster is easy. You need the URI of at least one of your nodes, and the client will automatically detect all nodes in the cluster.

A QuasarDB cluster operates in either a secure or an insecure mode. If you do not know whether your cluster is running in secure mode, please ask your system administrator.

3.1.4.1. Insecure connection

    import quasardb.pool as pool

    # Always initialize the connection pool global singleton.
    pool.initialize(uri="qdb://127.0.0.1:2836")

    # You can use the connection pool instance directly like this:
    with pool.instance().connect() as conn:
        # ... do something with conn
        pass

    # Alternatively, you can make use of the decorator function which handles
    # connection management for you
    @pool.with_conn()
    def my_handler(conn, additional_args):
        # By default, `conn` is always injected as the first argument
        result = conn.query("SELECT * FROM stocks")  # for example, run a query with the injected connection

3.1.4.2. Secure connection

In case of a secure connection, we need to provide a few additional parameters:

  • A username;

  • A user private key;

  • A cluster public key.

More information on QuasarDB’s security mechanisms can be found in our security manual.

If you do not know the values of these parameters, please ask your system administrator.
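
The user_key and cluster_key values used below are typically read from the credential files your administrator provides. A minimal sketch of loading them, assuming the file names user_private.key and cluster_public.key used in the full example at the end of this tutorial (substitute the paths you were given):

import json

# The user's private key file is a small JSON document containing the
# username and the secret key.
with open('user_private.key', 'r') as user_key_file:
    user_key = json.load(user_key_file)

# The cluster public key file is plain text.
with open('cluster_public.key', 'r') as cluster_key_file:
    cluster_key = cluster_key_file.read()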

with quasardb.Cluster(uri='qdb://127.0.0.1:2836',
                      user_name=user_key['username'],
                      user_private_key=user_key['secret_key'],
                      cluster_public_key=cluster_key) as scs:
    # ... do something with the secure connection
    pass

3.1.5. Creating a table

Before we can store timeseries data, we need to create a table. A table is uniquely identified by a name (e.g. “stocks” or “sensors”) and can have one or more columns.

In this example we will create a table “stocks” with three columns, “open”, “close” and “volume”. The first two columns hold double precision floating point values, and the third holds a 64-bit signed integer.


    # First we acquire a reference to a table (which may or may not yet exist)
    t = c.table("stocks")

    # Initialize our column definitions
    cols = [quasardb.ColumnInfo(quasardb.ColumnType.Double, "open"),
            quasardb.ColumnInfo(quasardb.ColumnType.Double, "close"),
            quasardb.ColumnInfo(quasardb.ColumnType.Int64, "volume")]

    # Now create the table with the default shard size
    t.create(cols)

3.1.5.1. Attaching tags

QuasarDB allows you to manage your tables by attaching tags to them. For more information about tags, see Managing tables with Tags.

In the example below, we will attach the tag nasdaq to the “stocks” table we created.


t.attach_tag("nasdaq")
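
If you later need to check whether a tag is present, or detach it again, the table object also exposes tag inspection and removal calls. The sketch below assumes has_tag() and detach_tag() are available in your client version; see Managing tables with Tags for the exact API:

# Check whether the tag is attached (assumed call; consult the tags documentation).
if t.has_tag("nasdaq"):
    print("stocks is tagged as nasdaq")

# Detaching would remove the tag again; we keep it attached for this tutorial.
# t.detach_tag("nasdaq")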

3.1.6. A word about API types

Now that we have our tables in place, it’s time to start interacting with actual data. At a high level, QuasarDB provides two different APIs for you to insert data:

  • A row-based API, where you insert data on a row-by-row basis. This API, referred to as our “batch inserter”, provides stronger consistency guarantees.

  • A column-based API, where you insert pure timeseries data per column. This data is typically aligned per timestamp, and therefore assumes unique timestamps.

If you’re unsure which API is best for you, start out with the row-based insertion API, the batch inserter.

You should now continue with either the row oriented or the column oriented tutorials.

3.1.7. Row oriented API

3.1.7.1. Batch inserter

The QuasarDB batch inserter provides you with a row-oriented interface to send data to the QuasarDB cluster. The data is buffered client-side and sent in batches, ensuring efficiency and consistency.

The batch writer has various modes of operation, each with different tradeoffs:

  • Default: transactional insertion mode that employs Copy-on-Write. Use case(s): general purpose.

  • Fast: transactional insert that does not employ Copy-on-Write; newly written data may be visible to queries before the transaction is fully completed. Use case(s): streaming data, many small incremental writes.

  • Asynchronous: data is buffered in-memory in the QuasarDB daemon nodes before being written to disk. Data from multiple sources is buffered together and periodically flushed to disk. Use case(s): streaming data where multiple processes simultaneously write into the same table(s).

  • Truncate (a.k.a. “upsert”): replaces any existing data with the provided data. Use case(s): replay of historical data.

When in doubt, we recommend you use the default insertion mode.

The steps involved in using the batch writer API are as follows:

  1. Initialize a local batch inserter instance, providing it with the tables and columns you want to insert data for. Note that specifying multiple tables is supported: this will allow you to insert data into multiple tables in one atomic operation.

  2. Prepare/buffer the batch you want to insert. Buffering locally before sending ensures that the data is transmitted at maximum throughput, which keeps the server side efficient.

  3. Push the batch to the cluster.

  4. If necessary, go back to step 2 to send additional batches.

We recommend you use batch sizes as large as possible: between 50k and 500k rows is optimal.

In the example below we will insert two different rows for two separate days into our “stocks” table.


# We need to tell the batch inserter which columns we plan to insert. Note
# how we give it a hint that we expect to insert 2 rows for each of these columns.
batch_columns = [quasardb.BatchColumnInfo("stocks", "open", 2),
                 quasardb.BatchColumnInfo("stocks", "close", 2),
                 quasardb.BatchColumnInfo("stocks", "volume", 2)]

# Now that we know which columns we want to insert, we initialize our batch inserter.
inserter = c.inserter(batch_columns)


# Insert the first row: to start a new row, we must provide it with a mandatory
# timestamp that all values for this row will share. QuasarDB will use this timestamp
# as its primary index.
#
# QuasarDB only supports nanosecond timestamps, so we must specifically convert our
# dates to nanosecond precision.
inserter.start_row(np.datetime64('2019-02-01', 'ns'))

# We now set the values for our columns by their relative offsets: column 0 below
# refers to the first column we provide in the batch_columns variable above.
inserter.set_double(0, 3.40)
inserter.set_double(1, 3.50)
inserter.set_int64(2, 10000)

# We tell the batch inserter to start a new row before we can set the values for the
# next row.
inserter.start_row(np.datetime64('2019-02-02', 'ns'))

inserter.set_double(0, 3.50)
inserter.set_double(1, 3.55)
inserter.set_int64(2, 7500)

# Now that we're done, we push the buffer as one single operation.
inserter.push()
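
The push() call above uses the default insertion mode: transactional with Copy-on-Write. The other modes described earlier are selected at push time through dedicated push variants. The sketch below assumes your client version exposes push_fast() and push_async(); consult the Python API reference for your version:

# Instead of inserter.push(), you would pick exactly one of the alternative
# push calls for the buffered batch:

# Fast: transactional insert without Copy-on-Write.
# inserter.push_fast()

# Asynchronous: buffered in-memory in the daemon nodes, flushed to disk periodically.
# inserter.push_async()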

3.1.7.2. Bulk reader

On the other side of the row-oriented API we have the “bulk reader”. The bulk reader provides streaming access to a single table, optionally limited by certain columns and/or certain time ranges.

If you want to have efficient row-oriented access to the raw data in a table, this is the API you want to use. If you want to execute aggregates, complex where clauses and/or multi-table joins, please see the query API.

The example below will show you how to read our stock data for just a single day.


# We can initialize a bulk reader directly from our table. By
# providing a dict=True parameter, the QuasarDB API will automatically
# expose our rows as dicts.
reader = t.reader(dict=True, ranges=[(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))])

# The bulk reader is exposed as a regular Python iterator
for row in reader:

    # We can access the row locally within our loop:
    print(row)

    # But because the QuasarDB Python API is zero-copy, our row maintains a
    # reference to the underlying data. If you want to keep the row data alive
    # longer than the local scope, use row.copy() as follows:
    do_something_async_with(row.copy())
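
If you want to hold on to the entire result set rather than process each row inside the loop, you can combine the reader with row.copy(). A small sketch using only the calls shown above:

# Create a fresh reader and materialize all rows as independent dicts.
reader = t.reader(dict=True, ranges=[(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))])
rows = [row.copy() for row in reader]
print(rows)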

The next section will show you how to store and retrieve the same dataset using the column-oriented API. If this is irrelevant to you, it’s safe to skip directly to the query API.

3.1.8. Column oriented API

The other high-level API QuasarDB offers is the column-oriented API. It is more lightweight than the row-oriented API and is a good alternative when your data is already shaped as per-column timeseries with unique timestamps.

3.1.8.1. Storing timeseries

To store a single timeseries, all you have to do is provide a sequence of timestamp/value pairs and specify which column you want to store them in.


# Our API is built on top of numpy, and provides zero-copy integration with native
# numpy arrays. As such, we first prepare one array for each of our three
# columns:
opens = np.array([3.40, 3.50], dtype=np.float64)
closes = np.array([3.50, 3.55], dtype=np.float64)
volumes = np.array([10000, 7500], dtype=np.int64)

# Separately, we generate a numpy array of timestamps. Since our three columns share
# the same timestamps, we can reuse this array for all of them, but this is not required.
timestamps = np.array([np.datetime64('2019-02-01'), np.datetime64('2019-02-02')], dtype='datetime64[ns]')

# When inserting, we provide the value arrays and timestamp arrays separately.
t.double_insert("open", timestamps, opens)
t.double_insert("close", timestamps, closes)
t.int64_insert("volume", timestamps, volumes)

3.1.8.2. Retrieving timeseries

To retrieve a single timeseries, you provide a column and one or more timeranges. The example below shows how to retrieve all three columns for a single day.


# We first prepare the intervals we want to select data from, that is, a list of
# timeranges. An interval is defined as a tuple of start time (inclusive) and end
# time (exclusive).
#
# In this example, we just use a single interval.
intervals = np.array([(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))])

# As with insertion, our API works with native numpy arrays and returns the results as such.
(timestamps1, opens) = t.double_get_ranges("open", intervals)
(timestamps2, closes) = t.double_get_ranges("close", intervals)
(timestamps3, volumes) = t.int64_get_ranges("volume", intervals)

# For this specific dataset, timestamps1 == timestamps2 == timestamps3, but
# this does not necessarily have to be the case.
np.testing.assert_array_equal(timestamps1, timestamps2)
np.testing.assert_array_equal(timestamps1, timestamps3)
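
Because the results are plain numpy arrays, standard numpy operations apply to them directly. For example, with the data retrieved above:

# Compute simple statistics over the selected interval using numpy.
print("average close: ", np.mean(closes))
print("total volume: ", np.sum(volumes))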

3.1.9. Queries

If you are looking for more flexible control over the kind of calculations performed on a dataset, or want to push certain computations to the cluster, QuasarDB offers an SQL-like query language for you to interact with your data. Please see our query language documentation.

In the example below, we will show you how to execute a simple query that calculates the total traded volume for the entire dataset.


result = c.query("SELECT SUM(volume) FROM stocks")

# The result is returned as a list of dicts
for row in result:
    print("row: ", row)


# Since we only expect one row, we can also access it directly like this:
aggregate_result = result[0]['sum(volume)']
print("sum(volume): ", aggregate_result)

3.1.10. Dropping a table

Dropping a table in QuasarDB is easy, and the removal is immediately visible to all clients.


# Use the table reference we acquired earlier to remove it:
t.remove()

3.1.11. Full example

For completeness, we provide the full example covering all of the APIs below.

# import-start
import json
import quasardb
import numpy as np
# import-end

def do_something_async_with(x):
    pass

def test_pool():
    # pool-connect-start
    import quasardb.pool as pool

    # Always initialize the connection pool global singleton.
    pool.initialize(uri="qdb://127.0.0.1:2836")

    # You can use the connection pool instance directly like this:
    with pool.instance().connect() as conn:
        # ... do something with conn
        pass

    # Alternatively, you can make use of the decorator function which handles
    # connection management for you
    @pool.with_conn()
    def my_handler(conn, additional_args):
        # By default, `conn` is always injected as the first argument
        result = conn.query("SELECT * FROM stocks")  # for example, run a query with the injected connection

    # pool-connect-end

# connect-start
with quasardb.Cluster("qdb://127.0.0.1:2836") as c:
# connect-end
    def secure_connect():
        user_key = {}
        cluster_key = ""

        with open('user_private.key', 'r') as user_key_file:
            user_key = json.load(user_key_file)
        with open('cluster_public.key', 'r') as cluster_key_file:
            cluster_key = cluster_key_file.read()

        # secure-connect-start
        with quasardb.Cluster(uri='qdb://127.0.0.1:2836',
                              user_name=user_key['username'],
                              user_private_key=user_key['secret_key'],
                              cluster_public_key=cluster_key) as scs:
        # secure-connect-end
            pass

    # create-table-start

    # First we acquire a reference to a table (which may or may not yet exist)
    t = c.table("stocks")

    # Initialize our column definitions
    cols = [quasardb.ColumnInfo(quasardb.ColumnType.Double, "open"),
            quasardb.ColumnInfo(quasardb.ColumnType.Double, "close"),
            quasardb.ColumnInfo(quasardb.ColumnType.Int64, "volume")]

    # Now create the table with the default shard size
    t.create(cols)

    # create-table-end

    # tags-start

    t.attach_tag("nasdaq")

    # tags-end

    # batch-insert-start

    # We need to tell the batch inserter which columns we plan to insert. Note
    # how we give it a hint that we expect to insert 2 rows for each of these columns.
    batch_columns = [quasardb.BatchColumnInfo("stocks", "open", 2),
                     quasardb.BatchColumnInfo("stocks", "close", 2),
                     quasardb.BatchColumnInfo("stocks", "volume", 2)]

    # Now that we know which columns we want to insert, we initialize our batch inserter.
    inserter = c.inserter(batch_columns)


    # Insert the first row: to start a new row, we must provide it with a mandatory
    # timestamp that all values for this row will share. QuasarDB will use this timestamp
    # as its primary index.
    #
    # QuasarDB only supports nanosecond timestamps, so we must specifically convert our
    # dates to nanosecond precision.
    inserter.start_row(np.datetime64('2019-02-01', 'ns'))

    # We now set the values for our columns by their relative offsets: column 0 below
    # refers to the first column we provide in the batch_columns variable above.
    inserter.set_double(0, 3.40)
    inserter.set_double(1, 3.50)
    inserter.set_int64(2, 10000)

    # We tell the batch inserter to start a new row before we can set the values for the
    # next row.
    inserter.start_row(np.datetime64('2019-02-02', 'ns'))

    inserter.set_double(0, 3.50)
    inserter.set_double(1, 3.55)
    inserter.set_int64(2, 7500)

    # Now that we're done, we push the buffer as one single operation.
    inserter.push()

    # batch-insert-end


    # bulk-read-start

    # We can initialize a bulk reader directly from our table. By
    # providing a dict=True parameter, the QuasarDB API will automatically
    # expose our rows as dicts.
    reader = t.reader(dict=True, ranges=[(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))])

    # The bulk reader is exposed as a regular Python iterator
    for row in reader:

        # We can access the row locally within our loop:
        print(row)

        # But because the QuasarDB Python API is zero-copy, our row maintains a
        # reference to the underlying data. If you want to keep the row data alive
        # longer than the local scope, use row.copy() as follows:
        do_something_async_with(row.copy())

    # bulk-read-end

    # column-insert-start

    # Our API is built on top of numpy, and provides zero-copy integration with native
    # numpy arrays. As such, we first prepare one array for each of our three
    # columns:
    opens = np.array([3.40, 3.50], dtype=np.float64)
    closes = np.array([3.50, 3.55], dtype=np.float64)
    volumes = np.array([10000, 7500], dtype=np.int64)

    # Separately, we generate a numpy array of timestamps. Since our three columns share
    # the same timestamps, we can reuse this array for all of them, but this is not required.
    timestamps = np.array([np.datetime64('2019-02-01'), np.datetime64('2019-02-02')], dtype='datetime64[ns]')

    # When inserting, we provide the value arrays and timestamp arrays separately.
    t.double_insert("open", timestamps, opens)
    t.double_insert("close", timestamps, closes)
    t.int64_insert("volume", timestamps, volumes)

    # column-insert-end

    # column-get-start

    # We first prepare the intervals we want to select data from, that is, a list of
    # timeranges. An interval is defined as a tuple of start time (inclusive) and end
    # time (exclusive).
    #
    # In this example, we just use a single interval.
    intervals = np.array([(np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns'))])

    # As with insertion, our API works with native numpy arrays and returns the results as such.
    (timestamps1, opens) = t.double_get_ranges("open", intervals)
    (timestamps2, closes) = t.double_get_ranges("close", intervals)
    (timestamps3, volumes) = t.int64_get_ranges("volume", intervals)

    # For this specific dataset, timestamps1 == timestamps2 == timestamps3, but
    # this does not necessarily have to be the case.
    np.testing.assert_array_equal(timestamps1, timestamps2)
    np.testing.assert_array_equal(timestamps1, timestamps3)

    # column-get-end

    # query-start

    result = c.query("SELECT SUM(volume) FROM stocks")

    # The result is returned as a list of dicts
    for row in result:
        print("row: ", row)


    # Since we only expect one row, we can also access it directly like this:
    aggregate_result = result[0]['sum(volume)']
    print("sum(volume): ", aggregate_result)

    # query-end

    # drop-table-start

    # Use the table reference we acquired earlier to remove it:
    t.remove()

    # drop-table-end