Remove single sequence iterators. Only batch iterators persist

This commit is contained in:
2022-11-16 10:58:59 +01:00
parent f674200a6e
commit 6f853da9df
14 changed files with 4 additions and 651 deletions

View File

@ -5,11 +5,12 @@ import (
"encoding/csv"
"fmt"
"io"
log "github.com/sirupsen/logrus"
"os"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
@ -206,11 +207,6 @@ func ReadEcoPCRBatch(reader io.Reader, options ...WithOption) obiiter.IBioSequen
return newIter
}
// ReadEcoPCR is the sequence-by-sequence counterpart of ReadEcoPCRBatch.
//
// The batch iterator built from reader is passed through SortBatches
// (presumably to restore batch order — confirm) and then flattened into an
// iterator that yields one sequence at a time.
func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
	return ReadEcoPCRBatch(reader, options...).SortBatches().IBioSequence()
}
func ReadEcoPCRBatchFromFile(filename string, options ...WithOption) (obiiter.IBioSequenceBatch, error) {
var reader io.Reader
var greader io.Reader
@ -230,9 +226,3 @@ func ReadEcoPCRBatchFromFile(filename string, options ...WithOption) (obiiter.IB
return ReadEcoPCRBatch(reader, options...), nil
}
// ReadEcoPCRFromFile is the sequence-by-sequence counterpart of
// ReadEcoPCRBatchFromFile.
//
// If the batch reader reports an error, that error is returned together with
// obiiter.NilIBioSequence and the batch iterator is left untouched: a value
// returned alongside an error must not be used.
// NOTE(review): on failure the batch iterator is presumably the nil batch
// iterator, on which SortBatches would dereference a nil pointer — confirm.
func ReadEcoPCRFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
	ib, err := ReadEcoPCRBatchFromFile(filename, options...)
	if err != nil {
		return obiiter.NilIBioSequence, err
	}

	return ib.SortBatches().IBioSequence(), nil
}

View File

@ -224,11 +224,6 @@ func ReadEMBLBatch(reader io.Reader, options ...WithOption) obiiter.IBioSequence
return newIter
}
// ReadEMBL is the sequence-by-sequence counterpart of ReadEMBLBatch.
//
// The batch iterator built from reader is passed through SortBatches
// (presumably to restore batch order — confirm) and then flattened into an
// iterator that yields one sequence at a time.
func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
	return ReadEMBLBatch(reader, options...).SortBatches().IBioSequence()
}
func ReadEMBLBatchFromFile(filename string, options ...WithOption) (obiiter.IBioSequenceBatch, error) {
var reader io.Reader
var greader io.Reader
@ -249,9 +244,3 @@ func ReadEMBLBatchFromFile(filename string, options ...WithOption) (obiiter.IBio
return ReadEMBLBatch(reader, options...), nil
}
// ReadEMBLFromFile is the sequence-by-sequence counterpart of
// ReadEMBLBatchFromFile.
//
// If the batch reader reports an error, that error is returned together with
// obiiter.NilIBioSequence and the batch iterator is left untouched: a value
// returned alongside an error must not be used.
// NOTE(review): on failure the batch iterator is presumably the nil batch
// iterator, on which SortBatches would dereference a nil pointer — confirm.
func ReadEMBLFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
	ib, err := ReadEMBLBatchFromFile(filename, options...)
	if err != nil {
		return obiiter.NilIBioSequence, err
	}

	return ib.SortBatches().IBioSequence(), nil
}

View File

@ -22,12 +22,3 @@ func IParseFastSeqHeaderBatch(iterator obiiter.IBioSequenceBatch,
opt.ParallelWorkers(),
opt.BufferSize())
}
// IParseFastSeqHeader applies IParseFastSeqHeaderBatch to a
// sequence-by-sequence iterator.
//
// The input is first grouped into batches using the batch size and buffer
// size taken from options, handed to IParseFastSeqHeaderBatch, passed through
// SortBatches and finally flattened back into single sequences.
func IParseFastSeqHeader(iterator obiiter.IBioSequence,
	options ...WithOption) obiiter.IBioSequence {
	opt := MakeOptions(options)

	batched := iterator.IBioSequenceBatch(opt.BatchSize(), opt.BufferSize())
	parsed := IParseFastSeqHeaderBatch(batched, options...)

	return parsed.SortBatches().IBioSequence()
}

View File

@ -132,11 +132,6 @@ func ReadFastSeqBatchFromFile(filename string, options ...WithOption) (obiiter.I
return newIter, err
}
// ReadFastSeqFromFile is the sequence-by-sequence counterpart of
// ReadFastSeqBatchFromFile.
//
// If the batch reader reports an error, that error is returned together with
// obiiter.NilIBioSequence and the batch iterator is left untouched: a value
// returned alongside an error must not be used.
// NOTE(review): whether the batch iterator is usable on failure cannot be
// seen from here — confirm against ReadFastSeqBatchFromFile.
func ReadFastSeqFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
	ib, err := ReadFastSeqBatchFromFile(filename, options...)
	if err != nil {
		return obiiter.NilIBioSequence, err
	}

	return ib.SortBatches().IBioSequence(), nil
}
func ReadFastSeqBatchFromStdin(options ...WithOption) obiiter.IBioSequenceBatch {
opt := MakeOptions(options)
newIter := obiiter.MakeIBioSequenceBatch(opt.BufferSize())
@ -158,8 +153,3 @@ func ReadFastSeqBatchFromStdin(options ...WithOption) obiiter.IBioSequenceBatch
return newIter
}
// ReadFastSeqFromStdin is the sequence-by-sequence counterpart of
// ReadFastSeqBatchFromStdin: the batches are passed through SortBatches and
// then flattened into an iterator yielding one sequence at a time.
func ReadFastSeqFromStdin(options ...WithOption) obiiter.IBioSequence {
	return ReadFastSeqBatchFromStdin(options...).SortBatches().IBioSequence()
}

View File

@ -61,47 +61,6 @@ func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader) []b
return bs.Bytes()
}
// WriteFasta writes every sequence of iterator to file, one FormatFasta
// record per Fprintln call, using the header formatter taken from options.
//
// When the CloseFile option is set and file is an *os.File, the file is
// closed once the iterator is exhausted.
//
// The first write error (or, failing that, the close error) is returned.
// After a write error nothing more is written, but the iterator is still
// consumed to its end so that whatever feeds its channel is not left blocked.
func WriteFasta(iterator obiiter.IBioSequence, file io.Writer, options ...WithOption) error {
	opt := MakeOptions(options)
	headerFormat := opt.FormatFastSeqHeader()

	var firstErr error

	for iterator.Next() {
		if firstErr != nil {
			// Output already failed: keep draining, stop writing.
			continue
		}

		if _, err := fmt.Fprintln(file, FormatFasta(iterator.Get(), headerFormat)); err != nil {
			firstErr = err
		}
	}

	if opt.CloseFile() {
		// Only real files are closed; any other io.Writer is left alone.
		if f, ok := file.(*os.File); ok {
			if err := f.Close(); err != nil && firstErr == nil {
				firstErr = err
			}
		}
	}

	return firstErr
}
// WriteFastaToFile creates (or truncates) filename and writes every sequence
// of iterator to it through WriteFasta, asking WriteFasta to close the file
// when it is done.
//
// NOTE(review): a creation failure is reported with log.Fatalf; if log is
// logrus, as in the sibling files of this package, that call exits the
// process and the following return is never reached — confirm.
func WriteFastaToFile(iterator obiiter.IBioSequence,
	filename string,
	options ...WithOption) error {
	out, createErr := os.Create(filename)
	if createErr != nil {
		log.Fatalf("open file error: %v", createErr)
		return createErr
	}

	return WriteFasta(iterator, out, append(options, OptionCloseFile())...)
}
// WriteFastaToStdout writes every sequence of iterator to os.Stdout through
// WriteFasta. OptionDontCloseFile is appended to the caller's options so that
// standard output is left open.
func WriteFastaToStdout(iterator obiiter.IBioSequence, options ...WithOption) error {
	return WriteFasta(iterator, os.Stdout, append(options, OptionDontCloseFile())...)
}
func WriteFastaBatch(iterator obiiter.IBioSequenceBatch,
file io.Writer,
options ...WithOption) (obiiter.IBioSequenceBatch, error) {

View File

@ -46,47 +46,6 @@ func FormatFastqBatch(batch obiiter.BioSequenceBatch, quality_shift int,
return bs.Bytes()
}
// WriteFastq writes every sequence of iterator to file, one FormatFastq
// record per Fprintln call, using the header formatter and quality shift
// taken from options.
//
// When the CloseFile option is set and file is an *os.File, the file is
// closed once the iterator is exhausted.
//
// The first write error (or, failing that, the close error) is returned.
// After a write error nothing more is written, but the iterator is still
// consumed to its end so that whatever feeds its channel is not left blocked.
func WriteFastq(iterator obiiter.IBioSequence, file io.Writer, options ...WithOption) error {
	opt := MakeOptions(options)
	headerFormat := opt.FormatFastSeqHeader()
	quality := opt.QualityShift()

	var firstErr error

	for iterator.Next() {
		if firstErr != nil {
			// Output already failed: keep draining, stop writing.
			continue
		}

		if _, err := fmt.Fprintln(file, FormatFastq(iterator.Get(), quality, headerFormat)); err != nil {
			firstErr = err
		}
	}

	if opt.CloseFile() {
		// Only real files are closed; any other io.Writer is left alone.
		if f, ok := file.(*os.File); ok {
			if err := f.Close(); err != nil && firstErr == nil {
				firstErr = err
			}
		}
	}

	return firstErr
}
// WriteFastqToFile creates (or truncates) filename and writes every sequence
// of iterator to it through WriteFastq, asking WriteFastq to close the file
// when it is done.
//
// NOTE(review): a creation failure is reported with log.Fatalf; if log is
// logrus, as in the sibling files of this package, that call exits the
// process and the following return is never reached — confirm.
func WriteFastqToFile(iterator obiiter.IBioSequence,
	filename string,
	options ...WithOption) error {
	out, createErr := os.Create(filename)
	if createErr != nil {
		log.Fatalf("open file error: %v", createErr)
		return createErr
	}

	return WriteFastq(iterator, out, append(options, OptionCloseFile())...)
}
// WriteFastqToStdout writes every sequence of iterator to os.Stdout through
// WriteFastq. OptionDontCloseFile is appended to the caller's options so that
// standard output is left open.
func WriteFastqToStdout(iterator obiiter.IBioSequence, options ...WithOption) error {
	return WriteFastq(iterator, os.Stdout, append(options, OptionDontCloseFile())...)
}
type FileChunck struct {
text []byte
order int

View File

@ -130,11 +130,6 @@ func ReadGenbankBatch(reader io.Reader, options ...WithOption) obiiter.IBioSeque
return newIter
}
// ReadGenbank is the sequence-by-sequence counterpart of ReadGenbankBatch.
//
// The batch iterator built from reader is passed through SortBatches
// (presumably to restore batch order — confirm) and then flattened into an
// iterator that yields one sequence at a time.
func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
	return ReadGenbankBatch(reader, options...).SortBatches().IBioSequence()
}
func ReadGenbankBatchFromFile(filename string, options ...WithOption) (obiiter.IBioSequenceBatch, error) {
var reader io.Reader
var greader io.Reader
@ -155,9 +150,3 @@ func ReadGenbankBatchFromFile(filename string, options ...WithOption) (obiiter.I
return ReadGenbankBatch(reader, options...), nil
}
// ReadGenbankFromFile is the sequence-by-sequence counterpart of
// ReadGenbankBatchFromFile.
//
// If the batch reader reports an error, that error is returned together with
// obiiter.NilIBioSequence and the batch iterator is left untouched: a value
// returned alongside an error must not be used.
// NOTE(review): on failure the batch iterator is presumably the nil batch
// iterator, on which SortBatches would dereference a nil pointer — confirm.
func ReadGenbankFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
	ib, err := ReadGenbankBatchFromFile(filename, options...)
	if err != nil {
		return obiiter.NilIBioSequence, err
	}

	return ib.SortBatches().IBioSequence(), nil
}

View File

@ -100,10 +100,3 @@ func ReadSequencesBatchFromFile(filename string,
return obiiter.NilIBioSequenceBatch, nil
}
// ReadSequencesFromFile is the sequence-by-sequence counterpart of
// ReadSequencesBatchFromFile.
//
// If the batch reader reports an error, that error is returned together with
// obiiter.NilIBioSequence and the batch iterator is left untouched: a value
// returned alongside an error must not be used.
// NOTE(review): ReadSequencesBatchFromFile also appears to have a path that
// returns the nil batch iterator with a nil error; that case is not handled
// here and should be checked against the batch reader.
func ReadSequencesFromFile(filename string,
	options ...WithOption) (obiiter.IBioSequence, error) {
	ib, err := ReadSequencesBatchFromFile(filename, options...)
	if err != nil {
		return obiiter.NilIBioSequence, err
	}

	return ib.SortBatches().IBioSequence(), nil
}

View File

@ -3,55 +3,13 @@ package obiformats
import (
"fmt"
"io"
log "github.com/sirupsen/logrus"
"os"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
)
// WriteSequences writes every sequence of iterator to file, choosing the
// output format from the first sequence: FASTQ when it carries qualities,
// FASTA otherwise. The first sequence is written here; the remaining ones
// are delegated to WriteFastq or WriteFasta with the same options.
//
// Errors are no longer dropped: a failure writing the first record takes
// precedence, otherwise the error of the delegated writer is returned. The
// delegated writer is always called, even after a failed first write, so
// that the iterator is consumed and the close option is honoured.
//
// An empty iterator is handed to WriteFasta, which finds nothing to write
// but still applies its end-of-stream handling (closing the file on request).
func WriteSequences(iterator obiiter.IBioSequence,
	file io.Writer,
	options ...WithOption) error {
	opts := MakeOptions(options)
	headerFormat := opts.FormatFastSeqHeader()
	quality := opts.QualityShift()

	if !iterator.Next() {
		return WriteFasta(iterator, file, options...)
	}

	first := iterator.Get()

	var firstErr, restErr error
	if first.HasQualities() {
		_, firstErr = fmt.Fprintln(file, FormatFastq(first, quality, headerFormat))
		restErr = WriteFastq(iterator, file, options...)
	} else {
		_, firstErr = fmt.Fprintln(file, FormatFasta(first, headerFormat))
		restErr = WriteFasta(iterator, file, options...)
	}

	if firstErr != nil {
		return firstErr
	}

	return restErr
}
// WriteSequencesToFile creates (or truncates) filename and writes every
// sequence of iterator to it through WriteSequences.
//
// OptionCloseFile is appended to the options, as WriteFastaToFile and
// WriteFastqToFile do, so that the file created here is closed by the writer
// instead of being left open.
//
// NOTE(review): a creation failure is reported with log.Fatalf; with logrus
// that call exits the process, so the following return is never reached.
func WriteSequencesToFile(iterator obiiter.IBioSequence,
	filename string,
	options ...WithOption) error {
	file, err := os.Create(filename)
	if err != nil {
		log.Fatalf("open file error: %v", err)
		return err
	}

	options = append(options, OptionCloseFile())

	return WriteSequences(iterator, file, options...)
}
// WriteSequencesToStdout writes every sequence of iterator to os.Stdout
// through WriteSequences.
//
// OptionDontCloseFile is appended to the options, as WriteFastaToStdout and
// WriteFastqToStdout do, so that standard output is never closed by the
// writer whatever the default of the close option is.
func WriteSequencesToStdout(iterator obiiter.IBioSequence, options ...WithOption) error {
	options = append(options, OptionDontCloseFile())

	return WriteSequences(iterator, os.Stdout, options...)
}
func WriteSequenceBatch(iterator obiiter.IBioSequenceBatch,
file io.Writer,
options ...WithOption) (obiiter.IBioSequenceBatch, error) {

View File

@ -244,37 +244,6 @@ func (iterator IBioSequenceBatch) Finished() bool {
return iterator.pointer.finished.IsSet()
}
// IBioSequence flattens a batch iterator into an iterator yielding one
// sequence at a time.
//
// The buffer size of the new iterator is that of the source unless a first
// optional size is given. Two goroutines are started: one closes the output
// channel once the producer has signalled completion, the other empties each
// batch in turn (Pop0 until the batch is empty), forwards the sequences and
// recycles the batch.
func (iterator IBioSequenceBatch) IBioSequence(sizes ...int) IBioSequence {
	capacity := iterator.BufferSize()
	if len(sizes) > 0 {
		capacity = sizes[0]
	}

	flat := MakeIBioSequence(capacity)
	flat.Add(1)

	// Closer: waits for the producer below, then closes the channel.
	go func() {
		flat.Wait()
		close(flat.Channel())
	}()

	// Producer: unpacks the batches one sequence at a time.
	go func() {
		for iterator.Next() {
			current := iterator.Get()
			for current.NotEmpty() {
				flat.Channel() <- current.Pop0()
			}
			current.Recycle()
		}
		flat.Done()
	}()

	return flat
}
func (iterator IBioSequenceBatch) SortBatches(sizes ...int) IBioSequenceBatch {
buffsize := iterator.BufferSize()
@ -472,7 +441,6 @@ func (iterator IBioSequenceBatch) Consume() {
}
}
func (iterator IBioSequenceBatch) Count(recycle bool) (int, int, int) {
variants := 0
reads := 0

View File

@ -1,342 +0,0 @@
package obiiter
import (
"sync"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
// Private structure implementing an iterator over
// obiseq.BioSequence based on a channel.
//
// It is always handled through the IBioSequence wrapper, which holds a
// pointer to it.
type __ibiosequence__ struct {
// channel delivers the sequences; Next receives from it.
channel chan *obiseq.BioSequence
// current is the sequence last received by Next and returned by Get.
current *obiseq.BioSequence
// pushBack, when set by PushBack, makes the following Next call return
// true without receiving anything, so that current is served again.
pushBack bool
// all_done is the WaitGroup driven by Add, Done and Wait.
all_done *sync.WaitGroup
// buffer_size is the capacity the channel was created with; it is
// reported by BufferSize.
buffer_size int
// finished is the storage of the end-of-data flag of an iterator built
// by MakeIBioSequence.
finished bool
// pFinished points to the end-of-data flag actually consulted. Split
// copies this pointer, so iterators obtained by Split share one flag.
pFinished *bool
}
// IBioSequence is an iterator over single sequences. It only wraps a pointer
// to the private iterator state, so copies of an IBioSequence value refer to
// the same underlying iterator.
type IBioSequence struct {
pointer *__ibiosequence__
}
// NilIBioSequence is the IBioSequence holding no state (nil pointer); IsNil
// reports true for it.
var NilIBioSequence = IBioSequence{pointer: nil}
// IsNil reports whether the iterator holds no state, i.e. whether its
// internal pointer is nil as in NilIBioSequence.
func (iterator IBioSequence) IsNil() bool {
return iterator.pointer == nil
}
// Add adds n to the iterator's WaitGroup. In this file it is called once per
// producer goroutine before that goroutine is started.
func (iterator IBioSequence) Add(n int) {
iterator.pointer.all_done.Add(n)
}
// Done decrements the iterator's WaitGroup; a producer calls it when it has
// nothing more to send.
func (iterator IBioSequence) Done() {
iterator.pointer.all_done.Done()
}
// Wait blocks until the iterator's WaitGroup counter is back to zero. In this
// file it is used by the goroutine that closes the channel afterwards.
func (iterator IBioSequence) Wait() {
iterator.pointer.all_done.Wait()
}
// Channel returns the channel through which the sequences are delivered.
func (iterator IBioSequence) Channel() chan *obiseq.BioSequence {
return iterator.pointer.channel
}
// PChannel returns the address of the channel field inside the iterator
// state, not a copy of the channel value.
func (iterator IBioSequence) PChannel() *chan *obiseq.BioSequence {
return &(iterator.pointer.channel)
}
// MakeIBioSequence builds a new, empty sequence iterator.
//
// The optional first size is the capacity of the underlying channel; it
// defaults to 1. The iterator starts unfinished, with no current sequence,
// its own WaitGroup and an end-of-data pointer aimed at its own flag.
func MakeIBioSequence(sizes ...int) IBioSequence {
	capacity := 1
	if len(sizes) > 0 {
		capacity = sizes[0]
	}

	state := &__ibiosequence__{
		channel:     make(chan *obiseq.BioSequence, capacity),
		buffer_size: capacity,
		all_done:    new(sync.WaitGroup),
	}
	// The shared end-of-data pointer initially targets this iterator's flag.
	state.pFinished = &state.finished

	return IBioSequence{state}
}
// Split returns a second iterator reading from the same channel as the
// receiver.
//
// The new iterator shares the channel, the WaitGroup, the buffer size and the
// end-of-data flag of the source, but has its own current sequence and its
// own push-back state.
func (iterator IBioSequence) Split() IBioSequence {
	src := iterator.pointer

	return IBioSequence{&__ibiosequence__{
		channel:     src.channel,
		all_done:    src.all_done,
		buffer_size: src.buffer_size,
		pFinished:   src.pFinished,
	}}
}
// Next advances the iterator and reports whether a sequence is available
// through Get.
//
// It returns false for a nil iterator and once the end of the data has been
// reached. After PushBack, the next call returns true without receiving
// anything, so that the current sequence is served again. Otherwise it blocks
// on the channel; when the channel is closed the shared end-of-data flag is
// set and false is returned.
//
// The nil check is done on its own: the state must not be touched when the
// iterator holds none.
func (iterator IBioSequence) Next() bool {
	if iterator.IsNil() {
		return false
	}

	state := iterator.pointer

	if *state.pFinished {
		state.current = nil
		return false
	}

	if state.pushBack {
		state.pushBack = false
		return true
	}

	if next, ok := <-state.channel; ok {
		state.current = next
		return true
	}

	// Channel closed: remember it for this iterator and those sharing the flag.
	state.current = nil
	*state.pFinished = true

	return false
}
// PushBack asks the iterator to serve its current sequence once more: the
// following call to Next returns true without receiving a new sequence.
// It has no effect when there is no current sequence.
func (iterator IBioSequence) PushBack() {
	if iterator.pointer.current != nil {
		iterator.pointer.pushBack = true
	}
}
// Get returns the BioSequence instance currently pointed to by the
// iterator. Call Next to move to the next entry before calling Get to
// retrieve the following instance. The result is nil when Next last
// returned false.
func (iterator IBioSequence) Get() *obiseq.BioSequence {
return iterator.pointer.current
}
// Finished returns true if no more data is available from the iterator.
//
// The flag is only raised by Next, when it finds the channel closed; until
// such a call has been made Finished keeps returning false. The flag is
// shared with the iterators obtained through Split.
func (iterator IBioSequence) Finished() bool {
return *iterator.pointer.pFinished
}
// BufferSize returns the buffer size recorded when the iterator was built,
// i.e. the capacity its channel was created with.
func (iterator IBioSequence) BufferSize() int {
return iterator.pointer.buffer_size
}
// IBioSequenceBatch converts an IBioSequence iterator into an iterator over
// batches of sequences.
//
// By default a batch holds 100 sequences and the new iterator uses a buffer
// equal to that of the source iterator. Both defaults can be overridden with
// one or two optional parameters: the first is the batch size, the second
// the buffer size. A non-positive batch size falls back to the default of
// 100, since no progress could be made with it.
//
// Batches are numbered consecutively from 0 in production order. An empty
// batch is never emitted: the end of the source is only noticed when Next
// fails, which would otherwise produce a trailing batch with no sequence
// (and a single empty batch for an empty source).
func (iterator IBioSequence) IBioSequenceBatch(sizes ...int) IBioSequenceBatch {
	batchsize := 100
	buffsize := iterator.BufferSize()

	if len(sizes) > 0 && sizes[0] > 0 {
		batchsize = sizes[0]
	}
	if len(sizes) > 1 {
		buffsize = sizes[1]
	}

	newIter := MakeIBioSequenceBatch(buffsize)

	newIter.Add(1)

	go func() {
		newIter.WaitAndClose()
	}()

	go func() {
		for j := 0; !iterator.Finished(); j++ {
			batch := BioSequenceBatch{
				slice: obiseq.MakeBioSequenceSlice(),
				order: j}

			for i := 0; i < batchsize && iterator.Next(); i++ {
				batch.slice = append(batch.slice, iterator.Get())
			}

			// Nothing collected means Next failed at once: the source is
			// exhausted and there is nothing worth sending.
			if len(batch.slice) == 0 {
				break
			}

			newIter.pointer.channel <- batch
		}

		newIter.Done()
	}()

	return newIter
}
// IBioSequence returns a new iterator that forwards, unchanged and in order,
// every sequence of the receiver.
//
// The buffer size of the new iterator is that of the source unless a first
// optional size is given. One goroutine copies the sequences, another closes
// the output channel once the copy is complete.
func (iterator IBioSequence) IBioSequence(sizes ...int) IBioSequence {
	capacity := iterator.BufferSize()
	if len(sizes) > 0 {
		capacity = sizes[0]
	}

	relay := MakeIBioSequence(capacity)
	relay.Add(1)

	// Closer: waits for the copy below, then closes the channel.
	go func() {
		relay.Wait()
		close(relay.pointer.channel)
	}()

	// Copier.
	go func() {
		sink := relay.pointer.channel
		for iterator.Next() {
			sink <- iterator.Get()
		}
		relay.Done()
	}()

	return relay
}
// Skip returns a new iterator that drops the first n sequences of the
// receiver and forwards all the following ones.
//
// The buffer size of the new iterator is that of the source unless a first
// optional size is given.
func (iterator IBioSequence) Skip(n int, sizes ...int) IBioSequence {
	capacity := iterator.BufferSize()
	if len(sizes) > 0 {
		capacity = sizes[0]
	}

	rest := MakeIBioSequence(capacity)
	rest.Add(1)

	// Closer: waits for the producer below, then closes the channel.
	go func() {
		rest.Wait()
		close(rest.pointer.channel)
	}()

	// Producer: counts what it has seen and forwards from rank n onwards.
	go func() {
		seen := 0
		for iterator.Next() {
			if seen >= n {
				rest.pointer.channel <- iterator.Get()
			}
			seen++
		}
		rest.Done()
	}()

	return rest
}
// Head returns a new iterator limited to the first n sequences of the
// receiver.
//
// The buffer size of the new iterator is that of the source unless a first
// optional size is given.
//
// Completion is signalled exactly once: as soon as the n-th sequence has
// been forwarded (immediately when n <= 0), or at the end of the source when
// it holds fewer than n sequences. Without the latter the output channel
// would never be closed for a short source and the consumer would block
// forever. The source is still read to its end after the first n sequences,
// the extra ones being discarded.
func (iterator IBioSequence) Head(n int, sizes ...int) IBioSequence {
	buffsize := iterator.BufferSize()

	if len(sizes) > 0 {
		buffsize = sizes[0]
	}

	newIter := MakeIBioSequence(buffsize)

	newIter.Add(1)

	go func() {
		newIter.Wait()
		close(newIter.pointer.channel)
	}()

	go func() {
		released := false
		// release signals completion, at most once.
		release := func() {
			if !released {
				released = true
				newIter.Done()
			}
		}

		if n <= 0 {
			release()
		}

		for sent := 0; iterator.Next(); {
			if sent < n {
				newIter.pointer.channel <- iterator.Get()
				sent++
				if sent == n {
					release()
				}
			}
		}

		// Source shorter than n: completion has not been signalled yet.
		release()
	}()

	return newIter
}
// Tail returns a new iterator holding only the last n sequences of the
// receiver, in their original order. Nothing is emitted before the source
// has been read to its end.
//
// The buffer size of the new iterator is that of the source unless a first
// optional size is given.
//
// The last n sequences are kept in a ring buffer explicitly sized to n, so
// that indexing it modulo n is always in range. For n <= 0 the source is
// simply consumed and nothing is emitted, instead of dividing by zero.
func (iterator IBioSequence) Tail(n int, sizes ...int) IBioSequence {
	buffsize := iterator.BufferSize()

	if len(sizes) > 0 {
		buffsize = sizes[0]
	}

	newIter := MakeIBioSequence(buffsize)

	newIter.Add(1)

	go func() {
		newIter.Wait()
		close(newIter.pointer.channel)
	}()

	go func() {
		if n <= 0 {
			// Nothing to keep: drain the source and finish.
			for iterator.Next() {
			}
			newIter.Done()
			return
		}

		ring := make([]*obiseq.BioSequence, n)

		count := 0
		for ; iterator.Next(); count++ {
			ring[count%n] = iterator.Get()
		}

		// With more than n sequences seen, the oldest one kept sits at
		// count%n; otherwise the ring is filled from index 0.
		kept, start := count, 0
		if count > n {
			kept, start = n, count%n
		}

		for j := 0; j < kept; j++ {
			newIter.Channel() <- ring[(start+j)%n]
		}

		newIter.Done()
	}()

	return newIter
}
// Concat returns an iterator yielding every sequence of the receiver followed
// by every sequence of each of the given iterators, in argument order.
//
// Called without argument it returns the receiver itself. Otherwise the new
// iterator uses the buffer size of the receiver.
func (iterator IBioSequence) Concat(iterators ...IBioSequence) IBioSequence {
	if len(iterators) == 0 {
		return iterator
	}

	joined := MakeIBioSequence(iterator.BufferSize())
	joined.Add(1)

	// Closer: waits for the producer below, then closes the channel.
	go func() {
		joined.Wait()
		close(joined.pointer.channel)
	}()

	// Producer: exhausts the sources one after the other, receiver first.
	go func() {
		sources := append([]IBioSequence{iterator}, iterators...)
		for _, src := range sources {
			for src.Next() {
				joined.pointer.channel <- src.Get()
			}
		}
		joined.Done()
	}()

	return joined
}

View File

@ -153,34 +153,6 @@ func (iterator IBioSequenceBatch) MakeISliceWorker(worker SeqSliceWorker, sizes
return newIter
}
// MakeIWorker returns a new iterator yielding the result of worker applied
// to each sequence of the receiver, in the same order.
//
// The buffer size of the new iterator is that of the source unless a first
// optional size is given. A single goroutine runs the worker; another closes
// the output channel once all sequences have been processed.
func (iterator IBioSequence) MakeIWorker(worker SeqWorker, sizes ...int) IBioSequence {
	capacity := iterator.BufferSize()
	if len(sizes) > 0 {
		capacity = sizes[0]
	}

	processed := MakeIBioSequence(capacity)
	processed.Add(1)

	// Closer: waits for the worker loop below, then closes the channel.
	go func() {
		processed.Wait()
		close(processed.pointer.channel)
	}()

	// Worker loop.
	go func() {
		for iterator.Next() {
			processed.pointer.channel <- worker(iterator.Get())
		}
		processed.Done()
	}()

	return processed
}
func WorkerPipe(worker SeqWorker, sizes ...int) Pipeable {
f := func(iterator IBioSequenceBatch) IBioSequenceBatch {
return iterator.MakeIWorker(worker, sizes...)

View File

@ -175,9 +175,3 @@ func ReadBioSequencesBatch(filenames ...string) (obiiter.IBioSequenceBatch, erro
return iterator, nil
}
// ReadBioSequences is the sequence-by-sequence counterpart of
// ReadBioSequencesBatch.
//
// If the batch reader reports an error, that error is returned together with
// obiiter.NilIBioSequence and the batch iterator is left untouched: a value
// returned alongside an error must not be used.
// NOTE(review): whether the batch iterator is usable on failure cannot be
// seen from here — confirm against ReadBioSequencesBatch.
func ReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) {
	ib, err := ReadBioSequencesBatch(filenames...)
	if err != nil {
		return obiiter.NilIBioSequence, err
	}

	return ib.SortBatches().IBioSequence(), nil
}

View File

@ -8,63 +8,6 @@ import (
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
)
// WriteBioSequences writes every sequence of iterator according to the
// command-line output settings.
//
// The header format is OBI when CLIOutputFastHeaderFormat returns "obi" and
// JSON for "json" or any other value. The writer options also carry the
// number of parallel workers (a quarter of the CLI value, never below 2), the
// buffer size, the batch size and the output quality shift.
//
// Without filename the sequences go to standard output, otherwise to
// filenames[0]; further names are ignored. CLIOutputFormat selects "fastq",
// "fasta", or, for any other value, the writer that picks the format itself.
//
// NOTE(review): a write error is reported with log.Fatalf; if that logger
// exits the process, the following return is never reached — confirm.
func WriteBioSequences(iterator obiiter.IBioSequence, filenames ...string) error {
	var headerOpt obiformats.WithOption

	switch CLIOutputFastHeaderFormat() {
	case "obi":
		log.Println("On output use OBI headers")
		headerOpt = obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqOBIHeader)
	default:
		// "json" and every unrecognised value share the JSON format.
		log.Println("On output use JSON headers")
		headerOpt = obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)
	}

	nworkers := obioptions.CLIParallelWorkers() / 4
	if nworkers < 2 {
		nworkers = 2
	}

	opts := []obiformats.WithOption{
		headerOpt,
		obiformats.OptionsParallelWorkers(nworkers),
		obiformats.OptionsBufferSize(obioptions.CLIBufferSize()),
		obiformats.OptionsBatchSize(obioptions.CLIBatchSize()),
		obiformats.OptionsQualityShift(CLIOutputQualityShift()),
	}

	toStdout := len(filenames) == 0

	var err error

	switch CLIOutputFormat() {
	case "fastq":
		if toStdout {
			err = obiformats.WriteFastqToStdout(iterator, opts...)
		} else {
			err = obiformats.WriteFastqToFile(iterator, filenames[0], opts...)
		}
	case "fasta":
		if toStdout {
			err = obiformats.WriteFastaToStdout(iterator, opts...)
		} else {
			err = obiformats.WriteFastaToFile(iterator, filenames[0], opts...)
		}
	default:
		if toStdout {
			err = obiformats.WriteSequencesToStdout(iterator, opts...)
		} else {
			err = obiformats.WriteSequencesToFile(iterator, filenames[0], opts...)
		}
	}

	if err != nil {
		log.Fatalf("Write file error: %v", err)
		return err
	}

	return nil
}
func WriteBioSequencesBatch(iterator obiiter.IBioSequenceBatch,
terminalAction bool, filenames ...string) (obiiter.IBioSequenceBatch, error) {