Push zkspuxlpumpw #27
@@ -2,3 +2,4 @@
|
|||||||
|
|
||||||
- [Project domain](project_domain.md) — obikmer est pour la génomique (génomes individuels), pas la métagénomique
|
- [Project domain](project_domain.md) — obikmer est pour la génomique (génomes individuels), pas la métagénomique
|
||||||
- [No architectural decisions without authorization](feedback_architectural_decisions.md) — toute décision architecturale (mémoire, algo, structure) requiert l'accord explicite de l'utilisateur avant toute action
|
- [No architectural decisions without authorization](feedback_architectural_decisions.md) — toute décision architecturale (mémoire, algo, structure) requiert l'accord explicite de l'utilisateur avant toute action
|
||||||
|
- [Phases intra-partition parallèles](feedback_phases_parallelism.md) — graph build, compute_degrees, unitig traversal, MPHF utilisent Rayon — ne jamais les appeler "séquentielles"
|
||||||
|
|||||||
@@ -0,0 +1,12 @@
|
|||||||
|
---
|
||||||
|
name: feedback-phases-parallelism
|
||||||
|
description: Les phases intra-partition (graph build, compute_degrees, unitig traversal, MPHF) utilisent toutes Rayon — elles ne sont PAS séquentielles
|
||||||
|
metadata:
|
||||||
|
type: feedback
|
||||||
|
---
|
||||||
|
|
||||||
|
Ne jamais qualifier les phases intra-partition de "séquentielles". Chaque phase (graph build, compute_degrees, unitig traversal, MPHF build) utilise Rayon en interne et s'exécute en parallèle sur plusieurs cœurs.
|
||||||
|
|
||||||
|
**Why:** L'utilisateur a corrigé ce point plusieurs fois. Le décrire comme "séquentiel" est une erreur factuelle qui fausse l'analyse de performance.
|
||||||
|
|
||||||
|
**How to apply:** Quand on analyse l'efficacité CPU ou les 25% manquants, chercher la cause dans le déséquilibre de charge entre partitions, la contention Rayon entre workers, ou la latence inter-partitions — pas dans une prétendue sérialisation des phases.
|
||||||
Executable
+347
@@ -0,0 +1,347 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Parse obikmer merge debug log → Markdown performance report."""
|
||||||
|
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
from collections import defaultdict
|
||||||
|
from statistics import mean, median, stdev
|
||||||
|
|
||||||
|
ANSI = re.compile(r'\x1b\[[0-9;]*m')
|
||||||
|
|
||||||
|
def strip(s):
|
||||||
|
return ANSI.sub('', s)
|
||||||
|
|
||||||
|
def parse_ts(s):
|
||||||
|
return datetime.fromisoformat(s.replace('Z', '+00:00'))
|
||||||
|
|
||||||
|
def dur_s(s):
|
||||||
|
s = s.strip()
|
||||||
|
if s.endswith('ms'): return float(s[:-2]) / 1e3
|
||||||
|
if s.endswith('µs'): return float(s[:-2]) / 1e6
|
||||||
|
if s.endswith('us'): return float(s[:-2]) / 1e6
|
||||||
|
if s.endswith('ns'): return float(s[:-2]) / 1e9
|
||||||
|
if s.endswith('s'): return float(s[:-1])
|
||||||
|
return float(s)
|
||||||
|
|
||||||
|
def fmt_s(s):
|
||||||
|
if s < 0.001: return f"{s*1e6:.0f}µs"
|
||||||
|
if s < 1: return f"{s*1e3:.0f}ms"
|
||||||
|
if s < 60: return f"{s:.2f}s"
|
||||||
|
return f"{s/60:.1f}min ({s:.0f}s)"
|
||||||
|
|
||||||
|
def fmt_rate(n, s):
|
||||||
|
if s <= 0: return "—"
|
||||||
|
r = n / s
|
||||||
|
if r >= 1e9: return f"{r/1e9:.2f}G/s"
|
||||||
|
if r >= 1e6: return f"{r/1e6:.2f}M/s"
|
||||||
|
if r >= 1e3: return f"{r/1e3:.2f}K/s"
|
||||||
|
return f"{r:.0f}/s"
|
||||||
|
|
||||||
|
def pct(a, b):
|
||||||
|
return f"{100*a/b:.1f}%" if b else "—"
|
||||||
|
|
||||||
|
def stats_row(label, values, unit="s", fmt=fmt_s):
|
||||||
|
if not values: return f"| {label} | — | — | — | — | — |"
|
||||||
|
mn, mx, med, av = min(values), max(values), median(values), mean(values)
|
||||||
|
sd = stdev(values) if len(values) > 1 else 0
|
||||||
|
return f"| {label} | {fmt(mn)} | {fmt(med)} | {fmt(av)} | {fmt(mx)} | {fmt(sd)} |"
|
||||||
|
|
||||||
|
# ── patterns ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
TS = r'(\d{4}-\d{2}-\d{2}T[\d:.]+Z)'
|
||||||
|
|
||||||
|
pats = {
|
||||||
|
'graph_done': re.compile(TS + r'.*partition (\d+): de Bruijn graph done — (\d+) new kmers'),
|
||||||
|
'trav_start': re.compile(TS + r'.*partition (\d+): unitig traversal start — (\d+) nodes'),
|
||||||
|
'trav_closing': re.compile(TS + r'.*partition (\d+): unitig writer closing'),
|
||||||
|
'trav_closed': re.compile(TS + r'.*partition (\d+): unitig writer closed'),
|
||||||
|
'graph_dropped': re.compile(TS + r'.*partition (\d+): graph dropped — starting MPHF build \((\d+) unitigs\)'),
|
||||||
|
'mphf_done': re.compile(TS + r'.*partition (\d+): MPHF build done'),
|
||||||
|
'mphf_open': re.compile(TS + r'.*partition (\d+): MPHF open in ([\d.]+)s'),
|
||||||
|
'bld_ready': re.compile(TS + r'.*partition (\d+): builders ready in ([\d.]+)s'),
|
||||||
|
'pass2_done': re.compile(TS + r'.*partition (\d+): pass2 pipeline done in ([\d.]+)s'),
|
||||||
|
'bld_closed': re.compile(TS + r'.*partition (\d+): builders closed in ([\d.]+)s'),
|
||||||
|
'part_done': re.compile(TS + r'.*partition (\d+): done in ([\d.]+)s — (\d+) new kmers'),
|
||||||
|
'worker': re.compile(TS + r'.*activated worker (\d+).*efficiency (\d+)%.*gain vs prev (\d+)%'),
|
||||||
|
'worker_poll': re.compile(TS + r'.*activated worker (\d+) \(poll\).*efficiency (\d+)%'),
|
||||||
|
'compute_deg': re.compile(TS + r'.*partition (\d+): compute_degrees in ([\d.]+)s — (\d+) nodes'),
|
||||||
|
'stage_done': re.compile(TS + r'.*done stage=merge_partitions wall_secs=([\d.]+)'),
|
||||||
|
'workers_rep': re.compile(r'workers spawned: (\d+) / (\d+)'),
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── parse ─────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
P = defaultdict(dict) # partition_id → timing dict
|
||||||
|
workers_ev = []
|
||||||
|
wall_total = None
|
||||||
|
workers_final = (None, None)
|
||||||
|
|
||||||
|
with open(sys.argv[1]) as f:
|
||||||
|
for raw in f:
|
||||||
|
line = strip(raw)
|
||||||
|
|
||||||
|
m = pats['graph_done'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['n_kmers'] = int(m.group(3))
|
||||||
|
P[pid]['graph_done_ts'] = parse_ts(m.group(1))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['trav_start'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['trav_start_ts'] = parse_ts(m.group(1))
|
||||||
|
P[pid]['n_nodes'] = int(m.group(3))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['trav_closing'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['trav_closing_ts'] = parse_ts(m.group(1))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['trav_closed'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['trav_closed_ts'] = parse_ts(m.group(1))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['graph_dropped'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['drop_ts'] = parse_ts(m.group(1))
|
||||||
|
P[pid]['n_unitigs'] = int(m.group(3))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['mphf_done'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['mphf_done_ts'] = parse_ts(m.group(1))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['mphf_open'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['mphf_open_s'] = float(m.group(3))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['bld_ready'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['bld_ready_s'] = float(m.group(3))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['pass2_done'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['pass2_s'] = float(m.group(3))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['bld_closed'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['bld_closed_s'] = float(m.group(3))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['part_done'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['total_s'] = float(m.group(3))
|
||||||
|
P[pid]['done_ts'] = parse_ts(m.group(1))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['worker'].search(line)
|
||||||
|
if m:
|
||||||
|
workers_ev.append({'n': int(m.group(2)), 'eff': int(m.group(3)),
|
||||||
|
'gain': int(m.group(4)), 'ts': parse_ts(m.group(1)), 'poll': False})
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['worker_poll'].search(line)
|
||||||
|
if m:
|
||||||
|
workers_ev.append({'n': int(m.group(2)), 'eff': int(m.group(3)),
|
||||||
|
'gain': None, 'ts': parse_ts(m.group(1)), 'poll': True})
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['compute_deg'].search(line)
|
||||||
|
if m:
|
||||||
|
pid = int(m.group(2))
|
||||||
|
P[pid]['cdeg_s'] = float(m.group(3))
|
||||||
|
P[pid]['n_nodes'] = P[pid].get('n_nodes') or int(m.group(4))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['stage_done'].search(line)
|
||||||
|
if m:
|
||||||
|
wall_total = float(m.group(2))
|
||||||
|
continue
|
||||||
|
|
||||||
|
m = pats['workers_rep'].search(line)
|
||||||
|
if m:
|
||||||
|
workers_final = (int(m.group(1)), int(m.group(2)))
|
||||||
|
continue
|
||||||
|
|
||||||
|
# ── derive per-partition phases ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
def tsdiff(p, k1, k2):
|
||||||
|
if k1 in p and k2 in p:
|
||||||
|
return (p[k2] - p[k1]).total_seconds()
|
||||||
|
return None
|
||||||
|
|
||||||
|
phases = {}
|
||||||
|
for pid, p in P.items():
|
||||||
|
row = {'pid': pid}
|
||||||
|
row['n_kmers'] = p.get('n_kmers', 0)
|
||||||
|
row['n_nodes'] = p.get('n_nodes', 0)
|
||||||
|
row['n_unitigs']= p.get('n_unitigs', 0)
|
||||||
|
row['total_s'] = p.get('total_s')
|
||||||
|
row['cdeg_s'] = p.get('cdeg_s')
|
||||||
|
row['mphf_open_s'] = p.get('mphf_open_s')
|
||||||
|
row['bld_ready_s'] = p.get('bld_ready_s')
|
||||||
|
row['pass2_s'] = p.get('pass2_s')
|
||||||
|
row['bld_closed_s'] = p.get('bld_closed_s')
|
||||||
|
|
||||||
|
# Traversal: trav_start → trav_closing (= writing all unitigs)
|
||||||
|
row['trav_s'] = tsdiff(p, 'trav_start_ts', 'trav_closing_ts')
|
||||||
|
# Writer close: trav_closing → trav_closed
|
||||||
|
row['close_s'] = tsdiff(p, 'trav_closing_ts', 'trav_closed_ts')
|
||||||
|
# Graph drop: trav_closed → drop_ts
|
||||||
|
row['drop_s'] = tsdiff(p, 'trav_closed_ts', 'drop_ts')
|
||||||
|
# MPHF build: drop_ts → mphf_done_ts
|
||||||
|
row['mphf_s'] = tsdiff(p, 'drop_ts', 'mphf_done_ts')
|
||||||
|
# After MPHF: mphf_done → done_ts
|
||||||
|
row['post_s'] = tsdiff(p, 'mphf_done_ts', 'done_ts')
|
||||||
|
|
||||||
|
# Graph build: total - known phases (rough estimate)
|
||||||
|
known = sum(v for v in [row['cdeg_s'], row['trav_s'], row['close_s'], row['drop_s'],
|
||||||
|
row['mphf_s'], row['mphf_open_s'], row['bld_ready_s'],
|
||||||
|
row['pass2_s'], row['bld_closed_s']] if v is not None)
|
||||||
|
row['graph_build_s'] = (row['total_s'] - known) if row['total_s'] else None
|
||||||
|
|
||||||
|
phases[pid] = row
|
||||||
|
|
||||||
|
# helpers
|
||||||
|
def collect(key):
|
||||||
|
return [r[key] for r in phases.values() if r.get(key) is not None]
|
||||||
|
|
||||||
|
def rate_stats(n_key, t_key):
|
||||||
|
"""Returns list of throughput values (items/s)."""
|
||||||
|
result = []
|
||||||
|
for r in phases.values():
|
||||||
|
n, t = r.get(n_key), r.get(t_key)
|
||||||
|
if n and t and t > 0:
|
||||||
|
result.append(n / t)
|
||||||
|
return result
|
||||||
|
|
||||||
|
# ── output ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
out = []
|
||||||
|
w = out.append
|
||||||
|
|
||||||
|
w("# obikmer merge — performance report\n")
|
||||||
|
|
||||||
|
# Run info
|
||||||
|
n_parts = len([r for r in phases.values() if r['n_kmers'] > 0])
|
||||||
|
n_empty = len([r for r in phases.values() if r['n_kmers'] == 0])
|
||||||
|
total_kmers = sum(r['n_kmers'] for r in phases.values())
|
||||||
|
w("## Run summary\n")
|
||||||
|
w(f"- **Partitions**: {len(phases)} total — {n_parts} non-empty, {n_empty} empty")
|
||||||
|
w(f"- **New kmers (total)**: {total_kmers:,}")
|
||||||
|
if wall_total:
|
||||||
|
w(f"- **merge_partitions wall time**: {fmt_s(wall_total)}")
|
||||||
|
if workers_final[0]:
|
||||||
|
w(f"- **Workers spawned**: {workers_final[0]} / {workers_final[1]} (max)")
|
||||||
|
w("")
|
||||||
|
|
||||||
|
# Worker spawn timeline
|
||||||
|
if workers_ev:
|
||||||
|
w("## Worker activation\n")
|
||||||
|
w("| Time | Worker # | Trigger | Efficiency | Gain vs prev |")
|
||||||
|
w("|------|----------|---------|------------|--------------|")
|
||||||
|
t0 = workers_ev[0]['ts']
|
||||||
|
for e in workers_ev:
|
||||||
|
elapsed = fmt_s((e['ts'] - t0).total_seconds())
|
||||||
|
trigger = "poll (timeout)" if e['poll'] else "partition done"
|
||||||
|
gain = f"{e['gain']}%" if e.get('gain') is not None else "—"
|
||||||
|
w(f"| +{elapsed} | {e['n']} | {trigger} | {e['eff']}% | {gain} |")
|
||||||
|
w("")
|
||||||
|
|
||||||
|
# Phase breakdown table
|
||||||
|
w("## Phase timing statistics\n")
|
||||||
|
w("Columns: min | median | mean | max | stdev\n")
|
||||||
|
w("| Phase | min | median | mean | max | stdev |")
|
||||||
|
w("|-------|-----|--------|------|-----|-------|")
|
||||||
|
w(stats_row("Graph build (estimated)", collect('graph_build_s')))
|
||||||
|
w(stats_row("compute_degrees", collect('cdeg_s')))
|
||||||
|
w(stats_row("Unitig traversal", collect('trav_s')))
|
||||||
|
w(stats_row("Writer close (uw.close)", collect('close_s')))
|
||||||
|
w(stats_row("Graph drop", collect('drop_s')))
|
||||||
|
w(stats_row("MPHF build", collect('mphf_s')))
|
||||||
|
w(stats_row("MPHF open", collect('mphf_open_s')))
|
||||||
|
w(stats_row("Builders ready", collect('bld_ready_s')))
|
||||||
|
w(stats_row("Pass2 pipeline", collect('pass2_s')))
|
||||||
|
w(stats_row("Builders close", collect('bld_closed_s')))
|
||||||
|
w(stats_row("Post-MPHF (residual)", collect('post_s')))
|
||||||
|
w(stats_row("**Total per partition**", collect('total_s')))
|
||||||
|
w("")
|
||||||
|
|
||||||
|
# Throughput
|
||||||
|
w("## Throughput by phase\n")
|
||||||
|
w("| Phase | metric | min | median | mean | max |")
|
||||||
|
w("|-------|--------|-----|--------|------|-----|")
|
||||||
|
|
||||||
|
def rate_row(label, rates):
|
||||||
|
if not rates: return f"| {label} | — | — | — | — | — |"
|
||||||
|
f = lambda x: fmt_rate(x, 1)
|
||||||
|
mn, med, av, mx = min(rates), median(rates), mean(rates), max(rates)
|
||||||
|
return f"| {label} | nodes/s | {f(mn)} | {f(med)} | {f(av)} | {f(mx)} |"
|
||||||
|
|
||||||
|
w(rate_row("compute_degrees", rate_stats('n_nodes', 'cdeg_s')))
|
||||||
|
w(rate_row("Unitig traversal", rate_stats('n_nodes', 'trav_s')))
|
||||||
|
w(rate_row("MPHF build", rate_stats('n_unitigs', 'mphf_s')))
|
||||||
|
w("")
|
||||||
|
|
||||||
|
|
||||||
|
# Top 10 slowest partitions
|
||||||
|
w("## Top 10 slowest partitions\n")
|
||||||
|
w("| Partition | nodes | unitigs | total | trav | MPHF | graph build |")
|
||||||
|
w("|-----------|-------|---------|-------|------|------|-------------|")
|
||||||
|
sorted_parts = sorted(phases.values(), key=lambda r: r['total_s'] or 0, reverse=True)
|
||||||
|
for r in sorted_parts[:10]:
|
||||||
|
pid = r['pid']
|
||||||
|
def f(k): return fmt_s(r[k]) if r.get(k) is not None else "—"
|
||||||
|
nodes = f"{r['n_nodes']/1e6:.1f}M" if r['n_nodes'] else "—"
|
||||||
|
unitigs = f"{r['n_unitigs']/1e6:.1f}M" if r['n_unitigs'] else "—"
|
||||||
|
w(f"| {pid} | {nodes} | {unitigs} | {f('total_s')} | {f('trav_s')} | {f('mphf_s')} | {f('graph_build_s')} |")
|
||||||
|
w("")
|
||||||
|
|
||||||
|
# Phase share of total time (for non-empty partitions with full data)
|
||||||
|
complete = [r for r in phases.values()
|
||||||
|
if all(r.get(k) is not None
|
||||||
|
for k in ('total_s','trav_s','close_s','drop_s','mphf_s',
|
||||||
|
'mphf_open_s','bld_ready_s','pass2_s','bld_closed_s'))
|
||||||
|
and r['total_s'] and r['total_s'] > 0]
|
||||||
|
if complete:
|
||||||
|
w("## Phase share of total time (mean across complete partitions)\n")
|
||||||
|
total_mean = mean(r['total_s'] for r in complete)
|
||||||
|
w(f"_Based on {len(complete)} partitions with full timing data. Mean total: {fmt_s(total_mean)}_\n")
|
||||||
|
w("| Phase | mean time | share |")
|
||||||
|
w("|-------|-----------|-------|")
|
||||||
|
for label, key in [
|
||||||
|
("Graph build", 'graph_build_s'),
|
||||||
|
("compute_degrees", 'cdeg_s'),
|
||||||
|
("Unitig traversal", 'trav_s'),
|
||||||
|
("Writer close", 'close_s'),
|
||||||
|
("Graph drop", 'drop_s'),
|
||||||
|
("MPHF build", 'mphf_s'),
|
||||||
|
("MPHF open", 'mphf_open_s'),
|
||||||
|
("Builders ready", 'bld_ready_s'),
|
||||||
|
("Pass2 pipeline", 'pass2_s'),
|
||||||
|
("Builders close", 'bld_closed_s'),
|
||||||
|
("Post-MPHF (residual)", 'post_s'),
|
||||||
|
]:
|
||||||
|
vals = [r[key] for r in complete]
|
||||||
|
m = mean(vals)
|
||||||
|
w(f"| {label} | {fmt_s(m)} | {pct(m, total_mean)} |")
|
||||||
|
w("")
|
||||||
|
|
||||||
|
print('\n'.join(out))
|
||||||
@@ -7,8 +7,6 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::atomic::{AtomicU8, Ordering};
|
use std::sync::atomic::{AtomicU8, Ordering};
|
||||||
use std::time::Instant;
|
|
||||||
use tracing::{debug, info};
|
|
||||||
use xxhash_rust::xxh3::Xxh3Builder;
|
use xxhash_rust::xxh3::Xxh3Builder;
|
||||||
|
|
||||||
// ── Types ─────────────────────────────────────────────────────────────────────
|
// ── Types ─────────────────────────────────────────────────────────────────────
|
||||||
@@ -285,7 +283,6 @@ impl GraphDeBruijn {
|
|||||||
pub fn compute_degrees_and_mark_starts(&self) {
|
pub fn compute_degrees_and_mark_starts(&self) {
|
||||||
// Pass 1: count right/left neighbors for each node
|
// Pass 1: count right/left neighbors for each node
|
||||||
|
|
||||||
let t1 = Instant::now();
|
|
||||||
self.for_each_node(|kmer, atomic| {
|
self.for_each_node(|kmer, atomic| {
|
||||||
let mut old = Node(atomic.load(Ordering::Relaxed));
|
let mut old = Node(atomic.load(Ordering::Relaxed));
|
||||||
if old.is_visited() {
|
if old.is_visited() {
|
||||||
@@ -295,20 +292,13 @@ impl GraphDeBruijn {
|
|||||||
}
|
}
|
||||||
let (rc, rn) = count_neighbors(&kmer.right_canonical_neighbors(), &self.nodes);
|
let (rc, rn) = count_neighbors(&kmer.right_canonical_neighbors(), &self.nodes);
|
||||||
let (lc, ln) = count_neighbors(&kmer.left_canonical_neighbors(), &self.nodes);
|
let (lc, ln) = count_neighbors(&kmer.left_canonical_neighbors(), &self.nodes);
|
||||||
let mut node = Node(0); // reset all bits (visited=0, start=0)
|
let mut node = Node(0);
|
||||||
node.set_right(rc, rn);
|
node.set_right(rc, rn);
|
||||||
node.set_left(lc, ln);
|
node.set_left(lc, ln);
|
||||||
atomic.store(node.0, Ordering::Relaxed);
|
atomic.store(node.0, Ordering::Relaxed);
|
||||||
});
|
});
|
||||||
debug!(
|
|
||||||
"[compute_degrees] pass 1 (degrees): {:?} — {} nodes",
|
|
||||||
t1.elapsed(),
|
|
||||||
self.nodes.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
// Pass 2: mark start nodes
|
// Pass 2: mark start nodes
|
||||||
|
|
||||||
let t2 = Instant::now();
|
|
||||||
self.for_each_node(|kmer, atomic| {
|
self.for_each_node(|kmer, atomic| {
|
||||||
let mut node = Node(atomic.load(Ordering::Relaxed));
|
let mut node = Node(atomic.load(Ordering::Relaxed));
|
||||||
if node.is_visited() {
|
if node.is_visited() {
|
||||||
@@ -319,11 +309,6 @@ impl GraphDeBruijn {
|
|||||||
atomic.store(node.0, Ordering::Relaxed);
|
atomic.store(node.0, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
debug!(
|
|
||||||
"[compute_degrees] pass 2 (starts): {:?} — {} nodes",
|
|
||||||
t2.elapsed(),
|
|
||||||
self.nodes.len()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> {
|
pub fn is_visited(&self, kmer: &CanonicalKmer) -> Option<bool> {
|
||||||
@@ -391,7 +376,6 @@ impl GraphDeBruijn {
|
|||||||
let n2 = std::sync::atomic::AtomicUsize::new(0);
|
let n2 = std::sync::atomic::AtomicUsize::new(0);
|
||||||
|
|
||||||
// Boucle unique : traiter les starts, recalculer les arités, recommencer
|
// Boucle unique : traiter les starts, recalculer les arités, recommencer
|
||||||
let mut pass = 0usize;
|
|
||||||
loop {
|
loop {
|
||||||
let n_new = std::sync::atomic::AtomicUsize::new(0);
|
let n_new = std::sync::atomic::AtomicUsize::new(0);
|
||||||
|
|
||||||
@@ -421,9 +405,7 @@ impl GraphDeBruijn {
|
|||||||
});
|
});
|
||||||
|
|
||||||
let n = n_new.load(Ordering::Relaxed);
|
let n = n_new.load(Ordering::Relaxed);
|
||||||
debug!("[for_each_unitig] pass {}: {} starts", pass, n);
|
|
||||||
n_chains.fetch_add(n, Ordering::Relaxed);
|
n_chains.fetch_add(n, Ordering::Relaxed);
|
||||||
pass += 1;
|
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -452,12 +434,6 @@ impl GraphDeBruijn {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(
|
|
||||||
chains = n_chains.load(Ordering::Relaxed),
|
|
||||||
phase2 = n2.load(Ordering::Relaxed),
|
|
||||||
total = n_chains.load(Ordering::Relaxed) + n2.load(Ordering::Relaxed),
|
|
||||||
"unitig traversal complete"
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Merge `other` into `self`.
|
/// Merge `other` into `self`.
|
||||||
|
|||||||
@@ -289,14 +289,39 @@ impl KmerIndex {
|
|||||||
activate_tx.send(()).ok();
|
activate_tx.send(()).ok();
|
||||||
n_workers = 1;
|
n_workers = 1;
|
||||||
|
|
||||||
|
const SPAWN_POLL: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
let mut completed = 0usize;
|
let mut completed = 0usize;
|
||||||
while completed < n_partitions {
|
while completed < n_partitions {
|
||||||
let (i, r, dur) = result_rx.recv().map_err(|_| {
|
let result = result_rx.recv_timeout(SPAWN_POLL);
|
||||||
OKIError::Io(io::Error::new(
|
|
||||||
|
// On timeout: no partition finished yet, just check efficiency.
|
||||||
|
let (i, r, dur) = match result {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
|
||||||
|
if n_workers < max_workers {
|
||||||
|
let eff = cpu_sample.cpu_efficiency(n_cores);
|
||||||
|
if eff < SPAWN_THRESHOLD {
|
||||||
|
debug!(
|
||||||
|
"activated worker {} (poll) — efficiency {:.0}%",
|
||||||
|
n_workers + 1,
|
||||||
|
eff * 100.0,
|
||||||
|
);
|
||||||
|
efficiency_at_last_spawn = eff;
|
||||||
|
activate_tx.send(()).ok();
|
||||||
|
n_workers += 1;
|
||||||
|
cpu_sample = CpuSample::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
|
||||||
|
return Err(OKIError::Io(io::Error::new(
|
||||||
io::ErrorKind::UnexpectedEof,
|
io::ErrorKind::UnexpectedEof,
|
||||||
"worker channel closed",
|
"worker channel closed",
|
||||||
))
|
)));
|
||||||
})?;
|
}
|
||||||
|
};
|
||||||
let g_len = r.map_err(OKIError::Partition)?;
|
let g_len = r.map_err(OKIError::Partition)?;
|
||||||
pb.inc(1);
|
pb.inc(1);
|
||||||
debug!(
|
debug!(
|
||||||
|
|||||||
@@ -304,27 +304,37 @@ impl KmerPartition {
|
|||||||
let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}"));
|
let new_layer_dir = dst_index_dir.join(format!("layer_{new_layer_idx}"));
|
||||||
|
|
||||||
let n_new = if any_new {
|
let n_new = if any_new {
|
||||||
|
let t_deg = std::time::Instant::now();
|
||||||
g.compute_degrees_and_mark_starts();
|
g.compute_degrees_and_mark_starts();
|
||||||
|
debug!("partition {i}: compute_degrees in {:.3}s — {} nodes",
|
||||||
|
t_deg.elapsed().as_secs_f64(), g.len());
|
||||||
fs::create_dir_all(&new_layer_dir)?;
|
fs::create_dir_all(&new_layer_dir)?;
|
||||||
let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?;
|
let mut uw = Layer::<()>::unitig_writer(&new_layer_dir).map_err(olm_to_sk)?;
|
||||||
|
debug!("partition {i}: unitig traversal start — {} nodes", g.len());
|
||||||
g.try_for_each_unitig(|unitig| {
|
g.try_for_each_unitig(|unitig| {
|
||||||
uw.write(unitig)
|
uw.write(unitig)
|
||||||
})?;
|
})?;
|
||||||
|
debug!("partition {i}: unitig writer closing");
|
||||||
uw.close()?;
|
uw.close()?;
|
||||||
|
debug!("partition {i}: unitig writer closed — dropping graph ({} nodes)", g.len());
|
||||||
let n = g.len();
|
let n = g.len();
|
||||||
drop(g); // release GraphDeBruijn before MPHF build
|
drop(g);
|
||||||
|
debug!("partition {i}: graph dropped — starting MPHF build ({n} unitigs)");
|
||||||
Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
|
Layer::<()>::build(&new_layer_dir, block_bits, evidence).map_err(olm_to_sk)?;
|
||||||
|
debug!("partition {i}: MPHF build done");
|
||||||
n
|
n
|
||||||
} else {
|
} else {
|
||||||
drop(g);
|
drop(g);
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let t_open = std::time::Instant::now();
|
||||||
let new_mphf: Option<Arc<MphfOnly>> = if any_new {
|
let new_mphf: Option<Arc<MphfOnly>> = if any_new {
|
||||||
Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?))
|
Some(Arc::new(MphfOnly::open(&new_layer_dir).map_err(olm_to_sk)?))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
debug!("partition {i}: MPHF open in {:.3}s", t_open.elapsed().as_secs_f64());
|
||||||
|
|
||||||
// ── Prepare matrix directories for the new layer ──────────────────────
|
// ── Prepare matrix directories for the new layer ──────────────────────
|
||||||
// Absent columns (dst genomes) are written via append_column (all-zero/false).
|
// Absent columns (dst genomes) are written via append_column (all-zero/false).
|
||||||
@@ -379,6 +389,7 @@ impl KmerPartition {
|
|||||||
vec![]
|
vec![]
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let t_builders = std::time::Instant::now();
|
||||||
// Builders for existing layers: n_src_total per layer.
|
// Builders for existing layers: n_src_total per layer.
|
||||||
// Columns n_dst_genomes .. n_dst_genomes + n_src_total - 1.
|
// Columns n_dst_genomes .. n_dst_genomes + n_src_total - 1.
|
||||||
let exist_builders: Vec<Vec<ColBuilder>> = (0..n_dst_layers)
|
let exist_builders: Vec<Vec<ColBuilder>> = (0..n_dst_layers)
|
||||||
@@ -410,7 +421,10 @@ impl KmerPartition {
|
|||||||
})
|
})
|
||||||
.collect::<SKResult<_>>()?;
|
.collect::<SKResult<_>>()?;
|
||||||
|
|
||||||
|
debug!("partition {i}: builders ready in {:.3}s", t_builders.elapsed().as_secs_f64());
|
||||||
|
|
||||||
// ── Pass 2: fill builders (pipeline) ─────────────────────────────────
|
// ── Pass 2: fill builders (pipeline) ─────────────────────────────────
|
||||||
|
let t_pass2 = std::time::Instant::now();
|
||||||
// Collect source items before the pipeline so load_meta errors propagate
|
// Collect source items before the pipeline so load_meta errors propagate
|
||||||
// via ? before any worker thread is spawned.
|
// via ? before any worker thread is spawned.
|
||||||
let mut pass2_items: Vec<(usize, usize, PathBuf)> = Vec::new();
|
let mut pass2_items: Vec<(usize, usize, PathBuf)> = Vec::new();
|
||||||
@@ -439,8 +453,18 @@ impl KmerPartition {
|
|||||||
WriteBatch(Vec<(Option<usize>, usize, usize, u32)>),
|
WriteBatch(Vec<(Option<usize>, usize, usize, u32)>),
|
||||||
}
|
}
|
||||||
|
|
||||||
let builders = Arc::new(Mutex::new((exist_builders, new_src_builders)));
|
let exist_locked: Vec<Vec<Arc<Mutex<ColBuilder>>>> = exist_builders
|
||||||
let builders_sink = Arc::clone(&builders);
|
.into_iter()
|
||||||
|
.map(|layer| layer.into_iter().map(|b| Arc::new(Mutex::new(b))).collect())
|
||||||
|
.collect();
|
||||||
|
let new_locked: Vec<Arc<Mutex<ColBuilder>>> = new_src_builders
|
||||||
|
.into_iter()
|
||||||
|
.map(|b| Arc::new(Mutex::new(b)))
|
||||||
|
.collect();
|
||||||
|
let exist_sink: Vec<Vec<Arc<Mutex<ColBuilder>>>> = exist_locked.iter()
|
||||||
|
.map(|layer| layer.iter().map(Arc::clone).collect())
|
||||||
|
.collect();
|
||||||
|
let new_sink: Vec<Arc<Mutex<ColBuilder>>> = new_locked.iter().map(Arc::clone).collect();
|
||||||
let dst_map_t2 = Arc::clone(&dst_map);
|
let dst_map_t2 = Arc::clone(&dst_map);
|
||||||
let new_mphf_t2 = new_mphf.clone();
|
let new_mphf_t2 = new_mphf.clone();
|
||||||
let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
|
let pass2_err: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
|
||||||
@@ -519,11 +543,10 @@ impl KmerPartition {
|
|||||||
],
|
],
|
||||||
make_sink!(Pass2Data, {
|
make_sink!(Pass2Data, {
|
||||||
move |ops: Vec<(Option<usize>, usize, usize, u32)>| {
|
move |ops: Vec<(Option<usize>, usize, usize, u32)>| {
|
||||||
let mut guard = builders_sink.lock().unwrap();
|
|
||||||
for (layer_opt, col, slot, val) in ops {
|
for (layer_opt, col, slot, val) in ops {
|
||||||
match layer_opt {
|
match layer_opt {
|
||||||
Some(l) => guard.0[l][col].set_val(slot, val),
|
Some(l) => exist_sink[l][col].lock().unwrap().set_val(slot, val),
|
||||||
None => guard.1[col].set_val(slot, val),
|
None => new_sink[col].lock().unwrap().set_val(slot, val),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -531,6 +554,7 @@ impl KmerPartition {
|
|||||||
);
|
);
|
||||||
|
|
||||||
WorkerPool::new(pipeline2, n_workers, capacity).run();
|
WorkerPool::new(pipeline2, n_workers, capacity).run();
|
||||||
|
debug!("partition {i}: pass2 pipeline done in {:.3}s", t_pass2.elapsed().as_secs_f64());
|
||||||
|
|
||||||
if let Some(msg) = Arc::try_unwrap(pass2_err)
|
if let Some(msg) = Arc::try_unwrap(pass2_err)
|
||||||
.unwrap_or_else(|_| panic!("pass2: pass2_err not uniquely owned"))
|
.unwrap_or_else(|_| panic!("pass2: pass2_err not uniquely owned"))
|
||||||
@@ -540,16 +564,16 @@ impl KmerPartition {
|
|||||||
return Err(SKError::InvalidData { context: "merge pass2", detail: msg });
|
return Err(SKError::InvalidData { context: "merge pass2", detail: msg });
|
||||||
}
|
}
|
||||||
|
|
||||||
let (exist_builders, new_src_builders) = Arc::try_unwrap(builders)
|
let t_close = std::time::Instant::now();
|
||||||
.unwrap_or_else(|_| panic!("pass2: builders not uniquely owned after pipeline"))
|
|
||||||
.into_inner()
|
|
||||||
.unwrap_or_else(|e| e.into_inner());
|
|
||||||
|
|
||||||
// ── Close builders and update metadata ────────────────────────────────
|
// ── Close builders and update metadata ────────────────────────────────
|
||||||
for (l, builders) in exist_builders.into_iter().enumerate() {
|
for (l, builders) in exist_locked.into_iter().enumerate() {
|
||||||
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
|
let layer_dir = dst_index_dir.join(format!("layer_{l}"));
|
||||||
for b in builders {
|
for b in builders {
|
||||||
b.close()?;
|
Arc::try_unwrap(b)
|
||||||
|
.unwrap_or_else(|_| panic!("pass2: exist_builder[{l}] not uniquely owned"))
|
||||||
|
.into_inner()
|
||||||
|
.unwrap_or_else(|e| e.into_inner())
|
||||||
|
.close()?;
|
||||||
}
|
}
|
||||||
let n = dst_map.layer(l).n();
|
let n = dst_map.layer(l).n();
|
||||||
let data_dir = match mode {
|
let data_dir = match mode {
|
||||||
@@ -559,8 +583,12 @@ impl KmerPartition {
|
|||||||
write_matrix_meta(&data_dir, n, n_dst_genomes + n_src_total).map_err(SKError::Io)?;
|
write_matrix_meta(&data_dir, n, n_dst_genomes + n_src_total).map_err(SKError::Io)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
for b in new_src_builders {
|
for b in new_locked {
|
||||||
b.close()?;
|
Arc::try_unwrap(b)
|
||||||
|
.unwrap_or_else(|_| panic!("pass2: new_builder not uniquely owned"))
|
||||||
|
.into_inner()
|
||||||
|
.unwrap_or_else(|e| e.into_inner())
|
||||||
|
.close()?;
|
||||||
}
|
}
|
||||||
if any_new {
|
if any_new {
|
||||||
let data_dir = match mode {
|
let data_dir = match mode {
|
||||||
@@ -575,6 +603,8 @@ impl KmerPartition {
|
|||||||
part_meta.save(&dst_index_dir).map_err(olm_to_sk)?;
|
part_meta.save(&dst_index_dir).map_err(olm_to_sk)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug!("partition {i}: builders closed in {:.3}s", t_close.elapsed().as_secs_f64());
|
||||||
|
|
||||||
Ok(n_new)
|
Ok(n_new)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user