package obirefidx import ( "fmt" "math" "os" "sync" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obialign" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obichunk" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiiter" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obikmer" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obiseq" "git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obitax" "github.com/schollz/progressbar/v3" log "github.com/sirupsen/logrus" ) func MakeStartClusterSliceWorker(clusterslot string, threshold float64) obiseq.SeqSliceWorker { clusteridslot := fmt.Sprintf("%s_clusterid", clusterslot) clusterheadslot := fmt.Sprintf("%s_clusterhead", clusterslot) cludteridentityslot := fmt.Sprintf("%s_clusteridentity", clusterslot) clusternclusterslot := fmt.Sprintf("%s_cluster_n", clusterslot) StartClusterSliceWorkers := func(sequences obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) { sequences.SortOnCount(true) n := 0 for i, seqA := range sequences { if !seqA.HasAttribute(clusteridslot) { n++ seqA.SetAttribute(clusteridslot, seqA.Id()) seqA.SetAttribute(clusterheadslot, true) seqA.SetAttribute(cludteridentityslot, 1.0) for j := i + 1; j < len(sequences); j++ { seqB := sequences[j] if !seqB.HasAttribute(clusteridslot) { errmax := int(math.Ceil(float64(max(seqA.Len(), seqB.Len())) * (1.0 - threshold) * 2)) lca, alilen := obialign.FastLCSScore(seqA, seqB, errmax, nil) id := float64(lca) / float64(alilen) if lca >= 0 && id >= threshold { seqB.SetAttribute(clusteridslot, seqA.Id()) seqB.SetAttribute(clusterheadslot, false) seqB.SetAttribute(cludteridentityslot, id) } } } } } log.Debugf("Clustered %d sequences into %d clusters", len(sequences), n) for _, seq := range sequences { seq.SetAttribute(clusternclusterslot, n) } return sequences, nil } return StartClusterSliceWorkers } func MakeIndexingSliceWorker(indexslot, idslot string, kmers *[]*obikmer.Table4mer, taxonomy *obitax.Taxonomy, ) obiseq.SeqSliceWorker { IndexingSliceWorkers := func(sequences obiseq.BioSequenceSlice) (obiseq.BioSequenceSlice, error) { kmercounts := make( []*obikmer.Table4mer, len(sequences)) taxa := taxonomy.NewTaxonSlice(sequences.Len(), sequences.Len()) for i, seq := range sequences { j, ok := seq.GetIntAttribute(idslot) if !ok { return nil, fmt.Errorf("sequence %s has no %s attribute", seq.Id(), idslot) } kmercounts[i] = (*kmers)[j] taxon := seq.Taxon(taxonomy) if taxon == nil { log.Panicf("%s: Cannot get the taxon %s in %s", seq.Id(), seq.Taxid(), taxonomy.Name()) } taxa.Set(i, taxon) } limits := make(chan [2]int) waiting := sync.WaitGroup{} go func() { for i := 0; i < len(sequences); i += 10 { limits <- [2]int{i, min(i+10, len(sequences))} } close(limits) }() f := func() { for l := range limits { for i := l[0]; i < l[1]; i++ { idx := IndexSequence(i, sequences, &kmercounts, taxa, taxonomy) sequences[i].SetAttribute(indexslot, idx) } } waiting.Done() } nworkers := max(min(obidefault.ParallelWorkers(), len(sequences)/10), 1) waiting.Add(nworkers) for w := 0; w < nworkers; w++ { go f() } waiting.Wait() return sequences, nil } return IndexingSliceWorkers } func IndexFamilyDB(iterator obiiter.IBioSequence) obiiter.IBioSequence { log.Infoln("Family level reference database indexing...") log.Infoln("Loading database...") source, references := iterator.Load() nref := len(references) log.Infof("Done. Database contains %d sequences", nref) taxonomy := obitax.DefaultTaxonomy() if taxonomy == nil { log.Panicln("No taxonomy available use the --taxonomy option") } log.Infoln("Indexing database kmers...") refcounts := make( []*obikmer.Table4mer, len(references)) buffer := make([]byte, 0, 1000) for i, seq := range references { seq.SetAttribute("reffamidx_id", i) refcounts[i] = obikmer.Count4Mer(seq, &buffer, nil) } log.Info("done") partof := obiiter.IBatchOver(source, references, obidefault.BatchSize()).MakeIWorker(obiseq.MakeSetSpeciesWorker(taxonomy), false, obidefault.ParallelWorkers(), ).MakeIWorker(obiseq.MakeSetGenusWorker(taxonomy), false, obidefault.ParallelWorkers(), ).MakeIWorker(obiseq.MakeSetFamilyWorker(taxonomy), false, obidefault.ParallelWorkers(), ) family_iterator, err := obichunk.ISequenceChunkOnMemory( partof, obiseq.AnnotationClassifier("family_taxid", "NA"), ) if err != nil { log.Fatal(err) } family_iterator.MakeISliceWorker( MakeStartClusterSliceWorker("reffamidx", 0.9), false, obidefault.ParallelWorkers(), ).MakeISliceWorker( MakeIndexingSliceWorker("reffamidx_in", "reffamidx_id", &refcounts, taxonomy), false, obidefault.ParallelWorkers(), ).Speed("Family Indexing", nref).Consume() clusters := obiseq.MakeBioSequenceSlice(0) kcluster := make([]*obikmer.Table4mer, 0) taxa := taxonomy.NewTaxonSlice(references.Len(), references.Len()) j := 0 for i, seq := range references { if is_centrer, _ := seq.GetBoolAttribute("reffamidx_clusterhead"); is_centrer { clusters = append(clusters, seq) kcluster = append(kcluster, refcounts[i]) taxon := seq.Taxon(taxonomy) if taxon == nil { log.Panicf("%s: Cannot get the taxon %s in %s", seq.Id(), seq.Taxid(), taxonomy.Name()) } taxa.Set(j, taxon) j++ } } log.Infof("Done. Found %d clusters", clusters.Len()) var bar *progressbar.ProgressBar if obidefault.ProgressBar() { pbopt := make([]progressbar.Option, 0, 5) pbopt = append(pbopt, progressbar.OptionSetWriter(os.Stderr), progressbar.OptionSetWidth(15), progressbar.OptionShowCount(), progressbar.OptionShowIts(), progressbar.OptionSetDescription("Cluster indexing"), ) bar = progressbar.NewOptions(len(clusters), pbopt...) } limits := make(chan [2]int) waiting := sync.WaitGroup{} go func() { for i := 0; i < len(clusters); i += 10 { limits <- [2]int{i, min(i+10, len(clusters))} } close(limits) }() f := func() { for l := range limits { for i := l[0]; i < l[1]; i++ { idx := IndexSequence(i, clusters, &kcluster, taxa, taxonomy) clusters[i].SetOBITagRefIndex(idx) if bar != nil { bar.Add(1) } } } waiting.Done() } nworkers := obidefault.ParallelWorkers() waiting.Add(nworkers) for w := 0; w < nworkers; w++ { go f() } waiting.Wait() results := obiiter.IBatchOver(source, references, obidefault.BatchSize()).Speed("Writing db", nref) return results }