Skip to content

Commit

Permalink
extend standard message sender to support JsonSerializable (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mararok authored Jan 29, 2020
1 parent f14f330 commit 2f660a8
Showing 1 changed file with 30 additions and 10 deletions.
40 changes: 30 additions & 10 deletions src/SAREhub/Client/DI/Amqp/AmqpDefinitionsBase.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,37 @@ protected static function channelWrapperDef()

protected static function messageSenderDef()
{
return ProcessorDefinitionHelper::pipeline([
ProcessorDefinitionHelper::headerAppender([
AmqpMessageHeaders::EXCHANGE => static::getMessageSenderExchange(),
]),
ProcessorDefinitionHelper::filter(function (Exchange $exchange) {
return $exchange->getIn()->getBody() instanceof Event;
}, ProcessorDefinitionHelper::pipeline([
ProcessorDefinitionHelper::marshal(get(RawEventDataFormat::class)),
ProcessorDefinitionHelper::marshal(get(JsonDataFormat::class))
])),
return ProcessorDefinitionHelper::filter(function (Exchange $exchange) {
return !empty($exchange->getInBody());
}, ProcessorDefinitionHelper::pipeline([
self::appendMessageSenderExchangeHeaderDef(),
self::bodyAsEventFilterDef(),
self::bodyAsArrayOrJsonSerializableFilterDef(),
get(AmqpProducer::class)
]));

}

private static function bodyAsEventFilterDef()
{
return ProcessorDefinitionHelper::filter(function (Exchange $exchange) {
return $exchange->getIn()->getBody() instanceof Event;
}, ProcessorDefinitionHelper::pipeline([
ProcessorDefinitionHelper::marshal(get(RawEventDataFormat::class))
]));
}

private static function bodyAsArrayOrJsonSerializableFilterDef()
{
return ProcessorDefinitionHelper::filter(function (Exchange $exchange) {
return is_array($exchange->getInBody()) || $exchange->getInBody() instanceof \JsonSerializable;
}, ProcessorDefinitionHelper::marshal(get(JsonDataFormat::class)));
}

protected static function appendMessageSenderExchangeHeaderDef()
{
return ProcessorDefinitionHelper::headerAppender([
AmqpMessageHeaders::EXCHANGE => static::getMessageSenderExchange(),
]);
}

Expand Down

0 comments on commit 2f660a8

Please sign in to comment.