From 23c37646d823f40fcd54f92a244440c227faf915 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 9 Jun 2023 01:27:57 +0200 Subject: [PATCH] Improve export capabilities - Add parameters `sort`, `direction`, `limit`, and `scalar`. - Improve documentation --- CHANGES.rst | 2 + doc/source/handbook/export/index.rst | 26 +++++-- kotori/io/protocol/target.py | 16 +++-- test/test_export.py | 103 +++++++++++++++++++++++++++ test/util.py | 11 ++- 5 files changed, 145 insertions(+), 13 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 756e8aa2..450b6e45 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -25,6 +25,8 @@ in progress a single TTN Application. Thanks, @thiasB, @einsiedlerkrebs, and @ClemensGruber. - [core] Fix error when connecting to MQTT broker without authentication credentials - [docs] Refactor "decoders" section to "integrations", and improve index/overview page +- [export] Improve export capabilities by adding parameters ``sort``, ``direction``, + ``limit``, and ``scalar``. Thanks, @ClemensGruber. .. _kotori-0.27.0: diff --git a/doc/source/handbook/export/index.rst b/doc/source/handbook/export/index.rst index 7860414f..07d5f2cc 100644 --- a/doc/source/handbook/export/index.rst +++ b/doc/source/handbook/export/index.rst @@ -160,13 +160,31 @@ This would yield the result with "weight" field omitted:: 2016-07-01 16:58:34.788767764,64.64,48.48 2016-07-01 16:58:37.645754806,64.64,48.48 -There's also the url parameter ``include`` which does things the other way round: +Include fields +============== +There is also the url parameter ``include`` which does things the other way round: It will only export the named fields:: http GET $HTTP_URI/api/$MQTT_TOPIC/data.csv include=weight Both parameters take a comma-separated list of field names. +Sort records +============ +The URL parameter ``sort`` specifies a field name you would like to sort by, e.g. +``sort=temperature``. The parameter ``direction`` can be used to control the sort +oder, e.g. ``direction=desc`` for sorting in descending order. If you omit the +``direction`` parameter, the sort order is ascending. + +Limit result size +================= +The ``limit`` parameter controls the number of records to be returned. + +Return scalar values +==================== +In order to return a single scalar value, use the ``scalar`` parameter. It will +omit any headers. For example, ``sort=time&direction=desc&scalar=humidity`` will +return the most recent humidity value in the database. Hierarchical data ================= @@ -182,8 +200,4 @@ Todo Describe parameters:: - include, exclude, pad - interpolate=true - &from=20160519 - sorted - + pad, interpolate=true diff --git a/kotori/io/protocol/target.py b/kotori/io/protocol/target.py index 56f3abd8..3c18dd16 100644 --- a/kotori/io/protocol/target.py +++ b/kotori/io/protocol/target.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# (c) 2016-2021 Andreas Motl +# (c) 2016-2023 Andreas Motl from pyramid.settings import asbool from twisted.internet import threads from twisted.web import http, server @@ -108,6 +108,10 @@ def emit(self, uri, bucket): # DataFrame manipulation + if 'sort' in bucket.tdata and bucket.tdata.sort: + # http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.sort.html + df.sort_values(by=read_list(bucket.tdata.sort), ascending=bucket.tdata.get("direction", "ascending").startswith("asc"), inplace=True) + # Drop some fields from DataFrame as requested if 'exclude' in bucket.tdata and bucket.tdata.exclude: drop_fields = read_list(bucket.tdata.exclude, empty_elements=False) @@ -144,10 +148,14 @@ def emit(self, uri, bucket): # http://pandas.pydata.org/pandas-docs/stable/missing_data.html#interpolation df.interpolate(inplace=True) - if 'sorted' in bucket.tdata and asbool(bucket.tdata.sorted): - # http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.sort.html - df.sort(axis='columns', inplace=True) + # Only return specified number of records. + if 'limit' in bucket.tdata and bucket.tdata.limit: + df = df[:int(bucket.tdata.limit)] + if 'scalar' in bucket.tdata and bucket.tdata.scalar: + bucket.request.setHeader('Content-Type', 'text/plain; charset=utf-8') + value = df[bucket.tdata.scalar].values[0] + return str(value) # Compute http response from DataFrame, taking designated output format into account response = HttpDataFrameResponse(bucket, dataframe=df) diff --git a/test/test_export.py b/test/test_export.py index 519b632e..c4a89b2a 100644 --- a/test/test_export.py +++ b/test/test_export.py @@ -104,6 +104,109 @@ def verify_export_general(channel_path, http_submit, http_fetch): assert b"cdn.datatables.net" in deferred.result +@pytest_twisted.inlineCallbacks +@pytest.mark.http +@pytest.mark.export +def test_export_exclude_include(machinery, create_influxdb, reset_influxdb): + """ + Verify `exclude` and `include` transformation parameters of HTTP export API. + """ + + # Submit a single measurement, with timestamp. + data = { + 'time': 1583810982, + 'temperature': 25.26, + 'humidity': 51.8, + } + yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + + # Excluding fields. + deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={ + "from": ts_from, + "to": ts_to, + "exclude": "time,humidity", + }) + yield deferred + result = json.loads(deferred.result) + assert_equal(result, [{"temperature": 25.26}]) + + # Including fields. + deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={ + "from": ts_from, + "to": ts_to, + "include": "temperature", + }) + yield deferred + result = json.loads(deferred.result) + assert_equal(result, [{"time": "2020-03-10T03:29:42.000Z", "temperature": 25.26}]) + + +@pytest_twisted.inlineCallbacks +@pytest.mark.http +@pytest.mark.export +def test_export_sorting_limit_scalar(machinery, create_influxdb, reset_influxdb): + """ + Verify `sort` and `direction` transformation parameters of HTTP export API. + """ + + # Submit two measurements, with timestamp. + data = { + 'time': 1583810982, + 'temperature': 25.26, + 'humidity': 51.8, + } + yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data) + + data = { + 'time': 1583810993, + 'temperature': 32.26, + 'humidity': 64.8, + } + yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + + # Sorting. + deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={ + "from": ts_from, + "to": ts_to, + "exclude": "time,humidity", + "sort": "temperature", + "direction": "descending", + }) + yield deferred + result = json.loads(deferred.result) + assert_equal(result, [{"temperature": 32.26}, {'temperature': 25.26}]) + + # Limit. + deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={ + "from": ts_from, + "to": ts_to, + "exclude": "time,humidity", + "sort": "temperature", + "direction": "descending", + "limit": 1, + }) + yield deferred + result = json.loads(deferred.result) + assert_equal(result, [{"temperature": 32.26}]) + + # Scalar. + deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="txt", params={ + "from": ts_from, + "to": ts_to, + "sort": "temperature", + "direction": "descending", + "scalar": "temperature", + }) + yield deferred + assert deferred.result == "32.26" + + @pytest_twisted.inlineCallbacks @pytest.mark.http @pytest.mark.export diff --git a/test/util.py b/test/util.py index 83f75e9c..feaee391 100644 --- a/test/util.py +++ b/test/util.py @@ -256,11 +256,16 @@ def http_csv_sensor(topic, data): return requests.post(uri, data=body, headers={'Content-Type': 'text/csv'}) -def http_get_data(path: str = None, format='csv', ts_from=None, ts_to=None, port=24642): +def http_get_data(path: str = None, format='csv', params=None, ts_from=None, ts_to=None, port=24642): path = path.lstrip("/") - uri = f'http://localhost:{port}/api/{path}.{format}?from={ts_from}&to={ts_to}' + uri = f'http://localhost:{port}/api/{path}.{format}' logger.info('HTTP: Exporting data from {} using format "{}"'.format(uri, format)) - payload = requests.get(uri).content + params = params or {} + if ts_from: + params["from"] = ts_from + if ts_to: + params["to"] = ts_to + payload = requests.get(uri, params=params).content if format in ["csv", "txt", "json", "html"]: payload = payload.decode() return payload