Add some code refactoring from the blackboard branch

This commit is contained in:
Eric Coissac
2024-08-02 12:35:46 +02:00
parent bc1aaaf7d9
commit 1b1cd41fd3
38 changed files with 491 additions and 330 deletions

View File

@@ -111,14 +111,14 @@ func _ParseCsvFile(source string,
slice = append(slice, sequence)
if len(slice) >= batchSize {
out.Push(obiiter.MakeBioSequenceBatch(o, slice))
out.Push(obiiter.MakeBioSequenceBatch(source, o, slice))
o++
slice = obiseq.MakeBioSequenceSlice()
}
}
if len(slice) > 0 {
out.Push(obiiter.MakeBioSequenceBatch(o, slice))
out.Push(obiiter.MakeBioSequenceBatch(source, o, slice))
}
out.Done()

View File

@@ -142,7 +142,7 @@ func WriteCSV(iterator obiiter.IBioSequence,
nwriters := opt.ParallelWorkers()
obiiter.RegisterAPipe()
chunkchan := make(chan FileChunck)
chunkchan := make(chan FileChunk)
newIter.Add(nwriters)
var waitWriter sync.WaitGroup
@@ -161,7 +161,7 @@ func WriteCSV(iterator obiiter.IBioSequence,
batch := iterator.Get()
chunkchan <- FileChunck{
chunkchan <- FileChunk{
FormatCVSBatch(batch, opt),
batch.Order(),
}
@@ -171,7 +171,7 @@ func WriteCSV(iterator obiiter.IBioSequence,
}
next_to_send := 0
received := make(map[int]FileChunck, 100)
received := make(map[int]FileChunk, 100)
waitWriter.Add(1)
go func() {

View File

@@ -122,7 +122,7 @@ func __read_ecopcr_bioseq__(file *__ecopcr_file__) (*obiseq.BioSequence, error)
return bseq, nil
}
func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
func ReadEcoPCR(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
tag := make([]byte, 11)
n, _ := reader.Read(tag)
@@ -187,7 +187,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
slice = append(slice, seq)
ii++
if ii >= opt.BatchSize() {
newIter.Push(obiiter.MakeBioSequenceBatch(i, slice))
newIter.Push(obiiter.MakeBioSequenceBatch(opt.Source(), i, slice))
slice = obiseq.MakeBioSequenceSlice()
i++
ii = 0
@@ -198,7 +198,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
}
if len(slice) > 0 {
newIter.Push(obiiter.MakeBioSequenceBatch(i, slice))
newIter.Push(obiiter.MakeBioSequenceBatch(opt.Source(), i, slice))
}
newIter.Done()
@@ -213,7 +213,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
newIter = newIter.CompleteFileIterator()
}
return newIter
return newIter, nil
}
func ReadEcoPCRFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
@@ -235,5 +235,5 @@ func ReadEcoPCRFromFile(filename string, options ...WithOption) (obiiter.IBioSeq
reader = greader
}
return ReadEcoPCR(reader, options...), nil
return ReadEcoPCR(reader, options...)
}

View File

@@ -15,7 +15,7 @@ import (
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
)
// _EndOfLastEntry finds the index of the last entry in the given byte slice 'buff'
// EndOfLastFlatFileEntry finds the index of the last entry in the given byte slice 'buff'
// using a pattern match of the form:
// <CR>?<LF>//<CR>?<LF>
// where <CR> and <LF> are the ASCII codes for carriage return and line feed,
@@ -27,7 +27,7 @@ import (
//
// Returns:
// int - the index of the end of the last entry or -1 if no match is found.
func _EndOfLastEntry(buff []byte) int {
func EndOfLastFlatFileEntry(buff []byte) int {
// 6 5 43 2 1
// <CR>?<LF>//<CR>?<LF>
var i int
@@ -87,15 +87,9 @@ func _EndOfLastEntry(buff []byte) int {
return -1
}
func _ParseEmblFile(source string, input ChannelSeqFileChunk,
out obiiter.IBioSequence,
withFeatureTable bool,
batch_size int,
total_seq_size int) {
for chunks := range input {
scanner := bufio.NewScanner(chunks.raw)
order := chunks.order
func EmblChunkParser(withFeatureTable bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) {
scanner := bufio.NewScanner(input)
sequences := make(obiseq.BioSequenceSlice, 0, 100)
id := ""
scientificName := ""
@@ -156,7 +150,31 @@ func _ParseEmblFile(source string, input ChannelSeqFileChunk,
seqBytes = new(bytes.Buffer)
}
}
out.Push(obiiter.MakeBioSequenceBatch(order, sequences))
return sequences, nil
}
return parser
}
func _ParseEmblFile(
input ChannelSeqFileChunk,
out obiiter.IBioSequence,
withFeatureTable bool,
) {
parser := EmblChunkParser(withFeatureTable)
for chunks := range input {
order := chunks.Order
sequences, err := parser(chunks.Source, chunks.Raw)
if err != nil {
log.Fatalf("%s : Cannot parse the embl file : %v", chunks.Source, err)
}
out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, order, sequences))
}
out.Done()
@@ -166,12 +184,18 @@ func _ParseEmblFile(source string, input ChannelSeqFileChunk,
// 6 5 43 2 1
//
// <CR>?<LF>//<CR>?<LF>
func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
func ReadEMBL(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
opt := MakeOptions(options)
buff := make([]byte, 1024*1024*1024*256)
entry_channel := ReadSeqFileChunk(reader, buff, _EndOfLastEntry)
entry_channel := ReadSeqFileChunk(
opt.Source(),
reader,
buff,
EndOfLastFlatFileEntry,
)
newIter := obiiter.MakeIBioSequence()
nworkers := opt.ParallelWorkers()
@@ -179,10 +203,11 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
// for j := 0; j < opt.ParallelWorkers(); j++ {
for j := 0; j < nworkers; j++ {
newIter.Add(1)
go _ParseEmblFile(opt.Source(), entry_channel, newIter,
go _ParseEmblFile(
entry_channel,
newIter,
opt.WithFeatureTable(),
opt.BatchSize(),
opt.TotalSeqSize())
)
}
go func() {
@@ -193,7 +218,7 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
newIter = newIter.CompleteFileIterator()
}
return newIter
return newIter, nil
}
func ReadEMBLFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
@@ -214,5 +239,5 @@ func ReadEMBLFromFile(filename string, options ...WithOption) (obiiter.IBioSeque
return obiiter.NilIBioSequence, err
}
return ReadEMBL(reader, options...), nil
return ReadEMBL(reader, options...)
}

View File

@@ -14,7 +14,7 @@ import (
log "github.com/sirupsen/logrus"
)
func _EndOfLastFastaEntry(buffer []byte) int {
func EndOfLastFastaEntry(buffer []byte) int {
var i int
imax := len(buffer)
@@ -39,24 +39,18 @@ func _EndOfLastFastaEntry(buffer []byte) int {
return last
}
func _ParseFastaFile(source string,
input ChannelSeqFileChunk,
out obiiter.IBioSequence,
no_order bool,
batch_size int,
chunck_order func() int,
) {
func FastaChunkParser() func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
var identifier string
var definition string
parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) {
var identifier string
var definition string
idBytes := bytes.Buffer{}
defBytes := bytes.Buffer{}
seqBytes := bytes.Buffer{}
idBytes := bytes.Buffer{}
defBytes := bytes.Buffer{}
seqBytes := bytes.Buffer{}
for chunks := range input {
state := 0
scanner := bufio.NewReader(chunks.raw)
scanner := bufio.NewReader(input)
start, _ := scanner.Peek(20)
if start[0] != '>' {
log.Fatalf("%s : first character is not '>'", string(start))
@@ -64,7 +58,8 @@ func _ParseFastaFile(source string,
if start[1] == ' ' {
log.Fatalf("%s :Strange", string(start))
}
sequences := make(obiseq.BioSequenceSlice, 0, batch_size)
sequences := obiseq.MakeBioSequenceSlice(100)[:0]
previous := byte(0)
@@ -160,12 +155,6 @@ func _ParseFastaFile(source string,
s := obiseq.NewBioSequence(identifier, rawseq, definition)
s.SetSource(source)
sequences = append(sequences, s)
if no_order {
if len(sequences) == batch_size {
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
sequences = make(obiseq.BioSequenceSlice, 0, batch_size)
}
}
state = 1
} else {
// Error
@@ -209,13 +198,28 @@ func _ParseFastaFile(source string,
sequences = append(sequences, s)
}
if len(sequences) > 0 {
co := chunks.order
if no_order {
co = chunck_order()
}
out.Push(obiiter.MakeBioSequenceBatch(co, sequences))
return sequences, nil
}
return parser
}
func _ParseFastaFile(
input ChannelSeqFileChunk,
out obiiter.IBioSequence,
) {
parser := FastaChunkParser()
for chunks := range input {
sequences, err := parser(chunks.Source, chunks.Raw)
if err != nil {
log.Fatalf("File %s : Cannot parse the fasta file : %v", chunks.Source, err)
}
out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences))
}
out.Done()
@@ -230,17 +234,16 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
buff := make([]byte, 1024*1024*1024)
chkchan := ReadSeqFileChunk(reader, buff, _EndOfLastFastaEntry)
chunck_order := obiutils.AtomicCounter()
chkchan := ReadSeqFileChunk(
opt.Source(),
reader,
buff,
EndOfLastFastaEntry,
)
for i := 0; i < nworker; i++ {
out.Add(1)
go _ParseFastaFile(opt.Source(),
chkchan,
out,
opt.NoOrder(),
opt.BatchSize(),
chunck_order)
go _ParseFastaFile(chkchan, out)
}
go func() {
@@ -282,7 +285,7 @@ func ReadFastaFromFile(filename string, options ...WithOption) (obiiter.IBioSequ
}
func ReadFastaFromStdin(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
options = append(options, OptionsSource(obiutils.RemoveAllExt("stdin")))
options = append(options, OptionsSource("stdin"))
input, err := Buf(os.Stdin)
if err == ErrNoContent {

View File

@@ -14,7 +14,7 @@ import (
log "github.com/sirupsen/logrus"
)
func _EndOfLastFastqEntry(buffer []byte) int {
func EndOfLastFastqEntry(buffer []byte) int {
var i int
imax := len(buffer)
@@ -117,27 +117,20 @@ func _storeSequenceQuality(bytes *bytes.Buffer, out *obiseq.BioSequence, quality
out.SetQualities(q)
}
func _ParseFastqFile(source string,
input ChannelSeqFileChunk,
out obiiter.IBioSequence,
quality_shift byte,
no_order bool,
batch_size int,
chunck_order func() int,
) {
func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) {
var identifier string
var definition string
var identifier string
var definition string
idBytes := bytes.Buffer{}
defBytes := bytes.Buffer{}
qualBytes := bytes.Buffer{}
seqBytes := bytes.Buffer{}
idBytes := bytes.Buffer{}
defBytes := bytes.Buffer{}
qualBytes := bytes.Buffer{}
seqBytes := bytes.Buffer{}
for chunks := range input {
state := 0
scanner := bufio.NewReader(chunks.raw)
sequences := make(obiseq.BioSequenceSlice, 0, 100)
scanner := bufio.NewReader(input)
sequences := obiseq.MakeBioSequenceSlice(100)[:0]
previous := byte(0)
for C, err := scanner.ReadByte(); err != io.EOF; C, err = scanner.ReadByte() {
@@ -257,14 +250,6 @@ func _ParseFastqFile(source string,
case 10:
if is_end_of_line {
_storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift)
if no_order {
if len(sequences) == batch_size {
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
sequences = make(obiseq.BioSequenceSlice, 0, batch_size)
}
}
state = 11
} else {
qualBytes.WriteByte(C)
@@ -288,14 +273,31 @@ func _ParseFastqFile(source string,
_storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift)
state = 1
}
co := chunks.order
if no_order {
co = chunck_order()
}
out.Push(obiiter.MakeBioSequenceBatch(co, sequences))
}
return sequences, nil
}
return parser
}
func _ParseFastqFile(
input ChannelSeqFileChunk,
out obiiter.IBioSequence,
quality_shift byte,
) {
parser := FastqChunkParser(quality_shift)
for chunks := range input {
sequences, err := parser(chunks.Source, chunks.Raw)
if err != nil {
log.Fatalf("File %s : Cannot parse the fastq file : %v", chunks.Source, err)
}
out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences))
}
out.Done()
@@ -307,21 +309,23 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
out := obiiter.MakeIBioSequence()
nworker := opt.ParallelWorkers()
chunkorder := obiutils.AtomicCounter()
buff := make([]byte, 1024*1024*1024)
chkchan := ReadSeqFileChunk(reader, buff, _EndOfLastFastqEntry)
chkchan := ReadSeqFileChunk(
opt.Source(),
reader,
buff,
EndOfLastFastqEntry,
)
for i := 0; i < nworker; i++ {
out.Add(1)
go _ParseFastqFile(opt.Source(),
go _ParseFastqFile(
chkchan,
out,
byte(obioptions.InputQualityShift()),
opt.NoOrder(),
opt.BatchSize(),
chunkorder)
)
}
go func() {

View File

@@ -69,7 +69,7 @@ func _FastseqReader(source string,
slice = append(slice, rep)
ii++
if ii >= batch_size {
iterator.Push(obiiter.MakeBioSequenceBatch(i, slice))
iterator.Push(obiiter.MakeBioSequenceBatch(source, i, slice))
slice = obiseq.MakeBioSequenceSlice()
i++
ii = 0
@@ -77,7 +77,7 @@ func _FastseqReader(source string,
}
if len(slice) > 0 {
iterator.Push(obiiter.MakeBioSequenceBatch(i, slice))
iterator.Push(obiiter.MakeBioSequenceBatch(source, i, slice))
}
iterator.Done()

View File

@@ -7,8 +7,6 @@ import (
"io"
"os"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
@@ -76,7 +74,7 @@ func FormatFasta(seq *obiseq.BioSequence, formater FormatHeader) string {
// - skipEmpty: a boolean indicating whether empty sequences should be skipped or not.
//
// It returns a byte array containing the formatted sequences.
func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, skipEmpty bool) []byte {
func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, skipEmpty bool) *bytes.Buffer {
// Create a buffer to store the formatted sequences
var bs bytes.Buffer
@@ -116,7 +114,7 @@ func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, ski
}
// Return the byte array representation of the buffer
return bs.Bytes()
return &bs
}
// WriteFasta writes a given iterator of bio sequences to a file in FASTA format.
@@ -135,21 +133,16 @@ func WriteFasta(iterator obiiter.IBioSequence,
nwriters := opt.ParallelWorkers()
obiiter.RegisterAPipe()
chunkchan := make(chan FileChunck)
chunkchan := WriteSeqFileChunk(file, opt.CloseFile())
header_format := opt.FormatFastSeqHeader()
newIter.Add(nwriters)
var waitWriter sync.WaitGroup
go func() {
newIter.WaitAndClose()
for len(chunkchan) > 0 {
time.Sleep(time.Millisecond)
}
close(chunkchan)
waitWriter.Wait()
log.Warnf("Writing fasta file done")
}()
ff := func(iterator obiiter.IBioSequence) {
@@ -159,10 +152,12 @@ func WriteFasta(iterator obiiter.IBioSequence,
log.Debugf("Formating fasta chunk %d", batch.Order())
chunkchan <- FileChunck{
FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()),
batch.Order(),
chunkchan <- SeqFileChunk{
Source: batch.Source(),
Raw: FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()),
Order: batch.Order(),
}
log.Debugf("Fasta chunk %d formated", batch.Order())
newIter.Push(batch)
@@ -176,39 +171,6 @@ func WriteFasta(iterator obiiter.IBioSequence,
go ff(iterator.Split())
}
next_to_send := 0
received := make(map[int]FileChunck, 100)
waitWriter.Add(1)
go func() {
for chunk := range chunkchan {
if chunk.order == next_to_send {
file.Write(chunk.text)
log.Debugf("Fasta chunk %d written", chunk.order)
next_to_send++
chunk, ok := received[next_to_send]
for ok {
file.Write(chunk.text)
log.Debugf("Fasta chunk %d written", chunk.order)
delete(received, next_to_send)
next_to_send++
chunk, ok = received[next_to_send]
}
} else {
log.Debugf("Store Fasta chunk %d", chunk.order)
received[chunk.order] = chunk
}
}
file.Close()
log.Debugln("End of the fasta file writing")
obiiter.UnregisterPipe()
waitWriter.Done()
}()
return newIter, nil
}

View File

@@ -14,6 +14,8 @@ import (
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
)
type FormatSeqBatch func(batch obiiter.BioSequenceBatch, formater FormatHeader, skipEmpty bool) *bytes.Buffer
func _formatFastq(buff *bytes.Buffer, seq *obiseq.BioSequence, formater FormatHeader) {
info := ""
@@ -49,7 +51,7 @@ func FormatFastq(seq *obiseq.BioSequence, formater FormatHeader) string {
}
func FormatFastqBatch(batch obiiter.BioSequenceBatch,
formater FormatHeader, skipEmpty bool) []byte {
formater FormatHeader, skipEmpty bool) *bytes.Buffer {
var bs bytes.Buffer
lt := 0
@@ -82,12 +84,10 @@ func FormatFastqBatch(batch obiiter.BioSequenceBatch,
}
chunk := bs.Bytes()
return chunk
return &bs
}
type FileChunck struct {
type FileChunk struct {
text []byte
order int
}
@@ -105,8 +105,7 @@ func WriteFastq(iterator obiiter.IBioSequence,
nwriters := opt.ParallelWorkers()
obiiter.RegisterAPipe()
chunkchan := make(chan FileChunck)
chunkchan := WriteSeqFileChunk(file, opt.CloseFile())
header_format := opt.FormatFastSeqHeader()
@@ -126,9 +125,10 @@ func WriteFastq(iterator obiiter.IBioSequence,
ff := func(iterator obiiter.IBioSequence) {
for iterator.Next() {
batch := iterator.Get()
chunk := FileChunck{
FormatFastqBatch(batch, header_format, opt.SkipEmptySequence()),
batch.Order(),
chunk := SeqFileChunk{
Source: batch.Source(),
Raw: FormatFastqBatch(batch, header_format, opt.SkipEmptySequence()),
Order: batch.Order(),
}
chunkchan <- chunk
newIter.Push(batch)
@@ -142,44 +142,6 @@ func WriteFastq(iterator obiiter.IBioSequence,
go ff(iterator.Split())
}
next_to_send := 0
received := make(map[int]FileChunck, 100)
waitWriter.Add(1)
go func() {
for chunk := range chunkchan {
if chunk.order == next_to_send {
if chunk.text[0] != '@' {
log.Panicln("WriteFastq: FASTQ format error")
}
file.Write(chunk.text)
next_to_send++
chunk, ok := received[next_to_send]
for ok {
if chunk.text[0] != '@' {
log.Panicln("WriteFastq: FASTQ format error")
}
file.Write(chunk.text)
delete(received, next_to_send)
next_to_send++
chunk, ok = received[next_to_send]
}
} else {
if _, ok := received[chunk.order]; ok {
log.Panicln("WriteFastq: Two chunks with the same number")
}
received[chunk.order] = chunk
}
}
file.Close()
log.Debugln("End of the fastq file writing")
obiiter.UnregisterPipe()
waitWriter.Done()
}()
return newIter, nil
}

View File

@@ -29,27 +29,11 @@ const (
var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp")
func _ParseGenbankFile(source string,
input ChannelSeqFileChunk,
out obiiter.IBioSequence,
chunck_order func() int,
withFeatureTable bool,
batch_size int,
total_seq_size int) {
state := inHeader
previous_chunk := -1
for chunks := range input {
if state != inHeader {
log.Fatalf("Unexpected state %d starting new chunk (id = %d, previous_chunk = %d)",
state, chunks.order, previous_chunk)
}
previous_chunk = chunks.order
scanner := bufio.NewReader(chunks.raw)
sequences := make(obiseq.BioSequenceSlice, 0, 100)
sumlength := 0
func GenbankChunkParser(withFeatureTable bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
return func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) {
state := inHeader
scanner := bufio.NewReader(input)
sequences := obiseq.MakeBioSequenceSlice(100)[:0]
id := ""
lseq := -1
scientificName := ""
@@ -64,7 +48,7 @@ func _ParseGenbankFile(source string,
nl++
line = string(bline)
if is_prefix || len(line) > 100 {
log.Fatalf("Chunk %d : Line too long: %s", chunks.order, line)
log.Fatalf("From %s:Line too long: %s", source, line)
}
processed := false
for !processed {
@@ -165,15 +149,6 @@ func _ParseGenbankFile(source string,
// sequence.Len(), seqBytes.Len())
sequences = append(sequences, sequence)
sumlength += sequence.Len()
if len(sequences) == batch_size || sumlength > total_seq_size {
oo := chunck_order()
log.Debugln("Pushing sequence batch ", oo, " with ", len(sequences), " sequences")
out.Push(obiiter.MakeBioSequenceBatch(oo, sequences))
sequences = make(obiseq.BioSequenceSlice, 0, 100)
sumlength = 0
}
defBytes = bytes.NewBuffer(obiseq.GetSlice(200))
featBytes = new(bytes.Buffer)
@@ -219,11 +194,24 @@ func _ParseGenbankFile(source string,
}
if len(sequences) > 0 {
oo := chunck_order()
log.Debugln("Pushing sequence batch ", oo, " with ", len(sequences), " sequences")
out.Push(obiiter.MakeBioSequenceBatch(oo, sequences))
return sequences, nil
}
}
func _ParseGenbankFile(input ChannelSeqFileChunk,
out obiiter.IBioSequence,
withFeatureTable bool) {
parser := GenbankChunkParser(withFeatureTable)
for chunks := range input {
sequences, err := parser(chunks.Source, chunks.Raw)
if err != nil {
log.Fatalf("File %s : Cannot parse the genbank file : %v", chunks.Source, err)
}
out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences))
}
log.Debug("End of the Genbank thread")
@@ -231,26 +219,31 @@ func _ParseGenbankFile(source string,
}
func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
opt := MakeOptions(options)
// entry_channel := make(chan _FileChunk)
buff := make([]byte, 1024*1024*1024*256)
entry_channel := ReadSeqFileChunk(reader, buff, _EndOfLastEntry)
entry_channel := ReadSeqFileChunk(
opt.Source(),
reader,
buff,
EndOfLastFlatFileEntry,
)
newIter := obiiter.MakeIBioSequence()
nworkers := opt.ParallelWorkers()
chunck_order := obiutils.AtomicCounter()
// for j := 0; j < opt.ParallelWorkers(); j++ {
for j := 0; j < nworkers; j++ {
newIter.Add(1)
go _ParseGenbankFile(opt.Source(),
entry_channel, newIter, chunck_order,
go _ParseGenbankFile(
entry_channel,
newIter,
opt.WithFeatureTable(),
opt.BatchSize(),
opt.TotalSeqSize())
)
}
// go _ReadFlatFileChunk(reader, entry_channel)
@@ -264,7 +257,7 @@ func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
newIter = newIter.CompleteFileIterator()
}
return newIter
return newIter, nil
}
func ReadGenbankFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
@@ -285,5 +278,5 @@ func ReadGenbankFromFile(filename string, options ...WithOption) (obiiter.IBioSe
return obiiter.NilIBioSequence, err
}
return ReadGenbank(reader, options...), nil
return ReadGenbank(reader, options...)
}

View File

@@ -3,7 +3,6 @@ package obiformats
import (
"bufio"
"bytes"
"github.com/goccy/go-json"
"io"
"os"
"strconv"
@@ -11,6 +10,8 @@ import (
"sync"
"time"
"github.com/goccy/go-json"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
@@ -87,7 +88,7 @@ func WriteJSON(iterator obiiter.IBioSequence,
nwriters := opt.ParallelWorkers()
obiiter.RegisterAPipe()
chunkchan := make(chan FileChunck)
chunkchan := make(chan FileChunk)
newIter.Add(nwriters)
var waitWriter sync.WaitGroup
@@ -106,7 +107,7 @@ func WriteJSON(iterator obiiter.IBioSequence,
batch := iterator.Get()
chunkchan <- FileChunck{
chunkchan <- FileChunk{
FormatJSONBatch(batch),
batch.Order(),
}
@@ -116,7 +117,7 @@ func WriteJSON(iterator obiiter.IBioSequence,
}
next_to_send := 0
received := make(map[int]FileChunck, 100)
received := make(map[int]FileChunk, 100)
waitWriter.Add(1)
go func() {

View File

@@ -7,7 +7,8 @@ import (
type __options__ struct {
fastseq_header_parser obiseq.SeqAnnotator
fastseq_header_writer func(*obiseq.BioSequence) string
fastseq_header_writer BioSequenceFormater
seqBatchFormater FormatSeqBatch
with_progress_bar bool
buffer_size int
batch_size int
@@ -44,6 +45,7 @@ func MakeOptions(setters []WithOption) Options {
o := __options__{
fastseq_header_parser: ParseGuessedFastSeqHeader,
fastseq_header_writer: FormatFastSeqJsonHeader,
seqBatchFormater: nil,
with_progress_bar: false,
buffer_size: 2,
parallel_workers: obioptions.CLIReadParallelWorkers(),
@@ -103,6 +105,10 @@ func (opt Options) FormatFastSeqHeader() func(*obiseq.BioSequence) string {
return opt.pointer.fastseq_header_writer
}
func (opt Options) SequenceFormater() FormatSeqBatch {
return opt.pointer.seqBatchFormater
}
func (opt Options) NoOrder() bool {
return opt.pointer.no_order
}
@@ -219,8 +225,6 @@ func OptionNoOrder(no_order bool) WithOption {
return f
}
func OptionsCompressed(compressed bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.compressed = compressed
@@ -271,6 +275,14 @@ func OptionsFastSeqHeaderFormat(format func(*obiseq.BioSequence) string) WithOpt
return f
}
func OptionsSequenceFormater(formater FormatSeqBatch) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.seqBatchFormater = formater
})
return f
}
func OptionsParallelWorkers(nworkers int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.parallel_workers = nworkers

View File

@@ -5,14 +5,18 @@ import (
"io"
"slices"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
log "github.com/sirupsen/logrus"
)
var _FileChunkSize = 1 << 28
var _FileChunkSize = 1024 * 1024 * 10
type SeqFileChunkParser func(string, io.Reader) (obiseq.BioSequenceSlice, error)
type SeqFileChunk struct {
raw io.Reader
order int
Source string
Raw *bytes.Buffer
Order int
}
type ChannelSeqFileChunk chan SeqFileChunk
@@ -32,7 +36,9 @@ type LastSeqRecord func([]byte) int
//
// Returns:
// None
func ReadSeqFileChunk(reader io.Reader,
func ReadSeqFileChunk(
source string,
reader io.Reader,
buff []byte,
splitter LastSeqRecord) ChannelSeqFileChunk {
var err error
@@ -88,7 +94,7 @@ func ReadSeqFileChunk(reader io.Reader,
if len(buff) > 0 {
io := bytes.NewBuffer(slices.Clone(buff))
chunk_channel <- SeqFileChunk{io, i}
chunk_channel <- SeqFileChunk{source, io, i}
i++
}
@@ -96,7 +102,7 @@ func ReadSeqFileChunk(reader io.Reader,
buff = fullbuff[0:lremain]
lcp := copy(buff, fullbuff[pnext:])
if lcp < lremain {
log.Fatalf("Error copying remaining data of chunck %d : %d < %d", i, lcp, lremain)
log.Fatalf("Error copying remaining data of chunk %d : %d < %d", i, lcp, lremain)
}
} else {
buff = buff[:0]
@@ -112,7 +118,7 @@ func ReadSeqFileChunk(reader io.Reader,
// Send the last chunk to the channel
if len(buff) > 0 {
io := bytes.NewBuffer(slices.Clone(buff))
chunk_channel <- SeqFileChunk{io, i}
chunk_channel <- SeqFileChunk{source, io, i}
}
// Close the readers channel when the end of the file is reached

View File

@@ -0,0 +1,51 @@
package obiformats
import (
"io"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
log "github.com/sirupsen/logrus"
)
func WriteSeqFileChunk(
writer io.WriteCloser,
toBeClosed bool) ChannelSeqFileChunk {
obiiter.RegisterAPipe()
chunk_channel := make(ChannelSeqFileChunk)
go func() {
nextToPrint := 0
toBePrinted := make(map[int]SeqFileChunk)
for chunk := range chunk_channel {
if chunk.Order == nextToPrint {
_, _ = writer.Write(chunk.Raw.Bytes())
nextToPrint++
chunk, ok := toBePrinted[nextToPrint]
for ok {
_, _ = writer.Write(chunk.Raw.Bytes())
delete(toBePrinted, nextToPrint)
nextToPrint++
chunk, ok = toBePrinted[nextToPrint]
}
} else {
toBePrinted[chunk.Order] = chunk
}
}
if toBeClosed {
err := writer.Close()
if err != nil {
log.Fatalf("Cannot close the writer : %v", err)
}
}
obiiter.UnregisterPipe()
log.Warnf("The writer has been closed")
}()
return chunk_channel
}

View File

@@ -15,6 +15,8 @@ import (
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
)
type SequenceReader func(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error)
// OBIMimeTypeGuesser is a function that takes an io.Reader as input and guesses the MIME type of the data.
// It uses several detectors to identify specific file formats, such as FASTA, FASTQ, ecoPCR2, GenBank, and EMBL.
// The function reads data from the input stream and analyzes it using the mimetype library.
@@ -172,11 +174,11 @@ func ReadSequencesFromFile(filename string,
case "text/fasta":
return ReadFasta(reader, options...)
case "text/ecopcr2":
return ReadEcoPCR(reader, options...), nil
return ReadEcoPCR(reader, options...)
case "text/embl":
return ReadEMBL(reader, options...), nil
return ReadEMBL(reader, options...)
case "text/genbank":
return ReadGenbank(reader, options...), nil
return ReadGenbank(reader, options...)
case "text/csv":
return ReadCSV(reader, options...)
default: