From 56722895e4f3caf0dc767d5f1a2cda2c6a63fa3e Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sat, 18 Feb 2023 12:06:52 +0100 Subject: [PATCH] Adds to every obitools the ability to save in gzip compressed files --- pkg/goutils/gzipfile.go | 44 ++++++++++++++++++---- pkg/obiformats/fastseq_write_fasta.go | 25 ++++++------ pkg/obiformats/fastseq_write_fastq.go | 24 +++++------- pkg/obiformats/options.go | 8 ++-- pkg/obiformats/universal_write.go | 13 +++---- pkg/obitools/obiconvert/options.go | 9 +++++ pkg/obitools/obiconvert/sequence_writer.go | 2 + pkg/obitools/obidistribute/distribute.go | 18 +++------ pkg/obitools/obidistribute/options.go | 8 ---- 9 files changed, 85 insertions(+), 66 deletions(-) diff --git a/pkg/goutils/gzipfile.go b/pkg/goutils/gzipfile.go index 252232d..e026b9a 100644 --- a/pkg/goutils/gzipfile.go +++ b/pkg/goutils/gzipfile.go @@ -3,12 +3,14 @@ package goutils import ( "bufio" "compress/gzip" + "io" "os" ) type Wfile struct { compressed bool - f *os.File + close bool + out io.WriteCloser gf *gzip.Writer fw *bufio.Writer } @@ -36,10 +38,33 @@ func OpenWritingFile(name string, compressed bool, append bool) (*Wfile, error) fw = bufio.NewWriter(fi) } - return &Wfile{compressed: compressed, - f: fi, - gf: gf, - fw: fw, + return &Wfile{ + compressed: compressed, + close: true, + out: fi, + gf: gf, + fw: fw, + }, nil +} + +func CompressStream(out io.WriteCloser, compressed bool, close bool) (*Wfile, error) { + var gf *gzip.Writer + var fw *bufio.Writer + + if compressed { + gf = gzip.NewWriter(out) + fw = bufio.NewWriter(gf) + } else { + gf = nil + fw = bufio.NewWriter(out) + } + + return &Wfile{ + compressed: compressed, + close: close, + out: out, + gf: gf, + fw: fw, }, nil } @@ -56,12 +81,17 @@ func (w *Wfile) Close() error { err = nil w.fw.Flush() - + if w.compressed { err = w.gf.Close() } - err2 := w.f.Close() + var err2 error + err2 = nil + + if w.close { + err2 = w.out.Close() + } if err == nil { err = err2 diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index e79574b..b24bd5b 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -65,10 +65,12 @@ func FormatFastaBatch(batch obiiter.BioSequenceBatch, formater FormatHeader) []b } func WriteFasta(iterator obiiter.IBioSequence, - file io.Writer, + file io.WriteCloser, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) + file,_ = goutils.CompressStream(file,opt.CompressedFile(),opt.CloseFile()) + buffsize := iterator.BufferSize() newIter := obiiter.MakeIBioSequence(buffsize) @@ -133,15 +135,8 @@ func WriteFasta(iterator obiiter.IBioSequence, } - if opt.CloseFile() { - switch file := file.(type) { - case *os.File: - file.Close() - case *goutils.Wfile: - file.Close() - } - } - + file.Close() + log.Debugln("End of the fasta file writing") obiiter.UnregisterPipe() waitWriter.Done() @@ -163,11 +158,13 @@ func WriteFastaToFile(iterator obiiter.IBioSequence, opt := MakeOptions(options) + flags := os.O_WRONLY | os.O_CREATE - file,err := goutils.OpenWritingFile(filename, - opt.CompressedFile(), - opt.AppendFile(), - ) + if opt.AppendFile() { + flags |= os.O_APPEND + } + file, err := os.OpenFile(filename, flags, 0660) + if err != nil { log.Fatalf("open file error: %v", err) diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 7126f98..5d4c7f8 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -54,10 +54,12 @@ type FileChunck struct { } func WriteFastq(iterator obiiter.IBioSequence, - file io.Writer, + file io.WriteCloser, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) + file,_ = goutils.CompressStream(file,opt.CompressedFile(),opt.CloseFile()) + buffsize := iterator.BufferSize() newIter := obiiter.MakeIBioSequence(buffsize) @@ -123,14 +125,7 @@ func WriteFastq(iterator obiiter.IBioSequence, } - if opt.CloseFile() { - switch file := file.(type) { - case *os.File: - file.Close() - case *goutils.Wfile: - file.Close() - } - } + file.Close() log.Debugln("End of the fastq file writing") obiiter.UnregisterPipe() @@ -151,12 +146,13 @@ func WriteFastqToFile(iterator obiiter.IBioSequence, options ...WithOption) (obiiter.IBioSequence, error) { opt := MakeOptions(options) + flags := os.O_WRONLY | os.O_CREATE - file, err := goutils.OpenWritingFile(filename, - opt.CompressedFile(), - opt.AppendFile(), - ) - + if opt.AppendFile() { + flags |= os.O_APPEND + } + file, err := os.OpenFile(filename, flags, 0660) + if err != nil { log.Fatalf("open file error: %v", err) return obiiter.NilIBioSequence, err diff --git a/pkg/obiformats/options.go b/pkg/obiformats/options.go index 786881c..d49b9bf 100644 --- a/pkg/obiformats/options.go +++ b/pkg/obiformats/options.go @@ -110,17 +110,17 @@ func OptionDontCloseFile() WithOption { return f } -func OptionsAppendFile() WithOption { +func OptionsAppendFile(append bool) WithOption { f := WithOption(func(opt Options) { - opt.pointer.appendfile = true + opt.pointer.appendfile = append }) return f } -func OptionsCompressed() WithOption { +func OptionsCompressed(compressed bool) WithOption { f := WithOption(func(opt Options) { - opt.pointer.compressed = true + opt.pointer.compressed = compressed }) return f diff --git a/pkg/obiformats/universal_write.go b/pkg/obiformats/universal_write.go index 835f14f..c3fcf9a 100644 --- a/pkg/obiformats/universal_write.go +++ b/pkg/obiformats/universal_write.go @@ -7,12 +7,11 @@ import ( log "github.com/sirupsen/logrus" - "git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" ) func WriteSequence(iterator obiiter.IBioSequence, - file io.Writer, + file io.WriteCloser, options ...WithOption) (obiiter.IBioSequence, error) { iterator = iterator.Rebatch(1000) @@ -56,13 +55,13 @@ func WriteSequencesToFile(iterator obiiter.IBioSequence, filename string, options ...WithOption) (obiiter.IBioSequence, error) { - opt := MakeOptions(options) + flags := os.O_WRONLY | os.O_CREATE - file, err := goutils.OpenWritingFile(filename, - opt.CompressedFile(), - opt.AppendFile(), - ) + if opt.AppendFile() { + flags |= os.O_APPEND + } + file, err := os.OpenFile(filename, flags, 0660) if err != nil { log.Fatalf("open file error: %v", err) diff --git a/pkg/obitools/obiconvert/options.go b/pkg/obitools/obiconvert/options.go index 34c88fd..25a3e5a 100644 --- a/pkg/obitools/obiconvert/options.go +++ b/pkg/obitools/obiconvert/options.go @@ -25,6 +25,7 @@ var __output_fastobi_format__ = false var __output_solexa_quality__ = false var __no_progress_bar__ = false +var __compressed__ = false func InputOptionSet(options *getoptions.GetOpt) { // options.IntVar(&__skipped_entries__, "skip", __skipped_entries__, @@ -72,6 +73,10 @@ func OutputOptionSet(options *getoptions.GetOpt) { options.BoolVar(&__no_progress_bar__, "no-progressbar", false, options.Description("Disable the progress bar printing")) + options.BoolVar(&__compressed__, "--compress", false, + options.Alias("Z"), + options.Description("Output is compressed")) + } func OptionSet(options *getoptions.GetOpt) { @@ -110,6 +115,10 @@ func CLIOutputFormat() string { } } +func CLICompressed() bool { + return __compressed__ +} + func CLIInputFastHeaderFormat() string { switch { case __input_fastjson_format__: diff --git a/pkg/obitools/obiconvert/sequence_writer.go b/pkg/obitools/obiconvert/sequence_writer.go index fa13269..a4931c6 100644 --- a/pkg/obitools/obiconvert/sequence_writer.go +++ b/pkg/obitools/obiconvert/sequence_writer.go @@ -41,6 +41,8 @@ func CLIWriteBioSequences(iterator obiiter.IBioSequence, opts = append(opts, obiformats.OptionsQualityShift(CLIOutputQualityShift())) + opts = append(opts, obiformats.OptionsCompressed(CLICompressed())) + var err error if len(filenames) == 0 { diff --git a/pkg/obitools/obidistribute/distribute.go b/pkg/obitools/obidistribute/distribute.go index c64baef..88a77b9 100644 --- a/pkg/obitools/obidistribute/distribute.go +++ b/pkg/obitools/obidistribute/distribute.go @@ -30,19 +30,13 @@ func DistributeSequence(sequences obiiter.IBioSequence) { nworkers = 2 } - opts = append(opts, obiformats.OptionsParallelWorkers(nworkers)) - opts = append(opts, obiformats.OptionsBufferSize(obioptions.CLIBufferSize())) - opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize())) + opts = append(opts, obiformats.OptionsParallelWorkers(nworkers), + obiformats.OptionsBufferSize(obioptions.CLIBufferSize()), + obiformats.OptionsBatchSize(obioptions.CLIBatchSize()), + obiformats.OptionsQualityShift(obiconvert.CLIOutputQualityShift()), + obiformats.OptionsAppendFile(CLIAppendSequences()), + obiformats.OptionsCompressed(obiconvert.CLICompressed())) - opts = append(opts, obiformats.OptionsQualityShift(obiconvert.CLIOutputQualityShift())) - - if CLIAppendSequences() { - opts = append(opts, obiformats.OptionsAppendFile()) - } - - if CLICompressed() { - opts = append(opts, obiformats.OptionsCompressed()) - } var formater obiformats.SequenceBatchWriterToFile switch obiconvert.CLIOutputFormat() { diff --git a/pkg/obitools/obidistribute/options.go b/pkg/obitools/obidistribute/options.go index b1b4faa..0214b18 100644 --- a/pkg/obitools/obidistribute/options.go +++ b/pkg/obitools/obidistribute/options.go @@ -18,7 +18,6 @@ var _BatchCount = 0 var _HashSize = 0 var _NAValue = "NA" var _append = false -var _compressed = false func DistributeOptionSet(options *getoptions.GetOpt) { options.StringVar(&_FilenamePattern, "pattern", _FilenamePattern, @@ -52,10 +51,6 @@ func DistributeOptionSet(options *getoptions.GetOpt) { options.Alias("A"), options.Description("Indicates to append sequence to files if they already exist.")) - options.BoolVar(&_compressed, "--compress", false, - options.Alias("Z"), - options.Description("Output is compressed")) - options.IntVar(&_HashSize, "hash", 0, options.Alias("H"), options.Description("Indicates to split the input into at most batch based on a hash code of the seequence.")) @@ -71,9 +66,6 @@ func CLIAppendSequences() bool { return _append } -func CLICompressed() bool { - return _compressed -} func CLISequenceClassifier() *obiseq.BioSequenceClassifier { switch {