Mirror of https://github.com/metabarcoding/obitools4.git
obiitercsv: CSV Record Iterator for Streaming and Batch Processing
A Go package providing a thread-safe, channel-based iterator (`ICSVRecord`) for efficient streaming and batch processing of CSV data. Designed with scalability in mind, especially for bioinformatics pipelines like OBITools4, it enables ordered, concurrent handling of large CSV files without loading all records into memory.
Core Concepts
- `CSVHeader`: a `[]string` of column names; defines the schema of records.
- `CSVRecord`: a `map[string]interface{}` mapping field names to values, supporting flexibly typed data.
- `CSVRecordBatch`: a structured batch of records (`[]*CSVRecord`) enriched with metadata:
  - `source`: origin identifier (e.g., file or shard name),
  - `order`: sequence index for deterministic reassembly,
  - `data`: the slice of records.
Iterator Interface (ICSVRecord)
Implements a standard iterator protocol over batches via an unbuffered channel:
- `Next() bool`: advances to the next batch; returns `false` when exhausted.
- `Get() *CSVRecordBatch`: retrieves the current batch (nil-safe).
- `PushBack()`: requeues the last retrieved batch for reprocessing, useful in error recovery or conditional branching.
- `Channel() <-chan *CSVRecordBatch`: exposes the internal channel for external consumption.
Thread-Safe Operations
- All shared state (e.g., batch queue, flags) is guarded by a `sync.RWMutex`.
- Atomic operations (`atomic.Bool`, `int32`) are used for lightweight flags like `finished` and counters such as `batch_size`.
- Methods ensure safe concurrent access across multiple goroutines.
Header Management
Supports dynamic schema evolution:
- `SetHeader(header CSVHeader)`: sets or replaces the header (must be called before the first `Next()`).
- `AppendField(name string, value interface{}) bool`: adds a new field to the current record (returns `false` if there is no active batch or on header mismatch).
Batch Lifecycle Control
- `Add()` / `Done()`: track active producer/consumer goroutines using a `sync.WaitGroup`.
- `WaitAndClose()`: blocks until all tracked goroutines complete, then closes the output channel, ensuring no data loss.
Utility & Validation
- `NotEmpty(batch *CSVRecordBatch) bool`: returns `true` if the batch is non-nil and contains at least one record.
- `IsNil(batch *CSVRecordBatch) bool`: returns `true` if the batch is nil.
- `Consume(iterator ICSVRecord, fn func(*CSVRecordBatch))`: drains the iterator by applying `fn` to each batch, ideal for side-effect processing (e.g., writing, aggregation).
Ordering & Recovery
- `SortBatches(batches []*CSVRecordBatch) []*CSVRecordBatch`: reorders batches by `order`, buffering out-of-sequence items until missing predecessors arrive; critical for reconstructing global order in distributed or parallel pipelines.
Splitting & Sharing
- `Split() ICSVRecord`: creates a new iterator instance sharing the same underlying channel but with independent locking; enables fan-out patterns without duplicating data.
Design Goals
- Memory efficiency: Processes records in streaming batches, avoiding full-file loads.
- Deterministic ordering: Supports reconstruction of sequential order despite concurrent delivery.
- Robustness: Graceful handling of race conditions, nil states, and partial batches.
Intended for high-throughput CSV pipelines where correctness, concurrency safety, and low latency are paramount.