From 98a4363d22f10140ffd565d986d0513ac773b34f Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 1 Feb 2022 23:25:19 +0100 Subject: [PATCH] obimultiplex saves unassigned sequence --- cmd/obitools/obimultiplex/main.go | 1 + pkg/obiformats/universal_write.go | 4 ++ pkg/obiseq/batchiterator.go | 67 +++++++++++++++++++++ pkg/obiseq/predicate.go | 77 ++++++++++++++++++++++++ pkg/obitax/issuubcladeof.go | 21 +++++++ pkg/obitools/obimultiplex/demultiplex.go | 19 ++++++ 6 files changed, 189 insertions(+) create mode 100644 pkg/obiseq/predicate.go diff --git a/cmd/obitools/obimultiplex/main.go b/cmd/obitools/obimultiplex/main.go index aa72cab..132d06a 100644 --- a/cmd/obitools/obimultiplex/main.go +++ b/cmd/obitools/obimultiplex/main.go @@ -31,4 +31,5 @@ func main() { sequences, _ := obiconvert.ReadBioSequencesBatch(args...) amplicons, _ := obimultiplex.IExtractBarcodeBatches(sequences) obiconvert.WriteBioSequencesBatch(amplicons, true) + amplicons.Wait() } diff --git a/pkg/obiformats/universal_write.go b/pkg/obiformats/universal_write.go index 68d97da..63f7630 100644 --- a/pkg/obiformats/universal_write.go +++ b/pkg/obiformats/universal_write.go @@ -80,6 +80,10 @@ func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch, return newIter, err } + if iterator.Finished() { + return iterator, nil + } + return obiseq.NilIBioSequenceBatch, fmt.Errorf("input iterator not ready") } diff --git a/pkg/obiseq/batchiterator.go b/pkg/obiseq/batchiterator.go index c72f956..18b4c72 100644 --- a/pkg/obiseq/batchiterator.go +++ b/pkg/obiseq/batchiterator.go @@ -372,3 +372,70 @@ func (iterator IBioSequenceBatch) PairWith(reverse IBioSequenceBatch, sizes ...i return newIter } + +func (iterator IBioSequenceBatch) DivideOn(predicate SequencePredicate, + size int, sizes ...int) (IBioSequenceBatch, IBioSequenceBatch) { + buffsize := iterator.BufferSize() + + if len(sizes) > 0 { + buffsize = sizes[1] + } + + trueIter := MakeIBioSequenceBatch(buffsize) + falseIter := MakeIBioSequenceBatch(buffsize) + + trueIter.Add(1) + falseIter.Add(1) + + go func() { + trueIter.Wait() + falseIter.Wait() + close(trueIter.Channel()) + close(falseIter.Channel()) + }() + + go func() { + trueOrder := 0 + falseOrder := 0 + iterator = iterator.SortBatches() + + trueSlice := make(BioSequenceSlice, 0, size) + falseSlice := make(BioSequenceSlice, 0, size) + + for iterator.Next() { + seqs := iterator.Get() + for _, s := range seqs.slice { + if predicate(s) { + trueSlice = append(trueSlice, s) + } else { + falseSlice = append(falseSlice, s) + } + + if len(trueSlice) == size { + trueIter.Channel() <- MakeBioSequenceBatch(trueOrder, trueSlice...) + trueOrder++ + trueSlice = make(BioSequenceSlice, 0, size) + } + + if len(falseSlice) == size { + falseIter.Channel() <- MakeBioSequenceBatch(falseOrder, falseSlice...) + falseOrder++ + falseSlice = make(BioSequenceSlice, 0, size) + } + } + } + + if len(trueSlice) > 0 { + trueIter.Channel() <- MakeBioSequenceBatch(trueOrder, trueSlice...) + } + + if len(falseSlice) > 0 { + falseIter.Channel() <- MakeBioSequenceBatch(falseOrder, falseSlice...) + } + + trueIter.Done() + falseIter.Done() + }() + + return trueIter, falseIter +} diff --git a/pkg/obiseq/predicate.go b/pkg/obiseq/predicate.go new file mode 100644 index 0000000..44c8031 --- /dev/null +++ b/pkg/obiseq/predicate.go @@ -0,0 +1,77 @@ +package obiseq + +type SequencePredicate func(BioSequence) bool + + +func (predicate1 SequencePredicate) And(predicate2 SequencePredicate) SequencePredicate { + f := func(sequence BioSequence) bool { + return predicate1(sequence) && predicate2(sequence) + } + + return f +} + +func (predicate1 SequencePredicate) Or(predicate2 SequencePredicate) SequencePredicate { + f := func(sequence BioSequence) bool { + return predicate1(sequence) || predicate2(sequence) + } + + return f +} + +func (predicate1 SequencePredicate) Xor(predicate2 SequencePredicate) SequencePredicate { + f := func(sequence BioSequence) bool { + p1 := predicate1(sequence) + p2 := predicate2(sequence) + return (p1 && !p2) || (p2 && !p1) + } + + return f +} + +func (predicate1 SequencePredicate) Not() SequencePredicate { + f := func(sequence BioSequence) bool { + return !predicate1(sequence) + } + + return f +} + +func HasAttribute(name string) SequencePredicate { + + f := func(sequence BioSequence) bool { + if sequence.HasAnnotation() { + _, ok := (sequence.Annotations())[name] + return ok + } + + return false + } + + return f +} + +func MoreAbundantThan(count int) SequencePredicate { + f := func(sequence BioSequence) bool { + return sequence.Count() > count + } + + return f +} + +func IsLongerOrEqualTo(length int) SequencePredicate { + f := func(sequence BioSequence) bool { + return sequence.Length() >= length + } + + return f +} + +func IsShorterOrEqualTo(length int) SequencePredicate { + f := func(sequence BioSequence) bool { + return sequence.Length() <= length + } + + return f +} + diff --git a/pkg/obitax/issuubcladeof.go b/pkg/obitax/issuubcladeof.go index 94f0c03..37a05e6 100644 --- a/pkg/obitax/issuubcladeof.go +++ b/pkg/obitax/issuubcladeof.go @@ -1,5 +1,11 @@ package obitax +import ( + "log" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +) + func (taxon *TaxNode) IsSubCladeOf(parent *TaxNode) bool { for taxon.taxid != parent.taxid && taxon.parent != taxon.taxid { @@ -19,3 +25,18 @@ func (taxon *TaxNode) IsBelongingSubclades(clades *TaxonSet) bool { return ok } + +func IsSubCladeOf(taxonomy Taxonomy, taxid int) obiseq.SequencePredicate { + parent, err := taxonomy.Taxon(taxid) + + if err != nil { + log.Fatalf("Cannot find taxon : %d (%v)", taxid, err) + } + + f := func(sequence obiseq.BioSequence) bool { + taxon, err := taxonomy.Taxon(sequence.Taxid()) + return err == nil && taxon.IsSubCladeOf(parent) + } + + return f +} diff --git a/pkg/obitools/obimultiplex/demultiplex.go b/pkg/obitools/obimultiplex/demultiplex.go index 1de9802..55831f4 100644 --- a/pkg/obitools/obimultiplex/demultiplex.go +++ b/pkg/obitools/obimultiplex/demultiplex.go @@ -6,6 +6,7 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obingslibrary" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" ) func IExtractBarcodeBatches(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSequenceBatch, error) { @@ -31,9 +32,27 @@ func IExtractBarcodeBatches(iterator obiseq.IBioSequenceBatch) (obiseq.IBioSeque newIter := iterator.MakeISliceWorker(worker) if !CLIConservedErrors() { + log.Println("Discards unassigned sequences") newIter = newIter.Rebatch(obioptions.CLIBatchSize()) } + var unidentified obiseq.IBioSequenceBatch + if CLIUnidentifiedFileName() != "" { + log.Printf("Unassigned sequences saved in file: %s\n", CLIUnidentifiedFileName()) + unidentified, newIter = newIter.DivideOn(obiseq.HasAttribute("demultiplex_error"), + obioptions.CLIBatchSize()) + + go func() { + _, err := obiconvert.WriteBioSequencesBatch(unidentified, + true, + CLIUnidentifiedFileName()) + + if err != nil { + log.Fatalf("%v", err) + } + }() + + } log.Printf("Sequence demultiplexing using %d workers\n", obioptions.CLIParallelWorkers()) return newIter, nil