Intereting Posts
Opencv 2.3.1 невозможно прочитать изображение имя переменной, аргументы функции во время выполнения в C В чем разница между выходом (0) и выходом (1) в C? Максимальная длина идентификатора Максимальное количество streamов для каждого процесса – sysconf (_SC_THREAD_THREADS_MAX) сбой C: «Исключение с плавающей запятой» в коде, который не содержит поплавков Может ли реализация, совместимая с ANSI C, включать дополнительные функции в свою стандартную библиотеку? Получение ошибки «ожидаемый идентификатор» или «(до« {»токена в C Запись streamа RTSP с помощью FFmpeg libavformat Возможно ли разместить CLR в программе на C? Получение собственного внешнего IP-адреса в POSIX C Переключение endiannes в C Совместимость типов указателей функции C Использование fseek и ftell для определения размера файла имеет уязвимость? Ошибка сегментации при выполнении программы, скомпилированной из сборки x86?

Продюсер-потребитель, использующий OpenMP-задачи

Я пытаюсь реализовать параллельный алгоритм, используя задачи в OpenMP. Параллельный шаблон программирования основан на идее производителя-потребителя, но поскольку потребительский процесс медленнее, чем производитель, я хочу использовать несколько производителей и несколько потребителей. Основная идея состоит в том, чтобы создать столько же streamов ОС, сколько и производителей, и тогда каждая из них создаст задачи, которые будут выполняться параллельно (потребителями). Каждый производитель будет связан с пропорциональным количеством потребителей (например, numCheckers / numSeekers). Я запускаю алгоритм на двухъядерном сервере Intel с 6 ядрами на чип. Дело в том, что когда я использую только одного продюсера (ищущего) и все большее число потребителей (шашек), производительность быстро падает, поскольку число потребителей растет (см. Таблицу ниже), хотя правильное количество ядер работает на 100%. С другой стороны, если я увеличиваю количество производителей, среднее время уменьшается или, по крайней мере, остается стабильным даже при пропорциональном числе потребителей. Мне кажется, что все улучшение происходит за счет разделения входных данных между производителями, и задачи только прослушиваются. Но опять же, у меня нет никаких объяснений поведению с одним продюсером. Я что-то упустил в логике OpenMP-Task? Я делаю что-то неправильно?

------------------------------------------------------------------------- | producers | consumers | time | ------------------------------------------------------------------------- | 1 | 1 | 0.642935 | | 1 | 2 | 3.004023 | | 1 | 3 | 5.332524 | | 1 | 4 | 7.222009 | | 1 | 5 | 9.472093 | | 1 | 6 | 10.372389 | | 1 | 7 | 12.671839 | | 1 | 8 | 14.631013 | | 1 | 9 | 14.500603 | | 1 | 10 | 18.034931 | | 1 | 11 | 17.835978 | ------------------------------------------------------------------------- | 2 | 2 | 0.357881 | | 2 | 4 | 0.361383 | | 2 | 6 | 0.362556 | | 2 | 8 | 0.359722 | | 2 | 10 | 0.358816 | ------------------------------------------------------------------------- 

Основной раздел моего кода – это пара:

 int main( int argc, char** argv) { // ... process the input (read from file, etc...) const char *buffer_start[numSeekers]; int buffer_len[numSeekers]; //populate these arrays dividing the input //I need to do this because I need to overlap the buffers for //correctness, so I simple parallel-for it's not enough //Here is where I create the producers int num = 0; #pragma omp parallel for num_threads(numSeekers) reduction(+:num) for (int i = 0; i < numSeekers; i++) { num += seek(buffer_start[i], buffer_len[i]); } return (int*)num; } int seek(const char* buffer, int n){ int num = 0; //asign the same number of consumers for each producer #pragma omp parallel num_threads(numCheckers/numSeekers) shared(num) { //only one time for every producer #pragma omp single { for(int pos = 0; pos < n; pos += STEP){ if (condition(buffer[pos])){ #pragma omp task shared(num) { //check() is a sequential function num += check(buffer[pos]); } } } #pragma omp taskwait } return num; } 

Наблюдаемое поведение связано с тем, что у вас нет вложенных parallel областей. Случается, что в первом случае вы действительно испытываете ОГРОМНЫЕ накладные расходы на задачи OpenMP. Это, скорее всего, связано с тем, что check() не выполняет достаточную работу по сравнению с накладными расходами, которые вводит время выполнения OpenMP. Почему он так себя ведет с 1 и двумя продюсерами?

При работе с одним производителем внешняя parallel область выполняется только с одним streamом. Такие parallel области неактивны в соответствии со спецификацией OpenMP API, и они просто выполняют код внутри серийно (единственные служебные данные являются дополнительным вызовом функции и доступом к общим переменным через указатели). В этом случае внутренняя parallel область, хотя она вложена, в то время как вложенная параллельность отключена, становится активной и отталкивает множество задач. Задачи вводят относительно высокие накладные расходы, и эти накладные расходы увеличиваются с количеством streamов. С 1 потребителем внутренняя parallel область также неактивна и, следовательно, работает последовательно без накладных расходов.

При работе с двумя производителями внешняя parallel область активна и, следовательно, внутренняя parallel область оказывается неактивной (помните, что вложен ни один вложенный параллелизм не включен), и, как следствие, никакие задачи не создаются вообще – seek() просто запускается серийно. Накладные расходы не требуются, и код работает почти в два раза быстрее, чем один случай 1 производитель / 1. Время выполнения не зависит от количества потребителей, потому что внутренняя parallel область всегда неактивна , независимо от того, сколько streamов указано.

Насколько велики накладные расходы, которые задают задачи и согласованный доступ к общим переменным? Я создал простой синтетический тест, который выполняет следующий код:

 for (int i = 0; i < 10000000; i++) { ssum += sin(i*0.001); } 

Серийно это выполняется менее чем на секунду на процессоре Westmere с уровнем оптимизации по умолчанию GCC 4.7.2. Затем я представил задачи с использованием простых atomic конструкций для защиты доступа к общей переменной ssum :

 #pragma omp parallel { #pragma omp single for (int i = 0; i < 10000000; i++) { #pragma omp task { #pragma omp atomic ssum += sin(i*0.001); } } } 

(здесь нет необходимости в taskwait поскольку имеется точка планирования на неявном барьере в конце parallel области)

Я также создал более сложный вариант, который выполняет редукцию так же, как Массимилиано предложил:

 #define STRIDE 8 #pragma omp parallel { #pragma omp single for (int i = 0; i < 10000000; i++) { #pragma omp task { const int idx = omp_get_thread_num(); ssumt[idx*STRIDE] += sin(i*0.001); } } #pragma omp taskwait const int idx = omp_get_thread_num(); #pragma omp atomic ssum += ssumt[idx*STRIDE]; } 

Код был скомпилирован с GCC 4.7.2 следующим образом:

 g++ -fopenmp -o test.exe test.cc 

Запустив его в пакетном режиме (так что другие процессы не могут вмешиваться) в систему с двумя гнездами Westmere (всего 12 ядер) с различным количеством streamов и разнесением streamов в сокетах, для обоих кодов получается следующее время выполнения:

 Configuration ATOMIC Reduction ATOMIC slowdown 2 + 0 2,79 ±0,15 2,74 ±0,19 1,8% 1 + 1 2,72 ±0,21 2,51 ±0,22 8,4% <----- 6 + 0 10,14 ±0,01 10,12 ±0,01 0,2% 3 + 3 22,60 ±0,24 22,69 ±0,33 -0,4% 6 + 6 37,85 ±0,67 38,90 ±0,89 -2,7% 

(время выполнения задается в секундах, измеренное omp_get_wtime() , усредненное по 10 прогонов / стандартное отклонение, также показанное /; x + y в столбце Configuration означает x streamов в первом сокете и y streamов во втором сокете)

Как вы можете видеть, накладные расходы от задач огромны. Это намного выше, чем накладные расходы от использования atomic вместо применения сокращения к поточно-частным аккумуляторам. Кроме того, присваивающая часть atomic с += компилируется в блокированную команду сравнения и обмена ( LOCK CMPXCHG ) - не намного выше накладных расходов, чем вызов omp_get_thread_num() каждый раз.

Следует также отметить, что система с двумя гнездами Westmere - это NUMA, поскольку каждый процессор имеет собственную память и доступ к памяти другого процессора проходит через QPI-связь и, следовательно, с повышенной задержкой (и, возможно, с меньшей пропускной способностью). Поскольку переменная ssum разделяется в atomic случае, streamи, которые запускаются на втором процессоре, по существу делают удаленные запросы. Тем не менее, разница между обоими кодами незначительна (за исключением отмеченного случая с двумя streamами - я должен исследовать, почему), и atomic код даже начинает превосходить тот, у кого сокращение, когда число streamов становится выше.

В многомасштабных системах NUMA синхронизация в atomic подходе может стать большей нагрузкой, поэтому она добавляет блокировку на более медленный удаленный доступ. Одна из таких систем является одним из наших узлов, связанных с BCS. BCS (коммутатор Bull Coherence Switch) является запатентованным решением от Bull, которое использует XQPI (eXternal QPI) для объединения нескольких плат Nehalem-EX в единую систему, представляя три уровня NUMA на пути (локальная память, удаленная память на одной плате удаленная память на удаленной плате). При запуске на одной такой системе, состоящей из 4-х плат с 4-мя октакоровыми процессорами Nehalem-EX каждый (всего 128 ядер), atomic исполняемый файл работает на 1036 с (!!), тогда как подход сокращения работает на 1047 с, то есть оба выполняются примерно в одно и то же время (мое предыдущее утверждение о том, что atomic подход на 21,5% медленнее был вызван джиттером сервисов ОС во время измерения). Оба числа от одиночных прогонов и, следовательно, не вполне репрезентативны. Обратите внимание, что в этой системе ссылка XQPI вводит очень высокую задержку для межплатных сообщений QPI, и, таким образом, блокировка очень дорогая, но не такая дорогостоящая. Часть накладных расходов может быть устранена с помощью сокращения, но она должна быть выполнена должным образом. Во-первых, локальные копии редукционной переменной должны быть также локальными для узла NUMA, где stream выполняется, а во-вторых, следует найти способ не совершать вызовы omp_get_thread_num() . Эти два могут быть достигнуты разными способами, но самый простой способ - использовать переменные threadprivate :

 static double ssumt; #pragma omp threadprivate(ssumt) #pragma omp parallel { ssumt = 0.0; #pragma omp single for (int i = 0; i < 10000000; i++) { #pragma omp task { ssumt += sin(i*0.001); } } #pragma omp taskwait #pragma omp atomic ssum += ssumt; } 

Доступ к ssumt нуждается в защите, так как две задачи редко выполняются одновременно в одном и том же streamе (нужно будет исследовать, соответствует ли это спецификациям OpenMP). Эта версия кода выполняется для 972 с. Еще раз это не так далеко от 1036 с и происходит только от одного измерения (т. Е. Это может быть просто статистическая флуктуация), но теоретически это должно быть быстрее.

Уроки, чтобы забрать домой:

  • Прочтите спецификацию OpenMP о вложенности parallel областей. Вложенный параллелизм включен либо путем установки переменной окружения OMP_NESTED в true либо путем вызова omp_set_nested(1); , Если включено, уровень активной вложенности может контролироваться OMP_MAX_ACTIVE_LEVELS как указано Массимилиано.
  • Следите за расчетом данных и старайтесь не использовать их, используя самый простой подход. Не каждый раз использование более сложного подхода дает вам лучшую производительность.
  • Специальные системы обычно требуют специального программирования.
  • Если есть сомнения, используйте инструмент проверки нити, если он доступен. У Intel есть одна (коммерческая) и Solaris Studio (ранее известная как Sun Studio) у Oracle тоже одна (бесплатная, версия Linux, несмотря на название продукта).
  • Подумайте над головой! Попытайтесь разделить работу на достаточно большие куски, чтобы накладные расходы от создания миллионов созданных задач не отрицали полученный параллельный выигрыш.

Как уже указывал Христос в комментарии, вы должны включить вложенный параллелизм. Это делается при настройке переменных среды:

  • OMP_NESTED (который включает или отключает вложенный параллелизм)
  • OMP_MAX_ACTIVE_LEVELS (который контролирует максимальное количество вложенных активных параллельных областей)

С другой стороны, вместо того, чтобы защищать накопление atomic конструкцией, я бы предложил следующую страtagsю:

 ... // Create a local buffer to accumulate partial results const int nthreads = numCheckers/numSeekers; const int stride = 32; // Choose a value that avoids false sharing int numt[stride*nthreads]; // Initialize to zero as we are reducing on + operator for (int ii = 0; ii < stride*nthreads; ii++) numt[ii] = 0; #pragma omp parallel num_threads(numCheckers/numSeekers) { //only one time for every producer #pragma omp single { for(int pos = 0; pos < n; pos += STEP){ if (condition(buffer[pos])){ #pragma omp task { //check() is a sequential function const int idx = omp_get_thread_num(); numt[idx*stride] += check(buffer[pos]); } } } #pragma omp taskwait // Accumulate partial results const int idx = omp_get_thread_num(); #pragma atomic num += numt[stride*idx]; } 

Это должно предотвратить возможное замедление из-за одновременных запросов на запись в том же месте памяти.

Обратите внимание, что предыдущая версия ответа, предполагающая использование reduction в самой внутренней параллельной области, была неправильной:

Элемент списка, который появляется в предложении сокращения самой внутренней охватывающей совместной работы или параллельной конструкции, может быть недоступен в явной задаче

не допускается в п. 2.1.9.6 спецификаций OpenMP 3.1.