diff --git a/pkg/obikmer/entropy.go b/pkg/obikmer/entropy.go new file mode 100644 index 0000000..94f8ab7 --- /dev/null +++ b/pkg/obikmer/entropy.go @@ -0,0 +1,281 @@ +package obikmer + +import "math" + +// KmerEntropy computes the entropy of a single encoded k-mer. +// +// The algorithm mirrors the lowmask entropy calculation: it decodes the k-mer +// to a DNA sequence, extracts all sub-words of each size from 1 to levelMax, +// normalizes them by circular canonical form, counts their frequencies, and +// computes Shannon entropy normalized by the maximum possible entropy. +// The returned value is the minimum entropy across all word sizes. +// +// A value close to 0 indicates very low complexity (e.g. "AAAA..."), +// while a value close to 1 indicates high complexity. +// +// Parameters: +// - kmer: the encoded k-mer (2 bits per base) +// - k: the k-mer size +// - levelMax: maximum sub-word size for entropy (typically 6) +// +// Returns: +// - minimum normalized entropy across all word sizes 1..levelMax +func KmerEntropy(kmer uint64, k int, levelMax int) float64 { + if k < 1 || levelMax < 1 { + return 1.0 + } + if levelMax >= k { + levelMax = k - 1 + } + if levelMax < 1 { + return 1.0 + } + + // Decode k-mer to DNA sequence + var seqBuf [32]byte + seq := DecodeKmer(kmer, k, seqBuf[:]) + + // Pre-compute nLogN lookup (same as lowmask) + nLogN := make([]float64, k+1) + for i := 1; i <= k; i++ { + nLogN[i] = float64(i) * math.Log(float64(i)) + } + + // Build circular-canonical normalization tables per word size + normTables := make([][]int, levelMax+1) + for ws := 1; ws <= levelMax; ws++ { + size := 1 << (ws * 2) + normTables[ws] = make([]int, size) + for code := 0; code < size; code++ { + normTables[ws][code] = int(NormalizeCircular(uint64(code), ws)) + } + } + + minEntropy := math.MaxFloat64 + + for ws := 1; ws <= levelMax; ws++ { + nwords := k - ws + 1 + if nwords < 1 { + continue + } + + // Count circular-canonical sub-word frequencies + tableSize := 1 << (ws * 
2) + table := make([]int, tableSize) + mask := (1 << (ws * 2)) - 1 + + wordIndex := 0 + for i := 0; i < ws-1; i++ { + wordIndex = (wordIndex << 2) + int(EncodeNucleotide(seq[i])) + } + + for i, j := 0, ws-1; j < k; i, j = i+1, j+1 { + wordIndex = ((wordIndex << 2) & mask) + int(EncodeNucleotide(seq[j])) + normWord := normTables[ws][wordIndex] + table[normWord]++ + } + + // Compute Shannon entropy + floatNwords := float64(nwords) + logNwords := math.Log(floatNwords) + + var sumNLogN float64 + for j := 0; j < tableSize; j++ { + n := table[j] + if n > 0 { + sumNLogN += nLogN[n] + } + } + + // Compute emax (maximum possible entropy for this word size) + na := CanonicalCircularKmerCount(ws) + var emax float64 + if nwords < na { + emax = math.Log(float64(nwords)) + } else { + cov := nwords / na + remains := nwords - (na * cov) + f1 := float64(cov) / floatNwords + f2 := float64(cov+1) / floatNwords + emax = -(float64(na-remains)*f1*math.Log(f1) + + float64(remains)*f2*math.Log(f2)) + } + + if emax <= 0 { + continue + } + + entropy := (logNwords - sumNLogN/floatNwords) / emax + if entropy < 0 { + entropy = 0 + } + + if entropy < minEntropy { + minEntropy = entropy + } + } + + if minEntropy == math.MaxFloat64 { + return 1.0 + } + + return math.Round(minEntropy*10000) / 10000 +} + +// KmerEntropyFilter is a reusable entropy filter for batch processing. +// It pre-computes normalization tables and lookup values to avoid repeated +// allocation across millions of k-mers. +// +// IMPORTANT: a KmerEntropyFilter is NOT safe for concurrent use. +// Each goroutine must create its own instance via NewKmerEntropyFilter. +type KmerEntropyFilter struct { + k int + levelMax int + threshold float64 + nLogN []float64 + normTables [][]int + emaxValues []float64 + logNwords []float64 + // Pre-allocated frequency tables reused across Entropy() calls. + // One per word size (index 0 unused). Reset to zero before each use. 
+ freqTables [][]int +} + +// NewKmerEntropyFilter creates an entropy filter with pre-computed tables. +// +// Parameters: +// - k: the k-mer size +// - levelMax: maximum sub-word size for entropy (typically 6) +// - threshold: entropy threshold (k-mers with entropy <= threshold are rejected) +func NewKmerEntropyFilter(k, levelMax int, threshold float64) *KmerEntropyFilter { + if levelMax >= k { + levelMax = k - 1 + } + if levelMax < 1 { + levelMax = 1 + } + + nLogN := make([]float64, k+1) + for i := 1; i <= k; i++ { + nLogN[i] = float64(i) * math.Log(float64(i)) + } + + normTables := make([][]int, levelMax+1) + for ws := 1; ws <= levelMax; ws++ { + size := 1 << (ws * 2) + normTables[ws] = make([]int, size) + for code := 0; code < size; code++ { + normTables[ws][code] = int(NormalizeCircular(uint64(code), ws)) + } + } + + emaxValues := make([]float64, levelMax+1) + logNwords := make([]float64, levelMax+1) + for ws := 1; ws <= levelMax; ws++ { + nw := k - ws + 1 + na := CanonicalCircularKmerCount(ws) + if nw < na { + logNwords[ws] = math.Log(float64(nw)) + emaxValues[ws] = math.Log(float64(nw)) + } else { + cov := nw / na + remains := nw - (na * cov) + f1 := float64(cov) / float64(nw) + f2 := float64(cov+1) / float64(nw) + logNwords[ws] = math.Log(float64(nw)) + emaxValues[ws] = -(float64(na-remains)*f1*math.Log(f1) + + float64(remains)*f2*math.Log(f2)) + } + } + + // Pre-allocate frequency tables per word size + freqTables := make([][]int, levelMax+1) + for ws := 1; ws <= levelMax; ws++ { + freqTables[ws] = make([]int, 1<<(ws*2)) + } + + return &KmerEntropyFilter{ + k: k, + levelMax: levelMax, + threshold: threshold, + nLogN: nLogN, + normTables: normTables, + emaxValues: emaxValues, + logNwords: logNwords, + freqTables: freqTables, + } +} + +// Accept returns true if the k-mer has entropy strictly above the threshold. +// Low-complexity k-mers (entropy <= threshold) are rejected. 
+func (ef *KmerEntropyFilter) Accept(kmer uint64) bool { + return ef.Entropy(kmer) > ef.threshold +} + +// Entropy computes the entropy for a single k-mer using pre-computed tables. +func (ef *KmerEntropyFilter) Entropy(kmer uint64) float64 { + k := ef.k + + // Decode k-mer to DNA sequence + var seqBuf [32]byte + seq := DecodeKmer(kmer, k, seqBuf[:]) + + minEntropy := math.MaxFloat64 + + for ws := 1; ws <= ef.levelMax; ws++ { + nwords := k - ws + 1 + if nwords < 1 { + continue + } + + emax := ef.emaxValues[ws] + if emax <= 0 { + continue + } + + // Count circular-canonical sub-word frequencies + tableSize := 1 << (ws * 2) + table := ef.freqTables[ws] + clear(table) // reset to zero + mask := (1 << (ws * 2)) - 1 + normTable := ef.normTables[ws] + + wordIndex := 0 + for i := 0; i < ws-1; i++ { + wordIndex = (wordIndex << 2) + int(EncodeNucleotide(seq[i])) + } + + for i, j := 0, ws-1; j < k; i, j = i+1, j+1 { + wordIndex = ((wordIndex << 2) & mask) + int(EncodeNucleotide(seq[j])) + normWord := normTable[wordIndex] + table[normWord]++ + } + + // Compute Shannon entropy + floatNwords := float64(nwords) + logNwords := ef.logNwords[ws] + + var sumNLogN float64 + for j := 0; j < tableSize; j++ { + n := table[j] + if n > 0 { + sumNLogN += ef.nLogN[n] + } + } + + entropy := (logNwords - sumNLogN/floatNwords) / emax + if entropy < 0 { + entropy = 0 + } + + if entropy < minEntropy { + minEntropy = entropy + } + } + + if minEntropy == math.MaxFloat64 { + return 1.0 + } + + return math.Round(minEntropy*10000) / 10000 +} diff --git a/pkg/obikmer/kmer_set_builder.go b/pkg/obikmer/kmer_set_builder.go index 565eaf5..c58efa6 100644 --- a/pkg/obikmer/kmer_set_builder.go +++ b/pkg/obikmer/kmer_set_builder.go @@ -5,20 +5,23 @@ import ( "math" "os" "path/filepath" - "runtime" - "sort" + "slices" "sync" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" + "github.com/schollz/progressbar/v3" ) // 
// BuilderOption is a functional option for KmerSetGroupBuilder.
type BuilderOption func(*builderConfig)

// builderConfig gathers the tunable settings applied by BuilderOption
// functions when a KmerSetGroupBuilder is created.
type builderConfig struct {
	minFreq          int     // 0 means no frequency filtering (simple dedup)
	maxFreq          int     // 0 means no upper bound
	saveFreqTopN     int     // >0 means save the N most frequent k-mers per set to CSV
	entropyThreshold float64 // >0 means filter k-mers with entropy <= threshold
	entropyLevelMax  int     // max sub-word size for entropy (typically 6)
}

// WithEntropyFilter activates entropy-based low-complexity filtering.
// K-mers with entropy <= threshold are discarded during finalization.
// levelMax is the maximum sub-word size for entropy computation (typically 6).
func WithEntropyFilter(threshold float64, levelMax int) BuilderOption {
	return func(c *builderConfig) {
		c.entropyThreshold = threshold
		c.entropyLevelMax = levelMax
	}
}
+ // ===================================================================== + totalJobs := b.n * b.P + counts := make([][]uint64, b.n) spectra := make([][]map[int]uint64, b.n) var topKmers [][]*TopNKmers @@ -314,27 +337,71 @@ func (b *KmerSetGroupBuilder) Close() (*KmerSetGroup, error) { } } - nWorkers := runtime.NumCPU() - if nWorkers > b.P { - nWorkers = b.P + nCPU := obidefault.ParallelWorkers() + + // Stage sizing + nWorkers := nCPU // CPU-bound: one per core + nReaders := nCPU / 4 // pure I/O: few goroutines suffice + if nReaders < 2 { + nReaders = 2 + } + if nReaders > 4 { + nReaders = 4 + } + if nWorkers > totalJobs { + nWorkers = totalJobs + } + if nReaders > totalJobs { + nReaders = totalJobs } - type job struct { + var bar *progressbar.ProgressBar + if obidefault.ProgressBar() { + pbopt := []progressbar.Option{ + progressbar.OptionSetWriter(os.Stderr), + progressbar.OptionSetWidth(15), + progressbar.OptionShowCount(), + progressbar.OptionShowIts(), + progressbar.OptionSetPredictTime(true), + progressbar.OptionSetDescription("[Finalizing partitions]"), + } + bar = progressbar.NewOptions(totalJobs, pbopt...) 
+ } + + // --- Channel types --- + type partitionData struct { + setIdx int + partIdx int + skmers []SuperKmer // raw super-kmers from I/O stage + } + + type readJob struct { setIdx int partIdx int } - jobs := make(chan job, b.n*b.P) - var wg sync.WaitGroup + dataCh := make(chan *partitionData) // unbuffered + readJobs := make(chan readJob, totalJobs) + var errMu sync.Mutex var firstErr error - for w := 0; w < nWorkers; w++ { - wg.Add(1) + // Fill job queue (buffered, all jobs pre-loaded) + for s := 0; s < b.n; s++ { + for p := 0; p < b.P; p++ { + readJobs <- readJob{s, p} + } + } + close(readJobs) + + // --- Stage 1: Readers (pure I/O) --- + var readWg sync.WaitGroup + for w := 0; w < nReaders; w++ { + readWg.Add(1) go func() { - defer wg.Done() - for j := range jobs { - partSpec, partTop, err := b.finalizePartition(j.setIdx, j.partIdx, &counts[j.setIdx][j.partIdx]) + defer readWg.Done() + for rj := range readJobs { + skmers, err := b.loadPartitionRaw(rj.setIdx, rj.partIdx) if err != nil { errMu.Lock() if firstErr == nil { @@ -342,21 +409,62 @@ func (b *KmerSetGroupBuilder) Close() (*KmerSetGroup, error) { } errMu.Unlock() } - spectra[j.setIdx][j.partIdx] = partSpec + dataCh <- &partitionData{rj.setIdx, rj.partIdx, skmers} + } + }() + } + + go func() { + readWg.Wait() + close(dataCh) + }() + + // --- Stage 2: Workers (CPU: extract k-mers + sort/filter + write .kdi) --- + var workWg sync.WaitGroup + for w := 0; w < nWorkers; w++ { + workWg.Add(1) + go func() { + defer workWg.Done() + for pd := range dataCh { + // CPU: extract canonical k-mers from super-kmers + kmers := extractCanonicalKmers(pd.skmers, b.k) + pd.skmers = nil // allow GC of raw super-kmers + + // CPU: sort, dedup, filter + filtered, spectrum, topN := b.sortFilterPartition(kmers) + kmers = nil // allow GC of unsorted data + + // I/O: write .kdi file + globalIdx := b.startIndex + pd.setIdx + kdiPath := filepath.Join(b.dir, + fmt.Sprintf("set_%d", globalIdx), + fmt.Sprintf("part_%04d.kdi", pd.partIdx)) 
+ + n, err := b.writePartitionKdi(kdiPath, filtered) + if err != nil { + errMu.Lock() + if firstErr == nil { + firstErr = err + } + errMu.Unlock() + } + counts[pd.setIdx][pd.partIdx] = n + spectra[pd.setIdx][pd.partIdx] = spectrum if topKmers != nil { - topKmers[j.setIdx][j.partIdx] = partTop + topKmers[pd.setIdx][pd.partIdx] = topN + } + if bar != nil { + bar.Add(1) } } }() } - for s := 0; s < b.n; s++ { - for p := 0; p < b.P; p++ { - jobs <- job{s, p} - } + workWg.Wait() + + if bar != nil { + fmt.Fprintln(os.Stderr) } - close(jobs) - wg.Wait() if firstErr != nil { return nil, firstErr @@ -449,58 +557,89 @@ func (b *KmerSetGroupBuilder) Close() (*KmerSetGroup, error) { return ksg, nil } -// finalizePartition processes a single partition: load SKM, extract k-mers, -// sort, dedup/count, write KDI. Returns a partial frequency spectrum -// (frequency → count of distinct k-mers) computed before filtering, -// and optionally the top-N most frequent k-mers. -func (b *KmerSetGroupBuilder) finalizePartition(setIdx, partIdx int, count *uint64) (map[int]uint64, *TopNKmers, error) { - // setIdx is local (0..n-1); build dirs use local index, output dirs use global +// loadPartitionRaw reads a .skm file and returns raw super-kmers. +// This is pure I/O — no k-mer extraction is done here. +// Returns nil (not an error) if the .skm file is empty or missing. 
+func (b *KmerSetGroupBuilder) loadPartitionRaw(setIdx, partIdx int) ([]SuperKmer, error) { skmPath := filepath.Join(b.dir, ".build", fmt.Sprintf("set_%d", setIdx), fmt.Sprintf("part_%04d.skm", partIdx)) - globalIdx := b.startIndex + setIdx - kdiPath := filepath.Join(b.dir, - fmt.Sprintf("set_%d", globalIdx), - fmt.Sprintf("part_%04d.kdi", partIdx)) - - // Load super-kmers and extract canonical k-mers - reader, err := NewSkmReader(skmPath) + fi, err := os.Stat(skmPath) if err != nil { - // If file doesn't exist or is empty, write empty KDI - return nil, nil, b.writeEmptyKdi(kdiPath, count) + return nil, nil // empty partition, not an error } - var kmers []uint64 + reader, err := NewSkmReader(skmPath) + if err != nil { + return nil, nil + } + + // Estimate capacity from file size. Each super-kmer record is + // 2 bytes (length) + packed bases (~k/4 bytes), so roughly + // (2 + k/4) bytes per super-kmer on average. + avgRecordSize := 2 + b.k/4 + if avgRecordSize < 4 { + avgRecordSize = 4 + } + estCount := int(fi.Size()) / avgRecordSize + + skmers := make([]SuperKmer, 0, estCount) for { sk, ok := reader.Next() if !ok { break } - for kmer := range IterCanonicalKmers(sk.Sequence, b.k) { - kmers = append(kmers, kmer) - } + skmers = append(skmers, sk) } reader.Close() + return skmers, nil +} + +// extractCanonicalKmers extracts all canonical k-mers from a slice of super-kmers. +// This is CPU-bound work (sliding-window forward/reverse complement). +func extractCanonicalKmers(skmers []SuperKmer, k int) []uint64 { + // Pre-compute total capacity to avoid repeated slice growth. + // Each super-kmer of length L yields L-k+1 canonical k-mers. 
+ total := 0 + for i := range skmers { + n := len(skmers[i].Sequence) - k + 1 + if n > 0 { + total += n + } + } + + kmers := make([]uint64, 0, total) + for _, sk := range skmers { + for kmer := range IterCanonicalKmers(sk.Sequence, k) { + kmers = append(kmers, kmer) + } + } + return kmers +} + +// sortFilterPartition sorts, deduplicates, and filters k-mers in memory (CPU-bound). +// Returns the filtered sorted slice, frequency spectrum, and optional top-N. +func (b *KmerSetGroupBuilder) sortFilterPartition(kmers []uint64) ([]uint64, map[int]uint64, *TopNKmers) { if len(kmers) == 0 { - return nil, nil, b.writeEmptyKdi(kdiPath, count) + return nil, nil, nil } - // Sort - sort.Slice(kmers, func(i, j int) bool { return kmers[i] < kmers[j] }) - - // Write KDI based on mode - w, err := NewKdiWriter(kdiPath) - if err != nil { - return nil, nil, err - } + // Sort (CPU-bound) — slices.Sort avoids reflection overhead of sort.Slice + slices.Sort(kmers) minFreq := b.config.minFreq if minFreq <= 0 { minFreq = 1 // simple dedup } - maxFreq := b.config.maxFreq // 0 means no upper bound + maxFreq := b.config.maxFreq + + // Prepare entropy filter if requested + var entropyFilter *KmerEntropyFilter + if b.config.entropyThreshold > 0 && b.config.entropyLevelMax > 0 { + entropyFilter = NewKmerEntropyFilter(b.k, b.config.entropyLevelMax, b.config.entropyThreshold) + } // Prepare top-N collector if requested var topN *TopNKmers @@ -508,8 +647,10 @@ func (b *KmerSetGroupBuilder) finalizePartition(setIdx, partIdx int, count *uint topN = NewTopNKmers(b.config.saveFreqTopN) } - // Linear scan: count consecutive identical values and accumulate spectrum + // Linear scan: count consecutive identical values, filter, accumulate spectrum partSpectrum := make(map[int]uint64) + filtered := make([]uint64, 0, len(kmers)/2) + i := 0 for i < len(kmers) { val := kmers[i] @@ -522,16 +663,33 @@ func (b *KmerSetGroupBuilder) finalizePartition(setIdx, partIdx int, count *uint topN.Add(val, c) } if c >= 
minFreq && (maxFreq <= 0 || c <= maxFreq) { - if err := w.Write(val); err != nil { - w.Close() - return nil, nil, err + if entropyFilter == nil || entropyFilter.Accept(val) { + filtered = append(filtered, val) } } i += c } - *count = w.Count() - return partSpectrum, topN, w.Close() + return filtered, partSpectrum, topN +} + +// writePartitionKdi writes a sorted slice of k-mers to a .kdi file (I/O-bound). +// Returns the number of k-mers written. +func (b *KmerSetGroupBuilder) writePartitionKdi(kdiPath string, kmers []uint64) (uint64, error) { + w, err := NewKdiWriter(kdiPath) + if err != nil { + return 0, err + } + + for _, val := range kmers { + if err := w.Write(val); err != nil { + w.Close() + return 0, err + } + } + + n := w.Count() + return n, w.Close() } func (b *KmerSetGroupBuilder) writeEmptyKdi(path string, count *uint64) error { diff --git a/pkg/obikmer/kmer_set_disk.go b/pkg/obikmer/kmer_set_disk.go index 3932fdc..7397fcf 100644 --- a/pkg/obikmer/kmer_set_disk.go +++ b/pkg/obikmer/kmer_set_disk.go @@ -128,6 +128,27 @@ func OpenKmerSetGroup(directory string) (*KmerSetGroup, error) { return ksg, nil } +// NewFilteredKmerSetGroup creates a KmerSetGroup from pre-computed data. +// Used by the filter command to construct a new group after filtering partitions. +func NewFilteredKmerSetGroup( + directory string, k, m, partitions, n int, + setsIDs []string, counts []uint64, + setsMetadata []map[string]interface{}, +) (*KmerSetGroup, error) { + ksg := &KmerSetGroup{ + path: directory, + k: k, + m: m, + partitions: partitions, + n: n, + setsIDs: setsIDs, + counts: counts, + setsMetadata: setsMetadata, + Metadata: make(map[string]interface{}), + } + return ksg, nil +} + // SaveMetadata writes the metadata.toml file. This is useful after // modifying attributes or IDs on an already-finalized index. 
// KmerFilter is a predicate applied to individual k-mers during filtering.
// It returns true when the k-mer should be kept.
type KmerFilter func(kmer uint64) bool

// KmerFilterFactory creates a new KmerFilter instance. Each goroutine
// should call the factory to obtain its own filter, because some filters
// (e.g. KmerEntropyFilter) are not thread-safe.
type KmerFilterFactory func() KmerFilter

// chainFilterFactories combines several KmerFilterFactory values into one.
// The factory it returns builds a filter that accepts a k-mer only when
// every underlying filter accepts it. An empty input yields an accept-all
// filter; a single factory is returned unchanged.
func chainFilterFactories(factories []KmerFilterFactory) KmerFilterFactory {
	if len(factories) == 0 {
		return func() KmerFilter {
			return func(uint64) bool { return true }
		}
	}
	if len(factories) == 1 {
		return factories[0]
	}
	return func() KmerFilter {
		// Instantiate each underlying filter once per chain instance.
		chain := make([]KmerFilter, 0, len(factories))
		for _, mk := range factories {
			chain = append(chain, mk())
		}
		return func(kmer uint64) bool {
			for _, accept := range chain {
				if !accept(kmer) {
					return false
				}
			}
			return true
		}
	}
}
+func runFilter(ctx context.Context, opt *getoptions.GetOpt, args []string) error { + if len(args) < 1 { + return fmt.Errorf("usage: obik filter [options] --out ") + } + + srcDir := args[0] + destDir := CLIOutputDirectory() + if destDir == "" || destDir == "-" { + return fmt.Errorf("--out option is required and must specify a destination directory") + } + + // Open source index + src, err := obikmer.OpenKmerSetGroup(srcDir) + if err != nil { + return fmt.Errorf("failed to open source index: %w", err) + } + + k := src.K() + + // Build filter factory chain from CLI options. + // Factories are used so each goroutine creates its own filter instance, + // since some filters (e.g. KmerEntropyFilter) have mutable state. + var factories []KmerFilterFactory + var filterDescriptions []string + + // Entropy filter + entropyThreshold := CLIIndexEntropyThreshold() + entropySize := CLIIndexEntropySize() + if entropyThreshold > 0 { + factories = append(factories, func() KmerFilter { + ef := obikmer.NewKmerEntropyFilter(k, entropySize, entropyThreshold) + return ef.Accept + }) + filterDescriptions = append(filterDescriptions, + fmt.Sprintf("entropy(threshold=%.4f, level-max=%d)", entropyThreshold, entropySize)) + } + + // Future filters will be added here, e.g.: + // quorumFilter, frequencyFilter, ... 
+ + if len(factories) == 0 { + return fmt.Errorf("no filter specified; use --entropy-filter or other filter options") + } + + filterFactory := chainFilterFactories(factories) + + // Resolve set selection (default: all sets) + patterns := CLISetPatterns() + var setIndices []int + if len(patterns) > 0 { + setIndices, err = src.MatchSetIDs(patterns) + if err != nil { + return fmt.Errorf("failed to match set patterns: %w", err) + } + if len(setIndices) == 0 { + return fmt.Errorf("no sets match the given patterns") + } + } else { + setIndices = make([]int, src.Size()) + for i := range setIndices { + setIndices[i] = i + } + } + + log.Infof("Filtering %d set(s) from %s with: %s", + len(setIndices), srcDir, strings.Join(filterDescriptions, " + ")) + + // Create destination directory + if err := os.MkdirAll(destDir, 0755); err != nil { + return fmt.Errorf("failed to create destination: %w", err) + } + + P := src.Partitions() + + // Progress bar for partition filtering + totalPartitions := len(setIndices) * P + var bar *progressbar.ProgressBar + if obidefault.ProgressBar() { + pbopt := []progressbar.Option{ + progressbar.OptionSetWriter(os.Stderr), + progressbar.OptionSetWidth(15), + progressbar.OptionShowCount(), + progressbar.OptionShowIts(), + progressbar.OptionSetPredictTime(true), + progressbar.OptionSetDescription("[Filtering partitions]"), + } + bar = progressbar.NewOptions(totalPartitions, pbopt...) 
+ } + + // Process each selected set + newCounts := make([]uint64, len(setIndices)) + + for si, srcIdx := range setIndices { + setID := src.SetIDOf(srcIdx) + if setID == "" { + setID = fmt.Sprintf("set_%d", srcIdx) + } + + destSetDir := filepath.Join(destDir, fmt.Sprintf("set_%d", si)) + if err := os.MkdirAll(destSetDir, 0755); err != nil { + return fmt.Errorf("failed to create set directory: %w", err) + } + + // Process partitions in parallel + nWorkers := obidefault.ParallelWorkers() + if nWorkers > P { + nWorkers = P + } + + var totalKept atomic.Uint64 + var totalProcessed atomic.Uint64 + + type job struct { + partIdx int + } + + jobs := make(chan job, P) + var wg sync.WaitGroup + var errMu sync.Mutex + var firstErr error + + for w := 0; w < nWorkers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + // Each goroutine gets its own filter instance + workerFilter := filterFactory() + for j := range jobs { + kept, processed, err := filterPartition( + src.PartitionPath(srcIdx, j.partIdx), + filepath.Join(destSetDir, fmt.Sprintf("part_%04d.kdi", j.partIdx)), + workerFilter, + ) + if err != nil { + errMu.Lock() + if firstErr == nil { + firstErr = err + } + errMu.Unlock() + return + } + totalKept.Add(kept) + totalProcessed.Add(processed) + if bar != nil { + bar.Add(1) + } + } + }() + } + + for p := 0; p < P; p++ { + jobs <- job{p} + } + close(jobs) + wg.Wait() + + if firstErr != nil { + return fmt.Errorf("failed to filter set %q: %w", setID, firstErr) + } + + kept := totalKept.Load() + processed := totalProcessed.Load() + newCounts[si] = kept + log.Infof("Set %q: %d/%d k-mers kept (%.1f%% removed)", + setID, kept, processed, + 100.0*float64(processed-kept)/float64(max(processed, 1))) + + // Copy spectrum.bin if it exists + srcSpecPath := src.SpectrumPath(srcIdx) + if _, err := os.Stat(srcSpecPath); err == nil { + destSpecPath := filepath.Join(destSetDir, "spectrum.bin") + if err := copyFileHelper(srcSpecPath, destSpecPath); err != nil { + log.Warnf("Could not copy 
spectrum for set %q: %v", setID, err) + } + } + } + + if bar != nil { + fmt.Fprintln(os.Stderr) + } + + // Build destination metadata + setsIDs := make([]string, len(setIndices)) + setsMetadata := make([]map[string]interface{}, len(setIndices)) + for i, srcIdx := range setIndices { + setsIDs[i] = src.SetIDOf(srcIdx) + setsMetadata[i] = src.AllSetMetadata(srcIdx) + if setsMetadata[i] == nil { + setsMetadata[i] = make(map[string]interface{}) + } + } + + // Write metadata for the filtered index + dest, err := obikmer.NewFilteredKmerSetGroup( + destDir, k, src.M(), P, + len(setIndices), setsIDs, newCounts, setsMetadata, + ) + if err != nil { + return fmt.Errorf("failed to create filtered metadata: %w", err) + } + + // Copy group-level metadata and record applied filters + for key, value := range src.Metadata { + dest.SetAttribute(key, value) + } + if entropyThreshold > 0 { + dest.SetAttribute("entropy_filter", entropyThreshold) + dest.SetAttribute("entropy_filter_size", entropySize) + } + dest.SetAttribute("filtered_from", srcDir) + + if err := dest.SaveMetadata(); err != nil { + return fmt.Errorf("failed to save metadata: %w", err) + } + + log.Info("Done.") + return nil +} + +// filterPartition reads a single .kdi partition, applies the filter predicate, +// and writes the accepted k-mers to a new .kdi file. +// Returns (kept, processed, error). 
+func filterPartition(srcPath, destPath string, accept KmerFilter) (uint64, uint64, error) { + reader, err := obikmer.NewKdiReader(srcPath) + if err != nil { + // Empty partition — write empty KDI + w, err2 := obikmer.NewKdiWriter(destPath) + if err2 != nil { + return 0, 0, err2 + } + return 0, 0, w.Close() + } + defer reader.Close() + + w, err := obikmer.NewKdiWriter(destPath) + if err != nil { + return 0, 0, err + } + + var kept, processed uint64 + for { + kmer, ok := reader.Next() + if !ok { + break + } + processed++ + if accept(kmer) { + if err := w.Write(kmer); err != nil { + w.Close() + return 0, 0, err + } + kept++ + } + } + + return kept, processed, w.Close() +} + +// copyFileHelper copies a file (used for spectrum.bin etc.) +func copyFileHelper(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + + buf := make([]byte, 32*1024) + for { + n, readErr := in.Read(buf) + if n > 0 { + if _, writeErr := out.Write(buf[:n]); writeErr != nil { + return writeErr + } + } + if readErr != nil { + break + } + } + return out.Close() +} diff --git a/pkg/obitools/obik/index.go b/pkg/obitools/obik/index.go index cdce3e7..3a1a813 100644 --- a/pkg/obitools/obik/index.go +++ b/pkg/obitools/obik/index.go @@ -33,6 +33,9 @@ func runIndex(ctx context.Context, opt *getoptions.GetOpt, args []string) error maxOcc := CLIMaxOccurrence() + entropyThreshold := CLIIndexEntropyThreshold() + entropySize := CLIIndexEntropySize() + // Build options var opts []obikmer.BuilderOption if minOcc > 1 { @@ -44,6 +47,9 @@ func runIndex(ctx context.Context, opt *getoptions.GetOpt, args []string) error if topN := CLISaveFreqKmer(); topN > 0 { opts = append(opts, obikmer.WithSaveFreqKmers(topN)) } + if entropyThreshold > 0 { + opts = append(opts, obikmer.WithEntropyFilter(entropyThreshold, entropySize)) + } // Determine whether to append to existing group or create 
new var builder *obikmer.KmerSetGroupBuilder @@ -115,6 +121,11 @@ func runIndex(ctx context.Context, opt *getoptions.GetOpt, args []string) error ksg.SetAttribute("max_occurrence", maxOcc) } + if entropyThreshold > 0 { + ksg.SetAttribute("entropy_filter", entropyThreshold) + ksg.SetAttribute("entropy_filter_size", entropySize) + } + if err := ksg.SaveMetadata(); err != nil { return fmt.Errorf("failed to save metadata: %w", err) } diff --git a/pkg/obitools/obik/obik.go b/pkg/obitools/obik/obik.go index b268f30..5fc0176 100644 --- a/pkg/obitools/obik/obik.go +++ b/pkg/obitools/obik/obik.go @@ -74,4 +74,11 @@ func OptionSet(opt *getoptions.GetOpt) { obiconvert.OutputOptionSet(matchCmd) SetSelectionOptionSet(matchCmd) matchCmd.SetCommandFn(runMatch) + + // filter: filter an index to remove low-complexity k-mers + filterCmd := opt.NewCommand("filter", "Filter a kmer index to remove low-complexity k-mers") + obiconvert.OutputModeOptionSet(filterCmd, false) + EntropyFilterOptionSet(filterCmd) + SetSelectionOptionSet(filterCmd) + filterCmd.SetCommandFn(runFilter) } diff --git a/pkg/obitools/obik/options.go b/pkg/obitools/obik/options.go index 62b8434..608ab76 100644 --- a/pkg/obitools/obik/options.go +++ b/pkg/obitools/obik/options.go @@ -105,6 +105,8 @@ var _minOccurrence = 1 var _maxOccurrence = 0 var _saveFullFilter = false var _saveFreqKmer = 0 +var _indexEntropyThreshold = 0.0 +var _indexEntropySize = 6 // KmerIndexOptionSet defines every option related to kmer index building. 
func KmerIndexOptionSet(options *getoptions.GetOpt) { @@ -133,6 +135,22 @@ func KmerIndexOptionSet(options *getoptions.GetOpt) { options.IntVar(&_saveFreqKmer, "save-freq-kmer", _saveFreqKmer, options.Description("Save the N most frequent k-mers per set to a CSV file (top_kmers.csv).")) + + options.Float64Var(&_indexEntropyThreshold, "entropy-filter", _indexEntropyThreshold, + options.Description("Filter low-complexity k-mers with entropy <= threshold (0 = disabled).")) + + options.IntVar(&_indexEntropySize, "entropy-filter-size", _indexEntropySize, + options.Description("Maximum word size for entropy filter computation (default 6).")) +} + +// EntropyFilterOptionSet registers entropy filter options for commands +// that process existing indices (e.g. filter). +func EntropyFilterOptionSet(options *getoptions.GetOpt) { + options.Float64Var(&_indexEntropyThreshold, "entropy-filter", _indexEntropyThreshold, + options.Description("Filter low-complexity k-mers with entropy <= threshold (0 = disabled).")) + + options.IntVar(&_indexEntropySize, "entropy-filter-size", _indexEntropySize, + options.Description("Maximum word size for entropy filter computation (default 6).")) } // ============================== @@ -262,6 +280,16 @@ func CLIKeepShorter() bool { return _keepShorter } +// CLIIndexEntropyThreshold returns the entropy filter threshold for index building (0 = disabled). +func CLIIndexEntropyThreshold() float64 { + return _indexEntropyThreshold +} + +// CLIIndexEntropySize returns the entropy filter word size for index building. +func CLIIndexEntropySize() int { + return _indexEntropySize +} + // OutputFormatOptionSet registers --json-output, --csv-output, --yaml-output. func OutputFormatOptionSet(options *getoptions.GetOpt) { options.BoolVar(&_jsonOutput, "json-output", false,