Skip to content

Commit

Permalink
Improve export capabilities
Browse files Browse the repository at this point in the history
- Add parameters `sort`, `direction`, `limit`, and `scalar`.
- Improve documentation
  • Loading branch information
amotl committed Aug 14, 2023
1 parent 5c3c856 commit 23c3764
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ in progress
a single TTN Application. Thanks, @thiasB, @einsiedlerkrebs, and @ClemensGruber.
- [core] Fix error when connecting to MQTT broker without authentication credentials
- [docs] Refactor "decoders" section to "integrations", and improve index/overview page
- [export] Improve export capabilities by adding parameters ``sort``, ``direction``,
``limit``, and ``scalar``. Thanks, @ClemensGruber.


.. _kotori-0.27.0:
Expand Down
26 changes: 20 additions & 6 deletions doc/source/handbook/export/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,31 @@ This would yield the result with "weight" field omitted::
2016-07-01 16:58:34.788767764,64.64,48.48
2016-07-01 16:58:37.645754806,64.64,48.48

There's also the url parameter ``include`` which does things the other way round:
Include fields
==============
There is also the url parameter ``include`` which does things the other way round:
It will only export the named fields::

http GET $HTTP_URI/api/$MQTT_TOPIC/data.csv include=weight

Both parameters take a comma-separated list of field names.

Sort records
============
The URL parameter ``sort`` specifies a field name you would like to sort by, e.g.
``sort=temperature``. The parameter ``direction`` can be used to control the sort
order, e.g. ``direction=desc`` for sorting in descending order. If you omit the
``direction`` parameter, the sort order is ascending.

Limit result size
=================
The ``limit`` parameter controls the number of records to be returned.

Return scalar values
====================
In order to return a single scalar value, use the ``scalar`` parameter. It will
omit any headers. For example, ``sort=time&direction=desc&scalar=humidity`` will
return the most recent humidity value in the database.

Hierarchical data
=================
Expand All @@ -182,8 +200,4 @@ Todo

Describe parameters::

include, exclude, pad
interpolate=true
&from=20160519
sorted

pad, interpolate=true
16 changes: 12 additions & 4 deletions kotori/io/protocol/target.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# (c) 2016-2021 Andreas Motl <[email protected]>
# (c) 2016-2023 Andreas Motl <[email protected]>
from pyramid.settings import asbool
from twisted.internet import threads
from twisted.web import http, server
Expand Down Expand Up @@ -108,6 +108,10 @@ def emit(self, uri, bucket):

# DataFrame manipulation

if 'sort' in bucket.tdata and bucket.tdata.sort:
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.sort_values.html
df.sort_values(by=read_list(bucket.tdata.sort), ascending=bucket.tdata.get("direction", "ascending").startswith("asc"), inplace=True)

# Drop some fields from DataFrame as requested
if 'exclude' in bucket.tdata and bucket.tdata.exclude:
drop_fields = read_list(bucket.tdata.exclude, empty_elements=False)
Expand Down Expand Up @@ -144,10 +148,14 @@ def emit(self, uri, bucket):
# http://pandas.pydata.org/pandas-docs/stable/missing_data.html#interpolation
df.interpolate(inplace=True)

if 'sorted' in bucket.tdata and asbool(bucket.tdata.sorted):
# http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.sort.html
df.sort(axis='columns', inplace=True)
# Only return specified number of records.
if 'limit' in bucket.tdata and bucket.tdata.limit:
df = df[:int(bucket.tdata.limit)]

if 'scalar' in bucket.tdata and bucket.tdata.scalar:
bucket.request.setHeader('Content-Type', 'text/plain; charset=utf-8')
value = df[bucket.tdata.scalar].values[0]
return str(value)

# Compute http response from DataFrame, taking designated output format into account
response = HttpDataFrameResponse(bucket, dataframe=df)
Expand Down
103 changes: 103 additions & 0 deletions test/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,109 @@ def verify_export_general(channel_path, http_submit, http_fetch):
assert b"cdn.datatables.net" in deferred.result


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.export
def test_export_exclude_include(machinery, create_influxdb, reset_influxdb):
"""
Verify `exclude` and `include` transformation parameters of HTTP export API.

A single measurement is submitted over HTTP, then exported twice as JSON:
once with `exclude` and once with `include`. Each response is compared
against the exact list of records expected.
"""

# Submit a single measurement, with timestamp.
data = {
'time': 1583810982,
'temperature': 25.26,
'humidity': 51.8,
}
yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)

# Excluding fields.
# Both `time` and `humidity` are excluded, so only `temperature` is expected.
# NOTE(review): `ts_from` / `ts_to` are not defined in this function; presumably
# module-level constants bracketing the submitted timestamp -- confirm.
deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={
"from": ts_from,
"to": ts_to,
"exclude": "time,humidity",
})
yield deferred
result = json.loads(deferred.result)
assert_equal(result, [{"temperature": 25.26}])

# Including fields.
# Only `temperature` is requested, yet the expected record still carries `time`.
deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={
"from": ts_from,
"to": ts_to,
"include": "temperature",
})
yield deferred
result = json.loads(deferred.result)
assert_equal(result, [{"time": "2020-03-10T03:29:42.000Z", "temperature": 25.26}])


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.export
def test_export_sorting_limit_scalar(machinery, create_influxdb, reset_influxdb):
"""
Verify `sort`, `direction`, `limit`, and `scalar` transformation parameters
of HTTP export API.

Two measurements are submitted over HTTP. The export is then requested three
times: sorted descending by temperature, the same with `limit=1`, and finally
with `scalar=temperature` using the `txt` format.
"""

# Submit two measurements, with timestamp.
data = {
'time': 1583810982,
'temperature': 25.26,
'humidity': 51.8,
}
yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data)

data = {
'time': 1583810993,
'temperature': 32.26,
'humidity': 64.8,
}
yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)

# Sorting.
# Descending by temperature: the higher reading (32.26) is expected first.
# NOTE(review): `ts_from` / `ts_to` are not defined in this function; presumably
# module-level constants bracketing the submitted timestamps -- confirm.
deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={
"from": ts_from,
"to": ts_to,
"exclude": "time,humidity",
"sort": "temperature",
"direction": "descending",
})
yield deferred
result = json.loads(deferred.result)
assert_equal(result, [{"temperature": 32.26}, {'temperature': 25.26}])

# Limit.
# Same query with `limit=1`: only the first record of the sorted result is expected.
deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={
"from": ts_from,
"to": ts_to,
"exclude": "time,humidity",
"sort": "temperature",
"direction": "descending",
"limit": 1,
})
yield deferred
result = json.loads(deferred.result)
assert_equal(result, [{"temperature": 32.26}])

# Scalar.
# The response is compared as plain text against the bare value of the
# first sorted record, rather than being parsed as JSON.
deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="txt", params={
"from": ts_from,
"to": ts_to,
"sort": "temperature",
"direction": "descending",
"scalar": "temperature",
})
yield deferred
assert deferred.result == "32.26"


@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.export
Expand Down
11 changes: 8 additions & 3 deletions test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,16 @@ def http_csv_sensor(topic, data):
return requests.post(uri, data=body, headers={'Content-Type': 'text/csv'})


def http_get_data(path: str = None, format='csv', ts_from=None, ts_to=None, port=24642):
def http_get_data(path: str = None, format='csv', params=None, ts_from=None, ts_to=None, port=24642):
path = path.lstrip("/")
uri = f'http://localhost:{port}/api/{path}.{format}?from={ts_from}&to={ts_to}'
uri = f'http://localhost:{port}/api/{path}.{format}'
logger.info('HTTP: Exporting data from {} using format "{}"'.format(uri, format))
payload = requests.get(uri).content
params = params or {}
if ts_from:
params["from"] = ts_from
if ts_to:
params["to"] = ts_to
payload = requests.get(uri, params=params).content
if format in ["csv", "txt", "json", "html"]:
payload = payload.decode()
return payload
Expand Down

0 comments on commit 23c3764

Please sign in to comment.