diff --git a/pkg/obichunk/unique.go b/pkg/obichunk/unique.go index 377733b..ec2d202 100644 --- a/pkg/obichunk/unique.go +++ b/pkg/obichunk/unique.go @@ -133,5 +133,5 @@ func IUniqueSequence(iterator obiiter.IBioSequence, opts.BufferSize(), ) - return iMerged.Speed("Variants identified"), nil + return iMerged, nil } diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index b24bd5b..0071cd9 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -69,7 +69,7 @@ func WriteFasta(iterator obiiter.IBioSequence, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) - file,_ = goutils.CompressStream(file,opt.CompressedFile(),opt.CloseFile()) + file, _ = goutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile()) buffsize := iterator.BufferSize() newIter := obiiter.MakeIBioSequence(buffsize) @@ -136,7 +136,7 @@ func WriteFasta(iterator obiiter.IBioSequence, } file.Close() - + log.Debugln("End of the fasta file writing") obiiter.UnregisterPipe() waitWriter.Done() @@ -156,7 +156,6 @@ func WriteFastaToFile(iterator obiiter.IBioSequence, filename string, options ...WithOption) (obiiter.IBioSequence, error) { - opt := MakeOptions(options) flags := os.O_WRONLY | os.O_CREATE @@ -164,7 +163,6 @@ func WriteFastaToFile(iterator obiiter.IBioSequence, flags |= os.O_APPEND } file, err := os.OpenFile(filename, flags, 0660) - if err != nil { log.Fatalf("open file error: %v", err) @@ -173,5 +171,18 @@ func WriteFastaToFile(iterator obiiter.IBioSequence, options = append(options, OptionCloseFile()) - return WriteFasta(iterator, file, options...) + iterator, err = WriteFasta(iterator, file, options...) + + if opt.HaveToSavePaired() { + var revfile *os.File + + revfile, err = os.OpenFile(opt.PairedFileName(), flags, 0660) + if err != nil { + log.Fatalf("open file error: %v", err) + return obiiter.NilIBioSequence, err + } + iterator, err = WriteFasta(iterator.PairedWith(), revfile, options...) + } + + return iterator, err } diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 5d4c7f8..5fb63fb 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -58,7 +58,7 @@ func WriteFastq(iterator obiiter.IBioSequence, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) - file,_ = goutils.CompressStream(file,opt.CompressedFile(),opt.CloseFile()) + file, _ = goutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile()) buffsize := iterator.BufferSize() newIter := obiiter.MakeIBioSequence(buffsize) @@ -152,7 +152,7 @@ func WriteFastqToFile(iterator obiiter.IBioSequence, flags |= os.O_APPEND } file, err := os.OpenFile(filename, flags, 0660) - + if err != nil { log.Fatalf("open file error: %v", err) return obiiter.NilIBioSequence, err @@ -160,5 +160,18 @@ func WriteFastqToFile(iterator obiiter.IBioSequence, options = append(options, OptionCloseFile()) - return WriteFastq(iterator, file, options...) + iterator, err = WriteFastq(iterator, file, options...) + + if opt.HaveToSavePaired() { + var revfile *os.File + + revfile, err = os.OpenFile(opt.PairedFileName(), flags, 0660) + if err != nil { + log.Fatalf("open file error: %v", err) + return obiiter.NilIBioSequence, err + } + iterator, err = WriteFastq(iterator.PairedWith(), revfile, options...) + } + + return iterator, err } diff --git a/pkg/obiformats/options.go b/pkg/obiformats/options.go index d49b9bf..10da43c 100644 --- a/pkg/obiformats/options.go +++ b/pkg/obiformats/options.go @@ -15,6 +15,11 @@ type __options__ struct { closefile bool appendfile bool compressed bool + csv_ids bool + cvs_sequence bool + csv_definition bool + csv_separator string + paired_filename string } type Options struct { @@ -35,6 +40,11 @@ func MakeOptions(setters []WithOption) Options { closefile: false, appendfile: false, compressed: false, + csv_ids: true, + csv_definition: false, + cvs_sequence: true, + csv_separator: ",", + paired_filename: "", } opt := Options{&o} @@ -86,6 +96,18 @@ func (opt Options) CompressedFile() bool { return opt.pointer.compressed } +func (opt Options) CSVIds() bool { + return opt.pointer.csv_ids +} + +func (opt Options) HaveToSavePaired() bool { + return opt.pointer.paired_filename != "" +} + +func (opt Options) PairedFileName() string { + return opt.pointer.paired_filename +} + func OptionsBufferSize(size int) WithOption { f := WithOption(func(opt Options) { opt.pointer.buffer_size = size @@ -216,3 +238,12 @@ func OptionsWithoutProgressBar() WithOption { return f } + +func WritePairedReadsTo(filename string) WithOption { + f := WithOption(func(opt Options) { + opt.pointer.paired_filename = filename + }) + + return f +} + diff --git a/pkg/obiformats/universal_write.go b/pkg/obiformats/universal_write.go index c3fcf9a..c96e673 100644 --- a/pkg/obiformats/universal_write.go +++ b/pkg/obiformats/universal_write.go @@ -69,5 +69,19 @@ func WriteSequencesToFile(iterator obiiter.IBioSequence, } options = append(options, OptionCloseFile()) - return WriteSequence(iterator, file, options...) + + iterator, err = WriteSequence(iterator, file, options...) + + if opt.HaveToSavePaired() { + var revfile *os.File + + revfile, err = os.OpenFile(opt.PairedFileName(), flags, 0660) + if err != nil { + log.Fatalf("open file error: %v", err) + return obiiter.NilIBioSequence, err + } + iterator, err = WriteSequence(iterator.PairedWith(), revfile, options...) + } + + return iterator, err } diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index 573e05a..f2d5263 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -46,6 +46,7 @@ type _IBioSequence struct { batch_size int32 sequence_format string finished *abool.AtomicBool + paired bool } type IBioSequence struct { @@ -73,6 +74,7 @@ func MakeIBioSequence(sizes ...int) IBioSequence { batch_size: -1, sequence_format: "", finished: abool.New(), + paired: false, } waiting := sync.WaitGroup{} @@ -199,6 +201,11 @@ func (iterator IBioSequence) Split() IBioSequence { i.lock = &lock newIter := IBioSequence{&i} + + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + return newIter } @@ -270,6 +277,7 @@ func (iterator IBioSequence) Finished() bool { return iterator.pointer.finished.IsSet() } +// Sorting the batches of sequences. func (iterator IBioSequence) SortBatches(sizes ...int) IBioSequence { buffsize := iterator.BufferSize() @@ -311,6 +319,10 @@ func (iterator IBioSequence) SortBatches(sizes ...int) IBioSequence { newIter.Done() }() + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + return newIter } @@ -321,6 +333,11 @@ func (iterator IBioSequence) Concat(iterators ...IBioSequence) IBioSequence { return iterator } + allPaired := iterator.IsPaired() + for _, i := range iterators { + allPaired = allPaired && i.IsPaired() + } + buffsize := iterator.BufferSize() newIter := MakeIBioSequence(buffsize) @@ -357,6 +374,10 @@ func (iterator IBioSequence) Concat(iterators ...IBioSequence) IBioSequence { newIter.Done() }() + if allPaired { + newIter.MarkAsPaired() + } + return newIter } @@ -368,6 +389,12 @@ func (iterator IBioSequence) Pool(iterators ...IBioSequence) IBioSequence { return iterator } + allPaired := iterator.IsPaired() + + for _, i := range iterators { + allPaired = allPaired && i.IsPaired() + } + nextCounter := goutils.AtomicCounter() buffsize := iterator.BufferSize() newIter := MakeIBioSequence(buffsize) @@ -392,6 +419,10 @@ func (iterator IBioSequence) Pool(iterators ...IBioSequence) IBioSequence { go ff(i) } + if allPaired { + newIter.MarkAsPaired() + } + return newIter } @@ -441,6 +472,10 @@ func (iterator IBioSequence) Rebatch(size int, sizes ...int) IBioSequence { }() + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + return newIter } @@ -492,47 +527,6 @@ func (iterator IBioSequence) Count(recycle bool) (int, int, int) { return variants, reads, nucleotides } -func (iterator IBioSequence) PairWith(reverse IBioSequence, - sizes ...int) IPairedBioSequenceBatch { - buffsize := iterator.BufferSize() - batchsize := 5000 - - if len(sizes) > 0 { - batchsize = sizes[0] - } - - if len(sizes) > 1 { - buffsize = sizes[1] - } - - iterator = iterator.Rebatch(batchsize) - reverse = reverse.Rebatch(batchsize) - - newIter := MakeIPairedBioSequenceBatch(buffsize) - - newIter.Add(1) - - go func() { - newIter.WaitAndClose() - log.Println("End of association of paired reads") - }() - - log.Println("Start association of paired reads") - go func() { - for iterator.Next() { - if !reverse.Next() { - log.Panicln("Etrange reverse pas prĂȘt") - } - newIter.Channel() <- MakePairedBioSequenceBatch(iterator.Get(), - reverse.Get()) - } - - newIter.Done() - }() - - return newIter -} - // A function that takes a predicate and returns two IBioSequenceBatch iterators. // Sequences extracted from the input iterator are distributed among both the // iterator following the predicate value. @@ -599,6 +593,10 @@ func (iterator IBioSequence) DivideOn(predicate obiseq.SequencePredicate, falseIter.Done() }() + if iterator.IsPaired() { + trueIter.MarkAsPaired() + falseIter.MarkAsPaired() + } return trueIter, falseIter } @@ -654,6 +652,71 @@ func (iterator IBioSequence) FilterOn(predicate obiseq.SequencePredicate, go ff(iterator) + if iterator.IsPaired() { + trueIter.MarkAsPaired() + } + + return trueIter.Rebatch(size) +} + +func (iterator IBioSequence) FilterAnd(predicate obiseq.SequencePredicate, + size int, sizes ...int) IBioSequence { + buffsize := iterator.BufferSize() + nworkers := 4 + + if len(sizes) > 0 { + nworkers = sizes[0] + } + + if len(sizes) > 1 { + buffsize = sizes[1] + } + + trueIter := MakeIBioSequence(buffsize) + + trueIter.Add(nworkers) + + go func() { + trueIter.WaitAndClose() + }() + + ff := func(iterator IBioSequence) { + // iterator = iterator.SortBatches() + + for iterator.Next() { + seqs := iterator.Get() + slice := seqs.slice + j := 0 + for _, s := range slice { + good := predicate(s) + if s.IsPaired() { + good = good && predicate(s.PairedWith()) + } + if good { + slice[j] = s + j++ + } else { + s.Recycle() + } + } + + seqs.slice = slice[:j] + trueIter.pointer.channel <- seqs + } + + trueIter.Done() + } + + for i := 1; i < nworkers; i++ { + go ff(iterator.Split()) + } + + go ff(iterator) + + if iterator.IsPaired() { + trueIter.MarkAsPaired() + } + return trueIter.Rebatch(size) } @@ -673,13 +736,14 @@ func (iterator IBioSequence) Load() obiseq.BioSequenceSlice { // It takes a slice of BioSequence objects, and returns an iterator that will return batches of // BioSequence objects + func IBatchOver(data obiseq.BioSequenceSlice, size int, sizes ...int) IBioSequence { buffsize := 0 if len(sizes) > 0 { - buffsize = sizes[1] + buffsize = sizes[0] } newIter := MakeIBioSequence(buffsize) @@ -706,5 +770,8 @@ func IBatchOver(data obiseq.BioSequenceSlice, newIter.Done() }() + if data.IsPaired() { + newIter.MarkAsPaired() + } return newIter } diff --git a/pkg/obiiter/paired.go b/pkg/obiiter/paired.go new file mode 100644 index 0000000..a0a36d8 --- /dev/null +++ b/pkg/obiiter/paired.go @@ -0,0 +1,95 @@ +package obiiter + +import ( + "log" +) + +func (b BioSequenceBatch) IsPaired() bool { + return b.slice.IsPaired() +} + +func (b BioSequenceBatch) PairedWith() BioSequenceBatch { + return MakeBioSequenceBatch(b.order, + *b.slice.PairedWith()) + +} + +func (b *BioSequenceBatch) PairTo(p *BioSequenceBatch) { + + if b.order != p.order { + log.Fatalf("both batches are not synchronized : (%d,%d)", + b.order, p.order, + ) + } + + b.slice.PairTo(&p.slice) + +} + +func (b *BioSequenceBatch) UnPair() { + b.slice.UnPair() +} + +func (iter IBioSequence) MarkAsPaired() { + iter.pointer.paired = true +} + +func (iter IBioSequence) PairTo(p IBioSequence) IBioSequence { + + newIter := MakeIBioSequence() + + iter = iter.SortBatches() + p = p.SortBatches() + + newIter.Add(1) + + go func() { + newIter.WaitAndClose() + }() + + go func() { + + for iter.Next() { + p.Next() + batch := iter.Get() + pbatch := p.Get() + batch.PairTo(&pbatch) + newIter.Push(batch) + } + + newIter.Done() + }() + + newIter.MarkAsPaired() + return newIter + +} + +func (iter IBioSequence) PairedWith() IBioSequence { + + newIter := MakeIBioSequence() + + newIter.Add(1) + + go func() { + newIter.WaitAndClose() + }() + + go func() { + + for iter.Next() { + batch := iter.Get().PairedWith() + newIter.Push(batch) + } + + newIter.Done() + }() + + newIter.MarkAsPaired() + return newIter + +} + +func (iter IBioSequence) IsPaired() bool { + return iter.pointer.paired +} diff --git a/pkg/obiiter/pairedbatchiterator.go b/pkg/obiiter/pairedbatchiterator.go deleted file mode 100644 index 004c0a5..0000000 --- a/pkg/obiiter/pairedbatchiterator.go +++ /dev/null @@ -1,239 +0,0 @@ -package obiiter - -import ( - "sync" - "time" - - log "github.com/sirupsen/logrus" - - "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" -) - -type PairedBioSequenceBatch struct { - forward obiseq.BioSequenceSlice - reverse obiseq.BioSequenceSlice - order int -} - -var NilPairedBioSequenceBatch = PairedBioSequenceBatch{nil, nil, -1} - -func MakePairedBioSequenceBatch(forward, reverse BioSequenceBatch) PairedBioSequenceBatch { - if forward.order != reverse.order { - log.Fatalf("Forward order : %d and reverse order : %d are not matching", - forward.order, reverse.order) - } - - for i := range reverse.slice { - reverse.slice[i].ReverseComplement(true) - } - - return PairedBioSequenceBatch{ - forward: forward.slice, - reverse: reverse.slice, - order: forward.order, - } -} - -func (batch PairedBioSequenceBatch) Order() int { - return batch.order -} - -func (batch PairedBioSequenceBatch) Reorder(newOrder int) PairedBioSequenceBatch { - batch.order = newOrder - return batch -} - -func (batch PairedBioSequenceBatch) Len() int { - return len(batch.forward) -} - -func (batch PairedBioSequenceBatch) Forward() obiseq.BioSequenceSlice { - return batch.forward -} - -func (batch PairedBioSequenceBatch) Reverse() obiseq.BioSequenceSlice { - return batch.reverse -} - -func (batch PairedBioSequenceBatch) IsNil() bool { - return batch.forward == nil -} - -// Structure implementing an iterator over bioseq.BioSequenceBatch -// based on a channel. -type __ipairedbiosequencebatch__ struct { - channel chan PairedBioSequenceBatch - current PairedBioSequenceBatch - pushBack bool - all_done *sync.WaitGroup - buffer_size int - finished bool - p_finished *bool -} - -type IPairedBioSequenceBatch struct { - pointer *__ipairedbiosequencebatch__ -} - -var NilIPairedBioSequenceBatch = IPairedBioSequenceBatch{pointer: nil} - -func MakeIPairedBioSequenceBatch(sizes ...int) IPairedBioSequenceBatch { - buffsize := 1 - - if len(sizes) > 0 { - buffsize = sizes[0] - } - - i := __ipairedbiosequencebatch__{ - channel: make(chan PairedBioSequenceBatch, buffsize), - current: NilPairedBioSequenceBatch, - pushBack: false, - buffer_size: buffsize, - finished: false, - p_finished: nil, - } - - i.p_finished = &i.finished - waiting := sync.WaitGroup{} - i.all_done = &waiting - ii := IPairedBioSequenceBatch{&i} - - RegisterAPipe() - return ii -} - -func (iterator IPairedBioSequenceBatch) Add(n int) { - iterator.pointer.all_done.Add(n) -} - -func (iterator IPairedBioSequenceBatch) Done() { - iterator.pointer.all_done.Done() -} - -func (iterator IPairedBioSequenceBatch) Wait() { - iterator.pointer.all_done.Wait() -} - -func (iterator IPairedBioSequenceBatch) Channel() chan PairedBioSequenceBatch { - return iterator.pointer.channel -} - -func (iterator IPairedBioSequenceBatch) Close() { - close(iterator.pointer.channel) - UnregisterPipe() -} - -func (iterator IPairedBioSequenceBatch) WaitAndClose() { - iterator.Wait() - - for len(iterator.Channel()) > 0 { - time.Sleep(time.Millisecond) - } - - iterator.Close() -} - -func (iterator IPairedBioSequenceBatch) IsNil() bool { - return iterator.pointer == nil -} - -func (iterator IPairedBioSequenceBatch) BufferSize() int { - return iterator.pointer.buffer_size -} - -func (iterator IPairedBioSequenceBatch) Split() IPairedBioSequenceBatch { - i := __ipairedbiosequencebatch__{ - channel: iterator.pointer.channel, - current: NilPairedBioSequenceBatch, - pushBack: false, - all_done: iterator.pointer.all_done, - buffer_size: iterator.pointer.buffer_size, - finished: false, - p_finished: iterator.pointer.p_finished} - newIter := IPairedBioSequenceBatch{&i} - return newIter -} - -func (iterator IPairedBioSequenceBatch) Next() bool { - if *(iterator.pointer.p_finished) { - return false - } - - if iterator.pointer.pushBack { - iterator.pointer.pushBack = false - return true - } - - next, ok := (<-iterator.pointer.channel) - - if ok { - iterator.pointer.current = next - return true - } - - iterator.pointer.current = NilPairedBioSequenceBatch - *iterator.pointer.p_finished = true - return false -} - -func (iterator IPairedBioSequenceBatch) PushBack() { - if !iterator.pointer.current.IsNil() { - iterator.pointer.pushBack = true - } -} - -// The 'Get' method returns the instance of BioSequenceBatch -// currently pointed by the iterator. You have to use the -// 'Next' method to move to the next entry before calling -// 'Get' to retreive the following instance. -func (iterator IPairedBioSequenceBatch) Get() PairedBioSequenceBatch { - return iterator.pointer.current -} - -// Finished returns 'true' value if no more data is available -// from the iterator. -func (iterator IPairedBioSequenceBatch) Finished() bool { - return *iterator.pointer.p_finished -} - -func (iterator IPairedBioSequenceBatch) SortBatches(sizes ...int) IPairedBioSequenceBatch { - buffsize := iterator.BufferSize() - - if len(sizes) > 0 { - buffsize = sizes[0] - } - - newIter := MakeIPairedBioSequenceBatch(buffsize) - - newIter.Add(1) - - go func() { - newIter.Wait() - close(newIter.pointer.channel) - }() - - next_to_send := 0 - received := make(map[int]PairedBioSequenceBatch) - go func() { - for iterator.Next() { - batch := iterator.Get() - if batch.order == next_to_send { - newIter.pointer.channel <- batch - next_to_send++ - batch, ok := received[next_to_send] - for ok { - newIter.pointer.channel <- batch - delete(received, next_to_send) - next_to_send++ - batch, ok = received[next_to_send] - } - } else { - received[batch.order] = batch - } - } - newIter.Done() - }() - - return newIter - -} diff --git a/pkg/obiiter/pipe.go b/pkg/obiiter/pipe.go index 831b75d..63902df 100644 --- a/pkg/obiiter/pipe.go +++ b/pkg/obiiter/pipe.go @@ -40,5 +40,10 @@ func (input IBioSequence) CopyTee() (IBioSequence, IBioSequence) { } }() + if input.IsPaired() { + first.MarkAsPaired() + second.MarkAsPaired() + } + return first, second } diff --git a/pkg/obiiter/speed.go b/pkg/obiiter/speed.go index 24014df..3bdf336 100644 --- a/pkg/obiiter/speed.go +++ b/pkg/obiiter/speed.go @@ -3,6 +3,7 @@ package obiiter import ( "fmt" "os" + "time" "github.com/schollz/progressbar/v3" ) @@ -45,18 +46,29 @@ func (iterator IBioSequence) Speed(message ...string) IBioSequence { bar := progressbar.NewOptions(-1, pbopt...) go func() { + c := 0 + start := time.Now() for iterator.Next() { batch := iterator.Get() - l := batch.Len() + c += batch.Len() newIter.Push(batch) - bar.Add(l) + elapsed := time.Since(start) + if elapsed > (time.Millisecond * 100) { + bar.Add(c) + c = 0 + start = time.Now() + } } fmt.Fprintln(os.Stderr) newIter.Done() }() + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + return newIter } diff --git a/pkg/obiiter/workers.go b/pkg/obiiter/workers.go index b5eedf5..1e2c503 100644 --- a/pkg/obiiter/workers.go +++ b/pkg/obiiter/workers.go @@ -54,6 +54,10 @@ func (iterator IBioSequence) MakeIWorker(worker obiseq.SeqWorker, sizes ...int) } go f(iterator) + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + return newIter } @@ -99,6 +103,10 @@ func (iterator IBioSequence) MakeIConditionalWorker(predicate obiseq.SequencePre } go f(iterator) + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + return newIter } @@ -138,6 +146,10 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, size } go f(iterator) + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + return newIter } diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index 0207f1e..1366aae 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -11,7 +11,7 @@ import ( ) var _Debug = false -var _ParallelWorkers = runtime.NumCPU() - 1 +var _ParallelWorkers = runtime.NumCPU() * 2 - 1 var _MaxAllowedCPU = runtime.NumCPU() var _BufferSize = 1 var _BatchSize = 5000 @@ -19,7 +19,10 @@ var _BatchSize = 5000 type ArgumentParser func([]string) (*getoptions.GetOpt, []string, error) func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser { + options := getoptions.New() + options.SetMode(getoptions.Bundling) + options.SetUnknownMode(getoptions.Fail) options.Bool("help", false, options.Alias("h", "?")) options.BoolVar(&_Debug, "debug", false) @@ -28,6 +31,7 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser options.Description("Number of parallele threads computing the result")) options.IntVar(&_MaxAllowedCPU, "max-cpu", _MaxAllowedCPU, + options.GetEnv("OBIMAXCPU"), options.Description("Number of parallele threads computing the result")) for _, o := range optionset { @@ -42,6 +46,10 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser runtime.GOMAXPROCS(_MaxAllowedCPU) if options.Called("max-cpu") { log.Printf("CPU number limited to %d", _MaxAllowedCPU) + if ! options.Called("workers") { + _ParallelWorkers=_MaxAllowedCPU * 2 - 1 + log.Printf("Number of workers set %d", _ParallelWorkers) + } } if options.Called("no-singleton") { diff --git a/pkg/obiseq/biosequence.go b/pkg/obiseq/biosequence.go index 8031340..a1d4b10 100644 --- a/pkg/obiseq/biosequence.go +++ b/pkg/obiseq/biosequence.go @@ -55,6 +55,7 @@ type BioSequence struct { sequence []byte // The sequence itself, it is accessible by the methode Sequence qualities []byte // The quality scores of the sequence. feature []byte + paired *BioSequence // A pointer to the paired sequence annotations Annotation } @@ -69,6 +70,7 @@ func MakeEmptyBioSequence() BioSequence { sequence: nil, qualities: nil, feature: nil, + paired: nil, annotations: nil, } } @@ -275,3 +277,4 @@ func (s *BioSequence) WriteByte(data byte) error { func (s *BioSequence) Clear() { s.sequence = s.sequence[0:0] } + diff --git a/pkg/obiseq/biosequenceslice.go b/pkg/obiseq/biosequenceslice.go index 5969ef1..46d2380 100644 --- a/pkg/obiseq/biosequenceslice.go +++ b/pkg/obiseq/biosequenceslice.go @@ -23,7 +23,7 @@ func NewBioSequenceSlice(size ...int) *BioSequenceSlice { slice := _BioSequenceSlicePool.Get().(*BioSequenceSlice) if len(size) > 0 { s := size[0] - slice.InsureCapacity(s) + slice = slice.InsureCapacity(s) (*slice)=(*slice)[0:s] } return slice diff --git a/pkg/obiseq/paired_reads.go b/pkg/obiseq/paired_reads.go new file mode 100644 index 0000000..2586b1d --- /dev/null +++ b/pkg/obiseq/paired_reads.go @@ -0,0 +1,58 @@ +package obiseq + +import log "github.com/sirupsen/logrus" + +func (s *BioSequence) IsPaired() bool { + return s.paired != nil +} + +func (s *BioSequence) PairedWith() *BioSequence { + return s.paired +} + +func (s *BioSequence) PairTo(p *BioSequence) { + s.paired = p + if p != nil { + p.paired = s + } +} + +func (s *BioSequence) UnPair() { + if s.paired != nil { + s.paired.paired = nil + } + s.paired = nil +} + +func (s *BioSequenceSlice) IsPaired() bool { + return (*s)[0].paired != nil +} + +func (s *BioSequenceSlice) PairedWith() *BioSequenceSlice { + ps := NewBioSequenceSlice(len(*s)) + for i, seq := range *s { + (*ps)[i] = seq.PairedWith() + } + + return ps +} + +func (s *BioSequenceSlice) PairTo(p *BioSequenceSlice) { + + if len(*s) != len(*p) { + log.Fatalf("Pairing of iterators: both batches have not the same length : (%d,%d)", + len(*s), len(*p), + ) + } + + for i, seq := range *s { + seq.PairTo((*p)[i]) + } + +} + +func (s *BioSequenceSlice) UnPair() { + for _, seq := range *s { + seq.UnPair() + } +} diff --git a/pkg/obiseq/predicate.go b/pkg/obiseq/predicate.go index ade81cc..31ddf48 100644 --- a/pkg/obiseq/predicate.go +++ b/pkg/obiseq/predicate.go @@ -11,6 +11,61 @@ import ( type SequencePredicate func(*BioSequence) bool +type SeqPredicateMode int + +const ( + ForwardOnly SeqPredicateMode = iota + ReverseOnly + And + Or + AndNot + Xor +) + +func (predicate SequencePredicate) PredicateOnPaired(ifnotpaired bool) SequencePredicate { + if predicate == nil { + return nil + } + + p := func(sequence *BioSequence) bool { + if sequence.IsPaired() { + return predicate(sequence.PairedWith()) + } + return ifnotpaired + } + + return p +} + +func (predicate SequencePredicate) PairedPredicat(mode SeqPredicateMode) SequencePredicate { + if predicate == nil { + return nil + } + + p := func(sequence *BioSequence) bool { + good := predicate(sequence) + + if sequence.IsPaired() && mode != ForwardOnly { + pgood := predicate(sequence.PairedWith()) + switch mode { + case ReverseOnly: + good = pgood + case And: + good = good && pgood + case Or: + good = good || pgood + case AndNot: + good = good && !pgood + case Xor: + good = (good || pgood) && !(good && pgood) + } + } + return good + } + + return p +} + func (predicate1 SequencePredicate) And(predicate2 SequencePredicate) SequencePredicate { switch { case predicate1 == nil: @@ -209,8 +264,8 @@ func ExpressionPredicat(expression string) SequencePredicate { f := func(sequence *BioSequence) bool { value, err := exp.EvalBool(context.Background(), map[string]interface{}{ - "annotations": sequence.Annotations(), - "sequence": sequence, + "annotations": sequence.Annotations(), + "sequence": sequence, }, ) @@ -225,4 +280,3 @@ func ExpressionPredicat(expression string) SequencePredicate { return f } - diff --git a/pkg/obitools/obiconvert/options.go b/pkg/obitools/obiconvert/options.go index 25a3e5a..7a4792d 100644 --- a/pkg/obitools/obiconvert/options.go +++ b/pkg/obitools/obiconvert/options.go @@ -27,6 +27,9 @@ var __output_solexa_quality__ = false var __no_progress_bar__ = false var __compressed__ = false +var __output_file_name__ = "-" +var __paired_file_name__ = "" + func InputOptionSet(options *getoptions.GetOpt) { // options.IntVar(&__skipped_entries__, "skip", __skipped_entries__, // options.Description("The N first sequence records of the file are discarded from the analysis and not reported to the output file.")) @@ -73,15 +76,29 @@ func OutputOptionSet(options *getoptions.GetOpt) { options.BoolVar(&__no_progress_bar__, "no-progressbar", false, options.Description("Disable the progress bar printing")) - options.BoolVar(&__compressed__, "--compress", false, + options.BoolVar(&__compressed__, "compress", false, options.Alias("Z"), options.Description("Output is compressed")) + options.StringVar(&__output_file_name__, "out", __output_file_name__, + options.Alias("o"), + options.ArgName("FILENAME"), + options.Description("Filename used for saving the output"), + ) + +} + +func PairedFilesOptionSet(options *getoptions.GetOpt) { + options.StringVar(&__paired_file_name__, "paired-with", __paired_file_name__, + options.ArgName("FILENAME"), + options.Description("Filename containing the paired reads"), + ) } func OptionSet(options *getoptions.GetOpt) { InputOptionSet(options) OutputOptionSet(options) + PairedFilesOptionSet(options) } // Returns true if the number of reads described in the @@ -170,3 +187,14 @@ func CLIOutputQualityShift() int { func CLIProgressBar() bool { return !__no_progress_bar__ } + +func CLIOutPutFileName() string { + return __output_file_name__ +} + +func CLIHasPairedFile() bool { + return __paired_file_name__ != "" +} +func CLIPairedFileName() string { + return __paired_file_name__ +} \ No newline at end of file diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index 8a19d49..1d20962 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -67,7 +67,7 @@ func _ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { return list_of_files, nil } -func ReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { +func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { var iterator obiiter.IBioSequence var reader func(string, ...obiformats.WithOption) (obiiter.IBioSequence, error) @@ -142,6 +142,17 @@ func ReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { if err != nil { return obiiter.NilIBioSequence, err } + + if CLIPairedFileName() != "" { + ip, err := reader(CLIPairedFileName(), opts...) + + if err != nil { + return obiiter.NilIBioSequence, err + } + + iterator = iterator.PairTo(ip) + } + } // list_of_files = list_of_files[1:] diff --git a/pkg/obitools/obiconvert/sequence_writer.go b/pkg/obitools/obiconvert/sequence_writer.go index a4931c6..01b1882 100644 --- a/pkg/obitools/obiconvert/sequence_writer.go +++ b/pkg/obitools/obiconvert/sequence_writer.go @@ -1,6 +1,9 @@ package obiconvert import ( + "path/filepath" + "strings" + log "github.com/sirupsen/logrus" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" @@ -8,6 +11,27 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" ) +func BuildPairedFileNames(filename string) (string, string) { + + dir, name := filepath.Split(filename) + parts := strings.SplitN(name, ".", 2) + + forward := parts[0] + "_R1" + reverse := parts[0] + "_R2" + + if parts[1] != "" { + suffix := "." + parts[1] + forward += suffix + reverse += suffix + } + + if dir != "" { + forward = filepath.Join(dir, forward) + reverse = filepath.Join(dir, reverse) + } + + return forward, reverse +} func CLIWriteBioSequences(iterator obiiter.IBioSequence, terminalAction bool, filenames ...string) (obiiter.IBioSequence, error) { @@ -45,7 +69,32 @@ func CLIWriteBioSequences(iterator obiiter.IBioSequence, var err error - if len(filenames) == 0 { + // No file names are specified or it is "-" : the output is done on stdout + + if CLIOutPutFileName() != "-" || (len(filenames) > 0 && filenames[0] != "-") { + var fn string + + if len(filenames) == 0 { + fn = CLIOutPutFileName() + } else { + fn = filenames[0] + } + + if iterator.IsPaired() { + var reverse string + fn, reverse = BuildPairedFileNames(fn) + opts = append(opts, obiformats.WritePairedReadsTo(reverse)) + } + + switch CLIOutputFormat() { + case "fastq": + newIter, err = obiformats.WriteFastqToFile(iterator, fn, opts...) + case "fasta": + newIter, err = obiformats.WriteFastaToFile(iterator, fn, opts...) + default: + newIter, err = obiformats.WriteSequencesToFile(iterator, fn, opts...) + } + } else { switch CLIOutputFormat() { case "fastq": newIter, err = obiformats.WriteFastqToStdout(iterator, opts...) @@ -54,15 +103,6 @@ func CLIWriteBioSequences(iterator obiiter.IBioSequence, default: newIter, err = obiformats.WriteSequencesToStdout(iterator, opts...) } - } else { - switch CLIOutputFormat() { - case "fastq": - newIter, err = obiformats.WriteFastqToFile(iterator, filenames[0], opts...) - case "fasta": - newIter, err = obiformats.WriteFastaToFile(iterator, filenames[0], opts...) - default: - newIter, err = obiformats.WriteSequencesToFile(iterator, filenames[0], opts...) - } } if err != nil { diff --git a/pkg/obitools/obigrep/grep.go b/pkg/obitools/obigrep/grep.go index 61e6aec..d3e474c 100644 --- a/pkg/obitools/obigrep/grep.go +++ b/pkg/obitools/obigrep/grep.go @@ -8,11 +8,15 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" ) -func IFilterSequence(iterator obiiter.IBioSequence) obiiter.IBioSequence { +func CLIFilterSequence(iterator obiiter.IBioSequence) obiiter.IBioSequence { var newIter obiiter.IBioSequence predicate := CLISequenceSelectionPredicate() + if obiconvert.CLIHasPairedFile() { + predicate = predicate.PairedPredicat(CLIPairedReadMode()) + } + if predicate != nil { if CLISaveDiscardedSequences() { var discarded obiiter.IBioSequence diff --git a/pkg/obitools/obigrep/options.go b/pkg/obitools/obigrep/options.go index aab5594..2c4a5f7 100644 --- a/pkg/obitools/obigrep/options.go +++ b/pkg/obitools/obigrep/options.go @@ -40,6 +40,8 @@ var _AttributePatterns = make(map[string]string, 0) var _InvertMatch = false var _SaveRejected = "" +var _PairedMode = "forward" + func TaxonomySelectionOptionSet(options *getoptions.GetOpt) { options.StringVar(&_Taxdump, "taxdump", _Taxdump, @@ -135,6 +137,11 @@ func SequenceSelectionOptionSet(options *getoptions.GetOpt) { "Several -a options can be used on the same command line and in this last case, the selected "+ "sequence records will match all constraints.")) + options.StringVar(&_PairedMode, "paired-mode", _PairedMode, + options.ArgName("forward|reverse|and|or|andnot|xor"), + options.Description("If paired reads are passed to obibrep, that option determines how the conditions "+ + "are applied to both reads."), + ) } // OptionSet adds to the basic option set every options declared for @@ -412,3 +419,24 @@ func CLISaveDiscardedSequences() bool { func CLIDiscardedFileName() string { return _SaveRejected } + +func CLIPairedReadMode() obiseq.SeqPredicateMode { + switch _PairedMode { + case "forward": + return obiseq.ForwardOnly + case "reverse": + return obiseq.ReverseOnly + case "and": + return obiseq.And + case "or": + return obiseq.Or + case "andnot": + return obiseq.AndNot + case "xor": + return obiseq.Xor + default: + log.Fatalf("Paired reads mode must be forward, reverse, and, or, andnot, or xor (%s)", _PairedMode) + } + + return obiseq.ForwardOnly +} diff --git a/pkg/obitools/obipairing/options.go b/pkg/obitools/obipairing/options.go index 2f5c129..479d45f 100644 --- a/pkg/obitools/obipairing/options.go +++ b/pkg/obitools/obipairing/options.go @@ -6,8 +6,8 @@ import ( "github.com/DavidGamba/go-getoptions" ) -var _ForwardFiles = make([]string, 0, 10) -var _ReverseFiles = make([]string, 0, 10) +var _ForwardFile = "" +var _ReverseFile = "" var _Delta = 5 var _MinOverlap = 20 var _GapPenality = float64(2.0) @@ -15,15 +15,15 @@ var _WithoutStats = false var _MinIdentity = 0.9 func PairingOptionSet(options *getoptions.GetOpt) { - options.StringSliceVar(&_ForwardFiles, "forward-reads", - 1, 1000, + options.StringVar(&_ForwardFile, "forward-reads", "", options.Alias("F"), - options.Required("You must provide at least one forward file"), + options.ArgName("FILENAME_F"), + options.Required("You must provide at a forward file"), options.Description("The file names containing the forward reads")) - options.StringSliceVar(&_ReverseFiles, "reverse-reads", - 1, 1000, + options.StringVar(&_ReverseFile, "reverse-reads", "", options.Alias("R"), - options.Required("You must provide at least one reverse file"), + options.ArgName("FILENAME_R"), + options.Required("You must provide a reverse file"), options.Description("The file names containing the reverse reads")) options.IntVar(&_Delta, "delta", _Delta, options.Alias("D"), @@ -42,42 +42,43 @@ func PairingOptionSet(options *getoptions.GetOpt) { } func OptionSet(options *getoptions.GetOpt) { - obiconvert.OptionSet(options) + obiconvert.OutputOptionSet(options) + obiconvert.InputOptionSet(options) PairingOptionSet(options) } -func IBatchPairedSequence() (obiiter.IPairedBioSequenceBatch, error) { - forward, err := obiconvert.ReadBioSequences(_ForwardFiles...) +func CLIPairedSequence() (obiiter.IBioSequence, error) { + forward, err := obiconvert.CLIReadBioSequences(_ForwardFile) if err != nil { - return obiiter.NilIPairedBioSequenceBatch, err + return obiiter.NilIBioSequence, err } - reverse, err := obiconvert.ReadBioSequences(_ReverseFiles...) + reverse, err := obiconvert.CLIReadBioSequences(_ReverseFile) if err != nil { - return obiiter.NilIPairedBioSequenceBatch, err + return obiiter.NilIBioSequence, err } - paired := forward.PairWith(reverse) + paired := forward.PairTo(reverse) return paired, nil } -func Delta() int { +func CLIDelta() int { return _Delta } -func MinOverlap() int { +func CLIMinOverlap() int { return _MinOverlap } -func MinIdentity() float64 { +func CLIMinIdentity() float64 { return _MinIdentity } -func GapPenality() float64 { +func CLIGapPenality() float64 { return _GapPenality } -func WithStats() bool { +func CLIWithStats() bool { return !_WithoutStats } diff --git a/pkg/obitools/obipairing/pairing.go b/pkg/obitools/obipairing/pairing.go index 5ed581c..5662236 100644 --- a/pkg/obitools/obipairing/pairing.go +++ b/pkg/obitools/obipairing/pairing.go @@ -3,12 +3,12 @@ package obipairing import ( "math" "os" - "runtime" log "github.com/sirupsen/logrus" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obialign" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "github.com/schollz/progressbar/v3" ) @@ -203,12 +203,16 @@ func AssemblePESequences(seqA, seqB *obiseq.BioSequence, // // The function returns an iterator over batches of obiseq.Biosequence object. // each pair of processed sequences produces one sequence in the result iterator. -func IAssemblePESequencesBatch(iterator obiiter.IPairedBioSequenceBatch, +func IAssemblePESequencesBatch(iterator obiiter.IBioSequence, gap float64, delta, minOverlap int, minIdentity float64, withStats bool, sizes ...int) obiiter.IBioSequence { - nworkers := runtime.NumCPU() * 3 / 2 + if !iterator.IsPaired() { + log.Fatalln("Sequence data must be paired") + } + + nworkers := obioptions.CLIMaxCPU() * 3 / 2 buffsize := iterator.BufferSize() if len(sizes) > 0 { @@ -236,15 +240,15 @@ func IAssemblePESequencesBatch(iterator obiiter.IPairedBioSequenceBatch, progressbar.OptionShowIts(), progressbar.OptionSetDescription("[Sequence Pairing]")) - f := func(iterator obiiter.IPairedBioSequenceBatch, wid int) { + f := func(iterator obiiter.IBioSequence, wid int) { arena := obialign.MakePEAlignArena(150, 150) for iterator.Next() { batch := iterator.Get() - cons := make(obiseq.BioSequenceSlice, len(batch.Forward())) + cons := make(obiseq.BioSequenceSlice, len(batch.Slice())) processed := 0 - for i, A := range batch.Forward() { - B := batch.Reverse()[i] + for i, A := range batch.Slice() { + B := A.PairedWith() cons[i] = AssemblePESequences(A, B, gap, delta, minOverlap, minIdentity, withStats, true, arena) if i%59 == 0 { bar.Add(59)