mirror of
https://github.com/metabarcoding/obitools4.git
synced 2025-06-29 16:20:46 +00:00
reduce the memory impact of obiuniq.
This commit is contained in:
@ -14,10 +14,40 @@ import (
|
||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
||||
)
|
||||
|
||||
// SequenceBatchWriterToFile is the type of the functions that write the biosequences
|
||||
// provided by an iterator to a named file. Such a function takes an iterator of biosequences,
|
||||
// a filename, and optional configuration options, and returns an iterator of biosequences
|
||||
// along with any error encountered during the writing process.
|
||||
//
|
||||
// Parameters:
|
||||
// - iterator: An iterator of biosequences to be written to the file.
|
||||
// - filename: The name of the file where the sequences will be written.
|
||||
// - options: Optional configuration options for the writing process.
|
||||
//
|
||||
// Returns:
|
||||
// An iterator of biosequences (NOTE(review): how it relates to the input iterator
|
||||
// depends on the implementation and is not visible here) and an error if writing fails.
|
||||
type SequenceBatchWriterToFile func(iterator obiiter.IBioSequence,
|
||||
filename string,
|
||||
options ...WithOption) (obiiter.IBioSequence, error)
|
||||
|
||||
// WriterDispatcher manages the writing of data to files based on a given
|
||||
// prototype name and a dispatcher for distributing the sequences. It
|
||||
// processes incoming data from the dispatcher in separate goroutines,
|
||||
// formatting and writing the data to files as specified.
|
||||
//
|
||||
// Parameters:
|
||||
// - prototypename: A string that serves as a template for naming the output files.
|
||||
// - dispatcher: An instance of IDistribute that provides the data to be written
|
||||
// and manages the distribution of sequences.
|
||||
// - formater: A function of type SequenceBatchWriterToFile that formats and writes
|
||||
// the sequences to the specified file.
|
||||
// - options: Optional configuration options for the writing process.
|
||||
//
|
||||
// The function operates asynchronously, launching goroutines for each new data
|
||||
// channel received from the dispatcher. It ensures that directories are created
|
||||
// as needed and handles errors during the writing process. The function blocks
|
||||
// until all writing jobs are completed.
|
||||
func WriterDispatcher(prototypename string,
|
||||
dispatcher obiiter.IDistribute,
|
||||
formater SequenceBatchWriterToFile,
|
||||
@ -34,7 +64,7 @@ func WriterDispatcher(prototypename string,
|
||||
data, err := dispatcher.Outputs(newflux)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("Cannot retreive the new chanel : %v", err)
|
||||
log.Fatalf("Cannot retrieve the new channel: %v", err)
|
||||
}
|
||||
|
||||
key := dispatcher.Classifier().Value(newflux)
|
||||
@ -58,7 +88,7 @@ func WriterDispatcher(prototypename string,
|
||||
info, err := os.Stat(directory)
|
||||
switch {
|
||||
case !os.IsNotExist(err) && !info.IsDir():
|
||||
log.Fatalf("Cannot Create the directory %s", directory)
|
||||
log.Fatalf("Cannot create the directory %s", directory)
|
||||
case os.IsNotExist(err):
|
||||
os.Mkdir(directory, 0755)
|
||||
}
|
||||
@ -71,7 +101,7 @@ func WriterDispatcher(prototypename string,
|
||||
options...)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("cannot open the output file for key %s",
|
||||
log.Fatalf("Cannot open the output file for key %s",
|
||||
dispatcher.Classifier().Value(newflux))
|
||||
}
|
||||
|
||||
|
@ -131,7 +131,7 @@ func _storeSequenceQuality(bytes *bytes.Buffer, out *obiseq.BioSequence, quality
|
||||
out.SetQualities(q)
|
||||
}
|
||||
|
||||
func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
|
||||
func FastqChunkParser(quality_shift byte, with_quality bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
|
||||
parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) {
|
||||
|
||||
var identifier string
|
||||
@ -263,7 +263,9 @@ func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSeq
|
||||
}
|
||||
case 10:
|
||||
if is_end_of_line {
|
||||
_storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift)
|
||||
if with_quality {
|
||||
_storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift)
|
||||
}
|
||||
state = 11
|
||||
} else {
|
||||
qualBytes.WriteByte(C)
|
||||
@ -299,9 +301,10 @@ func _ParseFastqFile(
|
||||
input ChannelSeqFileChunk,
|
||||
out obiiter.IBioSequence,
|
||||
quality_shift byte,
|
||||
with_quality bool,
|
||||
) {
|
||||
|
||||
parser := FastqChunkParser(quality_shift)
|
||||
parser := FastqChunkParser(quality_shift, with_quality)
|
||||
|
||||
for chunks := range input {
|
||||
sequences, err := parser(chunks.Source, chunks.Raw)
|
||||
@ -339,6 +342,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
|
||||
chkchan,
|
||||
out,
|
||||
byte(obioptions.InputQualityShift()),
|
||||
opt.ReadQualities(),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,7 @@ type __options__ struct {
|
||||
appendfile bool
|
||||
compressed bool
|
||||
skip_empty bool
|
||||
with_quality bool
|
||||
csv_id bool
|
||||
csv_sequence bool
|
||||
csv_quality bool
|
||||
@ -57,6 +58,7 @@ func MakeOptions(setters []WithOption) Options {
|
||||
appendfile: false,
|
||||
compressed: false,
|
||||
skip_empty: false,
|
||||
with_quality: true,
|
||||
csv_id: true,
|
||||
csv_definition: false,
|
||||
csv_count: false,
|
||||
@ -133,6 +135,10 @@ func (opt Options) SkipEmptySequence() bool {
|
||||
return opt.pointer.skip_empty
|
||||
}
|
||||
|
||||
// ReadQualities returns the with_quality flag stored in the options.
// ReadFastq forwards this value to the FASTQ parser, which only stores the
// quality string of a sequence when the flag is true.
func (opt Options) ReadQualities() bool {
|
||||
return opt.pointer.with_quality
|
||||
}
|
||||
|
||||
// CSVId returns the csv_id flag stored in the options.
func (opt Options) CSVId() bool {
|
||||
return opt.pointer.csv_id
|
||||
}
|
||||
@ -241,6 +247,14 @@ func OptionsSkipEmptySequence(skip bool) WithOption {
|
||||
return f
|
||||
}
|
||||
|
||||
// OptionsReadQualities returns a WithOption that, when applied, sets the
// with_quality flag of the options to read. The flag is the value later
// reported by Options.ReadQualities.
func OptionsReadQualities(read bool) WithOption {
|
||||
f := WithOption(func(opt Options) {
|
||||
// NOTE(review): opt is received by value; the write presumably reaches the
// caller's options because it goes through the shared opt.pointer field.
opt.pointer.with_quality = read
|
||||
})
|
||||
|
||||
return f
|
||||
}
|
||||
|
||||
func OptionsNewFile() WithOption {
|
||||
f := WithOption(func(opt Options) {
|
||||
opt.pointer.appendfile = false
|
||||
|
Reference in New Issue
Block a user