a functional new version of obifind

This commit is contained in:
Eric Coissac
2024-11-24 19:33:24 +01:00
parent 36327c79c8
commit 3d06978808
21 changed files with 1805 additions and 343 deletions

View File

@ -17,7 +17,7 @@ func main() {
fs, err := obiconvert.CLIReadBioSequences(args...)
obiconvert.OpenSequenceDataErrorMessage(args, err)
obicsv.CLIWriteCSV(fs, true)
obicsv.CLIWriteSequenceCSV(fs, true)
obiiter.WaitForLastPipe()

View File

@ -1,9 +1,10 @@
package main
import (
"fmt"
"log"
"os"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitax"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obifind"
@ -14,13 +15,7 @@ func main() {
_, args := optionParser(os.Args)
//prof, _ := os.Create("obifind.prof")
//pprof.StartCPUProfile(prof)
restrictions, err := obifind.ITaxonRestrictions()
if err != nil {
fmt.Printf("%+v", err)
}
var iterator *obitax.ITaxon
switch {
case obifind.CLIRequestsPathForTaxid() != "NA":
@ -28,34 +23,45 @@ func main() {
taxon := obitax.DefaultTaxonomy().Taxon(obifind.CLIRequestsPathForTaxid())
if taxon == nil {
fmt.Printf("%+v", err)
log.Fatalf("Cannot identify the requested taxon: %s",
obifind.CLIRequestsPathForTaxid())
}
s := taxon.Path()
if err != nil {
fmt.Printf("%+v", err)
if s == nil {
log.Fatalf("Cannot extract taxonomic path describing %s", taxon.String())
}
obifind.TaxonWriter(s.Iterator(),
fmt.Sprintf("path:%s", taxon.String()))
iterator = s.Iterator()
if obifind.CLIWithQuery() {
iterator = iterator.AddMetadata("query", taxon.String())
}
case len(args) == 0:
taxonomy := obitax.DefaultTaxonomy()
obifind.TaxonWriter(restrictions(taxonomy.Iterator()), "")
iterator = obitax.DefaultTaxonomy().Iterator()
default:
matcher, err := obifind.ITaxonNameMatcher()
iters := make([]*obitax.ITaxon, len(args))
if err != nil {
fmt.Printf("%+v", err)
for i, pat := range args {
ii := obitax.DefaultTaxonomy().IFilterOnName(pat, obifind.CLIFixedPattern(), true)
if obifind.CLIWithQuery() {
ii = ii.AddMetadata("query", pat)
}
iters[i] = ii
}
for _, pattern := range args {
s := restrictions(matcher(pattern))
obifind.TaxonWriter(s, pattern)
iterator = iters[0]
if len(iters) > 1 {
iterator = iterator.Concat(iters[1:]...)
}
}
//pprof.StopCPUProfile()
iterator = obifind.CLITaxonRestrictions(iterator)
obifind.CLICSVTaxaWriter(iterator, true)
obiiter.WaitForLastPipe()
}

1
go.mod
View File

@ -24,6 +24,7 @@ require (
)
require (
github.com/Clever/csvlint v0.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/goombaio/orderedmap v0.0.0-20180924084748-ba921b7e2419 // indirect
github.com/kr/pretty v0.3.0 // indirect

3
go.sum
View File

@ -1,3 +1,5 @@
github.com/Clever/csvlint v0.3.0 h1:58WEFXWy+i0fCbxTXscR2QwYESRuAUFjEGLgZs6j2iU=
github.com/Clever/csvlint v0.3.0/go.mod h1:+wLRuW/bI8NhpRoeyUBxqKsK35OhvgJhXHSWdKp5XJU=
github.com/DavidGamba/go-getoptions v0.28.0 h1:18wgEvfZdrlfIhVDGEBO3Dl0fkOyXqXLa0tLMCKxM1c=
github.com/DavidGamba/go-getoptions v0.28.0/go.mod h1:zE97E3PR9P3BI/HKyNYgdMlYxodcuiC6W68KIgeYT84=
github.com/PaesslerAG/gval v1.2.2 h1:Y7iBzhgE09IGTt5QgGQ2IdaYYYOU134YGHBThD+wm9E=
@ -65,6 +67,7 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=

View File

@ -1,22 +1,14 @@
package obiformats
import (
"bytes"
"encoding/csv"
"fmt"
"io"
"os"
"sync"
"time"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
log "github.com/sirupsen/logrus"
)
func CSVRecord(sequence *obiseq.BioSequence, opt Options) []string {
func CSVSequenceRecord(sequence *obiseq.BioSequence, opt Options) []string {
keys := opt.CSVKeys()
record := make([]string, 0, len(keys)+4)
@ -74,182 +66,3 @@ func CSVRecord(sequence *obiseq.BioSequence, opt Options) []string {
return record
}
func CSVHeader(opt Options) []string {
keys := opt.CSVKeys()
record := make([]string, 0, len(keys)+4)
if opt.CSVId() {
record = append(record, "id")
}
if opt.CSVCount() {
record = append(record, "count")
}
if opt.CSVTaxon() {
record = append(record, "taxid", "scientific_name")
}
if opt.CSVDefinition() {
record = append(record, "definition")
}
record = append(record, opt.CSVKeys()...)
if opt.CSVSequence() {
record = append(record, "sequence")
}
if opt.CSVQuality() {
record = append(record, "quality")
}
return record
}
func FormatCVSBatch(batch obiiter.BioSequenceBatch, opt Options) []byte {
buff := new(bytes.Buffer)
csv := csv.NewWriter(buff)
if batch.Order() == 0 {
csv.Write(CSVHeader(opt))
}
for _, s := range batch.Slice() {
csv.Write(CSVRecord(s, opt))
}
csv.Flush()
return buff.Bytes()
}
func WriteCSV(iterator obiiter.IBioSequence,
file io.WriteCloser,
options ...WithOption) (obiiter.IBioSequence, error) {
var auto_slot obiutils.Set[string]
opt := MakeOptions(options)
file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile())
newIter := obiiter.MakeIBioSequence()
nwriters := opt.ParallelWorkers()
obiiter.RegisterAPipe()
chunkchan := make(chan FileChunk)
newIter.Add(nwriters)
var waitWriter sync.WaitGroup
go func() {
newIter.WaitAndClose()
for len(chunkchan) > 0 {
time.Sleep(time.Millisecond)
}
close(chunkchan)
waitWriter.Wait()
}()
ff := func(iterator obiiter.IBioSequence) {
for iterator.Next() {
batch := iterator.Get()
chunkchan <- FileChunk{
FormatCVSBatch(batch, opt),
batch.Order(),
}
newIter.Push(batch)
}
newIter.Done()
}
next_to_send := 0
received := make(map[int]FileChunk, 100)
waitWriter.Add(1)
go func() {
for chunk := range chunkchan {
if chunk.order == next_to_send {
file.Write(chunk.text)
next_to_send++
chunk, ok := received[next_to_send]
for ok {
file.Write(chunk.text)
delete(received, next_to_send)
next_to_send++
chunk, ok = received[next_to_send]
}
} else {
received[chunk.order] = chunk
}
}
file.Close()
log.Debugln("End of the CSV file writing")
obiiter.UnregisterPipe()
waitWriter.Done()
}()
if opt.pointer.csv_auto {
if iterator.Next() {
batch := iterator.Get()
auto_slot = batch.Slice().AttributeKeys(true)
CSVKeys(auto_slot.Members())(opt)
iterator.PushBack()
}
}
log.Debugln("Start of the CSV file writing")
go ff(iterator)
for i := 0; i < nwriters-1; i++ {
go ff(iterator.Split())
}
return newIter, nil
}
func WriteCSVToStdout(iterator obiiter.IBioSequence,
options ...WithOption) (obiiter.IBioSequence, error) {
options = append(options, OptionDontCloseFile())
return WriteCSV(iterator, os.Stdout, options...)
}
func WriteCSVToFile(iterator obiiter.IBioSequence,
filename string,
options ...WithOption) (obiiter.IBioSequence, error) {
opt := MakeOptions(options)
flags := os.O_WRONLY | os.O_CREATE
if opt.AppendFile() {
flags |= os.O_APPEND
}
file, err := os.OpenFile(filename, flags, 0660)
if err != nil {
log.Fatalf("open file error: %v", err)
return obiiter.NilIBioSequence, err
}
options = append(options, OptionCloseFile())
iterator, err = WriteCSV(iterator, file, options...)
if opt.HaveToSavePaired() {
var revfile *os.File
revfile, err = os.OpenFile(opt.PairedFileName(), flags, 0660)
if err != nil {
log.Fatalf("open file error: %v", err)
return obiiter.NilIBioSequence, err
}
iterator, err = WriteCSV(iterator.PairedWith(), revfile, options...)
}
return iterator, err
}

View File

@ -13,7 +13,6 @@ func WriteSeqFileChunk(
toBeClosed bool) ChannelSeqFileChunk {
obiiter.RegisterAPipe()
chunk_channel := make(ChannelSeqFileChunk)
go func() {

View File

@ -7,7 +7,7 @@ import (
// TODO: The version number is extracted from git. This induces that the version
// corresponds to the last commit, and not the one when the file will be
// commited
var _Commit = "7633fc4"
var _Commit = "36327c7"
var _Version = "Release 4.2.0"
// Version returns the version of the obitools package.

View File

@ -14,18 +14,8 @@ import (
//
// Returns:
// - A pointer to a new ITaxon iterator containing only the Taxon instances that match the specified name.
func (taxonomy *Taxonomy) IFilterOnName(name string, strict bool) *ITaxon {
if strict {
nodes, ok := taxonomy.index[taxonomy.names.Innerize(name)]
if ok {
return nodes.Iterator()
} else {
empty := taxonomy.NewTaxonSet()
return empty.Iterator()
}
}
return taxonomy.Iterator().IFilterOnName(name, strict)
func (taxonomy *Taxonomy) IFilterOnName(name string, strict bool, ingnoreCase bool) *ITaxon {
return taxonomy.Iterator().IFilterOnName(name, strict, ingnoreCase)
}
// IFilterOnName filters the Taxon instances in the iterator based on the specified name.
@ -38,7 +28,7 @@ func (taxonomy *Taxonomy) IFilterOnName(name string, strict bool) *ITaxon {
//
// Returns:
// - A pointer to a new ITaxon iterator containing only the Taxon instances that match the specified name.
func (iterator *ITaxon) IFilterOnName(name string, strict bool) *ITaxon {
func (iterator *ITaxon) IFilterOnName(name string, strict bool, ignoreCase bool) *ITaxon {
newIterator := NewITaxon()
sentTaxa := make(map[*string]bool)
@ -48,16 +38,21 @@ func (iterator *ITaxon) IFilterOnName(name string, strict bool) *ITaxon {
taxon := iterator.Get()
node := taxon.Node
if _, ok := sentTaxa[node.id]; !ok {
if taxon.IsNameEqual(name) {
if taxon.IsNameEqual(name, ignoreCase) {
sentTaxa[node.id] = true
newIterator.source <- taxon
newIterator.Push(taxon)
}
}
}
close(newIterator.source)
}()
} else {
pattern := regexp.MustCompile(name)
var pattern *regexp.Regexp
if ignoreCase {
pattern = regexp.MustCompile("(?i)" + name)
} else {
pattern = regexp.MustCompile(name)
}
go func() {
for iterator.Next() {
@ -66,11 +61,11 @@ func (iterator *ITaxon) IFilterOnName(name string, strict bool) *ITaxon {
if _, ok := sentTaxa[node.id]; !ok {
if taxon.IsNameMatching(pattern) {
sentTaxa[node.id] = true
newIterator.source <- taxon
newIterator.Push(taxon)
}
}
}
close(newIterator.source)
newIterator.Close()
}()
}

View File

@ -28,10 +28,12 @@ func (set *TaxonSet) Iterator() *ITaxon {
go func() {
for _, t := range set.set {
i.source <- &Taxon{
taxon := &Taxon{
Taxonomy: set.taxonomy,
Metadata: nil,
Node: t,
}
i.Push(taxon)
}
close(i.source)
}()
@ -46,17 +48,25 @@ func (set *TaxonSlice) Iterator() *ITaxon {
go func() {
for _, t := range set.slice {
i.source <- &Taxon{
i.Push(&Taxon{
Taxonomy: set.taxonomy,
Node: t,
}
})
}
close(i.source)
i.Close()
}()
return i
}
func (iterator *ITaxon) Push(taxon *Taxon) {
iterator.source <- taxon
}
func (iterator *ITaxon) Close() {
close(iterator.source)
}
// Iterator creates a new ITaxon iterator for the Taxonomy's nodes.
func (taxonomy *Taxonomy) Iterator() *ITaxon {
return taxonomy.nodes.Iterator()
@ -83,17 +93,28 @@ func (iterator *ITaxon) Next() bool {
// Get returns the current Taxon instance pointed to by the iterator.
// You must call 'Next' before calling 'Get' to retrieve the next instance.
func (iterator *ITaxon) Get() *Taxon {
if iterator == nil {
return nil
}
return iterator.current
}
// Finished returns true if no more data is available from the iterator.
func (iterator *ITaxon) Finished() bool {
if iterator == nil {
return true
}
return *iterator.p_finished
}
// Split creates a new ITaxon iterator that shares the same source channel
// and finished status as the original iterator.
func (iterator *ITaxon) Split() *ITaxon {
if iterator == nil {
return nil
}
return &ITaxon{
source: iterator.source,
current: nil,
@ -101,3 +122,49 @@ func (iterator *ITaxon) Split() *ITaxon {
p_finished: iterator.p_finished,
}
}
func (iterator *ITaxon) AddMetadata(name string, value interface{}) *ITaxon {
if iterator == nil {
return nil
}
i := NewITaxon()
go func() {
for iterator.Next() {
taxon := iterator.Get()
taxon.SetMetadata(name, value)
i.Push(taxon)
}
i.Close()
}()
return i
}
func (iterator *ITaxon) Concat(iterators ...*ITaxon) *ITaxon {
newIter := NewITaxon()
go func() {
if iterator != nil {
for iterator.Next() {
taxon := iterator.Get()
newIter.Push(taxon)
}
}
for _, iter := range iterators {
if iter != nil {
for iter.Next() {
taxon := iter.Get()
newIter.Push(taxon)
}
}
}
newIter.Close()
}()
return newIter
}

View File

@ -4,6 +4,7 @@ import (
"iter"
"regexp"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
log "github.com/sirupsen/logrus"
)
@ -15,6 +16,7 @@ import (
// - Node: A pointer to the TaxNode instance representing the specific taxon.
type Taxon struct {
Taxonomy *Taxonomy
Metadata *map[string]*interface{}
Node *TaxNode
}
@ -70,12 +72,12 @@ func (taxon *Taxon) Name(class string) string {
//
// Returns:
// - A boolean indicating whether the names are equal.
func (taxon *Taxon) IsNameEqual(name string) bool {
func (taxon *Taxon) IsNameEqual(name string, ignoreCase bool) bool {
if taxon == nil {
return false
}
return taxon.Node.IsNameEqual(name)
return taxon.Node.IsNameEqual(name, ignoreCase)
}
// IsNameMatching checks if the name of the Taxon matches the given regular expression pattern.
@ -283,3 +285,89 @@ func (taxon *Taxon) Genus() *Taxon {
func (taxon *Taxon) Family() *Taxon {
return taxon.TaxonAtRank("family")
}
func (taxon *Taxon) SetMetadata(name string, value interface{}) *Taxon {
if taxon == nil {
return nil
}
if taxon.Metadata == nil {
m := make(map[string]*interface{})
taxon.Metadata = &m
}
(*taxon.Metadata)[name] = &value
return taxon
}
func (taxon *Taxon) GetMetadata(name string) *interface{} {
if taxon == nil || taxon.Metadata == nil {
return nil
}
return (*taxon.Metadata)[name]
}
func (taxon *Taxon) HasMetadata(name string) bool {
if taxon == nil || taxon.Metadata == nil {
return false
}
_, ok := (*taxon.Metadata)[name]
return ok
}
func (taxon *Taxon) RemoveMetadata(name string) {
if taxon == nil || taxon.Metadata == nil {
return
}
delete(*taxon.Metadata, name)
}
func (taxon *Taxon) MetadataAsString(name string) string {
meta := taxon.GetMetadata(name)
if meta == nil {
return ""
}
value, err := obiutils.InterfaceToString(*meta)
if err != nil {
return ""
}
return value
}
func (taxon *Taxon) MetadataKeys() []string {
if taxon == nil || taxon.Metadata == nil {
return nil
}
keys := make([]string, 0, len(*taxon.Metadata))
for k := range *taxon.Metadata {
keys = append(keys, k)
}
return keys
}
func (taxon *Taxon) MetadataValues() []interface{} {
if taxon == nil || taxon.Metadata == nil {
return nil
}
values := make([]interface{}, 0, len(*taxon.Metadata))
for _, v := range *taxon.Metadata {
values = append(values, v)
}
return values
}
func (taxon *Taxon) MetadataStringValues() []string {
if taxon == nil || taxon.Metadata == nil {
return nil
}
values := make([]string, 0, len(*taxon.Metadata))
for _, v := range *taxon.Metadata {
value, err := obiutils.InterfaceToString(v)
if err != nil {
value = ""
}
values = append(values, value)
}
return values
}

View File

@ -2,8 +2,10 @@ package obitax
import (
"fmt"
"log"
"regexp"
"strings"
log "github.com/sirupsen/logrus"
)
// TaxNode represents a single taxon in a taxonomy.
@ -161,13 +163,17 @@ func (node *TaxNode) Rank() string {
// Returns:
// - A boolean indicating whether the provided name is equal to the scientific name or exists
// as an alternate name for the taxon.
func (node *TaxNode) IsNameEqual(name string) bool {
if *(node.scientificname) == name {
func (node *TaxNode) IsNameEqual(name string, ignoreCase bool) bool {
if node == nil {
return false
}
if *(node.scientificname) == name || (ignoreCase && strings.EqualFold(*(node.scientificname), name)) {
return true
}
if node.alternatenames != nil {
for _, n := range *node.alternatenames {
if n != nil && *n == name {
if n != nil && (ignoreCase && strings.EqualFold(*n, name)) {
return true
}
}

View File

@ -84,14 +84,14 @@ func (path *TaxonSlice) String() string {
if path.Len() > 0 {
taxon := path.slice[path.Len()-1]
fmt.Fprintf(&buffer, "%v@%s@%s",
taxon.Id(),
*taxon.Id(),
taxon.ScientificName(),
taxon.Rank())
for i := path.Len() - 2; i >= 0; i-- {
taxon := path.slice[i]
fmt.Fprintf(&buffer, "|%v@%s@%s",
taxon.Id(),
*taxon.Id(),
taxon.ScientificName(),
taxon.Rank())
}

View File

@ -207,7 +207,7 @@ func EvalAttributeWorker(expression map[string]string) obiseq.SeqWorker {
func AddTaxonAtRankWorker(taxonomy *obitax.Taxonomy, ranks ...string) obiseq.SeqWorker {
f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) {
for _, r := range ranks {
taxonomy.SetTaxonAtRank(s, r)
s.SetTaxonAtRank(taxonomy, r)
}
return obiseq.BioSequenceSlice{s}, nil
}
@ -217,7 +217,7 @@ func AddTaxonAtRankWorker(taxonomy *obitax.Taxonomy, ranks ...string) obiseq.Seq
func AddTaxonRankWorker(taxonomy *obitax.Taxonomy) obiseq.SeqWorker {
f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) {
taxonomy.SetTaxonomicRank(s)
s.SetTaxonomicRank(taxonomy)
return obiseq.BioSequenceSlice{s}, nil
}
@ -226,7 +226,7 @@ func AddTaxonRankWorker(taxonomy *obitax.Taxonomy) obiseq.SeqWorker {
func AddScientificNameWorker(taxonomy *obitax.Taxonomy) obiseq.SeqWorker {
f := func(s *obiseq.BioSequence) (obiseq.BioSequenceSlice, error) {
taxonomy.SetScientificName(s)
s.SetScientificName(taxonomy)
return obiseq.BioSequenceSlice{s}, nil
}
@ -280,7 +280,7 @@ func CLIAnnotationWorker() obiseq.SeqWorker {
if CLISetTaxonomicPath() {
taxo := obigrep.CLILoadSelectedTaxonomy()
w := taxo.MakeSetPathWorker()
w := obiseq.MakeSetPathWorker(taxo)
annotator = annotator.ChainWorkers(w)
}
@ -298,7 +298,7 @@ func CLIAnnotationWorker() obiseq.SeqWorker {
if CLIHasAddLCA() {
taxo := obigrep.CLILoadSelectedTaxonomy()
w := obitax.AddLCAWorker(taxo, CLILCASlotName(), CLILCAThreshold())
w := obiseq.AddLCAWorker(taxo, CLILCASlotName(), CLILCAThreshold())
annotator = annotator.ChainWorkers(w)
}

View File

@ -0,0 +1,489 @@
package obicsv
import (
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
)
// __options__ holds configuration options for processing.
// Each field corresponds to a specific setting that can be adjusted.
type __options__ struct {
with_progress_bar bool // Indicates whether to display a progress bar
filename string
buffer_size int // Size of the buffer for processing
batch_size int // Number of items to process in a batch
full_file_batch bool // Indicates whether to process the full file in a batch
parallel_workers int // Number of parallel workers to use
no_order bool // Indicates whether to process items in no specific order
closefile bool // Indicates whether to close the file after processing
appendfile bool // Indicates whether to append to the file instead of overwriting
compressed bool // Indicates whether the input data is compressed
skip_empty bool // Indicates whether to skip empty entries
csv_naomit bool // Indicates whether to omit NA values in CSV output
csv_id bool // Indicates whether to include ID in CSV output
csv_sequence bool // Indicates whether to include sequence in CSV output
csv_quality bool // Indicates whether to include quality in CSV output
csv_definition bool // Indicates whether to include definition in CSV output
csv_count bool // Indicates whether to include count in CSV output
csv_taxon bool // Indicates whether to include taxon in CSV output
csv_keys []string // List of keys to include in CSV output
csv_separator string // Separator to use in CSV output
csv_navalue string // Value to use for NA entries in CSV output
csv_auto bool // Indicates whether to automatically determine CSV format
source string // Source of the data
}
// Options wraps the __options__ struct to provide a pointer to the options.
type Options struct {
pointer *__options__ // Pointer to the underlying options
}
// WithOption is a function type that takes an Options parameter and modifies it.
type WithOption func(Options)
// MakeOptions creates an Options instance with default settings and applies any provided setters.
// It returns the configured Options.
//
// Parameters:
// - setters: A slice of WithOption functions to customize the options.
//
// Returns:
// - An Options instance with the specified settings.
func MakeOptions(setters []WithOption) Options {
o := __options__{
with_progress_bar: false,
filename: "-",
buffer_size: 2,
parallel_workers: obioptions.CLIReadParallelWorkers(),
batch_size: obioptions.CLIBatchSize(),
no_order: false,
full_file_batch: false,
closefile: false,
appendfile: false,
compressed: false,
skip_empty: false,
csv_id: true,
csv_definition: false,
csv_count: false,
csv_taxon: false,
csv_sequence: true,
csv_quality: false,
csv_separator: ",",
csv_navalue: "NA",
csv_keys: make(CSVHeader, 0),
csv_auto: false,
source: "unknown",
}
opt := Options{&o}
for _, set := range setters {
set(opt)
}
return opt
}
// BatchSize returns the size of the batch to be processed.
// It retrieves the batch size from the underlying options.
func (opt Options) BatchSize() int {
return opt.pointer.batch_size
}
func (opt Options) FileName() string {
return opt.pointer.filename
}
// FullFileBatch returns whether the full file should be processed in a single batch.
// It retrieves the setting from the underlying options.
func (opt Options) FullFileBatch() bool {
return opt.pointer.full_file_batch
}
// ParallelWorkers returns the number of parallel workers to be used for processing.
// It retrieves the number of workers from the underlying options.
func (opt Options) ParallelWorkers() int {
return opt.pointer.parallel_workers
}
// NoOrder returns whether the processing should occur in no specific order.
// It retrieves the setting from the underlying options.
func (opt Options) NoOrder() bool {
return opt.pointer.no_order
}
// ProgressBar returns whether a progress bar should be displayed during processing.
// It retrieves the setting from the underlying options.
func (opt Options) ProgressBar() bool {
return opt.pointer.with_progress_bar
}
// CloseFile returns whether the file should be closed after processing.
// It retrieves the setting from the underlying options.
func (opt Options) CloseFile() bool {
return opt.pointer.closefile
}
// AppendFile returns whether to append to the file instead of overwriting it.
// It retrieves the setting from the underlying options.
func (opt Options) AppendFile() bool {
return opt.pointer.appendfile
}
// CompressedFile returns whether the input data is compressed.
// It retrieves the setting from the underlying options.
func (opt Options) CompressedFile() bool {
return opt.pointer.compressed
}
// SkipEmptySequence returns whether empty sequences should be skipped during processing.
// It retrieves the setting from the underlying options.
func (opt Options) SkipEmptySequence() bool {
return opt.pointer.skip_empty
}
// CSVId returns whether the ID should be included in the CSV output.
// It retrieves the setting from the underlying options.
func (opt Options) CSVId() bool {
return opt.pointer.csv_id
}
// CSVDefinition returns whether the definition should be included in the CSV output.
// It retrieves the setting from the underlying options.
func (opt Options) CSVDefinition() bool {
return opt.pointer.csv_definition
}
// CSVCount returns whether the count should be included in the CSV output.
// It retrieves the setting from the underlying options.
func (opt Options) CSVCount() bool {
return opt.pointer.csv_count
}
// CSVTaxon returns whether the taxon should be included in the CSV output.
// It retrieves the setting from the underlying options.
func (opt Options) CSVTaxon() bool {
return opt.pointer.csv_taxon
}
// CSVSequence returns whether the sequence should be included in the CSV output.
// It retrieves the setting from the underlying options.
func (opt Options) CSVSequence() bool {
return opt.pointer.csv_sequence
}
// CSVQuality returns whether the quality should be included in the CSV output.
// It retrieves the setting from the underlying options.
func (opt Options) CSVQuality() bool {
return opt.pointer.csv_quality
}
// CSVKeys returns the list of keys to include in the CSV output.
// It retrieves the keys from the underlying options.
func (opt Options) CSVKeys() []string {
return opt.pointer.csv_keys
}
// CSVSeparator returns the separator used in the CSV output.
// It retrieves the separator from the underlying options.
func (opt Options) CSVSeparator() string {
return opt.pointer.csv_separator
}
// CSVNAValue returns the value used for NA entries in the CSV output.
// It retrieves the NA value from the underlying options.
func (opt Options) CSVNAValue() string {
return opt.pointer.csv_navalue
}
// CSVAutoColumn returns whether to automatically determine the CSV format.
// It retrieves the setting from the underlying options.
func (opt Options) CSVAutoColumn() bool {
return opt.pointer.csv_auto
}
// HasSource returns whether a source has been specified.
// It checks if the source field in the underlying options is not empty.
func (opt Options) HasSource() bool {
return opt.pointer.source != ""
}
// Source returns the source of the data.
// It retrieves the source from the underlying options.
func (opt Options) Source() string {
return opt.pointer.source
}
func OptionFileName(filename string) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.filename = filename
})
return f
}
// OptionCloseFile returns a WithOption function that sets the closefile option to true.
func OptionCloseFile() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.closefile = true
})
return f
}
// OptionDontCloseFile returns a WithOption function that sets the closefile option to false.
func OptionDontCloseFile() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.closefile = false
})
return f
}
// OptionsAppendFile returns a WithOption function that sets the appendfile option.
// Parameters:
// - append: A boolean indicating whether to append to the file.
func OptionsAppendFile(append bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.appendfile = append
})
return f
}
// OptionNoOrder returns a WithOption function that sets the no_order option.
// Parameters:
// - no_order: A boolean indicating whether to process items in no specific order.
func OptionNoOrder(no_order bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.no_order = no_order
})
return f
}
// OptionsCompressed returns a WithOption function that sets the compressed option.
// Parameters:
// - compressed: A boolean indicating whether the input data is compressed.
func OptionsCompressed(compressed bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.compressed = compressed
})
return f
}
// OptionsSkipEmptySequence returns a WithOption function that sets the skip_empty option.
// Parameters:
// - skip: A boolean indicating whether to skip empty sequences.
func OptionsSkipEmptySequence(skip bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.skip_empty = skip
})
return f
}
// OptionsNewFile returns a WithOption function that sets the appendfile option to false,
// indicating that a new file should be created instead of appending to an existing one.
func OptionsNewFile() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.appendfile = false
})
return f
}
// OptionsParallelWorkers returns a WithOption function that sets the number of parallel workers.
// Parameters:
// - nworkers: An integer specifying the number of parallel workers to use.
func OptionsParallelWorkers(nworkers int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.parallel_workers = nworkers
})
return f
}
// OptionsBatchSize returns a WithOption function that sets the batch_size option.
// Parameters:
// - size: An integer specifying the size of the batch to be processed.
func OptionsBatchSize(size int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.batch_size = size
})
return f
}
// OptionsBatchSizeDefault returns a WithOption function that sets the default batch_size option.
// Parameters:
// - bp: An integer specifying the default size of the batch to be processed.
func OptionsBatchSizeDefault(bp int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.batch_size = bp
})
return f
}
// OptionsFullFileBatch returns a WithOption function that sets the full_file_batch option.
// Parameters:
// - full: A boolean indicating whether to process the full file in a single batch.
func OptionsFullFileBatch(full bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.full_file_batch = full
})
return f
}
// OptionsSource returns a WithOption function that sets the source option.
// Parameters:
// - source: A string specifying the source of the data.
func OptionsSource(source string) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.source = source
})
return f
}
// OptionsWithProgressBar returns a WithOption function that sets the with_progress_bar option to true,
// indicating that a progress bar should be displayed during processing.
func OptionsWithProgressBar() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.with_progress_bar = true
})
return f
}
// OptionsWithoutProgressBar returns a WithOption function that sets the with_progress_bar option to false,
// indicating that a progress bar should not be displayed during processing.
func OptionsWithoutProgressBar() WithOption {
f := WithOption(func(opt Options) {
opt.pointer.with_progress_bar = false
})
return f
}
// CSVId returns a WithOption function that sets the csv_id option.
// Parameters:
// - include: A boolean indicating whether to include the ID in the CSV output.
func CSVId(include bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_id = include
})
return f
}
// CSVSequence returns a WithOption function that sets the csv_sequence option.
// Parameters:
// - include: A boolean indicating whether to include the sequence in the CSV output.
func CSVSequence(include bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_sequence = include
})
return f
}
// CSVQuality returns a WithOption function that sets the csv_quality option.
// Parameters:
// - include: A boolean indicating whether to include the quality in the CSV output.
func CSVQuality(include bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_quality = include
})
return f
}
// CSVDefinition returns a WithOption function that sets the csv_definition option.
// Parameters:
// - include: A boolean indicating whether to include the definition in the CSV output.
func CSVDefinition(include bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_definition = include
})
return f
}
// CSVCount returns a WithOption function that sets the csv_count option.
// Parameters:
// - include: A boolean indicating whether to include the count in the CSV output.
func CSVCount(include bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_count = include
})
return f
}
// CSVTaxon returns a WithOption function that sets the csv_taxon option.
// Parameters:
// - include: A boolean indicating whether to include the taxon in the CSV output.
func CSVTaxon(include bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_taxon = include
})
return f
}
// CSVKey returns a WithOption function that adds a key to the list of keys to include in the CSV output.
// Parameters:
// - key: A string specifying the key to include in the CSV output.
func CSVKey(key string) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_keys = append(opt.pointer.csv_keys, key)
})
return f
}
// CSVKeys returns a WithOption function that adds multiple keys to the list of keys to include in the CSV output.
// Parameters:
// - keys: A slice of strings specifying the keys to include in the CSV output.
func CSVKeys(keys []string) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_keys = append(opt.pointer.csv_keys, keys...)
})
return f
}
// CSVSeparator returns a WithOption function that sets the separator used in the CSV output.
// Parameters:
// - separator: A string specifying the separator to use in the CSV output.
func CSVSeparator(separator string) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_separator = separator
})
return f
}
// CSVNAValue returns a WithOption function that sets the value used for NA entries in the CSV output.
// Parameters:
// - navalue: A string specifying the value to use for NA entries in the CSV output.
func CSVNAValue(navalue string) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_navalue = navalue
})
return f
}
// CSVAutoColumn returns a WithOption function that sets the csv_auto option.
// Parameters:
// - auto: A boolean indicating whether to automatically determine the CSV format.
func CSVAutoColumn(auto bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.csv_auto = auto
})
return f
}

341
pkg/obitools/obicsv/iter.go Normal file
View File

@ -0,0 +1,341 @@
package obicsv
import (
"fmt"
"sync"
"sync/atomic"
"time"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"github.com/tevino/abool/v2"
log "github.com/sirupsen/logrus"
)
type CSVHeader []string
type CSVRecord map[string]interface{}
type CSVRecordBatch struct {
source string
data []CSVRecord
order int
}
var NilCSVRecordBatch = CSVRecordBatch{"", nil, -1}
// Structure implementing an iterator over bioseq.BioSequenceBatch
// based on a channel.
type ICSVRecord struct {
channel chan CSVRecordBatch
current CSVRecordBatch
pushBack *abool.AtomicBool
all_done *sync.WaitGroup
lock *sync.RWMutex
buffer_size int32
batch_size int32
sequence_format string
finished *abool.AtomicBool
header CSVHeader
}
var NilIBioSequenceBatch = (*ICSVRecord)(nil)
func NewICSVRecord() *ICSVRecord {
i := ICSVRecord{
channel: make(chan CSVRecordBatch),
current: NilCSVRecordBatch,
pushBack: abool.New(),
batch_size: -1,
sequence_format: "",
finished: abool.New(),
header: make(CSVHeader, 0),
}
waiting := sync.WaitGroup{}
i.all_done = &waiting
lock := sync.RWMutex{}
i.lock = &lock
obiiter.RegisterAPipe()
return &i
}
func MakeCSVRecordBatch(source string, order int, data []CSVRecord) CSVRecordBatch {
return CSVRecordBatch{
source: source,
order: order,
data: data,
}
}
func (batch *CSVRecordBatch) Order() int {
return batch.order
}
func (batch *CSVRecordBatch) Source() string {
return batch.source
}
func (batch *CSVRecordBatch) Slice() []CSVRecord {
return batch.data
}
// NotEmpty returns whether the BioSequenceBatch is empty or not.
//
// It checks if the BioSequenceSlice contained within the BioSequenceBatch is not empty.
//
// Returns:
// - bool: True if the BioSequenceBatch is not empty, false otherwise.
func (batch *CSVRecordBatch) NotEmpty() bool {
return len(batch.data) > 0
}
// IsNil checks if the BioSequenceBatch's slice is nil.
//
// This function takes a BioSequenceBatch as a parameter and returns a boolean value indicating whether the slice of the BioSequenceBatch is nil or not.
//
// Parameters:
// - batch: The BioSequenceBatch to check for nil slice.
//
// Returns:
// - bool: True if the BioSequenceBatch's slice is nil, false otherwise.
func (batch *CSVRecordBatch) IsNil() bool {
return batch.data == nil
}
func (iterator *ICSVRecord) Add(n int) {
if iterator == nil {
log.Panic("call of ICSVRecord.Add method on NilIBioSequenceBatch")
}
iterator.all_done.Add(n)
}
func (iterator *ICSVRecord) Done() {
if iterator == nil {
log.Panic("call of ICSVRecord.Done method on NilIBioSequenceBatch")
}
iterator.all_done.Done()
}
func (iterator *ICSVRecord) Unlock() {
if iterator == nil {
log.Panic("call of ICSVRecord.Unlock method on NilIBioSequenceBatch")
}
iterator.lock.Unlock()
}
func (iterator *ICSVRecord) Lock() {
if iterator == nil {
log.Panic("call of ICSVRecord.Lock method on NilIBioSequenceBatch")
}
iterator.lock.Lock()
}
func (iterator *ICSVRecord) RLock() {
if iterator == nil {
log.Panic("call of ICSVRecord.RLock method on NilIBioSequenceBatch")
}
iterator.lock.RLock()
}
func (iterator *ICSVRecord) RUnlock() {
if iterator == nil {
log.Panic("call of ICSVRecord.RUnlock method on NilIBioSequenceBatch")
}
iterator.lock.RUnlock()
}
func (iterator *ICSVRecord) Wait() {
if iterator == nil {
log.Panic("call of ICSVRecord.Wait method on NilIBioSequenceBatch")
}
iterator.all_done.Wait()
}
func (iterator *ICSVRecord) Channel() chan CSVRecordBatch {
if iterator == nil {
log.Panic("call of ICSVRecord.Channel method on NilIBioSequenceBatch")
}
return iterator.channel
}
func (iterator *ICSVRecord) IsNil() bool {
if iterator == nil {
log.Panic("call of ICSVRecord.IsNil method on NilIBioSequenceBatch")
}
return iterator == nil
}
func (iterator *ICSVRecord) BatchSize() int {
if iterator == nil {
log.Panic("call of ICSVRecord.BatchSize method on NilIBioSequenceBatch")
}
return int(atomic.LoadInt32(&iterator.batch_size))
}
func (iterator *ICSVRecord) SetBatchSize(size int) error {
if size >= 0 {
atomic.StoreInt32(&iterator.batch_size, int32(size))
return nil
}
return fmt.Errorf("size (%d) cannot be negative", size)
}
func (iterator *ICSVRecord) Split() *ICSVRecord {
iterator.lock.RLock()
defer iterator.lock.RUnlock()
i := ICSVRecord{
channel: iterator.channel,
current: NilCSVRecordBatch,
pushBack: abool.New(),
all_done: iterator.all_done,
buffer_size: iterator.buffer_size,
batch_size: iterator.batch_size,
sequence_format: iterator.sequence_format,
finished: iterator.finished,
header: iterator.header,
}
lock := sync.RWMutex{}
i.lock = &lock
return &i
}
func (iterator *ICSVRecord) Header() CSVHeader {
return iterator.header
}
func (iterator *ICSVRecord) SetHeader(header CSVHeader) {
iterator.header = header
}
func (iterator *ICSVRecord) AppendField(field string) {
iterator.header = append(iterator.header, field)
}
func (iterator *ICSVRecord) Next() bool {
if iterator.pushBack.IsSet() {
iterator.pushBack.UnSet()
return true
}
if iterator.finished.IsSet() {
return false
}
next, ok := (<-iterator.channel)
if ok {
iterator.current = next
return true
}
iterator.current = NilCSVRecordBatch
iterator.finished.Set()
return false
}
func (iterator *ICSVRecord) PushBack() {
if !iterator.current.IsNil() {
iterator.pushBack.Set()
}
}
// The 'Get' method returns the instance of BioSequenceBatch
// currently pointed by the iterator. You have to use the
// 'Next' method to move to the next entry before calling
// 'Get' to retreive the following instance.
func (iterator *ICSVRecord) Get() CSVRecordBatch {
return iterator.current
}
func (iterator *ICSVRecord) Push(batch CSVRecordBatch) {
if batch.IsNil() {
log.Panicln("A Nil batch is pushed on the channel")
}
// if batch.Len() == 0 {
// log.Panicln("An empty batch is pushed on the channel")
// }
iterator.channel <- batch
}
func (iterator *ICSVRecord) Close() {
close(iterator.channel)
obiiter.UnregisterPipe()
}
func (iterator *ICSVRecord) WaitAndClose() {
iterator.Wait()
for len(iterator.Channel()) > 0 {
time.Sleep(time.Millisecond)
}
iterator.Close()
}
// Finished returns 'true' value if no more data is available
// from the iterator.
func (iterator *ICSVRecord) Finished() bool {
return iterator.finished.IsSet()
}
// Sorting the batches of sequences.
func (iterator *ICSVRecord) SortBatches(sizes ...int) *ICSVRecord {
newIter := NewICSVRecord()
newIter.Add(1)
go func() {
newIter.WaitAndClose()
}()
next_to_send := 0
//log.Println("wait for batch #", next_to_send)
received := make(map[int]CSVRecordBatch)
go func() {
for iterator.Next() {
batch := iterator.Get()
// log.Println("\nPushd seq #\n", batch.order, next_to_send)
if batch.order == next_to_send {
newIter.channel <- batch
next_to_send++
//log.Println("\nwait for batch #\n", next_to_send)
batch, ok := received[next_to_send]
for ok {
newIter.channel <- batch
delete(received, next_to_send)
next_to_send++
batch, ok = received[next_to_send]
}
} else {
received[batch.order] = batch
}
}
newIter.Done()
}()
return newIter
}
func (iterator *ICSVRecord) Consume() {
for iterator.Next() {
iterator.Get()
}
}

View File

@ -3,59 +3,78 @@ package obicsv
import (
log "github.com/sirupsen/logrus"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obiconvert"
)
func CLIWriteCSV(iterator obiiter.IBioSequence,
terminalAction bool, filenames ...string) (obiiter.IBioSequence, error) {
func CLIWriteSequenceCSV(iterator obiiter.IBioSequence,
terminalAction bool, filenames ...string) *ICSVRecord {
if obiconvert.CLIProgressBar() {
iterator = iterator.Speed("Writing CSV")
}
var newIter obiiter.IBioSequence
opts := make([]obiformats.WithOption, 0, 10)
opts := make([]WithOption, 0, 10)
nworkers := obioptions.CLIParallelWorkers() / 4
if nworkers < 2 {
nworkers = 2
}
opts = append(opts, obiformats.OptionsParallelWorkers(nworkers))
opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize()))
opts = append(opts, obiformats.OptionsCompressed(obiconvert.CLICompressed()))
opts = append(opts, OptionsParallelWorkers(nworkers))
opts = append(opts, OptionsBatchSize(obioptions.CLIBatchSize()))
opts = append(opts, OptionsCompressed(obiconvert.CLICompressed()))
opts = append(opts, obiformats.CSVId(CLIPrintId()),
obiformats.CSVCount(CLIPrintCount()),
obiformats.CSVTaxon(CLIPrintTaxon()),
obiformats.CSVDefinition(CLIPrintDefinition()),
obiformats.CSVKeys(CLIToBeKeptAttributes()),
obiformats.CSVSequence(CLIPrintSequence()),
obiformats.CSVAutoColumn(CLIAutoColumns()),
opts = append(opts, CSVId(CLIPrintId()),
CSVCount(CLIPrintCount()),
CSVTaxon(CLIPrintTaxon()),
CSVDefinition(CLIPrintDefinition()),
CSVKeys(CLIToBeKeptAttributes()),
CSVSequence(CLIPrintSequence()),
CSVAutoColumn(CLIAutoColumns()),
)
var err error
csvIter := NewCSVSequenceIterator(iterator, opts...)
newIter := CLICSVWriter(csvIter, terminalAction, opts...)
if len(filenames) == 0 {
newIter, err = obiformats.WriteCSVToStdout(iterator, opts...)
} else {
newIter, err = obiformats.WriteCSVToFile(iterator, filenames[0], opts...)
return newIter
}
func CLICSVWriter(iterator *ICSVRecord,
terminalAction bool,
options ...WithOption) *ICSVRecord {
var err error
var newIter *ICSVRecord
if obiconvert.CLIOutPutFileName() != "-" {
options = append(options, OptionFileName(obiconvert.CLIOutPutFileName()))
}
if err != nil {
log.Fatalf("Write file error: %v", err)
return obiiter.NilIBioSequence, err
opt := MakeOptions(options)
if opt.FileName() != "-" {
newIter, err = WriteCSVToFile(iterator, opt.FileName(), options...)
if err != nil {
log.Fatalf("Cannot write to file : %+v", err)
}
} else {
newIter, err = WriteCSVToStdout(iterator, options...)
if err != nil {
log.Fatalf("Cannot write to stdout : %+v", err)
}
}
if terminalAction {
newIter.Recycle()
return obiiter.NilIBioSequence, nil
newIter.Consume()
return nil
}
return newIter, nil
return newIter
}

View File

@ -0,0 +1,140 @@
package obicsv
import (
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
log "github.com/sirupsen/logrus"
)
func CSVSequenceHeader(opt Options) CSVHeader {
keys := opt.CSVKeys()
record := make([]string, 0, len(keys)+4)
if opt.CSVId() {
record = append(record, "id")
}
if opt.CSVCount() {
record = append(record, "count")
}
if opt.CSVTaxon() {
record = append(record, "taxid")
}
if opt.CSVDefinition() {
record = append(record, "definition")
}
record = append(record, opt.CSVKeys()...)
if opt.CSVSequence() {
record = append(record, "sequence")
}
if opt.CSVQuality() {
record = append(record, "quality")
}
return record
}
func CSVBatchFromSequences(batch obiiter.BioSequenceBatch, opt Options) CSVRecordBatch {
keys := opt.CSVKeys()
csvslice := make([]CSVRecord, batch.Len())
for i, sequence := range batch.Slice() {
record := make(CSVRecord)
if opt.CSVId() {
record["id"] = sequence.Id()
}
if opt.CSVCount() {
record["count"] = sequence.Count()
}
if opt.CSVTaxon() {
var taxid string
taxon := sequence.Taxon(nil)
if taxon != nil {
taxid = taxon.String()
} else {
taxid = sequence.Taxid()
}
record["taxid"] = taxid
}
if opt.CSVDefinition() {
record["definition"] = sequence.Definition()
}
for _, key := range keys {
value, ok := sequence.GetAttribute(key)
if !ok {
value = opt.CSVNAValue()
}
record[key] = value
}
if opt.CSVSequence() {
record["sequence"] = string(sequence.Sequence())
}
if opt.CSVQuality() {
if sequence.HasQualities() {
l := sequence.Len()
q := sequence.Qualities()
ascii := make([]byte, l)
quality_shift := obioptions.OutputQualityShift()
for j := 0; j < l; j++ {
ascii[j] = uint8(q[j]) + uint8(quality_shift)
}
record["quality"] = string(ascii)
} else {
record["quality"] = opt.CSVNAValue()
}
}
csvslice[i] = record
}
return MakeCSVRecordBatch(batch.Source(), batch.Order(), csvslice)
}
func NewCSVSequenceIterator(iter obiiter.IBioSequence, options ...WithOption) *ICSVRecord {
opt := MakeOptions(options)
newIter := NewICSVRecord()
newIter.SetHeader(CSVSequenceHeader(opt))
log.Warnf("", newIter.Header())
nwriters := opt.ParallelWorkers()
newIter.Add(nwriters)
go func() {
newIter.WaitAndClose()
}()
ff := func(iterator obiiter.IBioSequence) {
for iterator.Next() {
batch := iterator.Get()
newIter.Push(CSVBatchFromSequences(batch, opt))
}
newIter.Done()
}
go ff(iter)
for i := 0; i < nwriters-1; i++ {
go ff(iter.Split())
}
return newIter
}

View File

@ -0,0 +1,156 @@
package obicsv
import (
"bytes"
"encoding/csv"
"io"
"os"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiformats"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiutils"
log "github.com/sirupsen/logrus"
)
func FormatCVSBatch(batch CSVRecordBatch, header CSVHeader, navalue string) *bytes.Buffer {
buff := new(bytes.Buffer)
csv := csv.NewWriter(buff)
log.Warn("Header:", header)
if batch.Order() == 0 {
csv.Write(header)
}
for _, s := range batch.Slice() {
data := make([]string, len(header))
for i, key := range header {
var sval string
val, ok := s[key]
if !ok {
sval = navalue
} else {
var err error
sval, err = obiutils.InterfaceToString(val)
if err != nil {
sval = navalue
}
}
data[i] = sval
}
csv.Write(data)
}
csv.Flush()
return buff
}
func WriteCSV(iterator *ICSVRecord,
file io.WriteCloser,
options ...WithOption) (*ICSVRecord, error) {
opt := MakeOptions(options)
file, _ = obiutils.CompressStream(file, opt.CompressedFile(), opt.CloseFile())
newIter := NewICSVRecord()
nwriters := opt.ParallelWorkers()
chunkchan := obiformats.WriteSeqFileChunk(file, opt.CloseFile())
newIter.Add(nwriters)
go func() {
newIter.WaitAndClose()
close(chunkchan)
log.Debugf("Writing CSV file done")
}()
ff := func(iterator *ICSVRecord) {
for iterator.Next() {
batch := iterator.Get()
log.Debugf("Formating CSV chunk %d", batch.Order())
ss := obiformats.SeqFileChunk{
Source: batch.Source(),
Raw: FormatCVSBatch(
batch,
iterator.Header(),
opt.CSVNAValue(),
),
Order: batch.Order(),
}
chunkchan <- ss
log.Debugf("CSV chunk %d formated", batch.Order())
newIter.Push(batch)
}
newIter.Done()
}
log.Debugln("Start of the CSV file writing")
go ff(iterator)
for i := 0; i < nwriters-1; i++ {
go ff(iterator.Split())
}
return newIter, nil
}
// WriteFastaToStdout writes the given bio sequence iterator to standard output in FASTA format.
//
// The function takes an iterator of bio sequences as the first parameter and optional
// configuration options as variadic arguments. It appends the option to not close the file
// to the options slice and then calls the WriteFasta function passing the iterator,
// os.Stdout as the output file, and the options slice.
//
// The function returns the same bio sequence iterator and an error if any occurred.
func WriteCSVToStdout(iterator *ICSVRecord,
options ...WithOption) (*ICSVRecord, error) {
// options = append(options, OptionDontCloseFile())
options = append(options, OptionCloseFile())
return WriteCSV(iterator, os.Stdout, options...)
}
// WriteFastaToFile writes the given iterator of biosequences to a file with the specified filename,
// using the provided options. It returns the updated iterator and any error that occurred.
//
// Parameters:
// - iterator: The biosequence iterator to write to the file.
// - filename: The name of the file to write to.
// - options: Zero or more optional parameters to customize the writing process.
//
// Returns:
// - obiiter.IBioSequence: The updated biosequence iterator.
// - error: Any error that occurred during the writing process.
func WriteCSVToFile(iterator *ICSVRecord,
filename string,
options ...WithOption) (*ICSVRecord, error) {
opt := MakeOptions(options)
flags := os.O_WRONLY | os.O_CREATE
if opt.AppendFile() {
flags |= os.O_APPEND
} else {
flags |= os.O_TRUNC
}
file, err := os.OpenFile(filename, flags, 0660)
if err != nil {
log.Fatalf("open file error: %v", err)
return nil, err
}
options = append(options, OptionCloseFile())
iterator, err = WriteCSV(iterator, file, options...)
return iterator, err
}

View File

@ -1,77 +1,300 @@
package obifind
import (
"fmt"
"slices"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitax"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obicsv"
)
func IFilterRankRestriction() func(*obitax.ITaxon) *obitax.ITaxon {
f := func(s *obitax.ITaxon) *obitax.ITaxon {
return s
type __options__ struct {
batch_size int // Number of items to process in a batch
with_pattern bool
with_parent bool
with_path bool
with_rank bool
with_scientific_name bool
raw_taxid bool
with_metadata []string
source string // Source of the data
}
// Options wraps the __options__ struct to provide a pointer to the options.
type Options struct {
pointer *__options__ // Pointer to the underlying options
}
// WithOption is a function type that takes an Options parameter and modifies it.
type WithOption func(Options)
// MakeOptions creates an Options instance with default settings and applies any provided setters.
// It returns the configured Options.
//
// Parameters:
// - setters: A slice of WithOption functions to customize the options.
//
// Returns:
// - An Options instance with the specified settings.
func MakeOptions(setters []WithOption) Options {
o := __options__{
batch_size: obioptions.CLIBatchSize(), // Number of items to process in a batch
with_pattern: true,
with_parent: false,
with_path: false,
with_rank: true,
with_scientific_name: false,
raw_taxid: false,
source: "unknown",
}
opt := Options{&o}
for _, set := range setters {
set(opt)
}
if __restrict_rank__ != "" {
f = func(s *obitax.ITaxon) *obitax.ITaxon {
return s.IFilterOnTaxRank(__restrict_rank__)
return opt
}
// BatchSize returns the size of the batch to be processed.
// It retrieves the batch size from the underlying options.
func (o *Options) BatchSize() int {
return o.pointer.batch_size
}
// WithPattern returns whether the pattern option is enabled.
// It retrieves the setting from the underlying options.
func (o *Options) WithPattern() bool {
return o.pointer.with_pattern
}
// WithParent returns whether the parent option is enabled.
// It retrieves the setting from the underlying options.
func (o *Options) WithParent() bool {
return o.pointer.with_parent
}
// WithPath returns whether the path option is enabled.
// It retrieves the setting from the underlying options.
func (o *Options) WithPath() bool {
return o.pointer.with_path
}
// WithRank returns whether the rank option is enabled.
// It retrieves the setting from the underlying options.
func (o *Options) WithRank() bool {
return o.pointer.with_rank
}
// WithScientificName returns whether the scientific name option is enabled.
// It retrieves the setting from the underlying options.
func (o *Options) WithScientificName() bool {
return o.pointer.with_scientific_name
}
// RawTaxid returns whether the raw taxid option is enabled.
// It retrieves the setting from the underlying options.
func (o *Options) RawTaxid() bool {
return o.pointer.raw_taxid
}
// Source returns the source of the data.
// It retrieves the source from the underlying options.
func (o *Options) Source() string {
return o.pointer.source
}
// WithMetadata returns a slice of strings containing the metadata
// associated with the Options instance. It retrieves the metadata
// from the pointer's with_metadata field.
func (o *Options) WithMetadata() []string {
if o.WithPattern() {
idx := slices.Index(o.pointer.with_metadata, "query")
if idx >= 0 {
o.pointer.with_metadata = slices.Delete(o.pointer.with_metadata, idx, idx+1)
}
}
return o.pointer.with_metadata
}
// OptionsBatchSize returns a WithOption function that sets the batch_size option.
// Parameters:
// - size: An integer specifying the size of the batch to be processed.
func OptionsBatchSize(size int) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.batch_size = size
})
return f
}
func ITaxonNameMatcher() (func(string) *obitax.ITaxon, error) {
taxonomy := obitax.DefaultTaxonomy()
func OptionsWithPattern(value bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.with_pattern = value
})
fun := func(name string) *obitax.ITaxon {
return taxonomy.IFilterOnName(name, __fixed_pattern__)
}
return fun, nil
return f
}
func ITaxonRestrictions() (func(*obitax.ITaxon) *obitax.ITaxon, error) {
func OptionsWithParent(value bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.with_parent = value
})
clades, err := CLITaxonomicalRestrictions()
if err != nil {
return nil, err
}
rankfilter := IFilterRankRestriction()
fun := func(iterator *obitax.ITaxon) *obitax.ITaxon {
return rankfilter(iterator).IFilterBelongingSubclades(clades)
}
return fun, nil
return f
}
func TaxonAsString(taxon *obitax.Taxon, pattern string) string {
// var text string
// if __with_path__ {
// var bf bytes.Buffer
// path := taxon.Path()
func OptionsWithPath(value bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.with_path = value
})
// bf.WriteString(path.Get(path.Len() - 1).ScientificName())
// for i := path.Len() - 2; i >= 0; i-- {
// fmt.Fprintf(&bf, ":%s", path.Get(i).ScientificName())
// }
// text = bf.String()
// }
return fmt.Sprintf("%-20s | %10s | %10s | %-20s",
pattern,
taxon.String(),
taxon.Parent().String(),
taxon.Rank())
return f
}
func TaxonWriter(itaxa *obitax.ITaxon, pattern string) {
func OptionsWithRank(value bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.with_rank = value
})
for itaxa.Next() {
fmt.Println(TaxonAsString(itaxa.Get(), pattern))
return f
}
func OptionsWithScientificName(value bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.with_scientific_name = value
})
return f
}
func OptionsRawTaxid(value bool) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.raw_taxid = value
})
return f
}
func OptionsSource(value string) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.source = value
})
return f
}
func OptionsWithMetadata(values ...string) WithOption {
f := WithOption(func(opt Options) {
opt.pointer.with_metadata = values
})
return f
}
func NewCSVTaxaIterator(iterator *obitax.ITaxon, options ...WithOption) *obicsv.ICSVRecord {
opt := MakeOptions(options)
metakeys := make([]string, 0)
newIter := obicsv.NewICSVRecord()
newIter.Add(1)
batch_size := opt.BatchSize()
if opt.WithPattern() {
newIter.AppendField("query")
opt.pointer.with_metadata = append(opt.pointer.with_metadata, "query")
}
newIter.AppendField("taxid")
rawtaxid := opt.RawTaxid()
if opt.WithParent() {
newIter.AppendField("parent")
}
if opt.WithRank() {
newIter.AppendField("taxonomic_rank")
}
if opt.WithScientificName() {
newIter.AppendField("scientific_name")
}
if opt.WithMetadata() != nil {
metakeys = opt.WithMetadata()
for _, metadata := range metakeys {
newIter.AppendField(metadata)
}
}
if opt.WithPath() {
newIter.AppendField("path")
}
go func() {
newIter.WaitAndClose()
}()
go func() {
o := 0
data := make([]obicsv.CSVRecord, 0, batch_size)
for iterator.Next() {
taxon := iterator.Get()
record := make(obicsv.CSVRecord)
if opt.WithPattern() {
record["query"] = taxon.MetadataAsString("query")
}
if rawtaxid {
record["taxid"] = *taxon.Node.Id()
} else {
record["taxid"] = taxon.String()
}
if opt.WithParent() {
if rawtaxid {
record["parent"] = *taxon.Node.ParentId()
} else {
record["parent"] = taxon.Parent().String()
}
}
if opt.WithRank() {
record["taxonomic_rank"] = taxon.Rank()
}
if opt.WithScientificName() {
record["scientific_name"] = taxon.ScientificName()
}
if opt.WithPath() {
record["path"] = taxon.Path().String()
}
for _, key := range metakeys {
record[key] = taxon.MetadataAsString(key)
}
data = append(data, record)
if len(data) >= batch_size {
newIter.Push(obicsv.MakeCSVRecordBatch(opt.Source(), o, data))
data = make([]obicsv.CSVRecord, 0, batch_size)
o++
}
}
if len(data) > 0 {
newIter.Push(obicsv.MakeCSVRecordBatch(opt.Source(), o, data))
}
newIter.Done()
}()
return newIter
}

View File

@ -0,0 +1,64 @@
package obifind
import (
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obioptions"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitax"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitools/obicsv"
log "github.com/sirupsen/logrus"
)
func CLITaxonRestrictions(iterator *obitax.ITaxon) *obitax.ITaxon {
if iterator == nil {
return nil
}
clades, err := CLITaxonomicalRestrictions()
if err != nil {
log.Errorf("Error on taxonomy restriction: %v", err)
return nil
}
iterator = CLIFilterRankRestriction(iterator.Split().IFilterBelongingSubclades(clades))
return iterator
}
func CLIFilterRankRestriction(iterator *obitax.ITaxon) *obitax.ITaxon {
if iterator == nil {
return nil
}
rr := CLIRankRestriction()
if rr != "" {
iterator = iterator.IFilterOnTaxRank(rr)
}
return iterator
}
func CLICSVTaxaIterator(iterator *obitax.ITaxon) *obicsv.ICSVRecord {
if iterator == nil {
return nil
}
options := make([]WithOption, 0)
options = append(options,
OptionsWithPattern(CLIWithQuery()),
OptionsWithParent(CLIWithParent()),
OptionsWithRank(CLIWithRank()),
OptionsWithScientificName(CLIWithScientificName()),
OptionsWithPath(CLIWithPath()),
OptionsRawTaxid(CLIRawTaxid()),
OptionsSource(obioptions.CLISelectedNCBITaxDump()),
)
return NewCSVTaxaIterator(iterator, options...)
}
func CLICSVTaxaWriter(iterator *obitax.ITaxon, terminalAction bool) *obicsv.ICSVRecord {
return obicsv.CLICSVWriter(CLICSVTaxaIterator(iterator), terminalAction)
}

View File

@ -13,6 +13,11 @@ var __taxonomical_restriction__ = make([]string, 0)
var __fixed_pattern__ = false
var __with_path__ = false
var __with_query__ = false
var __without_rank__ = false
var __without_parent__ = false
var __with_scientific_name__ = false
var __raw_taxid__ = false
var __taxid_path__ = "NA"
var __taxid_sons__ = "NA"
var __restrict_rank__ = ""
@ -27,6 +32,37 @@ func FilterTaxonomyOptionSet(options *getoptions.GetOpt) {
options.Description("Restrict output to some subclades."))
}
func OptionSet(options *getoptions.GetOpt) {
obioptions.LoadTaxonomyOptionSet(options, true, true)
FilterTaxonomyOptionSet(options)
options.BoolVar(&__fixed_pattern__, "fixed", false,
options.Alias("F"),
options.Description("Match taxon names using a fixed pattern, not a regular expression"))
options.StringVar(&__taxid_path__, "parents", "NA",
options.Alias("p"),
options.Description("Displays every parental tree's information for the provided taxid."))
options.StringVar(&__restrict_rank__, "rank", "",
options.Description("Restrict to the given taxonomic rank."))
options.BoolVar(&__without_parent__, "without-parent", __without_parent__,
options.Description("Adds a column containing the parent's taxonid for each displayed taxon."))
options.StringVar(&__taxid_sons__, "sons", "NA",
options.Alias("s"),
options.Description("Displays every sons' tree's information for the provided taxid."))
options.BoolVar(&__with_path__, "with-path", false,
options.Description("Adds a column containing the full path for each displayed taxon."))
options.BoolVar(&__without_rank__, "without-rank", __without_rank__,
options.Alias("R"),
options.Description("Adds a column containing the taxonomic rank for each displayed taxon."))
options.BoolVar(&__with_query__, "with-query", false,
options.Alias("P"),
options.Description("Adds a column containing query used to filter taxon name for each displayed taxon."))
options.BoolVar(&__with_scientific_name__, "with-scientific-name", false,
options.Alias("S"),
options.Description("Adds a column containing the scientific name for each displayed taxon."))
options.BoolVar(&__raw_taxid__, "raw-taxid", false,
options.Description("Displays the raw taxid for each displayed taxon."))
}
func CLITaxonomicalRestrictions() (*obitax.TaxonSet, error) {
taxonomy := obitax.DefaultTaxonomy()
@ -52,22 +88,6 @@ func CLITaxonomicalRestrictions() (*obitax.TaxonSet, error) {
return ts, nil
}
func OptionSet(options *getoptions.GetOpt) {
obioptions.LoadTaxonomyOptionSet(options, true, true)
FilterTaxonomyOptionSet(options)
options.BoolVar(&__fixed_pattern__, "fixed", false,
options.Alias("F"),
options.Description("Match taxon names using a fixed pattern, not a regular expression"))
options.BoolVar(&__with_path__, "with-path", false,
options.Alias("P"),
options.Description("Adds a column containing the full path for each displayed taxon."))
options.StringVar(&__taxid_path__, "parents", "NA",
options.Alias("p"),
options.Description("Displays every parental tree's information for the provided taxid."))
options.StringVar(&__restrict_rank__, "rank", "",
options.Description("Restrict to the given taxonomic rank."))
}
func CLIRequestsPathForTaxid() string {
return __taxid_path__
}
@ -75,3 +95,35 @@ func CLIRequestsPathForTaxid() string {
func CLIRequestsSonsForTaxid() string {
return __taxid_sons__
}
func CLIWithParent() bool {
return !__without_parent__
}
func CLIWithPath() bool {
return __with_path__
}
func CLIWithRank() bool {
return !__without_rank__
}
func CLIWithScientificName() bool {
return __with_scientific_name__
}
func CLIRawTaxid() bool {
return __raw_taxid__
}
func CLIRankRestriction() string {
return __restrict_rank__
}
func CLIFixedPattern() bool {
return __fixed_pattern__
}
func CLIWithQuery() bool {
return __with_query__
}