first implementation but far to be optimal

This commit is contained in:
Eric Coissac
2026-04-16 22:38:20 +02:00
commit de3f9b16cf
19336 changed files with 380276 additions and 0 deletions
+9
View File
@@ -0,0 +1,9 @@
[package]
name = "obiread"
version = "0.1.0"
edition = "2024"
[dependencies]
bytes = "1"
niffler = { version = "2", default-features = false, features = ["gz", "bz2", "lzma", "zstd"] }
ureq = "2"
+281
View File
@@ -0,0 +1,281 @@
//! Chunk iterator: yields rope slices that each end on a complete sequence record.
//!
//! Each `Vec<Bytes>` yielded by [`SeqChunkIter`] contains one or more reference-counted
//! byte slices that together form a self-contained block of complete sequence records.
//! The slices are NOT contiguous in memory — the consumer iterates over them in order.
//!
//! The splitter operates directly on the rope via [`RopeCursor`], so no packing
//! (flattening into a contiguous buffer) is ever required — even for sequences
//! longer than the read block size.
use std::io::{self, Read};
use bytes::{Bytes, BytesMut};
/// A splitter function: given the accumulated rope, returns the absolute byte
/// offset at which to cut, or `None` if no complete-record boundary was found.
pub type Splitter = fn(&[Bytes]) -> Option<usize>;
/// Iterator that reads from `R` in blocks and yields `Vec<Bytes>` chunks,
/// each ending on a complete sequence record boundary.
pub struct SeqChunkIter<R> {
source: R,
rope: Vec<Bytes>,
block_size: usize,
splitter: Splitter,
probe: &'static [u8],
eof: bool,
}
impl<R: Read> SeqChunkIter<R> {
/// Create a new iterator.
///
/// - `block_size`: bytes per read call (default 1 MiB).
/// - `splitter`: format-specific backward boundary detector working on the rope.
/// - `probe`: short byte string whose presence is necessary (not sufficient)
/// for a boundary to exist (e.g. `b"\n>"` for FASTA, `b"\n@"` for FASTQ).
pub fn new(source: R, block_size: usize, splitter: Splitter, probe: &'static [u8]) -> Self {
Self {
source,
rope: Vec::with_capacity(4),
block_size,
splitter,
probe,
eof: false,
}
}
/// Read one block from source into a fresh `Bytes`.
/// Returns `None` on EOF (zero bytes read).
fn read_block(&mut self) -> io::Result<Option<Bytes>> {
let mut buf = BytesMut::zeroed(self.block_size);
let mut filled = 0;
loop {
match self.source.read(&mut buf[filled..]) {
Ok(0) => break,
Ok(n) => {
filled += n;
if filled == self.block_size {
break;
}
}
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
if filled == 0 {
return Ok(None);
}
buf.truncate(filled);
Ok(Some(buf.freeze()))
}
/// Check whether the boundary probe might appear in the newly added block
/// or at the seam between the last two blocks.
///
/// This is a fast O(block_size) heuristic: if the probe is absent, the
/// splitter is not called.
fn probe_in_last_chunk(&self) -> bool {
let last = match self.rope.last() {
Some(b) => b,
None => return false,
};
// Within the last block.
if last.windows(self.probe.len()).any(|w| w == self.probe) {
return true;
}
// At the seam between the previous block and this one.
if self.rope.len() >= 2 {
let prev = &self.rope[self.rope.len() - 2];
let overlap = self.probe.len() - 1;
let from = prev.len().saturating_sub(overlap);
let seam: Vec<u8> = prev[from..]
.iter()
.chain(last.iter().take(overlap))
.copied()
.collect();
if seam.windows(self.probe.len()).any(|w| w == self.probe) {
return true;
}
}
false
}
}
impl<R: Read> Iterator for SeqChunkIter<R> {
type Item = io::Result<Vec<Bytes>>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.eof {
if self.rope.is_empty() {
return None;
}
return Some(Ok(std::mem::take(&mut self.rope)));
}
match self.read_block() {
Err(e) => return Some(Err(e)),
Ok(None) => {
self.eof = true;
continue;
}
Ok(Some(block)) => {
self.rope.push(block);
}
}
if self.probe_in_last_chunk() {
if let Some(abs_offset) = (self.splitter)(&self.rope) {
return Some(Ok(self.split_at(abs_offset)));
}
}
}
}
}
impl<R> SeqChunkIter<R> {
/// Split the rope at absolute byte offset `abs_offset`.
///
/// Returns the chunk (`rope[..abs_offset]`) as a `Vec<Bytes>` and stores
/// the remainder (`rope[abs_offset..]`) in `self.rope`.
fn split_at(&mut self, abs_offset: usize) -> Vec<Bytes> {
let mut chunk = Vec::with_capacity(self.rope.len());
let mut remaining = abs_offset;
let mut old_rope = std::mem::take(&mut self.rope);
let mut remainder_rope: Vec<Bytes> = Vec::with_capacity(2);
for piece in old_rope.drain(..) {
if remaining == 0 {
remainder_rope.push(piece);
} else if remaining >= piece.len() {
remaining -= piece.len();
chunk.push(piece);
} else {
// The cut falls inside this piece.
// Copy both halves so each gets its own Arc (unique ownership),
// which is required by RopeTape for in-place writing.
let head = BytesMut::from(&piece[..remaining]).freeze();
let tail = BytesMut::from(&piece[remaining..]).freeze();
remaining = 0;
if !head.is_empty() {
chunk.push(head);
}
if !tail.is_empty() {
remainder_rope.push(tail);
}
}
}
self.rope = remainder_rope;
chunk
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::fasta::end_of_last_fasta_entry;
use crate::fastq::end_of_last_fastq_entry;
fn fasta_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> {
SeqChunkIter::new(data, block_size, end_of_last_fasta_entry, b"\n>")
}
fn fastq_iter(data: &'static [u8], block_size: usize) -> SeqChunkIter<&'static [u8]> {
SeqChunkIter::new(data, block_size, end_of_last_fastq_entry, b"\n@")
}
fn collect_fasta(chunks: Vec<Vec<Bytes>>) -> Vec<Vec<u8>> {
chunks.into_iter().map(|rope| {
rope.into_iter().flat_map(|b| b.to_vec()).collect()
}).collect()
}
// ── FASTA ─────────────────────────────────────────────────────────────────
#[test]
fn fasta_single_record_one_chunk() {
let data: &[u8] = b">s1\nACGT\n";
let chunks: Vec<_> = fasta_iter(data, 64).collect::<Result<_, _>>().unwrap();
assert_eq!(chunks.len(), 1);
let flat: Vec<u8> = chunks.into_iter().flatten().flat_map(|b| b.to_vec()).collect();
assert_eq!(flat, b">s1\nACGT\n");
}
#[test]
fn fasta_two_records_split_across_chunks() {
let data: &[u8] = b">s1\nACGT\n>s2\nTTTT\n";
let chunks: Vec<_> = fasta_iter(data, 10).collect::<Result<_, _>>().unwrap();
let flat = collect_fasta(chunks);
let all: Vec<u8> = flat.into_iter().flatten().collect();
assert_eq!(all, b">s1\nACGT\n>s2\nTTTT\n");
}
#[test]
fn fasta_each_chunk_ends_on_complete_record() {
let data: &[u8] = b">s1\nACGT\n>s2\nCCCC\n>s3\nGGGG\n>s4\nTTTT\n";
for block in [8, 12, 20, 100] {
let chunks: Vec<_> = fasta_iter(data, block).collect::<Result<_, _>>().unwrap();
for rope in &chunks {
let flat: Vec<u8> = rope.iter().flat_map(|b| b.to_vec()).collect();
assert_eq!(flat[0], b'>', "block={block}: chunk doesn't start with '>'");
assert_eq!(*flat.last().unwrap(), b'\n', "block={block}: chunk doesn't end with newline");
}
}
}
// ── FASTQ ─────────────────────────────────────────────────────────────────
fn make_fastq(records: &[(&[u8], &[u8])]) -> Vec<u8> {
let mut buf = Vec::new();
for (seq, qual) in records {
buf.extend_from_slice(b"@hdr\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(qual);
buf.push(b'\n');
}
buf
}
#[test]
fn fastq_single_record_one_chunk() {
let data = Box::leak(make_fastq(&[(b"ACGT", b"IIII")]).into_boxed_slice());
let chunks: Vec<_> = fastq_iter(data, 64).collect::<Result<_, _>>().unwrap();
assert_eq!(chunks.len(), 1);
}
#[test]
fn fastq_at_in_quality_handled() {
let data = Box::leak(make_fastq(&[
(b"ACGTACGT", b"@@@@IIII"),
(b"TTTTTTTT", b"HHHHHHHH"),
]).into_boxed_slice());
let chunks: Vec<_> = fastq_iter(data, 16).collect::<Result<_, _>>().unwrap();
let all: Vec<u8> = chunks.into_iter().flatten().flat_map(|b| b.to_vec()).collect();
assert_eq!(all, *data);
}
#[test]
fn fastq_each_chunk_starts_with_at() {
let data = Box::leak(make_fastq(&[
(b"ACGT", b"IIII"),
(b"CCCC", b"JJJJ"),
(b"GGGG", b"KKKK"),
(b"TTTT", b"LLLL"),
]).into_boxed_slice());
for block in [18, 30, 60] {
let chunks: Vec<_> = fastq_iter(data, block).collect::<Result<_, _>>().unwrap();
for rope in &chunks {
let first_byte = rope.iter().flat_map(|b| b.iter().copied()).next().unwrap();
assert_eq!(first_byte, b'@', "block={block}: chunk doesn't start with '@'");
}
}
}
}
+109
View File
@@ -0,0 +1,109 @@
//! Backward boundary detection for FASTA chunks.
use bytes::Bytes;
use crate::tape::RopeCursor;
/// Scan the rope backward for the start of the last complete FASTA entry.
///
/// Returns the absolute byte offset of the `>` that begins the last complete
/// record, so that `rope[..offset]` is a self-contained chunk and
/// `rope[offset..]` is the remainder carried into the next chunk.
/// Returns `None` if no valid boundary is found (need more data).
///
/// Port of Go's `EndOfLastFastaEntry`, now working directly on a rope
/// via [`RopeCursor`] — no contiguous packing required.
pub fn end_of_last_fasta_entry(rope: &[Bytes]) -> Option<usize> {
let total_len: usize = rope.iter().map(|b| b.len()).sum();
if total_len == 0 {
return None;
}
let mut cursor = RopeCursor::end(rope)?;
let mut state: u8 = 0;
let mut last: usize = 0;
let mut i = total_len as isize - 1;
while i >= 0 && state < 2 {
let c = cursor.peek(rope).unwrap();
match state {
0 if c == b'>' => {
state = 1;
last = i as usize;
}
1 if c == b'\n' || c == b'\r' => {
state = 2;
}
_ => {
state = 0;
}
}
i -= 1;
if i >= 0 {
cursor.previous(rope);
}
}
if i <= 0 || state != 2 {
return None;
}
Some(last)
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::BytesMut;
fn rope(data: &[u8]) -> Vec<Bytes> {
vec![BytesMut::from(data).freeze()]
}
fn rope2(a: &[u8], b: &[u8]) -> Vec<Bytes> {
vec![BytesMut::from(a).freeze(), BytesMut::from(b).freeze()]
}
#[test]
fn single_entry_no_boundary() {
assert_eq!(end_of_last_fasta_entry(&rope(b">seq1\nACGT\n")), None);
}
#[test]
fn two_entries_cuts_at_second_header() {
let data = b">seq1\nACGT\n>seq2\nTTTT\n";
let pos = end_of_last_fasta_entry(&rope(data)).unwrap();
assert_eq!(&data[pos..], b">seq2\nTTTT\n");
assert_eq!(&data[..pos], b">seq1\nACGT\n");
}
#[test]
fn three_entries_cuts_at_last_header() {
let data = b">s1\nAA\n>s2\nCC\n>s3\nGG\n";
let pos = end_of_last_fasta_entry(&rope(data)).unwrap();
assert_eq!(&data[pos..], b">s3\nGG\n");
}
#[test]
fn multiline_sequence() {
let data = b">s1\nACGT\nACGT\n>s2\nTTTT\n";
let pos = end_of_last_fasta_entry(&rope(data)).unwrap();
assert_eq!(&data[pos..], b">s2\nTTTT\n");
}
#[test]
fn crlf_line_endings() {
let data = b">s1\r\nACGT\r\n>s2\r\nTTTT\r\n";
let pos = end_of_last_fasta_entry(&rope(data)).unwrap();
assert_eq!(&data[pos..], b">s2\r\nTTTT\r\n");
}
#[test]
fn boundary_spans_two_blocks() {
// Split so that "\n" is at end of first block and ">" at start of second.
let a = b">s1\nACGT\n";
let b = b">s2\nTTTT\n";
let total: Vec<u8> = a.iter().chain(b.iter()).copied().collect();
let pos = end_of_last_fasta_entry(&rope2(a, b)).unwrap();
assert_eq!(&total[pos..], b">s2\nTTTT\n");
}
}
+207
View File
@@ -0,0 +1,207 @@
//! Backward boundary detection for FASTQ chunks.
//!
//! FASTQ records have a rigid 4-line structure:
//! @header
//! sequence
//! +
//! quality
//!
//! The `@` in quality lines (Phred 31 = ASCII 64) makes forward heuristics
//! unreliable. The backward scanner identifies a genuine record start by
//! verifying the structural context around each `@` candidate.
//!
//! Port of Go's `EndOfLastFastqEntry` (7-state machine), now working directly
//! on a rope via [`RopeCursor`] — no contiguous packing required.
use bytes::Bytes;
use crate::tape::RopeCursor;
#[inline]
fn is_eol(c: u8) -> bool {
c == b'\n' || c == b'\r'
}
#[inline]
fn is_sep(c: u8) -> bool {
c == b' ' || c == b'\t' || is_eol(c)
}
#[inline]
fn is_seq_char(c: u8) -> bool {
c.is_ascii_alphabetic() || c == b'-' || c == b'.' || c == b'[' || c == b']'
}
/// Scan the rope backward for the start of the last complete FASTQ record.
///
/// Returns the absolute byte offset of the `@` that begins the last complete
/// record, so that `rope[..offset]` is a self-contained chunk and
/// `rope[offset..]` is the remainder for the next chunk.
/// Returns `None` if no valid boundary is found (need more data).
pub fn end_of_last_fastq_entry(rope: &[Bytes]) -> Option<usize> {
let total_len: usize = rope.iter().map(|b| b.len()).sum();
if total_len == 0 {
return None;
}
let mut cursor = RopeCursor::end(rope)?;
let mut state: u8 = 0;
let mut restart: isize = total_len as isize - 1;
let mut restart_cursor = cursor;
let mut cut: usize = total_len;
let mut i: isize = total_len as isize - 1;
while i >= 0 && state < 7 {
let c = cursor.peek(rope).unwrap();
match state {
// Looking for `+` separator line content
0 => {
if c == b'+' {
state = 1;
restart = i;
restart_cursor = cursor;
}
}
// Found `+` — expect end-of-line immediately before it (going backward)
1 => {
if is_eol(c) {
state = 2;
} else {
state = 0;
i = restart;
cursor = restart_cursor;
}
}
// After `\n+`: skip separators, then expect sequence characters
2 => {
if is_sep(c) {
// stay
} else if is_seq_char(c) {
state = 3;
} else {
state = 0;
i = restart;
cursor = restart_cursor;
}
}
// Scanning sequence characters backward
3 => {
if is_eol(c) {
state = 4;
} else if is_seq_char(c) {
// stay
} else {
state = 0;
i = restart;
cursor = restart_cursor;
}
}
// Found end-of-line before sequence — skip any extra newlines
4 => {
if is_eol(c) {
// stay
} else {
state = 5;
}
}
// Scanning header content — looking for `@` at start of line
5 => {
if is_eol(c) {
state = 0;
i = restart;
cursor = restart_cursor;
} else if c == b'@' {
state = 6;
cut = i as usize;
}
// else: stay
}
// Found `@` — expect end-of-line before it
6 => {
if is_eol(c) {
state = 7; // success
} else {
state = 5;
}
}
_ => unreachable!(),
}
i -= 1;
if i >= 0 {
cursor.previous(rope);
}
}
if i <= 0 || state != 7 {
return None;
}
Some(cut)
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::BytesMut;
fn rope(data: &[u8]) -> Vec<Bytes> {
vec![BytesMut::from(data).freeze()]
}
fn make_fastq(records: &[(&[u8], &[u8])]) -> Vec<u8> {
let mut buf = Vec::new();
for (seq, qual) in records {
buf.extend_from_slice(b"@header\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(qual);
buf.push(b'\n');
}
buf
}
#[test]
fn single_record_no_boundary() {
let buf = make_fastq(&[(b"ACGT", b"IIII")]);
assert_eq!(end_of_last_fastq_entry(&rope(&buf)), None);
}
#[test]
fn two_records_cuts_at_second() {
let buf = make_fastq(&[(b"ACGT", b"IIII"), (b"TTTT", b"HHHH")]);
let pos = end_of_last_fastq_entry(&rope(&buf)).unwrap();
assert_eq!(buf[pos], b'@');
assert_eq!(&buf[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice());
}
#[test]
fn three_records_cuts_at_last() {
let buf = make_fastq(&[
(b"ACGT", b"IIII"),
(b"CCCC", b"JJJJ"),
(b"GGGG", b"KKKK"),
]);
let pos = end_of_last_fastq_entry(&rope(&buf)).unwrap();
assert_eq!(&buf[pos..], make_fastq(&[(b"GGGG", b"KKKK")]).as_slice());
}
#[test]
fn at_sign_in_quality_does_not_confuse() {
let buf = make_fastq(&[
(b"ACGTACGT", b"@@@@IIII"),
(b"TTTT", b"HHHH"),
]);
let pos = end_of_last_fastq_entry(&rope(&buf)).unwrap();
assert_eq!(&buf[pos..], make_fastq(&[(b"TTTT", b"HHHH")]).as_slice());
}
#[test]
fn crlf_line_endings() {
let buf = b"@h\r\nACGT\r\n+\r\nIIII\r\n@h\r\nTTTT\r\n+\r\nHHHH\r\n";
let pos = end_of_last_fastq_entry(&rope(buf)).unwrap();
assert_eq!(buf[pos], b'@');
assert_eq!(&buf[pos..], b"@h\r\nTTTT\r\n+\r\nHHHH\r\n");
}
}
+41
View File
@@ -0,0 +1,41 @@
//! Streaming sequence file reader for obikmer.
//!
//! Yields rope chunks (`Vec<Bytes>`) from FASTA or FASTQ files, each ending
//! on a complete sequence record boundary. Zero allocation in the common case.
#![deny(missing_docs)]
mod fasta;
mod fastq;
pub mod chunk;
pub mod normalize;
pub mod tape;
pub mod xopen;
use std::io::Read;
use chunk::SeqChunkIter;
pub use xopen::xopen;
/// Default read block size: 1 MiB.
pub const DEFAULT_BLOCK_SIZE: usize = 1024 * 1024;
/// Create a FASTA chunk iterator over `source`.
pub fn fasta_chunks<R: Read>(source: R) -> SeqChunkIter<R> {
SeqChunkIter::new(
source,
DEFAULT_BLOCK_SIZE,
fasta::end_of_last_fasta_entry,
b"\n>",
)
}
/// Create a FASTQ chunk iterator over `source`.
pub fn fastq_chunks<R: Read>(source: R) -> SeqChunkIter<R> {
SeqChunkIter::new(
source,
DEFAULT_BLOCK_SIZE,
fastq::end_of_last_fastq_entry,
b"\n@",
)
}
+440
View File
@@ -0,0 +1,440 @@
//! Sequence normalisation automata for FASTA and FASTQ rope chunks.
//!
//! Both automata operate on a [`RopeTape`] in place (two-cursor, zero
//! reallocation in the common case) and produce a compact byte stream:
//! ACGT segments of length ≥ k separated by `0x00`, uppercased.
//!
//! Ambiguous bases terminate the current segment. Segments shorter than k
//! are silently discarded by retreating the write cursor.
use bytes::Bytes;
use crate::tape::{RopeCursor, RopeTape};
// ── public entry points ───────────────────────────────────────────────────────
/// Normalise a FASTA chunk: skip headers; copy, filter and concatenate
/// sequence lines across multi-line records.
///
/// Returns the written region of the tape as a rope of `Bytes`.
pub fn normalize_fasta_chunk(chunks: Vec<Bytes>, k: usize) -> Vec<Bytes> {
let mut tape = RopeTape::new(chunks);
normalize_fasta(&mut tape, k);
tape.into_output()
}
/// Normalise a FASTQ chunk: skip headers, `+` lines and quality lines;
/// copy and filter sequence lines.
///
/// Returns the written region of the tape as a rope of `Bytes`.
pub fn normalize_fastq_chunk(chunks: Vec<Bytes>, k: usize) -> Vec<Bytes> {
let mut tape = RopeTape::new(chunks);
normalize_fastq(&mut tape, k);
tape.into_output()
}
// ── FASTA automaton ───────────────────────────────────────────────────────────
/// Drive the FASTA normalisation automaton on `tape`.
///
/// After the initial header skip, the automaton reads sequence characters
/// one by one. At each newline run it peeks at the next character to decide:
/// `>` → new record (close segment, skip header); anything else → line
/// continuation within the same sequence.
fn normalize_fasta(tape: &mut RopeTape, k: usize) {
// Skip the first header line (includes the leading `>`).
skip_line(tape);
loop {
let mut seg_start = tape.write_snapshot();
'seq: loop {
let Some(c) = tape.read_next() else {
end_segment(tape, &mut seg_start, k);
return;
};
if is_newline(c) {
// Peek at the very next character — no newline run to skip.
// \r\n is handled naturally: \r → peek \n → continue; \n → peek content.
match tape.peek() {
None => {
end_segment(tape, &mut seg_start, k);
return;
}
Some(b'>') => {
// New record: close current segment, skip next header.
end_segment(tape, &mut seg_start, k);
skip_line(tape);
break 'seq; // restart outer loop
}
Some(_) => {
// Line continuation or \r before \n: next char is a nucleotide.
continue 'seq;
}
}
}
let upper = c & !0x20u8;
if is_acgt(upper) {
tape.write_next(upper);
} else {
// Ambiguous base: close segment, skip the non-ACGT run.
end_segment(tape, &mut seg_start, k);
skip_until_acgt_or_newline(tape);
seg_start = tape.write_snapshot();
}
}
// Outer loop restarts for the next record.
}
}
// ── FASTQ automaton ───────────────────────────────────────────────────────────
/// Drive the FASTQ normalisation automaton on `tape`.
///
/// The FASTQ 4-line structure (`@header`, sequence, `+`, quality) is rigid,
/// so the automaton cycles through four fixed phases without backtracking.
fn normalize_fastq(tape: &mut RopeTape, k: usize) {
loop {
// ── Phase 1: skip header line (@…\n) ─────────────────────────────
skip_line(tape);
// ── Phase 2: copy sequence ────────────────────────────────────────
let mut seg_start = tape.write_snapshot();
'seq: loop {
let Some(c) = tape.read_next() else {
// EOF inside a sequence: flush whatever we have.
end_segment(tape, &mut seg_start, k);
return;
};
if is_newline(c) {
skip_newlines(tape);
break 'seq;
}
let upper = c & !0x20u8; // ASCII uppercase trick
if is_acgt(upper) {
tape.write_next(upper);
} else {
// Ambiguous base: close the current segment, skip non-ACGT run.
end_segment(tape, &mut seg_start, k);
skip_until_acgt_or_newline(tape);
seg_start = tape.write_snapshot();
}
}
// Sequence line ended: close the last segment.
end_segment(tape, &mut seg_start, k);
// ── Phase 3: skip + line ──────────────────────────────────────────
skip_line(tape);
// ── Phase 4: skip quality line ────────────────────────────────────
skip_line(tape);
if tape.peek().is_none() {
return;
}
}
}
// ── shared helpers ────────────────────────────────────────────────────────────
/// Skip to the end of the current line, consuming the newline run.
fn skip_line(tape: &mut RopeTape) {
while let Some(c) = tape.read_next() {
if is_newline(c) {
skip_newlines(tape);
return;
}
}
}
/// Consume a contiguous run of `\n` / `\r` characters.
fn skip_newlines(tape: &mut RopeTape) {
while matches!(tape.peek(), Some(c) if is_newline(c)) {
tape.read_next();
}
}
/// Consume characters until the next ACGT base or newline (leaving it unread).
fn skip_until_acgt_or_newline(tape: &mut RopeTape) {
while let Some(c) = tape.peek() {
if is_newline(c) || is_acgt(c & !0x20u8) {
return;
}
tape.read_next();
}
}
/// Close the current segment.
///
/// - If `seg_len >= k`: write `0x00` terminator and advance `seg_start`.
/// - If `0 < seg_len < k`: erase by retreating the write cursor.
/// - If `seg_len == 0`: nothing to do.
fn end_segment(tape: &mut RopeTape, seg_start: &mut RopeCursor, k: usize) {
let seg_len = tape.written_since(*seg_start);
if seg_len >= k {
tape.write_next(0x00);
} else if seg_len > 0 {
tape.write_retreat(seg_len);
}
*seg_start = tape.write_snapshot();
}
#[inline] fn is_newline(c: u8) -> bool { c == b'\n' || c == b'\r' }
#[inline] fn is_acgt(upper: u8) -> bool { matches!(upper, b'A' | b'C' | b'G' | b'T') }
// ── tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use bytes::BytesMut;
fn run(data: &[u8], k: usize) -> Vec<u8> {
let chunks = vec![BytesMut::from(data).freeze()];
normalize_fastq_chunk(chunks, k)
.into_iter()
.flat_map(|b| b.to_vec())
.collect()
}
fn make_fastq(records: &[&[u8]]) -> Vec<u8> {
let mut buf = Vec::new();
for seq in records {
buf.extend_from_slice(b"@hdr\n");
buf.extend_from_slice(seq);
buf.push(b'\n');
buf.extend_from_slice(b"+\n");
buf.extend_from_slice(&vec![b'I'; seq.len()]);
buf.push(b'\n');
}
buf
}
// ── basic output format ───────────────────────────────────────────────────
#[test]
fn single_record_produces_seq_then_null() {
let out = run(&make_fastq(&[b"ACGTACGT"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn two_records_concatenated() {
let out = run(&make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]), 4);
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
// ── uppercase normalisation ───────────────────────────────────────────────
#[test]
fn lowercase_input_uppercased() {
let out = run(&make_fastq(&[b"acgtacgt"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn mixed_case_uppercased() {
let out = run(&make_fastq(&[b"AcGtAcGt"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
// ── k filter ─────────────────────────────────────────────────────────────
#[test]
fn sequence_shorter_than_k_discarded() {
let out = run(&make_fastq(&[b"ACG"]), 4);
assert_eq!(out, b"");
}
#[test]
fn sequence_exactly_k_kept() {
let out = run(&make_fastq(&[b"ACGT"]), 4);
assert_eq!(out, b"ACGT\x00");
}
#[test]
fn short_record_among_valid_ones_discarded() {
let out = run(&make_fastq(&[b"ACGTACGT", b"AC", b"TTTTTTTT"]), 4);
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
// ── ambiguous bases ───────────────────────────────────────────────────────
#[test]
fn ambiguous_splits_into_two_segments() {
// 'N' in the middle splits "ACGT" + "ACGT"
let out = run(&make_fastq(&[b"ACGTNACGT"]), 4);
assert_eq!(out, b"ACGT\x00ACGT\x00");
}
#[test]
fn segment_after_ambiguous_too_short_discarded() {
// "ACGTACGT" + 'N' + "AC" (< k=4)
let out = run(&make_fastq(&[b"ACGTACGTNAC"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn consecutive_ambiguous_produce_no_empty_segment() {
let out = run(&make_fastq(&[b"ACGTNNNNACGT"]), 4);
assert_eq!(out, b"ACGT\x00ACGT\x00");
}
#[test]
fn ambiguous_at_start_skipped() {
let out = run(&make_fastq(&[b"NNACGTACGT"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn ambiguous_at_end_produces_no_trailing_empty() {
let out = run(&make_fastq(&[b"ACGTACGTNN"]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
// ── CRLF line endings ─────────────────────────────────────────────────────
#[test]
fn crlf_handled() {
let data = b"@hdr\r\nACGTACGT\r\n+\r\nIIIIIIII\r\n";
let out = run(data, 4);
assert_eq!(out, b"ACGTACGT\x00");
}
// ── FASTA ─────────────────────────────────────────────────────────────────
fn run_fasta(data: &[u8], k: usize) -> Vec<u8> {
let chunks = vec![BytesMut::from(data).freeze()];
normalize_fasta_chunk(chunks, k)
.into_iter()
.flat_map(|b| b.to_vec())
.collect()
}
fn make_fasta(records: &[(&[u8], &[u8])]) -> Vec<u8> {
let mut buf = Vec::new();
for (id, seq) in records {
buf.push(b'>');
buf.extend_from_slice(id);
buf.push(b'\n');
buf.extend_from_slice(seq);
buf.push(b'\n');
}
buf
}
#[test]
fn fasta_single_record() {
let out = run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT")]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn fasta_two_records() {
let out = run_fasta(&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]), 4);
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
#[test]
fn fasta_multiline_sequence_concatenated() {
let data = b">s1\nACGT\nACGT\nACGT\n";
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGTACGTACGT\x00");
}
#[test]
fn fasta_lowercase_uppercased() {
let out = run_fasta(&make_fasta(&[(b"s1", b"acgtacgt")]), 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn fasta_short_record_discarded() {
let out = run_fasta(&make_fasta(&[(b"s1", b"ACG")]), 4);
assert_eq!(out, b"");
}
#[test]
fn fasta_short_among_valid_discarded() {
let out = run_fasta(
&make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"AC"), (b"s3", b"TTTTTTTT")]),
4,
);
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
#[test]
fn fasta_ambiguous_splits_segments() {
let data = b">s1\nACGTNACGT\n";
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGT\x00ACGT\x00");
}
#[test]
fn fasta_ambiguous_across_line_boundary() {
// 'N' at start of second line — still ambiguous
let data = b">s1\nACGT\nNACGT\n";
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGT\x00ACGT\x00");
}
#[test]
fn fasta_ambiguous_short_segment_discarded() {
let data = b">s1\nACGTACGTNAC\n";
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn fasta_no_trailing_newline() {
let data = b">s1\nACGTACGT";
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGTACGT\x00");
}
#[test]
fn fasta_crlf_line_endings() {
let data = b">s1\r\nACGT\r\nACGT\r\n>s2\r\nTTTT\r\n";
let out = run_fasta(data, 4);
assert_eq!(out, b"ACGTACGT\x00TTTT\x00");
}
#[test]
fn fasta_multi_slice_rope() {
let data = make_fasta(&[(b"s1", b"ACGTACGT"), (b"s2", b"TTTTTTTT")]);
let mid = data.len() / 2;
let chunks = vec![
BytesMut::from(&data[..mid]).freeze(),
BytesMut::from(&data[mid..]).freeze(),
];
let out: Vec<u8> = normalize_fasta_chunk(chunks, 4)
.into_iter()
.flat_map(|b| b.to_vec())
.collect();
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
// ── multi-record rope (multiple Bytes slices) ─────────────────────────────
#[test]
fn multi_slice_rope() {
let data = make_fastq(&[b"ACGTACGT", b"TTTTTTTT"]);
// Split into two slices at an arbitrary boundary.
let mid = data.len() / 2;
let chunks = vec![
BytesMut::from(&data[..mid]).freeze(),
BytesMut::from(&data[mid..]).freeze(),
];
let out: Vec<u8> = normalize_fastq_chunk(chunks, 4)
.into_iter()
.flat_map(|b| b.to_vec())
.collect();
assert_eq!(out, b"ACGTACGT\x00TTTTTTTT\x00");
}
}
+427
View File
@@ -0,0 +1,427 @@
//! Two-cursor tape over a rope of `BytesMut` slices.
//!
//! A [`RopeCursor`] is a position `(slice, byte)` within a rope.
//! Read and write cursors are the same type — the only difference is which
//! primitive they call: [`RopeCursor::peek`] vs [`RopeCursor::poke`].
//!
//! The read-only methods (`peek`, `advance`, `retreat`, `distance`) are generic
//! over `T: std::ops::Deref<Target = [u8]>` and work on both `Vec<Bytes>` and
//! `Vec<BytesMut>`. The write method (`poke`) is specific to `BytesMut`.
//!
//! [`RopeTape`] owns the rope and two cursors, and exposes the four Turing-machine
//! primitives used by the sequence normalisation automata.
use std::ops::Deref;
use bytes::{Bytes, BytesMut};
// ── RopeCursor ────────────────────────────────────────────────────────────────
/// A position within a rope (`Vec<BytesMut>`), usable as either a read or write head.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RopeCursor {
slice: usize,
byte: usize,
}
impl RopeCursor {
/// Position at the very start of the rope.
#[inline]
pub fn new() -> Self {
Self { slice: 0, byte: 0 }
}
/// Read the byte at the current position without moving.
/// Returns `None` at end of rope.
#[inline]
pub fn peek<T: Deref<Target = [u8]>>(&self, rope: &[T]) -> Option<u8> {
rope.get(self.slice)?.get(self.byte).copied()
}
/// Write `b` at the current position without moving.
#[inline]
pub fn poke(&self, rope: &mut [BytesMut], b: u8) {
rope[self.slice][self.byte] = b;
}
/// Advance by one byte, crossing slice boundaries transparently.
/// Returns `false` when the end of the rope is reached.
#[inline]
pub fn advance<T: Deref<Target = [u8]>>(&mut self, rope: &[T]) -> bool {
if self.slice >= rope.len() {
return false;
}
self.byte += 1;
if self.byte >= rope[self.slice].len() {
self.slice += 1;
self.byte = 0;
}
self.slice < rope.len()
}
/// Position at the last byte of the rope.
///
/// Returns `None` if the rope is empty or all slices are empty.
pub fn end<T: Deref<Target = [u8]>>(rope: &[T]) -> Option<Self> {
for (i, slice) in rope.iter().enumerate().rev() {
if !slice.is_empty() {
return Some(Self { slice: i, byte: slice.len() - 1 });
}
}
None
}
/// Retreat by one byte and return the byte now under the cursor.
///
/// Returns `None` if already at the beginning of the rope.
/// This is the inverse of the read-advance pattern used by `RopeTape::read_next`.
pub fn previous<T: Deref<Target = [u8]>>(&mut self, rope: &[T]) -> Option<u8> {
if self.slice == 0 && self.byte == 0 {
return None;
}
self.retreat(1, rope);
self.peek(rope)
}
/// Retreat by `n` bytes, crossing slice boundaries transparently.
///
/// Panics in debug mode if retreating past the beginning of the rope.
pub fn retreat<T: Deref<Target = [u8]>>(&mut self, mut n: usize, rope: &[T]) {
while n > 0 {
if self.byte >= n {
self.byte -= n;
return;
}
// Consume all bytes in current slice and cross boundary.
n -= self.byte + 1;
debug_assert!(self.slice > 0, "retreat past beginning of rope");
self.slice -= 1;
self.byte = rope[self.slice].len() - 1;
}
}
/// Number of bytes from `from` (inclusive) to `to` (exclusive).
///
/// Requires `from` ≤ `to` in rope order.
pub fn distance<T: Deref<Target = [u8]>>(from: &Self, to: &Self, rope: &[T]) -> usize {
if from.slice == to.slice {
to.byte - from.byte
} else {
let mut d = rope[from.slice].len() - from.byte;
for s in (from.slice + 1)..to.slice {
d += rope[s].len();
}
d += to.byte;
d
}
}
}
impl Default for RopeCursor {
fn default() -> Self { Self::new() }
}
// ── RopeTape ──────────────────────────────────────────────────────────────────
/// A mutable rope with independent read and write cursors.
///
/// The write cursor always stays at or behind the read cursor — this invariant
/// is guaranteed structurally by the formats (FASTA headers and FASTQ
/// non-sequence lines are consumed before any sequence byte reaches the same
/// position).
pub struct RopeTape {
rope: Vec<BytesMut>,
read: RopeCursor,
write: RopeCursor,
}
impl RopeTape {
/// Build a tape from a chunk produced by [`crate::chunk::SeqChunkIter`].
///
/// Each `Bytes` is converted to `BytesMut` in O(1): succeeds because the
/// chunk reader yields uniquely-owned, non-shared buffers.
pub fn new(chunks: Vec<Bytes>) -> Self {
let rope = chunks
.into_iter()
.map(|b| b.try_into_mut().expect("Bytes not uniquely owned"))
.collect();
Self { rope, read: RopeCursor::new(), write: RopeCursor::new() }
}
/// Peek at the byte under the read cursor without advancing.
#[inline]
pub fn peek(&self) -> Option<u8> {
self.read.peek(&self.rope)
}
/// Read the byte under the read cursor and advance it.
#[inline]
pub fn read_next(&mut self) -> Option<u8> {
let b = self.read.peek(&self.rope)?;
self.read.advance(&self.rope);
Some(b)
}
/// Write `b` at the write cursor and advance it.
#[inline]
pub fn write_next(&mut self, b: u8) {
self.write.poke(&mut self.rope, b);
self.write.advance(&self.rope);
}
/// Retreat the read cursor by one byte and return the byte now under it.
///
/// This is the inverse of `read_next`: move back one position and read.
/// Returns `None` if already at the beginning of the tape.
#[inline]
pub fn previous(&mut self) -> Option<u8> {
self.read.previous(&self.rope)
}
/// Retreat the write cursor by `n` bytes (to erase a sequence shorter than k).
#[inline]
pub fn write_retreat(&mut self, n: usize) {
self.write.retreat(n, &self.rope);
}
/// Snapshot the current write position, typically used as `seq_start`.
#[inline]
pub fn write_snapshot(&self) -> RopeCursor {
self.write
}
/// Number of bytes written since `snapshot`.
#[inline]
pub fn written_since(&self, snapshot: RopeCursor) -> usize {
RopeCursor::distance(&snapshot, &self.write, &self.rope)
}
/// Consume the tape and return the written region as a `Vec<Bytes>`.
///
/// Slices fully overwritten are returned in full; the last partial slice is
/// truncated to the write position; all remaining (unread) slices are dropped.
pub fn into_output(mut self) -> Vec<Bytes> {
let ws = self.write.slice;
let wb = self.write.byte;
if ws < self.rope.len() {
self.rope.truncate(ws + 1);
if let Some(last) = self.rope.last_mut() {
last.truncate(wb);
}
}
self.rope
.into_iter()
.filter(|s| !s.is_empty())
.map(BytesMut::freeze)
.collect()
}
}
// ── tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
fn rope(slices: &[&[u8]]) -> Vec<BytesMut> {
slices.iter().map(|s| BytesMut::from(*s)).collect()
}
/// Mimics what SeqChunkIter produces: Bytes allocated from BytesMut (uniquely owned).
fn owned_bytes(data: &[u8]) -> Bytes {
BytesMut::from(data).freeze()
}
// ── RopeCursor::peek ──────────────────────────────────────────────────────
#[test]
fn peek_first_byte() {
let r = rope(&[b"ACGT"]);
assert_eq!(RopeCursor::new().peek(&r), Some(b'A'));
}
#[test]
fn peek_past_end_returns_none() {
let r = rope(&[b"A"]);
let mut c = RopeCursor::new();
c.advance(&r);
assert_eq!(c.peek(&r), None);
}
// ── RopeCursor::advance ───────────────────────────────────────────────────
#[test]
fn advance_crosses_slice_boundary() {
let r = rope(&[b"AC", b"GT"]);
let mut c = RopeCursor::new();
c.advance(&r); // → 'C'
c.advance(&r); // → 'G' (slice 1)
assert_eq!(c.peek(&r), Some(b'G'));
}
#[test]
fn advance_at_end_returns_false() {
let r = rope(&[b"A"]);
let mut c = RopeCursor::new();
let still_in = c.advance(&r);
assert!(!still_in);
assert_eq!(c.peek(&r), None);
}
// ── RopeCursor::poke ──────────────────────────────────────────────────────
#[test]
fn poke_writes_without_advancing() {
let mut r = rope(&[b"XXXX"]);
let c = RopeCursor::new();
c.poke(&mut r, b'A');
assert_eq!(r[0][0], b'A');
assert_eq!(r[0][1], b'X'); // unchanged
// cursor position unchanged: peek still at 0
assert_eq!(c.peek(&r), Some(b'A'));
}
// ── RopeCursor::retreat ───────────────────────────────────────────────────
#[test]
fn retreat_within_slice() {
let r = rope(&[b"ACGT"]);
let mut c = RopeCursor::new();
c.advance(&r); c.advance(&r); c.advance(&r); // → 'T'
c.retreat(2, &r);
assert_eq!(c.peek(&r), Some(b'C'));
}
#[test]
fn retreat_crosses_slice_boundary() {
let r = rope(&[b"ACGT", b"TTTT"]);
let mut c = RopeCursor::new();
for _ in 0..4 { c.advance(&r); } // → first 'T' of slice 1
c.retreat(1, &r);
assert_eq!(c.peek(&r), Some(b'T')); // last byte of "ACGT"
}
#[test]
fn retreat_exactly_to_slice_start() {
let r = rope(&[b"ACGT", b"TTTT"]);
let mut c = RopeCursor::new();
for _ in 0..5 { c.advance(&r); } // → second 'T' of slice 1
c.retreat(5, &r); // back to very first byte
assert_eq!(c.peek(&r), Some(b'A'));
}
#[test]
fn retreat_zero_is_noop() {
let r = rope(&[b"ACGT"]);
let mut c = RopeCursor::new();
c.advance(&r); c.advance(&r); // → 'G'
c.retreat(0, &r);
assert_eq!(c.peek(&r), Some(b'G'));
}
// ── RopeCursor::distance ──────────────────────────────────────────────────
#[test]
fn distance_same_slice() {
let r = rope(&[b"ACGTACGT"]);
let mut a = RopeCursor::new();
let mut b = RopeCursor::new();
b.advance(&r); b.advance(&r); b.advance(&r); // at offset 3
a.advance(&r); // at offset 1
assert_eq!(RopeCursor::distance(&a, &b, &r), 2);
}
#[test]
fn distance_across_slices() {
let r = rope(&[b"ACGT", b"TTTT"]);
let from = RopeCursor::new(); // (0,0)
let mut to = RopeCursor::new();
for _ in 0..6 { to.advance(&r); } // (1,2)
assert_eq!(RopeCursor::distance(&from, &to, &r), 6);
}
#[test]
fn distance_zero() {
let r = rope(&[b"ACGT"]);
let c = RopeCursor::new();
assert_eq!(RopeCursor::distance(&c, &c, &r), 0);
}
// ── RopeTape ──────────────────────────────────────────────────────────────
#[test]
fn tape_read_skips_write_copies() {
// Simulate: 5 bytes header, then 4 bytes sequence.
// Read 5 without writing, then copy 4.
let chunks = vec![owned_bytes(b">hdr\nACGT")];
let mut tape = RopeTape::new(chunks);
for _ in 0..5 { tape.read_next(); } // skip ">hdr\n"
for _ in 0..4 {
let b = tape.read_next().unwrap();
tape.write_next(b);
}
let out: Vec<u8> = tape.into_output().into_iter().flat_map(|b| b.to_vec()).collect();
assert_eq!(out, b"ACGT");
}
#[test]
fn tape_write_retreat_erases() {
let chunks = vec![owned_bytes(b"XXXXXXACGT")];
let mut tape = RopeTape::new(chunks);
// Write 3 bytes, then retreat — simulates a sequence < k being discarded.
let snap = tape.write_snapshot();
for _ in 0..3 {
let b = tape.read_next().unwrap();
tape.write_next(b);
}
assert_eq!(tape.written_since(snap), 3);
tape.write_retreat(3); // erase the 3 bytes
assert_eq!(tape.written_since(snap), 0);
// Now write 4 more bytes (the "ACGT").
for _ in 0..3 { tape.read_next(); } // skip "XXX"
for _ in 0..4 {
let b = tape.read_next().unwrap();
tape.write_next(b);
}
let out: Vec<u8> = tape.into_output().into_iter().flat_map(|b| b.to_vec()).collect();
assert_eq!(out, b"ACGT");
}
#[test]
fn tape_into_output_exact_boundary() {
// Write exactly until the last byte of the first slice, nothing more.
let chunks = vec![
owned_bytes(b"ACGT"),
owned_bytes(b"TTTT"),
];
let mut tape = RopeTape::new(chunks);
for _ in 0..4 {
let b = tape.read_next().unwrap();
tape.write_next(b);
}
// Skip the rest
let out: Vec<u8> = tape.into_output().into_iter().flat_map(|b| b.to_vec()).collect();
assert_eq!(out, b"ACGT");
}
#[test]
fn tape_written_since_across_slices() {
let chunks = vec![
owned_bytes(b"XXXX"),
owned_bytes(b"YYYY"),
];
let mut tape = RopeTape::new(chunks);
let snap = tape.write_snapshot();
for _ in 0..6 {
let b = tape.read_next().unwrap();
tape.write_next(b);
}
assert_eq!(tape.written_since(snap), 6);
}
}
+66
View File
@@ -0,0 +1,66 @@
//! Transparent reader for local files, HTTP/HTTPS URLs, and stdin.
//!
//! Compression is detected from the magic bytes (not the file extension),
//! so gzip, bzip2, xz and zstd files are decompressed automatically regardless
//! of whether they carry a `.gz` / `.bz2` / `.xz` / `.zst` suffix.
//!
//! # Source strings
//!
//! | Prefix | Behaviour |
//! |--------|-----------|
//! | `-` | reads from stdin |
//! | `http://` or `https://` | HTTP GET via `ureq` |
//! | anything else | local file; `~/` is expanded to the home directory |
use std::io::{self, Read};
use std::fs::File;
// ── public API ────────────────────────────────────────────────────────────────
/// Open any source for reading, with transparent decompression.
///
/// Returns a `Box<dyn Read>` that yields uncompressed bytes regardless of
/// whether the underlying source is plain text, gzip, bzip2, xz or zstd.
///
/// # Errors
/// Returns an `io::Error` if the file cannot be opened, the URL cannot be
/// fetched, or the compression header is malformed.
pub fn xopen(source: &str) -> io::Result<Box<dyn Read + Send>> {
let raw: Box<dyn Read + Send> = match source {
"-" => Box::new(io::stdin()),
s if s.starts_with("http://") || s.starts_with("https://") => {
http_reader(s)?
}
path => {
let expanded = expand_tilde(path);
Box::new(File::open(expanded.as_ref())?)
}
};
decompress(raw)
}
// ── internal helpers ──────────────────────────────────────────────────────────
fn http_reader(url: &str) -> io::Result<Box<dyn Read + Send>> {
ureq::get(url)
.call()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
.map(|resp| -> Box<dyn Read + Send> { Box::new(resp.into_reader()) })
}
fn decompress(raw: Box<dyn Read + Send>) -> io::Result<Box<dyn Read + Send>> {
niffler::send::get_reader(raw)
.map(|(reader, _fmt)| reader)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
}
fn expand_tilde(path: &str) -> std::borrow::Cow<'_, str> {
if path.starts_with("~/") {
if let Some(home) = std::env::var_os("HOME") {
let mut expanded = home.to_string_lossy().into_owned();
expanded.push_str(&path[1..]);
return std::borrow::Cow::Owned(expanded);
}
}
std::borrow::Cow::Borrowed(path)
}