Add max frequency filtering and top-kmer saving capabilities

This commit introduces max frequency filtering to limit k-mer occurrences and adds functionality to save the N most frequent k-mers per set to CSV files. It also includes the ability to output k-mer frequency spectra as CSV and updates the CLI options accordingly.
This commit is contained in:
Eric Coissac
2026-02-10 09:26:46 +01:00
parent 56c1f4180c
commit f2937af1ad
7 changed files with 530 additions and 12 deletions

View File

@@ -16,7 +16,9 @@ import (
type BuilderOption func(*builderConfig)
type builderConfig struct {
minFreq int // 0 means no frequency filtering (simple dedup)
minFreq int // 0 means no frequency filtering (simple dedup)
maxFreq int // 0 means no upper bound
saveFreqTopN int // >0 means save the N most frequent k-mers per set to CSV
}
// WithMinFrequency activates frequency filtering mode.
@@ -27,6 +29,22 @@ func WithMinFrequency(minFreq int) BuilderOption {
}
}
// WithMaxFrequency sets the upper frequency bound.
// Only k-mers seen <= maxFreq times are kept in the final index.
func WithMaxFrequency(maxFreq int) BuilderOption {
return func(c *builderConfig) {
c.maxFreq = maxFreq
}
}
// WithSaveFreqKmers saves the N most frequent k-mers per set to a CSV file
// (top_kmers.csv in each set directory).
func WithSaveFreqKmers(n int) BuilderOption {
return func(c *builderConfig) {
c.saveFreqTopN = n
}
}
// KmerSetGroupBuilder constructs a KmerSetGroup on disk.
// During construction, super-kmers are written to temporary .skm files
// partitioned by minimizer. On Close(), each partition is finalized
@@ -283,8 +301,17 @@ func (b *KmerSetGroupBuilder) Close() (*KmerSetGroup, error) {
// Process partitions in parallel
counts := make([][]uint64, b.n)
spectra := make([][]map[int]uint64, b.n)
var topKmers [][]*TopNKmers
for s := 0; s < b.n; s++ {
counts[s] = make([]uint64, b.P)
spectra[s] = make([]map[int]uint64, b.P)
}
if b.config.saveFreqTopN > 0 {
topKmers = make([][]*TopNKmers, b.n)
for s := 0; s < b.n; s++ {
topKmers[s] = make([]*TopNKmers, b.P)
}
}
nWorkers := runtime.NumCPU()
@@ -307,13 +334,18 @@ func (b *KmerSetGroupBuilder) Close() (*KmerSetGroup, error) {
go func() {
defer wg.Done()
for j := range jobs {
if err := b.finalizePartition(j.setIdx, j.partIdx, &counts[j.setIdx][j.partIdx]); err != nil {
partSpec, partTop, err := b.finalizePartition(j.setIdx, j.partIdx, &counts[j.setIdx][j.partIdx])
if err != nil {
errMu.Lock()
if firstErr == nil {
firstErr = err
}
errMu.Unlock()
}
spectra[j.setIdx][j.partIdx] = partSpec
if topKmers != nil {
topKmers[j.setIdx][j.partIdx] = partTop
}
}
}()
}
@@ -330,6 +362,41 @@ func (b *KmerSetGroupBuilder) Close() (*KmerSetGroup, error) {
return nil, firstErr
}
// Aggregate per-partition spectra into per-set spectra and write spectrum.bin
for s := 0; s < b.n; s++ {
globalIdx := b.startIndex + s
setSpectrum := make(map[int]uint64)
for p := 0; p < b.P; p++ {
if spectra[s][p] != nil {
MergeSpectraMaps(setSpectrum, spectra[s][p])
}
}
if len(setSpectrum) > 0 {
specPath := filepath.Join(b.dir, fmt.Sprintf("set_%d", globalIdx), "spectrum.bin")
if err := WriteSpectrum(specPath, MapToSpectrum(setSpectrum)); err != nil {
return nil, fmt.Errorf("obikmer: write spectrum set=%d: %w", globalIdx, err)
}
}
}
// Aggregate per-partition top-N k-mers and write CSV
if topKmers != nil {
for s := 0; s < b.n; s++ {
globalIdx := b.startIndex + s
merged := NewTopNKmers(b.config.saveFreqTopN)
for p := 0; p < b.P; p++ {
merged.MergeTopN(topKmers[s][p])
}
results := merged.Results()
if len(results) > 0 {
csvPath := filepath.Join(b.dir, fmt.Sprintf("set_%d", globalIdx), "top_kmers.csv")
if err := WriteTopKmersCSV(csvPath, results, b.k); err != nil {
return nil, fmt.Errorf("obikmer: write top kmers set=%d: %w", globalIdx, err)
}
}
}
}
// 3. Build KmerSetGroup and write metadata
newCounts := make([]uint64, b.n)
for s := 0; s < b.n; s++ {
@@ -383,8 +450,10 @@ func (b *KmerSetGroupBuilder) Close() (*KmerSetGroup, error) {
}
// finalizePartition processes a single partition: load SKM, extract k-mers,
// sort, dedup/count, write KDI.
func (b *KmerSetGroupBuilder) finalizePartition(setIdx, partIdx int, count *uint64) error {
// sort, dedup/count, write KDI. Returns a partial frequency spectrum
// (frequency → count of distinct k-mers) computed before filtering,
// and optionally the top-N most frequent k-mers.
func (b *KmerSetGroupBuilder) finalizePartition(setIdx, partIdx int, count *uint64) (map[int]uint64, *TopNKmers, error) {
// setIdx is local (0..n-1); build dirs use local index, output dirs use global
skmPath := filepath.Join(b.dir, ".build",
fmt.Sprintf("set_%d", setIdx),
@@ -399,7 +468,7 @@ func (b *KmerSetGroupBuilder) finalizePartition(setIdx, partIdx int, count *uint
reader, err := NewSkmReader(skmPath)
if err != nil {
// If file doesn't exist or is empty, write empty KDI
return b.writeEmptyKdi(kdiPath, count)
return nil, nil, b.writeEmptyKdi(kdiPath, count)
}
var kmers []uint64
@@ -415,7 +484,7 @@ func (b *KmerSetGroupBuilder) finalizePartition(setIdx, partIdx int, count *uint
reader.Close()
if len(kmers) == 0 {
return b.writeEmptyKdi(kdiPath, count)
return nil, nil, b.writeEmptyKdi(kdiPath, count)
}
// Sort
@@ -424,15 +493,23 @@ func (b *KmerSetGroupBuilder) finalizePartition(setIdx, partIdx int, count *uint
// Write KDI based on mode
w, err := NewKdiWriter(kdiPath)
if err != nil {
return err
return nil, nil, err
}
minFreq := b.config.minFreq
if minFreq <= 0 {
minFreq = 1 // simple dedup
}
maxFreq := b.config.maxFreq // 0 means no upper bound
// Linear scan: count consecutive identical values
// Prepare top-N collector if requested
var topN *TopNKmers
if b.config.saveFreqTopN > 0 {
topN = NewTopNKmers(b.config.saveFreqTopN)
}
// Linear scan: count consecutive identical values and accumulate spectrum
partSpectrum := make(map[int]uint64)
i := 0
for i < len(kmers) {
val := kmers[i]
@@ -440,17 +517,21 @@ func (b *KmerSetGroupBuilder) finalizePartition(setIdx, partIdx int, count *uint
for i+c < len(kmers) && kmers[i+c] == val {
c++
}
if c >= minFreq {
partSpectrum[c]++
if topN != nil {
topN.Add(val, c)
}
if c >= minFreq && (maxFreq <= 0 || c <= maxFreq) {
if err := w.Write(val); err != nil {
w.Close()
return err
return nil, nil, err
}
}
i += c
}
*count = w.Count()
return w.Close()
return partSpectrum, topN, w.Close()
}
func (b *KmerSetGroupBuilder) writeEmptyKdi(path string, count *uint64) error {

View File

@@ -706,6 +706,21 @@ func (ksg *KmerSetGroup) PartitionPath(setIndex, partIndex int) string {
return ksg.partitionPath(setIndex, partIndex)
}
// SpectrumPath returns the path to the spectrum.bin file for the given set.
func (ksg *KmerSetGroup) SpectrumPath(setIndex int) string {
return filepath.Join(ksg.path, fmt.Sprintf("set_%d", setIndex), "spectrum.bin")
}
// Spectrum reads the k-mer frequency spectrum for the given set.
// Returns nil, nil if no spectrum file exists.
func (ksg *KmerSetGroup) Spectrum(setIndex int) (*KmerSpectrum, error) {
path := ksg.SpectrumPath(setIndex)
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil, nil
}
return ReadSpectrum(path)
}
// IsCompatibleWith returns true if the other group has the same k, m, and partitions.
func (ksg *KmerSetGroup) IsCompatibleWith(other *KmerSetGroup) bool {
return ksg.k == other.k && ksg.m == other.m && ksg.partitions == other.partitions
@@ -847,6 +862,15 @@ func (ksg *KmerSetGroup) CopySetsByIDTo(ids []string, destDir string, force bool
}
}
// Copy spectrum.bin if it exists
srcSpecPath := ksg.SpectrumPath(srcIdx)
if _, err := os.Stat(srcSpecPath); err == nil {
destSpecPath := filepath.Join(destSetDir, "spectrum.bin")
if err := copyFile(srcSpecPath, destSpecPath); err != nil {
return nil, fmt.Errorf("obikmer: copy spectrum of set %q: %w", srcID, err)
}
}
// Update destination metadata
dest.setsIDs = append(dest.setsIDs, srcID)
dest.counts = append(dest.counts, ksg.counts[srcIdx])

253
pkg/obikmer/spectrum.go Normal file
View File

@@ -0,0 +1,253 @@
package obikmer
import (
"bufio"
"container/heap"
"encoding/csv"
"fmt"
"os"
"sort"
"strconv"
)
// KSP file magic bytes: "KSP\x01" (K-mer SPectrum v1)
var kspMagic = [4]byte{'K', 'S', 'P', 0x01}
// SpectrumEntry represents one entry in a k-mer frequency spectrum.
type SpectrumEntry struct {
Frequency int // how many times a k-mer was observed
Count uint64 // how many distinct k-mers have this frequency
}
// KmerSpectrum represents the frequency distribution of k-mers.
// Entries are sorted by Frequency in ascending order and only include
// non-zero counts.
type KmerSpectrum struct {
Entries []SpectrumEntry
}
// MaxFrequency returns the highest frequency in the spectrum, or 0 if empty.
func (s *KmerSpectrum) MaxFrequency() int {
if len(s.Entries) == 0 {
return 0
}
return s.Entries[len(s.Entries)-1].Frequency
}
// ToMap converts a KmerSpectrum back to a map for easy lookup.
func (s *KmerSpectrum) ToMap() map[int]uint64 {
m := make(map[int]uint64, len(s.Entries))
for _, e := range s.Entries {
m[e.Frequency] = e.Count
}
return m
}
// MapToSpectrum converts a map[int]uint64 to a sorted KmerSpectrum.
func MapToSpectrum(m map[int]uint64) *KmerSpectrum {
entries := make([]SpectrumEntry, 0, len(m))
for freq, count := range m {
if count > 0 {
entries = append(entries, SpectrumEntry{Frequency: freq, Count: count})
}
}
sort.Slice(entries, func(i, j int) bool {
return entries[i].Frequency < entries[j].Frequency
})
return &KmerSpectrum{Entries: entries}
}
// MergeSpectraMaps adds all entries from b into a.
func MergeSpectraMaps(a, b map[int]uint64) {
for freq, count := range b {
a[freq] += count
}
}
// WriteSpectrum writes a KmerSpectrum to a binary file.
//
// Format:
//
// [magic: 4 bytes "KSP\x01"]
// [n_entries: varint]
// For each entry (sorted by frequency ascending):
// [frequency: varint]
// [count: varint]
func WriteSpectrum(path string, spectrum *KmerSpectrum) error {
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("create spectrum file: %w", err)
}
w := bufio.NewWriterSize(f, 65536)
// Magic
if _, err := w.Write(kspMagic[:]); err != nil {
f.Close()
return err
}
// Number of entries
if _, err := EncodeVarint(w, uint64(len(spectrum.Entries))); err != nil {
f.Close()
return err
}
// Entries
for _, e := range spectrum.Entries {
if _, err := EncodeVarint(w, uint64(e.Frequency)); err != nil {
f.Close()
return err
}
if _, err := EncodeVarint(w, e.Count); err != nil {
f.Close()
return err
}
}
if err := w.Flush(); err != nil {
f.Close()
return err
}
return f.Close()
}
// ReadSpectrum reads a KmerSpectrum from a binary file.
func ReadSpectrum(path string) (*KmerSpectrum, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
r := bufio.NewReaderSize(f, 65536)
// Check magic
var magic [4]byte
if _, err := r.Read(magic[:]); err != nil {
return nil, fmt.Errorf("read spectrum magic: %w", err)
}
if magic != kspMagic {
return nil, fmt.Errorf("invalid spectrum file magic: %v", magic)
}
// Number of entries
nEntries, err := DecodeVarint(r)
if err != nil {
return nil, fmt.Errorf("read spectrum entry count: %w", err)
}
entries := make([]SpectrumEntry, nEntries)
for i := uint64(0); i < nEntries; i++ {
freq, err := DecodeVarint(r)
if err != nil {
return nil, fmt.Errorf("read spectrum freq at entry %d: %w", i, err)
}
count, err := DecodeVarint(r)
if err != nil {
return nil, fmt.Errorf("read spectrum count at entry %d: %w", i, err)
}
entries[i] = SpectrumEntry{
Frequency: int(freq),
Count: count,
}
}
return &KmerSpectrum{Entries: entries}, nil
}
// KmerFreq associates a k-mer (encoded as uint64) with its observed frequency.
type KmerFreq struct {
Kmer uint64
Freq int
}
// kmerFreqHeap is a min-heap of KmerFreq ordered by Freq (lowest first).
// Used to maintain a top-N most frequent k-mers set.
type kmerFreqHeap []KmerFreq
func (h kmerFreqHeap) Len() int { return len(h) }
func (h kmerFreqHeap) Less(i, j int) bool { return h[i].Freq < h[j].Freq }
func (h kmerFreqHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *kmerFreqHeap) Push(x interface{}) { *h = append(*h, x.(KmerFreq)) }
func (h *kmerFreqHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[:n-1]
return x
}
// TopNKmers maintains a collection of the N most frequent k-mers
// using a min-heap. Thread-safe usage requires external synchronization.
type TopNKmers struct {
n int
h kmerFreqHeap
}
// NewTopNKmers creates a new top-N collector.
func NewTopNKmers(n int) *TopNKmers {
return &TopNKmers{
n: n,
h: make(kmerFreqHeap, 0, n+1),
}
}
// Add considers a k-mer with the given frequency for inclusion in the top-N.
func (t *TopNKmers) Add(kmer uint64, freq int) {
if t.n <= 0 {
return
}
if len(t.h) < t.n {
heap.Push(&t.h, KmerFreq{Kmer: kmer, Freq: freq})
} else if freq > t.h[0].Freq {
t.h[0] = KmerFreq{Kmer: kmer, Freq: freq}
heap.Fix(&t.h, 0)
}
}
// Results returns the collected k-mers sorted by frequency descending.
func (t *TopNKmers) Results() []KmerFreq {
result := make([]KmerFreq, len(t.h))
copy(result, t.h)
sort.Slice(result, func(i, j int) bool {
return result[i].Freq > result[j].Freq
})
return result
}
// MergeTopN merges another TopNKmers into this one.
func (t *TopNKmers) MergeTopN(other *TopNKmers) {
if other == nil {
return
}
for _, kf := range other.h {
t.Add(kf.Kmer, kf.Freq)
}
}
// WriteTopKmersCSV writes the top k-mers to a CSV file.
// Columns: sequence, frequency
func WriteTopKmersCSV(path string, topKmers []KmerFreq, k int) error {
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("create top-kmers file: %w", err)
}
defer f.Close()
w := csv.NewWriter(f)
defer w.Flush()
if err := w.Write([]string{"sequence", "frequency"}); err != nil {
return err
}
buf := make([]byte, k)
for _, kf := range topKmers {
seq := DecodeKmer(kf.Kmer, k, buf)
if err := w.Write([]string{string(seq), strconv.Itoa(kf.Freq)}); err != nil {
return err
}
}
return nil
}

View File

@@ -31,11 +31,19 @@ func runIndex(ctx context.Context, opt *getoptions.GetOpt, args []string) error
return fmt.Errorf("invalid min-occurrence: %d (must be >= 1)", minOcc)
}
maxOcc := CLIMaxOccurrence()
// Build options
var opts []obikmer.BuilderOption
if minOcc > 1 {
opts = append(opts, obikmer.WithMinFrequency(minOcc))
}
if maxOcc > 0 {
opts = append(opts, obikmer.WithMaxFrequency(maxOcc))
}
if topN := CLISaveFreqKmer(); topN > 0 {
opts = append(opts, obikmer.WithSaveFreqKmers(topN))
}
// Determine whether to append to existing group or create new
var builder *obikmer.KmerSetGroupBuilder
@@ -50,7 +58,11 @@ func runIndex(ctx context.Context, opt *getoptions.GetOpt, args []string) error
}
} else {
// New group
log.Infof("Creating new kmer index: k=%d, m=%d, min-occurrence=%d", k, m, minOcc)
if maxOcc > 0 {
log.Infof("Creating new kmer index: k=%d, m=%d, occurrence=[%d,%d]", k, m, minOcc, maxOcc)
} else {
log.Infof("Creating new kmer index: k=%d, m=%d, min-occurrence=%d", k, m, minOcc)
}
builder, err = obikmer.NewKmerSetGroupBuilder(outDir, k, m, 1, -1, opts...)
if err != nil {
return fmt.Errorf("failed to create kmer index builder: %w", err)
@@ -99,6 +111,9 @@ func runIndex(ctx context.Context, opt *getoptions.GetOpt, args []string) error
if minOcc > 1 {
ksg.SetAttribute("min_occurrence", minOcc)
}
if maxOcc > 0 {
ksg.SetAttribute("max_occurrence", maxOcc)
}
if err := ksg.SaveMetadata(); err != nil {
return fmt.Errorf("failed to save metadata: %w", err)

View File

@@ -48,6 +48,12 @@ func OptionSet(opt *getoptions.GetOpt) {
SetSelectionOptionSet(rmCmd)
rmCmd.SetCommandFn(runRm)
// spectrum: output k-mer frequency spectrum as CSV
spectrumCmd := opt.NewCommand("spectrum", "Output k-mer frequency spectrum as CSV")
SetSelectionOptionSet(spectrumCmd)
obiconvert.OutputModeOptionSet(spectrumCmd, false)
spectrumCmd.SetCommandFn(runSpectrum)
// super: extract super k-mers from sequences
superCmd := opt.NewCommand("super", "Extract super k-mers from sequence files")
obiconvert.InputOptionSet(superCmd)

View File

@@ -38,7 +38,9 @@ var _indexId = ""
var _metadataFormat = "toml"
var _setTag = make(map[string]string, 0)
var _minOccurrence = 1
var _maxOccurrence = 0
var _saveFullFilter = false
var _saveFreqKmer = 0
// KmerIndexOptionSet defines every option related to kmer index building.
func KmerIndexOptionSet(options *getoptions.GetOpt) {
@@ -64,8 +66,14 @@ func KmerIndexOptionSet(options *getoptions.GetOpt) {
options.IntVar(&_minOccurrence, "min-occurrence", _minOccurrence,
options.Description("Minimum number of occurrences for a k-mer to be kept (default 1 = keep all)."))
options.IntVar(&_maxOccurrence, "max-occurrence", _maxOccurrence,
options.Description("Maximum number of occurrences for a k-mer to be kept (default 0 = no upper bound)."))
options.BoolVar(&_saveFullFilter, "save-full-filter", _saveFullFilter,
options.Description("When using --min-occurrence > 1, save the full frequency filter instead of just the filtered index."))
options.IntVar(&_saveFreqKmer, "save-freq-kmer", _saveFreqKmer,
options.Description("Save the N most frequent k-mers per set to a CSV file (top_kmers.csv)."))
}
// CLIKmerSize returns the k-mer size.
@@ -114,11 +122,21 @@ func CLIMinOccurrence() int {
return _minOccurrence
}
// CLIMaxOccurrence returns the maximum occurrence threshold (0 = no upper bound).
func CLIMaxOccurrence() int {
return _maxOccurrence
}
// CLISaveFullFilter returns whether to save the full frequency filter.
func CLISaveFullFilter() bool {
return _saveFullFilter
}
// CLISaveFreqKmer returns the number of top frequent k-mers to save (0 = disabled).
func CLISaveFreqKmer() int {
return _saveFreqKmer
}
// CLIOutputDirectory returns the output directory path.
func CLIOutputDirectory() string {
return obiconvert.CLIOutPutFileName()

View File

@@ -0,0 +1,121 @@
package obik
import (
"context"
"encoding/csv"
"fmt"
"os"
"strconv"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obikmer"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
"github.com/DavidGamba/go-getoptions"
)
// runSpectrum implements the "obik spectrum" subcommand.
// It outputs k-mer frequency spectra as CSV with one column per set.
func runSpectrum(ctx context.Context, opt *getoptions.GetOpt, args []string) error {
if len(args) < 1 {
return fmt.Errorf("usage: obik spectrum [options] <index_directory>")
}
ksg, err := obikmer.OpenKmerSetGroup(args[0])
if err != nil {
return fmt.Errorf("failed to open kmer index: %w", err)
}
// Determine which sets to include
patterns := CLISetPatterns()
var indices []int
if len(patterns) > 0 {
indices, err = ksg.MatchSetIDs(patterns)
if err != nil {
return fmt.Errorf("failed to match set patterns: %w", err)
}
if len(indices) == 0 {
return fmt.Errorf("no sets match the given patterns")
}
} else {
// All sets
indices = make([]int, ksg.Size())
for i := range indices {
indices[i] = i
}
}
// Read spectra for selected sets
spectraMaps := make([]map[int]uint64, len(indices))
maxFreq := 0
for i, idx := range indices {
spectrum, err := ksg.Spectrum(idx)
if err != nil {
return fmt.Errorf("failed to read spectrum for set %d: %w", idx, err)
}
if spectrum == nil {
log.Warnf("No spectrum data for set %d (%s)", idx, ksg.SetIDOf(idx))
spectraMaps[i] = make(map[int]uint64)
continue
}
spectraMaps[i] = spectrum.ToMap()
if mf := spectrum.MaxFrequency(); mf > maxFreq {
maxFreq = mf
}
}
if maxFreq == 0 {
return fmt.Errorf("no spectrum data found in any selected set")
}
// Determine output destination
outFile := obiconvert.CLIOutPutFileName()
var w *csv.Writer
if outFile == "" || outFile == "-" {
w = csv.NewWriter(os.Stdout)
} else {
f, err := os.Create(outFile)
if err != nil {
return fmt.Errorf("failed to create output file: %w", err)
}
defer f.Close()
w = csv.NewWriter(f)
}
defer w.Flush()
// Build header: frequency, set_id_1, set_id_2, ...
header := make([]string, 1+len(indices))
header[0] = "frequency"
for i, idx := range indices {
id := ksg.SetIDOf(idx)
if id == "" {
id = fmt.Sprintf("set_%d", idx)
}
header[i+1] = id
}
if err := w.Write(header); err != nil {
return err
}
// Write rows for each frequency from 1 to maxFreq
record := make([]string, 1+len(indices))
for freq := 1; freq <= maxFreq; freq++ {
record[0] = strconv.Itoa(freq)
hasData := false
for i := range indices {
count := spectraMaps[i][freq]
record[i+1] = strconv.FormatUint(count, 10)
if count > 0 {
hasData = true
}
}
// Only write rows where at least one set has a non-zero count
if hasData {
if err := w.Write(record); err != nil {
return err
}
}
}
return nil
}