diff --git a/src/dcd_mapping/annotate.py b/src/dcd_mapping/annotate.py index 9982eb4..6a39a9f 100644 --- a/src/dcd_mapping/annotate.py +++ b/src/dcd_mapping/annotate.py @@ -44,6 +44,7 @@ GeneInfo, MappedReferenceSequence, MappedScore, + MappingOutcome, ScoreAnnotation, ScoresetMapping, ScoresetMetadata, @@ -520,7 +521,11 @@ def _iter_genomic_spans_from_mapped_scores( ) if refget_chrom: spans.append( - (get_ucsc_chromosome_name(refget_chrom), loc.start, loc.end) + ( + get_ucsc_chromosome_name(refget_chrom), + loc.start, + loc.end, + ) ) elif isinstance(ms.post_mapped, Haplotype): @@ -531,7 +536,11 @@ def _iter_genomic_spans_from_mapped_scores( ) if refget_chrom: spans.append( - (get_ucsc_chromosome_name(refget_chrom), loc.start, loc.end) + ( + get_ucsc_chromosome_name(refget_chrom), + loc.start, + loc.end, + ) ) return spans @@ -741,6 +750,52 @@ def _get_hgvs_string(allele: Allele, accession: str) -> tuple[str, Syntax]: return var, syntax +def _resolve_outcome(mapped_score: MappedScore) -> MappingOutcome: + """Resolve the typed :class:`MappingOutcome` for an emitted annotation. + + Projected records arrive with an explicit outcome (``MAPPED`` / ``INTRONIC`` / + ``NO_PROTEIN_CONSEQUENCE`` / ``FAILED``) -- keep it. Measured records (and any legacy + record) carry none, so derive it so *every* emitted annotation is typed uniformly: + ``MAPPED`` when a post-mapped allele was produced, else ``FAILED``. + """ + if mapped_score.outcome is not None: + return mapped_score.outcome + + return ( + MappingOutcome.MAPPED + if mapped_score.post_mapped is not None + else MappingOutcome.FAILED + ) + + +def _resolve_postmapped_accession( + alignment_level: AnnotationLayer | None, + representative_allele: Allele, + metadata: TargetGene, + tx_results: TxSelectResult | TxSelectError | None, +) -> str | None: + """Resolve the accession to reconstruct a post-mapped HGVS against: the contig for + genomic, the transcript's protein for protein or coding for cdna. + ``representative_allele`` is any member for a haplotype. ``None`` if unresolvable + (the caller surfaces that as an annotation error). + """ + if alignment_level == AnnotationLayer.GENOMIC: + sequence_id = ( + f"ga4gh:{representative_allele.location.sequenceReference.refgetAccession}" + ) + accession = get_chromosome_identifier_from_vrs_id(sequence_id) + if accession is not None and accession.startswith("refseq:"): + return accession[len("refseq:") :] + return accession + if alignment_level == AnnotationLayer.CDNA: + return _resolve_cdna_accession(metadata, tx_results, None) + + if tx_results is None or isinstance(tx_results, TxSelectError): + return None + + return tx_results.np + + def _annotate_allele_mapping( mapped_score: MappedScore, tx_results: TxSelectResult | TxSelectError | None, @@ -764,22 +819,6 @@ def _annotate_allele_mapping( pre_mapped.extensions = [ref_allele_seq_extension] if post_mapped: - # Determine reference sequence - if mapped_score.alignment_level == AnnotationLayer.GENOMIC: - sequence_id = f"ga4gh:{mapped_score.post_mapped.location.sequenceReference.refgetAccession}" - accession = get_chromosome_identifier_from_vrs_id(sequence_id) - if accession is None: - accession = None - mapped_score.error_message = "Could not determine accession for this annotation. No allele expression is available." - elif accession.startswith("refseq:"): - accession = accession[7:] - else: - if tx_results is None or isinstance(tx_results, TxSelectError): - accession = None - mapped_score.error_message = "Could not determine accession for this annotation. No allele expression is available." - else: - accession = tx_results.np - sr = get_seqrepo() loc = mapped_score.post_mapped.location sequence_id = f"ga4gh:{loc.sequenceReference.refgetAccession}" @@ -791,9 +830,16 @@ def _annotate_allele_mapping( Extension(type="Extension", name="vrs_ref_allele_seq", value=ref) ] - if accession: - hgvs_string, syntax = _get_hgvs_string(post_mapped, accession) - post_mapped.expressions = [Expression(syntax=syntax, value=hgvs_string)] + # Trust a carried HGVS (coding records carry their c.); else reconstruct (g./p. only). + if not post_mapped.expressions: + accession = _resolve_postmapped_accession( + mapped_score.alignment_level, post_mapped, metadata, tx_results + ) + if accession is None: + mapped_score.error_message = "Could not determine accession for this annotation. No allele expression is available." + else: + hgvs_string, syntax = _get_hgvs_string(post_mapped, accession) + post_mapped.expressions = [Expression(syntax=syntax, value=hgvs_string)] if vrs_version == VrsVersion.V_1_3: pre_mapped = _allele_to_vod(pre_mapped) @@ -807,6 +853,7 @@ def _annotate_allele_mapping( score=float(mapped_score.score) if mapped_score.score is not None else None, error_message=mapped_score.error_message, alignment_level=mapped_score.alignment_level, + outcome=_resolve_outcome(mapped_score), ) @@ -831,22 +878,10 @@ def _annotate_haplotype_mapping( allele.extensions = [ref_allele_seq_extension] if post_mapped: - # Determine reference sequence - if mapped_score.alignment_level == AnnotationLayer.GENOMIC: - sequence_id = f"ga4gh:{post_mapped.members[0].location.sequenceReference.refgetAccession}" - accession = get_chromosome_identifier_from_vrs_id(sequence_id) - if accession is None: - accession = None - mapped_score.error_message = "Could not determine accession for this annotation. No allele expression is available." - elif accession.startswith("refseq:"): - accession = accession[7:] - else: - if tx_results is None or isinstance(tx_results, TxSelectError): - # impossible by definition - accession = None - mapped_score.error_message = "Could not determine accession for this annotation. No allele expression is available." - else: - accession = tx_results.np + # Members share one reference; resolve the reconstruction accession once. + accession = _resolve_postmapped_accession( + mapped_score.alignment_level, post_mapped.members[0], metadata, tx_results + ) sr = get_seqrepo() for allele in post_mapped.members: @@ -862,7 +897,11 @@ def _annotate_haplotype_mapping( Extension(type="Extension", name="vrs_ref_allele_seq", value=ref) ] - if accession: + if allele.expressions: + continue + if accession is None: + mapped_score.error_message = "Could not determine accession for this annotation. No allele expression is available." + else: hgvs, syntax = _get_hgvs_string(allele, accession) allele.expressions = [Expression(syntax=syntax, value=hgvs)] @@ -878,6 +917,7 @@ def _annotate_haplotype_mapping( score=float(mapped_score.score) if mapped_score.score is not None else None, error_message=mapped_score.error_message, alignment_level=mapped_score.alignment_level, + outcome=_resolve_outcome(mapped_score), ) @@ -917,6 +957,7 @@ def annotate( vrs_version=vrs_version, error_message=mapped_score.error_message, alignment_level=mapped_score.alignment_level, + outcome=_resolve_outcome(mapped_score), ) ) elif isinstance(mapped_score.pre_mapped, Haplotype) and ( @@ -949,6 +990,7 @@ def annotate( else None, error_message=f"Multiple issues with annotation: Inconsistent variant structure (Allele and Haplotype mix).{' ' + mapped_score.error_message if mapped_score.error_message else ''}", alignment_level=mapped_score.alignment_level, + outcome=_resolve_outcome(mapped_score), ) ) @@ -1073,18 +1115,47 @@ def _align_result_for_target( return result -def _pick_preferred_genomic_or_protein_layer( +def _pick_preferred_layer( + target_meta: TargetGene, mappings: list[ScoreAnnotation], ) -> AnnotationLayer: - """Return GENOMIC if any annotation has that layer, else PROTEIN. - - Precondition: ``mappings`` must contain at least one annotation with - ``alignment_level`` in {GENOMIC, PROTEIN}. CDNA-only score sets are not - currently supported (``vrs_map`` does not emit CDNA-layer scores); this - function will silently return PROTEIN for them, which would then drop - every CDNA score from the output. Update this function if CDNA scores - become emittable. + """Return the preferred (assay) layer: the least-derived standard frame the target's + input lets us assert. + + A variant is most faithfully represented in the coordinate system its assay natively + *described* it in. Every other layer is a projection -- a transform that can lose or + distort information -- so the assay layer is the one record we did not have to derive, + and the one carried as preferred (``preferred_layer_only=True`` keeps exactly this + layer and suppresses the projected forms). Two qualifiers sharpen "least-derived": + + * *standard frame* -- a target's bespoke sequence coordinates are not a shareable + reference, so they never count; we express against g./c./p. references only. + * *we can assert* -- the frame must actually be reachable for this target; an + unreachable native frame falls through to the next-best reachable one. + + Accession-based targets name their frame in the accession, and it is always assertable + (the accession *is* the reference), so selection is deterministic: + + * ``NP_``/``ENSP`` -> PROTEIN (variants described as protein consequences) + * ``NM_``/``ENST`` -> CDNA (variants already coding on the transcript) + * ``NC_`` -> GENOMIC (variants described against the contig) + + Sequence-based targets have no standard native frame, so we fall to the nearest one + reachable by alignment: genomic -- the universal anchor -- when the genome alignment + succeeded (the common case, hence the reachability check against ``mappings``), else + protein (e.g. a construct that does not place on the genome but aligns to a reference + protein). This is why ``mappings`` is consulted only here, never for accessions. """ + accession = target_meta.target_accession_id + if accession is not None: + if accession.startswith(("NP", "ENSP")): + return AnnotationLayer.PROTEIN + if accession.startswith(("NM", "ENST")): + return AnnotationLayer.CDNA + if accession.startswith("NC"): + return AnnotationLayer.GENOMIC + + # Sequence-based: prefer the genomic anchor when it was reachable, else protein. for mapping in mappings: if mapping.alignment_level == AnnotationLayer.GENOMIC: return AnnotationLayer.GENOMIC @@ -1217,11 +1288,10 @@ def _build_target_mapping( ``align_result.alignment_qc`` for GENOMIC rows or ``protein_align_result.alignment_qc`` for PROTEIN rows. - Note: CDNA ``TargetMapping`` rows are **not** emitted today — no - ``mapped_score`` carries ``alignment_level=CDNA`` (``vrs_map`` only - produces GENOMIC and PROTEIN scores). The CDNA entry added to - ``reference_sequences`` in ``build_scoreset_mapping`` is purely a - reference-sequence accession record, not a scored layer. + CDNA rows are scored here for cdna-source (``NM_``/``ENST``) targets and, in + all-layers mode, for projected cdna. They carry no alignment QC -- there is no + cdna-frame alignment, so ``qc_source`` stays ``None``. Reference-only cdna/protein + layers (no variants) come from ``_build_identity_target_mapping`` instead. """ reference_accession = _reference_accession_for_target_level( alignment_level, target_meta, tx_result, align_result @@ -1237,9 +1307,7 @@ def _build_target_mapping( # Pick the alignment that lives in this row's coordinate frame: # GENOMIC -> BLAT genomic alignment; PROTEIN -> target-protein-to-reference - # alignment from vrs_map. - # No CDNA branch: vrs_map never emits CDNA-layer scores, so this function - # is never called with alignment_level=CDNA in practice. + # alignment from vrs_map. CDNA has no own-frame alignment, so qc_source stays None. qc_source: AlignmentResult | None = None if alignment_level == AnnotationLayer.GENOMIC: qc_source = align_result @@ -1351,6 +1419,46 @@ def _build_target_mapping( ) +def _build_identity_target_mapping( + target_gene_identifier: str, + alignment_level: AnnotationLayer, + reference_accession: str, + vrs_version: VrsVersion, +) -> TargetMapping: + """Assemble an identity ``target_mappings[]`` row for a coding layer that + produced no per-variant mappings this run. + + A coding target's coding transcript (and protein reference) is known/selected + by the mapper even when it does not emit per-variant cdna/protein mappings -- + the genomic-accession (``NC_``) path projects onto a gene-selected MANE + transcript, and sequence-based / ``NM_`` targets carry the transcript trivially. + This row surfaces that reference identity as target metadata so the API/RT can + resolve the projection transcript without re-deriving it. It carries **null QC + and null counts**: no ``mapped_score`` joins it (the + ``mapped_score -> TargetGeneMapping`` join guarantee applies only to scored + layers), so there is nothing to tally. + """ + reference_sequence_id: str | None = None + try: + reference_sequence_id = get_vrs_id_from_identifier(reference_accession) + except Exception: + _logger.exception( + "Could not resolve VRS sequence id for %s", reference_accession + ) + + return TargetMapping( + target_gene_identifier=target_gene_identifier, + alignment_level=alignment_level, + preferred=False, + tool_name="dcd-mapping", + tool_version=dcd_mapping_version, + tool_parameters={"aligner": "transcript_identity"}, + reference_accession=reference_accession, + reference_sequence_id=reference_sequence_id, + vrs_version=vrs_version, + ) + + def build_scoreset_mapping( metadata: ScoresetMetadata, raw_metadata: dict, @@ -1413,12 +1521,8 @@ def build_scoreset_mapping( near_gap_window, ) - # preferred_layer_for_target is the single "best" layer for this target - # (GENOMIC when available, else PROTEIN). It drives both which - # TargetMapping row gets preferred=True and where completely-failed - # variants (annotation_layer=None) are attributed. - preferred_layer_for_target = _pick_preferred_genomic_or_protein_layer( - mappings[target_gene] + preferred_layer_for_target = _pick_preferred_layer( + metadata.target_genes[target_gene], mappings[target_gene] ) if preferred_layer_only: preferred_layers = {preferred_layer_for_target} @@ -1434,6 +1538,13 @@ def build_scoreset_mapping( # before constructing TargetAnnotation (which requires valid AnnotationLayer keys). preferred_layers.discard(None) + _logger.info( + "For target %s, preferred layer is %s and layers seen are %s", + target_gene_name, + preferred_layer_for_target, + preferred_layers, + ) + reference_sequences[target_gene_name] = TargetAnnotation( gene_info=gene_info.get(target_gene), layers={ @@ -1459,12 +1570,16 @@ def build_scoreset_mapping( genomic_align_for_target, ) - # if genomic layer, not accession-based, and target gene type is coding, add cdna entry (just the sequence accession) to reference_sequences dict + # If genomic layer and coding target, add a cdna entry (just the sequence + # accession) to the reference_sequences dict. Covers both sequence-based + # targets and genomic-accession (NC_) coding targets -- both carry the + # selected coding transcript on tx_output[...].nm. NM_/ENST and NP_ accession + # targets do not qualify (no TxSelectResult with nm), so the tx checks below + # are the gate rather than an explicit accession-prefix test. if ( AnnotationLayer.GENOMIC in reference_sequences[target_gene_name].layers and metadata.target_genes[target_gene].target_gene_category == TargetType.PROTEIN_CODING - and metadata.target_genes[target_gene].target_accession_id is None and tx_output[target_gene] is not None and isinstance(tx_output[target_gene], TxSelectResult) and tx_output[target_gene].nm is not None @@ -1476,62 +1591,75 @@ def build_scoreset_mapping( }, } - for m in mappings[target_gene]: - if m.alignment_level is None and m.pre_mapped is None: - # Completely-failed variant — vrs_map could not determine a layer. - # Re-attribute to the preferred layer so every mapped_score has a - # parent TargetMapping row (the API joins on alignment_level). - score_dict = m.model_dump() - score_dict["alignment_level"] = preferred_layer_for_target - score_dict["target_gene_identifier"] = target_gene_name - mapped_scores.append(ScoreAnnotation(**score_dict)) - elif m.alignment_level in preferred_layers: - score_dict = m.model_dump() - score_dict["target_gene_identifier"] = target_gene_name - mapped_scores.append(ScoreAnnotation(**score_dict)) - - # Provenance/QC: emit one TargetMapping per (target, alignment_level) that - # actually produced variants in this run. The (target_gene_identifier, - # alignment_level) pair must be unique per run; the API uses it to attribute - # each mapped_variant to its source row via target_gene_mapping_id. - # - # Completely-failed variants (annotation_layer=None) were re-attributed to - # preferred_layer_for_target in mapped_scores above; include them in that - # layer's TargetMapping count so total_variants is accurate. - null_failures = [ - m - for m in mappings[target_gene] - if m.alignment_level is None and m.pre_mapped is None - ] - align_result_for_target = genomic_align_for_target - emitted_mappings = [ + # Every input variant must yield exactly one record at the preferred layer. + # Records already at a preferred layer are emitted directly. A variant with none + # gets a single re-attributed failure there: its own null-layer failure if it has + # one, else a synthesized failure (it mapped only at non-preferred layers -- e.g. a + # wild-type p.= on a genomic-preferred target). A variant already represented is + # never also re-attributed, so a dead attempt can't duplicate its real record. + preferred_mappings = [ m for m in mappings[target_gene] if m.alignment_level in preferred_layers ] + represented_ids = {m.mavedb_id for m in preferred_mappings} - # Group by alignment_level. - layers_seen: dict = {} - for m in emitted_mappings: - layers_seen.setdefault(m.alignment_level, []).append(m) + records_by_id: dict[str, list[ScoreAnnotation]] = {} + for m in mappings[target_gene]: + records_by_id.setdefault(m.mavedb_id, []).append(m) - for layer, layer_annotations in layers_seen.items(): - if layer is None: + reattributed: list[ScoreAnnotation] = [] + for variant_id, recs in records_by_id.items(): + if variant_id in represented_ids: continue - - # Defensive: coerce raw string values (e.g. "g") to AnnotationLayer. - if not isinstance(layer, AnnotationLayer): - try: - layer = AnnotationLayer(layer) - except ValueError: - _logger.warning( - "Skipping target_mappings row for unknown annotation layer %r", - layer, + null_failure = next( + (m for m in recs if m.alignment_level is None and m.pre_mapped is None), + None, + ) + if null_failure is not None: + score_dict = null_failure.model_dump() + else: + # Mapped only at non-preferred layers; synthesize a failure there. + mapped_layers = sorted( + { + m.alignment_level.value + for m in recs + if m.alignment_level is not None + } + ) + score_dict = recs[0].model_dump() + score_dict["pre_mapped"] = None + score_dict["post_mapped"] = None + score_dict["outcome"] = MappingOutcome.FAILED + score_dict["error_message"] = ( + f"No representation at preferred layer {preferred_layer_for_target.value}" + + ( + f"; mapped only at: {', '.join(mapped_layers)}" + if mapped_layers + else "" ) - continue + ) + score_dict["alignment_level"] = preferred_layer_for_target + score_dict["target_gene_identifier"] = target_gene_name + reattributed.append(ScoreAnnotation(**score_dict)) + + for m in preferred_mappings: + score_dict = m.model_dump() + score_dict["target_gene_identifier"] = target_gene_name + mapped_scores.append(ScoreAnnotation(**score_dict)) + mapped_scores.extend(reattributed) + + # One TargetMapping per preferred layer that produced records. The + # (target_gene_identifier, alignment_level) key must be unique per run and + # cover every mapped_score, so the preferred layer also gets a row whenever + # failures were re-attributed to it. Re-attributed failures count toward its totals. + layers_seen: dict[AnnotationLayer, list[ScoreAnnotation]] = {} + for m in preferred_mappings: + layers_seen.setdefault(m.alignment_level, []).append(m) + if reattributed: + layers_seen.setdefault(preferred_layer_for_target, []) - # Null-layer failures are attributed to the preferred layer; fold them - # into that layer's annotation list so variant counts are correct. + for layer, layer_annotations in layers_seen.items(): annotations_for_tm = ( - list(layer_annotations) + null_failures + layer_annotations + reattributed if layer == preferred_layer_for_target else layer_annotations ) @@ -1542,7 +1670,7 @@ def build_scoreset_mapping( alignment_level=layer, preferred=(layer == preferred_layer_for_target), tx_result=tx_output.get(target_gene), - align_result=align_result_for_target, + align_result=genomic_align_for_target, vrs_version=vrs_version, annotations=annotations_for_tm, protein_align_result=protein_align_for_target, @@ -1550,6 +1678,39 @@ def build_scoreset_mapping( ) ) + # Identity target_mappings for coding layers that produced no per-variant + # mappings this run. A coding target's coding transcript (and protein + # reference) may be known to the mapper even when it emits no per-variant + # cdna/protein mappings: the genomic-accession (NC_) path projects onto a + # gene-selected MANE transcript; sequence-based and NM_/ENST targets carry + # it trivially. Surfacing it as a cdna (and protein) identity row lets consumers + # prefer the mapper-selected transcript over the NP_->NM_ fallback. + # Decoupled from layers_seen on purpose -- the cdna/protein per-variant + # forms are filtered, so those layers never appear in layers_seen. + target_meta = metadata.target_genes[target_gene] + if target_meta.target_gene_category == TargetType.PROTEIN_CODING: + scored_levels = {m.alignment_level for m in preferred_mappings} + scored_levels.discard(None) + for identity_level in (AnnotationLayer.CDNA, AnnotationLayer.PROTEIN): + if identity_level in scored_levels: + continue + identity_accession = _reference_accession_for_target_level( + identity_level, + target_meta, + tx_output.get(target_gene), + genomic_align_for_target, + ) + if identity_accession is None: + continue + target_mappings.append( + _build_identity_target_mapping( + target_gene_identifier=target_gene_name, + alignment_level=identity_level, + reference_accession=identity_accession, + vrs_version=vrs_version, + ) + ) + # Drop layers where both reference sequence entries are None, and any None-keyed # layers. Moved outside the per-target loop to avoid O(n²) scans and eliminate # the variable-shadowing hazard (the inner loop previously reused `target_gene`). diff --git a/src/dcd_mapping/exceptions.py b/src/dcd_mapping/exceptions.py index a2817c9..a70860c 100644 --- a/src/dcd_mapping/exceptions.py +++ b/src/dcd_mapping/exceptions.py @@ -57,3 +57,16 @@ class ResourceAcquisitionError(ValueError): class TxSelectError(ValueError): """Raise for transcript selection failure.""" + + +class NoCodingTranscriptError(TxSelectError): + """Raise when a protein-coding target has no resolvable coding transcript. + + Distinct from the regulatory/non-coding case (which returns ``None`` from + transcript selection because no coding transcript is expected): this signals + a coding target for which selection *should* have produced a transcript but + could not -- no resolvable gene symbol, no MANE/compatible transcript for the + gene, or a projection that does not land cleanly on the selected transcript. + Downstream records this as a recoverable skip, distinct from "no protein + consequence exists." + """ diff --git a/src/dcd_mapping/lookup.py b/src/dcd_mapping/lookup.py index bcffc8b..ca8c0d1 100644 --- a/src/dcd_mapping/lookup.py +++ b/src/dcd_mapping/lookup.py @@ -485,6 +485,118 @@ def get_gene_location(target_gene: TargetGene) -> GeneLocation | None: return None +_MAX_LOCUS_INFERENCE_SPAN = 5_000_000 +""" +The maximum span between the most distant variant loci of a target for which we will attempt +to infer the gene via Ensembl overlap. If the span is wider than this, we will skip locus +inference and fall back to the declared gene metadata. This threshold is set based on the +fact that even the largest human genes are well under 5 Mb in length, so a wider span likely +indicates an anomalous target (e.g. a multi-gene construct or stray positions) for which +locus inference would be unreliable and an oversized request to Ensembl would be doomed to fail. +""" + + +def infer_hgnc_symbol_from_genomic_loci( + accession: str, positions: list[int] +) -> str | None: + """Resolve the gene at a genomic accession's variant loci via Ensembl overlap. + + For a genomic-accession (``NC_``) target the variants' chromosomal positions are + known before mapping, so the "mapper-resolved" gene can be found by asking which + gene overlaps those loci -- the same Ensembl overlap source the post-mapping + gene-info cascade uses, but driven from the raw positions rather than mapped VRS + spans so it is available in time for transcript selection. The gene that selection + stamps onto ``TxSelectResult.hgnc_symbol`` is then reused by + ``annotate.compute_target_gene_info`` (its first-priority path), so Ensembl is + queried once, not twice. + + Issues a **single** Ensembl overlap request over the bounding span of all loci + (the variants of a coding target sit within one gene's span), then disambiguates + locally using the gene coordinates the response carries -- counting how many of the + query loci fall inside each returned gene -- so the cost is one request per target, + not one per variant. Only ``protein_coding`` genes are considered (a coding/MANE + transcript exists for no others). Returns the coding gene containing the most loci; + ``None`` on a tie, no protein-coding overlap, or a bounding span too wide to be a + single gene (see ``_MAX_LOCUS_INFERENCE_SPAN``). + + :param accession: genomic accession the variants are described against (``NC_``) + :param positions: 0-based genomic positions of the variants + :return: HGNC gene symbol if one gene clearly dominates, else ``None`` + """ + if not positions: + return None + + unique_positions = sorted(set(positions)) + span = unique_positions[-1] - unique_positions[0] + if span >= _MAX_LOCUS_INFERENCE_SPAN: + _logger.warning( + "Variant loci of %s span %d bp (>= %d), too wide for single-gene locus " + "inference; falling back to declared gene metadata.", + accession, + span, + _MAX_LOCUS_INFERENCE_SPAN, + ) + return None + + try: + features = get_overlapping_features_for_region( + accession, unique_positions[0], unique_positions[-1] + 1, features=["gene"] + ) + except Exception: + _logger.exception( + "Overlap query failed for %s over %d-%d", + accession, + unique_positions[0], + unique_positions[-1], + ) + return None + + # Disambiguate locally from the returned gene coordinates -- no extra requests. + # Restrict to protein-coding genes: a coding transcript (MANE) only exists for one, + # so a non-coding gene (lncRNA, pseudogene, etc.) overlapping the same locus would + # just fail the downstream MANE lookup. Filtering here also resolves the common tie + # between a coding gene and an overlapping non-coding one. + counts: dict[str, int] = {} + for feature in features: + symbol = feature.get("external_name") + start = feature.get("start") + end = feature.get("end") + biotype = feature.get("biotype") + if not symbol or start is None or end is None or biotype != "protein_coding": + continue + + contained = sum(1 for p in unique_positions if start <= p <= end) + if contained: + counts[symbol] = contained + + if not counts: + _logger.info( + "No protein-coding gene overlaps the variant loci of %s; cannot infer gene.", + accession, + ) + return None + + max_count = max(counts.values()) + candidates = sorted(g for g, c in counts.items() if c == max_count) + if len(candidates) > 1: + _logger.warning( + "Multiple candidate genes overlap the variant loci of %s: %s. " + "No gene inferred from loci.", + accession, + candidates, + ) + return None + + # Ensembl's ``external_name`` is already the gene's HGNC symbol, which is what the + # MANE-by-gene lookup matches on. Canonicalize through the gene normalizer when it + # resolves, but fall back to the Ensembl symbol rather than dropping the gene if the + # normalizer returns nothing -- losing a locus-confirmed gene to a normalizer miss + # would force an avoidable "no coding transcript" skip. + inferred = _get_hgnc_symbol(candidates[0]) or candidates[0] + _logger.info("Inferred gene %s from variant loci of %s.", inferred, accession) + return inferred + + # --------------------------------- SeqRepo --------------------------------- # @@ -704,9 +816,154 @@ def translate_ref_identical_to_vrs(hgvs_string: str) -> Allele: return build_ref_identical_allele(accession) +# ------------------------------ HGVS Projection ------------------------------ # + + +def project_genomic_hgvs_to_coding(g_hgvs: str, transcript: str) -> str: + """Project a genomic HGVS expression onto a coding transcript (``g.`` -> ``c.``). + + Uses the cdot-backed hgvs ``VariantMapper`` already wired up for this package + (see :func:`init_hgvs_tools`). This is the mapper's *projection* of a measured + variant onto its own coding form -- distinct from the equivalence-class + expansion (all synonymous codons) that the reverse-translation job owns. + + :param g_hgvs: genomic HGVS string, e.g. ``NC_000001.11:g.123A>G`` + :param transcript: coding transcript accession to project onto, e.g. ``NM_...`` + :return: coding HGVS string (``NM_...:c....``) + """ + tools = TranslatorBuilder(get_seqrepo()).hgvs_tools + var_g = tools.parser.parse(g_hgvs) + var_c = tools.variant_mapper.g_to_c(var_g, transcript) + return str(var_c) + + +def coding_hgvs_is_intronic(c_hgvs: str) -> bool: + """Return whether a coding HGVS expression refers to an intronic position. + + Detected from the parsed base-offset position (a non-zero intron offset), not the + string: a textual ``-`` would misclassify 5'UTR positions (``c.-20``, offset 0) as + intronic. Intronic variants have no protein consequence and cannot be represented by + ga4gh's VRS tools, so callers projecting a variant onto a transcript skip them. + + :param c_hgvs: coding HGVS string, e.g. ``NM_...:c.2002-1_2003del`` + :return: ``True`` if either endpoint carries a non-zero intron offset + """ + tools = TranslatorBuilder(get_seqrepo()).hgvs_tools + pos = tools.parser.parse(c_hgvs).posedit.pos + return bool(getattr(pos.start, "offset", 0) or getattr(pos.end, "offset", 0)) + + +def project_coding_hgvs_to_protein(c_hgvs: str) -> str: + """Project a coding HGVS expression onto its protein consequence (``c.`` -> ``p.``). + + :param c_hgvs: coding HGVS string, e.g. ``NM_...:c.76A>G`` + :return: protein HGVS string (``NP_...:p....``) + """ + tools = TranslatorBuilder(get_seqrepo()).hgvs_tools + var_c = tools.parser.parse(c_hgvs) + var_p = tools.variant_mapper.c_to_p(var_c) + return str(var_p) + + +def get_genomic_accession_for_transcript( + transcript: str, assembly: str = "GRCh38" +) -> str | None: + """Resolve the genomic contig (``NC_``) a coding transcript aligns to on an assembly. + + Needed for the ``c.`` -> ``g.`` projection: unlike :func:`project_genomic_hgvs_to_coding` + (whose ``g_to_c`` reads the genomic accession from the input string), hgvs's + ``c_to_g`` requires the target genomic accession passed explicitly. cdot carries one + contig per genome build in the transcript record, but its public + ``get_tx_mapping_options`` flattens the build keys away, so disambiguate by + intersecting the candidate contigs with the requested assembly's contig set + (``get_assembly_map``). + + :param transcript: coding transcript accession, e.g. ``NM_004333.6`` + :param assembly: genome build to resolve the contig on (``GRCh38`` or ``GRCh37``) + :return: the assembly's ``NC_`` contig for the transcript, or ``None`` if unresolvable + """ + cd = cdot_rest() + try: + options = cd.get_tx_mapping_options(transcript) + assembly_contigs = cd.get_assembly_map(assembly) + except Exception: + _logger.exception( + "Could not resolve genomic contig for transcript %s on %s", + transcript, + assembly, + ) + return None + + for option in options: + if option["alt_ac"] in assembly_contigs: + return option["alt_ac"] + + return None + + +def project_coding_hgvs_to_genomic(c_hgvs: str, alt_ac: str) -> str: + """Project a coding HGVS expression onto its genomic form (``c.`` -> ``g.``). + + The deterministic genomic re-expression of a coding variant, on the contig + ``alt_ac`` (resolve it with :func:`get_genomic_accession_for_transcript`). Like the + other projection helpers this routes through the cdot-backed package hgvs + ``VariantMapper`` (see :func:`init_hgvs_tools`); ``c_to_g`` needs the target contig + explicitly because the coding expression does not name one. + + :param c_hgvs: coding HGVS string, e.g. ``NM_004333.6:c.1A>G`` + :param alt_ac: genomic contig accession to project onto, e.g. ``NC_000007.14`` + :return: genomic HGVS string (``NC_...:g....``) + """ + tools = TranslatorBuilder(get_seqrepo()).hgvs_tools + var_c = tools.parser.parse(c_hgvs) + var_g = tools.variant_mapper.c_to_g(var_c, alt_ac) + return str(var_g) + + # ----------------------------------- MANE ----------------------------------- # +def _sort_mane_result(description: ManeDescription) -> int: + if description.transcript_priority == TranscriptPriority.MANE_SELECT: + return 2 + if description.transcript_priority == TranscriptPriority.MANE_PLUS_CLINICAL: + return 1 + + # should be unreachable. + _logger.warning( + "Unrecognized transcript priority value %s for transcript description of %s", + description.transcript_priority, + description.refseq_nuc, + ) + return 0 + + +def _mane_row_to_description(row: dict[str, Any]) -> ManeDescription: + """Build a ``ManeDescription`` from a row of the MANE summary dataframe. + + Both the transcript-keyed and gene-keyed lookups read the same dataframe + (``ManeTranscriptMappings.df``), so they share column names. + """ + return ManeDescription( + ncbi_gene_id=row["#NCBI_GeneID"], + ensembl_gene_id=row["Ensembl_Gene"], + hgnc_gene_id=row["HGNC_ID"], + symbol=row["symbol"], + name=row["name"], + refseq_nuc=row["RefSeq_nuc"], + refseq_prot=row["RefSeq_prot"], + ensembl_nuc=row["Ensembl_nuc"], + ensembl_prot=row["Ensembl_prot"], + transcript_priority=TranscriptPriority( + "_".join(row["MANE_status"].lower().split()) + ), + grch38_chr=row["GRCh38_chr"], + chr_start=row["chr_start"], + chr_end=row["chr_end"], + chr_strand=row["chr_strand"], + ) + + def get_mane_transcripts(transcripts: list[str]) -> list[ManeDescription]: """Get corresponding MANE data for transcripts. Results given in order of transcript preference. @@ -714,44 +971,27 @@ def get_mane_transcripts(transcripts: list[str]) -> list[ManeDescription]: :param transcripts: candidate transcripts list :return: complete MANE descriptions """ - - def _sort_mane_result(description: ManeDescription) -> int: - if description.transcript_priority == TranscriptPriority.MANE_SELECT: - return 2 - if description.transcript_priority == TranscriptPriority.MANE_PLUS_CLINICAL: - return 1 - # should be impossible - _logger.warning( - "Unrecognized transcript priority value %s for transcript description of %s", - description.transcript_priority, - description.refseq_nuc, - ) - return 0 - mane_df = CoolSeqToolBuilder().mane_transcript_mappings.df mane_results = mane_df.filter(pl.col("RefSeq_nuc").is_in(transcripts)) - mane_data = [] - for row in mane_results.rows(named=True): - mane_data.append( - ManeDescription( - ncbi_gene_id=row["#NCBI_GeneID"], - ensembl_gene_id=row["Ensembl_Gene"], - hgnc_gene_id=row["HGNC_ID"], - symbol=row["symbol"], - name=row["name"], - refseq_nuc=row["RefSeq_nuc"], - refseq_prot=row["RefSeq_prot"], - ensembl_nuc=row["Ensembl_nuc"], - ensembl_prot=row["Ensembl_prot"], - transcript_priority=TranscriptPriority( - "_".join(row["MANE_status"].lower().split()) - ), - grch38_chr=row["GRCh38_chr"], - chr_start=row["chr_start"], - chr_end=row["chr_end"], - chr_strand=row["chr_strand"], - ) - ) + mane_data = [_mane_row_to_description(row) for row in mane_results.rows(named=True)] + mane_data.sort(key=_sort_mane_result) + return mane_data + + +def get_mane_transcripts_for_gene(gene_symbol: str) -> list[ManeDescription]: + """Get MANE transcript(s) for a gene symbol directly, without a prior alignment. + + Uses cool-seq-tool's gene-keyed MANE lookup. Unlike ``get_mane_transcripts`` + (which filters by candidate transcript accessions seeded from a genomic + alignment), this resolves the gene's MANE transcript straight from the gene + symbol -- the path needed for accession-based genomic (``NC_``) targets that + have no BLAT alignment to seed candidate transcripts. + + :param gene_symbol: HGNC gene symbol + :return: MANE descriptions, MANE Select first then MANE Plus Clinical + """ + rows = CoolSeqToolBuilder().mane_transcript_mappings.get_gene_mane_data(gene_symbol) + mane_data = [_mane_row_to_description(row) for row in rows] mane_data.sort(key=_sort_mane_result) return mane_data diff --git a/src/dcd_mapping/schemas.py b/src/dcd_mapping/schemas.py index fa09939..999dff6 100644 --- a/src/dcd_mapping/schemas.py +++ b/src/dcd_mapping/schemas.py @@ -42,6 +42,31 @@ class VrsVersion(StrEnum): V_2 = "2" +class MappingOutcome(StrEnum): + """Per-record outcome for one (variant, annotation level) pair. + + The mapper's output is a complete accounting: for every variant and every + annotation level in that variant's deterministically-reachable set, there is one + record carrying its outcome -- never a silent omission. This field is uniform across + measured (assay-level) and projected (deterministic non-assay) records so the two can + be treated identically by consumers; it distinguishes a benign absence from a genuine + failure, which a populated ``error_message`` alone cannot. + + - ``MAPPED`` -- a VRS allele was produced (``pre_mapped``/``post_mapped`` populated). + - ``INTRONIC`` -- the variant's coding projection is intronic: no VRS-representable + coding form and no protein consequence. Benign (``error_message`` is ``None``). + - ``NO_PROTEIN_CONSEQUENCE`` -- the protein layer was reachable but yields no + projectable protein change (e.g. UTR). Benign (``error_message`` is ``None``). + - ``FAILED`` -- the mapping/projection genuinely failed (mis-selected transcript, + projection error, unresolvable reference contig). ``error_message`` carries detail. + """ + + MAPPED = "mapped" + INTRONIC = "intronic" + NO_PROTEIN_CONSEQUENCE = "no_protein_consequence" + FAILED = "failed" + + class UniProtRef(BaseModel): """Store metadata associated with MaveDB UniProt reference""" @@ -250,6 +275,9 @@ class MappedScore(BaseModel): pre_mapped: Allele | Haplotype | None = None post_mapped: Allele | Haplotype | None = None error_message: str | None = None + # Typed outcome for this (variant, level) record. None until stamped (legacy / + # pre-annotation); ``annotate`` resolves it for every emitted record. + outcome: MappingOutcome | None = None class ScoreAnnotation(BaseModel): @@ -285,6 +313,10 @@ class ScoreAnnotation(BaseModel): score: float | None = None error_message: str | None = None alignment_level: AnnotationLayer | None = None + # Typed outcome for this (variant, level) record -- see MappingOutcome. Always + # populated on emitted annotations; distinguishes a benign absence (intronic, no + # protein consequence) from a genuine failure that error_message alone cannot. + outcome: MappingOutcome | None = None # Per-variant alignment-locus flags. None means "not evaluated" (e.g. no # genomic alignment available, or non-genomic annotation layer); True/False # mean the flag was computed against this run's alignment. diff --git a/src/dcd_mapping/transcripts.py b/src/dcd_mapping/transcripts.py index 6f6e0c5..9475a69 100644 --- a/src/dcd_mapping/transcripts.py +++ b/src/dcd_mapping/transcripts.py @@ -2,23 +2,27 @@ import logging import re -from collections.abc import Mapping +from collections.abc import Iterable, Mapping from Bio import Align from Bio.Data.CodonTable import IUPACData from Bio.Seq import Seq from Bio.SeqUtils import seq1 from cool_seq_tool.schemas import TranscriptPriority +from mavehgvs.util import parse_variant_strings -from dcd_mapping.exceptions import TxSelectError +from dcd_mapping.exceptions import NoCodingTranscriptError, TxSelectError from dcd_mapping.lookup import ( get_chromosome_identifier, + get_gene_symbol, get_mane_transcripts, + get_mane_transcripts_for_gene, get_protein_accession, get_seqrepo, get_sequence, get_transcripts, get_uniprot_sequence, + infer_hgnc_symbol_from_genomic_loci, ) from dcd_mapping.schemas import ( AlignmentResult, @@ -380,6 +384,125 @@ def _handle_edge_cases( return transcript_reference +def _genomic_positions_from_records(records: list[ScoreRow]) -> list[int]: + """Extract 0-based genomic positions from a target's ``NC_:g.`` variant rows. + + Used to resolve the target's gene by genomic-locus overlap before mapping (see + :func:`_select_genomic_accession_reference`). A single representative position per + variant is enough to identify the gene at the locus; unparseable, reference, and + intronic rows are skipped. + """ + positions: list[int] = [] + for row in records: + if row.hgvs_nt in {"_wt", "_sy", "="} or "fs" in row.hgvs_nt: + continue + + try: + variants, errors = parse_variant_strings([row.hgvs_nt]) + except Exception: # noqa: S112 -- best-effort locus parse; skip unparseable rows + continue + + variant, error = variants[0], errors[0] + if error is not None or variant is None or variant.positions is None: + continue + + variant_positions = ( + variant.positions + if isinstance(variant.positions, Iterable) + else [variant.positions] + ) + for position in variant_positions: + if position.is_intronic(): + continue + # mavehgvs positions are 1-based; Ensembl overlap is 0-based. + positions.append(position.position - 1) + + return positions + + +def _select_genomic_accession_reference( + target_gene: TargetGene, records: list[ScoreRow] +) -> TxSelectResult: + """Select the coding transcript for a protein-coding genomic-accession target. + + Genomic-accession targets (variants submitted as ``NC_:g.``) have no BLAT + alignment to seed candidate transcripts, so transcript selection resolves the + target's gene symbol and asks cool-seq-tool for that gene's MANE transcript + directly (MANE Select, then MANE Plus Clinical). This is the coding transcript + the genomic variants project onto for reverse translation; the mapper surfaces + it as cdna target metadata rather than emitting per-variant coding mappings. + + Gene resolution prefers the mapper-resolved gene -- inferred from the variants' + genomic loci via Ensembl overlap -- and falls back to normalizing the declared + target metadata. The inferred gene is more reliable than the declared name (which + may be e.g. ``"Wildtype G6PD"``), and stamping it onto the returned + ``hgnc_symbol`` lets ``compute_target_gene_info`` reuse it instead of re-querying + Ensembl. + + The returned ``TxSelectResult`` carries ``nm``/``np``/``hgnc_symbol`` for that + purpose; ``start``/``sequence`` are inert (mirroring the ``NP_`` accession + passthrough) because no protein-sequence offset is computed for this path. + + :param target_gene: target gene metadata from MaveDB + :param records: the target's score rows (source of the genomic loci) + :return: transcript selection carrying the selected coding transcript + :raise NoCodingTranscriptError: if no gene symbol resolves or the gene has no + MANE transcript + """ + gene_symbol = None + gene_source = None + if target_gene.target_accession_id: + gene_symbol = infer_hgnc_symbol_from_genomic_loci( + target_gene.target_accession_id, + _genomic_positions_from_records(records), + ) + if gene_symbol: + gene_source = "locus_overlap" + + if not gene_symbol: + gene_symbol = get_gene_symbol(target_gene) + if gene_symbol: + gene_source = "target_metadata" + + if not gene_symbol: + msg = ( + f"Unable to resolve a gene symbol for genomic-accession target " + f"{target_gene.target_gene_name!r} (accession " + f"{target_gene.target_accession_id!r}); cannot select a coding transcript." + ) + _logger.warning(msg) + raise NoCodingTranscriptError(msg) + + mane_transcripts = get_mane_transcripts_for_gene(gene_symbol) + best_tx = _choose_best_mane_transcript(mane_transcripts) + if not best_tx: + msg = ( + f"No MANE transcript found for gene {gene_symbol!r} (resolved via " + f"{gene_source}) for target {target_gene.target_gene_name!r}." + ) + _logger.warning(msg) + raise NoCodingTranscriptError(msg) + + _logger.info( + "Selected coding transcript %s (%s) for genomic-accession target %r: " + "gene %s resolved via %s.", + best_tx.refseq_nuc, + best_tx.transcript_priority, + target_gene.target_gene_name, + gene_symbol, + gene_source, + ) + return TxSelectResult( + nm=best_tx.refseq_nuc, + np=best_tx.refseq_prot, + start=0, + is_full_match=True, + sequence="", + transcript_mode=best_tx.transcript_priority, + hgnc_symbol=best_tx.symbol, + ) + + async def select_transcript( scoreset_urn: str, target_gene: TargetGene, @@ -432,23 +555,44 @@ async def select_transcripts( str, TxSelectResult | TxSelectError | KeyError | None ] = {} for target_gene in scoreset_metadata.target_genes: - if scoreset_metadata.target_genes[target_gene].target_accession_id: - # for accession-based targets, create tx select objects for protein sequence accessions only - accession_id = scoreset_metadata.target_genes[ - target_gene - ].target_accession_id - # TODO create full list of possible protein accession prefixes - if accession_id.startswith(("NP_", "ENSP_")): - # TODO make sequence field optional instead of leaving blank here? + target = scoreset_metadata.target_genes[target_gene] + if target.target_accession_id: + accession_id = target.target_accession_id + # Passthrough for targets with NP_ or ENSP_ accessions -- skip selection + # and just use the provided protein accession as the reference. + if accession_id.startswith( + ( + "NP_", + "ENSP_", + ) + ): # TODO create full list of possible protein accession prefixes selected_transcripts[target_gene] = TxSelectResult( np=accession_id, start=0, is_full_match=True, sequence="", transcript_mode=None, - ) + ) # TODO make sequence field optional instead of leaving blank here? + + # Genomic accession targets (variants submitted as NC_:g.) are handled by a + # special path in transcript selection because they have no BLAT alignment. + # Resolve the gene's MANE coding transcript directly. + elif ( + accession_id.startswith("NC_") + and target.target_gene_category == TargetType.PROTEIN_CODING + ): + try: + selected_transcripts[target_gene] = ( + _select_genomic_accession_reference( + target, records[target_gene] + ) + ) + except (TxSelectError, KeyError) as e: + selected_transcripts[target_gene] = e + else: selected_transcripts[target_gene] = None + else: try: selected_transcripts[target_gene] = await select_transcript( diff --git a/src/dcd_mapping/vrs_map.py b/src/dcd_mapping/vrs_map.py index 28c3795..fc13f23 100644 --- a/src/dcd_mapping/vrs_map.py +++ b/src/dcd_mapping/vrs_map.py @@ -2,7 +2,9 @@ import logging import os +from collections import Counter from collections.abc import Iterable +from enum import StrEnum from itertools import cycle from Bio.Seq import Seq @@ -11,11 +13,13 @@ from ga4gh.core import sha512t24u from ga4gh.vrs._internal.models import ( Allele, + Expression, Haplotype, LiteralSequenceExpression, ReferenceLengthExpression, SequenceLocation, SequenceString, + Syntax, ) from ga4gh.vrs.normalize import normalize from mavehgvs.util import parse_variant_strings @@ -30,8 +34,13 @@ from dcd_mapping.lookup import ( build_ref_identical_allele, cdot_rest, + coding_hgvs_is_intronic, get_chromosome_identifier, + get_genomic_accession_for_transcript, get_seqrepo, + project_coding_hgvs_to_genomic, + project_coding_hgvs_to_protein, + project_genomic_hgvs_to_coding, translate_hgvs_to_vrs, translate_ref_identical_to_vrs, ) @@ -39,6 +48,7 @@ from dcd_mapping.schemas import ( AlignmentResult, MappedScore, + MappingOutcome, ScoreRow, TargetGene, TargetSequenceType, @@ -57,19 +67,32 @@ CLINGEN_API_URL = os.environ.get("CLINGEN_API_URL", "https://reg.genome.network/allele") +class ProjectionOutcome(StrEnum): + """Per-variant projection QC (internal; drives the validation log only, never emitted). + + ``failed`` means the selected transcript could not project its own variant -- a + mis-selection signal. + """ + + PROJECTED = "projected" # coding (and usually protein) form constructed cleanly + INTRONIC = "intronic" # coding projection lands in an intron -- benign, no protein consequence + SKIPPED = "skipped" # not a projectable variant row (_wt/_sy/=/fs) + FAILED = "failed" # g.->c. projection or coding allele construction failed -- selection suspect + + +_PROJECTION_FAILURE_WARN_FRACTION = 0.25 +"""Projection-failure fraction above which the per-target summary logs at WARNING, not INFO.""" + + def _hgvs_variant_is_valid(hgvs_string: str) -> bool: return not hgvs_string.endswith((".=", ")", "X")) def _process_any_aa_code(hgvs_pro_string: str) -> str: - """Substitute "Xaa" for "?" in variation expression. - - Some expressions seem to use the single-character "?" wildcard in the context of - three-letter amino acid codes. This is weird, and the proper replacement is "Xaa". + """Substitute "Xaa" for the "?" wildcard in a protein expression. - Note that we currently do NOT make any alterations to nucleotide strings that use - weird apparently-wildcard characters like "X" -- we just treat them as invalid (see - _hgvs_variant_is_valid()). + Nucleotide strings with "X"-style wildcards are not adjusted -- they're treated as + invalid (see ``_hgvs_variant_is_valid``). :param hgvs_string: MAVE HGVS expression :return: processed variation (equivalent to input if no wildcard code found) @@ -126,12 +149,11 @@ def _create_pre_mapped_hgvs_strings( alignment: AlignmentResult | None = None, accession_id: str | None = None, ) -> list[str]: - """Generate a list of (pre-mapped) HGVS strings from one long string containing many valid HGVS substrings + """Generate pre-mapped HGVS strings from a raw string of one or more HGVS substrings. - Currently, the provided transcript is used as the reference for the hgvs string, but this is inaccurate - because pre-mapped variants should be relative to the user-provided target sequence, not an external accession. - Any offset between the transcript and target sequence is not taken into account here (the variant position - is relative to the target sequence). + Known limitation: the transcript is used as the reference, but pre-mapped variants + should be relative to the user-provided target sequence; any transcript/target offset + is not accounted for here. :param raw_description: A string containing valid HGVS sub-strings :param layer: An enum denoting the targeted annotation layer of these HGVS strings @@ -156,8 +178,7 @@ def _create_pre_mapped_hgvs_strings( msg = f"Variant could not be parsed by mavehgvs: {error}" raise ValueError(msg) - # ga4gh hgvs_tools does not support intronic variants, so they will err out when vrs allele translator is called - # therefore skip them there + # ga4gh hgvs_tools can't translate intronic variants -- reject them here. if is_intronic_variant(variant): msg = f"Variant is intronic and cannot be processed: {variant}" raise ValueError(msg) @@ -220,8 +241,7 @@ def _create_post_mapped_hgvs_strings( msg = f"Variant could not be parsed by mavehgvs: {error}" raise ValueError(msg) - # ga4gh hgvs_tools does not support intronic variants, so they will err out when vrs allele translator is called - # therefore skip them there + # ga4gh hgvs_tools can't translate intronic variants -- reject them here. if is_intronic_variant(variant): msg = f"Variant is intronic and cannot be processed: {variant}" raise ValueError(msg) @@ -744,6 +764,57 @@ def _map_genomic( ) +def _map_coding(row: ScoreRow, sequence_id: str) -> MappedScore: + """Map a cdna-source (``NM_``/``ENST``) measured variant on its native transcript. + + The measured ``c.`` variant is already the least-derived layer, and the target *is* the + reference transcript, so pre- and post-mapped are the same coding allele -- no genomic + round-trip. Genomic/protein forms come separately from :func:`_construct_projected_layers`. + + :param row: a MaveDB score row + :param sequence_id: the coding transcript accession the variant is described against + :return: a CDNA-layer mapping, or a failed ``MappedScore`` carrying the error + """ + if row.hgvs_nt in {"_wt", "_sy", "="} or "fs" in row.hgvs_nt: + _logger.warning( + "Can't process variant syntax %s for %s", row.hgvs_nt, row.accession + ) + return MappedScore( + accession_id=row.accession, + score=row.score, + error_message=f"Can't process variant syntax {row.hgvs_nt}", + ) + + try: + coding_hgvs_strings = _create_pre_mapped_hgvs_strings( + row.hgvs_nt, AnnotationLayer.CDNA, accession_id=sequence_id + ) + coding_allele = _construct_vrs_allele( + coding_hgvs_strings, AnnotationLayer.CDNA, None, False + ) + except Exception as e: + _logger.warning( + "An error occurred while generating coding variant for %s, accession %s: %s", + row.hgvs_nt, + row.accession, + e, + exc_info=True, + ) + return MappedScore( + accession_id=row.accession, + score=row.score, + error_message=f"{type(e).__name__}: {e}", + ) + + return MappedScore( + accession_id=row.accession, + score=row.score, + alignment_level=AnnotationLayer.CDNA, + pre_mapped=coding_allele, + post_mapped=coding_allele, + ) + + def _get_allele_sequence(allele: Allele) -> str: """Get sequence for Allele @@ -797,6 +868,45 @@ def _hgvs_pro_is_valid(hgvs_pro: str) -> bool: ) +def _failed_score(row: ScoreRow, error_message: str) -> MappedScore: + """Build a bare failed mapping for a row -- no layer, no allele, just the error.""" + return MappedScore( + accession_id=row.accession, score=row.score, error_message=error_message + ) + + +def _map_protein_layer( + row: ScoreRow, + psequence_id: str, + transcript: TxSelectResult | TxSelectError, + protein_align_result: AlignmentResult | None, +) -> tuple[MappedScore | None, str | None]: + """Map the row's protein layer, or ``None`` when there is no protein layer to map. + + Returns ``None`` when the row carries no valid protein variant or the target protein + could not be aligned -- those are not protein-layer failures, they just mean this row + has no protein record. A row that maps at no layer is failed once, layer-agnostically, + by the caller. A genuine protein-layer failure (a valid ``p.`` that fails to map) is + still returned by ``_map_protein_coding_pro``. + """ + if isinstance(transcript, TxSelectError): + return None, str(transcript).strip("'") + if not _hgvs_pro_is_valid(row.hgvs_pro): + return ( + None, + f"Can't process variant syntax (hgvs_nt={row.hgvs_nt!r}, hgvs_pro={row.hgvs_pro!r})", + ) + if protein_align_result is None: + return ( + None, + "Could not perform mapping for protein variant because transcript sequence is missing or could not be aligned to reference sequence", + ) + return ( + _map_protein_coding_pro(row, psequence_id, transcript, protein_align_result), + None, + ) + + def _map_protein_coding( metadata: TargetGene, records: list[ScoreRow], @@ -833,7 +943,8 @@ def _map_protein_coding( ) _logger.info( "Protein-to-protein alignment produced for %s (ref protein: %s). " - "This alignment will be used for pro-layer variants in place of the genomic alignment.", + "Pro-layer variants are offset against this alignment rather than the genomic " + "alignment; this does not change which layer is preferred in the output.", metadata.target_gene_name, transcript.np, ) @@ -845,51 +956,57 @@ def _map_protein_coding( transcript, ) + # Sequence-based coding target: measures a genomic variant, carries a BLAT-selected + # coding transcript. Project onto the unmeasured layers (c., and p. if no protein was + # measured); build_scoreset_mapping routes them via preferred_layer_only. + project_nm = transcript.nm if isinstance(transcript, TxSelectResult) else None + projection_outcomes: list[ProjectionOutcome] = [] + variations: list[MappedScore] = [] for row in records: - hgvs_nt_mappings = None - hgvs_pro_mappings = None + # Nucleotide (genomic) layer, plus its deterministic projected layers. + nt_mapping = None + projected: list[MappedScore] = [] if _hgvs_nt_is_valid(row.hgvs_nt): - hgvs_nt_mappings = _map_genomic(row, gsequence_id, align_result) - - if ( - isinstance(transcript, TxSelectError) and not hgvs_nt_mappings - ): # only create error message if there is not an hgvs nt mapping - # TODO create pre mapped allele - hgvs_pro_mappings = MappedScore( - accession_id=row.accession, - score=row.score, - error_message=str(transcript).strip("'"), - ) - else: - if _hgvs_pro_is_valid(row.hgvs_pro): - if protein_align_result is not None: - hgvs_pro_mappings = _map_protein_coding_pro( - row, psequence_id, transcript, protein_align_result - ) - # Only create this error message if there is not a valid hgvs nt mapping, because if there is a valid hgvs nt mapping, - # it indicates we expect protein alignemnt to fail and we don't want to create redundant error messages about missing - # transcript sequence or alignment failure - elif protein_align_result is None and not hgvs_nt_mappings: - hgvs_pro_mappings = MappedScore( - accession_id=row.accession, - score=row.score, - error_message="Could not perform mapping for protein variant because transcript sequence is missing or could not be aligned to reference sequence", - ) - elif ( - not hgvs_nt_mappings - ): # only create error message if there is not an hgvs nt mapping - hgvs_pro_mappings = MappedScore( - accession_id=row.accession, - score=row.score, - error_message="Invalid protein variant syntax", + nt_mapping = _map_genomic(row, gsequence_id, align_result) + if project_nm and align_result is not None: + projected, outcome = _construct_projected_layers( + row, + AnnotationLayer.GENOMIC, + project_nm, + alignment=align_result, + # Protein is measured directly below when hgvs_pro is valid; don't + # project a redundant protein layer in that case. + project_protein=not _hgvs_pro_is_valid(row.hgvs_pro), ) + projection_outcomes.append(outcome) - # append both pro and nt mappings if both available - if hgvs_pro_mappings: - variations.append(hgvs_pro_mappings) - if hgvs_nt_mappings: - variations.append(hgvs_nt_mappings) + # Protein layer: a success (or genuine protein failure), else None. + pro_mapping, unmappable_reason = _map_protein_layer( + row, psequence_id, transcript, protein_align_result + ) + + # A row that mapped at no measured layer (neither nt nor protein) is failed once, + # layer-agnostically; build_scoreset_mapping re-attributes it to the preferred + # layer. Projections don't count toward "the row mapped" -- they're derived from + # the measured nt variant, so they're added only alongside a real mapping. + row_records = [m for m in (pro_mapping, nt_mapping) if m] + if not row_records: + row_records = [ + _failed_score( + row, unmappable_reason or "No valid measured layer could be mapped" + ) + ] + # Projected layers are suppressed as variants by the API, emitted by the CLI. + row_records.extend(projected) + variations.extend(row_records) + + if project_nm and projection_outcomes: + _log_projection_validation( + metadata.target_accession_id or metadata.target_gene_name, + project_nm, + projection_outcomes, + ) return variations, protein_align_result @@ -927,6 +1044,293 @@ def store_accession( sr.sr.store(sequence, alias_dict_list) +def _coding_pivot_hgvs_strings( + row: ScoreRow, + source_layer: AnnotationLayer, + transcript_nm: str, + accession_id: str | None, + alignment: AlignmentResult | None, +) -> list[str]: + """Build the coding (``c.``) HGVS form(s) of a measured variant -- the pivot every + deterministic projection runs through. + + The coding form is reached differently per assay level: + + * **genomic source** (``NC_`` accession or sequence-based BLAT alignment) -- build + the genomic HGVS first (pre-mapped on the accession, or post-mapped onto the + reference contig via the alignment), then project ``g. -> c.`` onto the transcript. + * **cdna source** (``NM_``/``ENST`` accession) -- the measured variant is already + coding on ``transcript_nm``; prefix the accession onto the parsed variant string(s). + + :raise Exception: if the genomic build or ``g. -> c.`` projection fails (the caller + classifies this as ``FAILED``). + """ + if source_layer is AnnotationLayer.CDNA: + # Measured variant is already coding on transcript_nm; just prefix the accession. + return [f"{accession_id}:{v}" for v in _parse_raw_variant_str(row.hgvs_nt)] + + if accession_id is not None: + genomic_hgvs_strings = _create_pre_mapped_hgvs_strings( + row.hgvs_nt, AnnotationLayer.GENOMIC, accession_id=accession_id + ) + else: + genomic_hgvs_strings = _create_post_mapped_hgvs_strings( + row.hgvs_nt, AnnotationLayer.GENOMIC, alignment=alignment + ) + return [ + project_genomic_hgvs_to_coding(g, transcript_nm) for g in genomic_hgvs_strings + ] + + +def _projection_record( + row: ScoreRow, + layer: AnnotationLayer, + *, + outcome: MappingOutcome, + allele: Allele | Haplotype | None = None, + error_message: str | None = None, +) -> MappedScore: + """Build one projected-layer record carrying its typed :class:`MappingOutcome`. + + Benign outcomes (``INTRONIC`` / ``NO_PROTEIN_CONSEQUENCE``) leave ``error_message`` + ``None`` so a populated ``error_message`` always means a genuine failure. + """ + return MappedScore( + accession_id=row.accession, + score=row.score, + alignment_level=layer, + pre_mapped=allele, + post_mapped=allele, + error_message=error_message, + outcome=outcome, + ) + + +def _construct_projected_layers( + row: ScoreRow, + source_layer: AnnotationLayer, + transcript_nm: str, + *, + accession_id: str | None = None, + alignment: AlignmentResult | None = None, + genomic_accession: str | None = None, + project_protein: bool = True, +) -> tuple[list[MappedScore], ProjectionOutcome]: + """Project a measured variant onto its own deterministic non-measured forms. + + A 1-to-1 transform of one measured variant onto the other layers -- distinct from the + equivalence-class expansion (synonymous codons) the reverse-translation job owns. The + non-measured forms depend on ``source_layer``: + + * **genomic source** (``NC_`` accession, or sequence-based via ``alignment``) -- cdna + (``g. -> c.``) and, unless ``project_protein`` is False, protein (``c. -> p.``). + * **cdna source** (``NM_``/``ENST``) -- genomic (``c. -> g.`` onto ``genomic_accession``, + from :func:`lookup.get_genomic_accession_for_transcript`) and protein (``c. -> p.``). + + Every expected level yields exactly one record (never a silent omission), each with a + typed :class:`MappingOutcome`: ``MAPPED``, benign ``INTRONIC``/``NO_PROTEIN_CONSEQUENCE`` + (no ``error_message``), or ``FAILED`` (``error_message`` populated). Non-variant rows + (``_wt``/``_sy``/``=``/``fs``) yield none. The returned :class:`ProjectionOutcome` (for + the per-target validation log) tracks the load-bearing nucleotide form -- ``FAILED`` + there is a mis-selection signal; a protein-stage miss never fails the aggregate. + + :param row: a MaveDB score row + :param source_layer: the assay (measured) level -- ``GENOMIC`` or ``CDNA`` + :param transcript_nm: the coding transcript the cdna form is expressed against + :param accession_id: accession the measured variant is described against; ``None`` for + a sequence-based genomic source, which uses ``alignment`` + :param alignment: BLAT alignment for a sequence-based genomic source + :param genomic_accession: reference contig for the ``c. -> g.`` projection (cdna source) + :param project_protein: construct the protein form (False when protein was measured) + :return: one typed ``MappedScore`` per expected non-measured level, and the outcome + """ + if row.hgvs_nt in {"_wt", "_sy", "="} or "fs" in row.hgvs_nt: + # Not an input variant (wildtype/synonymous/reference/frameshift) -- the measured + # path already emits the row's record; there is nothing to project. + return [], ProjectionOutcome.SKIPPED + + # The deterministic non-measured layer set, with the load-bearing nucleotide + # re-expression first: cdna for a genomic source, genomic for a cdna source. + load_bearing_layer = ( + AnnotationLayer.CDNA + if source_layer is AnnotationLayer.GENOMIC + else AnnotationLayer.GENOMIC + ) + expected_layers = [load_bearing_layer] + if project_protein: + expected_layers.append(AnnotationLayer.PROTEIN) + + try: + coding_hgvs_strings = _coding_pivot_hgvs_strings( + row, source_layer, transcript_nm, accession_id, alignment + ) + except Exception as e: + # The coding pivot underpins every projection; if it cannot be built, none of the + # expected layers can. Emit a FAILED record for each rather than omitting them. + _logger.debug( + "Projection for %s (accession %s) failed: coding pivot failed: %s", + row.hgvs_nt, + row.accession, + e, + ) + records = [ + _projection_record( + row, + layer, + outcome=MappingOutcome.FAILED, + error_message=f"coding pivot ({source_layer} -> c.) failed: {e}", + ) + for layer in expected_layers + ] + return records, ProjectionOutcome.FAILED + + if any(coding_hgvs_is_intronic(c) for c in coding_hgvs_strings): + # Intronic: the coding form is not VRS-representable and there is no protein + # consequence. A benign absence for every expected layer (no error_message). + _logger.debug( + "Projection for %s (accession %s): coding projection is intronic.", + row.hgvs_nt, + row.accession, + ) + records = [ + _projection_record(row, layer, outcome=MappingOutcome.INTRONIC) + for layer in expected_layers + ] + return records, ProjectionOutcome.INTRONIC + + # 1. Resolve this layer's HGVS strings. The cdna form is the pivot itself; the + # genomic form is the c.->g. re-expression (cdna source); the protein form is + # the c.->p. consequence. A protein string-projection miss is a *benign* + # no-consequence; a load-bearing nucleotide miss is a FAILED record. + records: list[MappedScore] = [] + aggregate = ProjectionOutcome.PROJECTED + for layer in expected_layers: + if layer is AnnotationLayer.CDNA: + hgvs_strings = coding_hgvs_strings + + elif layer is AnnotationLayer.GENOMIC: + if genomic_accession is None: + records.append( + _projection_record( + row, + layer, + outcome=MappingOutcome.FAILED, + error_message="no resolvable reference contig for c. -> g. projection", + ) + ) + aggregate = ProjectionOutcome.FAILED + continue + + try: + hgvs_strings = [ + project_coding_hgvs_to_genomic(c, genomic_accession) + for c in coding_hgvs_strings + ] + except Exception as e: + records.append( + _projection_record( + row, + layer, + outcome=MappingOutcome.FAILED, + error_message=f"c. -> g. projection failed: {e}", + ) + ) + aggregate = ProjectionOutcome.FAILED + continue + + elif layer is AnnotationLayer.PROTEIN: + try: + hgvs_strings = [ + project_coding_hgvs_to_protein(c) for c in coding_hgvs_strings + ] + except Exception as e: + # Non-projectable protein is a benign no-consequence (e.g. UTR); the + # load-bearing nucleotide layer carries any mis-selection signal. + _logger.debug( + "No protein consequence for %s (accession %s): %s", + row.hgvs_nt, + row.accession, + e, + ) + records.append( + _projection_record( + row, layer, outcome=MappingOutcome.NO_PROTEIN_CONSEQUENCE + ) + ) + continue + + else: + msg = f"Unexpected annotation layer: {layer}" + raise ValueError(msg) + + # 2. Construct the VRS allele for this layer. + try: + allele = _construct_vrs_allele(hgvs_strings, layer, None, False) + records.append( + _projection_record( + row, layer, outcome=MappingOutcome.MAPPED, allele=allele + ) + ) + except Exception as e: + records.append( + _projection_record( + row, + layer, + outcome=MappingOutcome.FAILED, + error_message=f"{layer} allele construction failed: {e}", + ) + ) + if layer is load_bearing_layer: + aggregate = ProjectionOutcome.FAILED + + return records, aggregate + + +def _log_projection_validation( + accession_id: str, + transcript_nm: str, + outcomes: list[ProjectionOutcome], +) -> None: + """Log a per-target summary of how cleanly a target's variants projected. + + For a selected transcript (genomic/sequence source) a high failure fraction signals a + mis-selection; for a cdna source (``accession_id == transcript_nm``) nothing was + selected, so this just reports the c. -> g./p. projection. WARNING above + ``_PROJECTION_FAILURE_WARN_FRACTION`` of attempted (non-skipped) variants, else INFO. + """ + counts = Counter(outcomes) + projected = counts[ProjectionOutcome.PROJECTED] + intronic = counts[ProjectionOutcome.INTRONIC] + failed = counts[ProjectionOutcome.FAILED] + skipped = counts[ProjectionOutcome.SKIPPED] + attempted = projected + intronic + failed # non-variant skipped rows excluded + if attempted == 0: + return + + failure_fraction = failed / attempted + log = ( + _logger.warning + if failure_fraction > _PROJECTION_FAILURE_WARN_FRACTION + else _logger.info + ) + + if accession_id == transcript_nm: + scope = f"Projecting cdna target {accession_id} to its genomic/protein forms" + else: + scope = f"Projection of {accession_id} onto selected transcript {transcript_nm}" + + log( + "%s: %d projected, %d intronic, %d failed, %d skipped (%.0f%% of %d attempted failed).", + scope, + projected, + intronic, + failed, + skipped, + 100 * failure_fraction, + attempted, + ) + + def _map_accession( metadata: TargetGene, records: list[ScoreRow], @@ -942,8 +1346,12 @@ def _map_accession( store_accession(sequence_id) - # TODO full list of protein accession id prefixes - if metadata.target_accession_id.startswith(("NP", "ENSP")): + if metadata.target_accession_id.startswith( + ( + "NP", + "ENSP", + ) + ): # TODO full list of protein accession id prefixes for row in records: hgvs_pro_mappings = _map_protein_coding_pro( row, @@ -951,11 +1359,114 @@ def _map_accession( transcript, ) variations.append(hgvs_pro_mappings) - # TODO full list of transcript and contig accession id prefixes - elif metadata.target_accession_id.startswith(("NM", "ENST", "NC")): + + # Coding accession targets project their measured variant onto the unmeasured + # deterministic layers; build_scoreset_mapping routes them via preferred_layer_only. + # NC_ (genomic source): measured variant is genomic; project g.->c.->p. onto the + # gene's selected MANE transcript (transcript.nm). + # NM_/ENST (cdna source): measured variant is already coding; project c.->g. (onto + # the transcript's reference contig) and c.->p. + elif metadata.target_accession_id.startswith( + ( + "NM", + "ENST", + "NC", + ) + ): # TODO full list of transcript and contig accession id prefixes + is_genomic_source = metadata.target_accession_id.startswith("NC") + project_nm = transcript.nm if isinstance(transcript, TxSelectResult) else None + + # NM_/ENST accession is itself the coding transcript even without a + # TxSelectResult (none is produced for transcript accessions). + if not is_genomic_source and project_nm is None: + project_nm = sequence_id + + # The c. -> g. projection (cdna source only) needs the transcript's reference + # contig; resolve it once per target, not per variant. + genomic_accession = ( + get_genomic_accession_for_transcript(project_nm) + if project_nm and not is_genomic_source + else None + ) + + # Map the measured variant in its native layer: genomic for an NC_ source, + # coding (directly on the transcript) for an NM_/ENST source. + projection_outcomes: list[ProjectionOutcome] = [] for row in records: - hgvs_nt_mappings = _map_genomic(row, sequence_id, align_result) + if is_genomic_source: + hgvs_nt_mappings = _map_genomic(row, sequence_id, align_result) + else: + hgvs_nt_mappings = _map_coding(row, sequence_id) variations.append(hgvs_nt_mappings) + + # cdna source: map a measured hgvs_pro directly rather than projecting it -- + # it's already in reference coordinates (pre_mapped == post_mapped). Needs the + # NP_ from a TxSelectResult; if unavailable, skip and log rather than fall back + # to a projection that could silently diverge from the measured value. + has_measured_protein = False + if not is_genomic_source and _hgvs_pro_is_valid(row.hgvs_pro): + has_measured_protein = True + np_accession = ( + transcript.np + if isinstance(transcript, TxSelectResult) and transcript.np + else None + ) + if np_accession: + try: + pro_hgvs = _create_pre_mapped_hgvs_strings( + row.hgvs_pro, AnnotationLayer.PROTEIN, tx=transcript + ) + pro_allele = _construct_vrs_allele( + pro_hgvs, AnnotationLayer.PROTEIN, None, False + ) + variations.append( + MappedScore( + accession_id=row.accession, + score=row.score, + alignment_level=AnnotationLayer.PROTEIN, + pre_mapped=pro_allele, + post_mapped=pro_allele, + ) + ) + except Exception as e: + _logger.warning( + "Could not map measured hgvs_pro %s for %s: %s", + row.hgvs_pro, + row.accession, + e, + ) + variations.append( + MappedScore( + accession_id=row.accession, + score=row.score, + alignment_level=AnnotationLayer.PROTEIN, + error_message=f"measured protein mapping failed: {e}", + ) + ) + else: + _logger.warning( + "hgvs_pro %s present for %s but no NP_ accession resolvable " + "from transcript; skipping protein layer", + row.hgvs_pro, + row.accession, + ) + + if project_nm: + projected, outcome = _construct_projected_layers( + row, + AnnotationLayer.GENOMIC + if is_genomic_source + else AnnotationLayer.CDNA, + project_nm, + accession_id=sequence_id, + genomic_accession=genomic_accession, + project_protein=not has_measured_protein, + ) + variations.extend(projected) + projection_outcomes.append(outcome) + if project_nm: + _log_projection_validation(sequence_id, project_nm, projection_outcomes) + else: msg = f"Unrecognized accession prefix for accession id: {metadata.target_accession_id}" raise UnsupportedReferenceSequencePrefixError(msg) @@ -994,9 +1505,8 @@ def _construct_vrs_allele( for hgvs_string in hgvs_strings: _logger.debug("Processing HGVS string: %s", hgvs_string) - # Special handling for reference-identical variants, which must be represented as simple Alleles with a - # ReferenceLengthExpression state rather than beeing translated from HGVS. This translation is - # currently unsupported by ga4gh hgvs_tools and will raise an error if attempted. + # Reference-identical variants must be built as Alleles with a + # ReferenceLengthExpression state; ga4gh hgvs_tools can't translate them from HGVS. if hgvs_string.endswith(".="): if pre_map: if sequence_id is None: @@ -1038,6 +1548,11 @@ def _construct_vrs_allele( ) allele.state = _rle_to_lse(allele.state, allele.location) + # Carry the verbatim c. HGVS so annotate need not reconstruct it (it can't: its + # reconstructor only handles g./p. frames). + if not pre_map and layer is AnnotationLayer.CDNA: + allele.expressions = [Expression(syntax=Syntax.HGVS_C, value=hgvs_string)] + allele.id = identify_allele(allele) alleles.append(allele) diff --git a/tests/test_annotate_target_mapping.py b/tests/test_annotate_target_mapping.py index e73664a..06aaa0a 100644 --- a/tests/test_annotate_target_mapping.py +++ b/tests/test_annotate_target_mapping.py @@ -1,12 +1,23 @@ """Tests for annotate._reference_accession_for_target_level and build_scoreset_mapping.""" -from unittest.mock import patch +from unittest.mock import MagicMock, patch from cool_seq_tool.schemas import AnnotationLayer +from ga4gh.vrs._internal.models import ( + Allele, + Expression, + LiteralSequenceExpression, + SequenceLocation, + SequenceReference, + Syntax, +) from dcd_mapping.annotate import ( _align_result_for_target, + _annotate_allele_mapping, + _pick_preferred_layer, _reference_accession_for_target_level, + _resolve_outcome, _stamp_alignment_locus_flags, build_scoreset_mapping, ) @@ -14,6 +25,8 @@ AlignmentQc, AlignmentResult, GeneInfo, + MappedScore, + MappingOutcome, ScoreAnnotation, ScoresetMapping, ScoresetMetadata, @@ -165,10 +178,11 @@ def _make_annotation( layer: AnnotationLayer | None, post_mapped=None, error_message: str | None = None, + mavedb_id: str = "urn:mavedb:00000001-a-1#1", ) -> ScoreAnnotation: """Minimal ScoreAnnotation for golden test.""" return ScoreAnnotation( - mavedb_id="urn:mavedb:00000001-a-1#1", + mavedb_id=mavedb_id, alignment_level=layer, pre_mapped=None, post_mapped=post_mapped, @@ -177,6 +191,237 @@ def _make_annotation( ) +class TestResolveOutcome: + """Every emitted annotation is typed uniformly. + + Projected records keep their explicit outcome; measured/legacy records derive one so + consumers can treat all levels alike. + """ + + def _ms(self, *, post_mapped=None, outcome=None) -> MappedScore: + return MappedScore( + accession_id="urn:mavedb:00000001-a-1#1", + score=None, + post_mapped=post_mapped, + outcome=outcome, + ) + + def _allele(self) -> Allele: + return Allele( + location=SequenceLocation( + sequenceReference=SequenceReference(refgetAccession="SQ." + "A" * 32), + start=0, + end=1, + ), + state=LiteralSequenceExpression(sequence="A"), + ) + + def test_explicit_projected_outcome_preserved(self): + for outcome in ( + MappingOutcome.INTRONIC, + MappingOutcome.NO_PROTEIN_CONSEQUENCE, + MappingOutcome.FAILED, + MappingOutcome.MAPPED, + ): + assert _resolve_outcome(self._ms(outcome=outcome)) == outcome + + def test_measured_success_derives_mapped(self): + assert ( + _resolve_outcome(self._ms(post_mapped=self._allele())) + == MappingOutcome.MAPPED + ) + + def test_measured_failure_derives_failed(self): + assert _resolve_outcome(self._ms()) == MappingOutcome.FAILED + + +class TestCarriedCodingExpression: + """Coding records keep their carried c. HGVS; annotate must not reconstruct it (the + reconstructor handles only g./p. and would emit ``NM_…:g.delins…``). + """ + + def _allele(self, *, layer_expr: str | None) -> Allele: + allele = Allele( + location=SequenceLocation( + sequenceReference=SequenceReference(refgetAccession="SQ." + "B" * 32), + start=7006, + end=7007, + ), + state=LiteralSequenceExpression(sequence="T"), + ) + if layer_expr is not None: + allele.expressions = [Expression(syntax=Syntax.HGVS_C, value=layer_expr)] + return allele + + def _ms(self, allele: Allele, layer: AnnotationLayer) -> MappedScore: + return MappedScore( + accession_id="urn:mavedb:00000001-a-1#1", + score=None, + alignment_level=layer, + pre_mapped=allele, + post_mapped=allele, + ) + + def test_cdna_keeps_carried_expression_without_reconstructing(self): + allele = self._allele(layer_expr="NM_000059.4:c.7007G>T") + mapped_score = self._ms(allele, AnnotationLayer.CDNA) + seqrepo = MagicMock() + seqrepo.get_sequence.return_value = "G" + with ( + patch("dcd_mapping.annotate.get_seqrepo", return_value=seqrepo), + patch("dcd_mapping.annotate._get_vrs_ref_allele_seq", return_value=None), + # If reconstruction were attempted for a coding record, that is the bug. + patch( + "dcd_mapping.annotate._get_hgvs_string", + side_effect=AssertionError("coding records must not be reconstructed"), + ), + ): + annotation = _annotate_allele_mapping( + mapped_score, + None, + _make_acc_target("NM_000059.4"), + "urn:mavedb:00000001-a-1", + ) + expr = annotation.post_mapped.expressions[0] + assert expr.value == "NM_000059.4:c.7007G>T" + assert expr.syntax == Syntax.HGVS_C.value + + def test_genomic_without_expression_is_reconstructed(self): + allele = self._allele(layer_expr=None) + mapped_score = self._ms(allele, AnnotationLayer.GENOMIC) + seqrepo = MagicMock() + seqrepo.get_sequence.return_value = "G" + with ( + patch("dcd_mapping.annotate.get_seqrepo", return_value=seqrepo), + patch("dcd_mapping.annotate._get_vrs_ref_allele_seq", return_value=None), + patch( + "dcd_mapping.annotate.get_chromosome_identifier_from_vrs_id", + return_value="NC_000013.11", + ), + patch( + "dcd_mapping.annotate._get_hgvs_string", + return_value=("NC_000013.11:g.32346896G>T", Syntax.HGVS_G), + ) as get_hgvs, + ): + annotation = _annotate_allele_mapping( + mapped_score, + None, + _make_acc_target("NC_000013.11"), + "urn:mavedb:00000001-a-1", + ) + get_hgvs.assert_called_once() + assert ( + annotation.post_mapped.expressions[0].value == "NC_000013.11:g.32346896G>T" + ) + + +class TestPickPreferredLayer: + """The preferred layer is the assay level, derived from the target's input form.""" + + def test_nc_accession_is_genomic(self): + target = _make_acc_target("NC_000001.11") + assert _pick_preferred_layer(target, []) == AnnotationLayer.GENOMIC + + def test_nm_accession_is_cdna(self): + target = _make_acc_target("NM_000001.1") + assert _pick_preferred_layer(target, []) == AnnotationLayer.CDNA + + def test_enst_accession_is_cdna(self): + target = _make_acc_target("ENST00000123456.1") + assert _pick_preferred_layer(target, []) == AnnotationLayer.CDNA + + def test_np_accession_is_protein(self): + target = _make_acc_target("NP_000001.1") + assert _pick_preferred_layer(target, []) == AnnotationLayer.PROTEIN + + def test_sequence_based_genomic_when_genomic_mapping_present(self): + target = _make_seq_target() + mappings = [ + _make_annotation(AnnotationLayer.GENOMIC), + _make_annotation(AnnotationLayer.PROTEIN), + ] + assert _pick_preferred_layer(target, mappings) == AnnotationLayer.GENOMIC + + def test_sequence_based_protein_when_no_genomic_mapping(self): + target = _make_seq_target() + mappings = [_make_annotation(AnnotationLayer.PROTEIN)] + assert _pick_preferred_layer(target, mappings) == AnnotationLayer.PROTEIN + + +class TestProjectedLayerRouting: + """The deterministic projected layers are routed by ``preferred_layer_only``. + + A single input variant projected to g./c./p. is suppressed down to its assay layer + for the API (one mapped score per variant) and emitted in full for the CLI. + """ + + def _run(self, *, preferred_layer_only: bool): + # One NC_ (genomic-assay) variant present at all three deterministic layers. + metadata = ScoresetMetadata( + urn="urn:mavedb:00000001-a-1", + target_genes={"GENE1": _make_acc_target("NC_000001.11")}, + ) + mappings = { + "GENE1": [ + _make_annotation(AnnotationLayer.GENOMIC), + _make_annotation(AnnotationLayer.CDNA), + _make_annotation(AnnotationLayer.PROTEIN), + ] + } + with ( + patch( + "dcd_mapping.annotate.get_vrs_id_from_identifier", + return_value="ga4gh:SQ.test", + ), + patch( + "dcd_mapping.annotate.get_chromosome_identifier", + return_value="NC_000001.11", + ), + patch( + "dcd_mapping.annotate._get_computed_reference_sequence", + return_value=None, + ), + patch( + "dcd_mapping.annotate._get_mapped_reference_sequence", return_value=None + ), + ): + return build_scoreset_mapping( + metadata=metadata, + raw_metadata={}, + mappings=mappings, + align_results={"GENE1": _make_align()}, + tx_output={"GENE1": _make_tx()}, + gene_info={"GENE1": GeneInfo(hgnc_symbol="GENE1")}, + preferred_layer_only=preferred_layer_only, + vrs_version=VrsVersion.V_2, + ) + + def test_api_keeps_only_the_assay_layer(self): + result = self._run(preferred_layer_only=True) + levels = [ms.alignment_level for ms in result.mapped_scores] + # Exactly one mapped score, at the assay (genomic) level -- the projected + # cdna/protein layers are suppressed as variants. + assert levels == [AnnotationLayer.GENOMIC] + + def test_cli_emits_every_deterministic_layer(self): + result = self._run(preferred_layer_only=False) + levels = {ms.alignment_level for ms in result.mapped_scores} + assert levels == { + AnnotationLayer.GENOMIC, + AnnotationLayer.CDNA, + AnnotationLayer.PROTEIN, + } + # Every mapped score still resolves to a TargetMapping (orphan invariant). + tm_keys = { + (tm.target_gene_identifier, tm.alignment_level) + for tm in result.target_mappings + } + assert all( + (ms.target_gene_identifier, ms.alignment_level) in tm_keys + for ms in result.mapped_scores + ) + + class TestBuildScoresetMapping: def test_emits_one_target_mapping_per_layer(self): """Each (target, layer) pair appearing in mappings generates exactly one TargetMapping.""" @@ -201,7 +446,7 @@ def test_emits_one_target_mapping_per_layer(self): return_value="NC_000001.11", ), patch( - "dcd_mapping.annotate._pick_preferred_genomic_or_protein_layer", + "dcd_mapping.annotate._pick_preferred_layer", return_value=AnnotationLayer.GENOMIC, ), patch( @@ -228,8 +473,125 @@ def test_emits_one_target_mapping_per_layer(self): layers_seen = {tm.alignment_level for tm in result.target_mappings} assert AnnotationLayer.GENOMIC in layers_seen assert AnnotationLayer.PROTEIN in layers_seen - # Exactly one mapping per layer - assert len(result.target_mappings) == 2 + # A coding target with a selected transcript also emits an identity cdna + # TargetMapping for the unscored cdna layer (carrying the transcript's nm), + # so the response surfaces the coding transcript even though no per-variant + # cdna mappings were produced. + assert AnnotationLayer.CDNA in layers_seen + assert len(result.target_mappings) == 3 + cdna_tm = next( + tm + for tm in result.target_mappings + if tm.alignment_level == AnnotationLayer.CDNA + ) + assert cdna_tm.reference_accession == "NM_000001.1" + assert cdna_tm.preferred is False + # Identity row: no mapped_scores join it, so QC/counts are null. + assert cdna_tm.total_variants is None + assert cdna_tm.variants_mapped_cleanly is None + assert cdna_tm.percent_identity is None + + def test_nc_coding_target_emits_identity_cdna_and_protein_rows(self): + """A genomic-accession (NC_) coding target emits its genomic scored layer plus + identity cdna and protein TargetMappings (carrying the selected transcript's + nm/np with null QC), so the projection transcript is surfaced for RT. + """ + metadata = ScoresetMetadata( + urn="urn:mavedb:00000001-a-1", + target_genes={"GENE1": _make_acc_target("NC_000007.14")}, + ) + # NC_ coding targets emit only genomic per-variant mappings; the cdna/protein + # projection forms are filtered, so those layers never appear in layers_seen. + mappings = {"GENE1": [_make_annotation(AnnotationLayer.GENOMIC)]} + + with ( + patch( + "dcd_mapping.annotate.get_vrs_id_from_identifier", + return_value="ga4gh:SQ.test", + ), + patch( + "dcd_mapping.annotate.get_chromosome_identifier", + return_value="refseq:NC_000007.14", + ), + patch( + "dcd_mapping.annotate._pick_preferred_layer", + return_value=AnnotationLayer.GENOMIC, + ), + patch( + "dcd_mapping.annotate._get_computed_reference_sequence", + return_value=None, + ), + patch( + "dcd_mapping.annotate._get_mapped_reference_sequence", return_value=None + ), + ): + result = build_scoreset_mapping( + metadata=metadata, + raw_metadata={}, + mappings=mappings, + align_results={"GENE1": None}, + tx_output={"GENE1": _make_tx(nm="NM_004333.6", np="NP_004324.2")}, + gene_info={"GENE1": GeneInfo(hgnc_symbol="BRAF")}, + preferred_layer_only=False, + vrs_version=VrsVersion.V_2, + ) + + by_level = {tm.alignment_level: tm for tm in result.target_mappings} + assert AnnotationLayer.GENOMIC in by_level # scored, assay-level + assert AnnotationLayer.CDNA in by_level + assert AnnotationLayer.PROTEIN in by_level + + cdna_tm = by_level[AnnotationLayer.CDNA] + assert cdna_tm.reference_accession == "NM_004333.6" + assert cdna_tm.preferred is False + assert cdna_tm.total_variants is None # identity row: no scores join it + assert cdna_tm.percent_identity is None + + protein_tm = by_level[AnnotationLayer.PROTEIN] + assert protein_tm.reference_accession == "NP_004324.2" + assert protein_tm.total_variants is None + + def test_regulatory_nc_target_emits_no_identity_rows(self): + """A non-coding (regulatory) target gets no cdna/protein identity rows.""" + target = _make_acc_target("NC_000007.14") + target.target_gene_category = TargetType.REGULATORY + metadata = ScoresetMetadata( + urn="urn:mavedb:00000001-a-1", target_genes={"GENE1": target} + ) + mappings = {"GENE1": [_make_annotation(AnnotationLayer.GENOMIC)]} + + with ( + patch("dcd_mapping.annotate.get_vrs_id_from_identifier", return_value=None), + patch( + "dcd_mapping.annotate.get_chromosome_identifier", + return_value="refseq:NC_000007.14", + ), + patch( + "dcd_mapping.annotate._pick_preferred_layer", + return_value=AnnotationLayer.GENOMIC, + ), + patch( + "dcd_mapping.annotate._get_computed_reference_sequence", + return_value=None, + ), + patch( + "dcd_mapping.annotate._get_mapped_reference_sequence", return_value=None + ), + ): + result = build_scoreset_mapping( + metadata=metadata, + raw_metadata={}, + mappings=mappings, + align_results={"GENE1": None}, + tx_output={"GENE1": None}, + gene_info={"GENE1": GeneInfo(hgnc_symbol=None)}, + preferred_layer_only=False, + vrs_version=VrsVersion.V_2, + ) + + levels = {tm.alignment_level for tm in result.target_mappings} + assert AnnotationLayer.CDNA not in levels + assert AnnotationLayer.PROTEIN not in levels def test_preferred_flag_on_exactly_one_row_per_target(self): """preferred=True must appear on exactly one TargetMapping per target.""" @@ -247,7 +609,7 @@ def test_preferred_flag_on_exactly_one_row_per_target(self): return_value="NC_000001.11", ), patch( - "dcd_mapping.annotate._pick_preferred_genomic_or_protein_layer", + "dcd_mapping.annotate._pick_preferred_layer", return_value=AnnotationLayer.GENOMIC, ), patch( @@ -273,8 +635,14 @@ def test_preferred_flag_on_exactly_one_row_per_target(self): assert len(preferred) == 1 assert preferred[0].alignment_level == AnnotationLayer.GENOMIC - def test_annotation_qc_counts_sum_correctly(self): - """total_variants == clean + warnings + failed.""" + def test_annotation_qc_counts_relate_correctly(self): + """Clean + failed == total; alignment warnings are a sub-count of clean. + + ``variants_mapped_cleanly``/``variants_failed`` partition on whether a + ``post_mapped`` allele exists. ``variants_with_alignment_warnings`` is a separate, + overlapping sub-count driven by alignment-locus flags (near_gap / + at_mismatched_locus) -- a mapped variant can be both clean and flagged. + """ from ga4gh.vrs._internal.models import ( Allele, LiteralSequenceExpression, @@ -299,11 +667,24 @@ def test_annotation_qc_counts_sum_correctly(self): annotations = [ _make_annotation(AnnotationLayer.GENOMIC, post_mapped=allele), # clean _make_annotation( - AnnotationLayer.GENOMIC, post_mapped=allele, error_message="warn" - ), # warning - _make_annotation(AnnotationLayer.GENOMIC), # failed + AnnotationLayer.GENOMIC, post_mapped=allele + ), # clean + flag + _make_annotation(AnnotationLayer.GENOMIC), # failed (no post_mapped) ] + # Warnings come from alignment-locus flags, not error_message. Stamp exactly one + # mapped variant as near a gap so we can assert it counts as both clean and warned. + def _stamp_one_near_gap( + anns: list[ScoreAnnotation], *_args: object, **_kwargs: object + ) -> None: + marked = False + for ann in anns: + ann.at_mismatched_locus = False + ann.near_gap = False + if ann.post_mapped is not None and not marked: + ann.near_gap = True + marked = True + with ( patch("dcd_mapping.annotate.get_vrs_id_from_identifier", return_value=None), patch( @@ -311,7 +692,7 @@ def test_annotation_qc_counts_sum_correctly(self): return_value="NC_000001.11", ), patch( - "dcd_mapping.annotate._pick_preferred_genomic_or_protein_layer", + "dcd_mapping.annotate._pick_preferred_layer", return_value=AnnotationLayer.GENOMIC, ), patch( @@ -321,6 +702,10 @@ def test_annotation_qc_counts_sum_correctly(self): patch( "dcd_mapping.annotate._get_mapped_reference_sequence", return_value=None ), + patch( + "dcd_mapping.annotate._stamp_alignment_locus_flags", + _stamp_one_near_gap, + ), ): result = build_scoreset_mapping( metadata=metadata, @@ -336,14 +721,18 @@ def test_annotation_qc_counts_sum_correctly(self): assert result.target_mappings tm = result.target_mappings[0] assert tm.total_variants == 3 - assert tm.variants_mapped_cleanly == 1 - assert tm.variants_with_mapping_warnings == 1 - assert tm.variants_failed == 1 + assert tm.variants_failed == 1 # only the row without a post_mapped allele + assert tm.variants_mapped_cleanly == 2 # both mapped rows, flagged or not + assert tm.variants_with_alignment_warnings == 1 # the near_gap row + # clean and failed partition the total; warnings overlap clean, not the total. assert ( - (tm.variants_mapped_cleanly or 0) - + (tm.variants_with_mapping_warnings or 0) - + (tm.variants_failed or 0) + (tm.variants_mapped_cleanly or 0) + (tm.variants_failed or 0) ) == tm.total_variants + assert ( + 0 + <= (tm.variants_with_alignment_warnings or 0) + <= (tm.variants_mapped_cleanly or 0) + ) def test_tool_parameters_for_sequence_based_target(self): """tool_parameters should contain BLAT aligner key for sequence-based targets.""" @@ -360,7 +749,7 @@ def test_tool_parameters_for_sequence_based_target(self): return_value="NC_000001.11", ), patch( - "dcd_mapping.annotate._pick_preferred_genomic_or_protein_layer", + "dcd_mapping.annotate._pick_preferred_layer", return_value=AnnotationLayer.GENOMIC, ), patch( @@ -398,7 +787,7 @@ def test_tool_parameters_for_accession_based_target(self): with ( patch("dcd_mapping.annotate.get_vrs_id_from_identifier", return_value=None), patch( - "dcd_mapping.annotate._pick_preferred_genomic_or_protein_layer", + "dcd_mapping.annotate._pick_preferred_layer", return_value=AnnotationLayer.CDNA, ), patch( @@ -436,7 +825,7 @@ def test_tool_parameters_for_nc_accession_target(self): with ( patch("dcd_mapping.annotate.get_vrs_id_from_identifier", return_value=None), patch( - "dcd_mapping.annotate._pick_preferred_genomic_or_protein_layer", + "dcd_mapping.annotate._pick_preferred_layer", return_value=AnnotationLayer.GENOMIC, ), patch( @@ -491,7 +880,7 @@ def test_mapped_scores_alignment_levels_subset_of_target_mappings(self): return_value="NC_000001.11", ), patch( - "dcd_mapping.annotate._pick_preferred_genomic_or_protein_layer", + "dcd_mapping.annotate._pick_preferred_layer", return_value=AnnotationLayer.GENOMIC, ), patch( @@ -532,8 +921,12 @@ def test_null_layer_failures_attributed_to_preferred_layer(self): urn="urn:mavedb:00000001-a-1", target_genes={"GENE1": _make_seq_target("GENE1")}, ) - g_ann = _make_annotation(AnnotationLayer.GENOMIC) - null_ann = _make_annotation(None) # completely failed + g_ann = _make_annotation( + AnnotationLayer.GENOMIC, mavedb_id="urn:mavedb:00000001-a-1#1" + ) + null_ann = _make_annotation( + None, mavedb_id="urn:mavedb:00000001-a-1#2" + ) # completely failed, distinct variant with ( patch("dcd_mapping.annotate.get_vrs_id_from_identifier", return_value=None), @@ -542,7 +935,7 @@ def test_null_layer_failures_attributed_to_preferred_layer(self): return_value="NC_000001.11", ), patch( - "dcd_mapping.annotate._pick_preferred_genomic_or_protein_layer", + "dcd_mapping.annotate._pick_preferred_layer", return_value=AnnotationLayer.GENOMIC, ), patch( @@ -586,9 +979,17 @@ def test_preferred_layer_only_total_variants_includes_null_failures(self): urn="urn:mavedb:00000001-a-1", target_genes={"GENE1": _make_seq_target("GENE1")}, ) - # 3 genomic successes + 2 completely-failed variants - g_anns = [_make_annotation(AnnotationLayer.GENOMIC) for _ in range(3)] - null_anns = [_make_annotation(None) for _ in range(2)] + # 3 genomic successes + 2 completely-failed variants, each a distinct variant + g_anns = [ + _make_annotation( + AnnotationLayer.GENOMIC, mavedb_id=f"urn:mavedb:00000001-a-1#{i}" + ) + for i in range(1, 4) + ] + null_anns = [ + _make_annotation(None, mavedb_id=f"urn:mavedb:00000001-a-1#{i}") + for i in range(4, 6) + ] with ( patch("dcd_mapping.annotate.get_vrs_id_from_identifier", return_value=None), @@ -597,7 +998,7 @@ def test_preferred_layer_only_total_variants_includes_null_failures(self): return_value="NC_000001.11", ), patch( - "dcd_mapping.annotate._pick_preferred_genomic_or_protein_layer", + "dcd_mapping.annotate._pick_preferred_layer", return_value=AnnotationLayer.GENOMIC, ), patch( @@ -828,7 +1229,7 @@ def test_cdot_data_version_propagates_when_present(self): with ( patch("dcd_mapping.annotate.get_vrs_id_from_identifier", return_value=None), patch( - "dcd_mapping.annotate._pick_preferred_genomic_or_protein_layer", + "dcd_mapping.annotate._pick_preferred_layer", return_value=AnnotationLayer.CDNA, ), patch( @@ -854,3 +1255,181 @@ def test_cdot_data_version_propagates_when_present(self): assert tm.tool_parameters is not None assert "cdot_data_version" in tm.tool_parameters assert tm.tool_parameters["cdot_data_version"] == "0.2.26" + + +class TestNullFailureDedup: + """A completely-failed (null-layer) record is re-attributed to the preferred layer + only when the variant is not already represented there. + + The motivating case is a codon-optimized (e.g. yeast-expressed) target: its genomic + mapping fails wholesale, the preferred layer falls back to PROTEIN, and a variant's + dead genomic attempt must not duplicate its measured protein record -- which would + emit two mapped_scores for one input variant (and over-count the protein QC row). + """ + + def _protein_allele(self) -> Allele: + return Allele( + location=SequenceLocation( + sequenceReference=SequenceReference(refgetAccession="SQ." + "A" * 32), + start=0, + end=1, + ), + state=LiteralSequenceExpression(sequence="A"), + ) + + def _run(self, mappings: dict[str, list[ScoreAnnotation]]): + metadata = ScoresetMetadata( + urn="urn:mavedb:00000001-a-1", + target_genes={"GENE1": _make_seq_target("GENE1")}, + ) + with ( + patch("dcd_mapping.annotate.get_vrs_id_from_identifier", return_value=None), + patch( + "dcd_mapping.annotate.get_chromosome_identifier", + return_value="NC_000001.11", + ), + patch( + "dcd_mapping.annotate._pick_preferred_layer", + return_value=AnnotationLayer.PROTEIN, + ), + patch( + "dcd_mapping.annotate._get_computed_reference_sequence", + return_value=None, + ), + patch( + "dcd_mapping.annotate._get_mapped_reference_sequence", return_value=None + ), + ): + return build_scoreset_mapping( + metadata=metadata, + raw_metadata={}, + mappings=mappings, + align_results={"GENE1": _make_align()}, + tx_output={"GENE1": None}, + gene_info={"GENE1": None}, + preferred_layer_only=True, + vrs_version=VrsVersion.V_2, + ) + + def test_failing_genomic_does_not_duplicate_measured_protein(self): + variant = "urn:mavedb:00000001-a-1#1" + mappings = { + "GENE1": [ + # Dead genomic attempt (null layer) and the measured protein record, + # same input variant. + _make_annotation(None, mavedb_id=variant), + _make_annotation( + AnnotationLayer.PROTEIN, + post_mapped=self._protein_allele(), + mavedb_id=variant, + ), + ] + } + result = self._run(mappings) + + # Exactly one mapped_score, the protein record -- not the re-attributed failure. + assert len(result.mapped_scores) == 1 + assert result.mapped_scores[0].alignment_level == AnnotationLayer.PROTEIN + + protein_tm = next( + tm + for tm in result.target_mappings + if tm.alignment_level == AnnotationLayer.PROTEIN + ) + # The variant counts once, as a clean map -- the dead genomic attempt is not + # folded back in as a phantom failure. + assert protein_tm.total_variants == 1 + assert protein_tm.variants_failed == 0 + assert protein_tm.variants_mapped_cleanly == 1 + + def test_completely_failed_variant_reattributed_without_orphan(self): + variant = "urn:mavedb:00000001-a-1#1" + # Only a null-layer failure: nothing represents the variant at the preferred + # layer, so it must be re-attributed there and still get a parent TargetMapping. + mappings = {"GENE1": [_make_annotation(None, mavedb_id=variant)]} + result = self._run(mappings) + + assert len(result.mapped_scores) == 1 + assert result.mapped_scores[0].alignment_level == AnnotationLayer.PROTEIN + + protein_tm = next( + tm + for tm in result.target_mappings + if tm.alignment_level == AnnotationLayer.PROTEIN + ) + assert protein_tm.total_variants == 1 + assert protein_tm.variants_failed == 1 + + # Orphan invariant: the re-attributed score resolves to a TargetMapping. + tm_keys = { + (tm.target_gene_identifier, tm.alignment_level) + for tm in result.target_mappings + } + assert ( + result.mapped_scores[0].target_gene_identifier, + result.mapped_scores[0].alignment_level, + ) in tm_keys + + def test_non_preferred_success_synthesizes_preferred_failure(self): + """A variant whose only record is a success at a non-preferred layer (a wild-type + p.= on a genomic-preferred target) still needs one preferred-layer record. It gets + a synthesized failure there; the off-layer success survives only in CLI output. + """ + variant = "urn:mavedb:00000001-a-1#1" + mappings = { + "GENE1": [ + _make_annotation( + AnnotationLayer.PROTEIN, + post_mapped=self._protein_allele(), + mavedb_id=variant, + ) + ] + } + metadata = ScoresetMetadata( + urn="urn:mavedb:00000001-a-1", + target_genes={"GENE1": _make_seq_target("GENE1")}, + ) + with ( + patch("dcd_mapping.annotate.get_vrs_id_from_identifier", return_value=None), + patch( + "dcd_mapping.annotate.get_chromosome_identifier", + return_value="NC_000001.11", + ), + patch( + "dcd_mapping.annotate._pick_preferred_layer", + return_value=AnnotationLayer.GENOMIC, + ), + patch( + "dcd_mapping.annotate._get_computed_reference_sequence", + return_value=None, + ), + patch( + "dcd_mapping.annotate._get_mapped_reference_sequence", return_value=None + ), + ): + result = build_scoreset_mapping( + metadata=metadata, + raw_metadata={}, + mappings=mappings, + align_results={"GENE1": _make_align()}, + tx_output={"GENE1": None}, + gene_info={"GENE1": None}, + preferred_layer_only=True, + vrs_version=VrsVersion.V_2, + ) + + # Exactly one mapped_score: a synthesized failure at the preferred (genomic) layer. + assert len(result.mapped_scores) == 1 + synth = result.mapped_scores[0] + assert synth.alignment_level == AnnotationLayer.GENOMIC + assert synth.post_mapped is None + assert synth.error_message is not None + assert "preferred layer" in synth.error_message + + genomic_tm = next( + tm + for tm in result.target_mappings + if tm.alignment_level == AnnotationLayer.GENOMIC + ) + assert genomic_tm.total_variants == 1 + assert genomic_tm.variants_failed == 1 diff --git a/tests/test_genomic_accession_transcript.py b/tests/test_genomic_accession_transcript.py new file mode 100644 index 0000000..c2825dc --- /dev/null +++ b/tests/test_genomic_accession_transcript.py @@ -0,0 +1,339 @@ +"""Tests for genomic-accession (NC_) coding-target transcript selection. + +Covers the path added so that genomic-accession coding targets resolve a coding +(MANE) transcript -- gene inferred from the variants' genomic loci (preferred) or +the declared target metadata (fallback) -- which the mapper surfaces as cdna target +metadata for reverse translation. See ``dcd_mapping.transcripts``. +""" + +from unittest.mock import patch + +import pytest +from cool_seq_tool.schemas import TranscriptPriority + +from dcd_mapping.exceptions import NoCodingTranscriptError +from dcd_mapping.lookup import infer_hgnc_symbol_from_genomic_loci +from dcd_mapping.schemas import ( + ManeDescription, + ScoreRow, + ScoresetMetadata, + TargetGene, + TargetType, + TxSelectResult, +) +from dcd_mapping.transcripts import ( + _genomic_positions_from_records, + _select_genomic_accession_reference, + select_transcripts, +) + +MODULE = "dcd_mapping.transcripts" +LOOKUP = "dcd_mapping.lookup" + + +def _mane( + nm: str = "NM_004333.6", + np: str = "NP_004324.2", + symbol: str = "BRAF", + priority: TranscriptPriority = TranscriptPriority.MANE_SELECT, +) -> ManeDescription: + return ManeDescription( + refseq_nuc=nm, + refseq_prot=np, + transcript_priority=priority, + ncbi_gene_id="673", + ensembl_gene_id="ENSG00000157764", + hgnc_gene_id="HGNC:1097", + symbol=symbol, + name="B-Raf proto-oncogene", + ensembl_nuc="ENST00000646891.2", + ensembl_prot="ENSP00000493543.1", + grch38_chr="7", + chr_start=140730665, + chr_end=140924764, + chr_strand="-", + ) + + +def _nc_coding_target( + accession: str = "NC_000007.14", name: str = "BRAF" +) -> TargetGene: + return TargetGene( + target_gene_name=name, + target_gene_category=TargetType.PROTEIN_CODING, + target_accession_id=accession, + target_accession_assembly="GRCh38", + ) + + +def _row(hgvs_nt: str) -> ScoreRow: + return ScoreRow(hgvs_nt=hgvs_nt, hgvs_pro="_wt", score="1.0", accession="urn#1") + + +class TestSelectGenomicAccessionReference: + def test_inferred_gene_preferred(self): + """The gene inferred from genomic loci is preferred and drives MANE selection.""" + target = _nc_coding_target() + with ( + patch( + f"{MODULE}.infer_hgnc_symbol_from_genomic_loci", return_value="BRAF" + ) as infer, + patch(f"{MODULE}.get_gene_symbol") as declared, + patch( + f"{MODULE}.get_mane_transcripts_for_gene", return_value=[_mane()] + ) as mane, + ): + result = _select_genomic_accession_reference(target, [_row("g.123A>G")]) + + assert isinstance(result, TxSelectResult) + assert result.nm == "NM_004333.6" + assert result.np == "NP_004324.2" + assert result.hgnc_symbol == "BRAF" + assert result.transcript_mode == TranscriptPriority.MANE_SELECT + infer.assert_called_once() + mane.assert_called_once_with("BRAF") + # Declared-metadata normalization is not consulted when inference succeeds. + declared.assert_not_called() + + def test_falls_back_to_declared_gene(self): + """When locus inference yields nothing, fall back to target-metadata gene.""" + target = _nc_coding_target(name="Wildtype BRAF") + with ( + patch(f"{MODULE}.infer_hgnc_symbol_from_genomic_loci", return_value=None), + patch(f"{MODULE}.get_gene_symbol", return_value="BRAF") as declared, + patch(f"{MODULE}.get_mane_transcripts_for_gene", return_value=[_mane()]), + ): + result = _select_genomic_accession_reference(target, [_row("g.123A>G")]) + + assert result.nm == "NM_004333.6" + declared.assert_called_once() + + def test_no_gene_raises_no_coding_transcript(self): + """No resolvable gene -> typed NoCodingTranscriptError (recoverable skip).""" + target = _nc_coding_target(name="mystery element") + with ( + patch(f"{MODULE}.infer_hgnc_symbol_from_genomic_loci", return_value=None), + patch(f"{MODULE}.get_gene_symbol", return_value=None), + pytest.raises(NoCodingTranscriptError), + ): + _select_genomic_accession_reference(target, [_row("g.123A>G")]) + + def test_no_mane_raises_no_coding_transcript(self): + """Gene resolves but has no MANE transcript -> NoCodingTranscriptError.""" + target = _nc_coding_target() + with ( + patch(f"{MODULE}.infer_hgnc_symbol_from_genomic_loci", return_value="BRAF"), + patch(f"{MODULE}.get_mane_transcripts_for_gene", return_value=[]), + pytest.raises(NoCodingTranscriptError), + ): + _select_genomic_accession_reference(target, [_row("g.123A>G")]) + + def test_mane_plus_clinical_secondary(self): + """MANE Plus Clinical is selected when no MANE Select is present.""" + target = _nc_coding_target() + plus = _mane(priority=TranscriptPriority.MANE_PLUS_CLINICAL) + with ( + patch(f"{MODULE}.infer_hgnc_symbol_from_genomic_loci", return_value="BRAF"), + patch(f"{MODULE}.get_mane_transcripts_for_gene", return_value=[plus]), + ): + result = _select_genomic_accession_reference(target, [_row("g.123A>G")]) + assert result.transcript_mode == TranscriptPriority.MANE_PLUS_CLINICAL + + +class TestInferHgncSymbolFromGenomicLoci: + def _feature( + self, + name: str, + start: int = 0, + end: int = 10**9, + biotype: str = "protein_coding", + ) -> dict: + return { + "external_name": name, + "feature_type": "gene", + "start": start, + "end": end, + "biotype": biotype, + } + + def test_returns_gene_containing_loci_with_one_request(self): + """A single Ensembl request over the bounding span resolves the gene -- not one + request per position. + """ + with ( + patch( + f"{LOOKUP}.get_overlapping_features_for_region", + return_value=[self._feature("BARD1", 214725646, 214809707)], + ) as overlap, + patch(f"{LOOKUP}._get_hgnc_symbol", return_value="BARD1"), + ): + result = infer_hgnc_symbol_from_genomic_loci( + "NC_000002.12", [214728639, 214728788, 214728789, 214728789] + ) + assert result == "BARD1" + # One request for the whole target, regardless of variant/duplicate count. + overlap.assert_called_once() + + def test_picks_gene_containing_most_loci(self): + """When the bounding span overlaps several genes, the one containing the most + query loci wins -- resolved locally from coordinates, no extra requests. + """ + with ( + patch( + f"{LOOKUP}.get_overlapping_features_for_region", + return_value=[ + self._feature("MAINGENE", 100, 200), + self._feature( + "EDGEGENE", 199, 1000 + ), # contains only the last locus + ], + ), + patch(f"{LOOKUP}._get_hgnc_symbol", side_effect=lambda s: s), + ): + result = infer_hgnc_symbol_from_genomic_loci( + "NC_000002.12", [110, 120, 130, 500] + ) + assert result == "MAINGENE" + + def test_ignores_non_coding_overlapping_genes(self): + """A non-coding gene overlapping the same loci (which has no MANE transcript) is + excluded, so it neither wins nor forces a tie against the coding gene. + """ + with ( + patch( + f"{LOOKUP}.get_overlapping_features_for_region", + return_value=[ + self._feature("CODINGGENE", 0, 1000, biotype="protein_coding"), + self._feature("AS-LNCRNA", 0, 1000, biotype="lncRNA"), + ], + ), + patch(f"{LOOKUP}._get_hgnc_symbol", side_effect=lambda s: s), + ): + result = infer_hgnc_symbol_from_genomic_loci("NC_000002.12", [10, 20, 30]) + assert result == "CODINGGENE" + + def test_no_protein_coding_overlap_returns_none(self): + """Only non-coding genes overlap -> no coding gene to infer.""" + with patch( + f"{LOOKUP}.get_overlapping_features_for_region", + return_value=[self._feature("AS-LNCRNA", 0, 1000, biotype="lncRNA")], + ): + assert infer_hgnc_symbol_from_genomic_loci("NC_000002.12", [10]) is None + + def test_falls_back_to_ensembl_symbol_when_normalizer_misses(self): + """A locus-confirmed gene is not dropped just because the gene normalizer + returns nothing -- the Ensembl external_name is used directly. + """ + with ( + patch( + f"{LOOKUP}.get_overlapping_features_for_region", + return_value=[self._feature("BARD1", 214725646, 214809707)], + ), + patch(f"{LOOKUP}._get_hgnc_symbol", return_value=None), + ): + assert ( + infer_hgnc_symbol_from_genomic_loci("NC_000002.12", [214728639]) + == "BARD1" + ) + + def test_no_overlap_returns_none(self): + with patch(f"{LOOKUP}.get_overlapping_features_for_region", return_value=[]): + assert infer_hgnc_symbol_from_genomic_loci("NC_000002.12", [1]) is None + + def test_tie_returns_none(self): + # Two genes each contain the same loci equally -> no single dominant gene. + with patch( + f"{LOOKUP}.get_overlapping_features_for_region", + return_value=[ + self._feature("GENEA", 0, 100), + self._feature("GENEB", 0, 100), + ], + ): + assert infer_hgnc_symbol_from_genomic_loci("NC_000002.12", [10, 20]) is None + + def test_empty_positions_returns_none(self): + assert infer_hgnc_symbol_from_genomic_loci("NC_000002.12", []) is None + + def test_span_too_wide_skips_query(self): + """A bounding span too wide to be one gene declines inference without firing a + doomed oversized Ensembl request. + """ + with patch(f"{LOOKUP}.get_overlapping_features_for_region") as overlap: + result = infer_hgnc_symbol_from_genomic_loci( + "NC_000002.12", [1, 1 + 6_000_000] + ) + assert result is None + overlap.assert_not_called() + + +class TestGenomicPositionsFromRecords: + def test_extracts_and_skips(self): + rows = [ + _row("g.140753336A>T"), + _row("_wt"), + _row("_sy"), + _row("="), + ] + positions = _genomic_positions_from_records(rows) + # 1-based 140753336 -> 0-based 140753335; reference/special rows skipped. + assert positions == [140753335] + + def test_unparseable_rows_skipped(self): + positions = _genomic_positions_from_records([_row("not-a-variant")]) + assert positions == [] + + +class TestSelectTranscriptsRouting: + def _metadata(self, target: TargetGene) -> ScoresetMetadata: + return ScoresetMetadata( + urn="urn:mavedb:00000001-a-1", target_genes={"T": target} + ) + + @pytest.mark.asyncio + async def test_nc_coding_routes_to_genomic_accession_path(self): + target = _nc_coding_target() + metadata = self._metadata(target) + with patch( + f"{MODULE}._select_genomic_accession_reference", + return_value=TxSelectResult( + nm="NM_004333.6", + np="NP_004324.2", + start=0, + is_full_match=True, + sequence="", + transcript_mode=TranscriptPriority.MANE_SELECT, + hgnc_symbol="BRAF", + ), + ) as sel: + result = await select_transcripts( + metadata, {"T": [_row("g.123A>G")]}, {"T": None} + ) + assert isinstance(result["T"], TxSelectResult) + assert result["T"].nm == "NM_004333.6" + sel.assert_called_once() + + @pytest.mark.asyncio + async def test_nc_regulatory_returns_none(self): + target = _nc_coding_target() + target.target_gene_category = TargetType.REGULATORY + metadata = self._metadata(target) + with patch(f"{MODULE}._select_genomic_accession_reference") as sel: + result = await select_transcripts( + metadata, {"T": [_row("g.123A>G")]}, {"T": None} + ) + # Regulatory NC_ target: no coding transcript expected, no typed error. + assert result["T"] is None + sel.assert_not_called() + + @pytest.mark.asyncio + async def test_nc_coding_no_transcript_stores_typed_error(self): + target = _nc_coding_target() + metadata = self._metadata(target) + with patch( + f"{MODULE}._select_genomic_accession_reference", + side_effect=NoCodingTranscriptError("no gene"), + ): + result = await select_transcripts( + metadata, {"T": [_row("g.123A>G")]}, {"T": None} + ) + assert isinstance(result["T"], NoCodingTranscriptError) diff --git a/tests/test_variant_projection.py b/tests/test_variant_projection.py new file mode 100644 index 0000000..9a2461d --- /dev/null +++ b/tests/test_variant_projection.py @@ -0,0 +1,599 @@ +"""Tests for per-variant projection across assay levels. + +Covers the mapper's *projection* of a measured variant onto its own deterministic +forms -- ``g. -> c. -> p.`` for a genomic source, ``c. -> g.`` / ``c. -> p.`` for a +cdna source -- against the coding transcript, distinct from the equivalence-class +expansion the reverse-translation job owns. The projected forms are emitted alongside +the measured one and routed by ``preferred_layer_only`` (API keeps the assay layer, +CLI keeps all); :class:`~dcd_mapping.vrs_map.ProjectionOutcome` and the per-target +summary log additionally surface a mis-selected transcript. + +See ``dcd_mapping.vrs_map._construct_projected_layers`` and the projection helpers in +``dcd_mapping.lookup``. +""" + +import logging +from unittest.mock import MagicMock, patch + +import hgvs.parser +from cool_seq_tool.schemas import AnnotationLayer +from ga4gh.vrs._internal.models import ( + Allele, + LiteralSequenceExpression, + SequenceLocation, + SequenceReference, +) + +from dcd_mapping.lookup import ( + coding_hgvs_is_intronic, + get_genomic_accession_for_transcript, + project_coding_hgvs_to_genomic, + project_coding_hgvs_to_protein, + project_genomic_hgvs_to_coding, +) +from dcd_mapping.schemas import ( + MappedScore, + MappingOutcome, + ScoreRow, + TargetGene, + TargetType, + TxSelectResult, +) +from dcd_mapping.vrs_map import ( + ProjectionOutcome, + _construct_projected_layers, + _log_projection_validation, + _map_accession, +) + +VRS_MAP = "dcd_mapping.vrs_map" +LOOKUP = "dcd_mapping.lookup" + +NC = "NC_000007.14" +NM = "NM_004333.6" + + +def _row(hgvs_nt: str, accession: str = "urn:mavedb:00000001-a-1#1") -> ScoreRow: + return ScoreRow(hgvs_nt=hgvs_nt, hgvs_pro="_wt", score="1.0", accession=accession) + + +def _allele() -> Allele: + """Build a minimal, schema-valid VRS Allele for use as a construction stand-in.""" + return Allele( + location=SequenceLocation( + sequenceReference=SequenceReference(refgetAccession="SQ." + "A" * 32), + start=0, + end=1, + ), + state=LiteralSequenceExpression(sequence="A"), + ) + + +class TestCodingHgvsIsIntronic: + """Intronic detection reads the parsed base-offset, not the textual ``-``.""" + + def _patched(self): + # Real hgvs parser (pure string parsing, no network); fake out the seqrepo- + # backed builder so only the parser is exercised. + builder = MagicMock() + builder.hgvs_tools.parser = hgvs.parser.Parser() + return patch(f"{LOOKUP}.TranslatorBuilder", return_value=builder), patch( + f"{LOOKUP}.get_seqrepo" + ) + + def test_intron_offset_is_intronic(self): + tb, sr = self._patched() + with tb, sr: + assert coding_hgvs_is_intronic("NM_000051.4:c.2002-1del") is True + + def test_exonic_is_not_intronic(self): + tb, sr = self._patched() + with tb, sr: + assert coding_hgvs_is_intronic("NM_000051.4:c.76A>G") is False + + def test_five_prime_utr_dash_is_not_intronic(self): + """``c.-20A>G`` carries a textual ``-`` but offset 0 -- not intronic.""" + tb, sr = self._patched() + with tb, sr: + assert coding_hgvs_is_intronic("NM_000051.4:c.-20A>G") is False + + +class TestProjectionHelpersWiring: + """The g.->c. and c.->p. helpers route through the package hgvs VariantMapper.""" + + def test_project_genomic_to_coding(self): + builder = MagicMock() + tools = builder.hgvs_tools + tools.parser.parse.return_value = "parsed_g" + tools.variant_mapper.g_to_c.return_value = f"{NM}:c.1A>G" + with ( + patch(f"{LOOKUP}.TranslatorBuilder", return_value=builder), + patch(f"{LOOKUP}.get_seqrepo"), + ): + result = project_genomic_hgvs_to_coding(f"{NC}:g.140A>G", NM) + tools.parser.parse.assert_called_once_with(f"{NC}:g.140A>G") + tools.variant_mapper.g_to_c.assert_called_once_with("parsed_g", NM) + assert result == f"{NM}:c.1A>G" + + def test_project_coding_to_protein(self): + builder = MagicMock() + tools = builder.hgvs_tools + tools.parser.parse.return_value = "parsed_c" + tools.variant_mapper.c_to_p.return_value = "NP_004324.2:p.Lys1Glu" + with ( + patch(f"{LOOKUP}.TranslatorBuilder", return_value=builder), + patch(f"{LOOKUP}.get_seqrepo"), + ): + result = project_coding_hgvs_to_protein(f"{NM}:c.1A>G") + tools.parser.parse.assert_called_once_with(f"{NM}:c.1A>G") + tools.variant_mapper.c_to_p.assert_called_once_with("parsed_c") + assert result == "NP_004324.2:p.Lys1Glu" + + def test_project_coding_to_genomic(self): + """``c. -> g.`` routes through ``c_to_g`` with the explicit target contig.""" + builder = MagicMock() + tools = builder.hgvs_tools + tools.parser.parse.return_value = "parsed_c" + tools.variant_mapper.c_to_g.return_value = f"{NC}:g.140A>G" + with ( + patch(f"{LOOKUP}.TranslatorBuilder", return_value=builder), + patch(f"{LOOKUP}.get_seqrepo"), + ): + result = project_coding_hgvs_to_genomic(f"{NM}:c.1A>G", NC) + tools.parser.parse.assert_called_once_with(f"{NM}:c.1A>G") + tools.variant_mapper.c_to_g.assert_called_once_with("parsed_c", NC) + assert result == f"{NC}:g.140A>G" + + +class TestGenomicAccessionForTranscript: + """The contig resolver intersects cdot's mapping options with the assembly's contigs.""" + + def test_picks_contig_in_requested_assembly(self): + cd = MagicMock() + cd.get_tx_mapping_options.return_value = [ + {"alt_ac": "NC_000007.13"}, # GRCh37 contig -- not in the GRCh38 set + {"alt_ac": "NC_000007.14"}, # GRCh38 contig + ] + cd.get_assembly_map.return_value = {"NC_000007.14": "7"} + with patch(f"{LOOKUP}.cdot_rest", return_value=cd): + assert get_genomic_accession_for_transcript(NM) == "NC_000007.14" + cd.get_assembly_map.assert_called_once_with("GRCh38") + + def test_no_matching_contig_returns_none(self): + cd = MagicMock() + cd.get_tx_mapping_options.return_value = [{"alt_ac": "NC_000007.13"}] + cd.get_assembly_map.return_value = {"NC_000007.14": "7"} + with patch(f"{LOOKUP}.cdot_rest", return_value=cd): + assert get_genomic_accession_for_transcript(NM) is None + + def test_lookup_failure_returns_none(self): + cd = MagicMock() + cd.get_tx_mapping_options.side_effect = RuntimeError("cdot down") + with patch(f"{LOOKUP}.cdot_rest", return_value=cd): + assert get_genomic_accession_for_transcript(NM) is None + + +class TestConstructProjectedLayers: + """Branch + outcome classification for a single variant's projection.""" + + def test_non_variant_rows_skipped(self): + for hgvs_nt in ("_wt", "_sy", "=", "c.1_2delinsAA fs"): + projected, outcome = _construct_projected_layers( + _row(hgvs_nt), AnnotationLayer.GENOMIC, NM, accession_id=NC + ) + assert projected == [] + assert outcome is ProjectionOutcome.SKIPPED + + def test_genomic_to_coding_failure_records_all_layers_failed(self): + """A pivot failure emits a FAILED record for every expected level, not silence.""" + with ( + patch( + f"{VRS_MAP}._create_pre_mapped_hgvs_strings", + return_value=[f"{NC}:g.140A>G"], + ), + patch( + f"{VRS_MAP}.project_genomic_hgvs_to_coding", + side_effect=ValueError("off transcript"), + ), + ): + projected, outcome = _construct_projected_layers( + _row("140A>G"), AnnotationLayer.GENOMIC, NM, accession_id=NC + ) + assert outcome is ProjectionOutcome.FAILED + assert [(m.alignment_level, m.outcome) for m in projected] == [ + (AnnotationLayer.CDNA, MappingOutcome.FAILED), + (AnnotationLayer.PROTEIN, MappingOutcome.FAILED), + ] + # Genuine failures carry an error_message; benign absences would not. + assert all(m.error_message for m in projected) + + def test_intronic_records_all_layers_benign(self): + """Intronic is a benign absence: every expected level gets an INTRONIC record + with no error_message (benign != error). + """ + with ( + patch( + f"{VRS_MAP}._create_pre_mapped_hgvs_strings", + return_value=[f"{NC}:g.140A>G"], + ), + patch( + f"{VRS_MAP}.project_genomic_hgvs_to_coding", + return_value=[f"{NM}:c.2002-1A>G"], + ), + patch(f"{VRS_MAP}.coding_hgvs_is_intronic", return_value=True), + patch(f"{VRS_MAP}.project_coding_hgvs_to_protein") as to_protein, + patch(f"{VRS_MAP}._construct_vrs_allele") as construct, + ): + projected, outcome = _construct_projected_layers( + _row("140A>G"), AnnotationLayer.GENOMIC, NM, accession_id=NC + ) + assert outcome is ProjectionOutcome.INTRONIC + assert [(m.alignment_level, m.outcome) for m in projected] == [ + (AnnotationLayer.CDNA, MappingOutcome.INTRONIC), + (AnnotationLayer.PROTEIN, MappingOutcome.INTRONIC), + ] + assert all(m.error_message is None for m in projected) + # No protein projection or allele construction attempted for an intronic variant. + to_protein.assert_not_called() + construct.assert_not_called() + + def test_clean_projection_emits_cdna_and_protein(self): + allele = _allele() + with ( + patch( + f"{VRS_MAP}._create_pre_mapped_hgvs_strings", + return_value=[f"{NC}:g.140A>G"], + ), + patch( + f"{VRS_MAP}.project_genomic_hgvs_to_coding", + return_value=[f"{NM}:c.1A>G"], + ), + patch(f"{VRS_MAP}.coding_hgvs_is_intronic", return_value=False), + patch( + f"{VRS_MAP}.project_coding_hgvs_to_protein", + return_value="NP_004324.2:p.Lys1Glu", + ), + patch(f"{VRS_MAP}._construct_vrs_allele", return_value=allele), + ): + projected, outcome = _construct_projected_layers( + _row("140A>G"), AnnotationLayer.GENOMIC, NM, accession_id=NC + ) + assert outcome is ProjectionOutcome.PROJECTED + assert [(m.alignment_level, m.outcome) for m in projected] == [ + (AnnotationLayer.CDNA, MappingOutcome.MAPPED), + (AnnotationLayer.PROTEIN, MappingOutcome.MAPPED), + ] + + def test_protein_failure_alone_is_no_consequence(self): + """A c.->p. miss is a benign no-consequence; the coding form still maps and the + aggregate stays PROJECTED. + """ + allele = _allele() + with ( + patch( + f"{VRS_MAP}._create_pre_mapped_hgvs_strings", + return_value=[f"{NC}:g.140A>G"], + ), + patch( + f"{VRS_MAP}.project_genomic_hgvs_to_coding", + return_value=[f"{NM}:c.1A>G"], + ), + patch(f"{VRS_MAP}.coding_hgvs_is_intronic", return_value=False), + patch( + f"{VRS_MAP}.project_coding_hgvs_to_protein", + side_effect=ValueError("no p."), + ), + patch(f"{VRS_MAP}._construct_vrs_allele", return_value=allele), + ): + projected, outcome = _construct_projected_layers( + _row("140A>G"), AnnotationLayer.GENOMIC, NM, accession_id=NC + ) + assert outcome is ProjectionOutcome.PROJECTED + assert [(m.alignment_level, m.outcome) for m in projected] == [ + (AnnotationLayer.CDNA, MappingOutcome.MAPPED), + (AnnotationLayer.PROTEIN, MappingOutcome.NO_PROTEIN_CONSEQUENCE), + ] + protein = projected[1] + assert protein.error_message is None # benign, not an error + assert protein.post_mapped is None + + def test_coding_allele_construction_failure_is_failed(self): + """Coding hgvs existed but could not become an allele -> FAILED record.""" + with ( + patch( + f"{VRS_MAP}._create_pre_mapped_hgvs_strings", + return_value=[f"{NC}:g.140A>G"], + ), + patch( + f"{VRS_MAP}.project_genomic_hgvs_to_coding", + return_value=[f"{NM}:c.1A>G"], + ), + patch(f"{VRS_MAP}.coding_hgvs_is_intronic", return_value=False), + patch( + f"{VRS_MAP}.project_coding_hgvs_to_protein", + return_value="NP_004324.2:p.Lys1Glu", + ), + patch( + f"{VRS_MAP}._construct_vrs_allele", side_effect=ValueError("bad allele") + ), + ): + projected, outcome = _construct_projected_layers( + _row("140A>G"), AnnotationLayer.GENOMIC, NM, accession_id=NC + ) + assert outcome is ProjectionOutcome.FAILED + assert [(m.alignment_level, m.outcome) for m in projected] == [ + (AnnotationLayer.CDNA, MappingOutcome.FAILED), + (AnnotationLayer.PROTEIN, MappingOutcome.FAILED), + ] + + +class TestConstructProjectedLayersCdnaSource: + """A cdna assay (NM_/ENST) projects its measured coding variant to g. and p. + + The measured variant is already coding on the transcript, so there is no g.->c. + pivot: the coding form is built by prefixing the accession, the genomic re-expression + (c.->g.) is the load-bearing form, and the protein consequence (c.->p.) follows. + """ + + def test_cdna_source_emits_genomic_and_protein(self): + allele = _allele() + with ( + patch(f"{VRS_MAP}.coding_hgvs_is_intronic", return_value=False), + patch( + f"{VRS_MAP}.project_coding_hgvs_to_genomic", + return_value=f"{NC}:g.140A>G", + ), + patch( + f"{VRS_MAP}.project_coding_hgvs_to_protein", + return_value="NP_004324.2:p.Lys1Glu", + ), + patch(f"{VRS_MAP}._construct_vrs_allele", return_value=allele), + ): + projected, outcome = _construct_projected_layers( + _row("c.1A>G"), + AnnotationLayer.CDNA, + NM, + accession_id=NM, + genomic_accession=NC, + ) + assert outcome is ProjectionOutcome.PROJECTED + assert [m.alignment_level for m in projected] == [ + AnnotationLayer.GENOMIC, + AnnotationLayer.PROTEIN, + ] + + def test_cdna_source_genomic_projection_failure_is_failed(self): + """The c.->g. re-expression is load-bearing: its failure is a FAILED genomic + record, while the protein consequence still maps. + """ + allele = _allele() + with ( + patch(f"{VRS_MAP}.coding_hgvs_is_intronic", return_value=False), + patch( + f"{VRS_MAP}.project_coding_hgvs_to_genomic", + side_effect=ValueError("off contig"), + ), + patch( + f"{VRS_MAP}.project_coding_hgvs_to_protein", + return_value="NP_004324.2:p.Lys1Glu", + ), + patch(f"{VRS_MAP}._construct_vrs_allele", return_value=allele), + ): + projected, outcome = _construct_projected_layers( + _row("c.1A>G"), + AnnotationLayer.CDNA, + NM, + accession_id=NM, + genomic_accession=NC, + ) + assert outcome is ProjectionOutcome.FAILED + assert [(m.alignment_level, m.outcome) for m in projected] == [ + (AnnotationLayer.GENOMIC, MappingOutcome.FAILED), + (AnnotationLayer.PROTEIN, MappingOutcome.MAPPED), + ] + assert projected[0].error_message # genomic failure carries detail + + def test_cdna_source_without_contig_records_genomic_failed(self): + """An unresolvable contig is still an accounted-for outcome: the genomic layer + gets a FAILED record (not silence), while the protein consequence still maps. + """ + allele = _allele() + with ( + patch(f"{VRS_MAP}.coding_hgvs_is_intronic", return_value=False), + patch(f"{VRS_MAP}.project_coding_hgvs_to_genomic") as to_genomic, + patch( + f"{VRS_MAP}.project_coding_hgvs_to_protein", + return_value="NP_004324.2:p.Lys1Glu", + ), + patch(f"{VRS_MAP}._construct_vrs_allele", return_value=allele), + ): + projected, outcome = _construct_projected_layers( + _row("c.1A>G"), + AnnotationLayer.CDNA, + NM, + accession_id=NM, + genomic_accession=None, + ) + to_genomic.assert_not_called() + assert outcome is ProjectionOutcome.FAILED + assert [(m.alignment_level, m.outcome) for m in projected] == [ + (AnnotationLayer.GENOMIC, MappingOutcome.FAILED), + (AnnotationLayer.PROTEIN, MappingOutcome.MAPPED), + ] + + +class TestLogProjectionValidation: + """Per-target summary escalates to WARNING when too many variants fail.""" + + def _levels(self, caplog, outcomes): + caplog.clear() + with caplog.at_level(logging.INFO, logger=VRS_MAP): + _log_projection_validation(NC, NM, outcomes) + return [r.levelno for r in caplog.records] + + def test_clean_run_logs_info(self, caplog): + outcomes = [ProjectionOutcome.PROJECTED] * 3 + [ProjectionOutcome.INTRONIC] + levels = self._levels(caplog, outcomes) + assert levels == [logging.INFO] + + def test_high_failure_fraction_logs_warning(self, caplog): + # 2 failed of 4 attempted = 50% > 25% threshold. + outcomes = [ProjectionOutcome.PROJECTED] * 2 + [ProjectionOutcome.FAILED] * 2 + levels = self._levels(caplog, outcomes) + assert levels == [logging.WARNING] + + def test_intronic_excluded_from_failure_fraction(self, caplog): + # 1 failed of 4 attempted = 25%, not > 25%, so INFO despite many intronic. + outcomes = [ProjectionOutcome.PROJECTED] * 3 + [ProjectionOutcome.FAILED] + levels = self._levels(caplog, outcomes) + assert levels == [logging.INFO] + + def test_all_skipped_logs_nothing(self, caplog): + levels = self._levels(caplog, [ProjectionOutcome.SKIPPED] * 5) + assert levels == [] + + def test_message_distinguishes_cdna_self_pivot_from_selection(self, caplog): + outcomes = [ProjectionOutcome.PROJECTED] * 3 + with caplog.at_level(logging.INFO, logger=VRS_MAP): + _log_projection_validation(NC, NM, outcomes) # different -> selection + _log_projection_validation(NM, NM, outcomes) # same -> cdna self-pivot + selected, cdna = caplog.records[0].getMessage(), caplog.records[1].getMessage() + assert f"onto selected transcript {NM}" in selected + assert f"Projecting cdna target {NM}" in cdna + + +class TestNmAccessionMeasuredProtein: + """NM_/ENST targets with a measured hgvs_pro use it directly rather than projecting. + + The measured protein is already in reference coordinates so pre_mapped == post_mapped. + When hgvs_pro is present, _construct_projected_layers must not produce a redundant + projected protein layer (project_protein=False). + """ + + NP = "NP_004324.2" + + def _tx(self) -> TxSelectResult: + return TxSelectResult( + nm=NM, + np=self.NP, + start=0, + is_full_match=True, + sequence="MAAAA", + hgnc_symbol="BRAF", + ) + + def _run(self, hgvs_pro: str, transcript=None): + """Call _map_accession for a single NM_ row via vrs_map, fully mocked.""" + metadata = TargetGene( + target_gene_name="BRAF", + target_gene_category=TargetType.PROTEIN_CODING, + target_sequence=None, + target_sequence_type=None, + target_accession_id=NM, + ) + row = ScoreRow( + hgvs_nt="c.1799T>A", + hgvs_pro=hgvs_pro, + score="1.0", + accession="urn:mavedb:00000001-a-1#1", + ) + allele = _allele() + with ( + patch(f"{VRS_MAP}.store_accession"), + patch(f"{VRS_MAP}.get_genomic_accession_for_transcript", return_value=NC), + patch(f"{VRS_MAP}._map_genomic") as mock_genomic, + patch( + f"{VRS_MAP}._create_pre_mapped_hgvs_strings", + return_value=[f"{self.NP}:p.Val600Glu"], + ), + patch(f"{VRS_MAP}._construct_vrs_allele", return_value=allele), + patch( + f"{VRS_MAP}._construct_projected_layers", + return_value=([], ProjectionOutcome.PROJECTED), + ) as mock_proj, + patch(f"{VRS_MAP}._log_projection_validation"), + ): + mock_genomic.return_value = MappedScore( + accession_id="urn:mavedb:00000001-a-1#1", + score="1.0", + alignment_level=AnnotationLayer.GENOMIC, + ) + result = _map_accession(metadata, [row], None, transcript or self._tx()) + return result, mock_proj + + def test_measured_protein_emitted_directly(self): + """When hgvs_pro is valid and NP_ is available, a protein MappedScore is emitted + with pre_mapped == post_mapped (reference-coordinate form, no alignment offset). + """ + variations, _ = self._run("p.Val600Glu") + protein_scores = [ + v for v in variations if v.alignment_level == AnnotationLayer.PROTEIN + ] + assert len(protein_scores) == 1 + assert protein_scores[0].pre_mapped is not None + assert protein_scores[0].pre_mapped is protein_scores[0].post_mapped + + def test_measured_protein_suppresses_projection(self): + """project_protein=False must be passed to _construct_projected_layers so the + projected c.->p. form is not emitted alongside the measured one. + """ + _, mock_proj = self._run("p.Val600Glu") + _, kwargs = mock_proj.call_args + assert kwargs.get("project_protein") is False + + def test_no_hgvs_pro_still_projects(self): + """When hgvs_pro is absent, projection behaviour is unchanged (project_protein=True).""" + _, mock_proj = self._run("_wt") + _, kwargs = mock_proj.call_args + assert ( + kwargs.get("project_protein") is not False + ) # default True (not passed or True) + + def test_missing_np_accession_skips_protein(self, caplog): + """When there is no TxSelectResult (the common case for NM_ accession targets + where transcript selection produces no result), the protein layer is skipped with + a warning rather than silently emitting a potentially wrong projected form. + """ + from dcd_mapping.schemas import TargetGene, TargetType + from dcd_mapping.vrs_map import _map_accession + + metadata = TargetGene( + target_gene_name="BRAF", + target_gene_category=TargetType.PROTEIN_CODING, + target_sequence=None, + target_sequence_type=None, + target_accession_id=NM, + ) + row = ScoreRow( + hgvs_nt="c.1799T>A", + hgvs_pro="p.Val600Glu", + score="1.0", + accession="urn:mavedb:00000001-a-1#1", + ) + with ( + patch(f"{VRS_MAP}.store_accession"), + patch(f"{VRS_MAP}.get_genomic_accession_for_transcript", return_value=NC), + patch(f"{VRS_MAP}._map_genomic") as mock_genomic, + patch( + f"{VRS_MAP}._construct_projected_layers", + return_value=([], ProjectionOutcome.PROJECTED), + ) as mock_proj, + patch(f"{VRS_MAP}._log_projection_validation"), + caplog.at_level(logging.WARNING, logger=VRS_MAP), + ): + mock_genomic.return_value = MappedScore( + accession_id="urn:mavedb:00000001-a-1#1", + score="1.0", + alignment_level=AnnotationLayer.GENOMIC, + ) + # transcript=None: no TxSelectResult produced for this NM_ target + result = _map_accession(metadata, [row], None, None) + + protein_scores = [ + v for v in result if v.alignment_level == AnnotationLayer.PROTEIN + ] + assert protein_scores == [] + assert any("no NP_ accession" in r.message for r in caplog.records) + # project_protein must still be False so no projected protein is emitted + _, kwargs = mock_proj.call_args + assert kwargs.get("project_protein") is False diff --git a/tests/test_vrs_map.py b/tests/test_vrs_map.py index 5d5d443..873b506 100644 --- a/tests/test_vrs_map.py +++ b/tests/test_vrs_map.py @@ -19,10 +19,13 @@ from dcd_mapping.schemas import ( AlignmentResult, MappedScore, + ScoreRow, ScoresetMetadata, + SequenceRange, TxSelectResult, ) -from dcd_mapping.vrs_map import vrs_map +from dcd_mapping.transcripts import TxSelectError +from dcd_mapping.vrs_map import _map_protein_layer, vrs_map def _assert_correct_vrs_map( @@ -398,3 +401,61 @@ def test_1_b_2( for call in store_calls: mock_seqrepo_access.sr.store.assert_any_call(*call) assert len(store_calls) == len(mock_seqrepo_access.sr.store.call_args_list) + + +class TestMapProteinLayerReason: + """``_map_protein_layer`` returns ``(mapping, reason)``: no mapping plus a specific + reason when there is no protein layer to map. A row that maps at no layer is failed + once, layer-agnostically, carrying this detailed reason rather than a failure + arbitrarily tagged to the protein layer. + """ + + def _row(self, hgvs_pro: str) -> ScoreRow: + return ScoreRow( + hgvs_nt="_wt", + hgvs_pro=hgvs_pro, + score="1.0", + accession="urn:mavedb:00000001-a-2#1", + ) + + def _tx(self) -> TxSelectResult: + return TxSelectResult( + nm="NM_000001.1", + np="NP_000001.1", + start=0, + is_full_match=True, + sequence="MAA", + hgnc_symbol="GENE1", + ) + + def _align(self) -> AlignmentResult: + return AlignmentResult( + chrom="chr1", + query_range=SequenceRange(start=0, end=9), + query_subranges=[SequenceRange(start=0, end=9)], + hit_range=SequenceRange(start=1000, end=1009), + hit_subranges=[SequenceRange(start=1000, end=1009)], + aligner_parameters={"aligner": "blat"}, + ) + + def test_transcript_error_surfaces_its_message(self): + err = TxSelectError("no transcript selected for GENE1") + mapping, reason = _map_protein_layer(self._row("p.Ala2Val"), "SQ.x", err, None) + assert mapping is None + assert reason == "no transcript selected for GENE1" + + @pytest.mark.parametrize("hgvs_pro", ["_wt", "_sy"]) + def test_invalid_protein_variant_names_inputs(self, hgvs_pro): + mapping, reason = _map_protein_layer( + self._row(hgvs_pro), "SQ.x", self._tx(), self._align() + ) + assert mapping is None + assert "Can't process variant syntax" in reason + assert hgvs_pro in reason # names the actual input, not a layer + + def test_valid_protein_without_alignment_reports_alignment_failure(self): + mapping, reason = _map_protein_layer( + self._row("p.Ala2Val"), "SQ.x", self._tx(), None + ) + assert mapping is None + assert "could not be aligned" in reason