diff --git a/pkg/obiformats/fastaseq_read.go b/pkg/obiformats/fastaseq_read.go index c902857..a61414c 100644 --- a/pkg/obiformats/fastaseq_read.go +++ b/pkg/obiformats/fastaseq_read.go @@ -16,7 +16,7 @@ import ( log "github.com/sirupsen/logrus" ) -// lastSequenceCut extracts the up to the last sequence cut from a given buffer. +// lastFastaCut extracts the up to the last sequence cut from a given buffer. // // It takes a parameter: // - buffer []byte: the buffer to extract the sequence cut from. @@ -24,7 +24,7 @@ import ( // It returns two values: // - []byte: the extracted sequences. // - []byte: the remaining buffer after the sequence cut (the last sequence). -func lastSequenceCut(buffer []byte) ([]byte, []byte) { +func lastFastaCut(buffer []byte) ([]byte, []byte) { imax := len(buffer) last := 0 state := 0 @@ -45,13 +45,13 @@ func lastSequenceCut(buffer []byte) ([]byte, []byte) { return []byte{}, buffer } -// firstSequenceCut cuts the input buffer at the first occurrence of a ">" character +// firstFastaCut cuts the input buffer at the first occurrence of a ">" character // following a sequence of "\r" or "\n" characters. // // It takes a byte slice as input, representing the buffer to be cut. // It returns two byte slices: the first slice contains the part of the buffer before the cut, // and the second slice contains the part of the buffer after the cut. -func firstSequenceCut(buffer []byte) ([]byte, []byte) { +func firstFastaCut(buffer []byte) ([]byte, []byte) { imax := len(buffer) last := 0 state := 0 @@ -73,17 +73,6 @@ func firstSequenceCut(buffer []byte) ([]byte, []byte) { } -func fullSequenceCut(buffer []byte) ([]byte, []byte, []byte) { - before, buffer := firstSequenceCut(buffer) - - if len(buffer) == 0 { - return before, []byte{}, []byte{} - } - - buffer, after := lastSequenceCut(buffer) - return before, buffer, after -} - func Concatenate[S ~[]E, E any](s1, s2 S) S { if len(s1) > 0 { if len(s2) > 0 { @@ -109,7 +98,7 @@ func FastaChunkReader(r io.Reader, size int, cutHead bool) (chan FastxChunk, err buff = buff[:n] } - begin, buff := firstSequenceCut(buff) + begin, buff := firstFastaCut(buff) if len(begin) > 0 && !cutHead { return out, fmt.Errorf("begin is not empty : %s", string(begin)) @@ -127,7 +116,7 @@ func FastaChunkReader(r io.Reader, size int, cutHead bool) (chan FastxChunk, err buff = Concatenate(end, buff) // fmt.Println("------------buff--pasted----------------") // fmt.Println(string(buff)) - buff, end = lastSequenceCut(buff) + buff, end = lastFastaCut(buff) // fmt.Println("----------------buff--cutted------------") // fmt.Println(string(buff)) // fmt.Println("------------------end-------------------") diff --git a/pkg/obiformats/fastqseq_read.go b/pkg/obiformats/fastqseq_read.go new file mode 100644 index 0000000..62b559d --- /dev/null +++ b/pkg/obiformats/fastqseq_read.go @@ -0,0 +1,359 @@ +package obiformats + +import ( + "bytes" + "io" + "os" + "path" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" + log "github.com/sirupsen/logrus" + "golang.org/x/exp/slices" +) + +func lastFastqCut(buffer []byte) ([]byte, []byte) { + imax := len(buffer) + cut := imax + state := 0 + restart := imax - 1 + for i := restart; i >= 0 && state < 7; i-- { + C := buffer[i] + is_end_of_line := C == '\r' || C == '\n' + is_space := C == ' ' || C == '\t' + is_sep := is_space || is_end_of_line + + switch state { + case 0: + if C == '+' { + // Potential start of quality part step 1 + state = 1 + restart = i + } + case 1: + if is_end_of_line { + // Potential start of quality part step 2 + state = 2 + } else { + // it was not the start of quality part + state = 0 + i = restart + } + case 2: + if is_sep { + // Potential start of quality part step 2 (stay in the same state) + state = 2 + } else if (C >= 'a' && C <= 'z') || C == '-' || C == '.' { + // End of the sequence + state = 3 + } else { + // it was not the start of quality part + state = 0 + i = restart + } + case 3: + if is_end_of_line { + // Entrering in the header line + state = 4 + } else if (C >= 'a' && C <= 'z') || C == '-' || C == '.' { + // progressing along of the sequence + state = 3 + } else { + // it was not the sequence part + state = 0 + i = restart + } + case 4: + if is_end_of_line { + state = 4 + } else { + state = 5 + } + case 5: + if is_end_of_line { + // It was not the header line + state = 0 + i = restart + } else if C == '@' { + state = 6 + cut = i + } + case 6: + if is_end_of_line { + state = 7 + } else { + state = 0 + i = restart + } + } + } + if state == 7 { + return buffer[:cut], bytes.Clone(buffer[cut:]) + } + return []byte{}, buffer +} + +func FastqChunkReader(r io.Reader, size int) (chan FastxChunk, error) { + out := make(chan FastxChunk) + buff := make([]byte, size) + + n, err := r.Read(buff) + + if n > 0 && err == nil { + if n < size { + buff = buff[:n] + } + + go func(buff []byte) { + idx := 0 + end := []byte{} + + for err == nil && n > 0 { + // fmt.Println("============end=========================") + // fmt.Println(string(end)) + // fmt.Println("------------buff------------------------") + // fmt.Println(string(buff)) + buff = Concatenate(end, buff) + // fmt.Println("------------buff--pasted----------------") + // fmt.Println(string(buff)) + buff, end = lastFastqCut(buff) + // fmt.Println("----------------buff--cutted------------") + // fmt.Println(string(buff)) + // fmt.Println("------------------end-------------------") + // fmt.Println(string(end)) + // fmt.Println("========================================") + if len(buff) > 0 { + out <- FastxChunk{ + Bytes: bytes.Clone(buff), + index: idx, + } + idx++ + } + + buff = slices.Grow(buff[:0], size)[0:size] + n, err = r.Read(buff) + if n < size { + buff = buff[:n] + } + // fmt.Printf("n = %d, err = %v\n", n, err) + } + + if len(end) > 0 { + out <- FastxChunk{ + Bytes: bytes.Clone(end), + index: idx, + } + } + + close(out) + }(buff) + } + + return out, nil +} + +func ParseFastqChunk(source string, ch FastxChunk, quality_shift byte) *obiiter.BioSequenceBatch { + slice := make(obiseq.BioSequenceSlice, 0, obioptions.CLIBatchSize()) + + state := 0 + start := 0 + current := 0 + var identifier string + var definition string + + for i := 0; i < len(ch.Bytes); i++ { + C := ch.Bytes[i] + is_end_of_line := C == '\r' || C == '\n' + is_space := C == ' ' || C == '\t' + is_sep := is_space || is_end_of_line + + switch state { + case 0: + if C == '@' { + // Beginning of sequence + state = 1 + } + case 1: + if is_sep { + // No identifier -> ERROR + log.Errorf("%s : sequence identifier is empty", source) + return nil + } else { + // Beginning of identifier + state = 2 + start = i + } + case 2: + if is_sep { + // End of identifier + identifier = string(ch.Bytes[start:i]) + state = 3 + } + case 3: + if is_end_of_line { + // Definition empty + definition = "" + state = 5 + } else if !is_space { + // Beginning of definition + start = i + state = 4 + } + case 4: + if is_end_of_line { + definition = string(ch.Bytes[start:i]) + state = 5 + + } + case 5: + if !is_end_of_line { + // Beginning of sequence + start = i + if C >= 'A' && C <= 'Z' { + ch.Bytes[current] = C + 'a' - 'A' + } + current = i + 1 + state = 6 + } + case 6: + if is_end_of_line { + // End of sequence + s := obiseq.NewBioSequence(identifier, bytes.Clone(ch.Bytes[start:current]), definition) + s.SetSource(source) + slice = append(slice, s) + state = 7 + } else { + if C >= 'A' && C <= 'Z' { + ch.Bytes[current] = C + 'a' - 'A' + } + current = i + 1 + } + case 7: + if is_end_of_line { + state = 7 + } else if C == '+' { + state = 8 + } else { + log.Errorf("%s[%s] : sequence data not followed by a line starting with +", identifier, source) + return nil // Error + } + case 8: + if is_end_of_line { + state = 9 + } + case 9: + if is_end_of_line { + state = 9 + } else { + // beginning of quality + state = 10 + start = i + } + case 10: + if is_end_of_line { + // End of quality + q := ch.Bytes[start:i] + if len(q) != slice[len(slice)-1].Len() { + log.Errorf("%s[%s] : sequence data and quality lenght not equal (%d/%d)", + identifier, source, len(q), slice[len(slice)-1].Len()) + return nil // Error quality lenght not equal to sequence length + } + for i := 0; i < len(q); i++ { + q[i] = q[i] - quality_shift + } + slice[len(slice)-1].SetQualities(q) + state = 11 + } + case 11: + if is_end_of_line { + state = 11 + } else if C == '@' { + state = 1 + } else { + log.Errorf("%s[%s] : sequence record not followed by a line starting with @", identifier, source) + return nil + } + + } + } + + batch := obiiter.MakeBioSequenceBatch(ch.index, slice) + return &batch +} + +func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { + opt := MakeOptions(options) + out := obiiter.MakeIBioSequence() + + source := opt.Source() + + nworker := obioptions.CLIReadParallelWorkers() + out.Add(nworker) + + chkchan, err := FastqChunkReader(reader, 1024*500) + + if err != nil { + return obiiter.NilIBioSequence, err + } + + go func() { + out.WaitAndClose() + }() + + parser := func() { + defer out.Done() + for chk := range chkchan { + seqs := ParseFastqChunk(source, chk, byte(opt.QualityShift())) + if seqs != nil { + out.Push(*seqs) + } else { + log.Fatalf("error parsing %s", source) + } + } + } + + for i := 0; i < nworker; i++ { + go parser() + } + + newIter := out.SortBatches().Rebatch(opt.BatchSize()) + + log.Debugln("Full file batch mode : ", opt.FullFileBatch()) + if opt.FullFileBatch() { + newIter = newIter.CompleteFileIterator() + } + + annotParser := opt.ParseFastSeqHeader() + + if annotParser != nil { + return IParseFastSeqHeaderBatch(newIter, options...), nil + } + + return newIter, nil +} + +func ReadFastqFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { + options = append(options, OptionsSource(obiutils.RemoveAllExt((path.Base(filename))))) + + file, err := Ropen(filename) + + if err != nil { + return obiiter.NilIBioSequence, err + } + + return ReadFastq(file, options...) +} + +func ReadFastqFromStdin(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) { + options = append(options, OptionsSource(obiutils.RemoveAllExt("stdin"))) + input, err := Buf(os.Stdin) + + if err != nil { + log.Fatalf("open file error: %v", err) + return obiiter.NilIBioSequence, err + } + + return ReadFastq(input, options...) +} diff --git a/pkg/obiformats/universal_read.go b/pkg/obiformats/universal_read.go index 84ca323..bd7660b 100644 --- a/pkg/obiformats/universal_read.go +++ b/pkg/obiformats/universal_read.go @@ -148,14 +148,15 @@ func ReadSequencesFromFile(filename string, if err != nil { return obiiter.NilIBioSequence, err } - + log.Infof("%s mime type: %s", filename, mime.String()) reader = bufio.NewReader(reader) switch mime.String() { case "text/fastq": - file.Close() - is, err := ReadFastSeqFromFile(filename, options...) - return is, err + return ReadFastq(reader, options...) + // file.Close() + // is, err := ReadFastSeqFromFile(filename, options...) + // return is, err case "text/fasta": return ReadFasta(reader, options...) case "text/ecopcr2":