From cbd42d5b30413f4264369d633e4a482c781e569c Mon Sep 17 00:00:00 2001 From: Eric Coissac Date: Sat, 26 Aug 2023 01:26:40 +0200 Subject: [PATCH] Implements a parallel version of Kmeans Former-commit-id: 58ab24b9b274451e00eea0275249234e2c2ea09b --- pkg/obistats/kmeans.go | 204 ++++++++++++++++-------- pkg/obistats/random.go | 12 +- pkg/obitools/obilandmark/obilandmark.go | 38 ++++- 3 files changed, 182 insertions(+), 72 deletions(-) diff --git a/pkg/obistats/kmeans.go b/pkg/obistats/kmeans.go index d513e9d..2d1f8d3 100644 --- a/pkg/obistats/kmeans.go +++ b/pkg/obistats/kmeans.go @@ -1,9 +1,11 @@ package obistats import ( + "math" + "sync" + "git.metabarcoding.org/lecasofts/go/obitools/pkg/obiutils" log "github.com/sirupsen/logrus" - "math" ) // AssignToClass applies the nearest neighbor algorithm to assign data points to classes. @@ -16,22 +18,52 @@ import ( // - classes: a slice of int representing the assigned class for each data point. func AssignToClass(data, centers *obiutils.Matrix[float64]) []int { classes := make([]int, len(*data)) - for i, rowData := range *data { - minDist := math.MaxFloat64 - for j, centerData := range *centers { - dist := 0.0 - for d, val := range rowData { - dist += math.Pow(val-centerData[d], 2) + numData := len(*data) + numCenters := len(*centers) + + var wg sync.WaitGroup + wg.Add(numData) + + for i := 0; i < numData; i++ { + go func(i int) { + defer wg.Done() + minDist := math.MaxFloat64 + minDistIndex := -1 + rowData := (*data)[i] + + for j := 0; j < numCenters; j++ { + centerData := (*centers)[j] + dist := 0.0 + + for d, val := range rowData { + diff := val - centerData[d] + dist += diff * diff + } + + if dist < minDist { + minDist = dist + minDistIndex = j + } } - if dist < minDist { - minDist = dist - classes[i] = j - } - } + + classes[i] = minDistIndex + }(i) } + + wg.Wait() + return classes } +// ComputeCenters calculates the centers of clusters for a given data set. +// +// Parameters: +// - data: a pointer to a matrix of float64 values representing the data set. +// - k: an integer representing the number of clusters. +// - classes: a slice of integers representing the assigned cluster for each data point. +// +// Returns: +// - centers: a pointer to a matrix of float64 values representing the centers of the clusters. // ComputeCenters calculates the centers of clusters for a given data set. // // Parameters: @@ -46,17 +78,33 @@ func ComputeCenters(data *obiutils.Matrix[float64], k int, classes []int) *obiut centers.Init(0.0) ns := make([]int, k) + var wg sync.WaitGroup + for i := range ns { ns[i] = 0 } - for i, row := range *data { - ns[classes[i]]++ - for j, val := range row { - centers[classes[i]][j] += val + // Goroutine code + goroutine := func(centerIdx int) { + defer wg.Done() + for j, row := range *data { + class := classes[j] + if class == centerIdx { + ns[centerIdx]++ + for l, val := range row { + centers[centerIdx][l] += val + } + } } } + for i := 0; i < k; i++ { + wg.Add(1) + go goroutine(i) + } + + wg.Wait() + for i := range centers { for j := range centers[i] { centers[i][j] /= float64(ns[i]) @@ -66,63 +114,73 @@ func ComputeCenters(data *obiutils.Matrix[float64], k int, classes []int) *obiut return ¢ers } -// ComputeInertia computes the inertia of the given data and centers. +// ComputeInertia computes the inertia of the given data and centers in parallel. // // Parameters: // - data: A pointer to a Matrix of float64 representing the data. +// - classes: A slice of int representing the class labels for each data point. // - centers: A pointer to a Matrix of float64 representing the centers. // // Return type: // - float64: The computed inertia. func ComputeInertia(data *obiutils.Matrix[float64], classes []int, centers *obiutils.Matrix[float64]) float64 { - inertia := 0.0 - for i, row := range *data { - for j, val := range row { - inertia += math.Pow(val-(*centers)[classes[i]][j], 2) - } + inertia := make(chan float64) + numRows := len(*data) + wg := sync.WaitGroup{} + wg.Add(numRows) + + for i := 0; i < numRows; i++ { + go func(i int) { + defer wg.Done() + row := (*data)[i] + class := classes[i] + center := (*centers)[class] + inertiaLocal := 0.0 + for j, val := range row { + diff := val - center[j] + inertiaLocal += diff * diff + } + inertia <- inertiaLocal + }(i) } - return inertia + + go func() { + wg.Wait() + close(inertia) + }() + + totalInertia := 0.0 + for localInertia := range inertia { + totalInertia += localInertia + } + + return totalInertia } -// Kmeans performs the k-means clustering algorithm on the given data. +// Kmeans performs the K-means clustering algorithm on the given data. // // if centers and *center is not nil, centers is considered as initialized // and the number of classes (k) is set to the number of rows in centers. // overwise, the number of classes is defined by the value of k. // // Parameters: -// - data: A pointer to a matrix containing the input data. -// - k: An integer representing the number of clusters. -// - centers: A pointer to a matrix representing the initial cluster centers. +// - data: A pointer to a Matrix[float64] that represents the input data. +// - k: An integer that specifies the number of clusters to create. +// - threshold: A float64 value that determines the convergence threshold. +// - centers: A pointer to a Matrix[float64] that represents the initial cluster centers. // // Returns: -// - A slice of integers representing the assigned class labels for each data point. -// - A pointer to a matrix representing the final cluster centers. - +// - classes: A slice of integers that assigns each data point to a cluster. +// - centers: A pointer to a Matrix[float64] that contains the final cluster centers. +// - inertia: A float64 value that represents the overall inertia of the clustering. +// - converged: A boolean value indicating whether the algorithm converged. func Kmeans(data *obiutils.Matrix[float64], k int, - // Kmeans performs the K-means clustering algorithm on the given data. - // - // if centers and *center is not nil, centers is considered as initialized - // and the number of classes (k) is set to the number of rows in centers. - // overwise, the number of classes is defined by the value of k. - // - // Parameters: - // - data: A pointer to a Matrix[float64] that represents the input data. - // - k: An integer that specifies the number of clusters to create. - // - threshold: A float64 value that determines the convergence threshold. - // - centers: A pointer to a Matrix[float64] that represents the initial cluster centers. - // - // Returns: - // - classes: A slice of integers that assigns each data point to a cluster. - // - centers: A pointer to a Matrix[float64] that contains the final cluster centers. - // - inertia: A float64 value that represents the overall inertia of the clustering. - // - converged: A boolean value indicating whether the algorithm converged. threshold float64, centers *obiutils.Matrix[float64]) ([]int, *obiutils.Matrix[float64], float64, bool) { if centers == nil || *centers == nil { *centers = obiutils.Make2DArray[float64](k, len((*data)[0])) - center_ids := SampleIntWithoutReplacemant(k, len(*data)) + center_ids := SampleIntWithoutReplacement(k, len(*data)) for i, id := range center_ids { (*centers)[i] = (*data)[id] } @@ -146,31 +204,45 @@ func Kmeans(data *obiutils.Matrix[float64], return classes, centers, inertia, delta < threshold } -// KmeansBestRepresentative finds the best representative among the data point of each cluster. +// KmeansBestRepresentative finds the best representative among the data point of each cluster in parallel. // // It takes a matrix of data points and a matrix of centers as input. // The best representative is the data point that is closest to the center of the cluster. // Returns an array of integers containing the index of the best representative for each cluster. func KmeansBestRepresentative(data *obiutils.Matrix[float64], centers *obiutils.Matrix[float64]) []int { - best_dist_to_centers := make([]float64, len(*centers)) - best_representative := make([]int, len(*centers)) + bestRepresentative := make([]int, len(*centers)) - for i := range best_dist_to_centers { - best_dist_to_centers[i] = math.MaxFloat64 + var wg sync.WaitGroup + wg.Add(len(*centers)) + + for j, center := range *centers { + go func(j int, center []float64) { + defer wg.Done() + + bestDistToCenter := math.MaxFloat64 + best := -1 + + for i, row := range *data { + dist := 0.0 + for d, val := range row { + diff := val - center[d] + dist += diff * diff + } + if dist < bestDistToCenter { + bestDistToCenter = dist + best = i + } + } + + if best == -1 { + log.Fatalf("No representative found for cluster %d", j) + } + + bestRepresentative[j] = best + }(j, center) } - for i, row := range *data { - for j, center := range *centers { - dist := 0.0 - for d, val := range row { - dist += math.Pow(val-center[d], 2) - } - if dist < best_dist_to_centers[j] { - best_dist_to_centers[j] = dist - best_representative[j] = i - } - } - } + wg.Wait() - return best_representative + return bestRepresentative } diff --git a/pkg/obistats/random.go b/pkg/obistats/random.go index 5ecbf45..f73ead4 100644 --- a/pkg/obistats/random.go +++ b/pkg/obistats/random.go @@ -2,7 +2,17 @@ package obistats import "math/rand" -func SampleIntWithoutReplacemant(n, max int) []int { +// SampleIntWithoutReplacement generates a random sample of unique integers without replacement. +// +// Generates a random sample of n unique integers without replacement included in the range [0, max). +// +// Parameters: +// - n: the number of integers to generate. +// - max: the maximum value for the generated integers. +// +// Returns: +// - []int: a slice of integers containing the generated sample. +func SampleIntWithoutReplacement(n, max int) []int { draw := make(map[int]int, n) diff --git a/pkg/obitools/obilandmark/obilandmark.go b/pkg/obitools/obilandmark/obilandmark.go index bec88a6..6609995 100644 --- a/pkg/obitools/obilandmark/obilandmark.go +++ b/pkg/obitools/obilandmark/obilandmark.go @@ -1,7 +1,6 @@ package obilandmark import ( - "math" "os" "sort" "sync" @@ -16,6 +15,18 @@ import ( log "github.com/sirupsen/logrus" ) +// MapOnLandmarkSequences performs sequence mapping on a given library of bio sequences. +// +// Computes for each sequence in the library a descriptor vector containing describing the sequence +// as the set of its distances to every landmark sequence. +// +// Parameters: +// - library: A slice of bio sequences to be mapped. +// - landmark_idx: A list of indices representing landmark sequences. +// - sizes: Optional argument specifying the number of workers to use. +// +// Returns: +// - seqworld: A matrix of float64 values representing the mapped coordinates. func MapOnLandmarkSequences(library obiseq.BioSequenceSlice, landmark_idx []int, sizes ...int) obiutils.Matrix[float64] { nworkers := obioptions.CLIParallelWorkers() @@ -73,6 +84,20 @@ func MapOnLandmarkSequences(library obiseq.BioSequenceSlice, landmark_idx []int, return seqworld } +// CLISelectLandmarkSequences selects landmark sequences from the given iterator and assigns attributes to the sequences. +// +// The fonction annotate the input set of sequences with two or three attributes: +// - 'landmark_id' indicating which sequence was selected and to which landmark it corresponds. +// - 'landmark_coord' indicates the coordinates of the sequence. +// - 'landmark_class' indicates to which landmark (landmark_id) the sequence is the closest. +// +// Parameters: +// - iterator: an object of type obiiter.IBioSequence representing the iterator to select landmark sequences from. +// +// Returns: +// - an object of type obiiter.IBioSequence providing the input sequence annotated with their coordinates respectively to +// each selected landmark sequences and with an attribute 'landmark_id' indicating which sequence was selected and to +// which landmark it corresponds. func CLISelectLandmarkSequences(iterator obiiter.IBioSequence) obiiter.IBioSequence { library := iterator.Load() @@ -80,14 +105,14 @@ func CLISelectLandmarkSequences(iterator obiiter.IBioSequence) obiiter.IBioSeque library_size := len(library) n_landmark := NCenter() - landmark_idx := obistats.SampleIntWithoutReplacemant(n_landmark, library_size) + landmark_idx := obistats.SampleIntWithoutReplacement(n_landmark, library_size) log.Infof("Library contains %d sequence", len(library)) var seqworld obiutils.Matrix[float64] for loop := 0; loop < 5; loop++ { sort.IntSlice(landmark_idx).Sort() - log.Infof("Selected indices : %v", landmark_idx) + log.Debugf("Selected indices : %v", landmark_idx) seqworld = MapOnLandmarkSequences(library, landmark_idx) initialCenters := obiutils.Make2DArray[float64](n_landmark, n_landmark) @@ -100,8 +125,11 @@ func CLISelectLandmarkSequences(iterator obiiter.IBioSequence) obiiter.IBioSeque dist_centers := 0.0 for i := 0; i < n_landmark; i++ { + center := (*centers)[i] + icenter := initialCenters[i] for j := 0; j < n_landmark; j++ { - dist_centers += math.Pow((*centers)[i][j]-initialCenters[i][j], 2) + diff := center[j] - icenter[j] + dist_centers += diff * diff } } @@ -112,7 +140,7 @@ func CLISelectLandmarkSequences(iterator obiiter.IBioSequence) obiiter.IBioSeque sort.IntSlice(landmark_idx).Sort() - log.Infof("Selected indices : %v", landmark_idx) + log.Debugf("Selected indices : %v", landmark_idx) seqworld = MapOnLandmarkSequences(library, landmark_idx) seq_landmark := make(map[int]int, n_landmark)