Есть ли способ вызвать библиотеку thread-local init / cleanup при создании / уничтожении streamов?

Этот вопрос аналогичен Как вызвать функцию при создании и выходе streamа? но более конкретным. В другом проекте совместной работы с несколькими процессами я использовал комбинацию __attribute __ ((конструктор)), помеченную подпрограмму библиотеки init, ленивую инициализацию для каждого streamа и надежные futexes, чтобы гарантировать, что ресурсы не просочились в разделяемую память, даже если sys admin выбрал SIGKILL один из процессов, использующих его. Однако futexes в API-интерфейсах слишком тяжеловесны для моего текущего проекта, и даже несколько инструкций, чтобы обмениваться некоторой ленивой инициализацией, я бы предпочел избежать. Библиотечные API будут буквально называться несколько триллионов раз по нескольким сотням streamов в нескольких процессах (каждый API составляет всего пару сотен инструкций).

Я предполагаю, что ответ отрицательный, но поскольку я потратил пару часов на поиски и не нашел окончательного ответа, я думал, что попрошу его здесь, тогда следующий человек, который ищет простой ответ, сможет быстрее найти его.

Моя цель довольно проста: выполнить некоторую инициализацию по streamам, так как streamи создаются в нескольких процессах асинхронно и надежно выполняют некоторую очистку в какой-то момент, когда streamи уничтожаются асинхронно. Не обязательно быть незамеченным, это должно произойти в конце концов.

Некоторые гипотетические идеи для привлечения критического мышления: гипотетический pthread_atclone (), вызванный из __attribute __ ((constructor)), помеченный библиотекой init func, будет удовлетворять первому условию. И расширение для futex () es, чтобы добавить операцию semop-like с значением futex_adj per-thread, которое, если отличное от нуля в do_exit (), приводит к установке FUTEX_OWNER_DIED для семафора futex, что позволяет очистить в следующий раз futex коснулся.

    Ну, во-первых, вы должны документировать, что пользователи библиотеки не должны асинхронно прерывать streamи таким образом, чтобы они не эксплицитно выделяли ресурсы, принадлежащие вашей библиотеке (закрытие дескриптора, что бы то ни было), TBH, просто завершая streamи вообще до завершения процесса плохая идея.

    Труднее определить, является ли весь процесс SIGKILLed, пока он использует вашу библиотеку. Моя лучшая догадка заключается в том, что все процессы, желающие использовать вашу библиотеку, должны сначала войти в систему, чтобы их pid можно было добавить в контейнер. Используя stream, запущенный в вашей инициализации lib, опрос для pid’s, который исчез с kill (pid, 0), и выполните любую apporiate-очистку. Это не очень удовлетворительно, (я ненавижу опрос), но я не вижу альтернатив, которые не очень грязны 🙁

    Насколько я могу судить, после исследований и экспериментов я придумал то, что, кажется, является «лучшей практикой». Если кто-нибудь знает лучше, прокомментируйте!

    Для первой части, для инициализации в streamе, я не смог найти альтернативы простой ленивой инициализации. Тем не менее, я решил, что немного более эффективно перемещать ветвь на вызывающего, так что конвейерная обработка в новом стеке стека не сразу сталкивается с фактически ненужной ветвью. поэтому вместо этого:

     __thread int tInf = 0; void threadDoSomething(void *data) { if (!tInf) { _threadInitInfo(&tInf); } /*l * do Something. */ } 

    Это:

     __thread int tInf = 0; #define threadDoSomething(data) (((!tInf)?_threadInitInfo(&tInf):0), \ _threadDoSomething((data))) void _threadDoSomething(void *data) { /*l * do Something. */ } 

    Комментарии к (хотя бы незначительной) пользе этого приветствия!

    Во второй части, прочно выполняющей некоторую очистку, когда нити умирают независимо от того, насколько асинхронно, я не смог найти какое-либо решение лучше, чем иметь процесс reaping epoll_wait () в файловом дескрипторе для прочитанного конца открытого канала, переданного ему через управляющее сообщение SCM_RIGHTS в вызове sendmsg () на абстрактном адресе сокета домена UNIX. Звучит сложно, но это не так уж плохо, вот клиентская сторона:

     /*m * Client that registers a thread with a server who will do cleanup of a * shared interprocess object even if the thread dies asynchronously. */ #include  // socket(), bind(), recvmsg() #include  // syscall() #include  // sockaddr_un #include  // uint64_t #include  // O_CLOEXEC() #include  // malloc() #include  // random() #include  // close(), usleep() #include  // pthread_create() #include  // Our API. char iovBuf[] = "SP1"; // 3 char buf to send client type __thread pid_t cliTid = 0; // per-thread copy of self's Thread ID /*f * initClient() is called when we realise we need to lazily initialise * our thread based on cliTid being zero. */ void * initClient(void *ptr) { struct sockaddr_un svAddr; struct msghdr msg; struct iovec io; struct cmsghdr *ctrMsg; uint64_t ltid; // local 8-byte copy of the tid int pfds[2], // two fds of our pipe sfd; // socket fd /*s * This union is necessary to ensure that the buffer is aligned such that * we can read cmsg_{len,level,type} from the cmsghdr without causing an * alignment fault (SIGBUS.) */ union { struct cmsghdr hdr; char buf[CMSG_SPACE(sizeof(int))]; } ctrBuf; pfds[0] = pfds[1] = sfd = -1; /*l * Get our Thread ID. */ ltid = (uint64_t)(cliTid = syscall(SYS_gettid)); /*l * Set up an abstract unix domain socket address. */ svAddr.sun_family = AF_UNIX; svAddr.sun_path[0] = '\0'; strcpy(&svAddr.sun_path[1], EPLS_SRV_ADDR); /*l * Set up a socket datagram send buffer. */ io.iov_base = iovBuf; io.iov_len = sizeof(iovBuf); msg.msg_iov = &io; msg.msg_iovlen = 1; msg.msg_control = ctrBuf.buf; msg.msg_controllen = sizeof(ctrBuf); msg.msg_name = (struct sockaddr *)&svAddr, msg.msg_namelen = (&svAddr.sun_path[0] - (char *)&svAddr) + 1 + sizeof(EPLS_SRV_ADDR); /*l * Set up the control message header to indicate we are sharing a file * descriptor. */ ctrMsg = CMSG_FIRSTHDR(&msg); ctrMsg->cmsg_len = CMSG_LEN(sizeof(int)); ctrMsg->cmsg_level = SOL_SOCKET; ctrMsg->cmsg_type = SCM_RIGHTS; /*l * Create file descriptors with pipe(). */ if (-1 == pipe(pfds)) { printErrMsg("TID: %d pipe() failed", cliTid); } else { /*l * Write our tid to the pipe. */ memmove(CMSG_DATA(ctrMsg), &pfds[0], sizeof(int)); if (-1 == write(pfds[1], &ltid, sizeof(uint64_t))) { printErrMsg("TID: %d write() failed", cliTid); } if (-1 == (sfd = socket(AF_UNIX, SOCK_DGRAM, 0))) { printErrMsg("TID: %d socket() failed", cliTid); } else if (-1 == sendmsg(sfd, &msg, 0)) { printErrMsg("TID: %d sendmsg() failed", cliTid); } else { printVerbMsg("TID: %d sent write fd %d to server kept read fd %d", cliTid, pfds[0], pfds[1]); /*l * Close the read end of the pipe, the server has it now. */ close(pfds[0]); pfds[0] = -1; } } if (-1 != pfds[1]) close(pfds[1]); if (-1 != pfds[0]) close(pfds[0]); if (-1 != sfd) close(sfd); return (void *)0; } 

    И код жнеца:

     /*m * Abstract datagram socket listening for FD's from clients. */ #include  // socket(), bind(), recvmsg() #include  // epoll_{create,wait}() #include  // sockaddr_un #include  // malloc() #include  // close() #include  // Our API. /*s * socket datagram structs for receiving structured messages used to transfer * fds from our clients. */ struct msghdr msg = { 0 }; struct iovec io = { 0 }; char iovBuf[EPLS_MSG_LEN]; // 3 char buf to receive client type /*s * This union is necessary to ensure that the buffer is aligned such that * we can read cmsg_{len,level,type} from the cmsghdr without causing an * alignment fault (SIGBUS.) */ union { struct cmsghdr hdr; char buf[CMSG_SPACE(sizeof(int))]; } ctrBuf; typedef struct _tidFd_t { struct _tidFd_t *next; pid_t tid; int fd; } tidFd_t; tidFd_t *tidFdLst = (tidFd_t *)0; /*f * Perform some handshaking with a new client and add the file descriptor * it shared with us to the epoll set. */ static void welcomeClient(int efd, int cfd) { uint64_t tid; tidFd_t *tfd; struct epoll_event epEv; tfd = (tidFd_t *)-1; /*l * The fd is a pipe and should be readable, and should contain the * tid of the client. */ if (-1 != read(cfd, &tid, sizeof(tid)) && (tfd = malloc(sizeof(*tfd)))) { tfd->fd = cfd; tfd->tid = (pid_t)tid; tfd->next = tidFdLst; /*l * Single threaded process, no race condition here. */ tidFdLst = tfd; /*l * Add the fd to the epoll() set so that we will be woken up with * an error if the thread dies. */ epEv.events = EPOLLIN; epEv.data.fd = cfd; if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, cfd, &epEv)) { printErrMsg("TID: %ld Could not register fd %d with epoll set", tid, cfd); } else { printVerbMsg("TID: %ld Registered fd %d with epoll set", tid, cfd); } /*l * Couldn't allocate memory for the new client. */ } else if (!tfd) { printErrMsg("Could not allocate memory for new client"); /*l * Could not read from the eventfd() file descriptor. */ } else { printErrMsg("Could not read from client file descriptor"); } } /*f * Perform some handshaking with a new client and add the file descriptor * it shared with us to the epoll set. */ static void processClientEvent(int efd, struct epoll_event *epEv) { tidFd_t *tfd, **bLnk; /*l * Walk the list of per-tid fd structs. */ for (bLnk = &tidFdLst; (tfd = *bLnk); bLnk = &tfd->next) if (tfd->fd == epEv->data.fd) break; if (!tfd) { printErrMsg("client file descriptor %d not found on the tfd list!", epEv->data.fd); /*l * If we received an EPOLLHUP on the fd, cleanup. */ } else if (epEv->events & EPOLLHUP) { /*l * Try to remove the tid's pipe fd from the epoll set. */ if (-1 == epoll_ctl(efd, EPOLL_CTL_DEL, epEv->data.fd, epEv)) { printErrMsg("couldn't delete epoll for tid %d", tfd->tid); /*l * Do tid cleanup here. */ } else { printVerbMsg("TID: %d closing fd: %d", tfd->tid, epEv->data.fd); close(epEv->data.fd); /*l * Remove the per-tid struct from the list and free it. */ *bLnk = tfd->next; free(tfd); } } else { printVerbMsg("TID: %d Received unexpected epoll event %d", tfd->tid, epEv->events); } } /*f * Create and listen on a datagram socket for eventfd() file descriptors * from clients. */ int main(int argc, char *argv[]) { struct sockaddr_un svAddr; struct cmsghdr *ctrMsg; struct epoll_event *epEv, epEvs[EPLS_MAX_EPEVS]; int sfd, efd, cfd, nfds; sfd = efd = -1; /*l * Set up an abstract unix domain socket address. */ svAddr.sun_family = AF_UNIX; svAddr.sun_path[0] = '\0'; strcpy(&svAddr.sun_path[1], EPLS_SRV_ADDR); /*l * Set up a socket datagram receive buffer. */ io.iov_base = iovBuf; // 3-char buffer to ID client type io.iov_len = sizeof(iovBuf); msg.msg_name = (char *)0; // No need for the client addr msg.msg_namelen = 0; msg.msg_iov = &io; // single IO vector in the S/G array msg.msg_iovlen = 1; msg.msg_control = ctrBuf.buf; // Control message buffer msg.msg_controllen = sizeof(ctrBuf); /*l * Set up an epoll event. */ epEv = &epEvs[0]; epEv->events = EPOLLIN; /*l * Create a socket to receive datagrams on and register the socket * with our epoll event. */ if (-1 == (epEv->data.fd = sfd = socket(AF_UNIX, SOCK_DGRAM, 0))) { printErrMsg("socket creation failed"); /*l * Bind to the abstract address. The pointer math is to portably * handle weird structure packing _just_in_case_. */ } else if (-1 == bind(sfd, (struct sockaddr *)&svAddr, (&svAddr.sun_path[0] - (char *)&svAddr) + 1 + sizeof(EPLS_SRV_ADDR))) { printErrMsg("could not bind address: %s", &svAddr.sun_path[1]); /*l * Create an epoll interface. Set CLOEXEC for tidiness in case a thread * in the server fork()s and exec()s. */ } else if (-1 == (efd = epoll_create1(EPOLL_CLOEXEC))) { printErrMsg("could not create epoll instance"); /*l * Add our socket fd to the epoll instance. */ } else if (-1 == epoll_ctl(efd, EPOLL_CTL_ADD, sfd, epEv)) { printErrMsg("could not add socket to epoll instance"); /*l * Loop receiving events on our epoll instance. */ } else { printVerbMsg("server listening on abstract address: %s", &svAddr.sun_path[1]); /*l * Loop forever listening for events on the fds we are interested * in. */ while (-1 != (nfds = epoll_wait(efd, epEvs, EPLS_MAX_EPEVS, -1))) { /*l * For each fd with an event, figure out what's up! */ do { /*l * Transform nfds from a count to an index. */ --nfds; /*l * If the fd with an event is the listening socket a client * is trying to send us their eventfd() file descriptor. */ if (sfd == epEvs[nfds].data.fd) { if (EPOLLIN != epEvs[nfds].events) { printErrMsg("unexpected condition on socket: %d", epEvs[nfds].events); nfds = -1; break; } /*l * Reset the sizes of the receive buffers to their * actual value; on return they will be set to the * read value. */ io.iov_len = sizeof(iovBuf); msg.msg_controllen = sizeof(ctrBuf); /*l * Receive the waiting message. */ if (-1 == recvmsg(sfd, &msg, MSG_CMSG_CLOEXEC)) { printVerbMsg("failed datagram read on socket"); /*l * Verify that the message's control buffer contains * a file descriptor. */ } else if ( NULL != (ctrMsg = CMSG_FIRSTHDR(&msg)) && CMSG_LEN(sizeof(int)) == ctrMsg->cmsg_len && SOL_SOCKET == ctrMsg->cmsg_level && SCM_RIGHTS == ctrMsg->cmsg_type) { /*l * Unpack the file descriptor. */ memmove(&cfd, CMSG_DATA(ctrMsg), sizeof(cfd)); printVerbMsg("Received fd %d from client type %c%c%c", cfd, ((char *)msg.msg_iov->iov_base)[0], ((char *)msg.msg_iov->iov_base)[1], ((char *)msg.msg_iov->iov_base)[2]); /*l * Process the incoming file descriptor and add * it to the epoll() list. */ welcomeClient(efd, cfd); /*l * Note but ignore incorrectly formed datagrams. */ } else { printVerbMsg("could not extract file descriptor " "from client's datagram"); } /*l * The epoll() event is on one of the file descriptors * shared with a client, process it. */ } else { processClientEvent(efd, &epEvs[nfds]); } } while (nfds); /*l * If something happened to our socket break the epoll_wait() * loop. */ if (nfds) break; } } /*l * An error occurred, cleanup. */ if (-1 != efd) close(efd); if (-1 != sfd) close(sfd); return -1; } 

    Сначала я пытался использовать eventfd (), а не pipe (), но дескрипторы файла eventfd представляют объекты не соединения, поэтому закрытие fd в клиентском коде не создало EPOLLHUP в жатке. Если кто-нибудь знает лучшую альтернативу pipe () для этого, дайте мне знать!

    Для полноты здесь #defines, используемые для построения абстрактного адреса:

     /*d * server abstract address. */ #define EPLS_SRV_NAM "_abssSrv" #define EPLS_SRV_VER "0.0.1" #define EPLS_SRV_ADDR EPLS_SRV_NAM "." EPLS_SRV_NAM #define EPLS_MSG_LEN 3 #define EPLS_MAX_EPEVS 32 

    Вот и все, надеюсь, что это полезно для кого-то.