mirror of
https://github.com/metabarcoding/obitools4.git
synced 2025-06-29 16:20:46 +00:00
127 lines
3.6 KiB
Go
127 lines
3.6 KiB
Go
package obichunk
|
|
|
|
import (
|
|
"io/fs"
|
|
"os"
|
|
"path/filepath"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
|
)
|
|
|
|
// tempDir creates a temporary directory with a prefix "obiseq_chunks_"
|
|
// in the system's temporary directory. It returns the path of the
|
|
// created directory and any error encountered during the creation process.
|
|
//
|
|
// If the directory creation is successful, the path to the new
|
|
// temporary directory is returned. If there is an error, it returns
|
|
// an empty string and the error encountered.
|
|
func tempDir() (string, error) {
|
|
dir, err := os.MkdirTemp(os.TempDir(), "obiseq_chunks_")
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return dir, nil
|
|
}
|
|
|
|
// find searches for files with a specific extension in the given root directory
|
|
// and its subdirectories. It returns a slice of strings containing the paths
|
|
// of the found files.
|
|
//
|
|
// Parameters:
|
|
// - root: The root directory to start the search from.
|
|
// - ext: The file extension to look for (including the leading dot, e.g., ".txt").
|
|
//
|
|
// Returns:
|
|
// A slice of strings containing the paths of files that match the specified
|
|
// extension. If no files are found, an empty slice is returned. Any errors
|
|
// encountered during the directory traversal will be returned as part of the
|
|
// WalkDir function's error handling.
|
|
func find(root, ext string) []string {
|
|
var a []string
|
|
filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error {
|
|
if e != nil {
|
|
return e
|
|
}
|
|
if filepath.Ext(d.Name()) == ext {
|
|
a = append(a, s)
|
|
}
|
|
return nil
|
|
})
|
|
return a
|
|
}
|
|
|
|
// ISequenceChunkOnDisk processes a sequence iterator by distributing the sequences
|
|
// into chunks stored on disk. It uses a classifier to determine how to distribute
|
|
// the sequences and returns a new iterator for the processed sequences.
|
|
//
|
|
// Parameters:
|
|
// - iterator: An iterator of biosequences to be processed.
|
|
// - classifier: A pointer to a BioSequenceClassifier used to classify the sequences
|
|
// during distribution.
|
|
//
|
|
// Returns:
|
|
// An iterator of biosequences representing the processed chunks. If an error occurs
|
|
// during the creation of the temporary directory or any other operation, it returns
|
|
// an error along with a nil iterator.
|
|
//
|
|
// The function operates asynchronously, creating a temporary directory to store
|
|
// the sequence chunks. Once the processing is complete, the temporary directory
|
|
// is removed. The function logs the number of batches created and the processing
|
|
// status of each batch.
|
|
func ISequenceChunkOnDisk(iterator obiiter.IBioSequence,
|
|
classifier *obiseq.BioSequenceClassifier) (obiiter.IBioSequence, error) {
|
|
dir, err := tempDir()
|
|
if err != nil {
|
|
return obiiter.NilIBioSequence, err
|
|
}
|
|
|
|
newIter := obiiter.MakeIBioSequence()
|
|
|
|
newIter.Add(1)
|
|
|
|
go func() {
|
|
defer func() {
|
|
os.RemoveAll(dir)
|
|
log.Debugln("Clear the cache directory")
|
|
}()
|
|
|
|
newIter.Wait()
|
|
newIter.Close()
|
|
}()
|
|
|
|
obiformats.WriterDispatcher(dir+"/chunk_%s.fastx",
|
|
iterator.Distribute(classifier),
|
|
obiformats.WriteSequencesToFile,
|
|
)
|
|
|
|
fileNames := find(dir, ".fastx")
|
|
nbatch := len(fileNames)
|
|
log.Infof("Data splitted over %d batches", nbatch)
|
|
|
|
go func() {
|
|
|
|
for order, file := range fileNames {
|
|
iseq, err := obiformats.ReadSequencesFromFile(file)
|
|
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
source, chunk := iseq.Load()
|
|
|
|
newIter.Push(obiiter.MakeBioSequenceBatch(source, order, chunk))
|
|
log.Infof("Start processing of batch %d/%d : %d sequences",
|
|
order, nbatch, len(chunk))
|
|
|
|
}
|
|
|
|
newIter.Done()
|
|
}()
|
|
|
|
return newIter, err
|
|
}
|