6.3. Batch Import Best Practices#

This guide describes best practices for efficiently loading batch data into QuasarDB.

Batch data refers to any non-continuous ingestion, including:

  • Historical data (months or years of backfills)

  • Daily/hourly loads (e.g., financial market closes)

  • Periodic snapshots (e.g., end-of-day positions)

The fundamental principle: align your ingestion with QuasarDB’s storage model to avoid write and read amplification.

6.3.1. Key principles#

  1. Split data by table (e.g., stocks.apple, stocks.google).

  2. Split data by shard — align writes to the table’s shard_size (e.g., 15-minute intervals).

  3. Insert entire shards at once.

  4. Use `fast` insertion mode — direct writes to disk.

  5. Optionally disable RocksDB compaction during very large imports.

6.3.2. Real-world example: stock trading data#

Suppose you manage market trade data across multiple stocks. Your database has tables like:

  • stocks.apple

  • stocks.google

  • stocks.microsoft

Each table stores:

  • timestamp: time of the trade

  • price: trade price

  • volume: number of shares traded

The system’s shard_size is configured to 15 minutes.
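
With a 15-minute shard_size, every row belongs to the shard whose window starts at the nearest 15-minute boundary at or before its timestamp. As a quick illustration in plain pandas (the timestamps are taken from the CSV below):

import pandas as pd

ts = pd.to_datetime(["2025-04-27T11:45:00Z",
                     "2025-04-27T11:47:00Z",
                     "2025-04-27T12:03:00Z"])

# Floor each timestamp to the 15-minute boundary that starts its shard.
print(ts.floor("15min"))
# 11:45 and 11:47 both fall in the shard starting at 11:45:00;
# 12:03 falls in the shard starting at 12:00:00.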

You receive a file trades_20250427_12h00.csv containing raw trade data for multiple stocks.

Example CSV:

timestamp,symbol,price,volume
2025-04-27T11:45:00Z,AAPL,187.20,100
2025-04-27T11:47:00Z,GOOG,2800.50,50
2025-04-27T12:03:00Z,MSFT,305.10,200

Goal: Efficiently import this file into the right tables, properly batched by table and shard.
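
The full example in section 6.3.4 builds its dataframe by hand for clarity. If you start from the CSV above instead, a minimal loading step could look like the sketch below; the ticker-to-table mapping is an assumption made for this illustration.

import pandas as pd

# Read the raw trades, parsing the timestamp column up front.
df = pd.read_csv("trades_20250427_12h00.csv", parse_dates=["timestamp"])

# Hypothetical mapping from ticker symbols to QuasarDB table names.
symbol_to_table = {
    "AAPL": "stocks.apple",
    "GOOG": "stocks.google",
    "MSFT": "stocks.microsoft",
}
df["table"] = df["symbol"].map(symbol_to_table)

From here, the same split-by-table, split-by-shard logic shown below applies.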

6.3.3. Steps#

  1. Read the CSV into a Pandas DataFrame.

  2. Split by table (symbol).

  3. Organize data into per-shard groups (15-minute windows).

  4. Write all DataFrames into QuasarDB at once using `qdbpd.write_dataframes()`.

6.3.4. Correct Python code (Pandas version)#

import quasardb
import quasardb.pandas as qdbpd
import pandas as pd
from datetime import datetime
from collections import defaultdict

# Connect to the cluster
cluster = quasardb.Cluster('qdb://127.0.0.1:2836')

# Build a dataframe. The code below assumes a dataframe with:
# - a '$timestamp' column of datetime objects;
# - a 'symbol' column which denotes the stock symbol being traded.
#
# We first create individual arrays for each column.
timestamps = [datetime.now(), datetime.now()]
symbols = ['apple', 'google']
opens = [42.05, 96.5]
closes = [41.95, 96.57]

# Represents these columns as a dict of arrays. We call the timestamp
# column `$timestamp` because it aligns with the name QuasarDB uses,
# not because it's required.
d = {'$timestamp': timestamps,
     'symbol': symbols,
     'open': opens,
     'close': closes}

# Convert the dict of arrays into a dataframe. We do *not* yet index by
# timestamp (which is required by the QuasarDB pandas API), because we'll
# split this big dataframe into smaller dataframes based on shard size.
#
# As it is always more efficient to sort/index multiple smaller dataframes
# than a single large dataframe, we prefer to do the sorting *after* we
# split our dataframe into smaller groups.
df = pd.DataFrame(data=d)

# Create a dictionary to store per-table dataframes.
dataframes = defaultdict(list)

# Define shard size: 15 minutes
shard_size = pd.Timedelta(minutes=15)

# Compute the start of the shard each row belongs to
df['shard_start'] = df['$timestamp'].dt.floor(shard_size)

# Group by symbol and shard_start
grouped = df.groupby(['symbol', 'shard_start'])

for (symbol, shard_start), group in grouped:
    table_name = f"stocks.{symbol}"

    # Drop the 'shard_start' helper column: it was only needed for grouping.
    # We do recommend keeping the 'symbol' column in the table, even though
    # the name of the table already tells us which symbol it is.
    #
    # This is because it makes queries where you'll want to `GROUP BY symbol`
    # easier and more efficient, especially if you have multiple tables
    # that refer to the same symbol (e.g. the same company being traded on
    # multiple exchanges).
    cleaned_group = group.drop(columns=['shard_start'])

    # The QuasarDB pandas API requires a timestamp index.
    cleaned_group.set_index('$timestamp', inplace=True)
    dataframes[table_name].append(cleaned_group)

# Merge shards into one dataframe per table
final_dataframes = {table: pd.concat(frames) for table, frames in dataframes.items()}

# Write all dataframes at once
qdbpd.write_dataframes(cluster, final_dataframes, fast=True)

6.3.5. Explanation#

  • `.floor(shard_size)`: aligns each row to the 15-minute shard it belongs to.

  • Grouping first by symbol, then by shard window: separates the data cleanly by table and by shard.

  • `qdbpd.write_dataframes`: writes multiple tables at once, fully using fast insertion mode (push_mode=quasardb.WriterPushMode.Fast).

6.3.6. Performance tuning#

  • Large batch sizes: Accumulate as much data as client-side memory comfortably allows before pushing (see the sketch after this list).

  • Parallelism: QuasarDB automatically parallelizes writes internally, per shard and per table.

  • Cluster tuning: Adjust connection pools when ingesting at extremely high rates.
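
As a sketch of the "large batch sizes" point above: when ingesting many input files, accumulate per-table dataframes in memory and push them in one call instead of pushing file by file. It reuses the cluster connection and qdbpd import from the example above; the memory budget, the csv_files list, and the load_and_split() helper are illustrative assumptions, where load_and_split() stands for the per-table, shard-aligned preparation shown in section 6.3.4 and the push call mirrors the one used there.

from collections import defaultdict
import pandas as pd

MAX_BUFFERED_ROWS = 5_000_000   # illustrative client-side memory budget
buffers = defaultdict(list)     # table name -> list of prepared dataframes
buffered_rows = 0

def flush(cluster, buffers):
    # Concatenate everything buffered so far and push it in one batch.
    merged = {table: pd.concat(frames) for table, frames in buffers.items()}
    qdbpd.write_dataframes(cluster, merged, fast=True)
    buffers.clear()

for path in csv_files:                         # csv_files: your input files (assumption)
    for table, frame in load_and_split(path):  # hypothetical helper, see section 6.3.4
        buffers[table].append(frame)
        buffered_rows += len(frame)

    if buffered_rows >= MAX_BUFFERED_ROWS:
        flush(cluster, buffers)
        buffered_rows = 0

if buffered_rows > 0:
    flush(cluster, buffers)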

6.3.7. Avoid write amplification#

Write amplification can happen if:

  • You partially write the same shard multiple times.

  • Your writes interleave timestamps, so the same shards keep being touched by later writes.

To avoid this:

  • Always prepare full shards before sending to QuasarDB (see the sketch after this list).

  • Send each complete shard in a single batch write.
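
A simple way to catch both problems before writing is to check whether any (table, shard) pair would be touched by more than one of the dataframes you are about to push. Below is a minimal sketch, reusing the shard-flooring logic from the example above; the batches variable is a hypothetical list of (table_name, dataframe) pairs with a datetime index, as prepared in section 6.3.4.

from collections import Counter
import pandas as pd

def count_shard_writes(batches, shard_size=pd.Timedelta(minutes=15)):
    # Count how many separate dataframes touch each (table, shard) pair.
    counts = Counter()
    for table, frame in batches:
        for shard_start in frame.index.floor(shard_size).unique():
            counts[(table, shard_start)] += 1
    return counts

# Any count above 1 means a shard would be partially written several times;
# merge those dataframes and send the shard in a single write instead.
# batches: hypothetical list of (table_name, dataframe) pairs (see above).
overlapping = {key: n for key, n in count_shard_writes(batches).items() if n > 1}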

6.3.8. Handling very large historical imports#

If importing very large datasets (>500 GB):

  1. Disable RocksDB compaction:

    cluster_disable_auto_compaction
    
  2. Perform the import.

  3. Re-enable compaction:

    cluster_enable_auto_compaction
    

This drastically improves write throughput, at the expense of read amplification.
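
If the import is driven from Python, it is worth wrapping it so that compaction is re-enabled even when the import fails. The sketch below assumes that the two cluster operations shown above are exposed as methods on the Python cluster object under the names used here; this is an assumption, so check how your client version exposes them.

def run_large_import(cluster, do_import):
    # ASSUMPTION: these method names mirror the cluster operations above;
    # adapt them to however your client exposes
    # cluster_disable_auto_compaction / cluster_enable_auto_compaction.
    cluster.disable_auto_compaction()
    try:
        do_import()   # e.g. the batched writes from section 6.3.4
    finally:
        # Re-enable compaction even if the import raises.
        cluster.enable_auto_compaction()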

6.3.9. Streaming vs batch ingestion#

| Batch Imports                                | Streaming/Continuous Ingestion                |
|:---------------------------------------------|:----------------------------------------------|
| Use fast mode (direct write to disk)          | Use async mode (server-side buffering)         |
| Pre-organized, shard-aligned data             | Real-time, unpredictable timestamps            |
| Maximal batch sizes                           | Frequent small writes                          |
| Typical for historical or periodic data       | Typical for live sensor, tick, or event data   |
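
For the streaming column of this comparison, the same pandas call can be pointed at a different push mode. The Fast value is the one used throughout this guide; WriterPushMode.Async is assumed here to be the server-side buffered mode, so verify the member name against your client version. incoming_dataframes stands for whatever small per-table dataframes your live feed produces.

import quasardb
import quasardb.pandas as qdbpd

# Batch import: direct-to-disk writes, as used throughout this guide.
qdbpd.write_dataframes(cluster, final_dataframes,
                       push_mode=quasardb.WriterPushMode.Fast)

# Streaming/continuous ingestion: server-side buffered writes.
# ASSUMPTION: WriterPushMode.Async is the asynchronous member in your client.
qdbpd.write_dataframes(cluster, incoming_dataframes,
                       push_mode=quasardb.WriterPushMode.Async)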

6.3.10. Summary checklist#

  • Read file into Pandas DataFrame

  • Normalize by table (symbol)

  • Align timestamps to shard windows

  • Batch and write all tables at once with fast=True

  • Optionally disable compaction for massive imports