3.7. Spark connector
3.7.1. Introduction
The official quasardb Spark connector extends quasardb with support for storing and retrieving data as Spark RDDs or DataFrames. The connector's source code can be read and downloaded from GitHub at https://github.com/bureau14/qdb-spark-connector.
3.7.2. Querying quasardb
Given a quasardb timeseries table doubles_test that looks as follows:

| timestamp           | value     |
|---------------------|-----------|
| 2017-10-01 12:09:03 | 1.2345678 |
| 2017-10-01 12:09:04 | 8.7654321 |
| 2017-10-01 12:09:05 | 5.6789012 |
| 2017-10-01 12:09:06 | 2.1098765 |
3.7.2.1. Querying using an RDD
The qdbDoubleColumn is an implicit method returning an RDD[(Timestamp, Double)], and the qdbBlobColumn is an implicit method returning an RDD[(Timestamp, Array[Byte])]. Both methods require explicitly passing a qdbUri, a tableName, a columnName and a QdbTimeRangeCollection, as demonstrated below.
import java.sql.Timestamp

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import com.quasardb.spark._
import net.quasardb.qdb.{QdbTimeRange, QdbTimeRangeCollection}

val qdbUri = "qdb://127.0.0.1:2836"

val timeRanges = new QdbTimeRangeCollection
timeRanges.add(
  new QdbTimeRange(
    Timestamp.valueOf("2017-10-01 12:09:03"),
    Timestamp.valueOf("2017-10-01 12:09:07")))

val sc = new SparkContext("local", "qdb-test")
val sqlContext = new SQLContext(sc)

val rdd = sqlContext
  .qdbDoubleColumn(
    qdbUri,
    "doubles_test",
    "value",
    timeRanges)
  .collect
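Because collect returns a plain Scala Array[(Timestamp, Double)], the result can be processed with ordinary collection operations. A minimal sketch, using the sample values from the doubles_test table above:

```scala
import java.sql.Timestamp

// The rows of the doubles_test table above, as they would be
// returned by .collect on the queried RDD.
val rows: Array[(Timestamp, Double)] = Array(
  (Timestamp.valueOf("2017-10-01 12:09:03"), 1.2345678),
  (Timestamp.valueOf("2017-10-01 12:09:04"), 8.7654321),
  (Timestamp.valueOf("2017-10-01 12:09:05"), 5.6789012),
  (Timestamp.valueOf("2017-10-01 12:09:06"), 2.1098765))

// Ordinary Scala collection operations apply to the collected result,
// for example computing the mean of the value column.
val mean = rows.map(_._2).sum / rows.length
```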
3.7.2.2. Querying using a DataFrame
The qdbDoubleColumnAsDataFrame and qdbBlobColumnAsDataFrame methods allow querying data from quasardb directly as a DataFrame. The resulting DataFrame exposes the columns timestamp and value, where the type of value corresponds to the type of the data being queried (Double or Array[Byte]).
import java.sql.Timestamp

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import com.quasardb.spark._
import net.quasardb.qdb.{QdbTimeRange, QdbTimeRangeCollection}

val qdbUri = "qdb://127.0.0.1:2836"

val timeRanges = new QdbTimeRangeCollection
timeRanges.add(
  new QdbTimeRange(
    Timestamp.valueOf("2017-10-01 12:09:03"),
    Timestamp.valueOf("2017-10-01 12:09:07")))

val sc = new SparkContext("local", "qdb-test")
val sqlContext = new SQLContext(sc)

val df = sqlContext
  .qdbDoubleColumnAsDataFrame(
    qdbUri,
    "doubles_test",
    "value",
    timeRanges)
  .collect
3.7.2.3. Aggregating using an RDD
quasardb exposes its native aggregation capabilities as an RDD through the implicit methods qdbAggregateDoubleColumn and qdbAggregateBlobColumn, which require passing a qdbUri, a tableName, a columnName and a sequence of AggregateQuery. Exactly one result row is returned for each AggregateQuery provided.
import java.sql.Timestamp

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import net.quasardb.qdb.QdbAggregation
import com.quasardb.spark._
import com.quasardb.spark.rdd.AggregateQuery

val qdbUri = "qdb://127.0.0.1:2836"

val sc = new SparkContext("local", "qdb-test")
val sqlContext = new SQLContext(sc)

val results = sqlContext
  .qdbAggregateDoubleColumn(
    qdbUri,
    "doubles_test",
    "value",
    List(
      AggregateQuery(
        begin = Timestamp.valueOf("2017-10-01 12:09:03"),
        end = Timestamp.valueOf("2017-10-01 12:09:07"),
        operation = QdbAggregation.Type.COUNT)))
  .collect

// ScalaTest assertions: exactly one result row per aggregate query,
// counting the four rows shown in the table above.
results.length should equal(1)
results.head.count should equal(4)
3.7.2.4. Aggregating using a DataFrame
quasardb exposes its native aggregation capabilities as a DataFrame through the implicit methods qdbAggregateDoubleColumnAsDataFrame and qdbAggregateBlobColumnAsDataFrame, which require passing a qdbUri, a tableName, a columnName and a sequence of AggregateQuery. Exactly one result row is returned for each AggregateQuery provided.
import java.sql.Timestamp

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import net.quasardb.qdb.QdbAggregation
import com.quasardb.spark._
import com.quasardb.spark.rdd.AggregateQuery

val qdbUri = "qdb://127.0.0.1:2836"

val sc = new SparkContext("local", "qdb-test")
val sqlContext = new SQLContext(sc)

val results = sqlContext
  .qdbAggregateDoubleColumnAsDataFrame(
    qdbUri,
    "doubles_test",
    "value",
    List(
      AggregateQuery(
        begin = Timestamp.valueOf("2017-10-01 12:09:03"),
        end = Timestamp.valueOf("2017-10-01 12:09:07"),
        operation = QdbAggregation.Type.COUNT)))
  .collect

// ScalaTest assertions: exactly one result row per aggregate query,
// counting the four rows shown in the table above.
results.length should equal(1)
results.head.getLong(0) should equal(4)
3.7.3. Writing to quasardb
3.7.3.1. Writing using an RDD
import java.sql.Timestamp

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import com.quasardb.spark._

val qdbUri = "qdb://127.0.0.1:2836"

val sc = new SparkContext("local", "qdb-test")
val sqlContext = new SQLContext(sc)

val dataSet = List(
  (Timestamp.valueOf("2017-10-01 12:09:03"), 1.2345678),
  (Timestamp.valueOf("2017-10-01 12:09:04"), 8.7654321),
  (Timestamp.valueOf("2017-10-01 12:09:05"), 5.6789012),
  (Timestamp.valueOf("2017-10-01 12:09:06"), 2.1098765))

sc
  .parallelize(dataSet)
  .toQdbDoubleColumn(qdbUri, "doubles_test", "value")
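Blob columns can be written the same way. The sketch below builds a (Timestamp, Array[Byte]) dataset; the table name blobs_test and column name blob_value are hypothetical, and it assumes the blob counterpart of toQdbDoubleColumn is named toQdbBlobColumn:

```scala
import java.sql.Timestamp

// Blob values are written as (Timestamp, Array[Byte]) pairs.
val blobData: List[(Timestamp, Array[Byte])] = List(
  (Timestamp.valueOf("2017-10-01 12:09:03"), "hello".getBytes("UTF-8")),
  (Timestamp.valueOf("2017-10-01 12:09:04"), "world".getBytes("UTF-8")))

// Given a SparkContext sc as in the example above, the write mirrors
// the double case (hypothetical table and column names):
//   sc.parallelize(blobData).toQdbBlobColumn(qdbUri, "blobs_test", "blob_value")
```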
3.7.3.2. Writing using a DataFrame
The toQdbDoubleColumn and toQdbBlobColumn functions are available on a DataFrame to allow storing its data into quasardb. You must also pass a qdbUri, a tableName and a columnName to the function so the data is stored in the correct location. The code example below copies all the data from the doubles_test table into the doubles_test_copy table.
import java.sql.Timestamp

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import com.quasardb.spark._
import net.quasardb.qdb.{QdbTimeRange, QdbTimeRangeCollection}

val qdbUri = "qdb://127.0.0.1:2836"

val timeRanges = new QdbTimeRangeCollection
timeRanges.add(
  new QdbTimeRange(
    Timestamp.valueOf("2017-10-01 12:09:03"),
    Timestamp.valueOf("2017-10-01 12:09:07")))

val sc = new SparkContext("local", "qdb-test")
val sqlContext = new SQLContext(sc)

sqlContext
  .qdbDoubleColumnAsDataFrame(
    qdbUri,
    "doubles_test",
    "value",
    timeRanges)
  .toQdbDoubleColumn(qdbUri, "doubles_test_copy", "value")
3.7.4. Tests
In order to run the tests, please download the latest quasardb server and extract it into a qdb subdirectory like this:
mkdir qdb
cd qdb
wget "https://download.quasardb.net/quasardb/2.1/2.1.0-beta.1/server/qdb-2.1.0master-darwin-64bit-server.tar.gz"
tar -xzf "qdb-2.1.0master-darwin-64bit-server.tar.gz"
Then launch the integration tests using sbt:
sbt test
3.7.5. A note for macOS users
quasardb uses a C++ standard library that is not shipped with macOS by default. Unfortunately, due to static linking restrictions on macOS, this can cause runtime errors such as:
dyld: Symbol not found: __ZTISt18bad_variant_access
Until a fix is available for the quasardb client libraries, the best course of action is to download the libc++ libraries and tell your shell where to find them:
cd qdb
wget "http://releases.llvm.org/5.0.0/clang+llvm-5.0.0-x86_64-apple-darwin.tar.xz"
tar -xJf "clang+llvm-5.0.0-x86_64-apple-darwin.tar.xz"
And then run the tests like this:
DYLD_LIBRARY_PATH=qdb/clang+llvm-5.0.0-x86_64-apple-darwin/lib/ sbt test