Source code for gnomad_qc.v4.create_release.create_release_sites_ht

"""Script to create release sites HT for v4.0 exomes and genomes."""

import argparse
import json
import logging
from copy import deepcopy
from datetime import datetime
from functools import reduce
from typing import Any, Dict, List, Optional, Union

import hail as hl
from gnomad.resources.grch38.gnomad import SUBSETS
from gnomad.resources.grch38.reference_data import (
    dbsnp,
    lcr_intervals,
    seg_dup_intervals,
)
from gnomad.utils.annotations import region_flag_expr
from gnomad.utils.filtering import filter_arrays_by_meta, remove_fields_from_constant
from gnomad.utils.release import make_freq_index_dict_from_meta
from gnomad.utils.slack import slack_notifications
from gnomad.utils.vcf import ALLELE_TYPE_FIELDS, AS_FIELDS, SITE_FIELDS
from gnomad.utils.vep import update_loftee_end_trunc_filter
from hail.utils import new_temp_file

from gnomad_qc.slack_creds import slack_token
from gnomad_qc.v3.resources.annotations import get_info as get_info_v3
from gnomad_qc.v4.annotations.insilico_predictors import get_sift_polyphen_from_vep
from gnomad_qc.v4.create_release.create_release_utils import (
    DBSNP_VERSION,
    GENCODE_VERSION,
    GENCODE_VERSIONS,
    MANE_SELECT_VERSION,
    MANE_SELECT_VERSIONS,
    POLYPHEN_VERSION,
    SEQREPO_VERSION,
    SIFT_VERSION,
    VEP_VERSIONS_TO_ADD,
    VRS_PYTHON_VERSION,
    VRS_SCHEMA_VERSION,
    remove_missing_vep_fields,
)
from gnomad_qc.v4.resources.annotations import (
    get_freq,
    get_info,
    get_insilico_predictors,
    get_vep,
    get_vrs,
)
from gnomad_qc.v4.resources.basics import calling_intervals, qc_temp_prefix
from gnomad_qc.v4.resources.constants import CURRENT_RELEASE, DEFAULT_VEP_VERSION
from gnomad_qc.v4.resources.meta import meta
from gnomad_qc.v4.resources.release import (
    get_freq_array_readme,
    included_datasets_json_path,
    release_sites,
)
from gnomad_qc.v4.resources.sample_qc import interval_qc_pass
from gnomad_qc.v4.resources.variant_qc import final_filter

logging.basicConfig(
    format="%(asctime)s (%(name)s %(lineno)s): %(message)s",
    datefmt="%m/%d/%Y %I:%M:%S %p",
)
logger = logging.getLogger("create_release_ht")
logger.setLevel(logging.INFO)

# All v3 subsets except HGDP and TGP will be dropped from the v4.0 genomes.
SUBSETS_TO_DROP = remove_fields_from_constant(SUBSETS["v3"], ["hgdp", "tgp"])

# Remove InbreedingCoeff from allele-specific fields because it is processed separately
# from the other fields.
AS_FIELDS = [x for x in AS_FIELDS if x != "InbreedingCoeff"]
SITE_FIELDS = deepcopy(SITE_FIELDS)

# Remove original_alleles, which can contain non-releasable alleles, and has_star.
ALLELE_TYPE_FIELDS = deepcopy(ALLELE_TYPE_FIELDS)
ALLELE_TYPE_FIELDS = remove_fields_from_constant(
    ALLELE_TYPE_FIELDS, ["original_alleles", "has_star"]
)


def get_tables_for_release(version: str) -> List[str]:
    """
    Get list of tables to include in release based on version.

    :param version: Release version (e.g., "4.0", "4.1", "4.1.1").
    :return: List of table names to include in release.
    """
    tables = [
        "dbsnp",
        "filters",
        "freq",
        "info",
        "region_flags",
        "in_silico",
        "vep",
        # "joint_faf",  # NOTE: joint_faf was only included in the v4.0 release.
    ]

    # Add additional VEP versions based on release version.
    if version in VEP_VERSIONS_TO_ADD:
        for vep_version in VEP_VERSIONS_TO_ADD[version]:
            tables.append(f"vep{vep_version}")

    return tables

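# For example, get_tables_for_release("4.0") returns just the seven base tables
# above, while any version with an entry in VEP_VERSIONS_TO_ADD also gets one
# "vep<version>" table per configured VEP version (the exact mapping lives in
# create_release_utils).
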
# Default tables for backward compatibility.
TABLES_FOR_RELEASE = get_tables_for_release(CURRENT_RELEASE)

INSILICO_PREDICTORS = ["cadd", "revel", "spliceai", "pangolin", "phylop"]

def get_finalized_schema(version: str) -> Dict[str, List[str]]:
    """
    Get finalized schema for release HT based on version.

    :param version: Release version (e.g., "4.0", "4.1", "4.1.1").
    :return: Dictionary with "globals" and "rows" lists of field names.
    """
    globals_list = [
        "freq_meta",
        "freq_index_dict",
        "freq_meta_sample_count",
        "faf_meta",
        "faf_index_dict",
        "joint_freq_meta",
        "joint_freq_index_dict",
        "joint_freq_meta_sample_count",
        "joint_faf_meta",
        "joint_faf_index_dict",
        "age_distribution",
        "downsamplings",
        "filtering_model",
        "inbreeding_coeff_cutoff",
        "interval_qc_parameters",
        "tool_versions",
        "vrs_versions",
        "vep_globals",
    ]
    rows_list = [
        "freq",
        "grpmax",
        "faf",
        "fafmax",
        "joint_freq",
        "joint_grpmax",
        "joint_faf",
        "joint_fafmax",
        "a_index",
        "was_split",
        "rsid",
        "filters",
        "info",
        "vep",
        "vqsr_results",
        "region_flags",
        "allele_info",
        "histograms",
        "in_silico_predictors",
    ]

    # Add additional VEP versions based on release version.
    if version in VEP_VERSIONS_TO_ADD:
        for vep_version in VEP_VERSIONS_TO_ADD[version]:
            globals_list.append(f"vep{vep_version}_globals")
            rows_list.append(f"vep{vep_version}")

    globals_list.extend(
        [
            "frequency_README",
            "date",
            "version",
        ]
    )

    return {
        "globals": globals_list,
        "rows": rows_list,
    }

# Default schema for backward compatibility.
FINALIZED_SCHEMA = get_finalized_schema(CURRENT_RELEASE)

# Config is added as a function, so it is not evaluated until the function is called.

def get_config(
    data_type: str,
    tables_for_join: List[str],
    release_exists: bool = False,
    version: str = CURRENT_RELEASE,
    base_release_version: Optional[str] = None,
    test: bool = False,
) -> Dict[str, Dict[str, hl.expr.Expression]]:
    """
    Get configuration dictionary for specified data type.

    Format:

    .. code-block::

        '<Name of dataset>': {
            'ht': '<Optional Hail Table for direct annotation extraction. This is not used for the join.>',
            'path': 'gs://path/to/hail_table.ht',
            'select': '<Optional list of fields to select or dict of new field name to location of old field in the dataset.>',
            'field_name': '<Optional name of root annotation in combined dataset, defaults to name of dataset.>',
            'custom_select': '<Optional name of custom select function that is needed for more advanced logic.>',
            'custom_globals_select': '<Optional name of custom globals select function that is needed for more advanced logic.>',
            'select_globals': '<Optional list of globals to select or dict of new global field name to old global field name. If not specified, all globals are selected.>',
        },

    .. warning::

        The 'in_silico' key's 'ht' logic is handled separately because it is a list
        of HTs. In this list, the phyloP HT is keyed by locus only, so the 'ht' code
        below sets the join key to 1, which grabs the first key of
        ht.key.dtype.values(), e.g. 'locus', when an HT's keys are not
        {'locus', 'alleles'}. All future in silico predictors should have their keys
        confirmed to be 'locus' with or without 'alleles' before using this logic.

    :param data_type: Dataset's data type: 'exomes' or 'genomes'.
    :param tables_for_join: List of tables to join.
    :param release_exists: Whether the release HT already exists. If True, uses the
        specified `version` as the base release HT. Mutually exclusive with
        `base_release_version`.
    :param version: Release version for output (e.g., "4.0", "4.1", "4.1.1").
    :param base_release_version: Specific release version to use as the base HT
        (e.g., "4.1"). When provided, loads that version's release HT as the base.
        Mutually exclusive with `release_exists`.
    :param test: Whether this is for a test run. Default is False.
    :return: Dict of dataset's configs.
    """
    if base_release_version is not None and release_exists:
        raise ValueError(
            "Cannot specify both --release-exists and --base-release-version. "
            "Use --base-release-version to specify a specific version to use as the "
            "base, or use --release-exists to use the output version as the base."
        )

    config = {}
    if "dbsnp" in tables_for_join:
        config["dbsnp"] = {
            "ht": dbsnp.ht(),
            "path": dbsnp.path,
            "select": ["rsid"],
        }
    if "filters" in tables_for_join:
        config["filters"] = {
            "ht": final_filter(data_type=data_type).ht(),
            "path": final_filter(data_type=data_type).path,
            "select": ["filters"],
            "custom_select": custom_filters_select,
            "custom_globals_select": custom_filters_select_globals,
        }
    if "in_silico" in tables_for_join:
        config["in_silico"] = {
            "ht": reduce(
                (
                    lambda joined_ht, ht: (
                        joined_ht.join(ht, "outer")
                        if set(ht.key) == {"locus", "alleles"}
                        else joined_ht.join(ht, "outer", _join_key=1)
                    )
                ),
                [
                    get_insilico_predictors(predictor=predictor).ht()
                    for predictor in INSILICO_PREDICTORS
                ],
            ),
            "path": [
                get_insilico_predictors(predictor=predictor).path
                for predictor in INSILICO_PREDICTORS
            ],
            "field_name": "in_silico_predictors",
            "select": [
                "cadd",
                "revel_max",
                "spliceai_ds_max",
                "pangolin_largest_ds",
                "phylop",
            ],
            "custom_select": custom_in_silico_select,
            "select_globals": [
                "cadd_version",
                "revel_version",
                "spliceai_version",
                "pangolin_version",
                "phylop_version",
            ],
            "global_name": "tool_versions",
        }
    if "info" in tables_for_join:
        config["info"] = {
            "ht": get_info().ht() if data_type == "exomes" else get_info_v3().ht(),
            "path": get_info().path if data_type == "exomes" else get_info_v3().path,
            "select": ["was_split", "a_index"],
            "custom_select": custom_info_select,
        }
    if "freq" in tables_for_join:
        config["freq"] = {
            "ht": get_freq(data_type=data_type).ht(),
            "path": get_freq(data_type=data_type).path,
            "select": [
                "freq",
                "faf",
                "histograms",
            ],
            "custom_select": custom_freq_select,
            "custom_globals_select": (
                custom_freq_globals_select if data_type == "genomes" else None
            ),
            "select_globals": (
                [
                    "freq_meta",
                    "freq_index_dict",
                    "freq_meta_sample_count",
                    "faf_meta",
                    "faf_index_dict",
                ]
                + (
                    ["age_distribution", "downsamplings"]
                    if data_type == "exomes"
                    else []
                )
            ),
        }
    if "vep" in tables_for_join:
        config["vep"] = {
            "ht": get_vep(data_type=data_type, vep_version=DEFAULT_VEP_VERSION).ht(),
            "path": get_vep(data_type=data_type, vep_version=DEFAULT_VEP_VERSION).path,
            "select": ["vep"],
            "custom_select": get_custom_vep_select(DEFAULT_VEP_VERSION),
            "select_globals": [
                "vep_version",
                "vep_help",
                "vep_config",
            ],
            "global_name": "vep_globals",
        }

    # Dynamically add additional VEP versions based on release version.
    if version in VEP_VERSIONS_TO_ADD:
        for vep_version in VEP_VERSIONS_TO_ADD[version]:
            vep_table_name = f"vep{vep_version}"
            if vep_table_name in tables_for_join:
                config[vep_table_name] = {
                    "ht": (
                        get_vep(
                            data_type=data_type, vep_version=vep_version, test=test
                        ).ht()
                    ),
                    "path": (
                        get_vep(
                            data_type=data_type, vep_version=vep_version, test=test
                        ).path
                    ),
                    "select": ["vep"],
                    "custom_select": get_custom_vep_select(vep_version),
                    "select_globals": [
                        "vep_version",
                        "vep_help",
                        "vep_config",
                    ],
                    "global_name": f"{vep_table_name}_globals",
                }
    if "region_flags" in tables_for_join:
        config["region_flags"] = {
            "ht": get_freq(data_type=data_type).ht(),
            "path": get_freq(data_type=data_type).path,
            "custom_select": custom_region_flags_select,
        }
    if "joint_faf" in tables_for_join:
        config["joint_faf"] = {
            "ht": release_sites(data_type="joint").ht(),
            "path": release_sites(data_type="joint").path,
            "select": ["joint_freq", "joint_faf", "joint_fafmax"],
            "custom_select": custom_joint_faf_select,
            "select_globals": [
                "joint_freq_meta",
                "joint_freq_index_dict",
                "joint_freq_meta_sample_count",
                "joint_faf_meta",
                "joint_faf_index_dict",
            ],
        }
    if release_exists or base_release_version is not None:
        # Determine which release version to use as the base HT.
        base_version = base_release_version or version
        release_res = release_sites(data_type=data_type).versions[base_version]
        release_ht = release_res.ht()
        config["release"] = {
            "ht": release_ht,
            "path": release_res.path,
            "select": [r for r in release_ht.row_value],
            "select_globals": [g for g in release_ht.globals],
        }

    return config

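# Illustrative sketch (not called anywhere in the pipeline): the shape of a minimal
# config entry and how `get_ht` would consume it. The dataset name, path, and fields
# below are hypothetical; only the config keys ('ht', 'path', 'select',
# 'field_name') match the real format documented above.
def _example_config_entry() -> None:
    """Show the shape of a config entry consumed by `get_ht`."""
    ht = hl.utils.range_table(1).annotate(rsid=hl.str("rs0"))
    config = {
        "my_dataset": {
            "ht": ht,  # Used directly when use_config_ht=True.
            "path": "gs://bucket/my_dataset.ht",  # Read when use_config_ht=False.
            "select": ["rsid"],  # Fields pulled straight off the table.
            "field_name": "my_dataset",  # Optional: nest selections under a struct.
        }
    }
    # get_ht("my_dataset", "exomes", config, use_config_ht=True) would return a
    # table with a single struct row field: my_dataset=hl.struct(rsid=...).
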
def drop_v3_subsets(freq_ht: hl.Table) -> hl.Table:
    """
    Drop the frequencies of all v3 subsets except 'hgdp' and 'tgp' from `freq_ht`.

    :param freq_ht: v4.0 genomes freq Table.
    :return: v4.0 genomes freq Table with some v3 subsets dropped.
    """
    freq_meta, array_exprs = filter_arrays_by_meta(
        freq_ht.freq_meta,
        {
            "freq": freq_ht.freq,
            "freq_meta_sample_count": freq_ht.index_globals().freq_meta_sample_count,
        },
        {"subset": SUBSETS_TO_DROP},
        keep=False,
        combine_operator="or",
    )
    freq_ht = freq_ht.annotate(freq=array_exprs["freq"])
    freq_ht = freq_ht.annotate_globals(
        freq_meta=freq_meta,
        freq_index_dict=make_freq_index_dict_from_meta(hl.literal(freq_meta)),
        freq_meta_sample_count=array_exprs["freq_meta_sample_count"],
    )

    return freq_ht

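# A sketch of the intended effect (subset names are illustrative): given
# freq_meta = [{"group": "adj"}, {"group": "adj", "subset": "non_neuro"},
# {"group": "adj", "subset": "hgdp"}], filtering with {"subset": SUBSETS_TO_DROP}
# and keep=False removes the "non_neuro" entry, along with the matching positions
# of the "freq" and "freq_meta_sample_count" arrays, while the subset-free and
# "hgdp" entries are kept.
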
def custom_joint_faf_select(ht: hl.Table, **_) -> Dict[str, hl.expr.Expression]:
    """
    Drop faf95 from 'grpmax'.

    This annotation will be combined with the others from joint_faf's select in the
    config. See note in `custom_freq_select` explaining why this field is removed.

    :param ht: Joint FAF Hail Table.
    :return: Select expression dict.
    """
    selects = {"joint_grpmax": ht.joint_grpmax.drop("faf95")}

    return selects

def custom_freq_select(
    ht: hl.Table, data_type: str, **_
) -> Dict[str, hl.expr.Expression]:
    """
    Drop faf95 from both 'gnomad' and 'non_ukb' in 'grpmax' and rename `gen_anc_faf_max` to `fafmax`.

    These annotations will be combined with the others from freq's select in the
    config.

    .. note::

        - The faf95 field in the grpmax struct is the FAF of the genetic ancestry
          group with the largest AF (grpmax AF).
        - The FAF fields within the gen_anc_faf_max struct contain the FAFs from the
          genetic ancestry group(s) with the largest FAFs.
        - These values aren't necessarily the same; the group with the highest AF
          for a variant isn't necessarily the group with the highest FAF for that
          variant.
        - The filtering allele frequencies used by the community are the values
          within the gen_anc_faf_max struct, NOT the grpmax FAF, which is why we
          drop grpmax.faf95 and rename gen_anc_faf_max.

    :param ht: Freq Hail Table.
    :param data_type: Dataset's data type: 'exomes' or 'genomes'.
    :return: Select expression dict.
    """
    selects = {
        "grpmax": (
            hl.struct(**{k: ht.grpmax[k].drop("faf95") for k in ht.grpmax})
            if data_type == "exomes"
            else ht.grpmax.drop("faf95")
        ),
        "fafmax": ht.gen_anc_faf_max,
    }

    return selects

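# Hypothetical numbers illustrating the note above: a group with AF 0.010 observed
# in few samples has a wide confidence interval, so its FAF (roughly the lower
# bound of a one-sided 95% CI on the true AF) can fall below that of a group with
# AF 0.008 observed in many samples. grpmax (highest AF) and gen_anc_faf_max
# (highest FAF) can therefore point to different groups.
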
def custom_freq_globals_select(_ht: hl.Table) -> Dict[str, hl.expr.Expression]:
    """
    Select freq table globals for genomes.

    Recomputes age_distribution from the meta table to ensure age_alt data is
    included for samples where age is stored as a bin range.

    :param _ht: Freq Hail Table (unused, but required by interface).
    :return: Global select expression dict.
    """
    logger.info("Recomputing age_distribution for genomes from meta table...")

    # Load genomes meta table and filter to release samples.
    meta_ht = meta(data_type="genomes").ht()
    meta_ht = meta_ht.filter(meta_ht.release)

    age_distribution = meta_ht.aggregate(
        hl.agg.hist(
            hl.if_else(
                hl.is_defined(meta_ht.project_meta.age),
                meta_ht.project_meta.age,
                meta_ht.project_meta.age_alt,
            ),
            30,
            80,
            10,
        )
    )
    logger.info("Recomputed age_distribution: %s", age_distribution)

    return {"age_distribution": hl.literal(age_distribution)}

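# A minimal, runnable sketch of what the aggregation above produces: hl.agg.hist
# returns a struct with bin_edges, bin_freq, n_smaller, and n_larger. The toy
# "ages" here are hypothetical stand-ins for project_meta.age.
def _example_age_hist() -> None:
    """Show the shape of the age histogram computed in custom_freq_globals_select."""
    ht = hl.utils.range_table(100)
    hist = ht.aggregate(hl.agg.hist(hl.float64(ht.idx), 30, 80, 10))
    # 10 bins between 30 and 80; values below 30 land in n_smaller and values
    # above 80 in n_larger.
    assert len(hist.bin_edges) == 11
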
def custom_in_silico_select(ht: hl.Table, **_) -> Dict[str, hl.expr.Expression]:
    """
    Get in silico predictors from VEP for release.

    This function currently selects only SIFT and Polyphen from VEP.

    :param ht: VEP Hail Table.
    :return: Select expression dict.
    """
    vep_in_silico = get_sift_polyphen_from_vep(get_vep().ht())
    selects = {
        "sift_max": vep_in_silico[ht.key].sift_max,
        "polyphen_max": vep_in_silico[ht.key].polyphen_max,
    }

    return selects

def custom_region_flags_select(
    ht: hl.Table, data_type: str, **_
) -> Dict[str, hl.expr.Expression]:
    """
    Select region flags for release.

    :param ht: Hail Table.
    :param data_type: Dataset's data type: 'exomes' or 'genomes'.
    :return: Select expression dict.
    """
    selects = {
        "region_flags": region_flag_expr(
            ht,
            non_par=True,
            prob_regions={"lcr": lcr_intervals.ht(), "segdup": seg_dup_intervals.ht()},
        )
    }
    if data_type == "exomes":
        selects["region_flags"] = selects["region_flags"].annotate(
            fail_interval_qc=~interval_qc_pass(all_platforms=True)
            .ht()[ht.locus]
            .pass_interval_qc,
            outside_ukb_capture_region=~hl.is_defined(
                calling_intervals(interval_name="ukb", calling_interval_padding=0).ht()[
                    ht.locus
                ]
            ),
            outside_broad_capture_region=~hl.is_defined(
                calling_intervals(
                    interval_name="broad", calling_interval_padding=0
                ).ht()[ht.locus]
            ),
        )

    return selects

def custom_filters_select(ht: hl.Table, **_) -> Dict[str, hl.expr.Expression]:
    """
    Select gnomAD filter HT fields for release dataset.

    Extract "results" field and rename based on filtering method.

    :param ht: Filters Hail Table.
    :return: Select expression dict.
    """
    filter_name = hl.eval(ht.filtering_model.filter_name)
    if filter_name == "RF":
        name = "random_forest_results"
    elif filter_name == "AS_VQSR":
        name = "vqsr_results"
    elif filter_name == "IF":
        name = "isolation_forest_results"
    else:
        raise ValueError(f"Filtering method {filter_name} not recognized.")
    selects = {name: ht.results.annotate(**ht.training_info)}

    return selects

def custom_filters_select_globals(ht: hl.Table) -> Dict[str, hl.expr.Expression]:
    """
    Select filter HT globals for release dataset.

    :param ht: Filters Hail Table.
    :return: Select expression dict.
    """
    selects = {
        "filtering_model": hl.struct(
            **{
                "filter_name": ht.filtering_model.filter_name,
                "score_name": ht.filtering_model.score_name,
                "snv_cutoff": ht.filtering_model.snv_cutoff.drop("bin_id"),
                "indel_cutoff": ht.filtering_model.indel_cutoff.drop("bin_id"),
                "snv_training_variables": ht.filtering_model.snv_training_variables,
                "indel_training_variables": ht.filtering_model.indel_training_variables,
            }
        ),
        "inbreeding_coeff_cutoff": ht.inbreeding_coeff_cutoff,
    }

    return selects

def custom_info_select(
    ht: hl.Table, data_type: str, config: Dict[str, Dict[str, Any]]
) -> Dict[str, hl.expr.Expression]:
    """
    Select fields for info Hail Table annotation in release.

    The info field requires fields from the freq HT and the filters HT, so those are
    pulled in here along with all info HT fields. This function also adds the
    `allele_info` struct to the release HT.

    :param ht: Info Hail Table.
    :param data_type: Dataset's data type: 'exomes' or 'genomes'.
    :param config: Dataset configuration dictionary; used to pull fields from the
        filters and freq HTs.
    :return: Select expression dict.
    """
    # Create a dict of the fields from the filters HT that we want to add to the info.
    filters_ht = config.get("filters")["ht"]
    if data_type == "exomes":
        # For more information on the selected compute_info_method, please see
        # run_compute_info in gnomad_qc.v4.annotations.generate_variant_qc_annotations.
        compute_info_method = hl.eval(
            filters_ht.filtering_model_specific_info.compute_info_method
        )

    score_name = hl.eval(filters_ht.filtering_model.score_name)
    filters_ht = filters_ht.transmute(**filters_ht.truth_sets)
    filters = filters_ht[ht.key]
    filters_info_fields = [
        "singleton",
        "transmitted_singleton",
        "sibling_singleton",
        "omni",
        "mills",
        "monoallelic",
        "only_het",
    ]
    if data_type == "genomes":
        filters_info_fields.remove("sibling_singleton")
    filters_info_dict = {field: filters[field] for field in filters_info_fields}
    filters_info_dict.update({score_name: filters[score_name]})

    # Create a dict of the fields from the freq HT that we want to add to the info.
    freq_ht = config.get("freq")["ht"]
    # TODO: will change back to 'inbreeding_coeff' once we have the new freq_ht.
    if data_type == "exomes":
        freq_info_dict = {"inbreeding_coeff": freq_ht[ht.key]["inbreeding_coeff"]}
    else:
        freq_info_dict = {"inbreeding_coeff": freq_ht[ht.key]["InbreedingCoeff"]}

    # Create a dict of the fields from the VRS HT that we want to add to the info.
    vrs_ht = get_vrs(data_type=data_type).ht()
    vrs_info_fields = {"vrs": vrs_ht[ht.key].vrs}

    # Create a dict of the fields from the info HT that we want to keep in the info.
    if data_type == "exomes":
        info_struct = hl.struct(**ht.site_info, **ht[f"{compute_info_method}_info"])
    else:
        # The v3 info HT has no SOR or AS_SOR fields. They are computed by VQSR, so
        # we can grab them from the filters HT.
        info_struct = hl.struct(
            **ht.info, SOR=filters.SOR, AS_SOR=filters.features.AS_SOR
        )

    info_dict = {field: info_struct[field] for field in SITE_FIELDS + AS_FIELDS}
    info_dict.update(filters_info_dict)
    info_dict.update(freq_info_dict)
    info_dict.update(vrs_info_fields)

    # Select the info and allele info annotations. We drop nonsplit_alleles from
    # allele_info so that we don't release alleles that are found in non-releasable
    # samples.
    selects = {"info": hl.struct(**info_dict)}
    if data_type == "exomes":
        selects["allele_info"] = ht.allele_info.drop("nonsplit_alleles")
    if data_type == "genomes":
        selects["allele_info"] = filters.allele_info

    return selects

def get_custom_vep_select(vep_version: str):
    """
    Get custom VEP select function for a given VEP version.

    :param vep_version: VEP version (e.g., "105", "115").
    :return: Custom select function for the VEP version.
    """

    def custom_vep_version_select(ht: hl.Table, **_) -> Dict[str, hl.expr.Expression]:
        """
        Select fields for VEP Hail Table annotation in release.

        This custom select function does the following:

            - For VEP 105: Removes fields from VEP annotations that have been
              excluded in past releases or are missing in all rows.
            - Drops sift_prediction, sift_score, polyphen_prediction, and
              polyphen_score fields from the transcript_consequences array.
            - Updates the loftee annotations to use a GERP score threshold of 0
              instead of the default applied by the LOFTEE plugin.

        :param ht: VEP Hail Table.
        :return: Select expression dict.
        """
        # Only apply remove_missing_vep_fields for VEP 105.
        if vep_version == "105":
            vep_expr = remove_missing_vep_fields(ht.vep)
        else:
            vep_expr = ht.vep

        field_name = (
            f"vep{vep_version}" if vep_version != DEFAULT_VEP_VERSION else "vep"
        )

        csqs_expr = vep_expr.transcript_consequences.map(
            lambda x: update_loftee_end_trunc_filter(x)
        )
        # Only drop sift/polyphen fields for the default VEP version.
        if vep_version == DEFAULT_VEP_VERSION:
            csqs_expr = csqs_expr.map(
                lambda x: x.drop(
                    "sift_prediction",
                    "sift_score",
                    "polyphen_prediction",
                    "polyphen_score",
                )
            )

        return {field_name: vep_expr.annotate(transcript_consequences=csqs_expr)}

    return custom_vep_version_select

# TODO: REMINDER TO DO THE BELOW TODO or remove it.
# TODO: Move all below functions to create_release_utils.py after approval.

def get_select_global_fields(
    ht: hl.Table,
    config: Dict[str, Dict[str, Any]],
    tables_for_join: List[str] = TABLES_FOR_RELEASE,
) -> Dict[str, hl.expr.Expression]:
    """
    Generate a dictionary of globals to select by checking the config of all tables joined.

    .. note::

        This function will place the globals within the select_globals value above
        any globals returned from custom_select_globals. If ordering is important,
        use only custom_select_globals.

    :param ht: Final joined HT with globals.
    :param config: Dictionary with configuration options for each dataset. Expects
        the dataset name (matching values in `tables_for_join`) as the key and a
        dictionary of options as the value, where the options include any of the
        following keys: 'select_globals', 'custom_globals_select', 'global_name'.
    :param tables_for_join: List of tables to join into final release HT.
    :return: Select mapping from global annotation name to `ht` annotation.
    """
    t_globals = []
    for t in tables_for_join:
        t_config = config.get(t)
        # Reset the selection for each table so a table without global selections
        # does not re-append the previous table's globals.
        select_globals = {}
        if "select_globals" in t_config:
            select_globals = get_select_fields(t_config["select_globals"], ht)
        if t_config.get("custom_globals_select"):
            custom_globals_select_fn = t_config["custom_globals_select"]
            select_globals = {
                **select_globals,
                **custom_globals_select_fn(ht),
            }
        if "global_name" in t_config:
            global_name = t_config.get("global_name")
            select_globals = {global_name: hl.struct(**select_globals)}
        t_globals.append(select_globals)

    t_globals = reduce(lambda a, b: dict(a, **b), t_globals)

    return t_globals

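# For example, the 'in_silico' config above sets select_globals to the five
# *_version globals and global_name="tool_versions", so its contribution to the
# final globals is a single nested struct:
# {"tool_versions": hl.struct(cadd_version=..., ..., phylop_version=...)}.
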
def get_select_fields(
    selects: Union[List, Dict], base_ht: hl.Table
) -> Dict[str, hl.expr.Expression]:
    """
    Generate a select dict by traversing `base_ht` and extracting annotations.

    :param selects: Mapping or list of selections.
    :param base_ht: Base Hail Table to traverse.
    :return: Select mapping from annotation name to `base_ht` annotation.
    """
    select_fields = {}
    if selects is not None:
        if isinstance(selects, list):
            select_fields = {selection: base_ht[selection] for selection in selects}
        elif isinstance(selects, dict):
            for key, val in selects.items():
                # Grab the field and continually select it from the Hail Table.
                expr = base_ht
                for attr in val.split("."):
                    expr = expr[attr]
                select_fields[key] = expr
    return select_fields

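# A minimal, runnable sketch of the dotted-path traversal above; the table and
# field names are hypothetical.
def _example_get_select_fields() -> None:
    """Show list- and dict-style selections against a toy table."""
    ht = hl.utils.range_table(1).annotate(info=hl.struct(AC=5), rsid=hl.str("rs0"))
    # List form: field names are looked up directly on the table.
    assert set(get_select_fields(["rsid"], ht)) == {"rsid"}
    # Dict form: values are dotted paths resolved step by step, so "info.AC"
    # becomes ht["info"]["AC"], renamed to "allele_count".
    selects = get_select_fields({"allele_count": "info.AC"}, ht)
    ht = ht.select(**selects)  # ht now has a single row field: allele_count.
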
def get_final_ht_fields(
    ht: hl.Table,
    schema: Dict[str, List[str]] = FINALIZED_SCHEMA,
) -> Dict[str, List[str]]:
    """
    Get the final fields for the release HT.

    Create a dictionary of lists of fields that are in the `schema` and are present
    in the HT. If a field is not present in the HT, log a warning.

    :param ht: Hail Table.
    :param schema: Schema for the release HT.
    :return: Dict of final fields for the release HT.
    """
    final_fields = {"rows": [], "globals": []}
    for field in schema["rows"]:
        if field in ht.row:
            final_fields["rows"].append(field)
        else:
            logger.warning("Field %s from schema not found in HT.", field)
    for field in schema["globals"]:
        if field in ht.globals:
            final_fields["globals"].append(field)
        else:
            logger.warning("Global %s from schema not found in HT.", field)

    return final_fields

def get_ht(
    dataset: str,
    data_type: str,
    config: Dict[str, Dict[str, Any]],
    use_config_ht: bool = False,
    checkpoint: bool = False,
    test: bool = False,
    _intervals: Optional[Any] = None,
    base_ht_filter: Optional[hl.Table] = None,
) -> hl.Table:
    """
    Return the appropriate Hail Table with selects applied.

    :param dataset: Name of the dataset to load from `config`.
    :param data_type: Dataset's data type: 'exomes' or 'genomes'.
    :param config: Dictionary with configuration options for each dataset. Expects
        the dataset name as the key and a dictionary of options as the value, where
        the options include some of the following: 'ht', 'path', 'select',
        'field_name', 'custom_select'.
    :param use_config_ht: Whether to use the 'ht' in the dataset config instead of
        reading in `path`. Default is False.
    :param checkpoint: Whether to checkpoint the Hail Table. Default is False.
    :param test: Whether the call is for a test run. If True, the dataset will be
        filtered to PCSK9. Default is False.
    :param _intervals: Intervals for reading in the Hail Table. Used to optimize
        the join.
    :param base_ht_filter: Optional Hail Table to filter on before joining. Default
        is None.
    :return: Hail Table with fields to select.
    """
    logger.info("Getting the %s dataset and its selected annotations...", dataset)
    dataset_config = config[dataset]

    if use_config_ht:
        ht = dataset_config["ht"]
    else:
        logger.info("Reading in %s", dataset)
        ht = hl.read_table(dataset_config["path"], _intervals=_intervals)

    if dataset == "freq" and data_type == "genomes":
        ht = drop_v3_subsets(ht)

    if test:
        # Keep only PCSK9.
        ht = hl.filter_intervals(
            ht,
            [
                hl.parse_locus_interval(
                    "chr1:55039447-55064852", reference_genome="GRCh38"
                )
            ],
        )

    select_fields = get_select_fields(dataset_config.get("select"), ht)

    if "custom_select" in dataset_config:
        custom_select_fn = dataset_config["custom_select"]
        select_fields = {
            **select_fields,
            **custom_select_fn(ht, data_type=data_type, config=config),
        }

    if "field_name" in dataset_config:
        field_name = dataset_config.get("field_name")
        select_query = {field_name: hl.struct(**select_fields)}
    else:
        select_query = select_fields

    ht = ht.select(**select_query)

    if base_ht_filter is not None:
        ht_key = list(ht.key)
        if list(base_ht_filter.key) != ht_key:
            base_ht_filter = base_ht_filter.key_by(*ht_key)
        ht = ht.semi_join(base_ht_filter)

    if checkpoint:
        ht = ht.checkpoint(new_temp_file(f"{dataset}.for_release", "ht"))

    return ht

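# A minimal, runnable sketch of the base_ht_filter step above: semi_join keeps only
# the rows of `ht` whose key appears in the filter table (join_hts passes
# base_ht.select(), i.e. the base table's keys).
def _example_base_ht_filter() -> None:
    """Show how filtering a table to the base HT's keys behaves."""
    ht = hl.utils.range_table(10)
    base_keys = hl.utils.range_table(5).select()
    assert ht.semi_join(base_keys).count() == 5
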
def join_hts(
    base_table: str,
    tables: List[str],
    data_type: str,
    config: Dict[str, Dict[str, Any]],
    version: str,
    new_partition_percent: Optional[float] = None,
    new_n_partitions: Optional[int] = None,
    checkpoint_tables: bool = False,
    track_included_datasets: bool = False,
    use_annotate: bool = False,
    test: bool = False,
) -> hl.Table:
    """
    Join a list of Hail Tables onto a base table.

    :param base_table: Dataset to use for interval partitioning.
    :param tables: List of tables to join.
    :param data_type: Dataset's data type: 'exomes' or 'genomes'.
    :param config: Dictionary with configuration options for each dataset. Expects
        the dataset name (matching values in `tables`) as the key and a dictionary
        of options as the value, where the options include some of the following:
        'ht', 'path', 'select', 'field_name', 'custom_select', 'select_globals',
        'custom_globals_select', 'global_name'.
    :param version: Release version.
    :param new_partition_percent: Percent of base_table partitions used for the
        final release Hail Table.
    :param new_n_partitions: Number of partitions for the final release Hail Table.
    :param checkpoint_tables: Whether to checkpoint the tables before the join.
        Default is False.
    :param track_included_datasets: Whether to track the datasets included in the
        release. This is used to update the included_datasets JSON file. Default is
        False.
    :param use_annotate: Whether to use annotate instead of join. Default is False.
    :param test: Whether this is for a test run. Default is False.
    :return: Hail Table with datasets joined.
    """
    logger.info(
        "Reading in %s to determine partition intervals for efficient join",
        base_table,
    )
    partition_intervals = None
    if new_n_partitions or new_partition_percent:
        base_ht = get_ht(
            base_table,
            data_type,
            config,
            use_config_ht=True,
            test=test,
        )
        new_n_partitions = new_n_partitions or base_ht.n_partitions()
        if new_partition_percent:
            new_n_partitions = int(base_ht.n_partitions() * new_partition_percent)
        partition_intervals = base_ht._calculate_new_partitions(new_n_partitions)

    base_ht = get_ht(
        base_table,
        data_type,
        config,
        checkpoint=checkpoint_tables,
        _intervals=partition_intervals,
        test=test,
    )

    # Remove base_table from tables if it is in the list; it is already used as the
    # first table in the join.
    if base_table in tables:
        tables.remove(base_table)

    logger.info("Joining datasets: %s...", tables)
    hts = [base_ht] + [
        get_ht(
            table,
            data_type,
            config,
            # There is no single path for in silico predictors, so we need to
            # handle this case separately.
            use_config_ht=table in ["in_silico", "exomes_an"],
            checkpoint=checkpoint_tables,
            _intervals=partition_intervals,
            test=test,
            base_ht_filter=base_ht.select(),
        )
        for table in tables
    ]

    if use_annotate:
        joined_ht = reduce(
            lambda joined_ht, ht: joined_ht.annotate(
                **ht.index([*[joined_ht[k] for k in list(ht.key)]])
            ),
            hts,
        )
        hts = [g_ht.index_globals() for g_ht in hts]
        global_struct = reduce(lambda g, ann_g: g.annotate(**ann_g), hts)
        joined_ht = joined_ht.select_globals(**global_struct)
    else:
        joined_ht = reduce((lambda joined_ht, ht: joined_ht.join(ht, "left")), hts)
        joined_ht = joined_ht.select_globals(
            **get_select_global_fields(joined_ht, config, [base_table] + tables)
        )

    if track_included_datasets:
        # Track the datasets we've added as well as the source paths. If the release
        # HT is included in tables, read in the included datasets JSON and update
        # the keys to the path for any new tables.
        included_datasets = {}
        if "release" in tables:
            with hl.utils.hadoop_open(
                included_datasets_json_path(
                    data_type=data_type,
                    test=test,
                    release_version=hl.eval(config["release"]["ht"].version),
                )
            ) as f:
                included_datasets = json.loads(f.read())

        included_datasets.update({t: config[t]["path"] for t in tables})
        with hl.utils.hadoop_open(
            included_datasets_json_path(
                data_type=data_type, test=test, release_version=version
            ),
            "w",
        ) as f:
            f.write(hl.eval(hl.json(included_datasets)))

    return joined_ht

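# Note on the partition logic above: every table is read with the base table's
# partition intervals (via the `_intervals` argument to hl.read_table), so all
# inputs arrive with identical partitioning and the left joins can proceed
# partition-by-partition instead of triggering a full shuffle.
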
def main(args):
    """Create release HT."""
    data_type = args.data_type
    hl.init(
        log=f"/create_release_ht_{data_type}.log",
        tmp_dir="gs://gnomad-tmp-4day",
        default_reference="GRCh38",
    )

    # Determine tables to join based on version if not explicitly provided.
    tables_for_join = (
        args.tables_for_join
        if args.tables_for_join
        else get_tables_for_release(args.version)
    )

    # Get schema based on version.
    finalized_schema = get_finalized_schema(args.version)
    if data_type == "genomes":
        finalized_schema["globals"].remove("interval_qc_parameters")
        finalized_schema["globals"].remove("downsamplings")

    logger.info(
        "Getting config for %s release HT to check Tables for duplicate variants...",
        data_type,
    )
    config = get_config(
        data_type=data_type,
        tables_for_join=tables_for_join,
        release_exists=args.release_exists,
        version=args.version,
        base_release_version=args.base_release_version,
        test=args.test,
    )

    dup_errors = []
    if not args.test:
        for c in config:
            if "ht" in config[c]:
                ht = config[c]["ht"]
                total_count = ht.count()
                distinct_count = ht.select().distinct().count()
                if distinct_count != total_count:
                    dup_errors.append(
                        f"HT {c} has {distinct_count} distinct rows but"
                        f" {total_count} total rows."
                    )
                else:
                    logger.info("HT %s has no duplicate rows.", c)

    if dup_errors:
        raise ValueError("\n".join(dup_errors))

    logger.info("Creating %s release HT...", data_type)
    ht = join_hts(
        args.base_table,
        tables_for_join,
        data_type,
        config,
        args.version,
        new_partition_percent=args.new_partition_percent,
        track_included_datasets=True,
        test=args.test,
    )

    # Filter out chrM, AS_lowqual sites (these sites are dropped in the
    # final_filters HT so will not have information in `filters`), and sites with
    # AC_raw == 0.
    logger.info("Filtering out chrM, AS_lowqual, and AC_raw == 0 sites...")
    ht = hl.filter_intervals(ht, [hl.parse_locus_interval("chrM")], keep=False)
    ht = ht.filter(hl.is_defined(ht.filters) & (ht.freq[1].AC > 0))

    logger.info("Finalizing the release HT global and row fields...")
    # Add additional globals that were not present on the joined HTs.
    globals_to_annotate = {
        "vep_globals": ht.vep_globals.annotate(
            gencode_version=GENCODE_VERSION,
            mane_select_version=MANE_SELECT_VERSION,
        ),
        "tool_versions": ht.tool_versions.annotate(
            dbsnp_version=DBSNP_VERSION,
            sift_version=SIFT_VERSION,
            polyphen_version=POLYPHEN_VERSION,
        ),
        "vrs_versions": hl.struct(
            vrs_schema_version=VRS_SCHEMA_VERSION,
            vrs_python_version=VRS_PYTHON_VERSION,
            seqrepo_version=SEQREPO_VERSION,
        ),
        "date": datetime.now().isoformat(),
        "version": args.version,
        "frequency_README": get_freq_array_readme(data_type=data_type),
    }

    # Add additional VEP version globals annotations if they exist.
    if args.version in VEP_VERSIONS_TO_ADD:
        for vep_version in VEP_VERSIONS_TO_ADD[args.version]:
            vep_globals_name = f"vep{vep_version}_globals"
            if vep_globals_name in ht.globals:
                globals_to_annotate[vep_globals_name] = ht[vep_globals_name].annotate(
                    gencode_version=GENCODE_VERSIONS[vep_version],
                    mane_select_version=MANE_SELECT_VERSIONS[vep_version],
                )

    ht = ht.annotate_globals(**globals_to_annotate)
    if data_type == "exomes":
        ht = ht.annotate_globals(
            interval_qc_parameters=interval_qc_pass(all_platforms=True)
            .ht()
            .index_globals()
            .high_qual_interval_parameters
        )

    # Organize the fields in the release HT to match the order of finalized_schema
    # when the fields are present in the HT.
    final_fields = get_final_ht_fields(ht, schema=finalized_schema)
    ht = ht.select(*final_fields["rows"]).select_globals(*final_fields["globals"])

    output_path = (
        f"{qc_temp_prefix(data_type=data_type)}release/gnomad.{data_type}.sites.test.{datetime.today().strftime('%Y-%m-%d')}.ht"
        if args.test
        else release_sites(data_type=data_type).path
    )
    logger.info("Writing out %s release HT to %s", data_type, output_path)
    ht = ht.naive_coalesce(args.n_partitions).checkpoint(
        output_path,
        args.overwrite,
    )

    logger.info("Final release HT schema:")
    ht.describe()

    logger.info("Final variant count: %d", ht.count())

def get_script_argument_parser() -> argparse.ArgumentParser:
    """Get script argument parser."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--new-partition-percent",
        help=(
            "Percent of start dataset partitions to use for release HT. Default is"
            " 1.1 (110%%)."
        ),
        type=float,
        default=1.1,
    )
    parser.add_argument("--overwrite", help="Overwrite data.", action="store_true")
    parser.add_argument(
        "-v",
        "--version",
        help="The version of gnomAD.",
        default=CURRENT_RELEASE,
    )
    parser.add_argument(
        "-t",
        "--test",
        help="Runs a test on the PCSK9 region, chr1:55039447-55064852.",
        action="store_true",
    )
    parser.add_argument(
        "-d",
        "--data-type",
        help="Data type to create release HT for.",
        default="exomes",
        choices=["exomes", "genomes"],
    )
    parser.add_argument(
        "-j",
        "--tables-for-join",
        help="Tables to join for release.",
        default=None,
        type=str,
        nargs="+",
    )
    parser.add_argument(
        "-b",
        "--base-table",
        help="Base table for interval partition calculation.",
        default="freq",
    )
    parser.add_argument(
        "--release-exists",
        help=(
            "Use the release version specified by --version as the base HT. When "
            "specified, loads the release HT for the version being updated and uses "
            "it as the base for joining other tables. This is useful when updating "
            "an existing release in place. Mutually exclusive with "
            "--base-release-version."
        ),
        action="store_true",
    )
    parser.add_argument(
        "--base-release-version",
        help=(
            "Specific release version to use as the base HT (e.g., '4.1'). When "
            "specified, loads that version's release HT as the base for joining "
            "other tables. This is useful when creating a new release version "
            "(specified by --version) from an existing release version. For "
            "example, to create v4.1.1 from the v4.1 base, use "
            "--base-release-version 4.1 --version 4.1.1. Mutually exclusive with "
            "--release-exists."
        ),
        type=str,
        default=None,
    )
    parser.add_argument(
        "--slack-channel", help="Slack channel to post results and notifications to."
    )
    parser.add_argument(
        "--n-partitions",
        help="Number of partitions to naive coalesce the release Table to.",
        type=int,
        default=10000,
    )
    return parser


if __name__ == "__main__":
    parser = get_script_argument_parser()
    args = parser.parse_args()

    if args.slack_channel:
        with slack_notifications(slack_token, args.slack_channel):
            main(args)
    else:
        main(args)