6.3. Batch Import Best Practices#
This guide describes best practices for efficiently loading batch data into QuasarDB.
Batch data refers to any non-continuous ingestion, including:
- Historical data (months or years of backfills)
- Daily/hourly loads (e.g., financial market closes)
- Periodic snapshots (e.g., end-of-day positions)
The fundamental principle: align your ingestion with QuasarDB’s storage model to avoid write and read amplification.
6.3.1. Key principles#
Split data by table (e.g., stocks.apple, stocks.google).
Split data by shard — align writes to the table's shard_size (e.g., 15-minute intervals); see the sketch after this list.
Insert entire shards at once.
Use `fast` insertion mode (direct writes to disk).
Optionally disable RocksDB compaction during very large imports.
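For example, with a 15-minute shard_size every timestamp maps deterministically to the shard that starts at the preceding 15-minute boundary. A minimal pandas sketch of that mapping:

```python
import pandas as pd

# A trade at 11:47 belongs to the shard that starts at 11:45.
ts = pd.Timestamp('2025-04-27T11:47:00Z')
shard_start = ts.floor('15min')
print(shard_start)  # 2025-04-27 11:45:00+00:00
```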
6.3.2. Real-world example: stock trading data#
Suppose you manage market trade data across multiple stocks. Your database has tables like:
stocks.apple
stocks.google
stocks.microsoft
Each table stores:
- timestamp: time of the trade
- price: trade price
- volume: number of shares traded
The system’s shard_size is configured to 15 minutes.
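If you need to create such a table from the Python API, a hedged sketch might look like the following. The column list mirrors the schema above; the exact `create()` signature, in particular how the shard size is passed, may differ between client versions, so verify it against your client's documentation.

```python
import datetime
import quasardb

cluster = quasardb.Cluster('qdb://127.0.0.1:2836')

# Create one of the trade tables with a 15-minute shard size.
# Assumption: table.create() accepts the column list plus the shard size
# as a datetime.timedelta; check your client version.
table = cluster.table('stocks.apple')
table.create([quasardb.ColumnInfo(quasardb.ColumnType.Double, 'price'),
              quasardb.ColumnInfo(quasardb.ColumnType.Int64, 'volume')],
             datetime.timedelta(minutes=15))
```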
You receive a file trades_20250427_12h00.csv containing raw trade data for multiple stocks.
Example CSV:
timestamp,symbol,price,volume
2025-04-27T11:45:00Z,AAPL,187.20,100
2025-04-27T11:47:00Z,GOOG,2800.50,50
2025-04-27T12:03:00Z,MSFT,305.10,200
Goal: Efficiently import this file into the right tables, properly batched by table and shard.
6.3.3. Steps#
Read the CSV into a Pandas DataFrame (see the sketch after this list).
Split by table (symbol).
Organize data into per-shard groups (15-minute windows).
Write all DataFrames into QuasarDB at once using `qdbpd.write_dataframes()`.
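Step 1 is plain pandas; a minimal sketch, assuming the file from the example above (the full example in the next section builds a small in-memory dataframe instead of reading a file, but the rest of the pipeline is identical):

```python
import pandas as pd

# Parse the trade time as a datetime column so it can later be floored
# to 15-minute shard windows.
df = pd.read_csv('trades_20250427_12h00.csv', parse_dates=['timestamp'])
```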
6.3.4. Correct Python code (Pandas version)#
import quasardb
import quasardb.pandas as qdbpd
import pandas as pd
from datetime import datetime
from collections import defaultdict

# Connect to the cluster
cluster = quasardb.Cluster('qdb://127.0.0.1:2836')

# Load a dataframe. In our case, the code below assumes a dataframe with:
#  - a '$timestamp' column of datetime objects;
#  - a 'symbol' column which denotes the stock symbol being traded.
#
# We first create individual arrays for each column.
timestamps = [datetime.now(), datetime.now()]
symbols = ['apple', 'google']
opens = [42.05, 96.5]
closes = [41.95, 96.57]

# Represent these columns as a dict of arrays. We call the timestamp
# column `$timestamp` because it aligns with the name QuasarDB uses,
# not because it's required.
d = {'$timestamp': timestamps,
     'symbol': symbols,
     'open': opens,
     'close': closes}

# Convert the dict of arrays into a dataframe. We do *not* yet index by
# timestamp (which is required by the QuasarDB pandas API), because we'll
# split this big dataframe into smaller dataframes based on shard size.
#
# As it is always more efficient to sort/index multiple smaller dataframes
# than a single large dataframe, we prefer to do the sorting *after* we
# split our dataframe into smaller groups.
df = pd.DataFrame(data=d)

# Create a dictionary to store per-table dataframes.
dataframes = defaultdict(list)

# Define the shard size: 15 minutes, matching the table's shard_size.
shard_size = '15min'

# Compute shard start times by flooring each timestamp to its shard window.
df['shard_start'] = df['$timestamp'].dt.floor(shard_size)

# Group by symbol and shard_start
grouped = df.groupby(['symbol', 'shard_start'])

for (symbol, shard_start), group in grouped:
    table_name = f"stocks.{symbol}"

    # The 'shard_start' column is not necessary, but we recommend
    # storing the 'symbol' column in the table as well, even though
    # the name of the table already tells us which symbol it is.
    #
    # This is because it makes queries where you'll want to `GROUP BY symbol`
    # easier and more efficient, especially if you have multiple tables
    # that refer to the same symbol (e.g. the same company being traded on
    # multiple exchanges).
    cleaned_group = group.drop(columns=['shard_start'])

    # The QuasarDB pandas API requires a timestamp index.
    cleaned_group.set_index('$timestamp', inplace=True)

    dataframes[table_name].append(cleaned_group)

# Merge shards into one dataframe per table
final_dataframes = {table: pd.concat(frames) for table, frames in dataframes.items()}

# Write all dataframes at once
qdbpd.write_dataframes(cluster, final_dataframes, fast=True)
6.3.5. Explanation#
**`.floor('15min')`**: aligns each row to the 15-minute shard it belongs to.
**Group first by symbol, then by shard window**: separates the data cleanly by table and by shard.
**`qdbpd.write_dataframes()`**: writes multiple tables at once, fully using fast insertion mode (`push_mode=quasardb.WriterPushMode.Fast`).
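After the write completes, you can optionally sanity-check the import with a query. A sketch, assuming `qdbpd.query()` is available in your client version; adjust the table and column names to your schema:

```python
import quasardb
import quasardb.pandas as qdbpd

cluster = quasardb.Cluster('qdb://127.0.0.1:2836')

# Count the rows that landed in one of the imported tables.
counts = qdbpd.query(cluster, "SELECT count(open) FROM stocks.apple")
print(counts)
```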
6.3.6. Performance tuning#
Large batch sizes: let batches grow as large as client-side memory allows before pushing (see the sketch after this list).
Parallelism: QuasarDB can internally parallelize writes per shard/table automatically.
Cluster tuning: adjust connection pools when sustaining extremely high ingestion rates.
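For example, when several input files arrive together, it is usually better to accumulate them client-side and issue a single push rather than one push per file. A sketch, assuming files follow the naming scheme above and reusing the per-table preparation shown earlier; the `split_by_table` helper is illustrative, not part of the QuasarDB API:

```python
import glob
from collections import defaultdict

import pandas as pd
import quasardb
import quasardb.pandas as qdbpd

cluster = quasardb.Cluster('qdb://127.0.0.1:2836')

def split_by_table(raw):
    """Illustrative helper: index by timestamp and split one raw dataframe
    into {table_name: dataframe}, one entry per stock symbol.
    Assumes the symbol values match your table naming scheme."""
    raw = raw.set_index('timestamp')
    return {f"stocks.{symbol}": group for symbol, group in raw.groupby('symbol')}

# Accumulate all pending files before pushing once, so each push carries
# large, shard-aligned batches instead of many small ones.
pending = defaultdict(list)
for path in glob.glob('trades_20250427_*.csv'):   # assumed naming scheme
    raw = pd.read_csv(path, parse_dates=['timestamp'])
    for table, frame in split_by_table(raw).items():
        pending[table].append(frame)

final = {table: pd.concat(frames).sort_index() for table, frames in pending.items()}
qdbpd.write_dataframes(cluster, final, fast=True)
```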
6.3.7. Avoid write amplification#
Write amplification can happen if:
You partially write the same shard multiple times.
You shuffle data timestamps across writes.
To avoid:
Always prepare full shards before sending to QuasarDB.
Send complete shards in a single batch write; the sketch below shows one way to guard against re-writing a shard.
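One simple safeguard is to track, on the client side, which (table, shard) windows have already been pushed and refuse to push them again. A minimal sketch in plain pandas; the bookkeeping is illustrative, not part of the QuasarDB API:

```python
import pandas as pd

# Shard windows (table, shard_start) that have already been pushed.
pushed_shards = set()

def assert_new_shards(table, df, shard_size='15min'):
    """Raise if this batch would re-write a shard that was pushed earlier.
    Expects a dataframe indexed by timestamp, as in the example above."""
    shard_starts = df.index.floor(shard_size).unique()
    overlap = [s for s in shard_starts if (table, s) in pushed_shards]
    if overlap:
        raise ValueError(f"{table}: shards already written: {overlap}")
    pushed_shards.update((table, s) for s in shard_starts)
```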
6.3.8. Handling very large historical imports#
If importing very large datasets (>500 GB):
Disable RocksDB compaction: `cluster_disable_auto_compaction`
Perform the import.
Re-enable compaction: `cluster_enable_auto_compaction`
This drastically improves write throughput, at the expense of read amplification.
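In code, the procedure above simply brackets the import. The compaction calls below are placeholders for however `cluster_disable_auto_compaction` and `cluster_enable_auto_compaction` are exposed in your deployment, so treat them as assumptions:

```python
import quasardb.pandas as qdbpd

def disable_auto_compaction(cluster):
    # Placeholder: invoke cluster_disable_auto_compaction through whatever
    # mechanism your deployment exposes.
    ...

def enable_auto_compaction(cluster):
    # Placeholder: invoke cluster_enable_auto_compaction.
    ...

def run_large_import(cluster, final_dataframes):
    disable_auto_compaction(cluster)
    try:
        # Perform the import while compaction is off.
        qdbpd.write_dataframes(cluster, final_dataframes, fast=True)
    finally:
        # Always re-enable compaction, even if the import fails partway.
        enable_auto_compaction(cluster)
```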
6.3.9. Streaming vs batch ingestion#
| Batch ingestion | Streaming ingestion |
|:---|:---|
| Use `fast` mode (direct write to disk) | Use `async` mode (server-side buffering) |
| Pre-organized, shard-aligned data | Real-time, unpredictable timestamps |
| Maximal batch sizes | Frequent small writes |
| Typical for historical or periodic data | Typical for live sensor, tick, or event data |
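For the streaming column of the table above, the same pandas write call would typically switch to the asynchronous push mode. A hedged sketch, assuming your client version exposes `quasardb.WriterPushMode.Async` alongside the `Fast` mode mentioned earlier and that `write_dataframes` accepts a `push_mode` argument:

```python
import pandas as pd
import quasardb
import quasardb.pandas as qdbpd

cluster = quasardb.Cluster('qdb://127.0.0.1:2836')

# A single freshly received trade; in a real deployment this would come from
# a live feed rather than being constructed by hand.
live_df = pd.DataFrame({'symbol': ['apple'], 'open': [187.20], 'close': [187.35]},
                       index=pd.DatetimeIndex([pd.Timestamp.now()], name='$timestamp'))

# Async mode lets the server buffer and merge small, frequent writes.
qdbpd.write_dataframes(cluster, {'stocks.apple': live_df},
                       push_mode=quasardb.WriterPushMode.Async)
```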
6.3.10. Summary checklist#
Read file into Pandas DataFrame
Normalize by table (symbol)
Align timestamps to shard windows
Batch and write all tables at once with fast=True
Optionally disable compaction for massive imports