mirror of
https://github.com/metabarcoding/obitools4.git
synced 2025-06-29 16:20:46 +00:00
Merge branch 'master' into taxonomy
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@ -127,3 +127,4 @@ ncbitaxo/merged.dmp
|
|||||||
ncbitaxo/names.dmp
|
ncbitaxo/names.dmp
|
||||||
ncbitaxo/nodes.dmp
|
ncbitaxo/nodes.dmp
|
||||||
ncbitaxo/readme.txt
|
ncbitaxo/readme.txt
|
||||||
|
template.16S
|
||||||
|
@ -2,13 +2,27 @@
|
|||||||
|
|
||||||
## Latest changes
|
## Latest changes
|
||||||
|
|
||||||
|
### Bug fixes
|
||||||
|
|
||||||
|
- In `obipairing`, correct the stats `seq_a_single` and `seq_b_single` when
|
||||||
|
on right alignment mode
|
||||||
|
|
||||||
|
- Not really a bug but the memory impact of `obiuniq` has been reduced by reducing
|
||||||
|
the batch size and not reading the qualities from the fastq files as `obiuniq`
|
||||||
|
is producing only fasta output without qualities.
|
||||||
|
|
||||||
### New features
|
### New features
|
||||||
|
|
||||||
- Most of the time obitools identify automatically sequence file format. But
|
- Most of the time obitools identify automatically sequence file format. But
|
||||||
it fails sometimes. Two new option **--fasta** and **--fastq** are added to
|
it fails sometimes. Two new option **--fasta** and **--fastq** are added to
|
||||||
allow the processing of the rare fasta and fastq files not recognized.
|
allow the processing of the rare fasta and fastq files not recognized.
|
||||||
|
|
||||||
## August 2nd, 2024. Release 4.3.0
|
- In `obiscript`, adds new methods to the Lua sequence object:
|
||||||
|
- `md5_string()`: returning the MD5 check sum as an hexadecimal string,
|
||||||
|
- `subsequence(from,to)`: allows to extract a subsequence on a 0 based
|
||||||
|
coordinate system, upper bound expluded like in go.
|
||||||
|
- `reverse_complement`: returning a sequence object corresponding to the reverse complement
|
||||||
|
of the current sequence.
|
||||||
|
|
||||||
### Change of git repositiory
|
### Change of git repositiory
|
||||||
|
|
||||||
|
@ -30,6 +30,8 @@ func main() {
|
|||||||
// trace.Start(ftrace)
|
// trace.Start(ftrace)
|
||||||
// defer trace.Stop()
|
// defer trace.Stop()
|
||||||
|
|
||||||
|
obioptions.SetBatchSize(10)
|
||||||
|
obioptions.SetReadQualities(false)
|
||||||
optionParser := obioptions.GenerateOptionParser(obiuniq.OptionSet)
|
optionParser := obioptions.GenerateOptionParser(obiuniq.OptionSet)
|
||||||
|
|
||||||
_, args := optionParser(os.Args)
|
_, args := optionParser(os.Args)
|
||||||
|
@ -485,7 +485,8 @@ func PECenterAlign(seqA, seqB *obiseq.BioSequence, gap, scale float64,
|
|||||||
|
|
||||||
func PEAlign(seqA, seqB *obiseq.BioSequence,
|
func PEAlign(seqA, seqB *obiseq.BioSequence,
|
||||||
gap, scale float64, fastAlign bool, delta int, fastScoreRel bool,
|
gap, scale float64, fastAlign bool, delta int, fastScoreRel bool,
|
||||||
arena PEAlignArena, shift_buff *map[int]int) (int, []int, int, int, float64) {
|
arena PEAlignArena, shift_buff *map[int]int) (bool, int, []int, int, int, float64) {
|
||||||
|
var isLeftAlign bool
|
||||||
var score, shift int
|
var score, shift int
|
||||||
var startA, startB int
|
var startA, startB int
|
||||||
var partLen, over int
|
var partLen, over int
|
||||||
@ -536,6 +537,7 @@ func PEAlign(seqA, seqB *obiseq.BioSequence,
|
|||||||
rawSeqB = seqB.Sequence()[0:partLen]
|
rawSeqB = seqB.Sequence()[0:partLen]
|
||||||
qualSeqB = seqB.Qualities()[0:partLen]
|
qualSeqB = seqB.Qualities()[0:partLen]
|
||||||
extra3 = seqB.Len() - partLen
|
extra3 = seqB.Len() - partLen
|
||||||
|
isLeftAlign = true
|
||||||
score = _FillMatrixPeLeftAlign(
|
score = _FillMatrixPeLeftAlign(
|
||||||
rawSeqA, qualSeqA, rawSeqB, qualSeqB, gap, scale,
|
rawSeqA, qualSeqA, rawSeqB, qualSeqB, gap, scale,
|
||||||
&arena.pointer.scoreMatrix,
|
&arena.pointer.scoreMatrix,
|
||||||
@ -557,7 +559,7 @@ func PEAlign(seqA, seqB *obiseq.BioSequence,
|
|||||||
rawSeqA = seqA.Sequence()[:partLen]
|
rawSeqA = seqA.Sequence()[:partLen]
|
||||||
qualSeqA = seqA.Qualities()[:partLen]
|
qualSeqA = seqA.Qualities()[:partLen]
|
||||||
extra3 = partLen - seqA.Len()
|
extra3 = partLen - seqA.Len()
|
||||||
|
isLeftAlign = false
|
||||||
score = _FillMatrixPeRightAlign(
|
score = _FillMatrixPeRightAlign(
|
||||||
rawSeqA, qualSeqA, rawSeqB, qualSeqB, gap, scale,
|
rawSeqA, qualSeqA, rawSeqB, qualSeqB, gap, scale,
|
||||||
&arena.pointer.scoreMatrix,
|
&arena.pointer.scoreMatrix,
|
||||||
@ -581,6 +583,7 @@ func PEAlign(seqA, seqB *obiseq.BioSequence,
|
|||||||
qualSeqB = seqB.Qualities()[0:partLen]
|
qualSeqB = seqB.Qualities()[0:partLen]
|
||||||
extra3 = seqB.Len() - partLen
|
extra3 = seqB.Len() - partLen
|
||||||
score = 0
|
score = 0
|
||||||
|
isLeftAlign = true
|
||||||
} else {
|
} else {
|
||||||
startA = 0
|
startA = 0
|
||||||
startB = -shift
|
startB = -shift
|
||||||
@ -589,6 +592,7 @@ func PEAlign(seqA, seqB *obiseq.BioSequence,
|
|||||||
partLen = len(qualSeqB)
|
partLen = len(qualSeqB)
|
||||||
extra3 = partLen - seqA.Len()
|
extra3 = partLen - seqA.Len()
|
||||||
qualSeqA = seqA.Qualities()[:partLen]
|
qualSeqA = seqA.Qualities()[:partLen]
|
||||||
|
isLeftAlign = false
|
||||||
}
|
}
|
||||||
score = 0
|
score = 0
|
||||||
for i, qualA := range qualSeqA {
|
for i, qualA := range qualSeqA {
|
||||||
@ -625,6 +629,8 @@ func PEAlign(seqA, seqB *obiseq.BioSequence,
|
|||||||
len(rawSeqA), len(rawSeqB),
|
len(rawSeqA), len(rawSeqB),
|
||||||
&(arena.pointer.path))
|
&(arena.pointer.path))
|
||||||
|
|
||||||
|
isLeftAlign = false
|
||||||
|
|
||||||
scoreL := _FillMatrixPeLeftAlign(
|
scoreL := _FillMatrixPeLeftAlign(
|
||||||
rawSeqA, qualSeqA, rawSeqB, qualSeqB, gap, scale,
|
rawSeqA, qualSeqA, rawSeqB, qualSeqB, gap, scale,
|
||||||
&arena.pointer.scoreMatrix,
|
&arena.pointer.scoreMatrix,
|
||||||
@ -634,9 +640,10 @@ func PEAlign(seqA, seqB *obiseq.BioSequence,
|
|||||||
path = _Backtracking(arena.pointer.pathMatrix,
|
path = _Backtracking(arena.pointer.pathMatrix,
|
||||||
len(rawSeqA), len(rawSeqB),
|
len(rawSeqA), len(rawSeqB),
|
||||||
&(arena.pointer.path))
|
&(arena.pointer.path))
|
||||||
|
isLeftAlign = true
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return score, path, fastCount, over, fastScore
|
return isLeftAlign, score, path, fastCount, over, fastScore
|
||||||
}
|
}
|
||||||
|
@ -12,6 +12,13 @@ import (
|
|||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// tempDir creates a temporary directory with a prefix "obiseq_chunks_"
|
||||||
|
// in the system's temporary directory. It returns the path of the
|
||||||
|
// created directory and any error encountered during the creation process.
|
||||||
|
//
|
||||||
|
// If the directory creation is successful, the path to the new
|
||||||
|
// temporary directory is returned. If there is an error, it returns
|
||||||
|
// an empty string and the error encountered.
|
||||||
func tempDir() (string, error) {
|
func tempDir() (string, error) {
|
||||||
dir, err := os.MkdirTemp(os.TempDir(), "obiseq_chunks_")
|
dir, err := os.MkdirTemp(os.TempDir(), "obiseq_chunks_")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -20,6 +27,19 @@ func tempDir() (string, error) {
|
|||||||
return dir, nil
|
return dir, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// find searches for files with a specific extension in the given root directory
|
||||||
|
// and its subdirectories. It returns a slice of strings containing the paths
|
||||||
|
// of the found files.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - root: The root directory to start the search from.
|
||||||
|
// - ext: The file extension to look for (including the leading dot, e.g., ".txt").
|
||||||
|
//
|
||||||
|
// Returns:
|
||||||
|
// A slice of strings containing the paths of files that match the specified
|
||||||
|
// extension. If no files are found, an empty slice is returned. Any errors
|
||||||
|
// encountered during the directory traversal will be returned as part of the
|
||||||
|
// WalkDir function's error handling.
|
||||||
func find(root, ext string) []string {
|
func find(root, ext string) []string {
|
||||||
var a []string
|
var a []string
|
||||||
filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error {
|
filepath.WalkDir(root, func(s string, d fs.DirEntry, e error) error {
|
||||||
@ -34,6 +54,24 @@ func find(root, ext string) []string {
|
|||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ISequenceChunkOnDisk processes a sequence iterator by distributing the sequences
|
||||||
|
// into chunks stored on disk. It uses a classifier to determine how to distribute
|
||||||
|
// the sequences and returns a new iterator for the processed sequences.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - iterator: An iterator of biosequences to be processed.
|
||||||
|
// - classifier: A pointer to a BioSequenceClassifier used to classify the sequences
|
||||||
|
// during distribution.
|
||||||
|
//
|
||||||
|
// Returns:
|
||||||
|
// An iterator of biosequences representing the processed chunks. If an error occurs
|
||||||
|
// during the creation of the temporary directory or any other operation, it returns
|
||||||
|
// an error along with a nil iterator.
|
||||||
|
//
|
||||||
|
// The function operates asynchronously, creating a temporary directory to store
|
||||||
|
// the sequence chunks. Once the processing is complete, the temporary directory
|
||||||
|
// is removed. The function logs the number of batches created and the processing
|
||||||
|
// status of each batch.
|
||||||
func ISequenceChunkOnDisk(iterator obiiter.IBioSequence,
|
func ISequenceChunkOnDisk(iterator obiiter.IBioSequence,
|
||||||
classifier *obiseq.BioSequenceClassifier) (obiiter.IBioSequence, error) {
|
classifier *obiseq.BioSequenceClassifier) (obiiter.IBioSequence, error) {
|
||||||
dir, err := tempDir()
|
dir, err := tempDir()
|
||||||
|
@ -14,10 +14,40 @@ import (
|
|||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// SequenceBatchWriterToFile is a function type that defines a method for writing
|
||||||
|
// a batch of biosequences to a specified file. It takes an iterator of biosequences,
|
||||||
|
// a filename, and optional configuration options, and returns an iterator of biosequences
|
||||||
|
// along with any error encountered during the writing process.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - iterator: An iterator of biosequences to be written to the file.
|
||||||
|
// - filename: The name of the file where the sequences will be written.
|
||||||
|
// - options: Optional configuration options for the writing process.
|
||||||
|
//
|
||||||
|
// Returns:
|
||||||
|
// An iterator of biosequences that may have been modified during the writing process
|
||||||
|
// and an error if the writing operation fails.
|
||||||
type SequenceBatchWriterToFile func(iterator obiiter.IBioSequence,
|
type SequenceBatchWriterToFile func(iterator obiiter.IBioSequence,
|
||||||
filename string,
|
filename string,
|
||||||
options ...WithOption) (obiiter.IBioSequence, error)
|
options ...WithOption) (obiiter.IBioSequence, error)
|
||||||
|
|
||||||
|
// WriterDispatcher manages the writing of data to files based on a given
|
||||||
|
// prototype name and a dispatcher for distributing the sequences. It
|
||||||
|
// processes incoming data from the dispatcher in separate goroutines,
|
||||||
|
// formatting and writing the data to files as specified.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - prototypename: A string that serves as a template for naming the output files.
|
||||||
|
// - dispatcher: An instance of IDistribute that provides the data to be written
|
||||||
|
// and manages the distribution of sequences.
|
||||||
|
// - formater: A function of type SequenceBatchWriterToFile that formats and writes
|
||||||
|
// the sequences to the specified file.
|
||||||
|
// - options: Optional configuration options for the writing process.
|
||||||
|
//
|
||||||
|
// The function operates asynchronously, launching goroutines for each new data
|
||||||
|
// channel received from the dispatcher. It ensures that directories are created
|
||||||
|
// as needed and handles errors during the writing process. The function blocks
|
||||||
|
// until all writing jobs are completed.
|
||||||
func WriterDispatcher(prototypename string,
|
func WriterDispatcher(prototypename string,
|
||||||
dispatcher obiiter.IDistribute,
|
dispatcher obiiter.IDistribute,
|
||||||
formater SequenceBatchWriterToFile,
|
formater SequenceBatchWriterToFile,
|
||||||
@ -34,7 +64,7 @@ func WriterDispatcher(prototypename string,
|
|||||||
data, err := dispatcher.Outputs(newflux)
|
data, err := dispatcher.Outputs(newflux)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Cannot retreive the new chanel : %v", err)
|
log.Fatalf("Cannot retrieve the new channel: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
key := dispatcher.Classifier().Value(newflux)
|
key := dispatcher.Classifier().Value(newflux)
|
||||||
@ -58,7 +88,7 @@ func WriterDispatcher(prototypename string,
|
|||||||
info, err := os.Stat(directory)
|
info, err := os.Stat(directory)
|
||||||
switch {
|
switch {
|
||||||
case !os.IsNotExist(err) && !info.IsDir():
|
case !os.IsNotExist(err) && !info.IsDir():
|
||||||
log.Fatalf("Cannot Create the directory %s", directory)
|
log.Fatalf("Cannot create the directory %s", directory)
|
||||||
case os.IsNotExist(err):
|
case os.IsNotExist(err):
|
||||||
os.Mkdir(directory, 0755)
|
os.Mkdir(directory, 0755)
|
||||||
}
|
}
|
||||||
@ -71,7 +101,7 @@ func WriterDispatcher(prototypename string,
|
|||||||
options...)
|
options...)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("cannot open the output file for key %s",
|
log.Fatalf("Cannot open the output file for key %s",
|
||||||
dispatcher.Classifier().Value(newflux))
|
dispatcher.Classifier().Value(newflux))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,7 +131,7 @@ func _storeSequenceQuality(bytes *bytes.Buffer, out *obiseq.BioSequence, quality
|
|||||||
out.SetQualities(q)
|
out.SetQualities(q)
|
||||||
}
|
}
|
||||||
|
|
||||||
func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
|
func FastqChunkParser(quality_shift byte, with_quality bool) func(string, io.Reader) (obiseq.BioSequenceSlice, error) {
|
||||||
parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) {
|
parser := func(source string, input io.Reader) (obiseq.BioSequenceSlice, error) {
|
||||||
|
|
||||||
var identifier string
|
var identifier string
|
||||||
@ -263,7 +263,9 @@ func FastqChunkParser(quality_shift byte) func(string, io.Reader) (obiseq.BioSeq
|
|||||||
}
|
}
|
||||||
case 10:
|
case 10:
|
||||||
if is_end_of_line {
|
if is_end_of_line {
|
||||||
_storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift)
|
if with_quality {
|
||||||
|
_storeSequenceQuality(&qualBytes, sequences[len(sequences)-1], quality_shift)
|
||||||
|
}
|
||||||
state = 11
|
state = 11
|
||||||
} else {
|
} else {
|
||||||
qualBytes.WriteByte(C)
|
qualBytes.WriteByte(C)
|
||||||
@ -299,9 +301,10 @@ func _ParseFastqFile(
|
|||||||
input ChannelFileChunk,
|
input ChannelFileChunk,
|
||||||
out obiiter.IBioSequence,
|
out obiiter.IBioSequence,
|
||||||
quality_shift byte,
|
quality_shift byte,
|
||||||
|
with_quality bool,
|
||||||
) {
|
) {
|
||||||
|
|
||||||
parser := FastqChunkParser(quality_shift)
|
parser := FastqChunkParser(quality_shift, with_quality)
|
||||||
|
|
||||||
for chunks := range input {
|
for chunks := range input {
|
||||||
sequences, err := parser(chunks.Source, chunks.Raw)
|
sequences, err := parser(chunks.Source, chunks.Raw)
|
||||||
@ -339,6 +342,7 @@ func ReadFastq(reader io.Reader, options ...WithOption) (obiiter.IBioSequence, e
|
|||||||
chkchan,
|
chkchan,
|
||||||
out,
|
out,
|
||||||
byte(obioptions.InputQualityShift()),
|
byte(obioptions.InputQualityShift()),
|
||||||
|
opt.ReadQualities(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ type __options__ struct {
|
|||||||
appendfile bool
|
appendfile bool
|
||||||
compressed bool
|
compressed bool
|
||||||
skip_empty bool
|
skip_empty bool
|
||||||
|
with_quality bool
|
||||||
csv_id bool
|
csv_id bool
|
||||||
csv_sequence bool
|
csv_sequence bool
|
||||||
csv_quality bool
|
csv_quality bool
|
||||||
@ -57,6 +58,7 @@ func MakeOptions(setters []WithOption) Options {
|
|||||||
appendfile: false,
|
appendfile: false,
|
||||||
compressed: false,
|
compressed: false,
|
||||||
skip_empty: false,
|
skip_empty: false,
|
||||||
|
with_quality: true,
|
||||||
csv_id: true,
|
csv_id: true,
|
||||||
csv_definition: false,
|
csv_definition: false,
|
||||||
csv_count: false,
|
csv_count: false,
|
||||||
@ -133,6 +135,10 @@ func (opt Options) SkipEmptySequence() bool {
|
|||||||
return opt.pointer.skip_empty
|
return opt.pointer.skip_empty
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (opt Options) ReadQualities() bool {
|
||||||
|
return opt.pointer.with_quality
|
||||||
|
}
|
||||||
|
|
||||||
func (opt Options) CSVId() bool {
|
func (opt Options) CSVId() bool {
|
||||||
return opt.pointer.csv_id
|
return opt.pointer.csv_id
|
||||||
}
|
}
|
||||||
@ -241,6 +247,14 @@ func OptionsSkipEmptySequence(skip bool) WithOption {
|
|||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func OptionsReadQualities(read bool) WithOption {
|
||||||
|
f := WithOption(func(opt Options) {
|
||||||
|
opt.pointer.with_quality = read
|
||||||
|
})
|
||||||
|
|
||||||
|
return f
|
||||||
|
}
|
||||||
|
|
||||||
func OptionsNewFile() WithOption {
|
func OptionsNewFile() WithOption {
|
||||||
f := WithOption(func(opt Options) {
|
f := WithOption(func(opt Options) {
|
||||||
opt.pointer.appendfile = false
|
opt.pointer.appendfile = false
|
||||||
|
@ -4,9 +4,25 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
|
||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// IDistribute represents a distribution mechanism for biosequences.
|
||||||
|
// It manages the outputs of biosequences, provides a channel for
|
||||||
|
// new data notifications, and maintains a classifier for sequence
|
||||||
|
// classification. It is designed to facilitate the distribution
|
||||||
|
// of biosequences to various processing components.
|
||||||
|
//
|
||||||
|
// Fields:
|
||||||
|
// - outputs: A map that associates integer keys with corresponding
|
||||||
|
// biosequence outputs (IBioSequence).
|
||||||
|
// - news: A channel that sends notifications of new data available
|
||||||
|
// for processing, represented by integer identifiers.
|
||||||
|
// - classifier: A pointer to a BioSequenceClassifier used to classify
|
||||||
|
// the biosequences during distribution.
|
||||||
|
// - lock: A mutex for synchronizing access to the outputs and other
|
||||||
|
// shared resources to ensure thread safety.
|
||||||
type IDistribute struct {
|
type IDistribute struct {
|
||||||
outputs map[int]IBioSequence
|
outputs map[int]IBioSequence
|
||||||
news chan int
|
news chan int
|
||||||
@ -26,16 +42,39 @@ func (dist *IDistribute) Outputs(key int) (IBioSequence, error) {
|
|||||||
return iter, nil
|
return iter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// News returns a channel that provides notifications of new data
|
||||||
|
// available for processing. The channel sends integer identifiers
|
||||||
|
// representing the new data.
|
||||||
func (dist *IDistribute) News() chan int {
|
func (dist *IDistribute) News() chan int {
|
||||||
return dist.news
|
return dist.news
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Classifier returns a pointer to the BioSequenceClassifier
|
||||||
|
// associated with the distribution mechanism. This classifier
|
||||||
|
// is used to classify biosequences during the distribution process.
|
||||||
func (dist *IDistribute) Classifier() *obiseq.BioSequenceClassifier {
|
func (dist *IDistribute) Classifier() *obiseq.BioSequenceClassifier {
|
||||||
return dist.classifier
|
return dist.classifier
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Distribute organizes the biosequences from the iterator into batches
|
||||||
|
// based on the provided classifier and batch sizes. It returns an
|
||||||
|
// IDistribute instance that manages the distribution of the sequences.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - class: A pointer to a BioSequenceClassifier used to classify
|
||||||
|
// the biosequences during distribution.
|
||||||
|
// - sizes: Optional integer values specifying the batch size. If
|
||||||
|
// no sizes are provided, a default batch size of 5000 is used.
|
||||||
|
//
|
||||||
|
// Returns:
|
||||||
|
// An IDistribute instance that contains the outputs of the
|
||||||
|
// classified biosequences, a channel for new data notifications,
|
||||||
|
// and the classifier used for distribution. The method operates
|
||||||
|
// asynchronously, processing the sequences in separate goroutines.
|
||||||
|
// It ensures that the outputs are closed and cleaned up once
|
||||||
|
// processing is complete.
|
||||||
func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, sizes ...int) IDistribute {
|
func (iterator IBioSequence) Distribute(class *obiseq.BioSequenceClassifier, sizes ...int) IDistribute {
|
||||||
batchsize := 5000
|
batchsize := obioptions.CLIBatchSize()
|
||||||
|
|
||||||
outputs := make(map[int]IBioSequence, 100)
|
outputs := make(map[int]IBioSequence, 100)
|
||||||
slices := make(map[int]*obiseq.BioSequenceSlice, 100)
|
slices := make(map[int]*obiseq.BioSequenceSlice, 100)
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
|
||||||
@ -119,11 +120,17 @@ func LuaWorker(proto *lua.FunctionProto) obiseq.SeqWorker {
|
|||||||
case *obiseq.BioSequenceSlice:
|
case *obiseq.BioSequenceSlice:
|
||||||
return *val, err
|
return *val, err
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("worker function doesn't return the correct type %T", val)
|
r := reflect.TypeOf(val)
|
||||||
|
return nil, fmt.Errorf("worker function doesn't return the correct type %s", r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("worker function doesn't return the correct type")
|
// If worker retuns nothing then it is considered as nil biosequence
|
||||||
|
if _, ok = lreponse.(*lua.LNilType); ok {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("worker function doesn't return the correct type %T", lreponse)
|
||||||
}
|
}
|
||||||
|
|
||||||
return f
|
return f
|
||||||
|
@ -46,6 +46,8 @@ func pushInterfaceToLua(L *lua.LState, val interface{}) {
|
|||||||
pushSliceNumericToLua(L, v)
|
pushSliceNumericToLua(L, v)
|
||||||
case []bool:
|
case []bool:
|
||||||
pushSliceBoolToLua(L, v)
|
pushSliceBoolToLua(L, v)
|
||||||
|
case []interface{}:
|
||||||
|
pushSliceInterfaceToLua(L, v)
|
||||||
case nil:
|
case nil:
|
||||||
L.Push(lua.LNil)
|
L.Push(lua.LNil)
|
||||||
case *sync.Mutex:
|
case *sync.Mutex:
|
||||||
@ -78,6 +80,29 @@ func pushMapStringInterfaceToLua(L *lua.LState, m map[string]interface{}) {
|
|||||||
L.Push(luaTable)
|
L.Push(luaTable)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func pushSliceInterfaceToLua(L *lua.LState, s []interface{}) {
|
||||||
|
// Create a new Lua table
|
||||||
|
luaTable := L.NewTable()
|
||||||
|
// Iterate over the Go map and set the key-value pairs in the Lua table
|
||||||
|
for _, value := range s {
|
||||||
|
switch v := value.(type) {
|
||||||
|
case int:
|
||||||
|
luaTable.Append(lua.LNumber(v))
|
||||||
|
case float64:
|
||||||
|
luaTable.Append(lua.LNumber(v))
|
||||||
|
case bool:
|
||||||
|
luaTable.Append(lua.LBool(v))
|
||||||
|
case string:
|
||||||
|
luaTable.Append(lua.LString(v))
|
||||||
|
default:
|
||||||
|
log.Fatalf("Doesn't deal with slice containing value %v of type %T", v, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push the Lua table onto the stack
|
||||||
|
L.Push(luaTable)
|
||||||
|
}
|
||||||
|
|
||||||
// pushMapStringIntToLua creates a new Lua table and iterates over the Go map to set key-value pairs in the Lua table. It then pushes the Lua table onto the stack.
|
// pushMapStringIntToLua creates a new Lua table and iterates over the Go map to set key-value pairs in the Lua table. It then pushes the Lua table onto the stack.
|
||||||
//
|
//
|
||||||
// L *lua.LState - the Lua state
|
// L *lua.LState - the Lua state
|
||||||
|
@ -47,18 +47,21 @@ func newObiSeq(luaState *lua.LState) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var bioSequenceMethods = map[string]lua.LGFunction{
|
var bioSequenceMethods = map[string]lua.LGFunction{
|
||||||
"id": bioSequenceGetSetId,
|
"id": bioSequenceGetSetId,
|
||||||
"sequence": bioSequenceGetSetSequence,
|
"sequence": bioSequenceGetSetSequence,
|
||||||
"qualities": bioSequenceGetSetQualities,
|
"qualities": bioSequenceGetSetQualities,
|
||||||
"definition": bioSequenceGetSetDefinition,
|
"definition": bioSequenceGetSetDefinition,
|
||||||
"count": bioSequenceGetSetCount,
|
"count": bioSequenceGetSetCount,
|
||||||
"taxid": bioSequenceGetSetTaxid,
|
"taxid": bioSequenceGetSetTaxid,
|
||||||
"attribute": bioSequenceGetSetAttribute,
|
"attribute": bioSequenceGetSetAttribute,
|
||||||
"len": bioSequenceGetLength,
|
"len": bioSequenceGetLength,
|
||||||
"has_sequence": bioSequenceHasSequence,
|
"has_sequence": bioSequenceHasSequence,
|
||||||
"has_qualities": bioSequenceHasQualities,
|
"has_qualities": bioSequenceHasQualities,
|
||||||
"source": bioSequenceGetSource,
|
"source": bioSequenceGetSource,
|
||||||
"md5": bioSequenceGetMD5,
|
"md5": bioSequenceGetMD5,
|
||||||
|
"md5_string": bioSequenceGetMD5String,
|
||||||
|
"subsequence": bioSequenceGetSubsequence,
|
||||||
|
"reverse_complement": bioSequenceGetRevcomp,
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkBioSequence checks if the first argument in the Lua stack is a *obiseq.BioSequence.
|
// checkBioSequence checks if the first argument in the Lua stack is a *obiseq.BioSequence.
|
||||||
@ -224,3 +227,30 @@ func bioSequenceGetMD5(luaState *lua.LState) int {
|
|||||||
luaState.Push(rt)
|
luaState.Push(rt)
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func bioSequenceGetMD5String(luaState *lua.LState) int {
|
||||||
|
s := checkBioSequence(luaState)
|
||||||
|
md5 := s.MD5String()
|
||||||
|
luaState.Push(lua.LString(md5))
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func bioSequenceGetSubsequence(luaState *lua.LState) int {
|
||||||
|
s := checkBioSequence(luaState)
|
||||||
|
start := luaState.CheckInt(2)
|
||||||
|
end := luaState.CheckInt(3)
|
||||||
|
subseq, err := s.Subsequence(start, end, false)
|
||||||
|
if err != nil {
|
||||||
|
luaState.RaiseError("%s : Error on subseq: %v", s.Id(), err)
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
luaState.Push(obiseq2Lua(luaState, subseq))
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func bioSequenceGetRevcomp(luaState *lua.LState) int {
|
||||||
|
s := checkBioSequence(luaState)
|
||||||
|
revcomp := s.ReverseComplement(false)
|
||||||
|
luaState.Push(obiseq2Lua(luaState, revcomp))
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
@ -30,6 +30,7 @@ var _PprofMudex = 10
|
|||||||
var _PprofGoroutine = 6060
|
var _PprofGoroutine = 6060
|
||||||
var _Quality_Shift_Input = byte(33)
|
var _Quality_Shift_Input = byte(33)
|
||||||
var _Quality_Shift_Output = byte(33)
|
var _Quality_Shift_Output = byte(33)
|
||||||
|
var _Read_Qualities = true
|
||||||
|
|
||||||
var __taxdump__ = ""
|
var __taxdump__ = ""
|
||||||
var __alternative_name__ = false
|
var __alternative_name__ = false
|
||||||
@ -291,6 +292,10 @@ func CLIBatchSize() int {
|
|||||||
return _BatchSize
|
return _BatchSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CLIReadQualities() bool {
|
||||||
|
return _Read_Qualities
|
||||||
|
}
|
||||||
|
|
||||||
// SetDebugOn sets the debug mode on.
|
// SetDebugOn sets the debug mode on.
|
||||||
func SetDebugOn() {
|
func SetDebugOn() {
|
||||||
_Debug = true
|
_Debug = true
|
||||||
@ -301,6 +306,10 @@ func SetDebugOff() {
|
|||||||
_Debug = false
|
_Debug = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SetReadQualities(status bool) {
|
||||||
|
_Read_Qualities = status
|
||||||
|
}
|
||||||
|
|
||||||
// SetWorkerPerCore sets the number of workers per CPU core.
|
// SetWorkerPerCore sets the number of workers per CPU core.
|
||||||
//
|
//
|
||||||
// It takes a float64 parameter representing the number of workers
|
// It takes a float64 parameter representing the number of workers
|
||||||
|
@ -7,8 +7,9 @@ import (
|
|||||||
// TODO: The version number is extracted from git. This induces that the version
|
// TODO: The version number is extracted from git. This induces that the version
|
||||||
// corresponds to the last commit, and not the one when the file will be
|
// corresponds to the last commit, and not the one when the file will be
|
||||||
// commited
|
// commited
|
||||||
var _Commit = "abfa8f3"
|
|
||||||
var _Version = "Release 4.2.0"
|
var _Commit = "39dd3e3"
|
||||||
|
var _Version = "Release 4.3.0"
|
||||||
|
|
||||||
// Version returns the version of the obitools package.
|
// Version returns the version of the obitools package.
|
||||||
//
|
//
|
||||||
|
@ -12,6 +12,7 @@ package obiseq
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
|
"encoding/hex"
|
||||||
"slices"
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@ -182,6 +183,9 @@ func (s *BioSequence) Copy() *BioSequence {
|
|||||||
newSeq.sequence = CopySlice(s.sequence)
|
newSeq.sequence = CopySlice(s.sequence)
|
||||||
newSeq.qualities = CopySlice(s.qualities)
|
newSeq.qualities = CopySlice(s.qualities)
|
||||||
newSeq.feature = CopySlice(s.feature)
|
newSeq.feature = CopySlice(s.feature)
|
||||||
|
if s.revcomp != nil {
|
||||||
|
newSeq.revcomp = s.revcomp.Copy()
|
||||||
|
}
|
||||||
|
|
||||||
if len(s.annotations) > 0 {
|
if len(s.annotations) > 0 {
|
||||||
s.annot_lock.Lock()
|
s.annot_lock.Lock()
|
||||||
@ -375,6 +379,11 @@ func (s *BioSequence) MD5() [16]byte {
|
|||||||
return md5.Sum(s.sequence)
|
return md5.Sum(s.sequence)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *BioSequence) MD5String() string {
|
||||||
|
md5_hash := s.MD5()
|
||||||
|
return hex.EncodeToString(md5_hash[:])
|
||||||
|
}
|
||||||
|
|
||||||
// SetId sets the id of the BioSequence.
|
// SetId sets the id of the BioSequence.
|
||||||
//
|
//
|
||||||
// Parameters:
|
// Parameters:
|
||||||
|
@ -55,8 +55,7 @@ func (sequence *BioSequence) ReverseComplement(inplace bool) *BioSequence {
|
|||||||
|
|
||||||
if !inplace {
|
if !inplace {
|
||||||
original = sequence
|
original = sequence
|
||||||
sequence.revcomp = sequence.Copy()
|
sequence = sequence.Copy()
|
||||||
sequence = sequence.revcomp
|
|
||||||
sequence.revcomp = original
|
sequence.revcomp = original
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,6 +99,8 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) {
|
|||||||
opts = append(opts, obiformats.OptionsFastSeqHeaderParser(obiformats.ParseGuessedFastSeqHeader))
|
opts = append(opts, obiformats.OptionsFastSeqHeaderParser(obiformats.ParseGuessedFastSeqHeader))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opts = append(opts, obiformats.OptionsReadQualities(obioptions.CLIReadQualities()))
|
||||||
|
|
||||||
nworkers := obioptions.CLIReadParallelWorkers()
|
nworkers := obioptions.CLIReadParallelWorkers()
|
||||||
if nworkers < 2 {
|
if nworkers < 2 {
|
||||||
nworkers = 2
|
nworkers = 2
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
|
||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
|
||||||
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
|
||||||
|
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func _Abs(x int) int {
|
func _Abs(x int) int {
|
||||||
@ -112,7 +113,7 @@ func AssemblePESequences(seqA, seqB *obiseq.BioSequence,
|
|||||||
inplace bool, fastAlign, fastModeRel bool,
|
inplace bool, fastAlign, fastModeRel bool,
|
||||||
arenaAlign obialign.PEAlignArena, shifh_buff *map[int]int) *obiseq.BioSequence {
|
arenaAlign obialign.PEAlignArena, shifh_buff *map[int]int) *obiseq.BioSequence {
|
||||||
|
|
||||||
score, path, fastcount, over, fastscore := obialign.PEAlign(
|
isLeftAlign, score, path, fastcount, over, fastscore := obialign.PEAlign(
|
||||||
seqA, seqB,
|
seqA, seqB,
|
||||||
gap, scale,
|
gap, scale,
|
||||||
fastAlign, delta, fastModeRel,
|
fastAlign, delta, fastModeRel,
|
||||||
@ -143,19 +144,14 @@ func AssemblePESequences(seqA, seqB *obiseq.BioSequence,
|
|||||||
if aliLength >= minOverlap && identity >= minIdentity {
|
if aliLength >= minOverlap && identity >= minIdentity {
|
||||||
annot["mode"] = "alignment"
|
annot["mode"] = "alignment"
|
||||||
if withStats {
|
if withStats {
|
||||||
if left < 0 {
|
if isLeftAlign {
|
||||||
annot["seq_a_single"] = -left
|
|
||||||
annot["ali_dir"] = "left"
|
annot["ali_dir"] = "left"
|
||||||
|
annot["seq_a_single"] = obiutils.Abs(left)
|
||||||
|
annot["seq_b_single"] = obiutils.Abs(right)
|
||||||
} else {
|
} else {
|
||||||
annot["seq_b_single"] = left
|
|
||||||
annot["ali_dir"] = "right"
|
annot["ali_dir"] = "right"
|
||||||
}
|
annot["seq_a_single"] = obiutils.Abs(right)
|
||||||
|
annot["seq_b_single"] = obiutils.Abs(left)
|
||||||
if right < 0 {
|
|
||||||
right = -right
|
|
||||||
annot["seq_a_single"] = right
|
|
||||||
} else {
|
|
||||||
annot["seq_b_single"] = right
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if inplace {
|
if inplace {
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
var _reorientate = false
|
var _reorientate = false
|
||||||
|
|
||||||
func TagPCROptionSet(options *getoptions.GetOpt) {
|
func TagPCROptionSet(options *getoptions.GetOpt) {
|
||||||
options.BoolVar(&_reorientate, "reference-db", _reorientate,
|
options.BoolVar(&_reorientate, "reorientate", _reorientate,
|
||||||
options.Description("Reverse complemente reads if needed to store all the sequences in "+
|
options.Description("Reverse complemente reads if needed to store all the sequences in "+
|
||||||
"the same orientation respectively to forward and reverse primers"))
|
"the same orientation respectively to forward and reverse primers"))
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ func IPCRTagPESequencesBatch(iterator obiiter.IBioSequence,
|
|||||||
newIter := obiiter.MakeIBioSequence()
|
newIter := obiiter.MakeIBioSequence()
|
||||||
newIter.MarkAsPaired()
|
newIter.MarkAsPaired()
|
||||||
|
|
||||||
f := func(iterator obiiter.IBioSequence, wid int) {
|
f := func(iterator obiiter.IBioSequence) {
|
||||||
arena := obialign.MakePEAlignArena(150, 150)
|
arena := obialign.MakePEAlignArena(150, 150)
|
||||||
shifts := make(map[int]int)
|
shifts := make(map[int]int)
|
||||||
|
|
||||||
@ -89,46 +89,51 @@ func IPCRTagPESequencesBatch(iterator obiiter.IBioSequence,
|
|||||||
|
|
||||||
forward_match := annot["obimultiplex_forward_match"].(string)
|
forward_match := annot["obimultiplex_forward_match"].(string)
|
||||||
forward_mismatches := annot["obimultiplex_forward_error"].(int)
|
forward_mismatches := annot["obimultiplex_forward_error"].(int)
|
||||||
forward_tag := annot["obimultiplex_forward_tag"].(string)
|
|
||||||
|
|
||||||
reverse_match := annot["obimultiplex_reverse_match"].(string)
|
reverse_match := annot["obimultiplex_reverse_match"].(string)
|
||||||
reverse_mismatches := annot["obimultiplex_reverse_error"].(int)
|
reverse_mismatches := annot["obimultiplex_reverse_error"].(int)
|
||||||
reverse_tag := annot["obimultiplex_reverse_tag"].(string)
|
|
||||||
|
|
||||||
sample := annot["sample"].(string)
|
sample := annot["sample"].(string)
|
||||||
experiment := annot["experiment"].(string)
|
experiment := annot["experiment"].(string)
|
||||||
|
|
||||||
aanot := A.Annotations()
|
aanot := A.Annotations()
|
||||||
|
banot := B.Annotations()
|
||||||
|
|
||||||
|
if value, ok := annot["obimultiplex_forward_tag"]; ok {
|
||||||
|
forward_tag := value.(string)
|
||||||
|
aanot["obimultiplex_forward_tag"] = forward_tag
|
||||||
|
banot["obimultiplex_forward_tag"] = forward_tag
|
||||||
|
}
|
||||||
|
|
||||||
|
if value, ok := annot["obimultiplex_reverse_tag"]; ok {
|
||||||
|
reverse_tag := value.(string)
|
||||||
|
aanot["obimultiplex_reverse_tag"] = reverse_tag
|
||||||
|
banot["obimultiplex_reverse_tag"] = reverse_tag
|
||||||
|
}
|
||||||
|
|
||||||
aanot["obimultiplex_direction"] = direction
|
aanot["obimultiplex_direction"] = direction
|
||||||
|
|
||||||
aanot["obimultiplex_forward_match"] = forward_match
|
aanot["obimultiplex_forward_match"] = forward_match
|
||||||
aanot["obimultiplex_forward_mismatches"] = forward_mismatches
|
aanot["obimultiplex_forward_mismatches"] = forward_mismatches
|
||||||
aanot["obimultiplex_forward_tag"] = forward_tag
|
|
||||||
|
|
||||||
aanot["obimultiplex_reverse_match"] = reverse_match
|
aanot["obimultiplex_reverse_match"] = reverse_match
|
||||||
aanot["obimultiplex_reverse_mismatches"] = reverse_mismatches
|
aanot["obimultiplex_reverse_mismatches"] = reverse_mismatches
|
||||||
aanot["obimultiplex_reverse_tag"] = reverse_tag
|
|
||||||
|
|
||||||
aanot["sample"] = sample
|
aanot["sample"] = sample
|
||||||
aanot["experiment"] = experiment
|
aanot["experiment"] = experiment
|
||||||
|
|
||||||
banot := B.Annotations()
|
|
||||||
banot["obimultiplex_direction"] = direction
|
banot["obimultiplex_direction"] = direction
|
||||||
|
|
||||||
banot["obimultiplex_forward_match"] = forward_match
|
banot["obimultiplex_forward_match"] = forward_match
|
||||||
banot["obimultiplex_forward_mismatches"] = forward_mismatches
|
banot["obimultiplex_forward_mismatches"] = forward_mismatches
|
||||||
banot["obimultiplex_forward_tag"] = forward_tag
|
|
||||||
|
|
||||||
banot["obimultiplex_reverse_match"] = reverse_match
|
banot["obimultiplex_reverse_match"] = reverse_match
|
||||||
banot["obimultiplex_reverse_mismatches"] = reverse_mismatches
|
banot["obimultiplex_reverse_mismatches"] = reverse_mismatches
|
||||||
banot["obimultiplex_reverse_tag"] = reverse_tag
|
|
||||||
|
|
||||||
banot["sample"] = sample
|
banot["sample"] = sample
|
||||||
banot["experiment"] = experiment
|
banot["experiment"] = experiment
|
||||||
|
|
||||||
if CLIReorientate() && direction == "reverse" {
|
if CLIReorientate() && direction == "reverse" {
|
||||||
B.ReverseComplement(true)
|
|
||||||
A.ReverseComplement(true)
|
|
||||||
B.PairTo(A)
|
B.PairTo(A)
|
||||||
batch.Slice()[i] = B
|
batch.Slice()[i] = B
|
||||||
}
|
}
|
||||||
@ -162,9 +167,9 @@ func IPCRTagPESequencesBatch(iterator obiiter.IBioSequence,
|
|||||||
|
|
||||||
newIter.Add(nworkers)
|
newIter.Add(nworkers)
|
||||||
for i := 1; i < nworkers; i++ {
|
for i := 1; i < nworkers; i++ {
|
||||||
go f(iterator.Split(), i)
|
go f(iterator.Split())
|
||||||
}
|
}
|
||||||
go f(iterator, 0)
|
go f(iterator)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
newIter.WaitAndClose()
|
newIter.WaitAndClose()
|
||||||
|
Reference in New Issue
Block a user