From 0a62491c51fc1032a3a267b96c8bf68a96002088 Mon Sep 17 00:00:00 2001 From: Benjamin Capodanno Date: Thu, 18 Jun 2026 15:07:06 -0700 Subject: [PATCH 1/2] fix(vrs): stamp HGVS expressions on reverse-translated alleles and handle null gracefully - `translate_hgvs_to_variation` now attaches an `Expression` to each allele produced from a cis-phased or single HGVS string, mirroring the dcd_mapping authoritative-allele convention so `post_mapped` is self-describing without a separate round-trip. - `hgvs_from_vrs_allele` returns `None` instead of crashing when `expressions` is null or empty (valid for cis-phased block members), and `get_hgvs_from_post_mapped` propagates that as a `None` result. - `post_mapped` is now serialized with `exclude_none=True`, matching the mapper's output format. - Minor formatting clean-ups in reverse_translation.py (no logic change). --- src/mavedb/lib/variants.py | 37 +++++++++++-------- src/mavedb/lib/vrs_utils.py | 33 +++++++++++++++-- .../variant_processing/reverse_translation.py | 3 +- tests/lib/test_variants.py | 20 ++++++++++ .../test_reverse_translation.py | 3 +- 5 files changed, 74 insertions(+), 22 deletions(-) diff --git a/src/mavedb/lib/variants.py b/src/mavedb/lib/variants.py index a941006c..c2d5595f 100644 --- a/src/mavedb/lib/variants.py +++ b/src/mavedb/lib/variants.py @@ -10,20 +10,23 @@ HGVS_P_REGEX = re.compile(r"(^|:)p\.") -def hgvs_from_vrs_allele(allele: dict) -> str: +def hgvs_from_vrs_allele(allele: dict) -> Optional[str]: """ - Extract the HGVS notation from the VRS allele. + Extract the HGVS notation from the VRS allele, or None if it carries no expression. """ try: - # VRS 2.X - return allele["expressions"][0]["value"] + expressions = allele["expressions"] # VRS 2.X except KeyError: if "variation" in allele: raise ValueError("VRS 1.X format not supported.") # VRS 1.X. We don't want to allow this. - # return allele["variation"]["expressions"][0]["value"] - else: - raise KeyError("Invalid VRS allele structure. 
Expected 'expressions'.") + raise KeyError("Invalid VRS allele structure. Expected 'expressions'.") + + # A valid VRS allele may simply carry no HGVS expression (None or empty) — e.g. a member of a + # cis-phased block. That is "no HGVS", not a crash. + if not expressions: + return None + return expressions[0]["value"] def get_hgvs_from_post_mapped(post_mapped_vrs: Optional[Any], *, combine_cis: bool = False) -> Optional[str]: @@ -38,22 +41,24 @@ def get_hgvs_from_post_mapped(post_mapped_vrs: Optional[Any], *, combine_cis: bo if not post_mapped_vrs: return None - if post_mapped_vrs["type"] == "Haplotype": # type: ignore - variations_hgvs = [hgvs_from_vrs_allele(allele) for allele in post_mapped_vrs["members"]] - elif post_mapped_vrs["type"] == "CisPhasedBlock": # type: ignore - variations_hgvs = [hgvs_from_vrs_allele(allele) for allele in post_mapped_vrs["members"]] + if post_mapped_vrs["type"] in ("Haplotype", "CisPhasedBlock"): # type: ignore + members = post_mapped_vrs["members"] elif post_mapped_vrs["type"] == "Allele": # type: ignore - variations_hgvs = [hgvs_from_vrs_allele(post_mapped_vrs)] + members = [post_mapped_vrs] else: return None - if len(variations_hgvs) == 0: + member_hgvs = [hgvs_from_vrs_allele(allele) for allele in members] + + # No members, or a member carrying no HGVS expression — no single/combinable HGVS to return. 
+ if not member_hgvs or any(h is None for h in member_hgvs): return None - if len(variations_hgvs) > 1: - return join_cis_phased_hgvs(variations_hgvs) if combine_cis else None + hgvs_values: list[str] = [h for h in member_hgvs if h is not None] + if len(hgvs_values) > 1: + return join_cis_phased_hgvs(hgvs_values) if combine_cis else None - return variations_hgvs[0] + return hgvs_values[0] def get_digest_from_post_mapped(post_mapped_vrs: Optional[Any]) -> Optional[str]: diff --git a/src/mavedb/lib/vrs_utils.py b/src/mavedb/lib/vrs_utils.py index 052c4b70..d5a6432e 100644 --- a/src/mavedb/lib/vrs_utils.py +++ b/src/mavedb/lib/vrs_utils.py @@ -17,14 +17,35 @@ from ga4gh.vrs.models import ( Allele, CisPhasedBlock, + Expression, LiteralSequenceExpression, ReferenceLengthExpression, SequenceLocation, + Syntax, ) from ga4gh.vrs.normalize import normalize from mavedb.lib.hgvs import split_cis_phased_hgvs +# HGVS type letter (``accession:g.``) → VRS Expression syntax. +_HGVS_SYNTAX_BY_TYPE = { + "g": Syntax.HGVS_G, + "c": Syntax.HGVS_C, + "p": Syntax.HGVS_P, + "n": Syntax.HGVS_N, + "m": Syntax.HGVS_M, + "r": Syntax.HGVS_R, +} + + +def _hgvs_syntax(hgvs: str) -> Syntax: + """Map an HGVS string to its VRS Expression syntax via the type letter after the accession.""" + _, _, rest = hgvs.partition(":") + try: + return _HGVS_SYNTAX_BY_TYPE[rest[:1]] + except KeyError: + raise ValueError(f"Cannot determine HGVS syntax for {hgvs!r}") + def translate_hgvs_to_vrs(hgvs: str, translator: AlleleTranslator) -> Allele: """Convert HGVS variation description to VRS object. 
@@ -81,10 +102,14 @@ def translate_hgvs_to_variation(hgvs: str, translator: AlleleTranslator) -> Alle :param translator: caller-owned AlleleTranslator reused across calls :return: an Allele for a single variant, or a CisPhasedBlock for a cis-phased set """ - members = [ - normalize_and_identify(translate_hgvs_to_vrs(component, translator), translator.data_proxy) - for component in split_cis_phased_hgvs(hgvs) - ] + members = [] + for component in split_cis_phased_hgvs(hgvs): + allele = normalize_and_identify(translate_hgvs_to_vrs(component, translator), translator.data_proxy) + # Stamp the source HGVS as the allele's expression so post_mapped is self-describing. + # Mirrors the mapper's authoritative alleles. + allele.expressions = [Expression(syntax=_hgvs_syntax(component), value=component)] + members.append(allele) + if len(members) == 1: return members[0] diff --git a/src/mavedb/worker/jobs/variant_processing/reverse_translation.py b/src/mavedb/worker/jobs/variant_processing/reverse_translation.py index 1bf918b0..07df529f 100644 --- a/src/mavedb/worker/jobs/variant_processing/reverse_translation.py +++ b/src/mavedb/worker/jobs/variant_processing/reverse_translation.py @@ -391,7 +391,8 @@ async def reverse_translate_variants_for_score_set( seen_digests.add(variation.id) draft_allele = AlleleDbModel( vrs_digest=variation.id, - post_mapped=variation.model_dump(), + # exclude_none mirrors the mapper's serialization. 
+ post_mapped=variation.model_dump(exclude_none=True), level=level, **{hgvs_field: hgvs}, # type: ignore[arg-type] ) diff --git a/tests/lib/test_variants.py b/tests/lib/test_variants.py index 9cffaa79..8b25197c 100644 --- a/tests/lib/test_variants.py +++ b/tests/lib/test_variants.py @@ -83,6 +83,26 @@ def test_get_hgvs_from_post_mapped_invalid_structure(): get_hgvs_from_post_mapped({"invalid_key": "InvalidType"}) +def test_hgvs_from_vrs_allele_null_or_empty_expressions(): + # A VRS allele may carry `expressions: null` or `[]` — that is "no HGVS", not a crash. + assert hgvs_from_vrs_allele({"type": "Allele", "expressions": None}) is None + assert hgvs_from_vrs_allele({"type": "Allele", "expressions": []}) is None + + +def test_get_hgvs_from_post_mapped_member_without_expression(): + # Regression: a cis-phased block member whose `expressions` is null must yield None, not raise + # `TypeError: 'NoneType' object is not subscriptable` (which previously killed the CAR job). + block = { + "type": "CisPhasedBlock", + "members": [ + {"type": "Allele", "expressions": [{"value": "NM_003345:p.Asp5Phe"}]}, + {"type": "Allele", "expressions": None}, + ], + } + assert get_hgvs_from_post_mapped(block) is None + assert get_hgvs_from_post_mapped(block, combine_cis=True) is None + + ### Tests for get_digest_from_post_mapped function ### diff --git a/tests/worker/jobs/variant_processing/test_reverse_translation.py b/tests/worker/jobs/variant_processing/test_reverse_translation.py index ec926f26..970fa854 100644 --- a/tests/worker/jobs/variant_processing/test_reverse_translation.py +++ b/tests/worker/jobs/variant_processing/test_reverse_translation.py @@ -50,7 +50,8 @@ def __init__(self, vrs_id: str, vrs_type: str = "Allele"): self.id = vrs_id self.vrs_type = vrs_type - def model_dump(self) -> dict: + def model_dump(self, **kwargs) -> dict: + # Accept (and ignore) model_dump kwargs like exclude_none, mirroring the real VRS model. 
return {"type": self.vrs_type, "id": self.id} From a6980f516baf476321c0a6d36f8c46a7dae2ea13 Mon Sep 17 00:00:00 2001 From: Benjamin Capodanno Date: Thu, 18 Jun 2026 15:10:28 -0700 Subject: [PATCH 2/2] feat(clingen): migrate CAR/LDH jobs from MappedVariant to Allele data model - `submit_score_set_mappings_to_car` now operates on Allele rows (authoritative + RT-derived) rather than MappedVariant, deduplicating by allele_id so each VRS allele is registered exactly once regardless of how many variants share it. Adds `force_reregister` param and per-allele outcome counters. - `submit_score_set_mappings_to_ldh` queries MappingRecord + Allele for pre/post-mapped data instead of the deprecated MappedVariant join. - `construct_ldh_submission_entity` signature updated to accept MappingRecord and Allele separately, since those fields now live on different models. - `warm_clingen_cache` switched to the shared `get_alleles_for_score_set` helper to keep allele scope consistent across all three jobs. - Extracts `get_alleles_for_score_set` and `ScoreSetAlleleRow` into `lib/clingen/alleles.py` as the single canonical query for both CAR and cache jobs. --- src/mavedb/lib/clingen/alleles.py | 61 ++ .../lib/clingen/content_constructors.py | 26 +- src/mavedb/models/score_set.py | 2 +- .../worker/jobs/external_services/clingen.py | 481 +++++++---- .../jobs/external_services/clingen_cache.py | 26 +- .../lib/clingen/test_content_constructors.py | 27 +- tests/lib/conftest.py | 17 + .../jobs/external_services/test_clingen.py | 771 ++++++++++++------ .../external_services/test_clingen_cache.py | 185 +++-- 9 files changed, 1100 insertions(+), 496 deletions(-) create mode 100644 src/mavedb/lib/clingen/alleles.py diff --git a/src/mavedb/lib/clingen/alleles.py b/src/mavedb/lib/clingen/alleles.py new file mode 100644 index 00000000..52757223 --- /dev/null +++ b/src/mavedb/lib/clingen/alleles.py @@ -0,0 +1,61 @@ +"""Query helpers for fetching score-set alleles for ClinGen registration. 
+ +Both submit_score_set_mappings_to_car and warm_clingen_cache use the same allele +scope: all current MappingRecordAllele links (authoritative and RT-derived) for a +score set. A single definition here prevents the two jobs from drifting apart. +""" + +from typing import NamedTuple + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from mavedb.models.allele import Allele +from mavedb.models.mapping_record import MappingRecord +from mavedb.models.mapping_record_allele import MappingRecordAllele +from mavedb.models.variant import Variant + + +class ScoreSetAlleleRow(NamedTuple): + """One (allele, variant) link for a score set. An allele shared by multiple variants + appears once per variant so callers can fan annotation statuses out correctly. + + ``is_authoritative`` is a property of the link, not the allele: the same VRS allele can be + the authoritative measurement for one variant and an RT-derived equivalence for another. + """ + + allele_id: int + post_mapped: dict | None + clingen_allele_id: str | None + variant_id: int + is_authoritative: bool + + +def get_alleles_for_score_set(db: Session, score_set_id: int) -> list[ScoreSetAlleleRow]: + """Return all current alleles for a score set with their linked variant IDs. + + Covers both authoritative mapper alleles and RT-derived equivalence alleles — + the full set that requires ClinGen registration before the annotation fan-out + can run. + + Only alleles with a non-null ``post_mapped`` are returned — variants that failed + or were benignly absent have no allele link and cannot receive a CAID. 
+ """ + rows = db.execute( + select( + Allele.id, + Allele.post_mapped, + Allele.clingen_allele_id, + Variant.id.label("variant_id"), + MappingRecordAllele.is_authoritative, + ) + .join(MappingRecordAllele, MappingRecordAllele.allele_id == Allele.id) + .join(MappingRecord, MappingRecord.id == MappingRecordAllele.mapping_record_id) + .join(Variant, Variant.id == MappingRecord.variant_id) + .where(Variant.score_set_id == score_set_id) + .where(MappingRecord.current) + .where(MappingRecordAllele.current) + .where(Allele.post_mapped.is_not(None)) + ).all() + + return [ScoreSetAlleleRow(r.id, r.post_mapped, r.clingen_allele_id, r.variant_id, r.is_authoritative) for r in rows] diff --git a/src/mavedb/lib/clingen/content_constructors.py b/src/mavedb/lib/clingen/content_constructors.py index 1f55437f..ce62ebbb 100644 --- a/src/mavedb/lib/clingen/content_constructors.py +++ b/src/mavedb/lib/clingen/content_constructors.py @@ -1,5 +1,4 @@ from datetime import datetime -from typing import Optional from uuid import uuid4 from urllib.parse import quote_plus @@ -7,7 +6,8 @@ from mavedb.constants import MAVEDB_BASE_GIT, MAVEDB_FRONTEND_URL from mavedb.lib.types.clingen import LdhContentLinkedData, LdhContentSubject, LdhEvent, LdhSubmission from mavedb.lib.clingen.constants import LDH_ENTITY_NAME, LDH_SUBMISSION_TYPE -from mavedb.models.mapped_variant import MappedVariant +from mavedb.models.allele import Allele +from mavedb.models.mapping_record import MappingRecord from mavedb.models.variant import Variant @@ -32,8 +32,12 @@ def construct_ldh_submission_subject(hgvs: str) -> LdhContentSubject: return {"Variant": {"hgvs": hgvs}} -def construct_ldh_submission_entity(variant: Variant, mapped_variant: Optional[MappedVariant]) -> LdhContentLinkedData: - entity: LdhContentLinkedData = { +def construct_ldh_submission_entity( + variant: Variant, mapping_record: MappingRecord, allele: Allele +) -> LdhContentLinkedData: + # Pre-mapped data and the mapping API version live on the 
per-variant MappingRecord; + # post-mapped data lives on the (cross-variant deduped) Allele. + return { # TODO#372: We try to make all possible fields that are non-nullable represented that way. "MaveDBMapping": [ { @@ -41,27 +45,25 @@ def construct_ldh_submission_entity(variant: Variant, mapped_variant: Optional[M "mavedb_id": variant.urn, # type: ignore "score": variant.data["score_data"]["score"], # type: ignore "score_set_description": variant.score_set.short_description, # type: ignore + "pre_mapped": mapping_record.pre_mapped, + "post_mapped": allele.post_mapped, + "mapping_api_version": mapping_record.mapping_api_version, }, "entId": variant.urn, # type: ignore "entIri": f"{MAVEDB_FRONTEND_URL}/score-sets/{quote_plus(variant.score_set.urn)}?variant={quote_plus(variant.urn)}", # type: ignore } ] } - if mapped_variant is not None: - entity["MaveDBMapping"][0]["entContent"]["pre_mapped"] = mapped_variant.pre_mapped - entity["MaveDBMapping"][0]["entContent"]["post_mapped"] = mapped_variant.post_mapped - entity["MaveDBMapping"][0]["entContent"]["mapping_api_version"] = mapped_variant.mapping_api_version - return entity def construct_ldh_submission( - variant_content: list[tuple[str, Variant, Optional[MappedVariant]]], + variant_content: list[tuple[str, Variant, MappingRecord, Allele]], ) -> list[LdhSubmission]: content_submission: list[LdhSubmission] = [] - for hgvs, variant, mapped_variant in variant_content: + for hgvs, variant, mapping_record, allele in variant_content: subject = construct_ldh_submission_subject(hgvs) event = construct_ldh_submission_event(subject) - entity = construct_ldh_submission_entity(variant, mapped_variant) + entity = construct_ldh_submission_entity(variant, mapping_record, allele) content_submission.append( { diff --git a/src/mavedb/models/score_set.py b/src/mavedb/models/score_set.py index 5cf638df..b629dd89 100644 --- a/src/mavedb/models/score_set.py +++ b/src/mavedb/models/score_set.py @@ -74,7 +74,7 @@ class ScoreSet(Base): 
__tablename__ = "scoresets" - id = Column(Integer, primary_key=True) + id: Mapped[int] = Column(Integer, primary_key=True) # TODO(#372) urn = Column(String(64), default=generate_temp_urn, index=True, nullable=True, unique=True) diff --git a/src/mavedb/worker/jobs/external_services/clingen.py b/src/mavedb/worker/jobs/external_services/clingen.py index 5a65cc72..27aa1b96 100644 --- a/src/mavedb/worker/jobs/external_services/clingen.py +++ b/src/mavedb/worker/jobs/external_services/clingen.py @@ -3,7 +3,6 @@ This module contains jobs for submitting mapped variants to ClinGen services: - ClinGen Allele Registry (CAR) for allele registration - ClinGen Linked Data Hub (LDH) for data submission -- Variant linking and association management These jobs enable integration with the ClinGen ecosystem for clinical variant interpretation and data sharing. @@ -12,10 +11,12 @@ import asyncio import functools import logging +from dataclasses import dataclass, field from sqlalchemy import select from mavedb.lib.annotation_status_manager import AnnotationStatusManager +from mavedb.lib.clingen.alleles import get_alleles_for_score_set from mavedb.lib.clingen.constants import ( CAR_SUBMISSION_ENDPOINT, CLIN_GEN_SUBMISSION_ENABLED, @@ -29,9 +30,11 @@ ) from mavedb.lib.types.workflow import JobExecutionOutcome from mavedb.lib.variants import get_hgvs_from_post_mapped +from mavedb.models.allele import Allele as AlleleModel from mavedb.models.enums.annotation_type import AnnotationType from mavedb.models.enums.job_pipeline import AnnotationFailureCategory, AnnotationStatus, FailureCategory -from mavedb.models.mapped_variant import MappedVariant +from mavedb.models.mapping_record import MappingRecord +from mavedb.models.mapping_record_allele import MappingRecordAllele from mavedb.models.score_set import ScoreSet from mavedb.models.variant import Variant from mavedb.worker.jobs.utils.setup import validate_job_params @@ -41,6 +44,50 @@ logger = logging.getLogger(__name__) +@dataclass +class 
_AlleleEntry: + post_mapped: dict | None + existing_caid: str | None + # Variants for which THIS allele is the authoritative measurement — the only ones that receive a + # per-variant VAS row. INTERIM BANDAID (do not deploy as final): keying clingen's per-variant + # status to the single authoritative link sidesteps the multiple "current" rows a full allele + # fan-out would write for one variant. Durable fix is an allele-level event log; rationale and + # migration seam in docs/design/allele-annotation-status.md. + authoritative_variant_ids: list[int] = field(default_factory=list) + + +def _annotate_caid( + annotation_manager: AnnotationStatusManager, + variant_ids: list[int], + status: AnnotationStatus, + *, + failure_category: AnnotationFailureCategory | None = None, + error_message: str | None = None, + metadata: dict | None = None, +) -> None: + """Fan a CLINGEN_ALLELE_ID annotation out to every variant served by an allele. + + AAS migration seam: the single choke point for clingen's per-variant VAS writes. At migration it + becomes an allele-keyed event writer; the per-variant fan-out goes away, and the variant + association narrows to provenance (who caused the registration). See + docs/design/allele-annotation-status.md. 
+ """ + annotation_data: dict = {"annotation_metadata": metadata or {}} + if error_message is not None: + annotation_data["error_message"] = error_message + + for variant_id in variant_ids: + annotation_manager.add_annotation( + variant_id=variant_id, + annotation_type=AnnotationType.CLINGEN_ALLELE_ID, + version=None, + status=status, + failure_category=failure_category, + annotation_data=annotation_data, + current=True, + ) + + @with_pipeline_management async def submit_score_set_mappings_to_car(ctx: dict, job_id: int, job_manager: JobManager) -> JobExecutionOutcome: """ @@ -58,11 +105,11 @@ async def submit_score_set_mappings_to_car(ctx: dict, job_id: int, job_manager: job_manager (JobManager): Manager for job lifecycle and DB operations Side Effects: - - Updates MappedVariant records with ClinGen Allele IDs + - Updates Allele records with ClinGen Allele IDs - Submits data to ClinGen Allele Registry Returns: - dict: Result indicating success and any exception details + JobExecutionOutcome: outcome with per-allele counts (submitted/registered/already-registered/failed). """ # Get the job definition we are working on job = job_manager.get_job() @@ -70,11 +117,10 @@ async def submit_score_set_mappings_to_car(ctx: dict, job_id: int, job_manager: _job_required_params = ["score_set_id", "correlation_id"] validate_job_params(_job_required_params, job) - # Fetch required resources based on param inputs. Safely ignore mypy warnings here, as they were checked above. 
score_set = job_manager.db.scalars(select(ScoreSet).where(ScoreSet.id == job.job_params["score_set_id"])).one() # type: ignore correlation_id = job.job_params["correlation_id"] # type: ignore + force_reregister = bool(job.job_params.get("force_reregister", False)) # type: ignore[union-attr] - # Setup initial context and progress job_manager.save_to_context( { "application": "mavedb-worker", @@ -86,7 +132,6 @@ async def submit_score_set_mappings_to_car(ctx: dict, job_id: int, job_manager: job_manager.update_progress(0, 100, "Starting CAR mapped resource submission.") logger.info(msg="Started CAR mapped resource submission", extra=job_manager.logging_context()) - # Ensure we've enabled ClinGen submission if not CLIN_GEN_SUBMISSION_ENABLED: logger.warning( msg="ClinGen submission is disabled via configuration, skipping submission of mapped variants to CAR.", @@ -95,7 +140,6 @@ async def submit_score_set_mappings_to_car(ctx: dict, job_id: int, job_manager: job_manager.db.flush() return JobExecutionOutcome.skipped(data={"reason": "ClinGen submission disabled"}) - # Check for CAR submission endpoint if not CAR_SUBMISSION_ENDPOINT: logger.warning( msg="ClinGen Allele Registry submission is disabled (no submission endpoint), unable to complete submission of mapped variants to CAR.", @@ -107,183 +151,300 @@ async def submit_score_set_mappings_to_car(ctx: dict, job_id: int, job_manager: failure_category=FailureCategory.CONFIGURATION_ERROR, ) - # Fetch mapped variants with post-mapped data for the score set - variant_post_mapped_objects = job_manager.db.execute( - select(MappedVariant.id, MappedVariant.post_mapped) - .join(Variant) - .join(ScoreSet) - .where(ScoreSet.urn == score_set.urn) - .where(MappedVariant.post_mapped.is_not(None)) - .where(MappedVariant.current.is_(True)) - ).all() - - # Track total variants to submit - job_manager.save_to_context({"total_variants_to_submit_car": len(variant_post_mapped_objects)}) - if not variant_post_mapped_objects: + allele_rows = 
get_alleles_for_score_set(job_manager.db, score_set.id) + job_manager.save_to_context({"total_allele_variant_pairs": len(allele_rows)}) + if not allele_rows: logger.warning( - msg="No current mapped variants with post mapped metadata were found for this score set. Skipping CAR submission.", + msg="No current alleles found for this score set. Skipping CAR submission.", extra=job_manager.logging_context(), ) job_manager.db.flush() - return JobExecutionOutcome.succeeded(data={"submitted_count": 0, "matched_count": 0}) + return JobExecutionOutcome.succeeded( + data={ + "submitted_allele_count": 0, + "registered_allele_count": 0, + "already_registered_allele_count": 0, + "failed_allele_count": 0, + } + ) - job_manager.update_progress( - 10, 100, f"Preparing {len(variant_post_mapped_objects)} mapped variants for CAR submission." - ) + # Group by allele_id: one allele may serve multiple variants (cross-score-set dedup). + allele_data: dict[int, _AlleleEntry] = {} + for row in allele_rows: + if row.allele_id not in allele_data: + allele_data[row.allele_id] = _AlleleEntry(post_mapped=row.post_mapped, existing_caid=row.clingen_allele_id) + + if row.is_authoritative: + allele_data[row.allele_id].authoritative_variant_ids.append(row.variant_id) - # Build HGVS strings for submission. Don't do duplicate submissions-- store mapped variant IDs by HGVS. - # Variants that can't produce an HGVS string are annotated as failures immediately. annotation_manager = AnnotationStatusManager(job_manager.db, job_run_id=job_manager.job_id) - variant_post_mapped_hgvs: dict[str, list[int]] = {} - no_hgvs_count = 0 - for mapped_variant_id, post_mapped in variant_post_mapped_objects: - # Intentionally not combine_cis=True: multi-variant cis-phased blocks have no single - # CAID, so they are skipped here pending ClinGen guidance on how to register them - # (https://github.com/VariantEffect/mavedb-api/issues/764). 
- hgvs_for_post_mapped = get_hgvs_from_post_mapped(post_mapped) - if not hgvs_for_post_mapped: - no_hgvs_count += 1 + # Track outcomes by distinct allele_id. clingen_allele_id is an allele-level fact (the CAID + # lives on the Allele) and CAR's operation is per-allele, so the reported counts are in allele + # units — and they cover every allele submitted, including the RT-derived ones that produce no + # per-variant status row. Each allele has exactly one outcome (submitted once → one response), so + # these sets are disjoint by construction. (Per-variant VAS rows are still written via the + # authoritative link below — that is the interim bandaid, separate from these operation counts.) + linked_allele_ids: set[int] = set() + preexisting_allele_ids: set[int] = set() + failed_allele_ids: set[int] = set() + + # Pre-existing CAIDs: record success without re-submitting unless force_reregister is set. + preexisting = [aid for aid, entry in allele_data.items() if entry.existing_caid] if not force_reregister else [] + for allele_id in preexisting: + entry = allele_data[allele_id] + + preexisting_allele_ids.add(allele_id) + _annotate_caid( + annotation_manager, + entry.authoritative_variant_ids, + AnnotationStatus.SUCCESS, + metadata={"clingen_allele_id": entry.existing_caid, "registration_source": "preexisting"}, + ) + + # Alleles that need CAR submission: new ones, or all when force_reregister=True. + pending_allele_ids = [aid for aid, entry in allele_data.items() if force_reregister or not entry.existing_caid] + job_manager.update_progress(10, 100, f"Preparing {len(pending_allele_ids)} alleles for CAR submission.") + + # Build HGVS → [allele_ids] map. Multi-variant cis-phased blocks produce no HGVS + # (combine_cis defaults to False); those alleles are annotated as failures immediately. 
+ hgvs_to_allele_ids: dict[str, list[int]] = {} + for allele_id in pending_allele_ids: + entry = allele_data[allele_id] + hgvs = get_hgvs_from_post_mapped(entry.post_mapped) + + if hgvs: + hgvs_to_allele_ids.setdefault(hgvs, []).append(allele_id) + + # Allele is registered but post_mapped can no longer produce HGVS — data + # regression worth surfacing, but the CAID is still valid so treat it as + # preexisting rather than failing the variant. + elif entry.existing_caid: + preexisting_allele_ids.add(allele_id) logger.warning( - msg=f"Could not construct a valid HGVS string for mapped variant {mapped_variant_id}. Skipping submission of this variant.", + msg=( + f"Could not construct HGVS for allele {allele_id} during force re-registration " + f"(existing CAID: {entry.existing_caid!r}). Reconfirmation skipped; existing CAID retained." + ), extra=job_manager.logging_context(), ) + _annotate_caid( + annotation_manager, + entry.authoritative_variant_ids, + AnnotationStatus.SUCCESS, + metadata={"clingen_allele_id": entry.existing_caid, "registration_source": "reconfirmation_skipped"}, + ) - mapped_variant = job_manager.db.scalars( - select(MappedVariant).where(MappedVariant.id == mapped_variant_id) - ).one() - annotation_manager.add_annotation( - variant_id=mapped_variant.variant_id, # type: ignore - annotation_type=AnnotationType.CLINGEN_ALLELE_ID, - version=None, - status=AnnotationStatus.FAILED, + # No HGVS-- un-submittable. + else: + failed_allele_ids.add(allele_id) + logger.warning( + msg=f"Could not construct HGVS for allele {allele_id}. 
Skipping CAR submission.", + extra=job_manager.logging_context(), + ) + _annotate_caid( + annotation_manager, + entry.authoritative_variant_ids, + AnnotationStatus.FAILED, failure_category=AnnotationFailureCategory.MISSING_IDENTIFIER, - annotation_data={ - "error_message": "Could not extract a valid HGVS string from post-mapped variant data.", - "annotation_metadata": {}, - }, - current=True, + error_message="Could not extract a valid HGVS string from post-mapped allele data.", ) - continue - if hgvs_for_post_mapped in variant_post_mapped_hgvs: - variant_post_mapped_hgvs[hgvs_for_post_mapped].append(mapped_variant_id) - else: - variant_post_mapped_hgvs[hgvs_for_post_mapped] = [mapped_variant_id] + job_manager.save_to_context({"unique_hgvs_to_submit_car": len(hgvs_to_allele_ids)}) - job_manager.save_to_context({"unique_variants_to_submit_car": len(variant_post_mapped_hgvs)}) - job_manager.update_progress(15, 100, "Submitting mapped variants to CAR.") + # Distinct alleles actually sent to CAR this run (pending alleles that yielded HGVS). + submitted_allele_ids: set[int] = { + allele_id for allele_ids in hgvs_to_allele_ids.values() for allele_id in allele_ids + } + + def _outcome_data() -> dict[str, int]: + return { + "submitted_allele_count": len(submitted_allele_ids), + "registered_allele_count": len(linked_allele_ids), + "already_registered_allele_count": len(preexisting_allele_ids), + "failed_allele_count": len(failed_allele_ids), + } - # Do submission + # All pending alleles failed HGVS extraction; annotations already written above. 
+ if not hgvs_to_allele_ids: + annotation_manager.flush() + job_manager.db.flush() + if preexisting_allele_ids: + return JobExecutionOutcome.succeeded(data=_outcome_data()) + + return JobExecutionOutcome.failed( + reason=f"No submittable alleles for score set {score_set.urn}.", + data=_outcome_data(), + failure_category=FailureCategory.DEPENDENCY_FAILURE, + ) + + job_manager.update_progress(15, 100, "Submitting alleles to CAR.") car_service = ClinGenAlleleRegistryService(url=CAR_SUBMISSION_ENDPOINT) - hgvs_list = list(variant_post_mapped_hgvs.keys()) + hgvs_list = list(hgvs_to_allele_ids.keys()) registered_alleles = car_service.dispatch_submissions(hgvs_list) job_manager.update_progress(60, 100, "Processing registered alleles from CAR.") - # CAR returns one response per submitted HGVS in the same order (see CAR API docs). - # Zip the submissions with the responses and annotate each based on success or error. - linked_count = 0 - error_count = 0 + # Bulk-load every allele that could be linked in one query, keyed by id, rather than + # issuing a SELECT per CAID inside the response loop. + alleles_by_id = { + allele.id: allele + for allele in job_manager.db.scalars(select(AlleleModel).where(AlleleModel.id.in_(submitted_allele_ids))).all() + } + + # CAR's contract is one result per submitted HGVS, in order — that is what makes the + # positional zip below valid. A different count (including the empty list dispatch returns + # on request failure) means alignment cannot be trusted for ANY position, so we register + # nothing and fail the whole batch rather than risk writing a CAID to the wrong allele. + aligned = len(registered_alleles) == len(hgvs_list) + if not aligned: + logger.error( + msg=( + f"CAR returned {len(registered_alleles)} results for {len(hgvs_list)} submitted HGVS; " + "positional alignment cannot be trusted. Failing the entire batch." 
+ ), + extra=job_manager.logging_context(), + ) - for hgvs_string, response in zip(hgvs_list, registered_alleles): - mapped_variant_ids = variant_post_mapped_hgvs[hgvs_string] - mapped_variants = job_manager.db.scalars( - select(MappedVariant).where(MappedVariant.id.in_(mapped_variant_ids)) - ).all() + for hgvs_string, response in zip(hgvs_list, registered_alleles if aligned else []): + allele_ids_for_hgvs = hgvs_to_allele_ids[hgvs_string] if "errorType" in response: - error_count += 1 logger.warning( msg=f"CAR rejected HGVS '{hgvs_string}' ({response.get('errorType', 'unknown')}): {response.get('message', 'unknown')}", extra=job_manager.logging_context(), ) + for allele_id in allele_ids_for_hgvs: + failed_allele_ids.add(allele_id) + _annotate_caid( + annotation_manager, + allele_data[allele_id].authoritative_variant_ids, + AnnotationStatus.FAILED, + failure_category=AnnotationFailureCategory.EXTERNAL_SERVICE_REJECTED, + error_message="Failed to register allele with ClinGen Allele Registry.", + metadata={ + "submitted_hgvs": hgvs_string, + "car_error_type": response.get("errorType"), + "car_error_message": response.get("message"), + }, + ) + + continue + + # A response that is neither an error nor a registration (no "@id") is malformed. 
+ caid_iri = response.get("@id") + if not caid_iri: + logger.error( + msg=f"CAR returned a response for HGVS '{hgvs_string}' with neither an error nor an allele identifier.", + extra=job_manager.logging_context(), + ) + for allele_id in allele_ids_for_hgvs: + failed_allele_ids.add(allele_id) + _annotate_caid( + annotation_manager, + allele_data[allele_id].authoritative_variant_ids, + AnnotationStatus.FAILED, + failure_category=AnnotationFailureCategory.EXTERNAL_SERVICE_REJECTED, + error_message="ClinGen Allele Registry returned a malformed response with no allele identifier.", + metadata={"submitted_hgvs": hgvs_string}, + ) + + continue - for mapped_variant in mapped_variants: - annotation_manager.add_annotation( - variant_id=mapped_variant.variant_id, # type: ignore - annotation_type=AnnotationType.CLINGEN_ALLELE_ID, - version=None, - status=AnnotationStatus.FAILED, + caid = caid_iri.split("/")[-1] + for allele_id in allele_ids_for_hgvs: + entry = allele_data[allele_id] + prior_caid = entry.existing_caid + + # CAID is immutable — a different value returned by CAR is a hard invariant + # violation. Do not overwrite; record a failure with full audit context. + if prior_caid and prior_caid != caid: + logger.error( + msg=( + f"CAR returned a different CAID for allele {allele_id}: " + f"stored={prior_caid!r}, returned={caid!r}. " + "Not overwriting. Investigate immediately." 
+ ), + extra=job_manager.logging_context(), + ) + + failed_allele_ids.add(allele_id) + _annotate_caid( + annotation_manager, + entry.authoritative_variant_ids, + AnnotationStatus.FAILED, failure_category=AnnotationFailureCategory.EXTERNAL_SERVICE_REJECTED, - annotation_data={ - "error_message": "Failed to register variant with ClinGen Allele Registry.", - "annotation_metadata": { - "submitted_hgvs": hgvs_string, - "car_error_type": response.get("errorType"), - "car_error_message": response.get("message"), - }, + error_message="CAR returned a CAID that conflicts with the stored value.", + metadata={ + "clingen_allele_id": prior_caid, + "conflicting_caid": caid, + "submitted_hgvs": hgvs_string, }, - current=True, ) - else: - linked_count += 1 - caid = response["@id"].split("/")[-1] - for mapped_variant in mapped_variants: - mapped_variant.clingen_allele_id = caid - job_manager.db.add(mapped_variant) - - annotation_manager.add_annotation( - variant_id=mapped_variant.variant_id, # type: ignore - annotation_type=AnnotationType.CLINGEN_ALLELE_ID, - version=None, - status=AnnotationStatus.SUCCESS, - annotation_data={"annotation_metadata": {"clingen_allele_id": caid}}, - current=True, + # CAID is new or matches the stored value — link it to the allele and record success. + else: + linked_allele_ids.add(allele_id) + allele = alleles_by_id[allele_id] + allele.clingen_allele_id = caid + + registration_source = "reconfirmed" if prior_caid else "this_run" + if prior_caid: + logger.info( + msg=f"Force re-registration confirmed same CAID {caid!r} for allele {allele_id}.", + extra=job_manager.logging_context(), + ) + + _annotate_caid( + annotation_manager, + entry.authoritative_variant_ids, + AnnotationStatus.SUCCESS, + metadata={"clingen_allele_id": caid, "registration_source": registration_source}, ) - # Any HGVS strings CAR did not respond to (network drop, service-side omission). - # Use EXTERNAL_SERVICE_REJECTED for explicit CAR errors, EXTERNAL_API_ERROR for silent failures. 
- no_response_hgvs = hgvs_list[len(registered_alleles) :] - for hgvs_string in no_response_hgvs: - mapped_variants = job_manager.db.scalars( - select(MappedVariant).where(MappedVariant.id.in_(variant_post_mapped_hgvs[hgvs_string])) - ).all() - for mapped_variant in mapped_variants: - annotation_manager.add_annotation( - variant_id=mapped_variant.variant_id, # type: ignore - annotation_type=AnnotationType.CLINGEN_ALLELE_ID, - version=None, - status=AnnotationStatus.FAILED, + # Submitted HGVS with no trustworthy response: the truncated tail when the counts line up + # (network drop, service-side omission), or the entire batch when the response count + # violated CAR's one-result-per-input contract and we rejected it above. + unattributed_hgvs = hgvs_list if not aligned else hgvs_list[len(registered_alleles) :] + for hgvs_string in unattributed_hgvs: + for allele_id in hgvs_to_allele_ids[hgvs_string]: + failed_allele_ids.add(allele_id) + _annotate_caid( + annotation_manager, + allele_data[allele_id].authoritative_variant_ids, + AnnotationStatus.FAILED, failure_category=AnnotationFailureCategory.EXTERNAL_API_ERROR, - annotation_data={ - "error_message": "Failed to register variant with ClinGen Allele Registry.", - "annotation_metadata": {"submitted_hgvs": hgvs_string}, - }, - current=True, + error_message="Failed to register allele with ClinGen Allele Registry.", + metadata={"submitted_hgvs": hgvs_string}, ) annotation_manager.flush() - failed_count = no_hgvs_count + error_count + len(no_response_hgvs) + outcome_data = _outcome_data() - # When all registrations fail we will not be able to render any annotations. Fail the job - # to explicitly halt the pipeline. - if linked_count == 0: - error_message = f"CAR submission failed for all {len(hgvs_list)} variants in score set {score_set.urn}." + # When no allele ended up with a CAID (none linked this run, none already registered), the + # pipeline cannot continue — downstream jobs need CAIDs to function. 
+ if not linked_allele_ids and not preexisting_allele_ids: + error_message = ( + f"CAR submission failed for all {outcome_data['submitted_allele_count']} " + f"submitted alleles in score set {score_set.urn}." + ) logger.error(msg=error_message, extra=job_manager.logging_context()) job_manager.db.flush() return JobExecutionOutcome.failed( reason=error_message, - data={"submitted_count": len(hgvs_list), "matched_count": 0, "failed_count": failed_count}, + data=outcome_data, failure_category=FailureCategory.DEPENDENCY_FAILURE, ) - if failed_count > 0: - # CAR rejections are typically per-variant data quality issues (e.g. invalid HGVS) rather than - # systemic failures. Per-variant AnnotationStatus.FAILED records are already written above for - # traceability. We continue the pipeline so that successfully registered variants still receive - # downstream annotations (warm_clingen_cache, gnomAD, ClinVar, HGVS, translations). + if outcome_data["failed_allele_count"] > 0: logger.warning( - msg=f"CAR submission failed for {failed_count} of {len(hgvs_list)} variants in score set {score_set.urn}.", + msg=f"CAR submission failed for {outcome_data['failed_allele_count']} alleles in score set {score_set.urn}.", extra=job_manager.logging_context(), ) logger.info(msg="Completed CAR mapped resource submission", extra=job_manager.logging_context()) job_manager.db.flush() - return JobExecutionOutcome.succeeded( - data={"submitted_count": len(hgvs_list), "matched_count": linked_count, "failed_count": failed_count} - ) + return JobExecutionOutcome.succeeded(data=outcome_data) @with_pipeline_management @@ -306,7 +467,7 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, job_id: int, job_manager: - Submits data to ClinGen Linked Data Hub Returns: - dict: Result indicating success and any exception details + JobExecutionOutcome: outcome with per-variant submitted/failed counts. 
""" # Get the job definition we are working on job = job_manager.get_job() @@ -334,14 +495,21 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, job_id: int, job_manager: ldh_service = ClinGenLdhService(url=LDH_SUBMISSION_ENDPOINT) ldh_service.authenticate() - # Fetch mapped variants with post-mapped data for the score set + # Fetch each variant's authoritative allele for the score set. Post-mapped data and HGVS + # come from the Allele; pre-mapped data and the mapping API version come from the + # MappingRecord. RT-derived equivalence alleles are intentionally excluded — LDH links each + # MaveDB score to its canonical mapped variant, not to every equivalent allele (unlike CAR, + # which registers a CAID per allele). variant_objects = job_manager.db.execute( - select(Variant, MappedVariant) - .join(MappedVariant) - .join(ScoreSet) - .where(ScoreSet.urn == score_set.urn) - .where(MappedVariant.post_mapped.is_not(None)) - .where(MappedVariant.current.is_(True)) + select(Variant, MappingRecord, AlleleModel) + .join(MappingRecord, MappingRecord.variant_id == Variant.id) + .join(MappingRecordAllele, MappingRecordAllele.mapping_record_id == MappingRecord.id) + .join(AlleleModel, AlleleModel.id == MappingRecordAllele.allele_id) + .where(Variant.score_set_id == score_set.id) + .where(MappingRecord.current) + .where(MappingRecordAllele.current) + .where(MappingRecordAllele.is_authoritative.is_(True)) + .where(AlleleModel.post_mapped.is_not(None)) ).all() # Track total variants to submit @@ -359,19 +527,19 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, job_id: int, job_manager: # Build submission content variant_content = [] variant_for_urn = {} - for variant, mapped_variant in variant_objects: + for variant, mapping_record, allele in variant_objects: # See the note above: cis-phased blocks are skipped here pending ClinGen guidance # (https://github.com/VariantEffect/mavedb-api/issues/764). 
- variation = get_hgvs_from_post_mapped(mapped_variant.post_mapped) + variation = get_hgvs_from_post_mapped(allele.post_mapped) if not variation: logger.warning( - msg=f"Could not construct a valid HGVS string for mapped variant {mapped_variant.id}. Skipping submission of this variant.", + msg=f"Could not construct a valid HGVS string for allele {allele.id} (variant {variant.urn}). Skipping submission of this variant.", extra=job_manager.logging_context(), ) continue - variant_content.append((variation, variant, mapped_variant)) + variant_content.append((variation, variant, mapping_record, allele)) variant_for_urn[variant.urn] = variant if not variant_content: @@ -409,7 +577,15 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, job_id: int, job_manager: ) submitted_urn = success["data"]["entId"] - submitted_variant = variant_for_urn[submitted_urn] + submitted_variant = variant_for_urn.get(submitted_urn) + if submitted_variant is None: + # LDH echoed back an entId we never submitted — record it for investigation rather + # than crashing the whole job mid-batch. + logger.warning( + msg=f"LDH returned an unrecognized entId not in this submission: {submitted_urn!r}.", + extra=job_manager.logging_context(), + ) + continue annotation_manager.add_annotation( variant_id=submitted_variant.id, @@ -427,7 +603,8 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, job_id: int, job_manager: # especially when submission occurred in batch. Save all failures generically here. # Note that failures may not be present in the submission failures list, but they are # guaranteed to be absent from the successes list. 
- for failure_urn in set(variant_for_urn.keys()) - submitted_variant_urns: + failed_variant_urns = set(variant_for_urn.keys()) - submitted_variant_urns + for failure_urn in failed_variant_urns: logger.error( msg=f"Failed to submit mapped variant to LDH: {failure_urn}", extra=job_manager.logging_context(), @@ -449,9 +626,15 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, job_id: int, job_manager: annotation_manager.flush() + # Report per-variant counts (matching the annotations written above), not the per-batch + # counts returned by the service — the two use different denominators. + submitted_count = len(submitted_variant_urns) + failed_count = len(failed_variant_urns) + if submission_failures: logger.warning( - msg=f"LDH mapped resource submission encountered {len(submission_failures)} failures.", + msg=f"LDH mapped resource submission encountered {len(submission_failures)} batch failures " + f"({failed_count} variants unconfirmed).", extra=job_manager.logging_context(), ) @@ -465,7 +648,7 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, job_id: int, job_manager: job_manager.db.flush() return JobExecutionOutcome.failed( reason=error_message, - data={"submitted_count": 0, "failed_count": len(submission_failures)}, + data={"submitted_count": submitted_count, "failed_count": failed_count}, failure_category=FailureCategory.DEPENDENCY_FAILURE, ) @@ -475,6 +658,4 @@ async def submit_score_set_mappings_to_ldh(ctx: dict, job_id: int, job_manager: ) job_manager.db.flush() - return JobExecutionOutcome.succeeded( - data={"submitted_count": len(submission_successes), "failed_count": len(submission_failures)} - ) + return JobExecutionOutcome.succeeded(data={"submitted_count": submitted_count, "failed_count": failed_count}) diff --git a/src/mavedb/worker/jobs/external_services/clingen_cache.py b/src/mavedb/worker/jobs/external_services/clingen_cache.py index 10890f23..d530d881 100644 --- a/src/mavedb/worker/jobs/external_services/clingen_cache.py +++ 
b/src/mavedb/worker/jobs/external_services/clingen_cache.py @@ -15,11 +15,10 @@ from sqlalchemy import select from mavedb.lib.clingen.allele_registry import get_clingen_allele_data +from mavedb.lib.clingen.alleles import get_alleles_for_score_set from mavedb.lib.clingen.constants import CLINGEN_CACHE_WARMING_CONCURRENCY from mavedb.lib.types.workflow import JobExecutionOutcome -from mavedb.models.mapped_variant import MappedVariant from mavedb.models.score_set import ScoreSet -from mavedb.models.variant import Variant from mavedb.worker.jobs.utils.setup import validate_job_params from mavedb.worker.lib.decorators.pipeline_management import with_pipeline_management from mavedb.worker.lib.managers.job_manager import JobManager @@ -55,19 +54,16 @@ async def warm_clingen_cache(ctx: dict, job_id: int, job_manager: JobManager) -> job_manager.update_progress(0, 100, "Starting ClinGen cache pre-warming.") logger.info("Starting ClinGen cache pre-warming", extra=job_manager.logging_context()) - # Get distinct clingen_allele_ids for this score set's current mapped variants - allele_ids = job_manager.db.scalars( - select(MappedVariant.clingen_allele_id) - .join(Variant) - .where( - Variant.score_set_id == score_set.id, - MappedVariant.current.is_(True), - MappedVariant.clingen_allele_id.isnot(None), - # Exclude multi-variant IDs (comma-separated) — they can't be fetched individually - MappedVariant.clingen_allele_id.not_like("%,%"), - ) - .distinct() - ).all() + # Get distinct clingen_allele_ids registered for alleles in this score set. + # Exclude None and comma-separated multi-variant IDs that can't be fetched individually. 
+ allele_rows = get_alleles_for_score_set(job_manager.db, score_set.id) + allele_ids = list( + { + row.clingen_allele_id + for row in allele_rows + if row.clingen_allele_id is not None and "," not in row.clingen_allele_id + } + ) total = len(allele_ids) job_manager.save_to_context({"total_allele_ids_to_warm": total}) diff --git a/tests/lib/clingen/test_content_constructors.py b/tests/lib/clingen/test_content_constructors.py index 7aab47f7..f691a449 100644 --- a/tests/lib/clingen/test_content_constructors.py +++ b/tests/lib/clingen/test_content_constructors.py @@ -11,7 +11,6 @@ ) from mavedb.lib.clingen.constants import LDH_ENTITY_NAME, LDH_SUBMISSION_TYPE from mavedb import __version__ -import pytest from tests.helpers.constants import ( TEST_HGVS_IDENTIFIER, @@ -54,10 +53,8 @@ def test_construct_ldh_submission_event(): } -@pytest.mark.parametrize("has_mapped_variant", [(True), (False)]) -def test_construct_ldh_submission_entity(mock_variant, mock_mapped_variant, has_mapped_variant: bool): - mapped_variant = mock_mapped_variant if has_mapped_variant else None - result = construct_ldh_submission_entity(mock_variant, mapped_variant) +def test_construct_ldh_submission_entity(mock_variant, mock_mapping_record, mock_allele): + result = construct_ldh_submission_entity(mock_variant, mock_mapping_record, mock_allele) assert "MaveDBMapping" in result assert len(result["MaveDBMapping"]) == 1 @@ -65,15 +62,9 @@ def test_construct_ldh_submission_entity(mock_variant, mock_mapped_variant, has_ assert mapping["entContent"]["mavedb_id"] == VALID_VARIANT_URN assert mapping["entContent"]["score"] == 1.0 - - if has_mapped_variant: - assert mapping["entContent"]["pre_mapped"] == TEST_VALID_PRE_MAPPED_VRS_ALLELE_VRS2_X - assert mapping["entContent"]["post_mapped"] == TEST_VALID_POST_MAPPED_VRS_ALLELE_VRS2_X - assert mapping["entContent"]["mapping_api_version"] == "pytest.mapping.1.0" - else: - assert "pre_mapped" not in mapping["entContent"] - assert "post_mapped" not in 
mapping["entContent"] - assert "mapping_api_version" not in mapping["entContent"] + assert mapping["entContent"]["pre_mapped"] == TEST_VALID_PRE_MAPPED_VRS_ALLELE_VRS2_X + assert mapping["entContent"]["post_mapped"] == TEST_VALID_POST_MAPPED_VRS_ALLELE_VRS2_X + assert mapping["entContent"]["mapping_api_version"] == "pytest.mapping.1.0" assert mapping["entId"] == VALID_VARIANT_URN assert ( @@ -82,12 +73,10 @@ def test_construct_ldh_submission_entity(mock_variant, mock_mapped_variant, has_ ) -@pytest.mark.parametrize("has_mapped_variant", [(True), (False)]) -def test_construct_ldh_submission(mock_variant, mock_mapped_variant, has_mapped_variant: bool): - mapped_variant = mock_mapped_variant if has_mapped_variant else None +def test_construct_ldh_submission(mock_variant, mock_mapping_record, mock_allele): variant_content = [ - (TEST_HGVS_IDENTIFIER, mock_variant, mapped_variant), - (TEST_HGVS_IDENTIFIER, mock_variant, mapped_variant), + (TEST_HGVS_IDENTIFIER, mock_variant, mock_mapping_record, mock_allele), + (TEST_HGVS_IDENTIFIER, mock_variant, mock_mapping_record, mock_allele), ] uuid_1 = UUID("12345678-1234-5678-1234-567812345678") diff --git a/tests/lib/conftest.py b/tests/lib/conftest.py index 2ac2183f..adfe2a1b 100644 --- a/tests/lib/conftest.py +++ b/tests/lib/conftest.py @@ -307,6 +307,23 @@ def mock_mapped_variant(mock_variant): return mv +@pytest.fixture +def mock_mapping_record(mock_mapped_variant): + # Pre-mapped data and the mapping API version now live on the per-variant MappingRecord. + rec = mock.Mock() + rec.pre_mapped = mock_mapped_variant.pre_mapped + rec.mapping_api_version = mock_mapped_variant.mapping_api_version + return rec + + +@pytest.fixture +def mock_allele(mock_mapped_variant): + # Post-mapped data now lives on the (cross-variant deduped) Allele. 
+ allele = mock.Mock() + allele.post_mapped = mock_mapped_variant.post_mapped + return allele + + @pytest.fixture def mock_mapped_variant_with_functional_calibration_score_set( mock_mapped_variant, mock_variant_with_functional_calibration_score_set diff --git a/tests/worker/jobs/external_services/test_clingen.py b/tests/worker/jobs/external_services/test_clingen.py index 616263df..76b1b0b2 100644 --- a/tests/worker/jobs/external_services/test_clingen.py +++ b/tests/worker/jobs/external_services/test_clingen.py @@ -5,14 +5,16 @@ pytest.importorskip("arq") from asyncio.unix_events import _UnixSelectorEventLoop +from copy import deepcopy from unittest.mock import patch from sqlalchemy import select from mavedb.lib.types.workflow import JobExecutionOutcome from mavedb.lib.variants import get_hgvs_from_post_mapped +from mavedb.models.allele import Allele from mavedb.models.enums.job_pipeline import JobStatus, PipelineStatus -from mavedb.models.mapped_variant import MappedVariant +from mavedb.models.mapping_record_allele import MappingRecordAllele from mavedb.models.variant import Variant from mavedb.models.variant_annotation_status import VariantAnnotationStatus from mavedb.worker.jobs.external_services.clingen import ( @@ -51,9 +53,9 @@ async def test_submit_score_set_mappings_to_car_submission_disabled( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.SKIPPED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 async def test_submit_score_set_mappings_to_car_no_mappings( self, @@ -76,9 +78,9 @@ async def test_submit_score_set_mappings_to_car_no_mappings( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.SUCCEEDED - # 
Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 async def test_submit_score_set_mappings_to_car_submission_endpoint_not_set( self, @@ -101,9 +103,9 @@ async def test_submit_score_set_mappings_to_car_submission_endpoint_not_set( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.FAILED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 async def test_submit_score_set_mappings_to_car_no_registered_alleles( self, @@ -147,11 +149,11 @@ async def test_submit_score_set_mappings_to_car_no_registered_alleles( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.FAILED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 - # Verify annotation statuses were rendered as failed + # Verify annotation statuses were rendered as failed — 4 variants, all failed annotation_statuses = session.scalars( select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() @@ -184,18 +186,18 @@ async def test_submit_score_set_mappings_to_car_all_car_errors( dummy_variant_mapping_job_run, ) - # Build error responses for every submitted 
HGVS — CAR explicitly rejected all of them - mapped_variants = session.scalars(select(MappedVariant)).all() + # All 4 variants share 1 allele (same VRS digest). Build an error response for that 1 HGVS. + alleles = session.scalars(select(Allele)).all() + assert len(alleles) == 1 registered_alleles_mock = [ { "errorType": "InvalidHGVS", - "hgvs": get_hgvs_from_post_mapped(mv.post_mapped) or "", + "hgvs": get_hgvs_from_post_mapped(alleles[0].post_mapped) or "", "message": "Invalid HGVS expression.", "description": "", - "inputLine": get_hgvs_from_post_mapped(mv.post_mapped) or "", - "position": str(i), + "inputLine": get_hgvs_from_post_mapped(alleles[0].post_mapped) or "", + "position": "0", } - for i, mv in enumerate(mapped_variants) ] with ( @@ -215,11 +217,11 @@ async def test_submit_score_set_mappings_to_car_all_car_errors( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.FAILED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 - # Verify annotation statuses were rendered as failed + # 1 allele failed → all 4 variant annotations are failed annotation_statuses = session.scalars( select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() @@ -228,6 +230,211 @@ async def test_submit_score_set_mappings_to_car_all_car_errors( assert ann.status == "failed" assert ann.annotation_type == "clingen_allele_id" + async def test_submit_score_set_mappings_to_car_derived_allele_no_duplicate_annotation( + self, + mock_worker_ctx, + session, + with_submit_score_set_mappings_to_car_job, + submit_score_set_mappings_to_car_sample_job_run, + mock_s3_client, + sample_score_dataframe, + sample_count_dataframe, + 
with_dummy_setup_jobs, + dummy_variant_creation_job_run, + dummy_variant_mapping_job_run, + ): + """A variant linked to an authoritative AND a derived allele gets exactly one VAS row (from the + authoritative link), while the derived allele is still registered with a CAID.""" + await create_mappings_in_score_set( + session, + mock_s3_client, + mock_worker_ctx, + sample_score_dataframe, + sample_count_dataframe, + dummy_variant_creation_job_run, + dummy_variant_mapping_job_run, + ) + + # The sample's 4 variants dedup to one authoritative allele. + authoritative_allele = session.scalars(select(Allele)).one() + + # Attach a second, derived (is_authoritative=False) allele to one variant's current mapping + # record. It shares the authoritative allele's HGVS (different vrs_digest) so it is submitted + # under the same line — the realistic shape is a different level, but identical HGVS is enough + # to exercise the multi-allele-per-variant fan-out. + a_link = session.scalars( + select(MappingRecordAllele).where(MappingRecordAllele.allele_id == authoritative_allele.id) + ).first() + derived_allele = Allele( + vrs_digest=f"{authoritative_allele.vrs_digest}-derived", + level=authoritative_allele.level, + post_mapped={**deepcopy(authoritative_allele.post_mapped), "id": "derived-allele-id"}, + ) + session.add(derived_allele) + session.flush() + session.add( + MappingRecordAllele( + mapping_record_id=a_link.mapping_record_id, + allele_id=derived_allele.id, + is_authoritative=False, + ) + ) + session.flush() + + def fake_dispatch(hgvs_list): + return [{"@id": f"CA{idx}", "type": "nucleotide", "genomicAlleles": []} for idx, _ in enumerate(hgvs_list)] + + with ( + patch( + "mavedb.worker.jobs.external_services.clingen.ClinGenAlleleRegistryService.dispatch_submissions", + side_effect=fake_dispatch, + ), + patch("mavedb.worker.jobs.external_services.clingen.CAR_SUBMISSION_ENDPOINT", "http://fake-endpoint"), + 
patch("mavedb.worker.jobs.external_services.clingen.CLIN_GEN_SUBMISSION_ENABLED", True), + ): + result = await submit_score_set_mappings_to_car( + mock_worker_ctx, + submit_score_set_mappings_to_car_sample_job_run.id, + JobManager(session, mock_worker_ctx["redis"], submit_score_set_mappings_to_car_sample_job_run.id), + ) + + assert result.status == JobStatus.SUCCEEDED + + # Exactly one current VAS row per variant (4 total) — no duplicate from the derived allele. + current_statuses = session.scalars( + select(VariantAnnotationStatus).where( + VariantAnnotationStatus.annotation_type == "clingen_allele_id", + VariantAnnotationStatus.current.is_(True), + ) + ).all() + assert len(current_statuses) == 4 + assert len({s.variant_id for s in current_statuses}) == 4 + + # Both alleles registered: registration breadth is preserved even though the derived allele + # produced no VAS row. + session.refresh(authoritative_allele) + session.refresh(derived_allele) + assert authoritative_allele.clingen_allele_id is not None + assert derived_allele.clingen_allele_id is not None + + async def test_submit_score_set_mappings_to_car_response_count_mismatch( + self, + mock_worker_ctx, + session, + with_submit_score_set_mappings_to_car_job, + submit_score_set_mappings_to_car_sample_job_run, + mock_s3_client, + sample_score_dataframe, + sample_count_dataframe, + with_dummy_setup_jobs, + dummy_variant_creation_job_run, + dummy_variant_mapping_job_run, + ): + # Create mappings in the score set + await create_mappings_in_score_set( + session, + mock_s3_client, + mock_worker_ctx, + sample_score_dataframe, + sample_count_dataframe, + dummy_variant_creation_job_run, + dummy_variant_mapping_job_run, + ) + + # All 4 variants share 1 allele → 1 submitted HGVS, but CAR returns 2 results. The count + # violates the one-result-per-input contract, so positional alignment can't be trusted + # and the whole batch must be rejected without writing any CAID. 
+ alleles = session.scalars(select(Allele)).all() + assert len(alleles) == 1 + registered_alleles_mock = [ + {"@id": "CA111111", "type": "nucleotide", "genomicAlleles": []}, + {"@id": "CA222222", "type": "nucleotide", "genomicAlleles": []}, + ] + + with ( + patch( + "mavedb.worker.jobs.external_services.clingen.ClinGenAlleleRegistryService.dispatch_submissions", + return_value=registered_alleles_mock, + ), + patch("mavedb.worker.jobs.external_services.clingen.CAR_SUBMISSION_ENDPOINT", "http://fake-endpoint"), + patch("mavedb.worker.jobs.external_services.clingen.CLIN_GEN_SUBMISSION_ENABLED", True), + ): + result = await submit_score_set_mappings_to_car( + mock_worker_ctx, + submit_score_set_mappings_to_car_sample_job_run.id, + JobManager(session, mock_worker_ctx["redis"], submit_score_set_mappings_to_car_sample_job_run.id), + ) + + assert isinstance(result, JobExecutionOutcome) + assert result.status == JobStatus.FAILED + + # No CAID written — neither of the returned values is trusted. + assert len(session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all()) == 0 + annotation_statuses = session.scalars( + select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") + ).all() + assert len(annotation_statuses) == 4 + for ann in annotation_statuses: + assert ann.status == "failed" + assert ann.failure_category == "external_api_error" + + async def test_submit_score_set_mappings_to_car_malformed_response( + self, + mock_worker_ctx, + session, + with_submit_score_set_mappings_to_car_job, + submit_score_set_mappings_to_car_sample_job_run, + mock_s3_client, + sample_score_dataframe, + sample_count_dataframe, + with_dummy_setup_jobs, + dummy_variant_creation_job_run, + dummy_variant_mapping_job_run, + ): + # Create mappings in the score set + await create_mappings_in_score_set( + session, + mock_s3_client, + mock_worker_ctx, + sample_score_dataframe, + sample_count_dataframe, + dummy_variant_creation_job_run, + 
dummy_variant_mapping_job_run, + ) + + # CAR returns a response that is neither an error (no "errorType") nor a registration + # (no "@id"). This must be surfaced as a rejection, not crash the loop with a KeyError. + alleles = session.scalars(select(Allele)).all() + assert len(alleles) == 1 + registered_alleles_mock = [{"type": "nucleotide", "genomicAlleles": []}] + + with ( + patch( + "mavedb.worker.jobs.external_services.clingen.ClinGenAlleleRegistryService.dispatch_submissions", + return_value=registered_alleles_mock, + ), + patch("mavedb.worker.jobs.external_services.clingen.CAR_SUBMISSION_ENDPOINT", "http://fake-endpoint"), + patch("mavedb.worker.jobs.external_services.clingen.CLIN_GEN_SUBMISSION_ENABLED", True), + ): + result = await submit_score_set_mappings_to_car( + mock_worker_ctx, + submit_score_set_mappings_to_car_sample_job_run.id, + JobManager(session, mock_worker_ctx["redis"], submit_score_set_mappings_to_car_sample_job_run.id), + ) + + assert isinstance(result, JobExecutionOutcome) + assert result.status == JobStatus.FAILED + + # No CAID assigned, and all 4 variant annotations are failed as rejected. 
+ assert len(session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all()) == 0 + annotation_statuses = session.scalars( + select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") + ).all() + assert len(annotation_statuses) == 4 + for ann in annotation_statuses: + assert ann.status == "failed" + assert ann.failure_category == "external_service_rejected" + async def test_submit_score_set_mappings_to_car_repeated_hgvs( self, mock_worker_ctx, @@ -252,13 +459,14 @@ async def test_submit_score_set_mappings_to_car_repeated_hgvs( dummy_variant_mapping_job_run, ) - # Patch ClinGenAlleleRegistryService to return registered alleles with repeated HGVS - mapped_variants = session.scalars(select(MappedVariant)).all() + # 4 variants share 1 allele; CAR returns 1 response for the 1 submitted HGVS + alleles = session.scalars(select(Allele)).all() + assert len(alleles) == 1 registered_alleles_mock = [ { "@id": "CA_DUPLICATE", "type": "nucleotide", - "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(mapped_variants[0].post_mapped)}], + "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(alleles[0].post_mapped)}], } ] @@ -267,11 +475,6 @@ async def test_submit_score_set_mappings_to_car_repeated_hgvs( "mavedb.worker.jobs.external_services.clingen.ClinGenAlleleRegistryService.dispatch_submissions", return_value=registered_alleles_mock, ), - # Patch get_hgvs_from_post_mapped to return the same HGVS for all variants - patch( - "mavedb.worker.jobs.external_services.clingen.get_hgvs_from_post_mapped", - return_value=get_hgvs_from_post_mapped(mapped_variants[0].post_mapped), - ), patch("mavedb.worker.jobs.external_services.clingen.CAR_SUBMISSION_ENDPOINT", "http://fake-endpoint"), patch("mavedb.worker.jobs.external_services.clingen.CLIN_GEN_SUBMISSION_ENABLED", True), ): @@ -284,13 +487,12 @@ async def test_submit_score_set_mappings_to_car_repeated_hgvs( assert isinstance(result, JobExecutionOutcome) assert 
result.status == JobStatus.SUCCEEDED - # Verify variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 4 - for variant in variants: - assert variant.clingen_allele_id == "CA_DUPLICATE" + # 1 allele received the CAID + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 1 + assert alleles[0].clingen_allele_id == "CA_DUPLICATE" - # Verify annotation statuses were rendered as success + # 4 per-variant annotations — all success annotation_statuses = session.scalars( select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() @@ -312,7 +514,7 @@ async def test_submit_score_set_mappings_to_car_partial_failure( dummy_variant_creation_job_run, dummy_variant_mapping_job_run, ): - """Test that partial CAR failures (some matched, some not) result in a succeeded outcome with failure annotations.""" + """All 4 variants share 1 allele; CAR returns 1 success → 1 registered allele, 0 failed.""" # Create mappings in the score set await create_mappings_in_score_set( session, @@ -324,16 +526,14 @@ async def test_submit_score_set_mappings_to_car_partial_failure( dummy_variant_mapping_job_run, ) - # Get mapped variants; return a CAR response that only matches the first variant - mapped_variants = session.scalars(select(MappedVariant)).all() - assert len(mapped_variants) == 4 + alleles = session.scalars(select(Allele)).all() + assert len(alleles) == 1 - first_hgvs = get_hgvs_from_post_mapped(mapped_variants[0].post_mapped) registered_alleles_mock = [ { - "@id": f"CA{mapped_variants[0].id}", + "@id": f"CA{alleles[0].id}", "type": "nucleotide", - "genomicAlleles": [{"hgvs": first_hgvs}], + "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(alleles[0].post_mapped)}], } ] @@ -353,31 +553,22 @@ async def test_submit_score_set_mappings_to_car_partial_failure( assert 
isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.SUCCEEDED - assert result.data["matched_count"] == 1 - assert result.data["failed_count"] == 3 + assert result.data["registered_allele_count"] == 1 + assert result.data["failed_allele_count"] == 0 - # Verify only the first variant got a CAID - variants_with_caid = session.scalars( - select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None)) - ).all() - assert len(variants_with_caid) == 1 - assert variants_with_caid[0].clingen_allele_id == f"CA{mapped_variants[0].id}" + # 1 allele got a CAID + alleles_with_caid = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles_with_caid) == 1 + assert alleles_with_caid[0].clingen_allele_id == f"CA{alleles[0].id}" - # Verify annotation statuses: 1 success, 3 failed + # All 4 variant annotations succeeded success_annotations = session.scalars( select(VariantAnnotationStatus).where( VariantAnnotationStatus.annotation_type == "clingen_allele_id", VariantAnnotationStatus.status == "success", ) ).all() - failed_annotations = session.scalars( - select(VariantAnnotationStatus).where( - VariantAnnotationStatus.annotation_type == "clingen_allele_id", - VariantAnnotationStatus.status == "failed", - ) - ).all() - assert len(success_annotations) == 1 - assert len(failed_annotations) == 3 + assert len(success_annotations) == 4 async def test_submit_score_set_mappings_to_car_hgvs_not_found( self, @@ -403,32 +594,11 @@ async def test_submit_score_set_mappings_to_car_hgvs_not_found( dummy_variant_mapping_job_run, ) - # Get the mapped variants from score set before submission - mapped_variants = session.scalars( - select(MappedVariant) - .join(Variant) - .where(Variant.score_set_id == submit_score_set_mappings_to_car_sample_job_run.job_params["score_set_id"]) - ).all() - - # Patch ClinGenAlleleRegistryService to return registered alleles - registered_alleles_mock = [ - { - "@id": f"CA{mv.id}", - "type": 
"nucleotide", - "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(mv.post_mapped)}], - } - for mv in mapped_variants - ] - + # Patch get_hgvs_from_post_mapped to return None for all alleles with ( - patch( - "mavedb.worker.jobs.external_services.clingen.ClinGenAlleleRegistryService.dispatch_submissions", - return_value=registered_alleles_mock, - ), - # Patch get_hgvs_from_post_mapped to not find any HGVS in registered alleles - patch("mavedb.worker.jobs.external_services.clingen.get_hgvs_from_post_mapped", return_value=None), patch("mavedb.worker.jobs.external_services.clingen.CAR_SUBMISSION_ENDPOINT", "http://fake-endpoint"), patch("mavedb.worker.jobs.external_services.clingen.CLIN_GEN_SUBMISSION_ENABLED", True), + patch("mavedb.worker.jobs.external_services.clingen.get_hgvs_from_post_mapped", return_value=None), ): result = await submit_score_set_mappings_to_car( mock_worker_ctx, @@ -439,11 +609,11 @@ async def test_submit_score_set_mappings_to_car_hgvs_not_found( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.FAILED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 - # Verify annotation statuses were rendered as failed + # Verify annotation statuses were rendered as failed — 4 variants, all failed annotation_statuses = session.scalars( select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() @@ -519,20 +689,16 @@ async def test_submit_score_set_mappings_to_car_success( dummy_variant_mapping_job_run, ) - # Get the mapped variants from score set before submission - mapped_variants = session.scalars( - select(MappedVariant).join(Variant).where(Variant.score_set_id == sample_score_set.id) 
- ).all() - assert len(mapped_variants) == 4 + # All 4 variants share 1 allele (same VRS digest from construct_mock_mapping_output) + alleles = session.scalars(select(Allele)).all() + assert len(alleles) == 1 - # Patch ClinGenAlleleRegistryService to return registered alleles registered_alleles_mock = [ { - "@id": f"CA{mv.id}", + "@id": f"CA{alleles[0].id}", "type": "nucleotide", - "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(mv.post_mapped)}], + "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(alleles[0].post_mapped)}], } - for mv in mapped_variants ] with ( @@ -552,13 +718,12 @@ async def test_submit_score_set_mappings_to_car_success( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.SUCCEEDED - # Verify variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 4 - for variant in variants: - assert variant.clingen_allele_id == f"CA{variant.id}" + # 1 allele received the CAID + alleles_with_caid = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles_with_caid) == 1 + assert alleles_with_caid[0].clingen_allele_id == f"CA{alleles[0].id}" - # Verify annotation statuses were rendered as success + # 4 per-variant annotations — all success annotation_statuses = session.scalars( select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() @@ -567,6 +732,192 @@ async def test_submit_score_set_mappings_to_car_success( assert ann.status == "success" assert ann.annotation_type == "clingen_allele_id" + async def test_submit_score_set_mappings_to_car_preexisting( + self, + mock_worker_ctx, + session, + with_submit_score_set_mappings_to_car_job, + submit_score_set_mappings_to_car_sample_job_run, + mock_s3_client, + sample_score_dataframe, + sample_count_dataframe, + with_dummy_setup_jobs, + dummy_variant_creation_job_run, + 
dummy_variant_mapping_job_run, + ): + """Already-registered allele is re-annotated as preexisting, not re-submitted.""" + await create_mappings_in_score_set( + session, + mock_s3_client, + mock_worker_ctx, + sample_score_dataframe, + sample_count_dataframe, + dummy_variant_creation_job_run, + dummy_variant_mapping_job_run, + ) + + # Pre-set the CAID on the allele to simulate prior registration + allele = session.scalars(select(Allele)).first() + allele.clingen_allele_id = "CA_PRIOR" + session.flush() + + with ( + patch( + "mavedb.worker.jobs.external_services.clingen.ClinGenAlleleRegistryService.dispatch_submissions", + ) as mock_dispatch, + patch("mavedb.worker.jobs.external_services.clingen.CAR_SUBMISSION_ENDPOINT", "http://fake-endpoint"), + patch("mavedb.worker.jobs.external_services.clingen.CLIN_GEN_SUBMISSION_ENABLED", True), + ): + result = await submit_score_set_mappings_to_car( + mock_worker_ctx, + submit_score_set_mappings_to_car_sample_job_run.id, + JobManager(session, mock_worker_ctx["redis"], submit_score_set_mappings_to_car_sample_job_run.id), + ) + + # CAR should NOT be called since the allele is already registered + mock_dispatch.assert_not_called() + assert result.status == JobStatus.SUCCEEDED + assert result.data["already_registered_allele_count"] == 1 + assert result.data["submitted_allele_count"] == 0 + + annotation_statuses = session.scalars( + select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") + ).all() + assert len(annotation_statuses) == 4 + for ann in annotation_statuses: + assert ann.status == "success" + assert ann.annotation_metadata["registration_source"] == "preexisting" + assert ann.annotation_metadata["clingen_allele_id"] == "CA_PRIOR" + + async def test_submit_score_set_mappings_to_car_force_reregister_same_caid( + self, + mock_worker_ctx, + session, + with_submit_score_set_mappings_to_car_job, + submit_score_set_mappings_to_car_sample_job_run, + mock_s3_client, + 
sample_score_dataframe, + sample_count_dataframe, + with_dummy_setup_jobs, + dummy_variant_creation_job_run, + dummy_variant_mapping_job_run, + ): + """Force re-registration that returns the same CAID is a success with registration_source=reconfirmed.""" + await create_mappings_in_score_set( + session, + mock_s3_client, + mock_worker_ctx, + sample_score_dataframe, + sample_count_dataframe, + dummy_variant_creation_job_run, + dummy_variant_mapping_job_run, + ) + + allele = session.scalars(select(Allele)).first() + allele.clingen_allele_id = "CA_CONFIRMED" + session.flush() + + submit_score_set_mappings_to_car_sample_job_run.job_params = { + **submit_score_set_mappings_to_car_sample_job_run.job_params, + "force_reregister": True, + } + session.flush() + + registered_alleles_mock = [{"@id": "CA_CONFIRMED", "type": "nucleotide", "genomicAlleles": []}] + + with ( + patch( + "mavedb.worker.jobs.external_services.clingen.ClinGenAlleleRegistryService.dispatch_submissions", + return_value=registered_alleles_mock, + ), + patch("mavedb.worker.jobs.external_services.clingen.CAR_SUBMISSION_ENDPOINT", "http://fake-endpoint"), + patch("mavedb.worker.jobs.external_services.clingen.CLIN_GEN_SUBMISSION_ENABLED", True), + ): + result = await submit_score_set_mappings_to_car( + mock_worker_ctx, + submit_score_set_mappings_to_car_sample_job_run.id, + JobManager(session, mock_worker_ctx["redis"], submit_score_set_mappings_to_car_sample_job_run.id), + ) + + assert result.status == JobStatus.SUCCEEDED + assert result.data["registered_allele_count"] == 1 + assert result.data["submitted_allele_count"] == 1 + + annotation_statuses = session.scalars( + select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") + ).all() + assert len(annotation_statuses) == 4 + for ann in annotation_statuses: + assert ann.status == "success" + assert ann.annotation_metadata["registration_source"] == "reconfirmed" + + async def 
test_submit_score_set_mappings_to_car_force_reregister_caid_conflict( + self, + mock_worker_ctx, + session, + with_submit_score_set_mappings_to_car_job, + submit_score_set_mappings_to_car_sample_job_run, + mock_s3_client, + sample_score_dataframe, + sample_count_dataframe, + with_dummy_setup_jobs, + dummy_variant_creation_job_run, + dummy_variant_mapping_job_run, + ): + """Force re-registration returning a different CAID fails without overwriting the stored CAID.""" + await create_mappings_in_score_set( + session, + mock_s3_client, + mock_worker_ctx, + sample_score_dataframe, + sample_count_dataframe, + dummy_variant_creation_job_run, + dummy_variant_mapping_job_run, + ) + + allele = session.scalars(select(Allele)).first() + allele.clingen_allele_id = "CA_STORED" + session.flush() + + submit_score_set_mappings_to_car_sample_job_run.job_params = { + **submit_score_set_mappings_to_car_sample_job_run.job_params, + "force_reregister": True, + } + session.flush() + + # CAR returns a DIFFERENT CAID — this is an invariant violation + registered_alleles_mock = [{"@id": "CA_DIFFERENT", "type": "nucleotide", "genomicAlleles": []}] + + with ( + patch( + "mavedb.worker.jobs.external_services.clingen.ClinGenAlleleRegistryService.dispatch_submissions", + return_value=registered_alleles_mock, + ), + patch("mavedb.worker.jobs.external_services.clingen.CAR_SUBMISSION_ENDPOINT", "http://fake-endpoint"), + patch("mavedb.worker.jobs.external_services.clingen.CLIN_GEN_SUBMISSION_ENABLED", True), + ): + result = await submit_score_set_mappings_to_car( + mock_worker_ctx, + submit_score_set_mappings_to_car_sample_job_run.id, + JobManager(session, mock_worker_ctx["redis"], submit_score_set_mappings_to_car_sample_job_run.id), + ) + + # Job fails because all CAID-returning submissions had a conflict + assert result.status == JobStatus.FAILED + + # CAID must NOT have been overwritten + session.refresh(allele) + assert allele.clingen_allele_id == "CA_STORED" + + annotation_statuses = 
session.scalars( + select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") + ).all() + assert len(annotation_statuses) == 4 + for ann in annotation_statuses: + assert ann.status == "failed" + assert ann.annotation_metadata["clingen_allele_id"] == "CA_STORED" + assert ann.annotation_metadata["conflicting_caid"] == "CA_DIFFERENT" + @pytest.mark.integration @pytest.mark.asyncio @@ -597,15 +948,14 @@ async def test_submit_score_set_mappings_to_car_independent_ctx( dummy_variant_mapping_job_run, ) - # Patch ClinGenAlleleRegistryService to return registered alleles - mapped_variants = session.scalars(select(MappedVariant)).all() + # All 4 variants share 1 allele; build 1 CAR response + alleles = session.scalars(select(Allele)).all() registered_alleles_mock = [ { - "@id": f"CA{mv.id}", + "@id": f"CA{alleles[0].id}", "type": "nucleotide", - "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(mv.post_mapped)}], + "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(alleles[0].post_mapped)}], } - for mv in mapped_variants ] with ( @@ -623,17 +973,16 @@ async def test_submit_score_set_mappings_to_car_independent_ctx( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.SUCCEEDED - # Verify variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == len(mapped_variants) - for variant in variants: - assert variant.clingen_allele_id == f"CA{variant.id}" + # 1 allele received the CAID + alleles_with_caid = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles_with_caid) == 1 + assert alleles_with_caid[0].clingen_allele_id == f"CA{alleles[0].id}" - # Verify annotation statuses were rendered as success + # 4 per-variant annotations — all success annotation_statuses = session.scalars( 
select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() - assert len(annotation_statuses) == len(mapped_variants) + assert len(annotation_statuses) == 4 for ann in annotation_statuses: assert ann.status == "success" @@ -666,15 +1015,14 @@ async def test_submit_score_set_mappings_to_car_pipeline_ctx( dummy_variant_mapping_job_run, ) - # Patch ClinGenAlleleRegistryService to return registered alleles - mapped_variants = session.scalars(select(MappedVariant)).all() + # All 4 variants share 1 allele; build 1 CAR response + alleles = session.scalars(select(Allele)).all() registered_alleles_mock = [ { - "@id": f"CA{mv.id}", + "@id": f"CA{alleles[0].id}", "type": "nucleotide", - "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(mv.post_mapped)}], + "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(alleles[0].post_mapped)}], } - for mv in mapped_variants ] with ( @@ -692,17 +1040,16 @@ async def test_submit_score_set_mappings_to_car_pipeline_ctx( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.SUCCEEDED - # Verify variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == len(mapped_variants) - for variant in variants: - assert variant.clingen_allele_id == f"CA{variant.id}" + # 1 allele received the CAID + alleles_with_caid = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles_with_caid) == 1 + assert alleles_with_caid[0].clingen_allele_id == f"CA{alleles[0].id}" - # Verify annotation statuses were rendered as success + # 4 per-variant annotations — all success annotation_statuses = session.scalars( select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() - assert len(annotation_statuses) == len(mapped_variants) + assert len(annotation_statuses) == 4 for ann in 
annotation_statuses: assert ann.status == "success" @@ -738,9 +1085,9 @@ async def test_submit_score_set_mappings_to_car_submission_disabled( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.SKIPPED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 # Verify no annotation statuses were created annotation_statuses = session.scalars(select(VariantAnnotationStatus)).all() @@ -777,9 +1124,9 @@ async def test_submit_score_set_mappings_to_car_no_submission_endpoint( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.FAILED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 # Verify no annotation statuses were created annotation_statuses = session.scalars(select(VariantAnnotationStatus)).all() @@ -808,9 +1155,9 @@ async def test_submit_score_set_mappings_to_car_no_mappings( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.SUCCEEDED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 # Verify no annotation statuses were created annotation_statuses = session.scalars(select(VariantAnnotationStatus)).all() @@ -862,11 +1209,11 
@@ async def test_submit_score_set_mappings_to_car_no_registered_alleles( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.FAILED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 - # Verify annotation statuses were rendered as failed + # Verify annotation statuses were rendered as failed — 4 variants, all failed annotation_statuses = session.scalars( select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() @@ -903,9 +1250,6 @@ async def test_submit_score_set_mappings_to_car_no_linked_alleles( # Patch ClinGenAlleleRegistryService to return only errors with no linked alleles registered_alleles_mock = [ {"errorType": "InvalidHGVS", "hgvs": "test"}, - {"errorType": "InvalidHGVS", "hgvs": "test2"}, - {"errorType": "InvalidHGVS", "hgvs": "test3"}, - {"errorType": "InvalidHGVS", "hgvs": "test4"}, ] with ( @@ -925,11 +1269,11 @@ async def test_submit_score_set_mappings_to_car_no_linked_alleles( assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.FAILED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 - # Verify annotation statuses were rendered as failed + # Verify annotation statuses were rendered as failed — 4 variants, all failed annotation_statuses = session.scalars( select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() @@ 
-952,7 +1296,7 @@ async def test_submit_score_set_mappings_to_car_partial_failure( dummy_variant_creation_job_run, dummy_variant_mapping_job_run, ): - """Test that partial CAR failures result in SUCCEEDED status with per-variant failure annotations committed.""" + """All 4 variants share 1 allele; CAR returns 1 success → 1 registered allele, 0 failed, job SUCCEEDED.""" # Create mappings in the score set await create_mappings_in_score_set( session, @@ -964,14 +1308,14 @@ async def test_submit_score_set_mappings_to_car_partial_failure( dummy_variant_mapping_job_run, ) - # Return a CAR response that only matches the first variant's HGVS - mapped_variants = session.scalars(select(MappedVariant)).all() - first_hgvs = get_hgvs_from_post_mapped(mapped_variants[0].post_mapped) + alleles = session.scalars(select(Allele)).all() + assert len(alleles) == 1 + registered_alleles_mock = [ { - "@id": f"CA{mapped_variants[0].id}", + "@id": f"CA{alleles[0].id}", "type": "nucleotide", - "genomicAlleles": [{"hgvs": first_hgvs}], + "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(alleles[0].post_mapped)}], } ] @@ -991,31 +1335,22 @@ async def test_submit_score_set_mappings_to_car_partial_failure( mock_send_slack_error.assert_not_called() assert isinstance(result, JobExecutionOutcome) assert result.status == JobStatus.SUCCEEDED - assert result.data["matched_count"] == 1 - assert result.data["failed_count"] == 3 + assert result.data["registered_allele_count"] == 1 + assert result.data["failed_allele_count"] == 0 - # Verify the successfully matched variant got a CAID - variants_with_caid = session.scalars( - select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None)) - ).all() - assert len(variants_with_caid) == 1 - assert variants_with_caid[0].clingen_allele_id == f"CA{mapped_variants[0].id}" + # 1 allele got a CAID + alleles_with_caid = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles_with_caid) == 1 + assert 
alleles_with_caid[0].clingen_allele_id == f"CA{alleles[0].id}" - # Verify annotation statuses: 1 success, 3 failed + # All 4 variant annotations succeeded success_annotations = session.scalars( select(VariantAnnotationStatus).where( VariantAnnotationStatus.annotation_type == "clingen_allele_id", VariantAnnotationStatus.status == "success", ) ).all() - failed_annotations = session.scalars( - select(VariantAnnotationStatus).where( - VariantAnnotationStatus.annotation_type == "clingen_allele_id", - VariantAnnotationStatus.status == "failed", - ) - ).all() - assert len(success_annotations) == 1 - assert len(failed_annotations) == 3 + assert len(success_annotations) == 4 # Verify the job status is updated in the database session.refresh(submit_score_set_mappings_to_car_sample_job_run) @@ -1046,22 +1381,17 @@ async def test_submit_score_set_mappings_to_car_car_error_details_stored_in_anno dummy_variant_mapping_job_run, ) - # Return a CAR response where: first variant succeeds, second has explicit CAR error, rest are silent failures - mapped_variants = session.scalars(select(MappedVariant)).all() - first_hgvs = get_hgvs_from_post_mapped(mapped_variants[0].post_mapped) - second_hgvs = get_hgvs_from_post_mapped(mapped_variants[1].post_mapped) + # All 4 variants share 1 allele. CAR returns an explicit error for that 1 HGVS. 
+ alleles = session.scalars(select(Allele)).all() + assert len(alleles) == 1 + allele_hgvs = get_hgvs_from_post_mapped(alleles[0].post_mapped) registered_alleles_mock = [ - { - "@id": f"CA{mapped_variants[0].id}", - "type": "nucleotide", - "genomicAlleles": [{"hgvs": first_hgvs}], - }, { "errorType": "InvalidHGVS", - "hgvs": second_hgvs, + "hgvs": allele_hgvs, "message": "The HGVS string is invalid.", "description": "error", - "inputLine": second_hgvs, + "inputLine": allele_hgvs, "position": "0", }, ] @@ -1079,31 +1409,18 @@ async def test_submit_score_set_mappings_to_car_car_error_details_stored_in_anno standalone_worker_context, submit_score_set_mappings_to_car_sample_job_run.id ) - # Verify the variant whose HGVS returned an explicit CAR error has error details in annotation_metadata. - # Only 1 annotation should have EXTERNAL_SERVICE_REJECTED since only one CAR error was in the response. + # All 4 variant annotations should have EXTERNAL_SERVICE_REJECTED since the 1 shared allele was rejected car_rejected_annotations = session.scalars( select(VariantAnnotationStatus).where( VariantAnnotationStatus.annotation_type == "clingen_allele_id", VariantAnnotationStatus.failure_category == "external_service_rejected", ) ).all() - assert len(car_rejected_annotations) == 1 - rejected = car_rejected_annotations[0] - assert rejected.annotation_metadata["submitted_hgvs"] == second_hgvs - assert rejected.annotation_metadata["car_error_type"] == "InvalidHGVS" - assert rejected.annotation_metadata["car_error_message"] == "The HGVS string is invalid." - - # The remaining 2 failures (variants 3 and 4) got no CAR response — silent failures get EXTERNAL_API_ERROR. 
- silent_failure_annotations = session.scalars( - select(VariantAnnotationStatus).where( - VariantAnnotationStatus.annotation_type == "clingen_allele_id", - VariantAnnotationStatus.failure_category == "external_api_error", - ) - ).all() - assert len(silent_failure_annotations) == 2 - for ann in silent_failure_annotations: - assert ann.annotation_metadata["submitted_hgvs"] is not None - assert "car_error_type" not in ann.annotation_metadata + assert len(car_rejected_annotations) == 4 + for rejected in car_rejected_annotations: + assert rejected.annotation_metadata["submitted_hgvs"] == allele_hgvs + assert rejected.annotation_metadata["car_error_type"] == "InvalidHGVS" + assert rejected.annotation_metadata["car_error_message"] == "The HGVS string is invalid." async def test_submit_score_set_mappings_to_car_propagates_exception_to_decorator( self, @@ -1185,15 +1502,14 @@ async def test_submit_score_set_mappings_to_car_with_arq_context_independent( dummy_variant_mapping_job_run, ) - # Patch ClinGenAlleleRegistryService to return registered alleles - mapped_variants = session.scalars(select(MappedVariant)).all() + # All 4 variants share 1 allele; build 1 CAR response + alleles = session.scalars(select(Allele)).all() registered_alleles_mock = [ { - "@id": f"CA{mv.id}", + "@id": f"CA{alleles[0].id}", "type": "nucleotide", - "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(mv.post_mapped)}], + "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(alleles[0].post_mapped)}], } - for mv in mapped_variants ] with ( @@ -1214,13 +1530,12 @@ async def test_submit_score_set_mappings_to_car_with_arq_context_independent( session.refresh(submit_score_set_mappings_to_car_sample_job_run) assert submit_score_set_mappings_to_car_sample_job_run.status == JobStatus.SUCCEEDED - # Verify variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == len(mapped_variants) - for variant in 
variants: - assert variant.clingen_allele_id == f"CA{variant.id}" + # 1 allele received the CAID + alleles_with_caid = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles_with_caid) == 1 + assert alleles_with_caid[0].clingen_allele_id == f"CA{alleles[0].id}" - # Verify annotation statuses were rendered as success + # 4 per-variant annotations — all success annotation_statuses = session.scalars( select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() @@ -1255,15 +1570,14 @@ async def test_submit_score_set_mappings_to_car_with_arq_context_pipeline( dummy_variant_mapping_job_run, ) - # Patch ClinGenAlleleRegistryService to return registered alleles - mapped_variants = session.scalars(select(MappedVariant)).all() + # All 4 variants share 1 allele; build 1 CAR response + alleles = session.scalars(select(Allele)).all() registered_alleles_mock = [ { - "@id": f"CA{mv.id}", + "@id": f"CA{alleles[0].id}", "type": "nucleotide", - "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(mv.post_mapped)}], + "genomicAlleles": [{"hgvs": get_hgvs_from_post_mapped(alleles[0].post_mapped)}], } - for mv in mapped_variants ] with ( @@ -1288,13 +1602,12 @@ async def test_submit_score_set_mappings_to_car_with_arq_context_pipeline( session.refresh(submit_score_set_mappings_to_car_sample_pipeline) assert submit_score_set_mappings_to_car_sample_pipeline.status == PipelineStatus.SUCCEEDED - # Verify variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == len(mapped_variants) - for variant in variants: - assert variant.clingen_allele_id == f"CA{variant.id}" + # 1 allele received the CAID + alleles_with_caid = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles_with_caid) == 1 + assert alleles_with_caid[0].clingen_allele_id == 
f"CA{alleles[0].id}" - # Verify annotation statuses were rendered as success + # 4 per-variant annotations — all success annotation_statuses = session.scalars( select(VariantAnnotationStatus).where(VariantAnnotationStatus.annotation_type == "clingen_allele_id") ).all() @@ -1350,9 +1663,9 @@ async def test_submit_score_set_mappings_to_car_with_arq_context_exception_handl assert submit_score_set_mappings_to_car_sample_job_run.status == JobStatus.ERRORED assert submit_score_set_mappings_to_car_sample_job_run.error_message == "ClinGen service error" - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 # Verify no annotation statuses were created annotation_statuses = session.scalars( @@ -1413,9 +1726,9 @@ async def test_submit_score_set_mappings_to_car_with_arq_context_exception_handl session.refresh(submit_score_set_mappings_to_car_sample_pipeline) assert submit_score_set_mappings_to_car_sample_pipeline.status == PipelineStatus.FAILED - # Verify no variants have CAIDs assigned - variants = session.scalars(select(MappedVariant).where(MappedVariant.clingen_allele_id.isnot(None))).all() - assert len(variants) == 0 + # Verify no alleles have CAIDs assigned + alleles = session.scalars(select(Allele).where(Allele.clingen_allele_id.isnot(None))).all() + assert len(alleles) == 0 # Verify no annotation statuses were created annotation_statuses = session.scalars( diff --git a/tests/worker/jobs/external_services/test_clingen_cache.py b/tests/worker/jobs/external_services/test_clingen_cache.py index a55eb6b8..6d95b917 100644 --- a/tests/worker/jobs/external_services/test_clingen_cache.py +++ b/tests/worker/jobs/external_services/test_clingen_cache.py @@ -4,11 +4,14 @@ pytest.importorskip("arq") 
+from datetime import date from unittest.mock import AsyncMock, patch from mavedb.lib.types.workflow import JobExecutionOutcome +from mavedb.models.allele import Allele from mavedb.models.enums.job_pipeline import JobStatus -from mavedb.models.mapped_variant import MappedVariant +from mavedb.models.mapping_record import MappingRecord +from mavedb.models.mapping_record_allele import MappingRecordAllele from mavedb.models.score_set import ScoreSet from mavedb.models.variant import Variant from mavedb.worker.jobs.external_services.clingen_cache import warm_clingen_cache @@ -17,6 +20,51 @@ pytestmark = pytest.mark.usefixtures("patch_db_session_ctxmgr") +def _make_allele_with_caid(session, score_set_id: int, urn_suffix: str, caid: str | None, vrs_digest: str) -> Allele: + """Create a Variant → MappingRecord → Allele chain and return the Allele. + + get_alleles_for_score_set joins: Allele ← MappingRecordAllele (current) ← MappingRecord (current) ← Variant. + Leaving valid_to=NULL on MappingRecord and MappingRecordAllele makes them current. 
+ """ + variant = Variant( + urn=f"urn:variant:{urn_suffix}", + score_set_id=score_set_id, + hgvs_nt="NM_000000.1:c.1A>G", + hgvs_pro="NP_000000.1:p.Met1Val", + data={}, + ) + session.add(variant) + session.flush() + + allele = Allele( + vrs_digest=vrs_digest, + level="genomic", + post_mapped={"type": "Allele", "location": {"sequenceReference": {"refgetAccession": vrs_digest}}}, + clingen_allele_id=caid, + ) + session.add(allele) + session.flush() + + mapping_record = MappingRecord( + variant_id=variant.id, + assay_level="genomic", + mapping_api_version="1.0.0", + mapped_date=date.today(), + ) + session.add(mapping_record) + session.flush() + + link = MappingRecordAllele( + mapping_record_id=mapping_record.id, + allele_id=allele.id, + is_authoritative=True, + ) + session.add(link) + session.flush() + + return allele + + @pytest.mark.unit @pytest.mark.asyncio class TestWarmClingenCacheUnit: @@ -49,26 +97,15 @@ async def test_warms_cache_for_variants_with_caids( """Job calls get_clingen_allele_data for each distinct allele ID.""" score_set = session.get(ScoreSet, sample_warm_clingen_cache_job_run.job_params["score_set_id"]) - # Create two variants with the same CAID — should only warm once (distinct) - for i, caid in enumerate(["CA111111", "CA222222", "CA111111"]): - variant = Variant( - urn=f"urn:variant:warm-test-{i}", - score_set_id=score_set.id, - hgvs_nt=f"NM_000000.1:c.{i + 1}A>G", - hgvs_pro=f"NP_000000.1:p.Met{i + 1}Val", - data={}, - ) - session.add(variant) - session.commit() - mapped_variant = MappedVariant( - variant_id=variant.id, - clingen_allele_id=caid, - current=True, - mapped_date="2024-01-01T00:00:00Z", - mapping_api_version="1.0.0", - ) - session.add(mapped_variant) - session.commit() + # Three variants, two sharing the same CAID — should only warm 2 distinct IDs. + # Two separate allele rows are needed (different VRS digests) to get 2 distinct CAIDs. 
+ _make_allele_with_caid(session, score_set.id, "warm-test-0", "CA111111", "digest-warm-0") + _make_allele_with_caid(session, score_set.id, "warm-test-1", "CA222222", "digest-warm-1") + # Third variant reuses CAID CA111111 on its own fresh allele row (distinct digest, + # "digest-warm-2"). get_alleles_for_score_set returns per-variant rows and CAID dedup + # happens in the warmer, so the repeated CAID should be warmed only once. + _make_allele_with_caid(session, score_set.id, "warm-test-2", "CA111111", "digest-warm-2") + session.commit() mock_get_allele_data = AsyncMock(return_value={"some": "data"}) @@ -100,24 +137,8 @@ async def test_skips_null_and_multi_variant_caids( caids = ["CA333333", None, "CA-MULTI-001,CA-MULTI-002"] for i, caid in enumerate(caids): - variant = Variant( - urn=f"urn:variant:warm-filter-{i}", - score_set_id=score_set.id, - hgvs_nt=f"NM_000000.1:c.{i + 10}A>G", - hgvs_pro=f"NP_000000.1:p.Met{i + 10}Val", - data={}, - ) - session.add(variant) - session.commit() - mapped_variant = MappedVariant( - variant_id=variant.id, - clingen_allele_id=caid, - current=True, - mapped_date="2024-01-01T00:00:00Z", - mapping_api_version="1.0.0", - ) - session.add(mapped_variant) - session.commit() + _make_allele_with_caid(session, score_set.id, f"warm-filter-{i}", caid, f"digest-filter-{i}") + session.commit() mock_get_allele_data = AsyncMock(return_value={"some": "data"}) @@ -147,24 +168,8 @@ async def test_continues_on_individual_fetch_failure( score_set = session.get(ScoreSet, sample_warm_clingen_cache_job_run.job_params["score_set_id"]) for i, caid in enumerate(["CA444444", "CA555555"]): - variant = Variant( - urn=f"urn:variant:warm-fail-{i}", - score_set_id=score_set.id, - hgvs_nt=f"NM_000000.1:c.{i + 20}A>G", - hgvs_pro=f"NP_000000.1:p.Met{i + 20}Val", - data={}, - ) - session.add(variant) - session.commit() - mapped_variant = MappedVariant( - variant_id=variant.id, - clingen_allele_id=caid, - current=True, - mapped_date="2024-01-01T00:00:00Z", -
mapping_api_version="1.0.0", - ) - session.add(mapped_variant) - session.commit() + _make_allele_with_caid(session, score_set.id, f"warm-fail-{i}", caid, f"digest-fail-{i}") + session.commit() # First call raises, second succeeds mock_get_allele_data = AsyncMock( @@ -192,7 +197,7 @@ async def test_only_warms_current_mapped_variants( with_warm_clingen_cache_job, sample_warm_clingen_cache_job_run, ): - """Job only fetches allele IDs from current (not superseded) mapped variants.""" + """Job only fetches allele IDs from current (not superseded) mapping records.""" score_set = session.get(ScoreSet, sample_warm_clingen_cache_job_run.job_params["score_set_id"]) variant = Variant( @@ -203,25 +208,65 @@ async def test_only_warms_current_mapped_variants( data={}, ) session.add(variant) - session.commit() + session.flush() - # Non-current mapped variant should be ignored - old_mv = MappedVariant( - variant_id=variant.id, + # Superseded allele (MappingRecord with valid_to set — not current) + old_allele = Allele( + vrs_digest="digest-old-mv", + level="genomic", + post_mapped={"type": "Allele"}, clingen_allele_id="CA666666", - current=False, - mapped_date="2023-01-01T00:00:00Z", - mapping_api_version="0.9.0", ) - # Current mapped variant should be included - current_mv = MappedVariant( + session.add(old_allele) + session.flush() + + from datetime import datetime, timezone + + old_mapping_record = MappingRecord( variant_id=variant.id, + assay_level="genomic", + mapping_api_version="0.9.0", + mapped_date=date(2023, 1, 1), + ) + # Close this record so it is non-current (valid_to set) + old_mapping_record.valid_to = datetime(2024, 1, 1, tzinfo=timezone.utc) + session.add(old_mapping_record) + session.flush() + + old_link = MappingRecordAllele( + mapping_record_id=old_mapping_record.id, + allele_id=old_allele.id, + is_authoritative=True, + ) + old_link.valid_to = datetime(2024, 1, 1, tzinfo=timezone.utc) + session.add(old_link) + session.flush() + + # Current allele (MappingRecord 
with valid_to=NULL — current) + current_allele = Allele( + vrs_digest="digest-current-mv", + level="genomic", + post_mapped={"type": "Allele"}, clingen_allele_id="CA777777", - current=True, - mapped_date="2024-01-01T00:00:00Z", + ) + session.add(current_allele) + session.flush() + + current_mapping_record = MappingRecord( + variant_id=variant.id, + assay_level="genomic", mapping_api_version="1.0.0", + mapped_date=date.today(), + ) + session.add(current_mapping_record) + session.flush() + + current_link = MappingRecordAllele( + mapping_record_id=current_mapping_record.id, + allele_id=current_allele.id, + is_authoritative=True, ) - session.add_all([old_mv, current_mv]) + session.add(current_link) session.commit() mock_get_allele_data = AsyncMock(return_value={"some": "data"})