Package quasardb.pandas
Functions
def query(cluster: Cluster,
query: str,
index: Optional[str] = None,
blobs: bool = False,
numpy: bool = True) -> pandas.core.frame.DataFrame
def query(
    cluster: Cluster,
    query: str,
    index: Optional[str] = None,
    blobs: bool = False,
    numpy: bool = True,
) -> pd.DataFrame:
    """
    Execute *query* and return the result as a pandas DataFrame.

    Parameters
    ----------
    cluster : quasardb.Cluster
        Active connection to the QuasarDB cluster.
    query : str
        The query to execute.
    index : str | None, default None
        Column to use as index. When None a synthetic index is created
        and named "$index".
    blobs, numpy
        DEPRECATED - no longer used. Supplying a non-default value raises
        a DeprecationWarning and the argument is ignored.
    """
    # ------------------------------------------------------------------ deprecations
    if blobs is not False:
        warnings.warn(
            "`blobs` is deprecated and will be removed in a future version; "
            "the argument is ignored.",
            DeprecationWarning,
            stacklevel=2,
        )
    if numpy is not True:
        warnings.warn(
            "`numpy` is deprecated and will be removed in a future version; "
            "the argument is ignored.",
            DeprecationWarning,
            stacklevel=2,
        )
    # ------------------------------------------------------------------------------

    logger.debug("querying and returning as DataFrame: %s", query)

    index_vals, m = qdbnp.query(cluster, query, index=index, dict=True)
    index_name = "$index" if index is None else index
    index_obj = pd.Index(index_vals, name=index_name)

    return pd.DataFrame(m, index=index_obj)

Execute query and return the result as a pandas DataFrame.
Parameters
cluster : quasardb.Cluster
    Active connection to the QuasarDB cluster.
query : str
    The query to execute.
index : str | None, default None
    Column to use as index. When None a synthetic index is created and named "$index".
blobs, numpy
    DEPRECATED - no longer used. Supplying a non-default value raises a DeprecationWarning and the argument is ignored.
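A minimal usage sketch; the cluster URI and the "stocks" table below are illustrative placeholders, not values defined by this API:

import quasardb
import quasardb.pandas as qdbpd

with quasardb.Cluster("qdb://127.0.0.1:2836") as cluster:
    # Default: rows receive a synthetic index named "$index".
    df = qdbpd.query(cluster, "SELECT * FROM stocks")

    # Use a result column -- here "$timestamp" -- as the index instead.
    df = qdbpd.query(cluster, "SELECT * FROM stocks", index="$timestamp")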
def read_dataframe(conn: Cluster,
table: TableLike,
*,
batch_size: Optional[int] = 65536,
column_names: Optional[List[str]] = None,
ranges: Optional[RangeSet] = None) -> pandas.core.frame.DataFrame
def read_dataframe(
    conn: Cluster,
    table: TableLike,
    *,
    batch_size: Optional[int] = 2**16,
    column_names: Optional[List[str]] = None,
    ranges: Optional[RangeSet] = None,
) -> pd.DataFrame:
    """
    Read a Pandas DataFrame from a QuasarDB timeseries table.

    Wraps around stream_dataframe() and returns everything as a single
    dataframe. batch_size is always overridden to 2^16 (65536).

    Parameters:
    -----------
    conn : quasardb.Cluster
        Connection to the QuasarDB database.
    table : str | quasardb.Table
        QuasarDB table to stream, either as a string or a table object.
        When re-executing the same function multiple times on the same
        tables, providing the table as an object has a performance benefit.
    """
    if batch_size is not None and batch_size != 0:
        logger.warning(
            "Providing a batch size with read_dataframe is unsupported, overriding batch_size to 65536."
        )
        logger.warning(
            "If you wish to traverse the data in smaller batches, please use: stream_dataframe()."
        )
        batch_size = 2**16

    # Note that this is *lazy*: dfs is a generator, not a list. Dataframes are
    # fetched on demand, which means an error can occur in the middle of
    # processing them.
    dfs = stream_dataframe(
        conn, table, batch_size=batch_size, column_names=column_names, ranges=ranges
    )

    # If the result of stream_dataframe is empty, pd.concat() raises a ValueError.
    # As stream_dataframe is a generator, there is no easy way to check for this
    # condition without evaluating it; the simplest way is to catch the ValueError
    # and return an empty DataFrame.
    try:
        return pd.concat(dfs, copy=False)  # type: ignore[call-overload]
    except ValueError as e:
        logger.error(
            "Error while concatenating dataframes. This can happen if result set is empty. Returning empty dataframe. Error: %s",
            e,
        )
        return pd.DataFrame()

Read a Pandas DataFrame from a QuasarDB timeseries table. Wraps around stream_dataframe() and returns everything as a single dataframe; batch_size is always overridden to 2^16 (65536).
Parameters:
conn : quasardb.Cluster
    Connection to the QuasarDB database.
table : str | quasardb.Table
    QuasarDB table to stream, either as a string or a table object. When re-executing the same function multiple times on the same tables, providing the table as an object has a performance benefit.
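For example (URI and table name illustrative, as above):

import quasardb
import quasardb.pandas as qdbpd

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    # Read the whole table into one DataFrame, indexed by "$timestamp".
    df = qdbpd.read_dataframe(conn, "stocks")

    # Restrict the read to specific columns; "$timestamp" is always included.
    df = qdbpd.read_dataframe(conn, "stocks", column_names=["open", "close"])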
def stream_dataframe(conn: Cluster,
table: TableLike,
*,
batch_size: Optional[int] = 65536,
column_names: Optional[List[str]] = None,
ranges: Optional[RangeSet] = None) -> Iterator[pandas.core.frame.DataFrame]
def stream_dataframe(
    conn: Cluster,
    table: TableLike,
    *,
    batch_size: Optional[int] = 2**16,
    column_names: Optional[List[str]] = None,
    ranges: Optional[RangeSet] = None,
) -> Iterator[pd.DataFrame]:
    """
    Read a single table and return a stream of dataframes.

    This is a convenience function that wraps around `stream_dataframes`.
    """
    # For backwards compatibility, we drop the `$table` column returned: this is
    # not strictly necessary, but it is somewhat reasonable to drop it when
    # reading from a single table, which is the case here.
    clean_df_fn = lambda df: df.drop(columns=["$table"])

    return (
        clean_df_fn(df)
        for df in stream_dataframes(
            conn,
            [table],
            batch_size=batch_size,
            column_names=column_names,
            ranges=ranges,
        )
    )

Read a single table and return a stream of dataframes. This is a convenience function that wraps around stream_dataframes().
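A sketch of batch-wise traversal (table name illustrative):

import quasardb
import quasardb.pandas as qdbpd

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    # Each iteration yields a DataFrame of at most `batch_size` rows,
    # keeping memory usage bounded even for very large tables.
    for df in qdbpd.stream_dataframe(conn, "stocks", batch_size=10000):
        print(df.index.min(), df.index.max(), len(df))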
def stream_dataframes(conn: Cluster,
tables: List[TableLike],
*,
batch_size: Optional[int] = 65536,
column_names: Optional[List[str]] = None,
ranges: Optional[RangeSet] = None) -> Iterator[pandas.core.frame.DataFrame]
def stream_dataframes(
    conn: Cluster,
    tables: List[TableLike],
    *,
    batch_size: Optional[int] = 2**16,
    column_names: Optional[List[str]] = None,
    ranges: Optional[RangeSet] = None,
) -> Iterator[pd.DataFrame]:
    """
    Read Pandas DataFrames from QuasarDB timeseries tables.

    Returns a generator with dataframes of size `batch_size`, which is useful
    when traversing a large dataset which does not fit into memory.

    Parameters:
    -----------
    conn : quasardb.Cluster
        Connection to the QuasarDB database.
    tables : list[str | quasardb.Table]
        QuasarDB tables to stream, as a list of strings or quasardb table objects.
    batch_size : int
        The number of rows to fetch in a single read operation. If unset, uses
        2^16 (65536) rows as batch size by default.
    column_names : optional list
        List of columns to read in dataframe. The timestamp column '$timestamp'
        is always read. Defaults to all columns.
    ranges : optional list
        A list of time ranges to read, represented as tuples of Numpy
        datetime64[ns] objects. Defaults to the entire table.
    """
    for idx, xs in qdbnp.stream_arrays(
        conn,
        tables,
        batch_size=batch_size,
        column_names=column_names,
        ranges=ranges,
    ):
        yield pd.DataFrame(xs, index=pd.Index(idx, copy=False, name="$timestamp"))

Read Pandas DataFrames from QuasarDB timeseries tables. Returns a generator with dataframes of size batch_size, which is useful when traversing a large dataset which does not fit into memory.

Parameters:
conn : quasardb.Cluster
    Connection to the QuasarDB database.
tables : list[str | quasardb.Table]
    QuasarDB tables to stream, as a list of strings or quasardb table objects.
batch_size : int
    The number of rows to fetch in a single read operation. If unset, uses 2^16 (65536) rows as batch size by default.
column_names : optional list
    List of columns to read in dataframe. The timestamp column '$timestamp' is always read. Defaults to all columns.
ranges : optional list
    A list of time ranges to read, represented as tuples of Numpy datetime64[ns] objects. Defaults to the entire table.
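A sketch combining multiple tables with an explicit time range (table names and dates are illustrative):

import numpy as np
import quasardb
import quasardb.pandas as qdbpd

# Time ranges are tuples of numpy datetime64[ns] values.
start = np.datetime64("2024-01-01", "ns")
end = np.datetime64("2024-01-02", "ns")

with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    for df in qdbpd.stream_dataframes(
        conn,
        ["stocks", "indexes"],
        batch_size=10000,
        ranges=[(start, end)],
    ):
        # Unlike stream_dataframe(), each batch retains its "$table" column,
        # identifying which table the rows came from.
        print(df["$table"].iloc[0], len(df))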
def write_dataframe(df: pd.DataFrame,
cluster: quasardb.Cluster,
table: TableLike,
*,
create: bool = False,
shard_size: Optional[timedelta] = None,
dtype: Optional[Union[DType, Dict[str, Optional[DType]], List[Optional[DType]]]] = None,
push_mode: Optional[quasardb.WriterPushMode] = None,
fast: bool = False,
truncate: Union[bool, Range] = False,
truncate_range: Optional[Range] = None,
deduplicate: Union[bool, str, List[str]] = False,
deduplication_mode: str = 'drop',
infer_types: bool = True,
writer: Optional[Writer] = None,
write_through: bool = True,
retries: Union[int, quasardb.RetryOptions] = 3,
**kwargs: Any) -> List[Table]
def write_dataframe(
    df: pd.DataFrame,
    cluster: quasardb.Cluster,
    table: TableLike,
    *,
    create: bool = False,
    shard_size: Optional[timedelta] = None,
    # numpy.write_arrays passthrough options
    dtype: Optional[
        Union[DType, Dict[str, Optional[DType]], List[Optional[DType]]]
    ] = None,
    push_mode: Optional[quasardb.WriterPushMode] = None,
    _async: bool = False,
    fast: bool = False,
    truncate: Union[bool, Range] = False,
    truncate_range: Optional[Range] = None,
    deduplicate: Union[bool, str, List[str]] = False,
    deduplication_mode: str = "drop",
    infer_types: bool = True,
    writer: Optional[Writer] = None,
    write_through: bool = True,
    retries: Union[int, quasardb.RetryOptions] = 3,
    **kwargs: Any,
) -> List[Table]:
    """
    Store a single dataframe into a table.

    Takes the same arguments as `write_dataframes`, except only a single
    df/table combination.
    """
    kwargs["deprecation_stacklevel"] = kwargs.get("deprecation_stacklevel", 1) + 1
    return write_dataframes(
        [(table, df)],
        cluster,
        create=create,
        shard_size=shard_size,
        dtype=dtype,
        push_mode=push_mode,
        _async=_async,
        fast=fast,
        truncate=truncate,
        truncate_range=truncate_range,
        deduplicate=deduplicate,
        deduplication_mode=deduplication_mode,
        infer_types=infer_types,
        writer=writer,
        write_through=write_through,
        retries=retries,
        **kwargs,
    )

Store a single dataframe into a table. Takes the same arguments as write_dataframes(), except only a single df/table combination.
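A write sketch; the table name, column names, and shard size are arbitrary illustrations:

from datetime import timedelta

import pandas as pd
import quasardb
import quasardb.pandas as qdbpd

# write_dataframe uses the DataFrame index as the rows' datetime64[ns] timestamps.
idx = pd.date_range("2024-01-01", periods=3, freq="s")
df = pd.DataFrame({"open": [1.0, 2.0, 3.0], "volume": [10, 20, 30]}, index=idx)

with quasardb.Cluster("qdb://127.0.0.1:2836") as cluster:
    # Create the table on first write; shard_size is only valid when create=True.
    qdbpd.write_dataframe(
        df,
        cluster,
        "stocks",
        create=True,
        shard_size=timedelta(days=1),
    )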
def write_dataframes(dfs: Union[Dict[TableLike, pd.DataFrame], List[tuple[TableLike, pd.DataFrame]]],
cluster: quasardb.Cluster,
*,
create: bool = False,
shard_size: Optional[timedelta] = None,
dtype: Optional[Union[DType, Dict[str, Optional[DType]], List[Optional[DType]]]] = None,
push_mode: Optional[quasardb.WriterPushMode] = None,
fast: bool = False,
truncate: Union[bool, Range] = False,
truncate_range: Optional[Range] = None,
deduplicate: Union[bool, str, List[str]] = False,
deduplication_mode: str = 'drop',
infer_types: bool = True,
writer: Optional[Writer] = None,
write_through: bool = True,
retries: Union[int, quasardb.RetryOptions] = 3,
**kwargs: Any) -> List[Table]
def write_dataframes(
    dfs: Union[
        Dict[TableLike, pd.DataFrame],
        List[tuple[TableLike, pd.DataFrame]],
    ],
    cluster: quasardb.Cluster,
    *,
    create: bool = False,
    shard_size: Optional[timedelta] = None,
    # numpy.write_arrays passthrough options
    dtype: Optional[
        Union[DType, Dict[str, Optional[DType]], List[Optional[DType]]]
    ] = None,
    push_mode: Optional[quasardb.WriterPushMode] = None,
    _async: bool = False,
    fast: bool = False,
    truncate: Union[bool, Range] = False,
    truncate_range: Optional[Range] = None,
    deduplicate: Union[bool, str, List[str]] = False,
    deduplication_mode: str = "drop",
    infer_types: bool = True,
    writer: Optional[Writer] = None,
    write_through: bool = True,
    retries: Union[int, quasardb.RetryOptions] = 3,
    **kwargs: Any,
) -> List[Table]:
    """
    Store dataframes into tables.

    Any additional parameters not documented here are passed to
    numpy.write_arrays(). Please consult the pydoc of that function for
    additional accepted parameters.

    Parameters:
    -----------
    dfs : dict[str | quasardb.Table, pd.DataFrame] | list[tuple[str | quasardb.Table, pd.DataFrame]]
        Either a dict that maps tables (objects or names) to dataframes,
        or a list of table/dataframe tuples.
    cluster : quasardb.Cluster
        Active connection to the QuasarDB cluster.
    create : optional bool
        Whether to create the table. Defaults to False.
    shard_size : optional datetime.timedelta
        The shard size of the timeseries you wish to create when `create`
        is True.
    """
    # If dfs is a dict, we convert it to a list of tuples.
    if isinstance(dfs, dict):
        dfs = list(dfs.items())

    if shard_size is not None and create is False:
        raise ValueError("Invalid argument: shard size provided while create is False")

    # If the tables are provided as strings, we look them up.
    dfs_ = []
    for table, df in dfs:
        if isinstance(table, str):
            table = table_cache.lookup(table, cluster)
        dfs_.append((table, df))

    data_by_table = []
    for table, df in dfs_:
        logger.debug("quasardb.pandas.write_dataframe, create = %s", create)
        assert isinstance(df, pd.DataFrame)

        # Create table if requested
        if create:
            _create_table_from_df(df, table, shard_size)

        cinfos = [(x.name, x.type) for x in table.list_columns()]

        if not df.index.is_monotonic_increasing:
            logger.warning(
                "dataframe index is unsorted, resorting dataframe based on index"
            )
            df = df.sort_index().reindex()

        # We pass everything else to our qdbnp.write_arrays function, as generally
        # speaking it is (much) more sensible to deal with numpy arrays than Pandas
        # dataframes: pandas has the bad habit of wanting to cast data to different
        # types if your data is sparse, most notably forcing sparse integer arrays
        # to floating points.
        data = _extract_columns(df, cinfos)
        # We cast to masked_array to enforce typing compliance.
        data["$timestamp"] = ma.masked_array(
            df.index.to_numpy(copy=False, dtype="datetime64[ns]")
        )

        data_by_table.append((table, data))

    kwargs["deprecation_stacklevel"] = kwargs.get("deprecation_stacklevel", 1) + 1
    return qdbnp.write_arrays(
        data_by_table,
        cluster,
        table=None,
        index=None,
        dtype=dtype,
        push_mode=push_mode,
        _async=_async,
        fast=fast,
        truncate=truncate,
        truncate_range=truncate_range,
        deduplicate=deduplicate,
        deduplication_mode=deduplication_mode,
        infer_types=infer_types,
        writer=writer,
        write_through=write_through,
        retries=retries,
        **kwargs,
    )

Store dataframes into tables. Any additional parameters not documented here are passed to numpy.write_arrays(). Please consult the pydoc of that function for additional accepted parameters.
Parameters:
dfs : dict[str | quasardb.Table, pd.DataFrame] | list[tuple[str | quasardb.Table, pd.DataFrame]]
    Either a dict that maps tables (objects or names) to dataframes, or a list of table/dataframe tuples.
cluster : quasardb.Cluster
    Active connection to the QuasarDB cluster.
create : optional bool
    Whether to create the table. Defaults to False.
shard_size : optional datetime.timedelta
    The shard size of the timeseries you wish to create when `create` is True.
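A sketch of the dict form (table and column names illustrative); the same data could equally be passed as a list of (table, dataframe) tuples:

import pandas as pd
import quasardb
import quasardb.pandas as qdbpd

idx = pd.date_range("2024-01-01", periods=2, freq="s")

# Maps each table (name or quasardb.Table object) to its DataFrame.
dfs = {
    "stocks": pd.DataFrame({"open": [1.0, 2.0]}, index=idx),
    "indexes": pd.DataFrame({"value": [10.0, 20.0]}, index=idx),
}

with quasardb.Cluster("qdb://127.0.0.1:2836") as cluster:
    qdbpd.write_dataframes(dfs, cluster, create=True)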
def write_pinned_dataframe(df: pd.DataFrame,
cluster: quasardb.Cluster,
table: TableLike,
*,
create: bool = False,
shard_size: Optional[timedelta] = None,
dtype: Optional[Union[DType, Dict[str, Optional[DType]], List[Optional[DType]]]] = None,
push_mode: Optional[quasardb.WriterPushMode] = None,
fast: bool = False,
truncate: Union[bool, Range] = False,
truncate_range: Optional[Range] = None,
deduplicate: Union[bool, str, List[str]] = False,
deduplication_mode: str = 'drop',
infer_types: bool = True,
writer: Optional[Writer] = None,
write_through: bool = True,
retries: Union[int, quasardb.RetryOptions] = 3,
**kwargs: Any) -> List[Table]
def write_pinned_dataframe(
    df: pd.DataFrame,
    cluster: quasardb.Cluster,
    table: TableLike,
    *,
    create: bool = False,
    shard_size: Optional[timedelta] = None,
    # numpy.write_arrays passthrough options
    dtype: Optional[
        Union[DType, Dict[str, Optional[DType]], List[Optional[DType]]]
    ] = None,
    push_mode: Optional[quasardb.WriterPushMode] = None,
    _async: bool = False,
    fast: bool = False,
    truncate: Union[bool, Range] = False,
    truncate_range: Optional[Range] = None,
    deduplicate: Union[bool, str, List[str]] = False,
    deduplication_mode: str = "drop",
    infer_types: bool = True,
    writer: Optional[Writer] = None,
    write_through: bool = True,
    retries: Union[int, quasardb.RetryOptions] = 3,
    **kwargs: Any,
) -> List[Table]:
    """
    Legacy wrapper around write_dataframe().
    """
    logger.warning(
        "write_pinned_dataframe is deprecated and will be removed in a future release."
    )
    logger.warning("Please use write_dataframe directly instead")

    kwargs["deprecation_stacklevel"] = 2
    return write_dataframe(
        df,
        cluster,
        table,
        create=create,
        shard_size=shard_size,
        dtype=dtype,
        push_mode=push_mode,
        _async=_async,
        fast=fast,
        truncate=truncate,
        truncate_range=truncate_range,
        deduplicate=deduplicate,
        deduplication_mode=deduplication_mode,
        infer_types=infer_types,
        writer=writer,
        write_through=write_through,
        retries=retries,
        **kwargs,
    )

Legacy wrapper around write_dataframe().
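Migration is a rename of the call site. A sketch, assuming `df` and `cluster` are set up as in the write_dataframe example above:

import quasardb.pandas as qdbpd

# Deprecated: logs warnings, then forwards to write_dataframe().
tables = qdbpd.write_pinned_dataframe(df, cluster, "stocks")

# Equivalent, preferred call:
tables = qdbpd.write_dataframe(df, cluster, "stocks")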
Classes
class PandasRequired(*args, **kwargs)
class PandasRequired(ImportError):
    """
    Exception raised when trying to use QuasarDB pandas integration,
    but pandas has not been installed.
    """

    pass

Exception raised when trying to use QuasarDB pandas integration, but pandas has not been installed.
Ancestors
- builtins.ImportError
- builtins.Exception
- builtins.BaseException
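A sketch of defensive handling when pandas is an optional dependency; that the integration surfaces PandasRequired at import time is an assumption here, but since it subclasses ImportError, either can be caught:

try:
    import quasardb.pandas as qdbpd
except ImportError as exc:  # also catches PandasRequired (assumed raised on import)
    qdbpd = None
    print(f"QuasarDB pandas integration unavailable: {exc}")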