From 17c9e076bd5d47bc740d25d78f48b084092e8733 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 20 May 2026 18:21:05 +0200 Subject: [PATCH] refactor: extract obikindex crate and remove deprecated CLI commands Extracted core indexing logic, state tracking, and metadata management into a new `obikindex` crate. Refactored the `index` and `unitig` commands to leverage the `KmerIndex` abstraction and state-driven pipeline transitions. Removed obsolete CLI subcommands (`count`, `fasta`, `longtig`, `partition`) and their associated pipeline steps. Updated FASTA writing utilities for single-line output and deterministic identifiers, and refreshed workspace dependencies. --- TODO.md | 30 +- src/Cargo.lock | 720 ++---------------- src/Cargo.toml | 2 +- src/obidebruinj/src/debruijn.rs | 109 --- src/obifastwrite/src/fasta.rs | 44 +- src/obifastwrite/src/lib.rs | 205 +++-- src/obikindex/Cargo.toml | 21 + src/obikindex/src/error.rs | 53 ++ src/obikindex/src/index.rs | 301 ++++++++ src/obikindex/src/lib.rs | 9 + src/obikindex/src/meta.rs | 45 ++ src/obikindex/src/state.rs | 45 ++ src/obikmer/Cargo.toml | 36 +- src/obikmer/src/cmd/count.rs | 24 - src/obikmer/src/cmd/fasta.rs | 84 -- src/obikmer/src/cmd/index.rs | 104 ++- src/obikmer/src/cmd/longtig.rs | 143 ---- src/obikmer/src/cmd/mod.rs | 4 - src/obikmer/src/cmd/partition.rs | 49 -- src/obikmer/src/cmd/unitig.rs | 141 +--- src/obikmer/src/main.rs | 22 +- src/obikmer/src/steps/build_index.rs | 177 ----- .../src/steps/dereplicate_and_count.rs | 13 - src/obikmer/src/steps/mod.rs | 4 - 24 files changed, 792 insertions(+), 1593 deletions(-) create mode 100644 src/obikindex/Cargo.toml create mode 100644 src/obikindex/src/error.rs create mode 100644 src/obikindex/src/index.rs create mode 100644 src/obikindex/src/lib.rs create mode 100644 src/obikindex/src/meta.rs create mode 100644 src/obikindex/src/state.rs delete mode 100644 src/obikmer/src/cmd/count.rs delete mode 100644 src/obikmer/src/cmd/fasta.rs delete mode 100644 src/obikmer/src/cmd/longtig.rs delete mode 100644 src/obikmer/src/cmd/partition.rs delete mode 100644 src/obikmer/src/steps/build_index.rs delete mode 100644 src/obikmer/src/steps/dereplicate_and_count.rs diff --git a/TODO.md b/TODO.md index d93136d..fdb0998 100644 --- a/TODO.md +++ b/TODO.md @@ -1,4 +1,28 @@ -## Dans OBILayeredMap +## Chose à vérifier suite à la commande index -- Est-ce que CachelineEfVec est vraiment justifier dans notre cas. vu les contraintes sur la distribution des valeurs imposées par CachelineEfVec en terme d'ordre, de grandeur et de dispersion ? -- Il semble que le count de kmer soit stocké, ce qui doit-être une possibilité pas une obligation. +- partition.meta ne devrait plus exister +- les spectrums globaux devrait etre identifier par génome + - regrouper dans un sous-dossier spectrums à la racine de l'index avec un nom basé sur le génome +- les spectrum patiels ont-ils vocation à être conserver ? +- l'étape de déreplication dure quasiment autant de temps que le comptage mais ne laisse aucune trace de progression à l'utilisateur + +## commandes à ajouter + +- merge : pour construire un index à partir d'index existants + - deux modes : count et presence/absence. count exige que tous les index mergés soient déjà en mode count. mode presence/absence par defaut. Si passage de mode count à mode presence/absence, par defaut presence = count >= 1. Possibilité de spécifier un seuil personnalisé. + +- filter : produit un nouvel index filtré à partir d'un index existant en verifiant que les kmer présents dans le nouvel index respectent les critères de filtrage spécifiés + - quorum de presence en fraction-(min/max) du nombre de génomes, en nombre-(min/max) de génomes, si mode count la présence peut être défini par un seuil personnalisé minimum et maximum + +- aggregate : aggrege toutes les colonnes d'une matrice d'index en une seule colonne. + +- query : scan un fichier de sequences et retourne pour chaque sequence quels kmer sont présents dans l'index et dans quel genomes + +- distance : calcule la matrice de distance entre les genomes + - proposer une option pour chaque distance à calculer + - un possibité de récuperer la matrice des kmer communs + - un possibité de calculer l'arbre nj + - les matrices sont sauvegardées en CSV + - les arbres NJ sont sauvegardés en Newick avec les longeurs de branche + +- dump : une table csv de l'index avec les kmer et les genomes associés en mode count ou presence/absence avec une option pour forcer le mode presence/absence meme si l'index est en mode count. Par defaut, le mode count est utilisé pour les index en mode count et le mode presence/absence pour les index en mode presence/absence. diff --git a/src/Cargo.lock b/src/Cargo.lock index cd1f37c..858c7c1 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -48,33 +48,12 @@ dependencies = [ "as-slice", ] -[[package]] -name = "aligned-vec" -version = "0.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b" -dependencies = [ - "equator", -] - [[package]] name = "allocator-api2" version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" -[[package]] -name = "ambassador" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e68de4cdc6006162265d0957edb4a860fe4e711b1dc17a5746fd95f952f08285" -dependencies = [ - "itertools 0.10.5", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "anes" version = "0.1.6" @@ -149,12 +128,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ad8689a486416c401ea15715a4694de30054248ec627edbf31f49cb64ee4086" -[[package]] -name = "arrayvec" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" - [[package]] name = "as-slice" version = "0.2.1" @@ -182,7 +155,7 @@ dependencies = [ "miniz_oxide", "object", "rustc-demangle", - "windows-link 0.2.1", + "windows-link", ] [[package]] @@ -203,12 +176,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "binout" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "222fb4925a15bea6a68075021910e03d6aa2d04951d71ff1d956190a551d738f" - [[package]] name = "bitflags" version = "1.3.2" @@ -221,15 +188,6 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" -[[package]] -name = "bitm" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31468ea4a856000d83cb61960dfdc2980ecd96b15b61321c8c76cc96aea6e688" -dependencies = [ - "dyn_size_of", -] - [[package]] name = "bitvec" version = "1.0.1" @@ -320,8 +278,8 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af737c6c59cb018ecbe6472cbdf86d39c59d78252febfe311953a991b6e4ed85" dependencies = [ - "common_traits 0.11.4", - "epserde 0.8.0", + "common_traits", + "epserde", "mem_dbg", ] @@ -433,7 +391,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -475,18 +433,7 @@ checksum = "fda9ae1f26adcae83adb2e92f69cf59421f2a277a942f49f8e59f2fcbd7cf062" dependencies = [ "anyhow", "half", - "impl-tools 0.10.3", -] - -[[package]] -name = "common_traits" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65d0a1296e8d359cb197a8f8289f3d3f77cdb67f1a83d0aeb0820a5b7aea4058" -dependencies = [ - "anyhow", - "half", - "impl-tools 0.11.4", + "impl-tools", ] [[package]] @@ -648,41 +595,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "darling" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" -dependencies = [ - "darling_core", - "darling_macro", -] - -[[package]] -name = "darling_core" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2", - "quote", - "strsim", - "syn 2.0.117", -] - -[[package]] -name = "darling_macro" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" -dependencies = [ - "darling_core", - "quote", - "syn 2.0.117", -] - [[package]] name = "debugid" version = "0.8.0" @@ -692,29 +604,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "derivative" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - -[[package]] -name = "derive_setters" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e6f6fa1f03c14ae082120b84b3c7fbd7b8588d924cf2d7c3daf9afd49df8b9" -dependencies = [ - "darling", - "proc-macro2", - "quote", - "syn 2.0.117", -] - [[package]] name = "digest" version = "0.10.7" @@ -733,27 +622,9 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] -[[package]] -name = "dsi-progress-logger" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3969f942da74913b951e19784a2c0b61a2b0d11c2887f6eb3b9b34775b31bf" -dependencies = [ - "log", - "num-format", - "pluralizer", - "sysinfo 0.36.1", -] - -[[package]] -name = "dyn_size_of" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a742b95783b1f45b900129082cbc47717b6a77ee8d17eea70a8ea62462f5de3" - [[package]] name = "either" version = "1.15.0" @@ -775,30 +646,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.117", -] - -[[package]] -name = "env_filter" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32e90c2accc4b07a8456ea0debdc2e7587bdd890680d71173a15d4ae604f6eef" -dependencies = [ - "log", - "regex", -] - -[[package]] -name = "env_logger" -version = "0.11.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0621c04f2196ac3f488dd583365b9c09be011a4ab8b9f37248ffcc8f6198b56a" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "jiff", - "log", + "syn", ] [[package]] @@ -809,8 +657,8 @@ checksum = "c40d342ff20a2ce62d9a85ce406e672dfa137f902ac9670034533184f1533976" dependencies = [ "anyhow", "bitflags 2.11.1", - "common_traits 0.11.4", - "epserde-derive 0.8.0", + "common_traits", + "epserde-derive", "maligned", "mem_dbg", "mmap-rs", @@ -819,23 +667,6 @@ dependencies = [ "xxhash-rust", ] -[[package]] -name = "epserde" -version = "0.11.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8dffc01a379703ad5178f47a22aa532f5811b3ef45979ccd66b79da9856770b" -dependencies = [ - "anyhow", - "bitflags 2.11.1", - "common_traits 0.12.1", - "epserde-derive 0.11.0", - "mem_dbg", - "mmap-rs", - "sealed", - "thiserror 2.0.18", - "xxhash-rust", -] - [[package]] name = "epserde-derive" version = "0.8.0" @@ -844,39 +675,7 @@ checksum = "ac80cc78b69765703f48ad93f33b8919cf5d907cda7459ad6ba2919cbbe605dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", -] - -[[package]] -name = "epserde-derive" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fc2ceb99084df049085a5bdd15e3b2f7275111e2b9029f95fb01a3a06cf1b13" -dependencies = [ - "proc-macro-error", - "proc-macro2", - "quote", - "syn 2.0.117", -] - -[[package]] -name = "equator" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc" -dependencies = [ - "equator-macro", -] - -[[package]] -name = "equator-macro" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -895,12 +694,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "fallible-iterator" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" - [[package]] name = "fastrand" version = "2.4.1" @@ -1162,12 +955,6 @@ dependencies = [ "zerovec", ] -[[package]] -name = "ident_case" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" - [[package]] name = "idna" version = "1.1.0" @@ -1198,19 +985,7 @@ dependencies = [ "autocfg", "impl-tools-lib", "proc-macro-error2", - "syn 2.0.117", -] - -[[package]] -name = "impl-tools" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ae314a99afb5821e2fda288387546d4a04aace674551e854e6216b892ec3208" -dependencies = [ - "autocfg", - "impl-tools-lib", - "proc-macro-error2", - "syn 2.0.117", + "syn", ] [[package]] @@ -1222,7 +997,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -1307,47 +1082,6 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" -[[package]] -name = "jiff" -version = "0.2.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f00b5dbd620d61dfdcb6007c9c1f6054ebd75319f163d886a9055cec1155073d" -dependencies = [ - "jiff-static", - "jiff-tzdb-platform", - "log", - "portable-atomic", - "portable-atomic-util", - "serde_core", - "windows-sys 0.61.2", -] - -[[package]] -name = "jiff-static" -version = "0.2.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e000de030ff8022ea1da3f466fbb0f3a809f5e51ed31f6dd931c35181ad8e6d7" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - -[[package]] -name = "jiff-tzdb" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c900ef84826f1338a557697dc8fc601df9ca9af4ac137c7fb61d4c6f2dfd3076" - -[[package]] -name = "jiff-tzdb-platform" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "875a5a69ac2bab1a891711cf5eccbec1ce0341ea805560dcd90b7a2e925132e8" -dependencies = [ - "jiff-tzdb", -] - [[package]] name = "jobserver" version = "0.1.34" @@ -1368,44 +1102,12 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "lambert_w" -version = "1.2.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5f0846ee4f0299ca4c5b9ca06ff55cf88b3430a763bf591474cc734479c9b24" -dependencies = [ - "num-complex", - "num-traits", -] - [[package]] name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" -[[package]] -name = "lender" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c150e24afda8f769930a339cb5ad6e182101fdf1165c30c157b33ce5050fd7ad" -dependencies = [ - "fallible-iterator", - "lender-derive", - "stable_try_trait_v2", -] - -[[package]] -name = "lender-derive" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d074a297c82222d442171bad4f392fef93d35fb31e24a115f605a0c907ce0af9" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - [[package]] name = "libc" version = "0.2.185" @@ -1522,7 +1224,7 @@ checksum = "d84f40c93b0508d5565db79a814d02d5b2545967205ce44be211592aafa34d6c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -1693,16 +1395,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-format" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3" -dependencies = [ - "arrayvec", - "itoa", -] - [[package]] name = "num-integer" version = "0.1.46" @@ -1757,30 +1449,43 @@ dependencies = [ ] [[package]] -name = "obikmer" +name = "obikindex" version = "0.1.0" dependencies = [ "cacheline-ef", - "clap", - "epserde 0.8.0", + "epserde", "indicatif", - "memmap2", - "niffler 3.0.0", "obicompactvec", "obidebruinj", + "obikpartitionner", + "obikseq", + "obilayeredmap", + "obiskio", + "obisys", + "ptr_hash", + "rayon", + "serde", + "serde_json", + "tracing", +] + +[[package]] +name = "obikmer" +version = "0.1.0" +dependencies = [ + "clap", + "indicatif", "obifastwrite", + "obikindex", "obikpartitionner", "obikrope", "obikseq", - "obilayeredmap", "obipipeline", "obiread", "obiskbuilder", "obiskio", "obisys", - "ph", "pprof", - "ptr_hash", "rayon", "tracing", "tracing-subscriber", @@ -1791,7 +1496,7 @@ name = "obikpartitionner" version = "0.1.0" dependencies = [ "cacheline-ef", - "epserde 0.8.0", + "epserde", "indicatif", "memmap2", "niffler 3.0.0", @@ -1806,7 +1511,7 @@ dependencies = [ "remove_dir_all", "serde", "serde_json", - "sysinfo 0.33.1", + "sysinfo", "tempfile", "tracing", ] @@ -1835,7 +1540,7 @@ name = "obilayeredmap" version = "0.1.0" dependencies = [ "cacheline-ef", - "epserde 0.8.0", + "epserde", "memmap2", "ndarray", "obicompactvec", @@ -1898,25 +1603,6 @@ dependencies = [ "libc", ] -[[package]] -name = "objc2-core-foundation" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" -dependencies = [ - "bitflags 2.11.1", -] - -[[package]] -name = "objc2-io-kit" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15" -dependencies = [ - "libc", - "objc2-core-foundation", -] - [[package]] name = "object" version = "0.37.3" @@ -1964,7 +1650,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-link 0.2.1", + "windows-link", ] [[package]] @@ -1989,25 +1675,6 @@ dependencies = [ "indexmap", ] -[[package]] -name = "ph" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a447f203c7254ebb06aa4f111480e821debb361e80fb4d760554a8460f236550" -dependencies = [ - "aligned-vec", - "arrayvec", - "binout", - "bitm", - "dyn_size_of", - "epserde 0.11.5", - "mem_dbg", - "rayon", - "seedable_hash", - "sux", - "voracious_radix_sort", -] - [[package]] name = "pin-project-lite" version = "0.2.17" @@ -2054,16 +1721,6 @@ dependencies = [ "plotters-backend", ] -[[package]] -name = "pluralizer" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b3eba432a00a1f6c16f39147847a870e94e2e9b992759b503e330efec778cbe" -dependencies = [ - "once_cell", - "regex", -] - [[package]] name = "portable-atomic" version = "1.13.1" @@ -2128,31 +1785,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.117", -] - -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", + "syn", ] [[package]] @@ -2212,7 +1845,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.117", + "syn", "tempfile", ] @@ -2226,7 +1859,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -2249,9 +1882,9 @@ dependencies = [ "cacheline-ef", "clap", "colored", - "common_traits 0.11.4", - "epserde 0.8.0", - "epserde-derive 0.8.0", + "common_traits", + "epserde", + "epserde-derive", "fastrand", "fxhash", "itertools 0.14.0", @@ -2515,15 +2148,9 @@ checksum = "22f968c5ea23d555e670b449c1c5e7b2fc399fdaec1d304a17cd48e288abc107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] -[[package]] -name = "seedable_hash" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba2a159211665e29bbf5a2fbb42da50dd6eadff23eef7a6a7ae4a9b0a7cd0152" - [[package]] name = "serde" version = "1.0.228" @@ -2551,7 +2178,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -2611,12 +2238,6 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" -[[package]] -name = "stable_try_trait_v2" -version = "1.75.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c4e48411f4db8ccca0470bfb67e3bb821af4227d455aa147917d8d109be0d13" - [[package]] name = "strsim" version = "0.11.1" @@ -2639,47 +2260,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "sux" -version = "0.10.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29949eff4a64e05149a5147f1695fd8576c990504a217f28391c09e007d831dc" -dependencies = [ - "ambassador", - "anyhow", - "arbitrary-chunks", - "bitflags 2.11.1", - "clap", - "common_traits 0.12.1", - "crossbeam-channel", - "derivative", - "derive_setters", - "dsi-progress-logger", - "env_logger", - "epserde 0.11.5", - "fallible-iterator", - "flate2", - "impl-tools 0.11.4", - "itertools 0.14.0", - "jiff", - "lambert_w", - "lender", - "libc", - "log", - "mem_dbg", - "rand", - "rayon", - "rdst", - "sync-cell-slice", - "tempfile", - "thiserror 2.0.18", - "thread-priority", - "value-traits", - "xxhash-rust", - "zerocopy", - "zstd", -] - [[package]] name = "symbolic-common" version = "12.18.3" @@ -2703,17 +2283,6 @@ dependencies = [ "symbolic-common", ] -[[package]] -name = "syn" -version = "1.0.109" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - [[package]] name = "syn" version = "2.0.117" @@ -2725,12 +2294,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "sync-cell-slice" -version = "0.9.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cf9ace719a3856838781680d5d677c612e01a0bc0b7b1ded355057ca5015997" - [[package]] name = "synstructure" version = "0.13.2" @@ -2739,7 +2302,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -2770,20 +2333,6 @@ dependencies = [ "windows 0.57.0", ] -[[package]] -name = "sysinfo" -version = "0.36.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "252800745060e7b9ffb7b2badbd8b31cfa4aa2e61af879d0a3bf2a317c20217d" -dependencies = [ - "libc", - "memchr", - "ntapi", - "objc2-core-foundation", - "objc2-io-kit", - "windows 0.61.3", -] - [[package]] name = "tap" version = "1.0.1" @@ -2829,7 +2378,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -2840,21 +2389,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", -] - -[[package]] -name = "thread-priority" -version = "3.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2210811179577da3d54eb69ab0b50490ee40491a25d95b8c6011ba40771cb721" -dependencies = [ - "bitflags 2.11.1", - "cfg-if", - "libc", - "log", - "rustversion", - "windows 0.61.3", + "syn", ] [[package]] @@ -2925,7 +2460,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -3047,26 +2582,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" -[[package]] -name = "value-traits" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17e73c7053d8fa8e9c3c6b16c32d079ed5642a7156514820486a9c4e109cf48d" -dependencies = [ - "value-traits-derive", -] - -[[package]] -name = "value-traits-derive" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d301d1ee4b3eced3e73aa5740a303c7e068f1d4450c5dae4c8cf6bfa266954f" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - [[package]] name = "version_check" version = "0.9.5" @@ -3139,7 +2654,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.117", + "syn", "wasm-bindgen-shared", ] @@ -3242,68 +2757,22 @@ version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143" dependencies = [ - "windows-core 0.57.0", + "windows-core", "windows-targets 0.52.6", ] -[[package]] -name = "windows" -version = "0.61.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" -dependencies = [ - "windows-collections", - "windows-core 0.61.2", - "windows-future", - "windows-link 0.1.3", - "windows-numerics", -] - -[[package]] -name = "windows-collections" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" -dependencies = [ - "windows-core 0.61.2", -] - [[package]] name = "windows-core" version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" dependencies = [ - "windows-implement 0.57.0", - "windows-interface 0.57.0", - "windows-result 0.1.2", + "windows-implement", + "windows-interface", + "windows-result", "windows-targets 0.52.6", ] -[[package]] -name = "windows-core" -version = "0.61.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" -dependencies = [ - "windows-implement 0.60.2", - "windows-interface 0.59.3", - "windows-link 0.1.3", - "windows-result 0.3.4", - "windows-strings", -] - -[[package]] -name = "windows-future" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" -dependencies = [ - "windows-core 0.61.2", - "windows-link 0.1.3", - "windows-threading", -] - [[package]] name = "windows-implement" version = "0.57.0" @@ -3312,18 +2781,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", -] - -[[package]] -name = "windows-implement" -version = "0.60.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -3334,42 +2792,15 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] -[[package]] -name = "windows-interface" -version = "0.59.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - -[[package]] -name = "windows-link" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" - [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" -[[package]] -name = "windows-numerics" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" -dependencies = [ - "windows-core 0.61.2", - "windows-link 0.1.3", -] - [[package]] name = "windows-result" version = "0.1.2" @@ -3379,24 +2810,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-result" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" -dependencies = [ - "windows-link 0.1.3", -] - -[[package]] -name = "windows-strings" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" -dependencies = [ - "windows-link 0.1.3", -] - [[package]] name = "windows-sys" version = "0.52.0" @@ -3421,7 +2834,7 @@ version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" dependencies = [ - "windows-link 0.2.1", + "windows-link", ] [[package]] @@ -3455,15 +2868,6 @@ dependencies = [ "windows_x86_64_msvc 0.52.6", ] -[[package]] -name = "windows-threading" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" -dependencies = [ - "windows-link 0.1.3", -] - [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -3600,7 +3004,7 @@ checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", "synstructure", ] @@ -3621,7 +3025,7 @@ checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] @@ -3641,7 +3045,7 @@ checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", "synstructure", ] @@ -3681,7 +3085,7 @@ checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" dependencies = [ "proc-macro2", "quote", - "syn 2.0.117", + "syn", ] [[package]] diff --git a/src/Cargo.toml b/src/Cargo.toml index 5ffe180..46a4f87 100644 --- a/src/Cargo.toml +++ b/src/Cargo.toml @@ -1,5 +1,5 @@ [workspace] resolver = "3" -members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio","obidebruinj","obilayeredmap", "obicompactvec", "obisys"] +members = ["obikseq", "obiread", "obiskbuilder", "obifastwrite", "obikmer","obikrope","obipipeline", "obikpartitionner","obiskio","obidebruinj","obilayeredmap", "obicompactvec", "obisys", "obikindex"] [profile.release] debug = 1 diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index c76af41..a27de2a 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -1,12 +1,10 @@ //use ahash::RandomState; use hashbrown::HashMap; -use obifastwrite::write_unitig; use obikseq::k; use obikseq::unitig::Unitig; use obikseq::{CanonicalKmer, Kmer, Sequence}; use std::cell::Cell; use std::fmt; -use std::io; use xxhash_rust::xxh3::Xxh3Builder; // ── Types ───────────────────────────────────────────────────────────────────── @@ -293,59 +291,6 @@ impl GraphDeBruijn { Some(oriented) } - fn next_longtig_kmer(&self, kmer: Kmer) -> Option { - let canonical = kmer.canonical(); - let node = self.nodes.get(&canonical)?.get(); - - let direct = kmer.raw() == canonical.raw(); - - if (direct && node.n_right_neighbours() == 0) || (!direct && node.n_left_neighbours() == 0) - { - return None; - } - - let next_c: CanonicalKmer = if direct { - if node.can_extend_right() { - canonical - .into_kmer() - .push_right(node.right_nuc()) - .canonical() - } else { - self.iter_right_neighbors(canonical) - .filter(|n| !self.is_visited(n).unwrap_or(true)) - .next()? - } - } else { - if node.can_extend_left() { - canonical.into_kmer().push_left(node.left_nuc()).canonical() - } else { - self.iter_left_neighbors(canonical) - .filter(|n| !self.is_visited(n).unwrap_or(true)) - .next()? - } - }; - - let cell = self.nodes.get(&next_c)?; - let next_node = cell.get(); - if next_node.is_visited() { - return None; - } - - let oriented = oriented_next(kmer, next_c); - let ndirect = oriented.raw() == next_c.raw(); - - if (ndirect && next_node.n_right_neighbours() > 1) - || (!ndirect && next_node.n_left_neighbours() > 1) - { - return None; - } - - let mut updated = next_node; - updated.set_visited(); - cell.set(updated); - Some(oriented) - } - fn iter_unitig_kmers(&self, start: Kmer) -> UnitigIter<'_> { UnitigIter { graph: self, @@ -353,13 +298,6 @@ impl GraphDeBruijn { } } - fn iter_longtig_kmers(&self, start: Kmer) -> LongtigIter<'_> { - LongtigIter { - graph: self, - current: Some(start), - } - } - pub fn iter_unitig(&self) -> impl Iterator + '_ { let k = k(); self.start_iter().map(move |(start, first_next)| { @@ -373,36 +311,6 @@ impl GraphDeBruijn { }) } - pub fn iter_longtig(&self) -> impl Iterator + '_ { - let k = k(); - self.start_iter().map(move |(start, first_next)| { - let mut nucs: Vec = (0..k).map(|i| start.nucleotide(i)).collect(); - if let Some(next_c) = first_next { - for kmer in self.iter_longtig_kmers(next_c) { - nucs.push(kmer.nucleotide(k - 1)); - } - } - Unitig::from_nucleotides(&nucs) - }) - } - - /// Write all unitigs to `out` in FASTA format. - /// - /// Calls [`obifastwrite::write_unitig`] for each unitig produced by - /// [`iter_unitig`]. Stops and returns the first I/O error encountered. - pub fn write_fasta(&self, out: &mut W, unitig: bool) -> io::Result<()> { - if unitig { - for unitig in self.iter_unitig() { - write_unitig(&unitig, k(), out)?; - } - } else { - for unitig in self.iter_longtig() { - write_unitig(&unitig, k(), out)?; - } - } - Ok(()) - } - pub fn len(&self) -> usize { self.nodes.len() } @@ -516,23 +424,6 @@ impl Iterator for UnitigIter<'_> { } } -// ── UnitigIter ──────────────────────────────────────────────────────────────── - -struct LongtigIter<'a> { - graph: &'a GraphDeBruijn, - current: Option, -} - -impl Iterator for LongtigIter<'_> { - type Item = Kmer; - - fn next(&mut self) -> Option { - let current = self.current?; - self.current = self.graph.next_longtig_kmer(current); - Some(current) - } -} - // ── helpers ─────────────────────────────────────────────────────────────────── fn oriented_next(from: Kmer, to: CanonicalKmer) -> Kmer { diff --git a/src/obifastwrite/src/fasta.rs b/src/obifastwrite/src/fasta.rs index b23e9f2..3374bbd 100644 --- a/src/obifastwrite/src/fasta.rs +++ b/src/obifastwrite/src/fasta.rs @@ -1,9 +1,11 @@ use std::fmt; use std::io::{self, Write}; -use xxhash_rust::xxh64::xxh64; -pub(crate) enum JsonVal<'a> { +/// A JSON value that is either a number or a quoted string. +pub enum JsonVal<'a> { + /// Integer value, serialised without quotes. Num(u64), + /// String value, serialised with double quotes. Str(&'a str), } @@ -16,11 +18,8 @@ impl fmt::Display for JsonVal<'_> { } } -pub(crate) fn seq_id(ascii: &[u8]) -> String { - format!("{:016X}", xxh64(ascii, 0)) -} - -pub(crate) fn annotation( +/// Write a JSON object `{"k1":v1,"k2":v2,...}` to `writer`. +pub fn annotation( writer: &mut W, fields: &[(&str, JsonVal<'_>)], ) -> io::Result<()> { @@ -34,10 +33,29 @@ pub(crate) fn annotation( write!(writer, "}}") } -pub(crate) fn write_sequence(writer: &mut W, seq: &[u8], width: usize) -> io::Result<()> { - for chunk in seq.chunks(width) { - // SAFETY: seq is valid UTF-8; any contiguous slice of ASCII bytes is too - writeln!(writer, "{}", unsafe { std::str::from_utf8_unchecked(chunk) })?; - } - Ok(()) +/// xxHash-64 of `ascii`, formatted as 16 uppercase hex digits. +pub fn seq_id(ascii: &[u8]) -> String { + use xxhash_rust::xxh64::xxh64; + format!("{:016X}", xxh64(ascii, 0)) +} + +/// Write `seq` as one line of ASCII DNA, followed by a newline. +pub fn write_sequence(writer: &mut W, seq: &[u8]) -> io::Result<()> { + // SAFETY: seq is valid ASCII DNA (A/C/G/T). + writeln!(writer, "{}", unsafe { std::str::from_utf8_unchecked(seq) }) +} + +/// Core FASTA record writer. +/// +/// Writes `>{id} {annotation}\n{sequence}\n`. +pub fn write_record( + seq: &[u8], + id: &str, + fields: &[(&str, JsonVal<'_>)], + out: &mut W, +) -> io::Result<()> { + write!(out, ">{id} ")?; + annotation(out, fields)?; + writeln!(out)?; + write_sequence(out, seq) } diff --git a/src/obifastwrite/src/lib.rs b/src/obifastwrite/src/lib.rs index 0b1f3d9..3e066cd 100644 --- a/src/obifastwrite/src/lib.rs +++ b/src/obifastwrite/src/lib.rs @@ -1,32 +1,20 @@ -//! FASTA serialisation of [`SuperKmer`] values. +//! FASTA serialisation for obikmer sequence types. //! -//! Two functions cover the two phases of the scatter pipeline: +//! Three public functions cover the main output cases: //! -//! - [`write_scatter`]: scatter phase (before routing). The header annotation -//! contains the minimizer sequence decoded from [`SuperKmer::minimizer_pos`]. +//! - [`write_scatter`]: super-kmers in scatter phase (minimizer annotation) +//! - [`write_count`]: super-kmers in count phase (occurrence count annotation) +//! - [`write_unitig`]: unitigs from the layered index (partition + index annotation) //! -//! - [`write_count`]: count phase (after deduplication). The header annotation -//! contains the occurrence count from [`SuperKmer::count`]. -//! -//! Both functions write standard OBITools-compatible FASTA: +//! All produce OBITools-compatible FASTA: //! //! ```text -//! >ID {"seq_length":32,"kmer_size":31,"minimizer_size":11,"partition":42,"minimizer":"CGTGCTAGATC"} -//! GCTAGCATGCTAGCTGTAGCTGTGAGTGCTG +//! >ID {"key":value,...} +//! SEQUENCE //! ``` //! -//! The record identifier is the xxHash-64 of the ASCII sequence, formatted as -//! a 16-digit uppercase hexadecimal string. xxHash-64 is collision-resistant -//! enough for debugging identifiers (collision probability < 1e-9 for billions -//! of distinct super-kmers). -//! -//! # Phase contract -//! -//! `write_scatter` reads [`SuperKmer::minimizer_pos`], which is only valid -//! **before** [`SuperKmer::init_count`] is called. `write_count` reads -//! [`SuperKmer::count`], which is only meaningful **after** `init_count`. -//! Mixing the two functions in the wrong phase produces silently wrong output; -//! this is enforced by pipeline structure, not by the type system. +//! The lower-level primitive [`write_record`] and the [`JsonVal`] type are also +//! public for callers that need custom annotations. #![deny(missing_docs)] @@ -35,22 +23,15 @@ mod fasta; use std::io::{self, Write}; use obikseq::{Minimizer, SuperKmer, Unitig}; -use xxhash_rust::xxh64::xxh64; + +pub use fasta::{JsonVal, annotation, seq_id, write_record}; // ── public API ──────────────────────────────────────────────────────────────── /// Write one super-kmer in FASTA format — **scatter phase**. /// -/// The `minimizer` field in the JSON annotation contains the ASCII sequence of -/// the minimizer, decoded from [`SuperKmer::minimizer_pos`] (scatter-phase -/// value of the payload field). -/// -/// # Parameters -/// - `sk`: the super-kmer to serialise (must be in scatter phase) -/// - `out`: destination writer -/// - `k`: k-mer size used to build `sk` -/// - `m`: minimizer size -/// - `partition`: partition index computed from the minimizer hash +/// ID is the xxHash-64 of the sequence. JSON annotation includes +/// `seq_length`, `kmer_size`, `minimizer_size`, `partition`, `minimizer`. pub fn write_scatter( sk: &SuperKmer, out: &mut W, @@ -61,37 +42,26 @@ pub fn write_scatter( ) -> io::Result<()> { let ascii = sk.to_ascii(); let id = seq_id(&ascii); - let seq_len = ascii.len(); let min_seq = minimizer.to_ascii(); - - writeln!( + let min_str = unsafe { std::str::from_utf8_unchecked(&min_seq) }; + write_record( + &ascii, + &id, + &[ + ("seq_length", JsonVal::Num(ascii.len() as u64)), + ("kmer_size", JsonVal::Num(k as u64)), + ("minimizer_size",JsonVal::Num(m as u64)), + ("partition", JsonVal::Num(partition as u64)), + ("minimizer", JsonVal::Str(min_str)), + ], out, - ">{id} {{\"seq_length\":{seq_len},\"kmer_size\":{k},\ - \"minimizer_size\":{m},\"partition\":{partition},\ - \"minimizer\":\"{min}\"}}", - id = id, - seq_len = seq_len, - k = k, - m = m, - partition = partition, - min = unsafe { std::str::from_utf8_unchecked(&min_seq) }, - )?; - out.write_all(&ascii)?; - out.write_all(b"\n") + ) } /// Write one super-kmer in FASTA format — **count phase**. /// -/// The `count` field in the JSON annotation contains the occurrence count from -/// [`SuperKmer::count`] (count-phase value of the payload field). -/// -/// # Parameters -/// - `sk`: the super-kmer to serialise (must be in count phase, i.e. after -/// [`SuperKmer::init_count`] has been called) -/// - `out`: destination writer -/// - `k`: k-mer size -/// - `m`: minimizer size -/// - `partition`: partition index +/// ID is the xxHash-64 of the sequence. JSON annotation includes +/// `seq_length`, `kmer_size`, `minimizer_size`, `partition`, `count`. pub fn write_count( sk: &SuperKmer, out: &mut W, @@ -101,52 +71,47 @@ pub fn write_count( ) -> io::Result<()> { let ascii = sk.to_ascii(); let id = seq_id(&ascii); - let seq_len = ascii.len(); - let count = sk.count(); - - writeln!( + write_record( + &ascii, + &id, + &[ + ("seq_length", JsonVal::Num(ascii.len() as u64)), + ("kmer_size", JsonVal::Num(k as u64)), + ("minimizer_size",JsonVal::Num(m as u64)), + ("partition", JsonVal::Num(partition as u64)), + ("count", JsonVal::Num(sk.count() as u64)), + ], out, - ">{id} {{\"seq_length\":{seq_len},\"kmer_size\":{k},\ - \"minimizer_size\":{m},\"partition\":{partition},\ - \"count\":{count}}}", - id = id, - seq_len = seq_len, - k = k, - m = m, - partition = partition, - count = count, - )?; - out.write_all(&ascii)?; - out.write_all(b"\n") + ) } /// Write one unitig in FASTA format. /// -/// Header annotation (JSON): -/// ```text -/// >HASH {"seq_length":,"kmer_size":,"n_kmers":} -/// ``` -/// -/// `HASH` is the xxHash-64 of the ASCII sequence (16 uppercase hex digits). -/// `n_kmers` is the number of distinct k-mers covered by this unitig. -pub fn write_unitig(unitig: &Unitig, k: usize, out: &mut W) -> io::Result<()> { +/// ID is `part_PPPPP_unitig_IIIIII` where `P` is the partition index and `I` +/// is the unitig index within that partition. JSON annotation includes +/// `seq_length`, `kmer_size`, `n_kmers`, `partition`, `unitig_index`. +pub fn write_unitig( + unitig: &Unitig, + k: usize, + partition: usize, + index: usize, + out: &mut W, +) -> io::Result<()> { let ascii = unitig.to_ascii(); - let id = seq_id(&ascii); let seql = unitig.seql(); - let n_kmers = seql - k + 1; - writeln!( + let id = format!("part_{partition:05}_unitig_{index:06}"); + write_record( + &ascii, + &id, + &[ + ("seq_length", JsonVal::Num(seql as u64)), + ("kmer_size", JsonVal::Num(k as u64)), + ("n_kmers", JsonVal::Num((seql - k + 1) as u64)), + ("partition", JsonVal::Num(partition as u64)), + ("unitig_index", JsonVal::Num(index as u64)), + ], out, - ">{id} {{\"seq_length\":{seql},\"kmer_size\":{k},\"n_kmers\":{n_kmers}}}", - )?; - out.write_all(&ascii)?; - out.write_all(b"\n") -} - -// ── internal helpers ────────────────────────────────────────────────────────── - -/// xxHash-64 of the ASCII sequence, formatted as 16 uppercase hex digits. -fn seq_id(ascii: &[u8]) -> String { - format!("{:016X}", xxh64(ascii, 0)) + ) } // ── tests ───────────────────────────────────────────────────────────────────── @@ -178,9 +143,6 @@ mod tests { #[test] fn scatter_minimizer_decoded_from_hash() { - // "ACG" right-aligned: A=00, C=01, G=10 → 0b000110 = 6 - // Left-aligned for m=3: shift by 64 − 2·3 = 58. - // set_m(3) so that Minimizer::to_ascii() decodes exactly 3 bases. obikseq::params::set_m(3); let sk = make(b"ACGTACGTACGT"); let minimizer = Minimizer::from_raw_unchecked(6u64 << (64 - 2 * 3)); @@ -230,13 +192,34 @@ mod tests { #[test] fn count_sequence_line_correct() { - // TTTTACGT canonicalises to ACGTAAAA (revcomp is ACGTAAAA < TTTTACGT) let sk = make(b"TTTTACGT"); let out = capture(|w| write_count(&sk, w, 4, 2, 0)); let lines: Vec<&str> = out.lines().collect(); assert_eq!(lines[1], "ACGTAAAA"); } + // ── write_unitig ────────────────────────────────────────────────────────── + + #[test] + fn unitig_id_format() { + obikseq::params::set_k(4); + let unitig = obikseq::packed_seq::PackedSeq::from_ascii(b"ACGTACGT"); + let out = capture(|w| write_unitig(&unitig, 4, 3, 17, w)); + let id = out.lines().next().unwrap(); + assert!(id.starts_with(">part_00003_unitig_000017"), "got: {id}"); + } + + #[test] + fn unitig_annotation_fields() { + obikseq::params::set_k(4); + let unitig = obikseq::packed_seq::PackedSeq::from_ascii(b"ACGTACGT"); + let out = capture(|w| write_unitig(&unitig, 4, 2, 5, w)); + assert!(out.contains("\"partition\":2")); + assert!(out.contains("\"unitig_index\":5")); + assert!(out.contains("\"n_kmers\":5")); + assert!(out.contains("\"kmer_size\":4")); + } + // ── ID stability ────────────────────────────────────────────────────────── #[test] @@ -260,7 +243,7 @@ mod tests { .next() .unwrap()[1..] .to_string(); - assert_eq!(id1, id2, "same sequence must produce same ID"); + assert_eq!(id1, id2); } #[test] @@ -269,21 +252,11 @@ mod tests { let sk2 = make(b"TTTTTTTT"); let id1 = capture(|w| write_scatter(&sk1, w, 4, 2, 0, Minimizer::from_raw_unchecked(0))) - .lines() - .next() - .unwrap() - .split_whitespace() - .next() - .unwrap()[1..] - .to_string(); + .lines().next().unwrap() + .split_whitespace().next().unwrap()[1..].to_string(); let id2 = capture(|w| write_scatter(&sk2, w, 4, 2, 0, Minimizer::from_raw_unchecked(0))) - .lines() - .next() - .unwrap() - .split_whitespace() - .next() - .unwrap()[1..] - .to_string(); + .lines().next().unwrap() + .split_whitespace().next().unwrap()[1..].to_string(); assert_ne!(id1, id2); } @@ -291,7 +264,7 @@ mod tests { fn id_is_16_hex_digits() { let sk = make(b"ACGTACGT"); let out = capture(|w| write_scatter(&sk, w, 4, 2, 0, Minimizer::from_raw_unchecked(0))); - let id = &out.lines().next().unwrap()[1..17]; // skip '>' + let id = &out.lines().next().unwrap()[1..17]; assert_eq!(id.len(), 16); assert!(id.chars().all(|c| c.is_ascii_hexdigit())); } diff --git a/src/obikindex/Cargo.toml b/src/obikindex/Cargo.toml new file mode 100644 index 0000000..caa12d6 --- /dev/null +++ b/src/obikindex/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "obikindex" +version = "0.1.0" +edition = "2024" + +[dependencies] +obikpartitionner = { path = "../obikpartitionner" } +obikseq = { path = "../obikseq" } +obisys = { path = "../obisys" } +obiskio = { path = "../obiskio" } +obidebruinj = { path = "../obidebruinj" } +obilayeredmap = { path = "../obilayeredmap" } +obicompactvec = { path = "../obicompactvec" } +cacheline-ef = "1.1" +epserde = "0.8" +ptr_hash = "1.1" +rayon = "1" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +indicatif = "0.17" +tracing = "0.1.44" diff --git a/src/obikindex/src/error.rs b/src/obikindex/src/error.rs new file mode 100644 index 0000000..55b2a79 --- /dev/null +++ b/src/obikindex/src/error.rs @@ -0,0 +1,53 @@ +use std::fmt; +use std::io; + +use obiskio::SKError; +use obilayeredmap::OLMError; + +#[derive(Debug)] +pub enum OKIError { + Io(io::Error), + Json(serde_json::Error), + Partition(SKError), + Layer(OLMError), +} + +pub type OKIResult = Result; + +impl fmt::Display for OKIError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + OKIError::Io(e) => write!(f, "I/O error: {e}"), + OKIError::Json(e) => write!(f, "JSON error: {e}"), + OKIError::Partition(e) => write!(f, "partition error: {e}"), + OKIError::Layer(e) => write!(f, "layer error: {e}"), + } + } +} + +impl std::error::Error for OKIError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + OKIError::Io(e) => Some(e), + OKIError::Json(e) => Some(e), + OKIError::Partition(e) => Some(e), + OKIError::Layer(e) => Some(e), + } + } +} + +impl From for OKIError { + fn from(e: io::Error) -> Self { OKIError::Io(e) } +} + +impl From for OKIError { + fn from(e: serde_json::Error) -> Self { OKIError::Json(e) } +} + +impl From for OKIError { + fn from(e: SKError) -> Self { OKIError::Partition(e) } +} + +impl From for OKIError { + fn from(e: OLMError) -> Self { OKIError::Layer(e) } +} diff --git a/src/obikindex/src/index.rs b/src/obikindex/src/index.rs new file mode 100644 index 0000000..37b30c1 --- /dev/null +++ b/src/obikindex/src/index.rs @@ -0,0 +1,301 @@ +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; + +use cacheline_ef::{CachelineEf, CachelineEfVec}; +use epserde::prelude::*; +use indicatif::{ProgressBar, ProgressStyle}; +use obicompactvec::{PersistentCompactIntMatrix, PersistentCompactIntVec}; +use obidebruinj::GraphDeBruijn; +use obikpartitionner::KmerPartition; +use obilayeredmap::layer::Layer; +use obiskio::{SKFileMeta, SKFileReader}; +use obisys::{Reporter, Stage}; +use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; +use rayon::prelude::*; +use tracing::info; + +use crate::error::{OKIError, OKIResult}; +use crate::meta::{IndexConfig, IndexMeta}; +use crate::state::{IndexState, SENTINEL_INDEXED, SENTINEL_SCATTERED}; + +type Mphf = PtrHash>, Xx64, Vec>; + +pub struct KmerIndex { + root_path: PathBuf, + meta: IndexMeta, + partition: KmerPartition, +} + +impl KmerIndex { + /// Create a new index at `path`. + /// + /// If `genome_label` is `Some`, it is stored immediately. + /// If `None`, the label will be derived from the first scatter input path + /// when `mark_scattered` is called. + pub fn create>( + path: P, + config: IndexConfig, + genome_label: Option, + force: bool, + ) -> OKIResult { + let root_path = path.as_ref().to_owned(); + let partition = KmerPartition::create( + &root_path, + config.n_bits, + config.kmer_size, + config.minimizer_size, + force, + )?; + let mut meta = IndexMeta::new(config); + if let Some(label) = genome_label { + meta.genomes.push(label); + } + meta.write(&root_path)?; + Ok(Self { root_path, meta, partition }) + } + + pub fn open>(path: P) -> OKIResult { + let root_path = path.as_ref().to_owned(); + let meta = IndexMeta::read(&root_path).map_err(OKIError::Io)?; + let partition = KmerPartition::open(&root_path)?; + Ok(Self { root_path, meta, partition }) + } + + /// Return `true` if `path` contains an `index.meta` file. + pub fn exists>(path: P) -> bool { + IndexMeta::exists(path.as_ref()) + } + + /// Current construction state, as reported by sentinel files on disk. + pub fn state(&self) -> IndexState { + IndexState::detect(&self.root_path).unwrap_or(IndexState::Empty) + } + + pub fn meta(&self) -> &IndexMeta { &self.meta } + pub fn kmer_size(&self) -> usize { self.meta.config.kmer_size } + pub fn minimizer_size(&self) -> usize { self.meta.config.minimizer_size } + pub fn n_partitions(&self) -> usize { self.partition.n_partitions() } + + /// Expose the inner partition so the caller can run scatter into it. + /// Call `mark_scattered` once scatter is complete. + pub fn partition_mut(&mut self) -> &mut KmerPartition { + &mut self.partition + } + + /// Mark scatter as complete and write `scatter.done`. + /// + /// If no genome label was set at creation time, one is derived from + /// `first_scatter_path` (filename stripped of all extensions). + /// If `first_scatter_path` is also `None`, the label defaults to `"unknown"`. + pub fn mark_scattered(&mut self, first_scatter_path: Option<&Path>) -> OKIResult<()> { + if self.meta.genomes.is_empty() { + let label = first_scatter_path + .map(label_from_path) + .unwrap_or_else(|| "unknown".to_string()); + self.meta.genomes.push(label); + self.meta.write(&self.root_path)?; + } + touch(&self.root_path.join(SENTINEL_SCATTERED))?; + Ok(()) + } + + /// Dereplicate all partitions then compute kmer counts. + /// + /// Writes `kmer_spectrum_raw.json` at the index root upon completion + /// (this file doubles as the `Counted` sentinel). + pub fn dereplicate_and_count(&self, rep: &mut Reporter) -> OKIResult<()> { + let t = Stage::start("dereplicate"); + self.partition.dereplicate()?; + rep.push(t.stop()); + + let t = Stage::start("count_kmer"); + self.partition.count_kmer()?; + rep.push(t.stop()); + Ok(()) + } + + /// Build the layered MPHF index for all partitions. + /// + /// Default mode (`config.with_counts = false`): set membership only. + /// With counts: count matrix per kmer. + /// + /// Writes `index.done` upon completion. + /// Path to the unitigs file for partition `part`, layer `layer`. + pub fn layer_unitigs_path(&self, part: usize, layer: usize) -> PathBuf { + self.partition.part_dir(part) + .join("index") + .join(format!("layer_{layer}")) + .join("unitigs.bin") + } + + pub fn build_layers( + &self, + min_ab: u32, + max_ab: Option, + keep_intermediate: bool, + rep: &mut Reporter, + ) -> OKIResult<()> { + let n = self.partition.n_partitions(); + let t = Stage::start("index"); + let with_counts = self.meta.config.with_counts; + let filter_active = min_ab > 1 || max_ab.is_some(); + let need_counts = filter_active || with_counts; + let total_kmers = AtomicUsize::new(0); + + let partition = &self.partition; + + let pb = Arc::new(Mutex::new( + ProgressBar::new(n as u64).with_style( + ProgressStyle::with_template("index — [{bar:20}] {pos}/{len} | {msg}").unwrap(), + ), + )); + + (0..n).into_par_iter().for_each(|i| { + let part_dir = partition.part_dir(i); + let dedup_path = part_dir.join("dereplicated.skmer.zst"); + if !dedup_path.exists() { + return; + } + + let layer_dir = part_dir.join("index").join("layer_0"); + if layer_dir.join("mphf.bin").exists() { + return; + } + + let mphf1_opt: Option = if need_counts { + let p = part_dir.join("mphf1.bin"); + p.exists().then(|| Mphf::load_full(&p).ok()).flatten() + } else { + None + }; + + let counts1_opt: Option = if need_counts { + let p = part_dir.join("counts1.bin"); + p.exists() + .then(|| PersistentCompactIntVec::open(&p).ok()) + .flatten() + } else { + None + }; + + let mut g = GraphDeBruijn::new(); + let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| { + eprintln!("error opening {}: {e}", dedup_path.display()); + std::process::exit(1); + }); + for sk in reader.iter() { + for kmer in sk.iter_canonical_kmers() { + let accept = if filter_active { + match (&mphf1_opt, &counts1_opt) { + (Some(mphf), Some(counts)) => { + let ab = counts.get(mphf.index(&kmer.raw())); + ab >= min_ab && max_ab.map_or(true, |max| ab <= max) + } + _ => true, + } + } else { + true + }; + if accept { + g.push(kmer); + } + } + } + + let n_kmers = g.len(); + total_kmers.fetch_add(n_kmers, Ordering::Relaxed); + g.compute_degrees(); + + fs::create_dir_all(&layer_dir).unwrap_or_else(|e| { + eprintln!("error creating {}: {e}", layer_dir.display()); + std::process::exit(1); + }); + let mut uw = Layer::<()>::unitig_writer(&layer_dir).unwrap_or_else(|e| { + eprintln!("error creating unitig writer (partition {i}): {e}"); + std::process::exit(1); + }); + for unitig in g.iter_unitig() { + uw.write(&unitig).unwrap_or_else(|e| { + eprintln!("error writing unitig (partition {i}): {e}"); + std::process::exit(1); + }); + } + uw.close().unwrap_or_else(|e| { + eprintln!("error closing unitig writer (partition {i}): {e}"); + std::process::exit(1); + }); + + if with_counts { + Layer::::build(&layer_dir, |kmer| { + match (&mphf1_opt, &counts1_opt) { + (Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())), + _ => 1, + } + }) + .unwrap_or_else(|e| { + eprintln!("error building count layer (partition {i}): {e}"); + std::process::exit(1); + }); + } else { + Layer::<()>::build(&layer_dir).unwrap_or_else(|e| { + eprintln!("error building set layer (partition {i}): {e}"); + std::process::exit(1); + }); + } + + let pb = pb.lock().unwrap(); + pb.inc(1); + pb.set_message(format!("{i}: {n_kmers} kmers")); + }); + + pb.lock().unwrap().finish_and_clear(); + info!( + "done — {} total kmers indexed", + total_kmers.load(Ordering::Relaxed) + ); + + if !keep_intermediate { + for i in 0..n { + let part_dir = partition.part_dir(i); + remove_if_exists(&part_dir.join("dereplicated.skmer.zst")); + remove_if_exists(&SKFileMeta::sidecar_path( + &part_dir.join("dereplicated.skmer.zst"), + )); + remove_if_exists(&part_dir.join("mphf1.bin")); + remove_if_exists(&part_dir.join("counts1.bin")); + } + } + + touch(&self.root_path.join(SENTINEL_INDEXED))?; + rep.push(t.stop()); + Ok(()) + } +} + +/// Derive a genome label from a file path: filename stripped of all extensions. +fn label_from_path(path: &Path) -> String { + let name = path + .file_name() + .unwrap_or(path.as_os_str()) + .to_string_lossy() + .into_owned(); + let mut s = name; + while let Some(pos) = s.rfind('.') { + s.truncate(pos); + } + if s.is_empty() { "unknown".to_string() } else { s } +} + +fn touch(path: &Path) -> Result<(), std::io::Error> { + fs::File::create(path).map(|_| ()) +} + +fn remove_if_exists(path: &Path) { + if let Err(e) = fs::remove_file(path) { + if e.kind() != std::io::ErrorKind::NotFound { + eprintln!("warning: could not remove {}: {e}", path.display()); + } + } +} diff --git a/src/obikindex/src/lib.rs b/src/obikindex/src/lib.rs new file mode 100644 index 0000000..91a728a --- /dev/null +++ b/src/obikindex/src/lib.rs @@ -0,0 +1,9 @@ +pub mod error; +pub mod meta; +pub mod state; +mod index; + +pub use error::{OKIError, OKIResult}; +pub use index::KmerIndex; +pub use meta::{IndexConfig, IndexMeta, META_FILENAME}; +pub use state::{IndexState, SENTINEL_COUNTED, SENTINEL_INDEXED, SENTINEL_SCATTERED}; diff --git a/src/obikindex/src/meta.rs b/src/obikindex/src/meta.rs new file mode 100644 index 0000000..8327e94 --- /dev/null +++ b/src/obikindex/src/meta.rs @@ -0,0 +1,45 @@ +use std::fs; +use std::io; +use std::path::Path; + +use serde::{Deserialize, Serialize}; + +pub const META_FILENAME: &str = "index.meta"; +const META_VERSION: u32 = 1; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IndexConfig { + pub kmer_size: usize, + pub minimizer_size: usize, + pub n_bits: usize, + pub with_counts: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IndexMeta { + pub version: u32, + pub config: IndexConfig, + /// Ordered list of genome labels indexed here. + /// Element 0 is the initial genome; subsequent entries come from merges. + pub genomes: Vec, +} + +impl IndexMeta { + pub fn new(config: IndexConfig) -> Self { + Self { version: META_VERSION, config, genomes: Vec::new() } + } + + pub fn write(&self, root: &Path) -> io::Result<()> { + let file = fs::File::create(root.join(META_FILENAME))?; + serde_json::to_writer_pretty(file, self).map_err(io::Error::other) + } + + pub fn read(root: &Path) -> io::Result { + let file = fs::File::open(root.join(META_FILENAME))?; + serde_json::from_reader(file).map_err(io::Error::other) + } + + pub fn exists(root: &Path) -> bool { + root.join(META_FILENAME).exists() + } +} diff --git a/src/obikindex/src/state.rs b/src/obikindex/src/state.rs new file mode 100644 index 0000000..a32d4d8 --- /dev/null +++ b/src/obikindex/src/state.rs @@ -0,0 +1,45 @@ +use std::path::Path; + +use crate::meta::META_FILENAME; + +pub const SENTINEL_SCATTERED: &str = "scatter.done"; +pub const SENTINEL_COUNTED: &str = "kmer_spectrum_raw.json"; +pub const SENTINEL_INDEXED: &str = "index.done"; + +/// Progression state of a `KmerIndex`. +/// +/// Variants are ordered: `Empty < Scattered < Counted < Indexed`. +/// A state is reported only when its sentinel file is fully present — +/// partial states (e.g. scatter interrupted mid-way) are not accepted. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum IndexState { + /// `index.meta` present; scatter not yet completed. + Empty, + /// `scatter.done` sentinel present — all super-kmers have been routed. + Scattered, + /// `kmer_spectrum_raw.json` present — dereplicate + count complete. + Counted, + /// `index.done` sentinel present — layered MPHF index fully built. + Indexed, +} + +impl IndexState { + /// Detect the state of the index at `root`. + /// + /// Returns `None` if `index.meta` is absent (not an obikindex directory). + pub fn detect(root: &Path) -> Option { + if !root.join(META_FILENAME).exists() { + return None; + } + if root.join(SENTINEL_INDEXED).exists() { + return Some(Self::Indexed); + } + if root.join(SENTINEL_COUNTED).exists() { + return Some(Self::Counted); + } + if root.join(SENTINEL_SCATTERED).exists() { + return Some(Self::Scattered); + } + Some(Self::Empty) + } +} diff --git a/src/obikmer/Cargo.toml b/src/obikmer/Cargo.toml index ea384d4..f61a524 100644 --- a/src/obikmer/Cargo.toml +++ b/src/obikmer/Cargo.toml @@ -8,28 +8,20 @@ name = "obikmer" path = "src/main.rs" [dependencies] -obikseq = { path = "../obikseq" } -obiread = { path = "../obiread" } -obiskbuilder = { path = "../obiskbuilder" } -obifastwrite = { path = "../obifastwrite" } -obipipeline = { path = "../obipipeline" } -obidebruinj = { path = "../obidebruinj" } -clap = { version = "4", features = ["derive"] } -obikrope = { path = "../obikrope" } -obikpartitionner = { path = "../obikpartitionner" } -obisys = { path = "../obisys" } -obiskio = { path = "../obiskio" } -obicompactvec = { path = "../obicompactvec" } -obilayeredmap = { path = "../obilayeredmap" } -niffler = "3" -rayon = "1" -ph = "0.11" -memmap2 = "0.9" -epserde = "0.8" -ptr_hash = "1.1" -cacheline-ef = "1.1" -indicatif = "0.17" -tracing = "0.1.44" +obikseq = { path = "../obikseq" } +obiread = { path = "../obiread" } +obiskbuilder = { path = "../obiskbuilder" } +obifastwrite = { path = "../obifastwrite" } +obipipeline = { path = "../obipipeline" } +obikrope = { path = "../obikrope" } +obikpartitionner = { path = "../obikpartitionner" } +obisys = { path = "../obisys" } +obiskio = { path = "../obiskio" } +obikindex = { path = "../obikindex" } +clap = { version = "4", features = ["derive"] } +rayon = "1" +indicatif = "0.17" +tracing = "0.1.44" tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } pprof = { version = "0.13", features = ["prost-codec"], optional = true } diff --git a/src/obikmer/src/cmd/count.rs b/src/obikmer/src/cmd/count.rs deleted file mode 100644 index ed83eda..0000000 --- a/src/obikmer/src/cmd/count.rs +++ /dev/null @@ -1,24 +0,0 @@ -use clap::Args; -use obikpartitionner::KmerPartition; -use std::path::PathBuf; -use tracing::info; - -#[derive(Args)] -pub struct CountArgs { - /// Partition directory produced by the `partition` command - #[arg(short, long)] - pub partition: PathBuf, -} - -pub fn run(args: CountArgs) { - let kp = KmerPartition::open(&args.partition).unwrap_or_else(|e| { - eprintln!("error: {e}"); - std::process::exit(1) - }); - - info!("counting kmers in {}", args.partition.display()); - kp.count_kmer().unwrap_or_else(|e| { - eprintln!("error: {e}"); - std::process::exit(1) - }); -} diff --git a/src/obikmer/src/cmd/fasta.rs b/src/obikmer/src/cmd/fasta.rs deleted file mode 100644 index 9eb384a..0000000 --- a/src/obikmer/src/cmd/fasta.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::fs::File; -use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use clap::Args; -use niffler::Level; -use niffler::send::compression::Format; -use obifastwrite::write_count; -use obikpartitionner::KmerPartition; -use obiskio::SKFileReader; -use rayon::prelude::*; -use tracing::info; - -#[derive(Args)] -pub struct FastaArgs { - /// Root of the k-mer partition directory (produced by the `partition` command) - pub partition: PathBuf, - - /// Dump dereplicated super-kmers as FASTA (→ /dereplicated.skmer.fasta.gz) - #[arg(long)] - pub super_kmers: bool, -} - -pub fn run(args: FastaArgs) { - if !args.super_kmers { - eprintln!("error: specify at least one output mode (--super-kmers)"); - std::process::exit(1); - } - - let kp = KmerPartition::open(&args.partition).unwrap_or_else(|e| { - eprintln!("error opening partition: {e}"); - std::process::exit(1) - }); - - if args.super_kmers { - dump_super_kmers(&kp, &args.partition); - } -} - -fn dump_super_kmers(kp: &KmerPartition, _partition_dir: &PathBuf) { - let k = kp.kmer_size(); - let m = kp.minimizer_size(); - let n = kp.n_partitions(); - - info!("writing {n} partition FASTA files (parallel)"); - - let total = AtomicUsize::new(0); - - (0..n).into_par_iter().for_each(|i| { - let part_dir = kp.part_dir(i); - let in_path = part_dir.join("dereplicated.skmer.zst"); - if !in_path.exists() { - return; - } - let out_path = part_dir.join("dereplicated.skmer.fasta.gz"); - - let file = File::create(&out_path).unwrap_or_else(|e| { - eprintln!("error creating {}: {e}", out_path.display()); - std::process::exit(1) - }); - let mut writer = niffler::send::get_writer(Box::new(file), Format::Gzip, Level::Six) - .unwrap_or_else(|e| { - eprintln!("error creating gzip writer: {e}"); - std::process::exit(1) - }); - - let mut reader = SKFileReader::open(&in_path).unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", in_path.display()); - std::process::exit(1) - }); - let mut count = 0usize; - for sk in reader.iter() { - write_count(&sk, &mut writer, k, m, i as u32).unwrap_or_else(|e| { - eprintln!("write error: {e}"); - std::process::exit(1) - }); - count += 1; - } - info!("partition {i}: {count} super-kmers → {}", out_path.display()); - total.fetch_add(count, Ordering::Relaxed); - }); - - info!("wrote {} super-kmers total", total.load(Ordering::Relaxed)); -} diff --git a/src/obikmer/src/cmd/index.rs b/src/obikmer/src/cmd/index.rs index 609b45d..40349e1 100644 --- a/src/obikmer/src/cmd/index.rs +++ b/src/obikmer/src/cmd/index.rs @@ -1,17 +1,17 @@ use std::path::PathBuf; use clap::Args; -use obikpartitionner::KmerPartition; +use obikindex::{IndexConfig, IndexState, KmerIndex}; use obikseq::{set_k, set_m}; use obisys::Reporter; use tracing::info; use crate::cli::CommonArgs; -use crate::steps::{build_index, dereplicate_and_count, scatter}; +use crate::steps::scatter; #[derive(Args)] pub struct IndexArgs { - /// Output partition directory + /// Output index directory #[arg(short, long)] pub output: PathBuf, @@ -19,6 +19,10 @@ pub struct IndexArgs { #[arg(long, default_value_t = false)] pub force: bool, + /// Genome label (default: input filename without path/extension) + #[arg(long)] + pub label: Option, + /// Minimum kmer abundance (inclusive) #[arg(long, default_value_t = 1)] pub min_abundance: u32, @@ -43,53 +47,71 @@ pub fn run(args: IndexArgs) { let output = args.output.clone(); let mut rep = Reporter::new(); - // ── Stage 1: scatter (skipped if partition already exists) ─────────────── - let kp = if output.join("partition.meta").exists() { - info!("resuming from existing partition at {}", output.display()); - let kp = KmerPartition::open(&output).unwrap_or_else(|e| { - eprintln!("error opening partition: {e}"); + // ── Open or create the index ───────────────────────────────────────────── + let mut idx = if KmerIndex::exists(&output) { + info!("resuming from existing index at {}", output.display()); + KmerIndex::open(&output).unwrap_or_else(|e| { + eprintln!("error opening index: {e}"); std::process::exit(1); - }); - set_k(kp.kmer_size()); - set_m(kp.minimizer_size()); - kp + }) } else { - let k = args.common.kmer_size; - set_k(k); - let m = args.common.minimizer_size; - set_m(m); - let theta = args.common.theta; - let level_max = args.common.level_max; - let n_workers = args.common.threads.max(1); - - let mut kp = - KmerPartition::create(&output, args.common.partition_bits, k, m, args.force) - .unwrap_or_else(|e| { - eprintln!("error: {e}"); - std::process::exit(1); - }); - - scatter(&mut kp, args.common.seqfile_paths(), k, level_max, theta, n_workers, &mut rep); - kp + let config = IndexConfig { + kmer_size: args.common.kmer_size, + minimizer_size: args.common.minimizer_size, + n_bits: args.common.partition_bits, + with_counts: args.with_counts, + }; + KmerIndex::create(&output, config, args.label.clone(), args.force).unwrap_or_else(|e| { + eprintln!("error creating index: {e}"); + std::process::exit(1); + }) }; + set_k(idx.kmer_size()); + set_m(idx.minimizer_size()); - // ── Stage 2: dereplicate + count (skipped if already done) ─────────────── - if !output.join("kmer_spectrum_raw.json").exists() { - dereplicate_and_count(&kp, &mut rep); + // ── Stage 1: scatter ───────────────────────────────────────────────────── + if idx.state() < IndexState::Scattered { + let first_path = args.common.inputs.first().map(PathBuf::from); + let k = idx.kmer_size(); + let level_max = args.common.level_max; + let theta = args.common.theta; + let n_workers = args.common.threads.max(1); + + scatter(idx.partition_mut(), args.common.seqfile_paths(), k, level_max, theta, n_workers, &mut rep); + + idx.mark_scattered(first_path.as_deref()).unwrap_or_else(|e| { + eprintln!("error marking scatter done: {e}"); + std::process::exit(1); + }); } else { - info!("kmer counts already present, skipping dereplicate + count"); + info!("scatter already done, skipping"); + } + + // ── Stage 2: dereplicate + count ───────────────────────────────────────── + if idx.state() < IndexState::Counted { + idx.dereplicate_and_count(&mut rep).unwrap_or_else(|e| { + eprintln!("error: {e}"); + std::process::exit(1); + }); + } else { + info!("dereplicate+count already done, skipping"); } // ── Stage 3: build layered index ───────────────────────────────────────── - build_index( - &kp, - args.min_abundance, - args.max_abundance, - args.with_counts, - args.keep_intermediate, - &mut rep, - ); + if idx.state() < IndexState::Indexed { + idx.build_layers( + args.min_abundance, + args.max_abundance, + args.keep_intermediate, + &mut rep, + ).unwrap_or_else(|e| { + eprintln!("error: {e}"); + std::process::exit(1); + }); + } else { + info!("index already built, skipping"); + } rep.print(); } diff --git a/src/obikmer/src/cmd/longtig.rs b/src/obikmer/src/cmd/longtig.rs deleted file mode 100644 index 1d8b922..0000000 --- a/src/obikmer/src/cmd/longtig.rs +++ /dev/null @@ -1,143 +0,0 @@ -use std::fs::File; -use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use clap::Args; -use niffler::Level; -use niffler::send::compression::Format; -use obidebruinj::GraphDeBruijn; -use obikpartitionner::KmerPartition; -use obikseq::set_k; -use obiskio::SKFileReader; -use ph::fmph::GOFunction; -use rayon::prelude::*; -use tracing::info; - -#[derive(Args)] -pub struct LongtigArgs { - /// Root of the k-mer partition directory (produced by the `partition` command) - pub partition: PathBuf, - - /// Minimum kmer abundance (inclusive); kmers below this threshold are excluded - #[arg(long, default_value_t = 1)] - pub min_abundance: u32, - - /// Maximum kmer abundance (inclusive); kmers above this threshold are excluded - #[arg(long)] - pub max_abundance: Option, -} - -pub fn run(args: LongtigArgs) { - let kp = KmerPartition::open(&args.partition).unwrap_or_else(|e| { - eprintln!("error opening partition: {e}"); - std::process::exit(1) - }); - - let k = kp.kmer_size(); - set_k(k); - let n = kp.n_partitions(); - info!("building longtigs from {n} partitions (k={k}, parallel)"); - - let total_kmers = AtomicUsize::new(0); - - (0..n).into_par_iter().for_each(|i| { - let part_dir = kp.part_dir(i); - let in_path = part_dir.join("dereplicated.skmer.zst"); - if !in_path.exists() { - return; - } - let out_path = part_dir.join("longtig.fasta.gz"); - - let mut g = GraphDeBruijn::new(); - - let mphf_path = part_dir.join("mphf1.bin"); - let counts_path = part_dir.join("counts1.bin"); - let filter_active = (args.min_abundance > 1 || args.max_abundance.is_some()) - && mphf_path.exists() - && counts_path.exists(); - - let mphf_opt: Option = if filter_active { - let mut f = File::open(&mphf_path).unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", mphf_path.display()); - std::process::exit(1) - }); - Some(GOFunction::read(&mut f).unwrap_or_else(|e| { - eprintln!("error reading MPHF {}: {e}", mphf_path.display()); - std::process::exit(1) - })) - } else { - None - }; - - let counts_mmap_opt = if filter_active { - let cf = File::open(&counts_path).unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", counts_path.display()); - std::process::exit(1) - }); - Some(unsafe { - memmap2::Mmap::map(&cf).unwrap_or_else(|e| { - eprintln!("error mmapping {}: {e}", counts_path.display()); - std::process::exit(1) - }) - }) - } else { - None - }; - - let counts_slice: Option<&[u32]> = counts_mmap_opt - .as_ref() - .map(|m| unsafe { std::slice::from_raw_parts(m.as_ptr() as *const u32, m.len() / 4) }); - - let mut reader = SKFileReader::open(&in_path).unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", in_path.display()); - std::process::exit(1) - }); - for sk in reader.iter() { - for kmer in sk.iter_canonical_kmers() { - let accept = match (&mphf_opt, counts_slice) { - (Some(mphf), Some(counts)) => { - if let Some(slot) = mphf.get(&kmer) { - let ab = counts[slot as usize]; - ab >= args.min_abundance - && args.max_abundance.map_or(true, |max| ab <= max) - } else { - false - } - } - _ => true, - }; - if accept { - g.push(kmer); - } - } - } - - let n_kmers = g.len(); - total_kmers.fetch_add(n_kmers, Ordering::Relaxed); - info!( - "partition {i}/{n}: {n_kmers} canonical k-mers → {}", - out_path.display() - ); - - g.compute_degrees(); - - let file = File::create(&out_path).unwrap_or_else(|e| { - eprintln!("error creating {}: {e}", out_path.display()); - std::process::exit(1) - }); - let mut writer = niffler::send::get_writer(Box::new(file), Format::Gzip, Level::Six) - .unwrap_or_else(|e| { - eprintln!("error creating gzip writer: {e}"); - std::process::exit(1) - }); - g.write_fasta(&mut writer, false).unwrap_or_else(|e| { - eprintln!("write error on partition {i}: {e}"); - std::process::exit(1) - }); - }); - - info!( - "done — {} total canonical k-mers across all partitions", - total_kmers.load(Ordering::Relaxed) - ); -} diff --git a/src/obikmer/src/cmd/mod.rs b/src/obikmer/src/cmd/mod.rs index bf1fe18..1d2188e 100644 --- a/src/obikmer/src/cmd/mod.rs +++ b/src/obikmer/src/cmd/mod.rs @@ -1,7 +1,3 @@ -pub mod count; -pub mod fasta; pub mod index; -pub mod longtig; -pub mod partition; pub mod superkmer; pub mod unitig; diff --git a/src/obikmer/src/cmd/partition.rs b/src/obikmer/src/cmd/partition.rs deleted file mode 100644 index 6245b83..0000000 --- a/src/obikmer/src/cmd/partition.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::path::PathBuf; - -use clap::Args; -use obikpartitionner::KmerPartition; -use obisys::Reporter; -use obikseq::{set_k, set_m}; - -use crate::cli::CommonArgs; -use crate::steps::{dereplicate_and_count, scatter}; - -#[derive(Args)] -pub struct PartitionArgs { - /// Output partition directory - #[arg(short, long)] - pub output: PathBuf, - - /// Overwrite output directory if it already exists - #[arg(long, default_value_t = false)] - pub force: bool, - - #[command(flatten)] - pub common: CommonArgs, -} - -// ── Entry point ─────────────────────────────────────────────────────────────── - -pub fn run(args: PartitionArgs) { - let k = args.common.kmer_size; - set_k(k); - let m = args.common.minimizer_size; - set_m(m); - let theta = args.common.theta; - let level_max = args.common.level_max; - let n_workers = args.common.threads.max(1); - - let mut kp = KmerPartition::create(&args.output, args.common.partition_bits, k, m, args.force) - .unwrap_or_else(|e| { - eprintln!("error: {e}"); - std::process::exit(1) - }); - - let path_source = args.common.seqfile_paths(); - let mut rep = Reporter::new(); - - scatter(&mut kp, path_source, k, level_max, theta, n_workers, &mut rep); - dereplicate_and_count(&kp, &mut rep); - - rep.print(); -} diff --git a/src/obikmer/src/cmd/unitig.rs b/src/obikmer/src/cmd/unitig.rs index ddf0fc0..ceb71f8 100644 --- a/src/obikmer/src/cmd/unitig.rs +++ b/src/obikmer/src/cmd/unitig.rs @@ -1,143 +1,54 @@ -use std::fs::File; +use std::io::{self, BufWriter, Write}; use std::path::PathBuf; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Mutex; use clap::Args; -use niffler::Level; -use niffler::send::compression::Format; -use obidebruinj::GraphDeBruijn; -use obikpartitionner::KmerPartition; +use obifastwrite::write_unitig; +use obikindex::KmerIndex; use obikseq::set_k; -use obiskio::SKFileReader; -use ph::fmph::GOFunction; +use obiskio::UnitigFileReader; use rayon::prelude::*; use tracing::info; #[derive(Args)] pub struct UnitigArgs { - /// Root of the k-mer partition directory (produced by the `partition` command) - pub partition: PathBuf, - - /// Minimum kmer abundance (inclusive); kmers below this threshold are excluded - #[arg(long, default_value_t = 1)] - pub min_abundance: u32, - - /// Maximum kmer abundance (inclusive); kmers above this threshold are excluded - #[arg(long)] - pub max_abundance: Option, + /// Index directory produced by the `index` command + pub index: PathBuf, } pub fn run(args: UnitigArgs) { - let kp = KmerPartition::open(&args.partition).unwrap_or_else(|e| { - eprintln!("error opening partition: {e}"); + let idx = KmerIndex::open(&args.index).unwrap_or_else(|e| { + eprintln!("error opening index: {e}"); std::process::exit(1) }); - let k = kp.kmer_size(); + let k = idx.kmer_size(); set_k(k); - let n = kp.n_partitions(); - info!("building unitigs from {n} partitions (k={k}, parallel)"); + let n = idx.n_partitions(); + info!("dumping unitigs from {n} partitions (k={k})"); - let total_kmers = AtomicUsize::new(0); + let stdout = Mutex::new(BufWriter::new(io::stdout())); (0..n).into_par_iter().for_each(|i| { - let part_dir = kp.part_dir(i); - let in_path = part_dir.join("dereplicated.skmer.zst"); - if !in_path.exists() { + let path = idx.layer_unitigs_path(i, 0); + if !path.exists() { return; } - let out_path = part_dir.join("unitig.fasta.gz"); - let mut g = GraphDeBruijn::new(); - - let mphf_path = part_dir.join("mphf1.bin"); - let counts_path = part_dir.join("counts1.bin"); - let filter_active = (args.min_abundance > 1 || args.max_abundance.is_some()) - && mphf_path.exists() - && counts_path.exists(); - - let mphf_opt: Option = if filter_active { - let mut f = File::open(&mphf_path).unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", mphf_path.display()); - std::process::exit(1) - }); - Some(GOFunction::read(&mut f).unwrap_or_else(|e| { - eprintln!("error reading MPHF {}: {e}", mphf_path.display()); - std::process::exit(1) - })) - } else { - None - }; - - let counts_mmap_opt = if filter_active { - let cf = File::open(&counts_path).unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", counts_path.display()); - std::process::exit(1) - }); - Some(unsafe { - memmap2::Mmap::map(&cf).unwrap_or_else(|e| { - eprintln!("error mmapping {}: {e}", counts_path.display()); - std::process::exit(1) - }) - }) - } else { - None - }; - - let counts_slice: Option<&[u32]> = counts_mmap_opt - .as_ref() - .map(|m| unsafe { std::slice::from_raw_parts(m.as_ptr() as *const u32, m.len() / 4) }); - - let mut reader = SKFileReader::open(&in_path).unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", in_path.display()); + let reader = UnitigFileReader::open(&path).unwrap_or_else(|e| { + eprintln!("error opening unitigs (partition {i}): {e}"); std::process::exit(1) }); - for sk in reader.iter() { - for kmer in sk.iter_canonical_kmers() { - let accept = match (&mphf_opt, counts_slice) { - (Some(mphf), Some(counts)) => { - if let Some(slot) = mphf.get(&kmer) { - let ab = counts[slot as usize]; - ab >= args.min_abundance - && args.max_abundance.map_or(true, |max| ab <= max) - } else { - false - } - } - _ => true, - }; - if accept { - g.push(kmer); - } - } + + for j in 0..reader.len() { + let unitig = reader.unitig(j); + let mut out = stdout.lock().unwrap(); + write_unitig(&unitig, k, i, j, &mut *out).unwrap_or_else(|e| { + eprintln!("write error: {e}"); + std::process::exit(1) + }); } - - let n_kmers = g.len(); - total_kmers.fetch_add(n_kmers, Ordering::Relaxed); - info!( - "partition {i}/{n}: {n_kmers} canonical k-mers → {}", - out_path.display() - ); - - g.compute_degrees(); - - let file = File::create(&out_path).unwrap_or_else(|e| { - eprintln!("error creating {}: {e}", out_path.display()); - std::process::exit(1) - }); - let mut writer = niffler::send::get_writer(Box::new(file), Format::Gzip, Level::Six) - .unwrap_or_else(|e| { - eprintln!("error creating gzip writer: {e}"); - std::process::exit(1) - }); - g.write_fasta(&mut writer, true).unwrap_or_else(|e| { - eprintln!("write error on partition {i}: {e}"); - std::process::exit(1) - }); }); - info!( - "done — {} total canonical k-mers across all partitions", - total_kmers.load(Ordering::Relaxed) - ); + stdout.into_inner().unwrap().flush().expect("flush error"); } diff --git a/src/obikmer/src/main.rs b/src/obikmer/src/main.rs index 22a3ef8..470da38 100644 --- a/src/obikmer/src/main.rs +++ b/src/obikmer/src/main.rs @@ -14,20 +14,12 @@ struct Cli { #[derive(Subcommand)] enum Commands { - /// Extract super-kmers from a sequence file (scatter phase) + /// Extract super-kmers from a sequence file and write to stdout Superkmer(cmd::superkmer::SuperkmerArgs), - /// Partition super-kmers on disk by minimizer - Partition(cmd::partition::PartitionArgs), - /// Count kmers from an existing dereplicated partition directory - Count(cmd::count::CountArgs), - /// Export partition data to FASTA (--super-kmers: dereplicated super-kmers) - Fasta(cmd::fasta::FastaArgs), - /// Build de Bruijn unitigs for all partitions and write to unitig.fasta.gz - Unitig(cmd::unitig::UnitigArgs), - /// Build de Bruijn longtigs for all partitions and write to longtig.fasta.gz - Longtig(cmd::longtig::LongtigArgs), /// Build the complete genome index (scatter → dereplicate → count → layered MPHF) Index(cmd::index::IndexArgs), + /// Dump unitigs from a built index to stdout (debug) + Unitig(cmd::unitig::UnitigArgs), } fn main() { @@ -50,12 +42,8 @@ fn main() { let cli = Cli::parse(); match cli.command { Commands::Superkmer(args) => cmd::superkmer::run(args), - Commands::Partition(args) => cmd::partition::run(args), - Commands::Count(args) => cmd::count::run(args), - Commands::Fasta(args) => cmd::fasta::run(args), - Commands::Unitig(args) => cmd::unitig::run(args), - Commands::Longtig(args) => cmd::longtig::run(args), - Commands::Index(args) => cmd::index::run(args), + Commands::Index(args) => cmd::index::run(args), + Commands::Unitig(args) => cmd::unitig::run(args), } #[cfg(feature = "profiling")] diff --git a/src/obikmer/src/steps/build_index.rs b/src/obikmer/src/steps/build_index.rs deleted file mode 100644 index 31e7629..0000000 --- a/src/obikmer/src/steps/build_index.rs +++ /dev/null @@ -1,177 +0,0 @@ -use std::fs; -use std::path::Path; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; - -use cacheline_ef::{CachelineEf, CachelineEfVec}; -use epserde::prelude::*; -use indicatif::{ProgressBar, ProgressStyle}; -use obicompactvec::PersistentCompactIntMatrix; -use obicompactvec::PersistentCompactIntVec; -use obidebruinj::GraphDeBruijn; -use obikpartitionner::KmerPartition; -use obilayeredmap::layer::Layer; -use obiskio::{SKFileMeta, SKFileReader}; -use obisys::{Reporter, Stage}; -use ptr_hash::{PtrHash, bucket_fn::CubicEps, hash::Xx64}; -use rayon::prelude::*; -use tracing::info; - -type Mphf = PtrHash>, Xx64, Vec>; - -/// Build the layered MPHF index for all partitions in parallel. -/// -/// Default mode (with_counts = false): set membership only (`Layer<()>`). -/// With counts (with_counts = true): count matrix per kmer (`Layer`). -/// -/// Skips any partition whose `index/layer_0/mphf.bin` already exists (resume). -/// Reports the "index" stage to `rep`. -pub fn build_index( - kp: &KmerPartition, - min_ab: u32, - max_ab: Option, - with_counts: bool, - keep_intermediate: bool, - rep: &mut Reporter, -) { - let n = kp.n_partitions(); - let t = Stage::start("index"); - let total_kmers = AtomicUsize::new(0); - let filter_active = min_ab > 1 || max_ab.is_some(); - let need_counts = filter_active || with_counts; - - let pb = Arc::new(Mutex::new( - ProgressBar::new(n as u64).with_style( - ProgressStyle::with_template("index — [{bar:20}] {pos}/{len} | {msg}").unwrap(), - ), - )); - - (0..n).into_par_iter().for_each(|i| { - let part_dir = kp.part_dir(i); - let dedup_path = part_dir.join("dereplicated.skmer.zst"); - if !dedup_path.exists() { - return; - } - - let layer_dir = part_dir.join("index").join("layer_0"); - if layer_dir.join("mphf.bin").exists() { - return; - } - - // Load partition MPHF + counts when needed for filtering or count payload - let mphf1_opt: Option = if need_counts { - let p = part_dir.join("mphf1.bin"); - p.exists().then(|| Mphf::load_full(&p).ok()).flatten() - } else { - None - }; - - let counts1_opt: Option = if need_counts { - let p = part_dir.join("counts1.bin"); - p.exists() - .then(|| PersistentCompactIntVec::open(&p).ok()) - .flatten() - } else { - None - }; - - // Build de Bruijn graph with optional abundance filter - let mut g = GraphDeBruijn::new(); - let mut reader = SKFileReader::open(&dedup_path).unwrap_or_else(|e| { - eprintln!("error opening {}: {e}", dedup_path.display()); - std::process::exit(1); - }); - for sk in reader.iter() { - for kmer in sk.iter_canonical_kmers() { - let accept = if filter_active { - match (&mphf1_opt, &counts1_opt) { - (Some(mphf), Some(counts)) => { - let ab = counts.get(mphf.index(&kmer.raw())); - ab >= min_ab && max_ab.map_or(true, |max| ab <= max) - } - _ => true, - } - } else { - true - }; - if accept { - g.push(kmer); - } - } - } - - let n_kmers = g.len(); - total_kmers.fetch_add(n_kmers, Ordering::Relaxed); - g.compute_degrees(); - - // Write unitigs to layer_0/unitigs.bin - fs::create_dir_all(&layer_dir).unwrap_or_else(|e| { - eprintln!("error creating {}: {e}", layer_dir.display()); - std::process::exit(1); - }); - let mut uw = Layer::<()>::unitig_writer(&layer_dir).unwrap_or_else(|e| { - eprintln!("error creating unitig writer (partition {i}): {e}"); - std::process::exit(1); - }); - for unitig in g.iter_unitig() { - uw.write(&unitig).unwrap_or_else(|e| { - eprintln!("error writing unitig (partition {i}): {e}"); - std::process::exit(1); - }); - } - uw.close().unwrap_or_else(|e| { - eprintln!("error closing unitig writer (partition {i}): {e}"); - std::process::exit(1); - }); - - // Build MPHF layer — mode depends on --with-counts - if with_counts { - Layer::::build(&layer_dir, |kmer| { - match (&mphf1_opt, &counts1_opt) { - (Some(mphf), Some(counts)) => counts.get(mphf.index(&kmer.raw())), - _ => 1, - } - }) - .unwrap_or_else(|e| { - eprintln!("error building count layer (partition {i}): {e}"); - std::process::exit(1); - }); - } else { - Layer::<()>::build(&layer_dir).unwrap_or_else(|e| { - eprintln!("error building set layer (partition {i}): {e}"); - std::process::exit(1); - }); - } - - let pb = pb.lock().unwrap(); - pb.inc(1); - pb.set_message(format!("{i}: {n_kmers} kmers")); - }); - - pb.lock().unwrap().finish_and_clear(); - info!( - "done — {} total kmers indexed", - total_kmers.load(Ordering::Relaxed) - ); - - // ── Cleanup intermediate build files ────────────────────────────────────── - if !keep_intermediate { - for i in 0..n { - let part_dir = kp.part_dir(i); - remove_if_exists(&part_dir.join("dereplicated.skmer.zst")); - remove_if_exists(&SKFileMeta::sidecar_path(&part_dir.join("dereplicated.skmer.zst"))); - remove_if_exists(&part_dir.join("mphf1.bin")); - remove_if_exists(&part_dir.join("counts1.bin")); - } - } - - rep.push(t.stop()); -} - -fn remove_if_exists(path: &Path) { - if let Err(e) = fs::remove_file(path) { - if e.kind() != std::io::ErrorKind::NotFound { - eprintln!("warning: could not remove {}: {e}", path.display()); - } - } -} diff --git a/src/obikmer/src/steps/dereplicate_and_count.rs b/src/obikmer/src/steps/dereplicate_and_count.rs deleted file mode 100644 index 68fd99c..0000000 --- a/src/obikmer/src/steps/dereplicate_and_count.rs +++ /dev/null @@ -1,13 +0,0 @@ -use obikpartitionner::KmerPartition; -use obisys::{Reporter, Stage}; - -/// Dereplicate then count kmers. Reports each stage to `rep`. -pub fn dereplicate_and_count(kp: &KmerPartition, rep: &mut Reporter) { - let t = Stage::start("dereplicate"); - kp.dereplicate().expect("dereplicate error"); - rep.push(t.stop()); - - let t = Stage::start("count_kmer"); - kp.count_kmer().expect("count kmer error"); - rep.push(t.stop()); -} diff --git a/src/obikmer/src/steps/mod.rs b/src/obikmer/src/steps/mod.rs index 4666631..3e31ab6 100644 --- a/src/obikmer/src/steps/mod.rs +++ b/src/obikmer/src/steps/mod.rs @@ -1,7 +1,3 @@ -mod build_index; -mod dereplicate_and_count; mod scatter; -pub use build_index::build_index; -pub use dereplicate_and_count::dereplicate_and_count; pub use scatter::scatter;