skydive/
stage.rs

1// Import the Result type from the anyhow crate for error handling.
2use anyhow::Result;
3use linked_hash_set::LinkedHashSet;
4use parquet::data_type::AsBytes;
5use rust_htslib::bam::record::Aux;
6
7// Import various standard library collections.
8use std::collections::{HashMap, HashSet};
9use std::env;
10use std::fs::File;
11use std::io::BufWriter;
12use std::path::PathBuf;
13
14// Import the Url type to work with URLs.
15use url::Url;
16
17// Import ExponentialBackoff for retrying operations.
18use backoff::ExponentialBackoff;
19
20// Import rayon's parallel iterator traits.
21use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
22
23// Import types from rust_htslib for working with BAM files.
24use bio::io::fastq;
25use rust_htslib::bam::{self, FetchDefinition, IndexedReader, Read};
26use rust_htslib::faidx::Reader;
27
28// Import functions for authorizing access to Google Cloud Storage.
29use crate::env::{gcs_authorize_data_access, local_guess_curl_ca_bundle};
30
31/// Function to open a BAM/CRAM file from a URL and cache its contents locally.
32///
33/// # Arguments
34///
35/// * `seqs_url` - A reference to a URL object representing the sequence file URL.
36///
37/// # Returns
38///
39/// An `IndexedReader` object representing the opened BAM/CRAM file.
40///
41/// # Errors
42///
43/// This function returns an error if the BAM/CRAM file cannot be opened.
44///
45/// # Panics
46///
47/// This function panics if the URL scheme is not recognized.
48pub fn open_bam(seqs_url: &Url) -> Result<IndexedReader> {
49    if seqs_url.to_string().starts_with("gs://") && env::var("GCS_OAUTH_TOKEN").is_err() {
50        gcs_authorize_data_access();
51    }
52
53    // Try to open the BAM file from the URL, with retries for authorization.
54    let bam = match IndexedReader::from_url(seqs_url) {
55        Ok(bam) => bam,
56        Err(_) => {
57            crate::elog!("Read '{}', attempt 2 (reauthorizing to GCS)", seqs_url);
58
59            // If opening fails, try authorizing access to Google Cloud Storage.
60            gcs_authorize_data_access();
61
62            // Try opening the BAM file again.
63            match IndexedReader::from_url(seqs_url) {
64                Ok(bam) => bam,
65                Err(_) => {
66                    crate::elog!("Read '{}', attempt 3 (overriding cURL CA bundle)", seqs_url);
67
68                    // If it still fails, guess the cURL CA bundle path.
69                    local_guess_curl_ca_bundle();
70
71                    // Try one last time to open the BAM file.
72                    IndexedReader::from_url(seqs_url)?
73                }
74            }
75        }
76    };
77
78    Ok(bam)
79}
80
81/// Function to open a FASTA file from a URL and cache its contents locally.
82///
83/// # Arguments
84///
85/// * `seqs_url` - A reference to a URL object representing the sequence file URL.
86///
87/// # Returns
88///
89/// A `Reader` object representing the opened FASTA file.
90///
91/// # Errors
92///
93/// This function returns an error if the FASTA file cannot be opened.
94///
95/// # Panics
96///
97/// This function panics if the URL scheme is not recognized.
98pub fn open_fasta(seqs_url: &Url) -> Result<Reader> {
99    if seqs_url.to_string().starts_with("gs://") && env::var("GCS_OAUTH_TOKEN").is_err() {
100        gcs_authorize_data_access();
101    }
102
103    // Try to open the FASTA file from the URL, with retries for authorization.
104    let fasta = match Reader::from_url(seqs_url) {
105        Ok(fasta) => fasta,
106        Err(_) => {
107            crate::elog!("Read '{}', attempt 2 (reauthorizing to GCS)", seqs_url);
108
109            // If opening fails, try authorizing access to Google Cloud Storage.
110            gcs_authorize_data_access();
111
112            // Try opening the BAM file again.
113            match Reader::from_url(seqs_url) {
114                Ok(bam) => bam,
115                Err(_) => {
116                    crate::elog!("Read '{}', attempt 3 (overriding cURL CA bundle)", seqs_url);
117
118                    // If it still fails, guess the cURL CA bundle path.
119                    local_guess_curl_ca_bundle();
120
121                    // Try one last time to open the FASTA file.
122                    Reader::from_url(seqs_url)?
123                }
124            }
125        }
126    };
127
128    Ok(fasta)
129}
130
131// Function to get a mapping between read group and sample name from a BAM header.
132fn get_rg_to_sm_mapping(bam: &IndexedReader) -> HashMap<String, String> {
133    let header = bam::Header::from_template(bam.header());
134
135    let rg_sm_map: HashMap<String, String> = header
136        .to_hashmap()
137        .into_iter()
138        .flat_map(|(_, records)| records)
139        .filter(|record| record.contains_key("ID") && record.contains_key("SM"))
140        .map(|record| (record["ID"].clone(), record["SM"].clone()))
141        .collect();
142
143    rg_sm_map
144}
145
146fn get_sm_name_from_rg(read: &bam::Record, rg_sm_map: &HashMap<String, String>) -> Result<String> {
147    let rg = read.aux(b"RG")?;
148
149    if let Aux::String(v) = rg {
150        if let Some(sm) = rg_sm_map.get(v) {
151            Ok(sm.to_owned())
152        } else {
153            Err(anyhow::anyhow!(
154                "Sample name not found for read group: {}",
155                v
156            ))
157        }
158    } else {
159        Err(anyhow::anyhow!("Read group is not a string"))
160    }
161}
162
163// Function to extract seqs from a BAM file within a specified genomic region.
164pub fn extract_aligned_bam_reads(
165    _basename: &str,
166    bam: &mut IndexedReader,
167    chr: &str,
168    start: &u64,
169    stop: &u64,
170    name: &str,
171    haplotype: Option<u8>,
172) -> Result<Vec<fastq::Record>> {
173    let rg_sm_map = get_rg_to_sm_mapping(bam);
174
175    let mut bmap = HashMap::new();
176
177    let _ = bam.fetch(((*chr).as_bytes(), *start, *stop));
178    for p in bam.pileup() {
179        let pileup = p.unwrap();
180
181        if *start <= (pileup.pos() as u64) && (pileup.pos() as u64) < *stop {
182            for (i, alignment) in pileup.alignments().enumerate().filter(|(_, a)| {
183                haplotype.is_none()
184                    || a.record()
185                        .aux(b"HP")
186                        .ok()
187                        .map(|aux| match aux {
188                            Aux::U8(v) => v == haplotype.unwrap(),
189                            _ => false,
190                        })
191                        .unwrap_or(false)
192            }) {
193                let qname = String::from_utf8_lossy(alignment.record().qname()).into_owned();
194                let sm = match get_sm_name_from_rg(&alignment.record(), &rg_sm_map) {
195                    Ok(a) => a,
196                    Err(_) => String::from("unknown"),
197                };
198
199                let is_secondary = alignment.record().is_secondary();
200                let is_supplementary = alignment.record().is_supplementary();
201                let seq_name = format!("{qname}|{name}|{sm}|{i}|{is_secondary}|{is_supplementary}");
202
203                if !bmap.contains_key(&seq_name) {
204                    bmap.insert(seq_name.clone(), (String::new(), Vec::new()));
205                }
206
207                if !alignment.is_del() && !alignment.is_refskip() {
208                    let a = alignment.record().seq()[alignment.qpos().unwrap()];
209                    let q = alignment.record().qual()[alignment.qpos().unwrap()];
210
211                    bmap.get_mut(&seq_name).unwrap().0.push(a as char);
212                    bmap.get_mut(&seq_name).unwrap().1.push(q + 33 as u8);
213                }
214
215                if let bam::pileup::Indel::Ins(len) = alignment.indel() {
216                    if let Some(pos1) = alignment.qpos() {
217                        let pos2 = pos1 + (len as usize);
218                        for pos in pos1..pos2 {
219                            let a = alignment.record().seq()[pos];
220                            let q = alignment.record().qual()[pos];
221
222                            bmap.get_mut(&seq_name).unwrap().0.push(a as char);
223                            bmap.get_mut(&seq_name).unwrap().1.push(q + 33 as u8);
224                        }
225                    }
226                }
227            }
228        }
229    }
230
231    let records = bmap
232        .iter()
233        .map(|kv| fastq::Record::with_attrs(kv.0.as_str(), None, kv.1.0.as_bytes(), kv.1.1.as_bytes()))
234        .collect();
235
236    Ok(records)
237}
238
239// Function to extract unaligned seqs from a BAM file
240fn extract_unaligned_bam_reads(
241    _basename: &str,
242    bam: &mut IndexedReader,
243) -> Result<Vec<fastq::Record>> {
244    let rg_sm_map = get_rg_to_sm_mapping(bam);
245
246    let _ = bam.fetch(FetchDefinition::Unmapped);
247    let records = bam
248        .records()
249        .map(|r| {
250            let read = r.unwrap();
251            let qname = String::from_utf8_lossy(read.qname()).into_owned();
252            let sm = get_sm_name_from_rg(&read, &rg_sm_map).unwrap();
253
254            let seq_name = format!("{qname}|{sm}");
255
256            let vseq = read.seq().as_bytes();
257            let bseq = vseq.as_bytes();
258            let qseq = read.qual().iter().map(|&q| q + 33).collect::<Vec<u8>>();
259
260            let seq = fastq::Record::with_attrs(seq_name.as_str(), Some(""), bseq, &qseq);
261
262            seq
263        })
264        .collect();
265
266    Ok(records)
267}
268
269// Function to extract seqs from a FASTA file within a specified genomic region.
270fn extract_fasta_seqs(
271    basename: &String,
272    fasta: &mut Reader,
273    chr: &String,
274    start: &u64,
275    stop: &u64,
276    name: &String,
277) -> Result<Vec<fastq::Record>> {
278    let id = format!("{chr}:{start}-{stop}|{name}|{basename}");
279    let seq = fasta.fetch_seq_string(chr, usize::try_from(*start)?, usize::try_from(*stop - 1)?)?;
280
281    if seq.len() > 0 {
282        let records = vec![fastq::Record::with_attrs(id.as_str(), None, seq.as_bytes(), vec![30; seq.len()].as_slice())];
283
284        return Ok(records);
285    }
286
287    Err(anyhow::anyhow!("No sequence found for locus: {}", id))
288}
289
290// Function to stage data from a single file.
291fn stage_data_from_one_file(
292    seqs_url: &Url,
293    loci: &LinkedHashSet<(String, u64, u64, String)>,
294    unmapped: bool,
295    haplotype: Option<u8>,
296) -> Result<Vec<fastq::Record>> {
297    let mut all_seqs = Vec::new();
298
299    let basename = seqs_url
300        .path_segments()
301        .map(|c| c.collect::<Vec<_>>())
302        .unwrap()
303        .last()
304        .unwrap()
305        .to_string();
306
307    let seqs_str = seqs_url.as_str();
308    if seqs_str.ends_with(".bam") || seqs_str.ends_with(".cram") {
309        // Handle BAM/CRAM file processing
310        let basename = basename
311            .trim_end_matches(".bam")
312            .trim_end_matches(".cram")
313            .to_string();
314        let mut bam = open_bam(seqs_url)?;
315
316        // Extract seqs for the current locus.
317        for (chr, start, stop, name) in loci.iter() {
318            let aligned_seqs =
319                extract_aligned_bam_reads(&basename, &mut bam, chr, start, stop, name, haplotype)
320                    .unwrap();
321            all_seqs.extend(aligned_seqs);
322        }
323
324        // Optionally extract unaligned reads.
325        if unmapped {
326            let unaligned_seqs = extract_unaligned_bam_reads(&basename, &mut bam).unwrap();
327            all_seqs.extend(unaligned_seqs);
328        }
329    } else if seqs_str.ends_with(".fa")
330        || seqs_str.ends_with(".fasta")
331        || seqs_str.ends_with(".fa.gz")
332        || seqs_str.ends_with(".fasta.gz")
333    {
334        // Handle FASTA file processing
335        let basename = basename
336            .trim_end_matches(".fasta.gz")
337            .trim_end_matches(".fasta")
338            .trim_end_matches(".fa.gz")
339            .trim_end_matches(".fa")
340            .to_string();
341        let mut fasta = open_fasta(seqs_url)?;
342
343        for (chr, start, stop, name) in loci.iter() {
344            // Extract seqs for the current locus.
345            let seqs = extract_fasta_seqs(&basename, &mut fasta, chr, start, stop, name)
346                .map_or_else(|_| Vec::new(), |s| s);
347
348            // Extend the all_seqs vector with the seqs from the current locus.
349            all_seqs.extend(seqs);
350        }
351    } else {
352        // Handle unknown file extension
353        return Err(anyhow::anyhow!("Unsupported file type: {}", seqs_url));
354    }
355
356    Ok(all_seqs)
357}
358
359// Function to stage data from multiple BAM files.
360fn stage_data_from_all_files(
361    seq_urls: &HashSet<Url>,
362    loci: &LinkedHashSet<(String, u64, u64, String)>,
363    unmapped: bool,
364    haplotype: Option<u8>,
365) -> Result<Vec<fastq::Record>> {
366    // Use a parallel iterator to process multiple BAM files concurrently.
367    let all_data: Vec<_> = seq_urls
368        .par_iter()
369        .map(|seqs_url| {
370            // Define an operation to stage data from one file.
371            let op = || {
372                let seqs = stage_data_from_one_file(seqs_url, loci, unmapped, haplotype)?;
373                Ok(seqs)
374            };
375
376            // Retry the operation with exponential backoff in case of failure.
377            match backoff::retry(ExponentialBackoff::default(), op) {
378                Ok(seqs) => seqs,
379                Err(e) => {
380                    // If all retries fail, panic with an error message.
381                    panic!("Error: {e}");
382                }
383            }
384        })
385        .collect();
386
387    let flattened_data = all_data.into_iter().flatten().collect::<Vec<_>>();
388
389    // Return a flattened vector of sequences
390    Ok(flattened_data)
391}
392
393/// Checks if a given genomic range spans any of the loci in the provided set.
394///
395/// # Arguments
396///
397/// * `start` - The start position of the genomic range.
398/// * `end` - The end position of the genomic range.
399/// * `loci` - A reference to a `HashSet` of tuples representing the loci.
400///
401/// # Returns
402///
403/// A `Result` containing `true` if the range spans any loci, `false` otherwise. Returns an error if the positions are negative.
404///
405/// # Errors
406///
407/// Returns an error if the start or end positions are negative.
408///
409/// # Panics
410///
411/// This function does not panic.
412pub fn read_spans_locus(
413    start: i64,
414    end: i64,
415    loci: &HashSet<(String, i64, i64)>,
416) -> Result<bool, String> {
417    if start < 0 || end < 0 {
418        return Err("Error: Negative genomic positions are not allowed.".to_string());
419    }
420
421    Ok(loci.iter().any(|e| start <= e.1 && end >= e.2))
422}
423
424/// Public function to stage data from multiple BAM files and write to an output file.
425///
426/// # Arguments
427///
428/// * `output_path` - A reference to a `PathBuf` representing the output file path.
429/// * `loci` - A reference to a `HashSet` of tuples representing the loci to extract.
430/// * `seq_urls` - A reference to a `HashSet` of URLs representing the sequence files.
431/// * `unmapped` - A boolean indicating whether to extract unmapped reads.
432/// * `cache_path` - A reference to a `PathBuf` representing the cache directory path.
433///
434/// # Returns
435///
436/// The number of records written to the output file.
437///
438/// # Errors
439///
440/// This function returns an error if the output file cannot be created.
441///
442/// # Panics
443///
444/// If an error occurs while staging data from the files.
445///
446pub fn stage_data(
447    output_path: &PathBuf,
448    loci: &LinkedHashSet<(String, u64, u64, String)>,
449    seq_urls: &HashSet<Url>,
450    unmapped: bool,
451    haplotype: Option<u8>,
452    cache_path: &PathBuf,
453) -> Result<usize> {
454    let current_dir = env::current_dir()?;
455    env::set_current_dir(cache_path).unwrap();
456
457    // Stage data from all BAM files.
458    let all_data = match stage_data_from_all_files(seq_urls, loci, unmapped, haplotype) {
459        Ok(all_data) => all_data,
460        Err(e) => {
461            panic!("Error: {e}");
462        }
463    };
464
465    env::set_current_dir(current_dir).unwrap();
466
467    // Write to a FASTQ file.
468    let mut buf_writer = BufWriter::new(File::create(output_path)?);
469    let mut fastq_writer = fastq::Writer::new(&mut buf_writer);
470
471    for record in all_data.iter() {
472        if record.seq().len() > 0 {
473            fastq_writer.write_record(record)?;
474        }
475    }
476
477    let _ = fastq_writer.flush();
478
479    Ok(all_data.len())
480}
481
482pub fn stage_data_in_memory(
483    loci: &LinkedHashSet<(String, u64, u64, String)>,
484    seq_urls: &HashSet<Url>,
485    unmapped: bool,
486    cache_path: &PathBuf,
487) -> Result<Vec<fastq::Record>> {
488    let current_dir = env::current_dir()?;
489    env::set_current_dir(cache_path).unwrap();
490
491    // Stage data from all BAM files.
492    let all_data = match stage_data_from_all_files(seq_urls, loci, unmapped, None) {
493        Ok(all_data) => all_data,
494        Err(e) => {
495            panic!("Error: {e}");
496        }
497    };
498
499    env::set_current_dir(current_dir).unwrap();
500
501    Ok(all_data)
502}
503
504#[cfg(test)]
505mod tests {
506    use super::*;
507    use std::collections::HashSet;
508    use url::Url;
509
510    // This test may pass, but still print a message to stderr regarding its failure to access data. This is because
511    // open_bam() tries a couple of authorization methods before accessing data, and the initial failures print a
512    // message to stderr.
513    #[test]
514    fn test_open_bam() {
515        let seqs_url = Url::parse(
516            "gs://fc-8c3900db-633f-477f-96b3-fb31ae265c44/results/PBFlowcell/m84060_230907_210011_s2/reads/ccs/aligned/m84060_230907_210011_s2.bam"
517        ).unwrap();
518        let bam = open_bam(&seqs_url);
519
520        assert!(bam.is_ok(), "Failed to open bam file");
521    }
522
523    #[test]
524    fn test_stage_data_from_one_file() {
525        let seqs_url = Url::parse(
526            "gs://fc-8c3900db-633f-477f-96b3-fb31ae265c44/results/PBFlowcell/m84060_230907_210011_s2/reads/ccs/aligned/m84060_230907_210011_s2.bam"
527        ).unwrap();
528        let mut loci = LinkedHashSet::new();
529        loci.insert(("chr15".to_string(), 23960193, 23963918, "test".to_string()));
530
531        let result = stage_data_from_one_file(&seqs_url, &loci, false, None);
532
533        assert!(result.is_ok(), "Failed to stage data from one file");
534    }
535
536    #[test]
537    fn test_stage_data() {
538        let cache_path = std::env::temp_dir();
539        let output_path = cache_path.join("test.bam");
540
541        let mut loci = LinkedHashSet::new();
542        loci.insert(("chr15".to_string(), 23960193, 23963918, "test".to_string()));
543
544        let seqs_url = Url::parse(
545            "gs://fc-8c3900db-633f-477f-96b3-fb31ae265c44/results/PBFlowcell/m84060_230907_210011_s2/reads/ccs/aligned/m84060_230907_210011_s2.bam"
546        ).unwrap();
547        let seq_urls = HashSet::from([seqs_url]);
548
549        let result = stage_data(&output_path, &loci, &seq_urls, false, None, &cache_path);
550
551        assert!(result.is_ok(), "Failed to stage data from file");
552    }
553
554    #[test]
555    fn test_stage_multiple_data() {
556        let cache_path = std::env::temp_dir();
557        let output_path = cache_path.join("test.bam");
558
559        let seqs_url_1 = Url::parse(
560            "gs://fc-8c3900db-633f-477f-96b3-fb31ae265c44/results/PBFlowcell/m84060_230907_210011_s2/reads/ccs/aligned/m84060_230907_210011_s2.bam"
561        ).unwrap();
562        let seqs_url_2 = Url::parse(
563            "gs://fc-8c3900db-633f-477f-96b3-fb31ae265c44/results/PBFlowcell/m84043_230901_211947_s1/reads/ccs/aligned/m84043_230901_211947_s1.hifi_reads.bc2080.bam"
564        ).unwrap();
565        let mut loci = LinkedHashSet::new();
566        loci.insert(("chr15".to_string(), 23960193, 23963918, "test".to_string()));
567
568        let seq_urls = HashSet::from([seqs_url_1, seqs_url_2]);
569
570        let result = stage_data(&output_path, &loci, &seq_urls, false, None, &cache_path);
571
572        assert!(result.is_ok(), "Failed to stage data from all files");
573    }
574}