26 #include "density_clustering_mpi.hpp"
34 namespace Clustering {
59 std::vector<std::size_t>
61 std::size_t n_nodes) {
62 auto young_gauss = [](std::size_t n) -> std::size_t {
65 std::size_t workload = young_gauss(n_rows) / n_nodes;
66 std::size_t last_index = 0;
67 std::vector<std::size_t> load_balanced_indices(n_nodes);
68 for (
int i=n_nodes-1; i >= 0; --i) {
70 load_balanced_indices[i] = 0;
72 last_index = (std::size_t) sqrt(2*(young_gauss(last_index) + workload));
73 load_balanced_indices[i] = n_rows - last_index;
76 return load_balanced_indices;
81 std::vector<std::size_t>
83 const std::size_t n_rows,
84 const std::size_t n_cols,
86 const int mpi_n_nodes,
87 const int mpi_node_id) {
88 std::vector<float> radii = {radius};
89 std::map<float, std::vector<std::size_t>> pop_map =
calculate_populations(coords, n_rows, n_cols, radii, mpi_n_nodes, mpi_node_id);
90 return pop_map[radius];
94 std::map<float, std::vector<std::size_t>>
96 const std::size_t n_rows,
97 const std::size_t n_cols,
98 std::vector<float> radii,
99 const int mpi_n_nodes,
100 const int mpi_node_id) {
101 std::vector<std::size_t> load_balanced_indices = triangular_load_balance(n_rows, mpi_n_nodes);
102 unsigned int i_row_from = load_balanced_indices[mpi_node_id];
103 unsigned int i_row_to;
104 if (mpi_node_id == mpi_n_nodes-1) {
108 i_row_to = load_balanced_indices[mpi_node_id+1];
110 std::map<float, std::vector<unsigned int>> pops;
111 for (
float rad: radii) {
112 pops[rad].resize(n_rows, 1);
114 std::sort(radii.begin(), radii.end(), std::greater<float>());
115 std::size_t n_radii = radii.size();
116 std::vector<float> rad2(n_radii);
117 for (std::size_t i=0; i < n_radii; ++i) {
118 rad2[i] = radii[i]*radii[i];
125 std::size_t i, j, k, l;
127 ASSUME_ALIGNED(coords);
128 #pragma omp parallel for default(none) private(i,j,k,l,c,dist) \
129 firstprivate(i_row_from,i_row_to,n_rows,n_cols,rad2,radii,n_radii) \
130 shared(coords,pops) \
131 schedule(dynamic,1024)
132 for (i=i_row_from; i < i_row_to; ++i) {
133 for (j=i+1; j < n_rows; ++j) {
135 #pragma simd reduction(+:dist)
136 for (k=0; k < n_cols; ++k) {
137 c = coords[i*n_cols+k] - coords[j*n_cols+k];
140 for (l=0; l < n_radii; ++l) {
141 if (dist < rad2[l]) {
143 pops[radii[l]][i] += 1;
145 pops[radii[l]][j] += 1;
155 std::map<float, std::vector<std::size_t>> pops_results;
156 for (
auto& rad_pops: pops) {
158 MPI_Barrier(MPI_COMM_WORLD);
161 for (
int slave_id=1; slave_id < mpi_n_nodes; ++slave_id) {
162 std::vector<unsigned int> pops_buf(n_rows);
163 MPI_Recv(pops_buf.data(), n_rows, MPI_UNSIGNED, slave_id, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
164 for (std::size_t i=0; i < n_rows; ++i) {
165 rad_pops.second[i] += pops_buf[i];
170 MPI_Send(rad_pops.second.data(), n_rows, MPI_UNSIGNED,
MAIN_PROCESS, 0, MPI_COMM_WORLD);
172 MPI_Barrier(MPI_COMM_WORLD);
174 MPI_Bcast(rad_pops.second.data(), n_rows, MPI_UNSIGNED,
MAIN_PROCESS, MPI_COMM_WORLD);
175 MPI_Barrier(MPI_COMM_WORLD);
177 pops_results[rad_pops.first].resize(n_rows);
178 for (std::size_t i=0; i < n_rows; ++i) {
179 pops_results[rad_pops.first][i] = (std::size_t) rad_pops.second[i] + 1;
186 std::tuple<Neighborhood, Neighborhood>
188 const std::size_t n_rows,
189 const std::size_t n_cols,
190 const std::vector<float>& free_energy,
191 const int mpi_n_nodes,
192 const int mpi_node_id) {
193 unsigned int rows_per_chunk = n_rows / mpi_n_nodes;
194 unsigned int i_row_from = mpi_node_id * rows_per_chunk;
195 unsigned int i_row_to = i_row_from + rows_per_chunk;
198 if (mpi_node_id == mpi_n_nodes-1 ) {
204 for (std::size_t i=i_row_from; i < i_row_to; ++i) {
205 nh[i] =
Neighbor(n_rows+1, std::numeric_limits<float>::max());
206 nh_high_dens[i] =
Neighbor(n_rows+1, std::numeric_limits<float>::max());
210 std::size_t i, j, c, min_j, min_j_high_dens;
211 float dist, d, mindist, mindist_high_dens;
212 ASSUME_ALIGNED(coords);
213 #pragma omp parallel for default(none) \
214 private(i,j,c,dist,d,mindist,mindist_high_dens,min_j,min_j_high_dens) \
215 firstprivate(i_row_from,i_row_to,n_rows,n_cols) \
216 shared(coords,nh,nh_high_dens,free_energy) \
217 schedule(dynamic, 2048)
218 for (i=i_row_from; i < i_row_to; ++i) {
219 mindist = std::numeric_limits<float>::max();
220 mindist_high_dens = std::numeric_limits<float>::max();
222 min_j_high_dens = n_rows+1;
223 for (j=0; j < n_rows; ++j) {
226 #pragma simd reduction(+:dist)
227 for (c=0; c < n_cols; ++c) {
228 d = coords[i*n_cols+c] - coords[j*n_cols+c];
232 if (dist < mindist) {
237 if (free_energy[j] < free_energy[i] && dist < mindist_high_dens) {
238 mindist_high_dens = dist;
244 nh_high_dens[i] =
Neighbor(min_j_high_dens, mindist_high_dens);
248 MPI_Barrier(MPI_COMM_WORLD);
254 while (nh.size() != n_rows) {
255 MPI_Recv(buf, BUF_SIZE, MPI_FLOAT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
256 nh[(int) buf[0]] =
Neighbor((
int) buf[1], buf[2]);
257 nh_high_dens[(int) buf[0]] =
Neighbor((
int) buf[3], buf[4]);
260 for (std::size_t i=i_row_from; i < i_row_to; ++i) {
262 buf[1] = (float) nh[i].first;
263 buf[2] = nh[i].second;
264 buf[3] = nh_high_dens[i].first;
265 buf[4] = nh_high_dens[i].second;
266 MPI_Send(buf, BUF_SIZE, MPI_FLOAT,
MAIN_PROCESS, 0, MPI_COMM_WORLD);
271 MPI_Barrier(MPI_COMM_WORLD);
274 int BUF_SIZE = 4*n_rows;
275 std::vector<float> buf(BUF_SIZE);
277 for (std::size_t i=0; i < n_rows; ++i) {
278 buf[4*i] = (float) nh[i].first;
279 buf[4*i+1] = nh[i].second;
280 buf[4*i+2] = (float) nh_high_dens[i].first;
281 buf[4*i+3] = nh_high_dens[i].second;
284 MPI_Bcast(buf.data(), BUF_SIZE, MPI_FLOAT,
MAIN_PROCESS, MPI_COMM_WORLD);
285 if (mpi_node_id != MAIN_PROCESS) {
287 for (std::size_t i=0; i < n_rows; ++i) {
288 nh[i] =
Neighbor((
int) buf[4*i], buf[4*i+1]);
289 nh_high_dens[i] =
Neighbor((
int) buf[4*i+2], buf[4*i+3]);
293 return std::make_tuple(nh, nh_high_dens);
297 std::set<std::size_t>
299 const std::size_t n_cols,
300 const std::vector<FreeEnergy>& sorted_fe,
301 const std::size_t i_frame,
302 const std::size_t limit,
303 const float max_dist,
304 const int mpi_n_nodes,
305 const int mpi_node_id) {
306 unsigned int rows_per_chunk = limit / mpi_n_nodes;
307 unsigned int i_row_from = mpi_node_id * rows_per_chunk;
308 unsigned int i_row_to = i_row_from + rows_per_chunk;
309 if (mpi_node_id == mpi_n_nodes-1) {
314 std::set<std::size_t> nh;
318 std::vector<int> frame_in_nh(limit, 0);
320 const std::size_t i_frame_sorted = sorted_fe[i_frame].first * n_cols;
322 ASSUME_ALIGNED(coords);
323 #pragma omp parallel for default(none) private(j,c,d,dist2) \
324 firstprivate(i_frame,i_frame_sorted,i_row_from,i_row_to,max_dist,n_cols) \
325 shared(coords,sorted_fe,frame_in_nh)
326 for (j=i_row_from; j < i_row_to; ++j) {
329 #pragma simd reduction(+:dist2)
330 for (c=0; c < n_cols; ++c) {
331 d = coords[i_frame_sorted+c] - coords[sorted_fe[j].first*n_cols+c];
334 if (dist2 < max_dist) {
340 for (j=i_row_from; j < i_row_to; ++j) {
341 if (frame_in_nh[j] > 0) {
347 MPI_Barrier(MPI_COMM_WORLD);
349 for (
int i=1; i < mpi_n_nodes; ++i) {
352 MPI_Probe(i, MPI_ANY_TAG, MPI_COMM_WORLD, &stat);
354 MPI_Get_count(&stat, MPI_INT, &n_neighbors);
355 std::vector<unsigned int> buf(n_neighbors);
356 MPI_Recv(buf.data(), n_neighbors, MPI_UNSIGNED, i, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
357 for (
auto i_neighbor: buf) {
358 nh.insert(i_neighbor);
364 std::vector<unsigned int> buf(nh.size());
365 std::copy(nh.begin(), nh.end(), buf.begin());
366 MPI_Send(buf.data(), buf.size(), MPI_UNSIGNED,
MAIN_PROCESS, MPI_ANY_TAG, MPI_COMM_WORLD);
369 unsigned int n_neighbors_total;
370 std::vector<unsigned int> buf;
372 n_neighbors_total = nh.size();
374 MPI_Barrier(MPI_COMM_WORLD);
375 MPI_Bcast(&n_neighbors_total, 1, MPI_UNSIGNED,
MAIN_PROCESS, MPI_COMM_WORLD);
376 buf.resize(n_neighbors_total);
379 std::copy(nh.begin(), nh.end(), buf.begin());
381 MPI_Bcast(buf.data(), n_neighbors_total, MPI_UNSIGNED,
MAIN_PROCESS, MPI_COMM_WORLD);
382 if (mpi_node_id != MAIN_PROCESS) {
385 for (
auto i_neighbor: buf) {
386 nh.insert(i_neighbor);
393 main(boost::program_options::variables_map args) {
395 MPI_Init(NULL, NULL);
397 MPI_Comm_size(MPI_COMM_WORLD, &n_nodes);
399 MPI_Comm_rank(MPI_COMM_WORLD, &node_id);
401 const std::string input_file = args[
"file"].as<std::string>();
409 std::tie(coords, n_rows, n_cols) = Clustering::Tools::read_coords<float>(input_file);
411 std::vector<float> free_energies;
412 if (args.count(
"free-energy-input")) {
417 }
else if (args.count(
"free-energy") || args.count(
"population") || args.count(
"output")) {
418 if (args.count(
"radii")) {
420 if (args.count(
"output")) {
422 std::cerr <<
"error: clustering cannot be done with several radii (-R is set)." << std::endl;
426 std::vector<float> radii = args[
"radii"].as<std::vector<float>>();
427 std::map<float, std::vector<std::size_t>> pops =
calculate_populations(coords, n_rows, n_cols, radii, n_nodes, node_id);
429 for (
auto radius_pops: pops) {
430 std::string basename_pop = args[
"population"].as<std::string>() +
"_%f";
431 std::string basename_fe = args[
"free-energy"].as<std::string>() +
"_%f";
437 if ( ! args.count(
"radius")) {
438 std::cerr <<
"error: radius (-r) is required!" << std::endl;
440 const float radius = args[
"radius"].as<
float>();
444 std::vector<std::size_t> pops =
calculate_populations(coords, n_rows, n_cols, radius, n_nodes, node_id);
445 if (node_id ==
MAIN_PROCESS && args.count(
"population")) {
446 Clustering::Tools::write_single_column<std::size_t>(args[
"population"].as<std::string>(), pops);
452 if (node_id ==
MAIN_PROCESS && args.count(
"free-energy")) {
453 Clustering::Tools::write_single_column<float>(args[
"free-energy"].as<std::string>(), free_energies,
true);
460 if (args.count(
"nearest-neighbors-input")) {
464 nh_high_dens = nh_pair.second;
465 }
else if (args.count(
"nearest-neighbors") || args.count(
"output")) {
467 auto nh_tuple =
nearest_neighbors(coords, n_rows, n_cols, free_energies, n_nodes, node_id);
468 nh = std::get<0>(nh_tuple);
469 nh_high_dens = std::get<1>(nh_tuple);
470 if (node_id ==
MAIN_PROCESS && args.count(
"nearest-neighbors")) {
475 if (args.count(
"output")) {
476 const std::string output_file = args[
"output"].as<std::string>();
477 std::vector<std::size_t> clustering;
478 if (args.count(
"input")) {
483 if (args.count(
"threshold") == 0) {
484 std::cerr <<
"error: need threshold value for initial clustering" << std::endl;
487 float threshold = args[
"threshold"].as<
float>();
491 if ( ! args[
"only-initial"].as<bool>()) {
492 Clustering::logger(std::cout) <<
"assigning low density states to initial clusters" << std::endl;
495 Clustering::logger(std::cout) <<
"writing clusters to file " << output_file << std::endl;
496 Clustering::Tools::write_single_column<std::size_t>(output_file, clustering);
const int MAIN_PROCESS
identify MPI process 0 as main process
std::vector< std::size_t > assign_low_density_frames(const std::vector< std::size_t > &initial_clustering, const Neighborhood &nh_high_dens, const std::vector< float > &free_energy)
std::vector< std::size_t > triangular_load_balance(std::size_t n_rows, std::size_t n_nodes)
std::tuple< Neighborhood, Neighborhood > nearest_neighbors(const float *coords, const std::size_t n_rows, const std::size_t n_cols, const std::vector< float > &free_energy, const int mpi_n_nodes, const int mpi_node_id)
Clustering::Tools::Neighborhood Neighborhood
map frame id to neighbors
std::ostream & logger(std::ostream &s)
std::vector< std::size_t > initial_density_clustering(const std::vector< float > &free_energy, const Neighborhood &nh, const float free_energy_threshold, const float *coords, const std::size_t n_rows, const std::size_t n_cols, const std::vector< std::size_t > initial_clusters)
std::vector< float > calculate_free_energies(const std::vector< std::size_t > &pops)
void main(boost::program_options::variables_map args)
Clustering::Tools::Neighbor Neighbor
matches neighbor's frame id to distance
std::vector< std::size_t > calculate_populations(const float *coords, const std::size_t n_rows, const std::size_t n_cols, const float radius, const int mpi_n_nodes, const int mpi_node_id)
std::set< std::size_t > high_density_neighborhood(const float *coords, const std::size_t n_cols, const std::vector< FreeEnergy > &sorted_fe, const std::size_t i_frame, const std::size_t limit, const float max_dist, const int mpi_n_nodes, const int mpi_node_id)