refactor code and change algorithm used to read from many files

This commit is contained in:
2022-08-23 15:07:06 +02:00
parent bdf317819b
commit 989e678f6b
6 changed files with 157 additions and 49 deletions

View File

@ -6,6 +6,7 @@ import (
"encoding/gob" "encoding/gob"
"io" "io"
"os" "os"
"sync"
) )
// NotAnInteger defines a new type of Error : "NotAnInteger" // NotAnInteger defines a new type of Error : "NotAnInteger"
@ -69,7 +70,6 @@ func (m *NotABoolean) Error() string {
return m.message return m.message
} }
// It converts an interface{} to a bool, and returns an error if the interface{} cannot be converted // It converts an interface{} to a bool, and returns an error if the interface{} cannot be converted
// to a bool // to a bool
func InterfaceToBool(i interface{}) (val bool, err error) { func InterfaceToBool(i interface{}) (val bool, err error) {
@ -79,27 +79,27 @@ func InterfaceToBool(i interface{}) (val bool, err error) {
switch t := i.(type) { switch t := i.(type) {
case int: case int:
val = t!=0 val = t != 0
case int8: case int8:
val = t!=0 // standardizes across systems val = t != 0 // standardizes across systems
case int16: case int16:
val = t!=0 // standardizes across systems val = t != 0 // standardizes across systems
case int32: case int32:
val = t!=0 // standardizes across systems val = t != 0 // standardizes across systems
case int64: case int64:
val = t!=0 // standardizes across systems val = t != 0 // standardizes across systems
case float32: case float32:
val = t!=0 // standardizes across systems val = t != 0 // standardizes across systems
case float64: case float64:
val = t!=0 // standardizes across systems val = t != 0 // standardizes across systems
case uint8: case uint8:
val = t!=0 // standardizes across systems val = t != 0 // standardizes across systems
case uint16: case uint16:
val = t!=0 // standardizes across systems val = t != 0 // standardizes across systems
case uint32: case uint32:
val = t!=0 // standardizes across systems val = t != 0 // standardizes across systems
case uint64: case uint64:
val = t!=0 // standardizes across systems val = t != 0 // standardizes across systems
default: default:
err = &NotABoolean{"value attribute cannot be casted to a boolean"} err = &NotABoolean{"value attribute cannot be casted to a boolean"}
} }
@ -121,7 +121,7 @@ func CastableToInt(i interface{}) bool {
} }
// "CopyMap copies the contents of a map[string]interface{} to another map[string]interface{}." // "CopyMap copies the contents of a map[string]interface{} to another map[string]interface{}."
// //
// The function uses the gob package to encode the source map into a buffer and then decode the buffer // The function uses the gob package to encode the source map into a buffer and then decode the buffer
// into the destination map // into the destination map
func CopyMap(dest, src map[string]interface{}) { func CopyMap(dest, src map[string]interface{}) {
@ -161,12 +161,30 @@ func ReadLines(path string) (lines []string, err error) {
return return
} }
func Contains[T comparable](arr []T, x T) bool { func Contains[T comparable](arr []T, x T) bool {
for _, v := range arr { for _, v := range arr {
if v == x { if v == x {
return true return true
} }
} }
return false return false
}
// AtomicCounter builds a sequence generator that is safe to share between
// goroutines.
//
// The returned function hands out consecutive integers, one per call. The
// sequence starts at initial[0] when at least one value is supplied and at 0
// otherwise; any further values in initial are ignored. Every call to
// AtomicCounter yields an independent sequence guarded by its own mutex.
func AtomicCounter(initial ...int) func() int {
	var (
		mu   sync.Mutex
		next int
	)

	if len(initial) != 0 {
		next = initial[0]
	}

	return func() int {
		// Read and advance under the lock so that no two callers can
		// ever observe the same value.
		mu.Lock()
		current := next
		next = current + 1
		mu.Unlock()

		return current
	}
}

View File

@ -0,0 +1,64 @@
package obiformats
import (
"log"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
)
// ReadSequencesBatchFromFiles reads a list of sequence files and merges the
// batches they produce into a single batch iterator.
//
// Parameters:
//   - filenames: the files to read. They are handed to the workers in slice
//     order through an unbuffered channel.
//   - reader: the function used to open each file. When nil,
//     ReadSequencesBatchFromFile is used.
//   - concurrent_readers: the number of worker goroutines reading files in
//     parallel. Values lower than 1 are treated as 1, so that the files are
//     always consumed.
//   - options: forwarded unchanged to reader for every file.
//
// The returned iterator is handed back immediately; it is filled in the
// background and closed once every worker has finished.
//
// If reader returns an error for a file, the worker calls log.Panicf.
func ReadSequencesBatchFromFiles(filenames []string,
	reader IBatchReader,
	concurrent_readers int,
	options ...WithOption) obiiter.IBioSequenceBatch {

	if reader == nil {
		reader = ReadSequencesBatchFromFile
	}

	// Without this guard a value of 0 starts no worker at all: the iterator
	// is closed at once without any data and the goroutine feeding
	// filenameChan blocks forever on its first send. A negative value would
	// be passed as is to batchiter.Add.
	if concurrent_readers < 1 {
		concurrent_readers = 1
	}

	batchiter := obiiter.MakeIBioSequenceBatch(0)

	// A single counter is shared by all the workers, so every pushed batch
	// receives a distinct, increasing number whatever file it comes from.
	// NOTE(review): Reorder presumably sets the batch order to that number;
	// confirm against obiiter.
	nextCounter := goutils.AtomicCounter()

	// One registration per worker, made before the closing goroutine starts
	// waiting. Each worker calls Done exactly once when it runs out of files.
	batchiter.Add(concurrent_readers)

	go func() {
		batchiter.WaitAndClose()
		log.Println("Finnished to read every files")
	}()

	filenameChan := make(chan string)

	// Feed the file names to the workers, then close the channel so that
	// their range loops terminate.
	go func() {
		for _, filename := range filenames {
			filenameChan <- filename
		}

		close(filenameChan)
	}()

	for i := 0; i < concurrent_readers; i++ {
		go func() {
			// Each worker picks the next unread file until none is left.
			for filename := range filenameChan {
				iter, err := reader(filename, options...)

				if err != nil {
					log.Panicf("Cannot open file %s : %v", filename, err)
				}

				log.Printf("Start reading of file : %s", filename)

				for iter.Next() {
					batch := iter.Get()
					batchiter.Push(batch.Reorder(nextCounter()))
				}

				log.Printf("End of reading of file : %s", filename)
			}

			batchiter.Done()
		}()
	}

	return batchiter
}

View File

@ -0,0 +1,5 @@
package obiformats
import "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
// IBatchReader is the signature shared by the functions able to open one
// sequence file: it receives the file name followed by optional reading
// options and returns an iterator over the sequence batches read from that
// file, or an error.
type IBatchReader func(string, ...WithOption) (obiiter.IBioSequenceBatch, error)

View File

@ -120,7 +120,9 @@ func WriteFastaBatch(iterator obiiter.IBioSequenceBatch,
ff := func(iterator obiiter.IBioSequenceBatch) { ff := func(iterator obiiter.IBioSequenceBatch) {
for iterator.Next() { for iterator.Next() {
batch := iterator.Get() batch := iterator.Get()
chunkchan <- FileChunck{ chunkchan <- FileChunck{
FormatFastaBatch(batch, header_format), FormatFastaBatch(batch, header_format),
batch.Order(), batch.Order(),

View File

@ -73,7 +73,7 @@ func ReadBioSequencesBatch(filenames ...string) (obiiter.IBioSequenceBatch, erro
opts := make([]obiformats.WithOption, 0, 10) opts := make([]obiformats.WithOption, 0, 10)
switch InputFastHeaderFormat() { switch CLIInputFastHeaderFormat() {
case "json": case "json":
opts = append(opts, obiformats.OptionsFastSeqHeaderParser(obiformats.ParseFastSeqJsonHeader)) opts = append(opts, obiformats.OptionsFastSeqHeaderParser(obiformats.ParseFastSeqJsonHeader))
case "obi": case "obi":
@ -91,15 +91,17 @@ func ReadBioSequencesBatch(filenames ...string) (obiiter.IBioSequenceBatch, erro
opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize())) opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize()))
opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize())) opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize()))
opts = append(opts, obiformats.OptionsQualityShift(InputQualityShift())) opts = append(opts, obiformats.OptionsQualityShift(CLIInputQualityShift()))
if len(filenames) == 0 { if len(filenames) == 0 {
switch InputFormat() { switch CLIInputFormat() {
case "ecopcr": case "ecopcr":
iterator = obiformats.ReadEcoPCRBatch(os.Stdin, opts...) iterator = obiformats.ReadEcoPCRBatch(os.Stdin, opts...)
case "embl": case "embl":
iterator = obiformats.ReadEMBLBatch(os.Stdin, opts...) iterator = obiformats.ReadEMBLBatch(os.Stdin, opts...)
case "genbank":
iterator = obiformats.ReadGenbankBatch(os.Stdin, opts...)
default: default:
iterator = obiformats.ReadFastSeqBatchFromStdin(opts...) iterator = obiformats.ReadFastSeqBatchFromStdin(opts...)
} }
@ -110,39 +112,56 @@ func ReadBioSequencesBatch(filenames ...string) (obiiter.IBioSequenceBatch, erro
return obiiter.NilIBioSequenceBatch, err return obiiter.NilIBioSequenceBatch, err
} }
switch InputFormat() { switch CLIInputFormat() {
case "ecopcr": case "ecopcr":
reader = obiformats.ReadEcoPCRBatchFromFile reader = obiformats.ReadEcoPCRBatchFromFile
case "embl": case "embl":
reader = obiformats.ReadEMBLBatchFromFile reader = obiformats.ReadEMBLBatchFromFile
case "genbank":
reader = obiformats.ReadGenbankBatchFromFile
default: default:
reader = obiformats.ReadSequencesBatchFromFile reader = obiformats.ReadSequencesBatchFromFile
} }
iterator, err = reader(list_of_files[0], opts...) if len(list_of_files) > 1 {
nreader := 1
if err != nil { if CLINoInputOrder() {
return obiiter.NilIBioSequenceBatch, err nreader = obioptions.CLIParallelWorkers()
} }
iterator = obiformats.ReadSequencesBatchFromFiles(
filenames,
reader,
nreader,
opts...,
)
} else {
iterator, err = reader(list_of_files[0], opts...)
list_of_files = list_of_files[1:]
others := make([]obiiter.IBioSequenceBatch, 0, len(list_of_files))
for _, fn := range list_of_files {
r, err := reader(fn, opts...)
if err != nil { if err != nil {
return obiiter.NilIBioSequenceBatch, err return obiiter.NilIBioSequenceBatch, err
} }
others = append(others, r)
} }
if len(others) > 0 { // list_of_files = list_of_files[1:]
if NoInputOrder() { // others := make([]obiiter.IBioSequenceBatch, 0, len(list_of_files))
iterator = iterator.Pool(others...)
} else { // for _, fn := range list_of_files {
iterator = iterator.Concat(others...) // r, err := reader(fn, opts...)
} // if err != nil {
} // return obiiter.NilIBioSequenceBatch, err
// }
// others = append(others, r)
// }
// if len(others) > 0 {
// if CLINoInputOrder() {
// iterator = iterator.Pool(others...)
// } else {
// iterator = iterator.Concat(others...)
// }
// }
} }

View File

@ -12,7 +12,7 @@ func WriteBioSequences(iterator obiiter.IBioSequence, filenames ...string) error
opts := make([]obiformats.WithOption, 0, 10) opts := make([]obiformats.WithOption, 0, 10)
switch OutputFastHeaderFormat() { switch CLIOutputFastHeaderFormat() {
case "json": case "json":
log.Println("On output use JSON headers") log.Println("On output use JSON headers")
opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)) opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader))
@ -33,12 +33,12 @@ func WriteBioSequences(iterator obiiter.IBioSequence, filenames ...string) error
opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize())) opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize()))
opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize())) opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize()))
opts = append(opts, obiformats.OptionsQualityShift(OutputQualityShift())) opts = append(opts, obiformats.OptionsQualityShift(CLIOutputQualityShift()))
var err error var err error
if len(filenames) == 0 { if len(filenames) == 0 {
switch OutputFormat() { switch CLIOutputFormat() {
case "fastq": case "fastq":
err = obiformats.WriteFastqToStdout(iterator, opts...) err = obiformats.WriteFastqToStdout(iterator, opts...)
case "fasta": case "fasta":
@ -47,7 +47,7 @@ func WriteBioSequences(iterator obiiter.IBioSequence, filenames ...string) error
err = obiformats.WriteSequencesToStdout(iterator, opts...) err = obiformats.WriteSequencesToStdout(iterator, opts...)
} }
} else { } else {
switch OutputFormat() { switch CLIOutputFormat() {
case "fastq": case "fastq":
err = obiformats.WriteFastqToFile(iterator, filenames[0], opts...) err = obiformats.WriteFastqToFile(iterator, filenames[0], opts...)
case "fasta": case "fasta":
@ -72,7 +72,7 @@ func WriteBioSequencesBatch(iterator obiiter.IBioSequenceBatch,
opts := make([]obiformats.WithOption, 0, 10) opts := make([]obiformats.WithOption, 0, 10)
switch OutputFastHeaderFormat() { switch CLIOutputFastHeaderFormat() {
case "json": case "json":
log.Println("On output use JSON headers") log.Println("On output use JSON headers")
opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)) opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader))
@ -93,12 +93,12 @@ func WriteBioSequencesBatch(iterator obiiter.IBioSequenceBatch,
opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize())) opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize()))
opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize())) opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize()))
opts = append(opts, obiformats.OptionsQualityShift(OutputQualityShift())) opts = append(opts, obiformats.OptionsQualityShift(CLIOutputQualityShift()))
var err error var err error
if len(filenames) == 0 { if len(filenames) == 0 {
switch OutputFormat() { switch CLIOutputFormat() {
case "fastq": case "fastq":
newIter, err = obiformats.WriteFastqBatchToStdout(iterator, opts...) newIter, err = obiformats.WriteFastqBatchToStdout(iterator, opts...)
case "fasta": case "fasta":
@ -107,7 +107,7 @@ func WriteBioSequencesBatch(iterator obiiter.IBioSequenceBatch,
newIter, err = obiformats.WriteSequencesBatchToStdout(iterator, opts...) newIter, err = obiformats.WriteSequencesBatchToStdout(iterator, opts...)
} }
} else { } else {
switch OutputFormat() { switch CLIOutputFormat() {
case "fastq": case "fastq":
newIter, err = obiformats.WriteFastqBatchToFile(iterator, filenames[0], opts...) newIter, err = obiformats.WriteFastqBatchToFile(iterator, filenames[0], opts...)
case "fasta": case "fasta":