Package quasardb.numpy

Functions

def dtypes_equal(lhs, rhs)
Expand source code
def dtypes_equal(lhs, rhs):
    if lhs.kind == "U" or lhs.kind == "S":
        # Unicode and string data has variable length encoding, which means their itemsize
        # can be anything.
        #
        # We only care about dtype kind in this case.
        return lhs.kind == rhs.kind

    return lhs == rhs
def ensure_ma(xs, dtype=None)
Expand source code
def ensure_ma(xs, dtype=None):
    if isinstance(dtype, list):
        assert isinstance(xs, list) == True
        return [ensure_ma(xs_, dtype_) for (xs_, dtype_) in zip(xs, dtype)]

    # Don't bother if we're already a masked array
    if ma.isMA(xs):
        return xs

    if not isinstance(xs, np.ndarray):
        logger.debug("Provided data is not a numpy array: %s", type(xs))
        xs = np.array(xs, dtype=dtype)

    logger.debug("coercing array with dtype: %s", xs.dtype)

    if xs.dtype.kind in ["O", "U", "S"]:
        logger.debug("Data is object-like, masking None values")

        mask = xs == None
        return ma.masked_array(data=xs, mask=mask)
    else:
        logger.debug("Automatically masking invalid numbers")
        return ma.masked_invalid(xs, copy=False)
def query(cluster, query, index=None, dict=False)
Expand source code
def query(cluster, query, index=None, dict=False):
    """
    Execute a query and return the results as numpy arrays. The shape of the return value
    is always:

      tuple[index, dict | list[np.array]]


    If `dict` is True, constructs a dict[str, np.array] where the key is the column name.
    Otherwise, it returns a list of all the individual data arrays.



    Parameters:
    -----------

    cluster : quasardb.Cluster
      Active connection to the QuasarDB cluster

    query : str
      The query to execute.

    index : optional[str | int]
      If provided, resolves column and uses that as the index. If string (e.g. `$timestamp`), uses
      that column as the index. If int (e.g. `1`), looks up the column based on that offset.

    dict : bool
      If true, returns data arrays as a dict, otherwise a list of np.arrays.
      Defaults to False.

    """

    m = {}
    xs = cluster.query_numpy(query)

    return _xform_query_results(xs, index, dict)

Execute a query and return the results as numpy arrays. The shape of the return value is always:

tuple[index, dict | list[np.array]]

If dict is True, constructs a dict[str, np.array] where the key is the column name. Otherwise, it returns a list of all the individual data arrays.

Parameters:

cluster : quasardb.Cluster Active connection to the QuasarDB cluster

query : str The query to execute.

index : optional[str | int] If provided, resolves column and uses that as the index. If string (e.g. $timestamp), uses that column as the index. If int (e.g. 1), looks up the column based on that offset.

dict : bool If true, returns data arrays as a dict, otherwise a list of np.arrays. Defaults to False.

def read_array(table=None, column=None, ranges=None)
Expand source code
def read_array(table=None, column=None, ranges=None):
    if table is None:
        raise RuntimeError("A table is required.")

    if column is None:
        raise RuntimeError("A column is required.")

    kwargs = {"column": column}

    if ranges is not None:
        kwargs["ranges"] = ranges

    read_with = {
        quasardb.ColumnType.Double: table.double_get_ranges,
        quasardb.ColumnType.Blob: table.blob_get_ranges,
        quasardb.ColumnType.String: table.string_get_ranges,
        quasardb.ColumnType.Symbol: table.string_get_ranges,
        quasardb.ColumnType.Int64: table.int64_get_ranges,
        quasardb.ColumnType.Timestamp: table.timestamp_get_ranges,
    }

    ctype = table.column_type_by_id(column)

    fn = read_with[ctype]
    return fn(**kwargs)
def write_array(data=None, index=None, table=None, column=None, dtype=None, infer_types=True)
Expand source code
def write_array(
    data=None, index=None, table=None, column=None, dtype=None, infer_types=True
):
    """
    Write a Numpy array to a single column.

    Parameters:
    -----------

    data: np.array
      Numpy array with a dtype that is compatible with the column's type.

    index: np.array
      Numpy array with a datetime64[ns] dtype that will be used as the
      $timestamp axis for the data to be stored.

    dtype: optional np.dtype
      If provided, ensures the data array is converted to this dtype before
      insertion.

    infer_types: optional bool
      If true, when necessary will attempt to convert the data and index array
      to the best type for the column. For example, if you provide float64 data
      while the column's type is int64, it will automatically convert the data.

      Defaults to True. For production use cases where you want to avoid implicit
      conversions, we recommend always setting this to False.

    """

    if table is None:
        raise RuntimeError("A table is required.")

    if column is None:
        raise RuntimeError("A column is required.")

    if data is None:
        raise RuntimeError("A data numpy array is required.")

    if index is None:
        raise RuntimeError("An index numpy timestamp array is required.")

    data = ensure_ma(data, dtype=dtype)
    ctype = table.column_type_by_id(column)

    # We try to reuse some of the other functions, which assume array-like
    # shapes for column info and data. It's a bit hackish, but actually works
    # well.
    #
    # We should probably generalize this block of code with the same found in
    # write_arrays().

    cinfos = [(column, ctype)]
    dtype_ = [dtype]

    dtype = _coerce_dtype(dtype_, cinfos)

    if infer_types is True:
        dtype = _add_desired_dtypes(dtype, cinfos)

    # data_ = an array of [data]
    data_ = [data]
    data_ = _coerce_data(data_, dtype)
    _validate_dtypes(data_, cinfos)

    # No functions that assume array-of-data anymore, let's put it back
    data = data_[0]

    # Dispatch to the correct function
    write_with = {
        quasardb.ColumnType.Double: table.double_insert,
        quasardb.ColumnType.Blob: table.blob_insert,
        quasardb.ColumnType.String: table.string_insert,
        quasardb.ColumnType.Symbol: table.string_insert,
        quasardb.ColumnType.Int64: table.int64_insert,
        quasardb.ColumnType.Timestamp: table.timestamp_insert,
    }

    logger.info(
        "Writing array (%d rows of dtype %s) to columns %s.%s (type %s)",
        len(data),
        data.dtype,
        table.get_name(),
        column,
        ctype,
    )
    write_with[ctype](column, index, data)

Write a Numpy array to a single column.

Parameters:

data: np.array Numpy array with a dtype that is compatible with the column's type.

index: np.array Numpy array with a datetime64[ns] dtype that will be used as the $timestamp axis for the data to be stored.

dtype: optional np.dtype If provided, ensures the data array is converted to this dtype before insertion.

infer_types: optional bool If true, when necessary will attempt to convert the data and index array to the best type for the column. For example, if you provide float64 data while the column's type is int64, it will automatically convert the data.

Defaults to True. For production use cases where you want to avoid implicit conversions, we recommend always setting this to False.

def write_arrays(data,
cluster,
table=None,
*,
dtype=None,
index=None,
fast=False,
truncate=False,
deduplicate=False,
deduplication_mode='drop',
infer_types=True,
writer=None,
write_through=True,
retries=3,
**kwargs)
Expand source code
def write_arrays(
    data,
    cluster,
    table=None,
    *,
    dtype=None,
    index=None,
    _async=False,
    fast=False,
    truncate=False,
    deduplicate=False,
    deduplication_mode="drop",
    infer_types=True,
    writer=None,
    write_through=True,
    retries=3,
    # We accept additional kwargs that will be passed through the writer.push() methods
    **kwargs,
):
    """
    Write multiple aligned numpy arrays to a table.

    Parameters:
    -----------

    data: Iterable of np.array, or dict-like of str:np.array
      Numpy arrays to write into the database. Can either be a list of numpy arrays,
      in which case they are expected to be in the same order as table.list_columns(),
      and an array is provided for each of the columns. If `index` is None, the first
      array will be assumed to be an index with dtype `datetime64[ns]`.

      Alternatively, a dict of key/values may be provided, where the key is expected
      to be a table column label, and the value is expected to be a np.array. If present,
      a column with label '$timestamp' will be used as the index.

      In all cases, all numpy arrays are expected to be of exactly the same length as the
      index.

    cluster: quasardb.Cluster
      Active connection to the QuasarDB cluster

    table: quasardb.Table or str
      Either a string or a reference to a QuasarDB Timeseries table object.
      For example, 'my_table' or cluster.table('my_table') are both valid values.

      Defaults to False.

    index: optional np.array with dtype datetime64[ns]
      Optionally explicitly provide an array as the $timestamp index. If not provided,
      the first array provided to `data` will be used as the index.

    dtype: optional dtype, list of dtype, or dict of dtype
      Optional data type to force. If a single dtype, will force that dtype to all
      columns. If list-like, will map dtypes to dataframe columns by their offset.
      If dict-like, will map dtypes to dataframe columns by their label.

      If a dtype for a column is provided in this argument, and infer_types is also
      True, this argument takes precedence.

    deduplicate: bool or list[str]
      Enables server-side deduplication of data when it is written into the table.
      When True, automatically deduplicates rows when all values of a row are identical.
      When a list of strings is provided, deduplicates only based on the values of
      these columns.

      Defaults to False.

    deduplication_mode: 'drop' or 'upsert'
      When `deduplicate` is enabled, decides how deduplication is performed. 'drop' means
      any newly written duplicates are dropped, where 'upsert' means that the previously
      written data is updated to reflect the new data.

      Defaults to 'drop'.

    infer_types: optional bool
      If true, will attemp to convert types from Python to QuasarDB natives types if
      the provided dataframe has incompatible types. For example, a dataframe with integers
      will automatically convert these to doubles if the QuasarDB table expects it.

      Defaults to True. For production use cases where you want to avoid implicit conversions,
      we recommend setting this to False.

    truncate: optional bool
      Truncate (also referred to as upsert) the data in-place. Will detect time range to truncate
      from the time range inside the dataframe.

      Defaults to False.

    _async: optional bool
      If true, uses asynchronous insertion API where commits are buffered server-side and
      acknowledged before they are written to disk. If you insert to the same table from
      multiple processes, setting this to True may improve performance.

      Defaults to False.

    fast: optional bool
      Whether to use 'fast push'. If you incrementally add small batches of data to table,
      you may see better performance if you set this to True.

      Defaults to False.

    write_through: optional bool
      If True, data is not cached after write.
      By default is False, in which case caching is left at the discretion of the server.

    writer: optional quasardb.Writer
      Allows you to explicitly provide a Writer to use, which is expected to be
      initialized with the `table`.

      Reuse of the Writer allows for some performance improvements.

    retries: optional int or quasardb.RetryOptions
      Number of times to retry in case of a push failure. This is useful in case of async
      pipeline failures, or when doing transactional inserts that may occasionally cause
      transaction conflicts.

      Retries with exponential backoff, starts at 3 seconds, and doubles every retry attempt.

      Alternatively, a quasardb.RetryOptions object can be passed to more carefully fine-tune
      retry behavior.
    """

    if table:
        logger.debug("table explicitly provided, assuming single-table write")
        return write_arrays(
            [(table, data)],
            cluster,
            table=None,
            dtype=dtype,
            index=index,
            _async=_async,
            fast=fast,
            truncate=truncate,
            deduplicate=deduplicate,
            deduplication_mode=deduplication_mode,
            infer_types=infer_types,
            write_through=write_through,
            writer=writer,
            retries=retries,
            **kwargs,
        )

    ret = []

    # Create batch column info from dataframe
    if writer is None:
        writer = cluster.writer()

    n_rows = 0

    push_data = quasardb.WriterData()

    for table, data_ in data:
        # Acquire reference to table if string is provided
        if isinstance(table, str):
            table = table_cache.lookup(table, cluster)

        cinfos = [(x.name, x.type) for x in table.list_columns()]
        dtype = _coerce_dtype(dtype, cinfos)

        assert type(dtype) is list
        assert len(dtype) is len(cinfos)

        if index is None and isinstance(data_, dict) and "$timestamp" in data_:
            index_ = data_.pop("$timestamp")
            assert "$timestamp" not in data_
        elif index is not None:
            index_ = index
        else:
            raise RuntimeError("Invalid index: no index provided.")

        assert index_ is not None

        if infer_types is True:
            dtype = _add_desired_dtypes(dtype, cinfos)

        data_ = _ensure_list(data_, cinfos)

        if len(data_) != len(cinfos):
            raise InvalidDataCardinalityError(data_, cinfos)

        data_ = ensure_ma(data_, dtype=dtype)
        data_ = _coerce_data(data_, dtype)

        # Just some additional friendly information about incorrect dtypes, we'd
        # prefer to have this information thrown from Python instead of native
        # code as it generally makes for somewhat better error context.
        _validate_dtypes(data_, cinfos)

        deduplicate = _coerce_deduplicate(deduplicate, deduplication_mode, cinfos)

        # Sanity check
        assert len(data_) == len(cinfos)

        for i in range(len(data_)):
            assert len(data_[i]) == len(index_)

        push_data.append(table, index_, data_)

        n_rows += len(index_)
        ret.append(table)

    retries = _coerce_retries(retries)

    # By default, we push all additional kwargs to the writer.push() function. This allows transparent propagation
    # arguments.
    #
    # The initial use case was that so we can add additional parameters for test mocks, e.g. `mock_failures` so that
    # we can validate the retry functionality.
    push_kwargs = kwargs
    push_kwargs["deduplicate"] = deduplicate
    push_kwargs["deduplication_mode"] = deduplication_mode
    push_kwargs["write_through"] = write_through
    push_kwargs["retries"] = retries

    logger.debug("pushing %d rows", n_rows)
    start = time.time()

    if fast is True:
        push_kwargs["push_mode"] = quasardb.WriterPushMode.Fast
    elif truncate is True:
        push_kwargs["push_mode"] = quasardb.WriterPushMode.Truncate
    elif isinstance(truncate, tuple):
        push_kwargs["push_mode"] = quasardb.WriterPushMode.Truncate
        push_kwargs["range"] = truncate
    elif _async is True:
        push_kwargs["push_mode"] = quasardb.WriterPushMode.Async
    else:
        push_kwargs["push_mode"] = quasardb.WriterPushMode.Transactional

    writer.push(push_data, **push_kwargs)

    logger.debug("pushed %d rows in %s seconds", n_rows, (time.time() - start))

    return ret

Write multiple aligned numpy arrays to a table.

Parameters:

data: Iterable of np.array, or dict-like of str:np.array Numpy arrays to write into the database. Can either be a list of numpy arrays, in which case they are expected to be in the same order as table.list_columns(), and an array is provided for each of the columns. If index is None, the first array will be assumed to be an index with dtype datetime64[ns].

Alternatively, a dict of key/values may be provided, where the key is expected to be a table column label, and the value is expected to be a np.array. If present, a column with label '$timestamp' will be used as the index.

In all cases, all numpy arrays are expected to be of exactly the same length as the index.

cluster: quasardb.Cluster Active connection to the QuasarDB cluster

table: quasardb.Table or str Either a string or a reference to a QuasarDB Timeseries table object. For example, 'my_table' or cluster.table('my_table') are both valid values.

Defaults to False.

index: optional np.array with dtype datetime64[ns] Optionally explicitly provide an array as the $timestamp index. If not provided, the first array provided to data will be used as the index.

dtype: optional dtype, list of dtype, or dict of dtype Optional data type to force. If a single dtype, will force that dtype to all columns. If list-like, will map dtypes to dataframe columns by their offset. If dict-like, will map dtypes to dataframe columns by their label.

If a dtype for a column is provided in this argument, and infer_types is also True, this argument takes precedence.

deduplicate: bool or list[str] Enables server-side deduplication of data when it is written into the table. When True, automatically deduplicates rows when all values of a row are identical. When a list of strings is provided, deduplicates only based on the values of these columns.

Defaults to False.

deduplication_mode: 'drop' or 'upsert' When deduplicate is enabled, decides how deduplication is performed. 'drop' means any newly written duplicates are dropped, where 'upsert' means that the previously written data is updated to reflect the new data.

Defaults to 'drop'.

infer_types: optional bool If true, will attemp to convert types from Python to QuasarDB natives types if the provided dataframe has incompatible types. For example, a dataframe with integers will automatically convert these to doubles if the QuasarDB table expects it.

Defaults to True. For production use cases where you want to avoid implicit conversions, we recommend setting this to False.

truncate: optional bool Truncate (also referred to as upsert) the data in-place. Will detect time range to truncate from the time range inside the dataframe.

Defaults to False.

_async: optional bool If true, uses asynchronous insertion API where commits are buffered server-side and acknowledged before they are written to disk. If you insert to the same table from multiple processes, setting this to True may improve performance.

Defaults to False.

fast: optional bool Whether to use 'fast push'. If you incrementally add small batches of data to table, you may see better performance if you set this to True.

Defaults to False.

write_through: optional bool If True, data is not cached after write. By default is False, in which case caching is left at the discretion of the server.

writer: optional quasardb.Writer Allows you to explicitly provide a Writer to use, which is expected to be initialized with the table.

Reuse of the Writer allows for some performance improvements.

retries: optional int or quasardb.RetryOptions Number of times to retry in case of a push failure. This is useful in case of async pipeline failures, or when doing transactional inserts that may occasionally cause transaction conflicts.

Retries with exponential backoff, starts at 3 seconds, and doubles every retry attempt.

Alternatively, a quasardb.RetryOptions object can be passed to more carefully fine-tune retry behavior.

Classes

class IncompatibleDtypeError (cname=None, ctype=None, expected=None, provided=None)
Expand source code
class IncompatibleDtypeError(TypeError):
    """
    Exception raised when a provided dtype is not the expected dtype.
    """

    def __init__(self, cname=None, ctype=None, expected=None, provided=None):
        self.cname = cname
        self.ctype = ctype
        self.expected = expected
        self.provided = provided
        super().__init__(self.msg())

    def msg(self):
        return "Data for column '{}' with type '{}' was provided in dtype '{}' but need '{}'.".format(
            self.cname, self.ctype, self.provided, self.expected
        )

Exception raised when a provided dtype is not the expected dtype.

Ancestors

  • builtins.TypeError
  • builtins.Exception
  • builtins.BaseException

Methods

def msg(self)
Expand source code
def msg(self):
    return "Data for column '{}' with type '{}' was provided in dtype '{}' but need '{}'.".format(
        self.cname, self.ctype, self.provided, self.expected
    )
class IncompatibleDtypeErrors (xs)
Expand source code
class IncompatibleDtypeErrors(TypeError):
    """
    Wraps multiple dtype errors
    """

    def __init__(self, xs):
        self.xs = xs
        super().__init__(self.msg())

    def msg(self):
        return "\n".join(x.msg() for x in self.xs)

Wraps multiple dtype errors

Ancestors

  • builtins.TypeError
  • builtins.Exception
  • builtins.BaseException

Methods

def msg(self)
Expand source code
def msg(self):
    return "\n".join(x.msg() for x in self.xs)
class InvalidDataCardinalityError (data, cinfos)
Expand source code
class InvalidDataCardinalityError(ValueError):
    """
    Raised when the provided data arrays doesn't match the table's columns.
    """

    def __init__(self, data, cinfos):
        self.data = data
        self.cinfos = cinfos
        super().__init__(self.msg())

    def msg(self):
        return "Provided data array length '{}' exceeds amount of table columns '{}', unable to map data to columns".format(
            len(self.data), len(self.cinfos)
        )

Raised when the provided data arrays doesn't match the table's columns.

Ancestors

  • builtins.ValueError
  • builtins.Exception
  • builtins.BaseException

Methods

def msg(self)
Expand source code
def msg(self):
    return "Provided data array length '{}' exceeds amount of table columns '{}', unable to map data to columns".format(
        len(self.data), len(self.cinfos)
    )
class NumpyRequired (*args, **kwargs)
Expand source code
class NumpyRequired(ImportError):
    """
    Exception raised when trying to use QuasarDB pandas integration, but
    pandas has not been installed.
    """

    pass

Exception raised when trying to use QuasarDB pandas integration, but pandas has not been installed.

Ancestors

  • builtins.ImportError
  • builtins.Exception
  • builtins.BaseException