4.1. Python

4.1.1. Requirements

Before you can get started, please ensure that:

  • You have the latest version of the QuasarDB client library installed on your computer

  • You have access to a running QuasarDB cluster.

The rest of this document assumes you have a cluster up and running under qdb://127.0.0.1:2836.

4.1.2. Supported Python versions

The currently supported Python versions are:

  • Python 3.6;

  • Python 3.7;

  • Python 3.8;

  • Python 3.9;

  • Python 3.10.

All our packages are built and published as binaries to PyPI. For Linux, our packages are built against the manylinux2014 / PEP 599 spec, which requires glibc 2.17 or later.
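
If you are unsure whether your environment meets these requirements, the snippet below is a quick, illustrative check of the interpreter version and (on Linux) the glibc version; it only uses the Python standard library and is not part of the QuasarDB API.

import platform
import sys

# The interpreter version should be one of the supported versions listed above.
print("Python: {}.{}.{}".format(*sys.version_info[:3]))

# manylinux2014 wheels require glibc 2.17 or later; on Linux, platform.libc_ver()
# reports the C library the interpreter is linked against.
print("libc:   {}".format(platform.libc_ver()))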

4.1.3. Installation

The QuasarDB Python API is distributed as a binary package via PyPI:

pip install quasardb
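
Once the package is installed, importing it from Python is a quick way to confirm the installation succeeded:

python -c "import quasardb"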

4.1.4. Introduction

The QuasarDB Python API consists of three components:

  • Core API: provides basic functionality, such as connection establishment, statistics, and pure Python interfaces for insertion, retrieval and queries.

  • Numpy adaptor: highest performance interface for reading and writing numpy arrays directly to QuasarDB.

  • Pandas adaptor: high performance interface for reading and writing Pandas dataframes directly to QuasarDB.

This tutorial will guide you through each of these separately.

4.1.5. Core API

4.1.5.1. Importing libraries

import quasardb

4.1.5.2. Connection management

Establishing a connection with the QuasarDB cluster is easy. You need the URI of at least one of your nodes, and the client will automatically detect all nodes in the cluster.

A QuasarDB cluster operates in either a secure or an insecure mode. If you do not know whether your cluster is running in secure mode, please ask your system administrator.

4.1.5.2.1. Insecure connection

uri = 'qdb://127.0.0.1:2836'

# Establish a one-off connection
conn = quasardb.Cluster(uri)

try:
   # ... do something with conn
   pass

finally:
   # Don't forget to release the connection when you're done
   conn.close()

# Alternatively, scoped resources can be used to automatically manage
# the connection lifecycle:
with quasardb.Cluster(uri) as conn:

    # ... do something with conn
    pass

4.1.5.2.2. Secure connection

In addition to the cluster URI, a secure connection requires you to provide:

  1. The cluster’s public key file

  2. Your username

  3. Your secret/private key

Your username and secret key are usually provided in a single file, the user’s security file. More information on QuasarDB’s security mechanisms can be found in our security manual. If you do not know the values of these parameters, please ask your system administrator.

The example below demonstrates how to connect to the QuasarDB cluster using these credentials.

import json

def parse_public_key(path):
   with open(path, 'r') as fp:
      return fp.read()

def parse_secret_key(path):
   with open(path, 'r') as fp:
      parsed = json.load(fp)
      return (parsed['username'], parsed['secret_key'])

def get_conn(uri, cluster_public_key_file, user_security_file):
   cluster_public_key = parse_public_key(cluster_public_key_file)
   (user_name, user_private_key) = parse_secret_key(user_security_file)

   return quasardb.Cluster(uri,
                           user_name=user_name,
                           user_private_key=user_private_key,
                           cluster_public_key=cluster_public_key)

# Establish a one-off connection
conn = get_conn('qdb://127.0.0.1:2838',
                'cluster_public.key',
                'user_private.key')

try:
   # ... do something with conn
   pass

finally:
   # Don't forget to release the connection when you're done
   conn.close()

# Alternatively, scoped resources can be used to automatically manage
# the connection lifecycle:
with get_conn('qdb://127.0.0.1:2838',
              'cluster_public.key',
              'user_private.key') as conn:

    # ... do something with conn
    pass

4.1.5.2.3. Connection pool

The Python API also comes with a built-in connection pool. This allows you to automatically manage the lifecycle of your connections, and use a decorator to automatically insert the connection in the correct place.

The connection pool needs to be initialized with the connection properties at the start, after which you can use the built-in decorator. The example below demonstrates how to use the connection pool with an insecure connection, but the same applies to secure connections.

import quasardb.pool as pool

# Here we initialize our connection pool. It accepts the same arguments as
# quasardb.Cluster.
pool.initialize(uri='qdb://127.0.0.1:2836')

# You can use the connection pool instance directly like this:
with pool.instance().connect() as conn:
   # ... do something with conn
   pass

# Alternatively, you can make use of the decorator function which handles
# connection management for you. By default, `conn` is always injected as
# the first argument.
@pool.with_conn()
def my_handler(conn, additional_args):
   print("has conn {} with args {}".format(conn, additional_args))

my_handler('test')
has conn <quasardb.pool.SessionWrapper object at 0x7f2c0c56a290> with args test
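
Since pool.initialize() accepts the same arguments as quasardb.Cluster, initializing the pool against a secure cluster is simply a matter of passing the same security parameters. A minimal sketch, reusing the parse_public_key and parse_secret_key helpers from the secure connection example above:

cluster_public_key = parse_public_key('cluster_public.key')
(user_name, user_private_key) = parse_secret_key('user_private.key')

pool.initialize(uri='qdb://127.0.0.1:2838',
                user_name=user_name,
                user_private_key=user_private_key,
                cluster_public_key=cluster_public_key)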

4.1.5.3. Creating a table

Before we can store timeseries data, we need to create a table. A table is uniquely identified by a name (e.g. “stocks” or “sensors”) and must have 1 or more columns. A timestamp index called $timestamp is always created by default, and does not need to be explicitly provided.

When creating a table, you will need to know the following:

  • The name of the table: in the example below, we will create a table called stocks.

  • What columns you need: in the example below, we will create a table with three columns:

    • open, a double precision column that represents the price of a stock at open time;

    • close, a double precision column that represents the price of a stock at close time;

    • volume, an integer column that represents the total volume of stock traded.

  • The shard size of the table: when not provided, it defaults to 1d. For more information, see the documentation about Shards.

In this example we will create a table called stocks with three columns: “open”, “close” and “volume”.

import datetime

with pool.instance().connect() as conn:
  # Here we acquire a reference to a table in the cluster. This table does not yet
  # have to exist.
  t = conn.table("stocks")

  # Drop the table if it already exists
  if t.exists() is True:
    t.remove()

  # Initialize our column definitions
  cols = [quasardb.ColumnInfo(quasardb.ColumnType.Double, "open"),
          quasardb.ColumnInfo(quasardb.ColumnType.Double, "close"),
          quasardb.ColumnInfo(quasardb.ColumnType.Int64, "volume")]

  # Now create the table with a shard size of 1 hour.
  t.create(cols, datetime.timedelta(hours=1))

4.1.5.3.1. Attaching tags

QuasarDB allows you to manage your tables by attaching tags to them. For more information about tags, see Managing tables with Tags.

In the example below, we will attach the tag nasdaq to the stocks table we created.

with pool.instance().connect() as conn:
  t = conn.table("stocks")
  # Attach the 'nasdaq' tag to the table
  t.attach_tag("nasdaq")

4.1.5.4. Insertion

Now that we have a table we want to insert into, we can write data to it using the batch insertion API. For more information, see the documentation on the batch inserter.

Python’s batch writer API is based on Numpy arrays, and the example below will demonstrate how to insert data using them.

import numpy as np

# We start by initializing our data. As the batch writer is numpy-based, we
# initialize our data as numpy arrays.
#
# Our timestamp index must always have a ``datetime64`` dtype.
idx = np.array([np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns')],
               dtype='datetime64[ns]')

# Initialize our data arrays: the dtypes must match those of the columns, otherwise
# an error will be thrown.
open = np.array([3.40, 3.50], dtype='float64')
close = np.array([3.50, 3.55], dtype='float64')
volume = np.array([10000, 7500], dtype='int64')

# Now that we have our data ready, we can initialize our batch writer and push
# the data.
with pool.instance().connect() as conn:
  # First acquire a reference to writer.
  w = conn.writer(conn.table("stocks"))

  # The index does not require any offset, as there can only be a single index.
  w.set_index(idx)

  # Each of the `set` functions requires a numerical offset of each column to set. Given
  # the layout of the table as we created it in the previous section, it means that
  # column 0 maps to `open`, 1 maps to `close` and 2 maps to `volume`.
  w.set_double_column(0, open)
  w.set_double_column(1, close)
  w.set_int64_column(2, volume)

  # We have four different push functions, mapping to each of the different insertion modes
  # of the batch writer:
  #
  # - w.push(), which uses the default, transactional insertion mode which uses copy-on-write.
  # - w.push_async(), which uses the asynchronous insertion mode.
  # - w.push_fast(), which uses the fast insertion mode.
  # - w.push_truncate(), which uses the truncation insertion mode.
  #
  # We recommend push_async() for streaming data and/or small incremental inserts, and
  # push_fast() for batch inserts.
  w.push_fast()

4.1.5.5. Reader

On the other side we have the “bulk reader”. The bulk reader provides streaming access to a single table, optionally limited by certain columns and/or certain time ranges.

If you want to have efficient row-oriented access to the raw data in a table, this is the API you want to use. If you want to execute aggregates, complex where clauses and/or multi-table joins, please see the query API.

The example below will show you how to read our stock data for just a single day.

with pool.instance().connect() as conn:
  # First acquire a reference to the reader. In this case, we tell it to expose the
  # rows as dicts.

  reader = conn.table("stocks").reader(dict=True,
                                       ranges=[(np.datetime64('2019-02-01', 'ns'),
                                                np.datetime64('2019-02-02', 'ns'))])

  # Now we can iterate over the reader and evaluate the results
  for row in reader:
     assert row['open'] == 3.4
     assert row['close'] == 3.5
     assert row['volume'] == 10000

4.1.5.6. Queries

If you are looking for more flexible control over the kind of calculations performed on a dataset, or want to push certain computations to the cluster, QuasarDB offers an SQL-like query language for you to interact with your data. Please see our query language documentation.

In the example below, we will show you how to execute a simple query that calculates the total traded volume for the entire dataset.

with pool.instance().connect() as conn:
  result = conn.query("SELECT SUM(volume) AS vol FROM stocks")

  # We expect only a single result
  assert len(result) == 1

  # We iterate over the results by row
  for row in result:
     # ...
     assert row["vol"] == 17500
     pass

  # But, as they're a simple list, we can also directly access them like this
  assert result[0]["vol"] == 17500

4.1.6. Numpy adaptor

As mentioned before, QuasarDB’s Python API is based on numpy: this allows for efficient storage and processing of large arrays of values, and works well for timeseries data.

For your convenience, we provide a quasardb.numpy batteries-included module that does the heavy lifting under the hood, and is the preferred choice for ingesting data in high performance scenarios:

  • It allows for conveniently ingesting large volumes of data in a single function call;

  • It ensures the input data has the correct shape, and converts data to the correct types where necessary;

  • It natively supports Numpy’s Masked Arrays to work with null values.

In addition to these, it also provides a query interface.

4.1.6.1. Importing the library

import quasardb.numpy as qdbnp

4.1.6.2. Querying

We can use the query function to access the data using the Numpy adaptor: it executes a query and returns the results as numpy arrays. It always returns a tuple of (index, dict | list[np.array]).

The function takes two positional arguments:

  • cluster (position 0, quasardb.Cluster): active connection to the QuasarDB cluster.

  • query (position 1, str): the query to execute.

And two optional keyword arguments:

  • index (str or int, default None): if provided, resolves the column and uses it as the index. If a string (e.g. $timestamp), uses that column as the index. If an int (e.g. 1), looks up the column by that offset. If None, returns a monotonically increasing counter for each row number.

  • dict (bool, default False): if true, returns the data arrays as a dict, otherwise as a list of np.arrays.

The examples below will demonstrate how to use this function based on the same query we used in the earlier query example.

4.1.6.2.1. Example: dict results

The most basic way to query is to return the results as a dict of numpy arrays, which is demonstrated by the code below.

with pool.instance().connect() as conn:
  q = "SELECT * FROM stocks"
  (idx, result) = qdbnp.query(conn, q, dict=True, index='$timestamp')

  # Note that, when explicitly providing the `$timestamp` index, it is removed from the result
  # data.
  assert '$timestamp' not in result

  print("keys:   {}".format(result.keys()))
  print("index:  {}".format(idx))

  print("open:   {}".format(result['open']))
  print("close:  {}".format(result['close']))
  print("volume: {}".format(result['volume']))
keys:   dict_keys(['$table', 'open', 'close', 'volume'])
index:  ['2019-02-01T00:00:00.000000000' '2019-02-02T00:00:00.000000000']
open:   [3.4 3.5]
close:  [3.5 3.55]
volume: [10000 7500]

4.1.6.2.2. Example: array results

We can also run the same query, but return the data as arrays. This is usually more convenient if you wish to rely on list destructuring, as demonstrated by the code below:

with pool.instance().connect() as conn:
  # Note that, when explicitly providing the `$timestamp` index, it is removed from the data
  # arrays.
  q = "SELECT $timestamp, open, close, volume FROM stocks"
  (idx, (open, close, volume)) = qdbnp.query(conn, q, index='$timestamp')

  print("index:  {}".format(idx))
  print("open:   {}".format(open))
  print("close:  {}".format(close))
  print("volume: {}".format(volume))
index:  ['2019-02-01T00:00:00.000000000' '2019-02-02T00:00:00.000000000']
open:   [3.4 3.5]
close:  [3.5 3.55]
volume: [10000 7500]

4.1.6.3. Insertion

We can use the write_arrays function to insert new data into the database. This function uses the batch inserter under the hood, and comes with several options.

It takes three required, positional arguments:

  • data (position 0, iterable of np.array, or dict-like of str:np.array): numpy arrays to write into the database. Can either be a list of numpy arrays, in which case they are expected to be in the same order as table.list_columns(), and an array is provided for each of the columns; if the keyword argument index is None, the first array will be assumed to be an index with dtype datetime64[ns]. Alternatively, a dict of key/values may be provided, where the key is expected to be a table column label, and the value is expected to be a np.array; if present, a column with label '$timestamp' will be used as the index. In all cases, all numpy arrays are expected to be of exactly the same length as the index.

  • cluster (position 1, quasardb.Cluster): active connection to the QuasarDB cluster.

  • table (position 2, quasardb.Table or str): either a string or a reference to a QuasarDB table object. For example, 'stocks' or conn.table('stocks') are both valid values.

In addition to these, the function takes many optional keyword arguments:

  • dtype (np.dtype, list of np.dtype, or dict of str:np.dtype, default None): optional data type to force. If a single dtype, will force that dtype onto all columns. If list-like, will map dtypes to columns by their offset. If dict-like, will map dtypes to columns by their label. If a dtype for a column is provided in this argument, and infer_types is also True, this argument takes precedence.

  • index (np.array with dtype datetime64[ns], default None): optionally provide an explicit array to use as the $timestamp index. If not provided, the first array in data will be used as the index.

  • drop_duplicates (bool or list[str], default None): enables server-side deduplication of data when it is written into the table. When True, automatically deduplicates rows when all values of a row are identical. When a list of strings is provided, deduplicates only based on the values of these columns.

  • infer_types (bool, default True): if true, will attempt to convert types from Python to QuasarDB native types if the provided data has incompatible types. For example, integers will automatically be converted to doubles if the QuasarDB table expects doubles. Important: as conversions are expensive and often the majority of time spent when inserting data into QuasarDB, we strongly recommend setting this to False for performance-sensitive code.

  • _async (bool, default False): enables the asynchronous batch inserter mode.

  • fast (bool, default False): enables the fast batch inserter mode.

  • truncate (bool, default False): enables the truncation batch inserter mode.

For demonstration purposes, the code below generates sample data which we’ll be (re)using across the various examples below.

# We first prepare our data again, this time we'll generate some random data.
rows = 10

# Generate an index
def gen_index(start_time):
   return np.array([(start_time + np.timedelta64(i, 's'))
                    for i in range(rows)]).astype('datetime64[ns]')

start_time = np.datetime64('2022-07-01', 'ns')

# We can provide our data as a dict to the Numpy adaptor, which automatically
# maps each column by name.
data = {'open': np.random.uniform(100, 200, rows),
        'close': np.random.uniform(100, 200, rows),
        'volume': np.random.randint(10000, 20000, rows)}

4.1.6.3.1. Example: insertion

The example below demonstrates how to insert data into our stocks table using the fast insertion mode. For performance reasons, we disable implicit conversions by setting infer_types to False.

with pool.instance().connect() as conn:
   qdbnp.write_arrays(data, conn, conn.table('stocks'),
                      index=gen_index(start_time),
                      fast=True,
                      infer_types=False)

   # We now expect exactly 10 rows in this time range.
   q = "SELECT COUNT(volume) AS vol FROM stocks IN RANGE (2022-07-01, +1d)"
   (idx_, [result]) = qdbnp.query(conn, q)

   assert result[0] == rows

4.1.6.3.2. Example: deduplication

QuasarDB supports server-side deduplication of data as you insert it. We demonstrate this functionality by first creating duplicate data on purpose, and then showing how to avoid it.

with pool.instance().connect() as conn:
   # Note how we're explicitly setting `drop_duplicates` to `False`: this is the default
   # behavior, so not strictly necessary.
   qdbnp.write_arrays(data, conn, conn.table('stocks'),
                      index=gen_index(start_time),
                      drop_duplicates=False,
                      fast=True,
                      infer_types=False)

   # As we just inserted the same dataset again, we expect duplicate data.
   q = "SELECT $timestamp, open FROM stocks IN RANGE (2022-07-01, +1d) LIMIT 10"
   (idx1, result1) = qdbnp.query(conn, q, index='$timestamp', dict=True)

   assert len(idx1) == len(result1['open'])
   print("index: {}".format(idx1))
   print("open:  {}".format(result1['open']))

   # Now, when we insert a second time, but this time enable deduplication,
   # this will not happen: as the entire dataset is a perfect duplicate of
   # previously stored data, none of the rows should be inserted.
   qdbnp.write_arrays(data, conn, conn.table('stocks'),
                      index=gen_index(start_time),
                      drop_duplicates=True,
                      fast=True,
                      infer_types=False)

   # This time, we query again and expect the exact same results as before.
   (idx2, result2) = qdbnp.query(conn, q, index='$timestamp', dict=True)

   np.testing.assert_array_equal(idx1, idx2)
   np.testing.assert_array_equal(result1['open'], result2['open'])
index: ['2022-07-01T00:00:00.000000000' '2022-07-01T00:00:00.000000000'
 '2022-07-01T00:00:01.000000000' '2022-07-01T00:00:01.000000000'
 '2022-07-01T00:00:02.000000000' '2022-07-01T00:00:02.000000000'
 '2022-07-01T00:00:03.000000000' '2022-07-01T00:00:03.000000000'
 '2022-07-01T00:00:04.000000000' '2022-07-01T00:00:04.000000000']
open:  [128.34719901007253 128.34719901007253 184.64105586080174
 184.64105586080174 133.62350872315778 133.62350872315778
 133.38943124157223 133.38943124157223 105.09830476194443
 105.09830476194443]
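
Instead of True, drop_duplicates also accepts a list of column labels, in which case only those columns are considered when deciding whether a row is a duplicate. A minimal sketch reusing the same data; since rows with identical open, close and volume values are already stored, we expect this write to be dropped as well:

with pool.instance().connect() as conn:
   qdbnp.write_arrays(data, conn, conn.table('stocks'),
                      index=gen_index(start_time),
                      drop_duplicates=['open', 'close', 'volume'],
                      fast=True,
                      infer_types=False)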

4.1.6.3.3. Example: masked arrays

The Numpy adaptor uses masked arrays to represent the data: masked arrays enable explicit “masking” of values in an array that are null. This is in contrast with Pandas, which relies on inline representation of null values, leading to the surprising null-value problems we discuss below.

The example below demonstrates how to use a masked array with the numpy adaptor to deal with null values. For this example, we will use an “orders” table which may have null values for an integer column.

Let’s first create this table.

cols = [quasardb.ColumnInfo(quasardb.ColumnType.String, "action"),
        quasardb.ColumnInfo(quasardb.ColumnType.Int64, "buy_qty"),
        quasardb.ColumnInfo(quasardb.ColumnType.Int64, "sell_qty")]

with pool.instance().connect() as conn:
  t = conn.table("orders")
  if t.exists() is True:
    t.remove()

  t.create(cols, datetime.timedelta(hours=1))

Now that the table is created, let’s prepare some data for 10 orders. Whenever the action is buy, we expect a buy_qty, whenever it’s sell, we expect a sell_qty.

actions = np.random.choice(a=['buy', 'sell'], size=rows)
buy_qtys = np.random.randint(1, 10, rows)
sell_qtys = np.random.randint(1, 10, rows)

print("actions: {}".format(actions))
print("buy_qtys: {}".format(buy_qtys))
print("sell_qtys: {}".format(sell_qtys))
actions: ['buy' 'buy' 'sell' 'buy' 'buy' 'buy' 'buy' 'buy' 'buy' 'sell']
buy_qtys: [2 3 5 1 3 6 6 8 2 3]
sell_qtys: [1 3 3 5 8 6 7 8 4 6]

As you can see, both buy and sell qty arrays have been initialized, but without any null values. Let’s mask the entries of both buy and sell qty arrays:

# The buy mask is a boolean array that is True whenever the action is 'buy'
is_buy = actions == 'buy'

# The sell mask is a boolean array that is True whenever the action is 'sell'
is_sell = actions == 'sell'

# We can now use these to convert the buy and sell qty arrays to a masked array.
# As a True value indicates the value is masked (as in, it is hidden), this means
# we want to mask all buy entries whenever an action is sell.

import numpy.ma as ma
buy_qtys = ma.array(data=buy_qtys,
                    mask=is_sell)
sell_qtys = ma.array(data=sell_qtys,
                     mask=is_buy)

print("actions: {}".format(actions))
print("buy_qtys: {}".format(buy_qtys))
print("sell_qtys: {}".format(sell_qtys))
actions: ['buy' 'buy' 'sell' 'buy' 'buy' 'buy' 'buy' 'buy' 'buy' 'sell']
buy_qtys: [2 3 -- 1 3 6 6 8 2 --]
sell_qtys: [-- -- 3 -- -- -- -- -- -- 6]

Perfect, we can see the values are masked in the correct places. We can now insert these masked arrays directly into QuasarDB, where the masked entries will be stored as null values.

orders = {'action': actions,
          'buy_qty': buy_qtys,
          'sell_qty': sell_qtys}

with pool.instance().connect() as conn:
   qdbnp.write_arrays(orders, conn, conn.table('orders'),
                      index=gen_index(start_time),
                      fast=True,
                      infer_types=False)

   # As an example, we can now query back the data for all buys, and can see
   # the data in QuasarDB is as we would expect.
   q = "SELECT action, buy_qty, sell_qty FROM orders WHERE action = 'buy'"
   (idx, (actions_, buy_qtys_, sell_qtys_)) = qdbnp.query(conn, q)

   print("actions: {}".format(actions_))
   print("buy_qtys:  {}".format(buy_qtys_))
   print("sell_qtys: {}".format(sell_qtys_))
actions: ['buy' 'buy' 'buy' 'buy' 'buy' 'buy' 'buy' 'buy']
buy_qtys:  [2 3 1 3 6 6 8 2]
sell_qtys: [-- -- -- -- -- -- -- --]

Exactly as we wanted: all sell_qty values are null / masked, while we have the values we expected for all buy_qty values.

4.1.7. Pandas adaptor

QuasarDB’s Python API also provides a Pandas adaptor that builds on top of the Numpy adaptor. This adaptor enables direct integration with Pandas, and as such allows you to read and write Pandas Dataframes directly with QuasarDB.

4.1.7.1. Importing the library

import quasardb.pandas as qdbpd

4.1.7.2. Querying

We can use the query function to access the data using the Pandas adaptor: it executes a query and returns the results as a Dataframe.

The function takes two positional arguments:

  • cluster (position 0, quasardb.Cluster): active connection to the QuasarDB cluster.

  • query (position 1, str): the query to execute.

And an optional keyword argument:

  • index (str or int, default None): if provided, resolves the column and uses it as the index. If a string (e.g. $timestamp), uses that column as the index. If None, uses Pandas’ default dataframe indexing (a monotonically increasing counter).

The example below will demonstrate how to use this function based on the same query we used in the earlier query example.

with pool.instance().connect() as conn:
  df = qdbpd.query(conn, "SELECT * FROM stocks LIMIT 10")

df
$timestamp $table open close volume
0 2019-02-01 00:00:00 stocks 3.400000 3.500000 10000
1 2019-02-02 00:00:00 stocks 3.500000 3.550000 7500
2 2022-07-01 00:00:00 stocks 128.347199 159.411600 14433
3 2022-07-01 00:00:00 stocks 128.347199 159.411600 14433
4 2022-07-01 00:00:01 stocks 184.641056 144.816449 13381
5 2022-07-01 00:00:01 stocks 184.641056 144.816449 13381
6 2022-07-01 00:00:02 stocks 133.623509 112.797994 13898
7 2022-07-01 00:00:02 stocks 133.623509 112.797994 13898
8 2022-07-01 00:00:03 stocks 133.389431 199.937392 12937
9 2022-07-01 00:00:03 stocks 133.389431 199.937392 12937

As a convenience, we can use the $timestamp column as the index as well, by setting the index parameter accordingly.

with pool.instance().connect() as conn:
  df = qdbpd.query(conn, "SELECT * FROM stocks LIMIT 10", index='$timestamp')

df
$table open close volume
2019-02-01 00:00:00 stocks 3.400000 3.500000 10000
2019-02-02 00:00:00 stocks 3.500000 3.550000 7500
2022-07-01 00:00:00 stocks 128.347199 159.411600 14433
2022-07-01 00:00:00 stocks 128.347199 159.411600 14433
2022-07-01 00:00:01 stocks 184.641056 144.816449 13381
2022-07-01 00:00:01 stocks 184.641056 144.816449 13381
2022-07-01 00:00:02 stocks 133.623509 112.797994 13898
2022-07-01 00:00:02 stocks 133.623509 112.797994 13898
2022-07-01 00:00:03 stocks 133.389431 199.937392 12937
2022-07-01 00:00:03 stocks 133.389431 199.937392 12937

4.1.7.3. Insertion

We can use the write_dataframe function to insert new data into the database. This function uses the Numpy adaptor’s write_arrays function under the hood, and comes with several options:

It takes three required, positional arguments:

  • df (position 0, pandas.DataFrame): the pandas dataframe to store.

  • cluster (position 1, quasardb.Cluster): active connection to the QuasarDB cluster.

  • table (position 2, quasardb.Table or str): either a string or a reference to a QuasarDB table object. For example, 'stocks' or conn.table('stocks') are both valid values.

In addition to these, the function takes many optional keyword arguments:

  • dtype (np.dtype, list of np.dtype, or dict of str:np.dtype, default None): optional data type to force. If a single dtype, will force that dtype onto all columns. If list-like, will map dtypes to dataframe columns by their offset. If dict-like, will map dtypes to dataframe columns by their label. If a dtype for a column is provided in this argument, and infer_types is also True, this argument takes precedence.

  • create (bool, default False): if true, automatically creates the table if it did not yet exist.

  • shard_size (datetime.timedelta, default None): if create is True and a table is to be created, uses this value as the shard size. If not provided, uses QuasarDB’s default shard size (1d).

  • drop_duplicates (bool or list[str], default None): enables server-side deduplication of data when it is written into the table. When True, automatically deduplicates rows when all values of a row are identical. When a list of strings is provided, deduplicates only based on the values of these columns.

  • infer_types (bool, default True): if true, will attempt to convert types from Python to QuasarDB native types if the provided dataframe has incompatible types. For example, a dataframe with integers will automatically convert these to doubles if the QuasarDB table expects it. Important: as conversions are expensive and often the majority of time spent when inserting data into QuasarDB, we strongly recommend setting this to False for performance-sensitive code.

  • _async (bool, default False): enables the asynchronous batch inserter mode.

  • fast (bool, default False): enables the fast batch inserter mode.

  • truncate (bool, default False): enables the truncation batch inserter mode.

For demonstration purposes, the code below generates sample data which we’ll be (re)using across the various examples below.

import pandas as pd

start_time = np.datetime64('2022-07-15', 'ns')

# We will reuse the earlier sample data to construct a dataframe instead.
data_ = data
data_['$timestamp'] = gen_index(start_time)

# Note how we're setting the `$timestamp` column as the index: when writing a dataframe
# to QuasarDB, a `datetime64[ns]` index is required.
df = pd.DataFrame.from_dict(data_)
df = df.set_index('$timestamp').reindex()
df
open close volume
$timestamp
2022-07-15 00:00:00 128.347199 159.411600 14433
2022-07-15 00:00:01 184.641056 144.816449 13381
2022-07-15 00:00:02 133.623509 112.797994 13898
2022-07-15 00:00:03 133.389431 199.937392 12937
2022-07-15 00:00:04 105.098305 140.237032 17998
2022-07-15 00:00:05 146.805857 145.007622 13670
2022-07-15 00:00:06 191.286479 148.826930 12916
2022-07-15 00:00:07 191.809996 107.640025 16448
2022-07-15 00:00:08 155.274311 164.768294 11573
2022-07-15 00:00:09 121.702696 114.526020 13293

4.1.7.3.1. Example: insertion

The example below demonstrates how to insert data into our stocks table using the fast insertion mode. For performance reasons, we disable implicit conversions by setting infer_types to False.

with pool.instance().connect() as conn:
   qdbpd.write_dataframe(df, conn, conn.table('stocks'),
                         fast=True,
                         infer_types=False)

   # We now expect exactly 10 rows in this time range.
   q = "SELECT COUNT(volume) AS vol FROM stocks IN RANGE (2022-07-15, +1d)"
   result = qdbpd.query(conn, q)

result
vol
0 10
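
When the target table does not exist yet, write_dataframe can create it on the fly through the create and shard_size keyword arguments. A minimal sketch, using a hypothetical table name stocks_copy purely for illustration:

with pool.instance().connect() as conn:
   # 'stocks_copy' is created automatically if it does not exist yet,
   # with a shard size of 1 hour.
   qdbpd.write_dataframe(df, conn, 'stocks_copy',
                         create=True,
                         shard_size=datetime.timedelta(hours=1),
                         fast=True)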

4.1.7.3.2. Example: deduplication

QuasarDB supports server-side deduplication of data as you insert it. We demonstrate this functionality by first creating duplicate data on purpose, and then showing how to avoid it.

with pool.instance().connect() as conn:
   # Note how we're explicitly setting `drop_duplicates` to `False`: this is the default
   # behavior, so not strictly necessary.
   qdbpd.write_dataframe(df, conn, conn.table('stocks'),
                         drop_duplicates=False,
                         fast=True,
                         infer_types=False)

   # As we just inserted the same dataset again, we expect duplicate data.
   q = "SELECT $timestamp, open FROM stocks IN RANGE (2022-07-15, +1d) LIMIT 10"
   df1 = qdbpd.query(conn, q, index='$timestamp')
   print(df1.head)

   # Now, when we insert a second time, but this time enable deduplication,
   # this will not happen: as the entire dataset is a perfect duplicate of
   # previously stored data, none of the rows should be inserted.
   qdbpd.write_dataframe(df, conn, conn.table('stocks'),
                         drop_duplicates=True,
                         fast=True,
                         infer_types=False)

   # This time, we query again and expect the exact same results as before.
   df2 = qdbpd.query(conn, q, index='$timestamp')
   print(df2.head)
<bound method NDFrame.head of                            open
2022-07-15 00:00:00  128.347199
2022-07-15 00:00:00  128.347199
2022-07-15 00:00:01  184.641056
2022-07-15 00:00:01  184.641056
2022-07-15 00:00:02  133.623509
2022-07-15 00:00:02  133.623509
2022-07-15 00:00:03  133.389431
2022-07-15 00:00:03  133.389431
2022-07-15 00:00:04  105.098305
2022-07-15 00:00:04  105.098305>
<bound method NDFrame.head of                            open
2022-07-15 00:00:00  128.347199
2022-07-15 00:00:00  128.347199
2022-07-15 00:00:01  184.641056
2022-07-15 00:00:01  184.641056
2022-07-15 00:00:02  133.623509
2022-07-15 00:00:02  133.623509
2022-07-15 00:00:03  133.389431
2022-07-15 00:00:03  133.389431
2022-07-15 00:00:04  105.098305
2022-07-15 00:00:04  105.098305>

4.1.7.4. Null values

Null values in Pandas sometimes have surprising behavior which can affect the performance and correctness of your code, and this deserves some explanation. Contrary to Numpy’s masked arrays (which we covered in an earlier section), Pandas tries to represent null values inline: for example, for a float64 series, it represents a null value as a NaN. This makes sense, as NaN is explicitly defined by the IEEE 754 floating point arithmetic standard.

For illustration purposes, let’s generate a small dataframe which has a few null values:

data = {'floats': [1.2, 3.4, None, 5.6, 7.8]}
pd.DataFrame.from_dict(data)
floats
0 1.2
1 3.4
2 NaN
3 5.6
4 7.8

As you can see, Pandas inserted the NaN in the correct place. But what happens when you’re using a type that cannot be represented inline? For example, what if we want to combine null values with integers?

As there is no way to represent null values with integers, Pandas will instead silently convert the input data to doubles. Observe the following:

data['ints_with_null'] = [1, 2, None, 4, 5]
data['ints_no_null']   = [1, 2, 3, 4, 5]

pd.DataFrame.from_dict(data)
floats ints_with_null ints_no_null
0 1.2 1.0 1
1 3.4 2.0 2
2 NaN NaN 3
3 5.6 4.0 4
4 7.8 5.0 5

Even without printing the dtypes, it’s now obvious that the array with null values has been converted to floating point.

As QuasarDB wants these values as integers instead, a lot of conversions take place under the hood:

  1. Pandas first converts the array to float64, as demonstrated above;

  2. QuasarDB’s adaptor probes the null values and converts the array into a masked array;

  3. QuasarDB’s adaptor then converts the masked array back to int64.

This means we’re not just copying, but actually converting the data to-and-from float64 before we’re inserting it into QuasarDB.

Knowing this, we recommend the following:

  • In performance sensitive scenarios: work directly with the Numpy adaptor as it avoids this entirely;

  • In other scenarios, while calling write_dataframe, either:

    • explicitly set the dtype of these columns to their desired type (in this case, int64), as in the sketch below;

    • ensure infer_types is set to True (the default).
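
As an illustration of the first recommendation, the sketch below forces the column that Pandas upcast to float64 back to int64 at write time via the dtype mapping. The table name nullable_ints is hypothetical, and create=True is used purely to keep the sketch self-contained:

# Build a dataframe from the sample data above and give it the required
# datetime64[ns] index.
df_with_nulls = pd.DataFrame.from_dict(data)
df_with_nulls.index = np.array([np.datetime64('2022-08-01', 'ns') + np.timedelta64(i, 's')
                                for i in range(len(df_with_nulls))])

with pool.instance().connect() as conn:
   # The dtype mapping forces 'ints_with_null' back to int64; the null entry is
   # written as a null value, as described in the masked arrays section.
   qdbpd.write_dataframe(df_with_nulls, conn, 'nullable_ints',
                         dtype={'ints_with_null': np.dtype('int64')},
                         create=True,
                         fast=True)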

4.1.8. Reference