From a7ea47624b6a86bcbc2ef79017028c5c1dcd2ae8 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 10 Mar 2026 14:20:10 +0100 Subject: [PATCH] =?UTF-8?q?Optimisation=20du=20parsing=20des=20grandes=20s?= =?UTF-8?q?=C3=A9quences?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implémente une optimisation du parsing des grandes séquences en évitant l'allocation de mémoire inutile lors de la fusion des chunks. Ajoute un support pour le parsing direct de la structure rope, ce qui permet de réduire les allocations et d'améliorer les performances lors du traitement de fichiers GenBank/EMBL et FASTA/FASTQ de plusieurs Gbp. Les parseurs sont mis à jour pour utiliser la rope non-packée et le nouveau mécanisme d'écriture in-place pour les séquences GenBank. --- .../Prospective/large_sequence_parsing.md | 182 ++++++++++++++++++ pkg/obiformats/embl_read.go | 1 + pkg/obiformats/fastaseq_read.go | 1 + pkg/obiformats/fastqseq_read.go | 1 + pkg/obiformats/file_chunk_read.go | 20 +- pkg/obiformats/genbank_read.go | 1 + 6 files changed, 200 insertions(+), 6 deletions(-) create mode 100644 blackboard/Prospective/large_sequence_parsing.md diff --git a/blackboard/Prospective/large_sequence_parsing.md b/blackboard/Prospective/large_sequence_parsing.md new file mode 100644 index 0000000..5e9c093 --- /dev/null +++ b/blackboard/Prospective/large_sequence_parsing.md @@ -0,0 +1,182 @@ +# Optimisation du parsing des grandes séquences + +## Contexte + +OBITools4 doit pouvoir traiter des séquences de taille chromosomique (plusieurs Gbp), notamment +issues de fichiers GenBank/EMBL (assemblages de génomes) ou de fichiers FASTA convertis depuis +ces formats. + +## Architecture actuelle + +### Pipeline de lecture (`pkg/obiformats/`) + +``` +ReadFileChunk (goroutine) + → ChannelFileChunk + → N × _ParseGenbankFile / _ParseFastaFile (goroutines) + → IBioSequence +``` + +`ReadFileChunk` (`file_chunk_read.go`) lit le fichier par morceaux via une chaîne de +`PieceOfChunk` (rope). Chaque nœud fait `fileChunkSize` bytes : + +- GenBank/EMBL : 128 MB (`1024*1024*128`) +- FASTA/FASTQ : 1 MB (`1024*1024`) + +La chaîne est accumulée jusqu'à trouver la fin du dernier enregistrement complet (splitter), +puis `Pack()` est appelé pour fusionner tous les nœuds en un seul buffer contigu. Ce buffer +est transmis au parseur via `FileChunk.Raw *bytes.Buffer`. + +### Parseur GenBank (`genbank_read.go`) + +`GenbankChunkParser` reçoit un `io.Reader` sur le buffer packé, lit ligne par ligne via +`bufio.NewReader` (buffer 4096 bytes), et pour chaque ligne de la section `ORIGIN` : + +```go +line = string(bline) // allocation par ligne +cleanline := strings.TrimSpace(line) // allocation +parts := strings.SplitN(cleanline, " ", 7) // allocation []string + substrings +for i := 1; i < lparts; i++ { + seqBytes.WriteString(parts[i]) +} +``` + +Point positif : `seqBytes` est pré-alloué grâce à `lseq` extrait de la ligne `LOCUS`. + +### Parseur FASTA (`fastaseq_read.go`) + +`FastaChunkParser` lit **octet par octet** via `scanner.ReadByte()`. Pour 3 Gbp : +3 milliards d'appels. `seqBytes` est un `bytes.Buffer{}` sans pré-allocation. + +## Problème principal + +Pour une séquence de plusieurs Gbp, `Pack()` fusionne une chaîne de ~N nœuds de 128 MB en +un seul buffer contigu. C'est une allocation de N × 128 MB suivie d'une copie de toutes les +données. Bien que l'implémentation de `Pack()` soit efficace (libère les nœuds au fur et à +mesure via `slices.Grow`), la copie est inévitable avec l'architecture actuelle. + +De plus, le parseur GenBank produit des dizaines de millions d'allocations temporaires pour +parser la section `ORIGIN` (une par ligne). + +## Invariant clé découvert + +**Si la rope a plus d'un nœud, le premier nœud seul ne se termine pas sur une frontière +d'enregistrement** (pas de `//\n` en fin de `piece1`). + +Preuve par construction dans `ReadFileChunk` : +- `splitter` est appelé dès le premier nœud (ligne 157) +- Si `end >= 0` → frontière trouvée dans 128 MB → boucle interne sautée → rope à 1 nœud +- Si `end < 0` → boucle interne ajoute des nœuds → rope à ≥ 2 nœuds + +Corollaire : si rope à 1 nœud, `Pack()` ne fait rien (aucun nœud suivant). + +**Attention** : rope à ≥ 2 nœuds ne signifie pas qu'il n'y a qu'une seule séquence dans +la rope. La rope packée peut contenir plusieurs enregistrements complets. Exemple : records +de 80 MB → `nextpieces` (48 MB de reste) + nouveau nœud (128 MB) = rope à 2 nœuds +contenant 2 records complets + début d'un troisième. + +L'invariant dit seulement que `piece1` seul est incomplet — pas que la rope entière +ne contient qu'un seul record. + +**Invariant : le dernier FileChunk envoyé finit sur une frontière d'enregistrement.** + +Deux chemins dans `ReadFileChunk` : + +1. **Chemin normal** (`end >= 0` via `splitter`) : le buffer est explicitement tronqué à + `end` (ligne 200 : `pieces.data = pieces.data[:end]`). Frontière garantie par construction + pour tous les formats. ✓ + +2. **Chemin EOF** (`end < 0`, `end = pieces.Len()`) : tout le reste du fichier est envoyé. + - **GenBank/EMBL** : présuppose fichier bien formé (se termine par `//\n`). Le parseur + lève un `log.Fatalf` sur tout état inattendu — filet de sécurité suffisant. ✓ + - **FASTQ** : présupposé, vérifié par le parseur. ✓ + - **FASTA** : garanti par le format lui-même (fin d'enregistrement = EOF ou `>`). ✓ + +**Hypothèse de travail adoptée** : les fichiers d'entrée sont bien formés. Dans le pire cas, +le parseur lèvera une erreur explicite. Il n'y a pas de risque de corruption silencieuse. + +## Piste d'optimisation : se dispenser de Pack() + +### Idée centrale + +Au lieu de fusionner la rope avant de la passer au parseur, **parser directement la rope +nœud par nœud**, et **écrire la séquence compactée in-place dans le premier nœud**. + +Pourquoi c'est sûr : +- Le header (LOCUS, DEFINITION, SOURCE, FEATURES) est **petit** et traité en premier +- La séquence (ORIGIN) est **à la fin** du record +- Au moment d'écrire la séquence depuis l'offset 0 de `piece1`, le pointeur de lecture + est profond dans la rope (offset >> 0) → jamais de collision +- La séquence compactée est toujours plus courte que les données brutes + +### Pré-allocation + +Pour GenBank/EMBL : `lseq` est connu dès la ligne `LOCUS`/`ID` (première ligne, dans +`piece1`). On peut faire `slices.Grow(piece1.data, lseq)` dès ce moment. + +Pour FASTA : pas de taille garantie dans le header, mais `rope.Len()` donne un majorant. +On peut utiliser `rope.Len() / 2` comme estimation initiale. + +### Gestion des jonctions entre nœuds + +Une ligne peut chevaucher deux nœuds (rare avec 128 MB, mais possible). Solution : carry +buffer de ~128 bytes pour les quelques bytes en fin de nœud. + +### Cas FASTA/FASTQ multi-séquences + +Un FileChunk peut contenir N séquences (notamment FASTA/FASTQ courts). Dans ce cas +l'écriture in-place dans `piece1` n'est pas applicable directement — on écrase des données +nécessaires aux séquences suivantes. + +Stratégie par cas : +- **Rope à 1 nœud** (record ≤ 128 MB) : `Pack()` est trivial (no-op), parseur actuel OK +- **Rope à ≥ 2 nœuds** : par l'invariant, `piece1` ne contient pas de record complet → + une seule grande séquence → in-place applicable + +### Format d'une ligne séquence GenBank (Après ORIGIN) + +``` +/^ *[0-9]+( [nuc]{10}){0,5} [nuc]{1,10}/ +``` + +### Format d'une ligne séquence GenBank (Après SQ) + +La ligne SQ contient aussi la taille de la séquence + +``` +/^ *( [nuc]{10}){0,5} [nuc]{1,10} *[0-9]+/ +``` + +Compactage in-place sur `bline` ([]byte brut, sans conversion `string`) : + +```go +w := 0 +i := 0 +for i < len(bline) && bline[i] == ' ' { i++ } // skip indentation +for i < len(bline) && bline[i] <= '9' { i++ } // skip position number +for ; i < len(bline); i++ { + if bline[i] != ' ' { + bline[w] = bline[i] + w++ + } +} +// écrire bline[:w] directement dans piece1.data[seqOffset:] +``` + +## Changements nécessaires + +1. **`FileChunk`** : exposer la rope `*PieceOfChunk` non-packée en plus (ou à la place) + de `Raw *bytes.Buffer` +2. **`GenbankChunkParser` / `EmblChunkParser`** : accepter `*PieceOfChunk`, parser la + rope séquentiellement avec carry buffer pour les jonctions +3. **`FastaChunkParser`** : idem, avec in-place conditionnel selon taille de la rope +4. **`ReadFileChunk`** : ne pas appeler `Pack()` avant envoi sur le channel (ou version + alternative `ReadFileChunkRope`) + +## Fichiers concernés + +- `pkg/obiformats/file_chunk_read.go` — structure rope, `ReadFileChunk` +- `pkg/obiformats/genbank_read.go` — `GenbankChunkParser`, `_ParseGenbankFile` +- `pkg/obiformats/embl_read.go` — `EmblChunkParser`, `ReadEMBL` +- `pkg/obiformats/fastaseq_read.go` — `FastaChunkParser`, `_ParseFastaFile` +- `pkg/obiformats/fastqseq_read.go` — parseur FASTQ (même structure) diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index fb22ccd..d2fa9b1 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -196,6 +196,7 @@ func ReadEMBL(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, er 1024*1024*128, EndOfLastFlatFileEntry, "\nID ", + true, ) newIter := obiiter.MakeIBioSequence() diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go index 8c8441c..3597be2 100644 --- a/pkg/obiformats/fastaseq_read.go +++ b/pkg/obiformats/fastaseq_read.go @@ -245,6 +245,7 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e 1024*1024, EndOfLastFastaEntry, "\n>", + true, ) for i := 0; i < nworker; i++ { diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go index c092e32..9c94d2d 100644 --- a/pkg/obiformats/fastqseq_read.go +++ b/pkg/obiformats/fastqseq_read.go @@ -339,6 +339,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e 1024*1024, EndOfLastFastqEntry, "\n@", + true, ) for i := 0; i < nworker; i++ { diff --git a/pkg/obiformats/file_chunk_read.go b/pkg/obiformats/file_chunk_read.go index 20c8c22..9a4f7a7 100644 --- a/pkg/obiformats/file_chunk_read.go +++ b/pkg/obiformats/file_chunk_read.go @@ -16,6 +16,7 @@ type SeqFileChunkParser func(string, io.Reader) (obiseq.BioSequenceSlice, error) type FileChunk struct { Source string Raw *bytes.Buffer + Rope *PieceOfChunk Order int } @@ -97,11 +98,17 @@ func (piece *PieceOfChunk) IsLast() bool { return piece.next == nil } -func (piece *PieceOfChunk) FileChunk(source string, order int) FileChunk { - piece.Pack() +func (piece *PieceOfChunk) FileChunk(source string, order int, pack bool) FileChunk { + piece = piece.Head() + var raw *bytes.Buffer + if pack { + piece.Pack() + raw = bytes.NewBuffer(piece.data) + } return FileChunk{ Source: source, - Raw: bytes.NewBuffer(piece.data), + Raw: raw, + Rope: piece, Order: order, } } @@ -133,7 +140,8 @@ func ReadFileChunk( reader io.Reader, fileChunkSize int, splitter LastSeqRecord, - probe string) ChannelFileChunk { + probe string, + pack bool) ChannelFileChunk { chunk_channel := make(ChannelFileChunk) @@ -205,7 +213,7 @@ func ReadFileChunk( if len(pieces.data) > 0 { // obilog.Warnf("chuck %d :Read %d bytes from file %s", i, io.Len(), source) - chunk_channel <- pieces.FileChunk(source, i) + chunk_channel <- pieces.FileChunk(source, i, pack) i++ } @@ -222,7 +230,7 @@ func ReadFileChunk( // Send the last chunk to the channel if pieces.Len() > 0 { - chunk_channel <- pieces.FileChunk(source, i) + chunk_channel <- pieces.FileChunk(source, i, pack) } // Close the readers channel when the end of the file is reached diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go index 37e84db..5bc2c1f 100644 --- a/pkg/obiformats/genbank_read.go +++ b/pkg/obiformats/genbank_read.go @@ -233,6 +233,7 @@ func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, 1024*1024*128, EndOfLastFlatFileEntry, "\nLOCUS ", + true, ) newIter := obiiter.MakeIBioSequence()