1.2. Streaming changes and continuous queries

1.2.1. Purpose

When you run a query on the database, the client sends one or more network requests to the server, assembles the responses, and returns the result to you. Running the query consumes resources on both the client and the server.

For example, when building a dashboard, the simplest approach is to fetch the data from the database at a regular interval and update the dashboard content. This is called polling. It works, but if values don’t change often, it wastes computing resources on both the client and the server. The waste is greater still when the query result triggers further computation. For example, in finance, building the order book can take a non-negligible amount of time.

You can reduce the waste by increasing the interval between queries, at the cost of increased latency.
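The trade-off between waste and latency can be made concrete with a little arithmetic. The sketch below is illustrative only: the function name and the figures are assumptions chosen for the example, not measurements from QuasarDB.

```python
def polling_cost(poll_interval_s, changes_per_hour):
    """Return (queries per hour, wasted queries per hour, worst-case staleness in seconds)."""
    queries_per_hour = 3600 / poll_interval_s
    # At most one poll per change can return something new; the rest are wasted.
    useful = min(queries_per_hour, changes_per_hour)
    wasted = queries_per_hour - useful
    # A change that lands right after a poll is only seen one full interval later.
    return queries_per_hour, wasted, poll_interval_s


# Hypothetical workload: the underlying data changes 6 times per hour.
for interval in (1, 10, 60):
    total, wasted, staleness = polling_cost(interval, changes_per_hour=6)
    print(f"poll every {interval:>2}s: {total:.0f} queries/h, "
          f"{wasted:.0f} wasted, up to {staleness}s stale")
```

A longer polling interval shrinks the number of wasted queries but raises the worst-case staleness by the same factor.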

In QuasarDB, continuous queries turn the logic around. When using continuous queries, instead of the client requesting the server’s data, the server sends the results to the client when there’s a change.

With continuous queries, a user can get low-latency updates with optimal resource usage.

1.2.2. Features

Continuous queries are available starting with QuasarDB 3.11. The following is supported:

  • At-most-once notification guarantees;

  • Latency as low as the network permits;

  • Any valid SELECT query can be run as a continuous query;

  • Continuous query results are real-time and not pre-computed;

  • Continuous queries use few additional resources on the server and the client;

  • As many continuous queries as the computing and networking resources allow can co-exist in parallel.

1.2.3. Limitations

Continuous queries are a beta feature. The following limitations currently apply:

  • The notification mechanism is not authenticated and could thus create security risks in untrusted networks. Query results are, however, transmitted securely;

  • Table creation and tagging are currently not monitored by the continuous query engine. Thus, you may not get a notification if a newly created table impacts your result (for example, a SELECT that uses find);

  • WHERE clauses are not inspected, meaning that the continuous query may receive an update even if the updated data does not match the WHERE clause;

  • Only TCP notifications are currently supported.

1.2.4. When to use

Continuous queries are a great fit if you need the result of a query to be updated whenever there is a change, and the interval between updates to this query is at least an order of magnitude longer than the time it takes for the query to run.
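The rule of thumb above can be written as a simple check. This is only a sketch of the guideline: the function name, its parameters, and the default factor of 10 (one order of magnitude) are assumptions made for illustration.

```python
def is_good_fit(query_time_s, mean_time_between_updates_s, factor=10.0):
    """Rule of thumb: the time between updates should be at least
    `factor` times longer than the time the query takes to run."""
    return mean_time_between_updates_s >= factor * query_time_s


print(is_good_fit(0.001, 0.5))    # 1 ms query, one update every 500 ms
print(is_good_fit(0.001, 0.002))  # 1 ms query, one update every 2 ms
```

When the check fails, pacing the continuous query or falling back to standard queries is likely the better choice.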

1.2.5. Pacing

Continuous queries can be an anti-pattern if the data your query depends on changes so often that the continuous query is refreshed almost constantly.

For example, assume your query takes 1 ms to run and updates arrive even more frequently: your application could receive up to 1,000 updates per second!

Fortunately, there’s a way to “pace” continuous queries. When you create your continuous query, you can specify a minimum refresh interval; you will not be notified more often than that interval allows.

In other words, if you provide a refresh interval of 1,000 milliseconds (1 second), you will receive, at most, one update per second, even if the value has been updated more frequently.
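To illustrate the effect of pacing, here is a small, self-contained simulation. It is a simplified model, not the client's actual scheduling logic: the function name is hypothetical, and it assumes that an update arriving too soon after the previous notification is deferred and merged into the next allowed one.

```python
def paced_notifications(update_times_ms, min_interval_ms):
    """Return the times (in ms) at which a paced subscriber would be notified."""
    notifications = []
    for t in sorted(update_times_ms):
        if not notifications:
            # The first update is delivered immediately.
            notifications.append(t)
            continue
        last = notifications[-1]
        if t > last:
            # Not yet covered by a notification: deliver now if enough time
            # has passed, otherwise defer to the earliest allowed time.
            notifications.append(max(t, last + min_interval_ms))
        # Otherwise the update is merged into the already scheduled notification.
    return notifications


# One update every millisecond for three seconds, paced at 1,000 ms.
print(paced_notifications(range(3000), 1000))
# Sparse updates are not delayed by pacing.
print(paced_notifications([0, 5000], 1000))
```

With 3,000 updates in three seconds, the paced subscriber in this model is notified only four times, each notification at least one second after the previous one.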

1.2.6. When not to use

Continuous queries are not a good fit when the data does not need to be refreshed regularly. In that case, use standard queries.

1.2.7. How it works

When you submit a continuous query from the client, the following happens:

  1. The user specifies a query to be run and a callback to receive the results;

  2. The client parses and validates your query;

  3. The query is then analyzed to see what data changes in the database could impact the results;

  4. The client subscribes to the firehose publisher on the server to be notified when these data changes occur;

  5. The server sends the results for the query;

  6. The client receives the results from the server and exposes them through the user-provided callback;

  7. The client waits for changes from the server;

  8. The server notifies the client when changes occur;

  9. When a change occurs, the client requests the modified data from the server and exposes it through the user-provided callback;

  10. Until the continuous query is destroyed, the client goes back to step 7;

  11. When the continuous query is destroyed, the callback is called with an “interrupted” error to allow the user to take any action.
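The sequence above can be sketched with a toy, in-memory model. Everything in it (FakeServer, FakeContinuousQuery, the "trades" table) is hypothetical and written for illustration; it is synchronous and single-threaded, whereas the real client works over the network, and it does not use the QuasarDB API.

```python
class FakeServer:
    """In-memory stand-in for a database server with a change publisher."""

    def __init__(self):
        self.tables = {}
        self.subscribers = {}  # table name -> list of notify functions

    def subscribe(self, table, notify):
        self.subscribers.setdefault(table, []).append(notify)

    def insert(self, table, row):
        self.tables.setdefault(table, []).append(row)
        for notify in self.subscribers.get(table, []):
            notify(table)  # step 8: tell subscribers that something changed

    def run_query(self, table):
        return list(self.tables.get(table, []))


class FakeContinuousQuery:
    def __init__(self, server, table, callback):
        self.server = server
        self.table = table
        self.callback = callback
        self.active = True
        server.subscribe(table, self._on_change)       # step 4
        self.callback(None, server.run_query(table))   # steps 5 and 6

    def _on_change(self, _table):
        if self.active:                                # step 9
            self.callback(None, self.server.run_query(self.table))

    def destroy(self):
        if self.active:                                # step 11
            self.active = False
            self.callback("interrupted", None)


calls = []
server = FakeServer()
server.insert("trades", 1)
cq = FakeContinuousQuery(server, "trades", lambda err, rows: calls.append((err, rows)))
server.insert("trades", 2)   # triggers a notification and a refresh
cq.destroy()
server.insert("trades", 3)   # ignored: the continuous query is gone
print(calls)
```

The callback receives the initial result, one refreshed result per change, and finally a single “interrupted” error when the query is destroyed.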

1.2.8. Available modes

Continuous queries have two modes of operation:

  1. Full results

  2. New results only

When running in the “full results” mode, the callback will receive all results matching the query, including results that may have been received before. If you are unsure which mode you need, this is the one you should use.

When running in “new results only” mode, the callback will only receive rows that are new since the last time the callback was called without an error. This mode is compelling when you need to forward your query results to another application that cannot accept duplicate rows.
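The difference between the two modes can be illustrated with a small simulation. This is a sketch under simplifying assumptions: the function name and mode strings are hypothetical, rows are only ever appended, and every callback invocation succeeds.

```python
def deliver(snapshots, mode):
    """Yield what the callback would receive for each successive query result.

    `snapshots` is the full query result at each refresh, oldest first.
    """
    if mode not in ("full", "new"):
        raise ValueError("mode must be 'full' or 'new'")
    seen = 0  # number of rows already handed to the callback
    for rows in snapshots:
        if mode == "full":
            yield list(rows)
        else:
            yield list(rows[seen:])
            seen = len(rows)


# Three refreshes; the last one brings no new rows.
snapshots = [[1, 2], [1, 2, 3], [1, 2, 3]]
print(list(deliver(snapshots, "full")))
print(list(deliver(snapshots, "new")))
```

In “full” mode every call repeats the rows already seen, while in “new” mode each row is delivered once and a refresh without new rows yields an empty result.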

1.2.9. Server configuration

Continuous queries require firehose publishing to be enabled on the server. It is disabled by default. To learn how to enable firehose publishing, see the configuration chapter.

1.2.10. C programming

Continuous queries in C are more low-level than in other programming languages, which is why this documentation provides specific guidelines for C programming.

1.2.10.1. Initialization

In C, the continuous query API call is qdb_query_continuous. In addition to a previously opened handle, it takes as parameters a query, a mode of operation, a refresh rate in milliseconds, a callback, an arbitrary pointer, and a pointer to a continuous query handle (qdb_query_cont_handle_t).

Here, we use a previously opened handle to run a continuous query:

qdb_query_cont_handle_t cont_query = NULL;
const int refresh_rate_in_ms       = 10000;
error = qdb_query_continuous(handle, query, qdb_query_continuous_full, refresh_rate_in_ms, &query_callback, NULL, &cont_query);
if (QDB_FAILURE(error))
{
    fprintf(stderr, "Cannot run continuous query %s: %s (%#x)\n", query, qdb_error(error), error);
    return EXIT_FAILURE;
}

1.2.10.2. Callback

In C, a callback takes three arguments: a user-provided pointer, an error code, and a pointer to the results. If the callback returns a non-zero value, the continuous query stops and the callback is no longer called with new results (the resources must, however, still be released manually with qdb_release).

typedef int (*qdb_query_cont_callback_t)(void *,
                                         qdb_error_t,
                                         const qdb_query_result_t *);

If the error code is different from qdb_e_ok, the pointer to the results is undefined. The user must therefore check that the error code is qdb_e_ok before accessing the results. The pointer to the results contains values that are valid only in the context of the callback. If the results are needed outside of it, the user must copy them.

The callback is called from the context of an API-managed thread. The user is responsible for managing access to any resource the callback may be sharing with other threads. A callback given to qdb_query_continuous will always be called from the same thread. However, successive calls to qdb_query_continuous offer no such guarantee.

It is advised to minimize the amount of work done in the callback context by, for example, doing the processing in other threads.

static int query_callback(void * p, qdb_error_t err, const qdb_query_result_t * result)
{
    (void)p; // unused in this example

    if (QDB_FAILURE(err))
    {
        printf("Error received: %s\n", qdb_error(err));
        return 0;
    }

    // access the results only if there is no error
    printf("%d column(s) and %d row(s) received!\n", (int)result->column_count, (int)result->row_count);

    // return 0, unless you want the continuous queries to end
    return 0;
}

When the continuous query is destroyed, the callback is called with the error code qdb_e_interrupted, which gives the user the opportunity to take an action on exit, such as deallocating user-managed resources.

Note: The results (const qdb_query_result_t * pointer) are guaranteed to be valid in the context of the callback only. If you need to make use of the results outside of the scope of the callback, you must make a deep copy.

1.2.10.3. When is the callback called?

The callback is called at least once: immediately after qdb_query_continuous has been called, it receives the current results for the query.

After that, the callback is called when new data for the query is available and, when a refresh interval is specified, at least that interval has elapsed since the previous call. The client might combine several updates into one call if they happened faster than the client could be notified.

In case of an error, the callback is called with the corresponding error code. The value of the results is undefined in that case.

When the continuous query is destroyed, the callback will be called, before exit, with the error code qdb_e_interrupted. This is true even if the callback previously returned a non-zero value to request the exit. This means that if the callback returns a non-zero value, it will be called one additional time with the error code qdb_e_interrupted.

Spurious calls to the callback are rare but may happen, for example, when the cluster has been resized or after network errors. The user must not assume that new data is available just because the callback is called, but should instead inspect the results. When the “new results only” mode is used, spurious calls to the callback will have empty results.
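One way to tolerate spurious calls in “full results” mode is to compare each result with the previous one and skip the work when nothing changed. The wrapper below is a language-agnostic pattern sketched in Python; skip_unchanged and the handler are hypothetical names, and results are modelled as plain lists rather than the structures the API provides.

```python
def skip_unchanged(handler):
    """Wrap `handler` so that it only runs when the results differ from the last call."""
    last = None  # copy of the most recently processed results

    def callback(rows):
        nonlocal last
        if last is not None and rows == last:
            return False       # spurious call: same results as before
        last = list(rows)      # keep a copy, the caller may reuse its buffer
        handler(rows)
        return True

    return callback


processed = []
on_results = skip_unchanged(processed.append)
on_results([1, 2])      # first results: processed
on_results([1, 2])      # spurious call: skipped
on_results([1, 2, 3])   # new row: processed
print(processed)
```

Comparing full results costs time proportional to their size, so this approach suits small results; in “new results only” mode, checking for an empty result is enough.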

Lastly, the server and client use an “at most once” delivery guarantee. Under heavy traffic, the client may not receive all the notifications from the server. This can result in increased latency. However, no data loss can occur; queries will always show the correct number of rows available in the database.
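The following sketch shows why a lost notification delays an update without losing data: a notification only signals that something changed, and the client reads the current data from the server whenever one does arrive. The function and its arguments are hypothetical and model the behaviour in a deliberately simplified way.

```python
def simulate(inserts, delivered):
    """Return what the client sees after a series of inserts.

    `inserts` lists the rows written to the table, in order.
    `delivered` holds the indices of the inserts whose notification
    actually reached the client; the others are lost.
    """
    table = []   # server-side data
    seen = []    # latest result known to the client
    for i, row in enumerate(inserts):
        table.append(row)
        if i in delivered:
            seen = list(table)   # the client re-reads the data, not the notification
    return seen


rows = [10, 20, 30, 40]
# Notifications for inserts 0 and 2 are lost: the client still ends up complete.
print(simulate(rows, delivered={1, 3}))
# The last notification is lost: row 40 shows up only after the next change.
print(simulate(rows, delivered={0, 2}))
```

In the second case the client is temporarily behind, which is the increased latency mentioned above, but the row is still in the database and appears with the next refresh.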

1.2.10.4. Clean-up

Once the continuous query is no longer needed, the user must call qdb_release on the handle allocated by qdb_query_continuous, even if the callback returned a non-zero value to signal the termination of the continuous query.

When the user calls qdb_release, the function will wait for the callback to return. The user is responsible for ensuring the callback does not wait on a resource that would prevent this.

qdb_release(handle, cont_query);
qdb_close(handle);

Before clean-up, the callback will be called with qdb_e_interrupted. This can happen when qdb_release is called, or before, for example if the callback returned a non-zero value.

1.2.11. Python programming

In Python, the internal complexities of continuous queries are abstracted away.

Here is how to run a continuous query in Python with a minimum refresh interval of 100 ms:

import quasardb
import datetime

c = quasardb.Cluster("qdb://127.0.0.1:2836")
q = "SELECT * FROM my_table"  # example query; my_table is a placeholder table name
cont = c.query_continuous_full(q, datetime.timedelta(milliseconds=100))
for res in cont:
    # the res structure is identical to a query() call
    for r in res:
        # print rows
        print(r)

The continuous query object exposes an iterator interface that will block until new results are available. Calling the results method achieves the same effect.

To receive the new values only, the user calls query_continuous_new_values.
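Because the iterator blocks, an application that must stay responsive may prefer to consume it in a background thread and hand the results over through a queue. The sketch below shows that pattern with the standard library only; fake_continuous_query is a stand-in generator written for this example, not part of the quasardb module, and would be replaced by the continuous query object in real code.

```python
import queue
import threading
import time


def fake_continuous_query():
    """Stand-in for a blocking iterator of query results (not the quasardb API)."""
    for i in range(3):
        time.sleep(0.01)        # pretend to block until new results arrive
        yield [{"row": i}]


def pump(results_iter, out_queue):
    """Forward every result to the queue, then signal the end with None."""
    for res in results_iter:
        out_queue.put(res)
    out_queue.put(None)


results = queue.Queue()
worker = threading.Thread(
    target=pump, args=(fake_continuous_query(), results), daemon=True
)
worker.start()

received = []
while True:
    item = results.get()        # the main thread could also use a timeout here
    if item is None:
        break
    received.append(item)
worker.join()
print(received)
```

The queue decouples the thread that waits for results from the thread that processes them, so slow processing does not hold up the iteration.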