-
Notifications
You must be signed in to change notification settings - Fork 20
/
main.py
67 lines (54 loc) · 1.86 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""Import a json file into BigQuery."""
import logging
import os
import re
from google.cloud import bigquery
GCP_PROJECT = os.environ.get('GCP_PROJECT')
def bigqueryImport(data, context):
    """Background Cloud Function: import a JSON file into BigQuery.

    Triggered by a Cloud Storage object-finalize event. The object name must
    look like ``dataset_name/table_name.json``; the file is loaded as
    newline-delimited JSON into ``<GCP_PROJECT>.<dataset>.<table>``,
    truncating any existing table contents. Files with any other name
    format are logged and skipped.

    Args:
        data: GCS event payload; the keys 'bucket', 'name' and
            'timeCreated' are read.
        context: Cloud Functions event metadata (unused).
    """
    # get storage update data
    bucketname = data['bucket']
    filename = data['name']
    timeCreated = data['timeCreated']

    # check filename format - dataset_name/table_name.json
    # raw string with the dot escaped so e.g. 'ds/tableXjson' is rejected;
    # digits are allowed since they are legal in dataset/table names
    if not re.search(r'^[a-zA-Z0-9_-]+/[a-zA-Z0-9_-]+\.json$', filename):
        logging.error('Unrecognized filename format: %s', filename)
        return

    # parse filename into dataset and table; the regex guarantees exactly
    # one '/' and a trailing '.json'
    datasetname, tablename = filename[:-len('.json')].split('/')
    table_id = '%s.%s.%s' % (GCP_PROJECT, datasetname, tablename)

    # log the receipt of the file
    uri = 'gs://%s/%s' % (bucketname, filename)
    print('Received file "%s" at %s.' % (uri, timeCreated))

    # create bigquery client
    client = bigquery.Client()

    # get dataset reference
    dataset_ref = client.dataset(datasetname)

    # check if dataset exists, otherwise create it
    # NOTE(review): a broad except is kept because google.cloud.exceptions
    # NotFound is not imported at file level; get_dataset raises on a
    # missing dataset
    try:
        client.get_dataset(dataset_ref)
    except Exception:
        logging.warning('Creating dataset: %s', datasetname)
        client.create_dataset(dataset_ref)

    # create a bigquery load job config
    # (plain string assignments — trailing commas here previously turned
    # each setting into a one-element tuple, breaking the load job config)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.create_disposition = 'CREATE_IF_NEEDED'
    job_config.source_format = 'NEWLINE_DELIMITED_JSON'
    job_config.write_disposition = 'WRITE_TRUNCATE'

    # create a bigquery load job
    try:
        load_job = client.load_table_from_uri(
            uri,
            table_id,
            job_config=job_config,
        )
        print('Load job: %s [%s]' % (load_job.job_id, table_id))
    except Exception as e:
        logging.error('Failed to create load job: %s', e)