use anyhow::Result;
use linked_hash_set::LinkedHashSet;
use parquet::data_type::AsBytes;
use rust_htslib::bam::record::Aux;

use std::collections::{HashMap, HashSet};
use std::env;
use std::fs::File;
use std::io::BufWriter;
use std::path::PathBuf;

use url::Url;

use backoff::ExponentialBackoff;

use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

use bio::io::fastq;
use rust_htslib::bam::{self, FetchDefinition, IndexedReader, Read};
use rust_htslib::faidx::Reader;

use crate::env::{gcs_authorize_data_access, local_guess_curl_ca_bundle};

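/// Open an indexed BAM/CRAM reader from a URL. For `gs://` URLs without a
/// `GCS_OAUTH_TOKEN`, GCS access is authorized first; on failure the open is
/// retried after reauthorizing, and finally after overriding the cURL CA bundle.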
pub fn open_bam(seqs_url: &Url) -> Result<IndexedReader> {
    if seqs_url.to_string().starts_with("gs://") && env::var("GCS_OAUTH_TOKEN").is_err() {
        gcs_authorize_data_access();
    }

    let bam = match IndexedReader::from_url(seqs_url) {
        Ok(bam) => bam,
        Err(_) => {
            crate::elog!("Read '{}', attempt 2 (reauthorizing to GCS)", seqs_url);

            gcs_authorize_data_access();

            match IndexedReader::from_url(seqs_url) {
                Ok(bam) => bam,
                Err(_) => {
                    crate::elog!("Read '{}', attempt 3 (overriding cURL CA bundle)", seqs_url);

                    local_guess_curl_ca_bundle();

                    IndexedReader::from_url(seqs_url)?
                }
            }
        }
    };

    Ok(bam)
}

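/// Open an indexed FASTA reader from a URL, using the same GCS authorization
/// and retry strategy as `open_bam`.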
pub fn open_fasta(seqs_url: &Url) -> Result<Reader> {
    if seqs_url.to_string().starts_with("gs://") && env::var("GCS_OAUTH_TOKEN").is_err() {
        gcs_authorize_data_access();
    }

    let fasta = match Reader::from_url(seqs_url) {
        Ok(fasta) => fasta,
        Err(_) => {
            crate::elog!("Read '{}', attempt 2 (reauthorizing to GCS)", seqs_url);

            gcs_authorize_data_access();

            match Reader::from_url(seqs_url) {
                Ok(fasta) => fasta,
                Err(_) => {
                    crate::elog!("Read '{}', attempt 3 (overriding cURL CA bundle)", seqs_url);

                    local_guess_curl_ca_bundle();

                    Reader::from_url(seqs_url)?
                }
            }
        }
    };

    Ok(fasta)
}

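/// Build a map from read group ID (`@RG` `ID`) to sample name (`@RG` `SM`)
/// from the BAM header.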
fn get_rg_to_sm_mapping(bam: &IndexedReader) -> HashMap<String, String> {
    let header = bam::Header::from_template(bam.header());

    let rg_sm_map: HashMap<String, String> = header
        .to_hashmap()
        .into_iter()
        .flat_map(|(_, records)| records)
        .filter(|record| record.contains_key("ID") && record.contains_key("SM"))
        .map(|record| (record["ID"].clone(), record["SM"].clone()))
        .collect();

    rg_sm_map
}

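/// Look up the sample name for a read via its `RG` aux tag and the
/// read-group-to-sample map.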
fn get_sm_name_from_rg(read: &bam::Record, rg_sm_map: &HashMap<String, String>) -> Result<String> {
    let rg = read.aux(b"RG")?;

    if let Aux::String(v) = rg {
        if let Some(sm) = rg_sm_map.get(v) {
            Ok(sm.to_owned())
        } else {
            Err(anyhow::anyhow!(
                "Sample name not found for read group: {}",
                v
            ))
        }
    } else {
        Err(anyhow::anyhow!("Read group is not a string"))
    }
}

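/// Extract the portions of aligned reads overlapping `chr:start-stop` as FASTQ
/// records, optionally restricted to reads whose `HP` tag matches `haplotype`.
/// Record names encode the read name, locus name, sample, pileup index, and
/// secondary/supplementary flags.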
pub fn extract_aligned_bam_reads(
    _basename: &str,
    bam: &mut IndexedReader,
    chr: &str,
    start: &u64,
    stop: &u64,
    name: &str,
    haplotype: Option<u8>,
) -> Result<Vec<fastq::Record>> {
    let rg_sm_map = get_rg_to_sm_mapping(bam);

    let mut bmap = HashMap::new();

    let _ = bam.fetch(((*chr).as_bytes(), *start, *stop));
    for p in bam.pileup() {
        let pileup = p.unwrap();

        if *start <= (pileup.pos() as u64) && (pileup.pos() as u64) < *stop {
            for (i, alignment) in pileup.alignments().enumerate().filter(|(_, a)| {
                // Keep every alignment when no haplotype was requested; otherwise
                // keep only reads whose HP tag matches the requested haplotype.
                haplotype.is_none()
                    || a.record()
                        .aux(b"HP")
                        .ok()
                        .map(|aux| match aux {
                            Aux::U8(v) => v == haplotype.unwrap(),
                            _ => false,
                        })
                        .unwrap_or(false)
            }) {
                let qname = String::from_utf8_lossy(alignment.record().qname()).into_owned();
                let sm = match get_sm_name_from_rg(&alignment.record(), &rg_sm_map) {
                    Ok(a) => a,
                    Err(_) => String::from("unknown"),
                };

                let is_secondary = alignment.record().is_secondary();
                let is_supplementary = alignment.record().is_supplementary();
                let seq_name = format!("{qname}|{name}|{sm}|{i}|{is_secondary}|{is_supplementary}");

                if !bmap.contains_key(&seq_name) {
                    bmap.insert(seq_name.clone(), (String::new(), Vec::new()));
                }

                if !alignment.is_del() && !alignment.is_refskip() {
                    let a = alignment.record().seq()[alignment.qpos().unwrap()];
                    let q = alignment.record().qual()[alignment.qpos().unwrap()];

                    // Qualities are stored Phred+33 (ASCII) so they can be written to FASTQ as-is.
                    bmap.get_mut(&seq_name).unwrap().0.push(a as char);
                    bmap.get_mut(&seq_name).unwrap().1.push(q + 33);
                }

                if let bam::pileup::Indel::Ins(len) = alignment.indel() {
                    if let Some(pos1) = alignment.qpos() {
                        // Inserted bases follow the anchor base at qpos, which was already emitted above.
                        let pos2 = pos1 + (len as usize);
                        for pos in (pos1 + 1)..=pos2 {
                            let a = alignment.record().seq()[pos];
                            let q = alignment.record().qual()[pos];

                            bmap.get_mut(&seq_name).unwrap().0.push(a as char);
                            bmap.get_mut(&seq_name).unwrap().1.push(q + 33);
                        }
                    }
                }
            }
        }
    }

    let records = bmap
        .iter()
        .map(|kv| fastq::Record::with_attrs(kv.0.as_str(), None, kv.1.0.as_bytes(), kv.1.1.as_bytes()))
        .collect();

    Ok(records)
}

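/// Extract all unmapped reads from the BAM as FASTQ records, naming each
/// record `<qname>|<sample>`.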
fn extract_unaligned_bam_reads(
    _basename: &str,
    bam: &mut IndexedReader,
) -> Result<Vec<fastq::Record>> {
    let rg_sm_map = get_rg_to_sm_mapping(bam);

    let _ = bam.fetch(FetchDefinition::Unmapped);
    let records = bam
        .records()
        .map(|r| {
            let read = r.unwrap();
            let qname = String::from_utf8_lossy(read.qname()).into_owned();
            // Fall back to "unknown" when the read group has no sample name,
            // matching the behavior of extract_aligned_bam_reads.
            let sm = get_sm_name_from_rg(&read, &rg_sm_map)
                .unwrap_or_else(|_| String::from("unknown"));

            let seq_name = format!("{qname}|{sm}");

            let vseq = read.seq().as_bytes();
            let bseq = vseq.as_bytes();
            let qseq = read.qual().iter().map(|&q| q + 33).collect::<Vec<u8>>();

            fastq::Record::with_attrs(seq_name.as_str(), None, bseq, &qseq)
        })
        .collect();

    Ok(records)
}

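/// Fetch the reference subsequence `chr:start-stop` from an indexed FASTA and
/// return it as a single FASTQ record with a uniform placeholder quality.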
fn extract_fasta_seqs(
    basename: &String,
    fasta: &mut Reader,
    chr: &String,
    start: &u64,
    stop: &u64,
    name: &String,
) -> Result<Vec<fastq::Record>> {
    let id = format!("{chr}:{start}-{stop}|{name}|{basename}");
    let seq = fasta.fetch_seq_string(chr, usize::try_from(*start)?, usize::try_from(*stop - 1)?)?;

    if !seq.is_empty() {
        // Use a uniform Phred+33-encoded placeholder quality (Q30, '?') so the
        // record is consistent with the BAM extraction paths.
        let records = vec![fastq::Record::with_attrs(
            id.as_str(),
            None,
            seq.as_bytes(),
            vec![b'?'; seq.len()].as_slice(),
        )];

        return Ok(records);
    }

    Err(anyhow::anyhow!("No sequence found for locus: {}", id))
}

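/// Stage sequence data for the requested loci from a single BAM/CRAM or FASTA
/// URL, optionally including unmapped reads (BAM/CRAM only).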
fn stage_data_from_one_file(
    seqs_url: &Url,
    loci: &LinkedHashSet<(String, u64, u64, String)>,
    unmapped: bool,
    haplotype: Option<u8>,
) -> Result<Vec<fastq::Record>> {
    let mut all_seqs = Vec::new();

    let basename = seqs_url
        .path_segments()
        .map(|c| c.collect::<Vec<_>>())
        .unwrap()
        .last()
        .unwrap()
        .to_string();

    let seqs_str = seqs_url.as_str();
    if seqs_str.ends_with(".bam") || seqs_str.ends_with(".cram") {
        let basename = basename
            .trim_end_matches(".bam")
            .trim_end_matches(".cram")
            .to_string();
        let mut bam = open_bam(seqs_url)?;

        for (chr, start, stop, name) in loci.iter() {
            let aligned_seqs =
                extract_aligned_bam_reads(&basename, &mut bam, chr, start, stop, name, haplotype)
                    .unwrap();
            all_seqs.extend(aligned_seqs);
        }

        if unmapped {
            let unaligned_seqs = extract_unaligned_bam_reads(&basename, &mut bam).unwrap();
            all_seqs.extend(unaligned_seqs);
        }
    } else if seqs_str.ends_with(".fa")
        || seqs_str.ends_with(".fasta")
        || seqs_str.ends_with(".fa.gz")
        || seqs_str.ends_with(".fasta.gz")
    {
        let basename = basename
            .trim_end_matches(".fasta.gz")
            .trim_end_matches(".fasta")
            .trim_end_matches(".fa.gz")
            .trim_end_matches(".fa")
            .to_string();
        let mut fasta = open_fasta(seqs_url)?;

        for (chr, start, stop, name) in loci.iter() {
            let seqs = extract_fasta_seqs(&basename, &mut fasta, chr, start, stop, name)
                .unwrap_or_else(|_| Vec::new());

            all_seqs.extend(seqs);
        }
    } else {
        return Err(anyhow::anyhow!("Unsupported file type: {}", seqs_url));
    }

    Ok(all_seqs)
}

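/// Stage sequence data from all input files in parallel, retrying each file
/// with exponential backoff before giving up.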
fn stage_data_from_all_files(
    seq_urls: &HashSet<Url>,
    loci: &LinkedHashSet<(String, u64, u64, String)>,
    unmapped: bool,
    haplotype: Option<u8>,
) -> Result<Vec<fastq::Record>> {
    let all_data: Vec<_> = seq_urls
        .par_iter()
        .map(|seqs_url| {
            let op = || {
                let seqs = stage_data_from_one_file(seqs_url, loci, unmapped, haplotype)?;
                Ok(seqs)
            };

            match backoff::retry(ExponentialBackoff::default(), op) {
                Ok(seqs) => seqs,
                Err(e) => {
                    panic!("Error: {e}");
                }
            }
        })
        .collect();

    let flattened_data = all_data.into_iter().flatten().collect::<Vec<_>>();

    Ok(flattened_data)
}

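/// Return true if the read interval `[start, end]` fully contains at least one
/// of the given loci. Negative positions are rejected.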
pub fn read_spans_locus(
    start: i64,
    end: i64,
    loci: &HashSet<(String, i64, i64)>,
) -> Result<bool, String> {
    if start < 0 || end < 0 {
        return Err("Error: Negative genomic positions are not allowed.".to_string());
    }

    Ok(loci.iter().any(|e| start <= e.1 && end >= e.2))
}

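/// Stage reads for all loci from all input files into a FASTQ file at
/// `output_path`, working from `cache_path`, and return the number of records
/// retrieved.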
pub fn stage_data(
    output_path: &PathBuf,
    loci: &LinkedHashSet<(String, u64, u64, String)>,
    seq_urls: &HashSet<Url>,
    unmapped: bool,
    haplotype: Option<u8>,
    cache_path: &PathBuf,
) -> Result<usize> {
    let current_dir = env::current_dir()?;
    env::set_current_dir(cache_path).unwrap();

    let all_data = match stage_data_from_all_files(seq_urls, loci, unmapped, haplotype) {
        Ok(all_data) => all_data,
        Err(e) => {
            panic!("Error: {e}");
        }
    };

    env::set_current_dir(current_dir).unwrap();

    let mut buf_writer = BufWriter::new(File::create(output_path)?);
    let mut fastq_writer = fastq::Writer::new(&mut buf_writer);

    for record in all_data.iter() {
        if !record.seq().is_empty() {
            fastq_writer.write_record(record)?;
        }
    }

    let _ = fastq_writer.flush();

    Ok(all_data.len())
}

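/// Stage reads for all loci from all input files and return them in memory
/// rather than writing them to disk.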
pub fn stage_data_in_memory(
    loci: &LinkedHashSet<(String, u64, u64, String)>,
    seq_urls: &HashSet<Url>,
    unmapped: bool,
    cache_path: &PathBuf,
) -> Result<Vec<fastq::Record>> {
    let current_dir = env::current_dir()?;
    env::set_current_dir(cache_path).unwrap();

    let all_data = match stage_data_from_all_files(seq_urls, loci, unmapped, None) {
        Ok(all_data) => all_data,
        Err(e) => {
            panic!("Error: {e}");
        }
    };

    env::set_current_dir(current_dir).unwrap();

    Ok(all_data)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use url::Url;

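    // NOTE: these tests read from a GCS bucket and require valid Google Cloud
    // credentials (e.g. GCS_OAUTH_TOKEN or application default credentials).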
    #[test]
    fn test_open_bam() {
        let seqs_url = Url::parse(
            "gs://fc-8c3900db-633f-477f-96b3-fb31ae265c44/results/PBFlowcell/m84060_230907_210011_s2/reads/ccs/aligned/m84060_230907_210011_s2.bam"
        ).unwrap();
        let bam = open_bam(&seqs_url);

        assert!(bam.is_ok(), "Failed to open bam file");
    }

    #[test]
    fn test_stage_data_from_one_file() {
        let seqs_url = Url::parse(
            "gs://fc-8c3900db-633f-477f-96b3-fb31ae265c44/results/PBFlowcell/m84060_230907_210011_s2/reads/ccs/aligned/m84060_230907_210011_s2.bam"
        ).unwrap();
        let mut loci = LinkedHashSet::new();
        loci.insert(("chr15".to_string(), 23960193, 23963918, "test".to_string()));

        let result = stage_data_from_one_file(&seqs_url, &loci, false, None);

        assert!(result.is_ok(), "Failed to stage data from one file");
    }

    #[test]
    fn test_stage_data() {
        let cache_path = std::env::temp_dir();
        let output_path = cache_path.join("test.bam");

        let mut loci = LinkedHashSet::new();
        loci.insert(("chr15".to_string(), 23960193, 23963918, "test".to_string()));

        let seqs_url = Url::parse(
            "gs://fc-8c3900db-633f-477f-96b3-fb31ae265c44/results/PBFlowcell/m84060_230907_210011_s2/reads/ccs/aligned/m84060_230907_210011_s2.bam"
        ).unwrap();
        let seq_urls = HashSet::from([seqs_url]);

        let result = stage_data(&output_path, &loci, &seq_urls, false, None, &cache_path);

        assert!(result.is_ok(), "Failed to stage data from file");
    }

    #[test]
    fn test_stage_multiple_data() {
        let cache_path = std::env::temp_dir();
        let output_path = cache_path.join("test.bam");

        let seqs_url_1 = Url::parse(
            "gs://fc-8c3900db-633f-477f-96b3-fb31ae265c44/results/PBFlowcell/m84060_230907_210011_s2/reads/ccs/aligned/m84060_230907_210011_s2.bam"
        ).unwrap();
        let seqs_url_2 = Url::parse(
            "gs://fc-8c3900db-633f-477f-96b3-fb31ae265c44/results/PBFlowcell/m84043_230901_211947_s1/reads/ccs/aligned/m84043_230901_211947_s1.hifi_reads.bc2080.bam"
        ).unwrap();
        let mut loci = LinkedHashSet::new();
        loci.insert(("chr15".to_string(), 23960193, 23963918, "test".to_string()));

        let seq_urls = HashSet::from([seqs_url_1, seqs_url_2]);

        let result = stage_data(&output_path, &loci, &seq_urls, false, None, &cache_path);

        assert!(result.is_ok(), "Failed to stage data from all files");
    }
}