diff --git a/src/obiskio/src/pool.rs b/src/obiskio/src/pool.rs index 6b24aa1..85ce42c 100644 --- a/src/obiskio/src/pool.rs +++ b/src/obiskio/src/pool.rs @@ -152,8 +152,8 @@ impl SKFilePool { /// Ensure entry `id` has an open fd. Evicts LRU if at capacity. /// Must be called under pool lock. fn ensure_open(&mut self, id: usize) -> SKResult<()> { - if self.open.contains(&id) { - return Ok(()); + if self.open.get(&id).is_some() { + return Ok(()); // promotes to MRU } if self.entries[id].logically_closed { return Err(std::io::Error::new( @@ -272,27 +272,25 @@ impl SKFileWriter { /// Accumulate one SuperKmer. Drains to fd when `pending ≥ flush_threshold`. pub fn write(&mut self, sk: &SuperKmer) -> SKResult<()> { self.check_not_closed()?; - write_superkmer(&mut self.pending, sk)?; - self.meta.instances += 1; - self.meta.count_sum += sk.count() as u64; - self.meta.length_sum += sk.seql() as u64; - if self.pending.len() >= self.flush_threshold { - self.drain()?; - } - Ok(()) + self.write_one(sk) } /// Accumulate a slice of SuperKmers, draining whenever the threshold is exceeded. pub fn write_batch(&mut self, sks: &[SuperKmer]) -> SKResult<()> { self.check_not_closed()?; for sk in sks { - write_superkmer(&mut self.pending, sk)?; - self.meta.instances += 1; - self.meta.count_sum += sk.count() as u64; - self.meta.length_sum += sk.seql() as u64; - if self.pending.len() >= self.flush_threshold { - self.drain()?; - } + self.write_one(sk)?; + } + Ok(()) + } + + fn write_one(&mut self, sk: &SuperKmer) -> SKResult<()> { + write_superkmer(&mut self.pending, sk)?; + self.meta.instances += 1; + self.meta.count_sum += sk.count() as u64; + self.meta.length_sum += sk.seql() as u64; + if self.pending.len() >= self.flush_threshold { + self.drain()?; } Ok(()) } @@ -308,7 +306,6 @@ impl SKFileWriter { { let mut pool = self.pool.lock().unwrap(); pool.ensure_open(self.id)?; - let _ = pool.open.get(&self.id); fd_arc = Arc::clone(&pool.entries[self.id].fd); fd_guard = fd_arc.lock().unwrap(); // acquire fd lock under pool lock // pool drops here → pool lock released, fd lock still held @@ -390,7 +387,7 @@ impl SKFileWriter { /// Drain pending bytes to the fd (no compressor flush). /// /// Two-phase locking: - /// 1. Pool lock → ensure_open → promote MRU → acquire entry fd lock (under pool lock). + /// 1. Pool lock → ensure_open (promotes to MRU) → acquire entry fd lock (under pool lock). /// 2. Release pool lock. Write pending under entry fd lock only. /// /// Holding fd lock while releasing pool lock prevents eviction of our entry @@ -402,7 +399,6 @@ impl SKFileWriter { { let mut pool = self.pool.lock().unwrap(); pool.ensure_open(self.id)?; - let _ = pool.open.get(&self.id); // promote to MRU fd_arc = Arc::clone(&pool.entries[self.id].fd); fd_guard = fd_arc.lock().unwrap(); // acquire fd lock under pool lock // pool drops here → pool lock released, fd lock still held