diff --git a/pkg/goutils/gzipfile.go b/pkg/goutils/gzipfile.go new file mode 100644 index 0000000..317e1c9 --- /dev/null +++ b/pkg/goutils/gzipfile.go @@ -0,0 +1,69 @@ +package goutils + +import ( + "bufio" + "compress/gzip" + "os" +) + +type Wfile struct { + compressed bool + f *os.File + gf *gzip.Writer + fw *bufio.Writer +} + +func OpenWritingFile(name string, compressed bool, append bool) (*Wfile, error) { + + flags := os.O_WRONLY | os.O_CREATE + + if append { + flags |= os.O_APPEND + } + fi, err := os.OpenFile(name, flags, 0660) + if err != nil { + return nil, err + } + + var gf *gzip.Writer + var fw *bufio.Writer + + if compressed { + gf = gzip.NewWriter(fi) + fw = bufio.NewWriter(gf) + } else { + gf = nil + fw = bufio.NewWriter(fi) + } + + return &Wfile{compressed: compressed, + f: fi, + gf: gf, + fw: fw, + }, nil +} + +func (w *Wfile) Write(p []byte) (n int, err error) { + return w.fw.Write(p) +} + +func (w *Wfile) WriteString(s string) (n int, err error) { + return w.fw.Write([]byte(s)) +} + +func (w *Wfile) Close() error { + var err error + err = nil + + if w.compressed { + err = w.gf.Close() + } + + err2 := w.f.Close() + + if err == nil { + err = err2 + } + + return err +} diff --git a/pkg/obiformats/dispatcher.go b/pkg/obiformats/dispatcher.go index f3df1ac..a6a36d4 100644 --- a/pkg/obiformats/dispatcher.go +++ b/pkg/obiformats/dispatcher.go @@ -2,6 +2,7 @@ package obiformats import ( "fmt" + "strings" "sync" log "github.com/sirupsen/logrus" @@ -22,6 +23,7 @@ func WriterDispatcher(prototypename string, jobDone.Add(1) go func() { + opt := MakeOptions(options) for newflux := range dispatcher.News() { jobDone.Add(1) go func(newflux int) { @@ -31,8 +33,13 @@ func WriterDispatcher(prototypename string, log.Fatalf("Cannot retreive the new chanel : %v", err) } + name:=fmt.Sprintf(prototypename, dispatcher.Classifier().Value(newflux)) + if opt.CompressedFile() && ! strings.HasSuffix(name,".gz") { + name = name + ".gz" + } + out, err := formater(data, - fmt.Sprintf(prototypename, dispatcher.Classifier().Value(newflux)), + name, options...) if err != nil { diff --git a/pkg/obiformats/fastseq_write_fasta.go b/pkg/obiformats/fastseq_write_fasta.go index c61714d..ac4b9bd 100644 --- a/pkg/obiformats/fastseq_write_fasta.go +++ b/pkg/obiformats/fastseq_write_fasta.go @@ -11,6 +11,7 @@ import ( log "github.com/sirupsen/logrus" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -138,6 +139,8 @@ func WriteFasta(iterator obiiter.IBioSequence, switch file := file.(type) { case *os.File: file.Close() + case *goutils.Wfile: + file.Close() } } waitWriter.Done() @@ -157,19 +160,13 @@ func WriteFastaToFile(iterator obiiter.IBioSequence, filename string, options ...WithOption) (obiiter.IBioSequence, error) { - var file *os.File - var err error opt := MakeOptions(options) - if opt.AppendFile() { - log.Debug("Open files in appending mode") - file, err = os.OpenFile(filename, - os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - } else { - file, err = os.Create(filename) - } - + file,err := goutils.OpenWritingFile(filename, + opt.CompressedFile(), + opt.AppendFile(), + ) if err != nil { log.Fatalf("open file error: %v", err) diff --git a/pkg/obiformats/fastseq_write_fastq.go b/pkg/obiformats/fastseq_write_fastq.go index 273435a..67b877f 100644 --- a/pkg/obiformats/fastseq_write_fastq.go +++ b/pkg/obiformats/fastseq_write_fastq.go @@ -10,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -128,6 +129,8 @@ func WriteFastq(iterator obiiter.IBioSequence, switch file := file.(type) { case *os.File: file.Close() + case *goutils.Wfile: + file.Close() } } @@ -147,18 +150,12 @@ func WriteFastqToFile(iterator obiiter.IBioSequence, filename string, options ...WithOption) (obiiter.IBioSequence, error) { - var file *os.File - var err error - opt := MakeOptions(options) - if opt.AppendFile() { - log.Debug("Open files in appending mode") - file, err = os.OpenFile(filename, - os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - } else { - file, err = os.Create(filename) - } + file, err := goutils.OpenWritingFile(filename, + opt.CompressedFile(), + opt.AppendFile(), + ) if err != nil { log.Fatalf("open file error: %v", err) diff --git a/pkg/obiformats/options.go b/pkg/obiformats/options.go index a6e0346..786881c 100644 --- a/pkg/obiformats/options.go +++ b/pkg/obiformats/options.go @@ -14,6 +14,7 @@ type __options__ struct { parallel_workers int closefile bool appendfile bool + compressed bool } type Options struct { @@ -33,6 +34,7 @@ func MakeOptions(setters []WithOption) Options { batch_size: 5000, closefile: false, appendfile: false, + compressed: false, } opt := Options{&o} @@ -80,6 +82,10 @@ func (opt Options) AppendFile() bool { return opt.pointer.appendfile } +func (opt Options) CompressedFile() bool { + return opt.pointer.compressed +} + func OptionsBufferSize(size int) WithOption { f := WithOption(func(opt Options) { opt.pointer.buffer_size = size @@ -112,6 +118,14 @@ func OptionsAppendFile() WithOption { return f } +func OptionsCompressed() WithOption { + f := WithOption(func(opt Options) { + opt.pointer.compressed = true + }) + + return f +} + func OptionsNewFile() WithOption { f := WithOption(func(opt Options) { opt.pointer.appendfile = false diff --git a/pkg/obiformats/universal_write.go b/pkg/obiformats/universal_write.go index d875ebf..835f14f 100644 --- a/pkg/obiformats/universal_write.go +++ b/pkg/obiformats/universal_write.go @@ -7,6 +7,7 @@ import ( log "github.com/sirupsen/logrus" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" ) @@ -55,18 +56,13 @@ func WriteSequencesToFile(iterator obiiter.IBioSequence, filename string, options ...WithOption) (obiiter.IBioSequence, error) { - var file *os.File - var err error opt := MakeOptions(options) - if opt.AppendFile() { - log.Debug("Open files in appending mode") - file, err = os.OpenFile(filename, - os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - } else { - file, err = os.Create(filename) - } + file, err := goutils.OpenWritingFile(filename, + opt.CompressedFile(), + opt.AppendFile(), + ) if err != nil { log.Fatalf("open file error: %v", err) diff --git a/pkg/obitools/obiconvert/options.go b/pkg/obitools/obiconvert/options.go index 85faacd..34c88fd 100644 --- a/pkg/obitools/obiconvert/options.go +++ b/pkg/obitools/obiconvert/options.go @@ -71,6 +71,7 @@ func OutputOptionSet(options *getoptions.GetOpt) { options.BoolVar(&__no_progress_bar__, "no-progressbar", false, options.Description("Disable the progress bar printing")) + } func OptionSet(options *getoptions.GetOpt) { diff --git a/pkg/obitools/obidistribute/distribute.go b/pkg/obitools/obidistribute/distribute.go index b56d21c..c64baef 100644 --- a/pkg/obitools/obidistribute/distribute.go +++ b/pkg/obitools/obidistribute/distribute.go @@ -40,6 +40,9 @@ func DistributeSequence(sequences obiiter.IBioSequence) { opts = append(opts, obiformats.OptionsAppendFile()) } + if CLICompressed() { + opts = append(opts, obiformats.OptionsCompressed()) + } var formater obiformats.SequenceBatchWriterToFile switch obiconvert.CLIOutputFormat() { diff --git a/pkg/obitools/obidistribute/options.go b/pkg/obitools/obidistribute/options.go index 8fd873e..acd4be9 100644 --- a/pkg/obitools/obidistribute/options.go +++ b/pkg/obitools/obidistribute/options.go @@ -17,6 +17,8 @@ var _BatchCount = 0 var _HashSize = 0 var _NAValue = "NA" var _append = false +var _compressed = false + func DistributeOptionSet(options *getoptions.GetOpt) { options.StringVar(&_FilenamePattern, "pattern", _FilenamePattern, @@ -43,6 +45,10 @@ func DistributeOptionSet(options *getoptions.GetOpt) { options.Alias("A"), options.Description("Indicates to append sequence to files if they already exist.")) + options.BoolVar(&_compressed, "--compress", false, + options.Alias("Z"), + options.Description("Output is compressed")) + options.IntVar(&_HashSize, "hash", 0, options.Alias("H"), options.Description("Indicates to split the input into at most batch based on a hash code of the seequence.")) @@ -58,6 +64,10 @@ func CLIAppendSequences() bool { return _append } +func CLICompressed() bool { + return _compressed +} + func CLISequenceClassifier() *obiseq.BioSequenceClassifier { switch { case _SequenceClassifierTag != "":