From eb32620bb39966cb1ebf2c75584d45f03b245180 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 14 Feb 2022 00:01:01 +0100 Subject: [PATCH] Adds a first version of a new obidistribute command --- cmd/obitools/obidistribute/main.go | 19 ++++ go.mod | 1 + go.sum | 2 + pkg/goutils/goutils.go | 12 +++ pkg/obiformats/dispatcher.go | 48 ++++++++++ pkg/obiformats/fastseq_write_fastq.go | 2 +- pkg/obiseq/batchiterator.go | 114 +++++++++++++++------- pkg/obiseq/class.go | 65 +++++++++++++ pkg/obiseq/distribute.go | 107 +++++++++++++++++++++ pkg/obiseq/merge.go | 116 +++++++++++++++++++++++ pkg/obitools/obidistribute/distribute.go | 57 +++++++++++ pkg/obitools/obidistribute/options.go | 58 ++++++++++++ pkg/obitools/obipairing/options.go | 2 + 13 files changed, 567 insertions(+), 36 deletions(-) create mode 100644 cmd/obitools/obidistribute/main.go create mode 100644 pkg/obiformats/dispatcher.go create mode 100644 pkg/obiseq/class.go create mode 100644 pkg/obiseq/distribute.go create mode 100644 pkg/obiseq/merge.go create mode 100644 pkg/obitools/obidistribute/distribute.go create mode 100644 pkg/obitools/obidistribute/options.go diff --git a/cmd/obitools/obidistribute/main.go b/cmd/obitools/obidistribute/main.go new file mode 100644 index 0000000..7f1b094 --- /dev/null +++ b/cmd/obitools/obidistribute/main.go @@ -0,0 +1,19 @@ +package main + +import ( + "os" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obidistribute" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" +) + +func main() { + optionParser := obioptions.GenerateOptionParser(obidistribute.OptionSet) + + _, args, _ := optionParser(os.Args) + + fs, _ := obiconvert.ReadBioSequencesBatch(args...) + obidistribute.DistributeSequence(fs) +} diff --git a/go.mod b/go.mod index 24e0aa7..a833265 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/DavidGamba/go-getoptions v0.25.3 github.com/goccy/go-json v0.9.4 github.com/schollz/progressbar/v3 v3.8.6 + github.com/tevino/abool/v2 v2.0.1 ) require ( diff --git a/go.sum b/go.sum index e318311..af63a02 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/schollz/progressbar/v3 v3.8.6/go.mod h1:W5IEwbJecncFGBvuEh4A7HT1nZZ6W github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tevino/abool/v2 v2.0.1 h1:OF7FC5V5z3yAWyixbc32ecEzrgAJCsPkVOsPM2qoZPI= +github.com/tevino/abool/v2 v2.0.1/go.mod h1:+Lmlqk6bHDWHqN1cbxqhwEAwMPXgc8I1SDEamtseuXY= golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE= golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= diff --git a/pkg/goutils/goutils.go b/pkg/goutils/goutils.go index c2309ea..468bdae 100644 --- a/pkg/goutils/goutils.go +++ b/pkg/goutils/goutils.go @@ -54,6 +54,18 @@ func InterfaceToInt(i interface{}) (val int, err error) { return } +func IsAnInt(i interface{}) bool { + + switch i.(type) { + case int, + int8, int16, int32, int64, + uint8, uint16, uint32, uint64: + return true + default: + return false + } +} + // CopyMap makes a deep copy of a map[string]interface{}. func CopyMap(dest, src map[string]interface{}) { buf := new(bytes.Buffer) diff --git a/pkg/obiformats/dispatcher.go b/pkg/obiformats/dispatcher.go new file mode 100644 index 0000000..a47d4ee --- /dev/null +++ b/pkg/obiformats/dispatcher.go @@ -0,0 +1,48 @@ +package obiformats + +import ( + "fmt" + "log" + "sync" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +) + +type SequenceBatchWriterToFile func(iterator obiseq.IBioSequenceBatch, + filename string, + options ...WithOption) (obiseq.IBioSequenceBatch, error) + +func WriterDispatcher(prototypename string, + dispatcher obiseq.IDistribute, + formater SequenceBatchWriterToFile, + options ...WithOption) { + + jobDone := sync.WaitGroup{} + jobDone.Add(1) + + go func() { + n := 0 + for newflux := range dispatcher.News() { + data, _ := dispatcher.Outputs(newflux) + out, err := formater(data, + fmt.Sprintf(prototypename, newflux), + options...) + if err != nil { + log.Fatalf("cannot open the output file for key %s", newflux) + } + + n++ + + if n > 1 { + jobDone.Add(1) + } + + go func() { + out.Recycle() + jobDone.Done() + }() + } + }() + + jobDone.Wait() +} diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index a28575a..620bc53 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -122,10 +122,10 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options } log.Println("Start of the fastq file writing") - go ff(iterator) for i := 0; i < nwriters-1; i++ { go ff(iterator.Split()) } + go ff(iterator) next_to_send := 0 received := make(map[int]FileChunck, 100) diff --git a/pkg/obiseq/batchiterator.go b/pkg/obiseq/batchiterator.go index 18b4c72..42e5cfa 100644 --- a/pkg/obiseq/batchiterator.go +++ b/pkg/obiseq/batchiterator.go @@ -1,8 +1,12 @@ package obiseq import ( + "fmt" "log" "sync" + "sync/atomic" + + "github.com/tevino/abool/v2" ) type BioSequenceBatch struct { @@ -36,40 +40,45 @@ func (batch BioSequenceBatch) IsNil() bool { // Structure implementing an iterator over bioseq.BioSequenceBatch // based on a channel. -type __ibiosequencebatch__ struct { - channel chan BioSequenceBatch - current BioSequenceBatch - pushBack bool - all_done *sync.WaitGroup - buffer_size int - finished bool - p_finished *bool +type _IBioSequenceBatch struct { + channel chan BioSequenceBatch + current BioSequenceBatch + pushBack *abool.AtomicBool + all_done *sync.WaitGroup + lock *sync.RWMutex + buffer_size int32 + batch_size int32 + sequence_format string + finished *abool.AtomicBool } type IBioSequenceBatch struct { - pointer *__ibiosequencebatch__ + pointer *_IBioSequenceBatch } var NilIBioSequenceBatch = IBioSequenceBatch{pointer: nil} func MakeIBioSequenceBatch(sizes ...int) IBioSequenceBatch { - buffsize := 1 + buffsize := int32(1) if len(sizes) > 0 { - buffsize = sizes[0] + buffsize = int32(sizes[0]) } - i := __ibiosequencebatch__{ - channel: make(chan BioSequenceBatch, buffsize), - current: NilBioSequenceBatch, - pushBack: false, - buffer_size: buffsize, - finished: false, - p_finished: nil, + i := _IBioSequenceBatch{ + channel: make(chan BioSequenceBatch, buffsize), + current: NilBioSequenceBatch, + pushBack: abool.New(), + buffer_size: buffsize, + batch_size: -1, + sequence_format: "", + finished: abool.New(), } - i.p_finished = &i.finished + waiting := sync.WaitGroup{} i.all_done = &waiting + lock := sync.RWMutex{} + i.lock = &lock ii := IBioSequenceBatch{&i} return ii } @@ -82,6 +91,22 @@ func (iterator IBioSequenceBatch) Done() { iterator.pointer.all_done.Done() } +func (iterator IBioSequenceBatch) Unlock() { + iterator.pointer.lock.Unlock() +} + +func (iterator IBioSequenceBatch) Lock() { + iterator.pointer.lock.Lock() +} + +func (iterator IBioSequenceBatch) RLock() { + iterator.pointer.lock.RLock() +} + +func (iterator IBioSequenceBatch) RUnlock() { + iterator.pointer.lock.RUnlock() +} + func (iterator IBioSequenceBatch) Wait() { iterator.pointer.all_done.Wait() } @@ -95,29 +120,48 @@ func (iterator IBioSequenceBatch) IsNil() bool { } func (iterator IBioSequenceBatch) BufferSize() int { - return iterator.pointer.buffer_size + return int(atomic.LoadInt32(&iterator.pointer.buffer_size)) +} + +func (iterator IBioSequenceBatch) BatchSize() int { + return int(atomic.LoadInt32(&iterator.pointer.batch_size)) +} + +func (iterator IBioSequenceBatch) SetBatchSize(size int) error { + if size >= 0 { + atomic.StoreInt32(&iterator.pointer.batch_size, int32(size)) + return nil + } + + return fmt.Errorf("size (%d) cannot be negative", size) } func (iterator IBioSequenceBatch) Split() IBioSequenceBatch { - i := __ibiosequencebatch__{ - channel: iterator.pointer.channel, - current: NilBioSequenceBatch, - pushBack: false, - all_done: iterator.pointer.all_done, - buffer_size: iterator.pointer.buffer_size, - finished: false, - p_finished: iterator.pointer.p_finished} + iterator.pointer.lock.RLock() + defer iterator.pointer.lock.RUnlock() + i := _IBioSequenceBatch{ + channel: iterator.pointer.channel, + current: NilBioSequenceBatch, + pushBack: abool.New(), + all_done: iterator.pointer.all_done, + buffer_size: iterator.pointer.buffer_size, + batch_size: iterator.pointer.batch_size, + sequence_format: iterator.pointer.sequence_format, + finished: iterator.pointer.finished} + lock := sync.RWMutex{} + i.lock = &lock + newIter := IBioSequenceBatch{&i} return newIter } func (iterator IBioSequenceBatch) Next() bool { - if *(iterator.pointer.p_finished) { + if iterator.pointer.finished.IsSet() { return false } - if iterator.pointer.pushBack { - iterator.pointer.pushBack = false + if iterator.pointer.pushBack.IsSet() { + iterator.pointer.pushBack.UnSet() return true } @@ -129,13 +173,13 @@ func (iterator IBioSequenceBatch) Next() bool { } iterator.pointer.current = NilBioSequenceBatch - *iterator.pointer.p_finished = true + iterator.pointer.finished.Set() return false } func (iterator IBioSequenceBatch) PushBack() { if !iterator.pointer.current.IsNil() { - iterator.pointer.pushBack = true + iterator.pointer.pushBack.Set() } } @@ -150,7 +194,7 @@ func (iterator IBioSequenceBatch) Get() BioSequenceBatch { // Finished returns 'true' value if no more data is available // from the iterator. func (iterator IBioSequenceBatch) Finished() bool { - return *iterator.pointer.p_finished + return iterator.pointer.finished.IsSet() } func (iterator IBioSequenceBatch) IBioSequence(sizes ...int) IBioSequence { @@ -378,7 +422,7 @@ func (iterator IBioSequenceBatch) DivideOn(predicate SequencePredicate, buffsize := iterator.BufferSize() if len(sizes) > 0 { - buffsize = sizes[1] + buffsize = sizes[0] } trueIter := MakeIBioSequenceBatch(buffsize) diff --git a/pkg/obiseq/class.go b/pkg/obiseq/class.go new file mode 100644 index 0000000..c79b99a --- /dev/null +++ b/pkg/obiseq/class.go @@ -0,0 +1,65 @@ +package obiseq + +import ( + "fmt" + "hash/crc32" + "strconv" +) + +type SequenceClassifier func(sequence BioSequence) string + +func AnnotationClassifier(key string) SequenceClassifier { + f := func(sequence BioSequence) string { + if sequence.HasAnnotation() { + value, ok := sequence.Annotations()[key] + + if ok { + switch value := value.(type) { + case string: + return value + default: + return fmt.Sprint(value) + } + } + } + return "" + } + + return SequenceClassifier(f) +} + +var SampleClassifier = AnnotationClassifier("sample") + +func PredicateClassifier(predicate SequencePredicate) SequenceClassifier { + f := func(sequence BioSequence) string { + if predicate(sequence) { + return "true" + } else { + return "false" + } + } + + return SequenceClassifier(f) +} + +// Builds a classifier function based on CRC32 of the sequence +// +func HashClassifier(size int) SequenceClassifier { + f := func(sequence BioSequence) string { + h := crc32.ChecksumIEEE(sequence.Sequence()) % uint32(size) + return strconv.Itoa(int(h)) + } + + return SequenceClassifier(f) +} + +func RotateClassifier(size int) SequenceClassifier { + n := 0 + f := func(sequence BioSequence) string { + h := n % size + n++ + return strconv.Itoa(int(h)) + } + + return SequenceClassifier(f) +} diff --git a/pkg/obiseq/distribute.go b/pkg/obiseq/distribute.go new file mode 100644 index 0000000..81063a5 --- /dev/null +++ b/pkg/obiseq/distribute.go @@ -0,0 +1,107 @@ +package obiseq + +import ( + "fmt" + "sync" +) + +type IDistribute struct { + outputs map[string]IBioSequenceBatch + news chan string + lock *sync.Mutex +} + +func (dist *IDistribute) Outputs(key string) (IBioSequenceBatch, error) { + dist.lock.Lock() + iter, ok := dist.outputs[key] + dist.lock.Unlock() + + if !ok { + return NilIBioSequenceBatch, fmt.Errorf("key %s unknown", key) + } + + return iter, nil +} + +func (dist *IDistribute) News() chan string { + return dist.news +} + +func (iterator IBioSequenceBatch) Distribute(class SequenceClassifier, sizes ...int) IDistribute { + batchsize := 5000 + buffsize := 2 + + outputs := make(map[string]IBioSequenceBatch, 100) + slices := make(map[string]*BioSequenceSlice, 100) + orders := make(map[string]int, 100) + news := make(chan string) + + if len(sizes) > 0 { + batchsize = sizes[0] + } + + if len(sizes) > 1 { + buffsize = sizes[1] + } + + jobDone := sync.WaitGroup{} + lock := sync.Mutex{} + + jobDone.Add(1) + + go func() { + jobDone.Wait() + close(news) + for _, i := range outputs { + close(i.Channel()) + } + }() + + go func() { + iterator = iterator.SortBatches() + + for iterator.Next() { + seqs := iterator.Get() + for _, s := range seqs.slice { + key := class(s) + slice, ok := slices[key] + + if !ok { + s := make(BioSequenceSlice, 0, batchsize) + slice = &s + slices[key] = slice + orders[key] = 0 + + lock.Lock() + outputs[key] = MakeIBioSequenceBatch(batchsize, buffsize) + lock.Unlock() + + news <- key + } + + *slice = append(*slice, s) + if len(*slice) == batchsize { + outputs[key].Channel() <- MakeBioSequenceBatch(orders[key], *slice...) + orders[key]++ + s := make(BioSequenceSlice, 0, batchsize) + slices[key] = &s + } + } + } + + for key, slice := range slices { + if len(*slice) > 0 { + outputs[key].Channel() <- MakeBioSequenceBatch(orders[key], *slice...) + } + } + + jobDone.Done() + + }() + + return IDistribute{ + outputs, + news, + &lock} + +} diff --git a/pkg/obiseq/merge.go b/pkg/obiseq/merge.go new file mode 100644 index 0000000..d7cabcf --- /dev/null +++ b/pkg/obiseq/merge.go @@ -0,0 +1,116 @@ +package obiseq + +import ( + "fmt" + "log" +) + +type StatsOnValues map[string]int + +func (sequence BioSequence) HasStatsOn(key string) bool { + if !sequence.HasAnnotation() { + return false + } + + mkey := "merged_" + key + annotations := sequence.Annotations() + _, ok := annotations[mkey] + + return ok +} + +func (sequence BioSequence) StatsOn(key string) StatsOnValues { + mkey := "merged_" + key + annotations := sequence.Annotations() + istat, ok := annotations[mkey] + + var stats StatsOnValues + var newstat bool + + if ok { + switch istat := istat.(type) { + case StatsOnValues: + stats = istat + newstat = false + default: + stats = make(StatsOnValues, 100) + annotations[mkey] = stats + newstat = true + } + } else { + stats = make(StatsOnValues, 100) + annotations[mkey] = stats + newstat = true + } + + if newstat && sequence.StatsPlusOne(key, sequence) { + delete(sequence.Annotations(), key) + } + + return stats +} + +func (sequence BioSequence) StatsPlusOne(key string, toAdd BioSequence) bool { + if toAdd.HasAnnotation() { + stats := sequence.StatsOn(key) + value, ok := toAdd.Annotations()[key] + + if ok { + var sval string + + switch value := value.(type) { + case string: + sval = value + case int, + uint8, uint16, uint32, uint64, + int8, int16, int32, int64, bool: + sval = fmt.Sprint(value) + default: + log.Fatalf("Trying to make stats on a none string, integer or boolean value (%v)", value) + } + old, ok := stats[sval] + if !ok { + old = 0 + } + stats[sval] = old + 1 + + return true + } + } + + return false +} + +func (stats StatsOnValues) Merge(toMerged StatsOnValues) StatsOnValues { + for k, val := range toMerged { + old, ok := stats[k] + if !ok { + old = 0 + } + stats[k] = old + val + } + + return stats +} + +func (sequence BioSequence) Merge(tomerge BioSequence, inplace bool, keys ...string) BioSequence { + if !inplace { + sequence = sequence.Copy() + } + + annotation := sequence.Annotations() + + annotation["count"] = tomerge.Count() + sequence.Count() + + for _, key := range keys { + if tomerge.HasStatsOn(key) { + smk := sequence.StatsOn(key) + mmk := tomerge.StatsOn(key) + smk.Merge(mmk) + } else { + sequence.StatsPlusOne(key, tomerge) + } + } + + return sequence +} diff --git a/pkg/obitools/obidistribute/distribute.go b/pkg/obitools/obidistribute/distribute.go new file mode 100644 index 0000000..731f8a0 --- /dev/null +++ b/pkg/obitools/obidistribute/distribute.go @@ -0,0 +1,57 @@ +package obidistribute + +import ( + "log" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" +) + +func DistributeSequence(sequences obiseq.IBioSequenceBatch) { + + opts := make([]obiformats.WithOption, 0, 10) + + switch obiconvert.OutputFastHeaderFormat() { + case "json": + log.Println("On output use JSON headers") + opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)) + case "obi": + log.Println("On output use OBI headers") + opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqOBIHeader)) + default: + log.Println("On output use JSON headers") + opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)) + } + + nworkers := obioptions.CLIParallelWorkers() / 4 + if nworkers < 2 { + nworkers = 2 + } + + opts = append(opts, obiformats.OptionsParallelWorkers(nworkers)) + opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize())) + opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize())) + + opts = append(opts, obiformats.OptionsQualityShift(obiconvert.OutputQualityShift())) + + var formater obiformats.SequenceBatchWriterToFile + + switch obiconvert.OutputFormat() { + case "fastq": + formater = obiformats.WriteFastqBatchToFile + case "fasta": + formater = obiformats.WriteFastaBatchToFile + default: + formater = obiformats.WriteSequencesBatchToFile + } + + dispatcher := sequences.Distribute(CLISequenceClassifier(), + obioptions.CLIBatchSize()) + + obiformats.WriterDispatcher(CLIFileNamePattern(), + dispatcher, formater, opts..., + ) + +} diff --git a/pkg/obitools/obidistribute/options.go b/pkg/obitools/obidistribute/options.go new file mode 100644 index 0000000..96100cd --- /dev/null +++ b/pkg/obitools/obidistribute/options.go @@ -0,0 +1,58 @@ +package obidistribute + +import ( + "fmt" + "log" + "strings" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" + "github.com/DavidGamba/go-getoptions" +) + +var _FilenamePattern = "" +var _SequenceClassifierTag = "" +var _BatchCount = 0 + +func DistributeOptionSet(options *getoptions.GetOpt) { + options.StringVar(&_FilenamePattern, "pattern", _FilenamePattern, + options.Alias("p"), + options.Required("You must provide at pattern for the file names "), + options.Description("The N first sequence records of the file are discarded from the analysis and not reported to the output file.")) + + options.StringVar(&_SequenceClassifierTag, "classifier", _SequenceClassifierTag, + options.Alias("c"), + options.Description("The N first sequence records of the file are discarded from the analysis and not reported to the output file.")) + + options.IntVar(&_BatchCount, "batch", 0, + options.Alias("n"), + options.Description("The N first sequence records of the file are discarded from the analysis and not reported to the output file.")) +} + +func OptionSet(options *getoptions.GetOpt) { + obiconvert.InputOptionSet(options) + obiconvert.OutputOptionSet(options) + DistributeOptionSet(options) +} + +func CLISequenceClassifier() obiseq.SequenceClassifier { + switch { + case _SequenceClassifierTag != "": + return obiseq.AnnotationClassifier(_SequenceClassifierTag) + case _BatchCount > 0: + return obiseq.RotateClassifier(_BatchCount) + + } + + log.Fatal("one of the options --classifier or --batch must be specified") + return nil +} + +func CLIFileNamePattern() string { + x := fmt.Sprintf(_FilenamePattern, "_xxx_") + if strings.Contains(x, "(string=_xxx_)") { + log.Panicf("patern %s is not correct : %s", _FilenamePattern, x) + } + + return _FilenamePattern +} diff --git a/pkg/obitools/obipairing/options.go b/pkg/obitools/obipairing/options.go index c16e4d4..3c4e28a 100644 --- a/pkg/obitools/obipairing/options.go +++ b/pkg/obitools/obipairing/options.go @@ -18,10 +18,12 @@ func PairingOptionSet(options *getoptions.GetOpt) { options.StringSliceVar(&_ForwardFiles, "forward-reads", 1, 1000, options.Alias("F"), + options.Required("You must provide at least one forward file"), options.Description("The file names containing the forward reads")) options.StringSliceVar(&_ReverseFiles, "reverse-reads", 1, 1000, options.Alias("R"), + options.Required("You must provide at least one reverse file"), options.Description("The file names containing the reverse reads")) options.IntVar(&_Delta, "delta", _Delta, options.Alias("D"),