2022-02-14 00:01:01 +01:00
|
|
|
package obiformats
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2023-02-17 22:52:53 +01:00
|
|
|
"os"
|
|
|
|
"path/filepath"
|
2023-02-17 12:52:19 +01:00
|
|
|
"strings"
|
2022-02-14 00:01:01 +01:00
|
|
|
"sync"
|
|
|
|
|
2024-06-23 00:36:08 +02:00
|
|
|
"github.com/goccy/go-json"
|
|
|
|
|
2023-01-22 22:04:17 +01:00
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
|
2023-11-29 12:14:37 +01:00
|
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
2022-02-14 00:01:01 +01:00
|
|
|
)
|
|
|
|
|
2024-11-27 13:30:16 +01:00
|
|
|
// SequenceBatchWriterToFile is a function type that defines a method for writing
|
|
|
|
// a batch of biosequences to a specified file. It takes an iterator of biosequences,
|
|
|
|
// a filename, and optional configuration options, and returns an iterator of biosequences
|
|
|
|
// along with any error encountered during the writing process.
|
|
|
|
//
|
|
|
|
// Parameters:
|
|
|
|
// - iterator: An iterator of biosequences to be written to the file.
|
|
|
|
// - filename: The name of the file where the sequences will be written.
|
|
|
|
// - options: Optional configuration options for the writing process.
|
|
|
|
//
|
|
|
|
// Returns:
|
|
|
|
// An iterator of biosequences that may have been modified during the writing process
|
|
|
|
// and an error if the writing operation fails.
|
2023-01-22 22:04:17 +01:00
|
|
|
type SequenceBatchWriterToFile func(iterator obiiter.IBioSequence,
|
2022-02-14 00:01:01 +01:00
|
|
|
filename string,
|
2023-01-22 22:04:17 +01:00
|
|
|
options ...WithOption) (obiiter.IBioSequence, error)
|
2022-02-14 00:01:01 +01:00
|
|
|
|
2024-11-27 13:30:16 +01:00
|
|
|
// WriterDispatcher manages the writing of data to files based on a given
|
|
|
|
// prototype name and a dispatcher for distributing the sequences. It
|
|
|
|
// processes incoming data from the dispatcher in separate goroutines,
|
|
|
|
// formatting and writing the data to files as specified.
|
|
|
|
//
|
|
|
|
// Parameters:
|
|
|
|
// - prototypename: A string that serves as a template for naming the output files.
|
|
|
|
// - dispatcher: An instance of IDistribute that provides the data to be written
|
|
|
|
// and manages the distribution of sequences.
|
|
|
|
// - formater: A function of type SequenceBatchWriterToFile that formats and writes
|
|
|
|
// the sequences to the specified file.
|
|
|
|
// - options: Optional configuration options for the writing process.
|
|
|
|
//
|
|
|
|
// The function operates asynchronously, launching goroutines for each new data
|
|
|
|
// channel received from the dispatcher. It ensures that directories are created
|
|
|
|
// as needed and handles errors during the writing process. The function blocks
|
|
|
|
// until all writing jobs are completed.
|
2022-02-14 00:01:01 +01:00
|
|
|
func WriterDispatcher(prototypename string,
|
2022-02-24 07:08:40 +01:00
|
|
|
dispatcher obiiter.IDistribute,
|
2022-02-14 00:01:01 +01:00
|
|
|
formater SequenceBatchWriterToFile,
|
|
|
|
options ...WithOption) {
|
|
|
|
|
|
|
|
jobDone := sync.WaitGroup{}
|
|
|
|
jobDone.Add(1)
|
|
|
|
|
|
|
|
go func() {
|
2023-02-17 12:52:19 +01:00
|
|
|
opt := MakeOptions(options)
|
2022-02-14 00:01:01 +01:00
|
|
|
for newflux := range dispatcher.News() {
|
2022-02-15 00:47:02 +01:00
|
|
|
jobDone.Add(1)
|
2022-02-18 22:53:09 +01:00
|
|
|
go func(newflux int) {
|
2022-02-15 00:47:02 +01:00
|
|
|
data, err := dispatcher.Outputs(newflux)
|
|
|
|
|
|
|
|
if err != nil {
|
2024-11-27 13:30:16 +01:00
|
|
|
log.Fatalf("Cannot retrieve the new channel: %v", err)
|
2022-02-15 00:47:02 +01:00
|
|
|
}
|
|
|
|
|
2023-02-17 22:52:53 +01:00
|
|
|
key := dispatcher.Classifier().Value(newflux)
|
|
|
|
directory := ""
|
|
|
|
if dispatcher.Classifier().Type == "DualAnnotationClassifier" {
|
|
|
|
var keys [2]string
|
|
|
|
err := json.Unmarshal([]byte(key), &keys)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("Error in parsing dispatch key %s", key)
|
|
|
|
}
|
|
|
|
key = keys[0]
|
|
|
|
directory = keys[1]
|
|
|
|
}
|
|
|
|
|
|
|
|
name := fmt.Sprintf(prototypename, key)
|
|
|
|
if opt.CompressedFile() && !strings.HasSuffix(name, ".gz") {
|
2023-02-17 12:52:19 +01:00
|
|
|
name = name + ".gz"
|
|
|
|
}
|
2023-02-17 22:52:53 +01:00
|
|
|
|
|
|
|
if directory != "" {
|
|
|
|
info, err := os.Stat(directory)
|
|
|
|
switch {
|
|
|
|
case !os.IsNotExist(err) && !info.IsDir():
|
2024-11-27 13:30:16 +01:00
|
|
|
log.Fatalf("Cannot create the directory %s", directory)
|
2023-02-17 22:52:53 +01:00
|
|
|
case os.IsNotExist(err):
|
|
|
|
os.Mkdir(directory, 0755)
|
|
|
|
}
|
|
|
|
|
|
|
|
name = filepath.Join(directory, name)
|
|
|
|
}
|
|
|
|
|
2022-02-14 09:12:57 +01:00
|
|
|
out, err := formater(data,
|
2023-02-17 12:52:19 +01:00
|
|
|
name,
|
2022-02-14 09:12:57 +01:00
|
|
|
options...)
|
2022-02-15 00:47:02 +01:00
|
|
|
|
2022-02-14 09:12:57 +01:00
|
|
|
if err != nil {
|
2024-11-27 13:30:16 +01:00
|
|
|
log.Fatalf("Cannot open the output file for key %s",
|
2022-02-21 19:00:23 +01:00
|
|
|
dispatcher.Classifier().Value(newflux))
|
2022-02-14 09:12:57 +01:00
|
|
|
}
|
|
|
|
|
2022-02-14 00:01:01 +01:00
|
|
|
out.Recycle()
|
|
|
|
jobDone.Done()
|
2022-02-14 09:12:57 +01:00
|
|
|
}(newflux)
|
2022-02-14 00:01:01 +01:00
|
|
|
}
|
2022-02-15 00:47:02 +01:00
|
|
|
jobDone.Done()
|
2022-02-14 00:01:01 +01:00
|
|
|
}()
|
|
|
|
|
|
|
|
jobDone.Wait()
|
|
|
|
}
|