From 761e0dbed3eb3a08790e5893251e6fccf8ab32f2 Mon Sep 17 00:00:00 2001
From: Eric Coissac
Date: Tue, 10 Mar 2026 15:35:23 +0100
Subject: [PATCH] =?UTF-8?q?Impl=C3=A9mentation=20d'un=20parseur=20GenBank?=
 =?UTF-8?q?=20utilisant=20rope=20pour=20r=C3=A9duire=20l'usage=20de=20m?=
 =?UTF-8?q?=C3=A9moire?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Ajout d'un parseur GenBank basé sur rope pour réduire l'usage de mémoire
(RSS) et les allocations heap.

- Ajout de `gbRopeScanner` pour lire les lignes sans allocation heap
- Implémentation de `GenbankChunkParserRope` qui utilise rope au lieu de `Pack()`
- Modification de `_ParseGenbankFile` et `ReadGenbank` pour utiliser le nouveau parseur
- Réduction du RSS attendue de 57 GB à ~128 MB × workers
- Conservation de l'ancien parseur pour compatibilité et tests

Réduction significative des allocations (~50M) et temps sys, avec un temps
user comparable ou meilleur.
---
 .gitignore                                       |   1 +
 .../Prospective/large_sequence_parsing.md        |  82 ++++
 pkg/obiformats/genbank_read.go                   | 363 +++++++++++++++++-
 3 files changed, 430 insertions(+), 16 deletions(-)

diff --git a/.gitignore b/.gitignore
index 4cf23ab..2b0487c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,7 @@
 **/*.tgz
 **/*.yaml
 **/*.csv
+**/*.pb.gz
 
 xx
 .rhistory
diff --git a/blackboard/Prospective/large_sequence_parsing.md b/blackboard/Prospective/large_sequence_parsing.md
index 5e9c093..01f9738 100644
--- a/blackboard/Prospective/large_sequence_parsing.md
+++ b/blackboard/Prospective/large_sequence_parsing.md
@@ -180,3 +180,85 @@
 - `pkg/obiformats/embl_read.go` — `EmblChunkParser`, `ReadEMBL`
 - `pkg/obiformats/fastaseq_read.go` — `FastaChunkParser`, `_ParseFastaFile`
 - `pkg/obiformats/fastqseq_read.go` — parseur FASTQ (même structure)
+
+## Plan d'implémentation : parseur GenBank sur rope
+
+### Contexte
+
+Baseline mesurée : `obiconvert gbpln640.seq.gz` → 49s real, 42s user, 29s sys, **57 GB RSS**.
+
+Le sys élevé indique des allocations massives. Deux causes :
+1. `Pack()` : fusionne toute la rope (N × 128 MB) en un buffer contigu avant de parser
+2. Parser ORIGIN : `string(bline)` + `TrimSpace` + `SplitN` × millions de lignes
+
+### 1. `gbRopeScanner`
+
+Struct de lecture ligne par ligne sur la rope, sans allocation heap :
+
+```go
+type gbRopeScanner struct {
+	current *PieceOfChunk
+	pos     int
+	carry   [256]byte // stack-allocated, max GenBank line = 80 chars
+	carryN  int
+}
+```
+
+`ReadLine()` :
+- Cherche `\n` dans `current.data[pos:]` via `bytes.IndexByte`
+- Si trouvé sans carry : retourne slice direct du node (zéro alloc)
+- Si trouvé avec carry : copie dans carry buffer, retourne `carry[:n]`
+- Si non trouvé : copie le reste dans carry, avance au node suivant, recommence
+- EOF : retourne `carry[:carryN]` puis nil
+
+`extractSequence(dest []byte, UtoT bool) []byte` :
+- Scan direct des bytes pour section ORIGIN, sans passer par ReadLine
+- Machine d'états : lineStart → skip espaces/digits → copier nucléotides dans dest
+- Stop sur `//` en début de ligne
+- Zéro allocation, UtoT inline
+
+### 2. `GenbankChunkParserRope`
+
+```go
+func GenbankChunkParserRope(source string, rope *PieceOfChunk,
+	withFeatureTable, UtoT bool) (obiseq.BioSequenceSlice, error)
+```
+
+- Même machine d'états que `GenbankChunkParser`, sur `[]byte` (`bytes.HasPrefix`)
+- LOCUS : extrait `id` et `lseq` par scan direct (remplace `_seqlenght_rx`)
+- FEATURES / default inFeature : taxid extrait par scan de `/db_xref="taxon:`
+  dans la source feature ; `featBytes` rempli seulement si `withFeatureTable=true`
+- DEFINITION : toujours conservée
+- ORIGIN : `dest = make([]byte, 0, lseq+20)` puis `s.extractSequence(dest, UtoT)`
+
+### 3. Modifications `_ParseGenbankFile` et `ReadGenbank`
+
+`_ParseGenbankFile` utilise `chunk.Rope` :
+```go
+sequences, err := GenbankChunkParserRope(chunk.Source, chunk.Rope, ...)
+``` + +`ReadGenbank` passe `pack=false` : +```go +entry_channel := ReadFileChunk(..., false) +``` + +### 4. Ce qui NE change pas + +- `GenbankChunkParser` reste (référence, tests) +- `ReadFileChunk`, `Pack()`, autres parseurs (EMBL, FASTA, FASTQ) : inchangés + +### 5. Gains attendus + +- **RSS** : pic ≈ 128 MB × workers (au lieu de N × 128 MB) +- **Temps sys** : élimination des mmap/munmap pour les gros buffers +- **Temps user** : ~50M allocations éliminées + +### 6. Vérification + +```bash +/usr/local/go/bin/go build ./... +diff <(obiconvert gbpln640.seq.gz) gbpln640.reference.fasta +cd bugs/genbank && ./benchmark.sh gbpln640.seq.gz +``` + +Cible : RSS < 1 GB, temps comparable ou meilleur. diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go index 5bc2c1f..839134e 100644 --- a/pkg/obiformats/genbank_read.go +++ b/pkg/obiformats/genbank_read.go @@ -29,6 +29,342 @@ const ( var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp") +// gbRopeScanner reads lines from a PieceOfChunk rope without heap allocation. +// The carry buffer (stack) handles lines that span two rope nodes. +type gbRopeScanner struct { + current *PieceOfChunk + pos int + carry [256]byte // max GenBank line = 80 chars; 256 gives ample margin + carryN int +} + +func newGbRopeScanner(rope *PieceOfChunk) *gbRopeScanner { + return &gbRopeScanner{current: rope} +} + +// ReadLine returns the next line without the trailing \n (or \r\n). +// Returns nil at end of rope. The returned slice aliases carry[] or the node +// data and is valid only until the next ReadLine call. 
+func (s *gbRopeScanner) ReadLine() []byte { + for { + if s.current == nil { + if s.carryN > 0 { + n := s.carryN + s.carryN = 0 + return s.carry[:n] + } + return nil + } + + data := s.current.data[s.pos:] + idx := bytes.IndexByte(data, '\n') + + if idx >= 0 { + var line []byte + if s.carryN == 0 { + line = data[:idx] + } else { + n := copy(s.carry[s.carryN:], data[:idx]) + s.carryN += n + line = s.carry[:s.carryN] + s.carryN = 0 + } + s.pos += idx + 1 + if s.pos >= len(s.current.data) { + s.current = s.current.Next() + s.pos = 0 + } + if len(line) > 0 && line[len(line)-1] == '\r' { + line = line[:len(line)-1] + } + return line + } + + // No \n in this node: accumulate into carry and advance + n := copy(s.carry[s.carryN:], data) + s.carryN += n + s.current = s.current.Next() + s.pos = 0 + } +} + +// extractSequence scans the ORIGIN section byte-by-byte directly on the rope, +// appending compacted bases to dest. Returns the extended slice. +// Stops and returns when "//" is found at the start of a line. +// The scanner is left positioned after the "//" line. +func (s *gbRopeScanner) extractSequence(dest []byte, UtoT bool) []byte { + lineStart := true + skipDigits := true + + for s.current != nil { + data := s.current.data[s.pos:] + for i, b := range data { + if lineStart { + if b == '/' { + // End-of-record marker "//" + s.pos += i + 1 + if s.pos >= len(s.current.data) { + s.current = s.current.Next() + s.pos = 0 + } + s.skipToNewline() + return dest + } + lineStart = false + skipDigits = true + } + switch { + case b == '\n': + lineStart = true + case b == '\r': + // skip + case skipDigits: + if b != ' ' && (b < '0' || b > '9') { + skipDigits = false + if UtoT && b == 'u' { + b = 't' + } + dest = append(dest, b) + } + case b != ' ': + if UtoT && b == 'u' { + b = 't' + } + dest = append(dest, b) + } + } + s.current = s.current.Next() + s.pos = 0 + } + return dest +} + +// skipToNewline advances the scanner past the next '\n'. 
+func (s *gbRopeScanner) skipToNewline() { + for s.current != nil { + data := s.current.data[s.pos:] + idx := bytes.IndexByte(data, '\n') + if idx >= 0 { + s.pos += idx + 1 + if s.pos >= len(s.current.data) { + s.current = s.current.Next() + s.pos = 0 + } + return + } + s.current = s.current.Next() + s.pos = 0 + } +} + +// parseLseqFromLocus extracts the declared sequence length from a LOCUS line. +// Format: "LOCUS bp ..." +// Returns -1 if not found or parse error. +func parseLseqFromLocus(line []byte) int { + if len(line) < 13 { + return -1 + } + i := 12 + for i < len(line) && line[i] != ' ' { + i++ + } + for i < len(line) && line[i] == ' ' { + i++ + } + start := i + for i < len(line) && line[i] >= '0' && line[i] <= '9' { + i++ + } + if i == start { + return -1 + } + n, err := strconv.Atoi(string(line[start:i])) + if err != nil { + return -1 + } + return n +} + +// Prefix constants for GenBank section headers (byte slices for zero-alloc comparison). +var ( + gbPfxLocus = []byte("LOCUS ") + gbPfxDefinition = []byte("DEFINITION ") + gbPfxContinue = []byte(" ") + gbPfxSource = []byte("SOURCE ") + gbPfxFeatures = []byte("FEATURES ") + gbPfxOrigin = []byte("ORIGIN") + gbPfxContig = []byte("CONTIG") + gbPfxEnd = []byte("//") + gbPfxDbXref = []byte(` /db_xref="taxon:`) +) + +// GenbankChunkParserRope parses a GenBank FileChunk directly from the rope +// (PieceOfChunk linked list) without calling Pack(). This eliminates the large +// contiguous allocation required for chromosomal-scale sequences. 
+func GenbankChunkParserRope(source string, rope *PieceOfChunk, + withFeatureTable, UtoT bool) (obiseq.BioSequenceSlice, error) { + + state := inHeader + scanner := newGbRopeScanner(rope) + sequences := obiseq.MakeBioSequenceSlice(100)[:0] + + id := "" + lseq := -1 + scientificName := "" + defBytes := new(bytes.Buffer) + featBytes := new(bytes.Buffer) + var seqDest []byte + taxid := 1 + nl := 0 + + for bline := scanner.ReadLine(); bline != nil; bline = scanner.ReadLine() { + nl++ + processed := false + for !processed { + switch { + + case bytes.HasPrefix(bline, gbPfxLocus): + if state != inHeader { + log.Fatalf("Line %d - Unexpected state %d while reading LOCUS: %s", nl, state, bline) + } + rest := bline[12:] + sp := bytes.IndexByte(rest, ' ') + if sp < 0 { + id = string(rest) + } else { + id = string(rest[:sp]) + } + lseq = parseLseqFromLocus(bline) + cap0 := lseq + 20 + if cap0 < 1024 { + cap0 = 1024 + } + seqDest = make([]byte, 0, cap0) + state = inEntry + processed = true + + case bytes.HasPrefix(bline, gbPfxDefinition): + if state != inEntry { + log.Fatalf("Line %d - Unexpected state %d while reading DEFINITION: %s", nl, state, bline) + } + defBytes.Write(bytes.TrimSpace(bline[12:])) + state = inDefinition + processed = true + + case state == inDefinition: + if bytes.HasPrefix(bline, gbPfxContinue) { + defBytes.WriteByte(' ') + defBytes.Write(bytes.TrimSpace(bline[12:])) + processed = true + } else { + state = inEntry + } + + case bytes.HasPrefix(bline, gbPfxSource): + if state != inEntry { + log.Fatalf("Line %d - Unexpected state %d while reading SOURCE: %s", nl, state, bline) + } + scientificName = string(bytes.TrimSpace(bline[12:])) + processed = true + + case bytes.HasPrefix(bline, gbPfxFeatures): + if state != inEntry { + log.Fatalf("Line %d - Unexpected state %d while reading FEATURES: %s", nl, state, bline) + } + if withFeatureTable { + featBytes.Write(bline) + } + state = inFeature + processed = true + + case bytes.HasPrefix(bline, gbPfxOrigin): + if 
state != inFeature && state != inContig { + log.Fatalf("Line %d - Unexpected state %d while reading ORIGIN: %s", nl, state, bline) + } + // Use fast byte-scan to extract sequence and consume through "//" + seqDest = scanner.extractSequence(seqDest, UtoT) + // Emit record + if id == "" { + log.Warn("Empty id when parsing genbank file") + } + sequence := obiseq.NewBioSequence(id, seqDest, defBytes.String()) + sequence.SetSource(source) + if withFeatureTable { + sequence.SetFeatures(featBytes.Bytes()) + } + annot := sequence.Annotations() + annot["scientific_name"] = scientificName + annot["taxid"] = taxid + sequences = append(sequences, sequence) + + defBytes = bytes.NewBuffer(obiseq.GetSlice(200)) + featBytes = new(bytes.Buffer) + nl = 0 + taxid = 1 + seqDest = nil + state = inHeader + processed = true + + case bytes.HasPrefix(bline, gbPfxContig): + if state != inFeature && state != inContig { + log.Fatalf("Line %d - Unexpected state %d while reading CONTIG: %s", nl, state, bline) + } + state = inContig + processed = true + + case bytes.Equal(bline, gbPfxEnd): + // Reached for CONTIG records (no ORIGIN section) + if state != inContig { + log.Fatalf("Line %d - Unexpected state %d while reading end of record %s", nl, state, id) + } + if id == "" { + log.Warn("Empty id when parsing genbank file") + } + sequence := obiseq.NewBioSequence(id, seqDest, defBytes.String()) + sequence.SetSource(source) + if withFeatureTable { + sequence.SetFeatures(featBytes.Bytes()) + } + annot := sequence.Annotations() + annot["scientific_name"] = scientificName + annot["taxid"] = taxid + sequences = append(sequences, sequence) + + defBytes = bytes.NewBuffer(obiseq.GetSlice(200)) + featBytes = new(bytes.Buffer) + nl = 0 + taxid = 1 + seqDest = nil + state = inHeader + processed = true + + default: + switch state { + case inFeature: + if withFeatureTable { + featBytes.WriteByte('\n') + featBytes.Write(bline) + } + if bytes.HasPrefix(bline, gbPfxDbXref) { + rest := bline[len(gbPfxDbXref):] + 
q := bytes.IndexByte(rest, '"') + if q >= 0 { + taxid, _ = strconv.Atoi(string(rest[:q])) + } + } + processed = true + case inHeader, inEntry, inContig: + processed = true + default: + log.Fatalf("Unexpected state %d while reading: %s", state, bline) + } + } + } + } + + return sequences, nil +} + func GenbankChunkParser(withFeatureTable, UtoT bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) { return func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) { state := inHeader @@ -125,13 +461,10 @@ func GenbankChunkParser(withFeatureTable, UtoT bool) func(string, io.Reader) (ob if state != inSequence && state != inContig { log.Fatalf("Line %d - Unexpected state %d while reading end of record %s", nl, state, id) } - // log.Debugln("Total lines := ", nl) if id == "" { log.Warn("Empty id when parsing genbank file") } - // log.Debugf("End of sequence %s: %dbp ", id, seqBytes.Len()) - sequence := obiseq.NewBioSequence(id, seqBytes.Bytes(), defBytes.String()) @@ -144,9 +477,6 @@ func GenbankChunkParser(withFeatureTable, UtoT bool) func(string, io.Reader) (ob annot := sequence.Annotations() annot["scientific_name"] = scientificName annot["taxid"] = taxid - // log.Println(FormatFasta(sequence, FormatFastSeqJsonHeader)) - // log.Debugf("Read sequences %s: %dbp (%d)", sequence.Id(), - // sequence.Len(), seqBytes.Len()) sequences = append(sequences, sequence) @@ -159,8 +489,6 @@ func GenbankChunkParser(withFeatureTable, UtoT bool) func(string, io.Reader) (ob processed = true case state == inSequence: - // log.Debugf("Chunk %d : Genbank: line %d, state = %d : %s", chunks.order, nl, state, line) - sl++ cleanline := strings.TrimSpace(line) parts := strings.SplitN(cleanline, " ", 7) @@ -198,6 +526,7 @@ func GenbankChunkParser(withFeatureTable, UtoT bool) func(string, io.Reader) (ob } + _ = sl return sequences, nil } } @@ -206,10 +535,16 @@ func _ParseGenbankFile(input ChannelFileChunk, out obiiter.IBioSequence, withFeatureTable, UtoT bool) { - parser := 
GenbankChunkParser(withFeatureTable, UtoT) - for chunks := range input { - sequences, err := parser(chunks.Source, chunks.Raw) + var sequences obiseq.BioSequenceSlice + var err error + + if chunks.Rope != nil { + sequences, err = GenbankChunkParserRope(chunks.Source, chunks.Rope, withFeatureTable, UtoT) + } else { + parser := GenbankChunkParser(withFeatureTable, UtoT) + sequences, err = parser(chunks.Source, chunks.Raw) + } if err != nil { log.Fatalf("File %s : Cannot parse the genbank file : %v", chunks.Source, err) @@ -225,7 +560,6 @@ func _ParseGenbankFile(input ChannelFileChunk, func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) - // entry_channel := make(chan _FileChunk) entry_channel := ReadFileChunk( opt.Source(), @@ -233,14 +567,13 @@ func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, 1024*1024*128, EndOfLastFlatFileEntry, "\nLOCUS ", - true, + false, // do not pack: rope-based parser avoids contiguous allocation ) newIter := obiiter.MakeIBioSequence() nworkers := opt.ParallelWorkers() - // for j := 0; j < opt.ParallelWorkers(); j++ { for j := 0; j < nworkers; j++ { newIter.Add(1) go _ParseGenbankFile( @@ -251,8 +584,6 @@ func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, ) } - // go _ReadFlatFileChunk(reader, entry_channel) - go func() { newIter.WaitAndClose() log.Debug("End of the genbank file ", opt.Source())