diff --git a/Release-notes.md b/Release-notes.md index f6476c0..7317eac 100644 --- a/Release-notes.md +++ b/Release-notes.md @@ -2,13 +2,26 @@ ## Latest changes +### API Changes +- Two of the main types, `obiseq.SeqWorker` and `obiseq.SeqSliceWorker`, have their declarations changed. + Both now return two values: an `obiseq.BioSequenceSlice` and an `error`. This allows a worker to + return potentially several sequences as the result of the processing of a single sequence, or + zero, which is equivalent to filtering out the input sequence. + ### Enhancement - The bug corrected in the parsing of EMBL and Genbank files as implemented in version 4.1.2 of OBITools4, potentially induced some reduction in the performance of the parsing. This should have been now fixed. -- In the same idea, parsing of genbank and EMBL files were reading and storing in memory not only the sequence +- Along the same lines, the parsing of Genbank and EMBL files was reading and storing in memory not only + the sequence but also the annotations (features table). Up to now none of the obitools are using this information, but - with large complete genomes, it is occupying a lot of memory. To reduce this impact, the new version of the - parser doesn't any more store in memory the annotations by default. + with large complete genomes, it is occupying a lot of memory. To reduce this impact, the new version of + the parser no longer stores the annotations in memory by default. +- Add a **--taxonomic-path** option to `obiannotate`. The option adds a `taxonomic_path` tag to sequences describing + the taxonomic classification of the sequence according to its taxid. The path is a string. Each level of the + path is delimited by a `|` character. A level consists of three parts separated by a `@`. The first part is the + taxid, the second the scientific name and the last the taxonomic rank. The first level described is always the + root of the taxonomy. The last corresponds to the taxid of the sequence. 
If a sequence is not annotated by + a taxid, as usual the sequence is assumed having the taxid 1 (the root of the taxonomy). ## February 16th, 2024. Release 4.1.2 diff --git a/cmd/obitools/obicomplement/main.go b/cmd/obitools/obicomplement/main.go index 10669ff..252bfb5 100644 --- a/cmd/obitools/obicomplement/main.go +++ b/cmd/obitools/obicomplement/main.go @@ -24,7 +24,7 @@ func main() { os.Exit(1) } - comp := fs.MakeIWorker(obiseq.ReverseComplementWorker(true)) + comp := fs.MakeIWorker(obiseq.ReverseComplementWorker(true), true) obiconvert.CLIWriteBioSequences(comp, true) obiiter.WaitForLastPipe() diff --git a/pkg/obiapat/pcr.go b/pkg/obiapat/pcr.go index c75c307..67e415d 100644 --- a/pkg/obiapat/pcr.go +++ b/pkg/obiapat/pcr.go @@ -524,10 +524,10 @@ func PCRSlice(sequences obiseq.BioSequenceSlice, func PCRSliceWorker(options ...WithOption) obiseq.SeqSliceWorker { opt := MakeOptions(options) - worker := func(sequences obiseq.BioSequenceSlice) obiseq.BioSequenceSlice { + worker := func(sequences obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) { result := _PCRSlice(sequences, opt) sequences.Recycle(true) - return result + return result, nil } return worker diff --git a/pkg/obicorazick/worker.go b/pkg/obicorazick/worker.go index 3c5f5d1..e454f0e 100644 --- a/pkg/obicorazick/worker.go +++ b/pkg/obicorazick/worker.go @@ -14,7 +14,7 @@ func AhoCorazickWorker(slot string, patterns []string) obiseq.SeqWorker { fslot := slot + "_Fwd" rslot := slot + "_Rev" - f := func(s *obiseq.BioSequence) *obiseq.BioSequence { + f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { matchesF := len(matcher.FindAllByteSlice(s.Sequence())) matchesR := len(matcher.FindAllByteSlice(s.ReverseComplement(false).Sequence())) @@ -26,7 +26,7 @@ func AhoCorazickWorker(slot string, patterns []string) obiseq.SeqWorker { s.SetAttribute(rslot, matchesR) } - return s + return obiseq.BioSequenceSlice{s}, nil } return f diff --git a/pkg/obiformats/fastseq_header.go 
b/pkg/obiformats/fastseq_header.go index c231eec..7df46d1 100644 --- a/pkg/obiformats/fastseq_header.go +++ b/pkg/obiformats/fastseq_header.go @@ -31,5 +31,6 @@ func IParseFastSeqHeaderBatch(iterator obiiter.IBioSequence, options ...WithOption) obiiter.IBioSequence { opt := MakeOptions(options) return iterator.MakeIWorker(obiseq.AnnotatorToSeqWorker(opt.ParseFastSeqHeader()), + false, opt.ParallelWorkers()) } diff --git a/pkg/obiiter/workers.go b/pkg/obiiter/workers.go index 9961aad..00ed1db 100644 --- a/pkg/obiiter/workers.go +++ b/pkg/obiiter/workers.go @@ -15,7 +15,9 @@ import ( // Moreover the SeqWorker function, the method accepted two optional integer parameters. // - First is allowing to indicates the number of workers running in parallele (default 4) // - The second the size of the chanel buffer. By default set to the same value than the input buffer. -func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int) IBioSequence { +func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, + breakOnError bool, + sizes ...int) IBioSequence { nworkers := obioptions.CLIParallelWorkers() if len(sizes) > 0 { @@ -32,11 +34,15 @@ func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int) }() + sw := obiseq.SeqToSliceWorker(worker, true, breakOnError) + f := func(iterator IBioSequence) { + var err error for iterator.Next() { batch := iterator.Get() - for i, seq := range batch.slice { - batch.slice[i] = worker(seq) + batch.slice, err = sw(batch.slice) + if err != nil && breakOnError { + log.Fatalf("Error on sequence processing : %v", err) } newIter.Push(batch) } @@ -67,7 +73,7 @@ func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int) // Return: // - newIter: A new IBioSequence iterator with the modified sequences. 
func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePredicate, - worker obiseq.SeqWorker, sizes ...int) IBioSequence { + worker obiseq.SeqWorker, breakOnError bool, sizes ...int) IBioSequence { nworkers := obioptions.CLIReadParallelWorkers() if len(sizes) > 0 { @@ -84,13 +90,15 @@ func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePre }() + sw := obiseq.SeqToSliceConditionalWorker(predicate, worker, true, breakOnError) + f := func(iterator IBioSequence) { + var err error for iterator.Next() { batch := iterator.Get() - for i, seq := range batch.slice { - if predicate(batch.slice[i]) { - batch.slice[i] = worker(seq) - } + batch.slice, err = sw(batch.slice) + if err != nil && breakOnError { + log.Fatalf("Error on sequence processing : %v", err) } newIter.Push(batch) } @@ -120,7 +128,7 @@ func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePre // provided, the default number of workers is used. // // The function returns a new IBioSequence containing the modified slices. -func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, sizes ...int) IBioSequence { +func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, breakOnError bool, sizes ...int) IBioSequence { nworkers := obioptions.CLIParallelWorkers() if len(sizes) > 0 { @@ -137,9 +145,13 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, size }() f := func(iterator IBioSequence) { + var err error for iterator.Next() { batch := iterator.Get() - batch.slice = worker(batch.slice) + batch.slice, err = worker(batch.slice) + if err != nil && breakOnError { + log.Fatalf("Error on sequence processing : %v", err) + } newIter.Push(batch) } newIter.Done() @@ -169,9 +181,9 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, size // // Return: // - f: A Pipeable object that represents the closure created by the WorkerPipe function. 
-func WorkerPipe(worker obiseq.SeqWorker, sizes ...int) Pipeable { +func WorkerPipe(worker obiseq.SeqWorker, breakOnError bool, sizes ...int) Pipeable { f := func(iterator IBioSequence) IBioSequence { - return iterator.MakeIWorker(worker, sizes...) + return iterator.MakeIWorker(worker, breakOnError, sizes...) } return f @@ -182,9 +194,9 @@ func WorkerPipe(worker obiseq.SeqWorker, sizes ...int) Pipeable { // The worker parameter is the SeqSliceWorker to be applied. // The sizes parameter is a variadic parameter representing the sizes of the slices. // The function returns a Pipeable function that applies the SeqSliceWorker to the iterator. -func SliceWorkerPipe(worker obiseq.SeqSliceWorker, sizes ...int) Pipeable { +func SliceWorkerPipe(worker obiseq.SeqSliceWorker, breakOnError bool, sizes ...int) Pipeable { f := func(iterator IBioSequence) IBioSequence { - return iterator.MakeISliceWorker(worker, sizes...) + return iterator.MakeISliceWorker(worker, breakOnError, sizes...) } return f diff --git a/pkg/obingslibrary/worker.go b/pkg/obingslibrary/worker.go index 60909aa..0ff065e 100644 --- a/pkg/obingslibrary/worker.go +++ b/pkg/obingslibrary/worker.go @@ -172,8 +172,8 @@ func ExtractBarcodeSliceWorker(ngslibrary NGSLibrary, ngslibrary.Compile(opt.AllowedMismatch(), opt.AllowsIndel()) - worker := func(sequences obiseq.BioSequenceSlice) obiseq.BioSequenceSlice { - return _ExtractBarcodeSlice(ngslibrary, sequences, opt) + worker := func(sequences obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) { + return _ExtractBarcodeSlice(ngslibrary, sequences, opt), nil } return worker diff --git a/pkg/obiseq/eval.go b/pkg/obiseq/eval.go index a919142..c93238e 100644 --- a/pkg/obiseq/eval.go +++ b/pkg/obiseq/eval.go @@ -28,16 +28,18 @@ func Expression(expression string) func(*BioSequence) (interface{}, error) { func EditIdWorker(expression string) SeqWorker { e := Expression(expression) - f := func(sequence *BioSequence) *BioSequence { + f := func(sequence *BioSequence) 
(BioSequenceSlice, error) { v, err := e(sequence) - - if err != nil { - log.Fatalf("Expression '%s' cannot be evaluated on sequence %s", + if err == nil { + sequence.SetId(fmt.Sprintf("%v", v)) + } else { + err = fmt.Errorf("Expression '%s' cannot be evaluated on sequence %s : %v", expression, - sequence.Id()) + sequence.Id(), + err) } - sequence.SetId(fmt.Sprintf("%v", v)) - return sequence + + return BioSequenceSlice{sequence}, err } return f @@ -45,16 +47,18 @@ func EditIdWorker(expression string) SeqWorker { func EditAttributeWorker(key string, expression string) SeqWorker { e := Expression(expression) - f := func(sequence *BioSequence) *BioSequence { + f := func(sequence *BioSequence) (BioSequenceSlice, error) { v, err := e(sequence) - - if err != nil { - log.Fatalf("Expression '%s' cannot be evaluated on sequence %s", + if err == nil { + sequence.SetAttribute(key, v) + } else { + err = fmt.Errorf("Expression '%s' cannot be evaluated on sequence %s : %v", expression, - sequence.Id()) + sequence.Id(), + err) } - sequence.SetAttribute(key, v) - return sequence + + return BioSequenceSlice{sequence}, err } return f diff --git a/pkg/obiseq/revcomp.go b/pkg/obiseq/revcomp.go index 7530192..580ea40 100644 --- a/pkg/obiseq/revcomp.go +++ b/pkg/obiseq/revcomp.go @@ -105,8 +105,8 @@ func (sequence *BioSequence) _revcmpMutation() *BioSequence { * @returns {SeqWorker} A function that accepts *BioSequence and returns its reversed-complement form. 
*/ func ReverseComplementWorker(inplace bool) SeqWorker { - f := func(input *BioSequence) *BioSequence { - return input.ReverseComplement(inplace) + f := func(input *BioSequence) (BioSequenceSlice, error) { + return BioSequenceSlice{input.ReverseComplement(inplace)}, nil } return f diff --git a/pkg/obiseq/worker.go b/pkg/obiseq/worker.go index a848706..df517d7 100644 --- a/pkg/obiseq/worker.go +++ b/pkg/obiseq/worker.go @@ -1,20 +1,25 @@ package obiseq -import log "github.com/sirupsen/logrus" +import ( + "fmt" + "slices" + + log "github.com/sirupsen/logrus" +) type SeqAnnotator func(*BioSequence) -type SeqWorker func(*BioSequence) *BioSequence -type SeqSliceWorker func(BioSequenceSlice) BioSequenceSlice +type SeqWorker func(*BioSequence) (BioSequenceSlice, error) +type SeqSliceWorker func(BioSequenceSlice) (BioSequenceSlice, error) -func NilSeqWorker(seq *BioSequence) *BioSequence { - return seq +func NilSeqWorker(seq *BioSequence) (BioSequenceSlice, error) { + return BioSequenceSlice{seq}, nil } func AnnotatorToSeqWorker(function SeqAnnotator) SeqWorker { - f := func(seq *BioSequence) *BioSequence { + f := func(seq *BioSequence) (BioSequenceSlice, error) { function(seq) - return seq + return BioSequenceSlice{seq}, nil } return f } @@ -25,35 +30,47 @@ func SeqToSliceWorker(worker SeqWorker, if worker == nil { if inplace { - f = func(input BioSequenceSlice) BioSequenceSlice { - return input + f = func(input BioSequenceSlice) (BioSequenceSlice, error) { + return input, nil } } else { - f = func(input BioSequenceSlice) BioSequenceSlice { + f = func(input BioSequenceSlice) (BioSequenceSlice, error) { output := MakeBioSequenceSlice(len(input)) copy(output, input) - return output + return output, nil } } } else { - f = func(input BioSequenceSlice) BioSequenceSlice { + f = func(input BioSequenceSlice) (BioSequenceSlice, error) { output := input if !inplace { output = MakeBioSequenceSlice(len(input)) } i := 0 for _, s := range input { - r := worker(s) - if r != nil { - 
output[i] = r - i++ - } else if breakOnError { - log.Fatalf("got an error on sequence %s processing", - r.Id()) + r, err := worker(s) + if err == nil { + for _, rs := range r { + if i == len(output) { + output = slices.Grow(output, 1)[:i+1] // keep Grow's result and extend len: a worker may return more sequences than the batch has slots + } + output[i] = rs + i++ + } + + } else { + if breakOnError { + err = fmt.Errorf("got an error on sequence %s processing : %w", + s.Id(), err) + return BioSequenceSlice{}, err + } else { + log.Warnf("got an error on sequence %s processing : %v", + s.Id(), err) + } } } - return output[0:i] + return output[0:i], nil } } @@ -61,15 +78,16 @@ func SeqToSliceWorker(worker SeqWorker, return f } -func SeqToSliceConditionalWorker(worker SeqWorker, +func SeqToSliceConditionalWorker( condition SequencePredicate, + worker SeqWorker, inplace, breakOnError bool) SeqSliceWorker { if condition == nil { return SeqToSliceWorker(worker, inplace, breakOnError) } - f := func(input BioSequenceSlice) BioSequenceSlice { + f := func(input BioSequenceSlice) (BioSequenceSlice, error) { output := input if !inplace { output = MakeBioSequenceSlice(len(input)) @@ -79,18 +97,29 @@ func SeqToSliceConditionalWorker(worker SeqWorker, for _, s := range input { if condition(s) { - r := worker(s) - if r != nil { - output[i] = r - i++ - } else if breakOnError { - log.Fatalf("got an error on sequence %s processing", - r.Id()) + r, err := worker(s) + if err == nil { + for _, rs := range r { + if i == len(output) { + output = slices.Grow(output, 1)[:i+1] // keep Grow's result and extend len: a worker may return more sequences than the batch has slots + } + output[i] = rs + i++ + } + } else { + if breakOnError { + err = fmt.Errorf("got an error on sequence %s processing : %w", + s.Id(), err) + return BioSequenceSlice{}, err + } else { + log.Warnf("got an error on sequence %s processing : %v", + s.Id(), err) + } } } } - return output[0:i] + return output[0:i], nil } return f @@ -105,11 +134,17 @@ func (worker SeqWorker) ChainWorkers(next SeqWorker) SeqWorker { } } - f := func(seq *BioSequence) *BioSequence { + sw := SeqToSliceWorker(next, true, false) + + f := func(seq *BioSequence) 
(BioSequenceSlice, error) { if seq == nil { - return nil + return BioSequenceSlice{}, nil } - return next(worker(seq)) + slice, err := worker(seq) + if err == nil { + slice, err = sw(slice) + } + return slice, err } return f diff --git a/pkg/obitax/lca.go b/pkg/obitax/lca.go index 7060203..55650d5 100644 --- a/pkg/obitax/lca.go +++ b/pkg/obitax/lca.go @@ -147,14 +147,14 @@ func AddLCAWorker(taxonomy *Taxonomy, slot_name string, threshold float64) obise lca_name = "scientific_name" } - f := func(sequence *obiseq.BioSequence) *obiseq.BioSequence { + f := func(sequence *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { lca, rans, _ := taxonomy.LCA(sequence, threshold) sequence.SetAttribute(slot_name, lca.Taxid()) sequence.SetAttribute(lca_name, lca.ScientificName()) sequence.SetAttribute(lca_error, math.Round((1-rans)*1000)/1000) - return sequence + return obiseq.BioSequenceSlice{sequence}, nil } return f diff --git a/pkg/obitax/sequence_workers.go b/pkg/obitax/sequence_workers.go index 6f57acf..0e8524f 100644 --- a/pkg/obitax/sequence_workers.go +++ b/pkg/obitax/sequence_workers.go @@ -14,9 +14,9 @@ func (taxonomy *Taxonomy) MakeSetTaxonAtRankWorker(rank string) obiseq.SeqWorker taxonomy.RankList()) } - w := func(sequence *obiseq.BioSequence) *obiseq.BioSequence { + w := func(sequence *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { taxonomy.SetTaxonAtRank(sequence, rank) - return sequence + return obiseq.BioSequenceSlice{sequence}, nil } return w @@ -24,9 +24,9 @@ func (taxonomy *Taxonomy) MakeSetTaxonAtRankWorker(rank string) obiseq.SeqWorker func (taxonomy *Taxonomy) MakeSetSpeciesWorker() obiseq.SeqWorker { - w := func(sequence *obiseq.BioSequence) *obiseq.BioSequence { + w := func(sequence *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { taxonomy.SetSpecies(sequence) - return sequence + return obiseq.BioSequenceSlice{sequence}, nil } return w @@ -34,9 +34,9 @@ func (taxonomy *Taxonomy) MakeSetSpeciesWorker() obiseq.SeqWorker { func (taxonomy 
*Taxonomy) MakeSetGenusWorker() obiseq.SeqWorker { - w := func(sequence *obiseq.BioSequence) *obiseq.BioSequence { + w := func(sequence *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { taxonomy.SetGenus(sequence) - return sequence + return obiseq.BioSequenceSlice{sequence}, nil } return w @@ -44,9 +44,9 @@ func (taxonomy *Taxonomy) MakeSetGenusWorker() obiseq.SeqWorker { func (taxonomy *Taxonomy) MakeSetFamilyWorker() obiseq.SeqWorker { - w := func(sequence *obiseq.BioSequence) *obiseq.BioSequence { + w := func(sequence *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { taxonomy.SetFamily(sequence) - return sequence + return obiseq.BioSequenceSlice{sequence}, nil } return w @@ -54,9 +54,9 @@ func (taxonomy *Taxonomy) MakeSetFamilyWorker() obiseq.SeqWorker { func (taxonomy *Taxonomy) MakeSetPathWorker() obiseq.SeqWorker { - w := func(s *obiseq.BioSequence) *obiseq.BioSequence { - taxonomy.SetPath(s) - return s + w := func(sequence *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { + taxonomy.SetPath(sequence) + return obiseq.BioSequenceSlice{sequence}, nil } return w diff --git a/pkg/obitools/obiannotate/obiannotate.go b/pkg/obitools/obiannotate/obiannotate.go index e1a5846..274896f 100644 --- a/pkg/obitools/obiannotate/obiannotate.go +++ b/pkg/obitools/obiannotate/obiannotate.go @@ -15,11 +15,11 @@ import ( ) func DeleteAttributesWorker(toBeDeleted []string) obiseq.SeqWorker { - f := func(s *obiseq.BioSequence) *obiseq.BioSequence { + f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { for _, k := range toBeDeleted { s.DeleteAttribute(k) } - return s + return obiseq.BioSequenceSlice{s}, nil } return f @@ -48,7 +48,7 @@ func MatchPatternWorker(pattern, name string, errormax int, allowsIndel bool) ob slot_error := fmt.Sprintf("%s_error", name) slot_location := fmt.Sprintf("%s_location", name) - f := func(s *obiseq.BioSequence) *obiseq.BioSequence { + f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { apats, err := 
obiapat.MakeApatSequence(s, false) if err != nil { log.Fatalf("error in preparing sequence %s : %v", s.Id(), err) @@ -59,6 +59,11 @@ func MatchPatternWorker(pattern, name string, errormax int, allowsIndel bool) ob if matched { annot := s.Annotations() annot[slot] = pattern + + if start < 0 { + start = 0 + } + match, err := s.Subsequence(start, end, false) if err != nil { log.Fatalf("Error in extracting pattern of sequence %s [%d;%d[ : %v", @@ -83,7 +88,7 @@ func MatchPatternWorker(pattern, name string, errormax int, allowsIndel bool) ob annot[slot_location] = fmt.Sprintf("complement(%d..%d)", start+1, end) } } - return s + return obiseq.BioSequenceSlice{s}, nil } return f @@ -97,14 +102,14 @@ func ToBeKeptAttributesWorker(toBeKept []string) obiseq.SeqWorker { d[v] = true } - f := func(s *obiseq.BioSequence) *obiseq.BioSequence { + f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { annot := s.Annotations() for key := range annot { if _, ok := d[key]; !ok { s.DeleteAttribute(key) } } - return s + return obiseq.BioSequenceSlice{s}, nil } return f @@ -112,7 +117,7 @@ func ToBeKeptAttributesWorker(toBeKept []string) obiseq.SeqWorker { func CutSequenceWorker(from, to int, breakOnError bool) obiseq.SeqWorker { - f := func(s *obiseq.BioSequence) *obiseq.BioSequence { + f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { var f, t int switch { @@ -142,16 +147,15 @@ func CutSequenceWorker(from, to int, breakOnError bool) obiseq.SeqWorker { if breakOnError { log.Fatalf("Cannot cut sequence %s (%v)", s.Id(), err) } else { - log.Warnf("Cannot cut sequence %s (%v), sequence discarded", s.Id(), err) - return nil + err = fmt.Errorf("Cannot cut sequence %s (%v), sequence discarded", s.Id(), err) } } - return rep + return obiseq.BioSequenceSlice{rep}, err } if from == 0 && to == 0 { - f = func(s *obiseq.BioSequence) *obiseq.BioSequence { - return s + f = func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { + return 
obiseq.BioSequenceSlice{s}, nil } } @@ -163,23 +167,23 @@ func CutSequenceWorker(from, to int, breakOnError bool) obiseq.SeqWorker { } func ClearAllAttributesWorker() obiseq.SeqWorker { - f := func(s *obiseq.BioSequence) *obiseq.BioSequence { + f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { annot := s.Annotations() for key := range annot { s.DeleteAttribute(key) } - return s + return obiseq.BioSequenceSlice{s}, nil } return f } func RenameAttributeWorker(toBeRenamed map[string]string) obiseq.SeqWorker { - f := func(s *obiseq.BioSequence) *obiseq.BioSequence { + f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { for newName, oldName := range toBeRenamed { s.RenameAttribute(newName, oldName) } - return s + return obiseq.BioSequenceSlice{s}, nil } return f @@ -201,20 +205,20 @@ func EvalAttributeWorker(expression map[string]string) obiseq.SeqWorker { } func AddTaxonAtRankWorker(taxonomy *obitax.Taxonomy, ranks ...string) obiseq.SeqWorker { - f := func(s *obiseq.BioSequence) *obiseq.BioSequence { + f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { for _, r := range ranks { taxonomy.SetTaxonAtRank(s, r) } - return s + return obiseq.BioSequenceSlice{s}, nil } return f } func AddSeqLengthWorker() obiseq.SeqWorker { - f := func(s *obiseq.BioSequence) *obiseq.BioSequence { + f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { s.SetAttribute("seq_length", s.Len()) - return s + return obiseq.BioSequenceSlice{s}, nil } return f @@ -309,8 +313,8 @@ func CLIAnnotationPipeline() obiiter.Pipeable { predicate := obigrep.CLISequenceSelectionPredicate() worker := CLIAnnotationWorker() - annotator := obiseq.SeqToSliceConditionalWorker(worker, predicate, true, false) - f := obiiter.SliceWorkerPipe(annotator, obioptions.CLIParallelWorkers()) + annotator := obiseq.SeqToSliceConditionalWorker(predicate, worker, true, false) + f := obiiter.SliceWorkerPipe(annotator, false, obioptions.CLIParallelWorkers()) return f } 
diff --git a/pkg/obitools/obiclean/obiclean.go b/pkg/obitools/obiclean/obiclean.go index f993c11..4804aa6 100644 --- a/pkg/obitools/obiclean/obiclean.go +++ b/pkg/obitools/obiclean/obiclean.go @@ -60,7 +60,7 @@ func annotateOBIClean(dataset obiseq.BioSequenceSlice, sample map[string]*([]*seqPCR), tag, NAValue string) obiiter.IBioSequence { batchsize := 1000 - var annot = func(data obiseq.BioSequenceSlice) obiseq.BioSequenceSlice { + var annot = func(data obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) { for _, s := range data { status := Status(s) @@ -87,11 +87,11 @@ func annotateOBIClean(dataset obiseq.BioSequenceSlice, annotation["obiclean_samplecount"] = head + internal + singleton } - return data + return data, nil } iter := obiiter.IBatchOver(dataset, batchsize) - riter := iter.MakeISliceWorker(annot) + riter := iter.MakeISliceWorker(annot, false) return riter } diff --git a/pkg/obitools/obicleandb/obicleandb.go b/pkg/obitools/obicleandb/obicleandb.go index 863712f..18234d6 100644 --- a/pkg/obitools/obicleandb/obicleandb.go +++ b/pkg/obitools/obicleandb/obicleandb.go @@ -50,10 +50,13 @@ func ICleanDB(itertator obiiter.IBioSequence) obiiter.IBioSequence { obioptions.CLIParallelWorkers()) annotated := usable.MakeIWorker(taxonomy.MakeSetSpeciesWorker(), + false, obioptions.CLIParallelWorkers(), ).MakeIWorker(taxonomy.MakeSetGenusWorker(), + false, obioptions.CLIParallelWorkers(), ).MakeIWorker(taxonomy.MakeSetFamilyWorker(), + false, obioptions.CLIParallelWorkers(), ) diff --git a/pkg/obitools/obimultiplex/demultiplex.go b/pkg/obitools/obimultiplex/demultiplex.go index 3cb696e..7b3f8ca 100644 --- a/pkg/obitools/obimultiplex/demultiplex.go +++ b/pkg/obitools/obimultiplex/demultiplex.go @@ -30,7 +30,7 @@ func IExtractBarcode(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error worker := obingslibrary.ExtractBarcodeSliceWorker(ngsfilter, opts...) 
- newIter := iterator.MakeISliceWorker(worker) + newIter := iterator.MakeISliceWorker(worker, false) if !CLIConservedErrors() { log.Println("Discards unassigned sequences") diff --git a/pkg/obitools/obipcr/pcr.go b/pkg/obitools/obipcr/pcr.go index d8a0c82..cf5312f 100644 --- a/pkg/obitools/obipcr/pcr.go +++ b/pkg/obitools/obipcr/pcr.go @@ -60,5 +60,5 @@ func CLIPCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) { iterator = iterator.Pipe(frags) } - return iterator.MakeISliceWorker(worker, obioptions.CLIParallelWorkers(), 0), nil + return iterator.MakeISliceWorker(worker, false, obioptions.CLIParallelWorkers(), 0), nil } diff --git a/pkg/obitools/obitag/obigeomtag.go b/pkg/obitools/obitag/obigeomtag.go index 0eb2118..c46d187 100644 --- a/pkg/obitools/obitag/obigeomtag.go +++ b/pkg/obitools/obitag/obigeomtag.go @@ -1,9 +1,10 @@ package obitag import ( - log "github.com/sirupsen/logrus" "math" + log "github.com/sirupsen/logrus" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obialign" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" @@ -190,9 +191,10 @@ func GeomIdentifySeqWorker(references *obiseq.BioSequenceSlice, landmarks := ExtractLandmarkSeqs(references) taxa := ExtractTaxonSet(references, taxo) - return func(sequence *obiseq.BioSequence) *obiseq.BioSequence { + return func(sequence *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { buffer := make([]uint64, 100) - return GeomIdentify(sequence, landmarks, references, taxa, taxo, &buffer) + return obiseq.BioSequenceSlice{GeomIdentify(sequence, landmarks, references, taxa, taxo, &buffer)}, + nil } } @@ -202,5 +204,5 @@ func CLIGeomAssignTaxonomy(iterator obiiter.IBioSequence, ) obiiter.IBioSequence { worker := GeomIdentifySeqWorker(&references, taxo) - return iterator.MakeIWorker(worker, obioptions.CLIParallelWorkers(), 0) + return iterator.MakeIWorker(worker, false, obioptions.CLIParallelWorkers(), 0) } 
diff --git a/pkg/obitools/obitag/obitag.go b/pkg/obitools/obitag/obitag.go index 0f6f5cd..57e3869 100644 --- a/pkg/obitools/obitag/obitag.go +++ b/pkg/obitools/obitag/obitag.go @@ -259,8 +259,8 @@ func IdentifySeqWorker(references obiseq.BioSequenceSlice, taxa obitax.TaxonSet, taxo *obitax.Taxonomy, runExact bool) obiseq.SeqWorker { - return func(sequence *obiseq.BioSequence) *obiseq.BioSequence { - return Identify(sequence, references, refcounts, taxa, taxo, runExact) + return func(sequence *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) { + return obiseq.BioSequenceSlice{Identify(sequence, references, refcounts, taxa, taxo, runExact)}, nil } } @@ -285,5 +285,5 @@ func CLIAssignTaxonomy(iterator obiiter.IBioSequence, worker := IdentifySeqWorker(references, refcounts, taxa, taxo, CLIRunExact()) - return iterator.MakeIWorker(worker, obioptions.CLIParallelWorkers(), 0) + return iterator.MakeIWorker(worker, false, obioptions.CLIParallelWorkers(), 0) }