From 3d2e2057228d111aadfbd0d4cd6a73c71480947d Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 10 Mar 2026 16:46:53 +0100 Subject: [PATCH] Refactor rope scanner and add FASTQ rope parser This commit refactors the rope scanner implementation by renaming gbRopeScanner to ropeScanner and extracting the common functionality into a new file. It also introduces a new FastqChunkParserRope function that parses FASTQ chunks directly from a rope without Pack(), enabling more efficient memory usage. The existing parsers are updated to use the new rope-based parser when available. The BioSequence type is enhanced with a TakeQualities method for more efficient quality data handling. --- pkg/obiformats/fastaseq_read.go | 4 +- pkg/obiformats/fastqseq_read.go | 85 ++++++++++++++++++++++++++++++++- pkg/obiformats/genbank_read.go | 81 +------------------------------ pkg/obiformats/rope_scanner.go | 80 +++++++++++++++++++++++++++++++ pkg/obiseq/biosequence.go | 9 ++++ 5 files changed, 176 insertions(+), 83 deletions(-) create mode 100644 pkg/obiformats/rope_scanner.go diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go index c700ea2..5a5bbdd 100644 --- a/pkg/obiformats/fastaseq_read.go +++ b/pkg/obiformats/fastaseq_read.go @@ -214,7 +214,7 @@ func FastaChunkParser(UtoT bool) func(string, io.Reader) (obiseq.BioSequenceSlic // Stops when '>' is found at the start of a line (next record) or at EOF. // Returns (dest with appended bases, hasMore). // hasMore=true means scanner is now positioned at '>' of the next record. -func (s *gbRopeScanner) extractFastaSeq(dest []byte, UtoT bool) ([]byte, bool) { +func (s *ropeScanner) extractFastaSeq(dest []byte, UtoT bool) ([]byte, bool) { lineStart := true for s.current != nil { @@ -252,7 +252,7 @@ func (s *gbRopeScanner) extractFastaSeq(dest []byte, UtoT bool) ([]byte, bool) { // FastaChunkParserRope parses a FASTA chunk directly from the rope without Pack(). func FastaChunkParserRope(source string, rope *PieceOfChunk, UtoT bool) (obiseq.BioSequenceSlice, error) { - scanner := newGbRopeScanner(rope) + scanner := newRopeScanner(rope) sequences := obiseq.MakeBioSequenceSlice(100)[:0] for { diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go index 9c94d2d..861705f 100644 --- a/pkg/obiformats/fastqseq_read.go +++ b/pkg/obiformats/fastqseq_read.go @@ -303,6 +303,80 @@ func FastqChunkParser(quality_shift byte, with_quality bool, UtoT bool) func(str return parser } +// FastqChunkParserRope parses a FASTQ chunk directly from a rope without Pack(). +func FastqChunkParserRope(source string, rope *PieceOfChunk, quality_shift byte, with_quality, UtoT bool) (obiseq.BioSequenceSlice, error) { + scanner := newRopeScanner(rope) + sequences := obiseq.MakeBioSequenceSlice(100)[:0] + + for { + // Line 1: @id [definition] + hline := scanner.ReadLine() + if hline == nil { + break + } + if len(hline) == 0 || hline[0] != '@' { + continue + } + header := hline[1:] + var id string + var definition string + sp := bytes.IndexByte(header, ' ') + if sp < 0 { + sp = bytes.IndexByte(header, '\t') + } + if sp < 0 { + id = string(header) + } else { + id = string(header[:sp]) + definition = string(bytes.TrimSpace(header[sp+1:])) + } + + // Line 2: sequence + sline := scanner.ReadLine() + if sline == nil { + log.Fatalf("@%s[%s]: unexpected EOF after header", id, source) + } + seqDest := make([]byte, len(sline)) + w := 0 + for _, b := range sline { + if b >= 'A' && b <= 'Z' { + b += 'a' - 'A' + } + if UtoT && b == 'u' { + b = 't' + } + seqDest[w] = b + w++ + } + seqDest = seqDest[:w] + if len(seqDest) == 0 { + log.Fatalf("@%s[%s]: sequence is empty", id, source) + } + + // Line 3: + (skip) + scanner.ReadLine() + + // Line 4: quality + qline := scanner.ReadLine() + + seq := obiseq.NewBioSequenceOwning(id, seqDest, definition) + seq.SetSource(source) + + if with_quality && qline != nil { + qDest := make([]byte, len(qline)) + copy(qDest, qline) + for i := range qDest { + qDest[i] -= quality_shift + } + seq.TakeQualities(qDest) + } + + sequences = append(sequences, seq) + } + + return sequences, nil +} + func _ParseFastqFile( input ChannelFileChunk, out obiiter.IBioSequence, @@ -313,7 +387,14 @@ func _ParseFastqFile( parser := FastqChunkParser(quality_shift, with_quality, UtoT) for chunks := range input { - sequences, err := parser(chunks.Source, chunks.Raw) + var sequences obiseq.BioSequenceSlice + var err error + + if chunks.Rope != nil { + sequences, err = FastqChunkParserRope(chunks.Source, chunks.Rope, quality_shift, with_quality, UtoT) + } else { + sequences, err = parser(chunks.Source, chunks.Raw) + } if err != nil { log.Fatalf("File %s : Cannot parse the fastq file : %v", chunks.Source, err) @@ -339,7 +420,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e 1024*1024, EndOfLastFastqEntry, "\n@", - true, + false, ) for i := 0; i < nworker; i++ { diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go index 53a3057..3cabd28 100644 --- a/pkg/obiformats/genbank_read.go +++ b/pkg/obiformats/genbank_read.go @@ -29,70 +29,11 @@ const ( var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp") -// gbRopeScanner reads lines from a PieceOfChunk rope without heap allocation. -// The carry buffer (stack) handles lines that span two rope nodes. -type gbRopeScanner struct { - current *PieceOfChunk - pos int - carry [256]byte // max GenBank line = 80 chars; 256 gives ample margin - carryN int -} - -func newGbRopeScanner(rope *PieceOfChunk) *gbRopeScanner { - return &gbRopeScanner{current: rope} -} - -// ReadLine returns the next line without the trailing \n (or \r\n). -// Returns nil at end of rope. The returned slice aliases carry[] or the node -// data and is valid only until the next ReadLine call. -func (s *gbRopeScanner) ReadLine() []byte { - for { - if s.current == nil { - if s.carryN > 0 { - n := s.carryN - s.carryN = 0 - return s.carry[:n] - } - return nil - } - - data := s.current.data[s.pos:] - idx := bytes.IndexByte(data, '\n') - - if idx >= 0 { - var line []byte - if s.carryN == 0 { - line = data[:idx] - } else { - n := copy(s.carry[s.carryN:], data[:idx]) - s.carryN += n - line = s.carry[:s.carryN] - s.carryN = 0 - } - s.pos += idx + 1 - if s.pos >= len(s.current.data) { - s.current = s.current.Next() - s.pos = 0 - } - if len(line) > 0 && line[len(line)-1] == '\r' { - line = line[:len(line)-1] - } - return line - } - - // No \n in this node: accumulate into carry and advance - n := copy(s.carry[s.carryN:], data) - s.carryN += n - s.current = s.current.Next() - s.pos = 0 - } -} - // extractSequence scans the ORIGIN section byte-by-byte directly on the rope, // appending compacted bases to dest. Returns the extended slice. // Stops and returns when "//" is found at the start of a line. // The scanner is left positioned after the "//" line. -func (s *gbRopeScanner) extractSequence(dest []byte, UtoT bool) []byte { +func (s *ropeScanner) extractSequence(dest []byte, UtoT bool) []byte { lineStart := true skipDigits := true @@ -139,24 +80,6 @@ func (s *gbRopeScanner) extractSequence(dest []byte, UtoT bool) []byte { return dest } -// skipToNewline advances the scanner past the next '\n'. -func (s *gbRopeScanner) skipToNewline() { - for s.current != nil { - data := s.current.data[s.pos:] - idx := bytes.IndexByte(data, '\n') - if idx >= 0 { - s.pos += idx + 1 - if s.pos >= len(s.current.data) { - s.current = s.current.Next() - s.pos = 0 - } - return - } - s.current = s.current.Next() - s.pos = 0 - } -} - // parseLseqFromLocus extracts the declared sequence length from a LOCUS line. // Format: "LOCUS bp ..." // Returns -1 if not found or parse error. @@ -205,7 +128,7 @@ func GenbankChunkParserRope(source string, rope *PieceOfChunk, withFeatureTable, UtoT bool) (obiseq.BioSequenceSlice, error) { state := inHeader - scanner := newGbRopeScanner(rope) + scanner := newRopeScanner(rope) sequences := obiseq.MakeBioSequenceSlice(100)[:0] id := "" diff --git a/pkg/obiformats/rope_scanner.go b/pkg/obiformats/rope_scanner.go new file mode 100644 index 0000000..a7217f5 --- /dev/null +++ b/pkg/obiformats/rope_scanner.go @@ -0,0 +1,80 @@ +package obiformats + +import "bytes" + +// ropeScanner reads lines from a PieceOfChunk rope without heap allocation. +// The carry buffer (stack) handles lines that span two rope nodes. +type ropeScanner struct { + current *PieceOfChunk + pos int + carry [256]byte // 256 gives ample margin for typical flat-file lines + carryN int +} + +func newRopeScanner(rope *PieceOfChunk) *ropeScanner { + return &ropeScanner{current: rope} +} + +// ReadLine returns the next line without the trailing \n (or \r\n). +// Returns nil at end of rope. The returned slice aliases carry[] or the node +// data and is valid only until the next ReadLine call. +func (s *ropeScanner) ReadLine() []byte { + for { + if s.current == nil { + if s.carryN > 0 { + n := s.carryN + s.carryN = 0 + return s.carry[:n] + } + return nil + } + + data := s.current.data[s.pos:] + idx := bytes.IndexByte(data, '\n') + + if idx >= 0 { + var line []byte + if s.carryN == 0 { + line = data[:idx] + } else { + n := copy(s.carry[s.carryN:], data[:idx]) + s.carryN += n + line = s.carry[:s.carryN] + s.carryN = 0 + } + s.pos += idx + 1 + if s.pos >= len(s.current.data) { + s.current = s.current.Next() + s.pos = 0 + } + if len(line) > 0 && line[len(line)-1] == '\r' { + line = line[:len(line)-1] + } + return line + } + + // No \n in this node: accumulate into carry and advance + n := copy(s.carry[s.carryN:], data) + s.carryN += n + s.current = s.current.Next() + s.pos = 0 + } +} + +// skipToNewline advances the scanner past the next '\n'. +func (s *ropeScanner) skipToNewline() { + for s.current != nil { + data := s.current.data[s.pos:] + idx := bytes.IndexByte(data, '\n') + if idx >= 0 { + s.pos += idx + 1 + if s.pos >= len(s.current.data) { + s.current = s.current.Next() + s.pos = 0 + } + return + } + s.current = s.current.Next() + s.pos = 0 + } +} diff --git a/pkg/obiseq/biosequence.go b/pkg/obiseq/biosequence.go index f3939d8..a362a34 100644 --- a/pkg/obiseq/biosequence.go +++ b/pkg/obiseq/biosequence.go @@ -480,6 +480,15 @@ func (s *BioSequence) SetQualities(qualities Quality) { s.qualities = CopySlice(qualities) } +// TakeQualities stores the slice directly without copying. +// The caller must not use the slice after this call. +func (s *BioSequence) TakeQualities(qualities Quality) { + if s.qualities != nil { + RecycleSlice(&s.qualities) + } + s.qualities = qualities +} + // A method that appends a byte slice to the qualities of the BioSequence. func (s *BioSequence) WriteQualities(data []byte) (int, error) { s.qualities = append(s.qualities, data...)