diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go
index 914c80a..55e0054 100644
--- a/pkg/obiformats/embl_read.go
+++ b/pkg/obiformats/embl_read.go
@@ -186,54 +186,72 @@ func _ReadFlatFileChunk(reader io.Reader, readers chan _FileChunk) {
 	// Initialize the buffer to the size of a chunk of data
 	buff = make([]byte, _FileChunkSize)
 
+	// Read from the reader until the buffer is full or the end of the file is reached
+	l, err = io.ReadFull(reader, buff)
+	buff = buff[:l]
+
+	if err == io.ErrUnexpectedEOF {
+		err = nil
+	}
+
 	// Read from the reader until the end of the last entry is found or the end of the file is reached
 	for err == nil {
-		// Read from the reader until the buffer is full or the end of the file is reached
-		l, err = io.ReadFull(reader, buff)
-
-		if err == io.ErrUnexpectedEOF {
-			err = nil
-		}
-
 		// Create an extended buffer to read from if the end of the last entry is not found in the current buffer
 		extbuff := make([]byte, 1<<22)
-		buff = buff[:l]
 		end := 0
 		ic := 0
 
 		// Read from the reader in 1 MB increments until the end of the last entry is found
-		for end = _EndOfLastEntry(buff); err == nil && end < 0; end = _EndOfLastEntry(extbuff[:size]) {
+		for end = _EndOfLastEntry(buff); err == nil && end < 0; end = _EndOfLastEntry(buff) {
 			ic++
 			size, err = io.ReadFull(reader, extbuff)
 			buff = append(buff, extbuff[:size]...)
 		}
 
-		end = _EndOfLastEntry(buff)
-
-		// If an extension was read, log the size and number of extensions
-		if len(buff) > 0 {
-			remains := buff[end:]
+		lremain := len(buff) - end
+		remains := make([]byte, max(lremain, _FileChunkSize))
+		lcp := copy(remains, buff[end:])
+		remains = remains[:lcp]
+		if lcp < lremain {
+			log.Fatalf("Error copying remaining data of chunck %d : %d < %d", i, lcp, len(remains))
+		}
+		buff = buff[:end]
 
 		// Send the chunk of data as a _FileChunk struct to the readers channel
 		io := bytes.NewBuffer(buff)
-		log.Debugf("Flat File chunck : final buff size %d bytes (%d) (%d extensions) -> end = %d\n",
+
+		nzero := 0
+		for j := 0; j < len(buff); j++ {
+			if buff[j] == 0 {
+				nzero++
+			}
+		}
+
+		if nzero > 0 {
+			log.Fatalf("File chunck %d contains %d zero bytes", i, nzero)
+		}
+
+		log.Debugf("Flat File chunck %d : final buff size %d bytes (%d) (%d extensions count) -> end = %d starting by = %s, ending by = %s, remaining = %s",
+			i,
 			len(buff), io.Cap(), ic, end,
+			io.Bytes()[0:30],
+			io.Bytes()[io.Len()-3:],
+			remains[0:30],
 		)
+		if string(buff[io.Len()-3:]) != "//\n" {
+			log.Fatalf("File chunck ends with 3 bytes : %s", io.Bytes()[io.Len()-3:])
+		}
+
 		readers <- _FileChunk{io, i}
 		i++
-
-		// Set the buffer to the size of a chunk of data and copy any remaining data to the new buffer
-		buff = make([]byte, _FileChunkSize)
-		copy(buff, remains)
-		//l = len(remains)
+		buff = remains
 	}
 }
diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go
index 9a267c5..502d1a9 100644
--- a/pkg/obiformats/genbank_read.go
+++ b/pkg/obiformats/genbank_read.go
@@ -31,12 +31,18 @@ var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp")
 func _ParseGenbankFile(source string,
 	input <-chan _FileChunk, out obiiter.IBioSequence,
 	chunck_order func() int) {
-	var err error
 	state := inHeader
+	previous_chunk := -1
 
 	for chunks := range input {
-		// log.Debugln("Chunk size", (chunks.raw.(*bytes.Buffer)).Len())
-		scanner := bufio.NewScanner(chunks.raw)
+
+		if state != inHeader {
+			log.Fatalf("Unexpected state %d starting new chunk (id = %d, previous_chunk = %d)",
+				state, chunks.order, previous_chunk)
+		}
+
+		previous_chunk = chunks.order
+		scanner := bufio.NewReader(chunks.raw)
 		sequences := make(obiseq.BioSequenceSlice, 0, 100)
 		sumlength := 0
 		id := ""
@@ -48,90 +54,155 @@ func _ParseGenbankFile(source string,
 		taxid := 1
 		nl := 0
 		sl := 0
 
-		for scanner.Scan() {
+		var line string
+		for bline, is_prefix, err := scanner.ReadLine(); err != io.EOF; bline, is_prefix, err = scanner.ReadLine() {
 			nl++
-			line := scanner.Text()
-			switch {
-			case state == inDefinition && !strings.HasPrefix(line, " "):
-				state = inEntry
-				fallthrough
-			case strings.HasPrefix(line, "LOCUS "):
-				state = inEntry
-				id = strings.SplitN(line[12:], " ", 2)[0]
-				match_length := _seqlenght_rx.FindStringSubmatch(line)
-				if len(match_length) > 0 {
-					lseq, err = strconv.Atoi(match_length[1])
-					if err != nil {
-						lseq = -1
+			line = string(bline)
+			if is_prefix || len(line) > 100 {
+				log.Fatalf("Chunk %d : Line too long: %s", chunks.order, line)
+			}
+			processed := false
+			for !processed {
+				switch {
+
+				case strings.HasPrefix(line, "LOCUS "):
+					if state != inHeader {
+						log.Fatalf("Unexpected state %d while reading LOCUS: %s", state, line)
 					}
-				}
-				if lseq > 0 {
-					seqBytes = bytes.NewBuffer(obiseq.GetSlice(lseq + 20))
-				} else {
-					seqBytes = new(bytes.Buffer)
-				}
-			case strings.HasPrefix(line, "SOURCE "):
-				scientificName = strings.TrimSpace(line[12:])
-			case strings.HasPrefix(line, "DEFINITION "):
-				defBytes.WriteString(strings.TrimSpace(line[12:]))
-				state = inDefinition
-			case strings.HasPrefix(line, "FEATURES "):
-				featBytes.WriteString(line)
-				state = inFeature
-			case strings.HasPrefix(line, "ORIGIN"):
-				state = inSequence
-			case line == "//":
-				// log.Debugln("Total lines := ", nl)
-				sequence := obiseq.NewBioSequence(id,
-					seqBytes.Bytes(),
-					defBytes.String())
-				sequence.SetSource(source)
-				state = inHeader
+					id = strings.SplitN(line[12:], " ", 2)[0]
+					match_length := _seqlenght_rx.FindStringSubmatch(line)
+					if len(match_length) > 0 {
+						lseq, err = strconv.Atoi(match_length[1])
+						if err != nil {
+							lseq = -1
+						}
+					}
+					if lseq > 0 {
+						seqBytes = bytes.NewBuffer(obiseq.GetSlice(lseq + 20))
+					} else {
+						seqBytes = new(bytes.Buffer)
+					}
+					state = inEntry
+					processed = true
 
-				sequence.SetFeatures(featBytes.Bytes())
+				case strings.HasPrefix(line, "DEFINITION "):
+					if state != inEntry {
+						log.Fatalf("Unexpected state %d while reading DEFINITION: %s", state, line)
+					}
+					defBytes.WriteString(strings.TrimSpace(line[12:]))
+					state = inDefinition
+					processed = true
 
-				annot := sequence.Annotations()
-				annot["scientific_name"] = scientificName
-				annot["taxid"] = taxid
-				// log.Println(FormatFasta(sequence, FormatFastSeqJsonHeader))
-				// log.Debugf("Read sequences %s: %dbp (%d)", sequence.Id(),
-				// 	sequence.Len(), seqBytes.Len())
+				case state == inDefinition:
+					if strings.HasPrefix(line, " ") {
+						defBytes.WriteByte(' ')
+						defBytes.WriteString(strings.TrimSpace(line[12:]))
+						processed = true
+					} else {
+						state = inEntry
+					}
 
-				sequences = append(sequences, sequence)
-				sumlength += sequence.Len()
+				case strings.HasPrefix(line, "SOURCE "):
+					if state != inEntry {
+						log.Fatalf("Unexpected state %d while reading SOURCE: %s", state, line)
+					}
+					scientificName = strings.TrimSpace(line[12:])
+					processed = true
 
-				if len(sequences) == 100 || sumlength > 1e7 {
-					out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
-					sequences = make(obiseq.BioSequenceSlice, 0, 100)
-					sumlength = 0
-				}
-				defBytes = bytes.NewBuffer(obiseq.GetSlice(200))
-				featBytes = new(bytes.Buffer)
-				nl = 0
-				sl = 0
-			default:
-				switch state {
-				case inDefinition:
-					defBytes.WriteByte(' ')
-					defBytes.WriteString(strings.TrimSpace(line[5:]))
-				case inFeature:
-					featBytes.WriteByte('\n')
+				case strings.HasPrefix(line, "FEATURES "):
+					if state != inEntry {
+						log.Fatalf("Unexpected state %d while reading FEATURES: %s", state, line)
+					}
 					featBytes.WriteString(line)
-					if strings.HasPrefix(line, `                     /db_xref="taxon:`) {
-						taxid, _ = strconv.Atoi(strings.SplitN(line[37:], `"`, 2)[0])
+					state = inFeature
+					processed = true
+
+				case strings.HasPrefix(line, "ORIGIN"):
+					if state != inFeature {
+						log.Fatalf("Unexpected state %d while reading ORIGIN: %s", state, line)
 					}
-				case inSequence:
+					state = inSequence
+					processed = true
+
+				case line == "//":
+
+					if state != inSequence {
+						log.Fatalf("Unexpected state %d while reading end of record %s", state, id)
+					}
+					// log.Debugln("Total lines := ", nl)
+					if id == "" {
+						log.Warn("Empty id when parsing genbank file")
+					}
+					if seqBytes.Len() == 0 {
+						log.Warn("Empty sequence when parsing genbank file")
+					}
+
+					log.Debugf("End of sequence %s: %dbp ", id, seqBytes.Len())
+
+					sequence := obiseq.NewBioSequence(id,
+						seqBytes.Bytes(),
+						defBytes.String())
+					sequence.SetSource(source)
+					sequence.SetFeatures(featBytes.Bytes())
+
+					annot := sequence.Annotations()
+					annot["scientific_name"] = scientificName
+					annot["taxid"] = taxid
+					// log.Println(FormatFasta(sequence, FormatFastSeqJsonHeader))
+					// log.Debugf("Read sequences %s: %dbp (%d)", sequence.Id(),
+					// 	sequence.Len(), seqBytes.Len())
+
+					sequences = append(sequences, sequence)
+					sumlength += sequence.Len()
+
+					if len(sequences) == 100 || sumlength > 1e7 {
+						log.Debugln("Pushing sequences")
+						out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
+						sequences = make(obiseq.BioSequenceSlice, 0, 100)
+						sumlength = 0
+					}
+
+					defBytes = bytes.NewBuffer(obiseq.GetSlice(200))
+					featBytes = new(bytes.Buffer)
+					nl = 0
+					sl = 0
+
+					state = inHeader
+					processed = true
+
+				case state == inSequence:
+					log.Debugf("Chunk %d : Genbank: line %d, state = %d : %s", chunks.order, nl, state, line)
+
 					sl++
-					parts := strings.SplitN(line[10:], " ", 7)
+					parts := strings.SplitN(line[10:], " ", 6)
 					lparts := len(parts)
 					for i := 0; i < lparts; i++ {
 						seqBytes.WriteString(parts[i])
 					}
+					processed = true
+
+				default:
+					switch state {
+					case inFeature:
+						featBytes.WriteByte('\n')
+						featBytes.WriteString(line)
+						if strings.HasPrefix(line, `                     /db_xref="taxon:`) {
+							taxid, _ = strconv.Atoi(strings.SplitN(line[37:], `"`, 2)[0])
+						}
+						processed = true
+					case inHeader:
+						processed = true
+					case inEntry:
+						processed = true
+					}
+				}
 			}
 		}
+
+		log.Debugf("End of chunk %d : %s", chunks.order, line)
 		if len(sequences) > 0 {
+			log.Debugln("Pushing sequences")
 			out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
 		}
 	}