Fisrt versin of the two levels indexing

Former-commit-id: 4d86483bc120e27cb6f5d2c216596d410274fc69
This commit is contained in:
Eric Coissac
2024-07-12 15:17:48 +02:00
parent 42c5881ddc
commit 4e4fac491f
9 changed files with 319 additions and 19 deletions

View File

@ -0,0 +1,32 @@
package main
import (
"os"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obirefidx"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
)
func main() {
optionParser := obioptions.GenerateOptionParser(obirefidx.OptionSet)
_, args := optionParser(os.Args)
fs, err := obiconvert.CLIReadBioSequences(args...)
if err != nil {
log.Errorf("Cannot open file (%v)", err)
os.Exit(1)
}
indexed := obirefidx.IndexFamilyDB(fs)
obiconvert.CLIWriteBioSequences(indexed, true)
obiiter.WaitForLastPipe()
}

View File

@ -8,7 +8,7 @@ import (
"github.com/schollz/progressbar/v3"
)
func (iterator IBioSequence) Speed(message ...string) IBioSequence {
func (iterator IBioSequence) Speed(message string, size ...int) IBioSequence {
// If the STDERR is redicted and doesn't end up to a terminal
// No progress bar is printed.
@ -31,19 +31,15 @@ func (iterator IBioSequence) Speed(message ...string) IBioSequence {
progressbar.OptionSetWidth(15),
progressbar.OptionShowCount(),
progressbar.OptionShowIts(),
progressbar.OptionSetDescription(message),
)
if len(message) > 0 {
pbopt = append(pbopt,
progressbar.OptionSetDescription(message[0]),
)
} else {
pbopt = append(pbopt,
progressbar.OptionSetDescription("[Sequence Processing]"),
)
n := -1
if len(size) > 0 {
n = size[0]
}
bar := progressbar.NewOptions(-1, pbopt...)
bar := progressbar.NewOptions(n, pbopt...)
go func() {
c := 0
@ -68,13 +64,13 @@ func (iterator IBioSequence) Speed(message ...string) IBioSequence {
if iterator.IsPaired() {
newIter.MarkAsPaired()
}
return newIter
}
func SpeedPipe(message ...string) Pipeable {
func SpeedPipe(message string) Pipeable {
f := func(iterator IBioSequence) IBioSequence {
return iterator.Speed(message...)
return iterator.Speed(message)
}
return f

View File

@ -7,7 +7,7 @@ import (
// TODO: The version number is extracted from git. This induces that the version
// corresponds to the last commit, and not the one when the file will be
// commited
var _Commit = "8a1ed26"
var _Commit = "f265a39"
var _Version = "Release 4.2.0"
// Version returns the version of the obitools package.

View File

@ -195,3 +195,23 @@ func (s BioSequenceSlice) AttributeKeys(skip_map bool) obiutils.Set[string] {
return keys
}
func (s *BioSequenceSlice) SortOnCount(reverse bool) {
slices.SortFunc(*s, func(a, b *BioSequence) int {
if reverse {
return b.Count() - a.Count()
} else {
return a.Count() - b.Count()
}
})
}
func (s *BioSequenceSlice) SortOnLength(reverse bool) {
slices.SortFunc(*s, func(a, b *BioSequence) int {
if reverse {
return b.Len() - a.Len()
} else {
return a.Len() - b.Len()
}
})
}

View File

@ -279,7 +279,7 @@ func ICleanDB(itertator obiiter.IBioSequence) obiiter.IBioSequence {
mannwithney := MakeSequenceFamilyGenusWorker(references)
partof := obiiter.IBatchOver(references,
obioptions.CLIBatchSize()).Speed("Testing belonging to genus")
obioptions.CLIBatchSize())
// genera_iterator, err := obichunk.ISequenceChunk(
// annotated,
@ -295,5 +295,5 @@ func ICleanDB(itertator obiiter.IBioSequence) obiiter.IBioSequence {
// false,
// )
return partof.MakeIWorker(mannwithney, true)
return partof.MakeIWorker(mannwithney, true).Speed("Testing belonging", references.Len())
}

View File

@ -6,8 +6,8 @@ import (
"path/filepath"
"strings"
"github.com/goombaio/orderedset"
log "github.com/sirupsen/logrus"
"github.com/goombaio/orderedset"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
@ -176,7 +176,7 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) {
}
if CLIProgressBar() {
iterator = iterator.Speed()
iterator = iterator.Speed("Reading sequences")
}
return iterator, nil

View File

@ -13,7 +13,7 @@ func CLIWriteCSV(iterator obiiter.IBioSequence,
terminalAction bool, filenames ...string) (obiiter.IBioSequence, error) {
if obiconvert.CLIProgressBar() {
iterator = iterator.Speed()
iterator = iterator.Speed("Writing CSV")
}
var newIter obiiter.IBioSequence

View File

@ -0,0 +1,250 @@
package obirefidx
import (
"fmt"
"math"
"os"
"sync"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obialign"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obichunk"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obikmer"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitax"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obifind"
"github.com/schollz/progressbar/v3"
log "github.com/sirupsen/logrus"
)
func MakeStartClusterSliceWorker(clusterslot string, threshold float64) obiseq.SeqSliceWorker {
clusteridslot := fmt.Sprintf("%s_clusterid", clusterslot)
clusterheadslot := fmt.Sprintf("%s_clusterhead", clusterslot)
cludteridentityslot := fmt.Sprintf("%s_clusteridentity", clusterslot)
clusternclusterslot := fmt.Sprintf("%s_cluster_n", clusterslot)
StartClusterSliceWorkers := func(sequences obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) {
sequences.SortOnCount(true)
n := 0
for i, seqA := range sequences {
if !seqA.HasAttribute(clusteridslot) {
n++
seqA.SetAttribute(clusteridslot, seqA.Id())
seqA.SetAttribute(clusterheadslot, true)
seqA.SetAttribute(cludteridentityslot, 1.0)
for j := i + 1; j < len(sequences); j++ {
seqB := sequences[j]
if !seqB.HasAttribute(clusteridslot) {
errmax := int(math.Ceil(float64(max(seqA.Len(), seqB.Len())) * (1.0 - threshold) * 2))
lca, alilen := obialign.FastLCSScore(seqA, seqB, errmax, nil)
id := float64(lca) / float64(alilen)
if lca >= 0 && id >= threshold {
seqB.SetAttribute(clusteridslot, seqA.Id())
seqB.SetAttribute(clusterheadslot, false)
seqB.SetAttribute(cludteridentityslot, id)
}
}
}
}
}
log.Debugf("Clustered %d sequences into %d clusters", len(sequences), n)
for _, seq := range sequences {
seq.SetAttribute(clusternclusterslot, n)
}
return sequences, nil
}
return StartClusterSliceWorkers
}
func MakeIndexingSliceWorker(indexslot, idslot string,
kmers *[]*obikmer.Table4mer,
taxonomy *obitax.Taxonomy,
) obiseq.SeqSliceWorker {
IndexingSliceWorkers := func(sequences obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) {
kmercounts := make(
[]*obikmer.Table4mer,
len(sequences))
taxa := make(
obitax.TaxonSet,
len(sequences))
for i, seq := range sequences {
j, ok := seq.GetIntAttribute(idslot)
if !ok {
return nil, fmt.Errorf("sequence %s has no %s attribute", seq.Id(), idslot)
}
kmercounts[i] = (*kmers)[j]
taxa[i], _ = taxonomy.Taxon(seq.Taxid())
}
limits := make(chan [2]int)
waiting := sync.WaitGroup{}
go func() {
for i := 0; i < len(sequences); i += 10 {
limits <- [2]int{i, min(i+10, len(sequences))}
}
close(limits)
}()
f := func() {
for l := range limits {
for i := l[0]; i < l[1]; i++ {
idx := IndexSequence(i, sequences, &kmercounts, &taxa, taxonomy)
sequences[i].SetAttribute(indexslot, idx)
}
}
waiting.Done()
}
nworkers := max(min(obioptions.CLIParallelWorkers(), len(sequences)/10), 1)
waiting.Add(nworkers)
for w := 0; w < nworkers; w++ {
go f()
}
waiting.Wait()
return sequences, nil
}
return IndexingSliceWorkers
}
func IndexFamilyDB(iterator obiiter.IBioSequence) obiiter.IBioSequence {
log.Infoln("Family level reference database indexing...")
log.Infoln("Loading database...")
references := iterator.Load()
nref := len(references)
log.Infof("Done. Database contains %d sequences", nref)
taxonomy, error := obifind.CLILoadSelectedTaxonomy()
if error != nil {
log.Panicln(error)
}
log.Infoln("Indexing database kmers...")
refcounts := make(
[]*obikmer.Table4mer,
len(references))
buffer := make([]byte, 0, 1000)
for i, seq := range references {
seq.SetAttribute("reffamidx_id", i)
refcounts[i] = obikmer.Count4Mer(seq, &buffer, nil)
}
log.Info("done")
partof := obiiter.IBatchOver(references,
obioptions.CLIBatchSize()).MakeIWorker(taxonomy.MakeSetSpeciesWorker(),
false,
obioptions.CLIParallelWorkers(),
).MakeIWorker(taxonomy.MakeSetGenusWorker(),
false,
obioptions.CLIParallelWorkers(),
).MakeIWorker(taxonomy.MakeSetFamilyWorker(),
false,
obioptions.CLIParallelWorkers(),
)
family_iterator, err := obichunk.ISequenceChunk(
partof,
obiseq.AnnotationClassifier("family_taxid", "NA"),
)
if err != nil {
log.Fatal(err)
}
family_iterator.MakeISliceWorker(
MakeStartClusterSliceWorker("reffamidx", 0.9),
false,
obioptions.CLIParallelWorkers(),
).MakeISliceWorker(
MakeIndexingSliceWorker("reffamidx_in", "reffamidx_id", &refcounts, taxonomy),
false,
obioptions.CLIParallelWorkers(),
).Speed("Family Indexing", nref).Consume()
clusters := obiseq.MakeBioSequenceSlice(0)
kcluster := make([]*obikmer.Table4mer, 0)
taxa := make(obitax.TaxonSet, 0)
j := 0
for i, seq := range references {
if is_centrer, _ := seq.GetBoolAttribute("reffamidx_clusterhead"); is_centrer {
clusters = append(clusters, seq)
kcluster = append(kcluster, refcounts[i])
taxa[j], _ = taxonomy.Taxon(seq.Taxid())
j++
}
}
log.Infof("Done. Found %d clusters", clusters.Len())
pbopt := make([]progressbar.Option, 0, 5)
pbopt = append(pbopt,
progressbar.OptionSetWriter(os.Stderr),
progressbar.OptionSetWidth(15),
progressbar.OptionShowCount(),
progressbar.OptionShowIts(),
progressbar.OptionSetDescription("Cluster indexing"),
)
bar := progressbar.NewOptions(len(clusters), pbopt...)
limits := make(chan [2]int)
waiting := sync.WaitGroup{}
go func() {
for i := 0; i < len(clusters); i += 10 {
limits <- [2]int{i, min(i+10, len(clusters))}
}
close(limits)
}()
f := func() {
for l := range limits {
for i := l[0]; i < l[1]; i++ {
idx := IndexSequence(i, clusters, &kcluster, &taxa, taxonomy)
clusters[i].SetOBITagRefIndex(idx)
bar.Add(1)
}
}
waiting.Done()
}
nworkers := obioptions.CLIParallelWorkers()
waiting.Add(nworkers)
for w := 0; w < nworkers; w++ {
go f()
}
waiting.Wait()
results := obiiter.IBatchOver(references,
obioptions.CLIBatchSize()).Speed("Writing db", nref)
return results
}

View File

@ -42,6 +42,8 @@ func InterfaceToBool(i interface{}) (val bool, err error) {
val = false
switch t := i.(type) {
case bool:
val = t
case int:
val = t != 0
case int8: