From c6e04265f14dcfaad54a434b696041b3f060312a Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 10 Feb 2026 13:23:56 +0100 Subject: [PATCH] Add sparse index support for KDI files with fast seeking This commit introduces sparse index support for KDI files to enable fast random access during k-mer matching. It adds a new .kdx index file format and updates the KDI reader and writer to handle index creation and seeking. The changes include: - New KdxIndex struct and related functions for loading, searching, and writing .kdx files - Modified KdiReader to support seeking with the new index - Updated KdiWriter to create .kdx index files during writing - Enhanced KmerSetGroup.Contains to use the new index for faster lookups - Added a new 'match' command to annotate sequences with k-mer match positions The index is created automatically during KDI file creation and allows for O(log N / stride) binary search followed by at most stride linear scan steps, significantly improving performance for large datasets. --- pkg/obikmer/kdi_reader.go | 84 +++++++++++++++- pkg/obikmer/kdi_writer.go | 64 +++++++++--- pkg/obikmer/kdx.go | 170 +++++++++++++++++++++++++++++++ pkg/obikmer/kmer_match.go | 190 +++++++++++++++++++++++++++++++++++ pkg/obikmer/kmer_set_disk.go | 34 ++++--- pkg/obitools/obik/match.go | 123 +++++++++++++++++++++++ pkg/obitools/obik/obik.go | 7 ++ 7 files changed, 642 insertions(+), 30 deletions(-) create mode 100644 pkg/obikmer/kdx.go create mode 100644 pkg/obikmer/kmer_match.go create mode 100644 pkg/obitools/obik/match.go diff --git a/pkg/obikmer/kdi_reader.go b/pkg/obikmer/kdi_reader.go index 22722f9..09cbb46 100644 --- a/pkg/obikmer/kdi_reader.go +++ b/pkg/obikmer/kdi_reader.go @@ -12,14 +12,33 @@ import ( type KdiReader struct { r *bufio.Reader file *os.File - count uint64 // total number of k-mers - read uint64 // number of k-mers already consumed - prev uint64 // last decoded value - started bool // whether first value has been read + count uint64 // total number of k-mers + read uint64 // number of k-mers already consumed + prev uint64 // last decoded value + started bool // whether first value has been read + index *KdxIndex // optional sparse index for seeking } -// NewKdiReader opens a .kdi file for streaming reading. +// NewKdiReader opens a .kdi file for streaming reading (no index). func NewKdiReader(path string) (*KdiReader, error) { + return openKdiReader(path, nil) +} + +// NewKdiIndexedReader opens a .kdi file with its companion .kdx index +// loaded for fast seeking. If the .kdx file does not exist, it gracefully +// falls back to sequential reading. +func NewKdiIndexedReader(path string) (*KdiReader, error) { + kdxPath := KdxPathForKdi(path) + idx, err := LoadKdxIndex(kdxPath) + if err != nil { + // Index load failed — fall back to non-indexed + return openKdiReader(path, nil) + } + // idx may be nil if file does not exist — that's fine + return openKdiReader(path, idx) +} + +func openKdiReader(path string, idx *KdxIndex) (*KdiReader, error) { f, err := os.Open(path) if err != nil { return nil, err @@ -49,6 +68,7 @@ func NewKdiReader(path string) (*KdiReader, error) { r: r, file: f, count: count, + index: idx, }, nil } @@ -80,6 +100,60 @@ func (kr *KdiReader) Next() (uint64, bool) { return kr.prev, true } +// SeekTo positions the reader near the target k-mer using the sparse .kdx index. +// After SeekTo, the reader is positioned so that the next call to Next() +// returns the k-mer immediately after the indexed entry at or before target. +// +// If the reader has no index, or the target is before the current position, +// SeekTo does nothing (linear scan continues from current position). +func (kr *KdiReader) SeekTo(target uint64) error { + if kr.index == nil { + return nil + } + + // If we've already passed the target, we can't seek backwards + if kr.started && kr.prev >= target { + return nil + } + + offset, skipCount, ok := kr.index.FindOffset(target) + if !ok { + return nil + } + + // skipCount is the number of k-mers consumed at the indexed position. + // The index was recorded AFTER writing the k-mer at position skipCount-1 + // (since count%stride==0 after incrementing count). So the actual number + // of k-mers consumed is skipCount (the entry's kmer is the last one + // before the offset). + + // Only seek if it would skip significant work + if kr.started && skipCount <= kr.read { + return nil + } + + // The index entry stores (kmer_value, byte_offset_after_that_kmer). + // skipCount = (entryIdx+1)*stride, so entryIdx = skipCount/stride - 1 + // We seek to that offset, set prev = indexedKmer, and the next Next() + // call will read the delta-varint of the following k-mer. + entryIdx := int(skipCount)/kr.index.stride - 1 + if entryIdx < 0 || entryIdx >= len(kr.index.entries) { + return nil + } + indexedKmer := kr.index.entries[entryIdx].kmer + + if _, err := kr.file.Seek(int64(offset), io.SeekStart); err != nil { + return fmt.Errorf("kdi: seek: %w", err) + } + kr.r.Reset(kr.file) + + kr.prev = indexedKmer + kr.started = true + kr.read = skipCount + + return nil +} + // Count returns the total number of k-mers in this partition. func (kr *KdiReader) Count() uint64 { return kr.count diff --git a/pkg/obikmer/kdi_writer.go b/pkg/obikmer/kdi_writer.go index f853586..325e745 100644 --- a/pkg/obikmer/kdi_writer.go +++ b/pkg/obikmer/kdi_writer.go @@ -9,6 +9,9 @@ import ( // KDI file magic bytes: "KDI\x01" var kdiMagic = [4]byte{'K', 'D', 'I', 0x01} +// kdiHeaderSize is the size of the KDI header: magic(4) + count(8) = 12 bytes. +const kdiHeaderSize = 12 + // KdiWriter writes a sorted sequence of uint64 k-mers to a .kdi file // using delta-varint encoding. // @@ -22,13 +25,18 @@ var kdiMagic = [4]byte{'K', 'D', 'I', 0x01} // ... // // The caller must write k-mers in strictly increasing order. +// +// On Close(), a companion .kdx sparse index file is written alongside +// the .kdi file for fast random access. type KdiWriter struct { - w *bufio.Writer - file *os.File - count uint64 - prev uint64 - first bool - path string + w *bufio.Writer + file *os.File + count uint64 + prev uint64 + first bool + path string + bytesWritten uint64 // bytes written after header (data section offset) + indexEntries []kdxEntry // sparse index entries collected during writes } // NewKdiWriter creates a new KdiWriter writing to the given file path. @@ -54,10 +62,12 @@ func NewKdiWriter(path string) (*KdiWriter, error) { } return &KdiWriter{ - w: w, - file: f, - first: true, - path: path, + w: w, + file: f, + first: true, + path: path, + bytesWritten: 0, + indexEntries: make([]kdxEntry, 0, 256), }, nil } @@ -71,16 +81,32 @@ func (kw *KdiWriter) Write(kmer uint64) error { if _, err := kw.w.Write(buf[:]); err != nil { return err } + kw.bytesWritten += 8 kw.prev = kmer kw.first = false } else { delta := kmer - kw.prev - if _, err := EncodeVarint(kw.w, delta); err != nil { + n, err := EncodeVarint(kw.w, delta) + if err != nil { return err } + kw.bytesWritten += uint64(n) kw.prev = kmer } kw.count++ + + // Record sparse index entry every defaultKdxStride k-mers. + // The offset recorded is AFTER writing this k-mer, so it points to + // where the next k-mer's data will start. SeekTo uses this: it seeks + // to the recorded offset, sets prev = indexedKmer, and Next() reads + // the delta of the following k-mer. + if kw.count%defaultKdxStride == 0 { + kw.indexEntries = append(kw.indexEntries, kdxEntry{ + kmer: kmer, + offset: kdiHeaderSize + kw.bytesWritten, + }) + } + return nil } @@ -90,7 +116,7 @@ func (kw *KdiWriter) Count() uint64 { } // Close flushes buffered data, patches the count in the header, -// and closes the file. +// writes the companion .kdx index file, and closes the file. func (kw *KdiWriter) Close() error { if err := kw.w.Flush(); err != nil { kw.file.Close() @@ -109,5 +135,17 @@ func (kw *KdiWriter) Close() error { return err } - return kw.file.Close() + if err := kw.file.Close(); err != nil { + return err + } + + // Write .kdx index file if there are entries to index + if len(kw.indexEntries) > 0 { + kdxPath := KdxPathForKdi(kw.path) + if err := WriteKdxIndex(kdxPath, defaultKdxStride, kw.indexEntries); err != nil { + return err + } + } + + return nil } diff --git a/pkg/obikmer/kdx.go b/pkg/obikmer/kdx.go new file mode 100644 index 0000000..19b7dd9 --- /dev/null +++ b/pkg/obikmer/kdx.go @@ -0,0 +1,170 @@ +package obikmer + +import ( + "encoding/binary" + "fmt" + "io" + "os" + "sort" + "strings" +) + +// KDX file magic bytes: "KDX\x01" +var kdxMagic = [4]byte{'K', 'D', 'X', 0x01} + +// defaultKdxStride is the number of k-mers between consecutive index entries. +const defaultKdxStride = 4096 + +// kdxEntry is a single entry in the sparse index: the absolute k-mer value +// and the byte offset in the corresponding .kdi file where that k-mer is stored. +type kdxEntry struct { + kmer uint64 + offset uint64 // absolute byte offset in .kdi file +} + +// KdxIndex is a sparse, in-memory index for a .kdi file. +// It stores one entry every `stride` k-mers, enabling O(log N / stride) +// binary search followed by at most `stride` linear scan steps. +type KdxIndex struct { + stride int + entries []kdxEntry +} + +// LoadKdxIndex reads a .kdx file into memory. +// Returns (nil, nil) if the file does not exist (graceful degradation). +func LoadKdxIndex(path string) (*KdxIndex, error) { + f, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + defer f.Close() + + // Read magic + var magic [4]byte + if _, err := io.ReadFull(f, magic[:]); err != nil { + return nil, fmt.Errorf("kdx: read magic: %w", err) + } + if magic != kdxMagic { + return nil, fmt.Errorf("kdx: bad magic %v", magic) + } + + // Read stride (uint32 LE) + var buf4 [4]byte + if _, err := io.ReadFull(f, buf4[:]); err != nil { + return nil, fmt.Errorf("kdx: read stride: %w", err) + } + stride := int(binary.LittleEndian.Uint32(buf4[:])) + + // Read count (uint32 LE) + if _, err := io.ReadFull(f, buf4[:]); err != nil { + return nil, fmt.Errorf("kdx: read count: %w", err) + } + count := int(binary.LittleEndian.Uint32(buf4[:])) + + // Read entries + entries := make([]kdxEntry, count) + var buf16 [16]byte + for i := 0; i < count; i++ { + if _, err := io.ReadFull(f, buf16[:]); err != nil { + return nil, fmt.Errorf("kdx: read entry %d: %w", i, err) + } + entries[i] = kdxEntry{ + kmer: binary.LittleEndian.Uint64(buf16[0:8]), + offset: binary.LittleEndian.Uint64(buf16[8:16]), + } + } + + return &KdxIndex{ + stride: stride, + entries: entries, + }, nil +} + +// FindOffset locates the best starting point in the .kdi file to scan for +// the target k-mer. It returns: +// - offset: the byte offset in the .kdi file to seek to (positioned after +// the indexed k-mer, ready to read the next delta) +// - skipCount: the number of k-mers already consumed at that offset +// (to set the reader's internal counter) +// - ok: true if the index provides a useful starting point +// +// Index entries are recorded at k-mer count positions stride, 2*stride, etc. +// Entry i corresponds to the k-mer written at count = (i+1)*stride. +func (idx *KdxIndex) FindOffset(target uint64) (offset uint64, skipCount uint64, ok bool) { + if idx == nil || len(idx.entries) == 0 { + return 0, 0, false + } + + // Binary search: find the largest entry with kmer <= target + i := sort.Search(len(idx.entries), func(i int) bool { + return idx.entries[i].kmer > target + }) + // i is the first entry with kmer > target, so i-1 is the last with kmer <= target + if i == 0 { + // Target is before the first index entry. + // No useful jump point — caller should scan from the beginning. + return 0, 0, false + } + + i-- // largest entry with kmer <= target + // Entry i was recorded after writing k-mer at count = (i+1)*stride + skipCount = uint64(i+1) * uint64(idx.stride) + return idx.entries[i].offset, skipCount, true +} + +// Stride returns the stride of this index. +func (idx *KdxIndex) Stride() int { + return idx.stride +} + +// Len returns the number of entries in this index. +func (idx *KdxIndex) Len() int { + return len(idx.entries) +} + +// WriteKdxIndex writes a .kdx file from a slice of entries. +func WriteKdxIndex(path string, stride int, entries []kdxEntry) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + // Magic + if _, err := f.Write(kdxMagic[:]); err != nil { + return err + } + + // Stride (uint32 LE) + var buf4 [4]byte + binary.LittleEndian.PutUint32(buf4[:], uint32(stride)) + if _, err := f.Write(buf4[:]); err != nil { + return err + } + + // Count (uint32 LE) + binary.LittleEndian.PutUint32(buf4[:], uint32(len(entries))) + if _, err := f.Write(buf4[:]); err != nil { + return err + } + + // Entries + var buf16 [16]byte + for _, e := range entries { + binary.LittleEndian.PutUint64(buf16[0:8], e.kmer) + binary.LittleEndian.PutUint64(buf16[8:16], e.offset) + if _, err := f.Write(buf16[:]); err != nil { + return err + } + } + + return nil +} + +// KdxPathForKdi returns the .kdx path corresponding to a .kdi path. +func KdxPathForKdi(kdiPath string) string { + return strings.TrimSuffix(kdiPath, ".kdi") + ".kdx" +} diff --git a/pkg/obikmer/kmer_match.go b/pkg/obikmer/kmer_match.go new file mode 100644 index 0000000..69acc06 --- /dev/null +++ b/pkg/obikmer/kmer_match.go @@ -0,0 +1,190 @@ +package obikmer + +import ( + "sort" + "sync" + + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" +) + +// QueryEntry represents a canonical k-mer to look up, together with +// metadata to trace the result back to the originating sequence and position. +type QueryEntry struct { + Kmer uint64 // canonical k-mer value + SeqIdx int // index within the batch + Pos int // 0-based position in the sequence +} + +// MatchResult maps sequence index → sorted slice of matched positions. +type MatchResult map[int][]int + +// seqMatchResult collects matched positions for a single sequence. +type seqMatchResult struct { + mu sync.Mutex + positions []int +} + +// PrepareQueries extracts all canonical k-mers from a batch of sequences +// and groups them by partition using super-kmer minimizers. +// +// Returns queries[partition] where each slice is sorted by Kmer value. +func (ksg *KmerSetGroup) PrepareQueries(sequences []*obiseq.BioSequence) [][]QueryEntry { + P := ksg.partitions + k := ksg.k + m := ksg.m + + // Pre-allocate partition buckets + buckets := make([][]QueryEntry, P) + for i := range buckets { + buckets[i] = make([]QueryEntry, 0, 64) + } + + for seqIdx, seq := range sequences { + bseq := seq.Sequence() + if len(bseq) < k { + continue + } + + // Iterate super-kmers to get minimizer → partition mapping + for sk := range IterSuperKmers(bseq, k, m) { + partition := int(sk.Minimizer % uint64(P)) + + // Iterate canonical k-mers within this super-kmer + skSeq := sk.Sequence + if len(skSeq) < k { + continue + } + + localPos := 0 + for kmer := range IterCanonicalKmers(skSeq, k) { + buckets[partition] = append(buckets[partition], QueryEntry{ + Kmer: kmer, + SeqIdx: seqIdx, + Pos: sk.Start + localPos, + }) + localPos++ + } + } + } + + // Sort each bucket by k-mer value for merge-scan + for p := range buckets { + sort.Slice(buckets[p], func(i, j int) bool { + return buckets[p][i].Kmer < buckets[p][j].Kmer + }) + } + + return buckets +} + +// MatchBatch looks up pre-sorted queries against one set of the index. +// Partitions are processed in parallel. For each partition, a merge-scan +// compares the sorted queries against the sorted KDI stream. +// +// Returns a MatchResult mapping sequence index to sorted matched positions. +func (ksg *KmerSetGroup) MatchBatch(setIndex int, queries [][]QueryEntry) MatchResult { + P := ksg.partitions + + // Per-sequence result collectors + var resultMu sync.Mutex + resultMap := make(map[int]*seqMatchResult) + + getResult := func(seqIdx int) *seqMatchResult { + resultMu.Lock() + sr, ok := resultMap[seqIdx] + if !ok { + sr = &seqMatchResult{} + resultMap[seqIdx] = sr + } + resultMu.Unlock() + return sr + } + + var wg sync.WaitGroup + + for p := 0; p < P; p++ { + if len(queries[p]) == 0 { + continue + } + wg.Add(1) + go func(part int) { + defer wg.Done() + ksg.matchPartition(setIndex, part, queries[part], getResult) + }(p) + } + + wg.Wait() + + // Build final result with sorted positions + result := make(MatchResult, len(resultMap)) + for seqIdx, sr := range resultMap { + if len(sr.positions) > 0 { + sort.Ints(sr.positions) + result[seqIdx] = sr.positions + } + } + + return result +} + +// matchPartition processes one partition: opens the KDI reader (with index), +// seeks to the first query, then merge-scans queries against the KDI stream. +func (ksg *KmerSetGroup) matchPartition( + setIndex int, + partIndex int, + queries []QueryEntry, // sorted by Kmer + getResult func(int) *seqMatchResult, +) { + r, err := NewKdiIndexedReader(ksg.partitionPath(setIndex, partIndex)) + if err != nil { + return + } + defer r.Close() + + if r.Count() == 0 || len(queries) == 0 { + return + } + + // Seek to the first query's neighborhood + if err := r.SeekTo(queries[0].Kmer); err != nil { + return + } + + // Read first kmer from the stream after seek + currentKmer, ok := r.Next() + if !ok { + return + } + + qi := 0 // query index + + for qi < len(queries) { + q := queries[qi] + + // Advance KDI stream until >= query kmer + for currentKmer < q.Kmer { + currentKmer, ok = r.Next() + if !ok { + return // KDI exhausted + } + } + + if currentKmer == q.Kmer { + // Match! Record all queries with this same k-mer value + matchedKmer := q.Kmer + for qi < len(queries) && queries[qi].Kmer == matchedKmer { + sr := getResult(queries[qi].SeqIdx) + sr.mu.Lock() + sr.positions = append(sr.positions, queries[qi].Pos) + sr.mu.Unlock() + qi++ + } + } else { + // currentKmer > q.Kmer: skip all queries with this kmer value + skippedKmer := q.Kmer + for qi < len(queries) && queries[qi].Kmer == skippedKmer { + qi++ + } + } + } +} diff --git a/pkg/obikmer/kmer_set_disk.go b/pkg/obikmer/kmer_set_disk.go index 9613ebe..3932fdc 100644 --- a/pkg/obikmer/kmer_set_disk.go +++ b/pkg/obikmer/kmer_set_disk.go @@ -219,21 +219,15 @@ func (ksg *KmerSetGroup) Len(setIndex ...int) uint64 { } // Contains checks if a k-mer is present in the specified set. -// Uses binary search on the appropriate partition's KDI file. +// Uses the .kdx sparse index (if available) for fast seeking within +// each partition, then a short linear scan of at most `stride` entries. +// All partitions are searched in parallel since the k-mer's partition +// is not known without its minimizer context. func (ksg *KmerSetGroup) Contains(setIndex int, kmer uint64) bool { if setIndex < 0 || setIndex >= ksg.n { return false } - // Determine partition from minimizer - // For a canonical k-mer, we need to find which partition it would fall into. - // The partition is determined by the minimizer during construction. - // For Contains, we must scan all partitions of this set (linear search within each). - // A full binary-search approach would require an index file. - // For now, scan the partition determined by the k-mer's minimizer. - // Since we don't know the minimizer, we do a linear scan of all partitions. - // This is O(total_kmers / P) per partition on average. - // Optimization: scan all partitions in parallel type result struct { found bool } @@ -241,12 +235,20 @@ func (ksg *KmerSetGroup) Contains(setIndex int, kmer uint64) bool { for p := 0; p < ksg.partitions; p++ { go func(part int) { - r, err := NewKdiReader(ksg.partitionPath(setIndex, part)) + r, err := NewKdiIndexedReader(ksg.partitionPath(setIndex, part)) if err != nil { ch <- result{false} return } defer r.Close() + + // Use index to jump near the target + if err := r.SeekTo(kmer); err != nil { + ch <- result{false} + return + } + + // Linear scan from the seek position for { v, ok := r.Next() if !ok { @@ -853,13 +855,21 @@ func (ksg *KmerSetGroup) CopySetsByIDTo(ids []string, destDir string, force bool return nil, fmt.Errorf("obikmer: create dest set dir: %w", err) } - // Copy all partition files + // Copy all partition files and their .kdx indices for p := 0; p < ksg.partitions; p++ { srcPath := ksg.partitionPath(srcIdx, p) destPath := dest.partitionPath(destIdx, p) if err := copyFile(srcPath, destPath); err != nil { return nil, fmt.Errorf("obikmer: copy partition %d of set %q: %w", p, srcID, err) } + // Copy .kdx index if it exists + srcKdx := KdxPathForKdi(srcPath) + if _, err := os.Stat(srcKdx); err == nil { + destKdx := KdxPathForKdi(destPath) + if err := copyFile(srcKdx, destKdx); err != nil { + return nil, fmt.Errorf("obikmer: copy index %d of set %q: %w", p, srcID, err) + } + } } // Copy spectrum.bin if it exists diff --git a/pkg/obitools/obik/match.go b/pkg/obitools/obik/match.go new file mode 100644 index 0000000..62fc9b8 --- /dev/null +++ b/pkg/obitools/obik/match.go @@ -0,0 +1,123 @@ +package obik + +import ( + "context" + "fmt" + + log "github.com/sirupsen/logrus" + + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obikmer" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" + "github.com/DavidGamba/go-getoptions" +) + +// matchSliceWorker creates a SeqSliceWorker that annotates each sequence +// in a batch with k-mer match positions from the index. +// For each set, an attribute "kmer_matched_" is added containing +// a sorted []int of 0-based positions where matched k-mers start. +func matchSliceWorker(ksg *obikmer.KmerSetGroup, setIndices []int) obiseq.SeqSliceWorker { + return func(batch obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) { + if len(batch) == 0 { + return batch, nil + } + + // Build slice of *BioSequence for PrepareQueries + seqs := make([]*obiseq.BioSequence, len(batch)) + for i := range batch { + seqs[i] = batch[i] + } + + // Prepare queries once (shared across sets) + queries := ksg.PrepareQueries(seqs) + + // Match against each selected set + for _, setIdx := range setIndices { + result := ksg.MatchBatch(setIdx, queries) + + setID := ksg.SetIDOf(setIdx) + if setID == "" { + setID = fmt.Sprintf("set_%d", setIdx) + } + attrName := "kmer_matched_" + setID + + for seqIdx, positions := range result { + if len(positions) > 0 { + batch[seqIdx].SetAttribute(attrName, positions) + } + } + } + + return batch, nil + } +} + +// runMatch implements the "obik match" subcommand. +// It reads sequences, looks up their k-mers in a disk-based index, +// and annotates each sequence with match positions. +func runMatch(ctx context.Context, opt *getoptions.GetOpt, args []string) error { + if len(args) < 1 { + return fmt.Errorf("usage: obik match [options] [sequence_files...]") + } + + indexDir := args[0] + seqArgs := args[1:] + + // Open the k-mer index + ksg, err := obikmer.OpenKmerSetGroup(indexDir) + if err != nil { + return fmt.Errorf("failed to open kmer index: %w", err) + } + + log.Infof("Opened index: k=%d, m=%d, %d partitions, %d set(s)", + ksg.K(), ksg.M(), ksg.Partitions(), ksg.Size()) + + // Resolve which sets to match against + patterns := CLISetPatterns() + var setIndices []int + if len(patterns) > 0 { + setIndices, err = ksg.MatchSetIDs(patterns) + if err != nil { + return fmt.Errorf("failed to match set patterns: %w", err) + } + if len(setIndices) == 0 { + return fmt.Errorf("no sets match the given patterns") + } + } else { + // All sets + setIndices = make([]int, ksg.Size()) + for i := range setIndices { + setIndices[i] = i + } + } + + // Log which sets we'll match + for _, idx := range setIndices { + id := ksg.SetIDOf(idx) + if id == "" { + id = fmt.Sprintf("set_%d", idx) + } + log.Infof("Matching against set %d (%s): %d k-mers", idx, id, ksg.Len(idx)) + } + + // Read sequences + sequences, err := obiconvert.CLIReadBioSequences(seqArgs...) + if err != nil { + return fmt.Errorf("failed to open sequence files: %w", err) + } + + // Apply the batch worker + worker := matchSliceWorker(ksg, setIndices) + matched := sequences.MakeISliceWorker( + worker, + false, + obidefault.ParallelWorkers(), + ) + + obiconvert.CLIWriteBioSequences(matched, true) + obiutils.WaitForLastPipe() + + return nil +} diff --git a/pkg/obitools/obik/obik.go b/pkg/obitools/obik/obik.go index 3b5c7bc..b268f30 100644 --- a/pkg/obitools/obik/obik.go +++ b/pkg/obitools/obik/obik.go @@ -67,4 +67,11 @@ func OptionSet(opt *getoptions.GetOpt) { obiconvert.OutputOptionSet(lowmaskCmd) LowMaskOptionSet(lowmaskCmd) lowmaskCmd.SetCommandFn(runLowmask) + + // match: annotate sequences with k-mer match positions from an index + matchCmd := opt.NewCommand("match", "Annotate sequences with k-mer match positions from an index") + obiconvert.InputOptionSet(matchCmd) + obiconvert.OutputOptionSet(matchCmd) + SetSelectionOptionSet(matchCmd) + matchCmd.SetCommandFn(runMatch) }