diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go
index ab4634c..d2e16bd 100644
--- a/pkg/obiformats/embl_read.go
+++ b/pkg/obiformats/embl_read.go
@@ -15,13 +15,6 @@ import (
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
 )
 
-var _FileChunkSize = 1 << 28
-
-type _FileChunk struct {
-	raw   io.Reader
-	order int
-}
-
 // _EndOfLastEntry finds the index of the last entry in the given byte slice 'buff'
 // using a pattern match of the form:
 // ?//?
@@ -94,7 +87,7 @@ func _EndOfLastEntry(buff []byte) int {
 	return -1
 }
 
-func _ParseEmblFile(source string, input <-chan _FileChunk,
+func _ParseEmblFile(source string, input ChannelSeqFileChunk,
 	out obiiter.IBioSequence,
 	withFeatureTable bool,
 	batch_size int,
@@ -170,115 +163,28 @@ func _ParseEmblFile(source string, input <-chan _FileChunk,
 
 }
 
-// _ReadFlatFileChunk reads a chunk of data from the given 'reader' and sends it to the
-// 'readers' channel as a _FileChunk struct. The function reads from the reader until
-// the end of the last entry is found, then sends the chunk to the channel. If the end
-// of the last entry is not found in the current chunk, the function reads from the reader
-// in 1 MB increments until the end of the last entry is found. The function repeats this
-// process until the end of the file is reached.
-//
-// Arguments:
-// reader io.Reader - an io.Reader to read data from
-// readers chan _FileChunk - a channel to send the data as a _FileChunk struct
-//
-// Returns:
-// None
-func _ReadFlatFileChunk(reader io.Reader, readers chan _FileChunk) {
-	var err error
-	var buff []byte
-
-	size := 0
-	l := 0
-	i := 0
-
-	// Initialize the buffer to the size of a chunk of data
-	buff = make([]byte, _FileChunkSize)
-
-	// Read from the reader until the buffer is full or the end of the file is reached
-	l, err = io.ReadFull(reader, buff)
-	buff = buff[:l]
-
-	if err == io.ErrUnexpectedEOF {
-		err = nil
-	}
-
-	// Read from the reader until the end of the last entry is found or the end of the file is reached
-	for err == nil {
-
-		// Create an extended buffer to read from if the end of the last entry is not found in the current buffer
-		extbuff := make([]byte, _FileChunkSize)
-		end := 0
-		ic := 0
-
-		// Read from the reader in 1 MB increments until the end of the last entry is found
-		for end = _EndOfLastEntry(buff); err == nil && end < 0; end = _EndOfLastEntry(buff) {
-			ic++
-			size, err = io.ReadFull(reader, extbuff)
-			buff = append(buff, extbuff[:size]...)
-		}
-
-		if len(buff) > 0 {
-			if end < 0 {
-				end = len(buff)
-			}
-			lremain := len(buff) - end
-			remains := make([]byte, max(lremain, _FileChunkSize))
-
-			lcp := copy(remains, buff[end:])
-			remains = remains[:lcp]
-			if lcp < lremain {
-				log.Fatalf("Error copying remaining data of chunck %d : %d < %d", i, lcp, len(remains))
-			}
-
-			buff = buff[:end]
-
-			for len(buff) > 0 && (buff[len(buff)-1] == '\n' || buff[len(buff)-1] == '\r') {
-				buff = buff[:len(buff)-1]
-			}
-
-			if len(buff) > 0 {
-				io := bytes.NewBuffer(buff)
-
-				if string(buff[io.Len()-2:]) != "//" {
-					log.Fatalf("File chunck ends with 3 bytes : %s", io.Bytes()[io.Len()-3:])
-				}
-
-				readers <- _FileChunk{io, i}
-				i++
-				buff = remains
-			}
-		}
-	}
-
-	// Close the readers channel when the end of the file is reached
-	close(readers)
-
-}
-
 // 6 5 43 2 1
 //
 // ?//?
 func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
 	opt := MakeOptions(options)
 
-	entry_channel := make(chan _FileChunk)
+	entry_channel := ReadSeqFileChunk(reader, _EndOfLastEntry)
 
 	newIter := obiiter.MakeIBioSequence()
 
 	nworkers := opt.ParallelWorkers()
-	newIter.Add(nworkers)
-
-	go func() {
-		newIter.WaitAndClose()
-	}()
 
 	// for j := 0; j < opt.ParallelWorkers(); j++ {
 	for j := 0; j < nworkers; j++ {
+		newIter.Add(1)
 		go _ParseEmblFile(opt.Source(), entry_channel, newIter,
 			opt.WithFeatureTable(),
 			opt.BatchSize(),
 			opt.TotalSeqSize())
 	}
 
-	go _ReadFlatFileChunk(reader, entry_channel)
+	go func() {
+		newIter.WaitAndClose()
+	}()
 
 	if opt.pointer.full_file_batch {
 		newIter = newIter.CompleteFileIterator()
diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go
index 7a3e642..95fa162 100644
--- a/pkg/obiformats/fastaseq_read.go
+++ b/pkg/obiformats/fastaseq_read.go
@@ -1,250 +1,155 @@
 package obiformats
 
 import (
+	"bufio"
 	"bytes"
-	"fmt"
 	"io"
 	"os"
 	"path"
+	"slices"
 
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
-	"golang.org/x/exp/slices"
 
 	log "github.com/sirupsen/logrus"
 )
 
-// lastFastaCut extracts the up to the last sequence cut from a given buffer.
-//
-// It takes a parameter:
-// - buffer []byte: the buffer to extract the sequence cut from.
-//
-// It returns two values:
-// - []byte: the extracted sequences.
-// - []byte: the remaining buffer after the sequence cut (the last sequence).
-func lastFastaCut(buffer []byte) ([]byte, []byte) {
+func _EndOfLastFastaEntry(buffer []byte) int {
+	var i int
+
 	imax := len(buffer)
 	last := 0
 	state := 0
-	for i := imax - 1; i >= 0 && state < 2; i-- {
-		if state == 0 && buffer[i] == '>' {
+
+	for i = imax - 1; i >= 0 && state < 2; i-- {
+		C := buffer[i]
+		if C == '>' && state == 0 {
 			state = 1
 			last = i
-		} else if state == 1 && (buffer[i] == '\r' || buffer[i] == '\n') {
+		} else if state == 1 && (C == '\n' || C == '\r') {
 			state = 2
 		} else {
 			state = 0
 		}
 	}
 
-	if state == 2 {
-		return buffer[:last], bytes.Clone(buffer[last:])
+	if i == 0 || state != 2 {
+		return -1
 	}
-	return []byte{}, buffer
+	return last
 }
 
-// firstFastaCut cuts the input buffer at the first occurrence of a ">" character
-// following a sequence of "\r" or "\n" characters.
-//
-// It takes a byte slice as input, representing the buffer to be cut.
-// It returns two byte slices: the first slice contains the part of the buffer before the cut,
-// and the second slice contains the part of the buffer after the cut.
-func firstFastaCut(buffer []byte) ([]byte, []byte) {
-	imax := len(buffer)
-	last := 0
-	state := 0
-	for i := 0; i < imax && state < 2; i++ {
-		if (state == 0 || state == 1) && (buffer[i] == '\r' || buffer[i] == '\n') {
-			state = 1
-		} else if (state == 1 || i == 0) && buffer[i] == '>' {
-			state = 2
-			last = i
-		} else {
-			state = 0
-		}
-	}
+func _ParseFastaFile(source string,
+	input ChannelSeqFileChunk,
+	out obiiter.IBioSequence) {
 
-	if state == 2 {
-		return bytes.Clone(buffer[:last]), buffer[last:]
-	}
-	return buffer, []byte{}
-
-}
-
-func Concatenate[S ~[]E, E any](s1, s2 S) S {
-	if len(s1) > 0 {
-		if len(s2) > 0 {
-			return append(s1[:len(s1):len(s1)], s2...)
-		}
-		return s1
-	}
-	return s2
-}
-
-type FastxChunk struct {
-	Bytes []byte
-	index int
-}
-
-func FastaChunkReader(r io.Reader, size int, cutHead bool) (chan FastxChunk, error) {
-	out := make(chan FastxChunk)
-	buff := make([]byte, size)
-
-	n, err := io.ReadFull(r, buff)
-
-	if err == io.ErrUnexpectedEOF {
-		err = nil
-	}
-
-	if n > 0 && err == nil {
-		if n < size {
-			buff = buff[:n]
-		}
-
-		begin, buff := firstFastaCut(buff)
-
-		if len(begin) > 0 && !cutHead {
-			return out, fmt.Errorf("begin is not empty : %s", string(begin))
-		}
-
-		go func(buff []byte) {
-			idx := 0
-			end := []byte{}
-
-			for err == nil && n > 0 {
-				buff = Concatenate(end, buff)
-				buff, end = lastFastaCut(buff)
-				if len(buff) > 0 {
-					out <- FastxChunk{
-						Bytes: bytes.Clone(buff),
-						index: idx,
-					}
-					idx++
-				} else {
-					size = size * 2
-				}
-
-				buff = slices.Grow(buff[:0], size)[0:size]
-				n, err = io.ReadFull(r, buff)
-				if n < size {
-					buff = buff[:n]
-				}
-
-				if err == io.ErrUnexpectedEOF {
-					err = nil
-				}
-
-				// fmt.Printf("n = %d, err = %v\n", n, err)
-			}
-
-			if len(end) > 0 {
-				out <- FastxChunk{
-					Bytes: bytes.Clone(end),
-					index: idx,
-				}
-			}
-
-			close(out)
-		}(buff)
-	}
-
-	return out, nil
-}
-
-func ParseFastaChunk(source string, ch FastxChunk) *obiiter.BioSequenceBatch {
-	slice := make(obiseq.BioSequenceSlice, 0, obioptions.CLIBatchSize())
-
-	state := 0
-	start := 0
-	current := 0
 	var identifier string
 	var definition string
 
-	for i := 0; i < len(ch.Bytes); i++ {
-		C := ch.Bytes[i]
-		is_end_of_line := C == '\r' || C == '\n'
-		is_space := C == ' ' || C == '\t'
-		is_sep := is_space || is_end_of_line
+	state := 0
 
-		switch state {
-		case 0:
-			if C == '>' {
-				// Beginning of sequence
-				state = 1
-			}
-		case 1:
-			if is_sep {
-				// No identifier -> ERROR
-				log.Errorf("%s : sequence entry does not have an identifier", source)
-				return nil
-			} else {
-				// Beginning of identifier
-				state = 2
-				start = i
-			}
-		case 2:
-			if is_sep {
-				// End of identifier
-				identifier = string(ch.Bytes[start:i])
-				state = 3
-			}
-			if is_end_of_line {
-				// Definition empty
-				definition = ""
-				state = 5
-			}
-		case 3:
-			if is_end_of_line {
-				// Definition empty
-				definition = ""
-				state = 5
-			} else if !is_space {
-				// Beginning of definition
-				start = i
-				state = 4
-			}
-		case 4:
-			if is_end_of_line {
-				definition = string(ch.Bytes[start:i])
-				state = 5
-
-			}
-		case 5:
-			if !is_end_of_line {
-				// Beginning of sequence
-				start = i
-				if C >= 'A' && C <= 'Z' {
-					ch.Bytes[current] = C + 'a' - 'A'
+	idBytes := new(bytes.Buffer)
+	defBytes := new(bytes.Buffer)
+	seqBytes := new(bytes.Buffer)
+
+	for chunks := range input {
+		scanner := bufio.NewReader(chunks.raw)
+		sequences := make(obiseq.BioSequenceSlice, 0, 100)
+		for C, err := scanner.ReadByte(); err != io.EOF; C, err = scanner.ReadByte() {
+
+			is_end_of_line := C == '\r' || C == '\n'
+			is_space := C == ' ' || C == '\t'
+			is_sep := is_space || is_end_of_line
+
+			switch state {
+			case 0:
+				if C == '>' {
+					// Beginning of sequence
+					state = 1
 				}
-				current = i + 1
-				state = 6
-			}
-		case 6:
-			if C == '>' {
-				// End of sequence
-				s := obiseq.NewBioSequence(identifier, bytes.Clone(ch.Bytes[start:current]), definition)
-				s.SetSource(source)
-				slice = append(slice, s)
-				state = 1
-
-			} else if !is_sep {
-				if C >= 'A' && C <= 'Z' {
-					C = C + 'a' - 'A'
+			case 1:
+				if is_sep {
+					// No identifier -> ERROR
+					log.Errorf("%s : sequence entry does not have an identifier", source)
+				} else {
+					// Beginning of identifier
+					idBytes.Reset()
+					state = 2
+					idBytes.WriteByte(C)
 				}
-				// Removing white space from the sequence
-				if (C >= 'a' && C <= 'z') || C == '-' || C == '.' || C == '[' || C == ']' {
-					ch.Bytes[current] = C
-					current++
+			case 2:
+				if is_sep {
+					// End of identifier
+					identifier = idBytes.String()
+					idBytes.Reset()
+					state = 3
+				} else {
+					idBytes.WriteByte(C)
+				}
+				if is_end_of_line {
+					// Definition empty
+					definition = ""
+					state = 5
+				}
+			case 3:
+				if is_end_of_line {
+					// Definition empty
+					definition = ""
+					state = 5
+				} else if !is_space {
+					// Beginning of definition
+					defBytes.Reset()
+					defBytes.WriteByte(C)
+					state = 4
+				}
+			case 4:
+				if is_end_of_line {
+					definition = defBytes.String()
+					state = 5
+				} else {
+					defBytes.WriteByte(C)
+				}
+			case 5:
+				if !is_end_of_line {
+					// Beginning of sequence
+					seqBytes.Reset()
+					if C >= 'A' && C <= 'Z' {
+						C = C + 'a' - 'A'
+					}
+
+					if (C >= 'a' && C <= 'z') || C == '-' || C == '.' || C == '[' || C == ']' {
+						seqBytes.WriteByte(C)
+					}
+					state = 6
+				}
+			case 6:
+				if C == '>' {
+					// End of sequence
+					s := obiseq.NewBioSequence(identifier, slices.Clone(seqBytes.Bytes()), definition)
+					s.SetSource(source)
+					sequences = append(sequences, s)
+					state = 1
+
+				} else if !is_sep {
+					if C >= 'A' && C <= 'Z' {
+						C = C + 'a' - 'A'
+					}
+					// Removing white space from the sequence
+					if (C >= 'a' && C <= 'z') || C == '-' || C == '.' || C == '[' || C == ']' {
+						seqBytes.WriteByte(C)
+					}
 				}
 			}
 		}
 
+		// ReadSeqFileChunk trims the chunk's trailing end of line, so the
+		// last record of a chunk ends at EOF while the parser is still in
+		// the sequence state: flush it explicitly.
+		if state == 6 && seqBytes.Len() > 0 {
+			s := obiseq.NewBioSequence(identifier, slices.Clone(seqBytes.Bytes()), definition)
+			s.SetSource(source)
+			sequences = append(sequences, s)
+			state = 0
+		}
+
+		if len(sequences) > 0 {
+			log.Debugln("Pushing sequences")
+			out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences))
+		}
 	}
 
-	slice = append(slice, obiseq.NewBioSequence(identifier, bytes.Clone(ch.Bytes[start:current]), definition))
-	batch := obiiter.MakeBioSequenceBatch(ch.index, slice)
-	return &batch
+	out.Done()
+
 }
 
 func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
@@ -254,35 +159,21 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
 	source := opt.Source()
 
 	nworker := obioptions.CLIReadParallelWorkers()
-	out.Add(nworker)
+	chkchan := ReadSeqFileChunk(reader, _EndOfLastFastaEntry)
 
-	chkchan, err := FastaChunkReader(reader, 1024*500, false)
-
-	if err != nil {
-		return obiiter.NilIBioSequence, err
+	for i := 0; i < nworker; i++ {
+		out.Add(1)
+		go _ParseFastaFile(source, chkchan, out)
 	}
 
 	go func() {
 		out.WaitAndClose()
 	}()
 
-	parser := func() {
-		defer out.Done()
-		for chk := range chkchan {
-			seqs := ParseFastaChunk(source, chk)
-			if seqs != nil {
-				out.Push(*seqs)
-			}
-		}
-	}
-
-	for i := 0; i < nworker; i++ {
-		go parser()
-	}
-
 	newIter := out.SortBatches().Rebatch(opt.BatchSize())
 
 	log.Debugln("Full file batch mode : ", opt.FullFileBatch())
+
 	if opt.FullFileBatch() {
 		newIter = newIter.CompleteFileIterator()
 	}
diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go
index b5ba76d..1c6c22c 100644
--- a/pkg/obiformats/fastqseq_read.go
+++ b/pkg/obiformats/fastqseq_read.go
@@ -1,19 +1,105 @@
 package obiformats
 
 import (
+	"bufio"
 	"bytes"
 	"io"
 	"os"
 	"path"
+	"slices"
 
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
 	log "github.com/sirupsen/logrus"
-	"golang.org/x/exp/slices"
 )
 
+func _EndOfLastFastqEntry(buffer []byte) int {
+	var i int
+
+	imax := len(buffer)
+	state := 0
+	restart := imax - 1
+	cut := imax
+
+	for i = imax - 1; i >= 0 && state < 7; i-- {
+		C := buffer[i]
+		is_end_of_line := C == '\r' || C == '\n'
+		is_space := C == ' ' || C == '\t'
+		is_sep := is_space || is_end_of_line
+
+		switch state {
+		case 0:
+			if C == '+' {
+				// Potential start of quality part step 1
+				state = 1
+				restart = i
+			}
+		case 1:
+			if is_end_of_line {
+				// Potential start of quality part step 2
+				state = 2
+			} else {
+				// it was not the start of quality part
+				state = 0
+				i = restart
+			}
+		case 2:
+			if is_sep {
+				// Potential start of quality part step 2 (stay in the same state)
+				state = 2
+			} else if (C >= 'a' && C <= 'z') || (C >= 'A' && C <= 'Z') || C == '-' || C == '.' || C == '[' || C == ']' {
+				// End of the sequence
+				state = 3
+			} else {
+				// it was not the start of quality part
+				state = 0
+				i = restart
+			}
+		case 3:
+			if is_end_of_line {
+				// Entering the header line
+				state = 4
+			} else if (C >= 'a' && C <= 'z') || (C >= 'A' && C <= 'Z') || C == '-' || C == '.' || C == '[' || C == ']' {
+				// progressing along the sequence
+				state = 3
+			} else {
+				// it was not the sequence part
+				state = 0
+				i = restart
+			}
+		case 4:
+			if is_end_of_line {
+				state = 4
+			} else {
+				state = 5
+			}
+		case 5:
+			if is_end_of_line {
+				// It was not the header line
+				state = 0
+				i = restart
+			} else if C == '@' {
+				state = 6
+				cut = i
+			}
+		case 6:
+			if is_end_of_line {
+				state = 7
+			} else {
+				state = 0
+				i = restart
+			}
+		}
+	}
+
+	if i == 0 || state != 7 {
+		return -1
+	}
+	return cut
+}
+
 func lastFastqCut(buffer []byte) ([]byte, []byte) {
 	imax := len(buffer)
 	cut := imax
@@ -95,212 +181,154 @@ func lastFastqCut(buffer []byte) ([]byte, []byte) {
 	return []byte{}, buffer
 }
 
-func FastqChunkReader(r io.Reader, size int) (chan FastxChunk, error) {
-	out := make(chan FastxChunk)
-	buff := make([]byte, size)
+func _ParseFastqFile(source string,
+	input ChannelSeqFileChunk,
+	out obiiter.IBioSequence,
+	quality_shift byte) {
 
-	n, err := io.ReadFull(r, buff)
-
-	if err == io.ErrUnexpectedEOF {
-		err = nil
-	}
-
-	if n > 0 && err == nil {
-		if n < size {
-			buff = buff[:n]
-		}
-
-		go func(buff []byte) {
-			idx := 0
-			end := []byte{}
-
-			for err == nil && n > 0 {
-				buff = Concatenate(end, buff)
-				// fmt.Println("------------buff--pasted----------------")
-				// fmt.Println(string(buff))
-				buff, end = lastFastqCut(buff)
-				// fmt.Println("----------------buff--cutted------------")
-				// fmt.Println(string(buff))
-				// fmt.Println("------------------end-------------------")
-				// fmt.Println(string(end))
-				// fmt.Println("========================================")
-				if len(buff) > 0 {
-					out <- FastxChunk{
-						Bytes: bytes.Clone(buff),
-						index: idx,
-					}
-					idx++
-				} else {
-					size = size * 2
-				}
-
-				buff = slices.Grow(buff[:0], size)[0:size]
-				n, err = io.ReadFull(r, buff)
-				if n < size {
-					buff = buff[:n]
-				}
-
-				if err == io.ErrUnexpectedEOF {
-					err = nil
-				}
-				// fmt.Printf("n = %d, err = %v\n", n, err)
-			}
-
-			if len(end) > 0 {
-				out <- FastxChunk{
-					Bytes: bytes.Clone(end),
-					index: idx,
-				}
-			}
-
-			close(out)
-		}(buff)
-	}
-
-	return out, nil
-}
-
-func ParseFastqChunk(source string, ch FastxChunk, quality_shift byte) *obiiter.BioSequenceBatch {
-	slice := make(obiseq.BioSequenceSlice, 0, obioptions.CLIBatchSize())
-
-	state := 0
-	start := 0
-	current := 0
 	var identifier string
 	var definition string
 
-	for i := 0; i < len(ch.Bytes); i++ {
-		C := ch.Bytes[i]
-		is_end_of_line := C == '\r' || C == '\n'
-		is_space := C == ' ' || C == '\t'
-		is_sep := is_space || is_end_of_line
+	state := 0
 
-		// log.Infof("%s : state = %d pos = %d character = %c (%d)", source, state, i, C, C)
+	idBytes := new(bytes.Buffer)
+	defBytes := new(bytes.Buffer)
+	qualBytes := new(bytes.Buffer)
+	seqBytes := new(bytes.Buffer)
 
-		switch state {
-		case 0: // Beginning of sequence chunk must start with @
+	for chunks := range input {
+		scanner := bufio.NewReader(chunks.raw)
+		sequences := make(obiseq.BioSequenceSlice, 0, 100)
+		for C, err := scanner.ReadByte(); err != io.EOF; C, err = scanner.ReadByte() {
 
-			if C == '@' {
-				// Beginning of sequence
-				state = 1
-			} else {
-				log.Errorf("%s : sequence entry is not starting with @", source)
-				return nil
-			}
-		case 1: // Beginning of identifier (Mandatory)
-			if is_sep {
-				// No identifier -> ERROR
-				log.Errorf("%s : sequence identifier is empty", source)
-				return nil
-			} else {
-				// Beginning of identifier
-				state = 2
-				start = i
-			}
-		case 2: // Following of the identifier
-			if is_sep {
-				// End of identifier
-				identifier = string(ch.Bytes[start:i])
-				state = 3
-			}
-			if is_end_of_line {
-				// Definition empty
-				definition = ""
-				state = 5
-			}
-		case 3: // Beginning of definition
-			if is_end_of_line {
-				// Definition empty
-				definition = ""
-				state = 5
-			} else if !is_space {
-				// Beginning of definition
-				start = i
-				state = 4
-			}
-		case 4: // Following of the definition
-			if is_end_of_line {
-				definition = string(ch.Bytes[start:i])
-				state = 5
-			}
-		case 5: // Beginning of sequence
-			if !is_end_of_line {
-				// Beginning of sequence
-				start = i
-				if C >= 'A' && C <= 'Z' {
-					ch.Bytes[current] = C + 'a' - 'A'
+			is_end_of_line := C == '\r' || C == '\n'
+			is_space := C == ' ' || C == '\t'
+			is_sep := is_space || is_end_of_line
+
+			switch state {
+			case 0: // Beginning of sequence chunk must start with @
+
+				if C == '@' {
+					// Beginning of sequence
+					state = 1
+				} else {
+					log.Errorf("%s : sequence entry is not starting with @", source)
 				}
-				current = i + 1
-				state = 6
-			}
-		case 6:
-			if is_end_of_line {
-				// End of sequence
-				s := obiseq.NewBioSequence(identifier, bytes.Clone(ch.Bytes[start:current]), definition)
-				s.SetSource(source)
-				slice = append(slice, s)
-				state = 7
-			} else {
-				if C >= 'A' && C <= 'Z' {
-					ch.Bytes[current] = C + 'a' - 'A'
+			case 1: // Beginning of identifier (Mandatory)
+				if is_sep {
+					// No identifier -> ERROR
+					log.Errorf("%s : sequence identifier is empty", source)
+				} else {
+					// Beginning of identifier
+					state = 2
+					idBytes.Reset()
+					idBytes.WriteByte(C)
+				}
+			case 2: // Following of the identifier
+				if is_sep {
+					// End of identifier
+					identifier = idBytes.String()
+					state = 3
+				} else {
+					idBytes.WriteByte(C)
+				}
+				if is_end_of_line {
+					// Definition empty
+					definition = ""
+					state = 5
+				}
+			case 3: // Beginning of definition
+				if is_end_of_line {
+					// Definition empty
+					definition = ""
+					state = 5
+				} else if !is_space {
+					// Beginning of definition
+					defBytes.Reset()
+					defBytes.WriteByte(C)
+					state = 4
+				}
+			case 4: // Following of the definition
+				if is_end_of_line {
+					definition = defBytes.String()
+					state = 5
+				} else {
+					defBytes.WriteByte(C)
+				}
+			case 5: // Beginning of sequence
+				if !is_end_of_line {
+					// Beginning of sequence
+					if C >= 'A' && C <= 'Z' {
+						C = C + 'a' - 'A'
+					}
+					seqBytes.Reset()
+					seqBytes.WriteByte(C)
+					state = 6
+				}
+			case 6:
+				if is_end_of_line {
+					// End of sequence
+					s := obiseq.NewBioSequence(identifier, slices.Clone(seqBytes.Bytes()), definition)
+					s.SetSource(source)
+					sequences = append(sequences, s)
+					state = 7
+				} else {
+					if C >= 'A' && C <= 'Z' {
+						C = C + 'a' - 'A'
+					}
+					seqBytes.WriteByte(C)
+				}
+			case 7:
+				if is_end_of_line {
+					state = 7
+				} else if C == '+' {
+					state = 8
+				} else {
+					log.Errorf("@%s[%s] : sequence data not followed by a line starting with '+' but with '%c'", identifier, source, C)
+				}
+			case 8:
+				if is_end_of_line {
+					state = 9
+				}
+			case 9:
+				if is_end_of_line {
+					state = 9
+				} else {
+					// beginning of quality
+					state = 10
+					qualBytes.Reset()
+					qualBytes.WriteByte(C)
+				}
+			case 10:
+				if is_end_of_line {
+					// End of quality
+					// Clone: qualBytes is reset and reused by the next record.
+					q := slices.Clone(qualBytes.Bytes())
+					if len(q) != sequences[len(sequences)-1].Len() {
+						log.Errorf("%s[%s] : sequence data and quality length not equal (%d/%d)",
+							identifier, source, len(q), sequences[len(sequences)-1].Len())
+					}
+					for i := 0; i < len(q); i++ {
+						q[i] = q[i] - quality_shift
+					}
+					sequences[len(sequences)-1].SetQualities(q)
+					state = 11
+				}
+			case 11:
+				if is_end_of_line {
+					state = 11
+				} else if C == '@' {
+					state = 1
+				} else {
+					log.Errorf("%s[%s] : sequence record not followed by a line starting with @", identifier, source)
 				}
-				current = i + 1
 			}
-		case 7:
-			if is_end_of_line {
-				state = 7
-			} else if C == '+' {
-				state = 8
-			} else {
-				log.Info(ch.Bytes[0:i])
-				log.Info(string(ch.Bytes[0:i]))
-				log.Info(C)
-				log.Errorf("@%s[%s] : sequence data not followed by a line starting with +", identifier, source)
-				return nil // Error
-			}
-		case 8:
-			if is_end_of_line {
-				state = 9
-			}
-		case 9:
-			if is_end_of_line {
-				state = 9
-			} else {
-				// beginning of quality
-				state = 10
-				start = i
-			}
-		case 10:
-			if is_end_of_line {
-				// End of quality
-				q := ch.Bytes[start:i]
-				if len(q) != slice[len(slice)-1].Len() {
-					log.Errorf("%s[%s] : sequence data and quality lenght not equal (%d/%d)",
-						identifier, source, len(q), slice[len(slice)-1].Len())
-					return nil // Error quality lenght not equal to sequence length
-				}
-				for i := 0; i < len(q); i++ {
-					q[i] = q[i] - quality_shift
-				}
-				slice[len(slice)-1].SetQualities(q)
-				state = 11
-			}
-		case 11:
-			if is_end_of_line {
-				state = 11
-			} else if C == '@' {
-				state = 1
-			} else {
-				log.Errorf("%s[%s] : sequence record not followed by a line starting with @", identifier, source)
-				return nil
-			}
 		}
 
+		// ReadSeqFileChunk trims the chunk's trailing end of line, so the
+		// last quality line ends at EOF while the parser is still in state
+		// 10: flush the pending qualities explicitly.
+		if state == 10 && len(sequences) > 0 {
+			q := slices.Clone(qualBytes.Bytes())
+			for i := 0; i < len(q); i++ {
+				q[i] = q[i] - quality_shift
+			}
+			sequences[len(sequences)-1].SetQualities(q)
+			state = 0
+		}
+
+		if len(sequences) > 0 {
+			log.Debugln("Pushing sequences")
+			out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences))
+		}
 	}
 
-	batch := obiiter.MakeBioSequenceBatch(ch.index, slice)
-	return &batch
+	out.Done()
+
 }
 
 func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
@@ -310,37 +338,22 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
 	source := opt.Source()
 
 	nworker := obioptions.CLIReadParallelWorkers()
-	out.Add(nworker)
+	chkchan := ReadSeqFileChunk(reader, _EndOfLastFastqEntry)
 
-	chkchan, err := FastqChunkReader(reader, 1024*500)
-
-	if err != nil {
-		return obiiter.NilIBioSequence, err
+	for i := 0; i < nworker; i++ {
+		out.Add(1)
+		go _ParseFastqFile(source, chkchan, out,
+			byte(obioptions.InputQualityShift()))
 	}
 
 	go func() {
 		out.WaitAndClose()
 	}()
 
-	parser := func() {
-		defer out.Done()
-		for chk := range chkchan {
-			seqs := ParseFastqChunk(source, chk, byte(obioptions.InputQualityShift()))
-			if seqs != nil {
-				out.Push(*seqs)
-			} else {
-				log.Fatalf("error parsing %s", source)
-			}
-		}
-	}
-
-	for i := 0; i < nworker; i++ {
-		go parser()
-	}
-
 	newIter := out.SortBatches().Rebatch(opt.BatchSize())
 
 	log.Debugln("Full file batch mode : ", opt.FullFileBatch())
+
 	if opt.FullFileBatch() {
 		newIter = newIter.CompleteFileIterator()
 	}
diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go
index d287521..e4dbbaf 100644
--- a/pkg/obiformats/genbank_read.go
+++ b/pkg/obiformats/genbank_read.go
@@ -30,7 +30,8 @@ const (
 var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp")
 
 func _ParseGenbankFile(source string,
-	input <-chan _FileChunk, out obiiter.IBioSequence,
+	input ChannelSeqFileChunk,
+	out obiiter.IBioSequence,
 	chunck_order func() int,
 	withFeatureTable bool,
 	batch_size int,
@@ -230,27 +231,31 @@ func _ParseGenbankFile(source string,
 
 func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
 	opt := MakeOptions(options)
-	entry_channel := make(chan _FileChunk)
+	// entry_channel := make(chan _FileChunk)
+	entry_channel := ReadSeqFileChunk(reader, _EndOfLastEntry)
 
 	newIter := obiiter.MakeIBioSequence()
 
 	nworkers := opt.ParallelWorkers()
 	chunck_order := obiutils.AtomicCounter()
 
-	newIter.Add(nworkers)
+
+	// for j := 0; j < opt.ParallelWorkers(); j++ {
+	for j := 0; j < nworkers; j++ {
+		newIter.Add(1)
+		go _ParseGenbankFile(opt.Source(),
+			entry_channel, newIter, chunck_order,
+			opt.WithFeatureTable(),
+			opt.BatchSize(),
+			opt.TotalSeqSize())
+	}
+
+	// go _ReadFlatFileChunk(reader, entry_channel)
 
 	go func() {
 		newIter.WaitAndClose()
 	}()
 
-	// for j := 0; j < opt.ParallelWorkers(); j++ {
-	for j := 0; j < nworkers; j++ {
-		go _ParseGenbankFile(opt.Source(), entry_channel, newIter, chunck_order,
-			opt.WithFeatureTable(), opt.BatchSize(), opt.TotalSeqSize())
-	}
-
-	go _ReadFlatFileChunk(reader, entry_channel)
-
-	if opt.pointer.full_file_batch {
+	if opt.FullFileBatch() {
 		newIter = newIter.CompleteFileIterator()
 	}
diff --git a/pkg/obiformats/seqfile_chunck_read.go b/pkg/obiformats/seqfile_chunck_read.go
new file mode 100644
index 0000000..cc4c864
--- /dev/null
+++ b/pkg/obiformats/seqfile_chunck_read.go
@@ -0,0 +1,121 @@
+package obiformats
+
+import (
+	"bytes"
+	"io"
+	"slices"
+
+	log "github.com/sirupsen/logrus"
+)
+
+var _FileChunkSize = 1 << 28
+
+type SeqFileChunk struct {
+	raw   io.Reader
+	order int
+}
+
+type ChannelSeqFileChunk chan SeqFileChunk
+
+type LastSeqRecord func([]byte) int
+
+// ReadSeqFileChunk reads a chunk of data from the given 'reader' and sends it
+// to the returned channel as a SeqFileChunk struct. The function reads from
+// the reader until the end of the last entry is found, then sends the chunk
+// to the channel. If the end of the last entry is not found in the current
+// chunk, the function grows the buffer by _FileChunkSize increments until the
+// end of the last entry is found. The function repeats this process until the
+// end of the file is reached.
+//
+// Arguments:
+//	reader io.Reader - an io.Reader to read data from
+//	splitter LastSeqRecord - a function returning the index at which the last
+//	entry of a buffer starts, or -1 if no entry boundary is found
+//
+// Returns:
+//	ChannelSeqFileChunk - the channel on which the SeqFileChunk structs are sent
+func ReadSeqFileChunk(reader io.Reader,
+	splitter LastSeqRecord) ChannelSeqFileChunk {
+	var err error
+	var fullbuff []byte
+	var buff []byte
+
+	chunk_channel := make(ChannelSeqFileChunk)
+
+	go func() {
+		size := 0
+		l := 0
+		i := 0
+
+		// Initialize the buffer to the size of a chunk of data
+		fullbuff = make([]byte, _FileChunkSize, _FileChunkSize*2)
+		buff = fullbuff
+
+		// Read from the reader until the buffer is full or the end of the file is reached
+		l, err = io.ReadFull(reader, buff)
+		buff = buff[:l]
+
+		if err == io.ErrUnexpectedEOF {
+			err = nil
+		}
+
+		// Read from the reader until the end of the last entry is found or the end of the file is reached
+		for err == nil {
+			// Create an extended buffer to read from if the end of the last entry is not found in the current buffer
+			end := 0
+			ic := 0
+
+			// Read from the reader in _FileChunkSize increments until the end of the last entry is found
+			for end = splitter(buff); err == nil && end < 0; end = splitter(buff) {
+				ic++
+				buff = slices.Grow(buff, _FileChunkSize)
+				l := len(buff)
+				extbuff := buff[l:(l + _FileChunkSize - 1)]
+				size, err = io.ReadFull(reader, extbuff)
+				buff = buff[0:(l + size)]
+			}
+
+			fullbuff = buff
+
+			if len(buff) > 0 {
+				if end < 0 {
+					end = len(buff)
+				}
+
+				pnext := end
+				lremain := len(buff) - pnext
+				buff = buff[:end]
+				for len(buff) > 0 && (buff[len(buff)-1] == '\n' || buff[len(buff)-1] == '\r') {
+					buff = buff[:len(buff)-1]
+				}
+
+				if len(buff) > 0 {
+					io := bytes.NewBuffer(slices.Clone(buff))
+					chunk_channel <- SeqFileChunk{io, i}
+					i++
+
+					// if string(buff[io.Len()-2:]) != "//" {
+					// 	log.Fatalf("File chunck ends with 3 bytes : %s", io.Bytes()[io.Len()-3:])
+					// }
+
+				}
+
+				if lremain > 0 {
+					buff = fullbuff[0:lremain]
+					lcp := copy(buff, fullbuff[pnext:])
+					if lcp < lremain {
+						log.Fatalf("Error copying remaining data of chunk %d : %d < %d", i, lcp, lremain)
+					}
+				} else {
+					buff = buff[:0]
+				}
+
+			}
+		}
+
+		// Close the readers channel when the end of the file is reached
+		close(chunk_channel)
+	}()
+
+	return chunk_channel
+
+}
diff --git a/pkg/obiiter/workers.go b/pkg/obiiter/workers.go
index 1ff72d9..b403584 100644
--- a/pkg/obiiter/workers.go
+++ b/pkg/obiiter/workers.go
@@ -1,8 +1,6 @@
 package obiiter
 
 import (
-	"runtime"
-
 	log "github.com/sirupsen/logrus"
 
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
@@ -82,7 +80,6 @@ func (iterator IBioSequence) MakeISliceWorker(worker obiseq.SeqSliceWorker, brea
 			log.Fatalf("Error on sequence processing : %v", err)
 		}
 		newIter.Push(batch)
-		runtime.GC()
 	}
 	newIter.Done()
 }
diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go
index ce9b180..f9bf573 100644
--- a/pkg/obioptions/options.go
+++ b/pkg/obioptions/options.go
@@ -76,8 +76,11 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser
 	}
 
 	if options.Called("pprof") {
-		go http.ListenAndServe("localhost:8080", nil)
-		log.Infoln("Start a pprof server at address http://localhost:8080/debug/pprof")
+		url := "localhost:6060"
+		go http.ListenAndServe(url, nil)
+		log.Infof("Start a pprof server at address http://%s/debug/pprof", url)
+		log.Info("The profile can be inspected by running concurrently the command :")
+		log.Info("  go tool pprof -http=127.0.0.1:8080 'http://localhost:6060/debug/pprof/profile?seconds=30'")
 	}
 
 	// Handle user errors
@@ -104,9 +107,9 @@ func GenerateOptionParser(optionset ...func(*getoptions.GetOpt)) ArgumentParser
 
 	log.Printf("Number of workers set %d", CLIParallelWorkers())
 
-	if options.Called("workers") {
+	// if options.Called("workers") {
 
-	}
+	// }
 
 	if options.Called("solexa") {
 		SetInputQualityShift(64)
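
Note on the refactor: the per-format chunk readers (FastaChunkReader, FastqChunkReader, _ReadFlatFileChunk) are replaced by the single ReadSeqFileChunk generator, parameterized by a LastSeqRecord splitter. The splitter receives the bytes read so far and returns the index at which the last, possibly incomplete, entry begins, or -1 while no entry boundary is visible; everything before that index is emitted as a self-contained SeqFileChunk and the remainder is carried over into the next read. The sketch below is not part of the patch: it assumes compilation inside package obiformats (SeqFileChunk's raw and order fields are unexported), and the names _EndOfLastLine and CountFastaHeaders are hypothetical helpers written for illustration only.

package obiformats

import (
	"bufio"
	"bytes"
	"os"

	log "github.com/sirupsen/logrus"
)

// _EndOfLastLine is a hypothetical splitter illustrating the LastSeqRecord
// contract on a line-oriented format: report the index where the last
// (possibly truncated) line starts, or -1 while no complete line is buffered.
func _EndOfLastLine(buffer []byte) int {
	if i := bytes.LastIndexByte(buffer, '\n'); i >= 0 {
		return i + 1
	}
	return -1
}

// CountFastaHeaders drains the chunk channel and counts FASTA records by
// their '>' header lines. chunk.order is only logged here; the real parsers
// forward it to MakeBioSequenceBatch so SortBatches can restore input order.
func CountFastaHeaders(path string) (int, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	total := 0
	for chunk := range ReadSeqFileChunk(f, _EndOfLastFastaEntry) {
		reader := bufio.NewReader(chunk.raw)
		for {
			line, err := reader.ReadBytes('\n')
			if len(line) > 0 && line[0] == '>' {
				total++
			}
			if err != nil {
				break // io.EOF: the in-memory chunk is exhausted
			}
		}
		log.Debugf("chunk %d scanned", chunk.order)
	}
	return total, nil
}

Because the splitter only ever cuts at entry boundaries, every chunk can be parsed independently, which is what lets _ParseEmblFile, _ParseGenbankFile, _ParseFastaFile, and _ParseFastqFile run as a pool of workers over the same channel.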