Work on iterators and recycling of biosequences

This commit is contained in:
2022-01-14 23:11:36 +01:00
parent ef66ca4972
commit e8fff6477b
22 changed files with 350 additions and 111 deletions

View File

@@ -7,7 +7,6 @@ import (
"log"
"os"
"strings"
"sync"
"time"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
@@ -82,26 +81,30 @@ func WriteFastaToStdout(iterator obiseq.IBioSequence, options ...WithOption) err
return WriteFasta(iterator, os.Stdout, options...)
}
func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) error {
func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) (obiseq.IBioSequenceBatch, error) {
opt := MakeOptions(options)
buffsize := iterator.BufferSize()
newIter := obiseq.MakeIBioSequenceBatch(buffsize)
opt := MakeOptions(options)
nwriters := 4
nwriters := opt.ParallelWorkers()
chunkchan := make(chan FileChunck)
chunkwait := sync.WaitGroup{}
header_format := opt.FormatFastSeqHeader()
chunkwait.Add(nwriters)
newIter.Add(nwriters)
go func() {
chunkwait.Wait()
newIter.Wait()
for len(chunkchan) > 0 {
time.Sleep(time.Millisecond)
}
close(chunkchan)
for len(newIter.Channel()) > 0 {
time.Sleep(time.Millisecond)
}
close(newIter.Channel())
}()
ff := func(iterator obiseq.IBioSequenceBatch) {
@@ -116,9 +119,11 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options
newIter.Done()
}
for i := 0; i < nwriters; i++ {
log.Println("Start of the fasta file writing")
for i := 0; i < nwriters-1; i++ {
go ff(iterator.Split())
}
go ff(iterator)
next_to_send := 0
received := make(map[int]FileChunck, 100)
@@ -142,22 +147,22 @@ func WriteFastaBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options
}
}()
return nil
return newIter, nil
}
func WriteFastaBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) error {
func WriteFastaBatchToStdout(iterator obiseq.IBioSequenceBatch, options ...WithOption) (obiseq.IBioSequenceBatch, error) {
return WriteFastaBatch(iterator, os.Stdout, options...)
}
func WriteFastaBatchToFile(iterator obiseq.IBioSequenceBatch,
filename string,
options ...WithOption) error {
options ...WithOption) (obiseq.IBioSequenceBatch, error) {
file, err := os.Create(filename)
if err != nil {
log.Fatalf("open file error: %v", err)
return err
return obiseq.NilIBioSequenceBatch, err
}
return WriteFastaBatch(iterator, file, options...)

View File

@@ -82,11 +82,12 @@ type FileChunck struct {
}
func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options ...WithOption) (obiseq.IBioSequenceBatch, error) {
opt := MakeOptions(options)
buffsize := iterator.BufferSize()
newIter := obiseq.MakeIBioSequenceBatch(buffsize)
opt := MakeOptions(options)
nwriters := 4
nwriters := opt.ParallelWorkers()
chunkchan := make(chan FileChunck)
@@ -110,19 +111,21 @@ func WriteFastqBatch(iterator obiseq.IBioSequenceBatch, file io.Writer, options
ff := func(iterator obiseq.IBioSequenceBatch) {
for iterator.Next() {
batch := iterator.Get()
chunkchan <- FileChunck{
chunk := FileChunck{
FormatFastqBatch(batch, quality, header_format),
batch.Order(),
}
chunkchan <- chunk
newIter.Channel() <- batch
}
newIter.Done()
}
log.Println("Start of the fastq file reading")
for i := 0; i < nwriters; i++ {
log.Println("Start of the fastq file writing")
for i := 0; i < nwriters-1; i++ {
go ff(iterator.Split())
}
go ff(iterator)
next_to_send := 0
received := make(map[int]FileChunck, 100)

View File

@@ -52,28 +52,45 @@ func WriteSequencesToStdout(iterator obiseq.IBioSequence, options ...WithOption)
return WriteSequences(iterator, os.Stdout, options...)
}
// WriteSequenceBatch writes the batches delivered by iterator to file,
// picking the output format from the first batch: when the first sequence
// of that batch reports qualities the work is delegated to WriteFastqBatch,
// otherwise to WriteFastaBatch. The iterator and error produced by the
// selected writer are returned unchanged.
//
// When iterator.Next() yields nothing, obiseq.NilIBioSequenceBatch is
// returned together with an "input iterator not ready" error.
func WriteSequenceBatch(iterator obiseq.IBioSequenceBatch,
	file io.Writer,
	options ...WithOption) (obiseq.IBioSequenceBatch, error) {
	if !iterator.Next() {
		return obiseq.NilIBioSequenceBatch, fmt.Errorf("input iterator not ready")
	}

	// NOTE(review): PushBack presumably makes the batch that was just
	// advanced to available again for the delegated writer — confirm
	// against the obiseq iterator implementation.
	iterator.PushBack()
	first := iterator.Get()

	// NOTE(review): element 0 is read without a length check; this looks
	// like it would panic on an empty first batch — confirm that batches
	// are never empty.
	if first.Slice()[0].HasQualities() {
		return WriteFastqBatch(iterator, file, options...)
	}

	return WriteFastaBatch(iterator, file, options...)
}
// WriteSequencesBatchToStdout forwards iterator and options to
// WriteSequenceBatch with os.Stdout as the destination and returns that
// call's results unchanged.
func WriteSequencesBatchToStdout(iterator obiseq.IBioSequenceBatch,
	options ...WithOption) (obiseq.IBioSequenceBatch, error) {
	stdout := os.Stdout
	return WriteSequenceBatch(iterator, stdout, options...)
}
// WriteSequencesBatchToFile creates filename with os.Create and passes the
// resulting file, together with iterator and options, to WriteSequenceBatch,
// returning that call's results unchanged.
//
// If the file cannot be created the error is reported through log.Fatalf,
// which terminates the process; the return statement that follows it is
// therefore not expected to run.
//
// NOTE(review): the created file is not closed in this function — confirm
// whether the downstream writer takes care of it.
func WriteSequencesBatchToFile(iterator obiseq.IBioSequenceBatch,
	filename string,
	options ...WithOption) (obiseq.IBioSequenceBatch, error) {
	out, openErr := os.Create(filename)
	if openErr == nil {
		return WriteSequenceBatch(iterator, out, options...)
	}

	log.Fatalf("open file error: %v", openErr)
	return obiseq.NilIBioSequenceBatch, openErr
}