Compare commits

..

5 Commits

Author SHA1 Message Date
Eric Coissac
c0c18030c8 Adds information on the new github address 2024-08-02 14:26:20 +02:00
Eric Coissac
242f4d8f56 Realease notes 2024-08-02 13:50:33 +02:00
Eric Coissac
1b1cd41fd3 Add some code refactoring from the blackboard branch 2024-08-02 12:35:46 +02:00
Eric Coissac
bc1aaaf7d9 New install script updated for github 2024-07-29 14:35:21 +02:00
Eric Coissac
2247c3bc0a obitag xprize version
Former-commit-id: 2c47d88724102b115aac5641abccb880dfbec64d
2024-07-25 18:11:11 -04:00
22 changed files with 192 additions and 1092 deletions

View File

@@ -5,7 +5,7 @@ They are implemented in *GO* and are tens of times faster than OBITools2.
The git for *OBITools4* is available at :
> https://metabarcoding.org/obitools4
> https://github.com/metabarcoding/obitools4
## Installing *OBITools V4*
@@ -13,7 +13,7 @@ An installation script that compiles the new *OBITools* on your Unix-like system
The easiest way to run it is to copy and paste the following command into your terminal
```{bash}
curl -L https://metabarcoding.org/obitools4/install.sh | bash
curl -L https://raw.githubusercontent.com/metabarcoding/obitools4/master/install_obitools.sh | bash
```
By default, the script installs the *OBITools* commands and other associated files into the `/usr/local` directory.
@@ -33,7 +33,7 @@ available on your system, the installation script offers two options:
You can use these options by following the installation command:
```{bash}
curl -L https://metabarcoding.org/obitools4/install.sh | \
curl -L https://raw.githubusercontent.com/metabarcoding/obitools4/master/install_obitools.sh | \
bash -s -- --install-dir test_install --obitools-prefix k
```

View File

@@ -2,6 +2,26 @@
## Latest changes
## August 2nd, 2024. Release 4.3.0
### Change of git repositiory
- The OBITools4 git repository has been moved to the github repository.
The new address is: https://github.com/metabarcoding/obitools4.
Take care for using the new install script for retrieving the new version.
```bash
curl -L https://raw.githubusercontent.com/metabarcoding/obitools4/master/install_obitools.sh \
| bash
```
or with options:
```bash
curl -L https://raw.githubusercontent.com/metabarcoding/obitools4/master/install_obitools.sh \
| bash -s -- --install-dir test_install --obitools-prefix k
```
### CPU limitation
- By default, *OBITools4* tries to use all the computing power available on
@@ -20,6 +40,37 @@
### New features
- The output of the obitools will evolve to produce results only in standard
formats such as fasta and fastq. For non-sequential data, the output will be
in CSV format, with the separator `,`, the decimal separator `.`, and a
header line with the column names. It is more convenient to use the output
in other programs. For example, you can use the `csvtomd` command to
reformat the csv output into a markdown table. The first command to initiate
this change is `obicount`, which now produces a 3-line CSV output.
```bash
obicount data.csv | csvtomd
```
- Adds the new experimental `obicleandb` utility to clean up reference
database files created with `obipcr`. An easy way to create a reference
database for `obitag` is to use `obipcr` on a local copy of Genbank or EMBL.
However, these sequence databases are known to contain many taxonomic
errors, such as bacterial sequences annotated with the taxid of their host
species. obicleandb tries to detect these errors. To do this, it first keeps
only sequences annotated with the taxid to which a species, genus, and
family taxid can be assigned. Then, for each sequence, it compares the
distance of the sequence to the other sequences belonging to the same genus
to the same number of distances between the considered sequence and a
randomly selected set of sequences belonging to another family using a
Mann-Whitney U test. The alternative hypothesis is that out-of-family
distances are greater than intrageneric distances. Sequences are annotated
with the p-value of the Mann-Whitney U test in the **obicleandb_trusted**
slot. Later, the distribution of this p-value can be analyzed to determine a
threshold. Empirically, a threshold of 0.05 is a good compromise and allows
to filter out less than 1‰ of the sequences. These sequences can then be
removed using `obigrep`.
- Adds a new `obijoin` utility to join information contained in a sequence
file with that contained in another sequence or CSV file. The command allows
you to specify the names of the keys in the main sequence file and in the

View File

@@ -4,7 +4,8 @@ import (
"fmt"
"os"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiblackboard"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obicount"
@@ -34,28 +35,28 @@ func main() {
_, args := optionParser(os.Args)
black := obiblackboard.NewBlackBoard(obioptions.CLIParallelWorkers())
obioptions.SetStrictReadWorker(min(4, obioptions.CLIParallelWorkers()))
fs, err := obiconvert.CLIReadBioSequences(args...)
black.ReadSequences(args)
if err != nil {
log.Errorf("Cannot open file (%v)", err)
os.Exit(1)
}
counter := obiblackboard.CountSequenceAggregator("to_delete")
nvariant, nread, nsymbol := fs.Count(true)
black.RegisterRunner("sequences", counter.Runner)
black.RegisterRunner("to_delete", obiblackboard.RecycleSequences(true, "final"))
black.Run()
fmt.Print("entity,n\n")
fmt.Print("entites,n\n")
if obicount.CLIIsPrintingVariantCount() {
fmt.Printf("variants,%d\n", counter.Variants)
fmt.Printf("variants,%d\n", nvariant)
}
if obicount.CLIIsPrintingReadCount() {
fmt.Printf("reads,%d\n", counter.Reads)
fmt.Printf("reads,%d\n", nread)
}
if obicount.CLIIsPrintingSymbolCount() {
fmt.Printf("nucleotides,%d\n", counter.Nucleotides)
fmt.Printf("symbols,%d\n", nsymbol)
}
}

View File

@@ -0,0 +1,65 @@
package main
import (
"fmt"
"os"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obifind"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obitag"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obitag2"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
)
func main() {
// go tool pprof -http=":8000" ./build/obitag ./cpu.pprof
// f, err := os.Create("cpu.pprof")
// if err != nil {
// log.Fatal(err)
// }
// pprof.StartCPUProfile(f)
// defer pprof.StopCPUProfile()
// go tool trace cpu.trace
// ftrace, err := os.Create("cpu.trace")
// if err != nil {
// log.Fatal(err)
// }
// trace.Start(ftrace)
// defer trace.Stop()
obioptions.SetWorkerPerCore(2)
obioptions.SetStrictReadWorker(1)
obioptions.SetStrictWriteWorker(1)
obioptions.SetBatchSize(10)
optionParser := obioptions.GenerateOptionParser(obitag.OptionSet)
_, args := optionParser(os.Args)
fs, err := obiconvert.CLIReadBioSequences(args...)
if err != nil {
log.Errorf("Cannot open file (%v)", err)
os.Exit(1)
}
taxo, error := obifind.CLILoadSelectedTaxonomy()
if error != nil {
log.Panicln(error)
}
references := obitag.CLIRefDB()
identified := obitag2.CLIAssignTaxonomy(fs, references, taxo)
obiconvert.CLIWriteBioSequences(identified, true)
obiiter.WaitForLastPipe()
fmt.Println("")
}

View File

@@ -1,29 +1,38 @@
package main
import (
"fmt"
"os"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiblackboard"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
)
func r2(bb *obiblackboard.Blackboard, task *obiblackboard.Task) *obiblackboard.Task {
fmt.Printf("value : %v\n", task.Body)
return obiblackboard.NewInitialTask()
}
func rmul(bb *obiblackboard.Blackboard, task *obiblackboard.Task) *obiblackboard.Task {
nt := task.GetNext()
nt.Body = task.Body.(int) * 2
return nt
}
func main() {
optionParser := obioptions.GenerateOptionParser(obiconvert.OptionSet)
black := obiblackboard.NewBlackBoard(20)
_, args := optionParser(os.Args)
black.RegisterRunner("todisplay", "final", r2)
black.RegisterRunner("multiply", "todisplay", rmul)
black.RegisterRunner("initial", "multiply", obiblackboard.DoCount(1000).RepeatTask(4))
fs, err := obiconvert.CLIReadBioSequences(args...)
if err != nil {
log.Errorf("Cannot open file (%v)", err)
os.Exit(1)
}
frags := obiiter.IFragments(
1000,
100,
10,
100,
obioptions.CLIParallelWorkers(),
)
obiconvert.CLIWriteBioSequences(fs.Pipe(frags), true)
obiiter.WaitForLastPipe()
black.Run()
}

View File

@@ -4,7 +4,7 @@ INSTALL_DIR="/usr/local"
OBITOOLS_PREFIX=""
# default values
URL="https://go.dev/dl/"
OBIURL4="https://git.metabarcoding.org/obitools/obitools4/obitools4/-/archive/master/obitools4-master.tar.gz"
OBIURL4="https://github.com/metabarcoding/obitools4/archive/refs/heads/master.zip"
INSTALL_DIR="/usr/local"
OBITOOLS_PREFIX=""
@@ -106,8 +106,10 @@ curl "$GOURL" \
PATH="$(pwd)/go/bin:$PATH"
export PATH
curl -L "$OBIURL4" \
| tar zxf -
curl -L "$OBIURL4" > master.zip
unzip master.zip
echo "Install OBITOOLS from : $OBIURL4"
cd obitools4-master || exit

View File

@@ -1,204 +0,0 @@
package obiblackboard
import (
"slices"
"sync"
"time"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
)
type DoTask func(*Blackboard, *Task) *Task
type Blackboard struct {
Board map[int]Queue
BoardLock *sync.Mutex
Runners map[string]DoTask
Running *obiutils.Counter
TargetSize int
Size int
}
func doFinal(bb *Blackboard, task *Task) *Task {
if task.SavedTask != nil {
return task.SavedTask
}
return NewInitialTask()
}
func NewBlackBoard(size int) *Blackboard {
board := make(map[int]Queue, 0)
runners := make(map[string]DoTask, 0)
if size < 2 {
size = 2
}
bb := &Blackboard{
Board: board,
BoardLock: &sync.Mutex{},
Runners: runners,
Running: obiutils.NewCounter(),
TargetSize: size,
Size: 0,
}
for i := 0; i < size; i++ {
bb.PushTask(NewInitialTask())
}
bb.RegisterRunner("final", doFinal)
return bb
}
func (bb *Blackboard) RegisterRunner(target string, runner DoTask) {
bb.Runners[target] = runner
}
func (bb *Blackboard) MaxQueue() Queue {
max_priority := -1
max_queue := Queue(nil)
for priority, queue := range bb.Board {
if priority > max_priority {
max_queue = queue
}
}
return max_queue
}
func (bb *Blackboard) PopTask() *Task {
bb.BoardLock.Lock()
defer bb.BoardLock.Unlock()
q := bb.MaxQueue()
if q != nil {
next_task := (*q)[0]
(*q) = (*q)[1:]
if len(*q) == 0 {
delete(bb.Board, next_task.Priority)
}
bb.Size--
return next_task
}
return (*Task)(nil)
}
func (bb *Blackboard) PushTask(task *Task) {
bb.BoardLock.Lock()
defer bb.BoardLock.Unlock()
if task != nil {
priority := task.Priority
queue, ok := bb.Board[priority]
if !ok {
queue = NewQueue()
bb.Board[priority] = queue
}
*queue = slices.Grow(*queue, 1)
*queue = append((*queue), task)
bb.Size++
}
}
func (bb *Blackboard) Run() {
ctask := make(chan *Task)
lock := &sync.WaitGroup{}
launcher := func() {
for task := range ctask {
runner, ok := bb.Runners[task.Role]
if ok {
task = runner(bb, task)
}
bb.PushTask(task)
bb.Running.Dec()
}
lock.Done()
}
parallel := bb.TargetSize - 1
lock.Add(parallel)
for i := 0; i < parallel; i++ {
go launcher()
}
go func() {
for {
bb.Running.Inc()
task := bb.PopTask()
if task != nil {
ctask <- task
} else {
bb.Running.Dec()
if bb.Running.Value()+bb.Len() <= 0 {
break
}
time.Sleep(time.Millisecond)
}
}
close(ctask)
}()
lock.Wait()
}
// func (bb *Blackboard) Run() {
// lock := &sync.WaitGroup{}
// launcher := func(runner DoTask, task *Task) {
// task = runner(bb, task)
// if task != nil {
// for bb.Len() > bb.TargetSize {
// time.Sleep(time.Millisecond)
// }
// bb.PushTask(task)
// }
// bb.Running.Dec()
// lock.Done()
// }
// lock.Add(1)
// func() {
// for bb.Len()+bb.Running.Value() > 0 {
// bb.Running.Inc()
// task := bb.PopTask()
// if task != nil {
// lock.Add(1)
// go launcher(bb.Runners[task.Role], task)
// } else {
// bb.Running.Dec()
// }
// }
// lock.Done()
// }()
// lock.Wait()
// }
func (bb *Blackboard) Len() int {
return bb.Size
}
// 151431044 151431044 15083822152

View File

@@ -1,50 +0,0 @@
package obiblackboard
import (
"sync"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
)
type SequenceCounter struct {
Variants int
Reads int
Nucleotides int
Runner DoTask
}
func CountSequenceAggregator(target string) *SequenceCounter {
cc := &SequenceCounter{
Variants: 0,
Reads: 0,
Nucleotides: 0,
Runner: nil,
}
mutex := sync.Mutex{}
runner := func(bb *Blackboard, task *Task) *Task {
body := task.Body.(obiiter.BioSequenceBatch)
mutex.Lock()
cc.Variants += body.Len()
cc.Reads += body.Slice().Count()
cc.Nucleotides += body.Slice().Size()
mutex.Unlock()
nt := task.GetNext(target, true, false)
return nt
}
cc.Runner = runner
return cc
}
func RecycleSequences(rescycleSequence bool, target string) DoTask {
return func(bb *Blackboard, task *Task) *Task {
body := task.Body.(obiiter.BioSequenceBatch)
// log.Warningf("With priority %d, Recycling %s[%d]", task.Priority, body.Source(), body.Order())
body.Recycle(rescycleSequence)
return task.GetNext(target, false, false)
}
}

View File

@@ -1,17 +0,0 @@
package obiblackboard
import "fmt"
func DisplayTask(bb *Blackboard, task *Task) *Task {
if task == nil {
return nil
}
fmt.Printf("Task: %s:\n%v\n\n", task.Role, task.Body)
return task
}
func (runner DoTask) Display() DoTask {
return runner.CombineWith(DisplayTask)
}

View File

@@ -1,70 +0,0 @@
package obiblackboard
import "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
type Iteration[T any] struct {
Index int
Value T
}
// DoIterateSlice generates a DoTask function that iterates over a given slice and
// creates a new InitialTask for each element. The function takes in a slice of type
// T and a target string. It returns a DoTask function that can be used to execute
// the iteration. The DoTask function takes a Blackboard and a Task as input and
// returns a new Task. The Task's Role is set to the target string and its Body is
// set to an Iteration struct containing the index i and the element s[i] from the
// input slice. The iteration stops when the index i is equal to or greater than
// the length of the input slice.
//
// Parameters:
// - s: The slice of type T to iterate over.
// - target: The target string to set as the Task's Role.
//
// Return type:
// - DoTask: The DoTask function that can be used to execute the iteration.
func DoIterateSlice[T any](s []T, target string) DoTask {
n := len(s)
idx := obiutils.AtomicCounter()
dt := func(bb *Blackboard, t *Task) *Task {
i := idx()
if i < n {
nt := t.GetNext(target, false, false)
nt.Body = Iteration[T]{i, s[i]}
return nt
}
return nil
}
return dt
}
// DoCount generates a DoTask function that iterates over a given integer n and
// creates a new InitialTask for each iteration. The function takes in an integer n
// and a target string. It returns a DoTask function that can be used to execute
// the iteration. The DoTask function takes a Blackboard and a Task as input and
// returns a new Task. The Task's Role is set to the target string and its Body is
// set to the current iteration index i. The iteration stops when the index i is
// equal to or greater than the input integer n.
//
// Parameters:
// - n: The integer to iterate over.
// - target: The target string to set as the Task's Role.
//
// Return type:
// - DoTask: The DoTask function that can be used to execute the iteration.
func DoCount(n int, target string) DoTask {
idx := obiutils.AtomicCounter()
dt := func(bb *Blackboard, t *Task) *Task {
i := idx()
if i < n {
nt := t.GetNext(target, false, false)
nt.Body = i
return nt
}
return nil
}
return dt
}

View File

@@ -1,8 +0,0 @@
package obiblackboard
type Queue *[]*Task
func NewQueue() Queue {
q := make([]*Task, 0)
return &q
}

View File

@@ -1,534 +0,0 @@
package obiblackboard
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"path"
"path/filepath"
"regexp"
"strings"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
"github.com/gabriel-vasile/mimetype"
"github.com/goombaio/orderedset"
log "github.com/sirupsen/logrus"
)
func ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) {
res, err := _ExpandListOfFiles(check_ext, filenames...)
if err != nil {
log.Infof("Found %d files to process", len(res))
}
return res, err
}
func _ExpandListOfFiles(check_ext bool, filenames ...string) ([]string, error) {
var err error
list_of_files := orderedset.NewOrderedSet()
for _, fn := range filenames {
// Special case for stdin
if fn == "-" {
list_of_files.Add(fn)
continue
}
err = filepath.Walk(fn,
func(path string, info os.FileInfo, err error) error {
var e error
if info == nil {
return fmt.Errorf("cannot open path")
}
for info.Mode()&os.ModeSymlink == os.ModeSymlink {
path, e = filepath.EvalSymlinks(path)
if e != nil {
return e
}
info, e = os.Stat(path)
if e != nil {
return e
}
}
if info.IsDir() {
if path != fn {
subdir, e := ExpandListOfFiles(true, path)
if e != nil {
return e
}
for _, f := range subdir {
list_of_files.Add(f)
}
} else {
check_ext = true
}
} else {
if !check_ext ||
strings.HasSuffix(path, "csv") ||
strings.HasSuffix(path, "csv.gz") ||
strings.HasSuffix(path, "fasta") ||
strings.HasSuffix(path, "fasta.gz") ||
strings.HasSuffix(path, "fastq") ||
strings.HasSuffix(path, "fastq.gz") ||
strings.HasSuffix(path, "seq") ||
strings.HasSuffix(path, "seq.gz") ||
strings.HasSuffix(path, "gb") ||
strings.HasSuffix(path, "gb.gz") ||
strings.HasSuffix(path, "dat") ||
strings.HasSuffix(path, "dat.gz") ||
strings.HasSuffix(path, "ecopcr") ||
strings.HasSuffix(path, "ecopcr.gz") {
log.Debugf("Appending %s file\n", path)
list_of_files.Add(path)
}
}
return nil
})
if err != nil {
return nil, err
}
}
res := make([]string, 0, list_of_files.Size())
for _, v := range list_of_files.Values() {
res = append(res, v.(string))
}
return res, nil
}
// OBIMimeTypeGuesser is a function that takes an io.Reader as input and guesses the MIME type of the data.
// It uses several detectors to identify specific file formats, such as FASTA, FASTQ, ecoPCR2, GenBank, and EMBL.
// The function reads data from the input stream and analyzes it using the mimetype library.
// It then returns the detected MIME type, a modified reader with the read data, and any error encountered during the process.
//
// The following file types are recognized:
// - "text/ecopcr": if the first line starts with "#@ecopcr-v2".
// - "text/fasta": if the first line starts with ">".
// - "text/fastq": if the first line starts with "@".
// - "text/embl": if the first line starts with "ID ".
// - "text/genbank": if the first line starts with "LOCUS ".
// - "text/genbank" (special case): if the first line "Genetic Sequence Data Bank" (for genbank release files).
// - "text/csv"
//
// Parameters:
// - stream: An io.Reader representing the input stream to read data from.
//
// Returns:
// - *mimetype.MIME: The detected MIME type of the data.
// - io.Reader: A modified reader with the read data.
// - error: Any error encountered during the process.
func OBIMimeTypeGuesser(stream io.Reader) (*mimetype.MIME, io.Reader, error) {
fastaDetector := func(raw []byte, limit uint32) bool {
ok, err := regexp.Match("^>[^ ]", raw)
return ok && err == nil
}
fastqDetector := func(raw []byte, limit uint32) bool {
ok, err := regexp.Match("^@[^ ].*\n[^ ]+\n\\+", raw)
return ok && err == nil
}
ecoPCR2Detector := func(raw []byte, limit uint32) bool {
ok := bytes.HasPrefix(raw, []byte("#@ecopcr-v2"))
return ok
}
genbankDetector := func(raw []byte, limit uint32) bool {
ok2 := bytes.HasPrefix(raw, []byte("LOCUS "))
ok1, err := regexp.Match("^[^ ]* +Genetic Sequence Data Bank *\n", raw)
return ok2 || (ok1 && err == nil)
}
emblDetector := func(raw []byte, limit uint32) bool {
ok := bytes.HasPrefix(raw, []byte("ID "))
return ok
}
mimetype.Lookup("text/plain").Extend(fastaDetector, "text/fasta", ".fasta")
mimetype.Lookup("text/plain").Extend(fastqDetector, "text/fastq", ".fastq")
mimetype.Lookup("text/plain").Extend(ecoPCR2Detector, "text/ecopcr2", ".ecopcr")
mimetype.Lookup("text/plain").Extend(genbankDetector, "text/genbank", ".seq")
mimetype.Lookup("text/plain").Extend(emblDetector, "text/embl", ".dat")
mimetype.Lookup("application/octet-stream").Extend(fastaDetector, "text/fasta", ".fasta")
mimetype.Lookup("application/octet-stream").Extend(fastqDetector, "text/fastq", ".fastq")
mimetype.Lookup("application/octet-stream").Extend(ecoPCR2Detector, "text/ecopcr2", ".ecopcr")
mimetype.Lookup("application/octet-stream").Extend(genbankDetector, "text/genbank", ".seq")
mimetype.Lookup("application/octet-stream").Extend(emblDetector, "text/embl", ".dat")
// Create a buffer to store the read data
buf := make([]byte, 1024*128)
n, err := io.ReadFull(stream, buf)
if err != nil && err != io.ErrUnexpectedEOF {
return nil, nil, err
}
// Detect the MIME type using the mimetype library
mimeType := mimetype.Detect(buf)
if mimeType == nil {
return nil, nil, err
}
// Create a new reader based on the read data
newReader := io.Reader(bytes.NewReader(buf[:n]))
if err == nil {
newReader = io.MultiReader(newReader, stream)
}
return mimeType, newReader, nil
}
func TextChunkParser(parser obiformats.SeqFileChunkParser, target string) DoTask {
return func(bb *Blackboard, task *Task) *Task {
chunk := task.Body.(obiformats.SeqFileChunk)
sequences, err := parser(chunk.Source, chunk.Raw)
if err != nil {
return nil
}
nt := task.GetNext(target, false, false)
nt.Body = obiiter.MakeBioSequenceBatch(
chunk.Source,
chunk.Order,
sequences)
return nt
}
}
func SeqAnnotParser(parser obiseq.SeqAnnotator, target string) DoTask {
worker := obiseq.SeqToSliceWorker(obiseq.AnnotatorToSeqWorker(parser), false)
return func(bb *Blackboard, task *Task) *Task {
batch := task.Body.(obiiter.BioSequenceBatch)
sequences, err := worker(batch.Slice())
if err != nil {
log.Errorf("SeqAnnotParser on %s[%d]: %v", batch.Source(), batch.Order(), err)
return nil
}
nt := task.GetNext(target, false, false)
nt.Body = obiiter.MakeBioSequenceBatch(
batch.Source(),
batch.Order(),
sequences,
)
return nt
}
}
// OpenStream opens a file specified by the given filename and returns a reader for the file,
// the detected MIME type of the file, and any error encountered during the process.
//
// Parameters:
// - filename: A string representing the path to the file to be opened. If the filename is "-",
// the function opens the standard input stream.
//
// Returns:
// - io.Reader: A reader for the file.
// - *mimetype.MIME: The detected MIME type of the file.
// - error: Any error encountered during the process.
func OpenStream(filename string) (io.Reader, *mimetype.MIME, error) {
var stream io.Reader
var err error
if filename == "-" {
stream, err = obiformats.Buf(os.Stdin)
} else {
stream, err = obiformats.Ropen(filename)
}
if err != nil {
return nil, nil, err
}
// Detect the MIME type using the mimetype library
mimeType, newReader, err := OBIMimeTypeGuesser(stream)
if err != nil {
return nil, nil, err
}
log.Infof("%s mime type: %s", filename, mimeType.String())
return bufio.NewReader(newReader), mimeType, nil
}
type OpenedStreamBody struct {
Stream io.Reader
Filename string
Source string
Mime *mimetype.MIME
ToBeClosed bool
}
func FilenameToStream(target string) DoTask {
return func(bb *Blackboard, task *Task) *Task {
filename := task.Body.(Iteration[string]).Value
stream, mimetype, err := OpenStream(filename)
if err != nil {
log.Errorf("Error opening %s: %v", filename, err)
return nil
}
tobeclosed := filename != "-"
switch mimetype.String() {
case "text/fasta", "text/fastq", "text/ecopcr2", "text/genbank", "text/embl", "text/csv":
nt := task.GetNext(target+":"+mimetype.String(), false, false)
nt.Body = OpenedStreamBody{
Stream: stream,
Mime: mimetype,
Filename: filename,
Source: obiutils.RemoveAllExt((path.Base(filename))),
ToBeClosed: tobeclosed,
}
return nt
default:
log.Errorf("File %s (mime type %s) is an unsupported format", filename, mimetype.String())
return nil
}
}
}
type TextChunkIteratorBody struct {
Chunks obiformats.ChannelSeqFileChunk
Stream io.Reader
Source string
ToBeClosed bool
}
func StreamToTextChunkReader(lastEntry obiformats.LastSeqRecord, target string) DoTask {
return func(bb *Blackboard, task *Task) *Task {
body := task.Body.(OpenedStreamBody)
iterator := obiformats.ReadSeqFileChunk(
body.Source,
body.Stream,
make([]byte, 64*1024*1024),
lastEntry,
)
nt := task.GetNext(target, false, false)
nt.Body = TextChunkIteratorBody{
Chunks: iterator,
Stream: body.Stream,
Source: body.Source,
ToBeClosed: body.ToBeClosed,
}
return nt
}
}
func TextChuckIterator(endTask *Task, target string) DoTask {
return func(bb *Blackboard, task *Task) *Task {
body := task.Body.(TextChunkIteratorBody)
chunk, ok := <-body.Chunks
if !ok {
return endTask
}
var nt *Task
if bb.Len() > bb.TargetSize {
nt = task.GetNext(target, false, true)
} else {
nt = task.GetNext(target, false, false)
bb.PushTask(task)
}
nt.Body = chunk
return nt
}
}
type SequenceIteratorBody struct {
Iterator obiiter.IBioSequence
Stream io.Reader
Source string
ToBeClosed bool
}
func StreamToSequenceReader(
reader obiformats.SequenceReader,
options []obiformats.WithOption,
target string) DoTask {
return func(bb *Blackboard, task *Task) *Task {
body := task.Body.(OpenedStreamBody)
iterator, err := reader(body.Stream, options...)
if err != nil {
log.Errorf("Error opening %s: %v", body.Filename, err)
return nil
}
nt := task.GetNext(target, false, false)
nt.Body = SequenceIteratorBody{
Iterator: iterator,
Stream: body.Stream,
Source: body.Source,
ToBeClosed: body.ToBeClosed,
}
return nt
}
}
func SequenceIterator(endTask *Task, target string) DoTask {
return func(bb *Blackboard, task *Task) *Task {
body := task.Body.(SequenceIteratorBody)
if body.Iterator.Next() {
batch := body.Iterator.Get()
var nt *Task
if bb.Len() > bb.TargetSize {
nt = task.GetNext(target, false, true)
} else {
nt = task.GetNext(target, false, false)
bb.PushTask(task)
}
nt.Body = batch
return nt
} else {
return endTask
}
}
}
func (bb *Blackboard) ReadSequences(filepath []string, options ...obiformats.WithOption) {
var err error
opts := obiformats.MakeOptions(options)
if len(filepath) == 0 {
filepath = []string{"-"}
}
filepath, err = ExpandListOfFiles(false, filepath...)
if err != nil {
log.Fatalf("Cannot expand list of files : %v", err)
}
bb.RegisterRunner(
"initial",
DoIterateSlice(filepath, "filename"),
)
bb.RegisterRunner(
"filename",
FilenameToStream("stream"),
)
bb.RegisterRunner("stream:text/fasta",
StreamToTextChunkReader(
obiformats.EndOfLastFastaEntry,
"fasta_text_reader",
))
bb.RegisterRunner("fasta_text_reader",
TextChuckIterator(NewInitialTask(), "fasta_text_chunk"),
)
bb.RegisterRunner(
"fasta_text_chunk",
TextChunkParser(
obiformats.FastaChunkParser(),
"unannotated_sequences",
),
)
bb.RegisterRunner("stream:text/fastq",
StreamToTextChunkReader(obiformats.EndOfLastFastqEntry,
"fastq_text_reader"))
bb.RegisterRunner("fastq_text_reader",
TextChuckIterator(NewInitialTask(), "fastq_text_chunk"),
)
bb.RegisterRunner(
"fastq_text_chunk",
TextChunkParser(
obiformats.FastqChunkParser(obioptions.InputQualityShift()),
"unannotated_sequences",
),
)
bb.RegisterRunner("stream:text/embl",
StreamToTextChunkReader(obiformats.EndOfLastFlatFileEntry,
"embl_text_reader"))
bb.RegisterRunner("embl_text_reader",
TextChuckIterator(NewInitialTask(), "embl_text_chunk"),
)
bb.RegisterRunner(
"embl_text_chunk",
TextChunkParser(
obiformats.EmblChunkParser(opts.WithFeatureTable()),
"sequences",
),
)
bb.RegisterRunner("stream:text/genbank",
StreamToTextChunkReader(obiformats.EndOfLastFlatFileEntry,
"genbank_text_reader"))
bb.RegisterRunner("genbank_text_reader",
TextChuckIterator(NewInitialTask(), "genbank_text_chunk"),
)
bb.RegisterRunner(
"genbank_text_chunk",
TextChunkParser(
obiformats.GenbankChunkParser(opts.WithFeatureTable()),
"sequences",
),
)
bb.RegisterRunner(
"unannotated_sequences",
SeqAnnotParser(
opts.ParseFastSeqHeader(),
"sequences",
),
)
bb.RegisterRunner("stream:text/csv",
StreamToSequenceReader(obiformats.ReadCSV, options, "sequence_reader"))
bb.RegisterRunner("stream:text/ecopcr2",
StreamToSequenceReader(obiformats.ReadEcoPCR, options, "sequence_reader"))
bb.RegisterRunner("sequence_reader",
SequenceIterator(NewInitialTask(), "sequences"),
)
}

View File

@@ -1,108 +0,0 @@
package obiblackboard
import (
"sync"
log "github.com/sirupsen/logrus"
)
// RepeatTask creates a new DoTask function that repeats the given task n times.
//
// It takes an integer n as input, which specifies the number of times the task should be repeated.
// It returns a new DoTask function that can be used to execute the repeated task.
//
// The returned DoTask function maintains a map of tasks to their counts and tasks.
// When a task is executed, it checks if the task has been executed before.
// If it has, it increments the count and returns the previously executed task.
// If it has not been executed before, it executes the task using the provided runner function.
// If the runner function returns nil, the task is not added to the task memory and nil is returned.
// If the runner function returns a non-nil task, it is added to the task memory with a count of 0.
// After executing the task, the function checks if the count is less than (n-1).
// If it is, the task is added back to the blackboard to be executed again.
// If the count is equal to (n-1), the task is removed from the task memory.
// Finally, the function returns the executed task.
func (runner DoTask) RepeatTask(n int) DoTask {
type memtask struct {
count int
task *Task
}
taskMemory := make(map[*Task]*memtask)
taskMemoryLock := sync.Mutex{}
if n < 1 {
log.Fatalf("Cannot repeat a task less than once (n=%d)", n)
}
st := func(bb *Blackboard, task *Task) *Task {
taskMemoryLock.Lock()
mem, ok := taskMemory[task]
if !ok {
nt := runner(bb, task)
if nt == nil {
taskMemoryLock.Unlock()
return nt
}
mem = &memtask{
count: 0,
task: nt,
}
taskMemory[task] = mem
} else {
mem.count++
}
taskMemoryLock.Unlock()
if mem.count < (n - 1) {
bb.PushTask(task)
}
if mem.count == (n - 1) {
taskMemoryLock.Lock()
delete(taskMemory, task)
taskMemoryLock.Unlock()
}
return mem.task
}
return st
}
// CombineWith returns a new DoTask function that combines the given DoTask
// functions. The returned function applies the `other` function to the result
// of the `runner` function. The `bb` parameter is the Blackboard instance,
// and the `task` parameter is the Task instance.
//
// Parameters:
// - bb: The Blackboard instance.
// - task: The Task instance.
//
// Returns:
// - *Task: The result of applying the `other` function to the result of the
// `runner` function.
func (runner DoTask) CombineWith(other DoTask) DoTask {
return func(bb *Blackboard, task *Task) *Task {
return other(bb, runner(bb, task))
}
}
// SetTarget sets the target role for the task.
//
// Parameters:
// - target: The target role to set.
//
// Returns:
// - DoTask: The modified DoTask function.
func (runner DoTask) SetTarget(target string) DoTask {
return func(bb *Blackboard, task *Task) *Task {
nt := runner(bb, task)
nt.Role = target
return nt
}
}

View File

@@ -1,34 +0,0 @@
package obiblackboard
type Task struct {
Role string
SavedTask *Task
Priority int
Body interface{}
}
func NewInitialTask() *Task {
return &Task{
Role: "initial",
SavedTask: nil,
Priority: 0,
Body: nil,
}
}
func (task *Task) GetNext(target string, copy bool, save bool) *Task {
t := NewInitialTask()
t.Priority = task.Priority + 1
t.Role = target
if copy {
t.Body = task.Body
}
if save {
t.SavedTask = task
} else {
t.SavedTask = task.SavedTask
}
return t
}

View File

@@ -74,7 +74,7 @@ func FormatFasta(seq *obiseq.BioSequence, formater FormatHeader) string {
// - skipEmpty: a boolean indicating whether empty sequences should be skipped or not.
//
// It returns a byte array containing the formatted sequences.
func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, skipEmpty bool) []byte {
func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, skipEmpty bool) *bytes.Buffer {
// Create a buffer to store the formatted sequences
var bs bytes.Buffer
@@ -114,7 +114,7 @@ func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader, ski
}
// Return the byte array representation of the buffer
return bs.Bytes()
return &bs
}
// WriteFasta writes a given iterator of bio sequences to a file in FASTA format.
@@ -154,7 +154,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
chunkchan <- SeqFileChunk{
Source: batch.Source(),
Raw: bytes.NewBuffer(FormatFastaBatch(batch, header_format, opt.SkipEmptySequence())),
Raw: FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()),
Order: batch.Order(),
}

View File

@@ -14,6 +14,8 @@ import (
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
)
type FormatSeqBatch func(batch obiiter.BioSequenceBatch, formater FormatHeader, skipEmpty bool) *bytes.Buffer
func _formatFastq(buff *bytes.Buffer, seq *obiseq.BioSequence, formater FormatHeader) {
info := ""
@@ -49,7 +51,7 @@ func FormatFastq(seq *obiseq.BioSequence, formater FormatHeader) string {
}
func FormatFastqBatch(batch obiiter.BioSequenceBatch,
formater FormatHeader, skipEmpty bool) []byte {
formater FormatHeader, skipEmpty bool) *bytes.Buffer {
var bs bytes.Buffer
lt := 0
@@ -82,9 +84,7 @@ func FormatFastqBatch(batch obiiter.BioSequenceBatch,
}
chunk := bs.Bytes()
return chunk
return &bs
}
type FileChunk struct {
@@ -127,7 +127,7 @@ func WriteFastq(iterator obiiter.IBioSequence,
batch := iterator.Get()
chunk := SeqFileChunk{
Source: batch.Source(),
Raw: bytes.NewBuffer(FormatFastqBatch(batch, header_format, opt.SkipEmptySequence())),
Raw: FormatFastqBatch(batch, header_format, opt.SkipEmptySequence()),
Order: batch.Order(),
}
chunkchan <- chunk

View File

@@ -7,7 +7,8 @@ import (
type __options__ struct {
fastseq_header_parser obiseq.SeqAnnotator
fastseq_header_writer func(*obiseq.BioSequence) string
fastseq_header_writer BioSequenceFormater
seqBatchFormater FormatSeqBatch
with_progress_bar bool
buffer_size int
batch_size int
@@ -44,6 +45,7 @@ func MakeOptions(setters []WithOption) Options {
o := __options__{
fastseq_header_parser: ParseGuessedFastSeqHeader,
fastseq_header_writer: FormatFastSeqJsonHeader,
seqBatchFormater: nil,
with_progress_bar: false,
buffer_size: 2,
parallel_workers: obioptions.CLIReadParallelWorkers(),
@@ -103,6 +105,10 @@ func (opt Options) FormatFastSeqHeader() func(*obiseq.BioSequence) string {
return opt.pointer.fastseq_header_writer
}
func (opt Options) SequenceFormater() FormatSeqBatch {
return opt.pointer.seqBatchFormater
}
func (opt Options) NoOrder() bool {
return opt.pointer.no_order
}
@@ -219,8 +225,6 @@ func OptionNoOrder(no_order bool) WithOption {
return f
}
func OptionsCompressed(compressed bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.compressed = compressed
@@ -271,6 +275,14 @@ func OptionsFastSeqHeaderFormat(format func(*obiseq.BioSequence) string) WithOpt
return f
}
func OptionsSequenceFormater(formater FormatSeqBatch) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.seqBatchFormater = formater
})
return f
}
func OptionsParallelWorkers(nworkers int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.parallel_workers = nworkers

View File

@@ -14,7 +14,7 @@ import (
)
var _Debug = false
var _WorkerPerCore = 1.0
var _WorkerPerCore = 2.0
var _ReadWorkerPerCore = 0.5
var _WriteWorkerPerCore = 0.25
var _StrictReadWorker = 0

View File

@@ -7,7 +7,7 @@ import (
// TODO: The version number is extracted from git. This induces that the version
// corresponds to the last commit, and not the one when the file will be
// commited
var _Commit = "776b8f7"
var _Commit = "1b1cd41"
var _Version = ""
// Version returns the version of the obitools package.

View File

@@ -186,21 +186,6 @@ func (s BioSequenceSlice) Size() int {
return size
}
// Count calculates the total count of all BioSequence elements in the BioSequenceSlice.
//
// It iterates over each BioSequence in the slice and adds the count of each BioSequence to the total count.
//
// Returns the total count as an integer.
func (s BioSequenceSlice) Count() int {
size := 0
for _, s := range s {
size += s.Count()
}
return size
}
func (s BioSequenceSlice) AttributeKeys(skip_map bool) obiutils.Set[string] {
keys := obiutils.MakeSet[string]()

View File

@@ -51,7 +51,7 @@ func BuildConsensus(seqs obiseq.BioSequenceSlice,
0,
seqs,
),
obiformats.FormatFastSeqJsonHeader, false))
obiformats.FormatFastSeqJsonHeader, false).Bytes())
fasta.Close()
}

View File

@@ -52,7 +52,7 @@ func CLIPCR(iterator obiiter.IBioSequence) (obiiter.IBioSequence, error) {
100,
obioptions.CLIParallelWorkers(),
)
log.Infof("Fragmenting sequence longer than %dbp into chunks of %dbp",
log.Infof("Fragmenting sequence longer than %dbp into chuncks of %dbp",
CLIMaxLength()*1000,
CLIMaxLength()*100,
)