From f91cedee778f56c1bd093ad9fb861d13457d38f5 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Wed, 18 Sep 2024 10:18:21 +0200 Subject: [PATCH 1/3] Make `session.id == upload.id` (#713) --- helpers/parallel_upload_processing.py | 64 ---------- services/report/__init__.py | 13 +- services/report/raw_upload_processor.py | 113 ++++++++---------- services/report/tests/unit/test_process.py | 20 ++-- .../tests/unit/test_upload_processing_task.py | 32 ++--- tasks/tests/unit/test_upload_task.py | 15 ++- tasks/upload.py | 35 +----- tasks/upload_finisher.py | 53 ++++---- tasks/upload_processor.py | 42 +++---- 9 files changed, 123 insertions(+), 264 deletions(-) diff --git a/helpers/parallel_upload_processing.py b/helpers/parallel_upload_processing.py index 75a94ca4e..c8faaf797 100644 --- a/helpers/parallel_upload_processing.py +++ b/helpers/parallel_upload_processing.py @@ -1,68 +1,4 @@ -import copy - import sentry_sdk -from shared.utils.sessions import SessionType - -from database.models.reports import Upload - - -# Copied from shared/reports/resources.py Report.next_session_number() -def next_session_number(session_dict): - start_number = len(session_dict) - while start_number in session_dict or str(start_number) in session_dict: - start_number += 1 - return start_number - - -# Copied and cut down from worker/services/report/raw_upload_processor.py -# this version stripped out all the ATS label stuff -def _adjust_sessions( - original_sessions: dict, - to_merge_flags, - current_yaml, -): - session_ids_to_fully_delete = [] - flags_under_carryforward_rules = [ - f for f in to_merge_flags if current_yaml.flag_has_carryfoward(f) - ] - if flags_under_carryforward_rules: - for sess_id, curr_sess in original_sessions.items(): - if curr_sess.session_type == SessionType.carriedforward: - if curr_sess.flags: - if any( - f in flags_under_carryforward_rules for f in curr_sess.flags - ): - session_ids_to_fully_delete.append(sess_id) - if session_ids_to_fully_delete: - # delete sessions from dict - for id in session_ids_to_fully_delete: - original_sessions.pop(id, None) - return - - -def get_parallel_session_ids( - sessions, argument_list, db_session, report_service, commit_yaml -): - num_sessions = len(argument_list) - - mock_sessions = copy.deepcopy(sessions) # the sessions already in the report - get_parallel_session_ids = [] - - # iterate over all uploads, get the next session id, and adjust sessions (remove CFF logic) - for i in range(num_sessions): - next_session_id = next_session_number(mock_sessions) - - upload_pk = argument_list[i]["upload_pk"] - upload = db_session.query(Upload).filter_by(id_=upload_pk).first() - to_merge_session = report_service.build_session(upload) - flags = upload.flag_names - - mock_sessions[next_session_id] = to_merge_session - _adjust_sessions(mock_sessions, flags, commit_yaml) - - get_parallel_session_ids.append(next_session_id) - - return get_parallel_session_ids @sentry_sdk.trace diff --git a/services/report/__init__.py b/services/report/__init__.py index 951d4eeb3..7e9b7e5f4 100644 --- a/services/report/__init__.py +++ b/services/report/__init__.py @@ -202,7 +202,7 @@ def has_initialized_report(self, commit: Commit) -> bool: @sentry_sdk.trace def initialize_and_save_report( - self, commit: Commit, report_code: str = None + self, commit: Commit, report_code: str | None = None ) -> CommitReport: """ Initializes the commit report @@ -412,7 +412,7 @@ def build_sessions(self, commit: Commit) -> dict[int, Session]: Does not include CF sessions if there is also an upload session 
with the same flag name. """ - sessions = {} + sessions: dict[int, Session] = {} carryforward_sessions = {} uploaded_flags = set() @@ -873,7 +873,6 @@ def build_report_from_raw_content( report: Report, raw_report_info: RawReportInfo, upload: Upload, - parallel_idx=None, ) -> ProcessingResult: """ Processes an upload on top of an existing report `master` and returns @@ -893,6 +892,7 @@ def build_report_from_raw_content( reportid = upload.external_id session = Session( + id=upload.id, provider=service, build=build, job=job, @@ -919,7 +919,6 @@ def build_report_from_raw_content( reportid=reportid, commit_yaml=self.current_yaml.to_dict(), archive_url=archive_url, - in_parallel=parallel_idx is not None, ), ) result.error = ProcessingError( @@ -955,13 +954,11 @@ def build_report_from_raw_content( raw_report, flags, session, - upload=upload, - parallel_idx=parallel_idx, + upload, ) result.report = process_result.report log.info( - "Successfully processed report" - + (" (in parallel)" if parallel_idx is not None else ""), + "Successfully processed report", extra=dict( session=session.id, ci=f"{session.provider}:{session.build}:{session.job}", diff --git a/services/report/raw_upload_processor.py b/services/report/raw_upload_processor.py index d4eb36fdd..6a4dcd7be 100644 --- a/services/report/raw_upload_processor.py +++ b/services/report/raw_upload_processor.py @@ -22,23 +22,9 @@ log = logging.getLogger(__name__) -GLOBAL_LEVEL_LABEL = ( - SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label -) - - -# This is a lambda function to return different objects -def DEFAULT_LABEL_INDEX(): - return { - SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index: SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label - } - - -def invert_pattern(string: str) -> str: - if string.startswith("!"): - return string[1:] - else: - return "!%s" % string +DEFAULT_LABEL_INDEX = { + SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index: SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label +} @dataclass @@ -60,8 +46,7 @@ def process_raw_upload( raw_reports: ParsedRawReport, flags, session: Session, - upload: Upload = None, - parallel_idx=None, + upload: Upload | None = None, ) -> UploadProcessingResult: toc, env = None, None @@ -85,17 +70,6 @@ def process_raw_upload( else: ignored_file_lines = None - # Get a sessionid to merge into - # anything merged into the original_report - # will take on this sessionid - # But we don't actually merge yet in case the report is empty. - # This is done to avoid garbage sessions to build up in the report - # How can you be sure this will be the sessionid used when you actually merge it? 
Remember that this piece of code runs inside a lock u.u - if parallel_idx is not None: - sessionid = parallel_idx - else: - sessionid = report.next_session_number() - session.id = sessionid if env: session.env = dict([e.split("=", 1) for e in env.split("\n") if "=" in e]) @@ -103,6 +77,12 @@ def process_raw_upload( session.flags = flags skip_files = set() + # [javascript] check for both coverage.json and coverage/coverage.lcov + for report_file in raw_reports.get_uploaded_files(): + if report_file.filename == "coverage/coverage.json": + skip_files.add("coverage/coverage.lcov") + + temporary_report = Report() should_use_encoded_labels = ( upload @@ -110,15 +90,11 @@ def process_raw_upload( identifier=upload.report.commit.repository.repoid, default=False ) ) - # [javascript] check for both coverage.json and coverage/coverage.lcov - for report_file in raw_reports.get_uploaded_files(): - if report_file.filename == "coverage/coverage.json": - skip_files.add("coverage/coverage.lcov") - temporary_report = Report() if should_use_encoded_labels: # We initialize the labels_index (which defaults to {}) to force the special label # to always be index 0 - temporary_report.labels_index = DEFAULT_LABEL_INDEX() + temporary_report.labels_index = dict(DEFAULT_LABEL_INDEX) + joined = True for flag in flags or []: if read_yaml_field(commit_yaml, ("flags", flag, "joined")) is False: @@ -126,6 +102,7 @@ def process_raw_upload( "Customer is using joined=False feature", extra=dict(flag_used=flag) ) joined = False # TODO: ensure this works for parallel + # --------------- # Process reports # --------------- @@ -139,9 +116,10 @@ def process_raw_upload( path_fixer_to_use = path_fixer.get_relative_path_aware_pathfixer( current_filename ) + report_builder_to_use = ReportBuilder( commit_yaml, - sessionid, + session.id, ignored_lines, path_fixer_to_use, should_use_encoded_labels, @@ -152,7 +130,10 @@ def process_raw_upload( ) except ReportExpiredException as r: r.filename = current_filename + # FIXME: this will raise/abort processing *all* the files within an upload, + # even though maybe just one of those files is expired. raise + if report_from_file: if should_use_encoded_labels: # Copies the labels from report into temporary_report @@ -161,36 +142,23 @@ def process_raw_upload( temporary_report.merge(report_from_file, joined=True) path_fixer_to_use.log_abnormalities() - actual_path_fixes = { - after: before - for (after, before) in path_fixer.calculated_paths.items() - if after is not None - } - if len(actual_path_fixes) > 0: - log.info( - "Example path fixes for this raw upload", - extra={ - "fixes": list(itertools.islice(actual_path_fixes.items(), 10)), - "disable_default_pathfixes": path_fixer.should_disable_default_pathfixes, - }, - ) - _possibly_log_pathfixer_unusual_results(path_fixer, sessionid) + _possibly_log_pathfixer_unusual_results(path_fixer, session.id) + if not temporary_report: raise ReportEmptyError("No files found in report.") if ( should_use_encoded_labels - and temporary_report.labels_index == DEFAULT_LABEL_INDEX() + and temporary_report.labels_index == DEFAULT_LABEL_INDEX ): # This means that, even though this report _could_ use encoded labels, # none of the reports processed contributed any new labels to it. 
# So we assume there are no labels and just reset the _labels_index of temporary_report temporary_report.labels_index = None + # Now we actually add the session to the original_report # Because we know that the processing was successful - sessionid, session = report.add_session( - session, use_id_from_session=parallel_idx is not None - ) + _sessionid, session = report.add_session(session, use_id_from_session=True) # Adjust sessions removed carryforward sessions that are being replaced session_adjustment = _adjust_sessions( report, @@ -199,13 +167,14 @@ def process_raw_upload( current_yaml=commit_yaml, upload=upload, ) + report.merge(temporary_report, joined=joined) session.totals = temporary_report.totals return UploadProcessingResult(report=report, session_adjustment=session_adjustment) @sentry_sdk.trace -def make_sure_orginal_report_is_using_label_ids(original_report: Report) -> bool: +def make_sure_orginal_report_is_using_label_ids(original_report: Report): """Makes sure that the original_report (that was pulled from DB) has CoverageDatapoints that encode label_ids and not actual labels. """ @@ -215,12 +184,13 @@ def make_sure_orginal_report_is_using_label_ids(original_report: Report) -> bool } if original_report.labels_index is None: original_report.labels_index = {} + labels_index = original_report.labels_index if ( SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index - not in original_report.labels_index + not in labels_index ): - original_report.labels_index[ + labels_index[ SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index ] = SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label @@ -230,17 +200,17 @@ def possibly_translate_label(label_or_id: typing.Union[str, int]) -> int: if label_or_id in reverse_index_cache: return reverse_index_cache[label_or_id] # Search for label in the report index - for idx, label in original_report.labels_index.items(): + for idx, label in labels_index.items(): if label == label_or_id: reverse_index_cache[label] = idx return idx # Label is not present. Add to index. # Notice that this never picks index 0, that is reserved for the special label - new_index = max(original_report.labels_index.keys()) + 1 + new_index = max(labels_index.keys()) + 1 reverse_index_cache[label_or_id] = new_index # It's OK to update this here because it's inside the # UploadProcessing lock, so it's exclusive access - original_report.labels_index[new_index] = label_or_id + labels_index[new_index] = label_or_id return new_index for report_file in original_report: @@ -262,7 +232,7 @@ def make_sure_label_indexes_match( Uses the original_report as reference, and fixes the to_merge_report as needed it also extendes the original_report.labels_index with new labels as needed. 
""" - if to_merge_report.labels_index is None: + if to_merge_report.labels_index is None or original_report.labels_index is None: # The new report doesn't have labels to fix return @@ -335,6 +305,7 @@ def _adjust_sessions( commit_id = upload.report.commit_id if upload is None and to_partially_overwrite_flags: log.warning("Upload is None, but there are partial_overwrite_flags present") + if ( upload and USE_LABEL_INDEX_IN_REPORT_PROCESSING_BY_REPO_ID.check_value( @@ -345,6 +316,7 @@ def _adjust_sessions( # Make sure that the labels in the reports are in a good state to merge them make_sure_orginal_report_is_using_label_ids(original_report) make_sure_label_indexes_match(original_report, to_merge_report) + if to_fully_overwrite_flags or to_partially_overwrite_flags: for sess_id, curr_sess in original_report.sessions.items(): if curr_sess.session_type == SessionType.carriedforward: @@ -353,6 +325,7 @@ def _adjust_sessions( session_ids_to_fully_delete.append(sess_id) if any(f in to_partially_overwrite_flags for f in curr_sess.flags): session_ids_to_partially_delete.append(sess_id) + actually_fully_deleted_sessions = set() if session_ids_to_fully_delete: extra = dict( @@ -366,6 +339,7 @@ def _adjust_sessions( ) original_report.delete_multiple_sessions(session_ids_to_fully_delete) actually_fully_deleted_sessions.update(session_ids_to_fully_delete) + if session_ids_to_partially_delete: extra = dict( deleted_sessions=session_ids_to_partially_delete, @@ -387,6 +361,7 @@ def _adjust_sessions( ) actually_fully_deleted_sessions.add(s) original_report.delete_session(s) + return SessionAdjustmentResult( sorted(actually_fully_deleted_sessions), sorted(set(session_ids_to_partially_delete) - actually_fully_deleted_sessions), @@ -394,6 +369,20 @@ def _adjust_sessions( def _possibly_log_pathfixer_unusual_results(path_fixer: PathFixer, sessionid: int): + actual_path_fixes = { + after: before + for (after, before) in path_fixer.calculated_paths.items() + if after is not None + } + if len(actual_path_fixes) > 0: + log.info( + "Example path fixes for this raw upload", + extra={ + "fixes": list(itertools.islice(actual_path_fixes.items(), 10)), + "disable_default_pathfixes": path_fixer.should_disable_default_pathfixes, + }, + ) + if path_fixer.calculated_paths.get(None): ignored_files = sorted(path_fixer.calculated_paths.pop(None)) log.info( diff --git a/services/report/tests/unit/test_process.py b/services/report/tests/unit/test_process.py index 172cf8931..693358d11 100644 --- a/services/report/tests/unit/test_process.py +++ b/services/report/tests/unit/test_process.py @@ -76,7 +76,7 @@ def test_process_raw_upload(self, keys): report=master, raw_reports=parsed_report, flags=[], - session=Session(), + session=Session(id=3 if "M" in keys else 0), ) master = result.report @@ -203,7 +203,7 @@ def test_process_raw_upload_empty_report(self): raw_reports=LegacyReportParser().parse_raw_report_from_bytes( "\n".join(report_data).encode() ), - session=Session(flags=["fruits"]), + session=Session(id=1, flags=["fruits"]), flags=[], ) assert len(original_report.sessions) == 1 @@ -282,7 +282,7 @@ def test_none(self): Report(), LegacyReportParser().parse_raw_report_from_bytes(b""), [], - Session(), + Session(id=0), ) @@ -307,7 +307,7 @@ def test_fixes(self): reports.encode() ), flags=[], - session=Session(), + session=Session(id=0), ) report = result.report assert 2 not in report["file.go"], "2 never existed" @@ -347,7 +347,7 @@ def test_not_joined(self, mocker, flag, joined): b"a<<<<<< EOF" ), flags=[flag], - session=Session(), 
+ session=Session(id=1), ) merge.assert_called_with(mocker.ANY, joined=joined) call_args, call_kwargs = merge.call_args @@ -363,7 +363,7 @@ def test_flags(self, flag): result = process.process_raw_upload( commit_yaml=UserYaml({"flags": {"docker": flag}}), report=Report(), - session=Session(), + session=Session(id=0), raw_reports=LegacyReportParser().parse_raw_report_from_bytes( b'{"coverage": {"tests/test.py": [null, 0], "folder/file.py": [null, 1]}}' ), @@ -380,7 +380,7 @@ def test_sessions(self): process.process_raw_upload( commit_yaml={}, report=report, - session=Session(), + session=Session(id=0), raw_reports=LegacyReportParser().parse_raw_report_from_bytes( b'{"coverage": {"tests/test.py": [null, 0], "folder/file.py": [null, 1]}}' ), @@ -389,7 +389,7 @@ def test_sessions(self): process.process_raw_upload( commit_yaml={}, report=report, - session=Session(), + session=Session(id=1), raw_reports=LegacyReportParser().parse_raw_report_from_bytes( b'{"coverage": {"tests/test.py": [null, 0], "folder/file.py": [null, 1]}}' ), @@ -914,7 +914,7 @@ def test_process_raw_upload_multiple_raw_reports(self, mocker): third_raw_report_result, ], ) - session = Session() + session = Session(id=1) result = process.process_raw_upload( UserYaml({}), original_report, @@ -1092,7 +1092,7 @@ def test_process_raw_upload_with_carryforwarded_flags(self): ), ], ) - session = Session(flags=upload_flags) + session = Session(id=2, flags=upload_flags) result = process.process_raw_upload( UserYaml( { diff --git a/tasks/tests/unit/test_upload_processing_task.py b/tasks/tests/unit/test_upload_processing_task.py index 727133f91..a8ff283e5 100644 --- a/tasks/tests/unit/test_upload_processing_task.py +++ b/tasks/tests/unit/test_upload_processing_task.py @@ -20,11 +20,7 @@ ) from rollouts import USE_LABEL_INDEX_IN_REPORT_PROCESSING_BY_REPO_ID from services.archive import ArchiveService -from services.report import ( - ProcessingError, - RawReportInfo, - ReportService, -) +from services.report import ProcessingError, RawReportInfo, ReportService from services.report.parser.legacy import LegacyReportParser from services.report.raw_upload_processor import ( SessionAdjustmentResult, @@ -133,7 +129,7 @@ def test_upload_processor_task_call( ], }, "sessions": { - "0": { + str(upload.id_): { "N": None, "a": url, "c": None, @@ -144,16 +140,12 @@ def test_upload_processor_task_call( "p": None, "t": [3, 24, 19, 5, 0, "79.16667", 0, 0, 0, 0, 0, 0, 0], "u": None, - "d": commit.report_json["sessions"]["0"]["d"], + "d": commit.report_json["sessions"][str(upload.id_)]["d"], "st": "uploaded", "se": {}, } }, } - assert ( - commit.report_json["sessions"]["0"] - == expected_generated_report["sessions"]["0"] - ) assert commit.report_json == expected_generated_report mocked_1.assert_called_with(commit.commitid, None) # mocked_3.send_task.assert_called_with( @@ -265,7 +257,7 @@ def test_upload_processor_task_call_should_delete( ], }, "sessions": { - "0": { + str(upload.id_): { "N": None, "a": url, "c": None, @@ -276,16 +268,12 @@ def test_upload_processor_task_call_should_delete( "p": None, "t": [3, 24, 19, 5, 0, "79.16667", 0, 0, 0, 0, 0, 0, 0], "u": None, - "d": commit.report_json["sessions"]["0"]["d"], + "d": commit.report_json["sessions"][str(upload.id_)]["d"], "st": "uploaded", "se": {}, } }, } - assert ( - commit.report_json["sessions"]["0"] - == expected_generated_report["sessions"]["0"] - ) assert commit.report_json == expected_generated_report mocked_1.assert_called_with(commit.commitid, None) # mocked_3.send_task.assert_called_with( @@ 
-385,7 +373,7 @@ def test_upload_processor_call_with_upload_obj( ], }, "sessions": { - "0": { + str(upload.id_): { "N": None, "a": url, "c": None, @@ -396,18 +384,12 @@ def test_upload_processor_call_with_upload_obj( "p": None, "t": [3, 24, 19, 5, 0, "79.16667", 0, 0, 0, 0, 0, 0, 0], "u": None, - "d": commit.report_json["sessions"]["0"]["d"], + "d": commit.report_json["sessions"][str(upload.id_)]["d"], "st": "uploaded", "se": {}, } }, } - assert ( - commit.report_json["files"]["awesome/__init__.py"] - == expected_generated_report["files"]["awesome/__init__.py"] - ) - assert commit.report_json["files"] == expected_generated_report["files"] - assert commit.report_json["sessions"] == expected_generated_report["sessions"] assert commit.report_json == expected_generated_report mocked_1.assert_called_with(commit.commitid, None) diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py index 2c062ebe7..eda92ad40 100644 --- a/tasks/tests/unit/test_upload_task.py +++ b/tasks/tests/unit/test_upload_task.py @@ -13,7 +13,6 @@ from shared.torngit.exceptions import TorngitClientError, TorngitRepoNotFoundError from shared.torngit.gitlab import Gitlab from shared.utils.sessions import SessionType -from shared.yaml import UserYaml from database.enums import ReportType from database.models import Upload @@ -801,7 +800,7 @@ def test_upload_task_no_bot( mocked_1.assert_called_with( mocker.ANY, commit, - UserYaml({"codecov": {"max_report_age": "764y ago"}}), + {"codecov": {"max_report_age": "764y ago"}}, [ {"build": "part1", "url": "url1", "upload_pk": mocker.ANY}, {"build": "part2", "url": "url2", "upload_pk": mocker.ANY}, @@ -857,7 +856,7 @@ def test_upload_task_bot_no_permissions( mocked_1.assert_called_with( mocker.ANY, commit, - UserYaml({"codecov": {"max_report_age": "764y ago"}}), + {"codecov": {"max_report_age": "764y ago"}}, [ {"build": "part1", "url": "url1", "upload_pk": mocker.ANY}, {"build": "part2", "url": "url2", "upload_pk": mocker.ANY}, @@ -934,7 +933,7 @@ def test_upload_task_bot_unauthorized( mocked_schedule_task.assert_called_with( mocker.ANY, commit, - UserYaml({"codecov": {"max_report_age": "764y ago"}}), + {"codecov": {"max_report_age": "764y ago"}}, [ {"build": "part1", "url": "url1", "upload_pk": first_session.id}, {"build": "part2", "url": "url2", "upload_pk": second_session.id}, @@ -1025,7 +1024,7 @@ def fail_if_try_to_create_upload(*args, **kwargs): mocked_schedule_task.assert_called_with( mocker.ANY, commit, - UserYaml({"codecov": {"max_report_age": "764y ago"}}), + {"codecov": {"max_report_age": "764y ago"}}, [ { "build": "part1", @@ -1145,7 +1144,7 @@ def test_normalize_upload_arguments( def test_schedule_task_with_one_task(self, dbsession, mocker): mocked_chain = mocker.patch("tasks.upload.chain") commit = CommitFactory.create() - commit_yaml = UserYaml({"codecov": {"max_report_age": "100y ago"}}) + commit_yaml = {"codecov": {"max_report_age": "100y ago"}} argument_dict = {"argument_dict": 1} argument_list = [argument_dict] dbsession.add(commit) @@ -1170,7 +1169,7 @@ def test_schedule_task_with_one_task(self, dbsession, mocker): {}, repoid=commit.repoid, commitid=commit.commitid, - commit_yaml=commit_yaml.to_dict(), + commit_yaml=commit_yaml, arguments_list=argument_list, report_code=None, in_parallel=False, @@ -1180,7 +1179,7 @@ def test_schedule_task_with_one_task(self, dbsession, mocker): kwargs={ "repoid": commit.repoid, "commitid": commit.commitid, - "commit_yaml": commit_yaml.to_dict(), + "commit_yaml": commit_yaml, "report_code": None, 
"in_parallel": False, _kwargs_key(UploadFlow): mocker.ANY, diff --git a/tasks/upload.py b/tasks/upload.py index 3b7c99392..abeb9c751 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -31,7 +31,6 @@ from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task -from helpers.parallel_upload_processing import get_parallel_session_ids from helpers.reports import delete_archive_setting from helpers.save_commit_error import save_commit_error from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO @@ -506,7 +505,7 @@ def run_impl_within_lock( scheduled_tasks = self.schedule_task( db_session, commit, - commit_yaml, + commit_yaml.to_dict(), argument_list, commit_report, upload_context, @@ -536,14 +535,12 @@ def schedule_task( self, db_session: Session, commit: Commit, - commit_yaml: UserYaml, + commit_yaml: dict, argument_list: list[dict], commit_report: CommitReport, upload_context: UploadContext, checkpoints: CheckpointLogger | None, ): - commit_yaml = commit_yaml.to_dict() - # Carryforward the parent BA report for the current commit's BA report when handling uploads # that's not bundle analysis type. self.possibly_carryforward_bundle_report( @@ -629,26 +626,6 @@ def _schedule_coverage_processing_task( if not do_parallel_processing: return serial_tasks.apply_async() - report_service = ReportService(commit_yaml) - sessions = report_service.build_sessions(commit=commit) - - original_session_ids = list(sessions.keys()) - parallel_session_ids = get_parallel_session_ids( - sessions, - argument_list, - db_session, - report_service, - UserYaml(commit_yaml), - ) - - log.info( - "Allocated the following session ids for parallel upload processing: " - + " ".join(str(id) for id in parallel_session_ids), - extra=upload_context.log_extra( - original_session_ids=original_session_ids, - ), - ) - parallel_processing_tasks = [ upload_processor_task.s( repoid=commit.repoid, @@ -656,13 +633,11 @@ def _schedule_coverage_processing_task( commit_yaml=commit_yaml, arguments_list=[arguments], report_code=commit_report.code, - parallel_idx=parallel_session_id, + parallel_idx=arguments["upload_pk"], in_parallel=True, is_final=False, ) - for arguments, parallel_session_id in zip( - argument_list, parallel_session_ids - ) + for arguments in argument_list ] finish_parallel_sig = upload_finisher_task.signature( @@ -746,7 +721,7 @@ def possibly_carryforward_bundle_report( self, commit: Commit, commit_report: CommitReport, - commit_yaml: UserYaml, + commit_yaml: dict, argument_list: List[Dict], ): """ diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index 2c9372334..a639237b8 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -21,10 +21,11 @@ from app import celery_app from celery_config import notify_error_task_name from database.models import Commit, Pull +from database.models.core import Repository from helpers.checkpoint_logger import _kwargs_key from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs from helpers.checkpoint_logger.flows import UploadFlow -from helpers.metrics import KiB, MiB, metrics +from helpers.metrics import KiB, MiB from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO from services.archive import ArchiveService, MinioEndpoints from services.comparison import get_or_create_comparison @@ -131,23 +132,14 @@ def run_impl( 
PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(identifier=repository.repoid) and in_parallel ): - actual_processing_results = { - "processings_so_far": [], - "parallel_incremental_result": [], + processing_results = { + "processings_so_far": [ + task["processings_so_far"][0] for task in processing_results + ], + "parallel_incremental_result": [ + task["parallel_incremental_result"] for task in processing_results + ], } - pr = None - - # need to transform processing_results produced by chord to get it into the - # same format as the processing_results produced from chain - for task in processing_results: - pr = task["processings_so_far"][0].get("pr") or pr - actual_processing_results["processings_so_far"].append( - task["processings_so_far"][0] - ) - actual_processing_results["parallel_incremental_result"].append( - task["parallel_incremental_result"] - ) - processing_results = actual_processing_results report_service = ReportService(commit_yaml) report = self.merge_incremental_reports( @@ -168,10 +160,9 @@ def run_impl( ), ) - with metrics.timer(f"{self.metrics_prefix}.save_parallel_report_results"): - parallel_paths = report_service.save_parallel_report_to_archive( - commit, report, report_code - ) + parallel_paths = report_service.save_parallel_report_to_archive( + commit, report, report_code + ) # now that we've built the report and stored it to GCS, we have what we need to # compare the results with the current upload pipeline. We end execution of the # finisher task here so that we don't cause any additional side-effects @@ -477,10 +468,10 @@ def invalidate_caches(self, redis_connection, commit: Commit): def merge_incremental_reports( self, commit_yaml: dict, - repository, + repository: Repository, commit: Commit, report_service: ReportService, - processing_results, + processing_results: dict, ): archive_service = report_service.get_archive_service(repository) repoid = repository.repoid @@ -547,12 +538,12 @@ def download_and_build_incremental_report(partial_report): "upload_pk": partial_report["upload_pk"], } - def merge_report(cumulative_report, obj): - incremental_report = obj["report"] + def merge_report(cumulative_report: Report, obj): + incremental_report: Report = obj["report"] parallel_idx = obj["parallel_idx"] if len(incremental_report.sessions) != 1: - log.warn( + log.warning( f"Incremental report does not have 1 session, it has {len(incremental_report.sessions)}", extra=dict( repoid=repoid, @@ -562,13 +553,11 @@ def merge_report(cumulative_report, obj): ), ) - sessionid = next(iter(incremental_report.sessions)) - incremental_report.sessions[sessionid].id = sessionid - - session_id, session = cumulative_report.add_session( - incremental_report.sessions[parallel_idx], use_id_from_session=True + session = incremental_report.sessions[parallel_idx] + session.id = parallel_idx + _session_id, session = cumulative_report.add_session( + session, use_id_from_session=True ) - session.id = session_id _adjust_sessions( cumulative_report, incremental_report, session, UserYaml(commit_yaml) diff --git a/tasks/upload_processor.py b/tasks/upload_processor.py index 00560ef43..8a2d87b8d 100644 --- a/tasks/upload_processor.py +++ b/tasks/upload_processor.py @@ -1,6 +1,5 @@ import logging import random -from copy import deepcopy import sentry_sdk from asgiref.sync import async_to_sync @@ -138,14 +137,23 @@ def run_impl( timeout=max(60 * 5, self.hard_time_limit_task), blocking_timeout=5, ): - actual_arguments_list = deepcopy(arguments_list) + log.info( + "Obtained upload processing lock, 
starting", + extra=dict( + repoid=repoid, + commit=commitid, + parent_task=self.request.parent_id, + report_code=report_code, + ), + ) + return self.process_impl_within_lock( db_session=db_session, previous_results=previous_results, repoid=repoid, commitid=commitid, commit_yaml=commit_yaml, - arguments_list=actual_arguments_list, + arguments_list=arguments_list, report_code=report_code, parallel_idx=parallel_idx, in_parallel=in_parallel, @@ -180,18 +188,7 @@ def process_impl_within_lock( in_parallel=False, is_final=False, ): - if in_parallel: - log.info( - "Obtained upload processing lock, starting", - extra=dict( - repoid=repoid, - commit=commitid, - parent_task=self.request.parent_id, - report_code=report_code, - ), - ) - - processings_so_far = previous_results.get("processings_so_far", []) + processings_so_far: list[dict] = previous_results.get("processings_so_far", []) n_processed = 0 n_failed = 0 @@ -245,11 +242,8 @@ def process_impl_within_lock( in_parallel=in_parallel, ), ) - individual_info = {"arguments": arguments.copy()} + individual_info = {"arguments": arguments} try: - arguments_commitid = arguments.pop("commit", None) - if arguments_commitid: - assert arguments_commitid == commit.commitid with metrics.timer( f"{self.metrics_prefix}.process_individual_report" ): @@ -260,7 +254,6 @@ def process_impl_within_lock( report, upload_obj, raw_report_info, - parallel_idx=parallel_idx, in_parallel=in_parallel, ) # NOTE: this is only used because test mocking messes with the return value here. @@ -356,15 +349,15 @@ def process_impl_within_lock( ), ) - processing_result = { + processing_results: dict = { "processings_so_far": processings_so_far, } if in_parallel: - processing_result["parallel_incremental_result"] = ( + processing_results["parallel_incremental_result"] = ( parallel_incremental_result ) - return processing_result + return processing_results except CeleryError: raise except Exception: @@ -383,11 +376,10 @@ def process_individual_report( report: Report, upload: Upload, raw_report_info: RawReportInfo, - parallel_idx=None, in_parallel=False, ) -> ProcessingResult: processing_result = report_service.build_report_from_raw_content( - report, raw_report_info, upload=upload, parallel_idx=parallel_idx + report, raw_report_info, upload=upload ) if ( processing_result.error is not None From 67be05283fb412724d2769f114d90ebdb9dfe459 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Wed, 18 Sep 2024 14:04:10 +0200 Subject: [PATCH 2/3] Try clamping an upload `order_number` to a smaller int The problem here is that `order_number` is a 32-bit signed int (effectively 31-bit), and duplicating the `reports_upload.id` into it does not work, as that value already exceeds 31-bits in production. This tries to work around that problem by arranging that number to occupy bits 12-28. --- helpers/reports.py | 19 +++++++++++++ services/report/__init__.py | 33 +++++++++------------- tasks/tests/integration/test_upload_e2e.py | 30 +++++++++++++++++--- tasks/upload.py | 4 +-- 4 files changed, 60 insertions(+), 26 deletions(-) diff --git a/helpers/reports.py b/helpers/reports.py index 718ee6fc1..6a960a2c2 100644 --- a/helpers/reports.py +++ b/helpers/reports.py @@ -18,3 +18,22 @@ def delete_archive_setting(commit_yaml: UserYaml | dict) -> bool: return not read_yaml_field( commit_yaml, ("codecov", "archive", "uploads"), _else=True ) + + +def session_id_from_upload_id(upload_id: int) -> int: + """ + Creates a unique `session_id` from an `upload_id`. 
+ + The purpose of the `session_id` is to unique within one report. + Turning the `upload_id` into the `session_id` trivially achieves this, as + the `upload_id` is a unique database id. + + However in production, the `upload_id` exceeds 31 bits, which overflows + the datatype of the `session_id`, which is a signed 32-bit integer + (effectively 31-bits). + + We work around this problem by just masking off the high bits, essentially + wrapping around at that number, and shifting a bit to avoid collisions + with existing low session numbers. + """ + return (upload_id & (2**16 - 1)) << 12 diff --git a/services/report/__init__.py b/services/report/__init__.py index 7e9b7e5f4..a1ea65514 100644 --- a/services/report/__init__.py +++ b/services/report/__init__.py @@ -44,6 +44,7 @@ RepositoryWithoutValidBotError, ) from helpers.labels import get_labels_per_session +from helpers.reports import session_id_from_upload_id from helpers.telemetry import MetricContext from rollouts import ( CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER, @@ -150,6 +151,7 @@ def create_report_upload( Upload """ db_session = commit_report.get_db_session() + name = normalized_arguments.get("name") upload = Upload( external_id=normalized_arguments.get("reportid"), build_code=normalized_arguments.get("build"), @@ -157,11 +159,7 @@ def create_report_upload( env=None, report_id=commit_report.id_, job_code=normalized_arguments.get("job"), - name=( - normalized_arguments.get("name")[:100] - if normalized_arguments.get("name") - else None - ), + name=(name[:100] if name else None), provider=normalized_arguments.get("service"), state="started", storage_path=normalized_arguments.get("url"), @@ -431,9 +429,9 @@ def build_sessions(self, commit: Commit) -> dict[int, Session]: for upload in report_uploads: session = self.build_session(upload) if session.session_type == SessionType.carriedforward: - carryforward_sessions[upload.order_number] = session + carryforward_sessions[session.id] = session else: - sessions[upload.order_number] = session + sessions[session.id] = session uploaded_flags |= set(session.flags) for sid, session in carryforward_sessions.items(): @@ -883,24 +881,19 @@ def build_report_from_raw_content( """ commit = upload.report.commit flags = upload.flag_names - service = upload.provider - build_url = upload.build_url - build = upload.build_code - job = upload.job_code - name = upload.name archive_url = upload.storage_path reportid = upload.external_id session = Session( - id=upload.id, - provider=service, - build=build, - job=job, - name=name, + id=session_id_from_upload_id(upload.id), + provider=upload.provider, + build=upload.build_code, + job=upload.job_code, + name=upload.name, time=int(time()), flags=flags, archive=archive_url, - url=build_url, + url=upload.build_url, ) result = ProcessingResult(session=session) @@ -1063,7 +1056,7 @@ def update_upload_with_processing_result( error_params=error.params, ) db_session.add(error_obj) - db_session.flush() + db_session.flush() @sentry_sdk.trace def save_report(self, commit: Commit, report: Report, report_code=None): @@ -1190,7 +1183,6 @@ def save_full_report( ), ) db_session.add(upload) - db_session.flush() self._attach_flags_to_upload(upload, session.flags if session.flags else []) if session.totals is not None: upload_totals = UploadLevelTotals(upload_id=upload.id_) @@ -1198,6 +1190,7 @@ def save_full_report( upload_totals.update_from_totals( session.totals, precision=precision, rounding=rounding ) + db_session.flush() return res @sentry_sdk.trace diff --git 
a/tasks/tests/integration/test_upload_e2e.py b/tasks/tests/integration/test_upload_e2e.py index 359437631..dcbab9d0d 100644 --- a/tasks/tests/integration/test_upload_e2e.py +++ b/tasks/tests/integration/test_upload_e2e.py @@ -1,4 +1,5 @@ import json +import random from functools import partial from typing import Iterable from uuid import uuid4 @@ -27,14 +28,21 @@ def write_raw_upload( repoid: int, commitid: str, contents: bytes, -): + upload_id: int | None = None, +) -> dict: report_id = uuid4().hex written_path = archive_service.write_raw_upload(commitid, report_id, contents) - upload = json.dumps({"reportid": report_id, "url": written_path}) + + upload_json: dict = {"reportid": report_id, "url": written_path} + if upload_id is not None: + upload_json["upload_id"] = upload_id + upload = json.dumps(upload_json) redis_key = f"uploads/{repoid}/{commitid}" redis.lpush(redis_key, upload) + return upload_json + def lines(lines: Iterable[tuple[int, ReportLine]]) -> list[tuple[int, int]]: return list(((lineno, line.coverage) for lineno, line in lines)) @@ -184,7 +192,12 @@ def test_full_upload( commitid, ) - do_upload( + report_service = ReportService({}) + commit_report = report_service.initialize_and_save_report(commit) + + upload_id = 2**33 + int(random.random() * 2**15) + + first_upload_json = do_upload( b""" a.rs <<<<<< network @@ -192,8 +205,17 @@ def test_full_upload( SF:a.rs DA:1,1 end_of_record -""" +""", + upload_id, + ) + + first_upload = report_service.create_report_upload(first_upload_json, commit_report) + + # force the upload to have a really high ID: + dbsession.execute( + f"UPDATE reports_upload SET id={upload_id} WHERE id={first_upload.id}" ) + do_upload( b""" a.rs diff --git a/tasks/upload.py b/tasks/upload.py index abeb9c751..595e80695 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -31,7 +31,7 @@ from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task -from helpers.reports import delete_archive_setting +from helpers.reports import delete_archive_setting, session_id_from_upload_id from helpers.save_commit_error import save_commit_error from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO from services.archive import ArchiveService @@ -633,7 +633,7 @@ def _schedule_coverage_processing_task( commit_yaml=commit_yaml, arguments_list=[arguments], report_code=commit_report.code, - parallel_idx=arguments["upload_pk"], + parallel_idx=session_id_from_upload_id(arguments["upload_pk"]), in_parallel=True, is_final=False, ) From cdf4bba90dda514b6bb2d1dbae06a3a3aec5e79e Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Wed, 18 Sep 2024 16:59:23 +0200 Subject: [PATCH 3/3] fix various foreign key column definitions --- database/models/reports.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/database/models/reports.py b/database/models/reports.py index b8017d0fd..bb9d602ec 100644 --- a/database/models/reports.py +++ b/database/models/reports.py @@ -74,8 +74,8 @@ class CommitReport(CodecovBaseModel, MixinBaseClass): uploadflagmembership = Table( "reports_uploadflagmembership", CodecovBaseModel.metadata, - Column("upload_id", types.Integer, ForeignKey("reports_upload.id")), - Column("flag_id", types.Integer, ForeignKey("reports_repositoryflag.id")), + Column("upload_id", types.BigInteger, ForeignKey("reports_upload.id")), + Column("flag_id", types.BigInteger, 
ForeignKey("reports_repositoryflag.id")), ) @@ -84,7 +84,7 @@ class ReportResults(MixinBaseClass, CodecovBaseModel): state = Column(types.Text) completed_at = Column(types.DateTime(timezone=True), nullable=True) result = Column(postgresql.JSON) - report_id = Column(types.Integer, ForeignKey("reports_commitreport.id")) + report_id = Column(types.BigInteger, ForeignKey("reports_commitreport.id")) report = relationship("CommitReport", foreign_keys=[report_id]) @@ -131,7 +131,7 @@ class UploadError(CodecovBaseModel, MixinBaseClass): class ReportDetails(CodecovBaseModel, MixinBaseClass): __tablename__ = "reports_reportdetails" - report_id = Column(types.Integer, ForeignKey("reports_commitreport.id")) + report_id = Column(types.BigInteger, ForeignKey("reports_commitreport.id")) report: CommitReport = relationship( "CommitReport", foreign_keys=[report_id], back_populates="details" ) @@ -218,13 +218,13 @@ class Meta: class ReportLevelTotals(CodecovBaseModel, AbstractTotals): __tablename__ = "reports_reportleveltotals" - report_id = Column(types.Integer, ForeignKey("reports_commitreport.id")) + report_id = Column(types.BigInteger, ForeignKey("reports_commitreport.id")) report = relationship("CommitReport", foreign_keys=[report_id]) class UploadLevelTotals(CodecovBaseModel, AbstractTotals): __tablename__ = "reports_uploadleveltotals" - upload_id = Column("upload_id", types.Integer, ForeignKey("reports_upload.id")) + upload_id = Column("upload_id", types.BigInteger, ForeignKey("reports_upload.id")) upload = relationship("Upload", foreign_keys=[upload_id]) @@ -234,7 +234,9 @@ class CompareFlag(MixinBaseClass, CodecovBaseModel): commit_comparison_id = Column( types.BigInteger, ForeignKey("compare_commitcomparison.id") ) - repositoryflag_id = Column(types.Integer, ForeignKey("reports_repositoryflag.id")) + repositoryflag_id = Column( + types.BigInteger, ForeignKey("reports_repositoryflag.id") + ) head_totals = Column(postgresql.JSON) base_totals = Column(postgresql.JSON) patch_totals = Column(postgresql.JSON) @@ -304,7 +306,7 @@ class TestInstance(CodecovBaseModel, MixinBaseClass): test = relationship(Test, backref=backref("testinstances")) duration_seconds = Column(types.Float, nullable=False) outcome = Column(types.String(100), nullable=False) - upload_id = Column(types.Integer, ForeignKey("reports_upload.id")) + upload_id = Column(types.BigInteger, ForeignKey("reports_upload.id")) upload = relationship("Upload", backref=backref("testinstances")) failure_message = Column(types.Text) branch = Column(types.Text, nullable=True) @@ -312,14 +314,14 @@ class TestInstance(CodecovBaseModel, MixinBaseClass): repoid = Column(types.Integer, nullable=True) reduced_error_id = Column( - types.Integer, ForeignKey("reports_reducederror.id"), nullable=True + types.BigInteger, ForeignKey("reports_reducederror.id"), nullable=True ) reduced_error = relationship("ReducedError", backref=backref("testinstances")) class TestResultReportTotals(CodecovBaseModel, MixinBaseClass): __tablename__ = "reports_testresultreporttotals" - report_id = Column(types.Integer, ForeignKey("reports_commitreport.id")) + report_id = Column(types.BigInteger, ForeignKey("reports_commitreport.id")) report = relationship("CommitReport", foreign_keys=[report_id]) passed = Column(types.Integer) skipped = Column(types.Integer) @@ -341,7 +343,7 @@ class Flake(CodecovBaseModel, MixinBaseClassNoExternalID): test = relationship(Test, backref=backref("flakes")) reduced_error_id = Column( - types.Integer, ForeignKey("reports_reducederror.id"), nullable=True 
+ types.BigInteger, ForeignKey("reports_reducederror.id"), nullable=True ) reduced_error = relationship(ReducedError, backref=backref("flakes"))
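
For reference, a small standalone Python sketch of the clamping introduced in patch 2/3. The function body is copied verbatim from helpers/reports.py above; the example numbers are illustrative only and are not part of the patch series:

    def session_id_from_upload_id(upload_id: int) -> int:
        # Keep only the low 16 bits of the upload id, then shift them up by 12
        # bits so the result stays clear of the small session numbers that
        # existing reports already use.
        return (upload_id & (2**16 - 1)) << 12

    # An upload id that already exceeds 31 bits, as described in the commit message.
    upload_id = 2**33 + 5
    assert session_id_from_upload_id(upload_id) == 5 << 12 == 20480

    # The largest possible result still fits the signed 32-bit order_number column.
    assert session_id_from_upload_id(2**16 - 1) == 268_431_360 < 2**31

    # Trade-off: the mask wraps around every 2**16 uploads, so two uploads whose
    # ids differ by a multiple of 65_536 map to the same session id.
    assert session_id_from_upload_id(upload_id + 2**16) == session_id_from_upload_id(upload_id)

The shift by 12 keeps the derived session ids well above the low numbers already present in older reports, at the cost of the wrap-around collision window shown in the last assertion.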