refactoring of the file chunk writing

This commit is contained in:
Eric Coissac
2024-11-29 18:15:03 +01:00
parent 69ef1758a2
commit 00b0edc15a
16 changed files with 71 additions and 98 deletions

View File

@ -159,7 +159,7 @@ func EmblChunkParser(withFeatureTable bool) func(string, io.Reader) (obiseq.BioS
}
func _ParseEmblFile(
input ChannelSeqFileChunk,
input ChannelFileChunk,
out obiiter.IBioSequence,
withFeatureTable bool,
) {
@ -189,7 +189,7 @@ func ReadEMBL(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, er
buff := make([]byte, 1024*1024*128) // 128 MB
entry_channel := ReadSeqFileChunk(
entry_channel := ReadFileChunk(
opt.Source(),
reader,
buff,

View File

@ -205,7 +205,7 @@ func FastaChunkParser() func(string, io.Reader) (obiseq.BioSequenceSlice, error)
}
func _ParseFastaFile(
input ChannelSeqFileChunk,
input ChannelFileChunk,
out obiiter.IBioSequence,
) {
@ -213,6 +213,7 @@ func _ParseFastaFile(
for chunks := range input {
sequences, err := parser(chunks.Source, chunks.Raw)
// log.Warnf("Chunck(%d:%d) -%d- ", chunks.Order, l, sequences.Len())
if err != nil {
log.Fatalf("File %s : Cannot parse the fasta file : %v", chunks.Source, err)
@ -234,7 +235,7 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
buff := make([]byte, 1024*1024)
chkchan := ReadSeqFileChunk(
chkchan := ReadFileChunk(
opt.Source(),
reader,
buff,

View File

@ -296,7 +296,7 @@ func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSeq
}
func _ParseFastqFile(
input ChannelSeqFileChunk,
input ChannelFileChunk,
out obiiter.IBioSequence,
quality_shift byte,
) {
@ -326,7 +326,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
buff := make([]byte, 1024*1024)
chkchan := ReadSeqFileChunk(
chkchan := ReadFileChunk(
opt.Source(),
reader,
buff,

View File

@ -7,6 +7,7 @@ import (
"io"
"os"
"strings"
"time"
log "github.com/sirupsen/logrus"
@ -132,7 +133,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
nwriters := opt.ParallelWorkers()
chunkchan := WriteSeqFileChunk(file, opt.CloseFile())
chunkchan := WriteFileChunk(file, opt.CloseFile())
header_format := opt.FormatFastSeqHeader()
@ -140,6 +141,9 @@ func WriteFasta(iterator obiiter.IBioSequence,
go func() {
newIter.WaitAndClose()
for len(chunkchan) > 0 {
time.Sleep(time.Millisecond)
}
close(chunkchan)
log.Debugf("Writing fasta file done")
}()
@ -151,7 +155,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
log.Debugf("Formating fasta chunk %d", batch.Order())
chunkchan <- SeqFileChunk{
chunkchan <- FileChunk{
Source: batch.Source(),
Raw: FormatFastaBatch(batch, header_format, opt.SkipEmptySequence()),
Order: batch.Order(),
@ -166,7 +170,7 @@ func WriteFasta(iterator obiiter.IBioSequence,
log.Debugln("Start of the fasta file writing")
go ff(iterator)
for i := 0; i < nwriters-1; i++ {
for i := 1; i < nwriters; i++ {
go ff(iterator.Split())
}

View File

@ -4,7 +4,6 @@ import (
"bytes"
"io"
"os"
"sync"
"time"
log "github.com/sirupsen/logrus"
@ -87,11 +86,6 @@ func FormatFastqBatch(batch obiiter.BioSequenceBatch,
return &bs
}
type FileChunk struct {
text []byte
order int
}
func WriteFastq(iterator obiiter.IBioSequence,
file io.WriteCloser,
options ...WithOption) (obiiter.IBioSequence, error) {
@ -104,27 +98,25 @@ func WriteFastq(iterator obiiter.IBioSequence,
nwriters := opt.ParallelWorkers()
chunkchan := WriteSeqFileChunk(file, opt.CloseFile())
chunkchan := WriteFileChunk(file, opt.CloseFile())
header_format := opt.FormatFastSeqHeader()
newIter.Add(nwriters)
var waitWriter sync.WaitGroup
go func() {
newIter.WaitAndClose()
for len(chunkchan) > 0 {
time.Sleep(time.Millisecond)
}
close(chunkchan)
waitWriter.Wait()
log.Debugf("Writing fastq file done")
}()
ff := func(iterator obiiter.IBioSequence) {
for iterator.Next() {
batch := iterator.Get()
chunk := SeqFileChunk{
chunk := FileChunk{
Source: batch.Source(),
Raw: FormatFastqBatch(batch, header_format, opt.SkipEmptySequence()),
Order: batch.Order(),
@ -137,7 +129,7 @@ func WriteFastq(iterator obiiter.IBioSequence,
log.Debugln("Start of the fastq file writing")
go ff(iterator)
for i := 0; i < nwriters-1; i++ {
for i := 1; i < nwriters; i++ {
go ff(iterator.Split())
}

View File

@ -11,13 +11,13 @@ import (
type SeqFileChunkParser func(string, io.Reader) (obiseq.BioSequenceSlice, error)
type SeqFileChunk struct {
type FileChunk struct {
Source string
Raw *bytes.Buffer
Order int
}
type ChannelSeqFileChunk chan SeqFileChunk
type ChannelFileChunk chan FileChunk
type LastSeqRecord func([]byte) int
@ -34,15 +34,15 @@ type LastSeqRecord func([]byte) int
//
// Returns:
// None
func ReadSeqFileChunk(
func ReadFileChunk(
source string,
reader io.Reader,
buff []byte,
splitter LastSeqRecord) ChannelSeqFileChunk {
splitter LastSeqRecord) ChannelFileChunk {
var err error
var fullbuff []byte
chunk_channel := make(ChannelSeqFileChunk)
chunk_channel := make(ChannelFileChunk)
fileChunkSize := len(buff)
@ -95,8 +95,10 @@ func ReadSeqFileChunk(
}
if len(buff) > 0 {
io := bytes.NewBuffer(slices.Clone(buff))
chunk_channel <- SeqFileChunk{source, io, i}
cbuff := slices.Clone(buff)
io := bytes.NewBuffer(cbuff)
// log.Warnf("chuck %d :Read %d bytes from file %s", i, io.Len(), source)
chunk_channel <- FileChunk{source, io, i}
i++
}
@ -120,7 +122,7 @@ func ReadSeqFileChunk(
// Send the last chunk to the channel
if len(buff) > 0 {
io := bytes.NewBuffer(slices.Clone(buff))
chunk_channel <- SeqFileChunk{source, io, i}
chunk_channel <- FileChunk{source, io, i}
}
// Close the readers channel when the end of the file is reached

View File

@ -8,16 +8,16 @@ import (
log "github.com/sirupsen/logrus"
)
func WriteSeqFileChunk(
func WriteFileChunk(
writer io.WriteCloser,
toBeClosed bool) ChannelSeqFileChunk {
toBeClosed bool) ChannelFileChunk {
obiiter.RegisterAPipe()
chunk_channel := make(ChannelSeqFileChunk)
chunk_channel := make(ChannelFileChunk)
go func() {
nextToPrint := 0
toBePrinted := make(map[int]SeqFileChunk)
toBePrinted := make(map[int]FileChunk)
for chunk := range chunk_channel {
if chunk.Order == nextToPrint {
log.Debugf("Writing chunk: %d of length %d bytes",

View File

@ -198,7 +198,7 @@ func GenbankChunkParser(withFeatureTable bool) func(string, io.Reader) (obiseq.B
}
}
func _ParseGenbankFile(input ChannelSeqFileChunk,
func _ParseGenbankFile(input ChannelFileChunk,
out obiiter.IBioSequence,
withFeatureTable bool) {
@ -225,7 +225,7 @@ func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence,
buff := make([]byte, 1024*1024*128) // 128 MB
entry_channel := ReadSeqFileChunk(
entry_channel := ReadFileChunk(
opt.Source(),
reader,
buff,

View File

@ -7,7 +7,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/goccy/go-json"
@ -58,9 +57,17 @@ func JSONRecord(sequence *obiseq.BioSequence) []byte {
return text
}
func FormatJSONBatch(batch obiiter.BioSequenceBatch) []byte {
func FormatJSONBatch(batch obiiter.BioSequenceBatch) *bytes.Buffer {
buff := new(bytes.Buffer)
json := bufio.NewWriter(buff)
if batch.Order() == 0 {
json.WriteString("[\n")
} else {
json.WriteString(",\n")
}
n := batch.Slice().Len() - 1
for i, s := range batch.Slice() {
json.WriteString(" ")
@ -71,8 +78,7 @@ func FormatJSONBatch(batch obiiter.BioSequenceBatch) []byte {
}
json.Flush()
return buff.Bytes()
return buff
}
func WriteJSON(iterator obiiter.IBioSequence,
@ -84,14 +90,10 @@ func WriteJSON(iterator obiiter.IBioSequence,
file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile())
newIter := obiiter.MakeIBioSequence()
nwriters := opt.ParallelWorkers()
obiiter.RegisterAPipe()
chunkchan := make(chan FileChunk)
chunkchan := WriteFileChunk(file, opt.CloseFile())
newIter.Add(nwriters)
var waitWriter sync.WaitGroup
go func() {
newIter.WaitAndClose()
@ -99,7 +101,6 @@ func WriteJSON(iterator obiiter.IBioSequence,
time.Sleep(time.Millisecond)
}
close(chunkchan)
waitWriter.Wait()
}()
ff := func(iterator obiiter.IBioSequence) {
@ -107,62 +108,31 @@ func WriteJSON(iterator obiiter.IBioSequence,
batch := iterator.Get()
chunkchan <- FileChunk{
FormatJSONBatch(batch),
batch.Order(),
ss := FileChunk{
Source: batch.Source(),
Raw: FormatJSONBatch(batch),
Order: batch.Order(),
}
chunkchan <- ss
newIter.Push(batch)
}
newIter.Done()
}
next_to_send := 0
received := make(map[int]FileChunk, 100)
waitWriter.Add(1)
go func() {
for chunk := range chunkchan {
if chunk.order == next_to_send {
if next_to_send > 0 {
file.Write([]byte(",\n"))
}
file.Write(chunk.text)
next_to_send++
chunk, ok := received[next_to_send]
for ok {
file.Write(chunk.text)
delete(received, next_to_send)
next_to_send++
chunk, ok = received[next_to_send]
}
} else {
received[chunk.order] = chunk
}
}
file.Write([]byte("\n]\n"))
file.Close()
log.Debugln("End of the JSON file writing")
obiiter.UnregisterPipe()
waitWriter.Done()
}()
log.Debugln("Start of the JSON file writing")
file.Write([]byte("[\n"))
go ff(iterator)
for i := 0; i < nwriters-1; i++ {
for i := 1; i < nwriters; i++ {
go ff(iterator.Split())
}
go ff(iterator)
return newIter, nil
}
func WriteJSONToStdout(iterator obiiter.IBioSequence,
options ...WithOption) (obiiter.IBioSequence, error) {
options = append(options, OptionDontCloseFile())
options = append(options, OptionCloseFile())
return WriteJSON(iterator, os.Stdout, options...)
}

View File

@ -7,7 +7,7 @@ import (
// TODO: The version number is extracted from git. This induces that the version
// corresponds to the last commit, and not the one when the file will be
// commited
var _Commit = "3d06978"
var _Commit = "69ef175"
var _Version = "Release 4.2.0"
// Version returns the version of the obitools package.

View File

@ -16,11 +16,12 @@ import (
//
// Returns:
// - A set of strings containing the keys of the BioSequence attributes.
func (s *BioSequence) AttributeKeys(skip_container bool) obiutils.Set[string] {
func (s *BioSequence) AttributeKeys(skip_container, skip_definition bool) obiutils.Set[string] {
keys := obiutils.MakeSet[string]()
for k, v := range s.Annotations() {
if !skip_container || !obiutils.IsAContainer(v) {
if !((skip_container && obiutils.IsAContainer(v)) ||
(skip_definition && k == "definition")) {
keys.Add(k)
}
}
@ -38,8 +39,8 @@ func (s *BioSequence) AttributeKeys(skip_container bool) obiutils.Set[string] {
//
// Returns:
// - A set of strings containing the keys of the BioSequence.
func (s *BioSequence) Keys(skip_container bool) obiutils.Set[string] {
keys := s.AttributeKeys(skip_container)
func (s *BioSequence) Keys(skip_container, skip_definition bool) obiutils.Set[string] {
keys := s.AttributeKeys(skip_container, skip_definition)
keys.Add("id")
if s.HasSequence() {

View File

@ -150,11 +150,11 @@ func (s BioSequenceSlice) Size() int {
return size
}
func (s BioSequenceSlice) AttributeKeys(skip_map bool) obiutils.Set[string] {
func (s BioSequenceSlice) AttributeKeys(skip_map, skip_definition bool) obiutils.Set[string] {
keys := obiutils.MakeSet[string]()
for _, k := range s {
keys = keys.Union(k.AttributeKeys(skip_map))
keys = keys.Union(k.AttributeKeys(skip_map, skip_definition))
}
return keys

View File

@ -1,16 +1,17 @@
package obitax
import "log"
import log "github.com/sirupsen/logrus"
var __defaut_taxonomy__ *Taxonomy
func (taxonomy *Taxonomy) SetAsDefault() {
log.Infof("Set as default taxonomy %s", taxonomy.Name())
__defaut_taxonomy__ = taxonomy
}
func (taxonomy *Taxonomy) OrDefault(panicOnNil bool) *Taxonomy {
if taxonomy == nil {
return __defaut_taxonomy__
taxonomy = __defaut_taxonomy__
}
if panicOnNil && taxonomy == nil {

View File

@ -1,6 +1,7 @@
package obicsv
import (
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
"github.com/DavidGamba/go-getoptions"
@ -66,6 +67,7 @@ func CSVOptionSet(options *getoptions.GetOpt) {
func OptionSet(options *getoptions.GetOpt) {
obiconvert.InputOptionSet(options)
obiconvert.OutputModeOptionSet(options)
obioptions.LoadTaxonomyOptionSet(options, false, false)
CSVOptionSet(options)
}

View File

@ -121,7 +121,7 @@ func NewCSVSequenceIterator(iter obiiter.IBioSequence, options ...WithOption) *I
if len(batch.Slice()) == 0 {
log.Panicf("first batch should not be empty")
}
auto_slot := batch.Slice().AttributeKeys(true).Members()
auto_slot := batch.Slice().AttributeKeys(true, true).Members()
slices.Sort(auto_slot)
CSVKeys(auto_slot)(opt)
iter.PushBack()

View File

@ -55,7 +55,7 @@ func WriteCSV(iterator *ICSVRecord,
nwriters := opt.ParallelWorkers()
chunkchan := obiformats.WriteSeqFileChunk(file, opt.CloseFile())
chunkchan := obiformats.WriteFileChunk(file, opt.CloseFile())
newIter.Add(nwriters)
@ -72,7 +72,7 @@ func WriteCSV(iterator *ICSVRecord,
log.Debugf("Formating CSV chunk %d", batch.Order())
ss := obiformats.SeqFileChunk{
ss := obiformats.FileChunk{
Source: batch.Source(),
Raw: FormatCVSBatch(
batch,