<!doctype html>
< html lang = "en" class = "no-js" >
< head >
< meta charset = "utf-8" >
< meta name = "viewport" content = "width=device-width,initial-scale=1" >
< link rel = "prev" href = "../pipeline/" >
< link rel = "next" href = "../storage/" >
< link rel = "icon" href = "../../assets/images/favicon.png" >
< meta name = "generator" content = "mkdocs-1.6.1, mkdocs-material-9.7.6" >
< title > obipipeline library - obikmer< / title >
< link rel = "stylesheet" href = "../../assets/stylesheets/main.484c7ddc.min.css" >
< link rel = "preconnect" href = "https://fonts.gstatic.com" crossorigin >
< link rel = "stylesheet" href = "https://fonts.googleapis.com/css?family=Roboto:300,300i,400,400i,700,700i%7CRoboto+Mono:400,400i,700,700i&display=fallback" >
< style > : root { --md-text-font : "Roboto" ; --md-code-font : "Roboto Mono" } < / style >
<script>
  // Theme helpers, assigned as implicit globals so that later inline scripts
  // (such as the palette script in the header) can call them.
  //
  // Site root URL: every storage key is prefixed with its pathname so that
  // several sites served from the same origin keep separate settings.
  __md_scope = new URL("../..", location);
  // Simple string hash: h = h * 31 + char code, folded over the code points of s.
  __md_hash = s => [...s].reduce((h, ch) => (h << 5) - h + ch.charCodeAt(0), 0);
  // Read and JSON-decode the value stored under `key` in `storage`
  // (localStorage when omitted). Returns null when the key is absent, the
  // stored text is not valid JSON, or storage cannot be accessed at all
  // (merely reading `localStorage` can throw when site data is blocked).
  // The storage default is resolved inside the try for that reason.
  __md_get = (key, storage, scope = __md_scope) => {
    try {
      var store = storage === undefined ? localStorage : storage;
      return JSON.parse(store.getItem(scope.pathname + "." + key));
    } catch (err) {
      return null;
    }
  };
  // JSON-encode `value` and store it under `key` in `storage` (localStorage
  // when omitted). Best-effort: any failure, including quota errors or
  // inaccessible storage, is deliberately ignored.
  __md_set = (key, value, storage, scope = __md_scope) => {
    try {
      var store = storage === undefined ? localStorage : storage;
      store.setItem(scope.pathname + "." + key, JSON.stringify(value));
    } catch (err) {}
  };
</script>
< / head >
< body dir = "ltr" >
< input class = "md-toggle" data-md-toggle = "drawer" type = "checkbox" id = "__drawer" autocomplete = "off" >
< input class = "md-toggle" data-md-toggle = "search" type = "checkbox" id = "__search" autocomplete = "off" >
< label class = "md-overlay" for = "__drawer" > < / label >
< div data-md-component = "skip" >
< a href = "#obipipeline-parallel-pipeline-library" class = "md-skip" >
Skip to content
< / a >
< / div >
< div data-md-component = "announce" >
< / div >
< header class = "md-header md-header--shadow" data-md-component = "header" >
< nav class = "md-header__inner md-grid" aria-label = "Header" >
< a href = "../.." title = "obikmer" class = "md-header__button md-logo" aria-label = "obikmer" data-md-component = "logo" >
< svg xmlns = "http://www.w3.org/2000/svg" viewBox = "0 0 24 24" > < path d = "M12 8a3 3 0 0 0 3-3 3 3 0 0 0-3-3 3 3 0 0 0-3 3 3 3 0 0 0 3 3m0 3.54C9.64 9.35 6.5 8 3 8v11c3.5 0 6.64 1.35 9 3.54 2.36-2.19 5.5-3.54 9-3.54V8c-3.5 0-6.64 1.35-9 3.54" / > < / svg >
< / a >
< label class = "md-header__button md-icon" for = "__drawer" >
< svg xmlns = "http://www.w3.org/2000/svg" viewBox = "0 0 24 24" > < path d = "M3 6h18v2H3zm0 5h18v2H3zm0 5h18v2H3z" / > < / svg >
< / label >
< div class = "md-header__title" data-md-component = "header-title" >
< div class = "md-header__ellipsis" >
< div class = "md-header__topic" >
< span class = "md-ellipsis" >
obikmer
< / span >
< / div >
< div class = "md-header__topic" data-md-component = "header-topic" >
< span class = "md-ellipsis" >
obipipeline library
< / span >
< / div >
< / div >
< / div >
<script>
  // Re-apply the color palette stored under the "__palette" key by copying
  // each of its color fields onto <body> as a data-md-color-* attribute.
  var palette = __md_get("__palette");
  if (palette && palette.color) {
    // A stored "follow the system" preference has to be resolved to the
    // concrete light/dark palette whose toggle input matches the OS scheme.
    if ("(prefers-color-scheme)" === palette.color.media) {
      var media = matchMedia("(prefers-color-scheme: light)"),
          input = document.querySelector(media.matches
            ? "[data-md-color-media='(prefers-color-scheme: light)']"
            : "[data-md-color-media='(prefers-color-scheme: dark)']");
      if (input) {
        palette.color.media = input.getAttribute("data-md-color-media");
        palette.color.scheme = input.getAttribute("data-md-color-scheme");
        palette.color.primary = input.getAttribute("data-md-color-primary");
        palette.color.accent = input.getAttribute("data-md-color-accent");
      } else {
        // No matching toggle input exists in the DOM parsed so far, so the
        // preference cannot be resolved: keep the page's default colors
        // instead of throwing a TypeError on a null input.
        palette.color = {};
      }
    }
    for (var [key, value] of Object.entries(palette.color))
      document.body.setAttribute("data-md-color-" + key, value);
  }
</script>
< / nav >
< / header >
< div class = "md-container" data-md-component = "container" >
< main class = "md-main" data-md-component = "main" >
< div class = "md-main__inner md-grid" >
< div class = "md-sidebar md-sidebar--primary" data-md-component = "sidebar" data-md-type = "navigation" >
< div class = "md-sidebar__scrollwrap" >
< div class = "md-sidebar__inner" >
< nav class = "md-nav md-nav--primary" aria-label = "Navigation" data-md-level = "0" >
< label class = "md-nav__title" for = "__drawer" >
< a href = "../.." title = "obikmer" class = "md-nav__button md-logo" aria-label = "obikmer" data-md-component = "logo" >
< svg xmlns = "http://www.w3.org/2000/svg" viewBox = "0 0 24 24" > < path d = "M12 8a3 3 0 0 0 3-3 3 3 0 0 0-3-3 3 3 0 0 0-3 3 3 3 0 0 0 3 3m0 3.54C9.64 9.35 6.5 8 3 8v11c3.5 0 6.64 1.35 9 3.54 2.36-2.19 5.5-3.54 9-3.54V8c-3.5 0-6.64 1.35-9 3.54" / > < / svg >
< / a >
obikmer
< / label >
< ul class = "md-nav__list" data-md-scrollfix >
< li class = "md-nav__item" >
< a href = "../.." class = "md-nav__link" >
< span class = "md-ellipsis" >
Home
< / span >
< / a >
< / li >
< li class = "md-nav__item md-nav__item--nested" >
< input class = "md-nav__toggle md-toggle " type = "checkbox" id = "__nav_2" >
< label class = "md-nav__link" for = "__nav_2" id = "__nav_2_label" tabindex = "0" >
< span class = "md-ellipsis" >
Theory
< / span >
< span class = "md-nav__icon md-icon" > < / span >
< / label >
< nav class = "md-nav" data-md-level = "1" aria-labelledby = "__nav_2_label" aria-expanded = "false" >
< label class = "md-nav__title" for = "__nav_2" >
< span class = "md-nav__icon md-icon" > < / span >
Theory
< / label >
< ul class = "md-nav__list" data-md-scrollfix >
< li class = "md-nav__item" >
< a href = "../../kmers/" class = "md-nav__link" >
< span class = "md-ellipsis" >
Kmers and super-kmers
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../../theory/encoding/" class = "md-nav__link" >
< span class = "md-ellipsis" >
DNA encoding
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../../theory/entropy/" class = "md-nav__link" >
< span class = "md-ellipsis" >
Entropy filter
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../../theory/minimizer/" class = "md-nav__link" >
< span class = "md-ellipsis" >
Minimizer selection
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../../theory/indexing/" class = "md-nav__link" >
< span class = "md-ellipsis" >
Partitioning architecture
< / span >
< / a >
< / li >
< / ul >
< / nav >
< / li >
< li class = "md-nav__item md-nav__item--active md-nav__item--nested" >
< input class = "md-nav__toggle md-toggle " type = "checkbox" id = "__nav_3" checked >
< label class = "md-nav__link" for = "__nav_3" id = "__nav_3_label" tabindex = "0" >
< span class = "md-ellipsis" >
Implementation
< / span >
< span class = "md-nav__icon md-icon" > < / span >
< / label >
< nav class = "md-nav" data-md-level = "1" aria-labelledby = "__nav_3_label" aria-expanded = "true" >
< label class = "md-nav__title" for = "__nav_3" >
< span class = "md-nav__icon md-icon" > < / span >
Implementation
< / label >
< ul class = "md-nav__list" data-md-scrollfix >
< li class = "md-nav__item" >
< a href = "../superkmer/" class = "md-nav__link" >
< span class = "md-ellipsis" >
SuperKmer
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../kmer/" class = "md-nav__link" >
< span class = "md-ellipsis" >
Kmer
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../chunkreader/" class = "md-nav__link" >
< span class = "md-ellipsis" >
Chunk reader
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../pipeline/" class = "md-nav__link" >
< span class = "md-ellipsis" >
Construction pipeline
< / span >
< / a >
< / li >
< li class = "md-nav__item md-nav__item--active" >
< input class = "md-nav__toggle md-toggle" type = "checkbox" id = "__toc" >
< label class = "md-nav__link md-nav__link--active" for = "__toc" >
< span class = "md-ellipsis" >
obipipeline library
< / span >
< span class = "md-nav__icon md-icon" > < / span >
< / label >
< a href = "./" class = "md-nav__link md-nav__link--active" >
< span class = "md-ellipsis" >
obipipeline library
< / span >
< / a >
< nav class = "md-nav md-nav--secondary" aria-label = "Table of contents" >
< label class = "md-nav__title" for = "__toc" >
< span class = "md-nav__icon md-icon" > < / span >
Table of contents
< / label >
< ul class = "md-nav__list" data-md-component = "toc" data-md-scrollfix >
< li class = "md-nav__item" >
< a href = "#core-types" class = "md-nav__link" >
< span class = "md-ellipsis" >
Core types
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#workerpool" class = "md-nav__link" >
< span class = "md-ellipsis" >
WorkerPool
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#data-enum" class = "md-nav__link" >
< span class = "md-ellipsis" >
Data enum
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#macros" class = "md-nav__link" >
< span class = "md-ellipsis" >
Macros
< / span >
< / a >
< nav class = "md-nav" aria-label = "Macros" >
< ul class = "md-nav__list" >
< li class = "md-nav__item" >
< a href = "#low-level" class = "md-nav__link" >
< span class = "md-ellipsis" >
Low-level
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#make_pipeline-dsl" class = "md-nav__link" >
< span class = "md-ellipsis" >
make_pipeline! DSL
< / span >
< / a >
< / li >
< / ul >
< / nav >
< / li >
< li class = "md-nav__item" >
< a href = "#scheduler-architecture" class = "md-nav__link" >
< span class = "md-ellipsis" >
Scheduler architecture
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#error-handling" class = "md-nav__link" >
< span class = "md-ellipsis" >
Error handling
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#example" class = "md-nav__link" >
< span class = "md-ellipsis" >
Example
< / span >
< / a >
< / li >
< / ul >
< / nav >
< / li >
< li class = "md-nav__item" >
< a href = "../storage/" class = "md-nav__link" >
< span class = "md-ellipsis" >
On-disk storage
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../mphf/" class = "md-nav__link" >
< span class = "md-ellipsis" >
MPHF selection
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../unitig_evidence/" class = "md-nav__link" >
< span class = "md-ellipsis" >
Unitig evidence encoding
< / span >
< / a >
< / li >
<li class="md-nav__item">
< a href = "../obilayeredmap/" class = "md-nav__link" >
< span class = "md-ellipsis" >
obilayeredmap crate
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../persistent_compact_int_vec/" class = "md-nav__link" >
< span class = "md-ellipsis" >
PersistentCompactIntVec
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "../persistent_bit_vec/" class = "md-nav__link" >
< span class = "md-ellipsis" >
PersistentBitVec
< / span >
< / a >
< / li >
</ul>
< / nav >
< / li >
< li class = "md-nav__item md-nav__item--nested" >
< input class = "md-nav__toggle md-toggle " type = "checkbox" id = "__nav_4" >
< label class = "md-nav__link" for = "__nav_4" id = "__nav_4_label" tabindex = "0" >
< span class = "md-ellipsis" >
Architecture
< / span >
< span class = "md-nav__icon md-icon" > < / span >
< / label >
< nav class = "md-nav" data-md-level = "1" aria-labelledby = "__nav_4_label" aria-expanded = "false" >
< label class = "md-nav__title" for = "__nav_4" >
< span class = "md-nav__icon md-icon" > < / span >
Architecture
< / label >
< ul class = "md-nav__list" data-md-scrollfix >
< li class = "md-nav__item" >
< a href = "../../architecture/sequences/invariant/" class = "md-nav__link" >
< span class = "md-ellipsis" >
Sequences
< / span >
< / a >
< / li >
<li class="md-nav__item">
< a href = "../../architecture/index_architecture/" class = "md-nav__link" >
< span class = "md-ellipsis" >
Kmer index
< / span >
< / a >
< / li >
</ul>
< / nav >
< / li >
< / ul >
< / nav >
< / div >
< / div >
< / div >
< div class = "md-sidebar md-sidebar--secondary" data-md-component = "sidebar" data-md-type = "toc" >
< div class = "md-sidebar__scrollwrap" >
< div class = "md-sidebar__inner" >
< nav class = "md-nav md-nav--secondary" aria-label = "Table of contents" >
< label class = "md-nav__title" for = "__toc" >
< span class = "md-nav__icon md-icon" > < / span >
Table of contents
< / label >
< ul class = "md-nav__list" data-md-component = "toc" data-md-scrollfix >
< li class = "md-nav__item" >
< a href = "#core-types" class = "md-nav__link" >
< span class = "md-ellipsis" >
Core types
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#workerpool" class = "md-nav__link" >
< span class = "md-ellipsis" >
WorkerPool
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#data-enum" class = "md-nav__link" >
< span class = "md-ellipsis" >
Data enum
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#macros" class = "md-nav__link" >
< span class = "md-ellipsis" >
Macros
< / span >
< / a >
< nav class = "md-nav" aria-label = "Macros" >
< ul class = "md-nav__list" >
< li class = "md-nav__item" >
< a href = "#low-level" class = "md-nav__link" >
< span class = "md-ellipsis" >
Low-level
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#make_pipeline-dsl" class = "md-nav__link" >
< span class = "md-ellipsis" >
make_pipeline! DSL
< / span >
< / a >
< / li >
< / ul >
< / nav >
< / li >
< li class = "md-nav__item" >
< a href = "#scheduler-architecture" class = "md-nav__link" >
< span class = "md-ellipsis" >
Scheduler architecture
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#error-handling" class = "md-nav__link" >
< span class = "md-ellipsis" >
Error handling
< / span >
< / a >
< / li >
< li class = "md-nav__item" >
< a href = "#example" class = "md-nav__link" >
< span class = "md-ellipsis" >
Example
< / span >
< / a >
< / li >
< / ul >
< / nav >
< / div >
< / div >
< / div >
< div class = "md-content" data-md-component = "content" >
< article class = "md-content__inner md-typeset" >
< h1 id = "obipipeline-parallel-pipeline-library" > obipipeline — parallel pipeline library< / h1 >
< p > < code > obipipeline< / code > is a generic, multi-threaded data pipeline crate. It connects a < strong > source< / strong > , a chain of < strong > transforms< / strong > , and a < strong > sink< / strong > via crossbeam channels, running each stage with a shared worker pool and a biased scheduler.< / p >
< h2 id = "core-types" > Core types< / h2 >
< table >
< thead >
< tr >
< th > Type alias< / th >
< th > Rust type< / th >
< th > Role< / th >
< / tr >
< / thead >
< tbody >
< tr >
< td > < code > SourceFn< D> < / code > < / td >
< td > < code > Box< dyn FnMut() -> Result< D, PipelineError> + Send+Sync> < / code > < / td >
< td > Called repeatedly; < code > FnMut< / code > because it holds iterator state< / td >
< / tr >
< tr >
< td > < code > SharedFn< D> < / code > < / td >
< td > < code > Arc< dyn Fn(D) -> Result< D, PipelineError> + Send+Sync> < / code > < / td >
< td > Shared across workers via < code > Arc::clone< / code > (no copy of the closure)< / td >
< / tr >
< tr >
< td > < code > SinkFn< D> < / code > < / td >
< td > < code > Box< dyn Fn(D) -> Result< (), PipelineError> + Send+Sync> < / code > < / td >
< td > Final consumer; returns < code > Result< / code > so errors propagate back< / td >
< / tr >
< / tbody >
< / table >
<p><code>Pipeline&lt;D&gt;</code> holds one <code>SourceFn&lt;D&gt;</code>, a <code>Vec&lt;SharedFn&lt;D&gt;&gt;</code>, and one <code>SinkFn&lt;D&gt;</code>.<br>
< code > WorkerPool< D> < / code > wraps a < code > Pipeline< / code > with < code > n_workers< / code > and channel < code > capacity< / code > .< / p >
< h2 id = "workerpool" > WorkerPool< / h2 >
< div class = "highlight" > < pre > < span > < / span > < code > < span class = "n" > WorkerPool< / span > < span class = "p" > ::< / span > < span class = "n" > new< / span > < span class = "p" > (< / span > < span class = "n" > pipeline< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "nc" > Pipeline< / span > < span class = "o" > < < / span > < span class = "n" > D< / span > < span class = "o" > > < / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > n_workers< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "kt" > usize< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > capacity< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "kt" > usize< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "p" > -> < / span > < span class = "w" > < / span > < span class = "nc" > Self< / span >
< span class = "n" > WorkerPool< / span > < span class = "p" > ::< / span > < span class = "n" > run< / span > < span class = "p" > (< / span > < span class = "bp" > self< / span > < span class = "p" > )< / span >
< / code > < / pre > < / div >
< table >
< thead >
< tr >
< th > Parameter< / th >
< th > Role< / th >
< / tr >
< / thead >
< tbody >
< tr >
< td > < code > n_workers< / code > < / td >
< td > Number of parallel worker threads. Each worker is generic — it executes whichever transform the scheduler assigns it.< / td >
< / tr >
< tr >
< td > < code > capacity< / code > < / td >
< td > Bound on every crossbeam channel in the pipeline (source output, inter-stage channels, worker input, sink input, sink error). Controls memory and back-pressure: a full channel blocks the sender until a slot frees.< / td >
< / tr >
< / tbody >
< / table >
< p > < code > run< / code > consumes < code > self< / code > (all fields are moved into threads). It blocks the calling thread until the pipeline has fully drained — source exhausted and every in-flight item processed by the sink — then joins all threads before returning.< / p >
< h2 id = "data-enum" > Data enum< / h2 >
< p > All pipeline stages communicate through a single user-defined enum:< / p >
< div class = "highlight" > < pre > < span > < / span > < code > < span class = "k" > enum< / span > < span class = "w" > < / span > < span class = "nc" > MyData< / span > < span class = "w" > < / span > < span class = "p" > {< / span >
< span class = "w" > < / span > < span class = "n" > Unsigned< / span > < span class = "p" > (< / span > < span class = "kt" > u64< / span > < span class = "p" > ),< / span >
< span class = "w" > < / span > < span class = "n" > Number< / span > < span class = "p" > (< / span > < span class = "kt" > f64< / span > < span class = "p" > ),< / span >
< span class = "w" > < / span > < span class = "n" > Text< / span > < span class = "p" > (< / span > < span class = "nb" > String< / span > < span class = "p" > ),< / span >
< span class = "p" > }< / span >
< / code > < / pre > < / div >
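<p>Each variant wraps the concrete type produced by a stage. Several stages may share the same variant; for example, a <code>Text =&gt; Text</code> transform both reads and writes <code>Text</code>. The macros pattern-match on this enum to route values between stages.</p>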
< h2 id = "macros" > Macros< / h2 >
< p > Six low-level macros build individual stages; one high-level macro (< code > make_pipeline!< / code > ) composes them.< / p >
< h3 id = "low-level" > Low-level< / h3 >
< div class = "highlight" > < pre > < span > < / span > < code > < span class = "n" > make_source< / span > < span class = "o" > !< / span > < span class = "p" > (< / span > < span class = "n" > Enum< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > iterator< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > OutputVariant< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "c1" > // iterator yields T< / span >
< span class = "n" > make_source_fallible< / span > < span class = "o" > !< / span > < span class = "p" > (< / span > < span class = "n" > Enum< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > iterator< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > OutputVariant< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "c1" > // iterator yields Result< T, E> < / span >
< span class = "n" > make_transform< / span > < span class = "o" > !< / span > < span class = "p" > (< / span > < span class = "n" > Enum< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > func< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > InputVariant< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > OutputVariant< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "c1" > // func: T -> U< / span >
< span class = "n" > make_transform_fallible< / span > < span class = "o" > !< / span > < span class = "p" > (< / span > < span class = "n" > Enum< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > func< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > InputVariant< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > OutputVariant< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "c1" > // func: T -> Result< U, E> < / span >
< span class = "n" > make_sink< / span > < span class = "o" > !< / span > < span class = "p" > (< / span > < span class = "n" > Enum< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > func< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > InputVariant< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "c1" > // func: T -> ()< / span >
< span class = "n" > make_sink_fallible< / span > < span class = "o" > !< / span > < span class = "p" > (< / span > < span class = "n" > Enum< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > func< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > InputVariant< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "c1" > // func: T -> Result< (), E> < / span >
< / code > < / pre > < / div >
< p > Each macro wraps the closure in the correct smart pointer (< code > Box< / code > for source/sink, < code > Arc< / code > for transforms).< / p >
< h3 id = "make_pipeline-dsl" > make_pipeline! DSL< / h3 >
< div class = "highlight" > < pre > < span > < / span > < code > make_pipeline! {
DataEnum,
source iterator => OutputVariant, // or source? for fallible
| func: In => Out, // non-fallible transform
|? func: In => Out, // fallible transform
sink func @ InputVariant, // or sink? for fallible
}
< / code > < / pre > < / div >
< p > < code > ?< / code > marks fallibility on source, individual transforms, or sink independently.< br / >
Implemented as a < strong > TT muncher< / strong > : the internal rule < code > @build< / code > recurses over transform tokens one at a time, accumulating them into a < code > vec![]< / code > , then terminates on < code > sink< / code > /< code > sink?< / code > .< / p >
< h2 id = "scheduler-architecture" > Scheduler architecture< / h2 >
< div class = "highlight" > < pre > < span > < / span > < code > Source thread ──► [source_rx] ──► Scheduler ──► [worker_tx] ──► Workers (× N)
▲ │
[stage_rxs] ────────┘◄──────────────────────────────┘
│
[sink_err_rx] ← errors from sink (highest priority)
│
Sink thread
< / code > < / pre > < / div >
< p > The scheduler is a single thread running a biased < code > Select< / code > over all input channels. Priority order (highest first):< / p >
< div class = "highlight" > < pre > < span > < / span > < code > index 0 sink_err_rx abort on sink error
index 1 stage_rxs[N-1] drain last stage first
...
index N stage_rxs[0]
index N+1 source_rx pull new data last
< / code > < / pre > < / div >
< p > This back-pressure-friendly ordering ensures downstream stages are drained before new items enter the pipeline.< / p >
< p > < strong > Workers< / strong > are generic: each receives < code > (data, SharedFn, result_tx)< / code > and calls < code > f(data)< / code > , sending the result to the provided channel. The scheduler decides which transform to apply and where to route the result.< / p >
< p > < strong > Termination< / strong > uses an < code > in_flight< / code > counter:< / p >
< ul >
< li > incremented when an item is dispatched from source to workers< / li >
< li > decremented when the item exits the last stage< / li >
< li > the loop exits only when < code > source_done & & in_flight == 0< / code > < / li >
< / ul >
< p > This guarantees all in-flight items complete before < code > join()< / code > .< / p >
< h2 id = "error-handling" > Error handling< / h2 >
< p > < code > PipelineError< / code > has four variants:< / p >
< table >
< thead >
< tr >
< th > Variant< / th >
< th > Meaning< / th >
< / tr >
< / thead >
< tbody >
< tr >
< td > < code > EndOfStream< / code > < / td >
< td > Source exhausted (normal termination, not sent downstream)< / td >
< / tr >
< tr >
< td > < code > TypeMismatch< / code > < / td >
< td > Wrong enum variant arrived at a stage< / td >
< / tr >
< tr >
< td > < code > StepKindMismatch< / code > < / td >
< td > Internal routing error< / td >
< / tr >
< tr >
< td > < code > StepError(Box< dyn Error> )< / code > < / td >
< td > Error from user code (wrapped by < code > make_*_fallible!< / code > )< / td >
< / tr >
< / tbody >
< / table >
< p > Sink errors flow back to the scheduler via a dedicated < code > Receiver< PipelineError> < / code > registered at index 0 of the Select — the pipeline stops immediately on the first sink error.< / p >
< h2 id = "example" > Example< / h2 >
< div class = "highlight" > < pre > < span > < / span > < code > < span class = "k" > enum< / span > < span class = "w" > < / span > < span class = "nc" > PipelineData< / span > < span class = "w" > < / span > < span class = "p" > {< / span > < span class = "w" > < / span > < span class = "n" > Unsigned< / span > < span class = "p" > (< / span > < span class = "kt" > u64< / span > < span class = "p" > ),< / span > < span class = "w" > < / span > < span class = "n" > Number< / span > < span class = "p" > (< / span > < span class = "kt" > f64< / span > < span class = "p" > ),< / span > < span class = "w" > < / span > < span class = "n" > Text< / span > < span class = "p" > (< / span > < span class = "nb" > String< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "p" > }< / span >
< span class = "k" > fn< / span > < span class = "w" > < / span > < span class = "nf" > to_f64< / span > < span class = "p" > (< / span > < span class = "n" > x< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "kt" > u64< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "p" > -> < / span > < span class = "w" > < / span > < span class = "kt" > f64< / span > < span class = "w" > < / span > < span class = "p" > {< / span > < span class = "w" > < / span > < span class = "n" > x< / span > < span class = "w" > < / span > < span class = "k" > as< / span > < span class = "w" > < / span > < span class = "kt" > f64< / span > < span class = "w" > < / span > < span class = "p" > }< / span >
< span class = "k" > fn< / span > < span class = "w" > < / span > < span class = "nf" > format_num< / span > < span class = "p" > (< / span > < span class = "n" > n< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "kt" > f64< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "p" > -> < / span > < span class = "w" > < / span > < span class = "nb" > String< / span > < span class = "w" > < / span > < span class = "p" > {< / span > < span class = "w" > < / span > < span class = "fm" > format!< / span > < span class = "p" > (< / span > < span class = "s" > " {}" < / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > n< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "p" > }< / span >
< span class = "k" > fn< / span > < span class = "w" > < / span > < span class = "nf" > reverse< / span > < span class = "p" > (< / span > < span class = "n" > s< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "nb" > String< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "p" > -> < / span > < span class = "w" > < / span > < span class = "nb" > String< / span > < span class = "w" > < / span > < span class = "p" > {< / span > < span class = "w" > < / span > < span class = "n" > s< / span > < span class = "p" > .< / span > < span class = "n" > chars< / span > < span class = "p" > ().< / span > < span class = "n" > rev< / span > < span class = "p" > ().< / span > < span class = "n" > collect< / span > < span class = "p" > ()< / span > < span class = "w" > < / span > < span class = "p" > }< / span >
< span class = "k" > fn< / span > < span class = "w" > < / span > < span class = "nf" > hash< / span > < span class = "p" > (< / span > < span class = "n" > s< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "nb" > String< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "p" > -> < / span > < span class = "w" > < / span > < span class = "kt" > u64< / span > < span class = "w" > < / span > < span class = "p" > {< / span > < span class = "w" > < / span > < span class = "cm" > /* djb2 */< / span > < span class = "w" > < / span > < span class = "p" > }< / span >
< span class = "k" > fn< / span > < span class = "w" > < / span > < span class = "nf" > print_hash< / span > < span class = "p" > (< / span > < span class = "n" > h< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "kt" > u64< / span > < span class = "p" > )< / span > < span class = "w" > < / span > < span class = "p" > -> < / span > < span class = "w" > < / span > < span class = "nb" > Result< / span > < span class = "o" > < < / span > < span class = "p" > (),< / span > < span class = "w" > < / span > < span class = "n" > std< / span > < span class = "p" > ::< / span > < span class = "n" > io< / span > < span class = "p" > ::< / span > < span class = "n" > Error< / span > < span class = "o" > > < / span > < span class = "w" > < / span > < span class = "p" > {< / span > < span class = "w" > < / span > < span class = "fm" > println!< / span > < span class = "p" > (< / span > < span class = "s" > " {}" < / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "n" > h< / span > < span class = "p" > );< / span > < span class = "w" > < / span > < span class = "nb" > Ok< / span > < span class = "p" > (())< / span > < span class = "w" > < / span > < span class = "p" > }< / span >
< span class = "kd" > let< / span > < span class = "w" > < / span > < span class = "n" > pipeline< / span > < span class = "w" > < / span > < span class = "o" > =< / span > < span class = "w" > < / span > < span class = "n" > make_pipeline< / span > < span class = "o" > !< / span > < span class = "w" > < / span > < span class = "p" > {< / span >
< span class = "w" > < / span > < span class = "n" > PipelineData< / span > < span class = "p" > ,< / span >
< span class = "w" > < / span > < span class = "n" > source< / span > < span class = "w" > < / span > < span class = "mi" > 1< / span > < span class = "k" > u64< / span > < span class = "o" > ..=< / span > < span class = "mi" > 10< / span > < span class = "w" > < / span > < span class = "o" > => < / span > < span class = "w" > < / span > < span class = "n" > Unsigned< / span > < span class = "p" > ,< / span >
< span class = "w" > < / span > < span class = "o" > |< / span > < span class = "w" > < / span > < span class = "n" > to_f64< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "nc" > Unsigned< / span > < span class = "w" > < / span > < span class = "o" > => < / span > < span class = "w" > < / span > < span class = "n" > Number< / span > < span class = "p" > ,< / span >
< span class = "w" > < / span > < span class = "o" > |< / span > < span class = "w" > < / span > < span class = "n" > format_num< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "nc" > Number< / span > < span class = "w" > < / span > < span class = "o" > => < / span > < span class = "w" > < / span > < span class = "n" > Text< / span > < span class = "p" > ,< / span >
< span class = "w" > < / span > < span class = "o" > |< / span > < span class = "w" > < / span > < span class = "n" > reverse< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "nc" > Text< / span > < span class = "w" > < / span > < span class = "o" > => < / span > < span class = "w" > < / span > < span class = "n" > Text< / span > < span class = "p" > ,< / span >
< span class = "w" > < / span > < span class = "o" > |< / span > < span class = "w" > < / span > < span class = "n" > hash< / span > < span class = "p" > :< / span > < span class = "w" > < / span > < span class = "nc" > Text< / span > < span class = "w" > < / span > < span class = "o" > => < / span > < span class = "w" > < / span > < span class = "n" > Unsigned< / span > < span class = "p" > ,< / span >
< span class = "w" > < / span > < span class = "n" > sink< / span > < span class = "o" > ?< / span > < span class = "w" > < / span > < span class = "n" > print_hash< / span > < span class = "w" > < / span > < span class = "o" > @< / span > < span class = "w" > < / span > < span class = "n" > Unsigned< / span > < span class = "p" > ,< / span >
< span class = "p" > };< / span >
< span class = "n" > WorkerPool< / span > < span class = "p" > ::< / span > < span class = "n" > new< / span > < span class = "p" > (< / span > < span class = "n" > pipeline< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "mi" > 4< / span > < span class = "p" > ,< / span > < span class = "w" > < / span > < span class = "mi" > 64< / span > < span class = "p" > ).< / span > < span class = "n" > run< / span > < span class = "p" > ();< / span >
< / code > < / pre > < / div >
< / article >
< / div >
<script>
  // Look up the element whose id matches the URL fragment (without the "#").
  // getElementById("") yields null when there is no fragment.
  var target = document.getElementById(location.hash.slice(1));
  if (target !== null && target.name) {
    // A named element is checked only when its name carries the "__tabbed_"
    // prefix (presumably the radio inputs of a content-tab set; confirm
    // against the theme); any other named element is unchecked.
    target.checked = target.name.startsWith("__tabbed_");
  }
</script>
< / div >
< / main >
<!-- Page footer: theme credit line linking to the Material for MkDocs site. -->
<footer class="md-footer">
  <div class="md-footer-meta md-typeset">
    <div class="md-footer-meta__inner md-grid">
      <div class="md-copyright">
        Made with
        <a href="https://squidfunk.github.io/mkdocs-material/" target="_blank" rel="noopener">Material for MkDocs</a>
      </div>
    </div>
  </div>
</footer>
< / div >
<!-- Empty dialog container tagged data-md-component="dialog". -->
<div class="md-dialog" data-md-component="dialog">
  <div class="md-dialog__inner md-typeset"></div>
</div>
<!-- JSON configuration island (base path, search worker, UI strings);
     presumably read by the theme bundle loaded right after it. -->
<script id="__config" type="application/json">
{
  "annotate": null,
  "base": "../..",
  "features": [],
  "search": "../../assets/javascripts/workers/search.2c215733.min.js",
  "tags": null,
  "translations": {
    "clipboard.copied": "Copied to clipboard",
    "clipboard.copy": "Copy to clipboard",
    "search.result.more.one": "1 more on this page",
    "search.result.more.other": "# more on this page",
    "search.result.none": "No matching documents",
    "search.result.one": "1 matching document",
    "search.result.other": "# matching documents",
    "search.result.placeholder": "Type to start searching",
    "search.result.term.missing": "Missing",
    "select.version": "Select version"
  },
  "version": null
}
</script>
<script src="../../assets/javascripts/bundle.79ae519e.min.js"></script>
<!-- MathJax 3 (TeX/MathML input, CHTML output).
     Loaded async, per MathJax's loading guide, so this third-party download
     does not hold up parsing / DOMContentLoaded for the rest of the page.
     NOTE(review): "mathjax@3" floats across 3.x releases, which also rules out
     an SRI integrity hash; consider pinning an exact version.
     This file is MkDocs build output: mirror the change in the site config
     (presumably mkdocs.yml `extra_javascript` with `async: true`), or it will
     be lost on the next build. -->
<script id="MathJax-script" src="https://unpkg.com/mathjax@3/es5/tex-mml-chtml.js" async></script>
< / body >
< / html >