Files
obitools4/pkg/obiformats/genbank_read.go
Eric Coissac a7ea47624b Optimisation du parsing des grandes séquences
Implémente une optimisation du parsing des grandes séquences en évitant l'allocation de mémoire inutile lors de la fusion des chunks. Ajoute un support pour le parsing direct de la structure rope, ce qui permet de réduire les allocations et d'améliorer les performances lors du traitement de fichiers GenBank/EMBL et FASTA/FASTQ de plusieurs Gbp. Les parseurs sont mis à jour pour utiliser la rope non-packée et le nouveau mécanisme d'écriture in-place pour les séquences GenBank.
2026-03-10 14:20:21 +01:00

288 lines
7.2 KiB
Go

package obiformats
import (
"bufio"
"bytes"
"io"
"path"
"regexp"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
)
// gbstate identifies which part of a GenBank record the line-oriented parser
// is currently reading.
type gbstate int

// Parser states, numbered 0 to 5 in declaration order.
const (
	// inHeader: outside any record, waiting for a LOCUS line.
	inHeader gbstate = iota
	// inEntry: inside a record, between keyword sections.
	inEntry
	// inDefinition: reading a DEFINITION value and its continuation lines.
	inDefinition
	// inFeature: reading the FEATURES section.
	inFeature
	// inSequence: reading sequence lines after ORIGIN.
	inSequence
	// inContig: reading a CONTIG section.
	inContig
)

// _seqlenght_rx captures the run of digits that precedes " bp"; it is used to
// read the announced sequence length on a LOCUS line.
var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp")
// GenbankChunkParser builds a function that parses one chunk of GenBank
// flat-file text into a slice of sequences.
//
// Parameters:
//   - withFeatureTable: when true, the raw text of the FEATURES section is
//     attached to each sequence with SetFeatures.
//   - UtoT: when true, every lowercase "u" found in the sequence data is
//     replaced by "t".
//
// The returned function receives the source name (used for messages and
// stored on each sequence) and the reader holding the chunk. It returns the
// parsed sequences, or nil and the error when reading from the input fails
// with anything other than io.EOF. Structural problems (a keyword met in an
// unexpected state, a line longer than 100 bytes) stop the program through
// log.Fatalf.
//
// Each sequence is annotated with "scientific_name" (value of the SOURCE
// line) and "taxid" (value of the /db_xref="taxon:…" qualifier, 1 when the
// record has none or when the value is not an integer).
func GenbankChunkParser(withFeatureTable, UtoT bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
	// Byte offset at which the value starts on LOCUS, DEFINITION, SOURCE
	// and continuation lines.
	const valueOffset = 12
	// Qualifier carrying the taxonomic identifier in the feature table.
	const taxonTag = `/db_xref="taxon:`

	// valueOf returns the part of the line located after the keyword
	// column, or "" when the line is too short to hold a value. The guard
	// prevents a slice-out-of-range panic on truncated lines.
	valueOf := func(line string) string {
		if len(line) <= valueOffset {
			return ""
		}
		return line[valueOffset:]
	}

	return func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) {
		state := inHeader
		reader := bufio.NewReader(input)
		sequences := obiseq.MakeBioSequenceSlice(100)[:0]

		// Per-record data, reset each time a record is completed.
		id := ""
		lseq := -1
		scientificName := ""
		taxid := 1
		defBytes := new(bytes.Buffer)
		featBytes := new(bytes.Buffer)
		seqBytes := new(bytes.Buffer)

		// Line counter, restarted at the end of every record.
		nl := 0

		for {
			bline, isPrefix, err := reader.ReadLine()
			if err == io.EOF {
				break
			}
			if err != nil {
				// A persistent read error would otherwise be retried
				// forever; report it to the caller instead.
				return nil, err
			}

			nl++
			line := string(bline)

			if isPrefix || len(line) > 100 {
				log.Fatalf("From %s:Line too long: %s", source, line)
			}

			// A line may need a second pass: leaving the definition
			// state hands the same line back to the keyword cases.
			processed := false
			for !processed {
				switch {

				case strings.HasPrefix(line, "LOCUS "):
					if state != inHeader {
						log.Fatalf("Line %d - Unexpected state %d while reading LOCUS: %s", nl, state, line)
					}

					id = strings.SplitN(valueOf(line), " ", 2)[0]

					// The announced length must come from this
					// LOCUS line only, never from a previous record.
					lseq = -1
					if match := _seqlenght_rx.FindStringSubmatch(line); len(match) > 0 {
						if n, convErr := strconv.Atoi(match[1]); convErr == nil {
							lseq = n
						}
					}

					if lseq > 0 {
						// NOTE(review): the argument is presumably a
						// capacity hint for the sequence buffer —
						// confirm against obiseq.GetSlice.
						seqBytes = bytes.NewBuffer(obiseq.GetSlice(lseq + 20))
					} else {
						seqBytes = new(bytes.Buffer)
					}

					state = inEntry
					processed = true

				case strings.HasPrefix(line, "DEFINITION "):
					if state != inEntry {
						log.Fatalf("Line %d - Unexpected state %d while reading DEFINITION: %s", nl, state, line)
					}
					defBytes.WriteString(strings.TrimSpace(valueOf(line)))
					state = inDefinition
					processed = true

				case state == inDefinition:
					if strings.HasPrefix(line, " ") {
						// Continuation of the definition: joined with
						// a single space.
						defBytes.WriteByte(' ')
						defBytes.WriteString(strings.TrimSpace(valueOf(line)))
						processed = true
					} else {
						// Not a continuation: re-dispatch the same
						// line as a regular entry line.
						state = inEntry
					}

				case strings.HasPrefix(line, "SOURCE "):
					if state != inEntry {
						log.Fatalf("Line %d - Unexpected state %d while reading SOURCE: %s", nl, state, line)
					}
					scientificName = strings.TrimSpace(valueOf(line))
					processed = true

				case strings.HasPrefix(line, "FEATURES "):
					if state != inEntry {
						log.Fatalf("Line %d - Unexpected state %d while reading FEATURES: %s", nl, state, line)
					}
					featBytes.WriteString(line)
					state = inFeature
					processed = true

				case strings.HasPrefix(line, "ORIGIN"):
					if state != inFeature && state != inContig {
						log.Fatalf("Line %d - Unexpected state %d while reading ORIGIN: %s", nl, state, line)
					}
					state = inSequence
					processed = true

				case strings.HasPrefix(line, "CONTIG"):
					if state != inFeature && state != inContig {
						log.Fatalf("Line %d - Unexpected state %d while reading CONTIG: %s", nl, state, line)
					}
					state = inContig
					processed = true

				case line == "//":
					if state != inSequence && state != inContig {
						log.Fatalf("Line %d - Unexpected state %d while reading end of record %s", nl, state, id)
					}
					if id == "" {
						log.Warn("Empty id when parsing genbank file")
					}

					sequence := obiseq.NewBioSequence(id,
						seqBytes.Bytes(),
						defBytes.String())
					sequence.SetSource(source)

					if withFeatureTable {
						sequence.SetFeatures(featBytes.Bytes())
					}

					annot := sequence.Annotations()
					annot["scientific_name"] = scientificName
					annot["taxid"] = taxid

					sequences = append(sequences, sequence)

					// Start the next record from a clean slate so that
					// no value leaks from this record into the next.
					defBytes = bytes.NewBuffer(obiseq.GetSlice(200))
					featBytes = new(bytes.Buffer)
					id = ""
					scientificName = ""
					taxid = 1
					lseq = -1
					nl = 0
					state = inHeader
					processed = true

				case state == inSequence:
					// The first field is the position counter; the
					// following fields are the blocks of residues.
					parts := strings.SplitN(strings.TrimSpace(line), " ", 7)
					for i := 1; i < len(parts); i++ {
						if UtoT {
							parts[i] = strings.ReplaceAll(parts[i], "u", "t")
						}
						seqBytes.WriteString(parts[i])
					}
					processed = true

				default:
					switch state {
					case inFeature:
						if withFeatureTable {
							featBytes.WriteByte('\n')
							featBytes.WriteString(line)
						}
						// The qualifier is located after its leading
						// blanks rather than at a fixed byte offset, so
						// a short line cannot trigger a panic.
						qualifier := strings.TrimLeft(line, " ")
						if strings.HasPrefix(qualifier, taxonTag) {
							value := strings.SplitN(qualifier[len(taxonTag):], `"`, 2)[0]
							// A non-numeric value leaves taxid untouched.
							if n, convErr := strconv.Atoi(value); convErr == nil {
								taxid = n
							}
						}
						processed = true
					case inHeader, inEntry, inContig:
						// Lines carrying nothing the parser keeps.
						processed = true
					default:
						log.Fatalf("Unexpected state %d while reading: %s", state, line)
					}
				}
			}
		}

		return sequences, nil
	}
}
// _ParseGenbankFile is the body of one parsing worker. It takes chunks from
// input until the channel is closed, parses each of them with a parser built
// by GenbankChunkParser, and pushes the resulting sequences to out as a batch
// labelled with the chunk's source and order. A parsing error stops the
// program through log.Fatalf. out.Done is called once the channel is drained.
func _ParseGenbankFile(input ChannelFileChunk,
	out obiiter.IBioSequence,
	withFeatureTable, UtoT bool) {

	parse := GenbankChunkParser(withFeatureTable, UtoT)

	for chunk := range input {
		parsed, err := parse(chunk.Source, chunk.Raw)
		if err != nil {
			log.Fatalf("File %s : Cannot parse the genbank file : %v", chunk.Source, err)
		}

		batch := obiiter.MakeBioSequenceBatch(chunk.Source, chunk.Order, parsed)
		out.Push(batch)
	}

	log.Debug("End of the Genbank thread")
	out.Done()
}
// ReadGenbank reads GenBank flat-file data from reader and returns an
// iterator over the parsed sequences.
//
// The input is cut into chunks by ReadFileChunk, and opt.ParallelWorkers()
// goroutines running _ParseGenbankFile parse those chunks concurrently. When
// opt.FullFileBatch() is set, the iterator returned is the result of
// CompleteFileIterator applied to the parsing iterator.
//
// The returned error is always nil.
func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
	opt := MakeOptions(options)

	// NOTE(review): 1024*1024*128 (128 MiB) is presumably the chunk size and
	// "\nLOCUS " the marker used to cut between records — confirm against
	// ReadFileChunk.
	entry_channel := ReadFileChunk(
		opt.Source(),
		reader,
		1024*1024*128,
		EndOfLastFlatFileEntry,
		"\nLOCUS ",
		true,
	)

	newIter := obiiter.MakeIBioSequence()

	nworkers := opt.ParallelWorkers()

	for j := 0; j < nworkers; j++ {
		newIter.Add(1)
		go _ParseGenbankFile(
			entry_channel,
			newIter,
			opt.WithFeatureTable(),
			opt.UtoT(),
		)
	}

	// This goroutine captures the variable newIter itself. The variable must
	// therefore never be reassigned afterwards: doing so would race with the
	// goroutine and could make it wait on, and close, the wrong iterator.
	go func() {
		newIter.WaitAndClose()
		log.Debug("End of the genbank file ", opt.Source())
	}()

	if opt.FullFileBatch() {
		return newIter.CompleteFileIterator(), nil
	}

	return newIter, nil
}
// ReadGenbankFromFile opens filename and reads it with ReadGenbank.
//
// An OptionsSource option, built from the base name of filename passed
// through obiutils.RemoveAllExt, is appended to options before reading.
// When obiutils.Ropen reports obiutils.ErrNoContent, the result of
// ReadEmptyFile is returned. Any other opening error is logged and returned
// together with obiiter.NilIBioSequence.
func ReadGenbankFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
	sourceName := obiutils.RemoveAllExt(path.Base(filename))
	options = append(options, OptionsSource(sourceName))

	reader, err := obiutils.Ropen(filename)

	switch {
	case err == obiutils.ErrNoContent:
		log.Infof("file %s is empty", filename)
		return ReadEmptyFile(options...)
	case err != nil:
		log.Printf("open file error: %+v", err)
		return obiiter.NilIBioSequence, err
	}

	return ReadGenbank(reader, options...)
}