A go implementation of the fasta reader

Former-commit-id: 603592c4761fb0722e9e0501d78de1bd3ba238fa
This commit is contained in:
2023-09-01 09:30:12 +02:00
parent 3f8c0d6a2f
commit 62b57f4ede
15 changed files with 1403 additions and 77 deletions

View File

@@ -3,13 +3,14 @@ package obiformats
import (
"encoding/csv"
"fmt"
gzip "github.com/klauspost/pgzip"
"io"
"os"
"path"
"strconv"
"strings"
gzip "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
@@ -209,7 +210,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
}()
if opt.pointer.full_file_batch {
newIter = newIter.FullFileIterator()
newIter = newIter.CompleteFileIterator()
}
return newIter

View File

@@ -266,7 +266,7 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
go _ReadFlatFileChunk(reader, entry_channel)
if opt.pointer.full_file_batch {
newIter = newIter.FullFileIterator()
newIter = newIter.CompleteFileIterator()
}
return newIter

View File

@@ -0,0 +1,322 @@
package obiformats
import (
"bytes"
"fmt"
"io"
"os"
"path"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils"
"golang.org/x/exp/slices"
log "github.com/sirupsen/logrus"
)
// lastSequenceCut extracts the up to the last sequence cut from a given buffer.
//
// It takes a parameter:
// - buffer []byte: the buffer to extract the sequence cut from.
//
// It returns two values:
// - []byte: the extracted sequences.
// - []byte: the remaining buffer after the sequence cut (the last sequence).
func lastSequenceCut(buffer []byte) ([]byte, []byte) {
imax := len(buffer)
last := 0
state := 0
for i := imax - 1; i >= 0 && state < 2; i-- {
if state == 0 && buffer[i] == '>' {
state = 1
last = i
} else if state == 1 && (buffer[i] == '\r' || buffer[i] == '\n') {
state = 2
} else {
state = 0
}
}
if state == 2 {
return buffer[:last], bytes.Clone(buffer[last:])
}
return []byte{}, buffer
}
// firstSequenceCut cuts the input buffer at the first occurrence of a ">" character
// following a sequence of "\r" or "\n" characters.
//
// It takes a byte slice as input, representing the buffer to be cut.
// It returns two byte slices: the first slice contains the part of the buffer before the cut,
// and the second slice contains the part of the buffer after the cut.
func firstSequenceCut(buffer []byte) ([]byte, []byte) {
imax := len(buffer)
last := 0
state := 0
for i := 0; i < imax && state < 2; i++ {
if (state == 0 || state == 1) && (buffer[i] == '\r' || buffer[i] == '\n') {
state = 1
} else if (state == 1 || i == 0) && buffer[i] == '>' {
state = 2
last = i
} else {
state = 0
}
}
if state == 2 {
return bytes.Clone(buffer[:last]), buffer[last:]
}
return buffer, []byte{}
}
func fullSequenceCut(buffer []byte) ([]byte, []byte, []byte) {
before, buffer := firstSequenceCut(buffer)
if len(buffer) == 0 {
return before, []byte{}, []byte{}
}
buffer, after := lastSequenceCut(buffer)
return before, buffer, after
}
func Concatenate[S ~[]E, E any](s1, s2 S) S {
if len(s1) > 0 {
if len(s2) > 0 {
return append(s1[:len(s1):len(s1)], s2...)
}
return s1
}
return s2
}
type FastxChunk struct {
Bytes []byte
index int
}
func FastaChunkReader(r io.Reader, size int, cutHead bool) (chan FastxChunk, error) {
out := make(chan FastxChunk)
buff := make([]byte, size)
n, err := r.Read(buff)
if n > 0 && err == nil {
if n < size {
buff = buff[:n]
}
begin, buff := firstSequenceCut(buff)
if len(begin) > 0 && !cutHead {
return out, fmt.Errorf("begin is not empty : %s", string(begin))
}
go func(buff []byte) {
idx := 0
end := []byte{}
for err == nil && n > 0 {
// fmt.Println("============end=========================")
// fmt.Println(string(end))
// fmt.Println("------------buff------------------------")
// fmt.Println(string(buff))
buff = Concatenate(end, buff)
// fmt.Println("------------buff--pasted----------------")
// fmt.Println(string(buff))
buff, end = lastSequenceCut(buff)
// fmt.Println("----------------buff--cutted------------")
// fmt.Println(string(buff))
// fmt.Println("------------------end-------------------")
// fmt.Println(string(end))
// fmt.Println("========================================")
if len(buff) > 0 {
out <- FastxChunk{
Bytes: bytes.Clone(buff),
index: idx,
}
idx++
}
buff = slices.Grow(buff[:0], size)[0:size]
n, err = r.Read(buff)
if n < size {
buff = buff[:n]
}
// fmt.Printf("n = %d, err = %v\n", n, err)
}
if len(end) > 0 {
out <- FastxChunk{
Bytes: bytes.Clone(end),
index: idx,
}
}
close(out)
}(buff)
}
return out, nil
}
func ParseFastaChunk(source string, ch FastxChunk) *obiiter.BioSequenceBatch {
slice := make(obiseq.BioSequenceSlice, 0, obioptions.CLIBatchSize())
state := 0
start := 0
current := 0
var identifier string
var definition string
for i := 0; i < len(ch.Bytes); i++ {
C := ch.Bytes[i]
is_end_of_line := C == '\r' || C == '\n'
is_space := C == ' ' || C == '\t'
is_sep := is_space || is_end_of_line
switch state {
case 0:
if C == '>' {
// Beginning of sequence
state = 1
}
case 1:
if is_sep {
// No identifier -> ERROR
return nil
} else {
// Beginning of identifier
state = 2
start = i
}
case 2:
if is_sep {
// End of identifier
identifier = string(ch.Bytes[start:i])
state = 3
}
case 3:
if is_end_of_line {
// Definition empty
definition = ""
state = 5
} else if !is_space {
// Beginning of definition
start = i
state = 4
}
case 4:
if is_end_of_line {
definition = string(ch.Bytes[start:i])
state = 5
}
case 5:
if !is_end_of_line {
// Beginning of sequence
start = i
current = i + 1
state = 6
}
case 6:
if C == '>' {
// End of sequence
s := obiseq.NewBioSequence(identifier, bytes.Clone(ch.Bytes[start:current]), definition)
s.SetSource(source)
slice = append(slice, s)
state = 1
} else if !is_sep {
if C >= 'A' && C <= 'Z' {
C = C + 'a' - 'A'
}
// Removing white space from the sequence
if (C >= 'a' && C <= 'z') || C == '-' || C == '.' {
ch.Bytes[current] = C
current++
}
}
}
}
slice = append(slice, obiseq.NewBioSequence(identifier, bytes.Clone(ch.Bytes[start:current]), definition))
batch := obiiter.MakeBioSequenceBatch(ch.index, slice)
return &batch
}
func ReadFasta(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
opt := MakeOptions(options)
out := obiiter.MakeIBioSequence()
source := opt.Source()
nworker := obioptions.CLIReadParallelWorkers()
out.Add(nworker)
chkchan, err := FastaChunkReader(reader, 1024*500, false)
if err != nil {
return obiiter.NilIBioSequence, err
}
go func() {
out.WaitAndClose()
}()
parser := func() {
defer out.Done()
for chk := range chkchan {
seqs := ParseFastaChunk(source, chk)
if seqs != nil {
out.Push(*seqs)
}
}
}
for i := 0; i < nworker; i++ {
go parser()
}
newIter := out.SortBatches().Rebatch(opt.BatchSize())
log.Debugln("Full file batch mode : ", opt.FullFileBatch())
if opt.FullFileBatch() {
newIter = newIter.CompleteFileIterator()
}
annotParser := opt.ParseFastSeqHeader()
if annotParser != nil {
return IParseFastSeqHeaderBatch(newIter, options...), nil
}
return newIter, nil
}
func ReadFastaFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) {
options = append(options, OptionsSource(obiutils.RemoveAllExt((path.Base(filename)))))
file, err := Ropen(filename)
if err != nil {
return obiiter.NilIBioSequence, err
}
return ReadFasta(file, options...)
}
func ReadFastaFromStdin(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, error) {
options = append(options, OptionsSource(obiutils.RemoveAllExt("stdin")))
input, err := Buf(os.Stdin)
if err != nil {
log.Fatalf("open file error: %v", err)
return obiiter.NilIBioSequence, err
}
return ReadFasta(input, options...)
}

View File

@@ -124,7 +124,7 @@ func ReadFastSeqFromFile(filename string, options ...WithOption) (obiiter.IBioSe
log.Debugln("Full file batch mode : ", opt.FullFileBatch())
if opt.FullFileBatch() {
newIter = newIter.FullFileIterator()
newIter = newIter.CompleteFileIterator()
}
parser := opt.ParseFastSeqHeader()
@@ -155,7 +155,7 @@ func ReadFastSeqFromStdin(options ...WithOption) obiiter.IBioSequence {
log.Debugln("Full file batch mode : ", opt.FullFileBatch())
if opt.FullFileBatch() {
newIter = newIter.FullFileIterator()
newIter = newIter.CompleteFileIterator()
}
parser := opt.ParseFastSeqHeader()

View File

@@ -34,14 +34,14 @@ var _seqlenght_rx = regexp.MustCompile(" +([0-9]+) bp")
func _ParseGenbankFile(source string,
input <-chan _FileChunk, out obiiter.IBioSequence,
chunck_order func() int) {
var err error
var err error
state := inHeader
for chunks := range input {
// log.Debugln("Chunk size", (chunks.raw.(*bytes.Buffer)).Len())
scanner := bufio.NewScanner(chunks.raw)
sequences := make(obiseq.BioSequenceSlice, 0, 100)
sumlength:=0
sumlength := 0
id := ""
lseq := -1
scientificName := ""
@@ -61,12 +61,12 @@ func _ParseGenbankFile(source string,
case strings.HasPrefix(line, "LOCUS "):
state = inEntry
id = strings.SplitN(line[12:], " ", 2)[0]
match_length := _seqlenght_rx.FindStringSubmatch(line)
match_length := _seqlenght_rx.FindStringSubmatch(line)
if len(match_length) > 0 {
lseq,err = strconv.Atoi(match_length[1])
lseq, err = strconv.Atoi(match_length[1])
if err != nil {
lseq = -1
}
}
}
if lseq > 0 {
seqBytes = bytes.NewBuffer(obiseq.GetSlice(lseq + 20))
@@ -101,7 +101,7 @@ func _ParseGenbankFile(source string,
// sequence.Len(), seqBytes.Len())
sequences = append(sequences, sequence)
sumlength+=sequence.Len()
sumlength += sequence.Len()
if len(sequences) == 100 || sumlength > 1e7 {
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
@@ -137,7 +137,7 @@ func _ParseGenbankFile(source string,
if len(sequences) > 0 {
out.Push(obiiter.MakeBioSequenceBatch(chunck_order(), sequences))
}
}
}
out.Done()
@@ -159,13 +159,13 @@ func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence {
// for j := 0; j < opt.ParallelWorkers(); j++ {
for j := 0; j < nworkers; j++ {
go _ParseGenbankFile(opt.Source(), entry_channel, newIter,chunck_order)
go _ParseGenbankFile(opt.Source(), entry_channel, newIter, chunck_order)
}
go _ReadFlatFileChunk(reader, entry_channel)
if opt.pointer.full_file_batch {
newIter = newIter.FullFileIterator()
newIter = newIter.CompleteFileIterator()
}
return newIter

View File

@@ -1,6 +1,7 @@
package obiformats
import (
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
)
@@ -43,8 +44,8 @@ func MakeOptions(setters []WithOption) Options {
with_progress_bar: false,
buffer_size: 2,
quality_shift: 33,
parallel_workers: 4,
batch_size: 5000,
parallel_workers: obioptions.CLIReadParallelWorkers(),
batch_size: obioptions.CLIBatchSize(),
full_file_batch: false,
closefile: false,
appendfile: false,

View File

@@ -4,12 +4,10 @@ import (
"bufio"
"bytes"
"io"
"os"
"path"
"regexp"
"github.com/gabriel-vasile/mimetype"
gzip "github.com/klauspost/pgzip"
log "github.com/sirupsen/logrus"
@@ -91,6 +89,36 @@ func OBIMimeTypeGuesser(stream io.Reader) (*mimetype.MIME, io.Reader, error) {
return mimeType, newReader, nil
}
// func ReadSequences(reader io.Reader,
// options ...WithOption) (obiiter.IBioSequence, error) {
// mime, reader, err := OBIMimeTypeGuesser(reader)
// if err != nil {
// return obiiter.NilIBioSequence, err
// }
// reader = bufio.NewReader(reader)
// switch mime.String() {
// case "text/fasta", "text/fastq":
// file.Close()
// is, err := ReadFastSeqFromFile(filename, options...)
// return is, err
// case "text/ecopcr2":
// return ReadEcoPCR(reader, options...), nil
// case "text/embl":
// return ReadEMBL(reader, options...), nil
// case "text/genbank":
// return ReadGenbank(reader, options...), nil
// default:
// log.Fatalf("File %s has guessed format %s which is not yet implemented",
// filename, mime.String())
// }
// return obiiter.NilIBioSequence, nil
// }
// ReadSequencesFromFile reads sequences from a file and returns an iterator of bio sequences and an error.
//
// Parameters:
@@ -102,32 +130,20 @@ func OBIMimeTypeGuesser(stream io.Reader) (*mimetype.MIME, io.Reader, error) {
// - error: An error if any occurred during the reading process.
func ReadSequencesFromFile(filename string,
options ...WithOption) (obiiter.IBioSequence, error) {
var file *os.File
var file *Reader
var reader io.Reader
var greader io.Reader
var err error
options = append(options, OptionsSource(obiutils.RemoveAllExt((path.Base(filename)))))
file, err = os.Open(filename)
file, err = Ropen(filename)
if err != nil {
log.Fatalf("open file error: %v", err)
return obiiter.NilIBioSequence, err
}
reader = file
// Test if the flux is compressed by gzip
greader, err = gzip.NewReader(reader)
if err != nil {
file.Seek(0, 0)
} else {
log.Debugf("File %s is gz compressed ", filename)
reader = greader
}
mime, reader, err := OBIMimeTypeGuesser(reader)
mime, reader, err := OBIMimeTypeGuesser(file)
if err != nil {
return obiiter.NilIBioSequence, err
@@ -136,10 +152,12 @@ func ReadSequencesFromFile(filename string,
reader = bufio.NewReader(reader)
switch mime.String() {
case "text/fasta", "text/fastq":
case "text/fastq":
file.Close()
is, err := ReadFastSeqFromFile(filename, options...)
return is, err
case "text/fasta":
return ReadFasta(reader, options...)
case "text/ecopcr2":
return ReadEcoPCR(reader, options...), nil
case "text/embl":
@@ -153,3 +171,9 @@ func ReadSequencesFromFile(filename string,
return obiiter.NilIBioSequence, nil
}
// func ReadSequencesFromStdin(options ...WithOption) obiiter.IBioSequence {
// options = append(options, OptionsSource("stdin"))
// }

437
pkg/obiformats/xopen.go Normal file
View File

@@ -0,0 +1,437 @@
// This is an integration of the xopen package originally written by Brent Pedersen
// (https://github.com/brentp/xopen).
//
// Here it can be considered as a fork of [Wei Shen](http://shenwei.me) the version :
//
// https://github.com/shenwei356/xopen
//
// Package xopen makes it easy to get buffered readers and writers.
// Ropen opens a (possibly gzipped) file/process/http site for buffered reading.
// Wopen opens a (possibly gzipped) file for buffered writing.
// Both will use gzip when appropriate and will user buffered IO.
package obiformats
import (
"bufio"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"os/user"
"path/filepath"
"strings"
"github.com/dsnet/compress/bzip2"
"github.com/klauspost/compress/zstd"
gzip "github.com/klauspost/pgzip"
"github.com/ulikunitz/xz"
)
// Level is the default compression level of gzip.
// This value will be automatically adjusted to the default value of zstd or bzip2.
var Level = gzip.DefaultCompression
// ErrNoContent means nothing in the stream/file.
var ErrNoContent = errors.New("xopen: no content")
// ErrDirNotSupported means the path is a directory.
var ErrDirNotSupported = errors.New("xopen: input is a directory")
// IsGzip returns true buffered Reader has the gzip magic.
func IsGzip(b *bufio.Reader) (bool, error) {
return CheckBytes(b, []byte{0x1f, 0x8b})
}
// IsXz returns true buffered Reader has the xz magic.
func IsXz(b *bufio.Reader) (bool, error) {
return CheckBytes(b, []byte{0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00})
}
// IsZst returns true buffered Reader has the zstd magic.
func IsZst(b *bufio.Reader) (bool, error) {
return CheckBytes(b, []byte{0x28, 0xB5, 0x2f, 0xfd})
}
// IsBzip2 returns true buffered Reader has the bzip2 magic.
func IsBzip2(b *bufio.Reader) (bool, error) {
return CheckBytes(b, []byte{0x42, 0x5a, 0x68})
}
// IsStdin checks if we are getting data from stdin.
func IsStdin() bool {
// http://stackoverflow.com/a/26567513
stat, err := os.Stdin.Stat()
if err != nil {
return false
}
return (stat.Mode() & os.ModeCharDevice) == 0
}
// ExpandUser expands ~/path and ~otheruser/path appropriately
func ExpandUser(path string) (string, error) {
if len(path) == 0 || path[0] != '~' {
return path, nil
}
var u *user.User
var err error
if len(path) == 1 || path[1] == '/' {
u, err = user.Current()
} else {
name := strings.Split(path[1:], "/")[0]
u, err = user.Lookup(name)
}
if err != nil {
return "", err
}
home := u.HomeDir
path = home + "/" + path[1:]
return path, nil
}
// Exists checks if a local file exits
func Exists(path string) bool {
path, perr := ExpandUser(path)
if perr != nil {
return false
}
_, err := os.Stat(path)
return err == nil
}
// CheckBytes peeks at a buffered stream and checks if the first read bytes match.
func CheckBytes(b *bufio.Reader, buf []byte) (bool, error) {
m, err := b.Peek(len(buf))
if err != nil {
// return false, ErrNoContent
return false, err // EOF
}
for i := range buf {
if m[i] != buf[i] {
return false, nil
}
}
return true, nil
}
// Reader is returned by Ropen
type Reader struct {
*bufio.Reader
rdr io.Reader
gz io.ReadCloser
}
// Close the associated files.
func (r *Reader) Close() error {
var err error
if r.gz != nil {
err = r.gz.Close()
if err != nil {
return err
}
}
if c, ok := r.rdr.(io.ReadCloser); ok {
err = c.Close()
if err != nil {
return err
}
}
return nil
}
// Writer is returned by Wopen
type Writer struct {
*bufio.Writer
wtr *os.File
gz *gzip.Writer
xw *xz.Writer
zw *zstd.Encoder
bz2 *bzip2.Writer
}
// Close the associated files.
func (w *Writer) Close() error {
var err error
err = w.Flush()
if err != nil {
return err
}
if w.gz != nil {
err = w.gz.Close()
if err != nil {
return err
}
}
if w.xw != nil {
err = w.xw.Close()
if err != nil {
return err
}
}
if w.zw != nil {
err = w.zw.Close()
if err != nil {
return err
}
}
if w.bz2 != nil {
err = w.bz2.Close()
if err != nil {
return err
}
}
return w.wtr.Close()
}
// Flush the writer.
func (w *Writer) Flush() error {
var err error
err = w.Writer.Flush()
if err != nil {
return err
}
if w.gz != nil {
err = w.gz.Flush()
if err != nil {
return err
}
}
if w.zw != nil {
err = w.zw.Flush()
if err != nil {
return err
}
}
return nil
}
var bufSize = 65536
// Buf returns a buffered reader from an io.Reader
// If f == "-", then it will attempt to read from os.Stdin.
// If the file is gzipped, it will be read as such.
func Buf(r io.Reader) (*Reader, error) {
b := bufio.NewReaderSize(r, bufSize)
var rd io.Reader
var rdr io.ReadCloser
if is, err := IsGzip(b); err != nil {
// check BOM
t, _, err := b.ReadRune() // no content
if err != nil {
return nil, ErrNoContent
}
if t != '\uFEFF' {
b.UnreadRune()
}
return &Reader{b, r, rdr}, nil // non-gzip file with content less than 2 bytes
} else if is {
rdr, err = gzip.NewReader(b)
if err != nil {
return nil, err
}
b = bufio.NewReaderSize(rdr, bufSize)
} else if is, err := IsZst(b); err != nil {
// check BOM
t, _, err := b.ReadRune() // no content
if err != nil {
return nil, ErrNoContent
}
if t != '\uFEFF' {
b.UnreadRune()
}
return &Reader{b, r, rdr}, nil // non-gzip/zst file with content less than 4 bytes
} else if is {
rd, err = zstd.NewReader(b)
if err != nil {
return nil, err
}
b = bufio.NewReaderSize(rd, bufSize)
} else if is, err := IsXz(b); err != nil {
// check BOM
t, _, err := b.ReadRune() // no content
if err != nil {
return nil, ErrNoContent
}
if t != '\uFEFF' {
b.UnreadRune()
}
return &Reader{b, r, rdr}, nil // non-gzip/zst/xz file with content less than 6 bytes
} else if is {
rd, err = xz.NewReader(b)
if err != nil {
return nil, err
}
b = bufio.NewReaderSize(rd, bufSize)
} else if is, err := IsBzip2(b); err != nil {
// check BOM
t, _, err := b.ReadRune() // no content
if err != nil {
return nil, ErrNoContent
}
if t != '\uFEFF' {
b.UnreadRune()
}
return &Reader{b, r, rdr}, nil // non-gzip/zst/xz file with content less than 6 bytes
} else if is {
rd, err = bzip2.NewReader(b, &bzip2.ReaderConfig{})
if err != nil {
return nil, err
}
b = bufio.NewReaderSize(rd, bufSize)
}
// other files with content >= 6 bytes
// check BOM
t, _, err := b.ReadRune()
if err != nil {
return nil, ErrNoContent
}
if t != '\uFEFF' {
b.UnreadRune()
}
return &Reader{b, r, rdr}, nil
}
// XReader returns a reader from a url string or a file.
func XReader(f string) (io.Reader, error) {
if strings.HasPrefix(f, "http://") || strings.HasPrefix(f, "https://") {
var rsp *http.Response
rsp, err := http.Get(f)
if err != nil {
return nil, err
}
if rsp.StatusCode != 200 {
return nil, fmt.Errorf("http error downloading %s. status: %s", f, rsp.Status)
}
rdr := rsp.Body
return rdr, nil
}
f, err := ExpandUser(f)
if err != nil {
return nil, err
}
fi, err := os.Stat(f)
if err != nil {
return nil, err
}
if fi.IsDir() {
return nil, ErrDirNotSupported
}
return os.Open(f)
}
// Ropen opens a buffered reader.
func Ropen(f string) (*Reader, error) {
var err error
var rdr io.Reader
if f == "-" {
if !IsStdin() {
return nil, errors.New("stdin not detected")
}
b, err := Buf(os.Stdin)
return b, err
} else if f[0] == '|' {
// TODO: use csv to handle quoted file names.
cmdStrs := strings.Split(f[1:], " ")
var cmd *exec.Cmd
if len(cmdStrs) == 2 {
cmd = exec.Command(cmdStrs[0], cmdStrs[1:]...)
} else {
cmd = exec.Command(cmdStrs[0])
}
rdr, err = cmd.StdoutPipe()
if err != nil {
return nil, err
}
err = cmd.Start()
if err != nil {
return nil, err
}
} else {
rdr, err = XReader(f)
}
if err != nil {
return nil, err
}
b, err := Buf(rdr)
return b, err
}
// Wopen opens a buffered reader.
// If f == "-", then stdout will be used.
// If f endswith ".gz", then the output will be gzipped.
// If f endswith ".xz", then the output will be zx-compressed.
// If f endswith ".zst", then the output will be zstd-compressed.
// If f endswith ".bz2", then the output will be bzip2-compressed.
func Wopen(f string) (*Writer, error) {
return WopenFile(f, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
}
// WopenFile opens a buffered reader.
// If f == "-", then stdout will be used.
// If f endswith ".gz", then the output will be gzipped.
// If f endswith ".xz", then the output will be zx-compressed.
// If f endswith ".bz2", then the output will be bzip2-compressed.
func WopenFile(f string, flag int, perm os.FileMode) (*Writer, error) {
var wtr *os.File
if f == "-" {
wtr = os.Stdout
} else {
dir := filepath.Dir(f)
fi, err := os.Stat(dir)
if err == nil && !fi.IsDir() {
return nil, fmt.Errorf("can not write file into a non-directory path: %s", dir)
}
if os.IsNotExist(err) {
os.MkdirAll(dir, 0755)
}
wtr, err = os.OpenFile(f, flag, perm)
if err != nil {
return nil, err
}
}
f2 := strings.ToLower(f)
if strings.HasSuffix(f2, ".gz") {
gz, err := gzip.NewWriterLevel(wtr, Level)
if err != nil {
err = errors.New(fmt.Sprintf("xopen: %s", err))
}
return &Writer{bufio.NewWriterSize(gz, bufSize), wtr, gz, nil, nil, nil}, err
}
if strings.HasSuffix(f2, ".xz") {
xw, err := xz.NewWriter(wtr)
return &Writer{bufio.NewWriterSize(xw, bufSize), wtr, nil, xw, nil, nil}, err
}
if strings.HasSuffix(f2, ".zst") {
level := Level
if level == gzip.DefaultCompression {
level = 2
}
zw, err := zstd.NewWriter(wtr, zstd.WithEncoderLevel(zstd.EncoderLevel(level)))
if err != nil {
err = errors.New(fmt.Sprintf("xopen: zstd: %s", err))
}
return &Writer{bufio.NewWriterSize(zw, bufSize), wtr, nil, nil, zw, nil}, err
}
if strings.HasSuffix(f2, ".bz2") {
level := Level
if level == gzip.DefaultCompression {
level = 6
}
bz2, err := bzip2.NewWriter(wtr, &bzip2.WriterConfig{Level: level})
if err != nil {
err = errors.New(fmt.Sprintf("xopen: %s", err))
}
return &Writer{bufio.NewWriterSize(bz2, bufSize), wtr, nil, nil, nil, bz2}, err
}
return &Writer{bufio.NewWriterSize(wtr, bufSize), wtr, nil, nil, nil, nil}, nil
}

View File

@@ -0,0 +1,148 @@
package obiformats
import (
"bufio"
"bytes"
"compress/gzip"
"fmt"
"io"
"os"
"strings"
"testing"
. "gopkg.in/check.v1"
)
func Test(t *testing.T) { TestingT(t) }
type XopenTest struct{}
var _ = Suite(&XopenTest{})
func gzFromString(s string) string {
var c bytes.Buffer
gz := gzip.NewWriter(&c)
gz.Write([]byte(s))
return c.String()
}
var gzTests = []struct {
isGz bool
data string
}{
{false, "asdf"},
{true, gzFromString("asdf")},
}
func (s *XopenTest) TestIsGzip(c *C) {
for _, t := range gzTests {
isGz, err := IsGzip(bufio.NewReader(strings.NewReader(t.data)))
c.Assert(err, IsNil)
c.Assert(t.isGz, Equals, isGz)
}
}
func (s *XopenTest) TestIsStdin(c *C) {
r := IsStdin()
c.Assert(r, Equals, false)
}
func (s *XopenTest) TestRopen(c *C) {
rdr, err := Ropen("-")
c.Assert(err, ErrorMatches, "stdin not detected")
c.Assert(rdr, IsNil)
}
func (s *XopenTest) TestWopen(c *C) {
for _, f := range []string{"t.gz", "t"} {
testString := "ASDF1234"
wtr, err := Wopen(f)
c.Assert(err, IsNil)
_, err = os.Stat(f)
c.Assert(err, IsNil)
c.Assert(wtr.wtr, NotNil)
fmt.Fprintf(wtr, testString)
wtr.Close()
rdr, err := Ropen(f)
c.Assert(err, IsNil)
str, err := rdr.ReadString(99)
c.Assert(str, Equals, testString)
c.Assert(err, Equals, io.EOF)
str, err = rdr.ReadString(99)
c.Assert(str, Equals, "")
rdr.Close()
os.Remove(f)
}
}
var httpTests = []struct {
url string
expectError bool
}{
{"https://raw.githubusercontent.com/brentp/xopen/master/README.md", false},
{"http://raw.githubusercontent.com/brentp/xopen/master/README.md", false},
{"http://raw.githubusercontent.com/brentp/xopen/master/BAD.md", true},
}
func (s *XopenTest) TestReadHttp(c *C) {
for _, t := range httpTests {
rdr, err := Ropen(t.url)
if !t.expectError {
c.Assert(err, IsNil)
v, err := rdr.ReadString(byte('\n'))
c.Assert(err, IsNil)
c.Assert(len(v), Not(Equals), 0)
} else {
c.Assert(err, ErrorMatches, ".* 404 Not Found")
}
}
}
func (s *XopenTest) TestReadProcess(c *C) {
for _, cmd := range []string{"|ls -lh", "|ls", "|ls -lh xopen_test.go"} {
rdr, err := Ropen(cmd)
c.Assert(err, IsNil)
b := make([]byte, 1000)
_, err = rdr.Read(b)
if err != io.EOF {
c.Assert(err, IsNil)
}
lines := strings.Split(string(b), "\n")
has := false
for _, line := range lines {
if strings.Contains(line, "xopen_test.go") {
has = true
}
}
c.Assert(has, Equals, true)
}
}
func (s *XopenTest) TestOpenStdout(c *C) {
w, err := Wopen("-")
c.Assert(err, IsNil)
c.Assert(w.wtr, Equals, os.Stdout)
}
func (s *XopenTest) TestOpenBadFile(c *C) {
r, err := Ropen("XXXXXXXXXXXXXXXXXXXXXXX")
c.Assert(r, IsNil)
c.Assert(err, ErrorMatches, ".*no such file.*")
}
func (s *XopenTest) TestExists(c *C) {
c.Assert(Exists("xopen.go"), Equals, true)
c.Assert(Exists("____xx"), Equals, false)
}
func (s *XopenTest) TestUser(c *C) {
c.Assert(Exists("~"), Equals, true)
}
func (s *XopenTest) TestExpand(c *C) {
_, err := ExpandUser("~baduser66")
c.Assert(err, Not(IsNil))
}