Skip to content

Commit

Permalink
Add table dump to a csv feature (#157)
Browse files Browse the repository at this point in the history
* adding initial version of table dump

* fix missing import

* installing gcloud sdk

* trying to install as nginx

* add error handling

* try new docker

* trying to fix gcloud install

* change docker build

* docker gcloud fix

* fix query syntax

* adding service account activation

* fixing command

* fix export syntax

* improved dump

* typo fix

* missing import

* switch order of header and csv

* add version 0 bypass

* adding unsigned int

* fix metadata for views and int=0

* make message returned more complex
  • Loading branch information
fcollman authored Aug 25, 2024
1 parent 413242d commit 41ee72d
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 33 deletions.
8 changes: 7 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ RUN mkdir -p /home/nginx/.cloudvolume/secrets \
COPY requirements.txt /app/.
RUN python -m pip install --upgrade pip
RUN pip install -r requirements.txt
# Install gcloud SDK as root and set permissions
# Install gcloud SDK as root
COPY . /app
COPY override/timeout.conf /etc/nginx/conf.d/timeout.conf
COPY gracefully_shutdown_celery.sh /home/nginx
RUN chmod +x /home/nginx/gracefully_shutdown_celery.sh
RUN mkdir -p /home/nginx/tmp/shutdown
RUN chmod +x /entrypoint.sh
WORKDIR /app
WORKDIR /app
USER nginx
RUN curl -sSL https://sdk.cloud.google.com | bash
ENV PATH /home/nginx/google-cloud-sdk/bin:/root/google-cloud-sdk/bin:$PATH
USER root
55 changes: 26 additions & 29 deletions materializationengine/blueprints/client/api2.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from materializationengine.blueprints.client.utils import (
create_query_response,
collect_crud_columns,
get_latest_version,
)
from materializationengine.blueprints.client.schemas import (
ComplexQuerySchema,
Expand Down Expand Up @@ -573,7 +574,9 @@ def get(self, datastack_name: str):
return versions, 200


@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>")
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int(signed=True):version>"
)
class DatastackVersion(Resource):
method_decorators = [
limit_by_category("fast_query"),
Expand Down Expand Up @@ -610,7 +613,7 @@ def get(self, datastack_name: str, version: int):


@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/table/<string:table_name>/count"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/table/<string:table_name>/count"
)
class FrozenTableCount(Resource):
method_decorators = [
Expand Down Expand Up @@ -707,7 +710,9 @@ def get(self, datastack_name: str):
return schema.dump(response, many=True), 200


@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>/tables")
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/tables"
)
class FrozenTableVersions(Resource):
method_decorators = [
limit_by_category("fast_query"),
Expand Down Expand Up @@ -752,7 +757,7 @@ def get(self, datastack_name: str, version: int):


@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/tables/metadata"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/tables/metadata"
)
class FrozenTablesMetadata(Resource):
method_decorators = [
Expand Down Expand Up @@ -811,7 +816,7 @@ def get(


@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/table/<string:table_name>/metadata"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/table/<string:table_name>/metadata"
)
class FrozenTableMetadata(Resource):
method_decorators = [
Expand Down Expand Up @@ -868,7 +873,7 @@ def get(

@client_bp.expect(query_parser)
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/table/<string:table_name>/query"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/table/<string:table_name>/query"
)
class FrozenTableQuery(Resource):
method_decorators = [
Expand Down Expand Up @@ -1144,7 +1149,7 @@ def preprocess_view_dataframe(df, view_name, db_name, column_names):

@client_bp.expect(query_seg_prop_parser)
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/table/<string:table_name>/info"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/table/<string:table_name>/info"
)
class MatTableSegmentInfo(Resource):
method_decorators = [
Expand Down Expand Up @@ -1330,7 +1335,9 @@ def get(


@client_bp.expect(query_parser)
@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>/query")
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/query"
)
class FrozenQuery(Resource):
method_decorators = [
validate_datastack,
Expand Down Expand Up @@ -1623,7 +1630,9 @@ def post(self, datastack_name: str):


@client_bp.expect(query_parser)
@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>/views")
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/views"
)
class AvailableViews(Resource):
method_decorators = [
validate_datastack,
Expand Down Expand Up @@ -1668,7 +1677,7 @@ def get(

@client_bp.expect(query_parser)
@client_bp.route(
"/datastack/<string:datastack_name>/version/<version>/views/<string:view_name>/metadata"
"/datastack/<string:datastack_name>/version/<int(signed+True):version>/views/<string:view_name>/metadata"
)
class ViewMetadata(Resource):
method_decorators = [
Expand Down Expand Up @@ -1868,24 +1877,12 @@ def get(
datastack_name
)

if version == -1:
version = get_latest_version(datastack_name)
print(f"using version {version}")
mat_db_name = f"{datastack_name}__mat{version}"
if version == 0:
mat_db_name = f"{aligned_volume_name}"
elif version == -1:
mat_db_name = f"{aligned_volume_name}"
session = sqlalchemy_cache.get(mat_db_name)
# query the database for the latest valid version
response = (
session.query(AnalysisVersion)
.filter(AnalysisVersion.datastack == datastack_name)
.filter(AnalysisVersion.valid)
.order_by(AnalysisVersion.time_stamp.desc())
.first()
)
version = response.version
print(f"using version {version}")
mat_db_name = f"{datastack_name}__mat{version}"
else:
mat_db_name = f"{datastack_name}__mat{version}"

df, column_names, warnings = assemble_view_dataframe(
datastack_name, version, view_name, {}, {}
Expand Down Expand Up @@ -1920,7 +1917,7 @@ def get(

@client_bp.expect(query_parser)
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/views/<string:view_name>/query"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/views/<string:view_name>/query"
)
class ViewQuery(Resource):
method_decorators = [
Expand Down Expand Up @@ -2040,7 +2037,7 @@ def get_table_schema(table):


@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/views/<string:view_name>/schema"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/views/<string:view_name>/schema"
)
class ViewSchema(Resource):
method_decorators = [
Expand Down Expand Up @@ -2086,7 +2083,7 @@ def get(


@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/views/schemas"
"/datastack/<string:datastack_name>/version/<int(signed=True):version>/views/schemas"
)
class ViewSchemas(Resource):
method_decorators = [
Expand Down
2 changes: 2 additions & 0 deletions materializationengine/blueprints/client/datastack.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def wrapper(*args, **kwargs):
AnalysisVersion.datastack == target_datastack
)
if target_version:
if target_version == 0:
return f(*args, **kwargs)
if target_version == -1:
return f(*args, **kwargs)
version_query = version_query.filter(
Expand Down
21 changes: 21 additions & 0 deletions materializationengine/blueprints/client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from cloudfiles import compression
from io import BytesIO

from materializationengine.info_client import get_datastack_info
from materializationengine.database import sqlalchemy_cache
from dynamicannotationdb.models import AnalysisVersion


def collect_crud_columns(column_names):
crud_columns = []
Expand Down Expand Up @@ -59,6 +63,23 @@ def update_notice_text_warnings(ann_md, warnings, table_name):
return warnings


def get_latest_version(datastack_name):
    """Look up the most recent valid materialization version of a datastack.

    Queries the datastack's aligned-volume database for AnalysisVersion rows
    belonging to *datastack_name* that are marked valid, newest first.

    Args:
        datastack_name (str): name of the datastack from the info service.

    Returns:
        The version number of the newest valid AnalysisVersion, or None when
        the datastack has no valid versions.
    """
    info = get_datastack_info(datastack_name)
    session = sqlalchemy_cache.get(info["aligned_volume"]["name"])
    # Newest valid version wins; .first() yields None for an empty result.
    latest = (
        session.query(AnalysisVersion)
        .filter(AnalysisVersion.datastack == datastack_name)
        .filter(AnalysisVersion.valid)
        .order_by(AnalysisVersion.time_stamp.desc())
        .first()
    )
    return latest.version if latest is not None else None


def create_query_response(
df,
warnings,
Expand Down
141 changes: 140 additions & 1 deletion materializationengine/blueprints/materialize/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from dynamicannotationdb.models import AnalysisTable, Base
from flask import abort, current_app, request
from flask_accepts import accepts
from flask_restx import Namespace, Resource, inputs, reqparse
from flask_restx import Namespace, Resource, inputs, reqparse, fields
from materializationengine.blueprints.client.utils import get_latest_version
from materializationengine.blueprints.reset_auth import reset_auth
from materializationengine.database import (
create_session,
Expand All @@ -24,6 +25,9 @@
from sqlalchemy.engine.url import make_url
from sqlalchemy.exc import NoSuchTableError
from materializationengine.utils import check_write_permission
import os
import subprocess
import cloudfiles


from materializationengine.blueprints.materialize.schemas import (
Expand Down Expand Up @@ -396,6 +400,141 @@ def post(self, datastack_name: str):
return 200


# flask-restx marshalling model for the csv-dump endpoint's responses:
# always a human-readable message; on success also the bucket paths of the
# exported csv and its companion header file.
response_model = mat_bp.model(
    "Response",
    {
        "message": fields.String(description="Response message"),
        "csv_path": fields.String(description="Path to csv file", required=False),
        "header_path": fields.String(description="Path to header file", required=False),
    },
)


@mat_bp.route(
    "/materialize/run/dump_csv_table/datastack/<string:datastack_name>/version/<int(signed=True):version>/table_name/<string:table_name>/"
)
class DumpTableToBucketAsCSV(Resource):
    @reset_auth
    @auth_requires_admin
    @mat_bp.doc("Take table or view and dump it to a bucket as csv", security="apikey")
    @mat_bp.response(200, "Success", response_model)
    @mat_bp.response(500, "Internal Server Error", response_model)
    def post(self, datastack_name: str, version: int, table_name: str):
        """Dump a materialized table or view to a GCS bucket as gzipped csv.

        Exports two objects via ``gcloud sql export csv``: a header file with
        the column names/types, and (asynchronously) the full table contents.

        Args:
            datastack_name (str): name of datastack from infoservice
            version (int): version of datastack; -1 resolves to the latest
                valid version
            table_name (str): name of table or view to dump

        Returns:
            tuple: (payload dict, HTTP status). On success the payload carries
            ``csv_path`` and ``header_path`` into the bucket.
        """
        # TODO: add validation of parameters
        sql_instance_name = current_app.config.get("SQL_INSTANCE_NAME", None)
        if not sql_instance_name:
            return {"message": "SQL_INSTANCE_NAME not set in app config"}, 500

        bucket = current_app.config.get("MATERIALIZATION_DUMP_BUCKET", None)
        if not bucket:
            return {"message": "MATERIALIZATION_DUMP_BUCKET not set in app config"}, 500

        # Resolve the -1 sentinel *before* deriving the database name or any
        # bucket paths, so "-1" never leaks into "__mat-1" / "v-1" names.
        if version == -1:
            version = get_latest_version(datastack_name)

        mat_db_name = f"{datastack_name}__mat{version}"

        cf = cloudfiles.CloudFiles(bucket)
        filename = f"{datastack_name}/v{version}/{table_name}.csv.gz"
        cloudpath = os.path.join(bucket, filename)
        header_file = f"{datastack_name}/v{version}/{table_name}_header.csv"
        header_cloudpath = os.path.join(bucket, header_file)

        # If a previous run already produced the dump, report it instead of
        # re-exporting.
        if cf.exists(filename):
            return {
                "message": "file already created",
                "csv_path": cloudpath,
                "header_path": header_cloudpath,
            }, 200

        # Activate the service account so the gcloud CLI can reach the SQL
        # instance and the bucket.
        error = self._run_gcloud(
            [
                "gcloud",
                "auth",
                "activate-service-account",
                "--key-file",
                os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"),
            ],
            "failed to activate service account",
        )
        if error:
            return error

        # NOTE(review): table_name is interpolated directly into SQL below.
        # The endpoint is admin-only, but validating table_name against the
        # database metadata would harden this against injection.
        error = self._run_gcloud(
            [
                "gcloud",
                "sql",
                "export",
                "csv",
                sql_instance_name,
                header_cloudpath,
                "--database",
                mat_db_name,
                "--query",
                f"SELECT column_name, data_type from INFORMATION_SCHEMA.COLUMNS where TABLE_NAME = '{table_name}'",
            ],
            "header file failed to create",
        )
        if error:
            return error

        # Kick off the full table export; --async makes gcloud return as soon
        # as the export operation is queued on the instance.
        error = self._run_gcloud(
            [
                "gcloud",
                "sql",
                "export",
                "csv",
                sql_instance_name,
                cloudpath,
                "--database",
                mat_db_name,
                "--async",
                "--query",
                f"SELECT * from {table_name}",
            ],
            "file failed to create",
        )
        if error:
            return error

        return {
            "message": "file created successfully",
            "csv_path": cloudpath,
            "header_path": header_cloudpath,
        }, 200

    @staticmethod
    def _run_gcloud(command, failure_prefix):
        """Run one gcloud CLI command.

        Returns None on success, or a ready-to-return ``(payload, 500)``
        tuple whose message starts with *failure_prefix* and includes the
        command plus its captured stderr/stdout.
        """
        result = subprocess.run(command, capture_output=True)
        if result.returncode != 0:
            return {
                "message": f"{failure_prefix} using: {command}. "
                f"Error: {result.stderr.decode()} stdout: {result.stdout.decode()}"
            }, 500
        return None


@mat_bp.route("/materialize/run/update_database/datastack/<string:datastack_name>")
class UpdateLiveDatabaseResource(Resource):
@reset_auth
Expand Down
Loading

0 comments on commit 41ee72d

Please sign in to comment.