Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for amqp payload compress/decompress #119

Merged
merged 18 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
require 'event_source/event'
require 'event_source/subscriber'
require 'event_source/operations/codec64'
require 'event_source/operations/mime_encode'
require 'event_source/operations/mime_decode'
require 'event_source/operations/create_message'
require 'event_source/operations/fetch_session'
require 'event_source/operations/build_message_options'
Expand Down
2 changes: 2 additions & 0 deletions lib/event_source/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ class Error < StandardError
ServerConfigurationNotFound = Class.new(Error)
ServerConfigurationInvalid = Class.new(Error)
MessageBuildError = Class.new(Error)
PayloadEncodeError = Class.new(Error)
PayloadDecodeError = Class.new(Error)
end
end
80 changes: 80 additions & 0 deletions lib/event_source/operations/mime_decode.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# frozen_string_literal: true

require "dry/monads"
require "dry/monads/do"

module EventSource
module Operations
# Operation for decoding payloads, including decompression using Zlib.
class MimeDecode
include Dry::Monads[:result, :do]

# Supported MIME types for decoding.
MIME_TYPES = %w[application/zlib application/json].freeze

# Decodes the payload based on the specified MIME type.
# For example, decompresses the payload using Zlib for 'application/zlib'.
#
# @param mime_type [String] the MIME type of the payload (e.g., 'application/zlib', 'application/json')
# @param payload [String] the encoded payload to decode
#
# @return [Dry::Monads::Success<String>] if decoding is successful
# @return [Dry::Monads::Failure<String>] if an error occurs (e.g., invalid MIME type, decoding failure)
def call(mime_type, payload)
valid_payload = yield validate_payload(payload, mime_type)
decoded_data = yield decode(valid_payload, mime_type)

Success(decoded_data)
end

private

# Validates the payload based on the MIME type.
# Ensures the payload is binary-encoded for 'application/zlib' MIME type.
#
# @param payload [String] the payload to validate
# @param mime_type [String] the MIME type of the payload
#
# @return [Dry::Monads::Success<String>] if the payload is valid
# @return [Dry::Monads::Failure<String>] if the payload is invalid
def validate_payload(payload, mime_type)
unless MIME_TYPES.include?(mime_type.to_s)
return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.")
end

if mime_type.to_s == 'application/zlib' && !binary_payload?(payload)
return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.")
end

Success(payload)
end

# Decodes the payload using the specified MIME type.
# For 'application/zlib', it decompresses the payload using Zlib.
#
# @param payload [String] the payload to decode
# @param mime_type [String] the MIME type of the payload
#
# @return [Dry::Monads::Success<String>] if decoding is successful
# @return [Dry::Monads::Failure<String>] if decoding fails
def decode(payload, mime_type)
decoded_data = Zlib.inflate(payload) if mime_type.to_s == 'application/zlib'

Success(decoded_data || payload)
rescue Zlib::Error => e
Failure("Failed to decode payload using Zlib: #{e.message}")
end

# Checks whether the payload is binary-encoded.
#
# @param payload [String] the payload to check
#
# @return [Boolean] true if the payload is binary-encoded, false otherwise
def binary_payload?(payload)
return false unless payload.respond_to?(:encoding)

payload.encoding == Encoding::BINARY
end
end
end
end
87 changes: 87 additions & 0 deletions lib/event_source/operations/mime_encode.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# frozen_string_literal: true

require "dry/monads"
require "dry/monads/do"

module EventSource
module Operations
# Operation for encoding payloads into specified MIME types.
# For example, it supports compression using Zlib for 'application/zlib'.
class MimeEncode
include Dry::Monads[:result, :do]
include EventSource::Logging

# Supported MIME types for encoding.
MIME_TYPES = %w[application/zlib application/json].freeze

# Encodes the given payload into the specified MIME type.
# For example, compresses the payload using Zlib for 'application/zlib'.
#
# @param mime_type [String] the MIME type for encoding (e.g., 'application/zlib', 'application/json')
# @param payload [String, Hash] the payload to encode; must be a Hash or String
#
# @return [Dry::Monads::Success<String>] if encoding is successful
# @return [Dry::Monads::Failure<String>] if an error occurs (e.g., invalid MIME type, payload type, or encoding failure)
def call(mime_type, payload)
json_payload = yield validate_payload(payload, mime_type)
encoded_data = yield encode(json_payload, mime_type)

Success(encoded_data)
end

private

# Validates the payload and MIME type before encoding.
# Ensures the MIME type is supported and the payload is either a Hash or a String.
#
# @param payload [String, Hash] the payload to validate
# @param mime_type [String] the MIME type for encoding
#
# @return [Dry::Monads::Success<String>] if the payload and MIME type are valid
# @return [Dry::Monads::Failure<String>] if the MIME type is unsupported or the payload is invalid
def validate_payload(payload, mime_type)
unless MIME_TYPES.include?(mime_type.to_s)
return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.")
end

unless payload.is_a?(Hash) || payload.is_a?(String)
return Failure("Invalid payload type. Expected a Hash or String, but received #{payload.class}.")
end

Success(payload.is_a?(Hash) ? payload.to_json : payload)
end

# Encodes the payload based on the MIME type.
# For 'application/zlib', compresses the payload using Zlib.
# Logs the original and encoded payload sizes for debugging.
#
# @param json_payload [String] the JSON stringified payload to encode
# @param mime_type [String] the MIME type for encoding
#
# @return [Dry::Monads::Success<String>] if encoding is successful
# @return [Dry::Monads::Failure<String>] if encoding fails
def encode(json_payload, mime_type)
encoded_data = Zlib.deflate(json_payload) if mime_type.to_s == 'application/zlib'

logger.debug "*" * 80
logger.debug "Starting payload encoding for MIME type: '#{mime_type}'"
logger.debug "Original payload size: #{data_size_in_kb(json_payload)} KB"
logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" if encoded_data
logger.debug "*" * 80

Success(encoded_data || json_payload)
rescue Zlib::Error => e
Failure("Failed to compress payload using Zlib: #{e.message}")
end

# Calculates the size of the data in kilobytes (KB).
#
# @param data [String] the data whose size is to be calculated
#
# @return [Float] the size of the data in KB, rounded to two decimal places
def data_size_in_kb(data)
(data.bytesize / 1024.0).round(2)
end
end
end
end
11 changes: 10 additions & 1 deletion lib/event_source/protocols/amqp/bunny_exchange_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ def publish(payload:, publish_bindings:, headers: {})
bunny_publish_bindings[:headers] = headers unless headers.empty?

logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}"
@subject.publish(payload.to_json, bunny_publish_bindings)

payload = payload.to_json unless is_binary?(payload)
@subject.publish(payload, bunny_publish_bindings)

logger.debug "BunnyExchange#publish published message: #{payload}"
logger.debug "BunnyExchange#publish published message to exchange: #{@subject.name}"
end
Expand All @@ -67,6 +70,12 @@ def message_id
SecureRandom.uuid
end

def is_binary?(payload)
return false unless payload.respond_to?(:encoding)

payload.encoding == Encoding::BINARY
end

# Filtering and renaming AsyncAPI Operation bindings to Bunny/RabitMQ
# bindings
#
Expand Down
29 changes: 27 additions & 2 deletions lib/event_source/protocols/amqp/bunny_queue_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class BunnyQueueProxy
# @return [Bunny::Queue]
def initialize(channel_proxy, async_api_channel_item)
@channel_proxy = channel_proxy
@async_api_channel_item = async_api_channel_item
bindings = async_api_channel_item.bindings
@consumers = []

Expand Down Expand Up @@ -134,7 +135,7 @@ def on_receive_message(

subscriber = subscriber_klass.new
subscriber.channel = @subject.channel

payload = decode_payload(payload)
subscription_handler =
EventSource::Protocols::Amqp::BunnyConsumerHandler.new(
subscriber,
Expand All @@ -143,14 +144,38 @@ def on_receive_message(
payload,
&executable
)

subscription_handler.run
rescue Bunny::Exception => e
logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}"
ensure
subscriber = nil
end

# Decodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_subscribe.yml message bindings.
#
# For example, if `contentEncoding` is set to `application/zlib`, the payload will be decompressed using zlib.
# If no `contentEncoding` is provided, the payload will be returned unchanged.
#
# @param payload [String] The payload to be decoded.
# @return [String] The decoded payload, or the original payload if no encoding is specified.
# @raise [EventSource::Error::PayloadDecodeError] if the decoding process fails.
def decode_payload(payload)
async_api_subscribe_operation = @async_api_channel_item.subscribe
return payload unless async_api_subscribe_operation.message

message_bindings = async_api_subscribe_operation.message['bindings']
encoding = message_bindings.first[1]['contentEncoding'] if message_bindings
return payload unless encoding

output = EventSource::Operations::MimeDecode.new.call(encoding, payload)
if output.success?
output.value!
else
logger.error "Failed to decompress message \n due to: #{output.failure}"
raise EventSource::Error::PayloadDecodeError, output.failure
end
end

def find_executable(subscriber_klass, delivery_info)
subscriber_suffix = subscriber_klass_name_to_suffix(subscriber_klass)

Expand Down
31 changes: 31 additions & 0 deletions lib/event_source/publish_operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
module EventSource
# Publish {EventSource::Event} messages
class PublishOperation
include EventSource::Logging

# @attr_reader [EventSource::Channel] channel the channel instance used by
# this PublishOperation
# @attr_reader [Object] subject instance of the protocol's publish class
Expand All @@ -26,11 +28,40 @@ def initialize(channel, publish_proxy, async_api_publish_operation)
# @example
# #publish("Message", :headers => { })
def call(payload, options = {})
payload = encode_payload(payload)
@subject.publish(
payload: payload,
publish_bindings: @async_api_publish_operation[:bindings],
headers: options[:headers] || {}
)
end

# Encodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_publish.yml message bindings.
#
# For example, if `contentEncoding` is set to `application/zlib`, the payload will be compressed using zlib.
# If no `contentEncoding` is provided, the payload will be returned as-is without modification.
#
# Note:
# - Encoding is not needed for the HTTP protocol, as encoding is handled at the server level.
# - For other protocols like AMQP, encoding is supported to ensure proper message transmission.
#
# @param payload [String, Hash] The payload to be encoded.
# @return [String] The encoded payload, or the original payload if no encoding is specified.
# @raise [EventSource::Error::PayloadEncodeError] if the encoding process fails.
def encode_payload(payload)
return payload unless @async_api_publish_operation.message

message_bindings = @async_api_publish_operation.message['bindings']
encoding = message_bindings.first[1]['contentEncoding'] if message_bindings
return payload unless encoding

output = EventSource::Operations::MimeEncode.new.call(encoding, payload)
if output.success?
output.value!
else
logger.error "Failed to decompress message \n due to: #{output.failure}"
raise EventSource::Error::PayloadEncodeError, output.failure
end
end
end
end
56 changes: 56 additions & 0 deletions spec/event_source/operations/mime_decode_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# frozen_string_literal: true

RSpec.describe EventSource::Operations::MimeDecode do
subject { described_class.new }

describe "#call" do
context "when the payload and mime type are valid" do
let(:payload) { { message: "Hello, World!" } }
let(:compressed_payload) { Zlib.deflate(payload.to_json) }
let(:mime_type) { "application/zlib" }

it "successfully decodes the payload" do
result = subject.call(mime_type, compressed_payload)

expect(result).to be_success
expect(result.value!).to eq(payload.to_json)
end
end

context "when the payload is not binary for application/zlib" do
let(:invalid_payload) { "Not binary data" }
let(:mime_type) { "application/zlib" }

it "returns a failure" do
result = subject.call(mime_type, invalid_payload)

expect(result).to be_failure
expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.")
end
end

context "when the mime type is invalid" do
let(:payload) { { message: "Hello, World!" }.to_json }
let(:mime_type) { "text/plain" }

it "returns a failure" do
result = subject.call(mime_type, payload)

expect(result).to be_failure
expect(result.failure).to eq("Invalid MIME type 'text/plain'. Supported types are: application/zlib, application/json.")
end
end

context "when decoding fails" do
let(:invalid_compressed_payload) { "Invalid compressed data" }
let(:mime_type) { "application/zlib" }

it "returns a failure with an error message" do
result = subject.call(mime_type, invalid_compressed_payload)

expect(result).to be_failure
expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.")
end
end
end
end
Loading
Loading