Refactor k-mer matching pipeline with improved concurrency and memory management

Refactor k-mer matching to use a pipeline architecture with improved concurrency and memory management:

- Replace sort.Slice with slices.SortFunc and cmp.Compare for better performance
- Introduce PreparedQueries struct to encapsulate query buckets with metadata
- Implement MergeQueries function to merge query buckets from multiple batches
- Rewrite MatchBatch to use pre-allocated results and mutexes instead of map-based accumulation
- Add seek optimization in matchPartition to reduce linear scanning
- Refactor match command to use a multi-stage pipeline with proper batching and merging
- Add index directory option for match command
- Improve parallel processing of sequence batches

This refactoring improves performance by reducing memory allocations, optimizing k-mer lookup, and implementing a more efficient pipeline for large-scale k-mer matching operations.
This commit is contained in:
Eric Coissac
2026-02-10 22:10:22 +01:00
parent bebbbbfe7d
commit ac41dd8a22
5 changed files with 311 additions and 108 deletions

View File

@@ -3,10 +3,12 @@ package obik
import (
"context"
"fmt"
"sync"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obikmer"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
@@ -14,56 +16,47 @@ import (
"github.com/DavidGamba/go-getoptions"
)
// matchSliceWorker creates a SeqSliceWorker that annotates each sequence
// in a batch with k-mer match positions from the index.
// For each set, an attribute "kmer_matched_<setID>" is added containing
// a sorted []int of 0-based positions where matched k-mers start.
func matchSliceWorker(ksg *obikmer.KmerSetGroup, setIndices []int) obiseq.SeqSliceWorker {
return func(batch obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) {
if len(batch) == 0 {
return batch, nil
}
// defaultMatchQueryThreshold is the minimum number of k-mer entries to
// accumulate before launching a MatchBatch. The accumulator stage compares
// PreparedQueries.NKmers against this value and flushes once it is reached.
// Larger values amortize the cost of opening .kdi files across more query
// k-mers, at the price of holding more batches in memory before matching.
const defaultMatchQueryThreshold = 10_000_000
// Build slice of *BioSequence for PrepareQueries
seqs := make([]*obiseq.BioSequence, len(batch))
for i := range batch {
seqs[i] = batch[i]
}
// preparedBatch pairs an input batch with its pre-computed k-mer queries.
// It is produced by the parallel "prepare" stage of the match pipeline and
// consumed by the accumulator stage, which merges queries across batches.
type preparedBatch struct {
	batch   obiiter.BioSequenceBatch  // the original input batch, forwarded unchanged to the output
	seqs    []*obiseq.BioSequence     // flat copy of batch.Slice(); index-aligned with query SeqIdx
	queries *obikmer.PreparedQueries  // result of ksg.PrepareQueries(seqs) for this batch alone
}
// Prepare queries once (shared across sets)
queries := ksg.PrepareQueries(seqs)
// Match against each selected set
for _, setIdx := range setIndices {
result := ksg.MatchBatch(setIdx, queries)
setID := ksg.SetIDOf(setIdx)
if setID == "" {
setID = fmt.Sprintf("set_%d", setIdx)
}
attrName := "kmer_matched_" + setID
for seqIdx, positions := range result {
if len(positions) > 0 {
batch[seqIdx].SetAttribute(attrName, positions)
}
}
}
return batch, nil
}
// accumulatedWork holds multiple prepared batches whose queries have been
// merged into a single PreparedQueries via MergeQueries. The flat seqs
// slice allows MatchBatch results (indexed by the merged SeqIdx) to be
// mapped back to the original sequences, while batches preserves the
// original batch boundaries so annotated batches can be pushed downstream
// in arrival order.
type accumulatedWork struct {
	batches []obiiter.BioSequenceBatch // original batches in order
	seqs    []*obiseq.BioSequence      // flat: seqs from all batches concatenated, in batch order
	queries *obikmer.PreparedQueries   // merged queries with rebased SeqIdx
}
// runMatch implements the "obik match" subcommand.
// It reads sequences, looks up their k-mers in a disk-based index,
// and annotates each sequence with match positions.
//
// Pipeline architecture (no shared mutable state between stages):
//
// [input batches]
// │ Split across nCPU goroutines
// ▼
// PrepareQueries (CPU, parallel)
// │ preparedCh
// ▼
// Accumulate & MergeQueries (1 goroutine)
// │ matchCh — fires when totalKmers >= threshold
// ▼
// MatchBatch + annotate (1 goroutine, internal parallelism per partition)
// │
// ▼
// [output batches]
func runMatch(ctx context.Context, opt *getoptions.GetOpt, args []string) error {
if len(args) < 1 {
return fmt.Errorf("usage: obik match [options] <index_directory> [sequence_files...]")
}
indexDir := args[0]
seqArgs := args[1:]
indexDir := CLIIndexDirectory()
// Open the k-mer index
ksg, err := obikmer.OpenKmerSetGroup(indexDir)
@@ -86,14 +79,12 @@ func runMatch(ctx context.Context, opt *getoptions.GetOpt, args []string) error
return fmt.Errorf("no sets match the given patterns")
}
} else {
// All sets
setIndices = make([]int, ksg.Size())
for i := range setIndices {
setIndices[i] = i
}
}
// Log which sets we'll match
for _, idx := range setIndices {
id := ksg.SetIDOf(idx)
if id == "" {
@@ -102,21 +93,128 @@ func runMatch(ctx context.Context, opt *getoptions.GetOpt, args []string) error
log.Infof("Matching against set %d (%s): %d k-mers", idx, id, ksg.Len(idx))
}
// Read sequences
sequences, err := obiconvert.CLIReadBioSequences(seqArgs...)
// Read input sequences
sequences, err := obiconvert.CLIReadBioSequences(args...)
if err != nil {
return fmt.Errorf("failed to open sequence files: %w", err)
}
// Apply the batch worker
worker := matchSliceWorker(ksg, setIndices)
matched := sequences.MakeISliceWorker(
worker,
false,
obidefault.ParallelWorkers(),
)
nworkers := obidefault.ParallelWorkers()
obiconvert.CLIWriteBioSequences(matched, true)
// --- Stage 1: Prepare queries in parallel ---
preparedCh := make(chan preparedBatch, nworkers)
var prepWg sync.WaitGroup
preparer := func(iter obiiter.IBioSequence) {
defer prepWg.Done()
for iter.Next() {
batch := iter.Get()
slice := batch.Slice()
seqs := make([]*obiseq.BioSequence, len(slice))
for i, s := range slice {
seqs[i] = s
}
pq := ksg.PrepareQueries(seqs)
preparedCh <- preparedBatch{
batch: batch,
seqs: seqs,
queries: pq,
}
}
}
for i := 1; i < nworkers; i++ {
prepWg.Add(1)
go preparer(sequences.Split())
}
prepWg.Add(1)
go preparer(sequences)
go func() {
prepWg.Wait()
close(preparedCh)
}()
// --- Stage 2: Accumulate & merge queries ---
matchCh := make(chan *accumulatedWork, 2)
go func() {
defer close(matchCh)
var acc *accumulatedWork
for pb := range preparedCh {
if acc == nil {
acc = &accumulatedWork{
batches: []obiiter.BioSequenceBatch{pb.batch},
seqs: pb.seqs,
queries: pb.queries,
}
} else {
// Merge this batch's queries into the accumulator
obikmer.MergeQueries(acc.queries, pb.queries)
acc.batches = append(acc.batches, pb.batch)
acc.seqs = append(acc.seqs, pb.seqs...)
}
// Flush when we exceed the threshold
if acc.queries.NKmers >= defaultMatchQueryThreshold {
matchCh <- acc
acc = nil
}
}
// Flush remaining
if acc != nil {
matchCh <- acc
}
}()
// --- Stage 3: Match & annotate ---
output := obiiter.MakeIBioSequence()
if sequences.IsPaired() {
output.MarkAsPaired()
}
output.Add(1)
go func() {
defer output.Done()
for work := range matchCh {
// Match against each selected set
for _, setIdx := range setIndices {
result := ksg.MatchBatch(setIdx, work.queries)
setID := ksg.SetIDOf(setIdx)
if setID == "" {
setID = fmt.Sprintf("set_%d", setIdx)
}
attrName := "kmer_matched_" + setID
for seqIdx, positions := range result {
if len(positions) > 0 {
work.seqs[seqIdx].SetAttribute(attrName, positions)
}
}
}
// Push annotated batches to output
for _, b := range work.batches {
output.Push(b)
}
// Help GC
work.seqs = nil
work.queries = nil
}
}()
go output.WaitAndClose()
obiconvert.CLIWriteBioSequences(output, true)
obiutils.WaitForLastPipe()
return nil