Package quasardb.pandas

Functions

def query(cluster: Cluster,
query,
index=None,
blobs=False,
numpy=True)
Expand source code
def query(cluster: quasardb.Cluster, query, index=None, blobs=False, numpy=True):
    """
    Execute a query and return the results as a single pandas DataFrame.

    Parameters:
    -----------

    cluster : quasardb.Cluster
      Active connection to the QuasarDB cluster

    query : str
      The query to execute.

    index : optional
      Column to use as the DataFrame index; forwarded to qdbnp.query().
      Defaults to whatever index qdbnp.query() selects.

    blobs : bool or list
      Determines which QuasarDB blob-columns should be returned as bytearrays; otherwise
      they are returned as UTF-8 strings.

      True means every blob column should be returned as byte-array, or a list will
      specify which specific columns. Defaults to false, meaning all blobs are returned
      as strings.

      NOTE(review): `blobs` and `numpy` are accepted here but not forwarded to
      qdbnp.query() below — confirm whether they are still honored or are legacy
      parameters kept for backwards compatibility.

    """
    logger.debug("querying and returning as DataFrame: %s", query)
    # qdbnp.query(dict=True) returns (index_array, {column_name: array}).
    (index, m) = qdbnp.query(cluster, query, index=index, dict=True)
    df = pd.DataFrame(m)

    df.set_index(index, inplace=True)
    return df

Execute a query and return the results as a single pandas DataFrame, indexed by the query's index column.

Parameters:

cluster : quasardb.Cluster Active connection to the QuasarDB cluster

query : str The query to execute.

blobs : bool or list Determines which QuasarDB blob-columns should be returned as bytearrays; otherwise they are returned as UTF-8 strings.

True means every blob column should be returned as byte-array, or a list will specify which specific columns. Defaults to false, meaning all blobs are returned as strings.

def read_dataframe(conn: Cluster,
table,
**kwargs)
Expand source code
def read_dataframe(conn: quasardb.Cluster, table, **kwargs):
    """
    Read a Pandas DataFrame from a QuasarDB Timeseries table. Wraps around
    stream_dataframe(), and concatenates everything into a single dataframe.

    Batching is an implementation detail here: any user-provided batch_size is
    ignored and overridden to 65536 rows (with a warning).

    Parameters:
    -----------

    conn : quasardb.Cluster
      Connection to the QuasarDB database.

    table : str | quasardb.Table
      QuasarDB table to stream, either as a string or a table object. When re-executing the same function
      multiple times on the same tables, providing the table as an object has a performance benefit.

    """

    # `logger.warn` is deprecated; `logger.warning` is the supported spelling.
    # Comparison against None uses `is not None` (identity) rather than `!=`.
    if (
        "batch_size" in kwargs
        and kwargs["batch_size"] != 0
        and kwargs["batch_size"] is not None
    ):
        logger.warning(
            "Providing a batch size with read_dataframe is unsupported, overriding batch_size to 65536."
        )
        logger.warning(
            "If you wish to traverse the data in smaller batches, please use: stream_dataframe()."
        )
        kwargs["batch_size"] = 2**16

    # Note that this is *lazy*, dfs is a generator, not a list -- as such, dataframes will be
    # fetched on-demand, which means that an error could occur in the middle of processing
    # dataframes.
    dfs = stream_dataframe(conn, table, **kwargs)

    return pd.concat(dfs)

Read a Pandas Dataframe from a QuasarDB Timeseries table. Wraps around stream_dataframe(), and returns everything as a single dataframe. Any user-provided batch_size is ignored and overridden to 65536 rows.

Parameters:

conn : quasardb.Cluster Connection to the QuasarDB database.

table : str | quasardb.Table QuasarDB table to stream, either as a string or a table object. When re-executing the same function multiple times on the same tables, providing the table as an object has a performance benefit.

def read_series(table, col_name, ranges=None)
Expand source code
def read_series(table, col_name, ranges=None):
    """
    Read a Pandas Timeseries from a single column.

    Parameters:
    -----------

    table : quasardb.Timeseries
      QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

    col_name : str
      Name of the column to read.

    ranges : list
      A list of ranges to read, represented as tuples of Numpy datetime64[ns] objects.
    """
    # Map each column type onto the range-reader for that type. Note that
    # symbol columns are read back through the string reader.
    readers = {
        quasardb.ColumnType.Double: table.double_get_ranges,
        quasardb.ColumnType.Blob: table.blob_get_ranges,
        quasardb.ColumnType.String: table.string_get_ranges,
        quasardb.ColumnType.Int64: table.int64_get_ranges,
        quasardb.ColumnType.Timestamp: table.timestamp_get_ranges,
        quasardb.ColumnType.Symbol: table.string_get_ranges,
    }

    # Dispatch based on the column's declared type.
    ctype = table.column_type_by_id(col_name)

    logger.info(
        "reading Series from column %s.%s with type %s",
        table.get_name(),
        col_name,
        ctype,
    )

    call_kwargs = {"column": col_name}
    if ranges is not None:
        call_kwargs["ranges"] = ranges

    # The reader yields (timestamps, values); the timestamps become the index.
    res = readers[ctype](**call_kwargs)

    return Series(res[1], index=res[0])

Read a Pandas Timeseries from a single column.

Parameters:

table : quasardb.Timeseries QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

col_name : str Name of the column to read.

ranges : list A list of ranges to read, represented as tuples of Numpy datetime64[ns] objects.

def stream_dataframe(conn: Cluster,
table,
**kwargs)
Expand source code
def stream_dataframe(conn: quasardb.Cluster, table, **kwargs):
    """
    Read a single table and return a stream of dataframes. This is a convenience
    wrapper around `stream_dataframes`.
    """
    kwargs["tables"] = [table]

    # For backwards compatibility, we drop the `$table` column returned: this is not strictly
    # necessary, but it also is somewhat reasonable to drop it when we're reading from a single
    # table, which is the case here.
    for df in stream_dataframes(conn, **kwargs):
        yield df.drop(columns=["$table"])

Read a single table and return a stream of dataframes. This is a convenience function that wraps around stream_dataframes().

def stream_dataframes(conn: Cluster,
tables: list,
*,
batch_size: int = 65536,
column_names: list = None,
ranges: list = None)
Expand source code
def stream_dataframes(
    conn: quasardb.Cluster,
    tables: list,
    *,
    batch_size: int = 2**16,
    column_names: list = None,
    ranges: list = None,
):
    """
    Read Pandas DataFrames from QuasarDB Timeseries tables. Returns a generator
    yielding dataframes of (at most) `batch_size` rows each, which is useful
    when traversing a large dataset which does not fit into memory.

    Parameters:
    -----------

    conn : quasardb.Cluster
      Connection to the QuasarDB database.

    tables : list[str | quasardb.Table]
      QuasarDB tables to stream, as a list of strings or quasardb table objects.

    batch_size : int
      The amount of rows to fetch in a single read operation. If unset, uses 2^16 (65536) rows
      as batch size by default.

    column_names : optional list
      List of columns to read in dataframe. The timestamp column '$timestamp' is
      always read.

      Defaults to all columns.

    ranges: optional list
      A list of time ranges to read, represented as tuples of Numpy datetime64[ns] objects.
      Defaults to the entire table.

    """
    # Sanitize batch_size: None falls back to the default, anything that is not
    # an int is rejected early with a clear error. (`is None` identity check,
    # not `== None`.)
    if batch_size is None:
        batch_size = 2**16
    elif not isinstance(batch_size, int):
        raise TypeError(
            "batch_size should be an integer, but got: {} with value {}".format(
                type(batch_size), str(batch_size)
            )
        )

    kwargs = {"batch_size": batch_size}

    # Empty/None means "all columns" resp. "entire table", so only forward when set.
    if column_names:
        kwargs["column_names"] = column_names

    if ranges:
        kwargs["ranges"] = ranges

    # Tables may be given as names or table objects; the reader wants names.
    kwargs["table_names"] = [
        x if isinstance(x, str) else x.get_name() for x in tables
    ]

    with conn.reader(**kwargs) as reader:
        for batch in reader:
            # We always expect the timestamp column, and set this as the index
            assert "$timestamp" in batch

            idx = pd.Index(batch.pop("$timestamp"), copy=False, name="$timestamp")
            df = pd.DataFrame(batch, index=idx)

            yield df

Read a Pandas Dataframe from a QuasarDB Timeseries table. Returns a generator with dataframes of size batch_size, which is useful when traversing a large dataset which does not fit into memory.

This is the underlying streaming implementation; see stream_dataframe() for the single-table convenience wrapper.

Parameters:

conn : quasardb.Cluster Connection to the QuasarDB database.

tables : list[str | quasardb.Table] QuasarDB tables to stream, as a list of strings or quasardb table objects.

batch_size : int The amount of rows to fetch in a single read operation. If unset, uses 2^16 (65536) rows as batch size by default.

column_names : optional list List of columns to read in dataframe. The timestamp column '$timestamp' is always read.

Defaults to all columns.

ranges: optional list A list of time ranges to read, represented as tuples of Numpy datetime64[ns] objects. Defaults to the entire table.

def write_dataframe(df, cluster, table, **kwargs)
Expand source code
def write_dataframe(df, cluster, table, **kwargs):
    """
    Store a single dataframe into a table. Accepts the same keyword arguments
    as `write_dataframes`, but for exactly one table/dataframe pair.
    """
    pair = (table, df)
    write_dataframes([pair], cluster, **kwargs)

Store a single dataframe into a table. Takes the same arguments as write_dataframes(), except only a single df/table combination.

def write_dataframes(dfs, cluster, *, create=False, shard_size=None, **kwargs)
Expand source code
def write_dataframes(dfs, cluster, *, create=False, shard_size=None, **kwargs):
    """
    Store dataframes into a table. Any additional parameters not documented here
    are passed to numpy.write_arrays(). Please consult the pydoc of that function
    for additional accepted parameters.

    Parameters:
    -----------

    dfs: dict[str | quasardb.Table, pd.DataFrame] | list[tuple[str | quasardb.Table, pd.DataFrame]]
      This can be either a dict that maps table (either objects or names) to a dataframe, or a list
      of table<>dataframe tuples.

    cluster: quasardb.Cluster
      Active connection to the QuasarDB cluster

    create: optional bool
      Whether to create the table. Defaults to False.

    shard_size: optional datetime.timedelta
      The shard size of the timeseries you wish to create when `create` is True.

    Raises:
    -------
    ValueError
      When `shard_size` is provided without `create` being truthy.
    """

    # If dfs is a dict, we convert it to a list of tuples.
    if isinstance(dfs, dict):
        dfs = dfs.items()

    # `not create` instead of `create == False` also rejects create=None,
    # which is equally invalid in combination with a shard size.
    if shard_size is not None and not create:
        raise ValueError("Invalid argument: shard size provided while create is False")

    # If the tables are provided as strings, we look them up.
    dfs_ = []
    for table, df in dfs:
        if isinstance(table, str):
            table = table_cache.lookup(table, cluster)

        dfs_.append((table, df))

    data_by_table = []

    for table, df in dfs_:
        logger.debug("quasardb.pandas.write_dataframe, create = %s", create)
        assert isinstance(df, pd.DataFrame)

        # Create table if requested
        if create:
            _create_table_from_df(df, table, shard_size)

        cinfos = [(x.name, x.type) for x in table.list_columns()]

        if not df.index.is_monotonic_increasing:
            # `logger.warn` is deprecated; use `logger.warning`.
            logger.warning(
                "dataframe index is unsorted, resorting dataframe based on index"
            )
            # sort_index() already returns a new, sorted frame; the previous
            # trailing no-arg .reindex() was a no-op copy and has been dropped.
            df = df.sort_index()

        # We pass everything else to our qdbnp.write_arrays function, as generally speaking
        # it is (much) more sensible to deal with numpy arrays than Pandas dataframes:
        # pandas has the bad habit of wanting to cast data to different types if your data
        # is sparse, most notably forcing sparse integer arrays to floating points.

        data = _extract_columns(df, cinfos)
        data["$timestamp"] = df.index.to_numpy(copy=False, dtype="datetime64[ns]")

        data_by_table.append((table, data))

    return qdbnp.write_arrays(data_by_table, cluster, table=None, index=None, **kwargs)

Store dataframes into a table. Any additional parameters not documented here are passed to numpy.write_arrays(). Please consult the pydoc of that function for additional accepted parameters.

Parameters:

dfs: dict[str | quasardb.Table, pd.DataFrame] | list[tuple[str | quasardb.Table, pd.DataFrame]] This can be either a dict that maps table (either objects or names) to a dataframe, or a list of table<>dataframe tuples.

cluster: quasardb.Cluster Active connection to the QuasarDB cluster

create: optional bool Whether to create the table. Defaults to False.

shard_size: optional datetime.timedelta The shard size of the timeseries you wish to create when create is True.

def write_pinned_dataframe(*args, **kwargs)
Expand source code
def write_pinned_dataframe(*args, **kwargs):
    """
    Legacy wrapper around write_dataframe().

    Deprecated: call write_dataframe() directly instead.
    """
    # `logger.warn` is deprecated in the stdlib logging module;
    # `logger.warning` is the supported spelling.
    logger.warning(
        "write_pinned_dataframe is deprecated and will be removed in a future release."
    )
    logger.warning("Please use write_dataframe directly instead")
    return write_dataframe(*args, **kwargs)

Legacy wrapper around write_dataframe()

def write_series(series, table, col_name, infer_types=True, dtype=None)
Expand source code
def write_series(series, table, col_name, infer_types=True, dtype=None):
    """
    Writes a Pandas Timeseries to a single column.

    Parameters:
    -----------

    series : pandas.Series
      Pandas Series, with a numpy.datetime64[ns] as index. Underlying data will be attempted
      to be transformed to appropriate QuasarDB type.

    table : quasardb.Timeseries
      QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

    col_name : str
      Column name to store data in.
    """

    logger.debug(
        "write_series, table=%s, col_name=%s, infer_types=%s, dtype=%s",
        table.get_name(),
        col_name,
        infer_types,
        dtype,
    )

    # Mask null entries so that missing values survive the numpy round-trip.
    values = ma.masked_array(series.to_numpy(copy=False), mask=series.isna())

    # With type inference enabled, the index is normalized to nanosecond
    # timestamps; otherwise it is passed through as-is.
    if infer_types is True:
        idx = series.index.to_numpy("datetime64[ns]", copy=False)
    else:
        idx = series.index.to_numpy(copy=False)

    return qdbnp.write_array(
        data=values,
        index=idx,
        table=table,
        column=col_name,
        dtype=dtype,
        infer_types=infer_types,
    )

Writes a Pandas Timeseries to a single column.

Parameters:

series : pandas.Series Pandas Series, with a numpy.datetime64[ns] as index. Underlying data will be attempted to be transformed to appropriate QuasarDB type.

table : quasardb.Timeseries QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

col_name : str Column name to store data in.

Classes

class PandasRequired (*args, **kwargs)
Expand source code
class PandasRequired(ImportError):
    """
    Raised when the QuasarDB pandas integration is used while pandas itself
    has not been installed.
    """

Exception raised when trying to use QuasarDB pandas integration, but pandas has not been installed.

Ancestors

  • builtins.ImportError
  • builtins.Exception
  • builtins.BaseException