From ac41dd8a22c74212e36349f5395a643433a58b9f Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 10 Feb 2026 22:10:22 +0100 Subject: [PATCH] Refactor k-mer matching pipeline with improved concurrency and memory management Refactor k-mer matching to use a pipeline architecture with improved concurrency and memory management: - Replace sort.Slice with slices.SortFunc and cmp.Compare for better performance - Introduce PreparedQueries struct to encapsulate query buckets with metadata - Implement MergeQueries function to merge query buckets from multiple batches - Rewrite MatchBatch to use pre-allocated results and mutexes instead of map-based accumulation - Add seek optimization in matchPartition to reduce linear scanning - Refactor match command to use a multi-stage pipeline with proper batching and merging - Add index directory option for match command - Improve parallel processing of sequence batches This refactoring improves performance by reducing memory allocations, optimizing k-mer lookup, and implementing a more efficient pipeline for large-scale k-mer matching operations. --- pkg/obikmer/kmer_match.go | 154 +++++++++++++++++-------- pkg/obitools/obik/index.go | 34 ++++-- pkg/obitools/obik/match.go | 210 +++++++++++++++++++++++++---------- pkg/obitools/obik/obik.go | 1 + pkg/obitools/obik/options.go | 20 ++++ 5 files changed, 311 insertions(+), 108 deletions(-) diff --git a/pkg/obikmer/kmer_match.go b/pkg/obikmer/kmer_match.go index 69acc06..14e303d 100644 --- a/pkg/obikmer/kmer_match.go +++ b/pkg/obikmer/kmer_match.go @@ -1,7 +1,8 @@ package obikmer import ( - "sort" + "cmp" + "slices" "sync" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" @@ -12,23 +13,73 @@ import ( type QueryEntry struct { Kmer uint64 // canonical k-mer value SeqIdx int // index within the batch - Pos int // 0-based position in the sequence + Pos int // 1-based position in the sequence } -// MatchResult maps sequence index → sorted slice of matched positions. -type MatchResult map[int][]int +// MatchResult holds matched positions for each sequence in a batch. +// results[i] contains the sorted matched positions for sequence i. +type MatchResult [][]int -// seqMatchResult collects matched positions for a single sequence. -type seqMatchResult struct { - mu sync.Mutex - positions []int +// PreparedQueries holds pre-computed query buckets along with the number +// of sequences they were built from. This is used by the accumulation +// pipeline to merge queries from multiple batches. +type PreparedQueries struct { + Buckets [][]QueryEntry // queries[partition], each sorted by Kmer + NSeqs int // number of sequences that produced these queries + NKmers int // total number of k-mer entries across all partitions +} + +// MergeQueries merges src into dst, offsetting all SeqIdx values in src +// by dst.NSeqs. Both dst and src must have the same number of partitions. +// After merging, src should not be reused. +// +// Each partition's entries are merged in sorted order (merge-sort of two +// already-sorted slices). +func MergeQueries(dst, src *PreparedQueries) { + for p := range dst.Buckets { + if len(src.Buckets[p]) == 0 { + continue + } + + offset := dst.NSeqs + srcB := src.Buckets[p] + + // Offset SeqIdx in src entries + for i := range srcB { + srcB[i].SeqIdx += offset + } + + if len(dst.Buckets[p]) == 0 { + dst.Buckets[p] = srcB + continue + } + + // Merge two sorted slices + dstB := dst.Buckets[p] + merged := make([]QueryEntry, 0, len(dstB)+len(srcB)) + i, j := 0, 0 + for i < len(dstB) && j < len(srcB) { + if dstB[i].Kmer <= srcB[j].Kmer { + merged = append(merged, dstB[i]) + i++ + } else { + merged = append(merged, srcB[j]) + j++ + } + } + merged = append(merged, dstB[i:]...) + merged = append(merged, srcB[j:]...) + dst.Buckets[p] = merged + } + dst.NSeqs += src.NSeqs + dst.NKmers += src.NKmers } // PrepareQueries extracts all canonical k-mers from a batch of sequences // and groups them by partition using super-kmer minimizers. // -// Returns queries[partition] where each slice is sorted by Kmer value. -func (ksg *KmerSetGroup) PrepareQueries(sequences []*obiseq.BioSequence) [][]QueryEntry { +// Returns a PreparedQueries with sorted per-partition buckets. +func (ksg *KmerSetGroup) PrepareQueries(sequences []*obiseq.BioSequence) *PreparedQueries { P := ksg.partitions k := ksg.k m := ksg.m @@ -39,6 +90,7 @@ func (ksg *KmerSetGroup) PrepareQueries(sequences []*obiseq.BioSequence) [][]Que buckets[i] = make([]QueryEntry, 0, 64) } + totalKmers := 0 for seqIdx, seq := range sequences { bseq := seq.Sequence() if len(bseq) < k { @@ -60,71 +112,67 @@ func (ksg *KmerSetGroup) PrepareQueries(sequences []*obiseq.BioSequence) [][]Que buckets[partition] = append(buckets[partition], QueryEntry{ Kmer: kmer, SeqIdx: seqIdx, - Pos: sk.Start + localPos, + Pos: sk.Start + localPos + 1, }) localPos++ + totalKmers++ } } } // Sort each bucket by k-mer value for merge-scan for p := range buckets { - sort.Slice(buckets[p], func(i, j int) bool { - return buckets[p][i].Kmer < buckets[p][j].Kmer + slices.SortFunc(buckets[p], func(a, b QueryEntry) int { + return cmp.Compare(a.Kmer, b.Kmer) }) } - return buckets + return &PreparedQueries{ + Buckets: buckets, + NSeqs: len(sequences), + NKmers: totalKmers, + } } // MatchBatch looks up pre-sorted queries against one set of the index. // Partitions are processed in parallel. For each partition, a merge-scan // compares the sorted queries against the sorted KDI stream. // -// Returns a MatchResult mapping sequence index to sorted matched positions. -func (ksg *KmerSetGroup) MatchBatch(setIndex int, queries [][]QueryEntry) MatchResult { +// Returns a MatchResult where result[i] contains sorted matched positions +// for sequence i. +func (ksg *KmerSetGroup) MatchBatch(setIndex int, pq *PreparedQueries) MatchResult { P := ksg.partitions - // Per-sequence result collectors - var resultMu sync.Mutex - resultMap := make(map[int]*seqMatchResult) - - getResult := func(seqIdx int) *seqMatchResult { - resultMu.Lock() - sr, ok := resultMap[seqIdx] - if !ok { - sr = &seqMatchResult{} - resultMap[seqIdx] = sr - } - resultMu.Unlock() - return sr - } + // Pre-allocated per-sequence results and mutexes. + // Each partition goroutine appends to results[seqIdx] with mus[seqIdx] held. + // Contention is low: a sequence's k-mers span many partitions, but each + // partition processes its queries sequentially and the critical section is tiny. + results := make([][]int, pq.NSeqs) + mus := make([]sync.Mutex, pq.NSeqs) var wg sync.WaitGroup for p := 0; p < P; p++ { - if len(queries[p]) == 0 { + if len(pq.Buckets[p]) == 0 { continue } wg.Add(1) go func(part int) { defer wg.Done() - ksg.matchPartition(setIndex, part, queries[part], getResult) + ksg.matchPartition(setIndex, part, pq.Buckets[part], results, mus) }(p) } wg.Wait() - // Build final result with sorted positions - result := make(MatchResult, len(resultMap)) - for seqIdx, sr := range resultMap { - if len(sr.positions) > 0 { - sort.Ints(sr.positions) - result[seqIdx] = sr.positions + // Sort positions within each sequence + for i := range results { + if len(results[i]) > 1 { + slices.Sort(results[i]) } } - return result + return MatchResult(results) } // matchPartition processes one partition: opens the KDI reader (with index), @@ -133,7 +181,8 @@ func (ksg *KmerSetGroup) matchPartition( setIndex int, partIndex int, queries []QueryEntry, // sorted by Kmer - getResult func(int) *seqMatchResult, + results [][]int, + mus []sync.Mutex, ) { r, err := NewKdiIndexedReader(ksg.partitionPath(setIndex, partIndex)) if err != nil { @@ -161,6 +210,23 @@ func (ksg *KmerSetGroup) matchPartition( for qi < len(queries) { q := queries[qi] + // If the next query is far ahead, re-seek instead of linear scan. + // Only seek if we'd skip more k-mers than the index stride, + // otherwise linear scan through the buffer is faster than a syscall. + if r.index != nil && q.Kmer > currentKmer && r.Remaining() > uint64(r.index.stride) { + _, skipCount, found := r.index.FindOffset(q.Kmer) + if found && skipCount > r.read+uint64(r.index.stride) { + if err := r.SeekTo(q.Kmer); err == nil { + nextKmer, nextOk := r.Next() + if !nextOk { + return + } + currentKmer = nextKmer + ok = true + } + } + } + // Advance KDI stream until >= query kmer for currentKmer < q.Kmer { currentKmer, ok = r.Next() @@ -173,10 +239,10 @@ func (ksg *KmerSetGroup) matchPartition( // Match! Record all queries with this same k-mer value matchedKmer := q.Kmer for qi < len(queries) && queries[qi].Kmer == matchedKmer { - sr := getResult(queries[qi].SeqIdx) - sr.mu.Lock() - sr.positions = append(sr.positions, queries[qi].Pos) - sr.mu.Unlock() + idx := queries[qi].SeqIdx + mus[idx].Lock() + results[idx] = append(results[idx], queries[qi].Pos) + mus[idx].Unlock() qi++ } } else { diff --git a/pkg/obitools/obik/index.go b/pkg/obitools/obik/index.go index 3a1a813..71605c2 100644 --- a/pkg/obitools/obik/index.go +++ b/pkg/obitools/obik/index.go @@ -5,9 +5,13 @@ import ( "fmt" "os" "path/filepath" + "sync" + "sync/atomic" log "github.com/sirupsen/logrus" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obikmer" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert" "github.com/DavidGamba/go-getoptions" @@ -75,22 +79,36 @@ func runIndex(ctx context.Context, opt *getoptions.GetOpt, args []string) error } } - // Read and process sequences + // Read and process sequences in parallel sequences, err := obiconvert.CLIReadBioSequences(args...) if err != nil { return fmt.Errorf("failed to open sequence files: %w", err) } - seqCount := 0 - for sequences.Next() { - batch := sequences.Get() - for _, seq := range batch.Slice() { - builder.AddSequence(0, seq) - seqCount++ + nworkers := obidefault.ParallelWorkers() + var seqCount atomic.Int64 + var wg sync.WaitGroup + + consumer := func(iter obiiter.IBioSequence) { + defer wg.Done() + for iter.Next() { + batch := iter.Get() + for _, seq := range batch.Slice() { + builder.AddSequence(0, seq) + seqCount.Add(1) + } } } - log.Infof("Processed %d sequences", seqCount) + for i := 1; i < nworkers; i++ { + wg.Add(1) + go consumer(sequences.Split()) + } + wg.Add(1) + go consumer(sequences) + wg.Wait() + + log.Infof("Processed %d sequences", seqCount.Load()) // Finalize ksg, err := builder.Close() diff --git a/pkg/obitools/obik/match.go b/pkg/obitools/obik/match.go index 62fc9b8..2ff51bb 100644 --- a/pkg/obitools/obik/match.go +++ b/pkg/obitools/obik/match.go @@ -3,10 +3,12 @@ package obik import ( "context" "fmt" + "sync" log "github.com/sirupsen/logrus" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obikmer" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert" @@ -14,56 +16,47 @@ import ( "github.com/DavidGamba/go-getoptions" ) -// matchSliceWorker creates a SeqSliceWorker that annotates each sequence -// in a batch with k-mer match positions from the index. -// For each set, an attribute "kmer_matched_" is added containing -// a sorted []int of 0-based positions where matched k-mers start. -func matchSliceWorker(ksg *obikmer.KmerSetGroup, setIndices []int) obiseq.SeqSliceWorker { - return func(batch obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) { - if len(batch) == 0 { - return batch, nil - } +// defaultMatchQueryThreshold is the minimum number of k-mer entries to +// accumulate before launching a MatchBatch. Larger values amortize the +// cost of opening .kdi files across more query k-mers. +const defaultMatchQueryThreshold = 10_000_000 - // Build slice of *BioSequence for PrepareQueries - seqs := make([]*obiseq.BioSequence, len(batch)) - for i := range batch { - seqs[i] = batch[i] - } +// preparedBatch pairs a batch with its pre-computed queries. +type preparedBatch struct { + batch obiiter.BioSequenceBatch + seqs []*obiseq.BioSequence + queries *obikmer.PreparedQueries +} - // Prepare queries once (shared across sets) - queries := ksg.PrepareQueries(seqs) - - // Match against each selected set - for _, setIdx := range setIndices { - result := ksg.MatchBatch(setIdx, queries) - - setID := ksg.SetIDOf(setIdx) - if setID == "" { - setID = fmt.Sprintf("set_%d", setIdx) - } - attrName := "kmer_matched_" + setID - - for seqIdx, positions := range result { - if len(positions) > 0 { - batch[seqIdx].SetAttribute(attrName, positions) - } - } - } - - return batch, nil - } +// accumulatedWork holds multiple prepared batches whose queries have been +// merged into a single PreparedQueries. The flat seqs slice allows +// MatchBatch results (indexed by merged SeqIdx) to be mapped back to +// the original sequences. +type accumulatedWork struct { + batches []obiiter.BioSequenceBatch // original batches in order + seqs []*obiseq.BioSequence // flat: seqs from all batches concatenated + queries *obikmer.PreparedQueries // merged queries with rebased SeqIdx } // runMatch implements the "obik match" subcommand. -// It reads sequences, looks up their k-mers in a disk-based index, -// and annotates each sequence with match positions. +// +// Pipeline architecture (no shared mutable state between stages): +// +// [input batches] +// │ Split across nCPU goroutines +// ▼ +// PrepareQueries (CPU, parallel) +// │ preparedCh +// ▼ +// Accumulate & MergeQueries (1 goroutine) +// │ matchCh — fires when totalKmers >= threshold +// ▼ +// MatchBatch + annotate (1 goroutine, internal parallelism per partition) +// │ +// ▼ +// [output batches] func runMatch(ctx context.Context, opt *getoptions.GetOpt, args []string) error { - if len(args) < 1 { - return fmt.Errorf("usage: obik match [options] [sequence_files...]") - } - - indexDir := args[0] - seqArgs := args[1:] + indexDir := CLIIndexDirectory() // Open the k-mer index ksg, err := obikmer.OpenKmerSetGroup(indexDir) @@ -86,14 +79,12 @@ func runMatch(ctx context.Context, opt *getoptions.GetOpt, args []string) error return fmt.Errorf("no sets match the given patterns") } } else { - // All sets setIndices = make([]int, ksg.Size()) for i := range setIndices { setIndices[i] = i } } - // Log which sets we'll match for _, idx := range setIndices { id := ksg.SetIDOf(idx) if id == "" { @@ -102,21 +93,128 @@ func runMatch(ctx context.Context, opt *getoptions.GetOpt, args []string) error log.Infof("Matching against set %d (%s): %d k-mers", idx, id, ksg.Len(idx)) } - // Read sequences - sequences, err := obiconvert.CLIReadBioSequences(seqArgs...) + // Read input sequences + sequences, err := obiconvert.CLIReadBioSequences(args...) if err != nil { return fmt.Errorf("failed to open sequence files: %w", err) } - // Apply the batch worker - worker := matchSliceWorker(ksg, setIndices) - matched := sequences.MakeISliceWorker( - worker, - false, - obidefault.ParallelWorkers(), - ) + nworkers := obidefault.ParallelWorkers() - obiconvert.CLIWriteBioSequences(matched, true) + // --- Stage 1: Prepare queries in parallel --- + preparedCh := make(chan preparedBatch, nworkers) + + var prepWg sync.WaitGroup + preparer := func(iter obiiter.IBioSequence) { + defer prepWg.Done() + for iter.Next() { + batch := iter.Get() + slice := batch.Slice() + + seqs := make([]*obiseq.BioSequence, len(slice)) + for i, s := range slice { + seqs[i] = s + } + + pq := ksg.PrepareQueries(seqs) + + preparedCh <- preparedBatch{ + batch: batch, + seqs: seqs, + queries: pq, + } + } + } + + for i := 1; i < nworkers; i++ { + prepWg.Add(1) + go preparer(sequences.Split()) + } + prepWg.Add(1) + go preparer(sequences) + + go func() { + prepWg.Wait() + close(preparedCh) + }() + + // --- Stage 2: Accumulate & merge queries --- + matchCh := make(chan *accumulatedWork, 2) + + go func() { + defer close(matchCh) + + var acc *accumulatedWork + + for pb := range preparedCh { + if acc == nil { + acc = &accumulatedWork{ + batches: []obiiter.BioSequenceBatch{pb.batch}, + seqs: pb.seqs, + queries: pb.queries, + } + } else { + // Merge this batch's queries into the accumulator + obikmer.MergeQueries(acc.queries, pb.queries) + acc.batches = append(acc.batches, pb.batch) + acc.seqs = append(acc.seqs, pb.seqs...) + } + + // Flush when we exceed the threshold + if acc.queries.NKmers >= defaultMatchQueryThreshold { + matchCh <- acc + acc = nil + } + } + + // Flush remaining + if acc != nil { + matchCh <- acc + } + }() + + // --- Stage 3: Match & annotate --- + output := obiiter.MakeIBioSequence() + if sequences.IsPaired() { + output.MarkAsPaired() + } + + output.Add(1) + go func() { + defer output.Done() + + for work := range matchCh { + // Match against each selected set + for _, setIdx := range setIndices { + result := ksg.MatchBatch(setIdx, work.queries) + + setID := ksg.SetIDOf(setIdx) + if setID == "" { + setID = fmt.Sprintf("set_%d", setIdx) + } + attrName := "kmer_matched_" + setID + + for seqIdx, positions := range result { + if len(positions) > 0 { + work.seqs[seqIdx].SetAttribute(attrName, positions) + } + } + } + + // Push annotated batches to output + for _, b := range work.batches { + output.Push(b) + } + + // Help GC + work.seqs = nil + work.queries = nil + } + }() + + go output.WaitAndClose() + + obiconvert.CLIWriteBioSequences(output, true) obiutils.WaitForLastPipe() return nil diff --git a/pkg/obitools/obik/obik.go b/pkg/obitools/obik/obik.go index 5fc0176..28441d4 100644 --- a/pkg/obitools/obik/obik.go +++ b/pkg/obitools/obik/obik.go @@ -70,6 +70,7 @@ func OptionSet(opt *getoptions.GetOpt) { // match: annotate sequences with k-mer match positions from an index matchCmd := opt.NewCommand("match", "Annotate sequences with k-mer match positions from an index") + IndexDirectoryOptionSet(matchCmd) obiconvert.InputOptionSet(matchCmd) obiconvert.OutputOptionSet(matchCmd) SetSelectionOptionSet(matchCmd) diff --git a/pkg/obitools/obik/options.go b/pkg/obitools/obik/options.go index 608ab76..5ad6d1d 100644 --- a/pkg/obitools/obik/options.go +++ b/pkg/obitools/obik/options.go @@ -280,6 +280,26 @@ func CLIKeepShorter() bool { return _keepShorter } +// ============================== +// Match-specific options +// ============================== + +var _indexDirectory = "" + +// IndexDirectoryOptionSet registers --index / -i (mandatory directory for match). +func IndexDirectoryOptionSet(options *getoptions.GetOpt) { + options.StringVar(&_indexDirectory, "index", _indexDirectory, + options.Alias("i"), + options.Required(), + options.ArgName("DIRECTORY"), + options.Description("Path to the kmer index directory.")) +} + +// CLIIndexDirectory returns the --index directory path. +func CLIIndexDirectory() string { + return _indexDirectory +} + // CLIIndexEntropyThreshold returns the entropy filter threshold for index building (0 = disabled). func CLIIndexEntropyThreshold() float64 { return _indexEntropyThreshold