Source code for gnomad.sample_qc.relatedness

# noqa: D100

import logging
import random
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union

import hail as hl
import networkx as nx

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

UNRELATED = "unrelated"
"""
String representation for a pair of unrelated individuals in this module.
Typically >2nd degree relatives, but the threshold is user-dependant.
"""

SECOND_DEGREE_RELATIVES = "second degree relatives"
"""
String representation for a pair of 2nd degree relatives in this module.
"""

PARENT_CHILD = "parent-child"
"""
String representation for a parent-child pair in this module.
"""

SIBLINGS = "siblings"
"""
String representation for a sibling pair in this module.
"""

DUPLICATE_OR_TWINS = "duplicate/twins"
"""
String representation for a pair of samples who are identical (either MZ twins of duplicate) in this module.
"""

AMBIGUOUS_RELATIONSHIP = "ambiguous"
"""
String representation for a pair of samples whose relationship is ambiguous.
This is used in the case of a pair of samples which kinship/IBD values do not correspond to any biological relationship between two individuals.
"""


[docs]def get_duplicated_samples( relationship_ht: hl.Table, i_col: str = "i", j_col: str = "j", rel_col: str = "relationship", ) -> List[Set[str]]: """ Extract the list of duplicate samples using a Table ouput from pc_relate. :param relationship_ht: Table with relationships between pairs of samples :param i_col: Column containing the 1st sample :param j_col: Column containing the 2nd sample :param rel_col: Column containing the sample pair relationship annotated with get_relationship_expr :return: List of sets of samples that are duplicates """ def get_all_dups( s: str, dups: Set[str], samples_duplicates: Dict[str, Set[str]] ) -> Tuple[Set[str], Dict[str, Set[str]]]: """ Create the set of all duplicated samples corresponding to `s` that are found in `sample_duplicates`. Also return the remaining sample duplicates after removing all duplicated samples corresponding to `s`. Works by recursively adding duplicated samples to the set. :param s: sample to identify duplicates for :param dups: set of corresponding samples already identified :param samples_duplicates: dict of sample -> duplicate-pair left to assign :return: (set of duplicates corresponding to s found in samples_duplicates, remaining samples_duplicates) """ if s in samples_duplicates: dups.add(s) s_dups = samples_duplicates.pop(s) for s_dup in s_dups: if s_dup not in dups: dups, samples_duplicates = get_all_dups( s_dup, dups, samples_duplicates ) return dups, samples_duplicates logger.info("Computing duplicate sets") dup_pairs = relationship_ht.aggregate( hl.agg.filter( relationship_ht[rel_col] == DUPLICATE_OR_TWINS, hl.agg.collect(hl.tuple([relationship_ht[i_col], relationship_ht[j_col]])), ) ) samples_duplicates = defaultdict(set) for i, j in dup_pairs: samples_duplicates[i].add(j) samples_duplicates[j].add(i) duplicated_samples = [] while len(samples_duplicates) > 0: dup_set, samples_duplicates = get_all_dups( list(samples_duplicates)[0], set(), samples_duplicates ) duplicated_samples.append(dup_set) return duplicated_samples
[docs]def get_duplicated_samples_ht( duplicated_samples: List[Set[str]], samples_rankings_ht: hl.Table, rank_ann: str = "rank", ): """ Create a HT with duplicated samples sets. Each row is indexed by the sample that is kept and also contains the set of duplicate samples that should be filtered. `samples_rankings_ht` is a HT containing a global rank for each of the samples (smaller is better). :param duplicated_samples: List of sets of duplicated samples :param samples_rankings_ht: HT with global rank for each sample :param rank_ann: Annotation in `samples_ranking_ht` containing each sample global rank (smaller is better). :return: HT with duplicate sample sets, including which to keep/filter """ dups_ht = hl.Table.parallelize( [ hl.struct(dup_set=i, dups=duplicated_samples[i]) for i in range(0, len(duplicated_samples)) ] ) dups_ht = dups_ht.explode(dups_ht.dups, name="_dup") dups_ht = dups_ht.key_by("_dup") dups_ht = dups_ht.annotate(rank=samples_rankings_ht[dups_ht.key][rank_ann]) dups_cols = hl.bind( lambda x: hl.struct(kept=x[0], filtered=x[1:]), hl.sorted( hl.agg.collect(hl.tuple([dups_ht._dup, dups_ht.rank])), key=lambda x: x[1] ).map(lambda x: x[0]), ) dups_ht = dups_ht.group_by(dups_ht.dup_set).aggregate(**dups_cols) if isinstance(dups_ht.kept, hl.expr.StructExpression): dups_ht = dups_ht.key_by(**dups_ht.kept).drop("kept") else: dups_ht = dups_ht.key_by( s=dups_ht.kept ) # Since there is no defined name in the case of a non-struct type, use `s` return dups_ht
[docs]def explode_duplicate_samples_ht(dups_ht: hl.Table) -> hl.Table: """ Explode the result of `get_duplicated_samples_ht`, so that each line contains a single sample. An additional annotation is added: `dup_filtered` indicating which of the duplicated samples was kept. Requires a field `filtered` which type should be the same as the input duplicated samples Table key. :param dups_ht: Input HT :return: Flattened HT """ def get_dups_to_keep_expr(): if dups_ht.filtered.dtype.element_type == dups_ht.key.dtype: return (dups_ht.key, False) elif (len(dups_ht.key) == 1) & ( dups_ht.filtered.dtype.element_type == dups_ht.key[0].dtype ): return (dups_ht.key[0], False) else: raise TypeError( "Cannot explode table as types of the filtered field" f" ({dups_ht.filtered.dtype}) and the key ({dups_ht.key.dtype}) are" " incompatible." ) dups_ht = dups_ht.annotate( dups=hl.array([get_dups_to_keep_expr()]).extend( dups_ht.filtered.map(lambda x: (x, True)) ) ) dups_ht = dups_ht.explode("dups") dups_ht = dups_ht.key_by() return dups_ht.select(s=dups_ht.dups[0], dup_filtered=dups_ht.dups[1]).key_by("s")
[docs]def get_relationship_expr( # TODO: The threshold detection could be easily automated by fitting distributions over the data. kin_expr: hl.expr.NumericExpression, ibd0_expr: hl.expr.NumericExpression, ibd1_expr: hl.expr.NumericExpression, ibd2_expr: hl.expr.NumericExpression, first_degree_kin_thresholds: Tuple[float, float] = (0.19, 0.4), second_degree_min_kin: float = 0.1, ibd0_0_max: float = 0.025, ibd0_25_thresholds: Tuple[float, float] = (0.1, 0.425), # ibd0_50_thresholds = [0.37, 0.625], Not useful for relationship inference # ibd0_100_threshold = 0.625 , Not useful for relationship inference ibd1_0_thresholds: Tuple[float, float] = (-0.15, 0.1), # ibd1_25_thresholds: Tuple[float, float] = (0.1, 0.37), Not useful for # relationship inference ibd1_50_thresholds: Tuple[float, float] = (0.275, 0.75), ibd1_100_min: float = 0.75, ibd2_0_max: float = 0.125, ibd2_25_thresholds: Tuple[float, float] = (0.1, 0.5), ibd2_100_thresholds: Tuple[float, float] = (0.75, 1.25), ) -> hl.expr.StringExpression: """ Return an expression indicating the relationship between a pair of samples given their kin coefficient and IBDO, IBD1, IBD2 values. The kinship coefficient values in the defaults are in line with those output from `hail.methods.pc_relate <https://hail.is/docs/0.2/methods/genetics.html?highlight=pc_relate#hail.methods.pc_relate>`. :param kin_expr: Kin coefficient expression :param ibd0_expr: IBDO expression :param ibd1_expr: IBD1 expression :param ibd2_expr: IDB2 expression :param first_degree_kin_thresholds: (min, max) kinship threshold for 1st degree relatives :param second_degree_min_kin: min kinship threshold for 2nd degree relatives :param ibd0_0_max: max IBD0 threshold for 0 IBD0 sharing :param ibd0_25_thresholds: (min, max) thresholds for 0.25 IBD0 sharing :param ibd1_0_thresholds: (min, max) thresholds for 0 IBD1 sharing. Note that the min is there because pc_relate can output large negative values in some corner cases. :param ibd1_50_thresholds: (min, max) thresholds for 0.5 IBD1 sharing :param ibd1_100_min: min IBD1 threshold for 1.0 IBD1 sharing :param ibd2_0_max: max IBD2 threshold for 0 IBD2 sharing :param ibd2_25_thresholds: (min, max) thresholds for 0.25 IBD2 sharing :param ibd2_100_thresholds: (min, max) thresholds for 1.00 IBD2 sharing. Note that the min is there because pc_relate can output much larger IBD2 values in some corner cases. :return: The relationship annotation using the constants defined in this module. """ return ( hl.case() .when(kin_expr < second_degree_min_kin, UNRELATED) .when((kin_expr < first_degree_kin_thresholds[0]), SECOND_DEGREE_RELATIVES) .when( (kin_expr < first_degree_kin_thresholds[1]) & (ibd0_expr <= ibd0_0_max) & (ibd1_expr >= ibd1_100_min) & (ibd2_expr <= ibd2_0_max), PARENT_CHILD, ) .when( (kin_expr < first_degree_kin_thresholds[1]) & (ibd0_expr >= ibd0_25_thresholds[0]) & (ibd0_expr <= ibd0_25_thresholds[1]) & (ibd1_expr >= ibd1_50_thresholds[0]) & (ibd1_expr <= ibd1_50_thresholds[1]) & (ibd2_expr >= ibd2_25_thresholds[0]) & (ibd2_expr <= ibd2_25_thresholds[1]), SIBLINGS, ) .when( (kin_expr > first_degree_kin_thresholds[1]) & (ibd0_expr < ibd0_0_max) & (ibd1_expr >= ibd1_0_thresholds[0]) & (ibd1_expr <= ibd1_0_thresholds[1]) & (ibd2_expr >= ibd2_100_thresholds[0]) & (ibd2_expr <= ibd2_100_thresholds[1]), DUPLICATE_OR_TWINS, ) .default(AMBIGUOUS_RELATIONSHIP) )
[docs]def get_slope_int_relationship_expr( kin_expr: hl.expr.NumericExpression, y_expr: hl.expr.NumericExpression, parent_child_max_y: float, second_degree_sibling_lower_cutoff_slope: float, second_degree_sibling_lower_cutoff_intercept: float, second_degree_upper_sibling_lower_cutoff_slope: float, second_degree_upper_sibling_lower_cutoff_intercept: float, duplicate_twin_min_kin: float = 0.42, second_degree_min_kin: float = 0.1, duplicate_twin_ibd1_min: float = -0.15, duplicate_twin_ibd1_max: float = 0.1, ibd1_expr: Optional[hl.expr.NumericExpression] = None, ): """ Return an expression indicating the relationship between a pair of samples given slope and intercept cutoffs. The kinship coefficient (`kin_expr`) and an additional metric (`y_expr`) are used to define the relationship between a pair of samples. For this function the slope and intercepts should refer to cutoff lines where the x-axis, or independent variable is the kinship coefficient and the y-axis, or dependent variable, is the metric defined by `y_expr`. Typically, the y-axis metric IBS0, IBS0/IBS2, or IBD0. .. note:: No defaults are provided for the slope and intercept cutoffs because they are highly dependent on the dataset and the metric used in `y_expr`. The relationship expression is determined as follows: - If `kin_expr` < `second_degree_min_kin` -> UNRELATED - If `kin_expr` > `duplicate_twin_min_kin`: - If `y_expr` < `parent_child_max_y`: - If `ibd1_expr` is defined: - If `duplicate_twin_ibd1_min` <= `ibd1_expr` <= ` duplicate_twin_ibd1_max` -> DUPLICATE_OR_TWINS - Else -> AMBIGUOUS_RELATIONSHIP - Else -> DUPLICATE_OR_TWINS - If `y_expr` < `parent_child_max_y` -> PARENT_CHILD - If pair is over second_degree_sibling_lower_cutoff line: - If pair is over second_degree_upper_sibling_lower_cutoff line -> SIBLINGS - Else -> SECOND_DEGREE_RELATIVES - If none of the above conditions are met -> AMBIGUOUS_RELATIONSHIP :param kin_expr: Kin coefficient expression. Used as the x-axis, or independent variable, for the slope and intercept cutoffs. :param y_expr: Expression for the metric to use as the y-axis, or dependent variable, for the slope and intercept cutoffs. This is typically an expression for IBS0, IBS0/IBS2, or IBD0. :param parent_child_max_y: Maximum value of the metric defined by `y_expr` for a parent-child pair. :param second_degree_sibling_lower_cutoff_slope: Slope of the line to use as a lower cutoff for second degree relatives and siblings from parent-child pairs. :param second_degree_sibling_lower_cutoff_intercept: Intercept of the line to use as a lower cutoff for second degree relatives and siblings from parent-child pairs. :param second_degree_upper_sibling_lower_cutoff_slope: Slope of the line to use as an upper cutoff for second degree relatives and a lower cutoff for siblings. :param second_degree_upper_sibling_lower_cutoff_intercept: Intercept of the line to use as an upper cutoff for second degree relatives and a lower cutoff for siblings. :param duplicate_twin_min_kin: Minimum kinship for duplicate or twin pairs. Default is 0.42. :param second_degree_min_kin: Minimum kinship threshold for 2nd degree relatives. Default is 0.08838835. Bycroft et al. (2018) calculates a theoretical kinship of 0.08838835 for a second degree relationship cutoff, but this cutoff should be determined by evaluation of the kinship distribution. :param ibd1_expr: Optional IBD1 expression. If this expression is provided, `duplicate_twin_ibd1_min` and `duplicate_twin_ibd1_max` will be used as an additional cutoff for duplicate or twin pairs. :param duplicate_twin_ibd1_min: Minimum IBD1 cutoff for duplicate or twin pairs. Note: the min is because pc_relate can output large negative values in some corner cases. :param duplicate_twin_ibd1_max: Maximum IBD1 cutoff for duplicate or twin pairs. :return: The relationship annotation using the constants defined in this module. """ # Only use a duplicate/twin IBD1 cutoff if an ibd1_expr is supplied. if ibd1_expr is not None: dup_twin_ibd1_expr = (ibd1_expr >= duplicate_twin_ibd1_min) & ( ibd1_expr <= duplicate_twin_ibd1_max ) else: dup_twin_ibd1_expr = True return ( hl.case() .when(kin_expr < second_degree_min_kin, UNRELATED) .when( kin_expr > duplicate_twin_min_kin, hl.if_else( dup_twin_ibd1_expr & (y_expr < parent_child_max_y), DUPLICATE_OR_TWINS, AMBIGUOUS_RELATIONSHIP, ), ) .when(y_expr < parent_child_max_y, PARENT_CHILD) .when( y_expr > second_degree_sibling_lower_cutoff_slope * kin_expr + second_degree_sibling_lower_cutoff_intercept, hl.if_else( y_expr > second_degree_upper_sibling_lower_cutoff_slope * kin_expr + second_degree_upper_sibling_lower_cutoff_intercept, SIBLINGS, SECOND_DEGREE_RELATIVES, ), ) .default(AMBIGUOUS_RELATIONSHIP) )
[docs]def infer_families( relationship_ht: hl.Table, sex: Union[hl.Table, Dict[str, bool]], duplicate_samples_ht: hl.Table, i_col: str = "i", j_col: str = "j", relationship_col: str = "relationship", ) -> hl.Pedigree: """ Generate a pedigree containing trios inferred from the `relationship_ht`. This function takes a hail Table with a row for each pair of related individuals i, j in the data (it's OK to have unrelated samples too). The `relationship_col` should be a column specifying the relationship between each two samples as defined in this module's constants. This function returns a pedigree containing trios inferred from the data. Family ID can be the same for multiple trios if one or more members of the trios are related (e.g. sibs, multi-generational family). Trios are ordered by family ID. .. note:: This function only returns complete trios defined as: one child, one father and one mother (sex is required for both parents). :param relationship_ht: Input relationship table :param sex: A Table or dict giving the sex for each sample (`TRUE`=female, `FALSE`=male). If a Table is given, it should have a field `is_female`. :param duplicated_samples: All duplicated samples TO REMOVE (If not provided, this function won't work as it assumes that each child has exactly two parents) :param i_col: Column containing the 1st sample of the pair in the relationship table :param j_col: Column containing the 2nd sample of the pair in the relationship table :param relationship_col: Column contatining the relationship for the sample pair as defined in this module constants. :return: Pedigree of complete trios """ def group_parent_child_pairs_by_fam( parent_child_pairs: Iterable[Tuple[str, str]], ) -> List[List[Tuple[str, str]]]: """ Group parent-child pairs into a list of families. A family here is defined as a list of sample-pairs which all share at least one sample with at least one other sample-pair in the list. :param parent_child_pairs: All the parent-children pairs :return: A list of families, where each element of the list is a list of the parent-children pairs """ fam_id = 1 # stores the current family id s_fam = dict() # stores the family id for each sample fams = defaultdict(list) # stores fam_id -> sample-pairs for pair in parent_child_pairs: if pair[0] in s_fam: if pair[1] in s_fam: if ( s_fam[pair[0]] != s_fam[pair[1]] ): # If both samples are in different families, merge the families new_fam_id = s_fam[pair[0]] fam_id_to_merge = s_fam[pair[1]] for s in s_fam: if s_fam[s] == fam_id_to_merge: s_fam[s] = new_fam_id fams[new_fam_id].extend(fams.pop(fam_id_to_merge)) else: # If only the 1st sample in the pair is already in a family, assign the 2nd sample in the pair to the same family s_fam[pair[1]] = s_fam[pair[0]] fams[s_fam[pair[0]]].append(pair) elif ( pair[1] in s_fam ): # If only the 2nd sample in the pair is already in a family, assign the 1st sample in the pair to the same family s_fam[pair[0]] = s_fam[pair[1]] fams[s_fam[pair[1]]].append(pair) else: # If none of the samples in the pair is already in a family, create a new family s_fam[pair[0]] = fam_id s_fam[pair[1]] = fam_id fams[fam_id].append(pair) fam_id += 1 return list(fams.values()) def get_trios( fam_id: str, parent_child_pairs: List[Tuple[str, str]], related_pairs: Dict[Tuple[str, str], str], ) -> List[hl.Trio]: """ Generate trios based on the list of parent-child pairs in the family and all related pairs in the data. Only complete parent/offspring trios are included in the results. The trios are assembled as follows: 1. All pairs of unrelated samples with different sexes within the family are extracted as possible parent pairs 2. For each possible parent pair, a list of all children is constructed (each child in the list has a parent-offspring pair with each parent) 3. If there are multiple children for a given parent pair, all children should be siblings with each other 4. Check that each child was only assigned a single pair of parents. If a child is found to have multiple parent pairs, they are ALL discarded. :param fam_id: The family ID :param parent_child_pairs: The parent-child pairs for this family :param related_pairs: All related sample pairs in the data :return: List of trios in the family """ def get_possible_parents(samples: List[str]) -> List[Tuple[str, str]]: """ Return all pairs of unrelated samples with different sexes within the family are extracted as possible parent pairs. :param samples: All samples in the family :return: Possible parent pairs """ possible_parents = [] for i in range(len(samples)): for j in range(i + 1, len(samples)): if ( related_pairs.get(tuple(sorted([samples[i], samples[j]]))) is None ): if sex.get(samples[i]) is False and sex.get(samples[j]) is True: possible_parents.append((samples[i], samples[j])) elif ( sex.get(samples[i]) is True and sex.get(samples[j]) is False ): possible_parents.append((samples[j], samples[i])) return possible_parents def get_children(possible_parents: Tuple[str, str]) -> List[str]: """ Construct a list of all children for a given possible parent pair. Each child in the list has a parent-offspring pair with each parent. :param possible_parents: A pair of possible parents :return: The list of all children (if any) corresponding to the possible parents """ possible_offsprings = defaultdict( set ) # stores sample -> set of parents in the possible_parents where (sample, parent) is found in possible_child_pairs for pair in parent_child_pairs: if possible_parents[0] == pair[0]: possible_offsprings[pair[1]].add(possible_parents[0]) elif possible_parents[0] == pair[1]: possible_offsprings[pair[0]].add(possible_parents[0]) elif possible_parents[1] == pair[0]: possible_offsprings[pair[1]].add(possible_parents[1]) elif possible_parents[1] == pair[1]: possible_offsprings[pair[0]].add(possible_parents[1]) return [ s for s, parents in possible_offsprings.items() if len(parents) == 2 ] def check_sibs(children: List[str]) -> bool: """ Confirm that all children of a parent pair are siblings with each other. If there are multiple children for a given parent pair, all children should be siblings with each other. :param children: List of all children for a given parent pair :return: Whether all children in the list are siblings """ for i in range(len(children)): for j in range(i + 1, len(children)): if ( related_pairs[tuple(sorted([children[i], children[j]]))] != SIBLINGS ): return False return True def discard_multi_parents_children(trios: List[hl.Trio]): """ Check that each child was only assigned a single pair of parents. If a child is found to have multiple parent pairs, they are ALL discarded. :param trios: All trios formed for this family :return: The list of trios for which each child has a single parents pair. """ children_trios = defaultdict(list) for trio in trios: children_trios[trio.s].append(trio) for s, s_trios in children_trios.items(): if len(s_trios) > 1: logger.warning( "Discarded duplicated child %s found multiple in trios: %s", s, ", ".join([str(trio) for trio in s_trios]), ) return [trios[0] for trios in children_trios.values() if len(trios) == 1] # Get all possible pairs of parents in (father, mother) order all_possible_parents = get_possible_parents( list({s for pair in parent_child_pairs for s in pair}) ) trios = [] for possible_parents in all_possible_parents: children = get_children(possible_parents) if check_sibs(children): trios.extend( [ hl.Trio( s=s, fam_id=fam_id, pat_id=possible_parents[0], mat_id=possible_parents[1], is_female=sex.get(s), ) for s in children ] ) else: logger.warning( "Discarded family with same parents, and multiple offspring that" " weren't siblings:\nMother: %s\nFather:%s\nChildren:%s", possible_parents[0], possible_parents[1], ", ".join(children), ) return discard_multi_parents_children(trios) # Get all the relations we care about: # => Remove unrelateds and duplicates dups = duplicate_samples_ht.aggregate( hl.agg.explode( lambda dup: hl.agg.collect_as_set(dup), duplicate_samples_ht.filtered ), _localize=False, ) relationship_ht = relationship_ht.filter( ~dups.contains(relationship_ht[i_col]) & ~dups.contains(relationship_ht[j_col]) & (relationship_ht[relationship_col] != UNRELATED) ) # Check relatedness table format if not relationship_ht[i_col].dtype == relationship_ht[j_col].dtype: logger.error( "i_col and j_col of the relatedness table need to be of the same type." ) # If i_col and j_col aren't str, then convert them if not isinstance(relationship_ht[i_col], hl.expr.StringExpression): logger.warning( "Pedigrees can only be constructed from string IDs, but your relatedness_ht" " ID column is of type: %s. Expression will be converted to string in" " Pedigrees.", relationship_ht[i_col].dtype, ) if isinstance(relationship_ht[i_col], hl.expr.StructExpression): logger.warning( "Struct fields %s will be joined by underscores to use as sample names" " in Pedigree.", list(relationship_ht[i_col]), ) relationship_ht = relationship_ht.key_by( **{ i_col: hl.delimit( hl.array( [ hl.str(relationship_ht[i_col][x]) for x in relationship_ht[i_col] ] ), "_", ), j_col: hl.delimit( hl.array( [ hl.str(relationship_ht[j_col][x]) for x in relationship_ht[j_col] ] ), "_", ), } ) else: raise NotImplementedError( "The `i_col` and `j_col` columns of the `relationship_ht` argument" " passed to infer_families are not of type StringExpression or Struct." ) # If sex is a Table, extract sex information as a Dict if isinstance(sex, hl.Table): sex = dict(hl.tuple([sex.s, sex.is_female]).collect()) # Collect all related sample pairs and # create a dictionnary with pairs as keys and relationships as values # Sample-pairs are tuples ordered by sample name related_pairs = { tuple(sorted([i, j])): rel for i, j, rel in hl.tuple( [relationship_ht.i, relationship_ht.j, relationship_ht.relationship] ).collect() } parent_child_pairs_by_fam = group_parent_child_pairs_by_fam( [pair for pair, rel in related_pairs.items() if rel == PARENT_CHILD] ) return hl.Pedigree( [ trio for fam_index, parent_child_pairs in enumerate(parent_child_pairs_by_fam) for trio in get_trios(str(fam_index), parent_child_pairs, related_pairs) ] )
[docs]def create_fake_pedigree( n: int, sample_list: List[str], exclude_real_probands: bool = False, max_tries: int = 10, real_pedigree: Optional[hl.Pedigree] = None, sample_list_stratification: Optional[Dict[str, str]] = None, ) -> hl.Pedigree: """ Generate a pedigree made of trios created by sampling 3 random samples in the sample list. - If `real_pedigree` is given, then children in the resulting fake trios will not include any trio with proband - parents that are in the real ones. - Each sample can be used only once as a proband in the resulting trios. - Sex of probands in fake trios is random. :param n: Number of fake trios desired in the pedigree. :param sample_list: List of samples. :param exclude_real_probands: If set, then fake trios probands cannot be in the real trios probands. :param max_tries: Maximum number of sampling to try before bailing out (preventing infinite loop if `n` is too large w.r.t. the number of samples). :param real_pedigree: Optional pedigree to exclude children from. :param sample_list_stratification: Optional dictionary with samples as keys and a value that should be used to stratify samples in `sample_list` into groups that the trio should be picked from. This ensures that each fake trio will contain samples from only the same stratification. For example, if all samples within a fake trio should be chosen from the same platform, this can be a dictionary of sample: platform. :return: Fake pedigree. """ real_trios = ( {trio.s: trio for trio in real_pedigree.trios} if real_pedigree is not None else dict() ) if sample_list_stratification is not None: sample_list_stratified = defaultdict(list) for s in sample_list: s_strata = sample_list_stratification.get(s) if s_strata is None: raise ValueError( f"Sample {s} not found in 'sample_list_stratification' dict!" ) sample_list_stratified[s_strata].append(s) else: sample_list_stratified = None if exclude_real_probands and len(real_trios) == len(set(sample_list)): logger.warning( "All samples are in the real probands list; cannot create any fake" " pedigrees with exclude_real_probands=True. Returning an empty Pedigree." ) return hl.Pedigree([]) fake_trios = {} tries = 0 while len(fake_trios) < n and tries < max_tries: s = random.choice(sample_list) if sample_list_stratified is None: curr_sample_list = sample_list else: curr_sample_list = sample_list_stratified[sample_list_stratification[s]] mat_id, pat_id = random.sample(curr_sample_list, 2) if ( s in real_trios and ( exclude_real_probands or {mat_id, pat_id} == {real_trios[s].mat_id, real_trios[s].pat_id} ) ) or s in fake_trios: tries += 1 else: tries = 0 fake_trios[s] = hl.Trio( s=s, pat_id=pat_id, mat_id=mat_id, fam_id=f"fake_{str(len(fake_trios))}", is_female=bool(random.getrandbits(1)), ) if tries == max_tries: logger.warning( "Only returning %d fake trios; random trio sampling stopped after reaching" " the maximum %d iterations", len(fake_trios), max_tries, ) return hl.Pedigree(list(fake_trios.values()))
[docs]def filter_to_trios( mtds: Union[hl.Table, hl.MatrixTable, hl.vds.VariantDataset], fam_ht: hl.Table ) -> Union[hl.Table, hl.MatrixTable, hl.vds.VariantDataset]: """ Filter a Table, MatrixTable or VariantDataset to a set of trios in `fam_ht`. .. note:: Using `filter_cols` in MatrixTable will not affect the number of rows (variants), however, using `filter_samples` in VariantDataset will remove the variants that are not present in any of the trios. :param mtds: A Variant Dataset or a Matrix Table or a Table to filter to only trios. :param fam_ht: A Table of trios to filter to, loaded using `hl.import_fam`. :return: A Table, MatrixTable or VariantDataset with only the trios in `fam_ht`. """ # Filter to samples present in any of the trios. fam_ht = fam_ht.annotate(fam_members=[fam_ht.id, fam_ht.pat_id, fam_ht.mat_id]) fam_ht = fam_ht.explode("fam_members", name="s") fam_ht = fam_ht.key_by("s").select().distinct() if isinstance(mtds, hl.Table): mtds = mtds.semi_join(fam_ht) elif isinstance(mtds, hl.MatrixTable): mtds = mtds.filter_cols(hl.is_defined(fam_ht[mtds.col_key])) elif isinstance(mtds, hl.vds.VariantDataset): mtds = hl.vds.filter_samples(mtds, fam_ht) else: raise ValueError("mtds must be a Table, MatrixTable, or VariantDataset") return mtds
[docs]def generate_trio_stats_expr( trio_mt: hl.MatrixTable, transmitted_strata: Dict[str, hl.expr.BooleanExpression] = {"raw": True}, de_novo_strata: Dict[str, hl.expr.BooleanExpression] = {"raw": True}, ac_strata: Dict[str, hl.expr.BooleanExpression] = {"raw": True}, proband_is_female_expr: Optional[hl.expr.BooleanExpression] = None, ) -> hl.expr.StructExpression: """ Generate a row-wise expression containing trio transmission stats. The expression will generate the following counts: - Number of alleles in het parents transmitted to the proband - Number of alleles in het parents not transmitted to the proband - Number of de novo mutations - Parent allele count - Proband allele count Transmission and de novo mutation metrics and allele counts can be stratified using additional filters. `transmitted_strata`, `de_novo_strata`, and `ac_strata` all expect a dictionary of filtering expressions keyed by their desired suffix to append for labeling. The default will perform counts using all genotypes and append 'raw' to the label. .. note:: Expects that `mt` is dense if dealing with a sparse MT `hl.experimental.densify` must be run first. :param trio_mt: A trio standard trio MT (with the format as produced by hail.methods.trio_matrix) :param transmitted_strata: Strata for the transmission counts :param de_novo_strata: Strata for the de novo counts :param ac_strata: Strata for the parent and child allele counts :param proband_is_female_expr: An optional expression giving the sex the proband. If not given, DNMs are only computed for autosomes. :return: An expression with the counts """ # Create map for transmitted, untransmitted and DNM hom_ref = 0 het = 1 hom_var = 2 auto_or_par = 2 hemi_x = 1 hemi_y = 0 trans_config_counts = { # kid, dad, mom, copy -> t, u (hom_ref, het, het, auto_or_par): (0, 2), (hom_ref, hom_ref, het, auto_or_par): (0, 1), (hom_ref, het, hom_ref, auto_or_par): (0, 1), (het, het, het, auto_or_par): (1, 1), (het, hom_ref, het, auto_or_par): (1, 0), (het, het, hom_ref, auto_or_par): (1, 0), (het, hom_var, het, auto_or_par): (0, 1), (het, het, hom_var, auto_or_par): (0, 1), (hom_var, het, het, auto_or_par): (2, 0), (hom_var, het, hom_var, auto_or_par): (1, 0), (hom_var, hom_var, het, auto_or_par): (1, 0), (hom_ref, hom_ref, het, hemi_x): (0, 1), (hom_ref, hom_var, het, hemi_x): (0, 1), (hom_var, hom_ref, het, hemi_x): (1, 0), (hom_var, hom_var, het, hemi_x): (1, 0), } trans_count_map = hl.literal(trans_config_counts) def _get_copy_state(locus: hl.expr.LocusExpression) -> hl.expr.Int32Expression: """Get copy-state int from LocusExpression for indexing into trans_count_map.""" return ( hl.case() .when(locus.in_autosome_or_par(), auto_or_par) .when(locus.in_x_nonpar(), hemi_x) .when(locus.in_y_nonpar(), hemi_y) .or_missing() ) def _is_dnm( proband_gt: hl.expr.CallExpression, father_gt: hl.expr.CallExpression, mother_gt: hl.expr.CallExpression, locus: hl.expr.LocusExpression, proband_is_female: Optional[hl.expr.BooleanExpression], ) -> hl.expr.BooleanExpression: """Determine whether a trio genotype combination is a DNM.""" if proband_is_female is None: logger.warning( "Since no proband sex expression was given to generate_trio_stats_expr," " only DNMs in autosomes will be counted." ) return hl.or_missing( locus.in_autosome(), proband_gt.is_het() & father_gt.is_hom_ref() & mother_gt.is_hom_ref(), ) return hl.if_else( locus.in_autosome_or_par() | (proband_is_female & locus.in_x_nonpar()), proband_gt.is_het() & father_gt.is_hom_ref() & mother_gt.is_hom_ref(), hl.or_missing( ~proband_is_female, proband_gt.is_hom_var() & father_gt.is_hom_ref() ), ) def _ac_an_parent_child_count( proband_gt: hl.expr.CallExpression, father_gt: hl.expr.CallExpression, mother_gt: hl.expr.CallExpression, ) -> Dict[str, hl.expr.Int64Expression]: """Get AC and AN for parents and children.""" ac_parent_expr = hl.agg.sum( father_gt.n_alt_alleles() + mother_gt.n_alt_alleles() ) an_parent_expr = hl.agg.sum( (hl.is_defined(father_gt) + hl.is_defined(mother_gt)) * 2 ) ac_child_expr = hl.agg.sum(proband_gt.n_alt_alleles()) an_child_expr = hl.agg.sum(hl.is_defined(proband_gt) * 2) return { "ac_parents": ac_parent_expr, "an_parents": an_parent_expr, "ac_children": ac_child_expr, "an_children": an_child_expr, } # Create transmission counters trio_stats = hl.struct( **{ f"{name2}_{name}": hl.agg.filter( ( trio_mt.proband_entry.GT.is_non_ref() | trio_mt.father_entry.GT.is_non_ref() | trio_mt.mother_entry.GT.is_non_ref() ) & expr, hl.agg.sum( trans_count_map.get( ( trio_mt.proband_entry.GT.n_alt_alleles(), trio_mt.father_entry.GT.n_alt_alleles(), trio_mt.mother_entry.GT.n_alt_alleles(), _get_copy_state(trio_mt.locus), ), default=(0, 0), )[i] ), ) for name, expr in transmitted_strata.items() for i, name2 in enumerate(["n_transmitted", "n_untransmitted"]) } ) # Create de novo counters trio_stats = trio_stats.annotate( **{ f"n_de_novos_{name}": hl.agg.filter( _is_dnm( trio_mt.proband_entry.GT, trio_mt.father_entry.GT, trio_mt.mother_entry.GT, trio_mt.locus, proband_is_female_expr, ) & expr, hl.agg.count(), ) for name, expr in de_novo_strata.items() } ) trio_stats = trio_stats.annotate( **{ f"{name2}_{name}": hl.agg.filter( expr, _ac_an_parent_child_count( trio_mt.proband_entry.GT, trio_mt.father_entry.GT, trio_mt.mother_entry.GT, )[name2], ) for name, expr in ac_strata.items() for name2 in ["ac_parents", "an_parents", "ac_children", "an_children"] } ) return trio_stats
[docs]def generate_sib_stats_expr( mt: hl.MatrixTable, sib_ht: hl.Table, i_col: str = "i", j_col: str = "j", strata: Dict[str, hl.expr.BooleanExpression] = {"raw": True}, is_female: Optional[hl.expr.BooleanExpression] = None, ) -> hl.expr.StructExpression: """ Generate a row-wise expression containing the number of alternate alleles in common between sibling pairs. The sibling sharing counts can be stratified using additional filters using `stata`. .. note:: This function expects that the `mt` has either been split or filtered to only bi-allelics If a sample has multiple sibling pairs, only one pair will be counted :param mt: Input matrix table :param sib_ht: Table defining sibling pairs with one sample in a col (`i_col`) and the second in another col (`j_col`) :param i_col: Column containing the 1st sample of the pair in the relationship table :param j_col: Column containing the 2nd sample of the pair in the relationship table :param strata: Dict with additional strata to use when computing shared sibling variant counts :param is_female: An optional column in mt giving the sample sex. If not given, counts are only computed for autosomes. :return: A Table with the sibling shared variant counts """ def _get_alt_count(locus, gt, is_female): """Calculate alt allele count with sex info if present.""" if is_female is None: return hl.or_missing(locus.in_autosome(), gt.n_alt_alleles()) return ( hl.case() .when(locus.in_autosome_or_par(), gt.n_alt_alleles()) .when( ~is_female & (locus.in_x_nonpar() | locus.in_y_nonpar()), hl.min(1, gt.n_alt_alleles()), ) .when(is_female & locus.in_y_nonpar(), 0) .default(0) ) if is_female is None: logger.warning( "Since no sex expression was given to generate_sib_stats_expr, only" " variants in autosomes will be counted." ) # If a sample is in sib_ht more than one time, keep only one of the sibling pairs # First filter to only samples found in mt to keep as many pairs as possible s_to_keep = mt.aggregate_cols(hl.agg.collect_as_set(mt.s), _localize=False) sib_ht = sib_ht.filter( s_to_keep.contains(sib_ht[i_col].s) & s_to_keep.contains(sib_ht[j_col].s) ) sib_ht = sib_ht.add_index("sib_idx") sib_ht = sib_ht.annotate(sibs=[sib_ht[i_col].s, sib_ht[j_col].s]) sib_ht = sib_ht.explode("sibs") sib_ht = sib_ht.group_by("sibs").aggregate( sib_idx=(hl.agg.take(sib_ht.sib_idx, 1, ordering=sib_ht.sib_idx)[0]) ) sib_ht = sib_ht.group_by(sib_ht.sib_idx).aggregate(sibs=hl.agg.collect(sib_ht.sibs)) sib_ht = sib_ht.filter(hl.len(sib_ht.sibs) == 2).persist() logger.info( "Generating sibling variant sharing counts using %d pairs.", sib_ht.count() ) sib_ht = sib_ht.explode("sibs").key_by("sibs")[mt.s] # Create sibling sharing counters sib_stats = hl.struct( **{ f"n_sib_shared_variants_{name}": hl.sum( hl.agg.filter( expr, hl.agg.group_by( sib_ht.sib_idx, hl.or_missing( hl.agg.sum(hl.is_defined(mt.GT)) == 2, hl.agg.min(_get_alt_count(mt.locus, mt.GT, is_female)), ), ), ).values() ) for name, expr in strata.items() } ) sib_stats = sib_stats.annotate( **{ f"ac_sibs_{name}": hl.agg.filter( expr & hl.is_defined(sib_ht.sib_idx), hl.agg.sum(mt.GT.n_alt_alleles()) ) for name, expr in strata.items() } ) return sib_stats