Skip to content

Commit

Permalink
Implementation of the redesigned hypertable to optimize for compression.
Browse files Browse the repository at this point in the history
  • Loading branch information
noctarius committed Mar 3, 2023
1 parent 010fac7 commit a146ffc
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 91 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Enabling simple long time state storage (LTSS) for your sensor states in a Postg
The following extensions are required for full functionality:
* TimescaleDB
* PostGIS
* Ltree

LTSS automatically detects the available extensions and creates the necessary table accordingly. A PostgeSQL instance without those extensions can be used but will lack some features: efficient storing and accessing time-series data (without TimescaleDB) and directly accessing geolocation data of logged data (without PostGis).

Expand Down Expand Up @@ -45,7 +46,7 @@ configuration.yaml

ltss:
db_url: postgresql://USER:PASSWORD@HOST_ADRESS/DB_NAME
chunk_time_interval: 2592000000000
chunk_time_interval: 604800000000
include:
domains:
- sensor
Expand All @@ -66,7 +67,11 @@ configuration.yaml

chunk_time_interval
(int)(Optional)
The time interval to be used for chunking in TimescaleDB in microseconds. Defaults to 2592000000000 (30 days). Ignored for databases without TimescaleDB extension.
The time interval to be used for chunking in TimescaleDB in microseconds. Defaults to 604800000000 (7 days). Ignored for databases without TimescaleDB extension.

chunk_compression_after
(int)(Optional)
The time interval after which TimescaleDB will compress old chunks in microseconds. Defaults to 1209600000000 (14 days). Ignored for databases without TimescaleDB extension.

exclude
(map)(Optional)
Expand Down
64 changes: 50 additions & 14 deletions custom_components/ltss/__init__.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
"""Support for recording details."""
import asyncio
import concurrent.futures
from contextlib import contextmanager
from datetime import datetime, timedelta
import logging
import queue
import threading
import time
import json
from typing import Any, Dict, Optional, Callable
from typing import Any, Callable

import voluptuous as vol
from sqlalchemy import exc, create_engine, inspect, text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.dialects.postgresql import insert

import psycopg2

from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_DOMAINS,
CONF_ENTITIES,
CONF_EXCLUDE,
CONF_INCLUDE,
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP,
EVENT_STATE_CHANGED,
Expand All @@ -39,7 +33,7 @@
import homeassistant.util.dt as dt_util
from homeassistant.helpers.json import JSONEncoder

from .models import Base, LTSS
from .models import Base, LTSS, LTSS_ATTRIBUTES
from .migrations import check_and_migrate

_LOGGER = logging.getLogger(__name__)
Expand All @@ -48,6 +42,7 @@

CONF_DB_URL = "db_url"
CONF_CHUNK_TIME_INTERVAL = "chunk_time_interval"
CONF_CHUNK_COMPRESSION_AFTER = "chunk_compression_after"

CONNECT_RETRY_WAIT = 3

Expand All @@ -57,8 +52,11 @@
{
vol.Required(CONF_DB_URL): cv.string,
vol.Optional(
CONF_CHUNK_TIME_INTERVAL, default=2592000000000
): cv.positive_int, # 30 days
CONF_CHUNK_TIME_INTERVAL, default=604800000000
): cv.positive_int, # 7 days
vol.Optional(
CONF_CHUNK_COMPRESSION_AFTER, default=1209600000000
): cv.positive_int # 14 days
}
)
},
Expand All @@ -72,12 +70,14 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:

db_url = conf.get(CONF_DB_URL)
chunk_time_interval = conf.get(CONF_CHUNK_TIME_INTERVAL)
chunk_compression_after = conf.get(CONF_CHUNK_COMPRESSION_AFTER)
entity_filter = convert_include_exclude_filter(conf)

instance = LTSS_DB(
hass=hass,
uri=db_url,
chunk_time_interval=chunk_time_interval,
chunk_compression_after=chunk_compression_after,
entity_filter=entity_filter,
)
instance.async_initialize()
Expand All @@ -94,6 +94,7 @@ def __init__(
hass: HomeAssistant,
uri: str,
chunk_time_interval: int,
chunk_compression_after: int,
entity_filter: Callable[[str], bool],
) -> None:
"""Initialize the ltss."""
Expand All @@ -104,6 +105,7 @@ def __init__(
self.recording_start = dt_util.utcnow()
self.db_url = uri
self.chunk_time_interval = chunk_time_interval
self.chunk_compression_after = chunk_compression_after
self.async_db_ready = asyncio.Future()
self.engine: Any = None
self.run_info: Any = None
Expand Down Expand Up @@ -206,11 +208,22 @@ def notify_hass_started(event):
with self.get_session() as session:
with session.begin():
try:
row = LTSS.from_event(event)
row, attributes_row = LTSS.from_event(event)
# Insert the actual attributes first for the
# internal trigger to work
stmt = insert(
LTSS_ATTRIBUTES.__table__
).values(
attributes_row
)
session.execute(
stmt.on_conflict_do_nothing()
)
session.add(row)
except (TypeError, ValueError):
except (TypeError, ValueError) as e:
_LOGGER.warning(
"State is not JSON serializable: %s",
"State is not JSON serializable: %s => %s",
e,
event.data.get("new_state"),
)

Expand Down Expand Up @@ -319,6 +332,17 @@ def _create_table(self, available_extensions):
# activate location extraction in model/ORM to add necessary column when calling create_all()
LTSS.activate_location_extraction()

if 'ltree' not in available_extensions:
_LOGGER.error("ltree extension is required, but not found...")

if 'ltree' in available_extensions:
_LOGGER.info("ltree extension is available, enabling it...")
con.execute(
text(
"CREATE EXTENSION IF NOT EXISTS ltree CASCADE"
)
)

Base.metadata.create_all(self.engine)

if "timescaledb" in available_extensions:
Expand All @@ -336,6 +360,18 @@ def _create_table(self, available_extensions):
if_not_exists => TRUE);"""
)
)
con.execute(
text(
f"""ALTER TABLE {LTSS.__tablename__} SET (timescaledb.compress,
timescaledb.compress_orderby = 'time, entity_id')"""
)
)
con.execute(
text(
f"""SELECT add_compression_policy('{LTSS.__tablename__}',
(interval '1us') * {self.chunk_compression_after})"""
)
)

def _close_connection(self):
"""Close the connection."""
Expand Down
5 changes: 3 additions & 2 deletions custom_components/ltss/manifest.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
{
"domain": "ltss",
"version": "2.0.1",
"version": "2.1.0",
"name": "Long Time State Storage (LTSS)",
"documentation": "https://github.com/freol35241/ltss",
"requirements": [
"sqlalchemy>=2.0,<3.0",
"psycopg2>=2.8,<3.0",
"geoalchemy2>=0.13,<1.0"
"geoalchemy2>=0.13,<1.0",
"sqlalchemy-utils>=0.40.0,<1.0"
],
"dependencies": [],
"codeowners": [
Expand Down
Loading

0 comments on commit a146ffc

Please sign in to comment.