Source code for gnomad.variant_qc.pipeline

# noqa: D100

import logging
from typing import Dict, List, Optional, Tuple, Union

import hail as hl
import pyspark.sql

import gnomad.resources.grch37 as grch37_resources
import gnomad.resources.grch38 as grch38_resources
from gnomad.sample_qc.relatedness import (
    SIBLINGS,
    generate_sib_stats_expr,
    generate_trio_stats_expr,
)
from gnomad.utils.annotations import annotate_adj, bi_allelic_expr
from gnomad.utils.filtering import filter_to_autosomes, filter_to_clinvar_pathogenic
from gnomad.utils.reference_genome import get_reference_genome
from gnomad.variant_qc.evaluation import compute_ranked_bin
from gnomad.variant_qc.random_forest import (
    get_features_importance,
    test_model,
    train_rf,
)
from gnomad.variant_qc.training import sample_training_examples

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

INBREEDING_COEFF_HARD_CUTOFF = -0.3


[docs]def create_binned_ht( ht: hl.Table, n_bins: int = 100, singleton: bool = True, biallelic: bool = True, adj: bool = True, add_substrat: Optional[Dict[str, hl.expr.BooleanExpression]] = None, ) -> hl.Table: """ Annotate each row of `ht` with a bin based on binning the score annotation into `n_bins` equally-sized bins. This is meant as a default wrapper for `compute_ranked_bin`. .. note:: The following fields should be present: - score - ac - expected that this is the adj filtered allele count - ac_raw - expected that this is the raw allele count before adj filtering Computes bin numbers stratified by SNV / Indels and with the following optional sub bins - singletons - biallelics - biallelic singletons - adj - adj biallelics - adj singletons - adj biallelic singletons :param ht: Input table :param n_bins: Number of bins to bin into :param singleton: Should bins be stratified by singletons :param biallelic: Should bins be stratified by bi-alleleic variants :param adj: Should bins be stratified by adj filtering :param add_substrat: Any additional stratifications for adding bins :return: table with bin number for each variant """ def _new_bin_expr( bin_expr: Union[Dict[str, hl.expr.BooleanExpression], Dict[str, bool]], new_expr: hl.expr.BooleanExpression, new_id: str, update: bool = False, ) -> Dict[str, hl.expr.BooleanExpression]: """ Update a dictionary of expressions to add another stratification. :param bin_expr: Dictionary of expressions to add another stratification to :param new_expr: New Boolean expression to add to `bin_expr` :param new_id: Name to add to each current key in `bin_expr` to indicate the new stratification :return: Dictionary of `bin_expr` updated with `new_expr` added as an additional stratification to all expressions already in `bin_expr` """ new_bin_expr = { f"{new_id}_{bin_id}": bin_expr & new_expr for bin_id, bin_expr in bin_expr.items() } if update: bin_expr.update(new_bin_expr) return bin_expr else: return new_bin_expr # Desired bins and sub-bins bin_expr = {"bin": True} if singleton: bin_expr = _new_bin_expr(bin_expr, ht.ac_raw == 1, "singleton", update=True) if biallelic: bin_expr = _new_bin_expr(bin_expr, ~ht.was_split, "biallelic", update=True) if adj: bin_expr = _new_bin_expr(bin_expr, (ht.ac > 0), "adj", update=True) if add_substrat is not None: new_bin_expr = {} for add_id, add_expr in add_substrat.items(): new_bin_expr.update(_new_bin_expr(bin_expr, add_expr, add_id)) bin_expr.update(new_bin_expr) bin_ht = compute_ranked_bin( ht, score_expr=ht.score, bin_expr=bin_expr, n_bins=n_bins ) ht = ht.select_globals() ht = ht.join(bin_ht, how="left") return ht
[docs]def score_bin_agg( ht: hl.GroupedTable, fam_stats_ht: hl.Table ) -> Dict[str, hl.expr.Aggregation]: """ Make dict of aggregations for min/max of score, number of ClinVar variants, number of truth variants, and family statistics. .. note:: This function uses `ht._parent` to get the origin Table from the GroupedTable for the aggregation This can easily be combined with the GroupedTable returned by `compute_grouped_binned_ht`, For example: .. code-block:: python binned_ht = create_binned_ht(...) grouped_binned_ht = compute_grouped_binned_ht(binned_ht) agg_ht = grouped_binned_ht.aggregate(score_bin_agg(**grouped_binned_ht, ...)) .. note:: The following annotations should be present: In ht: - score - singleton - positive_train_site - negative_train_site - ac_raw - expected that this is the raw allele count before adj filtering - ac - expected that this is the allele count after adj filtering - ac_qc_samples_unrelated_raw - allele count before adj filtering for unrelated samples passing sample QC - info - struct that includes QD, FS, and MQ in order to add an annotation for fail_hard_filters In truth_ht: - omni - mills - hapmap - kgp_phase1_hc In fam_stats_ht: - n_de_novos_adj - n_de_novos_raw - n_transmitted_raw - n_untransmitted_raw Automatic aggregations that will be done are: - `min_score` - minimun of score annotation per group - `max_score` - maiximum of score annotation per group - `n` - count of variants per group - `n_ins` - count of insertion per group - `n_ins` - count of insertion per group - `n_del` - count of deletions per group - `n_ti` - count of transitions per group - `n_tv` - count of trnasversions per group - `n_1bp_indel` - count of one base pair indels per group - `n_mod3bp_indel` - count of indels with a length divisible by three per group - `n_singleton` - count of singletons per group - `fail_hard_filters` - count of variants per group with QD < 2 | FS > 60 | MQ < 30 - `n_vqsr_pos_train` - count of variants that were a VQSR positive train site per group - `n_vqsr_neg_train` - count of variants that were a VQSR negative train site per group - `n_clinvar` - count of clinvar variants - `n_de_novos_singleton_adj` - count of singleton de novo variants after adj filtration - `n_de_novo_singleton` - count of raw unfiltered singleton de novo variants - `n_de_novos_adj` - count of adj filtered de novo variants - `n_de_novos` - count of raw unfiltered de novo variants - `n_trans_singletons` - count of transmitted singletons - `n_untrans_singletons` - count of untransmitted singletons - `n_omni` - count of omni truth variants - `n_mills` - count of mills truth variants - `n_hapmap` - count of hapmap truth variants - `n_kgp_phase1_hc` - count of 1000 genomes phase 1 high confidence truth variants :param ht: Table that aggregation will be performed on :param fam_stats_ht: Path to family statistics HT :return: a dictionary containing aggregations to perform on ht """ # Annotate binned table with the evaluation data ht = ht._parent indel_length = hl.abs(ht.alleles[0].length() - ht.alleles[1].length()) # Load external evaluation data build = get_reference_genome(ht.locus).name clinvar_ht = ( grch37_resources.reference_data.clinvar if build == "GRCh37" else grch38_resources.reference_data.clinvar ).ht() # Filter to ClinVar pathogenic data. clinvar_path = filter_to_clinvar_pathogenic(clinvar_ht)[ht.key] clinvar = clinvar_ht[ht.key] truth_data = ( grch37_resources.reference_data.get_truth_ht() if build == "GRCh37" else grch38_resources.reference_data.get_truth_ht() )[ht.key] fam = fam_stats_ht[ht.key] if "fail_hard_filters" in ht.row: fail_hard_filters_expr = ht.fail_hard_filters elif "info" in ht.row: fail_hard_filters_expr = ( (ht.info.QD < 2) | (ht.info.FS > 60) | (ht.info.MQ < 30) ) else: raise ValueError( "Either 'fail_hard_filters' or 'info' must be present in the input Table!" ) ins_expr = hl.is_insertion(ht.alleles[0], ht.alleles[1]) del_expr = hl.is_deletion(ht.alleles[0], ht.alleles[1]) indel_1bp_expr = indel_length == 1 count_where_expr = { "n_ins": ins_expr, "n_del": del_expr, "n_ti": hl.is_transition(ht.alleles[0], ht.alleles[1]), "n_tv": hl.is_transversion(ht.alleles[0], ht.alleles[1]), "n_1bp_indel": indel_1bp_expr, "n_1bp_ins": ins_expr & indel_1bp_expr, "n_2bp_ins": ins_expr & (indel_length == 2), "n_3bp_ins": ins_expr & (indel_length == 3), "n_1bp_del": del_expr & indel_1bp_expr, "n_2bp_del": del_expr & (indel_length == 2), "n_3bp_del": del_expr & (indel_length == 3), "n_mod3bp_indel": (indel_length % 3) == 0, "n_singleton": ht.singleton, "fail_hard_filters": fail_hard_filters_expr, "n_pos_train": ht.positive_train_site, "n_neg_train": ht.negative_train_site, "n_clinvar": hl.is_defined(clinvar), "n_clinvar_path": hl.is_defined(clinvar_path), "n_omni": truth_data.omni, "n_mills": truth_data.mills, "n_hapmap": truth_data.hapmap, "n_kgp_phase1_hc": truth_data.kgp_phase1_hc, } return dict( min_score=hl.agg.min(ht.score), max_score=hl.agg.max(ht.score), n=hl.agg.count(), **{k: hl.agg.count_where(v) for k, v in count_where_expr.items()}, n_de_novos_singleton_adj=hl.agg.filter( ht.ac == 1, hl.agg.sum(fam.n_de_novos_adj) ), n_de_novo_singleton=hl.agg.filter( ht.ac_raw == 1, hl.agg.sum(fam.n_de_novos_raw) ), n_de_novos_adj=hl.agg.sum(fam.n_de_novos_adj), n_de_novo=hl.agg.sum(fam.n_de_novos_raw), n_de_novos_AF_001_adj=hl.agg.filter( hl.if_else( fam.ac_parents_adj == 0, 0.0, fam.ac_parents_adj / fam.an_parents_adj ) < 0.001, hl.agg.sum(fam.n_de_novos_adj), ), n_de_novos_AF_001=hl.agg.filter( hl.if_else( fam.ac_parents_raw == 0, 0.0, fam.ac_parents_raw / fam.an_parents_raw ) < 0.001, hl.agg.sum(fam.n_de_novos_raw), ), n_trans_singletons=hl.agg.filter( ht.ac_raw == 2, hl.agg.sum(fam.n_transmitted_raw) ), n_untrans_singletons=hl.agg.filter( (ht.ac_raw < 3) & (ht.ac_qc_samples_unrelated_raw == 1), hl.agg.sum(fam.n_untransmitted_raw), ), n_train_trans_singletons=hl.agg.filter( (ht.ac_raw == 2) & ht.positive_train_site, hl.agg.sum(fam.n_transmitted_raw) ), )
[docs]def generate_trio_stats( mt: hl.MatrixTable, autosomes_only: bool = True, bi_allelic_only: bool = True ) -> hl.Table: """ Run `generate_trio_stats_expr` with variant QC pipeline defaults to get trio stats stratified by raw and adj. .. note:: Expects that `mt` is it a trio matrix table that was annotated with adj and if dealing with a sparse MT `hl.experimental.densify` must be run first. By default this pipeline function will filter `mt` to only autosomes and bi-allelic sites. :param mt: A Trio Matrix Table returned from `hl.trio_matrix`. Must be dense :param autosomes_only: If set, only autosomal intervals are used. :param bi_allelic_only: If set, only bi-allelic sites are used for the computation :return: Table with trio stats """ if autosomes_only: mt = filter_to_autosomes(mt) if bi_allelic_only: mt = mt.filter_rows(bi_allelic_expr(mt)) logger.info("Generating trio stats using %d trios.", mt.count_cols()) trio_adj = mt.proband_entry.adj & mt.father_entry.adj & mt.mother_entry.adj ht = mt.select_rows( **generate_trio_stats_expr( mt, transmitted_strata={"raw": True, "adj": trio_adj}, de_novo_strata={"raw": True, "adj": trio_adj}, ac_strata={"raw": True, "adj": trio_adj}, ) ).rows() return ht
[docs]def generate_sib_stats( mt: hl.MatrixTable, relatedness_ht: hl.Table, i_col: str = "i", j_col: str = "j", relationship_col: str = "relationship", autosomes_only: bool = True, bi_allelic_only: bool = True, ) -> hl.Table: """ Generate a hail table with counts of variants shared by pairs of siblings in `relatedness_ht`. This is meant as a default wrapper for `generate_sib_stats_expr`. This function takes a hail Table with a row for each pair of individuals i,j in the data that are related (it's OK to have unrelated samples too). The `relationship_col` should be a column specifying the relationship between each two samples as defined by the constants in `gnomad.utils.relatedness`. This relationship_col will be used to filter to only pairs of samples that are annotated as `SIBLINGS`. .. note:: By default this pipeline function will filter `mt` to only autosomes and bi-allelic sites. :param mt: Input Matrix table :param relatedness_ht: Input relationship table :param i_col: Column containing the 1st sample of the pair in the relationship table :param j_col: Column containing the 2nd sample of the pair in the relationship table :param relationship_col: Column containing the relationship for the sample pair as defined in this module constants. :param autosomes_only: If set, only autosomal intervals are used. :param bi_allelic_only: If set, only bi-allelic sites are used for the computation :return: A Table with the sibling shared variant counts """ if autosomes_only: mt = filter_to_autosomes(mt) if bi_allelic_only: mt = mt.filter_rows(bi_allelic_expr(mt)) sib_ht = relatedness_ht.filter(relatedness_ht[relationship_col] == SIBLINGS) s_to_keep = sib_ht.aggregate( hl.agg.explode( lambda s: hl.agg.collect_as_set(s), [sib_ht[i_col].s, sib_ht[j_col].s] ), _localize=False, ) mt = mt.filter_cols(s_to_keep.contains(mt.s)) if "adj" not in mt.entry: mt = annotate_adj(mt) sib_stats_ht = mt.select_rows( **generate_sib_stats_expr( mt, sib_ht, i_col=i_col, j_col=j_col, strata={"raw": True, "adj": mt.adj}, ) ).rows() return sib_stats_ht
[docs]def train_rf_model( ht: hl.Table, rf_features: List[str], tp_expr: hl.expr.BooleanExpression, fp_expr: hl.expr.BooleanExpression, fp_to_tp: float = 1.0, num_trees: int = 500, max_depth: int = 5, test_expr: hl.expr.BooleanExpression = False, ) -> Tuple[hl.Table, pyspark.ml.PipelineModel]: """ Perform random forest (RF) training using a Table annotated with features and training data. .. note:: This function uses `train_rf` and extends it by: - Adding an option to apply the resulting model to test variants which are withheld from training. - Uses a false positive (FP) to true positive (TP) ratio to determine what variants to use for RF training. The returned Table includes the following annotations: - rf_train: indicates if the variant was used for training of the RF model. - rf_label: indicates if the variant is a TP or FP. - rf_test: indicates if the variant was used in testing of the RF model. - features: global annotation of the features used for the RF model. - features_importance: global annotation of the importance of each feature in the model. - test_results: results from testing the model on variants defined by `test_expr`. :param ht: Table annotated with features for the RF model and the positive and negative training data. :param rf_features: List of column names to use as features in the RF training. :param tp_expr: TP training expression. :param fp_expr: FP training expression. :param fp_to_tp: Ratio of FPs to TPs for creating the RF model. If set to 0, all training examples are used. :param num_trees: Number of trees in the RF model. :param max_depth: Maxmimum tree depth in the RF model. :param test_expr: An expression specifying variants to hold out for testing and use for evaluation only. :return: Table with TP and FP training sets used in the RF training and the resulting RF model. """ ht = ht.annotate(_tp=tp_expr, _fp=fp_expr, rf_test=test_expr) rf_ht = sample_training_examples( ht, tp_expr=ht._tp, fp_expr=ht._fp, fp_to_tp=fp_to_tp, test_expr=ht.rf_test ) ht = ht.annotate(rf_train=rf_ht[ht.key].train, rf_label=rf_ht[ht.key].label) summary = ht.group_by("_tp", "_fp", "rf_train", "rf_label", "rf_test").aggregate( n=hl.agg.count() ) logger.info("Summary of TP/FP and RF training data:") summary.show(n=20) logger.info( "Training RF model:\nfeatures: %s\nnum_tree: %d\nmax_depth:%d", ",".join(rf_features), num_trees, max_depth, ) rf_model = train_rf( ht.filter(ht.rf_train), features=rf_features, label="rf_label", num_trees=num_trees, max_depth=max_depth, ) test_results = None if test_expr is not None: logger.info("Testing model on specified variants or intervals...") test_ht = ht.filter(hl.is_defined(ht.rf_label) & ht.rf_test) test_results = test_model( test_ht, rf_model, features=rf_features, label="rf_label" ) features_importance = get_features_importance(rf_model) ht = ht.select_globals( features_importance=features_importance, features=rf_features, test_results=test_results, ) return ht.select("rf_train", "rf_label", "rf_test"), rf_model