Files

1501 lines
41 KiB
HTML
Raw Permalink Normal View History

<!doctype html>
<html lang="en" class="no-js">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1">
<link rel="prev" href="../pipeline/">
<link rel="next" href="../storage/">
<link rel="icon" href="../../assets/images/favicon.png">
<meta name="generator" content="mkdocs-1.6.1, mkdocs-material-9.7.6">
<title>obipipeline library - obikmer</title>
<link rel="stylesheet" href="../../assets/stylesheets/main.484c7ddc.min.css">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Roboto:300,300i,400,400i,700,700i%7CRoboto+Mono:400,400i,700,700i&display=fallback">
<style>:root{--md-text-font:"Roboto";--md-code-font:"Roboto Mono"}</style>
<script>__md_scope=new URL("../..",location),__md_hash=e=>[...e].reduce(((e,_)=>(e<<5)-e+_.charCodeAt(0)),0),__md_get=(e,_=localStorage,t=__md_scope)=>JSON.parse(_.getItem(t.pathname+"."+e)),__md_set=(e,_,t=localStorage,a=__md_scope)=>{try{t.setItem(a.pathname+"."+e,JSON.stringify(_))}catch(e){}}</script>
</head>
<body dir="ltr">
<input class="md-toggle" data-md-toggle="drawer" type="checkbox" id="__drawer" autocomplete="off">
<input class="md-toggle" data-md-toggle="search" type="checkbox" id="__search" autocomplete="off">
<label class="md-overlay" for="__drawer"></label>
<div data-md-component="skip">
<a href="#obipipeline-parallel-pipeline-library" class="md-skip">
Skip to content
</a>
</div>
<div data-md-component="announce">
</div>
<header class="md-header md-header--shadow" data-md-component="header">
<nav class="md-header__inner md-grid" aria-label="Header">
<a href="../.." title="obikmer" class="md-header__button md-logo" aria-label="obikmer" data-md-component="logo">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M12 8a3 3 0 0 0 3-3 3 3 0 0 0-3-3 3 3 0 0 0-3 3 3 3 0 0 0 3 3m0 3.54C9.64 9.35 6.5 8 3 8v11c3.5 0 6.64 1.35 9 3.54 2.36-2.19 5.5-3.54 9-3.54V8c-3.5 0-6.64 1.35-9 3.54"/></svg>
</a>
<label class="md-header__button md-icon" for="__drawer">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M3 6h18v2H3zm0 5h18v2H3zm0 5h18v2H3z"/></svg>
</label>
<div class="md-header__title" data-md-component="header-title">
<div class="md-header__ellipsis">
<div class="md-header__topic">
<span class="md-ellipsis">
obikmer
</span>
</div>
<div class="md-header__topic" data-md-component="header-topic">
<span class="md-ellipsis">
obipipeline library
</span>
</div>
</div>
</div>
<script>var palette=__md_get("__palette");if(palette&&palette.color){if("(prefers-color-scheme)"===palette.color.media){var media=matchMedia("(prefers-color-scheme: light)"),input=document.querySelector(media.matches?"[data-md-color-media='(prefers-color-scheme: light)']":"[data-md-color-media='(prefers-color-scheme: dark)']");palette.color.media=input.getAttribute("data-md-color-media"),palette.color.scheme=input.getAttribute("data-md-color-scheme"),palette.color.primary=input.getAttribute("data-md-color-primary"),palette.color.accent=input.getAttribute("data-md-color-accent")}for(var[key,value]of Object.entries(palette.color))document.body.setAttribute("data-md-color-"+key,value)}</script>
</nav>
</header>
<div class="md-container" data-md-component="container">
<main class="md-main" data-md-component="main">
<div class="md-main__inner md-grid">
<div class="md-sidebar md-sidebar--primary" data-md-component="sidebar" data-md-type="navigation" >
<div class="md-sidebar__scrollwrap">
<div class="md-sidebar__inner">
<nav class="md-nav md-nav--primary" aria-label="Navigation" data-md-level="0">
<label class="md-nav__title" for="__drawer">
<a href="../.." title="obikmer" class="md-nav__button md-logo" aria-label="obikmer" data-md-component="logo">
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24"><path d="M12 8a3 3 0 0 0 3-3 3 3 0 0 0-3-3 3 3 0 0 0-3 3 3 3 0 0 0 3 3m0 3.54C9.64 9.35 6.5 8 3 8v11c3.5 0 6.64 1.35 9 3.54 2.36-2.19 5.5-3.54 9-3.54V8c-3.5 0-6.64 1.35-9 3.54"/></svg>
</a>
obikmer
</label>
<ul class="md-nav__list" data-md-scrollfix>
<li class="md-nav__item">
<a href="../.." class="md-nav__link">
<span class="md-ellipsis">
Home
</span>
</a>
</li>
<li class="md-nav__item md-nav__item--nested">
<input class="md-nav__toggle md-toggle " type="checkbox" id="__nav_2" >
<label class="md-nav__link" for="__nav_2" id="__nav_2_label" tabindex="0">
<span class="md-ellipsis">
Theory
</span>
<span class="md-nav__icon md-icon"></span>
</label>
<nav class="md-nav" data-md-level="1" aria-labelledby="__nav_2_label" aria-expanded="false">
<label class="md-nav__title" for="__nav_2">
<span class="md-nav__icon md-icon"></span>
Theory
</label>
<ul class="md-nav__list" data-md-scrollfix>
<li class="md-nav__item">
<a href="../../kmers/" class="md-nav__link">
<span class="md-ellipsis">
Kmers and super-kmers
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../theory/encoding/" class="md-nav__link">
<span class="md-ellipsis">
DNA encoding
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../theory/entropy/" class="md-nav__link">
<span class="md-ellipsis">
Entropy filter
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../theory/minimizer/" class="md-nav__link">
<span class="md-ellipsis">
Minimizer selection
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../theory/indexing/" class="md-nav__link">
<span class="md-ellipsis">
Partitioning architecture
</span>
</a>
</li>
</ul>
</nav>
</li>
<li class="md-nav__item md-nav__item--active md-nav__item--nested">
<input class="md-nav__toggle md-toggle " type="checkbox" id="__nav_3" checked>
<label class="md-nav__link" for="__nav_3" id="__nav_3_label" tabindex="0">
<span class="md-ellipsis">
Implementation
</span>
<span class="md-nav__icon md-icon"></span>
</label>
<nav class="md-nav" data-md-level="1" aria-labelledby="__nav_3_label" aria-expanded="true">
<label class="md-nav__title" for="__nav_3">
<span class="md-nav__icon md-icon"></span>
Implementation
</label>
<ul class="md-nav__list" data-md-scrollfix>
<li class="md-nav__item">
<a href="../superkmer/" class="md-nav__link">
<span class="md-ellipsis">
SuperKmer
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../kmer/" class="md-nav__link">
<span class="md-ellipsis">
Kmer
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../chunkreader/" class="md-nav__link">
<span class="md-ellipsis">
Chunk reader
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../pipeline/" class="md-nav__link">
<span class="md-ellipsis">
Construction pipeline
</span>
</a>
</li>
<li class="md-nav__item md-nav__item--active">
<input class="md-nav__toggle md-toggle" type="checkbox" id="__toc">
<label class="md-nav__link md-nav__link--active" for="__toc">
<span class="md-ellipsis">
obipipeline library
</span>
<span class="md-nav__icon md-icon"></span>
</label>
<a href="./" class="md-nav__link md-nav__link--active">
<span class="md-ellipsis">
obipipeline library
</span>
</a>
<nav class="md-nav md-nav--secondary" aria-label="Table of contents">
<label class="md-nav__title" for="__toc">
<span class="md-nav__icon md-icon"></span>
Table of contents
</label>
<ul class="md-nav__list" data-md-component="toc" data-md-scrollfix>
<li class="md-nav__item">
<a href="#core-types" class="md-nav__link">
<span class="md-ellipsis">
Core types
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#workerpool" class="md-nav__link">
<span class="md-ellipsis">
WorkerPool
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#data-enum" class="md-nav__link">
<span class="md-ellipsis">
Data enum
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#macros" class="md-nav__link">
<span class="md-ellipsis">
Macros
</span>
</a>
<nav class="md-nav" aria-label="Macros">
<ul class="md-nav__list">
<li class="md-nav__item">
<a href="#low-level" class="md-nav__link">
<span class="md-ellipsis">
Low-level
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#make_pipeline-dsl" class="md-nav__link">
<span class="md-ellipsis">
make_pipeline! DSL
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#make_pipe-dsl" class="md-nav__link">
<span class="md-ellipsis">
make_pipe! DSL
</span>
</a>
</li>
</ul>
</nav>
</li>
<li class="md-nav__item">
<a href="#scheduler-architecture" class="md-nav__link">
<span class="md-ellipsis">
Scheduler architecture
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#error-handling" class="md-nav__link">
<span class="md-ellipsis">
Error handling
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#example" class="md-nav__link">
<span class="md-ellipsis">
Example
</span>
</a>
</li>
</ul>
</nav>
</li>
<li class="md-nav__item">
<a href="../storage/" class="md-nav__link">
<span class="md-ellipsis">
On-disk storage
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../mphf/" class="md-nav__link">
<span class="md-ellipsis">
MPHF selection
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../unitig_evidence/" class="md-nav__link">
<span class="md-ellipsis">
Unitig evidence encoding
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../evidence_elimination/" class="md-nav__link">
<span class="md-ellipsis">
Evidence elimination (discussion)
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../obilayeredmap/" class="md-nav__link">
<span class="md-ellipsis">
obilayeredmap crate
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../persistent_compact_int_vec/" class="md-nav__link">
<span class="md-ellipsis">
PersistentCompactIntVec
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../persistent_bit_vec/" class="md-nav__link">
<span class="md-ellipsis">
PersistentBitVec
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../merge/" class="md-nav__link">
<span class="md-ellipsis">
Merge command
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../rebuild_filter/" class="md-nav__link">
<span class="md-ellipsis">
Kmer filtering (rebuild/dump/unitig)
</span>
</a>
</li>
</ul>
</nav>
</li>
<li class="md-nav__item md-nav__item--nested">
<input class="md-nav__toggle md-toggle " type="checkbox" id="__nav_4" >
<label class="md-nav__link" for="__nav_4" id="__nav_4_label" tabindex="0">
<span class="md-ellipsis">
Architecture
</span>
<span class="md-nav__icon md-icon"></span>
</label>
<nav class="md-nav" data-md-level="1" aria-labelledby="__nav_4_label" aria-expanded="false">
<label class="md-nav__title" for="__nav_4">
<span class="md-nav__icon md-icon"></span>
Architecture
</label>
<ul class="md-nav__list" data-md-scrollfix>
<li class="md-nav__item">
<a href="../../architecture/sequences/invariant/" class="md-nav__link">
<span class="md-ellipsis">
Sequences
</span>
</a>
</li>
<li class="md-nav__item">
<a href="../../architecture/index_architecture/" class="md-nav__link">
<span class="md-ellipsis">
Kmer index
</span>
</a>
</li>
</ul>
</nav>
</li>
</ul>
</nav>
</div>
</div>
</div>
<div class="md-sidebar md-sidebar--secondary" data-md-component="sidebar" data-md-type="toc" >
<div class="md-sidebar__scrollwrap">
<div class="md-sidebar__inner">
<nav class="md-nav md-nav--secondary" aria-label="Table of contents">
<label class="md-nav__title" for="__toc">
<span class="md-nav__icon md-icon"></span>
Table of contents
</label>
<ul class="md-nav__list" data-md-component="toc" data-md-scrollfix>
<li class="md-nav__item">
<a href="#core-types" class="md-nav__link">
<span class="md-ellipsis">
Core types
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#workerpool" class="md-nav__link">
<span class="md-ellipsis">
WorkerPool
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#data-enum" class="md-nav__link">
<span class="md-ellipsis">
Data enum
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#macros" class="md-nav__link">
<span class="md-ellipsis">
Macros
</span>
</a>
<nav class="md-nav" aria-label="Macros">
<ul class="md-nav__list">
<li class="md-nav__item">
<a href="#low-level" class="md-nav__link">
<span class="md-ellipsis">
Low-level
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#make_pipeline-dsl" class="md-nav__link">
<span class="md-ellipsis">
make_pipeline! DSL
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#make_pipe-dsl" class="md-nav__link">
<span class="md-ellipsis">
make_pipe! DSL
</span>
</a>
</li>
</ul>
</nav>
</li>
<li class="md-nav__item">
<a href="#scheduler-architecture" class="md-nav__link">
<span class="md-ellipsis">
Scheduler architecture
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#error-handling" class="md-nav__link">
<span class="md-ellipsis">
Error handling
</span>
</a>
</li>
<li class="md-nav__item">
<a href="#example" class="md-nav__link">
<span class="md-ellipsis">
Example
</span>
</a>
</li>
</ul>
</nav>
</div>
</div>
</div>
<div class="md-content" data-md-component="content">
<article class="md-content__inner md-typeset">
<h1 id="obipipeline-parallel-pipeline-library">obipipeline — parallel pipeline library</h1>
<p><code>obipipeline</code> is a generic, multi-threaded data pipeline crate. It connects a <strong>source</strong>, a chain of <strong>stages</strong>, and a <strong>sink</strong> via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.</p>
<h2 id="core-types">Core types</h2>
<table>
<thead>
<tr>
<th>Type alias</th>
<th>Rust type</th>
<th>Role</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>SourceFn&lt;D&gt;</code></td>
<td><code>Box&lt;dyn FnMut() -&gt; Result&lt;D, PipelineError&gt; + Send&gt;</code></td>
<td>Called repeatedly; <code>FnMut</code> because it holds iterator state</td>
</tr>
<tr>
<td><code>SharedFn&lt;D&gt;</code></td>
<td><code>Arc&lt;dyn Fn(D) -&gt; Result&lt;D, PipelineError&gt; + Send + Sync&gt;</code></td>
<td>1→1 transform shared across workers via <code>Arc::clone</code></td>
</tr>
<tr>
<td><code>SharedFlatFn&lt;D&gt;</code></td>
<td><code>Arc&lt;dyn Fn(D, &amp;Sender&lt;Result&lt;D, _&gt;&gt;, &amp;Sender&lt;isize&gt;) + Send + Sync&gt;</code></td>
<td>1→N transform; pushes items into channel, sends delta</td>
</tr>
<tr>
<td><code>SinkFn&lt;D&gt;</code></td>
<td><code>Box&lt;dyn Fn(D) -&gt; Result&lt;(), PipelineError&gt; + Send&gt;</code></td>
<td>Final consumer; returns <code>Result</code> so errors propagate back</td>
</tr>
</tbody>
</table>
<p>Stages come in two variants:</p>
<div class="highlight"><pre><span></span><code><span class="k">pub</span><span class="w"> </span><span class="k">enum</span><span class="w"> </span><span class="nc">Stage</span><span class="o">&lt;</span><span class="n">D</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">Transform</span><span class="p">(</span><span class="n">SharedFn</span><span class="o">&lt;</span><span class="n">D</span><span class="o">&gt;</span><span class="p">),</span><span class="w"> </span><span class="c1">// 1→1</span>
<span class="w"> </span><span class="n">Flat</span><span class="p">(</span><span class="n">SharedFlatFn</span><span class="o">&lt;</span><span class="n">D</span><span class="o">&gt;</span><span class="p">),</span><span class="w"> </span><span class="c1">// 1→N</span>
<span class="p">}</span>
</code></pre></div>
<p><code>Pipeline&lt;D&gt;</code> holds one <code>SourceFn</code>, a <code>Vec&lt;Stage&gt;</code>, and one <code>SinkFn</code>.<br />
<code>WorkerPool&lt;D&gt;</code> wraps a <code>Pipeline</code> with <code>n_workers</code> and channel <code>capacity</code>.</p>
<h2 id="workerpool">WorkerPool</h2>
<div class="highlight"><pre><span></span><code><span class="n">WorkerPool</span><span class="p">::</span><span class="n">new</span><span class="p">(</span><span class="n">pipeline</span><span class="p">:</span><span class="w"> </span><span class="nc">Pipeline</span><span class="o">&lt;</span><span class="n">D</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span><span class="n">n_workers</span><span class="p">:</span><span class="w"> </span><span class="kt">usize</span><span class="p">,</span><span class="w"> </span><span class="n">capacity</span><span class="p">:</span><span class="w"> </span><span class="kt">usize</span><span class="p">)</span><span class="w"> </span><span class="p">-&gt;</span><span class="w"> </span><span class="nc">Self</span>
<span class="n">WorkerPool</span><span class="p">::</span><span class="n">run</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
</code></pre></div>
<table>
<thead>
<tr>
<th>Parameter</th>
<th>Role</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>n_workers</code></td>
<td>Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it.</td>
</tr>
<tr>
<td><code>capacity</code></td>
<td>Bound on every crossbeam channel in the pipeline. Controls memory and back-pressure: a full channel blocks the sender until a slot frees.</td>
</tr>
</tbody>
</table>
<p><code>run</code> consumes <code>self</code> (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.</p>
<h2 id="data-enum">Data enum</h2>
<p>All pipeline stages communicate through a single user-defined enum:</p>
<div class="highlight"><pre><span></span><code><span class="k">enum</span><span class="w"> </span><span class="nc">MyData</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">Unsigned</span><span class="p">(</span><span class="kt">u64</span><span class="p">),</span>
<span class="w"> </span><span class="n">Number</span><span class="p">(</span><span class="kt">f64</span><span class="p">),</span>
<span class="w"> </span><span class="n">Text</span><span class="p">(</span><span class="nb">String</span><span class="p">),</span>
<span class="p">}</span>
</code></pre></div>
<p>Each variant carries the concrete type for one stage's output. The macros pattern-match on this enum to route values between stages.</p>
<h2 id="macros">Macros</h2>
<p>Eight low-level macros build individual stages; one high-level macro (<code>make_pipeline!</code>) composes them.</p>
<h3 id="low-level">Low-level</h3>
<div class="highlight"><pre><span></span><code><span class="n">make_source</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">iterator</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// iterator yields T</span>
<span class="n">make_source_fallible</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">iterator</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// iterator yields Result&lt;T, E&gt;</span>
<span class="n">make_transform</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; U</span>
<span class="n">make_transform_fallible</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; Result&lt;U, E&gt;</span>
<span class="n">make_flat_transform</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; impl IntoIterator&lt;Item=U&gt;</span>
<span class="n">make_flat_transform_fallible</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">,</span><span class="w"> </span><span class="n">OutputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; Result&lt;impl IntoIterator&lt;Item=U&gt;, E&gt;</span>
<span class="n">make_sink</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; ()</span>
<span class="n">make_sink_fallible</span><span class="o">!</span><span class="p">(</span><span class="n">Enum</span><span class="p">,</span><span class="w"> </span><span class="n">func</span><span class="p">,</span><span class="w"> </span><span class="n">InputVariant</span><span class="p">)</span><span class="w"> </span><span class="c1">// func: T -&gt; Result&lt;(), E&gt;</span>
</code></pre></div>
<p>Each macro wraps the closure in the correct smart pointer (<code>Box</code> for source/sink, <code>Arc</code> for transforms).</p>
<h3 id="make_pipeline-dsl">make_pipeline! DSL</h3>
<div class="highlight"><pre><span></span><code>make_pipeline! {
DataEnum,
source iterator =&gt; OutputVariant, // or source? for fallible
| func: In =&gt; Out, // 1→1 non-fallible transform
|? func: In =&gt; Out, // 1→1 fallible transform
|| func: In =&gt; Out, // 1→N non-fallible flat transform
||? func: In =&gt; Out, // 1→N fallible flat transform
sink func @ InputVariant, // or sink? for fallible
}
</code></pre></div>
<p><code>?</code> marks fallibility on source, individual transforms, or sink independently.<br />
Implemented as a <strong>TT muncher</strong>: the internal rule <code>@build</code> recurses over transform tokens one at a time, accumulating them into a <code>vec![]</code>, then terminates on <code>sink</code>/<code>sink?</code>.</p>
<h3 id="make_pipe-dsl">make_pipe! DSL</h3>
<p><code>make_pipe!</code> builds a sourceless/sinkless <code>Pipe&lt;D, In, Out&gt;</code> — a reusable, composable stage sequence:</p>
<div class="highlight"><pre><span></span><code>make_pipe! {
DataEnum : InType =&gt; OutType,
| func: InVariant =&gt; OutVariant,
|? func: InVariant =&gt; OutVariant,
|| func: InVariant =&gt; OutVariant,
||? func: InVariant =&gt; OutVariant,
}
</code></pre></div>
<p>Two pipes compose with <code>.then(other)</code>. Apply to an iterator with <code>.apply(iter, n_workers, capacity)</code> to get a <code>PipeIter&lt;Out&gt;</code> — an iterator over the pipeline output, backed by a background <code>WorkerPool</code>. The scatter step in <code>obikmer</code> uses <code>make_pipe!</code> and <code>.apply()</code> rather than the full <code>make_pipeline!</code> / <code>WorkerPool</code> pattern.</p>
<h2 id="scheduler-architecture">Scheduler architecture</h2>
<div class="highlight"><pre><span></span><code>Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (×N)
▲ │
[stage_rxs] ────────┘◄──────────────────────────────┘
[flat_delta_rx] ──► Scheduler (in_flight adjustment)
[sink_err_rx] ← errors from sink (highest priority)
Sink thread
</code></pre></div>
<p>The scheduler is a single thread running a biased <code>Select</code> over all input channels. Priority order (highest first):</p>
<div class="highlight"><pre><span></span><code>index 0 sink_err_rx abort on sink error
index 1 flat_delta_rx adjust in_flight before dispatching
index 2..=n+1 stage_rxs[n-1..0] drain last stage first
index n+2 source_rx pull new data last
</code></pre></div>
<p>This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.</p>
<p><strong>Workers</strong> are generic: each receives a <code>WorkerTask</code> — either <code>Transform(data, stage_idx)</code> or <code>Flat(data, stage_idx)</code>. For <code>Transform</code>, the worker calls <code>f(data)</code> and sends the result to <code>stage_txs[stage_idx]</code>. For <code>Flat</code>, the worker calls <code>f(data, &amp;push_tx, &amp;delta_tx)</code>: the closure pushes N items into <code>push_tx</code> then sends <code>N-1</code> to <code>delta_tx</code>. The scheduler uses the delta to adjust <code>in_flight</code> without knowing N in advance.</p>
<p><strong>Termination</strong> uses an <code>in_flight: isize</code> counter and a <code>flat_workers_active: usize</code> counter:</p>
<ul>
<li><code>in_flight</code> incremented when an item is dispatched from source to workers</li>
<li><code>in_flight</code> decremented when the item exits the last stage to the sink</li>
<li><code>flat_workers_active</code> incremented when a <code>Flat</code> task is dispatched, decremented when the delta arrives</li>
<li>the loop exits only when <code>source_done &amp;&amp; in_flight == 0 &amp;&amp; flat_workers_active == 0</code></li>
</ul>
<p>This guarantees all in-flight items complete (including all N outputs of a flat stage) before <code>join()</code>.</p>
<h2 id="error-handling">Error handling</h2>
<p><code>PipelineError</code> has four variants:</p>
<table>
<thead>
<tr>
<th>Variant</th>
<th>Meaning</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>EndOfStream</code></td>
<td>Source exhausted (normal termination, not sent downstream)</td>
</tr>
<tr>
<td><code>TypeMismatch</code></td>
<td>Wrong enum variant arrived at a stage</td>
</tr>
<tr>
<td><code>StepKindMismatch</code></td>
<td>Internal routing error</td>
</tr>
<tr>
<td><code>StepError(Box&lt;dyn Error + Send + Sync&gt;)</code></td>
<td>Error from user code (wrapped by <code>make_*_fallible!</code>)</td>
</tr>
</tbody>
</table>
<p>Sink errors flow back to the scheduler via a dedicated <code>Receiver&lt;PipelineError&gt;</code> registered at index 0 of the Select — the pipeline stops immediately on the first sink error.</p>
<h2 id="example">Example</h2>
<div class="highlight"><pre><span></span><code><span class="k">enum</span><span class="w"> </span><span class="nc">PipelineData</span><span class="w"> </span><span class="p">{</span><span class="w"> </span><span class="n">Unsigned</span><span class="p">(</span><span class="kt">u64</span><span class="p">),</span><span class="w"> </span><span class="n">Number</span><span class="p">(</span><span class="kt">f64</span><span class="p">),</span><span class="w"> </span><span class="n">Text</span><span class="p">(</span><span class="nb">String</span><span class="p">)</span><span class="w"> </span><span class="p">}</span>
<span class="k">fn</span><span class="w"> </span><span class="nf">to_f64</span><span class="p">(</span><span class="n">x</span><span class="p">:</span><span class="w"> </span><span class="kt">u64</span><span class="p">)</span><span class="w"> </span><span class="p">-&gt;</span><span class="w"> </span><span class="kt">f64</span><span class="w"> </span><span class="p">{</span><span class="w"> </span><span class="n">x</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="kt">f64</span><span class="w"> </span><span class="p">}</span>
<span class="k">fn</span><span class="w"> </span><span class="nf">format_num</span><span class="p">(</span><span class="n">n</span><span class="p">:</span><span class="w"> </span><span class="kt">f64</span><span class="p">)</span><span class="w"> </span><span class="p">-&gt;</span><span class="w"> </span><span class="nb">String</span><span class="w"> </span><span class="p">{</span><span class="w"> </span><span class="fm">format!</span><span class="p">(</span><span class="s">&quot;{}&quot;</span><span class="p">,</span><span class="w"> </span><span class="n">n</span><span class="p">)</span><span class="w"> </span><span class="p">}</span>
<span class="k">fn</span><span class="w"> </span><span class="nf">reverse</span><span class="p">(</span><span class="n">s</span><span class="p">:</span><span class="w"> </span><span class="nb">String</span><span class="p">)</span><span class="w"> </span><span class="p">-&gt;</span><span class="w"> </span><span class="nb">String</span><span class="w"> </span><span class="p">{</span><span class="w"> </span><span class="n">s</span><span class="p">.</span><span class="n">chars</span><span class="p">().</span><span class="n">rev</span><span class="p">().</span><span class="n">collect</span><span class="p">()</span><span class="w"> </span><span class="p">}</span>
<span class="k">fn</span><span class="w"> </span><span class="nf">hash</span><span class="p">(</span><span class="n">s</span><span class="p">:</span><span class="w"> </span><span class="nb">String</span><span class="p">)</span><span class="w"> </span><span class="p">-&gt;</span><span class="w"> </span><span class="kt">u64</span><span class="w"> </span><span class="p">{</span><span class="w"> </span><span class="cm">/* djb2 */</span><span class="w"> </span><span class="p">}</span>
<span class="k">fn</span><span class="w"> </span><span class="nf">print_hash</span><span class="p">(</span><span class="n">h</span><span class="p">:</span><span class="w"> </span><span class="kt">u64</span><span class="p">)</span><span class="w"> </span><span class="p">-&gt;</span><span class="w"> </span><span class="nb">Result</span><span class="o">&lt;</span><span class="p">(),</span><span class="w"> </span><span class="n">std</span><span class="p">::</span><span class="n">io</span><span class="p">::</span><span class="n">Error</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w"> </span><span class="fm">println!</span><span class="p">(</span><span class="s">&quot;{}&quot;</span><span class="p">,</span><span class="w"> </span><span class="n">h</span><span class="p">);</span><span class="w"> </span><span class="nb">Ok</span><span class="p">(())</span><span class="w"> </span><span class="p">}</span>
<span class="kd">let</span><span class="w"> </span><span class="n">pipeline</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">make_pipeline</span><span class="o">!</span><span class="w"> </span><span class="p">{</span>
<span class="w"> </span><span class="n">PipelineData</span><span class="p">,</span>
<span class="w"> </span><span class="n">source</span><span class="w"> </span><span class="mi">1</span><span class="k">u64</span><span class="o">..=</span><span class="mi">10</span><span class="w"> </span><span class="o">=&gt;</span><span class="w"> </span><span class="n">Unsigned</span><span class="p">,</span>
<span class="w"> </span><span class="o">|</span><span class="w"> </span><span class="n">to_f64</span><span class="p">:</span><span class="w"> </span><span class="nc">Unsigned</span><span class="w"> </span><span class="o">=&gt;</span><span class="w"> </span><span class="n">Number</span><span class="p">,</span>
<span class="w"> </span><span class="o">|</span><span class="w"> </span><span class="n">format_num</span><span class="p">:</span><span class="w"> </span><span class="nc">Number</span><span class="w"> </span><span class="o">=&gt;</span><span class="w"> </span><span class="n">Text</span><span class="p">,</span>
<span class="w"> </span><span class="o">|</span><span class="w"> </span><span class="n">reverse</span><span class="p">:</span><span class="w"> </span><span class="nc">Text</span><span class="w"> </span><span class="o">=&gt;</span><span class="w"> </span><span class="n">Text</span><span class="p">,</span>
<span class="w"> </span><span class="o">|</span><span class="w"> </span><span class="n">hash</span><span class="p">:</span><span class="w"> </span><span class="nc">Text</span><span class="w"> </span><span class="o">=&gt;</span><span class="w"> </span><span class="n">Unsigned</span><span class="p">,</span>
<span class="w"> </span><span class="n">sink</span><span class="o">?</span><span class="w"> </span><span class="n">print_hash</span><span class="w"> </span><span class="o">@</span><span class="w"> </span><span class="n">Unsigned</span><span class="p">,</span>
<span class="p">};</span>
<span class="n">WorkerPool</span><span class="p">::</span><span class="n">new</span><span class="p">(</span><span class="n">pipeline</span><span class="p">,</span><span class="w"> </span><span class="mi">4</span><span class="p">,</span><span class="w"> </span><span class="mi">64</span><span class="p">).</span><span class="n">run</span><span class="p">();</span>
</code></pre></div>
</article>
</div>
<script>var target=document.getElementById(location.hash.slice(1));target&&target.name&&(target.checked=target.name.startsWith("__tabbed_"))</script>
</div>
</main>
<footer class="md-footer">
<div class="md-footer-meta md-typeset">
<div class="md-footer-meta__inner md-grid">
<div class="md-copyright">
Made with
<a href="https://squidfunk.github.io/mkdocs-material/" target="_blank" rel="noopener">
Material for MkDocs
</a>
</div>
</div>
</div>
</footer>
</div>
<div class="md-dialog" data-md-component="dialog">
<div class="md-dialog__inner md-typeset"></div>
</div>
<script id="__config" type="application/json">{"annotate": null, "base": "../..", "features": [], "search": "../../assets/javascripts/workers/search.2c215733.min.js", "tags": null, "translations": {"clipboard.copied": "Copied to clipboard", "clipboard.copy": "Copy to clipboard", "search.result.more.one": "1 more on this page", "search.result.more.other": "# more on this page", "search.result.none": "No matching documents", "search.result.one": "1 matching document", "search.result.other": "# matching documents", "search.result.placeholder": "Type to start searching", "search.result.term.missing": "Missing", "select.version": "Select version"}, "version": null}</script>
<script src="../../assets/javascripts/bundle.79ae519e.min.js"></script>
<script src="https://unpkg.com/mathjax@3/es5/tex-mml-chtml.js"></script>
</body>
</html>