mirror of https://github.com/metabarcoding/obitools4.git
synced 2025-06-29 16:20:46 +00:00
402 lines · 9.4 KiB · Go
package obiformats
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// EndOfLastFastqEntry scans buffer backwards and returns the index of the
// '@' that opens the last FASTQ record whose header line, sequence line and
// '+' line are all present in buffer.
//
// Recognition walks from the end of the buffer:
//   - a '+' that is the first character of its line,
//   - preceded (after optional spaces/tabs/ends of line) by a sequence line
//     made of letters, '-', '.', '[' or ']',
//   - preceded (after optional blank lines) by a header line whose first
//     character is '@' and which is itself preceded by an end of line.
//
// Whenever one of these checks fails, the scan resumes just before the '+'
// that started the attempt.
//
// It returns -1 when no such record is found, when the '@' is the very first
// byte of buffer (nothing precedes it), and also when the end of line in
// front of the '@' sits at index 1.
// NOTE(review): that last case looks like an off-by-one (an end of line at
// index 0 is accepted); kept as is until the ReadFileChunk contract is
// confirmed.
func EndOfLastFastqEntry(buffer []byte) int {
	const (
		seekPlus         = iota // looking for a '+' character
		plusAtLineStart         // the '+' must be preceded by an end of line
		skipBeforeSeq           // skipping separators before the sequence line
		inSequence              // walking back along the sequence line
		skipBeforeHeader        // skipping ends of line before the header line
		inHeader                // walking back along the header line, looking for '@'
		atCandidate             // an '@' was seen: it must start its line
		found                   // the header start has been located
	)

	isSeqChar := func(c byte) bool {
		return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
			c == '-' || c == '.' || c == '[' || c == ']'
	}

	pos := len(buffer) - 1
	anchor := pos // position of the '+' that started the current attempt
	cut := len(buffer)
	state := seekPlus

	for ; pos >= 0 && state < found; pos-- {
		c := buffer[pos]
		eol := c == '\r' || c == '\n'
		sep := eol || c == ' ' || c == '\t'

		switch state {
		case seekPlus:
			if c == '+' {
				state, anchor = plusAtLineStart, pos
			}

		case plusAtLineStart:
			if eol {
				state = skipBeforeSeq
			} else {
				// not a '+' line: resume right before the '+'
				state, pos = seekPlus, anchor
			}

		case skipBeforeSeq:
			switch {
			case sep:
				// blank space between the sequence and the '+' line
			case isSeqChar(c):
				state = inSequence
			default:
				state, pos = seekPlus, anchor
			}

		case inSequence:
			switch {
			case eol:
				state = skipBeforeHeader
			case isSeqChar(c):
				// still inside the sequence line
			default:
				state, pos = seekPlus, anchor
			}

		case skipBeforeHeader:
			if !eol {
				state = inHeader
			}

		case inHeader:
			if eol {
				// reached the line start without meeting a leading '@'
				state, pos = seekPlus, anchor
			} else if c == '@' {
				state, cut = atCandidate, pos
			}

		case atCandidate:
			if eol {
				state = found
			} else {
				// the '@' was inside the header line, keep looking
				state = inHeader
			}
		}
	}

	if pos == 0 || state != found {
		return -1
	}
	return cut
}
|
|
|
|
func _storeSequenceQuality(bytes *bytes.Buffer, out *obiseq.BioSequence, quality_shift byte) {
|
|
q := bytes.Bytes()
|
|
if len(q) == 0 {
|
|
log.Fatalf("@%s[%s] : sequence quality is empty", out.Id(), out.Source())
|
|
}
|
|
|
|
if len(q) != out.Len() {
|
|
log.Fatalf("%s[%s] : sequence data and quality lenght not equal (%d <> %d)",
|
|
out.Id(), out.Source(), len(q), out.Len())
|
|
}
|
|
|
|
for i := 0; i < len(q); i++ {
|
|
q[i] -= quality_shift
|
|
}
|
|
out.SetQualities(q)
|
|
}
|
|
|
|
func FastqChunkParser(quality_shift byte, with_quality bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
|
|
parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) {
|
|
|
|
var identifier string
|
|
var definition string
|
|
|
|
idBytes := bytes.Buffer{}
|
|
defBytes := bytes.Buffer{}
|
|
qualBytes := bytes.Buffer{}
|
|
seqBytes := bytes.Buffer{}
|
|
|
|
state := 0
|
|
scanner := bufio.NewReader(input)
|
|
sequences := obiseq.MakeBioSequenceSlice(100)[:0]
|
|
previous := byte(0)
|
|
|
|
for C, err := scanner.ReadByte(); err != io.EOF; C, err = scanner.ReadByte() {
|
|
|
|
is_end_of_line := C == '\r' || C == '\n'
|
|
is_space := C == ' ' || C == '\t'
|
|
is_sep := is_space || is_end_of_line
|
|
|
|
switch state {
|
|
case 0: // Beginning of sequence chunk must start with @
|
|
|
|
if C == '@' {
|
|
// Beginning of sequence
|
|
state = 1
|
|
} else {
|
|
log.Fatalf("%s : sequence entry is not starting with @", source)
|
|
}
|
|
case 1: // Beginning of identifier (Mandatory)
|
|
if is_sep {
|
|
// No identifier -> ERROR
|
|
log.Fatalf("%s : sequence identifier is empty", source)
|
|
} else {
|
|
// Beginning of identifier
|
|
state = 2
|
|
idBytes.Reset()
|
|
idBytes.WriteByte(C)
|
|
}
|
|
case 2: // Following of the identifier
|
|
if is_sep {
|
|
// End of identifier
|
|
identifier = idBytes.String()
|
|
state = 3
|
|
} else {
|
|
idBytes.WriteByte(C)
|
|
}
|
|
if is_end_of_line {
|
|
// Definition empty
|
|
definition = ""
|
|
state = 5
|
|
}
|
|
case 3: // Beginning of definition
|
|
if is_end_of_line {
|
|
// Definition empty
|
|
definition = ""
|
|
state = 5
|
|
} else if !is_space {
|
|
// Beginning of definition
|
|
defBytes.Reset()
|
|
defBytes.WriteByte(C)
|
|
state = 4
|
|
}
|
|
case 4: // Following of the definition
|
|
if is_end_of_line {
|
|
definition = defBytes.String()
|
|
state = 5
|
|
} else {
|
|
defBytes.WriteByte(C)
|
|
}
|
|
case 5: // Beginning of sequence
|
|
if !is_end_of_line {
|
|
// Beginning of sequence
|
|
if C >= 'A' && C <= 'Z' {
|
|
C = C + 'a' - 'A'
|
|
}
|
|
seqBytes.Reset()
|
|
seqBytes.WriteByte(C)
|
|
state = 6
|
|
}
|
|
case 6:
|
|
if is_end_of_line {
|
|
// End of sequence
|
|
rawseq := seqBytes.Bytes()
|
|
if len(rawseq) == 0 {
|
|
log.Fatalf("@%s[%s] : sequence is empty", identifier, source)
|
|
}
|
|
s := obiseq.NewBioSequence(identifier, rawseq, definition)
|
|
s.SetSource(source)
|
|
sequences = append(sequences, s)
|
|
state = 7
|
|
} else {
|
|
if C >= 'A' && C <= 'Z' {
|
|
C = C + 'a' - 'A'
|
|
}
|
|
if (C >= 'a' && C <= 'z') || C == '-' || C == '.' || C == '[' || C == ']' {
|
|
seqBytes.WriteByte(C)
|
|
} else {
|
|
context, _ := scanner.Peek(30)
|
|
context = append(
|
|
append([]byte{previous}, C),
|
|
context...)
|
|
log.Fatalf("%s [%s]: sequence contains invalid character %c (%s)",
|
|
source, identifier, C, string(context))
|
|
}
|
|
}
|
|
case 7:
|
|
if is_end_of_line {
|
|
state = 7
|
|
} else if C == '+' {
|
|
state = 8
|
|
} else {
|
|
log.Fatalf("@%s[%s] : sequence data not followed by a line starting with + but a %c", identifier, source, C)
|
|
}
|
|
case 8:
|
|
// State consuming the + internal header line
|
|
if is_end_of_line {
|
|
state = 9
|
|
}
|
|
case 9:
|
|
if is_end_of_line {
|
|
state = 9
|
|
} else {
|
|
// beginning of quality
|
|
state = 10
|
|
qualBytes.Reset()
|
|
qualBytes.WriteByte(C)
|
|
}
|
|
case 10:
|
|
if is_end_of_line {
|
|
if with_quality {
|
|
_storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift)
|
|
}
|
|
state = 11
|
|
} else {
|
|
qualBytes.WriteByte(C)
|
|
}
|
|
case 11:
|
|
if is_end_of_line {
|
|
state = 11
|
|
} else if C == '@' {
|
|
state = 1
|
|
} else {
|
|
log.Fatalf("%s[%s] : sequence record not followed by a line starting with @", identifier, source)
|
|
}
|
|
|
|
}
|
|
|
|
previous = C
|
|
}
|
|
|
|
if len(sequences) > 0 {
|
|
if state == 10 {
|
|
_storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift)
|
|
state = 1
|
|
}
|
|
}
|
|
|
|
return sequences, nil
|
|
}
|
|
|
|
return parser
|
|
}
|
|
|
|
func _ParseFastqFile(
|
|
input ChannelFileChunk,
|
|
out obiiter.IBioSequence,
|
|
quality_shift byte,
|
|
with_quality bool,
|
|
) {
|
|
|
|
parser := FastqChunkParser(quality_shift, with_quality)
|
|
|
|
for chunks := range input {
|
|
sequences, err := parser(chunks.Source, chunks.Raw)
|
|
|
|
if err != nil {
|
|
log.Fatalf("File %s : Cannot parse the fastq file : %v", chunks.Source, err)
|
|
}
|
|
|
|
out.Push(obiiter.MakeBioSequenceBatch(chunks.Source, chunks.Order, sequences))
|
|
|
|
}
|
|
|
|
out.Done()
|
|
|
|
}
|
|
|
|
func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
|
|
opt := MakeOptions(options)
|
|
out := obiiter.MakeIBioSequence()
|
|
|
|
nworker := opt.ParallelWorkers()
|
|
|
|
chkchan := ReadFileChunk(
|
|
opt.Source(),
|
|
reader,
|
|
1024*1024,
|
|
EndOfLastFastqEntry,
|
|
"\n@",
|
|
)
|
|
|
|
for i := 0; i < nworker; i++ {
|
|
out.Add(1)
|
|
go _ParseFastqFile(
|
|
chkchan,
|
|
out,
|
|
obidefault.ReadQualitiesShift(),
|
|
opt.ReadQualities(),
|
|
)
|
|
}
|
|
|
|
go func() {
|
|
out.WaitAndClose()
|
|
}()
|
|
|
|
newIter := out.SortBatches()
|
|
|
|
log.Debugln("Full file batch mode : ", opt.FullFileBatch())
|
|
|
|
if opt.FullFileBatch() {
|
|
newIter = newIter.CompleteFileIterator()
|
|
}
|
|
|
|
annotParser := opt.ParseFastSeqHeader()
|
|
|
|
if annotParser != nil {
|
|
return IParseFastSeqHeaderBatch(newIter, options...), nil
|
|
}
|
|
|
|
return newIter, nil
|
|
}
|
|
|
|
func ReadFastqFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
|
|
options = append(options, OptionsSource(obiutils.RemoveAllExt((path.Base(filename)))))
|
|
|
|
file, err := obiutils.Ropen(filename)
|
|
|
|
if err == obiutils.ErrNoContent {
|
|
log.Infof("file %s is empty", filename)
|
|
return ReadEmptyFile(options...)
|
|
}
|
|
|
|
if err != nil {
|
|
return obiiter.NilIBioSequence, err
|
|
}
|
|
|
|
return ReadFastq(file, options...)
|
|
}
|
|
|
|
func ReadFastqFromStdin(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
|
|
options = append(options, OptionsSource(obiutils.RemoveAllExt("stdin")))
|
|
input, err := obiutils.Buf(os.Stdin)
|
|
|
|
if err == obiutils.ErrNoContent {
|
|
log.Infof("stdin is empty")
|
|
return ReadEmptyFile(options...)
|
|
}
|
|
|
|
if err != nil {
|
|
log.Fatalf("open file error: %v", err)
|
|
return obiiter.NilIBioSequence, err
|
|
}
|
|
|
|
return ReadFastq(input, options...)
|
|
}
|