From a33e471b39bf24da471badfc3e6bb14becef366c Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Mon, 27 Mar 2023 19:51:10 +0700 Subject: [PATCH] First attempt for obiconsensus... The graph traversing algorithm is too simple Former-commit-id: 0456e6c7fd55d6d0fcf9856c40386b976b912cba --- cmd/obitools/obiconsensus/main.go | 33 ++ pkg/obiformats/batch_of_files_reader.go | 4 +- pkg/obiformats/ecopcr_read.go | 12 +- pkg/obiformats/embl_read.go | 15 +- pkg/obiformats/fastseq_read.go | 45 ++- pkg/obiformats/genbank_read.go | 15 +- pkg/obiformats/options.go | 34 +- pkg/obiformats/universal_read.go | 4 + pkg/obiiter/batchiterator.go | 32 +- pkg/obikmer/debruijn.go | 376 +++++++++++++++++++++ pkg/obiseq/biosequence.go | 17 + pkg/obisuffix/suffix_array.go | 137 ++++++++ pkg/obitools/obiconsensus/obiconsensus.go | 123 +++++++ pkg/obitools/obiconsensus/options.go | 13 + pkg/obitools/obiconvert/options.go | 9 + pkg/obitools/obiconvert/sequence_reader.go | 6 +- pkg/obiutils/path.go | 16 + 17 files changed, 868 insertions(+), 23 deletions(-) create mode 100644 cmd/obitools/obiconsensus/main.go create mode 100644 pkg/obikmer/debruijn.go create mode 100644 pkg/obisuffix/suffix_array.go create mode 100644 pkg/obitools/obiconsensus/obiconsensus.go create mode 100644 pkg/obitools/obiconsensus/options.go create mode 100644 pkg/obiutils/path.go diff --git a/cmd/obitools/obiconsensus/main.go b/cmd/obitools/obiconsensus/main.go new file mode 100644 index 0000000..2f5d61a --- /dev/null +++ b/cmd/obitools/obiconsensus/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "os" + + log "github.com/sirupsen/logrus" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconsensus" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obioptions" +) + +func main() { + optionParser := obioptions.GenerateOptionParser(obiconvert.OptionSet) + + _, args := optionParser(os.Args) + obiconvert.SetFullFileBatch() + + fs, err := obiconvert.CLIReadBioSequences(args...) + + if err != nil { + log.Errorf("Cannot open file (%v)", err) + os.Exit(1) + } + + consensus := obiconsensus.Consensus(fs, 0.99) + obiconvert.CLIWriteBioSequences(consensus, true) + + obiiter.WaitForLastPipe() + +} diff --git a/pkg/obiformats/batch_of_files_reader.go b/pkg/obiformats/batch_of_files_reader.go index 4395daa..df714da 100644 --- a/pkg/obiformats/batch_of_files_reader.go +++ b/pkg/obiformats/batch_of_files_reader.go @@ -16,7 +16,7 @@ func ReadSequencesBatchFromFiles(filenames []string, reader = ReadSequencesFromFile } - batchiter := obiiter.MakeIBioSequence(0) + batchiter := obiiter.MakeIBioSequence() nextCounter := obiutils.AtomicCounter() batchiter.Add(concurrent_readers) @@ -48,6 +48,8 @@ func ReadSequencesBatchFromFiles(filenames []string, log.Printf("Start reading of file : %s", filename) + + for iter.Next() { batch := iter.Get() batchiter.Push(batch.Reorder(nextCounter())) diff --git a/pkg/obiformats/ecopcr_read.go b/pkg/obiformats/ecopcr_read.go index c3da13e..cf5be7d 100644 --- a/pkg/obiformats/ecopcr_read.go +++ b/pkg/obiformats/ecopcr_read.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "path" "strconv" "strings" @@ -14,6 +15,7 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" ) type __ecopcr_file__ struct { @@ -177,6 +179,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { go func() { seq, err := __read_ecopcr_bioseq__(&ecopcr) + seq.SetSource(opt.Source()) slice := make(obiseq.BioSequenceSlice, 0, opt.BatchSize()) i := 0 ii := 0 @@ -191,6 +194,7 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { } seq, err = __read_ecopcr_bioseq__(&ecopcr) + seq.SetSource(opt.Source()) } if len(slice) > 0 { @@ -205,14 +209,20 @@ func ReadEcoPCR(reader io.Reader, options ...WithOption) obiiter.IBioSequence { }() + if opt.pointer.full_file_batch { + newIter = newIter.FullFileIterator() + } + return newIter } -func ReadEcoPCRBatchFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { +func ReadEcoPCRFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { var reader io.Reader var greader io.Reader var err error + options = append(options, OptionsSource(obiutils.RemoveAllExt((path.Base(filename))))) + reader, err = os.Open(filename) if err != nil { log.Printf("open file error: %+v", err) diff --git a/pkg/obiformats/embl_read.go b/pkg/obiformats/embl_read.go index 13fc176..a0f81ec 100644 --- a/pkg/obiformats/embl_read.go +++ b/pkg/obiformats/embl_read.go @@ -5,6 +5,7 @@ import ( "bytes" "io" "os" + "path" "strconv" "strings" @@ -14,6 +15,7 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" ) var _FileChunkSize = 1 << 26 @@ -95,7 +97,7 @@ func _EndOfLastEntry(buff []byte) int { return -1 } -func _ParseEmblFile(input <-chan _FileChunk, out obiiter.IBioSequence) { +func _ParseEmblFile(source string, input <-chan _FileChunk, out obiiter.IBioSequence) { for chunks := range input { scanner := bufio.NewScanner(chunks.raw) @@ -141,7 +143,8 @@ func _ParseEmblFile(input <-chan _FileChunk, out obiiter.IBioSequence) { sequence := obiseq.NewBioSequence(id, bytes.ToLower(seqBytes.Bytes()), defBytes.String()) - + sequence.SetSource(source) + sequence.SetFeatures(featBytes.Bytes()) annot := sequence.Annotations() @@ -257,11 +260,15 @@ func ReadEMBL(reader io.Reader, options ...WithOption) obiiter.IBioSequence { // for j := 0; j < opt.ParallelWorkers(); j++ { for j := 0; j < nworkers; j++ { - go _ParseEmblFile(entry_channel, newIter) + go _ParseEmblFile(opt.Source(),entry_channel, newIter) } go _ReadFlatFileChunk(reader, entry_channel) + if opt.pointer.full_file_batch { + newIter = newIter.FullFileIterator() + } + return newIter } @@ -270,6 +277,8 @@ func ReadEMBLFromFile(filename string, options ...WithOption) (obiiter.IBioSeque var greader io.Reader var err error + options = append(options, OptionsSource(obiutils.RemoveAllExt((path.Base(filename))))) + reader, err = os.Open(filename) if err != nil { log.Printf("open file error: %+v", err) diff --git a/pkg/obiformats/fastseq_read.go b/pkg/obiformats/fastseq_read.go index 79873b8..e73db30 100644 --- a/pkg/obiformats/fastseq_read.go +++ b/pkg/obiformats/fastseq_read.go @@ -10,15 +10,18 @@ import ( "bytes" "fmt" "os" + "path" "unsafe" log "github.com/sirupsen/logrus" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" ) -func _FastseqReader(seqfile C.fast_kseq_p, +func _FastseqReader(source string, + seqfile C.fast_kseq_p, iterator obiiter.IBioSequence, batch_size int) { var comment string @@ -40,7 +43,7 @@ func _FastseqReader(seqfile C.fast_kseq_p, } rep := obiseq.NewBioSequence(name, bytes.ToLower(sequence), comment) - + rep.SetSource(source) if s.qual.l > C.ulong(0) { cquality := unsafe.Slice(s.qual.s, C.int(s.qual.l)) l := int(s.qual.l) @@ -81,6 +84,9 @@ func _FastseqReader(seqfile C.fast_kseq_p, } func ReadFastSeqFromFile(filename string, options ...WithOption) (obiiter.IBioSequence, error) { + + options = append(options, OptionsSource(obiutils.RemoveAllExt((path.Base(filename))))) + opt := MakeOptions(options) name := C.CString(filename) @@ -108,14 +114,20 @@ func ReadFastSeqFromFile(filename string, options ...WithOption) (obiiter.IBioSe newIter := obiiter.MakeIBioSequence() newIter.Add(1) - go func() { - newIter.WaitAndClose() - log.Debugln("End of the fastq file reading") - }() + go func(iter obiiter.IBioSequence) { + iter.WaitAndClose() + log.Debugln("End of the fastx file reading") + }(newIter) - log.Debugln("Start of the fastq file reading") + log.Debugln("Start of the fastx file reading") + + go _FastseqReader(opt.Source(), pointer, newIter, opt.BatchSize()) + + log.Debugln("Full file batch mode : ", opt.FullFileBatch()) + if opt.FullFileBatch() { + newIter = newIter.FullFileIterator() + } - go _FastseqReader(pointer, newIter, opt.BatchSize()) parser := opt.ParseFastSeqHeader() if parser != nil { @@ -126,18 +138,27 @@ func ReadFastSeqFromFile(filename string, options ...WithOption) (obiiter.IBioSe } func ReadFastSeqFromStdin(options ...WithOption) obiiter.IBioSequence { + + options = append(options, OptionsSource("stdin")) + opt := MakeOptions(options) newIter := obiiter.MakeIBioSequence() newIter.Add(1) - go func() { - newIter.WaitAndClose() - }() + go func(iter obiiter.IBioSequence) { + iter.WaitAndClose() + }(newIter) - go _FastseqReader(C.open_fast_sek_stdin(C.int32_t(opt.QualityShift())), + go _FastseqReader(opt.Source(), + C.open_fast_sek_stdin(C.int32_t(opt.QualityShift())), newIter, opt.BatchSize()) + log.Debugln("Full file batch mode : ", opt.FullFileBatch()) + if opt.FullFileBatch() { + newIter = newIter.FullFileIterator() + } + parser := opt.ParseFastSeqHeader() if parser != nil { diff --git a/pkg/obiformats/genbank_read.go b/pkg/obiformats/genbank_read.go index 5b73282..1ca1f59 100644 --- a/pkg/obiformats/genbank_read.go +++ b/pkg/obiformats/genbank_read.go @@ -5,6 +5,7 @@ import ( "bytes" "io" "os" + "path" "strconv" "strings" @@ -14,6 +15,7 @@ import ( "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" ) type gbstate int @@ -26,7 +28,8 @@ const ( inSequence gbstate = 4 ) -func _ParseGenbankFile(input <-chan _FileChunk, out obiiter.IBioSequence) { +func _ParseGenbankFile(source string, + input <-chan _FileChunk, out obiiter.IBioSequence) { state := inHeader @@ -68,6 +71,7 @@ func _ParseGenbankFile(input <-chan _FileChunk, out obiiter.IBioSequence) { sequence := obiseq.NewBioSequence(id, bytes.ToLower(seqBytes.Bytes()), defBytes.String()) + sequence.SetSource(source) state = inHeader sequence.SetFeatures(featBytes.Bytes()) @@ -129,11 +133,15 @@ func ReadGenbank(reader io.Reader, options ...WithOption) obiiter.IBioSequence { // for j := 0; j < opt.ParallelWorkers(); j++ { for j := 0; j < nworkers; j++ { - go _ParseGenbankFile(entry_channel, newIter) + go _ParseGenbankFile(opt.Source(),entry_channel, newIter) } go _ReadFlatFileChunk(reader, entry_channel) + if opt.pointer.full_file_batch { + newIter = newIter.FullFileIterator() + } + return newIter } @@ -142,6 +150,9 @@ func ReadGenbankFromFile(filename string, options ...WithOption) (obiiter.IBioSe var greader io.Reader var err error + options = append(options, OptionsSource(obiutils.RemoveAllExt((path.Base(filename))))) + + reader, err = os.Open(filename) if err != nil { log.Printf("open file error: %+v", err) diff --git a/pkg/obiformats/options.go b/pkg/obiformats/options.go index 87780ac..43c3f92 100644 --- a/pkg/obiformats/options.go +++ b/pkg/obiformats/options.go @@ -10,6 +10,7 @@ type __options__ struct { with_progress_bar bool buffer_size int batch_size int + full_file_batch bool quality_shift int parallel_workers int closefile bool @@ -25,6 +26,7 @@ type __options__ struct { csv_separator string csv_navalue string paired_filename string + source string } type Options struct { @@ -42,6 +44,7 @@ func MakeOptions(setters []WithOption) Options { quality_shift: 33, parallel_workers: 4, batch_size: 5000, + full_file_batch: false, closefile: false, appendfile: false, compressed: false, @@ -52,9 +55,10 @@ func MakeOptions(setters []WithOption) Options { csv_sequence: true, csv_quality: false, csv_separator: ",", - csv_navalue: "NA", + csv_navalue: "NA", csv_keys: make([]string, 0), paired_filename: "", + source: "", } opt := Options{&o} @@ -74,6 +78,10 @@ func (opt Options) BatchSize() int { return opt.pointer.batch_size } +func (opt Options) FullFileBatch() bool { + return opt.pointer.full_file_batch +} + func (opt Options) ParallelWorkers() int { return opt.pointer.parallel_workers } @@ -146,6 +154,14 @@ func (opt Options) PairedFileName() string { return opt.pointer.paired_filename } +func (opt Options) HasSource() bool { + return opt.pointer.source != "" +} + +func (opt Options) Source() string { + return opt.pointer.source +} + func OptionCloseFile() WithOption { f := WithOption(func(opt Options) { opt.pointer.closefile = true @@ -253,6 +269,22 @@ func OptionsBatchSize(size int) WithOption { return f } +func OptionsFullFileBatch(full bool) WithOption { + f := WithOption(func(opt Options) { + opt.pointer.full_file_batch = full + }) + + return f +} + +func OptionsSource(source string) WithOption { + f := WithOption(func(opt Options) { + opt.pointer.source = source + }) + + return f +} + func OptionsWithProgressBar() WithOption { f := WithOption(func(opt Options) { opt.pointer.with_progress_bar = true diff --git a/pkg/obiformats/universal_read.go b/pkg/obiformats/universal_read.go index a9625e4..5143303 100644 --- a/pkg/obiformats/universal_read.go +++ b/pkg/obiformats/universal_read.go @@ -5,11 +5,13 @@ import ( "compress/gzip" "io" "os" + "path" "strings" log "github.com/sirupsen/logrus" "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" ) func GuessSeqFileType(firstline string) string { @@ -49,6 +51,8 @@ func ReadSequencesFromFile(filename string, var greader io.Reader var err error + options = append(options, OptionsSource(obiutils.RemoveAllExt((path.Base(filename))))) + file, err = os.Open(filename) if err != nil { diff --git a/pkg/obiiter/batchiterator.go b/pkg/obiiter/batchiterator.go index 5219f8f..6e86d82 100644 --- a/pkg/obiiter/batchiterator.go +++ b/pkg/obiiter/batchiterator.go @@ -59,7 +59,7 @@ type IBioSequence struct { // IBioSequenceBatch type. var NilIBioSequence = IBioSequence{pointer: nil} -func MakeIBioSequence(sizes ...int) IBioSequence { +func MakeIBioSequence() IBioSequence { i := _IBioSequence{ channel: make(chan BioSequenceBatch), @@ -409,7 +409,7 @@ func (iterator IBioSequence) Pool(iterators ...IBioSequence) IBioSequence { // IBioSequenceBatch with every batches having the same size // indicated in parameter. Rebatching implies to sort the // source IBioSequenceBatch. -func (iterator IBioSequence) Rebatch(size int, sizes ...int) IBioSequence { +func (iterator IBioSequence) Rebatch(size int) IBioSequence { newIter := MakeIBioSequence() @@ -686,6 +686,7 @@ func (iterator IBioSequence) Load() obiseq.BioSequenceSlice { chunck := obiseq.MakeBioSequenceSlice() for iterator.Next() { b := iterator.Get() + log.Debugf("append %d sequences",b.Len()) chunck = append(chunck, b.Slice()...) b.Recycle() } @@ -693,6 +694,33 @@ func (iterator IBioSequence) Load() obiseq.BioSequenceSlice { return chunck } +func (iterator IBioSequence) FullFileIterator() IBioSequence { + + newIter := MakeIBioSequence() + log.Debug("Stream is read in full file mode") + + newIter.Add(1) + + go func() { + newIter.WaitAndClose() + }() + + go func() { + slice := iterator.Load() + log.Printf("A batch of %d sequence is read",len(slice)) + if len(slice) > 0 { + newIter.Push(MakeBioSequenceBatch(0, slice)) + } + newIter.Done() + }() + + if iterator.IsPaired() { + newIter.MarkAsPaired() + } + + return newIter +} + // It takes a slice of BioSequence objects, and returns an iterator that will return batches of // BioSequence objects diff --git a/pkg/obikmer/debruijn.go b/pkg/obikmer/debruijn.go new file mode 100644 index 0000000..ac41dbd --- /dev/null +++ b/pkg/obikmer/debruijn.go @@ -0,0 +1,376 @@ +package obikmer + +import ( + "bytes" + "fmt" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" +) + +type KmerIdx32 uint32 +type KmerIdx64 uint64 +type KmerIdx128 struct { + Lo uint64 + Hi uint64 +} + +var iupac = map[byte][]uint64{ + 'a': {0}, + 'c': {1}, + 'g': {2}, + 't': {3}, + 'u': {3}, + 'r': {0, 2}, + 'y': {1, 3}, + 's': {1, 2}, + 'w': {0, 3}, + 'k': {2, 3}, + 'm': {0, 1}, + 'b': {1, 2, 3}, + 'd': {0, 2, 3}, + 'h': {0, 1, 3}, + 'v': {0, 1, 2}, + 'n': {0, 1, 2, 3}, +} + +var decode = map[uint64]byte{ + 0: 'a', + 1: 'c', + 2: 'g', + 3: 't', +} + +type KmerIdx_t interface { + KmerIdx32 | KmerIdx64 | KmerIdx128 +} + +type DeBruijnGraph struct { + kmersize int + kmermask uint64 + prevc uint64 + prevg uint64 + prevt uint64 + graph map[uint64]uint +} + +func MakeDeBruijnGraph(kmersize int) *DeBruijnGraph { + g := DeBruijnGraph{ + kmersize: kmersize, + kmermask: ^(^uint64(0) << (uint64(kmersize+1) * 2)), + prevc: uint64(1) << (uint64(kmersize) * 2), + prevg: uint64(2) << (uint64(kmersize) * 2), + prevt: uint64(3) << (uint64(kmersize) * 2), + graph: make(map[uint64]uint), + } + + return &g +} + +func (g *DeBruijnGraph) KmerSize() int { + return g.kmersize +} + +func (g *DeBruijnGraph) Len() int { + return len(g.graph) +} + +func (g *DeBruijnGraph) MaxLink() int { + max := uint(0) + for _, count := range g.graph { + if count > max { + max = count + } + } + + return int(max) +} + +func (g *DeBruijnGraph) LinkSpectrum() []int { + max := g.MaxLink() + spectrum := make([]int, max+1) + for _, count := range g.graph { + spectrum[int(count)]++ + } + + return spectrum +} + +func (g *DeBruijnGraph) FilterMin(min int) { + umin := uint(min) + for idx, count := range g.graph { + if count < umin { + delete(g.graph, idx) + } + } +} + +func (g *DeBruijnGraph) Previouses(index uint64) []uint64 { + rep := make([]uint64, 0, 4) + index = index >> 2 + + if _, ok := g.graph[index]; ok { + rep = append(rep, index) + } + + key := index | g.prevc + if _, ok := g.graph[key]; ok { + rep = append(rep, key) + } + + key = index | g.prevg + if _, ok := g.graph[key]; ok { + rep = append(rep, key) + } + + key = index | g.prevt + if _, ok := g.graph[key]; ok { + rep = append(rep, key) + } + + return rep +} + +func (g *DeBruijnGraph) Nexts(index uint64) []uint64 { + rep := make([]uint64, 0, 4) + index = index << 2 & g.kmermask + + if _, ok := g.graph[index]; ok { + rep = append(rep, index) + } + + key := index | 1 + if _, ok := g.graph[key]; ok { + rep = append(rep, key) + } + + key = index | 2 + if _, ok := g.graph[key]; ok { + rep = append(rep, key) + } + + key = index | 3 + if _, ok := g.graph[key]; ok { + rep = append(rep, key) + } + + return rep +} + +func (g *DeBruijnGraph) MaxNext(index uint64) (uint64, bool) { + ns := g.Nexts(index) + + if len(ns) == 0 { + return uint64(0), false + } + + max := uint(0) + rep := uint64(0) + for _, idx := range ns { + w, _ := g.graph[idx] + if w > max { + rep = idx + } + } + + return rep, true +} + +func (g *DeBruijnGraph) MaxPath() []uint64 { + path := make([]uint64, 0, 1000) + ok := false + idx := uint64(0) + + idx, ok = g.MaxHead() + + for ok { + path = append(path, idx) + idx, ok = g.MaxNext(idx) + } + + return path +} + +func (g *DeBruijnGraph) LongestPath() []uint64 { + var path []uint64 + wmax := uint(0) + ok := true + + starts:= g.Heads() + + for _,idx := range starts { + lp := make([]uint64, 0, 1000) + w := uint(0) + for ok { + nw:= g.graph[idx] + w+=nw + lp = append(lp, idx) + idx, ok = g.MaxNext(idx) + } + + if w > wmax { + path=lp + wmax=w + } + } + + + return path +} + +func (g *DeBruijnGraph) LongestConsensus(id string) (*obiseq.BioSequence,error) { + path := g.LongestPath() + s := g.DecodePath(path) + + if len(s) > 0 { + seq := obiseq.MakeBioSequence( + id, + []byte(s), + "", + ) + + return &seq,nil + } + + return nil,fmt.Errorf("cannot identify optimum path") +} + + +func (g *DeBruijnGraph) Heads() []uint64 { + rep := make([]uint64, 0, 10) + + for k := range g.graph { + if len(g.Previouses(k)) == 0 { + rep = append(rep, k) + } + } + + return rep +} + +func (g *DeBruijnGraph) MaxHead() (uint64, bool) { + rep := uint64(0) + max := uint(0) + found := false + for k, w := range g.graph { + if len(g.Previouses(k)) == 0 && w > max { + rep = k + found = true + } + } + + return rep, found +} + +func (g *DeBruijnGraph) DecodeNode(index uint64) string { + rep := make([]byte, g.kmersize) + index >>= 2 + for i := g.kmersize - 1; i >= 0; i-- { + rep[i], _ = decode[index&3] + index >>= 2 + } + + return string(rep) +} + +func (g *DeBruijnGraph) DecodePath(path []uint64) string { + rep := make([]byte, 0, len(path)+g.kmersize) + buf := bytes.NewBuffer(rep) + + if len(path) > 0 { + buf.WriteString(g.DecodeNode(path[0])) + + for _, idx := range path[1:] { + buf.WriteByte(decode[idx&3]) + } + } + + return buf.String() +} + +func (g *DeBruijnGraph) BestConsensus(id string) (*obiseq.BioSequence,error) { + path := g.MaxPath() + s := g.DecodePath(path) + + if len(s) > 0 { + seq := obiseq.MakeBioSequence( + id, + []byte(s), + "", + ) + + return &seq,nil + } + + return nil,fmt.Errorf("cannot identify optimum path") +} + +func (g *DeBruijnGraph) Weight(index uint64) int { + val, ok := g.graph[index] + if !ok { + val = 0 + } + return int(val) +} + +func (graph *DeBruijnGraph) append(sequence []byte, current uint64) { + + for i := 0; i < len(sequence); i++ { + current <<= 2 + current &= graph.kmermask + b := iupac[sequence[i]] + if len(b) == 1 { + current |= b[0] + + weight, ok := graph.graph[current] + if !ok { + weight = 0 + } + graph.graph[current] = weight + 1 + } else { + for j := 0; j < len(b); j++ { + current &= ^uint64(3) + current |= b[j] + + weight, ok := graph.graph[current] + if !ok { + weight = 0 + } + graph.graph[current] = weight + 1 + graph.append(sequence[(i+1):], current) + } + return + } + + } +} + +func (graph *DeBruijnGraph) Push(sequence *obiseq.BioSequence) { + key := uint64(0) + s := sequence.Sequence() + init := make([]uint64, 0, 16) + var f func(start int, key uint64) + f = func(start int, key uint64) { + for i := start; i < graph.kmersize; i++ { + key <<= 2 + b := iupac[s[i]] + if len(b) == 1 { + key |= b[0] + } else { + for j := 0; j < len(b); j++ { + key &= ^uint64(3) + key |= b[j] + f(i+1, key) + } + return + } + } + init = append(init, key&graph.kmermask) + } + + f(0, key) + + for _, idx := range init { + graph.append(s[graph.kmersize:], idx) + } + +} diff --git a/pkg/obiseq/biosequence.go b/pkg/obiseq/biosequence.go index f399cd1..99b1e4c 100644 --- a/pkg/obiseq/biosequence.go +++ b/pkg/obiseq/biosequence.go @@ -52,6 +52,7 @@ type Annotation map[string]interface{} type BioSequence struct { id string // The identidier of the sequence (private accessible through the method Id) definition string // The documentation of the sequence (private accessible through the method Definition) + source string // The filename without directory name and extension from where the sequence was read. sequence []byte // The sequence itself, it is accessible by the methode Sequence qualities []byte // The quality scores of the sequence. feature []byte @@ -67,6 +68,7 @@ func MakeEmptyBioSequence() BioSequence { return BioSequence{ id: "", definition: "", + source: "", sequence: nil, qualities: nil, feature: nil, @@ -199,6 +201,16 @@ func (s *BioSequence) Annotations() Annotation { return s.annotations } +// Checking if the BioSequence has a source. +func (s *BioSequence) HasSource() bool { + return len(s.source) > 0 +} + +func (s *BioSequence) Source() string { + return s.source +} + + // Returning the MD5 hash of the sequence. func (s *BioSequence) MD5() [16]byte { return md5.Sum(s.sequence) @@ -214,6 +226,11 @@ func (s *BioSequence) SetDefinition(definition string) { s.definition = definition } +// Setting the source of the sequence. +func (s *BioSequence) SetSource(source string) { + s.source = source +} + // Setting the features of the BioSequence. func (s *BioSequence) SetFeatures(feature []byte) { if cap(s.feature) >= 300 { diff --git a/pkg/obisuffix/suffix_array.go b/pkg/obisuffix/suffix_array.go new file mode 100644 index 0000000..ae7d727 --- /dev/null +++ b/pkg/obisuffix/suffix_array.go @@ -0,0 +1,137 @@ +package obisuffix + +import ( + "bytes" + "fmt" + "sort" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" +) + +type Suffix struct { + Idx uint32 + Pos uint32 +} + +type SuffixArray struct { + Sequences *obiseq.BioSequenceSlice + Suffixes []Suffix + Common []int +} + +func SuffixLess(suffixarray SuffixArray) func(i, j int) bool { + less := func(i, j int) bool { + si := suffixarray.Suffixes[i] + bi := (*suffixarray.Sequences)[int(si.Idx)].Sequence()[si.Pos:] + sj := suffixarray.Suffixes[j] + bj := (*suffixarray.Sequences)[int(sj.Idx)].Sequence()[sj.Pos:] + + l := obiutils.MinInt(len(bi), len(bj)) + p := 0 + for p < l && bi[p] == bj[p] { + p++ + } + + // if p < l { + // log.Debugln(si, sj, p, l, rune(bi[p]), rune(bj[p])) + // } else { + // log.Debugln(si, sj, p, l, rune('-'), rune('-')) + // } + + if p == l { + switch { + case len(bi) != len(bj): + return len(bi) < len(bj) + case si.Idx != sj.Idx: + return si.Idx < sj.Idx + default: + return si.Pos < sj.Pos + } + } + + return bi[p] < bj[p] + } + + return less +} + +func BuildSuffixArray(data *obiseq.BioSequenceSlice) SuffixArray { + totalLength := 0 + for _, s := range *data { + totalLength += s.Len() + } + + sa := SuffixArray{ + Sequences: data, + Suffixes: make([]Suffix, 0, totalLength), + } + + for i, s := range *data { + sl := uint32(s.Len()) + for p := uint32(0); p < sl; p++ { + sa.Suffixes = append(sa.Suffixes, Suffix{uint32(i), p}) + } + } + + sort.SliceStable(sa.Suffixes, SuffixLess(sa)) + return sa +} + +func (suffixarray *SuffixArray) CommonSuffix() []int { + if len(suffixarray.Common) == len(suffixarray.Suffixes) { + return suffixarray.Common + } + + lrep := len(suffixarray.Suffixes) + rep := make([]int, lrep) + + sp := suffixarray.Suffixes[0] + bp := (*suffixarray.Sequences)[int(sp.Idx)].Sequence()[sp.Pos:] + for i := 1; i < lrep; i++ { + si := suffixarray.Suffixes[i] + bi := (*suffixarray.Sequences)[int(si.Idx)].Sequence()[si.Pos:] + + l := obiutils.MinInt(len(bi), len(bp)) + p := 0 + for p < l && bi[p] == bp[p] { + p++ + } + rep[i-1] = p + sp = si + bp = bi + } + + rep[lrep-1] = 0 + + suffixarray.Common = rep + + return suffixarray.Common +} + +func (suffixarray *SuffixArray) String() string { + sb := bytes.Buffer{} + + common := suffixarray.CommonSuffix() + + sb.WriteString("Common\tSeqIdx\tPosition\tSuffix\n") + + for i := range suffixarray.Suffixes { + idx := suffixarray.Suffixes[i].Idx + pos := suffixarray.Suffixes[i].Pos + sb.WriteString(fmt.Sprintf("%6d\t%6d\t%8d\t%s\n", + common[i], + idx, + pos, + string((*suffixarray.Sequences)[idx].Sequence()[pos:]), + )) + } + + return sb.String() +} + +// func LongestInternalRepeat(suffixarray SuffixArray) []int { +// common := suffixarray.CommonSuffix() +// rep := make([]int, len(*suffixarray.Sequences)) + +// } diff --git a/pkg/obitools/obiconsensus/obiconsensus.go b/pkg/obitools/obiconsensus/obiconsensus.go new file mode 100644 index 0000000..de21d7d --- /dev/null +++ b/pkg/obitools/obiconsensus/obiconsensus.go @@ -0,0 +1,123 @@ +package obiconsensus + +import ( + "sort" + + log "github.com/sirupsen/logrus" + + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiiter" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obikmer" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiseq" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obisuffix" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" +) + +func BuildConsensus(seqs obiseq.BioSequenceSlice, quorum float64) (*obiseq.BioSequence, error) { + + log.Printf("Number of reads : %d\n", len(seqs)) + + longest := make([]int, len(seqs)) + + for i := range seqs { + s := seqs[i : i+1] + sa := obisuffix.BuildSuffixArray(&s) + longest[i] = obiutils.MaxSlice(sa.CommonSuffix()) + } + + o := obiutils.Order(sort.IntSlice(longest)) + i := int(float64(len(seqs)) * quorum) + + kmersize := longest[o[i]] + 1 + log.Printf("estimated kmer size : %d", kmersize) + + graph := obikmer.MakeDeBruijnGraph(kmersize) + + for _, s := range seqs { + graph.Push(s) + } + + log.Printf("Graph size : %d\n", graph.Len()) + total_kmer := graph.Len() + spectrum := graph.LinkSpectrum() + cum := make(map[int]int) + + spectrum[1]=0 + for i := 2; i < len(spectrum); i++ { + spectrum[i] += spectrum[i-1] + cum[spectrum[i]]++ + } + + max := 0 + kmax := 0 + for k, obs := range cum { + if obs > max { + max = obs + kmax = k + } + } + + threshold := 0 + for i, total := range spectrum { + if total == kmax { + threshold = i + break + } + } + + graph.FilterMin(threshold) + log.Printf("Graph size : %d\n", graph.Len()) + + seq,err := graph.LongestConsensus(seqs[0].Source()) + + seq.SetCount(len(seqs)) + seq.SetAttribute("seq_length",seq.Len()) + seq.SetAttribute("kmer_size",kmersize) + seq.SetAttribute("kmer_min_occur",threshold) + seq.SetAttribute("kmer_max_occur",graph.MaxLink()) + seq.SetAttribute("filtered_graph_size",graph.Len()) + seq.SetAttribute("full_graph_size",total_kmer) + + return seq,err +} + +func Consensus(iterator obiiter.IBioSequence, quorum float64) obiiter.IBioSequence { + newIter := obiiter.MakeIBioSequence() + size:=10 + + newIter.Add(1) + + go func() { + newIter.WaitAndClose() + }() + + go func() { + order := 0 + iterator = iterator.SortBatches() + buffer := obiseq.MakeBioSequenceSlice() + + for iterator.Next() { + seqs := iterator.Get() + consensus,err := BuildConsensus(seqs.Slice(),quorum) + + if err == nil { + buffer = append(buffer, consensus) + } + + if len(buffer) == size { + newIter.Push(obiiter.MakeBioSequenceBatch(order, buffer)) + order++ + buffer = obiseq.MakeBioSequenceSlice() + } + seqs.Recycle() + } + + if len(buffer) > 0 { + newIter.Push(obiiter.MakeBioSequenceBatch(order, buffer)) + } + + newIter.Done() + + }() + + return newIter +} diff --git a/pkg/obitools/obiconsensus/options.go b/pkg/obitools/obiconsensus/options.go new file mode 100644 index 0000000..3a418cc --- /dev/null +++ b/pkg/obitools/obiconsensus/options.go @@ -0,0 +1,13 @@ +package obiconsensus + +import ( + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obitools/obiconvert" + "github.com/DavidGamba/go-getoptions" +) + + +func OptionSet(options *getoptions.GetOpt) { + obiconvert.InputOptionSet(options) + obiconvert.OutputOptionSet(options) +} + diff --git a/pkg/obitools/obiconvert/options.go b/pkg/obitools/obiconvert/options.go index 006a390..eca2294 100644 --- a/pkg/obitools/obiconvert/options.go +++ b/pkg/obitools/obiconvert/options.go @@ -30,6 +30,8 @@ var __compressed__ = false var __output_file_name__ = "-" var __paired_file_name__ = "" +var __full_file_batch__ = false + func InputOptionSet(options *getoptions.GetOpt) { // options.IntVar(&__skipped_entries__, "skip", __skipped_entries__, // options.Description("The N first sequence records of the file are discarded from the analysis and not reported to the output file.")) @@ -201,3 +203,10 @@ func CLIHasPairedFile() bool { func CLIPairedFileName() string { return __paired_file_name__ } + +func SetFullFileBatch() { + __full_file_batch__ = true +} +func FullFileBatch() bool { + return __full_file_batch__ +} \ No newline at end of file diff --git a/pkg/obitools/obiconvert/sequence_reader.go b/pkg/obitools/obiconvert/sequence_reader.go index b9e2d1e..42f2bbb 100644 --- a/pkg/obitools/obiconvert/sequence_reader.go +++ b/pkg/obitools/obiconvert/sequence_reader.go @@ -99,9 +99,13 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { opts = append(opts, obiformats.OptionsBatchSize(obioptions.CLIBatchSize())) opts = append(opts, obiformats.OptionsQualityShift(CLIInputQualityShift())) + opts = append(opts, obiformats.OptionsFullFileBatch(FullFileBatch())) + if len(filenames) == 0 { log.Printf("Reading sequences from stdin in %s\n", CLIInputFormat()) + opts = append(opts, obiformats.OptionsSource("stdin")) + switch CLIInputFormat() { case "ecopcr": iterator = obiformats.ReadEcoPCR(os.Stdin, opts...) @@ -121,7 +125,7 @@ func CLIReadBioSequences(filenames ...string) (obiiter.IBioSequence, error) { switch CLIInputFormat() { case "ecopcr": - reader = obiformats.ReadEcoPCRBatchFromFile + reader = obiformats.ReadEcoPCRFromFile case "embl": reader = obiformats.ReadEMBLFromFile case "genbank": diff --git a/pkg/obiutils/path.go b/pkg/obiutils/path.go new file mode 100644 index 0000000..7d941d9 --- /dev/null +++ b/pkg/obiutils/path.go @@ -0,0 +1,16 @@ +package obiutils + +import ( + "path" + "strings" +) + +func RemoveAllExt(p string) string { + + for ext := path.Ext(p); len(ext) > 0; ext = path.Ext(p) { + p = strings.TrimSuffix(p, ext) + } + + return p + + } \ No newline at end of file