diff --git a/TODO.md b/TODO.md index fdb0998..b67b7c3 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,6 @@ ## Chose à vérifier suite à la commande index +- il faudrait lister les fichier qui vont être indexés - partition.meta ne devrait plus exister - les spectrums globaux devrait etre identifier par génome - regrouper dans un sous-dossier spectrums à la racine de l'index avec un nom basé sur le génome @@ -26,3 +27,5 @@ - les arbres NJ sont sauvegardés en Newick avec les longeurs de branche - dump : une table csv de l'index avec les kmer et les genomes associés en mode count ou presence/absence avec une option pour forcer le mode presence/absence meme si l'index est en mode count. Par defaut, le mode count est utilisé pour les index en mode count et le mode presence/absence pour les index en mode presence/absence. + +- status : affiche le statut de l'index diff --git a/src/Cargo.lock b/src/Cargo.lock index 858c7c1..4c1929b 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -1452,17 +1452,10 @@ dependencies = [ name = "obikindex" version = "0.1.0" dependencies = [ - "cacheline-ef", - "epserde", "indicatif", - "obicompactvec", - "obidebruinj", "obikpartitionner", - "obikseq", - "obilayeredmap", "obiskio", "obisys", - "ptr_hash", "rayon", "serde", "serde_json", @@ -1501,8 +1494,10 @@ dependencies = [ "memmap2", "niffler 3.0.0", "obicompactvec", + "obidebruinj", "obikrope", "obikseq", + "obilayeredmap", "obiread", "obiskbuilder", "obiskio", diff --git a/src/obikindex/Cargo.toml b/src/obikindex/Cargo.toml index caa12d6..bf4ebd6 100644 --- a/src/obikindex/Cargo.toml +++ b/src/obikindex/Cargo.toml @@ -5,15 +5,8 @@ edition = "2024" [dependencies] obikpartitionner = { path = "../obikpartitionner" } -obikseq = { path = "../obikseq" } -obisys = { path = "../obisys" } obiskio = { path = "../obiskio" } -obidebruinj = { path = "../obidebruinj" } -obilayeredmap = { path = "../obilayeredmap" } -obicompactvec = { path = "../obicompactvec" } -cacheline-ef = "1.1" -epserde = "0.8" -ptr_hash = "1.1" +obisys = { path = "../obisys" } rayon = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/src/obikindex/src/error.rs b/src/obikindex/src/error.rs index 55b2a79..df191ed 100644 --- a/src/obikindex/src/error.rs +++ b/src/obikindex/src/error.rs @@ -2,14 +2,12 @@ use std::fmt; use std::io; use obiskio::SKError; -use obilayeredmap::OLMError; #[derive(Debug)] pub enum OKIError { Io(io::Error), Json(serde_json::Error), Partition(SKError), - Layer(OLMError), } pub type OKIResult = Result; @@ -20,7 +18,6 @@ impl fmt::Display for OKIError { OKIError::Io(e) => write!(f, "I/O error: {e}"), OKIError::Json(e) => write!(f, "JSON error: {e}"), OKIError::Partition(e) => write!(f, "partition error: {e}"), - OKIError::Layer(e) => write!(f, "layer error: {e}"), } } } @@ -31,7 +28,6 @@ impl std::error::Error for OKIError { OKIError::Io(e) => Some(e), OKIError::Json(e) => Some(e), OKIError::Partition(e) => Some(e), - OKIError::Layer(e) => Some(e), } } } @@ -47,7 +43,3 @@ impl From for OKIError { impl From for OKIError { fn from(e: SKError) -> Self { OKIError::Partition(e) } } - -impl From for OKIError { - fn from(e: OLMError) -> Self { OKIError::Layer(e) } -} diff --git a/src/obikindex/src/index.rs b/src/obikindex/src/index.rs index 37b30c1..4e6bd7c 100644 --- a/src/obikindex/src/index.rs +++ b/src/obikindex/src/index.rs @@ -3,16 +3,9 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use cacheline_ef::{CachelineEf, CachelineEfVec}; -use epserde::prelude::*; use indicatif::{ProgressBar, ProgressStyle}; -use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; -use obidebruinj::GraphDeBruijn; use obikpartitionner::KmerPartition; -use obilayeredmap::layer::Layer; -use obiskio::{SKFileMeta, SKFileReader}; use obisys::{Reporter, Stage}; -use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; use rayon::prelude::*; use tracing::info; @@ -20,8 +13,6 @@ use crate::error::{OKIError, OKIResult}; use crate::meta::{IndexConfig, IndexMeta}; use crate::state::{IndexState, SENTINEL_INDEXED, SENTINEL_SCATTERED}; -type Mphf = PtrHash>, Xx64, Vec>; - pub struct KmerIndex { root_path: PathBuf, meta: IndexMeta, @@ -59,7 +50,12 @@ impl KmerIndex { pub fn open>(path: P) -> OKIResult { let root_path = path.as_ref().to_owned(); let meta = IndexMeta::read(&root_path).map_err(OKIError::Io)?; - let partition = KmerPartition::open(&root_path)?; + let partition = KmerPartition::open_with_config( + &root_path, + meta.config.kmer_size, + meta.config.minimizer_size, + meta.config.n_bits, + )?; Ok(Self { root_path, meta, partition }) } @@ -87,13 +83,10 @@ impl KmerIndex { /// Mark scatter as complete and write `scatter.done`. /// /// If no genome label was set at creation time, one is derived from - /// `first_scatter_path` (filename stripped of all extensions). - /// If `first_scatter_path` is also `None`, the label defaults to `"unknown"`. - pub fn mark_scattered(&mut self, first_scatter_path: Option<&Path>) -> OKIResult<()> { + /// the index root directory name (stripped of all extensions). + pub fn mark_scattered(&mut self) -> OKIResult<()> { if self.meta.genomes.is_empty() { - let label = first_scatter_path - .map(label_from_path) - .unwrap_or_else(|| "unknown".to_string()); + let label = label_from_path(&self.root_path); self.meta.genomes.push(label); self.meta.write(&self.root_path)?; } @@ -116,20 +109,9 @@ impl KmerIndex { Ok(()) } - /// Build the layered MPHF index for all partitions. - /// - /// Default mode (`config.with_counts = false`): set membership only. - /// With counts: count matrix per kmer. + /// Build the layered MPHF index for all partitions in parallel. /// /// Writes `index.done` upon completion. - /// Path to the unitigs file for partition `part`, layer `layer`. - pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf { - self.partition.part_dir(part) - .join("index") - .join(format!("layer_{layer}")) - .join("unitigs.bin") - } - pub fn build_layers( &self, min_ab: u32, @@ -140,12 +122,8 @@ impl KmerIndex { let n = self.partition.n_partitions(); let t = Stage::start("index"); let with_counts = self.meta.config.with_counts; - let filter_active = min_ab > 1 || max_ab.is_some(); - let need_counts = filter_active || with_counts; let total_kmers = AtomicUsize::new(0); - let partition = &self.partition; - let pb = Arc::new(Mutex::new( ProgressBar::new(n as u64).with_style( ProgressStyle::with_template("index — [{bar:20}] {pos}/{len} | {msg}").unwrap(), @@ -153,101 +131,19 @@ impl KmerIndex { )); (0..n).into_par_iter().for_each(|i| { - let part_dir = partition.part_dir(i); - let dedup_path = part_dir.join("dereplicated.skmer.zst"); - if !dedup_path.exists() { - return; - } - - let layer_dir = part_dir.join("index").join("layer_0"); - if layer_dir.join("mphf.bin").exists() { - return; - } - - let mphf1_opt: Option = if need_counts { - let p = part_dir.join("mphf1.bin"); - p.exists().then(|| Mphf::load_full(&p).ok()).flatten() - } else { - None - }; - - let counts1_opt: Option = if need_counts { - let p = part_dir.join("counts1.bin"); - p.exists() - .then(|| PersistentCompactIntVec::open(&p).ok()) - .flatten() - } else { - None - }; - - let mut g = GraphDeBruijn::new(); - let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", dedup_path.display()); - std::process::exit(1); - }); - for sk in reader.iter() { - for kmer in sk.iter_canonical_kmers() { - let accept = if filter_active { - match (&mphf1_opt, &counts1_opt) { - (Some(mphf), Some(counts)) => { - let ab = counts.get(mphf.index(&kmer.raw())); - ab >= min_ab && max_ab.map_or(true, |max| ab <= max) - } - _ => true, - } - } else { - true - }; - if accept { - g.push(kmer); - } + match self.partition.build_index_layer(i, min_ab, max_ab, with_counts) { + Ok(0) => {} + Ok(n_kmers) => { + total_kmers.fetch_add(n_kmers, Ordering::Relaxed); + let pb = pb.lock().unwrap(); + pb.inc(1); + pb.set_message(format!("{i}: {n_kmers} kmers")); + } + Err(e) => { + eprintln!("error building layer for partition {i}: {e}"); + std::process::exit(1); } } - - let n_kmers = g.len(); - total_kmers.fetch_add(n_kmers, Ordering::Relaxed); - g.compute_degrees(); - - fs::create_dir_all(&layer_dir).unwrap_or_else(|e| { - eprintln!("error creating {}: {e}", layer_dir.display()); - std::process::exit(1); - }); - let mut uw = Layer::<()>::unitig_writer(&layer_dir).unwrap_or_else(|e| { - eprintln!("error creating unitig writer (partition {i}): {e}"); - std::process::exit(1); - }); - for unitig in g.iter_unitig() { - uw.write(&unitig).unwrap_or_else(|e| { - eprintln!("error writing unitig (partition {i}): {e}"); - std::process::exit(1); - }); - } - uw.close().unwrap_or_else(|e| { - eprintln!("error closing unitig writer (partition {i}): {e}"); - std::process::exit(1); - }); - - if with_counts { - Layer::::build(&layer_dir, |kmer| { - match (&mphf1_opt, &counts1_opt) { - (Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())), - _ => 1, - } - }) - .unwrap_or_else(|e| { - eprintln!("error building count layer (partition {i}): {e}"); - std::process::exit(1); - }); - } else { - Layer::<()>::build(&layer_dir).unwrap_or_else(|e| { - eprintln!("error building set layer (partition {i}): {e}"); - std::process::exit(1); - }); - } - - let pb = pb.lock().unwrap(); - pb.inc(1); - pb.set_message(format!("{i}: {n_kmers} kmers")); }); pb.lock().unwrap().finish_and_clear(); @@ -258,13 +154,7 @@ impl KmerIndex { if !keep_intermediate { for i in 0..n { - let part_dir = partition.part_dir(i); - remove_if_exists(&part_dir.join("dereplicated.skmer.zst")); - remove_if_exists(&SKFileMeta::sidecar_path( - &part_dir.join("dereplicated.skmer.zst"), - )); - remove_if_exists(&part_dir.join("mphf1.bin")); - remove_if_exists(&part_dir.join("counts1.bin")); + self.partition.remove_build_artifacts(i); } } @@ -272,9 +162,16 @@ impl KmerIndex { rep.push(t.stop()); Ok(()) } + + /// Path to the unitigs file for partition `part`, layer `layer`. + pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf { + self.partition.part_dir(part) + .join("index") + .join(format!("layer_{layer}")) + .join("unitigs.bin") + } } -/// Derive a genome label from a file path: filename stripped of all extensions. fn label_from_path(path: &Path) -> String { let name = path .file_name() @@ -291,11 +188,3 @@ fn label_from_path(path: &Path) -> String { fn touch(path: &Path) -> Result<(), std::io::Error> { fs::File::create(path).map(|_| ()) } - -fn remove_if_exists(path: &Path) { - if let Err(e) = fs::remove_file(path) { - if e.kind() != std::io::ErrorKind::NotFound { - eprintln!("warning: could not remove {}: {e}", path.display()); - } - } -} diff --git a/src/obikmer/src/cmd/index.rs b/src/obikmer/src/cmd/index.rs index 40349e1..089402c 100644 --- a/src/obikmer/src/cmd/index.rs +++ b/src/obikmer/src/cmd/index.rs @@ -72,7 +72,10 @@ pub fn run(args: IndexArgs) { // ── Stage 1: scatter ───────────────────────────────────────────────────── if idx.state() < IndexState::Scattered { - let first_path = args.common.inputs.first().map(PathBuf::from); + let paths: Vec<_> = args.common.seqfile_paths().collect(); + for path in &paths { + info!("indexing: {}", path.display()); + } let k = idx.kmer_size(); let level_max = args.common.level_max; let theta = args.common.theta; @@ -80,7 +83,7 @@ pub fn run(args: IndexArgs) { scatter(idx.partition_mut(), args.common.seqfile_paths(), k, level_max, theta, n_workers, &mut rep); - idx.mark_scattered(first_path.as_deref()).unwrap_or_else(|e| { + idx.mark_scattered().unwrap_or_else(|e| { eprintln!("error marking scatter done: {e}"); std::process::exit(1); }); diff --git a/src/obikpartitionner/Cargo.toml b/src/obikpartitionner/Cargo.toml index 8e94ff2..d62fdf4 100644 --- a/src/obikpartitionner/Cargo.toml +++ b/src/obikpartitionner/Cargo.toml @@ -13,8 +13,10 @@ obikrope = { path = "../obikrope" } [dependencies] niffler = "3.0.0" remove_dir_all = "0.8" -obikseq = { path = "../obikseq" } -obiskio = { path = "../obiskio" } +obikseq = { path = "../obikseq" } +obiskio = { path = "../obiskio" } +obidebruinj = { path = "../obidebruinj" } +obilayeredmap = { path = "../obilayeredmap" } rayon = "1" sysinfo = "0.33" serde = { version = "1", features = ["derive"] } diff --git a/src/obikpartitionner/src/index_layer.rs b/src/obikpartitionner/src/index_layer.rs new file mode 100644 index 0000000..a6d873a --- /dev/null +++ b/src/obikpartitionner/src/index_layer.rs @@ -0,0 +1,135 @@ +use std::fs; +use std::io; + +use cacheline_ef::{CachelineEf, CachelineEfVec}; +use epserde::prelude::*; +use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; +use obidebruinj::GraphDeBruijn; +use obilayeredmap::{OLMError, layer::Layer}; +use obiskio::{SKError, SKFileMeta, SKFileReader}; +use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; + +use crate::partition::KmerPartition; + +type Mphf = PtrHash>, Xx64, Vec>; + +fn olm_to_sk(e: OLMError) -> SKError { + match e { + OLMError::Io(io_err) => SKError::Io(io_err), + other => SKError::InvalidData { context: "layer build", detail: other.to_string() }, + } +} + +fn remove_if_exists(path: &std::path::Path) { + if let Err(e) = fs::remove_file(path) { + if e.kind() != io::ErrorKind::NotFound { + eprintln!("warning: could not remove {}: {e}", path.display()); + } + } +} + +impl KmerPartition { + /// Build the layered MPHF index for partition `i`. + /// + /// Returns the number of canonical k-mers indexed, or 0 if the partition + /// has no data or its layer was already built (resume-safe). + /// + /// Abundance filtering is applied when `min_ab > 1` or `max_ab.is_some()`, + /// using `mphf1.bin` + `counts1.bin` if they exist. + /// Count payload is stored iff `with_counts` is true. + pub fn build_index_layer( + &self, + i: usize, + min_ab: u32, + max_ab: Option, + with_counts: bool, + ) -> Result { + let part_dir = self.part_dir(i); + let dedup_path = part_dir.join("dereplicated.skmer.zst"); + if !dedup_path.exists() { + return Ok(0); + } + + let layer_dir = part_dir.join("index").join("layer_0"); + if layer_dir.join("mphf.bin").exists() { + return Ok(0); + } + + let filter_active = min_ab > 1 || max_ab.is_some(); + let need_counts = filter_active || with_counts; + + let mphf1_opt: Option = if need_counts { + let p = part_dir.join("mphf1.bin"); + p.exists().then(|| Mphf::load_full(&p).ok()).flatten() + } else { + None + }; + + let counts1_opt: Option = if need_counts { + let p = part_dir.join("counts1.bin"); + p.exists() + .then(|| PersistentCompactIntVec::open(&p).ok()) + .flatten() + } else { + None + }; + + let mut g = GraphDeBruijn::new(); + let mut reader = SKFileReader::open(&dedup_path)?; + for sk in reader.iter() { + for kmer in sk.iter_canonical_kmers() { + let accept = if filter_active { + match (&mphf1_opt, &counts1_opt) { + (Some(mphf), Some(counts)) => { + let ab = counts.get(mphf.index(&kmer.raw())); + ab >= min_ab && max_ab.map_or(true, |max| ab <= max) + } + _ => true, + } + } else { + true + }; + if accept { + g.push(kmer); + } + } + } + + let n_kmers = g.len(); + g.compute_degrees(); + + fs::create_dir_all(&layer_dir)?; + + let mut uw = Layer::<()>::unitig_writer(&layer_dir).map_err(olm_to_sk)?; + for unitig in g.iter_unitig() { + uw.write(&unitig)?; + } + uw.close()?; + + if with_counts { + Layer::::build(&layer_dir, |kmer| { + match (&mphf1_opt, &counts1_opt) { + (Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())), + _ => 1, + } + }) + .map_err(olm_to_sk)?; + } else { + Layer::<()>::build(&layer_dir).map_err(olm_to_sk)?; + } + + Ok(n_kmers) + } + + /// Remove intermediate build artifacts for partition `i`. + /// + /// Deletes `dereplicated.skmer.zst` (+ sidecar), `mphf1.bin`, `counts1.bin`. + pub fn remove_build_artifacts(&self, i: usize) { + let part_dir = self.part_dir(i); + let dedup = part_dir.join("dereplicated.skmer.zst"); + remove_if_exists(&SKFileMeta::sidecar_path(&dedup)); + remove_if_exists(&dedup); + remove_if_exists(&part_dir.join("mphf1.bin")); + remove_if_exists(&part_dir.join("counts1.bin")); + } +} diff --git a/src/obikpartitionner/src/lib.rs b/src/obikpartitionner/src/lib.rs index 09fc147..bd7934f 100644 --- a/src/obikpartitionner/src/lib.rs +++ b/src/obikpartitionner/src/lib.rs @@ -1,3 +1,4 @@ +mod index_layer; mod kmer_sort; mod partition; diff --git a/src/obikpartitionner/src/partition.rs b/src/obikpartitionner/src/partition.rs index a638f93..5740aa2 100644 --- a/src/obikpartitionner/src/partition.rs +++ b/src/obikpartitionner/src/partition.rs @@ -18,7 +18,6 @@ use obiskio::{SKFileMeta, SKFileReader, SKFileWriter, SKResult}; use ptr_hash::{PtrHash, PtrHashParams, bucket_fn::CubicEps, hash::Xx64}; use rayon::prelude::*; use remove_dir_all::remove_dir_all; -use serde::{Deserialize, Serialize}; use sysinfo::System; use niffler::Level; @@ -28,18 +27,9 @@ use crate::kmer_sort::{chunk_size_from_ram, sort_unique_kmers}; type Mphf = PtrHash>, Xx64, Vec>; -const META_FILENAME: &str = "partition.meta"; const SK_EXT: &str = "skmer.zst"; pub const PARTITIONS_SUBDIR: &str = "partitions"; -#[derive(Serialize, Deserialize)] -struct PartitionMeta { - n_bits: usize, - kmer_size: usize, - minimizer_size: usize, - level: u32, -} - pub struct KmerPartition { root_path: PathBuf, n_partitions: usize, @@ -98,11 +88,15 @@ impl KmerPartition { level, closed: false, }; - partition.write_meta(n_bits)?; Ok(partition) } - pub fn open>(path: P) -> SKResult { + pub fn open_with_config>( + path: P, + kmer_size: usize, + minimizer_size: usize, + n_bits: usize, + ) -> SKResult { let root_path = path.as_ref().to_owned(); if !root_path.exists() { return Err(io::Error::new( @@ -111,22 +105,17 @@ impl KmerPartition { ) .into()); } - let meta_path = root_path.join(META_FILENAME); - let meta: PartitionMeta = - serde_json::from_reader(fs::File::open(&meta_path)?).map_err(io::Error::other)?; - - let level = level_from_u32(meta.level); - let n_partitions = 1usize << meta.n_bits; + let n_partitions = 1usize << n_bits; let writers = (0..n_partitions).map(|_| None).collect(); Ok(Self { root_path, n_partitions, - partitions_mask: (1u64 << meta.n_bits) - 1, - kmer_size: meta.kmer_size, - minimizer_size: meta.minimizer_size, + partitions_mask: (1u64 << n_bits) - 1, + kmer_size, + minimizer_size, writers, - level, - closed: true, // read-only: writing is not allowed on an opened partition + level: Level::One, + closed: true, }) } @@ -339,18 +328,6 @@ impl KmerPartition { } } - fn write_meta(&self, n_bits: usize) -> SKResult<()> { - let meta = PartitionMeta { - n_bits, - kmer_size: self.kmer_size, - minimizer_size: self.minimizer_size, - level: u32::from(self.level), - }; - let f = fs::File::create(self.root_path.join(META_FILENAME))?; - serde_json::to_writer_pretty(f, &meta).map_err(|e| io::Error::other(e))?; - Ok(()) - } - fn ensure_writer(&mut self, partition: usize) -> SKResult<&mut SKFileWriter> { if self.writers[partition].is_none() { let dir = self.root_path.join(PARTITIONS_SUBDIR).join(format!("part_{:05}", partition)); @@ -411,32 +388,6 @@ fn optimal_buckets(raw_path: &Path, available_bytes: u64) -> usize { n.next_power_of_two() as usize } -fn level_from_u32(n: u32) -> Level { - match n { - 0 => Level::Zero, - 1 => Level::One, - 2 => Level::Two, - 3 => Level::Three, - 4 => Level::Four, - 5 => Level::Five, - 6 => Level::Six, - 7 => Level::Seven, - 8 => Level::Eight, - 9 => Level::Nine, - 10 => Level::Ten, - 11 => Level::Eleven, - 12 => Level::Twelve, - 13 => Level::Thirteen, - 14 => Level::Fourteen, - 15 => Level::Fifteen, - 16 => Level::Sixteen, - 17 => Level::Seventeen, - 18 => Level::Eighteen, - 19 => Level::Nineteen, - 20 => Level::Twenty, - _ => Level::TwentyOne, - } -} /// Maximum value that fits in the 24-bit COUNT field of a SuperKmer header. const MAX_SK_COUNT: u64 = (1 << 24) - 1;