refactor(pool): optimize write batch and LRU cache handling
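Extracts the accumulate-and-drain logic that was duplicated in `write` and `write_batch` into a private `write_one` helper: it serializes the SuperKmer into `pending`, updates the `instances` / `count_sum` / `length_sum` metadata, and drains once `pending` reaches `flush_threshold`. `ensure_open` now tests for an open fd with `open.get(&id)` instead of `open.contains(&id)`, so an already-open entry is promoted to MRU by the check itself, which prevents premature LRU eviction; the explicit `pool.open.get(&self.id)` promotion calls that followed `ensure_open` in the flush and drain operations are removed as redundant. Updates comments to clarify that, in the two-phase locking sequence, the MRU promotion is performed by `ensure_open`.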
This commit is contained in:
+16
-20
@@ -152,8 +152,8 @@ impl SKFilePool {
|
||||
/// Ensure entry `id` has an open fd. Evicts LRU if at capacity.
|
||||
/// Must be called under pool lock.
|
||||
fn ensure_open(&mut self, id: usize) -> SKResult<()> {
|
||||
if self.open.contains(&id) {
|
||||
return Ok(());
|
||||
if self.open.get(&id).is_some() {
|
||||
return Ok(()); // promotes to MRU
|
||||
}
|
||||
if self.entries[id].logically_closed {
|
||||
return Err(std::io::Error::new(
|
||||
@@ -272,27 +272,25 @@ impl SKFileWriter {
|
||||
/// Accumulate one SuperKmer. Drains to fd when `pending ≥ flush_threshold`.
|
||||
pub fn write(&mut self, sk: &SuperKmer) -> SKResult<()> {
|
||||
self.check_not_closed()?;
|
||||
write_superkmer(&mut self.pending, sk)?;
|
||||
self.meta.instances += 1;
|
||||
self.meta.count_sum += sk.count() as u64;
|
||||
self.meta.length_sum += sk.seql() as u64;
|
||||
if self.pending.len() >= self.flush_threshold {
|
||||
self.drain()?;
|
||||
}
|
||||
Ok(())
|
||||
self.write_one(sk)
|
||||
}
|
||||
|
||||
/// Accumulate a slice of SuperKmers, draining whenever the threshold is exceeded.
|
||||
pub fn write_batch(&mut self, sks: &[SuperKmer]) -> SKResult<()> {
|
||||
self.check_not_closed()?;
|
||||
for sk in sks {
|
||||
write_superkmer(&mut self.pending, sk)?;
|
||||
self.meta.instances += 1;
|
||||
self.meta.count_sum += sk.count() as u64;
|
||||
self.meta.length_sum += sk.seql() as u64;
|
||||
if self.pending.len() >= self.flush_threshold {
|
||||
self.drain()?;
|
||||
}
|
||||
self.write_one(sk)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_one(&mut self, sk: &SuperKmer) -> SKResult<()> {
|
||||
write_superkmer(&mut self.pending, sk)?;
|
||||
self.meta.instances += 1;
|
||||
self.meta.count_sum += sk.count() as u64;
|
||||
self.meta.length_sum += sk.seql() as u64;
|
||||
if self.pending.len() >= self.flush_threshold {
|
||||
self.drain()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -308,7 +306,6 @@ impl SKFileWriter {
|
||||
{
|
||||
let mut pool = self.pool.lock().unwrap();
|
||||
pool.ensure_open(self.id)?;
|
||||
let _ = pool.open.get(&self.id);
|
||||
fd_arc = Arc::clone(&pool.entries[self.id].fd);
|
||||
fd_guard = fd_arc.lock().unwrap(); // acquire fd lock under pool lock
|
||||
// pool drops here → pool lock released, fd lock still held
|
||||
@@ -390,7 +387,7 @@ impl SKFileWriter {
|
||||
/// Drain pending bytes to the fd (no compressor flush).
|
||||
///
|
||||
/// Two-phase locking:
|
||||
/// 1. Pool lock → ensure_open → promote MRU → acquire entry fd lock (under pool lock).
|
||||
/// 1. Pool lock → ensure_open (promotes to MRU) → acquire entry fd lock (under pool lock).
|
||||
/// 2. Release pool lock. Write pending under entry fd lock only.
|
||||
///
|
||||
/// Holding fd lock while releasing pool lock prevents eviction of our entry
|
||||
@@ -402,7 +399,6 @@ impl SKFileWriter {
|
||||
{
|
||||
let mut pool = self.pool.lock().unwrap();
|
||||
pool.ensure_open(self.id)?;
|
||||
let _ = pool.open.get(&self.id); // promote to MRU
|
||||
fd_arc = Arc::clone(&pool.entries[self.id].fd);
|
||||
fd_guard = fd_arc.lock().unwrap(); // acquire fd lock under pool lock
|
||||
// pool drops here → pool lock released, fd lock still held
|
||||
|
||||
Reference in New Issue
Block a user