Files
obitools4/pkg/obiformats/fastqseq_read.go
Eric Coissac a7ea47624b Optimisation du parsing des grandes séquences
Implémente une optimisation du parsing des grandes séquences en évitant l'allocation de mémoire inutile lors de la fusion des chunks. Ajoute un support pour le parsing direct de la structure rope, ce qui permet de réduire les allocations et d'améliorer les performances lors du traitement de fichiers GenBank/EMBL et FASTA/FASTQ de plusieurs Gbp. Les parseurs sont mis à jour pour utiliser la rope non-packée et le nouveau mécanisme d'écriture in-place pour les séquences GenBank.
2026-03-10 14:20:21 +01:00

410 lines
9.6 KiB
Go

package obiformats
import (
"bufio"
"bytes"
"io"
"os"
"path"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
log "github.com/sirupsen/logrus"
)
func EndOfLastFastqEntry(buffer []byte) int {
var i int
// obilog.Warnf("EndOfLastFastqEntry(%d): %s", len(buffer), string(buffer[0:20]))
imax := len(buffer)
state := 0
restart := imax - 1
cut := imax
for i = imax - 1; i >= 0 && state < 7; i-- {
C := buffer[i]
is_end_of_line := C == '\r' || C == '\n'
is_space := C == ' ' || C == '\t'
is_sep := is_space || is_end_of_line
switch state {
case 0:
if C == '+' {
// Potential start of quality part step 1
// log.Warn("Potential start of quality part step 1 - +")
state = 1
restart = i
}
case 1:
if is_end_of_line {
// Potential start of quality part step 2
// log.Warn("Potential start of quality part step 2 - +/end of line")
state = 2
} else {
// it was not the start of quality part
// log.Warn("it was not the start of quality part")
state = 0
i = restart
}
case 2:
if is_sep {
// Potential start of quality part step 2 (stay in the same state)
// log.Warn("Potential start of quality part step 2 - skipping separator")
state = 2
} else if (C >= 'a' && C <= 'z') || (C >= 'A' && C <= 'Z') || C == '-' || C == '.' || C == '[' || C == ']' {
// progressing along of the sequence
// log.Warn("Detected the end of the sequence switching to state 3")
state = 3
} else {
// it was not the start of quality part
// log.Warn("it was not the start of quality part because is not preceded by sequence")
state = 0
i = restart
}
case 3:
if is_end_of_line {
// Entrering in the header line
// log.Warn("Potentially entrering in the header line")
state = 4
} else if (C >= 'a' && C <= 'z') || (C >= 'A' && C <= 'Z') || C == '-' || C == '.' || C == '[' || C == ']' {
// progressing along of the sequence
// log.Warn("Progressing along of the sequence")
state = 3
} else {
// it was not the sequence part
// obilog.Warnf("it was not the sequence part : %c", C)
state = 0
i = restart
}
case 4:
if is_end_of_line {
state = 4
} else {
state = 5
}
case 5:
if is_end_of_line {
// It was not the header line
state = 0
i = restart
} else if C == '@' {
// It was the header line
// log.Warn("It was the header line")
state = 6
cut = i
}
case 6:
if is_end_of_line {
// log.Warn("====> End of the last sequence")
state = 7
} else {
// obilog.Warnf("%s: Strange it was not the end of the last sequence : %c : %s", string(buffer[0:40]), C, string(buffer[i-20:i+5]))
state = 5
}
}
}
if i == 0 || state != 7 {
return -1
}
return cut
}
func _storeSequenceQuality(bytes *bytes.Buffer, out *obiseq.BioSequence, quality_shift byte) {
q := bytes.Bytes()
if len(q) == 0 {
log.Fatalf("@%s[%s] : sequence quality is empty", out.Id(), out.Source())
}
if len(q) != out.Len() {
log.Fatalf("%s[%s] : sequence data and quality lenght not equal (%d <> %d)",
out.Id(), out.Source(), len(q), out.Len())
}
for i := 0; i < len(q); i++ {
q[i] -= quality_shift
}
out.SetQualities(q)
}
func FastqChunkParser(quality_shift byte, with_quality bool, UtoT bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) {
var identifier string
var definition string
idBytes := bytes.Buffer{}
defBytes := bytes.Buffer{}
qualBytes := bytes.Buffer{}
seqBytes := bytes.Buffer{}
state := 0
scanner := bufio.NewReader(input)
sequences := obiseq.MakeBioSequenceSlice(100)[:0]
previous := byte(0)
for C, err := scanner.ReadByte(); err != io.EOF; C, err = scanner.ReadByte() {
is_end_of_line := C == '\r' || C == '\n'
is_space := C == ' ' || C == '\t'
is_sep := is_space || is_end_of_line
switch state {
case 0: // Beginning of sequence chunk must start with @
if C == '@' {
// Beginning of sequence
state = 1
} else {
log.Fatalf("%s : sequence entry is not starting with @", source)
}
case 1: // Beginning of identifier (Mandatory)
if is_sep {
// No identifier -> ERROR
log.Fatalf("%s : sequence identifier is empty", source)
} else {
// Beginning of identifier
state = 2
idBytes.Reset()
idBytes.WriteByte(C)
}
case 2: // Following of the identifier
if is_sep {
// End of identifier
identifier = idBytes.String()
state = 3
} else {
idBytes.WriteByte(C)
}
if is_end_of_line {
// Definition empty
definition = ""
state = 5
}
case 3: // Beginning of definition
if is_end_of_line {
// Definition empty
definition = ""
state = 5
} else if !is_space {
// Beginning of definition
defBytes.Reset()
defBytes.WriteByte(C)
state = 4
}
case 4: // Following of the definition
if is_end_of_line {
definition = defBytes.String()
state = 5
} else {
defBytes.WriteByte(C)
}
case 5: // Beginning of sequence
if !is_end_of_line {
// Beginning of sequence
if C >= 'A' && C <= 'Z' {
C = C + 'a' - 'A'
}
if C == 'u' && UtoT {
C = 't'
}
seqBytes.Reset()
seqBytes.WriteByte(C)
state = 6
}
case 6:
if is_end_of_line {
// End of sequence
rawseq := seqBytes.Bytes()
if len(rawseq) == 0 {
log.Fatalf("@%s[%s] : sequence is empty", identifier, source)
}
s := obiseq.NewBioSequence(identifier, rawseq, definition)
s.SetSource(source)
sequences = append(sequences, s)
state = 7
} else {
if C >= 'A' && C <= 'Z' {
C = C + 'a' - 'A'
}
if C == 'u' && UtoT {
C = 't'
}
if (C >= 'a' && C <= 'z') || C == '-' || C == '.' || C == '[' || C == ']' {
seqBytes.WriteByte(C)
} else {
context, _ := scanner.Peek(30)
context = append(
append([]byte{previous}, C),
context...)
log.Fatalf("%s [%s]: sequence contains invalid character %c (%s)",
source, identifier, C, string(context))
}
}
case 7:
if is_end_of_line {
state = 7
} else if C == '+' {
state = 8
} else {
log.Fatalf("@%s[%s] : sequence data not followed by a line starting with + but a %c", identifier, source, C)
}
case 8:
// State consuming the + internal header line
if is_end_of_line {
state = 9
}
case 9:
if is_end_of_line {
state = 9
} else {
// beginning of quality
state = 10
qualBytes.Reset()
qualBytes.WriteByte(C)
}
case 10:
if is_end_of_line {
if with_quality {
_storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift)
}
state = 11
} else {
qualBytes.WriteByte(C)
}
case 11:
if is_end_of_line {
state = 11
} else if C == '@' {
state = 1
} else {
log.Fatalf("%s[%s] : sequence record not followed by a line starting with @", identifier, source)
}
}
previous = C
}
if len(sequences) > 0 {
if state == 10 {
_storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift)
state = 1
}
}
return sequences, nil
}
return parser
}
func _ParseFastqFile(
input ChannelFileChunk,
out obiiter.IBioSequence,
quality_shift byte,
with_quality, UtoT bool,
) {
parser := FastqChunkParser(quality_shift, with_quality, UtoT)
for chunks := range input {
sequences, err := parser(chunks.Source, chunks.Raw)
if err != nil {
log.Fatalf("File %s : Cannot parse the fastq file : %v", chunks.Source, err)
}
out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences))
}
out.Done()
}
func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
opt := MakeOptions(options)
out := obiiter.MakeIBioSequence()
nworker := opt.ParallelWorkers()
chkchan := ReadFileChunk(
opt.Source(),
reader,
1024*1024,
EndOfLastFastqEntry,
"\n@",
true,
)
for i := 0; i < nworker; i++ {
out.Add(1)
go _ParseFastqFile(
chkchan,
out,
obidefault.ReadQualitiesShift(),
opt.ReadQualities(),
opt.UtoT(),
)
}
go func() {
out.WaitAndClose()
}()
newIter := out.SortBatches()
log.Debugln("Full file batch mode : ", opt.FullFileBatch())
if opt.FullFileBatch() {
newIter = newIter.CompleteFileIterator()
}
annotParser := opt.ParseFastSeqHeader()
if annotParser != nil {
return IParseFastSeqHeaderBatch(newIter, options...), nil
}
return newIter, nil
}
func ReadFastqFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
options = append(options, OptionsSource(obiutils.RemoveAllExt((path.Base(filename)))))
file, err := obiutils.Ropen(filename)
if err == obiutils.ErrNoContent {
log.Infof("file %s is empty", filename)
return ReadEmptyFile(options...)
}
if err != nil {
return obiiter.NilIBioSequence, err
}
return ReadFastq(file, options...)
}
func ReadFastqFromStdin(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
options = append(options, OptionsSource(obiutils.RemoveAllExt("stdin")))
input, err := obiutils.Buf(os.Stdin)
if err == obiutils.ErrNoContent {
log.Infof("stdin is empty")
return ReadEmptyFile(options...)
}
if err != nil {
log.Fatalf("open file error: %v", err)
return obiiter.NilIBioSequence, err
}
return ReadFastq(input, options...)
}