Source code for gnomad.sample_qc.relatedness

# noqa: D100

import logging
import random
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union

import hail as hl
import networkx as nx

from gnomad.utils.annotations import annotate_adj

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

UNRELATED = "unrelated"
"""
String representation for a pair of unrelated individuals in this module.
Typically >2nd degree relatives, but the threshold is user-dependent.
"""

SECOND_DEGREE_RELATIVES = "second degree relatives"
"""
String representation for a pair of 2nd degree relatives in this module.
"""

PARENT_CHILD = "parent-child"
"""
String representation for a parent-child pair in this module.
"""

SIBLINGS = "siblings"
"""
String representation for a sibling pair in this module.
"""

DUPLICATE_OR_TWINS = "duplicate/twins"
"""
String representation for a pair of samples who are identical (either MZ twins or duplicates) in this module.
"""

AMBIGUOUS_RELATIONSHIP = "ambiguous"
"""
String representation for a pair of samples whose relationship is ambiguous.
This is used in the case of a pair of samples whose kinship/IBD values do not correspond to any biological relationship between two individuals.
"""


def get_duplicated_samples(
    relationship_ht: hl.Table,
    i_col: str = "i",
    j_col: str = "j",
    rel_col: str = "relationship",
) -> List[Set[str]]:
    """
    Extract the list of duplicate samples using a Table output from pc_relate.

    :param relationship_ht: Table with relationships between pairs of samples
    :param i_col: Column containing the 1st sample
    :param j_col: Column containing the 2nd sample
    :param rel_col: Column containing the sample pair relationship annotated with get_relationship_expr
    :return: List of sets of samples that are duplicates
    """

    def get_all_dups(
        s: str, dups: Set[str], samples_duplicates: Dict[str, Set[str]]
    ) -> Tuple[Set[str], Dict[str, Set[str]]]:
        """
        Create the set of all duplicated samples corresponding to `s` that are found in `samples_duplicates`.

        Also return the remaining sample duplicates after removing all duplicated samples corresponding to `s`.

        Works by recursively adding duplicated samples to the set.

        :param s: sample to identify duplicates for
        :param dups: set of corresponding samples already identified
        :param samples_duplicates: dict of sample -> duplicate-pair left to assign
        :return: (set of duplicates corresponding to s found in samples_duplicates, remaining samples_duplicates)
        """
        if s in samples_duplicates:
            dups.add(s)
            s_dups = samples_duplicates.pop(s)
            for s_dup in s_dups:
                if s_dup not in dups:
                    dups, samples_duplicates = get_all_dups(
                        s_dup, dups, samples_duplicates
                    )
        return dups, samples_duplicates

    logger.info("Computing duplicate sets")

    dup_pairs = relationship_ht.aggregate(
        hl.agg.filter(
            relationship_ht[rel_col] == DUPLICATE_OR_TWINS,
            hl.agg.collect(hl.tuple([relationship_ht[i_col], relationship_ht[j_col]])),
        )
    )

    samples_duplicates = defaultdict(set)
    for i, j in dup_pairs:
        samples_duplicates[i].add(j)
        samples_duplicates[j].add(i)

    duplicated_samples = []
    while len(samples_duplicates) > 0:
        dup_set, samples_duplicates = get_all_dups(
            list(samples_duplicates)[0], set(), samples_duplicates
        )
        duplicated_samples.append(dup_set)

    return duplicated_samples

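A minimal usage sketch (not part of the module): build a toy relatedness Table with `hl.Table.parallelize` and extract the duplicate sets. The sample IDs and the `rel_ht` name are illustrative assumptions.

# Illustrative only: toy relatedness Table with three samples forming one duplicate set.
rel_ht = hl.Table.parallelize(
    [
        {"i": "s1", "j": "s2", "relationship": DUPLICATE_OR_TWINS},
        {"i": "s2", "j": "s3", "relationship": DUPLICATE_OR_TWINS},
        {"i": "s4", "j": "s5", "relationship": PARENT_CHILD},
    ],
    hl.tstruct(i=hl.tstr, j=hl.tstr, relationship=hl.tstr),
)
dup_sets = get_duplicated_samples(rel_ht)  # -> [{"s1", "s2", "s3"}]
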
def get_duplicated_samples_ht(
    duplicated_samples: List[Set[str]],
    samples_rankings_ht: hl.Table,
    rank_ann: str = "rank",
):
    """
    Create a HT with duplicated samples sets.

    Each row is indexed by the sample that is kept and also contains the set of duplicate samples that should be filtered.

    `samples_rankings_ht` is a HT containing a global rank for each of the samples (smaller is better).

    :param duplicated_samples: List of sets of duplicated samples
    :param samples_rankings_ht: HT with global rank for each sample
    :param rank_ann: Annotation in `samples_rankings_ht` containing each sample's global rank (smaller is better).
    :return: HT with duplicate sample sets, including which to keep/filter
    """
    dups_ht = hl.Table.parallelize(
        [
            hl.struct(dup_set=i, dups=duplicated_samples[i])
            for i in range(0, len(duplicated_samples))
        ]
    )
    dups_ht = dups_ht.explode(dups_ht.dups, name="_dup")
    dups_ht = dups_ht.key_by("_dup")
    dups_ht = dups_ht.annotate(rank=samples_rankings_ht[dups_ht.key][rank_ann])
    dups_cols = hl.bind(
        lambda x: hl.struct(kept=x[0], filtered=x[1:]),
        hl.sorted(
            hl.agg.collect(hl.tuple([dups_ht._dup, dups_ht.rank])), key=lambda x: x[1]
        ).map(lambda x: x[0]),
    )
    dups_ht = dups_ht.group_by(dups_ht.dup_set).aggregate(**dups_cols)

    if isinstance(dups_ht.kept, hl.expr.StructExpression):
        dups_ht = dups_ht.key_by(**dups_ht.kept).drop("kept")
    else:
        dups_ht = dups_ht.key_by(
            s=dups_ht.kept
        )  # Since there is no defined name in the case of a non-struct type, use `s`
    return dups_ht

def explode_duplicate_samples_ht(dups_ht: hl.Table) -> hl.Table:
    """
    Explode the result of `get_duplicated_samples_ht`, so that each line contains a single sample.

    An additional annotation is added: `dup_filtered` indicating which of the duplicated samples was kept.
    Requires a field `filtered` whose type should be the same as the input duplicated samples Table key.

    :param dups_ht: Input HT
    :return: Flattened HT
    """

    def get_dups_to_keep_expr():
        if dups_ht.filtered.dtype.element_type == dups_ht.key.dtype:
            return (dups_ht.key, False)
        elif (len(dups_ht.key) == 1) & (
            dups_ht.filtered.dtype.element_type == dups_ht.key[0].dtype
        ):
            return (dups_ht.key[0], False)
        else:
            raise TypeError(
                "Cannot explode table as types of the filtered field"
                f" ({dups_ht.filtered.dtype}) and the key ({dups_ht.key.dtype}) are"
                " incompatible."
            )

    dups_ht = dups_ht.annotate(
        dups=hl.array([get_dups_to_keep_expr()]).extend(
            dups_ht.filtered.map(lambda x: (x, True))
        )
    )
    dups_ht = dups_ht.explode("dups")
    dups_ht = dups_ht.key_by()
    return dups_ht.select(s=dups_ht.dups[0], dup_filtered=dups_ht.dups[1]).key_by("s")

def get_relationship_expr(  # TODO: The threshold detection could be easily automated by fitting distributions over the data.
    kin_expr: hl.expr.NumericExpression,
    ibd0_expr: hl.expr.NumericExpression,
    ibd1_expr: hl.expr.NumericExpression,
    ibd2_expr: hl.expr.NumericExpression,
    first_degree_kin_thresholds: Tuple[float, float] = (0.19, 0.4),
    second_degree_min_kin: float = 0.1,
    ibd0_0_max: float = 0.025,
    ibd0_25_thresholds: Tuple[float, float] = (0.1, 0.425),
    # ibd0_50_thresholds = [0.37, 0.625], Not useful for relationship inference
    # ibd0_100_threshold = 0.625, Not useful for relationship inference
    ibd1_0_thresholds: Tuple[float, float] = (-0.15, 0.1),
    # ibd1_25_thresholds: Tuple[float, float] = (0.1, 0.37), Not useful for
    # relationship inference
    ibd1_50_thresholds: Tuple[float, float] = (0.275, 0.75),
    ibd1_100_min: float = 0.75,
    ibd2_0_max: float = 0.125,
    ibd2_25_thresholds: Tuple[float, float] = (0.1, 0.5),
    ibd2_100_thresholds: Tuple[float, float] = (0.75, 1.25),
) -> hl.expr.StringExpression:
    """
    Return an expression indicating the relationship between a pair of samples given their kin coefficient and IBD0, IBD1, IBD2 values.

    The kinship coefficient values in the defaults are in line with those output from
    `hail.methods.pc_relate <https://hail.is/docs/0.2/methods/genetics.html?highlight=pc_relate#hail.methods.pc_relate>`.

    :param kin_expr: Kin coefficient expression
    :param ibd0_expr: IBD0 expression
    :param ibd1_expr: IBD1 expression
    :param ibd2_expr: IBD2 expression
    :param first_degree_kin_thresholds: (min, max) kinship threshold for 1st degree relatives
    :param second_degree_min_kin: min kinship threshold for 2nd degree relatives
    :param ibd0_0_max: max IBD0 threshold for 0 IBD0 sharing
    :param ibd0_25_thresholds: (min, max) thresholds for 0.25 IBD0 sharing
    :param ibd1_0_thresholds: (min, max) thresholds for 0 IBD1 sharing. Note that the min is there because pc_relate can output large negative values in some corner cases.
    :param ibd1_50_thresholds: (min, max) thresholds for 0.5 IBD1 sharing
    :param ibd1_100_min: min IBD1 threshold for 1.0 IBD1 sharing
    :param ibd2_0_max: max IBD2 threshold for 0 IBD2 sharing
    :param ibd2_25_thresholds: (min, max) thresholds for 0.25 IBD2 sharing
    :param ibd2_100_thresholds: (min, max) thresholds for 1.00 IBD2 sharing. Note that the min is there because pc_relate can output much larger IBD2 values in some corner cases.
    :return: The relationship annotation using the constants defined in this module.
    """
    return (
        hl.case()
        .when(kin_expr < second_degree_min_kin, UNRELATED)
        .when((kin_expr < first_degree_kin_thresholds[0]), SECOND_DEGREE_RELATIVES)
        .when(
            (kin_expr < first_degree_kin_thresholds[1])
            & (ibd0_expr <= ibd0_0_max)
            & (ibd1_expr >= ibd1_100_min)
            & (ibd2_expr <= ibd2_0_max),
            PARENT_CHILD,
        )
        .when(
            (kin_expr < first_degree_kin_thresholds[1])
            & (ibd0_expr >= ibd0_25_thresholds[0])
            & (ibd0_expr <= ibd0_25_thresholds[1])
            & (ibd1_expr >= ibd1_50_thresholds[0])
            & (ibd1_expr <= ibd1_50_thresholds[1])
            & (ibd2_expr >= ibd2_25_thresholds[0])
            & (ibd2_expr <= ibd2_25_thresholds[1]),
            SIBLINGS,
        )
        .when(
            (kin_expr > first_degree_kin_thresholds[1])
            & (ibd0_expr < ibd0_0_max)
            & (ibd1_expr >= ibd1_0_thresholds[0])
            & (ibd1_expr <= ibd1_0_thresholds[1])
            & (ibd2_expr >= ibd2_100_thresholds[0])
            & (ibd2_expr <= ibd2_100_thresholds[1]),
            DUPLICATE_OR_TWINS,
        )
        .default(AMBIGUOUS_RELATIONSHIP)
    )

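A usage sketch, assuming `mt` is a dense, biallelic MatrixTable suitable for pc_relate; the variable names and pc_relate parameters are illustrative, not part of this module.

# Illustrative only: annotate pc_relate output with the inferred relationship.
relatedness_ht = hl.pc_relate(
    mt.GT, min_individual_maf=0.01, k=10, statistics="all"
)
relatedness_ht = relatedness_ht.annotate(
    relationship=get_relationship_expr(
        kin_expr=relatedness_ht.kin,
        ibd0_expr=relatedness_ht.ibd0,
        ibd1_expr=relatedness_ht.ibd1,
        ibd2_expr=relatedness_ht.ibd2,
    )
)
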
def get_slope_int_relationship_expr(
    kin_expr: hl.expr.NumericExpression,
    y_expr: hl.expr.NumericExpression,
    parent_child_max_y: float,
    second_degree_sibling_lower_cutoff_slope: float,
    second_degree_sibling_lower_cutoff_intercept: float,
    second_degree_upper_sibling_lower_cutoff_slope: float,
    second_degree_upper_sibling_lower_cutoff_intercept: float,
    duplicate_twin_min_kin: float = 0.42,
    second_degree_min_kin: float = 0.1,
    duplicate_twin_ibd1_min: float = -0.15,
    duplicate_twin_ibd1_max: float = 0.1,
    ibd1_expr: Optional[hl.expr.NumericExpression] = None,
):
    """
    Return an expression indicating the relationship between a pair of samples given slope and intercept cutoffs.

    The kinship coefficient (`kin_expr`) and an additional metric (`y_expr`) are used
    to define the relationship between a pair of samples. For this function the slope
    and intercepts should refer to cutoff lines where the x-axis, or independent
    variable, is the kinship coefficient and the y-axis, or dependent variable, is the
    metric defined by `y_expr`. Typically, the y-axis metric is IBS0, IBS0/IBS2, or IBD0.

    .. note::

        No defaults are provided for the slope and intercept cutoffs because they are
        highly dependent on the dataset and the metric used in `y_expr`.

    The relationship expression is determined as follows:
        - If `kin_expr` < `second_degree_min_kin` -> UNRELATED
        - If `kin_expr` > `duplicate_twin_min_kin`:
            - If `y_expr` < `parent_child_max_y`:
                - If `ibd1_expr` is defined:
                    - If `duplicate_twin_ibd1_min` <= `ibd1_expr` <= `duplicate_twin_ibd1_max` -> DUPLICATE_OR_TWINS
                    - Else -> AMBIGUOUS_RELATIONSHIP
                - Else -> DUPLICATE_OR_TWINS
        - If `y_expr` < `parent_child_max_y` -> PARENT_CHILD
        - If pair is over second_degree_sibling_lower_cutoff line:
            - If pair is over second_degree_upper_sibling_lower_cutoff line -> SIBLINGS
            - Else -> SECOND_DEGREE_RELATIVES
        - If none of the above conditions are met -> AMBIGUOUS_RELATIONSHIP

    :param kin_expr: Kin coefficient expression. Used as the x-axis, or independent variable, for the slope and intercept cutoffs.
    :param y_expr: Expression for the metric to use as the y-axis, or dependent variable, for the slope and intercept cutoffs. This is typically an expression for IBS0, IBS0/IBS2, or IBD0.
    :param parent_child_max_y: Maximum value of the metric defined by `y_expr` for a parent-child pair.
    :param second_degree_sibling_lower_cutoff_slope: Slope of the line to use as a lower cutoff for second degree relatives and siblings from parent-child pairs.
    :param second_degree_sibling_lower_cutoff_intercept: Intercept of the line to use as a lower cutoff for second degree relatives and siblings from parent-child pairs.
    :param second_degree_upper_sibling_lower_cutoff_slope: Slope of the line to use as an upper cutoff for second degree relatives and a lower cutoff for siblings.
    :param second_degree_upper_sibling_lower_cutoff_intercept: Intercept of the line to use as an upper cutoff for second degree relatives and a lower cutoff for siblings.
    :param duplicate_twin_min_kin: Minimum kinship for duplicate or twin pairs. Default is 0.42.
    :param second_degree_min_kin: Minimum kinship threshold for 2nd degree relatives. Default is 0.1. Bycroft et al. (2018) calculates a theoretical kinship of 0.08838835 for a second degree relationship cutoff, but this cutoff should be determined by evaluation of the kinship distribution.
    :param ibd1_expr: Optional IBD1 expression. If this expression is provided, `duplicate_twin_ibd1_min` and `duplicate_twin_ibd1_max` will be used as an additional cutoff for duplicate or twin pairs.
    :param duplicate_twin_ibd1_min: Minimum IBD1 cutoff for duplicate or twin pairs. Note: the min is because pc_relate can output large negative values in some corner cases.
    :param duplicate_twin_ibd1_max: Maximum IBD1 cutoff for duplicate or twin pairs.
    :return: The relationship annotation using the constants defined in this module.
    """
    # Only use a duplicate/twin IBD1 cutoff if an ibd1_expr is supplied.
    if ibd1_expr is not None:
        dup_twin_ibd1_expr = (ibd1_expr >= duplicate_twin_ibd1_min) & (
            ibd1_expr <= duplicate_twin_ibd1_max
        )
    else:
        dup_twin_ibd1_expr = True

    return (
        hl.case()
        .when(kin_expr < second_degree_min_kin, UNRELATED)
        .when(
            kin_expr > duplicate_twin_min_kin,
            hl.if_else(
                dup_twin_ibd1_expr & (y_expr < parent_child_max_y),
                DUPLICATE_OR_TWINS,
                AMBIGUOUS_RELATIONSHIP,
            ),
        )
        .when(y_expr < parent_child_max_y, PARENT_CHILD)
        .when(
            y_expr
            > second_degree_sibling_lower_cutoff_slope * kin_expr
            + second_degree_sibling_lower_cutoff_intercept,
            hl.if_else(
                y_expr
                > second_degree_upper_sibling_lower_cutoff_slope * kin_expr
                + second_degree_upper_sibling_lower_cutoff_intercept,
                SIBLINGS,
                SECOND_DEGREE_RELATIVES,
            ),
        )
        .default(AMBIGUOUS_RELATIONSHIP)
    )

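A usage sketch with placeholder cutoffs: the slope and intercept values below are illustrative only and must be chosen by inspecting the dataset's kinship vs. `y_expr` distribution. The fields `kin`, `ibs0`, and `ibs2` on `relatedness_ht` are assumptions about the upstream relatedness output.

# Illustrative only: all numeric cutoffs are placeholders, not recommended defaults.
relatedness_ht = relatedness_ht.annotate(
    relationship=get_slope_int_relationship_expr(
        kin_expr=relatedness_ht.kin,
        y_expr=relatedness_ht.ibs0 / relatedness_ht.ibs2,
        parent_child_max_y=0.005,  # placeholder
        second_degree_sibling_lower_cutoff_slope=-0.25,  # placeholder
        second_degree_sibling_lower_cutoff_intercept=0.025,  # placeholder
        second_degree_upper_sibling_lower_cutoff_slope=-0.5,  # placeholder
        second_degree_upper_sibling_lower_cutoff_intercept=0.1,  # placeholder
    )
)
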
def infer_families(
    relationship_ht: hl.Table,
    sex: Union[hl.Table, Dict[str, bool]],
    duplicate_samples_ht: hl.Table,
    i_col: str = "i",
    j_col: str = "j",
    relationship_col: str = "relationship",
) -> hl.Pedigree:
    """
    Generate a pedigree containing trios inferred from the `relationship_ht`.

    This function takes a hail Table with a row for each pair of related individuals i, j in the data (it's OK to have unrelated samples too).
    The `relationship_col` should be a column specifying the relationship between each two samples as defined in this module's constants.

    This function returns a pedigree containing trios inferred from the data. Family ID can be the same for multiple
    trios if one or more members of the trios are related (e.g. sibs, multi-generational family). Trios are ordered by family ID.

    .. note::

        This function only returns complete trios defined as: one child, one father and one mother (sex is required for both parents).

    :param relationship_ht: Input relationship table
    :param sex: A Table or dict giving the sex for each sample (`TRUE`=female, `FALSE`=male). If a Table is given, it should have a field `is_female`.
    :param duplicate_samples_ht: All duplicated samples TO REMOVE (If not provided, this function won't work as it assumes that each child has exactly two parents)
    :param i_col: Column containing the 1st sample of the pair in the relationship table
    :param j_col: Column containing the 2nd sample of the pair in the relationship table
    :param relationship_col: Column containing the relationship for the sample pair as defined in this module constants.
    :return: Pedigree of complete trios
    """

    def group_parent_child_pairs_by_fam(
        parent_child_pairs: Iterable[Tuple[str, str]],
    ) -> List[List[Tuple[str, str]]]:
        """
        Group parent-child pairs into a list of families.

        A family here is defined as a list of sample-pairs which all share at least one sample with at least one other sample-pair in the list.

        :param parent_child_pairs: All the parent-children pairs
        :return: A list of families, where each element of the list is a list of the parent-children pairs
        """
        fam_id = 1  # stores the current family id
        s_fam = dict()  # stores the family id for each sample
        fams = defaultdict(list)  # stores fam_id -> sample-pairs
        for pair in parent_child_pairs:
            if pair[0] in s_fam:
                if pair[1] in s_fam:
                    if (
                        s_fam[pair[0]] != s_fam[pair[1]]
                    ):  # If both samples are in different families, merge the families
                        new_fam_id = s_fam[pair[0]]
                        fam_id_to_merge = s_fam[pair[1]]
                        for s in s_fam:
                            if s_fam[s] == fam_id_to_merge:
                                s_fam[s] = new_fam_id
                        fams[new_fam_id].extend(fams.pop(fam_id_to_merge))
                else:  # If only the 1st sample in the pair is already in a family, assign the 2nd sample in the pair to the same family
                    s_fam[pair[1]] = s_fam[pair[0]]
                fams[s_fam[pair[0]]].append(pair)
            elif (
                pair[1] in s_fam
            ):  # If only the 2nd sample in the pair is already in a family, assign the 1st sample in the pair to the same family
                s_fam[pair[0]] = s_fam[pair[1]]
                fams[s_fam[pair[1]]].append(pair)
            else:  # If none of the samples in the pair is already in a family, create a new family
                s_fam[pair[0]] = fam_id
                s_fam[pair[1]] = fam_id
                fams[fam_id].append(pair)
                fam_id += 1

        return list(fams.values())

    def get_trios(
        fam_id: str,
        parent_child_pairs: List[Tuple[str, str]],
        related_pairs: Dict[Tuple[str, str], str],
    ) -> List[hl.Trio]:
        """
        Generate trios based on the list of parent-child pairs in the family and all related pairs in the data.

        Only complete parent/offspring trios are included in the results.

        The trios are assembled as follows:
            1. All pairs of unrelated samples with different sexes within the family are extracted as possible parent pairs
            2. For each possible parent pair, a list of all children is constructed (each child in the list has a parent-offspring pair with each parent)
            3. If there are multiple children for a given parent pair, all children should be siblings with each other
            4. Check that each child was only assigned a single pair of parents. If a child is found to have multiple parent pairs, they are ALL discarded.

        :param fam_id: The family ID
        :param parent_child_pairs: The parent-child pairs for this family
        :param related_pairs: All related sample pairs in the data
        :return: List of trios in the family
        """

        def get_possible_parents(samples: List[str]) -> List[Tuple[str, str]]:
            """
            Return all pairs of unrelated samples with different sexes within the family as possible parent pairs.

            :param samples: All samples in the family
            :return: Possible parent pairs
            """
            possible_parents = []
            for i in range(len(samples)):
                for j in range(i + 1, len(samples)):
                    if (
                        related_pairs.get(tuple(sorted([samples[i], samples[j]])))
                        is None
                    ):
                        if sex.get(samples[i]) is False and sex.get(samples[j]) is True:
                            possible_parents.append((samples[i], samples[j]))
                        elif (
                            sex.get(samples[i]) is True
                            and sex.get(samples[j]) is False
                        ):
                            possible_parents.append((samples[j], samples[i]))

            return possible_parents

        def get_children(possible_parents: Tuple[str, str]) -> List[str]:
            """
            Construct a list of all children for a given possible parent pair.

            Each child in the list has a parent-offspring pair with each parent.

            :param possible_parents: A pair of possible parents
            :return: The list of all children (if any) corresponding to the possible parents
            """
            possible_offsprings = defaultdict(
                set
            )  # stores sample -> set of parents in the possible_parents where (sample, parent) is found in parent_child_pairs
            for pair in parent_child_pairs:
                if possible_parents[0] == pair[0]:
                    possible_offsprings[pair[1]].add(possible_parents[0])
                elif possible_parents[0] == pair[1]:
                    possible_offsprings[pair[0]].add(possible_parents[0])
                elif possible_parents[1] == pair[0]:
                    possible_offsprings[pair[1]].add(possible_parents[1])
                elif possible_parents[1] == pair[1]:
                    possible_offsprings[pair[0]].add(possible_parents[1])

            return [
                s for s, parents in possible_offsprings.items() if len(parents) == 2
            ]

        def check_sibs(children: List[str]) -> bool:
            """
            Confirm that all children of a parent pair are siblings with each other.

            If there are multiple children for a given parent pair, all children should be siblings with each other.

            :param children: List of all children for a given parent pair
            :return: Whether all children in the list are siblings
            """
            for i in range(len(children)):
                for j in range(i + 1, len(children)):
                    if (
                        related_pairs[tuple(sorted([children[i], children[j]]))]
                        != SIBLINGS
                    ):
                        return False
            return True

        def discard_multi_parents_children(trios: List[hl.Trio]):
            """
            Check that each child was only assigned a single pair of parents.

            If a child is found to have multiple parent pairs, they are ALL discarded.

            :param trios: All trios formed for this family
            :return: The list of trios for which each child has a single parents pair.
            """
            children_trios = defaultdict(list)
            for trio in trios:
                children_trios[trio.s].append(trio)

            for s, s_trios in children_trios.items():
                if len(s_trios) > 1:
                    logger.warning(
                        "Discarded duplicated child %s found in multiple trios: %s",
                        s,
                        ", ".join([str(trio) for trio in s_trios]),
                    )

            return [trios[0] for trios in children_trios.values() if len(trios) == 1]

        # Get all possible pairs of parents in (father, mother) order
        all_possible_parents = get_possible_parents(
            list({s for pair in parent_child_pairs for s in pair})
        )

        trios = []
        for possible_parents in all_possible_parents:
            children = get_children(possible_parents)
            if check_sibs(children):
                trios.extend(
                    [
                        hl.Trio(
                            s=s,
                            fam_id=fam_id,
                            pat_id=possible_parents[0],
                            mat_id=possible_parents[1],
                            is_female=sex.get(s),
                        )
                        for s in children
                    ]
                )
            else:
                logger.warning(
                    "Discarded family with same parents, and multiple offspring that"
                    " weren't siblings:\nMother: %s\nFather:%s\nChildren:%s",
                    possible_parents[0],
                    possible_parents[1],
                    ", ".join(children),
                )

        return discard_multi_parents_children(trios)

    # Get all the relations we care about:
    # => Remove unrelateds and duplicates
    dups = duplicate_samples_ht.aggregate(
        hl.agg.explode(
            lambda dup: hl.agg.collect_as_set(dup), duplicate_samples_ht.filtered
        ),
        _localize=False,
    )
    relationship_ht = relationship_ht.filter(
        ~dups.contains(relationship_ht[i_col])
        & ~dups.contains(relationship_ht[j_col])
        & (relationship_ht[relationship_col] != UNRELATED)
    )

    # Check relatedness table format
    if not relationship_ht[i_col].dtype == relationship_ht[j_col].dtype:
        logger.error(
            "i_col and j_col of the relatedness table need to be of the same type."
        )

    # If i_col and j_col aren't str, then convert them
    if not isinstance(relationship_ht[i_col], hl.expr.StringExpression):
        logger.warning(
            "Pedigrees can only be constructed from string IDs, but your relatedness_ht"
            " ID column is of type: %s. Expression will be converted to string in"
            " Pedigrees.",
            relationship_ht[i_col].dtype,
        )
        if isinstance(relationship_ht[i_col], hl.expr.StructExpression):
            logger.warning(
                "Struct fields %s will be joined by underscores to use as sample names"
                " in Pedigree.",
                list(relationship_ht[i_col]),
            )
            relationship_ht = relationship_ht.key_by(
                **{
                    i_col: hl.delimit(
                        hl.array(
                            [
                                hl.str(relationship_ht[i_col][x])
                                for x in relationship_ht[i_col]
                            ]
                        ),
                        "_",
                    ),
                    j_col: hl.delimit(
                        hl.array(
                            [
                                hl.str(relationship_ht[j_col][x])
                                for x in relationship_ht[j_col]
                            ]
                        ),
                        "_",
                    ),
                }
            )
        else:
            raise NotImplementedError(
                "The `i_col` and `j_col` columns of the `relationship_ht` argument"
                " passed to infer_families are not of type StringExpression or Struct."
            )

    # If sex is a Table, extract sex information as a Dict
    if isinstance(sex, hl.Table):
        sex = dict(hl.tuple([sex.s, sex.is_female]).collect())

    # Collect all related sample pairs and
    # create a dictionary with pairs as keys and relationships as values
    # Sample-pairs are tuples ordered by sample name
    related_pairs = {
        tuple(sorted([i, j])): rel
        for i, j, rel in hl.tuple(
            [relationship_ht.i, relationship_ht.j, relationship_ht.relationship]
        ).collect()
    }

    parent_child_pairs_by_fam = group_parent_child_pairs_by_fam(
        [pair for pair, rel in related_pairs.items() if rel == PARENT_CHILD]
    )

    return hl.Pedigree(
        [
            trio
            for fam_index, parent_child_pairs in enumerate(parent_child_pairs_by_fam)
            for trio in get_trios(str(fam_index), parent_child_pairs, related_pairs)
        ]
    )

def create_fake_pedigree(
    n: int,
    sample_list: List[str],
    exclude_real_probands: bool = False,
    max_tries: int = 10,
    real_pedigree: Optional[hl.Pedigree] = None,
    sample_list_stratification: Optional[Dict[str, str]] = None,
) -> hl.Pedigree:
    """
    Generate a pedigree made of trios created by sampling 3 random samples in the sample list.

    - If `real_pedigree` is given, then children in the resulting fake trios will not
      include any trio with proband - parents that are in the real ones.
    - Each sample can be used only once as a proband in the resulting trios.
    - Sex of probands in fake trios is random.

    :param n: Number of fake trios desired in the pedigree.
    :param sample_list: List of samples.
    :param exclude_real_probands: If set, then fake trios probands cannot be in the real trios probands.
    :param max_tries: Maximum number of sampling attempts before bailing out (preventing an infinite loop if `n` is too large w.r.t. the number of samples).
    :param real_pedigree: Optional pedigree to exclude children from.
    :param sample_list_stratification: Optional dictionary with samples as keys and a value that should be used to stratify samples in `sample_list` into groups that the trio should be picked from. This ensures that each fake trio will contain samples from only the same stratification. For example, if all samples within a fake trio should be chosen from the same platform, this can be a dictionary of sample: platform.
    :return: Fake pedigree.
    """
    real_trios = (
        {trio.s: trio for trio in real_pedigree.trios}
        if real_pedigree is not None
        else dict()
    )

    if sample_list_stratification is not None:
        sample_list_stratified = defaultdict(list)
        for s in sample_list:
            s_strata = sample_list_stratification.get(s)
            if s_strata is None:
                raise ValueError(
                    f"Sample {s} not found in 'sample_list_stratification' dict!"
                )
            sample_list_stratified[s_strata].append(s)
    else:
        sample_list_stratified = None

    if exclude_real_probands and len(real_trios) == len(set(sample_list)):
        logger.warning(
            "All samples are in the real probands list; cannot create any fake"
            " pedigrees with exclude_real_probands=True. Returning an empty Pedigree."
        )
        return hl.Pedigree([])

    fake_trios = {}
    tries = 0
    while len(fake_trios) < n and tries < max_tries:
        s = random.choice(sample_list)
        if sample_list_stratified is None:
            curr_sample_list = sample_list
        else:
            curr_sample_list = sample_list_stratified[sample_list_stratification[s]]

        mat_id, pat_id = random.sample(curr_sample_list, 2)
        if (
            s in real_trios
            and (
                exclude_real_probands
                or {mat_id, pat_id} == {real_trios[s].mat_id, real_trios[s].pat_id}
            )
        ) or s in fake_trios:
            tries += 1
        else:
            tries = 0
            fake_trios[s] = hl.Trio(
                s=s,
                pat_id=pat_id,
                mat_id=mat_id,
                fam_id=f"fake_{str(len(fake_trios))}",
                is_female=bool(random.getrandbits(1)),
            )

    if tries == max_tries:
        logger.warning(
            "Only returning %d fake trios; random trio sampling stopped after reaching"
            " the maximum %d iterations",
            len(fake_trios),
            max_tries,
        )

    return hl.Pedigree(list(fake_trios.values()))

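A usage sketch, assuming `mt` is the dataset MatrixTable and `ped` is a real pedigree whose proband-parent combinations should not be recreated; the names and the trio count are illustrative.

# Illustrative only: 100 fake trios drawn from the dataset's samples.
fake_ped = create_fake_pedigree(
    n=100,
    sample_list=mt.s.collect(),
    real_pedigree=ped,
)
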
def filter_mt_to_trios(mt: hl.MatrixTable, fam_ht: hl.Table) -> hl.MatrixTable:
    """
    Filter a MatrixTable to a set of trios in `fam_ht` and annotate with adj.

    :param mt: A MatrixTable to filter to only trios
    :param fam_ht: A Table of trios to filter to, loaded using `hl.import_fam`
    :return: A MT filtered to trios and adj annotated
    """
    # Filter MT to samples present in any of the trios
    fam_ht = fam_ht.annotate(fam_members=[fam_ht.id, fam_ht.pat_id, fam_ht.mat_id])
    fam_ht = fam_ht.explode("fam_members", name="s")
    fam_ht = fam_ht.key_by("s").select().distinct()

    mt = mt.filter_cols(hl.is_defined(fam_ht[mt.col_key]))
    if "adj" not in mt.entry:
        mt = annotate_adj(mt)

    return mt

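A usage sketch, assuming a trio .fam file at a hypothetical path and an input MatrixTable `mt`; the filtered result can then be passed to `hl.trio_matrix` for use with `generate_trio_stats_expr` below.

# Illustrative only: paths and variable names are hypothetical.
fam_ht = hl.import_fam("gs://bucket/trios.fam")
mt_trios = filter_mt_to_trios(mt, fam_ht)
ped = hl.Pedigree.read("gs://bucket/trios.fam")
trio_mt = hl.trio_matrix(mt_trios, ped, complete_trios=True)
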
def generate_trio_stats_expr(
    trio_mt: hl.MatrixTable,
    transmitted_strata: Dict[str, hl.expr.BooleanExpression] = {"raw": True},
    de_novo_strata: Dict[str, hl.expr.BooleanExpression] = {"raw": True},
    ac_strata: Dict[str, hl.expr.BooleanExpression] = {"raw": True},
    proband_is_female_expr: Optional[hl.expr.BooleanExpression] = None,
) -> hl.expr.StructExpression:
    """
    Generate a row-wise expression containing trio transmission stats.

    The expression will generate the following counts:
        - Number of alleles in het parents transmitted to the proband
        - Number of alleles in het parents not transmitted to the proband
        - Number of de novo mutations
        - Parent allele count
        - Proband allele count

    Transmission and de novo mutation metrics and allele counts can be stratified using additional filters.
    `transmitted_strata`, `de_novo_strata`, and `ac_strata` all expect a dictionary of filtering expressions keyed
    by their desired suffix to append for labeling. The default will perform counts using all genotypes and append
    'raw' to the label.

    .. note::

        Expects that `mt` is dense; if dealing with a sparse MT, `hl.experimental.densify` must be run first.

    :param trio_mt: A standard trio MT (with the format as produced by hail.methods.trio_matrix)
    :param transmitted_strata: Strata for the transmission counts
    :param de_novo_strata: Strata for the de novo counts
    :param ac_strata: Strata for the parent and child allele counts
    :param proband_is_female_expr: An optional expression giving the sex of the proband. If not given, DNMs are only computed for autosomes.
    :return: An expression with the counts
    """
    # Create map for transmitted, untransmitted and DNM
    hom_ref = 0
    het = 1
    hom_var = 2

    auto_or_par = 2
    hemi_x = 1
    hemi_y = 0

    trans_config_counts = {
        # kid, dad, mom, copy -> t, u
        (hom_ref, het, het, auto_or_par): (0, 2),
        (hom_ref, hom_ref, het, auto_or_par): (0, 1),
        (hom_ref, het, hom_ref, auto_or_par): (0, 1),
        (het, het, het, auto_or_par): (1, 1),
        (het, hom_ref, het, auto_or_par): (1, 0),
        (het, het, hom_ref, auto_or_par): (1, 0),
        (het, hom_var, het, auto_or_par): (0, 1),
        (het, het, hom_var, auto_or_par): (0, 1),
        (hom_var, het, het, auto_or_par): (2, 0),
        (hom_var, het, hom_var, auto_or_par): (1, 0),
        (hom_var, hom_var, het, auto_or_par): (1, 0),
        (hom_ref, hom_ref, het, hemi_x): (0, 1),
        (hom_ref, hom_var, het, hemi_x): (0, 1),
        (hom_var, hom_ref, het, hemi_x): (1, 0),
        (hom_var, hom_var, het, hemi_x): (1, 0),
    }

    trans_count_map = hl.literal(trans_config_counts)

    def _get_copy_state(locus: hl.expr.LocusExpression) -> hl.expr.Int32Expression:
        """Get copy-state int from LocusExpression for indexing into trans_count_map."""
        return (
            hl.case()
            .when(locus.in_autosome_or_par(), auto_or_par)
            .when(locus.in_x_nonpar(), hemi_x)
            .when(locus.in_y_nonpar(), hemi_y)
            .or_missing()
        )

    def _is_dnm(
        proband_gt: hl.expr.CallExpression,
        father_gt: hl.expr.CallExpression,
        mother_gt: hl.expr.CallExpression,
        locus: hl.expr.LocusExpression,
        proband_is_female: Optional[hl.expr.BooleanExpression],
    ) -> hl.expr.BooleanExpression:
        """Determine whether a trio genotype combination is a DNM."""
        if proband_is_female is None:
            logger.warning(
                "Since no proband sex expression was given to generate_trio_stats_expr,"
                " only DNMs in autosomes will be counted."
            )
            return hl.or_missing(
                locus.in_autosome(),
                proband_gt.is_het() & father_gt.is_hom_ref() & mother_gt.is_hom_ref(),
            )
        return hl.if_else(
            locus.in_autosome_or_par() | (proband_is_female & locus.in_x_nonpar()),
            proband_gt.is_het() & father_gt.is_hom_ref() & mother_gt.is_hom_ref(),
            hl.or_missing(
                ~proband_is_female, proband_gt.is_hom_var() & father_gt.is_hom_ref()
            ),
        )

    def _ac_an_parent_child_count(
        proband_gt: hl.expr.CallExpression,
        father_gt: hl.expr.CallExpression,
        mother_gt: hl.expr.CallExpression,
    ) -> Dict[str, hl.expr.Int64Expression]:
        """Get AC and AN for parents and children."""
        ac_parent_expr = hl.agg.sum(
            father_gt.n_alt_alleles() + mother_gt.n_alt_alleles()
        )
        an_parent_expr = hl.agg.sum(
            (hl.is_defined(father_gt) + hl.is_defined(mother_gt)) * 2
        )
        ac_child_expr = hl.agg.sum(proband_gt.n_alt_alleles())
        an_child_expr = hl.agg.sum(hl.is_defined(proband_gt) * 2)

        return {
            "ac_parents": ac_parent_expr,
            "an_parents": an_parent_expr,
            "ac_children": ac_child_expr,
            "an_children": an_child_expr,
        }

    # Create transmission counters
    trio_stats = hl.struct(
        **{
            f"{name2}_{name}": hl.agg.filter(
                (
                    trio_mt.proband_entry.GT.is_non_ref()
                    | trio_mt.father_entry.GT.is_non_ref()
                    | trio_mt.mother_entry.GT.is_non_ref()
                )
                & expr,
                hl.agg.sum(
                    trans_count_map.get(
                        (
                            trio_mt.proband_entry.GT.n_alt_alleles(),
                            trio_mt.father_entry.GT.n_alt_alleles(),
                            trio_mt.mother_entry.GT.n_alt_alleles(),
                            _get_copy_state(trio_mt.locus),
                        ),
                        default=(0, 0),
                    )[i]
                ),
            )
            for name, expr in transmitted_strata.items()
            for i, name2 in enumerate(["n_transmitted", "n_untransmitted"])
        }
    )

    # Create de novo counters
    trio_stats = trio_stats.annotate(
        **{
            f"n_de_novos_{name}": hl.agg.filter(
                _is_dnm(
                    trio_mt.proband_entry.GT,
                    trio_mt.father_entry.GT,
                    trio_mt.mother_entry.GT,
                    trio_mt.locus,
                    proband_is_female_expr,
                )
                & expr,
                hl.agg.count(),
            )
            for name, expr in de_novo_strata.items()
        }
    )

    trio_stats = trio_stats.annotate(
        **{
            f"{name2}_{name}": hl.agg.filter(
                expr,
                _ac_an_parent_child_count(
                    trio_mt.proband_entry.GT,
                    trio_mt.father_entry.GT,
                    trio_mt.mother_entry.GT,
                )[name2],
            )
            for name, expr in ac_strata.items()
            for name2 in ["ac_parents", "an_parents", "ac_children", "an_children"]
        }
    )

    return trio_stats

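Continuing the sketch from `filter_mt_to_trios` above: aggregate the trio stats into a row-keyed Table. The `adj` stratum assumes `adj` was annotated on the input MT (e.g. by `filter_mt_to_trios`) before `hl.trio_matrix` was run; all names are illustrative.

# Illustrative only: stratify counts into raw and high-quality (adj) genotypes.
adj_expr = (
    trio_mt.proband_entry.adj
    & trio_mt.father_entry.adj
    & trio_mt.mother_entry.adj
)
trio_stats_ht = trio_mt.select_rows(
    **generate_trio_stats_expr(
        trio_mt,
        transmitted_strata={"raw": True, "adj": adj_expr},
        de_novo_strata={"raw": True, "adj": adj_expr},
        ac_strata={"raw": True, "adj": adj_expr},
    )
).rows()
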
def generate_sib_stats_expr(
    mt: hl.MatrixTable,
    sib_ht: hl.Table,
    i_col: str = "i",
    j_col: str = "j",
    strata: Dict[str, hl.expr.BooleanExpression] = {"raw": True},
    is_female: Optional[hl.expr.BooleanExpression] = None,
) -> hl.expr.StructExpression:
    """
    Generate a row-wise expression containing the number of alternate alleles in common between sibling pairs.

    The sibling sharing counts can be stratified using additional filters using `strata`.

    .. note::

        This function expects that the `mt` has either been split or filtered to only bi-allelics.
        If a sample has multiple sibling pairs, only one pair will be counted.

    :param mt: Input matrix table
    :param sib_ht: Table defining sibling pairs with one sample in a col (`i_col`) and the second in another col (`j_col`)
    :param i_col: Column containing the 1st sample of the pair in the relationship table
    :param j_col: Column containing the 2nd sample of the pair in the relationship table
    :param strata: Dict with additional strata to use when computing shared sibling variant counts
    :param is_female: An optional column in mt giving the sample sex. If not given, counts are only computed for autosomes.
    :return: A Table with the sibling shared variant counts
    """

    def _get_alt_count(locus, gt, is_female):
        """Calculate alt allele count with sex info if present."""
        if is_female is None:
            return hl.or_missing(locus.in_autosome(), gt.n_alt_alleles())
        return (
            hl.case()
            .when(locus.in_autosome_or_par(), gt.n_alt_alleles())
            .when(
                ~is_female & (locus.in_x_nonpar() | locus.in_y_nonpar()),
                hl.min(1, gt.n_alt_alleles()),
            )
            .when(is_female & locus.in_y_nonpar(), 0)
            .default(0)
        )

    if is_female is None:
        logger.warning(
            "Since no sex expression was given to generate_sib_stats_expr, only"
            " variants in autosomes will be counted."
        )

    # If a sample is in sib_ht more than one time, keep only one of the sibling pairs
    # First filter to only samples found in mt to keep as many pairs as possible
    s_to_keep = mt.aggregate_cols(hl.agg.collect_as_set(mt.s), _localize=False)
    sib_ht = sib_ht.filter(
        s_to_keep.contains(sib_ht[i_col].s) & s_to_keep.contains(sib_ht[j_col].s)
    )
    sib_ht = sib_ht.add_index("sib_idx")
    sib_ht = sib_ht.annotate(sibs=[sib_ht[i_col].s, sib_ht[j_col].s])
    sib_ht = sib_ht.explode("sibs")
    sib_ht = sib_ht.group_by("sibs").aggregate(
        sib_idx=(hl.agg.take(sib_ht.sib_idx, 1, ordering=sib_ht.sib_idx)[0])
    )
    sib_ht = sib_ht.group_by(sib_ht.sib_idx).aggregate(sibs=hl.agg.collect(sib_ht.sibs))
    sib_ht = sib_ht.filter(hl.len(sib_ht.sibs) == 2).persist()

    logger.info(
        "Generating sibling variant sharing counts using %d pairs.", sib_ht.count()
    )
    sib_ht = sib_ht.explode("sibs").key_by("sibs")[mt.s]

    # Create sibling sharing counters
    sib_stats = hl.struct(
        **{
            f"n_sib_shared_variants_{name}": hl.sum(
                hl.agg.filter(
                    expr,
                    hl.agg.group_by(
                        sib_ht.sib_idx,
                        hl.or_missing(
                            hl.agg.sum(hl.is_defined(mt.GT)) == 2,
                            hl.agg.min(_get_alt_count(mt.locus, mt.GT, is_female)),
                        ),
                    ),
                ).values()
            )
            for name, expr in strata.items()
        }
    )

    sib_stats = sib_stats.annotate(
        **{
            f"ac_sibs_{name}": hl.agg.filter(
                expr & hl.is_defined(sib_ht.sib_idx), hl.agg.sum(mt.GT.n_alt_alleles())
            )
            for name, expr in strata.items()
        }
    )

    return sib_stats

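A usage sketch, assuming `relatedness_ht` is pc_relate output (so `i` and `j` are structs with an `s` field) annotated with `relationship`, and `mt` is the dataset MatrixTable; the names are illustrative.

# Illustrative only: restrict to sibling pairs, then aggregate sharing counts per row.
sib_pairs_ht = relatedness_ht.filter(relatedness_ht.relationship == SIBLINGS)
sib_stats_ht = mt.select_rows(**generate_sib_stats_expr(mt, sib_pairs_ht)).rows()
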