MPI асинхронная / односторонняя связь

У меня есть ситуация, аналогичная приведенному ниже коду: рабочие процессы работают над подмножеством данных и должны отправить неизвестный объем данных обратно хозяину. Возможно ли, чтобы хозяин подождал и получил неизвестное количество сообщений от рабочих процессов? Есть ли способ сделать это с помощью одностороннего общения? Заранее спасибо!

#include  #include  #include  #include  #include  /* sample run/output: $mpirun -np 5 practice.exe @[1]: i=30 @[2]: i=0 @[2]: i=75 @[4]: i=40 @[4]: i=55 @[3]: i=85 @[3]: i=65 */ int main(int argc, char *argv[]) { int i, rank, size, np, nw, num; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &np); nw = np -1; srand(time(NULL)*rank); if (rank > 0) { for (i=(rank-1); i<(nw*10); i+=nw) { num = rand() % 100; if (num % 5 == 0) { printf("@[%d]: i=%d\n", rank, num); // SEND num TO MASTER } } } else { // RECEIVE num FROM WORKER } MPI_Finalize(); return EXIT_SUCCESS; } 

Конечно, есть много способов сделать это, но на самом деле это не имеет ничего общего с асинхронной связью. Вы можете сделать это с помощью односторонней связи, но даже у этого есть свои проблемы с этим (вам все равно придется угадывать, сколько полной памяти потребуется для данных).

Один из способов сделать это – просто выяснить, сколько данных у вас есть, отправить это вперед хозяину, чтобы он знал, сколько сообщений получать, а затем отправлять свои данные по одному:

 #include  #include  #include  #include  #define MAXPERWORKER 10 #define TAG_NUM_INCOMING 1 #define TAG_DATA 2 int main(int argc, char *argv[]) { int i, rank, size, np, nw, num; int mynums[MAXPERWORKER], numcount, total; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &np); nw = np -1; srand(time(NULL)*rank); if (rank > 0) { numcount = 0; total = 0; for (i=(rank-1); i<(nw*10); i+=nw) { num = rand() % 100; if (num % 3 == 0) { printf("@[%d]: i=%d\n", rank, num); mynums[numcount] = num; numcount++; total += num; } } /* of course, in this case we could just * do this in one message, but.. */ MPI_Send(&numcount, 1, MPI_INT, 0, TAG_NUM_INCOMING, MPI_COMM_WORLD); for (i=0; i 

Запустив это на 4 процесса, я получаю:

 $ mpirun -np 4 ./masterworker1 @[1]: i=39 @[1]: i=81 @[3]: i=9 @[3]: i=45 @[3]: i=0 @[3]: i=57 @[3]: Total of all nums is 111 @[1]: Total of all nums is 120 From [ 1]: 39 81 | 120 From [ 2]: 24 6 39 | 69 From [ 3]: 9 45 0 57 | 111 @[2]: i=24 @[2]: i=6 @[2]: i=39 @[2]: Total of all nums is 69 

Однако это может оказаться невыполнимым - вы можете не захотеть буферизовать все ваши данные, как это (и если бы вы могли, вы могли бы просто отправить его в одном сообщении).

Другой подход - отправить данные, а затем отправить специальное сообщение, когда вы закончите отправлять данные, и мастер просто продолжает получать, пока не услышит одно из этих сообщений «Готово» от каждого рабочего:

 #include  #include  #include  #include  #define MAXPERWORKER 10 #define TAG_DATA 2 #define TAG_DONE 1 int main(int argc, char *argv[]) { int i, rank, size, np, nw, num; int mynums[MAXPERWORKER], numcount, total; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &np); nw = np -1; srand(time(NULL)*rank); if (rank > 0) { numcount = 0; total = 0; for (i=(rank-1); i<(nw*10); i+=nw) { num = rand() % 100; if (num % 3 == 0) { printf("@[%d]: i=%d\n", rank, num); total += num; MPI_Send(&num, 1, MPI_INT, 0, TAG_DATA, MPI_COMM_WORLD); } } MPI_Send(&num, 1, MPI_INT, 0, TAG_DONE, MPI_COMM_WORLD); printf("@[%d]: Total of all nums is %d\n", rank, total); } else { int *totals = malloc(sizeof(int)*nw); int *counts = malloc(sizeof(int)*nw); int **data = malloc(sizeof(int *)*nw); int rcv; int j; int workernum; int stillsending; MPI_Status status; for (i=0; i 0) { MPI_Recv(&rcv, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status); workernum = status.MPI_SOURCE-1; if (status.MPI_TAG == TAG_DONE) { stillsending--; } else if (status.MPI_TAG == TAG_DATA) { data[workernum][counts[workernum]] = rcv; totals[workernum] += rcv; counts[workernum]++; } } /* print results */ for (i=0; i 

Снова по 4 задачам я получаю:

 $ mpirun -np 4 ./masterworker2 @[1]: i=63 @[1]: i=99 @[1]: i=60 @[1]: i=69 @[1]: i=21 @[1]: i=48 @[1]: i=24 @[1]: Total of all nums is 384 @[2]: i=39 @[2]: i=84 @[2]: i=63 @[2]: Total of all nums is 186 @[3]: i=3 @[3]: i=51 @[3]: i=36 @[3]: Total of all nums is 90 From [ 1]: 63 99 60 69 21 48 24 | 384 From [ 2]: 39 84 63 | 186 From [ 3]: 3 51 36 | 90 

Обратите внимание, что в обоих случаях я полагался на некоторый массив размера MAXPERWORKER, чтобы предварительно распределять вещи; вам это действительно не нужно, но вы можете malloc массив и realloc при необходимости или использовать std :: vector вещь, если вы хотите использовать C ++.