Push ywwwypqxrtmy #14

Merged
coissac merged 6 commits from push-ywwwypqxrtmy into main 2026-06-03 13:18:41 +00:00
4 changed files with 137 additions and 11 deletions
Showing only changes of commit 1661dd6b1c - Show all commits
+21
View File
@@ -0,0 +1,21 @@
use std::sync::OnceLock;
use indicatif::MultiProgress;
static MULTI: OnceLock<MultiProgress> = OnceLock::new();
/// Initialise the shared progress display. Call once from the binary before
/// any index operation. Subsequent calls are silently ignored.
pub fn init(multi: MultiProgress) {
let _ = MULTI.set(multi);
}
/// Return the shared `MultiProgress`, creating a plain default one if the
/// binary never called [`init`].
pub fn get() -> &'static MultiProgress {
MULTI.get_or_init(MultiProgress::new)
}
pub(crate) fn multi() -> &'static MultiProgress {
get()
}
+16 -6
View File
@@ -5,6 +5,7 @@ use std::sync::Arc;
use clap::Args; use clap::Args;
use obikindex::KmerIndex; use obikindex::KmerIndex;
use obikpartitionner::PreloadedIndex;
use obilayeredmap::IndexMode; use obilayeredmap::IndexMode;
use obiread::chunk::read_sequence_chunks; use obiread::chunk::read_sequence_chunks;
use obiread::record::{SeqRecord, parse_chunk}; use obiread::record::{SeqRecord, parse_chunk};
@@ -211,6 +212,7 @@ fn apply_findere(
fn process_chunk( fn process_chunk(
idx: &KmerIndex, idx: &KmerIndex,
preloaded: &PreloadedIndex,
rope: Rope, rope: Rope,
k: usize, k: usize,
n_genomes: usize, n_genomes: usize,
@@ -243,9 +245,8 @@ fn process_chunk(
continue; continue;
} }
let kmer_results = idx let kmer_results = preloaded
.partition() .query_partition(part_idx, part_sks, k, n_genomes)
.query_partition(part_idx, part_sks, k, n_genomes, with_counts)
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
eprintln!("query error on partition {part_idx}: {e}"); eprintln!("query error on partition {part_idx}: {e}");
std::process::exit(1); std::process::exit(1);
@@ -351,6 +352,14 @@ pub fn run(args: QueryArgs) {
eprintln!("warning: --mismatch not yet implemented, ignored"); eprintln!("warning: --mismatch not yet implemented, ignored");
} }
let preloaded = Arc::new(
PreloadedIndex::new(idx.partition(), n_partitions, with_counts)
.unwrap_or_else(|e| {
eprintln!("error loading index layers: {e}");
std::process::exit(1);
})
);
let detail = args.detail; let detail = args.detail;
let count_missing = args.count_missing; let count_missing = args.count_missing;
let force_presence = args.force_presence; let force_presence = args.force_presence;
@@ -376,11 +385,12 @@ pub fn run(args: QueryArgs) {
let pipe = obipipeline::make_pipe! { let pipe = obipipeline::make_pipe! {
QueryData : Rope => Vec<u8>, QueryData : Rope => Vec<u8>,
| { | {
let idx = Arc::clone(&idx); let idx = Arc::clone(&idx);
let preloaded = Arc::clone(&preloaded);
move |rope: Rope| { move |rope: Rope| {
process_chunk( process_chunk(
&idx, rope, k, n_genomes, n_partitions, with_counts, effective_z, &idx, &preloaded, rope, k, n_genomes, n_partitions, with_counts,
detail, count_missing, force_presence, presence_threshold, effective_z, detail, count_missing, force_presence, presence_threshold,
) )
} }
} : Chunk => Output, } : Chunk => Output,
+1
View File
@@ -11,3 +11,4 @@ mod rebuild_layer;
pub use filter::KmerFilter; pub use filter::KmerFilter;
pub use merge_layer::MergeMode; pub use merge_layer::MergeMode;
pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR}; pub use partition::{KmerPartition, KmerSpectrum, PARTITIONS_SUBDIR};
pub use query_layer::PreloadedIndex;
+99 -5
View File
@@ -68,17 +68,111 @@ impl QueryLayer {
} }
} }
// ── KmerPartition::query_partition ─────────────────────────────────────────── // ── PreloadedIndex ────────────────────────────────────────────────────────────
impl KmerPartition { /// All query layers for every partition, opened once at startup.
/// Query a single partition for a slice of (already-routed) super-kmers. ///
/// Wrap in `Arc` and share across worker threads — all access is read-only.
pub struct PreloadedIndex {
/// `layers[part_idx]` — ordered vec of query layers for that partition.
/// Empty vec when the partition has no index directory yet.
layers: Vec<Vec<QueryLayer>>,
}
// SAFETY: QueryLayer and its contents are opened read-only (mmap + in-memory
// data structures). No mutation occurs after construction.
unsafe impl Sync for PreloadedIndex {}
unsafe impl Send for PreloadedIndex {}
impl PreloadedIndex {
/// Open all partition index directories and deserialise every MPHF once.
///
/// This is the expensive call — do it once before spawning query workers.
pub fn new(
partition: &KmerPartition,
n_partitions: usize,
with_counts: bool,
) -> SKResult<Self> {
let active: Vec<usize> = (0..n_partitions).collect();
Self::new_subset(partition, n_partitions, &active, with_counts)
}
/// Open only the listed partition indices.
///
/// Keeps file-descriptor and memory usage bounded to the active set.
/// Unlisted partitions have an empty layer vec and return all-None on query.
pub fn new_subset(
partition: &KmerPartition,
n_partitions: usize,
active: &[usize],
with_counts: bool,
) -> SKResult<Self> {
let mut layers: Vec<Vec<QueryLayer>> = (0..n_partitions).map(|_| Vec::new()).collect();
for &i in active {
let index_dir = partition.part_dir(i).join(INDEX_SUBDIR);
if !index_dir.exists() {
continue;
}
let meta = PartitionMeta::load(&index_dir).map_err(olm_to_sk)?;
layers[i] = (0..meta.n_layers)
.map(|l| QueryLayer::open(
&index_dir.join(format!("layer_{l}")),
with_counts,
&meta.mode,
))
.collect::<SKResult<_>>()?;
}
Ok(Self { layers })
}
/// Query one partition for a slice of already-routed super-kmers.
/// ///
/// Returns one entry per input super-kmer; each entry is a `Vec` with one /// Returns one entry per input super-kmer; each entry is a `Vec` with one
/// `Option<Box<[u32]>>` per k-mer inside that super-kmer: /// `Option<Box<[u32]>>` per k-mer inside that super-kmer:
/// - `None` — k-mer absent from the index /// - `None` — k-mer absent from the index
/// - `Some(row)` — per-genome count (count index) or 0/1 (presence index) /// - `Some(row)` — per-genome count or 0/1 presence
pub fn query_partition(
&self,
part_idx: usize,
superkmers: &[&RoutableSuperKmer],
k: usize,
n_genomes: usize,
) -> SKResult<Vec<Vec<Option<Box<[u32]>>>>> {
if superkmers.is_empty() {
return Ok(Vec::new());
}
let layers = &self.layers[part_idx];
if layers.is_empty() {
return Ok(superkmers
.iter()
.map(|rsk| vec![None; rsk.seql() - k + 1])
.collect());
}
Ok(superkmers
.iter()
.map(|rsk| {
rsk.superkmer()
.iter_canonical_kmers()
.map(|kmer| {
layers.iter().find_map(|layer| layer.find(kmer, n_genomes))
})
.collect()
})
.collect())
}
}
// ── KmerPartition::query_partition (kept for backward compatibility) ──────────
impl KmerPartition {
/// Query a single partition for a slice of (already-routed) super-kmers.
/// ///
/// All `superkmers` must belong to this partition (same minimizer bucket). /// **Prefer [`PreloadedIndex`] for repeated queries** — this method
/// re-opens and deserialises the MPHF on every call.
#[deprecated(note = "use PreloadedIndex::query_partition to avoid repeated MPHF I/O")]
pub fn query_partition( pub fn query_partition(
&self, &self,
part_idx: usize, part_idx: usize,