Source code for gnomad.variant_qc.evaluation

# noqa: D100

import logging
from typing import Dict, Optional

import hail as hl

logging.basicConfig(
    format="%(asctime)s (%(name)s %(lineno)s): %(message)s",
    datefmt="%m/%d/%Y %I:%M:%S %p",
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


[docs]def compute_ranked_bin( ht: hl.Table, score_expr: hl.expr.NumericExpression, bin_expr: Dict[str, hl.expr.BooleanExpression] = {"bin": True}, compute_snv_indel_separately: bool = True, n_bins: int = 100, desc: bool = True, ) -> hl.Table: r""" Return a table with a bin for each row based on the ranking of `score_expr`. The bin is computed by dividing the `score_expr` into `n_bins` bins containing approximately equal numbers of elements. This is done by ranking the rows by `score_expr` (and a random number in cases where multiple variants have the same score) and then assigning the variant to a bin based on its ranking. If `compute_snv_indel_separately` is True all items in `bin_expr` will be stratified by snv / indels for the ranking and bin calculation. Because SNV and indel rows are mutually exclusive, they are re-combined into a single annotation. For example if we have the following four variants and scores and `n_bins` of 2: ======== ======= ====== ================= ================= Variant Type Score bin - `compute_snv_indel_separately`: -------- ------- ------ ------------------------------------- \ \ \ False True ======== ======= ====== ================= ================= Var1 SNV 0.1 1 1 Var2 SNV 0.2 1 2 Var3 Indel 0.3 2 1 Var4 Indel 0.4 2 2 ======== ======= ====== ================= ================= .. note:: The `bin_expr` defines which data the bin(s) should be computed on. E.g., to get biallelic specific binning and singleton specific binning, the following could be used: .. code-block:: python bin_expr={ 'biallelic_bin': ~ht.was_split, 'singleton_bin': ht.singleton } :param ht: Input Table :param score_expr: Expression containing the score :param bin_expr: Specific row grouping(s) to perform ranking and binning on (see note) :param compute_snv_indel_separately: Should all `bin_expr` items be stratified by SNVs / indels :param n_bins: Number of bins to bin the data into :param desc: Whether to bin the score in descending order :return: Table with the requested bin annotations """ if compute_snv_indel_separately: # For each bin, add a SNV / indel stratification bin_expr = { f"{bin_id}_{snv}": bin_expr & snv_expr for bin_id, bin_expr in bin_expr.items() for snv, snv_expr in [ ("snv", hl.is_snp(ht.alleles[0], ht.alleles[1])), ("indel", ~hl.is_snp(ht.alleles[0], ht.alleles[1])), ] } bin_ht = ht.select( **{f"_filter_{bin_id}": bin_expr for bin_id, bin_expr in bin_expr.items()}, _score=score_expr, snv=hl.is_snp(ht.alleles[0], ht.alleles[1]), _rand=hl.rand_unif(0, 1), ) # Checkpoint bin Table prior to variant count aggregation. bin_ht = bin_ht.checkpoint(hl.utils.new_temp_file("bin", "ht")) # Compute variant counts per group defined by bin_expr. This is used to determine # bin assignment. bin_group_variant_counts = bin_ht.aggregate( hl.Struct( **{ bin_id: hl.agg.filter( bin_ht[f"_filter_{bin_id}"], hl.agg.count(), ) for bin_id in bin_expr } ) ) logger.info( "Sorting the HT by score_expr followed by a random float between 0 and 1. " "Then adding a row index per grouping defined by bin_expr..." ) bin_ht = bin_ht.key_by("_score", "_rand") bin_ht = bin_ht.annotate( **{ f"{bin_id}_rank": hl.or_missing( bin_ht[f"_filter_{bin_id}"], hl.scan.count_where(bin_ht[f"_filter_{bin_id}"]), ) for bin_id in bin_expr } ) bin_ht = bin_ht.key_by("locus", "alleles") logger.info("Binning ranked rows into %d bins...", n_bins) bin_ht = bin_ht.select( "snv", **{ bin_id: hl.int( hl.floor( ( n_bins * ( bin_ht[f"{bin_id}_rank"] / hl.float64(bin_group_variant_counts[bin_id]) ) ) + 1 ) ) for bin_id in bin_expr }, ) if desc: bin_ht = bin_ht.annotate( **{bin_id: n_bins - bin_ht[bin_id] + 1 for bin_id in bin_expr} ) # Because SNV and indel rows are mutually exclusive, re-combine them into a single bin. # Update the global bin_group_variant_counts struct to reflect the change # in bin names in the table if compute_snv_indel_separately: bin_expr_no_snv = { bin_id.rsplit("_", 1)[0] for bin_id in bin_group_variant_counts } bin_group_variant_counts = hl.struct( **{ bin_id: hl.struct( **{ snv: bin_group_variant_counts[f"{bin_id}_{snv}"] for snv in ["snv", "indel"] } ) for bin_id in bin_expr_no_snv } ) bin_ht = bin_ht.transmute( **{ bin_id: hl.if_else( bin_ht.snv, bin_ht[f"{bin_id}_snv"], bin_ht[f"{bin_id}_indel"], ) for bin_id in bin_expr_no_snv } ) bin_ht = bin_ht.annotate_globals(bin_group_variant_counts=bin_group_variant_counts) return bin_ht
[docs]def compute_grouped_binned_ht( bin_ht: hl.Table, checkpoint_path: Optional[str] = None, ) -> hl.GroupedTable: """ Group a Table that has been annotated with bins (`compute_ranked_bin` or `create_binned_ht`). The table will be grouped by bin_id (bin, biallelic, etc.), contig, snv, bi_allelic and singleton. .. note:: If performing an aggregation following this grouping (such as `score_bin_agg`) then the aggregation function will need to use `ht._parent` to get the origin Table from the GroupedTable for the aggregation :param bin_ht: Input Table with a `bin_id` annotation :param checkpoint_path: If provided an intermediate checkpoint table is created with all required annotations before shuffling. :return: Table grouped by bins(s) """ # Explode the rank table by bin_id bin_ht = bin_ht.annotate( bin_groups=hl.array( [ hl.Struct(bin_id=bin_name, bin=bin_ht[bin_name]) for bin_name in bin_ht.bin_group_variant_counts ] ) ) bin_ht = bin_ht.explode(bin_ht.bin_groups) bin_ht = bin_ht.transmute( bin_id=bin_ht.bin_groups.bin_id, bin=bin_ht.bin_groups.bin ) bin_ht = bin_ht.filter(hl.is_defined(bin_ht.bin)) if checkpoint_path is not None: bin_ht.checkpoint(checkpoint_path, overwrite=True) else: bin_ht = bin_ht.persist() # Group by bin_id, bin and additional stratification desired and compute # QC metrics per bin return bin_ht.group_by( bin_id=bin_ht.bin_id, contig=bin_ht.locus.contig, snv=hl.is_snp(bin_ht.alleles[0], bin_ht.alleles[1]), bi_allelic=~bin_ht.was_split, singleton=bin_ht.singleton, release_adj=bin_ht.ac > 0, bin=bin_ht.bin, )._set_buffer_size(20000)
[docs]def compute_binned_truth_sample_concordance( ht: hl.Table, binned_score_ht: hl.Table, n_bins: int = 100, add_bins: Dict[str, hl.expr.BooleanExpression] = {}, ) -> hl.Table: """ Determine the concordance (TP, FP, FN) between a truth sample within the callset and the samples truth data grouped by bins computed using `compute_ranked_bin`. .. note:: The input 'ht` should contain three row fields: - score: value to use for binning - GT: a CallExpression containing the genotype of the evaluation data for the sample - truth_GT: a CallExpression containing the genotype of the truth sample The input `binned_score_ht` should contain: - score: value used to bin the full callset - bin: the full callset bin 'add_bins` can be used to add additional global and truth sample binning to the final binned truth sample concordance HT. The keys in `add_bins` must be present in `binned_score_ht` and the values in `add_bins` should be expressions on `ht` that define a subset of variants to bin in the truth sample. An example is if we want to look at the global and truth sample binning on only bi-allelic variants. `add_bins` could be set to {'biallelic_bin': ht.biallelic}. The table is grouped by global/truth sample bin and variant type and contains TP, FP and FN. :param ht: Input HT :param binned_score_ht: Table with the bin annotation for each variant :param n_bins: Number of bins to bin the data into :param add_bins: Dictionary of additional global bin columns (key) and the expr to use for binning the truth sample (value) :return: Binned truth sample concordance HT """ # Annotate score and global bin indexed_binned_score_ht = binned_score_ht[ht.key] ht = ht.annotate( **{f"global_{bin_id}": indexed_binned_score_ht[bin_id] for bin_id in add_bins}, **{f"_{bin_id}": bin_expr for bin_id, bin_expr in add_bins.items()}, score=indexed_binned_score_ht.score, global_bin=indexed_binned_score_ht.bin, ) ht = ht.checkpoint(hl.utils.new_temp_file("pre_bin", "ht")) # Annotate the truth sample bin bin_ht = compute_ranked_bin( ht, score_expr=ht.score, bin_expr={ "truth_sample_bin": hl.expr.bool(True), **{f"truth_sample_{bin_id}": ht[f"_{bin_id}"] for bin_id in add_bins}, }, n_bins=n_bins, ) ht = ht.join(bin_ht, how="left") bin_list = [ hl.tuple(["global_bin", ht.global_bin]), hl.tuple(["truth_sample_bin", ht.truth_sample_bin]), ] bin_list.extend( [hl.tuple([f"global_{bin_id}", ht[f"global_{bin_id}"]]) for bin_id in add_bins] ) bin_list.extend( [ hl.tuple([f"truth_sample_{bin_id}", ht[f"truth_sample_{bin_id}"]]) for bin_id in add_bins ] ) # Explode the global and truth sample bins ht = ht.annotate(bin=bin_list) ht = ht.explode(ht.bin) ht = ht.annotate(bin_id=ht.bin[0], bin=hl.int(ht.bin[1])) # Compute TP, FP and FN by bin_id, variant type and bin return ( ht.group_by("bin_id", "snv", "bin") .aggregate( # TP => allele is found in both data sets tp=hl.agg.count_where(ht.GT.is_non_ref() & ht.truth_GT.is_non_ref()), # FP => allele is found only in test data set fp=hl.agg.count_where( ht.GT.is_non_ref() & hl.or_else(ht.truth_GT.is_hom_ref(), True) ), # FN => allele is found in truth data only fn=hl.agg.count_where( hl.or_else(ht.GT.is_hom_ref(), True) & ht.truth_GT.is_non_ref() ), min_score=hl.agg.min(ht.score), max_score=hl.agg.max(ht.score), n_alleles=hl.agg.count(), ) .repartition(5) )
[docs]def create_truth_sample_ht( mt: hl.MatrixTable, truth_mt: hl.MatrixTable, high_confidence_intervals_ht: hl.Table ) -> hl.Table: """ Compute a table comparing a truth sample in callset vs the truth. :param mt: MT of truth sample from callset to be compared to truth :param truth_mt: MT of truth sample :param high_confidence_intervals_ht: High confidence interval HT :return: Table containing both the callset truth sample and the truth data """ def split_filter_and_flatten_ht( truth_mt: hl.MatrixTable, high_confidence_intervals_ht: hl.Table ) -> hl.Table: """ Split a truth sample MT, filter it to the given high confidence intervals, and then "flatten" it as a HT by annotating GT in a row field. :param truth_mt: Truth sample MT :param high_confidence_intervals_ht: High confidence intervals :return: Truth sample table with GT as a row annotation """ assert truth_mt.count_cols() == 1 if not "was_split" in truth_mt.row: truth_mt = hl.split_multi_hts(truth_mt) truth_mt = truth_mt.filter_rows( hl.is_defined(high_confidence_intervals_ht[truth_mt.locus]) ) rename_entries = {"GT": "_GT"} if "adj" in truth_mt.entry: rename_entries.update({"adj": "_adj"}) truth_mt = truth_mt.rename(rename_entries) return truth_mt.annotate_rows( **{x: hl.agg.take(truth_mt[f"_{x}"], 1)[0] for x in rename_entries} ).rows() # Load truth sample MT, # restrict it to high confidence intervals # and flatten it to a HT by annotating GT in a row annotation truth_ht = split_filter_and_flatten_ht(truth_mt, high_confidence_intervals_ht) truth_ht = truth_ht.rename({f: f"truth_{f}" for f in truth_ht.row_value}) # Similarly load, filter and flatten callset truth sample MT ht = split_filter_and_flatten_ht(mt, high_confidence_intervals_ht) # Outer join of truth and callset truth and annotate the score and global bin ht = truth_ht.join(ht, how="outer") ht = ht.annotate(snv=hl.is_snp(ht.alleles[0], ht.alleles[1])) return ht
[docs]def add_rank( ht: hl.Table, score_expr: hl.expr.NumericExpression, subrank_expr: Optional[Dict[str, hl.expr.BooleanExpression]] = None, ) -> hl.Table: """ Add rank based on the `score_expr`. Rank is added for snvs and indels separately. If one or more `subrank_expr` are provided, then subrank is added based on all sites for which the boolean expression is true. In addition, variant counts (snv, indel separately) is added as a global (`rank_variant_counts`). :param ht: input Hail Table containing variants (with QC annotations) to be ranked :param score_expr: the Table annotation by which ranking should be scored :param subrank_expr: Any subranking to be added in the form name_of_subrank: subrank_filtering_expr :return: Table with rankings added """ key = ht.key if subrank_expr is None: subrank_expr = {} temp_expr = {"_score": score_expr} temp_expr.update({f"_{name}": expr for name, expr in subrank_expr.items()}) rank_ht = ht.select(**temp_expr, is_snv=hl.is_snp(ht.alleles[0], ht.alleles[1])) rank_ht = rank_ht.key_by("_score").persist() scan_expr = { "rank": hl.if_else( rank_ht.is_snv, hl.scan.count_where(rank_ht.is_snv), hl.scan.count_where(~rank_ht.is_snv), ) } scan_expr.update( { name: hl.or_missing( rank_ht[f"_{name}"], hl.if_else( rank_ht.is_snv, hl.scan.count_where(rank_ht.is_snv & rank_ht[f"_{name}"]), hl.scan.count_where(~rank_ht.is_snv & rank_ht[f"_{name}"]), ), ) for name in subrank_expr } ) rank_ht = rank_ht.annotate(**scan_expr) rank_ht = rank_ht.key_by(*key).persist() rank_ht = rank_ht.select(*scan_expr.keys()) ht = ht.annotate(**rank_ht[key]) return ht