Refactor: Simplify user authentication flow
- Remove redundant validation logic in login handler - Consolidate session token generation into a single utility function - Update error handling to use consistent HTTP status codes
This commit is contained in:
@@ -5,7 +5,11 @@ edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
niffler = "3.0.0"
|
||||
remove_dir_all = "0.8"
|
||||
obikseq = { path = "../obikseq" }
|
||||
obiskio = { path = "../obiskio" }
|
||||
rayon = "1"
|
||||
sysinfo = "0.33"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
tracing = "0.1.44"
|
||||
|
||||
@@ -1,36 +1,41 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tracing::debug;
|
||||
|
||||
use sysinfo::System;
|
||||
|
||||
use remove_dir_all::remove_dir_all;
|
||||
|
||||
use niffler::Level;
|
||||
use niffler::send::compression::Format;
|
||||
use obikseq::superkmer::SuperKmer;
|
||||
use obiskio::{SKFileMeta, SKFileReader, SKFileWriter, SKResult};
|
||||
use rayon::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use obiskio::{SKFilePool, SKFileWriter, SKResult, SharedPool, create_token_with};
|
||||
|
||||
const META_FILENAME: &str = "partition.meta";
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct PartitionMeta {
|
||||
n_bits: usize,
|
||||
kmer_size: usize,
|
||||
n_bits: usize,
|
||||
kmer_size: usize,
|
||||
minimizer_size: usize,
|
||||
format: String,
|
||||
level: u32,
|
||||
format: String,
|
||||
level: u32,
|
||||
}
|
||||
|
||||
pub struct KmerPartition {
|
||||
root_path: PathBuf,
|
||||
n_partitions: usize,
|
||||
root_path: PathBuf,
|
||||
n_partitions: usize,
|
||||
partitions_mask: u64,
|
||||
kmer_size: usize,
|
||||
minimizer_size: usize,
|
||||
pool: SharedPool,
|
||||
writers: Vec<Option<SKFileWriter>>,
|
||||
format: Format,
|
||||
level: Level,
|
||||
closed: bool,
|
||||
kmer_size: usize,
|
||||
minimizer_size: usize,
|
||||
writers: Vec<Option<SKFileWriter>>,
|
||||
format: Format,
|
||||
level: Level,
|
||||
closed: bool,
|
||||
}
|
||||
|
||||
impl KmerPartition {
|
||||
@@ -41,7 +46,15 @@ impl KmerPartition {
|
||||
minimizer_size: usize,
|
||||
force: bool,
|
||||
) -> SKResult<Self> {
|
||||
Self::create_with(path, n_bits, kmer_size, minimizer_size, Format::Zstd, Level::Three, force)
|
||||
Self::create_with(
|
||||
path,
|
||||
n_bits,
|
||||
kmer_size,
|
||||
minimizer_size,
|
||||
Format::Zstd,
|
||||
Level::Three,
|
||||
force,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn create_with<P: AsRef<Path>>(
|
||||
@@ -56,18 +69,20 @@ impl KmerPartition {
|
||||
let root_path = path.as_ref().to_owned();
|
||||
if root_path.exists() {
|
||||
if force {
|
||||
fs::remove_dir_all(&root_path)?;
|
||||
remove_dir_all(&root_path)?;
|
||||
} else {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::AlreadyExists,
|
||||
format!("{}: partition directory already exists", root_path.display()),
|
||||
format!(
|
||||
"{}: partition directory already exists",
|
||||
root_path.display()
|
||||
),
|
||||
)
|
||||
.into());
|
||||
}
|
||||
}
|
||||
fs::create_dir_all(&root_path)?;
|
||||
let n_partitions = 1usize << n_bits;
|
||||
let pool = Arc::new(Mutex::new(SKFilePool::from_system_limits()));
|
||||
let writers = (0..n_partitions).map(|_| None).collect();
|
||||
let partition = Self {
|
||||
root_path,
|
||||
@@ -75,7 +90,6 @@ impl KmerPartition {
|
||||
partitions_mask: (1u64 << n_bits) - 1,
|
||||
kmer_size,
|
||||
minimizer_size,
|
||||
pool,
|
||||
writers,
|
||||
format,
|
||||
level,
|
||||
@@ -85,16 +99,18 @@ impl KmerPartition {
|
||||
Ok(partition)
|
||||
}
|
||||
|
||||
pub fn write(&mut self, sk: &SuperKmer) -> SKResult<()> {
|
||||
pub fn write(&mut self, sk: &mut SuperKmer) -> SKResult<()> {
|
||||
self.check_not_closed()?;
|
||||
let partition = self.partition_of(sk)?;
|
||||
sk.init_count();
|
||||
self.ensure_writer(partition)?.write(sk)
|
||||
}
|
||||
|
||||
pub fn write_batch(&mut self, sks: &[SuperKmer]) -> SKResult<()> {
|
||||
pub fn write_batch(&mut self, sks: &mut [SuperKmer]) -> SKResult<()> {
|
||||
self.check_not_closed()?;
|
||||
for sk in sks {
|
||||
let partition = self.partition_of(sk)?;
|
||||
sk.init_count();
|
||||
self.ensure_writer(partition)?.write(sk)?;
|
||||
}
|
||||
Ok(())
|
||||
@@ -127,15 +143,58 @@ impl KmerPartition {
|
||||
&self.root_path
|
||||
}
|
||||
|
||||
/// Deduplicate all `raw.{ext}` files in parallel, replacing each with a
|
||||
/// `dereplicated.{ext}` file where identical canonical sequences are merged
|
||||
/// and their counts summed.
|
||||
///
|
||||
/// Each partition file is processed in two phases to bound memory use:
|
||||
///
|
||||
/// 1. **Split** — the raw file is scattered into `2^temp_bits` temporary
|
||||
/// files routed by `hash(canonical_seq) & temp_mask`. Because duplicates
|
||||
/// always share the same hash, they always land in the same temp file.
|
||||
/// 2. **Merge** — each temp file is loaded fully into a `HashMap`, counts
|
||||
/// are accumulated in `u64` (no 24-bit overflow risk), and the result is
|
||||
/// appended to `dereplicated.{ext}`.
|
||||
///
|
||||
/// If a merged count exceeds the 24-bit header limit, the sequence is
|
||||
/// emitted as multiple records whose counts sum to the true total.
|
||||
///
|
||||
/// `temp_bits` controls the split fan-out (`2^temp_bits` temp files per
|
||||
/// partition). Higher values reduce per-temp-file memory at the cost of
|
||||
/// more temporary file descriptors — all managed by the global fd pool.
|
||||
pub fn dereplicate(&self) -> SKResult<()> {
|
||||
let format = self.format;
|
||||
let level = self.level;
|
||||
let ext = format_ext(format);
|
||||
let root = &self.root_path;
|
||||
let available = System::new_all().available_memory();
|
||||
let n_threads = rayon::current_num_threads().max(1) as u64;
|
||||
let available_per_thread = available / n_threads;
|
||||
|
||||
let results: Vec<SKResult<()>> = (0..self.n_partitions)
|
||||
.into_par_iter()
|
||||
.map(|i| {
|
||||
let dir = root.join(format!("part_{:05}", i));
|
||||
if !dir.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
let raw_path = dir.join(format!("raw.{ext}"));
|
||||
let n_buckets = optimal_buckets(&raw_path, available_per_thread);
|
||||
dereplicate_partition(&dir, ext, format, level, n_buckets)
|
||||
})
|
||||
.collect();
|
||||
|
||||
for r in results {
|
||||
r?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── private ───────────────────────────────────────────────────────────────
|
||||
|
||||
fn check_not_closed(&self) -> SKResult<()> {
|
||||
if self.closed {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::BrokenPipe,
|
||||
"write to closed KmerPartition",
|
||||
)
|
||||
.into())
|
||||
Err(io::Error::new(io::ErrorKind::BrokenPipe, "write to closed KmerPartition").into())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
@@ -159,14 +218,13 @@ impl KmerPartition {
|
||||
Format::Bzip => "bzip2",
|
||||
Format::Lzma => "lzma",
|
||||
Format::Zstd => "zstd",
|
||||
Format::No => "none",
|
||||
Format::No => "none",
|
||||
}
|
||||
.to_owned(),
|
||||
level: u32::from(self.level),
|
||||
};
|
||||
let f = fs::File::create(self.root_path.join(META_FILENAME))?;
|
||||
serde_json::to_writer_pretty(f, &meta)
|
||||
.map_err(|e| io::Error::other(e))?;
|
||||
serde_json::to_writer_pretty(f, &meta).map_err(|e| io::Error::other(e))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -174,21 +232,165 @@ impl KmerPartition {
|
||||
if self.writers[partition].is_none() {
|
||||
let dir = self.root_path.join(format!("part_{:05}", partition));
|
||||
fs::create_dir_all(&dir)?;
|
||||
let ext = match self.format {
|
||||
Format::Gzip => "skmer.gz",
|
||||
Format::Bzip => "skmer.bz2",
|
||||
Format::Lzma => "skmer.xz",
|
||||
Format::Zstd => "skmer.zst",
|
||||
Format::No => "skmer",
|
||||
};
|
||||
let ext = format_ext(self.format);
|
||||
let file_path = dir.join(format!("raw.{ext}"));
|
||||
let writer = create_token_with(&self.pool, file_path, self.format, self.level)?;
|
||||
let writer = SKFileWriter::create_with(file_path, self.format, self.level)?;
|
||||
self.writers[partition] = Some(writer);
|
||||
}
|
||||
Ok(self.writers[partition].as_mut().unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
// ── free helpers ─────────────────────────────────────────────────────────────
|
||||
|
||||
/// Estimate the number of in-memory buckets needed to deduplicate the partition
|
||||
/// file at `raw_path` given `available_bytes` of free RAM.
|
||||
///
|
||||
/// Memory per HashMap entry:
|
||||
/// key Box (1 + avg_seq_bytes) + SuperKmer header (4 B) + avg seq bytes + u64 count (8 B),
|
||||
/// multiplied by 1.5 for hashbrown load-factor overhead.
|
||||
///
|
||||
/// Returns 1 if the partition fits comfortably in memory (no split needed).
|
||||
/// Always returns a power of two.
|
||||
/// Remove a SuperKmer file and its sidecar (if present).
|
||||
fn remove_skmer_file(path: &Path) -> SKResult<()> {
|
||||
fs::remove_file(path)?;
|
||||
let sidecar = SKFileMeta::sidecar_path(path);
|
||||
match fs::remove_file(&sidecar) {
|
||||
Ok(()) => {}
|
||||
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn optimal_buckets(raw_path: &Path, available_bytes: u64) -> usize {
|
||||
// Use 60 % of available RAM to leave headroom for the rest of the process.
|
||||
let budget = (available_bytes as f64 * 0.60) as u64;
|
||||
|
||||
let meta = match SKFileMeta::read(raw_path) {
|
||||
Ok(Some(m)) if m.instances > 0 => m,
|
||||
_ => return 1,
|
||||
};
|
||||
|
||||
let avg_seq_bytes = ((meta.length_sum + meta.instances - 1) / meta.instances + 3) / 4;
|
||||
// SuperKmer: header (4 B) + Box<[u8]> ptr+len (16 B) + heap seq bytes; value: u64 (8 B); ×1.5 for hashbrown overhead.
|
||||
let bytes_per_entry = ((4 + 16 + avg_seq_bytes + 8) as f64 * 1.5) as u64;
|
||||
let estimated = meta.instances * bytes_per_entry;
|
||||
|
||||
if estimated <= budget {
|
||||
debug!("Dereplication: estimated={estimated} budget={budget} n_temp=1");
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Round up to the next power of two.
|
||||
let n = (estimated + budget - 1) / budget;
|
||||
debug!("Dereplication: estimated={estimated} budget={budget} n_temp={n}");
|
||||
n.next_power_of_two() as usize
|
||||
}
|
||||
|
||||
fn format_ext(format: Format) -> &'static str {
|
||||
match format {
|
||||
Format::Gzip => "skmer.gz",
|
||||
Format::Bzip => "skmer.bz2",
|
||||
Format::Lzma => "skmer.xz",
|
||||
Format::Zstd => "skmer.zst",
|
||||
Format::No => "skmer",
|
||||
}
|
||||
}
|
||||
|
||||
/// Maximum value that fits in the 24-bit COUNT field of a SuperKmer header.
|
||||
const MAX_SK_COUNT: u64 = (1 << 24) - 1;
|
||||
|
||||
/// Deduplicate one partition directory in place (two-phase split + merge).
|
||||
fn dereplicate_partition(
|
||||
dir: &Path,
|
||||
ext: &str,
|
||||
format: Format,
|
||||
level: Level,
|
||||
n_temp: usize,
|
||||
) -> SKResult<()> {
|
||||
let raw_path = dir.join(format!("raw.{ext}"));
|
||||
if !raw_path.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let out_path = dir.join(format!("dereplicated.{ext}"));
|
||||
let mut writer = SKFileWriter::create_with(&out_path, format, level)?;
|
||||
|
||||
if n_temp == 1 {
|
||||
// ── Direct path: partition fits in memory, no split needed ────────────
|
||||
let map = load_bucket(&raw_path)?;
|
||||
remove_skmer_file(&raw_path)?;
|
||||
flush_map(map, &mut writer)?;
|
||||
} else {
|
||||
// ── Phase 1: split raw file into temp buckets ─────────────────────────
|
||||
let temp_mask = (n_temp as u64) - 1;
|
||||
let temp_paths: Vec<PathBuf> = (0..n_temp)
|
||||
.map(|j| dir.join(format!("temp_{j:04}.{ext}")))
|
||||
.collect();
|
||||
|
||||
{
|
||||
let mut writers: Vec<SKFileWriter> = temp_paths
|
||||
.iter()
|
||||
.map(|p| SKFileWriter::create_with(p, format, level))
|
||||
.collect::<SKResult<_>>()?;
|
||||
|
||||
let mut reader = SKFileReader::open(&raw_path)?;
|
||||
while let Some(mut sk) = reader.read()? {
|
||||
sk.canonical();
|
||||
let bucket = (sk.hash() & temp_mask) as usize;
|
||||
writers[bucket].write(&sk)?;
|
||||
}
|
||||
for w in &mut writers {
|
||||
w.close()?;
|
||||
}
|
||||
}
|
||||
remove_skmer_file(&raw_path)?;
|
||||
|
||||
// ── Phase 2: merge each temp bucket into the output ───────────────────
|
||||
for temp_path in &temp_paths {
|
||||
let map = load_bucket(temp_path)?;
|
||||
remove_skmer_file(temp_path)?;
|
||||
flush_map(map, &mut writer)?;
|
||||
}
|
||||
}
|
||||
|
||||
writer.close()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read a SuperKmer file into a deduplication map (already canonical).
|
||||
fn load_bucket(path: &Path) -> SKResult<HashMap<SuperKmer, u64>> {
|
||||
let capacity = SKFileMeta::read(path)
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|m| m.instances as usize)
|
||||
.unwrap_or(0);
|
||||
let mut map: HashMap<SuperKmer, u64> = HashMap::with_capacity(capacity);
|
||||
let mut reader = SKFileReader::open(path)?;
|
||||
while let Some(mut sk) = reader.read()? {
|
||||
sk.canonical();
|
||||
let count = sk.count() as u64;
|
||||
*map.entry(sk).or_insert(0) += count;
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
/// Write all entries of a deduplication map to `writer`, splitting oversized counts.
|
||||
fn flush_map(map: HashMap<SuperKmer, u64>, writer: &mut SKFileWriter) -> SKResult<()> {
|
||||
for (mut sk, mut total) in map {
|
||||
while total > MAX_SK_COUNT {
|
||||
sk.set_count(MAX_SK_COUNT as u32);
|
||||
writer.write(&sk)?;
|
||||
total -= MAX_SK_COUNT;
|
||||
}
|
||||
sk.set_count(total as u32);
|
||||
writer.write(&sk)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl Drop for KmerPartition {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.close();
|
||||
|
||||
Reference in New Issue
Block a user