Some debugging of obidistribute and some progress on obiunique, but the -c and -m options are still not working

This commit is contained in:
2022-02-15 10:49:01 +01:00
parent 3586ecc483
commit ce226acac0
4 changed files with 110 additions and 8 deletions

View File

@ -54,11 +54,12 @@ func InterfaceToInt(i interface{}) (val int, err error) {
return return
} }
func IsAnInt(i interface{}) bool { func CastableToInt(i interface{}) bool {
switch i.(type) { switch i.(type) {
case int, case int,
int8, int16, int32, int64, int8, int16, int32, int64,
float32, float64,
uint8, uint16, uint32, uint64: uint8, uint16, uint32, uint64:
return true return true
default: default:

View File

@ -6,6 +6,7 @@ import (
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
@ -33,7 +34,9 @@ func find(root, ext string) []string {
return a return a
} }
func ISequenceChunk(iterator obiseq.IBioSequenceBatch, size int, sizes ...int) (obiseq.IBioSequenceBatch, error) { func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch,
classifier obiseq.SequenceClassifier,
sizes ...int) (obiseq.IBioSequenceBatch, error) {
dir, err := tempDir() dir, err := tempDir()
if err != nil { if err != nil {
return obiseq.NilIBioSequenceBatch, err return obiseq.NilIBioSequenceBatch, err
@ -50,15 +53,18 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, size int, sizes ...int) (
newIter.Add(1) newIter.Add(1)
go func() { go func() {
defer func() {
os.RemoveAll(dir)
log.Println("Clear the cache directory")
}()
newIter.Wait() newIter.Wait()
close(newIter.Channel()) close(newIter.Channel())
log.Println("====>> clear diectory")
os.RemoveAll(dir)
}() }()
go func() { go func() {
obiformats.WriterDispatcher(dir+"/chunk_%s.fastx", obiformats.WriterDispatcher(dir+"/chunk_%s.fastx",
iterator.Distribute(obiseq.HashClassifier(size)), iterator.Distribute(classifier),
obiformats.WriteSequencesBatchToFile, obiformats.WriteSequencesBatchToFile,
) )
@ -71,7 +77,7 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, size int, sizes ...int) (
panic(err) panic(err)
} }
chunck := make(obiseq.BioSequenceSlice, 0, 3*size) chunck := make(obiseq.BioSequenceSlice, 0, 1000)
for iseq.Next() { for iseq.Next() {
b := iseq.Get() b := iseq.Get()
@ -89,3 +95,70 @@ func ISequenceChunk(iterator obiseq.IBioSequenceBatch, size int, sizes ...int) (
return newIter, err return newIter, err
} }
// ISequenceChunk regroups the sequences of iterator into in-memory chunks.
//
// The input is dispatched with iterator.Distribute(classifier). Every flux
// name announced on the dispatcher's News() channel is drained concurrently
// into its own sequence slice. Once all fluxes have been drained, each
// non-empty slice is sent as a single batch on the returned iterator.
// Batches are numbered consecutively from 0 in map iteration order, so the
// mapping between a flux and its batch number is not fixed.
//
// The buffer size of the returned iterator is sizes[0] when provided,
// otherwise iterator.BufferSize(). The returned error is always nil.
func ISequenceChunk(iterator obiseq.IBioSequenceBatch,
	classifier obiseq.SequenceClassifier,
	sizes ...int) (obiseq.IBioSequenceBatch, error) {

	capacity := iterator.BufferSize()
	if len(sizes) > 0 {
		capacity = sizes[0]
	}

	output := obiseq.MakeIBioSequenceBatch(capacity)
	output.Add(1)

	// Closes the output channel once the producer below has called Done.
	go func() {
		output.Wait()
		close(output.Channel())
	}()

	go func() {
		var (
			guard   sync.Mutex
			pending sync.WaitGroup
		)

		dispatcher := iterator.Distribute(classifier)
		collected := make(map[string]obiseq.BioSequenceSlice, 100)

		// collect drains one flux entirely and stores its sequences
		// under the flux name.
		collect := func(name string) {
			source, err := dispatcher.Outputs(name)
			if err != nil {
				log.Fatalf("Cannot retreive the new chanel : %v", err)
			}

			sequences := make(obiseq.BioSequenceSlice, 0, 1000)
			for source.Next() {
				batch := source.Get()
				sequences = append(sequences, batch.Slice()...)
			}

			// The map is shared between all collectors.
			guard.Lock()
			collected[name] = sequences
			guard.Unlock()

			pending.Done()
		}

		for name := range dispatcher.News() {
			pending.Add(1)
			go collect(name)
		}

		pending.Wait()

		rank := 0
		for _, sequences := range collected {
			if len(sequences) == 0 {
				continue
			}
			output.Channel() <- obiseq.MakeBioSequenceBatch(rank, sequences...)
			rank++
		}

		output.Done()
	}()

	return output, nil
}

View File

@ -7,6 +7,8 @@ import (
var _StatsOn = make([]string, 0, 10) var _StatsOn = make([]string, 0, 10)
var _Keys = make([]string, 0, 10) var _Keys = make([]string, 0, 10)
var _OnDisk = false
var _chunks = 100
func UniqueOptionSet(options *getoptions.GetOpt) { func UniqueOptionSet(options *getoptions.GetOpt) {
options.StringSliceVar(&_StatsOn, "merge", options.StringSliceVar(&_StatsOn, "merge",
@ -17,6 +19,10 @@ func UniqueOptionSet(options *getoptions.GetOpt) {
1, 1000, 1, 1000,
options.Alias("c"), options.Alias("c"),
options.Description("Adds one attribute to the list of attributes used to define sequence groups (this option can be used several times).")) options.Description("Adds one attribute to the list of attributes used to define sequence groups (this option can be used several times)."))
options.BoolVar(&_OnDisk, "on-disk", true,
options.Description("Allows for using a disk cache during the dereplication process. "))
options.IntVar(&_chunks, "chunk-count", _chunks,
options.Description("In how many chunk the dataset is pre-devided for speeding up the process."))
} }
@ -34,3 +40,15 @@ func CLIStatsOn() []string {
func CLIKeys() []string { func CLIKeys() []string {
return _Keys return _Keys
} }
// CLIUniqueInMemory returns the current value of the package-level _OnDisk
// variable, unchanged.
//
// NOTE(review): the function name says "in memory" while the value comes
// from the variable bound to the "on-disk" option. Presumably this relies on
// the default value given when the option is registered — confirm the
// intended polarity against the option registration and the callers before
// changing either side.
func CLIUniqueInMemory() bool {
	flag := _OnDisk
	return flag
}
// CLINumberOfChunks returns the value of the package-level _chunks variable,
// clamped to a minimum of 1: any value of 1 or less yields 1.
func CLINumberOfChunks() int {
	if _chunks > 1 {
		return _chunks
	}
	return 1
}

View File

@ -10,7 +10,17 @@ import (
func Unique(sequences obiseq.IBioSequenceBatch) obiseq.IBioSequenceBatch { func Unique(sequences obiseq.IBioSequenceBatch) obiseq.IBioSequenceBatch {
newIter, err := obichunk.ISequenceChunk(sequences, 100, 2) classifier := obiseq.HashClassifier(CLINumberOfChunks())
var newIter obiseq.IBioSequenceBatch
var err error
if CLIUniqueInMemory() {
log.Printf("Running dereplication in memory on %d chunks", CLINumberOfChunks())
newIter, err = obichunk.ISequenceChunk(sequences, classifier, 2)
} else {
log.Printf("Running dereplication on disk with %d chunks", CLINumberOfChunks())
newIter, err = obichunk.ISequenceChunkOnDisk(sequences, classifier, 2)
}
if err != nil { if err != nil {
log.Fatalf("error in spliting the dataset : %v", err) log.Fatalf("error in spliting the dataset : %v", err)
@ -23,6 +33,6 @@ func Unique(sequences obiseq.IBioSequenceBatch) obiseq.IBioSequenceBatch {
newIter = newIter.MakeISliceWorker(obiseq.UniqueSliceWorker(statsOn, keys...), newIter = newIter.MakeISliceWorker(obiseq.UniqueSliceWorker(statsOn, keys...),
parallelWorkers, buffSize) parallelWorkers, buffSize)
return newIter return newIter
} }