Work on EMBL and Genbank parser efficiency

Former-commit-id: 309cc9ce4eea4c8085d7d4451a66a81710532f07
This commit is contained in:
2024-02-20 13:23:07 +01:00
parent 95caebec8f
commit 1542ce4c63
4 changed files with 69 additions and 14 deletions

View File

@ -94,7 +94,11 @@ func _EndOfLastEntry(buff []byte) int {
return -1
}
func _ParseEmblFile(source string, input <-chan _FileChunk, out obiiter.IBioSequence) {
func _ParseEmblFile(source string, input <-chan _FileChunk,
out obiiter.IBioSequence,
withFeatureTable bool,
batch_size int,
total_seq_size int) {
for chunks := range input {
scanner := bufio.NewScanner(chunks.raw)
@ -120,14 +124,16 @@ func _ParseEmblFile(source string, input <-chan _FileChunk, out obiiter.IBioSequ
defBytes.WriteByte(' ')
}
defBytes.WriteString(strings.TrimSpace(line[5:]))
case strings.HasPrefix(line, "FH "):
case withFeatureTable && strings.HasPrefix(line, "FH "):
featBytes.WriteString(line)
case line == "FH":
case withFeatureTable && line == "FH":
featBytes.WriteByte('\n')
featBytes.WriteString(line)
case strings.HasPrefix(line, "FT "):
featBytes.WriteByte('\n')
featBytes.WriteString(line)
if withFeatureTable {
featBytes.WriteByte('\n')
featBytes.WriteString(line)
}
if strings.HasPrefix(line, `FT /db_xref="taxon:`) {
taxid, _ = strconv.Atoi(strings.SplitN(line[37:], `"`, 2)[0])
}
@ -143,7 +149,9 @@ func _ParseEmblFile(source string, input <-chan _FileChunk, out obiiter.IBioSequ
defBytes.String())
sequence.SetSource(source)
sequence.SetFeatures(featBytes.Bytes())
if withFeatureTable {
sequence.SetFeatures(featBytes.Bytes())
}
annot := sequence.Annotations()
annot["scientific_name"] = scientificName
@ -198,7 +206,7 @@ func _ReadFlatFileChunk(reader io.Reader, readers chan _FileChunk) {
for err == nil {
// Create an extended buffer to read from if the end of the last entry is not found in the current buffer
extbuff := make([]byte, 1<<22)
extbuff := make([]byte, _FileChunkSize)
end := 0
ic := 0
@ -278,7 +286,9 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
// for j := 0; j < opt.ParallelWorkers(); j++ {
for j := 0; j < nworkers; j++ {
go _ParseEmblFile(opt.Source(), entry_channel, newIter)
go _ParseEmblFile(opt.Source(), entry_channel, newIter,
opt.WithFeatureTable(),
opt.BatchSize(), opt.TotalSeqSize())
}
go _ReadFlatFileChunk(reader, entry_channel)

View File

@ -30,7 +30,10 @@ var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp")
func _ParseGenbankFile(source string,
input <-chan _FileChunk, out obiiter.IBioSequence,
chunck_order func() int) {
chunck_order func() int,
withFeatureTable bool,
batch_size int,
total_seq_size int) {
state := inHeader
previous_chunk := -1
@ -143,7 +146,10 @@ func _ParseGenbankFile(source string,
seqBytes.Bytes(),
defBytes.String())
sequence.SetSource(source)
sequence.SetFeatures(featBytes.Bytes())
if withFeatureTable {
sequence.SetFeatures(featBytes.Bytes())
}
annot := sequence.Annotations()
annot["scientific_name"] = scientificName
@ -155,7 +161,7 @@ func _ParseGenbankFile(source string,
sequences = append(sequences, sequence)
sumlength += sequence.Len()
if len(sequences) == 100 || sumlength > 1e7 {
if len(sequences) == batch_size || sumlength > total_seq_size {
log.Debugln("Pushing sequences")
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
sequences = make(obiseq.BioSequenceSlice, 0, 100)
@ -184,8 +190,10 @@ func _ParseGenbankFile(source string,
default:
switch state {
case inFeature:
featBytes.WriteByte('\n')
featBytes.WriteString(line)
if withFeatureTable {
featBytes.WriteByte('\n')
featBytes.WriteString(line)
}
if strings.HasPrefix(line, ` /db_xref="taxon:`) {
taxid, _ = strconv.Atoi(strings.SplitN(line[37:], `"`, 2)[0])
}
@ -227,7 +235,8 @@ func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
// for j := 0; j < opt.ParallelWorkers(); j++ {
for j := 0; j < nworkers; j++ {
go _ParseGenbankFile(opt.Source(), entry_channel, newIter, chunck_order)
go _ParseGenbankFile(opt.Source(), entry_channel, newIter, chunck_order,
opt.WithFeatureTable(), opt.BatchSize(), opt.TotalSeqSize())
}
go _ReadFlatFileChunk(reader, entry_channel)

View File

@ -11,6 +11,7 @@ type __options__ struct {
with_progress_bar bool
buffer_size int
batch_size int
total_seq_size int
full_file_batch bool
parallel_workers int
closefile bool
@ -29,6 +30,7 @@ type __options__ struct {
csv_auto bool
paired_filename string
source string
with_feature_table bool
}
type Options struct {
@ -45,6 +47,7 @@ func MakeOptions(setters []WithOption) Options {
buffer_size: 2,
parallel_workers: obioptions.CLIReadParallelWorkers(),
batch_size: obioptions.CLIBatchSize(),
total_seq_size: 1024 * 1024 * 100, // 100 MB by default
full_file_batch: false,
closefile: false,
appendfile: false,
@ -62,6 +65,7 @@ func MakeOptions(setters []WithOption) Options {
csv_auto: false,
paired_filename: "",
source: "",
with_feature_table: false,
}
opt := Options{&o}
@ -77,6 +81,10 @@ func (opt Options) BatchSize() int {
return opt.pointer.batch_size
}
func (opt Options) TotalSeqSize() int {
return opt.pointer.total_seq_size
}
func (opt Options) FullFileBatch() bool {
return opt.pointer.full_file_batch
}
@ -169,6 +177,10 @@ func (opt Options) Source() string {
return opt.pointer.source
}
func (opt Options) WithFeatureTable() bool {
return opt.pointer.with_feature_table
}
func OptionCloseFile() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.closefile = true
@ -259,6 +271,14 @@ func OptionsBatchSize(size int) WithOption {
return f
}
func OptionsBatchSizeDefault(bp int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.batch_size = bp
})
return f
}
func OptionsFullFileBatch(full bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.full_file_batch = full
@ -386,3 +406,11 @@ func CSVAutoColumn(auto bool) WithOption {
return f
}
func WithFeatureTable(with bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.with_feature_table = with
})
return f
}