diff --git a/pkg/obichunk/chunk_on_disk.go b/pkg/obichunk/chunk_on_disk.go
new file mode 100644
index 0000000..b0ed456
--- /dev/null
+++ b/pkg/obichunk/chunk_on_disk.go
@@ -0,0 +1,121 @@
+package obichunk
+
+import (
+	"io/fs"
+	"io/ioutil"
+	"log"
+	"os"
+	"path/filepath"
+
+	"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats"
+	"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
+)
+
+// tempDir creates a fresh "obiseq_chunks_*" directory under the system
+// temporary directory and returns its path.
+func tempDir() (string, error) {
+	dir, err := ioutil.TempDir(os.TempDir(), "obiseq_chunks_")
+	if err != nil {
+		return "", err
+	}
+	return dir, nil
+}
+
+// find walks the tree rooted at root and returns the path of every entry
+// whose name has the extension ext (leading dot included, e.g. ".fastx").
+// A walk error is logged; the paths collected before it are still returned.
+func find(root, ext string) []string {
+	var a []string
+	err := filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error {
+		if e != nil {
+			return e
+		}
+		if filepath.Ext(d.Name()) == ext {
+			a = append(a, s)
+		}
+		return nil
+	})
+	if err != nil {
+		log.Printf("walking %q: %v", root, err)
+	}
+	return a
+}
+
+// ISequenceChunkOnDisk regroups the sequences of iterator into batches, one
+// per chunk file, using a temporary directory as intermediate storage.
+//
+// iterator is distributed by classifier and handed to
+// obiformats.WriterDispatcher with the file name template "chunk_%s.fastx"
+// inside the temporary directory. The ".fastx" files found afterwards are
+// read back by a goroutine, each one being emitted as a single batch whose
+// order is the index of the file in the listing. The temporary directory is
+// removed after the channel of the returned iterator has been closed.
+//
+// sizes[0], when given, replaces iterator.BufferSize() as the buffer size of
+// the returned iterator. A file that cannot be read back makes the reading
+// goroutine panic.
+//
+// NOTE(review): the listing assumes WriterDispatcher has finished writing
+// when it returns; confirm against obiformats.
+func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch,
+	classifier obiseq.BioSequenceClassifier,
+	sizes ...int) (obiseq.IBioSequenceBatch, error) {
+	dir, err := tempDir()
+	if err != nil {
+		return obiseq.NilIBioSequenceBatch, err
+	}
+
+	bufferSize := iterator.BufferSize()
+
+	if len(sizes) > 0 {
+		bufferSize = sizes[0]
+	}
+
+	newIter := obiseq.MakeIBioSequenceBatch(bufferSize)
+
+	newIter.Add(1)
+
+	go func() {
+		defer func() {
+			os.RemoveAll(dir)
+			log.Println("Clear the cache directory")
+		}()
+
+		newIter.Wait()
+		close(newIter.Channel())
+	}()
+
+	// filepath.Join keeps the template path correct on every platform.
+	obiformats.WriterDispatcher(filepath.Join(dir, "chunk_%s.fastx"),
+		iterator.Distribute(classifier),
+		obiformats.WriteSequencesBatchToFile,
+	)
+
+	fileNames := find(dir, ".fastx")
+	log.Println("batch count ", len(fileNames))
+
+	go func() {
+
+		for order, file := range fileNames {
+			iseq, err := obiformats.ReadSequencesBatchFromFile(file)
+
+			if err != nil {
+				panic(err)
+			}
+
+			chunck := make(obiseq.BioSequenceSlice, 0, 1000)
+
+			for iseq.Next() {
+				b := iseq.Get()
+				chunck = append(chunck, b.Slice()...)
+			}
+
+			newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, chunck...)
+
+		}
+
+		newIter.Done()
+	}()
+
+	return newIter, err
+}
diff --git a/pkg/obichunk/chunks.go b/pkg/obichunk/chunks.go
index 7e2bef5..50c8a29 100644
--- a/pkg/obichunk/chunks.go
+++ b/pkg/obichunk/chunks.go
@@ -1,103 +1,14 @@
 package obichunk
 
 import (
-	"io/fs"
-	"io/ioutil"
 	"log"
-	"os"
-	"path/filepath"
 	"sync"
 
-	"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiformats"
 	"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
 )
 
-func tempDir() (string, error) {
-	dir, err := ioutil.TempDir(os.TempDir(), "obiseq_chunks_")
-	if err != nil {
-		return "", err
-	}
-	return dir, nil
-}
-
-func find(root, ext string) []string {
-	var a []string
-	filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error {
-		if e != nil {
-			return e
-		}
-		if filepath.Ext(d.Name()) == ext {
-			a = append(a, s)
-		}
-		return nil
-	})
-	return a
-}
-
-func ISequenceChunkOnDisk(iterator obiseq.IBioSequenceBatch,
-	classifier obiseq.SequenceClassifier,
-	sizes ...int) (obiseq.IBioSequenceBatch, error) {
-	dir, err := tempDir()
-	if err != nil {
-		return obiseq.NilIBioSequenceBatch, err
-	}
-
-	bufferSize := iterator.BufferSize()
-
-	if len(sizes) > 0 {
-		bufferSize = sizes[0]
-	}
-
-	newIter := obiseq.MakeIBioSequenceBatch(bufferSize)
-
-	newIter.Add(1)
-
-	go func() {
-		defer func() {
-			os.RemoveAll(dir)
-			log.Println("Clear the cache directory")
-		}()
-
-		newIter.Wait()
-		close(newIter.Channel())
-	}()
-
-	go func() {
-		obiformats.WriterDispatcher(dir+"/chunk_%s.fastx",
-			iterator.Distribute(classifier),
-			obiformats.WriteSequencesBatchToFile,
-		)
-
-		files := find(dir, ".fastx")
-
-		for order, file := range files {
-			iseq, err := obiformats.ReadSequencesBatchFromFile(file)
-
-			if err != nil {
-				panic(err)
-			}
-
-			chunck := make(obiseq.BioSequenceSlice, 0, 1000)
-
-			for iseq.Next() {
-				b := iseq.Get()
-				chunck = append(chunck, b.Slice()...)
-			}
-
-			if len(chunck) > 0 {
-				newIter.Channel() <- obiseq.MakeBioSequenceBatch(order, chunck...)
-			}
-
-		}
-
-		newIter.Done()
-	}()
-
-	return newIter, err
-}
-
 func ISequenceChunk(iterator obiseq.IBioSequenceBatch,
-	classifier obiseq.SequenceClassifier,
+	classifier obiseq.BioSequenceClassifier,
 	sizes ...int) (obiseq.IBioSequenceBatch, error) {
 
 	bufferSize := iterator.BufferSize()
diff --git a/pkg/obichunk/options.go b/pkg/obichunk/options.go
new file mode 100644
index 0000000..41d53a5
--- /dev/null
+++ b/pkg/obichunk/options.go
@@ -0,0 +1,181 @@
+package obichunk
+
+// __options__ is the mutable settings record shared by all copies of an
+// Options value.
+type __options__ struct {
+	statsOn         []string // keys added by OptionStatOn
+	categories      []string // keys added by OptionSubCategory
+	navalue         string   // "NA" unless changed by OptionNAValue
+	cacheOnDisk     bool     // toggled by OptionSortOnDisk / OptionSortOnMemory
+	batchCount      int      // 100 unless changed by OptionBatchCount
+	bufferSize      int      // 2 unless changed by OptionsBufferSize
+	batchSize       int      // 5000 unless changed by OptionsBatchSize
+	parallelWorkers int      // 4 unless changed by OptionsParallelWorkers
+}
+
+// Options is a handle on a set of chunking settings. Copies share the same
+// underlying record, so a setter applied to one copy is seen by all of them.
+type Options struct {
+	pointer *__options__
+}
+
+// WithOption is a setter that modifies the Options it receives.
+type WithOption func(Options)
+
+// MakeOptions returns the default settings (no statistics key, no category,
+// NA value "NA", in-memory chunking, 100 batches, buffer size 2, batch size
+// 5000, 4 workers) after applying setters in order.
+func MakeOptions(setters []WithOption) Options {
+	o := __options__{
+		statsOn:         make([]string, 0, 100),
+		categories:      make([]string, 0, 100),
+		navalue:         "NA",
+		cacheOnDisk:     false,
+		batchCount:      100,
+		bufferSize:      2,
+		batchSize:       5000,
+		parallelWorkers: 4,
+	}
+
+	opt := Options{&o}
+
+	for _, set := range setters {
+		set(opt)
+	}
+
+	return opt
+}
+
+// Categories returns the category keys still registered.
+func (opt Options) Categories() []string {
+	return opt.pointer.categories
+}
+
+// PopCategories removes the first category key and returns it, or returns ""
+// when no key is left.
+func (opt Options) PopCategories() string {
+	if len(opt.pointer.categories) > 0 {
+		c := opt.pointer.categories[0]
+		opt.pointer.categories = opt.pointer.categories[1:]
+		return c
+	}
+	return ""
+}
+
+// StatsOn returns the keys registered by OptionStatOn.
+func (opt Options) StatsOn() []string {
+	return opt.pointer.statsOn
+}
+
+// NAValue returns the NA value setting.
+func (opt Options) NAValue() string {
+	return opt.pointer.navalue
+}
+
+// BatchCount returns the batch count setting.
+func (opt Options) BatchCount() int {
+	return opt.pointer.batchCount
+}
+
+// BufferSize returns the buffer size setting.
+func (opt Options) BufferSize() int {
+	return opt.pointer.bufferSize
+}
+
+// BatchSize returns the batch size setting.
+func (opt Options) BatchSize() int {
+	return opt.pointer.batchSize
+}
+
+// ParallelWorkers returns the number of parallel workers setting.
+func (opt Options) ParallelWorkers() int {
+	return opt.pointer.parallelWorkers
+}
+
+// SortOnDisk reports whether on-disk caching has been requested.
+func (opt Options) SortOnDisk() bool {
+	return opt.pointer.cacheOnDisk
+}
+
+// OptionSortOnDisk requests on-disk caching.
+func OptionSortOnDisk() WithOption {
+	f := WithOption(func(opt Options) {
+		opt.pointer.cacheOnDisk = true
+	})
+
+	return f
+}
+
+// OptionSortOnMemory disables on-disk caching (the default).
+func OptionSortOnMemory() WithOption {
+	f := WithOption(func(opt Options) {
+		opt.pointer.cacheOnDisk = false
+	})
+
+	return f
+}
+
+// OptionSubCategory appends keys to the category keys.
+func OptionSubCategory(keys ...string) WithOption {
+	f := WithOption(func(opt Options) {
+		opt.pointer.categories = append(opt.pointer.categories, keys...)
+	})
+
+	return f
+}
+
+// OptionNAValue sets the NA value to na.
+func OptionNAValue(na string) WithOption {
+	f := WithOption(func(opt Options) {
+		opt.pointer.navalue = na
+	})
+
+	return f
+}
+
+// OptionStatOn appends keys to the statistics keys returned by StatsOn.
+func OptionStatOn(keys ...string) WithOption {
+	f := WithOption(func(opt Options) {
+		// Extend statsOn itself: starting from categories would leak the
+		// category keys into the statistics keys and drop earlier ones.
+		opt.pointer.statsOn = append(opt.pointer.statsOn, keys...)
+	})
+
+	return f
+}
+
+// OptionBatchCount sets the batch count to number.
+func OptionBatchCount(number int) WithOption {
+	f := WithOption(func(opt Options) {
+		opt.pointer.batchCount = number
+	})
+
+	return f
+}
+
+// OptionsParallelWorkers sets the number of parallel workers to nworkers.
+func OptionsParallelWorkers(nworkers int) WithOption {
+	f := WithOption(func(opt Options) {
+		opt.pointer.parallelWorkers = nworkers
+	})
+
+	return f
+}
+
+// OptionsBatchSize sets the batch size to size.
+func OptionsBatchSize(size int) WithOption {
+	f := WithOption(func(opt Options) {
+		opt.pointer.batchSize = size
+	})
+
+	return f
+}
+
+// OptionsBufferSize sets the buffer size to size.
+func OptionsBufferSize(size int) WithOption {
+	f := WithOption(func(opt Options) {
+		opt.pointer.bufferSize = size
+	})
+
+	return f
+}
diff --git a/pkg/obichunk/subchunks.go b/pkg/obichunk/subchunks.go
new file mode 100644
index 0000000..7d7a818
--- /dev/null
+++ b/pkg/obichunk/subchunks.go
@@ -0,0 +1,98 @@
+package obichunk
+
+import (
+	"sync"
+
+	"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
+)
+
+// ISequenceSubChunk splits every batch of iterator into one batch per class.
+//
+// Each sequence of an incoming batch is assigned a key by classifier; the
+// sequences sharing a key within that batch are emitted together as a new
+// batch. Grouping never spans several input batches. Output batches are
+// numbered from a counter shared by all the workers.
+//
+// sizes[0], when given, is the number of worker goroutines (4 by default,
+// at least 1); sizes[1], when given, replaces iterator.BufferSize() as the
+// buffer size of the returned iterator. The returned error is always nil.
+func ISequenceSubChunk(iterator obiseq.IBioSequenceBatch,
+	classifier obiseq.BioSequenceClassifier,
+	sizes ...int) (obiseq.IBioSequenceBatch, error) {
+
+	bufferSize := iterator.BufferSize()
+	nworkers := 4
+
+	if len(sizes) > 0 {
+		nworkers = sizes[0]
+	}
+
+	if nworkers < 1 {
+		// One worker always runs on iterator itself; keep the count given
+		// to Add consistent with the workers actually started.
+		nworkers = 1
+	}
+
+	if len(sizes) > 1 {
+		bufferSize = sizes[1]
+	}
+
+	newIter := obiseq.MakeIBioSequenceBatch(bufferSize)
+
+	newIter.Add(nworkers)
+
+	go func() {
+		newIter.Wait()
+		close(newIter.Channel())
+	}()
+
+	omutex := sync.Mutex{}
+	order := 0
+
+	nextOrder := func() int {
+		omutex.Lock()
+		neworder := order
+		order++
+		omutex.Unlock()
+		return neworder
+	}
+
+	ff := func(iterator obiseq.IBioSequenceBatch) {
+		chunks := make(map[string]*obiseq.BioSequenceSlice, 100)
+
+		for iterator.Next() {
+
+			batch := iterator.Get()
+
+			for _, s := range batch.Slice() {
+				key := classifier(s)
+
+				slice, ok := chunks[key]
+
+				if !ok {
+					is := make(obiseq.BioSequenceSlice, 0, len(batch.Slice()))
+					slice = &is
+					chunks[key] = slice
+				}
+
+				*slice = append(*slice, s)
+			}
+
+			// Flush and forget every group before reading the next batch.
+			for k, chunck := range chunks {
+				newIter.Channel() <- obiseq.MakeBioSequenceBatch(nextOrder(), *chunck...)
+				delete(chunks, k)
+			}
+
+		}
+
+		newIter.Done()
+	}
+
+	for i := 0; i < nworkers-1; i++ {
+		go ff(iterator.Split())
+	}
+	go ff(iterator)
+
+	return newIter, nil
+}
diff --git a/pkg/obichunk/unique.go b/pkg/obichunk/unique.go
new file mode 100644
index 0000000..49ce662
--- /dev/null
+++ b/pkg/obichunk/unique.go
@@ -0,0 +1,138 @@
+package obichunk
+
+import (
+	"sync"
+
+	"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
+)
+
+// IUniqueSequence groups the sequences of iterator into classes and hands
+// the resulting batches to a slice worker built by obiseq.MergeSliceWorker.
+//
+// The input is first chunked with obiseq.HashClassifier, on disk when
+// SortOnDisk is set and in memory otherwise. Each chunk is then sub-chunked
+// with obiseq.SequenceClassifier and, successively, with an
+// obiseq.AnnotationClassifier for each category key, from the last key to
+// the first. A batch is forwarded to the merge stage as soon as it holds a
+// single sequence or no category key is left. The merged stream is finally
+// rebatched to BatchSize.
+//
+// The error of the initial chunking step is returned. A failing sub-chunking
+// step makes its worker goroutine panic.
+func IUniqueSequence(iterator obiseq.IBioSequenceBatch,
+	options ...WithOption) (obiseq.IBioSequenceBatch, error) {
+
+	var err error
+	opts := MakeOptions(options)
+
+	iUnique := obiseq.MakeIBioSequenceBatch(opts.BufferSize())
+
+	if opts.SortOnDisk() {
+		iterator, err = ISequenceChunkOnDisk(iterator,
+			obiseq.HashClassifier(opts.BatchCount()),
+			opts.BufferSize())
+
+		if err != nil {
+			return obiseq.NilIBioSequenceBatch, err
+		}
+
+	} else {
+		iterator, err = ISequenceChunk(iterator,
+			obiseq.HashClassifier(opts.BatchCount()),
+			opts.BufferSize())
+
+		if err != nil {
+			return obiseq.NilIBioSequenceBatch, err
+		}
+	}
+
+	nworkers := opts.ParallelWorkers()
+	if nworkers < 1 {
+		// One worker always runs on iterator itself; keep the count given
+		// to Add consistent with the workers actually started.
+		nworkers = 1
+	}
+	iUnique.Add(nworkers)
+
+	go func() {
+		iUnique.Wait()
+		close(iUnique.Channel())
+	}()
+
+	omutex := sync.Mutex{}
+	order := 0
+
+	nextOrder := func() int {
+		omutex.Lock()
+		neworder := order
+		order++
+		omutex.Unlock()
+		return neworder
+	}
+
+	var ff func(obiseq.IBioSequenceBatch, obiseq.BioSequenceClassifier, int)
+
+	cat := opts.Categories()
+	na := opts.NAValue()
+
+	ff = func(input obiseq.IBioSequenceBatch,
+		classifier obiseq.BioSequenceClassifier,
+		icat int) {
+		icat--
+		// The error stays local to this goroutine: assigning the err of
+		// IUniqueSequence here would be a data race between the workers.
+		// NOTE(review): ISequenceSubChunk reads its first size as a worker
+		// count, not as a buffer size; confirm this argument.
+		input, subErr := ISequenceSubChunk(input,
+			classifier,
+			opts.BufferSize())
+		if subErr != nil {
+			panic(subErr)
+		}
+
+		var next obiseq.IBioSequenceBatch
+		if icat >= 0 {
+			next = obiseq.MakeIBioSequenceBatch(opts.BufferSize())
+
+			iUnique.Add(1)
+			go ff(next,
+				obiseq.AnnotationClassifier(cat[icat], na),
+				icat)
+		}
+
+		o := 0
+		for input.Next() {
+			batch := input.Get()
+			if icat < 0 || len(batch.Slice()) == 1 {
+				iUnique.Channel() <- batch.Reorder(nextOrder())
+			} else {
+				next.Channel() <- batch.Reorder(o)
+				o++
+			}
+		}
+
+		if icat >= 0 {
+			close(next.Channel())
+		}
+
+		iUnique.Done()
+	}
+
+	for i := 0; i < nworkers-1; i++ {
+		go ff(iterator.Split(),
+			obiseq.SequenceClassifier(),
+			len(cat))
+	}
+	go ff(iterator,
+		obiseq.SequenceClassifier(),
+		len(cat))
+
+	iMerged := iUnique.MakeISliceWorker(
+		obiseq.MergeSliceWorker(
+			opts.NAValue(),
+			opts.StatsOn()...),
+		opts.BufferSize(),
+	)
+
+	return iMerged.Rebatch(opts.BatchSize()), nil
+}