Refactor sequence file reading

Former-commit-id: 3dcb96e68da648d72bb585da047e3496427d7851
This commit is contained in:
Eric Coissac
2024-05-01 00:50:23 +02:00
parent 33d4d63acd
commit 5b98393a68
7 changed files with 491 additions and 555 deletions

View File

@@ -1,19 +1,105 @@
package obiformats
import (
"bufio"
"bytes"
"io"
"os"
"path"
"slices"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
log "github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
)
func _EndOfLastFastqEntry(buffer []byte) int {
var i int
imax := len(buffer)
state := 0
restart := imax - 1
cut := imax
for i = imax - 1; i >= 0 && state < 7; i-- {
C := buffer[i]
is_end_of_line := C == '\r' || C == '\n'
is_space := C == ' ' || C == '\t'
is_sep := is_space || is_end_of_line
switch state {
case 0:
if C == '+' {
// Potential start of quality part step 1
state = 1
restart = i
}
case 1:
if is_end_of_line {
// Potential start of quality part step 2
state = 2
} else {
// it was not the start of quality part
state = 0
i = restart
}
case 2:
if is_sep {
// Potential start of quality part step 2 (stay in the same state)
state = 2
} else if (C >= 'a' && C <= 'z') || (C >= 'A' && C <= 'Z') || C == '-' || C == '.' || C == '[' || C == ']' {
// End of the sequence
state = 3
} else {
// it was not the start of quality part
state = 0
i = restart
}
case 3:
if is_end_of_line {
// Entrering in the header line
state = 4
} else if (C >= 'a' && C <= 'z') || (C >= 'A' && C <= 'Z') || C == '-' || C == '.' || C == '[' || C == ']' {
// progressing along of the sequence
state = 3
} else {
// it was not the sequence part
state = 0
i = restart
}
case 4:
if is_end_of_line {
state = 4
} else {
state = 5
}
case 5:
if is_end_of_line {
// It was not the header line
state = 0
i = restart
} else if C == '@' {
state = 6
cut = i
}
case 6:
if is_end_of_line {
state = 7
} else {
state = 0
i = restart
}
}
}
if i == 0 || state != 7 {
return -1
}
return cut
}
func lastFastqCut(buffer []byte) ([]byte, []byte) {
imax := len(buffer)
cut := imax
@@ -95,212 +181,154 @@ func lastFastqCut(buffer []byte) ([]byte, []byte) {
return []byte{}, buffer
}
func FastqChunkReader(r io.Reader, size int) (chan FastxChunk, error) {
out := make(chan FastxChunk)
buff := make([]byte, size)
func _ParseFastqFile(source string,
input ChannelSeqFileChunk,
out obiiter.IBioSequence,
quality_shift byte) {
n, err := io.ReadFull(r, buff)
if err == io.ErrUnexpectedEOF {
err = nil
}
if n > 0 && err == nil {
if n < size {
buff = buff[:n]
}
go func(buff []byte) {
idx := 0
end := []byte{}
for err == nil && n > 0 {
buff = Concatenate(end, buff)
// fmt.Println("------------buff--pasted----------------")
// fmt.Println(string(buff))
buff, end = lastFastqCut(buff)
// fmt.Println("----------------buff--cutted------------")
// fmt.Println(string(buff))
// fmt.Println("------------------end-------------------")
// fmt.Println(string(end))
// fmt.Println("========================================")
if len(buff) > 0 {
out <- FastxChunk{
Bytes: bytes.Clone(buff),
index: idx,
}
idx++
} else {
size = size * 2
}
buff = slices.Grow(buff[:0], size)[0:size]
n, err = io.ReadFull(r, buff)
if n < size {
buff = buff[:n]
}
if err == io.ErrUnexpectedEOF {
err = nil
}
// fmt.Printf("n = %d, err = %v\n", n, err)
}
if len(end) > 0 {
out <- FastxChunk{
Bytes: bytes.Clone(end),
index: idx,
}
}
close(out)
}(buff)
}
return out, nil
}
func ParseFastqChunk(source string, ch FastxChunk, quality_shift byte) *obiiter.BioSequenceBatch {
slice := make(obiseq.BioSequenceSlice, 0, obioptions.CLIBatchSize())
state := 0
start := 0
current := 0
var identifier string
var definition string
for i := 0; i < len(ch.Bytes); i++ {
C := ch.Bytes[i]
is_end_of_line := C == '\r' || C == '\n'
is_space := C == ' ' || C == '\t'
is_sep := is_space || is_end_of_line
state := 0
// log.Infof("%s : state = %d pos = %d character = %c (%d)", source, state, i, C, C)
idBytes := new(bytes.Buffer)
defBytes := new(bytes.Buffer)
qualBytes := new(bytes.Buffer)
seqBytes := new(bytes.Buffer)
switch state {
case 0: // Beginning of sequence chunk must start with @
for chunks := range input {
scanner := bufio.NewReader(chunks.raw)
sequences := make(obiseq.BioSequenceSlice, 0, 100)
for C, err := scanner.ReadByte(); err != io.EOF; C, err = scanner.ReadByte() {
if C == '@' {
// Beginning of sequence
state = 1
} else {
log.Errorf("%s : sequence entry is not starting with @", source)
return nil
}
case 1: // Beginning of identifier (Mandatory)
if is_sep {
// No identifier -> ERROR
log.Errorf("%s : sequence identifier is empty", source)
return nil
} else {
// Beginning of identifier
state = 2
start = i
}
case 2: // Following of the identifier
if is_sep {
// End of identifier
identifier = string(ch.Bytes[start:i])
state = 3
}
if is_end_of_line {
// Definition empty
definition = ""
state = 5
}
case 3: // Beginning of definition
if is_end_of_line {
// Definition empty
definition = ""
state = 5
} else if !is_space {
// Beginning of definition
start = i
state = 4
}
case 4: // Following of the definition
if is_end_of_line {
definition = string(ch.Bytes[start:i])
state = 5
}
case 5: // Beginning of sequence
if !is_end_of_line {
// Beginning of sequence
start = i
if C >= 'A' && C <= 'Z' {
ch.Bytes[current] = C + 'a' - 'A'
is_end_of_line := C == '\r' || C == '\n'
is_space := C == ' ' || C == '\t'
is_sep := is_space || is_end_of_line
switch state {
case 0: // Beginning of sequence chunk must start with @
if C == '@' {
// Beginning of sequence
state = 1
} else {
log.Errorf("%s : sequence entry is not starting with @", source)
}
current = i + 1
state = 6
}
case 6:
if is_end_of_line {
// End of sequence
s := obiseq.NewBioSequence(identifier, bytes.Clone(ch.Bytes[start:current]), definition)
s.SetSource(source)
slice = append(slice, s)
state = 7
} else {
if C >= 'A' && C <= 'Z' {
ch.Bytes[current] = C + 'a' - 'A'
case 1: // Beginning of identifier (Mandatory)
if is_sep {
// No identifier -> ERROR
log.Errorf("%s : sequence identifier is empty", source)
} else {
// Beginning of identifier
state = 2
idBytes.Reset()
idBytes.WriteByte(C)
}
case 2: // Following of the identifier
if is_sep {
// End of identifier
identifier = idBytes.String()
state = 3
}
if is_end_of_line {
// Definition empty
definition = ""
state = 5
}
case 3: // Beginning of definition
if is_end_of_line {
// Definition empty
definition = ""
state = 5
} else if !is_space {
// Beginning of definition
defBytes.Reset()
defBytes.WriteByte(C)
state = 4
}
case 4: // Following of the definition
if is_end_of_line {
definition = defBytes.String()
state = 5
}
case 5: // Beginning of sequence
if !is_end_of_line {
// Beginning of sequence
if C >= 'A' && C <= 'Z' {
C = C + 'a' - 'A'
}
seqBytes.Reset()
seqBytes.WriteByte(C)
state = 6
}
case 6:
if is_end_of_line {
// End of sequence
s := obiseq.NewBioSequence(identifier, slices.Clone(seqBytes.Bytes()), definition)
s.SetSource(source)
sequences = append(sequences, s)
state = 7
} else {
if C >= 'A' && C <= 'Z' {
C = C + 'a' - 'A'
}
seqBytes.WriteByte(C)
}
case 7:
if is_end_of_line {
state = 7
} else if C == '+' {
state = 8
} else {
log.Errorf("@%s[%s] : sequence data not followed by a line starting with + but a %c", identifier, source, C)
}
case 8:
if is_end_of_line {
state = 9
}
case 9:
if is_end_of_line {
state = 9
} else {
// beginning of quality
state = 10
qualBytes.Reset()
qualBytes.WriteByte(C)
}
case 10:
if is_end_of_line {
// End of quality
q := qualBytes.Bytes()
if len(q) != sequences[len(sequences)-1].Len() {
log.Errorf("%s[%s] : sequence data and quality lenght not equal (%d/%d)",
identifier, source, len(q), sequences[len(sequences)-1].Len())
}
for i := 0; i < len(q); i++ {
q[i] = q[i] - quality_shift
}
sequences[len(sequences)-1].SetQualities(q)
state = 11
}
case 11:
if is_end_of_line {
state = 11
} else if C == '@' {
state = 1
} else {
log.Errorf("%s[%s] : sequence record not followed by a line starting with @", identifier, source)
}
current = i + 1
}
case 7:
if is_end_of_line {
state = 7
} else if C == '+' {
state = 8
} else {
log.Info(ch.Bytes[0:i])
log.Info(string(ch.Bytes[0:i]))
log.Info(C)
log.Errorf("@%s[%s] : sequence data not followed by a line starting with +", identifier, source)
return nil // Error
}
case 8:
if is_end_of_line {
state = 9
}
case 9:
if is_end_of_line {
state = 9
} else {
// beginning of quality
state = 10
start = i
}
case 10:
if is_end_of_line {
// End of quality
q := ch.Bytes[start:i]
if len(q) != slice[len(slice)-1].Len() {
log.Errorf("%s[%s] : sequence data and quality lenght not equal (%d/%d)",
identifier, source, len(q), slice[len(slice)-1].Len())
return nil // Error quality lenght not equal to sequence length
}
for i := 0; i < len(q); i++ {
q[i] = q[i] - quality_shift
}
slice[len(slice)-1].SetQualities(q)
state = 11
}
case 11:
if is_end_of_line {
state = 11
} else if C == '@' {
state = 1
} else {
log.Errorf("%s[%s] : sequence record not followed by a line starting with @", identifier, source)
return nil
}
}
if len(sequences) > 0 {
log.Debugln("Pushing sequences")
out.Push(obiiter.MakeBioSequenceBatch(chunks.order, sequences))
}
}
batch := obiiter.MakeBioSequenceBatch(ch.index, slice)
return &batch
out.Done()
}
func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
@@ -310,37 +338,22 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
source := opt.Source()
nworker := obioptions.CLIReadParallelWorkers()
out.Add(nworker)
chkchan := ReadSeqFileChunk(reader, _EndOfLastFastqEntry)
chkchan, err := FastqChunkReader(reader, 1024*500)
if err != nil {
return obiiter.NilIBioSequence, err
for i := 0; i < nworker; i++ {
out.Add(1)
go _ParseFastqFile(source, chkchan, out,
byte(obioptions.InputQualityShift()))
}
go func() {
out.WaitAndClose()
}()
parser := func() {
defer out.Done()
for chk := range chkchan {
seqs := ParseFastqChunk(source, chk, byte(obioptions.InputQualityShift()))
if seqs != nil {
out.Push(*seqs)
} else {
log.Fatalf("error parsing %s", source)
}
}
}
for i := 0; i < nworker; i++ {
go parser()
}
newIter := out.SortBatches().Rebatch(opt.BatchSize())
log.Debugln("Full file batch mode : ", opt.FullFileBatch())
if opt.FullFileBatch() {
newIter = newIter.CompleteFileIterator()
}