From 3586ecc483d248b271f8c9ad6a9962c4439fff2b Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 15 Feb 2022 00:47:02 +0100 Subject: [PATCH] second version of obidistribute and a first buggy version of obiuniq --- .gitignore | 1 + cmd/obitools/obiuniq/main.go | 36 ++++++++ pkg/obichunk/chunks.go | 91 ++++++++++++++++++++ pkg/obiformats/dispatcher.go | 17 ++-- pkg/obiformats/fastseq_write_fasta.go | 22 +++++ pkg/obiformats/fastseq_write_fastq.go | 22 ++++- pkg/obiformats/options.go | 26 +++++- pkg/obiformats/universal_read.go | 6 ++ pkg/obiformats/universal_write.go | 2 + pkg/obiseq/batchiterator.go | 8 +- pkg/obiseq/class.go | 8 +- pkg/obiseq/distribute.go | 5 +- pkg/obiseq/merge.go | 115 +++++++++++++++++++++++++- pkg/obitools/obiuniq/options.go | 36 ++++++++ pkg/obitools/obiuniq/unique.go | 28 +++++++ 15 files changed, 402 insertions(+), 21 deletions(-) create mode 100644 cmd/obitools/obiuniq/main.go create mode 100644 pkg/obichunk/chunks.go create mode 100644 pkg/obitools/obiuniq/options.go create mode 100644 pkg/obitools/obiuniq/unique.go diff --git a/.gitignore b/.gitignore index c7715b4..bf66a58 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ vendor /obipcr /obifind /obidistribute +/obiuniq diff --git a/cmd/obitools/obiuniq/main.go b/cmd/obitools/obiuniq/main.go new file mode 100644 index 0000000..b63fea5 --- /dev/null +++ b/cmd/obitools/obiuniq/main.go @@ -0,0 +1,36 @@ +package main + +import ( + "os" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiuniq" +) + +func main() { + + // go tool pprof -http=":8000" ./obipairing ./cpu.pprof + // f, err := os.Create("cpu.pprof") + // if err != nil { + // log.Fatal(err) + // } + // pprof.StartCPUProfile(f) + // defer pprof.StopCPUProfile() + + // go tool trace cpu.trace + // ftrace, err := os.Create("cpu.trace") + // if err != nil { + // log.Fatal(err) + // } + // trace.Start(ftrace) + // defer trace.Stop() + + optionParser := obioptions.GenerateOptionParser(obiuniq.OptionSet) + + _, args, _ := optionParser(os.Args) + + sequences, _ := obiconvert.ReadBioSequencesBatch(args...) + unique := obiuniq.Unique(sequences) + obiconvert.WriteBioSequencesBatch(unique, true) +} diff --git a/pkg/obichunk/chunks.go b/pkg/obichunk/chunks.go new file mode 100644 index 0000000..7f0d2ab --- /dev/null +++ b/pkg/obichunk/chunks.go @@ -0,0 +1,91 @@ +package obichunk + +import ( + "io/fs" + "io/ioutil" + "log" + "os" + "path/filepath" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +) + +func tempDir() (string, error) { + dir, err := ioutil.TempDir(os.TempDir(), "obiseq_chunks_") + if err != nil { + return "", err + } + return dir, nil +} + +func find(root, ext string) []string { + var a []string + filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error { + if e != nil { + return e + } + if filepath.Ext(d.Name()) == ext { + a = append(a, s) + } + return nil + }) + return a +} + +func ISequenceChunk(iterator obiseq.IBioSequenceBatch, size int, sizes ...int) (obiseq.IBioSequenceBatch, error) { + dir, err := tempDir() + if err != nil { + return obiseq.NilIBioSequenceBatch, err + } + + bufferSize := iterator.BufferSize() + + if len(sizes) > 0 { + bufferSize = sizes[0] + } + + newIter := obiseq.MakeIBioSequenceBatch(bufferSize) + + newIter.Add(1) + + go func() { + newIter.Wait() + close(newIter.Channel()) + log.Println("====>> clear diectory") + os.RemoveAll(dir) + }() + + go func() { + obiformats.WriterDispatcher(dir+"/chunk_%s.fastx", + iterator.Distribute(obiseq.HashClassifier(size)), + obiformats.WriteSequencesBatchToFile, + ) + + files := find(dir, ".fastx") + + for order, file := range files { + iseq, err := obiformats.ReadSequencesBatchFromFile(file) + + if err != nil { + panic(err) + } + + chunck := make(obiseq.BioSequenceSlice, 0, 3*size) + + for iseq.Next() { + b := iseq.Get() + chunck = append(chunck, b.Slice()...) + } + + if len(chunck) > 0 { + newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, chunck...) + } + + } + + newIter.Done() + }() + + return newIter, err +} diff --git a/pkg/obiformats/dispatcher.go b/pkg/obiformats/dispatcher.go index 1ebd038..b069de0 100644 --- a/pkg/obiformats/dispatcher.go +++ b/pkg/obiformats/dispatcher.go @@ -4,7 +4,6 @@ import ( "fmt" "log" "sync" - "sync/atomic" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -22,26 +21,28 @@ func WriterDispatcher(prototypename string, jobDone.Add(1) go func() { - n := int32(0) for newflux := range dispatcher.News() { + jobDone.Add(1) go func(newflux string) { - data, _ := dispatcher.Outputs(newflux) + data, err := dispatcher.Outputs(newflux) + + if err != nil { + log.Fatalf("Cannot retreive the new chanel : %v", err) + } + out, err := formater(data, fmt.Sprintf(prototypename, newflux), options...) + if err != nil { log.Fatalf("cannot open the output file for key %s", newflux) } - atomic.AddInt32(&n, 1) - - if atomic.LoadInt32(&n) > 1 { - jobDone.Add(1) - } out.Recycle() jobDone.Done() }(newflux) } + jobDone.Done() }() jobDone.Wait() diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index 9171534..4aeadee 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -60,6 +60,13 @@ func WriteFasta(iterator obiseq.IBioSequence, file io.Writer, options ...WithOpt fmt.Fprintln(file, FormatFasta(seq, header_format)) } + if opt.CloseFile() { + switch file := file.(type) { + case *os.File: + file.Close() + } + } + return nil } @@ -74,10 +81,13 @@ func WriteFastaToFile(iterator obiseq.IBioSequence, return err } + options = append(options, OptionCloseFile()) + return WriteFasta(iterator, file, options...) } func WriteFastaToStdout(iterator obiseq.IBioSequence, options ...WithOption) error { + options = append(options, OptionDontCloseFile()) return WriteFasta(iterator, os.Stdout, options...) } @@ -105,6 +115,7 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options time.Sleep(time.Millisecond) } close(newIter.Channel()) + }() ff := func(iterator obiseq.IBioSequenceBatch) { @@ -145,12 +156,21 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options } } + + if opt.CloseFile() { + switch file := file.(type) { + case *os.File: + file.Close() + } + } + }() return newIter, nil } func WriteFastaBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) { + options = append(options, OptionDontCloseFile()) return WriteFastaBatch(iterator, os.Stdout, options...) } @@ -165,5 +185,7 @@ func WriteFastaBatchToFile(iterator obiseq.IBioSequenceBatch, return obiseq.NilIBioSequenceBatch, err } + options = append(options, OptionCloseFile()) + return WriteFastaBatch(iterator, file, options...) } diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 620bc53..c2a9b9d 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -55,6 +55,13 @@ func WriteFastq(iterator obiseq.IBioSequence, file io.Writer, options ...WithOpt fmt.Fprintln(file, FormatFastq(seq, quality, header_format)) } + if opt.CloseFile() { + switch file := file.(type) { + case *os.File: + file.Close() + } + } + return nil } @@ -69,10 +76,12 @@ func WriteFastqToFile(iterator obiseq.IBioSequence, return err } + options = append(options, OptionCloseFile()) return WriteFastq(iterator, file, options...) } func WriteFastqToStdout(iterator obiseq.IBioSequence, options ...WithOption) error { + options = append(options, OptionDontCloseFile()) return WriteFastq(iterator, os.Stdout, options...) } @@ -122,10 +131,10 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options } log.Println("Start of the fastq file writing") + go ff(iterator) for i := 0; i < nwriters-1; i++ { go ff(iterator.Split()) } - go ff(iterator) next_to_send := 0 received := make(map[int]FileChunck, 100) @@ -147,12 +156,21 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options } } + + if opt.CloseFile() { + switch file := file.(type) { + case *os.File: + file.Close() + } + } + }() return newIter, nil } func WriteFastqBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) { + options = append(options, OptionDontCloseFile()) return WriteFastqBatch(iterator, os.Stdout, options...) } @@ -167,5 +185,7 @@ func WriteFastqBatchToFile(iterator obiseq.IBioSequenceBatch, return obiseq.NilIBioSequenceBatch, err } + options = append(options, OptionCloseFile()) + return WriteFastqBatch(iterator, file, options...) } diff --git a/pkg/obiformats/options.go b/pkg/obiformats/options.go index 089d902..41359c7 100644 --- a/pkg/obiformats/options.go +++ b/pkg/obiformats/options.go @@ -1,6 +1,8 @@ package obiformats -import "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +import ( + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +) type __options__ struct { fastseq_header_parser obiseq.SeqAnnotator @@ -10,6 +12,7 @@ type __options__ struct { batch_size int quality_shift int parallel_workers int + closefile bool } type Options struct { @@ -27,6 +30,7 @@ func MakeOptions(setters []WithOption) Options { quality_shift: 33, parallel_workers: 4, batch_size: 5000, + closefile: false, } opt := Options{&o} @@ -66,6 +70,10 @@ func (opt Options) ProgressBar() bool { return opt.pointer.with_progress_bar } +func (opt Options) CloseFile() bool { + return opt.pointer.closefile +} + func OptionsBufferSize(size int) WithOption { f := WithOption(func(opt Options) { opt.pointer.buffer_size = size @@ -74,6 +82,22 @@ func OptionsBufferSize(size int) WithOption { return f } +func OptionCloseFile() WithOption { + f := WithOption(func(opt Options) { + opt.pointer.closefile = true + }) + + return f +} + +func OptionDontCloseFile() WithOption { + f := WithOption(func(opt Options) { + opt.pointer.closefile = false + }) + + return f +} + // Allows to specify the ascii code corresponding to // a quality of 0 in fastq encoded quality scores. func OptionsQualityShift(shift int) WithOption { diff --git a/pkg/obiformats/universal_read.go b/pkg/obiformats/universal_read.go index c4bdaa2..e10911b 100644 --- a/pkg/obiformats/universal_read.go +++ b/pkg/obiformats/universal_read.go @@ -64,6 +64,12 @@ func ReadSequencesBatchFromFile(filename string, options ...WithOption) (obiseq. tag, _ := breader.Peek(30) + if len(tag) < 30 { + newIter := obiseq.MakeIBioSequenceBatch() + close(newIter.Channel()) + return newIter, nil + } + filetype := GuessSeqFileType(string(tag)) log.Printf("File guessed format : %s (tag: %s)", filetype, (strings.Split(string(tag), "\n"))[0]) diff --git a/pkg/obiformats/universal_write.go b/pkg/obiformats/universal_write.go index 63f7630..109bfaa 100644 --- a/pkg/obiformats/universal_write.go +++ b/pkg/obiformats/universal_write.go @@ -89,6 +89,7 @@ func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch, func WriteSequencesBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) { + options = append(options, OptionDontCloseFile()) return WriteSequenceBatch(iterator, os.Stdout, options...) } @@ -103,5 +104,6 @@ func WriteSequencesBatchToFile(iterator obiseq.IBioSequenceBatch, return obiseq.NilIBioSequenceBatch, err } + options = append(options, OptionCloseFile()) return WriteSequenceBatch(iterator, file, options...) } diff --git a/pkg/obiseq/batchiterator.go b/pkg/obiseq/batchiterator.go index 42e5cfa..7914adc 100644 --- a/pkg/obiseq/batchiterator.go +++ b/pkg/obiseq/batchiterator.go @@ -156,15 +156,15 @@ func (iterator IBioSequenceBatch) Split() IBioSequenceBatch { } func (iterator IBioSequenceBatch) Next() bool { - if iterator.pointer.finished.IsSet() { - return false - } - if iterator.pointer.pushBack.IsSet() { iterator.pointer.pushBack.UnSet() return true } + if iterator.pointer.finished.IsSet() { + return false + } + next, ok := (<-iterator.pointer.channel) if ok { diff --git a/pkg/obiseq/class.go b/pkg/obiseq/class.go index c79b99a..f8414fa 100644 --- a/pkg/obiseq/class.go +++ b/pkg/obiseq/class.go @@ -25,7 +25,7 @@ func AnnotationClassifier(key string) SequenceClassifier { return "" } - return SequenceClassifier(f) + return f } var SampleClassifier = AnnotationClassifier("sample") @@ -39,7 +39,7 @@ func PredicateClassifier(predicate SequencePredicate) SequenceClassifier { } } - return SequenceClassifier(f) + return f } // Builds a classifier function based on CRC32 of the sequence @@ -50,7 +50,7 @@ func HashClassifier(size int) SequenceClassifier { return strconv.Itoa(int(h)) } - return SequenceClassifier(f) + return f } func RotateClassifier(size int) SequenceClassifier { @@ -61,5 +61,5 @@ func RotateClassifier(size int) SequenceClassifier { return strconv.Itoa(int(h)) } - return SequenceClassifier(f) + return f } diff --git a/pkg/obiseq/distribute.go b/pkg/obiseq/distribute.go index 81063a5..5f0c26b 100644 --- a/pkg/obiseq/distribute.go +++ b/pkg/obiseq/distribute.go @@ -62,7 +62,7 @@ func (iterator IBioSequenceBatch) Distribute(class SequenceClassifier, sizes ... for iterator.Next() { seqs := iterator.Get() - for _, s := range seqs.slice { + for _, s := range seqs.Slice() { key := class(s) slice, ok := slices[key] @@ -73,13 +73,14 @@ func (iterator IBioSequenceBatch) Distribute(class SequenceClassifier, sizes ... orders[key] = 0 lock.Lock() - outputs[key] = MakeIBioSequenceBatch(batchsize, buffsize) + outputs[key] = MakeIBioSequenceBatch(buffsize) lock.Unlock() news <- key } *slice = append(*slice, s) + if len(*slice) == batchsize { outputs[key].Channel() <- MakeBioSequenceBatch(orders[key], *slice...) orders[key]++ diff --git a/pkg/obiseq/merge.go b/pkg/obiseq/merge.go index d7cabcf..27f47b3 100644 --- a/pkg/obiseq/merge.go +++ b/pkg/obiseq/merge.go @@ -3,6 +3,7 @@ package obiseq import ( "fmt" "log" + "strings" ) type StatsOnValues map[string]int @@ -98,9 +99,13 @@ func (sequence BioSequence) Merge(tomerge BioSequence, inplace bool, keys ...str sequence = sequence.Copy() } + if sequence.HasQualities() { + sequence.SetQualities(nil) + } + annotation := sequence.Annotations() - annotation["count"] = tomerge.Count() + sequence.Count() + count := tomerge.Count() + sequence.Count() for _, key := range keys { if tomerge.HasStatsOn(key) { @@ -112,5 +117,113 @@ func (sequence BioSequence) Merge(tomerge BioSequence, inplace bool, keys ...str } } + if tomerge.HasAnnotation() { + ma := tomerge.Annotations() + for k, va := range annotation { + if !strings.HasPrefix(k, "merged_") { + vm, ok := ma[k] + if !ok || vm != va { + delete(annotation, k) + } + } + } + } else { + for k := range annotation { + if !strings.HasPrefix(k, "merged_") { + delete(annotation, k) + } + } + } + + annotation["count"] = count + return sequence } + +func (sequences BioSequenceSlice) Unique(statsOn []string, keys ...string) BioSequenceSlice { + uniq := make(map[string]*BioSequenceSlice, len(sequences)) + nVariant := 0 + + for _, seq := range sequences { + + sstring := seq.String() + pgroup, ok := uniq[sstring] + + if !ok { + group := make(BioSequenceSlice, 0, 10) + pgroup = &group + uniq[sstring] = pgroup + } + + ok = false + i := 0 + var s BioSequence + + for i, s = range *pgroup { + ok = true + switch { + case seq.HasAnnotation() && s.HasAnnotation(): + for _, k := range keys { + seqV, seqOk := seq.Annotations()[k] + sV, sOk := s.Annotations()[k] + + ok = ok && ((!seqOk && !sOk) || ((seqOk && sOk) && (seqV == sV))) + + if !ok { + break + } + } + case seq.HasAnnotation() && !s.HasAnnotation(): + for _, k := range keys { + _, seqOk := seq.Annotations()[k] + ok = ok && !seqOk + if !ok { + break + } + } + case !seq.HasAnnotation() && s.HasAnnotation(): + for _, k := range keys { + _, sOk := s.Annotations()[k] + ok = ok && !sOk + if !ok { + break + } + } + default: + ok = true + } + + if ok { + break + } + } + + if ok { + (*pgroup)[i] = s.Merge(seq, true, statsOn...) + } else { + seq.SetQualities(nil) + if seq.Count() == 1 { + seq.Annotations()["count"] = 1 + } + *pgroup = append(*pgroup, seq) + nVariant++ + } + + } + + output := make(BioSequenceSlice, 0, nVariant) + for _, seqs := range uniq { + output = append(output, *seqs...) + } + + return output +} + +func UniqueSliceWorker(statsOn []string, keys ...string) SeqSliceWorker { + + worker := func(sequences BioSequenceSlice) BioSequenceSlice { + return sequences.Unique(statsOn, keys...) + } + + return worker +} diff --git a/pkg/obitools/obiuniq/options.go b/pkg/obitools/obiuniq/options.go new file mode 100644 index 0000000..4a953d9 --- /dev/null +++ b/pkg/obitools/obiuniq/options.go @@ -0,0 +1,36 @@ +package obiuniq + +import ( + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" + "github.com/DavidGamba/go-getoptions" +) + +var _StatsOn = make([]string, 0, 10) +var _Keys = make([]string, 0, 10) + +func UniqueOptionSet(options *getoptions.GetOpt) { + options.StringSliceVar(&_StatsOn, "merge", + 1, 1000, + options.Alias("m"), + options.Description("Adds a merged attribute containing the list of sequence record ids merged within this group.")) + options.StringSliceVar(&_Keys, "category-attribute", + 1, 1000, + options.Alias("c"), + options.Description("Adds one attribute to the list of attributes used to define sequence groups (this option can be used several times).")) + +} + +// OptionSet adds to the basic option set every options declared for +// the obipcr command +func OptionSet(options *getoptions.GetOpt) { + obiconvert.OptionSet(options) + UniqueOptionSet(options) +} + +func CLIStatsOn() []string { + return _StatsOn +} + +func CLIKeys() []string { + return _Keys +} diff --git a/pkg/obitools/obiuniq/unique.go b/pkg/obitools/obiuniq/unique.go new file mode 100644 index 0000000..3cfb7be --- /dev/null +++ b/pkg/obitools/obiuniq/unique.go @@ -0,0 +1,28 @@ +package obiuniq + +import ( + "log" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obichunk" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +) + +func Unique(sequences obiseq.IBioSequenceBatch) obiseq.IBioSequenceBatch { + + newIter, err := obichunk.ISequenceChunk(sequences, 100, 2) + + if err != nil { + log.Fatalf("error in spliting the dataset : %v", err) + } + + statsOn := CLIStatsOn() + keys := CLIKeys() + parallelWorkers := obioptions.CLIParallelWorkers() + buffSize := obioptions.CLIBufferSize() + + newIter = newIter.MakeISliceWorker(obiseq.UniqueSliceWorker(statsOn, keys...), + parallelWorkers, buffSize) + + return newIter +}