Package quasardb.numpy
Functions
def dtypes_equal(lhs: DType, rhs: DType) -> bool
def dtypes_equal(lhs: DType, rhs: DType) -> bool:
    if lhs.kind == "U" or lhs.kind == "S":
        # Unicode and string data has variable length encoding, which means their
        # itemsize can be anything.
        #
        # We only care about dtype kind in this case.
        return lhs.kind == rhs.kind

    return lhs == rhs
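For illustration, assuming `DType` is `numpy.dtype`: string dtypes compare by kind only, while everything else compares exactly:

>>> import numpy as np
>>> dtypes_equal(np.dtype("U8"), np.dtype("U32"))
True
>>> dtypes_equal(np.dtype("int64"), np.dtype("float64"))
False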
def ensure_ma(xs: Any, dtype: Optional[Union[DType, List[Optional[DType]]]] = None) -> List[numpy.ma.MaskedArray] | numpy.ma.MaskedArray
def ensure_ma(
    xs: Any, dtype: Optional[Union[DType, List[Optional[DType]]]] = None
) -> Union[List[MaskedArrayAny], MaskedArrayAny]:
    if isinstance(dtype, list):
        assert isinstance(xs, list)
        return [_ensure_ma(xs_, dtype_) for (xs_, dtype_) in zip(xs, dtype)]

    return _ensure_ma(xs, dtype)
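A usage sketch, assuming `_ensure_ma` wraps each input in a numpy masked array of the requested dtype; with a list of dtypes, inputs and dtypes are paired by position:

>>> import numpy as np
>>> out = ensure_ma([np.array([1, 2, 3]), np.array([1.5, 2.5, 3.5])],
...                 dtype=[np.dtype("int64"), np.dtype("float64")])
>>> [np.ma.isMA(x) for x in out]
[True, True]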
def query(cluster: quasardb.Cluster,
          query: str,
          index: Optional[Union[str, int]] = None,
          dict: bool = False) -> Tuple[numpy.ndarray, Dict[str, numpy.ma.MaskedArray] | List[numpy.ma.MaskedArray]]
def query(
    cluster: quasardb.Cluster,
    query: str,
    index: Optional[Union[str, int]] = None,
    dict: bool = False,
) -> Tuple[NDArrayAny, Union[Dict[str, MaskedArrayAny], List[MaskedArrayAny]]]:
    """
    Execute a query and return the results as numpy arrays. The shape of the
    return value is always:

      tuple[index, dict | list[np.array]]

    If `dict` is True, constructs a dict[str, np.array] where the key is the
    column name. Otherwise, it returns a list of all the individual data
    arrays.

    Parameters:
    -----------
    cluster : quasardb.Cluster
      Active connection to the QuasarDB cluster

    query : str
      The query to execute.

    index : optional[str | int]
      If provided, resolves the column and uses it as the index. If a string
      (e.g. `$timestamp`), uses that column as the index. If an int (e.g. `1`),
      looks up the column at that offset.

    dict : bool
      If true, returns data arrays as a dict, otherwise a list of np.arrays.
      Defaults to False.
    """
    xs = cluster.query_numpy(query)
    return _xform_query_results(xs, index, dict)
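A usage sketch (assumes an established connection `conn` and a hypothetical table `my_table` with a `close` column):

>>> idx, cols = qdbnp.query(conn,
...                         "SELECT $timestamp, close FROM my_table",
...                         index="$timestamp",
...                         dict=True)
>>> closes = cols["close"]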
def read_arrays(conn: Cluster,
                tables: List[TableLike],
                *,
                batch_size: Optional[int] = 65536,
                column_names: Optional[Sequence[str]] = None,
                ranges: Optional[RangeSet] = None) -> Tuple[numpy.ndarray, Dict[str, numpy.ma.MaskedArray]]
def read_arrays(
    conn: Cluster,
    tables: List[TableLike],
    *,
    batch_size: Optional[int] = 2**16,
    column_names: Optional[Sequence[str]] = None,
    ranges: Optional[RangeSet] = None,
) -> IndexedMaskedArrays:
    """
    Read any number of columns from tables as numpy masked arrays.

    Parameters:
    -----------
    conn: quasardb.Cluster
      Connection to the QuasarDB database.

    tables : list[str | quasardb.Table]
      QuasarDB tables to read, as a list of strings or quasardb table objects.

    batch_size : int
      The number of rows to fetch in a single read operation. If unset, uses
      2^16 (65536) rows as batch size by default.

    column_names: optional sequence[str]
      Column names to read. If None or an empty sequence is provided, all
      table columns are read.

    ranges: optional list[tuple]
      Time ranges to read. If None, the full available range is read.

    Returns:
    --------
    tuple[numpy.ndarray, dict[str, numpy.ma.MaskedArray]]
      A pair consisting of the shared timestamp index and a mapping of column
      names to masked arrays.

    Examples:
    ---------
    Read all columns:

    >>> idx, cols = qdbnp.read_arrays(conn, [my_table], column_names=[])

    Read a subset of columns for a given time range:

    >>> idx, cols = qdbnp.read_arrays(
    ...     conn,
    ...     [my_table],
    ...     column_names=["open", "close"],
    ...     ranges=[(start, end)],
    ... )
    >>> opens = cols["open"]
    >>> closes = cols["close"]
    """
    xs = stream_arrays(
        conn,
        tables,
        batch_size=batch_size,
        column_names=column_names,
        ranges=ranges,
    )

    try:
        return _concat_array_batches(xs)
    except ValueError as e:
        logger.error(
            "Error while concatenating arrays. This can happen if the result set is empty. Returning empty arrays. Error: %s",
            e,
        )
        return np.array([], dtype=np.dtype("datetime64[ns]")), {}
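Note that when the result set is empty, the ValueError raised during concatenation is logged and empty arrays are returned (hypothetical `empty_table`):

>>> idx, cols = qdbnp.read_arrays(conn, [empty_table])
>>> len(idx), cols
(0, {})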
def stream_arrays(conn: Cluster,
                  tables: List[TableLike],
                  *,
                  batch_size: Optional[int] = 65536,
                  column_names: Optional[Sequence[str]] = None,
                  ranges: Optional[RangeSet] = None) -> Iterator[Tuple[numpy.ndarray, Dict[str, numpy.ma.MaskedArray]]]
def stream_arrays(
    conn: Cluster,
    tables: List[TableLike],
    *,
    batch_size: Optional[int] = 2**16,
    column_names: Optional[Sequence[str]] = None,
    ranges: Optional[RangeSet] = None,
) -> Iterator[IndexedMaskedArrays]:
    """
    Read one or more tables as numpy masked arrays. Returns a generator with
    indexed batches of size `batch_size`, which is useful when traversing a
    large dataset which does not fit into memory.
    """
    # Sanitize batch_size
    if batch_size is None:
        batch_size = 2**16
    elif not isinstance(batch_size, int):
        raise TypeError(
            "batch_size should be an integer, but got: {} with value {}".format(
                type(batch_size), str(batch_size)
            )
        )

    kwargs: Dict[str, Any] = {"batch_size": batch_size}

    if column_names:
        kwargs["column_names"] = column_names
    if ranges:
        kwargs["ranges"] = ranges

    coerce_table_name_fn = lambda x: x if isinstance(x, str) else x.get_name()
    kwargs["table_names"] = [coerce_table_name_fn(x) for x in tables]

    with conn.reader(**kwargs) as reader:
        for batch in reader:
            yield _reader_batch_to_arrays(batch)
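A streaming sketch (hypothetical `my_table` and `process` function); each iteration yields one indexed batch, so the full dataset never has to fit in memory:

>>> for idx, cols in qdbnp.stream_arrays(conn, [my_table], batch_size=2**16):
...     process(idx, cols["close"])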
def write_arrays(data: Any,
                 cluster: quasardb.Cluster,
                 table: Optional[Union[str, Table]] = None,
                 *,
                 dtype: Optional[Union[DType, Dict[str, Optional[DType]], List[Optional[DType]]]] = None,
                 index: Optional[NDArrayTime] = None,
                 push_mode: Optional[quasardb.WriterPushMode] = None,
                 fast: bool = False,
                 truncate: Union[bool, Tuple[Any, ...]] = False,
                 truncate_range: Optional[Tuple[Any, ...]] = None,
                 deduplicate: Union[bool, str, List[str]] = False,
                 deduplication_mode: str = 'drop',
                 infer_types: bool = True,
                 writer: Optional[Writer] = None,
                 write_through: bool = True,
                 retries: Union[int, quasardb.RetryOptions] = 3,
                 **kwargs: Any) -> List[Table]
def write_arrays(
    data: Any,
    cluster: quasardb.Cluster,
    table: Optional[Union[str, Table]] = None,
    *,
    dtype: Optional[
        Union[DType, Dict[str, Optional[DType]], List[Optional[DType]]]
    ] = None,
    index: Optional[NDArrayTime] = None,
    # TODO: Set the default push_mode after removing _async, fast and truncate
    push_mode: Optional[quasardb.WriterPushMode] = None,
    _async: bool = False,
    fast: bool = False,
    truncate: Union[bool, Tuple[Any, ...]] = False,
    truncate_range: Optional[Tuple[Any, ...]] = None,
    deduplicate: Union[bool, str, List[str]] = False,
    deduplication_mode: str = "drop",
    infer_types: bool = True,
    writer: Optional[Writer] = None,
    write_through: bool = True,
    retries: Union[int, quasardb.RetryOptions] = 3,
    # We accept additional kwargs that will be passed through the writer.push() methods
    **kwargs: Any,
) -> List[Table]:
    """
    Write multiple aligned numpy arrays to a table.

    Parameters:
    -----------
    data: Iterable of np.array, or dict-like of str:np.array
      Numpy arrays to write into the database. Can either be a list of numpy
      arrays, in which case they are expected to be in the same order as
      table.list_columns(), and an array is provided for each of the columns.
      If `index` is None, the first array will be assumed to be an index with
      dtype `datetime64[ns]`.

      Alternatively, a dict of key/values may be provided, where the key is
      expected to be a table column label, and the value is expected to be a
      np.array. If present, a column with label '$timestamp' will be used as
      the index.

      In all cases, all numpy arrays are expected to be of exactly the same
      length as the index.

    cluster: quasardb.Cluster
      Active connection to the QuasarDB cluster

    table: quasardb.Table or str
      Either a string or a reference to a QuasarDB Timeseries table object.
      For example, 'my_table' or cluster.table('my_table') are both valid
      values. Defaults to None.

    index: optional np.array with dtype datetime64[ns]
      Optionally explicitly provide an array as the $timestamp index. If not
      provided, the first array provided to `data` will be used as the index.

    dtype: optional dtype, list of dtype, or dict of dtype
      Optional data type to force. If a single dtype, will force that dtype
      onto all columns. If list-like, will map dtypes to dataframe columns by
      their offset. If dict-like, will map dtypes to dataframe columns by
      their label.

      If a dtype for a column is provided in this argument, and infer_types is
      also True, this argument takes precedence.

    deduplicate: bool or list[str]
      Enables server-side deduplication of data when it is written into the
      table. When True, automatically deduplicates rows when all values of a
      row are identical. When a list of strings is provided, deduplicates only
      based on the values of these columns. Defaults to False.

    deduplication_mode: 'drop' or 'upsert'
      When `deduplicate` is enabled, decides how deduplication is performed.
      'drop' means any newly written duplicates are dropped, whereas 'upsert'
      means that the previously written data is updated to reflect the new
      data. Defaults to 'drop'.

    infer_types: optional bool
      If true, will attempt to convert types from Python to QuasarDB native
      types if the provided dataframe has incompatible types. For example, a
      dataframe with integers will automatically convert these to doubles if
      the QuasarDB table expects it. Defaults to True.

      For production use cases where you want to avoid implicit conversions,
      we recommend setting this to False.

    push_mode: optional quasardb.WriterPushMode
      The mode used for inserting data. Can be either a string or a
      `WriterPushMode` enumeration item. Available options:

      * `Truncate`: Truncate (also referred to as upsert) the data in-place.
        Will detect time range to truncate from the time range inside the
        dataframe.
      * `Async`: Uses asynchronous insertion API where commits are buffered
        server-side and acknowledged before they are written to disk. If you
        insert to the same table from multiple processes, setting this to True
        may improve performance.
      * `Fast`: Whether to use 'fast push'. If you incrementally add small
        batches of data to the table, you may see better performance if you
        set this to True.
      * `Transactional`: Ensures full transactional consistency.

      Defaults to `Transactional`.

    truncate: optional bool
      **DEPRECATED** - Use `push_mode=WriterPushMode.Truncate` instead.
      Truncate (also referred to as upsert) the data in-place. Will detect
      time range to truncate from the time range inside the dataframe.
      Defaults to False.

    truncate_range: optional tuple
      Time range to truncate from the time range inside the dataframe.

    _async: optional bool
      **DEPRECATED** - Use `push_mode=WriterPushMode.Async` instead. If true,
      uses asynchronous insertion API where commits are buffered server-side
      and acknowledged before they are written to disk. If you insert to the
      same table from multiple processes, setting this to True may improve
      performance. Defaults to False.

    fast: optional bool
      **DEPRECATED** - Use `push_mode=WriterPushMode.Fast` instead. Whether to
      use 'fast push'. If you incrementally add small batches of data to the
      table, you may see better performance if you set this to True. Defaults
      to False.

    write_through: optional bool
      If True, data is not cached after write; if False, caching is left at
      the discretion of the server. Defaults to True.

    writer: optional quasardb.Writer
      Allows you to explicitly provide a Writer to use, which is expected to
      be initialized with the `table`. Reuse of the Writer allows for some
      performance improvements.

    retries: optional int or quasardb.RetryOptions
      Number of times to retry in case of a push failure. This is useful in
      case of async pipeline failures, or when doing transactional inserts
      that may occasionally cause transaction conflicts. Retries use
      exponential backoff: the delay starts at 3 seconds and doubles every
      retry attempt.

      Alternatively, a quasardb.RetryOptions object can be passed to more
      carefully fine-tune retry behavior.
    """
    if table:
        logger.debug("table explicitly provided, assuming single-table write")
        data = [(table, data)]
        table = None

    _type_check(push_mode, "push_mode", target_type=quasardb.WriterPushMode)

    deprecation_stacklevel = kwargs.pop("deprecation_stacklevel", 1) + 1

    if isinstance(truncate, tuple):
        # Special case: truncate might be a tuple indicating the range.
        _kwarg_deprecation_warning(
            "truncate",
            truncate,
            ["push_mode", "truncate_range"],
            [quasardb.WriterPushMode.Truncate, truncate],
            deprecation_stacklevel,
        )
        truncate_range = truncate_range or truncate
        truncate = True

    kwarg_to_mode = {
        # "kwarg": (kwarg_type, kwarg_push_mode, is_deprecated)
        "fast": (bool, quasardb.WriterPushMode.Fast, True),
        "_async": (bool, quasardb.WriterPushMode.Async, True),
        "truncate": (bool, quasardb.WriterPushMode.Truncate, True),
        "truncate_range": (tuple, quasardb.WriterPushMode.Truncate, False),
    }

    for kwarg, info in kwarg_to_mode.items():
        expected_type, mode, deprecated = info
        kwarg_value = locals()[kwarg]
        _type_check(kwarg_value, kwarg, target_type=expected_type)
        if kwarg_value:
            if push_mode and push_mode != mode:
                raise quasardb.InvalidArgumentError(
                    f"Found '{kwarg}' in kwargs, but push mode is already set to {push_mode}"
                )
            push_mode = mode
            if deprecated:
                _kwarg_deprecation_warning(
                    kwarg, kwarg_value, ["push_mode"], [mode], deprecation_stacklevel
                )

    if not push_mode:
        push_mode = quasardb.WriterPushMode.Transactional

    # Create batch column info from dataframe
    if writer is None:
        writer = cluster.writer()

    ret: List[Table] = []
    n_rows = 0
    push_data = quasardb.WriterData()

    for table_, data_ in data:
        # Acquire reference to table_ if string is provided
        if isinstance(table_, str):
            table_ = table_cache.lookup(table_, cluster)

        cinfos = [(x.name, x.type) for x in table_.list_columns()]

        dtype_ = _coerce_dtype(dtype, cinfos)
        assert type(dtype_) is list
        assert len(dtype_) == len(cinfos)

        if index is None and isinstance(data_, dict) and "$timestamp" in data_:
            # Create shallow copy of `data_` so that we don't modify the reference,
            # i.e. delete keys.
            #
            # This ensures that the user can call the same function multiple times
            # without side-effects.
            data_ = data_.copy()
            index_ = data_.pop("$timestamp")
            if ma.isMA(index_):
                # Index might be a masked array
                index_ = index_.data
            assert "$timestamp" not in data_
        elif index is not None:
            index_ = index
        else:
            raise RuntimeError("Invalid index: no index provided.")

        assert index_ is not None

        if infer_types is True:
            dtype_ = _add_desired_dtypes(dtype_, cinfos)

        data_ = _ensure_list(data_, cinfos)
        if len(data_) != len(cinfos):
            raise InvalidDataCardinalityError(data_, cinfos)

        data_ = ensure_ma(data_, dtype=dtype_)
        assert isinstance(data_, list)

        data_ = _coerce_data(data_, dtype_)

        # Just some additional friendly information about incorrect dtypes, we'd
        # prefer to have this information thrown from Python instead of native
        # code as it generally makes for somewhat better error context.
        _validate_dtypes(data_, cinfos)

        deduplicate = _coerce_deduplicate(deduplicate, deduplication_mode, cinfos)

        # Sanity check
        assert len(data_) == len(cinfos)
        for i in range(len(data_)):
            assert len(data_[i]) == len(index_)

        push_data.append(table_, index_, data_)
        n_rows += len(index_)
        ret.append(table_)

    retries = _coerce_retries(retries)

    # By default, we push all additional kwargs to the writer.push() function.
    # This allows transparent propagation of arguments.
    #
    # The initial use case was so that we can add additional parameters for test
    # mocks, e.g. `mock_failures`, so that we can validate the retry functionality.
    push_kwargs = kwargs
    push_kwargs["deduplicate"] = deduplicate
    push_kwargs["deduplication_mode"] = deduplication_mode
    push_kwargs["write_through"] = write_through
    push_kwargs["retries"] = retries
    push_kwargs["push_mode"] = push_mode

    if truncate_range:
        push_kwargs["range"] = truncate_range

    logger.debug("pushing %d rows", n_rows)
    start = time.time()
    writer.push(push_data, **push_kwargs)
    logger.debug("pushed %d rows in %s seconds", n_rows, (time.time() - start))

    return ret
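A minimal write sketch (hypothetical table `my_table` with a single `close` column; the `$timestamp` key doubles as the index):

>>> import numpy as np
>>> data = {
...     "$timestamp": np.array(["2024-01-01", "2024-01-02"], dtype="datetime64[ns]"),
...     "close": np.array([101.5, 102.25]),
... }
>>> tables = qdbnp.write_arrays(data, conn, "my_table",
...                             push_mode=quasardb.WriterPushMode.Fast,
...                             deduplicate=True,
...                             retries=3)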
Classes
class IncompatibleDtypeError (cname: Optional[str] = None,
                              ctype: Optional[quasardb.ColumnType] = None,
                              expected: Optional[List[DType]] = None,
                              provided: Optional[DType] = None)
class IncompatibleDtypeError(TypeError):
    """
    Exception raised when a provided dtype is not the expected dtype.
    """

    def __init__(
        self,
        cname: Optional[str] = None,
        ctype: Optional[quasardb.ColumnType] = None,
        expected: Optional[List[DType]] = None,
        provided: Optional[DType] = None,
    ):
        self.cname = cname
        self.ctype = ctype
        self.expected = expected
        self.provided = provided
        super().__init__(self.msg())

    def msg(self) -> str:
        return "Data for column '{}' with type '{}' was provided in dtype '{}' but needs '{}'.".format(
            self.cname, self.ctype, self.provided, self.expected
        )
Ancestors
- builtins.TypeError
- builtins.Exception
- builtins.BaseException
Methods
def msg(self) -> str
def msg(self) -> str:
    return "Data for column '{}' with type '{}' was provided in dtype '{}' but needs '{}'.".format(
        self.cname, self.ctype, self.provided, self.expected
    )
class IncompatibleDtypeErrors (xs: List[IncompatibleDtypeError])
class IncompatibleDtypeErrors(TypeError):
    """
    Wraps multiple dtype errors
    """

    def __init__(self, xs: List[IncompatibleDtypeError]):
        self.xs = xs
        super().__init__(self.msg())

    def msg(self) -> str:
        return "\n".join(x.msg() for x in self.xs)
Ancestors
- builtins.TypeError
- builtins.Exception
- builtins.BaseException
Methods
def msg(self) -> str
def msg(self) -> str:
    return "\n".join(x.msg() for x in self.xs)
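For illustration, a sketch of handling dtype validation failures during a write; this assumes `data` contains arrays whose dtypes don't match the table's columns and that the validation error propagates out of `write_arrays`:

>>> try:
...     qdbnp.write_arrays(data, conn, "my_table", infer_types=False)
... except IncompatibleDtypeErrors as e:
...     print(e.msg())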
class InvalidDataCardinalityError (data: List[Any], cinfos: List[Any])
class InvalidDataCardinalityError(ValueError):
    """
    Raised when the provided data arrays don't match the table's columns.
    """

    def __init__(self, data: List[Any], cinfos: List[Any]) -> None:
        self.data = data
        self.cinfos = cinfos
        super().__init__(self.msg())

    def msg(self) -> str:
        return "Provided data array length '{}' exceeds the number of table columns '{}', unable to map data to columns".format(
            len(self.data), len(self.cinfos)
        )
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
Methods
def msg(self) -> str
def msg(self) -> str:
    return "Provided data array length '{}' exceeds the number of table columns '{}', unable to map data to columns".format(
        len(self.data), len(self.cinfos)
    )
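A sketch of the failure mode, assuming a hypothetical two-column table and one data array too many; the exact counts in the message depend on how the data maps to columns:

>>> qdbnp.write_arrays([idx, a, b, c], conn, "my_table")
Traceback (most recent call last):
  ...
InvalidDataCardinalityError: ...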
class NumpyRequired (*args, **kwargs)
class NumpyRequired(ImportError):
    """
    Exception raised when trying to use the QuasarDB numpy integration, but
    numpy has not been installed.
    """

    pass
Ancestors
- builtins.ImportError
- builtins.Exception
- builtins.BaseException