Push rwqsmuvystym #24

Merged
coissac merged 3 commits from push-rwqsmuvystym into main 2026-06-12 19:33:21 +00:00
6 changed files with 258 additions and 211 deletions
+1
View File
@@ -1562,6 +1562,7 @@ dependencies = [
"obikrope", "obikrope",
"obikseq", "obikseq",
"obilayeredmap", "obilayeredmap",
"obipipeline",
"obiread", "obiread",
"obiskbuilder", "obiskbuilder",
"obiskio", "obiskio",
+50 -170
View File
@@ -2,11 +2,8 @@ use std::collections::HashMap;
use std::fs; use std::fs;
use std::io; use std::io;
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use obisys::{MemoryBudget, Reporter, Stage, available_memory_bytes, progress_bar, spinner}; use obisys::{Reporter, Stage, progress_bar, spinner};
use rayon::prelude::*;
use tracing::{debug, info}; use tracing::{debug, info};
use obilayeredmap::IndexMode; use obilayeredmap::IndexMode;
@@ -22,10 +19,9 @@ pub use obikpartitionner::MergeMode;
#[derive(Debug)] #[derive(Debug)]
struct PartStat { struct PartStat {
id: usize, id: usize,
unitig_bytes: u64, // sum of unitigs.bin across remaining sources unitig_bytes: u64,
g_len: usize, // actual new kmers inserted into GraphDeBruijn g_len: usize,
exp_at_acquire: f64, // expansion factor used to size the budget reservation
} }
// ── main merge entry point ──────────────────────────────────────────────────── // ── main merge entry point ────────────────────────────────────────────────────
@@ -195,8 +191,7 @@ impl KmerIndex {
let mut order: Vec<usize> = (0..n_partitions).collect(); let mut order: Vec<usize> = (0..n_partitions).collect();
order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i])); order.sort_unstable_by_key(|&i| std::cmp::Reverse(partition_sizes[i]));
// ── Sequential pilot: worst partition → seed expansion factor ───── // ── First partition (largest) ─────────────────────────────────────
const FALLBACK_EXPANSION: u64 = 4_000; // 4× in fixed-point ×1000
let worst_id = order[0]; let worst_id = order[0];
let worst_bytes = partition_sizes[worst_id]; let worst_bytes = partition_sizes[worst_id];
@@ -205,108 +200,39 @@ impl KmerIndex {
.map_err(OKIError::Partition)?; .map_err(OKIError::Partition)?;
pb.inc(1); pb.inc(1);
let seed_expansion = if worst_bytes > 0 {
worst_g_len as u64 * 16 * 1000 / worst_bytes
} else {
FALLBACK_EXPANSION
};
info!( info!(
"merge_partitions: pilot partition {} — {} unitig bytes → {} new kmers, \ "merge_partitions: first partition {} — {} unitig bytes → {} new kmers",
expansion {:.2}×", worst_id, fmt_bytes(worst_bytes), worst_g_len,
worst_id, worst_bytes, worst_g_len,
seed_expansion as f64 / 1000.0,
); );
let part_stats: Arc<Mutex<Vec<PartStat>>> = Arc::new(Mutex::new({ let mut part_stats: Vec<PartStat> = Vec::with_capacity(n_partitions);
let mut v = Vec::with_capacity(n_partitions); part_stats.push(PartStat {
v.push(PartStat { id: worst_id,
id: worst_id, unitig_bytes: worst_bytes,
unitig_bytes: worst_bytes, g_len: worst_g_len,
g_len: worst_g_len, });
exp_at_acquire: seed_expansion as f64 / 1000.0,
});
v
}));
let max_expansion = AtomicU64::new(seed_expansion); // ── Sequential remainder ──────────────────────────────────────────
// One partition at a time; each partition uses an internal pipeline
// (obipipeline) to parallelise file I/O and dst_map filtering.
let _ = budget_fraction; // kept in signature for CLI compatibility
for &i in &order[1..] {
let ubytes = partition_sizes[i];
debug!("partition {i}: start — {} unitig bytes", fmt_bytes(ubytes));
// ── Parallel remainder under memory budget ──────────────────────── let g_len = dst_partition
let available = available_memory_bytes(); .merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence)
let budget_bytes = (available as f64 * budget_fraction) as u64; .map_err(OKIError::Partition)?;
let budget = Arc::new(MemoryBudget::new(budget_bytes)); pb.inc(1);
info!( debug!("partition {i}: done — {} new kmers", g_len);
"merge_partitions: available RAM {}, budget {:.0}% = {}", part_stats.push(PartStat { id: i, unitig_bytes: ubytes, g_len });
fmt_bytes(available),
budget_fraction * 100.0,
fmt_bytes(budget_bytes),
);
let errors: Vec<OKIError> = order[1..].into_par_iter()
.filter_map(|&i| {
let ubytes = partition_sizes[i];
let exp = max_expansion.load(Ordering::Relaxed);
let cost = ubytes * exp / 1000;
budget.acquire(cost);
debug!(
"partition {i}: start — est. {} ({:.2}×), \
{} workers active, {} budget remaining",
fmt_bytes(cost),
exp as f64 / 1000.0,
budget.active(),
fmt_bytes(budget.remaining()),
);
let result = dst_partition
.merge_partition(i, &srcs, mode, n_dst_genomes, block_bits, &evidence);
budget.release(cost);
pb.inc(1);
match result {
Ok(g_len) => {
let actual_exp = if ubytes > 0 {
g_len as u64 * 16 * 1000 / ubytes
} else {
0
};
max_expansion.fetch_max(actual_exp, Ordering::Relaxed);
debug!(
"partition {i}: done — {} new kmers, actual {:.2}× \
(estimated {:.2}×)",
g_len,
actual_exp as f64 / 1000.0,
exp as f64 / 1000.0,
);
part_stats.lock().unwrap().push(PartStat {
id: i,
unitig_bytes: ubytes,
g_len,
exp_at_acquire: exp as f64 / 1000.0,
});
None
}
Err(e) => Some(OKIError::Partition(e)),
}
})
.collect();
pb.finish_and_clear();
if let Some(e) = errors.into_iter().next() {
return Err(e);
} }
pb.finish_and_clear();
// ── Diagnostic report ───────────────────────────────────────────── // ── Diagnostic report ─────────────────────────────────────────────
let stats = Arc::try_unwrap(part_stats).unwrap().into_inner().unwrap(); print_merge_partition_report(&part_stats);
print_merge_partition_report(
&stats,
available,
budget_fraction,
seed_expansion as f64 / 1000.0,
max_expansion.load(Ordering::Relaxed) as f64 / 1000.0,
budget.peak_active(),
);
rep.push(t.stop()); rep.push(t.stop());
} }
@@ -328,82 +254,36 @@ impl KmerIndex {
// ── Diagnostic report ───────────────────────────────────────────────────────── // ── Diagnostic report ─────────────────────────────────────────────────────────
fn print_merge_partition_report( fn print_merge_partition_report(stats: &[PartStat]) {
stats: &[PartStat], let total_new: usize = stats.iter().map(|s| s.g_len).sum();
available_ram: u64, let non_empty = stats.iter().filter(|s| s.unitig_bytes > 0).count();
budget_fraction: f64,
seed_expansion: f64,
final_expansion: f64,
peak_active: usize,
) {
// Compute actual expansion per partition (skip empty partitions)
let expansions: Vec<(usize, f64)> = stats
.iter()
.filter(|s| s.unitig_bytes > 0)
.map(|s| (s.id, s.g_len as f64 * 16.0 / s.unitig_bytes as f64))
.collect();
if expansions.is_empty() { if non_empty == 0 {
info!("merge_partitions report: no data (all partitions empty)"); info!("merge_partitions report: no data (all partitions empty)");
return; return;
} }
let mut sorted_exp: Vec<f64> = expansions.iter().map(|(_, e)| *e).collect(); info!("─── merge_partitions report ───");
sorted_exp.sort_by(|a, b| a.partial_cmp(b).unwrap());
let n = sorted_exp.len();
let mean_exp = sorted_exp.iter().sum::<f64>() / n as f64;
let median_exp = sorted_exp[n / 2];
let max_exp = sorted_exp[n - 1];
info!("─── merge_partitions memory report ───");
info!( info!(
" available RAM : {} budget {:.0}% = {}", " {} partition(s) processed, {} total new kmers",
fmt_bytes(available_ram), non_empty, total_new,
budget_fraction * 100.0,
fmt_bytes((available_ram as f64 * budget_fraction) as u64),
); );
info!(
" expansion factor — seed: {:.2}× final max: {:.2}× \
(mean: {:.2}× median: {:.2}× observed max: {:.2}×)",
seed_expansion, final_expansion, mean_exp, median_exp, max_exp,
);
info!(" peak concurrent workers: {}", peak_active);
// Histogram of actual expansion factors // Top 8 partitions by new-kmer count
let min_e = sorted_exp[0]; let mut by_new: Vec<&PartStat> = stats.iter().filter(|s| s.g_len > 0).collect();
let max_e = sorted_exp[n - 1]; by_new.sort_by_key(|s| std::cmp::Reverse(s.g_len));
let n_buckets = 8usize; if !by_new.is_empty() {
let bucket_w = (max_e - min_e).max(0.01) / n_buckets as f64; info!(" top partitions by new kmers:");
let mut counts = vec![0usize; n_buckets]; for s in by_new.iter().take(8) {
for &e in &sorted_exp { info!(
let b = (((e - min_e) / bucket_w) as usize).min(n_buckets - 1); " partition {:4} : {}M new kmers ({} unitig bytes)",
counts[b] += 1; s.id,
s.g_len / 1_000_000,
fmt_bytes(s.unitig_bytes),
);
}
} }
let max_count = *counts.iter().max().unwrap(); info!("───────────────────────────────");
info!(" expansion factor distribution ({} partitions with data):", n);
for (i, &c) in counts.iter().enumerate() {
let lo = min_e + i as f64 * bucket_w;
let hi = min_e + (i + 1) as f64 * bucket_w;
let bar = "".repeat(if max_count > 0 { c * 30 / max_count } else { 0 });
info!(" {:5.2}× {:5.2}× │{:<30} {}", lo, hi, bar, c);
}
// Top 8 by actual expansion
let mut by_exp: Vec<(usize, f64)> = expansions.clone();
by_exp.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
info!(" top partitions by actual expansion factor:");
for (id, exp) in by_exp.iter().take(8) {
let s = stats.iter().find(|s| s.id == *id).unwrap();
info!(
" partition {:4} : {:.2}× ({} unitigs → {}M kmers, \
reserved at {:.2}×)",
id, exp,
fmt_bytes(s.unitig_bytes),
s.g_len / 1_000_000,
s.exp_at_acquire,
);
}
info!("──────────────────────────────────────");
} }
// ── helpers ─────────────────────────────────────────────────────────────────── // ── helpers ───────────────────────────────────────────────────────────────────
+2 -1
View File
@@ -28,4 +28,5 @@ memmap2 = "0.9.10"
obicompactvec = { path = "../obicompactvec" } obicompactvec = { path = "../obicompactvec" }
ptr_hash = "1.1" ptr_hash = "1.1"
indicatif = "0.17" indicatif = "0.17"
obisys = { path = "../obisys" } obisys = { path = "../obisys" }
obipipeline = { path = "../obipipeline" }
+198 -40
View File
@@ -1,6 +1,9 @@
use std::fs; use std::fs;
use std::io; use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use obipipeline::{Pipeline, WorkerPool, make_flat_transform, make_sink, make_source, make_transform};
use obicompactvec::{ use obicompactvec::{
PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder, PersistentBitMatrix, PersistentBitMatrixBuilder, PersistentBitVecBuilder,
@@ -173,7 +176,7 @@ impl KmerPartition {
} }
load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open load_meta(&dst_index_dir)?; // ensure meta.json exists before LayeredMap::open
let dst_map = LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?; let dst_map = Arc::new(LayeredMap::<()>::open(&dst_index_dir).map_err(olm_to_sk)?);
let n_dst_layers = dst_map.n_layers(); let n_dst_layers = dst_map.n_layers();
let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum(); let n_src_total: usize = sources.iter().map(|(_, n)| *n).sum();
@@ -187,29 +190,95 @@ impl KmerPartition {
} }
} }
// ── Pass 1: classify kmers, build new-kmer de Bruijn graph ────────── // ── Pass 1: pipeline — parallel file read + dst_map filter + graph fill
let mut g = GraphDeBruijn::new(); //
let mut any_new = false; // Source : list of unitigs.bin paths (one per source × layer)
// Flat : open file, emit Vec<CanonicalKmer> batches (BeeGFS parallel I/O)
// Transform: filter via dst_map.query() — thread-safe, LayeredMap<()>: Sync
// Sink : push new kmers into GraphDeBruijn (single thread, no locks needed)
// Collect file paths (propagates load_meta errors before the pipeline starts)
let mut unitig_paths: Vec<PathBuf> = Vec::new();
for (src, _) in sources.iter() { for (src, _) in sources.iter() {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
if !src_index_dir.exists() { if !src_index_dir.exists() {
continue; continue;
} }
let src_meta = load_meta(&src_index_dir)?; let src_meta = load_meta(&src_index_dir)?;
for l in 0..src_meta.n_layers { for l in 0..src_meta.n_layers {
let unitigs_path = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin"); let p = src_index_dir.join(format!("layer_{l}")).join("unitigs.bin");
let reader = UnitigFileReader::open_sequential(&unitigs_path)?; if p.exists() {
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { unitig_paths.push(p);
if dst_map.query(kmer).is_none() {
g.push(kmer);
any_new = true;
}
} }
} }
} }
enum Pass1Data {
File(PathBuf),
Batch(Vec<CanonicalKmer>),
NewKmers(Vec<CanonicalKmer>),
}
const BATCH: usize = 4096;
let n_workers = std::thread::available_parallelism().map_or(4, |n| n.get());
let capacity = n_workers * 8;
let dst_filter = Arc::clone(&dst_map);
let g_shared = Arc::new(Mutex::new(GraphDeBruijn::new()));
let g_sink = Arc::clone(&g_shared);
let pass1_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let err_cap = Arc::clone(&pass1_err);
let pipeline = Pipeline::new(
make_source!(Pass1Data, unitig_paths, File),
vec![
make_flat_transform!(Pass1Data, {
move |path: PathBuf| -> Vec<Vec<CanonicalKmer>> {
match UnitigFileReader::open_sequential(&path) {
Err(e) => {
*err_cap.lock().unwrap() = Some(e.to_string());
vec![]
}
Ok(reader) => {
let kmers: Vec<CanonicalKmer> = reader
.iter_indexed_canonical_kmers()
.map(|(k, _, _)| k)
.collect();
kmers.chunks(BATCH).map(|c| c.to_vec()).collect()
}
}
}
}, File, Batch),
make_transform!(Pass1Data, {
move |batch: Vec<CanonicalKmer>| -> Vec<CanonicalKmer> {
batch.into_iter()
.filter(|&k| dst_filter.query(k).is_none())
.collect()
}
}, Batch, NewKmers),
],
make_sink!(Pass1Data, {
move |batch: Vec<CanonicalKmer>| {
let mut g = g_sink.lock().unwrap();
for kmer in batch {
g.push(kmer);
}
}
}, NewKmers),
);
WorkerPool::new(pipeline, n_workers, capacity).run();
if let Some(msg) = Arc::try_unwrap(pass1_err).unwrap().into_inner().unwrap() {
return Err(SKError::InvalidData { context: "merge pass1", detail: msg });
}
let g = Arc::try_unwrap(g_shared)
.unwrap_or_else(|_| panic!("pass1: g_shared not uniquely owned after pipeline"))
.into_inner()
.unwrap_or_else(|e| e.into_inner());
let any_new = g.len() > 0;
// Build new layer from de Bruijn graph if there are new kmers. // Build new layer from de Bruijn graph if there are new kmers.
let new_layer_idx = n_dst_layers; let new_layer_idx = n_dst_layers;
let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}"));
@@ -223,15 +292,17 @@ impl KmerPartition {
uw.write(&unitig) uw.write(&unitig)
})?; })?;
uw.close()?; uw.close()?;
let n = g.len();
drop(g); // release GraphDeBruijn before MPHF build
Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
g.len() n
} else { } else {
drop(g);
0 0
}; };
drop(g);
let new_mphf = if any_new { let new_mphf: Option<Arc<MphfOnly>> = if any_new {
Some(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?) Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?))
} else { } else {
None None
}; };
@@ -239,7 +310,7 @@ impl KmerPartition {
// ── Prepare matrix directories for the new layer ────────────────────── // ── Prepare matrix directories for the new layer ──────────────────────
// Absent columns (dst genomes) are written via append_column (all-zero/false). // Absent columns (dst genomes) are written via append_column (all-zero/false).
// Source-genome columns are created as mutable builders for pass 2. // Source-genome columns are created as mutable builders for pass 2.
let mut new_src_builders: Vec<ColBuilder> = if any_new { let new_src_builders: Vec<ColBuilder> = if any_new {
let data_dir = match mode { let data_dir = match mode {
MergeMode::Presence => new_layer_dir.join("presence"), MergeMode::Presence => new_layer_dir.join("presence"),
MergeMode::Count => new_layer_dir.join("counts"), MergeMode::Count => new_layer_dir.join("counts"),
@@ -291,7 +362,7 @@ impl KmerPartition {
// Builders for existing layers: n_src_total per layer. // Builders for existing layers: n_src_total per layer.
// Columns n_dst_genomes .. n_dst_genomes + n_src_total - 1. // Columns n_dst_genomes .. n_dst_genomes + n_src_total - 1.
let mut exist_builders: Vec<Vec<ColBuilder>> = (0..n_dst_layers) let exist_builders: Vec<Vec<ColBuilder>> = (0..n_dst_layers)
.map(|l| { .map(|l| {
let layer_dir = dst_index_dir.join(format!("layer_{l}")); let layer_dir = dst_index_dir.join(format!("layer_{l}"));
let n = dst_map.layer(l).n(); let n = dst_map.layer(l).n();
@@ -320,37 +391,124 @@ impl KmerPartition {
}) })
.collect::<SKResult<_>>()?; .collect::<SKResult<_>>()?;
// ── Pass 2: fill builders ───────────────────────────────────────────── // ── Pass 2: fill builders (pipeline) ─────────────────────────────────
let mut col_offset = 0usize; // Collect source items before the pipeline so load_meta errors propagate
for (src, src_n) in sources.iter() { // via ? before any worker thread is spawned.
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR); let mut pass2_items: Vec<(usize, usize, PathBuf)> = Vec::new();
if !src_index_dir.exists() { {
let mut col_offset = 0usize;
for (src, src_n) in sources.iter() {
let src_index_dir = src.part_dir(i).join(INDEX_SUBDIR);
if !src_index_dir.exists() {
col_offset += src_n;
continue;
}
let src_meta = load_meta(&src_index_dir)?;
for l in 0..src_meta.n_layers {
let src_layer_dir = src_index_dir.join(format!("layer_{l}"));
if src_layer_dir.join("unitigs.bin").exists() {
pass2_items.push((col_offset, *src_n, src_layer_dir));
}
}
col_offset += src_n; col_offset += src_n;
continue;
} }
let src_meta = load_meta(&src_index_dir)?; }
for l in 0..src_meta.n_layers { enum Pass2Data {
let src_layer_dir = src_index_dir.join(format!("layer_{l}")); SrcLayer((usize, usize, PathBuf)),
let reader = UnitigFileReader::open_sequential(&src_layer_dir.join("unitigs.bin"))?; RawBatch((usize, usize, Vec<(CanonicalKmer, Vec<u32>)>)),
let src_data = SrcLayerData::open(&src_layer_dir, mode)?; WriteBatch(Vec<(Option<usize>, usize, usize, u32)>),
}
for (kmer, _, _) in reader.iter_indexed_canonical_kmers() { let builders = Arc::new(Mutex::new((exist_builders, new_src_builders)));
let values = src_data.lookup(kmer, *src_n); let builders_sink = Arc::clone(&builders);
for (g, &value) in values.iter().enumerate() { let dst_map_t2 = Arc::clone(&dst_map);
let builder_idx = col_offset + g; let new_mphf_t2 = new_mphf.clone();
if let Some((dst_layer, hit)) = dst_map.query(kmer) { let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
exist_builders[dst_layer][builder_idx].set_val(hit.slot, value); let err_cap2 = Arc::clone(&pass2_err);
} else if let Some(ref mphf) = new_mphf {
let slot = mphf.index(kmer); let pipeline2 = Pipeline::new(
new_src_builders[builder_idx].set_val(slot, value); make_source!(Pass2Data, pass2_items, SrcLayer),
vec![
make_flat_transform!(Pass2Data, {
move |(col_offset, src_n, src_layer_dir): (usize, usize, PathBuf)|
-> Vec<(usize, usize, Vec<(CanonicalKmer, Vec<u32>)>)>
{
let reader = match UnitigFileReader::open_sequential(
&src_layer_dir.join("unitigs.bin"),
) {
Ok(r) => r,
Err(e) => {
*err_cap2.lock().unwrap() = Some(e.to_string());
return vec![];
}
};
let src_data = match SrcLayerData::open(&src_layer_dir, mode) {
Ok(d) => d,
Err(e) => {
*err_cap2.lock().unwrap() = Some(e.to_string());
return vec![];
}
};
let all_items: Vec<(CanonicalKmer, Vec<u32>)> = reader
.iter_indexed_canonical_kmers()
.map(|(kmer, _, _)| (kmer, src_data.lookup(kmer, src_n)))
.collect();
all_items
.chunks(BATCH)
.map(|c| (col_offset, src_n, c.to_vec()))
.collect()
}
}, SrcLayer, RawBatch),
make_transform!(Pass2Data, {
move |(col_offset, _src_n, items): (usize, usize, Vec<(CanonicalKmer, Vec<u32>)>)|
-> Vec<(Option<usize>, usize, usize, u32)>
{
let mut ops: Vec<(Option<usize>, usize, usize, u32)> = Vec::new();
for (kmer, values) in items {
if let Some((dst_layer, hit)) = dst_map_t2.query(kmer) {
for (g, val) in values.into_iter().enumerate() {
ops.push((Some(dst_layer), col_offset + g, hit.slot, val));
}
} else if let Some(ref mphf) = new_mphf_t2 {
let slot = mphf.index(kmer);
for (g, val) in values.into_iter().enumerate() {
ops.push((None, col_offset + g, slot, val));
}
}
}
ops
}
}, RawBatch, WriteBatch),
],
make_sink!(Pass2Data, {
move |ops: Vec<(Option<usize>, usize, usize, u32)>| {
let mut guard = builders_sink.lock().unwrap();
for (layer_opt, col, slot, val) in ops {
match layer_opt {
Some(l) => guard.0[l][col].set_val(slot, val),
None => guard.1[col].set_val(slot, val),
} }
} }
} }
} }, WriteBatch),
col_offset += src_n; );
WorkerPool::new(pipeline2, n_workers, capacity).run();
if let Some(msg) = Arc::try_unwrap(pass2_err)
.unwrap_or_else(|_| panic!("pass2: pass2_err not uniquely owned"))
.into_inner()
.unwrap_or_else(|e| e.into_inner())
{
return Err(SKError::InvalidData { context: "merge pass2", detail: msg });
} }
let (exist_builders, new_src_builders) = Arc::try_unwrap(builders)
.unwrap_or_else(|_| panic!("pass2: builders not uniquely owned after pipeline"))
.into_inner()
.unwrap_or_else(|e| e.into_inner());
// ── Close builders and update metadata ──────────────────────────────── // ── Close builders and update metadata ────────────────────────────────
for (l, builders) in exist_builders.into_iter().enumerate() { for (l, builders) in exist_builders.into_iter().enumerate() {
let layer_dir = dst_index_dir.join(format!("layer_{l}")); let layer_dir = dst_index_dir.join(format!("layer_{l}"));
+1
View File
@@ -104,3 +104,4 @@ fn layer_dir(root: &Path, i: usize) -> PathBuf {
#[cfg(test)] #[cfg(test)]
#[path = "tests/map.rs"] #[path = "tests/map.rs"]
mod tests; mod tests;
+6
View File
@@ -119,6 +119,12 @@ use sysinfo::System;
/// ///
/// On macOS, `available_memory()` can return 0 when the memory compressor /// On macOS, `available_memory()` can return 0 when the memory compressor
/// inflates the page count; in that case we fall back to half of total memory. /// inflates the page count; in that case we fall back to half of total memory.
/// Returns the process peak RSS (high-water mark since process start).
/// Monotonically increasing — use delta before/after a phase to measure its RAM cost.
pub fn peak_rss_bytes() -> u64 {
rss_to_bytes(&get_rusage())
}
pub fn available_memory_bytes() -> u64 { pub fn available_memory_bytes() -> u64 {
let sys = System::new_all(); let sys = System::new_all();
let host_avail = match sys.available_memory() { let host_avail = match sys.available_memory() {