diff --git a/cmd/obitools/obireffamidx/main.go b/cmd/obitools/obireffamidx/main.go new file mode 100644 index 0000000..dea18c4 --- /dev/null +++ b/cmd/obitools/obireffamidx/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "os" + + log "github.com/sirupsen/logrus" + + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obirefidx" + + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" +) + +func main() { + optionParser := obioptions.GenerateOptionParser(obirefidx.OptionSet) + + _, args := optionParser(os.Args) + + fs, err := obiconvert.CLIReadBioSequences(args...) + + if err != nil { + log.Errorf("Cannot open file (%v)", err) + os.Exit(1) + } + + indexed := obirefidx.IndexFamilyDB(fs) + + obiconvert.CLIWriteBioSequences(indexed, true) + obiiter.WaitForLastPipe() + +} diff --git a/pkg/obiiter/speed.go b/pkg/obiiter/speed.go index 3bdf336..19dfd04 100644 --- a/pkg/obiiter/speed.go +++ b/pkg/obiiter/speed.go @@ -8,7 +8,7 @@ import ( "github.com/schollz/progressbar/v3" ) -func (iterator IBioSequence) Speed(message ...string) IBioSequence { +func (iterator IBioSequence) Speed(message string, size ...int) IBioSequence { // If the STDERR is redicted and doesn't end up to a terminal // No progress bar is printed. @@ -31,19 +31,15 @@ func (iterator IBioSequence) Speed(message ...string) IBioSequence { progressbar.OptionSetWidth(15), progressbar.OptionShowCount(), progressbar.OptionShowIts(), + progressbar.OptionSetDescription(message), ) - if len(message) > 0 { - pbopt = append(pbopt, - progressbar.OptionSetDescription(message[0]), - ) - } else { - pbopt = append(pbopt, - progressbar.OptionSetDescription("[Sequence Processing]"), - ) + n := -1 + if len(size) > 0 { + n = size[0] } - bar := progressbar.NewOptions(-1, pbopt...) + bar := progressbar.NewOptions(n, pbopt...) go func() { c := 0 @@ -68,13 +64,13 @@ func (iterator IBioSequence) Speed(message ...string) IBioSequence { if iterator.IsPaired() { newIter.MarkAsPaired() } - + return newIter } -func SpeedPipe(message ...string) Pipeable { +func SpeedPipe(message string) Pipeable { f := func(iterator IBioSequence) IBioSequence { - return iterator.Speed(message...) + return iterator.Speed(message) } return f diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index c58660b..14ad09d 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -7,7 +7,7 @@ import ( // TODO: The version number is extracted from git. This induces that the version // corresponds to the last commit, and not the one when the file will be // commited -var _Commit = "8a1ed26" +var _Commit = "f265a39" var _Version = "Release 4.2.0" // Version returns the version of the obitools package. diff --git a/pkg/obiseq/biosequenceslice.go b/pkg/obiseq/biosequenceslice.go index bb7b3cf..5e56ed5 100644 --- a/pkg/obiseq/biosequenceslice.go +++ b/pkg/obiseq/biosequenceslice.go @@ -195,3 +195,23 @@ func (s BioSequenceSlice) AttributeKeys(skip_map bool) obiutils.Set[string] { return keys } + +func (s *BioSequenceSlice) SortOnCount(reverse bool) { + slices.SortFunc(*s, func(a, b *BioSequence) int { + if reverse { + return b.Count() - a.Count() + } else { + return a.Count() - b.Count() + } + }) +} + +func (s *BioSequenceSlice) SortOnLength(reverse bool) { + slices.SortFunc(*s, func(a, b *BioSequence) int { + if reverse { + return b.Len() - a.Len() + } else { + return a.Len() - b.Len() + } + }) +} diff --git a/pkg/obitools/obicleandb/obicleandb.go b/pkg/obitools/obicleandb/obicleandb.go index 0ded75e..6d19643 100644 --- a/pkg/obitools/obicleandb/obicleandb.go +++ b/pkg/obitools/obicleandb/obicleandb.go @@ -279,7 +279,7 @@ func ICleanDB(itertator obiiter.IBioSequence) obiiter.IBioSequence { mannwithney := MakeSequenceFamilyGenusWorker(references) partof := obiiter.IBatchOver(references, - obioptions.CLIBatchSize()).Speed("Testing belonging to genus") + obioptions.CLIBatchSize()) // genera_iterator, err := obichunk.ISequenceChunk( // annotated, @@ -295,5 +295,5 @@ func ICleanDB(itertator obiiter.IBioSequence) obiiter.IBioSequence { // false, // ) - return partof.MakeIWorker(mannwithney, true) + return partof.MakeIWorker(mannwithney, true).Speed("Testing belonging", references.Len()) } diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index 22e7272..3489236 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -6,8 +6,8 @@ import ( "path/filepath" "strings" + "github.com/goombaio/orderedset" log "github.com/sirupsen/logrus" - "github.com/goombaio/orderedset" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" @@ -176,7 +176,7 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { } if CLIProgressBar() { - iterator = iterator.Speed() + iterator = iterator.Speed("Reading sequences") } return iterator, nil diff --git a/pkg/obitools/obicsv/obicsv.go b/pkg/obitools/obicsv/obicsv.go index 2014617..807fa03 100644 --- a/pkg/obitools/obicsv/obicsv.go +++ b/pkg/obitools/obicsv/obicsv.go @@ -13,7 +13,7 @@ func CLIWriteCSV(iterator obiiter.IBioSequence, terminalAction bool, filenames ...string) (obiiter.IBioSequence, error) { if obiconvert.CLIProgressBar() { - iterator = iterator.Speed() + iterator = iterator.Speed("Writing CSV") } var newIter obiiter.IBioSequence diff --git a/pkg/obitools/obirefidx/famlilyindexing.go b/pkg/obitools/obirefidx/famlilyindexing.go new file mode 100644 index 0000000..3e55f93 --- /dev/null +++ b/pkg/obitools/obirefidx/famlilyindexing.go @@ -0,0 +1,250 @@ +package obirefidx + +import ( + "fmt" + "math" + "os" + "sync" + + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obialign" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obichunk" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obikmer" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitax" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obifind" + "github.com/schollz/progressbar/v3" + log "github.com/sirupsen/logrus" +) + +func MakeStartClusterSliceWorker(clusterslot string, threshold float64) obiseq.SeqSliceWorker { + + clusteridslot := fmt.Sprintf("%s_clusterid", clusterslot) + clusterheadslot := fmt.Sprintf("%s_clusterhead", clusterslot) + cludteridentityslot := fmt.Sprintf("%s_clusteridentity", clusterslot) + clusternclusterslot := fmt.Sprintf("%s_cluster_n", clusterslot) + StartClusterSliceWorkers := func(sequences obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) { + sequences.SortOnCount(true) + n := 0 + for i, seqA := range sequences { + if !seqA.HasAttribute(clusteridslot) { + n++ + seqA.SetAttribute(clusteridslot, seqA.Id()) + seqA.SetAttribute(clusterheadslot, true) + seqA.SetAttribute(cludteridentityslot, 1.0) + for j := i + 1; j < len(sequences); j++ { + seqB := sequences[j] + if !seqB.HasAttribute(clusteridslot) { + errmax := int(math.Ceil(float64(max(seqA.Len(), seqB.Len())) * (1.0 - threshold) * 2)) + lca, alilen := obialign.FastLCSScore(seqA, seqB, errmax, nil) + id := float64(lca) / float64(alilen) + + if lca >= 0 && id >= threshold { + seqB.SetAttribute(clusteridslot, seqA.Id()) + seqB.SetAttribute(clusterheadslot, false) + seqB.SetAttribute(cludteridentityslot, id) + } + } + } + } + + } + + log.Debugf("Clustered %d sequences into %d clusters", len(sequences), n) + + for _, seq := range sequences { + seq.SetAttribute(clusternclusterslot, n) + } + + return sequences, nil + } + + return StartClusterSliceWorkers +} + +func MakeIndexingSliceWorker(indexslot, idslot string, + kmers *[]*obikmer.Table4mer, + taxonomy *obitax.Taxonomy, +) obiseq.SeqSliceWorker { + IndexingSliceWorkers := func(sequences obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) { + + kmercounts := make( + []*obikmer.Table4mer, + len(sequences)) + + taxa := make( + obitax.TaxonSet, + len(sequences)) + + for i, seq := range sequences { + j, ok := seq.GetIntAttribute(idslot) + + if !ok { + return nil, fmt.Errorf("sequence %s has no %s attribute", seq.Id(), idslot) + } + + kmercounts[i] = (*kmers)[j] + + taxa[i], _ = taxonomy.Taxon(seq.Taxid()) + + } + + limits := make(chan [2]int) + waiting := sync.WaitGroup{} + + go func() { + for i := 0; i < len(sequences); i += 10 { + limits <- [2]int{i, min(i+10, len(sequences))} + } + close(limits) + }() + + f := func() { + for l := range limits { + for i := l[0]; i < l[1]; i++ { + idx := IndexSequence(i, sequences, &kmercounts, &taxa, taxonomy) + sequences[i].SetAttribute(indexslot, idx) + } + } + + waiting.Done() + } + + nworkers := max(min(obioptions.CLIParallelWorkers(), len(sequences)/10), 1) + + waiting.Add(nworkers) + + for w := 0; w < nworkers; w++ { + go f() + } + + waiting.Wait() + + return sequences, nil + } + + return IndexingSliceWorkers +} + +func IndexFamilyDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { + log.Infoln("Family level reference database indexing...") + log.Infoln("Loading database...") + references := iterator.Load() + nref := len(references) + log.Infof("Done. Database contains %d sequences", nref) + + taxonomy, error := obifind.CLILoadSelectedTaxonomy() + if error != nil { + log.Panicln(error) + } + + log.Infoln("Indexing database kmers...") + + refcounts := make( + []*obikmer.Table4mer, + len(references)) + + buffer := make([]byte, 0, 1000) + + for i, seq := range references { + seq.SetAttribute("reffamidx_id", i) + refcounts[i] = obikmer.Count4Mer(seq, &buffer, nil) + } + + log.Info("done") + + partof := obiiter.IBatchOver(references, + obioptions.CLIBatchSize()).MakeIWorker(taxonomy.MakeSetSpeciesWorker(), + false, + obioptions.CLIParallelWorkers(), + ).MakeIWorker(taxonomy.MakeSetGenusWorker(), + false, + obioptions.CLIParallelWorkers(), + ).MakeIWorker(taxonomy.MakeSetFamilyWorker(), + false, + obioptions.CLIParallelWorkers(), + ) + + family_iterator, err := obichunk.ISequenceChunk( + partof, + obiseq.AnnotationClassifier("family_taxid", "NA"), + ) + + if err != nil { + log.Fatal(err) + } + + family_iterator.MakeISliceWorker( + MakeStartClusterSliceWorker("reffamidx", 0.9), + false, + obioptions.CLIParallelWorkers(), + ).MakeISliceWorker( + MakeIndexingSliceWorker("reffamidx_in", "reffamidx_id", &refcounts, taxonomy), + false, + obioptions.CLIParallelWorkers(), + ).Speed("Family Indexing", nref).Consume() + + clusters := obiseq.MakeBioSequenceSlice(0) + kcluster := make([]*obikmer.Table4mer, 0) + taxa := make(obitax.TaxonSet, 0) + + j := 0 + for i, seq := range references { + if is_centrer, _ := seq.GetBoolAttribute("reffamidx_clusterhead"); is_centrer { + clusters = append(clusters, seq) + kcluster = append(kcluster, refcounts[i]) + taxa[j], _ = taxonomy.Taxon(seq.Taxid()) + j++ + } + } + + log.Infof("Done. Found %d clusters", clusters.Len()) + + pbopt := make([]progressbar.Option, 0, 5) + pbopt = append(pbopt, + progressbar.OptionSetWriter(os.Stderr), + progressbar.OptionSetWidth(15), + progressbar.OptionShowCount(), + progressbar.OptionShowIts(), + progressbar.OptionSetDescription("Cluster indexing"), + ) + + bar := progressbar.NewOptions(len(clusters), pbopt...) + + limits := make(chan [2]int) + waiting := sync.WaitGroup{} + + go func() { + for i := 0; i < len(clusters); i += 10 { + limits <- [2]int{i, min(i+10, len(clusters))} + } + close(limits) + }() + + f := func() { + for l := range limits { + for i := l[0]; i < l[1]; i++ { + idx := IndexSequence(i, clusters, &kcluster, &taxa, taxonomy) + clusters[i].SetOBITagRefIndex(idx) + bar.Add(1) + } + } + + waiting.Done() + } + + nworkers := obioptions.CLIParallelWorkers() + waiting.Add(nworkers) + + for w := 0; w < nworkers; w++ { + go f() + } + + waiting.Wait() + + results := obiiter.IBatchOver(references, + obioptions.CLIBatchSize()).Speed("Writing db", nref) + + return results +} diff --git a/pkg/obiutils/cast_interface.go b/pkg/obiutils/cast_interface.go index 3835657..23c5d6a 100644 --- a/pkg/obiutils/cast_interface.go +++ b/pkg/obiutils/cast_interface.go @@ -42,6 +42,8 @@ func InterfaceToBool(i interface{}) (val bool, err error) { val = false switch t := i.(type) { + case bool: + val = t case int: val = t != 0 case int8: