Module quasardb.stats
import re
import quasardb
import logging
from datetime import datetime
logger = logging.getLogger('quasardb.stats')
stats_prefix = '$qdb.statistics.'
user_pattern = re.compile(r'\$qdb.statistics.(.*).uid_([0-9]+)$')
total_pattern = re.compile(r'\$qdb.statistics.(.*)$')
def is_user_stat(s):
return user_pattern.match(s) is not None
def is_cumulative_stat(s):
    # NOTE(leon): It's quite difficult to express in Python that you don't want a
    # match to _end_ with uid_[0-9]+, because Python's regex engine doesn't support
    # variable-width look-behind.
    #
    # An alternative would be to use the PyPI regex library (for POSIX regexes), but
    # we want to stay light on dependencies.
#
# As such, we define a 'cumulative' stat as anything that's not a user stat.
# Simple but effective.
return user_pattern.match(s) is None
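# For illustration only -- these key shapes are assumptions derived from the
# patterns above, not documented output:
#
#   is_user_stat('$qdb.statistics.requests.total_count.uid_3')   # True
#   is_cumulative_stat('$qdb.statistics.requests.total_count')   # True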
def by_node(conn):
"""
    Returns statistics grouped by node URI.
Parameters:
conn: quasardb.Cluster
Active connection to the QuasarDB cluster
"""
return {x: of_node(conn.node(x)) for x in conn.endpoints()}
def of_node(dconn):
"""
    Returns statistics for a single node.
Parameters:
dconn: quasardb.Node
        Direct node connection to the node we wish to query
"""
start = datetime.now()
    raw = {k: _get_stat(dconn, k)
           for k in dconn.prefix_get(stats_prefix, 1000)}
ret = {'by_uid': _by_uid(raw),
'cumulative': _cumulative(raw)}
check_duration = datetime.now() - start
ret['cumulative']['check.online'] = 1
ret['cumulative']['check.duration_ms'] = int(check_duration.total_seconds() * 1000)
return ret
_stat_types = {'node_id': ('constant', None),
'operating_system': ('constant', None),
'partitions_count': ('constant', 'count'),
'cpu.system': ('counter', 'ns'),
'cpu.user': ('counter', 'ns'),
'cpu.idle': ('counter', 'ns'),
'startup': ('constant', None),
'startup_time': ('constant', None),
'shutdown_time': ('constant', None),
'network.current_users_count': ('gauge', 'count'),
'hardware_concurrency': ('gauge', 'count'),
'check.online': ('gauge', 'count'),
'check.duration_ms': ('constant', 'ms'),
'requests.bytes_in': ('counter', 'bytes'),
'requests.bytes_out': ('counter', 'bytes'),
'requests.errors_count': ('counter', 'count'),
'requests.successes_count': ('counter', 'count'),
'requests.total_count': ('counter', 'count'),
'async_pipelines.merge.bucket_count': ('counter', 'count'),
'async_pipelines.merge.duration_us': ('counter', 'us'),
'async_pipelines.write.successes_count': ('counter', 'count'),
'async_pipelines.write.failures_count': ('counter', 'count'),
'async_pipelines.write.time_us': ('counter', 'us'),
'async_pipelines.merge.max_bucket_count': ('gauge', 'count'),
'async_pipelines.merge.max_depth_count': ('gauge', 'count'),
'async_pipelines.merge.requests_count': ('counter', 'count'),
'evicted.count': ('counter', 'count'),
'pageins.count': ('counter', 'count'),
}
async_pipeline_bytes_pattern = re.compile(r'async_pipelines.pipe_[0-9]+.merge_map.bytes')
async_pipeline_count_pattern = re.compile(r'async_pipelines.pipe_[0-9]+.merge_map.count')
def _stat_type(stat_id):
if stat_id in _stat_types:
return _stat_types[stat_id]
elif stat_id.endswith('total_ns'):
return ('counter', 'ns')
    elif stat_id.endswith(('total_bytes', 'read_bytes', 'written_bytes')):
        return ('counter', 'bytes')
elif stat_id.endswith('total_count'):
return ('counter', 'count')
elif stat_id.startswith('network.sessions.'):
return ('gauge', 'count')
elif stat_id.startswith('memory.'):
        # Memory statistics are all gauges: they describe how much memory is
        # currently allocated, and where.
return ('gauge', 'bytes')
elif stat_id.startswith('persistence.') or stat_id.startswith('disk'):
        # Persistence statistics are also all gauges: they mostly describe how
        # much storage is currently available/used.
return ('gauge', 'bytes')
elif stat_id.startswith('license.'):
return ('gauge', None)
elif stat_id.startswith('engine_'):
return ('constant', None)
elif async_pipeline_bytes_pattern.match(stat_id):
return ('gauge', 'bytes')
elif async_pipeline_count_pattern.match(stat_id):
return ('gauge', 'count')
else:
return None
def stat_type(stat_id):
"""
    Returns the statistic type for a stat id as a (type, unit) tuple, where
    type is one of:
    - 'gauge'
    - 'counter'
    - 'constant'
    Returns None for unrecognized statistics.
This is useful for determining which value should be reported in a dashboard.
"""
return _stat_type(stat_id)
def _calculate_delta_stat(stat_id, prev, cur):
    logger.info("calculating delta for stat_id = %s, prev = %s, cur = %s",
                stat_id, prev, cur)
    # _stat_type() returns a (type, unit) tuple, or None for unknown stats.
    stat_info = _stat_type(stat_id)
    if stat_info is None:
        return None
    (kind, _unit) = stat_info
    if kind == 'counter':
        return cur - prev
    elif kind == 'gauge':
        return cur
    else:
return None
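# For illustration (hypothetical values): a 'counter' such as 'cpu.user' yields
# cur - prev, while a 'gauge' such as 'network.current_users_count' is reported
# as-is:
#
#   _calculate_delta_stat('cpu.user', 1000, 1500)                 # 500
#   _calculate_delta_stat('network.current_users_count', 3, 5)    # 5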
def _calculate_delta_stats(prev_stats, cur_stats):
ret = {}
for stat_id in cur_stats.keys():
try:
            prev_stat = prev_stats[stat_id]
            cur_stat = cur_stats[stat_id]
value = _calculate_delta_stat(stat_id, prev_stat, cur_stat)
if value is not None:
ret[stat_id] = value
except KeyError:
# Stat likely was not present yet in prev_stats
pass
return ret
def calculate_delta(prev, cur):
"""
Calculates the 'delta' between two successive statistic measurements.
"""
ret = {}
for node_id in cur.keys():
ret[node_id] = _calculate_delta_stats(prev[node_id]['cumulative'],
cur[node_id]['cumulative'])
return ret
def _clean_blob(x):
x_ = x.decode('utf-8', 'replace')
    # strip NUL characters (the raw value is zero-terminated)
return ''.join(c for c in x_ if ord(c) != 0)
def _get_stat(dconn, k):
# Ugly, but works: try to retrieve as integer, if not an int, retrieve as
# blob
try:
return dconn.integer(k).get()
except quasardb.quasardb.AliasNotFoundError:
return _clean_blob(dconn.blob(k).get())
def _by_uid(stats):
xs = {}
for k, v in stats.items():
matches = user_pattern.match(k)
if is_user_stat(k) and matches:
(metric, uid_str) = matches.groups()
uid = int(uid_str)
if uid not in xs:
xs[uid] = {}
if not metric.startswith('serialized'):
xs[uid][metric] = v
return xs
def _cumulative(stats):
xs = {}
for k, v in stats.items():
matches = total_pattern.match(k)
if is_cumulative_stat(k) and matches:
metric = matches.groups()[0]
if not metric.startswith('serialized'):
xs[metric] = v
return xs
Functions
def by_node(conn)
Returns statistics grouped by node URI.

Parameters:
    conn: quasardb.Cluster
        Active connection to the QuasarDB cluster
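A minimal usage sketch (the cluster URI and the qdbst alias are assumptions,
not part of the API):

    import quasardb
    import quasardb.stats as qdbst

    conn = quasardb.Cluster('qdb://127.0.0.1:2836')   # assumed URI
    stats = qdbst.by_node(conn)
    # Each endpoint URI maps to {'by_uid': ..., 'cumulative': ...}
    for uri, node_stats in stats.items():
        print(uri, node_stats['cumulative']['check.duration_ms'])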
def calculate_delta(prev, cur)
Calculates the 'delta' between two successive statistic measurements.
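Continuing the sketch above: sample twice, some interval apart, then diff.
Counters come back as cur - prev, gauges as their latest value. The 10-second
interval is an arbitrary assumption:

    import time

    prev = qdbst.by_node(conn)
    time.sleep(10)
    cur = qdbst.by_node(conn)
    deltas = qdbst.calculate_delta(prev, cur)
    # e.g. requests handled per node over the last 10 seconds:
    for uri, stats in deltas.items():
        print(uri, stats.get('requests.total_count'))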
def is_cumulative_stat(s)
Returns True if the given statistic key is a cumulative statistic. A
'cumulative' stat is defined as anything that is not a user stat.
def is_user_stat(s)
Returns True if the given statistic key is a per-user statistic, i.e. its key
ends with uid_<n>.
def of_node(dconn)
Returns statistics for a single node.

Parameters:
    dconn: quasardb.Node
        Direct node connection to the node we wish to query
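Continuing the same sketch; the endpoint string passed to Cluster.node() is an
assumed example:

    node_stats = qdbst.of_node(conn.node('127.0.0.1:2836'))
    # Two top-level keys: 'by_uid' and 'cumulative'
    print(node_stats['cumulative']['check.online'])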
def stat_type(stat_id)
Returns the statistic type for a stat id as a (type, unit) tuple, where type
is one of:
- 'gauge'
- 'counter'
- 'constant'
Returns None for unrecognized statistics.
This is useful for determining which value should be reported in a dashboard.
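A few illustrative lookups, continuing the sketch above; 'memory.resident_bytes'
is an assumed key, shown only to demonstrate the 'memory.' prefix rule:

    qdbst.stat_type('cpu.user')               # ('counter', 'ns')
    qdbst.stat_type('memory.resident_bytes')  # ('gauge', 'bytes')
    qdbst.stat_type('no.such.stat')           # None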