Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/mavedb/lib/clingen/alleles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Query helpers for fetching score-set alleles for ClinGen registration.

Both submit_score_set_mappings_to_car and warm_clingen_cache use the same allele
scope: all current MappingRecordAllele links (authoritative and RT-derived) for a
score set. A single definition here prevents the two jobs from drifting apart.
"""

from typing import NamedTuple

from sqlalchemy import select
from sqlalchemy.orm import Session

from mavedb.models.allele import Allele
from mavedb.models.mapping_record import MappingRecord
from mavedb.models.mapping_record_allele import MappingRecordAllele
from mavedb.models.variant import Variant


class ScoreSetAlleleRow(NamedTuple):
"""One (allele, variant) link for a score set. An allele shared by multiple variants
appears once per variant so callers can fan annotation statuses out correctly.

``is_authoritative`` is a property of the link, not the allele: the same VRS allele can be
the authoritative measurement for one variant and an RT-derived equivalence for another.
"""

allele_id: int
post_mapped: dict | None
clingen_allele_id: str | None
variant_id: int
is_authoritative: bool


def get_alleles_for_score_set(db: Session, score_set_id: int) -> list[ScoreSetAlleleRow]:
    """Fetch every current allele link of a score set, paired with its variant ID.

    The result spans authoritative mapper alleles as well as RT-derived
    equivalence alleles, i.e. everything that has to be registered with ClinGen
    before the annotation fan-out can run.

    Alleles whose ``post_mapped`` is null are excluded. Variants that failed or
    were benignly absent have no allele link and cannot receive a CAID.

    :param db: session used to execute the query
    :param score_set_id: ID of the score set whose variants are searched
    :return: one row per (allele, variant) link; an allele linked to several
        variants appears once for each of them
    """
    statement = (
        select(
            Allele.id,
            Allele.post_mapped,
            Allele.clingen_allele_id,
            Variant.id.label("variant_id"),
            MappingRecordAllele.is_authoritative,
        )
        # Allele -> link table -> per-variant mapping record -> variant.
        .join(MappingRecordAllele, MappingRecordAllele.allele_id == Allele.id)
        .join(MappingRecord, MappingRecord.id == MappingRecordAllele.mapping_record_id)
        .join(Variant, Variant.id == MappingRecord.variant_id)
        # Criteria passed together are AND-ed, same as chaining .where() calls.
        .where(
            Variant.score_set_id == score_set_id,
            MappingRecord.current,
            MappingRecordAllele.current,
            Allele.post_mapped.is_not(None),
        )
    )

    return [
        ScoreSetAlleleRow(
            allele_id=record.id,
            post_mapped=record.post_mapped,
            clingen_allele_id=record.clingen_allele_id,
            variant_id=record.variant_id,
            is_authoritative=record.is_authoritative,
        )
        for record in db.execute(statement).all()
    ]
26 changes: 14 additions & 12 deletions src/mavedb/lib/clingen/content_constructors.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from datetime import datetime
from typing import Optional
from uuid import uuid4
from urllib.parse import quote_plus

from mavedb import __version__
from mavedb.constants import MAVEDB_BASE_GIT, MAVEDB_FRONTEND_URL
from mavedb.lib.types.clingen import LdhContentLinkedData, LdhContentSubject, LdhEvent, LdhSubmission
from mavedb.lib.clingen.constants import LDH_ENTITY_NAME, LDH_SUBMISSION_TYPE
from mavedb.models.mapped_variant import MappedVariant
from mavedb.models.allele import Allele
from mavedb.models.mapping_record import MappingRecord
from mavedb.models.variant import Variant


Expand All @@ -32,36 +32,38 @@ def construct_ldh_submission_subject(hgvs: str) -> LdhContentSubject:
return {"Variant": {"hgvs": hgvs}}


def construct_ldh_submission_entity(variant: Variant, mapped_variant: Optional[MappedVariant]) -> LdhContentLinkedData:
entity: LdhContentLinkedData = {
def construct_ldh_submission_entity(
variant: Variant, mapping_record: MappingRecord, allele: Allele
) -> LdhContentLinkedData:
# Pre-mapped data and the mapping API version live on the per-variant MappingRecord;
# post-mapped data lives on the (cross-variant deduped) Allele.
return {
# TODO#372: We try to make all possible fields that are non-nullable represented that way.
"MaveDBMapping": [
{
"entContent": {
"mavedb_id": variant.urn, # type: ignore
"score": variant.data["score_data"]["score"], # type: ignore
"score_set_description": variant.score_set.short_description, # type: ignore
"pre_mapped": mapping_record.pre_mapped,
"post_mapped": allele.post_mapped,
"mapping_api_version": mapping_record.mapping_api_version,
},
"entId": variant.urn, # type: ignore
"entIri": f"{MAVEDB_FRONTEND_URL}/score-sets/{quote_plus(variant.score_set.urn)}?variant={quote_plus(variant.urn)}", # type: ignore
}
]
}
if mapped_variant is not None:
entity["MaveDBMapping"][0]["entContent"]["pre_mapped"] = mapped_variant.pre_mapped
entity["MaveDBMapping"][0]["entContent"]["post_mapped"] = mapped_variant.post_mapped
entity["MaveDBMapping"][0]["entContent"]["mapping_api_version"] = mapped_variant.mapping_api_version
return entity


def construct_ldh_submission(
variant_content: list[tuple[str, Variant, Optional[MappedVariant]]],
variant_content: list[tuple[str, Variant, MappingRecord, Allele]],
) -> list[LdhSubmission]:
content_submission: list[LdhSubmission] = []
for hgvs, variant, mapped_variant in variant_content:
for hgvs, variant, mapping_record, allele in variant_content:
subject = construct_ldh_submission_subject(hgvs)
event = construct_ldh_submission_event(subject)
entity = construct_ldh_submission_entity(variant, mapped_variant)
entity = construct_ldh_submission_entity(variant, mapping_record, allele)

content_submission.append(
{
Expand Down
37 changes: 21 additions & 16 deletions src/mavedb/lib/variants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@
HGVS_P_REGEX = re.compile(r"(^|:)p\.")


def hgvs_from_vrs_allele(allele: dict) -> str:
def hgvs_from_vrs_allele(allele: dict) -> Optional[str]:
"""
Extract the HGVS notation from the VRS allele.
Extract the HGVS notation from the VRS allele, or None if it carries no expression.
"""
try:
# VRS 2.X
return allele["expressions"][0]["value"]
expressions = allele["expressions"] # VRS 2.X
except KeyError:
if "variation" in allele:
raise ValueError("VRS 1.X format not supported.")
# VRS 1.X. We don't want to allow this.
# return allele["variation"]["expressions"][0]["value"]
else:
raise KeyError("Invalid VRS allele structure. Expected 'expressions'.")
raise KeyError("Invalid VRS allele structure. Expected 'expressions'.")

# A valid VRS allele may simply carry no HGVS expression (None or empty) — e.g. a member of a
# cis-phased block. That is "no HGVS", not a crash.
if not expressions:
return None
return expressions[0]["value"]


def get_hgvs_from_post_mapped(post_mapped_vrs: Optional[Any], *, combine_cis: bool = False) -> Optional[str]:
Expand All @@ -38,22 +41,24 @@ def get_hgvs_from_post_mapped(post_mapped_vrs: Optional[Any], *, combine_cis: bo
if not post_mapped_vrs:
return None

if post_mapped_vrs["type"] == "Haplotype": # type: ignore
variations_hgvs = [hgvs_from_vrs_allele(allele) for allele in post_mapped_vrs["members"]]
elif post_mapped_vrs["type"] == "CisPhasedBlock": # type: ignore
variations_hgvs = [hgvs_from_vrs_allele(allele) for allele in post_mapped_vrs["members"]]
if post_mapped_vrs["type"] in ("Haplotype", "CisPhasedBlock"): # type: ignore
members = post_mapped_vrs["members"]
elif post_mapped_vrs["type"] == "Allele": # type: ignore
variations_hgvs = [hgvs_from_vrs_allele(post_mapped_vrs)]
members = [post_mapped_vrs]
else:
return None

if len(variations_hgvs) == 0:
member_hgvs = [hgvs_from_vrs_allele(allele) for allele in members]

# No members, or a member carrying no HGVS expression — no single/combinable HGVS to return.
if not member_hgvs or any(h is None for h in member_hgvs):
return None

if len(variations_hgvs) > 1:
return join_cis_phased_hgvs(variations_hgvs) if combine_cis else None
hgvs_values: list[str] = [h for h in member_hgvs if h is not None]
if len(hgvs_values) > 1:
return join_cis_phased_hgvs(hgvs_values) if combine_cis else None

return variations_hgvs[0]
return hgvs_values[0]


def get_digest_from_post_mapped(post_mapped_vrs: Optional[Any]) -> Optional[str]:
Expand Down
33 changes: 29 additions & 4 deletions src/mavedb/lib/vrs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,35 @@
from ga4gh.vrs.models import (
Allele,
CisPhasedBlock,
Expression,
LiteralSequenceExpression,
ReferenceLengthExpression,
SequenceLocation,
Syntax,
)
from ga4gh.vrs.normalize import normalize

from mavedb.lib.hgvs import split_cis_phased_hgvs

# Lookup from the one-letter HGVS type (the character right after
# ``accession:``, e.g. the ``g`` in ``accession:g.``) to the matching VRS
# Expression ``Syntax`` member. ``_hgvs_syntax`` raises ValueError for any
# letter that is not a key here.
_HGVS_SYNTAX_BY_TYPE = {
    "g": Syntax.HGVS_G,
    "c": Syntax.HGVS_C,
    "p": Syntax.HGVS_P,
    "n": Syntax.HGVS_N,
    "m": Syntax.HGVS_M,
    "r": Syntax.HGVS_R,
}


def _hgvs_syntax(hgvs: str) -> Syntax:
    """Return the VRS Expression syntax for an HGVS string.

    The syntax is chosen from the single type letter that follows the first
    ``:`` in the string, e.g. the ``g`` in ``accession:g.``.

    :param hgvs: HGVS expression of the form ``accession:<type>.<change>``
    :return: the ``Syntax`` member registered for that type letter
    :raises ValueError: if the string has no ``:``, nothing after it, or a type
        letter that is not a key of ``_HGVS_SYNTAX_BY_TYPE``
    """
    _, _, description = hgvs.partition(":")
    # Slicing (rather than indexing) yields "" when there is no description,
    # so every malformed input funnels into the same KeyError below.
    type_letter = description[:1]
    try:
        return _HGVS_SYNTAX_BY_TYPE[type_letter]
    except KeyError:
        # The KeyError only repeats the letter; suppress the implicit chaining
        # so the traceback shows one descriptive error.
        raise ValueError(f"Cannot determine HGVS syntax for {hgvs!r}") from None


def translate_hgvs_to_vrs(hgvs: str, translator: AlleleTranslator) -> Allele:
"""Convert HGVS variation description to VRS object.
Expand Down Expand Up @@ -81,10 +102,14 @@ def translate_hgvs_to_variation(hgvs: str, translator: AlleleTranslator) -> Alle
:param translator: caller-owned AlleleTranslator reused across calls
:return: an Allele for a single variant, or a CisPhasedBlock for a cis-phased set
"""
members = [
normalize_and_identify(translate_hgvs_to_vrs(component, translator), translator.data_proxy)
for component in split_cis_phased_hgvs(hgvs)
]
members = []
for component in split_cis_phased_hgvs(hgvs):
allele = normalize_and_identify(translate_hgvs_to_vrs(component, translator), translator.data_proxy)
# Stamp the source HGVS as the allele's expression so post_mapped is self-describing.
# Mirrors the mapper's authoritative alleles.
allele.expressions = [Expression(syntax=_hgvs_syntax(component), value=component)]
members.append(allele)

if len(members) == 1:
return members[0]

Expand Down
2 changes: 1 addition & 1 deletion src/mavedb/models/score_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
class ScoreSet(Base):
__tablename__ = "scoresets"

id = Column(Integer, primary_key=True)
id: Mapped[int] = Column(Integer, primary_key=True)

# TODO(#372)
urn = Column(String(64), default=generate_temp_urn, index=True, nullable=True, unique=True)
Expand Down
Loading
Loading