diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index 876866f..992660c 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -159,7 +159,7 @@ func EmblChunkParser(withFeatureTable bool) func(string, io.Reader) (obiseq.BioS } func _ParseEmblFile( - input ChannelSeqFileChunk, + input ChannelFileChunk, out obiiter.IBioSequence, withFeatureTable bool, ) { @@ -189,7 +189,7 @@ func ReadEMBL(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, er buff := make([]byte, 1024*1024*128) // 128 MB - entry_channel := ReadSeqFileChunk( + entry_channel := ReadFileChunk( opt.Source(), reader, buff, diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go index cb162db..86b5de7 100644 --- a/pkg/obiformats/fastaseq_read.go +++ b/pkg/obiformats/fastaseq_read.go @@ -205,7 +205,7 @@ func FastaChunkParser() func(string, io.Reader) (obiseq.BioSequenceSlice, error) } func _ParseFastaFile( - input ChannelSeqFileChunk, + input ChannelFileChunk, out obiiter.IBioSequence, ) { @@ -213,6 +213,7 @@ func _ParseFastaFile( for chunks := range input { sequences, err := parser(chunks.Source, chunks.Raw) + // log.Warnf("Chunck(%d:%d) -%d- ", chunks.Order, l, sequences.Len()) if err != nil { log.Fatalf("File %s : Cannot parse the fasta file : %v", chunks.Source, err) @@ -234,7 +235,7 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e buff := make([]byte, 1024*1024) - chkchan := ReadSeqFileChunk( + chkchan := ReadFileChunk( opt.Source(), reader, buff, diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go index fe11869..43a5aca 100644 --- a/pkg/obiformats/fastqseq_read.go +++ b/pkg/obiformats/fastqseq_read.go @@ -296,7 +296,7 @@ func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSeq } func _ParseFastqFile( - input ChannelSeqFileChunk, + input ChannelFileChunk, out obiiter.IBioSequence, quality_shift byte, ) { @@ -326,7 +326,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e buff := make([]byte, 1024*1024) - chkchan := ReadSeqFileChunk( + chkchan := ReadFileChunk( opt.Source(), reader, buff, diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index 4c9f7ef..7aa71fb 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -7,6 +7,7 @@ import ( "io" "os" "strings" + "time" log "github.com/sirupsen/logrus" @@ -132,7 +133,7 @@ func WriteFasta(iterator obiiter.IBioSequence, nwriters := opt.ParallelWorkers() - chunkchan := WriteSeqFileChunk(file, opt.CloseFile()) + chunkchan := WriteFileChunk(file, opt.CloseFile()) header_format := opt.FormatFastSeqHeader() @@ -140,6 +141,9 @@ func WriteFasta(iterator obiiter.IBioSequence, go func() { newIter.WaitAndClose() + for len(chunkchan) > 0 { + time.Sleep(time.Millisecond) + } close(chunkchan) log.Debugf("Writing fasta file done") }() @@ -151,7 +155,7 @@ func WriteFasta(iterator obiiter.IBioSequence, log.Debugf("Formating fasta chunk %d", batch.Order()) - chunkchan <- SeqFileChunk{ + chunkchan <- FileChunk{ Source: batch.Source(), Raw: FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()), Order: batch.Order(), @@ -166,7 +170,7 @@ func WriteFasta(iterator obiiter.IBioSequence, log.Debugln("Start of the fasta file writing") go ff(iterator) - for i := 0; i < nwriters-1; i++ { + for i := 1; i < nwriters; i++ { go ff(iterator.Split()) } diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index e3959c1..08e47f6 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -4,7 +4,6 @@ import ( "bytes" "io" "os" - "sync" "time" log "github.com/sirupsen/logrus" @@ -87,11 +86,6 @@ func FormatFastqBatch(batch obiiter.BioSequenceBatch, return &bs } -type FileChunk struct { - text []byte - order int -} - func WriteFastq(iterator obiiter.IBioSequence, file io.WriteCloser, options ...WithOption) (obiiter.IBioSequence, error) { @@ -104,27 +98,25 @@ func WriteFastq(iterator obiiter.IBioSequence, nwriters := opt.ParallelWorkers() - chunkchan := WriteSeqFileChunk(file, opt.CloseFile()) + chunkchan := WriteFileChunk(file, opt.CloseFile()) header_format := opt.FormatFastSeqHeader() newIter.Add(nwriters) - var waitWriter sync.WaitGroup - go func() { newIter.WaitAndClose() for len(chunkchan) > 0 { time.Sleep(time.Millisecond) } close(chunkchan) - waitWriter.Wait() + log.Debugf("Writing fastq file done") }() ff := func(iterator obiiter.IBioSequence) { for iterator.Next() { batch := iterator.Get() - chunk := SeqFileChunk{ + chunk := FileChunk{ Source: batch.Source(), Raw: FormatFastqBatch(batch, header_format, opt.SkipEmptySequence()), Order: batch.Order(), @@ -137,7 +129,7 @@ func WriteFastq(iterator obiiter.IBioSequence, log.Debugln("Start of the fastq file writing") go ff(iterator) - for i := 0; i < nwriters-1; i++ { + for i := 1; i < nwriters; i++ { go ff(iterator.Split()) } diff --git a/pkg/obiformats/seqfile_chunk_read.go b/pkg/obiformats/file_chunk_read.go similarity index 88% rename from pkg/obiformats/seqfile_chunk_read.go rename to pkg/obiformats/file_chunk_read.go index b729829..17eb0c1 100644 --- a/pkg/obiformats/seqfile_chunk_read.go +++ b/pkg/obiformats/file_chunk_read.go @@ -11,13 +11,13 @@ import ( type SeqFileChunkParser func(string, io.Reader) (obiseq.BioSequenceSlice, error) -type SeqFileChunk struct { +type FileChunk struct { Source string Raw *bytes.Buffer Order int } -type ChannelSeqFileChunk chan SeqFileChunk +type ChannelFileChunk chan FileChunk type LastSeqRecord func([]byte) int @@ -34,15 +34,15 @@ type LastSeqRecord func([]byte) int // // Returns: // None -func ReadSeqFileChunk( +func ReadFileChunk( source string, reader io.Reader, buff []byte, - splitter LastSeqRecord) ChannelSeqFileChunk { + splitter LastSeqRecord) ChannelFileChunk { var err error var fullbuff []byte - chunk_channel := make(ChannelSeqFileChunk) + chunk_channel := make(ChannelFileChunk) fileChunkSize := len(buff) @@ -95,8 +95,10 @@ func ReadSeqFileChunk( } if len(buff) > 0 { - io := bytes.NewBuffer(slices.Clone(buff)) - chunk_channel <- SeqFileChunk{source, io, i} + cbuff := slices.Clone(buff) + io := bytes.NewBuffer(cbuff) + // log.Warnf("chuck %d :Read %d bytes from file %s", i, io.Len(), source) + chunk_channel <- FileChunk{source, io, i} i++ } @@ -120,7 +122,7 @@ func ReadSeqFileChunk( // Send the last chunk to the channel if len(buff) > 0 { io := bytes.NewBuffer(slices.Clone(buff)) - chunk_channel <- SeqFileChunk{source, io, i} + chunk_channel <- FileChunk{source, io, i} } // Close the readers channel when the end of the file is reached diff --git a/pkg/obiformats/seqfile_chunk_write.go b/pkg/obiformats/file_chunk_write.go similarity index 88% rename from pkg/obiformats/seqfile_chunk_write.go rename to pkg/obiformats/file_chunk_write.go index 573ea20..820a821 100644 --- a/pkg/obiformats/seqfile_chunk_write.go +++ b/pkg/obiformats/file_chunk_write.go @@ -8,16 +8,16 @@ import ( log "github.com/sirupsen/logrus" ) -func WriteSeqFileChunk( +func WriteFileChunk( writer io.WriteCloser, - toBeClosed bool) ChannelSeqFileChunk { + toBeClosed bool) ChannelFileChunk { obiiter.RegisterAPipe() - chunk_channel := make(ChannelSeqFileChunk) + chunk_channel := make(ChannelFileChunk) go func() { nextToPrint := 0 - toBePrinted := make(map[int]SeqFileChunk) + toBePrinted := make(map[int]FileChunk) for chunk := range chunk_channel { if chunk.Order == nextToPrint { log.Debugf("Writing chunk: %d of length %d bytes", diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go index f9dfed2..b0a0dfc 100644 --- a/pkg/obiformats/genbank_read.go +++ b/pkg/obiformats/genbank_read.go @@ -198,7 +198,7 @@ func GenbankChunkParser(withFeatureTable bool) func(string, io.Reader) (obiseq.B } } -func _ParseGenbankFile(input ChannelSeqFileChunk, +func _ParseGenbankFile(input ChannelFileChunk, out obiiter.IBioSequence, withFeatureTable bool) { @@ -225,7 +225,7 @@ func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, buff := make([]byte, 1024*1024*128) // 128 MB - entry_channel := ReadSeqFileChunk( + entry_channel := ReadFileChunk( opt.Source(), reader, buff, diff --git a/pkg/obiformats/json_writer.go b/pkg/obiformats/json_writer.go index b51210c..2e5cf76 100644 --- a/pkg/obiformats/json_writer.go +++ b/pkg/obiformats/json_writer.go @@ -7,7 +7,6 @@ import ( "os" "strconv" "strings" - "sync" "time" "github.com/goccy/go-json" @@ -58,9 +57,17 @@ func JSONRecord(sequence *obiseq.BioSequence) []byte { return text } -func FormatJSONBatch(batch obiiter.BioSequenceBatch) []byte { +func FormatJSONBatch(batch obiiter.BioSequenceBatch) *bytes.Buffer { buff := new(bytes.Buffer) + json := bufio.NewWriter(buff) + + if batch.Order() == 0 { + json.WriteString("[\n") + } else { + json.WriteString(",\n") + } + n := batch.Slice().Len() - 1 for i, s := range batch.Slice() { json.WriteString(" ") @@ -71,8 +78,7 @@ func FormatJSONBatch(batch obiiter.BioSequenceBatch) []byte { } json.Flush() - - return buff.Bytes() + return buff } func WriteJSON(iterator obiiter.IBioSequence, @@ -84,14 +90,10 @@ func WriteJSON(iterator obiiter.IBioSequence, file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile()) newIter := obiiter.MakeIBioSequence() - nwriters := opt.ParallelWorkers() - obiiter.RegisterAPipe() - chunkchan := make(chan FileChunk) - + chunkchan := WriteFileChunk(file, opt.CloseFile()) newIter.Add(nwriters) - var waitWriter sync.WaitGroup go func() { newIter.WaitAndClose() @@ -99,7 +101,6 @@ func WriteJSON(iterator obiiter.IBioSequence, time.Sleep(time.Millisecond) } close(chunkchan) - waitWriter.Wait() }() ff := func(iterator obiiter.IBioSequence) { @@ -107,62 +108,31 @@ func WriteJSON(iterator obiiter.IBioSequence, batch := iterator.Get() - chunkchan <- FileChunk{ - FormatJSONBatch(batch), - batch.Order(), + ss := FileChunk{ + Source: batch.Source(), + Raw: FormatJSONBatch(batch), + Order: batch.Order(), } + + chunkchan <- ss newIter.Push(batch) } newIter.Done() } - next_to_send := 0 - received := make(map[int]FileChunk, 100) - - waitWriter.Add(1) - go func() { - for chunk := range chunkchan { - if chunk.order == next_to_send { - if next_to_send > 0 { - file.Write([]byte(",\n")) - } - file.Write(chunk.text) - next_to_send++ - chunk, ok := received[next_to_send] - for ok { - file.Write(chunk.text) - delete(received, next_to_send) - next_to_send++ - chunk, ok = received[next_to_send] - } - } else { - received[chunk.order] = chunk - } - - } - - file.Write([]byte("\n]\n")) - file.Close() - - log.Debugln("End of the JSON file writing") - obiiter.UnregisterPipe() - waitWriter.Done() - - }() - log.Debugln("Start of the JSON file writing") - file.Write([]byte("[\n")) - go ff(iterator) - for i := 0; i < nwriters-1; i++ { + for i := 1; i < nwriters; i++ { go ff(iterator.Split()) } + go ff(iterator) return newIter, nil } func WriteJSONToStdout(iterator obiiter.IBioSequence, options ...WithOption) (obiiter.IBioSequence, error) { - options = append(options, OptionDontCloseFile()) + options = append(options, OptionCloseFile()) + return WriteJSON(iterator, os.Stdout, options...) } diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index 53233c0..a786cbe 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -7,7 +7,7 @@ import ( // TODO: The version number is extracted from git. This induces that the version // corresponds to the last commit, and not the one when the file will be // commited -var _Commit = "3d06978" +var _Commit = "69ef175" var _Version = "Release 4.2.0" // Version returns the version of the obitools package. diff --git a/pkg/obiseq/attributes.go b/pkg/obiseq/attributes.go index 5a604d9..26fff9c 100644 --- a/pkg/obiseq/attributes.go +++ b/pkg/obiseq/attributes.go @@ -16,11 +16,12 @@ import ( // // Returns: // - A set of strings containing the keys of the BioSequence attributes. -func (s *BioSequence) AttributeKeys(skip_container bool) obiutils.Set[string] { +func (s *BioSequence) AttributeKeys(skip_container, skip_definition bool) obiutils.Set[string] { keys := obiutils.MakeSet[string]() for k, v := range s.Annotations() { - if !skip_container || !obiutils.IsAContainer(v) { + if !((skip_container && obiutils.IsAContainer(v)) || + (skip_definition && k == "definition")) { keys.Add(k) } } @@ -38,8 +39,8 @@ func (s *BioSequence) AttributeKeys(skip_container bool) obiutils.Set[string] { // // Returns: // - A set of strings containing the keys of the BioSequence. -func (s *BioSequence) Keys(skip_container bool) obiutils.Set[string] { - keys := s.AttributeKeys(skip_container) +func (s *BioSequence) Keys(skip_container, skip_definition bool) obiutils.Set[string] { + keys := s.AttributeKeys(skip_container, skip_definition) keys.Add("id") if s.HasSequence() { diff --git a/pkg/obiseq/biosequenceslice.go b/pkg/obiseq/biosequenceslice.go index 24993ec..12210a6 100644 --- a/pkg/obiseq/biosequenceslice.go +++ b/pkg/obiseq/biosequenceslice.go @@ -150,11 +150,11 @@ func (s BioSequenceSlice) Size() int { return size } -func (s BioSequenceSlice) AttributeKeys(skip_map bool) obiutils.Set[string] { +func (s BioSequenceSlice) AttributeKeys(skip_map, skip_definition bool) obiutils.Set[string] { keys := obiutils.MakeSet[string]() for _, k := range s { - keys = keys.Union(k.AttributeKeys(skip_map)) + keys = keys.Union(k.AttributeKeys(skip_map, skip_definition)) } return keys diff --git a/pkg/obitax/default_taxonomy.go b/pkg/obitax/default_taxonomy.go index d963f0f..9a10ac1 100644 --- a/pkg/obitax/default_taxonomy.go +++ b/pkg/obitax/default_taxonomy.go @@ -1,16 +1,17 @@ package obitax -import "log" +import log "github.com/sirupsen/logrus" var __defaut_taxonomy__ *Taxonomy func (taxonomy *Taxonomy) SetAsDefault() { + log.Infof("Set as default taxonomy %s", taxonomy.Name()) __defaut_taxonomy__ = taxonomy } func (taxonomy *Taxonomy) OrDefault(panicOnNil bool) *Taxonomy { if taxonomy == nil { - return __defaut_taxonomy__ + taxonomy = __defaut_taxonomy__ } if panicOnNil && taxonomy == nil { diff --git a/pkg/obitools/obicsv/options.go b/pkg/obitools/obicsv/options.go index 08f320d..cff5e4c 100644 --- a/pkg/obitools/obicsv/options.go +++ b/pkg/obitools/obicsv/options.go @@ -1,6 +1,7 @@ package obicsv import ( + "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils" "github.com/DavidGamba/go-getoptions" @@ -66,6 +67,7 @@ func CSVOptionSet(options *getoptions.GetOpt) { func OptionSet(options *getoptions.GetOpt) { obiconvert.InputOptionSet(options) obiconvert.OutputModeOptionSet(options) + obioptions.LoadTaxonomyOptionSet(options, false, false) CSVOptionSet(options) } diff --git a/pkg/obitools/obicsv/sequence.go b/pkg/obitools/obicsv/sequence.go index 71e80e2..cc3023b 100644 --- a/pkg/obitools/obicsv/sequence.go +++ b/pkg/obitools/obicsv/sequence.go @@ -121,7 +121,7 @@ func NewCSVSequenceIterator(iter obiiter.IBioSequence, options ...WithOption) *I if len(batch.Slice()) == 0 { log.Panicf("first batch should not be empty") } - auto_slot := batch.Slice().AttributeKeys(true).Members() + auto_slot := batch.Slice().AttributeKeys(true, true).Members() slices.Sort(auto_slot) CSVKeys(auto_slot)(opt) iter.PushBack() diff --git a/pkg/obitools/obicsv/writer.go b/pkg/obitools/obicsv/writer.go index c11342c..941f99a 100644 --- a/pkg/obitools/obicsv/writer.go +++ b/pkg/obitools/obicsv/writer.go @@ -55,7 +55,7 @@ func WriteCSV(iterator *ICSVRecord, nwriters := opt.ParallelWorkers() - chunkchan := obiformats.WriteSeqFileChunk(file, opt.CloseFile()) + chunkchan := obiformats.WriteFileChunk(file, opt.CloseFile()) newIter.Add(nwriters) @@ -72,7 +72,7 @@ func WriteCSV(iterator *ICSVRecord, log.Debugf("Formating CSV chunk %d", batch.Order()) - ss := obiformats.SeqFileChunk{ + ss := obiformats.FileChunk{ Source: batch.Source(), Raw: FormatCVSBatch( batch,