Update obiuniq for very large datasets

Eric Coissac
2025-12-03 11:48:50 +01:00
parent 547135c747
commit ac0d3f3fe4
10 changed files with 281 additions and 56 deletions

View File

@@ -98,6 +98,102 @@ else
((failed++))
fi
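# obiuniq with no category: dereplicate touniq.fasta and diff its normalised CSV against the touniq_u reference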
((ntest++))
if obiuniq "${TEST_DIR}/touniq.fasta" \
> "${TMPDIR}/touniq_u.fasta"
then
log "OBIUniq simple: running OK"
((success++))
else
log "OBIUniq simple: running failed"
((failed++))
fi
obicsv -s --auto "${TEST_DIR}/touniq_u.fasta" \
| tail -n +2 \
| sort \
> "${TMPDIR}/touniq_u_ref.csv"
obicsv -s --auto "${TMPDIR}/touniq_u.fasta" \
| tail -n +2 \
| sort \
> "${TMPDIR}/touniq_u.csv"
((ntest++))
if diff "${TMPDIR}/touniq_u_ref.csv" \
"${TMPDIR}/touniq_u.csv" > /dev/null
then
log "OBIUniq simple: result OK"
((success++))
else
log "OBIUniq simple: result failed"
((failed++))
fi
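# obiuniq with one category (-c a): duplicates are only merged when they share the "a" annotation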
((ntest++))
if obiuniq -c a "${TEST_DIR}/touniq.fasta" \
> "${TMPDIR}/touniq_u_a.fasta"
then
log "OBIUniq one category: running OK"
((success++))
else
log "OBIUniq one category: running failed"
((failed++))
fi
obicsv -s --auto "${TEST_DIR}/touniq_u_a.fasta" \
| tail -n +2 \
| sort \
> "${TMPDIR}/touniq_u_a_ref.csv"
obicsv -s --auto "${TMPDIR}/touniq_u_a.fasta" \
| tail -n +2 \
| sort \
> "${TMPDIR}/touniq_u_a.csv"
((ntest++))
if diff "${TMPDIR}/touniq_u_a_ref.csv" \
"${TMPDIR}/touniq_u_a.csv" > /dev/null
then
log "OBIUniq one category: result OK"
((success++))
else
log "OBIUniq one category: result failed"
((failed++))
fi
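# obiuniq with two categories (-c a -c b): duplicates must share both "a" and "b" to be merged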
((ntest++))
if obiuniq -c a -c b "${TEST_DIR}/touniq.fasta" \
> "${TMPDIR}/touniq_u_a_b.fasta"
then
log "OBIUniq two categories: running OK"
((success++))
else
log "OBIUniq two categories: running failed"
((failed++))
fi
obicsv -s --auto "${TEST_DIR}/touniq_u_a_b.fasta" \
| tail -n +2 \
| sort \
> "${TMPDIR}/touniq_u_a_b_ref.csv"
obicsv -s --auto "${TMPDIR}/touniq_u_a_b.fasta" \
| tail -n +2 \
| sort \
> "${TMPDIR}/touniq_u_a_b.csv"
((ntest++))
if diff "${TMPDIR}/touniq_u_a_b_ref.csv" \
"${TMPDIR}/touniq_u_a_b.csv" > /dev/null
then
log "OBIUniq two categories: result OK"
((success++))
else
log "OBIUniq two categories: result failed"
((failed++))
fi
#########################################
#

View File

@@ -0,0 +1,16 @@
>seq1 {"a":2, "b":4,"c":5}
aaacccgggttt
>seq2 {"a":3, "b":4,"c":5}
aaacccgggttt
>seq3 {"a":3, "b":5,"c":5}
aaacccgggttt
>seq4 {"a":3, "b":5,"c":6}
aaacccgggttt
>seq5 {"a":2, "b":4,"c":5}
aaacccgggtttca
>seq6 {"a":3, "b":4,"c":5}
aaacccgggtttca
>seq7 {"a":3, "b":5,"c":5}
aaacccgggtttca
>seq8 {"a":3, "b":5,"c":6}
aaacccgggtttca

View File

@@ -0,0 +1,4 @@
>seq5 {"count":4}
aaacccgggtttca
>seq1 {"count":4}
aaacccgggttt

View File

@@ -0,0 +1,8 @@
>seq5 {"a":2,"b":4,"c":5,"count":1}
aaacccgggtttca
>seq6 {"a":3,"count":3}
aaacccgggtttca
>seq1 {"a":2,"b":4,"c":5,"count":1}
aaacccgggttt
>seq2 {"a":3,"count":3}
aaacccgggttt

View File

@@ -0,0 +1,12 @@
>seq5 {"a":2,"b":4,"c":5,"count":1}
aaacccgggtttca
>seq6 {"a":3,"b":4,"c":5,"count":1}
aaacccgggtttca
>seq7 {"a":3,"b":5,"count":2}
aaacccgggtttca
>seq1 {"a":2,"b":4,"c":5,"count":1}
aaacccgggttt
>seq2 {"a":3,"b":4,"c":5,"count":1}
aaacccgggttt
>seq3 {"a":3,"b":5,"count":2}
aaacccgggttt
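
The counts in the three expected outputs above (presumably the touniq_u, touniq_u_a and touniq_u_a_b references used by the test script) follow directly from the input fixture: seq1 to seq4 share the sequence aaacccgggttt and seq5 to seq8 share aaacccgggtttca, so plain dereplication yields two records of count 4, while grouping additionally on "a" (or on "a" and "b") splits each group as shown. A minimal standalone Go sketch of that grouping arithmetic (not the obiuniq implementation, just a re-encoding of the fixture):

package main

import "fmt"

// rec mirrors one record of the touniq.fasta fixture.
type rec struct {
	id      string
	seq     string
	a, b, c int
}

func main() {
	data := []rec{
		{"seq1", "aaacccgggttt", 2, 4, 5}, {"seq2", "aaacccgggttt", 3, 4, 5},
		{"seq3", "aaacccgggttt", 3, 5, 5}, {"seq4", "aaacccgggttt", 3, 5, 6},
		{"seq5", "aaacccgggtttca", 2, 4, 5}, {"seq6", "aaacccgggtttca", 3, 4, 5},
		{"seq7", "aaacccgggtttca", 3, 5, 5}, {"seq8", "aaacccgggtttca", 3, 5, 6},
	}

	// Count the records falling into each grouping key.
	bySeq := map[string]int{}   // no -c option
	bySeqA := map[string]int{}  // -c a
	bySeqAB := map[string]int{} // -c a -c b
	for _, r := range data {
		bySeq[r.seq]++
		bySeqA[fmt.Sprintf("%s a=%d", r.seq, r.a)]++
		bySeqAB[fmt.Sprintf("%s a=%d b=%d", r.seq, r.a, r.b)]++
	}

	fmt.Println(bySeq)   // two groups of 4, as in the first reference
	fmt.Println(bySeqA)  // counts 1 and 3 per sequence, as in the second
	fmt.Println(bySeqAB) // counts 1, 1 and 2 per sequence, as in the third
}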

View File

@@ -7,11 +7,15 @@ import (
func ISequenceChunk(iterator obiiter.IBioSequence,
classifier *obiseq.BioSequenceClassifier,
onMemory bool) (obiiter.IBioSequence, error) {
onMemory bool,
dereplicate bool,
na string,
statsOn obiseq.StatsOnDescriptions,
) (obiiter.IBioSequence, error) {
if onMemory {
return ISequenceChunkOnMemory(iterator, classifier)
} else {
return ISequenceChunkOnDisk(iterator, classifier)
return ISequenceChunkOnDisk(iterator, classifier, dereplicate, na, statsOn)
}
}

View File

@@ -74,7 +74,11 @@ func find(root, ext string) []string {
// is removed. The function logs the number of batches created and the processing
// status of each batch.
func ISequenceChunkOnDisk(iterator obiiter.IBioSequence,
classifier *obiseq.BioSequenceClassifier) (obiiter.IBioSequence, error) {
classifier *obiseq.BioSequenceClassifier,
dereplicate bool,
na string,
statsOn obiseq.StatsOnDescriptions,
) (obiiter.IBioSequence, error) {
obiutils.RegisterAPipe()
dir, err := tempDir()
if err != nil {
@@ -113,11 +117,42 @@ func ISequenceChunkOnDisk(iterator obiiter.IBioSequence,
panic(err)
}
if dereplicate {
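// u maps each distinct nucleotide string to the first sequence carrying it; later occurrences are merged into that representative.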
u := make(map[string]*obiseq.BioSequence)
var source string
var chunk obiseq.BioSequenceSlice
for iseq.Next() {
batch := iseq.Get()
source = batch.Source()
for _, seq := range batch.Slice() {
sstring := seq.String()
prev, ok := u[sstring]
if ok {
prev.Merge(seq, na, true, statsOn)
} else {
u[sstring] = seq
}
}
chunk = obiseq.MakeBioSequenceSlice(len(u))
i := 0
for _, seq := range u {
chunk[i] = seq
i++
}
}
newIter.Push(obiiter.MakeBioSequenceBatch(source, order, chunk))
} else {
source, chunk := iseq.Load()
newIter.Push(obiiter.MakeBioSequenceBatch(source, order, chunk))
log.Infof("Start processing of batch %d/%d : %d sequences",
order, nbatch, len(chunk))
order+1, nbatch, len(chunk))
}
}
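
Judging from the reference files above, merging two identical sequences sums their counts and keeps only the annotations on which all merged records agree (seq2 in the one-category output loses "b" and "c" because they differ among seq2, seq3 and seq4). A rough standalone sketch of that dereplicate-and-merge pattern using simplified stand-in types; the real BioSequence and Merge API is not reproduced here:

package main

import "fmt"

// annot is a simplified stand-in for a sequence's annotation map.
type annot map[string]int

// seqRec is a minimal stand-in for a dereplicated sequence record.
type seqRec struct {
	seq   string
	count int
	attrs annot
}

// merge folds other into s: counts are summed and, as the fixtures suggest,
// annotations whose values disagree between the two records are dropped.
func merge(s, other *seqRec) {
	s.count += other.count
	for k, v := range s.attrs {
		if ov, ok := other.attrs[k]; !ok || ov != v {
			delete(s.attrs, k)
		}
	}
}

func main() {
	recs := []*seqRec{
		{"aaacccgggttt", 1, annot{"a": 3, "b": 4}},
		{"aaacccgggttt", 1, annot{"a": 3, "b": 5}},
	}

	// Dereplicate by nucleotide string, as the on-disk chunk loop above does.
	u := map[string]*seqRec{}
	for _, r := range recs {
		if prev, ok := u[r.seq]; ok {
			merge(prev, r)
		} else {
			u[r.seq] = r
		}
	}

	// Collect the surviving records into a slice, advancing the index for each.
	out := make([]*seqRec, len(u))
	i := 0
	for _, r := range u {
		out[i] = r
		i++
	}
	fmt.Println(out[0].count, out[0].attrs) // 2 map[a:3]
}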

View File

@@ -25,18 +25,32 @@ func IUniqueSequence(iterator obiiter.IBioSequence,
log.Infoln("Starting data splitting")
cat := opts.Categories()
na := opts.NAValue()
var classifier *obiseq.BioSequenceClassifier
if len(cat) > 0 {
cls := make([]*obiseq.BioSequenceClassifier, len(cat)+1)
for i, c := range cat {
cls[i+1] = obiseq.AnnotationClassifier(c, na)
}
cls[0] = obiseq.HashClassifier(opts.BatchCount())
classifier = obiseq.CompositeClassifier(cls...)
} else {
classifier = obiseq.HashClassifier(opts.BatchCount())
}
if opts.SortOnDisk() {
nworkers = 1
iterator, err = ISequenceChunkOnDisk(iterator,
obiseq.HashClassifier(opts.BatchCount()))
iterator, err = ISequenceChunkOnDisk(iterator, classifier, true, na, opts.StatsOn())
if err != nil {
return obiiter.NilIBioSequence, err
}
} else {
iterator, err = ISequenceChunkOnMemory(iterator,
obiseq.HashClassifier(opts.BatchCount()))
iterator, err = ISequenceChunkOnMemory(iterator, classifier)
if err != nil {
return obiiter.NilIBioSequence, err
@@ -63,63 +77,25 @@ func IUniqueSequence(iterator obiiter.IBioSequence,
return neworder
}
var ff func(obiiter.IBioSequence,
*obiseq.BioSequenceClassifier,
int)
cat := opts.Categories()
na := opts.NAValue()
ff = func(input obiiter.IBioSequence,
classifier *obiseq.BioSequenceClassifier,
icat int) {
icat--
ff := func(input obiiter.IBioSequence,
classifier *obiseq.BioSequenceClassifier) {
input, err = ISequenceSubChunk(input,
classifier,
1)
var next obiiter.IBioSequence
if icat >= 0 {
next = obiiter.MakeIBioSequence()
iUnique.Add(1)
go ff(next,
obiseq.AnnotationClassifier(cat[icat], na),
icat)
}
o := 0
for input.Next() {
batch := input.Get()
if icat < 0 || len(batch.Slice()) == 1 {
// No more sub classification of sequence or only a single sequence
if !(opts.NoSingleton() && len(batch.Slice()) == 1 && batch.Slice()[0].Count() == 1) {
iUnique.Push(batch.Reorder(nextOrder()))
}
} else {
// A new step of classification must be performed
next.Push(batch.Reorder(o))
o++
}
}
if icat >= 0 {
next.Close()
}
iUnique.Done()
}
for i := 0; i < nworkers-1; i++ {
go ff(iterator.Split(),
obiseq.SequenceClassifier(),
len(cat))
go ff(iterator.Split(), obiseq.SequenceClassifier())
}
go ff(iterator,
obiseq.SequenceClassifier(),
len(cat))
go ff(iterator, obiseq.SequenceClassifier())
iMerged := iUnique.IMergeSequenceBatch(opts.NAValue(),
opts.StatsOn(),
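
The rewrite above replaces the old recursive per-category sub-chunking with a single composite classifier built up front: the hash bucket and one annotation code per -c category are folded into one chunking key, so sequences that must be compared during dereplication (same nucleotides, same category values) are guaranteed to land in the same chunk after a single pass. A toy illustration of that key composition with plain strings (the helper below is hypothetical, not the obitools API):

package main

import (
	"fmt"
	"hash/fnv"
)

// bucket plays the role of HashClassifier: it spreads sequences over n buckets
// based only on their nucleotide string.
func bucket(seq string, n int) uint32 {
	h := fnv.New32a()
	h.Write([]byte(seq))
	return h.Sum32() % uint32(n)
}

func main() {
	type read struct{ seq, a, b string }
	reads := []read{
		{"aaacccgggttt", "3", "4"},
		{"aaacccgggttt", "3", "5"},
		{"aaacccgggtttca", "3", "5"},
	}

	// Composite chunking key: hash bucket first, then one component per
	// requested category. Identical sequences with identical categories always
	// share a key, which is all the later dereplication step needs.
	chunks := map[string][]read{}
	for _, r := range reads {
		key := fmt.Sprintf("%d:%s:%s", bucket(r.seq, 100), r.a, r.b)
		chunks[key] = append(chunks[key], r)
	}
	for key, members := range chunks {
		fmt.Println(key, len(members))
	}
}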

View File

@@ -8,7 +8,7 @@ import (
// corresponds to the last commit, and not the one when the file will be
// committed
var _Commit = "c1b9503"
var _Commit = "547135c"
var _Version = "Release 4.4.0"
// Version returns the version of the obitools package.

View File

@@ -316,3 +316,77 @@ func RotateClassifier(size int) *BioSequenceClassifier {
c := BioSequenceClassifier{code, value, reset, clone, "RotateClassifier"}
return &c
}
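
// CompositeClassifier chains several classifiers into one: a sequence's code is
// interned from the concatenation of the codes returned by each non-nil
// sub-classifier, so grouping on the composite is equivalent to grouping on all
// of its components at once.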
func CompositeClassifier(classifiers ...*BioSequenceClassifier) *BioSequenceClassifier {
encode := make(map[string]int, 1000)
decode := make([]string, 0, 1000)
locke := sync.RWMutex{}
maxcode := 0
initbufsize := len(classifiers) * 6
code := func(sequence *BioSequence) int {
buf := make([]byte, 0, initbufsize)
for _, cl := range classifiers {
if cl == nil {
continue
}
rep := cl.Code(sequence)
buf = strconv.AppendInt(buf, int64(rep), 10)
buf = append(buf, ':')
}
locke.Lock()
defer locke.Unlock()
sval := string(buf)
k, ok := encode[sval]
if !ok {
k = maxcode
maxcode++
encode[sval] = k
decode = append(decode, sval)
}
return k
}
value := func(k int) string {
locke.RLock()
defer locke.RUnlock()
if k >= maxcode {
log.Fatalf("value %d not register", k)
}
return decode[k]
}
reset := func() {
locke.Lock()
defer locke.Unlock()
encode = make(map[string]int)
decode = decode[:0]
maxcode = 0
}
clone := func() *BioSequenceClassifier {
clones := make([]*BioSequenceClassifier, 0, len(classifiers))
for _, cl := range classifiers {
if cl == nil {
continue
}
if cl.Clone != nil {
clones = append(clones, cl.Clone())
} else {
clones = append(clones, cl)
}
}
return CompositeClassifier(clones...)
}
c := BioSequenceClassifier{code, value, reset, clone, "CompositeClassifier"}
return &c
}
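
For reference, the call pattern used by IUniqueSequence earlier in this commit, restated as a condensed sketch inside package obiseq (the helper name is illustrative): the hash classifier comes first so identical sequences share a chunk, followed by one annotation classifier per requested category.

// exampleUniqClassifier is a sketch of how obiuniq assembles its composite classifier.
func exampleUniqClassifier(categories []string, na string, batchCount int) *BioSequenceClassifier {
	cls := make([]*BioSequenceClassifier, len(categories)+1)
	// Bucket on the sequence hash first, so identical sequences always end up
	// in the same chunk.
	cls[0] = HashClassifier(batchCount)
	// Then one component per -c category; na stands in for missing annotations.
	for i, c := range categories {
		cls[i+1] = AnnotationClassifier(c, na)
	}
	return CompositeClassifier(cls...)
}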