4.1. Python
4.1.1. Requirements
Before you can get started, please ensure that:
You have the latest version of the QuasarDB client library installed on your computer;
You have access to a running QuasarDB cluster.
The rest of this document assumes you have a cluster up and running under qdb://127.0.0.1:2836.
4.1.2. Supported Python versions
The currently supported Python versions are:
Python 3.7;
Python 3.8;
Python 3.9;
Python 3.10;
Python 3.11;
Python 3.12.
All our packages are built and published as binaries to PyPI. For Linux, our packages are built against the manylinux2014 (PEP 599) specification, which requires glibc 2.17 or later.
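Because the Linux wheels target manylinux2014, you can check a host before installing by comparing its glibc version with 2.17. The sketch below uses the standard library's platform.libc_ver(). The helper names parse_version and glibc_supported are hypothetical. Systems that do not use glibc, such as musl-based Alpine Linux, report an empty result and are treated as unsupported here.

```python
import platform

# Minimum glibc required by manylinux2014 (PEP 599) wheels.
MIN_GLIBC = (2, 17)


def parse_version(version):
    """Turn a dotted version string such as '2.31' into a tuple of ints."""
    parts = []
    for piece in version.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def glibc_supported(libc_name, libc_version):
    """Return True if the given C library is glibc 2.17 or later."""
    if libc_name != "glibc" or not libc_version:
        return False
    # Compare as integer tuples so that 2.9 is correctly older than 2.17.
    return parse_version(libc_version) >= MIN_GLIBC


if __name__ == "__main__":
    name, version = platform.libc_ver()
    print(name, version, glibc_supported(name, version))
```

Versions are compared as integer tuples rather than as strings, because a plain string comparison would rank "2.9" above "2.17".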
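4.1.3. Installation
The QuasarDB Python API is distributed as a binary package on PyPI. Install it with pip:
pip install quasardb
To pin a specific version, use quasardb==3.13.4 in a requirements.txt file, or quasardb = "3.13.4" in a TOML-based dependency manifest such as a Poetry pyproject.toml.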
4.1.4. Introduction
The QuasarDB Python API consists of three components:
| Component | Description |
|---|---|
| Core API | Provides basic functionality, such as connection establishment, statistics, and pure Python interfaces for insertion, retrieval and queries. |
| Numpy adaptor | Highest-performance interface for reading and writing numpy arrays directly to QuasarDB. |
| Pandas adaptor | High-performance interface for reading and writing Pandas dataframes directly to QuasarDB. |
This tutorial will guide you through each of these separately.
4.1.5. Core API
4.1.5.1. Importing libraries
import quasardb
4.1.5.2. Connection management
Establishing a connection with the QuasarDB cluster is easy. You need the URI of at least one of your nodes, and the client will automatically detect all nodes in the cluster.
A QuasarDB cluster operates in either a secure or an insecure mode. If you do not know whether your cluster is running in secure mode, please ask your system administrator.
4.1.5.2.1. Insecure connection
uri = 'qdb://127.0.0.1:2836'

# Establish a one-off connection
conn = quasardb.Cluster(uri)
try:
    # ... do something with conn
    pass
finally:
    # Don't forget to release the connection when you're done
    conn.close()

# Alternatively, scoped resources can be used to automatically manage
# the connection lifecycle:
with quasardb.Cluster(uri) as conn:
    # ... do something with conn
    pass
4.1.5.2.2. Secure connection
In addition to the cluster URI, a secure connection requires you to provide:
The cluster’s public key file;
Your username;
Your secret/private key.
Your username and secret key are usually provided in a single file, the user’s security file. More information on QuasarDB’s security mechanisms can be found in our security manual. If you do not know the values of these parameters, please ask your system administrator.
The example below demonstrates how to connect to the QuasarDB cluster using these credentials.
import json

def parse_public_key(path):
    with open(path, 'r') as fp:
        return fp.read()

def parse_secret_key(path):
    with open(path, 'r') as fp:
        parsed = json.load(fp)
        return (parsed['username'], parsed['secret_key'])

def get_conn(uri, cluster_public_key_file, user_security_file):
    cluster_public_key = parse_public_key(cluster_public_key_file)
    (user_name, user_private_key) = parse_secret_key(user_security_file)
    return quasardb.Cluster(uri,
                            user_name=user_name,
                            user_private_key=user_private_key,
                            cluster_public_key=cluster_public_key)

# Establish a one-off connection
conn = get_conn('qdb://127.0.0.1:2838',
                'cluster_public.key',
                'user_private.key')
try:
    # ... do something with conn
    pass
finally:
    # Don't forget to release the connection when you're done
    conn.close()

# Alternatively, scoped resources can be used to automatically manage
# the connection lifecycle:
with get_conn('qdb://127.0.0.1:2838',
              'cluster_public.key',
              'user_private.key') as conn:
    # ... do something with conn
    pass
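The parse_secret_key helper above assumes that the user security file is a JSON document with username and secret_key fields. The sketch below is a slightly more defensive variant. It reports which field is missing instead of raising a bare KeyError. The field names come from the example above. The helper names and the choice of ValueError are our own.

```python
import json

# Fields read by parse_secret_key in the example above.
REQUIRED_FIELDS = ("username", "secret_key")


def missing_fields(parsed):
    """List the required fields that are absent or empty in a parsed security file."""
    return [field for field in REQUIRED_FIELDS if not parsed.get(field)]


def parse_user_credentials(text):
    """Return (username, secret_key) from the JSON text of a user security file."""
    parsed = json.loads(text)
    missing = missing_fields(parsed)
    if missing:
        raise ValueError("security file is missing: " + ", ".join(missing))
    return parsed["username"], parsed["secret_key"]


def load_user_credentials(path):
    """Read a user security file from disk and parse it."""
    with open(path, "r") as fp:
        return parse_user_credentials(fp.read())
```

With this helper, get_conn could call load_user_credentials(user_security_file) in place of parse_secret_key.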
4.1.5.2.3. Connection pool
The Python API also comes with a built-in connection pool. This allows you to automatically manage the lifecycle of your connections, and to use a decorator that automatically injects the connection into your function’s arguments.
The connection pool needs to be initialized with the connection properties at the start, after which you can use the built-in decorator. The example below demonstrates how to use the connection pool with an insecure connection, but the same applies to secure connections.
import quasardb.pool as pool

# Here we initialize our connection pool. It accepts the same arguments as
# quasardb.Cluster.
pool.initialize(uri='qdb://127.0.0.1:2836')

# You can use the connection pool instance directly like this:
with pool.instance().connect() as conn:
    # ... do something with conn
    pass

# Alternatively, you can make use of the decorator function which handles
# connection management for you. By default, `conn` is always injected as
# the first argument.
@pool.with_conn()
def my_handler(conn, additional_args):
    print("has conn {} with args {}".format(conn, additional_args))

my_handler('test')
has conn <quasardb.pool.SessionWrapper object at 0x7f0d40711ed0> with args test
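Here is a simplified, self-contained model of the decorator pattern. A pool hands out a connection. The decorator passes that connection as the first positional argument. The connection is released when the function returns, even if the function raises. This is only an illustration. FakeConnection and ToyPool are hypothetical stand-ins, not part of quasardb.pool, whose internals may differ. In particular, a real pool would typically reuse connections rather than open a new one for each call.

```python
import contextlib
import functools


class FakeConnection:
    """Hypothetical stand-in for a quasardb connection object."""

    def __init__(self, uri):
        self.uri = uri
        self.closed = False

    def close(self):
        self.closed = True


class ToyPool:
    """Toy pool: opens a connection per checkout and always releases it."""

    def __init__(self, uri):
        self.uri = uri
        self.released = []

    @contextlib.contextmanager
    def connect(self):
        conn = FakeConnection(self.uri)
        try:
            yield conn
        finally:
            # Runs even if the caller raises, so connections never leak.
            conn.close()
            self.released.append(conn)

    def with_conn(self):
        """Decorator factory that injects a connection as the first argument."""
        def decorator(fn):
            @functools.wraps(fn)
            def wrapper(*args, **kwargs):
                with self.connect() as conn:
                    return fn(conn, *args, **kwargs)
            return wrapper
        return decorator


toy_pool = ToyPool("qdb://127.0.0.1:2836")


@toy_pool.with_conn()
def my_handler(conn, additional_args):
    return "has conn to {} with args {}".format(conn.uri, additional_args)
```

Callers invoke my_handler('test') without passing a connection. The decorator supplies one and closes it afterwards.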
4.1.5.3. Creating a table
Before we can store timeseries data, we need to create a table. A table is uniquely identified by a name (e.g. “stocks” or “sensors”) and must have one or more columns. A timestamp index called $timestamp is always created by default, and does not need to be explicitly provided.
When creating a table, you will need to know the following:
* The name of the table: in the example below, we will create a table called stocks.
* Which columns you need: in the example below, we will create a table with three columns:
  * open, a double-precision column that represents the price of a stock at open time;
  * close, a double-precision column that represents the price of a stock at close time;
  * volume, an integer column that represents the total volume of stock traded.
* The shard size of the table: when not provided, it defaults to 1d. For more information, see the documentation about Shards.
In this example we will create a table called stocks with three columns: “open”, “close” and “volume”.
import datetime

with pool.instance().connect() as conn:
    # Here we acquire a reference to a table in the cluster. This table does not yet
    # have to exist.
    t = conn.table("stocks")

    # Drop the table if it already exists
    if t.exists():
        t.remove()

    # Initialize our column definitions
    cols = [quasardb.ColumnInfo(quasardb.ColumnType.Double, "open"),
            quasardb.ColumnInfo(quasardb.ColumnType.Double, "close"),
            quasardb.ColumnInfo(quasardb.ColumnType.Int64, "volume")]

    # Now create the table with a shard size of 1 hour.
    t.create(cols, datetime.timedelta(hours=1))
4.1.5.4. Insertion
Now that we have a table we want to insert into, we can write data to it using the batch insertion API. For more information, see the documentation on the batch inserter.
Python’s batch writer API is based on Numpy arrays, and the example below demonstrates how to insert data using them.
import numpy as np

# We start by initializing our data. As the batch writer is numpy-based, we
# initialize our data as numpy arrays.
#
# Our timestamp index must always have a ``datetime64`` dtype.
idx = np.array([np.datetime64('2019-02-01', 'ns'), np.datetime64('2019-02-02', 'ns')],
               dtype='datetime64[ns]')

# Initialize our data arrays: the dtypes must match those of the columns, otherwise
# an error will be thrown. (We avoid naming a variable `open`, which would shadow
# Python's builtin open() function.)
opens = np.array([3.40, 3.50], dtype='float64')
closes = np.array([3.50, 3.55], dtype='float64')
volumes = np.array([10000, 7500], dtype='int64')

# Now that we have our data ready, we can initialize our batch writer and push
# the data.
with pool.instance().connect() as conn:
    # First acquire a reference to the table we wish to insert into.
    t = conn.table("stocks")

    # Now, let's prepare the data for the batch writer by using a special class
    # which holds this data.
    d = quasardb.WriterData()

    # We could append data for multiple tables here.
    d.append(t, idx, [opens, closes, volumes])

    # Acquire a reference to the batch writer, and push the data.
    w = conn.writer()

    # The push_mode argument selects one of the four insertion modes of the
    # batch writer:
    #
    # - the default, transactional insertion mode, which uses copy-on-write;
    # - the asynchronous insertion mode;
    # - the fast insertion mode;
    # - the truncation insertion mode.
    #
    # We recommend the asynchronous mode for streaming data and/or small
    # incremental inserts, and the fast mode for batch inserts.
    w.push(d, push_mode=quasardb.WriterPushMode.Fast)
4.1.5.5. Reader
On the other side we have the “bulk reader”. The bulk reader provides streaming access to a single table, optionally limited to certain columns and/or certain time ranges.
If you want efficient row-oriented access to the raw data in a table, this is the API you want to use. If you want to execute aggregates, complex where clauses and/or multi-table joins, please see the query API.
The example below shows how to read back our stock data and inspect the first batch.
with pool.instance().connect() as conn:
    table_names = ["stocks"]

    # Now we'll acquire a reference to the reader, which will yield batches of data.
    with conn.reader(table_names) as reader:
        # Read all batches into memory; we only consider the first one
        batches = list(reader)
        batch = batches[0]

        # Data is returned in column-oriented format
        assert batch['open'][0] == 3.4
        assert batch['close'][0] == 3.5
        assert batch['volume'][0] == 10000
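The example above materializes every batch with list(reader). That is fine for a small table. For larger tables you can consume the batches one at a time instead. The sketch below assumes, as the example does, that each batch behaves like a mapping from column name to a sequence of values. total_volume and sample_batches are hypothetical names. The generator stands in for a reader, so the code runs without a cluster.

```python
def total_volume(batches, column="volume"):
    """Sum one column across a stream of column-oriented batches.

    Only one batch is held in memory at a time, so this works the same
    whether `batches` is a list or a streaming reader.
    """
    total = 0
    for batch in batches:
        total += sum(batch[column])
    return total


def sample_batches():
    """Stand-in for a reader: yields the two rows inserted earlier, one per batch."""
    yield {"open": [3.40], "close": [3.50], "volume": [10000]}
    yield {"open": [3.50], "close": [3.55], "volume": [7500]}
```

With a real reader, you would call total_volume(reader) inside the `with conn.reader(...)` block. For the two rows inserted earlier this gives 17500, which matches the SUM(volume) query in the next section.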
4.1.5.6. Queries
If you are looking for more flexible control over the kind of calculations performed on a dataset, or want to push certain computations to the cluster, QuasarDB offers an SQL-like query language for you to interact with your data. Please see our query language documentation.
In the example below, we will show you how to execute a simple query that calculates the total traded volume for the entire dataset.
with pool.instance().connect() as conn:
    result = conn.query("SELECT SUM(volume) AS vol FROM stocks")

    # We expect only a single result
    assert len(result) == 1

    # We iterate over the results by row
    for row in result:
        # ...
        assert row["vol"] == 17500

    # But, as they're a simple list, we can also directly access them like this
    assert result[0]["vol"] == 17500
4.1.5.7. Timezone Support
By default, QuasarDB stores and queries all data using the UTC timezone. QuasarDB supports adjusting the timezone for queries: this enables you to e.g. aggregate by day or week, taking into account your local timezone.
To set the timezone for queries to Europe/Brussels, invoke the set_timezone function on the connection’s options as follows:
with pool.instance().connect() as conn:
    conn.options().set_timezone("Europe/Brussels")
    result = conn.query("SELECT $timestamp, SUM(volume) AS vol FROM stocks IN RANGE (TODAY(), -1month) GROUP BY 1d")
The query is now executed using the provided timezone.
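The timezone matters whenever a query groups by calendar periods, because a single instant can fall on different days in different timezones. The standard-library sketch below illustrates this with one UTC timestamp. It hardcodes a UTC+1 offset, which is Brussels's winter offset (CET). Real code should use zoneinfo.ZoneInfo("Europe/Brussels") so that daylight saving time is handled. day_bucket is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+1 offset: Brussels in winter (CET). Real code should use
# zoneinfo.ZoneInfo("Europe/Brussels") to account for daylight saving time.
BRUSSELS_WINTER = timezone(timedelta(hours=1), "CET")


def day_bucket(ts, tz):
    """Return the calendar date a timezone-aware timestamp falls on in `tz`."""
    return ts.astimezone(tz).date()


# 23:30 UTC on 1 February 2019 is already 00:30 on 2 February in Brussels.
instant = datetime(2019, 2, 1, 23, 30, tzinfo=timezone.utc)

utc_day = day_bucket(instant, timezone.utc)          # 2019-02-01
brussels_day = day_bucket(instant, BRUSSELS_WINTER)  # 2019-02-02
```

A daily aggregation that uses UTC would therefore count this row on 1 February. The same aggregation in Brussels time would count it on 2 February.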
4.1.6. Numpy adaptor
As mentioned before, QuasarDB’s Python API is based on numpy: this allows for efficient storage and processing of large arrays of values, and works well for timeseries data.
For your convenience, we provide a batteries-included quasardb.numpy module that does the heavy lifting under the hood, and is the preferred choice for ingesting data in high-performance scenarios:
It allows for conveniently ingesting large volumes of data in a single function call;
It ensures the input data has the correct shape, and converts data to the correct types where necessary;
It natively supports Numpy’s Masked Arrays to work with null values.
In addition to these, it also provides a query interface.
4.1.6.1. Importing the library
import quasardb.numpy as qdbnp
4.1.6.2. Querying
We can use the query function to access the data using the Numpy adaptor: it executes a query and returns the results as numpy arrays. It always returns a tuple of (index, dict | list[np.array]).
The function takes two positional arguments:
| Position | Parameter | Type | Description |
|---|---|---|---|
| 0 | cluster | quasardb.Cluster | Active connection to the QuasarDB cluster |
| 1 | query | str | The query to execute |
And two optional keyword arguments:
| Parameter | Type | Default | Description |
|---|---|---|---|
| index | | | If provided, resolves column and uses that as the index. If string (e.g. $timestamp), uses that column as the index. If int (e.g. 1), looks up the column based on that offset. If |
| dict | | | If true, returns data arrays as a dict, otherwise a list of np.arrays. |
The examples below will demonstrate how to use this function based on the same query we used in the earlier query example.
4.1.6.2.1. Example: dict results
The most basic way to query is to return the results as a dict of numpy arrays, which is demonstrated by the code below.
with pool.instance().connect() as conn:
    q = "SELECT * FROM stocks"
    (idx, result) = qdbnp.query(conn, q, dict=True, index='$timestamp')

    # Note that, when explicitly providing the `$timestamp` index, it is removed from the result
    # data.
    assert '$timestamp' not in result

print("keys: {}".format(result.keys()))
print("index: {}".format(idx))
print("open: {}".format(result['open']))
print("close: {}".format(result['close']))
print("volume: {}".format(result['volume']))
keys: dict_keys(['$table', 'open', 'close', 'volume'])
index: ['2019-02-01T00:00:00.000000000' '2019-02-02T00:00:00.000000000']
open: [3.4 3.5]
close: [3.5 3.55]
volume: [10000 7500]
4.1.6.2.2. Example: array results
We can also run the same query, but return the data as a list of arrays. This is usually more convenient if you wish to rely on list destructuring, as demonstrated by the code below:
with pool.instance().connect() as conn:
    # Note that, when explicitly providing the `$timestamp` index, it is removed from the data
    # arrays.
    q = "SELECT $timestamp, open, close, volume FROM stocks"
    (idx, (opens, closes, volumes)) = qdbnp.query(conn, q, index='$timestamp')

print("index: {}".format(idx))
print("open: {}".format(opens))
print("close: {}".format(closes))
print("volume: {}".format(volumes))
index: ['2019-02-01T00:00:00.000000000' '2019-02-02T00:00:00.000000000']
open: [3.4 3.5]
close: [3.5 3.55]
volume: [10000 7500]
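The index parameter accepts either a column name such as '$timestamp' or an integer offset into the selected columns. When an index is given, that column is removed from the returned data, as the two examples show. The sketch below models this lookup on plain lists. resolve_index and split_index are hypothetical helpers written for illustration. They are not functions exported by quasardb.numpy.

```python
def resolve_index(columns, index):
    """Return the offset of the index column in `columns`.

    `index` may be a column name (str) or an offset (int), mirroring the
    two forms described for the `index` parameter.
    """
    if isinstance(index, str):
        if index not in columns:
            raise KeyError("unknown index column: {}".format(index))
        return columns.index(index)
    if isinstance(index, int):
        if not 0 <= index < len(columns):
            raise IndexError("index offset {} out of range".format(index))
        return index
    raise TypeError("index must be a str or an int")


def split_index(columns, arrays, index):
    """Separate the index array from the remaining data arrays."""
    pos = resolve_index(columns, index)
    data = {name: arr
            for i, (name, arr) in enumerate(zip(columns, arrays))
            if i != pos}
    return arrays[pos], data
```

For example, split_index(["$timestamp", "open"], [[1, 2], [3.4, 3.5]], "$timestamp") returns ([1, 2], {"open": [3.4, 3.5]}).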
4.1.6.3. Insertion
We can use the write_arrays function to insert new data into the database. This function uses the batch inserter under the hood, and comes with several options.
It takes three required, positional arguments:
| Position | Parameter | Type | Description |
|---|---|---|---|
| 0 | data | Iterable of | Numpy arrays to write into the database. Can either be a list of numpy arrays, in which case they are expected to be in the same order as Alternatively, a dict of key/values may be provided, where the key is expected to be a table column label, and the value is expected to be a np.array. If present, a column with label In all cases, all numpy arrays are expected to be of exactly the same length as the index. |
| 1 | cluster | | Active connection to the QuasarDB cluster |
| 2 | table | | Either a string or a reference to a QuasarDB table object. For example, |
In addition to these, the function takes many optional keyword arguments:
| Parameter | Type | Default | Description |
|---|---|---|---|
| dtype | | | Optional data type to force. If a single dtype, will force that dtype to all columns. If list-like, will map dtypes to columns by their offset. If dict-like, will map dtypes to columns by their label. If a dtype for a column is provided in this argument, and infer_types is also True, this argument takes precedence. |
| index | | | Optionally explicitly provide an array as the $timestamp index. If not provided, the first argument to |
| deduplicate | | | Enables server-side deduplication of data when it is written into the table. When |
| deduplication_mode | | | When |
| infer_types | | | If true, will attempt to convert types from Python to QuasarDB native types if the provided data has incompatible types. For example, integer data will automatically be converted to doubles if the QuasarDB table expects it. Important: as conversions are expensive and often the majority of time spent when inserting data into QuasarDB, we strongly recommend setting this to |
| _async | | | Enables the asynchronous batch inserter mode. |
| fast | | | Enables the fast batch inserter mode. |
| truncate | | | Enables the truncation batch inserter mode. |
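The dtype argument can take three shapes: a single dtype for every column, a list matched by offset, or a dict matched by label. The sketch below normalizes all three into one per-column mapping. It uses dtype names as plain strings so it runs without NumPy. normalize_dtypes is a hypothetical helper that only models the rules in the table above. It is not the adaptor's implementation. Requiring a list to cover every column is our own assumption.

```python
def normalize_dtypes(columns, dtype):
    """Map each column label to its forced dtype, or None if none is forced.

    - None: no column is forced;
    - a dict: matched to columns by label;
    - a list/tuple: matched to columns by offset;
    - any other single value (e.g. 'int64'): applied to every column.
    """
    if dtype is None:
        return {col: None for col in columns}
    if isinstance(dtype, dict):
        return {col: dtype.get(col) for col in columns}
    if isinstance(dtype, (list, tuple)):
        if len(dtype) != len(columns):
            raise ValueError("expected {} dtypes, got {}".format(len(columns), len(dtype)))
        return dict(zip(columns, dtype))
    return {col: dtype for col in columns}
```

For example, normalize_dtypes(["open", "volume"], {"volume": "int64"}) forces only the volume column and leaves open as None.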
For demonstration purposes, the code below generates sample data which we’ll be (re)using across the various examples below.
# We first prepare our data again, this time we'll generate some random data.
rows = 10

# Generate an index of `rows` timestamps, one second apart
def gen_index(start_time):
    return np.array([(start_time + np.timedelta64(i, 's'))
                     for i in range(rows)]).astype('datetime64[ns]')

start_time = np.datetime64('2022-07-01', 'ns')

# We provide our data as a dict: the Numpy adaptor automatically maps each
# key to the table column with the same name. The explicit int64 dtype keeps
# `volume` at 64 bits on every platform (older NumPy versions default to int32
# on Windows).
data = {'open': np.random.uniform(100, 200, rows),
        'close': np.random.uniform(100, 200, rows),
        'volume': np.random.randint(10000, 20000, rows, dtype='int64')}
4.1.6.3.1. Example: insertion
The example below demonstrates how to insert data into our stocks table using the fast insertion mode. For performance reasons, we disable implicit conversions by setting infer_types to False.
with pool.instance().connect() as conn:
    qdbnp.write_arrays(data, conn, conn.table('stocks'),
                       index=gen_index(start_time),
                       fast=True,
                       infer_types=False)

    # We now expect exactly 10 rows in this time range.
    q = "SELECT COUNT(volume) AS vol FROM stocks IN RANGE (2022-07-01, +1d)"
    (idx_, [result]) = qdbnp.query(conn, q)
    assert result[0] == rows
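write_arrays expects every data array to have exactly the same length as the index. A pre-flight check can catch mismatches before any data is sent. The sketch below is one such check. check_lengths is a hypothetical helper that works on any sized sequences: plain lists here, NumPy arrays in practice. For list input, it assumes you pass the table's column labels in order so that problems can be reported by name.

```python
def check_lengths(data, index, columns=None):
    """Validate that every array in `data` matches the index length.

    `data` may be a dict {column: array} or a list of arrays; for a list,
    `columns` (the table's column labels, in order) names each array.
    Returns a list of problems; an empty list means all lengths match.
    """
    if isinstance(data, dict):
        items = list(data.items())
    else:
        labels = columns if columns is not None else [str(i) for i in range(len(data))]
        if len(labels) != len(data):
            return ["expected {} arrays, got {}".format(len(labels), len(data))]
        items = list(zip(labels, data))

    problems = []
    for label, arr in items:
        if len(arr) != len(index):
            problems.append("{}: {} values, index has {}".format(label, len(arr), len(index)))
    return problems
```

Calling check_lengths(data, gen_index(start_time)) before write_arrays should return an empty list for the sample data above.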
4.1.6.3.2. Example: deduplication
QuasarDB supports server-side deduplication of data as you insert it. We demonstrate this functionality by first deliberately inserting duplicate data, and then showing how deduplication prevents this.
with pool.instance().connect() as conn:
    # Note how we're explicitly setting `deduplicate` to `False`: this is the default
    # behavior, so not strictly necessary.
    qdbnp.write_arrays(data, conn, conn.table('stocks'),
                       index=gen_index(start_time),
                       deduplicate=False,
                       fast=True,
                       infer_types=False)

    # As we just inserted the same dataset again, we expect duplicate data.
    q = "SELECT $timestamp, open FROM stocks IN RANGE (2022-07-01, +1d) LIMIT 10"
    (idx1, result1) = qdbnp.query(conn, q, index='$timestamp', dict=True)
    assert len(idx1) == len(result1['open'])

    print("index: {}".format(idx1))
    print("open: {}".format(result1['open']))

    # Now, when we insert a second time, but this time enable deduplication,
    # this will not happen: as the entire dataset is a perfect duplicate of
    # previously stored data, none of the rows should be inserted.
    qdbnp.write_arrays(data, conn, conn.table('stocks'),
                       index=gen_index(start_time),
                       deduplicate=True,
                       fast=True,
                       infer_types=False)

    # This time, we query again and expect the exact same results as before.
    (idx2, result2) = qdbnp.query(conn, q, index='$timestamp', dict=True)
    np.testing.assert_array_equal(idx1, idx2)
    np.testing.assert_array_equal(result1['open'], result2['open'])
index: ['2022-07-01T00:00:00.000000000' '2022-07-01T00:00:00.000000000'
'2022-07-01T00:00:01.000000000' '2022-07-01T00:00:01.000000000'
'2022-07-01T00:00:02.000000000' '2022-07-01T00:00:02.000000000'
'2022-07-01T00:00:03.000000000' '2022-07-01T00:00:03.000000000'
'2022-07-01T00:00:04.000000000' '2022-07-01T00:00:04.000000000']
open: [129.8009381555491 129.8009381555491 134.86020632486571 134.86020632486571
115.54372771310257 115.54372771310257 198.11043129730098
198.11043129730098 125.5302704963691 125.5302704963691]
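In the deduplicate=True run above, every incoming row was identical to a row already stored, so nothing was written. The sketch below models that idea with a Python set. A row is a (timestamp, values...) tuple, and only rows not already present are kept. This is a simplified model for intuition only. The server-side implementation, and the behaviour selected by deduplication_mode, may differ. deduplicate_rows is a hypothetical name, and the two stored rows are rounded values taken from the output above.

```python
def deduplicate_rows(existing, incoming):
    """Return the incoming rows that are not exact duplicates of stored rows.

    Rows are (timestamp, values...) tuples. Duplicates within `incoming`
    itself are also dropped, keeping the first occurrence.
    """
    seen = set(existing)
    kept = []
    for row in incoming:
        if row not in seen:
            seen.add(row)
            kept.append(row)
    return kept


# Two previously stored rows (values rounded from the output above).
stored = [("2022-07-01T00:00:00", 129.80), ("2022-07-01T00:00:01", 134.86)]
```

Re-inserting stored with this model keeps nothing. This mirrors the example, where the second, deduplicated write left the query results unchanged.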
4.1.6.3.3. Example: masked arrays
The Numpy adaptor uses masked arrays to represent the data: masked arrays enable explicit “masking” of the values in an array that are null. This is in contrast with Pandas, which relies on an inline representation of null values, which leads to surprising problems that we discuss below.
The example below demonstrates how to use a masked array with the Numpy adaptor to deal with null values. For this example, we will use an “orders” table which may have null values in its integer columns.
Let’s first create this table.
cols = [quasardb.ColumnInfo(quasardb.ColumnType.String, "action"),
        quasardb.ColumnInfo(quasardb.ColumnType.Int64, "buy_qty"),
        quasardb.ColumnInfo(quasardb.ColumnType.Int64, "sell_qty")]

with pool.instance().connect() as conn:
    t = conn.table("orders")
    if t.exists():
        t.remove()

    t.create(cols, datetime.timedelta(hours=1))
Now that the table is created, let’s prepare some data for 10 orders. Whenever the action is buy, we expect a buy_qty; whenever it’s sell, we expect a sell_qty.
actions = np.random.choice(a=['buy', 'sell'], size=rows)
buy_qtys = np.random.randint(1, 10, rows)
sell_qtys = np.random.randint(1, 10, rows)
print("actions: {}".format(actions))
print("buy_qtys: {}".format(buy_qtys))
print("sell_qtys: {}".format(sell_qtys))
actions: ['sell' 'buy' 'buy' 'sell' 'sell' 'sell' 'buy' 'sell' 'buy' 'buy']
buy_qtys: [3 6 7 9 5 3 8 3 1 3]
sell_qtys: [9 1 5 3 9 7 8 2 7 7]
As you can see, both buy and sell qty arrays have been initialized, but without any null values. Let’s mask the entries of both buy and sell qty arrays:
# The buy mask is a boolean array that is True whenever the action is 'buy'
is_buy = actions == 'buy'

# The sell mask is a boolean array that is True whenever the action is 'sell'
is_sell = actions == 'sell'

# We can now use these to convert the buy and sell qty arrays to masked arrays.
# As a True value indicates the value is masked (as in, it is hidden), this means
# we want to mask all buy entries whenever an action is sell, and vice versa.
import numpy.ma as ma

buy_qtys = ma.array(data=buy_qtys,
                    mask=is_sell)
sell_qtys = ma.array(data=sell_qtys,
                     mask=is_buy)
print("actions: {}".format(actions))
print("buy_qtys: {}".format(buy_qtys))
print("sell_qtys: {}".format(sell_qtys))
actions: ['sell' 'buy' 'buy' 'sell' 'sell' 'sell' 'buy' 'sell' 'buy' 'buy']
buy_qtys: [-- 6 7 -- -- -- 8 -- 1 3]
sell_qtys: [9 -- -- 3 9 7 -- 2 -- --]
Perfect, we can see the values are masked in the correct places. We can now use these masked arrays to insert these into QuasarDB directly, which will be stored as null values.
orders = {'action': actions,
          'buy_qty': buy_qtys,
          'sell_qty': sell_qtys}

with pool.instance().connect() as conn:
    qdbnp.write_arrays(orders, conn, conn.table('orders'),
                       index=gen_index(start_time),
                       fast=True,
                       infer_types=False)

    # As an example, we can now query back the data for all buys, and can see
    # the data in QuasarDB is as we would expect.
    q = "SELECT action, buy_qty, sell_qty FROM orders WHERE action = 'buy'"
    (idx, (actions_, buy_qtys_, sell_qtys_)) = qdbnp.query(conn, q)

print("actions: {}".format(actions_))
print("buy_qtys: {}".format(buy_qtys_))
print("sell_qtys: {}".format(sell_qtys_))
actions: ['buy' 'buy' 'buy' 'buy' 'buy']
buy_qtys: [6 7 8 1 3]
sell_qtys: [-- -- -- -- --]
Exactly as we wanted: all sell_qty values are null / masked, while we have the values we expected for all buy_qty values.
4.1.7. Pandas adaptor
QuasarDB’s Python API also provides a Pandas adaptor that builds on top of the Numpy adaptor. This adaptor enables direct integration with Pandas, and as such allows you to read and write Pandas Dataframes directly with QuasarDB.
4.1.7.1. Importing the library
import quasardb.pandas as qdbpd
4.1.7.2. Querying
We can use the query function to access the data using the Pandas adaptor: it executes a query and returns the results as a Dataframe.
The function takes two positional arguments:
Position |
Parameter |
Type |
Description |
---|---|---|---|
0 |
cluster |
quasardb.Cluster |
Active connection to the QuasarDB cluster |
1 |
query |
str |
The query to execute |
And an optional keyword argument:
| Parameter | Type | Default | Description |
|---|---|---|---|
| index | | | If provided, resolves column and uses that as the index. If string (e.g. $timestamp), uses that column as the index. If |
The example below will demonstrate how to use this function based on the same query we used in the earlier query example.
with pool.instance().connect() as conn:
    df = qdbpd.query(conn, "SELECT * FROM stocks LIMIT 10")
df
| | $timestamp | $table | open | close | volume |
|---|---|---|---|---|---|
| 0 | 2019-02-01 00:00:00 | stocks | 3.400000 | 3.500000 | 10000 |
| 1 | 2019-02-02 00:00:00 | stocks | 3.500000 | 3.550000 | 7500 |
| 2 | 2022-07-01 00:00:00 | stocks | 129.800938 | 189.807248 | 14288 |
| 3 | 2022-07-01 00:00:00 | stocks | 129.800938 | 189.807248 | 14288 |
| 4 | 2022-07-01 00:00:01 | stocks | 134.860206 | 179.282507 | 17750 |
| 5 | 2022-07-01 00:00:01 | stocks | 134.860206 | 179.282507 | 17750 |
| 6 | 2022-07-01 00:00:02 | stocks | 115.543728 | 154.417280 | 17369 |
| 7 | 2022-07-01 00:00:02 | stocks | 115.543728 | 154.417280 | 17369 |
| 8 | 2022-07-01 00:00:03 | stocks | 198.110431 | 134.008978 | 11141 |
| 9 | 2022-07-01 00:00:03 | stocks | 198.110431 | 134.008978 | 11141 |
As a convenience, we can use the $timestamp column as the index as well, by setting the index parameter accordingly.
with pool.instance().connect() as conn:
    df = qdbpd.query(conn, "SELECT * FROM stocks LIMIT 10", index='$timestamp')
df
| | $table | open | close | volume |
|---|---|---|---|---|
| 2019-02-01 00:00:00 | stocks | 3.400000 | 3.500000 | 10000 |
| 2019-02-02 00:00:00 | stocks | 3.500000 | 3.550000 | 7500 |
| 2022-07-01 00:00:00 | stocks | 129.800938 | 189.807248 | 14288 |
| 2022-07-01 00:00:00 | stocks | 129.800938 | 189.807248 | 14288 |
| 2022-07-01 00:00:01 | stocks | 134.860206 | 179.282507 | 17750 |
| 2022-07-01 00:00:01 | stocks | 134.860206 | 179.282507 | 17750 |
| 2022-07-01 00:00:02 | stocks | 115.543728 | 154.417280 | 17369 |
| 2022-07-01 00:00:02 | stocks | 115.543728 | 154.417280 | 17369 |
| 2022-07-01 00:00:03 | stocks | 198.110431 | 134.008978 | 11141 |
| 2022-07-01 00:00:03 | stocks | 198.110431 | 134.008978 | 11141 |
4.1.7.3. Insertion
We can use the write_dataframe function to insert new data into the database. This function uses the Numpy adaptor’s write_arrays function under the hood, and comes with several options.
It takes three required, positional arguments:
| Position | Parameter | Type | Description |
|---|---|---|---|
| 0 | df | | The pandas dataframe to store |
| 1 | cluster | | Active connection to the QuasarDB cluster |
| 2 | table | | Either a string or a reference to a QuasarDB table object. For example, |
In addition to these, the function takes many optional keyword arguments:
| Parameter | Type | Default | Description |
|---|---|---|---|
| dtype | | | Optional data type to force. If a single dtype, will force that dtype to all columns. If list-like, will map dtypes to dataframe columns by their offset. If dict-like, will map dtypes to dataframe columns by their label. If a dtype for a column is provided in this argument, and infer_types is also True, this argument takes precedence. |
| create | | | If true, automatically creates the table if it does not yet exist. |
| shard_size | | | If |
| deduplicate | | | Enables server-side deduplication of data when it is written into the table. When |
| deduplication_mode | | | When |
| infer_types | | | If true, will attempt to convert types from Python to QuasarDB native types if the provided dataframe has incompatible types. For example, a dataframe with integers will automatically convert these to doubles if the QuasarDB table expects it. Important: as conversions are expensive and often the majority of time spent when inserting data into QuasarDB, we strongly recommend setting this to |
| _async | | | Enables the asynchronous batch inserter mode. |
| fast | | | Enables the fast batch inserter mode. |
| truncate | | | Enables the truncation batch inserter mode. |
For demonstration purposes, the code below generates sample data which we’ll be (re)using across the various examples below.
import pandas as pd

start_time = np.datetime64('2022-07-15', 'ns')

# We will reuse the earlier sample data to construct a dataframe instead. We copy
# the dict first, so that adding the `$timestamp` key leaves `data` untouched.
data_ = dict(data)
data_['$timestamp'] = gen_index(start_time)

# Note how we're setting the `$timestamp` column as the index: when writing a dataframe
# to QuasarDB, a `datetime64[ns]` index is required.
df = pd.DataFrame.from_dict(data_)
df = df.set_index('$timestamp')
df
| $timestamp | open | close | volume |
|---|---|---|---|
| 2022-07-15 00:00:00 | 129.800938 | 189.807248 | 14288 |
| 2022-07-15 00:00:01 | 134.860206 | 179.282507 | 17750 |
| 2022-07-15 00:00:02 | 115.543728 | 154.417280 | 17369 |
| 2022-07-15 00:00:03 | 198.110431 | 134.008978 | 11141 |
| 2022-07-15 00:00:04 | 125.530270 | 193.479529 | 11947 |
| 2022-07-15 00:00:05 | 153.835698 | 195.047264 | 15907 |
| 2022-07-15 00:00:06 | 199.421627 | 125.887649 | 15512 |
| 2022-07-15 00:00:07 | 188.824268 | 119.402445 | 16381 |
| 2022-07-15 00:00:08 | 147.326543 | 102.446742 | 15213 |
| 2022-07-15 00:00:09 | 155.819616 | 198.740592 | 13419 |
4.1.7.3.1. Example: insertion
The example below demonstrates how to insert data into our stocks table using the fast insertion mode. For performance reasons, we disable implicit conversions by setting infer_types to False.
with pool.instance().connect() as conn:
    qdbpd.write_dataframe(df, conn, conn.table('stocks'),
                          fast=True,
                          infer_types=False)

    # We now expect exactly 10 rows in this time range.
    q = "SELECT COUNT(volume) AS vol FROM stocks IN RANGE (2022-07-15, +1d)"
    result = qdbpd.query(conn, q)
    print(result)
| | vol |
|---|---|
| 0 | 10 |
4.1.7.3.2. Example: deduplication
QuasarDB supports server-side deduplication of data as you insert it. We demonstrate this functionality by first deliberately inserting duplicate data, and then showing how deduplication prevents this.
with pool.instance().connect() as conn:
    # Note how we're explicitly setting `deduplicate` to `False`: this is the default
    # behavior, so not strictly necessary.
    qdbpd.write_dataframe(df, conn, conn.table('stocks'),
                          deduplicate=False,
                          fast=True,
                          infer_types=False)

    # As we just inserted the same dataset again, we expect duplicate data.
    q = "SELECT $timestamp, open FROM stocks IN RANGE (2022-07-15, +1d) LIMIT 10"
    df1 = qdbpd.query(conn, q, index='$timestamp')
    print(df1)

    # Now, when we insert a second time, but this time enable deduplication,
    # this will not happen: as the entire dataset is a perfect duplicate of
    # previously stored data, none of the rows should be inserted.
    qdbpd.write_dataframe(df, conn, conn.table('stocks'),
                          deduplicate=True,
                          fast=True,
                          infer_types=False)

    # This time, we query again and expect the exact same results as before.
    df2 = qdbpd.query(conn, q, index='$timestamp')
    print(df2)
open
2022-07-15 00:00:00 129.800938
2022-07-15 00:00:00 129.800938
2022-07-15 00:00:01 134.860206
2022-07-15 00:00:01 134.860206
2022-07-15 00:00:02 115.543728
2022-07-15 00:00:02 115.543728
2022-07-15 00:00:03 198.110431
2022-07-15 00:00:03 198.110431
2022-07-15 00:00:04 125.530270
2022-07-15 00:00:04 125.530270
open
2022-07-15 00:00:00 129.800938
2022-07-15 00:00:00 129.800938
2022-07-15 00:00:01 134.860206
2022-07-15 00:00:01 134.860206
2022-07-15 00:00:02 115.543728
2022-07-15 00:00:02 115.543728
2022-07-15 00:00:03 198.110431
2022-07-15 00:00:03 198.110431
2022-07-15 00:00:04 125.530270
2022-07-15 00:00:04 125.530270
4.1.7.4. Null values
Null values in Pandas can behave in surprising ways that affect both the performance and the correctness of your code, so they deserve some explanation. Unlike Numpy’s masked arrays (which we covered in an earlier section), Pandas represents null values inline: for example, in a float64 series, a null value is represented as NaN. This makes sense, as NaN is explicitly defined by the IEEE 754 floating-point arithmetic standard.
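Two properties of IEEE 754 NaN explain the behaviour described here. First, NaN exists only as a floating-point value. Second, it never compares equal to anything, not even itself, so it has to be detected with a dedicated check. The standard-library snippet below demonstrates both properties.

```python
import math

nan = float("nan")

# NaN is not equal to anything, not even itself ...
nan_equals_itself = nan == nan  # False
# ... so null checks must use math.isnan (or pandas.isna) rather than ==.
nan_detected = math.isnan(nan)  # True

values = [1.2, 3.4, nan, 5.6, 7.8]
null_positions = [i for i, v in enumerate(values) if math.isnan(v)]  # [2]

# Integers have no NaN: converting NaN to an int raises ValueError.
try:
    int(nan)
    int_conversion_failed = False
except ValueError:
    int_conversion_failed = True
```

Because an integer cannot hold NaN, a column that mixes integers and nulls has to be stored as floats if nulls are represented inline.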
For illustration purposes, let’s generate a small dataframe which has a few null values:
data = {'floats': [1.2, 3.4, None, 5.6, 7.8]}
df = pd.DataFrame.from_dict(data)
df
| | floats |
|---|---|
| 0 | 1.2 |
| 1 | 3.4 |
| 2 | NaN |
| 3 | 5.6 |
| 4 | 7.8 |
As you can see, Pandas inserted the NaN in the correct place. But what happens when you’re using a type that cannot be represented inline? For example, what if we want to combine null values with integers?
As an int64 column has no way to represent null values inline, Pandas will instead silently convert the input data to doubles (float64). Observe the following:
data['ints_with_null'] = [1, 2, None, 4, 5]
data['ints_no_null'] = [1, 2, 3, 4, 5]
df = pd.DataFrame.from_dict(data)
df
| | floats | ints_with_null | ints_no_null |
|---|---|---|---|
| 0 | 1.2 | 1.0 | 1 |
| 1 | 3.4 | 2.0 | 2 |
| 2 | NaN | NaN | 3 |
| 3 | 5.6 | 4.0 | 4 |
| 4 | 7.8 | 5.0 | 5 |
Even without printing the dtypes, it’s now obvious that the array with null values has been converted to floating point.
As the QuasarDB column expects integers instead, several conversions take place under the hood:
Pandas first converts the array to float64, as demonstrated above;
QuasarDB’s adaptor probes the null values and converts the array into a masked array;
QuasarDB’s adaptor then converts the masked array back to int64.
This means we’re not just copying the data, but actually converting it to and from float64 before inserting it into QuasarDB.
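The float64 round trip costs extra copies, and it can also change values. A float64 has a 53-bit significand, so not every integer with a magnitude above 2**53 can be represented exactly. The standard-library snippet below shows an int64-range value that does not survive the int-to-float-to-int conversion chain described above. Python's float is an IEEE 754 double, the same representation as NumPy's float64. roundtrip_through_float64 is a hypothetical helper name.

```python
def roundtrip_through_float64(value):
    """Mimic the int64 -> float64 -> int64 conversion chain for a single value."""
    return int(float(value))


small = 17500       # a typical volume: survives the round trip unchanged
large = 2**53 + 1   # 9007199254740993: the smallest positive integer float64 cannot hold

small_ok = roundtrip_through_float64(small) == small  # True
large_ok = roundtrip_through_float64(large) == large  # False
large_after = roundtrip_through_float64(large)        # 9007199254740992
```

Values as small as the volumes in this tutorial are unaffected. Large identifiers or nanosecond counts stored in integer columns, however, can silently change.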
Knowing this, we recommend the following:
* In performance-sensitive scenarios: work directly with the Numpy adaptor, as it avoids this entirely;
* In other scenarios, when calling write_dataframe, either:
  * explicitly set the dtype of these columns to their desired type (in this case, int64); or
  * ensure infer_types is set to True (the default).
4.1.8. User properties
User properties are a way to attach metadata to a specific connection: they allow key-value pairs to be stored with every connection. This metadata can be useful when you need a way to identify a connection for debugging or logging purposes. User properties are logged server-side when JSON log output is enabled. Information on how to enable JSON logging can be found in the configuration section.
More information about user properties can be found here.
4.1.8.1. Enabling and appending to user properties
For this example you will need to launch the server in debug log mode. See observability for more information.
import quasardb

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    # First, make sure that user properties are enabled
    conn.options().enable_user_properties()

    # Now you can set a new user property
    conn.properties().put("key_1", "value_for_key_1")

    # and retrieve it
    print(conn.properties().get("key_1"))

    # Now let's see how user properties are logged.
    # We try to PUT the same BLOB into the database twice: the second attempt fails,
    # as there is already a value for this key.
    # If cluster logging is enabled in debug mode, this produces a message in the JSON log file.
    try:
        conn.query("PUT BLOB a 'a'")
        conn.query("PUT BLOB a 'a'")
    except quasardb.AliasAlreadyExistsError as e:
        print("Caught exception", e)
If the cluster is configured to log to a file in JSON format, you will see a message similar to this one in the log file:
{"timestamp":"2025-01-15T11:12:51.139482200Z","process_id":1916,"thread_id":34012,"level":"debug","message":"could not put entry a (was created: false, flags: 2, entries map count: 1, timestamp: {1736939571.139371400:34012:c5fe30bf0154acc-5d63bd06e7878b9c-5f635b3cf7fc3560-dbe35df7b5080651}): the entry already exists","$client_hostname":"hal-9000","$client_version":"3.14.2 3.14.2.dev0 d82b8b86d71c9334951b442b937abf9a598eda64 2025-01-14 10:18:10 -0500","$client_target":"AMD64 core2 64-bit","$client_query":"PUT BLOB a 'a'","key_1":"value_for_key_1","$client_timestamp":"2025-01-15T12:12:51+01:00","$client_platform":"Microsoft Windows 11 (build 26100), 64-bit"}
As you can see, the user property (key_1) is logged alongside the message, making it easier to identify the source of the connection.
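Because these log entries are single-line JSON objects, you can pick out the messages tagged with a given user property using only the standard library. The sketch below filters log lines by a property key and value. It assumes one JSON object per line, as in the sample above. The field names come from that sample, and the message text is abbreviated here. entries_with_property is a hypothetical helper name.

```python
import json


def entries_with_property(lines, key, value):
    """Yield parsed JSON log entries whose user property `key` equals `value`.

    Blank lines and lines that are not valid JSON are skipped.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if entry.get(key) == value:
            yield entry


# Abbreviated version of the log line above, plus a non-JSON line.
sample_log = [
    ('{"level":"debug","message":"could not put entry a (...): the entry already exists",'
     '"$client_query":"PUT BLOB a \'a\'","key_1":"value_for_key_1"}'),
    "not json",
]
```

To scan a real log file, pass an open file object as `lines`, for example inside `with open(path) as fp:`, where `path` is your server's log file.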