#include <iostream>
#include <algorithm>
#include <mpi.h>
#include <vector>
#include <sys/time.h>
using namespace std;
// Worker loop: repeatedly receives g * arr_len floats from rank 0, sorts the
// received row(s) and sends them back with the same tag; returns once a
// message arrives whose tag satisfies tag + g > arr_num.
void mpi_sort(vector<float>* B, int arr_num, int arr_len, int g);
// Resizes each of the arr_num rows of arr to arr_len elements and fills them
// with rand() % 10 after reseeding from the current time.
void matrix_inti(vector<float>* arr, int arr_num, int arr_len);
// Prints the first arr_len elements of each of the arr_num rows, one row per line.
void show(vector<float>* arr, int arr_num, int arr_len);
/**
 * Master/worker row sort.
 *
 * Rank 0 hands out groups of g rows of a random arr_num x arr_len matrix to
 * the other ranks, collects the sorted rows, prints the matrix and the elapsed
 * time in milliseconds. Every other rank runs mpi_sort().
 *
 * Message protocol (shared with mpi_sort):
 *   - a task is g * arr_len floats whose tag is the index of its first row;
 *   - a result comes back with the same tag;
 *   - a message tagged arr_num + 1 tells the worker to stop.
 */
int main(int argc, char* argv[]) {
    MPI_Init(&argc, &argv);

    int rank = 0;
    int size = 0;
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    const int arr_num = 9;             // number of rows
    const int arr_len = 9;             // floats per row
    const int g = 1;                   // rows per task
    const int chunk = g * arr_len;     // floats per message
    const int stop_tag = arr_num + 1;  // satisfies the worker's tag + g > arr_num test

    // A vector of rows instead of a variable-length array of vectors, which is
    // a compiler extension. Every rank builds it because mpi_sort() uses the
    // rows as scratch space.
    std::vector<std::vector<float> > B(arr_num);
    matrix_inti(B.data(), arr_num, arr_len);

    if (rank == 0) {
        // Rows are separate allocations, so a task is staged in one contiguous
        // buffer rather than sent straight out of (or received into) a row.
        std::vector<float> buffer(chunk);
        MPI_Status status;
        struct timeval start, end;
        int next_row = 0;      // first row of the next task to hand out
        int busy_workers = 0;  // workers that still owe a result

        const auto send_task = [&](int worker) {
            for (int r = 0; r < g; ++r) {
                const std::vector<float>& row = B[next_row + r];
                std::copy(row.begin(), row.begin() + arr_len, buffer.begin() + r * arr_len);
            }
            MPI_Send(buffer.data(), chunk, MPI_FLOAT, worker, next_row, MPI_COMM_WORLD);
            next_row += g;
            ++busy_workers;
        };
        const auto send_stop = [&](int worker) {
            // Payload is ignored by the worker; only the tag matters.
            MPI_Send(buffer.data(), chunk, MPI_FLOAT, worker, stop_tag, MPI_COMM_WORLD);
        };

        gettimeofday(&start, nullptr);

        // Seed every worker with one task; workers beyond the number of tasks
        // are dismissed immediately instead of being sent out-of-range rows.
        for (int worker = 1; worker < size; ++worker) {
            if (next_row + g <= arr_num)
                send_task(worker);
            else
                send_stop(worker);
        }

        // Each returned result frees a worker: give it the next task, or stop
        // it when no full task is left.
        while (busy_workers > 0) {
            MPI_Recv(buffer.data(), chunk, MPI_FLOAT, MPI_ANY_SOURCE, MPI_ANY_TAG,
                     MPI_COMM_WORLD, &status);
            --busy_workers;
            for (int r = 0; r < g; ++r) {
                std::copy(buffer.begin() + r * arr_len,
                          buffer.begin() + (r + 1) * arr_len,
                          B[status.MPI_TAG + r].begin());
            }
            if (next_row + g <= arr_num)
                send_task(status.MPI_SOURCE);
            else
                send_stop(status.MPI_SOURCE);
        }

        // Rows never handed to a worker (no workers at all, or a trailing
        // group smaller than g) are sorted here so the output is complete.
        for (int row = next_row; row < arr_num; ++row)
            std::stable_sort(B[row].begin(), B[row].end());

        gettimeofday(&end, nullptr);
        show(B.data(), arr_num, arr_len);
        const long total_time =
            (1000000 * (end.tv_sec - start.tv_sec) + end.tv_usec - start.tv_usec) / 1000;
        printf("total time is %ld\n", total_time);
    } else {
        mpi_sort(B.data(), arr_num, arr_len, g);
    }

    // Every rank, not only rank 0, must finalize MPI before exiting.
    MPI_Finalize();
    return 0;
}
/**
 * Worker loop run by every rank other than 0.
 *
 * Blocks waiting for a task from rank 0, sorts each of its g rows in
 * ascending order and sends the rows back with the tag they arrived with.
 *
 * @param B        scratch rows; B[0..g) hold the most recently sorted group
 *                 on return. Must point to at least g vectors.
 * @param arr_num  total number of rows in the matrix; used only to recognise
 *                 the stop message.
 * @param arr_len  number of floats per row.
 * @param g        number of rows per task.
 */
void mpi_sort(std::vector<float>* B, int arr_num, int arr_len, int g) {
    MPI_Status status;
    const int chunk = g * arr_len;

    // Rows of B are separate allocations, so a g-row message cannot be
    // received straight into &B[0][0] without overrunning row 0 when g > 1.
    // Stage the message in one contiguous buffer instead.
    std::vector<float> buffer(chunk);

    while (true) {
        MPI_Recv(buffer.data(), chunk, MPI_FLOAT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

        // The tag is the index of the task's first row. A tag for which
        // tag + g exceeds arr_num cannot be a real task: it is the stop signal.
        if (status.MPI_TAG + g > arr_num)
            return;

        for (int i = 0; i < g; ++i) {
            const std::vector<float>::iterator first = buffer.begin() + i * arr_len;
            B[i].assign(first, first + arr_len);
            std::stable_sort(B[i].begin(), B[i].end());
            std::copy(B[i].begin(), B[i].end(), first);
        }

        MPI_Send(buffer.data(), chunk, MPI_FLOAT, 0, status.MPI_TAG, MPI_COMM_WORLD);
    }
}
/**
 * Fills a matrix, stored as an array of row vectors, with random digits.
 *
 * The generator is reseeded from the current time on every call, then each of
 * the arr_num rows is resized to arr_len elements and filled left to right
 * with rand() % 10.
 *
 * @param arr      pointer to the first of arr_num row vectors.
 * @param arr_num  number of rows to fill.
 * @param arr_len  number of elements each row is resized to.
 */
void matrix_inti(std::vector<float>* arr, int arr_num, int arr_len) {
    srand(unsigned(time(nullptr)));

    int row = 0;
    while (row < arr_num) {
        std::vector<float>& cells = arr[row];
        cells.resize(arr_len);
        // One rand() call per cell, in column order.
        std::generate_n(cells.begin(), arr_len, [] { return rand() % 10; });
        ++row;
    }
}
/**
 * Prints a matrix stored as an array of row vectors to standard output.
 *
 * Each row goes on its own line; every element is written with "%f" followed
 * by a single space.
 *
 * @param arr      pointer to the first of arr_num row vectors.
 * @param arr_num  number of rows to print.
 * @param arr_len  number of leading elements printed from each row.
 */
void show(std::vector<float>* arr, int arr_num, int arr_len) {
    int row = 0;
    while (row < arr_num) {
        std::vector<float>& cells = arr[row];
        int col = 0;
        while (col < arr_len) {
            printf("%f ", cells[col]);
            ++col;
        }
        printf("\n");
        ++row;
    }
}