diff --git a/memory/MEMORY.md b/memory/MEMORY.md index 1e068c8..d23e4be 100644 --- a/memory/MEMORY.md +++ b/memory/MEMORY.md @@ -2,3 +2,4 @@ - [Project domain](project_domain.md) — obikmer est pour la génomique (génomes individuels), pas la métagénomique - [No architectural decisions without authorization](feedback_architectural_decisions.md) — toute décision architecturale (mémoire, algo, structure) requiert l'accord explicite de l'utilisateur avant toute action +- [Phases intra-partition parallèles](feedback_phases_parallelism.md) — graph build, compute_degrees, unitig traversal, MPHF utilisent Rayon — ne jamais les appeler "séquentielles" diff --git a/memory/feedback_phases_parallelism.md b/memory/feedback_phases_parallelism.md new file mode 100644 index 0000000..c6823d0 --- /dev/null +++ b/memory/feedback_phases_parallelism.md @@ -0,0 +1,12 @@ +--- +name: feedback-phases-parallelism +description: Les phases intra-partition (graph build, compute_degrees, unitig traversal, MPHF) utilisent toutes Rayon — elles ne sont PAS séquentielles +metadata: + type: feedback +--- + +Ne jamais qualifier les phases intra-partition de "séquentielles". Chaque phase (graph build, compute_degrees, unitig traversal, MPHF build) utilise Rayon en interne et s'exécute en parallèle sur plusieurs cœurs. + +**Why:** L'utilisateur a corrigé ce point plusieurs fois. Le décrire comme "séquentiel" est une erreur factuelle qui fausse l'analyse de performance. + +**How to apply:** Quand on analyse l'efficacité CPU ou les 25% manquants, chercher la cause dans le déséquilibre de charge entre partitions, la contention Rayon entre workers, ou la latence inter-partitions — pas dans une prétendue sérialisation des phases. diff --git a/scripts/merge_report.py b/scripts/merge_report.py new file mode 100755 index 0000000..745e7f5 --- /dev/null +++ b/scripts/merge_report.py @@ -0,0 +1,347 @@ +#!/usr/bin/env python3 +"""Parse obikmer merge debug log → Markdown performance report.""" + +import re +import sys +from datetime import datetime +from collections import defaultdict +from statistics import mean, median, stdev + +ANSI = re.compile(r'\x1b\[[0-9;]*m') + +def strip(s): + return ANSI.sub('', s) + +def parse_ts(s): + return datetime.fromisoformat(s.replace('Z', '+00:00')) + +def dur_s(s): + s = s.strip() + if s.endswith('ms'): return float(s[:-2]) / 1e3 + if s.endswith('µs'): return float(s[:-2]) / 1e6 + if s.endswith('us'): return float(s[:-2]) / 1e6 + if s.endswith('ns'): return float(s[:-2]) / 1e9 + if s.endswith('s'): return float(s[:-1]) + return float(s) + +def fmt_s(s): + if s < 0.001: return f"{s*1e6:.0f}µs" + if s < 1: return f"{s*1e3:.0f}ms" + if s < 60: return f"{s:.2f}s" + return f"{s/60:.1f}min ({s:.0f}s)" + +def fmt_rate(n, s): + if s <= 0: return "—" + r = n / s + if r >= 1e9: return f"{r/1e9:.2f}G/s" + if r >= 1e6: return f"{r/1e6:.2f}M/s" + if r >= 1e3: return f"{r/1e3:.2f}K/s" + return f"{r:.0f}/s" + +def pct(a, b): + return f"{100*a/b:.1f}%" if b else "—" + +def stats_row(label, values, unit="s", fmt=fmt_s): + if not values: return f"| {label} | — | — | — | — | — |" + mn, mx, med, av = min(values), max(values), median(values), mean(values) + sd = stdev(values) if len(values) > 1 else 0 + return f"| {label} | {fmt(mn)} | {fmt(med)} | {fmt(av)} | {fmt(mx)} | {fmt(sd)} |" + +# ── patterns ────────────────────────────────────────────────────────────────── + +TS = r'(\d{4}-\d{2}-\d{2}T[\d:.]+Z)' + +pats = { + 'graph_done': re.compile(TS + r'.*partition (\d+): de Bruijn graph done — (\d+) new kmers'), + 'trav_start': re.compile(TS + r'.*partition (\d+): unitig traversal start — (\d+) nodes'), + 'trav_closing': re.compile(TS + r'.*partition (\d+): unitig writer closing'), + 'trav_closed': re.compile(TS + r'.*partition (\d+): unitig writer closed'), + 'graph_dropped': re.compile(TS + r'.*partition (\d+): graph dropped — starting MPHF build \((\d+) unitigs\)'), + 'mphf_done': re.compile(TS + r'.*partition (\d+): MPHF build done'), + 'mphf_open': re.compile(TS + r'.*partition (\d+): MPHF open in ([\d.]+)s'), + 'bld_ready': re.compile(TS + r'.*partition (\d+): builders ready in ([\d.]+)s'), + 'pass2_done': re.compile(TS + r'.*partition (\d+): pass2 pipeline done in ([\d.]+)s'), + 'bld_closed': re.compile(TS + r'.*partition (\d+): builders closed in ([\d.]+)s'), + 'part_done': re.compile(TS + r'.*partition (\d+): done in ([\d.]+)s — (\d+) new kmers'), + 'worker': re.compile(TS + r'.*activated worker (\d+).*efficiency (\d+)%.*gain vs prev (\d+)%'), + 'worker_poll': re.compile(TS + r'.*activated worker (\d+) \(poll\).*efficiency (\d+)%'), + 'compute_deg': re.compile(TS + r'.*partition (\d+): compute_degrees in ([\d.]+)s — (\d+) nodes'), + 'stage_done': re.compile(TS + r'.*done stage=merge_partitions wall_secs=([\d.]+)'), + 'workers_rep': re.compile(r'workers spawned: (\d+) / (\d+)'), +} + +# ── parse ───────────────────────────────────────────────────────────────────── + +P = defaultdict(dict) # partition_id → timing dict +workers_ev = [] +wall_total = None +workers_final = (None, None) + +with open(sys.argv[1]) as f: + for raw in f: + line = strip(raw) + + m = pats['graph_done'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['n_kmers'] = int(m.group(3)) + P[pid]['graph_done_ts'] = parse_ts(m.group(1)) + continue + + m = pats['trav_start'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['trav_start_ts'] = parse_ts(m.group(1)) + P[pid]['n_nodes'] = int(m.group(3)) + continue + + m = pats['trav_closing'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['trav_closing_ts'] = parse_ts(m.group(1)) + continue + + m = pats['trav_closed'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['trav_closed_ts'] = parse_ts(m.group(1)) + continue + + m = pats['graph_dropped'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['drop_ts'] = parse_ts(m.group(1)) + P[pid]['n_unitigs'] = int(m.group(3)) + continue + + m = pats['mphf_done'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['mphf_done_ts'] = parse_ts(m.group(1)) + continue + + m = pats['mphf_open'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['mphf_open_s'] = float(m.group(3)) + continue + + m = pats['bld_ready'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['bld_ready_s'] = float(m.group(3)) + continue + + m = pats['pass2_done'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['pass2_s'] = float(m.group(3)) + continue + + m = pats['bld_closed'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['bld_closed_s'] = float(m.group(3)) + continue + + m = pats['part_done'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['total_s'] = float(m.group(3)) + P[pid]['done_ts'] = parse_ts(m.group(1)) + continue + + m = pats['worker'].search(line) + if m: + workers_ev.append({'n': int(m.group(2)), 'eff': int(m.group(3)), + 'gain': int(m.group(4)), 'ts': parse_ts(m.group(1)), 'poll': False}) + continue + + m = pats['worker_poll'].search(line) + if m: + workers_ev.append({'n': int(m.group(2)), 'eff': int(m.group(3)), + 'gain': None, 'ts': parse_ts(m.group(1)), 'poll': True}) + continue + + m = pats['compute_deg'].search(line) + if m: + pid = int(m.group(2)) + P[pid]['cdeg_s'] = float(m.group(3)) + P[pid]['n_nodes'] = P[pid].get('n_nodes') or int(m.group(4)) + continue + + m = pats['stage_done'].search(line) + if m: + wall_total = float(m.group(2)) + continue + + m = pats['workers_rep'].search(line) + if m: + workers_final = (int(m.group(1)), int(m.group(2))) + continue + +# ── derive per-partition phases ─────────────────────────────────────────────── + +def tsdiff(p, k1, k2): + if k1 in p and k2 in p: + return (p[k2] - p[k1]).total_seconds() + return None + +phases = {} +for pid, p in P.items(): + row = {'pid': pid} + row['n_kmers'] = p.get('n_kmers', 0) + row['n_nodes'] = p.get('n_nodes', 0) + row['n_unitigs']= p.get('n_unitigs', 0) + row['total_s'] = p.get('total_s') + row['cdeg_s'] = p.get('cdeg_s') + row['mphf_open_s'] = p.get('mphf_open_s') + row['bld_ready_s'] = p.get('bld_ready_s') + row['pass2_s'] = p.get('pass2_s') + row['bld_closed_s'] = p.get('bld_closed_s') + + # Traversal: trav_start → trav_closing (= writing all unitigs) + row['trav_s'] = tsdiff(p, 'trav_start_ts', 'trav_closing_ts') + # Writer close: trav_closing → trav_closed + row['close_s'] = tsdiff(p, 'trav_closing_ts', 'trav_closed_ts') + # Graph drop: trav_closed → drop_ts + row['drop_s'] = tsdiff(p, 'trav_closed_ts', 'drop_ts') + # MPHF build: drop_ts → mphf_done_ts + row['mphf_s'] = tsdiff(p, 'drop_ts', 'mphf_done_ts') + # After MPHF: mphf_done → done_ts + row['post_s'] = tsdiff(p, 'mphf_done_ts', 'done_ts') + + # Graph build: total - known phases (rough estimate) + known = sum(v for v in [row['cdeg_s'], row['trav_s'], row['close_s'], row['drop_s'], + row['mphf_s'], row['mphf_open_s'], row['bld_ready_s'], + row['pass2_s'], row['bld_closed_s']] if v is not None) + row['graph_build_s'] = (row['total_s'] - known) if row['total_s'] else None + + phases[pid] = row + +# helpers +def collect(key): + return [r[key] for r in phases.values() if r.get(key) is not None] + +def rate_stats(n_key, t_key): + """Returns list of throughput values (items/s).""" + result = [] + for r in phases.values(): + n, t = r.get(n_key), r.get(t_key) + if n and t and t > 0: + result.append(n / t) + return result + +# ── output ──────────────────────────────────────────────────────────────────── + +out = [] +w = out.append + +w("# obikmer merge — performance report\n") + +# Run info +n_parts = len([r for r in phases.values() if r['n_kmers'] > 0]) +n_empty = len([r for r in phases.values() if r['n_kmers'] == 0]) +total_kmers = sum(r['n_kmers'] for r in phases.values()) +w("## Run summary\n") +w(f"- **Partitions**: {len(phases)} total — {n_parts} non-empty, {n_empty} empty") +w(f"- **New kmers (total)**: {total_kmers:,}") +if wall_total: + w(f"- **merge_partitions wall time**: {fmt_s(wall_total)}") +if workers_final[0]: + w(f"- **Workers spawned**: {workers_final[0]} / {workers_final[1]} (max)") +w("") + +# Worker spawn timeline +if workers_ev: + w("## Worker activation\n") + w("| Time | Worker # | Trigger | Efficiency | Gain vs prev |") + w("|------|----------|---------|------------|--------------|") + t0 = workers_ev[0]['ts'] + for e in workers_ev: + elapsed = fmt_s((e['ts'] - t0).total_seconds()) + trigger = "poll (timeout)" if e['poll'] else "partition done" + gain = f"{e['gain']}%" if e.get('gain') is not None else "—" + w(f"| +{elapsed} | {e['n']} | {trigger} | {e['eff']}% | {gain} |") + w("") + +# Phase breakdown table +w("## Phase timing statistics\n") +w("Columns: min | median | mean | max | stdev\n") +w("| Phase | min | median | mean | max | stdev |") +w("|-------|-----|--------|------|-----|-------|") +w(stats_row("Graph build (estimated)", collect('graph_build_s'))) +w(stats_row("compute_degrees", collect('cdeg_s'))) +w(stats_row("Unitig traversal", collect('trav_s'))) +w(stats_row("Writer close (uw.close)", collect('close_s'))) +w(stats_row("Graph drop", collect('drop_s'))) +w(stats_row("MPHF build", collect('mphf_s'))) +w(stats_row("MPHF open", collect('mphf_open_s'))) +w(stats_row("Builders ready", collect('bld_ready_s'))) +w(stats_row("Pass2 pipeline", collect('pass2_s'))) +w(stats_row("Builders close", collect('bld_closed_s'))) +w(stats_row("Post-MPHF (residual)", collect('post_s'))) +w(stats_row("**Total per partition**", collect('total_s'))) +w("") + +# Throughput +w("## Throughput by phase\n") +w("| Phase | metric | min | median | mean | max |") +w("|-------|--------|-----|--------|------|-----|") + +def rate_row(label, rates): + if not rates: return f"| {label} | — | — | — | — | — |" + f = lambda x: fmt_rate(x, 1) + mn, med, av, mx = min(rates), median(rates), mean(rates), max(rates) + return f"| {label} | nodes/s | {f(mn)} | {f(med)} | {f(av)} | {f(mx)} |" + +w(rate_row("compute_degrees", rate_stats('n_nodes', 'cdeg_s'))) +w(rate_row("Unitig traversal", rate_stats('n_nodes', 'trav_s'))) +w(rate_row("MPHF build", rate_stats('n_unitigs', 'mphf_s'))) +w("") + + +# Top 10 slowest partitions +w("## Top 10 slowest partitions\n") +w("| Partition | nodes | unitigs | total | trav | MPHF | graph build |") +w("|-----------|-------|---------|-------|------|------|-------------|") +sorted_parts = sorted(phases.values(), key=lambda r: r['total_s'] or 0, reverse=True) +for r in sorted_parts[:10]: + pid = r['pid'] + def f(k): return fmt_s(r[k]) if r.get(k) is not None else "—" + nodes = f"{r['n_nodes']/1e6:.1f}M" if r['n_nodes'] else "—" + unitigs = f"{r['n_unitigs']/1e6:.1f}M" if r['n_unitigs'] else "—" + w(f"| {pid} | {nodes} | {unitigs} | {f('total_s')} | {f('trav_s')} | {f('mphf_s')} | {f('graph_build_s')} |") +w("") + +# Phase share of total time (for non-empty partitions with full data) +complete = [r for r in phases.values() + if all(r.get(k) is not None + for k in ('total_s','trav_s','close_s','drop_s','mphf_s', + 'mphf_open_s','bld_ready_s','pass2_s','bld_closed_s')) + and r['total_s'] and r['total_s'] > 0] +if complete: + w("## Phase share of total time (mean across complete partitions)\n") + total_mean = mean(r['total_s'] for r in complete) + w(f"_Based on {len(complete)} partitions with full timing data. Mean total: {fmt_s(total_mean)}_\n") + w("| Phase | mean time | share |") + w("|-------|-----------|-------|") + for label, key in [ + ("Graph build", 'graph_build_s'), + ("compute_degrees", 'cdeg_s'), + ("Unitig traversal", 'trav_s'), + ("Writer close", 'close_s'), + ("Graph drop", 'drop_s'), + ("MPHF build", 'mphf_s'), + ("MPHF open", 'mphf_open_s'), + ("Builders ready", 'bld_ready_s'), + ("Pass2 pipeline", 'pass2_s'), + ("Builders close", 'bld_closed_s'), + ("Post-MPHF (residual)", 'post_s'), + ]: + vals = [r[key] for r in complete] + m = mean(vals) + w(f"| {label} | {fmt_s(m)} | {pct(m, total_mean)} |") + w("") + +print('\n'.join(out)) diff --git a/src/obidebruinj/src/debruijn.rs b/src/obidebruinj/src/debruijn.rs index 23c1a1c..8d300f2 100644 --- a/src/obidebruinj/src/debruijn.rs +++ b/src/obidebruinj/src/debruijn.rs @@ -7,8 +7,6 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::cell::RefCell; use std::fmt; use std::sync::atomic::{AtomicU8, Ordering}; -use std::time::Instant; -use tracing::{debug, info}; use xxhash_rust::xxh3::Xxh3Builder; // ── Types ───────────────────────────────────────────────────────────────────── @@ -285,7 +283,6 @@ impl GraphDeBruijn { pub fn compute_degrees_and_mark_starts(&self) { // Pass 1: count right/left neighbors for each node - let t1 = Instant::now(); self.for_each_node(|kmer, atomic| { let mut old = Node(atomic.load(Ordering::Relaxed)); if old.is_visited() { @@ -295,20 +292,13 @@ impl GraphDeBruijn { } let (rc, rn) = count_neighbors(&kmer.right_canonical_neighbors(), &self.nodes); let (lc, ln) = count_neighbors(&kmer.left_canonical_neighbors(), &self.nodes); - let mut node = Node(0); // reset all bits (visited=0, start=0) + let mut node = Node(0); node.set_right(rc, rn); node.set_left(lc, ln); atomic.store(node.0, Ordering::Relaxed); }); - debug!( - "[compute_degrees] pass 1 (degrees): {:?} — {} nodes", - t1.elapsed(), - self.nodes.len() - ); // Pass 2: mark start nodes - - let t2 = Instant::now(); self.for_each_node(|kmer, atomic| { let mut node = Node(atomic.load(Ordering::Relaxed)); if node.is_visited() { @@ -319,11 +309,6 @@ impl GraphDeBruijn { atomic.store(node.0, Ordering::Relaxed); } }); - debug!( - "[compute_degrees] pass 2 (starts): {:?} — {} nodes", - t2.elapsed(), - self.nodes.len() - ); } pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option { @@ -391,7 +376,6 @@ impl GraphDeBruijn { let n2 = std::sync::atomic::AtomicUsize::new(0); // Boucle unique : traiter les starts, recalculer les arités, recommencer - let mut pass = 0usize; loop { let n_new = std::sync::atomic::AtomicUsize::new(0); @@ -421,9 +405,7 @@ impl GraphDeBruijn { }); let n = n_new.load(Ordering::Relaxed); - debug!("[for_each_unitig] pass {}: {} starts", pass, n); n_chains.fetch_add(n, Ordering::Relaxed); - pass += 1; if n == 0 { break; } @@ -452,12 +434,6 @@ impl GraphDeBruijn { } } - debug!( - chains = n_chains.load(Ordering::Relaxed), - phase2 = n2.load(Ordering::Relaxed), - total = n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed), - "unitig traversal complete" - ); } /// Merge `other` into `self`. diff --git a/src/obikindex/src/merge.rs b/src/obikindex/src/merge.rs index f32fc8f..3dd3f05 100644 --- a/src/obikindex/src/merge.rs +++ b/src/obikindex/src/merge.rs @@ -289,14 +289,39 @@ impl KmerIndex { activate_tx.send(()).ok(); n_workers = 1; + const SPAWN_POLL: Duration = Duration::from_secs(10); + let mut completed = 0usize; while completed < n_partitions { - let (i, r, dur) = result_rx.recv().map_err(|_| { - OKIError::Io(io::Error::new( - io::ErrorKind::UnexpectedEof, - "worker channel closed", - )) - })?; + let result = result_rx.recv_timeout(SPAWN_POLL); + + // On timeout: no partition finished yet, just check efficiency. + let (i, r, dur) = match result { + Ok(v) => v, + Err(crossbeam_channel::RecvTimeoutError::Timeout) => { + if n_workers < max_workers { + let eff = cpu_sample.cpu_efficiency(n_cores); + if eff < SPAWN_THRESHOLD { + debug!( + "activated worker {} (poll) — efficiency {:.0}%", + n_workers + 1, + eff * 100.0, + ); + efficiency_at_last_spawn = eff; + activate_tx.send(()).ok(); + n_workers += 1; + cpu_sample = CpuSample::now(); + } + } + continue; + } + Err(crossbeam_channel::RecvTimeoutError::Disconnected) => { + return Err(OKIError::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "worker channel closed", + ))); + } + }; let g_len = r.map_err(OKIError::Partition)?; pb.inc(1); debug!( diff --git a/src/obikpartitionner/src/merge_layer.rs b/src/obikpartitionner/src/merge_layer.rs index 494cb71..dc0aa15 100644 --- a/src/obikpartitionner/src/merge_layer.rs +++ b/src/obikpartitionner/src/merge_layer.rs @@ -304,27 +304,37 @@ impl KmerPartition { let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}")); let n_new = if any_new { + let t_deg = std::time::Instant::now(); g.compute_degrees_and_mark_starts(); + debug!("partition {i}: compute_degrees in {:.3}s — {} nodes", + t_deg.elapsed().as_secs_f64(), g.len()); fs::create_dir_all(&new_layer_dir)?; let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?; + debug!("partition {i}: unitig traversal start — {} nodes", g.len()); g.try_for_each_unitig(|unitig| { uw.write(unitig) })?; + debug!("partition {i}: unitig writer closing"); uw.close()?; + debug!("partition {i}: unitig writer closed — dropping graph ({} nodes)", g.len()); let n = g.len(); - drop(g); // release GraphDeBruijn before MPHF build + drop(g); + debug!("partition {i}: graph dropped — starting MPHF build ({n} unitigs)"); Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?; + debug!("partition {i}: MPHF build done"); n } else { drop(g); 0 }; + let t_open = std::time::Instant::now(); let new_mphf: Option> = if any_new { Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?)) } else { None }; + debug!("partition {i}: MPHF open in {:.3}s", t_open.elapsed().as_secs_f64()); // ── Prepare matrix directories for the new layer ────────────────────── // Absent columns (dst genomes) are written via append_column (all-zero/false). @@ -379,6 +389,7 @@ impl KmerPartition { vec![] }; + let t_builders = std::time::Instant::now(); // Builders for existing layers: n_src_total per layer. // Columns n_dst_genomes .. n_dst_genomes + n_src_total - 1. let exist_builders: Vec> = (0..n_dst_layers) @@ -410,7 +421,10 @@ impl KmerPartition { }) .collect::>()?; + debug!("partition {i}: builders ready in {:.3}s", t_builders.elapsed().as_secs_f64()); + // ── Pass 2: fill builders (pipeline) ───────────────────────────────── + let t_pass2 = std::time::Instant::now(); // Collect source items before the pipeline so load_meta errors propagate // via ? before any worker thread is spawned. let mut pass2_items: Vec<(usize, usize, PathBuf)> = Vec::new(); @@ -439,8 +453,18 @@ impl KmerPartition { WriteBatch(Vec<(Option, usize, usize, u32)>), } - let builders = Arc::new(Mutex::new((exist_builders, new_src_builders))); - let builders_sink = Arc::clone(&builders); + let exist_locked: Vec>>> = exist_builders + .into_iter() + .map(|layer| layer.into_iter().map(|b| Arc::new(Mutex::new(b))).collect()) + .collect(); + let new_locked: Vec>> = new_src_builders + .into_iter() + .map(|b| Arc::new(Mutex::new(b))) + .collect(); + let exist_sink: Vec>>> = exist_locked.iter() + .map(|layer| layer.iter().map(Arc::clone).collect()) + .collect(); + let new_sink: Vec>> = new_locked.iter().map(Arc::clone).collect(); let dst_map_t2 = Arc::clone(&dst_map); let new_mphf_t2 = new_mphf.clone(); let pass2_err: Arc>> = Arc::new(Mutex::new(None)); @@ -519,11 +543,10 @@ impl KmerPartition { ], make_sink!(Pass2Data, { move |ops: Vec<(Option, usize, usize, u32)>| { - let mut guard = builders_sink.lock().unwrap(); for (layer_opt, col, slot, val) in ops { match layer_opt { - Some(l) => guard.0[l][col].set_val(slot, val), - None => guard.1[col].set_val(slot, val), + Some(l) => exist_sink[l][col].lock().unwrap().set_val(slot, val), + None => new_sink[col].lock().unwrap().set_val(slot, val), } } } @@ -531,6 +554,7 @@ impl KmerPartition { ); WorkerPool::new(pipeline2, n_workers, capacity).run(); + debug!("partition {i}: pass2 pipeline done in {:.3}s", t_pass2.elapsed().as_secs_f64()); if let Some(msg) = Arc::try_unwrap(pass2_err) .unwrap_or_else(|_| panic!("pass2: pass2_err not uniquely owned")) @@ -540,16 +564,16 @@ impl KmerPartition { return Err(SKError::InvalidData { context: "merge pass2", detail: msg }); } - let (exist_builders, new_src_builders) = Arc::try_unwrap(builders) - .unwrap_or_else(|_| panic!("pass2: builders not uniquely owned after pipeline")) - .into_inner() - .unwrap_or_else(|e| e.into_inner()); - + let t_close = std::time::Instant::now(); // ── Close builders and update metadata ──────────────────────────────── - for (l, builders) in exist_builders.into_iter().enumerate() { + for (l, builders) in exist_locked.into_iter().enumerate() { let layer_dir = dst_index_dir.join(format!("layer_{l}")); for b in builders { - b.close()?; + Arc::try_unwrap(b) + .unwrap_or_else(|_| panic!("pass2: exist_builder[{l}] not uniquely owned")) + .into_inner() + .unwrap_or_else(|e| e.into_inner()) + .close()?; } let n = dst_map.layer(l).n(); let data_dir = match mode { @@ -559,8 +583,12 @@ impl KmerPartition { write_matrix_meta(&data_dir, n, n_dst_genomes + n_src_total).map_err(SKError::Io)?; } - for b in new_src_builders { - b.close()?; + for b in new_locked { + Arc::try_unwrap(b) + .unwrap_or_else(|_| panic!("pass2: new_builder not uniquely owned")) + .into_inner() + .unwrap_or_else(|e| e.into_inner()) + .close()?; } if any_new { let data_dir = match mode { @@ -575,6 +603,8 @@ impl KmerPartition { part_meta.save(&dst_index_dir).map_err(olm_to_sk)?; } + debug!("partition {i}: builders closed in {:.3}s", t_close.elapsed().as_secs_f64()); + Ok(n_new) } }