perf: enable zero-allocation queries and memory-mapped indexes
Introduce zero-allocation row extraction and query result buffers across `obicompactvec` and `obikpartitionner` to eliminate per-kmer heap allocations. Replace in-memory MPHF deserialization with memory-mapped, zero-copy views to reduce runtime memory footprint. Add configurable I/O chunking, a RAM-aware `--chunk-size` parameter, and system memory monitoring via the new `sysinfo` dependency. Re-export `PreloadedIndex` for external consumers.
This commit is contained in:
@@ -11,4 +11,3 @@ mod rebuild_layer;
|
||||
pub use filter::KmerFilter;
|
||||
pub use merge_layer::MergeMode;
|
||||
pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR};
|
||||
pub use query_layer::PreloadedIndex;
|
||||
|
||||
@@ -50,129 +50,91 @@ impl QueryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return `Some(per-genome row)` if `kmer` is indexed in this layer, else `None`.
|
||||
fn find(&self, kmer: CanonicalKmer, n_genomes: usize) -> Option<Box<[u32]>> {
|
||||
/// Write per-genome values into `buf` if `kmer` is indexed in this layer.
|
||||
/// Returns `true` on hit; `buf` is untouched on miss.
|
||||
fn find_into(&self, kmer: CanonicalKmer, n_genomes: usize, buf: &mut [u32]) -> bool {
|
||||
match self {
|
||||
QueryLayer::SetOnly(mphf) => {
|
||||
mphf.find(kmer)
|
||||
.map(|_| vec![1u32; n_genomes].into_boxed_slice())
|
||||
if mphf.find(kmer).is_some() {
|
||||
buf[..n_genomes].fill(1);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
QueryLayer::Presence(mphf, mat) => {
|
||||
mphf.find(kmer)
|
||||
.map(|slot| mat.row(slot).iter().map(|&b| b as u32).collect())
|
||||
if let Some(slot) = mphf.find(kmer) {
|
||||
mat.fill_row(slot, &mut buf[..n_genomes]);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
QueryLayer::Count(mphf, mat) => {
|
||||
mphf.find(kmer).map(|slot| mat.row(slot))
|
||||
if let Some(slot) = mphf.find(kmer) {
|
||||
mat.fill_row(slot, &mut buf[..n_genomes]);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── PreloadedIndex ────────────────────────────────────────────────────────────
|
||||
// ── KmerPartition::query_partition* ──────────────────────────────────────────
|
||||
|
||||
/// All query layers for every partition, opened once at startup.
|
||||
///
|
||||
/// Wrap in `Arc` and share across worker threads — all access is read-only.
|
||||
pub struct PreloadedIndex {
|
||||
/// `layers[part_idx]` — ordered vec of query layers for that partition.
|
||||
/// Empty vec when the partition has no index directory yet.
|
||||
layers: Vec<Vec<QueryLayer>>,
|
||||
}
|
||||
|
||||
// SAFETY: QueryLayer and its contents are opened read-only (mmap + in-memory
|
||||
// data structures). No mutation occurs after construction.
|
||||
unsafe impl Sync for PreloadedIndex {}
|
||||
unsafe impl Send for PreloadedIndex {}
|
||||
|
||||
impl PreloadedIndex {
|
||||
/// Open all partition index directories and deserialise every MPHF once.
|
||||
impl KmerPartition {
|
||||
/// Query a single partition, calling `on_hit(sk_idx, kmer_idx, row)` for
|
||||
/// every found k-mer without allocating intermediate result vectors.
|
||||
///
|
||||
/// This is the expensive call — do it once before spawning query workers.
|
||||
pub fn new(
|
||||
partition: &KmerPartition,
|
||||
n_partitions: usize,
|
||||
with_counts: bool,
|
||||
) -> SKResult<Self> {
|
||||
let active: Vec<usize> = (0..n_partitions).collect();
|
||||
Self::new_subset(partition, n_partitions, &active, with_counts)
|
||||
}
|
||||
|
||||
/// Open only the listed partition indices.
|
||||
///
|
||||
/// Keeps file-descriptor and memory usage bounded to the active set.
|
||||
/// Unlisted partitions have an empty layer vec and return all-None on query.
|
||||
pub fn new_subset(
|
||||
partition: &KmerPartition,
|
||||
n_partitions: usize,
|
||||
active: &[usize],
|
||||
with_counts: bool,
|
||||
) -> SKResult<Self> {
|
||||
let mut layers: Vec<Vec<QueryLayer>> = (0..n_partitions).map(|_| Vec::new()).collect();
|
||||
for &i in active {
|
||||
let index_dir = partition.part_dir(i).join(INDEX_SUBDIR);
|
||||
if !index_dir.exists() {
|
||||
continue;
|
||||
}
|
||||
let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?;
|
||||
layers[i] = (0..meta.n_layers)
|
||||
.map(|l| QueryLayer::open(
|
||||
&index_dir.join(format!("layer_{l}")),
|
||||
with_counts,
|
||||
&meta.mode,
|
||||
))
|
||||
.collect::<SKResult<_>>()?;
|
||||
}
|
||||
Ok(Self { layers })
|
||||
}
|
||||
|
||||
/// Query one partition for a slice of already-routed super-kmers.
|
||||
///
|
||||
/// Returns one entry per input super-kmer; each entry is a `Vec` with one
|
||||
/// `Option<Box<[u32]>>` per k-mer inside that super-kmer:
|
||||
/// - `None` — k-mer absent from the index
|
||||
/// - `Some(row)` — per-genome count or 0/1 presence
|
||||
pub fn query_partition(
|
||||
/// `row` is a shared scratch buffer valid only for the duration of the call;
|
||||
/// the callback must copy what it needs before returning.
|
||||
pub fn query_partition_with<F>(
|
||||
&self,
|
||||
part_idx: usize,
|
||||
superkmers: &[&RoutableSuperKmer],
|
||||
k: usize,
|
||||
n_genomes: usize,
|
||||
) -> SKResult<Vec<Vec<Option<Box<[u32]>>>>> {
|
||||
with_counts: bool,
|
||||
mut on_hit: F,
|
||||
) -> SKResult<()>
|
||||
where
|
||||
F: FnMut(usize, usize, &[u32]),
|
||||
{
|
||||
if superkmers.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let layers = &self.layers[part_idx];
|
||||
|
||||
if layers.is_empty() {
|
||||
return Ok(superkmers
|
||||
.iter()
|
||||
.map(|rsk| vec![None; rsk.seql() - k + 1])
|
||||
.collect());
|
||||
let index_dir = self.part_dir(part_idx).join(INDEX_SUBDIR);
|
||||
if !index_dir.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Ok(superkmers
|
||||
.iter()
|
||||
.map(|rsk| {
|
||||
rsk.superkmer()
|
||||
.iter_canonical_kmers()
|
||||
.map(|kmer| {
|
||||
layers.iter().find_map(|layer| layer.find(kmer, n_genomes))
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.collect())
|
||||
let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?;
|
||||
let layers: Vec<QueryLayer> = (0..meta.n_layers)
|
||||
.map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts, &meta.mode))
|
||||
.collect::<SKResult<_>>()?;
|
||||
|
||||
let mut buf = vec![0u32; n_genomes];
|
||||
|
||||
for (sk_idx, rsk) in superkmers.iter().enumerate() {
|
||||
for (kmer_idx, kmer) in rsk.superkmer().iter_canonical_kmers().enumerate() {
|
||||
for layer in &layers {
|
||||
if layer.find_into(kmer, n_genomes, &mut buf) {
|
||||
on_hit(sk_idx, kmer_idx, &buf);
|
||||
buf.fill(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ── KmerPartition::query_partition (kept for backward compatibility) ──────────
|
||||
|
||||
impl KmerPartition {
|
||||
/// Query a single partition for a slice of (already-routed) super-kmers.
|
||||
///
|
||||
/// **Prefer [`PreloadedIndex`] for repeated queries** — this method
|
||||
/// re-opens and deserialises the MPHF on every call.
|
||||
#[deprecated(note = "use PreloadedIndex::query_partition to avoid repeated MPHF I/O")]
|
||||
/// Prefer [`query_partition_with`] to avoid per-kmer heap allocations.
|
||||
pub fn query_partition(
|
||||
&self,
|
||||
part_idx: usize,
|
||||
@@ -199,13 +161,21 @@ impl KmerPartition {
|
||||
.map(|i| QueryLayer::open(&index_dir.join(format!("layer_{i}")), with_counts, &meta.mode))
|
||||
.collect::<SKResult<_>>()?;
|
||||
|
||||
let mut buf = vec![0u32; n_genomes];
|
||||
Ok(superkmers
|
||||
.iter()
|
||||
.map(|rsk| {
|
||||
rsk.superkmer()
|
||||
.iter_canonical_kmers()
|
||||
.map(|kmer| {
|
||||
layers.iter().find_map(|layer| layer.find(kmer, n_genomes))
|
||||
for layer in &layers {
|
||||
if layer.find_into(kmer, n_genomes, &mut buf) {
|
||||
let row: Box<[u32]> = buf[..n_genomes].into();
|
||||
buf.fill(0);
|
||||
return Some(row);
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user