Import transactions by quarter, in chunks of a few hundred #212

Merged 10 commits on Sep 23, 2024
31 changes: 9 additions & 22 deletions .github/workflows/etl.yml
@@ -23,38 +23,25 @@ jobs:
-e DATABASE_URL=${{ secrets.DATABASE_URL }} \
app make import/candidates import/pacs import/candidate_filings import/pac_filings

import_2023:
import_transactions:
runs-on: ubuntu-latest
needs: import_filings
strategy:
matrix:
transaction_type: [CON, EXP]
year: [2023, 2024]
quarter: [1, 2, 3, 4]

steps:
- uses: actions/checkout@v3
with:
ref: "deploy"
- name: Import data for 2023
run: |
touch .env
docker compose -f docker-compose.etl.yml run --rm \
-e AWS_STORAGE_BUCKET_NAME=${{ secrets.AWS_STORAGE_BUCKET_NAME }} \
-e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \
-e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \
-e DATABASE_URL=${{ secrets.DATABASE_URL }} \
app make import/CON_2023 import/EXP_2023

import_2024:
runs-on: ubuntu-latest
needs: import_filings

steps:
- uses: actions/checkout@v3
with:
ref: "deploy"
- name: Import data for 2024
ref: "hcg/batch-it-up"
Member Author:
TODO: Change back to deploy before merge.

- name: Import transaction data
run: |
touch .env
docker compose -f docker-compose.etl.yml run --rm \
-e AWS_STORAGE_BUCKET_NAME=${{ secrets.AWS_STORAGE_BUCKET_NAME }} \
-e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \
-e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \
-e DATABASE_URL=${{ secrets.DATABASE_URL }} \
app make import/CON_2024 import/EXP_2024
app make import/${{ matrix.transaction_type }}_${{ matrix.quarter }}_${{ matrix.year }}
20 changes: 15 additions & 5 deletions Makefile
@@ -1,15 +1,26 @@
THIS_YEAR=$(shell date +"%Y")
NIGHTLY_YEARS=$(shell seq 2023 $(THIS_YEAR))
QUARTERLY_YEARS=$(shell seq 2020 $(THIS_YEAR))

define quarterly_target
$(foreach YEAR,$(1),$(patsubst %,import/$(2)_%_$(YEAR),1 2 3 4))
endef
Member Author, on lines +5 to +7:
Create a target for each year ($1), transaction type ($2), and quarter.
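For reference, a rough Python rendering of what the define expands to (an illustration of the Make logic only, not code in this PR; the function name is made up):

def quarterly_target(years, transaction_type):
    # One import target per year, transaction type, and quarter, mirroring
    # $(foreach YEAR,...) over $(patsubst %,import/$(2)_%_$(YEAR),1 2 3 4).
    return [
        f"import/{transaction_type}_{quarter}_{year}"
        for year in years
        for quarter in (1, 2, 3, 4)
    ]

# quarterly_target([2023, 2024], "CON")
# -> ["import/CON_1_2023", "import/CON_2_2023", ..., "import/CON_4_2024"]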


.PHONY : quarterly
quarterly: import/candidates import/pacs import/candidate_filings import/pac_filings import/CON_2020 import/EXP_2020 import/CON_2021 import/EXP_2021 import/CON_2022 import/EXP_2022 import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024
quarterly: import/candidates import/pacs import/candidate_filings import/pac_filings \
$(call quarterly_target,$(QUARTERLY_YEARS),CON) $(call quarterly_target,$(QUARTERLY_YEARS),EXP)
python manage.py make_search_index

.PHONY : nightly
nightly: import/candidates import/pacs import/candidate_filings import/pac_filings import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024
nightly: import/candidates import/pacs import/candidate_filings import/pac_filings \
$(call quarterly_target,$(NIGHTLY_YEARS),CON) $(call quarterly_target,$(NIGHTLY_YEARS),EXP)
python manage.py make_search_index

import/% : _data/sorted/%.csv
.SECONDEXPANSION:
import/% : _data/sorted/$$(word 1, $$(subst _, , $$*))_$$(word 3, $$(subst _, , $$*)).csv
Member Author, on lines +19 to +20:
Parse the transaction type and year out of a pattern like CON_1_2023. One transaction file covers the entire year, so we don't need to download it again for each quarterly import.
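In Python terms, the $(word n, $(subst _, , $*)) calls roughly do the following (a sketch only; parse_stem is a made-up name, not part of the PR):

def parse_stem(stem):
    # e.g. "CON_1_2023" -> transaction type, quarter, year
    transaction_type, quarter, year = stem.split("_")
    # The prerequisite only needs the type and year, because one sorted CSV
    # covers the whole year.
    source_csv = f"_data/sorted/{transaction_type}_{year}.csv"
    return transaction_type, quarter, year, source_csv

# parse_stem("EXP_3_2024")
# -> ("EXP", "3", "2024", "_data/sorted/EXP_2024.csv")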

python manage.py import_transactions --transaction-type $(word 1, $(subst _, , $*)) \
--year $(word 2, $(subst _, , $*)) \
--quarters $(word 2, $(subst _, , $*)) \
--year $(word 3, $(subst _, , $*)) \
--file $<

import/pac_filings : _data/raw/pac_committee_filings.csv
@@ -30,7 +41,6 @@ _data/raw/%_committees.csv :
_data/raw/%_committee_filings.csv :
wget --no-check-certificate --no-use-server-timestamps -O $@ "https://openness-project-nmid.s3.amazonaws.com/$*_committee_filings.csv"


_data/sorted/%.csv : _data/raw/%.csv
xsv fixlengths $< | xsv sort -s OrgID,"Report Name","Start of Period","End of Period" > $@

149 changes: 120 additions & 29 deletions camp_fin/management/commands/import_transactions.py
@@ -1,4 +1,5 @@
import csv
import math
import re
from itertools import groupby

@@ -13,17 +14,34 @@


def filing_key(record):
start_date = parse_date(record["Start of Period"])
end_date = parse_date(record["End of Period"])

return (
record["OrgID"],
record["Report Name"],
start_date.year if start_date else None,
end_date.year if end_date else None,
parse_date(record["Start of Period"]),
parse_date(record["End of Period"]),
)


def get_quarter(date):
return math.ceil(date.month / 3.0)


def get_month_range(quarters):
quarter_to_month_range = {
1: (1, 3),
2: (4, 6),
3: (7, 9),
4: (10, 12),
}

months = []

for q in quarters:
months.extend(quarter_to_month_range[q])

return min(months), max(months)
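
A couple of worked examples of get_month_range, assuming the quarter-to-month mapping above. Because the function takes the min and max over every requested quarter, non-contiguous quarters collapse into a single continuous span:

assert get_month_range({2}) == (4, 6)      # Q2: April through June
assert get_month_range({1, 2}) == (1, 6)   # Q1-Q2: January through June
assert get_month_range({1, 4}) == (1, 12)  # Q1 plus Q4 spans the whole year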


class Command(BaseCommand):
help = """
Import data from the New Mexico Campaign Finance System:
@@ -39,12 +57,24 @@ def add_arguments(self, parser):
default="CON",
help="Type of transaction to import: CON, EXP (Default: CON)",
)
parser.add_argument(
"--quarters",
dest="quarters",
default="1,2,3,4",
help="Comma-separated list of quarters to import (Default: 1,2,3,4)",
)
parser.add_argument(
"--year",
dest="year",
default="2023",
help="Year to import (Default: 2023)",
)
parser.add_argument(
"--batch-size",
dest="batch_size",
default=500,
help="Number of transaction records to bulk create at once (Default: 500)",
)
parser.add_argument(
"--file",
dest="file",
@@ -53,38 +83,83 @@ def add_arguments(self, parser):
)

def handle(self, *args, **options):
if options["transaction_type"] not in ("EXP", "CON"):
transaction_type = options["transaction_type"]

if transaction_type not in ("EXP", "CON"):
raise ValueError("Transaction type must be one of: EXP, CON")

year = options["year"]

self.stdout.write(f"Loading data from {transaction_type}_{year}.csv")

quarters = {int(q) for q in options["quarters"].split(",")}
quarter_string = ", ".join(f"Q{q}" for q in quarters)

with open(options["file"]) as f:
if options["transaction_type"] == "CON":
self.import_contributions(f, year)
self.stdout.write(
f"Importing transactions from filing periods beginning in {quarter_string}"
)

if transaction_type == "CON":
self.import_contributions(f, quarters, year, options["batch_size"])

elif transaction_type == "EXP":
self.import_expenditures(f, quarters, year, options["batch_size"])

elif options["transaction_type"] == "EXP":
self.import_expenditures(f, year)
self.stdout.write(self.style.SUCCESS("Transactions imported!"))

self.stdout.write(
f"Totaling filings from periods beginning in {quarter_string}"
)
self.total_filings(quarters, year)
self.stdout.write(self.style.SUCCESS("Filings totaled!"))

self.total_filings(year)
call_command("aggregate_data")

def import_contributions(self, f, year):
def _records_by_filing(self, records, filing_quarters):
"""
Group records by filing, then filter for filings beginning in the specified
quarter/s. Note that, because transactions are organized by year, transactions
for one filing can appear across two files, if the reporting period begins in
one year and ends in the next. This approach will return filings beginning in
the specified quarter in *any* year, so that these split cases will be covered.
For example, consider a filing period starting in December 2023 and ending in
February 2024. Transactions would be split across the 2023 and 2024 files. To
get them all, you would run the Q4 import for both 2023 and 2024.
"""
return filter(
lambda x: get_quarter(x[0][2]) in filing_quarters,
groupby(tqdm(records), key=filing_key),
Member:
this tqdm is a bit confusing, as it is over all the records, not just the filtered records.

would something like

return groupby(tqdm(filter(lambda x: ..., records)), key=filing_key)

work?

Member Author:
Yup! Done.

)
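
One way the reordering could look (a sketch of the suggestion above, not necessarily what was merged; it reuses parse_date, get_quarter, filing_key, and tqdm from this module):

def records_by_filing(records, filing_quarters):
    # Filter the raw records on the quarter their filing period starts in,
    # so the tqdm progress bar only counts records that will be imported.
    def in_quarter(record):
        start = parse_date(record["Start of Period"])
        return start is not None and get_quarter(start) in filing_quarters

    return groupby(tqdm(filter(in_quarter, records)), key=filing_key)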

def _save_batch(self, batch):
"""
Contributions are represented by several different types of models. Sort the
batch and group it by class, then save each group of records.
"""
for cls, cls_records in groupby(
sorted(batch, key=lambda x: str(type(x))), key=lambda x: type(x)
):
yield cls.objects.bulk_create(cls_records)

def import_contributions(self, f, quarters, year, batch_size):

Review comment: Such a clean function.

reader = csv.DictReader(f)
batch = []

for filing_group, records in groupby(tqdm(reader), key=filing_key):
for _, records in self._records_by_filing(reader, quarters):
for i, record in enumerate(records):
if i == 0:
try:
filing = self._get_filing(record)
except ValueError:
break

# the contributions file are organized by the year
# of a transaction date not the date of the
# The contributions files are organized by the year
# of the transaction date, not the date of the
# filing, so transactions from the same filing can
# appear in multiple contribution files.
#
# we need to make sure we just clear out the
# We need to make sure we just clear out the
# contributions in a file that were purportedly made
# in a given year.
models.Loan.objects.filter(
@@ -105,17 +180,23 @@ def import_contributions(self, f, year):
record["Contribution Type"] in {"Loans Received", "Special Event"}
or "Contribution" in record["Contribution Type"]
):
self.make_contribution(record, contributor, filing).save()
contribution = self.make_contribution(record, contributor, filing)
batch.append(contribution)

else:
self.stderr.write(
f"Could not determine contribution type from record: {record['Contribution Type']}"
)

def import_expenditures(self, f, year):
if not len(batch) % batch_size:
self._save_batch(batch)
batch = []

def import_expenditures(self, f, quarters, year, batch_size):
reader = csv.DictReader(f)
batch = []

for filing_group, records in groupby(tqdm(reader), key=filing_key):
for _, records in self._records_by_filing(reader, quarters):
for i, record in enumerate(records):
if i == 0:
try:
@@ -129,7 +210,12 @@ def import_expenditures(self, f, year):
received_date__year=year,
).delete()

self.make_contribution(record, None, filing).save()
contribution = self.make_contribution(record, None, filing)
batch.append(contribution)

if not len(batch) % batch_size:
Member:
when doing modulo, I think it's better to have the form

len(batch) % batch_size == 0

I think it's just a touch more explicit.

Member Author:
Love it. Done.

self._save_batch(batch)
batch = []
Member:
you also need to handle the case that you have iterated through all the records, but the batch isn't modulo the batch_size

Member Author:
Great catch, thanks.
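
The general pattern being asked for, as a standalone sketch (save_in_batches is a made-up helper, not the PR's code): flush whenever the batch fills, then flush whatever is left once the loop ends.

def save_in_batches(items, save, batch_size=500):
    # Accumulate items and persist them in chunks of batch_size.
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) % batch_size == 0:
            save(batch)
            batch = []
    # Save the leftover records when the total isn't a multiple of batch_size.
    if batch:
        save(batch)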


def make_contributor(self, record):
state, _ = models.State.objects.get_or_create(
@@ -268,7 +354,10 @@ def _get_filing(self, record):
"filing_period__initial_date",
"filing_period__end_date",
)
msg = f"{filings.count()} filings found for PAC {pac} from record {record}:\n{filing_meta}\n\nUsing most recent filing matching query..."
msg = (
f"{filings.count()} filings found for PAC {pac} from record "
f"{record}:\n{filing_meta}\n\nUsing most recent filing matching query..."
)
self.stderr.write(msg)

return filing
@@ -410,12 +499,16 @@ def make_contribution(self, record, contributor, filing):

return contribution

def total_filings(self, year):
for filing in models.Filing.objects.filter(
final=True,
filing_period__initial_date__year__lte=year,
filing_period__end_date__year__gte=year,
).iterator():
def total_filings(self, quarters, year):
start, end = get_month_range(quarters)

for filing in tqdm(
models.Filing.objects.filter(
final=True,
filing_period__initial_date__month__gte=start,
filing_period__initial_date__month__lte=end,
).iterator()
):
contributions = filing.contributions().aggregate(total=Sum("amount"))
expenditures = filing.expenditures().aggregate(total=Sum("amount"))
loans = filing.loans().aggregate(total=Sum("amount"))
Expand All @@ -425,5 +518,3 @@ def total_filings(self, year):
filing.total_loans = loans["total"] or 0

filing.save()

self.stdout.write(f"Totalled {filing}")
2 changes: 0 additions & 2 deletions camp_fin/tests/docker-compose.yml
@@ -1,5 +1,3 @@
version: "2.4"

services:
app:
# Don't restart the service when the command exits
4 changes: 1 addition & 3 deletions docker-compose.etl.yml
@@ -1,9 +1,7 @@
version: '2.4'

services:
app:
image: nmid
build: .
build: .
container_name: nmid-etl
environment:
DJANGO_SECRET_KEY: "etl secret key"
4 changes: 1 addition & 3 deletions docker-compose.yml
@@ -1,9 +1,7 @@
version: '2.4'

services:
app:
image: nmid
build: .
build: .
container_name: nmid
stdin_open: true
tty: true