Условный сигнал Pthread – работает не так, как ожидалось

Я работаю над проектом и пытаюсь использовать pthread_cond_wait() и pthread_cond_signal() для синхронизации двух streamов.

Мой код выглядит примерно так:

 pthread_mutex_t lock_it = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t write_it = PTHREAD_COND_INITIALIZER; int main(int argc, char**argv) { pthread_t t_send_segments, t_recv_acks; pthread_create(&t_send_segments, NULL, send_segments, (void*)NULL); pthread_create(&t_recv_acks, NULL, recv_acks, (void*)NULL); pthread_join(t_recv_acks, (void**)NULL); pthread_mutex_destroy(&lock_it); pthread_cond_destroy(&write_it); } void* send_segments(void *v) { for(;;) { pthread_mutex_lock(&lock_it); printf("s1\n"); printf("s2\n"); pthread_cond_wait(&write_it, &lock_it); printf("s3\n"); printf("s4\n"); printf("s5\n"); pthread_mutex_unlock(&lock_it); } return 0; } void* recv_acks(void *v) { for(;;) { pthread_mutex_lock(&lock_it); printf("r1\n"); pthread_cond_signal(&write_it); printf("r2\n"); pthread_mutex_unlock(&lock_it); } return 0; } 

Ожидаемый результат:

 s1 s2 r1 s3 s4 s5 s1 s2 r2 r1 s3 s4 s5 (etc) 

Мой вывод не соответствует этой схеме вообще. Ясно, что у меня есть логическая ошибка где-то, но я не понимаю, где. Почему stream recv_acks() всегда не recv_acks() когда он попадает в pthread_cond_signal() поскольку pthread_cond_wait() всегда выполняется первым (из-за порядка, в котором я создаю streamи), и cond_wait() всегда выполняется, так как его в критический раздел?

Функция pthread_cond_signal не выдает текущую нить и не освобождает мьютекс. Все, что он делает, это перезапустить один stream, который приостановил себя при условии через pthread_cond_wait . Это просто означает, что пробужденный stream доступен для планирования, он не вызывает его немедленного выполнения. Планировщик streamов запланирует его когда-нибудь в будущем.

Кроме того, только потому, что s-thread был пробужден и соперничает за мьютекс, это не значит, что он собирается получить мьютекс. Мьютексы не обязательно справедливы для всех streamов, которые запросили его. Согласно странице man pthread_mutex : « pthread_mutex_lock блокирует данный мьютекс. Если мьютекс в настоящее время разблокирован, он становится заблокированным и принадлежит вызывающему streamу, а pthread_mutex_lock немедленно возвращается». Таким образом, r-thread может несколько раз вращаться в своем цикле, с радостью разблокировать и повторно переместить мьютекс несколько раз, прежде чем его заменит планировщик. Это означает, что s-thread получит шанс на мьютекс, если планировщик прерывает r-нить в течение короткого времени, в которое он выпустил мьютекс.

Чтобы добиться нужного результата, обе streamи должны будут контролировать свое выполнение с условием и сигнализировать друг другу, прежде чем приостанавливать себя. Однако это может быть или не быть тем, что вы действительно хотите сделать с вашим реальным проектом.

Еще одно замечание: на самом деле не имеет значения, в каком порядке вы создали streamи. Создание streamа не дает создающего streamа. Таким образом, основной stream, вероятно, создаст оба streamа до того, как он будет назначен, и планировщик streamов может запланировать либо один из них для выполнения следующего. Если s-thread запускается сначала на вашей платформе, это, скорее всего, является реализацией на вашей платформе, и на этом не стоит полагаться.

Кажется, я помню, как читал где-то, что pthread_cond_signal() фактически не вызывает немедленный выход streamа. Поскольку pthread_cond_signal() не освобождает блокировку, stream, который вызывает его, должен продолжать выполняться до тех пор, пока он не освободит блокировку, и только тогда он даст и позволит сигнальному streamу вернуться из pthread_cond_wait() .

Если это так, то я думаю, что ваш вывод будет выглядеть так:

 s1 s2 r1 r2 s3 s4 s5 

и т. д., если это не так, вам может потребоваться отредактировать фактический результат в вашем вопросе, чтобы получить точный ответ.

Вы неправильно используете условия. Я не понял точно, что вы хотите, но то, что вы пытаетесь сделать, похоже на типичную проблему синхронизации, называемую проблемой производителя / потребителя, которая может быть реализована с такими условиями, как я сделал.

Вы должны взглянуть на это, чтобы понять, как используются условия. Проблема заключается в следующем: у меня есть два streamа, один записывает данные в буфер фиксированного размера и другие данные чтения из буфера. Первый stream не может записать больше данных, если буфер заполнен, второй не может быть прочитан, если буфер пуст.

 #include  #include  #include  #define BUFFER_SIZE 4 int buffer_nb_entries = 0; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; void send(){ for(;;){ pthread_mutex_lock(&mutex); while( buffer_nb_entries == BUFFER_SIZE) /* If buffer is full, then wait */ pthread_cond_wait(&cond, &mutex); /* Here I am sure that buffer is not full */ printf("sending\n"); buffer_nb_entries++; pthread_cond_signal(&cond); // signal that the condition has changed. pthread_mutex_unlock(&mutex); } } void receive(){ for(;;){ pthread_mutex_lock(&mutex); while(buffer_nb_entries == 0) pthread_cond_wait(&cond, &mutex); /* Here I am sure that buffer is not empty */ printf("receiving\n"); buffer_nb_entries--; pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); } } int main(){ pthread_t s, r; pthread_create(&s, NULL, (void *(*)(void*))send, NULL); pthread_create(&r, NULL, (void *(*)(void*))receive, NULL); pthread_join(s, NULL); } 

Обратите внимание, что перед вызовом pthread_cond_wait () вы должны что-то проверить перед тем, как вы это сделаете, и если вы вызываете функцию раньше, вы можете спать навсегда.

Я думаю, что твоя проблема исходит от попыток использовать один замок для слишком многих вещей. Вы должны использовать только блокировку для 1 вещи, таким образом, нет путаницы в том, чего вы ждете. Я предлагаю добавить вторую блокировку для сигнала записи. Кроме того, вы должны добавить второй cond_wait для второй группировки сообщений. Если вы этого не сделаете, порядок, в котором все будет работать, будет случайным. Вот моя отредактированная версия вашей программы:

 #include  #include  #include  #define NUM_MESSAGES 3 int messages = 0; void * send_segments(void *); void * recv_acks(void *v); pthread_mutex_t lock_it = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t write_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; int main(int argc, char**argv) { pthread_t t_send_segments, t_recv_acks; pthread_create(&t_send_segments, NULL, send_segments, (void*)NULL); pthread_create(&t_recv_acks, NULL, recv_acks, (void*)NULL); pthread_join(t_recv_acks, (void**)NULL); pthread_join(t_send_segments, (void**)NULL); //pthread_mutex_destroy(&lock_it); //pthread_cond_destroy(&write_it); } void* send_segments(void *v) { for(;;) { pthread_mutex_lock(&lock_it); printf("s1\n"); printf("s2\n"); pthread_mutex_unlock(&lock_it); // wait for other thread pthread_mutex_lock(&write_mutex); pthread_cond_wait(&write_cond, &write_mutex); pthread_mutex_unlock(&write_mutex); pthread_mutex_lock(&lock_it); printf("s3\n"); printf("s4\n"); printf("s5\n"); pthread_mutex_unlock(&lock_it); // wait for other thread pthread_mutex_lock(&write_mutex); pthread_cond_wait(&write_cond, &write_mutex); pthread_mutex_unlock(&write_mutex); if (messages > NUM_MESSAGES) return( NULL ); } return 0; } void* recv_acks(void *v) { for(;;) { // write first response pthread_mutex_lock(&lock_it); printf("r1\n"); pthread_mutex_unlock(&lock_it); // signal other thread pthread_mutex_lock(&write_mutex); pthread_cond_signal(&write_cond); pthread_mutex_unlock(&write_mutex); // write second response pthread_mutex_lock(&lock_it); printf("r2\n\n"); // increment count before releasing lock, otherwise the other thread // will be stuck waiting for a write_cond signal messages++; pthread_mutex_unlock(&lock_it); // signal other thread pthread_mutex_lock(&write_mutex); pthread_cond_signal(&write_cond); pthread_mutex_unlock(&write_mutex); if (messages > NUM_MESSAGES) return(NULL); } return 0; } 

Если я запустил эту программу, я получаю следующий вывод (обратите внимание, что я добавил вторую строку новой строки после r2 для ясности):

 leif@indurain:~/tmp$ ./test s1 s2 r1 s3 s4 s5 r2 s1 s2 r1 s3 s4 s5 r2 s1 s2 r1 s3 s4 s5 r2 s1 s2 r1 s3 s4 s5 r2 leif@indurain:~/tmp$