Skip to content

ARINA-Q - Очередь собщений и конвейер (машина состояний) на основе СУБД.

Notifications You must be signed in to change notification settings

sergey-golovkin/arina-q

Repository files navigation

ARINA-Q - Очередь собщений и конвейер (машина состояний) на основе СУБД.

Проект "arina-q" - это попытка закончить беспорядочные попытки программистов делать интеграции по принципу "просто табличка". Постоянно с этим сталкиваясь, я решил поделиться тем, что уже более 7 лет успешно используется в продакшене ряда крупных компаний.

Все что вам нужно - это СУБД и Apache Camel , в рамках которого эти библиотеки работают.

Поддерживаемые СУБД:

  • ORACLE
  • MS SQL
  • POSTGRESQL
  • MYSQL
  • MARIABD
  • FIREFIRD

Я буду рад, если вам это понравится.

Версии

JRE: 11

Apache Camel: 3.11.0

Термины:

  1. DATA_IN - входящая очередь
  2. DATA_OUT - исходящая очередь
  3. STAGES - конвейер

arina-q - очередь, предназначена для однократной последовательной обработки каждого сообщения в соответсвии с заданными фильтрами, после успешной обработки которого оно изымается из очереди. В случае неуспешной обработки очередь будет пытаться раза за разом обработать первое сообщение из очереди в соответствии с указанными фильтрами.

DATA_SOURCE - имя bean, с описанием Data Source.
SOURCE_SYSID - идентификатор системы источника сообщения (строка)
DESTINATION_SYSID - идентификатор системы назначения сообщения (строка)
MESSAGE_DATA_TYPE - идентификатор типа сообщения (число)
SUB_Q_NO - номер подочереди, по умолчнию = 0 (число)
ADDITIONAL_INFO - служебная информация сопровождающая основное сообщение (строка)
EXPIRE_DATE - время жизни сообщения, (дата-время) в формате yyyy-MM-dd HH:mm:ss.fff
REPLACE_ID - некий уникальный идентификатор сообщения (строка). Используется для подмены сообщения стоящего во входящей очереди.
TRANS_ID - идентификатор группы сообщений (строка) - логической транзакции. При указании идентификатора логической транзакции сообщения помещаются в исходящую очередь,
           но реально отправляются из нее только после специального указания об этом, либо удаляются автоматически по истечению времени жизни, если указания не поступило.
TRANS_SEQ_NO - номер сообщения внутри логической транзакции (число). Позволяет указать порядок следования отсылаемых сообщений и
               позволяет помещать сообщения в логическую транзакцию в произвольной последовательности,
               но при получении указания на отсылку логической транзакции, сообщения автоматически сортируются в соответсвии с указанным номером внутри нее.
TRANS_TTL - время жизни логической транзакции сообщений, (время в секундах), по умолчанию 3600 сек.
            По истечению указанного времени, если не поступила команда на отсылку логической транзакции, сообщения входящие в нее автоматически удаляются.

Чтение из очереди:

<from uri="arina-q:{DATA_SOURCE}?
               [mode={Direct|Reverse}&amp;]
               [systemsFilter={SOURCE_SYSID_01}[,|;]{SOURCE_SYSID_02}&amp;]
               [typesFilter={MESSAGE_DATA_TYPE_01}[,|;]{MESSAGE_DATA_TYPE_02}&amp;]
               [subQ={SUB_Q_NO_00}[,|;]{SUB_Q_NO_01}&amp;]
               [initialDelay=60000&amp;]
               [loopDelays=100[,|;]200[,|;]300[,|;]400[,|;]500[,|;]1000[,|;]...]&amp;]
               [errorDelays=10000[,|;]20000[,|;]30000[,|;]60000[,|;]120000[,|;]...]
"/>

    mode={Direct|Reverse|ChangeSubQ} - режим работы очереди (если не указан, то используется Direct)
        - Direct - читать из DATA_IN
	    - Reverse - читать из DATA_OUT

    systemsFilter - фильтр, список систем источников сообщений, разделенный знаком запятая "," или точкой с запятой ";", которые требуется читать (обрабатывается параллельно).
                    Если не указан, то читаются сообщения от всех систем по мере поступления.
    typesFilter - фильтр, список идентификаторов типов сообщений, разделенный знаком запятая "," или точкой с запятой ";", которые требуется читать (обрабатывается последовательно).
                  Если не указан, то читаются сообщения всех типов по мере поступления.
    subQ - фильтр, список номеров подочередей, разделенный знаком запятая "," или точкой с запятой ";", которые требуется читать (обрабатывается параллельно).
                  Если не указан, то читается из подочереди по умолчанию с номером 0.
    initialDelay - задержка в мсек перед началом чтения сообщений сразу после старта.
                   Если не указан, то используется значние 60000 мсек (1 мин)
    loopDelays -  список задержек в мсек перед началом очередного цикла чтения сообщений в случае пустой очереди, разделенный знаком запятая "," или точкой с запятой ";".
                  В случае если в очереди есть сообщения задержки нет, если не указан, то используется задержка 100 мсек

                  В случае первой цикла происходит остановка обработки сообщений на первое значение мсек указанное в списке.
                  В случае второго цикла происходит остановка обработки сообщений на второе значение мсек указанное в списке, итд

                  Если, например, указано три значения, то на четвертый и более раз происходит остановка обработки сообщений
                  на последнее (третье) значение мсек указанное в списке.

                  Если, последним значением указано троеточие (...) , то на четвертый раз происходит остановка обработки сообщений
                  на первое значение мсек указанное в списке итд циклично.
    errorDelays - список задержек в мсек при возникновении ошибок обработки сообщения, разделенный знаком запятая "," или точкой с запятой ";".
                  Если не указан, то используется значение 60000 мсек (1 мин)

                  В случае первой ошибки происходит остановка обработки сообщений на первое значение мсек указанное в списке.
                  В случае второй ошибки происходит остановка обработки сообщений на второе значение мсек указанное в списке, итд

                  Если, например, указано три значения, то на четвертый и более раз происходит остановка обработки сообщений 
                  на последнее (третье) значение мсек указанное в списке.
                  
                  Если, последним значением указано троеточие (...) , то на четвертый раз происходит остановка обработки сообщений
                  на первое значение мсек указанное в списке итд циклично.
    skipExpired - признак, указывающий пропускать или обрабатывать сообщения с истекшим временем жизни, по умолчанию - true - игнорировать и сразу перекладывать в лог обработанных сообщений.
    expiredWarningText - текст предупреждения, которое записывается в лог обработанных сообщений, в случае игнорирования сообщения с истекшим временем жизни. По умолчанию "Message expired!"

    Чтение происходит в следующие переменные:
        Тело сообщения: ${body}

        Остальные данные:
    	header.ARINA-Q-InId
    	header.ARINA-Q-OutId
    	header.ARINA-Q-FromSystem
    	header.ARINA-Q-ToSystem
    	header.ARINA-Q-DataType
    	header.ARINA-Q-DataDate
        header.ARINA-Q-MetaInfo
        header.ARINA-Q-ExpireDate
        header.ARINA-Q-ReplaceId
        header.ARINA-Q-MsgId

Запись в очередь:

    <to uri="arina-q:{DATA_SOURCE}?
             [mode={Direct|Reverse|ChangeSubQ}&amp;]
             [messageBody={MESSAGE_BODY}&amp;]
             [fromSystem={SOURCE_SYSID}&amp;]
             [toSystem={DESTINATION_SYSID}&amp;]
             [dataType={MESSAGE_DATA_TYPE}&amp;]
             [subQ={SUB_Q_NO}&amp;]
             [metaInfo={ADDITIONAL_INFO}&amp;]
             [expireDate={EXPIRE_DATE}&amp;]
             [replaceId={REPLACE_ID}&amp;]
             [transId={TRANS_ID}&amp;]
             [transSeqNo={TRANS_SEQ_NO}&amp;]
             [transTTL={TRANS_TTL}&amp;]
             [sendTrans=true|false]
    "/>

    mode={Direct|Reverse|ChangeSubQ} - режим работы очереди (если не указан, то используется Direct)
        - Direct - писать в DATA_OUT
	    - Reverse - писать в DATA_IN
        - ChangeSubQ - переместить сообщенение из текущей подочереди в указанную, не изымая сообщение из очереди.

    messageBody - содержимое сообщения (может быть выражением на любом языке Camel).
                 Если не указано, то используется ${body}
    fromSystem - система источник для отсылаемого сообщения (может быть выражением на любом языке Camel).
                 Если не указан, то используется значение из header.ARINA-Q-FromSystem.
    toSystem - система назначения для отсылаемого сообщения (может быть выражением на любом языке Camel).
                 Если не указан, то используется значение из header.ARINA-Q-ToSystem.
    dataType - идентификатор типа отсылаемого сообщения (может быть выражением на любом языке Camel).
                 Если не указан, то используется значение из header.ARINA-Q-DataType.
    subQ - номеров подочереди, в которую требуется записать отсылаемое сообщение (только для режимлв работы Reverse или ChangeSubQ) (может быть выражением на любом языке Camel).
                 Если не указан, то используется значение из header.ARINA-Q-SubQ, а если нет такого, то равен 0.
    metaInfo - служебная информация сопровождающая основное сообщение (может быть выражением на любом языке Camel).
                 Если не указано, то используется ${header.ARINA-Q-MetaInfo}
    expireDate - время жизни сообщения. По умолчанию, сообщения с истекшим временем жизни сразу переносятся в лог обработанных сообщения, при этом в таблице ошибок остается предупреждение.
                 Формат yyyy-MM-dd HH:mm:ss.fff, например expireDate=date:now:yyyy-MM-dd HH:mm:ss или 2017-06-30 13:43:44.727 или language:groovy:java.time.OffsetDateTime.now().plusDays(1)
                 Если не указано, то используется ${header.ARINA-Q-ExpireDate}
    replaceId - идентификатор сообщения для его подмены во входящей очереди.
                 Использование идентификатора сообщения позволяет реализовать следующие задачи:
                     - замена ошибочного сообщения - система приемник никак не может обработать входящее сообщение, например из за ошибки.
                       Она сообщает системе отправителю об этом и система отправитель посылает скорректированное сообщение с таким же идентификатором сообщения,
                       при попадании во входящую очередь оно сразу же автоматически заменяет собой ошибочное, при этом ошибочное уходит в лог успешнообработанных сообщений DATA_IN_LOG,
                       а так же формируется запись в PROCESS_DATA_IN_LOG с кодом 0 и сообщением 'Message replaced!'
                     - обработка потока телеметрической/статистической информации, когда обработка каждого сообщения не важна, а система приемник не успевает за потоком входящих сообщений.
    transId - идентификатор группы сообщений - логической транзакции.
    transSeqNo - номер сообщения внутри логической транзакции.
    transTTL - время жизни логической транзакции в секундах.
    sendTrans - признак необходимости отсылки сообщений логической транзакции.
                По умолчанию - false, поместить сообщение в логическую транзакцию, но не отсылать.
                true - поместить последнее сообщение в логическую транзакцию и отослать все сообщения вхлдящие в нее.

arina-sq - конвейер (машина состояний), предназначен для многоэтапной параллельной обработки каждого сообщения прежде чем оно будет изъято c конвейера. Каждый этап характеризуется своим уникальным строковым идентификатором.

DATA_SOURCE - имя bean, с описанием Data Source.
SOURCE_SYSID - идентификатор системы источника сообщения (строка)
DESTINATION_SYSID - идентификатор системы назначения сообщения (строка)

Чтение с конвейера:

    <from uri="arina-sq:{DATA_SOURCE}?
               stage={STAGE_NAME}&amp;
               [systemsFilter={SOURCE_SYSID}&amp;]
               [delay=0&amp;]
               [autoincrement=true|false&amp;]
               [threads=1&amp;]
               [initialDelay=60000&amp;]
               [loopDelays=100[,|;]200[,|;]300[,|;]400[,|;]500[,|;]1000[,|;]...]&amp;]
               [errorDelays=10000[,|]30000[,|]60000[,|]10000[,|]30000[,|]60000[,|]...]
    "/>

    stage - идентификатор этапа обработки сообщения конвейером (строка, обязательное значение)
    systemsFilter - фильтр, система источник сообщений, которые требуется читать.
                    Если не указан, то читаются сообщения от всех систем по мере поступления.
    delay - задержка в мсек после окончания очередной итерации обработки сообщения текущем этапом конвейера и началом новой итерации.
            Если не указан, то используется значние 0 мсек (без задержки)
    autoincrement - увеличивать ли счетчик итераций обработке на текущем этапе обработки сообщения конвейером.
                    Если не указан, то используется значение (true).
    threads - каждое сообщение на конвейере готово обрабатываеться параллельно. Данный параметр указывает во сколько потоков возможна параллельная обработка сообщений.
                    Если не указан, то используется оден поток.
    initialDelay - задержка в мсек перед началом чтения сообщений сразу после старта.
                   Если не указан, то используется значние 60000 мсек (1 мин)
    loopDelays -  список задержек в мсек перед началом очередного цикла чтения сообщений в случае пустой очереди, разделенный знаком запятая "," или точкой с запятой ";".
                  В случае если в очереди есть сообщения задержки нет, если не указан, то используется задержка 100 мсек

                  В случае первой цикла происходит остановка обработки сообщений на первое значение мсек указанное в списке.
                  В случае второго цикла происходит остановка обработки сообщений на второе значение мсек указанное в списке, итд

                  Если, например, указано три значения, то на четвертый и более раз происходит остановка обработки сообщений
                  на последнее (третье) значение мсек указанное в списке.

                  Если, последним значением указано троеточие (...) , то на четвертый раз происходит остановка обработки сообщений
                  на первое значение мсек указанное в списке итд циклично.
    errorDelays - список задержек в мсек при возникновении ошибок обработки сообщения, разделенный знаком запятая "," или точкой с запятой ";".
                  Если не указан, то используется значение 60000 мсек (1 мин)

                  В случае первой ошибки происходит остановка обработки сообщений на первое значение мсек указанное в списке.
                  В случае второй ошибки происходит остановка обработки сообщений на второе значение мсек указанное в списке, итд

                  Если, например, указано три значения, то на четвертый и более раз происходит остановка обработки сообщений
                  на последнее (третье) значение мсек указанное в списке.

                  Если, последним значением указано троеточие (...) , то на четвертый раз происходит остановка обработки сообщений
                  на первое значение мсек указанное в списке итд циклично.

    Чтение происходит в следующие переменные:
        Тело сообщения: ${body}

        Остальные данные:
            header.ARINA-Q-RequestId
            header.ARINA-Q-RequestDate
            header.ARINA-Q-StageDate
            header.ARINA-Q-Stage
            header.ARINA-Q-Iteration
            header.ARINA-Q-DepId
            header.ARINA-Q-ParentDepId
            header.ARINA-Q-MetaInfo

Запись в конвейер:

	<to uri="arina-sq:{DATA_SOURCE}?
	         stage={STAGE_NAME}&amp;
                 [messageBody={MESSAGE_BODY}&amp;]
                 [delay=0&amp;]
                 [depId={BUSINESS_MESSAGE_ID}&amp;]
                 [parentDepId={PARENT_BUSINESS_MESSAGE_ID}&amp;]
                 [metaInfo={ADDITIONAL_INFO}&amp;]
                 [waitFinal=0&amp;]
                 [loopDelays=100[,|;]200[,|;]300[,|;]400[,|;]500[,|;]1000[,|;]...]&amp;]
                 [final=true|false]
	"/>

    stage - идентификатор этапа обработки сообщения конвейером (строка, обязательное значение) (может быть выражением на любом языке Camel)
    messageBody - содержимое сообщения (может быть выражением на любом языке Camel).
                  Если не указано, то используется ${body}
    delay - задержка в мсек после окончания очередной итерации обработки сообщения текущем этапом конвейера и началом первой итерации обработки новым этапом.
            Если не указан, то используется значние 0 мсек (без задержки)
    depId - собственный бизнес-идентификатор сообщения (может быть выражением на любом языке Camel).
                  Если не указан, то используется значение из header.ARINA-Q-DepId, а если нет такого, то равен null.
    parentDepId - список бизнес-идентификаторов сообщений от которых зависит обработка данного сообщения,
                  разделенный знаком запятая "," или точкой с запятой ";" (может быть выражением на любом языке Camel).
                  Если не указан, то используется значение из header.ARINA-Q-ParentDepId, а если нет такого, то равен null.
    metaInfo - служебная информация сопровождающая основное сообщение (может быть выражением на любом языке Camel).
                 Если не указано, то используется header.ARINA-Q-MetaInfo
    waitFinal - позволяет записать в конвейер сообщение и ждать указанное количество миллисекунд или окончания обработки сообщения на конвейере (что раньше произойдет).
                 При этом, в случае. если произошел таймаут header.ARINA-Q-WaitFinalTimeout будет равен true, а если результат получен, то false.
                 Если header.ARINA-Q-WaitFinalTimeout = false, то ${body} будет содержать обработанное финальным этапом сообщение.
    loopDelays - список задержек в мсек перед началом очередного цикла чтения сообщений в финальном статусе, разделенный знаком запятая "," или точкой с запятой ";".
                 Если не указан, то используется задержка 100 мсек

                 В случае первой цикла происходит остановка обработки сообщений на первое значение мсек указанное в списке.
                 В случае второго цикла происходит остановка обработки сообщений на второе значение мсек указанное в списке, итд

                 Если, например, указано три значения, то на четвертый и более раз происходит остановка обработки сообщений
                 на последнее (третье) значение мсек указанное в списке.

                 Если, последним значением указано троеточие (...) , то на четвертый раз происходит остановка обработки сообщений
                 на первое значение мсек указанное в списке итд циклично.
    final - признак, что сообщение полностью обработано и его надо изъять с конвейера.

    Конвейер подразумевает, что каждое сообщение полностью независимо от остальных и может обрабатываться параллельно наряду с другими сообщениями.
    Но это не всегда так, при помещении сообщения на конвейер можно указать бизнес-идентификатор (строка до 100 символов) сообщения, а так же список
    бизнес-идентификаторов сообщений (строка до 4000 символов) от которых зависит его обработка.
    Таким образом можно добиться отсрочки начала обработки сообщения конвейером до полного окончания обработки всех сообщений от которых зависит его обработка.
    При этом зависимость устанавливается только между сообщениями помещенными на конвейер до вставки текущего сообщения. Вставленные после этого сообщения
    с такими же бизнес-идентификаторами на уже установленную зависимость не влияют.

About

ARINA-Q - Очередь собщений и конвейер (машина состояний) на основе СУБД.

Resources

Stars

Watchers

Forks

Packages

No packages published