diff --git a/obitests/obitools/obiconvert/gbpln1088.4Mb.fasta.gz b/obitests/obitools/obiconvert/gbpln1088.4Mb.fasta.gz
new file mode 100644
index 0000000..1db096a
Binary files /dev/null and b/obitests/obitools/obiconvert/gbpln1088.4Mb.fasta.gz differ
diff --git a/obitests/obitools/obiconvert/test.sh b/obitests/obitools/obiconvert/test.sh
index 12db4d0..7fd72c1 100755
--- a/obitests/obitools/obiconvert/test.sh
+++ b/obitests/obitools/obiconvert/test.sh
@@ -13,6 +13,11 @@ CMD=obiconvert
 #
 ######
 TEST_DIR="$(dirname "$(readlink -f "${BASH_SOURCE[0]}")")"
+
+if [ -z "$TEST_DIR" ] ; then
+    TEST_DIR="."
+fi
+
 OBITOOLS_DIR="${TEST_DIR/obitest*/}build"
 
 export PATH="${OBITOOLS_DIR}:${PATH}"
@@ -99,6 +104,36 @@ else
 fi
 
+((ntest++))
+if obiconvert -Z "${TEST_DIR}/gbpln1088.4Mb.fasta.gz" \
+     > "${TMPDIR}/xxx.fasta.gz" && \
+   zdiff "${TEST_DIR}/gbpln1088.4Mb.fasta.gz" \
+         "${TMPDIR}/xxx.fasta.gz"
+then
+    log "$MCMD: converting large fasta file to fasta OK"
+    ((success++))
+else
+    log "$MCMD: converting large fasta file to fasta failed"
+    ((failed++))
+fi
+
+((ntest++))
+if obiconvert -Z --fastq-output \
+        "${TEST_DIR}/gbpln1088.4Mb.fasta.gz" \
+     > "${TMPDIR}/xxx.fastq.gz" && \
+   obiconvert -Z --fasta-output \
+        "${TMPDIR}/xxx.fastq.gz" \
+     > "${TMPDIR}/yyy.fasta.gz" && \
+   zdiff "${TEST_DIR}/gbpln1088.4Mb.fasta.gz" \
+         "${TMPDIR}/yyy.fasta.gz"
+then
+    log "$MCMD: converting large file between fasta and fastq OK"
+    ((success++))
+else
+    log "$MCMD: converting large file between fasta and fastq failed"
+    ((failed++))
+fi
+
 
 #########################################
 #
 # At the end of the tests
diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go
index 4a5ca60..4150f57 100644
--- a/pkg/obiformats/embl_read.go
+++ b/pkg/obiformats/embl_read.go
@@ -187,13 +187,12 @@ func _ParseEmblFile(
 func ReadEMBL(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
 	opt := MakeOptions(options)
 
-	buff := make([]byte, 1024*1024*128) // 128 MB
-
 	entry_channel := ReadFileChunk(
 		opt.Source(),
 		reader,
-		buff,
+		1024*1024*128,
 		EndOfLastFlatFileEntry,
+		"\nID ",
 	)
 
 	newIter := obiiter.MakeIBioSequence()
diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go
index a3ab5c8..2a8114e 100644
--- a/pkg/obiformats/fastaseq_read.go
+++ b/pkg/obiformats/fastaseq_read.go
@@ -233,13 +233,12 @@ func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
 
 	nworker := opt.ParallelWorkers()
 
-	buff := make([]byte, 1024*1024)
-
 	chkchan := ReadFileChunk(
 		opt.Source(),
 		reader,
-		buff,
+		1024*1024,
 		EndOfLastFastaEntry,
+		"\n>",
 	)
 
 	for i := 0; i < nworker; i++ {
diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go
index 0000e73..cab6a5f 100644
--- a/pkg/obiformats/fastqseq_read.go
+++ b/pkg/obiformats/fastqseq_read.go
@@ -327,13 +327,12 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
 
 	nworker := opt.ParallelWorkers()
 
-	buff := make([]byte, 1024*1024)
-
 	chkchan := ReadFileChunk(
 		opt.Source(),
 		reader,
-		buff,
+		1024*1024,
 		EndOfLastFastqEntry,
+		"\n@",
 	)
 
 	for i := 0; i < nworker; i++ {
diff --git a/pkg/obiformats/file_chunk_read.go b/pkg/obiformats/file_chunk_read.go
index 17eb0c1..b42b883 100644
--- a/pkg/obiformats/file_chunk_read.go
+++ b/pkg/obiformats/file_chunk_read.go
@@ -4,8 +4,10 @@ import (
 	"bytes"
 	"io"
 	"slices"
+	"strings"
 
 	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
+	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -17,10 +19,102 @@ type FileChunk struct {
 	Order  int
 }
 
+type PieceOfChunk struct {
+	head *PieceOfChunk
+	next *PieceOfChunk
+	data []byte
+}
+
+func NewPieceOfChunk(size int) *PieceOfChunk {
+	data := make([]byte, size)
+	p := &PieceOfChunk{
+		next: nil,
+		data: data,
+	}
+	p.head = p
+	return p
+}
+
+func (piece *PieceOfChunk) NewPieceOfChunk(size int) *PieceOfChunk {
+	if piece == nil {
+		return NewPieceOfChunk(size)
+	}
+
+	if piece.next != nil {
+		log.Panic("Trying to create a new piece of chunk when next already exists")
+	}
+
+	n := NewPieceOfChunk(size)
+	n.head = piece.head
+	piece.next = n
+
+	return n
+}
+
+func (piece *PieceOfChunk) Next() *PieceOfChunk {
+	return piece.next
+}
+
+func (piece *PieceOfChunk) Head() *PieceOfChunk {
+	if piece == nil {
+		return nil
+	}
+	return piece.head
+}
+
+func (piece *PieceOfChunk) Len() int {
+	if piece == nil {
+		return 0
+	}
+
+	if piece.next == nil {
+		return len(piece.data)
+	}
+	return len(piece.data) + piece.next.Len()
+}
+
+func (piece *PieceOfChunk) Pack() *PieceOfChunk {
+	if piece == nil {
+		return nil
+	}
+	size := piece.next.Len()
+	piece.data = slices.Grow(piece.data, size)
+
+	for p := piece.next; p != nil; {
+		piece.data = append(piece.data, p.data...)
+		p.data = nil
+		n := p.next
+		p.next = nil
+		p = n
+	}
+
+	piece.next = nil
+
+	return piece
+}
+
+func (piece *PieceOfChunk) IsLast() bool {
+	return piece.next == nil
+}
+
+func (piece *PieceOfChunk) FileChunk(source string, order int) FileChunk {
+	piece.Pack()
+	return FileChunk{
+		Source: source,
+		Raw:    bytes.NewBuffer(piece.data),
+		Order:  order,
+	}
+}
+
 type ChannelFileChunk chan FileChunk
 
 type LastSeqRecord func([]byte) int
 
+func ispossible(data []byte, probe string) bool {
+	s := obiutils.UnsafeString(data)
+	return strings.Index(s, probe) != -1
+}
+
 // _ReadFlatFileChunk reads a chunk of data from the given 'reader' and sends it to the
 // 'readers' channel as a _FileChunk struct. The function reads from the reader until
 // the end of the last entry is found, then sends the chunk to the channel. If the end
@@ -37,81 +131,86 @@ type LastSeqRecord func([]byte) int
 func ReadFileChunk(
 	source string,
 	reader io.Reader,
-	buff []byte,
-	splitter LastSeqRecord) ChannelFileChunk {
-	var err error
-	var fullbuff []byte
+	fileChunkSize int,
+	splitter LastSeqRecord,
+	probe string) ChannelFileChunk {
 
 	chunk_channel := make(ChannelFileChunk)
 
-	fileChunkSize := len(buff)
-
 	go func() {
+		var err error
 		size := 0
 		l := 0
 		i := 0
 
+		pieces := NewPieceOfChunk(fileChunkSize)
 		// Initialize the buffer to the size of a chunk of data
-		fullbuff = buff
 
 		// Read from the reader until the buffer is full or the end of the file is reached
-		l, err = io.ReadFull(reader, buff)
-		buff = buff[:l]
+		l, err = io.ReadFull(reader, pieces.data)
+		pieces.data = pieces.data[:l]
 
 		if err == io.ErrUnexpectedEOF {
 			err = nil
 		}
 
+		end := splitter(pieces.data)
+
 		// Read from the reader until the end of the last entry is found or the end of the file is reached
 		for err == nil {
 			// Create an extended buffer to read from if the end of the last entry is not found in the current buffer
-			end := 0
-			ic := 0
 
 			// Read from the reader in 1 MB increments until the end of the last entry is found
-			for end = splitter(buff); err == nil && end < 0; end = splitter(buff) {
-				ic++
-				buff = slices.Grow(buff, fileChunkSize)
-				l := len(buff)
-				extbuff := buff[l:(l + fileChunkSize - 1)]
-				size, err = io.ReadFull(reader, extbuff)
-				buff = buff[0:(l + size)]
-				// log.Warnf("Splitter not found, attempting %d to read in %d B increments : len(buff) = %d/%d", ic, fileChunkSize, len(extbuff), len(buff))
+			for err == nil && end < 0 {
+				pieces = pieces.NewPieceOfChunk(fileChunkSize)
+				size, err = io.ReadFull(reader, pieces.data)
+				pieces.data = pieces.data[:size]
+				if ispossible(pieces.data, probe) {
+					pieces = pieces.Head().Pack()
+					end = splitter(pieces.data)
+				} else {
+					end = -1
+				}
+				// log.Warnf("Splitter not found, attempting %d to read in %d B increments : len(buff) = %d/%d", ic, fileChunkSize, len(extbuff), len(buff))
 			}
 
-			fullbuff = buff
+			pieces = pieces.Head().Pack()
+			lbuff := pieces.Len()
 
-			if len(buff) > 0 {
+			if lbuff > 0 {
 				if end < 0 {
-					end = len(buff)
+					end = pieces.Len()
 				}
 
-				pnext := end
-				lremain := len(buff) - pnext
-				buff = buff[:end]
-				for len(buff) > 0 && (buff[len(buff)-1] == '\n' || buff[len(buff)-1] == '\r') {
-					buff = buff[:len(buff)-1]
-				}
+				lremain := lbuff - end
 
-				if len(buff) > 0 {
-					cbuff := slices.Clone(buff)
-					io := bytes.NewBuffer(cbuff)
-					// log.Warnf("chuck %d :Read %d bytes from file %s", i, io.Len(), source)
-					chunk_channel <- FileChunk{source, io, i}
-					i++
-				}
+				var nextpieces *PieceOfChunk
 
 				if lremain > 0 {
-					buff = fullbuff[0:lremain]
-					lcp := copy(buff, fullbuff[pnext:])
+					nextpieces = NewPieceOfChunk(lremain)
+					lcp := copy(nextpieces.data, pieces.data[end:])
 					if lcp < lremain {
 						log.Fatalf("Error copying remaining data of chunk %d : %d < %d", i, lcp, lremain)
 					}
 				} else {
-					buff = buff[:0]
+					nextpieces = nil
 				}
+
+				pieces.data = pieces.data[:end]
+
+				for len(pieces.data) > 0 && (pieces.data[len(pieces.data)-1] == '\n' || pieces.data[len(pieces.data)-1] == '\r') {
+					pieces.data = pieces.data[:len(pieces.data)-1]
+				}
+
+				if len(pieces.data) > 0 {
+					// log.Warnf("chunk %d : read %d bytes from file %s", i, len(pieces.data), source)
+					chunk_channel <- pieces.FileChunk(source, i)
+					i++
+				}
+
+				pieces = nextpieces
+				end = -1
 			}
 		}
 
@@ -119,10 +218,11 @@ func ReadFileChunk(
 			log.Fatalf("Error reading data from file : %s", err)
 		}
 
+		pieces.Head().Pack()
+
 		// Send the last chunk to the channel
-		if len(buff) > 0 {
-			io := bytes.NewBuffer(slices.Clone(buff))
-			chunk_channel <- FileChunk{source, io, i}
+		if pieces.Len() > 0 {
+			chunk_channel <- pieces.FileChunk(source, i)
 		}
 
 		// Close the readers channel when the end of the file is reached
diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go
index 34fed24..4d57235 100644
--- a/pkg/obiformats/genbank_read.go
+++ b/pkg/obiformats/genbank_read.go
@@ -223,13 +223,12 @@ func ReadGenbank(reader io.Reader, options ...WithOption) (obiiter.IBioSequence,
 	opt := MakeOptions(options)
 
 	// entry_channel := make(chan _FileChunk)
-	buff := make([]byte, 1024*1024*128) // 128 MB
-
 	entry_channel := ReadFileChunk(
 		opt.Source(),
 		reader,
-		buff,
+		1024*1024*128,
 		EndOfLastFlatFileEntry,
+		"\nLOCUS ",
 	)
 
 	newIter := obiiter.MakeIBioSequence()
diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go
index d23f1c8..bde6c12 100644
--- a/pkg/obioptions/version.go
+++ b/pkg/obioptions/version.go
@@ -8,7 +8,7 @@ import (
 // corresponds to the last commit, and not the one when the file will be
 // commited
 
-var _Commit = "f21f51a"
+var _Commit = "937a483"
 var _Version = "Release 4.4.0"
 
 // Version returns the version of the obitools package.
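Note on the API change in this patch: ReadFileChunk now takes the chunk size (fileChunkSize int) and a probe string instead of a caller-allocated []byte buffer. Refills accumulate in a linked list of PieceOfChunk nodes, so the reader no longer re-grows and re-copies the whole accumulated buffer on every extension; ispossible() first scans each new piece for the probe ("\n>", "\n@", "\nID ", "\nLOCUS "), and the more expensive splitter plus Pack() concatenation only run when a record boundary can actually be present. The sketch below shows how a caller might drive the new signature. It is illustrative only: the input file name is hypothetical, and lastFastaRecordStart is a simplified stand-in for the library's EndOfLastFastaEntry, not its actual implementation.

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"

	"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
)

// lastFastaRecordStart is a simplified stand-in for EndOfLastFastaEntry:
// it returns the offset of the '>' opening the last (possibly truncated)
// FASTA record, so that record is carried over into the next chunk,
// or -1 when the buffer does not yet contain a record boundary.
func lastFastaRecordStart(data []byte) int {
	i := bytes.LastIndex(data, []byte("\n>"))
	if i < 0 {
		return -1
	}
	return i + 1
}

func main() {
	fh, err := os.Open("seqs.fasta") // hypothetical input file
	if err != nil {
		panic(err)
	}
	defer fh.Close()

	chunks := obiformats.ReadFileChunk(
		"seqs.fasta",         // source label copied into every FileChunk
		fh,                   // underlying reader
		1024*1024,            // size of each PieceOfChunk allocation (1 MB)
		lastFastaRecordStart, // LastSeqRecord splitter
		"\n>",                // probe: splitter only runs on pieces containing it
	)

	for chunk := range chunks {
		// A real consumer would parse chunk.Raw; here we just count its bytes.
		n, _ := io.Copy(io.Discard, chunk.Raw)
		fmt.Printf("chunk %d from %s: %d bytes\n", chunk.Order, chunk.Source, n)
	}
}

Each FileChunk received on the channel owns its own packed buffer, which is what lets ReadFasta and ReadFastq fan the chunks out to opt.ParallelWorkers() parsing goroutines, as in the hunks above.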