Correct Shannon entropy bias for canonical k-mers

Multiple raw k-mers collapsing into identical circular canonical forms introduce bias into complexity estimates. This change pre-computes `log(class_size)` tables and per-word-size maximum entropy bounds. The `KmerEntropy` function and `KmerEntropyFilter` are updated to apply the corrected formula `(log(N) + Σf·log(s) - Σf·log(f))/N / emax`, ensuring accurate sequence complexity estimation.
This commit is contained in:
Eric Coissac
2026-05-17 14:52:31 +08:00
parent cecf90fa40
commit af7ae3d60c
+75 -40
View File
@@ -4,22 +4,21 @@ import "math"
// KmerEntropy computes the entropy of a single encoded k-mer. // KmerEntropy computes the entropy of a single encoded k-mer.
// //
// The algorithm mirrors the lowmask entropy calculation: it decodes the k-mer // The algorithm mirrors the Rust obiskbuilder entropy: it decodes the k-mer
// to a DNA sequence, extracts all sub-words of each size from 1 to levelMax, // to a DNA sequence, extracts all sub-words of each size from 1 to levelMax,
// normalizes them by circular canonical form, counts their frequencies, and // normalizes them by circular canonical form, counts their frequencies, and
// computes Shannon entropy normalized by the maximum possible entropy. // computes Shannon entropy corrected for class sizes, normalized by the
// maximum possible entropy over 4^ws raw bins.
// The returned value is the minimum entropy across all word sizes. // The returned value is the minimum entropy across all word sizes.
// //
// Correction for small sequences: the raw entropy H = log(N) - Σ f·log(f)/N
// under-estimates the true complexity when many raw words collapse to the same
// canonical form. Adding Σ f·log(class_size)/N recovers the entropy of the
// underlying uncollapsed distribution (assuming uniform mixing within each
// equivalence class).
//
// A value close to 0 indicates very low complexity (e.g. "AAAA..."), // A value close to 0 indicates very low complexity (e.g. "AAAA..."),
// while a value close to 1 indicates high complexity. // while a value close to 1 indicates high complexity.
//
// Parameters:
// - kmer: the encoded k-mer (2 bits per base)
// - k: the k-mer size
// - levelMax: maximum sub-word size for entropy (typically 6)
//
// Returns:
// - minimum normalized entropy across all word sizes 1..levelMax
func KmerEntropy(kmer uint64, k int, levelMax int) float64 { func KmerEntropy(kmer uint64, k int, levelMax int) float64 {
if k < 1 || levelMax < 1 { if k < 1 || levelMax < 1 {
return 1.0 return 1.0
@@ -35,7 +34,7 @@ func KmerEntropy(kmer uint64, k int, levelMax int) float64 {
var seqBuf [32]byte var seqBuf [32]byte
seq := DecodeKmer(kmer, k, seqBuf[:]) seq := DecodeKmer(kmer, k, seqBuf[:])
// Pre-compute nLogN lookup (same as lowmask) // Pre-compute nLogN lookup
nLogN := make([]float64, k+1) nLogN := make([]float64, k+1)
for i := 1; i <= k; i++ { for i := 1; i <= k; i++ {
nLogN[i] = float64(i) * math.Log(float64(i)) nLogN[i] = float64(i) * math.Log(float64(i))
@@ -51,6 +50,23 @@ func KmerEntropy(kmer uint64, k int, levelMax int) float64 {
} }
} }
// Build ln(class_size) tables: for each canonical form, how many raw
// words map to it under circular normalization.
classLogSizeTables := make([][]float64, levelMax+1)
for ws := 1; ws <= levelMax; ws++ {
tableSize := 1 << (ws * 2)
classSize := make([]int, tableSize)
for code := 0; code < tableSize; code++ {
classSize[normTables[ws][code]]++
}
classLogSizeTables[ws] = make([]float64, tableSize)
for j := 0; j < tableSize; j++ {
if classSize[j] > 0 {
classLogSizeTables[ws][j] = math.Log(float64(classSize[j]))
}
}
}
minEntropy := math.MaxFloat64 minEntropy := math.MaxFloat64
for ws := 1; ws <= levelMax; ws++ { for ws := 1; ws <= levelMax; ws++ {
@@ -75,23 +91,13 @@ func KmerEntropy(kmer uint64, k int, levelMax int) float64 {
table[normWord]++ table[normWord]++
} }
// Compute Shannon entropy // Compute emax over 4^ws raw bins (uncollapsed distribution).
floatNwords := float64(nwords) floatNwords := float64(nwords)
logNwords := math.Log(floatNwords) logNwords := math.Log(floatNwords)
na := tableSize // 4^ws
var sumNLogN float64
for j := 0; j < tableSize; j++ {
n := table[j]
if n > 0 {
sumNLogN += nLogN[n]
}
}
// Compute emax (maximum possible entropy for this word size)
na := CanonicalCircularKmerCount(ws)
var emax float64 var emax float64
if nwords < na { if nwords < na {
emax = math.Log(float64(nwords)) emax = logNwords
} else { } else {
cov := nwords / na cov := nwords / na
remains := nwords - (na * cov) remains := nwords - (na * cov)
@@ -105,7 +111,19 @@ func KmerEntropy(kmer uint64, k int, levelMax int) float64 {
continue continue
} }
entropy := (logNwords - sumNLogN/floatNwords) / emax // Accumulate Σ f·log(f) and Σ f·log(class_size) over canonical forms.
classLogSize := classLogSizeTables[ws]
var sumNLogN, sumClassLogN float64
for j := 0; j < tableSize; j++ {
n := table[j]
if n > 0 {
sumNLogN += nLogN[n]
sumClassLogN += float64(n) * classLogSize[j]
}
}
// Corrected entropy: H_raw ≈ log(N) + (Σf·log(s) - Σf·log(f)) / N
entropy := (logNwords + sumClassLogN/floatNwords - sumNLogN/floatNwords) / emax
if entropy < 0 { if entropy < 0 {
entropy = 0 entropy = 0
} }
@@ -134,6 +152,7 @@ type KmerEntropyFilter struct {
threshold float64 threshold float64
nLogN []float64 nLogN []float64
normTables [][]int normTables [][]int
classLogSizeTables [][]float64
emaxValues []float64 emaxValues []float64
logNwords []float64 logNwords []float64
// Pre-allocated frequency tables reused across Entropy() calls. // Pre-allocated frequency tables reused across Entropy() calls.
@@ -142,11 +161,6 @@ type KmerEntropyFilter struct {
} }
// NewKmerEntropyFilter creates an entropy filter with pre-computed tables. // NewKmerEntropyFilter creates an entropy filter with pre-computed tables.
//
// Parameters:
// - k: the k-mer size
// - levelMax: maximum sub-word size for entropy (typically 6)
// - threshold: entropy threshold (k-mers with entropy <= threshold are rejected)
func NewKmerEntropyFilter(k, levelMax int, threshold float64) *KmerEntropyFilter { func NewKmerEntropyFilter(k, levelMax int, threshold float64) *KmerEntropyFilter {
if levelMax >= k { if levelMax >= k {
levelMax = k - 1 levelMax = k - 1
@@ -169,20 +183,38 @@ func NewKmerEntropyFilter(k, levelMax int, threshold float64) *KmerEntropyFilter
} }
} }
// ln(class_size) for each canonical form under circular normalization.
classLogSizeTables := make([][]float64, levelMax+1)
for ws := 1; ws <= levelMax; ws++ {
tableSize := 1 << (ws * 2)
classSize := make([]int, tableSize)
for code := 0; code < tableSize; code++ {
classSize[normTables[ws][code]]++
}
classLogSizeTables[ws] = make([]float64, tableSize)
for j := 0; j < tableSize; j++ {
if classSize[j] > 0 {
classLogSizeTables[ws][j] = math.Log(float64(classSize[j]))
}
}
}
// Pre-compute emax and logNwords per word size.
// emax uses 4^ws raw bins to match the corrected entropy.
emaxValues := make([]float64, levelMax+1) emaxValues := make([]float64, levelMax+1)
logNwords := make([]float64, levelMax+1) logNwords := make([]float64, levelMax+1)
for ws := 1; ws <= levelMax; ws++ { for ws := 1; ws <= levelMax; ws++ {
nw := k - ws + 1 nw := k - ws + 1
na := CanonicalCircularKmerCount(ws) na := 1 << (ws * 2) // 4^ws raw bins
floatNw := float64(nw)
logNwords[ws] = math.Log(floatNw)
if nw < na { if nw < na {
logNwords[ws] = math.Log(float64(nw)) emaxValues[ws] = logNwords[ws]
emaxValues[ws] = math.Log(float64(nw))
} else { } else {
cov := nw / na cov := nw / na
remains := nw - (na * cov) remains := nw - (na * cov)
f1 := float64(cov) / float64(nw) f1 := float64(cov) / floatNw
f2 := float64(cov+1) / float64(nw) f2 := float64(cov+1) / floatNw
logNwords[ws] = math.Log(float64(nw))
emaxValues[ws] = -(float64(na-remains)*f1*math.Log(f1) + emaxValues[ws] = -(float64(na-remains)*f1*math.Log(f1) +
float64(remains)*f2*math.Log(f2)) float64(remains)*f2*math.Log(f2))
} }
@@ -200,6 +232,7 @@ func NewKmerEntropyFilter(k, levelMax int, threshold float64) *KmerEntropyFilter
threshold: threshold, threshold: threshold,
nLogN: nLogN, nLogN: nLogN,
normTables: normTables, normTables: normTables,
classLogSizeTables: classLogSizeTables,
emaxValues: emaxValues, emaxValues: emaxValues,
logNwords: logNwords, logNwords: logNwords,
freqTables: freqTables, freqTables: freqTables,
@@ -236,7 +269,7 @@ func (ef *KmerEntropyFilter) Entropy(kmer uint64) float64 {
// Count circular-canonical sub-word frequencies // Count circular-canonical sub-word frequencies
tableSize := 1 << (ws * 2) tableSize := 1 << (ws * 2)
table := ef.freqTables[ws] table := ef.freqTables[ws]
clear(table) // reset to zero clear(table)
mask := (1 << (ws * 2)) - 1 mask := (1 << (ws * 2)) - 1
normTable := ef.normTables[ws] normTable := ef.normTables[ws]
@@ -251,19 +284,21 @@ func (ef *KmerEntropyFilter) Entropy(kmer uint64) float64 {
table[normWord]++ table[normWord]++
} }
// Compute Shannon entropy
floatNwords := float64(nwords) floatNwords := float64(nwords)
logNwords := ef.logNwords[ws] logNwords := ef.logNwords[ws]
classLogSize := ef.classLogSizeTables[ws]
var sumNLogN float64 var sumNLogN, sumClassLogN float64
for j := 0; j < tableSize; j++ { for j := 0; j < tableSize; j++ {
n := table[j] n := table[j]
if n > 0 { if n > 0 {
sumNLogN += ef.nLogN[n] sumNLogN += ef.nLogN[n]
sumClassLogN += float64(n) * classLogSize[j]
} }
} }
entropy := (logNwords - sumNLogN/floatNwords) / emax // Corrected entropy: H_raw ≈ log(N) + (Σf·log(s) - Σf·log(f)) / N
entropy := (logNwords + sumClassLogN/floatNwords - sumNLogN/floatNwords) / emax
if entropy < 0 { if entropy < 0 {
entropy = 0 entropy = 0
} }