Module quasardb.pandas

Expand source code
# pylint: disable=C0103,C0111,C0302,R0903

# Copyright (c) 2009-2021, quasardb SAS. All rights reserved.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    * Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#    * Neither the name of quasardb nor the names of its contributors may
#      be used to endorse or promote products derived from this software
#      without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY QUASARDB AND CONTRIBUTORS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#

import logging
from datetime import datetime
from functools import partial

import quasardb
import quasardb.table_cache as table_cache
import quasardb.numpy as qdbnp


logger = logging.getLogger('quasardb.pandas')

class PandasRequired(ImportError):
    """
    Exception raised when trying to use QuasarDB pandas integration, but
    pandas has not been installed.
    """
    pass

try:
    import numpy as np
    import numpy.ma as ma
    import pandas as pd
    from pandas.core.api import DataFrame, Series
    from pandas.core.base import PandasObject

except ImportError:
    raise PandasRequired(
        "The pandas library is required to handle pandas data formats")


# Constant mapping of numpy dtype to QuasarDB column type
# TODO(leon): support this natively in qdb C api ? we have everything we need
#             to understand dtypes.
_dtype_map = {
    np.dtype('int64'): quasardb.ColumnType.Int64,
    np.dtype('int32'): quasardb.ColumnType.Int64,
    np.dtype('float64'): quasardb.ColumnType.Double,
    np.dtype('object'): quasardb.ColumnType.String,
    np.dtype('datetime64[ns]'): quasardb.ColumnType.Timestamp,

    # String keys cover both raw dtype names and the values returned by
    # pandas.api.types.infer_dtype().
    'int64': quasardb.ColumnType.Int64,
    'int32': quasardb.ColumnType.Int64,
    'float32': quasardb.ColumnType.Double,
    'float64': quasardb.ColumnType.Double,
    'timestamp': quasardb.ColumnType.Timestamp,
    'string': quasardb.ColumnType.String,
    'bytes': quasardb.ColumnType.Blob,
    'floating': quasardb.ColumnType.Double,
    'integer': quasardb.ColumnType.Int64,
    'datetime64': quasardb.ColumnType.Timestamp
}

def read_series(table, col_name, ranges=None):
    """
    Read a Pandas Timeseries from a single column.

    Parameters:
    -----------

    table : quasardb.Timeseries
      QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

    col_name : str
      Name of the column to read.

    ranges : list
      A list of ranges to read, represented as tuples of Numpy datetime64[ns] objects.
    """
    read_with = {
        quasardb.ColumnType.Double: table.double_get_ranges,
        quasardb.ColumnType.Blob: table.blob_get_ranges,
        quasardb.ColumnType.String: table.string_get_ranges,
        quasardb.ColumnType.Int64: table.int64_get_ranges,
        quasardb.ColumnType.Timestamp: table.timestamp_get_ranges,
        quasardb.ColumnType.Symbol: table.string_get_ranges,
    }

    kwargs = {
        'column': col_name
    }

    if ranges is not None:
        kwargs['ranges'] = ranges

    # Dispatch based on column type
    t = table.column_type_by_id(col_name)

    logger.info("reading Series from column %s.%s with type %s",
                 table.get_name(), col_name, t)

    res = (read_with[t])(**kwargs)

    return Series(res[1], index=res[0])


def write_series(series,
                 table,
                 col_name,
                 infer_types=True,
                 dtype=None):
    """
    Writes a Pandas Timeseries to a single column.

    Parameters:
    -----------

    series : pandas.Series
      Pandas Series, with a numpy.datetime64[ns] index. An attempt will be made to
      transform the underlying data to the appropriate QuasarDB type.

    table : quasardb.Timeseries
      QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

    col_name : str
      Column name to store data in.
    """

    logger.debug("write_series, table=%s, col_name=%s, infer_types=%s, dtype=%s", table.get_name(), col_name, infer_types, dtype)

    data = ma.masked_array(series.to_numpy(copy=False),
                           mask=series.isna())

    if infer_types is True:
        index = series.index.to_numpy('datetime64[ns]', copy=False)
    else:
        index = series.index.to_numpy(copy=False)

    return qdbnp.write_array(data=data,
                             index=index,
                             table=table,
                             column=col_name,
                             dtype=dtype,
                             infer_types=infer_types)

def query(cluster: quasardb.Cluster,
          query,
          index=None,
          blobs=False,
          numpy=True):
    """
    Execute a query and return the result as a single pandas DataFrame.

    Parameters:
    -----------

    cluster : quasardb.Cluster
      Active connection to the QuasarDB cluster

    query : str
      The query to execute.

    index : optional
      Forwarded to quasardb.numpy.query(); selects the column used as the
      dataframe's index.

    blobs : bool or list
      Determines which QuasarDB blob-columns should be returned as bytearrays; otherwise
      they are returned as UTF-8 strings.

      True means every blob column should be returned as a byte array, or a list will
      specify which specific columns. Defaults to False, meaning all blobs are returned
      as strings.

    """
    logger.debug("querying and returning as DataFrame: %s", query)
    (index, m) = qdbnp.query(cluster, query, index=index, dict=True)
    df = pd.DataFrame(m)

    df.set_index(index, inplace=True)
    return df

def stream_dataframe(table : quasardb.Table, *, batch_size : int = 2**16, column_names : list = None, ranges : list = None):
    """
    Read a Pandas Dataframe from a QuasarDB Timeseries table. Returns a generator with dataframes of size `batch_size`, which is useful
    when traversing a large dataset which does not fit into memory.

    Parameters:
    -----------

    table : quasardb.Timeseries
      QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

    batch_size : int
      The number of rows to fetch in a single read operation. If unset, uses 2^16 (65536) rows
      as the default batch size.

    column_names : optional list
      List of columns to read in dataframe. The timestamp column '$timestamp' is
      always read.

      Defaults to all columns.

    ranges: optional list
      A list of time ranges to read, represented as tuples of Numpy datetime64[ns] objects.
      Defaults to the entire table.

    """
    # Sanitize batch_size
    if batch_size is None:
        batch_size = 2**16
    elif not isinstance(batch_size, int):
        raise TypeError("batch_size should be an integer, but got: {} with value {}".format(type(batch_size), str(batch_size)))

    kwargs = {}

    if column_names:
        kwargs['column_names'] = column_names

    if ranges:
        kwargs['ranges'] = ranges

    with table.reader(**kwargs) as reader:
        for batch in reader:
            # We always expect the timestamp column, and set this as the index
            assert '$timestamp' in batch

            idx = pd.Index(batch.pop('$timestamp'), copy=False, name='$timestamp')
            df = pd.DataFrame(batch, index=idx)

            yield df


def read_dataframe(table, **kwargs):
    """
    Read a Pandas Dataframe from a QuasarDB Timeseries table. Wraps around stream_dataframe(), and
    returns everything as a single dataframe. Any batch_size provided is overridden with the
    default of 2^16 (65536) rows.
    """

    if 'batch_size' in kwargs and kwargs['batch_size'] != 0 and kwargs['batch_size'] is not None:
        logger.warning("Providing a batch size with read_dataframe is unsupported, overriding batch_size to 65536.")
        logger.warning("If you wish to traverse the data in smaller batches, please use: stream_dataframe().")
        kwargs['batch_size'] = 2**16

    # Note that this is *lazy*, dfs is a generator, not a list -- as such, dataframes will be
    # fetched on-demand, which means that an error could occur in the middle of processing
    # dataframes.
    dfs = stream_dataframe(table, **kwargs)

    return pd.concat(dfs)


def _extract_columns(df, cinfos):
    """
    Converts dataframe to a number of numpy arrays, one for each column.

    Arrays will be indexed by relative offset, in the same order as the table's columns.
    If a table column is not present in the dataframe, it it have a None entry.
    If a dataframe column is not present in the table, it will be ommitted.
    """
    ret = {}

    # Grab all columns from the DataFrame in the order of table columns,
    # put None if not present in df.
    for i in range(len(cinfos)):
        (cname, ctype) = cinfos[i]
        xs = None

        if cname in df.columns:
            arr = df[cname].array
            ret[cname] = ma.masked_array(arr.to_numpy(copy=False),
                                         mask=arr.isna())

    return ret

def write_dataframes(
        dfs,
        cluster,
        *,
        create = False,
        shard_size = None,
        **kwargs):
    """
    Store dataframes into tables. Any additional parameters not documented here
    are passed to quasardb.numpy.write_arrays(). Please consult the pydoc of that
    function for additional accepted parameters.

    Parameters:
    -----------

    dfs: dict[str | quasardb.Table, pd.DataFrame] | list[tuple[str | quasardb.Table, pd.DataFrame]]
      This can be either a dict that maps table (either objects or names) to a dataframe, or a list
      of table<>dataframe tuples.

    cluster: quasardb.Cluster
      Active connection to the QuasarDB cluster

    create: optional bool
      Whether to create the table. Defaults to False.

    shard_size: optional datetime.timedelta
      The shard size of the timeseries you wish to create when `create` is True.
    """

    # If dfs is a dict, we convert it to a list of tuples.
    if isinstance(dfs, dict):
        dfs = dfs.items()

    if shard_size is not None and not create:
        raise ValueError("Invalid argument: shard size provided while create is False")

    # If the tables are provided as strings, we look them up.
    dfs_ = []
    for (table, df) in dfs:
        if isinstance(table, str):
            table = table_cache.lookup(table, cluster)

        dfs_.append((table, df))

    data_by_table = []

    for table, df in dfs_:
        logger.debug("quasardb.pandas.write_dataframe, create = %s", create)
        assert isinstance(df, pd.DataFrame)

        # Create table if requested
        if create:
            _create_table_from_df(df, table, shard_size)

        cinfos = [(x.name, x.type) for x in table.list_columns()]

        if not df.index.is_monotonic_increasing:
            logger.warning("dataframe index is unsorted, sorting dataframe based on index")
            df = df.sort_index()

        # We pass everything else to our qdbnp.write_arrays function, as generally speaking
        # it is (much) more sensible to deal with numpy arrays than Pandas dataframes:
        # pandas has the bad habit of wanting to cast data to different types if your data
        # is sparse, most notably forcing sparse integer arrays to floating points.

        data = _extract_columns(df, cinfos)
        data['$timestamp'] = df.index.to_numpy(copy=False,
                                               dtype='datetime64[ns]')

        data_by_table.append((table, data))

    return qdbnp.write_arrays(data_by_table, cluster,
                              table=None,
                              index=None,
                              **kwargs)

def write_dataframe(
        df,
        cluster,
        table,
        **kwargs):
    """
    Store a single dataframe into a table. Takes the same arguments as `write_dataframes`, except only
    a single df/table combination.
    """
    return write_dataframes([(table, df)], cluster, **kwargs)


def write_pinned_dataframe(*args, **kwargs):
    """
    Legacy wrapper around write_dataframe()
    """
    logger.warn("write_pinned_dataframe is deprecated and will be removed in a future release.")
    logger.warn("Please use write_dataframe directly instead")
    return write_dataframe(*args, **kwargs)


def _create_table_from_df(df, table, shard_size=None):
    cols = list()

    dtypes = _get_inferred_dtypes(df)

    logger.info("got inferred dtypes: %s", dtypes)
    for c in df.columns:
        dt = dtypes[c]
        ct = _dtype_to_column_type(df[c].dtype, dt)
        logger.debug("probed pandas dtype %s to inferred dtype %s and map to quasardb column type %s", df[c].dtype, dt, ct)
        cols.append(quasardb.ColumnInfo(ct, c))

    try:
        if not shard_size:
            table.create(cols)
        else:
            table.create(cols, shard_size)
    except quasardb.quasardb.AliasAlreadyExistsError:
        # TODO(leon): warn? how?
        pass

    return table


def _dtype_to_column_type(dt, inferred):
    res = _dtype_map.get(inferred, None)
    if res is None:
        res = _dtype_map.get(dt, None)

    if res is None:
        raise ValueError("Incompatible data type: ", dt)

    return res


def _get_inferred_dtypes(df):
    dtypes = dict()
    for i in range(len(df.columns)):
        c = df.columns[i]
        dt = pd.api.types.infer_dtype(df[c].values)
        logger.debug("Determined dtype of column %s to be %s", c, dt)
        dtypes[c] = dt
    return dtypes


def _get_inferred_dtypes_indexed(df):
    dtypes = _get_inferred_dtypes(df)
    # Performance improvement: avoid expensive dict lookups by indexing
    # the column types by relative offset within the df.
    return list(dtypes[c] for c in df.columns)

Functions

def query(cluster, query, index=None, blobs=False, numpy=True)

Execute a query and return the result as a single pandas DataFrame.

Parameters:

cluster : quasardb.Cluster
  Active connection to the QuasarDB cluster

query : str
  The query to execute.

index : optional
  Forwarded to quasardb.numpy.query(); selects the column used as the dataframe's index.

blobs : bool or list
  Determines which QuasarDB blob-columns should be returned as bytearrays; otherwise they are returned as UTF-8 strings.

  True means every blob column should be returned as a byte array, or a list will specify which specific columns. Defaults to False, meaning all blobs are returned as strings.

Expand source code
def query(cluster: quasardb.Cluster,
          query,
          index=None,
          blobs=False,
          numpy=True):
    """
    Execute a query and return the result as a single pandas DataFrame.

    Parameters:
    -----------

    cluster : quasardb.Cluster
      Active connection to the QuasarDB cluster

    query : str
      The query to execute.

    index : optional
      Forwarded to quasardb.numpy.query(); selects the column used as the
      dataframe's index.

    blobs : bool or list
      Determines which QuasarDB blob-columns should be returned as bytearrays; otherwise
      they are returned as UTF-8 strings.

      True means every blob column should be returned as a byte array, or a list will
      specify which specific columns. Defaults to False, meaning all blobs are returned
      as strings.

    """
    logger.debug("querying and returning as DataFrame: %s", query)
    (index, m) = qdbnp.query(cluster, query, index=index, dict=True)
    df = pd.DataFrame(m)

    df.set_index(index, inplace=True)
    return df
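
A minimal usage sketch; the cluster URI and table name are illustrative assumptions, not part of this module:

import quasardb
import quasardb.pandas as qdbpd

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    # Each call returns a single DataFrame, indexed by the column
    # that quasardb.numpy.query() selects as index.
    df = qdbpd.query(conn, "SELECT * FROM my_table")
    print(df.head())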
def read_dataframe(table, **kwargs)

Read a Pandas Dataframe from a QuasarDB Timeseries table. Wraps around stream_dataframe(), and returns everything as a single dataframe. Any batch_size provided is overridden with the default of 2^16 (65536) rows.

Expand source code
def read_dataframe(table, **kwargs):
    """
    Read a Pandas Dataframe from a QuasarDB Timeseries table. Wraps around stream_dataframe(), and
    returns everything as a single dataframe. Any batch_size provided is overridden with the
    default of 2^16 (65536) rows.
    """

    if 'batch_size' in kwargs and kwargs['batch_size'] != 0 and kwargs['batch_size'] is not None:
        logger.warning("Providing a batch size with read_dataframe is unsupported, overriding batch_size to 65536.")
        logger.warning("If you wish to traverse the data in smaller batches, please use: stream_dataframe().")
        kwargs['batch_size'] = 2**16

    # Note that this is *lazy*, dfs is a generator, not a list -- as such, dataframes will be
    # fetched on-demand, which means that an error could occur in the middle of processing
    # dataframes.
    dfs = stream_dataframe(table, **kwargs)

    return pd.concat(dfs)
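
A sketch that materializes an entire table, optionally restricted to a time range (the table name and dates are illustrative):

import numpy as np
import quasardb
import quasardb.pandas as qdbpd

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    table = conn.table("my_table")
    # Entire table as one dataframe
    df = qdbpd.read_dataframe(table)
    # Or just January 2024; ranges are (begin, end) tuples
    rng = [(np.datetime64("2024-01-01", "ns"), np.datetime64("2024-02-01", "ns"))]
    df_jan = qdbpd.read_dataframe(table, ranges=rng)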
def read_series(table, col_name, ranges=None)

Read a Pandas Timeseries from a single column.

Parameters:

table : quasardb.Timeseries
  QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

col_name : str
  Name of the column to read.

ranges : list
  A list of ranges to read, represented as tuples of Numpy datetime64[ns] objects.

Expand source code
def read_series(table, col_name, ranges=None):
    """
    Read a Pandas Timeseries from a single column.

    Parameters:
    -----------

    table : quasardb.Timeseries
      QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

    col_name : str
      Name of the column to read.

    ranges : list
      A list of ranges to read, represented as tuples of Numpy datetime64[ns] objects.
    """
    read_with = {
        quasardb.ColumnType.Double: table.double_get_ranges,
        quasardb.ColumnType.Blob: table.blob_get_ranges,
        quasardb.ColumnType.String: table.string_get_ranges,
        quasardb.ColumnType.Int64: table.int64_get_ranges,
        quasardb.ColumnType.Timestamp: table.timestamp_get_ranges,
        quasardb.ColumnType.Symbol: table.string_get_ranges,
    }

    kwargs = {
        'column': col_name
    }

    if ranges is not None:
        kwargs['ranges'] = ranges

    # Dispatch based on column type
    t = table.column_type_by_id(col_name)

    logger.info("reading Series from column %s.%s with type %s",
                 table.get_name(), col_name, t)

    res = (read_with[t])(**kwargs)

    return Series(res[1], index=res[0])
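
A short usage sketch, assuming a table 'my_table' with a double column 'close' (all names are illustrative):

import numpy as np
import quasardb
import quasardb.pandas as qdbpd

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    table = conn.table("my_table")
    # Full column as a pandas Series, indexed by timestamp
    s = qdbpd.read_series(table, "close")
    # Restricted to a single time range
    rng = [(np.datetime64("2024-01-01", "ns"), np.datetime64("2024-02-01", "ns"))]
    s_jan = qdbpd.read_series(table, "close", ranges=rng)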
def stream_dataframe(table, *, batch_size=65536, column_names=None, ranges=None)

Read a Pandas Dataframe from a QuasarDB Timeseries table. Returns a generator with dataframes of size batch_size, which is useful when traversing a large dataset which does not fit into memory.

Parameters:

table : quasardb.Timeseries
  QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

batch_size : int
  The number of rows to fetch in a single read operation. If unset, uses 2^16 (65536) rows as the default batch size.

column_names : optional list
  List of columns to read in dataframe. The timestamp column '$timestamp' is always read.

  Defaults to all columns.

ranges : optional list
  A list of time ranges to read, represented as tuples of Numpy datetime64[ns] objects. Defaults to the entire table.

Expand source code
def stream_dataframe(table : quasardb.Table, *, batch_size : int = 2**16, column_names : list = None, ranges : list = None):
    """
    Read a Pandas Dataframe from a QuasarDB Timeseries table. Returns a generator with dataframes of size `batch_size`, which is useful
    when traversing a large dataset which does not fit into memory.

    Parameters:
    -----------

    table : quasardb.Timeseries
      QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

    batch_size : int
      The number of rows to fetch in a single read operation. If unset, uses 2^16 (65536) rows
      as the default batch size.

    column_names : optional list
      List of columns to read in dataframe. The timestamp column '$timestamp' is
      always read.

      Defaults to all columns.

    ranges: optional list
      A list of time ranges to read, represented as tuples of Numpy datetime64[ns] objects.
      Defaults to the entire table.

    """
    # Sanitize batch_size
    if batch_size is None:
        batch_size = 2**16
    elif not isinstance(batch_size, int):
        raise TypeError("batch_size should be an integer, but got: {} with value {}".format(type(batch_size), str(batch_size)))

    kwargs = {}

    if column_names:
        kwargs['column_names'] = column_names

    if ranges:
        kwargs['ranges'] = ranges

    with table.reader(**kwargs) as reader:
        for batch in reader:
            # We always expect the timestamp column, and set this as the index
            assert '$timestamp' in batch

            idx = pd.Index(batch.pop('$timestamp'), copy=False, name='$timestamp')
            df = pd.DataFrame(batch, index=idx)

            yield df
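
Because the generator fetches batches lazily, it can aggregate over tables that do not fit in memory. A sketch (table and column names are illustrative):

import quasardb
import quasardb.pandas as qdbpd

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    table = conn.table("my_table")
    total_rows = 0
    for df in qdbpd.stream_dataframe(table, column_names=["close"]):
        # Each df is one batch, indexed by '$timestamp'
        total_rows += len(df)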
def write_dataframe(df, cluster, table, **kwargs)

Store a single dataframe into a table. Takes the same arguments as write_dataframes(), except only a single df/table combination.

Expand source code
def write_dataframe(
        df,
        cluster,
        table,
        **kwargs):
    """
    Store a single dataframe into a table. Takes the same arguments as `write_dataframes`, except only
    a single df/table combination.
    """
    return write_dataframes([(table, df)], cluster, **kwargs)
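
A sketch of a write, with an illustrative table name; create=True is forwarded to write_dataframes(), which creates the table from the dataframe's dtypes:

import pandas as pd
import quasardb
import quasardb.pandas as qdbpd

idx = pd.date_range("2024-01-01", periods=3, freq="min")
df = pd.DataFrame({"close": [1.1, 2.2, 3.3]}, index=idx)

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    # The table may be given by name; it is resolved via table_cache.lookup()
    qdbpd.write_dataframe(df, conn, "my_table", create=True)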
def write_dataframes(dfs, cluster, *, create=False, shard_size=None, **kwargs)

Store dataframes into tables. Any additional parameters not documented here are passed to quasardb.numpy.write_arrays(). Please consult the pydoc of that function for additional accepted parameters.

Parameters:

dfs : dict[str | quasardb.Table, pd.DataFrame] | list[tuple[str | quasardb.Table, pd.DataFrame]]
  This can be either a dict that maps tables (either objects or names) to a dataframe, or a list of table<>dataframe tuples.

cluster : quasardb.Cluster
  Active connection to the QuasarDB cluster

create : optional bool
  Whether to create the table. Defaults to False.

shard_size : optional datetime.timedelta
  The shard size of the timeseries you wish to create when create is True.

Expand source code
def write_dataframes(
        dfs,
        cluster,
        *,
        create = False,
        shard_size = None,
        **kwargs):
    """
    Store dataframes into tables. Any additional parameters not documented here
    are passed to quasardb.numpy.write_arrays(). Please consult the pydoc of that
    function for additional accepted parameters.

    Parameters:
    -----------

    dfs: dict[str | quasardb.Table, pd.DataFrame] | list[tuple[str | quasardb.Table, pd.DataFrame]]
      This can be either a dict that maps table (either objects or names) to a dataframe, or a list
      of table<>dataframe tuples.

    cluster: quasardb.Cluster
      Active connection to the QuasarDB cluster

    create: optional bool
      Whether to create the table. Defaults to False.

    shard_size: optional datetime.timedelta
      The shard size of the timeseries you wish to create when `create` is True.
    """

    # If dfs is a dict, we convert it to a list of tuples.
    if isinstance(dfs, dict):
        dfs = dfs.items()

    if shard_size is not None and not create:
        raise ValueError("Invalid argument: shard size provided while create is False")

    # If the tables are provided as strings, we look them up.
    dfs_ = []
    for (table, df) in dfs:
        if isinstance(table, str):
            table = table_cache.lookup(table, cluster)

        dfs_.append((table, df))

    data_by_table = []

    for table, df in dfs_:
        logger.debug("quasardb.pandas.write_dataframe, create = %s", create)
        assert isinstance(df, pd.DataFrame)

        # Create table if requested
        if create:
            _create_table_from_df(df, table, shard_size)

        cinfos = [(x.name, x.type) for x in table.list_columns()]

        if not df.index.is_monotonic_increasing:
            logger.warning("dataframe index is unsorted, sorting dataframe based on index")
            df = df.sort_index()

        # We pass everything else to our qdbnp.write_arrays function, as generally speaking
        # it is (much) more sensible to deal with numpy arrays than Pandas dataframes:
        # pandas has the bad habit of wanting to cast data to different types if your data
        # is sparse, most notably forcing sparse integer arrays to floating points.

        data = _extract_columns(df, cinfos)
        data['$timestamp'] = df.index.to_numpy(copy=False,
                                               dtype='datetime64[ns]')

        data_by_table.append((table, data))

    return qdbnp.write_arrays(data_by_table, cluster,
                              table=None,
                              index=None,
                              **kwargs)
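
A sketch writing two tables in one call, using the dict form of dfs (table names, columns, and shard size are illustrative):

from datetime import timedelta

import pandas as pd
import quasardb
import quasardb.pandas as qdbpd

idx = pd.date_range("2024-01-01", periods=2, freq="min")
dfs = {
    "prices":  pd.DataFrame({"close": [101.0, 102.5]}, index=idx),
    "volumes": pd.DataFrame({"qty": [1000, 2500]}, index=idx),
}

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    # shard_size is only accepted together with create=True
    qdbpd.write_dataframes(dfs, conn, create=True, shard_size=timedelta(hours=1))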
def write_pinned_dataframe(*args, **kwargs)

Legacy wrapper around write_dataframe()

Expand source code
def write_pinned_dataframe(*args, **kwargs):
    """
    Legacy wrapper around write_dataframe()
    """
    logger.warn("write_pinned_dataframe is deprecated and will be removed in a future release.")
    logger.warn("Please use write_dataframe directly instead")
    return write_dataframe(*args, **kwargs)
def write_series(series, table, col_name, infer_types=True, dtype=None)

Writes a Pandas Timeseries to a single column.

Parameters:

series : pandas.Series
  Pandas Series, with a numpy.datetime64[ns] index. An attempt will be made to transform the underlying data to the appropriate QuasarDB type.

table : quasardb.Timeseries
  QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

col_name : str
  Column name to store data in.

Expand source code
def write_series(series,
                 table,
                 col_name,
                 infer_types=True,
                 dtype=None):
    """
    Writes a Pandas Timeseries to a single column.

    Parameters:
    -----------

    series : pandas.Series
      Pandas Series, with a numpy.datetime64[ns] index. An attempt will be made to
      transform the underlying data to the appropriate QuasarDB type.

    table : quasardb.Timeseries
      QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

    col_name : str
      Column name to store data in.
    """

    logger.debug("write_series, table=%s, col_name=%s, infer_types=%s, dtype=%s", table.get_name(), col_name, infer_types, dtype)

    data = ma.masked_array(series.to_numpy(copy=False),
                           mask=series.isna())

    if infer_types is True:
        index = series.index.to_numpy('datetime64[ns]', copy=False)
    else:
        index = series.index.to_numpy(copy=False)

    return qdbnp.write_array(data=data,
                             index=index,
                             table=table,
                             column=col_name,
                             dtype=dtype,
                             infer_types=infer_types)
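
A sketch, assuming the table and its 'close' double column already exist (write_series does not create tables; names are illustrative):

import pandas as pd
import quasardb
import quasardb.pandas as qdbpd

idx = pd.date_range("2024-01-01", periods=3, freq="s")
s = pd.Series([1.0, 2.0, 3.0], index=idx)

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    table = conn.table("my_table")
    qdbpd.write_series(s, table, "close")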

Classes

class PandasRequired (...)

Exception raised when trying to use QuasarDB pandas integration, but pandas has not been installed.

Expand source code
class PandasRequired(ImportError):
    """
    Exception raised when trying to use QuasarDB pandas integration, but
    pandas has not been installed.
    """
    pass

Ancestors

  • builtins.ImportError
  • builtins.Exception
  • builtins.BaseException