2026-02-10 13:23:56 +01:00
|
|
|
package obikmer
|
|
|
|
|
|
|
|
|
|
import (
|
2026-02-10 22:10:22 +01:00
|
|
|
"cmp"
|
|
|
|
|
"slices"
|
2026-02-10 13:23:56 +01:00
|
|
|
"sync"
|
|
|
|
|
|
|
|
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// QueryEntry represents a canonical k-mer to look up, together with
// metadata to trace the result back to the originating sequence and position.
//
// Entries are grouped into per-partition buckets (see PreparedQueries) and
// sorted by Kmer so they can be merge-scanned against a sorted KDI stream.
type QueryEntry struct {
	Kmer   uint64 // canonical k-mer value
	SeqIdx int    // index of the originating sequence within the batch
	Pos    int    // 1-based position of the k-mer in the sequence
}
|
|
|
|
|
|
2026-02-10 22:10:22 +01:00
|
|
|
// MatchResult holds matched positions for each sequence in a batch.
// results[i] contains the sorted matched positions (1-based) for sequence i;
// a nil entry means sequence i had no matches.
type MatchResult [][]int
|
2026-02-10 13:23:56 +01:00
|
|
|
|
2026-02-10 22:10:22 +01:00
|
|
|
// PreparedQueries holds pre-computed query buckets along with the number
// of sequences they were built from. This is used by the accumulation
// pipeline to merge queries from multiple batches.
type PreparedQueries struct {
	Buckets [][]QueryEntry // queries[partition], each sorted by Kmer
	NSeqs   int            // number of sequences that produced these queries
	NKmers  int            // total number of k-mer entries across all partitions
}
|
|
|
|
|
|
|
|
|
|
// MergeQueries merges src into dst, offsetting all SeqIdx values in src
|
|
|
|
|
// by dst.NSeqs. Both dst and src must have the same number of partitions.
|
|
|
|
|
// After merging, src should not be reused.
|
|
|
|
|
//
|
|
|
|
|
// Each partition's entries are merged in sorted order (merge-sort of two
|
|
|
|
|
// already-sorted slices).
|
|
|
|
|
func MergeQueries(dst, src *PreparedQueries) {
|
|
|
|
|
for p := range dst.Buckets {
|
|
|
|
|
if len(src.Buckets[p]) == 0 {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
offset := dst.NSeqs
|
|
|
|
|
srcB := src.Buckets[p]
|
|
|
|
|
|
|
|
|
|
// Offset SeqIdx in src entries
|
|
|
|
|
for i := range srcB {
|
|
|
|
|
srcB[i].SeqIdx += offset
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(dst.Buckets[p]) == 0 {
|
|
|
|
|
dst.Buckets[p] = srcB
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Merge two sorted slices
|
|
|
|
|
dstB := dst.Buckets[p]
|
|
|
|
|
merged := make([]QueryEntry, 0, len(dstB)+len(srcB))
|
|
|
|
|
i, j := 0, 0
|
|
|
|
|
for i < len(dstB) && j < len(srcB) {
|
|
|
|
|
if dstB[i].Kmer <= srcB[j].Kmer {
|
|
|
|
|
merged = append(merged, dstB[i])
|
|
|
|
|
i++
|
|
|
|
|
} else {
|
|
|
|
|
merged = append(merged, srcB[j])
|
|
|
|
|
j++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
merged = append(merged, dstB[i:]...)
|
|
|
|
|
merged = append(merged, srcB[j:]...)
|
|
|
|
|
dst.Buckets[p] = merged
|
|
|
|
|
}
|
|
|
|
|
dst.NSeqs += src.NSeqs
|
|
|
|
|
dst.NKmers += src.NKmers
|
2026-02-10 13:23:56 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// PrepareQueries extracts all canonical k-mers from a batch of sequences
|
|
|
|
|
// and groups them by partition using super-kmer minimizers.
|
|
|
|
|
//
|
2026-02-10 22:10:22 +01:00
|
|
|
// Returns a PreparedQueries with sorted per-partition buckets.
|
|
|
|
|
func (ksg *KmerSetGroup) PrepareQueries(sequences []*obiseq.BioSequence) *PreparedQueries {
|
2026-02-10 13:23:56 +01:00
|
|
|
P := ksg.partitions
|
|
|
|
|
k := ksg.k
|
|
|
|
|
m := ksg.m
|
|
|
|
|
|
|
|
|
|
// Pre-allocate partition buckets
|
|
|
|
|
buckets := make([][]QueryEntry, P)
|
|
|
|
|
for i := range buckets {
|
|
|
|
|
buckets[i] = make([]QueryEntry, 0, 64)
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 22:10:22 +01:00
|
|
|
totalKmers := 0
|
2026-02-10 13:23:56 +01:00
|
|
|
for seqIdx, seq := range sequences {
|
|
|
|
|
bseq := seq.Sequence()
|
|
|
|
|
if len(bseq) < k {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Iterate super-kmers to get minimizer → partition mapping
|
|
|
|
|
for sk := range IterSuperKmers(bseq, k, m) {
|
|
|
|
|
partition := int(sk.Minimizer % uint64(P))
|
|
|
|
|
|
|
|
|
|
// Iterate canonical k-mers within this super-kmer
|
|
|
|
|
skSeq := sk.Sequence
|
|
|
|
|
if len(skSeq) < k {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
localPos := 0
|
|
|
|
|
for kmer := range IterCanonicalKmers(skSeq, k) {
|
|
|
|
|
buckets[partition] = append(buckets[partition], QueryEntry{
|
|
|
|
|
Kmer: kmer,
|
|
|
|
|
SeqIdx: seqIdx,
|
2026-02-10 22:10:22 +01:00
|
|
|
Pos: sk.Start + localPos + 1,
|
2026-02-10 13:23:56 +01:00
|
|
|
})
|
|
|
|
|
localPos++
|
2026-02-10 22:10:22 +01:00
|
|
|
totalKmers++
|
2026-02-10 13:23:56 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Sort each bucket by k-mer value for merge-scan
|
|
|
|
|
for p := range buckets {
|
2026-02-10 22:10:22 +01:00
|
|
|
slices.SortFunc(buckets[p], func(a, b QueryEntry) int {
|
|
|
|
|
return cmp.Compare(a.Kmer, b.Kmer)
|
2026-02-10 13:23:56 +01:00
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 22:10:22 +01:00
|
|
|
return &PreparedQueries{
|
|
|
|
|
Buckets: buckets,
|
|
|
|
|
NSeqs: len(sequences),
|
|
|
|
|
NKmers: totalKmers,
|
|
|
|
|
}
|
2026-02-10 13:23:56 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MatchBatch looks up pre-sorted queries against one set of the index.
|
|
|
|
|
// Partitions are processed in parallel. For each partition, a merge-scan
|
|
|
|
|
// compares the sorted queries against the sorted KDI stream.
|
|
|
|
|
//
|
2026-02-10 22:10:22 +01:00
|
|
|
// Returns a MatchResult where result[i] contains sorted matched positions
|
|
|
|
|
// for sequence i.
|
|
|
|
|
func (ksg *KmerSetGroup) MatchBatch(setIndex int, pq *PreparedQueries) MatchResult {
|
2026-02-10 13:23:56 +01:00
|
|
|
P := ksg.partitions
|
|
|
|
|
|
2026-02-10 22:10:22 +01:00
|
|
|
// Pre-allocated per-sequence results and mutexes.
|
|
|
|
|
// Each partition goroutine appends to results[seqIdx] with mus[seqIdx] held.
|
|
|
|
|
// Contention is low: a sequence's k-mers span many partitions, but each
|
|
|
|
|
// partition processes its queries sequentially and the critical section is tiny.
|
|
|
|
|
results := make([][]int, pq.NSeqs)
|
|
|
|
|
mus := make([]sync.Mutex, pq.NSeqs)
|
2026-02-10 13:23:56 +01:00
|
|
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
|
|
|
|
|
for p := 0; p < P; p++ {
|
2026-02-10 22:10:22 +01:00
|
|
|
if len(pq.Buckets[p]) == 0 {
|
2026-02-10 13:23:56 +01:00
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func(part int) {
|
|
|
|
|
defer wg.Done()
|
2026-02-10 22:10:22 +01:00
|
|
|
ksg.matchPartition(setIndex, part, pq.Buckets[part], results, mus)
|
2026-02-10 13:23:56 +01:00
|
|
|
}(p)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
|
|
2026-02-10 22:10:22 +01:00
|
|
|
// Sort positions within each sequence
|
|
|
|
|
for i := range results {
|
|
|
|
|
if len(results[i]) > 1 {
|
|
|
|
|
slices.Sort(results[i])
|
2026-02-10 13:23:56 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 22:10:22 +01:00
|
|
|
return MatchResult(results)
|
2026-02-10 13:23:56 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// matchPartition processes one partition: opens the KDI reader (with index),
// seeks to the first query, then merge-scans queries against the KDI stream.
//
// queries must be sorted ascending by Kmer (guaranteed by PrepareQueries /
// MergeQueries). For each query k-mer found in the stream, the query's Pos
// is appended to results[SeqIdx] under mus[SeqIdx], so this function is safe
// to run concurrently with the other partitions' goroutines.
//
// NOTE(review): open, seek, and stream-exhaustion errors all return silently,
// so a failed partition simply contributes no matches — confirm this
// best-effort behavior is intended rather than something callers should see.
func (ksg *KmerSetGroup) matchPartition(
	setIndex int,
	partIndex int,
	queries []QueryEntry, // sorted by Kmer
	results [][]int, // shared per-sequence outputs, indexed by SeqIdx
	mus []sync.Mutex, // one mutex per sequence, guarding results[i]
) {
	r, err := NewKdiIndexedReader(ksg.partitionPath(setIndex, partIndex))
	if err != nil {
		return
	}
	defer r.Close()

	// Nothing to do for an empty partition file or an empty query bucket.
	if r.Count() == 0 || len(queries) == 0 {
		return
	}

	// Seek to the first query's neighborhood
	if err := r.SeekTo(queries[0].Kmer); err != nil {
		return
	}

	// Read first kmer from the stream after seek
	currentKmer, ok := r.Next()
	if !ok {
		return
	}

	qi := 0 // query index

	for qi < len(queries) {
		q := queries[qi]

		// If the next query is far ahead, re-seek instead of linear scan.
		// Only seek if we'd skip more k-mers than the index stride,
		// otherwise linear scan through the buffer is faster than a syscall.
		// If the re-seek fails we fall through to the linear scan below,
		// which still makes progress from the current stream position.
		if r.index != nil && q.Kmer > currentKmer && r.Remaining() > uint64(r.index.stride) {
			_, skipCount, found := r.index.FindOffset(q.Kmer)
			if found && skipCount > r.read+uint64(r.index.stride) {
				if err := r.SeekTo(q.Kmer); err == nil {
					nextKmer, nextOk := r.Next()
					if !nextOk {
						return
					}
					currentKmer = nextKmer
					ok = true
				}
			}
		}

		// Advance KDI stream until >= query kmer
		for currentKmer < q.Kmer {
			currentKmer, ok = r.Next()
			if !ok {
				return // KDI exhausted
			}
		}

		if currentKmer == q.Kmer {
			// Match! Record all queries with this same k-mer value
			// (duplicates arise when the k-mer occurs at several
			// positions or in several sequences of the batch).
			matchedKmer := q.Kmer
			for qi < len(queries) && queries[qi].Kmer == matchedKmer {
				idx := queries[qi].SeqIdx
				mus[idx].Lock()
				results[idx] = append(results[idx], queries[qi].Pos)
				mus[idx].Unlock()
				qi++
			}
		} else {
			// currentKmer > q.Kmer: skip all queries with this kmer value
			skippedKmer := q.Kmer
			for qi < len(queries) && queries[qi].Kmer == skippedKmer {
				qi++
			}
		}
	}
}
|