Библиотека для streamа данных в C

Как я могу выполнять stream данных (трубы и фильтры, поточную обработку, основанную на streamе) в C? И не с UNIX-трубами.

Недавно я встретил stream.py .

Потоки являются итерами с механизмом конвейерной обработки, позволяющим программировать stream данных и легко распараллеливать.

Идея состоит в том, чтобы вывести на выходе функцию, которая превращает итерабельность в другую итерабельную, и подключить ее к входу другой такой функции. Хотя вы уже можете это сделать, используя композицию функций, этот пакет обеспечивает элегантную нотацию для него, перегружая оператор >>.

Я хотел бы дублировать простую версию такого типа функций в C. Мне особенно нравится перегрузка оператора >> чтобы избежать беспорядка функции. Википедия указывает на этот намек с поста Usenet в 1990 году.

Почему C? Потому что я хотел бы иметь возможность делать это на микроcontrollerах и в C-расширениях для других языков высокого уровня (Max, Pd *, Python).

* (по иронии, учитывая, что Max и Pd были написаны на C, специально для этой цели – я искал что-то баребоны)

    Я знаю, что это не очень хороший ответ, но вы должны создать собственную простую инфраструктуру streamа данных.

    Я написал прототип DF-сервера (вместе с моим другом), у которого еще есть несколько нереализованных функций: он может передавать только сообщения Integer и Trigger в сообщениях, и он не поддерживает паралеллизм. Я просто пропустил эту работу: порты производителей компонентов имеют список указателей на функции для пользовательских портов, которые настроены на инициализацию, и они называют это (если список не пуст). Таким образом, когда происходит событие, компоненты выполняют древовидную диаграмму streamа данных. Поскольку они работают с целыми и триггерами, это очень быстро.

    Кроме того, я написал странный компонент, у которого есть один потребительский и один порт производителя, он просто передает данные через – но в другой stream. Это обычная процедура заканчивается быстро, так как она просто помещает данные и устанавливает флаг для streamа сторон производителя. Грязный, но он подходит моим потребностям: он отделяет длительные процессы древовидной ходьбы.

    Таким образом, как вы признаете, это асинхронная система с низким трафиком для быстрых задач, где размер графа не имеет значения.

    К несчастью, ваша проблема отличается от моего количества моментов, так же как и многие системы streamа данных могут отличаться от других, вам нужно иметь синхронное решение для обработки параллельных streamов.

    Я думаю, самая большая проблема на DF-сервере – диспетчер. Параллелизм, столкновение, streamи, приоритет … как я уже сказал, я просто пропустил проблему, а не решил. Вы тоже должны пропустить это. И вы также должны пропустить другие проблемы.

    диспетчер

    В случае синхронной архитектуры DF все компоненты должны запускаться один раз за цикл, за исключением особых случаев. У них есть простое предварительное условие: доступны ли входные данные? Таким образом, вы должны просто сканировать через компоненты и передавать их в stream бесплатных вызовов, если данные доступны. После обработки всех из них у вас останется N оставшихся компонентов, которые не обработаны. Вы должны обработать список еще раз. После второй обработки у вас будет M остатков. Если N == M, цикл закончен.

    Я думаю, что будут работать одни и те же вещи, если количество компонентов ниже 100.

    переплет

    Да, лучший способ привязки – это визуальное программирование. До окончания редактирования, config-like-код должен использовать insetad, что-то вроде:

      // disclaimer: not actual code Component* c1 = new AddComponent(); Component* c2 = new PrintComponent(); c2->format = "The result is %d\n"; bind(c1->result,c2->feed); 

    Легко ли писать, хорошо читаемо, другое желание?

    Сообщение

    Вы должны передавать чистые сырые пакеты между портами компонентов. Вам нужен только список привязок, которые содержат пары указателей на порты производителя и потребителя и содержат обработанный флаг, который использует «диспетчер».

    Вызов

    Проблема в том, что производитель не должен вызывать потребительский порт, а компонент; все компоненты (class) и стрелки находятся в компоненте. Таким образом, производитель должен сразу вызвать общую точку входа компонента, передать ему идентификатор потребителя или вызвать порт, который должен вызывать любой метод компонента, к которому он принадлежит.


    Итак, если вы можете жить с некоторыми ограничениями, я говорю, продолжайте, и напишите свой lite framework. Это хорошая задача, но писать небольшие компоненты и видеть, насколько умны они могут быть связаны друг с другом, создавая отличное приложение – это самое интересное.

    Если у вас есть дополнительные вопросы, не стесняйтесь спрашивать, я часто просматриваю ключевое слово «dataflow» здесь.

    Возможно, вы можете найти более простую модель данных для вашей программы.

    Я не знаю ни одной библиотеки для этой цели. Мой друг реализовал что-то подобное в стилистике как назначение лаборатории. Основными проблемами таких систем являются низкая производительность (очень плохо, если функции в длинных трубопроводных линиях небольшие) и потенциальная потребность в планировании (обнаружение блокировок и повышение приоритета, чтобы избежать перегрузки буфера).

    Из моего опыта аналогичной обработки данных обработка ошибок довольно обременительна. Поскольку функции в конвейере мало знают о контексте (намеренно, для повторного использования), они не могут создать разумное сообщение об ошибке. Можно внедрить обработку ошибок в streamе – передавать ошибки по каналу в виде данных, но для этого потребуется специальная обработка по всему месту, особенно на выходе, поскольку streamи не могут коррелировать с тем, что соответствует ошибке.

    Учитывая известные проблемы производительности подхода, мне трудно представить, как это будет соответствовать микроcontrollerам. По производительности, ничто не сравнится с простой функцией: можно создать функцию для каждого пути через линию данных.

    Вероятно, вы можете искать некоторую реализацию сети Петри (симулятор или генератор кода), поскольку они являются одной из теоретических основ для streamов.

    Это classно: http://code.google.com/p/libconcurrency/

    Легкая библиотека параллелизма для C с симметричными сопрограммами в качестве основной абстракции streamа управления. Библиотека похожа на State Threads, но использует сопрограммы вместо зеленых streamов. Это упрощает межпроцедурные вызовы и в значительной степени устраняет необходимость в мьютексах и семафорах для сигнализации.

    В конечном итоге вызовы coroutine также смогут безопасно перемещаться между streamами ядра, поэтому достижимая масштабируемость, следовательно, намного выше, чем State Threads, которая намеренно однопоточная.

    Эта библиотека была вдохновлена ​​«минимальным пакетом streamов пользовательского уровня» Дугласа У. Джонса. Псевдо-платформенно-нейтральный алгоритм зондирования на svn trunk получается из его кода.

    Существует также более безопасная и более портативная версия coroutine, основанная на копировании стека, которая была вдохновлена ​​страницей sigfpe на переносных продолжениях в C. Копирование более переносима и гибка, чем переключение стека, и изучает процесс копирования, совместимый с переключением.