2026-02-09 21:57:03 +01:00
|
|
|
package obikmer
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
|
|
|
|
"math"
|
|
|
|
|
"os"
|
|
|
|
|
"path/filepath"
|
2026-02-10 18:19:57 +01:00
|
|
|
"slices"
|
2026-02-09 21:57:03 +01:00
|
|
|
"sync"
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
|
2026-02-09 21:57:03 +01:00
|
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
2026-02-10 18:19:57 +01:00
|
|
|
"github.com/schollz/progressbar/v3"
|
2026-02-09 21:57:03 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// BuilderOption is a functional option for KmerSetGroupBuilder.
|
|
|
|
|
type BuilderOption func(*builderConfig)
|
|
|
|
|
|
|
|
|
|
type builderConfig struct {
|
2026-02-10 18:19:57 +01:00
|
|
|
minFreq int // 0 means no frequency filtering (simple dedup)
|
|
|
|
|
maxFreq int // 0 means no upper bound
|
|
|
|
|
saveFreqTopN int // >0 means save the N most frequent k-mers per set to CSV
|
|
|
|
|
entropyThreshold float64 // >0 means filter k-mers with entropy <= threshold
|
|
|
|
|
entropyLevelMax int // max sub-word size for entropy (typically 6)
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WithMinFrequency activates frequency filtering mode.
|
|
|
|
|
// Only k-mers seen >= minFreq times are kept in the final index.
|
|
|
|
|
func WithMinFrequency(minFreq int) BuilderOption {
|
|
|
|
|
return func(c *builderConfig) {
|
|
|
|
|
c.minFreq = minFreq
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 09:26:46 +01:00
|
|
|
// WithMaxFrequency sets the upper frequency bound.
|
|
|
|
|
// Only k-mers seen <= maxFreq times are kept in the final index.
|
|
|
|
|
func WithMaxFrequency(maxFreq int) BuilderOption {
|
|
|
|
|
return func(c *builderConfig) {
|
|
|
|
|
c.maxFreq = maxFreq
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WithSaveFreqKmers saves the N most frequent k-mers per set to a CSV file
|
|
|
|
|
// (top_kmers.csv in each set directory).
|
|
|
|
|
func WithSaveFreqKmers(n int) BuilderOption {
|
|
|
|
|
return func(c *builderConfig) {
|
|
|
|
|
c.saveFreqTopN = n
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
// WithEntropyFilter activates entropy-based low-complexity filtering.
|
|
|
|
|
// K-mers with entropy <= threshold are discarded during finalization.
|
|
|
|
|
// levelMax is the maximum sub-word size for entropy computation (typically 6).
|
|
|
|
|
func WithEntropyFilter(threshold float64, levelMax int) BuilderOption {
|
|
|
|
|
return func(c *builderConfig) {
|
|
|
|
|
c.entropyThreshold = threshold
|
|
|
|
|
c.entropyLevelMax = levelMax
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-09 21:57:03 +01:00
|
|
|
// KmerSetGroupBuilder constructs a KmerSetGroup on disk.
|
|
|
|
|
// During construction, super-kmers are written to temporary .skm files
|
|
|
|
|
// partitioned by minimizer. On Close(), each partition is finalized
|
|
|
|
|
// (sort, dedup, optional frequency filter) into .kdi files.
|
|
|
|
|
type KmerSetGroupBuilder struct {
|
Refactor k-mer index management with subcommands and enhanced metadata support
This commit refactors the k-mer index management tools to use a unified subcommand structure with obik, adds support for per-set metadata and ID management, enhances the k-mer set group builder to support appending to existing groups, and improves command-line option handling with a new global options registration system.
Key changes:
- Introduce obik command with subcommands (index, ls, summary, cp, mv, rm, super, lowmask)
- Add support for per-set metadata and ID management in kmer set groups
- Implement ability to append to existing kmer index groups
- Refactor option parsing to use a global options registration system
- Add new commands for listing, copying, moving, and removing sets
- Enhance low-complexity masking with new options and output formats
- Improve kmer index summary with Jaccard distance matrix support
- Remove deprecated obikindex and obisuperkmer commands
- Update build process to use the new subcommand structure
2026-02-09 23:10:30 +01:00
|
|
|
dir string
|
|
|
|
|
k int
|
|
|
|
|
m int
|
|
|
|
|
n int // number of NEW sets being built
|
|
|
|
|
P int // number of partitions
|
|
|
|
|
startIndex int // first set index (0 for new groups, existingN for appends)
|
|
|
|
|
config builderConfig
|
|
|
|
|
existing *KmerSetGroup // non-nil when appending to existing group
|
|
|
|
|
writers [][]*SkmWriter // [setIndex][partIndex] (local index 0..n-1)
|
|
|
|
|
mu [][]sync.Mutex // per-writer mutex for concurrent access
|
|
|
|
|
closed bool
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewKmerSetGroupBuilder creates a builder for a new KmerSetGroup.
|
|
|
|
|
//
|
|
|
|
|
// Parameters:
|
|
|
|
|
// - directory: destination directory (created if necessary)
|
|
|
|
|
// - k: k-mer size (1-31)
|
|
|
|
|
// - m: minimizer size (-1 for auto = ceil(k/2.5))
|
|
|
|
|
// - n: number of sets in the group
|
|
|
|
|
// - P: number of partitions (-1 for auto)
|
|
|
|
|
// - options: optional builder options (e.g. WithMinFrequency)
|
|
|
|
|
func NewKmerSetGroupBuilder(directory string, k, m, n, P int,
|
|
|
|
|
options ...BuilderOption) (*KmerSetGroupBuilder, error) {
|
|
|
|
|
|
|
|
|
|
if k < 2 || k > 31 {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: k must be between 2 and 31, got %d", k)
|
|
|
|
|
}
|
|
|
|
|
if n < 1 {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: n must be >= 1, got %d", n)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Auto minimizer size
|
|
|
|
|
if m < 0 {
|
|
|
|
|
m = int(math.Ceil(float64(k) / 2.5))
|
|
|
|
|
}
|
|
|
|
|
if m < 1 {
|
|
|
|
|
m = 1
|
|
|
|
|
}
|
|
|
|
|
if m >= k {
|
|
|
|
|
m = k - 1
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Auto partition count
|
|
|
|
|
if P < 0 {
|
|
|
|
|
// Use 4^m as the maximum, capped at a reasonable value
|
|
|
|
|
maxP := 1 << (2 * m) // 4^m
|
|
|
|
|
P = maxP
|
|
|
|
|
if P > 4096 {
|
|
|
|
|
P = 4096
|
|
|
|
|
}
|
|
|
|
|
if P < 64 {
|
|
|
|
|
P = 64
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Apply options
|
|
|
|
|
var config builderConfig
|
|
|
|
|
for _, opt := range options {
|
|
|
|
|
opt(&config)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create build directory structure
|
|
|
|
|
buildDir := filepath.Join(directory, ".build")
|
|
|
|
|
for s := 0; s < n; s++ {
|
|
|
|
|
setDir := filepath.Join(buildDir, fmt.Sprintf("set_%d", s))
|
|
|
|
|
if err := os.MkdirAll(setDir, 0755); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: create build dir: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create SKM writers
|
|
|
|
|
writers := make([][]*SkmWriter, n)
|
|
|
|
|
mutexes := make([][]sync.Mutex, n)
|
|
|
|
|
for s := 0; s < n; s++ {
|
|
|
|
|
writers[s] = make([]*SkmWriter, P)
|
|
|
|
|
mutexes[s] = make([]sync.Mutex, P)
|
|
|
|
|
for p := 0; p < P; p++ {
|
|
|
|
|
path := filepath.Join(buildDir, fmt.Sprintf("set_%d", s),
|
|
|
|
|
fmt.Sprintf("part_%04d.skm", p))
|
|
|
|
|
w, err := NewSkmWriter(path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// Close already-created writers
|
|
|
|
|
for ss := 0; ss <= s; ss++ {
|
|
|
|
|
for pp := 0; pp < P; pp++ {
|
|
|
|
|
if writers[ss][pp] != nil {
|
|
|
|
|
writers[ss][pp].Close()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil, fmt.Errorf("obikmer: create skm writer: %w", err)
|
|
|
|
|
}
|
|
|
|
|
writers[s][p] = w
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &KmerSetGroupBuilder{
|
Refactor k-mer index management with subcommands and enhanced metadata support
This commit refactors the k-mer index management tools to use a unified subcommand structure with obik, adds support for per-set metadata and ID management, enhances the k-mer set group builder to support appending to existing groups, and improves command-line option handling with a new global options registration system.
Key changes:
- Introduce obik command with subcommands (index, ls, summary, cp, mv, rm, super, lowmask)
- Add support for per-set metadata and ID management in kmer set groups
- Implement ability to append to existing kmer index groups
- Refactor option parsing to use a global options registration system
- Add new commands for listing, copying, moving, and removing sets
- Enhance low-complexity masking with new options and output formats
- Improve kmer index summary with Jaccard distance matrix support
- Remove deprecated obikindex and obisuperkmer commands
- Update build process to use the new subcommand structure
2026-02-09 23:10:30 +01:00
|
|
|
dir: directory,
|
|
|
|
|
k: k,
|
|
|
|
|
m: m,
|
|
|
|
|
n: n,
|
|
|
|
|
P: P,
|
|
|
|
|
startIndex: 0,
|
|
|
|
|
config: config,
|
|
|
|
|
writers: writers,
|
|
|
|
|
mu: mutexes,
|
2026-02-09 21:57:03 +01:00
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
Refactor k-mer index management with subcommands and enhanced metadata support
This commit refactors the k-mer index management tools to use a unified subcommand structure with obik, adds support for per-set metadata and ID management, enhances the k-mer set group builder to support appending to existing groups, and improves command-line option handling with a new global options registration system.
Key changes:
- Introduce obik command with subcommands (index, ls, summary, cp, mv, rm, super, lowmask)
- Add support for per-set metadata and ID management in kmer set groups
- Implement ability to append to existing kmer index groups
- Refactor option parsing to use a global options registration system
- Add new commands for listing, copying, moving, and removing sets
- Enhance low-complexity masking with new options and output formats
- Improve kmer index summary with Jaccard distance matrix support
- Remove deprecated obikindex and obisuperkmer commands
- Update build process to use the new subcommand structure
2026-02-09 23:10:30 +01:00
|
|
|
// AppendKmerSetGroupBuilder opens an existing KmerSetGroup and creates
|
|
|
|
|
// a builder that adds n new sets starting from the existing set count.
|
|
|
|
|
// The k, m, and partitions are inherited from the existing group.
|
|
|
|
|
func AppendKmerSetGroupBuilder(directory string, n int, options ...BuilderOption) (*KmerSetGroupBuilder, error) {
|
|
|
|
|
existing, err := OpenKmerSetGroup(directory)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: open existing group: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if n < 1 {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: n must be >= 1, got %d", n)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
k := existing.K()
|
|
|
|
|
m := existing.M()
|
|
|
|
|
P := existing.Partitions()
|
|
|
|
|
startIndex := existing.Size()
|
|
|
|
|
|
|
|
|
|
var config builderConfig
|
|
|
|
|
for _, opt := range options {
|
|
|
|
|
opt(&config)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create build directory structure for new sets
|
|
|
|
|
buildDir := filepath.Join(directory, ".build")
|
|
|
|
|
for s := 0; s < n; s++ {
|
|
|
|
|
setDir := filepath.Join(buildDir, fmt.Sprintf("set_%d", s))
|
|
|
|
|
if err := os.MkdirAll(setDir, 0755); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: create build dir: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create SKM writers for new sets
|
|
|
|
|
writers := make([][]*SkmWriter, n)
|
|
|
|
|
mutexes := make([][]sync.Mutex, n)
|
|
|
|
|
for s := 0; s < n; s++ {
|
|
|
|
|
writers[s] = make([]*SkmWriter, P)
|
|
|
|
|
mutexes[s] = make([]sync.Mutex, P)
|
|
|
|
|
for p := 0; p < P; p++ {
|
|
|
|
|
path := filepath.Join(buildDir, fmt.Sprintf("set_%d", s),
|
|
|
|
|
fmt.Sprintf("part_%04d.skm", p))
|
|
|
|
|
w, err := NewSkmWriter(path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
for ss := 0; ss <= s; ss++ {
|
|
|
|
|
for pp := 0; pp < P; pp++ {
|
|
|
|
|
if writers[ss][pp] != nil {
|
|
|
|
|
writers[ss][pp].Close()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil, fmt.Errorf("obikmer: create skm writer: %w", err)
|
|
|
|
|
}
|
|
|
|
|
writers[s][p] = w
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &KmerSetGroupBuilder{
|
|
|
|
|
dir: directory,
|
|
|
|
|
k: k,
|
|
|
|
|
m: m,
|
|
|
|
|
n: n,
|
|
|
|
|
P: P,
|
|
|
|
|
startIndex: startIndex,
|
|
|
|
|
config: config,
|
|
|
|
|
existing: existing,
|
|
|
|
|
writers: writers,
|
|
|
|
|
mu: mutexes,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// StartIndex returns the first global set index for the new sets being built.
|
|
|
|
|
// For new groups this is 0; for appends it is the existing group's Size().
|
|
|
|
|
func (b *KmerSetGroupBuilder) StartIndex() int {
|
|
|
|
|
return b.startIndex
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-09 21:57:03 +01:00
|
|
|
// AddSequence extracts super-kmers from a sequence and writes them
|
|
|
|
|
// to the appropriate partition files for the given set.
|
|
|
|
|
func (b *KmerSetGroupBuilder) AddSequence(setIndex int, seq *obiseq.BioSequence) {
|
|
|
|
|
if setIndex < 0 || setIndex >= b.n {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
rawSeq := seq.Sequence()
|
|
|
|
|
if len(rawSeq) < b.k {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
for sk := range IterSuperKmers(rawSeq, b.k, b.m) {
|
|
|
|
|
part := int(sk.Minimizer % uint64(b.P))
|
|
|
|
|
b.mu[setIndex][part].Lock()
|
|
|
|
|
b.writers[setIndex][part].Write(sk)
|
|
|
|
|
b.mu[setIndex][part].Unlock()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// AddSuperKmer writes a single super-kmer to the appropriate partition.
|
|
|
|
|
func (b *KmerSetGroupBuilder) AddSuperKmer(setIndex int, sk SuperKmer) {
|
|
|
|
|
if setIndex < 0 || setIndex >= b.n {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
part := int(sk.Minimizer % uint64(b.P))
|
|
|
|
|
b.mu[setIndex][part].Lock()
|
|
|
|
|
b.writers[setIndex][part].Write(sk)
|
|
|
|
|
b.mu[setIndex][part].Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close finalizes the construction:
|
|
|
|
|
// 1. Flush and close all SKM writers
|
|
|
|
|
// 2. For each partition of each set (in parallel):
|
|
|
|
|
// - Load super-kmers from .skm
|
|
|
|
|
// - Extract canonical k-mers
|
|
|
|
|
// - Sort and deduplicate (count if frequency filter)
|
|
|
|
|
// - Write .kdi file
|
|
|
|
|
// 3. Write metadata.toml
|
|
|
|
|
// 4. Remove .build/ directory
|
|
|
|
|
//
|
|
|
|
|
// Returns the finalized KmerSetGroup in read-only mode.
|
|
|
|
|
func (b *KmerSetGroupBuilder) Close() (*KmerSetGroup, error) {
|
|
|
|
|
if b.closed {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: builder already closed")
|
|
|
|
|
}
|
|
|
|
|
b.closed = true
|
|
|
|
|
|
|
|
|
|
// 1. Close all SKM writers
|
|
|
|
|
for s := 0; s < b.n; s++ {
|
|
|
|
|
for p := 0; p < b.P; p++ {
|
|
|
|
|
if err := b.writers[s][p].Close(); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: close skm writer set=%d part=%d: %w", s, p, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
Refactor k-mer index management with subcommands and enhanced metadata support
This commit refactors the k-mer index management tools to use a unified subcommand structure with obik, adds support for per-set metadata and ID management, enhances the k-mer set group builder to support appending to existing groups, and improves command-line option handling with a new global options registration system.
Key changes:
- Introduce obik command with subcommands (index, ls, summary, cp, mv, rm, super, lowmask)
- Add support for per-set metadata and ID management in kmer set groups
- Implement ability to append to existing kmer index groups
- Refactor option parsing to use a global options registration system
- Add new commands for listing, copying, moving, and removing sets
- Enhance low-complexity masking with new options and output formats
- Improve kmer index summary with Jaccard distance matrix support
- Remove deprecated obikindex and obisuperkmer commands
- Update build process to use the new subcommand structure
2026-02-09 23:10:30 +01:00
|
|
|
// 2. Create output directory structure for new sets
|
2026-02-09 21:57:03 +01:00
|
|
|
for s := 0; s < b.n; s++ {
|
Refactor k-mer index management with subcommands and enhanced metadata support
This commit refactors the k-mer index management tools to use a unified subcommand structure with obik, adds support for per-set metadata and ID management, enhances the k-mer set group builder to support appending to existing groups, and improves command-line option handling with a new global options registration system.
Key changes:
- Introduce obik command with subcommands (index, ls, summary, cp, mv, rm, super, lowmask)
- Add support for per-set metadata and ID management in kmer set groups
- Implement ability to append to existing kmer index groups
- Refactor option parsing to use a global options registration system
- Add new commands for listing, copying, moving, and removing sets
- Enhance low-complexity masking with new options and output formats
- Improve kmer index summary with Jaccard distance matrix support
- Remove deprecated obikindex and obisuperkmer commands
- Update build process to use the new subcommand structure
2026-02-09 23:10:30 +01:00
|
|
|
globalIdx := b.startIndex + s
|
|
|
|
|
setDir := filepath.Join(b.dir, fmt.Sprintf("set_%d", globalIdx))
|
2026-02-09 21:57:03 +01:00
|
|
|
if err := os.MkdirAll(setDir, 0755); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: create set dir: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
// =====================================================================
|
|
|
|
|
// 2-stage pipeline: readers (pure I/O) → workers (CPU + write)
|
|
|
|
|
//
|
|
|
|
|
// - nReaders goroutines read .skm files (pure I/O, fast)
|
|
|
|
|
// - nWorkers goroutines extract k-mers, sort, dedup, filter, write .kdi
|
|
|
|
|
//
|
|
|
|
|
// One unbuffered channel between stages. Readers are truly I/O-bound
|
|
|
|
|
// (small files, buffered reads), workers are CPU-bound and stay busy.
|
|
|
|
|
// =====================================================================
|
|
|
|
|
totalJobs := b.n * b.P
|
|
|
|
|
|
2026-02-09 21:57:03 +01:00
|
|
|
counts := make([][]uint64, b.n)
|
2026-02-10 09:26:46 +01:00
|
|
|
spectra := make([][]map[int]uint64, b.n)
|
|
|
|
|
var topKmers [][]*TopNKmers
|
2026-02-09 21:57:03 +01:00
|
|
|
for s := 0; s < b.n; s++ {
|
|
|
|
|
counts[s] = make([]uint64, b.P)
|
2026-02-10 09:26:46 +01:00
|
|
|
spectra[s] = make([]map[int]uint64, b.P)
|
|
|
|
|
}
|
|
|
|
|
if b.config.saveFreqTopN > 0 {
|
|
|
|
|
topKmers = make([][]*TopNKmers, b.n)
|
|
|
|
|
for s := 0; s < b.n; s++ {
|
|
|
|
|
topKmers[s] = make([]*TopNKmers, b.P)
|
|
|
|
|
}
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
nCPU := obidefault.ParallelWorkers()
|
|
|
|
|
|
|
|
|
|
// Stage sizing
|
|
|
|
|
nWorkers := nCPU // CPU-bound: one per core
|
|
|
|
|
nReaders := nCPU / 4 // pure I/O: few goroutines suffice
|
|
|
|
|
if nReaders < 2 {
|
|
|
|
|
nReaders = 2
|
|
|
|
|
}
|
|
|
|
|
if nReaders > 4 {
|
|
|
|
|
nReaders = 4
|
|
|
|
|
}
|
|
|
|
|
if nWorkers > totalJobs {
|
|
|
|
|
nWorkers = totalJobs
|
|
|
|
|
}
|
|
|
|
|
if nReaders > totalJobs {
|
|
|
|
|
nReaders = totalJobs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var bar *progressbar.ProgressBar
|
|
|
|
|
if obidefault.ProgressBar() {
|
|
|
|
|
pbopt := []progressbar.Option{
|
|
|
|
|
progressbar.OptionSetWriter(os.Stderr),
|
|
|
|
|
progressbar.OptionSetWidth(15),
|
|
|
|
|
progressbar.OptionShowCount(),
|
|
|
|
|
progressbar.OptionShowIts(),
|
|
|
|
|
progressbar.OptionSetPredictTime(true),
|
|
|
|
|
progressbar.OptionSetDescription("[Finalizing partitions]"),
|
|
|
|
|
}
|
|
|
|
|
bar = progressbar.NewOptions(totalJobs, pbopt...)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// --- Channel types ---
|
|
|
|
|
type partitionData struct {
|
|
|
|
|
setIdx int
|
|
|
|
|
partIdx int
|
|
|
|
|
skmers []SuperKmer // raw super-kmers from I/O stage
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
type readJob struct {
|
2026-02-09 21:57:03 +01:00
|
|
|
setIdx int
|
|
|
|
|
partIdx int
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
dataCh := make(chan *partitionData) // unbuffered
|
|
|
|
|
readJobs := make(chan readJob, totalJobs)
|
|
|
|
|
|
2026-02-09 21:57:03 +01:00
|
|
|
var errMu sync.Mutex
|
|
|
|
|
var firstErr error
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
// Fill job queue (buffered, all jobs pre-loaded)
|
|
|
|
|
for s := 0; s < b.n; s++ {
|
|
|
|
|
for p := 0; p < b.P; p++ {
|
|
|
|
|
readJobs <- readJob{s, p}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
close(readJobs)
|
|
|
|
|
|
|
|
|
|
// --- Stage 1: Readers (pure I/O) ---
|
|
|
|
|
var readWg sync.WaitGroup
|
|
|
|
|
for w := 0; w < nReaders; w++ {
|
|
|
|
|
readWg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer readWg.Done()
|
|
|
|
|
for rj := range readJobs {
|
|
|
|
|
skmers, err := b.loadPartitionRaw(rj.setIdx, rj.partIdx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
errMu.Lock()
|
|
|
|
|
if firstErr == nil {
|
|
|
|
|
firstErr = err
|
|
|
|
|
}
|
|
|
|
|
errMu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
dataCh <- &partitionData{rj.setIdx, rj.partIdx, skmers}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
readWg.Wait()
|
|
|
|
|
close(dataCh)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// --- Stage 2: Workers (CPU: extract k-mers + sort/filter + write .kdi) ---
|
|
|
|
|
var workWg sync.WaitGroup
|
2026-02-09 21:57:03 +01:00
|
|
|
for w := 0; w < nWorkers; w++ {
|
2026-02-10 18:19:57 +01:00
|
|
|
workWg.Add(1)
|
2026-02-09 21:57:03 +01:00
|
|
|
go func() {
|
2026-02-10 18:19:57 +01:00
|
|
|
defer workWg.Done()
|
|
|
|
|
for pd := range dataCh {
|
|
|
|
|
// CPU: extract canonical k-mers from super-kmers
|
|
|
|
|
kmers := extractCanonicalKmers(pd.skmers, b.k)
|
|
|
|
|
pd.skmers = nil // allow GC of raw super-kmers
|
|
|
|
|
|
|
|
|
|
// CPU: sort, dedup, filter
|
|
|
|
|
filtered, spectrum, topN := b.sortFilterPartition(kmers)
|
|
|
|
|
kmers = nil // allow GC of unsorted data
|
|
|
|
|
|
|
|
|
|
// I/O: write .kdi file
|
|
|
|
|
globalIdx := b.startIndex + pd.setIdx
|
|
|
|
|
kdiPath := filepath.Join(b.dir,
|
|
|
|
|
fmt.Sprintf("set_%d", globalIdx),
|
|
|
|
|
fmt.Sprintf("part_%04d.kdi", pd.partIdx))
|
|
|
|
|
|
|
|
|
|
n, err := b.writePartitionKdi(kdiPath, filtered)
|
2026-02-10 09:26:46 +01:00
|
|
|
if err != nil {
|
2026-02-09 21:57:03 +01:00
|
|
|
errMu.Lock()
|
|
|
|
|
if firstErr == nil {
|
|
|
|
|
firstErr = err
|
|
|
|
|
}
|
|
|
|
|
errMu.Unlock()
|
|
|
|
|
}
|
2026-02-10 18:19:57 +01:00
|
|
|
counts[pd.setIdx][pd.partIdx] = n
|
|
|
|
|
spectra[pd.setIdx][pd.partIdx] = spectrum
|
2026-02-10 09:26:46 +01:00
|
|
|
if topKmers != nil {
|
2026-02-10 18:19:57 +01:00
|
|
|
topKmers[pd.setIdx][pd.partIdx] = topN
|
|
|
|
|
}
|
|
|
|
|
if bar != nil {
|
|
|
|
|
bar.Add(1)
|
2026-02-10 09:26:46 +01:00
|
|
|
}
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
workWg.Wait()
|
|
|
|
|
|
|
|
|
|
if bar != nil {
|
|
|
|
|
fmt.Fprintln(os.Stderr)
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if firstErr != nil {
|
|
|
|
|
return nil, firstErr
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 09:26:46 +01:00
|
|
|
// Aggregate per-partition spectra into per-set spectra and write spectrum.bin
|
|
|
|
|
for s := 0; s < b.n; s++ {
|
|
|
|
|
globalIdx := b.startIndex + s
|
|
|
|
|
setSpectrum := make(map[int]uint64)
|
|
|
|
|
for p := 0; p < b.P; p++ {
|
|
|
|
|
if spectra[s][p] != nil {
|
|
|
|
|
MergeSpectraMaps(setSpectrum, spectra[s][p])
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if len(setSpectrum) > 0 {
|
|
|
|
|
specPath := filepath.Join(b.dir, fmt.Sprintf("set_%d", globalIdx), "spectrum.bin")
|
|
|
|
|
if err := WriteSpectrum(specPath, MapToSpectrum(setSpectrum)); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: write spectrum set=%d: %w", globalIdx, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Aggregate per-partition top-N k-mers and write CSV
|
|
|
|
|
if topKmers != nil {
|
|
|
|
|
for s := 0; s < b.n; s++ {
|
|
|
|
|
globalIdx := b.startIndex + s
|
|
|
|
|
merged := NewTopNKmers(b.config.saveFreqTopN)
|
|
|
|
|
for p := 0; p < b.P; p++ {
|
|
|
|
|
merged.MergeTopN(topKmers[s][p])
|
|
|
|
|
}
|
|
|
|
|
results := merged.Results()
|
|
|
|
|
if len(results) > 0 {
|
|
|
|
|
csvPath := filepath.Join(b.dir, fmt.Sprintf("set_%d", globalIdx), "top_kmers.csv")
|
|
|
|
|
if err := WriteTopKmersCSV(csvPath, results, b.k); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: write top kmers set=%d: %w", globalIdx, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-09 21:57:03 +01:00
|
|
|
// 3. Build KmerSetGroup and write metadata
|
Refactor k-mer index management with subcommands and enhanced metadata support
This commit refactors the k-mer index management tools to use a unified subcommand structure with obik, adds support for per-set metadata and ID management, enhances the k-mer set group builder to support appending to existing groups, and improves command-line option handling with a new global options registration system.
Key changes:
- Introduce obik command with subcommands (index, ls, summary, cp, mv, rm, super, lowmask)
- Add support for per-set metadata and ID management in kmer set groups
- Implement ability to append to existing kmer index groups
- Refactor option parsing to use a global options registration system
- Add new commands for listing, copying, moving, and removing sets
- Enhance low-complexity masking with new options and output formats
- Improve kmer index summary with Jaccard distance matrix support
- Remove deprecated obikindex and obisuperkmer commands
- Update build process to use the new subcommand structure
2026-02-09 23:10:30 +01:00
|
|
|
newCounts := make([]uint64, b.n)
|
2026-02-09 21:57:03 +01:00
|
|
|
for s := 0; s < b.n; s++ {
|
|
|
|
|
for p := 0; p < b.P; p++ {
|
Refactor k-mer index management with subcommands and enhanced metadata support
This commit refactors the k-mer index management tools to use a unified subcommand structure with obik, adds support for per-set metadata and ID management, enhances the k-mer set group builder to support appending to existing groups, and improves command-line option handling with a new global options registration system.
Key changes:
- Introduce obik command with subcommands (index, ls, summary, cp, mv, rm, super, lowmask)
- Add support for per-set metadata and ID management in kmer set groups
- Implement ability to append to existing kmer index groups
- Refactor option parsing to use a global options registration system
- Add new commands for listing, copying, moving, and removing sets
- Enhance low-complexity masking with new options and output formats
- Improve kmer index summary with Jaccard distance matrix support
- Remove deprecated obikindex and obisuperkmer commands
- Update build process to use the new subcommand structure
2026-02-09 23:10:30 +01:00
|
|
|
newCounts[s] += counts[s][p]
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
Refactor k-mer index management with subcommands and enhanced metadata support
This commit refactors the k-mer index management tools to use a unified subcommand structure with obik, adds support for per-set metadata and ID management, enhances the k-mer set group builder to support appending to existing groups, and improves command-line option handling with a new global options registration system.
Key changes:
- Introduce obik command with subcommands (index, ls, summary, cp, mv, rm, super, lowmask)
- Add support for per-set metadata and ID management in kmer set groups
- Implement ability to append to existing kmer index groups
- Refactor option parsing to use a global options registration system
- Add new commands for listing, copying, moving, and removing sets
- Enhance low-complexity masking with new options and output formats
- Improve kmer index summary with Jaccard distance matrix support
- Remove deprecated obikindex and obisuperkmer commands
- Update build process to use the new subcommand structure
2026-02-09 23:10:30 +01:00
|
|
|
var ksg *KmerSetGroup
|
|
|
|
|
|
|
|
|
|
if b.existing != nil {
|
|
|
|
|
// Append mode: extend existing group
|
|
|
|
|
ksg = b.existing
|
|
|
|
|
ksg.n += b.n
|
|
|
|
|
ksg.setsIDs = append(ksg.setsIDs, make([]string, b.n)...)
|
|
|
|
|
ksg.counts = append(ksg.counts, newCounts...)
|
|
|
|
|
newMeta := make([]map[string]interface{}, b.n)
|
|
|
|
|
for i := range newMeta {
|
|
|
|
|
newMeta[i] = make(map[string]interface{})
|
|
|
|
|
}
|
|
|
|
|
ksg.setsMetadata = append(ksg.setsMetadata, newMeta...)
|
|
|
|
|
} else {
|
|
|
|
|
// New group
|
|
|
|
|
setsIDs := make([]string, b.n)
|
|
|
|
|
setsMetadata := make([]map[string]interface{}, b.n)
|
|
|
|
|
for i := range setsMetadata {
|
|
|
|
|
setsMetadata[i] = make(map[string]interface{})
|
|
|
|
|
}
|
|
|
|
|
ksg = &KmerSetGroup{
|
|
|
|
|
path: b.dir,
|
|
|
|
|
k: b.k,
|
|
|
|
|
m: b.m,
|
|
|
|
|
partitions: b.P,
|
|
|
|
|
n: b.n,
|
|
|
|
|
setsIDs: setsIDs,
|
|
|
|
|
counts: newCounts,
|
|
|
|
|
setsMetadata: setsMetadata,
|
|
|
|
|
Metadata: make(map[string]interface{}),
|
|
|
|
|
}
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := ksg.saveMetadata(); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("obikmer: write metadata: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 4. Remove .build/ directory
|
|
|
|
|
buildDir := filepath.Join(b.dir, ".build")
|
|
|
|
|
os.RemoveAll(buildDir)
|
|
|
|
|
|
|
|
|
|
return ksg, nil
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
// loadPartitionRaw reads a .skm file and returns raw super-kmers.
|
|
|
|
|
// This is pure I/O — no k-mer extraction is done here.
|
|
|
|
|
// Returns nil (not an error) if the .skm file is empty or missing.
|
|
|
|
|
func (b *KmerSetGroupBuilder) loadPartitionRaw(setIdx, partIdx int) ([]SuperKmer, error) {
|
2026-02-09 21:57:03 +01:00
|
|
|
skmPath := filepath.Join(b.dir, ".build",
|
|
|
|
|
fmt.Sprintf("set_%d", setIdx),
|
|
|
|
|
fmt.Sprintf("part_%04d.skm", partIdx))
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
fi, err := os.Stat(skmPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, nil // empty partition, not an error
|
|
|
|
|
}
|
2026-02-09 21:57:03 +01:00
|
|
|
|
|
|
|
|
reader, err := NewSkmReader(skmPath)
|
|
|
|
|
if err != nil {
|
2026-02-10 18:19:57 +01:00
|
|
|
return nil, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Estimate capacity from file size. Each super-kmer record is
|
|
|
|
|
// 2 bytes (length) + packed bases (~k/4 bytes), so roughly
|
|
|
|
|
// (2 + k/4) bytes per super-kmer on average.
|
|
|
|
|
avgRecordSize := 2 + b.k/4
|
|
|
|
|
if avgRecordSize < 4 {
|
|
|
|
|
avgRecordSize = 4
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
2026-02-10 18:19:57 +01:00
|
|
|
estCount := int(fi.Size()) / avgRecordSize
|
2026-02-09 21:57:03 +01:00
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
skmers := make([]SuperKmer, 0, estCount)
|
2026-02-09 21:57:03 +01:00
|
|
|
for {
|
|
|
|
|
sk, ok := reader.Next()
|
|
|
|
|
if !ok {
|
|
|
|
|
break
|
|
|
|
|
}
|
2026-02-10 18:19:57 +01:00
|
|
|
skmers = append(skmers, sk)
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
reader.Close()
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
return skmers, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// extractCanonicalKmers extracts all canonical k-mers from a slice of super-kmers.
|
|
|
|
|
// This is CPU-bound work (sliding-window forward/reverse complement).
|
|
|
|
|
func extractCanonicalKmers(skmers []SuperKmer, k int) []uint64 {
|
|
|
|
|
// Pre-compute total capacity to avoid repeated slice growth.
|
|
|
|
|
// Each super-kmer of length L yields L-k+1 canonical k-mers.
|
|
|
|
|
total := 0
|
|
|
|
|
for i := range skmers {
|
|
|
|
|
n := len(skmers[i].Sequence) - k + 1
|
|
|
|
|
if n > 0 {
|
|
|
|
|
total += n
|
|
|
|
|
}
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
kmers := make([]uint64, 0, total)
|
|
|
|
|
for _, sk := range skmers {
|
|
|
|
|
for kmer := range IterCanonicalKmers(sk.Sequence, k) {
|
|
|
|
|
kmers = append(kmers, kmer)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return kmers
|
|
|
|
|
}
|
2026-02-09 21:57:03 +01:00
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
// sortFilterPartition sorts, deduplicates, and filters k-mers in memory (CPU-bound).
|
|
|
|
|
// Returns the filtered sorted slice, frequency spectrum, and optional top-N.
|
|
|
|
|
func (b *KmerSetGroupBuilder) sortFilterPartition(kmers []uint64) ([]uint64, map[int]uint64, *TopNKmers) {
|
|
|
|
|
if len(kmers) == 0 {
|
|
|
|
|
return nil, nil, nil
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
// Sort (CPU-bound) — slices.Sort avoids reflection overhead of sort.Slice
|
|
|
|
|
slices.Sort(kmers)
|
|
|
|
|
|
2026-02-09 21:57:03 +01:00
|
|
|
minFreq := b.config.minFreq
|
|
|
|
|
if minFreq <= 0 {
|
|
|
|
|
minFreq = 1 // simple dedup
|
|
|
|
|
}
|
2026-02-10 18:19:57 +01:00
|
|
|
maxFreq := b.config.maxFreq
|
|
|
|
|
|
|
|
|
|
// Prepare entropy filter if requested
|
|
|
|
|
var entropyFilter *KmerEntropyFilter
|
|
|
|
|
if b.config.entropyThreshold > 0 && b.config.entropyLevelMax > 0 {
|
|
|
|
|
entropyFilter = NewKmerEntropyFilter(b.k, b.config.entropyLevelMax, b.config.entropyThreshold)
|
|
|
|
|
}
|
2026-02-09 21:57:03 +01:00
|
|
|
|
2026-02-10 09:26:46 +01:00
|
|
|
// Prepare top-N collector if requested
|
|
|
|
|
var topN *TopNKmers
|
|
|
|
|
if b.config.saveFreqTopN > 0 {
|
|
|
|
|
topN = NewTopNKmers(b.config.saveFreqTopN)
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
// Linear scan: count consecutive identical values, filter, accumulate spectrum
|
2026-02-10 09:26:46 +01:00
|
|
|
partSpectrum := make(map[int]uint64)
|
2026-02-10 18:19:57 +01:00
|
|
|
filtered := make([]uint64, 0, len(kmers)/2)
|
|
|
|
|
|
2026-02-09 21:57:03 +01:00
|
|
|
i := 0
|
|
|
|
|
for i < len(kmers) {
|
|
|
|
|
val := kmers[i]
|
|
|
|
|
c := 1
|
|
|
|
|
for i+c < len(kmers) && kmers[i+c] == val {
|
|
|
|
|
c++
|
|
|
|
|
}
|
2026-02-10 09:26:46 +01:00
|
|
|
partSpectrum[c]++
|
|
|
|
|
if topN != nil {
|
|
|
|
|
topN.Add(val, c)
|
|
|
|
|
}
|
|
|
|
|
if c >= minFreq && (maxFreq <= 0 || c <= maxFreq) {
|
2026-02-10 18:19:57 +01:00
|
|
|
if entropyFilter == nil || entropyFilter.Accept(val) {
|
|
|
|
|
filtered = append(filtered, val)
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
i += c
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-10 18:19:57 +01:00
|
|
|
return filtered, partSpectrum, topN
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// writePartitionKdi writes a sorted slice of k-mers to a .kdi file (I/O-bound).
|
|
|
|
|
// Returns the number of k-mers written.
|
|
|
|
|
func (b *KmerSetGroupBuilder) writePartitionKdi(kdiPath string, kmers []uint64) (uint64, error) {
|
|
|
|
|
w, err := NewKdiWriter(kdiPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, val := range kmers {
|
|
|
|
|
if err := w.Write(val); err != nil {
|
|
|
|
|
w.Close()
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
n := w.Count()
|
|
|
|
|
return n, w.Close()
|
2026-02-09 21:57:03 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *KmerSetGroupBuilder) writeEmptyKdi(path string, count *uint64) error {
|
|
|
|
|
w, err := NewKdiWriter(path)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
*count = 0
|
|
|
|
|
return w.Close()
|
|
|
|
|
}
|