Source code for gnomad.sample_qc.platform

# noqa: D100

import logging
from typing import List, Optional, Tuple

import hail as hl
import numpy as np

from gnomad.utils.annotations import bi_allelic_expr
from gnomad.utils.filtering import filter_to_autosomes

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def compute_callrate_mt(
    mt: hl.MatrixTable,
    intervals_ht: hl.Table,
    bi_allelic_only: bool = True,
    autosomes_only: bool = True,
    match: bool = True,
) -> hl.MatrixTable:
    """
    Compute a sample/interval MT with each entry containing the call rate for that sample/interval.

    This can be used as input for imputing exome sequencing platforms.

    .. note::

        The input interval HT should have a key of type Interval.
        The resulting table will have a key of the same type as the `intervals_ht` table and
        contain an `interval_info` field containing all non-key fields of the `intervals_ht`.

    :param mt: Input MT
    :param intervals_ht: Table containing the intervals. This table has to be keyed by an interval of type Interval.
    :param bi_allelic_only: If set, only bi-allelic sites are used for the computation
    :param autosomes_only: If set, only autosomal intervals are used.
    :param match: If set, returns all intervals in `intervals_ht` that overlap the locus in the input MT.
    :return: Callrate MT
    """
    logger.info("Computing call rate MatrixTable")

    if len(intervals_ht.key) != 1 or not isinstance(
        intervals_ht.key[0], hl.expr.IntervalExpression
    ):
        logger.warning(
            "Call rate matrix computation expects `intervals_ht` with a key of type"
            " Interval. Found: %s",
            intervals_ht.key,
        )

    # Keep the MT unchanged when not restricting to autosomes, so `callrate_mt`
    # is always defined.
    callrate_mt = filter_to_autosomes(mt) if autosomes_only else mt

    if bi_allelic_only:
        callrate_mt = callrate_mt.filter_rows(bi_allelic_expr(callrate_mt))

    intervals_ht = intervals_ht.annotate(_interval_key=intervals_ht.key)
    callrate_mt = callrate_mt.annotate_rows(
        _interval_key=intervals_ht.index(
            callrate_mt.locus, all_matches=match
        )._interval_key
    )

    if match:
        callrate_mt = callrate_mt.explode_rows("_interval_key")

    callrate_mt = callrate_mt.filter_rows(
        hl.is_defined(callrate_mt._interval_key.interval)
    )
    # Keep only genotype presence/absence; the actual calls are not needed.
    callrate_mt = callrate_mt.select_entries(
        GT=hl.or_missing(hl.is_defined(callrate_mt.GT), hl.struct())
    )
    callrate_mt = callrate_mt.group_rows_by(**callrate_mt._interval_key).aggregate(
        callrate=hl.agg.fraction(hl.is_defined(callrate_mt.GT))
    )
    intervals_ht = intervals_ht.drop("_interval_key")
    callrate_mt = callrate_mt.annotate_rows(
        interval_info=hl.struct(**intervals_ht[callrate_mt.row_key])
    )
    return callrate_mt

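
# A minimal usage sketch for `compute_callrate_mt` (illustration only; the
# paths below are hypothetical). `hl.import_locus_intervals` is one way to
# obtain a Table keyed by an `interval` field of type Interval, as the warning
# above expects:
#
#     mt = hl.read_matrix_table("gs://my-bucket/exomes.mt")  # hypothetical path
#     intervals_ht = hl.import_locus_intervals(
#         "gs://my-bucket/capture.interval_list",  # hypothetical path
#         reference_genome="GRCh38",
#     )
#     callrate_mt = compute_callrate_mt(mt, intervals_ht)
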
def run_platform_pca(
    callrate_mt: hl.MatrixTable,
    binarization_threshold: Optional[float] = 0.25,
    n_pcs: int = 10,
) -> Tuple[List[float], hl.Table, hl.Table]:
    """
    Run PCA on a sample/interval MT with each entry containing the call rate.

    When `binarization_threshold` is set, the call rate is transformed to a 0/1 value based on
    the threshold. E.g. with the default threshold of 0.25, all entries with a call rate < 0.25
    are considered as 0s, others as 1s.

    :param callrate_mt: Input callrate MT
    :param binarization_threshold: Threshold for call rate binarization. `None` if no threshold is desired
    :param n_pcs: Number of PCs to compute
    :return: eigenvalues, scores_ht, loadings_ht
    """
    logger.info("Running platform PCA")

    if binarization_threshold is not None:
        callrate_mt = callrate_mt.annotate_entries(
            callrate=hl.int(callrate_mt.callrate > binarization_threshold)
        )

    # Center until Hail's PCA does it for you
    callrate_mt = callrate_mt.annotate_rows(
        mean_callrate=hl.agg.mean(callrate_mt.callrate)
    )
    callrate_mt = callrate_mt.annotate_entries(
        callrate=callrate_mt.callrate - callrate_mt.mean_callrate
    )
    eigenvalues, scores, loadings = hl.pca(
        callrate_mt.callrate,
        compute_loadings=True,
        k=n_pcs,
    )  # TODO: Evaluate whether computing loadings is a good / worthy thing
    logger.info("Platform PCA eigenvalues: %s", eigenvalues)

    return eigenvalues, scores, loadings

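
# A minimal sketch chaining this step onto the call rate MT from
# `compute_callrate_mt` above (hypothetical names; the defaults of a 0.25
# binarization threshold and 10 PCs apply):
#
#     eigenvalues, scores_ht, loadings_ht = run_platform_pca(callrate_mt)
#     scores_ht.describe()  # one row per sample, with an array field `scores`
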
def assign_platform_from_pcs(
    platform_pca_scores_ht: hl.Table,
    pc_scores_ann: str = "scores",
    hdbscan_min_cluster_size: Optional[int] = None,
    hdbscan_min_samples: Optional[int] = None,
) -> hl.Table:
    """
    Assign platforms using HDBSCAN on the results of call rate PCA.

    :param platform_pca_scores_ht: Input table with the PCA score for each sample
    :param pc_scores_ann: Field containing the scores
    :param hdbscan_min_cluster_size: HDBSCAN `min_cluster_size` parameter. If not specified the smallest of 500 and 0.1*n_samples will be used.
    :param hdbscan_min_samples: HDBSCAN `min_samples` parameter
    :return: A Table with a `qc_platform` annotation containing the platform based on HDBSCAN clustering
    """
    import hdbscan

    logger.info("Assigning platforms based on platform PCA clustering")

    # Read and format data for clustering
    data = platform_pca_scores_ht.to_pandas()
    callrate_data = np.matrix(data[pc_scores_ann].tolist())
    logger.info("Assigning platforms to %d samples.", len(callrate_data))

    # Cluster data
    if hdbscan_min_cluster_size is None:
        # `min_cluster_size` must be an integer, so truncate 0.1 * n_samples.
        hdbscan_min_cluster_size = min(500, int(0.1 * data.shape[0]))
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=hdbscan_min_cluster_size, min_samples=hdbscan_min_samples
    )
    cluster_labels = clusterer.fit_predict(callrate_data)
    n_clusters = len(set(cluster_labels)) - (
        -1 in cluster_labels
    )  # NOTE: -1 is the label for noisy (un-classifiable) data points
    logger.info("Found %d unique platforms during platform imputation.", n_clusters)

    data["qc_platform"] = cluster_labels
    ht = hl.Table.from_pandas(data, key=[*platform_pca_scores_ht.key])
    ht = ht.annotate(qc_platform="platform_" + hl.str(ht.qc_platform))
    return ht
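
# A minimal end-to-end sketch, continuing the hypothetical names above and
# leaving the HDBSCAN parameters at their defaults:
#
#     platform_ht = assign_platform_from_pcs(scores_ht)
#     platform_ht.aggregate(hl.agg.counter(platform_ht.qc_platform))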