4.1. Python#

4.1.1. Requirements#

Before you can get started, please ensure that:

  • You have the latest version of the QuasarDB client library installed on your computer

  • You have access to a running QuasarDB cluster.

The rest of this document assumes you have a cluster up and running under qdb://127.0.0.1:2836.

4.1.2. Supported Python versions#

The currently supported Python versions are:

  • Python 3.7;

  • Python 3.8;

  • Python 3.9;

  • Python 3.10;

  • Python 3.11;

  • Python 3.12.

All our packages are built and published as binaries to PyPI. For Linux, our packages are built against the manylinux2014 / PEP 599 spec, which requires glibc 2.17 or later.

4.1.3. Installation#

The QuasarDB Python API is distributed as a binary package via PyPI:

pip install quasardb
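To verify the installation, check that the module imports without errors:

python -c "import quasardb"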

4.1.4. Introduction#

The QuasarDB Python API consists of three components:

  • Core API: provides basic functionality, such as connection establishment, statistics, and pure Python interfaces for insertion, retrieval and queries.

  • Numpy adaptor: highest-performance interface for reading and writing numpy arrays directly to QuasarDB.

  • Pandas adaptor: high-performance interface for reading and writing Pandas dataframes directly to QuasarDB.

This tutorial will guide you through each of these separately.

4.1.5. Core API#

4.1.5.1. Importing libraries#

import quasardb

4.1.5.2. Connection management#

Establishing a connection with the QuasarDB cluster is easy. You need the URI of at least one of your nodes, and the client will automatically detect all nodes in the cluster.

A QuasarDB cluster operates in either a secure or an insecure mode. If you do not know whether your cluster is running in secure mode, please ask your system administrator.

4.1.5.2.1. Insecure connection#

uri = 'qdb://127.0.0.1:2836'

# Establish a one-off connection
conn = quasardb.Cluster(uri)

try:
   # ... do something with conn
   pass

finally:
   # Don't forget to release the connection when you're done
   conn.close()

# Alternatively, scoped resources can be used to automatically manage
# the connection lifecycle:
with quasardb.Cluster(uri) as conn:

    # ... do something with conn
    pass

4.1.5.2.2. Secure connection#

In addition to the cluster URI, a secure connection requires you to provide:

  1. The cluster’s public key file

  2. Your username

  3. Your secret/private key

Your username and secret key are usually provided in a single file, the user’s security file. More information on QuasarDB’s security mechanisms can be found in our security manual. If you do not know the values of these parameters, please ask your system administrator.
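For reference, the user’s security file is a small JSON document containing both values; a sketch of its layout with placeholder values (matching the keys read by the parsing code below):

{
    "username": "your_username",
    "secret_key": "<your secret key>"
}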

The example below demonstrates how to connect to the QuasarDB cluster using these credentials.

import json

def parse_public_key(path):
   with open(path, 'r') as fp:
      return fp.read()

def parse_secret_key(path):
   with open(path, 'r') as fp:
      parsed = json.load(fp)
      return (parsed['username'], parsed['secret_key'])

def get_conn(uri, cluster_public_key_file, user_security_file):
   cluster_public_key = parse_public_key(cluster_public_key_file)
   (user_name, user_private_key) = parse_secret_key(user_security_file)

   return quasardb.Cluster(uri,
                           user_name=user_name,
                           user_private_key=user_private_key,
                           cluster_public_key=cluster_public_key)

# Establish a one-off connection
conn = get_conn('qdb://127.0.0.1:2838',
                'cluster_public.key',
                'user_private.key')

try:
   # ... do something with conn
   pass

finally:
   # Don't forget to release the connection when you're done
   conn.close()

# Alternatively, scoped resources can be used to automatically manage
# the connection lifecycle:
with get_conn('qdb://127.0.0.1:2838',
              'cluster_public.key',
              'user_private.key') as conn:

    # ... do something with conn
    pass

4.1.5.2.3. Connection pool#

The Python API also comes with a built-in connection pool. It manages the lifecycle of your connections for you, and provides a decorator that injects a connection in the correct place.

The connection pool needs to be initialized with the connection properties at the start, after which you can use the built-in decorator. The example below demonstrates how to use the connection pool with an insecure connection, but the same applies to secure connections.

import quasardb.pool as pool

# Here we initialize our connection pool. It accepts the same arguments as
# quasardb.Cluster.
pool.initialize(uri='qdb://127.0.0.1:2836')

# You can use the connection pool instance directly like this:
with pool.instance().connect() as conn:
   # ... do something with conn
   pass

# Alternatively, you can make use of the decorator function which handles
# connection management for you. By default, `conn` is always injected as
# the first argument.
@pool.with_conn()
def my_handler(conn, additional_args):
   print("has conn {} with args {}".format(conn, additional_args))

my_handler('test')
has conn <quasardb.pool.SessionWrapper object at 0x7f0d40711ed0> with args test
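Since pool.initialize accepts the same arguments as quasardb.Cluster, a secure cluster only requires passing the same credentials; a minimal sketch reusing the helper functions from the secure connection example above:

cluster_public_key = parse_public_key('cluster_public.key')
(user_name, user_private_key) = parse_secret_key('user_private.key')

pool.initialize(uri='qdb://127.0.0.1:2838',
                user_name=user_name,
                user_private_key=user_private_key,
                cluster_public_key=cluster_public_key)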

4.1.5.3. Creating a table#

Before we can store timeseries data, we need to create a table. A table is uniquely identified by a name (e.g. “stocks” or “sensors”) and must have 1 or more columns. A timestamp index called $timestamp is always created by default, and does not need to be explicitly provided.

When creating a table, you will need to know the following:

  • The name of the table: in the example below, we will create a table called stocks.

  • What columns you need: in the example below, we will create a table with three columns:

    • open, a double precision column that represents the price of a stock at open time;

    • close, a double precision column that represents the price of a stock at close time;

    • volume, an integer column that represents the total volume of stock traded.

  • The shard size of the table: when not provided, it defaults to 1d. For more information, see the documentation about Shards.

In this example we will create a table called stocks with three columns: “open”, “close” and “volume”.

import datetime

with pool.instance().connect() as conn:
  # Here we acquire a reference to a table in the cluster. This table does not yet
  # have to exist.
  t = conn.table("stocks")

  # Drop the table if it already exists
  if t.exists() is True:
    t.remove()

  # Initialize our column definitions
  cols = [quasardb.ColumnInfo(quasardb.ColumnType.Double, "open"),
          quasardb.ColumnInfo(quasardb.ColumnType.Double, "close"),
          quasardb.ColumnInfo(quasardb.ColumnType.Int64, "volume")]

  # Now create the table with a shard size of 1 hour.
  t.create(cols, datetime.timedelta(hours=1))

4.1.5.3.1. Attaching tags#

QuasarDB allows you to manage your tables by attaching tags to them. For more information about tags, see Managing tables with Tags.

In the example below, we will attach the tag nasdaq to the stocks table we created.

with pool.instance().connect() as conn:
  t = conn.table("stocks")

  # Attach the tag to the table
  t.attach_tag("nasdaq")
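To verify that the tag is now attached, it can be queried back; a small sketch assuming the has_tag helper exposed by QuasarDB entries:

with pool.instance().connect() as conn:
  t = conn.table("stocks")

  # Should report the tag we attached above
  assert t.has_tag("nasdaq") is True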

4.1.5.4. Insertion#

Now that we have a table we want to insert into, we can write data to it using the batch insertion API. For more information, see the documentation on the batch inserter.

Python’s batch writer API is based on Numpy arrays; the example below demonstrates how to insert data using them.

import numpy as np

# We start by initializing our data. As the batch writer is numpy-based, we
# initialize our data as numpy arrays.
#
# Our timestamp index must always have a ``datetime64`` dtype.
idx = np.array([np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns')],
               dtype='datetime64[ns]')

# Initialize our data arrays: the dtypes must match those of the columns, otherwise
# an error will be thrown.
open = np.array([3.40, 3.50], dtype='float64')
close = np.array([3.50, 3.55], dtype='float64')
volume = np.array([10000, 7500], dtype='int64')

# Now that we have our data ready, we can initialize our batch writer and push
# the data.
with pool.instance().connect() as conn:
  # First acquire a reference to the table we wish to insert into.
  t = conn.table("stocks")

  # Now, let's prepare the data for the batch writer by using a special class
  # which holds this data.
  d = quasardb.WriterData()

  # We could append data for multiple tables here.
  d.append(t, idx, [open, close, volume])

  # Acquire a reference to the batch writer, and push the data.
  w = conn.writer()

  # We have four different push modes, mapping to each of the different insertion modes
  # of the batch writer:
  #
  # - None, which uses the default, transactional insertion mode which uses copy-on-write.
  # - 'async', which uses the asynchronous insertion mode.
  # - 'fast', which uses the fast insertion mode.
  # - 'truncate', which uses the truncation insertion mode.
  #
  # We recommend 'async' for streaming data and/or small incremental inserts, and
  # 'fast' for batch inserts.
  w.push(d, push_mode=quasardb.WriterPushMode.Fast)

4.1.5.5. Reader#

On the other side we have the “bulk reader”. The bulk reader provides streaming access to a single table, optionally limited by certain columns and/or certain time ranges.

If you want to have efficient row-oriented access to the raw data in a table, this is the API you want to use. If you want to execute aggregates, complex where clauses and/or multi-table joins, please see the query API.

The example below will show you how to read our stock data for just a single day.

with pool.instance().connect() as conn:
  table_names = ["stocks"]

  # Now we'll acquire a reference to the reader, which will yield batches of data.
  with conn.reader(table_names) as reader:
    # Read all batches into memory, we consider only the first one
    batches = list(reader)
    batch = batches[0]

    # Data is returned in column-oriented format
    assert batch['open'][0] == 3.4
    assert batch['close'][0] == 3.5
    assert batch['volume'][0] == 10000

4.1.5.6. Queries#

If you are looking for more flexible control over the kind of calculations performed on a dataset, or want to push certain computations to the cluster, QuasarDB offers an SQL-like query language for you to interact with your data. Please see our query language documentation.

In the example below, we will show you how to execute a simple query that calculates the total traded volume for the entire dataset.

with pool.instance().connect() as conn:
  result = conn.query("SELECT SUM(volume) AS vol FROM stocks")

  # We expect only a single result
  assert len(result) == 1

  # We iterate over the results by row
  for row in result:
     # ...
     assert row["vol"] == 17500
     pass

  # But, as they're a simple list, we can also directly access them like this
  assert result[0]["vol"] == 17500

4.1.6. Numpy adaptor#

As mentioned before, QuasarDB’s Python API is based on numpy: this allows for efficient storage and processing of large arrays of values, and works well for timeseries data.

For your convenience, we provide a quasardb.numpy batteries-included module that does the heavy lifting under the hood, and is the preferred choice for ingesting data in high performance scenarios:

  • It allows for conveniently ingesting large volumes of data in a single function call;

  • It ensures the input data has the correct shape, and converts data to the correct types where necessary;

  • It natively supports Numpy’s Masked Arrays to work with null values.

In addition to these, it also provides a query interface.

4.1.6.1. Importing the library#

import quasardb.numpy as qdbnp

4.1.6.2. Querying#

We can use the query function to access data using the Numpy adaptor: it executes a query and returns the results as numpy arrays. It always returns a tuple of (index, dict | list[np.array]).

The function takes two positional arguments:

  • cluster (position 0, quasardb.Cluster): active connection to the QuasarDB cluster.

  • query (position 1, str): the query to execute.

And two optional keyword arguments:

  • index (str or int, default None): if provided, resolves the column and uses it as the index. If a string (e.g. $timestamp), uses that column as the index. If an int (e.g. 1), looks up the column by that offset. If None, returns a monotonically increasing counter for each row number.

  • dict (bool, default False): if true, returns the data arrays as a dict, otherwise as a list of np.arrays.

The examples below will demonstrate how to use this function based on the same query we used in the earlier query example.

4.1.6.2.1. Example: dict results#

The most basic way to query is to return the results as a dict of numpy arrays, which is demonstrated by the code below.

with pool.instance().connect() as conn:
  q = "SELECT * FROM stocks"
  (idx, result) = qdbnp.query(conn, q, dict=True, index='$timestamp')

  # Note that, when explicitly providing the `$timestamp` index, it is removed from the result
  # data.
  assert '$timestamp' not in result

  print("keys:   {}".format(result.keys()))
  print("index:  {}".format(idx))

  print("open:   {}".format(result['open']))
  print("close:  {}".format(result['close']))
  print("volume: {}".format(result['volume']))
keys:   dict_keys(['$table', 'open', 'close', 'volume'])
index:  ['2019-02-01T00:00:00.000000000' '2019-02-02T00:00:00.000000000']
open:   [3.4 3.5]
close:  [3.5 3.55]
volume: [10000 7500]

4.1.6.2.2. Example: array results#

We can also run the same query, but return the data as arrays. This is usually more convenient if you wish to rely on list destructuring, as demonstrated by the code below:

with pool.instance().connect() as conn:
  # Note that, when explicitly providing the `$timestamp` index, it is removed from the data
  # arrays.
  q = "SELECT $timestamp, open, close, volume FROM stocks"
  (idx, (open, close, volume)) = qdbnp.query(conn, q, index='$timestamp')

  print("index:  {}".format(idx))
  print("open:   {}".format(open))
  print("close:  {}".format(close))
  print("volume: {}".format(volume))
index:  ['2019-02-01T00:00:00.000000000' '2019-02-02T00:00:00.000000000']
open:   [3.4 3.5]
close:  [3.5 3.55]
volume: [10000 7500]
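The index keyword also accepts an integer column offset instead of a name. A small sketch, under the assumption that an offset is resolved exactly like the named column above (here offset 0 refers to $timestamp):

with pool.instance().connect() as conn:
  q = "SELECT $timestamp, volume FROM stocks"

  # Use column offset 0 ($timestamp) as the index; it is removed from the
  # result arrays, leaving only 'volume'.
  (idx, [volume]) = qdbnp.query(conn, q, index=0)

  print("index:  {}".format(idx))
  print("volume: {}".format(volume))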

4.1.6.3. Insertion#

We can use the write_arrays function to insert new data into the database. This function uses the batch inserter under the hood, and comes with several options.

It takes three required, positional arguments:

  • data (position 0, iterable of np.array, or dict-like of str:np.array): numpy arrays to write into the database. Can either be a list of numpy arrays, in which case they are expected to be in the same order as table.list_columns(), and an array is provided for each of the columns. If the keyword argument index is None, the first array will be assumed to be an index with dtype datetime64[ns].

    Alternatively, a dict of key/values may be provided, where the key is expected to be a table column label, and the value is expected to be a np.array. If present, a column with label '$timestamp' will be used as the index.

    In all cases, all numpy arrays are expected to be of exactly the same length as the index.

  • cluster (position 1, quasardb.Cluster): active connection to the QuasarDB cluster.

  • table (position 2, quasardb.Table or str): either a string or a reference to a QuasarDB table object. For example, 'stocks' or conn.table('stocks') are both valid values.

In addition to these, the function takes many optional keyword arguments:

  • dtype (np.dtype, list of np.dtype, or dict of str:np.dtype, default None): optional data type to force. If a single dtype, forces that dtype on all columns. If list-like, maps dtypes to columns by their offset. If dict-like, maps dtypes to columns by their label. If a dtype for a column is provided in this argument and infer_types is also True, this argument takes precedence.

  • index (np.array with dtype datetime64[ns], default None): optionally provide an explicit array to use as the $timestamp index. If not provided, the first array in data will be used as the index.

  • deduplicate (bool or list[str], default None): enables server-side deduplication of data when it is written into the table. When True, automatically deduplicates rows when all values of a row are identical. When a list of strings is provided, deduplicates only based on the values of these columns. When ['$timestamp'], deduplicates only based on the timestamp index.

  • deduplication_mode ('drop' or 'upsert', default 'drop'): when deduplicate is enabled, specifies how deduplication is performed. When 'drop' (the default), any new data is dropped when a duplicate was previously stored. When 'upsert', the old data is replaced with the new data when a duplicate was previously stored.

  • infer_types (bool, default True): if true, attempts to convert types from Python to QuasarDB native types when the provided data has incompatible types. For example, an array of integers will automatically be converted to doubles if the QuasarDB table expects doubles.

    Important: as conversions are expensive and often account for the majority of time spent when inserting data into QuasarDB, we strongly recommend setting this to False for performance-sensitive code.

  • _async (bool, default False): enables the asynchronous batch inserter mode.

  • fast (bool, default False): enables the fast batch inserter mode.

  • truncate (bool, default False): enables the truncation batch inserter mode.

For demonstration purposes, the code below generates sample data which we’ll be (re)using across the various examples below.

# We first prepare our data again, this time we'll generate some random data.
rows = 10

# Generate an index
def gen_index(start_time):
   return np.array([(start_time + np.timedelta64(i, 's'))
                    for i in range(rows)]).astype('datetime64[ns]')

start_time = np.datetime64('2022-07-01', 'ns')

# We can provide our data as a dict to the Numpy adaptor, which automatically
# maps each column by name.
data = {'open': np.random.uniform(100, 200, rows),
        'close': np.random.uniform(100, 200, rows),
        'volume': np.random.randint(10000, 20000, rows)}

4.1.6.3.1. Example: insertion#

The example below demonstrates how to insert data into our stocks table using the fast insertion mode. For performance reasons, we disable implicit conversions by setting infer_types to False.

with pool.instance().connect() as conn:
   qdbnp.write_arrays(data, conn, conn.table('stocks'),
                      index=gen_index(start_time),
                      fast=True,
                      infer_types=False)

   # We now expect exactly 10 rows in this time range.
   q = "SELECT COUNT(volume) AS vol FROM stocks IN RANGE (2022-07-01, +1d)"
   (idx_, [result]) = qdbnp.query(conn, q)

   assert result[0] == rows
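Instead of a dict, write_arrays also accepts a plain list of arrays; they must then be ordered exactly like the table’s columns (here: open, close, volume). A minimal sketch of that variant, written against a hypothetical stocks_list table with the same column layout so it does not interfere with the data used in the next example:

with pool.instance().connect() as conn:
   # The arrays are matched to the table's columns by position.
   qdbnp.write_arrays([data['open'], data['close'], data['volume']],
                      conn, conn.table('stocks_list'),
                      index=gen_index(start_time),
                      fast=True,
                      infer_types=False)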

4.1.6.3.2. Example: deduplication#

QuasarDB supports server-side deduplication of data as you insert it. We demonstrate this functionality by first creating duplicate data, and then showing how to avoid it.

with pool.instance().connect() as conn:
   # Note how we're explicitly setting `deduplicate` to `False`: this is the default
   # behavior, so not strictly necessary.
   qdbnp.write_arrays(data, conn, conn.table('stocks'),
                      index=gen_index(start_time),
                      deduplicate=False,
                      fast=True,
                      infer_types=False)

   # As we just inserted the same dataset again, we expect duplicate data.
   q = "SELECT $timestamp, open FROM stocks IN RANGE (2022-07-01, +1d) LIMIT 10"
   (idx1, result1) = qdbnp.query(conn, q, index='$timestamp', dict=True)

   assert len(idx1) == len(result1['open'])
   print("index: {}".format(idx1))
   print("open:  {}".format(result1['open']))

   # Now, when we insert a second time, but this time enable deduplication,
   # this will not happen: as the entire dataset is a perfect duplicate of
   # previously stored data, none of the rows should be inserted.
   qdbnp.write_arrays(data, conn, conn.table('stocks'),
                      index=gen_index(start_time),
                      deduplicate=True,
                      fast=True,
                      infer_types=False)

   # This time, we query again and expect the exact same results as before.
   (idx2, result2) = qdbnp.query(conn, q, index='$timestamp', dict=True)

   np.testing.assert_array_equal(idx1, idx2)
   np.testing.assert_array_equal(result1['open'], result2['open'])
 index: ['2022-07-01T00:00:00.000000000' '2022-07-01T00:00:00.000000000'
'2022-07-01T00:00:01.000000000' '2022-07-01T00:00:01.000000000'
'2022-07-01T00:00:02.000000000' '2022-07-01T00:00:02.000000000'
'2022-07-01T00:00:03.000000000' '2022-07-01T00:00:03.000000000'
'2022-07-01T00:00:04.000000000' '2022-07-01T00:00:04.000000000']
 open:  [129.8009381555491 129.8009381555491 134.86020632486571 134.86020632486571
 115.54372771310257 115.54372771310257 198.11043129730098
 198.11043129730098 125.5302704963691 125.5302704963691]
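Deduplication can also be restricted to specific columns, optionally combined with deduplication_mode='upsert' to replace previously stored rows instead of dropping the new ones. A sketch of that combination, written against a hypothetical stocks_dedup table so it leaves the stocks data above untouched:

with pool.instance().connect() as conn:
   # Rows are considered duplicates whenever their $timestamp already exists;
   # 'upsert' replaces the stored values with the newly written ones.
   qdbnp.write_arrays(data, conn, conn.table('stocks_dedup'),
                      index=gen_index(start_time),
                      deduplicate=['$timestamp'],
                      deduplication_mode='upsert',
                      fast=True,
                      infer_types=False)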

4.1.6.3.3. Example: masked arrays#

The Numpy adaptor uses masked arrays to represent the data: masked arrays enable explicit “masking” of values in an array that are null. This is in contrast with Pandas, which relies on inline representation of null values, which leads to the surprising null-value problems we discuss below.

The example below demonstrates how to use a masked array with the numpy adaptor to deal with null values. For this example, we will use an “orders” table which may have null values for an integer column.

Let’s first create this table.

cols = [quasardb.ColumnInfo(quasardb.ColumnType.String, "action"),
        quasardb.ColumnInfo(quasardb.ColumnType.Int64, "buy_qty"),
        quasardb.ColumnInfo(quasardb.ColumnType.Int64, "sell_qty")]

with pool.instance().connect() as conn:
  t = conn.table("orders")
  if t.exists() is True:
    t.remove()

  t.create(cols, datetime.timedelta(hours=1))

Now that the table is created, let’s prepare some data for 10 orders. Whenever the action is buy, we expect a buy_qty; whenever it’s sell, we expect a sell_qty.

actions = np.random.choice(a=['buy', 'sell'], size=rows)
buy_qtys = np.random.randint(1, 10, rows)
sell_qtys = np.random.randint(1, 10, rows)

print("actions: {}".format(actions))
print("buy_qtys: {}".format(buy_qtys))
print("sell_qtys: {}".format(sell_qtys))
actions: ['sell' 'buy' 'buy' 'sell' 'sell' 'sell' 'buy' 'sell' 'buy' 'buy']
buy_qtys: [3 6 7 9 5 3 8 3 1 3]
sell_qtys: [9 1 5 3 9 7 8 2 7 7]

As you can see, both buy and sell qty arrays have been initialized, but without any null values. Let’s mask the entries of both buy and sell qty arrays:

# The buy mask is a boolean array that is True whenever the action is 'buy'
is_buy = actions == 'buy'

# The sell mask is a boolean array that is True whenever the action is 'sell'
is_sell = actions == 'sell'

# We can now use these to convert the buy and sell qty arrays to a masked array.
# As a True value indicates the value is masked (as in, it is hidden), this means
# we want to mask all buy entries whenever an action is sell.

import numpy.ma as ma
buy_qtys = ma.array(data=buy_qtys,
                    mask=is_sell)
sell_qtys = ma.array(data=sell_qtys,
                     mask=is_buy)

print("actions: {}".format(actions))
print("buy_qtys: {}".format(buy_qtys))
print("sell_qtys: {}".format(sell_qtys))
actions: ['sell' 'buy' 'buy' 'sell' 'sell' 'sell' 'buy' 'sell' 'buy' 'buy']
buy_qtys: [-- 6 7 -- -- -- 8 -- 1 3]
sell_qtys: [9 -- -- 3 9 7 -- 2 -- --]

Perfect, we can see the values are masked in the correct places. We can now insert these masked arrays directly into QuasarDB, where the masked entries will be stored as null values.

orders = {'action': actions,
          'buy_qty': buy_qtys,
          'sell_qty': sell_qtys}

with pool.instance().connect() as conn:
   qdbnp.write_arrays(orders, conn, conn.table('orders'),
                      index=gen_index(start_time),
                      fast=True,
                      infer_types=False)

   # As an example, we can now query back the data for all buys, and can see
   # the data in QuasarDB is as we would expect.
   q = "SELECT action, buy_qty, sell_qty FROM orders WHERE action = 'buy'"
   (idx, (actions_, buy_qtys_, sell_qtys_)) = qdbnp.query(conn, q)

   print("actions: {}".format(actions_))
   print("buy_qtys:  {}".format(buy_qtys_))
   print("sell_qtys: {}".format(sell_qtys_))
actions: ['buy' 'buy' 'buy' 'buy' 'buy']
buy_qtys:  [6 7 8 1 3]
sell_qtys: [-- -- -- -- --]

Exactly as we wanted: all sell_qty values are null / masked, while we have the values we expected for all buy_qty values.

4.1.7. Pandas adaptor#

QuasarDB’s Python API also provides a Pandas adaptor that builds on top of the Numpy adaptor. This adaptor enables direct integration with Pandas, and as such allows you to read and write Pandas Dataframes directly with QuasarDB.

4.1.7.1. Importing the library#

import quasardb.pandas as qdbpd

4.1.7.2. Querying#

We can use the query function to access data using the Pandas adaptor: it executes a query and returns the results as a Dataframe.

The function takes two positional arguments:

  • cluster (position 0, quasardb.Cluster): active connection to the QuasarDB cluster.

  • query (position 1, str): the query to execute.

And an optional keyword argument:

  • index (str or int, default None): if provided, resolves the column and uses it as the index. If a string (e.g. $timestamp), uses that column as the index. If None, uses Pandas’ default dataframe indexing (a monotonically increasing counter).

The example below will demonstrate how to use this function based on the same query we used in the earlier query example.

with pool.instance().connect() as conn:
  df = qdbpd.query(conn, "SELECT * FROM stocks LIMIT 10")

df

            $timestamp  $table        open       close  volume
0  2019-02-01 00:00:00  stocks    3.400000    3.500000   10000
1  2019-02-02 00:00:00  stocks    3.500000    3.550000    7500
2  2022-07-01 00:00:00  stocks  129.800938  189.807248   14288
3  2022-07-01 00:00:00  stocks  129.800938  189.807248   14288
4  2022-07-01 00:00:01  stocks  134.860206  179.282507   17750
5  2022-07-01 00:00:01  stocks  134.860206  179.282507   17750
6  2022-07-01 00:00:02  stocks  115.543728  154.417280   17369
7  2022-07-01 00:00:02  stocks  115.543728  154.417280   17369
8  2022-07-01 00:00:03  stocks  198.110431  134.008978   11141
9  2022-07-01 00:00:03  stocks  198.110431  134.008978   11141

As a convenience, we can use the $timestamp column as the index as well, by setting the index parameter accordingly.

with pool.instance().connect() as conn:
  df = qdbpd.query(conn, "SELECT * FROM stocks LIMIT 10", index='$timestamp')

df

                     $table        open       close  volume
2019-02-01 00:00:00  stocks    3.400000    3.500000   10000
2019-02-02 00:00:00  stocks    3.500000    3.550000    7500
2022-07-01 00:00:00  stocks  129.800938  189.807248   14288
2022-07-01 00:00:00  stocks  129.800938  189.807248   14288
2022-07-01 00:00:01  stocks  134.860206  179.282507   17750
2022-07-01 00:00:01  stocks  134.860206  179.282507   17750
2022-07-01 00:00:02  stocks  115.543728  154.417280   17369
2022-07-01 00:00:02  stocks  115.543728  154.417280   17369
2022-07-01 00:00:03  stocks  198.110431  134.008978   11141
2022-07-01 00:00:03  stocks  198.110431  134.008978   11141

4.1.7.3. Insertion#

We can use the write_dataframe function to insert new data into the database. This function uses the Numpy adaptor’s write_arrays function under the hood, and comes with several options:

It takes three required, positional arguments:

  • df (position 0, pandas.DataFrame): the pandas dataframe to store.

  • cluster (position 1, quasardb.Cluster): active connection to the QuasarDB cluster.

  • table (position 2, quasardb.Table or str): either a string or a reference to a QuasarDB table object. For example, 'stocks' or conn.table('stocks') are both valid values.

In addition to these, the function takes many optional keyword arguments:

  • dtype (np.dtype, list of np.dtype, or dict of str:np.dtype, default None): optional data type to force. If a single dtype, forces that dtype on all columns. If list-like, maps dtypes to dataframe columns by their offset. If dict-like, maps dtypes to dataframe columns by their label. If a dtype for a column is provided in this argument and infer_types is also True, this argument takes precedence.

  • create (bool, default False): if true, automatically creates the table if it did not yet exist.

  • shard_size (datetime.timedelta, default None): if create is True and a table is to be created, uses this value as the shard size. If not provided, uses QuasarDB’s default shard size (1d).

  • deduplicate (bool or list[str], default None): enables server-side deduplication of data when it is written into the table. When True, automatically deduplicates rows when all values of a row are identical. When a list of strings is provided, deduplicates only based on the values of these columns. When ['$timestamp'], deduplicates only based on the timestamp index.

  • deduplication_mode ('drop' or 'upsert', default 'drop'): when deduplicate is enabled, specifies how deduplication is performed. When 'drop' (the default), any new data is dropped when a duplicate was previously stored. When 'upsert', the old data is replaced with the new data when a duplicate was previously stored.

  • infer_types (bool, default True): if true, attempts to convert types from Python to QuasarDB native types when the provided dataframe has incompatible types. For example, a dataframe with integers will automatically convert these to doubles if the QuasarDB table expects doubles.

    Important: as conversions are expensive and often account for the majority of time spent when inserting data into QuasarDB, we strongly recommend setting this to False for performance-sensitive code.

  • _async (bool, default False): enables the asynchronous batch inserter mode.

  • fast (bool, default False): enables the fast batch inserter mode.

  • truncate (bool, default False): enables the truncation batch inserter mode.

For demonstration purposes, the code below generates sample data which we’ll be (re)using across the various examples below.

import pandas as pd

start_time = np.datetime64('2022-07-15', 'ns')

# We will reuse the earlier sample data to construct a dataframe instead.
data_ = data
data_['$timestamp'] = gen_index(start_time)

# Note how we're setting the `$timestamp` column as the index: when writing a dataframe
# to QuasarDB, a `datetime64[ns]` index is required.
df = pd.DataFrame.from_dict(data_)
df = df.set_index('$timestamp').reindex()
df

                           open       close  volume
$timestamp
2022-07-15 00:00:00  129.800938  189.807248   14288
2022-07-15 00:00:01  134.860206  179.282507   17750
2022-07-15 00:00:02  115.543728  154.417280   17369
2022-07-15 00:00:03  198.110431  134.008978   11141
2022-07-15 00:00:04  125.530270  193.479529   11947
2022-07-15 00:00:05  153.835698  195.047264   15907
2022-07-15 00:00:06  199.421627  125.887649   15512
2022-07-15 00:00:07  188.824268  119.402445   16381
2022-07-15 00:00:08  147.326543  102.446742   15213
2022-07-15 00:00:09  155.819616  198.740592   13419

4.1.7.3.1. Example: insertion#

The example below demonstrates how to insert data into our stocks table using the fast insertion mode. For performance reasons, we disable implicit conversions by setting infer_types to False.

with pool.instance().connect() as conn:
   qdbpd.write_dataframe(df, conn, conn.table('stocks'),
                         fast=True,
                         infer_types=False)

   # We now expect exactly 10 rows in this time range.
   q = "SELECT COUNT(volume) AS vol FROM stocks IN RANGE (2022-07-15, +1d)"
   result = qdbpd.query(conn, q)

print(result)

   vol
0   10
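Because write_dataframe can also create tables, the same dataframe can be written to a brand-new table in a single call; a sketch using a hypothetical stocks_copy table and an explicit one-hour shard size:

with pool.instance().connect() as conn:
   # The table is created on the fly with columns matching the dataframe.
   qdbpd.write_dataframe(df, conn, 'stocks_copy',
                         create=True,
                         shard_size=datetime.timedelta(hours=1),
                         fast=True)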

4.1.7.3.2. Example: deduplication#

QuasarDB supports server-side deduplication of data as you insert it. We demonstrate this functionality by first creating duplicate data, and then showing how to avoid it.

with pool.instance().connect() as conn:
   # Note how we're explicitly setting `deduplicate` to `False`: this is the default
   # behavior, so not strictly necessary.
   qdbpd.write_dataframe(df, conn, conn.table('stocks'),
                         deduplicate=False,
                         fast=True,
                         infer_types=False)

   # As we just inserted the same dataset again, we expect duplicate data.
   q = "SELECT $timestamp, open FROM stocks IN RANGE (2022-07-15, +1d) LIMIT 10"
   df1 = qdbpd.query(conn, q, index='$timestamp')
   print(df1.head())

   # Now, when we insert a second time, but this time enable deduplication,
   # this will not happen: as the entire dataset is a perfect duplicate of
   # previously stored data, none of the rows should be inserted.
   qdbpd.write_dataframe(df, conn, conn.table('stocks'),
                         deduplicate=True,
                         fast=True,
                         infer_types=False)

   # This time, we query again and expect the exact same results as before.
   df2 = qdbpd.query(conn, q, index='$timestamp')
   print(df2.head())
                           open
2022-07-15 00:00:00  129.800938
2022-07-15 00:00:00  129.800938
2022-07-15 00:00:01  134.860206
2022-07-15 00:00:01  134.860206
2022-07-15 00:00:02  115.543728
                           open
2022-07-15 00:00:00  129.800938
2022-07-15 00:00:00  129.800938
2022-07-15 00:00:01  134.860206
2022-07-15 00:00:01  134.860206
2022-07-15 00:00:02  115.543728

4.1.7.4. Null values#

Null values in Pandas sometimes behave in surprising ways that can affect the performance and correctness of your code, and this deserves some explanation. Contrary to Numpy’s masked arrays (which we covered in an earlier section), Pandas tries to represent null values inline: for example, for a float64 series, it represents a null value as a NaN. This makes sense, as NaN is explicitly defined by the IEEE 754 floating-point standard.

For illustration purposes, let’s generate a small dataframe which has a few null values:

data = {'floats': [1.2, 3.4, None, 5.6, 7.8]}

df = pd.DataFrame.from_dict(data)
print(df)

   floats
0     1.2
1     3.4
2     NaN
3     5.6
4     7.8

As you can see, Pandas inserted the NaN in the correct place. But what happens when you’re using a type that cannot be represented inline? For example, what if we want to combine null values with integers?

As there is no way to represent null values with integers, Pandas will instead silently convert the input data to doubles. Observe the following:

data['ints_with_null'] = [1, 2, None, 4, 5]
data['ints_no_null']   = [1, 2, 3, 4, 5]

df = pd.DataFrame.from_dict(data)
print(df)

   floats  ints_with_null  ints_no_null
0     1.2             1.0             1
1     3.4             2.0             2
2     NaN             NaN             3
3     5.6             4.0             4
4     7.8             5.0             5

Even without printing the dtypes, it’s now obvious that the array with null values has been converted to floating point.

As QuasarDB expects these values as integers, a lot of conversions take place under the hood:

  1. Pandas first converts the array to float64, as demonstrated above;

  2. QuasarDB’s adaptor probes the null values and converts the data into a masked array;

  3. QuasarDB’s adaptor then converts the masked array back to int64.

This means we’re not just copying, but actually converting the data to-and-from float64 before we’re inserting it into QuasarDB.

Knowing this, we recommend the following:

  • In performance sensitive scenarios: work directly with the Numpy adaptor as it avoids this entirely;

  • In other scenarios, while calling write_dataframe, either:

    • explicitly set the dtype of these columns to their desired type (in this case, int64), as sketched below;

    • ensure infer_types is set to True (the default).
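For illustration, a sketch of the first option: forcing the dtype of a nullable integer column so the adaptor converts it straight to int64 instead of round-tripping through float64. The events table and its qty column are placeholders and assumed to already exist:

import numpy as np
import pandas as pd

idx = pd.to_datetime(['2022-08-01 00:00:00',
                      '2022-08-01 00:00:01',
                      '2022-08-01 00:00:02'])

# Pandas stores the None as NaN and silently turns the column into float64;
# forcing the dtype tells the adaptor to write it as int64 (with a null).
df_events = pd.DataFrame({'qty': [1, None, 3]}, index=idx)

with pool.instance().connect() as conn:
   qdbpd.write_dataframe(df_events, conn, conn.table('events'),
                         dtype={'qty': np.int64},
                         fast=True)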

4.1.8. Reference#