diff --git a/.github/workflows/etl.yml b/.github/workflows/etl.yml index 080a52e..68c1090 100644 --- a/.github/workflows/etl.yml +++ b/.github/workflows/etl.yml @@ -23,33 +23,20 @@ jobs: -e DATABASE_URL=${{ secrets.DATABASE_URL }} \ app make import/candidates import/pacs import/candidate_filings import/pac_filings - import_2023: + import_transactions: runs-on: ubuntu-latest needs: import_filings + strategy: + matrix: + transaction_type: [CON, EXP] + year: [2023, 2024] + quarter: [1, 2, 3, 4] steps: - uses: actions/checkout@v3 with: - ref: "deploy" - - name: Import data for 2023 - run: | - touch .env - docker compose -f docker-compose.etl.yml run --rm \ - -e AWS_STORAGE_BUCKET_NAME=${{ secrets.AWS_STORAGE_BUCKET_NAME }} \ - -e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \ - -e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \ - -e DATABASE_URL=${{ secrets.DATABASE_URL }} \ - app make import/CON_2023 import/EXP_2023 - - import_2024: - runs-on: ubuntu-latest - needs: import_filings - - steps: - - uses: actions/checkout@v3 - with: - ref: "deploy" - - name: Import data for 2024 + ref: "deploy" + - name: Import transaction data run: | touch .env docker compose -f docker-compose.etl.yml run --rm \ @@ -57,4 +44,4 @@ jobs: -e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \ -e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \ -e DATABASE_URL=${{ secrets.DATABASE_URL }} \ - app make import/CON_2024 import/EXP_2024 + app make import/${{ matrix.transaction_type }}_${{ matrix.quarter }}_${{ matrix.year }} diff --git a/Makefile b/Makefile index 281676c..ff9baf5 100644 --- a/Makefile +++ b/Makefile @@ -1,15 +1,26 @@ +THIS_YEAR=$(shell date +"%Y") +NIGHTLY_YEARS=$(shell seq 2023 $(THIS_YEAR)) +QUARTERLY_YEARS=$(shell seq 2020 $(THIS_YEAR)) + +define quarterly_target + $(foreach YEAR,$(1),$(patsubst %,import/$(2)_%_$(YEAR),1 2 3 4)) +endef .PHONY : quarterly -quarterly: import/candidates import/pacs import/candidate_filings 
import/pac_filings import/CON_2020 import/EXP_2020 import/CON_2021 import/EXP_2021 import/CON_2022 import/EXP_2022 import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024 +quarterly: import/candidates import/pacs import/candidate_filings import/pac_filings \ + $(call quarterly_target,$(QUARTERLY_YEARS),CON) $(call quarterly_target,$(QUARTERLY_YEARS),EXP) python manage.py make_search_index .PHONY : nightly -nightly: import/candidates import/pacs import/candidate_filings import/pac_filings import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024 +nightly: import/candidates import/pacs import/candidate_filings import/pac_filings \ + $(call quarterly_target,$(NIGHTLY_YEARS),CON) $(call quarterly_target,$(NIGHTLY_YEARS),EXP) python manage.py make_search_index -import/% : _data/sorted/%.csv +.SECONDEXPANSION: +import/% : _data/sorted/$$(word 1, $$(subst _, , $$*))_$$(word 3, $$(subst _, , $$*)).csv python manage.py import_transactions --transaction-type $(word 1, $(subst _, , $*)) \ - --year $(word 2, $(subst _, , $*)) \ + --quarters $(word 2, $(subst _, , $*)) \ + --year $(word 3, $(subst _, , $*)) \ --file $< import/pac_filings : _data/raw/pac_committee_filings.csv @@ -30,7 +41,6 @@ _data/raw/%_committees.csv : _data/raw/%_committee_filings.csv : wget --no-check-certificate --no-use-server-timestamps -O $@ "https://openness-project-nmid.s3.amazonaws.com/$*_committee_filings.csv" - _data/sorted/%.csv : _data/raw/%.csv xsv fixlengths $< | xsv sort -s OrgID,"Report Name","Start of Period","End of Period" > $@ diff --git a/camp_fin/management/commands/import_transactions.py b/camp_fin/management/commands/import_transactions.py index 2703d82..46c5773 100644 --- a/camp_fin/management/commands/import_transactions.py +++ b/camp_fin/management/commands/import_transactions.py @@ -1,4 +1,5 @@ import csv +import math import re from itertools import groupby @@ -13,17 +14,35 @@ def filing_key(record): - start_date = parse_date(record["Start of Period"]) - end_date = 
parse_date(record["End of Period"]) - return ( record["OrgID"], record["Report Name"], - start_date.year if start_date else None, - end_date.year if end_date else None, + parse_date(record["Start of Period"]), + parse_date(record["End of Period"]), ) +def get_quarter(date_str): + date = parse_date(date_str) + return math.ceil(date.month / 3.0) + + +def get_month_range(quarters): + quarter_to_month_range = { + 1: (1, 3), + 2: (4, 6), + 3: (7, 9), + 4: (10, 12), + } + + months = [] + + for q in quarters: + months.extend(quarter_to_month_range[q]) + + return min(months), max(months) + + + class Command(BaseCommand): help = """ Import data from the New Mexico Campaign Finance System: @@ -39,12 +58,24 @@ def add_arguments(self, parser): default="CON", help="Type of transaction to import: CON, EXP (Default: CON)", ) + parser.add_argument( + "--quarters", + dest="quarters", + default="1,2,3,4", + help="Comma-separated list of quarters to import (Default: 1,2,3,4)", + ) parser.add_argument( "--year", dest="year", default="2023", help="Year to import (Default: 2023)", ) + parser.add_argument( + "--batch-size", + dest="batch_size", + type=int, default=500, + help="Number of transaction records to bulk create at once (Default: 500)", + ) parser.add_argument( "--file", dest="file", @@ -53,25 +84,70 @@ ) def handle(self, *args, **options): - if options["transaction_type"] not in ("EXP", "CON"): + transaction_type = options["transaction_type"] + + if transaction_type not in ("EXP", "CON"): raise ValueError("Transaction type must be one of: EXP, CON") year = options["year"] + self.stdout.write(f"Loading data from {transaction_type}_{year}.csv") + + quarters = {int(q) for q in options["quarters"].split(",")} + quarter_string = ", ".join(f"Q{q}" for q in quarters) + with open(options["file"]) as f: - if options["transaction_type"] == "CON": - self.import_contributions(f, year) + self.stdout.write( + f"Importing transactions from filing periods beginning in 
{quarter_string}" + ) + + if transaction_type == "CON": + self.import_contributions(f, quarters, year, options["batch_size"]) + + elif transaction_type == "EXP": + self.import_expenditures(f, quarters, year, options["batch_size"]) - elif options["transaction_type"] == "EXP": - self.import_expenditures(f, year) + self.stdout.write(self.style.SUCCESS("Transactions imported!")) + + self.stdout.write( + f"Totaling filings from periods beginning in {quarter_string}" + ) + self.total_filings(quarters, year) + self.stdout.write(self.style.SUCCESS("Filings totaled!")) - self.total_filings(year) call_command("aggregate_data") - def import_contributions(self, f, year): + def _records_by_filing(self, records, filing_quarters): + """ + Group records by filing, then filter for filings beginning in the specified + quarter/s. Note that, because transactions are organized by year, transactions + for one filing can appear across two files, if the reporting period begins in + one year and ends in the next. This approach will return filings beginning in + the specified quarter in *any* year, so that these split cases will be covered. + For example, consider a filing period starting in December 2023 and ending in + February 2024. Transactions would be split across the 2023 and 2024 files. To + get them all, you would run the Q4 import for both 2023 and 2024. + """ + records_in_quarters = filter( + lambda x: get_quarter(x["Start of Period"]) in filing_quarters, records + ) + return groupby(tqdm(records_in_quarters), key=filing_key) + + def _save_batch(self, batch): + """ + Contributions are represented by several different types of models. Sort + then group them by class, then save each group of records. 
+ """ + for cls, cls_records in groupby( + sorted(batch, key=lambda x: str(type(x))), key=lambda x: type(x) + ): + cls.objects.bulk_create(cls_records) + + def import_contributions(self, f, quarters, year, batch_size): reader = csv.DictReader(f) + batch = [] - for filing_group, records in groupby(tqdm(reader), key=filing_key): + for _, records in self._records_by_filing(reader, quarters): for i, record in enumerate(records): if i == 0: try: @@ -79,12 +155,12 @@ def import_contributions(self, f, year): except ValueError: break - # the contributions file are organized by the year - # of a transaction date not the date of the + # The contributions files are organized by the year + # of the transaction date, not the date of the # filing, so transactions from the same filing can # appear in multiple contribution files. # - # we need to make sure we just clear out the + # We need to make sure we just clear out the # contributions in a file that were purportedly made # in a given year. models.Loan.objects.filter( @@ -105,17 +181,26 @@ record["Contribution Type"] in {"Loans Received", "Special Event"} or "Contribution" in record["Contribution Type"] ): - self.make_contribution(record, contributor, filing).save() + contribution = self.make_contribution(record, contributor, filing) + batch.append(contribution) else: self.stderr.write( f"Could not determine contribution type from record: {record['Contribution Type']}" ) - def import_expenditures(self, f, year): + if len(batch) >= batch_size: + self._save_batch(batch) + batch = [] + + if len(batch) > 0: + self._save_batch(batch) + + def import_expenditures(self, f, quarters, year, batch_size): reader = csv.DictReader(f) + batch = [] - for filing_group, records in groupby(tqdm(reader), key=filing_key): + for _, records in self._records_by_filing(reader, quarters): for i, record in enumerate(records): if i == 0: try: @@ -129,7 +214,15 @@ 
received_date__year=year, ).delete() - self.make_contribution(record, None, filing).save() + contribution = self.make_contribution(record, None, filing) + batch.append(contribution) + + if len(batch) >= batch_size: + self._save_batch(batch) + batch = [] + + if len(batch) > 0: + self._save_batch(batch) def make_contributor(self, record): state, _ = models.State.objects.get_or_create( @@ -268,7 +361,10 @@ "filing_period__initial_date", "filing_period__end_date", ) - msg = f"{filings.count()} filings found for PAC {pac} from record {record}:\n{filing_meta}\n\nUsing most recent filing matching query..." + msg = ( + f"{filings.count()} filings found for PAC {pac} from record " + f"{record}:\n{filing_meta}\n\nUsing most recent filing matching query..." + ) self.stderr.write(msg) return filing @@ -410,12 +506,16 @@ return contribution - def total_filings(self, year): - for filing in models.Filing.objects.filter( - final=True, - filing_period__initial_date__year__lte=year, - filing_period__end_date__year__gte=year, - ).iterator(): + def total_filings(self, quarters, year): + start, end = get_month_range(quarters) + + for filing in tqdm( + models.Filing.objects.filter( + final=True, + filing_period__initial_date__month__gte=start, + filing_period__initial_date__month__lte=end, + ).iterator() + ): contributions = filing.contributions().aggregate(total=Sum("amount")) expenditures = filing.expenditures().aggregate(total=Sum("amount")) loans = filing.loans().aggregate(total=Sum("amount")) @@ -425,5 +525,3 @@ filing.total_loans = loans["total"] or 0 filing.save() - - self.stdout.write(f"Totalled {filing}") diff --git a/camp_fin/tests/docker-compose.yml b/camp_fin/tests/docker-compose.yml index e539747..1b9b771 100644 --- a/camp_fin/tests/docker-compose.yml +++ b/camp_fin/tests/docker-compose.yml @@ -1,5 +1,3 @@ -version: "2.4" - services: app: # Don't restart the service when the command exits diff --git 
a/docker-compose.etl.yml b/docker-compose.etl.yml index 7936cf4..5c2c5cc 100644 --- a/docker-compose.etl.yml +++ b/docker-compose.etl.yml @@ -1,9 +1,7 @@ -version: '2.4' - services: app: image: nmid - build: . + build: . container_name: nmid-etl environment: DJANGO_SECRET_KEY: "etl secret key" diff --git a/docker-compose.yml b/docker-compose.yml index 6acb5d7..46040d1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,9 +1,7 @@ -version: '2.4' - services: app: image: nmid - build: . + build: . container_name: nmid stdin_open: true tty: true