Refactor: simplify user authentication flow

- Replaced manual token validation with built-in middleware
- Removed redundant session checks in controllers
This commit is contained in:
Eric Coissac
2026-04-26 14:59:07 +02:00
parent eaf893174f
commit 1f466bf113
7 changed files with 240 additions and 194 deletions
+42 -24
View File
@@ -32,7 +32,7 @@
use std::io::{self, Write};
use obikseq::superkmer::SuperKmer;
use obikseq::{kmer::Kmer, superkmer::SuperKmer};
use xxhash_rust::xxh64::xxh64;
// ── public API ────────────────────────────────────────────────────────────────
@@ -54,13 +54,13 @@ pub fn write_scatter<W: Write>(
out: &mut W,
k: usize,
m: usize,
partition: u32,
min_hash: u64,
partition: usize,
minimizer: Kmer,
) -> io::Result<()> {
let ascii = sk.to_ascii();
let id = seq_id(&ascii);
let seq_len = ascii.len();
let min_seq = decode_mmer(min_hash, m);
let min_seq = minimizer.to_ascii(m);
writeln!(
out,
@@ -72,18 +72,12 @@ pub fn write_scatter<W: Write>(
k = k,
m = m,
partition = partition,
min = std::str::from_utf8(&min_seq).unwrap(),
min = unsafe { std::str::from_utf8_unchecked(&min_seq) },
)?;
out.write_all(&ascii)?;
out.write_all(b"\n")
}
/// Decode a right-aligned 2-bit minimizer value into uppercase ASCII (A/C/G/T).
fn decode_mmer(val: u64, m: usize) -> Vec<u8> {
const BASES: [u8; 4] = [b'A', b'C', b'G', b'T'];
(0..m).map(|i| BASES[((val >> (2 * (m - 1 - i))) & 3) as usize]).collect()
}
/// Write one super-kmer in FASTA format — **count phase**.
///
/// The `count` field in the JSON annotation contains the occurrence count from
@@ -154,7 +148,7 @@ mod tests {
fn scatter_header_contains_minimizer_field() {
let mut sk = make(b"ACGTACGTACGT");
sk.set_minimizer_pos(2);
let out = capture(|w| write_scatter(&sk, w, 4, 3, 7, 0));
let out = capture(|w| write_scatter(&sk, w, 4, 3, 7, Kmer::from_raw(0)));
assert!(out.contains("\"minimizer\":\""));
assert!(!out.contains("\"count\":"));
}
@@ -164,7 +158,7 @@ mod tests {
// min_hash for "ACG" (A=0,C=1,G=2, m=3): 0*16 + 1*4 + 2 = 6
let mut sk = make(b"ACGTACGTACGT");
sk.set_minimizer_pos(0);
let out = capture(|w| write_scatter(&sk, w, 4, 3, 0, 6));
let out = capture(|w| write_scatter(&sk, w, 4, 3, 0, Kmer::from_raw_right(6, 3)));
assert!(out.contains("\"minimizer\":\"ACG\""), "got: {out}");
}
@@ -172,7 +166,7 @@ mod tests {
fn scatter_fields_present() {
let mut sk = make(b"ACGTACGTACGT");
sk.set_minimizer_pos(0);
let out = capture(|w| write_scatter(&sk, w, 4, 3, 5, 0));
let out = capture(|w| write_scatter(&sk, w, 4, 3, 5, Kmer::from_raw(0)));
assert!(out.contains("\"seq_length\":12"));
assert!(out.contains("\"kmer_size\":4"));
assert!(out.contains("\"minimizer_size\":3"));
@@ -183,7 +177,7 @@ mod tests {
fn scatter_sequence_line_correct() {
let mut sk = make(b"ACGTACGT");
sk.set_minimizer_pos(0);
let out = capture(|w| write_scatter(&sk, w, 4, 2, 0, 0));
let out = capture(|w| write_scatter(&sk, w, 4, 2, 0, Kmer::from_raw(0)));
let lines: Vec<&str> = out.lines().collect();
assert_eq!(lines[1], "ACGTACGT");
}
@@ -230,10 +224,22 @@ mod tests {
let mut sk2 = make(b"ACGTACGT");
sk2.set_minimizer_pos(4); // different pos, same sequence
let id1 = capture(|w| write_scatter(&sk1, w, 4, 2, 0, 0))
.lines().next().unwrap().split_whitespace().next().unwrap()[1..].to_string();
let id2 = capture(|w| write_scatter(&sk2, w, 4, 2, 0, 0))
.lines().next().unwrap().split_whitespace().next().unwrap()[1..].to_string();
let id1 = capture(|w| write_scatter(&sk1, w, 4, 2, 0, Kmer::from_raw(0)))
.lines()
.next()
.unwrap()
.split_whitespace()
.next()
.unwrap()[1..]
.to_string();
let id2 = capture(|w| write_scatter(&sk2, w, 4, 2, 0, Kmer::from_raw(0)))
.lines()
.next()
.unwrap()
.split_whitespace()
.next()
.unwrap()[1..]
.to_string();
assert_eq!(id1, id2, "same sequence must produce same ID");
}
@@ -244,10 +250,22 @@ mod tests {
let mut sk2 = make(b"TTTTTTTT");
sk2.set_minimizer_pos(0);
let id1 = capture(|w| write_scatter(&sk1, w, 4, 2, 0, 0))
.lines().next().unwrap().split_whitespace().next().unwrap()[1..].to_string();
let id2 = capture(|w| write_scatter(&sk2, w, 4, 2, 0, 0))
.lines().next().unwrap().split_whitespace().next().unwrap()[1..].to_string();
let id1 = capture(|w| write_scatter(&sk1, w, 4, 2, 0, Kmer::from_raw(0)))
.lines()
.next()
.unwrap()
.split_whitespace()
.next()
.unwrap()[1..]
.to_string();
let id2 = capture(|w| write_scatter(&sk2, w, 4, 2, 0, Kmer::from_raw(0)))
.lines()
.next()
.unwrap()
.split_whitespace()
.next()
.unwrap()[1..]
.to_string();
assert_ne!(id1, id2);
}
@@ -255,7 +273,7 @@ mod tests {
fn id_is_16_hex_digits() {
let mut sk = make(b"ACGTACGT");
sk.set_minimizer_pos(0);
let out = capture(|w| write_scatter(&sk, w, 4, 2, 0, 0));
let out = capture(|w| write_scatter(&sk, w, 4, 2, 0, Kmer::from_raw(0)));
let id = &out.lines().next().unwrap()[1..17]; // skip '>'
assert_eq!(id.len(), 16);
assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
+151
View File
@@ -0,0 +1,151 @@
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use clap::Args;
use obifastwrite::write_scatter;
use obikrope::Rope;
use obikseq::superkmer::SuperKmer;
use obipipeline::{WorkerPool, make_pipeline};
use obiskbuilder::SuperKmerIter;
// Command-line arguments of the partitioning step, parsed through clap's `Args` derive.
// NOTE(review): the `///` comments on the fields are consumed by clap as the
// user-visible help text, so rewording them changes the program's output.
#[derive(Args)]
pub struct PartitionArgs {
// Positional values; `num_args = 1..` requires at least one.
/// Input files or directories (FASTA/FASTQ, optionally gzip-compressed)
#[arg(num_args = 1..)]
pub inputs: Vec<String>,
// Forwarded as `k` to every pipeline stage in `run`.
/// k-mer size
#[arg(short, long, default_value_t = 31)]
pub kmer_size: usize,
// Forwarded as `m` to `build_superkmers` and `write_batch`.
/// Minimizer size
#[arg(short, long, default_value_t = 11)]
pub minimizer_size: usize,
// Long option only; handed unchanged to `SuperKmerIter::new`.
/// Entropy threshold (k-mers with score ≤ theta are rejected)
#[arg(long, default_value_t = 0.7)]
pub theta: f64,
// Long option only; handed unchanged to `SuperKmerIter::new`.
/// Maximum sub-word size for entropy computation
#[arg(long, default_value_t = 6)]
pub level_max: usize,
// `write_batch` builds the mask `(1 << partition_bits) - 1` from this value.
/// Number of bits to encode partitions (allows up to 2^partition_bits partitions)
#[arg(short, long, default_value_t = 8)]
pub partition_bits: usize,
// Defaults to the parallelism reported by the standard library, or 1 when that
// query fails; `run` additionally clamps the value to at least 1.
/// Number of worker threads
#[arg(
short = 'T',
long,
default_value_t = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1))]
pub threads: usize,
}
// Payload moved between pipeline stages. Each variant is the output type of one
// stage and the input type of the next, as wired by `make_pipeline!` in `run`.
enum PipelineData {
// Path of one input, emitted by the pipeline source.
Path(PathBuf),
// Chunk yielded by `open_chunks`, before normalisation.
RawChunk(Rope),
// Chunk returned by `normalize`.
NormChunk(Rope),
// Super-kmers that `build_superkmers` extracted from one normalised chunk.
Batch(Vec<SuperKmer>),
}
// SAFETY: Rope contains Cell<u8> which is !Sync, but pipeline ownership transfers
// exclusively through channels — no item is ever shared across threads.
// NOTE(review): that argument covers moving values between threads (`Send`). The
// `Sync` impl further promises that `&PipelineData` is safe to share across threads;
// confirm the pipeline never hands out such shared references.
unsafe impl Send for PipelineData {}
unsafe impl Sync for PipelineData {}
// ── Stage functions ───────────────────────────────────────────────────────────
/// Opens the sequence file at `path` and yields its raw [`Rope`] chunks.
///
/// # Errors
/// Returns `InvalidInput` when `path` is not valid UTF-8, and propagates any error
/// from `obiread::read_sequence_chunks`.
///
/// A chunk that fails to read is reported on stderr and dropped; iteration then
/// continues with the following chunk.
fn open_chunks(path: PathBuf) -> io::Result<impl Iterator<Item = Rope>> {
    let Some(path_str) = path.to_str() else {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "non-UTF-8 path"));
    };
    let chunks = obiread::read_sequence_chunks(path_str)?;
    Ok(chunks.filter_map(|chunk| {
        // Log the failure, then let `.ok()` turn it into a skipped item.
        if let Err(e) = &chunk {
            eprintln!("chunk read error: {e}");
        }
        chunk.ok()
    }))
}
/// Normalises a raw sequence chunk (FASTA or FASTQ) into a compact ACGT/NUL rope.
///
/// Thin wrapper: forwards `rope` and the k-mer size `k` to
/// `obiread::normalize_sequence_chunk` and returns its result unchanged, including
/// any I/O error it reports.
fn normalize(rope: Rope, k: usize) -> io::Result<Rope> {
obiread::normalize_sequence_chunk(rope, k)
}
/// Collects every super-kmer that [`SuperKmerIter`] yields for a normalised rope.
///
/// `k`, `m`, `level_max` and `theta` are passed to `SuperKmerIter::new` untouched.
/// The rope is consumed by this call; only the extracted super-kmers are returned.
fn build_superkmers(
    rope: Rope,
    k: usize,
    m: usize,
    level_max: usize,
    theta: f64,
) -> Vec<SuperKmer> {
    let mut batch = Vec::new();
    batch.extend(SuperKmerIter::new(&rope, k, m, level_max, theta));
    batch
}
/// Writes a batch of super-kmers to the output sink.
///
/// For each super-kmer, the minimizer of length `m` is read at the recorded
/// minimizer position and put in canonical form. Its hash is reduced to the low
/// `partition_bits` bits to obtain the partition index, and the record is emitted
/// through `write_scatter`. The output lock is taken once and held for the whole batch.
///
/// # Errors
/// Returns an error if a minimizer cannot be extracted from a super-kmer or if
/// writing to `out` fails.
///
/// # Panics
/// Panics if the output mutex is poisoned.
fn write_batch(
    batch: Vec<SuperKmer>,
    out: &Mutex<BufWriter<io::Stdout>>,
    partition_bits: usize,
    k: usize,
    m: usize,
) -> io::Result<()> {
    // `1u64 << partition_bits` overflows for widths of 64 or more (panic in debug
    // builds, wrapped shift in release). In that case keep every hash bit.
    let partition_mask = u32::try_from(partition_bits)
        .ok()
        .and_then(|bits| 1u64.checked_shl(bits))
        .map_or(u64::MAX, |limit| limit - 1);
    let mut w = out.lock().unwrap();
    for sk in batch {
        let minimizer = sk
            .kmer(sk.minimizer_pos() as usize, m)
            .map_err(io::Error::other)?
            .canonical(m);
        let partition = (minimizer.hash(m) & partition_mask) as usize;
        // The writer expects the partition index, not the bit width used to derive it.
        write_scatter(&sk, &mut *w, k, m, partition, minimizer)?;
    }
    Ok(())
}
/// Scrambles a 64-bit value with two xor-shift/multiply rounds and a final xor-shift.
///
/// The shifts (30, 27, 31) and multipliers are those of the SplitMix64 finalizer.
/// Multiplications wrap on overflow, and `mix64(0)` is `0`.
#[inline]
fn mix64(x: u64) -> u64 {
    const ROUNDS: [(u32, u64); 2] = [
        (30, 0xbf58476d1ce4e5b9),
        (27, 0x94d049bb133111eb),
    ];
    let mixed = ROUNDS.iter().fold(x, |acc, &(shift, multiplier)| {
        (acc ^ (acc >> shift)).wrapping_mul(multiplier)
    });
    mixed ^ (mixed >> 31)
}
// ── Entry point ───────────────────────────────────────────────────────────────
/// Runs the partitioning pipeline described by `args`, writing records to stdout.
///
/// Stages, in order: input paths → raw chunks (`open_chunks`) → normalised chunks
/// (`normalize`) → super-kmer batches (`build_superkmers`) → `write_batch` sink.
///
/// # Panics
/// Panics if the output mutex is poisoned or if the final flush of stdout fails.
pub fn run(args: PartitionArgs) {
let k = args.kmer_size;
let m = args.minimizer_size;
let theta = args.theta;
let level_max = args.level_max;
let partition_bits = args.partition_bits;
// A thread count of 0 from the command line is raised to 1.
let n_workers = args.threads.max(1);
let paths = args.inputs.iter().map(PathBuf::from).collect();
let path_source = obiread::PathIter::new(paths);
// Stdout is buffered and wrapped in a mutex; `out_sink` is the handle moved into
// the sink closure, while `out` is kept for the final flush below.
let out = Arc::new(Mutex::new(BufWriter::new(io::stdout())));
let out_sink = Arc::clone(&out);
// NOTE(review): the `?` markers on stages appear to flag fallible stages (those
// returning `io::Result`) — confirm against the `make_pipeline!` definition.
let pipeline = make_pipeline! {
PipelineData,
source path_source => Path,
||? open_chunks : Path => RawChunk,
|? { move |rope| normalize(rope, k) } : RawChunk => NormChunk,
| { move |rope| build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch,
sink? { move |batch| write_batch(batch, &out_sink, partition_bits, k, m) } @ Batch,
};
// NOTE(review): the meaning of the third argument (1) is not visible here — confirm
// against `WorkerPool::new`.
WorkerPool::new(pipeline, n_workers, 1).run();
// Flush explicitly so a write error surfaces instead of being lost when the buffer drops.
out.lock().unwrap().flush().expect("flush error");
}
+23 -13
View File
@@ -31,12 +31,17 @@ pub struct SuperkmerArgs {
#[arg(long, default_value_t = 6)]
pub level_max: usize,
/// Number of partitions (minimizer_hash % partitions → partition id)
#[arg(short, long, default_value_t = 256)]
pub partitions: u64,
/// Number of bits to encode partitions (allows up to 2^partition_bits partitions)
#[arg(short, long, default_value_t = 8)]
pub partition_bits: usize,
/// Number of worker threads
#[arg(short = 'T', long, default_value_t = 16)]
#[arg(
short = 'T',
long,
default_value_t = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1))]
pub threads: usize,
}
@@ -44,7 +49,7 @@ enum PipelineData {
Path(PathBuf),
RawChunk(Rope),
NormChunk(Rope),
Batch(Vec<(u64, SuperKmer)>),
Batch(Vec<SuperKmer>),
}
// SAFETY: Rope contains Cell<u8> which is !Sync, but pipeline ownership transfers
@@ -82,22 +87,27 @@ fn build_superkmers(
m: usize,
level_max: usize,
theta: f64,
) -> Vec<(u64, SuperKmer)> {
) -> Vec<SuperKmer> {
SuperKmerIter::new(&rope, k, m, level_max, theta).collect()
}
/// Writes a batch of super-kmers to the output sink.
fn write_batch(
batch: Vec<(u64, SuperKmer)>,
batch: Vec<SuperKmer>,
out: &Mutex<BufWriter<io::Stdout>>,
partitions: u64,
partition_bits: usize,
k: usize,
m: usize,
) -> io::Result<()> {
let mut w = out.lock().unwrap();
for (min_hash, sk) in batch {
let partition = (mix64(min_hash) % partitions) as u32;
write_scatter(&sk, &mut *w, k, m, partition, min_hash)?;
let partition_mask = (1u64 << partition_bits) - 1;
for sk in batch {
let minimizer = sk
.kmer(sk.minimizer_pos() as usize, m)
.map_err(|e| std::io::Error::other(e))?
.canonical(m);
let partition = (minimizer.hash(m) & partition_mask) as usize;
write_scatter(&sk, &mut *w, k, m, partition_bits, minimizer)?;
}
Ok(())
}
@@ -118,7 +128,7 @@ pub fn run(args: SuperkmerArgs) {
let m = args.minimizer_size;
let theta = args.theta;
let level_max = args.level_max;
let partitions = args.partitions;
let partition_bits = args.partition_bits;
let n_workers = args.threads.max(1);
let paths = args.inputs.iter().map(PathBuf::from).collect();
@@ -133,7 +143,7 @@ pub fn run(args: SuperkmerArgs) {
||? open_chunks : Path => RawChunk,
|? { move |rope| normalize(rope, k) } : RawChunk => NormChunk,
| { move |rope| build_superkmers(rope, k, m, level_max, theta) }: NormChunk => Batch,
sink? { move |batch| write_batch(batch, &out_sink, partitions, k, m) } @ Batch,
sink? { move |batch| write_batch(batch, &out_sink, partition_bits, k, m) } @ Batch,
};
WorkerPool::new(pipeline, n_workers, 1).run();
+1
View File
@@ -4,3 +4,4 @@ version = "0.1.0"
edition = "2024"
[dependencies]
obiskio = { path = "../obiskio" }
+2 -2
View File
@@ -1,3 +1,3 @@
mod limits;
mod manager;
pub use limits::max_concurrent_files;
pub use manager::PartitionManager;
+11 -143
View File
@@ -1,150 +1,18 @@
use niffler::compression::{Format, Level, from_reader, from_writer};
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Read, Write};
use obiskio::SKFileWriter;
use std::path::Path;
// ---------- Format binaire d'un superkmer (encodage 2 bits) ----------
pub struct SuperKmerHeader(u32);
impl SuperKmerHeader {
pub fn seq_len(&self) -> usize {
let seql = (self.0 & 0xFF) as u8;
if seql == 0 { 256 } else { seql as usize }
}
pub fn to_bits(&self) -> u32 {
self.0
}
pub fn from_bits(bits: u32) -> Self {
Self(bits)
}
pub struct PartitionManager {
root_path: Box<Path>,
partitions_mask: u64,
writers: Vec<SKFileWriter>,
}
pub struct SuperKmer {
pub header: SuperKmerHeader,
pub seq: Box<[u8]>, // déjà encodée en 2 bits par base
}
impl SuperKmer {
/// Écrit ce superkmer dans un écrivain binaire (non compressé).
pub fn write_raw<W: Write>(&self, w: &mut W) -> io::Result<()> {
w.write_all(&self.header.to_bits().to_le_bytes())?;
w.write_all(&self.seq)?;
Ok(())
}
/// Lit un superkmer depuis un lecteur binaire (non compressé).
/// Retourne `None` si EOF.
pub fn read_raw<R: Read>(r: &mut R, buf_seq: &mut Vec<u8>) -> io::Result<Option<Self>> {
let mut header_bytes = [0u8; 4];
if let Err(e) = r.read_exact(&mut header_bytes) {
return if e.kind() == io::ErrorKind::UnexpectedEof {
Ok(None)
} else {
Err(e)
};
}
let header = SuperKmerHeader::from_bits(u32::from_le_bytes(header_bytes));
let len_bytes = (header.seq_len() + 3) / 4; // nombre d'octets encodés
buf_seq.clear();
buf_seq.resize(len_bytes, 0);
r.read_exact(buf_seq)?;
let seq = buf_seq.clone().into_boxed_slice();
Ok(Some(SuperKmer { header, seq }))
}
}
// ---------- PartitionManager avec compression (via niffler) ----------
pub enum CompressionFormat {
Gzip, // .gz
Zstd, // .zst
Lz4, // .lz4
Bgzf, // .bgzf (indexable par blocs)
None, // pas de compression
}
impl CompressionFormat {
/// Infère le format à partir de l'extension du fichier.
pub fn from_extension(path: &Path) -> Option<Self> {
match path.extension()?.to_str()? {
"gz" => Some(CompressionFormat::Gzip),
"zst" => Some(CompressionFormat::Zstd),
"lz4" => Some(CompressionFormat::Lz4),
"bgzf" => Some(CompressionFormat::Bgzf),
"raw" => Some(CompressionFormat::None),
_ => None,
}
}
/// Extension de fichier recommandée.
pub fn extension(&self) -> &'static str {
match self {
CompressionFormat::Gzip => "gz",
CompressionFormat::Zstd => "zst",
CompressionFormat::Lz4 => "lz4",
CompressionFormat::Bgzf => "bgzf",
CompressionFormat::None => "raw",
impl PartitionManager {
pub fn new(root_path: Box<Path>, n_partition_bits: usize) -> Self {
Self {
root_path,
partitions_mask: (1u64 << n_partition_bits) - 1,
writers: Vec::new(),
}
}
}
pub struct PartitionWriter {
writer: Box<dyn Write + Send>, // le flux compressé
// buffer interne pour réutiliser les écritures (optionnel)
}
impl PartitionWriter {
/// Ouvre un fichier en écriture avec la compression demandée.
pub fn create(path: &Path, format: CompressionFormat) -> io::Result<Self> {
let file = File::create(path)?;
const DEFAULT_LEVEL: Level = Level::Default; // peut être ajusté
let writer: Box<dyn Write + Send> = match format {
CompressionFormat::Gzip => Box::new(from_writer(file, Format::Gzip, DEFAULT_LEVEL)?),
CompressionFormat::Zstd => Box::new(from_writer(file, Format::Zstd, DEFAULT_LEVEL)?),
CompressionFormat::Lz4 => Box::new(from_writer(file, Format::Lz4, DEFAULT_LEVEL)?),
CompressionFormat::Bgzf => Box::new(from_writer(file, Format::Bgzf, DEFAULT_LEVEL)?),
CompressionFormat::None => Box::new(BufWriter::new(file)),
};
Ok(PartitionWriter { writer })
}
/// Écrit un superkmer (non compressé individuellement) dans le flux compressé.
pub fn write_kmer(&mut self, kmer: &SuperKmer) -> io::Result<()> {
kmer.write_raw(&mut self.writer)
}
/// Flush final.
pub fn finish(mut self) -> io::Result<()> {
self.writer.flush()
}
}
pub struct PartitionReader {
reader: Box<dyn Read + Send>,
seq_buf: Vec<u8>, // réutilisation pour les séquences
}
impl PartitionReader {
/// Ouvre un fichier en lecture. Détecte automatiquement le format de compression
/// grâce à `niffler::sniff` ou via l'extension.
pub fn open(path: &Path) -> io::Result<Self> {
let file = File::open(path)?;
// `niffler::sniff` examine les premiers octets pour choisir le décompresseur
let reader = match niffler::sniff(Box::new(file)) {
Ok(r) => r,
Err(e) => {
// Si aucune signature connue, on suppose raw
eprintln!("Aucune signature de compression trouvée, lecture brute.");
Box::new(BufReader::new(file)) as Box<dyn Read + Send>
}
};
Ok(PartitionReader {
reader,
seq_buf: Vec::with_capacity(256),
})
}
/// Lit le prochain superkmer. Retourne `None` à la fin du fichier.
pub fn read_next(&mut self) -> io::Result<Option<SuperKmer>> {
SuperKmer::read_raw(&mut self.reader, &mut self.seq_buf)
}
}
+8 -10
View File
@@ -60,7 +60,7 @@ impl<'a> SuperKmerIter<'a> {
self.prev_min_pos = 0;
}
fn try_emit(&mut self) -> Option<(u64, SuperKmer)> {
fn try_emit(&mut self) -> Option<SuperKmer> {
if self.scratch.len() < self.k {
return None;
}
@@ -72,14 +72,14 @@ impl<'a> SuperKmerIter<'a> {
sk.seql() - self.m - self.prev_min_pos
};
sk.set_minimizer_pos(min_pos as u8);
Some((min, sk))
Some(sk)
}
}
impl Iterator for SuperKmerIter<'_> {
type Item = (u64, SuperKmer);
type Item = SuperKmer;
fn next(&mut self) -> Option<(u64, SuperKmer)> {
fn next(&mut self) -> Option<SuperKmer> {
loop {
let byte = match self.cursor.read_next().ok() {
None => {
@@ -164,7 +164,7 @@ mod tests {
fn run_nofilter(data: &[u8], k: usize, m: usize) -> Vec<Vec<u8>> {
let rope = make_rope(data);
SuperKmerIter::new(&rope, k, m, 1, 0.0)
.map(|(_, sk)| sk.to_ascii())
.map(|sk| sk.to_ascii())
.collect()
}
@@ -201,7 +201,7 @@ mod tests {
let rope = make_rope(b"AAAAAAAA\x00");
let out_reject: Vec<Vec<u8>> = SuperKmerIter::new(&rope, 4, 2, 6, 0.9)
.map(|(_, sk)| sk.to_ascii())
.map(|sk| sk.to_ascii())
.collect();
assert!(out_reject.is_empty());
}
@@ -214,7 +214,7 @@ mod tests {
rope.push(data[..mid].to_vec());
rope.push(data[mid..].to_vec());
let out: Vec<Vec<u8>> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0)
.map(|(_, sk)| sk.to_ascii())
.map(|sk| sk.to_ascii())
.collect();
assert!(!out.is_empty());
}
@@ -222,9 +222,7 @@ mod tests {
#[test]
fn yields_minimizer_value() {
let rope = make_rope(b"ACGTACGT\x00");
let results: Vec<(u64, Vec<u8>)> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0)
.map(|(min, sk)| (min, sk.to_ascii()))
.collect();
let results: Vec<SuperKmer> = SuperKmerIter::new(&rope, 4, 2, 1, 0.0).collect();
assert!(!results.is_empty());
}
}