docs: expand kmer indexing, filtering, and merging documentation

Expands MkDocs navigation and documentation for evidence elimination, the merge command, and kmer filtering. Refactors kmer representation to a generic `KmerOf<L>` type with a bitwise reverse complement algorithm. Unifies MPHF construction, introduces approximate fingerprint-based indexing, and updates the pipeline, chunkreader, and storage layouts. Adds code coverage reports and clarifies architectural invariants for improved maintainability.
This commit is contained in:
Eric Coissac
2026-06-04 21:27:01 +02:00
parent 9306ec1c56
commit bb7adc1154
50 changed files with 34226 additions and 1576 deletions
+155 -21
View File
@@ -662,6 +662,17 @@
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#make_pipe-dsl" class="md-nav__link">
<span class="md-ellipsis">
make_pipe! DSL
</span>
</a>
</li>
</ul>
@@ -801,6 +812,34 @@
<li class="md-nav__item">
<a href="../evidence_elimination/" class="md-nav__link">
<span class="md-ellipsis">
Evidence elimination (discussion)
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../obilayeredmap/" class="md-nav__link">
@@ -879,6 +918,62 @@
<li class="md-nav__item">
<a href="../merge/" class="md-nav__link">
<span class="md-ellipsis">
Merge command
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../rebuild_filter/" class="md-nav__link">
<span class="md-ellipsis">
Kmer filtering (rebuild/dump/unitig)
</span>
</a>
</li>
</ul>
</nav>
@@ -1087,6 +1182,17 @@
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#make_pipe-dsl" class="md-nav__link">
<span class="md-ellipsis">
make_pipe! DSL
</span>
</a>
</li>
</ul>
@@ -1145,7 +1251,7 @@
<h1 id="obipipeline-parallel-pipeline-library">obipipeline — parallel pipeline library</h1>
<p><code>obipipeline</code> is a generic, multi-threaded data pipeline crate. It connects a <strong>source</strong>, a chain of <strong>transforms</strong>, and a <strong>sink</strong> via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.</p>
<p><code>obipipeline</code> is a generic, multi-threaded data pipeline crate. It connects a <strong>source</strong>, a chain of <strong>stages</strong>, and a <strong>sink</strong> via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.</p>
<h2 id="core-types">Core types</h2>
<table>
<thead>
@@ -1158,22 +1264,33 @@
<tbody>
<tr>
<td><code>SourceFn&lt;D&gt;</code></td>
<td><code>Box&lt;dyn FnMut() -&gt; Result&lt;D, PipelineError&gt; + Send+Sync&gt;</code></td>
<td><code>Box&lt;dyn FnMut() -&gt; Result&lt;D, PipelineError&gt; + Send&gt;</code></td>
<td>Called repeatedly; <code>FnMut</code> because it holds iterator state</td>
</tr>
<tr>
<td><code>SharedFn&lt;D&gt;</code></td>
<td><code>Arc&lt;dyn Fn(D) -&gt; Result&lt;D, PipelineError&gt; + Send+Sync&gt;</code></td>
<td>Shared across workers via <code>Arc::clone</code> (no copy of the closure)</td>
<td><code>Arc&lt;dyn Fn(D) -&gt; Result&lt;D, PipelineError&gt; + Send + Sync&gt;</code></td>
<td>1→1 transform shared across workers via <code>Arc::clone</code></td>
</tr>
<tr>
<td><code>SharedFlatFn&lt;D&gt;</code></td>
<td><code>Arc&lt;dyn Fn(D, &amp;Sender&lt;Result&lt;D, _&gt;&gt;, &amp;Sender&lt;isize&gt;) + Send + Sync&gt;</code></td>
<td>1→N transform; pushes items into channel, sends delta</td>
</tr>
<tr>
<td><code>SinkFn&lt;D&gt;</code></td>
<td><code>Box&lt;dyn Fn(D) -&gt; Result&lt;(), PipelineError&gt; + Send+Sync&gt;</code></td>
<td><code>Box&lt;dyn Fn(D) -&gt; Result&lt;(), PipelineError&gt; + Send&gt;</code></td>
<td>Final consumer; returns <code>Result</code> so errors propagate back</td>
</tr>
</tbody>
</table>
<p><code>Pipeline&lt;D&gt;</code> holds one <code>SourceFn</code>, a <code>Vec&lt;SharedFn&gt;</code>, and one <code>SinkFn</code>.<br />
<p>Stages come in two variants:</p>
<div class="highlight"><pre><span></span><code><span class="k">pub</span><span class="w"> </span><span class="k">enum</span><span class="w"> </span><span class="nc">Stage</span><span class="o">&lt;</span><span class="n">D</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">Transform</span><span class="p">(</span><span class="n">SharedFn</span><span class="o">&lt;</span><span class="n">D</span><span class="o">&gt;</span><span class="p">),</span><span class="w"> </span><span class="c1">// 1→1</span>
<span class="w"> </span><span class="n">Flat</span><span class="p">(</span><span class="n">SharedFlatFn</span><span class="o">&lt;</span><span class="n">D</span><span class="o">&gt;</span><span class="p">),</span><span class="w"> </span><span class="c1">// 1→N</span>
<span class="p">}</span>
</code></pre></div>
<p><code>Pipeline&lt;D&gt;</code> holds one <code>SourceFn</code>, a <code>Vec&lt;Stage&gt;</code>, and one <code>SinkFn</code>.<br />
<code>WorkerPool&lt;D&gt;</code> wraps a <code>Pipeline</code> with <code>n_workers</code> and channel <code>capacity</code>.</p>
<h2 id="workerpool">WorkerPool</h2>
<div class="highlight"><pre><span></span><code><span class="n">WorkerPool</span><span class="p">::</span><span class="n">new</span><span class="p">(</span><span class="n">pipeline</span><span class="p">:</span><span class="w"> </span><span class="nc">Pipeline</span><span class="o">&lt;</span><span class="n">D</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span><span class="n">n_workers</span><span class="p">:</span><span class="w"> </span><span class="kt">usize</span><span class="p">,</span><span class="w"> </span><span class="n">capacity</span><span class="p">:</span><span class="w"> </span><span class="kt">usize</span><span class="p">)</span><span class="w"> </span><span class="p">-&gt;</span><span class="w"> </span><span class="nc">Self</span>
@@ -1193,7 +1310,7 @@
</tr>
<tr>
<td><code>capacity</code></td>
<td>Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory and back-pressure: a full channel blocks the sender until a slot frees.</td>
<td>Bound on every crossbeam channel in the pipeline. Controls memory and back-pressure: a full channel blocks the sender until a slot frees.</td>
</tr>
</tbody>
</table>
@@ -1208,7 +1325,7 @@
</code></pre></div>
<p>Each variant carries the concrete type for one stage's output. The macros pattern-match on this enum to route values between stages.</p>
<h2 id="macros">Macros</h2>
<p>Six low-level macros build individual stages; one high-level macro (<code>make_pipeline!</code>) composes them.</p>
<p>Eight low-level macros build individual stages; one high-level macro (<code>make_pipeline!</code>) composes them.</p>
<h3 id="low-level">Low-level</h3>
<div class="highlight"><pre><span></span><code><span class="n">make_source</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">iterator</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// iterator yields T</span>
<span class="n">make_source_fallible</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">iterator</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// iterator yields Result&lt;T, E&gt;</span>
@@ -1216,6 +1333,9 @@
<span class="n">make_transform</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; U</span>
<span class="n">make_transform_fallible</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; Result&lt;U, E&gt;</span>
<span class="n">make_flat_transform</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; impl IntoIterator&lt;Item=U&gt;</span>
<span class="n">make_flat_transform_fallible</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; Result&lt;impl IntoIterator&lt;Item=U&gt;, E&gt;</span>
<span class="n">make_sink</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; ()</span>
<span class="n">make_sink_fallible</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; Result&lt;(), E&gt;</span>
</code></pre></div>
@@ -1224,17 +1344,31 @@
<div class="highlight"><pre><span></span><code>make_pipeline! {
DataEnum,
source iterator =&gt; OutputVariant, // or source? for fallible
| func: In =&gt; Out, // non-fallible transform
|? func: In =&gt; Out, // fallible transform
| func: In =&gt; Out, // 1→1 non-fallible transform
|? func: In =&gt; Out, // 1→1 fallible transform
|| func: In =&gt; Out, // 1→N non-fallible flat transform
||? func: In =&gt; Out, // 1→N fallible flat transform
sink func @ InputVariant, // or sink? for fallible
}
</code></pre></div>
<p><code>?</code> marks fallibility on source, individual transforms, or sink independently.<br />
Implemented as a <strong>TT muncher</strong>: the internal rule <code>@build</code> recurses over transform tokens one at a time, accumulating them into a <code>vec![]</code>, then terminates on <code>sink</code>/<code>sink?</code>.</p>
<h3 id="make_pipe-dsl">make_pipe! DSL</h3>
<p><code>make_pipe!</code> builds a sourceless/sinkless <code>Pipe&lt;D, In, Out&gt;</code> — a reusable, composable stage sequence:</p>
<div class="highlight"><pre><span></span><code>make_pipe! {
DataEnum : InType =&gt; OutType,
| func: InVariant =&gt; OutVariant,
|? func: InVariant =&gt; OutVariant,
|| func: InVariant =&gt; OutVariant,
||? func: InVariant =&gt; OutVariant,
}
</code></pre></div>
<p>Two pipes compose with <code>.then(other)</code>. Apply to an iterator with <code>.apply(iter, n_workers, capacity)</code> to get a <code>PipeIter&lt;Out&gt;</code> — an iterator over the pipeline output, backed by a background <code>WorkerPool</code>. The scatter step in <code>obikmer</code> uses <code>make_pipe!</code> and <code>.apply()</code> rather than the full <code>make_pipeline!</code> / <code>WorkerPool</code> pattern.</p>
<h2 id="scheduler-architecture">Scheduler architecture</h2>
<div class="highlight"><pre><span></span><code>Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
▲ │
[stage_rxs] ────────┘◄──────────────────────────────┘
[flat_delta_rx] ──► Scheduler (in_flight adjustment)
[sink_err_rx] ← errors from sink (highest priority)
@@ -1242,20 +1376,20 @@ Implemented as a <strong>TT muncher</strong>: the internal rule <code>@build</co
</code></pre></div>
<p>The scheduler is a single thread running a biased <code>Select</code> over all input channels. Priority order (highest first):</p>
<div class="highlight"><pre><span></span><code>index 0 sink_err_rx abort on sink error
index 1 stage_rxs[N-1] drain last stage first
...
index N stage_rxs[0]
index N+1 source_rx pull new data last
index 1 flat_delta_rx adjust in_flight before dispatching
index 2..=n+1 stage_rxs[n-1..0] drain last stage first
index n+2 source_rx pull new data last
</code></pre></div>
<p>This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.</p>
<p><strong>Workers</strong> are generic: each receives <code>(data, SharedFn, result_tx)</code> and calls <code>f(data)</code>, sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result.</p>
<p><strong>Termination</strong> uses an <code>in_flight</code> counter:</p>
<p><strong>Workers</strong> are generic: each receives a <code>WorkerTask</code> — either <code>Transform(data, stage_idx)</code> or <code>Flat(data, stage_idx)</code>. For <code>Transform</code>, the worker calls <code>f(data)</code> and sends the result to <code>stage_txs[stage_idx]</code>. For <code>Flat</code>, the worker calls <code>f(data, &amp;push_tx, &amp;delta_tx)</code>: the closure pushes N items into <code>push_tx</code> then sends <code>N-1</code> to <code>delta_tx</code>. The scheduler uses the delta to adjust <code>in_flight</code> without knowing N in advance.</p>
<p><strong>Termination</strong> uses an <code>in_flight: isize</code> counter and a <code>flat_workers_active: usize</code> counter:</p>
<ul>
<li>incremented when an item is dispatched from source to workers</li>
<li>decremented when the item exits the last stage</li>
<li>the loop exits only when <code>source_done &amp;&amp; in_flight == 0</code></li>
<li><code>in_flight</code> incremented when an item is dispatched from source to workers</li>
<li><code>in_flight</code> decremented when the item exits the last stage to the sink</li>
<li><code>flat_workers_active</code> incremented when a <code>Flat</code> task is dispatched, decremented when the delta arrives</li>
<li>the loop exits only when <code>source_done &amp;&amp; in_flight == 0 &amp;&amp; flat_workers_active == 0</code></li>
</ul>
<p>This guarantees all in-flight items complete before <code>join()</code>.</p>
<p>This guarantees all in-flight items complete (including all N outputs of a flat stage) before <code>join()</code>.</p>
<h2 id="error-handling">Error handling</h2>
<p><code>PipelineError</code> has four variants:</p>
<table>
@@ -1279,7 +1413,7 @@ index N+1 source_rx pull new data last
<td>Internal routing error</td>
</tr>
<tr>
<td><code>StepError(Box&lt;dyn Error&gt;)</code></td>
<td><code>StepError(Box&lt;dyn Error + Send + Sync&gt;)</code></td>
<td>Error from user code (wrapped by <code>make_*_fallible!</code>)</td>
</tr>
</tbody>