# Source code for gnomad_qc.v5.annotations.generate_variant_qc_annotations

"""Script to generate annotations for variant QC on gnomAD v5."""

import argparse
import logging

import hail as hl
from gnomad.utils.annotations import annotate_allele_info, get_lowqual_expr
from gnomad.utils.sparse_mt import split_info_annotation
from gnomad.variant_qc.pipeline import generate_sib_stats, generate_trio_stats

from gnomad_qc.resource_utils import check_resource_existence
from gnomad_qc.v5.annotations.annotation_utils import annotate_adj_no_dp, get_adj_expr
from gnomad_qc.v5.resources.annotations import (
    aou_annotated_sites_only_vcf,
    aou_vcf_header,
    get_info_ht,
    get_sib_stats,
    get_trio_stats,
)
from gnomad_qc.v5.resources.basics import get_aou_vds, get_logging_path
from gnomad_qc.v5.resources.constants import GNOMAD_TMP_BUCKET, WORKSPACE_BUCKET
from gnomad_qc.v5.resources.sample_qc import dense_trios, pedigree, relatedness

# Emit log records as "LEVEL (logger_name line_no): message".
logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger("generate_variant_qc_annotations")
# Surface INFO-level progress messages from this script.
logger.setLevel(logging.INFO)


def generate_ac_info_ht(vds: hl.vds.VariantDataset) -> hl.Table:
    """
    Compute AC and AC_raw annotations for each allele count filter group.

    :param vds: VariantDataset to use for computing AC and AC_raw annotations.
    :return: Table with AC and AC_raw annotations split by high quality, release,
        and unrelated.
    """
    mt = vds.variant_data
    # Sample-level boolean filter groups; each gets its own AC/AC_raw annotation.
    ac_filter_groups = {
        "high_quality": mt.meta.high_quality,
        "release": mt.meta.release,
        "unrelated": ~mt.meta.relatedness_inference.relatedness_filters.related,
    }
    mt = mt.annotate_cols(_ac_filter_groups=ac_filter_groups)
    # One entry per non-ref allele index (1..n_alleles-1).
    mt = mt.annotate_rows(alt_alleles_range_array=hl.range(1, hl.len(mt.alleles)))
    ac_info_expr = hl.struct()
    # First compute ACs for each non-ref allele, grouped by adj.
    # NOTE(review): the lambda captures the comprehension variable 'f'; Hail
    # evaluates the lambda eagerly during expression construction, so each
    # filter group binds its own expression.
    grp_ac_expr = {
        f: hl.agg.array_agg(
            lambda ai: hl.agg.filter(
                # Only aggregate samples in the filter group whose local
                # alleles include this alt allele.
                mt.LA.contains(ai) & mt._ac_filter_groups[f],
                hl.agg.group_by(
                    get_adj_expr(mt.LGT, mt.GQ, mt.LAD),
                    hl.agg.sum(
                        mt.LGT.one_hot_alleles(mt.LA.map(lambda x: hl.str(x)))[
                            mt.LA.index(ai)
                        ]
                    ),
                ),
            ),
            mt.alt_alleles_range_array,
        )
        for f in ac_filter_groups
    }
    # Then, for each non-ref allele, compute
    # 'AC' as the adj group
    # 'AC_raw' as the sum of adj and non-adj groups
    ac_info_expr = ac_info_expr.annotate(
        **{
            f"AC{'_' + f if f else f}_raw": grp.map(
                lambda i: hl.int32(i.get(True, 0) + i.get(False, 0))
            )
            for f, grp in grp_ac_expr.items()
        },
        **{
            f"AC{'_' + f if f else f}": grp.map(lambda i: hl.int32(i.get(True, 0)))
            for f, grp in grp_ac_expr.items()
        },
    )
    ac_info_ht = mt.select_rows(AC_info=ac_info_expr).rows()
    # Split multi-allelic sites.
    ac_info_ht = annotate_allele_info(ac_info_ht)
    # Subset each array-valued AC annotation to the split allele's index.
    ac_info_ht = ac_info_ht.annotate(
        **{
            a: ac_info_ht[a].annotate(
                **split_info_annotation(ac_info_ht[a], ac_info_ht.a_index),
            )
            for a in ["AC_info"]
        },
    )
    return ac_info_ht
def create_info_ht(
    vcf_path: str,
    header_path: str,
    lowqual_indel_phred_het_prior: int = 40,
    vds: hl.vds.VariantDataset = None,
    test: bool = False,
) -> hl.Table:
    """
    Import a VCF of AoU annotated sites, reformat annotations, and add AS_lowqual.

    :param vcf_path: Path to the annotated sites-only VCF.
    :param header_path: Path to the header file for the VCF.
    :param lowqual_indel_phred_het_prior: Phred-scaled prior for a het genotype at a
        site with a low quality indel. Default is 40. We use 1/10k bases (phred=40)
        to be more consistent with the filtering used by Broad's Data Sciences
        Platform for VQSR.
    :param vds: VariantDataset to use for computing AC and AC_raw annotations.
        Required; the default of None exists only for keyword-argument symmetry.
    :param test: Whether to run a test using just the first two partitions of the
        loaded VCF.
    :return: Hail Table with reformatted annotations.
    :raises ValueError: If `vds` is None.
    """
    # Fail fast with a clear message instead of an AttributeError deep inside
    # generate_ac_info_ht when the default vds=None is left in place.
    if vds is None:
        raise ValueError(
            "'vds' is required to compute the AC and AC_raw annotations."
        )

    ht = hl.import_vcf(
        vcf_path,
        force_bgz=True,
        header_file=header_path,
        reference_genome="GRCh38",
    ).rows()
    if test:
        ht = ht._filter_partitions(range(2))

    logger.info("Reformatting annotations...")
    # Allele-specific annotations emitted as single-element arrays that should
    # be flattened to scalars.
    array_annotations = [
        "AS_FS",
        "AS_MQ",
        "AS_MQRankSum",
        "AS_ReadPosRankSum",
        "AS_SOR",
    ]
    # AS_VarDP is in the format of "ref|alt" and VarDP is an int of the alt
    # value from this, so can just set AS_VarDP to VarDP.
    # AS_SB_TABLE is an alternate formatting (string ref1, ref2 | alt 1, alt2)
    # of SB_TABLE, so can just set AS_SB_TABLE to SB_TABLE.
    info_updates = {
        # Convert single-element array annotations to float64.
        **{ann: hl.float64(ht.info[ann][0]) for ann in array_annotations},
        # Extract singular element from AS_QD array and convert to float32.
        "AS_QD": hl.float32(ht.info.AS_QD[0]),
        "AS_QUALapprox": hl.int64(ht.info.QUALapprox),
        "AS_VarDP": ht.info.VarDP,
        "AS_SB_TABLE": ht.info.SB_TABLE,
    }
    ht = ht.transmute(info=ht.info.annotate(**info_updates))
    # Drop fields that were renamed/reformatted above or are not needed.
    ht = ht.annotate(
        info=ht.info.drop("SB", "QUALapprox", "VarDP", "SB_TABLE", "AS_RAW_MQ")
    )
    ht = ht.drop("rsid")
    ht = ht.annotate(
        AS_lowqual=get_lowqual_expr(
            ht.alleles,
            ht.info.AS_QUALapprox,
            indel_phred_het_prior=lowqual_indel_phred_het_prior,
        )
    )

    logger.info("Adding AC info annotations to info ht...")
    ac_info_ht = generate_ac_info_ht(vds)
    ht = ht.annotate(**ac_info_ht[ht.key])
    return ht
def run_generate_trio_stats(
    mt: hl.MatrixTable,
    fam_ped: hl.Pedigree,
) -> hl.Table:
    """
    Generate trio transmission stats from a VariantDataset and pedigree info.

    :param mt: Dense trio MatrixTable.
    :param fam_ped: Pedigree containing trio info.
    :return: Table containing trio stats.
    """
    # Add the adj genotype-quality annotation first; only autosomal
    # bi-allelics feed into trio stats, so LGT can be used directly as GT.
    adj_mt = annotate_adj_no_dp(mt)
    adj_mt = adj_mt.transmute_entries(GT=adj_mt.LGT)
    # Restructure columns into (proband, father, mother) trios before
    # computing transmission statistics.
    trio_mt = hl.trio_matrix(adj_mt, pedigree=fam_ped, complete_trios=True)
    return generate_trio_stats(trio_mt)
def run_generate_sib_stats(
    mt: hl.MatrixTable,
    relatedness_ht: hl.Table,
) -> hl.Table:
    """
    Generate sibling stats from a VariantDataset and relatedness info.

    :param mt: Input MatrixTable.
    :param relatedness_ht: Table containing relatedness info.
    :return: Table containing sibling stats.
    """
    # Annotate adj (without a DP requirement), then split multi-allelics in
    # the sparse representation before computing sibling sharing stats.
    annotated_mt = annotate_adj_no_dp(mt)
    split_mt = hl.experimental.sparse_split_multi(annotated_mt)
    return generate_sib_stats(split_mt, relatedness_ht)
def main(args):
    """Generate all variant annotations needed for variant QC."""
    # Initialize Hail for the selected environment: Researcher Workbench (RWB)
    # or Batch. Each has its own log location and temporary bucket.
    if args.rwb:
        environment = "rwb"
        hl.init(
            log="/home/jupyter/workspaces/gnomadproduction/generate_variant_qc_annotations.log",
            tmp_dir=f"gs://{WORKSPACE_BUCKET}/tmp/4_day",
        )
    else:
        environment = "batch"
        hl.init(
            tmp_dir=f"gs://{GNOMAD_TMP_BUCKET}-4day",
            log="generate_variant_qc_annotations.log",
        )
        # TODO: Add machine configurations for Batch.
    hl.default_reference("GRCh38")

    overwrite = args.overwrite
    test_n_partitions = args.test_n_partitions
    # Either test flag puts the run in test mode (test output paths below).
    test = args.test or test_n_partitions is not None

    # NOTE: VDS will have 'aou_' prefix on sample IDs.
    vds = get_aou_vds(
        high_quality_only=True,
        filter_partitions=range(test_n_partitions) if test_n_partitions else None,
        annotate_meta=True,
        # NOTE: Using args.test here so that sibling stats test can be calculated from
        # a few partitions of the full (not test) VDS).
        test=args.test,
    )

    try:
        if args.create_info_ht:
            info_ht_path = get_info_ht(test=test).path
            # Refuse to clobber an existing output unless --overwrite is set.
            check_resource_existence(
                output_step_resources={
                    "info_ht": [info_ht_path],
                },
                overwrite=overwrite,
            )
            ht = create_info_ht(
                vcf_path=aou_annotated_sites_only_vcf,
                header_path=aou_vcf_header,
                lowqual_indel_phred_het_prior=args.lowqual_indel_phred_het_prior,
                vds=vds,
                test=test,
            )
            ht.write(info_ht_path, overwrite=overwrite)

        if args.generate_trio_stats:
            logger.info("Generating trio stats...")
            trio_stats_ht_path = get_trio_stats(test=test, environment=environment).path
            check_resource_existence(
                output_step_resources={"trio_stats_ht": [trio_stats_ht_path]},
                overwrite=overwrite,
            )
            # Trio stats are computed from the pre-densified trio MT and the
            # inferred pedigree, not from the full VDS.
            ht = run_generate_trio_stats(
                dense_trios(test=test, environment=environment).mt(),
                pedigree(test=test, environment=environment).pedigree(),
            )
            ht.write(trio_stats_ht_path, overwrite=overwrite)

        if args.generate_sibling_stats:
            logger.info("Generating sibling stats...")
            sib_stats_ht_path = get_sib_stats(test=test, environment=environment).path
            check_resource_existence(
                output_step_resources={"sib_stats_ht": [sib_stats_ht_path]},
                overwrite=overwrite,
            )
            # Note: Checked sibling IDs; none of them have sample ID collisions.
            ht = run_generate_sib_stats(vds.variant_data, relatedness().ht())
            ht.write(sib_stats_ht_path, overwrite=overwrite)
    finally:
        # Always preserve the Hail log, even if a step above failed.
        logger.info("Copying log to logging bucket...")
        hl.copy_log(
            get_logging_path("generate_variant_qc_annotations", environment=environment)
        )
def get_script_argument_parser() -> argparse.ArgumentParser:
    """Get script argument parser."""
    parser = argparse.ArgumentParser()
    # Boolean run-configuration flags, registered in display order.
    for flag, flag_help in (
        ("--rwb", "Run the script in RWB environment."),
        ("--overwrite", "Overwrite output files."),
        ("--test", "Write to test path."),
    ):
        parser.add_argument(flag, help=flag_help, action="store_true")
    # Optional int flag: bare '--test-n-partitions' means 2; absent means None.
    parser.add_argument(
        "--test-n-partitions",
        help="Use only n partitions of the VDS as input for testing purposes (default: 2).",
        nargs="?",
        const=2,
        type=int,
    )
    # Pipeline-step selection flags.
    for flag, flag_help in (
        ("--generate-trio-stats", "Calculates trio stats."),
        ("--generate-sibling-stats", "Calculates sibling stats."),
        (
            "--create-info-ht",
            "Create the info ht containing annotations needed for variant QC.",
        ),
    ):
        parser.add_argument(flag, help=flag_help, action="store_true")
    parser.add_argument(
        "--lowqual-indel-phred-het-prior",
        help="Phred-scaled prior for a het genotype at a site with a low quality indel. Default is 40. We use 1/10k bases (phred=40) to be more consistent with the filtering used by Broad's Data Sciences Platform for VQSR.",
        default=40,
        type=int,
    )
    return parser


if __name__ == "__main__":
    parser = get_script_argument_parser()
    args = parser.parse_args()
    main(args)