diff --git a/.gitignore b/.gitignore index 739a66d..c7715b4 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ vendor /obipairing /obipcr /obifind +/obidistribute diff --git a/pkg/obiformats/dispatcher.go b/pkg/obiformats/dispatcher.go index a47d4ee..1ebd038 100644 --- a/pkg/obiformats/dispatcher.go +++ b/pkg/obiformats/dispatcher.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "sync" + "sync/atomic" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" ) @@ -21,26 +22,25 @@ func WriterDispatcher(prototypename string, jobDone.Add(1) go func() { - n := 0 + n := int32(0) for newflux := range dispatcher.News() { - data, _ := dispatcher.Outputs(newflux) - out, err := formater(data, - fmt.Sprintf(prototypename, newflux), - options...) - if err != nil { - log.Fatalf("cannot open the output file for key %s", newflux) - } + go func(newflux string) { + data, _ := dispatcher.Outputs(newflux) + out, err := formater(data, + fmt.Sprintf(prototypename, newflux), + options...) + if err != nil { + log.Fatalf("cannot open the output file for key %s", newflux) + } - n++ + atomic.AddInt32(&n, 1) - if n > 1 { - jobDone.Add(1) - } - - go func() { + if atomic.LoadInt32(&n) > 1 { + jobDone.Add(1) + } out.Recycle() jobDone.Done() - }() + }(newflux) } }() diff --git a/pkg/obitools/obidistribute/options.go b/pkg/obitools/obidistribute/options.go index 96100cd..64d20e2 100644 --- a/pkg/obitools/obidistribute/options.go +++ b/pkg/obitools/obidistribute/options.go @@ -13,20 +13,29 @@ import ( var _FilenamePattern = "" var _SequenceClassifierTag = "" var _BatchCount = 0 +var _HashSize = 0 func DistributeOptionSet(options *getoptions.GetOpt) { options.StringVar(&_FilenamePattern, "pattern", _FilenamePattern, options.Alias("p"), options.Required("You must provide at pattern for the file names "), - options.Description("The N first sequence records of the file are discarded from the analysis and not reported to the output file.")) + options.Description("The template used to build the names of the output files. "+ + "The variable part is represented by '%s'. "+ + "Example : toto_%s.fastq.")) options.StringVar(&_SequenceClassifierTag, "classifier", _SequenceClassifierTag, options.Alias("c"), - options.Description("The N first sequence records of the file are discarded from the analysis and not reported to the output file.")) + options.Description("The name of a tag annotating thes sequences. "+ + "The name must corresponds to a string, a integer or a boolean value. "+ + "That value will be used to dispatch sequences amoong the different files")) - options.IntVar(&_BatchCount, "batch", 0, + options.IntVar(&_BatchCount, "batches", 0, options.Alias("n"), - options.Description("The N first sequence records of the file are discarded from the analysis and not reported to the output file.")) + options.Description("Indicates in how many batches the input file must bee splitted.")) + + options.IntVar(&_HashSize, "hash", 0, + options.Alias("H"), + options.Description("Indicates to split the input into at most batch based on a hash code of the seequence.")) } func OptionSet(options *getoptions.GetOpt) { @@ -41,10 +50,11 @@ func CLISequenceClassifier() obiseq.SequenceClassifier { return obiseq.AnnotationClassifier(_SequenceClassifierTag) case _BatchCount > 0: return obiseq.RotateClassifier(_BatchCount) - + case _HashSize > 0: + return obiseq.HashClassifier(_HashSize) } - log.Fatal("one of the options --classifier or --batch must be specified") + log.Fatal("one of the options --classifier, -- hash or --batch must be specified") return nil }