# Source code for gnomad_qc.v5.annotations.generate_variant_qc_annotations

"""Script to generate annotations for variant QC on gnomAD v5."""

import argparse
import logging
from typing import Dict

import hail as hl
from gnomad.resources.grch38.gnomad import GROUPS
from gnomad.resources.grch38.reference_data import get_truth_ht
from gnomad.utils.annotations import (
    annotate_allele_info,
    get_lowqual_expr,
    pab_max_expr,
)
from gnomad.utils.sparse_mt import split_info_annotation
from gnomad.utils.vcf import adjust_vcf_incompatible_types
from gnomad.variant_qc.pipeline import (
    INFO_FEATURES,
    NON_INFO_FEATURES,
    TRUTH_DATA,
    generate_sib_stats,
    generate_trio_stats,
)
from gnomad.variant_qc.random_forest import median_impute_features

from gnomad_qc.v5.annotations.annotation_utils import annotate_adj_no_dp, get_adj_expr
from gnomad_qc.v5.resources.annotations import (
    get_aou_annotated_sites_only_vcf,
    get_aou_vcf_header,
    get_info_ht,
    get_sib_stats,
    get_trio_stats,
    get_true_positive_vcf_path,
    get_variant_qc_annotations,
    info_vcf_path,
)
from gnomad_qc.v5.resources.basics import (
    _check_resource_existence,
    _get_batch_resource_kwargs,
    _init_hail,
    get_aou_vds,
    get_logging_path,
)
from gnomad_qc.v5.resources.sample_qc import dense_trios, pedigree, relatedness

# Configure root logging once at import time. `force=True` replaces any
# handlers that a prior basicConfig()/hl.init() call may have installed, so
# the format below always takes effect.
logging.basicConfig(
    format="%(levelname)s (%(name)s %(lineno)s): %(message)s",
    level=logging.INFO,
    force=True,
)
logger = logging.getLogger("generate_variant_qc_annotations")
logger.setLevel(logging.INFO)


def generate_ac_info_ht(vds: hl.vds.VariantDataset) -> hl.Table:
    """
    Compute AC and AC_raw annotations for each allele count filter group.

    Function also adds `AS_pab_max` and `allele_info` annotations.

    :param vds: VariantDataset to use for computing AC and AC_raw annotations.
    :return: Table with AC and AC_raw annotations split by high quality, release,
        and unrelated.
    """
    mt = vds.variant_data
    # Per-sample boolean filter groups; ACs are computed independently for each.
    ac_filter_groups = {
        "high_quality": mt.meta.high_quality,
        "release": mt.meta.release,
        "unrelated": ~mt.meta.relatedness_inference.relatedness_filters.related,
    }
    mt = mt.annotate_cols(_ac_filter_groups=ac_filter_groups)
    # Indices of the non-ref alleles (1..n_alleles-1) at each site.
    mt = mt.annotate_rows(alt_alleles_range_array=hl.range(1, hl.len(mt.alleles)))
    ac_info_expr = hl.struct()
    # First compute ACs for each non-ref allele, grouped by adj.
    grp_ac_expr = {
        f: hl.agg.array_agg(
            lambda ai: hl.agg.filter(
                # Only samples that carry the local allele AND are in group `f`.
                mt.LA.contains(ai) & mt._ac_filter_groups[f],
                hl.agg.group_by(
                    get_adj_expr(mt.LGT, mt.GQ, mt.LAD),
                    hl.agg.sum(
                        mt.LGT.one_hot_alleles(mt.LA.map(lambda x: hl.str(x)))[
                            mt.LA.index(ai)
                        ]
                    ),
                ),
            ),
            mt.alt_alleles_range_array,
        )
        for f in ac_filter_groups
    }
    # Then, for each non-ref allele, compute
    # 'AC' as the adj group
    # 'AC_raw' as the sum of adj and non-adj groups
    ac_info_expr = ac_info_expr.annotate(
        **{
            f"AC{'_' + f if f else f}_raw": grp.map(
                lambda i: hl.int32(i.get(True, 0) + i.get(False, 0))
            )
            for f, grp in grp_ac_expr.items()
        },
        **{
            f"AC{'_' + f if f else f}": grp.map(lambda i: hl.int32(i.get(True, 0)))
            for f, grp in grp_ac_expr.items()
        },
    )
    ac_info_expr = ac_info_expr.annotate(
        AS_pab_max=pab_max_expr(mt.LGT, mt.LAD, mt.LA, hl.len(mt.alleles))
    )
    ac_info_ht = mt.select_rows(AC_info=ac_info_expr).rows()

    # Split multi-allelic sites.
    ac_info_ht = annotate_allele_info(ac_info_ht)
    # Re-index the allele-specific arrays in 'AC_info' by 'a_index' post-split.
    ac_info_ht = ac_info_ht.annotate(
        **{
            a: (
                ac_info_ht[a].annotate(
                    **split_info_annotation(ac_info_ht[a], ac_info_ht.a_index),
                )
            )
            for a in ["AC_info"]
        },
    )

    return ac_info_ht
def create_info_ht(
    vcf_path: str,
    header_path: str,
    lowqual_indel_phred_het_prior: int = 40,
    vds: hl.vds.VariantDataset = None,
    test: bool = False,
) -> hl.Table:
    """
    Import a VCF of AoU annotated sites, reformat annotations, and add AS_lowqual.

    :param vcf_path: Path to the annotated sites-only VCF.
    :param header_path: Path to the header file for the VCF.
    :param lowqual_indel_phred_het_prior: Phred-scaled prior for a het genotype at a
        site with a low quality indel. Default is 40. We use 1/10k bases (phred=40)
        to be more consistent with the filtering used by Broad's Data Sciences
        Platform for VQSR.
    :param vds: VariantDataset to use for computing AC and AC_raw annotations.
    :param test: Whether to run a test using just the first two partitions of the
        loaded VCF.
    :return: Hail Table with reformatted annotations.
    """
    ht = hl.import_vcf(
        vcf_path,
        force_bgz=True,
        header_file=header_path,
        reference_genome="GRCh38",
    ).rows()
    if test:
        ht = ht._filter_partitions(range(2))

    logger.info("Reformatting annotations...")
    # These allele-specific annotations arrive as single-element arrays.
    array_annotations = [
        "AS_FS",
        "AS_MQ",
        "AS_MQRankSum",
        "AS_ReadPosRankSum",
        "AS_SOR",
    ]
    # AS_VarDP is in the format of "ref|alt" and VarDP is an int of the alt value
    # from this, so can just set AS_VarDP to VarDP.
    # AS_SB_TABLE is an alternate formatting (string ref1, ref2 | alt 1, alt2)
    # of SB_TABLE, so can just set AS_SB_TABLE to SB_TABLE.
    info_updates = {
        # Convert single-element array annotations to float64.
        **{ann: hl.float64(ht.info[ann][0]) for ann in array_annotations},
        # Extract singular element from AS_QD array and convert to float32.
        "AS_QD": hl.float32(ht.info.AS_QD[0]),
        "AS_QUALapprox": hl.int64(ht.info.QUALapprox),
        "AS_VarDP": ht.info.VarDP,
        "AS_SB_TABLE": ht.info.SB_TABLE,
    }
    ht = ht.transmute(info=ht.info.annotate(**info_updates))
    # Drop the site-level fields that were duplicated into AS_* above.
    ht = ht.annotate(
        info=ht.info.drop("SB", "QUALapprox", "VarDP", "SB_TABLE", "AS_RAW_MQ")
    )
    ht = ht.drop("rsid")
    ht = ht.annotate(
        AS_lowqual=get_lowqual_expr(
            ht.alleles,
            ht.info.AS_QUALapprox,
            indel_phred_het_prior=lowqual_indel_phred_het_prior,
        )
    )
    logger.info("Adding AC info annotations to info ht...")
    ac_info_ht = generate_ac_info_ht(vds)
    ac_info_ht = ac_info_ht.checkpoint(hl.utils.new_temp_file("ac_info_ht", "ht"))
    # Annotate ac_info_ht with info annotations because info VCF
    # provided by AoU has variants not present in samples we consider high quality.
    info_ht = ac_info_ht.annotate(**ht[ac_info_ht.key])
    # Move AS_pab_max from AC_info into the main info struct.
    info_ht = info_ht.annotate(
        info=info_ht.info.annotate(AS_pab_max=info_ht.AC_info.AS_pab_max),
        AC_info=info_ht.AC_info.drop("AS_pab_max"),
    )
    return info_ht
def run_generate_trio_stats(
    mt: hl.MatrixTable,
    fam_ped: hl.Pedigree,
) -> hl.Table:
    """
    Generate trio transmission stats from a VariantDataset and pedigree info.

    :param mt: Dense trio MatrixTable.
    :param fam_ped: Pedigree containing trio info.
    :return: Table containing trio stats.
    """
    # Add the adj annotation (no DP requirement) and rename LGT to GT, since only
    # autosomal bi-allelic variants are used when computing trio stats.
    adj_mt = annotate_adj_no_dp(mt)
    adj_mt = adj_mt.transmute_entries(GT=adj_mt.LGT)

    # Restructure entries into (proband, father, mother) columns per trio.
    trio_mt = hl.trio_matrix(adj_mt, pedigree=fam_ped, complete_trios=True)

    return generate_trio_stats(trio_mt)
def run_generate_sib_stats(
    mt: hl.MatrixTable,
    relatedness_ht: hl.Table,
) -> hl.Table:
    """
    Generate sibling stats from a VariantDataset and relatedness info.

    :param mt: Input MatrixTable.
    :param relatedness_ht: Table containing relatedness info.
    :return: Table containing sibling stats.
    """
    # Annotate adj (no DP requirement), then split multi-allelic sites in the
    # sparse representation before computing shared-variant sibling stats.
    adj_mt = annotate_adj_no_dp(mt)
    split_mt = hl.experimental.sparse_split_multi(adj_mt)

    return generate_sib_stats(split_mt, relatedness_ht)
def create_variant_qc_annotation_ht(
    info_ht: hl.Table,
    trio_stats_ht: hl.Table,
    sib_stats_ht: hl.Table,
    impute_features: bool = True,
    n_partitions: int = 5000,
) -> hl.Table:
    """
    Create a Table with all necessary annotations for variant QC.

    Annotations that are included:

        Features for RF:
            - variant_type
            - allele_type
            - n_alt_alleles
            - has_star
            - AS_QD
            - AS_pab_max
            - AS_MQRankSum
            - AS_SOR
            - AS_ReadPosRankSum

        Training sites (bool):
            - transmitted_singleton
            - sibling_singleton
            - fail_hard_filters - (ht.AS_QD < 0.5) | (ht.AS_FS > 60) | (ht.AS_MQ < 30)

    :param info_ht: Info Table with split multi-allelics.
    :param trio_stats_ht: Table with trio statistics.
    :param sib_stats_ht: Table with sibling statistics.
    :param impute_features: Whether to impute features using feature medians (this
        is done by variant type).
    :param n_partitions: Number of partitions to use for final annotated Table.
    :return: Hail Table with all annotations needed for variant QC.
    """
    truth_data_ht = get_truth_ht()
    # Flatten AC_info and allele_info structs into top-level row fields.
    ht = info_ht.transmute(**info_ht.AC_info, **info_ht.allele_info)
    ht = ht.annotate(**ht.info.select(*INFO_FEATURES))

    if impute_features:
        impute_ht = ht.select("variant_type", **ht.info)
        # Median imputation is stratified by variant type.
        impute_ht = median_impute_features(
            impute_ht, {"variant_type": impute_ht.variant_type}
        ).checkpoint(hl.utils.new_temp_file("median_impute", "ht"))
        impute_result = impute_ht[ht.key]
        ht = ht.annotate(
            info=impute_result.drop("feature_imputed", "variant_type"),
            feature_imputed=impute_result.feature_imputed,
        )
        ht = ht.annotate_globals(feature_medians=hl.eval(impute_ht.feature_medians))

    logger.info("Annotating Table with trio and sibling stats and reference truth data")
    trio_stats_ht = trio_stats_ht.select(
        *[f"{a}_{group}" for a in ["n_transmitted", "ac_children"] for group in GROUPS]
    )
    ht = ht.annotate(
        **trio_stats_ht[ht.key],
        **sib_stats_ht[ht.key],
        **truth_data_ht[ht.key],
    )
    # Map from true-positive annotation name to its source count field.
    tp_map = {
        "transmitted_singleton": "n_transmitted",
        "sibling_singleton": "n_sib_shared_variants",
    }
    # Filter to only variants found in high quality samples and are not lowqual.
    ht = ht.filter((ht.AC_high_quality_raw > 0) & ~ht.AS_lowqual)
    select_dict = {tp: hl.or_else(ht[tp], False) for tp in TRUTH_DATA}
    # A singleton is transmitted/shared when exactly one event is observed and the
    # high-quality AC is 2 (one copy in the child plus one in a parent/sibling).
    select_dict.update(
        {
            f"{tp}_{group}": hl.or_else(
                (ht[f"{n}_{group}"] == 1)
                & (ht[f"AC_high_quality{'' if group == 'adj' else '_raw'}"] == 2),
                False,
            )
            for tp, n in tp_map.items()
            for group in GROUPS
        }
    )
    select_dict.update(
        {
            # NOTE: Previous versions used QD < 2, but we decided this was too
            # stringent based on the distribution of this metric in AoU.
            "fail_hard_filters": (ht.AS_QD < 0.5) | (ht.AS_FS > 60) | (ht.AS_MQ < 30),
            "singleton": ht.AC_release_raw == 1,
            "ac_raw": ht.AC_high_quality_raw,
            "ac": ht.AC_release,
            "ac_unrelated_raw": ht.AC_unrelated_raw,
        }
    )
    if impute_features:
        select_dict["feature_imputed"] = ht.feature_imputed
    ht = ht.select(
        "a_index",
        "was_split",
        *NON_INFO_FEATURES,
        "info",
        **select_dict,
    )
    # Checkpoint, then re-read at the desired partitioning.
    temp_path = hl.utils.new_temp_file("variant_qc_annotations", "ht")
    ht.write(temp_path)
    ht = hl.read_table(temp_path, _n_partitions=n_partitions)
    ht.describe()
    summary = ht.group_by(
        *TRUTH_DATA, *[f"{tp}_{group}" for tp in tp_map for group in GROUPS]
    ).aggregate(n=hl.agg.count())
    logger.info("Summary of truth data annotations:")
    summary.show(-1)

    return ht
def get_tp_ht_for_vcf_export(
    ht: hl.Table,
    transmitted_singletons: bool = False,
    sibling_singletons: bool = False,
) -> Dict[str, hl.Table]:
    """
    Get Tables with raw and adj true positive variants to export as a VCF for use in VQSR.

    :param ht: Input Table with transmitted singleton and sibling singleton information.
    :param transmitted_singletons: Whether to include transmitted singletons in the
        true positive variants.
    :param sibling_singletons: Whether to include sibling singletons in the true
        positive variants.
    :return: Dictionary of 'raw' and 'adj' true positive variant sites Tables.
    """
    if not (transmitted_singletons or sibling_singletons):
        raise ValueError(
            "At least one of transmitted_singletons or sibling_singletons must be set "
            "to True"
        )

    tp_hts = {}
    for group in GROUPS:
        # Combine whichever singleton annotations were requested into one filter.
        keep_expr = False
        if transmitted_singletons:
            keep_expr = ht[f"transmitted_singleton_{group}"]
        if sibling_singletons:
            keep_expr = keep_expr | ht[f"sibling_singleton_{group}"]

        # Keep only keys/globals-free sites for the VCF export.
        group_ht = ht.filter(keep_expr).select().select_globals()
        group_ht = group_ht.checkpoint(
            hl.utils.new_temp_file("true_positive_variants", "ht"),
        )
        logger.info(
            "True positive %s Table for VCF export contains %d variants",
            group,
            group_ht.count(),
        )
        tp_hts[group] = group_ht

    return tp_hts
def main(args):
    """Generate all variant annotations needed for variant QC."""
    environment = args.environment
    _init_hail(
        "generate_variant_qc_annotations",
        environment,
        **_get_batch_resource_kwargs(args),
    )
    overwrite = args.overwrite
    test_n_partitions = args.test_n_partitions
    # Either test flag routes all outputs to the test paths.
    test = args.test or test_n_partitions is not None
    info_ht_path = get_info_ht(test=test, environment=environment).path
    trio_stats_ht_path = get_trio_stats(test=test, environment=environment).path
    sib_stats_ht_path = get_sib_stats(test=test, environment=environment).path
    variant_qc_annotation_ht_path = get_variant_qc_annotations(
        test=test, environment=environment
    ).path

    if args.export_true_positive_vcfs and not (
        args.transmitted_singletons or args.sibling_singletons
    ):
        raise ValueError(
            "--export-true-positive-vcfs requires at least one of"
            " --transmitted-singletons or --sibling-singletons"
        )

    # Load VDS only if needed for create_info_ht or generate_sibling_stats.
    if args.create_info_ht or args.generate_sibling_stats:
        # NOTE: VDS will have 'aou_' prefix on sample IDs.
        vds = get_aou_vds(
            high_quality_only=True,
            filter_partitions=range(test_n_partitions) if test_n_partitions else None,
            annotate_meta=True,
            # NOTE: Using args.test here so that sibling stats test can be
            # calculated from a few partitions of the full (not test) VDS).
            test=args.test,
            environment=environment,
        )

    try:
        if args.create_info_ht:
            _check_resource_existence(
                environment=environment,
                output_step_resources={
                    "info_ht": [info_ht_path],
                },
                overwrite=overwrite,
            )
            ht = create_info_ht(
                vcf_path=get_aou_annotated_sites_only_vcf(environment=environment),
                header_path=get_aou_vcf_header(environment=environment),
                lowqual_indel_phred_het_prior=args.lowqual_indel_phred_het_prior,
                vds=vds,
                test=test,
            )
            ht.write(info_ht_path, overwrite=overwrite)

        if args.export_info_vcf:
            logger.info("Exporting info ht as VCF...")
            out_info_vcf_path = info_vcf_path(test=test, environment=environment)
            _check_resource_existence(
                environment=environment,
                input_step_resources={
                    "info_ht": [info_ht_path],
                },
                output_step_resources={
                    "info_vcf_path": [out_info_vcf_path],
                },
                overwrite=overwrite,
            )
            info_ht = hl.read_table(info_ht_path)
            # TODO: Check if AS_QUALapprox and AS_VarDP are needed for v5 (not
            # used for v4) and if so need preceded pipe.
            # Reformat AS_SB_TABLE to be a nested array of arrays for proper use
            # within the 'adjust_vcf_incompatible_types' function.
            info_ht = info_ht.annotate(
                info=info_ht.info.annotate(
                    AS_SB_TABLE=hl.array(
                        [info_ht.info.AS_SB_TABLE[0:2], info_ht.info.AS_SB_TABLE[2:4]]
                    )
                )
            )
            info_ht = adjust_vcf_incompatible_types(
                info_ht, pipe_delimited_annotations=[]
            )
            hl.export_vcf(info_ht, out_info_vcf_path, tabix=True)

        if args.generate_trio_stats:
            logger.info("Generating trio stats...")
            _check_resource_existence(
                environment=environment,
                output_step_resources={"trio_stats_ht": [trio_stats_ht_path]},
                overwrite=overwrite,
            )
            ht = run_generate_trio_stats(
                dense_trios(test=test, environment=environment).mt(),
                pedigree(test=test, environment=environment).pedigree(),
            )
            ht.write(trio_stats_ht_path, overwrite=overwrite)

        if args.generate_sibling_stats:
            logger.info("Generating sibling stats...")
            _check_resource_existence(
                environment=environment,
                output_step_resources={"sib_stats_ht": [sib_stats_ht_path]},
                overwrite=overwrite,
            )
            # Note: Checked sibling IDs; none of them have sample ID collisions.
            ht = run_generate_sib_stats(
                vds.variant_data, relatedness(environment=environment).ht()
            )
            ht.write(sib_stats_ht_path, overwrite=overwrite)

        if args.create_variant_qc_annotation_ht:
            logger.info("Creating variant QC annotation HT...")
            _check_resource_existence(
                environment=environment,
                output_step_resources={
                    "variant_qc_annotation_ht": [variant_qc_annotation_ht_path]
                },
                overwrite=overwrite,
            )
            ht = create_variant_qc_annotation_ht(
                hl.read_table(info_ht_path),
                hl.read_table(trio_stats_ht_path),
                hl.read_table(sib_stats_ht_path),
                impute_features=args.impute_features,
                n_partitions=args.n_partitions,
            )
            ht.write(variant_qc_annotation_ht_path, overwrite=overwrite)

        if args.export_true_positive_vcfs:
            logger.info("Exporting true positive VCFs...")
            # The output path encodes which singleton sets define the TP variants.
            tp_parts = []
            if args.transmitted_singletons:
                tp_parts.append("transmitted_singleton")
            if args.sibling_singletons:
                tp_parts.append("sibling_singleton")
            tp_type = ".".join(tp_parts)
            vcf_path_kwargs = dict(
                test=test, environment=environment, true_positive_type=tp_type
            )
            raw_tp_vcf_path = get_true_positive_vcf_path(**vcf_path_kwargs, adj=False)
            adj_tp_vcf_path = get_true_positive_vcf_path(**vcf_path_kwargs, adj=True)
            _check_resource_existence(
                environment=environment,
                input_step_resources={
                    "variant_qc_annotation_ht": [variant_qc_annotation_ht_path],
                },
                output_step_resources={
                    "raw_true_positive_vcf_path": [raw_tp_vcf_path],
                    "adj_true_positive_vcf_path": [adj_tp_vcf_path],
                },
                overwrite=overwrite,
            )
            tp_hts = get_tp_ht_for_vcf_export(
                hl.read_table(variant_qc_annotation_ht_path),
                transmitted_singletons=args.transmitted_singletons,
                sibling_singletons=args.sibling_singletons,
            )
            hl.export_vcf(tp_hts["raw"], raw_tp_vcf_path, tabix=True)
            hl.export_vcf(tp_hts["adj"], adj_tp_vcf_path, tabix=True)
    finally:
        # Always preserve the Hail log when running in the researcher workbench.
        if environment == "rwb":
            logger.info("Copying log to logging bucket...")
            hl.copy_log(
                get_logging_path(
                    "generate_variant_qc_annotations", environment=environment
                )
            )
def get_script_argument_parser() -> argparse.ArgumentParser:
    """Get script argument parser."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--environment",
        help="Environment where script will run.",
        choices=["rwb", "batch"],
        default="rwb",
    )
    parser.add_argument(
        "--app-name",
        type=str,
        default=None,
        help="Job name for batch/QoB backend.",
    )
    parser.add_argument(
        "--driver-cores",
        help="Number of cores. Applies to Batch environment only. Hail default is 1 if unspecified.",
        type=int,
        default=None,
    )
    parser.add_argument(
        "--driver-memory",
        help="Memory for driver node. Applies to Batch environment only. Hail default is 'standard' if unspecified.",
        type=str,
        default=None,
    )
    parser.add_argument(
        "--worker-cores",
        help="Number of cores. Applies to Batch environment only. Hail default is 1 if unspecified.",
        type=int,
        default=None,
    )
    parser.add_argument(
        "--worker-memory",
        help="Memory for worker nodes. Applies to Batch environment only. Hail default is 'standard' if unspecified.",
        type=str,
        default=None,
    )
    parser.add_argument(
        "--overwrite",
        help="Overwrite output files.",
        action="store_true",
    )
    parser.add_argument(
        "--test",
        help="Write to test path.",
        action="store_true",
    )
    # nargs="?" with const=2 lets the flag be passed bare (uses 2 partitions) or
    # with an explicit partition count.
    parser.add_argument(
        "--test-n-partitions",
        help="Use only n partitions of the VDS as input for testing purposes (default: 2).",
        nargs="?",
        const=2,
        type=int,
    )
    parser.add_argument(
        "--generate-trio-stats",
        help="Calculates trio stats.",
        action="store_true",
    )
    parser.add_argument(
        "--generate-sibling-stats",
        help="Calculates sibling stats.",
        action="store_true",
    )
    parser.add_argument(
        "--create-info-ht",
        help="Create the info ht containing annotations needed for variant QC.",
        action="store_true",
    )
    parser.add_argument(
        "--lowqual-indel-phred-het-prior",
        help="Phred-scaled prior for a het genotype at a site with a low quality indel. Default is 40. We use 1/10k bases (phred=40) to be more consistent with the filtering used by Broad's Data Sciences Platform for VQSR.",
        default=40,
        type=int,
    )
    parser.add_argument(
        "--export-info-vcf", help="Export info ht as VCF.", action="store_true"
    )
    variant_qc_annotation_args = parser.add_argument_group(
        "Variant QC annotation HT parameters."
    )
    variant_qc_annotation_args.add_argument(
        "--create-variant-qc-annotation-ht",
        help="Creates an annotated HT with features for variant QC.",
        action="store_true",
    )
    variant_qc_annotation_args.add_argument(
        "--impute-features",
        help="If set, imputation is performed for variant QC features.",
        action="store_true",
    )
    variant_qc_annotation_args.add_argument(
        "--n-partitions",
        help="Desired number of partitions for variant QC annotation HT.",
        type=int,
        default=5000,
    )
    tp_vcf_args = parser.add_argument_group(
        "Export true positive VCFs",
        "Arguments used to define true positive variant set.",
    )
    tp_vcf_args.add_argument(
        "--export-true-positive-vcfs",
        help=(
            "Exports true positive variants (--transmitted-singletons and/or"
            " --sibling-singletons) to VCF files."
        ),
        action="store_true",
    )
    tp_vcf_args.add_argument(
        "--transmitted-singletons",
        help=(
            "Include transmitted singletons in the exports of true positive variants to"
            " VCF files."
        ),
        action="store_true",
    )
    tp_vcf_args.add_argument(
        "--sibling-singletons",
        help=(
            "Include sibling singletons in the exports of true positive variants to VCF"
            " files."
        ),
        action="store_true",
    )
    return parser


if __name__ == "__main__":
    parser = get_script_argument_parser()
    args = parser.parse_args()
    main(args)