mirror of
https://github.com/metabarcoding/obitools4.git
synced 2026-03-25 05:20:52 +00:00
Refactor rope scanner and add FASTQ rope parser
This commit refactors the rope scanner implementation by renaming gbRopeScanner to ropeScanner and extracting the common functionality into a new file. It also introduces a new FastqChunkParserRope function that parses FASTQ chunks directly from a rope without Pack(), enabling more efficient memory usage. The existing parsers are updated to use the new rope-based parser when available. The BioSequence type is enhanced with a TakeQualities method for more efficient quality data handling.
This commit is contained in:
@@ -214,7 +214,7 @@ func FastaChunkParser(UtoT bool) func(string, io.Reader) (obiseq.BioSequenceSlic
|
|||||||
// Stops when '>' is found at the start of a line (next record) or at EOF.
|
// Stops when '>' is found at the start of a line (next record) or at EOF.
|
||||||
// Returns (dest with appended bases, hasMore).
|
// Returns (dest with appended bases, hasMore).
|
||||||
// hasMore=true means scanner is now positioned at '>' of the next record.
|
// hasMore=true means scanner is now positioned at '>' of the next record.
|
||||||
func (s *gbRopeScanner) extractFastaSeq(dest []byte, UtoT bool) ([]byte, bool) {
|
func (s *ropeScanner) extractFastaSeq(dest []byte, UtoT bool) ([]byte, bool) {
|
||||||
lineStart := true
|
lineStart := true
|
||||||
|
|
||||||
for s.current != nil {
|
for s.current != nil {
|
||||||
@@ -252,7 +252,7 @@ func (s *gbRopeScanner) extractFastaSeq(dest []byte, UtoT bool) ([]byte, bool) {
|
|||||||
|
|
||||||
// FastaChunkParserRope parses a FASTA chunk directly from the rope without Pack().
|
// FastaChunkParserRope parses a FASTA chunk directly from the rope without Pack().
|
||||||
func FastaChunkParserRope(source string, rope *PieceOfChunk, UtoT bool) (obiseq.BioSequenceSlice, error) {
|
func FastaChunkParserRope(source string, rope *PieceOfChunk, UtoT bool) (obiseq.BioSequenceSlice, error) {
|
||||||
scanner := newGbRopeScanner(rope)
|
scanner := newRopeScanner(rope)
|
||||||
sequences := obiseq.MakeBioSequenceSlice(100)[:0]
|
sequences := obiseq.MakeBioSequenceSlice(100)[:0]
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|||||||
@@ -303,6 +303,80 @@ func FastqChunkParser(quality_shift byte, with_quality bool, UtoT bool) func(str
|
|||||||
return parser
|
return parser
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FastqChunkParserRope parses a FASTQ chunk directly from a rope without Pack().
|
||||||
|
func FastqChunkParserRope(source string, rope *PieceOfChunk, quality_shift byte, with_quality, UtoT bool) (obiseq.BioSequenceSlice, error) {
|
||||||
|
scanner := newRopeScanner(rope)
|
||||||
|
sequences := obiseq.MakeBioSequenceSlice(100)[:0]
|
||||||
|
|
||||||
|
for {
|
||||||
|
// Line 1: @id [definition]
|
||||||
|
hline := scanner.ReadLine()
|
||||||
|
if hline == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if len(hline) == 0 || hline[0] != '@' {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
header := hline[1:]
|
||||||
|
var id string
|
||||||
|
var definition string
|
||||||
|
sp := bytes.IndexByte(header, ' ')
|
||||||
|
if sp < 0 {
|
||||||
|
sp = bytes.IndexByte(header, '\t')
|
||||||
|
}
|
||||||
|
if sp < 0 {
|
||||||
|
id = string(header)
|
||||||
|
} else {
|
||||||
|
id = string(header[:sp])
|
||||||
|
definition = string(bytes.TrimSpace(header[sp+1:]))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Line 2: sequence
|
||||||
|
sline := scanner.ReadLine()
|
||||||
|
if sline == nil {
|
||||||
|
log.Fatalf("@%s[%s]: unexpected EOF after header", id, source)
|
||||||
|
}
|
||||||
|
seqDest := make([]byte, len(sline))
|
||||||
|
w := 0
|
||||||
|
for _, b := range sline {
|
||||||
|
if b >= 'A' && b <= 'Z' {
|
||||||
|
b += 'a' - 'A'
|
||||||
|
}
|
||||||
|
if UtoT && b == 'u' {
|
||||||
|
b = 't'
|
||||||
|
}
|
||||||
|
seqDest[w] = b
|
||||||
|
w++
|
||||||
|
}
|
||||||
|
seqDest = seqDest[:w]
|
||||||
|
if len(seqDest) == 0 {
|
||||||
|
log.Fatalf("@%s[%s]: sequence is empty", id, source)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Line 3: + (skip)
|
||||||
|
scanner.ReadLine()
|
||||||
|
|
||||||
|
// Line 4: quality
|
||||||
|
qline := scanner.ReadLine()
|
||||||
|
|
||||||
|
seq := obiseq.NewBioSequenceOwning(id, seqDest, definition)
|
||||||
|
seq.SetSource(source)
|
||||||
|
|
||||||
|
if with_quality && qline != nil {
|
||||||
|
qDest := make([]byte, len(qline))
|
||||||
|
copy(qDest, qline)
|
||||||
|
for i := range qDest {
|
||||||
|
qDest[i] -= quality_shift
|
||||||
|
}
|
||||||
|
seq.TakeQualities(qDest)
|
||||||
|
}
|
||||||
|
|
||||||
|
sequences = append(sequences, seq)
|
||||||
|
}
|
||||||
|
|
||||||
|
return sequences, nil
|
||||||
|
}
|
||||||
|
|
||||||
func _ParseFastqFile(
|
func _ParseFastqFile(
|
||||||
input ChannelFileChunk,
|
input ChannelFileChunk,
|
||||||
out obiiter.IBioSequence,
|
out obiiter.IBioSequence,
|
||||||
@@ -313,7 +387,14 @@ func _ParseFastqFile(
|
|||||||
parser := FastqChunkParser(quality_shift, with_quality, UtoT)
|
parser := FastqChunkParser(quality_shift, with_quality, UtoT)
|
||||||
|
|
||||||
for chunks := range input {
|
for chunks := range input {
|
||||||
sequences, err := parser(chunks.Source, chunks.Raw)
|
var sequences obiseq.BioSequenceSlice
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if chunks.Rope != nil {
|
||||||
|
sequences, err = FastqChunkParserRope(chunks.Source, chunks.Rope, quality_shift, with_quality, UtoT)
|
||||||
|
} else {
|
||||||
|
sequences, err = parser(chunks.Source, chunks.Raw)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("File %s : Cannot parse the fastq file : %v", chunks.Source, err)
|
log.Fatalf("File %s : Cannot parse the fastq file : %v", chunks.Source, err)
|
||||||
@@ -339,7 +420,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
|
|||||||
1024*1024,
|
1024*1024,
|
||||||
EndOfLastFastqEntry,
|
EndOfLastFastqEntry,
|
||||||
"\n@",
|
"\n@",
|
||||||
true,
|
false,
|
||||||
)
|
)
|
||||||
|
|
||||||
for i := 0; i < nworker; i++ {
|
for i := 0; i < nworker; i++ {
|
||||||
|
|||||||
@@ -29,70 +29,11 @@ const (
|
|||||||
|
|
||||||
var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp")
|
var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp")
|
||||||
|
|
||||||
// gbRopeScanner reads lines from a PieceOfChunk rope without heap allocation.
|
|
||||||
// The carry buffer (stack) handles lines that span two rope nodes.
|
|
||||||
type gbRopeScanner struct {
|
|
||||||
current *PieceOfChunk
|
|
||||||
pos int
|
|
||||||
carry [256]byte // max GenBank line = 80 chars; 256 gives ample margin
|
|
||||||
carryN int
|
|
||||||
}
|
|
||||||
|
|
||||||
func newGbRopeScanner(rope *PieceOfChunk) *gbRopeScanner {
|
|
||||||
return &gbRopeScanner{current: rope}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadLine returns the next line without the trailing \n (or \r\n).
|
|
||||||
// Returns nil at end of rope. The returned slice aliases carry[] or the node
|
|
||||||
// data and is valid only until the next ReadLine call.
|
|
||||||
func (s *gbRopeScanner) ReadLine() []byte {
|
|
||||||
for {
|
|
||||||
if s.current == nil {
|
|
||||||
if s.carryN > 0 {
|
|
||||||
n := s.carryN
|
|
||||||
s.carryN = 0
|
|
||||||
return s.carry[:n]
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
data := s.current.data[s.pos:]
|
|
||||||
idx := bytes.IndexByte(data, '\n')
|
|
||||||
|
|
||||||
if idx >= 0 {
|
|
||||||
var line []byte
|
|
||||||
if s.carryN == 0 {
|
|
||||||
line = data[:idx]
|
|
||||||
} else {
|
|
||||||
n := copy(s.carry[s.carryN:], data[:idx])
|
|
||||||
s.carryN += n
|
|
||||||
line = s.carry[:s.carryN]
|
|
||||||
s.carryN = 0
|
|
||||||
}
|
|
||||||
s.pos += idx + 1
|
|
||||||
if s.pos >= len(s.current.data) {
|
|
||||||
s.current = s.current.Next()
|
|
||||||
s.pos = 0
|
|
||||||
}
|
|
||||||
if len(line) > 0 && line[len(line)-1] == '\r' {
|
|
||||||
line = line[:len(line)-1]
|
|
||||||
}
|
|
||||||
return line
|
|
||||||
}
|
|
||||||
|
|
||||||
// No \n in this node: accumulate into carry and advance
|
|
||||||
n := copy(s.carry[s.carryN:], data)
|
|
||||||
s.carryN += n
|
|
||||||
s.current = s.current.Next()
|
|
||||||
s.pos = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// extractSequence scans the ORIGIN section byte-by-byte directly on the rope,
|
// extractSequence scans the ORIGIN section byte-by-byte directly on the rope,
|
||||||
// appending compacted bases to dest. Returns the extended slice.
|
// appending compacted bases to dest. Returns the extended slice.
|
||||||
// Stops and returns when "//" is found at the start of a line.
|
// Stops and returns when "//" is found at the start of a line.
|
||||||
// The scanner is left positioned after the "//" line.
|
// The scanner is left positioned after the "//" line.
|
||||||
func (s *gbRopeScanner) extractSequence(dest []byte, UtoT bool) []byte {
|
func (s *ropeScanner) extractSequence(dest []byte, UtoT bool) []byte {
|
||||||
lineStart := true
|
lineStart := true
|
||||||
skipDigits := true
|
skipDigits := true
|
||||||
|
|
||||||
@@ -139,24 +80,6 @@ func (s *gbRopeScanner) extractSequence(dest []byte, UtoT bool) []byte {
|
|||||||
return dest
|
return dest
|
||||||
}
|
}
|
||||||
|
|
||||||
// skipToNewline advances the scanner past the next '\n'.
|
|
||||||
func (s *gbRopeScanner) skipToNewline() {
|
|
||||||
for s.current != nil {
|
|
||||||
data := s.current.data[s.pos:]
|
|
||||||
idx := bytes.IndexByte(data, '\n')
|
|
||||||
if idx >= 0 {
|
|
||||||
s.pos += idx + 1
|
|
||||||
if s.pos >= len(s.current.data) {
|
|
||||||
s.current = s.current.Next()
|
|
||||||
s.pos = 0
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.current = s.current.Next()
|
|
||||||
s.pos = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// parseLseqFromLocus extracts the declared sequence length from a LOCUS line.
|
// parseLseqFromLocus extracts the declared sequence length from a LOCUS line.
|
||||||
// Format: "LOCUS <id> <length> bp ..."
|
// Format: "LOCUS <id> <length> bp ..."
|
||||||
// Returns -1 if not found or parse error.
|
// Returns -1 if not found or parse error.
|
||||||
@@ -205,7 +128,7 @@ func GenbankChunkParserRope(source string, rope *PieceOfChunk,
|
|||||||
withFeatureTable, UtoT bool) (obiseq.BioSequenceSlice, error) {
|
withFeatureTable, UtoT bool) (obiseq.BioSequenceSlice, error) {
|
||||||
|
|
||||||
state := inHeader
|
state := inHeader
|
||||||
scanner := newGbRopeScanner(rope)
|
scanner := newRopeScanner(rope)
|
||||||
sequences := obiseq.MakeBioSequenceSlice(100)[:0]
|
sequences := obiseq.MakeBioSequenceSlice(100)[:0]
|
||||||
|
|
||||||
id := ""
|
id := ""
|
||||||
|
|||||||
80
pkg/obiformats/rope_scanner.go
Normal file
80
pkg/obiformats/rope_scanner.go
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
package obiformats

import "bytes"

// ropeScanner reads lines from a PieceOfChunk rope without per-line heap
// allocation in the common case (a line fully contained in one node is
// returned as a sub-slice of that node's data).
//
// The carry buffer accumulates lines that span two or more rope nodes.
// It is a growable slice rather than a fixed array: GenBank lines are at
// most ~80 bytes, but the FASTQ parser reuses this scanner and FASTQ
// sequence/quality lines can be arbitrarily long. A fixed buffer would
// silently drop the bytes that do not fit when a long line crosses a
// node boundary.
type ropeScanner struct {
	current *PieceOfChunk // node being read; nil once the rope is exhausted
	pos     int           // read offset inside current.data
	carry   []byte        // pending fragment of a line spanning node boundaries
	carryN  int           // number of valid bytes in carry (== len(carry))
}

// newRopeScanner returns a scanner positioned at the start of rope.
// The carry buffer is pre-sized for typical flat-file lines and grows on
// demand; the zero value of ropeScanner is also usable (append on a nil
// slice allocates).
func newRopeScanner(rope *PieceOfChunk) *ropeScanner {
	return &ropeScanner{current: rope, carry: make([]byte, 0, 256)}
}

// ReadLine returns the next line without the trailing \n (or \r\n).
// Returns nil at end of rope. The returned slice aliases carry or the node
// data and is valid only until the next ReadLine call.
func (s *ropeScanner) ReadLine() []byte {
	for {
		if s.current == nil {
			// End of rope: flush a final line that had no trailing '\n'.
			if s.carryN > 0 {
				line := s.carry[:s.carryN]
				s.carry = s.carry[:0]
				s.carryN = 0
				return line
			}
			return nil
		}

		data := s.current.data[s.pos:]
		idx := bytes.IndexByte(data, '\n')

		if idx >= 0 {
			var line []byte
			if s.carryN == 0 {
				// Fast path: the whole line lives inside this node.
				line = data[:idx]
			} else {
				// Join the pending fragment with the tail found here.
				s.carry = append(s.carry, data[:idx]...)
				line = s.carry
				s.carry = s.carry[:0]
				s.carryN = 0
			}
			s.pos += idx + 1
			if s.pos >= len(s.current.data) {
				s.current = s.current.Next()
				s.pos = 0
			}
			// Strip a Windows-style '\r' preceding the '\n'.
			if len(line) > 0 && line[len(line)-1] == '\r' {
				line = line[:len(line)-1]
			}
			return line
		}

		// No '\n' in this node: accumulate the fragment and advance.
		// append grows carry as needed, so a line longer than the initial
		// capacity that spans node boundaries is no longer truncated.
		s.carry = append(s.carry, data...)
		s.carryN = len(s.carry)
		s.current = s.current.Next()
		s.pos = 0
	}
}

// skipToNewline advances the scanner past the next '\n', crossing node
// boundaries as needed. At end of rope it simply stops.
func (s *ropeScanner) skipToNewline() {
	for s.current != nil {
		data := s.current.data[s.pos:]
		idx := bytes.IndexByte(data, '\n')
		if idx >= 0 {
			s.pos += idx + 1
			if s.pos >= len(s.current.data) {
				s.current = s.current.Next()
				s.pos = 0
			}
			return
		}
		s.current = s.current.Next()
		s.pos = 0
	}
}
|
||||||
@@ -480,6 +480,15 @@ func (s *BioSequence) SetQualities(qualities Quality) {
|
|||||||
s.qualities = CopySlice(qualities)
|
s.qualities = CopySlice(qualities)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TakeQualities stores the slice directly without copying.
|
||||||
|
// The caller must not use the slice after this call.
|
||||||
|
func (s *BioSequence) TakeQualities(qualities Quality) {
|
||||||
|
if s.qualities != nil {
|
||||||
|
RecycleSlice(&s.qualities)
|
||||||
|
}
|
||||||
|
s.qualities = qualities
|
||||||
|
}
|
||||||
|
|
||||||
// A method that appends a byte slice to the qualities of the BioSequence.
|
// A method that appends a byte slice to the qualities of the BioSequence.
|
||||||
func (s *BioSequence) WriteQualities(data []byte) (int, error) {
|
func (s *BioSequence) WriteQualities(data []byte) (int, error) {
|
||||||
s.qualities = append(s.qualities, data...)
|
s.qualities = append(s.qualities, data...)
|
||||||
|
|||||||
Reference in New Issue
Block a user