From 37ce3536e18ea352c08ee1412330d8efbc824468 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Fri, 18 Feb 2022 10:01:23 +0100 Subject: [PATCH] New more functional version of obiuniq. But it has a memory leak --- pkg/obitools/obiuniq/options.go | 15 ++++++++++-- pkg/obitools/obiuniq/unique.go | 42 +++++++++++++++++++++------------ 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/pkg/obitools/obiuniq/options.go b/pkg/obitools/obiuniq/options.go index 2820b65..b06f4d8 100644 --- a/pkg/obitools/obiuniq/options.go +++ b/pkg/obitools/obiuniq/options.go @@ -9,18 +9,25 @@ var _StatsOn = make([]string, 0, 10) var _Keys = make([]string, 0, 10) var _OnDisk = false var _chunks = 100 +var _NAValue = "NA" func UniqueOptionSet(options *getoptions.GetOpt) { options.StringSliceVar(&_StatsOn, "merge", - 1, 1000, + 1, 1, options.Alias("m"), options.Description("Adds a merged attribute containing the list of sequence record ids merged within this group.")) + options.StringSliceVar(&_Keys, "category-attribute", - 1, 1000, + 1, 1, options.Alias("c"), options.Description("Adds one attribute to the list of attributes used to define sequence groups (this option can be used several times).")) + + options.StringVar(&_NAValue, "na-value", _NAValue, + options.Description("Value used when the classifier tag is not defined for a sequence.")) + options.BoolVar(&_OnDisk, "on-disk", true, options.Description("Allows for using a disk cache during the dereplication process. 
")) + options.IntVar(&_chunks, "chunk-count", _chunks, options.Description("In how many chunk the dataset is pre-devided for speeding up the process.")) @@ -52,3 +59,7 @@ func CLINumberOfChunks() int { return _chunks } + +func CLINAValue() string { + return _NAValue +} diff --git a/pkg/obitools/obiuniq/unique.go b/pkg/obitools/obiuniq/unique.go index 7bbcaf2..20aa0cb 100644 --- a/pkg/obitools/obiuniq/unique.go +++ b/pkg/obitools/obiuniq/unique.go @@ -10,29 +10,41 @@ import ( func Unique(sequences obiseq.IBioSequenceBatch) obiseq.IBioSequenceBatch { - classifier := obiseq.HashClassifier(CLINumberOfChunks()) - var newIter obiseq.IBioSequenceBatch - var err error + options := make([]obichunk.WithOption, 0, 30) + + options = append(options, + obichunk.OptionBatchCount(CLINumberOfChunks()), + ) if CLIUniqueInMemory() { log.Printf("Running dereplication in memory on %d chunks", CLINumberOfChunks()) - newIter, err = obichunk.ISequenceChunk(sequences, classifier, 2) + options = append(options, obichunk.OptionSortOnMemory()) } else { log.Printf("Running dereplication on disk with %d chunks", CLINumberOfChunks()) - newIter, err = obichunk.ISequenceChunkOnDisk(sequences, classifier, 2) + options = append(options, obichunk.OptionSortOnDisk()) } + options = append(options, + obichunk.OptionStatOn(CLIStatsOn()...)) + + options = append(options, + obichunk.OptionSubCategory(CLIKeys()...)) + + options = append(options, + obichunk.OptionsParallelWorkers( + obioptions.CLIParallelWorkers()), + obichunk.OptionsBufferSize( + obioptions.CLIBufferSize()), + obichunk.OptionsBatchSize( + obioptions.CLIBatchSize()), + obichunk.OptionNAValue(CLINAValue()), + ) + + iUnique, err := obichunk.IUniqueSequence(sequences, options...) 
+ if err != nil { - log.Fatalf("error in spliting the dataset : %v", err) + log.Fatal(err) } - statsOn := CLIStatsOn() - keys := CLIKeys() - parallelWorkers := obioptions.CLIParallelWorkers() - buffSize := obioptions.CLIBufferSize() - - newIter = newIter.MakeISliceWorker(obiseq.UniqueSliceWorker(statsOn, keys...), - parallelWorkers, buffSize) - - return newIter + return iUnique }