mirror of
https://github.com/metabarcoding/obitools4.git
synced 2026-03-25 13:30:52 +00:00
Add sparse index support for KDI files with fast seeking
This commit introduces sparse index support for KDI files to enable fast random access during k-mer matching. It adds a new .kdx index file format and updates the KDI reader and writer to handle index creation and seeking. The changes include: - New KdxIndex struct and related functions for loading, searching, and writing .kdx files - Modified KdiReader to support seeking with the new index - Updated KdiWriter to create .kdx index files during writing - Enhanced KmerSetGroup.Contains to use the new index for faster lookups - Added a new 'match' command to annotate sequences with k-mer match positions The index is created automatically during KDI file creation and allows for O(log N / stride) binary search followed by at most stride linear scan steps, significantly improving performance for large datasets.
This commit is contained in:
@@ -16,10 +16,29 @@ type KdiReader struct {
|
|||||||
read uint64 // number of k-mers already consumed
|
read uint64 // number of k-mers already consumed
|
||||||
prev uint64 // last decoded value
|
prev uint64 // last decoded value
|
||||||
started bool // whether first value has been read
|
started bool // whether first value has been read
|
||||||
|
index *KdxIndex // optional sparse index for seeking
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKdiReader opens a .kdi file for streaming reading.
|
// NewKdiReader opens a .kdi file for streaming reading (no index).
|
||||||
func NewKdiReader(path string) (*KdiReader, error) {
|
func NewKdiReader(path string) (*KdiReader, error) {
|
||||||
|
return openKdiReader(path, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewKdiIndexedReader opens a .kdi file with its companion .kdx index
|
||||||
|
// loaded for fast seeking. If the .kdx file does not exist, it gracefully
|
||||||
|
// falls back to sequential reading.
|
||||||
|
func NewKdiIndexedReader(path string) (*KdiReader, error) {
|
||||||
|
kdxPath := KdxPathForKdi(path)
|
||||||
|
idx, err := LoadKdxIndex(kdxPath)
|
||||||
|
if err != nil {
|
||||||
|
// Index load failed — fall back to non-indexed
|
||||||
|
return openKdiReader(path, nil)
|
||||||
|
}
|
||||||
|
// idx may be nil if file does not exist — that's fine
|
||||||
|
return openKdiReader(path, idx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func openKdiReader(path string, idx *KdxIndex) (*KdiReader, error) {
|
||||||
f, err := os.Open(path)
|
f, err := os.Open(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -49,6 +68,7 @@ func NewKdiReader(path string) (*KdiReader, error) {
|
|||||||
r: r,
|
r: r,
|
||||||
file: f,
|
file: f,
|
||||||
count: count,
|
count: count,
|
||||||
|
index: idx,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,6 +100,60 @@ func (kr *KdiReader) Next() (uint64, bool) {
|
|||||||
return kr.prev, true
|
return kr.prev, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SeekTo positions the reader near the target k-mer using the sparse .kdx index.
|
||||||
|
// After SeekTo, the reader is positioned so that the next call to Next()
|
||||||
|
// returns the k-mer immediately after the indexed entry at or before target.
|
||||||
|
//
|
||||||
|
// If the reader has no index, or the target is before the current position,
|
||||||
|
// SeekTo does nothing (linear scan continues from current position).
|
||||||
|
func (kr *KdiReader) SeekTo(target uint64) error {
|
||||||
|
if kr.index == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we've already passed the target, we can't seek backwards
|
||||||
|
if kr.started && kr.prev >= target {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
offset, skipCount, ok := kr.index.FindOffset(target)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// skipCount is the number of k-mers consumed at the indexed position.
|
||||||
|
// The index was recorded AFTER writing the k-mer at position skipCount-1
|
||||||
|
// (since count%stride==0 after incrementing count). So the actual number
|
||||||
|
// of k-mers consumed is skipCount (the entry's kmer is the last one
|
||||||
|
// before the offset).
|
||||||
|
|
||||||
|
// Only seek if it would skip significant work
|
||||||
|
if kr.started && skipCount <= kr.read {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// The index entry stores (kmer_value, byte_offset_after_that_kmer).
|
||||||
|
// skipCount = (entryIdx+1)*stride, so entryIdx = skipCount/stride - 1
|
||||||
|
// We seek to that offset, set prev = indexedKmer, and the next Next()
|
||||||
|
// call will read the delta-varint of the following k-mer.
|
||||||
|
entryIdx := int(skipCount)/kr.index.stride - 1
|
||||||
|
if entryIdx < 0 || entryIdx >= len(kr.index.entries) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
indexedKmer := kr.index.entries[entryIdx].kmer
|
||||||
|
|
||||||
|
if _, err := kr.file.Seek(int64(offset), io.SeekStart); err != nil {
|
||||||
|
return fmt.Errorf("kdi: seek: %w", err)
|
||||||
|
}
|
||||||
|
kr.r.Reset(kr.file)
|
||||||
|
|
||||||
|
kr.prev = indexedKmer
|
||||||
|
kr.started = true
|
||||||
|
kr.read = skipCount
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Count returns the total number of k-mers in this partition.
|
// Count returns the total number of k-mers in this partition.
|
||||||
func (kr *KdiReader) Count() uint64 {
|
func (kr *KdiReader) Count() uint64 {
|
||||||
return kr.count
|
return kr.count
|
||||||
|
|||||||
@@ -9,6 +9,9 @@ import (
|
|||||||
// KDI file magic bytes: "KDI\x01"
|
// KDI file magic bytes: "KDI\x01"
|
||||||
var kdiMagic = [4]byte{'K', 'D', 'I', 0x01}
|
var kdiMagic = [4]byte{'K', 'D', 'I', 0x01}
|
||||||
|
|
||||||
|
// kdiHeaderSize is the size of the KDI header: magic(4) + count(8) = 12 bytes.
|
||||||
|
const kdiHeaderSize = 12
|
||||||
|
|
||||||
// KdiWriter writes a sorted sequence of uint64 k-mers to a .kdi file
|
// KdiWriter writes a sorted sequence of uint64 k-mers to a .kdi file
|
||||||
// using delta-varint encoding.
|
// using delta-varint encoding.
|
||||||
//
|
//
|
||||||
@@ -22,6 +25,9 @@ var kdiMagic = [4]byte{'K', 'D', 'I', 0x01}
|
|||||||
// ...
|
// ...
|
||||||
//
|
//
|
||||||
// The caller must write k-mers in strictly increasing order.
|
// The caller must write k-mers in strictly increasing order.
|
||||||
|
//
|
||||||
|
// On Close(), a companion .kdx sparse index file is written alongside
|
||||||
|
// the .kdi file for fast random access.
|
||||||
type KdiWriter struct {
|
type KdiWriter struct {
|
||||||
w *bufio.Writer
|
w *bufio.Writer
|
||||||
file *os.File
|
file *os.File
|
||||||
@@ -29,6 +35,8 @@ type KdiWriter struct {
|
|||||||
prev uint64
|
prev uint64
|
||||||
first bool
|
first bool
|
||||||
path string
|
path string
|
||||||
|
bytesWritten uint64 // bytes written after header (data section offset)
|
||||||
|
indexEntries []kdxEntry // sparse index entries collected during writes
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKdiWriter creates a new KdiWriter writing to the given file path.
|
// NewKdiWriter creates a new KdiWriter writing to the given file path.
|
||||||
@@ -58,6 +66,8 @@ func NewKdiWriter(path string) (*KdiWriter, error) {
|
|||||||
file: f,
|
file: f,
|
||||||
first: true,
|
first: true,
|
||||||
path: path,
|
path: path,
|
||||||
|
bytesWritten: 0,
|
||||||
|
indexEntries: make([]kdxEntry, 0, 256),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -71,16 +81,32 @@ func (kw *KdiWriter) Write(kmer uint64) error {
|
|||||||
if _, err := kw.w.Write(buf[:]); err != nil {
|
if _, err := kw.w.Write(buf[:]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
kw.bytesWritten += 8
|
||||||
kw.prev = kmer
|
kw.prev = kmer
|
||||||
kw.first = false
|
kw.first = false
|
||||||
} else {
|
} else {
|
||||||
delta := kmer - kw.prev
|
delta := kmer - kw.prev
|
||||||
if _, err := EncodeVarint(kw.w, delta); err != nil {
|
n, err := EncodeVarint(kw.w, delta)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
kw.bytesWritten += uint64(n)
|
||||||
kw.prev = kmer
|
kw.prev = kmer
|
||||||
}
|
}
|
||||||
kw.count++
|
kw.count++
|
||||||
|
|
||||||
|
// Record sparse index entry every defaultKdxStride k-mers.
|
||||||
|
// The offset recorded is AFTER writing this k-mer, so it points to
|
||||||
|
// where the next k-mer's data will start. SeekTo uses this: it seeks
|
||||||
|
// to the recorded offset, sets prev = indexedKmer, and Next() reads
|
||||||
|
// the delta of the following k-mer.
|
||||||
|
if kw.count%defaultKdxStride == 0 {
|
||||||
|
kw.indexEntries = append(kw.indexEntries, kdxEntry{
|
||||||
|
kmer: kmer,
|
||||||
|
offset: kdiHeaderSize + kw.bytesWritten,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,7 +116,7 @@ func (kw *KdiWriter) Count() uint64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Close flushes buffered data, patches the count in the header,
|
// Close flushes buffered data, patches the count in the header,
|
||||||
// and closes the file.
|
// writes the companion .kdx index file, and closes the file.
|
||||||
func (kw *KdiWriter) Close() error {
|
func (kw *KdiWriter) Close() error {
|
||||||
if err := kw.w.Flush(); err != nil {
|
if err := kw.w.Flush(); err != nil {
|
||||||
kw.file.Close()
|
kw.file.Close()
|
||||||
@@ -109,5 +135,17 @@ func (kw *KdiWriter) Close() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return kw.file.Close()
|
if err := kw.file.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write .kdx index file if there are entries to index
|
||||||
|
if len(kw.indexEntries) > 0 {
|
||||||
|
kdxPath := KdxPathForKdi(kw.path)
|
||||||
|
if err := WriteKdxIndex(kdxPath, defaultKdxStride, kw.indexEntries); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
170
pkg/obikmer/kdx.go
Normal file
170
pkg/obikmer/kdx.go
Normal file
@@ -0,0 +1,170 @@
|
|||||||
|
package obikmer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// KDX file magic bytes: "KDX\x01"
|
||||||
|
var kdxMagic = [4]byte{'K', 'D', 'X', 0x01}
|
||||||
|
|
||||||
|
// defaultKdxStride is the number of k-mers between consecutive index entries.
|
||||||
|
const defaultKdxStride = 4096
|
||||||
|
|
||||||
|
// kdxEntry is a single entry in the sparse index: the absolute k-mer value
|
||||||
|
// and the byte offset in the corresponding .kdi file where that k-mer is stored.
|
||||||
|
type kdxEntry struct {
|
||||||
|
kmer uint64
|
||||||
|
offset uint64 // absolute byte offset in .kdi file
|
||||||
|
}
|
||||||
|
|
||||||
|
// KdxIndex is a sparse, in-memory index for a .kdi file.
|
||||||
|
// It stores one entry every `stride` k-mers, enabling O(log N / stride)
|
||||||
|
// binary search followed by at most `stride` linear scan steps.
|
||||||
|
type KdxIndex struct {
|
||||||
|
stride int
|
||||||
|
entries []kdxEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadKdxIndex reads a .kdx file into memory.
|
||||||
|
// Returns (nil, nil) if the file does not exist (graceful degradation).
|
||||||
|
func LoadKdxIndex(path string) (*KdxIndex, error) {
|
||||||
|
f, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
// Read magic
|
||||||
|
var magic [4]byte
|
||||||
|
if _, err := io.ReadFull(f, magic[:]); err != nil {
|
||||||
|
return nil, fmt.Errorf("kdx: read magic: %w", err)
|
||||||
|
}
|
||||||
|
if magic != kdxMagic {
|
||||||
|
return nil, fmt.Errorf("kdx: bad magic %v", magic)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read stride (uint32 LE)
|
||||||
|
var buf4 [4]byte
|
||||||
|
if _, err := io.ReadFull(f, buf4[:]); err != nil {
|
||||||
|
return nil, fmt.Errorf("kdx: read stride: %w", err)
|
||||||
|
}
|
||||||
|
stride := int(binary.LittleEndian.Uint32(buf4[:]))
|
||||||
|
|
||||||
|
// Read count (uint32 LE)
|
||||||
|
if _, err := io.ReadFull(f, buf4[:]); err != nil {
|
||||||
|
return nil, fmt.Errorf("kdx: read count: %w", err)
|
||||||
|
}
|
||||||
|
count := int(binary.LittleEndian.Uint32(buf4[:]))
|
||||||
|
|
||||||
|
// Read entries
|
||||||
|
entries := make([]kdxEntry, count)
|
||||||
|
var buf16 [16]byte
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
if _, err := io.ReadFull(f, buf16[:]); err != nil {
|
||||||
|
return nil, fmt.Errorf("kdx: read entry %d: %w", i, err)
|
||||||
|
}
|
||||||
|
entries[i] = kdxEntry{
|
||||||
|
kmer: binary.LittleEndian.Uint64(buf16[0:8]),
|
||||||
|
offset: binary.LittleEndian.Uint64(buf16[8:16]),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &KdxIndex{
|
||||||
|
stride: stride,
|
||||||
|
entries: entries,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindOffset locates the best starting point in the .kdi file to scan for
|
||||||
|
// the target k-mer. It returns:
|
||||||
|
// - offset: the byte offset in the .kdi file to seek to (positioned after
|
||||||
|
// the indexed k-mer, ready to read the next delta)
|
||||||
|
// - skipCount: the number of k-mers already consumed at that offset
|
||||||
|
// (to set the reader's internal counter)
|
||||||
|
// - ok: true if the index provides a useful starting point
|
||||||
|
//
|
||||||
|
// Index entries are recorded at k-mer count positions stride, 2*stride, etc.
|
||||||
|
// Entry i corresponds to the k-mer written at count = (i+1)*stride.
|
||||||
|
func (idx *KdxIndex) FindOffset(target uint64) (offset uint64, skipCount uint64, ok bool) {
|
||||||
|
if idx == nil || len(idx.entries) == 0 {
|
||||||
|
return 0, 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Binary search: find the largest entry with kmer <= target
|
||||||
|
i := sort.Search(len(idx.entries), func(i int) bool {
|
||||||
|
return idx.entries[i].kmer > target
|
||||||
|
})
|
||||||
|
// i is the first entry with kmer > target, so i-1 is the last with kmer <= target
|
||||||
|
if i == 0 {
|
||||||
|
// Target is before the first index entry.
|
||||||
|
// No useful jump point — caller should scan from the beginning.
|
||||||
|
return 0, 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
i-- // largest entry with kmer <= target
|
||||||
|
// Entry i was recorded after writing k-mer at count = (i+1)*stride
|
||||||
|
skipCount = uint64(i+1) * uint64(idx.stride)
|
||||||
|
return idx.entries[i].offset, skipCount, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stride returns the stride of this index.
|
||||||
|
func (idx *KdxIndex) Stride() int {
|
||||||
|
return idx.stride
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len returns the number of entries in this index.
|
||||||
|
func (idx *KdxIndex) Len() int {
|
||||||
|
return len(idx.entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteKdxIndex writes a .kdx file from a slice of entries.
|
||||||
|
func WriteKdxIndex(path string, stride int, entries []kdxEntry) error {
|
||||||
|
f, err := os.Create(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
// Magic
|
||||||
|
if _, err := f.Write(kdxMagic[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stride (uint32 LE)
|
||||||
|
var buf4 [4]byte
|
||||||
|
binary.LittleEndian.PutUint32(buf4[:], uint32(stride))
|
||||||
|
if _, err := f.Write(buf4[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Count (uint32 LE)
|
||||||
|
binary.LittleEndian.PutUint32(buf4[:], uint32(len(entries)))
|
||||||
|
if _, err := f.Write(buf4[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Entries
|
||||||
|
var buf16 [16]byte
|
||||||
|
for _, e := range entries {
|
||||||
|
binary.LittleEndian.PutUint64(buf16[0:8], e.kmer)
|
||||||
|
binary.LittleEndian.PutUint64(buf16[8:16], e.offset)
|
||||||
|
if _, err := f.Write(buf16[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KdxPathForKdi returns the .kdx path corresponding to a .kdi path.
|
||||||
|
func KdxPathForKdi(kdiPath string) string {
|
||||||
|
return strings.TrimSuffix(kdiPath, ".kdi") + ".kdx"
|
||||||
|
}
|
||||||
190
pkg/obikmer/kmer_match.go
Normal file
190
pkg/obikmer/kmer_match.go
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
package obikmer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
||||||
|
)
|
||||||
|
|
||||||
|
// QueryEntry represents a canonical k-mer to look up, together with
|
||||||
|
// metadata to trace the result back to the originating sequence and position.
|
||||||
|
type QueryEntry struct {
|
||||||
|
Kmer uint64 // canonical k-mer value
|
||||||
|
SeqIdx int // index within the batch
|
||||||
|
Pos int // 0-based position in the sequence
|
||||||
|
}
|
||||||
|
|
||||||
|
// MatchResult maps sequence index → sorted slice of matched positions.
|
||||||
|
type MatchResult map[int][]int
|
||||||
|
|
||||||
|
// seqMatchResult collects matched positions for a single sequence.
|
||||||
|
type seqMatchResult struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
positions []int
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrepareQueries extracts all canonical k-mers from a batch of sequences
|
||||||
|
// and groups them by partition using super-kmer minimizers.
|
||||||
|
//
|
||||||
|
// Returns queries[partition] where each slice is sorted by Kmer value.
|
||||||
|
func (ksg *KmerSetGroup) PrepareQueries(sequences []*obiseq.BioSequence) [][]QueryEntry {
|
||||||
|
P := ksg.partitions
|
||||||
|
k := ksg.k
|
||||||
|
m := ksg.m
|
||||||
|
|
||||||
|
// Pre-allocate partition buckets
|
||||||
|
buckets := make([][]QueryEntry, P)
|
||||||
|
for i := range buckets {
|
||||||
|
buckets[i] = make([]QueryEntry, 0, 64)
|
||||||
|
}
|
||||||
|
|
||||||
|
for seqIdx, seq := range sequences {
|
||||||
|
bseq := seq.Sequence()
|
||||||
|
if len(bseq) < k {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate super-kmers to get minimizer → partition mapping
|
||||||
|
for sk := range IterSuperKmers(bseq, k, m) {
|
||||||
|
partition := int(sk.Minimizer % uint64(P))
|
||||||
|
|
||||||
|
// Iterate canonical k-mers within this super-kmer
|
||||||
|
skSeq := sk.Sequence
|
||||||
|
if len(skSeq) < k {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
localPos := 0
|
||||||
|
for kmer := range IterCanonicalKmers(skSeq, k) {
|
||||||
|
buckets[partition] = append(buckets[partition], QueryEntry{
|
||||||
|
Kmer: kmer,
|
||||||
|
SeqIdx: seqIdx,
|
||||||
|
Pos: sk.Start + localPos,
|
||||||
|
})
|
||||||
|
localPos++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort each bucket by k-mer value for merge-scan
|
||||||
|
for p := range buckets {
|
||||||
|
sort.Slice(buckets[p], func(i, j int) bool {
|
||||||
|
return buckets[p][i].Kmer < buckets[p][j].Kmer
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return buckets
|
||||||
|
}
|
||||||
|
|
||||||
|
// MatchBatch looks up pre-sorted queries against one set of the index.
|
||||||
|
// Partitions are processed in parallel. For each partition, a merge-scan
|
||||||
|
// compares the sorted queries against the sorted KDI stream.
|
||||||
|
//
|
||||||
|
// Returns a MatchResult mapping sequence index to sorted matched positions.
|
||||||
|
func (ksg *KmerSetGroup) MatchBatch(setIndex int, queries [][]QueryEntry) MatchResult {
|
||||||
|
P := ksg.partitions
|
||||||
|
|
||||||
|
// Per-sequence result collectors
|
||||||
|
var resultMu sync.Mutex
|
||||||
|
resultMap := make(map[int]*seqMatchResult)
|
||||||
|
|
||||||
|
getResult := func(seqIdx int) *seqMatchResult {
|
||||||
|
resultMu.Lock()
|
||||||
|
sr, ok := resultMap[seqIdx]
|
||||||
|
if !ok {
|
||||||
|
sr = &seqMatchResult{}
|
||||||
|
resultMap[seqIdx] = sr
|
||||||
|
}
|
||||||
|
resultMu.Unlock()
|
||||||
|
return sr
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for p := 0; p < P; p++ {
|
||||||
|
if len(queries[p]) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(part int) {
|
||||||
|
defer wg.Done()
|
||||||
|
ksg.matchPartition(setIndex, part, queries[part], getResult)
|
||||||
|
}(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Build final result with sorted positions
|
||||||
|
result := make(MatchResult, len(resultMap))
|
||||||
|
for seqIdx, sr := range resultMap {
|
||||||
|
if len(sr.positions) > 0 {
|
||||||
|
sort.Ints(sr.positions)
|
||||||
|
result[seqIdx] = sr.positions
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// matchPartition processes one partition: opens the KDI reader (with index),
|
||||||
|
// seeks to the first query, then merge-scans queries against the KDI stream.
|
||||||
|
func (ksg *KmerSetGroup) matchPartition(
|
||||||
|
setIndex int,
|
||||||
|
partIndex int,
|
||||||
|
queries []QueryEntry, // sorted by Kmer
|
||||||
|
getResult func(int) *seqMatchResult,
|
||||||
|
) {
|
||||||
|
r, err := NewKdiIndexedReader(ksg.partitionPath(setIndex, partIndex))
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
if r.Count() == 0 || len(queries) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seek to the first query's neighborhood
|
||||||
|
if err := r.SeekTo(queries[0].Kmer); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read first kmer from the stream after seek
|
||||||
|
currentKmer, ok := r.Next()
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
qi := 0 // query index
|
||||||
|
|
||||||
|
for qi < len(queries) {
|
||||||
|
q := queries[qi]
|
||||||
|
|
||||||
|
// Advance KDI stream until >= query kmer
|
||||||
|
for currentKmer < q.Kmer {
|
||||||
|
currentKmer, ok = r.Next()
|
||||||
|
if !ok {
|
||||||
|
return // KDI exhausted
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if currentKmer == q.Kmer {
|
||||||
|
// Match! Record all queries with this same k-mer value
|
||||||
|
matchedKmer := q.Kmer
|
||||||
|
for qi < len(queries) && queries[qi].Kmer == matchedKmer {
|
||||||
|
sr := getResult(queries[qi].SeqIdx)
|
||||||
|
sr.mu.Lock()
|
||||||
|
sr.positions = append(sr.positions, queries[qi].Pos)
|
||||||
|
sr.mu.Unlock()
|
||||||
|
qi++
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// currentKmer > q.Kmer: skip all queries with this kmer value
|
||||||
|
skippedKmer := q.Kmer
|
||||||
|
for qi < len(queries) && queries[qi].Kmer == skippedKmer {
|
||||||
|
qi++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -219,21 +219,15 @@ func (ksg *KmerSetGroup) Len(setIndex ...int) uint64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Contains checks if a k-mer is present in the specified set.
|
// Contains checks if a k-mer is present in the specified set.
|
||||||
// Uses binary search on the appropriate partition's KDI file.
|
// Uses the .kdx sparse index (if available) for fast seeking within
|
||||||
|
// each partition, then a short linear scan of at most `stride` entries.
|
||||||
|
// All partitions are searched in parallel since the k-mer's partition
|
||||||
|
// is not known without its minimizer context.
|
||||||
func (ksg *KmerSetGroup) Contains(setIndex int, kmer uint64) bool {
|
func (ksg *KmerSetGroup) Contains(setIndex int, kmer uint64) bool {
|
||||||
if setIndex < 0 || setIndex >= ksg.n {
|
if setIndex < 0 || setIndex >= ksg.n {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
// Determine partition from minimizer
|
|
||||||
// For a canonical k-mer, we need to find which partition it would fall into.
|
|
||||||
// The partition is determined by the minimizer during construction.
|
|
||||||
// For Contains, we must scan all partitions of this set (linear search within each).
|
|
||||||
// A full binary-search approach would require an index file.
|
|
||||||
// For now, scan the partition determined by the k-mer's minimizer.
|
|
||||||
// Since we don't know the minimizer, we do a linear scan of all partitions.
|
|
||||||
// This is O(total_kmers / P) per partition on average.
|
|
||||||
|
|
||||||
// Optimization: scan all partitions in parallel
|
|
||||||
type result struct {
|
type result struct {
|
||||||
found bool
|
found bool
|
||||||
}
|
}
|
||||||
@@ -241,12 +235,20 @@ func (ksg *KmerSetGroup) Contains(setIndex int, kmer uint64) bool {
|
|||||||
|
|
||||||
for p := 0; p < ksg.partitions; p++ {
|
for p := 0; p < ksg.partitions; p++ {
|
||||||
go func(part int) {
|
go func(part int) {
|
||||||
r, err := NewKdiReader(ksg.partitionPath(setIndex, part))
|
r, err := NewKdiIndexedReader(ksg.partitionPath(setIndex, part))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ch <- result{false}
|
ch <- result{false}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|
||||||
|
// Use index to jump near the target
|
||||||
|
if err := r.SeekTo(kmer); err != nil {
|
||||||
|
ch <- result{false}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Linear scan from the seek position
|
||||||
for {
|
for {
|
||||||
v, ok := r.Next()
|
v, ok := r.Next()
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -853,13 +855,21 @@ func (ksg *KmerSetGroup) CopySetsByIDTo(ids []string, destDir string, force bool
|
|||||||
return nil, fmt.Errorf("obikmer: create dest set dir: %w", err)
|
return nil, fmt.Errorf("obikmer: create dest set dir: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy all partition files
|
// Copy all partition files and their .kdx indices
|
||||||
for p := 0; p < ksg.partitions; p++ {
|
for p := 0; p < ksg.partitions; p++ {
|
||||||
srcPath := ksg.partitionPath(srcIdx, p)
|
srcPath := ksg.partitionPath(srcIdx, p)
|
||||||
destPath := dest.partitionPath(destIdx, p)
|
destPath := dest.partitionPath(destIdx, p)
|
||||||
if err := copyFile(srcPath, destPath); err != nil {
|
if err := copyFile(srcPath, destPath); err != nil {
|
||||||
return nil, fmt.Errorf("obikmer: copy partition %d of set %q: %w", p, srcID, err)
|
return nil, fmt.Errorf("obikmer: copy partition %d of set %q: %w", p, srcID, err)
|
||||||
}
|
}
|
||||||
|
// Copy .kdx index if it exists
|
||||||
|
srcKdx := KdxPathForKdi(srcPath)
|
||||||
|
if _, err := os.Stat(srcKdx); err == nil {
|
||||||
|
destKdx := KdxPathForKdi(destPath)
|
||||||
|
if err := copyFile(srcKdx, destKdx); err != nil {
|
||||||
|
return nil, fmt.Errorf("obikmer: copy index %d of set %q: %w", p, srcID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy spectrum.bin if it exists
|
// Copy spectrum.bin if it exists
|
||||||
|
|||||||
123
pkg/obitools/obik/match.go
Normal file
123
pkg/obitools/obik/match.go
Normal file
@@ -0,0 +1,123 @@
|
|||||||
|
package obik
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
|
||||||
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obikmer"
|
||||||
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
||||||
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
|
||||||
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
|
||||||
|
"github.com/DavidGamba/go-getoptions"
|
||||||
|
)
|
||||||
|
|
||||||
|
// matchSliceWorker creates a SeqSliceWorker that annotates each sequence
|
||||||
|
// in a batch with k-mer match positions from the index.
|
||||||
|
// For each set, an attribute "kmer_matched_<setID>" is added containing
|
||||||
|
// a sorted []int of 0-based positions where matched k-mers start.
|
||||||
|
func matchSliceWorker(ksg *obikmer.KmerSetGroup, setIndices []int) obiseq.SeqSliceWorker {
|
||||||
|
return func(batch obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) {
|
||||||
|
if len(batch) == 0 {
|
||||||
|
return batch, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build slice of *BioSequence for PrepareQueries
|
||||||
|
seqs := make([]*obiseq.BioSequence, len(batch))
|
||||||
|
for i := range batch {
|
||||||
|
seqs[i] = batch[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare queries once (shared across sets)
|
||||||
|
queries := ksg.PrepareQueries(seqs)
|
||||||
|
|
||||||
|
// Match against each selected set
|
||||||
|
for _, setIdx := range setIndices {
|
||||||
|
result := ksg.MatchBatch(setIdx, queries)
|
||||||
|
|
||||||
|
setID := ksg.SetIDOf(setIdx)
|
||||||
|
if setID == "" {
|
||||||
|
setID = fmt.Sprintf("set_%d", setIdx)
|
||||||
|
}
|
||||||
|
attrName := "kmer_matched_" + setID
|
||||||
|
|
||||||
|
for seqIdx, positions := range result {
|
||||||
|
if len(positions) > 0 {
|
||||||
|
batch[seqIdx].SetAttribute(attrName, positions)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return batch, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// runMatch implements the "obik match" subcommand.
|
||||||
|
// It reads sequences, looks up their k-mers in a disk-based index,
|
||||||
|
// and annotates each sequence with match positions.
|
||||||
|
func runMatch(ctx context.Context, opt *getoptions.GetOpt, args []string) error {
|
||||||
|
if len(args) < 1 {
|
||||||
|
return fmt.Errorf("usage: obik match [options] <index_directory> [sequence_files...]")
|
||||||
|
}
|
||||||
|
|
||||||
|
indexDir := args[0]
|
||||||
|
seqArgs := args[1:]
|
||||||
|
|
||||||
|
// Open the k-mer index
|
||||||
|
ksg, err := obikmer.OpenKmerSetGroup(indexDir)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open kmer index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Opened index: k=%d, m=%d, %d partitions, %d set(s)",
|
||||||
|
ksg.K(), ksg.M(), ksg.Partitions(), ksg.Size())
|
||||||
|
|
||||||
|
// Resolve which sets to match against
|
||||||
|
patterns := CLISetPatterns()
|
||||||
|
var setIndices []int
|
||||||
|
if len(patterns) > 0 {
|
||||||
|
setIndices, err = ksg.MatchSetIDs(patterns)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to match set patterns: %w", err)
|
||||||
|
}
|
||||||
|
if len(setIndices) == 0 {
|
||||||
|
return fmt.Errorf("no sets match the given patterns")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// All sets
|
||||||
|
setIndices = make([]int, ksg.Size())
|
||||||
|
for i := range setIndices {
|
||||||
|
setIndices[i] = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log which sets we'll match
|
||||||
|
for _, idx := range setIndices {
|
||||||
|
id := ksg.SetIDOf(idx)
|
||||||
|
if id == "" {
|
||||||
|
id = fmt.Sprintf("set_%d", idx)
|
||||||
|
}
|
||||||
|
log.Infof("Matching against set %d (%s): %d k-mers", idx, id, ksg.Len(idx))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read sequences
|
||||||
|
sequences, err := obiconvert.CLIReadBioSequences(seqArgs...)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open sequence files: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply the batch worker
|
||||||
|
worker := matchSliceWorker(ksg, setIndices)
|
||||||
|
matched := sequences.MakeISliceWorker(
|
||||||
|
worker,
|
||||||
|
false,
|
||||||
|
obidefault.ParallelWorkers(),
|
||||||
|
)
|
||||||
|
|
||||||
|
obiconvert.CLIWriteBioSequences(matched, true)
|
||||||
|
obiutils.WaitForLastPipe()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -67,4 +67,11 @@ func OptionSet(opt *getoptions.GetOpt) {
|
|||||||
obiconvert.OutputOptionSet(lowmaskCmd)
|
obiconvert.OutputOptionSet(lowmaskCmd)
|
||||||
LowMaskOptionSet(lowmaskCmd)
|
LowMaskOptionSet(lowmaskCmd)
|
||||||
lowmaskCmd.SetCommandFn(runLowmask)
|
lowmaskCmd.SetCommandFn(runLowmask)
|
||||||
|
|
||||||
|
// match: annotate sequences with k-mer match positions from an index
|
||||||
|
matchCmd := opt.NewCommand("match", "Annotate sequences with k-mer match positions from an index")
|
||||||
|
obiconvert.InputOptionSet(matchCmd)
|
||||||
|
obiconvert.OutputOptionSet(matchCmd)
|
||||||
|
SetSelectionOptionSet(matchCmd)
|
||||||
|
matchCmd.SetCommandFn(runMatch)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user