From 989e678f6bdbdbaba98424c50db822ea1f8a1da3 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Tue, 23 Aug 2022 15:07:06 +0200 Subject: [PATCH] refactor code and change algorithm used to read from many files --- pkg/goutils/goutils.go | 58 +++++++++++++------- pkg/obiformats/batch_of_files_reader.go | 64 ++++++++++++++++++++++ pkg/obiformats/batch_reader_type.go | 5 ++ pkg/obiformats/fastseq_write_fasta.go | 2 + pkg/obitools/obiconvert/sequence_reader.go | 61 ++++++++++++++------- pkg/obitools/obiconvert/sequence_writer.go | 16 +++--- 6 files changed, 157 insertions(+), 49 deletions(-) create mode 100644 pkg/obiformats/batch_of_files_reader.go create mode 100644 pkg/obiformats/batch_reader_type.go diff --git a/pkg/goutils/goutils.go b/pkg/goutils/goutils.go index 55b2d01..8bd0308 100644 --- a/pkg/goutils/goutils.go +++ b/pkg/goutils/goutils.go @@ -6,6 +6,7 @@ import ( "encoding/gob" "io" "os" + "sync" ) // NotAnInteger defines a new type of Error : "NotAnInteger" @@ -69,7 +70,6 @@ func (m *NotABoolean) Error() string { return m.message } - // It converts an interface{} to a bool, and returns an error if the interface{} cannot be converted // to a bool func InterfaceToBool(i interface{}) (val bool, err error) { @@ -79,27 +79,27 @@ func InterfaceToBool(i interface{}) (val bool, err error) { switch t := i.(type) { case int: - val = t!=0 + val = t != 0 case int8: - val = t!=0 // standardizes across systems + val = t != 0 // standardizes across systems case int16: - val = t!=0 // standardizes across systems + val = t != 0 // standardizes across systems case int32: - val = t!=0 // standardizes across systems + val = t != 0 // standardizes across systems case int64: - val = t!=0 // standardizes across systems + val = t != 0 // standardizes across systems case float32: - val = t!=0 // standardizes across systems + val = t != 0 // standardizes across systems case float64: - val = t!=0 // standardizes across systems + val = t != 0 // standardizes across systems case uint8: - val = t!=0 // standardizes across systems + val = t != 0 // standardizes across systems case uint16: - val = t!=0 // standardizes across systems + val = t != 0 // standardizes across systems case uint32: - val = t!=0 // standardizes across systems + val = t != 0 // standardizes across systems case uint64: - val = t!=0 // standardizes across systems + val = t != 0 // standardizes across systems default: err = &NotABoolean{"value attribute cannot be casted to a boolean"} } @@ -121,7 +121,7 @@ func CastableToInt(i interface{}) bool { } // "CopyMap copies the contents of a map[string]interface{} to another map[string]interface{}." -// +// // The function uses the gob package to encode the source map into a buffer and then decode the buffer // into the destination map func CopyMap(dest, src map[string]interface{}) { @@ -161,12 +161,30 @@ func ReadLines(path string) (lines []string, err error) { return } - func Contains[T comparable](arr []T, x T) bool { - for _, v := range arr { - if v == x { - return true - } - } - return false + for _, v := range arr { + if v == x { + return true + } + } + return false +} + +func AtomicCounter(initial ...int) func() int { + counterMutex := sync.Mutex{} + counter := 0 + if len(initial) > 0 { + counter = initial[0] + } + + nextCounter := func() int { + counterMutex.Lock() + defer counterMutex.Unlock() + val := counter + counter++ + + return val + } + + return nextCounter } diff --git a/pkg/obiformats/batch_of_files_reader.go b/pkg/obiformats/batch_of_files_reader.go new file mode 100644 index 0000000..2d5aa4c --- /dev/null +++ b/pkg/obiformats/batch_of_files_reader.go @@ -0,0 +1,64 @@ +package obiformats + +import ( + "log" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" +) + +func ReadSequencesBatchFromFiles(filenames []string, + reader IBatchReader, + concurrent_readers int, + options ...WithOption) obiiter.IBioSequenceBatch { + + if reader == nil { + reader = ReadSequencesBatchFromFile + } + + batchiter := obiiter.MakeIBioSequenceBatch(0) + nextCounter := goutils.AtomicCounter() + + batchiter.Add(concurrent_readers) + + go func() { + batchiter.WaitAndClose() + log.Println("Finnished to read every files") + }() + + filenameChan := make(chan string) + + go func() { + for _, filename := range filenames { + filenameChan <- filename + } + + close(filenameChan) + }() + + for i := 0; i < concurrent_readers; i++ { + go func() { + + for filename := range filenameChan { + iter, err := reader(filename, options...) + + if err != nil { + log.Panicf("Cannot open file %s : %v", filename, err) + } + + log.Printf("Start reading of file : %s", filename) + + for iter.Next() { + batch := iter.Get() + batchiter.Push(batch.Reorder(nextCounter())) + } + + log.Printf("End of reading of file : %s", filename) + + } + batchiter.Done() + }() + } + + return batchiter +} diff --git a/pkg/obiformats/batch_reader_type.go b/pkg/obiformats/batch_reader_type.go new file mode 100644 index 0000000..db9d83f --- /dev/null +++ b/pkg/obiformats/batch_reader_type.go @@ -0,0 +1,5 @@ +package obiformats + +import "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" + +type IBatchReader func(string, ...WithOption) (obiiter.IBioSequenceBatch, error) \ No newline at end of file diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index 7573777..3563c07 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -120,7 +120,9 @@ func WriteFastaBatch(iterator obiiter.IBioSequenceBatch, ff := func(iterator obiiter.IBioSequenceBatch) { for iterator.Next() { + batch := iterator.Get() + chunkchan <- FileChunck{ FormatFastaBatch(batch, header_format), batch.Order(), diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index 697a7b6..6ab8fca 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -73,7 +73,7 @@ func ReadBioSequencesBatch(filenames ...string) (obiiter.IBioSequenceBatch, erro opts := make([]obiformats.WithOption, 0, 10) - switch InputFastHeaderFormat() { + switch CLIInputFastHeaderFormat() { case "json": opts = append(opts, obiformats.OptionsFastSeqHeaderParser(obiformats.ParseFastSeqJsonHeader)) case "obi": @@ -91,15 +91,17 @@ func ReadBioSequencesBatch(filenames ...string) (obiiter.IBioSequenceBatch, erro opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize())) opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize())) - opts = append(opts, obiformats.OptionsQualityShift(InputQualityShift())) + opts = append(opts, obiformats.OptionsQualityShift(CLIInputQualityShift())) if len(filenames) == 0 { - switch InputFormat() { + switch CLIInputFormat() { case "ecopcr": iterator = obiformats.ReadEcoPCRBatch(os.Stdin, opts...) case "embl": iterator = obiformats.ReadEMBLBatch(os.Stdin, opts...) + case "genbank": + iterator = obiformats.ReadGenbankBatch(os.Stdin, opts...) default: iterator = obiformats.ReadFastSeqBatchFromStdin(opts...) } @@ -110,39 +112,56 @@ func ReadBioSequencesBatch(filenames ...string) (obiiter.IBioSequenceBatch, erro return obiiter.NilIBioSequenceBatch, err } - switch InputFormat() { + switch CLIInputFormat() { case "ecopcr": reader = obiformats.ReadEcoPCRBatchFromFile case "embl": reader = obiformats.ReadEMBLBatchFromFile + case "genbank": + reader = obiformats.ReadGenbankBatchFromFile default: reader = obiformats.ReadSequencesBatchFromFile } - iterator, err = reader(list_of_files[0], opts...) + if len(list_of_files) > 1 { + nreader := 1 - if err != nil { - return obiiter.NilIBioSequenceBatch, err - } + if CLINoInputOrder() { + nreader = obioptions.CLIParallelWorkers() + } + + iterator = obiformats.ReadSequencesBatchFromFiles( + filenames, + reader, + nreader, + opts..., + ) + } else { + iterator, err = reader(list_of_files[0], opts...) - list_of_files = list_of_files[1:] - others := make([]obiiter.IBioSequenceBatch, 0, len(list_of_files)) - - for _, fn := range list_of_files { - r, err := reader(fn, opts...) if err != nil { return obiiter.NilIBioSequenceBatch, err } - others = append(others, r) } - if len(others) > 0 { - if NoInputOrder() { - iterator = iterator.Pool(others...) - } else { - iterator = iterator.Concat(others...) - } - } + // list_of_files = list_of_files[1:] + // others := make([]obiiter.IBioSequenceBatch, 0, len(list_of_files)) + + // for _, fn := range list_of_files { + // r, err := reader(fn, opts...) + // if err != nil { + // return obiiter.NilIBioSequenceBatch, err + // } + // others = append(others, r) + // } + + // if len(others) > 0 { + // if CLINoInputOrder() { + // iterator = iterator.Pool(others...) + // } else { + // iterator = iterator.Concat(others...) + // } + // } } diff --git a/pkg/obitools/obiconvert/sequence_writer.go b/pkg/obitools/obiconvert/sequence_writer.go index 7c75dd7..94a9284 100644 --- a/pkg/obitools/obiconvert/sequence_writer.go +++ b/pkg/obitools/obiconvert/sequence_writer.go @@ -12,7 +12,7 @@ func WriteBioSequences(iterator obiiter.IBioSequence, filenames ...string) error opts := make([]obiformats.WithOption, 0, 10) - switch OutputFastHeaderFormat() { + switch CLIOutputFastHeaderFormat() { case "json": log.Println("On output use JSON headers") opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)) @@ -33,12 +33,12 @@ func WriteBioSequences(iterator obiiter.IBioSequence, filenames ...string) error opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize())) opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize())) - opts = append(opts, obiformats.OptionsQualityShift(OutputQualityShift())) + opts = append(opts, obiformats.OptionsQualityShift(CLIOutputQualityShift())) var err error if len(filenames) == 0 { - switch OutputFormat() { + switch CLIOutputFormat() { case "fastq": err = obiformats.WriteFastqToStdout(iterator, opts...) case "fasta": @@ -47,7 +47,7 @@ func WriteBioSequences(iterator obiiter.IBioSequence, filenames ...string) error err = obiformats.WriteSequencesToStdout(iterator, opts...) } } else { - switch OutputFormat() { + switch CLIOutputFormat() { case "fastq": err = obiformats.WriteFastqToFile(iterator, filenames[0], opts...) case "fasta": @@ -72,7 +72,7 @@ func WriteBioSequencesBatch(iterator obiiter.IBioSequenceBatch, opts := make([]obiformats.WithOption, 0, 10) - switch OutputFastHeaderFormat() { + switch CLIOutputFastHeaderFormat() { case "json": log.Println("On output use JSON headers") opts = append(opts, obiformats.OptionsFastSeqHeaderFormat(obiformats.FormatFastSeqJsonHeader)) @@ -93,12 +93,12 @@ func WriteBioSequencesBatch(iterator obiiter.IBioSequenceBatch, opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize())) opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize())) - opts = append(opts, obiformats.OptionsQualityShift(OutputQualityShift())) + opts = append(opts, obiformats.OptionsQualityShift(CLIOutputQualityShift())) var err error if len(filenames) == 0 { - switch OutputFormat() { + switch CLIOutputFormat() { case "fastq": newIter, err = obiformats.WriteFastqBatchToStdout(iterator, opts...) case "fasta": @@ -107,7 +107,7 @@ func WriteBioSequencesBatch(iterator obiiter.IBioSequenceBatch, newIter, err = obiformats.WriteSequencesBatchToStdout(iterator, opts...) } } else { - switch OutputFormat() { + switch CLIOutputFormat() { case "fastq": newIter, err = obiformats.WriteFastqBatchToFile(iterator, filenames[0], opts...) case "fasta":