diff --git a/cmd/obitools/obicount/main.go b/cmd/obitools/obicount/main.go index 2b0927d..b0cf51b 100644 --- a/cmd/obitools/obicount/main.go +++ b/cmd/obitools/obicount/main.go @@ -4,8 +4,7 @@ import ( "fmt" "os" - log "github.com/sirupsen/logrus" - + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiblackboard" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obicount" @@ -35,26 +34,27 @@ func main() { _, args := optionParser(os.Args) - obioptions.SetStrictReadWorker(min(4, obioptions.CLIParallelWorkers())) - fs, err := obiconvert.CLIReadBioSequences(args...) + black := obiblackboard.NewBlackBoard(obioptions.CLIParallelWorkers()) - if err != nil { - log.Errorf("Cannot open file (%v)", err) - os.Exit(1) - } + black.ReadSequences(args) - nvariant, nread, nsymbol := fs.Count(true) + counter := obiblackboard.CountSequenceAggregator("to_delete") + + black.RegisterRunner("sequences", counter.Runner) + black.RegisterRunner("to_delete", obiblackboard.RecycleSequences(true, "final")) + + black.Run() if obicount.CLIIsPrintingVariantCount() { - fmt.Printf(" %d", nvariant) + fmt.Printf(" %d", counter.Variants) } if obicount.CLIIsPrintingReadCount() { - fmt.Printf(" %d", nread) + fmt.Printf(" %d", counter.Reads) } if obicount.CLIIsPrintingSymbolCount() { - fmt.Printf(" %d", nsymbol) + fmt.Printf(" %d", counter.Nucleotides) } fmt.Printf("\n") diff --git a/pkg/obiblackboard/blackboard.go b/pkg/obiblackboard/blackboard.go index a110d46..a51796f 100644 --- a/pkg/obiblackboard/blackboard.go +++ b/pkg/obiblackboard/blackboard.go @@ -3,29 +3,25 @@ package obiblackboard import ( "slices" "sync" + "time" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) type DoTask func(*Blackboard, *Task) *Task -type Runner struct { - Run DoTask - To string -} - type Blackboard struct { Board map[int]Queue BoardLock *sync.Mutex - Runners map[string]Runner + Runners map[string]DoTask Running *obiutils.Counter TargetSize int Size int } func doFinal(bb *Blackboard, task *Task) *Task { - if bb.Len() > bb.TargetSize { - return nil + if task.SavedTask != nil { + return task.SavedTask } return NewInitialTask() @@ -33,7 +29,7 @@ func doFinal(bb *Blackboard, task *Task) *Task { func NewBlackBoard(size int) *Blackboard { board := make(map[int]Queue, 0) - runners := make(map[string]Runner, 0) + runners := make(map[string]DoTask, 0) if size < 2 { size = 2 @@ -52,16 +48,13 @@ func NewBlackBoard(size int) *Blackboard { bb.PushTask(NewInitialTask()) } - bb.RegisterRunner("final", "initial", doFinal) + bb.RegisterRunner("final", doFinal) return bb } -func (bb *Blackboard) RegisterRunner(from, to string, runner DoTask) { - bb.Runners[from] = Runner{ - Run: runner, - To: to, - } +func (bb *Blackboard) RegisterRunner(target string, runner DoTask) { + bb.Runners[target] = runner } func (bb *Blackboard) MaxQueue() Queue { @@ -125,10 +118,7 @@ func (bb *Blackboard) Run() { runner, ok := bb.Runners[task.Role] if ok { - task = runner.Run(bb, task) - if task != nil { - task.Role = runner.To - } + task = runner(bb, task) } bb.PushTask(task) @@ -155,9 +145,10 @@ func (bb *Blackboard) Run() { ctask <- task } else { bb.Running.Dec() - if bb.Running.Value() <= 0 { + if bb.Running.Value()+bb.Len() <= 0 { break } + time.Sleep(time.Millisecond) } } @@ -168,6 +159,46 @@ func (bb *Blackboard) Run() { lock.Wait() } +// func (bb *Blackboard) Run() { +// lock := &sync.WaitGroup{} + +// launcher := func(runner DoTask, task *Task) { +// task = runner(bb, task) + +// if task != nil { +// for bb.Len() > bb.TargetSize { +// time.Sleep(time.Millisecond) +// } +// bb.PushTask(task) +// } + +// bb.Running.Dec() +// lock.Done() +// } + +// lock.Add(1) + +// func() { +// for bb.Len()+bb.Running.Value() > 0 { +// bb.Running.Inc() +// task := bb.PopTask() + +// if task != nil { +// lock.Add(1) +// go launcher(bb.Runners[task.Role], task) +// } else { +// bb.Running.Dec() +// } +// } + +// lock.Done() +// }() + +// lock.Wait() +// } + func (bb *Blackboard) Len() int { return bb.Size } + +// 151431044 151431044 15083822152 diff --git a/pkg/obiblackboard/count_sequences.go b/pkg/obiblackboard/count_sequences.go new file mode 100644 index 0000000..f115d05 --- /dev/null +++ b/pkg/obiblackboard/count_sequences.go @@ -0,0 +1,50 @@ +package obiblackboard + +import ( + "sync" + + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" +) + +type SequenceCounter struct { + Variants int + Reads int + Nucleotides int + Runner DoTask +} + +func CountSequenceAggregator(target string) *SequenceCounter { + cc := &SequenceCounter{ + Variants: 0, + Reads: 0, + Nucleotides: 0, + Runner: nil, + } + + mutex := sync.Mutex{} + + runner := func(bb *Blackboard, task *Task) *Task { + body := task.Body.(obiiter.BioSequenceBatch) + + mutex.Lock() + cc.Variants += body.Len() + cc.Reads += body.Slice().Count() + cc.Nucleotides += body.Slice().Size() + mutex.Unlock() + + nt := task.GetNext(target, true, false) + return nt + } + + cc.Runner = runner + return cc +} + +func RecycleSequences(rescycleSequence bool, target string) DoTask { + return func(bb *Blackboard, task *Task) *Task { + body := task.Body.(obiiter.BioSequenceBatch) + // log.Warningf("With priority %d, Recycling %s[%d]", task.Priority, body.Source(), body.Order()) + body.Recycle(rescycleSequence) + return task.GetNext(target, false, false) + } +} diff --git a/pkg/obiblackboard/display_task.go b/pkg/obiblackboard/display_task.go new file mode 100644 index 0000000..9085781 --- /dev/null +++ b/pkg/obiblackboard/display_task.go @@ -0,0 +1,17 @@ +package obiblackboard + +import "fmt" + +func DisplayTask(bb *Blackboard, task *Task) *Task { + if task == nil { + return nil + } + + fmt.Printf("Task: %s:\n%v\n\n", task.Role, task.Body) + + return task +} + +func (runner DoTask) Display() DoTask { + return runner.CombineWith(DisplayTask) +} diff --git a/pkg/obiblackboard/doIterate.go b/pkg/obiblackboard/doIterate.go new file mode 100644 index 0000000..3de3b92 --- /dev/null +++ b/pkg/obiblackboard/doIterate.go @@ -0,0 +1,70 @@ +package obiblackboard + +import "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" + +type Iteration[T any] struct { + Index int + Value T +} + +// DoIterateSlice generates a DoTask function that iterates over a given slice and +// creates a new InitialTask for each element. The function takes in a slice of type +// T and a target string. It returns a DoTask function that can be used to execute +// the iteration. The DoTask function takes a Blackboard and a Task as input and +// returns a new Task. The Task's Role is set to the target string and its Body is +// set to an Iteration struct containing the index i and the element s[i] from the +// input slice. The iteration stops when the index i is equal to or greater than +// the length of the input slice. +// +// Parameters: +// - s: The slice of type T to iterate over. +// - target: The target string to set as the Task's Role. +// +// Return type: +// - DoTask: The DoTask function that can be used to execute the iteration. +func DoIterateSlice[T any](s []T, target string) DoTask { + n := len(s) + idx := obiutils.AtomicCounter() + + dt := func(bb *Blackboard, t *Task) *Task { + i := idx() + if i < n { + nt := t.GetNext(target, false, false) + nt.Body = Iteration[T]{i, s[i]} + return nt + } + return nil + } + + return dt +} + +// DoCount generates a DoTask function that iterates over a given integer n and +// creates a new InitialTask for each iteration. The function takes in an integer n +// and a target string. It returns a DoTask function that can be used to execute +// the iteration. The DoTask function takes a Blackboard and a Task as input and +// returns a new Task. The Task's Role is set to the target string and its Body is +// set to the current iteration index i. The iteration stops when the index i is +// equal to or greater than the input integer n. +// +// Parameters: +// - n: The integer to iterate over. +// - target: The target string to set as the Task's Role. +// +// Return type: +// - DoTask: The DoTask function that can be used to execute the iteration. +func DoCount(n int, target string) DoTask { + idx := obiutils.AtomicCounter() + + dt := func(bb *Blackboard, t *Task) *Task { + i := idx() + if i < n { + nt := t.GetNext(target, false, false) + nt.Body = i + return nt + } + return nil + } + + return dt +} diff --git a/pkg/obiblackboard/read_sequences.go b/pkg/obiblackboard/read_sequences.go new file mode 100644 index 0000000..e68a6a3 --- /dev/null +++ b/pkg/obiblackboard/read_sequences.go @@ -0,0 +1,534 @@ +package obiblackboard + +import ( + "bufio" + "bytes" + "fmt" + "io" + "os" + "path" + "path/filepath" + "regexp" + "strings" + + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" + + "github.com/gabriel-vasile/mimetype" + "github.com/goombaio/orderedset" + log "github.com/sirupsen/logrus" +) + +func ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { + res, err := _ExpandListOfFiles(check_ext, filenames...) + + if err != nil { + log.Infof("Found %d files to process", len(res)) + } + + return res, err +} + +func _ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { + var err error + list_of_files := orderedset.NewOrderedSet() + for _, fn := range filenames { + // Special case for stdin + if fn == "-" { + list_of_files.Add(fn) + continue + } + + err = filepath.Walk(fn, + func(path string, info os.FileInfo, err error) error { + var e error + if info == nil { + return fmt.Errorf("cannot open path") + } + for info.Mode()&os.ModeSymlink == os.ModeSymlink { + path, e = filepath.EvalSymlinks(path) + if e != nil { + return e + } + + info, e = os.Stat(path) + if e != nil { + return e + } + } + + if info.IsDir() { + if path != fn { + subdir, e := ExpandListOfFiles(true, path) + if e != nil { + return e + } + for _, f := range subdir { + list_of_files.Add(f) + } + } else { + check_ext = true + } + } else { + if !check_ext || + strings.HasSuffix(path, "csv") || + strings.HasSuffix(path, "csv.gz") || + strings.HasSuffix(path, "fasta") || + strings.HasSuffix(path, "fasta.gz") || + strings.HasSuffix(path, "fastq") || + strings.HasSuffix(path, "fastq.gz") || + strings.HasSuffix(path, "seq") || + strings.HasSuffix(path, "seq.gz") || + strings.HasSuffix(path, "gb") || + strings.HasSuffix(path, "gb.gz") || + strings.HasSuffix(path, "dat") || + strings.HasSuffix(path, "dat.gz") || + strings.HasSuffix(path, "ecopcr") || + strings.HasSuffix(path, "ecopcr.gz") { + log.Debugf("Appending %s file\n", path) + list_of_files.Add(path) + } + } + return nil + }) + + if err != nil { + return nil, err + } + } + + res := make([]string, 0, list_of_files.Size()) + for _, v := range list_of_files.Values() { + res = append(res, v.(string)) + } + + return res, nil +} + +// OBIMimeTypeGuesser is a function that takes an io.Reader as input and guesses the MIME type of the data. +// It uses several detectors to identify specific file formats, such as FASTA, FASTQ, ecoPCR2, GenBank, and EMBL. +// The function reads data from the input stream and analyzes it using the mimetype library. +// It then returns the detected MIME type, a modified reader with the read data, and any error encountered during the process. +// +// The following file types are recognized: +// - "text/ecopcr": if the first line starts with "#@ecopcr-v2". +// - "text/fasta": if the first line starts with ">". +// - "text/fastq": if the first line starts with "@". +// - "text/embl": if the first line starts with "ID ". +// - "text/genbank": if the first line starts with "LOCUS ". +// - "text/genbank" (special case): if the first line "Genetic Sequence Data Bank" (for genbank release files). +// - "text/csv" +// +// Parameters: +// - stream: An io.Reader representing the input stream to read data from. +// +// Returns: +// - *mimetype.MIME: The detected MIME type of the data. +// - io.Reader: A modified reader with the read data. +// - error: Any error encountered during the process. +func OBIMimeTypeGuesser(stream io.Reader) (*mimetype.MIME, io.Reader, error) { + fastaDetector := func(raw []byte, limit uint32) bool { + ok, err := regexp.Match("^>[^ ]", raw) + return ok && err == nil + } + + fastqDetector := func(raw []byte, limit uint32) bool { + ok, err := regexp.Match("^@[^ ].*\n[^ ]+\n\\+", raw) + return ok && err == nil + } + + ecoPCR2Detector := func(raw []byte, limit uint32) bool { + ok := bytes.HasPrefix(raw, []byte("#@ecopcr-v2")) + return ok + } + + genbankDetector := func(raw []byte, limit uint32) bool { + ok2 := bytes.HasPrefix(raw, []byte("LOCUS ")) + ok1, err := regexp.Match("^[^ ]* +Genetic Sequence Data Bank *\n", raw) + return ok2 || (ok1 && err == nil) + } + + emblDetector := func(raw []byte, limit uint32) bool { + ok := bytes.HasPrefix(raw, []byte("ID ")) + return ok + } + + mimetype.Lookup("text/plain").Extend(fastaDetector, "text/fasta", ".fasta") + mimetype.Lookup("text/plain").Extend(fastqDetector, "text/fastq", ".fastq") + mimetype.Lookup("text/plain").Extend(ecoPCR2Detector, "text/ecopcr2", ".ecopcr") + mimetype.Lookup("text/plain").Extend(genbankDetector, "text/genbank", ".seq") + mimetype.Lookup("text/plain").Extend(emblDetector, "text/embl", ".dat") + + mimetype.Lookup("application/octet-stream").Extend(fastaDetector, "text/fasta", ".fasta") + mimetype.Lookup("application/octet-stream").Extend(fastqDetector, "text/fastq", ".fastq") + mimetype.Lookup("application/octet-stream").Extend(ecoPCR2Detector, "text/ecopcr2", ".ecopcr") + mimetype.Lookup("application/octet-stream").Extend(genbankDetector, "text/genbank", ".seq") + mimetype.Lookup("application/octet-stream").Extend(emblDetector, "text/embl", ".dat") + + // Create a buffer to store the read data + buf := make([]byte, 1024*128) + n, err := io.ReadFull(stream, buf) + + if err != nil && err != io.ErrUnexpectedEOF { + return nil, nil, err + } + + // Detect the MIME type using the mimetype library + mimeType := mimetype.Detect(buf) + if mimeType == nil { + return nil, nil, err + } + + // Create a new reader based on the read data + newReader := io.Reader(bytes.NewReader(buf[:n])) + + if err == nil { + newReader = io.MultiReader(newReader, stream) + } + + return mimeType, newReader, nil +} + +func TextChunkParser(parser obiformats.SeqFileChunkParser, target string) DoTask { + + return func(bb *Blackboard, task *Task) *Task { + chunk := task.Body.(obiformats.SeqFileChunk) + sequences, err := parser(chunk.Source, chunk.Raw) + + if err != nil { + return nil + } + + nt := task.GetNext(target, false, false) + nt.Body = obiiter.MakeBioSequenceBatch( + chunk.Source, + chunk.Order, + sequences) + + return nt + } +} + +func SeqAnnotParser(parser obiseq.SeqAnnotator, target string) DoTask { + worker := obiseq.SeqToSliceWorker(obiseq.AnnotatorToSeqWorker(parser), false) + + return func(bb *Blackboard, task *Task) *Task { + batch := task.Body.(obiiter.BioSequenceBatch) + sequences, err := worker(batch.Slice()) + + if err != nil { + log.Errorf("SeqAnnotParser on %s[%d]: %v", batch.Source(), batch.Order(), err) + return nil + } + + nt := task.GetNext(target, false, false) + nt.Body = obiiter.MakeBioSequenceBatch( + batch.Source(), + batch.Order(), + sequences, + ) + return nt + } + +} + +// OpenStream opens a file specified by the given filename and returns a reader for the file, +// the detected MIME type of the file, and any error encountered during the process. +// +// Parameters: +// - filename: A string representing the path to the file to be opened. If the filename is "-", +// the function opens the standard input stream. +// +// Returns: +// - io.Reader: A reader for the file. +// - *mimetype.MIME: The detected MIME type of the file. +// - error: Any error encountered during the process. +func OpenStream(filename string) (io.Reader, *mimetype.MIME, error) { + var stream io.Reader + var err error + if filename == "-" { + stream, err = obiformats.Buf(os.Stdin) + } else { + stream, err = obiformats.Ropen(filename) + } + + if err != nil { + return nil, nil, err + } + + // Detect the MIME type using the mimetype library + mimeType, newReader, err := OBIMimeTypeGuesser(stream) + if err != nil { + return nil, nil, err + } + + log.Infof("%s mime type: %s", filename, mimeType.String()) + + return bufio.NewReader(newReader), mimeType, nil +} + +type OpenedStreamBody struct { + Stream io.Reader + Filename string + Source string + Mime *mimetype.MIME + ToBeClosed bool +} + +func FilenameToStream(target string) DoTask { + + return func(bb *Blackboard, task *Task) *Task { + filename := task.Body.(Iteration[string]).Value + stream, mimetype, err := OpenStream(filename) + + if err != nil { + log.Errorf("Error opening %s: %v", filename, err) + return nil + } + + tobeclosed := filename != "-" + + switch mimetype.String() { + case "text/fasta", "text/fastq", "text/ecopcr2", "text/genbank", "text/embl", "text/csv": + nt := task.GetNext(target+":"+mimetype.String(), false, false) + nt.Body = OpenedStreamBody{ + Stream: stream, + Mime: mimetype, + Filename: filename, + Source: obiutils.RemoveAllExt((path.Base(filename))), + ToBeClosed: tobeclosed, + } + + return nt + + default: + log.Errorf("File %s (mime type %s) is an unsupported format", filename, mimetype.String()) + return nil + } + } +} + +type TextChunkIteratorBody struct { + Chunks obiformats.ChannelSeqFileChunk + Stream io.Reader + Source string + ToBeClosed bool +} + +func StreamToTextChunkReader(lastEntry obiformats.LastSeqRecord, target string) DoTask { + return func(bb *Blackboard, task *Task) *Task { + + body := task.Body.(OpenedStreamBody) + iterator := obiformats.ReadSeqFileChunk( + body.Source, + body.Stream, + make([]byte, 64*1024*1024), + lastEntry, + ) + + nt := task.GetNext(target, false, false) + nt.Body = TextChunkIteratorBody{ + Chunks: iterator, + Stream: body.Stream, + Source: body.Source, + ToBeClosed: body.ToBeClosed, + } + + return nt + } +} + +func TextChuckIterator(endTask *Task, target string) DoTask { + return func(bb *Blackboard, task *Task) *Task { + body := task.Body.(TextChunkIteratorBody) + + chunk, ok := <-body.Chunks + + if !ok { + return endTask + } + + var nt *Task + + if bb.Len() > bb.TargetSize { + nt = task.GetNext(target, false, true) + } else { + nt = task.GetNext(target, false, false) + bb.PushTask(task) + } + + nt.Body = chunk + return nt + } +} + +type SequenceIteratorBody struct { + Iterator obiiter.IBioSequence + Stream io.Reader + Source string + ToBeClosed bool +} + +func StreamToSequenceReader( + reader obiformats.SequenceReader, + options []obiformats.WithOption, + target string) DoTask { + return func(bb *Blackboard, task *Task) *Task { + body := task.Body.(OpenedStreamBody) + iterator, err := reader(body.Stream, options...) + + if err != nil { + log.Errorf("Error opening %s: %v", body.Filename, err) + return nil + } + + nt := task.GetNext(target, false, false) + nt.Body = SequenceIteratorBody{ + Iterator: iterator, + Stream: body.Stream, + Source: body.Source, + ToBeClosed: body.ToBeClosed, + } + + return nt + } +} + +func SequenceIterator(endTask *Task, target string) DoTask { + return func(bb *Blackboard, task *Task) *Task { + body := task.Body.(SequenceIteratorBody) + + if body.Iterator.Next() { + batch := body.Iterator.Get() + + var nt *Task + if bb.Len() > bb.TargetSize { + nt = task.GetNext(target, false, true) + } else { + nt = task.GetNext(target, false, false) + bb.PushTask(task) + } + + nt.Body = batch + + return nt + } else { + return endTask + } + } +} + +func (bb *Blackboard) ReadSequences(filepath []string, options ...obiformats.WithOption) { + + var err error + + opts := obiformats.MakeOptions(options) + + if len(filepath) == 0 { + filepath = []string{"-"} + } + + filepath, err = ExpandListOfFiles(false, filepath...) + + if err != nil { + log.Fatalf("Cannot expand list of files : %v", err) + } + + bb.RegisterRunner( + "initial", + DoIterateSlice(filepath, "filename"), + ) + + bb.RegisterRunner( + "filename", + FilenameToStream("stream"), + ) + + bb.RegisterRunner("stream:text/fasta", + StreamToTextChunkReader( + obiformats.EndOfLastFastaEntry, + "fasta_text_reader", + )) + + bb.RegisterRunner("fasta_text_reader", + TextChuckIterator(NewInitialTask(), "fasta_text_chunk"), + ) + + bb.RegisterRunner( + "fasta_text_chunk", + TextChunkParser( + obiformats.FastaChunkParser(), + "unannotated_sequences", + ), + ) + + bb.RegisterRunner("stream:text/fastq", + StreamToTextChunkReader(obiformats.EndOfLastFastqEntry, + "fastq_text_reader")) + + bb.RegisterRunner("fastq_text_reader", + TextChuckIterator(NewInitialTask(), "fastq_text_chunk"), + ) + + bb.RegisterRunner( + "fastq_text_chunk", + TextChunkParser( + obiformats.FastqChunkParser(obioptions.InputQualityShift()), + "unannotated_sequences", + ), + ) + + bb.RegisterRunner("stream:text/embl", + StreamToTextChunkReader(obiformats.EndOfLastFlatFileEntry, + "embl_text_reader")) + + bb.RegisterRunner("embl_text_reader", + TextChuckIterator(NewInitialTask(), "embl_text_chunk"), + ) + + bb.RegisterRunner( + "embl_text_chunk", + TextChunkParser( + obiformats.EmblChunkParser(opts.WithFeatureTable()), + "sequences", + ), + ) + + bb.RegisterRunner("stream:text/genbank", + StreamToTextChunkReader(obiformats.EndOfLastFlatFileEntry, + "genbank_text_reader")) + + bb.RegisterRunner("genbank_text_reader", + TextChuckIterator(NewInitialTask(), "genbank_text_chunk"), + ) + + bb.RegisterRunner( + "genbank_text_chunk", + TextChunkParser( + obiformats.GenbankChunkParser(opts.WithFeatureTable()), + "sequences", + ), + ) + + bb.RegisterRunner( + "unannotated_sequences", + SeqAnnotParser( + opts.ParseFastSeqHeader(), + "sequences", + ), + ) + + bb.RegisterRunner("stream:text/csv", + StreamToSequenceReader(obiformats.ReadCSV, options, "sequence_reader")) + + bb.RegisterRunner("stream:text/ecopcr2", + StreamToSequenceReader(obiformats.ReadEcoPCR, options, "sequence_reader")) + + bb.RegisterRunner("sequence_reader", + SequenceIterator(NewInitialTask(), "sequences"), + ) + +} diff --git a/pkg/obiblackboard/subtask.go b/pkg/obiblackboard/subtask.go new file mode 100644 index 0000000..a2709f5 --- /dev/null +++ b/pkg/obiblackboard/subtask.go @@ -0,0 +1,108 @@ +package obiblackboard + +import ( + "sync" + + log "github.com/sirupsen/logrus" +) + +// RepeatTask creates a new DoTask function that repeats the given task n times. +// +// It takes an integer n as input, which specifies the number of times the task should be repeated. +// It returns a new DoTask function that can be used to execute the repeated task. +// +// The returned DoTask function maintains a map of tasks to their counts and tasks. +// When a task is executed, it checks if the task has been executed before. +// If it has, it increments the count and returns the previously executed task. +// If it has not been executed before, it executes the task using the provided runner function. +// If the runner function returns nil, the task is not added to the task memory and nil is returned. +// If the runner function returns a non-nil task, it is added to the task memory with a count of 0. +// After executing the task, the function checks if the count is less than (n-1). +// If it is, the task is added back to the blackboard to be executed again. +// If the count is equal to (n-1), the task is removed from the task memory. +// Finally, the function returns the executed task. +func (runner DoTask) RepeatTask(n int) DoTask { + type memtask struct { + count int + task *Task + } + taskMemory := make(map[*Task]*memtask) + taskMemoryLock := sync.Mutex{} + + if n < 1 { + log.Fatalf("Cannot repeat a task less than once (n=%d)", n) + } + + st := func(bb *Blackboard, task *Task) *Task { + taskMemoryLock.Lock() + + mem, ok := taskMemory[task] + + if !ok { + nt := runner(bb, task) + + if nt == nil { + taskMemoryLock.Unlock() + return nt + } + + mem = &memtask{ + count: 0, + task: nt, + } + + taskMemory[task] = mem + } else { + mem.count++ + } + + taskMemoryLock.Unlock() + + if mem.count < (n - 1) { + bb.PushTask(task) + } + + if mem.count == (n - 1) { + taskMemoryLock.Lock() + delete(taskMemory, task) + taskMemoryLock.Unlock() + } + + return mem.task + } + + return st +} + +// CombineWith returns a new DoTask function that combines the given DoTask +// functions. The returned function applies the `other` function to the result +// of the `runner` function. The `bb` parameter is the Blackboard instance, +// and the `task` parameter is the Task instance. +// +// Parameters: +// - bb: The Blackboard instance. +// - task: The Task instance. +// +// Returns: +// - *Task: The result of applying the `other` function to the result of the +// `runner` function. +func (runner DoTask) CombineWith(other DoTask) DoTask { + return func(bb *Blackboard, task *Task) *Task { + return other(bb, runner(bb, task)) + } +} + +// SetTarget sets the target role for the task. +// +// Parameters: +// - target: The target role to set. +// +// Returns: +// - DoTask: The modified DoTask function. +func (runner DoTask) SetTarget(target string) DoTask { + return func(bb *Blackboard, task *Task) *Task { + nt := runner(bb, task) + nt.Role = target + return nt + } +} diff --git a/pkg/obiblackboard/task.go b/pkg/obiblackboard/task.go index eca9c61..6636369 100644 --- a/pkg/obiblackboard/task.go +++ b/pkg/obiblackboard/task.go @@ -1,21 +1,34 @@ package obiblackboard type Task struct { - Role string - Priority int - Body interface{} + Role string + SavedTask *Task + Priority int + Body interface{} } func NewInitialTask() *Task { return &Task{ - Role: "initial", - Priority: 0, - Body: nil, + Role: "initial", + SavedTask: nil, + Priority: 0, + Body: nil, } } -func (task *Task) GetNext() *Task { +func (task *Task) GetNext(target string, copy bool, save bool) *Task { t := NewInitialTask() t.Priority = task.Priority + 1 + t.Role = target + if copy { + t.Body = task.Body + } + + if save { + t.SavedTask = task + } else { + t.SavedTask = task.SavedTask + } + return t } diff --git a/pkg/obichunk/chunk_on_disk.go b/pkg/obichunk/chunk_on_disk.go index c378c00..c6f177c 100644 --- a/pkg/obichunk/chunk_on_disk.go +++ b/pkg/obichunk/chunk_on_disk.go @@ -73,9 +73,9 @@ func ISequenceChunkOnDisk(iterator obiiter.IBioSequence, panic(err) } - chunck := iseq.Load() + source, chunck := iseq.Load() - newIter.Push(obiiter.MakeBioSequenceBatch(order, chunck)) + newIter.Push(obiiter.MakeBioSequenceBatch(source, order, chunck)) log.Infof("Start processing of batch %d/%d : %d sequences", order, nbatch, len(chunck)) diff --git a/pkg/obichunk/chunks.go b/pkg/obichunk/chunks.go index b2774c4..de90945 100644 --- a/pkg/obichunk/chunks.go +++ b/pkg/obichunk/chunks.go @@ -28,6 +28,7 @@ func ISequenceChunk(iterator obiiter.IBioSequence, jobDone := sync.WaitGroup{} chunks := make(map[int]*obiseq.BioSequenceSlice, 1000) + sources := make(map[int]string, 1000) for newflux := range dispatcher.News() { jobDone.Add(1) @@ -43,12 +44,18 @@ func ISequenceChunk(iterator obiiter.IBioSequence, chunks[newflux] = chunk lock.Unlock() + source := "" for data.Next() { b := data.Get() + source = b.Source() *chunk = append(*chunk, b.Slice()...) b.Recycle(false) } + lock.Lock() + sources[newflux] = source + lock.Unlock() + jobDone.Done() }(newflux) } @@ -56,10 +63,10 @@ func ISequenceChunk(iterator obiiter.IBioSequence, jobDone.Wait() order := 0 - for _, chunck := range chunks { + for i, chunck := range chunks { if len(*chunck) > 0 { - newIter.Push(obiiter.MakeBioSequenceBatch(order, *chunck)) + newIter.Push(obiiter.MakeBioSequenceBatch(sources[i], order, *chunck)) order++ } diff --git a/pkg/obichunk/subchunks.go b/pkg/obichunk/subchunks.go index 2c160f9..0d72cd7 100644 --- a/pkg/obichunk/subchunks.go +++ b/pkg/obichunk/subchunks.go @@ -90,7 +90,7 @@ func ISequenceSubChunk(iterator obiiter.IBioSequence, for iterator.Next() { batch := iterator.Get() - + source := batch.Source() if batch.Len() > 1 { classifier.Reset() @@ -117,7 +117,7 @@ func ISequenceSubChunk(iterator obiiter.IBioSequence, ss := obiseq.MakeBioSequenceSlice() for i, v := range ordered { if v.code != last { - newIter.Push(obiiter.MakeBioSequenceBatch(nextOrder(), ss)) + newIter.Push(obiiter.MakeBioSequenceBatch(source, nextOrder(), ss)) ss = obiseq.MakeBioSequenceSlice() last = v.code } @@ -127,7 +127,7 @@ func ISequenceSubChunk(iterator obiiter.IBioSequence, } if len(ss) > 0 { - newIter.Push(obiiter.MakeBioSequenceBatch(nextOrder(), ss)) + newIter.Push(obiiter.MakeBioSequenceBatch(source, nextOrder(), ss)) } } else { newIter.Push(batch.Reorder(nextOrder())) diff --git a/pkg/obiformats/csv_read.go b/pkg/obiformats/csv_read.go index 4a7604c..79bfb97 100644 --- a/pkg/obiformats/csv_read.go +++ b/pkg/obiformats/csv_read.go @@ -111,14 +111,14 @@ func _ParseCsvFile(source string, slice = append(slice, sequence) if len(slice) >= batchSize { - out.Push(obiiter.MakeBioSequenceBatch(o, slice)) + out.Push(obiiter.MakeBioSequenceBatch(source, o, slice)) o++ slice = obiseq.MakeBioSequenceSlice() } } if len(slice) > 0 { - out.Push(obiiter.MakeBioSequenceBatch(o, slice)) + out.Push(obiiter.MakeBioSequenceBatch(source, o, slice)) } out.Done() diff --git a/pkg/obiformats/ecopcr_read.go b/pkg/obiformats/ecopcr_read.go index 62b0ceb..5706805 100644 --- a/pkg/obiformats/ecopcr_read.go +++ b/pkg/obiformats/ecopcr_read.go @@ -122,7 +122,7 @@ func __read_ecopcr_bioseq__(file *__ecopcr_file__) (*obiseq.BioSequence, error) return bseq, nil } -func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { +func ReadEcoPCR(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { tag := make([]byte, 11) n, _ := reader.Read(tag) @@ -187,7 +187,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { slice = append(slice, seq) ii++ if ii >= opt.BatchSize() { - newIter.Push(obiiter.MakeBioSequenceBatch(i, slice)) + newIter.Push(obiiter.MakeBioSequenceBatch(opt.Source(), i, slice)) slice = obiseq.MakeBioSequenceSlice() i++ ii = 0 @@ -198,7 +198,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { } if len(slice) > 0 { - newIter.Push(obiiter.MakeBioSequenceBatch(i, slice)) + newIter.Push(obiiter.MakeBioSequenceBatch(opt.Source(), i, slice)) } newIter.Done() @@ -213,7 +213,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { newIter = newIter.CompleteFileIterator() } - return newIter + return newIter, nil } func ReadEcoPCRFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { @@ -235,5 +235,5 @@ func ReadEcoPCRFromFile(filename string, options ...WithOption) (obiiter.IBioSeq reader = greader } - return ReadEcoPCR(reader, options...), nil + return ReadEcoPCR(reader, options...) } diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index 88490f4..dddf1e5 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -15,7 +15,7 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) -// _EndOfLastEntry finds the index of the last entry in the given byte slice 'buff' +// EndOfLastFlatFileEntry finds the index of the last entry in the given byte slice 'buff' // using a pattern match of the form: // ?//? // where and are the ASCII codes for carriage return and line feed, @@ -27,7 +27,7 @@ import ( // // Returns: // int - the index of the end of the last entry or -1 if no match is found. -func _EndOfLastEntry(buff []byte) int { +func EndOfLastFlatFileEntry(buff []byte) int { // 6 5 43 2 1 // ?//? var i int @@ -87,15 +87,9 @@ func _EndOfLastEntry(buff []byte) int { return -1 } -func _ParseEmblFile(source string, input ChannelSeqFileChunk, - out obiiter.IBioSequence, - withFeatureTable bool, - batch_size int, - total_seq_size int) { - - for chunks := range input { - scanner := bufio.NewScanner(chunks.raw) - order := chunks.order +func EmblChunkParser(withFeatureTable bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) { + parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) { + scanner := bufio.NewScanner(input) sequences := make(obiseq.BioSequenceSlice, 0, 100) id := "" scientificName := "" @@ -156,7 +150,31 @@ func _ParseEmblFile(source string, input ChannelSeqFileChunk, seqBytes = new(bytes.Buffer) } } - out.Push(obiiter.MakeBioSequenceBatch(order, sequences)) + + return sequences, nil + + } + + return parser +} + +func _ParseEmblFile( + input ChannelSeqFileChunk, + out obiiter.IBioSequence, + withFeatureTable bool, +) { + + parser := EmblChunkParser(withFeatureTable) + + for chunks := range input { + order := chunks.Order + sequences, err := parser(chunks.Source, chunks.Raw) + + if err != nil { + log.Fatalf("%s : Cannot parse the embl file : %v", chunks.Source, err) + } + + out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, order, sequences)) } out.Done() @@ -166,12 +184,18 @@ func _ParseEmblFile(source string, input ChannelSeqFileChunk, // 6 5 43 2 1 // // ?//? -func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence { +func ReadEMBL(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) buff := make([]byte, 1024*1024*1024*256) - entry_channel := ReadSeqFileChunk(reader, buff, _EndOfLastEntry) + entry_channel := ReadSeqFileChunk( + opt.Source(), + reader, + buff, + EndOfLastFlatFileEntry, + ) + newIter := obiiter.MakeIBioSequence() nworkers := opt.ParallelWorkers() @@ -179,10 +203,11 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence { // for j := 0; j < opt.ParallelWorkers(); j++ { for j := 0; j < nworkers; j++ { newIter.Add(1) - go _ParseEmblFile(opt.Source(), entry_channel, newIter, + go _ParseEmblFile( + entry_channel, + newIter, opt.WithFeatureTable(), - opt.BatchSize(), - opt.TotalSeqSize()) + ) } go func() { @@ -193,7 +218,7 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence { newIter = newIter.CompleteFileIterator() } - return newIter + return newIter, nil } func ReadEMBLFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { @@ -214,5 +239,5 @@ func ReadEMBLFromFile(filename string, options ...WithOption) (obiiter.IBioSeque return obiiter.NilIBioSequence, err } - return ReadEMBL(reader, options...), nil + return ReadEMBL(reader, options...) } diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go index 4eb6656..8e9408c 100644 --- a/pkg/obiformats/fastaseq_read.go +++ b/pkg/obiformats/fastaseq_read.go @@ -14,7 +14,7 @@ import ( log "github.com/sirupsen/logrus" ) -func _EndOfLastFastaEntry(buffer []byte) int { +func EndOfLastFastaEntry(buffer []byte) int { var i int imax := len(buffer) @@ -39,24 +39,18 @@ func _EndOfLastFastaEntry(buffer []byte) int { return last } -func _ParseFastaFile(source string, - input ChannelSeqFileChunk, - out obiiter.IBioSequence, - no_order bool, - batch_size int, - chunck_order func() int, -) { +func FastaChunkParser() func(string, io.Reader) (obiseq.BioSequenceSlice, error) { - var identifier string - var definition string + parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) { + var identifier string + var definition string - idBytes := bytes.Buffer{} - defBytes := bytes.Buffer{} - seqBytes := bytes.Buffer{} + idBytes := bytes.Buffer{} + defBytes := bytes.Buffer{} + seqBytes := bytes.Buffer{} - for chunks := range input { state := 0 - scanner := bufio.NewReader(chunks.raw) + scanner := bufio.NewReader(input) start, _ := scanner.Peek(20) if start[0] != '>' { log.Fatalf("%s : first character is not '>'", string(start)) @@ -64,7 +58,8 @@ func _ParseFastaFile(source string, if start[1] == ' ' { log.Fatalf("%s :Strange", string(start)) } - sequences := make(obiseq.BioSequenceSlice, 0, batch_size) + + sequences := obiseq.MakeBioSequenceSlice(100)[:0] previous := byte(0) @@ -160,12 +155,6 @@ func _ParseFastaFile(source string, s := obiseq.NewBioSequence(identifier, rawseq, definition) s.SetSource(source) sequences = append(sequences, s) - if no_order { - if len(sequences) == batch_size { - out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences)) - sequences = make(obiseq.BioSequenceSlice, 0, batch_size) - } - } state = 1 } else { // Error @@ -209,13 +198,28 @@ func _ParseFastaFile(source string, sequences = append(sequences, s) } - if len(sequences) > 0 { - co := chunks.order - if no_order { - co = chunck_order() - } - out.Push(obiiter.MakeBioSequenceBatch(co, sequences)) + return sequences, nil + } + + return parser +} + +func _ParseFastaFile( + input ChannelSeqFileChunk, + out obiiter.IBioSequence, +) { + + parser := FastaChunkParser() + + for chunks := range input { + sequences, err := parser(chunks.Source, chunks.Raw) + + if err != nil { + log.Fatalf("File %s : Cannot parse the fasta file : %v", chunks.Source, err) } + + out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences)) + } out.Done() @@ -230,17 +234,16 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e buff := make([]byte, 1024*1024*1024) - chkchan := ReadSeqFileChunk(reader, buff, _EndOfLastFastaEntry) - chunck_order := obiutils.AtomicCounter() + chkchan := ReadSeqFileChunk( + opt.Source(), + reader, + buff, + EndOfLastFastaEntry, + ) for i := 0; i < nworker; i++ { out.Add(1) - go _ParseFastaFile(opt.Source(), - chkchan, - out, - opt.NoOrder(), - opt.BatchSize(), - chunck_order) + go _ParseFastaFile(chkchan, out) } go func() { @@ -282,7 +285,7 @@ func ReadFastaFromFile(filename string, options ...WithOption) (obiiter.IBioSequ } func ReadFastaFromStdin(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { - options = append(options, OptionsSource(obiutils.RemoveAllExt("stdin"))) + options = append(options, OptionsSource("stdin")) input, err := Buf(os.Stdin) if err == ErrNoContent { diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go index 1bfcb92..64d37b8 100644 --- a/pkg/obiformats/fastqseq_read.go +++ b/pkg/obiformats/fastqseq_read.go @@ -14,7 +14,7 @@ import ( log "github.com/sirupsen/logrus" ) -func _EndOfLastFastqEntry(buffer []byte) int { +func EndOfLastFastqEntry(buffer []byte) int { var i int imax := len(buffer) @@ -117,27 +117,20 @@ func _storeSequenceQuality(bytes *bytes.Buffer, out *obiseq.BioSequence, quality out.SetQualities(q) } -func _ParseFastqFile(source string, - input ChannelSeqFileChunk, - out obiiter.IBioSequence, - quality_shift byte, - no_order bool, - batch_size int, - chunck_order func() int, -) { +func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSequenceSlice, error) { + parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) { - var identifier string - var definition string + var identifier string + var definition string - idBytes := bytes.Buffer{} - defBytes := bytes.Buffer{} - qualBytes := bytes.Buffer{} - seqBytes := bytes.Buffer{} + idBytes := bytes.Buffer{} + defBytes := bytes.Buffer{} + qualBytes := bytes.Buffer{} + seqBytes := bytes.Buffer{} - for chunks := range input { state := 0 - scanner := bufio.NewReader(chunks.raw) - sequences := make(obiseq.BioSequenceSlice, 0, 100) + scanner := bufio.NewReader(input) + sequences := obiseq.MakeBioSequenceSlice(100)[:0] previous := byte(0) for C, err := scanner.ReadByte(); err != io.EOF; C, err = scanner.ReadByte() { @@ -257,14 +250,6 @@ func _ParseFastqFile(source string, case 10: if is_end_of_line { _storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift) - - if no_order { - if len(sequences) == batch_size { - out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences)) - sequences = make(obiseq.BioSequenceSlice, 0, batch_size) - } - } - state = 11 } else { qualBytes.WriteByte(C) @@ -288,14 +273,31 @@ func _ParseFastqFile(source string, _storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift) state = 1 } - - co := chunks.order - if no_order { - co = chunck_order() - } - out.Push(obiiter.MakeBioSequenceBatch(co, sequences)) } + return sequences, nil + } + + return parser +} + +func _ParseFastqFile( + input ChannelSeqFileChunk, + out obiiter.IBioSequence, + quality_shift byte, +) { + + parser := FastqChunkParser(quality_shift) + + for chunks := range input { + sequences, err := parser(chunks.Source, chunks.Raw) + + if err != nil { + log.Fatalf("File %s : Cannot parse the fastq file : %v", chunks.Source, err) + } + + out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences)) + } out.Done() @@ -307,21 +309,23 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e out := obiiter.MakeIBioSequence() nworker := opt.ParallelWorkers() - chunkorder := obiutils.AtomicCounter() buff := make([]byte, 1024*1024*1024) - chkchan := ReadSeqFileChunk(reader, buff, _EndOfLastFastqEntry) + chkchan := ReadSeqFileChunk( + opt.Source(), + reader, + buff, + EndOfLastFastqEntry, + ) for i := 0; i < nworker; i++ { out.Add(1) - go _ParseFastqFile(opt.Source(), + go _ParseFastqFile( chkchan, out, byte(obioptions.InputQualityShift()), - opt.NoOrder(), - opt.BatchSize(), - chunkorder) + ) } go func() { diff --git a/pkg/obiformats/fastseq_read.go b/pkg/obiformats/fastseq_read.go index 7a9c2c6..f43e361 100644 --- a/pkg/obiformats/fastseq_read.go +++ b/pkg/obiformats/fastseq_read.go @@ -69,7 +69,7 @@ func _FastseqReader(source string, slice = append(slice, rep) ii++ if ii >= batch_size { - iterator.Push(obiiter.MakeBioSequenceBatch(i, slice)) + iterator.Push(obiiter.MakeBioSequenceBatch(source, i, slice)) slice = obiseq.MakeBioSequenceSlice() i++ ii = 0 @@ -77,7 +77,7 @@ func _FastseqReader(source string, } if len(slice) > 0 { - iterator.Push(obiiter.MakeBioSequenceBatch(i, slice)) + iterator.Push(obiiter.MakeBioSequenceBatch(source, i, slice)) } iterator.Done() diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go index 0940d61..f1b261a 100644 --- a/pkg/obiformats/genbank_read.go +++ b/pkg/obiformats/genbank_read.go @@ -29,27 +29,11 @@ const ( var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp") -func _ParseGenbankFile(source string, - input ChannelSeqFileChunk, - out obiiter.IBioSequence, - chunck_order func() int, - withFeatureTable bool, - batch_size int, - total_seq_size int) { - state := inHeader - previous_chunk := -1 - - for chunks := range input { - - if state != inHeader { - log.Fatalf("Unexpected state %d starting new chunk (id = %d, previous_chunk = %d)", - state, chunks.order, previous_chunk) - } - - previous_chunk = chunks.order - scanner := bufio.NewReader(chunks.raw) - sequences := make(obiseq.BioSequenceSlice, 0, 100) - sumlength := 0 +func GenbankChunkParser(withFeatureTable bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) { + return func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) { + state := inHeader + scanner := bufio.NewReader(input) + sequences := obiseq.MakeBioSequenceSlice(100)[:0] id := "" lseq := -1 scientificName := "" @@ -64,7 +48,7 @@ func _ParseGenbankFile(source string, nl++ line = string(bline) if is_prefix || len(line) > 100 { - log.Fatalf("Chunk %d : Line too long: %s", chunks.order, line) + log.Fatalf("From %s:Line too long: %s", source, line) } processed := false for !processed { @@ -165,15 +149,6 @@ func _ParseGenbankFile(source string, // sequence.Len(), seqBytes.Len()) sequences = append(sequences, sequence) - sumlength += sequence.Len() - - if len(sequences) == batch_size || sumlength > total_seq_size { - oo := chunck_order() - log.Debugln("Pushing sequence batch ", oo, " with ", len(sequences), " sequences") - out.Push(obiiter.MakeBioSequenceBatch(oo, sequences)) - sequences = make(obiseq.BioSequenceSlice, 0, 100) - sumlength = 0 - } defBytes = bytes.NewBuffer(obiseq.GetSlice(200)) featBytes = new(bytes.Buffer) @@ -219,11 +194,24 @@ func _ParseGenbankFile(source string, } - if len(sequences) > 0 { - oo := chunck_order() - log.Debugln("Pushing sequence batch ", oo, " with ", len(sequences), " sequences") - out.Push(obiiter.MakeBioSequenceBatch(oo, sequences)) + return sequences, nil + } +} + +func _ParseGenbankFile(input ChannelSeqFileChunk, + out obiiter.IBioSequence, + withFeatureTable bool) { + + parser := GenbankChunkParser(withFeatureTable) + + for chunks := range input { + sequences, err := parser(chunks.Source, chunks.Raw) + + if err != nil { + log.Fatalf("File %s : Cannot parse the genbank file : %v", chunks.Source, err) } + + out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences)) } log.Debug("End of the Genbank thread") @@ -231,26 +219,31 @@ func _ParseGenbankFile(source string, } -func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence { +func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) // entry_channel := make(chan _FileChunk) buff := make([]byte, 1024*1024*1024*256) - entry_channel := ReadSeqFileChunk(reader, buff, _EndOfLastEntry) + entry_channel := ReadSeqFileChunk( + opt.Source(), + reader, + buff, + EndOfLastFlatFileEntry, + ) + newIter := obiiter.MakeIBioSequence() nworkers := opt.ParallelWorkers() - chunck_order := obiutils.AtomicCounter() // for j := 0; j < opt.ParallelWorkers(); j++ { for j := 0; j < nworkers; j++ { newIter.Add(1) - go _ParseGenbankFile(opt.Source(), - entry_channel, newIter, chunck_order, + go _ParseGenbankFile( + entry_channel, + newIter, opt.WithFeatureTable(), - opt.BatchSize(), - opt.TotalSeqSize()) + ) } // go _ReadFlatFileChunk(reader, entry_channel) @@ -264,7 +257,7 @@ func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence { newIter = newIter.CompleteFileIterator() } - return newIter + return newIter, nil } func ReadGenbankFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { @@ -285,5 +278,5 @@ func ReadGenbankFromFile(filename string, options ...WithOption) (obiiter.IBioSe return obiiter.NilIBioSequence, err } - return ReadGenbank(reader, options...), nil + return ReadGenbank(reader, options...) } diff --git a/pkg/obiformats/seqfile_chunck_read.go b/pkg/obiformats/seqfile_chunck_read.go index 2eb57c4..f1ba355 100644 --- a/pkg/obiformats/seqfile_chunck_read.go +++ b/pkg/obiformats/seqfile_chunck_read.go @@ -5,14 +5,18 @@ import ( "io" "slices" + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" log "github.com/sirupsen/logrus" ) -var _FileChunkSize = 1 << 28 +var _FileChunkSize = 1024 * 1024 * 10 + +type SeqFileChunkParser func(string, io.Reader) (obiseq.BioSequenceSlice, error) type SeqFileChunk struct { - raw io.Reader - order int + Source string + Raw io.Reader + Order int } type ChannelSeqFileChunk chan SeqFileChunk @@ -32,7 +36,9 @@ type LastSeqRecord func([]byte) int // // Returns: // None -func ReadSeqFileChunk(reader io.Reader, +func ReadSeqFileChunk( + source string, + reader io.Reader, buff []byte, splitter LastSeqRecord) ChannelSeqFileChunk { var err error @@ -88,7 +94,7 @@ func ReadSeqFileChunk(reader io.Reader, if len(buff) > 0 { io := bytes.NewBuffer(slices.Clone(buff)) - chunk_channel <- SeqFileChunk{io, i} + chunk_channel <- SeqFileChunk{source, io, i} i++ } @@ -112,7 +118,7 @@ func ReadSeqFileChunk(reader io.Reader, // Send the last chunk to the channel if len(buff) > 0 { io := bytes.NewBuffer(slices.Clone(buff)) - chunk_channel <- SeqFileChunk{io, i} + chunk_channel <- SeqFileChunk{source, io, i} } // Close the readers channel when the end of the file is reached diff --git a/pkg/obiformats/universal_read.go b/pkg/obiformats/universal_read.go index 97d820d..2caea86 100644 --- a/pkg/obiformats/universal_read.go +++ b/pkg/obiformats/universal_read.go @@ -15,6 +15,8 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" ) +type SequenceReader func(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) + // OBIMimeTypeGuesser is a function that takes an io.Reader as input and guesses the MIME type of the data. // It uses several detectors to identify specific file formats, such as FASTA, FASTQ, ecoPCR2, GenBank, and EMBL. // The function reads data from the input stream and analyzes it using the mimetype library. @@ -172,11 +174,11 @@ func ReadSequencesFromFile(filename string, case "text/fasta": return ReadFasta(reader, options...) case "text/ecopcr2": - return ReadEcoPCR(reader, options...), nil + return ReadEcoPCR(reader, options...) case "text/embl": - return ReadEMBL(reader, options...), nil + return ReadEMBL(reader, options...) case "text/genbank": - return ReadGenbank(reader, options...), nil + return ReadGenbank(reader, options...) case "text/csv": return ReadCSV(reader, options...) default: diff --git a/pkg/obiiter/batch.go b/pkg/obiiter/batch.go index 17f53eb..5c9bdc1 100644 --- a/pkg/obiiter/batch.go +++ b/pkg/obiiter/batch.go @@ -3,50 +3,118 @@ package obiiter import "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" type BioSequenceBatch struct { - slice obiseq.BioSequenceSlice - order int + source string + slice obiseq.BioSequenceSlice + order int } -var NilBioSequenceBatch = BioSequenceBatch{nil, -1} +var NilBioSequenceBatch = BioSequenceBatch{"", nil, -1} -func MakeBioSequenceBatch(order int, +// MakeBioSequenceBatch creates a new BioSequenceBatch with the given source, order, and sequences. +// +// Parameters: +// - source: The source of the BioSequenceBatch. +// - order: The order of the BioSequenceBatch. +// - sequences: The slice of BioSequence. +// +// Returns: +// - BioSequenceBatch: The newly created BioSequenceBatch. +func MakeBioSequenceBatch( + source string, + order int, sequences obiseq.BioSequenceSlice) BioSequenceBatch { return BioSequenceBatch{ - slice: sequences, - order: order, + source: source, + slice: sequences, + order: order, } } +// Order returns the order of the BioSequenceBatch. +// +// Returns: +// - int: The order of the BioSequenceBatch. func (batch BioSequenceBatch) Order() int { return batch.order } +// Source returns the source of the BioSequenceBatch. +// +// Returns: +// - string: The source of the BioSequenceBatch. +func (batch BioSequenceBatch) Source() string { + return batch.source +} + +// Reorder updates the order of the BioSequenceBatch and returns the updated batch. +// +// Parameters: +// - newOrder: The new order value to assign to the BioSequenceBatch. +// +// Returns: +// - BioSequenceBatch: The updated BioSequenceBatch with the new order value. func (batch BioSequenceBatch) Reorder(newOrder int) BioSequenceBatch { batch.order = newOrder return batch } +// Slice returns the BioSequenceSlice contained within the BioSequenceBatch. +// +// Returns: +// - obiseq.BioSequenceSlice: The BioSequenceSlice contained within the BioSequenceBatch. func (batch BioSequenceBatch) Slice() obiseq.BioSequenceSlice { return batch.slice } +// Len returns the number of BioSequence elements in the given BioSequenceBatch. +// +// Parameters: +// - batch: The BioSequenceBatch to get the length from. +// +// Return type: +// - int: The number of BioSequence elements in the BioSequenceBatch. func (batch BioSequenceBatch) Len() int { return len(batch.slice) } +// NotEmpty returns whether the BioSequenceBatch is empty or not. +// +// It checks if the BioSequenceSlice contained within the BioSequenceBatch is not empty. +// +// Returns: +// - bool: True if the BioSequenceBatch is not empty, false otherwise. func (batch BioSequenceBatch) NotEmpty() bool { return batch.slice.NotEmpty() } +// Pop0 returns and removes the first element of the BioSequenceBatch. +// +// It does not take any parameters. +// It returns a pointer to a BioSequence object. func (batch BioSequenceBatch) Pop0() *obiseq.BioSequence { return batch.slice.Pop0() } +// IsNil checks if the BioSequenceBatch's slice is nil. +// +// This function takes a BioSequenceBatch as a parameter and returns a boolean value indicating whether the slice of the BioSequenceBatch is nil or not. +// +// Parameters: +// - batch: The BioSequenceBatch to check for nil slice. +// +// Returns: +// - bool: True if the BioSequenceBatch's slice is nil, false otherwise. func (batch BioSequenceBatch) IsNil() bool { return batch.slice == nil } +// Recycle cleans up the BioSequenceBatch by recycling its elements and resetting its slice. +// +// If including_seq is true, each element of the BioSequenceBatch's slice is recycled using the Recycle method, +// and then set to nil. If including_seq is false, each element is simply set to nil. +// +// This function does not return anything. func (batch BioSequenceBatch) Recycle(including_seq bool) { batch.slice.Recycle(including_seq) batch.slice = nil diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index b82ee2c..4761c5b 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -424,9 +424,11 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { order := 0 iterator = iterator.SortBatches() buffer := obiseq.MakeBioSequenceSlice() + source := "" for iterator.Next() { seqs := iterator.Get() + source = seqs.Source() lc := seqs.Len() remains := lc i := 0 @@ -436,7 +438,7 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { remains = lc - to_push - i buffer = append(buffer, seqs.Slice()[i:(i+to_push)]...) if len(buffer) == size { - newIter.Push(MakeBioSequenceBatch(order, buffer)) + newIter.Push(MakeBioSequenceBatch(source, order, buffer)) log.Debugf("Rebatch #%d pushd", order) order++ buffer = obiseq.MakeBioSequenceSlice() @@ -447,7 +449,7 @@ func (iterator IBioSequence) Rebatch(size int) IBioSequence { } log.Debug("End of the rebatch loop") if len(buffer) > 0 { - newIter.Push(MakeBioSequenceBatch(order, buffer)) + newIter.Push(MakeBioSequenceBatch(source, order, buffer)) log.Debugf("Final Rebatch #%d pushd", order) } @@ -526,12 +528,14 @@ func (iterator IBioSequence) DivideOn(predicate obiseq.SequencePredicate, trueOrder := 0 falseOrder := 0 iterator = iterator.SortBatches() + source := "" trueSlice := obiseq.MakeBioSequenceSlice() falseSlice := obiseq.MakeBioSequenceSlice() for iterator.Next() { seqs := iterator.Get() + source = seqs.Source() for _, s := range seqs.slice { if predicate(s) { trueSlice = append(trueSlice, s) @@ -540,13 +544,13 @@ func (iterator IBioSequence) DivideOn(predicate obiseq.SequencePredicate, } if len(trueSlice) == size { - trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice)) + trueIter.Push(MakeBioSequenceBatch(source, trueOrder, trueSlice)) trueOrder++ trueSlice = obiseq.MakeBioSequenceSlice() } if len(falseSlice) == size { - falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice)) + falseIter.Push(MakeBioSequenceBatch(source, falseOrder, falseSlice)) falseOrder++ falseSlice = obiseq.MakeBioSequenceSlice() } @@ -555,11 +559,11 @@ func (iterator IBioSequence) DivideOn(predicate obiseq.SequencePredicate, } if len(trueSlice) > 0 { - trueIter.Push(MakeBioSequenceBatch(trueOrder, trueSlice)) + trueIter.Push(MakeBioSequenceBatch(source, trueOrder, trueSlice)) } if len(falseSlice) > 0 { - falseIter.Push(MakeBioSequenceBatch(falseOrder, falseSlice)) + falseIter.Push(MakeBioSequenceBatch(source, falseOrder, falseSlice)) } trueIter.Done() @@ -686,17 +690,22 @@ func (iterator IBioSequence) FilterAnd(predicate obiseq.SequencePredicate, // Load all sequences availables from an IBioSequenceBatch iterator into // a large obiseq.BioSequenceSlice. -func (iterator IBioSequence) Load() obiseq.BioSequenceSlice { +func (iterator IBioSequence) Load() (string, obiseq.BioSequenceSlice) { chunck := obiseq.MakeBioSequenceSlice() + source := "" + for iterator.Next() { b := iterator.Get() + if source == "" { + source = b.Source() + } log.Debugf("append %d sequences", b.Len()) chunck = append(chunck, b.Slice()...) b.Recycle(false) } - return chunck + return source, chunck } // CompleteFileIterator generates a new iterator for reading a complete file. @@ -718,10 +727,10 @@ func (iterator IBioSequence) CompleteFileIterator() IBioSequence { }() go func() { - slice := iterator.Load() + source, slice := iterator.Load() log.Printf("A batch of %d sequence is read", len(slice)) if len(slice) > 0 { - newIter.Push(MakeBioSequenceBatch(0, slice)) + newIter.Push(MakeBioSequenceBatch(source, 0, slice)) } newIter.Done() }() @@ -735,7 +744,7 @@ func (iterator IBioSequence) CompleteFileIterator() IBioSequence { // It takes a slice of BioSequence objects, and returns an iterator that will return batches of // BioSequence objects -func IBatchOver(data obiseq.BioSequenceSlice, +func IBatchOver(source string, data obiseq.BioSequenceSlice, size int, sizes ...int) IBioSequence { newIter := MakeIBioSequence() @@ -755,7 +764,7 @@ func IBatchOver(data obiseq.BioSequenceSlice, if next > ldata { next = ldata } - newIter.Push(MakeBioSequenceBatch(batchid, data[i:next])) + newIter.Push(MakeBioSequenceBatch(source, batchid, data[i:next])) batchid++ } diff --git a/pkg/obiiter/distribute.go b/pkg/obiiter/distribute.go index ec540eb..4c61d58 100644 --- a/pkg/obiiter/distribute.go +++ b/pkg/obiiter/distribute.go @@ -61,9 +61,12 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz go func() { iterator = iterator.SortBatches() + source := "" for iterator.Next() { seqs := iterator.Get() + source = seqs.Source() + for _, s := range seqs.Slice() { key := class.Code(s) slice, ok := slices[key] @@ -84,7 +87,7 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz *slice = append(*slice, s) if len(*slice) == batchsize { - outputs[key].Push(MakeBioSequenceBatch(orders[key], *slice)) + outputs[key].Push(MakeBioSequenceBatch(source, orders[key], *slice)) orders[key]++ s := obiseq.MakeBioSequenceSlice() slices[key] = &s @@ -95,7 +98,7 @@ func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, siz for key, slice := range slices { if len(*slice) > 0 { - outputs[key].Push(MakeBioSequenceBatch(orders[key], *slice)) + outputs[key].Push(MakeBioSequenceBatch(source, orders[key], *slice)) } } diff --git a/pkg/obiiter/fragment.go b/pkg/obiiter/fragment.go index bbddd23..2c09013 100644 --- a/pkg/obiiter/fragment.go +++ b/pkg/obiiter/fragment.go @@ -20,9 +20,11 @@ func IFragments(minsize, length, overlap, size, nworkers int) Pipeable { }() f := func(iterator IBioSequence, id int) { + source := "" for iterator.Next() { news := obiseq.MakeBioSequenceSlice() sl := iterator.Get() + source = sl.Source() for _, s := range sl.Slice() { if s.Len() <= minsize { @@ -52,7 +54,7 @@ func IFragments(minsize, length, overlap, size, nworkers int) Pipeable { s.Recycle() } } // End of the slice loop - newiter.Push(MakeBioSequenceBatch(sl.Order(), news)) + newiter.Push(MakeBioSequenceBatch(source, sl.Order(), news)) sl.Recycle(false) } // End of the iterator loop diff --git a/pkg/obiiter/paired.go b/pkg/obiiter/paired.go index 862537b..d8c0574 100644 --- a/pkg/obiiter/paired.go +++ b/pkg/obiiter/paired.go @@ -9,9 +9,11 @@ func (b BioSequenceBatch) IsPaired() bool { } func (b BioSequenceBatch) PairedWith() BioSequenceBatch { - return MakeBioSequenceBatch(b.order, - *b.slice.PairedWith()) - + return MakeBioSequenceBatch( + b.Source(), + b.order, + *b.slice.PairedWith(), + ) } func (b *BioSequenceBatch) PairTo(p *BioSequenceBatch) { diff --git a/pkg/obilua/lua.go b/pkg/obilua/lua.go index 9cdf82c..1855e44 100644 --- a/pkg/obilua/lua.go +++ b/pkg/obilua/lua.go @@ -225,7 +225,7 @@ func LuaProcessor(iterator obiiter.IBioSequence, name, program string, breakOnEr } } - newIter.Push(obiiter.MakeBioSequenceBatch(seqs.Order(), ns)) + newIter.Push(obiiter.MakeBioSequenceBatch(seqs.Source(), seqs.Order(), ns)) seqs.Recycle(false) } diff --git a/pkg/obioptions/options.go b/pkg/obioptions/options.go index 773dcf3..25bd8dc 100644 --- a/pkg/obioptions/options.go +++ b/pkg/obioptions/options.go @@ -14,7 +14,7 @@ import ( ) var _Debug = false -var _WorkerPerCore = 2.0 +var _WorkerPerCore = 1.0 var _ReadWorkerPerCore = 0.5 var _WriteWorkerPerCore = 0.25 var _StrictReadWorker = 0 diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index 9eae625..dfb6d5f 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -7,8 +7,8 @@ import ( // TODO: The version number is extracted from git. This induces that the version // corresponds to the last commit, and not the one when the file will be // commited -var _Commit = "4d86483" -var _Version = "Release 4.2.0" +var _Commit = "dfe2fc3" +var _Version = "" // Version returns the version of the obitools package. // diff --git a/pkg/obiseq/biosequenceslice.go b/pkg/obiseq/biosequenceslice.go index 5e56ed5..6645c31 100644 --- a/pkg/obiseq/biosequenceslice.go +++ b/pkg/obiseq/biosequenceslice.go @@ -186,6 +186,21 @@ func (s BioSequenceSlice) Size() int { return size } +// Count calculates the total count of all BioSequence elements in the BioSequenceSlice. +// +// It iterates over each BioSequence in the slice and adds the count of each BioSequence to the total count. +// +// Returns the total count as an integer. +func (s BioSequenceSlice) Count() int { + size := 0 + + for _, s := range s { + size += s.Count() + } + + return size +} + func (s BioSequenceSlice) AttributeKeys(skip_map bool) obiutils.Set[string] { keys := obiutils.MakeSet[string]() diff --git a/pkg/obitools/obiclean/obiclean.go b/pkg/obitools/obiclean/obiclean.go index de90af1..98530ea 100644 --- a/pkg/obitools/obiclean/obiclean.go +++ b/pkg/obitools/obiclean/obiclean.go @@ -57,7 +57,7 @@ func buildSamples(dataset obiseq.BioSequenceSlice, return samples } -func annotateOBIClean(dataset obiseq.BioSequenceSlice, +func annotateOBIClean(source string, dataset obiseq.BioSequenceSlice, sample map[string]*([]*seqPCR), tag, NAValue string) obiiter.IBioSequence { batchsize := 1000 @@ -91,7 +91,7 @@ func annotateOBIClean(dataset obiseq.BioSequenceSlice, return data, nil } - iter := obiiter.IBatchOver(dataset, batchsize) + iter := obiiter.IBatchOver(source, dataset, batchsize) riter := iter.MakeISliceWorker(annot, false) return riter @@ -288,7 +288,7 @@ func Weight(sequence *obiseq.BioSequence) map[string]int { func CLIOBIClean(itertator obiiter.IBioSequence) obiiter.IBioSequence { - db := itertator.Load() + source, db := itertator.Load() log.Infof("Sequence dataset of %d sequeences loaded\n", len(db)) @@ -365,7 +365,7 @@ func CLIOBIClean(itertator obiiter.IBioSequence) obiiter.IBioSequence { EmpiricalDistCsv(RatioTableFilename(), all_ratio) } - iter := annotateOBIClean(db, samples, SampleAttribute(), "NA") + iter := annotateOBIClean(source, db, samples, SampleAttribute(), "NA") if OnlyHead() { iter = iter.FilterOn(IsHead, 1000) diff --git a/pkg/obitools/obicleandb/obicleandb.go b/pkg/obitools/obicleandb/obicleandb.go index 6d19643..b93c3c6 100644 --- a/pkg/obitools/obicleandb/obicleandb.go +++ b/pkg/obitools/obicleandb/obicleandb.go @@ -274,11 +274,11 @@ func ICleanDB(itertator obiiter.IBioSequence) obiiter.IBioSequence { // obioptions.CLIParallelWorkers(), // ) - references := annotated.Load() + source, references := annotated.Load() mannwithney := MakeSequenceFamilyGenusWorker(references) - partof := obiiter.IBatchOver(references, + partof := obiiter.IBatchOver(source, references, obioptions.CLIBatchSize()) // genera_iterator, err := obichunk.ISequenceChunk( diff --git a/pkg/obitools/obiconsensus/obiconsensus.go b/pkg/obitools/obiconsensus/obiconsensus.go index 544b3bd..a8e0bda 100644 --- a/pkg/obitools/obiconsensus/obiconsensus.go +++ b/pkg/obitools/obiconsensus/obiconsensus.go @@ -46,7 +46,12 @@ func BuildConsensus(seqs obiseq.BioSequenceSlice, if err == nil { defer fasta.Close() - fasta.Write(obiformats.FormatFastaBatch(obiiter.MakeBioSequenceBatch(0, seqs), obiformats.FormatFastSeqJsonHeader, false)) + fasta.Write(obiformats.FormatFastaBatch(obiiter.MakeBioSequenceBatch( + fmt.Sprintf("%s_consensus", consensus_id), + 0, + seqs, + ), + obiformats.FormatFastSeqJsonHeader, false)) fasta.Close() } @@ -333,7 +338,7 @@ func CLIOBIMinion(itertator obiiter.IBioSequence) obiiter.IBioSequence { dirname := CLIGraphFilesDirectory() newIter := obiiter.MakeIBioSequence() - db := itertator.Load() + source, db := itertator.Load() log.Infof("Sequence dataset of %d sequeences loaded\n", len(db)) @@ -394,7 +399,7 @@ func CLIOBIMinion(itertator obiiter.IBioSequence) obiiter.IBioSequence { CLISampleAttribute(), CLIKmerSize()) - newIter.Push(obiiter.MakeBioSequenceBatch(sample_order, denoised)) + newIter.Push(obiiter.MakeBioSequenceBatch(source, sample_order, denoised)) sample_order++ } diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index 3489236..e545004 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -14,7 +14,7 @@ import ( "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" ) -func _ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { +func ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { var err error list_of_files := orderedset.NewOrderedSet() for _, fn := range filenames { @@ -39,7 +39,7 @@ func _ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) { if info.IsDir() { if path != fn { - subdir, e := _ExpandListOfFiles(true, path) + subdir, e := ExpandListOfFiles(true, path) if e != nil { return e } @@ -113,19 +113,26 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { log.Printf("Reading sequences from stdin in %s\n", CLIInputFormat()) opts = append(opts, obiformats.OptionsSource("stdin")) + var err error + switch CLIInputFormat() { case "ecopcr": - iterator = obiformats.ReadEcoPCR(os.Stdin, opts...) + iterator, err = obiformats.ReadEcoPCR(os.Stdin, opts...) case "embl": - iterator = obiformats.ReadEMBL(os.Stdin, opts...) + iterator, err = obiformats.ReadEMBL(os.Stdin, opts...) case "genbank": - iterator = obiformats.ReadGenbank(os.Stdin, opts...) + iterator, err = obiformats.ReadGenbank(os.Stdin, opts...) default: iterator = obiformats.ReadFastSeqFromStdin(opts...) } + + if err != nil { + return obiiter.NilIBioSequence, err + } + } else { - list_of_files, err := _ExpandListOfFiles(false, filenames...) + list_of_files, err := ExpandListOfFiles(false, filenames...) if err != nil { return obiiter.NilIBioSequence, err } diff --git a/pkg/obitools/obijoin/join.go b/pkg/obitools/obijoin/join.go index 1f087ab..a34bdff 100644 --- a/pkg/obitools/obijoin/join.go +++ b/pkg/obitools/obijoin/join.go @@ -129,7 +129,7 @@ func CLIJoinSequences(iterator obiiter.IBioSequence) obiiter.IBioSequence { log.Fatalf("Cannot read the data file to merge with: %s %v", CLIJoinWith(), err) } - data := data_iter.Load() + _, data := data_iter.Load() keys := CLIBy() diff --git a/pkg/obitools/obilandmark/obilandmark.go b/pkg/obitools/obilandmark/obilandmark.go index c2ba2ab..1700def 100644 --- a/pkg/obitools/obilandmark/obilandmark.go +++ b/pkg/obitools/obilandmark/obilandmark.go @@ -103,7 +103,7 @@ func MapOnLandmarkSequences(library obiseq.BioSequenceSlice, landmark_idx []int, // which landmark it corresponds. func CLISelectLandmarkSequences(iterator obiiter.IBioSequence) obiiter.IBioSequence { - library := iterator.Load() + source, library := iterator.Load() library_size := len(library) n_landmark := CLINCenter() @@ -191,6 +191,6 @@ func CLISelectLandmarkSequences(iterator obiiter.IBioSequence) obiiter.IBioSeque } } - return obiiter.IBatchOver(library, obioptions.CLIBatchSize()) + return obiiter.IBatchOver(source, library, obioptions.CLIBatchSize()) } diff --git a/pkg/obitools/obipairing/pairing.go b/pkg/obitools/obipairing/pairing.go index 0ceb7be..1c5a13b 100644 --- a/pkg/obitools/obipairing/pairing.go +++ b/pkg/obitools/obipairing/pairing.go @@ -255,6 +255,7 @@ func IAssemblePESequencesBatch(iterator obiiter.IBioSequence, delta, minOverlap, minIdentity, withStats, true, fastAlign, fastModeRel, arena, &shifts) } newIter.Push(obiiter.MakeBioSequenceBatch( + batch.Source(), batch.Order(), cons, )) diff --git a/pkg/obitools/obirefidx/famlilyindexing.go b/pkg/obitools/obirefidx/famlilyindexing.go index 3e55f93..7f3637f 100644 --- a/pkg/obitools/obirefidx/famlilyindexing.go +++ b/pkg/obitools/obirefidx/famlilyindexing.go @@ -130,7 +130,7 @@ func MakeIndexingSliceWorker(indexslot, idslot string, func IndexFamilyDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { log.Infoln("Family level reference database indexing...") log.Infoln("Loading database...") - references := iterator.Load() + source, references := iterator.Load() nref := len(references) log.Infof("Done. Database contains %d sequences", nref) @@ -154,7 +154,7 @@ func IndexFamilyDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { log.Info("done") - partof := obiiter.IBatchOver(references, + partof := obiiter.IBatchOver(source, references, obioptions.CLIBatchSize()).MakeIWorker(taxonomy.MakeSetSpeciesWorker(), false, obioptions.CLIParallelWorkers(), @@ -243,7 +243,7 @@ func IndexFamilyDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { waiting.Wait() - results := obiiter.IBatchOver(references, + results := obiiter.IBatchOver(source, references, obioptions.CLIBatchSize()).Speed("Writing db", nref) return results diff --git a/pkg/obitools/obirefidx/obirefidx.go b/pkg/obitools/obirefidx/obirefidx.go index 74fcb90..a756217 100644 --- a/pkg/obitools/obirefidx/obirefidx.go +++ b/pkg/obitools/obirefidx/obirefidx.go @@ -125,7 +125,7 @@ func IndexSequence(seqidx int, func IndexReferenceDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { log.Infoln("Loading database...") - references := iterator.Load() + source, references := iterator.Load() log.Infof("Done. Database contains %d sequences", len(references)) taxo, error := obifind.CLILoadSelectedTaxonomy() @@ -204,7 +204,7 @@ func IndexReferenceDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { sl = append(sl, iref) bar.Add(1) } - indexed.Push(obiiter.MakeBioSequenceBatch(l[0]/10, sl)) + indexed.Push(obiiter.MakeBioSequenceBatch(source, l[0]/10, sl)) } indexed.Done() diff --git a/pkg/obitools/obitag/options.go b/pkg/obitools/obitag/options.go index f9dbb2a..ce5d233 100644 --- a/pkg/obitools/obitag/options.go +++ b/pkg/obitools/obitag/options.go @@ -57,7 +57,9 @@ func CLIRefDB() obiseq.BioSequenceSlice { log.Panicf("Cannot open the reference library file : %s\n", _RefDB) } - return refdb.Load() + _, db := refdb.Load() + + return db } func CLIGeometricMode() bool { @@ -70,7 +72,7 @@ func CLIShouldISaveRefDB() bool { func CLISaveRefetenceDB(db obiseq.BioSequenceSlice) { if CLIShouldISaveRefDB() { - idb := obiiter.IBatchOver(db, 1000) + idb := obiiter.IBatchOver("", db, 1000) var newIter obiiter.IBioSequence diff --git a/pkg/obitools/obitag2/options.go b/pkg/obitools/obitag2/options.go index 74bdc58..accebb7 100644 --- a/pkg/obitools/obitag2/options.go +++ b/pkg/obitools/obitag2/options.go @@ -57,7 +57,9 @@ func CLIRefDB() obiseq.BioSequenceSlice { log.Panicf("Cannot open the reference library file : %s\n", _RefDB) } - return refdb.Load() + _, db := refdb.Load() + + return db } func CLIGeometricMode() bool { @@ -70,7 +72,7 @@ func CLIShouldISaveRefDB() bool { func CLISaveRefetenceDB(db obiseq.BioSequenceSlice) { if CLIShouldISaveRefDB() { - idb := obiiter.IBatchOver(db, 1000) + idb := obiiter.IBatchOver("", db, 1000) var newIter obiiter.IBioSequence