From 2f660a80c560df05dc6912432fcf4999b2189cec Mon Sep 17 00:00:00 2001 From: Mararok Date: Wed, 29 Jan 2020 09:12:11 +0100 Subject: [PATCH] extend standard message sender to support JsonSerializable (#13) --- .../Client/DI/Amqp/AmqpDefinitionsBase.php | 40 ++++++++++++++----- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/src/SAREhub/Client/DI/Amqp/AmqpDefinitionsBase.php b/src/SAREhub/Client/DI/Amqp/AmqpDefinitionsBase.php index 1dc82ff..d34a5e9 100644 --- a/src/SAREhub/Client/DI/Amqp/AmqpDefinitionsBase.php +++ b/src/SAREhub/Client/DI/Amqp/AmqpDefinitionsBase.php @@ -56,17 +56,37 @@ protected static function channelWrapperDef() protected static function messageSenderDef() { - return ProcessorDefinitionHelper::pipeline([ - ProcessorDefinitionHelper::headerAppender([ - AmqpMessageHeaders::EXCHANGE => static::getMessageSenderExchange(), - ]), - ProcessorDefinitionHelper::filter(function (Exchange $exchange) { - return $exchange->getIn()->getBody() instanceof Event; - }, ProcessorDefinitionHelper::pipeline([ - ProcessorDefinitionHelper::marshal(get(RawEventDataFormat::class)), - ProcessorDefinitionHelper::marshal(get(JsonDataFormat::class)) - ])), + return ProcessorDefinitionHelper::filter(function (Exchange $exchange) { + return !empty($exchange->getInBody()); + }, ProcessorDefinitionHelper::pipeline([ + self::appendMessageSenderExchangeHeaderDef(), + self::bodyAsEventFilterDef(), + self::bodyAsArrayOrJsonSerializableFilterDef(), get(AmqpProducer::class) + ])); + + } + + private static function bodyAsEventFilterDef() + { + return ProcessorDefinitionHelper::filter(function (Exchange $exchange) { + return $exchange->getIn()->getBody() instanceof Event; + }, ProcessorDefinitionHelper::pipeline([ + ProcessorDefinitionHelper::marshal(get(RawEventDataFormat::class)) + ])); + } + + private static function bodyAsArrayOrJsonSerializableFilterDef() + { + return ProcessorDefinitionHelper::filter(function (Exchange $exchange) { + return is_array($exchange->getInBody()) || $exchange->getInBody() instanceof \JsonSerializable; + }, ProcessorDefinitionHelper::marshal(get(JsonDataFormat::class))); + } + + protected static function appendMessageSenderExchangeHeaderDef() + { + return ProcessorDefinitionHelper::headerAppender([ + AmqpMessageHeaders::EXCHANGE => static::getMessageSenderExchange(), ]); }