feat: add input file logging and optimize path traversal
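Instrument index and scatter stages with `tracing::info` to log input file paths for better runtime observability. Additionally, optimize the path iterator by removing the per-path `is_dir()`/`is_file()` checks: paths passed to `PathIter::new` are now classified by file extension alone (anything that does not look like a FASTA/FASTQ file is treated as a potential directory and expanded lazily in `next()`), and directory entries are classified with `DirEntry::file_type()` instead of `path.is_dir()`. This defers metadata resolution, eliminating unnecessary `stat()` syscalls and improving traversal performance on high-latency network filesystems like Lustre and NFS.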
This commit is contained in:
@@ -100,10 +100,6 @@ pub fn run(args: IndexArgs) {
|
|||||||
|
|
||||||
// ── Stage 1: scatter ─────────────────────────────────────────────────────
|
// ── Stage 1: scatter ─────────────────────────────────────────────────────
|
||||||
if idx.state() < IndexState::Scattered {
|
if idx.state() < IndexState::Scattered {
|
||||||
let paths: Vec<_> = args.common.seqfile_paths().collect();
|
|
||||||
for path in &paths {
|
|
||||||
info!("indexing: {}", path.display());
|
|
||||||
}
|
|
||||||
let k = idx.kmer_size();
|
let k = idx.kmer_size();
|
||||||
let level_max = args.common.level_max;
|
let level_max = args.common.level_max;
|
||||||
let theta = args.common.theta;
|
let theta = args.common.theta;
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ use std::time::{Duration, Instant};
|
|||||||
use indicatif::{ProgressBar, ProgressStyle};
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
use obikpartitionner::KmerPartition;
|
use obikpartitionner::KmerPartition;
|
||||||
use obisys::{Reporter, Stage};
|
use obisys::{Reporter, Stage};
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
use crate::cli::{PipelineData, open_chunks};
|
use crate::cli::{PipelineData, open_chunks};
|
||||||
|
|
||||||
@@ -11,7 +12,7 @@ use crate::cli::{PipelineData, open_chunks};
|
|||||||
/// Reports the "scatter" stage to `rep`.
|
/// Reports the "scatter" stage to `rep`.
|
||||||
pub fn scatter(
|
pub fn scatter(
|
||||||
kp: &mut KmerPartition,
|
kp: &mut KmerPartition,
|
||||||
path_source: obiread::PathIter,
|
path_source: impl Iterator<Item = std::path::PathBuf> + Send + 'static,
|
||||||
k: usize,
|
k: usize,
|
||||||
level_max: usize,
|
level_max: usize,
|
||||||
theta: f64,
|
theta: f64,
|
||||||
@@ -21,6 +22,8 @@ pub fn scatter(
|
|||||||
use obikseq::RoutableSuperKmer;
|
use obikseq::RoutableSuperKmer;
|
||||||
|
|
||||||
let t = Stage::start("scatter");
|
let t = Stage::start("scatter");
|
||||||
|
let path_source = path_source.inspect(|p| info!("indexing: {}", p.display()));
|
||||||
|
|
||||||
let pipe = obipipeline::make_pipe! {
|
let pipe = obipipeline::make_pipe! {
|
||||||
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
PipelineData : PathBuf => Vec<RoutableSuperKmer>,
|
||||||
||? { |path| open_chunks(path) } : Path => RawChunk,
|
||? { |path| open_chunks(path) } : Path => RawChunk,
|
||||||
|
|||||||
@@ -9,16 +9,25 @@ pub struct PathIter {
|
|||||||
|
|
||||||
impl PathIter {
|
impl PathIter {
|
||||||
/// Creates a new `PathIter` that will yield paths to fasta or fastq files.
|
/// Creates a new `PathIter` that will yield paths to fasta or fastq files.
|
||||||
|
///
|
||||||
|
/// For non-directory paths only the file extension is checked (no stat syscall).
|
||||||
|
/// Directories are left on the stack and expanded lazily in `next()`.
|
||||||
|
/// Invalid or missing paths are silently skipped when `open_chunks` fails later.
|
||||||
pub fn new(paths: Vec<PathBuf>) -> Self {
|
pub fn new(paths: Vec<PathBuf>) -> Self {
|
||||||
let mut iter = PathIter {
|
let mut iter = PathIter {
|
||||||
dir_stack: Vec::new(),
|
dir_stack: Vec::new(),
|
||||||
file_buffer: Vec::new(),
|
file_buffer: Vec::new(),
|
||||||
};
|
};
|
||||||
for path in paths {
|
for path in paths {
|
||||||
if path.is_dir() {
|
// Avoid stat() at construction time on network filesystems (Lustre, NFS)
|
||||||
iter.dir_stack.push(path);
|
// where metadata operations can be 100s of milliseconds each.
|
||||||
} else if path.is_file() && is_fasta_or_fastq(&path) {
|
// Paths that look like sequence files are assumed to be files.
|
||||||
|
// Anything else is treated as a potential directory and expanded lazily
|
||||||
|
// in next(); read_dir errors are silently skipped.
|
||||||
|
if is_fasta_or_fastq(&path) {
|
||||||
iter.file_buffer.push(path);
|
iter.file_buffer.push(path);
|
||||||
|
} else {
|
||||||
|
iter.dir_stack.push(path);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
iter
|
iter
|
||||||
@@ -40,9 +49,12 @@ impl Iterator for PathIter {
|
|||||||
};
|
};
|
||||||
for entry in entries.flatten() {
|
for entry in entries.flatten() {
|
||||||
let path = entry.path();
|
let path = entry.path();
|
||||||
if path.is_dir() {
|
// Prefer file_type() over path.is_dir()/is_file() — on most filesystems
|
||||||
|
// (including Lustre with DT_UNKNOWN fallback) this avoids an extra stat.
|
||||||
|
let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
|
||||||
|
if is_dir {
|
||||||
self.dir_stack.push(path);
|
self.dir_stack.push(path);
|
||||||
} else if path.is_file() && is_fasta_or_fastq(&path) {
|
} else if is_fasta_or_fastq(&path) {
|
||||||
self.file_buffer.push(path);
|
self.file_buffer.push(path);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user