From ce226acac0ad9765806169c114dad951244461b2 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 15 Feb 2022 10:49:01 +0100 Subject: [PATCH] Few debug on obidistribute and some progresses on obiunique, but -c and -m options are still not working --- pkg/goutils/goutils.go | 3 +- pkg/obichunk/chunks.go | 83 +++++++++++++++++++++++++++++++-- pkg/obitools/obiuniq/options.go | 18 +++++++ pkg/obitools/obiuniq/unique.go | 14 +++++- 4 files changed, 110 insertions(+), 8 deletions(-) diff --git a/pkg/goutils/goutils.go b/pkg/goutils/goutils.go index 468bdae..cadbed3 100644 --- a/pkg/goutils/goutils.go +++ b/pkg/goutils/goutils.go @@ -54,11 +54,12 @@ func InterfaceToInt(i interface{}) (val int, err error) { return } -func IsAnInt(i interface{}) bool { +func CastableToInt(i interface{}) bool { switch i.(type) { case int, int8, int16, int32, int64, + float32, float64, uint8, uint16, uint32, uint64: return true default: diff --git a/pkg/obichunk/chunks.go b/pkg/obichunk/chunks.go index 7f0d2ab..7e2bef5 100644 --- a/pkg/obichunk/chunks.go +++ b/pkg/obichunk/chunks.go @@ -6,6 +6,7 @@ import ( "log" "os" "path/filepath" + "sync" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" @@ -33,7 +34,9 @@ func find(root, ext string) []string { return a } -func ISequenceChunk(iterator obiseq.IBioSequenceBatch, size int, sizes ...int) (obiseq.IBioSequenceBatch, error) { +func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch, + classifier obiseq.SequenceClassifier, + sizes ...int) (obiseq.IBioSequenceBatch, error) { dir, err := tempDir() if err != nil { return obiseq.NilIBioSequenceBatch, err @@ -50,15 +53,18 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, size int, sizes ...int) ( newIter.Add(1) go func() { + defer func() { + os.RemoveAll(dir) + log.Println("Clear the cache directory") + }() + newIter.Wait() close(newIter.Channel()) - log.Println("====>> clear diectory") - os.RemoveAll(dir) }() go func() { obiformats.WriterDispatcher(dir+"/chunk_%s.fastx", - iterator.Distribute(obiseq.HashClassifier(size)), + iterator.Distribute(classifier), obiformats.WriteSequencesBatchToFile, ) @@ -71,7 +77,7 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, size int, sizes ...int) ( panic(err) } - chunck := make(obiseq.BioSequenceSlice, 0, 3*size) + chunck := make(obiseq.BioSequenceSlice, 0, 1000) for iseq.Next() { b := iseq.Get() @@ -89,3 +95,70 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, size int, sizes ...int) ( return newIter, err } + +func ISequenceChunk(iterator obiseq.IBioSequenceBatch, + classifier obiseq.SequenceClassifier, + sizes ...int) (obiseq.IBioSequenceBatch, error) { + + bufferSize := iterator.BufferSize() + + if len(sizes) > 0 { + bufferSize = sizes[0] + } + + newIter := obiseq.MakeIBioSequenceBatch(bufferSize) + + newIter.Add(1) + + go func() { + newIter.Wait() + close(newIter.Channel()) + }() + + go func() { + lock := sync.Mutex{} + + dispatcher := iterator.Distribute(classifier) + + jobDone := sync.WaitGroup{} + chunks := make(map[string]*obiseq.BioSequenceSlice, 100) + + for newflux := range dispatcher.News() { + jobDone.Add(1) + go func(newflux string) { + data, err := dispatcher.Outputs(newflux) + + if err != nil { + log.Fatalf("Cannot retreive the new chanel : %v", err) + } + + chunk := make(obiseq.BioSequenceSlice, 0, 1000) + + for data.Next() { + b := data.Get() + chunk = append(chunk, b.Slice()...) + } + + lock.Lock() + chunks[newflux] = &chunk + lock.Unlock() + jobDone.Done() + }(newflux) + } + + jobDone.Wait() + order := 0 + + for _, chunck := range chunks { + + if len(*chunck) > 0 { + newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, *chunck...) + order++ + } + + } + newIter.Done() + }() + + return newIter, nil +} diff --git a/pkg/obitools/obiuniq/options.go b/pkg/obitools/obiuniq/options.go index 4a953d9..2820b65 100644 --- a/pkg/obitools/obiuniq/options.go +++ b/pkg/obitools/obiuniq/options.go @@ -7,6 +7,8 @@ import ( var _StatsOn = make([]string, 0, 10) var _Keys = make([]string, 0, 10) +var _OnDisk = false +var _chunks = 100 func UniqueOptionSet(options *getoptions.GetOpt) { options.StringSliceVar(&_StatsOn, "merge", @@ -17,6 +19,10 @@ func UniqueOptionSet(options *getoptions.GetOpt) { 1, 1000, options.Alias("c"), options.Description("Adds one attribute to the list of attributes used to define sequence groups (this option can be used several times).")) + options.BoolVar(&_OnDisk, "on-disk", true, + options.Description("Allows for using a disk cache during the dereplication process. ")) + options.IntVar(&_chunks, "chunk-count", _chunks, + options.Description("In how many chunk the dataset is pre-devided for speeding up the process.")) } @@ -34,3 +40,15 @@ func CLIStatsOn() []string { func CLIKeys() []string { return _Keys } + +func CLIUniqueInMemory() bool { + return _OnDisk +} + +func CLINumberOfChunks() int { + if _chunks <= 1 { + return 1 + } + + return _chunks +} diff --git a/pkg/obitools/obiuniq/unique.go b/pkg/obitools/obiuniq/unique.go index 3cfb7be..7bbcaf2 100644 --- a/pkg/obitools/obiuniq/unique.go +++ b/pkg/obitools/obiuniq/unique.go @@ -10,7 +10,17 @@ import ( func Unique(sequences obiseq.IBioSequenceBatch) obiseq.IBioSequenceBatch { - newIter, err := obichunk.ISequenceChunk(sequences, 100, 2) + classifier := obiseq.HashClassifier(CLINumberOfChunks()) + var newIter obiseq.IBioSequenceBatch + var err error + + if CLIUniqueInMemory() { + log.Printf("Running dereplication in memory on %d chunks", CLINumberOfChunks()) + newIter, err = obichunk.ISequenceChunk(sequences, classifier, 2) + } else { + log.Printf("Running dereplication on disk with %d chunks", CLINumberOfChunks()) + newIter, err = obichunk.ISequenceChunkOnDisk(sequences, classifier, 2) + } if err != nil { log.Fatalf("error in spliting the dataset : %v", err) @@ -23,6 +33,6 @@ func Unique(sequences obiseq.IBioSequenceBatch) obiseq.IBioSequenceBatch { newIter = newIter.MakeISliceWorker(obiseq.UniqueSliceWorker(statsOn, keys...), parallelWorkers, buffSize) - + return newIter }