Intereting Posts
Существуют ли типы, такие как uint32, int32, uint64, int64, определенные в любом заголовке stdlib? Когда мы должны использовать утверждения в C? В C как мне печатать имя файла, перенаправленного как вход в оболочку каковы быстрые алгоритмы поиска дубликатов элементов в коллекции и их группировки? Эффективный способ удаления указанных символов из строки Как обрабатывать сокет, читаемый на C, когда удаленный сервер закрывает сокет, возможно, до того, как чтение будет закончено? Расширения `clang -ansi` Ошибка при чтении чисел из txt-файла в C Что-то не так с этим алгоритмом перетасовки? Печать наибольшего прайм-фактора композитного номера в C Какова временная сложность этого алгоритма умножения? Невозможное ограничение в ‘asm’: __asm__ __volatile порядок оценки || и && в c сравнение скорости между fgetc / fputc и fread / fwrite в C Как преобразовать unsigned long в строку

Многопроцессорность и трубы в C

Я пытаюсь научиться работать с fork() чтобы создавать новые процессы и pipes для связи с каждым процессом. Предположим, у меня есть список, содержащий 20 слов, и я создаю 3 процесса. Теперь мне нужно распределить слова между процессами, используя каналы, и каждый процесс сортирует список слов, которые он получает. То, как я хочу достичь этого, выглядит так:

 Word1 => Process1 Word2 => Process2 Word3 => Process3 Word4 => Process1 Word5 => Process2 Word6 => Process3 . . . 

Таким образом, каждый процесс будет иметь список слов для сортировки, и в конечном итоге я буду использовать MergeSort для объединения всех отсортированных списков, возвращаемых каждым процессом. Я не уверен, как использовать каналы для связи с каждым процессом (например, кормить каждый процесс словом). Любая помощь, которая поставит меня на правильный путь, будет оценена по достоинству.

Попробуйте этот код для размера. Он использует фиксированное количество дочерних процессов, но вы можете изменить это число, MAX_KIDS enum MAX_KIDS (он был в основном протестирован с этим набором в 3, позже я изменил его на 5, чтобы убедиться).

 #include  #include  #include  #include  #include  #include  #include  typedef struct Child { FILE *fp_to; FILE *fp_from; pid_t pid; } Child; enum { P_READ, P_WRITE }; /* Read, write descriptor of a pipe */ enum { MAX_LINE = 4096 }; static void be_childish(void); static void distribute(size_t nkids, Child *kids); static void err_exit(const char *fmt, ...); static void merge(size_t nkids, Child *kids); static void wait_for_kids(size_t nkids, Child *kids); static int make_kid(Child *kid) { int pipe1[2]; /* From parent to child */ int pipe2[2]; /* From child to parent */ if (pipe(pipe1) != 0) return -1; if (pipe(pipe2) != 0) { close(pipe1[P_READ]); close(pipe1[P_WRITE]); return -1; } if ((kid->pid = fork()) < 0) { close(pipe1[P_READ]); close(pipe1[P_WRITE]); close(pipe2[P_READ]); close(pipe2[P_WRITE]); return -1; } else if (kid->pid == 0) { dup2(pipe1[P_READ], STDIN_FILENO); dup2(pipe2[P_WRITE], STDOUT_FILENO); close(pipe1[P_READ]); close(pipe1[P_WRITE]); close(pipe2[P_READ]); close(pipe2[P_WRITE]); /* Reads standard input from parent; writes standard output to parent */ be_childish(); exit(0); } else { kid->fp_to = fdopen(pipe1[P_WRITE], "w"); kid->fp_from = fdopen(pipe2[P_READ], "r"); close(pipe1[P_READ]); close(pipe2[P_WRITE]); return 0; } } int main(void) { enum { NUM_KIDS = 5 }; Child kids[NUM_KIDS]; struct sigaction act; sigfillset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = SIG_DFL; sigaction(SIGCHLD, &act, 0); for (int i = 0; i < NUM_KIDS; i++) { if (make_kid(&kids[i]) != 0) err_exit("Fault starting child %d\n", i); } distribute(NUM_KIDS, kids); merge(NUM_KIDS, kids); wait_for_kids(NUM_KIDS, kids); return(0); } static void err_exit(const char *fmt, ...) { va_list args; va_start(args, fmt); vfprintf(stderr, fmt, args); va_end(args); exit(1); } static int qs_compare(const void *v1, const void *v2) { const char *s1 = *(char **)v1; const char *s2 = *(char **)v2; return(strcmp(s1, s2)); } static char *estrdup(const char *str) { size_t len = strlen(str) + 1; char *out = malloc(len); if (out == 0) err_exit("Out of memory!\n"); memmove(out, str, len); return(out); } static void be_childish(void) { char **lines = 0; size_t num_lines = 0; size_t max_lines = 0; char input[MAX_LINE]; while (fgets(input, sizeof(input), stdin) != 0) { if (num_lines >= max_lines) { size_t n = (2 * max_lines + 2); void *space = realloc(lines, n * sizeof(char *)); if (space == 0) err_exit("Out of memory!\n"); lines = space; max_lines = n; } lines[num_lines++] = estrdup(input); } qsort(lines, num_lines, sizeof(char *), qs_compare); for (size_t i = 0; i < num_lines; i++) { if (fputs(lines[i], stdout) == EOF) err_exit("Short write to parent from %d\n", (int)getpid()); } exit(0); } static void distribute(size_t nkids, Child *kids) { char input[MAX_LINE]; size_t n = 0; while (fgets(input, sizeof(input), stdin) != 0) { if (fputs(input, kids[n].fp_to) == EOF) err_exit("Short write to child %d\n", (int)kids[n].pid); if (++n >= nkids) n = 0; } /* Close pipes to children - let's them get on with sorting */ for (size_t i = 0; i < nkids; i++) { fclose(kids[i].fp_to); kids[i].fp_to = 0; } } static void read_line(Child *kid, char *buffer, size_t maxlen, int *length) { if (fgets(buffer, maxlen, kid->fp_from) != 0) { *length = strlen(buffer); buffer[*length] = '\0'; } else { buffer[0] = '\0'; *length = -1; fclose(kid->fp_from); kid->fp_from = 0; } } static int not_all_done(size_t nkids, int *lengths) { for (size_t i = 0; i < nkids; i++) { if (lengths[i] > 0) return 1; } return 0; } static void min_line(size_t nkids, int *len, char **lines, size_t maxlen, Child *kids, char *output) { size_t min_kid = 0; char *min_str = 0; for (size_t i = 0; i < nkids; i++) { if (len[i] <= 0) continue; if (min_str == 0 || strcmp(min_str, lines[i]) > 0) { min_str = lines[i]; min_kid = i; } } strcpy(output, min_str); read_line(&kids[min_kid], lines[min_kid], maxlen, &len[min_kid]); } static void merge(size_t nkids, Child *kids) { char line_data[nkids][MAX_LINE]; char *lines[nkids]; int len[nkids]; char output[MAX_LINE]; for (size_t i = 0; i < nkids; i++) lines[i] = line_data[i]; /* Preload first line from each kid */ for (size_t i = 0; i < nkids; i++) read_line(&kids[i], lines[i], MAX_LINE, &len[i]); while (not_all_done(nkids, len)) { min_line(nkids, len, lines, MAX_LINE, kids, output); fputs(output, stdout); } } static void wait_for_kids(size_t nkids, Child *kids) { int pid; int status; while ((pid = waitpid(-1, &status, 0)) != -1) { for (size_t i = 0; i < nkids; i++) { if (pid == kids[i].pid) kids[i].pid = -1; } } /* This check loop is not really necessary */ for (size_t i = 0; i < nkids; i++) { if (kids[i].pid != -1) err_exit("Child %d died without being tracked\n", (int)kids[i].pid); } } 

Общая картина обычно:

 pid_t pids[3]; int fd[3][2]; int i; for (i = 0; i < 3; ++i) { /* create the pipe */ if (pipe(fd[i]) < 0) { perror("pipe error"); exit(1); } /* fork the child */ pid[i] = fork(); if (pid[i] < 0) { perror("fork error"); } else if (pid[i] > 0) { /* in parent process */ /* close reading end */ close(fd[i][0]); } else { /* in child process */ /* close writing end */ close(fd[i][1]); /* read from parent */ read(fd[i][0], line, max); ... } } /* in parent process */ char words[100][10] = {...}; int j, child = 0; /* for all words */ for (j = 0; j < 100; ++j) { /* write to child */ write(fd[child][1], words[j], strlen(words[j])); ... ++child; if (child >= 3) child = 0; } 

Дублируйте часть трубы для передачи назад от дочернего к родительскому. Будьте осторожны, чтобы не зайти в тупик, когда родители и ребенок пытаются общаться в обоих направлениях одновременно.

В трубах нет ничего магического – это всего лишь коммуникационная среда с двумя конечными точками. Логика идет примерно так:

Создайте 3 трубки и удерживайте их на одной конечной точке. Вилка три раза, и каждый из этих раздвоенных детей держится на другом конце трубы. Затем ребенок переходит в цикл чтения, ожидая ввода и записи вывода. Родитель может объединять все выходы, а затем выполнять циклическое считывание входных данных. Это не самая красивая страtagsя, но она, безусловно, самая простая. т.е.

 while there is work left to do: for i in 1..3 write current work unit to pipe[i] for i in 1..3 read back response from pipe[i] 

Данный ребенок выглядит следующим образом:

 while(input = read from pipe) result = do work on input write result to pipe 

Следующий шаг будет делать ваш обратный отсчет в родительском процессе асинхронным, неблокирующим образом (возможно, с использованием select или только цикла опроса занятого ожидания). Для этого требуется, чтобы дети сообщали, на какую задачу они возвращают результат, потому что упорядочение может стать беспорядочным (т. Е. Вы больше не можете полагаться на первую рабочую единицу, которую вы отправляете, – это первый ответ, который вы получаете). Добро пожаловать в забавный мир ошибок параллелизма.

Учитывая несколько неопределенный характер вашего вопроса, я надеюсь, что это как-то полезно.