From 71aa84806d73c62b2b6bdf6b3c29b8ba8a3d70af Mon Sep 17 00:00:00 2001 From: Jesper Date: Thu, 5 Dec 2019 14:24:54 +0100 Subject: [PATCH 1/9] Added support for timescaledb --- agnostic/__init__.py | 16 ++++ agnostic/cli.py | 2 +- agnostic/timescale.py | 216 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 agnostic/timescale.py diff --git a/agnostic/__init__.py b/agnostic/__init__.py index bcb8cdd..823732b 100644 --- a/agnostic/__init__.py +++ b/agnostic/__init__.py @@ -117,6 +117,22 @@ def askpass(user, db): raise return PostgresBackend(host, port, user, password, database, schema) + elif db_type == 'timescale': + if user is None or database is None: + raise RuntimeError('TimescaleDB requires user and database ' + 'arguments.') + host = host or 'localhost' + password = password or askpass(user, database) + try: + from agnostic.timescale import TimescaleBackend + except ImportError as ie: # pragma: no cover + if ie.name == 'pg8000': + raise RuntimeError('The `pg8000` module is required for ' + ' TimescaleDB.') + else: + raise + return TimescaleBackend(host, port, user, password, database, schema) + elif db_type == 'sqlite': if (host is not None or port is not None or user is not None or password is not None or schema is not None): diff --git a/agnostic/cli.py b/agnostic/cli.py index f577a5b..05df7b2 100644 --- a/agnostic/cli.py +++ b/agnostic/cli.py @@ -28,7 +28,7 @@ def __init__(self): envvar='AGNOSTIC_TYPE', metavar='', required=True, - type=click.Choice(['mysql', 'postgres', 'sqlite']), + type=click.Choice(['mysql', 'postgres', 'sqlite', 'timescale']), help='Type of database.' ) @click.option( diff --git a/agnostic/timescale.py b/agnostic/timescale.py new file mode 100644 index 0000000..bb62c7c --- /dev/null +++ b/agnostic/timescale.py @@ -0,0 +1,216 @@ +import os +import subprocess + +import pg8000 + +from agnostic import AbstractBackend + + +class TimescaleBackend(AbstractBackend): + ''' Support for TimescaleDB. ''' + + def backup_db(self, backup_file): + ''' + Return a ``Popen`` instance that will backup the database to the + ``backup_file`` handle. + ''' + + env = {'PGPASSWORD': self._password} + env.update(os.environ) + + command = [ + 'pg_dump', + '-h', self._host, + '-U', self._user, + ] + + if self._port is not None: + command.append('-p') + command.append(str(self._port)) + + for schema in self._split_schema(): + command.append('-n') + command.append(schema) + + command.append(self._database) + + process = subprocess.Popen( + command, + env=env, + stdout=backup_file, + stderr=subprocess.PIPE + ) + + return process + + def clear_db(self, cursor): + ''' Remove all objects from the database. ''' + + # Drop tables. + cursor.execute(''' + SELECT schemaname, tablename FROM pg_tables + WHERE tableowner = %s + AND schemaname != 'pg_catalog' + AND schemaname != 'information_schema' + AND schemaname != '_timescaledb_catalog' + AND schemaname != '_timescaledb_config' + AND schemaname != '_timescaledb_internal' + AND schemaname != '_timescaledb_cache' + ''', (self._user,)) + + tables = ['"{}"."{}"'.format(r[0], r[1]) for r in cursor.fetchall()] + + if len(tables) > 0: + sql = 'DROP TABLE {} CASCADE'.format(', '.join(tables)) + cursor.execute(sql) + + # Drop sequences. + cursor.execute(''' + SELECT relname FROM pg_class + WHERE relkind = 'S' + ''') + + sequences = ['"{}"'.format(row[0]) for row in cursor.fetchall()] + + if len(sequences) > 0: + sql = 'DROP SEQUENCE IF EXISTS {} CASCADE'.format(','.join(sequences)) + cursor.execute(sql) + + # Drop custom types, e.g. ENUM types. + cursor.execute(''' + SELECT typname FROM pg_type + WHERE typtype = 'e' + ''') + + types = ['"{}"'.format(row[0]) for row in cursor.fetchall()] + + if len(types) > 0: + sql = 'DROP TYPE {} CASCADE'.format(','.join(types)) + cursor.execute(sql) + + # Drop schema objects. + for schema in self._split_schema(): + if schema != 'public': + sql = 'DROP SCHEMA IF EXISTS {} CASCADE'.format(schema) + cursor.execute(sql) + + def connect_db(self): + ''' Connect to PostgreSQL. ''' + + connect_args = { + 'host': self._host, + 'user': self._user, + 'password': self._password, + 'database': self._database, + } + + if self._port is not None: + connect_args['port'] = self._port + + db = pg8000.connect(**connect_args) + db.autocommit = True + if self._schema is not None: + cursor = db.cursor() + cursor.execute("SET SCHEMA '{}'".format(self._schema)) + return db + + def get_schema_command(self): + ''' Return a command that will set the current schema. ''' + + if self._schema is None: + return 'SET search_path = "$user",public;\n' + else: + return 'SET search_path = {};\n'.format(self._schema) + + def restore_db(self, backup_file): + ''' + Return a ``Popen`` instance that will restore the database from the + ``backup_file`` handle. + ''' + + env = {'PGPASSWORD': self._password} + env.update(os.environ) + + command = [ + 'psql', + '-h', self._host, + '-U', self._user, + '-v', 'ON_ERROR_STOP=1', # Fail fast if an error occurs. + ] + + if self._port is not None: + command.append('-p') + command.append(str(self._port)) + + command.append(self._database) + + process = subprocess.Popen( + command, + env=env, + stdin=backup_file, + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE + ) + + return process + + def snapshot_db(self, snapshot_file): + ''' + Return a ``Popen`` instance that writes a snapshot to ``snapshot_file``. + ''' + + env = {'PGPASSWORD': self._password} + env.update(os.environ) + + command = [ + 'pg_dump', + '-h', self._host, + '-U', self._user, + '-s', # dump schema only + '-x', # don't dump grant/revoke statements + '-O', # don't dump ownership commands + '--no-tablespaces', + ] + + if self._port is not None: + command.append('-p') + command.append(str(self._port)) + + if self._schema is not None: + for schema in self._split_schema(): + command.extend(('-n', schema)) + + command.append(self._database) + + process = subprocess.Popen( + command, + env=env, + stdout=snapshot_file, + stderr=subprocess.PIPE + ) + + return process + + def _split_schema(self): + ''' + Split schema string into separate schema names. + + PostgreSQL allows specifying the schema name as a search path that + look for objects in more than one schema. This method breaks that + search path into individual schema names. + + It also replaces the special schema name ``"$user"`` (quotes included) + with the current username, mimicking the ``SET SEARCH PATH TO ...`` + behavior in PostgreSQL. + ''' + + schemas = list() + + if self._schema is not None: + for schema in map(str.strip, self._schema.split(',')): + if schema == '"$user"': + schemas.append(self._user) + else: + schemas.append(schema) + + return schemas From df8af3c251a3dd60e634c353e3741d57ed0249f7 Mon Sep 17 00:00:00 2001 From: Jesper Date: Thu, 5 Dec 2019 14:48:53 +0100 Subject: [PATCH 2/9] Added another fix --- agnostic/timescale.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/agnostic/timescale.py b/agnostic/timescale.py index bb62c7c..dd6872c 100644 --- a/agnostic/timescale.py +++ b/agnostic/timescale.py @@ -47,15 +47,24 @@ def clear_db(self, cursor): ''' Remove all objects from the database. ''' # Drop tables. + + # First hypertables + cursor.execute(''' + SELECT schemaname, tablename FROM _timescaledb_catalog.hypertable + WHERE tableowner = %s + ''', (self._user,)) + + tables = ['"{}"."{}"'.format(r[0], r[1]) for r in cursor.fetchall()] + + if len(tables) > 0: + sql = 'DROP TABLE {} CASCADE'.format(', '.join(tables)) + cursor.execute(sql) + cursor.execute(''' SELECT schemaname, tablename FROM pg_tables WHERE tableowner = %s AND schemaname != 'pg_catalog' AND schemaname != 'information_schema' - AND schemaname != '_timescaledb_catalog' - AND schemaname != '_timescaledb_config' - AND schemaname != '_timescaledb_internal' - AND schemaname != '_timescaledb_cache' ''', (self._user,)) tables = ['"{}"."{}"'.format(r[0], r[1]) for r in cursor.fetchall()] From eb22b030c8f760e0495563dd35fe851e6f499da2 Mon Sep 17 00:00:00 2001 From: Jesper Date: Thu, 5 Dec 2019 14:51:58 +0100 Subject: [PATCH 3/9] Fixed underscore. --- agnostic/timescale.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agnostic/timescale.py b/agnostic/timescale.py index dd6872c..b65df14 100644 --- a/agnostic/timescale.py +++ b/agnostic/timescale.py @@ -50,7 +50,7 @@ def clear_db(self, cursor): # First hypertables cursor.execute(''' - SELECT schemaname, tablename FROM _timescaledb_catalog.hypertable + SELECT schema_name, table_name FROM _timescaledb_catalog.hypertable WHERE tableowner = %s ''', (self._user,)) From 999425c6ef89907ad7f92769f8fdd03a1ab9c830 Mon Sep 17 00:00:00 2001 From: Jesper Date: Thu, 5 Dec 2019 15:02:23 +0100 Subject: [PATCH 4/9] No user info --- agnostic/timescale.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/agnostic/timescale.py b/agnostic/timescale.py index b65df14..81e80c6 100644 --- a/agnostic/timescale.py +++ b/agnostic/timescale.py @@ -51,8 +51,7 @@ def clear_db(self, cursor): # First hypertables cursor.execute(''' SELECT schema_name, table_name FROM _timescaledb_catalog.hypertable - WHERE tableowner = %s - ''', (self._user,)) + ''') tables = ['"{}"."{}"'.format(r[0], r[1]) for r in cursor.fetchall()] From 86e0f99f3354c5782a8fc9b6b1c773e1fb8b1ecb Mon Sep 17 00:00:00 2001 From: Jesper Date: Thu, 5 Dec 2019 15:06:45 +0100 Subject: [PATCH 5/9] Drop extension --- agnostic/timescale.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/agnostic/timescale.py b/agnostic/timescale.py index 81e80c6..738d415 100644 --- a/agnostic/timescale.py +++ b/agnostic/timescale.py @@ -46,6 +46,11 @@ def backup_db(self, backup_file): def clear_db(self, cursor): ''' Remove all objects from the database. ''' + # Drop extension + cursor.execute(''' + DROP EXTENSION timescaledb CASCADE; + ''') + # Drop tables. # First hypertables From 7617db8ff28825742634ba56c7ef2e9c7a8b6b55 Mon Sep 17 00:00:00 2001 From: Jesper Date: Thu, 5 Dec 2019 15:08:35 +0100 Subject: [PATCH 6/9] No tables to drop. --- agnostic/timescale.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/agnostic/timescale.py b/agnostic/timescale.py index 738d415..22d150d 100644 --- a/agnostic/timescale.py +++ b/agnostic/timescale.py @@ -53,17 +53,6 @@ def clear_db(self, cursor): # Drop tables. - # First hypertables - cursor.execute(''' - SELECT schema_name, table_name FROM _timescaledb_catalog.hypertable - ''') - - tables = ['"{}"."{}"'.format(r[0], r[1]) for r in cursor.fetchall()] - - if len(tables) > 0: - sql = 'DROP TABLE {} CASCADE'.format(', '.join(tables)) - cursor.execute(sql) - cursor.execute(''' SELECT schemaname, tablename FROM pg_tables WHERE tableowner = %s From 4601a4d33ff1e9001dab42ba8161976538d5424c Mon Sep 17 00:00:00 2001 From: Jesper Date: Thu, 5 Dec 2019 15:11:56 +0100 Subject: [PATCH 7/9] If exists. --- agnostic/timescale.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agnostic/timescale.py b/agnostic/timescale.py index 22d150d..2d40603 100644 --- a/agnostic/timescale.py +++ b/agnostic/timescale.py @@ -48,7 +48,7 @@ def clear_db(self, cursor): # Drop extension cursor.execute(''' - DROP EXTENSION timescaledb CASCADE; + DROP EXTENSION IF EXISTS timescaledb CASCADE; ''') # Drop tables. From 60aab7e469410117a96a34a2c3471123de93ee53 Mon Sep 17 00:00:00 2001 From: jesper Date: Fri, 27 Dec 2019 15:15:39 +0100 Subject: [PATCH 8/9] Subclassed --- agnostic/timescale.py | 212 ++---------------------------------------- 1 file changed, 7 insertions(+), 205 deletions(-) diff --git a/agnostic/timescale.py b/agnostic/timescale.py index 2d40603..a21dd53 100644 --- a/agnostic/timescale.py +++ b/agnostic/timescale.py @@ -3,216 +3,18 @@ import pg8000 -from agnostic import AbstractBackend +from agnostic import PostgresBackend -class TimescaleBackend(AbstractBackend): - ''' Support for TimescaleDB. ''' - - def backup_db(self, backup_file): - ''' - Return a ``Popen`` instance that will backup the database to the - ``backup_file`` handle. - ''' - - env = {'PGPASSWORD': self._password} - env.update(os.environ) - - command = [ - 'pg_dump', - '-h', self._host, - '-U', self._user, - ] - - if self._port is not None: - command.append('-p') - command.append(str(self._port)) - - for schema in self._split_schema(): - command.append('-n') - command.append(schema) - - command.append(self._database) - - process = subprocess.Popen( - command, - env=env, - stdout=backup_file, - stderr=subprocess.PIPE - ) - - return process - +class TimescaleBackend(PostgresBackend): + def clear_db(self, cursor): - ''' Remove all objects from the database. ''' + + ''' Remove timescale extension. ''' # Drop extension cursor.execute(''' DROP EXTENSION IF EXISTS timescaledb CASCADE; ''') - - # Drop tables. - - cursor.execute(''' - SELECT schemaname, tablename FROM pg_tables - WHERE tableowner = %s - AND schemaname != 'pg_catalog' - AND schemaname != 'information_schema' - ''', (self._user,)) - - tables = ['"{}"."{}"'.format(r[0], r[1]) for r in cursor.fetchall()] - - if len(tables) > 0: - sql = 'DROP TABLE {} CASCADE'.format(', '.join(tables)) - cursor.execute(sql) - - # Drop sequences. - cursor.execute(''' - SELECT relname FROM pg_class - WHERE relkind = 'S' - ''') - - sequences = ['"{}"'.format(row[0]) for row in cursor.fetchall()] - - if len(sequences) > 0: - sql = 'DROP SEQUENCE IF EXISTS {} CASCADE'.format(','.join(sequences)) - cursor.execute(sql) - - # Drop custom types, e.g. ENUM types. - cursor.execute(''' - SELECT typname FROM pg_type - WHERE typtype = 'e' - ''') - - types = ['"{}"'.format(row[0]) for row in cursor.fetchall()] - - if len(types) > 0: - sql = 'DROP TYPE {} CASCADE'.format(','.join(types)) - cursor.execute(sql) - - # Drop schema objects. - for schema in self._split_schema(): - if schema != 'public': - sql = 'DROP SCHEMA IF EXISTS {} CASCADE'.format(schema) - cursor.execute(sql) - - def connect_db(self): - ''' Connect to PostgreSQL. ''' - - connect_args = { - 'host': self._host, - 'user': self._user, - 'password': self._password, - 'database': self._database, - } - - if self._port is not None: - connect_args['port'] = self._port - - db = pg8000.connect(**connect_args) - db.autocommit = True - if self._schema is not None: - cursor = db.cursor() - cursor.execute("SET SCHEMA '{}'".format(self._schema)) - return db - - def get_schema_command(self): - ''' Return a command that will set the current schema. ''' - - if self._schema is None: - return 'SET search_path = "$user",public;\n' - else: - return 'SET search_path = {};\n'.format(self._schema) - - def restore_db(self, backup_file): - ''' - Return a ``Popen`` instance that will restore the database from the - ``backup_file`` handle. - ''' - - env = {'PGPASSWORD': self._password} - env.update(os.environ) - - command = [ - 'psql', - '-h', self._host, - '-U', self._user, - '-v', 'ON_ERROR_STOP=1', # Fail fast if an error occurs. - ] - - if self._port is not None: - command.append('-p') - command.append(str(self._port)) - - command.append(self._database) - - process = subprocess.Popen( - command, - env=env, - stdin=backup_file, - stdout=subprocess.DEVNULL, - stderr=subprocess.PIPE - ) - - return process - - def snapshot_db(self, snapshot_file): - ''' - Return a ``Popen`` instance that writes a snapshot to ``snapshot_file``. - ''' - - env = {'PGPASSWORD': self._password} - env.update(os.environ) - - command = [ - 'pg_dump', - '-h', self._host, - '-U', self._user, - '-s', # dump schema only - '-x', # don't dump grant/revoke statements - '-O', # don't dump ownership commands - '--no-tablespaces', - ] - - if self._port is not None: - command.append('-p') - command.append(str(self._port)) - - if self._schema is not None: - for schema in self._split_schema(): - command.extend(('-n', schema)) - - command.append(self._database) - - process = subprocess.Popen( - command, - env=env, - stdout=snapshot_file, - stderr=subprocess.PIPE - ) - - return process - - def _split_schema(self): - ''' - Split schema string into separate schema names. - - PostgreSQL allows specifying the schema name as a search path that - look for objects in more than one schema. This method breaks that - search path into individual schema names. - - It also replaces the special schema name ``"$user"`` (quotes included) - with the current username, mimicking the ``SET SEARCH PATH TO ...`` - behavior in PostgreSQL. - ''' - - schemas = list() - - if self._schema is not None: - for schema in map(str.strip, self._schema.split(',')): - if schema == '"$user"': - schemas.append(self._user) - else: - schemas.append(schema) - - return schemas + + return super().clear_db(cursor) From 3021dedbaaca28459a7729ac585173d302c23749 Mon Sep 17 00:00:00 2001 From: jesper Date: Fri, 27 Dec 2019 15:18:32 +0100 Subject: [PATCH 9/9] Fixed import --- agnostic/timescale.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agnostic/timescale.py b/agnostic/timescale.py index a21dd53..3ce89e0 100644 --- a/agnostic/timescale.py +++ b/agnostic/timescale.py @@ -3,7 +3,7 @@ import pg8000 -from agnostic import PostgresBackend +from agnostic.postgres import PostgresBackend class TimescaleBackend(PostgresBackend):