mirror of
https://github.com/metabarcoding/obitools4.git
synced 2026-03-25 21:40:52 +00:00
This commit introduces sparse index support for KDI files to enable fast random access during k-mer matching. It adds a new .kdx index file format and updates the KDI reader and writer to handle index creation and seeking. The changes include: - New KdxIndex struct and related functions for loading, searching, and writing .kdx files - Modified KdiReader to support seeking with the new index - Updated KdiWriter to create .kdx index files during writing - Enhanced KmerSetGroup.Contains to use the new index for faster lookups - Added a new 'match' command to annotate sequences with k-mer match positions The index is created automatically during KDI file creation and allows for O(log N / stride) binary search followed by at most stride linear scan steps, significantly improving performance for large datasets.
171 lines
4.5 KiB
Go
171 lines
4.5 KiB
Go
package obikmer
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
)
|
|
|
|
// KdiReader reads k-mers from a .kdi file using streaming delta-varint decoding.
|
|
type KdiReader struct {
|
|
r *bufio.Reader
|
|
file *os.File
|
|
count uint64 // total number of k-mers
|
|
read uint64 // number of k-mers already consumed
|
|
prev uint64 // last decoded value
|
|
started bool // whether first value has been read
|
|
index *KdxIndex // optional sparse index for seeking
|
|
}
|
|
|
|
// NewKdiReader opens a .kdi file for streaming reading (no index).
|
|
func NewKdiReader(path string) (*KdiReader, error) {
|
|
return openKdiReader(path, nil)
|
|
}
|
|
|
|
// NewKdiIndexedReader opens a .kdi file with its companion .kdx index
|
|
// loaded for fast seeking. If the .kdx file does not exist, it gracefully
|
|
// falls back to sequential reading.
|
|
func NewKdiIndexedReader(path string) (*KdiReader, error) {
|
|
kdxPath := KdxPathForKdi(path)
|
|
idx, err := LoadKdxIndex(kdxPath)
|
|
if err != nil {
|
|
// Index load failed — fall back to non-indexed
|
|
return openKdiReader(path, nil)
|
|
}
|
|
// idx may be nil if file does not exist — that's fine
|
|
return openKdiReader(path, idx)
|
|
}
|
|
|
|
func openKdiReader(path string, idx *KdxIndex) (*KdiReader, error) {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r := bufio.NewReaderSize(f, 65536)
|
|
|
|
// Read and verify magic
|
|
var magic [4]byte
|
|
if _, err := io.ReadFull(r, magic[:]); err != nil {
|
|
f.Close()
|
|
return nil, fmt.Errorf("kdi: read magic: %w", err)
|
|
}
|
|
if magic != kdiMagic {
|
|
f.Close()
|
|
return nil, fmt.Errorf("kdi: bad magic %v", magic)
|
|
}
|
|
|
|
// Read count
|
|
var countBuf [8]byte
|
|
if _, err := io.ReadFull(r, countBuf[:]); err != nil {
|
|
f.Close()
|
|
return nil, fmt.Errorf("kdi: read count: %w", err)
|
|
}
|
|
count := binary.LittleEndian.Uint64(countBuf[:])
|
|
|
|
return &KdiReader{
|
|
r: r,
|
|
file: f,
|
|
count: count,
|
|
index: idx,
|
|
}, nil
|
|
}
|
|
|
|
// Next returns the next k-mer and true, or (0, false) when exhausted.
|
|
func (kr *KdiReader) Next() (uint64, bool) {
|
|
if kr.read >= kr.count {
|
|
return 0, false
|
|
}
|
|
|
|
if !kr.started {
|
|
// Read first value as absolute uint64 LE
|
|
var buf [8]byte
|
|
if _, err := io.ReadFull(kr.r, buf[:]); err != nil {
|
|
return 0, false
|
|
}
|
|
kr.prev = binary.LittleEndian.Uint64(buf[:])
|
|
kr.started = true
|
|
kr.read++
|
|
return kr.prev, true
|
|
}
|
|
|
|
// Read delta varint
|
|
delta, err := DecodeVarint(kr.r)
|
|
if err != nil {
|
|
return 0, false
|
|
}
|
|
kr.prev += delta
|
|
kr.read++
|
|
return kr.prev, true
|
|
}
|
|
|
|
// SeekTo positions the reader near the target k-mer using the sparse .kdx index.
|
|
// After SeekTo, the reader is positioned so that the next call to Next()
|
|
// returns the k-mer immediately after the indexed entry at or before target.
|
|
//
|
|
// If the reader has no index, or the target is before the current position,
|
|
// SeekTo does nothing (linear scan continues from current position).
|
|
func (kr *KdiReader) SeekTo(target uint64) error {
|
|
if kr.index == nil {
|
|
return nil
|
|
}
|
|
|
|
// If we've already passed the target, we can't seek backwards
|
|
if kr.started && kr.prev >= target {
|
|
return nil
|
|
}
|
|
|
|
offset, skipCount, ok := kr.index.FindOffset(target)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
// skipCount is the number of k-mers consumed at the indexed position.
|
|
// The index was recorded AFTER writing the k-mer at position skipCount-1
|
|
// (since count%stride==0 after incrementing count). So the actual number
|
|
// of k-mers consumed is skipCount (the entry's kmer is the last one
|
|
// before the offset).
|
|
|
|
// Only seek if it would skip significant work
|
|
if kr.started && skipCount <= kr.read {
|
|
return nil
|
|
}
|
|
|
|
// The index entry stores (kmer_value, byte_offset_after_that_kmer).
|
|
// skipCount = (entryIdx+1)*stride, so entryIdx = skipCount/stride - 1
|
|
// We seek to that offset, set prev = indexedKmer, and the next Next()
|
|
// call will read the delta-varint of the following k-mer.
|
|
entryIdx := int(skipCount)/kr.index.stride - 1
|
|
if entryIdx < 0 || entryIdx >= len(kr.index.entries) {
|
|
return nil
|
|
}
|
|
indexedKmer := kr.index.entries[entryIdx].kmer
|
|
|
|
if _, err := kr.file.Seek(int64(offset), io.SeekStart); err != nil {
|
|
return fmt.Errorf("kdi: seek: %w", err)
|
|
}
|
|
kr.r.Reset(kr.file)
|
|
|
|
kr.prev = indexedKmer
|
|
kr.started = true
|
|
kr.read = skipCount
|
|
|
|
return nil
|
|
}
|
|
|
|
// Count returns the total number of k-mers in this partition.
|
|
func (kr *KdiReader) Count() uint64 {
|
|
return kr.count
|
|
}
|
|
|
|
// Remaining returns how many k-mers have not been read yet.
|
|
func (kr *KdiReader) Remaining() uint64 {
|
|
return kr.count - kr.read
|
|
}
|
|
|
|
// Close closes the underlying file.
|
|
func (kr *KdiReader) Close() error {
|
|
return kr.file.Close()
|
|
}
|