diff --git a/cmd/obitools/obiuniq/main.go b/cmd/obitools/obiuniq/main.go index 959cf91..cba8b73 100644 --- a/cmd/obitools/obiuniq/main.go +++ b/cmd/obitools/obiuniq/main.go @@ -30,6 +30,7 @@ func main() { // trace.Start(ftrace) // defer trace.Stop() + obioptions.SetReadQualities(false) optionParser := obioptions.GenerateOptionParser(obiuniq.OptionSet) _, args := optionParser(os.Args) diff --git a/pkg/obichunk/chunk_on_disk.go b/pkg/obichunk/chunk_on_disk.go index bd85a1a..4347180 100644 --- a/pkg/obichunk/chunk_on_disk.go +++ b/pkg/obichunk/chunk_on_disk.go @@ -12,6 +12,13 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" ) +// tempDir creates a temporary directory with a prefix "obiseq_chunks_" +// in the system's temporary directory. It returns the path of the +// created directory and any error encountered during the creation process. +// +// If the directory creation is successful, the path to the new +// temporary directory is returned. If there is an error, it returns +// an empty string and the error encountered. func tempDir() (string, error) { dir, err := os.MkdirTemp(os.TempDir(), "obiseq_chunks_") if err != nil { @@ -20,6 +27,19 @@ func tempDir() (string, error) { return dir, nil } +// find searches for files with a specific extension in the given root directory +// and its subdirectories. It returns a slice of strings containing the paths +// of the found files. +// +// Parameters: +// - root: The root directory to start the search from. +// - ext: The file extension to look for (including the leading dot, e.g., ".txt"). +// +// Returns: +// A slice of strings containing the paths of files that match the specified +// extension. If no files are found, an empty slice is returned. The error +// returned by filepath.WalkDir is discarded, so traversal failures are not +// reported to the caller. 
func find(root, ext string) []string { var a []string filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error { @@ -34,6 +54,24 @@ func find(root, ext string) []string { return a } +// ISequenceChunkOnDisk processes a sequence iterator by distributing the sequences +// into chunks stored on disk. It uses a classifier to determine how to distribute +// the sequences and returns a new iterator for the processed sequences. +// +// Parameters: +// - iterator: An iterator of biosequences to be processed. +// - classifier: A pointer to a BioSequenceClassifier used to classify the sequences +// during distribution. +// +// Returns: +// An iterator of biosequences representing the processed chunks. If an error occurs +// during the creation of the temporary directory or any other operation, it returns +// an error along with a nil iterator. +// +// The function operates asynchronously, creating a temporary directory to store +// the sequence chunks. Once the processing is complete, the temporary directory +// is removed. The function logs the number of batches created and the processing +// status of each batch. func ISequenceChunkOnDisk(iterator obiiter.IBioSequence, classifier *obiseq.BioSequenceClassifier) (obiiter.IBioSequence, error) { dir, err := tempDir() diff --git a/pkg/obiformats/dispatcher.go b/pkg/obiformats/dispatcher.go index 0aedf8a..544bace 100644 --- a/pkg/obiformats/dispatcher.go +++ b/pkg/obiformats/dispatcher.go @@ -14,10 +14,40 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" ) +// SequenceBatchWriterToFile is a function type that defines a method for writing +// a batch of biosequences to a specified file. It takes an iterator of biosequences, +// a filename, and optional configuration options, and returns an iterator of biosequences +// along with any error encountered during the writing process. +// +// Parameters: +// - iterator: An iterator of biosequences to be written to the file. 
+// - filename: The name of the file where the sequences will be written. +// - options: Optional configuration options for the writing process. +// +// Returns: +// An iterator of biosequences that may have been modified during the writing process +// and an error if the writing operation fails. type SequenceBatchWriterToFile func(iterator obiiter.IBioSequence, filename string, options ...WithOption) (obiiter.IBioSequence, error) +// WriterDispatcher manages the writing of data to files based on a given +// prototype name and a dispatcher for distributing the sequences. It +// processes incoming data from the dispatcher in separate goroutines, +// formatting and writing the data to files as specified. +// +// Parameters: +// - prototypename: A string that serves as a template for naming the output files. +// - dispatcher: An instance of IDistribute that provides the data to be written +// and manages the distribution of sequences. +// - formater: A function of type SequenceBatchWriterToFile that formats and writes +// the sequences to the specified file. +// - options: Optional configuration options for the writing process. +// +// The function operates asynchronously, launching goroutines for each new data +// channel received from the dispatcher. It ensures that directories are created +// as needed and handles errors during the writing process. The function blocks +// until all writing jobs are completed. 
func WriterDispatcher(prototypename string, dispatcher obiiter.IDistribute, formater SequenceBatchWriterToFile, @@ -34,7 +64,7 @@ func WriterDispatcher(prototypename string, data, err := dispatcher.Outputs(newflux) if err != nil { - log.Fatalf("Cannot retreive the new chanel : %v", err) + log.Fatalf("Cannot retrieve the new channel: %v", err) } key := dispatcher.Classifier().Value(newflux) @@ -58,7 +88,7 @@ func WriterDispatcher(prototypename string, info, err := os.Stat(directory) switch { case !os.IsNotExist(err) && !info.IsDir(): - log.Fatalf("Cannot Create the directory %s", directory) + log.Fatalf("Cannot create the directory %s", directory) case os.IsNotExist(err): os.Mkdir(directory, 0755) } @@ -71,7 +101,7 @@ func WriterDispatcher(prototypename string, options...) if err != nil { - log.Fatalf("cannot open the output file for key %s", + log.Fatalf("Cannot open the output file for key %s", dispatcher.Classifier().Value(newflux)) } diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go index fe11869..f0716d3 100644 --- a/pkg/obiformats/fastqseq_read.go +++ b/pkg/obiformats/fastqseq_read.go @@ -131,7 +131,7 @@ func _storeSequenceQuality(bytes *bytes.Buffer, out *obiseq.BioSequence, quality out.SetQualities(q) } -func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSequenceSlice, error) { +func FastqChunkParser(quality_shift byte, with_quality bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) { parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) { var identifier string @@ -263,7 +263,9 @@ func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSeq } case 10: if is_end_of_line { - _storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift) + if with_quality { + _storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift) + } state = 11 } else { qualBytes.WriteByte(C) @@ -299,9 +301,10 @@ func _ParseFastqFile( input 
ChannelSeqFileChunk, out obiiter.IBioSequence, quality_shift byte, + with_quality bool, ) { - parser := FastqChunkParser(quality_shift) + parser := FastqChunkParser(quality_shift, with_quality) for chunks := range input { sequences, err := parser(chunks.Source, chunks.Raw) @@ -339,6 +342,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e chkchan, out, byte(obioptions.InputQualityShift()), + opt.ReadQualities(), ) } diff --git a/pkg/obiformats/options.go b/pkg/obiformats/options.go index 0d14d2c..f478b2f 100644 --- a/pkg/obiformats/options.go +++ b/pkg/obiformats/options.go @@ -20,6 +20,7 @@ type __options__ struct { appendfile bool compressed bool skip_empty bool + with_quality bool csv_id bool csv_sequence bool csv_quality bool @@ -57,6 +58,7 @@ func MakeOptions(setters []WithOption) Options { appendfile: false, compressed: false, skip_empty: false, + with_quality: true, csv_id: true, csv_definition: false, csv_count: false, @@ -133,6 +135,10 @@ func (opt Options) SkipEmptySequence() bool { return opt.pointer.skip_empty } +func (opt Options) ReadQualities() bool { + return opt.pointer.with_quality +} + func (opt Options) CSVId() bool { return opt.pointer.csv_id } @@ -241,6 +247,14 @@ func OptionsSkipEmptySequence(skip bool) WithOption { return f } +func OptionsReadQualities(read bool) WithOption { + f := WithOption(func(opt Options) { + opt.pointer.with_quality = read + }) + + return f +} + func OptionsNewFile() WithOption { f := WithOption(func(opt Options) { opt.pointer.appendfile = false diff --git a/pkg/obiiter/distribute.go b/pkg/obiiter/distribute.go index ee2b080..9015b90 100644 --- a/pkg/obiiter/distribute.go +++ b/pkg/obiiter/distribute.go @@ -4,9 +4,25 @@ import ( "fmt" "sync" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" ) +// IDistribute represents a distribution mechanism for biosequences. 
+// It manages the outputs of biosequences, provides a channel for +// new data notifications, and maintains a classifier for sequence +// classification. It is designed to facilitate the distribution +// of biosequences to various processing components. +// +// Fields: +// - outputs: A map that associates integer keys with corresponding +// biosequence outputs (IBioSequence). +// - news: A channel that sends notifications of new data available +// for processing, represented by integer identifiers. +// - classifier: A pointer to a BioSequenceClassifier used to classify +// the biosequences during distribution. +// - lock: A mutex for synchronizing access to the outputs and other +// shared resources to ensure thread safety. type IDistribute struct { outputs map[int]IBioSequence news chan int @@ -26,16 +42,39 @@ func (dist *IDistribute) Outputs(key int) (IBioSequence, error) { return iter, nil } +// News returns a channel that provides notifications of new data +// available for processing. The channel sends integer identifiers +// representing the new data. func (dist *IDistribute) News() chan int { return dist.news } +// Classifier returns a pointer to the BioSequenceClassifier +// associated with the distribution mechanism. This classifier +// is used to classify biosequences during the distribution process. func (dist *IDistribute) Classifier() *obiseq.BioSequenceClassifier { return dist.classifier } +// Distribute organizes the biosequences from the iterator into batches +// based on the provided classifier and batch sizes. It returns an +// IDistribute instance that manages the distribution of the sequences. +// +// Parameters: +// - class: A pointer to a BioSequenceClassifier used to classify +// the biosequences during distribution. +// - sizes: Optional integer values specifying the batch size. If +// no sizes are provided, the value of obioptions.CLIBatchSize() is used. 
+// +// Returns: +// An IDistribute instance that contains the outputs of the +// classified biosequences, a channel for new data notifications, +// and the classifier used for distribution. The method operates +// asynchronously, processing the sequences in separate goroutines. +// It ensures that the outputs are closed and cleaned up once +// processing is complete. func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, sizes ...int) IDistribute { - batchsize := 5000 + batchsize := obioptions.CLIBatchSize() outputs := make(map[int]IBioSequence, 100) slices := make(map[int]*obiseq.BioSequenceSlice, 100) diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index e8b45e8..310d65c 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -27,6 +27,7 @@ var _PprofMudex = 10 var _PprofGoroutine = 6060 var _Quality_Shift_Input = byte(33) var _Quality_Shift_Output = byte(33) +var _Read_Qualities = true type ArgumentParser func([]string) (*getoptions.GetOpt, []string) @@ -259,6 +260,10 @@ func CLIBatchSize() int { return _BatchSize } +func CLIReadQualities() bool { + return _Read_Qualities +} + // SetDebugOn sets the debug mode on. func SetDebugOn() { _Debug = true @@ -269,6 +274,10 @@ func SetDebugOff() { _Debug = false } +func SetReadQualities(status bool) { + _Read_Qualities = status +} + // SetWorkerPerCore sets the number of workers per CPU core. // // It takes a float64 parameter representing the number of workers diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index c904d8d..e800a5d 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -7,7 +7,7 @@ import ( // TODO: The version number is extracted from git. This induces that the version // corresponds to the last commit, and not the one when the file will be // commited -var _Commit = "7884a74" +var _Commit = "d29a56d" var _Version = "Release 4.3.0" // Version returns the version of the obitools package. 
diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index 83b22fe..9c1eaa7 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -99,6 +99,8 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { opts = append(opts, obiformats.OptionsFastSeqHeaderParser(obiformats.ParseGuessedFastSeqHeader)) } + opts = append(opts, obiformats.OptionsReadQualities(obioptions.CLIReadQualities())) + nworkers := obioptions.CLIReadParallelWorkers() if nworkers < 2 { nworkers = 2