3.2. Streaming changes and continuous queries

3.2.1. Purpose

When you run a query on the database, the client sends one or more network requests to the server, assembles the responses, and returns the result. Running the query consumes resources on both the client and the server.

For example, the simplest way to build a dashboard is to query the database at a regular interval and update the dashboard with the results. This is called polling. It works, but if the values rarely change, it wastes computing resources on both the client and the server. The waste is even greater when each query result triggers further computation: in finance, for example, building the order book can take a non-negligible amount of time.

You can reduce this waste by polling less often, at the cost of higher latency.

In QuasarDB, continuous queries turn this logic around: instead of the client repeatedly requesting data from the server, the server notifies the client when a change occurs, and the client then retrieves the updated results.

With continuous queries, a user can get low-latency updates with optimal resource usage.

3.2.2. Features

Continuous queries are available starting with QuasarDB 3.11. The following is supported:

  • At-most-once notification guarantees;

  • Latency as low as the network permits;

  • Any valid SELECT query can be run as a continuous query;

  • Continuous query results are computed in real time, not pre-computed;

  • Continuous queries use very few additional resources on the server and the client;

  • As many continuous queries can run in parallel as the computing and networking resources allow.

3.2.3. Limitations

Continuous queries are a beta feature with the following limitations:

  • The notification mechanism is not authenticated, which can create security risks on untrusted networks. Query results, however, are transmitted securely;

  • Table creation and tagging are not yet monitored by the continuous query engine. As a result, you may not be notified when a newly created or tagged table affects your result (for example, a SELECT that uses a find);

  • WHERE clauses are not inspected: the continuous query may receive an update even when the modified data does not match its WHERE clause;

  • Only TCP notifications are currently supported.

3.2.4. When to use

Continuous queries are a great fit when you need a query result refreshed whenever the underlying data changes, and the interval between updates is at least an order of magnitude longer than the time the query takes to run.

3.2.5. When not to use

Continuous queries can be an anti-pattern when the data your query depends on changes so often that the continuous query is refreshed almost constantly.

Another reason not to use continuous queries is when the refresh interval you need is longer than the update interval. For example, if you want your query refreshed every second but the data is updated 10,000 times a second, a continuous query will use more resources than running a regular query once a second, because it can be triggered up to 10,000 times a second.
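The two rules of thumb above can be turned into quick arithmetic. The sketch below is hypothetical helper code, not part of any QuasarDB API; it assumes the worst case where every data change triggers one refresh (no coalescing of updates) and that polling runs exactly one query per refresh interval.

```python
def executions_per_second(update_rate_hz, poll_interval_s):
    """Worst-case query executions per second for each strategy (hypothetical helper).

    update_rate_hz: how many times per second the data read by the query changes.
    poll_interval_s: the refresh interval you actually need, in seconds.
    """
    continuous = update_rate_hz       # upper bound: one refresh per data change
    polling = 1.0 / poll_interval_s   # one query per polling interval
    return continuous, polling


def good_fit(update_interval_s, query_duration_s):
    """Rule of thumb from "When to use": updates at least 10x rarer than the query runtime."""
    return update_interval_s >= 10 * query_duration_s


# The "When not to use" example: 10,000 updates per second, refresh needed once a second.
continuous, polling = executions_per_second(10_000, 1.0)
print(continuous, polling)  # 10000 1.0

# A 0.5 s query whose data changes once a minute is a good fit.
print(good_fit(60, 0.5))  # True
```

In the first case, the continuous query may run four orders of magnitude more often than a once-a-second poll, which is why polling is the better choice there.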

3.2.6. How it works

When you submit a continuous query from the client, the following happens:

  1. The user specifies a query to be run and a callback to receive the results;

  2. The client parses and validates your query;

  3. The query is analyzed to determine which data changes in the database could affect its results;

  4. The client subscribes to the firehose publisher on the server to be notified when these data changes occur;

  5. The server sends the initial results of the query;

  6. The client receives the results from the server and exposes them through the user-provided callback;

  7. The client waits for changes from the server;

  8. The server notifies the client when changes occur;

  9. When a change occurs, the client requests the modified data from the server and exposes it through the user-provided callback;

  10. Until the continuous query is destroyed, the client goes back to step 7;

  11. When the continuous query is destroyed, the callback is called with an “interrupted” error so the user can take any necessary action.
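To make the sequence concrete, here is a single-process Python simulation of the steps above. It is a sketch under stated assumptions, not QuasarDB's client implementation: FakeServer, ContinuousQuery and the "interrupted" string are hypothetical names, and a threading.Event stands in for the firehose subscription. As in step 8, the server only signals that something changed; the client re-runs the query to fetch the data.

```python
import threading


class FakeServer:
    """Hypothetical stand-in for the database and its firehose publisher."""

    def __init__(self):
        self._lock = threading.Lock()
        self._rows = []
        self._subscribers = []

    def subscribe(self, notify):
        with self._lock:
            self._subscribers.append(notify)

    def insert(self, row):
        with self._lock:
            self._rows.append(row)
            subscribers = list(self._subscribers)
        for notify in subscribers:  # step 8: signal the change, send no data
            notify()

    def run_query(self):
        with self._lock:
            return list(self._rows)  # snapshot of the "query result"


class ContinuousQuery:
    """Hypothetical client-side loop mirroring steps 4 to 11."""

    def __init__(self, server, callback):
        self._server = server
        self._callback = callback
        self._changed = threading.Event()
        self._stopped = threading.Event()
        server.subscribe(self._changed.set)  # step 4: subscribe to change notifications
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        self._callback(None, self._server.run_query())  # steps 5-6: initial results
        while True:
            self._changed.wait()  # step 7: wait for a notification
            self._changed.clear()
            if self._stopped.is_set():
                break  # step 10: stop looping once destroyed
            self._callback(None, self._server.run_query())  # step 9: fetch and expose
        self._callback("interrupted", None)  # step 11: final call on destruction

    def destroy(self):
        self._stopped.set()
        self._changed.set()  # wake the loop so it can exit
        self._thread.join()


# Usage: insert one row, wait until the callback has seen it, then destroy.
received = []
seen_row = threading.Event()


def on_results(error, rows):
    received.append((error, rows))
    if rows:
        seen_row.set()


server = FakeServer()
cq = ContinuousQuery(server, on_results)
server.insert({"price": 101.5})
seen_row.wait(timeout=5)
cq.destroy()
print(received[-1])  # ('interrupted', None)
```

In this simulation, a notification that arrives while a refresh is already running causes one more refresh with possibly unchanged results; the callback must tolerate that.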

3.2.7. Available modes

Continuous queries have two modes of operation:

  1. Full results

  2. New results only

When running in “full results” mode, the callback receives all results matching the query, including results it may have received before. If you are unsure which mode you need, use this one.

When running in “new results only” mode, the callback only receives the rows added since the last time it was called without an error. This mode is useful when you forward query results to another application that must not receive duplicate rows.
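The difference between the two modes can be illustrated with successive states of a query result. This hypothetical sketch assumes an append-only result (rows are only ever added) and ignores errors; it only shows what each callback invocation would receive, not how QuasarDB computes it.

```python
def full_results(snapshots):
    """Full-results mode: each call receives every row matching the query."""
    return [list(rows) for rows in snapshots]


def new_results_only(snapshots):
    """New-results-only mode, assuming an append-only result set:
    each call receives only the rows that no previous call delivered."""
    delivered = 0
    calls = []
    for rows in snapshots:
        calls.append(list(rows[delivered:]))
        delivered = len(rows)
    return calls


# Successive states of the query result; the third state is unchanged,
# as in a spurious call.
snapshots = [["r1"], ["r1", "r2"], ["r1", "r2"], ["r1", "r2", "r3"]]
print(full_results(snapshots))
# [['r1'], ['r1', 'r2'], ['r1', 'r2'], ['r1', 'r2', 'r3']]
print(new_results_only(snapshots))
# [['r1'], ['r2'], [], ['r3']]
```

A downstream application fed from the new-results-only calls sees each row exactly once, while the full-results calls repeat rows already delivered.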

3.2.8. Server configuration

Continuous queries require firehose publishing to be enabled on the server, which is disabled by default. To learn how to enable it, see the configuration chapter.

3.2.9. C Programming

The C API for continuous queries is lower-level than those of other programming languages, which is why this documentation provides specific guidelines for C programming.

3.2.9.1. Initialization

In C, the continuous query API call is qdb_query_continuous. It takes as parameters a handle, a query, a mode of operation, a callback, an arbitrary user pointer that is passed to the callback, and a pointer to a continuous query handle (qdb_query_cont_handle_t).

Here, we use a previously opened handle to run a continuous query:

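// handle is a previously opened qdb_handle_t; query is a const char * containing a SELECT statement
qdb_query_cont_handle_t cont_query = NULL;
// NULL: no user-provided pointer is passed to the callback in this example
qdb_error_t error = qdb_query_continuous(handle, query, qdb_query_continuous_full, &query_callback, NULL, &cont_query);
if (QDB_FAILURE(error))
{
    fprintf(stderr, "Cannot run continuous query %s: %s (%#x)\n", query, qdb_error(error), (unsigned int)error);
    return EXIT_FAILURE;
}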

3.2.9.2. Callback

In C, a callback takes three arguments: a user-provided pointer, an error code, and a pointer to the results. If the callback returns a non-zero value, the continuous query stops and the callback no longer receives results, apart from one final call with qdb_e_interrupted. The resources must still be released manually with qdb_release.

typedef int (*qdb_query_cont_callback_t)(void *,
                                         qdb_error_t,
                                         const qdb_query_result_t *);

If the error code is not qdb_e_ok, the results pointer is undefined, so the callback must check the error code before accessing the results. The results are valid only within the callback; if they are needed afterwards, the callback must copy them.

The callback runs on a thread managed by the API. You are responsible for synchronizing access to any resource the callback shares with other threads. For a given call to qdb_query_continuous, the callback is always invoked from the same thread; callbacks registered by different calls to qdb_query_continuous may run on different threads.

Keep the work done inside the callback to a minimum, for example by handing the processing off to other threads.

static int query_callback(void * p, qdb_error_t err, const qdb_query_result_t * result)
{
    (void)p; // user-provided pointer, unused in this example

    // Any code other than qdb_e_ok (including qdb_e_interrupted when the
    // continuous query is destroyed) means the results pointer is undefined
    if (err != qdb_e_ok)
    {
        printf("Error received: %s\n", qdb_error(err));
        return 0;
    }

    // The results are only valid inside the callback: copy them if needed later
    printf("%d column(s) and %d row(s) received!\n", (int)result->column_count, (int)result->row_count);

    // Return 0 to keep the continuous query running, non-zero to stop it
    return 0;
}
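The advice to keep the callback short, and to copy results out of it, can be sketched as a producer/consumer hand-off. The example is written in Python for brevity and only mirrors the C contract: the callback(error, rows) signature and the "ok", "timeout" and "interrupted" strings are hypothetical stand-ins for qdb_query_cont_callback_t and the qdb_error_t codes. In C, the same structure would use a thread-safe queue and a worker thread of your choice.

```python
import queue
import threading

STOP = object()  # sentinel telling the worker to exit


def make_callback(work_queue):
    """Hypothetical Python mirror of qdb_query_cont_callback_t: (error, rows) -> int."""
    def callback(error, rows):
        if error == "interrupted":  # final call: tell the worker to finish
            work_queue.put(STOP)
            return 0
        if error != "ok":  # results are undefined on error: do not touch them
            return 0
        work_queue.put(list(rows))  # copy: results are only valid during the callback
        return 0  # 0 keeps the continuous query running
    return callback


def worker(work_queue, totals):
    """Does the expensive processing outside the callback thread."""
    while True:
        item = work_queue.get()
        if item is STOP:
            break
        totals.append(sum(item))  # stand-in for real processing


work_queue = queue.Queue()
totals = []
consumer = threading.Thread(target=worker, args=(work_queue, totals))
consumer.start()

# Simulate the calls the API thread would make.
callback = make_callback(work_queue)
callback("ok", [1, 2, 3])
callback("timeout", None)
callback("ok", [4, 5])
callback("interrupted", None)
consumer.join()
print(totals)  # [6, 9]
```

The callback only copies and enqueues, so it returns quickly; the final interrupted call doubles as the signal to shut the worker down.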

When the continuous query is destroyed, the callback is called with the error code qdb_e_interrupted, which gives the user an opportunity to act on exit, for example to deallocate user-managed resources.

3.2.9.3. When is the callback called?

The callback is called at least once: immediately after qdb_query_continuous is called, it receives the current results of the query.

The callback is called when new data for the query is available. If several updates happen faster than the client can be notified, the client may coalesce them into a single call.

In case of an error, the callback is called with the corresponding error code. The results pointer is undefined in that case.

When the continuous query is destroyed, the callback is called one last time with the error code qdb_e_interrupted. This is true even if the callback previously returned a non-zero value to request termination: in that case, it is called one additional time with qdb_e_interrupted.

Spurious calls to the callback are rare but may happen, for example when the cluster has been resized or after network errors. Do not assume that new data is available just because the callback was called; inspect the results instead. In “new results only” mode, spurious calls have empty results.
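The rules above (an initial call, possible spurious calls, errors with undefined results, and a final interrupted call even after a non-zero return) can be gathered into one callback object. This Python sketch is hypothetical: the error strings stand in for qdb_error_t codes, and detecting spurious calls by comparing with the previous results assumes “full results” mode.

```python
class DashboardCallback:
    """Hypothetical callback state for full-results mode."""

    def __init__(self, stop_after_rows):
        self.stop_after_rows = stop_after_rows
        self.last_rows = None
        self.updates = 0  # calls that actually carried changed data
        self.closed = False

    def __call__(self, error, rows):
        if error == "interrupted":
            self.closed = True  # always delivered last, even after a non-zero return
            return 0
        if error != "ok":
            return 0  # rows are undefined: ignore them
        if rows == self.last_rows:
            return 0  # spurious call: nothing changed
        self.last_rows = list(rows)  # copy, the results do not outlive the call
        self.updates += 1
        return 1 if len(rows) >= self.stop_after_rows else 0  # non-zero requests termination


cb = DashboardCallback(stop_after_rows=2)
calls = [
    ("ok", ["r1"]),          # initial results
    ("ok", ["r1"]),          # spurious call
    ("network_error", None),  # error: results undefined
    ("ok", ["r1", "r2"]),    # real update, triggers termination
    ("interrupted", None),   # final call
]
returns = [cb(err, rows) for err, rows in calls]
print(returns)  # [0, 0, 0, 1, 0]
print(cb.updates, cb.closed)  # 2 True
```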

Finally, the server and client use an “at most once” delivery guarantee for notifications. Under heavy traffic, the client may not receive every notification from the server, which can increase latency. No data is lost, however: queries always return the rows actually available in the database.
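The effect of coalesced and lost notifications can be modeled with a few lines of arithmetic. This is a hypothetical model, not QuasarDB's protocol: each batch groups the changes that happen while the client is busy, a change whose notification is lost triggers no refresh, and every refresh reads the current state of the database.

```python
def simulate(batches):
    """Return the row count exposed by each callback invocation (hypothetical model).

    Each batch is a list of (rows_inserted, notification_arrived) changes that
    happen before the client can react.
    """
    rows_in_db = 0
    calls = []
    for batch in batches:
        pending = False
        for rows_inserted, notification_arrived in batch:
            rows_in_db += rows_inserted
            pending = pending or notification_arrived  # notifications coalesce
        if pending:
            calls.append(rows_in_db)  # one refresh reads the current state
    return calls


batches = [
    [(5, True), (3, True)],  # two notifications, a single callback call
    [(2, False)],            # notification lost: no call yet
    [(1, True)],             # next notification: the refresh includes the 2 rows
]
print(simulate(batches))  # [8, 11]
```

The two rows whose notification was lost appear later than they could have (higher latency), but they are not missing from the result.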

3.2.9.4. Clean-up

Once the continuous query is no longer needed, the user must call qdb_release on the handle allocated by qdb_query_continuous, even if the callback returned a non-zero value to signal the termination of the continuous query.

qdb_release waits for the callback to return. You are responsible for ensuring that the callback never waits on a resource that would prevent it from returning, such as a lock held by the thread calling qdb_release.

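// Stops the continuous query and frees its resources; waits for the callback to return
qdb_release(handle, cont_query);
// Closes the connection handle once it is no longer needed
qdb_close(handle);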

Before clean-up, the callback is called with qdb_e_interrupted. This happens either when qdb_release is called or earlier, for example after the callback returned a non-zero value.
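Because qdb_release waits for the callback to return, calling it while holding a lock that the callback needs would deadlock. The following Python sketch models that constraint with a hypothetical FakeContinuousQuery whose release() joins the callback thread, mirroring the documented waiting behavior of qdb_release; it is not the QuasarDB API.

```python
import threading

state_lock = threading.Lock()  # protects data shared with the callback
latest = []


def callback(error, rows):
    with state_lock:  # the callback cannot return until it gets this lock
        if error == "ok":
            latest[:] = rows
    return 0


class FakeContinuousQuery:
    """Hypothetical stand-in: calls the callback on its own thread until released."""

    def __init__(self, callback):
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, args=(callback,))
        self._thread.start()

    def _run(self, callback):
        n = 0
        while not self._stop.is_set():
            n += 1
            callback("ok", list(range(n)))
            self._stop.wait(0.01)  # pretend new data arrives every 10 ms
        callback("interrupted", None)  # final call before clean-up

    def release(self):
        """Like qdb_release: returns only once the callback thread has finished."""
        self._stop.set()
        self._thread.join()

    def is_running(self):
        return self._thread.is_alive()


cq = FakeContinuousQuery(callback)
with state_lock:
    snapshot = list(latest)  # read shared state under the lock...
# ...and release only after dropping it. Calling cq.release() inside the
# `with` block could deadlock: release() waits for the callback, while the
# callback waits for state_lock.
cq.release()
print(cq.is_running())  # False
```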

3.2.10. Python programming

In Python, the internal complexities of continuous queries are abstracted away.

Here is how to run a continuous query in Python:

import quasardb

q = "SELECT * FROM my_table"  # placeholder: replace with your own SELECT query

c = quasardb.Cluster("qdb://127.0.0.1:2836")
cont = c.query_continuous_full(q)
for res in cont:
    # res has the same structure as the result of a query() call
    for r in res:
        # print each row
        print(r)

The continuous query object is iterable: each iteration blocks until new results are available. Calling its results method has the same effect.

To receive only new values, call query_continuous_new_values instead of query_continuous_full.