diff --git a/Release-notes.md b/Release-notes.md index 74e22c7..e6459ed 100644 --- a/Release-notes.md +++ b/Release-notes.md @@ -20,6 +20,37 @@ ### New features +- The output of the obitools will evolve to produce results only in standard + formats such as fasta and fastq. For non-sequential data, the output will be + in CSV format, with the separator `,`, the decimal separator `.`, and a + header line with the column names. It is more convenient to use the output + in other programs. For example, you can use the `csvtomd` command to + reformat the csv output into a markdown table. The first command to initiate + this change is `obicount`, which now produces a 3-line CSV output. + + ```bash + obicount data.csv | csvtomd + ``` + +- Adds the new experimental `obicleandb` utility to clean up reference + database files created with `obipcr`. An easy way to create a reference + database for `obitag` is to use `obipcr` on a local copy of Genbank or EMBL. + However, these sequence databases are known to contain many taxonomic + errors, such as bacterial sequences annotated with the taxid of their host + species. obicleandb tries to detect these errors. To do this, it first keeps + only sequences annotated with the taxid to which a species, genus, and + family taxid can be assigned. Then, for each sequence, it compares the + distance of the sequence to the other sequences belonging to the same genus + to the same number of distances between the considered sequence and a + randomly selected set of sequences belonging to another family using a + Mann-Whitney U test. The alternative hypothesis is that out-of-family + distances are greater than intrageneric distances. Sequences are annotated + with the p-value of the Mann-Whitney U test in the **obicleandb_trusted** + slot. Later, the distribution of this p-value can be analyzed to determine a + threshold. Empirically, a threshold of 0.05 is a good compromise and allows + to filter out less than 1‰ of the sequences. These sequences can then be + removed using `obigrep`. + - Adds a new `obijoin` utility to join information contained in a sequence file with that contained in another sequence or CSV file. The command allows you to specify the names of the keys in the main sequence file and in the diff --git a/cmd/obitools/obicount/main.go b/cmd/obitools/obicount/main.go index 2b0927d..750a4be 100644 --- a/cmd/obitools/obicount/main.go +++ b/cmd/obitools/obicount/main.go @@ -45,17 +45,18 @@ func main() { nvariant, nread, nsymbol := fs.Count(true) + fmt.Print("entites,n\n") + if obicount.CLIIsPrintingVariantCount() { - fmt.Printf(" %d", nvariant) + fmt.Printf("variants,%d\n", nvariant) } if obicount.CLIIsPrintingReadCount() { - fmt.Printf(" %d", nread) + fmt.Printf("reads,%d\n", nread) } if obicount.CLIIsPrintingSymbolCount() { - fmt.Printf(" %d", nsymbol) + fmt.Printf("symbols,%d\n", nsymbol) } - fmt.Printf("\n") } diff --git a/pkg/obichunk/chunk_on_disk.go b/pkg/obichunk/chunk_on_disk.go index c378c00..bd85a1a 100644 --- a/pkg/obichunk/chunk_on_disk.go +++ b/pkg/obichunk/chunk_on_disk.go @@ -73,11 +73,11 @@ func ISequenceChunkOnDisk(iterator obiiter.IBioSequence, panic(err) } - chunck := iseq.Load() + source, chunk := iseq.Load() - newIter.Push(obiiter.MakeBioSequenceBatch(order, chunck)) + newIter.Push(obiiter.MakeBioSequenceBatch(source, order, chunk)) log.Infof("Start processing of batch %d/%d : %d sequences", - order, nbatch, len(chunck)) + order, nbatch, len(chunk)) } diff --git a/pkg/obichunk/chunks.go b/pkg/obichunk/chunks.go index b2774c4..95915c0 100644 --- a/pkg/obichunk/chunks.go +++ b/pkg/obichunk/chunks.go @@ -28,6 +28,7 @@ func ISequenceChunk(iterator obiiter.IBioSequence, jobDone := sync.WaitGroup{} chunks := make(map[int]*obiseq.BioSequenceSlice, 1000) + sources := make(map[int]string, 1000) for newflux := range dispatcher.News() { jobDone.Add(1) @@ -43,12 +44,18 @@ func ISequenceChunk(iterator obiiter.IBioSequence, chunks[newflux] = chunk lock.Unlock() + source := "" for data.Next() { b := data.Get() + source = b.Source() *chunk = append(*chunk, b.Slice()...) b.Recycle(false) } + lock.Lock() + sources[newflux] = source + lock.Unlock() + jobDone.Done() }(newflux) } @@ -56,10 +63,10 @@ func ISequenceChunk(iterator obiiter.IBioSequence, jobDone.Wait() order := 0 - for _, chunck := range chunks { + for i, chunk := range chunks { - if len(*chunck) > 0 { - newIter.Push(obiiter.MakeBioSequenceBatch(order, *chunck)) + if len(*chunk) > 0 { + newIter.Push(obiiter.MakeBioSequenceBatch(sources[i], order, *chunk)) order++ } diff --git a/pkg/obichunk/subchunks.go b/pkg/obichunk/subchunks.go index 2c160f9..0d72cd7 100644 --- a/pkg/obichunk/subchunks.go +++ b/pkg/obichunk/subchunks.go @@ -90,7 +90,7 @@ func ISequenceSubChunk(iterator obiiter.IBioSequence, for iterator.Next() { batch := iterator.Get() - + source := batch.Source() if batch.Len() > 1 { classifier.Reset() @@ -117,7 +117,7 @@ func ISequenceSubChunk(iterator obiiter.IBioSequence, ss := obiseq.MakeBioSequenceSlice() for i, v := range ordered { if v.code != last { - newIter.Push(obiiter.MakeBioSequenceBatch(nextOrder(), ss)) + newIter.Push(obiiter.MakeBioSequenceBatch(source, nextOrder(), ss)) ss = obiseq.MakeBioSequenceSlice() last = v.code } @@ -127,7 +127,7 @@ func ISequenceSubChunk(iterator obiiter.IBioSequence, } if len(ss) > 0 { - newIter.Push(obiiter.MakeBioSequenceBatch(nextOrder(), ss)) + newIter.Push(obiiter.MakeBioSequenceBatch(source, nextOrder(), ss)) } } else { newIter.Push(batch.Reorder(nextOrder())) diff --git a/pkg/obiformats/csv_read.go b/pkg/obiformats/csv_read.go index 4a7604c..79bfb97 100644 --- a/pkg/obiformats/csv_read.go +++ b/pkg/obiformats/csv_read.go @@ -111,14 +111,14 @@ func _ParseCsvFile(source string, slice = append(slice, sequence) if len(slice) >= batchSize { - out.Push(obiiter.MakeBioSequenceBatch(o, slice)) + out.Push(obiiter.MakeBioSequenceBatch(source, o, slice)) o++ slice = obiseq.MakeBioSequenceSlice() } } if len(slice) > 0 { - out.Push(obiiter.MakeBioSequenceBatch(o, slice)) + out.Push(obiiter.MakeBioSequenceBatch(source, o, slice)) } out.Done() diff --git a/pkg/obiformats/csv_writer.go b/pkg/obiformats/csv_writer.go index d4f28ac..7d22ff9 100644 --- a/pkg/obiformats/csv_writer.go +++ b/pkg/obiformats/csv_writer.go @@ -142,7 +142,7 @@ func WriteCSV(iterator obiiter.IBioSequence, nwriters := opt.ParallelWorkers() obiiter.RegisterAPipe() - chunkchan := make(chan FileChunck) + chunkchan := make(chan FileChunk) newIter.Add(nwriters) var waitWriter sync.WaitGroup @@ -161,7 +161,7 @@ func WriteCSV(iterator obiiter.IBioSequence, batch := iterator.Get() - chunkchan <- FileChunck{ + chunkchan <- FileChunk{ FormatCVSBatch(batch, opt), batch.Order(), } @@ -171,7 +171,7 @@ func WriteCSV(iterator obiiter.IBioSequence, } next_to_send := 0 - received := make(map[int]FileChunck, 100) + received := make(map[int]FileChunk, 100) waitWriter.Add(1) go func() { diff --git a/pkg/obiformats/ecopcr_read.go b/pkg/obiformats/ecopcr_read.go index 62b0ceb..5706805 100644 --- a/pkg/obiformats/ecopcr_read.go +++ b/pkg/obiformats/ecopcr_read.go @@ -122,7 +122,7 @@ func __read_ecopcr_bioseq__(file *__ecopcr_file__) (*obiseq.BioSequence, error) return bseq, nil } -func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { +func ReadEcoPCR(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { tag := make([]byte, 11) n, _ := reader.Read(tag) @@ -187,7 +187,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { slice = append(slice, seq) ii++ if ii >= opt.BatchSize() { - newIter.Push(obiiter.MakeBioSequenceBatch(i, slice)) + newIter.Push(obiiter.MakeBioSequenceBatch(opt.Source(), i, slice)) slice = obiseq.MakeBioSequenceSlice() i++ ii = 0 @@ -198,7 +198,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { } if len(slice) > 0 { - newIter.Push(obiiter.MakeBioSequenceBatch(i, slice)) + newIter.Push(obiiter.MakeBioSequenceBatch(opt.Source(), i, slice)) } newIter.Done() @@ -213,7 +213,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { newIter = newIter.CompleteFileIterator() } - return newIter + return newIter, nil } func ReadEcoPCRFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { @@ -235,5 +235,5 @@ func ReadEcoPCRFromFile(filename string, options ...WithOption) (obiiter.IBioSeq reader = greader } - return ReadEcoPCR(reader, options...), nil + return ReadEcoPCR(reader, options...) } diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index 88490f4..dddf1e5 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -15,7 +15,7 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) -// _EndOfLastEntry finds the index of the last entry in the given byte slice 'buff' +// EndOfLastFlatFileEntry finds the index of the last entry in the given byte slice 'buff' // using a pattern match of the form: // ?//? // where and are the ASCII codes for carriage return and line feed, @@ -27,7 +27,7 @@ import ( // // Returns: // int - the index of the end of the last entry or -1 if no match is found. -func _EndOfLastEntry(buff []byte) int { +func EndOfLastFlatFileEntry(buff []byte) int { // 6 5 43 2 1 // ?//? var i int @@ -87,15 +87,9 @@ func _EndOfLastEntry(buff []byte) int { return -1 } -func _ParseEmblFile(source string, input ChannelSeqFileChunk, - out obiiter.IBioSequence, - withFeatureTable bool, - batch_size int, - total_seq_size int) { - - for chunks := range input { - scanner := bufio.NewScanner(chunks.raw) - order := chunks.order +func EmblChunkParser(withFeatureTable bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) { + parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) { + scanner := bufio.NewScanner(input) sequences := make(obiseq.BioSequenceSlice, 0, 100) id := "" scientificName := "" @@ -156,7 +150,31 @@ func _ParseEmblFile(source string, input ChannelSeqFileChunk, seqBytes = new(bytes.Buffer) } } - out.Push(obiiter.MakeBioSequenceBatch(order, sequences)) + + return sequences, nil + + } + + return parser +} + +func _ParseEmblFile( + input ChannelSeqFileChunk, + out obiiter.IBioSequence, + withFeatureTable bool, +) { + + parser := EmblChunkParser(withFeatureTable) + + for chunks := range input { + order := chunks.Order + sequences, err := parser(chunks.Source, chunks.Raw) + + if err != nil { + log.Fatalf("%s : Cannot parse the embl file : %v", chunks.Source, err) + } + + out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, order, sequences)) } out.Done() @@ -166,12 +184,18 @@ func _ParseEmblFile(source string, input ChannelSeqFileChunk, // 6 5 43 2 1 // // ?//? -func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence { +func ReadEMBL(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) buff := make([]byte, 1024*1024*1024*256) - entry_channel := ReadSeqFileChunk(reader, buff, _EndOfLastEntry) + entry_channel := ReadSeqFileChunk( + opt.Source(), + reader, + buff, + EndOfLastFlatFileEntry, + ) + newIter := obiiter.MakeIBioSequence() nworkers := opt.ParallelWorkers() @@ -179,10 +203,11 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence { // for j := 0; j < opt.ParallelWorkers(); j++ { for j := 0; j < nworkers; j++ { newIter.Add(1) - go _ParseEmblFile(opt.Source(), entry_channel, newIter, + go _ParseEmblFile( + entry_channel, + newIter, opt.WithFeatureTable(), - opt.BatchSize(), - opt.TotalSeqSize()) + ) } go func() { @@ -193,7 +218,7 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence { newIter = newIter.CompleteFileIterator() } - return newIter + return newIter, nil } func ReadEMBLFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { @@ -214,5 +239,5 @@ func ReadEMBLFromFile(filename string, options ...WithOption) (obiiter.IBioSeque return obiiter.NilIBioSequence, err } - return ReadEMBL(reader, options...), nil + return ReadEMBL(reader, options...) } diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go index 4eb6656..8e9408c 100644 --- a/pkg/obiformats/fastaseq_read.go +++ b/pkg/obiformats/fastaseq_read.go @@ -14,7 +14,7 @@ import ( log "github.com/sirupsen/logrus" ) -func _EndOfLastFastaEntry(buffer []byte) int { +func EndOfLastFastaEntry(buffer []byte) int { var i int imax := len(buffer) @@ -39,24 +39,18 @@ func _EndOfLastFastaEntry(buffer []byte) int { return last } -func _ParseFastaFile(source string, - input ChannelSeqFileChunk, - out obiiter.IBioSequence, - no_order bool, - batch_size int, - chunck_order func() int, -) { +func FastaChunkParser() func(string, io.Reader) (obiseq.BioSequenceSlice, error) { - var identifier string - var definition string + parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) { + var identifier string + var definition string - idBytes := bytes.Buffer{} - defBytes := bytes.Buffer{} - seqBytes := bytes.Buffer{} + idBytes := bytes.Buffer{} + defBytes := bytes.Buffer{} + seqBytes := bytes.Buffer{} - for chunks := range input { state := 0 - scanner := bufio.NewReader(chunks.raw) + scanner := bufio.NewReader(input) start, _ := scanner.Peek(20) if start[0] != '>' { log.Fatalf("%s : first character is not '>'", string(start)) @@ -64,7 +58,8 @@ func _ParseFastaFile(source string, if start[1] == ' ' { log.Fatalf("%s :Strange", string(start)) } - sequences := make(obiseq.BioSequenceSlice, 0, batch_size) + + sequences := obiseq.MakeBioSequenceSlice(100)[:0] previous := byte(0) @@ -160,12 +155,6 @@ func _ParseFastaFile(source string, s := obiseq.NewBioSequence(identifier, rawseq, definition) s.SetSource(source) sequences = append(sequences, s) - if no_order { - if len(sequences) == batch_size { - out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences)) - sequences = make(obiseq.BioSequenceSlice, 0, batch_size) - } - } state = 1 } else { // Error @@ -209,13 +198,28 @@ func _ParseFastaFile(source string, sequences = append(sequences, s) } - if len(sequences) > 0 { - co := chunks.order - if no_order { - co = chunck_order() - } - out.Push(obiiter.MakeBioSequenceBatch(co, sequences)) + return sequences, nil + } + + return parser +} + +func _ParseFastaFile( + input ChannelSeqFileChunk, + out obiiter.IBioSequence, +) { + + parser := FastaChunkParser() + + for chunks := range input { + sequences, err := parser(chunks.Source, chunks.Raw) + + if err != nil { + log.Fatalf("File %s : Cannot parse the fasta file : %v", chunks.Source, err) } + + out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences)) + } out.Done() @@ -230,17 +234,16 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e buff := make([]byte, 1024*1024*1024) - chkchan := ReadSeqFileChunk(reader, buff, _EndOfLastFastaEntry) - chunck_order := obiutils.AtomicCounter() + chkchan := ReadSeqFileChunk( + opt.Source(), + reader, + buff, + EndOfLastFastaEntry, + ) for i := 0; i < nworker; i++ { out.Add(1) - go _ParseFastaFile(opt.Source(), - chkchan, - out, - opt.NoOrder(), - opt.BatchSize(), - chunck_order) + go _ParseFastaFile(chkchan, out) } go func() { @@ -282,7 +285,7 @@ func ReadFastaFromFile(filename string, options ...WithOption) (obiiter.IBioSequ } func ReadFastaFromStdin(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { - options = append(options, OptionsSource(obiutils.RemoveAllExt("stdin"))) + options = append(options, OptionsSource("stdin")) input, err := Buf(os.Stdin) if err == ErrNoContent { diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go index 1bfcb92..64d37b8 100644 --- a/pkg/obiformats/fastqseq_read.go +++ b/pkg/obiformats/fastqseq_read.go @@ -14,7 +14,7 @@ import ( log "github.com/sirupsen/logrus" ) -func _EndOfLastFastqEntry(buffer []byte) int { +func EndOfLastFastqEntry(buffer []byte) int { var i int imax := len(buffer) @@ -117,27 +117,20 @@ func _storeSequenceQuality(bytes *bytes.Buffer, out *obiseq.BioSequence, quality out.SetQualities(q) } -func _ParseFastqFile(source string, - input ChannelSeqFileChunk, - out obiiter.IBioSequence, - quality_shift byte, - no_order bool, - batch_size int, - chunck_order func() int, -) { +func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSequenceSlice, error) { + parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) { - var identifier string - var definition string + var identifier string + var definition string - idBytes := bytes.Buffer{} - defBytes := bytes.Buffer{} - qualBytes := bytes.Buffer{} - seqBytes := bytes.Buffer{} + idBytes := bytes.Buffer{} + defBytes := bytes.Buffer{} + qualBytes := bytes.Buffer{} + seqBytes := bytes.Buffer{} - for chunks := range input { state := 0 - scanner := bufio.NewReader(chunks.raw) - sequences := make(obiseq.BioSequenceSlice, 0, 100) + scanner := bufio.NewReader(input) + sequences := obiseq.MakeBioSequenceSlice(100)[:0] previous := byte(0) for C, err := scanner.ReadByte(); err != io.EOF; C, err = scanner.ReadByte() { @@ -257,14 +250,6 @@ func _ParseFastqFile(source string, case 10: if is_end_of_line { _storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift) - - if no_order { - if len(sequences) == batch_size { - out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences)) - sequences = make(obiseq.BioSequenceSlice, 0, batch_size) - } - } - state = 11 } else { qualBytes.WriteByte(C) @@ -288,14 +273,31 @@ func _ParseFastqFile(source string, _storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift) state = 1 } - - co := chunks.order - if no_order { - co = chunck_order() - } - out.Push(obiiter.MakeBioSequenceBatch(co, sequences)) } + return sequences, nil + } + + return parser +} + +func _ParseFastqFile( + input ChannelSeqFileChunk, + out obiiter.IBioSequence, + quality_shift byte, +) { + + parser := FastqChunkParser(quality_shift) + + for chunks := range input { + sequences, err := parser(chunks.Source, chunks.Raw) + + if err != nil { + log.Fatalf("File %s : Cannot parse the fastq file : %v", chunks.Source, err) + } + + out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences)) + } out.Done() @@ -307,21 +309,23 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e out := obiiter.MakeIBioSequence() nworker := opt.ParallelWorkers() - chunkorder := obiutils.AtomicCounter() buff := make([]byte, 1024*1024*1024) - chkchan := ReadSeqFileChunk(reader, buff, _EndOfLastFastqEntry) + chkchan := ReadSeqFileChunk( + opt.Source(), + reader, + buff, + EndOfLastFastqEntry, + ) for i := 0; i < nworker; i++ { out.Add(1) - go _ParseFastqFile(opt.Source(), + go _ParseFastqFile( chkchan, out, byte(obioptions.InputQualityShift()), - opt.NoOrder(), - opt.BatchSize(), - chunkorder) + ) } go func() { diff --git a/pkg/obiformats/fastseq_read.go b/pkg/obiformats/fastseq_read.go index 7a9c2c6..f43e361 100644 --- a/pkg/obiformats/fastseq_read.go +++ b/pkg/obiformats/fastseq_read.go @@ -69,7 +69,7 @@ func _FastseqReader(source string, slice = append(slice, rep) ii++ if ii >= batch_size { - iterator.Push(obiiter.MakeBioSequenceBatch(i, slice)) + iterator.Push(obiiter.MakeBioSequenceBatch(source, i, slice)) slice = obiseq.MakeBioSequenceSlice() i++ ii = 0 @@ -77,7 +77,7 @@ func _FastseqReader(source string, } if len(slice) > 0 { - iterator.Push(obiiter.MakeBioSequenceBatch(i, slice)) + iterator.Push(obiiter.MakeBioSequenceBatch(source, i, slice)) } iterator.Done() diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index fbb2be8..5b899e1 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -7,8 +7,6 @@ import ( "io" "os" "strings" - "sync" - "time" log "github.com/sirupsen/logrus" @@ -76,7 +74,7 @@ func FormatFasta(seq *obiseq.BioSequence, formater FormatHeader) string { // - skipEmpty: a boolean indicating whether empty sequences should be skipped or not. // // It returns a byte array containing the formatted sequences. -func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, skipEmpty bool) []byte { +func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, skipEmpty bool) *bytes.Buffer { // Create a buffer to store the formatted sequences var bs bytes.Buffer @@ -116,7 +114,7 @@ func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, ski } // Return the byte array representation of the buffer - return bs.Bytes() + return &bs } // WriteFasta writes a given iterator of bio sequences to a file in FASTA format. @@ -135,21 +133,16 @@ func WriteFasta(iterator obiiter.IBioSequence, nwriters := opt.ParallelWorkers() - obiiter.RegisterAPipe() - chunkchan := make(chan FileChunck) + chunkchan := WriteSeqFileChunk(file, opt.CloseFile()) header_format := opt.FormatFastSeqHeader() newIter.Add(nwriters) - var waitWriter sync.WaitGroup go func() { newIter.WaitAndClose() - for len(chunkchan) > 0 { - time.Sleep(time.Millisecond) - } close(chunkchan) - waitWriter.Wait() + log.Warnf("Writing fasta file done") }() ff := func(iterator obiiter.IBioSequence) { @@ -159,10 +152,12 @@ func WriteFasta(iterator obiiter.IBioSequence, log.Debugf("Formating fasta chunk %d", batch.Order()) - chunkchan <- FileChunck{ - FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()), - batch.Order(), + chunkchan <- SeqFileChunk{ + Source: batch.Source(), + Raw: FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()), + Order: batch.Order(), } + log.Debugf("Fasta chunk %d formated", batch.Order()) newIter.Push(batch) @@ -176,39 +171,6 @@ func WriteFasta(iterator obiiter.IBioSequence, go ff(iterator.Split()) } - next_to_send := 0 - received := make(map[int]FileChunck, 100) - - waitWriter.Add(1) - go func() { - for chunk := range chunkchan { - if chunk.order == next_to_send { - file.Write(chunk.text) - log.Debugf("Fasta chunk %d written", chunk.order) - next_to_send++ - chunk, ok := received[next_to_send] - for ok { - file.Write(chunk.text) - log.Debugf("Fasta chunk %d written", chunk.order) - delete(received, next_to_send) - next_to_send++ - chunk, ok = received[next_to_send] - } - } else { - log.Debugf("Store Fasta chunk %d", chunk.order) - received[chunk.order] = chunk - } - - } - - file.Close() - - log.Debugln("End of the fasta file writing") - obiiter.UnregisterPipe() - waitWriter.Done() - - }() - return newIter, nil } diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 719d429..a09f2f2 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -14,6 +14,8 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) +type FormatSeqBatch func(batch obiiter.BioSequenceBatch, formater FormatHeader, skipEmpty bool) *bytes.Buffer + func _formatFastq(buff *bytes.Buffer, seq *obiseq.BioSequence, formater FormatHeader) { info := "" @@ -49,7 +51,7 @@ func FormatFastq(seq *obiseq.BioSequence, formater FormatHeader) string { } func FormatFastqBatch(batch obiiter.BioSequenceBatch, - formater FormatHeader, skipEmpty bool) []byte { + formater FormatHeader, skipEmpty bool) *bytes.Buffer { var bs bytes.Buffer lt := 0 @@ -82,12 +84,10 @@ func FormatFastqBatch(batch obiiter.BioSequenceBatch, } - chunk := bs.Bytes() - - return chunk + return &bs } -type FileChunck struct { +type FileChunk struct { text []byte order int } @@ -105,8 +105,7 @@ func WriteFastq(iterator obiiter.IBioSequence, nwriters := opt.ParallelWorkers() - obiiter.RegisterAPipe() - chunkchan := make(chan FileChunck) + chunkchan := WriteSeqFileChunk(file, opt.CloseFile()) header_format := opt.FormatFastSeqHeader() @@ -126,9 +125,10 @@ func WriteFastq(iterator obiiter.IBioSequence, ff := func(iterator obiiter.IBioSequence) { for iterator.Next() { batch := iterator.Get() - chunk := FileChunck{ - FormatFastqBatch(batch, header_format, opt.SkipEmptySequence()), - batch.Order(), + chunk := SeqFileChunk{ + Source: batch.Source(), + Raw: FormatFastqBatch(batch, header_format, opt.SkipEmptySequence()), + Order: batch.Order(), } chunkchan <- chunk newIter.Push(batch) @@ -142,44 +142,6 @@ func WriteFastq(iterator obiiter.IBioSequence, go ff(iterator.Split()) } - next_to_send := 0 - received := make(map[int]FileChunck, 100) - - waitWriter.Add(1) - go func() { - for chunk := range chunkchan { - if chunk.order == next_to_send { - if chunk.text[0] != '@' { - log.Panicln("WriteFastq: FASTQ format error") - } - file.Write(chunk.text) - next_to_send++ - chunk, ok := received[next_to_send] - for ok { - if chunk.text[0] != '@' { - log.Panicln("WriteFastq: FASTQ format error") - } - file.Write(chunk.text) - delete(received, next_to_send) - next_to_send++ - chunk, ok = received[next_to_send] - } - } else { - if _, ok := received[chunk.order]; ok { - log.Panicln("WriteFastq: Two chunks with the same number") - } - received[chunk.order] = chunk - } - - } - - file.Close() - - log.Debugln("End of the fastq file writing") - obiiter.UnregisterPipe() - waitWriter.Done() - }() - return newIter, nil } diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go index 0940d61..f1b261a 100644 --- a/pkg/obiformats/genbank_read.go +++ b/pkg/obiformats/genbank_read.go @@ -29,27 +29,11 @@ const ( var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp") -func _ParseGenbankFile(source string, - input ChannelSeqFileChunk, - out obiiter.IBioSequence, - chunck_order func() int, - withFeatureTable bool, - batch_size int, - total_seq_size int) { - state := inHeader - previous_chunk := -1 - - for chunks := range input { - - if state != inHeader { - log.Fatalf("Unexpected state %d starting new chunk (id = %d, previous_chunk = %d)", - state, chunks.order, previous_chunk) - } - - previous_chunk = chunks.order - scanner := bufio.NewReader(chunks.raw) - sequences := make(obiseq.BioSequenceSlice, 0, 100) - sumlength := 0 +func GenbankChunkParser(withFeatureTable bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) { + return func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) { + state := inHeader + scanner := bufio.NewReader(input) + sequences := obiseq.MakeBioSequenceSlice(100)[:0] id := "" lseq := -1 scientificName := "" @@ -64,7 +48,7 @@ func _ParseGenbankFile(source string, nl++ line = string(bline) if is_prefix || len(line) > 100 { - log.Fatalf("Chunk %d : Line too long: %s", chunks.order, line) + log.Fatalf("From %s:Line too long: %s", source, line) } processed := false for !processed { @@ -165,15 +149,6 @@ func _ParseGenbankFile(source string, // sequence.Len(), seqBytes.Len()) sequences = append(sequences, sequence) - sumlength += sequence.Len() - - if len(sequences) == batch_size || sumlength > total_seq_size { - oo := chunck_order() - log.Debugln("Pushing sequence batch ", oo, " with ", len(sequences), " sequences") - out.Push(obiiter.MakeBioSequenceBatch(oo, sequences)) - sequences = make(obiseq.BioSequenceSlice, 0, 100) - sumlength = 0 - } defBytes = bytes.NewBuffer(obiseq.GetSlice(200)) featBytes = new(bytes.Buffer) @@ -219,11 +194,24 @@ func _ParseGenbankFile(source string, } - if len(sequences) > 0 { - oo := chunck_order() - log.Debugln("Pushing sequence batch ", oo, " with ", len(sequences), " sequences") - out.Push(obiiter.MakeBioSequenceBatch(oo, sequences)) + return sequences, nil + } +} + +func _ParseGenbankFile(input ChannelSeqFileChunk, + out obiiter.IBioSequence, + withFeatureTable bool) { + + parser := GenbankChunkParser(withFeatureTable) + + for chunks := range input { + sequences, err := parser(chunks.Source, chunks.Raw) + + if err != nil { + log.Fatalf("File %s : Cannot parse the genbank file : %v", chunks.Source, err) } + + out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences)) } log.Debug("End of the Genbank thread") @@ -231,26 +219,31 @@ func _ParseGenbankFile(source string, } -func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence { +func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) // entry_channel := make(chan _FileChunk) buff := make([]byte, 1024*1024*1024*256) - entry_channel := ReadSeqFileChunk(reader, buff, _EndOfLastEntry) + entry_channel := ReadSeqFileChunk( + opt.Source(), + reader, + buff, + EndOfLastFlatFileEntry, + ) + newIter := obiiter.MakeIBioSequence() nworkers := opt.ParallelWorkers() - chunck_order := obiutils.AtomicCounter() // for j := 0; j < opt.ParallelWorkers(); j++ { for j := 0; j < nworkers; j++ { newIter.Add(1) - go _ParseGenbankFile(opt.Source(), - entry_channel, newIter, chunck_order, + go _ParseGenbankFile( + entry_channel, + newIter, opt.WithFeatureTable(), - opt.BatchSize(), - opt.TotalSeqSize()) + ) } // go _ReadFlatFileChunk(reader, entry_channel) @@ -264,7 +257,7 @@ func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence { newIter = newIter.CompleteFileIterator() } - return newIter + return newIter, nil } func ReadGenbankFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { @@ -285,5 +278,5 @@ func ReadGenbankFromFile(filename string, options ...WithOption) (obiiter.IBioSe return obiiter.NilIBioSequence, err } - return ReadGenbank(reader, options...), nil + return ReadGenbank(reader, options...) } diff --git a/pkg/obiformats/json_writer.go b/pkg/obiformats/json_writer.go index d9576a8..b51210c 100644 --- a/pkg/obiformats/json_writer.go +++ b/pkg/obiformats/json_writer.go @@ -3,7 +3,6 @@ package obiformats import ( "bufio" "bytes" - "github.com/goccy/go-json" "io" "os" "strconv" @@ -11,6 +10,8 @@ import ( "sync" "time" + "github.com/goccy/go-json" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" @@ -87,7 +88,7 @@ func WriteJSON(iterator obiiter.IBioSequence, nwriters := opt.ParallelWorkers() obiiter.RegisterAPipe() - chunkchan := make(chan FileChunck) + chunkchan := make(chan FileChunk) newIter.Add(nwriters) var waitWriter sync.WaitGroup @@ -106,7 +107,7 @@ func WriteJSON(iterator obiiter.IBioSequence, batch := iterator.Get() - chunkchan <- FileChunck{ + chunkchan <- FileChunk{ FormatJSONBatch(batch), batch.Order(), } @@ -116,7 +117,7 @@ func WriteJSON(iterator obiiter.IBioSequence, } next_to_send := 0 - received := make(map[int]FileChunck, 100) + received := make(map[int]FileChunk, 100) waitWriter.Add(1) go func() { diff --git a/pkg/obiformats/options.go b/pkg/obiformats/options.go index 9dca35d..0d14d2c 100644 --- a/pkg/obiformats/options.go +++ b/pkg/obiformats/options.go @@ -7,7 +7,8 @@ import ( type __options__ struct { fastseq_header_parser obiseq.SeqAnnotator - fastseq_header_writer func(*obiseq.BioSequence) string + fastseq_header_writer BioSequenceFormater + seqBatchFormater FormatSeqBatch with_progress_bar bool buffer_size int batch_size int @@ -44,6 +45,7 @@ func MakeOptions(setters []WithOption) Options { o := __options__{ fastseq_header_parser: ParseGuessedFastSeqHeader, fastseq_header_writer: FormatFastSeqJsonHeader, + seqBatchFormater: nil, with_progress_bar: false, buffer_size: 2, parallel_workers: obioptions.CLIReadParallelWorkers(), @@ -103,6 +105,10 @@ func (opt Options) FormatFastSeqHeader() func(*obiseq.BioSequence) string { return opt.pointer.fastseq_header_writer } +func (opt Options) SequenceFormater() FormatSeqBatch { + return opt.pointer.seqBatchFormater +} + func (opt Options) NoOrder() bool { return opt.pointer.no_order } @@ -219,8 +225,6 @@ func OptionNoOrder(no_order bool) WithOption { return f } - - func OptionsCompressed(compressed bool) WithOption { f := WithOption(func(opt Options) { opt.pointer.compressed = compressed @@ -271,6 +275,14 @@ func OptionsFastSeqHeaderFormat(format func(*obiseq.BioSequence) string) WithOpt return f } +func OptionsSequenceFormater(formater FormatSeqBatch) WithOption { + f := WithOption(func(opt Options) { + opt.pointer.seqBatchFormater = formater + }) + + return f +} + func OptionsParallelWorkers(nworkers int) WithOption { f := WithOption(func(opt Options) { opt.pointer.parallel_workers = nworkers diff --git a/pkg/obiformats/seqfile_chunck_read.go b/pkg/obiformats/seqfile_chunk_read.go similarity index 85% rename from pkg/obiformats/seqfile_chunck_read.go rename to pkg/obiformats/seqfile_chunk_read.go index 2eb57c4..fd428bd 100644 --- a/pkg/obiformats/seqfile_chunck_read.go +++ b/pkg/obiformats/seqfile_chunk_read.go @@ -5,14 +5,18 @@ import ( "io" "slices" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" log "github.com/sirupsen/logrus" ) -var _FileChunkSize = 1 << 28 +var _FileChunkSize = 1024 * 1024 * 10 + +type SeqFileChunkParser func(string, io.Reader) (obiseq.BioSequenceSlice, error) type SeqFileChunk struct { - raw io.Reader - order int + Source string + Raw *bytes.Buffer + Order int } type ChannelSeqFileChunk chan SeqFileChunk @@ -32,7 +36,9 @@ type LastSeqRecord func([]byte) int // // Returns: // None -func ReadSeqFileChunk(reader io.Reader, +func ReadSeqFileChunk( + source string, + reader io.Reader, buff []byte, splitter LastSeqRecord) ChannelSeqFileChunk { var err error @@ -88,7 +94,7 @@ func ReadSeqFileChunk(reader io.Reader, if len(buff) > 0 { io := bytes.NewBuffer(slices.Clone(buff)) - chunk_channel <- SeqFileChunk{io, i} + chunk_channel <- SeqFileChunk{source, io, i} i++ } @@ -96,7 +102,7 @@ func ReadSeqFileChunk(reader io.Reader, buff = fullbuff[0:lremain] lcp := copy(buff, fullbuff[pnext:]) if lcp < lremain { - log.Fatalf("Error copying remaining data of chunck %d : %d < %d", i, lcp, lremain) + log.Fatalf("Error copying remaining data of chunk %d : %d < %d", i, lcp, lremain) } } else { buff = buff[:0] @@ -112,7 +118,7 @@ func ReadSeqFileChunk(reader io.Reader, // Send the last chunk to the channel if len(buff) > 0 { io := bytes.NewBuffer(slices.Clone(buff)) - chunk_channel <- SeqFileChunk{io, i} + chunk_channel <- SeqFileChunk{source, io, i} } // Close the readers channel when the end of the file is reached diff --git a/pkg/obiformats/seqfile_chunk_write.go b/pkg/obiformats/seqfile_chunk_write.go new file mode 100644 index 0000000..acc566a --- /dev/null +++ b/pkg/obiformats/seqfile_chunk_write.go @@ -0,0 +1,51 @@ +package obiformats + +import ( + "io" + + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" + + log "github.com/sirupsen/logrus" +) + +func WriteSeqFileChunk( + writer io.WriteCloser, + toBeClosed bool) ChannelSeqFileChunk { + + obiiter.RegisterAPipe() + + chunk_channel := make(ChannelSeqFileChunk) + + go func() { + nextToPrint := 0 + toBePrinted := make(map[int]SeqFileChunk) + for chunk := range chunk_channel { + if chunk.Order == nextToPrint { + _, _ = writer.Write(chunk.Raw.Bytes()) + nextToPrint++ + + chunk, ok := toBePrinted[nextToPrint] + for ok { + _, _ = writer.Write(chunk.Raw.Bytes()) + delete(toBePrinted, nextToPrint) + nextToPrint++ + chunk, ok = toBePrinted[nextToPrint] + } + } else { + toBePrinted[chunk.Order] = chunk + } + } + + if toBeClosed { + err := writer.Close() + if err != nil { + log.Fatalf("Cannot close the writer : %v", err) + } + } + + obiiter.UnregisterPipe() + log.Warnf("The writer has been closed") + }() + + return chunk_channel +} diff --git a/pkg/obiformats/universal_read.go b/pkg/obiformats/universal_read.go index 97d820d..2caea86 100644 --- a/pkg/obiformats/universal_read.go +++ b/pkg/obiformats/universal_read.go @@ -15,6 +15,8 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) +type SequenceReader func(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) + // OBIMimeTypeGuesser is a function that takes an io.Reader as input and guesses the MIME type of the data. // It uses several detectors to identify specific file formats, such as FASTA, FASTQ, ecoPCR2, GenBank, and EMBL. // The function reads data from the input stream and analyzes it using the mimetype library. @@ -172,11 +174,11 @@ func ReadSequencesFromFile(filename string, case "text/fasta": return ReadFasta(reader, options...) case "text/ecopcr2": - return ReadEcoPCR(reader, options...), nil + return ReadEcoPCR(reader, options...) case "text/embl": - return ReadEMBL(reader, options...), nil + return ReadEMBL(reader, options...) case "text/genbank": - return ReadGenbank(reader, options...), nil + return ReadGenbank(reader, options...) case "text/csv": return ReadCSV(reader, options...) default: diff --git a/pkg/obiiter/batch.go b/pkg/obiiter/batch.go index 17f53eb..5c9bdc1 100644 --- a/pkg/obiiter/batch.go +++ b/pkg/obiiter/batch.go @@ -3,50 +3,118 @@ package obiiter import "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" type BioSequenceBatch struct { - slice obiseq.BioSequenceSlice - order int + source string + slice obiseq.BioSequenceSlice + order int } -var NilBioSequenceBatch = BioSequenceBatch{nil, -1} +var NilBioSequenceBatch = BioSequenceBatch{"", nil, -1} -func MakeBioSequenceBatch(order int, +// MakeBioSequenceBatch creates a new BioSequenceBatch with the given source, order, and sequences. +// +// Parameters: +// - source: The source of the BioSequenceBatch. +// - order: The order of the BioSequenceBatch. +// - sequences: The slice of BioSequence. +// +// Returns: +// - BioSequenceBatch: The newly created BioSequenceBatch. +func MakeBioSequenceBatch( + source string, + order int, sequences obiseq.BioSequenceSlice) BioSequenceBatch { return BioSequenceBatch{ - slice: sequences, - order: order, + source: source, + slice: sequences, + order: order, } } +// Order returns the order of the BioSequenceBatch. +// +// Returns: +// - int: The order of the BioSequenceBatch. func (batch BioSequenceBatch) Order() int { return batch.order } +// Source returns the source of the BioSequenceBatch. +// +// Returns: +// - string: The source of the BioSequenceBatch. +func (batch BioSequenceBatch) Source() string { + return batch.source +} + +// Reorder updates the order of the BioSequenceBatch and returns the updated batch. +// +// Parameters: +// - newOrder: The new order value to assign to the BioSequenceBatch. +// +// Returns: +// - BioSequenceBatch: The updated BioSequenceBatch with the new order value. func (batch BioSequenceBatch) Reorder(newOrder int) BioSequenceBatch { batch.order = newOrder return batch } +// Slice returns the BioSequenceSlice contained within the BioSequenceBatch. +// +// Returns: +// - obiseq.BioSequenceSlice: The BioSequenceSlice contained within the BioSequenceBatch. func (batch BioSequenceBatch) Slice() obiseq.BioSequenceSlice { return batch.slice } +// Len returns the number of BioSequence elements in the given BioSequenceBatch. +// +// Parameters: +// - batch: The BioSequenceBatch to get the length from. +// +// Return type: +// - int: The number of BioSequence elements in the BioSequenceBatch. func (batch BioSequenceBatch) Len() int { return len(batch.slice) } +// NotEmpty returns whether the BioSequenceBatch is empty or not. +// +// It checks if the BioSequenceSlice contained within the BioSequenceBatch is not empty. +// +// Returns: +// - bool: True if the BioSequenceBatch is not empty, false otherwise. func (batch BioSequenceBatch) NotEmpty() bool { return batch.slice.NotEmpty() } +// Pop0 returns and removes the first element of the BioSequenceBatch. +// +// It does not take any parameters. +// It returns a pointer to a BioSequence object. func (batch BioSequenceBatch) Pop0() *obiseq.BioSequence { return batch.slice.Pop0() } +// IsNil checks if the BioSequenceBatch's slice is nil. +// +// This function takes a BioSequenceBatch as a parameter and returns a boolean value indicating whether the slice of the BioSequenceBatch is nil or not. +// +// Parameters: +// - batch: The BioSequenceBatch to check for nil slice. +// +// Returns: +// - bool: True if the BioSequenceBatch's slice is nil, false otherwise. func (batch BioSequenceBatch) IsNil() bool { return batch.slice == nil } +// Recycle cleans up the BioSequenceBatch by recycling its elements and resetting its slice. +// +// If including_seq is true, each element of the BioSequenceBatch's slice is recycled using the Recycle method, +// and then set to nil. If including_seq is false, each element is simply set to nil. +// +// This function does not return anything. func (batch BioSequenceBatch) Recycle(including_seq bool) { batch.slice.Recycle(including_seq) batch.slice = nil diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index b82ee2c..a9f28aa 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -424,9 +424,11 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { order := 0 iterator = iterator.SortBatches() buffer := obiseq.MakeBioSequenceSlice() + source := "" for iterator.Next() { seqs := iterator.Get() + source = seqs.Source() lc := seqs.Len() remains := lc i := 0 @@ -436,7 +438,7 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { remains = lc - to_push - i buffer = append(buffer, seqs.Slice()[i:(i+to_push)]...) if len(buffer) == size { - newIter.Push(MakeBioSequenceBatch(order, buffer)) + newIter.Push(MakeBioSequenceBatch(source, order, buffer)) log.Debugf("Rebatch #%d pushd", order) order++ buffer = obiseq.MakeBioSequenceSlice() @@ -447,7 +449,7 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { } log.Debug("End of the rebatch loop") if len(buffer) > 0 { - newIter.Push(MakeBioSequenceBatch(order, buffer)) + newIter.Push(MakeBioSequenceBatch(source, order, buffer)) log.Debugf("Final Rebatch #%d pushd", order) } @@ -526,12 +528,14 @@ func (iterator IBioSequence) DivideOn(predicate obiseq.SequencePredicate, trueOrder := 0 falseOrder := 0 iterator = iterator.SortBatches() + source := "" trueSlice := obiseq.MakeBioSequenceSlice() falseSlice := obiseq.MakeBioSequenceSlice() for iterator.Next() { seqs := iterator.Get() + source = seqs.Source() for _, s := range seqs.slice { if predicate(s) { trueSlice = append(trueSlice, s) @@ -540,13 +544,13 @@ func (iterator IBioSequence) DivideOn(predicate obiseq.SequencePredicate, } if len(trueSlice) == size { - trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice)) + trueIter.Push(MakeBioSequenceBatch(source, trueOrder, trueSlice)) trueOrder++ trueSlice = obiseq.MakeBioSequenceSlice() } if len(falseSlice) == size { - falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice)) + falseIter.Push(MakeBioSequenceBatch(source, falseOrder, falseSlice)) falseOrder++ falseSlice = obiseq.MakeBioSequenceSlice() } @@ -555,11 +559,11 @@ func (iterator IBioSequence) DivideOn(predicate obiseq.SequencePredicate, } if len(trueSlice) > 0 { - trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice)) + trueIter.Push(MakeBioSequenceBatch(source, trueOrder, trueSlice)) } if len(falseSlice) > 0 { - falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice)) + falseIter.Push(MakeBioSequenceBatch(source, falseOrder, falseSlice)) } trueIter.Done() @@ -686,17 +690,22 @@ func (iterator IBioSequence) FilterAnd(predicate obiseq.SequencePredicate, // Load all sequences availables from an IBioSequenceBatch iterator into // a large obiseq.BioSequenceSlice. -func (iterator IBioSequence) Load() obiseq.BioSequenceSlice { +func (iterator IBioSequence) Load() (string, obiseq.BioSequenceSlice) { + + chunk := obiseq.MakeBioSequenceSlice() + source := "" - chunck := obiseq.MakeBioSequenceSlice() for iterator.Next() { b := iterator.Get() + if source == "" { + source = b.Source() + } log.Debugf("append %d sequences", b.Len()) - chunck = append(chunck, b.Slice()...) + chunk = append(chunk, b.Slice()...) b.Recycle(false) } - return chunck + return source, chunk } // CompleteFileIterator generates a new iterator for reading a complete file. @@ -718,10 +727,10 @@ func (iterator IBioSequence) CompleteFileIterator() IBioSequence { }() go func() { - slice := iterator.Load() + source, slice := iterator.Load() log.Printf("A batch of %d sequence is read", len(slice)) if len(slice) > 0 { - newIter.Push(MakeBioSequenceBatch(0, slice)) + newIter.Push(MakeBioSequenceBatch(source, 0, slice)) } newIter.Done() }() @@ -735,7 +744,7 @@ func (iterator IBioSequence) CompleteFileIterator() IBioSequence { // It takes a slice of BioSequence objects, and returns an iterator that will return batches of // BioSequence objects -func IBatchOver(data obiseq.BioSequenceSlice, +func IBatchOver(source string, data obiseq.BioSequenceSlice, size int, sizes ...int) IBioSequence { newIter := MakeIBioSequence() @@ -755,7 +764,7 @@ func IBatchOver(data obiseq.BioSequenceSlice, if next > ldata { next = ldata } - newIter.Push(MakeBioSequenceBatch(batchid, data[i:next])) + newIter.Push(MakeBioSequenceBatch(source, batchid, data[i:next])) batchid++ } diff --git a/pkg/obiiter/distribute.go b/pkg/obiiter/distribute.go index ec540eb..4c61d58 100644 --- a/pkg/obiiter/distribute.go +++ b/pkg/obiiter/distribute.go @@ -61,9 +61,12 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz go func() { iterator = iterator.SortBatches() + source := "" for iterator.Next() { seqs := iterator.Get() + source = seqs.Source() + for _, s := range seqs.Slice() { key := class.Code(s) slice, ok := slices[key] @@ -84,7 +87,7 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz *slice = append(*slice, s) if len(*slice) == batchsize { - outputs[key].Push(MakeBioSequenceBatch(orders[key], *slice)) + outputs[key].Push(MakeBioSequenceBatch(source, orders[key], *slice)) orders[key]++ s := obiseq.MakeBioSequenceSlice() slices[key] = &s @@ -95,7 +98,7 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz for key, slice := range slices { if len(*slice) > 0 { - outputs[key].Push(MakeBioSequenceBatch(orders[key], *slice)) + outputs[key].Push(MakeBioSequenceBatch(source, orders[key], *slice)) } } diff --git a/pkg/obiiter/fragment.go b/pkg/obiiter/fragment.go index bbddd23..2c09013 100644 --- a/pkg/obiiter/fragment.go +++ b/pkg/obiiter/fragment.go @@ -20,9 +20,11 @@ func IFragments(minsize, length, overlap, size, nworkers int) Pipeable { }() f := func(iterator IBioSequence, id int) { + source := "" for iterator.Next() { news := obiseq.MakeBioSequenceSlice() sl := iterator.Get() + source = sl.Source() for _, s := range sl.Slice() { if s.Len() <= minsize { @@ -52,7 +54,7 @@ func IFragments(minsize, length, overlap, size, nworkers int) Pipeable { s.Recycle() } } // End of the slice loop - newiter.Push(MakeBioSequenceBatch(sl.Order(), news)) + newiter.Push(MakeBioSequenceBatch(source, sl.Order(), news)) sl.Recycle(false) } // End of the iterator loop diff --git a/pkg/obiiter/paired.go b/pkg/obiiter/paired.go index 862537b..d8c0574 100644 --- a/pkg/obiiter/paired.go +++ b/pkg/obiiter/paired.go @@ -9,9 +9,11 @@ func (b BioSequenceBatch) IsPaired() bool { } func (b BioSequenceBatch) PairedWith() BioSequenceBatch { - return MakeBioSequenceBatch(b.order, - *b.slice.PairedWith()) - + return MakeBioSequenceBatch( + b.Source(), + b.order, + *b.slice.PairedWith(), + ) } func (b *BioSequenceBatch) PairTo(p *BioSequenceBatch) { diff --git a/pkg/obilua/lua.go b/pkg/obilua/lua.go index 9cdf82c..1855e44 100644 --- a/pkg/obilua/lua.go +++ b/pkg/obilua/lua.go @@ -225,7 +225,7 @@ func LuaProcessor(iterator obiiter.IBioSequence, name, program string, breakOnEr } } - newIter.Push(obiiter.MakeBioSequenceBatch(seqs.Order(), ns)) + newIter.Push(obiiter.MakeBioSequenceBatch(seqs.Source(), seqs.Order(), ns)) seqs.Recycle(false) } diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index 07b092e..afafa1c 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -7,7 +7,7 @@ import ( // TODO: The version number is extracted from git. This induces that the version // corresponds to the last commit, and not the one when the file will be // commited -var _Commit = "2247c3b" +var _Commit = "bc1aaaf" var _Version = "" // Version returns the version of the obitools package. diff --git a/pkg/obitools/obiclean/obiclean.go b/pkg/obitools/obiclean/obiclean.go index de90af1..98530ea 100644 --- a/pkg/obitools/obiclean/obiclean.go +++ b/pkg/obitools/obiclean/obiclean.go @@ -57,7 +57,7 @@ func buildSamples(dataset obiseq.BioSequenceSlice, return samples } -func annotateOBIClean(dataset obiseq.BioSequenceSlice, +func annotateOBIClean(source string, dataset obiseq.BioSequenceSlice, sample map[string]*([]*seqPCR), tag, NAValue string) obiiter.IBioSequence { batchsize := 1000 @@ -91,7 +91,7 @@ func annotateOBIClean(dataset obiseq.BioSequenceSlice, return data, nil } - iter := obiiter.IBatchOver(dataset, batchsize) + iter := obiiter.IBatchOver(source, dataset, batchsize) riter := iter.MakeISliceWorker(annot, false) return riter @@ -288,7 +288,7 @@ func Weight(sequence *obiseq.BioSequence) map[string]int { func CLIOBIClean(itertator obiiter.IBioSequence) obiiter.IBioSequence { - db := itertator.Load() + source, db := itertator.Load() log.Infof("Sequence dataset of %d sequeences loaded\n", len(db)) @@ -365,7 +365,7 @@ func CLIOBIClean(itertator obiiter.IBioSequence) obiiter.IBioSequence { EmpiricalDistCsv(RatioTableFilename(), all_ratio) } - iter := annotateOBIClean(db, samples, SampleAttribute(), "NA") + iter := annotateOBIClean(source, db, samples, SampleAttribute(), "NA") if OnlyHead() { iter = iter.FilterOn(IsHead, 1000) diff --git a/pkg/obitools/obicleandb/obicleandb.go b/pkg/obitools/obicleandb/obicleandb.go index 6d19643..b93c3c6 100644 --- a/pkg/obitools/obicleandb/obicleandb.go +++ b/pkg/obitools/obicleandb/obicleandb.go @@ -274,11 +274,11 @@ func ICleanDB(itertator obiiter.IBioSequence) obiiter.IBioSequence { // obioptions.CLIParallelWorkers(), // ) - references := annotated.Load() + source, references := annotated.Load() mannwithney := MakeSequenceFamilyGenusWorker(references) - partof := obiiter.IBatchOver(references, + partof := obiiter.IBatchOver(source, references, obioptions.CLIBatchSize()) // genera_iterator, err := obichunk.ISequenceChunk( diff --git a/pkg/obitools/obiconsensus/obiconsensus.go b/pkg/obitools/obiconsensus/obiconsensus.go index 544b3bd..c4a147f 100644 --- a/pkg/obitools/obiconsensus/obiconsensus.go +++ b/pkg/obitools/obiconsensus/obiconsensus.go @@ -46,7 +46,12 @@ func BuildConsensus(seqs obiseq.BioSequenceSlice, if err == nil { defer fasta.Close() - fasta.Write(obiformats.FormatFastaBatch(obiiter.MakeBioSequenceBatch(0, seqs), obiformats.FormatFastSeqJsonHeader, false)) + fasta.Write(obiformats.FormatFastaBatch(obiiter.MakeBioSequenceBatch( + fmt.Sprintf("%s_consensus", consensus_id), + 0, + seqs, + ), + obiformats.FormatFastSeqJsonHeader, false).Bytes()) fasta.Close() } @@ -333,7 +338,7 @@ func CLIOBIMinion(itertator obiiter.IBioSequence) obiiter.IBioSequence { dirname := CLIGraphFilesDirectory() newIter := obiiter.MakeIBioSequence() - db := itertator.Load() + source, db := itertator.Load() log.Infof("Sequence dataset of %d sequeences loaded\n", len(db)) @@ -394,7 +399,7 @@ func CLIOBIMinion(itertator obiiter.IBioSequence) obiiter.IBioSequence { CLISampleAttribute(), CLIKmerSize()) - newIter.Push(obiiter.MakeBioSequenceBatch(sample_order, denoised)) + newIter.Push(obiiter.MakeBioSequenceBatch(source, sample_order, denoised)) sample_order++ } diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index 3489236..e545004 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -14,7 +14,7 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" ) -func _ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { +func ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { var err error list_of_files := orderedset.NewOrderedSet() for _, fn := range filenames { @@ -39,7 +39,7 @@ func _ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { if info.IsDir() { if path != fn { - subdir, e := _ExpandListOfFiles(true, path) + subdir, e := ExpandListOfFiles(true, path) if e != nil { return e } @@ -113,19 +113,26 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { log.Printf("Reading sequences from stdin in %s\n", CLIInputFormat()) opts = append(opts, obiformats.OptionsSource("stdin")) + var err error + switch CLIInputFormat() { case "ecopcr": - iterator = obiformats.ReadEcoPCR(os.Stdin, opts...) + iterator, err = obiformats.ReadEcoPCR(os.Stdin, opts...) case "embl": - iterator = obiformats.ReadEMBL(os.Stdin, opts...) + iterator, err = obiformats.ReadEMBL(os.Stdin, opts...) case "genbank": - iterator = obiformats.ReadGenbank(os.Stdin, opts...) + iterator, err = obiformats.ReadGenbank(os.Stdin, opts...) default: iterator = obiformats.ReadFastSeqFromStdin(opts...) } + + if err != nil { + return obiiter.NilIBioSequence, err + } + } else { - list_of_files, err := _ExpandListOfFiles(false, filenames...) + list_of_files, err := ExpandListOfFiles(false, filenames...) if err != nil { return obiiter.NilIBioSequence, err } diff --git a/pkg/obitools/obijoin/join.go b/pkg/obitools/obijoin/join.go index 1f087ab..a34bdff 100644 --- a/pkg/obitools/obijoin/join.go +++ b/pkg/obitools/obijoin/join.go @@ -129,7 +129,7 @@ func CLIJoinSequences(iterator obiiter.IBioSequence) obiiter.IBioSequence { log.Fatalf("Cannot read the data file to merge with: %s %v", CLIJoinWith(), err) } - data := data_iter.Load() + _, data := data_iter.Load() keys := CLIBy() diff --git a/pkg/obitools/obilandmark/obilandmark.go b/pkg/obitools/obilandmark/obilandmark.go index c2ba2ab..1700def 100644 --- a/pkg/obitools/obilandmark/obilandmark.go +++ b/pkg/obitools/obilandmark/obilandmark.go @@ -103,7 +103,7 @@ func MapOnLandmarkSequences(library obiseq.BioSequenceSlice, landmark_idx []int, // which landmark it corresponds. func CLISelectLandmarkSequences(iterator obiiter.IBioSequence) obiiter.IBioSequence { - library := iterator.Load() + source, library := iterator.Load() library_size := len(library) n_landmark := CLINCenter() @@ -191,6 +191,6 @@ func CLISelectLandmarkSequences(iterator obiiter.IBioSequence) obiiter.IBioSeque } } - return obiiter.IBatchOver(library, obioptions.CLIBatchSize()) + return obiiter.IBatchOver(source, library, obioptions.CLIBatchSize()) } diff --git a/pkg/obitools/obipairing/pairing.go b/pkg/obitools/obipairing/pairing.go index 0ceb7be..1c5a13b 100644 --- a/pkg/obitools/obipairing/pairing.go +++ b/pkg/obitools/obipairing/pairing.go @@ -255,6 +255,7 @@ func IAssemblePESequencesBatch(iterator obiiter.IBioSequence, delta, minOverlap, minIdentity, withStats, true, fastAlign, fastModeRel, arena, &shifts) } newIter.Push(obiiter.MakeBioSequenceBatch( + batch.Source(), batch.Order(), cons, )) diff --git a/pkg/obitools/obirefidx/famlilyindexing.go b/pkg/obitools/obirefidx/famlilyindexing.go index 3e55f93..7f3637f 100644 --- a/pkg/obitools/obirefidx/famlilyindexing.go +++ b/pkg/obitools/obirefidx/famlilyindexing.go @@ -130,7 +130,7 @@ func MakeIndexingSliceWorker(indexslot, idslot string, func IndexFamilyDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { log.Infoln("Family level reference database indexing...") log.Infoln("Loading database...") - references := iterator.Load() + source, references := iterator.Load() nref := len(references) log.Infof("Done. Database contains %d sequences", nref) @@ -154,7 +154,7 @@ func IndexFamilyDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { log.Info("done") - partof := obiiter.IBatchOver(references, + partof := obiiter.IBatchOver(source, references, obioptions.CLIBatchSize()).MakeIWorker(taxonomy.MakeSetSpeciesWorker(), false, obioptions.CLIParallelWorkers(), @@ -243,7 +243,7 @@ func IndexFamilyDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { waiting.Wait() - results := obiiter.IBatchOver(references, + results := obiiter.IBatchOver(source, references, obioptions.CLIBatchSize()).Speed("Writing db", nref) return results diff --git a/pkg/obitools/obirefidx/obirefidx.go b/pkg/obitools/obirefidx/obirefidx.go index 74fcb90..a756217 100644 --- a/pkg/obitools/obirefidx/obirefidx.go +++ b/pkg/obitools/obirefidx/obirefidx.go @@ -125,7 +125,7 @@ func IndexSequence(seqidx int, func IndexReferenceDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { log.Infoln("Loading database...") - references := iterator.Load() + source, references := iterator.Load() log.Infof("Done. Database contains %d sequences", len(references)) taxo, error := obifind.CLILoadSelectedTaxonomy() @@ -204,7 +204,7 @@ func IndexReferenceDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { sl = append(sl, iref) bar.Add(1) } - indexed.Push(obiiter.MakeBioSequenceBatch(l[0]/10, sl)) + indexed.Push(obiiter.MakeBioSequenceBatch(source, l[0]/10, sl)) } indexed.Done() diff --git a/pkg/obitools/obitag/options.go b/pkg/obitools/obitag/options.go index f9dbb2a..ce5d233 100644 --- a/pkg/obitools/obitag/options.go +++ b/pkg/obitools/obitag/options.go @@ -57,7 +57,9 @@ func CLIRefDB() obiseq.BioSequenceSlice { log.Panicf("Cannot open the reference library file : %s\n", _RefDB) } - return refdb.Load() + _, db := refdb.Load() + + return db } func CLIGeometricMode() bool { @@ -70,7 +72,7 @@ func CLIShouldISaveRefDB() bool { func CLISaveRefetenceDB(db obiseq.BioSequenceSlice) { if CLIShouldISaveRefDB() { - idb := obiiter.IBatchOver(db, 1000) + idb := obiiter.IBatchOver("", db, 1000) var newIter obiiter.IBioSequence diff --git a/pkg/obitools/obitag2/options.go b/pkg/obitools/obitag2/options.go index 74bdc58..accebb7 100644 --- a/pkg/obitools/obitag2/options.go +++ b/pkg/obitools/obitag2/options.go @@ -57,7 +57,9 @@ func CLIRefDB() obiseq.BioSequenceSlice { log.Panicf("Cannot open the reference library file : %s\n", _RefDB) } - return refdb.Load() + _, db := refdb.Load() + + return db } func CLIGeometricMode() bool { @@ -70,7 +72,7 @@ func CLIShouldISaveRefDB() bool { func CLISaveRefetenceDB(db obiseq.BioSequenceSlice) { if CLIShouldISaveRefDB() { - idb := obiiter.IBatchOver(db, 1000) + idb := obiiter.IBatchOver("", db, 1000) var newIter obiiter.IBioSequence