From 2b8dd5e560bcdbe488d693bf5744c4ceacf8f841 Mon Sep 17 00:00:00 2001 From: Hannah Cushman Garland Date: Wed, 18 Sep 2024 12:31:58 -0500 Subject: [PATCH 01/10] Add month filter to transaction import --- Makefile | 7 ++- .../commands/import_transactions.py | 55 ++++++++++++------- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index 281676c..48a86ec 100644 --- a/Makefile +++ b/Makefile @@ -7,9 +7,11 @@ quarterly: import/candidates import/pacs import/candidate_filings import/pac_fil nightly: import/candidates import/pacs import/candidate_filings import/pac_filings import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024 python manage.py make_search_index -import/% : _data/sorted/%.csv +.SECONDEXPANSION: +import/% : _data/sorted/$$(word 1, $$(subst _, , $$*))_$$(word 3, $$(subst _, , $$*)).csv python manage.py import_transactions --transaction-type $(word 1, $(subst _, , $*)) \ - --year $(word 2, $(subst _, , $*)) \ + --months $(word 2, $(subst _, , $*)) \ + --year $(word 3, $(subst _, , $*)) \ --file $< import/pac_filings : _data/raw/pac_committee_filings.csv @@ -30,7 +32,6 @@ _data/raw/%_committees.csv : _data/raw/%_committee_filings.csv : wget --no-check-certificate --no-use-server-timestamps -O $@ "https://openness-project-nmid.s3.amazonaws.com/$*_committee_filings.csv" - _data/sorted/%.csv : _data/raw/%.csv xsv fixlengths $< | xsv sort -s OrgID,"Report Name","Start of Period","End of Period" > $@ diff --git a/camp_fin/management/commands/import_transactions.py b/camp_fin/management/commands/import_transactions.py index 2703d82..7705ab8 100644 --- a/camp_fin/management/commands/import_transactions.py +++ b/camp_fin/management/commands/import_transactions.py @@ -13,14 +13,11 @@ def filing_key(record): - start_date = parse_date(record["Start of Period"]) - end_date = parse_date(record["End of Period"]) - return ( record["OrgID"], record["Report Name"], - start_date.year if start_date else None, - end_date.year if end_date else None, + parse_date(record["Start of Period"]), + parse_date(record["End of Period"]), ) @@ -39,6 +36,12 @@ def add_arguments(self, parser): default="CON", help="Type of transaction to import: CON, EXP (Default: CON)", ) + parser.add_argument( + "--months", + dest="months", + default="1,2,3,4,5,6,7,8,9,10,11,12", + help="Comma-separated list of months to import (Default: 1,2,3,4,5,6,7,8,9,10,11,12)", + ) parser.add_argument( "--year", dest="year", @@ -57,21 +60,33 @@ def handle(self, *args, **options): raise ValueError("Transaction type must be one of: EXP, CON") year = options["year"] + months = [int(m) for m in options["months"].split(",")] with open(options["file"]) as f: - if options["transaction_type"] == "CON": - self.import_contributions(f, year) + for month in months: + self.stdout.write(f"Importing transactions from filing periods beginning {month}/{year}") + + if options["transaction_type"] == "CON": + self.import_contributions(f, month, year) - elif options["transaction_type"] == "EXP": - self.import_expenditures(f, year) + elif options["transaction_type"] == "EXP": + self.import_expenditures(f, month, year) + + self.stdout.write(self.style.SUCCESS("Transactions imported!")) + + self.stdout.write(f"Totaling filings from periods beginning {month}/{year}") + self.total_filings(month, year) + self.stdout.write(self.style.SUCCESS("Filings totaled!")) - self.total_filings(year) call_command("aggregate_data") - def import_contributions(self, f, year): + def import_contributions(self, f, month, year): reader = csv.DictReader(f) - for filing_group, records in groupby(tqdm(reader), key=filing_key): + for _, records in tqdm(filter( + lambda x: x[0][2].month == month, + groupby(reader, key=filing_key) + )): for i, record in enumerate(records): if i == 0: try: @@ -112,10 +127,13 @@ def import_contributions(self, f, year): f"Could not determine contribution type from record: {record['Contribution Type']}" ) - def import_expenditures(self, f, year): + def import_expenditures(self, f, month, year): reader = csv.DictReader(f) - for filing_group, records in groupby(tqdm(reader), key=filing_key): + for _, records in tqdm(filter( + lambda x: x[0][2].month == month, + groupby(reader, key=filing_key) + )): for i, record in enumerate(records): if i == 0: try: @@ -410,12 +428,13 @@ def make_contribution(self, record, contributor, filing): return contribution - def total_filings(self, year): - for filing in models.Filing.objects.filter( + def total_filings(self, month, year): + for filing in tqdm(models.Filing.objects.filter( final=True, + filing_period__initial_date__month=month, filing_period__initial_date__year__lte=year, filing_period__end_date__year__gte=year, - ).iterator(): + ).iterator()): contributions = filing.contributions().aggregate(total=Sum("amount")) expenditures = filing.expenditures().aggregate(total=Sum("amount")) loans = filing.loans().aggregate(total=Sum("amount")) @@ -425,5 +444,3 @@ def total_filings(self, year): filing.total_loans = loans["total"] or 0 filing.save() - - self.stdout.write(f"Totalled {filing}") From 17b9b272ecc8c91db213bed627000f522a7b2632 Mon Sep 17 00:00:00 2001 From: Hannah Cushman Garland Date: Wed, 18 Sep 2024 15:32:56 -0500 Subject: [PATCH 02/10] Import by quarter rather than month --- Makefile | 15 ++++-- .../commands/import_transactions.py | 46 +++++++++++-------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/Makefile b/Makefile index 48a86ec..ff9baf5 100644 --- a/Makefile +++ b/Makefile @@ -1,16 +1,25 @@ +THIS_YEAR=$(shell date +"%Y") +NIGHTLY_YEARS=$(shell seq 2023 $(THIS_YEAR)) +QUARTERLY_YEARS=$(shell seq 2020 $(THIS_YEAR)) + +define quarterly_target + $(foreach YEAR,$(1),$(patsubst %,import/$(2)_%_$(YEAR),1 2 3 4)) +endef .PHONY : quarterly -quarterly: import/candidates import/pacs import/candidate_filings import/pac_filings import/CON_2020 import/EXP_2020 import/CON_2021 import/EXP_2021 import/CON_2022 import/EXP_2022 import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024 +quarterly: import/candidates import/pacs import/candidate_filings import/pac_filings \ + $(call quarterly_target,$(QUARTERLY_YEARS),CON) $(call quarterly_target,$(QUARTERLY_YEARS),EXP) python manage.py make_search_index .PHONY : nightly -nightly: import/candidates import/pacs import/candidate_filings import/pac_filings import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024 +nightly: import/candidates import/pacs import/candidate_filings import/pac_filings \ + $(call quarterly_target,$(NIGHTLY_YEARS),CON) $(call quarterly_target,$(NIGHTLY_YEARS),EXP) python manage.py make_search_index .SECONDEXPANSION: import/% : _data/sorted/$$(word 1, $$(subst _, , $$*))_$$(word 3, $$(subst _, , $$*)).csv python manage.py import_transactions --transaction-type $(word 1, $(subst _, , $*)) \ - --months $(word 2, $(subst _, , $*)) \ + --quarters $(word 2, $(subst _, , $*)) \ --year $(word 3, $(subst _, , $*)) \ --file $< diff --git a/camp_fin/management/commands/import_transactions.py b/camp_fin/management/commands/import_transactions.py index 7705ab8..6bc1991 100644 --- a/camp_fin/management/commands/import_transactions.py +++ b/camp_fin/management/commands/import_transactions.py @@ -1,4 +1,5 @@ import csv +import math import re from itertools import groupby @@ -21,6 +22,10 @@ def filing_key(record): ) +def get_quarter(date): + return math.ceil(date.month/3.) + + class Command(BaseCommand): help = """ Import data from the New Mexico Campaign Finance System: @@ -37,10 +42,10 @@ def add_arguments(self, parser): help="Type of transaction to import: CON, EXP (Default: CON)", ) parser.add_argument( - "--months", - dest="months", - default="1,2,3,4,5,6,7,8,9,10,11,12", - help="Comma-separated list of months to import (Default: 1,2,3,4,5,6,7,8,9,10,11,12)", + "--quarters", + dest="quarters", + default="1,2,3,4", + help="Comma-separated list of months to import (Default: 1,2,3,4)", ) parser.add_argument( "--year", @@ -56,35 +61,40 @@ def add_arguments(self, parser): ) def handle(self, *args, **options): - if options["transaction_type"] not in ("EXP", "CON"): + transaction_type = options["transaction_type"] + + if transaction_type not in ("EXP", "CON"): raise ValueError("Transaction type must be one of: EXP, CON") year = options["year"] - months = [int(m) for m in options["months"].split(",")] + + self.stdout.write(f"Loading data from {transaction_type}_{year}.csv") + + quarters = [int(q) for q in options["quarters"].split(",")] with open(options["file"]) as f: - for month in months: - self.stdout.write(f"Importing transactions from filing periods beginning {month}/{year}") + for quarter in quarters: + self.stdout.write(f"Importing transactions from filing periods beginning in Q{quarter}") - if options["transaction_type"] == "CON": - self.import_contributions(f, month, year) + if transaction_type == "CON": + self.import_contributions(f, quarter, year) - elif options["transaction_type"] == "EXP": - self.import_expenditures(f, month, year) + elif transaction_type == "EXP": + self.import_expenditures(f, quarter, year) self.stdout.write(self.style.SUCCESS("Transactions imported!")) - self.stdout.write(f"Totaling filings from periods beginning {month}/{year}") - self.total_filings(month, year) + self.stdout.write(f"Totaling filings from periods beginning in Q{quarter}") + self.total_filings(quarter, year) self.stdout.write(self.style.SUCCESS("Filings totaled!")) call_command("aggregate_data") - def import_contributions(self, f, month, year): + def import_contributions(self, f, quarter, year): reader = csv.DictReader(f) for _, records in tqdm(filter( - lambda x: x[0][2].month == month, + lambda x: get_quarter(x[0][2]) == quarter, groupby(reader, key=filing_key) )): for i, record in enumerate(records): @@ -127,11 +137,11 @@ def import_contributions(self, f, month, year): f"Could not determine contribution type from record: {record['Contribution Type']}" ) - def import_expenditures(self, f, month, year): + def import_expenditures(self, f, quarter, year): reader = csv.DictReader(f) for _, records in tqdm(filter( - lambda x: x[0][2].month == month, + lambda x: get_quarter(x[0][2]) == quarter, groupby(reader, key=filing_key) )): for i, record in enumerate(records): From 9bf8ad3d5b0fe0e14d313e5bb36d6036ebdcb2db Mon Sep 17 00:00:00 2001 From: Hannah Cushman Garland Date: Thu, 19 Sep 2024 14:45:01 -0500 Subject: [PATCH 03/10] Batch transaction saves --- .../commands/import_transactions.py | 117 +++++++++++++----- 1 file changed, 87 insertions(+), 30 deletions(-) diff --git a/camp_fin/management/commands/import_transactions.py b/camp_fin/management/commands/import_transactions.py index 6bc1991..c5c5ae8 100644 --- a/camp_fin/management/commands/import_transactions.py +++ b/camp_fin/management/commands/import_transactions.py @@ -21,11 +21,26 @@ def filing_key(record): parse_date(record["End of Period"]), ) - def get_quarter(date): return math.ceil(date.month/3.) +def get_month_range(quarters): + quarter_to_month_range = { + 1: (1, 3), + 2: (4, 6), + 3: (7, 9), + 4: (10, 12), + } + + months = [] + + for q in quarters: + months.extend(quarter_to_month_range[q]) + + return min(months), max(months) + + class Command(BaseCommand): help = """ Import data from the New Mexico Campaign Finance System: @@ -45,7 +60,7 @@ def add_arguments(self, parser): "--quarters", dest="quarters", default="1,2,3,4", - help="Comma-separated list of months to import (Default: 1,2,3,4)", + help="Comma-separated list of quarters to import (Default: 1,2,3,4)", ) parser.add_argument( "--year", @@ -53,6 +68,12 @@ def add_arguments(self, parser): default="2023", help="Year to import (Default: 2023)", ) + parser.add_argument( + "--batch-size", + dest="batch_size", + default=500, + help="Number of transaction records to bulk create at once (Default: 500)" + ) parser.add_argument( "--file", dest="file", @@ -70,33 +91,58 @@ def handle(self, *args, **options): self.stdout.write(f"Loading data from {transaction_type}_{year}.csv") - quarters = [int(q) for q in options["quarters"].split(",")] + quarters = {int(q) for q in options["quarters"].split(",")} + quarter_string = ", ".join(f"Q{q}" for q in quarters) with open(options["file"]) as f: - for quarter in quarters: - self.stdout.write(f"Importing transactions from filing periods beginning in Q{quarter}") - - if transaction_type == "CON": - self.import_contributions(f, quarter, year) + self.stdout.write(f"Importing transactions from filing periods beginning in {quarter_string}") + + if transaction_type == "CON": + self.import_contributions(f, quarters, year, options["batch_size"]) - elif transaction_type == "EXP": - self.import_expenditures(f, quarter, year) + elif transaction_type == "EXP": + self.import_expenditures(f, quarters, year, options["batch_size"]) - self.stdout.write(self.style.SUCCESS("Transactions imported!")) + self.stdout.write(self.style.SUCCESS("Transactions imported!")) - self.stdout.write(f"Totaling filings from periods beginning in Q{quarter}") - self.total_filings(quarter, year) + self.stdout.write(f"Totaling filings from periods beginning in {quarter_string}") + self.total_filings(quarters, year) self.stdout.write(self.style.SUCCESS("Filings totaled!")) call_command("aggregate_data") - def import_contributions(self, f, quarter, year): + def _records_by_filing(self, records, filing_quarters): + """ + Group records by filing, then filter for filings beginning in the specified + quarter/s. Note that, because transactions are organized by year, transactions + for one filing can appear across two files, if the reporting period begins in + one year and ends in the next. This approach will return filings beginning in + the specified quarter in *any* year, so that these split cases will be covered. + For example, consider a filing period starting in December 2023 and ending in + February 2024. Transactions would be split across the 2023 and 2024 files. To + get them all, you would run the Q4 import for both 2023 and 2024. + """ + return filter( + lambda x: get_quarter(x[0][2]) in filing_quarters, + groupby(tqdm(records), key=filing_key) + ) + + def _save_batch(self, batch): + """ + Contributions are represented by several different types of models. Sort + then group them by class, then save each group of records. + """ + for cls, cls_records in groupby( + sorted(batch, key=lambda x: str(type(x))), + key=lambda x: type(x) + ): + yield cls.objects.bulk_create(cls_records) + + def import_contributions(self, f, quarters, year, batch_size): reader = csv.DictReader(f) + batch = [] - for _, records in tqdm(filter( - lambda x: get_quarter(x[0][2]) == quarter, - groupby(reader, key=filing_key) - )): + for _, records in self._records_by_filing(reader, quarters): for i, record in enumerate(records): if i == 0: try: @@ -104,12 +150,12 @@ def import_contributions(self, f, quarter, year): except ValueError: break - # the contributions file are organized by the year - # of a transaction date not the date of the + # The contributions files are organized by the year + # of the transaction date, not the date of the # filing, so transactions from the same filing can # appear in multiple contribution files. # - # we need to make sure we just clear out the + # We need to make sure we just clear out the # contributions in a file that were purportedly made # in a given year. models.Loan.objects.filter( @@ -130,20 +176,23 @@ def import_contributions(self, f, quarter, year): record["Contribution Type"] in {"Loans Received", "Special Event"} or "Contribution" in record["Contribution Type"] ): - self.make_contribution(record, contributor, filing).save() + contribution = self.make_contribution(record, contributor, filing) + batch.append(contribution) else: self.stderr.write( f"Could not determine contribution type from record: {record['Contribution Type']}" ) - def import_expenditures(self, f, quarter, year): + if not len(batch) % batch_size: + self._save_batch(batch) + batch = [] + + def import_expenditures(self, f, quarters, year, batch_size): reader = csv.DictReader(f) + batch = [] - for _, records in tqdm(filter( - lambda x: get_quarter(x[0][2]) == quarter, - groupby(reader, key=filing_key) - )): + for _, records in self._records_by_filing(reader, quarters): for i, record in enumerate(records): if i == 0: try: @@ -157,7 +206,12 @@ def import_expenditures(self, f, quarter, year): received_date__year=year, ).delete() - self.make_contribution(record, None, filing).save() + constribution = self.make_contribution(record, None, filing) + batch.append(contribution) + + if not len(batch) % batch_size: + self._save_batch(batch) + batch = [] def make_contributor(self, record): state, _ = models.State.objects.get_or_create( @@ -438,10 +492,13 @@ def make_contribution(self, record, contributor, filing): return contribution - def total_filings(self, month, year): + def total_filings(self, quarters, year): + start, end = get_month_range(quarters) + for filing in tqdm(models.Filing.objects.filter( final=True, - filing_period__initial_date__month=month, + filing_period__initial_date__month__gte=start, + filing_period__initial_date__month__lte=end, filing_period__initial_date__year__lte=year, filing_period__end_date__year__gte=year, ).iterator()): From 422d309d52dc96da78ec02c3a28983c78e735f66 Mon Sep 17 00:00:00 2001 From: Hannah Cushman Garland Date: Thu, 19 Sep 2024 15:00:08 -0500 Subject: [PATCH 04/10] Don't filter filings to total by year --- camp_fin/management/commands/import_transactions.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/camp_fin/management/commands/import_transactions.py b/camp_fin/management/commands/import_transactions.py index c5c5ae8..ecbcc90 100644 --- a/camp_fin/management/commands/import_transactions.py +++ b/camp_fin/management/commands/import_transactions.py @@ -498,9 +498,7 @@ def total_filings(self, quarters, year): for filing in tqdm(models.Filing.objects.filter( final=True, filing_period__initial_date__month__gte=start, - filing_period__initial_date__month__lte=end, - filing_period__initial_date__year__lte=year, - filing_period__end_date__year__gte=year, + filing_period__initial_date__month__lte=end ).iterator()): contributions = filing.contributions().aggregate(total=Sum("amount")) expenditures = filing.expenditures().aggregate(total=Sum("amount")) From 45d6ab63e4fc705fa2caff1cb3f7c1b4fb0fde24 Mon Sep 17 00:00:00 2001 From: Hannah Cushman Garland Date: Thu, 19 Sep 2024 15:04:35 -0500 Subject: [PATCH 05/10] Use a matrix strategy for imports --- .github/workflows/etl.yml | 29 ++++++++--------------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/.github/workflows/etl.yml b/.github/workflows/etl.yml index 080a52e..8f2ca40 100644 --- a/.github/workflows/etl.yml +++ b/.github/workflows/etl.yml @@ -23,15 +23,20 @@ jobs: -e DATABASE_URL=${{ secrets.DATABASE_URL }} \ app make import/candidates import/pacs import/candidate_filings import/pac_filings - import_2023: + import_transactions: runs-on: ubuntu-latest needs: import_filings + strategy: + matrix: + transaction_type: [CON, EXP] + year: [2023, 2024] + quarter: [1, 2, 3, 4] steps: - uses: actions/checkout@v3 with: ref: "deploy" - - name: Import data for 2023 + - name: Import transaction data run: | touch .env docker compose -f docker-compose.etl.yml run --rm \ @@ -39,22 +44,4 @@ jobs: -e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \ -e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \ -e DATABASE_URL=${{ secrets.DATABASE_URL }} \ - app make import/CON_2023 import/EXP_2023 - - import_2024: - runs-on: ubuntu-latest - needs: import_filings - - steps: - - uses: actions/checkout@v3 - with: - ref: "deploy" - - name: Import data for 2024 - run: | - touch .env - docker compose -f docker-compose.etl.yml run --rm \ - -e AWS_STORAGE_BUCKET_NAME=${{ secrets.AWS_STORAGE_BUCKET_NAME }} \ - -e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \ - -e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \ - -e DATABASE_URL=${{ secrets.DATABASE_URL }} \ - app make import/CON_2024 import/EXP_2024 + app make import/${{ matrix.transaction_type }}_${{ matrix.quarter }}_${{ matrix.year }} From 2eb539d92e123ea9419a198e6963e91b012e1105 Mon Sep 17 00:00:00 2001 From: Hannah Cushman Garland Date: Thu, 19 Sep 2024 15:12:35 -0500 Subject: [PATCH 06/10] Check out my branch --- .github/workflows/etl.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/etl.yml b/.github/workflows/etl.yml index 8f2ca40..68c1090 100644 --- a/.github/workflows/etl.yml +++ b/.github/workflows/etl.yml @@ -35,7 +35,7 @@ jobs: steps: - uses: actions/checkout@v3 with: - ref: "deploy" + ref: "hcg/batch-it-up" - name: Import transaction data run: | touch .env From 33f3c33336642a2e266cc98d5be7662ee7cbb950 Mon Sep 17 00:00:00 2001 From: Hannah Cushman Garland Date: Thu, 19 Sep 2024 16:59:56 -0500 Subject: [PATCH 07/10] Spell the dang variable name correctly --- .../commands/import_transactions.py | 49 +++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/camp_fin/management/commands/import_transactions.py b/camp_fin/management/commands/import_transactions.py index ecbcc90..b1292c8 100644 --- a/camp_fin/management/commands/import_transactions.py +++ b/camp_fin/management/commands/import_transactions.py @@ -21,8 +21,9 @@ def filing_key(record): parse_date(record["End of Period"]), ) + def get_quarter(date): - return math.ceil(date.month/3.) + return math.ceil(date.month / 3.0) def get_month_range(quarters): @@ -72,7 +73,7 @@ def add_arguments(self, parser): "--batch-size", dest="batch_size", default=500, - help="Number of transaction records to bulk create at once (Default: 500)" + help="Number of transaction records to bulk create at once (Default: 500)", ) parser.add_argument( "--file", @@ -82,7 +83,7 @@ def add_arguments(self, parser): ) def handle(self, *args, **options): - transaction_type = options["transaction_type"] + transaction_type = options["transaction_type"] if transaction_type not in ("EXP", "CON"): raise ValueError("Transaction type must be one of: EXP, CON") @@ -90,13 +91,15 @@ def handle(self, *args, **options): year = options["year"] self.stdout.write(f"Loading data from {transaction_type}_{year}.csv") - + quarters = {int(q) for q in options["quarters"].split(",")} quarter_string = ", ".join(f"Q{q}" for q in quarters) with open(options["file"]) as f: - self.stdout.write(f"Importing transactions from filing periods beginning in {quarter_string}") - + self.stdout.write( + f"Importing transactions from filing periods beginning in {quarter_string}" + ) + if transaction_type == "CON": self.import_contributions(f, quarters, year, options["batch_size"]) @@ -105,7 +108,9 @@ def handle(self, *args, **options): self.stdout.write(self.style.SUCCESS("Transactions imported!")) - self.stdout.write(f"Totaling filings from periods beginning in {quarter_string}") + self.stdout.write( + f"Totaling filings from periods beginning in {quarter_string}" + ) self.total_filings(quarters, year) self.stdout.write(self.style.SUCCESS("Filings totaled!")) @@ -123,8 +128,8 @@ def _records_by_filing(self, records, filing_quarters): get them all, you would run the Q4 import for both 2023 and 2024. """ return filter( - lambda x: get_quarter(x[0][2]) in filing_quarters, - groupby(tqdm(records), key=filing_key) + lambda x: get_quarter(x[0][2]) in filing_quarters, + groupby(tqdm(records), key=filing_key), ) def _save_batch(self, batch): @@ -133,8 +138,7 @@ def _save_batch(self, batch): then group them by class, then save each group of records. """ for cls, cls_records in groupby( - sorted(batch, key=lambda x: str(type(x))), - key=lambda x: type(x) + sorted(batch, key=lambda x: str(type(x))), key=lambda x: type(x) ): yield cls.objects.bulk_create(cls_records) @@ -185,7 +189,7 @@ def import_contributions(self, f, quarters, year, batch_size): ) if not len(batch) % batch_size: - self._save_batch(batch) + self._save_batch(batch) batch = [] def import_expenditures(self, f, quarters, year, batch_size): @@ -206,11 +210,11 @@ def import_expenditures(self, f, quarters, year, batch_size): received_date__year=year, ).delete() - constribution = self.make_contribution(record, None, filing) + contribution = self.make_contribution(record, None, filing) batch.append(contribution) if not len(batch) % batch_size: - self._save_batch(batch) + self._save_batch(batch) batch = [] def make_contributor(self, record): @@ -350,7 +354,10 @@ def _get_filing(self, record): "filing_period__initial_date", "filing_period__end_date", ) - msg = f"{filings.count()} filings found for PAC {pac} from record {record}:\n{filing_meta}\n\nUsing most recent filing matching query..." + msg = ( + f"{filings.count()} filings found for PAC {pac} from record " + f"{record}:\n{filing_meta}\n\nUsing most recent filing matching query..." + ) self.stderr.write(msg) return filing @@ -495,11 +502,13 @@ def make_contribution(self, record, contributor, filing): def total_filings(self, quarters, year): start, end = get_month_range(quarters) - for filing in tqdm(models.Filing.objects.filter( - final=True, - filing_period__initial_date__month__gte=start, - filing_period__initial_date__month__lte=end - ).iterator()): + for filing in tqdm( + models.Filing.objects.filter( + final=True, + filing_period__initial_date__month__gte=start, + filing_period__initial_date__month__lte=end, + ).iterator() + ): contributions = filing.contributions().aggregate(total=Sum("amount")) expenditures = filing.expenditures().aggregate(total=Sum("amount")) loans = filing.loans().aggregate(total=Sum("amount")) From c5645cefb230a660a299bafd5690fd33d7014eb7 Mon Sep 17 00:00:00 2001 From: Hannah Cushman Garland Date: Thu, 19 Sep 2024 17:03:02 -0500 Subject: [PATCH 08/10] Remove version from docker-compose.yml --- docker-compose.etl.yml | 4 +--- docker-compose.yml | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/docker-compose.etl.yml b/docker-compose.etl.yml index 7936cf4..5c2c5cc 100644 --- a/docker-compose.etl.yml +++ b/docker-compose.etl.yml @@ -1,9 +1,7 @@ -version: '2.4' - services: app: image: nmid - build: . + build: . container_name: nmid-etl environment: DJANGO_SECRET_KEY: "etl secret key" diff --git a/docker-compose.yml b/docker-compose.yml index 6acb5d7..46040d1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,9 +1,7 @@ -version: '2.4' - services: app: image: nmid - build: . + build: . container_name: nmid stdin_open: true tty: true From c453cb554f05f6b1bdf8859bc0576516cff8f074 Mon Sep 17 00:00:00 2001 From: Hannah Cushman Garland Date: Thu, 19 Sep 2024 17:04:39 -0500 Subject: [PATCH 09/10] Strike version from test Compose file --- camp_fin/tests/docker-compose.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/camp_fin/tests/docker-compose.yml b/camp_fin/tests/docker-compose.yml index e539747..1b9b771 100644 --- a/camp_fin/tests/docker-compose.yml +++ b/camp_fin/tests/docker-compose.yml @@ -1,5 +1,3 @@ -version: "2.4" - services: app: # Don't restart the service when the command exits From 0c189ca4d84a0d19f6ee43ef367f798ef30a40d5 Mon Sep 17 00:00:00 2001 From: Hannah Cushman Garland Date: Mon, 23 Sep 2024 13:26:34 -0500 Subject: [PATCH 10/10] Apply tqdm to filtered records, import last batch --- .../management/commands/import_transactions.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/camp_fin/management/commands/import_transactions.py b/camp_fin/management/commands/import_transactions.py index b1292c8..46c5773 100644 --- a/camp_fin/management/commands/import_transactions.py +++ b/camp_fin/management/commands/import_transactions.py @@ -22,7 +22,8 @@ def filing_key(record): ) -def get_quarter(date): +def get_quarter(date_str): + date = parse_date(date_str) return math.ceil(date.month / 3.0) @@ -127,10 +128,10 @@ def _records_by_filing(self, records, filing_quarters): February 2024. Transactions would be split across the 2023 and 2024 files. To get them all, you would run the Q4 import for both 2023 and 2024. """ - return filter( - lambda x: get_quarter(x[0][2]) in filing_quarters, - groupby(tqdm(records), key=filing_key), + records_in_quarters = filter( + lambda x: get_quarter(x["Start of Period"]) in filing_quarters, records ) + return groupby(tqdm(records_in_quarters), key=filing_key) def _save_batch(self, batch): """ @@ -188,10 +189,13 @@ def import_contributions(self, f, quarters, year, batch_size): f"Could not determine contribution type from record: {record['Contribution Type']}" ) - if not len(batch) % batch_size: + if len(batch) % batch_size == 0: self._save_batch(batch) batch = [] + if len(batch) > 0: + self._save_batch(batch) + def import_expenditures(self, f, quarters, year, batch_size): reader = csv.DictReader(f) batch = []