[InfluxDB] Improve database subsystem, and testing
amotl committed Jun 9, 2023
1 parent 693ff48 commit b2817b6
Showing 12 changed files with 214 additions and 177 deletions.
2 changes: 0 additions & 2 deletions .coveragerc

This file was deleted.

7 changes: 5 additions & 2 deletions Makefile
@@ -10,6 +10,7 @@ $(eval venv := .venv)
$(eval pip := $(venv)/bin/pip)
$(eval python := $(venv)/bin/python)
$(eval pytest := $(venv)/bin/pytest)
$(eval coverage := $(venv)/bin/coverage)
$(eval bumpversion := $(venv)/bin/bumpversion)
$(eval twine := $(venv)/bin/twine)
$(eval sphinx-build := $(venv)/bin/sphinx-build)
@@ -94,8 +95,10 @@ test: virtualenv-dev

.PHONY:
test-coverage: virtualenv-dev
$(pytest) --cov --cov-report=term-missing --cov-report=xml kotori test

$(coverage) run --concurrency=multiprocessing,thread --parallel-mode --timid $(pytest) kotori test
$(coverage) combine
$(coverage) report
$(coverage) xml


# =============
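The new `test-coverage` target runs the test suite under `coverage run` with `multiprocessing,thread` concurrency: each spawned process writes its own `.coverage.*` data file, and `coverage combine` merges them before reporting. The following is a minimal sketch of the same workflow using the `coverage` Python API instead of the CLI; the `pytest.main` invocation and paths are illustrative assumptions, not code from this repository.

```python
import coverage
import pytest

# Trace subprocesses and threads; write per-process .coverage.* files.
cov = coverage.Coverage(
    concurrency=["multiprocessing", "thread"],
    data_suffix=True,   # parallel mode: suffix each data file
    timid=True,         # use the slower, more conservative tracer
)
cov.start()
pytest.main(["kotori", "test"])   # run the test suite in-process
cov.stop()
cov.save()

cov.combine()      # merge the parallel data files into one
cov.report()       # terminal report (pass show_missing=True for term-missing)
cov.xml_report()   # write coverage.xml for CI consumption
```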
5 changes: 5 additions & 0 deletions etc/test/main.ini
@@ -14,9 +14,14 @@ include =

; http server
[kotori]

; TODO: Refactor to [http] section.
http_listen = localhost
http_port = 24642

; TODO: Implement backend database selection.
; use_database = influxdb

; mqtt bus adapter
[mqtt]
host = localhost
1 change: 1 addition & 0 deletions kotori/daq/services/mig.py
@@ -39,6 +39,7 @@ def __init__(self, channel=None, graphing=None, strategy=None):
self.strategy = strategy

self.name = u'service-mig-' + self.channel.get('realm', str(id(self)))
self.database = None

def setupService(self):

165 changes: 5 additions & 160 deletions kotori/daq/storage/influx.py
@@ -1,14 +1,13 @@
# -*- coding: utf-8 -*-
# (c) 2015-2021 Andreas Motl <[email protected]>
import math

# (c) 2015-2023 Andreas Motl <[email protected]>
import requests
from copy import deepcopy
from funcy import project
from collections import OrderedDict
from twisted.logger import Logger
from influxdb.client import InfluxDBClient, InfluxDBClientError
from kotori.io.protocol.util import parse_timestamp, is_number, convert_floats

from kotori.daq.storage.util import format_chunk

log = Logger()

@@ -33,7 +32,7 @@ def __init__(self, settings=None, database=None):

self.__dict__.update(**settings)

# Bookeeping for all databases having been touched already
# Bookkeeping for all databases having been touched already
self.databases_written_once = set()

# Knowledge about all databases to be accessed using UDP
@@ -71,7 +70,7 @@ def write(self, meta, data):
data_copy = deepcopy(data)

try:
chunk = self.format_chunk(meta, data)
chunk = format_chunk(meta, data)

except Exception as ex:
log.failure(u'Could not format chunk (ex={ex_name}: {ex}): data={data}, meta={meta}',
@@ -122,160 +121,6 @@ def write_chunk(self, meta, chunk):
def get_tags(data):
return project(data, ['gateway', 'node'])

def format_chunk(self, meta, data):
"""
Format for InfluxDB >= 0.9::
{
"measurement": "hiveeyes_100",
"tags": {
"host": "server01",
"region": "europe"
},
"time": "2015-10-17T19:30:00Z",
"fields": {
"value": 0.42
}
}
"""

assert isinstance(data, dict), 'Data payload is not a dictionary'

chunk = {
"measurement": meta['measurement'],
"tags": {},
}

"""
if "gateway" in meta:
chunk["tags"]["gateway"] = meta["gateway"]
if "node" in meta:
chunk["tags"]["node"] = meta["node"]
"""

# TODO: Refactor to some knowledgebase component.
time_field_candidates = [
'time', # Vanilla
'datetime', # Vanilla
'Time', # Tasmota
'dateTime', # WeeWX
'timestamp', # Contrib
]

# Extract timestamp field from data
chunk['time_precision'] = 'n'
# FIXME: Unify with ``kotori.io.protocol.http.data_acquisition()``.
for time_field in time_field_candidates:
if time_field in data:

# WeeWX. TODO: Move to specific vendor configuration.
# Disabled in favor of precision detection heuristic.
#if time_field == 'dateTime':
# chunk['time_precision'] = 's'

# Process timestamp field.
if data[time_field]:

# Decode timestamp.
chunk['time'] = data[time_field]
if is_number(chunk['time']):
chunk['time'] = float(chunk['time'])

# Remove timestamp from data payload.
del data[time_field]

# If we found a timestamp field already,
# don't look out for more.
break

# Extract geohash from data. Finally, thanks Rich!
# TODO: Also precompute geohash with 3-4 different zoomlevels and add them as tags
if "geohash" in data:
chunk["tags"]["geohash"] = data["geohash"]
del data['geohash']

# Extract more information specific to luftdaten.info
for field in ['location', 'location_id', 'location_name', 'sensor_id', 'sensor_type']:
if field in data:
chunk["tags"][field] = data[field]
del data[field]

# TODO: Maybe do this at data acquisition / transformation time, not here.
if 'time' in chunk:
timestamp = chunk['time'] = parse_timestamp(chunk['time'])

# Heuristically compute timestamp precision
if isinstance(timestamp, (int, float)):
if timestamp >= 1e17 or timestamp <= -1e17:
time_precision = 'n'
elif timestamp >= 1e14 or timestamp <= -1e14:
time_precision = 'u'
elif timestamp >= 1e11 or timestamp <= -1e11:
time_precision = 'ms'

# TODO: Is this a reasonable default?
else:
time_precision = 's'

# Support fractional epoch timestamps like `1637431069.6585083`.
if isinstance(timestamp, float):
fractional, whole = math.modf(timestamp)
fracdigits = len(str(fractional)) - 2
if fracdigits > 0:
if fracdigits <= 3:
exponent = 3
time_precision = "ms"
elif fracdigits <= 6:
exponent = 6
time_precision = "u"
else:
exponent = 9
time_precision = "n"
timestamp = timestamp * (10 ** exponent)

chunk['time'] = int(timestamp)
chunk['time_precision'] = time_precision

"""
# FIXME: Breaks CSV data acquisition. Why?
if isinstance(chunk['time'], datetime.datetime):
if chunk['time'].microsecond == 0:
chunk['time_precision'] = 's'
"""

"""
Prevent errors like
ERROR: InfluxDBClientError: 400:
write failed: field type conflict:
input field "pitch" on measurement "01_position" is type float64, already exists as type integer
"""
self.data_to_float(data)

assert data, 'Data payload is empty'

chunk["fields"] = data

return chunk

def data_to_float(self, data):
return convert_floats(data)

for key, value in data.items():

# Sanity checks
if isinstance(value, str):
continue

if value is None:
data[key] = None
continue

# Convert to float
try:
data[key] = float(value)
except (TypeError, ValueError) as ex:
log.warn(u'Measurement "{key}: {value}" float conversion failed: {ex}', key=key, value=value, ex=ex)


class BusInfluxForwarder(object):
"""
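With the chunk-formatting logic extracted to `kotori.daq.storage.util`, the adapter's `write()` method delegates to the module-level `format_chunk()` instead of a method on the class, so the formatting can be unit-tested without instantiating an InfluxDB adapter. A condensed sketch of the resulting call path (the class name is assumed from context, and the error handling from the original is omitted):

```python
from kotori.daq.storage.util import format_chunk

class InfluxDBAdapter:
    def write(self, meta, data):
        # Formatting no longer depends on adapter state.
        chunk = format_chunk(meta, data)   # was: self.format_chunk(meta, data)
        return self.write_chunk(meta, chunk)
```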
142 changes: 142 additions & 0 deletions kotori/daq/storage/util.py
@@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
# (c) 2015-2023 Andreas Motl <[email protected]>
import math

from kotori.io.protocol.util import convert_floats, is_number, parse_timestamp


def format_chunk(meta, data):
"""
Format for InfluxDB >= 0.9::
{
"measurement": "hiveeyes_100",
"tags": {
"host": "server01",
"region": "europe"
},
"time": "2015-10-17T19:30:00Z",
"fields": {
"value": 0.42
}
}
"""

assert isinstance(data, dict), 'Data payload is not a dictionary'

chunk = {
"measurement": meta['measurement'],
"tags": {},
}

"""
if "gateway" in meta:
chunk["tags"]["gateway"] = meta["gateway"]
if "node" in meta:
chunk["tags"]["node"] = meta["node"]
"""

# TODO: Refactor to some knowledgebase component.
time_field_candidates = [
'time', # Vanilla
'datetime', # Vanilla
'Time', # Tasmota
'dateTime', # WeeWX
'timestamp', # Contrib
]

# Extract timestamp field from data
chunk['time_precision'] = 'n'
# FIXME: Unify with ``kotori.io.protocol.http.data_acquisition()``.
for time_field in time_field_candidates:
if time_field in data:

# WeeWX. TODO: Move to specific vendor configuration.
# Disabled in favor of precision detection heuristic.
# if time_field == 'dateTime':
# chunk['time_precision'] = 's'

# Process timestamp field.
if data[time_field]:

# Decode timestamp.
chunk['time'] = data[time_field]
if is_number(chunk['time']):
chunk['time'] = float(chunk['time'])

# Remove timestamp from data payload.
del data[time_field]

# If we found a timestamp field already,
# don't look out for more.
break

# Extract geohash from data. Finally, thanks Rich!
# TODO: Also precompute geohash with 3-4 different zoomlevels and add them as tags
if "geohash" in data:
chunk["tags"]["geohash"] = data["geohash"]
del data['geohash']

# Extract more information specific to luftdaten.info
for field in ['location', 'location_id', 'location_name', 'sensor_id', 'sensor_type']:
if field in data:
chunk["tags"][field] = data[field]
del data[field]

# TODO: Maybe do this at data acquisition / transformation time, not here.
if 'time' in chunk:
timestamp = chunk['time'] = parse_timestamp(chunk['time'])

# Heuristically compute timestamp precision
if isinstance(timestamp, (int, float)):
if timestamp >= 1e17 or timestamp <= -1e17:
time_precision = 'n'
elif timestamp >= 1e14 or timestamp <= -1e14:
time_precision = 'u'
elif timestamp >= 1e11 or timestamp <= -1e11:
time_precision = 'ms'

# TODO: Is this a reasonable default?
else:
time_precision = 's'

# Support fractional epoch timestamps like `1637431069.6585083`.
if isinstance(timestamp, float):
fractional, whole = math.modf(timestamp)
fracdigits = len(str(fractional)) - 2
if fracdigits > 0:
if fracdigits <= 3:
exponent = 3
time_precision = "ms"
elif fracdigits <= 6:
exponent = 6
time_precision = "u"
else:
exponent = 9
time_precision = "n"
timestamp = timestamp * (10 ** exponent)

chunk['time'] = int(timestamp)
chunk['time_precision'] = time_precision

"""
# FIXME: Breaks CSV data acquisition. Why?
if isinstance(chunk['time'], datetime.datetime):
if chunk['time'].microsecond == 0:
chunk['time_precision'] = 's'
"""

# Make sure numeric data in `fields` is in float format.
"""
Prevent errors like
ERROR: InfluxDBClientError: 400:
write failed: field type conflict:
input field "pitch" on measurement "01_position" is type float64, already exists as type integer
"""
convert_floats(data)

assert data, 'Data payload is empty'

chunk["fields"] = data

return chunk
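Taken together, `format_chunk()` turns a `(meta, data)` pair into a dictionary ready for the InfluxDB client: the timestamp is pulled out of the payload and its precision inferred heuristically, well-known fields such as `geohash` are promoted to tags, and the remaining values are converted to floats to avoid field type conflicts. A minimal usage sketch with an illustrative payload (the field names and values are assumptions, not fixtures from this repository):

```python
from kotori.daq.storage.util import format_chunk

meta = {"measurement": "hiveeyes_100"}
data = {
    "timestamp": 1637431069.6585083,  # fractional epoch seconds
    "geohash": "u33dbm",              # promoted to a tag
    "pitch": 42,                      # integer, converted to float
}

chunk = format_chunk(meta, data)
# `chunk` now carries "measurement", "tags", "time", "time_precision",
# and "fields", matching the InfluxDB >= 0.9 shape documented above.
```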