From ac0d3f3fe40595b9f525b4cbb4efa99a00caf571 Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Wed, 3 Dec 2025 11:48:50 +0100 Subject: [PATCH] Update obiuniq for very large dataset --- obitests/obitools/obiuniq/test.sh | 96 ++++++++++++++++++++ obitests/obitools/obiuniq/touniq.fasta | 16 ++++ obitests/obitools/obiuniq/touniq_u.fasta | 4 + obitests/obitools/obiuniq/touniq_u_a.fasta | 8 ++ obitests/obitools/obiuniq/touniq_u_a_b.fasta | 12 +++ pkg/obichunk/chunk.go | 8 +- pkg/obichunk/chunk_on_disk.go | 45 ++++++++- pkg/obichunk/unique.go | 72 +++++---------- pkg/obioptions/version.go | 2 +- pkg/obiseq/class.go | 74 +++++++++++++++ 10 files changed, 281 insertions(+), 56 deletions(-) create mode 100644 obitests/obitools/obiuniq/touniq.fasta create mode 100644 obitests/obitools/obiuniq/touniq_u.fasta create mode 100644 obitests/obitools/obiuniq/touniq_u_a.fasta create mode 100644 obitests/obitools/obiuniq/touniq_u_a_b.fasta diff --git a/obitests/obitools/obiuniq/test.sh b/obitests/obitools/obiuniq/test.sh index 65fcd3e..0f9c9aa 100755 --- a/obitests/obitools/obiuniq/test.sh +++ b/obitests/obitools/obiuniq/test.sh @@ -98,6 +98,102 @@ else ((failed++)) fi +((ntest++)) +if obiuniq "${TEST_DIR}/touniq.fasta" \ + > "${TMPDIR}/touniq_u.fasta" +then + log "OBIUniq simple: running OK" + ((success++)) +else + log "OBIUniq simple: running failed" + ((failed++)) +fi + +obicsv -s --auto ${TEST_DIR}/touniq_u.fasta \ +| tail -n +2 \ +| sort \ +> "${TMPDIR}/touniq_u_ref.csv" + +obicsv -s --auto ${TMPDIR}/touniq_u.fasta \ +| tail -n +2 \ +| sort \ +> "${TMPDIR}/touniq_u.csv" + +((ntest++)) +if diff "${TMPDIR}/touniq_u_ref.csv" \ + "${TMPDIR}/touniq_u.csv" > /dev/null +then + log "OBIUniq simple: result OK" + ((success++)) +else + log "OBIUniq simple: result failed" + ((failed++)) +fi + +((ntest++)) +if obiuniq -c a "${TEST_DIR}/touniq.fasta" \ + > "${TMPDIR}/touniq_u_a.fasta" +then + log "OBIUniq one category: running OK" + ((success++)) +else + log "OBIUniq one category: running 
failed" + ((failed++)) +fi + +obicsv -s --auto ${TEST_DIR}/touniq_u_a.fasta \ +| tail -n +2 \ +| sort \ +> "${TMPDIR}/touniq_u_a_ref.csv" + +obicsv -s --auto ${TMPDIR}/touniq_u_a.fasta \ +| tail -n +2 \ +| sort \ +> "${TMPDIR}/touniq_u_a.csv" + + +((ntest++)) +if diff "${TMPDIR}/touniq_u_a_ref.csv" \ + "${TMPDIR}/touniq_u_a.csv" > /dev/null +then + log "OBIUniq one category: result OK" + ((success++)) +else + log "OBIUniq one category: result failed" + ((failed++)) +fi + +((ntest++)) +if obiuniq -c a -c b "${TEST_DIR}/touniq.fasta" \ + > "${TMPDIR}/touniq_u_a_b.fasta" +then + log "OBIUniq two categories: running OK" + ((success++)) +else + log "OBIUniq two categories: running failed" + ((failed++)) +fi + +obicsv -s --auto ${TEST_DIR}/touniq_u_a_b.fasta \ +| tail -n +2 \ +| sort \ +> "${TMPDIR}/touniq_u_a_b_ref.csv" + +obicsv -s --auto ${TMPDIR}/touniq_u_a_b.fasta \ +| tail -n +2 \ +| sort \ +> "${TMPDIR}/touniq_u_a_b.csv" + +((ntest++)) +if diff "${TMPDIR}/touniq_u_a_b_ref.csv" \ + "${TMPDIR}/touniq_u_a_b.csv" > /dev/null +then + log "OBIUniq two categories: result OK" + ((success++)) +else + log "OBIUniq two categories: result failed" + ((failed++)) +fi ######################################### # diff --git a/obitests/obitools/obiuniq/touniq.fasta b/obitests/obitools/obiuniq/touniq.fasta new file mode 100644 index 0000000..330ff14 --- /dev/null +++ b/obitests/obitools/obiuniq/touniq.fasta @@ -0,0 +1,16 @@ +>seq1 {"a":2, "b":4,"c":5} +aaacccgggttt +>seq2 {"a":3, "b":4,"c":5} +aaacccgggttt +>seq3 {"a":3, "b":5,"c":5} +aaacccgggttt +>seq4 {"a":3, "b":5,"c":6} +aaacccgggttt +>seq5 {"a":2, "b":4,"c":5} +aaacccgggtttca +>seq6 {"a":3, "b":4,"c":5} +aaacccgggtttca +>seq7 {"a":3, "b":5,"c":5} +aaacccgggtttca +>seq8 {"a":3, "b":5,"c":6} +aaacccgggtttca diff --git a/obitests/obitools/obiuniq/touniq_u.fasta b/obitests/obitools/obiuniq/touniq_u.fasta new file mode 100644 index 0000000..8c19a51 --- /dev/null +++ b/obitests/obitools/obiuniq/touniq_u.fasta @@ -0,0 +1,4 @@ +>seq5 
{"count":4} +aaacccgggtttca +>seq1 {"count":4} +aaacccgggttt diff --git a/obitests/obitools/obiuniq/touniq_u_a.fasta b/obitests/obitools/obiuniq/touniq_u_a.fasta new file mode 100644 index 0000000..cea9dd7 --- /dev/null +++ b/obitests/obitools/obiuniq/touniq_u_a.fasta @@ -0,0 +1,8 @@ +>seq5 {"a":2,"b":4,"c":5,"count":1} +aaacccgggtttca +>seq6 {"a":3,"count":3} +aaacccgggtttca +>seq1 {"a":2,"b":4,"c":5,"count":1} +aaacccgggttt +>seq2 {"a":3,"count":3} +aaacccgggttt diff --git a/obitests/obitools/obiuniq/touniq_u_a_b.fasta b/obitests/obitools/obiuniq/touniq_u_a_b.fasta new file mode 100644 index 0000000..ce80f4c --- /dev/null +++ b/obitests/obitools/obiuniq/touniq_u_a_b.fasta @@ -0,0 +1,12 @@ +>seq5 {"a":2,"b":4,"c":5,"count":1} +aaacccgggtttca +>seq6 {"a":3,"b":4,"c":5,"count":1} +aaacccgggtttca +>seq7 {"a":3,"b":5,"count":2} +aaacccgggtttca +>seq1 {"a":2,"b":4,"c":5,"count":1} +aaacccgggttt +>seq2 {"a":3,"b":4,"c":5,"count":1} +aaacccgggttt +>seq3 {"a":3,"b":5,"count":2} +aaacccgggttt diff --git a/pkg/obichunk/chunk.go b/pkg/obichunk/chunk.go index 240ed44..644349b 100644 --- a/pkg/obichunk/chunk.go +++ b/pkg/obichunk/chunk.go @@ -7,11 +7,15 @@ import ( func ISequenceChunk(iterator obiiter.IBioSequence, classifier *obiseq.BioSequenceClassifier, - onMemory bool) (obiiter.IBioSequence, error) { + onMemory bool, + dereplicate bool, + na string, + statsOn obiseq.StatsOnDescriptions, +) (obiiter.IBioSequence, error) { if onMemory { return ISequenceChunkOnMemory(iterator, classifier) } else { - return ISequenceChunkOnDisk(iterator, classifier) + return ISequenceChunkOnDisk(iterator, classifier, dereplicate, na, statsOn) } } diff --git a/pkg/obichunk/chunk_on_disk.go b/pkg/obichunk/chunk_on_disk.go index 4e95fd9..2a71c78 100644 --- a/pkg/obichunk/chunk_on_disk.go +++ b/pkg/obichunk/chunk_on_disk.go @@ -74,7 +74,11 @@ func find(root, ext string) []string { // is removed. The function logs the number of batches created and the processing // status of each batch. 
 func ISequenceChunkOnDisk(iterator obiiter.IBioSequence,
-	classifier *obiseq.BioSequenceClassifier) (obiiter.IBioSequence, error) {
+	classifier *obiseq.BioSequenceClassifier,
+	dereplicate bool,
+	na string,
+	statsOn obiseq.StatsOnDescriptions,
+) (obiiter.IBioSequence, error) {
 	obiutils.RegisterAPipe()
 	dir, err := tempDir()
 	if err != nil {
@@ -113,11 +117,42 @@ func ISequenceChunkOnDisk(iterator obiiter.IBioSequence,
 				panic(err)
 			}
 
-			source, chunk := iseq.Load()
+			if dereplicate {
+				u := make(map[string]*obiseq.BioSequence)
+				var source string
+				var chunk obiseq.BioSequenceSlice
 
-			newIter.Push(obiiter.MakeBioSequenceBatch(source, order, chunk))
-			log.Infof("Start processing of batch %d/%d : %d sequences",
-				order, nbatch, len(chunk))
+				for iseq.Next() {
+					batch := iseq.Get()
+					source = batch.Source()
+
+					for _, seq := range batch.Slice() {
+						sstring := seq.String()
+						prev, ok := u[sstring]
+						if ok {
+							prev.Merge(seq, na, true, statsOn)
+						} else {
+							u[sstring] = seq
+						}
+					}
+				}
+
+				chunk = obiseq.MakeBioSequenceSlice(len(u))
+				i := 0
+				for _, seq := range u {
+					chunk[i] = seq
+					i++
+				}
+
+				newIter.Push(obiiter.MakeBioSequenceBatch(source, order, chunk))
+
+			} else {
+				source, chunk := iseq.Load()
+
+				newIter.Push(obiiter.MakeBioSequenceBatch(source, order, chunk))
+				log.Infof("Start processing of batch %d/%d : %d sequences",
+					order+1, nbatch, len(chunk))
+			}
 		}
diff --git a/pkg/obichunk/unique.go b/pkg/obichunk/unique.go
index 9e98bf0..ba8cc4a 100644
--- a/pkg/obichunk/unique.go
+++ b/pkg/obichunk/unique.go
@@ -25,18 +25,32 @@ func IUniqueSequence(iterator obiiter.IBioSequence,
 
 	log.Infoln("Starting data splitting")
 
+	cat := opts.Categories()
+	na := opts.NAValue()
+
+	var classifier *obiseq.BioSequenceClassifier
+
+	if len(cat) > 0 {
+		cls := make([]*obiseq.BioSequenceClassifier, len(cat)+1)
+		for i, c := range cat {
+			cls[i+1] = obiseq.AnnotationClassifier(c, na)
+		}
+		cls[0] = obiseq.HashClassifier(opts.BatchCount())
+		classifier = obiseq.CompositeClassifier(cls...)
+ } else { + classifier = obiseq.HashClassifier(opts.BatchCount()) + } + if opts.SortOnDisk() { nworkers = 1 - iterator, err = ISequenceChunkOnDisk(iterator, - obiseq.HashClassifier(opts.BatchCount())) + iterator, err = ISequenceChunkOnDisk(iterator, classifier, true, na, opts.StatsOn()) if err != nil { return obiiter.NilIBioSequence, err } } else { - iterator, err = ISequenceChunkOnMemory(iterator, - obiseq.HashClassifier(opts.BatchCount())) + iterator, err = ISequenceChunkOnMemory(iterator, classifier) if err != nil { return obiiter.NilIBioSequence, err @@ -63,63 +77,25 @@ func IUniqueSequence(iterator obiiter.IBioSequence, return neworder } - var ff func(obiiter.IBioSequence, - *obiseq.BioSequenceClassifier, - int) - - cat := opts.Categories() - na := opts.NAValue() - - ff = func(input obiiter.IBioSequence, - classifier *obiseq.BioSequenceClassifier, - icat int) { - icat-- + ff := func(input obiiter.IBioSequence, + classifier *obiseq.BioSequenceClassifier) { input, err = ISequenceSubChunk(input, classifier, 1) - var next obiiter.IBioSequence - if icat >= 0 { - next = obiiter.MakeIBioSequence() - - iUnique.Add(1) - - go ff(next, - obiseq.AnnotationClassifier(cat[icat], na), - icat) - } - - o := 0 for input.Next() { batch := input.Get() - - if icat < 0 || len(batch.Slice()) == 1 { - // No more sub classification of sequence or only a single sequence - if !(opts.NoSingleton() && len(batch.Slice()) == 1 && batch.Slice()[0].Count() == 1) { - iUnique.Push(batch.Reorder(nextOrder())) - } - } else { - // A new step of classification must du realized - next.Push(batch.Reorder(o)) - o++ + if !(opts.NoSingleton() && len(batch.Slice()) == 1 && batch.Slice()[0].Count() == 1) { + iUnique.Push(batch.Reorder(nextOrder())) } } - - if icat >= 0 { - next.Close() - } - iUnique.Done() } for i := 0; i < nworkers-1; i++ { - go ff(iterator.Split(), - obiseq.SequenceClassifier(), - len(cat)) + go ff(iterator.Split(), obiseq.SequenceClassifier()) } - go ff(iterator, - 
obiseq.SequenceClassifier(), - len(cat)) + go ff(iterator, obiseq.SequenceClassifier()) iMerged := iUnique.IMergeSequenceBatch(opts.NAValue(), opts.StatsOn(), diff --git a/pkg/obioptions/version.go b/pkg/obioptions/version.go index 235595b..0cbe8a9 100644 --- a/pkg/obioptions/version.go +++ b/pkg/obioptions/version.go @@ -8,7 +8,7 @@ import ( // corresponds to the last commit, and not the one when the file will be // commited -var _Commit = "c1b9503" +var _Commit = "547135c" var _Version = "Release 4.4.0" // Version returns the version of the obitools package. diff --git a/pkg/obiseq/class.go b/pkg/obiseq/class.go index 10d1247..cf9366f 100644 --- a/pkg/obiseq/class.go +++ b/pkg/obiseq/class.go @@ -316,3 +316,77 @@ func RotateClassifier(size int) *BioSequenceClassifier { c := BioSequenceClassifier{code, value, reset, clone, "RotateClassifier"} return &c } + +func CompositeClassifier(classifiers ...*BioSequenceClassifier) *BioSequenceClassifier { + encode := make(map[string]int, 1000) + decode := make([]string, 0, 1000) + locke := sync.RWMutex{} + maxcode := 0 + initbufsize := len(classifiers) * 6 + + code := func(sequence *BioSequence) int { + buf := make([]byte, 0, initbufsize) + for _, cl := range classifiers { + if cl == nil { + continue + } + rep := cl.Code(sequence) + buf = strconv.AppendInt(buf, int64(rep), 10) + buf = append(buf, ':') + } + + locke.Lock() + defer locke.Unlock() + + sval := string(buf) + + k, ok := encode[sval] + + if !ok { + k = maxcode + maxcode++ + encode[sval] = k + decode = append(decode, sval) + } + + return k + } + + value := func(k int) string { + locke.RLock() + defer locke.RUnlock() + + if k >= maxcode { + log.Fatalf("value %d not register", k) + } + return decode[k] + } + + reset := func() { + locke.Lock() + defer locke.Unlock() + + encode = make(map[string]int) + decode = decode[:0] + maxcode = 0 + } + + clone := func() *BioSequenceClassifier { + clones := make([]*BioSequenceClassifier, 0, len(classifiers)) + for _, cl := range 
classifiers { + if cl == nil { + continue + } + if cl.Clone != nil { + clones = append(clones, cl.Clone()) + } else { + clones = append(clones, cl) + } + } + return CompositeClassifier(clones...) + } + + c := BioSequenceClassifier{code, value, reset, clone, "CompositeClassifier"} + return &c + +}