second version of obidistribute and a first buggy version of obiuniq

2022-02-15 00:47:02 +01:00
parent b931321ba1
commit 3586ecc483
15 changed files with 402 additions and 21 deletions

1
.gitignore vendored

@ -13,3 +13,4 @@ vendor
/obipcr
/obifind
/obidistribute
/obiuniq


@ -0,0 +1,36 @@
package main
import (
"os"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiuniq"
)
func main() {
// go tool pprof -http=":8000" ./obipairing ./cpu.pprof
// f, err := os.Create("cpu.pprof")
// if err != nil {
// log.Fatal(err)
// }
// pprof.StartCPUProfile(f)
// defer pprof.StopCPUProfile()
// go tool trace cpu.trace
// ftrace, err := os.Create("cpu.trace")
// if err != nil {
// log.Fatal(err)
// }
// trace.Start(ftrace)
// defer trace.Stop()
optionParser := obioptions.GenerateOptionParser(obiuniq.OptionSet)
_, args, _ := optionParser(os.Args)
sequences, _ := obiconvert.ReadBioSequencesBatch(args...)
unique := obiuniq.Unique(sequences)
obiconvert.WriteBioSequencesBatch(unique, true)
}

91
pkg/obichunk/chunks.go Normal file

@ -0,0 +1,91 @@
package obichunk
import (
"io/fs"
"io/ioutil"
"log"
"os"
"path/filepath"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
func tempDir() (string, error) {
dir, err := ioutil.TempDir(os.TempDir(), "obiseq_chunks_")
if err != nil {
return "", err
}
return dir, nil
}
func find(root, ext string) []string {
var a []string
filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error {
if e != nil {
return e
}
if filepath.Ext(d.Name()) == ext {
a = append(a, s)
}
return nil
})
return a
}
func ISequenceChunk(iterator obiseq.IBioSequenceBatch, size int, sizes ...int) (obiseq.IBioSequenceBatch, error) {
dir, err := tempDir()
if err != nil {
return obiseq.NilIBioSequenceBatch, err
}
bufferSize := iterator.BufferSize()
if len(sizes) > 0 {
bufferSize = sizes[0]
}
newIter := obiseq.MakeIBioSequenceBatch(bufferSize)
newIter.Add(1)
go func() {
newIter.Wait()
close(newIter.Channel())
log.Println("====>> clear diectory")
os.RemoveAll(dir)
}()
go func() {
obiformats.WriterDispatcher(dir+"/chunk_%s.fastx",
iterator.Distribute(obiseq.HashClassifier(size)),
obiformats.WriteSequencesBatchToFile,
)
files := find(dir, ".fastx")
for order, file := range files {
iseq, err := obiformats.ReadSequencesBatchFromFile(file)
if err != nil {
panic(err)
}
chunk := make(obiseq.BioSequenceSlice, 0, 3*size)
for iseq.Next() {
b := iseq.Get()
chunk = append(chunk, b.Slice()...)
}
if len(chunk) > 0 {
newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, chunk...)
}
}
newIter.Done()
}()
return newIter, err
}
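ISequenceChunk regroups a batch iterator by spilling it to temporary fastx files keyed on a hash of each sequence, then reading each file back as a single batch, so every copy of a given sequence ends up in the same chunk. A minimal sketch of how it could be wired into a command, modelled on the obiuniq main above (hypothetical program, not part of this commit; the chunk count of 100 is arbitrary):

package main

import (
    "log"
    "os"

    "git.metabarcoding.org/lecasofts/go/obitools/pkg/obichunk"
    "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
    "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
)

func main() {
    optionParser := obioptions.GenerateOptionParser(obiconvert.OptionSet)
    _, args, _ := optionParser(os.Args)

    sequences, _ := obiconvert.ReadBioSequencesBatch(args...)

    // Regroup the input into 100 hash-based chunks backed by temporary files.
    chunks, err := obichunk.ISequenceChunk(sequences, 100)
    if err != nil {
        log.Fatalf("error in chunking the dataset: %v", err)
    }

    obiconvert.WriteBioSequencesBatch(chunks, true)
}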


@ -4,7 +4,6 @@ import (
"fmt"
"log"
"sync"
"sync/atomic"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
@ -22,26 +21,28 @@ func WriterDispatcher(prototypename string,
jobDone.Add(1)
go func() {
n := int32(0)
for newflux := range dispatcher.News() {
jobDone.Add(1)
go func(newflux string) {
data, _ := dispatcher.Outputs(newflux)
data, err := dispatcher.Outputs(newflux)
if err != nil {
log.Fatalf("Cannot retreive the new chanel : %v", err)
}
out, err := formater(data,
fmt.Sprintf(prototypename, newflux),
options...)
if err != nil {
log.Fatalf("cannot open the output file for key %s", newflux)
}
atomic.AddInt32(&n, 1)
if atomic.LoadInt32(&n) > 1 {
jobDone.Add(1)
}
out.Recycle()
jobDone.Done()
}(newflux)
}
jobDone.Done()
}()
jobDone.Wait()


@ -60,6 +60,13 @@ func WriteFasta(iterator obiseq.IBioSequence, file io.Writer, options ...WithOpt
fmt.Fprintln(file, FormatFasta(seq, header_format))
}
if opt.CloseFile() {
switch file := file.(type) {
case *os.File:
file.Close()
}
}
return nil
}
@ -74,10 +81,13 @@ func WriteFastaToFile(iterator obiseq.IBioSequence,
return err
}
options = append(options, OptionCloseFile())
return WriteFasta(iterator, file, options...)
}
func WriteFastaToStdout(iterator obiseq.IBioSequence, options ...WithOption) error {
options = append(options, OptionDontCloseFile())
return WriteFasta(iterator, os.Stdout, options...)
}
@ -105,6 +115,7 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options
time.Sleep(time.Millisecond)
}
close(newIter.Channel())
}()
ff := func(iterator obiseq.IBioSequenceBatch) {
@ -145,12 +156,21 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options
}
}
if opt.CloseFile() {
switch file := file.(type) {
case *os.File:
file.Close()
}
}
}()
return newIter, nil
}
func WriteFastaBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) {
options = append(options, OptionDontCloseFile())
return WriteFastaBatch(iterator, os.Stdout, options...)
}
@ -165,5 +185,7 @@ func WriteFastaBatchToFile(iterator obiseq.IBioSequenceBatch,
return obiseq.NilIBioSequenceBatch, err
}
options = append(options, OptionCloseFile())
return WriteFastaBatch(iterator, file, options...)
}
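The writers receive an io.Writer, which has no Close method, so the new CloseFile option only closes the output when the underlying writer really is an *os.File; the *ToStdout variants force OptionDontCloseFile so os.Stdout is never closed. A standalone illustration of that pattern (not part of the commit):

package example

import (
    "io"
    "os"
)

// closeIfFile mirrors the CloseFile handling added to the fasta/fastq writers:
// the output is closed only when the io.Writer is really an *os.File.
func closeIfFile(w io.Writer) {
    if f, ok := w.(*os.File); ok {
        f.Close()
    }
}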


@ -55,6 +55,13 @@ func WriteFastq(iterator obiseq.IBioSequence, file io.Writer, options ...WithOpt
fmt.Fprintln(file, FormatFastq(seq, quality, header_format))
}
if opt.CloseFile() {
switch file := file.(type) {
case *os.File:
file.Close()
}
}
return nil
}
@ -69,10 +76,12 @@ func WriteFastqToFile(iterator obiseq.IBioSequence,
return err
}
options = append(options, OptionCloseFile())
return WriteFastq(iterator, file, options...)
}
func WriteFastqToStdout(iterator obiseq.IBioSequence, options ...WithOption) error {
options = append(options, OptionDontCloseFile())
return WriteFastq(iterator, os.Stdout, options...)
}
@ -122,10 +131,10 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options
}
log.Println("Start of the fastq file writing")
go ff(iterator)
for i := 0; i < nwriters-1; i++ {
go ff(iterator.Split())
}
go ff(iterator)
next_to_send := 0
received := make(map[int]FileChunck, 100)
@ -147,12 +156,21 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options
}
}
if opt.CloseFile() {
switch file := file.(type) {
case *os.File:
file.Close()
}
}
}()
return newIter, nil
}
func WriteFastqBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) {
options = append(options, OptionDontCloseFile())
return WriteFastqBatch(iterator, os.Stdout, options...)
}
@ -167,5 +185,7 @@ func WriteFastqBatchToFile(iterator obiseq.IBioSequenceBatch,
return obiseq.NilIBioSequenceBatch, err
}
options = append(options, OptionCloseFile())
return WriteFastqBatch(iterator, file, options...)
}


@ -1,6 +1,8 @@
package obiformats
import "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
import (
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
type __options__ struct {
fastseq_header_parser obiseq.SeqAnnotator
@ -10,6 +12,7 @@ type __options__ struct {
batch_size int
quality_shift int
parallel_workers int
closefile bool
}
type Options struct {
@ -27,6 +30,7 @@ func MakeOptions(setters []WithOption) Options {
quality_shift: 33,
parallel_workers: 4,
batch_size: 5000,
closefile: false,
}
opt := Options{&o}
@ -66,6 +70,10 @@ func (opt Options) ProgressBar() bool {
return opt.pointer.with_progress_bar
}
func (opt Options) CloseFile() bool {
return opt.pointer.closefile
}
func OptionsBufferSize(size int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.buffer_size = size
@ -74,6 +82,22 @@ func OptionsBufferSize(size int) WithOption {
return f
}
func OptionCloseFile() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.closefile = true
})
return f
}
func OptionDontCloseFile() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.closefile = false
})
return f
}
// Allows to specify the ascii code corresponding to
// a quality of 0 in fastq encoded quality scores.
func OptionsQualityShift(shift int) WithOption {


@ -64,6 +64,12 @@ func ReadSequencesBatchFromFile(filename string, options ...WithOption) (obiseq.
tag, _ := breader.Peek(30)
if len(tag) < 30 {
newIter := obiseq.MakeIBioSequenceBatch()
close(newIter.Channel())
return newIter, nil
}
filetype := GuessSeqFileType(string(tag))
log.Printf("File guessed format : %s (tag: %s)",
filetype, (strings.Split(string(tag), "\n"))[0])


@ -89,6 +89,7 @@ func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch,
func WriteSequencesBatchToStdout(iterator obiseq.IBioSequenceBatch,
options ...WithOption) (obiseq.IBioSequenceBatch, error) {
options = append(options, OptionDontCloseFile())
return WriteSequenceBatch(iterator, os.Stdout, options...)
}
@ -103,5 +104,6 @@ func WriteSequencesBatchToFile(iterator obiseq.IBioSequenceBatch,
return obiseq.NilIBioSequenceBatch, err
}
options = append(options, OptionCloseFile())
return WriteSequenceBatch(iterator, file, options...)
}


@ -156,15 +156,15 @@ func (iterator IBioSequenceBatch) Split() IBioSequenceBatch {
}
func (iterator IBioSequenceBatch) Next() bool {
if iterator.pointer.finished.IsSet() {
return false
}
if iterator.pointer.pushBack.IsSet() {
iterator.pointer.pushBack.UnSet()
return true
}
if iterator.pointer.finished.IsSet() {
return false
}
next, ok := (<-iterator.pointer.channel)
if ok {


@ -25,7 +25,7 @@ func AnnotationClassifier(key string) SequenceClassifier {
return ""
}
return SequenceClassifier(f)
return f
}
var SampleClassifier = AnnotationClassifier("sample")
@ -39,7 +39,7 @@ func PredicateClassifier(predicate SequencePredicate) SequenceClassifier {
}
}
return SequenceClassifier(f)
return f
}
// Builds a classifier function based on CRC32 of the sequence
@ -50,7 +50,7 @@ func HashClassifier(size int) SequenceClassifier {
return strconv.Itoa(int(h))
}
return SequenceClassifier(f)
return f
}
func RotateClassifier(size int) SequenceClassifier {
@ -61,5 +61,5 @@ func RotateClassifier(size int) SequenceClassifier {
return strconv.Itoa(int(h))
}
return SequenceClassifier(f)
return f
}
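The explicit SequenceClassifier conversions were redundant, since the function literals already have the classifier's underlying type, so the classifiers now return the literal directly. They are typically combined with Distribute and WriterDispatcher, as ISequenceChunk does with HashClassifier; a hedged sketch of the same pattern keyed on the "sample" annotation (hypothetical helper, the output file name pattern is a placeholder):

package example

import (
    "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats"
    "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)

// distributeBySample writes one fastx file per value of the "sample"
// annotation, mirroring the WriterDispatcher call used in ISequenceChunk.
func distributeBySample(sequences obiseq.IBioSequenceBatch) {
    obiformats.WriterDispatcher("by_sample_%s.fastx",
        sequences.Distribute(obiseq.SampleClassifier),
        obiformats.WriteSequencesBatchToFile,
    )
}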


@ -62,7 +62,7 @@ func (iterator IBioSequenceBatch) Distribute(class SequenceClassifier, sizes ...
for iterator.Next() {
seqs := iterator.Get()
for _, s := range seqs.slice {
for _, s := range seqs.Slice() {
key := class(s)
slice, ok := slices[key]
@ -73,13 +73,14 @@ func (iterator IBioSequenceBatch) Distribute(class SequenceClassifier, sizes ...
orders[key] = 0
lock.Lock()
outputs[key] = MakeIBioSequenceBatch(batchsize, buffsize)
outputs[key] = MakeIBioSequenceBatch(buffsize)
lock.Unlock()
news <- key
}
*slice = append(*slice, s)
if len(*slice) == batchsize {
outputs[key].Channel() <- MakeBioSequenceBatch(orders[key], *slice...)
orders[key]++


@ -3,6 +3,7 @@ package obiseq
import (
"fmt"
"log"
"strings"
)
type StatsOnValues map[string]int
@ -98,9 +99,13 @@ func (sequence BioSequence) Merge(tomerge BioSequence, inplace bool, keys ...str
sequence = sequence.Copy()
}
if sequence.HasQualities() {
sequence.SetQualities(nil)
}
annotation := sequence.Annotations()
annotation["count"] = tomerge.Count() + sequence.Count()
count := tomerge.Count() + sequence.Count()
for _, key := range keys {
if tomerge.HasStatsOn(key) {
@ -112,5 +117,113 @@ func (sequence BioSequence) Merge(tomerge BioSequence, inplace bool, keys ...str
}
}
if tomerge.HasAnnotation() {
ma := tomerge.Annotations()
for k, va := range annotation {
if !strings.HasPrefix(k, "merged_") {
vm, ok := ma[k]
if !ok || vm != va {
delete(annotation, k)
}
}
}
} else {
for k := range annotation {
if !strings.HasPrefix(k, "merged_") {
delete(annotation, k)
}
}
}
annotation["count"] = count
return sequence
}
func (sequences BioSequenceSlice) Unique(statsOn []string, keys ...string) BioSequenceSlice {
uniq := make(map[string]*BioSequenceSlice, len(sequences))
nVariant := 0
for _, seq := range sequences {
sstring := seq.String()
pgroup, ok := uniq[sstring]
if !ok {
group := make(BioSequenceSlice, 0, 10)
pgroup = &group
uniq[sstring] = pgroup
}
ok = false
i := 0
var s BioSequence
for i, s = range *pgroup {
ok = true
switch {
case seq.HasAnnotation() && s.HasAnnotation():
for _, k := range keys {
seqV, seqOk := seq.Annotations()[k]
sV, sOk := s.Annotations()[k]
ok = ok && ((!seqOk && !sOk) || ((seqOk && sOk) && (seqV == sV)))
if !ok {
break
}
}
case seq.HasAnnotation() && !s.HasAnnotation():
for _, k := range keys {
_, seqOk := seq.Annotations()[k]
ok = ok && !seqOk
if !ok {
break
}
}
case !seq.HasAnnotation() && s.HasAnnotation():
for _, k := range keys {
_, sOk := s.Annotations()[k]
ok = ok && !sOk
if !ok {
break
}
}
default:
ok = true
}
if ok {
break
}
}
if ok {
(*pgroup)[i] = s.Merge(seq, true, statsOn...)
} else {
seq.SetQualities(nil)
if seq.Count() == 1 {
seq.Annotations()["count"] = 1
}
*pgroup = append(*pgroup, seq)
nVariant++
}
}
output := make(BioSequenceSlice, 0, nVariant)
for _, seqs := range uniq {
output = append(output, *seqs...)
}
return output
}
func UniqueSliceWorker(statsOn []string, keys ...string) SeqSliceWorker {
worker := func(sequences BioSequenceSlice) BioSequenceSlice {
return sequences.Unique(statsOn, keys...)
}
return worker
}
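Unique first groups records by their sequence string, then merges two records of a group only when every attribute listed in keys is either absent from both or present in both with the same value; merged records keep a summed count and lose their qualities. The per-key comparison, restated on plain annotation maps (illustration only, assuming comparable values such as strings or ints):

package example

// sameGroup restates the key-comparison rule of BioSequenceSlice.Unique:
// two records can be merged when every key is either missing from both
// annotation maps or present in both with the same value.
func sameGroup(a, b map[string]interface{}, keys []string) bool {
    for _, k := range keys {
        va, aOk := a[k]
        vb, bOk := b[k]
        if aOk != bOk || (aOk && va != vb) {
            return false
        }
    }
    return true
}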


@ -0,0 +1,36 @@
package obiuniq
import (
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert"
"github.com/DavidGamba/go-getoptions"
)
var _StatsOn = make([]string, 0, 10)
var _Keys = make([]string, 0, 10)
func UniqueOptionSet(options *getoptions.GetOpt) {
options.StringSliceVar(&_StatsOn, "merge",
1, 1000,
options.Alias("m"),
options.Description("Adds a merged attribute containing the list of sequence record ids merged within this group."))
options.StringSliceVar(&_Keys, "category-attribute",
1, 1000,
options.Alias("c"),
options.Description("Adds one attribute to the list of attributes used to define sequence groups (this option can be used several times)."))
}
// OptionSet adds to the basic option set every option declared for
// the obiuniq command.
func OptionSet(options *getoptions.GetOpt) {
obiconvert.OptionSet(options)
UniqueOptionSet(options)
}
func CLIStatsOn() []string {
return _StatsOn
}
func CLIKeys() []string {
return _Keys
}
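With these options, a typical invocation could look like the following (hypothetical example; the usual obiconvert input/output options also apply):

obiuniq -m sample -c experiment reads.fasta > unique.fasta

Here --merge/-m names an attribute to be summarised on each merged record, and --category-attribute/-c adds an attribute to those that must match for two records to be grouped, as described by the option help above.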


@ -0,0 +1,28 @@
package obiuniq
import (
"log"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obichunk"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
func Unique(sequences obiseq.IBioSequenceBatch) obiseq.IBioSequenceBatch {
newIter, err := obichunk.ISequenceChunk(sequences, 100, 2)
if err != nil {
log.Fatalf("error in spliting the dataset : %v", err)
}
statsOn := CLIStatsOn()
keys := CLIKeys()
parallelWorkers := obioptions.CLIParallelWorkers()
buffSize := obioptions.CLIBufferSize()
newIter = newIter.MakeISliceWorker(obiseq.UniqueSliceWorker(statsOn, keys...),
parallelWorkers, buffSize)
return newIter
}