Fix a big bug in subchunk processing

This commit is contained in:
2022-02-18 09:58:08 +01:00
parent ce226acac0
commit 2636882f9f
5 changed files with 444 additions and 90 deletions

View File

@ -0,0 +1,96 @@
package obichunk
import (
"io/fs"
"io/ioutil"
"log"
"os"
"path/filepath"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
func tempDir() (string, error) {
dir, err := ioutil.TempDir(os.TempDir(), "obiseq_chunks_")
if err != nil {
return "", err
}
return dir, nil
}
// find walks the directory tree rooted at root and returns the paths of
// every regular entry whose name ends with extension ext (e.g. ".fastx").
//
// An error encountered while walking aborts the walk; the original code
// discarded that error entirely, so a missing or unreadable chunk
// directory went undiagnosed.  It is now logged, and the (possibly
// partial) list collected so far is still returned.
func find(root, ext string) []string {
	var paths []string

	err := filepath.WalkDir(root, func(path string, entry fs.DirEntry, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		if filepath.Ext(entry.Name()) == ext {
			paths = append(paths, path)
		}
		return nil
	})

	if err != nil {
		log.Println("error while scanning", root, "for", ext, "files:", err)
	}

	return paths
}
// ISequenceChunkOnDisk redistributes the sequences produced by iterator
// into chunks, using a temporary directory on disk as the intermediate
// store.  Each classifier key gets its own "chunk_<key>.fastx" file; once
// the whole input has been written, every file is read back and re-emitted
// as a single batch on the returned iterator.
//
// sizes is optional: sizes[0], when present, overrides the channel buffer
// size taken from iterator.
//
// The temporary directory is deleted once the returned iterator has been
// fully consumed.
func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch,
classifier obiseq.BioSequenceClassifier,
sizes ...int) (obiseq.IBioSequenceBatch, error) {
dir, err := tempDir()
if err != nil {
return obiseq.NilIBioSequenceBatch, err
}
bufferSize := iterator.BufferSize()
if len(sizes) > 0 {
bufferSize = sizes[0]
}
newIter := obiseq.MakeIBioSequenceBatch(bufferSize)
// Exactly one Done() is expected: the file-reading goroutine below.
newIter.Add(1)
// Once the reader goroutine signals completion, close the output channel
// and remove the on-disk cache.
go func() {
defer func() {
os.RemoveAll(dir)
log.Println("Clear the cache directory")
}()
newIter.Wait()
close(newIter.Channel())
}()
// Write every incoming batch into one file per classifier key.  This call
// is synchronous: the function does not reach the reading phase before the
// whole input iterator has been dispatched to disk.
obiformats.WriterDispatcher(dir+"/chunk_%s.fastx",
iterator.Distribute(classifier),
obiformats.WriteSequencesBatchToFile,
)
fileNames := find(dir, ".fastx")
log.Println("batch count ", len(fileNames))
// Read the chunk files back; each file becomes exactly one output batch
// whose order is the file's index in fileNames.
go func() {
for order, file := range fileNames {
iseq, err := obiformats.ReadSequencesBatchFromFile(file)
if err != nil {
// A chunk file we just wrote that cannot be re-read is not
// recoverable at this point.
panic(err)
}
// Concatenate every batch of the file into a single slice.
chunck := make(obiseq.BioSequenceSlice, 0, 1000)
for iseq.Next() {
b := iseq.Get()
chunck = append(chunck, b.Slice()...)
}
// NOTE(review): an empty chunck is forwarded as an empty batch here;
// an earlier revision of this function skipped empty chunks — confirm
// that downstream consumers tolerate empty batches.
newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, chunck...)
}
newIter.Done()
}()
return newIter, err
}

View File

@ -1,103 +1,14 @@
package obichunk package obichunk
import ( import (
"io/fs"
"io/ioutil"
"log" "log"
"os"
"path/filepath"
"sync" "sync"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
) )
func tempDir() (string, error) {
dir, err := ioutil.TempDir(os.TempDir(), "obiseq_chunks_")
if err != nil {
return "", err
}
return dir, nil
}
func find(root, ext string) []string {
var a []string
filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error {
if e != nil {
return e
}
if filepath.Ext(d.Name()) == ext {
a = append(a, s)
}
return nil
})
return a
}
func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch,
classifier obiseq.SequenceClassifier,
sizes ...int) (obiseq.IBioSequenceBatch, error) {
dir, err := tempDir()
if err != nil {
return obiseq.NilIBioSequenceBatch, err
}
bufferSize := iterator.BufferSize()
if len(sizes) > 0 {
bufferSize = sizes[0]
}
newIter := obiseq.MakeIBioSequenceBatch(bufferSize)
newIter.Add(1)
go func() {
defer func() {
os.RemoveAll(dir)
log.Println("Clear the cache directory")
}()
newIter.Wait()
close(newIter.Channel())
}()
go func() {
obiformats.WriterDispatcher(dir+"/chunk_%s.fastx",
iterator.Distribute(classifier),
obiformats.WriteSequencesBatchToFile,
)
files := find(dir, ".fastx")
for order, file := range files {
iseq, err := obiformats.ReadSequencesBatchFromFile(file)
if err != nil {
panic(err)
}
chunck := make(obiseq.BioSequenceSlice, 0, 1000)
for iseq.Next() {
b := iseq.Get()
chunck = append(chunck, b.Slice()...)
}
if len(chunck) > 0 {
newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, chunck...)
}
}
newIter.Done()
}()
return newIter, err
}
func ISequenceChunk(iterator obiseq.IBioSequenceBatch, func ISequenceChunk(iterator obiseq.IBioSequenceBatch,
classifier obiseq.SequenceClassifier, classifier obiseq.BioSequenceClassifier,
sizes ...int) (obiseq.IBioSequenceBatch, error) { sizes ...int) (obiseq.IBioSequenceBatch, error) {
bufferSize := iterator.BufferSize() bufferSize := iterator.BufferSize()

151
pkg/obichunk/options.go Normal file
View File

@ -0,0 +1,151 @@
package obichunk
// __options__ holds the actual option values.  Every Options copy made
// from the same MakeOptions call shares one instance of this struct
// through the Options.pointer field, so setters mutate shared state.
type __options__ struct {
statsOn []string // attribute keys on which statistics are collected
categories []string // attribute keys used to sub-classify sequences
navalue string // value substituted when an attribute is absent
cacheOnDisk bool // true: chunk through temporary files; false: in memory
batchCount int // number of chunks the input is hashed into
bufferSize int // channel buffer size for the iterators built here
batchSize int // number of sequences per rebatched output batch
parallelWorkers int // number of concurrent worker goroutines
}
// Options is a cheap, copyable handle on a shared __options__ instance.
// Copies alias the same underlying values.
type Options struct {
pointer *__options__
}

// WithOption is a functional option: it mutates the shared state behind
// the Options handle it receives.
type WithOption func(Options)
// MakeOptions builds an Options handle initialised with the package
// defaults (no stats, no categories, "NA" placeholder, in-memory chunking,
// 100 chunks, buffer size 2, batches of 5000, 4 workers), then applies
// every setter in order.
func MakeOptions(setters []WithOption) Options {
	defaults := __options__{
		statsOn:         make([]string, 0, 100),
		categories:      make([]string, 0, 100),
		navalue:         "NA",
		cacheOnDisk:     false,
		batchCount:      100,
		bufferSize:      2,
		batchSize:       5000,
		parallelWorkers: 4,
	}

	options := Options{pointer: &defaults}

	for i := range setters {
		setters[i](options)
	}

	return options
}
// Categories returns the list of attribute keys used for sub-classification.
func (opt Options) Categories() []string {
return opt.pointer.categories
}

// PopCategories removes and returns the first category key, or "" when no
// category is left.  It mutates the shared option state.
func (opt Options) PopCategories() string {
if len(opt.pointer.categories) > 0 {
c := opt.pointer.categories[0]
opt.pointer.categories = opt.pointer.categories[1:]
return c
}
return ""
}

// StatsOn returns the attribute keys on which statistics are collected.
func (opt Options) StatsOn() []string {
return opt.pointer.statsOn
}

// NAValue returns the placeholder used for missing attribute values.
func (opt Options) NAValue() string {
return opt.pointer.navalue
}

// BatchCount returns the number of chunks the input is hashed into.
func (opt Options) BatchCount() int {
return opt.pointer.batchCount
}

// BufferSize returns the channel buffer size for iterators built here.
func (opt Options) BufferSize() int {
return opt.pointer.bufferSize
}

// BatchSize returns the number of sequences per rebatched output batch.
func (opt Options) BatchSize() int {
return opt.pointer.batchSize
}

// ParallelWorkers returns the number of concurrent worker goroutines.
func (opt Options) ParallelWorkers() int {
return opt.pointer.parallelWorkers
}

// SortOnDisk reports whether chunking goes through temporary files on disk.
func (opt Options) SortOnDisk() bool {
return opt.pointer.cacheOnDisk
}
// OptionSortOnDisk selects disk-backed chunking (temporary files).
func OptionSortOnDisk() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.cacheOnDisk = true
})
return f
}

// OptionSortOnMemory selects in-memory chunking (the default).
func OptionSortOnMemory() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.cacheOnDisk = false
})
return f
}

// OptionSubCategory appends keys to the list of attribute keys used for
// sub-classification.
func OptionSubCategory(keys ...string) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.categories = append(opt.pointer.categories, keys...)
})
return f
}

// OptionNAValue sets the placeholder used for missing attribute values.
func OptionNAValue(na string) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.navalue = na
})
return f
}
// OptionStatOn appends keys to the list of attributes on which statistics
// are collected during the merge step.
//
// Bug fix: the previous implementation appended the new keys to
// opt.pointer.categories instead of opt.pointer.statsOn, so the statsOn
// list silently started from the category list and requested statistics
// keys were mixed with category keys.
func OptionStatOn(keys ...string) WithOption {
	f := WithOption(func(opt Options) {
		opt.pointer.statsOn = append(opt.pointer.statsOn, keys...)
	})
	return f
}
// OptionBatchCount sets the number of chunks the input is hashed into.
func OptionBatchCount(number int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.batchCount = number
})
return f
}

// OptionsParallelWorkers sets the number of concurrent worker goroutines.
// NOTE(review): the "Options" prefix is inconsistent with the "Option"
// prefix used above; renaming would break callers, so it is only flagged.
func OptionsParallelWorkers(nworkers int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.parallelWorkers = nworkers
})
return f
}

// OptionsBatchSize sets the number of sequences per rebatched output batch.
func OptionsBatchSize(size int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.batchSize = size
})
return f
}

// OptionsBufferSize sets the channel buffer size for the iterators built
// by this package.
func OptionsBufferSize(size int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.bufferSize = size
})
return f
}

83
pkg/obichunk/subchunks.go Normal file
View File

@ -0,0 +1,83 @@
package obichunk
import (
"sync"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
// ISequenceSubChunk splits every incoming batch into sub-batches, grouping
// the sequences of a batch by their classifier key.  Each group is emitted
// as its own batch with a globally unique, mutex-protected order number.
//
// sizes is optional: sizes[0] overrides the worker count (default 4),
// sizes[1] overrides the channel buffer size taken from iterator.
func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch,
classifier obiseq.BioSequenceClassifier,
sizes ...int) (obiseq.IBioSequenceBatch, error) {
bufferSize := iterator.BufferSize()
nworkers := 4
if len(sizes) > 0 {
nworkers = sizes[0]
}
if len(sizes) > 1 {
bufferSize = sizes[1]
}
newIter := obiseq.MakeIBioSequenceBatch(bufferSize)
// One Done() per worker goroutine started below.
newIter.Add(nworkers)
go func() {
newIter.Wait()
close(newIter.Channel())
}()
// nextOrder hands out strictly increasing batch order numbers across all
// workers.
omutex := sync.Mutex{}
order := 0
nextOrder := func() int {
omutex.Lock()
neworder := order
order++
omutex.Unlock()
return neworder
}
// ff is the per-worker loop: regroup each input batch by classifier key,
// then flush every group as one output batch.
ff := func(iterator obiseq.IBioSequenceBatch) {
chunks := make(map[string]*obiseq.BioSequenceSlice, 100)
for iterator.Next() {
batch := iterator.Get()
for _, s := range batch.Slice() {
key := classifier(s)
slice, ok := chunks[key]
if !ok {
is := make(obiseq.BioSequenceSlice, 0, len(batch.Slice()))
slice = &is
chunks[key] = slice
}
*slice = append(*slice, s)
}
// Flush and clear the map so groups never span input batches.
// Deleting during range is safe in Go.
// NOTE(review): n is accumulated but never used — leftover debug
// counter, candidate for removal.
n := 0
for k, chunck := range chunks {
n += len(*chunck)
newIter.Channel() <- obiseq.MakeBioSequenceBatch(nextOrder(), *chunck...)
delete(chunks, k)
}
}
newIter.Done()
}
// nworkers-1 workers read from splits of the iterator; the last one
// consumes the original.
for i := 0; i < nworkers-1; i++ {
go ff(iterator.Split())
}
go ff(iterator)
return newIter, nil
}

113
pkg/obichunk/unique.go Normal file
View File

@ -0,0 +1,113 @@
package obichunk
import (
"sync"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
// IUniqueSequence merges identical sequences from iterator into unique
// records.  The input is first chunked by a hash of the sequence (on disk
// or in memory depending on the options), then each worker recursively
// sub-chunks by sequence and by every configured category before the
// per-slice merge worker collapses duplicates.  Statistics listed in
// StatsOn are aggregated during the merge; the result is rebatched to the
// configured batch size.
func IUniqueSequence(iterator obiseq.IBioSequenceBatch,
options ...WithOption) (obiseq.IBioSequenceBatch, error) {
var err error
opts := MakeOptions(options)
iUnique := obiseq.MakeIBioSequenceBatch(opts.BufferSize())
// Stage 1: spread the input over BatchCount() chunks keyed by a sequence
// hash, so identical sequences end up in the same chunk.
if opts.SortOnDisk() {
iterator, err = ISequenceChunkOnDisk(iterator,
obiseq.HashClassifier(opts.BatchCount()),
opts.BufferSize())
if err != nil {
return obiseq.NilIBioSequenceBatch, err
}
} else {
iterator, err = ISequenceChunk(iterator,
obiseq.HashClassifier(opts.BatchCount()),
opts.BufferSize())
if err != nil {
return obiseq.NilIBioSequenceBatch, err
}
}
nworkers := opts.ParallelWorkers()
iUnique.Add(nworkers)
go func() {
iUnique.Wait()
close(iUnique.Channel())
}()
// nextOrder hands out unique order numbers for batches forwarded to the
// final merge stage.
omutex := sync.Mutex{}
order := 0
nextOrder := func() int {
omutex.Lock()
neworder := order
order++
omutex.Unlock()
return neworder
}
var ff func(obiseq.IBioSequenceBatch, obiseq.BioSequenceClassifier, int)
cat := opts.Categories()
na := opts.NAValue()
// ff recursively refines batches: first by the given classifier, then —
// through a chained goroutine per remaining category index icat — by each
// category annotation.  Batches that are fully refined (icat < 0) or
// already singletons go straight to iUnique.
// NOTE(review): the outer err is assigned here from several concurrent
// goroutines and never read afterwards — a data race; errors from
// ISequenceSubChunk are effectively ignored.  Confirm and fix upstream.
ff = func(input obiseq.IBioSequenceBatch,
classifier obiseq.BioSequenceClassifier,
icat int) {
icat--
input, err = ISequenceSubChunk(input,
classifier,
opts.BufferSize())
var next obiseq.IBioSequenceBatch
if icat >= 0 {
// Chain the next refinement level, keyed on category cat[icat].
next = obiseq.MakeIBioSequenceBatch(opts.BufferSize())
iUnique.Add(1)
go ff(next,
obiseq.AnnotationClassifier(cat[icat], na),
icat)
}
o := 0
for input.Next() {
batch := input.Get()
if icat < 0 || len(batch.Slice()) == 1 {
iUnique.Channel() <- batch.Reorder(nextOrder())
} else {
next.Channel() <- batch.Reorder(o)
o++
}
}
if icat >= 0 {
close(next.Channel())
}
iUnique.Done()
}
// Start the top-level workers: refine by the sequence itself first, then
// by all configured categories.
for i := 0; i < nworkers-1; i++ {
go ff(iterator.Split(),
obiseq.SequenceClassifier(),
len(cat))
}
go ff(iterator,
obiseq.SequenceClassifier(),
len(cat))
// Final stage: merge duplicate sequences within each slice, aggregating
// the requested statistics, then rebatch to the configured size.
iMerged := iUnique.MakeISliceWorker(
obiseq.MergeSliceWorker(
opts.NAValue(),
opts.StatsOn()...),
opts.BufferSize(),
)
return iMerged.Rebatch(opts.BatchSize()), nil
}