From 5169f65dc904a71a0f58a1d537b20ada78c833f9 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sat, 9 May 2026 17:20:08 +0800 Subject: [PATCH] feat: implement persistent layered index and chunked binary format Introduce the `obilayeredmap` specification and persistent MPHF-based index architecture for incremental multi-dataset indexing. Implement chunked binary serialization with a fixed `u8` k-mer count limit (256) and overlapping super-kmer segments. Add memory-mapped I/O and a companion `.idx` index file for allocation-free, O(1) unitig access. Update MkDocs navigation, enhance the k-mer comparison script, and add comprehensive tests for serialization, partitioning, and file I/O pipelines. --- docmd/implementation/mphf.md | 61 ++++- docmd/implementation/obilayeredmap.md | 168 ++++++++++++++ docmd/implementation/unitig_evidence.md | 4 +- mkdocs.yml | 1 + scripts/compare_kmers.py | 11 +- src/Cargo.lock | 5 + src/obikpartitionner/Cargo.toml | 7 + src/obikpartitionner/src/partition.rs | 116 ++++++++++ src/obikseq/src/lib.rs | 1 + src/obikseq/src/packed_seq.rs | 35 ++- src/obikseq/src/superkmer.rs | 34 ++- src/obikseq/src/tests/superkmer.rs | 47 +++- src/obikseq/src/tests/unitig.rs | 7 +- src/obiskbuilder/src/iter.rs | 66 +++++- src/obiskio/Cargo.toml | 1 + src/obiskio/src/codec.rs | 62 +---- src/obiskio/src/lib.rs | 2 + src/obiskio/src/pool.rs | 228 +------------------ src/obiskio/src/reader.rs | 69 +----- src/obiskio/src/tests/codec.rs | 64 ++++++ src/obiskio/src/tests/pool.rs | 217 ++++++++++++++++++ src/obiskio/src/tests/reader.rs | 63 ++++++ src/obiskio/src/tests/unitig_index.rs | 169 ++++++++++++++ src/obiskio/src/unitig_index.rs | 286 ++++++++++++++++++++++++ 24 files changed, 1342 insertions(+), 382 deletions(-) create mode 100644 docmd/implementation/obilayeredmap.md create mode 100644 src/obiskio/src/tests/codec.rs create mode 100644 src/obiskio/src/tests/pool.rs create mode 100644 src/obiskio/src/tests/reader.rs create mode 100644 src/obiskio/src/tests/unitig_index.rs create mode 100644 src/obiskio/src/unitig_index.rs diff --git a/docmd/implementation/mphf.md b/docmd/implementation/mphf.md index b7a6b05..cc7c244 100644 --- a/docmd/implementation/mphf.md +++ b/docmd/implementation/mphf.md @@ -49,7 +49,7 @@ Build a new MPHF over the filtered kmer set only, with the exact key count avail **Phase 1** (provisional, discarded after spectrum computation): FMPHGO. Tolerates overestimated capacity, compact, no need to optimise for query speed on a temporary structure. -**Phase 2** (persistent, queried repeatedly): open between FMPHGO and ptr_hash. Exact key count is available, so both operate optimally. ptr_hash's query speed advantage (2.1–3.3×) is meaningful for the persistent index but carries the risk of a very young crate. FMPHGO is the conservative default; ptr_hash is worth revisiting once it has broader production use. +**Phase 2** (persistent, queried repeatedly): **ptr_hash**. Exact key count is available at phase 2, so ptr_hash operates optimally. Its query speed (≥2.1× over FMPHGO) and construction speed (≥3.1×) are meaningful for the persistent index; the space overhead at 2.4 bits/key is acceptable. The crate's youth (Feb 2025) was previously a concern; it is now accepted given the performance profile and the fact that each layer MPHF is independently rebuildable from its unitig file if needed. boomphf is effectively eliminated: its space overhead is the largest and its streaming-construction advantage does not apply here. @@ -73,9 +73,64 @@ All three are in-memory structures. Their internal representation is flat bit ar No established Rust crate provides a natively on-disk MPHF. **SSHash** (Sparse and Skew Hash) is a complete kmer dictionary designed for disk access and is order-preserving (overlapping kmers receive consecutive indices → cache-friendly count access), but it is C++-only and covers more than just the MPHF layer. +--- + +## Multilayer index architecture + +### Motivation + +An index built from a single dataset A can be extended with a new dataset B without rebuilding. This supports incremental construction (adding species, samples, or sequencing runs) and enables set operations across heterogeneous sources. + +### Layer structure + +Each layer is a self-contained unit: + +``` +layer_i/ + unitigs.bin — packed 2-bit nucleotide sequences + mphf.bin — ptr_hash index (phase-2, exact key count) + evidence.bin — [(unitig_id, rank)] per MPHF slot (see unitig_evidence.md) + counts.bin — [u32] per MPHF slot +``` + +Layers are **disjoint**: a canonical kmer belongs to exactly one layer. Layer 0 is built from dataset A. Adding dataset B proceeds as follows: + +1. For each kmer in B: query layer 0 — if found, accumulate count into `counts_0[MPHF_0(kmer)]`. +2. Collect all kmers of B not present in any existing layer → set `B \ A`. +3. Build layer 1 from `B \ A` using the standard two-phase pipeline (spectrum, filter, ptr_hash). + +Adding a third dataset C repeats the process: probe layer 0, then layer 1, then build layer 2 from `C \ A \ B`. + +### Membership verification + +ptr_hash maps any input to a valid slot — it does not natively detect absent keys. Membership is verified using the evidence entry: decode the kmer from `(unitig_id, rank)` and compare to the query. A mismatch means the kmer is absent from this layer; probe the next layer. + +This makes the evidence layer load-bearing for correctness, not only for locality. + +### Query algorithm + +``` +fn query(kmer) → Option: + for layer in layers: + slot = layer.mphf.query(kmer) + if layer.evidence.decode(slot) == kmer: + return Some(layer.counts[slot]) + return None +``` + +Expected probe depth: 1 for kmers present in layer 0, increasing for rare kmers added in later layers. In practice, the dominant dataset (largest A) should be layer 0 to minimise average probe depth. + +### Layer count and probe cost + +Each probe is a ptr_hash lookup (~10 ns) plus one evidence decode (two array accesses). For L layers the worst case is L probes + 1 None. In practice L is small (2–5 for typical multi-species databases). No global data structure is needed to route queries; the layer chain is traversed in order. + +### Merging layers + +Two layer chains can be merged by re-indexing their union through the standard pipeline. This is expensive (full rebuild) but produces an optimal single-layer index. Merge is a maintenance operation, not a query-path requirement. + ## Open questions - Confirm actual partition sizes and overestimation factor on representative metagenomic datasets. -- Revisit ptr_hash for phase 2 once the crate has broader production track record. -- Assess rkyv integration cost for FMPHGO if true zero-copy mmap becomes necessary for the persistent index. +- **rkyv integration**: all flat arrays in a layer (evidence, counts, presence/absence matrix) map trivially to `rkyv::Archive` — fixed-size element types, no heap indirection. The presence/absence matrix is the strongest case: at 10 M kmers × 1 000 samples ≈ 1.25 GB per partition, zero-copy mmap via rkyv avoids loading the entire matrix at open time, letting the OS page cache serve only accessed pages. ptr_hash itself is internally a flat bit array and is structurally compatible with rkyv, but requires either native crate support or a wrapper. Assess the wrapper cost and whether ptr_hash is willing to adopt rkyv upstream. - Keep SSHash in mind if the indexing architecture is reconsidered at a higher level. +- Determine optimal layer ordering heuristic (by kmer count? by query frequency?) for multi-species databases. diff --git a/docmd/implementation/obilayeredmap.md b/docmd/implementation/obilayeredmap.md new file mode 100644 index 0000000..44c6473 --- /dev/null +++ b/docmd/implementation/obilayeredmap.md @@ -0,0 +1,168 @@ +# obilayeredmap — layered kmer index crate + +## Purpose + +`obilayeredmap` implements a persistent, incrementally extensible kmer index. The index is organised in three levels: **collection → partition → layer**. Each layer covers a disjoint kmer set (kmers absent from all earlier layers), wrapping a `ptr_hash` MPHF with associated per-slot data. Adding a new dataset never rebuilds existing layers. + +--- + +## Three-level hierarchy + +``` +index_root/ ← LayeredMap (collection) + meta.json + part_00000/ ← Partition + layer_0/ ← Layer + mphf.bin + unitigs.bin + evidence.bin + counts.bin + presence.bin + layer_1/ + ... + part_00001/ + layer_0/ + layer_1/ + ... +``` + +**Collection** (`index_root/`): global metadata — kmer size k, number of partitions, layer count, sample registry. + +**Partition** (`part_XXXXX/`): one directory per hash bucket. All kmers whose canonical minimiser hashes to bucket X land in `part_XXXXX`. Partitions are independent and can be processed in parallel. The partition count and routing scheme (minimiser → bucket) are fixed at collection creation and recorded in `meta.json`. + +**Layer** (`layer_N/`): within a partition, a layer is the MPHF and its associated data for one dataset addition. Layer 0 is built from the first dataset A; layer 1 covers kmers in B not present in layer 0; and so on. Layers within a partition are disjoint: each kmer belongs to exactly one layer. + +--- + +## Layer file layout + +``` +layer_N/ + mphf.bin — ptr_hash MPHF (exact key count, phase-2 construction) + unitigs.bin — packed 2-bit nucleotide sequences (concatenated, variable-length) + unitig_offsets.bin — u32 per unitig: nucleotide offset of unitig j in unitigs.bin + evidence.bin — u32 per MPHF slot: (unitig_id: 25 | rank: 7) + counts.bin — u32 per MPHF slot (total kmer occurrences) + presence.bin — bit matrix: n_slots × n_samples [optional] +``` + +Unitigs have variable lengths. Each record in `unitigs.bin` is self-delimiting: it begins with a varint `seql` (sequence length in nucleotides) followed by `(seql+3)/4` packed bytes — the streaming format defined in `obiskio`. Sequential scan is always possible using the in-record `seql`. + +For O(1) random access, `unitig_offsets.bin` is a **precomputed derived index**: a u32 array of byte offsets into `unitigs.bin`, with n_unitigs + 1 entries (sentinel = total byte size). Built once at construction by a single sequential scan; reconstructible from `unitigs.bin` if lost. Access: `unitigs.bin[offsets[j] .. offsets[j+1]]`. + +All files except `mphf.bin` are flat arrays of fixed-size elements, serialised with **rkyv** for zero-copy mmap access. `mphf.bin` uses ptr_hash's native serialisation; rkyv integration is deferred (see open questions). + +### Evidence encoding + +Evidence maps each MPHF slot to its kmer's location in the unitig file. It serves two roles: membership verification (ptr_hash maps any input to a valid slot; decoding evidence and comparing to the query detects absent keys) and kmer reconstruction. + +``` +slot s → unitig_id: u25 | rank: u7 +``` + +Packed into a `u32` (29 bits used, 3 spare). Decoding: + +``` +kmer = unitigs[unitig_id][rank .. rank + k] // 2-bit packed slice +``` + +`rank` is the kmer's 0-based index within the unitig (kmer units, not nucleotides). For k=31, m=11, the structural maximum is k − m + 1 = 21 kmers per unitig; the empirical maximum observed is ~46 kmers. A `u7` (0–127) is sufficient. + +### Presence/absence matrix + +Column-major bit matrix: column j (sample j) is a contiguous `n_slots`-bit array. This layout makes per-sample operations (union, intersection, diff over a column) cache-friendly. For large matrices (e.g. 10 M slots × 1 000 samples ≈ 1.25 GB per partition), rkyv + mmap avoids loading the full matrix at open time. + +--- + +## Query path + +A kmer query routes through all three levels: + +1. **Partition routing**: hash canonical minimiser of the query kmer → partition index → open `part_XXXXX/`. +2. **Layer probing**: iterate layers in order within the partition; for each layer compute `slot = mphf.query(kmer)`, then verify `evidence.decode(slot) == kmer`. First match wins. +3. **Data access**: read `counts[slot]` and/or `presence[slot]` from the matching layer. + +``` +fn query(kmer) → Option: + part = partition_of(kmer) + for (i, layer) in part.layers.iter().enumerate(): + slot = layer.mphf.query(kmer) + if layer.evidence.decode(slot) == kmer: + return Some(Hit { layer: i, slot }) + return None +``` + +Expected probe depth: 1 for kmers in layer 0, increasing for later layers. In practice the dominant dataset should be layer 0. + +--- + +## Add-layer algorithm + +When adding dataset B to an existing index: + +1. For each partition, iterate kmers of B routed to that partition. +2. Probe existing layers; collect kmers absent from all layers → `B \ index`. +3. Build a new layer from `B \ index` using the two-phase pipeline (FMPHGO provisional → ptr_hash definitive). +4. Append the new layer directory under each `part_XXXXX/`. +5. Update `meta.json` (layer count, sample registry). + +Each partition's new layer is built independently; the operation is fully parallel across partitions. + +--- + +## Core API (sketch) + +```rust +// Open an existing index +let map = LayeredMap::open(path)?; + +// Query a canonical kmer across all partitions and layers +match map.query(kmer) { + Some(hit) => { + let count = hit.count(); + let present = hit.presence_row(); // bit slice over samples + } + None => { /* absent */ } +} + +// Non-destructive extension with a new dataset +// unitigs produced by the two-phase pipeline, one per partition +let layer_idx = map.add_layer(unitigs_dir, counts_dir, presence_path)?; +``` + +--- + +## Dependencies + +| crate | role | +|---|---| +| `ptr_hash` | phase-2 MPHF per layer | +| `ph` (FMPHGO) | phase-1 provisional MPHF during layer construction | +| `rkyv` | zero-copy serialisation of flat arrays (evidence, counts, presence) | +| `memmap2` | mmap of layer files | +| `bitm` | bit-packed presence matrix | + +--- + +## Serialisation strategy + +All flat arrays use `rkyv::Archive`: + +```rust +#[derive(Archive, Serialize, Deserialize)] +struct Evidence { slots: Vec } // packed (unitig_id: 25 | rank: 7) + +#[derive(Archive, Serialize, Deserialize)] +struct Counts { data: Vec } +``` + +At open time, each file is mmapped and cast to its archived type — no allocation, no copy. The MPHF is loaded via ptr_hash's own API; a rkyv wrapper is a future refinement. + +--- + +## Open questions + +- **ptr_hash + rkyv**: ptr_hash's internals are flat bit arrays; a rkyv-compatible wrapper is structurally feasible. Assess upstream willingness or implement a thin newtype wrapper. +- **Presence matrix layout**: column-major favours per-sample operations; row-major favours per-kmer queries. Decide based on dominant access pattern. +- **Layer merge**: merging two `LayeredMap` instances into a single-layer index requires full rebuild. Define API and cost model; maintenance operation, not query-path. +- **Canonical kmer orientation**: evidence stores canonical kmer; strand recovery requires one 64-bit revcomp comparison at query time. diff --git a/docmd/implementation/unitig_evidence.md b/docmd/implementation/unitig_evidence.md index 1628689..e5a97a8 100644 --- a/docmd/implementation/unitig_evidence.md +++ b/docmd/implementation/unitig_evidence.md @@ -69,7 +69,9 @@ Consequence for `u8` capacity: | nucleotides | 255 nuc | 225 kmers | | **kmers** | **255 kmers** | **285 nuc** | -On *Betula nana* (k=31, 256 partitions), m_u ≈ 37.9 kmers/unitig on average; no unitig length distribution data measured yet. The `rank` field (kmer index within the unitig) fits in a `u8` as long as no unitig exceeds 255 kmers — guaranteed by the split strategy below. +**Structural maximum from superkmer construction.** For k=31 and m=11, the maximum number of consecutive kmers sharing the same minimiser is k − m + 1 = **21 kmers** (the minimiser traverses from position k−m to 0 as the window slides). A unitig that is a single full superkmer therefore has exactly 21 kmers. This is confirmed by a bimodal distribution in empirical data: a sharp peak at 21 kmers appears in all partitions, including the anomalous partition 145. The observed maximum is ~46 kmers (unitigs spanning more than one superkmer), well within u8 range. + +On *Betula nana* (k=31, 256 partitions), m_u ≈ 37.9 kmers/unitig on average. The `rank` field (kmer index within the unitig) fits in a `u8` as long as no unitig exceeds 255 kmers — guaranteed by the split strategy below and amply satisfied by empirical maximums (~46 kmers observed). ### Split strategy for long unitigs diff --git a/mkdocs.yml b/mkdocs.yml index e7fb137..6c2cfa4 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -44,6 +44,7 @@ nav: - On-disk storage: implementation/storage.md - MPHF selection: implementation/mphf.md - Unitig evidence encoding: implementation/unitig_evidence.md + - obilayeredmap crate: implementation/obilayeredmap.md - Architecture: - Sequences: architecture/sequences/invariant.md diff --git a/scripts/compare_kmers.py b/scripts/compare_kmers.py index 96126d6..fda795a 100755 --- a/scripts/compare_kmers.py +++ b/scripts/compare_kmers.py @@ -75,7 +75,12 @@ def main(): parser.add_argument("file_a", help="First FASTA file (reference)") parser.add_argument("file_b", help="Second FASTA file (to compare)") parser.add_argument( - "-k", "--kmer-size", type=int, default=31, metavar="K", help="k-mer size (default: 31)" + "-k", + "--kmer-size", + type=int, + default=31, + metavar="K", + help="k-mer size (default: 31)", ) args = parser.parse_args() @@ -104,6 +109,10 @@ def main(): if only_a or only_b: print("\nSets differ.", file=sys.stderr) + if len(only_a) > 0 and len(only_b) <= 10: + print(f"\nOnly in A: {only_a}") + if len(only_b) > 0 and len(only_b) <= 10: + print(f"\nOnly in B: {only_b}") sys.exit(1) else: print("\nSets are identical.") diff --git a/src/Cargo.lock b/src/Cargo.lock index 19eb24f..9761b26 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1615,7 +1615,10 @@ version = "0.1.0" dependencies = [ "memmap2", "niffler 3.0.0", + "obikrope", "obikseq", + "obiread", + "obiskbuilder", "obiskio", "ph", "rayon", @@ -1623,6 +1626,7 @@ dependencies = [ "serde", "serde_json", "sysinfo 0.33.1", + "tempfile", "tracing", ] @@ -1679,6 +1683,7 @@ name = "obiskio" version = "0.1.0" dependencies = [ "lru", + "memmap2", "niffler 3.0.0", "obikseq", "rustix", diff --git a/src/obikpartitionner/Cargo.toml b/src/obikpartitionner/Cargo.toml index d8c1f68..256cabc 100644 --- a/src/obikpartitionner/Cargo.toml +++ b/src/obikpartitionner/Cargo.toml @@ -3,6 +3,13 @@ name = "obikpartitionner" version = "0.1.0" edition = "2024" +[dev-dependencies] +tempfile = "3" +obikseq = { path = "../obikseq", features = ["test-utils"] } +obiskbuilder = { path = "../obiskbuilder" } +obiread = { path = "../obiread" } +obikrope = { path = "../obikrope" } + [dependencies] niffler = "3.0.0" remove_dir_all = "0.8" diff --git a/src/obikpartitionner/src/partition.rs b/src/obikpartitionner/src/partition.rs index d1bf7ea..a55bbd9 100644 --- a/src/obikpartitionner/src/partition.rs +++ b/src/obikpartitionner/src/partition.rs @@ -616,3 +616,119 @@ impl Drop for KmerPartition { let _ = self.close(); } } + +// ── integration tests ───────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + use obikrope::Rope; + use obikseq::SuperKmer; + use obiskbuilder::build_superkmers; + + const K: usize = 11; + const M: usize = 5; + + fn setup() { + obikseq::params::set_k(K); + obikseq::params::set_m(M); + } + + /// Direct canonical k-mer counts from ASCII sequences — ground truth. + fn direct_counts(seqs: &[&[u8]]) -> (u64, u64) { + let mut counts: HashMap, u64> = HashMap::new(); + for seq in seqs { + for i in 0..seq.len().saturating_sub(K - 1) { + let km = SuperKmer::from_ascii(&seq[i..i + K]).to_ascii(); + *counts.entry(km).or_insert(0) += 1; + } + } + let f0 = counts.len() as u64; + let f1: u64 = counts.values().sum(); + (f0, f1) + } + + /// Run the full pipeline on a list of sequences and return (f0, f1) from + /// the `kmer_spectrum_raw.json` produced by `count_partition`. + fn pipeline_counts(seqs: &[&[u8]]) -> (u64, u64) { + setup(); + + let mut rope_data: Vec = Vec::new(); + for seq in seqs { + rope_data.extend_from_slice(seq); + rope_data.push(0x00); + } + let mut rope = Rope::new(None); + rope.push(rope_data); + + let superkmers: Vec<_> = build_superkmers(rope, K, 1, 0.0); + + let dir = tempfile::tempdir().unwrap(); + let mut kp = KmerPartition::create(dir.path(), 0, K, M, true).unwrap(); + kp.write_batch(superkmers).unwrap(); + kp.close().unwrap(); + kp.dereplicate().unwrap(); + + let part_dir = dir.path().join("part_00000"); + let dedup_path = part_dir.join("dereplicated.skmer.zst"); + if !dedup_path.exists() { + return (0, 0); + } + count_partition(&part_dir, &dedup_path, K).unwrap(); + + let spec: serde_json::Value = serde_json::from_reader( + fs::File::open(part_dir.join("kmer_spectrum_raw.json")).unwrap(), + ).unwrap(); + let f0 = spec["f0"].as_u64().unwrap_or(0); + let f1 = spec["f1"].as_u64().unwrap_or(0); + (f0, f1) + } + + #[test] + fn single_sequence_f0_f1_match() { + let seqs: &[&[u8]] = &[b"ACGTACGTACGTACGTACGT"]; + let (ef0, ef1) = direct_counts(seqs); + let (gf0, gf1) = pipeline_counts(seqs); + assert_eq!(gf0, ef0, "f0 wrong: expected {ef0}, got {gf0}"); + assert_eq!(gf1, ef1, "f1 wrong: expected {ef1}, got {gf1}"); + } + + #[test] + fn two_sequences_f0_f1_match() { + let seqs: &[&[u8]] = &[ + b"ACGTACGTACGTACGTACGT", + b"TGCATGCATGCATGCATGCA", + ]; + let (ef0, ef1) = direct_counts(seqs); + let (gf0, gf1) = pipeline_counts(seqs); + assert_eq!(gf0, ef0, "f0 wrong: expected {ef0}, got {gf0}"); + assert_eq!(gf1, ef1, "f1 wrong: expected {ef1}, got {gf1}"); + } + + #[test] + fn repeated_sequence_f1_doubles() { + let seq = b"ACGTACGTACGTACGTACGT"; + let seqs: &[&[u8]] = &[seq, seq]; + let (ef0, ef1) = direct_counts(seqs); + let (gf0, gf1) = pipeline_counts(seqs); + assert_eq!(gf0, ef0, "f0 wrong: expected {ef0}, got {gf0}"); + assert_eq!(gf1, ef1, "f1 wrong: expected {ef1}, got {gf1}"); + } + + #[test] + fn many_sequences_f0_f1_match() { + // 20 distinct sequences of length 40 — forces multiple super-kmers and + // multiple minimizer boundaries per sequence. + let bases = b"ACGT"; + let seqs: Vec> = (0..20u32) + .map(|i| (0..40).map(|j| bases[((i * 7 + j * 3) % 4) as usize]).collect()) + .collect(); + let seq_refs: Vec<&[u8]> = seqs.iter().map(|v| v.as_slice()).collect(); + let (ef0, ef1) = direct_counts(&seq_refs); + let (gf0, gf1) = pipeline_counts(&seq_refs); + assert_eq!(gf0, ef0, "f0 wrong: expected {ef0}, got {gf0}"); + assert_eq!(gf1, ef1, "f1 wrong: expected {ef1}, got {gf1}"); + } +} diff --git a/src/obikseq/src/lib.rs b/src/obikseq/src/lib.rs index aaa7a7b..79625d1 100644 --- a/src/obikseq/src/lib.rs +++ b/src/obikseq/src/lib.rs @@ -21,6 +21,7 @@ pub mod unitig; pub use annotations::Annotation; pub use kmer::{CanonicalKmer, Kmer, Minimizer, hash_kmer}; +pub use packed_seq::MAX_KMERS_PER_CHUNK; pub use params::{k, m, set_k, set_m}; pub use routable::RoutableSuperKmer; pub use sequence::Sequence; diff --git a/src/obikseq/src/packed_seq.rs b/src/obikseq/src/packed_seq.rs index 73d0ebc..82ee5f6 100644 --- a/src/obikseq/src/packed_seq.rs +++ b/src/obikseq/src/packed_seq.rs @@ -22,6 +22,9 @@ use crate::kmer::{CanonicalKmer, Kmer, KmerError, KLen, KmerLength, KmerOf, MLen use crate::params::k; use crate::revcomp_lookup::REVCOMP4; +/// Maximum kmers per stored chunk. Enforces the u8 max-kmer-index field in the binary format. +pub const MAX_KMERS_PER_CHUNK: usize = 256; + // ── PackedSeq ───────────────────────────────────────────────────────────────── /// 2-bit packed DNA sequence of arbitrary length ≥ 1. @@ -229,22 +232,36 @@ impl PackedSeq { self.iter_kmers().map(|km| km.canonical()) } - /// Serialise to a compact binary representation. + /// Extract nucleotides `[start, end)` as a new [`PackedSeq`]. Allocates. + pub fn sub(&self, start: usize, end: usize) -> Self { + debug_assert!(end > start && end <= self.seql()); + let nucs: Vec = (start..end).map(|i| self.nucleotide(i)).collect(); + Self::from_nucleotides(&nucs) + } + + /// Serialise one chunk to binary. /// - /// Format: varint(seql) followed by raw packed bytes. - /// `tail` and `byte_len` are both derivable from `seql` and need not be stored. + /// Format: `[u8: n_kmers−1][packed bytes]`. + /// The caller must ensure `seql ≥ k` and `seql − k + 1 ≤ MAX_KMERS_PER_CHUNK`. + /// Use [`SuperKmer::write_to_binary`] for sequences that may exceed one chunk. pub fn write_to_binary(&self, w: &mut W) -> io::Result<()> { - write_varint(w, self.seql() as u64)?; + let k = crate::params::k(); + let seql = self.seql(); + debug_assert!(seql >= k, "sequence shorter than k"); + debug_assert!( + seql - k + 1 <= MAX_KMERS_PER_CHUNK, + "chunk exceeds MAX_KMERS_PER_CHUNK; split before calling write_to_binary" + ); + w.write_all(&[(seql - k) as u8])?; w.write_all(&self.seq) } - /// Deserialise from the compact binary format produced by [`write_to_binary`]. + /// Deserialise one chunk from the binary format produced by [`write_to_binary`]. /// Allocates exactly one `Box<[u8]>` for the packed bytes. pub fn read_from_binary(r: &mut R) -> io::Result { - let seql = read_varint(r)? as usize; - if seql == 0 { - return Err(io::Error::new(io::ErrorKind::InvalidData, "empty sequence")); - } + let mut buf = [0u8; 1]; + r.read_exact(&mut buf)?; + let seql = buf[0] as usize + crate::params::k(); let byte_len = (seql + 3) / 4; let tail = (seql % 4) as u8; let mut seq = vec![0u8; byte_len]; diff --git a/src/obikseq/src/superkmer.rs b/src/obikseq/src/superkmer.rs index 944fbef..1e8c9b5 100644 --- a/src/obikseq/src/superkmer.rs +++ b/src/obikseq/src/superkmer.rs @@ -12,7 +12,7 @@ use xxhash_rust::xxh3::xxh3_64; use crate::Annotation; use crate::Sequence; use crate::kmer::{CanonicalKmer, Kmer, KmerError}; -use crate::packed_seq::{PackedSeq, read_varint, write_varint}; +use crate::packed_seq::{MAX_KMERS_PER_CHUNK, PackedSeq, read_varint, write_varint}; // ── SKAnnotation ────────────────────────────────────────────────────────────── @@ -91,13 +91,37 @@ impl SuperKmer { Self { count: 1, inner } } - /// Serialise to compact binary. Format: varint(count) + varint((byte_len << 2) | tail) + bytes. + /// Serialise to compact binary: `[varint(count)][u8: n_kmers−1][packed bytes]` per chunk. + /// + /// Sequences with more than [`MAX_KMERS_PER_CHUNK`] kmers are transparently split into + /// overlapping chunks (k−1 nucleotide overlap, same count per chunk). Each chunk is an + /// independent, self-contained record — one [`read_from_binary`] call reads exactly one. pub fn write_to_binary(&self, w: &mut W) -> io::Result<()> { - write_varint(w, self.count as u64)?; - self.inner.write_to_binary(w) + let k = crate::params::k(); + let seql = self.seql(); + debug_assert!(seql >= k, "super-kmer shorter than k"); + let n_kmers = seql - k + 1; + if n_kmers <= MAX_KMERS_PER_CHUNK { + write_varint(w, self.count as u64)?; + self.inner.write_to_binary(w) + } else { + let chunk_nucl = MAX_KMERS_PER_CHUNK + k - 1; + let stride = MAX_KMERS_PER_CHUNK; + let mut start = 0; + loop { + let end = (start + chunk_nucl).min(seql); + let mut chunk = self.inner.sub(start, end); + chunk.canonicalize(); + write_varint(w, self.count as u64)?; + chunk.write_to_binary(w)?; + if end == seql { break; } + start += stride; + } + Ok(()) + } } - /// Deserialise from the binary format produced by [`write_to_binary`]. + /// Deserialise one chunk from the binary format produced by [`write_to_binary`]. /// Allocates exactly one `Box<[u8]>` for the packed bytes. pub fn read_from_binary(r: &mut R) -> io::Result { let count = read_varint(r)? as u32; diff --git a/src/obikseq/src/tests/superkmer.rs b/src/obikseq/src/tests/superkmer.rs index 8d8562c..1993028 100644 --- a/src/obikseq/src/tests/superkmer.rs +++ b/src/obikseq/src/tests/superkmer.rs @@ -67,27 +67,57 @@ fn seql_roundtrip() { // ── binary serialisation ────────────────────────────────────────────────────── +fn binary_test_lengths(k: usize) -> Vec { + use crate::packed_seq::MAX_KMERS_PER_CHUNK; + // Only single-chunk lengths: seql in [k, MAX_KMERS_PER_CHUNK+k-1]. + (k..=k + 5).chain([255, 256, 257, MAX_KMERS_PER_CHUNK + k - 1]).collect() +} + #[test] fn binary_roundtrip() { - for len in all_lengths() { + set_k(4); + let k = crate::params::k(); + for len in binary_test_lengths(k) { let mut sk = SuperKmer::from_ascii(&make_seq(len)); sk.set_count(42); let mut buf = Vec::new(); sk.write_to_binary(&mut buf).unwrap(); let sk2 = SuperKmer::read_from_binary(&mut buf.as_slice()).unwrap(); - assert_eq!( - sk.to_ascii(), - sk2.to_ascii(), - "sequence mismatch for len={len}" - ); + assert_eq!(sk.to_ascii(), sk2.to_ascii(), "sequence mismatch for len={len}"); assert_eq!(sk2.count(), 42, "count mismatch for len={len}"); } } +#[test] +fn binary_split_roundtrip() { + // A super-kmer > MAX_KMERS_PER_CHUNK kmers is split into multiple records on write. + use crate::packed_seq::MAX_KMERS_PER_CHUNK; + set_k(4); + let k = crate::params::k(); + // seql = MAX_KMERS_PER_CHUNK + k = 260 → n_kmers = 257 > 256 → 2 chunks + let seql = MAX_KMERS_PER_CHUNK + k; + let mut sk = SuperKmer::from_ascii(&make_seq(seql)); + sk.set_count(7); + let mut buf = Vec::new(); + sk.write_to_binary(&mut buf).unwrap(); + // Read all records back. + let mut slice = buf.as_slice(); + let chunk0 = SuperKmer::read_from_binary(&mut slice).unwrap(); + let chunk1 = SuperKmer::read_from_binary(&mut slice).unwrap(); + assert!(slice.is_empty(), "unexpected trailing bytes"); + assert_eq!(chunk0.count(), 7); + assert_eq!(chunk1.count(), 7); + // Chunks cover the original sequence with k-1 overlap — no kmer lost. + assert_eq!(chunk0.seql(), MAX_KMERS_PER_CHUNK + k - 1); // 259 + assert_eq!(chunk1.seql(), k); // 4 (1 kmer) +} + #[test] fn binary_packed_seq_roundtrip() { use crate::packed_seq::PackedSeq; - for len in all_lengths() { + set_k(4); + let k = crate::params::k(); + for len in binary_test_lengths(k) { let ps = PackedSeq::from_ascii(&make_seq(len)); let mut buf = Vec::new(); ps.write_to_binary(&mut buf).unwrap(); @@ -98,7 +128,8 @@ fn binary_packed_seq_roundtrip() { #[test] fn binary_size_is_compact() { - // seql=4 (1 byte packed): varint(count=1, 1 byte) + varint((1<<2)|0=4, 1 byte) + 1 byte = 3 bytes + // ACGT with k=4: varint(count=1, 1 byte) + u8(n_kmers-1=0, 1 byte) + 1 packed byte = 3 bytes + set_k(4); let sk = SuperKmer::from_ascii(b"ACGT"); let mut buf = Vec::new(); sk.write_to_binary(&mut buf).unwrap(); diff --git a/src/obikseq/src/tests/unitig.rs b/src/obikseq/src/tests/unitig.rs index 7e246c9..54dddbf 100644 --- a/src/obikseq/src/tests/unitig.rs +++ b/src/obikseq/src/tests/unitig.rs @@ -160,7 +160,12 @@ mod tests { #[test] fn binary_roundtrip_all_lengths() { - for len in test_lengths() { + // write_to_binary encodes a single chunk: seql must be in [k, MAX_KMERS_PER_CHUNK+k-1]. + use crate::packed_seq::MAX_KMERS_PER_CHUNK; + set_k(4); + let k = crate::params::k(); + let valid_lengths: Vec = (k..=9).chain([255, 256, 257, MAX_KMERS_PER_CHUNK + k - 1]).collect(); + for len in valid_lengths { let u = Unitig::from_ascii(&make_seq(len)); let mut buf = Vec::new(); u.write_to_binary(&mut buf).unwrap(); diff --git a/src/obiskbuilder/src/iter.rs b/src/obiskbuilder/src/iter.rs index 1505203..f874349 100644 --- a/src/obiskbuilder/src/iter.rs +++ b/src/obiskbuilder/src/iter.rs @@ -96,7 +96,7 @@ impl Iterator for SuperKmerIter<'_> { } // ── 1. Entropy check ───────────────────────────────────────────── - if self.stat.normalized_entropy().unwrap_or(1.0) <= self.theta { + if self.stat.normalized_entropy().unwrap_or(1.0) < self.theta { let result = self.try_emit(); self.cursor.rewind(self.k - 1).ok(); self.reset(); @@ -168,6 +168,70 @@ mod tests { // k=11, m=5 — valeurs minimales du projet (k ∈ [11,31]) const K: usize = 11; + /// Collect the set of canonical k-mers from a raw ASCII sequence (no NUL). + fn direct_canonical_kmers(seq: &[u8]) -> std::collections::HashSet> { + (0..seq.len().saturating_sub(K - 1)) + .map(|i| obikseq::SuperKmer::from_ascii(&seq[i..i + K]).to_ascii()) + .collect() + } + + /// Collect the set of canonical k-mers emitted by SuperKmerIter over a rope. + fn iter_canonical_kmers(rope: &Rope) -> std::collections::HashSet> { + SuperKmerIter::new(rope, K, 1, 0.0) + .flat_map(|rsk| { + rsk.superkmer() + .iter_canonical_kmers() + .map(|km| km.to_ascii()) + .collect::>() + }) + .collect() + } + + #[test] + fn coverage_single_segment() { + setup(); + let seq = b"ACGTACGTACGTACGTACGT"; + let rope = make_rope(&[seq.as_ref(), b"\x00"].concat()); + let direct = direct_canonical_kmers(seq); + let from_iter = iter_canonical_kmers(&rope); + let missing: Vec<_> = direct.difference(&from_iter).collect(); + assert!( + missing.is_empty(), + "k-mers perdus dans segment unique : {missing:?}" + ); + } + + #[test] + fn coverage_two_segments() { + setup(); + let seg1 = b"ACGTACGTACGTACGTACGT"; + let seg2 = b"TGCATGCATGCATGCATGCA"; + let rope = make_rope(&[seg1.as_ref(), b"\x00", seg2.as_ref(), b"\x00"].concat()); + let mut direct = direct_canonical_kmers(seg1); + direct.extend(direct_canonical_kmers(seg2)); + let from_iter = iter_canonical_kmers(&rope); + let missing: Vec<_> = direct.difference(&from_iter).collect(); + assert!( + missing.is_empty(), + "k-mers perdus dans deux segments : {missing:?}" + ); + } + + #[test] + fn coverage_minimizer_boundary() { + setup(); + // sequence assez longue pour forcer plusieurs changements de minimiseur + let seq: Vec = (0..80).map(|i| b"ACGT"[i % 4]).collect(); + let rope = make_rope(&[seq.as_slice(), b"\x00"].concat()); + let direct = direct_canonical_kmers(&seq); + let from_iter = iter_canonical_kmers(&rope); + let missing: Vec<_> = direct.difference(&from_iter).collect(); + assert!( + missing.is_empty(), + "k-mers perdus à la frontière de minimiseur : {missing:?}" + ); + } + #[test] fn single_segment_one_superkmer() { setup(); diff --git a/src/obiskio/Cargo.toml b/src/obiskio/Cargo.toml index 4d60b3f..5a04f40 100644 --- a/src/obiskio/Cargo.toml +++ b/src/obiskio/Cargo.toml @@ -10,6 +10,7 @@ lru = "0.12" serde = { version = "1", features = ["derive"] } serde_json = "1" +memmap2 = "0.9" obikseq = { path = "../obikseq" } [dev-dependencies] diff --git a/src/obiskio/src/codec.rs b/src/obiskio/src/codec.rs index b2b5b44..1a0d065 100644 --- a/src/obiskio/src/codec.rs +++ b/src/obiskio/src/codec.rs @@ -17,63 +17,5 @@ pub(crate) fn read_superkmer(r: &mut R) -> io::Result } #[cfg(test)] -mod tests { - use super::*; - use std::io::Cursor; - - fn make_sk(ascii: &[u8]) -> SuperKmer { - SuperKmer::from_ascii(ascii) - } - - #[test] - fn roundtrip_single() { - let sk = make_sk(b"ACGTACGT"); - let mut buf = Vec::new(); - write_superkmer(&mut buf, &sk).unwrap(); - - let mut cur = Cursor::new(&buf); - let got = read_superkmer(&mut cur).unwrap().unwrap(); - assert_eq!(sk.to_ascii(), got.to_ascii()); - assert_eq!(sk.seql(), got.seql()); - } - - #[test] - fn roundtrip_all_lengths() { - let bases: Vec = (0..300).map(|i| b"ACGT"[i % 4]).collect(); - let k = 11; - for len in (k..=k + 8).chain([255, 256, 257]) { - let sk = make_sk(&bases[..len]); - let mut buf = Vec::new(); - write_superkmer(&mut buf, &sk).unwrap(); - - let mut cur = Cursor::new(&buf); - let got = read_superkmer(&mut cur).unwrap().unwrap(); - assert_eq!(sk.to_ascii(), got.to_ascii(), "len={len}"); - assert_eq!(sk.seql(), got.seql(), "len={len}"); - } - } - - #[test] - fn eof_returns_none() { - let buf: Vec = vec![]; - let mut cur = Cursor::new(&buf); - assert!(read_superkmer(&mut cur).unwrap().is_none()); - } - - #[test] - fn multiple_records() { - let seqs: &[&[u8]] = &[b"AAAA", b"CCCC", b"GGGG", b"TTTT"]; - let mut buf = Vec::new(); - for s in seqs { - write_superkmer(&mut buf, &make_sk(s)).unwrap(); - } - - let mut cur = Cursor::new(&buf); - for s in seqs { - let got = read_superkmer(&mut cur).unwrap().unwrap(); - let expected = make_sk(s); - assert_eq!(expected.to_ascii(), got.to_ascii()); - } - assert!(read_superkmer(&mut cur).unwrap().is_none()); - } -} +#[path = "tests/codec.rs"] +mod tests; diff --git a/src/obiskio/src/lib.rs b/src/obiskio/src/lib.rs index 6af659b..b6f5844 100644 --- a/src/obiskio/src/lib.rs +++ b/src/obiskio/src/lib.rs @@ -4,8 +4,10 @@ pub mod limits; pub mod meta; pub mod pool; pub mod reader; +pub mod unitig_index; pub use error::{SKError, SKResult}; pub use meta::SKFileMeta; pub use pool::{create_token, create_token_with, SKFilePool, SharedPool, SKFileWriter}; pub use reader::{SKFileIter, SKFileReader}; +pub use unitig_index::{UnitigFileReader, UnitigFileWriter}; diff --git a/src/obiskio/src/pool.rs b/src/obiskio/src/pool.rs index 54c3d9c..6b24aa1 100644 --- a/src/obiskio/src/pool.rs +++ b/src/obiskio/src/pool.rs @@ -428,229 +428,5 @@ impl Drop for SKFileWriter { // ── tests ────────────────────────────────────────────────────────────────────── #[cfg(test)] -mod tests { - use super::*; - use crate::reader::SKFileReader; - use obikseq::{SuperKmer, set_k}; - use tempfile::{NamedTempFile, TempDir}; - - const TEST_K: usize = 4; - - fn make_sk(seed: usize) -> SuperKmer { - let bases: Vec = (0..8).map(|j| b"ACGT"[(seed + j) % 4]).collect(); - SuperKmer::from_ascii(&bases) - } - - fn pool(max_open: usize) -> SharedPool { - Arc::new(Mutex::new(SKFilePool::new(max_open))) - } - - fn open_token(t: &mut SKFileWriter, sk: &SuperKmer) { - t.set_flush_threshold(1); - t.write(sk).unwrap(); // pending ≥ 1 → drain → fd opened - } - - #[test] - fn creation_holds_no_fd() { - set_k(TEST_K); - let dir = TempDir::new().unwrap(); - let p = pool(3); - for i in 0..10 { - create_token(&p, dir.path().join(format!("p{i}.zst"))).unwrap(); - } - assert_eq!(p.lock().unwrap().open_count(), 0); - } - - #[test] - fn pool_limits_open_fds() { - set_k(TEST_K); - let dir = TempDir::new().unwrap(); - let p = pool(3); - let sk = make_sk(0); - - let mut tokens: Vec = (0..6) - .map(|i| create_token(&p, dir.path().join(format!("p{i}.zst"))).unwrap()) - .collect(); - - for t in tokens.iter_mut() { - open_token(t, &sk); - } - - assert!( - p.lock().unwrap().open_count() <= 3, - "open={}", - p.lock().unwrap().open_count() - ); - } - - #[test] - fn evicted_token_stays_logically_open() { - set_k(TEST_K); - let dir = TempDir::new().unwrap(); - let p = pool(1); - let sk = make_sk(0); - - let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap(); - let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap(); - - open_token(&mut t0, &sk); // t0 fd open, pool full - open_token(&mut t1, &sk); // evicts t0, t1 fd open - - assert!(t0.is_open(), "t0 must remain logically open after eviction"); - assert_eq!(p.lock().unwrap().open_count(), 1); - } - - #[test] - fn evicted_data_readable_after_close_all() { - set_k(TEST_K); - let dir = TempDir::new().unwrap(); - let p = pool(1); - let sk = make_sk(0); - - let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap(); - let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap(); - - t0.set_flush_threshold(1); - t0.write(&sk).unwrap(); // t0 fd open, pool full - t1.set_flush_threshold(1); - t1.write(&sk).unwrap(); // evicts t0, t1 fd open - - // t0 still has the record in pending (eviction just closed fd, pending stays in token) - // Actually: t0's pending was drained before drain() returned (drain clears pending). - // So t0 wrote its record, then was evicted (fd closed). - - drop(t0); - drop(t1); - p.lock().unwrap().close_all().unwrap(); - - for name in &["a.zst", "b.zst"] { - let mut r = SKFileReader::open(dir.path().join(name)).unwrap(); - let got = r.read_batch(10).unwrap(); - assert_eq!(got.len(), 1, "{name}: expected 1 record"); - } - } - - #[test] - fn touch_moves_to_mru_so_lru_is_evicted() { - set_k(TEST_K); - let dir = TempDir::new().unwrap(); - let p = pool(2); - let sk = make_sk(0); - - let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap(); - let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap(); - let mut t2 = create_token(&p, dir.path().join("c.zst")).unwrap(); - - open_token(&mut t0, &sk); // t0 open - open_token(&mut t1, &sk); // t1 open, t0 LRU - - // Write to t0 again → t0 becomes MRU, t1 becomes LRU - t0.set_flush_threshold(1); - t0.write(&sk).unwrap(); - - // Writing to t2 fills pool (cap=2) → evicts LRU = t1 - open_token(&mut t2, &sk); - - let open_count = p.lock().unwrap().open_count(); - assert!(open_count <= 2, "open_count={open_count}"); - } - - #[test] - fn close_all_produces_readable_files() { - set_k(TEST_K); - let dir = TempDir::new().unwrap(); - let p = pool(8); - let paths: Vec<_> = (0..4) - .map(|i| dir.path().join(format!("{i}.zst"))) - .collect(); - - let mut tokens: Vec = paths - .iter() - .map(|path| create_token(&p, path.clone()).unwrap()) - .collect(); - - for (i, t) in tokens.iter_mut().enumerate() { - t.write(&make_sk(i)).unwrap(); - } - // close tokens first so pending bytes are flushed and Zstd frames finalized - for t in tokens.iter_mut() { - t.close().unwrap(); - } - p.lock().unwrap().close_all().unwrap(); - - for path in &paths { - let mut r = SKFileReader::open(path).unwrap(); - let got = r.read_batch(10).unwrap(); - assert_eq!(got.len(), 1); - } - } - - #[test] - fn write_batch_roundtrip() { - set_k(TEST_K); - let dir = TempDir::new().unwrap(); - let p = pool(4); - let sks: Vec<_> = (0..50).map(make_sk).collect(); - let path = dir.path().join("batch.zst"); - - let mut t = create_token(&p, path.clone()).unwrap(); - t.write_batch(&sks).unwrap(); - t.close().unwrap(); - - let mut r = SKFileReader::open(&path).unwrap(); - let got = r.read_batch(100).unwrap(); - assert_eq!(got.len(), 50); - for (a, b) in sks.iter().zip(got.iter()) { - assert_eq!(a.to_ascii(), b.to_ascii()); - } - } - - #[test] - fn from_system_limits_bounded() { - set_k(TEST_K); - let pool = SKFilePool::from_system_limits(); - assert!(pool.max_open() >= 16); - assert!(pool.max_open() <= MAX_POOL_SIZE); - } - - #[test] - fn standalone_roundtrip_zstd() { - set_k(TEST_K); - let tmp = NamedTempFile::new().unwrap(); - let sks: Vec<_> = (0..100).map(make_sk).collect(); - { - let mut w = SKFileWriter::create(tmp.path()).unwrap(); - w.write_batch(&sks).unwrap(); - w.close().unwrap(); - } - let mut r = SKFileReader::open(tmp.path()).unwrap(); - let got = r.read_batch(200).unwrap(); - assert_eq!(got.len(), 100); - for (a, b) in sks.iter().zip(got.iter()) { - assert_eq!(a.to_ascii(), b.to_ascii()); - } - } - - #[test] - fn standalone_close_prevents_write() { - set_k(TEST_K); - let tmp = NamedTempFile::new().unwrap(); - let mut w = SKFileWriter::create(tmp.path()).unwrap(); - w.close().unwrap(); - assert!(!w.is_open()); - assert!(w.write(&make_sk(0)).is_err()); - } - - #[test] - fn standalone_is_physically_open() { - set_k(TEST_K); - let tmp = NamedTempFile::new().unwrap(); - let mut w = SKFileWriter::create(tmp.path()).unwrap(); - assert!(!w.is_physically_open()); // fd deferred until first drain - w.set_flush_threshold(1); - w.write(&make_sk(0)).unwrap(); // triggers drain → fd opened - assert!(w.is_physically_open()); - w.close().unwrap(); - assert!(!w.is_physically_open()); - } -} +#[path = "tests/pool.rs"] +mod tests; diff --git a/src/obiskio/src/reader.rs b/src/obiskio/src/reader.rs index d523ab8..139c020 100644 --- a/src/obiskio/src/reader.rs +++ b/src/obiskio/src/reader.rs @@ -143,70 +143,5 @@ impl Iterator for SKFileIter<'_> { } #[cfg(test)] -mod tests { - use super::*; - use crate::pool::SKFileWriter; - use tempfile::NamedTempFile; - - const TEST_K: usize = 4; // test sequences are 8 bases; k=4 gives n_kmers=5 - - fn setup() { - obikseq::params::set_k(TEST_K); - } - - fn make_sks(n: usize) -> Vec { - (0..n) - .map(|i| { - let bases: Vec = (0..8).map(|j| b"ACGT"[(i + j) % 4]).collect(); - SuperKmer::from_ascii(&bases) - }) - .collect() - } - - #[test] - fn iter_all() { - setup(); - let tmp = NamedTempFile::new().unwrap(); - let sks = make_sks(50); - - { - let mut w = SKFileWriter::create(tmp.path()).unwrap(); - w.write_batch(&sks).unwrap(); - } - - let mut r = SKFileReader::open(tmp.path()).unwrap(); - let got: Vec<_> = r.iter().collect(); - assert_eq!(got.len(), 50); - for (a, b) in sks.iter().zip(got.iter()) { - assert_eq!(a.to_ascii(), b.to_ascii()); - } - } - - #[test] - fn reopen_and_seek() { - setup(); - let tmp = NamedTempFile::new().unwrap(); - let sks = make_sks(20); - - { - let mut w = SKFileWriter::create(tmp.path()).unwrap(); - w.write_batch(&sks).unwrap(); - } - - let mut r = SKFileReader::open(tmp.path()).unwrap(); - // Read 10, then simulate pool eviction + re-access - let first = r.read_batch(10).unwrap(); - r.close(); - r.reopen_and_seek().unwrap(); - // Continue from position 10 - let rest = r.read_batch(20).unwrap(); - assert_eq!(first.len(), 10); - assert_eq!(rest.len(), 10); - for (a, b) in sks[..10].iter().zip(first.iter()) { - assert_eq!(a.to_ascii(), b.to_ascii()); - } - for (a, b) in sks[10..].iter().zip(rest.iter()) { - assert_eq!(a.to_ascii(), b.to_ascii()); - } - } -} +#[path = "tests/reader.rs"] +mod tests; diff --git a/src/obiskio/src/tests/codec.rs b/src/obiskio/src/tests/codec.rs new file mode 100644 index 0000000..788c4b1 --- /dev/null +++ b/src/obiskio/src/tests/codec.rs @@ -0,0 +1,64 @@ +use super::*; +use obikseq::set_k; +use std::io::Cursor; + +fn make_sk(ascii: &[u8]) -> SuperKmer { + SuperKmer::from_ascii(ascii) +} + +#[test] +fn roundtrip_single() { + set_k(4); + let sk = make_sk(b"ACGTACGT"); + let mut buf = Vec::new(); + write_superkmer(&mut buf, &sk).unwrap(); + + let mut cur = Cursor::new(&buf); + let got = read_superkmer(&mut cur).unwrap().unwrap(); + assert_eq!(sk.to_ascii(), got.to_ascii()); + assert_eq!(sk.seql(), got.seql()); +} + +#[test] +fn roundtrip_all_lengths() { + set_k(11); + let k: usize = 11; + let bases: Vec = (0..300).map(|i| b"ACGT"[i % 4]).collect(); + // With k=11, seql=257 → n_kmers=247 ≤ 256: single chunk, no split. + for len in (k..=k + 8).chain([255, 256, 257]) { + let sk = make_sk(&bases[..len]); + let mut buf = Vec::new(); + write_superkmer(&mut buf, &sk).unwrap(); + + let mut cur = Cursor::new(&buf); + let got = read_superkmer(&mut cur).unwrap().unwrap(); + assert_eq!(sk.to_ascii(), got.to_ascii(), "len={len}"); + assert_eq!(sk.seql(), got.seql(), "len={len}"); + } +} + +#[test] +fn eof_returns_none() { + set_k(4); + let buf: Vec = vec![]; + let mut cur = Cursor::new(&buf); + assert!(read_superkmer(&mut cur).unwrap().is_none()); +} + +#[test] +fn multiple_records() { + set_k(4); + let seqs: &[&[u8]] = &[b"AAAA", b"CCCC", b"GGGG", b"TTTT"]; + let mut buf = Vec::new(); + for s in seqs { + write_superkmer(&mut buf, &make_sk(s)).unwrap(); + } + + let mut cur = Cursor::new(&buf); + for s in seqs { + let got = read_superkmer(&mut cur).unwrap().unwrap(); + let expected = make_sk(s); + assert_eq!(expected.to_ascii(), got.to_ascii()); + } + assert!(read_superkmer(&mut cur).unwrap().is_none()); +} diff --git a/src/obiskio/src/tests/pool.rs b/src/obiskio/src/tests/pool.rs new file mode 100644 index 0000000..4025776 --- /dev/null +++ b/src/obiskio/src/tests/pool.rs @@ -0,0 +1,217 @@ +use super::*; +use crate::reader::SKFileReader; +use obikseq::{SuperKmer, set_k}; +use tempfile::{NamedTempFile, TempDir}; + +const TEST_K: usize = 4; + +fn make_sk(seed: usize) -> SuperKmer { + let bases: Vec = (0..8).map(|j| b"ACGT"[(seed + j) % 4]).collect(); + SuperKmer::from_ascii(&bases) +} + +fn pool(max_open: usize) -> SharedPool { + Arc::new(Mutex::new(SKFilePool::new(max_open))) +} + +fn open_token(t: &mut SKFileWriter, sk: &SuperKmer) { + t.set_flush_threshold(1); + t.write(sk).unwrap(); +} + +#[test] +fn creation_holds_no_fd() { + set_k(TEST_K); + let dir = TempDir::new().unwrap(); + let p = pool(3); + for i in 0..10 { + create_token(&p, dir.path().join(format!("p{i}.zst"))).unwrap(); + } + assert_eq!(p.lock().unwrap().open_count(), 0); +} + +#[test] +fn pool_limits_open_fds() { + set_k(TEST_K); + let dir = TempDir::new().unwrap(); + let p = pool(3); + let sk = make_sk(0); + + let mut tokens: Vec = (0..6) + .map(|i| create_token(&p, dir.path().join(format!("p{i}.zst"))).unwrap()) + .collect(); + + for t in tokens.iter_mut() { + open_token(t, &sk); + } + + assert!( + p.lock().unwrap().open_count() <= 3, + "open={}", + p.lock().unwrap().open_count() + ); +} + +#[test] +fn evicted_token_stays_logically_open() { + set_k(TEST_K); + let dir = TempDir::new().unwrap(); + let p = pool(1); + let sk = make_sk(0); + + let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap(); + let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap(); + + open_token(&mut t0, &sk); + open_token(&mut t1, &sk); + + assert!(t0.is_open(), "t0 must remain logically open after eviction"); + assert_eq!(p.lock().unwrap().open_count(), 1); +} + +#[test] +fn evicted_data_readable_after_close_all() { + set_k(TEST_K); + let dir = TempDir::new().unwrap(); + let p = pool(1); + let sk = make_sk(0); + + let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap(); + let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap(); + + t0.set_flush_threshold(1); + t0.write(&sk).unwrap(); + t1.set_flush_threshold(1); + t1.write(&sk).unwrap(); + + drop(t0); + drop(t1); + p.lock().unwrap().close_all().unwrap(); + + for name in &["a.zst", "b.zst"] { + let mut r = SKFileReader::open(dir.path().join(name)).unwrap(); + let got = r.read_batch(10).unwrap(); + assert_eq!(got.len(), 1, "{name}: expected 1 record"); + } +} + +#[test] +fn touch_moves_to_mru_so_lru_is_evicted() { + set_k(TEST_K); + let dir = TempDir::new().unwrap(); + let p = pool(2); + let sk = make_sk(0); + + let mut t0 = create_token(&p, dir.path().join("a.zst")).unwrap(); + let mut t1 = create_token(&p, dir.path().join("b.zst")).unwrap(); + let mut t2 = create_token(&p, dir.path().join("c.zst")).unwrap(); + + open_token(&mut t0, &sk); + open_token(&mut t1, &sk); + + t0.set_flush_threshold(1); + t0.write(&sk).unwrap(); + + open_token(&mut t2, &sk); + + let open_count = p.lock().unwrap().open_count(); + assert!(open_count <= 2, "open_count={open_count}"); +} + +#[test] +fn close_all_produces_readable_files() { + set_k(TEST_K); + let dir = TempDir::new().unwrap(); + let p = pool(8); + let paths: Vec<_> = (0..4) + .map(|i| dir.path().join(format!("{i}.zst"))) + .collect(); + + let mut tokens: Vec = paths + .iter() + .map(|path| create_token(&p, path.clone()).unwrap()) + .collect(); + + for (i, t) in tokens.iter_mut().enumerate() { + t.write(&make_sk(i)).unwrap(); + } + for t in tokens.iter_mut() { + t.close().unwrap(); + } + p.lock().unwrap().close_all().unwrap(); + + for path in &paths { + let mut r = SKFileReader::open(path).unwrap(); + let got = r.read_batch(10).unwrap(); + assert_eq!(got.len(), 1); + } +} + +#[test] +fn write_batch_roundtrip() { + set_k(TEST_K); + let dir = TempDir::new().unwrap(); + let p = pool(4); + let sks: Vec<_> = (0..50).map(make_sk).collect(); + let path = dir.path().join("batch.zst"); + + let mut t = create_token(&p, path.clone()).unwrap(); + t.write_batch(&sks).unwrap(); + t.close().unwrap(); + + let mut r = SKFileReader::open(&path).unwrap(); + let got = r.read_batch(100).unwrap(); + assert_eq!(got.len(), 50); + for (a, b) in sks.iter().zip(got.iter()) { + assert_eq!(a.to_ascii(), b.to_ascii()); + } +} + +#[test] +fn from_system_limits_bounded() { + set_k(TEST_K); + let pool = SKFilePool::from_system_limits(); + assert!(pool.max_open() >= 16); + assert!(pool.max_open() <= MAX_POOL_SIZE); +} + +#[test] +fn standalone_roundtrip_zstd() { + set_k(TEST_K); + let tmp = NamedTempFile::new().unwrap(); + let sks: Vec<_> = (0..100).map(make_sk).collect(); + { + let mut w = SKFileWriter::create(tmp.path()).unwrap(); + w.write_batch(&sks).unwrap(); + w.close().unwrap(); + } + let mut r = SKFileReader::open(tmp.path()).unwrap(); + let got = r.read_batch(200).unwrap(); + assert_eq!(got.len(), 100); + for (a, b) in sks.iter().zip(got.iter()) { + assert_eq!(a.to_ascii(), b.to_ascii()); + } +} + +#[test] +fn standalone_close_prevents_write() { + set_k(TEST_K); + let tmp = NamedTempFile::new().unwrap(); + let mut w = SKFileWriter::create(tmp.path()).unwrap(); + w.close().unwrap(); + assert!(!w.is_open()); + assert!(w.write(&make_sk(0)).is_err()); +} + +#[test] +fn standalone_is_physically_open() { + set_k(TEST_K); + let tmp = NamedTempFile::new().unwrap(); + let mut w = SKFileWriter::create(tmp.path()).unwrap(); + assert!(!w.is_physically_open()); + w.set_flush_threshold(1); + w.write(&make_sk(0)).unwrap(); + assert!(w.is_physically_open()); + w.close().unwrap(); + assert!(!w.is_physically_open()); +} diff --git a/src/obiskio/src/tests/reader.rs b/src/obiskio/src/tests/reader.rs new file mode 100644 index 0000000..c8a8cdd --- /dev/null +++ b/src/obiskio/src/tests/reader.rs @@ -0,0 +1,63 @@ +use super::*; +use crate::pool::SKFileWriter; +use tempfile::NamedTempFile; + +const TEST_K: usize = 4; + +fn setup() { + obikseq::params::set_k(TEST_K); +} + +fn make_sks(n: usize) -> Vec { + (0..n) + .map(|i| { + let bases: Vec = (0..8).map(|j| b"ACGT"[(i + j) % 4]).collect(); + SuperKmer::from_ascii(&bases) + }) + .collect() +} + +#[test] +fn iter_all() { + setup(); + let tmp = NamedTempFile::new().unwrap(); + let sks = make_sks(50); + + { + let mut w = SKFileWriter::create(tmp.path()).unwrap(); + w.write_batch(&sks).unwrap(); + } + + let mut r = SKFileReader::open(tmp.path()).unwrap(); + let got: Vec<_> = r.iter().collect(); + assert_eq!(got.len(), 50); + for (a, b) in sks.iter().zip(got.iter()) { + assert_eq!(a.to_ascii(), b.to_ascii()); + } +} + +#[test] +fn reopen_and_seek() { + setup(); + let tmp = NamedTempFile::new().unwrap(); + let sks = make_sks(20); + + { + let mut w = SKFileWriter::create(tmp.path()).unwrap(); + w.write_batch(&sks).unwrap(); + } + + let mut r = SKFileReader::open(tmp.path()).unwrap(); + let first = r.read_batch(10).unwrap(); + r.close(); + r.reopen_and_seek().unwrap(); + let rest = r.read_batch(20).unwrap(); + assert_eq!(first.len(), 10); + assert_eq!(rest.len(), 10); + for (a, b) in sks[..10].iter().zip(first.iter()) { + assert_eq!(a.to_ascii(), b.to_ascii()); + } + for (a, b) in sks[10..].iter().zip(rest.iter()) { + assert_eq!(a.to_ascii(), b.to_ascii()); + } +} diff --git a/src/obiskio/src/tests/unitig_index.rs b/src/obiskio/src/tests/unitig_index.rs new file mode 100644 index 0000000..e9a8702 --- /dev/null +++ b/src/obiskio/src/tests/unitig_index.rs @@ -0,0 +1,169 @@ +use super::*; +use obikseq::{Kmer, Sequence as _, Unitig, set_k}; +use tempfile::tempdir; + +fn make_unitig(ascii: &[u8]) -> Unitig { + Unitig::from_ascii(ascii) +} + +fn canonical_of(ascii: &[u8]) -> CanonicalKmer { + Kmer::from_ascii(ascii).unwrap().canonical() +} + +fn write_read(seqs: &[&[u8]]) -> (tempfile::TempDir, UnitigFileReader) { + let dir = tempdir().unwrap(); + let path = dir.path().join("unitigs.bin"); + let mut w = UnitigFileWriter::create(&path).unwrap(); + for s in seqs { + w.write(&make_unitig(s)).unwrap(); + } + w.close().unwrap(); + let r = UnitigFileReader::open(&path).unwrap(); + (dir, r) +} + +// ── I/O round-trip ──────────────────────────────────────────────────────────── + +#[test] +fn roundtrip_empty_index() { + set_k(4); + let dir = tempdir().unwrap(); + let path = dir.path().join("unitigs.bin"); + let w = UnitigFileWriter::create(&path).unwrap(); + w.close().unwrap(); + let r = UnitigFileReader::open(&path).unwrap(); + assert_eq!(r.len(), 0); +} + +#[test] +fn roundtrip_unitigs() { + set_k(4); + let seqs: &[&[u8]] = &[b"ACGTACGT", b"TTTTCCCC", b"GGGAAA"]; + let (_dir, r) = write_read(seqs); + assert_eq!(r.len(), seqs.len()); + for (i, s) in seqs.iter().enumerate() { + assert_eq!(r.unitig(i), make_unitig(s), "unitig {i} mismatch"); + } +} + +// ── Bit extraction ──────────────────────────────────────────────────────────── + +#[test] +fn extract_kmer_raw_basic() { + // ACGT = 00 01 10 11 = 0x1B; k=4, j=0 → 0x1B << 56 + let bytes = [0x1Bu8]; + assert_eq!(extract_kmer_raw(&bytes, 0, 4), 0x1Bu64 << 56); +} + +#[test] +fn extract_kmer_raw_intra_byte_offset() { + // ACGT, j=1, k=3 → CGT = 01 10 11 = 0x1B (6 bits) → 0x1B << 58 + let bytes = [0x1Bu8]; + assert_eq!(extract_kmer_raw(&bytes, 1, 3), 0x1Bu64 << 58); +} + +#[test] +fn extract_kmer_raw_cross_byte() { + // Two bytes: ACGT | ACGT = [0x1B, 0x1B] + // j=3, k=4: nucleotides 3..7 = T A C G = 11 00 01 10 = 0b11000110 = 0xC6 + let bytes = [0x1Bu8, 0x1Bu8]; + assert_eq!(extract_kmer_raw(&bytes, 3, 4), 0xC6u64 << 56); +} + +// ── revcomp / canonical ─────────────────────────────────────────────────────── + +#[test] +fn revcomp_palindrome() { + // ACGT is its own reverse complement + let raw = 0x1Bu64 << 56; // ACGT, k=4 + assert_eq!(revcomp_raw(raw, 4), raw); +} + +#[test] +fn revcomp_asymmetric() { + // revcomp(TTTG) = CAAA + // TTTG = 11 11 11 10 = 0xFE → 0xFE << 56 + // CAAA = 01 00 00 00 = 0x40 → 0x40 << 56 + let tttg = 0xFEu64 << 56; + let caaa = 0x40u64 << 56; + assert_eq!(revcomp_raw(tttg, 4), caaa); + assert_eq!(revcomp_raw(caaa, 4), tttg); +} + +#[test] +fn canonical_raw_selects_minimum() { + let tttg = 0xFEu64 << 56; + let caaa = 0x40u64 << 56; + assert_eq!(canonical_raw(tttg, 4), caaa); // TTTG → canonical is CAAA + assert_eq!(canonical_raw(caaa, 4), caaa); // CAAA already canonical +} + +// ── verify_canonical_kmer ───────────────────────────────────────────────────── + +#[test] +fn verify_forward_canonical() { + // CAAA is canonical (< TTTG); stored forward in the unitig → direct match + set_k(4); + let (_dir, r) = write_read(&[b"CAAAACGT"]); + let query = canonical_of(b"CAAA"); + assert!(r.verify_canonical_kmer(0, 0, query)); +} + +#[test] +fn verify_reverse_complement_stored() { + // TTTG stored in the unitig; canonical form is CAAA + // verify must recognise the match despite the stored orientation being non-canonical + set_k(4); + let (_dir, r) = write_read(&[b"TTTGACGT"]); + let query = canonical_of(b"CAAA"); // == canonical_of(b"TTTG") + assert!(r.verify_canonical_kmer(0, 0, query)); +} + +#[test] +fn verify_wrong_kmer_returns_false() { + set_k(4); + let (_dir, r) = write_read(&[b"TTTGACGT"]); + let wrong = canonical_of(b"AAAC"); + assert!(!r.verify_canonical_kmer(0, 0, wrong)); +} + +#[test] +fn verify_second_unitig_second_position() { + // Two unitigs; check kmer at j=1 of unitig 1 ("TTTGACGT") + // j=1 → nucleotides 1..5 = TTGA + set_k(4); + let (_dir, r) = write_read(&[b"ACGTACGT", b"TTTGACGT"]); + let query = canonical_of(b"TTGA"); + assert!(r.verify_canonical_kmer(1, 1, query)); +} + +// ── Splitting ───────────────────────────────────────────────────────────────── + +#[test] +fn short_unitig_not_split() { + // seql=259 → n_kmers=256 = MAX_KMERS_PER_CHUNK → no split + set_k(4); + let seq: Vec = (0..259_usize).map(|i| b"ACGT"[i % 4]).collect(); + let (_dir, r) = write_read(&[&seq]); + assert_eq!(r.len(), 1); + assert_eq!(r.seql(0), 259); +} + +#[test] +fn long_unitig_split_no_kmer_lost() { + // seql=260 → n_kmers=257 > MAX_KMERS_PER_CHUNK(256) → 2 chunks + // chunk_nucl=259, stride=256 + // Chunk 0: nucl 0..259 (259 nucl, 256 kmers) + // Chunk 1: nucl 256..260 (4 nucl, 1 kmer) + set_k(4); + let seq: Vec = (0..260_usize).map(|i| b"ACGT"[i % 4]).collect(); + let (_dir, r) = write_read(&[&seq]); + assert_eq!(r.len(), 2); + assert_eq!(r.seql(0), 259); + assert_eq!(r.seql(1), 4); + // k-1=3 nucleotide overlap → 0 kmers duplicated, 0 kmers lost. + // Last kmer of chunk 0 = original nucl 255..259. + assert!(r.verify_canonical_kmer(0, 255, canonical_of(&seq[255..259]))); + // First kmer of chunk 1 = original nucl 256..260 — a different, adjacent kmer. + assert!(r.verify_canonical_kmer(1, 0, canonical_of(&seq[256..260]))); +} diff --git a/src/obiskio/src/unitig_index.rs b/src/obiskio/src/unitig_index.rs new file mode 100644 index 0000000..dc674fd --- /dev/null +++ b/src/obiskio/src/unitig_index.rs @@ -0,0 +1,286 @@ +use std::fs::File; +use std::io::{BufWriter, Write as _}; +use std::path::{Path, PathBuf}; + +use memmap2::Mmap; +use obikseq::{CanonicalKmer, Unitig}; + +pub use obikseq::MAX_KMERS_PER_CHUNK; + +use crate::error::{SKError, SKResult}; + +// ── Index file format ───────────────────────────────────────────────────────── +// +// magic: [u8; 4] = b"UIDX" +// n_unitigs: u32 LE +// seqls: [u8; n_unitigs] max kmer index per chunk (= n_kmers − 1) +// packed_offsets: [u32; n_unitigs + 1] byte offsets to packed bytes in the +// sequence file; last entry is sentinel +// +// Each sequence record in the binary file: [u8: n_kmers−1][packed bytes]. +// Offsets point to the first packed byte of each record, past the leading u8. +// Unitigs with more than MAX_KMERS_PER_CHUNK kmers are transparently split by the +// writer into overlapping chunks (k-1 nucleotide overlap) so no kmer is lost. + +const MAGIC: [u8; 4] = *b"UIDX"; + +fn idx_path(path: &Path) -> PathBuf { + let mut s = path.as_os_str().to_owned(); + s.push(".idx"); + PathBuf::from(s) +} + +// Extract a sub-sequence [start, end) nucleotides from a unitig. +fn sub_unitig(unitig: &Unitig, start: usize, end: usize) -> Unitig { + unitig.sub(start, end) +} + +// ── Writer ──────────────────────────────────────────────────────────────────── + +/// Writes a sequence of [`Unitig`] to an uncompressed binary file and builds +/// an offset index at close time. +/// +/// Unitigs with more than [`MAX_KMERS_PER_CHUNK`] kmers are transparently split +/// into overlapping chunks (k-1 nucleotide overlap) so no kmer is lost. +/// +/// The companion index file (`path.idx`) is written on [`close`]. +/// The binary format per record is `[u8: n_kmers−1][packed 2-bit bytes]`. +pub struct UnitigFileWriter { + path: PathBuf, + file: BufWriter, + seqls: Vec, + packed_offsets: Vec, + next_offset: u32, + k: usize, +} + +impl UnitigFileWriter { + pub fn create(path: &Path) -> SKResult { + let file = File::create(path).map_err(SKError::Io)?; + Ok(Self { + path: path.to_owned(), + file: BufWriter::new(file), + seqls: Vec::new(), + packed_offsets: Vec::new(), + next_offset: 0, + k: obikseq::params::k(), + }) + } + + /// Write a unitig, splitting it into chunks if it exceeds [`MAX_KMERS_PER_CHUNK`]. + pub fn write(&mut self, unitig: &Unitig) -> SKResult<()> { + let seql = unitig.seql(); + let k = self.k; + + if seql < k { + return Ok(()); + } + + let n_kmers = seql - k + 1; + if n_kmers <= MAX_KMERS_PER_CHUNK { + return self.write_chunk(unitig); + } + + // Split into overlapping chunks of MAX_KMERS_PER_CHUNK kmers. + // Overlap of k-1 nucleotides ensures no kmer is lost at boundaries. + let chunk_nucl = MAX_KMERS_PER_CHUNK + k - 1; + let stride = MAX_KMERS_PER_CHUNK; + let mut start = 0; + while start < seql { + let end = (start + chunk_nucl).min(seql); + self.write_chunk(&sub_unitig(unitig, start, end))?; + if end == seql { + break; + } + start += stride; + } + Ok(()) + } + + fn write_chunk(&mut self, unitig: &Unitig) -> SKResult<()> { + let seql = unitig.seql(); + let byte_len = (seql + 3) / 4; + + // Header is 1 byte (u8: n_kmers − 1 = seql − k); packed bytes follow. + self.packed_offsets.push(self.next_offset + 1); + self.seqls.push((seql - self.k) as u8); + + unitig + .write_to_binary(&mut self.file) + .map_err(SKError::Io)?; + + self.next_offset += 1 + byte_len as u32; + Ok(()) + } + + /// Flush the sequence file and write the companion `.idx`. + pub fn close(mut self) -> SKResult<()> { + self.file.flush().map_err(SKError::Io)?; + drop(self.file); + + // Sentinel: byte offset past the last record's packed bytes. + let sentinel = match (self.packed_offsets.last(), self.seqls.last()) { + (Some(&last_off), Some(&last_seql)) => { + let seql = last_seql as u32 + self.k as u32; + last_off + (seql + 3) / 4 + } + _ => 0, + }; + self.packed_offsets.push(sentinel); + + write_idx(&idx_path(&self.path), &self.seqls, &self.packed_offsets) + } + + pub fn len(&self) -> usize { + self.seqls.len() + } + + pub fn is_empty(&self) -> bool { + self.seqls.is_empty() + } +} + +fn write_idx(path: &Path, seqls: &[u8], packed_offsets: &[u32]) -> SKResult<()> { + let mut w = BufWriter::new(File::create(path).map_err(SKError::Io)?); + w.write_all(&MAGIC).map_err(SKError::Io)?; + w.write_all(&(seqls.len() as u32).to_le_bytes()).map_err(SKError::Io)?; + w.write_all(seqls).map_err(SKError::Io)?; + for &off in packed_offsets { + w.write_all(&off.to_le_bytes()).map_err(SKError::Io)?; + } + w.flush().map_err(SKError::Io) +} + +// ── Reader ──────────────────────────────────────────────────────────────────── + +/// Read-only random-access view of a unitig file. +/// +/// The sequence file is memory-mapped; the index is loaded into RAM on open. +/// All per-kmer operations are O(1) and allocation-free. +pub struct UnitigFileReader { + mmap: Mmap, + seqls: Vec, + packed_offsets: Vec, + k: usize, +} + +impl UnitigFileReader { + pub fn open(path: &Path) -> SKResult { + let file = File::open(path).map_err(SKError::Io)?; + let mmap = unsafe { Mmap::map(&file).map_err(SKError::Io)? }; + let (seqls, packed_offsets) = read_idx(&idx_path(path))?; + let k = obikseq::params::k(); + Ok(Self { mmap, seqls, packed_offsets, k }) + } + + pub fn len(&self) -> usize { + self.seqls.len() + } + + pub fn is_empty(&self) -> bool { + self.seqls.is_empty() + } + + /// Return the nucleotide length of chunk `i`. + #[inline] + pub fn seql(&self, i: usize) -> usize { + self.seqls[i] as usize + self.k + } + + /// Reconstruct chunk `i` as a [`Unitig`]. Allocates a copy of the packed bytes. + pub fn unitig(&self, i: usize) -> Unitig { + let seql = self.seqls[i] as usize + self.k; + let start = self.packed_offsets[i] as usize; + let byte_len = (seql + 3) / 4; + let tail = (seql % 4) as u8; + let bytes = self.mmap[start..start + byte_len].to_vec().into_boxed_slice(); + Unitig::new(tail, bytes) + } + + /// Extract the raw left-aligned u64 of the kmer at position `j` within chunk `i`. + #[inline] + pub fn raw_kmer(&self, i: usize, j: usize) -> u64 { + let start = self.packed_offsets[i] as usize; + extract_kmer_raw(&self.mmap[start..], j, self.k) + } + + /// Return `true` iff the kmer at position `j` of chunk `i` equals `query`. + /// + /// O(1), zero allocation. The chunk may store either orientation of the kmer; + /// canonicalization is applied before comparison. + #[inline] + pub fn verify_canonical_kmer(&self, i: usize, j: usize, query: CanonicalKmer) -> bool { + canonical_raw(self.raw_kmer(i, j), self.k) == query.raw() + } +} + +fn read_idx(path: &Path) -> SKResult<(Vec, Vec)> { + let data = std::fs::read(path).map_err(SKError::Io)?; + let mut pos = 0; + + if &data[pos..pos + 4] != &MAGIC { + return Err(SKError::Io(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "unitig index: bad magic", + ))); + } + pos += 4; + + let n = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize; + pos += 4; + + let seqls = data[pos..pos + n].to_vec(); + pos += n; + + let mut packed_offsets = Vec::with_capacity(n + 1); + for _ in 0..=n { + packed_offsets.push(u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap())); + pos += 4; + } + + Ok((seqls, packed_offsets)) +} + +// ── Kmer utilities ──────────────────────────────────────────────────────────── + +/// Reverse complement of a left-aligned 2-bit kmer (same algorithm as [`KmerOf::revcomp`]). +#[inline] +fn revcomp_raw(raw: u64, k: usize) -> u64 { + let x = !raw; + let x = x.swap_bytes(); + let x = ((x >> 4) & 0x0F0F0F0F0F0F0F0F) | ((x & 0x0F0F0F0F0F0F0F0F) << 4); + let x = ((x >> 2) & 0x3333333333333333) | ((x & 0x3333333333333333) << 2); + x << (64 - 2 * k) +} + +/// Canonical form of a left-aligned 2-bit kmer: `min(kmer, revcomp(kmer))`. +#[inline] +fn canonical_raw(raw: u64, k: usize) -> u64 { + raw.min(revcomp_raw(raw, k)) +} + +// ── Bit extraction ──────────────────────────────────────────────────────────── + +/// Extract the kmer at nucleotide position `j` from MSB-first 2-bit packed `bytes`. +/// Returns a left-aligned u64 matching [`KmerOf`]'s internal representation. +#[inline] +fn extract_kmer_raw(bytes: &[u8], j: usize, k: usize) -> u64 { + let bit_start = j * 2; + let byte_start = bit_start / 8; + let bit_offset = bit_start % 8; // always 0, 2, 4, or 6 + let bytes_needed = (bit_offset + 2 * k + 7) / 8; // ≤ 9 for k ≤ 32 + + let mut acc = 0u128; + for idx in 0..bytes_needed { + acc = (acc << 8) | bytes.get(byte_start + idx).copied().unwrap_or(0) as u128; + } + + let shift = bytes_needed * 8 - bit_offset - 2 * k; + let mask = !0u64 >> (64 - 2 * k); + let raw = (acc >> shift) as u64 & mask; + raw << (64 - 2 * k) +} + +#[cfg(test)] +#[path = "tests/unitig_index.rs"] +mod tests;