"""Script to generate annotations for variant QC on gnomAD v5."""
import argparse
import logging
from typing import Dict
import hail as hl
from gnomad.resources.grch38.gnomad import GROUPS
from gnomad.resources.grch38.reference_data import get_truth_ht
from gnomad.utils.annotations import (
annotate_allele_info,
get_lowqual_expr,
pab_max_expr,
)
from gnomad.utils.sparse_mt import split_info_annotation
from gnomad.utils.vcf import adjust_vcf_incompatible_types
from gnomad.variant_qc.pipeline import (
INFO_FEATURES,
NON_INFO_FEATURES,
TRUTH_DATA,
generate_sib_stats,
generate_trio_stats,
)
from gnomad.variant_qc.random_forest import median_impute_features
from gnomad_qc.v5.annotations.annotation_utils import annotate_adj_no_dp, get_adj_expr
from gnomad_qc.v5.resources.annotations import (
get_aou_annotated_sites_only_vcf,
get_aou_vcf_header,
get_info_ht,
get_sib_stats,
get_trio_stats,
get_true_positive_vcf_path,
get_variant_qc_annotations,
info_vcf_path,
)
from gnomad_qc.v5.resources.basics import (
_check_resource_existence,
_get_batch_resource_kwargs,
_init_hail,
get_aou_vds,
get_logging_path,
)
from gnomad_qc.v5.resources.sample_qc import dense_trios, pedigree, relatedness
# Configure logging once at import time. `force=True` replaces any handlers
# that libraries imported above (e.g. Hail) may have already installed, so
# this format/level is guaranteed to take effect.
logging.basicConfig(
    format="%(levelname)s (%(name)s %(lineno)s): %(message)s",
    level=logging.INFO,
    force=True,
)
logger = logging.getLogger("generate_variant_qc_annotations")
logger.setLevel(logging.INFO)
def generate_ac_info_ht(vds: hl.vds.VariantDataset) -> hl.Table:
    """
    Compute AC and AC_raw annotations for each allele count filter group.

    Function also adds `AS_pab_max` and `allele_info` annotations.

    :param vds: VariantDataset to use for computing AC and AC_raw annotations.
    :return: Table with AC and AC_raw annotations split by high quality, release, and unrelated.
    """
    mt = vds.variant_data
    # Per-sample boolean expressions defining each allele count filter group.
    # 'unrelated' is the complement of the relatedness inference 'related' filter.
    ac_filter_groups = {
        "high_quality": mt.meta.high_quality,
        "release": mt.meta.release,
        "unrelated": ~mt.meta.relatedness_inference.relatedness_filters.related,
    }
    mt = mt.annotate_cols(_ac_filter_groups=ac_filter_groups)
    # Local indices of all non-reference alleles at each site (1..n_alleles-1).
    mt = mt.annotate_rows(alt_alleles_range_array=hl.range(1, hl.len(mt.alleles)))
    ac_info_expr = hl.struct()
    # First compute ACs for each non-ref allele, grouped by adj.
    grp_ac_expr = {
        f: hl.agg.array_agg(
            lambda ai: hl.agg.filter(
                # Only aggregate entries where the local alleles include this
                # alt allele and the sample is in filter group `f`.
                mt.LA.contains(ai) & mt._ac_filter_groups[f],
                hl.agg.group_by(
                    get_adj_expr(mt.LGT, mt.GQ, mt.LAD),
                    hl.agg.sum(
                        mt.LGT.one_hot_alleles(mt.LA.map(lambda x: hl.str(x)))[
                            mt.LA.index(ai)
                        ]
                    ),
                ),
            ),
            mt.alt_alleles_range_array,
        )
        for f in ac_filter_groups
    }
    # Then, for each non-ref allele, compute
    # 'AC' as the adj group
    # 'AC_raw' as the sum of adj and non-adj groups
    ac_info_expr = ac_info_expr.annotate(
        **{
            f"AC{'_' + f if f else f}_raw": grp.map(
                lambda i: hl.int32(i.get(True, 0) + i.get(False, 0))
            )
            for f, grp in grp_ac_expr.items()
        },
        **{
            f"AC{'_' + f if f else f}": grp.map(lambda i: hl.int32(i.get(True, 0)))
            for f, grp in grp_ac_expr.items()
        },
    )
    ac_info_expr = ac_info_expr.annotate(
        AS_pab_max=pab_max_expr(mt.LGT, mt.LAD, mt.LA, hl.len(mt.alleles))
    )
    ac_info_ht = mt.select_rows(AC_info=ac_info_expr).rows()
    # Split multi-allelic sites.
    ac_info_ht = annotate_allele_info(ac_info_ht)
    # After the split, index every allele-specific array field in 'AC_info' by
    # 'a_index' so each row carries only its own allele's values.
    ac_info_ht = ac_info_ht.annotate(
        **{
            a: (
                ac_info_ht[a].annotate(
                    **split_info_annotation(ac_info_ht[a], ac_info_ht.a_index),
                )
            )
            for a in ["AC_info"]
        },
    )
    return ac_info_ht
def create_info_ht(
    vcf_path: str,
    header_path: str,
    lowqual_indel_phred_het_prior: int = 40,
    vds: hl.vds.VariantDataset = None,
    test: bool = False,
) -> hl.Table:
    """
    Import a VCF of AoU annotated sites, reformat annotations, and add AS_lowqual.

    :param vcf_path: Path to the annotated sites-only VCF.
    :param header_path: Path to the header file for the VCF.
    :param lowqual_indel_phred_het_prior: Phred-scaled prior for a het genotype at a
        site with a low quality indel. Default is 40. We use 1/10k bases (phred=40)
        to be more consistent with the filtering used by Broad's Data Sciences
        Platform for VQSR.
    :param vds: VariantDataset to use for computing AC and AC_raw annotations.
    :param test: Whether to run a test using just the first two partitions of the
        loaded VCF.
    :return: Hail Table with reformatted annotations.
    """
    ht = hl.import_vcf(
        vcf_path,
        force_bgz=True,
        header_file=header_path,
        reference_genome="GRCh38",
    ).rows()
    if test:
        ht = ht._filter_partitions(range(2))
    logger.info("Reformatting annotations...")
    # These annotations are imported as single-element arrays and are converted
    # to scalar floats below.
    array_annotations = [
        "AS_FS",
        "AS_MQ",
        "AS_MQRankSum",
        "AS_ReadPosRankSum",
        "AS_SOR",
    ]
    # AS_VarDP is in the format of "ref|alt" and VarDP is an int of the alt value from this, so can just set AS_VarDP to VarDP.
    # AS_SB_TABLE is an alternate formatting (string ref1, ref2 | alt 1, alt2)
    # of SB_TABLE, so can just set AS_SB_TABLE to SB_TABLE.
    info_updates = {
        # Convert single-element array annotations to float64.
        **{ann: hl.float64(ht.info[ann][0]) for ann in array_annotations},
        # Extract singular element from AS_QD array and convert to float32.
        "AS_QD": hl.float32(ht.info.AS_QD[0]),
        "AS_QUALapprox": hl.int64(ht.info.QUALapprox),
        "AS_VarDP": ht.info.VarDP,
        "AS_SB_TABLE": ht.info.SB_TABLE,
    }
    ht = ht.transmute(info=ht.info.annotate(**info_updates))
    # Drop fields that are superseded by the reformatted AS_* annotations above.
    ht = ht.annotate(
        info=ht.info.drop("SB", "QUALapprox", "VarDP", "SB_TABLE", "AS_RAW_MQ")
    )
    ht = ht.drop("rsid")
    ht = ht.annotate(
        AS_lowqual=get_lowqual_expr(
            ht.alleles,
            ht.info.AS_QUALapprox,
            indel_phred_het_prior=lowqual_indel_phred_het_prior,
        )
    )
    logger.info("Adding AC info annotations to info ht...")
    ac_info_ht = generate_ac_info_ht(vds)
    ac_info_ht = ac_info_ht.checkpoint(hl.utils.new_temp_file("ac_info_ht", "ht"))
    # Annotate ac_info_ht with info annotations because info VCF
    # provided by AoU has variants not present in samples we consider high quality.
    info_ht = ac_info_ht.annotate(**ht[ac_info_ht.key])
    # Move AS_pab_max out of AC_info and into the main 'info' struct.
    info_ht = info_ht.annotate(
        info=info_ht.info.annotate(AS_pab_max=info_ht.AC_info.AS_pab_max),
        AC_info=info_ht.AC_info.drop("AS_pab_max"),
    )
    return info_ht
def run_generate_trio_stats(
    mt: hl.MatrixTable,
    fam_ped: hl.Pedigree,
) -> hl.Table:
    """
    Generate trio transmission stats from a dense trio MatrixTable and pedigree info.

    :param mt: Dense trio MatrixTable.
    :param fam_ped: Pedigree containing trio info.
    :return: Table containing trio stats.
    """
    # Trio stats are computed only on autosomal bi-allelic variants, so LGT can
    # be treated directly as GT after adding the adj annotation.
    adj_mt = annotate_adj_no_dp(mt)
    adj_mt = adj_mt.transmute_entries(GT=adj_mt.LGT)
    trio_mt = hl.trio_matrix(adj_mt, pedigree=fam_ped, complete_trios=True)
    return generate_trio_stats(trio_mt)
def run_generate_sib_stats(
    mt: hl.MatrixTable,
    relatedness_ht: hl.Table,
) -> hl.Table:
    """
    Generate sibling stats from a MatrixTable and relatedness info.

    :param mt: Input MatrixTable.
    :param relatedness_ht: Table containing relatedness info.
    :return: Table containing sibling stats.
    """
    # Annotate adj before splitting so adj reflects the unsplit genotypes, then
    # split multi-allelics for the sibling stats computation.
    split_mt = hl.experimental.sparse_split_multi(annotate_adj_no_dp(mt))
    return generate_sib_stats(split_mt, relatedness_ht)
def create_variant_qc_annotation_ht(
    info_ht: hl.Table,
    trio_stats_ht: hl.Table,
    sib_stats_ht: hl.Table,
    impute_features: bool = True,
    n_partitions: int = 5000,
) -> hl.Table:
    """
    Create a Table with all necessary annotations for variant QC.

    Annotations that are included:

        Features for RF:
            - variant_type
            - allele_type
            - n_alt_alleles
            - has_star
            - AS_QD
            - AS_pab_max
            - AS_MQRankSum
            - AS_SOR
            - AS_ReadPosRankSum

        Training sites (bool):
            - transmitted_singleton
            - sibling_singleton
            - fail_hard_filters - (ht.AS_QD < 0.5) | (ht.AS_FS > 60) | (ht.AS_MQ < 30)

    :param info_ht: Info Table with split multi-allelics.
    :param trio_stats_ht: Table with trio statistics.
    :param sib_stats_ht: Table with sibling statistics.
    :param impute_features: Whether to impute features using feature medians (this is
        done by variant type).
    :param n_partitions: Number of partitions to use for final annotated Table.
    :return: Hail Table with all annotations needed for variant QC.
    """
    truth_data_ht = get_truth_ht()
    # Flatten the AC_info and allele_info structs into top-level annotations.
    ht = info_ht.transmute(**info_ht.AC_info, **info_ht.allele_info)
    ht = ht.annotate(**ht.info.select(*INFO_FEATURES))
    if impute_features:
        # Median-impute missing feature values, stratified by variant type.
        impute_ht = ht.select("variant_type", **ht.info)
        impute_ht = median_impute_features(
            impute_ht, {"variant_type": impute_ht.variant_type}
        ).checkpoint(hl.utils.new_temp_file("median_impute", "ht"))
        impute_result = impute_ht[ht.key]
        ht = ht.annotate(
            info=impute_result.drop("feature_imputed", "variant_type"),
            feature_imputed=impute_result.feature_imputed,
        )
        ht = ht.annotate_globals(feature_medians=hl.eval(impute_ht.feature_medians))
    logger.info("Annotating Table with trio and sibling stats and reference truth data")
    trio_stats_ht = trio_stats_ht.select(
        *[f"{a}_{group}" for a in ["n_transmitted", "ac_children"] for group in GROUPS]
    )
    ht = ht.annotate(
        **trio_stats_ht[ht.key],
        **sib_stats_ht[ht.key],
        **truth_data_ht[ht.key],
    )
    # Maps each training-site annotation to the trio/sibling stat that defines it.
    tp_map = {
        "transmitted_singleton": "n_transmitted",
        "sibling_singleton": "n_sib_shared_variants",
    }
    # Filter to only variants found in high quality samples and are not lowqual.
    ht = ht.filter((ht.AC_high_quality_raw > 0) & ~ht.AS_lowqual)
    select_dict = {tp: hl.or_else(ht[tp], False) for tp in TRUTH_DATA}
    # A variant is a transmitted/sibling singleton for a group when it is seen
    # exactly once in the trios/sibs and has a high quality allele count of 2
    # (adj AC for the 'adj' group, raw AC otherwise).
    select_dict.update(
        {
            f"{tp}_{group}": hl.or_else(
                (ht[f"{n}_{group}"] == 1)
                & (ht[f"AC_high_quality{'' if group == 'adj' else '_raw'}"] == 2),
                False,
            )
            for tp, n in tp_map.items()
            for group in GROUPS
        }
    )
    select_dict.update(
        {
            # NOTE: Previous versions used QD < 2, but we decided this was too
            # stringent based on the distribution of this metric in AoU.
            "fail_hard_filters": (ht.AS_QD < 0.5) | (ht.AS_FS > 60) | (ht.AS_MQ < 30),
            "singleton": ht.AC_release_raw == 1,
            "ac_raw": ht.AC_high_quality_raw,
            "ac": ht.AC_release,
            "ac_unrelated_raw": ht.AC_unrelated_raw,
        }
    )
    if impute_features:
        select_dict["feature_imputed"] = ht.feature_imputed
    ht = ht.select(
        "a_index",
        "was_split",
        *NON_INFO_FEATURES,
        "info",
        **select_dict,
    )
    # Write to a temp file and re-read to checkpoint the result and repartition
    # to the requested number of partitions in one pass.
    temp_path = hl.utils.new_temp_file("variant_qc_annotations", "ht")
    ht.write(temp_path)
    ht = hl.read_table(temp_path, _n_partitions=n_partitions)
    ht.describe()
    # Log variant counts per combination of truth-data / training-site flags.
    summary = ht.group_by(
        *TRUTH_DATA, *[f"{tp}_{group}" for tp in tp_map for group in GROUPS]
    ).aggregate(n=hl.agg.count())
    logger.info("Summary of truth data annotations:")
    summary.show(-1)
    return ht
def get_tp_ht_for_vcf_export(
    ht: hl.Table,
    transmitted_singletons: bool = False,
    sibling_singletons: bool = False,
) -> Dict[str, hl.Table]:
    """
    Get Tables with raw and adj true positive variants to export as a VCF for use in VQSR.

    :param ht: Input Table with transmitted singleton and sibling singleton information.
    :param transmitted_singletons: Whether to include transmitted singletons in the
        true positive variants.
    :param sibling_singletons: Whether to include sibling singletons in the true
        positive variants.
    :return: Dictionary of 'raw' and 'adj' true positive variant sites Tables.
    """
    if not (transmitted_singletons or sibling_singletons):
        raise ValueError(
            "At least one of transmitted_singletons or sibling_singletons must be set "
            "to True"
        )
    tp_hts = {}
    for group in GROUPS:
        # Collect the requested singleton flags for this group and OR them
        # together to build the keep-filter.
        singleton_exprs = []
        if transmitted_singletons:
            singleton_exprs.append(ht[f"transmitted_singleton_{group}"])
        if sibling_singletons:
            singleton_exprs.append(ht[f"sibling_singleton_{group}"])
        keep_expr = singleton_exprs[0]
        for expr in singleton_exprs[1:]:
            keep_expr = keep_expr | expr
        # Keep only the variant keys (drop all row fields and globals).
        tp_ht = ht.filter(keep_expr).select().select_globals()
        tp_ht = tp_ht.checkpoint(
            hl.utils.new_temp_file("true_positive_variants", "ht"),
        )
        logger.info(
            "True positive %s Table for VCF export contains %d variants",
            group,
            tp_ht.count(),
        )
        tp_hts[group] = tp_ht
    return tp_hts
def main(args):
    """Generate all variant annotations needed for variant QC."""
    environment = args.environment
    _init_hail(
        "generate_variant_qc_annotations",
        environment,
        **_get_batch_resource_kwargs(args),
    )
    overwrite = args.overwrite
    test_n_partitions = args.test_n_partitions
    # Requesting a partition subset implies test mode.
    test = args.test or test_n_partitions is not None
    # Resolve all output resource paths up front.
    info_ht_path = get_info_ht(test=test, environment=environment).path
    trio_stats_ht_path = get_trio_stats(test=test, environment=environment).path
    sib_stats_ht_path = get_sib_stats(test=test, environment=environment).path
    variant_qc_annotation_ht_path = get_variant_qc_annotations(
        test=test, environment=environment
    ).path
    # Fail fast, before any expensive work, if the true positive VCF export was
    # requested without a true positive variant type.
    if args.export_true_positive_vcfs and not (
        args.transmitted_singletons or args.sibling_singletons
    ):
        raise ValueError(
            "--export-true-positive-vcfs requires at least one of"
            " --transmitted-singletons or --sibling-singletons"
        )
    # Load VDS only if needed for create_info_ht or generate_sibling_stats.
    if args.create_info_ht or args.generate_sibling_stats:
        # NOTE: VDS will have 'aou_' prefix on sample IDs.
        vds = get_aou_vds(
            high_quality_only=True,
            filter_partitions=range(test_n_partitions) if test_n_partitions else None,
            annotate_meta=True,
            # NOTE: Using args.test here so that sibling stats test can be calculated from
            # a few partitions of the full (not test) VDS).
            test=args.test,
            environment=environment,
        )
    try:
        if args.create_info_ht:
            _check_resource_existence(
                environment=environment,
                output_step_resources={
                    "info_ht": [info_ht_path],
                },
                overwrite=overwrite,
            )
            ht = create_info_ht(
                vcf_path=get_aou_annotated_sites_only_vcf(environment=environment),
                header_path=get_aou_vcf_header(environment=environment),
                lowqual_indel_phred_het_prior=args.lowqual_indel_phred_het_prior,
                vds=vds,
                test=test,
            )
            ht.write(info_ht_path, overwrite=overwrite)
        if args.export_info_vcf:
            logger.info("Exporting info ht as VCF...")
            out_info_vcf_path = info_vcf_path(test=test, environment=environment)
            _check_resource_existence(
                environment=environment,
                input_step_resources={
                    "info_ht": [info_ht_path],
                },
                output_step_resources={
                    "info_vcf_path": [out_info_vcf_path],
                },
                overwrite=overwrite,
            )
            info_ht = hl.read_table(info_ht_path)
            # TODO: Check if AS_QUALapprox and AS_VarDP are needed for v5 (not
            #  used for v4) and if so need preceding pipe.
            # Reformat AS_SB_TABLE to be a nested array of arrays for proper use
            # within the 'adjust_vcf_incompatible_types' function.
            info_ht = info_ht.annotate(
                info=info_ht.info.annotate(
                    AS_SB_TABLE=hl.array(
                        [info_ht.info.AS_SB_TABLE[0:2], info_ht.info.AS_SB_TABLE[2:4]]
                    )
                )
            )
            info_ht = adjust_vcf_incompatible_types(
                info_ht, pipe_delimited_annotations=[]
            )
            hl.export_vcf(info_ht, out_info_vcf_path, tabix=True)
        if args.generate_trio_stats:
            logger.info("Generating trio stats...")
            _check_resource_existence(
                environment=environment,
                output_step_resources={"trio_stats_ht": [trio_stats_ht_path]},
                overwrite=overwrite,
            )
            ht = run_generate_trio_stats(
                dense_trios(test=test, environment=environment).mt(),
                pedigree(test=test, environment=environment).pedigree(),
            )
            ht.write(trio_stats_ht_path, overwrite=overwrite)
        if args.generate_sibling_stats:
            logger.info("Generating sibling stats...")
            _check_resource_existence(
                environment=environment,
                output_step_resources={"sib_stats_ht": [sib_stats_ht_path]},
                overwrite=overwrite,
            )
            # Note: Checked sibling IDs; none of them have sample ID collisions.
            ht = run_generate_sib_stats(
                vds.variant_data, relatedness(environment=environment).ht()
            )
            ht.write(sib_stats_ht_path, overwrite=overwrite)
        if args.create_variant_qc_annotation_ht:
            logger.info("Creating variant QC annotation HT...")
            _check_resource_existence(
                environment=environment,
                output_step_resources={
                    "variant_qc_annotation_ht": [variant_qc_annotation_ht_path]
                },
                overwrite=overwrite,
            )
            ht = create_variant_qc_annotation_ht(
                hl.read_table(info_ht_path),
                hl.read_table(trio_stats_ht_path),
                hl.read_table(sib_stats_ht_path),
                impute_features=args.impute_features,
                n_partitions=args.n_partitions,
            )
            ht.write(variant_qc_annotation_ht_path, overwrite=overwrite)
        if args.export_true_positive_vcfs:
            logger.info("Exporting true positive VCFs...")
            # Build the true positive type label from the requested singleton
            # sets, e.g. "transmitted_singleton.sibling_singleton".
            tp_parts = []
            if args.transmitted_singletons:
                tp_parts.append("transmitted_singleton")
            if args.sibling_singletons:
                tp_parts.append("sibling_singleton")
            tp_type = ".".join(tp_parts)
            vcf_path_kwargs = dict(
                test=test, environment=environment, true_positive_type=tp_type
            )
            raw_tp_vcf_path = get_true_positive_vcf_path(**vcf_path_kwargs, adj=False)
            adj_tp_vcf_path = get_true_positive_vcf_path(**vcf_path_kwargs, adj=True)
            _check_resource_existence(
                environment=environment,
                input_step_resources={
                    "variant_qc_annotation_ht": [variant_qc_annotation_ht_path],
                },
                output_step_resources={
                    "raw_true_positive_vcf_path": [raw_tp_vcf_path],
                    "adj_true_positive_vcf_path": [adj_tp_vcf_path],
                },
                overwrite=overwrite,
            )
            tp_hts = get_tp_ht_for_vcf_export(
                hl.read_table(variant_qc_annotation_ht_path),
                transmitted_singletons=args.transmitted_singletons,
                sibling_singletons=args.sibling_singletons,
            )
            hl.export_vcf(tp_hts["raw"], raw_tp_vcf_path, tabix=True)
            hl.export_vcf(tp_hts["adj"], adj_tp_vcf_path, tabix=True)
    finally:
        # Always persist the Hail log when running on the researcher workbench,
        # even if a step above failed.
        if environment == "rwb":
            logger.info("Copying log to logging bucket...")
            hl.copy_log(
                get_logging_path(
                    "generate_variant_qc_annotations", environment=environment
                )
            )
def get_script_argument_parser() -> argparse.ArgumentParser:
    """Get script argument parser."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--environment",
        choices=["rwb", "batch"],
        default="rwb",
        help="Environment where script will run.",
    )
    parser.add_argument(
        "--app-name",
        type=str,
        default=None,
        help="Job name for batch/QoB backend.",
    )
    # Batch-backend resource options all share the same shape (typed value,
    # default None), so add them from a table of (flag, type, help).
    for flag, arg_type, help_text in [
        (
            "--driver-cores",
            int,
            "Number of cores. Applies to Batch environment only. Hail default is 1 if unspecified.",
        ),
        (
            "--driver-memory",
            str,
            "Memory for driver node. Applies to Batch environment only. Hail default is 'standard' if unspecified.",
        ),
        (
            "--worker-cores",
            int,
            "Number of cores. Applies to Batch environment only. Hail default is 1 if unspecified.",
        ),
        (
            "--worker-memory",
            str,
            "Memory for worker nodes. Applies to Batch environment only. Hail default is 'standard' if unspecified.",
        ),
    ]:
        parser.add_argument(flag, type=arg_type, default=None, help=help_text)
    # Boolean behavior/step flags.
    for flag, help_text in [
        ("--overwrite", "Overwrite output files."),
        ("--test", "Write to test path."),
    ]:
        parser.add_argument(flag, action="store_true", help=help_text)
    # `nargs="?"` with `const=2` means the flag alone selects 2 partitions,
    # while an explicit value overrides it; absent, the value stays None.
    parser.add_argument(
        "--test-n-partitions",
        nargs="?",
        const=2,
        type=int,
        help="Use only n partitions of the VDS as input for testing purposes (default: 2).",
    )
    parser.add_argument(
        "--generate-trio-stats",
        action="store_true",
        help="Calculates trio stats.",
    )
    parser.add_argument(
        "--generate-sibling-stats",
        action="store_true",
        help="Calculates sibling stats.",
    )
    parser.add_argument(
        "--create-info-ht",
        action="store_true",
        help="Create the info ht containing annotations needed for variant QC.",
    )
    parser.add_argument(
        "--lowqual-indel-phred-het-prior",
        type=int,
        default=40,
        help="Phred-scaled prior for a het genotype at a site with a low quality indel. Default is 40. We use 1/10k bases (phred=40) to be more consistent with the filtering used by Broad's Data Sciences Platform for VQSR.",
    )
    parser.add_argument(
        "--export-info-vcf", action="store_true", help="Export info ht as VCF."
    )
    variant_qc_annotation_args = parser.add_argument_group(
        "Variant QC annotation HT parameters."
    )
    variant_qc_annotation_args.add_argument(
        "--create-variant-qc-annotation-ht",
        action="store_true",
        help="Creates an annotated HT with features for variant QC.",
    )
    variant_qc_annotation_args.add_argument(
        "--impute-features",
        action="store_true",
        help="If set, imputation is performed for variant QC features.",
    )
    variant_qc_annotation_args.add_argument(
        "--n-partitions",
        type=int,
        default=5000,
        help="Desired number of partitions for variant QC annotation HT.",
    )
    tp_vcf_args = parser.add_argument_group(
        "Export true positive VCFs",
        "Arguments used to define true positive variant set.",
    )
    tp_vcf_args.add_argument(
        "--export-true-positive-vcfs",
        action="store_true",
        help=(
            "Exports true positive variants (--transmitted-singletons and/or"
            " --sibling-singletons) to VCF files."
        ),
    )
    tp_vcf_args.add_argument(
        "--transmitted-singletons",
        action="store_true",
        help=(
            "Include transmitted singletons in the exports of true positive variants to"
            " VCF files."
        ),
    )
    tp_vcf_args.add_argument(
        "--sibling-singletons",
        action="store_true",
        help=(
            "Include sibling singletons in the exports of true positive variants to VCF"
            " files."
        ),
    )
    return parser
if __name__ == "__main__":
    # Parse command-line arguments and run the full annotation pipeline.
    main(get_script_argument_parser().parse_args())