Module quasardb.pool

Functions

def initialize(*args, **kwargs)
Expand source code
def initialize(*args, **kwargs):
    """
    Initialize a new global SingletonPool. Forwards all arguments to the constructor of
    `SingletonPool()`.

    After initialization, the instance can be used by invoking `instance()`.

    Example usage:
    --------------

    ```
    import quasardb.pool as pool

    pool.initialize(cluster='qdb://127.0.0.1:2836')

    # ...

    def myfunction()
      with pool.instance().connect() as conn:
        conn.query(...)
    ```
    """
    global __instance
    __instance = SingletonPool(*args, **kwargs)

Initialize a new global SingletonPool. Forwards all arguments to the constructor of SingletonPool.

After initialization, the instance can be used by invoking instance().

Example usage:

import quasardb.pool as pool

pool.initialize(cluster='qdb://127.0.0.1:2836')

# ...

def myfunction()
  with pool.instance().connect() as conn:
    conn.query(...)
def instance() ‑> SingletonPool
Expand source code
def instance() -> SingletonPool:
    """
    Singleton accessor. Instance must have been initialized using initialize()
    prior to invoking this function.

    Returns:
    --------
    `SingletonPool`

    """
    global __instance
    assert (
        __instance is not None
    ), "Global connection pool uninitialized: please initialize by calling the initialize() function."
    return __instance

Singleton accessor. Instance must have been initialized using initialize() prior to invoking this function.

Returns:

SingletonPool

def with_conn(*, arg=0)
Expand source code
def with_conn(_fn=None, *, arg=0):
    """
    Decorator function that handles connection assignment, release and invocation for you.
    Should be used in conjuction with the global singleton accessor, see also: `initialize()`.

    By default, the decorator function injects the connection as the first argument to the
    function:
    ```
    import quasardb.pool as pool
    pool.initialize(...)

    @pool.with_conn()
    def myfunction(conn, arg1, arg2):
       conn.query(...)
    ```

    You can optionally provide an `arg` parameter to denote which named keyword to provide
    it as:
    ```
    import quasardb.pool as pool
    pool.initialize(...)

    @pool.with_conn(arg='conn')
    def myfunction(arg1, arg2, conn=None):
       conn.query(...)
    ```
    """

    def inner(fn):
        def wrapper(*args, **kwargs):
            pool = instance()

            with pool.connect() as conn:
                (args, kwargs) = _inject_conn_arg(conn, arg, args, kwargs)
                return fn(*args, **kwargs)

        return wrapper

    if _fn is None:
        return inner
    else:
        return inner(_fn)

Decorator function that handles connection assignment, release and invocation for you. Should be used in conjuction with the global singleton accessor, see also: initialize().

By default, the decorator function injects the connection as the first argument to the function:

import quasardb.pool as pool
pool.initialize(...)

@pool.with_conn()
def myfunction(conn, arg1, arg2):
   conn.query(...)

You can optionally provide an arg parameter to denote which named keyword to provide it as:

import quasardb.pool as pool
pool.initialize(...)

@pool.with_conn(arg='conn')
def myfunction(arg1, arg2, conn=None):
   conn.query(...)

Classes

class Pool (get_conn=None, **kwargs)
Expand source code
class Pool(object):
    """
    A connection pool. This class should not be initialized directly, but
    rather the subclass `SingletonPool` should be initialized.

    The constructor either accepts all regular `quasardb.Cluster()` connection parameters,
    or a `get_conn` parameter which is invoked any time a new connection should be
    created.

    Example usage:
    --------------

    Initialize the pool by passthrough of `quasardb.Cluster()` arguments:
    ```
    import quasardb.pool as pool

    pool.SingletonPool(uri='qdb://127.0.0.1:2836',
                       cluster_public_key='...',
                       user_private_key='...',
                       user_name='...')
    ```

    Initialize pool by providing a callback function
    ```
    import quasardb
    import quasardb.pool as pool

    def my_qdb_connection_create():
       # This function is invoked any time the pool needs to allocate
       # a new connection.
       return quasardb.Cluster(uri='qdb://127.0.0.1:2836',
                               cluster_public_key='...',
                               user_private_key='...',
                               user_name='...')

    pool.SingletonPool(get_conn=my_qdb_connection_create)
    ```

    """

    def __init__(self, get_conn=None, **kwargs):
        self._all_connections = []

        if get_conn is None:
            get_conn = functools.partial(_create_conn, **kwargs)

        if not callable(get_conn):
            raise TypeError("get_conn must be callable")

        self._get_conn = get_conn

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _create_conn(self):
        return SessionWrapper(self, self._get_conn())

    def close(self):
        """
        Close this connection pool, and all associated connections. This function
        is automatically invoked when used in a with-block or when using the global
        `instance()` singleton.
        """
        for conn in self._all_connections:
            logger.debug("closing connection {}".format(conn))
            conn.close()

    def connect(self) -> quasardb.Cluster:
        """
        Acquire a new connection from the pool. Returned connection must either
        be explicitly released using `pool.release()`, or should be wrapped in a
        with-block.

        Returns:
        --------
        `quasardb.Cluster`
        """
        logger.info("Acquiring connection from pool")
        return self._do_connect()

    def release(self, conn: quasardb.Cluster):
        """
        Put a connection back into the pool
        """
        logger.info("Putting connection back onto pool")
        return self._do_release(conn)

A connection pool. This class should not be initialized directly, but rather the subclass SingletonPool should be initialized.

The constructor either accepts all regular quasardb.Cluster() connection parameters, or a get_conn parameter which is invoked any time a new connection should be created.

Example usage:

Initialize the pool by passthrough of quasardb.Cluster() arguments:

import quasardb.pool as pool

pool.SingletonPool(uri='qdb://127.0.0.1:2836',
                   cluster_public_key='...',
                   user_private_key='...',
                   user_name='...')

Initialize pool by providing a callback function

import quasardb
import quasardb.pool as pool

def my_qdb_connection_create():
   # This function is invoked any time the pool needs to allocate
   # a new connection.
   return quasardb.Cluster(uri='qdb://127.0.0.1:2836',
                           cluster_public_key='...',
                           user_private_key='...',
                           user_name='...')

pool.SingletonPool(get_conn=my_qdb_connection_create)

Subclasses

Methods

def close(self)
Expand source code
def close(self):
    """
    Close this connection pool, and all associated connections. This function
    is automatically invoked when used in a with-block or when using the global
    `instance()` singleton.
    """
    for conn in self._all_connections:
        logger.debug("closing connection {}".format(conn))
        conn.close()

Close this connection pool, and all associated connections. This function is automatically invoked when used in a with-block or when using the global instance() singleton.

def connect(self) ‑> Cluster
Expand source code
def connect(self) -> quasardb.Cluster:
    """
    Acquire a new connection from the pool. Returned connection must either
    be explicitly released using `pool.release()`, or should be wrapped in a
    with-block.

    Returns:
    --------
    `quasardb.Cluster`
    """
    logger.info("Acquiring connection from pool")
    return self._do_connect()

Acquire a new connection from the pool. Returned connection must either be explicitly released using pool.release(), or should be wrapped in a with-block.

Returns:

quasardb.Cluster

def release(self,
conn: Cluster)
Expand source code
def release(self, conn: quasardb.Cluster):
    """
    Put a connection back into the pool
    """
    logger.info("Putting connection back onto pool")
    return self._do_release(conn)

Put a connection back into the pool

class SessionWrapper (pool, conn)
Expand source code
class SessionWrapper(object):
    def __init__(self, pool, conn):
        self._conn = conn
        self._pool = pool

    def __getattr__(self, attr):
        # This hack copies all the quasardb.Cluster() properties, functions and
        # whatnot, and pretends that this class is actually a quasardb.Cluster.
        #
        # The background is that when someone does this:
        #
        # with pool.connect() as conn:
        #   ...
        #
        # we don't want the the connection to be closed near the end, but rather
        # released back onto the pool.
        #
        # Now, my initial approach was to build a pool.Session class which inherited
        # from quasardb.Cluster, and just overload the __exit__ function there. But,
        # we want people to have the flexibility to pass in an external `get_conn` callback
        # in the pool, which establishes a connection, because they may have to look up
        # some dynamic security tokens. This function should then, in turn, return a vanilla
        # quasardb.Cluster() object.
        #
        # And this is why we end up with the solution below.
        if attr in self.__dict__:
            return getattr(self, attr)

        return getattr(self._conn, attr)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._pool.release(self._conn)
class SingletonPool (**kwargs)
Expand source code
class SingletonPool(Pool):
    """
    Implementation of our connection pool that maintains just a single connection
    per thread.

    Example usage:
    --------

    Using pool.connect() in a with-block:
    ```
    import quasardb.pool as pool

    with pool.Pool(uri='qdb://127.0.0.1:2836') as pool:
      with pool.connect() as conn:
        conn.query(...)
    ```

    Explicitly releasing the connection using `Pool.release()`:
    ```
    import quasardb.pool as pool

    with pool.Pool(uri='qdb://127.0.0.1:2836') as pool:
      conn = pool.connect()
      try:
        conn.query(...)
      finally:
        pool.release(conn)
    ```
    """

    def __init__(self, **kwargs):
        Pool.__init__(self, **kwargs)
        self._conn = threading.local()

    def _do_connect(self):
        try:
            c = self._conn.current()
            if c:
                return c
        except AttributeError:
            pass

        c = self._create_conn()
        self._conn.current = weakref.ref(c)
        self._all_connections.append(c)

        return c

    def _do_release(self, conn):
        # Thread-local connections do not have to be 'released'.
        pass

Implementation of our connection pool that maintains just a single connection per thread.

Example usage:

Using pool.connect() in a with-block:

import quasardb.pool as pool

with pool.Pool(uri='qdb://127.0.0.1:2836') as pool:
  with pool.connect() as conn:
    conn.query(...)

Explicitly releasing the connection using Pool.release():

import quasardb.pool as pool

with pool.Pool(uri='qdb://127.0.0.1:2836') as pool:
  conn = pool.connect()
  try:
    conn.query(...)
  finally:
    pool.release(conn)

Ancestors

Inherited members