Add capacity to obidistribute to save gzipped files

This commit is contained in:
2023-02-17 12:52:19 +01:00
parent 6fd426d8be
commit 9554a32490
9 changed files with 124 additions and 30 deletions

69
pkg/goutils/gzipfile.go Normal file
View File

@ -0,0 +1,69 @@
package goutils
import (
"bufio"
"compress/gzip"
"os"
)
type Wfile struct {
compressed bool
f *os.File
gf *gzip.Writer
fw *bufio.Writer
}
func OpenWritingFile(name string, compressed bool, append bool) (*Wfile, error) {
flags := os.O_WRONLY | os.O_CREATE
if append {
flags |= os.O_APPEND
}
fi, err := os.OpenFile(name, flags, 0660)
if err != nil {
return nil, err
}
var gf *gzip.Writer
var fw *bufio.Writer
if compressed {
gf = gzip.NewWriter(fi)
fw = bufio.NewWriter(gf)
} else {
gf = nil
fw = bufio.NewWriter(fi)
}
return &Wfile{compressed: compressed,
f: fi,
gf: gf,
fw: fw,
}, nil
}
func (w *Wfile) Write(p []byte) (n int, err error) {
return w.fw.Write(p)
}
func (w *Wfile) WriteString(s string) (n int, err error) {
return w.fw.Write([]byte(s))
}
func (w *Wfile) Close() error {
var err error
err = nil
if w.compressed {
err = w.gf.Close()
}
err2 := w.f.Close()
if err == nil {
err = err2
}
return err
}

View File

@ -2,6 +2,7 @@ package obiformats
import ( import (
"fmt" "fmt"
"strings"
"sync" "sync"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -22,6 +23,7 @@ func WriterDispatcher(prototypename string,
jobDone.Add(1) jobDone.Add(1)
go func() { go func() {
opt := MakeOptions(options)
for newflux := range dispatcher.News() { for newflux := range dispatcher.News() {
jobDone.Add(1) jobDone.Add(1)
go func(newflux int) { go func(newflux int) {
@ -31,8 +33,13 @@ func WriterDispatcher(prototypename string,
log.Fatalf("Cannot retreive the new chanel : %v", err) log.Fatalf("Cannot retreive the new chanel : %v", err)
} }
name:=fmt.Sprintf(prototypename, dispatcher.Classifier().Value(newflux))
if opt.CompressedFile() && ! strings.HasSuffix(name,".gz") {
name = name + ".gz"
}
out, err := formater(data, out, err := formater(data,
fmt.Sprintf(prototypename, dispatcher.Classifier().Value(newflux)), name,
options...) options...)
if err != nil { if err != nil {

View File

@ -11,6 +11,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
) )
@ -138,6 +139,8 @@ func WriteFasta(iterator obiiter.IBioSequence,
switch file := file.(type) { switch file := file.(type) {
case *os.File: case *os.File:
file.Close() file.Close()
case *goutils.Wfile:
file.Close()
} }
} }
waitWriter.Done() waitWriter.Done()
@ -157,19 +160,13 @@ func WriteFastaToFile(iterator obiiter.IBioSequence,
filename string, filename string,
options ...WithOption) (obiiter.IBioSequence, error) { options ...WithOption) (obiiter.IBioSequence, error) {
var file *os.File
var err error
opt := MakeOptions(options) opt := MakeOptions(options)
if opt.AppendFile() { file,err := goutils.OpenWritingFile(filename,
log.Debug("Open files in appending mode") opt.CompressedFile(),
file, err = os.OpenFile(filename, opt.AppendFile(),
os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) )
} else {
file, err = os.Create(filename)
}
if err != nil { if err != nil {
log.Fatalf("open file error: %v", err) log.Fatalf("open file error: %v", err)

View File

@ -10,6 +10,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq"
) )
@ -128,6 +129,8 @@ func WriteFastq(iterator obiiter.IBioSequence,
switch file := file.(type) { switch file := file.(type) {
case *os.File: case *os.File:
file.Close() file.Close()
case *goutils.Wfile:
file.Close()
} }
} }
@ -147,18 +150,12 @@ func WriteFastqToFile(iterator obiiter.IBioSequence,
filename string, filename string,
options ...WithOption) (obiiter.IBioSequence, error) { options ...WithOption) (obiiter.IBioSequence, error) {
var file *os.File
var err error
opt := MakeOptions(options) opt := MakeOptions(options)
if opt.AppendFile() { file, err := goutils.OpenWritingFile(filename,
log.Debug("Open files in appending mode") opt.CompressedFile(),
file, err = os.OpenFile(filename, opt.AppendFile(),
os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) )
} else {
file, err = os.Create(filename)
}
if err != nil { if err != nil {
log.Fatalf("open file error: %v", err) log.Fatalf("open file error: %v", err)

View File

@ -14,6 +14,7 @@ type __options__ struct {
parallel_workers int parallel_workers int
closefile bool closefile bool
appendfile bool appendfile bool
compressed bool
} }
type Options struct { type Options struct {
@ -33,6 +34,7 @@ func MakeOptions(setters []WithOption) Options {
batch_size: 5000, batch_size: 5000,
closefile: false, closefile: false,
appendfile: false, appendfile: false,
compressed: false,
} }
opt := Options{&o} opt := Options{&o}
@ -80,6 +82,10 @@ func (opt Options) AppendFile() bool {
return opt.pointer.appendfile return opt.pointer.appendfile
} }
func (opt Options) CompressedFile() bool {
return opt.pointer.compressed
}
func OptionsBufferSize(size int) WithOption { func OptionsBufferSize(size int) WithOption {
f := WithOption(func(opt Options) { f := WithOption(func(opt Options) {
opt.pointer.buffer_size = size opt.pointer.buffer_size = size
@ -112,6 +118,14 @@ func OptionsAppendFile() WithOption {
return f return f
} }
func OptionsCompressed() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.compressed = true
})
return f
}
func OptionsNewFile() WithOption { func OptionsNewFile() WithOption {
f := WithOption(func(opt Options) { f := WithOption(func(opt Options) {
opt.pointer.appendfile = false opt.pointer.appendfile = false

View File

@ -7,6 +7,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/goutils"
"git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter"
) )
@ -55,18 +56,13 @@ func WriteSequencesToFile(iterator obiiter.IBioSequence,
filename string, filename string,
options ...WithOption) (obiiter.IBioSequence, error) { options ...WithOption) (obiiter.IBioSequence, error) {
var file *os.File
var err error
opt := MakeOptions(options) opt := MakeOptions(options)
if opt.AppendFile() { file, err := goutils.OpenWritingFile(filename,
log.Debug("Open files in appending mode") opt.CompressedFile(),
file, err = os.OpenFile(filename, opt.AppendFile(),
os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) )
} else {
file, err = os.Create(filename)
}
if err != nil { if err != nil {
log.Fatalf("open file error: %v", err) log.Fatalf("open file error: %v", err)

View File

@ -71,6 +71,7 @@ func OutputOptionSet(options *getoptions.GetOpt) {
options.BoolVar(&__no_progress_bar__, "no-progressbar", false, options.BoolVar(&__no_progress_bar__, "no-progressbar", false,
options.Description("Disable the progress bar printing")) options.Description("Disable the progress bar printing"))
} }
func OptionSet(options *getoptions.GetOpt) { func OptionSet(options *getoptions.GetOpt) {

View File

@ -40,6 +40,9 @@ func DistributeSequence(sequences obiiter.IBioSequence) {
opts = append(opts, obiformats.OptionsAppendFile()) opts = append(opts, obiformats.OptionsAppendFile())
} }
if CLICompressed() {
opts = append(opts, obiformats.OptionsCompressed())
}
var formater obiformats.SequenceBatchWriterToFile var formater obiformats.SequenceBatchWriterToFile
switch obiconvert.CLIOutputFormat() { switch obiconvert.CLIOutputFormat() {

View File

@ -17,6 +17,8 @@ var _BatchCount = 0
var _HashSize = 0 var _HashSize = 0
var _NAValue = "NA" var _NAValue = "NA"
var _append = false var _append = false
var _compressed = false
func DistributeOptionSet(options *getoptions.GetOpt) { func DistributeOptionSet(options *getoptions.GetOpt) {
options.StringVar(&_FilenamePattern, "pattern", _FilenamePattern, options.StringVar(&_FilenamePattern, "pattern", _FilenamePattern,
@ -43,6 +45,10 @@ func DistributeOptionSet(options *getoptions.GetOpt) {
options.Alias("A"), options.Alias("A"),
options.Description("Indicates to append sequence to files if they already exist.")) options.Description("Indicates to append sequence to files if they already exist."))
options.BoolVar(&_compressed, "--compress", false,
options.Alias("Z"),
options.Description("Output is compressed"))
options.IntVar(&_HashSize, "hash", 0, options.IntVar(&_HashSize, "hash", 0,
options.Alias("H"), options.Alias("H"),
options.Description("Indicates to split the input into at most <n> batch based on a hash code of the seequence.")) options.Description("Indicates to split the input into at most <n> batch based on a hash code of the seequence."))
@ -58,6 +64,10 @@ func CLIAppendSequences() bool {
return _append return _append
} }
func CLICompressed() bool {
return _compressed
}
func CLISequenceClassifier() *obiseq.BioSequenceClassifier { func CLISequenceClassifier() *obiseq.BioSequenceClassifier {
switch { switch {
case _SequenceClassifierTag != "": case _SequenceClassifierTag != "":